From f2d8a33ee5ee299fddebc78cfd4c16501bee348b Mon Sep 17 00:00:00 2001 From: David Hall Date: Thu, 9 May 2019 12:42:18 -0500 Subject: [PATCH 1/3] MCOL-3296 CTRL+C should not double remove from dequeue. --- dmlproc/dmlproc.cpp | 12 +++++++---- dmlproc/dmlprocessor.cpp | 36 +++++++------------------------ dmlproc/dmlprocessor.h | 38 +++++++++++++++++++++++++++++---- utils/threadpool/threadpool.cpp | 20 +++++++++++++++++ 4 files changed, 70 insertions(+), 36 deletions(-) diff --git a/dmlproc/dmlproc.cpp b/dmlproc/dmlproc.cpp index 106977824..990015ac8 100644 --- a/dmlproc/dmlproc.cpp +++ b/dmlproc/dmlproc.cpp @@ -86,6 +86,8 @@ using namespace joblist; namespace fs = boost::filesystem; +ThreadPool DMLServer::fDmlPackagepool(10, 0); + namespace { DistributedEngineComm *Dec; @@ -551,16 +553,16 @@ int main(int argc, char* argv[]) int temp; int serverThreads = 10; - int serverQueueSize = 50; + int serverQueueSize = 0; const string DMLProc("DMLProc"); temp = toInt(cf->getConfig(DMLProc, "ServerThreads")); if (temp > 0) serverThreads = temp; - temp = toInt(cf->getConfig(DMLProc, "ServerQueueSize")); - if (temp > 0) - serverQueueSize = temp; +// temp = toInt(cf->getConfig(DMLProc, "ServerQueueSize")); +// if (temp > 0) +// serverQueueSize = temp; bool rootUser = true; @@ -599,6 +601,8 @@ int main(int argc, char* argv[]) { JobStep::jobstepThreadPool.setDebug(true); JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool)); + DMLServer::fDmlPackagepool.setDebug(true); + DMLServer::fDmlPackagepool.invoke(ThreadPoolMonitor(&DMLServer::fDmlPackagepool)); } //set ACTIVE state diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index 3b3a5cffc..ca0707989 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -413,7 +413,6 @@ int PackageHandler::releaseTableAccess() boost::lock_guard lock(tableOidMutex); if (fTableOid == 0 || (it=tableOidMap.find(fTableOid)) == tableOidMap.end()) { - // This will happen for DML_COMMAND, as we never got the tableoid or called synchTableAccess return 2; // For now, return codes are not used } PackageHandler::tableAccessQueue_t& tableOidQueue = it->second; @@ -433,7 +432,8 @@ int PackageHandler::releaseTableAccess() } else { - tableOidQueue.pop(); // Get off the waiting list. + if (!tableOidQueue.empty()) + tableOidQueue.pop(); // Get off the waiting list. if (tableOidQueue.empty()) { // remove the queue from the map. @@ -447,7 +447,7 @@ int PackageHandler::releaseTableAccess() int PackageHandler::forceReleaseTableAccess() { - // By removing the tcnid from the queue, the logic after the wait in + // By removing the txnid from the queue, the logic after the wait in // synchTableAccess() will release the thread and clean up if needed. std::map::iterator it; boost::lock_guard lock(tableOidMutex); @@ -481,7 +481,8 @@ void PackageHandler::run() std::string stmt; unsigned DMLLoggingId = 21; oam::OamCache* oamCache = oam::OamCache::makeOamCache(); - + SynchTable synchTable; + try { switch( fPackageType ) @@ -508,7 +509,7 @@ void PackageHandler::run() CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); fTableOid = roPair.objnum; } - synchTableAccess(); // Blocks if another DML thread is using this fTableOid + synchTable.setPackage(this); // Blocks if another DML thread is using this fTableOid } #endif QueryTeleStats qts; @@ -870,7 +871,7 @@ void PackageHandler::run() CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); fTableOid = roPair.objnum; } - synchTableAccess(); // Blocks if another DML thread is using this fTableOid + synchTable.setPackage(this); // Blocks if another DML thread is using this fTableOid } #endif updatePkg->set_TxnID(fTxnid); @@ -926,7 +927,7 @@ void PackageHandler::run() CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); fTableOid = roPair.objnum; } - synchTableAccess(); // Blocks if another DML thread is using this fTableOid + synchTable.setPackage(this); // Blocks if another DML thread is using this fTableOid } #endif deletePkg->set_TxnID(fTxnid); @@ -989,13 +990,6 @@ void PackageHandler::run() } break; } -#ifdef MCOL_140 - if (fConcurrentSupport) - { - // MCOL-140 We're done. release the next waiting txn for this fTableOid - releaseTableAccess(); - } -#endif //Log errors if ( (result.result != dmlpackageprocessor::DMLPackageProcessor::NO_ERROR) && (result.result != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) @@ -1017,13 +1011,6 @@ void PackageHandler::run() } catch(std::exception& e) { -#ifdef MCOL_140 - if (fConcurrentSupport) - { - // MCOL-140 We're done. release the next waiting txn for this fTableOid - releaseTableAccess(); - } -#endif cout << "dmlprocessor.cpp PackageHandler::run() package type(" << fPackageType << ") exception: " << e.what() << endl; logging::LoggingID lid(21); @@ -1040,13 +1027,6 @@ void PackageHandler::run() } catch(...) { -#ifdef MCOL_140 - if (fConcurrentSupport) - { - // MCOL-140 We're done. release the next waiting txn for this fTableOid - releaseTableAccess(); - } -#endif logging::LoggingID lid(21); logging::MessageLog ml(lid); logging::Message::Args args; diff --git a/dmlproc/dmlprocessor.h b/dmlproc/dmlprocessor.h index 2faf0cfb2..02f993df6 100644 --- a/dmlproc/dmlprocessor.h +++ b/dmlproc/dmlprocessor.h @@ -120,15 +120,16 @@ private: DMLServer(const DMLServer& rhs); DMLServer& operator=(const DMLServer& rhs); - /** @brief the thread pool for processing dml packages - */ - threadpool::ThreadPool fDmlPackagepool; - int fPackageMaxThreads; /** @brief max number of threads to process dml packages */ int fPackageWorkQueueSize; /** @brief max number of packages waiting in the work queue */ boost::scoped_ptr fMqServer; BRM::DBRM* fDbrm; + +public: + /** @brief the thread pool for processing dml packages + */ + static threadpool::ThreadPool fDmlPackagepool; }; /** @brief Thread to process a single dml package. @@ -184,6 +185,35 @@ private: static boost::mutex tableOidMutex; public: static int clearTableAccess(); + + // MCOL-3296 Add a class to call synchTableAccess on creation and + // releaseTableAccess on destuction for exception safeness. + class SynchTable + { + public: + SynchTable() : fphp(NULL) {}; + SynchTable(PackageHandler* php) + { + setPackage(php); + } + ~SynchTable() + { + if (fphp) + fphp->releaseTableAccess(); + } + bool setPackage(PackageHandler* php) + { + if (fphp) + fphp->releaseTableAccess(); + fphp = php; + if (fphp) + fphp->synchTableAccess(); + return true; + } + private: + PackageHandler* fphp; + }; + }; /** @brief processes dml packages as they arrive diff --git a/utils/threadpool/threadpool.cpp b/utils/threadpool/threadpool.cpp index 602f868c4..892a14ff2 100644 --- a/utils/threadpool/threadpool.cpp +++ b/utils/threadpool/threadpool.cpp @@ -392,6 +392,24 @@ void ThreadPool::beginThread() throw() --fIssued; --waitingFunctorsSize; fWaitingFunctors.erase(todo); + if (fDebug) + { + ostringstream oss; + oss << "Ending thread " << " on " << fName + << " max " << fMaxThreads + << " queue " << fQueueSize + << " threads " << fThreadCount + << " running " << fIssued + << " waiting " << (waitingFunctorsSize - fIssued) + << " total " << waitingFunctorsSize; + logging::Message::Args args; + logging::Message message(0); + args.add(oss.str()); + message.format( args ); + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + ml.logWarningMessage( message ); + } } timeout = boost::get_system_time()+boost::posix_time::minutes(10); @@ -515,6 +533,8 @@ void ThreadPoolMonitor::operator()() << setw(4) << tv.tv_usec/100 << " Name " << fPool->fName << " Active " << fPool->waitingFunctorsSize + << " running " << fPool->fIssued + << " waiting " << (fPool->waitingFunctorsSize - fPool->fIssued) << " ThdCnt " << fPool->fThreadCount << " Max " << fPool->fMaxThreads << " Q " << fPool->fQueueSize From f98097673f72e7fbeae70fc1fd451167828a095a Mon Sep 17 00:00:00 2001 From: David Hall Date: Fri, 10 May 2019 10:04:57 -0500 Subject: [PATCH 2/3] MCOL-3296 Add logging when a dml is blocked because of another dml on the same table --- dmlproc/dmlprocessor.cpp | 53 ++++++++++++++++++++++++++++------------ dmlproc/dmlprocessor.h | 12 ++++----- 2 files changed, 44 insertions(+), 21 deletions(-) diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index ca0707989..b2ef6e6f6 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -334,7 +334,7 @@ PackageHandler::~PackageHandler() // Rollback will most likely be next. // // A tranasaction for one fTableOid is not blocked by a txn for a different fTableOid. -int PackageHandler::synchTableAccess() +int PackageHandler::synchTableAccess(dmlpackage::CalpontDMLPackage* dmlPackage) { // MCOL-140 Wait for any other DML using this table. std::map::iterator it; @@ -374,9 +374,27 @@ int PackageHandler::synchTableAccess() // tableOidQueue here is the queue holding the waitng transactions for this fTableOid while (true) { - tableOidCond.wait(lock); + // Log something that we're waiting + LoggingID logid(21, fSessionID, fTxnid); + logging::Message::Args args1; + logging::Message msg(1); + ostringstream oss; + oss << "Txn is waiting for" << tableOidQueue.front() << " " << dmlPackage->get_SQLStatement() << "; |" << dmlPackage->get_SchemaName() <<"|"; + args1.add(oss.str()); + args1.add((uint64_t)fTableOid); + msg.format(args1); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_DEBUG, msg, logid); + + tableOidCond.wait(lock); + // In case of CTRL+C, the tableOidQueue could be invalidated + if ((tableOidMap.find(fTableOid))->second != tableOidQueue) + { + break; + } if (tableOidQueue.front() == fTxnid) { + // We're up next. Let's go do stuff. break; } if (tableOidQueue.empty()) @@ -458,6 +476,11 @@ int PackageHandler::forceReleaseTableAccess() } PackageHandler::tableAccessQueue_t& tableOidQueue = it->second; tableOidQueue.erase(fTxnid); + if (tableOidQueue.empty()) + { + // remove the queue from the map. + tableOidMap.erase(fTableOid); + } // release the condition tableOidCond.notify_all(); return 1; @@ -509,7 +532,7 @@ void PackageHandler::run() CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); fTableOid = roPair.objnum; } - synchTable.setPackage(this); // Blocks if another DML thread is using this fTableOid + synchTable.setPackage(this, &insertPkg); // Blocks if another DML thread is using this fTableOid } #endif QueryTeleStats qts; @@ -871,7 +894,7 @@ void PackageHandler::run() CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); fTableOid = roPair.objnum; } - synchTable.setPackage(this); // Blocks if another DML thread is using this fTableOid + synchTable.setPackage(this, updatePkg.get()); // Blocks if another DML thread is using this fTableOid } #endif updatePkg->set_TxnID(fTxnid); @@ -927,7 +950,7 @@ void PackageHandler::run() CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); fTableOid = roPair.objnum; } - synchTable.setPackage(this); // Blocks if another DML thread is using this fTableOid + synchTable.setPackage(this, deletePkg.get()); // Blocks if another DML thread is using this fTableOid } #endif deletePkg->set_TxnID(fTxnid); @@ -1070,16 +1093,6 @@ void PackageHandler::run() void PackageHandler::rollbackPending() { - // Force a release of the processing from MCOL-140 -#ifdef MCOL_140 - if (fConcurrentSupport) - { - // MCOL-140 We're not necessarily the next in line. - // This forces this thread to be released anyway. - forceReleaseTableAccess(); - } -#endif - if (fProcessor.get() == NULL) { // This happens when batch insert @@ -1088,6 +1101,16 @@ void PackageHandler::rollbackPending() fProcessor->setRollbackPending(true); + // Force a release of the processing from MCOL-140 +#ifdef MCOL_140 + if (fConcurrentSupport) + { + // MCOL-140 We're not necessarily the next in line. + // This forces this thread to be released anyway. + forceReleaseTableAccess(); + } +#endif + ostringstream oss; oss << "PackageHandler::rollbackPending: Processing DMLPackage."; DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG); diff --git a/dmlproc/dmlprocessor.h b/dmlproc/dmlprocessor.h index 02f993df6..978fbdda8 100644 --- a/dmlproc/dmlprocessor.h +++ b/dmlproc/dmlprocessor.h @@ -171,12 +171,12 @@ private: // Used to serialize operations because the VSS can't handle inserts // or updates on the same block. // When an Insert, Update or Delete command arrives, we look here - // for the table oid. If found, wait until it is no onger here. + // for the table oid. If found, wait until it is no longer here. // If this transactionID (SCN) is < the transactionID in the table, don't delay // and hope for the best, as we're already out of order. // When the VSS is engineered to handle transactions out of order, all MCOL-140 // code is to be removed. - int synchTableAccess(); + int synchTableAccess(dmlpackage::CalpontDMLPackage* dmlPackage); int releaseTableAccess(); int forceReleaseTableAccess(); typedef iterable_queue tableAccessQueue_t; @@ -192,22 +192,22 @@ public: { public: SynchTable() : fphp(NULL) {}; - SynchTable(PackageHandler* php) + SynchTable(PackageHandler* php, dmlpackage::CalpontDMLPackage* dmlPackage) { - setPackage(php); + setPackage(php, dmlPackage); } ~SynchTable() { if (fphp) fphp->releaseTableAccess(); } - bool setPackage(PackageHandler* php) + bool setPackage(PackageHandler* php, dmlpackage::CalpontDMLPackage* dmlPackage) { if (fphp) fphp->releaseTableAccess(); fphp = php; if (fphp) - fphp->synchTableAccess(); + fphp->synchTableAccess(dmlPackage); return true; } private: From 01ff86096a4092a8a80d8a177620d1f076bb946e Mon Sep 17 00:00:00 2001 From: David Hall Date: Fri, 17 May 2019 12:45:09 -0500 Subject: [PATCH 3/3] MCOL-3314 When setting up statistics functions for the UM, use values that actually exist. --- dbcon/joblist/tupleaggregatestep.cpp | 36 ++++++++++++++-------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index e68bc9f18..07fb2a828 100755 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -2325,8 +2325,8 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( throw logic_error("(9)A UDAF function is called but there's no RowUDAFFunctionCol"); } functionVec2[i]->fAuxColumnIndex = lastCol++; - oidsAggDist.push_back(oidsAggDist[j]); // Dummy? - keysAggDist.push_back(keysAggDist[j]); // Dummy? + oidsAggDist.push_back(oidsAgg[j]); // Dummy? + keysAggDist.push_back(keysAgg[j]); // Dummy? scaleAggDist.push_back(0); precisionAggDist.push_back(0); typeAggDist.push_back(CalpontSystemCatalog::UBIGINT); @@ -2340,8 +2340,8 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( functionVec2[i]->fAuxColumnIndex = lastCol; // sum(x) - oidsAggDist.push_back(oidsAggDist[j]); - keysAggDist.push_back(keysAggDist[j]); + oidsAggDist.push_back(oidsAgg[j]); + keysAggDist.push_back(keysAgg[j]); scaleAggDist.push_back(0); precisionAggDist.push_back(0); typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE); @@ -2349,8 +2349,8 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( ++lastCol; // sum(x**2) - oidsAggDist.push_back(oidsAggDist[j]); - keysAggDist.push_back(keysAggDist[j]); + oidsAggDist.push_back(oidsAgg[j]); + keysAggDist.push_back(keysAgg[j]); scaleAggDist.push_back(0); precisionAggDist.push_back(0); typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE); @@ -3258,8 +3258,8 @@ void TupleAggregateStep::prep2PhasesAggregate( throw logic_error("(9)A UDAF function is called but there's no RowUDAFFunctionCol"); } functionVecUm[i]->fAuxColumnIndex = lastCol++; - oidsAggUm.push_back(oidsAggUm[j]); // Dummy? - keysAggUm.push_back(keysAggUm[j]); // Dummy? + oidsAggUm.push_back(oidsAggPm[j]); // Dummy? + keysAggUm.push_back(keysAggPm[j]); // Dummy? scaleAggUm.push_back(0); precisionAggUm.push_back(0); typeAggUm.push_back(CalpontSystemCatalog::UBIGINT); @@ -3273,8 +3273,8 @@ void TupleAggregateStep::prep2PhasesAggregate( functionVecUm[i]->fAuxColumnIndex = lastCol; // sum(x) - oidsAggUm.push_back(oidsAggUm[j]); - keysAggUm.push_back(keysAggUm[j]); + oidsAggUm.push_back(oidsAggPm[j]); + keysAggUm.push_back(keysAggPm[j]); scaleAggUm.push_back(0); precisionAggUm.push_back(0); typeAggUm.push_back(CalpontSystemCatalog::LONGDOUBLE); @@ -3282,8 +3282,8 @@ void TupleAggregateStep::prep2PhasesAggregate( ++lastCol; // sum(x**2) - oidsAggUm.push_back(oidsAggUm[j]); - keysAggUm.push_back(keysAggUm[j]); + oidsAggUm.push_back(oidsAggPm[j]); + keysAggUm.push_back(keysAggPm[j]); scaleAggUm.push_back(0); precisionAggUm.push_back(0); typeAggUm.push_back(CalpontSystemCatalog::LONGDOUBLE); @@ -4189,8 +4189,8 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( throw logic_error("(9)A UDAF function is called but there's no RowUDAFFunctionCol"); } functionVecUm[i]->fAuxColumnIndex = lastCol++; - oidsAggDist.push_back(oidsAggUm[j]); // Dummy? - keysAggDist.push_back(keysAggUm[j]); // Dummy? + oidsAggDist.push_back(oidsAggPm[j]); // Dummy? + keysAggDist.push_back(keysAggPm[j]); // Dummy? scaleAggDist.push_back(0); precisionAggDist.push_back(0); typeAggDist.push_back(CalpontSystemCatalog::UBIGINT); @@ -4203,8 +4203,8 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( functionVecUm[i]->fAuxColumnIndex = lastCol; // sum(x) - oidsAggDist.push_back(oidsAggDist[j]); - keysAggDist.push_back(keysAggDist[j]); + oidsAggDist.push_back(oidsAggPm[j]); + keysAggDist.push_back(keysAggPm[j]); scaleAggDist.push_back(0); precisionAggDist.push_back(0); typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE); @@ -4212,8 +4212,8 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( ++lastCol; // sum(x**2) - oidsAggDist.push_back(oidsAggDist[j]); - keysAggDist.push_back(keysAggDist[j]); + oidsAggDist.push_back(oidsAggPm[j]); + keysAggDist.push_back(keysAggPm[j]); scaleAggDist.push_back(0); precisionAggDist.push_back(0); typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE);