/* Copyright (C) 2014 InfiniDB, Inc. * Copyright (C) 2016 MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /*********************************************************************** * $Id: commandpackageprocessor.cpp 9613 2013-06-12 15:44:24Z dhall $ * * ***********************************************************************/ #include #include #include #include #include #include #include #include #include "commandpackageprocessor.h" #include "messagelog.h" #include "dbrm.h" #include "sqllogger.h" #include "tablelockdata.h" #include "we_messages.h" #include "we_ddlcommandclient.h" #include "oamcache.h" #include "liboamcpp.h" #include "resourcemanager.h" #include "simplecolumn.h" #include "functioncolumn.h" #include "aggregatecolumn.h" #include "simplefilter.h" #include "constantcolumn.h" #include "pseudocolumn.h" #include "functor_str.h" using namespace std; using namespace WriteEngine; using namespace dmlpackage; using namespace execplan; using namespace logging; using namespace boost; using namespace BRM; using namespace funcexp; namespace dmlpackageprocessor { typedef CalpontSelectExecutionPlan::ColumnMap::value_type CMVT_; // Tracks active cleartablelock commands by storing set of table lock IDs /*static*/ std::set CommandPackageProcessor::fActiveClearTableLockCmds; /*static*/ boost::mutex CommandPackageProcessor::fActiveClearTableLockCmdMutex; 
//------------------------------------------------------------------------------
// Process a command-type DML package. The package's DML statement text is
// upper-cased and trimmed, then dispatched on: COMMIT, ROLLBACK, CLEANUP,
// VIEWTABLELOCK, CLEARTABLELOCK, ANALYZEPARTITIONBLOAT, ANALYZETABLEBLOAT, or
// (when cpackage.get_Logging() is false) a log-only statement. Anything else
// is reported as "Unknown command.".
//
// cpackage - the command package to process.
//
// Returns a DMLResult. result.result starts as NO_ERROR; the catch handlers
// around the dispatch convert any exception into COMMAND_ERROR (or
// PP_LOST_CONNECTION when checkPPLostConnection() says so) instead of letting
// it propagate.
//
// NOTE(review): several declarations in this copy of the file appear to have
// lost their template argument lists ("vector lbidList",
// "static_cast(txnid.id)", "boost::shared_ptr systemCatalogPtr",
// "scoped_ptr aDbrm", "std::vector tableLocks"). The code tokens are left
// exactly as found; confirm against the upstream source.
//------------------------------------------------------------------------------
DMLPackageProcessor::DMLResult CommandPackageProcessor::processPackageInternal(
    dmlpackage::CalpontDMLPackage& cpackage)
{
  SUMMARY_INFO("CommandPackageProcessor::processPackage");

  DMLResult result;
  result.result = NO_ERROR;

  VERBOSE_INFO("Processing Command DML Package...");
  // Normalize the statement text so it can be compared with the command keywords below.
  std::string stmt = cpackage.get_DMLStatement();
  boost::algorithm::to_upper(stmt);
  trim(stmt);
  fSessionID = cpackage.get_SessionID();
  BRM::TxnID txnid = fSessionManager.getTxnID(cpackage.get_SessionID());
  uint64_t uniqueId = 0;

  // Bug 5070. Added exception handling
  // Failing to obtain a unique ID ends the command early: the transaction is
  // marked rolled back and a COMMAND_ERROR result is returned.
  try
  {
    uniqueId = fDbrm->getUnique64();
  }
  catch (std::exception& ex)
  {
    logging::Message::Args args;
    logging::Message message(9);
    args.add(ex.what());
    message.format(args);
    result.result = COMMAND_ERROR;
    result.message = message;
    fSessionManager.rolledback(txnid);
    return result;
  }
  catch (...)
  {
    logging::Message::Args args;
    logging::Message message(9);
    args.add("Unknown error occurred while getting unique number.");
    message.format(args);
    result.result = COMMAND_ERROR;
    result.message = message;
    fSessionManager.rolledback(txnid);
    return result;
  }

  string errorMsg;
  // Set once a branch below has already removed the WE client queue for
  // uniqueId, so the removeQueue() after the try block is skipped.
  bool queRemoved = false;
  logging::LoggingID lid(20);
  logging::MessageLog ml(lid);
  LoggingID logid(DMLLoggingId, fSessionID, txnid.id);
  logging::Message::Args args1;
  logging::Message msg(1);
  Logger logger(logid.fSubsysID);

  // CLEANUP is the only command that is not bracketed by start/end debug messages.
  if (stmt != "CLEANUP")
  {
    args1.add("Start SQL statement: ");
    args1.add(stmt);
    msg.format(args1);
    logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
  }

  // fWEClient->addQueue(uniqueId);
  try
  {
    // set-up the transaction
    if ((stmt == "COMMIT") || (stmt == "ROLLBACK"))
    {
      // SQLLogger sqlLogger(stmt, DMLLoggingId, cpackage.get_SessionID(), txnid.id);

      // Nothing is done for COMMIT/ROLLBACK when the session has no valid transaction.
      if ((txnid.valid))
      {
        vector lbidList;
        fDbrm->getUncommittedExtentLBIDs(static_cast(txnid.id), lbidList);
        // Set by the branches that already handled the uncommitted extent
        // LBIDs themselves; otherwise they are invalidated after the branch.
        bool cpInvalidated = false;

        // cout << "get a valid txnid " << txnid.id << " and stmt is " << stmt << " and isBachinsert is " <<
        // cpackage.get_isBatchInsert() << endl;
        if ((stmt == "COMMIT") && (cpackage.get_isBatchInsert()))
        {
          // update syscolumn for the next value
          boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
          CalpontSystemCatalog::TableName tableName;
          tableName = systemCatalogPtr->tableName(cpackage.getTableOid());

          // If the controller's auto-increment value is valid and ahead of the
          // catalog's, push it to SYSCOLUMN while holding the AI lock.
          // NOTE(review): if nextAutoIncrValue()/getAIValue() throws after
          // getAILock(), this path never calls releaseAILock() - confirm
          // whether destroying aDbrm releases it.
          // NOTE(review): unlike the two non-batch variants below, this call to
          // UpdateSyscolumnNextval() does not pass fSessionID.
          try
          {
            uint64_t nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);

            if (nextVal != AUTOINCR_SATURATED)  // need to update syscolumn
            {
              // get autoincrement column oid
              int32_t columnOid = systemCatalogPtr->autoColumOid(tableName);
              // get the current nextVal from controller
              scoped_ptr aDbrm(new DBRM());
              uint64_t nextValInController = 0;
              bool validNextVal = false;
              aDbrm->getAILock(columnOid);
              nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);  // in case it has changed
              validNextVal = aDbrm->getAIValue(columnOid, &nextValInController);

              if ((validNextVal) && (nextValInController > nextVal))
              {
                fWEClient->removeQueue(uniqueId);
                queRemoved = true;
                WE_DDLCommandClient ddlClient;
                uint8_t rc = ddlClient.UpdateSyscolumnNextval(columnOid, nextValInController);
                //@bug 5894. Need release lock.
                aDbrm->releaseAILock(columnOid);

                if (rc != 0)
                  throw std::runtime_error("Error in UpdateSyscolumnNextval");
              }
              else
                aDbrm->releaseAILock(columnOid);
            }
          }
          catch (std::exception& ex)
          {
            // Rollback transaction
            rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
            fSessionManager.rolledback(txnid);
            throw std::runtime_error(ex.what());
          }

          // systemCatalogPtr->updateColinfoCache(nextValMap);
          int weRc = 0;

          // With autocommit on the batch is committed through the write engine;
          // otherwise only the version buffer commit is issued.
          if (cpackage.get_isAutocommitOn())
          {
            weRc = commitBatchAutoOnTransaction(uniqueId, txnid, cpackage.getTableOid(), errorMsg);

            if (weRc != 0)
              BRM::errString(weRc, errorMsg);

            cpInvalidated = true;
          }
          else
          {
            weRc = fDbrm->vbCommit(txnid.id);

            if (weRc != 0)
              BRM::errString(weRc, errorMsg);

            // weRc = commitBatchAutoOffTransaction(uniqueId, txnid, cpackage.getTableOid(), errorMsg);
          }

          if (weRc != 0)
          {
            throw std::runtime_error(errorMsg);
          }

          logging::logCommand(cpackage.get_SessionID(), txnid.id, "COMMIT;");
          fSessionManager.committed(txnid);
          // cout << "releasing transaction id for batchinsert" << txnid.id << endl;
        }
        else if (stmt == "COMMIT")
        {
          // cout << "success in commitTransaction" << endl;
          // update syscolumn for the next value
          boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
          CalpontSystemCatalog::TableName tableName;
          uint32_t tableOid = cpackage.getTableOid();
          std::vector tableLocks = fDbrm->getAllTableLocks();

          if (tableOid == 0)  // special case: transaction commit for autocommit off and not following a dml
                              // statement immediately
          {
            // No single table is named, so the auto-increment sync is done for
            // every current table lock that also appears in this session's
            // OID -> table lock map.
            TablelockData* tablelockData = TablelockData::makeTablelockData(fSessionID);
            TablelockData::OIDTablelock tablelockMap = tablelockData->getOidTablelockMap();
            TablelockData::OIDTablelock::iterator iter;

            if (!tablelockMap.empty())
            {
              for (unsigned k = 0; k < tableLocks.size(); k++)
              {
                iter = tablelockMap.find(tableLocks[k].tableOID);

                if (iter != tablelockMap.end())
                {
                  tableName = systemCatalogPtr->tableName(tableLocks[k].tableOID);

                  try
                  {
                    uint64_t nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);

                    if (nextVal != AUTOINCR_SATURATED)  // need to update syscolumn
                    {
                      // get autoincrement column oid
                      int32_t columnOid = systemCatalogPtr->autoColumOid(tableName);
                      // get the current nextVal from controller
                      scoped_ptr aDbrm(new DBRM());
                      uint64_t nextValInController = 0;
                      bool validNextVal = false;
                      aDbrm->getAILock(columnOid);
                      nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);  // in case it has changed
                      validNextVal = aDbrm->getAIValue(columnOid, &nextValInController);

                      if ((validNextVal) && (nextValInController > (uint64_t)nextVal))
                      {
                        fWEClient->removeQueue(uniqueId);
                        queRemoved = true;
                        WE_DDLCommandClient ddlClient;
                        uint8_t rc =
                            ddlClient.UpdateSyscolumnNextval(columnOid, nextValInController, fSessionID);
                        aDbrm->releaseAILock(columnOid);

                        if (rc != 0)
                        {
                          // for now
                          // NOTE(review): the transaction is marked committed
                          // here and then rolled back by the handler below.
                          fSessionManager.committed(txnid);
                          throw std::runtime_error("Error in UpdateSyscolumnNextval");
                        }
                      }
                      else
                        aDbrm->releaseAILock(columnOid);
                    }
                  }
                  catch (std::exception& ex)
                  {
                    // Rollback transaction, release tablelock
                    rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
                    fDbrm->releaseTableLock(iter->second);
                    fSessionManager.rolledback(txnid);
                    throw std::runtime_error(ex.what());
                  }
                }
              }
            }
          }
          else
          {
            // presumably OIDs below 3000 are not user tables - TODO confirm
            if (tableOid >= 3000)
            {
              tableName = systemCatalogPtr->tableName(tableOid);

              try
              {
                uint64_t nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);

                if (nextVal != AUTOINCR_SATURATED)  // need to update syscolumn
                {
                  // get autoincrement column oid
                  int32_t columnOid = systemCatalogPtr->autoColumOid(tableName);
                  // get the current nextVal from controller
                  scoped_ptr aDbrm(new DBRM());
                  uint64_t nextValInController = 0;
                  bool validNextVal = false;
                  aDbrm->getAILock(columnOid);
                  nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);  // in case it has changed
                  validNextVal = aDbrm->getAIValue(columnOid, &nextValInController);

                  if ((validNextVal) && (nextValInController > (uint64_t)nextVal))
                  {
                    fWEClient->removeQueue(uniqueId);
                    queRemoved = true;
                    WE_DDLCommandClient ddlClient;
                    uint8_t rc =
                        ddlClient.UpdateSyscolumnNextval(columnOid, nextValInController, fSessionID);
                    aDbrm->releaseAILock(columnOid);

                    if (rc != 0)
                    {
                      // for now
                      fSessionManager.committed(txnid);
                      throw std::runtime_error("Error in UpdateSyscolumnNextval");
                    }
                  }
                  else
                    aDbrm->releaseAILock(columnOid);
                }
              }
              catch (std::exception& ex)
              {
                // Rollback transaction, release tablelock
                rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);

                // Release every lock held on this table; failures to release
                // are deliberately ignored so the original error is rethrown.
                for (unsigned k = 0; k < tableLocks.size(); k++)
                {
                  if (tableLocks[k].tableOID == tableOid)
                  {
                    try
                    {
                      fDbrm->releaseTableLock(tableLocks[k].id);
                    }
                    catch (std::exception&)
                    {
                    }
                  }
                }

                fSessionManager.rolledback(txnid);
                throw std::runtime_error(ex.what());
              }
            }
          }

          int weRc = commitTransaction(uniqueId, txnid);
          logging::logCommand(cpackage.get_SessionID(), txnid.id, "COMMIT;");

          if (weRc != WriteEngine::NO_ERROR)
          {
            // cout << "Got an error in commitTransaction" << endl;
            WErrorCodes ec;
            ostringstream oss;
            oss << "COMMIT failed: " << ec.errorString(weRc);
            throw std::runtime_error(oss.str());
          }

          fSessionManager.committed(txnid);
          // cout << "commit releasing transaction id " << txnid.id << endl;
        }
        else if ((stmt == "ROLLBACK") && (cpackage.get_isBatchInsert()))
        {
          int weRc = 0;
          // version rollback, Bulkrollback
          weRc = tryToRollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);

          if (weRc == 0)
          {
            // MCOL-5021
            fDbrm->addToLBIDList(fSessionID, lbidList);

            //@Bug 4560 invalidate cp first as bulkrollback will truncate the newly added lbids.
            fDbrm->invalidateUncommittedExtentLBIDs(0, true, &lbidList);
            cpInvalidated = true;
            weRc =
                rollBackBatchAutoOnTransaction(uniqueId, txnid, fSessionID, cpackage.getTableOid(), errorMsg);
          }
          else
          {
            throw std::runtime_error(errorMsg);
          }

          logging::logCommand(cpackage.get_SessionID(), txnid.id, "ROLLBACK;");

          if (weRc != 0)
          {
            //@Bug 4524. Don't set to readonly. Just error out.
            WErrorCodes ec;
            ostringstream oss;
            oss << "ROLLBACK batch insert failed due to: "
                << "(" << weRc << ")" << ec.errorString(weRc);
            // Log to error log
            logging::Message::Args args1;
            logging::Message message1(2);
            args1.add(oss.str());
            message1.format(args1);
            ml.logErrorMessage(message1);
            throw std::runtime_error(oss.str());
          }

          fSessionManager.rolledback(txnid);
          // cout << "batch rollback releasing transaction id " << txnid.id << endl;
        }
        else if (stmt == "ROLLBACK")
        {
          // Shadows the function-level errorMsg for the duration of this branch.
          std::string errorMsg("");
          logging::logCommand(cpackage.get_SessionID(), txnid.id, "ROLLBACK;");
          int weRc = tryToRollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);

          if (weRc != 0)
          {
            // cout << "Rollback failed" << endl;
            //@Bug 4524. Don't set to readonly. Just error out.
            ostringstream oss;
            oss << "ROLLBACK failed due to: " << errorMsg;
            // Log to error log
            logging::Message::Args args2;
            logging::Message message2(2);
            args2.add(oss.str());
            message2.format(args2);
            ml.logErrorMessage(message2);
            throw std::runtime_error(oss.str());
          }

          fSessionManager.rolledback(txnid);
          // cout << "Rollback releasing transaction id " << txnid.id << endl;
        }

        // Branches that did not already deal with the uncommitted extent LBIDs
        // get them invalidated here.
        if (!cpInvalidated)
        {
          // MCOL-5021
          if (stmt == "ROLLBACK")
          {
            fDbrm->addToLBIDList(fSessionID, lbidList);
          }

          fDbrm->invalidateUncommittedExtentLBIDs(0, stmt == "ROLLBACK", &lbidList);
        }
      }
    }
    else if (stmt == "CLEANUP")
    {
      // Drop the cached system catalog for this session ID and for the same
      // ID with the top bit (0x80000000) set.
      execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(cpackage.get_SessionID());
      execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(cpackage.get_SessionID() | 0x80000000);
    }
    else if (stmt == "VIEWTABLELOCK")
    {
      viewTableLock(cpackage, result);
    }
    else if (stmt == "CLEARTABLELOCK")
    {
      clearTableLock(uniqueId, cpackage, result);
    }
    else if (stmt == "ANALYZEPARTITIONBLOAT")
    {
      analyzePartitionBloat(cpackage, result);
    }
    else if (stmt == "ANALYZETABLEBLOAT")
    {
      analyzeTableBloat(cpackage, result);
    }
    else if (!cpackage.get_Logging())
    {
      // Log-only package: record the statement text and do nothing else.
      BRM::TxnID txnid = fSessionManager.getTxnID(cpackage.get_SessionID());
      logging::logDML(cpackage.get_SessionID(), txnid.id, cpackage.get_DMLStatement() + ";",
                      cpackage.get_SchemaName());
      SQLLogger sqlLogger(cpackage.get_DMLStatement(), DMLLoggingId, fSessionID, txnid.id);
      // cout << "commandpackageprocessor Logging " << cpackage.get_DMLStatement()+ ";" << endl;
    }
    else
    {
      std::string err = "Unknown command.";
      SUMMARY_INFO(err);
      throw std::runtime_error(err);
    }
  }
  catch (logging::IDBExcept& noTable)  //@Bug 2606 catch no table found exception
  {
    cerr << "CommandPackageProcessor::processPackage: " << noTable.what() << endl;

    result.result = COMMAND_ERROR;
    result.message = Message(noTable.what());
  }
  catch (std::exception& ex)
  {
    if (checkPPLostConnection(ex))
    {
      result.result = PP_LOST_CONNECTION;
    }
    else
    {
      cerr << "CommandPackageProcessor::processPackage: " << ex.what() << endl;

      logging::Message::Args args;
      logging::Message message(1);
      args.add(ex.what());
      args.add("");
      args.add("");
      message.format(args);

      result.result = COMMAND_ERROR;
      result.message = message;
    }
  }
  catch (...)
  {
    cerr << "CommandPackageProcessor::processPackage: caught unknown exception!" << endl;
    logging::Message::Args args;
    logging::Message message(1);
    args.add("Command Failed: ");
    args.add("encountered unkown exception");
    args.add("");
    args.add("");
    message.format(args);

    result.result = COMMAND_ERROR;
    result.message = message;
  }

  if (!queRemoved)
    fWEClient->removeQueue(uniqueId);

  // release table lock
  // Only after a successful COMMIT/ROLLBACK: release every lock recorded in
  // this session's OID -> table lock map, then drop the session's lock cache.
  if ((result.result == NO_ERROR) && ((stmt == "COMMIT") || (stmt == "ROLLBACK")))
  {
    TablelockData* tablelockData = TablelockData::makeTablelockData(fSessionID);
    TablelockData::OIDTablelock tablelockMap = tablelockData->getOidTablelockMap();
    // NOTE(review): this flag is not reset per iteration, so when
    // releaseTableLock() throws, the "not found" check below sees the value
    // left by the previous lock.
    bool lockReleased = true;

    if (!tablelockMap.empty())
    {
      TablelockData::OIDTablelock::iterator it = tablelockMap.begin();

      while (it != tablelockMap.end())
      {
        try
        {
          lockReleased = fDbrm->releaseTableLock(it->second);
          // cout << "releasing tablelock " << it->second << endl;
        }
        catch (std::exception& ex)
        {
          logging::Message::Args args;
          logging::Message message(1);
          args.add(ex.what());
          args.add("");
          args.add("");
          message.format(args);

          result.result = COMMAND_ERROR;
          result.message = message;
        }

        if (!lockReleased)  // log an error
        {
          ostringstream os;
          os << "tablelock for table oid " << it->first << " is not found";
          logging::Message::Args args;
          logging::Message message(1);
          args.add(os.str());
          args.add("");
          args.add("");
          message.format(args);
          logging::LoggingID lid(21);
          logging::MessageLog ml(lid);

          ml.logErrorMessage(message);
        }

        // cout << "tablelock " << it->second << " is released" << endl;
        it++;
      }

      //@Bug 3557. Clean tablelock cache after commit/rollback.
      TablelockData::removeTablelockData(cpackage.get_SessionID());
    }
  }

  VERBOSE_INFO("Finished processing Command DML Package");
  // LoggingID logid( DMLLoggingId, fSessionID, txnid.id);
  logging::Message::Args args2;
  logging::Message msg1(1);

  if (stmt != "CLEANUP")
  {
    args2.add("End SQL statement");
    msg1.format(args2);
    // Logger logger(logid.fSubsysID);
    logger.logMessage(LOG_TYPE_DEBUG, msg1, logid);
  }

  return result;
}

//------------------------------------------------------------------------------
// Process viewtablelock command; return table lock information for the
// specified table.
// This function closely resembles printTableLocks in viewtablelock.cpp.
//
// cpackage - supplies the schema and table name to report on.
// result   - receives the formatted report in result.tableLockInfo.
//------------------------------------------------------------------------------
void CommandPackageProcessor::viewTableLock(const dmlpackage::CalpontDMLPackage& cpackage,
                                            DMLPackageProcessor::DMLResult& result)
{
  // Initialize System Catalog object used to get table name
  boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);
  CalpontSystemCatalog::TableName tableName;
  tableName.schema = cpackage.get_SchemaName();
  tableName.table = cpackage.get_TableName();
  execplan::CalpontSystemCatalog::ROPair roPair;
  roPair = systemCatalogPtr->tableRID(tableName);

  // Get list of table locks for the requested table
  // (getAllTableLocks() is called without a table argument; the locks are
  // matched against roPair.objnum further down.)
  std::vector tableLocks;
  tableLocks = fDbrm->getAllTableLocks();

  // Make preliminary pass through the table locks in order to determine our
  // output column widths based on the data.  Min column widths are based on
  // the width of the column heading (except for the 'state' column).
uint64_t maxLockID = 0; uint32_t maxPID = 0; int32_t maxSessionID = 0; int32_t minSessionID = 0; int32_t maxTxnID = 0; unsigned int lockIDColumnWidth = 6; // "LockID" unsigned int ownerColumnWidth = 7; // "Process" unsigned int pidColumnWidth = 3; // "PID" unsigned int sessionIDColumnWidth = 7; // "Session" unsigned int txnIDColumnWidth = 3; // "Txn" unsigned int createTimeColumnWidth = 12; // "CreationTime" unsigned int pmColumnWidth = 7; // "DBRoots" std::vector createTimes; char cTimeBuffer[1024]; for (unsigned idx = 0; idx < tableLocks.size(); idx++) { if (tableLocks[idx].id > maxLockID) maxLockID = tableLocks[idx].id; if (tableLocks[idx].ownerName.length() > ownerColumnWidth) ownerColumnWidth = tableLocks[idx].ownerName.length(); if (tableLocks[idx].ownerPID > maxPID) maxPID = tableLocks[idx].ownerPID; if (tableLocks[idx].ownerSessionID > maxSessionID) maxSessionID = tableLocks[idx].ownerSessionID; if (tableLocks[idx].ownerSessionID < minSessionID) minSessionID = tableLocks[idx].ownerSessionID; if (tableLocks[idx].ownerTxnID > maxTxnID) maxTxnID = tableLocks[idx].ownerTxnID; ctime_r(&tableLocks[idx].creationTime, cTimeBuffer); cTimeBuffer[strlen(cTimeBuffer) - 1] = '\0'; // strip trailing '\n' std::string cTimeStr(cTimeBuffer); if (cTimeStr.length() > createTimeColumnWidth) createTimeColumnWidth = cTimeStr.length(); createTimes.push_back(cTimeStr); std::ostringstream pms; // It is dbroots now for (unsigned k = 0; k < tableLocks[idx].dbrootList.size(); k++) { if (k > 0) pms << ','; pms << tableLocks[idx].dbrootList[k]; } if (pms.str().length() > pmColumnWidth) pmColumnWidth = pms.str().length(); } ownerColumnWidth += 2; pmColumnWidth += 2; createTimeColumnWidth += 2; std::ostringstream idString; idString << maxLockID; if (idString.str().length() > lockIDColumnWidth) lockIDColumnWidth = idString.str().length(); lockIDColumnWidth += 2; std::ostringstream pidString; pidString << maxPID; if (pidString.str().length() > pidColumnWidth) pidColumnWidth = 
pidString.str().length(); pidColumnWidth += 2; const std::string sessionNoneStr("BulkLoad"); std::ostringstream sessionString; sessionString << maxSessionID; if (sessionString.str().length() > sessionIDColumnWidth) sessionIDColumnWidth = sessionString.str().length(); if ((minSessionID < 0) && (sessionNoneStr.length() > sessionIDColumnWidth)) sessionIDColumnWidth = sessionNoneStr.length(); sessionIDColumnWidth += 2; const std::string txnNoneStr("n/a"); std::ostringstream txnString; txnString << maxTxnID; if (txnString.str().length() > txnIDColumnWidth) txnIDColumnWidth = txnString.str().length(); txnIDColumnWidth += 2; // Make second pass through the table locks to build our result. // Keep in mind the same table could have more than 1 lock // (on different PMs), so we don't exit loop after "first" match. bool found = false; ostringstream os; for (unsigned idx = 0; idx < tableLocks.size(); idx++) { if (roPair.objnum == (CalpontSystemCatalog::OID)tableLocks[idx].tableOID) { std::ostringstream pms; for (unsigned k = 0; k < tableLocks[idx].dbrootList.size(); k++) { if (k > 0) pms << ','; pms << tableLocks[idx].dbrootList[k]; } if (found) // write newline between lines { os << endl; } else // write the column headers before the first entry { os.setf(ios::left, ios::adjustfield); os << setw(lockIDColumnWidth) << "LockID" << setw(ownerColumnWidth) << "Process" << setw(pidColumnWidth) << "PID" << setw(sessionIDColumnWidth) << "Session" << setw(txnIDColumnWidth) << "Txn" << setw(createTimeColumnWidth) << "CreationTime" << setw(9) << "State" << setw(pmColumnWidth) << "DBRoots" << endl; } os << " " << setw(lockIDColumnWidth) << tableLocks[idx].id << setw(ownerColumnWidth) << tableLocks[idx].ownerName << setw(pidColumnWidth) << tableLocks[idx].ownerPID; // Log session ID, or "BulkLoad" if session is -1 if (tableLocks[idx].ownerSessionID < 0) os << setw(sessionIDColumnWidth) << sessionNoneStr; else os << setw(sessionIDColumnWidth) << tableLocks[idx].ownerSessionID; // Log txn 
ID, or "n/a" if txn is -1 if (tableLocks[idx].ownerTxnID < 0) os << setw(txnIDColumnWidth) << txnNoneStr; else os << setw(txnIDColumnWidth) << tableLocks[idx].ownerTxnID; os << setw(createTimeColumnWidth) << createTimes[idx] << setw(9) << ((tableLocks[idx].state == BRM::LOADING) ? "LOADING" : "CLEANUP") << setw(pmColumnWidth) << pms.str(); found = true; } // end of displaying a table lock match } // end of loop through all table locks if (!found) { os << " Table " << tableName.schema << "." << tableName.table << " is not locked by any process."; } result.tableLockInfo = os.str(); } //------------------------------------------------------------------------------ // Process cleartablelock command; execute bulk rollback and release table lock // for the specified table. //------------------------------------------------------------------------------ void CommandPackageProcessor::clearTableLock(uint64_t uniqueId, const dmlpackage::CalpontDMLPackage& cpackage, DMLPackageProcessor::DMLResult& result) { CalpontSystemCatalog::TableName tableName; tableName.schema = cpackage.get_SchemaName(); tableName.table = cpackage.get_TableName(); // Get the Table lock ID that is passed in the SQL statement attribute. // This is a kludge we may want to consider changing. Could add a table // lock ID attribute to the CalpontDMLPackage object. 
std::istringstream lockIDString(cpackage.get_SQLStatement()); uint64_t tableLockID; lockIDString >> tableLockID; //----------------------------------------------------- start of syslog code // // Log initiation of cleartablelock to syslog // const std::string APPLNAME("cleartablelock SQL cmd"); const int SUBSYSTEM_ID = 21; // dmlpackageproc const int INIT_MSG_NUM = logging::M0088; logging::Message::Args msgArgs; logging::Message logMsg1(INIT_MSG_NUM); msgArgs.add(APPLNAME); msgArgs.add(tableName.toString()); msgArgs.add(tableLockID); logMsg1.format(msgArgs); logging::LoggingID lid(SUBSYSTEM_ID); logging::MessageLog ml(lid); ml.logInfoMessage(logMsg1); //------------------------------------------------------- end of syslog code boost::shared_ptr bsIn; messageqcpp::ByteStream bsOut; bool lockGrabbed = false; bool bErrFlag = false; bool bRemoveMetaErrFlag = false; std::ostringstream combinedErrMsg; try { // Make sure BRM is in READ-WRITE state before starting int brmRc = fDbrm->isReadWrite(); if (brmRc != BRM::ERR_OK) { std::string brmErrMsg; BRM::errString(brmRc, brmErrMsg); std::ostringstream oss; oss << "Failed BRM status check: " << brmErrMsg; throw std::runtime_error(oss.str()); } BRM::TableLockInfo lockInfo; establishTableLockToClear(tableLockID, lockInfo); lockGrabbed = true; std::set pmSet; // Construct relevant list of PMs based on the DBRoots associated // with the tableLock. for (unsigned int k = 0; k < lockInfo.dbrootList.size(); k++) { if (!oamcache()->isOffline(lockInfo.dbrootList[k])) { int pm = oamcache()->getOwnerPM(lockInfo.dbrootList[k]); pmSet.insert(pm); } else { std::ostringstream oss; oss << "DBRoot " << lockInfo.dbrootList[k] << " does not map to a PM. 
Cannot perform rollback"; throw std::runtime_error(oss.str()); } } std::vector pmList; for (std::set::const_iterator setIter = pmSet.begin(); setIter != pmSet.end(); ++setIter) { pmList.push_back(*setIter); } std::cout << "cleartablelock rollback for table lock " << tableLockID << " being forwarded to PM(s): "; for (unsigned int k = 0; k < pmList.size(); k++) { if (k > 0) std::cout << ", "; std::cout << pmList[k]; } std::cout << std::endl; // Perform bulk rollback if state is in LOADING state if (lockInfo.state == BRM::LOADING) { fWEClient->addQueue(uniqueId); //------------------------------------------------------------------ // Send rollback msg to the writeengine server for every PM. // We send to each PM, instead of just to PMs in the tablelock's // PM list, just in case a DBRoot has been moved to a different PM. //------------------------------------------------------------------ // std::cout << "cleartablelock rollback: tableLock-" << tableLockID << // ": uniqueID-" << uniqueId << // ": oid-" << lockInfo.tableOID << // "; name-" << tableName.toString() << // "; app-" << APPLNAME << std::endl; bsOut << (messageqcpp::ByteStream::byte)WE_SVR_DML_BULKROLLBACK; bsOut << uniqueId; bsOut << tableLockID; bsOut << lockInfo.tableOID; bsOut << tableName.toString(); bsOut << APPLNAME; for (unsigned j = 0; j < pmList.size(); j++) { fWEClient->write(bsOut, pmList[j]); } // Wait for "all" the responses, and accumulate any/all errors unsigned int pmMsgCnt = 0; while (pmMsgCnt < pmList.size()) { std::string rollbackErrMsg; bsIn.reset(new messageqcpp::ByteStream()); fWEClient->read(uniqueId, bsIn); if (bsIn->length() == 0) { bRemoveMetaErrFlag = true; if (combinedErrMsg.str().length() > 0) combinedErrMsg << std::endl; combinedErrMsg << "Network error, PM rollback; "; } else { messageqcpp::ByteStream::byte rc; uint16_t pmNum; *bsIn >> rc; *bsIn >> rollbackErrMsg; *bsIn >> pmNum; // std::cout << "cleartablelock rollback response from PM"<< // pmNum << "; rc-" << (int)rc << // 
"; errMsg: {" << rollbackErrMsg << '}' << std::endl; if (rc != 0) { bRemoveMetaErrFlag = true; if (combinedErrMsg.str().empty()) combinedErrMsg << "Rollback error; "; else combinedErrMsg << std::endl; combinedErrMsg << "[PM" << pmNum << "] " << rollbackErrMsg; } } pmMsgCnt++; } // end of while loop to process all responses to bulk rollback // If no errors so far, then change state to CLEANUP state. // We ignore the return stateChange flag. if (!bErrFlag) { fDbrm->changeState(tableLockID, BRM::CLEANUP); } } // end of (lockInfo.state == BRM::LOADING) // If no errors so far, then: // 1. delete meta data backup rollback files // 2. finally release table lock if (!bErrFlag) { bsOut.reset(); //------------------------------------------------------------------ // Delete meta data backup rollback files //------------------------------------------------------------------ // std::cout << "cleartablelock cleanup: uniqueID-" << uniqueId << // ": oid-" << lockInfo.tableOID << std::endl; bsOut << (messageqcpp::ByteStream::byte)WE_SVR_DML_BULKROLLBACK_CLEANUP; bsOut << uniqueId; bsOut << lockInfo.tableOID; for (unsigned j = 0; j < pmList.size(); j++) { fWEClient->write(bsOut, pmList[j]); } // Wait for "all" the responses, and accumulate any/all errors unsigned int pmMsgCnt = 0; while (pmMsgCnt < pmList.size()) { std::string fileDeleteErrMsg; bsIn.reset(new messageqcpp::ByteStream()); fWEClient->read(uniqueId, bsIn); if (bsIn->length() == 0) { bRemoveMetaErrFlag = true; if (combinedErrMsg.str().length() > 0) combinedErrMsg << std::endl; combinedErrMsg << "Network error, PM rollback cleanup; "; } else { messageqcpp::ByteStream::byte rc; uint16_t pmNum; *bsIn >> rc; *bsIn >> fileDeleteErrMsg; *bsIn >> pmNum; // std::cout << "cleartablelock cleanup response from PM" << // pmNum << "; rc-" << (int)rc << // "; errMsg: {" << fileDeleteErrMsg << '}' << std::endl; if (rc != 0) { bRemoveMetaErrFlag = true; if (combinedErrMsg.str().empty()) combinedErrMsg << "Cleanup error; "; else 
combinedErrMsg << std::endl; combinedErrMsg << "[PM" << pmNum << "] " << fileDeleteErrMsg; } } pmMsgCnt++; } // end of while loop to process all responses to rollback cleanup // We ignore return release flag from releaseTableLock(). if (!bErrFlag) { fDbrm->releaseTableLock(tableLockID); } } } catch (std::exception& ex) { bErrFlag = true; combinedErrMsg << ex.what(); } if (!bErrFlag) { std::ostringstream oss; oss << "Table lock " << tableLockID << " for table " << tableName.toString() << " is cleared."; //@Bug 4517. Release tablelock if remove meta files failed. if (bRemoveMetaErrFlag) { oss << " Warning: " << combinedErrMsg.str(); } result.tableLockInfo = oss.str(); } else { std::ostringstream oss; oss << "Table lock " << tableLockID << " for table " << tableName.toString() << " cannot be cleared. " << combinedErrMsg.str(); result.tableLockInfo = oss.str(); } // Remove tableLockID out of the active cleartableLock command list if (lockGrabbed) { boost::mutex::scoped_lock lock(fActiveClearTableLockCmdMutex); fActiveClearTableLockCmds.erase(tableLockID); } //----------------------------------------------------- start of syslog code // // Log completion of cleartablelock to syslog // const int COMP_MSG_NUM = logging::M0089; msgArgs.reset(); logging::Message logMsg2(COMP_MSG_NUM); msgArgs.add(APPLNAME); msgArgs.add(tableName.toString()); msgArgs.add(tableLockID); std::string finalStatus; if (!bErrFlag) { finalStatus = "Completed successfully"; } else { finalStatus = "Encountered errors: "; finalStatus += combinedErrMsg.str(); } msgArgs.add(finalStatus); logMsg2.format(msgArgs); ml.logInfoMessage(logMsg2); //------------------------------------------------------- end of syslog code } //------------------------------------------------------------------------------ // Get/return the current tablelock info for tableLockID, and reassign the table // lock to ourselves if we can. 
If the lock is still in use by another process // or by another DML cleartablelock thread, then we error out with exception. //------------------------------------------------------------------------------ void CommandPackageProcessor::establishTableLockToClear(uint64_t tableLockID, BRM::TableLockInfo& lockInfo) { boost::mutex::scoped_lock lock(fActiveClearTableLockCmdMutex); // Get current table lock info bool getLockInfo = fDbrm->getTableLockInfo(tableLockID, &lockInfo); if (!getLockInfo) { throw std::runtime_error(std::string("Lock does not exist.")); } std::string processName("DMLProc clearTableLock"); uint32_t processID = ::getpid(); // See if another thread is executing a cleartablelock cmd for this table if ((lockInfo.ownerName == processName) && (lockInfo.ownerPID == processID)) { std::set::const_iterator it = fActiveClearTableLockCmds.find(tableLockID); if (it != fActiveClearTableLockCmds.end()) { throw std::runtime_error( std::string("Lock in use. " "DML is executing another cleartablelock MySQL cmd.")); } } else { // Take over ownership of stale lock. // Use "DMLProc clearTableLock" as process name to differentiate // from a DMLProc lock used for inserts, updates, etc. 
int32_t sessionID = fSessionID; int32_t txnid = -1; bool ownerChanged = fDbrm->changeOwner(tableLockID, processName, processID, sessionID, txnid); if (!ownerChanged) { throw std::runtime_error(std::string("Unable to grab lock; lock not found or still in use.")); } } // Add this cleartablelock command to the list of active cleartablelock cmds fActiveClearTableLockCmds.insert(tableLockID); } void CommandPackageProcessor::analyzePartitionBloat(const dmlpackage::CalpontDMLPackage& cpackage, DMLPackageProcessor::DMLResult& result) { boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); CalpontSystemCatalog::TableName tableName; tableName.schema = cpackage.get_SchemaName(); tableName.table = cpackage.get_TableName(); std::string partitionStr = cpackage.get_SQLStatement(); std::ostringstream analysisResults; bool bErrFlag = false; std::string errorMsg; try { // Get AUX column OID for the table CalpontSystemCatalog::OID auxColumnOid = systemCatalogPtr->tableAUXColumnOID(tableName); if (auxColumnOid <= 3000) { analysisResults << "Table " << tableName.toString() << " does not have an AUX column for bloat analysis."; result.bloatAnalysis = analysisResults.str(); return; } // SELECT COUNT(aux) AS count_aux, COUNT(CASE aux WHEN 1 THEN 1 END) AS count_aux_deleted FROM schema.table WHERE idbPartition(aux) = partitionStr; CalpontSelectExecutionPlan csep; CalpontSelectExecutionPlan::ReturnedColumnList returnedColumnList; CalpontSelectExecutionPlan::FilterTokenList filterTokenList; CalpontSelectExecutionPlan::ColumnMap colMap; // Create the base SimpleColumn for 'aux' SimpleColumn* auxCol = new SimpleColumn(tableName.schema, tableName.table, "aux", fSessionID); auxCol->alias("aux"); CalpontSystemCatalog::ColType auxColType; auxColType.colDataType = CalpontSystemCatalog::INT; auxColType.colWidth = 4; auxCol->resultType(auxColType); // Create the COUNT(aux) AS count_aux aggregate column 
AggregateColumn* countAuxCol = new AggregateColumn(fSessionID); countAuxCol->alias("count_aux"); countAuxCol->aggOp(AggregateColumn::COUNT); countAuxCol->functionName("count"); countAuxCol->expressionId(1); CalpontSystemCatalog::ColType countAuxColType; countAuxColType.colDataType = CalpontSystemCatalog::INT; countAuxColType.colWidth = 4; countAuxCol->resultType(countAuxColType); SRCP auxSRCP(auxCol->clone()); countAuxCol->aggParms().push_back(auxSRCP); // Create the CASE aux WHEN 1 THEN 1 END expression FunctionColumn* caseCol = new FunctionColumn(); caseCol->functionName("case_simple"); // Use case_simple for expression comparison caseCol->sessionID(fSessionID); caseCol->expressionId(2); caseCol->alias("case_aux_deleted"); // Set the result type for the CASE expression CalpontSystemCatalog::ColType caseColType; caseColType.colDataType = CalpontSystemCatalog::INT; caseColType.colWidth = 4; caseCol->resultType(caseColType); // Create the WHEN value: 1 ConstantColumn* whenValue = new ConstantColumn("1", ConstantColumn::NUM); whenValue->sessionID(fSessionID); // Create the THEN result: 1 ConstantColumn* thenResult = new ConstantColumn("1", ConstantColumn::NUM); thenResult->sessionID(fSessionID); // Build the function parameters for CASE funcexp::FunctionParm funcParms; SPTP sptp; // Add the CASE expression (aux column) sptp.reset(new ParseTree(auxCol->clone())); funcParms.push_back(sptp); // Add the WHEN value sptp.reset(new ParseTree(whenValue)); funcParms.push_back(sptp); // Add the THEN result sptp.reset(new ParseTree(thenResult)); funcParms.push_back(sptp); // Set the function parameters caseCol->functionParms(funcParms); // Create the COUNT(CASE aux WHEN 1 THEN 1 END) AS count_aux_deleted aggregate column AggregateColumn* countCaseCol = new AggregateColumn(fSessionID); countCaseCol->alias("count_aux_deleted"); countCaseCol->aggOp(AggregateColumn::COUNT); countCaseCol->functionName("count"); countCaseCol->expressionId(3); CalpontSystemCatalog::ColType 
countCaseColType; countCaseColType.colDataType = CalpontSystemCatalog::INT; countCaseColType.colWidth = 4; countCaseCol->resultType(countCaseColType); SRCP caseSRCP(caseCol->clone()); countCaseCol->aggParms().push_back(caseSRCP); // Add the base 'aux' column to ColumnMap (used for reference resolution) // Note: The aggregate results do NOT go in ColumnMap // Add "aux" multiple times since it's referenced in COUNT(aux), CASE expression, and idbPartition(aux) colMap.insert(CMVT_(tableName.schema + "." + tableName.table + "." + "aux", auxSRCP)); auxSRCP.reset(auxCol->clone()); colMap.insert(CMVT_(tableName.schema + "." + tableName.table + "." + "aux", auxSRCP)); auxSRCP.reset(auxCol->clone()); colMap.insert(CMVT_(tableName.schema + "." + tableName.table + "." + "aux", auxSRCP)); // Add both COUNT columns to ReturnedColumnList (what gets returned by SELECT) SRCP countSRCP(countAuxCol->clone()); returnedColumnList.push_back(countSRCP); SRCP countCaseSRCP(countCaseCol->clone()); returnedColumnList.push_back(countCaseSRCP); csep.columnMapNonStatic(colMap); csep.returnedCols(returnedColumnList); // Define the filter using FunctionColumn for idbPartition() const SOP opeq(new Operator("=")); // Create a FunctionColumn for idbPartition(aux) // parms: psueducolumn dbroot, segmentdir, segment FunctionColumn* fc = new FunctionColumn(); fc->functionName("idbpartition"); fc->sessionID(fSessionID); fc->expressionId(0); funcexp::FunctionParm parms; PseudoColumn* dbroot = new PseudoColumn(*auxCol, PSEUDO_DBROOT, fSessionID); sptp.reset(new ParseTree(dbroot)); parms.push_back(sptp); PseudoColumn* pp = new PseudoColumn(*auxCol, PSEUDO_SEGMENTDIR, fSessionID); sptp.reset(new ParseTree(pp)); parms.push_back(sptp); PseudoColumn* seg = new PseudoColumn(*auxCol, PSEUDO_SEGMENT, fSessionID); sptp.reset(new ParseTree(seg)); parms.push_back(sptp); fc->functionParms(parms); CalpontSystemCatalog::ColType resultType; resultType.colDataType = CalpontSystemCatalog::VARCHAR; resultType.colWidth = 
256; fc->resultType(resultType); funcexp::Func_idbpartition* idbpartition = new funcexp::Func_idbpartition(); fc->operationType(idbpartition->operationType(parms, fc->resultType())); delete idbpartition; // Set up the filter ConstantColumn* partitionConstCol = new ConstantColumn(partitionStr, ConstantColumn::LITERAL); SimpleFilter* f1 = new SimpleFilter(opeq, fc, partitionConstCol); filterTokenList.push_back(f1); csep.filterTokenList(filterTokenList); // Set the session ID, transaction ID and version Id BRM::QueryContext verID; verID = fSessionManager.verID(); csep.verID(verID); csep.sessionID(fSessionID); BRM::TxnID txnID; txnID = fSessionManager.getTxnID(fSessionID); csep.txnID(txnID.id); // Set the table list CalpontSelectExecutionPlan::TableList tablelist; tablelist.push_back(make_aliastable(tableName.schema, tableName.table, "")); csep.tableList(tablelist); csep.schemaName(tableName.schema, 0); csep.tableName(tableName.table, 0); // Send CSEP to ExeMgr CalpontSystemCatalog::NJLSysDataList sysDataList; systemCatalogPtr->getQueryData(csep, sysDataList); int64_t countAux = 0; int64_t countAuxDeleted = 0; for (auto it = sysDataList.begin(); it != sysDataList.end(); it++) { if ((*it)->ColumnOID() == static_cast(countAuxCol->expressionId())) { countAux = (*it)->GetData(0); } else if ((*it)->ColumnOID() == static_cast(countCaseCol->expressionId())) { countAuxDeleted = (*it)->GetData(0); } } analysisResults << std::fixed << std::setprecision(2) << (static_cast(countAuxDeleted) / countAux) * 100 << "%"; result.bloatAnalysis = analysisResults.str(); } catch (std::exception& ex) { bErrFlag = true; errorMsg = ex.what(); } if (bErrFlag) { std::ostringstream oss; oss << "Partition bloat analysis failed for table " << tableName.toString() << ", partition " << partitionStr << ": " << errorMsg; result.bloatAnalysis = oss.str(); } } void CommandPackageProcessor::analyzeTableBloat(const dmlpackage::CalpontDMLPackage& cpackage, DMLPackageProcessor::DMLResult& result) { 
boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); CalpontSystemCatalog::TableName tableName; tableName.schema = cpackage.get_SchemaName(); tableName.table = cpackage.get_TableName(); std::ostringstream analysisResults; bool bErrFlag = false; std::string errorMsg; try { // Get AUX column OID for the table CalpontSystemCatalog::OID auxColumnOid = systemCatalogPtr->tableAUXColumnOID(tableName); if (auxColumnOid <= 3000) { analysisResults << "Table " << tableName.toString() << " does not have an AUX column for bloat analysis."; result.bloatAnalysis = analysisResults.str(); return; } // SELECT idbPartition(aux), COUNT(aux) AS count_aux, COUNT(CASE aux WHEN 1 THEN 1 END) AS count_aux_deleted FROM test GROUP BY idbPartition(aux); CalpontSelectExecutionPlan csep; CalpontSelectExecutionPlan::ReturnedColumnList returnedColumnList; CalpontSelectExecutionPlan::ColumnMap colMap; // Create the base SimpleColumn for 'aux' SimpleColumn* auxCol = new SimpleColumn(tableName.schema, tableName.table, "aux", fSessionID); auxCol->alias("aux"); auxCol->setOID(); CalpontSystemCatalog::ColType auxColType; auxColType.colDataType = CalpontSystemCatalog::INT; auxColType.colWidth = 4; auxCol->resultType(auxColType); // Create the COUNT(aux) AS count_aux aggregate column AggregateColumn* countAuxCol = new AggregateColumn(fSessionID); countAuxCol->alias("count_aux"); countAuxCol->aggOp(AggregateColumn::COUNT); countAuxCol->functionName("count"); countAuxCol->expressionId(1); CalpontSystemCatalog::ColType countAuxColType; countAuxColType.colDataType = CalpontSystemCatalog::INT; countAuxColType.colWidth = 4; countAuxCol->resultType(countAuxColType); SRCP auxSRCP(auxCol->clone()); countAuxCol->aggParms().push_back(auxSRCP); // Create the CASE aux WHEN 1 THEN 1 END expression FunctionColumn* caseCol = new FunctionColumn(); caseCol->functionName("case_simple"); // Use case_simple for expression 
comparison caseCol->sessionID(fSessionID); caseCol->expressionId(2); caseCol->alias("case_aux_deleted"); // Set the result type for the CASE expression CalpontSystemCatalog::ColType caseColType; caseColType.colDataType = CalpontSystemCatalog::INT; caseColType.colWidth = 4; caseCol->resultType(caseColType); // Create the WHEN value: 1 ConstantColumn* whenValue = new ConstantColumn("1", ConstantColumn::NUM); whenValue->sessionID(fSessionID); // Create the THEN result: 1 ConstantColumn* thenResult = new ConstantColumn("1", ConstantColumn::NUM); thenResult->sessionID(fSessionID); // Build the function parameters for CASE funcexp::FunctionParm funcParms; SPTP sptp; // Add the CASE expression (aux column) sptp.reset(new ParseTree(auxCol->clone())); funcParms.push_back(sptp); // Add the WHEN value sptp.reset(new ParseTree(whenValue)); funcParms.push_back(sptp); // Add the THEN result sptp.reset(new ParseTree(thenResult)); funcParms.push_back(sptp); // Set the function parameters caseCol->functionParms(funcParms); // Create the COUNT(CASE aux WHEN 1 THEN 1 END) AS count_aux_deleted aggregate column AggregateColumn* countCaseCol = new AggregateColumn(fSessionID); countCaseCol->alias("count_aux_deleted"); countCaseCol->aggOp(AggregateColumn::COUNT); countCaseCol->functionName("count"); countCaseCol->expressionId(3); CalpontSystemCatalog::ColType countCaseColType; countCaseColType.colDataType = CalpontSystemCatalog::INT; countCaseColType.colWidth = 4; countCaseCol->resultType(countCaseColType); SRCP caseSRCP(caseCol->clone()); countCaseCol->aggParms().push_back(caseSRCP); // Add the base 'aux' column to ColumnMap (used for reference resolution) // Note: The aggregate results do NOT go in ColumnMap // Add "aux" multiple times since it's referenced in COUNT(aux), CASE expression, and idbPartition(aux) colMap.insert(CMVT_(tableName.schema + "." + tableName.table + "." + "aux", auxSRCP)); auxSRCP.reset(auxCol->clone()); colMap.insert(CMVT_(tableName.schema + "." 
+ tableName.table + "." + "aux", auxSRCP)); auxSRCP.reset(auxCol->clone()); colMap.insert(CMVT_(tableName.schema + "." + tableName.table + "." + "aux", auxSRCP)); // Add both COUNT columns to ReturnedColumnList (what gets returned by SELECT) SRCP countSRCP(countAuxCol->clone()); returnedColumnList.push_back(countSRCP); SRCP countCaseSRCP(countCaseCol->clone()); returnedColumnList.push_back(countCaseSRCP); // Create a FunctionColumn for idbPartition(aux) // parms: psueducolumn dbroot, segmentdir, segment FunctionColumn* fc = new FunctionColumn(); fc->functionName("idbpartition"); fc->alias("idbPartition(aux)"); fc->sessionID(fSessionID); fc->expressionId(0); funcexp::FunctionParm parms; PseudoColumn* dbroot = new PseudoColumn(*auxCol, PSEUDO_DBROOT, fSessionID); sptp.reset(new ParseTree(dbroot)); parms.push_back(sptp); PseudoColumn* pp = new PseudoColumn(*auxCol, PSEUDO_SEGMENTDIR, fSessionID); sptp.reset(new ParseTree(pp)); parms.push_back(sptp); PseudoColumn* seg = new PseudoColumn(*auxCol, PSEUDO_SEGMENT, fSessionID); sptp.reset(new ParseTree(seg)); parms.push_back(sptp); fc->functionParms(parms); CalpontSystemCatalog::ColType resultType; resultType.colDataType = CalpontSystemCatalog::VARCHAR; resultType.colWidth = 256; fc->resultType(resultType); funcexp::Func_idbpartition* idbpartition = new funcexp::Func_idbpartition(); fc->operationType(idbpartition->operationType(parms, fc->resultType())); delete idbpartition; SRCP fcSRCP(fc->clone()); returnedColumnList.push_back(fcSRCP); csep.columnMapNonStatic(colMap); csep.returnedCols(returnedColumnList); // Set the group by CalpontSelectExecutionPlan::GroupByColumnList groupByList; groupByList.push_back(fcSRCP); csep.groupByCols(groupByList); // Set the session ID, transaction ID and version Id BRM::QueryContext verID; verID = fSessionManager.verID(); csep.verID(verID); csep.sessionID(fSessionID); BRM::TxnID txnID; txnID = fSessionManager.getTxnID(fSessionID); csep.txnID(txnID.id); // Set the table list 
CalpontSelectExecutionPlan::TableList tablelist; tablelist.push_back(make_aliastable(tableName.schema, tableName.table, "")); csep.tableList(tablelist); csep.schemaName(tableName.schema, 0); csep.tableName(tableName.table, 0); // Send CSEP to ExeMgr CalpontSystemCatalog::NJLSysDataList sysDataList; systemCatalogPtr->getQueryData(csep, sysDataList); size_t countAuxIndex = static_cast(-1); size_t countAuxDeletedIndex = static_cast(-1); size_t idbPartitionIndex = static_cast(-1); for (size_t i = 0; i < sysDataList.sysDataVec.size(); ++i) { if (sysDataList.sysDataVec[i]->ColumnOID() == static_cast(countAuxCol->expressionId())) { countAuxIndex = i; } else if (sysDataList.sysDataVec[i]->ColumnOID() == static_cast(countCaseCol->expressionId())) { countAuxDeletedIndex = i; } else if (sysDataList.sysDataVec[i]->ColumnOID() == static_cast(fc->expressionId())) { idbPartitionIndex = i; } } if (countAuxIndex != static_cast(-1) && countAuxDeletedIndex != static_cast(-1) && idbPartitionIndex != static_cast(-1)) { for (int i = 0; i < sysDataList.sysDataVec[idbPartitionIndex]->dataCount(); i++) { int64_t countAux = sysDataList.sysDataVec[countAuxIndex]->GetData(i); int64_t countAuxDeleted = sysDataList.sysDataVec[countAuxDeletedIndex]->GetData(i); string idbPartition = sysDataList.sysDataVec[idbPartitionIndex]->GetStringData(i).toString(); if (i > 0) { analysisResults << ", "; } analysisResults << idbPartition << ": " << std::fixed << std::setprecision(2) << (static_cast(countAuxDeleted) / countAux) * 100 << "%"; } } result.bloatAnalysis = analysisResults.str(); } catch (std::exception& ex) { bErrFlag = true; errorMsg = ex.what(); } if (bErrFlag) { std::ostringstream oss; oss << "Table bloat analysis failed for table " << tableName.toString() << ": " << errorMsg; result.bloatAnalysis = oss.str(); }} } // namespace dmlpackageprocessor