/* Copyright (C) 2014 InfiniDB, Inc. Copyright (C) 2016 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // $Id: we_ddlcommandproc.cpp 3082 2011-09-26 22:00:38Z chao $ #include #include "boost/filesystem/operations.hpp" #include "boost/filesystem/path.hpp" #include "boost/scoped_ptr.hpp" using namespace std; #include "bytestream.h" using namespace messageqcpp; #include "we_messages.h" #include "we_message_handlers.h" #include "we_ddlcommon.h" #include "we_ddlcommandproc.h" #include "ddlpkg.h" using namespace ddlpackage; #include #include "dataconvert.h" using namespace dataconvert; // #include "we_brm.h" namespace fs = boost::filesystem; #include "cacheutils.h" #include "IDBDataFile.h" #include "IDBPolicy.h" using namespace idbdatafile; using namespace execplan; namespace WriteEngine { WE_DDLCommandProc::WE_DDLCommandProc() { filesPerColumnPartition = 8; extentsPerSegmentFile = 1; dbrootCnt = 1; extentRows = 0x800000; config::Config* cf = config::Config::makeConfig(); string fpc = cf->getConfig("ExtentMap", "FilesPerColumnPartition"); if (fpc.length() != 0) filesPerColumnPartition = cf->uFromText(fpc); // MCOL-4685: remove the option to set more than 2 extents per file (ExtentsPreSegmentFile). 
extentsPerSegmentFile = DEFAULT_EXTENTS_PER_SEGMENT_FILE; string dbct = cf->getConfig("SystemConfig", "DBRootCount"); if (dbct.length() != 0) dbrootCnt = cf->uFromText(dbct); } WE_DDLCommandProc::WE_DDLCommandProc(const WE_DDLCommandProc& rhs) { } WE_DDLCommandProc::~WE_DDLCommandProc() { } uint8_t WE_DDLCommandProc::updateSyscolumnNextval(ByteStream& bs, std::string& err) { uint32_t columnOid, sessionID; uint64_t nextVal; int rc = 0; bs >> columnOid; bs >> nextVal; bs >> sessionID; uint16_t dbRoot; BRM::OID_t oid = 1021; fDbrm.getSysCatDBRoot(oid, dbRoot); std::map oids; // std::vector oidsToFlush; oids[columnOid] = columnOid; // oidsToFlush.push_back(columnOid); if (idbdatafile::IDBPolicy::useHdfs()) fWEWrapper.startTransaction(sessionID); rc = fWEWrapper.updateNextValue(sessionID, columnOid, nextVal, sessionID, dbRoot); if (rc != 0) { err = "Error in WE::updateNextValue"; rc = 1; } if (idbdatafile::IDBPolicy::useHdfs()) { fWEWrapper.flushDataFiles(rc, sessionID, oids); fWEWrapper.confirmTransaction(sessionID); if (rc == 0) fWEWrapper.endTransaction(sessionID, true); else fWEWrapper.endTransaction(sessionID, false); } purgeFDCache(); // if (idbdatafile::IDBPolicy::useHdfs()) // cacheutils::flushOIDsFromCache(oidsToFlush); return rc; } uint8_t WE_DDLCommandProc::writeSystable(ByteStream& bs, std::string& err) { int rc = 0; uint32_t sessionID, tmp32; int txnID, tableOID, tableAUXColumnOID; uint32_t tableWithAutoi; bs >> sessionID; bs >> tmp32; txnID = tmp32; bs >> tmp32; tableOID = tmp32; bs >> tmp32; tableAUXColumnOID = tmp32; bs >> tmp32; tableWithAutoi = tmp32; bs >> tmp32; uint16_t dbroot = tmp32; ddlpackage::TableDef tableDef; tableDef.unserialize(bs); WriteEngine::ColTuple colTuple; WriteEngine::ColStruct colStruct; WriteEngine::ColStructList colStructs; WriteEngine::CSCTypesList cscColTypeList; WriteEngine::ColTupleList colTuples; WriteEngine::dictStr dctColTuples; WriteEngine::DctnryStruct dctnryStruct; WriteEngine::ColValueList colValuesList; 
WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DictStrList dctnryValueList; WriteEngine::RIDList ridList; CalpontSystemCatalog::TableName tableName; CalpontSystemCatalog::ROPair sysTableROPair; boost::shared_ptr systemCatalogPtr; ColumnList columns; ColumnList::const_iterator column_iterator; DDLColumn column; int error = 0; tableName.schema = CALPONT_SCHEMA; tableName.table = SYSTABLE_TABLE; systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); std::map oids; // std::vector oidsToFlush; try { sysTableROPair = systemCatalogPtr->tableRID(tableName); getColumnsForTable(sessionID, tableName.schema, tableName.table, columns); column_iterator = columns.begin(); NullString tmpStr; while (column_iterator != columns.end()) { column = *column_iterator; boost::to_lower(column.tableColName.column); tmpStr.dropString(); if (TABLENAME_COL == column.tableColName.column) { std::string tablename = tableDef.fQualifiedName->fName; colTuple.data = tablename; tmpStr.assign(tablename); } else if (SCHEMA_COL == column.tableColName.column) { std::string schema = tableDef.fQualifiedName->fSchema; colTuple.data = schema; tmpStr.assign(schema); } else if (OBJECTID_COL == column.tableColName.column) { colTuple.data = tableOID; } else if (AUXCOLUMNOID_COL == column.tableColName.column) { colTuple.data = tableAUXColumnOID; } else if (CREATEDATE_COL == column.tableColName.column) { time_t t; struct tm tmp; Date aDay; t = time(NULL); gmtime_r(&t, &tmp); aDay.year = tmp.tm_year + 1900; aDay.month = tmp.tm_mon + 1; aDay.day = tmp.tm_mday; colTuple.data = *(reinterpret_cast(&aDay)); } else if (INIT_COL == column.tableColName.column) { colTuple.data = column.colType.getNullValueForType(); } else if (NEXT_COL == column.tableColName.column) { colTuple.data = column.colType.getNullValueForType(); } else if (AUTOINC_COL == column.tableColName.column) { colTuple.data = tableWithAutoi; } else { colTuple.data = 
column.colType.getNullValueForType(); } colStruct.dataOid = column.oid; colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth; colStruct.tokenFlag = false; colStruct.tokenFlag = column.colType.colWidth > 8 ? true : false; colStruct.colDataType = column.colType.colDataType; colStruct.fColDbRoot = dbroot; if (idbdatafile::IDBPolicy::useHdfs()) { colStruct.fCompressionType = 2; dctnryStruct.fCompressionType = 2; } dctnryStruct.fColDbRoot = dbroot; if (colStruct.tokenFlag) { dctnryStruct.dctnryOid = column.colType.ddn.dictOID; dctnryStruct.fCharsetNumber = column.colType.charsetNumber; dctnryStruct.columnOid = column.oid; } else { dctnryStruct.dctnryOid = 0; dctnryStruct.columnOid = column.oid; } colStructs.push_back(colStruct); cscColTypeList.push_back(column.colType); oids[colStruct.dataOid] = colStruct.dataOid; // oidsToFlush.push_back(colStruct.dataOid); if (dctnryStruct.dctnryOid > 0) { oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid; // oidsToFlush.push_back(dctnryStruct.dctnryOid); } colTuples.push_back(colTuple); dctColTuples.push_back(tmpStr); colValuesList.push_back(colTuples); dctnryStructList.push_back(dctnryStruct); dctnryValueList.push_back(dctColTuples); colTuples.pop_back(); dctColTuples.pop_back(); ++column_iterator; } // fWEWrapper.setDebugLevel(WriteEngine::DEBUG_3); fWEWrapper.setTransId(txnID); fWEWrapper.setIsInsert(true); fWEWrapper.setBulkFlag(false); fWEWrapper.startTransaction(txnID); if (0 != colStructs.size()) { // MCOL-66 The DBRM can't handle concurrent transactions to sys tables // TODO: This may be redundant static boost::mutex dbrmMutex; boost::mutex::scoped_lock lk(dbrmMutex); error = fWEWrapper.insertColumnRec_SYS(txnID, cscColTypeList, colStructs, colValuesList, dctnryStructList, dctnryValueList, SYSCOLUMN_BASE); if (error != WriteEngine::NO_ERROR) { if (error == ERR_BRM_WR_VB_ENTRY) { throw std::runtime_error("WE: Error writing to BRM."); } else { WErrorCodes ec; throw std::runtime_error("WE: Error 
updating calpontsys.systable:" + ec.errorString(error)); } } if (idbdatafile::IDBPolicy::useHdfs()) { int rc1 = fWEWrapper.flushDataFiles(error, txnID, oids); if ((error == 0) && (rc1 == 0)) { rc1 = fWEWrapper.confirmTransaction(txnID); if (rc1 == NO_ERROR) rc1 = fWEWrapper.endTransaction(txnID, true); else fWEWrapper.endTransaction(txnID, false); } else { fWEWrapper.endTransaction(txnID, false); } } } } catch (exception& ex) { err += ex.what(); rc = 1; } catch (...) { err += "Unknown exception caught"; rc = 1; } purgeFDCache(); // if (idbdatafile::IDBPolicy::useHdfs()) // cacheutils::flushOIDsFromCache(oidsToFlush); fWEWrapper.setIsInsert(false); fWEWrapper.setBulkFlag(false); return rc; } uint8_t WE_DDLCommandProc::writeCreateSyscolumn(ByteStream& bs, std::string& err) { int rc = 0; uint32_t sessionID, tmp32, columnSize, dictSize, i; uint8_t tmp8; int txnID, colpos; bs >> sessionID; bs >> tmp32; txnID = tmp32; bs >> columnSize; // deserialize column Oid and dictionary oid vector coloids; vector dictoids; for (i = 0; i < columnSize; ++i) { bs >> tmp32; coloids.push_back(tmp32); } bs >> dictSize; for (i = 0; i < dictSize; ++i) { bs >> tmp32; dictoids.push_back(tmp32); } bool alterFlag = 0; bs >> tmp8; alterFlag = (tmp8 != 0); bs >> tmp32; colpos = tmp32; bs >> tmp32; uint16_t dbroot = tmp32; ddlpackage::TableDef tableDef; tableDef.unserialize(bs); WriteEngine::ColTuple colTuple; WriteEngine::ColStruct colStruct; WriteEngine::ColStructList colStructs; WriteEngine::CSCTypesList cscColTypeList; WriteEngine::ColTupleList colTuples; WriteEngine::dictStr dctColTuples; WriteEngine::DctnryStruct dctnryStruct; WriteEngine::ColValueList colValuesList; WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DictStrList dctnryValueList; WriteEngine::RIDList ridList; CalpontSystemCatalog::TableName tableName; CalpontSystemCatalog::ROPair sysTableROPair; boost::shared_ptr systemCatalogPtr; ColumnList columns; ColumnList::const_iterator column_iterator; DDLColumn column; int 
error = 0; systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); std::map oids; int rc1 = 0; ColumnDef* colDefPtr = 0; ColumnDefList::const_iterator iter; int startPos = colpos; tableName.schema = CALPONT_SCHEMA; tableName.table = SYSCOLUMN_TABLE; getColumnsForTable(sessionID, tableName.schema, tableName.table, columns); unsigned int numCols = columns.size(); // WriteEngine::ColTupleList colList[numCols]; // ColTupleList is NOT POD, so let's try this: std::vector colList; // WriteEngine::dictStr dctColList[numCols]; std::vector dctColList; ColumnDefList tableDefCols = tableDef.fColumns; ddlpackage::QualifiedName qualifiedName = *(tableDef.fQualifiedName); iter = tableDefCols.begin(); // colpos = 0; NullString tmpStr; for (unsigned int ii = 0; ii < numCols; ii++) { colList.push_back(WriteEngine::ColTupleList()); dctColList.push_back(WriteEngine::dictStr()); } try { unsigned int col = 0; unsigned int dictcol = 0; while (iter != tableDefCols.end()) { colDefPtr = *iter; DictOID dictOID = {0, 0, 0, 0, 0}; int dataType; dataType = convertDataType(colDefPtr->fType->fType); if (dataType == CalpontSystemCatalog::DECIMAL || dataType == CalpontSystemCatalog::UDECIMAL) { if (colDefPtr->fType->fPrecision > 38) // precision cannot be over 38. 
{ ostringstream os; os << "Syntax error: The maximum precision (total number of digits) that can be specified is 38"; throw std::runtime_error(os.str()); } else if (colDefPtr->fType->fPrecision < colDefPtr->fType->fScale) { ostringstream os; os << "Syntax error: scale should be less than precision, precision: " << colDefPtr->fType->fPrecision << " scale: " << colDefPtr->fType->fScale; throw std::runtime_error(os.str()); } colDefPtr->convertDecimal(); } bool hasDict = false; if ((dataType == CalpontSystemCatalog::CHAR && colDefPtr->fType->fLength > 8) || (dataType == CalpontSystemCatalog::VARCHAR && colDefPtr->fType->fLength > 7) || (dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength > 7) || (dataType == CalpontSystemCatalog::BLOB && colDefPtr->fType->fLength > 7) || (dataType == CalpontSystemCatalog::TEXT && colDefPtr->fType->fLength > 7)) { hasDict = true; dictOID.compressionType = colDefPtr->fType->fCompressiontype; dictOID.colWidth = colDefPtr->fType->fLength; dictOID.dictOID = dictoids[dictcol]; dictcol++; //@Bug 2534. Take away the limit of 255 and set the limit to 8000. 
if ((colDefPtr->fType->fLength > 8000) && (dataType != CalpontSystemCatalog::BLOB) && (dataType != CalpontSystemCatalog::TEXT)) { ostringstream os; os << "char, varchar and varbinary length may not exceed 8000 bytes"; throw std::runtime_error(os.str()); } } else if ((dataType == CalpontSystemCatalog::VARBINARY || dataType == CalpontSystemCatalog::BLOB || dataType == CalpontSystemCatalog::TEXT) && colDefPtr->fType->fLength <= 7) { ostringstream os; os << "varbinary and blob length may not be less than 8"; throw std::runtime_error(os.str()); } unsigned int i = 0; column_iterator = columns.begin(); while (column_iterator != columns.end()) { column = *column_iterator; boost::to_lower(column.tableColName.column); if (SCHEMA_COL == column.tableColName.column) { colTuple.data = qualifiedName.fSchema; tmpStr.assign(qualifiedName.fSchema); } else if (TABLENAME_COL == column.tableColName.column) { colTuple.data = qualifiedName.fName; tmpStr.assign(qualifiedName.fName); } else if (COLNAME_COL == column.tableColName.column) { boost::to_lower(colDefPtr->fName); colTuple.data = colDefPtr->fName; tmpStr.assign(colDefPtr->fName); } else if (OBJECTID_COL == column.tableColName.column) { if (alterFlag) colTuple.data = coloids[col]; else colTuple.data = coloids[col]; } else if (DATATYPE_COL == column.tableColName.column) { colTuple.data = dataType; } else if (COLUMNLEN_COL == column.tableColName.column) { //@Bug 2089 Disallow zero length char and varch column to be created if (dataType == CalpontSystemCatalog::CHAR || dataType == CalpontSystemCatalog::VARCHAR || dataType == CalpontSystemCatalog::VARBINARY || dataType == CalpontSystemCatalog::BLOB || dataType == CalpontSystemCatalog::TEXT) { if (colDefPtr->fType->fLength <= 0) { ostringstream os; os << "char, varchar and varbinary length must be greater than zero"; throw std::runtime_error(os.str()); } } colTuple.data = colDefPtr->fType->fLength; } else if (COLUMNPOS_COL == column.tableColName.column) { colTuple.data = colpos; } else 
if (DEFAULTVAL_COL == column.tableColName.column) { if (colDefPtr->fDefaultValue && !colDefPtr->fDefaultValue->fNull) { tmpStr.assign(colDefPtr->fDefaultValue->fValue); } else { tmpStr.dropString(); } colTuple.data = tmpStr; } else if (NULLABLE_COL == column.tableColName.column) { int nullable = 1; ColumnConstraintList& colConstraints = colDefPtr->fConstraints; ColumnConstraintList::const_iterator constraint_iter = colConstraints.begin(); while (constraint_iter != colConstraints.end()) { ColumnConstraintDef* consDefPtr = *constraint_iter; if (consDefPtr->fConstraintType == ddlpackage::DDL_NOT_NULL) { nullable = 0; break; } ++constraint_iter; } colTuple.data = nullable; } else if (SCALE_COL == column.tableColName.column) { colTuple.data = colDefPtr->fType->fScale; } else if (PRECISION_COL == column.tableColName.column) { colTuple.data = colDefPtr->fType->fPrecision; } else if (DICTOID_COL == column.tableColName.column) { if (hasDict) { colTuple.data = dictOID.dictOID; } else { colTuple.data = column.colType.getNullValueForType(); } } else if (LISTOBJID_COL == column.tableColName.column) { colTuple.data = column.colType.getNullValueForType(); } else if (TREEOBJID_COL == column.tableColName.column) { colTuple.data = column.colType.getNullValueForType(); } else if (MINVAL_COL == column.tableColName.column) { tmpStr.dropString(); } else if (MAXVAL_COL == column.tableColName.column) { tmpStr.dropString(); } else if (COMPRESSIONTYPE_COL == column.tableColName.column) { colTuple.data = colDefPtr->fType->fCompressiontype; } else if (AUTOINC_COL == column.tableColName.column) { // cout << "autoincrement= " << colDefPtr->fType->fAutoincrement << endl; colTuple.data = colDefPtr->fType->fAutoincrement; } else if (NEXTVALUE_COL == column.tableColName.column) { colTuple.data = colDefPtr->fType->fNextvalue; } else if (CHARSETNUM_COL == column.tableColName.column) { colTuple.data = colDefPtr->fType->fCharsetNum; } else { colTuple.data = column.colType.getNullValueForType(); } 
colStruct.dataOid = column.oid; colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth; colStruct.tokenFlag = false; colStruct.tokenFlag = column.colType.colWidth > 8 ? true : false; colStruct.colDataType = column.colType.colDataType; colStruct.fColDbRoot = dbroot; dctnryStruct.fColDbRoot = dbroot; if (idbdatafile::IDBPolicy::useHdfs()) { colStruct.fCompressionType = 2; dctnryStruct.fCompressionType = 2; } if (colStruct.tokenFlag) // TODO: XXX: this is copied aplenty. NEED TO REFACTOR. { dctnryStruct.dctnryOid = column.colType.ddn.dictOID; dctnryStruct.fCharsetNumber = column.colType.charsetNumber; dctnryStruct.columnOid = column.oid; } else { dctnryStruct.dctnryOid = 0; dctnryStruct.columnOid = column.oid; } oids[colStruct.dataOid] = colStruct.dataOid; // oidsToFlush.push_back(colStruct.dataOid); if (dctnryStruct.dctnryOid > 0) { oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid; // oidsToFlush.push_back(dctnryStruct.dctnryOid); } if (colpos == startPos) { colStructs.push_back(colStruct); dctnryStructList.push_back(dctnryStruct); cscColTypeList.push_back(column.colType); } colList[i].push_back(colTuple); // colList.push_back(WriteEngine::ColTupleList()); // colList.back().push_back(colTuple); dctColList[i].push_back(tmpStr); // dctColList.push_back(WriteEngine::dictStr()); // dctColList.back().push_back(tmpStr); ++i; ++column_iterator; } ++colpos; col++; ++iter; } fWEWrapper.setTransId(txnID); fWEWrapper.setIsInsert(true); fWEWrapper.setBulkFlag(false); fWEWrapper.startTransaction(txnID); if (0 != colStructs.size()) { for (unsigned int n = 0; n < numCols; n++) { colValuesList.push_back(colList[n]); dctnryValueList.push_back(dctColList[n]); } // fWEWrapper.setDebugLevel(WriteEngine::DEBUG_3); error = fWEWrapper.insertColumnRec_SYS(txnID, cscColTypeList, colStructs, colValuesList, dctnryStructList, dctnryValueList, SYSCOLUMN_BASE); if (idbdatafile::IDBPolicy::useHdfs()) { rc1 = fWEWrapper.flushDataFiles(error, txnID, oids); if ((error == 
0) && (rc1 == 0)) { rc1 = fWEWrapper.confirmTransaction(txnID); if (rc1 == NO_ERROR) rc1 = fWEWrapper.endTransaction(txnID, true); else fWEWrapper.endTransaction(txnID, false); } else { fWEWrapper.endTransaction(txnID, false); } } if (error != WriteEngine::NO_ERROR) { if (error == ERR_BRM_WR_VB_ENTRY) { throw std::runtime_error("writeSysColumnMetaData WE: Error writing to BRM."); } else { WErrorCodes ec; throw std::runtime_error("WE: Error updating calpontsys.syscolumn. " + ec.errorString(error)); } } else error = rc1; } } catch (exception& ex) { err += ex.what(); rc = 1; } catch (...) { err += "Unknown exception caught"; rc = 1; } purgeFDCache(); // if (idbdatafile::IDBPolicy::useHdfs()) // cacheutils::flushOIDsFromCache(oidsToFlush); fWEWrapper.setIsInsert(false); fWEWrapper.setBulkFlag(false); return rc; } uint8_t WE_DDLCommandProc::writeSyscolumn(ByteStream& bs, std::string& err) { int rc = 0; uint32_t sessionID, tmp32, coloid, dictoid; int txnID, startPos; string schema, tablename; uint8_t tmp8; bool isAlter = false; bs >> sessionID; bs >> tmp32; txnID = tmp32; bs >> schema; bs >> tablename; bs >> coloid; bs >> dictoid; bs >> tmp8; // alterFlag bs >> tmp32; startPos = tmp32; isAlter = (tmp8 != 0); boost::scoped_ptr colDefPtr(new ddlpackage::ColumnDef()); colDefPtr->unserialize(bs); WriteEngine::ColStruct colStruct; WriteEngine::ColTuple colTuple; WriteEngine::ColStructList colStructs; WriteEngine::CSCTypesList cscColTypeList; WriteEngine::ColTupleList colTuples; WriteEngine::DctColTupleList dctColTuples; WriteEngine::ColValueList colValuesList; WriteEngine::RIDList ridList; WriteEngine::DctnryStruct dctnryStruct; WriteEngine::dictStr dctnryTuple; WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DictStrList dctnryValueList; CalpontSystemCatalog::TableName tableName; ColumnList columns; ColumnList::const_iterator column_iterator; DDLColumn column; int error = 0; tableName.schema = CALPONT_SCHEMA; tableName.table = SYSCOLUMN_TABLE; 
getColumnsForTable(sessionID, tableName.schema, tableName.table, columns); unsigned int numCols = columns.size(); // WriteEngine::ColTupleList colList[numCols]; // ColTupleList is NOT POD, so let's try this: std::vector colList; // WriteEngine::dictStr dctColList[numCols]; std::vector dctColList; std::map oids; std::vector oidsToFlush; // colpos = 0; for (unsigned int ii = 0; ii < numCols; ii++) { colList.push_back(WriteEngine::ColTupleList()); dctColList.push_back(WriteEngine::dictStr()); } try { DictOID dictOID = {0, 0, 0, 0, 0}; int dataType = convertDataType(colDefPtr->fType->fType); if (dataType == CalpontSystemCatalog::DECIMAL || dataType == CalpontSystemCatalog::UDECIMAL) { if (colDefPtr->fType->fPrecision > 38) //@Bug 5717 precision cannot be over 38. { ostringstream os; os << "Syntax error: The maximum precision (total number of digits) that can be specified is 38"; throw std::runtime_error(os.str()); } else if (colDefPtr->fType->fPrecision < colDefPtr->fType->fScale) { ostringstream os; os << "Syntax error: scale should be less than precision, precision: " << colDefPtr->fType->fPrecision << " scale: " << colDefPtr->fType->fScale; throw std::runtime_error(os.str()); } colDefPtr->convertDecimal(); } if (dictoid > 0) { dictOID.compressionType = colDefPtr->fType->fCompressiontype; dictOID.colWidth = colDefPtr->fType->fLength; dictOID.dictOID = dictoid; //@Bug 2534. Take away the limit of 255 and set the limit to 8000. 
if ((colDefPtr->fType->fLength > 8000) && (dataType != CalpontSystemCatalog::BLOB) && (dataType != CalpontSystemCatalog::TEXT)) { ostringstream os; os << "char, varchar and varbinary length may not exceed 8000 bytes"; throw std::runtime_error(os.str()); } } else if ((dataType == CalpontSystemCatalog::VARBINARY || dataType == CalpontSystemCatalog::BLOB || dataType == CalpontSystemCatalog::TEXT) && colDefPtr->fType->fLength <= 7) { ostringstream os; os << "varbinary and blob length may not be less than 8"; throw std::runtime_error(os.str()); } unsigned int i = 0; uint16_t dbRoot; BRM::OID_t sysOid = 1021; // Find out where syscolumn is rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot); column_iterator = columns.begin(); while (column_iterator != columns.end()) { NullString tmpStr; column = *column_iterator; boost::to_lower(column.tableColName.column); if (SCHEMA_COL == column.tableColName.column) { colTuple.data = schema; tmpStr.assign(schema); } else if (TABLENAME_COL == column.tableColName.column) { colTuple.data = tablename; tmpStr.assign(tablename); } else if (COLNAME_COL == column.tableColName.column) { boost::to_lower(colDefPtr->fName); colTuple.data = colDefPtr->fName; tmpStr.assign(colDefPtr->fName); } else if (OBJECTID_COL == column.tableColName.column) { colTuple.data = coloid; } else if (DATATYPE_COL == column.tableColName.column) { colTuple.data = dataType; } else if (COLUMNLEN_COL == column.tableColName.column) { //@Bug 2089 Disallow zero length char and varch column to be created if (dataType == CalpontSystemCatalog::CHAR || dataType == CalpontSystemCatalog::VARCHAR || dataType == CalpontSystemCatalog::VARBINARY || dataType == CalpontSystemCatalog::BLOB || dataType == CalpontSystemCatalog::TEXT) { if (colDefPtr->fType->fLength <= 0) { ostringstream os; os << "char, varchar and varbinary length must be greater than zero"; throw std::runtime_error(os.str()); } } colTuple.data = colDefPtr->fType->fLength; } else if (COLUMNPOS_COL == column.tableColName.column) { 
colTuple.data = startPos; } else if (DEFAULTVAL_COL == column.tableColName.column) { if (colDefPtr->fDefaultValue && !colDefPtr->fDefaultValue->fNull) { tmpStr.assign(colDefPtr->fDefaultValue->fValue); } else { tmpStr.dropString(); // colTuple.data = column.colType.getNullValueForType(); } colTuple.data = tmpStr; } else if (NULLABLE_COL == column.tableColName.column) { int nullable = 1; ColumnConstraintList& colConstraints = colDefPtr->fConstraints; ColumnConstraintList::const_iterator constraint_iter = colConstraints.begin(); while (constraint_iter != colConstraints.end()) { ColumnConstraintDef* consDefPtr = *constraint_iter; if (consDefPtr->fConstraintType == ddlpackage::DDL_NOT_NULL) { nullable = 0; break; } ++constraint_iter; } colTuple.data = nullable; } else if (SCALE_COL == column.tableColName.column) { colTuple.data = colDefPtr->fType->fScale; } else if (PRECISION_COL == column.tableColName.column) { colTuple.data = colDefPtr->fType->fPrecision; } else if (DICTOID_COL == column.tableColName.column) { if (dictoid > 0) { colTuple.data = dictOID.dictOID; } else { colTuple.data = column.colType.getNullValueForType(); } } else if (LISTOBJID_COL == column.tableColName.column) { colTuple.data = column.colType.getNullValueForType(); } else if (TREEOBJID_COL == column.tableColName.column) { colTuple.data = column.colType.getNullValueForType(); } else if (MINVAL_COL == column.tableColName.column) { tmpStr.dropString(); } else if (MAXVAL_COL == column.tableColName.column) { tmpStr.dropString(); } else if (COMPRESSIONTYPE_COL == column.tableColName.column) { colTuple.data = colDefPtr->fType->fCompressiontype; } else if (AUTOINC_COL == column.tableColName.column) { // cout << "autoincrement= " << colDefPtr->fType->fAutoincrement << endl; colTuple.data = colDefPtr->fType->fAutoincrement; } else if (NEXTVALUE_COL == column.tableColName.column) { colTuple.data = colDefPtr->fType->fNextvalue; } else if (CHARSETNUM_COL == column.tableColName.column) { colTuple.data = 
colDefPtr->fType->fCharsetNum; } else { colTuple.data = column.colType.getNullValueForType(); } colStruct.dataOid = column.oid; oids[column.oid] = column.oid; oidsToFlush.push_back(column.oid); colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth; colStruct.tokenFlag = false; colStruct.fColDbRoot = dbRoot; colStruct.tokenFlag = column.colType.colWidth > 8 ? true : false; colStruct.colDataType = column.colType.colDataType; dctnryStruct.fColDbRoot = dbRoot; if (idbdatafile::IDBPolicy::useHdfs()) { colStruct.fCompressionType = 2; dctnryStruct.fCompressionType = 2; } if (colStruct.tokenFlag) { dctnryStruct.dctnryOid = column.colType.ddn.dictOID; dctnryStruct.fCharsetNumber = column.colType.charsetNumber; dctnryStruct.columnOid = column.oid; } else { dctnryStruct.dctnryOid = 0; dctnryStruct.columnOid = column.oid; } if (dctnryStruct.dctnryOid > 0) { oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid; oidsToFlush.push_back(dctnryStruct.dctnryOid); } colStructs.push_back(colStruct); dctnryStructList.push_back(dctnryStruct); cscColTypeList.push_back(column.colType); colList[i].push_back(colTuple); // colList.push_back(WriteEngine::ColTupleList()); // colList.back().push_back(colTuple); dctColList[i].push_back(tmpStr); // dctColList.push_back(WriteEngine::dictStr()); // dctColList.back().push_back(tmpStr); ++i; ++column_iterator; } if (0 != colStructs.size()) { // FIXME: Is there a cleaner way to do this? Isn't colValuesList the same as colList after this? 
for (unsigned int n = 0; n < numCols; n++) { colValuesList.push_back(colList[n]); dctnryValueList.push_back(dctColList[n]); } // fWEWrapper.setDebugLevel(WriteEngine::DEBUG_3); fWEWrapper.setTransId(txnID); fWEWrapper.setIsInsert(true); fWEWrapper.setBulkFlag(false); fWEWrapper.startTransaction(txnID); int rc1 = 0; error = fWEWrapper.insertColumnRec_SYS(txnID, cscColTypeList, colStructs, colValuesList, dctnryStructList, dctnryValueList, SYSCOLUMN_BASE); if (idbdatafile::IDBPolicy::useHdfs()) { rc1 = fWEWrapper.flushDataFiles(error, txnID, oids); if ((error == 0) && (rc1 == 0)) { rc1 = fWEWrapper.confirmTransaction(txnID); if (rc1 == NO_ERROR) rc1 = fWEWrapper.endTransaction(txnID, true); else fWEWrapper.endTransaction(txnID, false); } else { fWEWrapper.endTransaction(txnID, false); } } if (error != WriteEngine::NO_ERROR) { if (error == ERR_BRM_WR_VB_ENTRY) { throw std::runtime_error("writeSysColumnMetaData WE: Error writing to BRM."); } else { WErrorCodes ec; throw std::runtime_error("WE: Error updating calpontsys.syscolumn. " + ec.errorString(error)); } } else error = rc1; } } catch (exception& ex) { err += ex.what(); rc = 1; } catch (...) 
{ err += "Unknown exception caught"; rc = 1; } purgeFDCache(); if (isAlter) { if (idbdatafile::IDBPolicy::useHdfs()) cacheutils::flushOIDsFromCache(oidsToFlush); } return rc; } uint8_t WE_DDLCommandProc::createtablefiles(ByteStream& bs, std::string& err) { int rc = 0; uint32_t size, i; uint16_t tmp16; uint32_t tmp32; uint8_t tmp8; OID dataOid; int colWidth; bool tokenFlag; int txnID; CalpontSystemCatalog::ColDataType colDataType; uint16_t colDbRoot; int compressionType; bs >> tmp32; txnID = tmp32; bs >> size; fWEWrapper.setTransId(txnID); fWEWrapper.setIsInsert(true); fWEWrapper.setBulkFlag(true); std::map oids; for (i = 0; i < size; ++i) { bs >> tmp32; dataOid = tmp32; bs >> tmp8; colDataType = (CalpontSystemCatalog::ColDataType)tmp8; bs >> tmp8; tokenFlag = (tmp8 != 0); bs >> tmp32; colWidth = tmp32; bs >> tmp16; colDbRoot = tmp16; bs >> tmp32; compressionType = tmp32; oids[dataOid] = dataOid; if (tokenFlag) { rc = fWEWrapper.createDctnry(0, dataOid, colWidth, colDbRoot, 0, 0, compressionType); } else { rc = fWEWrapper.createColumn(0, dataOid, colDataType, colWidth, colDbRoot, 0, compressionType); } if (rc != 0) break; } // cout << "creating column file got error code " << rc << endl; if (rc != 0) { WErrorCodes ec; ostringstream oss; oss << "WE: Error creating column file for oid " << dataOid << "; " << ec.errorString(rc) << endl; err = oss.str(); } // if (idbdatafile::IDBPolicy::useHdfs()) fWEWrapper.flushDataFiles(rc, txnID, oids); purgeFDCache(); return rc; } uint8_t WE_DDLCommandProc::commitVersion(ByteStream& bs, std::string& err) { int rc = 0; uint32_t tmp32; int txnID; bs >> tmp32; txnID = tmp32; rc = fWEWrapper.commit(txnID); if (rc != 0) { WErrorCodes ec; ostringstream oss; oss << "WE: Error commiting transaction; " << txnID << ec.errorString(rc) << endl; err = oss.str(); } return rc; } uint8_t WE_DDLCommandProc::rollbackBlocks(ByteStream& bs, std::string& err) { int rc = 0; uint32_t sessionID, tmp32; ; int txnID; bs >> sessionID; bs >> tmp32; txnID = 
tmp32; fWEWrapper.setTransId(txnID); fWEWrapper.setIsInsert(true); fWEWrapper.setBulkFlag(true); rc = fWEWrapper.rollbackBlocks(txnID, sessionID); if (rc != 0) { WErrorCodes ec; ostringstream oss; oss << "WE: Error rolling back files " << txnID << " for session " << sessionID << "; " << ec.errorString(rc) << endl; err = oss.str(); } std::map oids; if (idbdatafile::IDBPolicy::useHdfs()) fWEWrapper.flushDataFiles(rc, txnID, oids); purgeFDCache(); fWEWrapper.setIsInsert(false); fWEWrapper.setBulkFlag(false); return rc; } uint8_t WE_DDLCommandProc::rollbackVersion(ByteStream& bs, std::string& err) { int rc = 0; uint32_t sessionID, tmp32; int txnID; bs >> sessionID; bs >> tmp32; txnID = tmp32; rc = fWEWrapper.rollbackVersion(txnID, sessionID); if (rc != 0) { WErrorCodes ec; ostringstream oss; oss << "WE: Error rolling back transaction " << txnID << " for session " << sessionID << "; " << ec.errorString(rc) << endl; err = oss.str(); } purgeFDCache(); return rc; } uint8_t WE_DDLCommandProc::deleteSyscolumn(ByteStream& bs, std::string& err) { int rc = 0; uint32_t sessionID, tmp32; int txnID; string schema, tablename; bs >> sessionID; bs >> tmp32; txnID = tmp32; bs >> schema; bs >> tablename; ddlpackage::QualifiedName sysCatalogTableName; sysCatalogTableName.fSchema = CALPONT_SCHEMA; sysCatalogTableName.fName = SYSCOLUMN_TABLE; CalpontSystemCatalog::TableName userTableName; userTableName.schema = schema; userTableName.table = tablename; boost::shared_ptr systemCatalogPtr; systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); uint16_t dbRoot; BRM::OID_t sysOid = 1021; // Find out where syscolumn is rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot); fWEWrapper.setTransId(txnID); fWEWrapper.startTransaction(txnID); fWEWrapper.setIsInsert(false); fWEWrapper.setBulkFlag(false); std::map oids; // std::vector oidsToFlush; try { CalpontSystemCatalog::RIDList colRidList = 
systemCatalogPtr->columnRIDs(userTableName); WriteEngine::ColStruct colStruct; WriteEngine::ColStructList colStructs; WriteEngine::CSCTypesList cscColTypeList; std::vector colExtentsStruct; std::vector colExtentsColType; std::vector colValuesList; WriteEngine::RIDList ridList; std::vector ridLists; DDLColumn column; CalpontSystemCatalog::RIDList::const_iterator colrid_iterator = colRidList.begin(); while (colrid_iterator != colRidList.end()) { WriteEngine::RID rid = (*colrid_iterator).rid; ridList.push_back(rid); ++colrid_iterator; } ColumnList columns; getColumnsForTable(sessionID, sysCatalogTableName.fSchema, sysCatalogTableName.fName, columns); ColumnList::const_iterator column_iterator = columns.begin(); while (column_iterator != columns.end()) { column = *column_iterator; colStruct.dataOid = column.oid; colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth; colStruct.colDataType = column.colType.colDataType; colStruct.fColDbRoot = dbRoot; if (idbdatafile::IDBPolicy::useHdfs()) { colStruct.fCompressionType = 2; } oids[colStruct.dataOid] = colStruct.dataOid; // oidsToFlush.push_back(colStruct.dataOid); colStructs.push_back(colStruct); cscColTypeList.push_back(column.colType); ++column_iterator; } colExtentsStruct.push_back(colStructs); colExtentsColType.push_back(cscColTypeList); ridLists.push_back(ridList); if (0 != colStructs.size() && 0 != ridLists[0].size()) { int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists, SYSCOLUMN_BASE); int rc1 = 0; if (idbdatafile::IDBPolicy::useHdfs()) { rc1 = fWEWrapper.flushDataFiles(error, txnID, oids); if ((error == 0) && (rc1 == 0)) { rc1 = fWEWrapper.confirmTransaction(txnID); if (rc1 == NO_ERROR) rc1 = fWEWrapper.endTransaction(txnID, true); else fWEWrapper.endTransaction(txnID, false); } else { fWEWrapper.endTransaction(txnID, false); } } if (error == NO_ERROR) rc = rc1; else rc = error; } } catch (exception& ex) { err = ex.what(); rc = 1; } catch 
(...) { err = "Unknown exception caught"; rc = 1; } systemCatalogPtr->flushCache(); purgeFDCache(); // if (idbdatafile::IDBPolicy::useHdfs()) // cacheutils::flushOIDsFromCache(oidsToFlush); return rc; } uint8_t WE_DDLCommandProc::deleteSyscolumnRow(ByteStream& bs, std::string& err) { int rc = 0; uint32_t sessionID, tmp32; ; int txnID; string schema, tablename, columnname; bs >> sessionID; bs >> tmp32; txnID = tmp32; bs >> schema; bs >> tablename; bs >> columnname; ddlpackage::QualifiedName sysCatalogTableName; sysCatalogTableName.fSchema = CALPONT_SCHEMA; sysCatalogTableName.fName = SYSCOLUMN_TABLE; CalpontSystemCatalog::TableColName tableColName; tableColName.schema = schema; tableColName.table = tablename; tableColName.column = columnname; boost::shared_ptr systemCatalogPtr; systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); uint16_t dbRoot; BRM::OID_t sysOid = 1021; // Find out where syscolumn is rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot); fWEWrapper.setTransId(txnID); fWEWrapper.setIsInsert(false); fWEWrapper.setBulkFlag(false); fWEWrapper.startTransaction(txnID); std::map oids; // std::vector oidsToFlush; try { CalpontSystemCatalog::ROPair colRO = systemCatalogPtr->columnRID(tableColName); if (colRO.objnum < 0) { err = "Column not found:" + tableColName.table + "." 
+ tableColName.column; throw std::runtime_error(err); } WriteEngine::ColStruct colStruct; WriteEngine::ColStructList colStructs; WriteEngine::CSCTypesList cscColTypeList; std::vector colExtentsStruct; std::vector colExtentsColType; std::vector colValuesList; WriteEngine::RIDList ridList; std::vector ridLists; DDLColumn column; ridList.push_back(colRO.rid); ColumnList columns; getColumnsForTable(sessionID, sysCatalogTableName.fSchema, sysCatalogTableName.fName, columns); ColumnList::const_iterator column_iterator = columns.begin(); while (column_iterator != columns.end()) { column = *column_iterator; colStruct.dataOid = column.oid; colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth; colStruct.colDataType = column.colType.colDataType; colStruct.fColDbRoot = dbRoot; if (idbdatafile::IDBPolicy::useHdfs()) { colStruct.fCompressionType = 2; } oids[colStruct.dataOid] = colStruct.dataOid; // oidsToFlush.push_back(colStruct.dataOid); colStructs.push_back(colStruct); cscColTypeList.push_back(column.colType); ++column_iterator; } colExtentsStruct.push_back(colStructs); colExtentsColType.push_back(cscColTypeList); ridLists.push_back(ridList); if (0 != colStructs.size() && 0 != ridLists[0].size()) { int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists, SYSCOLUMN_BASE); int rc1 = 0; if (idbdatafile::IDBPolicy::useHdfs()) { rc1 = fWEWrapper.flushDataFiles(error, txnID, oids); if ((error == 0) && (rc1 == 0)) { rc1 = fWEWrapper.confirmTransaction(txnID); if (rc1 == NO_ERROR) rc1 = fWEWrapper.endTransaction(txnID, true); else fWEWrapper.endTransaction(txnID, false); } else { fWEWrapper.endTransaction(txnID, false); } } if (error == NO_ERROR) rc = rc1; else rc = error; } } catch (exception& ex) { err = ex.what(); rc = 1; } catch (...) 
{ err = "Unknown exception caught"; rc = 1; } systemCatalogPtr->flushCache(); purgeFDCache(); // if (idbdatafile::IDBPolicy::useHdfs()) // cacheutils::flushOIDsFromCache(oidsToFlush); return rc; } uint8_t WE_DDLCommandProc::deleteSystable(ByteStream& bs, std::string& err) { int rc = 0; uint32_t sessionID, tmp32; ; int txnID; string schema, tablename; bs >> sessionID; bs >> tmp32; txnID = tmp32; bs >> schema; bs >> tablename; WriteEngine::WriteEngineWrapper writeEngine; ddlpackage::QualifiedName sysCatalogTableName; sysCatalogTableName.fSchema = CALPONT_SCHEMA; sysCatalogTableName.fName = SYSTABLE_TABLE; CalpontSystemCatalog::TableName userTableName; userTableName.schema = schema; userTableName.table = tablename; boost::shared_ptr systemCatalogPtr; systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); uint16_t dbRoot; BRM::OID_t sysOid = 1001; // Find out where systcolumn is rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot); fWEWrapper.setTransId(txnID); fWEWrapper.startTransaction(txnID); fWEWrapper.setIsInsert(false); fWEWrapper.setBulkFlag(false); std::map oids; // std::vector oidsToFlush; try { CalpontSystemCatalog::ROPair userTableROPair = systemCatalogPtr->tableRID(userTableName); if (userTableROPair.rid == std::numeric_limits::max()) { err = "RowID is not valid "; throw std::runtime_error(err); } WriteEngine::ColStruct colStruct; WriteEngine::ColStructList colStructs; WriteEngine::CSCTypesList cscColTypeList; std::vector colExtentsStruct; std::vector colExtentsColType; std::vector colValuesList; WriteEngine::RIDList ridList; std::vector ridLists; DDLColumn column; ridList.push_back(userTableROPair.rid); ColumnList columns; getColumnsForTable(sessionID, sysCatalogTableName.fSchema, sysCatalogTableName.fName, columns); ColumnList::const_iterator column_iterator = columns.begin(); while (column_iterator != columns.end()) { column = *column_iterator; colStruct.dataOid = column.oid; 
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth; colStruct.colDataType = column.colType.colDataType; colStruct.fColDbRoot = dbRoot; if (idbdatafile::IDBPolicy::useHdfs()) { colStruct.fCompressionType = 2; } oids[colStruct.dataOid] = colStruct.dataOid; // oidsToFlush.push_back(colStruct.dataOid); colStructs.push_back(colStruct); cscColTypeList.push_back(column.colType); ++column_iterator; } colExtentsStruct.push_back(colStructs); colExtentsColType.push_back(cscColTypeList); ridLists.push_back(ridList); if (0 != colStructs.size() && 0 != ridLists[0].size()) { int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists, SYSCOLUMN_BASE); int rc1 = 0; if (idbdatafile::IDBPolicy::useHdfs()) { rc1 = fWEWrapper.flushDataFiles(error, txnID, oids); if ((error == 0) && (rc1 == 0)) { rc1 = fWEWrapper.confirmTransaction(txnID); if (rc1 == NO_ERROR) rc1 = fWEWrapper.endTransaction(txnID, true); else fWEWrapper.endTransaction(txnID, false); } else { fWEWrapper.endTransaction(txnID, false); } } if (error == NO_ERROR) rc = rc1; else rc = error; } } catch (exception& ex) { err = ex.what(); rc = 1; } catch (...) 
{ err = "Unknown exception caught"; rc = 1; } systemCatalogPtr->flushCache(); purgeFDCache(); // if (idbdatafile::IDBPolicy::useHdfs()) // cacheutils::flushOIDsFromCache(oidsToFlush); return rc; } uint8_t WE_DDLCommandProc::deleteSystables(ByteStream& bs, std::string& err) { int rc = 0; uint32_t sessionID, tmp32; ; int txnID; string schema, tablename; bs >> sessionID; bs >> tmp32; txnID = tmp32; bs >> schema; bs >> tablename; WriteEngine::WriteEngineWrapper writeEngine; ddlpackage::QualifiedName sysCatalogTableName; sysCatalogTableName.fSchema = CALPONT_SCHEMA; sysCatalogTableName.fName = SYSTABLE_TABLE; CalpontSystemCatalog::TableName userTableName; userTableName.schema = schema; userTableName.table = tablename; boost::shared_ptr systemCatalogPtr; systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); WriteEngine::ColStruct colStruct; WriteEngine::ColStructList colStructs; WriteEngine::CSCTypesList cscColTypeList; std::vector colExtentsStruct; std::vector colExtentsColType; std::vector colValuesList; WriteEngine::RIDList ridList; std::vector ridLists; DDLColumn column; uint16_t dbRoot; BRM::OID_t sysOid = 1003; // Find out where systable is rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot); fWEWrapper.setTransId(txnID); fWEWrapper.setIsInsert(false); fWEWrapper.setBulkFlag(false); fWEWrapper.startTransaction(txnID); std::map oids; // std::vector oidsToFlush; try { CalpontSystemCatalog::ROPair userTableROPair = systemCatalogPtr->tableRID(userTableName); if (userTableROPair.rid == std::numeric_limits::max()) { err = "RowID is not valid "; throw std::runtime_error(err); } ridList.push_back(userTableROPair.rid); ColumnList columns; getColumnsForTable(sessionID, sysCatalogTableName.fSchema, sysCatalogTableName.fName, columns); ColumnList::const_iterator column_iterator = columns.begin(); while (column_iterator != columns.end()) { column = *column_iterator; colStruct.dataOid = column.oid; 
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth; colStruct.colDataType = column.colType.colDataType; colStruct.fColDbRoot = dbRoot; if (idbdatafile::IDBPolicy::useHdfs()) { colStruct.fCompressionType = 2; } oids[colStruct.dataOid] = colStruct.dataOid; // oidsToFlush.push_back(colStruct.dataOid); colStructs.push_back(colStruct); cscColTypeList.push_back(column.colType); ++column_iterator; } colExtentsStruct.push_back(colStructs); colExtentsColType.push_back(cscColTypeList); ridLists.push_back(ridList); { int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists, SYSCOLUMN_BASE); int rc1 = 0; if (idbdatafile::IDBPolicy::useHdfs()) { rc1 = fWEWrapper.flushDataFiles(error, txnID, oids); if ((error == 0) && (rc1 == 0)) { rc1 = fWEWrapper.confirmTransaction(txnID); if (rc1 == NO_ERROR) rc1 = fWEWrapper.endTransaction(txnID, true); else fWEWrapper.endTransaction(txnID, false); } else { fWEWrapper.endTransaction(txnID, false); } } if (error == NO_ERROR) rc = rc1; else rc = error; } } catch (exception& ex) { err = ex.what(); rc = 1; } catch (...) 
{ err = "Unknown exception caught"; rc = 1; } if (rc != 0) return rc; // deleting from SYSCOLUMN sysCatalogTableName.fSchema = CALPONT_SCHEMA; sysCatalogTableName.fName = SYSCOLUMN_TABLE; sysOid = 1021; // Find out where syscolumn is rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot); try { CalpontSystemCatalog::RIDList colRidList = systemCatalogPtr->columnRIDs(userTableName); colStructs.clear(); cscColTypeList.clear(); colExtentsStruct.clear(); colExtentsColType.clear(); colValuesList.clear(); ridList.clear(); ridLists.clear(); oids.clear(); DDLColumn column; CalpontSystemCatalog::RIDList::const_iterator colrid_iterator = colRidList.begin(); while (colrid_iterator != colRidList.end()) { WriteEngine::RID rid = (*colrid_iterator).rid; ridList.push_back(rid); ++colrid_iterator; } ColumnList columns; getColumnsForTable(sessionID, sysCatalogTableName.fSchema, sysCatalogTableName.fName, columns); ColumnList::const_iterator column_iterator = columns.begin(); while (column_iterator != columns.end()) { column = *column_iterator; colStruct.dataOid = column.oid; colStruct.colWidth = column.colType.colWidth > 8 ? 
8 : column.colType.colWidth; colStruct.colDataType = column.colType.colDataType; colStruct.fColDbRoot = dbRoot; if (idbdatafile::IDBPolicy::useHdfs()) { colStruct.fCompressionType = 2; } colStructs.push_back(colStruct); oids[colStruct.dataOid] = colStruct.dataOid; cscColTypeList.push_back(column.colType); // oidsToFlush.push_back(colStruct.dataOid); ++column_iterator; } colExtentsStruct.push_back(colStructs); colExtentsColType.push_back(cscColTypeList); ridLists.push_back(ridList); if (0 != colStructs.size() && 0 != ridLists[0].size()) { int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists, SYSCOLUMN_BASE); int rc1 = 0; if (idbdatafile::IDBPolicy::useHdfs()) { rc1 = fWEWrapper.flushDataFiles(error, txnID, oids); if ((error == 0) && (rc1 == 0)) { rc1 = fWEWrapper.confirmTransaction(txnID); if (rc1 == NO_ERROR) rc1 = fWEWrapper.endTransaction(txnID, true); else fWEWrapper.endTransaction(txnID, false); } else { fWEWrapper.endTransaction(txnID, false); } } if (error == NO_ERROR) rc = rc1; else rc = error; } } catch (exception& ex) { err = ex.what(); rc = 1; } catch (...) { err = "Unknown exception caught"; rc = 1; } systemCatalogPtr->flushCache(); purgeFDCache(); // if (idbdatafile::IDBPolicy::useHdfs()) // cacheutils::flushOIDsFromCache(oidsToFlush); return rc; } uint8_t WE_DDLCommandProc::dropFiles(ByteStream& bs, std::string& err) { int rc = 0; uint32_t size, i; uint32_t tmp32; std::vector dataOids; bs >> size; for (i = 0; i < size; ++i) { bs >> tmp32; dataOids.push_back(tmp32); } try { rc = fWEWrapper.dropFiles(0, dataOids); } catch (...) 
{ err = "WE: Error removing files "; rc = 1; } purgeFDCache(); return rc; } uint8_t WE_DDLCommandProc::updateSyscolumnAuto(ByteStream& bs, std::string& err) { int rc = 0; uint32_t sessionID, tmp32; std::string schema, tablename; int txnID; uint8_t tmp8; bool autoIncrement = false; bs >> sessionID; bs >> tmp32; txnID = tmp32; bs >> schema; bs >> tablename; bs >> tmp8; autoIncrement = true; CalpontSystemCatalog::TableName tableName; tableName.schema = schema; tableName.table = tablename; WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DctnryValueList dctnryValueList; WriteEngine::DctColTupleList dctRowList; WriteEngine::DctnryTuple dctColList; uint16_t dbRoot = 0; uint16_t segment; uint32_t partition; CalpontSystemCatalog::RIDList roList; boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); try { roList = systemCatalogPtr->columnRIDs(tableName); } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } // Build colStructs for SYSTABLE std::vector ridList; WriteEngine::ColValueList colValuesList; WriteEngine::ColTupleList aColList; WriteEngine::ColStructList colStructs; WriteEngine::CSCTypesList cscColTypeList; std::vector colOldValuesList; std::map oids; // std::vector oidsToFlush; tableName.schema = CALPONT_SCHEMA; tableName.table = SYSCOLUMN_TABLE; DDLColumn column; WriteEngine::ColTuple colTuple; findColumnData(sessionID, tableName, AUTOINC_COL, column); WriteEngine::ColStruct colStruct; WriteEngine::DctnryStruct dctnryStruct; colStruct.dataOid = column.oid; colStruct.colWidth = column.colType.colWidth > 8 ? 
8 : column.colType.colWidth; colStruct.tokenFlag = false; colStruct.colDataType = column.colType.colDataType; if (idbdatafile::IDBPolicy::useHdfs()) { colStruct.fCompressionType = 2; } string s1("y"), s2("n"); boost::any datavalue1 = s1; boost::any datavalue2 = s2; if (autoIncrement) colTuple.data = datavalue1; else colTuple.data = datavalue2; colStruct.colDataType = column.colType.colDataType; dctnryStruct.dctnryOid = 0; dctnryStruct.columnOid = colStruct.dataOid; colStructs.push_back(colStruct); oids[colStruct.dataOid] = colStruct.dataOid; // oidsToFlush.push_back(colStruct.dataOid); dctnryStructList.push_back(dctnryStruct); cscColTypeList.push_back(column.colType); for (unsigned int i = 0; i < roList.size(); i++) { aColList.push_back(colTuple); } colValuesList.push_back(aColList); std::vector colExtentsStruct; std::vector colExtentsColType; std::vector dctnryExtentsStruct; std::vector extentsinfo; extentInfo aExtentinfo; CalpontSystemCatalog::OID oid = 1021; fWEWrapper.setIsInsert(false); fWEWrapper.setBulkFlag(false); fWEWrapper.setTransId(txnID); for (unsigned int i = 0; i < roList.size(); i++) { convertRidToColumn(roList[i].rid, dbRoot, partition, segment, oid); aExtentinfo.dbRoot = dbRoot; aExtentinfo.partition = partition; aExtentinfo.segment = segment; if (extentsinfo.empty()) extentsinfo.push_back(aExtentinfo); else if (extentsinfo.back() != aExtentinfo) extentsinfo.push_back(aExtentinfo); ridList.push_back(roList[i].rid); } std::vector ridLists; ridLists.push_back(ridList); // build colExtentsStruct for (unsigned i = 0; i < extentsinfo.size(); i++) { for (unsigned j = 0; j < colStructs.size(); j++) { colStructs[j].fColPartition = extentsinfo[i].partition; colStructs[j].fColSegment = extentsinfo[i].segment; colStructs[j].fColDbRoot = extentsinfo[i].dbRoot; dctnryStructList[j].fColPartition = extentsinfo[i].partition; dctnryStructList[j].fColSegment = extentsinfo[i].segment; dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot; } 
colExtentsStruct.push_back(colStructs); dctnryExtentsStruct.push_back(dctnryStructList); colExtentsColType.push_back(cscColTypeList); } // call the write engine to update the row if (idbdatafile::IDBPolicy::useHdfs()) fWEWrapper.startTransaction(txnID); rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE); if (rc != NO_ERROR) { // build the logging message err = "WE: Update failed on: " + tableName.table; } int rc1 = 0; if (idbdatafile::IDBPolicy::useHdfs()) { rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids); if ((rc == 0) && (rc1 == 0)) { rc1 = fWEWrapper.confirmTransaction(txnID); if (rc1 == NO_ERROR) rc1 = fWEWrapper.endTransaction(txnID, true); else fWEWrapper.endTransaction(txnID, false); } else { fWEWrapper.endTransaction(txnID, false); } } systemCatalogPtr->flushCache(); purgeFDCache(); // if (idbdatafile::IDBPolicy::useHdfs()) // cacheutils::flushOIDsFromCache(oidsToFlush); return rc; } uint8_t WE_DDLCommandProc::updateSyscolumnNextvalCol(ByteStream& bs, std::string& err) { int rc = 0; uint32_t sessionID, tmp32; std::string schema, tablename; int txnID; uint8_t tmp8; bool autoIncrement = false; bs >> sessionID; bs >> tmp32; txnID = tmp32; bs >> schema; bs >> tablename; bs >> tmp8; autoIncrement = true; CalpontSystemCatalog::TableName tableName; tableName.schema = schema; tableName.table = tablename; WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DctnryValueList dctnryValueList; WriteEngine::DctColTupleList dctRowList; WriteEngine::DctnryTuple dctColList; uint16_t dbRoot = 0; uint16_t segment; uint32_t partition; CalpontSystemCatalog::RIDList roList; boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); try { roList = systemCatalogPtr->columnRIDs(tableName); } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } // 
Build colStructs for SYSTABLE tableName.schema = CALPONT_SCHEMA; tableName.table = SYSCOLUMN_TABLE; DDLColumn column; WriteEngine::ColStruct colStruct; WriteEngine::DctnryStruct dctnryStruct; WriteEngine::ColTuple colTuple; std::vector ridList; WriteEngine::ColValueList colValuesList; WriteEngine::ColTupleList aColList; WriteEngine::ColStructList colStructs; WriteEngine::CSCTypesList cscColTypeList; std::vector colOldValuesList; std::map oids; // std::vector oidsToFlush; boost::any datavalue; findColumnData(sessionID, tableName, AUTOINC_COL, column); colStruct.dataOid = column.oid; colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth; colStruct.tokenFlag = false; colStruct.colDataType = column.colType.colDataType; if (idbdatafile::IDBPolicy::useHdfs()) { colStruct.fCompressionType = 2; dctnryStruct.fCompressionType = 2; } string ystr("y"); string nstr("n"); if (autoIncrement) colTuple.data = ystr; else colTuple.data = nstr; colStruct.colDataType = column.colType.colDataType; dctnryStruct.dctnryOid = 0; dctnryStruct.columnOid = colStruct.dataOid; oids[colStruct.dataOid] = colStruct.dataOid; // oidsToFlush.push_back(colStruct.dataOid); colStructs.push_back(colStruct); dctnryStructList.push_back(dctnryStruct); cscColTypeList.push_back(column.colType); for (unsigned int i = 0; i < roList.size(); i++) { aColList.push_back(colTuple); } colValuesList.push_back(aColList); // get start dbroot for this PM. // int PMNum = Config::getLocalModuleID(); std::vector extentsinfo; extentInfo aExtentinfo; // oam.getDbroots(PMNum); // dbRoot will be the first dbroot on this pm. dbrootCnt will be how many dbroots on this PM. 
CalpontSystemCatalog::OID oid = 1021; for (unsigned int i = 0; i < roList.size(); i++) { convertRidToColumn(roList[i].rid, dbRoot, partition, segment, oid); aExtentinfo.dbRoot = dbRoot; aExtentinfo.partition = partition; aExtentinfo.segment = segment; if (extentsinfo.empty()) extentsinfo.push_back(aExtentinfo); else if (extentsinfo.back() != aExtentinfo) extentsinfo.push_back(aExtentinfo); ridList.push_back(roList[i].rid); } std::vector ridLists; std::vector colExtentsStruct; std::vector colExtentsColType; std::vector dctnryExtentsStruct; ridLists.push_back(ridList); // build colExtentsStruct for (unsigned i = 0; i < extentsinfo.size(); i++) { for (unsigned j = 0; j < colStructs.size(); j++) { colStructs[j].fColPartition = extentsinfo[i].partition; colStructs[j].fColSegment = extentsinfo[i].segment; colStructs[j].fColDbRoot = extentsinfo[i].dbRoot; dctnryStructList[j].fColPartition = extentsinfo[i].partition; dctnryStructList[j].fColSegment = extentsinfo[i].segment; dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot; } colExtentsStruct.push_back(colStructs); dctnryExtentsStruct.push_back(dctnryStructList); colExtentsColType.push_back(cscColTypeList); } // call the write engine to update the row fWEWrapper.setTransId(txnID); fWEWrapper.startTransaction(txnID); rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE); if (rc != NO_ERROR) { // build the logging message err = "WE: Update failed on: " + tableName.table; } if (idbdatafile::IDBPolicy::useHdfs()) { fWEWrapper.flushDataFiles(rc, txnID, oids); fWEWrapper.confirmTransaction(txnID); if (rc == 0) fWEWrapper.endTransaction(txnID, true); else fWEWrapper.endTransaction(txnID, false); } systemCatalogPtr->flushCache(); purgeFDCache(); fWEWrapper.setIsInsert(false); fWEWrapper.setBulkFlag(false); return rc; } uint8_t WE_DDLCommandProc::updateSystableEntryForSysColumn(int32_t sessionID, uint32_t 
txnID, const DDLColumn& column, const std::string& value, const std::string& oldValue, execplan::CalpontSystemCatalog::RIDList& roList, std::string& err) { int rc = 0; WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DctnryValueList dctnryValueList; WriteEngine::DctColTupleList dctRowList; WriteEngine::DctnryTuple dctColList; WriteEngine::ColTuple colTuple; std::map oids; // std::vector oidsToFlush; uint16_t dbRoot = 0; uint16_t segment; uint32_t partition; std::vector ridList; WriteEngine::ColValueList colValuesList; WriteEngine::ColTupleList aColList; WriteEngine::ColStructList colStructs; WriteEngine::CSCTypesList cscColTypeList; std::vector colOldValuesList; WriteEngine::ColStruct colStruct; WriteEngine::DctnryStruct dctnryStruct; WriteEngine::DctnryTuple dictTuple; dictTuple.isNull = false; colStruct.dataOid = column.oid; colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth; colStruct.tokenFlag = false; if ((column.colType.colDataType == CalpontSystemCatalog::CHAR && column.colType.colWidth > 8) || (column.colType.colDataType == CalpontSystemCatalog::VARCHAR && column.colType.colWidth > 7) || (column.colType.colDataType == CalpontSystemCatalog::VARBINARY && column.colType.colWidth > 7) || (column.colType.colDataType == CalpontSystemCatalog::BLOB && column.colType.colWidth > 7) || (column.colType.colDataType == CalpontSystemCatalog::TEXT && column.colType.colWidth > 7) || (column.colType.colDataType == CalpontSystemCatalog::DECIMAL && column.colType.precision > 18) || (column.colType.colDataType == CalpontSystemCatalog::UDECIMAL && column.colType.precision > 18)) // token { colStruct.colWidth = 8; colStruct.tokenFlag = true; } else { colStruct.colWidth = column.colType.colWidth; } colStruct.colDataType = column.colType.colDataType; if (idbdatafile::IDBPolicy::useHdfs()) { colStruct.fCompressionType = 2; dctnryStruct.fCompressionType = 2; } if (colStruct.tokenFlag) { dctnryStruct.dctnryOid = column.colType.ddn.dictOID; 
dctnryStruct.fCharsetNumber = column.colType.charsetNumber; dctnryStruct.columnOid = colStruct.dataOid; } else { dctnryStruct.dctnryOid = 0; dctnryStruct.columnOid = colStruct.dataOid; } oids[colStruct.dataOid] = colStruct.dataOid; // oidsToFlush.push_back(colStruct.dataOid); if (dctnryStruct.dctnryOid > 0) { oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid; // oidsToFlush.push_back(dctnryStruct.dctnryOid); } colStructs.push_back(colStruct); dctnryStructList.push_back(dctnryStruct); cscColTypeList.push_back(column.colType); for (unsigned int i = 0; i < roList.size(); i++) { aColList.push_back(colTuple); } colValuesList.push_back(aColList); // It's the same string for each column, so we just need one dictionary struct void* dictTuplePtr = static_cast(&dictTuple); memset(dictTuplePtr, 0, sizeof(dictTuple)); dictTuple.sigValue = (unsigned char*)value.c_str(); dictTuple.sigSize = value.length(); dictTuple.isNull = false; dctColList = dictTuple; dctRowList.push_back(dctColList); dctnryValueList.push_back(dctRowList); CalpontSystemCatalog::OID oid = 1021; std::vector extentsinfo; extentInfo aExtentinfo; std::vector colExtentsStruct; std::vector colExtentsColType; std::vector dctnryExtentsStruct; for (unsigned int i = 0; i < roList.size(); i++) { convertRidToColumn(roList[i].rid, dbRoot, partition, segment, oid); aExtentinfo.dbRoot = dbRoot; aExtentinfo.partition = partition; aExtentinfo.segment = segment; if (extentsinfo.empty()) extentsinfo.push_back(aExtentinfo); else if (extentsinfo.back() != aExtentinfo) extentsinfo.push_back(aExtentinfo); ridList.push_back(roList[i].rid); } std::vector ridLists; ridLists.push_back(ridList); // build colExtentsStruct for (unsigned i = 0; i < extentsinfo.size(); i++) { for (unsigned j = 0; j < colStructs.size(); j++) { colStructs[j].fColPartition = extentsinfo[i].partition; colStructs[j].fColSegment = extentsinfo[i].segment; colStructs[j].fColDbRoot = extentsinfo[i].dbRoot; dctnryStructList[j].fColPartition = 
extentsinfo[i].partition; dctnryStructList[j].fColSegment = extentsinfo[i].segment; dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot; } colExtentsStruct.push_back(colStructs); dctnryExtentsStruct.push_back(dctnryStructList); colExtentsColType.push_back(cscColTypeList); } // call the write engine to update the row fWEWrapper.setTransId(txnID); fWEWrapper.setIsInsert(false); fWEWrapper.setBulkFlag(false); fWEWrapper.startTransaction(txnID); rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE); if (rc != NO_ERROR) { // build the logging message err = "WE: Update failed on: " + value; } int rc1 = 0; if (idbdatafile::IDBPolicy::useHdfs()) { rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids); if ((rc == 0) && (rc1 == 0)) { rc1 = fWEWrapper.confirmTransaction(txnID); if (rc1 == NO_ERROR) rc1 = fWEWrapper.endTransaction(txnID, true); else fWEWrapper.endTransaction(txnID, false); } else { fWEWrapper.endTransaction(txnID, false); } } if (rc == 0) rc = rc1; return rc; } uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string& err) { int rc = 0; uint32_t sessionID, txnID; std::string schema, oldTablename, newTablename, newSchema; bs >> sessionID; bs >> txnID; bs >> schema; bs >> oldTablename; bs >> newTablename; bs >> newSchema; CalpontSystemCatalog::TableName tableName; tableName.schema = schema; tableName.table = oldTablename; CalpontSystemCatalog::RIDList roList; boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); try { roList = systemCatalogPtr->columnRIDs(tableName); } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } // Build colStructs for SYSTABLE tableName.schema = CALPONT_SCHEMA; tableName.table = SYSCOLUMN_TABLE; DDLColumn column; findColumnData(sessionID, tableName, TABLENAME_COL, column); rc = 
updateSystableEntryForSysColumn(sessionID, txnID, column, newTablename, oldTablename, roList, err); if (newSchema != schema && rc == NO_ERROR) { findColumnData(sessionID, tableName, SCHEMA_COL, column); rc = updateSystableEntryForSysColumn(sessionID, txnID, column, newSchema, schema, roList, err); } systemCatalogPtr->flushCache(); purgeFDCache(); // if (idbdatafile::IDBPolicy::useHdfs()) // cacheutils::flushOIDsFromCache(oidsToFlush); // cout << "rename:syscolumn is updated" << endl; return rc; } uint8_t WE_DDLCommandProc::updateSystableAuto(ByteStream& bs, std::string& err) { int rc = 0; uint32_t sessionID, tmp32, autoVal; std::string schema, tablename; int txnID; bs >> sessionID; bs >> tmp32; txnID = tmp32; bs >> schema; bs >> tablename; bs >> autoVal; CalpontSystemCatalog::TableName tableName; tableName.schema = schema; tableName.table = tablename; WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DctnryValueList dctnryValueList; WriteEngine::DctColTupleList dctRowList; WriteEngine::DctnryTuple dctColList; uint16_t dbRoot = 0; uint16_t segment; uint32_t partition; CalpontSystemCatalog::ROPair ropair; boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); try { ropair = systemCatalogPtr->tableRID(tableName); } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } if (ropair.objnum < 0) { err = "No such table: " + tableName.table; rc = 1; return rc; } // now we have to prepare the various structures for the WE to update the column. 
std::vector ridList;
  WriteEngine::ColValueList colValuesList;
  WriteEngine::ColTupleList aColList;
  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  std::vector colOldValuesList;
  std::map oids;

  boost::any datavalue;
  datavalue = autoVal;

  WriteEngine::ColTuple colTuple;

  // Build colStructs for SYSTABLE
  tableName.schema = CALPONT_SCHEMA;
  tableName.table = SYSTABLE_TABLE;
  DDLColumn column;
  findColumnData(sessionID, tableName, AUTOINC_COL, column);

  WriteEngine::ColStruct colStruct;
  WriteEngine::DctnryStruct dctnryStruct;
  colStruct.dataOid = column.oid;
  // Widths above 8 are capped at 8.
  colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
  colStruct.tokenFlag = false;
  colStruct.colDataType = column.colType.colDataType;

  colTuple.data = datavalue;

  dctnryStruct.dctnryOid = 0;
  dctnryStruct.columnOid = colStruct.dataOid;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    colStruct.fCompressionType = 2;
    dctnryStruct.fCompressionType = 2;
  }

  colStructs.push_back(colStruct);
  cscColTypeList.push_back(column.colType);
  oids[colStruct.dataOid] = colStruct.dataOid;
  dctnryStructList.push_back(dctnryStruct);
  aColList.push_back(colTuple);
  colValuesList.push_back(aColList);

  std::vector colExtentsStruct;
  std::vector colExtentsColType;
  std::vector dctnryExtentsStruct;

  // A single dictionary tuple is supplied alongside the column value.
  WriteEngine::DctnryTuple dctnryTuple;
  dctColList = dctnryTuple;
  dctRowList.push_back(dctColList);
  dctnryValueList.push_back(dctRowList);

  // In this case, there's only 1 row, so only one extent, but keep it generic...
  std::vector extentsinfo;
  extentInfo aExtentinfo;
  CalpontSystemCatalog::OID oid = 1003;
  convertRidToColumn(ropair.rid, dbRoot, partition, segment, oid);

  ridList.push_back(ropair.rid);
  std::vector ridLists;
  ridLists.push_back(ridList);

  aExtentinfo.dbRoot = dbRoot;
  aExtentinfo.partition = partition;
  aExtentinfo.segment = segment;
  extentsinfo.push_back(aExtentinfo);

  // build colExtentsStruct
  for (unsigned extIdx = 0; extIdx < extentsinfo.size(); extIdx++)
  {
    for (unsigned colIdx = 0; colIdx < colStructs.size(); colIdx++)
    {
      colStructs[colIdx].fColPartition = extentsinfo[extIdx].partition;
      colStructs[colIdx].fColSegment = extentsinfo[extIdx].segment;
      colStructs[colIdx].fColDbRoot = extentsinfo[extIdx].dbRoot;
      dctnryStructList[colIdx].fColPartition = extentsinfo[extIdx].partition;
      dctnryStructList[colIdx].fColSegment = extentsinfo[extIdx].segment;
      dctnryStructList[colIdx].fColDbRoot = extentsinfo[extIdx].dbRoot;
    }

    colExtentsStruct.push_back(colStructs);
    colExtentsColType.push_back(cscColTypeList);
    dctnryExtentsStruct.push_back(dctnryStructList);
  }

  // call the write engine to update the row
  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.startTransaction(txnID);

  rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList,
                                  colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList,
                                  SYSCOLUMN_BASE);

  if (rc != NO_ERROR)
  {
    // build the logging message
    err = "WE: Update failed on: " + tableName.table;
  }

  int rc1 = 0;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);

    if ((rc == 0) && (rc1 == 0))
    {
      rc1 = fWEWrapper.confirmTransaction(txnID);

      if (rc1 == NO_ERROR)
        rc1 = fWEWrapper.endTransaction(txnID, true);
      else
        fWEWrapper.endTransaction(txnID, false);
    }
    else
    {
      fWEWrapper.endTransaction(txnID, false);
    }
  }

  // An update failure takes precedence over a flush/commit failure.
  if (rc == 0)
    rc = rc1;

  systemCatalogPtr->flushCache();
  purgeFDCache();
  return rc;
}

uint8_t WE_DDLCommandProc::updateSystableEntryForSysTable(int32_t
sessionID, uint32_t txnID, const DDLColumn& column, const std::string& value, const std::string& oldValue, CalpontSystemCatalog::ROPair ropair, std::string& err) { std::vector ridList; WriteEngine::ColValueList colValuesList; WriteEngine::ColTupleList aColList; WriteEngine::ColStructList colStructs; WriteEngine::CSCTypesList cscColTypeList; std::vector colOldValuesList; std::map oids; WriteEngine::ColTuple colTuple; WriteEngine::ColStruct colStruct; WriteEngine::DctnryStruct dctnryStruct; WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DctnryValueList dctnryValueList; WriteEngine::DctColTupleList dctRowList; WriteEngine::DctnryTuple dctColList; colStruct.dataOid = column.oid; colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth; colStruct.tokenFlag = true; colStruct.colDataType = column.colType.colDataType; // Tokenize the data value WriteEngine::DctnryStruct dictStruct; dictStruct.dctnryOid = column.colType.ddn.dictOID; dictStruct.columnOid = column.colType.columnOID; WriteEngine::DctnryTuple dictTuple; dictTuple.isNull = false; dictTuple.sigValue = (unsigned char*)value.c_str(); dictTuple.sigSize = value.length(); if (idbdatafile::IDBPolicy::useHdfs()) { colStruct.fCompressionType = 2; dctnryStruct.fCompressionType = 2; } if (colStruct.tokenFlag) { dctnryStruct.dctnryOid = column.colType.ddn.dictOID; dctnryStruct.fCharsetNumber = column.colType.charsetNumber; dctnryStruct.columnOid = colStruct.dataOid; } else { dctnryStruct.dctnryOid = 0; dctnryStruct.columnOid = colStruct.dataOid; } colStructs.push_back(colStruct); dctnryStructList.push_back(dctnryStruct); oids[colStruct.dataOid] = colStruct.dataOid; cscColTypeList.push_back(column.colType); // oidsToFlush.push_back(colStruct.dataOid); if (dctnryStruct.dctnryOid > 0) { oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid; // oidsToFlush.push_back(dctnryStruct.dctnryOid); } aColList.push_back(colTuple); colValuesList.push_back(aColList); std::vector colExtentsStruct; 
std::vector dctnryExtentsStruct; std::vector colExtentsColType; dctColList = dictTuple; dctRowList.push_back(dctColList); dctnryValueList.push_back(dctRowList); // In this case, there's only 1 row, so only one one extent, but keep it generic... std::vector extentsinfo; extentInfo aExtentinfo; CalpontSystemCatalog::OID oid = 1003; uint16_t dbRoot = 0; uint16_t segment; uint32_t partition; convertRidToColumn(ropair.rid, dbRoot, partition, segment, oid); ridList.push_back(ropair.rid); std::vector ridLists; ridLists.push_back(ridList); aExtentinfo.dbRoot = dbRoot; aExtentinfo.partition = partition; aExtentinfo.segment = segment; extentsinfo.push_back(aExtentinfo); // build colExtentsStruct for (unsigned i = 0; i < extentsinfo.size(); i++) { for (unsigned j = 0; j < colStructs.size(); j++) { colStructs[j].fColPartition = extentsinfo[i].partition; colStructs[j].fColSegment = extentsinfo[i].segment; colStructs[j].fColDbRoot = extentsinfo[i].dbRoot; dctnryStructList[j].fColPartition = extentsinfo[i].partition; dctnryStructList[j].fColSegment = extentsinfo[i].segment; dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot; } colExtentsStruct.push_back(colStructs); dctnryExtentsStruct.push_back(dctnryStructList); colExtentsColType.push_back(cscColTypeList); } // call the write engine to update the row fWEWrapper.setTransId(txnID); fWEWrapper.setIsInsert(false); fWEWrapper.setBulkFlag(false); fWEWrapper.startTransaction(txnID); int rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE); if (rc != NO_ERROR) { // build the logging message err = "WE: Update failed on: " + oldValue; int rc1 = 0; if (idbdatafile::IDBPolicy::useHdfs()) { rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids); if ((rc == 0) && (rc1 == 0)) { rc1 = fWEWrapper.confirmTransaction(txnID); if (rc1 == NO_ERROR) rc1 = fWEWrapper.endTransaction(txnID, true); else fWEWrapper.endTransaction(txnID, 
false); } else { fWEWrapper.endTransaction(txnID, false); } } if (rc == 0) rc = rc1; if (rc != 0) return rc; } int rc1 = 0; if (idbdatafile::IDBPolicy::useHdfs()) { rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids); if ((rc == 0) && (rc1 == 0)) { rc1 = fWEWrapper.confirmTransaction(txnID); if (rc1 == NO_ERROR) rc1 = fWEWrapper.endTransaction(txnID, true); else fWEWrapper.endTransaction(txnID, false); } else { fWEWrapper.endTransaction(txnID, false); } } if (rc == 0) rc = rc1; return rc; } uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string& err) { uint8_t rc; uint32_t sessionID, tmp32, txnID; std::string schema, oldTablename, newTablename, newSchema; bs >> sessionID; bs >> tmp32; txnID = tmp32; bs >> schema; bs >> oldTablename; bs >> newTablename; bs >> newSchema; CalpontSystemCatalog::TableName tableName; tableName.schema = schema; tableName.table = oldTablename; CalpontSystemCatalog::ROPair ropair; boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); try { ropair = systemCatalogPtr->tableRID(tableName); } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } if (ropair.objnum < 0) { err = "No such table: " + tableName.table; return 1; } // now we have to prepare the various structures for the WE to update the column. 
// Build colStructs for SYSTABLE tableName.schema = CALPONT_SCHEMA; tableName.table = SYSTABLE_TABLE; DDLColumn column; findColumnData(sessionID, tableName, TABLENAME_COL, column); rc = updateSystableEntryForSysTable(sessionID, txnID, column, newTablename, oldTablename, ropair, err); if (newSchema != schema && rc == NO_ERROR) { findColumnData(sessionID, tableName, SCHEMA_COL, column); rc = updateSystableEntryForSysTable(sessionID, txnID, column, newSchema, schema, ropair, err); } systemCatalogPtr->flushCache(); purgeFDCache(); // if (idbdatafile::IDBPolicy::useHdfs()) // cacheutils::flushOIDsFromCache(oidsToFlush); // cout << "rename:syscolumn is updated" << endl; return rc; } uint8_t WE_DDLCommandProc::updateSyscolumnColumnposCol(messageqcpp::ByteStream& bs, std::string& err) { int rc = 0; int colPos; string schema, atableName; uint32_t sessionID, tmp32; int txnID; bs >> sessionID; bs >> tmp32; txnID = tmp32; bs >> schema; bs >> atableName; bs >> tmp32; colPos = tmp32; WriteEngine::RIDList ridList; WriteEngine::ColValueList colValuesList; WriteEngine::ColValueList colOldValuesList; CalpontSystemCatalog::TableName tableName; tableName.table = atableName; tableName.schema = schema; boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); CalpontSystemCatalog::RIDList rids; try { rids = systemCatalogPtr->columnRIDs(tableName); } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } CalpontSystemCatalog::RIDList::const_iterator rid_iter = rids.begin(); boost::any value; WriteEngine::ColTupleList colTuples; CalpontSystemCatalog::ColType columnType; CalpontSystemCatalog::ROPair colRO; // cout << "colpos is " << colPos << endl; try { while (rid_iter != rids.end()) { // look up colType colRO = *rid_iter; columnType = systemCatalogPtr->colType(colRO.objnum); if (columnType.colPosition < colPos) { ++rid_iter; continue; } ridList.push_back(colRO.rid); value = 
columnType.colPosition - 1; WriteEngine::ColTuple colTuple; colTuple.data = value; colTuples.push_back(colTuple); ++rid_iter; } } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } colValuesList.push_back(colTuples); uint16_t dbRoot; BRM::OID_t sysOid = 1021; // Find out where systable is rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot); fWEWrapper.setTransId(txnID); fWEWrapper.setIsInsert(false); fWEWrapper.setBulkFlag(false); fWEWrapper.startTransaction(txnID); std::map oids; // std::vector oidsToFlush; if (colTuples.size() > 0) { WriteEngine::ColStructList colStructs; WriteEngine::ColStruct colStruct; WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DctnryValueList dctnryValueList; WriteEngine::CSCTypesList cscColTypeList; CalpontSystemCatalog::ColType colType; // Build column structure for COLUMNPOS_COL colType.columnOID = colStruct.dataOid = OID_SYSCOLUMN_COLUMNPOS; colType.colWidth = colStruct.colWidth = 4; colStruct.tokenFlag = false; colType.colDataType = colStruct.colDataType = CalpontSystemCatalog::INT; colStruct.fColDbRoot = dbRoot; if (idbdatafile::IDBPolicy::useHdfs()) { colStruct.fCompressionType = 2; } colStructs.push_back(colStruct); cscColTypeList.push_back(colType); oids[colStruct.dataOid] = colStruct.dataOid; // oidsToFlush.push_back(colStruct.dataOid); rc = fWEWrapper.updateColumnRecs(txnID, cscColTypeList, colStructs, colValuesList, ridList, SYSCOLUMN_BASE); } int rc1 = 0; if (idbdatafile::IDBPolicy::useHdfs()) { rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids); if ((rc == 0) && (rc1 == 0)) { rc1 = fWEWrapper.confirmTransaction(txnID); if (rc1 == NO_ERROR) rc1 = fWEWrapper.endTransaction(txnID, true); else fWEWrapper.endTransaction(txnID, false); } else { fWEWrapper.endTransaction(txnID, false); } } if (rc == 0) rc = rc1; systemCatalogPtr->flushCache(); purgeFDCache(); // if (idbdatafile::IDBPolicy::useHdfs()) // cacheutils::flushOIDsFromCache(oidsToFlush); return rc; } uint8_t WE_DDLCommandProc::fillNewColumn(ByteStream& 
bs, std::string& err)
{
  // Request layout read from bs, in order: txnID, dataOid, dictOid, dataType,
  // autoincrement flag, dataWidth, scale, precision, default value string,
  // compressionType, refColOID, refColDataType, refColWidth,
  // refCompressionType, timeZone (8 bytes).
  int rc = 0;
  uint32_t tmp32;
  uint8_t tmp8;
  int txnID;
  OID dataOid, dictOid, refColOID;
  CalpontSystemCatalog::ColDataType dataType, refColDataType;
  bool autoincrement;
  int dataWidth, scale, precision, compressionType, refColWidth, refCompressionType;
  string defaultValStr;
  ColTuple defaultVal;
  long timeZone;

  bs >> tmp32;
  txnID = tmp32;
  bs >> tmp32;
  dataOid = tmp32;
  bs >> tmp32;
  dictOid = tmp32;
  bs >> tmp8;
  dataType = (CalpontSystemCatalog::ColDataType)tmp8;
  bs >> tmp8;
  autoincrement = (tmp8 != 0);
  bs >> tmp32;
  dataWidth = tmp32;
  bs >> tmp32;
  scale = tmp32;
  bs >> tmp32;
  precision = tmp32;
  bs >> defaultValStr;
  bs >> tmp8;
  compressionType = tmp8;
  bs >> tmp32;
  refColOID = tmp32;
  bs >> tmp8;
  refColDataType = (CalpontSystemCatalog::ColDataType)tmp8;
  bs >> tmp32;
  refColWidth = tmp32;
  bs >> tmp8;
  refCompressionType = tmp8;
  messageqcpp::ByteStream::octbyte timeZoneTemp;
  bs >> timeZoneTemp;
  timeZone = timeZoneTemp;
  // Find the fill in value
  // An empty default string means the column is filled with NULL.
  bool isNULL = false;

  if (defaultValStr == "")
    isNULL = true;

  CalpontSystemCatalog::ColType colType;
  colType.colDataType = static_cast(dataType);
  colType.colWidth = dataWidth;
  colType.scale = scale;
  colType.precision = precision;
  // pushWarning is an output of convertColumnData and is not inspected here.
  bool pushWarning = false;
  defaultVal.data = colType.convertColumnData(defaultValStr, pushWarning, timeZone, isNULL, false, false);
  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(true);
  fWEWrapper.setBulkFlag(true);
  // oids is populated but not passed to any call in this function.
  std::map oids;
  oids[dataOid] = dataOid;
  oids[refColOID] = refColOID;
  rc = fWEWrapper.fillColumn(txnID, dataOid, colType, defaultVal, refColOID, refColDataType, refColWidth,
                             refCompressionType, isNULL, compressionType, defaultValStr, dictOid,
                             autoincrement);

  if (rc != 0)
  {
    WErrorCodes ec;
    err = ec.errorString(rc);
  }

  purgeFDCache();
  return rc;
}

// Writes the truncate-table log file for a table.
//
// Request layout read from bs: tableOid, numOid, then numOid OIDs.
// The file is created in the directory part of the SystemConfig/DBRMRoot
// setting as "DDL_TRUNCATETABLE_Log_<tableOid>" and holds one OID per line.
//
// Returns 0 on success; 1 with err set when DBRMRoot is empty, contains no
// '/', or the file cannot be opened. The result of the write itself is not
// checked.
uint8_t WE_DDLCommandProc::writeTruncateLog(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t tableOid, numOid, tmp32;
  bs >> tableOid;
  bs >> numOid;
  std::vector oids;

  for (uint32_t i = 0; i < numOid; i++)
  {
    bs >> tmp32;
    oids.push_back(tmp32);
  }

  string prefix;
  config::Config* config = config::Config::makeConfig();
  prefix = config->getConfig("SystemConfig", "DBRMRoot");

  if (prefix.length() == 0)
  {
    err = "Need a valid DBRMRoot entry in Calpont configuation file";
    rc = 1;
    return rc;
  }

  // Keep everything up to and including the last '/', i.e. the directory.
  uint64_t pos = prefix.find_last_of("/");
  std::string DDLLogFileName;

  if (pos != string::npos)
  {
    DDLLogFileName = prefix.substr(0, pos + 1);  // Get the file path
  }
  else
  {
    err = "Cannot find the dbrm directory for the DDL log file";
    rc = 1;
    return rc;
  }

  std::ostringstream oss;
  oss << tableOid;
  DDLLogFileName += "DDL_TRUNCATETABLE_Log_" + oss.str();
  boost::scoped_ptr DDLLogFile(IDBDataFile::open(
      IDBPolicy::getType(DDLLogFileName.c_str(), IDBPolicy::WRITEENG), DDLLogFileName.c_str(), "w", 0));

  if (!DDLLogFile)
  {
    err = "DDL truncate table log file cannot be created";
    rc = 1;
    return rc;
  }

  // Build the whole content in memory, then write it in one call.
  std::ostringstream buf;

  for (unsigned i = 0; i < oids.size(); i++)
    buf << oids[i] << std::endl;

  std::string tmp(buf.str());
  DDLLogFile->write(tmp.c_str(), tmp.size());

  // DDLLogFile is a scoped_ptr, will be closed after return.
return rc; } uint8_t WE_DDLCommandProc::writeDropPartitionLog(ByteStream& bs, std::string& err) { int rc = 0; uint32_t tableOid, numParts, numOid, tmp32; bs >> tableOid; std::set partitionNums; bs >> numParts; BRM::LogicalPartition lp; for (uint32_t i = 0; i < numParts; i++) { lp.unserialize(bs); partitionNums.insert(lp); } bs >> numOid; std::vector oids; for (uint32_t i = 0; i < numOid; i++) { bs >> tmp32; oids.push_back(tmp32); } string prefix; config::Config* config = config::Config::makeConfig(); prefix = config->getConfig("SystemConfig", "DBRMRoot"); if (prefix.length() == 0) { err = "Need a valid DBRMRoot entry in Calpont configuation file"; rc = 1; return rc; } uint64_t pos = prefix.find_last_of("/"); std::string DDLLogFileName; if (pos != string::npos) { DDLLogFileName = prefix.substr(0, pos + 1); // Get the file path } else { err = "Cannot find the dbrm directory for the DDL drop partitions log file"; rc = 1; return rc; } std::ostringstream oss; oss << tableOid; DDLLogFileName += "DDL_DROPPARTITION_Log_" + oss.str(); boost::scoped_ptr DDLLogFile(IDBDataFile::open( IDBPolicy::getType(DDLLogFileName.c_str(), IDBPolicy::WRITEENG), DDLLogFileName.c_str(), "w", 0)); if (!DDLLogFile) { err = "DDL drop partitions log file cannot be created"; rc = 1; return rc; } std::ostringstream buf; // @SN write partition numbers to the log file, separated by space set::const_iterator it; for (it = partitionNums.begin(); it != partitionNums.end(); ++it) buf << (*it) << endl; // -1 indicates the end of partition list BRM::LogicalPartition end(-1, -1, -1); buf << end << endl; for (unsigned i = 0; i < oids.size(); i++) buf << oids[i] << std::endl; std::string tmp(buf.str()); DDLLogFile->write(tmp.c_str(), tmp.size()); return rc; } uint8_t WE_DDLCommandProc::writeDropTableLog(ByteStream& bs, std::string& err) { int rc = 0; uint32_t tableOid, numOid, tmp32; bs >> tableOid; bs >> numOid; std::vector oids; for (uint32_t i = 0; i < numOid; i++) { bs >> tmp32; oids.push_back(tmp32); } 
string prefix; config::Config* config = config::Config::makeConfig(); prefix = config->getConfig("SystemConfig", "DBRMRoot"); if (prefix.length() == 0) { err = "Need a valid DBRMRoot entry in Calpont configuation file"; rc = 1; return rc; } uint64_t pos = prefix.find_last_of("/"); std::string DDLLogFileName; if (pos != string::npos) { DDLLogFileName = prefix.substr(0, pos + 1); // Get the file path } else { err = "Cannot find the dbrm directory for the DDL drop partitions log file"; rc = 1; return rc; } std::ostringstream oss; oss << tableOid; DDLLogFileName += "DDL_DROPTABLE_Log_" + oss.str(); boost::scoped_ptr DDLLogFile(IDBDataFile::open( IDBPolicy::getType(DDLLogFileName.c_str(), IDBPolicy::WRITEENG), DDLLogFileName.c_str(), "w", 0)); if (!DDLLogFile) { err = "DDL drop table log file cannot be created"; rc = 1; return rc; } std::ostringstream buf; for (unsigned i = 0; i < oids.size(); i++) buf << oids[i] << std::endl; std::string tmp(buf.str()); DDLLogFile->write(tmp.c_str(), tmp.size()); return rc; } uint8_t WE_DDLCommandProc::deleteDDLLog(ByteStream& bs, std::string& err) { int rc = 0; uint32_t tableOid, fileType; bs >> fileType; bs >> tableOid; string prefix; config::Config* config = config::Config::makeConfig(); prefix = config->getConfig("SystemConfig", "DBRMRoot"); if (prefix.length() == 0) { err = "Need a valid DBRMRoot entry in Calpont configuation file"; rc = 1; return rc; } uint64_t pos = prefix.find_last_of("/"); std::string DDLLogFileName; if (pos != string::npos) { DDLLogFileName = prefix.substr(0, pos + 1); // Get the file path } else { err = "Cannot find the dbrm directory for the DDL drop partitions log file"; rc = 1; return rc; } std::ostringstream oss; oss << tableOid; switch (fileType) { case DROPTABLE_LOG: { DDLLogFileName += "DDL_DROPTABLE_Log_" + oss.str(); break; } case DROPPART_LOG: { DDLLogFileName += "DDL_DROPPARTITION_Log_" + oss.str(); break; } case TRUNCATE_LOG: { DDLLogFileName += "DDL_TRUNCATETABLE_Log_" + oss.str(); break; } 
default: break; } IDBPolicy::remove(DDLLogFileName.c_str()); return rc; } uint8_t WE_DDLCommandProc::fetchDDLLog(ByteStream& bs, std::string& err) { int rc = 0; // Find the ddl log files under DBRMRoot directory string prefix, ddlLogDir; config::Config* config = config::Config::makeConfig(); prefix = config->getConfig("SystemConfig", "DBRMRoot"); if (prefix.length() == 0) { rc = 1; err = "Need a valid DBRMRoot entry in Calpont configuation file"; return rc; } uint64_t pos = prefix.find_last_of("/"); if (pos != string::npos) { ddlLogDir = prefix.substr(0, pos + 1); // Get the file path } else { rc = 1; err = "Cannot find the dbrm directory for the DDL log file"; return rc; } boost::filesystem::path filePath; filePath = fs::system_complete(fs::path(ddlLogDir)); if (!fs::exists(filePath)) { rc = 1; err = "\nDDL log file path is Not found: "; return rc; } std::vector fileNames; if (fs::is_directory(filePath)) { fs::directory_iterator end_iter; for (fs::directory_iterator dir_itr(filePath); dir_itr != end_iter; ++dir_itr) { try { if (!fs::is_directory(*dir_itr)) { #if BOOST_VERSION >= 105200 fileNames.push_back(dir_itr->path().generic_string()); #else fileNames.push_back(dir_itr->string()); #endif } } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } } } CalpontSystemCatalog::OID fileoid; string tableName; bs.restart(); for (unsigned i = 0; i < fileNames.size(); i++) { pos = fileNames[i].find("DDL_DROPTABLE_Log_"); if (pos != string::npos) { // Read the file to get oids // cout << "Found file " << fileNames[i] << endl; boost::scoped_ptr ddlLogFile(IDBDataFile::open( IDBPolicy::getType(fileNames[i].c_str(), IDBPolicy::WRITEENG), fileNames[i].c_str(), "r", 0)); if (!ddlLogFile) continue; // find the table oid pos = fileNames[i].find_last_of("_"); string tableOidStr = fileNames[i].substr(pos + 1, fileNames[i].length() - pos - 1); char* ep = NULL; uint32_t tableOid = strtoll(tableOidStr.c_str(), &ep, 10); bs << tableOid; bs << (uint32_t)DROPTABLE_LOG; 
std::vector oidList;
      // size() is kept in a signed ssize_t; a negative value (presumably an
      // error indicator -- confirm against IDBDataFile::size) must never
      // reach new char[].
      ssize_t fileSize = ddlLogFile->size();

      if (fileSize < 0)
      {
        err = "DDL log file cannot be read: " + fileNames[i];
        return (uint8_t)ERR_FILE_READ;
      }

      boost::scoped_array buf(new char[fileSize]);

      if (ddlLogFile->read(buf.get(), fileSize) != fileSize)
      {
        err = "DDL log file cannot be read: " + fileNames[i];
        return (uint8_t)ERR_FILE_READ;
      }

      // The file holds whitespace-separated OIDs.
      std::istringstream strbuf(string(buf.get(), fileSize));

      while (strbuf >> fileoid)
        oidList.push_back(fileoid);

      bs << (uint32_t)oidList.size();

      for (unsigned j = 0; j < oidList.size(); j++)
      {
        bs << (uint32_t)oidList[j];
      }

      // Trailing 0 written after the OID list for drop-table entries.
      bs << (uint32_t)0;
    }
    else  // Find drop partition log file
    {
      pos = fileNames[i].find("DDL_DROPPARTITION_Log_");

      if (pos != string::npos)
      {
        boost::scoped_ptr ddlLogFile(IDBDataFile::open(
            IDBPolicy::getType(fileNames[i].c_str(), IDBPolicy::WRITEENG), fileNames[i].c_str(), "r", 0));

        // Skip files that cannot be opened, exactly like the drop-table and
        // truncate branches do. Without this check the size() call below
        // dereferences a null pointer.
        if (!ddlLogFile)
          continue;

        BRM::LogicalPartition partition;
        vector partitionNums;
        // find the table oid
        pos = fileNames[i].find_last_of("_");
        string tableOidStr = fileNames[i].substr(pos + 1, fileNames[i].length() - pos - 1);
        char* ep = NULL;
        uint32_t tableOid = strtoll(tableOidStr.c_str(), &ep, 10);
        bs << tableOid;
        bs << (uint32_t)DROPPART_LOG;
        ssize_t fileSize = ddlLogFile->size();

        if (fileSize < 0)
        {
          err = "DDL log file cannot be read: " + fileNames[i];
          return (uint8_t)ERR_FILE_READ;
        }

        boost::scoped_array buf(new char[fileSize]);

        if (ddlLogFile->read(buf.get(), fileSize) != fileSize)
        {
          err = "DDL log file cannot be read: " + fileNames[i];
          return (uint8_t)ERR_FILE_READ;
        }

        std::istringstream strbuf(string(buf.get(), fileSize));

        // Partitions come first and end at the entry whose dbroot is (uint16_t)-1.
        while (strbuf >> partition)
        {
          if (partition.dbroot == (uint16_t)-1)
            break;

          partitionNums.push_back(partition);
        }

        // Everything after the end marker is the OID list.
        std::vector oidPartList;

        while (strbuf >> fileoid)
          oidPartList.push_back(fileoid);

        bs << (uint32_t)oidPartList.size();

        for (unsigned j = 0; j < oidPartList.size(); j++)
        {
          bs << (uint32_t)oidPartList[j];
        }

        bs << (uint32_t)partitionNums.size();

        for (unsigned j = 0; j < partitionNums.size(); j++)
        {
          partitionNums[j].serialize(bs);
        }
      }
      else  // find truncate table log file
      {
        pos = fileNames[i].find("DDL_TRUNCATETABLE_Log_");

        if (pos != string::npos)
        {
          boost::scoped_ptr ddlLogFile(IDBDataFile::open(
              IDBPolicy::getType(fileNames[i].c_str(), IDBPolicy::WRITEENG), fileNames[i].c_str(), "r", 0));

          if (!ddlLogFile)
          {
            continue;
          }

          // find the table oid
          pos = fileNames[i].find_last_of("_");
          string tableOidStr = fileNames[i].substr(pos + 1, fileNames[i].length() - pos - 1);
          char* ep = NULL;
          uint32_t tableOid = strtoll(tableOidStr.c_str(), &ep, 10);
          bs << tableOid;
          bs << (uint32_t)TRUNCATE_LOG;
          std::vector oidList;
          ssize_t fileSize = ddlLogFile->size();

          if (fileSize < 0)
          {
            err = "DDL log file cannot be read: " + fileNames[i];
            return (uint8_t)ERR_FILE_READ;
          }

          boost::scoped_array buf(new char[fileSize]);

          if (ddlLogFile->read(buf.get(), fileSize) != fileSize)
          {
            err = "DDL log file cannot be read: " + fileNames[i];
            return (uint8_t)ERR_FILE_READ;
          }

          std::istringstream strbuf(string(buf.get(), fileSize));

          while (strbuf >> fileoid)
            oidList.push_back(fileoid);

          bs << (uint32_t)oidList.size();

          for (unsigned j = 0; j < oidList.size(); j++)
          {
            bs << (uint32_t)oidList[j];
          }

          bs << (uint32_t)0;
        }
      }
    }
  }

  return rc;
}

// Sets the default value stored in SYSCOLUMN for one column.
// Request layout read from bs, in order: sessionID, txnID, schema, table name,
// column name, default value.
uint8_t WE_DDLCommandProc::updateSyscolumnSetDefault(messageqcpp::ByteStream& bs, std::string& err)
{
  // Only the DEFAULTVAL_COL entry of the column's SYSCOLUMN row is updated.
  int rc = 0;
  uint32_t tmp32;
  string schema, tableName, colName, defaultvalue;
  int txnID;
  uint32_t sessionID;
  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> schema;
  bs >> tableName;
  bs >> colName;
  bs >> defaultvalue;

  boost::shared_ptr systemCatalogPtr;
  systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);

  CalpontSystemCatalog::TableName atableName;
  CalpontSystemCatalog::TableColName tableColName;
  tableColName.schema = schema;
  tableColName.table = tableName;
  tableColName.column = colName;
  CalpontSystemCatalog::ROPair ropair;

  try
  {
    ropair = systemCatalogPtr->columnRID(tableColName);

    if (ropair.objnum < 0)
    {
      ostringstream oss;
      oss << "No such column: " << tableColName;
      throw std::runtime_error(oss.str().c_str());
    }
  }
  catch (exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }
  catch (...)
{
    // Message corrected: this handler belongs to the set-default operation.
    err = "setDefault:Unknown exception caught";
    rc = 1;
    return rc;
  }

  // From here on: rewrite the DEFAULTVAL_COL entry of the SYSCOLUMN row found
  // above (ropair.rid). Returns 0 on success, 1 with err set when the update
  // fails, or the HDFS flush/commit code when only that step fails.
  uint16_t dbRoot = 0;
  uint16_t segment;
  uint32_t partition;

  std::vector ridList;
  ridList.push_back(ropair.rid);
  WriteEngine::ColValueList colValuesList;
  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  std::vector colOldValuesList;
  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DctnryValueList dctnryValueList;
  WriteEngine::DctColTupleList dctRowList;
  WriteEngine::DctnryTuple dctColList;
  std::map oids;
  // std::vector oidsToFlush;

  WriteEngine::ColTuple colTuple;

  // Build colStructs for SYSCOLUMN
  atableName.schema = CALPONT_SCHEMA;
  atableName.table = SYSCOLUMN_TABLE;
  DDLColumn column;
  findColumnData(sessionID, atableName, DEFAULTVAL_COL, column);  // DEFAULTVAL_COL column

  WriteEngine::ColStruct colStruct;
  WriteEngine::DctnryStruct dctnryStruct;

  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.setTransId(txnID);
  fWEWrapper.startTransaction(txnID);
  // Build DEFAULTVAL_COL structure
  WriteEngine::ColTupleList aColList;
  colStruct.dataOid = column.oid;
  colStruct.tokenFlag = false;

  // Wide character/binary types and high-precision decimals are stored as
  // 8-byte dictionary tokens; everything else keeps its own width.
  if ((column.colType.colDataType == CalpontSystemCatalog::CHAR && column.colType.colWidth > 8) ||
      (column.colType.colDataType == CalpontSystemCatalog::VARCHAR && column.colType.colWidth > 7) ||
      (column.colType.colDataType == CalpontSystemCatalog::VARBINARY && column.colType.colWidth > 7) ||
      (column.colType.colDataType == CalpontSystemCatalog::BLOB && column.colType.colWidth > 7) ||
      (column.colType.colDataType == CalpontSystemCatalog::TEXT && column.colType.colWidth > 7) ||
      (column.colType.colDataType == CalpontSystemCatalog::DECIMAL && column.colType.precision > 18) ||
      (column.colType.colDataType == CalpontSystemCatalog::UDECIMAL &&
       column.colType.precision > 18))  // token
  {
    colStruct.colWidth = 8;
    colStruct.tokenFlag = true;
  }
  else
  {
    colStruct.colWidth = column.colType.colWidth;
  }

  colStruct.colDataType = column.colType.colDataType;

  if (colStruct.tokenFlag)
  {
    WriteEngine::DctnryStruct dictStruct;
    dictStruct.dctnryOid = column.colType.ddn.dictOID;
    dictStruct.columnOid = column.colType.columnOID;

    if (defaultvalue.length() <= 0)  // null token
    {
      WriteEngine::Token nullToken;
      colTuple.data = nullToken;
    }
    else
    {
      WriteEngine::DctnryTuple dictTuple;
      dictTuple.sigValue = (unsigned char*)defaultvalue.c_str();
      dictTuple.sigSize = defaultvalue.length();
      dictTuple.isNull = false;
      int error = NO_ERROR;

      if (NO_ERROR !=
          (error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple, false)))  // @bug 5572 HDFS tmp file
      {
        // NOTE(review): this exception leaves the function after
        // startTransaction() with no matching endTransaction() here; confirm
        // that the caller handles it.
        WErrorCodes ec;
        throw std::runtime_error("WE: Tokenization failed " + ec.errorString(error));
      }

      WriteEngine::Token aToken = dictTuple.token;
      colTuple.data = aToken;
    }
  }

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    colStruct.fCompressionType = 2;
    dctnryStruct.fCompressionType = 2;
  }

  if (colStruct.tokenFlag)
  {
    dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
    dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
    dctnryStruct.columnOid = colStruct.dataOid;
  }
  else
  {
    dctnryStruct.dctnryOid = 0;
    dctnryStruct.columnOid = colStruct.dataOid;
  }

  colStructs.push_back(colStruct);
  oids[colStruct.dataOid] = colStruct.dataOid;
  cscColTypeList.push_back(column.colType);

  // oidsToFlush.push_back(colStruct.dataOid);
  if (dctnryStruct.dctnryOid > 0)
  {
    oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
    // oidsToFlush.push_back(dctnryStruct.dctnryOid);
  }

  dctnryStructList.push_back(dctnryStruct);
  aColList.push_back(colTuple);
  colValuesList.push_back(aColList);

  // Dictionary tuple mirroring the new default; an empty default is stored as NULL.
  WriteEngine::DctnryTuple dctnryTuple;

  if (defaultvalue.length() > 0)
  {
    dctnryTuple.sigValue = (unsigned char*)defaultvalue.c_str();
    dctnryTuple.sigSize = defaultvalue.length();
    dctnryTuple.isNull = false;
  }
  else
  {
    dctnryTuple.isNull = true;
  }

  dctColList = dctnryTuple;
  dctRowList.clear();
  dctRowList.push_back(dctColList);
  dctnryValueList.push_back(dctRowList);
  std::vector colExtentsStruct;
  std::vector dctnryExtentsStruct;
  std::vector colExtentsColType;
  std::vector ridLists;
  ridLists.push_back(ridList);

  // In this case, there's only 1 row, so only one one extent, but keep it generic...
  std::vector extentsinfo;
  extentInfo aExtentinfo;

  convertRidToColumn(ropair.rid, dbRoot, partition, segment, 1021);

  aExtentinfo.dbRoot = dbRoot;
  aExtentinfo.partition = partition;
  aExtentinfo.segment = segment;

  extentsinfo.push_back(aExtentinfo);

  // build colExtentsStruct
  for (unsigned i = 0; i < extentsinfo.size(); i++)
  {
    for (unsigned j = 0; j < colStructs.size(); j++)
    {
      colStructs[j].fColPartition = extentsinfo[i].partition;
      colStructs[j].fColSegment = extentsinfo[i].segment;
      colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
      dctnryStructList[j].fColPartition = extentsinfo[i].partition;
      dctnryStructList[j].fColSegment = extentsinfo[i].segment;
      dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
    }

    colExtentsStruct.push_back(colStructs);
    dctnryExtentsStruct.push_back(dctnryStructList);
    colExtentsColType.push_back(cscColTypeList);
  }

  // call the write engine to update the row
  if (NO_ERROR != fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList,
                                             colOldValuesList, ridLists, dctnryExtentsStruct,
                                             dctnryValueList, SYSCOLUMN_BASE))
  {
    err = "WE: Update failed on: " + atableName.table;
    rc = 1;
  }

  // On HDFS: flush, then commit only if update, flush and confirm all succeeded.
  int rc1 = 0;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);

    if ((rc == 0) && (rc1 == 0))
    {
      rc1 = fWEWrapper.confirmTransaction(txnID);

      if (rc1 == NO_ERROR)
        rc1 = fWEWrapper.endTransaction(txnID, true);
      else
        fWEWrapper.endTransaction(txnID, false);
    }
    else
    {
      fWEWrapper.endTransaction(txnID, false);
    }
  }

  if (rc == 0)
    rc = rc1;

  // flush syscat cache
  systemCatalogPtr->flushCache();
  purgeFDCache();
  // if (idbdatafile::IDBPolicy::useHdfs())
  //     cacheutils::flushOIDsFromCache(oidsToFlush);
  return rc;
}

uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream& bs, std::string& err)
{
  // Will update five columns: columnname, defaultvalue, nullable, autoincrement, nextvalue.
int rc = 0; uint64_t nextVal; uint32_t tmp32, nullable; string schema, tableName, colOldname, autoinc, colNewName, defaultvalue; int txnID; uint32_t sessionID; bs >> sessionID; bs >> tmp32; txnID = tmp32; bs >> schema; bs >> tableName; bs >> colOldname; bs >> colNewName; bs >> autoinc; bs >> nextVal; bs >> nullable; bs >> defaultvalue; boost::shared_ptr systemCatalogPtr; systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); CalpontSystemCatalog::TableName atableName; CalpontSystemCatalog::TableColName tableColName; tableColName.schema = schema; tableColName.table = tableName; tableColName.column = colOldname; CalpontSystemCatalog::ROPair ropair; try { ropair = systemCatalogPtr->columnRID(tableColName); if (ropair.objnum < 0) { ostringstream oss; oss << "No such column: " << tableColName; throw std::runtime_error(oss.str().c_str()); } } catch (exception& ex) { err = ex.what(); rc = 1; return rc; } catch (...) { err = "renameColumn:Unknown exception caught"; rc = 1; return rc; } uint16_t dbRoot = 0; uint16_t segment; uint32_t partition; std::vector ridList; ridList.push_back(ropair.rid); WriteEngine::ColValueList colValuesList; WriteEngine::ColTupleList aColList1; WriteEngine::ColStructList colStructs; WriteEngine::CSCTypesList cscColTypeList; std::vector colOldValuesList; std::map oids; // std::vector oidsToFlush; WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DctnryValueList dctnryValueList; WriteEngine::DctColTupleList dctRowList; WriteEngine::DctnryTuple dctColList; boost::any datavalue; datavalue = colNewName; WriteEngine::ColTuple colTuple; // Build colStructs for SYSCOLUMN atableName.schema = CALPONT_SCHEMA; atableName.table = SYSCOLUMN_TABLE; DDLColumn column1, column2, column3, column4, column5; findColumnData(sessionID, atableName, COLNAME_COL, column1); // COLNAME_COL column findColumnData(sessionID, atableName, AUTOINC_COL, column2); // AUTOINC_COL column 
findColumnData(sessionID, atableName, NEXTVALUE_COL, column3); // NEXTVALUE_COL column findColumnData(sessionID, atableName, NULLABLE_COL, column4); // NULLABLE_COL column findColumnData(sessionID, atableName, DEFAULTVAL_COL, column5); // DEFAULTVAL_COL column WriteEngine::ColStruct colStruct; WriteEngine::DctnryStruct dctnryStruct; if (idbdatafile::IDBPolicy::useHdfs()) { colStruct.fCompressionType = 2; dctnryStruct.fCompressionType = 2; } fWEWrapper.setTransId(txnID); fWEWrapper.setIsInsert(false); fWEWrapper.setBulkFlag(false); fWEWrapper.startTransaction(txnID); // Build COLNAME_COL structure colStruct.dataOid = column1.oid; colStruct.colWidth = column1.colType.colWidth > 8 ? 8 : column1.colType.colWidth; colStruct.tokenFlag = false; if ((column1.colType.colDataType == CalpontSystemCatalog::CHAR && column1.colType.colWidth > 8) || (column1.colType.colDataType == CalpontSystemCatalog::VARCHAR && column1.colType.colWidth > 7) || (column1.colType.colDataType == CalpontSystemCatalog::VARBINARY && column1.colType.colWidth > 7) || (column1.colType.colDataType == CalpontSystemCatalog::BLOB && column1.colType.colWidth > 7) || (column1.colType.colDataType == CalpontSystemCatalog::TEXT && column1.colType.colWidth > 7) || (column1.colType.colDataType == CalpontSystemCatalog::DECIMAL && column1.colType.precision > 18) || (column1.colType.colDataType == CalpontSystemCatalog::UDECIMAL && column1.colType.precision > 18)) // token { colStruct.colWidth = 8; colStruct.tokenFlag = true; } else { colStruct.colWidth = column1.colType.colWidth; } colStruct.colDataType = column1.colType.colDataType; if (colStruct.tokenFlag) { WriteEngine::DctnryStruct dictStruct; if (idbdatafile::IDBPolicy::useHdfs()) { dictStruct.fCompressionType = 2; } dictStruct.dctnryOid = column1.colType.ddn.dictOID; dictStruct.columnOid = column1.colType.columnOID; WriteEngine::DctnryTuple dictTuple; dictTuple.sigValue = (unsigned char*)colNewName.c_str(); dictTuple.sigSize = colNewName.length(); 
dictTuple.isNull = false; int error = NO_ERROR; if (NO_ERROR != (error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple, false))) // @bug 5572 HDFS tmp file { WErrorCodes ec; err = ec.errorString(error); rc = error; return rc; } WriteEngine::Token aToken = dictTuple.token; colTuple.data = aToken; } else colTuple.data = datavalue; colStruct.colDataType = column1.colType.colDataType; if (idbdatafile::IDBPolicy::useHdfs()) { colStruct.fCompressionType = 2; dctnryStruct.fCompressionType = 2; } if (colStruct.tokenFlag) { dctnryStruct.dctnryOid = column1.colType.ddn.dictOID; dctnryStruct.fCharsetNumber = column1.colType.charsetNumber; dctnryStruct.columnOid = colStruct.dataOid; } else { dctnryStruct.dctnryOid = 0; dctnryStruct.columnOid = colStruct.dataOid; } colStructs.push_back(colStruct); oids[colStruct.dataOid] = colStruct.dataOid; cscColTypeList.push_back(column1.colType); // oidsToFlush.push_back(colStruct.dataOid); if (dctnryStruct.dctnryOid > 0) { oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid; // oidsToFlush.push_back(dctnryStruct.dctnryOid); } dctnryStructList.push_back(dctnryStruct); aColList1.push_back(colTuple); colValuesList.push_back(aColList1); WriteEngine::DctnryTuple dctnryTuple; boost::to_lower(colNewName); dctnryTuple.sigValue = (unsigned char*)colNewName.c_str(); dctnryTuple.sigSize = colNewName.length(); dctnryTuple.isNull = false; dctColList = dctnryTuple; dctRowList.clear(); dctRowList.push_back(dctColList); dctnryValueList.push_back(dctRowList); // Build AUTOINC_COL structure WriteEngine::ColTupleList aColList2; colStruct.dataOid = column2.oid; colStruct.colWidth = column2.colType.colWidth > 8 ? 
8 : column2.colType.colWidth; colStruct.tokenFlag = false; colStruct.colDataType = column2.colType.colDataType; colTuple.data = autoinc; colStruct.colDataType = column2.colType.colDataType; dctnryStruct.dctnryOid = 0; dctnryStruct.columnOid = colStruct.dataOid; colStructs.push_back(colStruct); oids[colStruct.dataOid] = colStruct.dataOid; cscColTypeList.push_back(column2.colType); // oidsToFlush.push_back(colStruct.dataOid); if (dctnryStruct.dctnryOid > 0) { oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid; // oidsToFlush.push_back(dctnryStruct.dctnryOid); } dctnryStructList.push_back(dctnryStruct); aColList2.push_back(colTuple); colValuesList.push_back(aColList2); dctnryTuple.isNull = true; dctColList = dctnryTuple; dctRowList.clear(); dctRowList.push_back(dctColList); dctnryValueList.push_back(dctRowList); // Build NEXTVALUE_COL structure WriteEngine::ColTupleList aColList3; colStruct.dataOid = column3.oid; colStruct.colWidth = column3.colType.colWidth > 8 ? 8 : column3.colType.colWidth; colStruct.tokenFlag = false; colStruct.colDataType = column3.colType.colDataType; colTuple.data = nextVal; colStruct.colDataType = column3.colType.colDataType; dctnryStruct.dctnryOid = 0; dctnryStruct.columnOid = colStruct.dataOid; colStructs.push_back(colStruct); oids[colStruct.dataOid] = colStruct.dataOid; cscColTypeList.push_back(column3.colType); // oidsToFlush.push_back(colStruct.dataOid); if (dctnryStruct.dctnryOid > 0) { oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid; // oidsToFlush.push_back(dctnryStruct.dctnryOid); } dctnryStructList.push_back(dctnryStruct); aColList3.push_back(colTuple); colValuesList.push_back(aColList3); dctnryTuple.isNull = true; dctColList = dctnryTuple; dctRowList.clear(); dctRowList.push_back(dctColList); dctnryValueList.push_back(dctRowList); // Build NULLABLE_COL structure WriteEngine::ColTupleList aColList4; colStruct.dataOid = column4.oid; colStruct.colWidth = column4.colType.colWidth > 8 ? 
8 : column4.colType.colWidth; colStruct.tokenFlag = false; colStruct.colDataType = column4.colType.colDataType; colTuple.data = nullable; colStruct.colDataType = column4.colType.colDataType; dctnryStruct.dctnryOid = 0; dctnryStruct.columnOid = colStruct.dataOid; colStructs.push_back(colStruct); oids[colStruct.dataOid] = colStruct.dataOid; cscColTypeList.push_back(column4.colType); // oidsToFlush.push_back(colStruct.dataOid); if (dctnryStruct.dctnryOid > 0) { oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid; // oidsToFlush.push_back(dctnryStruct.dctnryOid); } dctnryStructList.push_back(dctnryStruct); aColList4.push_back(colTuple); colValuesList.push_back(aColList4); dctnryTuple.isNull = true; dctColList = dctnryTuple; dctRowList.clear(); dctRowList.push_back(dctColList); dctnryValueList.push_back(dctRowList); // Build DEFAULTVAL_COL structure WriteEngine::ColTupleList aColList5; colStruct.dataOid = column5.oid; colStruct.colWidth = column5.colType.colWidth > 8 ? 8 : column5.colType.colWidth; colStruct.tokenFlag = false; if ((column5.colType.colDataType == CalpontSystemCatalog::CHAR && column5.colType.colWidth > 8) || (column5.colType.colDataType == CalpontSystemCatalog::VARCHAR && column5.colType.colWidth > 7) || (column5.colType.colDataType == CalpontSystemCatalog::VARBINARY && column5.colType.colWidth > 7) || (column5.colType.colDataType == CalpontSystemCatalog::BLOB && column5.colType.colWidth > 7) || (column5.colType.colDataType == CalpontSystemCatalog::TEXT && column5.colType.colWidth > 7) || (column5.colType.colDataType == CalpontSystemCatalog::DECIMAL && column5.colType.precision > 18) || (column5.colType.colDataType == CalpontSystemCatalog::UDECIMAL && column5.colType.precision > 18)) // token { colStruct.colWidth = 8; colStruct.tokenFlag = true; } else { colStruct.colWidth = column5.colType.colWidth; } colStruct.colDataType = column5.colType.colDataType; if (colStruct.tokenFlag) { WriteEngine::DctnryStruct dictStruct; if 
(idbdatafile::IDBPolicy::useHdfs()) { colStruct.fCompressionType = 2; dictStruct.fCompressionType = 2; } dictStruct.dctnryOid = column5.colType.ddn.dictOID; dictStruct.columnOid = column5.colType.columnOID; if (defaultvalue.length() <= 0) // null token { WriteEngine::Token nullToken; colTuple.data = nullToken; } else { WriteEngine::DctnryTuple dictTuple; dictTuple.sigValue = (unsigned char*)defaultvalue.c_str(); dictTuple.sigSize = defaultvalue.length(); dictTuple.isNull = false; int error = NO_ERROR; if (NO_ERROR != (error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple, false))) // @bug 5572 HDFS tmp file { WErrorCodes ec; throw std::runtime_error("WE: Tokenization failed " + ec.errorString(error)); } WriteEngine::Token aToken = dictTuple.token; colTuple.data = aToken; } } fWEWrapper.flushDataFiles(rc, txnID, oids); colStruct.colDataType = column5.colType.colDataType; if (idbdatafile::IDBPolicy::useHdfs()) { colStruct.fCompressionType = 2; dctnryStruct.fCompressionType = 2; } if (colStruct.tokenFlag) { dctnryStruct.dctnryOid = column5.colType.ddn.dictOID; dctnryStruct.fCharsetNumber = column5.colType.charsetNumber; dctnryStruct.columnOid = colStruct.dataOid; } else { dctnryStruct.dctnryOid = 0; dctnryStruct.columnOid = colStruct.dataOid; } colStructs.push_back(colStruct); dctnryStructList.push_back(dctnryStruct); oids[colStruct.dataOid] = colStruct.dataOid; cscColTypeList.push_back(column5.colType); // oidsToFlush.push_back(colStruct.dataOid); if (dctnryStruct.dctnryOid > 0) { oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid; // oidsToFlush.push_back(dctnryStruct.dctnryOid); } aColList5.push_back(colTuple); colValuesList.push_back(aColList5); if (defaultvalue.length() > 0) { dctnryTuple.sigValue = (unsigned char*)defaultvalue.c_str(); dctnryTuple.sigSize = defaultvalue.length(); dctnryTuple.isNull = false; } else { dctnryTuple.isNull = true; } dctColList = dctnryTuple; dctRowList.clear(); dctRowList.push_back(dctColList); 
dctnryValueList.push_back(dctRowList); std::vector colExtentsStruct; std::vector colExtentsColType; std::vector dctnryExtentsStruct; std::vector ridLists; ridLists.push_back(ridList); // In this case, there's only 1 row, so only one one extent, but keep it generic... std::vector extentsinfo; extentInfo aExtentinfo; convertRidToColumn(ropair.rid, dbRoot, partition, segment, 1021); aExtentinfo.dbRoot = dbRoot; aExtentinfo.partition = partition; aExtentinfo.segment = segment; extentsinfo.push_back(aExtentinfo); // build colExtentsStruct for (unsigned i = 0; i < extentsinfo.size(); i++) { for (unsigned j = 0; j < colStructs.size(); j++) { colStructs[j].fColPartition = extentsinfo[i].partition; colStructs[j].fColSegment = extentsinfo[i].segment; colStructs[j].fColDbRoot = extentsinfo[i].dbRoot; dctnryStructList[j].fColPartition = extentsinfo[i].partition; dctnryStructList[j].fColSegment = extentsinfo[i].segment; dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot; } colExtentsStruct.push_back(colStructs); dctnryExtentsStruct.push_back(dctnryStructList); colExtentsColType.push_back(cscColTypeList); } // call the write engine to update the row if (NO_ERROR != fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE)) { err = "WE: Update failed on: " + atableName.table; rc = 1; } int rc1 = 0; if (idbdatafile::IDBPolicy::useHdfs()) { rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids); if ((rc == 0) && (rc1 == 0)) { rc1 = fWEWrapper.confirmTransaction(txnID); if (rc1 == NO_ERROR) rc1 = fWEWrapper.endTransaction(txnID, true); else fWEWrapper.endTransaction(txnID, false); } else { fWEWrapper.endTransaction(txnID, false); } } if (rc == 0) rc = rc1; // flush syscat cahche systemCatalogPtr->flushCache(); purgeFDCache(); // if (idbdatafile::IDBPolicy::useHdfs()) // cacheutils::flushOIDsFromCache(oidsToFlush); return rc; } uint8_t 
WE_DDLCommandProc::dropPartitions(ByteStream& bs, std::string& err) { int rc = 0; uint32_t size, i; uint32_t tmp32; std::vector dataOids; std::vector partitions; bs >> size; for (i = 0; i < size; ++i) { bs >> tmp32; dataOids.push_back(tmp32); } bs >> size; BRM::PartitionInfo pi; for (i = 0; i < size; ++i) { pi.unserialize(bs); partitions.push_back(pi); } try { rc = fWEWrapper.deletePartitions(dataOids, partitions); } catch (...) { err = "WE: Error removing files "; rc = 1; } return rc; } void WE_DDLCommandProc::purgeFDCache() { if (idbdatafile::IDBPolicy::useHdfs()) { TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(SYSCOLUMN_BASE); ColsExtsInfoMap colsExtsInfoMap = aTbaleMetaData->getColsExtsInfoMap(); ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin(); ColExtsInfo::iterator aIt; std::vector files; BRM::FileInfo aFile; vector lbidList; BRM::LBID_t startLbid; while (it != colsExtsInfoMap.end()) { aIt = (it->second).begin(); aFile.oid = it->first; while (aIt != (it->second).end()) { aFile.partitionNum = aIt->partNum; aFile.dbRoot = aIt->dbRoot; aFile.segmentNum = aIt->segNum; aFile.compType = aIt->compType; files.push_back(aFile); fDbrm.lookupLocalStartLbid(aFile.oid, aFile.partitionNum, aFile.segmentNum, aIt->hwm, startLbid); // cout <<"Added to files oid:dbroot:part:seg:compType = " << // aFile.oid<<":"<