/* Copyright (C) 2014 InfiniDB, Inc. Copyright (C) 2016 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // $Id: altertableprocessor.cpp 9744 2013-08-07 03:32:19Z bwilkinson $ /** @file */ #include #include #include #include using namespace std; #include #include #include #include "altertableprocessor.h" #include "brm.h" using namespace BRM; #include "calpontsystemcatalog.h" using namespace execplan; #include "ddlpkg.h" using namespace ddlpackage; #include "sqllogger.h" #include "messagelog.h" using namespace logging; #include "we_messages.h" #include "we_ddlcommandclient.h" using namespace WriteEngine; #include "oamcache.h" using namespace oam; #include "bytestream.h" using namespace messageqcpp; #include "cacheutils.h" using namespace cacheutils; #include "IDBDataFile.h" #include "IDBPolicy.h" using namespace idbdatafile; //TODO: this should be in a common header somewhere struct extentInfo { uint16_t dbRoot; uint32_t partition; uint16_t segment; bool operator==(const extentInfo& rhs) const { return (dbRoot == rhs.dbRoot && partition == rhs.partition && segment == rhs.segment); } bool operator!=(const extentInfo& rhs) const { return !(*this == rhs); } }; namespace { bool typesAreSame(const CalpontSystemCatalog::ColType& colType, const ColumnType& newType) { switch (colType.colDataType) { case (CalpontSystemCatalog::BIT): if (newType.fType == DDL_BIT) return true; break; case (CalpontSystemCatalog::TINYINT): if (newType.fType == DDL_TINYINT && colType.precision == newType.fPrecision && colType.scale == newType.fScale) return true; //@Bug 5443 Not allow user change data type. //Not sure this is possible... //if (newType.fType == DDL_DECIMAL && colType.precision == newType.fPrecision && // colType.scale == newType.fScale) return true; break; case (CalpontSystemCatalog::UTINYINT): if (newType.fType == DDL_UNSIGNED_TINYINT && colType.precision == newType.fPrecision && colType.scale == newType.fScale) return true; break; case (CalpontSystemCatalog::CHAR): if (newType.fType == DDL_CHAR && colType.colWidth == newType.fLength) return true; break; case (CalpontSystemCatalog::SMALLINT): if (newType.fType == DDL_SMALLINT && colType.precision == newType.fPrecision && colType.scale == newType.fScale) return true; //if (newType.fType == DDL_DECIMAL && colType.precision == newType.fPrecision && // colType.scale == newType.fScale) return true; break; case (CalpontSystemCatalog::USMALLINT): if (newType.fType == DDL_UNSIGNED_SMALLINT && colType.precision == newType.fPrecision && colType.scale == newType.fScale) return true; break; case (CalpontSystemCatalog::DECIMAL): if ((newType.fType == DDL_DECIMAL || newType.fType == DDL_NUMERIC) && colType.precision == newType.fPrecision && colType.scale == newType.fScale) return true; break; case (CalpontSystemCatalog::UDECIMAL): if ((newType.fType == DDL_UNSIGNED_DECIMAL || newType.fType == DDL_UNSIGNED_NUMERIC) && colType.precision == newType.fPrecision && colType.scale == newType.fScale) return true; break; // Don't think there can be such a type in syscat right now... case (CalpontSystemCatalog::MEDINT): if (newType.fType == DDL_MEDINT && colType.precision == newType.fPrecision && colType.scale == newType.fScale) return true; //@Bug 5443 Not allow user change data type. //if (newType.fType == DDL_DECIMAL && colType.precision == newType.fPrecision && // colType.scale == newType.fScale) return true; break; case (CalpontSystemCatalog::INT): if (newType.fType == DDL_INT && colType.precision == newType.fPrecision && colType.scale == newType.fScale) return true; //if (newType.fType == DDL_DECIMAL && colType.precision == newType.fPrecision && // colType.scale == newType.fScale) return true; break; case (CalpontSystemCatalog::UINT): if (newType.fType == DDL_UNSIGNED_INT && colType.precision == newType.fPrecision && colType.scale == newType.fScale) return true; break; case (CalpontSystemCatalog::FLOAT): if (newType.fType == DDL_FLOAT) return true; break; case (CalpontSystemCatalog::UFLOAT): if (newType.fType == DDL_UNSIGNED_FLOAT) return true; break; case (CalpontSystemCatalog::DATE): if (newType.fType == DDL_DATE) return true; break; case (CalpontSystemCatalog::BIGINT): if (newType.fType == DDL_BIGINT && colType.precision == newType.fPrecision && colType.scale == newType.fScale) return true; //@Bug 5443 Not allow user change data type. //decimal is mapped to bigint in syscat //if (newType.fType == DDL_DECIMAL && colType.precision == newType.fPrecision && // colType.scale == newType.fScale) return true; break; case (CalpontSystemCatalog::UBIGINT): if (newType.fType == DDL_UNSIGNED_BIGINT && colType.precision == newType.fPrecision && colType.scale == newType.fScale) return true; break; case (CalpontSystemCatalog::DOUBLE): if (newType.fType == DDL_DOUBLE) return true; break; case (CalpontSystemCatalog::UDOUBLE): if (newType.fType == DDL_UNSIGNED_DOUBLE) return true; break; case (CalpontSystemCatalog::DATETIME): if (newType.fType == DDL_DATETIME) return true; break; case (CalpontSystemCatalog::VARCHAR): if (newType.fType == DDL_VARCHAR && colType.colWidth == newType.fLength) return true; break; case (CalpontSystemCatalog::VARBINARY): if (newType.fType == DDL_VARBINARY && colType.colWidth == newType.fLength) return true; break; case (CalpontSystemCatalog::CLOB): break; case (CalpontSystemCatalog::BLOB): break; default: break; } return false; } bool comptypesAreCompat(int oldCtype, int newCtype) { switch (oldCtype) { case 1: case 2: return (newCtype == 1 || newCtype == 2); default: break; } return (oldCtype == newCtype); } } namespace ddlpackageprocessor { AlterTableProcessor::DDLResult AlterTableProcessor::processPackage(ddlpackage::AlterTableStatement& alterTableStmt) { SUMMARY_INFO("AlterTableProcessor::processPackage"); DDLResult result; BRM::TxnID txnID; txnID.id= fTxnid.id; txnID.valid= fTxnid.valid; result.result = NO_ERROR; std::string err; uint64_t tableLockId = 0; DETAIL_INFO(alterTableStmt); int rc = 0; rc = fDbrm->isReadWrite(); if (rc != 0 ) { logging::Message::Args args; logging::Message message(9); args.add("Unable to execute the statement due to DBRM is read only"); message.format(args); result.result = ALTER_ERROR; result.message = message; fSessionManager.rolledback(txnID); return result; } //@Bug 4538. Log the sql statement before grabbing tablelock string stmt = alterTableStmt.fSql + "|" + (alterTableStmt.fTableName)->fSchema +"|"; SQLLogger logger(stmt, fDDLLoggingId, alterTableStmt.fSessionID, txnID.id); VERBOSE_INFO("Getting current txnID"); OamCache * oamcache = OamCache::makeOamCache(); std::vector moduleIds = oamcache->getModuleIds(); uint64_t uniqueId = 0; //Bug 5070. Added exception handling try { uniqueId = fDbrm->getUnique64(); } catch (std::exception& ex) { logging::Message::Args args; logging::Message message(9); args.add(ex.what()); message.format(args); result.result = ALTER_ERROR; result.message = message; fSessionManager.rolledback(txnID); return result; } catch ( ... ) { logging::Message::Args args; logging::Message message(9); args.add("Unknown error occured while getting unique number."); message.format(args); result.result = ALTER_ERROR; result.message = message; fSessionManager.rolledback(txnID); return result; } fWEClient->addQueue(uniqueId); try { //check table lock boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(alterTableStmt.fSessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); systemCatalogPtr->sessionID(alterTableStmt.fSessionID); CalpontSystemCatalog::TableName tableName; tableName.schema = (alterTableStmt.fTableName)->fSchema; tableName.table = (alterTableStmt.fTableName)->fName; execplan::CalpontSystemCatalog::ROPair roPair; roPair = systemCatalogPtr->tableRID(tableName); uint32_t processID = ::getpid(); int32_t txnid = txnID.id; int32_t sessionId = alterTableStmt.fSessionID; std::string processName("DDLProc"); int i = 0; std::vector pms; for (unsigned i=0; i < moduleIds.size(); i++) { pms.push_back((uint32_t)moduleIds[i]); } try { tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid, BRM::LOADING ); } catch (std::exception&) { throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); } if ( tableLockId == 0 ) { int waitPeriod = 10; int sleepTime = 100; // sleep 100 milliseconds between checks int numTries = 10; // try 10 times per second waitPeriod = Config::getWaitPeriod(); numTries = waitPeriod * 10; struct timespec rm_ts; rm_ts.tv_sec = sleepTime/1000; rm_ts.tv_nsec = sleepTime%1000 *1000000; for (; i < numTries; i++) { #ifdef _MSC_VER Sleep(rm_ts.tv_sec * 1000); #else struct timespec abs_ts; do { abs_ts.tv_sec = rm_ts.tv_sec; abs_ts.tv_nsec = rm_ts.tv_nsec; } while(nanosleep(&abs_ts,&rm_ts) < 0); #endif try { processID = ::getpid(); txnid = txnID.id; sessionId = alterTableStmt.fSessionID;; processName = "DDLProc"; tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid, BRM::LOADING ); } catch (std::exception&) { throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); } if (tableLockId > 0) break; } if (i >= numTries) //error out { logging::Message::Args args; string strOp("alter"); args.add(strOp); args.add(processName); args.add((uint64_t)processID); args.add(sessionId); throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_TABLE_LOCKED,args)); } } ddlpackage::AlterTableActionList actionList = alterTableStmt.fActions; AlterTableActionList::const_iterator action_iterator = actionList.begin(); while (action_iterator != actionList.end()) { std::string s(typeid(*(*action_iterator)).name()); if (s.find(AlterActionString[0]) != string::npos) { //bug 827:change AtaAddColumn to AtaAddColumns //Add a column //Bug 1192 ddlpackage::ColumnDef* columnDefPtr = 0; ddlpackage::AtaAddColumn *addColumnPtr = dynamic_cast (*action_iterator); if (addColumnPtr) { columnDefPtr = addColumnPtr->fColumnDef; } else { ddlpackage::AtaAddColumns& addColumns = *(dynamic_cast (*action_iterator)); columnDefPtr = addColumns.fColumns[0]; } addColumn (alterTableStmt.fSessionID, txnID.id, result, columnDefPtr, *(alterTableStmt.fTableName), uniqueId); if (result.result != NO_ERROR) { err = "AlterTable: add column failed"; throw std::runtime_error(err); } } else if (s.find(AlterActionString[6]) != string::npos) { //Drop Column Default dropColumnDefault (alterTableStmt.fSessionID, txnID.id, result, *(dynamic_cast (*action_iterator)), *(alterTableStmt.fTableName), uniqueId); if (result.result != NO_ERROR) { err = "AlterTable: drop column default failed"; throw std::runtime_error(err); } } else if (s.find(AlterActionString[3]) != string::npos) { //Drop Columns dropColumns (alterTableStmt.fSessionID, txnID.id, result, *(dynamic_cast (*action_iterator)), *(alterTableStmt.fTableName), uniqueId); } else if (s.find(AlterActionString[2]) != string::npos) { //Drop a column dropColumn (alterTableStmt.fSessionID, txnID.id, result, *(dynamic_cast (*action_iterator)), *(alterTableStmt.fTableName), uniqueId); } #if 0 else if (s.find(AlterActionString[4]) != string::npos) { //Add Table Constraint addTableConstraint (alterTableStmt.fSessionID, txnID.id, result, *(dynamic_cast (*action_iterator)), *(alterTableStmt.fTableName)); } #endif else if (s.find(AlterActionString[5]) != string::npos) { //Set Column Default setColumnDefault (alterTableStmt.fSessionID, txnID.id, result, *(dynamic_cast (*action_iterator)), *(alterTableStmt.fTableName), uniqueId); } #if 0 else if (s.find(AlterActionString[7]) != string::npos) { //Drop Table Constraint dropTableConstraint (alterTableStmt.fSessionID, txnID.id, result, *(dynamic_cast (*action_iterator)), *(alterTableStmt.fTableName)); } #endif else if (s.find(AlterActionString[8]) != string::npos) { //Rename Table renameTable (alterTableStmt.fSessionID, txnID.id, result, *(dynamic_cast (*action_iterator)), *(alterTableStmt.fTableName), uniqueId); } else if (s.find(AlterActionString[10]) != string::npos) { //Rename a Column renameColumn (alterTableStmt.fSessionID, txnID.id, result, *(dynamic_cast (*action_iterator)), *(alterTableStmt.fTableName), uniqueId); } else if (s.find(AlterActionString[11]) != string::npos) { // Table Comment tableComment (alterTableStmt.fSessionID, txnID.id, result, *(dynamic_cast (*action_iterator)), *(alterTableStmt.fTableName), uniqueId); } else { throw std::runtime_error("Altertable: Error in the action type"); } ++action_iterator; } // Log the DDL statement. logging::logDDL(alterTableStmt.fSessionID, txnID.id, alterTableStmt.fSql, alterTableStmt.fOwner); DETAIL_INFO("Commiting transaction"); commitTransaction(uniqueId, txnID); fSessionManager.committed(txnID); } catch (std::exception& ex) { rollBackAlter(ex.what(), txnID, alterTableStmt.fSessionID, result, uniqueId); } catch (...) { rollBackAlter("encountered unknown exception. ", txnID, alterTableStmt.fSessionID, result, uniqueId); } //release table lock try { (void)fDbrm->releaseTableLock(tableLockId); //cout << "table lock " << tableLockId << " is released" << endl; } catch (std::exception&) { if (result.result == NO_ERROR) { logging::Message::Args args; logging::Message message(1); args.add("Table lock is not released due to "); args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); args.add(""); args.add(""); message.format(args); result.result = ALTER_ERROR; result.message = message; } } fWEClient->removeQueue(uniqueId); return result; } void AlterTableProcessor::rollBackAlter(const string& error, BRM::TxnID txnID, int sessionId, DDLResult& result, uint64_t uniqueId) { DETAIL_INFO("Rolling back transaction"); cerr << "AltertableProcessor::processPackage: " << error << endl; logging::Message::Args args; logging::Message message(1); args.add("Alter table Failed: "); args.add(error); args.add(""); args.add(""); message.format(args); rollBackTransaction( uniqueId, txnID, sessionId); fSessionManager.rolledback(txnID); result.result = ALTER_ERROR; result.message = message; } void AlterTableProcessor::addColumn (uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID, DDLResult& result, ddlpackage::ColumnDef* columnDefPtr, ddlpackage::QualifiedName& inTableName, const uint64_t uniqueId) { std::string err("AlterTableProcessor::addColumn "); SUMMARY_INFO(err); // Allocate an object ID for the column we are about to create, non systables only VERBOSE_INFO("Allocating object ID for a column"); ByteStream bs; ByteStream::byte tmp8; int rc = 0; std::string errorMsg; uint16_t dbRoot; BRM::OID_t sysOid = 1021; bool isDict = false; //@Bug 4111. Check whether the column exists in calpont systable boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); systemCatalogPtr->identity(CalpontSystemCatalog::FE); CalpontSystemCatalog::TableColName tableColName; tableColName.schema = inTableName.fSchema; tableColName.table = inTableName.fName; tableColName.column = columnDefPtr->fName; CalpontSystemCatalog::OID columnOid; try { columnOid = systemCatalogPtr->lookupOID(tableColName); } catch (std::exception& ex) { result.result = ALTER_ERROR; err += ex.what(); throw std::runtime_error(err); } catch (...) { result.result = ALTER_ERROR; err += "Unknown exception caught"; throw std::runtime_error(err); } if (columnOid > 0) //Column exists already { err =err + "Internal add column error for " + tableColName.schema + "." + tableColName.table + "." + tableColName.column + ". Column exists already. Your table is probably out-of-sync"; throw std::runtime_error(err); } if ((columnDefPtr->fType->fType == CalpontSystemCatalog::CHAR && columnDefPtr->fType->fLength > 8) || (columnDefPtr->fType->fType == CalpontSystemCatalog::VARCHAR && columnDefPtr->fType->fLength > 7) || (columnDefPtr->fType->fType == CalpontSystemCatalog::VARBINARY && columnDefPtr->fType->fLength > 7)) { isDict = true; } //Find out where syscolumn are rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot); if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot "); int pmNum = 1; OamCache * oamcache = OamCache::makeOamCache(); boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); pmNum = (*dbRootPMMap)[dbRoot]; boost::shared_ptr bsIn; //Will create files on each PM as needed. //BUG931 //In order to fill up the new column with data an existing column is selected as reference ColumnList columns; getColumnsForTable(sessionID, inTableName.fSchema, inTableName.fName, columns); ColumnList::const_iterator column_iterator; column_iterator = columns.begin(); if (inTableName.fSchema != CALPONT_SCHEMA) { try { execplan::ObjectIDManager fObjectIDManager; if (isDict) { fStartingColOID = fObjectIDManager.allocOIDs(2); } else fStartingColOID = fObjectIDManager.allocOIDs(1); } catch (std::exception& ex) { result.result = ALTER_ERROR; err += ex.what(); throw std::runtime_error(err); } //cout << "new oid is " << fStartingColOID << endl; } else { // Add columns to SYSTABLE and SYSCOLUMN if ((inTableName.fName == SYSTABLE_TABLE) && (columnDefPtr->fName == AUTOINC_COL)) { fStartingColOID = OID_SYSTABLE_AUTOINCREMENT; } else if ((inTableName.fName == SYSCOLUMN_TABLE) && (columnDefPtr->fName == COMPRESSIONTYPE_COL)) { fStartingColOID = OID_SYSCOLUMN_COMPRESSIONTYPE; } else if ((inTableName.fName == SYSCOLUMN_TABLE) && (columnDefPtr->fName == NEXTVALUE_COL)) { fStartingColOID = OID_SYSCOLUMN_NEXTVALUE; } else { throw std::runtime_error("Error adding column to calpontsys table"); } columnDefPtr->fType->fCompressiontype = 0; columnDefPtr->fType->fAutoincrement = "n"; columnDefPtr->fType->fNextvalue = 0; cerr << "updating calpontsys...using static OID " << fStartingColOID << endl; } fColumnNum = 1; //Find the position for the last column //@Bug 1358 CalpontSystemCatalog::TableName tableName; tableName.schema = inTableName.fSchema; tableName.table = inTableName.fName; std::set outOfSerPar; CalpontSystemCatalog::ROPair ropair; bool autoincrement = false; try { ropair = systemCatalogPtr->tableRID(tableName); if (ropair.objnum < 0) { err = "No such table: " + tableName.table; throw std::runtime_error(err); } int totalColumns = systemCatalogPtr->colNumbers(tableName); ColumnDefList aColumnList; aColumnList.push_back(columnDefPtr); bool alterFlag = true; // MCOL-66 The DBRM can't handle concurrent DDL boost::mutex::scoped_lock lk(dbrmMutex); if (inTableName.fSchema != CALPONT_SCHEMA) { VERBOSE_INFO("Writing meta data to SYSCOL"); //send to WES to process bs.restart(); bs << (ByteStream::byte) WE_SVR_WRITE_SYSCOLUMN; bs << uniqueId; bs << sessionID; bs << (uint32_t) txnID; bs << inTableName.fSchema; bs << inTableName.fName; bs << (uint32_t) fStartingColOID; if (isDict) bs << (uint32_t) (fStartingColOID+1); else bs << (uint32_t) 0; bs << (uint8_t) alterFlag; bs << (uint32_t) totalColumns; columnDefPtr->serialize(bs); //send to WES to process try { fWEClient->write(bs, (uint32_t)pmNum); while (1) { bsIn.reset(new ByteStream()); fWEClient->read(uniqueId, bsIn); if ( bsIn->length() == 0 ) //read error { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES"; break; } else { *bsIn >> tmp8; rc = tmp8; if (rc != 0) { *bsIn >> errorMsg; } break; } } } catch (runtime_error& ex) //write error { rc = NETWORK_ERROR; errorMsg = ex.what(); } catch (...) { rc = NETWORK_ERROR; errorMsg = " Unknown exception caught while updating SYSTABLE."; } if (rc != 0) throw std::runtime_error(errorMsg); } if ((columnDefPtr->fType->fAutoincrement).compare("y") == 0) { //update systable autoincrement column sysOid = 1001; //Find out where systable is rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot); if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot "); pmNum = (*dbRootPMMap)[dbRoot]; bs.restart(); bs << (ByteStream::byte) WE_SVR_UPDATE_SYSTABLE_AUTO; bs << uniqueId; bs << sessionID; bs << (uint32_t) txnID; bs << inTableName.fSchema; bs << inTableName.fName; bs << (uint32_t) 1; //send to WES to process try { fWEClient->write(bs, (uint32_t)pmNum); while (1) { bsIn.reset(new ByteStream()); fWEClient->read(uniqueId, bsIn); if ( bsIn->length() == 0 ) //read error { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES"; break; } else { *bsIn >> tmp8; rc = tmp8; if (rc != 0) { *bsIn >> errorMsg; } break; } } } catch (runtime_error& ex) //write error { rc = NETWORK_ERROR; errorMsg = ex.what(); } catch (...) { rc = NETWORK_ERROR; errorMsg = " Unknown exception caught while updating SYSTABLE."; } if (rc != 0) throw std::runtime_error(errorMsg); //start a sequence in controller fDbrm->startAISequence(fStartingColOID, columnDefPtr->fType->fNextvalue, columnDefPtr->fType->fLength, convertDataType(columnDefPtr->fType->fType)); } //@Bug 4176. save oids to a log file for cleanup after fail over. std::vector oidList; if (isDict) { oidList.push_back(fStartingColOID); oidList.push_back(fStartingColOID+1); } else oidList.push_back(fStartingColOID); createWriteDropLogFile( ropair.objnum, uniqueId, oidList ); //@Bug 1358,1427 Always use the first column in the table, not the first one in columnlist to prevent random result //@Bug 4182. Use widest column as reference column //Find the widest column unsigned int colpos = 0; int maxColwidth = 0; for (colpos = 0; colpos < columns.size(); colpos++) { if (columns[colpos].colType.colWidth > maxColwidth) maxColwidth = columns[colpos].colType.colWidth; } while(column_iterator != columns.end()) { if (column_iterator->colType.colWidth == maxColwidth) { //If there is atleast one existing column then use that as a reference to initialize the new column rows. //get dbroot information //fDbrm->getStartExtent((*column_iterator).oid, dbroot, partitionNum, true); rc = fDbrm->getOutOfServicePartitions(column_iterator->oid, outOfSerPar); if (rc != 0) { string errorMsg; BRM::errString(rc, errorMsg); ostringstream oss; oss << "getOutOfServicePartitions failed due to " << errorMsg; throw std::runtime_error(oss.str()); } int dataType1; dataType1 = convertDataType(columnDefPtr->fType->fType); if (dataType1 == CalpontSystemCatalog::DECIMAL || dataType1 == CalpontSystemCatalog::UDECIMAL) { columnDefPtr->convertDecimal(); } CalpontSystemCatalog::ColDataType dataType = convertDataType(columnDefPtr->fType->fType); if ((columnDefPtr->fType->fAutoincrement).compare("y") == 0) { autoincrement = true; } //send to all WES to add the new column bs.restart(); bs << (ByteStream::byte) WE_SVR_FILL_COLUMN; bs << uniqueId; bs << (uint32_t) txnID; bs << (uint32_t) fStartingColOID; if (isDict) bs << (uint32_t) (fStartingColOID+1); else bs << (uint32_t) 0; //new column info bs << (ByteStream::byte) dataType; bs << (ByteStream::byte) autoincrement; bs << (uint32_t) columnDefPtr->fType->fLength; bs << (uint32_t) columnDefPtr->fType->fScale; bs << (uint32_t) columnDefPtr->fType->fPrecision; std::string tmpStr(""); if (columnDefPtr->fDefaultValue) { tmpStr = columnDefPtr->fDefaultValue->fValue; } bs << tmpStr; bs << (ByteStream::byte) columnDefPtr->fType->fCompressiontype; //ref column info bs << (uint32_t) column_iterator->oid; bs << (ByteStream::byte) column_iterator->colType.colDataType; bs << (uint32_t) column_iterator->colType.colWidth; bs << (ByteStream::byte) column_iterator->colType.compressionType; //cout << "sending command fillcolumn " << endl; uint32_t msgRecived = 0; fWEClient->write_to_all(bs); bsIn.reset(new ByteStream()); while (1) { if (msgRecived == fPMCount) break; fWEClient->read(uniqueId, bsIn); if ( bsIn->length() == 0 ) //read error { rc = NETWORK_ERROR; break; } else { *bsIn >> tmp8; *bsIn >> errorMsg; rc = tmp8; //cout << "Got error code from WES " << rc << endl; if (rc != 0) break; else msgRecived++; } } if (rc != 0) //delete the newly created files before erroring out { bs.restart(); bs << (ByteStream::byte)WE_SVR_WRITE_DROPFILES; bs << uniqueId; bs << (uint32_t) oidList.size(); for (uint32_t i=0; i < oidList.size(); i++) { bs << (uint32_t) oidList[i]; } uint32_t msgRecived = 0; try { fWEClient->write_to_all(bs); bsIn.reset(new ByteStream()); ByteStream::byte tmp8; while (1) { if (msgRecived == fWEClient->getPmCount()) break; fWEClient->read(uniqueId, bsIn); if ( bsIn->length() == 0 ) //read error { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server while dropping column files"; break; } else { *bsIn >> tmp8; rc = tmp8; if (rc != 0) { *bsIn >> errorMsg; break; } else msgRecived++; } } } catch (runtime_error& ex) //write error { rc = NETWORK_ERROR; errorMsg = ex.what(); } catch (...) { rc = NETWORK_ERROR; errorMsg = " Unknown exception caught while dropping column files."; } if ( rc == 0 ) { fWEClient->removeQueue(uniqueId); deleteLogFile(DROPTABLE_LOG, ropair.objnum, uniqueId); fWEClient->addQueue(uniqueId); } throw std::runtime_error(errorMsg); } //Update nextVal break; } //Update nextVal column_iterator++; } } catch (std::exception& ex) { if (result.result != CREATE_ERROR) result.result = ALTER_ERROR; err += ex.what(); throw std::runtime_error(err); } catch (...) { result.result = ALTER_ERROR; err += "Unknown exception caught"; throw std::runtime_error(err); } //update SYSCOLUMN of the new next value if (autoincrement) { DBRM aDbrm; aDbrm.getAILock(fStartingColOID); uint64_t nextValInController; bool validNextVal = aDbrm.getAIValue(fStartingColOID, &nextValInController); if (validNextVal) { WE_DDLCommandClient ddlClient; uint8_t rc = 0; if (idbdatafile::IDBPolicy::useHdfs()) rc = ddlClient.UpdateSyscolumnNextval(fStartingColOID, nextValInController,txnID); else rc = ddlClient.UpdateSyscolumnNextval(fStartingColOID, nextValInController,sessionID); aDbrm.releaseAILock(fStartingColOID); if (rc!=0) throw std::runtime_error("Update SYSCAT next value failed"); } else { aDbrm.releaseAILock(fStartingColOID); throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT)); } } std::vector oidList; oidList.push_back(fStartingColOID); if (outOfSerPar.size() > 0) rc = fDbrm->markPartitionForDeletion(oidList, outOfSerPar, errorMsg); if (rc != 0) { ostringstream oss; oss << "Mark partition for deletition failed due to " << errorMsg; throw std::runtime_error(oss.str()); } fWEClient->removeQueue(uniqueId); deleteLogFile(DROPTABLE_LOG, ropair.objnum, uniqueId); fWEClient->addQueue(uniqueId); } void AlterTableProcessor::dropColumn (uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID, DDLResult& result, ddlpackage::AtaDropColumn& ataDropColumn, ddlpackage::QualifiedName& fTableName, const uint64_t uniqueId) { // 1. Get the OIDs for the column // 2. Get the OIDs for the dictionary // 3. Remove the column from SYSCOLUMN // 4. update systable if the dropped column is autoincrement column // 5. update column position for affected columns // 6. Remove the files SUMMARY_INFO("AlterTableProcessor::dropColumn"); VERBOSE_INFO("Finding object IDs for the column"); CalpontSystemCatalog::TableColName tableColName; CalpontSystemCatalog::TableName tableName; tableName.schema = fTableName.fSchema; tableName.table = fTableName.fName; tableColName.schema = fTableName.fSchema; tableColName.table = fTableName.fName; tableColName.column = ataDropColumn.fColumnName; execplan::CalpontSystemCatalog::DictOIDList dictOIDList; boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); //@Bug 1358 systemCatalogPtr->identity(CalpontSystemCatalog::EC); std::string err; execplan::CalpontSystemCatalog::ROPair roPair; CalpontSystemCatalog::OID oid; CalpontSystemCatalog::ColType colType; try { roPair = systemCatalogPtr->tableRID( tableName ); oid = systemCatalogPtr->lookupOID(tableColName); colType = systemCatalogPtr->colType(oid); } catch (std::exception& ex) { throw std::runtime_error(ex.what()); } int colPos = colType.colPosition; ByteStream bytestream; bytestream << (ByteStream::byte)WE_SVR_DELETE_SYSCOLUMN_ROW; bytestream << uniqueId; bytestream << sessionID; bytestream << (uint32_t)txnID; bytestream << fTableName.fSchema; bytestream << fTableName.fName; bytestream << ataDropColumn.fColumnName; std::string errorMsg; uint16_t dbRoot; BRM::OID_t sysOid = 1021; ByteStream::byte rc = 0; //Find out where syscolumn is rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot); if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot "); int pmNum = 1; boost::shared_ptr bsIn; OamCache * oamcache = OamCache::makeOamCache(); boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); pmNum = (*dbRootPMMap)[dbRoot]; // MCOL-66 The DBRM can't handle concurrent DDL boost::mutex::scoped_lock lk(dbrmMutex); try { fWEClient->write(bytestream, (uint32_t)pmNum); #ifdef IDB_DDL_DEBUG cout << "Alter table drop column sending WE_SVR_DELETE_SYSCOLUMN_ROW to pm " << pmNum << endl; #endif while (1) { bsIn.reset(new ByteStream()); fWEClient->read(uniqueId, bsIn); if ( bsIn->length() == 0 ) //read error { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES"; break; } else { *bsIn >> rc; *bsIn >> errorMsg; break; } } } catch (runtime_error& ex) //write error { #ifdef IDB_DDL_DEBUG cout << "Alter table drop column got exception" << ex.what() << endl; #endif rc = NETWORK_ERROR; errorMsg = ex.what(); } catch (...) { rc = NETWORK_ERROR; errorMsg = " Unknown exception caught while updating SYSTABLE."; #ifdef IDB_DDL_DEBUG cout << "Alter table drop column got unknown exception" << endl; #endif } if (rc != 0) throw std::runtime_error(errorMsg); //Update SYSTABLE if (colType.autoincrement) { sysOid = 1001; //Find out where systable is rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot); if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot "); pmNum = (*dbRootPMMap)[dbRoot]; bytestream.restart(); bytestream << (ByteStream::byte)WE_SVR_UPDATE_SYSTABLE_AUTO; bytestream << uniqueId; bytestream << sessionID; bytestream << (uint32_t)txnID; bytestream << fTableName.fSchema; bytestream << fTableName.fName; bytestream << (uint32_t) 0; //autoincrement off try { fWEClient->write(bytestream, (uint32_t)pmNum); #ifdef IDB_DDL_DEBUG cout << "Alter table drop column sending WE_SVR_UPDATE_SYSTABLE_AUTO to pm " << pmNum << endl; #endif while (1) { bsIn.reset(new ByteStream()); fWEClient->read(uniqueId, bsIn); if ( bsIn->length() == 0 ) //read error { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES"; break; } else { *bsIn >> rc; *bsIn >> errorMsg; break; } } } catch (runtime_error& ex) //write error { #ifdef IDB_DDL_DEBUG cout << "Alter table drop column got exception" << ex.what() << endl; #endif rc = NETWORK_ERROR; errorMsg = ex.what(); } catch (...) { rc = NETWORK_ERROR; errorMsg = " Unknown exception caught while updating SYSTABLE."; #ifdef IDB_DDL_DEBUG cout << "Alter table drop column got unknown exception" << endl; #endif } if (rc != 0) throw std::runtime_error(errorMsg); } //Update column position bytestream.restart(); bytestream << (ByteStream::byte)WE_SVR_UPDATE_SYSCOLUMN_COLPOS; bytestream << uniqueId; bytestream << sessionID; bytestream << (uint32_t)txnID; bytestream << fTableName.fSchema; bytestream << fTableName.fName; bytestream << (uint32_t) colPos; sysOid = 1021; //Find out where syscolumn is rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot); if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot "); pmNum = (*dbRootPMMap)[dbRoot]; try { fWEClient->write(bytestream, (uint32_t)pmNum); #ifdef IDB_DDL_DEBUG cout << "Alter table drop column sending WE_SVR_UPDATE_SYSTABLE_AUTO to pm " << pmNum << endl; #endif while (1) { bsIn.reset(new ByteStream()); fWEClient->read(uniqueId, bsIn); if ( bsIn->length() == 0 ) //read error { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES"; break; } else { *bsIn >> rc; *bsIn >> errorMsg; break; } } } catch (runtime_error& ex) //write error { #ifdef IDB_DDL_DEBUG cout << "Alter table drop column got exception" << ex.what() << endl; #endif rc = NETWORK_ERROR; errorMsg = ex.what(); } catch (...) { rc = NETWORK_ERROR; errorMsg = " Unknown exception caught while updating SYSTABLE."; #ifdef IDB_DDL_DEBUG cout << "Alter table drop column got unknown exception" << endl; #endif } if (rc != 0) throw std::runtime_error(errorMsg); //commit the transaction. BRM::TxnID aTxnID; aTxnID.id= txnID; aTxnID.valid= true; commitTransaction(uniqueId, aTxnID); // Bug 4208 Drop the PrimProcFDCache before droping the column files // FOr Windows, this ensures (most likely) that the column files have // no open handles to hinder the deletion of the files. rc = cacheutils::dropPrimProcFdCache(); VERBOSE_INFO("Removing column files"); //Drop files std::vector oidList; bytestream.restart(); bytestream << (ByteStream::byte)WE_SVR_WRITE_DROPFILES; bytestream << uniqueId; if (colType.ddn.dictOID > 3000) //@bug 4847. need to take care varchar(8) { bytestream << (uint32_t) 2; bytestream << (uint32_t) oid; bytestream << (uint32_t) colType.ddn.dictOID; oidList.push_back(oid); oidList.push_back( colType.ddn.dictOID); } else { bytestream << (uint32_t) 1; bytestream << (uint32_t) oid; oidList.push_back(oid); } //Save the oids to a file uint32_t msgRecived = 0; bool fileDropped = true; try { createWriteDropLogFile( roPair.objnum, uniqueId, oidList ); //@Bug 4811. Need to send to all PMs fWEClient->write_to_all(bytestream); #ifdef IDB_DDL_DEBUG cout << "Alter table drop column sending WE_SVR_UPDATE_SYSTABLE_AUTO to pm " << pmNum << endl; #endif bsIn.reset(new ByteStream()); ByteStream::byte tmp8; while (1) { if (msgRecived == fWEClient->getPmCount()) break; fWEClient->read(uniqueId, bsIn); if ( bsIn->length() == 0 ) //read error { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server while dropping column files"; break; } else { *bsIn >> tmp8; rc = tmp8; if (rc != 0) { *bsIn >> errorMsg; fileDropped = false; break; } else msgRecived++; } } } catch (runtime_error& ex) //write error { #ifdef IDB_DDL_DEBUG cout << "Alter table drop column got exception" << ex.what() << endl; #endif rc = NETWORK_ERROR; errorMsg = ex.what(); } catch (...) { rc = NETWORK_ERROR; errorMsg = " Unknown exception caught while dropping column files."; #ifdef IDB_DDL_DEBUG cout << "create table got unknown exception" << endl; #endif } //@Bug 3860 rc = cacheutils::dropPrimProcFdCache(); //Flush primProc cache rc = cacheutils::flushOIDsFromCache( oidList ); //Delete extents from extent map rc = fDbrm->deleteOIDs(oidList); if (fileDropped) { fWEClient->removeQueue(uniqueId); deleteLogFile(DROPTABLE_LOG, roPair.objnum, uniqueId); fWEClient->addQueue(uniqueId); } } void AlterTableProcessor::dropColumns (uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID, DDLResult& result, ddlpackage::AtaDropColumns& ataDropColumns, ddlpackage::QualifiedName& fTableName, const uint64_t uniqueId) { SUMMARY_INFO("AlterTableProcessor::dropColumns"); ddlpackage::ColumnNameList colList = ataDropColumns.fColumns; ddlpackage::ColumnNameList::const_iterator col_iter = colList.begin(); std::string err; try { while (col_iter != colList.end()) { ddlpackage::AtaDropColumn ataDropColumn; ataDropColumn.fColumnName = *col_iter; dropColumn (sessionID,txnID, result,ataDropColumn,fTableName,uniqueId); if (result.result != NO_ERROR) { DETAIL_INFO("dropColumns::dropColumn failed"); return; } col_iter++; } } catch (std::exception& ex) { err = ex.what(); throw std::runtime_error(err); } catch (...) { err = "dropColumns:Unknown exception caught"; throw std::runtime_error(err); } } void AlterTableProcessor::addTableConstraint (uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID, DDLResult& result, ddlpackage::AtaAddTableConstraint& ataAddTableConstraint, ddlpackage::QualifiedName& fTableName) { /*TODO: Check if existing row satisfy the constraint. If not, the constraint will not be added. */ SUMMARY_INFO("AlterTableProcessor::addTableConstraint"); ddlpackage::TableConstraintDefList constrainList; constrainList.push_back(ataAddTableConstraint.fTableConstraint); VERBOSE_INFO("Writing table constraint meta data to SYSCONSTRAINT"); //bool alterFlag = true; std::string err; try { //writeTableSysConstraintMetaData(sessionID, txnID, result, constrainList, fTableName, alterFlag); VERBOSE_INFO("Writing table constraint meta data to SYSCONSTRAINTCOL"); //writeTableSysConstraintColMetaData(sessionID, txnID, result,constrainList, fTableName, alterFlag); } catch (std::exception& ex) { err = ex.what(); throw std::runtime_error(err); } catch (...) { err = "addTableConstraint:Unknown exception caught"; throw std::runtime_error(err); } } void AlterTableProcessor::setColumnDefault (uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID, DDLResult& result, ddlpackage::AtaSetColumnDefault& ataSetColumnDefault, ddlpackage::QualifiedName& fTableName, const uint64_t uniqueId) { SUMMARY_INFO("AlterTableProcessor::setColumnDefault"); /*Steps: 1. Update SYSCOLUMN for default value change */ SUMMARY_INFO("AlterTableProcessor::setColumnDefault"); ByteStream bs; std::string errorMsg; uint16_t dbRoot; BRM::OID_t sysOid = 1021; ByteStream::byte rc = 0; //Find out where syscolumns rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot); if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot"); int pmNum = 1; OamCache * oamcache = OamCache::makeOamCache(); boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); pmNum = (*dbRootPMMap)[dbRoot]; boost::shared_ptr bsIn; string err; //Update SYSCOLUMN bs.restart(); bs << (ByteStream::byte) WE_SVR_UPDATE_SYSCOLUMN_DEFAULTVAL; bs << uniqueId; bs << sessionID; bs << (uint32_t) txnID; bs << fTableName.fSchema; bs << fTableName.fName; bs << ataSetColumnDefault.fColumnName; string defaultValue(""); if (ataSetColumnDefault.fDefaultValue) defaultValue = ataSetColumnDefault.fDefaultValue->fValue; bs << defaultValue; //send to WES to process try { fWEClient->write(bs, (uint32_t)pmNum); while (1) { bsIn.reset(new ByteStream()); fWEClient->read(uniqueId, bsIn); if ( bsIn->length() == 0 ) //read error { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES"; break; } else { *bsIn >> rc; if (rc != 0) { *bsIn >> errorMsg; } break; } } } catch (runtime_error& ex) //write error { rc = NETWORK_ERROR; errorMsg = ex.what(); } catch (...) { rc = NETWORK_ERROR; errorMsg = " Unknown exception caught while updating SYSTABLE."; } if (rc != 0) throw std::runtime_error(errorMsg); } void AlterTableProcessor::dropColumnDefault (uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID, DDLResult& result, ddlpackage::AtaDropColumnDefault& ataDropColumnDefault, ddlpackage::QualifiedName& fTableName, const uint64_t uniqueId) { SUMMARY_INFO("AlterTableProcessor::setColumnDefault"); /*Steps: 1. Update SYSCOLUMN for default value change */ SUMMARY_INFO("AlterTableProcessor::setColumnDefault"); ByteStream bs; std::string errorMsg; uint16_t dbRoot; BRM::OID_t sysOid = 1021; ByteStream::byte rc = 0; //Find out where syscolumn is rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot); if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot"); int pmNum = 1; OamCache * oamcache = OamCache::makeOamCache(); boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); pmNum = (*dbRootPMMap)[dbRoot]; boost::shared_ptr bsIn; string err; //Update SYSCOLUMN bs.restart(); bs << (ByteStream::byte) WE_SVR_UPDATE_SYSCOLUMN_DEFAULTVAL; bs << uniqueId; bs << sessionID; bs << (uint32_t) txnID; bs << fTableName.fSchema; bs << fTableName.fName; bs << ataDropColumnDefault.fColumnName; string defaultValue(""); bs << defaultValue; //send to WES to process try { fWEClient->write(bs, (uint32_t)pmNum); while (1) { bsIn.reset(new ByteStream()); fWEClient->read(uniqueId, bsIn); if ( bsIn->length() == 0 ) //read error { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES"; break; } else { *bsIn >> rc; if (rc != 0) { *bsIn >> errorMsg; } break; } } } catch (runtime_error& ex) //write error { rc = NETWORK_ERROR; errorMsg = ex.what(); } catch (...) { rc = NETWORK_ERROR; errorMsg = " Unknown exception caught while updating SYSTABLE."; } if (rc != 0) throw std::runtime_error(errorMsg); } #if 0 void AlterTableProcessor::dropTableConstraint (uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID, DDLResult& result, ddlpackage::AtaDropTableConstraint& ataDropTableConstraint, ddlpackage::QualifiedName& fTableName) { /*Steps tp drop table constraint 1. Delete the ConstraintName from SYSCONSTRAINT 2. Delete the corresponding row from SYSCONSTRAINTCOL 3. Delete the row from SYSINDEX if PK; 3. Delete the rows from SYSINDEXCOL if PK; */ SUMMARY_INFO("AlterTableProcessor::dropTableConstraint"); ddlpackage::QualifiedName sysCatalogTableName; sysCatalogTableName.fSchema = CALPONT_SCHEMA; sysCatalogTableName.fName = SYSCONSTRAINT_TABLE; boost::shared_ptr systemCatalogPtr; systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); //@Bug 1358 systemCatalogPtr->identity(CalpontSystemCatalog::EC); VERBOSE_INFO("Removing constraint meta data from SYSCONSTRAINT"); std::string err; try { execplan::CalpontSystemCatalog::RID constrainRid = systemCatalogPtr->constraintRID(ataDropTableConstraint.fConstraintName); if (constrainRid == std::numeric_limits::max()) { // build the logging message err = "Alter Table failed: Constraint name not found"; throw std::runtime_error(err); } removeRowFromSysCatalog(sessionID, txnID, result, sysCatalogTableName, constrainRid); VERBOSE_INFO("Removing constraint meta data from SYSCONSTRAINTCOL"); sysCatalogTableName.fName = SYSCONSTRAINTCOL_TABLE; execplan::CalpontSystemCatalog::RIDList ridlist = systemCatalogPtr->constraintColRID(ataDropTableConstraint.fConstraintName); if (ridlist.size() == 0) { // build the logging message err = "Alter Table failed: Constraint name not found"; throw std::runtime_error(err); } removeRowsFromSysCatalog(sessionID, txnID, result, sysCatalogTableName, ridlist); execplan::CalpontSystemCatalog::IndexName idxName; idxName.schema = fTableName.fSchema; idxName.table = fTableName.fName; idxName.index = ataDropTableConstraint.fConstraintName; execplan::CalpontSystemCatalog::ROPair ropair = systemCatalogPtr->indexRID(idxName); if (ropair.rid >= 0) { sysCatalogTableName.fName = SYSINDEX_TABLE; removeRowFromSysCatalog(sessionID, txnID, result, sysCatalogTableName, ropair.rid); } execplan::CalpontSystemCatalog::RIDList ridList = systemCatalogPtr->indexColRIDs(idxName); if (ridList.size() > 0) { execplan::CalpontSystemCatalog::RIDList::const_iterator riditer; sysCatalogTableName.fName = SYSINDEXCOL_TABLE; riditer = ridList.begin(); while (riditer != ridList.end()) { ropair = *riditer; removeRowFromSysCatalog(sessionID, txnID, result, sysCatalogTableName, ropair.rid); riditer++; } } } catch (std::exception& ex) { err = ex.what(); throw std::runtime_error(err); } catch (...) { err = "dropTableConstraint:Unknown exception caught"; throw std::runtime_error(err); } } #endif void AlterTableProcessor::renameTable (uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID, DDLResult& result, ddlpackage::AtaRenameTable& ataRenameTable, ddlpackage::QualifiedName& fTableName, const uint64_t uniqueId) { /*Steps: 1. Update SYSTABLE (table name) 2. Update SYSCOLUMN (table name) */ SUMMARY_INFO("AlterTableProcessor::renameTable"); //@Bug 4599. Check whether the new table exists in infinidb boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); execplan::CalpontSystemCatalog::TableName tableName; tableName.schema = fTableName.fSchema; tableName.table = ataRenameTable.fQualifiedName->fName; execplan::CalpontSystemCatalog::ROPair roPair; roPair.objnum = 0; try { roPair = systemCatalogPtr->tableRID(tableName); } catch (...) { roPair.objnum = 0; } if (roPair.objnum >= 3000) throw std::runtime_error("The new tablename is already in use."); ByteStream bytestream; bytestream << (ByteStream::byte)WE_SVR_UPDATE_SYSTABLE_TABLENAME; bytestream << uniqueId; bytestream << sessionID; bytestream << (uint32_t)txnID; bytestream << fTableName.fSchema; bytestream << fTableName.fName; bytestream << ataRenameTable.fQualifiedName->fName; std::string errorMsg; uint16_t dbRoot; BRM::OID_t sysOid = 1001; ByteStream::byte rc = 0; //Find out where systable is rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot); if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot"); int pmNum = 1; boost::shared_ptr bsIn; OamCache * oamcache = OamCache::makeOamCache(); boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); pmNum = (*dbRootPMMap)[dbRoot]; try { fWEClient->write(bytestream, (uint32_t)pmNum); #ifdef IDB_DDL_DEBUG cout << "Rename table sending WE_SVR_UPDATE_SYSTABLE_TABLENAME to pm " << pmNum << endl; #endif while (1) { bsIn.reset(new ByteStream()); fWEClient->read(uniqueId, bsIn); if ( bsIn->length() == 0 ) //read error { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES"; break; } else { *bsIn >> rc; *bsIn >> errorMsg; break; } } } catch (runtime_error& ex) //write error { #ifdef IDB_DDL_DEBUG cout << "create table got exception" << ex.what() << endl; #endif rc = NETWORK_ERROR; errorMsg = ex.what(); } catch (...) { rc = NETWORK_ERROR; errorMsg = " Unknown exception caught while updating SYSTABLE."; #ifdef IDB_DDL_DEBUG cout << "create table got unknown exception" << endl; #endif } if (rc != 0) throw std::runtime_error(errorMsg); //update SYSCOLUMN bytestream.restart(); bytestream << (ByteStream::byte)WE_SVR_UPDATE_SYSCOLUMN_TABLENAME; bytestream << uniqueId; bytestream << sessionID; bytestream << (uint32_t)txnID; bytestream << fTableName.fSchema; bytestream << fTableName.fName; bytestream << ataRenameTable.fQualifiedName->fName; sysOid = 1021; //Find out where syscolumn is rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot); if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot"); pmNum = (*dbRootPMMap)[dbRoot]; try { fWEClient->write(bytestream, (unsigned)pmNum); #ifdef IDB_DDL_DEBUG cout << "Rename table sending WE_SVR_UPDATE_SYSCOLUMN_TABLENAME to pm " << pmNum << endl; #endif while (1) { bsIn.reset(new ByteStream()); fWEClient->read(uniqueId, bsIn); if ( bsIn->length() == 0 ) //read error { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES"; break; } else { *bsIn >> rc; *bsIn >> errorMsg; break; } } } catch (runtime_error& ex) //write error { #ifdef IDB_DDL_DEBUG cout << "create table got exception" << ex.what() << endl; #endif rc = NETWORK_ERROR; errorMsg = ex.what(); } catch (...) { rc = NETWORK_ERROR; errorMsg = " Unknown exception caught while updating SYSTABLE."; #ifdef IDB_DDL_DEBUG cout << "create table got unknown exception" << endl; #endif } if (rc != 0) throw std::runtime_error(errorMsg); } void AlterTableProcessor::tableComment(uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID, DDLResult& result, ddlpackage::AtaTableComment& ataTableComment, ddlpackage::QualifiedName& fTableName, const uint64_t uniqueId) { // Currently only process autoincrement values in table comments during alter SUMMARY_INFO("AlterTableProcessor::tableComment"); uint64_t nextVal; BRM::OID_t sysOid = 1001; ByteStream::byte rc = 0; boost::shared_ptr bsIn; uint16_t dbRoot; std::string errorMsg; int pmNum = 1; rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot); OamCache * oamcache = OamCache::makeOamCache(); boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); pmNum = (*dbRootPMMap)[dbRoot]; if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot"); boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); boost::algorithm::to_upper(ataTableComment.fTableComment); boost::regex compat("[[:space:]]*AUTOINCREMENT[[:space:]]*=[[:space:]]*", boost::regex_constants::extended); boost::match_results what; std::string::const_iterator start, end; start = ataTableComment.fTableComment.begin(); end = ataTableComment.fTableComment.end(); boost::match_flag_type flags = boost::match_default; if (boost::regex_search(start, end, what, compat, flags) && what[0].matched) { std::string params(&(*(what[0].second))); char *ep = NULL; const char *str = params.c_str(); errno = 0; nextVal = strtoull(str, &ep, 10); if ((ep == str) || (*ep != '\0') || (errno != 0)) { throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_INVALID_START_VALUE)); } } else { throw std::runtime_error("Invalid table comment"); } // Get the OID for autoinc (if exists) CalpontSystemCatalog::TableName tableName; tableName.schema = fTableName.fSchema; tableName.table = fTableName.fName; CalpontSystemCatalog::TableInfo tblInfo = systemCatalogPtr->tableInfo(tableName); if (tblInfo.tablewithautoincr != 1) { throw std::runtime_error("Table does not have an autoincrement column"); } int32_t oid = systemCatalogPtr->autoColumOid(tableName); fDbrm->resetAISequence(oid, nextVal); ByteStream bs; bs.restart(); bs << (ByteStream::byte) WE_SVR_UPDATE_SYSCOLUMN_AUTOVAL; bs << uniqueId; bs << oid; bs << nextVal; bs << sessionID; try { fWEClient->write(bs, (uint32_t)pmNum); while (1) { bsIn.reset(new ByteStream()); fWEClient->read(uniqueId, bsIn); if ( bsIn->length() == 0 ) //read error { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES"; break; } else { *bsIn >> rc; *bsIn >> errorMsg; break; } } } catch (runtime_error& ex) //write error { rc = NETWORK_ERROR; errorMsg = ex.what(); } catch (...) { rc = NETWORK_ERROR; errorMsg = " Unknown exception caught while updating SYSTABLE."; } if (rc != 0) throw std::runtime_error(errorMsg); } void AlterTableProcessor::renameColumn(uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID, DDLResult& result, ddlpackage::AtaRenameColumn& ataRenameColumn, ddlpackage::QualifiedName& fTableName, const uint64_t uniqueId) { /*Steps: 1. Update SYSCOLUMN for name, autoincrement, nextval change 2. Update SYSTABLE if column is autoincrement column */ SUMMARY_INFO("AlterTableProcessor::renameColumn"); boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); ByteStream bs; std::string errorMsg; uint16_t dbRoot; BRM::OID_t sysOid = 1001; ByteStream::byte rc = 0; //Find out where systable is rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot); if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot"); int pmNum = 1; OamCache * oamcache = OamCache::makeOamCache(); boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); pmNum = (*dbRootPMMap)[dbRoot]; boost::shared_ptr bsIn; CalpontSystemCatalog::TableName tableName; CalpontSystemCatalog::TableColName tableColName; tableColName.schema = fTableName.fSchema; tableColName.table = fTableName.fName; tableColName.column = ataRenameColumn.fName; CalpontSystemCatalog::ROPair ropair; string err; try { //This gives us the rid in syscolumn that we want to update tableName.schema = tableColName.schema; tableName.table = tableColName.table; ropair = systemCatalogPtr->tableRID(tableName); if (ropair.objnum < 0) { ostringstream oss; oss << "No such table: " << tableName; throw std::runtime_error(oss.str().c_str()); } ropair = systemCatalogPtr->columnRID(tableColName); if (ropair.objnum < 0) { ostringstream oss; oss << "No such column: " << tableColName; throw std::runtime_error(oss.str().c_str()); } CalpontSystemCatalog::ColType colType = systemCatalogPtr->colType(ropair.objnum); if (!typesAreSame(colType, *ataRenameColumn.fNewType)) { ostringstream oss; oss << "Changing the datatype of a column is not supported"; throw std::runtime_error(oss.str().c_str()); } //@Bug 3746 Check whether the change is about the compression type if (!comptypesAreCompat(colType.compressionType, (*ataRenameColumn.fNewType).fCompressiontype)) { ostringstream oss; oss << "The compression type of an existing column cannot be changed."; throw std::runtime_error(oss.str().c_str()); } //Check whether SYSTABLE needs to be updated CalpontSystemCatalog::TableInfo tblInfo = systemCatalogPtr->tableInfo(tableName); if (((tblInfo.tablewithautoincr == 1) && (colType.autoincrement) && (ataRenameColumn.fNewType->fAutoincrement.compare("n") == 0)) || ((tblInfo.tablewithautoincr == 0) && (ataRenameColumn.fNewType->fAutoincrement.compare("y") == 0))) { //update systable autoincrement column bs.restart(); bs << (ByteStream::byte) WE_SVR_UPDATE_SYSTABLE_AUTO; bs << uniqueId; bs << sessionID; bs << (uint32_t) txnID; bs << fTableName.fSchema; bs << fTableName.fName; if (ataRenameColumn.fNewType->fAutoincrement.compare("y") == 0) bs << (uint32_t) 1; else bs << (uint32_t) 0; //send to WES to process try { fWEClient->write(bs, (uint32_t)pmNum); while (1) { bsIn.reset(new ByteStream()); fWEClient->read(uniqueId, bsIn); if ( bsIn->length() == 0 ) //read error { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES"; break; } else { *bsIn >> rc; if (rc != 0) { *bsIn >> errorMsg; } break; } } } catch (runtime_error& ex) //write error { rc = NETWORK_ERROR; errorMsg = ex.what(); } catch (...) { rc = NETWORK_ERROR; errorMsg = " Unknown exception caught while updating SYSTABLE."; } if (rc != 0) throw std::runtime_error(errorMsg); //change a sequence in controller if ((!(tblInfo.tablewithautoincr == 1) || (colType.autoincrement)) && (ataRenameColumn.fNewType->fAutoincrement.compare("y") == 0)) { fDbrm->startAISequence(ropair.objnum, ataRenameColumn.fNewType->fNextvalue, ataRenameColumn.fNewType->fLength, convertDataType(ataRenameColumn.fNewType->fType)); //Reset it in case there is a sequence already fDbrm->resetAISequence(ropair.objnum, ataRenameColumn.fNewType->fNextvalue); } } else if ((tblInfo.tablewithautoincr == 1) && (colType.autoincrement) && (ataRenameColumn.fNewType->fAutoincrement.compare("y") == 0)) {} else { fDbrm->resetAISequence(ropair.objnum, 0); } //Update SYSCOLUMN bs.restart(); bs << (ByteStream::byte) WE_SVR_UPDATE_SYSCOLUMN_RENAMECOLUMN; bs << uniqueId; bs << sessionID; bs << (uint32_t) txnID; bs << fTableName.fSchema; bs << fTableName.fName; bs << ataRenameColumn.fName; bs << ataRenameColumn.fNewName; bs << ataRenameColumn.fNewType->fAutoincrement; //@Bug 5913. for autoincrement column, find the next value from SYSCOLUMN long long nextVal = ataRenameColumn.fNewType->fNextvalue; if ((tblInfo.tablewithautoincr == 1) && (colType.autoincrement) && (ataRenameColumn.fNewType->fAutoincrement.compare("y") == 0)) nextVal = systemCatalogPtr->nextAutoIncrValue(tableName); bs << (uint64_t) nextVal; uint32_t nullable = 1; string defaultValue(""); if (ataRenameColumn.fConstraints.size() > 0) { for (uint32_t j=0; j < ataRenameColumn.fConstraints.size(); j++) { if (ataRenameColumn.fConstraints[j]->fConstraintType == DDL_NOT_NULL) { nullable = 0; break; } } } bs << nullable; if (ataRenameColumn.fDefaultValue) defaultValue = ataRenameColumn.fDefaultValue->fValue; bs << defaultValue; sysOid = 1021; //Find out where syscolumn is rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot); if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot"); pmNum = (*dbRootPMMap)[dbRoot]; //send to WES to process try { fWEClient->write(bs, (uint32_t)pmNum); while (1) { bsIn.reset(new ByteStream()); fWEClient->read(uniqueId, bsIn); if ( bsIn->length() == 0 ) //read error { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES"; break; } else { *bsIn >> rc; if (rc != 0) { *bsIn >> errorMsg; } break; } } } catch (runtime_error& ex) //write error { rc = NETWORK_ERROR; errorMsg = ex.what(); } catch (...) { rc = NETWORK_ERROR; errorMsg = " Unknown exception caught while updating SYSTABLE."; } if (rc != 0) throw std::runtime_error(errorMsg); } catch (std::exception& ex) { err = ex.what(); throw std::runtime_error(err); } catch (...) { err = "renameColumn:Unknown exception caught"; throw std::runtime_error(err); } } } // vim:ts=4 sw=4: