diff --git a/dmlproc/dmlproc.cpp b/dmlproc/dmlproc.cpp index 106977824..990015ac8 100644 --- a/dmlproc/dmlproc.cpp +++ b/dmlproc/dmlproc.cpp @@ -86,6 +86,8 @@ using namespace joblist; namespace fs = boost::filesystem; +ThreadPool DMLServer::fDmlPackagepool(10, 0); + namespace { DistributedEngineComm *Dec; @@ -551,16 +553,16 @@ int main(int argc, char* argv[]) int temp; int serverThreads = 10; - int serverQueueSize = 50; + int serverQueueSize = 0; const string DMLProc("DMLProc"); temp = toInt(cf->getConfig(DMLProc, "ServerThreads")); if (temp > 0) serverThreads = temp; - temp = toInt(cf->getConfig(DMLProc, "ServerQueueSize")); - if (temp > 0) - serverQueueSize = temp; +// temp = toInt(cf->getConfig(DMLProc, "ServerQueueSize")); +// if (temp > 0) +// serverQueueSize = temp; bool rootUser = true; @@ -599,6 +601,8 @@ int main(int argc, char* argv[]) { JobStep::jobstepThreadPool.setDebug(true); JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool)); + DMLServer::fDmlPackagepool.setDebug(true); + DMLServer::fDmlPackagepool.invoke(ThreadPoolMonitor(&DMLServer::fDmlPackagepool)); } //set ACTIVE state diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index 3b3a5cffc..ca0707989 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -413,7 +413,6 @@ int PackageHandler::releaseTableAccess() boost::lock_guard lock(tableOidMutex); if (fTableOid == 0 || (it=tableOidMap.find(fTableOid)) == tableOidMap.end()) { - // This will happen for DML_COMMAND, as we never got the tableoid or called synchTableAccess return 2; // For now, return codes are not used } PackageHandler::tableAccessQueue_t& tableOidQueue = it->second; @@ -433,7 +432,8 @@ int PackageHandler::releaseTableAccess() } else { - tableOidQueue.pop(); // Get off the waiting list. + if (!tableOidQueue.empty()) + tableOidQueue.pop(); // Get off the waiting list. if (tableOidQueue.empty()) { // remove the queue from the map. @@ -447,7 +447,7 @@ int PackageHandler::releaseTableAccess() int PackageHandler::forceReleaseTableAccess() { - // By removing the tcnid from the queue, the logic after the wait in + // By removing the txnid from the queue, the logic after the wait in // synchTableAccess() will release the thread and clean up if needed. std::map::iterator it; boost::lock_guard lock(tableOidMutex); @@ -481,7 +481,8 @@ void PackageHandler::run() std::string stmt; unsigned DMLLoggingId = 21; oam::OamCache* oamCache = oam::OamCache::makeOamCache(); - + SynchTable synchTable; + try { switch( fPackageType ) @@ -508,7 +509,7 @@ void PackageHandler::run() CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); fTableOid = roPair.objnum; } - synchTableAccess(); // Blocks if another DML thread is using this fTableOid + synchTable.setPackage(this); // Blocks if another DML thread is using this fTableOid } #endif QueryTeleStats qts; @@ -870,7 +871,7 @@ void PackageHandler::run() CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); fTableOid = roPair.objnum; } - synchTableAccess(); // Blocks if another DML thread is using this fTableOid + synchTable.setPackage(this); // Blocks if another DML thread is using this fTableOid } #endif updatePkg->set_TxnID(fTxnid); @@ -926,7 +927,7 @@ void PackageHandler::run() CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); fTableOid = roPair.objnum; } - synchTableAccess(); // Blocks if another DML thread is using this fTableOid + synchTable.setPackage(this); // Blocks if another DML thread is using this fTableOid } #endif deletePkg->set_TxnID(fTxnid); @@ -989,13 +990,6 @@ void PackageHandler::run() } break; } -#ifdef MCOL_140 - if (fConcurrentSupport) - { - // MCOL-140 We're done. release the next waiting txn for this fTableOid - releaseTableAccess(); - } -#endif //Log errors if ( (result.result != dmlpackageprocessor::DMLPackageProcessor::NO_ERROR) && (result.result != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) @@ -1017,13 +1011,6 @@ void PackageHandler::run() } catch(std::exception& e) { -#ifdef MCOL_140 - if (fConcurrentSupport) - { - // MCOL-140 We're done. release the next waiting txn for this fTableOid - releaseTableAccess(); - } -#endif cout << "dmlprocessor.cpp PackageHandler::run() package type(" << fPackageType << ") exception: " << e.what() << endl; logging::LoggingID lid(21); @@ -1040,13 +1027,6 @@ void PackageHandler::run() } catch(...) { -#ifdef MCOL_140 - if (fConcurrentSupport) - { - // MCOL-140 We're done. release the next waiting txn for this fTableOid - releaseTableAccess(); - } -#endif logging::LoggingID lid(21); logging::MessageLog ml(lid); logging::Message::Args args; diff --git a/dmlproc/dmlprocessor.h b/dmlproc/dmlprocessor.h index 2faf0cfb2..02f993df6 100644 --- a/dmlproc/dmlprocessor.h +++ b/dmlproc/dmlprocessor.h @@ -120,15 +120,16 @@ private: DMLServer(const DMLServer& rhs); DMLServer& operator=(const DMLServer& rhs); - /** @brief the thread pool for processing dml packages - */ - threadpool::ThreadPool fDmlPackagepool; - int fPackageMaxThreads; /** @brief max number of threads to process dml packages */ int fPackageWorkQueueSize; /** @brief max number of packages waiting in the work queue */ boost::scoped_ptr fMqServer; BRM::DBRM* fDbrm; + +public: + /** @brief the thread pool for processing dml packages + */ + static threadpool::ThreadPool fDmlPackagepool; }; /** @brief Thread to process a single dml package. @@ -184,6 +185,35 @@ private: static boost::mutex tableOidMutex; public: static int clearTableAccess(); + + // MCOL-3296 Add a class to call synchTableAccess on creation and + // releaseTableAccess on destuction for exception safeness. + class SynchTable + { + public: + SynchTable() : fphp(NULL) {}; + SynchTable(PackageHandler* php) + { + setPackage(php); + } + ~SynchTable() + { + if (fphp) + fphp->releaseTableAccess(); + } + bool setPackage(PackageHandler* php) + { + if (fphp) + fphp->releaseTableAccess(); + fphp = php; + if (fphp) + fphp->synchTableAccess(); + return true; + } + private: + PackageHandler* fphp; + }; + }; /** @brief processes dml packages as they arrive diff --git a/utils/threadpool/threadpool.cpp b/utils/threadpool/threadpool.cpp index 602f868c4..892a14ff2 100644 --- a/utils/threadpool/threadpool.cpp +++ b/utils/threadpool/threadpool.cpp @@ -392,6 +392,24 @@ void ThreadPool::beginThread() throw() --fIssued; --waitingFunctorsSize; fWaitingFunctors.erase(todo); + if (fDebug) + { + ostringstream oss; + oss << "Ending thread " << " on " << fName + << " max " << fMaxThreads + << " queue " << fQueueSize + << " threads " << fThreadCount + << " running " << fIssued + << " waiting " << (waitingFunctorsSize - fIssued) + << " total " << waitingFunctorsSize; + logging::Message::Args args; + logging::Message message(0); + args.add(oss.str()); + message.format( args ); + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + ml.logWarningMessage( message ); + } } timeout = boost::get_system_time()+boost::posix_time::minutes(10); @@ -515,6 +533,8 @@ void ThreadPoolMonitor::operator()() << setw(4) << tv.tv_usec/100 << " Name " << fPool->fName << " Active " << fPool->waitingFunctorsSize + << " running " << fPool->fIssued + << " waiting " << (fPool->waitingFunctorsSize - fPool->fIssued) << " ThdCnt " << fPool->fThreadCount << " Max " << fPool->fMaxThreads << " Q " << fPool->fQueueSize