1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-01 06:46:55 +03:00

Merge branch 'develop-1.1' into 1.1-mergeup-20180224

This commit is contained in:
Andrew Hutchings
2018-02-24 11:07:24 -05:00
33 changed files with 337 additions and 174 deletions

View File

@ -5063,22 +5063,6 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
if (more)
{
// Trying out a ramp-up strategy for starting the
// first phase threads to cut overhead on big systems
// processing small result
// sets. On every non-zero read from the input FIFO,
// and if there is more data to read, the
// first thread will start another thread until the
// maximum number is reached.
#if 0
if (threadID == 0 && fFirstPhaseThreadCount < fNumOfThreads &&
dlIn->more(fInputIter))
{
fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, fFirstPhaseThreadCount)));
fFirstPhaseThreadCount++;
}
#endif
fRowGroupIns[threadID].setData(&rgData);
fMemUsage[threadID] += fRowGroupIns[threadID].getSizeWithStrings();
@ -5364,29 +5348,17 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
{
initializeMultiThread();
// This block of code starts all threads at the start
fFirstPhaseThreadCount = fNumOfThreads;
fFirstPhaseRunners.clear();
fFirstPhaseRunners.reserve(fNumOfThreads); // to prevent a resize during use
vector<uint64_t> runners; // thread pool handles
runners.reserve(fNumOfThreads); // to prevent a resize during use
// Start the aggregator threads
for (i = 0; i < fNumOfThreads; i++)
{
fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, i)));
runners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, i)));
}
#if 0
// This block of code starts one thread, relies on doThreadedAggregation()
// For reasons unknown, this doesn't work right with threadpool
// to start more as needed
fFirstPhaseRunners.clear();
fFirstPhaseRunners.reserve(fNumOfThreads); // to prevent a resize during use
fFirstPhaseThreadCount = 1;
fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, 0)));
#endif
// Now wait for that thread plus all the threads it may have spawned
jobstepThreadPool.join(fFirstPhaseRunners);
fFirstPhaseRunners.clear();
// Now wait for all those threads
jobstepThreadPool.join(runners);
}
if (dynamic_cast<RowAggregationDistinct*>(fAggregator.get()) && fAggregator->aggMapKeyLength() > 0)