diff --git a/dmlproc/dmlproc.cpp b/dmlproc/dmlproc.cpp index 3845ef37a..158bb4536 100644 --- a/dmlproc/dmlproc.cpp +++ b/dmlproc/dmlproc.cpp @@ -87,6 +87,8 @@ using namespace joblist; namespace fs = boost::filesystem; +ThreadPool DMLServer::fDmlPackagepool(10, 0); + namespace { DistributedEngineComm* Dec; @@ -606,7 +608,7 @@ int main(int argc, char* argv[]) int temp; int serverThreads = 10; - int serverQueueSize = 50; + int serverQueueSize = 0; const string DMLProc("DMLProc"); temp = toInt(cf->getConfig(DMLProc, "ServerThreads")); @@ -614,10 +616,9 @@ int main(int argc, char* argv[]) if (temp > 0) serverThreads = temp; - temp = toInt(cf->getConfig(DMLProc, "ServerQueueSize")); - - if (temp > 0) - serverQueueSize = temp; +// temp = toInt(cf->getConfig(DMLProc, "ServerQueueSize")); +// if (temp > 0) +// serverQueueSize = temp; //read and cleanup port before trying to use try @@ -657,6 +658,8 @@ int main(int argc, char* argv[]) { JobStep::jobstepThreadPool.setDebug(true); JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool)); + DMLServer::fDmlPackagepool.setDebug(true); + DMLServer::fDmlPackagepool.invoke(ThreadPoolMonitor(&DMLServer::fDmlPackagepool)); } //set ACTIVE state diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index 861198d45..9e1601a36 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -349,7 +349,7 @@ PackageHandler::~PackageHandler() // Rollback will most likely be next. // // A tranasaction for one fTableOid is not blocked by a txn for a different fTableOid. -int PackageHandler::synchTableAccess() +int PackageHandler::synchTableAccess(dmlpackage::CalpontDMLPackage* dmlPackage) { // MCOL-140 Wait for any other DML using this table. 
std::map::iterator it; @@ -393,10 +393,27 @@ int PackageHandler::synchTableAccess() // tableOidQueue here is the queue holding the waitng transactions for this fTableOid while (true) { + // Log something that we're waiting + LoggingID logid(21, fSessionID, fTxnid); + logging::Message::Args args1; + logging::Message msg(1); + ostringstream oss; + oss << "Txn is waiting for" << tableOidQueue.front() << " " << dmlPackage->get_SQLStatement() << "; |" << dmlPackage->get_SchemaName() <<"|"; + args1.add(oss.str()); + args1.add((uint64_t)fTableOid); + msg.format(args1); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_DEBUG, msg, logid); + tableOidCond.wait(lock); - + // In case of CTRL+C, the tableOidQueue could be invalidated + if ((tableOidMap.find(fTableOid))->second != tableOidQueue) + { + break; + } if (tableOidQueue.front() == fTxnid) { + // We're up next. Let's go do stuff. break; } @@ -439,7 +456,6 @@ int PackageHandler::releaseTableAccess() if (fTableOid == 0 || (it = tableOidMap.find(fTableOid)) == tableOidMap.end()) { - // This will happen for DML_COMMAND, as we never got the tableoid or called synchTableAccess return 2; // For now, return codes are not used } @@ -461,7 +477,8 @@ int PackageHandler::releaseTableAccess() } else { - tableOidQueue.pop(); // Get off the waiting list. + if (!tableOidQueue.empty()) + tableOidQueue.pop(); // Get off the waiting list. if (tableOidQueue.empty()) { @@ -477,7 +494,7 @@ int PackageHandler::releaseTableAccess() int PackageHandler::forceReleaseTableAccess() { - // By removing the tcnid from the queue, the logic after the wait in + // By removing the txnid from the queue, the logic after the wait in // synchTableAccess() will release the thread and clean up if needed. 
std::map::iterator it; boost::lock_guard lock(tableOidMutex); @@ -490,6 +507,11 @@ int PackageHandler::forceReleaseTableAccess() PackageHandler::tableAccessQueue_t& tableOidQueue = it->second; tableOidQueue.erase(fTxnid); + if (tableOidQueue.empty()) + { + // remove the queue from the map. + tableOidMap.erase(fTableOid); + } // release the condition tableOidCond.notify_all(); return 1; @@ -513,7 +535,8 @@ void PackageHandler::run() std::string stmt; unsigned DMLLoggingId = 21; oam::OamCache* oamCache = oam::OamCache::makeOamCache(); - + SynchTable synchTable; + try { switch ( fPackageType ) @@ -542,8 +565,7 @@ void PackageHandler::run() CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); fTableOid = roPair.objnum; } - - synchTableAccess(); // Blocks if another DML thread is using this fTableOid + synchTable.setPackage(this, &insertPkg); // Blocks if another DML thread is using this fTableOid } #endif @@ -921,8 +943,7 @@ void PackageHandler::run() CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); fTableOid = roPair.objnum; } - - synchTableAccess(); // Blocks if another DML thread is using this fTableOid + synchTable.setPackage(this, updatePkg.get()); // Blocks if another DML thread is using this fTableOid } #endif @@ -981,8 +1002,7 @@ void PackageHandler::run() CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); fTableOid = roPair.objnum; } - - synchTableAccess(); // Blocks if another DML thread is using this fTableOid + synchTable.setPackage(this, deletePkg.get()); // Blocks if another DML thread is using this fTableOid } #endif @@ -1049,15 +1069,8 @@ void PackageHandler::run() break; } -#ifdef MCOL_140 - if (fConcurrentSupport) - { - // MCOL-140 We're done. 
release the next waiting txn for this fTableOid - releaseTableAccess(); - } -#endif //Log errors if ( (result.result != dmlpackageprocessor::DMLPackageProcessor::NO_ERROR) @@ -1080,15 +1093,8 @@ void PackageHandler::run() } catch (std::exception& e) { -#ifdef MCOL_140 - if (fConcurrentSupport) - { - // MCOL-140 We're done. release the next waiting txn for this fTableOid - releaseTableAccess(); - } -#endif cout << "dmlprocessor.cpp PackageHandler::run() package type(" << fPackageType << ") exception: " << e.what() << endl; logging::LoggingID lid(21); @@ -1105,15 +1111,8 @@ void PackageHandler::run() } catch (...) { -#ifdef MCOL_140 - if (fConcurrentSupport) - { - // MCOL-140 We're done. release the next waiting txn for this fTableOid - releaseTableAccess(); - } -#endif logging::LoggingID lid(21); logging::MessageLog ml(lid); logging::Message::Args args; @@ -1157,9 +1156,17 @@ void PackageHandler::run() void PackageHandler::rollbackPending() { + if (fProcessor.get() == NULL) + { + // This happens when batch insert + return; + } + + fProcessor->setRollbackPending(true); + // Force a release of the processing from MCOL-140 #ifdef MCOL_140 - if (fConcurrentSupport) + if (fConcurrentSupport) { // MCOL-140 We're not necessarily the next in line. // This forces this thread to be released anyway. 
@@ -1168,14 +1175,6 @@ void PackageHandler::rollbackPending() #endif - if (fProcessor.get() == NULL) - { - // This happens when batch insert - return; - } - - fProcessor->setRollbackPending(true); - ostringstream oss; oss << "PackageHandler::rollbackPending: Processing DMLPackage."; DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG); diff --git a/dmlproc/dmlprocessor.h b/dmlproc/dmlprocessor.h index 60ba8a14b..6a18493b1 100644 --- a/dmlproc/dmlprocessor.h +++ b/dmlproc/dmlprocessor.h @@ -151,15 +151,16 @@ private: DMLServer(const DMLServer& rhs); DMLServer& operator=(const DMLServer& rhs); - /** @brief the thread pool for processing dml packages - */ - threadpool::ThreadPool fDmlPackagepool; - int fPackageMaxThreads; /** @brief max number of threads to process dml packages */ int fPackageWorkQueueSize; /** @brief max number of packages waiting in the work queue */ boost::scoped_ptr fMqServer; BRM::DBRM* fDbrm; + +public: + /** @brief the thread pool for processing dml packages + */ + static threadpool::ThreadPool fDmlPackagepool; }; /** @brief Thread to process a single dml package. @@ -207,12 +208,12 @@ private: // Used to serialize operations because the VSS can't handle inserts // or updates on the same block. // When an Insert, Update or Delete command arrives, we look here - // for the table oid. If found, wait until it is no onger here. + // for the table oid. If found, wait until it is no longer here. // If this transactionID (SCN) is < the transactionID in the table, don't delay // and hope for the best, as we're already out of order. // When the VSS is engineered to handle transactions out of order, all MCOL-140 // code is to be removed. 
- int synchTableAccess(); + int synchTableAccess(dmlpackage::CalpontDMLPackage* dmlPackage); int releaseTableAccess(); int forceReleaseTableAccess(); typedef iterable_queue tableAccessQueue_t; @@ -221,6 +222,42 @@ private: static boost::mutex tableOidMutex; public: static int clearTableAccess(); + + // MCOL-3296 Add a class to call synchTableAccess on creation and + // releaseTableAccess on destruction for exception safety. + class SynchTable + { + public: + SynchTable() : fphp(NULL) {}; + // fphp must be NULL before setPackage() tests it; otherwise an + // indeterminate pointer could be passed to releaseTableAccess(). + SynchTable(PackageHandler* php, dmlpackage::CalpontDMLPackage* dmlPackage) : fphp(NULL) + { + setPackage(php, dmlPackage); + } + ~SynchTable() + { + if (fphp) + fphp->releaseTableAccess(); + } + // Release the previous handler's table access (if any), then call + // synchTableAccess() for php. Always returns true. + bool setPackage(PackageHandler* php, dmlpackage::CalpontDMLPackage* dmlPackage) + { + if (fphp) + fphp->releaseTableAccess(); + fphp = php; + if (fphp) + fphp->synchTableAccess(dmlPackage); + return true; + } + private: + // Not copyable: a copy would call releaseTableAccess() a second time. + SynchTable(const SynchTable& rhs); + SynchTable& operator=(const SynchTable& rhs); + PackageHandler* fphp; + }; + }; /** @brief processes dml packages as they arrive diff --git a/utils/threadpool/threadpool.cpp b/utils/threadpool/threadpool.cpp index 589fb6dcc..e62de7769 100644 --- a/utils/threadpool/threadpool.cpp +++ b/utils/threadpool/threadpool.cpp @@ -409,6 +409,24 @@ void ThreadPool::beginThread() throw() --fIssued; --waitingFunctorsSize; fWaitingFunctors.erase(todo); + if (fDebug) + { + ostringstream oss; + oss << "Ending thread " << " on " << fName + << " max " << fMaxThreads + << " queue " << fQueueSize + << " threads " << fThreadCount + << " running " << fIssued + << " waiting " << (waitingFunctorsSize - fIssued) + << " total " << waitingFunctorsSize; + logging::Message::Args args; + logging::Message message(0); + args.add(oss.str()); + message.format( args ); + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + ml.logWarningMessage( message ); + } } timeout = boost::get_system_time() + boost::posix_time::minutes(10); @@ -536,6 +554,8 @@ void ThreadPoolMonitor::operator()() << setw(4) << tv.tv_usec / 100 << " Name " << fPool->fName << " Active " 
<< fPool->waitingFunctorsSize + << " running " << fPool->fIssued + << " waiting " << (fPool->waitingFunctorsSize - fPool->fIssued) << " ThdCnt " << fPool->fThreadCount << " Max " << fPool->fMaxThreads << " Q " << fPool->fQueueSize