/* Copyright (C) 2014 InfiniDB, Inc. Copyright (C) 2016 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /*********************************************************************** * $Id: droptableprocessor.cpp 9744 2013-08-07 03:32:19Z bwilkinson $ * * ***********************************************************************/ #include #include #include using namespace std; #include "droptableprocessor.h" #include "we_messages.h" #include "we_ddlcommandclient.h" using namespace WriteEngine; #include "cacheutils.h" using namespace cacheutils; #include "bytestream.h" using namespace messageqcpp; #include "sqllogger.h" #include "messagelog.h" using namespace logging; #include "calpontsystemcatalog.h" using namespace execplan; #include "oamcache.h" using namespace oam; namespace ddlpackageprocessor { DropTableProcessor::DDLResult DropTableProcessor::processPackageInternal(ddlpackage::SqlStatement* sqlStmt) { SUMMARY_INFO("DropTableProcessor::processPackage"); DDLResult result; result.result = NO_ERROR; std::string err; auto* dropTableStmt = dynamic_cast(sqlStmt); if (!dropTableStmt) { Message::Args args; Message message(9); args.add("DropTableStatement wrong cast"); message.format(args); result.result = DROP_ERROR; result.message = message; return result; } VERBOSE_INFO(dropTableStmt); // Commit current transaction. // all DDL statements cause an implicit commit VERBOSE_INFO("Getting current txnID"); ByteStream::byte rc = 0; BRM::TxnID txnID; txnID.id = fTxnid.id; txnID.valid = fTxnid.valid; int rc1 = 0; rc1 = fDbrm->isReadWrite(); if (rc1 != 0) { Message::Args args; Message message(9); args.add("Unable to execute the statement due to DBRM is read only"); message.format(args); result.result = DROP_ERROR; result.message = message; fSessionManager.rolledback(txnID); return result; } string stmt = dropTableStmt->fSql + "|" + dropTableStmt->fTableName->fSchema + "|"; SQLLogger logger(stmt, fDDLLoggingId, dropTableStmt->fSessionID, txnID.id); std::vector oidList; CalpontSystemCatalog::RIDList tableColRidList; CalpontSystemCatalog::DictOIDList dictOIDList; execplan::CalpontSystemCatalog::ROPair roPair; CalpontSystemCatalog::OID tableAUXColOid; std::string errorMsg; ByteStream bytestream; uint64_t uniqueId = 0; // Bug 5070. Added exception handling try { uniqueId = fDbrm->getUnique64(); } catch (std::exception& ex) { Message::Args args; Message message(9); args.add(ex.what()); message.format(args); result.result = DROP_ERROR; result.message = message; fSessionManager.rolledback(txnID); return result; } catch (...) { Message::Args args; Message message(9); args.add("Unknown error occurred while getting unique number."); message.format(args); result.result = DROP_ERROR; result.message = message; fSessionManager.rolledback(txnID); return result; } fWEClient->addQueue(uniqueId); int pmNum = 1; boost::shared_ptr bsIn; uint64_t tableLockId = 0; OamCache* oamcache = OamCache::makeOamCache(); std::vector moduleIds = oamcache->getModuleIds(); // MCOL-66 The DBRM can't handle concurrent DDL boost::mutex::scoped_lock lk(dbrmMutex); try { // check table lock boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(dropTableStmt->fSessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); systemCatalogPtr->sessionID(dropTableStmt->fSessionID); CalpontSystemCatalog::TableName tableName; tableName.schema = dropTableStmt->fTableName->fSchema; tableName.table = dropTableStmt->fTableName->fName; try { roPair = systemCatalogPtr->tableRID(tableName); if (tableName.schema.compare(execplan::CALPONT_SCHEMA) == 0) { tableAUXColOid = 0; } else { tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(tableName); } } catch (IDBExcept& ie) { if (checkPPLostConnection(ie.what())) { result.result = PP_LOST_CONNECTION; return result; } else { if (ie.errorCode() == ERR_TABLE_NOT_IN_CATALOG) { Message::Args args; Message message(1); args.add("Table does not exist in ColumnStore."); message.format(args); result.result = DROP_TABLE_NOT_IN_CATALOG_ERROR; result.message = message; fSessionManager.rolledback(txnID); return result; } else { result.result = DROP_ERROR; Message::Args args; Message message(9); args.add("Drop table failed due to "); args.add(ie.what()); message.format(args); result.message = message; fSessionManager.rolledback(txnID); return result; } } } uint32_t processID = ::getpid(); int32_t txnid = txnID.id; int32_t sessionId = dropTableStmt->fSessionID; std::string processName("DDLProc"); int i = 0; std::vector pms; for (unsigned i = 0; i < moduleIds.size(); i++) { pms.push_back((uint32_t)moduleIds[i]); } try { tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid, BRM::LOADING); } catch (std::exception&) { throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); } if (tableLockId == 0) { int waitPeriod = 10; int sleepTime = 100; // sleep 100 milliseconds between checks int numTries = 10; // try 10 times per second waitPeriod = WriteEngine::Config::getWaitPeriod(); numTries = waitPeriod * 10; struct timespec rm_ts; rm_ts.tv_sec = sleepTime / 1000; rm_ts.tv_nsec = sleepTime % 1000 * 1000000; for (; i < numTries; i++) { struct timespec abs_ts; do { abs_ts.tv_sec = rm_ts.tv_sec; abs_ts.tv_nsec = rm_ts.tv_nsec; } while (nanosleep(&abs_ts, &rm_ts) < 0); try { processID = ::getpid(); txnid = txnID.id; sessionId = dropTableStmt->fSessionID; ; processName = "DDLProc"; tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid, BRM::LOADING); } catch (std::exception&) { throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); } if (tableLockId > 0) break; } if (i >= numTries) // error out { Message::Args args; string strOp("drop table"); args.add(strOp); args.add(processName); args.add((uint64_t)processID); args.add(sessionId); throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_TABLE_LOCKED, args)); } } // 1. Get the OIDs for the columns // 2. Get the OIDs for the dictionaries // 3. Save the OIDs to a log file // 4. Remove the Table from SYSTABLE // 5. Remove the columns from SYSCOLUMN // 6. Commit the changes made to systables // 7. Flush PrimProc Cache // 8. Update extent map // 9. Remove the column and dictionary files // 10.Return the OIDs CalpontSystemCatalog::TableName userTableName; userTableName.schema = dropTableStmt->fTableName->fSchema; userTableName.table = dropTableStmt->fTableName->fName; tableColRidList = systemCatalogPtr->columnRIDs(userTableName); dictOIDList = systemCatalogPtr->dictOIDs(userTableName); Oam oam; // Save qualified tablename, all column, dictionary OIDs, and transaction ID into a file in ASCII format for (unsigned i = 0; i < tableColRidList.size(); i++) { if (tableColRidList[i].objnum > 3000) oidList.push_back(tableColRidList[i].objnum); } for (unsigned i = 0; i < dictOIDList.size(); i++) { if (dictOIDList[i].dictOID > 3000) oidList.push_back(dictOIDList[i].dictOID); } // get a unique number VERBOSE_INFO("Removing the SYSTABLE meta data"); #ifdef IDB_DDL_DEBUG cout << fTxnid.id << " Removing the SYSTABLEs meta data" << endl; #endif bytestream << (ByteStream::byte)WE_SVR_DELETE_SYSTABLE; bytestream << uniqueId; bytestream << (uint32_t)dropTableStmt->fSessionID; bytestream << (uint32_t)txnID.id; bytestream << dropTableStmt->fTableName->fSchema; bytestream << dropTableStmt->fTableName->fName; // Find out where systable is BRM::OID_t sysOid = 1001; ByteStream::byte rc = 0; uint16_t dbRoot; rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot); if (rc != 0) { result.result = (ResultCode)rc; Message::Args args; Message message(9); args.add("Error while calling getSysCatDBRoot"); args.add(errorMsg); result.message = message; // release transaction fSessionManager.rolledback(txnID); return result; } boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); pmNum = (*dbRootPMMap)[dbRoot]; try { #ifdef IDB_DDL_DEBUG cout << fTxnid.id << " Drop table sending WE_SVR_DELETE_SYSTABLES to pm " << pmNum << endl; #endif // cout << "deleting systable entries with txnid " << txnID.id << endl; fWEClient->write(bytestream, (uint32_t)pmNum); while (1) { bsIn.reset(new ByteStream()); fWEClient->read(uniqueId, bsIn); if (bsIn->length() == 0) // read error { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES"; break; } else { *bsIn >> rc; if (rc != 0) { *bsIn >> errorMsg; } break; } } } catch (runtime_error& ex) // write error { #ifdef IDB_DDL_DEBUG cout << fTxnid.id << " Drop table got exception" << endl; #endif rc = NETWORK_ERROR; errorMsg = ex.what(); } catch (...) { rc = NETWORK_ERROR; #ifdef IDB_DDL_DEBUG cout << fTxnid.id << " Drop table got unknown exception" << endl; #endif } if (rc != 0) { if (checkPPLostConnection(errorMsg)) { result.result = PP_LOST_CONNECTION; (void)fDbrm->releaseTableLock(tableLockId); fWEClient->removeQueue(uniqueId); return result; } else { cout << fTxnid.id << " Error in dropping table from systables(" << (int)rc << ") " << errorMsg.c_str() << endl; Message::Args args; Message message(9); args.add("Error in dropping table from systables."); args.add(errorMsg); message.format(args); result.result = (ResultCode)rc; result.message = message; fSessionManager.rolledback(txnID); (void)fDbrm->releaseTableLock(tableLockId); fWEClient->removeQueue(uniqueId); return result; } } // remove from syscolumn bytestream.restart(); bytestream << (ByteStream::byte)WE_SVR_DELETE_SYSCOLUMN; bytestream << uniqueId; bytestream << (uint32_t)dropTableStmt->fSessionID; bytestream << (uint32_t)txnID.id; bytestream << dropTableStmt->fTableName->fSchema; bytestream << dropTableStmt->fTableName->fName; // Find out where syscolumn is sysOid = 1021; rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot); if (rc != 0) { result.result = (ResultCode)rc; Message::Args args; Message message(9); args.add("Error while calling getSysCatDBRoot"); args.add(errorMsg); result.message = message; // release transaction fSessionManager.rolledback(txnID); return result; } pmNum = (*dbRootPMMap)[dbRoot]; try { #ifdef IDB_DDL_DEBUG cout << fTxnid.id << " Drop table sending WE_SVR_DELETE_SYSCOLUMN to pm " << pmNum << endl; #endif fWEClient->write(bytestream, (unsigned)pmNum); while (1) { bsIn.reset(new ByteStream()); fWEClient->read(uniqueId, bsIn); if (bsIn->length() == 0) // read error { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES"; break; } else { *bsIn >> rc; if (rc != 0) { *bsIn >> errorMsg; } break; } } } catch (runtime_error& ex) // write error { #ifdef IDB_DDL_DEBUG cout << fTxnid.id << " Drop table got exception" << endl; #endif rc = NETWORK_ERROR; errorMsg = ex.what(); } catch (...) { rc = NETWORK_ERROR; #ifdef IDB_DDL_DEBUG cout << fTxnid.id << " Drop table got unknown exception" << endl; #endif } if (rc != 0) { cout << fTxnid.id << " Error in dropping column from systables(" << (int)rc << ") " << errorMsg.c_str() << endl; Message::Args args; Message message(9); args.add("Error in dropping column from systables."); args.add(errorMsg); message.format(args); result.result = (ResultCode)rc; result.message = message; // release table lock and session fSessionManager.rolledback(txnID); (void)fDbrm->releaseTableLock(tableLockId); fWEClient->removeQueue(uniqueId); return result; } rc = commitTransaction(uniqueId, txnID); if (rc != 0) { cout << txnID.id << " rolledback transaction " << " and valid is " << txnID.valid << endl; fSessionManager.rolledback(txnID); } else { cout << txnID.id << " commiting transaction " << txnID.id << " and valid is " << txnID.valid << endl; fSessionManager.committed(txnID); } if (rc != 0) { Message::Args args; Message message(9); ostringstream oss; oss << " Commit failed with error code " << rc; args.add(oss.str()); fSessionManager.rolledback(txnID); (void)fDbrm->releaseTableLock(tableLockId); message.format(args); result.result = (ResultCode)rc; result.message = message; fWEClient->removeQueue(uniqueId); return result; } // Log the DDL statement logDDL(dropTableStmt->fSessionID, txnID.id, dropTableStmt->fSql, dropTableStmt->fOwner); } catch (std::exception& ex) { result.result = DROP_ERROR; Message::Args args; Message message(9); args.add("Drop table failed due to "); args.add(ex.what()); fSessionManager.rolledback(txnID); try { (void)fDbrm->releaseTableLock(tableLockId); } catch (std::exception&) { args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); } message.format(args); result.message = message; fWEClient->removeQueue(uniqueId); return result; } catch (...) { result.result = DROP_ERROR; errorMsg = "Error in getting information from system catalog or from dbrm."; Message::Args args; Message message(9); args.add("Drop table failed due to "); args.add(errorMsg); fSessionManager.rolledback(txnID); try { (void)fDbrm->releaseTableLock(tableLockId); } catch (std::exception&) { args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); } message.format(args); result.message = message; fWEClient->removeQueue(uniqueId); return result; } try { (void)fDbrm->releaseTableLock(tableLockId); } catch (std::exception&) { result.result = DROP_ERROR; Message::Args args; Message message(9); args.add("Drop table failed due to "); args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); fSessionManager.rolledback(txnID); message.format(args); result.message = message; fWEClient->removeQueue(uniqueId); return result; } // MCOL-5021 Valid AUX column OID for a table is > 3000 // Tables that were created before this feature was added will have // tableAUXColOid = 0 if (tableAUXColOid > 3000) { oidList.push_back(tableAUXColOid); CalpontSystemCatalog::ROPair auxRoPair; auxRoPair.rid = 0; auxRoPair.objnum = tableAUXColOid; tableColRidList.push_back(auxRoPair); } // Save the oids to a file try { createWriteDropLogFile(roPair.objnum, uniqueId, oidList); } catch (std::exception& ex) { result.result = WARNING; Message::Args args; Message message(9); args.add("Drop table failed due to "); args.add(ex.what()); message.format(args); result.message = message; fSessionManager.rolledback(txnID); fWEClient->removeQueue(uniqueId); return result; } // Bug 4208 Drop the PrimProcFDCache before droping the column files // FOr Windows, this ensures (most likely) that the column files have // no open handles to hinder the deletion of the files. rc = cacheutils::dropPrimProcFdCache(); // Drop files bytestream.restart(); bytestream << (ByteStream::byte)WE_SVR_WRITE_DROPFILES; bytestream << uniqueId; bytestream << (uint32_t)oidList.size(); for (unsigned i = 0; i < oidList.size(); i++) { bytestream << (uint32_t)oidList[i]; } #ifdef IDB_DDL_DEBUG cout << fTxnid.id << " Drop table removing column files" << endl; #endif uint32_t msgRecived = 0; try { fWEClient->write_to_all(bytestream); bsIn.reset(new ByteStream()); ByteStream::byte tmp8; while (1) { if (msgRecived == fWEClient->getPmCount()) break; fWEClient->read(uniqueId, bsIn); if (bsIn->length() == 0) // read error { rc = NETWORK_ERROR; fWEClient->removeQueue(uniqueId); break; } else { *bsIn >> tmp8; rc = tmp8; if (rc != 0) { *bsIn >> errorMsg; fWEClient->removeQueue(uniqueId); break; } else msgRecived++; } } } catch (std::exception& ex) { result.result = WARNING; Message::Args args; Message message(9); args.add("Drop table failed due to "); args.add(ex.what()); message.format(args); result.message = message; fSessionManager.rolledback(txnID); fWEClient->removeQueue(uniqueId); return result; } catch (...) { result.result = WARNING; errorMsg = "Error in getting information from system catalog or from dbrm."; Message::Args args; Message message(9); args.add("Drop table failed due to "); args.add(errorMsg); message.format(args); result.message = message; fSessionManager.rolledback(txnID); fWEClient->removeQueue(uniqueId); return result; } // Drop PrimProc FD cache rc = cacheutils::dropPrimProcFdCache(); // Flush primProc cache rc = cacheutils::flushOIDsFromCache(oidList); // Delete extents from extent map #ifdef IDB_DDL_DEBUG cout << fTxnid.id << " Drop table deleteOIDs" << endl; #endif rc = fDbrm->deleteOIDs(oidList); if (rc != 0) { Message::Args args; Message message(1); args.add("Table dropped with warning "); args.add("Remove from extent map failed."); args.add(""); args.add(""); message.format(args); result.result = WARNING; result.message = message; fSessionManager.rolledback(txnID); fWEClient->removeQueue(uniqueId); return result; } // Remove the log file fWEClient->removeQueue(uniqueId); deleteLogFile(DROPTABLE_LOG, roPair.objnum, uniqueId); // release the transaction // fSessionManager.committed(txnID); returnOIDs(tableColRidList, dictOIDList); return result; } TruncTableProcessor::DDLResult TruncTableProcessor::processPackageInternal(ddlpackage::SqlStatement* sqlStmt) { SUMMARY_INFO("TruncTableProcessor::processPackage"); // 1. lock the table // 2. Get the OIDs for the columns // 3. Get the OIDs for the dictionaries // 4. Save the OIDs // 5. Disable all partitions // 6. Remove the column and dictionary files // 7. Flush PrimProc Cache // 8. Update extent map // 9. Use the OIDs to create new column and dictionary files with abbreviate extent // 10 Update next value if the table has autoincrement column DDLResult result; result.result = NO_ERROR; std::string err; auto* truncTableStmt = dynamic_cast(sqlStmt); VERBOSE_INFO(truncTableStmt); // @Bug 4150. Check dbrm status before doing anything to the table. int rc = 0; rc = fDbrm->isReadWrite(); BRM::TxnID txnID; txnID.id = fTxnid.id; txnID.valid = fTxnid.valid; if (rc != 0) { Message::Args args; Message message(9); args.add("Unable to execute the statement due to DBRM is read only"); message.format(args); result.result = DROP_ERROR; result.message = message; fSessionManager.rolledback(txnID); return result; } //@Bug 5765 log the schema. string stmt = truncTableStmt->fSql + "|" + truncTableStmt->fTableName->fSchema + "|"; SQLLogger logger(stmt, fDDLLoggingId, truncTableStmt->fSessionID, txnID.id); std::vector columnOidList; std::vector allOidList; CalpontSystemCatalog::OID tableAuxColOid; CalpontSystemCatalog::RIDList tableColRidList; CalpontSystemCatalog::DictOIDList dictOIDList; execplan::CalpontSystemCatalog::ROPair roPair; std::string processName("DDLProc"); uint32_t processID = ::getpid(); int32_t txnid = txnID.id; boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(truncTableStmt->fSessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); systemCatalogPtr->sessionID(truncTableStmt->fSessionID); CalpontSystemCatalog::TableInfo tableInfo; uint64_t uniqueId = 0; // Bug 5070. Added exception handling try { uniqueId = fDbrm->getUnique64(); } catch (std::exception& ex) { Message::Args args; Message message(9); args.add(ex.what()); message.format(args); result.result = DROP_ERROR; result.message = message; fSessionManager.rolledback(txnID); return result; } catch (...) { Message::Args args; Message message(9); args.add("Unknown error occurred while getting unique number."); message.format(args); result.result = DROP_ERROR; result.message = message; fSessionManager.rolledback(txnID); return result; } fWEClient->addQueue(uniqueId); int pmNum = 1; boost::shared_ptr bsIn; string errorMsg; uint32_t autoIncColOid = 0; uint64_t tableLockId = 0; OamCache* oamcache = OamCache::makeOamCache(); std::vector moduleIds = oamcache->getModuleIds(); try { // check table lock CalpontSystemCatalog::TableName tableName; tableName.schema = truncTableStmt->fTableName->fSchema; tableName.table = truncTableStmt->fTableName->fName; roPair = systemCatalogPtr->tableRID(tableName); int32_t sessionId = truncTableStmt->fSessionID; std::string processName("DDLProc"); int i = 0; std::vector pms; for (unsigned i = 0; i < moduleIds.size(); i++) { pms.push_back((uint32_t)moduleIds[i]); } try { tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid, BRM::LOADING); } catch (std::exception&) { throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); } if (tableLockId == 0) { int waitPeriod = 10; int sleepTime = 100; // sleep 100 milliseconds between checks int numTries = 10; // try 10 times per second waitPeriod = Config::getWaitPeriod(); numTries = waitPeriod * 10; struct timespec rm_ts; rm_ts.tv_sec = sleepTime / 1000; rm_ts.tv_nsec = sleepTime % 1000 * 1000000; for (; i < numTries; i++) { struct timespec abs_ts; do { abs_ts.tv_sec = rm_ts.tv_sec; abs_ts.tv_nsec = rm_ts.tv_nsec; } while (nanosleep(&abs_ts, &rm_ts) < 0); try { processID = ::getpid(); txnid = txnID.id; sessionId = truncTableStmt->fSessionID; processName = "DDLProc"; tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid, BRM::LOADING); } catch (std::exception&) { throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); } if (tableLockId > 0) break; } if (i >= numTries) // error out { Message::Args args; args.add(processName); args.add((uint64_t)processID); args.add(sessionId); throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_TABLE_LOCKED, args)); } } CalpontSystemCatalog::TableName userTableName; userTableName.schema = truncTableStmt->fTableName->fSchema; userTableName.table = truncTableStmt->fTableName->fName; tableColRidList = systemCatalogPtr->columnRIDs(userTableName); tableAuxColOid = systemCatalogPtr->tableAUXColumnOID(userTableName); dictOIDList = systemCatalogPtr->dictOIDs(userTableName); for (unsigned i = 0; i < tableColRidList.size(); i++) { if (tableColRidList[i].objnum > 3000) { columnOidList.push_back(tableColRidList[i].objnum); allOidList.push_back(tableColRidList[i].objnum); } } if (tableAuxColOid > 3000) { columnOidList.push_back(tableAuxColOid); allOidList.push_back(tableAuxColOid); } for (unsigned i = 0; i < dictOIDList.size(); i++) { if (dictOIDList[i].dictOID > 3000) allOidList.push_back(dictOIDList[i].dictOID); } // Check whether the table has autoincrement column tableInfo = systemCatalogPtr->tableInfo(userTableName); } catch (std::exception& ex) { if (checkPPLostConnection(ex.what())) { if (tableLockId != 0) { Message::Args args; Message message(9); args.add("Truncate table failed: "); args.add(ex.what()); args.add(""); try { (void)fDbrm->releaseTableLock(tableLockId); } catch (std::exception&) { args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); fSessionManager.rolledback(txnID); message.format(args); fWEClient->removeQueue(uniqueId); result.result = TRUNC_ERROR; return result; } } fWEClient->removeQueue(uniqueId); result.result = PP_LOST_CONNECTION; return result; } else { cerr << "TruncateTableProcessor::processPackage: " << ex.what() << endl; Message::Args args; Message message(9); args.add("Truncate table failed: "); args.add(ex.what()); args.add(""); fSessionManager.rolledback(txnID); try { (void)fDbrm->releaseTableLock(tableLockId); } catch (std::exception&) { args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); } fWEClient->removeQueue(uniqueId); message.format(args); result.result = TRUNC_ERROR; result.message = message; return result; } } catch (...) { cerr << "TruncateTableProcessor::processPackage: caught unknown exception!" << endl; Message::Args args; Message message(1); args.add("Truncate table failed: "); args.add("encountered unkown exception"); args.add(""); try { (void)fDbrm->releaseTableLock(tableLockId); } catch (std::exception&) { args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); } fWEClient->removeQueue(uniqueId); message.format(args); result.result = TRUNC_ERROR; result.message = message; return result; } // Save the oids to a file try { createWriteTruncateTableLogFile(roPair.objnum, uniqueId, allOidList); } catch (std::exception& ex) { Message::Args args; Message message(9); args.add("Truncate table failed due to "); args.add(ex.what()); fSessionManager.rolledback(txnID); fWEClient->removeQueue(uniqueId); message.format(args); //@bug 4515 Release the tablelock as nothing has done to this table. try { (void)fDbrm->releaseTableLock(tableLockId); } catch (std::exception&) { } result.result = TRUNC_ERROR; result.message = message; return result; } ByteStream bytestream; ByteStream::byte tmp8; // MCOL-66 The DBRM can't handle concurrent DDL boost::mutex::scoped_lock lk(dbrmMutex); try { // Disable extents first int rc1 = fDbrm->markAllPartitionForDeletion(allOidList); if (rc1 != 0) { string errMsg; BRM::errString(rc, errMsg); throw std::runtime_error(errMsg); } // Bug 4208 Drop the PrimProcFDCache before droping the column files // FOr Windows, this ensures (most likely) that the column files have // no open handles to hinder the deletion of the files. rc = cacheutils::dropPrimProcFdCache(); VERBOSE_INFO("Removing files"); bytestream << (ByteStream::byte)WE_SVR_WRITE_DROPFILES; bytestream << uniqueId; bytestream << (uint32_t)allOidList.size(); for (unsigned i = 0; i < allOidList.size(); i++) { bytestream << (uint32_t)allOidList[i]; } uint32_t msgRecived = 0; try { fWEClient->write_to_all(bytestream); bsIn.reset(new ByteStream()); while (1) { if (msgRecived == fWEClient->getPmCount()) break; fWEClient->read(uniqueId, bsIn); if (bsIn->length() == 0) // read error { rc = NETWORK_ERROR; fWEClient->removeQueue(uniqueId); break; } else { *bsIn >> tmp8; rc = tmp8; if (rc != 0) { *bsIn >> errorMsg; fWEClient->removeQueue(uniqueId); break; } else msgRecived++; } } } catch (std::exception& ex) { Message::Args args; Message message(9); args.add("Truncate table failed due to "); args.add(ex.what()); fSessionManager.rolledback(txnID); fWEClient->removeQueue(uniqueId); message.format(args); result.result = TRUNC_ERROR; result.message = message; deleteLogFile(TRUNCATE_LOG, roPair.objnum, uniqueId); return result; } catch (...) { result.result = DROP_ERROR; errorMsg = "Error in getting information from system catalog or from dbrm."; Message::Args args; Message message(9); args.add("Truncate table failed due to "); args.add(errorMsg); fSessionManager.rolledback(txnID); fWEClient->removeQueue(uniqueId); message.format(args); result.result = TRUNC_ERROR; result.message = message; deleteLogFile(TRUNCATE_LOG, roPair.objnum, uniqueId); return result; } // Drop PrimProc FD cache rc = cacheutils::dropPrimProcFdCache(); // Flush primProc cache rc = cacheutils::flushOIDsFromCache(allOidList); // Delete extents from extent map rc = fDbrm->deleteOIDs(allOidList); if (rc != 0) { Message::Args args; Message message(1); args.add("Table truncated with warning "); args.add("Remove from extent map failed."); args.add(""); args.add(""); message.format(args); result.result = WARNING; result.message = message; fSessionManager.rolledback(txnID); fWEClient->removeQueue(uniqueId); return result; } // Get the number of tables in the database, the current table is included. int tableCount = systemCatalogPtr->getTableCount(); Oam oam; // Calculate which dbroot the columns should start DBRootConfigList dbRootList = oamcache->getDBRootNums(); uint16_t useDBRootIndex = tableCount % dbRootList.size(); // Find out the dbroot# corresponding the useDBRootIndex from oam uint16_t useDBRoot = dbRootList[useDBRootIndex]; bytestream.restart(); bytestream << (ByteStream::byte)WE_SVR_WRITE_CREATETABLEFILES; bytestream << uniqueId; bytestream << (uint32_t)txnID.id; uint32_t numOids = columnOidList.size() + dictOIDList.size(); bytestream << numOids; CalpontSystemCatalog::ColType colType; for (unsigned col = 0; col < columnOidList.size(); col++) { colType = systemCatalogPtr->colType(columnOidList[col]); if (colType.autoincrement) autoIncColOid = colType.columnOID; bytestream << (uint32_t)columnOidList[col]; bytestream << (uint8_t)colType.colDataType; bytestream << (uint8_t) false; bytestream << (uint32_t)colType.colWidth; bytestream << (uint16_t)useDBRoot; bytestream << (uint32_t)colType.compressionType; } for (unsigned col = 0; col < dictOIDList.size(); col++) { colType = systemCatalogPtr->colTypeDct(dictOIDList[col].dictOID); bytestream << (uint32_t)dictOIDList[col].dictOID; bytestream << (uint8_t)colType.colDataType; bytestream << (uint8_t) true; bytestream << (uint32_t)colType.colWidth; bytestream << (uint16_t)useDBRoot; bytestream << (uint32_t)colType.compressionType; } boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); pmNum = (*dbRootPMMap)[useDBRoot]; try { #ifdef IDB_DDL_DEBUG cout << "Truncate table sending We_SVR_WRITE_CREATETABLEFILES to pm " << pmNum << endl; #endif fWEClient->write(bytestream, pmNum); while (1) { bsIn.reset(new ByteStream()); fWEClient->read(uniqueId, bsIn); if (bsIn->length() == 0) // read error { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES"; break; } else { *bsIn >> tmp8; rc = tmp8; if (rc != 0) { *bsIn >> errorMsg; } break; } } if (rc != 0) { // drop the newly created files bytestream.restart(); bytestream << (ByteStream::byte)WE_SVR_WRITE_DROPFILES; bytestream << uniqueId; bytestream << (uint32_t)(allOidList.size()); for (unsigned i = 0; i < (allOidList.size()); i++) { bytestream << (uint32_t)(allOidList[i]); } fWEClient->write(bytestream, pmNum); while (1) { bsIn.reset(new ByteStream()); fWEClient->read(uniqueId, bsIn); if (bsIn->length() == 0) // read error { break; } else { *bsIn >> tmp8; // rc = tmp8; break; } } Message::Args args; Message message(1); args.add("Truncate table failed."); args.add(errorMsg); args.add(""); message.format(args); result.result = TRUNC_ERROR; result.message = message; // rc = fSessionManager.setTableLock( roPair.objnum, truncTableStmt.fSessionID, processID, // processName, false ); fSessionManager.rolledback(txnID); return result; } } catch (runtime_error&) { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server"; } } catch (std::exception& ex) { Message::Args args; Message message(1); args.add("Truncate table failed."); args.add(ex.what()); args.add(""); message.format(args); result.result = TRUNC_ERROR; result.message = message; // rc = fSessionManager.setTableLock( roPair.objnum, truncTableStmt.fSessionID, processID, processName, // false ); fSessionManager.rolledback(txnID); return result; } catch (...) { Message::Args args; Message message(1); args.add("Truncate table failed: "); args.add("Remove column files failed."); args.add(""); args.add(""); message.format(args); result.result = TRUNC_ERROR; result.message = message; // rc = fSessionManager.setTableLock( roPair.objnum, truncTableStmt.fSessionID, processID, processName, // false ); fSessionManager.rolledback(txnID); return result; } if (rc != 0) { rollBackTransaction(uniqueId, txnID, truncTableStmt->fSessionID); // What to do with the error code fSessionManager.rolledback(txnID); } // Check whether the table has autoincrement column if (tableInfo.tablewithautoincr == 1) { // reset nextvalue to 1 WE_DDLCommandClient commandClient; rc = commandClient.UpdateSyscolumnNextval(autoIncColOid, 1); } // Log the DDL statement logDDL(truncTableStmt->fSessionID, txnID.id, truncTableStmt->fSql, truncTableStmt->fOwner); try { (void)fDbrm->releaseTableLock(tableLockId); } catch (std::exception&) { Message::Args args; Message message(1); args.add("Table truncated with warning "); args.add("Release table failed."); args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); args.add(""); message.format(args); result.result = WARNING; result.message = message; fSessionManager.rolledback(txnID); fWEClient->removeQueue(uniqueId); } // release the transaction fSessionManager.committed(txnID); fWEClient->removeQueue(uniqueId); // Remove the log file try { deleteLogFile(TRUNCATE_LOG, roPair.objnum, uniqueId); } catch (...) { } return result; } } // namespace ddlpackageprocessor