1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-3296 CTRL+C should not double remove from dequeue.

This commit is contained in:
David Hall
2019-05-09 12:42:18 -05:00
parent 93f00d9f32
commit f2d8a33ee5
4 changed files with 70 additions and 36 deletions

View File

@ -86,6 +86,8 @@ using namespace joblist;
namespace fs = boost::filesystem; namespace fs = boost::filesystem;
ThreadPool DMLServer::fDmlPackagepool(10, 0);
namespace namespace
{ {
DistributedEngineComm *Dec; DistributedEngineComm *Dec;
@ -551,16 +553,16 @@ int main(int argc, char* argv[])
int temp; int temp;
int serverThreads = 10; int serverThreads = 10;
int serverQueueSize = 50; int serverQueueSize = 0;
const string DMLProc("DMLProc"); const string DMLProc("DMLProc");
temp = toInt(cf->getConfig(DMLProc, "ServerThreads")); temp = toInt(cf->getConfig(DMLProc, "ServerThreads"));
if (temp > 0) if (temp > 0)
serverThreads = temp; serverThreads = temp;
temp = toInt(cf->getConfig(DMLProc, "ServerQueueSize")); // temp = toInt(cf->getConfig(DMLProc, "ServerQueueSize"));
if (temp > 0) // if (temp > 0)
serverQueueSize = temp; // serverQueueSize = temp;
bool rootUser = true; bool rootUser = true;
@ -599,6 +601,8 @@ int main(int argc, char* argv[])
{ {
JobStep::jobstepThreadPool.setDebug(true); JobStep::jobstepThreadPool.setDebug(true);
JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool)); JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
DMLServer::fDmlPackagepool.setDebug(true);
DMLServer::fDmlPackagepool.invoke(ThreadPoolMonitor(&DMLServer::fDmlPackagepool));
} }
//set ACTIVE state //set ACTIVE state

View File

@ -413,7 +413,6 @@ int PackageHandler::releaseTableAccess()
boost::lock_guard<boost::mutex> lock(tableOidMutex); boost::lock_guard<boost::mutex> lock(tableOidMutex);
if (fTableOid == 0 || (it=tableOidMap.find(fTableOid)) == tableOidMap.end()) if (fTableOid == 0 || (it=tableOidMap.find(fTableOid)) == tableOidMap.end())
{ {
// This will happen for DML_COMMAND, as we never got the tableoid or called synchTableAccess
return 2; // For now, return codes are not used return 2; // For now, return codes are not used
} }
PackageHandler::tableAccessQueue_t& tableOidQueue = it->second; PackageHandler::tableAccessQueue_t& tableOidQueue = it->second;
@ -433,7 +432,8 @@ int PackageHandler::releaseTableAccess()
} }
else else
{ {
tableOidQueue.pop(); // Get off the waiting list. if (!tableOidQueue.empty())
tableOidQueue.pop(); // Get off the waiting list.
if (tableOidQueue.empty()) if (tableOidQueue.empty())
{ {
// remove the queue from the map. // remove the queue from the map.
@ -447,7 +447,7 @@ int PackageHandler::releaseTableAccess()
int PackageHandler::forceReleaseTableAccess() int PackageHandler::forceReleaseTableAccess()
{ {
// By removing the tcnid from the queue, the logic after the wait in // By removing the txnid from the queue, the logic after the wait in
// synchTableAccess() will release the thread and clean up if needed. // synchTableAccess() will release the thread and clean up if needed.
std::map<uint32_t, PackageHandler::tableAccessQueue_t>::iterator it; std::map<uint32_t, PackageHandler::tableAccessQueue_t>::iterator it;
boost::lock_guard<boost::mutex> lock(tableOidMutex); boost::lock_guard<boost::mutex> lock(tableOidMutex);
@ -481,7 +481,8 @@ void PackageHandler::run()
std::string stmt; std::string stmt;
unsigned DMLLoggingId = 21; unsigned DMLLoggingId = 21;
oam::OamCache* oamCache = oam::OamCache::makeOamCache(); oam::OamCache* oamCache = oam::OamCache::makeOamCache();
SynchTable synchTable;
try try
{ {
switch( fPackageType ) switch( fPackageType )
@ -508,7 +509,7 @@ void PackageHandler::run()
CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName);
fTableOid = roPair.objnum; fTableOid = roPair.objnum;
} }
synchTableAccess(); // Blocks if another DML thread is using this fTableOid synchTable.setPackage(this); // Blocks if another DML thread is using this fTableOid
} }
#endif #endif
QueryTeleStats qts; QueryTeleStats qts;
@ -870,7 +871,7 @@ void PackageHandler::run()
CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName);
fTableOid = roPair.objnum; fTableOid = roPair.objnum;
} }
synchTableAccess(); // Blocks if another DML thread is using this fTableOid synchTable.setPackage(this); // Blocks if another DML thread is using this fTableOid
} }
#endif #endif
updatePkg->set_TxnID(fTxnid); updatePkg->set_TxnID(fTxnid);
@ -926,7 +927,7 @@ void PackageHandler::run()
CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName);
fTableOid = roPair.objnum; fTableOid = roPair.objnum;
} }
synchTableAccess(); // Blocks if another DML thread is using this fTableOid synchTable.setPackage(this); // Blocks if another DML thread is using this fTableOid
} }
#endif #endif
deletePkg->set_TxnID(fTxnid); deletePkg->set_TxnID(fTxnid);
@ -989,13 +990,6 @@ void PackageHandler::run()
} }
break; break;
} }
#ifdef MCOL_140
if (fConcurrentSupport)
{
// MCOL-140 We're done. release the next waiting txn for this fTableOid
releaseTableAccess();
}
#endif
//Log errors //Log errors
if ( (result.result != dmlpackageprocessor::DMLPackageProcessor::NO_ERROR) if ( (result.result != dmlpackageprocessor::DMLPackageProcessor::NO_ERROR)
&& (result.result != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) && (result.result != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
@ -1017,13 +1011,6 @@ void PackageHandler::run()
} }
catch(std::exception& e) catch(std::exception& e)
{ {
#ifdef MCOL_140
if (fConcurrentSupport)
{
// MCOL-140 We're done. release the next waiting txn for this fTableOid
releaseTableAccess();
}
#endif
cout << "dmlprocessor.cpp PackageHandler::run() package type(" cout << "dmlprocessor.cpp PackageHandler::run() package type("
<< fPackageType << ") exception: " << e.what() << endl; << fPackageType << ") exception: " << e.what() << endl;
logging::LoggingID lid(21); logging::LoggingID lid(21);
@ -1040,13 +1027,6 @@ void PackageHandler::run()
} }
catch(...) catch(...)
{ {
#ifdef MCOL_140
if (fConcurrentSupport)
{
// MCOL-140 We're done. release the next waiting txn for this fTableOid
releaseTableAccess();
}
#endif
logging::LoggingID lid(21); logging::LoggingID lid(21);
logging::MessageLog ml(lid); logging::MessageLog ml(lid);
logging::Message::Args args; logging::Message::Args args;

View File

@ -120,15 +120,16 @@ private:
DMLServer(const DMLServer& rhs); DMLServer(const DMLServer& rhs);
DMLServer& operator=(const DMLServer& rhs); DMLServer& operator=(const DMLServer& rhs);
/** @brief the thread pool for processing dml packages
*/
threadpool::ThreadPool fDmlPackagepool;
int fPackageMaxThreads; /** @brief max number of threads to process dml packages */ int fPackageMaxThreads; /** @brief max number of threads to process dml packages */
int fPackageWorkQueueSize; /** @brief max number of packages waiting in the work queue */ int fPackageWorkQueueSize; /** @brief max number of packages waiting in the work queue */
boost::scoped_ptr<messageqcpp::MessageQueueServer> fMqServer; boost::scoped_ptr<messageqcpp::MessageQueueServer> fMqServer;
BRM::DBRM* fDbrm; BRM::DBRM* fDbrm;
public:
/** @brief the thread pool for processing dml packages
*/
static threadpool::ThreadPool fDmlPackagepool;
}; };
/** @brief Thread to process a single dml package. /** @brief Thread to process a single dml package.
@ -184,6 +185,35 @@ private:
static boost::mutex tableOidMutex; static boost::mutex tableOidMutex;
public: public:
static int clearTableAccess(); static int clearTableAccess();
// MCOL-3296 Add a class to call synchTableAccess on creation and
// releaseTableAccess on destuction for exception safeness.
class SynchTable
{
public:
SynchTable() : fphp(NULL) {};
SynchTable(PackageHandler* php)
{
setPackage(php);
}
~SynchTable()
{
if (fphp)
fphp->releaseTableAccess();
}
bool setPackage(PackageHandler* php)
{
if (fphp)
fphp->releaseTableAccess();
fphp = php;
if (fphp)
fphp->synchTableAccess();
return true;
}
private:
PackageHandler* fphp;
};
}; };
/** @brief processes dml packages as they arrive /** @brief processes dml packages as they arrive

View File

@ -392,6 +392,24 @@ void ThreadPool::beginThread() throw()
--fIssued; --fIssued;
--waitingFunctorsSize; --waitingFunctorsSize;
fWaitingFunctors.erase(todo); fWaitingFunctors.erase(todo);
if (fDebug)
{
ostringstream oss;
oss << "Ending thread " << " on " << fName
<< " max " << fMaxThreads
<< " queue " << fQueueSize
<< " threads " << fThreadCount
<< " running " << fIssued
<< " waiting " << (waitingFunctorsSize - fIssued)
<< " total " << waitingFunctorsSize;
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage( message );
}
} }
timeout = boost::get_system_time()+boost::posix_time::minutes(10); timeout = boost::get_system_time()+boost::posix_time::minutes(10);
@ -515,6 +533,8 @@ void ThreadPoolMonitor::operator()()
<< setw(4) << tv.tv_usec/100 << setw(4) << tv.tv_usec/100
<< " Name " << fPool->fName << " Name " << fPool->fName
<< " Active " << fPool->waitingFunctorsSize << " Active " << fPool->waitingFunctorsSize
<< " running " << fPool->fIssued
<< " waiting " << (fPool->waitingFunctorsSize - fPool->fIssued)
<< " ThdCnt " << fPool->fThreadCount << " ThdCnt " << fPool->fThreadCount
<< " Max " << fPool->fMaxThreads << " Max " << fPool->fMaxThreads
<< " Q " << fPool->fQueueSize << " Q " << fPool->fQueueSize