diff --git a/build/bootstrap_mcs.sh b/build/bootstrap_mcs.sh index 063cce2cd..a10cb6d52 100755 --- a/build/bootstrap_mcs.sh +++ b/build/bootstrap_mcs.sh @@ -4,6 +4,7 @@ # - the server's source code is two directories above the MCS engine source. # - the script is to be run under root. +set -o pipefail SCRIPT_LOCATION=$(dirname "$0") MDB_SOURCE_PATH=$(realpath $SCRIPT_LOCATION/../../../..) @@ -44,7 +45,7 @@ optparse.define short=G long=draw-deps desc="Draw dependencies graph" variable=D optparse.define short=M long=skip-smoke desc="Skip final smoke test" variable=SKIP_SMOKE default=false value=true optparse.define short=n long=no-clean-install desc="Do not perform a clean install (keep existing db files)" variable=NO_CLEAN default=false value=true optparse.define short=j long=parallel desc="Number of paralles for build" variable=CPUS default=$(getconf _NPROCESSORS_ONLN) -optparse.define short=F long=show-build-flags desc="Print CMake flags, while build" variable=PRINT_CMAKE_FLAGS default=false +optparse.define short=F long=show-build-flags desc="Print CMake flags, while build" variable=PRINT_CMAKE_FLAGS default=false value=true optparse.define short=c long=cloud desc="Enable cloud storage" variable=CLOUD_STORAGE_ENABLED default=false value=true optparse.define short=f long=do-not-freeze-revision desc="Disable revision freezing, or do not set 'update none' for columnstore submodule in MDB repository" variable=DO_NOT_FREEZE_REVISION default=false value=true @@ -69,6 +70,8 @@ if [[ $OS = 'Ubuntu' || $OS = 'Debian' ]]; then CONFIG_DIR="/etc/mysql/mariadb.conf.d" fi +export CLICOLOR_FORCE=1 + disable_git_restore_frozen_revision() { @@ -323,7 +326,8 @@ build() message "Configuring cmake silently" ${CMAKE_BIN_NAME} -DCMAKE_BUILD_TYPE=$MCS_BUILD_TYPE $MDB_CMAKE_FLAGS . | spinner message_split - ${CMAKE_BIN_NAME} --build . -j $CPUS && \ + + ${CMAKE_BIN_NAME} --build . -j $CPUS | onelinearizator && \ message "Installing silently" && ${CMAKE_BIN_NAME} --install . | spinner 30 diff --git a/build/utils.sh b/build/utils.sh index 9e9b72bf0..d367347d7 100644 --- a/build/utils.sh +++ b/build/utils.sh @@ -97,6 +97,13 @@ function spinner echo } +function onelinearizator +{ + while read data; do + echo -ne "\r\033[K$data" + done; + echo +} detect_distro() { diff --git a/dbcon/ddlpackageproc/altertableprocessor.cpp b/dbcon/ddlpackageproc/altertableprocessor.cpp index 5055ebb06..2863b092e 100644 --- a/dbcon/ddlpackageproc/altertableprocessor.cpp +++ b/dbcon/ddlpackageproc/altertableprocessor.cpp @@ -1899,7 +1899,7 @@ void AlterTableProcessor::renameTable(uint32_t sessionID, execplan::CalpontSyste boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); execplan::CalpontSystemCatalog::TableName tableName; - tableName.schema = fTableName.fSchema; + tableName.schema = ataRenameTable.fQualifiedName->fSchema; tableName.table = ataRenameTable.fQualifiedName->fName; execplan::CalpontSystemCatalog::ROPair roPair; roPair.objnum = 0; @@ -1924,6 +1924,7 @@ void AlterTableProcessor::renameTable(uint32_t sessionID, execplan::CalpontSyste bytestream << fTableName.fSchema; bytestream << fTableName.fName; bytestream << ataRenameTable.fQualifiedName->fName; + bytestream << ataRenameTable.fQualifiedName->fSchema; std::string errorMsg; uint16_t dbRoot; @@ -1997,6 +1998,7 @@ void AlterTableProcessor::renameTable(uint32_t sessionID, execplan::CalpontSyste bytestream << fTableName.fSchema; bytestream << fTableName.fName; bytestream << ataRenameTable.fQualifiedName->fName; + bytestream << ataRenameTable.fQualifiedName->fSchema; sysOid = 1021; // Find out where syscolumn is rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot); diff --git a/dbcon/ddlpackageproc/ddlpackageprocessor.cpp b/dbcon/ddlpackageproc/ddlpackageprocessor.cpp index 533274be8..6d6bff705 100644 --- a/dbcon/ddlpackageproc/ddlpackageprocessor.cpp +++ b/dbcon/ddlpackageproc/ddlpackageprocessor.cpp @@ -1127,148 +1127,6 @@ void DDLPackageProcessor::createWriteTruncateTableLogFile( throw std::runtime_error(errorMsg); } -#if 0 -void DDLPackageProcessor::createOpenTruncateTableLogFile(execplan::CalpontSystemCatalog::OID tableOid, execplan::CalpontSystemCatalog::TableName tableName) -{ - SUMMARY_INFO("DDLPackageProcessor::createOpenTruncateTableLogFile"); - //Build file name with tableOid. Currently, table oid is not returned and therefore not reused - string prefix, error; - config::Config* config = config::Config::makeConfig(); - prefix = config->getConfig("SystemConfig", "DBRMRoot"); - - if (prefix.length() == 0) - { - error = "Need a valid DBRMRoot entry in Calpont configuation file"; - throw std::runtime_error(error); - } - - uint64_t pos = prefix.find_last_of ("/") ; - - if (pos != string::npos) - { - fDDLLogFileName = prefix.substr(0, pos + 1); //Get the file path - } - else - { - error = "Cannot find the dbrm directory for the DDL log file"; - throw std::runtime_error(error); - - } - - std::ostringstream oss; - oss << tableOid; - fDDLLogFileName += "DDL_TRUNCATETABLE_Log_" + oss.str(); - fDDLLogFile.open(fDDLLogFileName.c_str(), ios::out); - - if (!fDDLLogFile) - { - error = "DDL truncate table log file cannot be created"; - throw std::runtime_error(error); - } -} - -void DDLPackageProcessor::removeIndexFiles(execplan::CalpontSystemCatalog::SCN txnID, - DDLResult& result, - execplan::CalpontSystemCatalog::IndexOIDList& idxOIDList) -{ - /* SUMMARY_INFO("DDLPackageProcessor::removeIndexFiles"); - - if (result.result != NO_ERROR) - return; - - int err = 0; - CalpontSystemCatalog::IndexOID idxOID; - CalpontSystemCatalog::IndexOIDList::const_iterator iter = idxOIDList.begin(); - std::string error; - try - { - while(iter != idxOIDList.end()) - { - idxOID = *iter; - if (idxOID.objnum < 3000 || idxOID.listOID < 3000) - { - ++iter; - continue; - } - err = -1; - if (err) - { - WErrorCodes ec; - error = "WE: Error removing index files: " + getFileName(idxOID.objnum) + ", " + getFileName(idxOID.listOID) + ". error = " + ec.errorString(err); - throw std::runtime_error(error); - } - - ++iter; - } - } - catch (std::exception& ex) - { - error = ex.what(); - throw std::runtime_error(error); - } - catch (...) - { - error = "Unknown exception caught"; - throw std::runtime_error(error); - } - */ -} - - - -void DDLPackageProcessor::updateSyscolumns(execplan::CalpontSystemCatalog::SCN txnID, - DDLResult& result, WriteEngine::RIDList& ridList, - WriteEngine::ColValueList& colValuesList, - WriteEngine::ColValueList& colOldValuesList) -{ - SUMMARY_INFO("DDLPackageProcessor::updateSyscolumns"); - - if (result.result != NO_ERROR) - return; - - WriteEngine::ColStructList colStructs; - WriteEngine::CSCTypesList cscColTypeList; - //std::vector colStructs; - WriteEngine::ColStruct colStruct; - execplan::CalpontSystemCatalog::ColType colType; - WriteEngine::DctnryStructList dctnryStructList; - WriteEngine::DctnryValueList dctnryValueList; - //Build column structure for COLUMNPOS_COL - colType.columnOID = colStruct.dataOid = OID_SYSCOLUMN_COLUMNPOS; - colType.colWidth = colStruct.colWidth = 4; - colStruct.tokenFlag = false; - colType.colDataType = colStruct.colDataType = CalpontSystemCatalog::INT; - colStructs.push_back(colStruct); - cscColTypeList.push_back(colType); - int error; - std::string err; - std::vector colOldValuesList1; - - try - { - //@Bug 3051 use updateColumnRecs instead of updateColumnRec to use different value for diffrent rows. - if (NO_ERROR != (error = fWriteEngine.updateColumnRecs( txnID, cscColTypeList, colStructs, colValuesList, ridList ))) - { - // build the logging message - WErrorCodes ec; - err = "WE: Failed on update SYSCOLUMN table. " + ec.errorString(error); - throw std::runtime_error(err); - } - } - catch (std::exception& ex) - { - err = ex.what(); - throw std::runtime_error(err); - } - catch (...) - { - err = "updateSyscolumns:Unknown exception caught"; - throw std::runtime_error(err); - } - -} - -#endif void DDLPackageProcessor::returnOIDs(execplan::CalpontSystemCatalog::RIDList& ridList, execplan::CalpontSystemCatalog::DictOIDList& dictOIDList) { diff --git a/mysql-test/columnstore/basic/r/mcol4202-crossdb-rename.result b/mysql-test/columnstore/basic/r/mcol4202-crossdb-rename.result new file mode 100644 index 000000000..134b90f9a --- /dev/null +++ b/mysql-test/columnstore/basic/r/mcol4202-crossdb-rename.result @@ -0,0 +1,24 @@ +DROP DATABASE IF EXISTS mcol4202_db1; +DROP DATABASE IF EXISTS mcol4202_db2; +CREATE DATABASE mcol4202_db1; +CREATE DATABASE mcol4202_db2; +CREATE TABLE mcol4202_db1.t1 (id INT) ENGINE=Columnstore; +INSERT INTO mcol4202_db1.t1 values (111); +RENAME TABLE mcol4202_db1.t1 TO mcol4202_db2.t1; +SELECT * FROM mcol4202_db2.t1; +id +111 +USE mcol4202_db2; +INSERT INTO t1 values (222); +SELECT * FROM t1; +id +111 +222 +RENAME TABLE mcol4202_db2.t1 to mcol4202_db1.t33333; +USE mcol4202_db1; +SELECT * from t33333; +id +111 +222 +DROP DATABASE mcol4202_db1; +DROP DATABASE mcol4202_db2; diff --git a/mysql-test/columnstore/basic/r/mcs12_alter_table.result b/mysql-test/columnstore/basic/r/mcs12_alter_table.result index 1d01032bf..62a91fe61 100644 --- a/mysql-test/columnstore/basic/r/mcs12_alter_table.result +++ b/mysql-test/columnstore/basic/r/mcs12_alter_table.result @@ -69,8 +69,7 @@ a c1 3 c 4 d ALTER TABLE mcs12_db1.t1 RENAME TO mcs12_db2.t1; -ERROR HY000: Internal error: CAL0001: Alter table Failed: The new tablename is already in use. -DROP TABLE t1; +DROP TABLE mcs12_db2.t1; CREATE TABLE t1 (i INTEGER) ENGINE=Columnstore; ALTER TABLE t1 ADD COLUMN (c CHAR(10)); INSERT INTO t1 VALUES (1, 'a'),(2, 'b'); diff --git a/mysql-test/columnstore/basic/t/mcol4202-crossdb-rename.test b/mysql-test/columnstore/basic/t/mcol4202-crossdb-rename.test new file mode 100644 index 000000000..c416a5e2a --- /dev/null +++ b/mysql-test/columnstore/basic/t/mcol4202-crossdb-rename.test @@ -0,0 +1,27 @@ +# +# Test ALTER TABLE schemas in various possible ways. +# +-- source ../include/have_columnstore.inc +-- source include/have_innodb.inc + +--disable_warnings +DROP DATABASE IF EXISTS mcol4202_db1; +DROP DATABASE IF EXISTS mcol4202_db2; +--enable_warnings + +CREATE DATABASE mcol4202_db1; +CREATE DATABASE mcol4202_db2; +CREATE TABLE mcol4202_db1.t1 (id INT) ENGINE=Columnstore; +INSERT INTO mcol4202_db1.t1 values (111); +RENAME TABLE mcol4202_db1.t1 TO mcol4202_db2.t1; +SELECT * FROM mcol4202_db2.t1; +USE mcol4202_db2; +INSERT INTO t1 values (222); +SELECT * FROM t1; +RENAME TABLE mcol4202_db2.t1 to mcol4202_db1.t33333; +USE mcol4202_db1; +SELECT * from t33333; + + +DROP DATABASE mcol4202_db1; +DROP DATABASE mcol4202_db2; diff --git a/mysql-test/columnstore/basic/t/mcs12_alter_table.test b/mysql-test/columnstore/basic/t/mcs12_alter_table.test index fd007bdfc..f1f67b5f8 100644 --- a/mysql-test/columnstore/basic/t/mcs12_alter_table.test +++ b/mysql-test/columnstore/basic/t/mcs12_alter_table.test @@ -37,12 +37,10 @@ ALTER TABLE t2 RENAME TO mcs12_db1.t1; --replace_regex /( COLLATE=latin1_swedish_ci)// SHOW CREATE TABLE t1; SELECT * FROM t1; -# Cross db rename errors in CS but not in innodb. Need to check. ---error 1815 ALTER TABLE mcs12_db1.t1 RENAME TO mcs12_db2.t1; # Add column -DROP TABLE t1; +DROP TABLE mcs12_db2.t1; CREATE TABLE t1 (i INTEGER) ENGINE=Columnstore; ALTER TABLE t1 ADD COLUMN (c CHAR(10)); INSERT INTO t1 VALUES (1, 'a'),(2, 'b'); diff --git a/writeengine/server/we_ddlcommandproc.cpp b/writeengine/server/we_ddlcommandproc.cpp index e5aed232e..c264ac819 100644 --- a/writeengine/server/we_ddlcommandproc.cpp +++ b/writeengine/server/we_ddlcommandproc.cpp @@ -36,7 +36,7 @@ using namespace ddlpackage; #include #include "dataconvert.h" using namespace dataconvert; -//#include "we_brm.h" +// #include "we_brm.h" namespace fs = boost::filesystem; #include "cacheutils.h" #include "IDBDataFile.h" @@ -2366,23 +2366,14 @@ uint8_t WE_DDLCommandProc::updateSyscolumnNextvalCol(ByteStream& bs, std::string return rc; } -uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string& err) +uint8_t WE_DDLCommandProc::updateSystableEntryForSysColumn(int32_t sessionID, uint32_t txnID, + const DDLColumn& column, const std::string& value, + const std::string& oldValue, + execplan::CalpontSystemCatalog::RIDList& roList, + std::string& err) { int rc = 0; - uint32_t sessionID, tmp32; - std::string schema, oldTablename, newTablename; - int txnID; - bs >> sessionID; - bs >> tmp32; - txnID = tmp32; - bs >> schema; - bs >> oldTablename; - bs >> newTablename; - - CalpontSystemCatalog::TableName tableName; - tableName.schema = schema; - tableName.table = oldTablename; WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DctnryValueList dctnryValueList; WriteEngine::DctColTupleList dctRowList; @@ -2395,33 +2386,13 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string& uint16_t segment; uint32_t partition; - CalpontSystemCatalog::RIDList roList; - boost::shared_ptr systemCatalogPtr = - CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); - systemCatalogPtr->identity(CalpontSystemCatalog::EC); - - try - { - roList = systemCatalogPtr->columnRIDs(tableName); - } - catch (std::exception& ex) - { - err = ex.what(); - rc = 1; - return rc; - } - - // Build colStructs for SYSTABLE std::vector ridList; WriteEngine::ColValueList colValuesList; WriteEngine::ColTupleList aColList; WriteEngine::ColStructList colStructs; WriteEngine::CSCTypesList cscColTypeList; std::vector colOldValuesList; - tableName.schema = CALPONT_SCHEMA; - tableName.table = SYSCOLUMN_TABLE; - DDLColumn column; - findColumnData(sessionID, tableName, TABLENAME_COL, column); + WriteEngine::ColStruct colStruct; WriteEngine::DctnryStruct dctnryStruct; WriteEngine::DctnryTuple dictTuple; @@ -2491,8 +2462,8 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string& // It's the same string for each column, so we just need one dictionary struct void* dictTuplePtr = static_cast(&dictTuple); memset(dictTuplePtr, 0, sizeof(dictTuple)); - dictTuple.sigValue = (unsigned char*)newTablename.c_str(); - dictTuple.sigSize = newTablename.length(); + dictTuple.sigValue = (unsigned char*)value.c_str(); + dictTuple.sigSize = value.length(); dictTuple.isNull = false; dctColList = dictTuple; dctRowList.push_back(dctColList); @@ -2553,7 +2524,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string& if (rc != NO_ERROR) { // build the logging message - err = "WE: Update failed on: " + tableName.table; + err = "WE: Update failed on: " + value; } int rc1 = 0; @@ -2580,6 +2551,55 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string& if (rc == 0) rc = rc1; + return rc; +} + +uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string& err) +{ + int rc = 0; + uint32_t sessionID, txnID; + std::string schema, oldTablename, newTablename, newSchema; + + bs >> sessionID; + bs >> txnID; + bs >> schema; + bs >> oldTablename; + bs >> newTablename; + bs >> newSchema; + + CalpontSystemCatalog::TableName tableName; + tableName.schema = schema; + tableName.table = oldTablename; + + CalpontSystemCatalog::RIDList roList; + boost::shared_ptr systemCatalogPtr = + CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); + systemCatalogPtr->identity(CalpontSystemCatalog::EC); + + try + { + roList = systemCatalogPtr->columnRIDs(tableName); + } + catch (std::exception& ex) + { + err = ex.what(); + rc = 1; + return rc; + } + + // Build colStructs for SYSTABLE + tableName.schema = CALPONT_SCHEMA; + tableName.table = SYSCOLUMN_TABLE; + DDLColumn column; + findColumnData(sessionID, tableName, TABLENAME_COL, column); + rc = updateSystableEntryForSysColumn(sessionID, txnID, column, newTablename, oldTablename, roList, err); + + if (newSchema != schema && rc == NO_ERROR) + { + findColumnData(sessionID, tableName, SCHEMA_COL, column); + rc = updateSystableEntryForSysColumn(sessionID, txnID, column, newSchema, schema, roList, err); + } + systemCatalogPtr->flushCache(); purgeFDCache(); // if (idbdatafile::IDBPolicy::useHdfs()) @@ -2771,56 +2791,12 @@ uint8_t WE_DDLCommandProc::updateSystableAuto(ByteStream& bs, std::string& err) return rc; } -uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string& err) +uint8_t WE_DDLCommandProc::updateSystableEntryForSysTable(int32_t sessionID, uint32_t txnID, + const DDLColumn& column, const std::string& value, + const std::string& oldValue, + CalpontSystemCatalog::ROPair ropair, + std::string& err) { - int rc = 0; - uint32_t sessionID, tmp32; - std::string schema, oldTablename, newTablename; - int txnID; - - bs >> sessionID; - bs >> tmp32; - txnID = tmp32; - bs >> schema; - bs >> oldTablename; - bs >> newTablename; - - CalpontSystemCatalog::TableName tableName; - tableName.schema = schema; - tableName.table = oldTablename; - WriteEngine::DctnryStructList dctnryStructList; - WriteEngine::DctnryValueList dctnryValueList; - WriteEngine::DctColTupleList dctRowList; - WriteEngine::DctnryTuple dctColList; - - uint16_t dbRoot = 0; - uint16_t segment; - uint32_t partition; - - CalpontSystemCatalog::ROPair ropair; - boost::shared_ptr systemCatalogPtr = - CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); - systemCatalogPtr->identity(CalpontSystemCatalog::EC); - - try - { - ropair = systemCatalogPtr->tableRID(tableName); - } - catch (std::exception& ex) - { - err = ex.what(); - rc = 1; - return rc; - } - - if (ropair.objnum < 0) - { - err = "No such table: " + tableName.table; - return 1; - } - - // now we have to prepare the various structures for the WE to update the column. - std::vector ridList; WriteEngine::ColValueList colValuesList; WriteEngine::ColTupleList aColList; @@ -2828,19 +2804,16 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string& WriteEngine::CSCTypesList cscColTypeList; std::vector colOldValuesList; std::map oids; - // std::vector oidsToFlush; - boost::any datavalue; - datavalue = newTablename; WriteEngine::ColTuple colTuple; - - // Build colStructs for SYSTABLE - tableName.schema = CALPONT_SCHEMA; - tableName.table = SYSTABLE_TABLE; - DDLColumn column; - findColumnData(sessionID, tableName, TABLENAME_COL, column); WriteEngine::ColStruct colStruct; WriteEngine::DctnryStruct dctnryStruct; + + WriteEngine::DctnryStructList dctnryStructList; + WriteEngine::DctnryValueList dctnryValueList; + WriteEngine::DctColTupleList dctRowList; + WriteEngine::DctnryTuple dctColList; + colStruct.dataOid = column.oid; colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth; colStruct.tokenFlag = true; @@ -2852,8 +2825,8 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string& dictStruct.columnOid = column.colType.columnOID; WriteEngine::DctnryTuple dictTuple; dictTuple.isNull = false; - dictTuple.sigValue = (unsigned char*)newTablename.c_str(); - dictTuple.sigSize = newTablename.length(); + dictTuple.sigValue = (unsigned char*)value.c_str(); + dictTuple.sigSize = value.length(); if (idbdatafile::IDBPolicy::useHdfs()) { @@ -2899,6 +2872,11 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string& std::vector extentsinfo; extentInfo aExtentinfo; CalpontSystemCatalog::OID oid = 1003; + + uint16_t dbRoot = 0; + uint16_t segment; + uint32_t partition; + convertRidToColumn(ropair.rid, dbRoot, partition, segment, oid); ridList.push_back(ropair.rid); @@ -2934,13 +2912,14 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string& fWEWrapper.setBulkFlag(false); fWEWrapper.startTransaction(txnID); - rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList, - ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE); + int rc = + fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList, + ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE); if (rc != NO_ERROR) { // build the logging message - err = "WE: Update failed on: " + tableName.table; + err = "WE: Update failed on: " + oldValue; int rc1 = 0; if (idbdatafile::IDBPolicy::useHdfs()) @@ -2993,20 +2972,14 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string& if (rc == 0) rc = rc1; - systemCatalogPtr->flushCache(); - purgeFDCache(); - // if (idbdatafile::IDBPolicy::useHdfs()) - // cacheutils::flushOIDsFromCache(oidsToFlush); - // cout << "rename:syscolumn is updated" << endl; return rc; } -uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string& err) +uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string& err) { - int rc = 0; - uint32_t sessionID, tmp32; - std::string schema, oldTablename, newTablename; - int txnID; + uint8_t rc; + uint32_t sessionID, tmp32, txnID; + std::string schema, oldTablename, newTablename, newSchema; bs >> sessionID; bs >> tmp32; @@ -3014,26 +2987,17 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string& bs >> schema; bs >> oldTablename; bs >> newTablename; + bs >> newSchema; CalpontSystemCatalog::TableName tableName; tableName.schema = schema; tableName.table = oldTablename; - WriteEngine::DctnryStructList dctnryStructList; - WriteEngine::DctnryValueList dctnryValueList; - WriteEngine::DctColTupleList dctRowList; - WriteEngine::DctnryTuple dctColList; - - uint16_t dbRoot = 0; - uint16_t segment; - uint32_t partition; CalpontSystemCatalog::ROPair ropair; boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); - //@bug 4592 Error handling for syscat call - try { ropair = systemCatalogPtr->tableRID(tableName); @@ -3053,361 +3017,20 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string& // now we have to prepare the various structures for the WE to update the column. - std::vector ridList; - WriteEngine::ColValueList colValuesList; - WriteEngine::ColTupleList aColList; - WriteEngine::ColStructList colStructs; - WriteEngine::CSCTypesList cscColTypeList; - std::vector colOldValuesList; - std::map oids; - // std::vector oidsToFlush; - boost::any datavalue; - datavalue = newTablename; - - WriteEngine::ColTuple colTuple; - // Build colStructs for SYSTABLE tableName.schema = CALPONT_SCHEMA; tableName.table = SYSTABLE_TABLE; DDLColumn column; findColumnData(sessionID, tableName, TABLENAME_COL, column); - WriteEngine::ColStruct colStruct; - WriteEngine::DctnryStruct dctnryStruct; - colStruct.dataOid = column.oid; - colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth; - colStruct.tokenFlag = true; - colStruct.colDataType = column.colType.colDataType; - // Tokenize the data value - WriteEngine::DctnryStruct dictStruct; - dictStruct.dctnryOid = column.colType.ddn.dictOID; - dictStruct.columnOid = column.colType.columnOID; - WriteEngine::DctnryTuple dictTuple; - dictTuple.isNull = false; - dictTuple.sigValue = (unsigned char*)newTablename.c_str(); - dictTuple.sigSize = newTablename.length(); - // int error = NO_ERROR; - // if (NO_ERROR != (error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple))) - //{ - // WErrorCodes ec; - // throw std::runtime_error("WE: Tokenization failed " + ec.errorString(error)); - //} - // WriteEngine::Token aToken = dictTuple.token; + rc = updateSystableEntryForSysTable(sessionID, txnID, column, newTablename, oldTablename, ropair, err); - // colTuple.data = aToken; - // cout << "token value for new table name is op:fbo = " << aToken.op <<":" << aToken.fbo << " null flag = " - // << (uint32_t)dictTuple.isNull<< endl; - if (idbdatafile::IDBPolicy::useHdfs()) + if (newSchema != schema && rc == NO_ERROR) { - colStruct.fCompressionType = 2; - dctnryStruct.fCompressionType = 2; + findColumnData(sessionID, tableName, SCHEMA_COL, column); + rc = updateSystableEntryForSysTable(sessionID, txnID, column, newSchema, schema, ropair, err); } - if (colStruct.tokenFlag) - { - dctnryStruct.dctnryOid = column.colType.ddn.dictOID; - dctnryStruct.fCharsetNumber = column.colType.charsetNumber; - dctnryStruct.columnOid = colStruct.dataOid; - } - else - { - dctnryStruct.dctnryOid = 0; - dctnryStruct.columnOid = colStruct.dataOid; - } - - colStructs.push_back(colStruct); - dctnryStructList.push_back(dctnryStruct); - oids[colStruct.dataOid] = colStruct.dataOid; - cscColTypeList.push_back(column.colType); - - // oidsToFlush.push_back(colStruct.dataOid); - if (dctnryStruct.dctnryOid > 0) - { - oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid; - // oidsToFlush.push_back(dctnryStruct.dctnryOid); - } - - aColList.push_back(colTuple); - colValuesList.push_back(aColList); - std::vector colExtentsStruct; - std::vector dctnryExtentsStruct; - std::vector colExtentsColType; - - dctColList = dictTuple; - dctRowList.push_back(dctColList); - dctnryValueList.push_back(dctRowList); - - // In this case, there's only 1 row, so only one one extent, but keep it generic... - std::vector extentsinfo; - extentInfo aExtentinfo; - CalpontSystemCatalog::OID oid = 1003; - convertRidToColumn(ropair.rid, dbRoot, partition, segment, oid); - - ridList.push_back(ropair.rid); - std::vector ridLists; - ridLists.push_back(ridList); - aExtentinfo.dbRoot = dbRoot; - aExtentinfo.partition = partition; - aExtentinfo.segment = segment; - - extentsinfo.push_back(aExtentinfo); - - // build colExtentsStruct - for (unsigned i = 0; i < extentsinfo.size(); i++) - { - for (unsigned j = 0; j < colStructs.size(); j++) - { - colStructs[j].fColPartition = extentsinfo[i].partition; - colStructs[j].fColSegment = extentsinfo[i].segment; - colStructs[j].fColDbRoot = extentsinfo[i].dbRoot; - dctnryStructList[j].fColPartition = extentsinfo[i].partition; - dctnryStructList[j].fColSegment = extentsinfo[i].segment; - dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot; - } - - colExtentsStruct.push_back(colStructs); - dctnryExtentsStruct.push_back(dctnryStructList); - colExtentsColType.push_back(cscColTypeList); - } - - // call the write engine to update the row - fWEWrapper.setTransId(txnID); - fWEWrapper.setIsInsert(false); - fWEWrapper.setBulkFlag(false); - fWEWrapper.startTransaction(txnID); - - rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList, - ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE); - - if (rc != NO_ERROR) - { - // build the logging message - err = "WE: Update failed on: " + tableName.table; - int rc1 = 0; - - if (idbdatafile::IDBPolicy::useHdfs()) - { - rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids); - - if ((rc == 0) && (rc1 == 0)) - { - rc1 = fWEWrapper.confirmTransaction(txnID); - - if (rc1 == NO_ERROR) - rc1 = fWEWrapper.endTransaction(txnID, true); - else - fWEWrapper.endTransaction(txnID, false); - } - else - { - fWEWrapper.endTransaction(txnID, false); - } - } - - if (rc == 0) - rc = rc1; - - if (rc != 0) - return rc; - } - - // cout << "rename:systable is updated to " << newTablename << " for rid " << ropair.rid << endl; - // Update SYSCOLUMN table - tableName.schema = schema; - tableName.table = oldTablename; - dctnryStructList.clear(); - dctnryValueList.clear(); - dctRowList.clear(); - - CalpontSystemCatalog::RIDList roList; - - try - { - roList = systemCatalogPtr->columnRIDs(tableName); - } - catch (std::exception& ex) - { - err = ex.what(); - rc = 1; - return rc; - } - - // Build colStructs for SYSCOLUMN - ridList.clear(); - colValuesList.clear(); - aColList.clear(); - colStructs.clear(); - cscColTypeList.clear(); - colOldValuesList.clear(); - oids.clear(); - tableName.schema = CALPONT_SCHEMA; - tableName.table = SYSCOLUMN_TABLE; - findColumnData(sessionID, tableName, TABLENAME_COL, column); - - colStruct.dataOid = column.oid; - colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth; - colStruct.tokenFlag = false; - - if ((column.colType.colDataType == CalpontSystemCatalog::CHAR && column.colType.colWidth > 8) || - (column.colType.colDataType == CalpontSystemCatalog::VARCHAR && column.colType.colWidth > 7) || - (column.colType.colDataType == CalpontSystemCatalog::VARBINARY && column.colType.colWidth > 7) || - (column.colType.colDataType == CalpontSystemCatalog::BLOB && column.colType.colWidth > 7) || - (column.colType.colDataType == CalpontSystemCatalog::TEXT && column.colType.colWidth > 7) || - (column.colType.colDataType == CalpontSystemCatalog::DECIMAL && column.colType.precision > 18) || - (column.colType.colDataType == CalpontSystemCatalog::UDECIMAL && - column.colType.precision > 18)) // token - { - colStruct.colWidth = 8; - colStruct.tokenFlag = true; - } - else - { - colStruct.colWidth = column.colType.colWidth; - } - - colStruct.colDataType = column.colType.colDataType; - - // Tokenize the data value - dictStruct.dctnryOid = column.colType.ddn.dictOID; - dictStruct.columnOid = column.colType.columnOID; - dictTuple.sigValue = (unsigned char*)newTablename.c_str(); - dictTuple.sigSize = newTablename.length(); - dictTuple.isNull = false; - /* - if (NO_ERROR != (error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple))) - { - WErrorCodes ec; - throw std::runtime_error("WE: Tokenization failed " + ec.errorString(error)); - } - aToken = dictTuple.token; - colTuple.data = aToken; */ - - colStruct.colDataType = column.colType.colDataType; - - if (idbdatafile::IDBPolicy::useHdfs()) - { - colStruct.fCompressionType = 2; - dctnryStruct.fCompressionType = 2; - } - - if (colStruct.tokenFlag) - { - dctnryStruct.dctnryOid = column.colType.ddn.dictOID; - dctnryStruct.fCharsetNumber = column.colType.charsetNumber; - dctnryStruct.columnOid = colStruct.dataOid; - } - else - { - dctnryStruct.dctnryOid = 0; - dctnryStruct.columnOid = colStruct.dataOid; - } - - oids[colStruct.dataOid] = colStruct.dataOid; - - // oidsToFlush.push_back(colStruct.dataOid); - if (dctnryStruct.dctnryOid > 0) - { - oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid; - // oidsToFlush.push_back(dctnryStruct.dctnryOid); - } - - colStructs.push_back(colStruct); - dctnryStructList.push_back(dctnryStruct); - cscColTypeList.push_back(column.colType); - - for (unsigned int i = 0; i < roList.size(); i++) - { - aColList.push_back(colTuple); - } - - colValuesList.push_back(aColList); - - // It's the same string for each column, so we just need one dictionary struct - void* dictTuplePtr = static_cast(&dictTuple); - memset(dictTuplePtr, 0, sizeof(dictTuple)); - dictTuple.sigValue = (unsigned char*)newTablename.c_str(); - dictTuple.sigSize = newTablename.length(); - dictTuple.isNull = false; - dctColList = dictTuple; - dctRowList.push_back(dctColList); - dctnryValueList.push_back(dctRowList); - extentsinfo.clear(); - colExtentsStruct.clear(); - colExtentsColType.clear(); - dctnryExtentsStruct.clear(); - oid = 1021; - - for (unsigned int i = 0; i < roList.size(); i++) - { - convertRidToColumn(roList[i].rid, dbRoot, partition, segment, oid); - - aExtentinfo.dbRoot = dbRoot; - aExtentinfo.partition = partition; - aExtentinfo.segment = segment; - - if (extentsinfo.empty()) - extentsinfo.push_back(aExtentinfo); - else if (extentsinfo.back() != aExtentinfo) - extentsinfo.push_back(aExtentinfo); - - ridList.push_back(roList[i].rid); - } - - ridLists.clear(); - ridLists.push_back(ridList); - - // build colExtentsStruct - for (unsigned i = 0; i < extentsinfo.size(); i++) - { - for (unsigned j = 0; j < colStructs.size(); j++) - { - colStructs[j].fColPartition = extentsinfo[i].partition; - colStructs[j].fColSegment = extentsinfo[i].segment; - colStructs[j].fColDbRoot = extentsinfo[i].dbRoot; - dctnryStructList[j].fColPartition = extentsinfo[i].partition; - dctnryStructList[j].fColSegment = extentsinfo[i].segment; - dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot; - } - - colExtentsStruct.push_back(colStructs); - dctnryExtentsStruct.push_back(dctnryStructList); - colExtentsColType.push_back(cscColTypeList); - } - - // call the write engine to update the row - rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList, - ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE); - - if (rc != NO_ERROR) - { - // build the logging message - err = "WE: Update failed on: " + tableName.table; - } - - int rc1 = 0; - - if (idbdatafile::IDBPolicy::useHdfs()) - { - rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids); - - if ((rc == 0) && (rc1 == 0)) - { - rc1 = fWEWrapper.confirmTransaction(txnID); - - if (rc1 == NO_ERROR) - rc1 = fWEWrapper.endTransaction(txnID, true); - else - fWEWrapper.endTransaction(txnID, false); - } - else - { - fWEWrapper.endTransaction(txnID, false); - } - } - - if (rc == 0) - rc = rc1; - systemCatalogPtr->flushCache(); purgeFDCache(); // if (idbdatafile::IDBPolicy::useHdfs()) diff --git a/writeengine/server/we_ddlcommandproc.h b/writeengine/server/we_ddlcommandproc.h index c42edad30..dbd93859d 100644 --- a/writeengine/server/we_ddlcommandproc.h +++ b/writeengine/server/we_ddlcommandproc.h @@ -32,8 +32,11 @@ #define EXPORT + namespace WriteEngine { +struct DDLColumn; + class WE_DDLCommandProc { public: @@ -43,9 +46,9 @@ class WE_DDLCommandProc DROPPART_LOG, TRUNCATE_LOG }; - EXPORT WE_DDLCommandProc(); - EXPORT WE_DDLCommandProc(const WE_DDLCommandProc& rhs); - EXPORT ~WE_DDLCommandProc(); + WE_DDLCommandProc(); + WE_DDLCommandProc(const WE_DDLCommandProc& rhs); + ~WE_DDLCommandProc(); /** @brief Update SYSCOLUMN nextval column for the columnoid with nextVal. * * Update SYSCOLUMN nextval column for the columnoid with nexValue. @@ -53,35 +56,36 @@ class WE_DDLCommandProc * @param nextVal (in) The partition number * @return 0 on success, non-0 on error. */ - EXPORT uint8_t updateSyscolumnNextval(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t writeSystable(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t writeSyscolumn(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t writeCreateSyscolumn(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t createtablefiles(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t commitVersion(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t rollbackBlocks(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t rollbackVersion(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t deleteSyscolumn(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t deleteSyscolumnRow(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t deleteSystable(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t deleteSystables(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t dropFiles(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t updateSyscolumnAuto(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t updateSyscolumnNextvalCol(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t updateSyscolumnTablename(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t updateSystableAuto(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t updateSystableTablename(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t updateSystablesTablename(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t updateSyscolumnColumnposCol(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t fillNewColumn(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t updateSyscolumnRenameColumn(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t updateSyscolumnSetDefault(messageqcpp::ByteStream& bs, std::string& err); - // EXPORT uint8_t updateSyscolumn(messageqcpp::ByteStream& bs, std::string & err); - EXPORT uint8_t writeTruncateLog(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t writeDropPartitionLog(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t writeDropTableLog(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t deleteDDLLog(messageqcpp::ByteStream& bs, std::string& err); - EXPORT uint8_t fetchDDLLog(messageqcpp::ByteStream& bs, std::string& err); + uint8_t updateSyscolumnNextval(messageqcpp::ByteStream& bs, std::string& err); + uint8_t writeSystable(messageqcpp::ByteStream& bs, std::string& err); + uint8_t writeSyscolumn(messageqcpp::ByteStream& bs, std::string& err); + uint8_t writeCreateSyscolumn(messageqcpp::ByteStream& bs, std::string& err); + uint8_t createtablefiles(messageqcpp::ByteStream& bs, std::string& err); + uint8_t commitVersion(messageqcpp::ByteStream& bs, std::string& err); + uint8_t rollbackBlocks(messageqcpp::ByteStream& bs, std::string& err); + uint8_t rollbackVersion(messageqcpp::ByteStream& bs, std::string& err); + uint8_t deleteSyscolumn(messageqcpp::ByteStream& bs, std::string& err); + uint8_t deleteSyscolumnRow(messageqcpp::ByteStream& bs, std::string& err); + uint8_t deleteSystable(messageqcpp::ByteStream& bs, std::string& err); + uint8_t deleteSystables(messageqcpp::ByteStream& bs, std::string& err); + uint8_t dropFiles(messageqcpp::ByteStream& bs, std::string& err); + uint8_t updateSyscolumnAuto(messageqcpp::ByteStream& bs, std::string& err); + uint8_t updateSyscolumnNextvalCol(messageqcpp::ByteStream& bs, std::string& err); + uint8_t updateSystableAuto(messageqcpp::ByteStream& bs, std::string& err); + + uint8_t updateSyscolumnTablename(messageqcpp::ByteStream& bs, std::string& err); + uint8_t updateSystableTablename(messageqcpp::ByteStream& bs, std::string& err); + + uint8_t updateSyscolumnColumnposCol(messageqcpp::ByteStream& bs, std::string& err); + uint8_t fillNewColumn(messageqcpp::ByteStream& bs, std::string& err); + uint8_t updateSyscolumnRenameColumn(messageqcpp::ByteStream& bs, std::string& err); + uint8_t updateSyscolumnSetDefault(messageqcpp::ByteStream& bs, std::string& err); + // uint8_t updateSyscolumn(messageqcpp::ByteStream& bs, std::string & err); + uint8_t writeTruncateLog(messageqcpp::ByteStream& bs, std::string& err); + uint8_t writeDropPartitionLog(messageqcpp::ByteStream& bs, std::string& err); + uint8_t writeDropTableLog(messageqcpp::ByteStream& bs, std::string& err); + uint8_t deleteDDLLog(messageqcpp::ByteStream& bs, std::string& err); + uint8_t fetchDDLLog(messageqcpp::ByteStream& bs, std::string& err); void purgeFDCache(); /** @brief drop a set of partitions * @@ -90,7 +94,7 @@ class WE_DDLCommandProc * @param err (out) error message when error occurs * @return 0 on success, otherwise error. */ - EXPORT uint8_t dropPartitions(messageqcpp::ByteStream& bs, std::string& err); + uint8_t dropPartitions(messageqcpp::ByteStream& bs, std::string& err); inline void convertRidToColumn(uint64_t& rid, uint16_t& dbRoot, uint32_t& partition, uint16_t& segment, const int32_t oid) { @@ -111,6 +115,24 @@ class WE_DDLCommandProc rid = relRidInThisExtent + numExtentsInThisSegPart * extentRows; } +private: +uint8_t updateSystableEntryForSysTable(int32_t sessionID, + uint32_t txnID, + const DDLColumn& column, + const std::string& value, + const std::string& oldValue, + execplan::CalpontSystemCatalog::ROPair ropair, + std::string& err); + +uint8_t updateSystableEntryForSysColumn(int32_t sessionID, + uint32_t txnID, + const DDLColumn& column, + const std::string& value, + const std::string& oldValue, + execplan::CalpontSystemCatalog::RIDList& roList, + std::string& err); + + private: WriteEngineWrapper fWEWrapper; BRM::DBRM fDbrm; diff --git a/writeengine/server/we_messages.h b/writeengine/server/we_messages.h index acf613664..15c1f916e 100644 --- a/writeengine/server/we_messages.h +++ b/writeengine/server/we_messages.h @@ -48,7 +48,6 @@ enum ServerMessages WE_SVR_UPDATE_SYSCOLUMN_COLPOS, WE_SVR_UPDATE_SYSCOLUMN_RENAMECOLUMN, WE_SVR_UPDATE_SYSTABLE_TABLENAME, - WE_SVR_UPDATE_SYSTABLES_TABLENAME, WE_SVR_DROP_PARTITIONS, WE_SVR_SINGLE_INSERT, WE_SVR_BATCH_KEEPALIVE, diff --git a/writeengine/server/we_readthread.cpp b/writeengine/server/we_readthread.cpp index 9da77a3a2..8f6eb600d 100644 --- a/writeengine/server/we_readthread.cpp +++ b/writeengine/server/we_readthread.cpp @@ -325,12 +325,6 @@ void DmlReadThread::operator()() break; } - case WE_SVR_UPDATE_SYSTABLES_TABLENAME: - { - rc = fWeDDLprocessor->updateSystablesTablename(ibs, errMsg); - break; - } - case WE_SVR_FILL_COLUMN: { rc = fWeDDLprocessor->fillNewColumn(ibs, errMsg);