diff --git a/dbcon/joblist/joblist.cpp b/dbcon/joblist/joblist.cpp
index e5eb86b26..934dae71d 100644
--- a/dbcon/joblist/joblist.cpp
+++ b/dbcon/joblist/joblist.cpp
@@ -1067,6 +1067,10 @@ void JobList::abort()
             fQuery[i]->abort();
         for (i = 0; i < fProject.size(); i++)
             fProject[i]->abort();
+        for (i = 0; i < fQuery.size(); i++)
+            fQuery[i]->join();
+        for (i = 0; i < fProject.size(); i++)
+            fProject[i]->join();
     }
 }
 
diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp
index bc7f6326b..b5cde967b 100755
--- a/dbcon/joblist/tupleaggregatestep.cpp
+++ b/dbcon/joblist/tupleaggregatestep.cpp
@@ -4691,21 +4691,6 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
             if (more)
             {
-                // Trying out a ramp-up strategy for starting the
-                // first phase threads to cut overhead on big systems
-                // processing small result
-                // sets. On every non-zero read from the input FIFO,
-                // and if there is more data to read, the
-                // first thread will start another thread until the
-                // maximum number is reached.
-#if 0
-                if (threadID == 0 && fFirstPhaseThreadCount < fNumOfThreads &&
-                    dlIn->more(fInputIter))
-                {
-                    fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, fFirstPhaseThreadCount)));
-                    fFirstPhaseThreadCount++;
-                }
-#endif
 
                 fRowGroupIns[threadID].setData(&rgData);
                 fMemUsage[threadID] += fRowGroupIns[threadID].getSizeWithStrings();
                 if (!fRm->getMemory(fRowGroupIns[threadID].getSizeWithStrings(), fSessionMemLimit))
@@ -4971,28 +4956,16 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
     {
         initializeMultiThread();
 
-// This block of code starts all threads at the start
-        fFirstPhaseThreadCount = fNumOfThreads;
-        fFirstPhaseRunners.clear();
-        fFirstPhaseRunners.reserve(fNumOfThreads); // to prevent a resize during use
-        for (i = 0; i < fNumOfThreads; i++)
-        {
-            fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, i)));
-        }
+        vector<uint64_t> runners;   // thread pool handles
+        runners.reserve(fNumOfThreads); // to prevent a resize during use
+        // Start the aggregator threads
+        for (i = 0; i < fNumOfThreads; i++)
+        {
+            runners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, i)));
+        }
 
-#if 0
-// This block of code starts one thread, relies on doThreadedAggregation()
-// For reasons unknown, this doesn't work right with threadpool
-// to start more as needed
-        fFirstPhaseRunners.clear();
-        fFirstPhaseRunners.reserve(fNumOfThreads); // to prevent a resize during use
-        fFirstPhaseThreadCount = 1;
-        fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, 0)));
-#endif
-
-        // Now wait for that thread plus all the threads it may have spawned
-        jobstepThreadPool.join(fFirstPhaseRunners);
-        fFirstPhaseRunners.clear();
+        // Now wait for all those threads
+        jobstepThreadPool.join(runners);
     }
 
     if (dynamic_cast(fAggregator.get()) && fAggregator->aggMapKeyLength() > 0)
diff --git a/dbcon/joblist/tupleaggregatestep.h b/dbcon/joblist/tupleaggregatestep.h
index 0839e8263..58b84a129 100644
--- a/dbcon/joblist/tupleaggregatestep.h
+++ b/dbcon/joblist/tupleaggregatestep.h
@@ -186,8 +186,6 @@ private:
     bool fIsMultiThread;
     int fInputIter; // iterator
     boost::scoped_array fMemUsage;
-    std::vector<uint64_t> fFirstPhaseRunners; // thread pool handles
-    uint32_t fFirstPhaseThreadCount;
     boost::shared_ptr fSessionMemLimit;
 };
 
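
For reference, here is a minimal standalone sketch of the invoke/join pattern the patch settles on in doThreadedAggregate(): start every first-phase aggregator up front, keep the returned handles in a local vector, and join them all before moving on, with no member-level handle list or ramp-up bookkeeping. `ToyPool` and its `invoke()`/`join()` are stand-ins written for this sketch under the assumption that handles are plain integers, not ColumnStore's actual ThreadPool API.

```cpp
// Sketch of "start N workers, collect handles, join them all" with a toy pool.
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <thread>
#include <utility>
#include <vector>

class ToyPool
{
 public:
  // Run fn on a new thread and return a handle that can be joined later.
  uint64_t invoke(std::function<void()> fn)
  {
    uint64_t handle = nextHandle_++;
    threads_.emplace(handle, std::thread(std::move(fn)));
    return handle;
  }

  // Join every handle in the list, mirroring jobstepThreadPool.join(runners).
  void join(const std::vector<uint64_t>& handles)
  {
    for (uint64_t h : handles)
    {
      auto it = threads_.find(h);
      if (it != threads_.end() && it->second.joinable())
        it->second.join();
    }
  }

 private:
  uint64_t nextHandle_ = 0;
  std::map<uint64_t, std::thread> threads_;
};

int main()
{
  ToyPool pool;
  const uint32_t numOfThreads = 4;  // plays the role of fNumOfThreads

  // Local handle vector, as in the patched doThreadedAggregate():
  // no member state to clear, no first-phase thread counter.
  std::vector<uint64_t> runners;
  runners.reserve(numOfThreads);  // to prevent a resize during use

  for (uint32_t i = 0; i < numOfThreads; i++)
    runners.push_back(pool.invoke([i] { std::cout << "aggregator " << i << " done\n"; }));

  // Wait for all first-phase workers before continuing.
  pool.join(runners);
  return 0;
}
```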