1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

Merge branch 'develop-1.1' into develop-1.2-merge-up-20190517

This commit is contained in:
Andrew Hutchings
2019-05-17 20:43:49 +01:00
4 changed files with 105 additions and 53 deletions

View File

@ -87,6 +87,8 @@ using namespace joblist;
namespace fs = boost::filesystem; namespace fs = boost::filesystem;
ThreadPool DMLServer::fDmlPackagepool(10, 0);
namespace namespace
{ {
DistributedEngineComm* Dec; DistributedEngineComm* Dec;
@ -606,7 +608,7 @@ int main(int argc, char* argv[])
int temp; int temp;
int serverThreads = 10; int serverThreads = 10;
int serverQueueSize = 50; int serverQueueSize = 0;
const string DMLProc("DMLProc"); const string DMLProc("DMLProc");
temp = toInt(cf->getConfig(DMLProc, "ServerThreads")); temp = toInt(cf->getConfig(DMLProc, "ServerThreads"));
@ -614,10 +616,9 @@ int main(int argc, char* argv[])
if (temp > 0) if (temp > 0)
serverThreads = temp; serverThreads = temp;
temp = toInt(cf->getConfig(DMLProc, "ServerQueueSize")); // temp = toInt(cf->getConfig(DMLProc, "ServerQueueSize"));
// if (temp > 0)
if (temp > 0) // serverQueueSize = temp;
serverQueueSize = temp;
//read and cleanup port before trying to use //read and cleanup port before trying to use
try try
@ -657,6 +658,8 @@ int main(int argc, char* argv[])
{ {
JobStep::jobstepThreadPool.setDebug(true); JobStep::jobstepThreadPool.setDebug(true);
JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool)); JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
DMLServer::fDmlPackagepool.setDebug(true);
DMLServer::fDmlPackagepool.invoke(ThreadPoolMonitor(&DMLServer::fDmlPackagepool));
} }
//set ACTIVE state //set ACTIVE state

View File

@ -349,7 +349,7 @@ PackageHandler::~PackageHandler()
// Rollback will most likely be next. // Rollback will most likely be next.
// //
// A tranasaction for one fTableOid is not blocked by a txn for a different fTableOid. // A tranasaction for one fTableOid is not blocked by a txn for a different fTableOid.
int PackageHandler::synchTableAccess() int PackageHandler::synchTableAccess(dmlpackage::CalpontDMLPackage* dmlPackage)
{ {
// MCOL-140 Wait for any other DML using this table. // MCOL-140 Wait for any other DML using this table.
std::map<uint32_t, PackageHandler::tableAccessQueue_t>::iterator it; std::map<uint32_t, PackageHandler::tableAccessQueue_t>::iterator it;
@ -393,10 +393,27 @@ int PackageHandler::synchTableAccess()
// tableOidQueue here is the queue holding the waitng transactions for this fTableOid // tableOidQueue here is the queue holding the waitng transactions for this fTableOid
while (true) while (true)
{ {
// Log something that we're waiting
LoggingID logid(21, fSessionID, fTxnid);
logging::Message::Args args1;
logging::Message msg(1);
ostringstream oss;
oss << "Txn is waiting for" << tableOidQueue.front() << " " << dmlPackage->get_SQLStatement() << "; |" << dmlPackage->get_SchemaName() <<"|";
args1.add(oss.str());
args1.add((uint64_t)fTableOid);
msg.format(args1);
logging::Logger logger(logid.fSubsysID);
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
tableOidCond.wait(lock); tableOidCond.wait(lock);
// In case of CTRL+C, the tableOidQueue could be invalidated
if ((tableOidMap.find(fTableOid))->second != tableOidQueue)
{
break;
}
if (tableOidQueue.front() == fTxnid) if (tableOidQueue.front() == fTxnid)
{ {
// We're up next. Let's go do stuff.
break; break;
} }
@ -439,7 +456,6 @@ int PackageHandler::releaseTableAccess()
if (fTableOid == 0 || (it = tableOidMap.find(fTableOid)) == tableOidMap.end()) if (fTableOid == 0 || (it = tableOidMap.find(fTableOid)) == tableOidMap.end())
{ {
// This will happen for DML_COMMAND, as we never got the tableoid or called synchTableAccess
return 2; // For now, return codes are not used return 2; // For now, return codes are not used
} }
@ -461,7 +477,8 @@ int PackageHandler::releaseTableAccess()
} }
else else
{ {
tableOidQueue.pop(); // Get off the waiting list. if (!tableOidQueue.empty())
tableOidQueue.pop(); // Get off the waiting list.
if (tableOidQueue.empty()) if (tableOidQueue.empty())
{ {
@ -477,7 +494,7 @@ int PackageHandler::releaseTableAccess()
int PackageHandler::forceReleaseTableAccess() int PackageHandler::forceReleaseTableAccess()
{ {
// By removing the tcnid from the queue, the logic after the wait in // By removing the txnid from the queue, the logic after the wait in
// synchTableAccess() will release the thread and clean up if needed. // synchTableAccess() will release the thread and clean up if needed.
std::map<uint32_t, PackageHandler::tableAccessQueue_t>::iterator it; std::map<uint32_t, PackageHandler::tableAccessQueue_t>::iterator it;
boost::lock_guard<boost::mutex> lock(tableOidMutex); boost::lock_guard<boost::mutex> lock(tableOidMutex);
@ -490,6 +507,11 @@ int PackageHandler::forceReleaseTableAccess()
PackageHandler::tableAccessQueue_t& tableOidQueue = it->second; PackageHandler::tableAccessQueue_t& tableOidQueue = it->second;
tableOidQueue.erase(fTxnid); tableOidQueue.erase(fTxnid);
if (tableOidQueue.empty())
{
// remove the queue from the map.
tableOidMap.erase(fTableOid);
}
// release the condition // release the condition
tableOidCond.notify_all(); tableOidCond.notify_all();
return 1; return 1;
@ -513,7 +535,8 @@ void PackageHandler::run()
std::string stmt; std::string stmt;
unsigned DMLLoggingId = 21; unsigned DMLLoggingId = 21;
oam::OamCache* oamCache = oam::OamCache::makeOamCache(); oam::OamCache* oamCache = oam::OamCache::makeOamCache();
SynchTable synchTable;
try try
{ {
switch ( fPackageType ) switch ( fPackageType )
@ -542,8 +565,7 @@ void PackageHandler::run()
CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName);
fTableOid = roPair.objnum; fTableOid = roPair.objnum;
} }
synchTable.setPackage(this, &insertPkg); // Blocks if another DML thread is using this fTableOid
synchTableAccess(); // Blocks if another DML thread is using this fTableOid
} }
#endif #endif
@ -921,8 +943,7 @@ void PackageHandler::run()
CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName);
fTableOid = roPair.objnum; fTableOid = roPair.objnum;
} }
synchTable.setPackage(this, updatePkg.get()); // Blocks if another DML thread is using this fTableOid
synchTableAccess(); // Blocks if another DML thread is using this fTableOid
} }
#endif #endif
@ -981,8 +1002,7 @@ void PackageHandler::run()
CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName);
fTableOid = roPair.objnum; fTableOid = roPair.objnum;
} }
synchTable.setPackage(this, deletePkg.get()); // Blocks if another DML thread is using this fTableOid
synchTableAccess(); // Blocks if another DML thread is using this fTableOid
} }
#endif #endif
@ -1049,15 +1069,8 @@ void PackageHandler::run()
break; break;
} }
#ifdef MCOL_140
if (fConcurrentSupport)
{
// MCOL-140 We're done. release the next waiting txn for this fTableOid
releaseTableAccess();
}
#endif
//Log errors //Log errors
if ( (result.result != dmlpackageprocessor::DMLPackageProcessor::NO_ERROR) if ( (result.result != dmlpackageprocessor::DMLPackageProcessor::NO_ERROR)
@ -1080,15 +1093,8 @@ void PackageHandler::run()
} }
catch (std::exception& e) catch (std::exception& e)
{ {
#ifdef MCOL_140
if (fConcurrentSupport)
{
// MCOL-140 We're done. release the next waiting txn for this fTableOid
releaseTableAccess();
}
#endif
cout << "dmlprocessor.cpp PackageHandler::run() package type(" cout << "dmlprocessor.cpp PackageHandler::run() package type("
<< fPackageType << ") exception: " << e.what() << endl; << fPackageType << ") exception: " << e.what() << endl;
logging::LoggingID lid(21); logging::LoggingID lid(21);
@ -1105,15 +1111,8 @@ void PackageHandler::run()
} }
catch (...) catch (...)
{ {
#ifdef MCOL_140
if (fConcurrentSupport)
{
// MCOL-140 We're done. release the next waiting txn for this fTableOid
releaseTableAccess();
}
#endif
logging::LoggingID lid(21); logging::LoggingID lid(21);
logging::MessageLog ml(lid); logging::MessageLog ml(lid);
logging::Message::Args args; logging::Message::Args args;
@ -1157,9 +1156,17 @@ void PackageHandler::run()
void PackageHandler::rollbackPending() void PackageHandler::rollbackPending()
{ {
if (fProcessor.get() == NULL)
{
// This happens when batch insert
return;
}
fProcessor->setRollbackPending(true);
// Force a release of the processing from MCOL-140 // Force a release of the processing from MCOL-140
#ifdef MCOL_140 #ifdef MCOL_140
if (fConcurrentSupport) if (fConcurrentSupport)
{ {
// MCOL-140 We're not necessarily the next in line. // MCOL-140 We're not necessarily the next in line.
// This forces this thread to be released anyway. // This forces this thread to be released anyway.
@ -1168,14 +1175,6 @@ void PackageHandler::rollbackPending()
#endif #endif
if (fProcessor.get() == NULL)
{
// This happens when batch insert
return;
}
fProcessor->setRollbackPending(true);
ostringstream oss; ostringstream oss;
oss << "PackageHandler::rollbackPending: Processing DMLPackage."; oss << "PackageHandler::rollbackPending: Processing DMLPackage.";
DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG); DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG);

View File

@ -151,15 +151,16 @@ private:
DMLServer(const DMLServer& rhs); DMLServer(const DMLServer& rhs);
DMLServer& operator=(const DMLServer& rhs); DMLServer& operator=(const DMLServer& rhs);
/** @brief the thread pool for processing dml packages
*/
threadpool::ThreadPool fDmlPackagepool;
int fPackageMaxThreads; /** @brief max number of threads to process dml packages */ int fPackageMaxThreads; /** @brief max number of threads to process dml packages */
int fPackageWorkQueueSize; /** @brief max number of packages waiting in the work queue */ int fPackageWorkQueueSize; /** @brief max number of packages waiting in the work queue */
boost::scoped_ptr<messageqcpp::MessageQueueServer> fMqServer; boost::scoped_ptr<messageqcpp::MessageQueueServer> fMqServer;
BRM::DBRM* fDbrm; BRM::DBRM* fDbrm;
public:
/** @brief the thread pool for processing dml packages
*/
static threadpool::ThreadPool fDmlPackagepool;
}; };
/** @brief Thread to process a single dml package. /** @brief Thread to process a single dml package.
@ -207,12 +208,12 @@ private:
// Used to serialize operations because the VSS can't handle inserts // Used to serialize operations because the VSS can't handle inserts
// or updates on the same block. // or updates on the same block.
// When an Insert, Update or Delete command arrives, we look here // When an Insert, Update or Delete command arrives, we look here
// for the table oid. If found, wait until it is no onger here. // for the table oid. If found, wait until it is no longer here.
// If this transactionID (SCN) is < the transactionID in the table, don't delay // If this transactionID (SCN) is < the transactionID in the table, don't delay
// and hope for the best, as we're already out of order. // and hope for the best, as we're already out of order.
// When the VSS is engineered to handle transactions out of order, all MCOL-140 // When the VSS is engineered to handle transactions out of order, all MCOL-140
// code is to be removed. // code is to be removed.
int synchTableAccess(); int synchTableAccess(dmlpackage::CalpontDMLPackage* dmlPackage);
int releaseTableAccess(); int releaseTableAccess();
int forceReleaseTableAccess(); int forceReleaseTableAccess();
typedef iterable_queue<execplan::CalpontSystemCatalog::SCN> tableAccessQueue_t; typedef iterable_queue<execplan::CalpontSystemCatalog::SCN> tableAccessQueue_t;
@ -221,6 +222,35 @@ private:
static boost::mutex tableOidMutex; static boost::mutex tableOidMutex;
public: public:
static int clearTableAccess(); static int clearTableAccess();
// MCOL-3296 Add a class to call synchTableAccess on creation and
// releaseTableAccess on destuction for exception safeness.
class SynchTable
{
public:
SynchTable() : fphp(NULL) {};
SynchTable(PackageHandler* php, dmlpackage::CalpontDMLPackage* dmlPackage)
{
setPackage(php, dmlPackage);
}
~SynchTable()
{
if (fphp)
fphp->releaseTableAccess();
}
bool setPackage(PackageHandler* php, dmlpackage::CalpontDMLPackage* dmlPackage)
{
if (fphp)
fphp->releaseTableAccess();
fphp = php;
if (fphp)
fphp->synchTableAccess(dmlPackage);
return true;
}
private:
PackageHandler* fphp;
};
}; };
/** @brief processes dml packages as they arrive /** @brief processes dml packages as they arrive

View File

@ -409,6 +409,24 @@ void ThreadPool::beginThread() throw()
--fIssued; --fIssued;
--waitingFunctorsSize; --waitingFunctorsSize;
fWaitingFunctors.erase(todo); fWaitingFunctors.erase(todo);
if (fDebug)
{
ostringstream oss;
oss << "Ending thread " << " on " << fName
<< " max " << fMaxThreads
<< " queue " << fQueueSize
<< " threads " << fThreadCount
<< " running " << fIssued
<< " waiting " << (waitingFunctorsSize - fIssued)
<< " total " << waitingFunctorsSize;
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage( message );
}
} }
timeout = boost::get_system_time() + boost::posix_time::minutes(10); timeout = boost::get_system_time() + boost::posix_time::minutes(10);
@ -536,6 +554,8 @@ void ThreadPoolMonitor::operator()()
<< setw(4) << tv.tv_usec / 100 << setw(4) << tv.tv_usec / 100
<< " Name " << fPool->fName << " Name " << fPool->fName
<< " Active " << fPool->waitingFunctorsSize << " Active " << fPool->waitingFunctorsSize
<< " running " << fPool->fIssued
<< " waiting " << (fPool->waitingFunctorsSize - fPool->fIssued)
<< " ThdCnt " << fPool->fThreadCount << " ThdCnt " << fPool->fThreadCount
<< " Max " << fPool->fMaxThreads << " Max " << fPool->fMaxThreads
<< " Q " << fPool->fQueueSize << " Q " << fPool->fQueueSize