1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-01 06:46:55 +03:00

MCOL-1212 Diuring abort, ensure that all normal threads are complete before running abort threads. Otherwise, they clash.

This commit is contained in:
David Hall
2018-02-14 10:02:50 -06:00
parent 4220ab0df3
commit e03d75feb6
3 changed files with 13 additions and 38 deletions

View File

@ -4691,21 +4691,6 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
if (more)
{
// Trying out a ramp-up strategy for starting the
// first phase threads to cut overhead on big systems
// processing small result
// sets. On every non-zero read from the input FIFO,
// and if there is more data to read, the
// first thread will start another thread until the
// maximum number is reached.
#if 0
if (threadID == 0 && fFirstPhaseThreadCount < fNumOfThreads &&
dlIn->more(fInputIter))
{
fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, fFirstPhaseThreadCount)));
fFirstPhaseThreadCount++;
}
#endif
fRowGroupIns[threadID].setData(&rgData);
fMemUsage[threadID] += fRowGroupIns[threadID].getSizeWithStrings();
if (!fRm->getMemory(fRowGroupIns[threadID].getSizeWithStrings(), fSessionMemLimit))
@ -4971,28 +4956,16 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
{
initializeMultiThread();
// This block of code starts all threads at the start
fFirstPhaseThreadCount = fNumOfThreads;
fFirstPhaseRunners.clear();
fFirstPhaseRunners.reserve(fNumOfThreads); // to prevent a resize during use
for (i = 0; i < fNumOfThreads; i++)
{
fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, i)));
}
vector<uint64_t> runners; // thread pool handles
runners.reserve(fNumOfThreads); // to prevent a resize during use
// Start the aggregator threads
for (i = 0; i < fNumOfThreads; i++)
{
runners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, i)));
}
#if 0
// This block of code starts one thread, relies on doThreadedAggregation()
// For reasons unknown, this doesn't work right with threadpool
// to start more as needed
fFirstPhaseRunners.clear();
fFirstPhaseRunners.reserve(fNumOfThreads); // to prevent a resize during use
fFirstPhaseThreadCount = 1;
fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, 0)));
#endif
// Now wait for that thread plus all the threads it may have spawned
jobstepThreadPool.join(fFirstPhaseRunners);
fFirstPhaseRunners.clear();
// Now wait for all those threads
jobstepThreadPool.join(runners);
}
if (dynamic_cast<RowAggregationDistinct*>(fAggregator.get()) && fAggregator->aggMapKeyLength() > 0)