1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-02 17:22:27 +03:00

MCOL-1128 Allow exeMgrThreadPool to use unlimited threads. The number will be limited by the number of sessions allowed in server.

This commit is contained in:
David Hall
2018-01-12 16:29:01 -06:00
parent d61ff56989
commit 504fcf0778
7 changed files with 47 additions and 33 deletions

View File

@ -56,7 +56,7 @@ namespace joblist
{
boost::mutex JobStep::fLogMutex; //=PTHREAD_MUTEX_INITIALIZER;
ThreadPool JobStep::jobstepThreadPool(defaultJLThreadPoolSize, 0);
ThreadPool JobStep::jobstepThreadPool(0, 0);
ostream& operator<<(ostream& os, const JobStep* rhs)
{

View File

@ -93,8 +93,6 @@ namespace joblist
const uint64_t defaultNumBuckets = 128;
const uint64_t defaultMaxElementsPerBuckert = 16 * 1024 * 1024;
const int defaultEMServerThreads = 50;
const int defaultEMServerQueueSize = 100;
const int defaultEMSecondsBetweenMemChecks = 1;
const int defaultEMMaxPct = 95;
const int defaultEMPriority = 21; // @Bug 3385
@ -149,13 +147,10 @@ namespace joblist
typedef std::map <uint32_t, uint64_t> MemMap;
int getEmServerThreads() const { return getUintVal(fExeMgrStr, "ServerThreads", defaultEMServerThreads); }
int getEmServerQueueSize() const { return getUintVal(fExeMgrStr, "ServerQueueSize", defaultEMServerQueueSize); }
int getEmSecondsBetweenMemChecks() const { return getUintVal(fExeMgrStr, "SecondsBetweenMemChecks", defaultEMSecondsBetweenMemChecks); }
int getEmMaxPct() const { return getUintVal(fExeMgrStr, "MaxPct", defaultEMMaxPct); }
EXPORT int getEmPriority() const;
int getEmExecQueueSize() const { return getUintVal(fExeMgrStr, "ExecQueueSize", defaultEMExecQueueSize); }
int getEmExecQueueSize() const { return getIntVal(fExeMgrStr, "ExecQueueSize", defaultEMExecQueueSize); }
int getHjMaxBuckets() const { return getUintVal(fHashJoinStr, "MaxBuckets", defaultHJMaxBuckets); }
unsigned getHjNumThreads() const { return fHjNumThreads; } //getUintVal(fHashJoinStr, "NumThreads", defaultNumThreads); }

View File

@ -3923,7 +3923,7 @@ int ha_calpont_impl_end_bulk_insert(bool abort, TABLE* table)
if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) && !ci->singleInsert )
{
//@Bug 2438. Only load dta infile calls last batch process
//@Bug 2438. Only load data infile calls last batch process
/* if ( ci->isLoaddataInfile && ((thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) || (ci->useCpimport == 0))) {
//@Bug 2829 Handle ctrl-C
if ( thd->killed > 0 )

View File

@ -565,7 +565,7 @@ int main(int argc, char* argv[])
// because rm has a "isExeMgr" flag that is set upon creation (rm is a singleton).
// From the pools perspective, it has no idea if it is ExeMgr doing the
// creation, so it has no idea which way to set the flag. So we set the max here.
JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
// JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
JobStep::jobstepThreadPool.setName("DMLProcJobList");
// if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y")

View File

@ -24,7 +24,7 @@ using namespace std;
using namespace joblist;
using namespace messageqcpp;
threadpool::ThreadPool FEMsgHandler::threadPool(50,100);
threadpool::ThreadPool FEMsgHandler::threadPool;
namespace {

View File

@ -1436,7 +1436,7 @@ int main(int argc, char* argv[])
// because rm has a "isExeMgr" flag that is set upon creation (rm is a singleton).
// From the pools perspective, it has no idea if it is ExeMgr doing the
// creation, so it has no idea which way to set the flag. So we set the max here.
JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
// JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
JobStep::jobstepThreadPool.setName("ExeMgrJobList");
// if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y")
// {
@ -1444,14 +1444,10 @@ int main(int argc, char* argv[])
// JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
// }
int serverThreads = rm->getEmServerThreads();
int serverQueueSize = rm->getEmServerQueueSize();
int maxPct = rm->getEmMaxPct();
int pauseSeconds = rm->getEmSecondsBetweenMemChecks();
int priority = rm->getEmPriority();
FEMsgHandler::threadPool.setMaxThreads(serverThreads);
FEMsgHandler::threadPool.setQueueSize(serverQueueSize);
FEMsgHandler::threadPool.setName("FEMsgHandler");
if (maxPct > 0)
@ -1472,8 +1468,7 @@ int main(int argc, char* argv[])
}
}
cout << "Starting ExeMgr: st = " << serverThreads << ", sq = " <<
serverQueueSize << ", qs = " << rm->getEmExecQueueSize() << ", mx = " << maxPct << ", cf = " <<
cout << "Starting ExeMgr: qs = " << rm->getEmExecQueueSize() << ", mx = " << maxPct << ", cf = " <<
rm->getConfig()->configFile() << endl;
//set ACTIVE state
@ -1488,8 +1483,9 @@ int main(int argc, char* argv[])
}
}
threadpool::ThreadPool exeMgrThreadPool(serverThreads, serverQueueSize);
threadpool::ThreadPool exeMgrThreadPool;
exeMgrThreadPool.setName("ExeMgrServer");
exeMgrThreadPool.setDebug(true);
for (;;)
{
IOSocket ios;

View File

@ -196,19 +196,16 @@ uint64_t ThreadPool::invoke(const Functor_T &threadfunc)
bAdded = true;
}
// fQueueSize = 0 disables the queue and is an indicator to allow any number of threads to actually run.
if (fThreadCount < fMaxThreads || fQueueSize == 0)
{
++fThreadCount;
lock1.unlock();
fThreads.create_thread(beginThreadFunc(*this));
if (fDebug)
{
ostringstream oss;
oss << "invoke: Starting thread " << fThreadCount << " max " << fMaxThreads
<< " queue " << fQueueSize;
oss << "invoke thread " << " on " << fName
<< " max " << fMaxThreads
<< " queue " << fQueueSize
<< " threads " << fThreadCount
<< " running " << fIssued
<< " waiting " << (waitingFunctorsSize - fIssued)
<< " total " << waitingFunctorsSize;
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
@ -218,6 +215,14 @@ uint64_t ThreadPool::invoke(const Functor_T &threadfunc)
ml.logWarningMessage( message );
}
// fQueueSize = 0 disables the queue and is an indicator to allow any number of threads to actually run.
if (fThreadCount < fMaxThreads || fQueueSize == 0)
{
++fThreadCount;
lock1.unlock();
fThreads.create_thread(beginThreadFunc(*this));
if (bAdded)
break;
@ -301,6 +306,24 @@ void ThreadPool::beginThread() throw()
{
Container_T::iterator todo = fNextFunctor++;
++fIssued;
if (fDebug)
{
ostringstream oss;
oss << "starting thread " << " on " << fName
<< " max " << fMaxThreads
<< " queue " << fQueueSize
<< " threads " << fThreadCount
<< " running " << fIssued
<< " waiting " << (waitingFunctorsSize - fIssued)
<< " total " << waitingFunctorsSize;
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage( message );
}
lock1.unlock();
try
{