From b382f681a17290b213adb24cf5497f83434910bf Mon Sep 17 00:00:00 2001 From: Denis Khalikov Date: Wed, 13 Oct 2021 15:31:05 +0300 Subject: [PATCH] [MCOL-4849] Parallelize the processing of the bytestream vector. This patch changes the logic of the `receiveMultiPrimitiveMessages` function in the following way: 1. We have only one aggregation thread which reads the data from Queue (which is populated by messages from BPPs). 2. Processing of the received `bytestream vector` could be in parallel depends on the type of `TupleBPS` operation (join, fe2, ...) and actual thread pool workload. The motivation is to eliminate some amount of context switches. --- dbcon/joblist/primitivestep.h | 382 +++++++----- dbcon/joblist/tuple-bps.cpp | 1059 +++++++++++++++++---------------- utils/joiner/tuplejoiner.cpp | 5 +- utils/threadpool/threadpool.h | 5 + 4 files changed, 786 insertions(+), 665 deletions(-) diff --git a/dbcon/joblist/primitivestep.h b/dbcon/joblist/primitivestep.h index 80d4eaec2..4a0d92602 100644 --- a/dbcon/joblist/primitivestep.h +++ b/dbcon/joblist/primitivestep.h @@ -1075,6 +1075,24 @@ public: virtual void setFE23Output(const rowgroup::RowGroup& rg) = 0; }; +struct _CPInfo +{ + _CPInfo(int64_t MIN, int64_t MAX, uint64_t l, bool val) : min(MIN), max(MAX), LBID(l), valid(val){}; + _CPInfo(int128_t BIGMIN, int128_t BIGMAX, uint64_t l, bool val) + : bigMin(BIGMIN), bigMax(BIGMAX), LBID(l), valid(val){}; + union + { + int128_t bigMin; + int64_t min; + }; + union + { + int128_t bigMax; + int64_t max; + }; + uint64_t LBID; + bool valid; +}; /** @brief class TupleBPS * @@ -1113,7 +1131,12 @@ public: * * The main loop for the receive-side thread. Don't call it directly. */ - void receiveMultiPrimitiveMessages(uint32_t threadID); + void receiveMultiPrimitiveMessages(); + + // Processes the vector of `bytestream` starting from `begin` index to the `end` index, non inclusive. + void processByteStreamVector(vector>& bsv, + const uint32_t begin, const uint32_t end, vector<_CPInfo>& cpv, + RowGroupDL* dlp, const uint32_t threadID); /** @brief Add a filter when the column is anything but a 4-byte float type. * @@ -1348,158 +1371,246 @@ protected: void sendError(uint16_t status); private: - void formatMiniStats(); + void formatMiniStats(); - void startPrimitiveThread(); - void startAggregationThread(); - void initializeConfigParms(); - uint64_t getFBO(uint64_t lbid); - void checkDupOutputColumns(const rowgroup::RowGroup& rg); - void dupOutputColumns(rowgroup::RowGroup&); - void dupOutputColumns(rowgroup::RGData&, rowgroup::RowGroup&); - void rgDataToDl(rowgroup::RGData&, rowgroup::RowGroup&, RowGroupDL*); - void rgDataVecToDl(std::vector&, rowgroup::RowGroup&, RowGroupDL*); + void startPrimitiveThread(); + void startAggregationThread(); + // Processes the vector of `bytestream` starting from `begin` index to the `end` index, non inclusive. 
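+  // Each worker owns a disjoint [begin, end) slice of `bsv` and appends casual-
+  // partitioning results to its own `cpv`, so the slices can be processed
+  // concurrently without additional locking.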
+ void startProcessingThread(TupleBPS* tbps, vector>& bsv, + const uint32_t begin, const uint32_t end, vector<_CPInfo>& cpv, RowGroupDL* dlp, + const uint32_t threadID); + void initializeConfigParms(); + uint64_t getFBO(uint64_t lbid); + void checkDupOutputColumns(const rowgroup::RowGroup& rg); + void dupOutputColumns(rowgroup::RowGroup&); + void dupOutputColumns(rowgroup::RGData&, rowgroup::RowGroup&); + void rgDataToDl(rowgroup::RGData&, rowgroup::RowGroup&, RowGroupDL*); + void rgDataVecToDl(std::vector&, rowgroup::RowGroup&, RowGroupDL*); + DistributedEngineComm* fDec; + boost::shared_ptr fBPP; + uint16_t fNumSteps; + int fColWidth; + uint32_t fStepCount; + bool fCPEvaluated; // @bug 2123 + uint64_t fEstimatedRows; // @bug 2123 + /// number of threads on the receive side + uint32_t fMaxNumThreads; + uint32_t fNumThreads; + PrimitiveStepType ffirstStepType; + bool isFilterFeeder; + std::vector fProducerThreads; // thread pool handles + std::vector fProcessorThreads; + messageqcpp::ByteStream fFilterString; + uint32_t fFilterCount; + execplan::CalpontSystemCatalog::ColType fColType; + execplan::CalpontSystemCatalog::OID fOid; + execplan::CalpontSystemCatalog::OID fTableOid; + uint64_t fLastTupleId; + BRM::LBIDRange_v lbidRanges; + std::vector lastExtent; + std::vector lastScannedLBID; + BRM::DBRM dbrm; + SP_LBIDList lbidList; + uint64_t ridsRequested; + uint64_t totalMsgs; + volatile uint64_t msgsSent; + volatile uint64_t msgsRecvd; + volatile bool finishedSending; + bool firstRead; + bool sendWaiting; + uint32_t recvWaiting; + uint32_t recvExited; + uint64_t ridsReturned; + std::map> extentsMap; + std::vector scannedExtents; + OIDVector projectOids; + uint32_t extentSize, divShift, rpbShift, numExtents, modMask; + uint32_t fRequestSize; // the number of logical extents per batch of requests sent to PrimProc. + uint32_t fProcessorThreadsPerScan; // The number of messages sent per logical extent. + bool fSwallowRows; + uint32_t fMaxOutstandingRequests; // The number of logical extents have not processed by PrimProc + uint64_t fPhysicalIO; // total physical I/O count + uint64_t fCacheIO; // total cache I/O count + uint64_t fNumBlksSkipped; // total number of block scans skipped due to CP + uint64_t fMsgBytesIn; // total byte count for incoming messages + uint64_t fMsgBytesOut; // total byte count for outcoming messages + uint64_t fBlockTouched; // total blocks touched + uint32_t fExtentsPerSegFile; // config num of Extents Per Segment File + // uint64_t cThread; //consumer thread. thread handle from thread pool + uint64_t pThread; // producer thread. 
thread handle from thread pool + boost::mutex tplMutex; + boost::mutex dlMutex; + boost::mutex cpMutex; + boost::mutex serializeJoinerMutex; + boost::condition condvarWakeupProducer, condvar; - DistributedEngineComm* fDec; - boost::shared_ptr fBPP; - uint16_t fNumSteps; - int fColWidth; - uint32_t fStepCount; - bool fCPEvaluated; // @bug 2123 - uint64_t fEstimatedRows; // @bug 2123 - /// number of threads on the receive side - uint32_t fMaxNumThreads; - uint32_t fNumThreads; - PrimitiveStepType ffirstStepType; - bool isFilterFeeder; - std::vector fProducerThreads; // thread pool handles - messageqcpp::ByteStream fFilterString; - uint32_t fFilterCount; - execplan::CalpontSystemCatalog::ColType fColType; - execplan::CalpontSystemCatalog::OID fOid; - execplan::CalpontSystemCatalog::OID fTableOid; - uint64_t fLastTupleId; - BRM::LBIDRange_v lbidRanges; - std::vector lastExtent; - std::vector lastScannedLBID; - BRM::DBRM dbrm; - SP_LBIDList lbidList; - uint64_t ridsRequested; - uint64_t totalMsgs; - volatile uint64_t msgsSent; - volatile uint64_t msgsRecvd; - volatile bool finishedSending; - bool firstRead; - bool sendWaiting; - uint32_t recvWaiting; - uint32_t recvExited; - uint64_t ridsReturned; - std::map > extentsMap; - std::vector scannedExtents; - OIDVector projectOids; - uint32_t extentSize, divShift, rpbShift, numExtents, modMask; - uint32_t fRequestSize; // the number of logical extents per batch of requests sent to PrimProc. - uint32_t fProcessorThreadsPerScan; // The number of messages sent per logical extent. - bool fSwallowRows; - uint32_t fMaxOutstandingRequests; // The number of logical extents have not processed by PrimProc - uint64_t fPhysicalIO; // total physical I/O count - uint64_t fCacheIO; // total cache I/O count - uint64_t fNumBlksSkipped;//total number of block scans skipped due to CP - uint64_t fMsgBytesIn; // total byte count for incoming messages - uint64_t fMsgBytesOut; // total byte count for outcoming messages - uint64_t fBlockTouched; // total blocks touched - uint32_t fExtentsPerSegFile;//config num of Extents Per Segment File - // uint64_t cThread; //consumer thread. thread handle from thread pool - uint64_t pThread; //producer thread. 
thread handle from thread pool - boost::mutex tplMutex; - boost::mutex dlMutex; - boost::mutex cpMutex; - boost::mutex serializeJoinerMutex; - boost::condition condvarWakeupProducer, condvar; + std::vector scanFlags; // use to keep track of which extents to eliminate from this step + bool BPPIsAllocated; + uint32_t uniqueID; + ResourceManager* fRm; - std::vector scanFlags; // use to keep track of which extents to eliminate from this step - bool BPPIsAllocated; - uint32_t uniqueID; - ResourceManager* fRm; + /* HashJoin support */ - /* HashJoin support */ + void serializeJoiner(); + void serializeJoiner(uint32_t connectionNumber); - void serializeJoiner(); - void serializeJoiner(uint32_t connectionNumber); + void generateJoinResultSet(const std::vector>& joinerOutput, + rowgroup::Row& baseRow, const std::vector>& mappings, + const uint32_t depth, rowgroup::RowGroup& outputRG, rowgroup::RGData& rgData, + std::vector* outputData, + const boost::scoped_array& smallRows, rowgroup::Row& joinedRow); - void generateJoinResultSet(const std::vector >& joinerOutput, - rowgroup::Row& baseRow, const std::vector >& mappings, - const uint32_t depth, rowgroup::RowGroup& outputRG, rowgroup::RGData& rgData, - std::vector* outputData, - const boost::scoped_array& smallRows, rowgroup::Row& joinedRow); + std::vector> tjoiners; + bool doJoin, hasPMJoin, hasUMJoin; + std::vector joinerMatchesRGs; // parses the small-side matches from joiner - std::vector > tjoiners; - bool doJoin, hasPMJoin, hasUMJoin; - std::vector joinerMatchesRGs; // parses the small-side matches from joiner + uint32_t smallSideCount; + int smallOuterJoiner; - uint32_t smallSideCount; - int smallOuterJoiner; + bool fRunExecuted; // was the run method executed for this step + rowgroup::RowGroup inputRowGroup; // for parsing the data read from the datalist + rowgroup::RowGroup primRowGroup; // for parsing the data received from the PM + rowgroup::RowGroup outputRowGroup; // if there's a join, these are the joined + // result, otherwise it's = to primRowGroup + // aggregation support + rowgroup::SP_ROWAGG_PM_t fAggregatorPm; + rowgroup::RowGroup fAggRowGroupPm; - bool fRunExecuted; // was the run method executed for this step - rowgroup::RowGroup inputRowGroup; // for parsing the data read from the datalist - rowgroup::RowGroup primRowGroup; // for parsing the data received from the PM - rowgroup::RowGroup outputRowGroup; // if there's a join, these are the joined - // result, otherwise it's = to primRowGroup - // aggregation support - rowgroup::SP_ROWAGG_PM_t fAggregatorPm; - rowgroup::RowGroup fAggRowGroupPm; + // OR hacks + uint8_t bop; // BOP_AND or BOP_OR - // OR hacks - uint8_t bop; // BOP_AND or BOP_OR + // temporary hack to make sure JobList only calls run and join once + boost::mutex jlLock; + bool runRan; + bool joinRan; - // temporary hack to make sure JobList only calls run and join once - boost::mutex jlLock; - bool runRan; - bool joinRan; + // bug 1965, trace duplicat columns in delivery list + std::vector> dupColumns; - // bug 1965, trace duplicat columns in delivery list - std::vector > dupColumns; + /* Functions & Expressions vars */ + boost::shared_ptr fe1, fe2; + rowgroup::RowGroup fe1Input, fe2Output; + boost::shared_array fe2Mapping; + bool runFEonPM; - /* Functions & Expressions vars */ - boost::shared_ptr fe1, fe2; - rowgroup::RowGroup fe1Input, fe2Output; - boost::shared_array fe2Mapping; - bool runFEonPM; + /* for UM F & E 2 processing */ + rowgroup::RGData fe2Data; + rowgroup::Row fe2InRow, fe2OutRow; - /* for UM F & E 2 
processing */ - rowgroup::RGData fe2Data; - rowgroup::Row fe2InRow, fe2OutRow; + void processFE2(rowgroup::RowGroup& input, rowgroup::RowGroup& output, rowgroup::Row& inRow, rowgroup::Row& outRow, + std::vector* rgData, funcexp::FuncExpWrapper* localFE2); + void processFE2_oneRG(rowgroup::RowGroup& input, rowgroup::RowGroup& output, rowgroup::Row& inRow, + rowgroup::Row& outRow, funcexp::FuncExpWrapper* localFE2); - void processFE2(rowgroup::RowGroup& input, rowgroup::RowGroup& output, - rowgroup::Row& inRow, rowgroup::Row& outRow, - std::vector* rgData, - funcexp::FuncExpWrapper* localFE2); - void processFE2_oneRG(rowgroup::RowGroup& input, rowgroup::RowGroup& output, - rowgroup::Row& inRow, rowgroup::Row& outRow, - funcexp::FuncExpWrapper* localFE2); + /* Runtime Casual Partitioning adjustments. The CP code is needlessly complicated; + * to avoid making it worse, decided to designate 'scanFlags' as the static + * component and this new array as the runtime component. The final CP decision + * is scanFlags & runtimeCP. + */ + std::vector runtimeCPFlags; - /* Runtime Casual Partitioning adjustments. The CP code is needlessly complicated; - * to avoid making it worse, decided to designate 'scanFlags' as the static - * component and this new array as the runtime component. The final CP decision - * is scanFlags & runtimeCP. - */ - std::vector runtimeCPFlags; + /* semijoin vars */ + rowgroup::RowGroup joinFERG; - /* semijoin vars */ - rowgroup::RowGroup joinFERG; + boost::shared_ptr deliveryDL; + uint32_t deliveryIt; - boost::shared_ptr deliveryDL; - uint32_t deliveryIt; + struct JoinLocalData + { + JoinLocalData() = delete; + JoinLocalData(const JoinLocalData&) = delete; + JoinLocalData(JoinLocalData&&) = delete; + JoinLocalData& operator=(const JoinLocalData&) = delete; + JoinLocalData& operator=(JoinLocalData&&) = delete; + ~JoinLocalData() = default; - /* shared nothing support */ - struct Job - { - Job(uint32_t d, uint32_t n, uint32_t b, boost::shared_ptr& bs) : - dbroot(d), connectionNum(n), expectedResponses(b), msg(bs) { } - uint32_t dbroot; - uint32_t connectionNum; - uint32_t expectedResponses; - boost::shared_ptr msg; + JoinLocalData(rowgroup::RowGroup& primRowGroup, rowgroup::RowGroup& outputRowGroup, + boost::shared_ptr& fe2, rowgroup::RowGroup& fe2Output, + std::vector& joinerMatchesRGs, rowgroup::RowGroup& joinFERG, + std::vector>& tjoiners, uint32_t smallSideCount, + bool doJoin); + + rowgroup::RowGroup local_primRG; + rowgroup::RowGroup local_outputRG; + + uint32_t cachedIO_Thread = 0; + uint32_t physIO_Thread = 0; + uint32_t touchedBlocks_Thread = 0; + int64_t ridsReturned_Thread = 0; + + // On init. + bool doJoin; + boost::shared_ptr fe2; + rowgroup::RowGroup fe2Output; + uint32_t smallSideCount; + std::vector joinerMatchesRGs; + rowgroup::RowGroup joinFERG; + std::vector> tjoiners; + + // Join vars. 
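+    // One JoinLocalData instance exists per processor thread (see
+    // initializeJoinLocalDataPool), giving each thread private Row buffers and
+    // mappings in place of the function-local state the old single-threaded
+    // receiveMultiPrimitiveMessages() kept on its stack.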
+ vector> joinerOutput; + rowgroup::Row largeSideRow; + rowgroup::Row joinedBaseRow; + rowgroup::Row largeNull; + rowgroup::Row joinFERow; // LSR clean + boost::scoped_array smallSideRows; + boost::scoped_array smallNulls; + boost::scoped_array joinedBaseRowData; + boost::scoped_array joinFERowData; + boost::shared_array largeMapping; + vector> smallMappings; + vector> fergMappings; + rowgroup::RGData joinedData; + boost::scoped_array largeNullMemory; + boost::scoped_array> smallNullMemory; + uint32_t matchCount; + + rowgroup::Row postJoinRow; + rowgroup::RowGroup local_fe2Output; + rowgroup::RGData local_fe2Data; + rowgroup::Row local_fe2OutRow; + funcexp::FuncExpWrapper local_fe2; + }; + + // We will initialize the actual value in TupleBPS `initializeConfigParms` function. + uint32_t fMaxNumProcessorThreads = 16; + + // Based on the `ThreadPool` workload we set it in runtime for each `TupleBPS` operation. + uint32_t fNumProcessorThreads = 1; + + std::shared_ptr getJoinLocalDataByIndex(uint32_t index) + { + idbassert(index < fNumProcessorThreads && joinLocalDataPool.size() == fNumProcessorThreads); + return joinLocalDataPool[index]; + } + + void initializeJoinLocalDataPool(uint32_t numThreads) + { + idbassert(numThreads <= fMaxNumProcessorThreads); + for (uint32_t i = 0; i < numThreads; ++i) + { + joinLocalDataPool.push_back(std::shared_ptr( + new JoinLocalData(primRowGroup, outputRowGroup, fe2, fe2Output, joinerMatchesRGs, joinFERG, + tjoiners, smallSideCount, doJoin))); + } + + fNumProcessorThreads = numThreads; + } + + // Join local data vector. + std::vector> joinLocalDataPool; + + /* shared nothing support */ + struct Job + { + Job(uint32_t d, uint32_t n, uint32_t b, boost::shared_ptr& bs) + : dbroot(d), connectionNum(n), expectedResponses(b), msg(bs) + { + } + uint32_t dbroot; + uint32_t connectionNum; + uint32_t expectedResponses; + boost::shared_ptr msg; }; void prepCasualPartitioning(); @@ -1523,8 +1634,7 @@ private: bool compareRange(uint8_t COP, int64_t min, int64_t max, int64_t val) const; bool hasPCFilter, hasPMFilter, hasRIDFilter, hasSegmentFilter, hasDBRootFilter, hasSegmentDirFilter, hasPartitionFilter, hasMaxFilter, hasMinFilter, hasLBIDFilter, hasExtentIDFilter; - -}; + }; /** @brief class FilterStep * diff --git a/dbcon/joblist/tuple-bps.cpp b/dbcon/joblist/tuple-bps.cpp index 82d41fe54..f57a05673 100644 --- a/dbcon/joblist/tuple-bps.cpp +++ b/dbcon/joblist/tuple-bps.cpp @@ -129,18 +129,15 @@ struct TupleBPSPrimitive struct TupleBPSAggregators { - TupleBPSAggregators(TupleBPS* batchPrimitiveStep, uint64_t index) : - fBatchPrimitiveStepCols(batchPrimitiveStep), fThreadId(index) - {} + TupleBPSAggregators(TupleBPS* batchPrimitiveStep) : fBatchPrimitiveStepCols(batchPrimitiveStep) {} TupleBPS* fBatchPrimitiveStepCols; - uint64_t fThreadId; void operator()() { try { utils::setThreadName("BPSAggregator"); - fBatchPrimitiveStepCols->receiveMultiPrimitiveMessages(fThreadId); + fBatchPrimitiveStepCols->receiveMultiPrimitiveMessages(); } catch (std::exception& re) { @@ -156,6 +153,109 @@ struct TupleBPSAggregators } }; +TupleBPS::JoinLocalData::JoinLocalData(RowGroup& primRowGroup, RowGroup& outputRowGroup, + boost::shared_ptr& fe2, + rowgroup::RowGroup& fe2Output, + std::vector& joinerMatchesRGs, + rowgroup::RowGroup& joinFERG, + std::vector>& tjoiners, + uint32_t smallSideCount, bool doJoin) + : local_primRG(primRowGroup), local_outputRG(outputRowGroup), fe2(fe2), fe2Output(fe2Output), + joinerMatchesRGs(joinerMatchesRGs), joinFERG(joinFERG), tjoiners(tjoiners), + 
smallSideCount(smallSideCount), doJoin(doJoin) +{ + if (doJoin || fe2) + { + local_outputRG.initRow(&postJoinRow); + } + + if (fe2) + { + local_fe2Output = fe2Output; + local_fe2Output.initRow(&local_fe2OutRow); + local_fe2Data.reinit(fe2Output); + local_fe2Output.setData(&local_fe2Data); + // local_fe2OutRow = fe2OutRow; + local_fe2 = *fe2; + } + + if (doJoin) + { + joinerOutput.resize(smallSideCount); + smallSideRows.reset(new Row[smallSideCount]); + smallNulls.reset(new Row[smallSideCount]); + smallMappings.resize(smallSideCount); + fergMappings.resize(smallSideCount + 1); + smallNullMemory.reset(new shared_array[smallSideCount]); + local_primRG.initRow(&largeSideRow); + local_outputRG.initRow(&joinedBaseRow, true); + joinedBaseRowData.reset(new uint8_t[joinedBaseRow.getSize()]); + joinedBaseRow.setData(joinedBaseRowData.get()); + joinedBaseRow.initToNull(); + largeMapping = makeMapping(local_primRG, local_outputRG); + + bool hasJoinFE = false; + + for (int i = 0; i < smallSideCount; i++) + { + joinerMatchesRGs[i].initRow(&(smallSideRows[i])); + smallMappings[i] = makeMapping(joinerMatchesRGs[i], local_outputRG); + + if (tjoiners[i]->hasFEFilter()) + { + fergMappings[i] = makeMapping(joinerMatchesRGs[i], joinFERG); + hasJoinFE = true; + } + } + + if (hasJoinFE) + { + joinFERG.initRow(&joinFERow, true); + joinFERowData.reset(new uint8_t[joinFERow.getSize()]); + memset(joinFERowData.get(), 0, joinFERow.getSize()); + joinFERow.setData(joinFERowData.get()); + fergMappings[smallSideCount] = makeMapping(local_primRG, joinFERG); + } + + for (int i = 0; i < smallSideCount; i++) + { + joinerMatchesRGs[i].initRow(&(smallNulls[i]), true); + smallNullMemory[i].reset(new uint8_t[smallNulls[i].getSize()]); + smallNulls[i].setData(smallNullMemory[i].get()); + smallNulls[i].initToNull(); + } + + local_primRG.initRow(&largeNull, true); + largeNullMemory.reset(new uint8_t[largeNull.getSize()]); + largeNull.setData(largeNullMemory.get()); + largeNull.initToNull(); + } +} + +struct ByteStreamProcessor +{ + ByteStreamProcessor(TupleBPS* tbps, vector>& bsv, + const uint32_t begin, const uint32_t end, vector<_CPInfo>& cpv, RowGroupDL* dlp, + const uint32_t threadID) + : tbps(tbps), bsv(bsv), begin(begin), end(end), cpv(cpv), dlp(dlp), threadID(threadID) + { + } + + TupleBPS* tbps; + vector>& bsv; + uint32_t begin; + uint32_t end; + vector<_CPInfo>& cpv; + RowGroupDL* dlp; + uint32_t threadID; + + void operator()() + { + utils::setThreadName("ByteStreamProcessor"); + tbps->processByteStreamVector(bsv, begin, end, cpv, dlp, threadID); + } +}; + //------------------------------------------------------------------------------ // Initialize configurable parameters //------------------------------------------------------------------------------ @@ -185,9 +285,15 @@ void TupleBPS::initializeConfigParms() fRequestSize = 1; if ((fSessionId & 0x80000000) == 0) + { fMaxNumThreads = fRm->getJlNumScanReceiveThreads(); + fMaxNumProcessorThreads = fMaxNumThreads; + } else + { fMaxNumThreads = 1; + fMaxNumProcessorThreads = 1; + } // Reserve the max number of thread space. A bit of an optimization. 
fProducerThreads.clear(); @@ -870,12 +976,16 @@ void TupleBPS::startAggregationThread() // for (uint32_t i = 0; i < fMaxNumThreads; i++) // fProducerThreads.push_back(jobstepThreadPool.invoke(TupleBPSAggregators(this, i))); -// This block of code starts one thread at a time - if (fNumThreads >= fMaxNumThreads) - return; - fNumThreads++; - fProducerThreads.push_back(jobstepThreadPool.invoke(TupleBPSAggregators(this, fNumThreads - 1))); + fProducerThreads.push_back(jobstepThreadPool.invoke(TupleBPSAggregators(this))); +} + +void TupleBPS::startProcessingThread(TupleBPS* tbps, vector>& bsv, + const uint32_t start, const uint32_t end, vector<_CPInfo>& cpv, + RowGroupDL* dlp, const uint32_t threadID) +{ + fProcessorThreads.push_back( + jobstepThreadPool.invoke(ByteStreamProcessor(tbps, bsv, start, end, cpv, dlp, threadID))); } void TupleBPS::serializeJoiner() @@ -1176,11 +1286,11 @@ void TupleBPS::run() deliveryIt = deliveryDL->getIterator(); } - fBPP->setThreadCount(fMaxNumThreads); + fBPP->setThreadCount(fMaxNumProcessorThreads); if (doJoin) for (i = 0; i < smallSideCount; i++) - tjoiners[i]->setThreadCount(fMaxNumThreads); + tjoiners[i]->setThreadCount(fMaxNumProcessorThreads); if (fe1) fBPP->setFEGroup1(fe1, fe1Input); @@ -1868,11 +1978,8 @@ void TupleBPS::sendPrimitiveMessages() catch (...) { sendError(logging::ERR_TUPLE_BPS); - handleException(std::current_exception(), - logging::ERR_TUPLE_BPS, - logging::ERR_ALWAYS_CRITICAL, - "st: " + std::to_string(fStepId) + - " TupleBPS::sendPrimitiveMessages()"); + handleException(std::current_exception(), logging::ERR_TUPLE_BPS, logging::ERR_ALWAYS_CRITICAL, + "st: " + std::to_string(fStepId) + " TupleBPS::sendPrimitiveMessages()"); abort_nolock(); } @@ -1883,196 +1990,282 @@ abort: tplLock.unlock(); } -struct _CPInfo +void TupleBPS::processByteStreamVector(vector>& bsv, + const uint32_t begin, const uint32_t end, vector<_CPInfo>& cpv, + RowGroupDL* dlp, const uint32_t threadID) { - _CPInfo(int64_t MIN, int64_t MAX, uint64_t l, bool val) : min(MIN), max(MAX), LBID(l), valid(val) { }; - _CPInfo(int128_t BIGMIN, int128_t BIGMAX, uint64_t l, bool val) : bigMin(BIGMIN), bigMax(BIGMAX), LBID(l), valid(val) { }; - union - { - int128_t bigMin; - int64_t min; - }; - union - { - int128_t bigMax; - int64_t max; - }; - uint64_t LBID; - bool valid; -}; - -void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID) -{ - AnyDataListSPtr dl = fOutputJobStepAssociation.outAt(0); - RowGroupDL* dlp = (fDelivery ? 
deliveryDL.get() : dl->rowGroupDL()); - - uint32_t size = 0; - vector > bsv; - - RGData rgData; - vector rgDatav; - vector fromPrimProc; + rowgroup::RGData rgData; + vector rgDatav; + vector fromPrimProc; + auto data = getJoinLocalDataByIndex(threadID); bool validCPData; bool hasBinaryColumn; int128_t min; int128_t max; uint64_t lbid; - vector<_CPInfo> cpv; uint32_t cachedIO; uint32_t physIO; uint32_t touchedBlocks; - uint32_t cachedIO_Thread = 0; - uint32_t physIO_Thread = 0; - uint32_t touchedBlocks_Thread = 0; - int64_t ridsReturned_Thread = 0; - bool lastThread = false; - uint32_t i, j, k; - RowGroup local_primRG = primRowGroup; - RowGroup local_outputRG = outputRowGroup; - bool didEOF = false; - bool unused; - /* Join vars */ - vector > joinerOutput; // clean usage - Row largeSideRow, joinedBaseRow, largeNull, joinFERow; // LSR clean - scoped_array smallSideRows, smallNulls; - scoped_array joinedBaseRowData; - scoped_array joinFERowData; - shared_array largeMapping; - vector > smallMappings; - vector > fergMappings; - RGData joinedData; - scoped_array largeNullMemory; - scoped_array > smallNullMemory; - uint32_t matchCount; + for (uint32_t i = begin; i < end; ++i) + { + messageqcpp::ByteStream* bs = bsv[i].get(); + + // @bug 488. when PrimProc node is down. error out + // An error condition. We are not going to do anymore. + ISMPacketHeader* hdr = (ISMPacketHeader*) (bs->buf()); + +#ifdef DEBUG_MPM + cout << "BS length: " << bs->length() << endl; +#endif + if (bs->length() == 0 || hdr->Status > 0) + { + // PM errors mean this should abort right away instead of draining the PM backlog + // tplLock.lock(); + + if (bs->length() == 0) + { + errorMessage(IDBErrorInfo::instance()->errorMsg(ERR_PRIMPROC_DOWN)); + status(ERR_PRIMPROC_DOWN); + } + else + { + string errMsg; + + bs->advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader)); + *bs >> errMsg; + status(hdr->Status); + errorMessage(errMsg); + } + + // Sets `fDie` to true, so other threads can check `cancelled()`. + abort_nolock(); + return; + } + + bool unused; + fromPrimProc.clear(); + fBPP->getRowGroupData(*bs, &fromPrimProc, &validCPData, &lbid, &min, &max, &cachedIO, + &physIO, &touchedBlocks, &unused, threadID, &hasBinaryColumn, + fColType); + + // Another layer of messiness. Need to refactor this fcn. + while (!fromPrimProc.empty() && !cancelled()) + { + rgData = fromPrimProc.back(); + fromPrimProc.pop_back(); + + data->local_primRG.setData(&rgData); + // TODO need the pre-join count even on PM joins... later + data->ridsReturned_Thread += data->local_primRG.getRowCount(); + + // TupleHashJoinStep::joinOneRG() is a port of the main join loop here. Any + // changes made here should also be made there and vice versa. 
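+      // A UM-side pass is only needed when part of the join runs on the UM or the
+      // PM could not produce the final result; otherwise the rowgroup received
+      // from the PM is forwarded as-is in the else branch below.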
+ if (hasUMJoin || !fBPP->pmSendsFinalResult()) + { + data->joinedData = RGData(data->local_outputRG); + data->local_outputRG.setData(&data->joinedData); + data->local_outputRG.resetRowGroup(data->local_primRG.getBaseRid()); + data->local_outputRG.setDBRoot(data->local_primRG.getDBRoot()); + data->local_primRG.getRow(0, &data->largeSideRow); + + for (uint32_t k = 0; k < data->local_primRG.getRowCount() && !cancelled(); + k++, data->largeSideRow.nextRow()) + { + uint32_t matchCount = 0; + + for (uint32_t j = 0; j < smallSideCount; j++) + { + tjoiners[j]->match(data->largeSideRow, k, threadID, &(data->joinerOutput[j])); +#ifdef JLF_DEBUG + // Debugging code to print the matches + Row r; + joinerMatchesRGs[j].initRow(&r); + cout << data->joinerOutput[j].size() << " matches: \n"; + for (uint32_t z = 0; z < data->joinerOutput[j].size(); z++) + { + r.setPointer(data->joinerOutput[j][z]); + cout << " " << r.toString() << endl; + } +#endif + matchCount = data->joinerOutput[j].size(); + + if (tjoiners[j]->inUM()) + { + // Count the # of rows that pass the join filter + if (tjoiners[j]->hasFEFilter() && matchCount > 0) + { + vector newJoinerOutput; + applyMapping(data->fergMappings[smallSideCount], data->largeSideRow, + &data->joinFERow); + + for (uint32_t z = 0; z < data->joinerOutput[j].size(); z++) + { + data->smallSideRows[j].setPointer(data->joinerOutput[j][z]); + applyMapping(data->fergMappings[j], data->smallSideRows[j], + &data->joinFERow); + + if (!tjoiners[j]->evaluateFilter(data->joinFERow, threadID)) + matchCount--; + else + { + // The first match includes it in a SEMI join result and excludes it + // from an ANTI join result. If it's SEMI & SCALAR however, it needs + // to continue. + newJoinerOutput.push_back(data->joinerOutput[j][z]); + + if (tjoiners[j]->antiJoin() || + (tjoiners[j]->semiJoin() && !tjoiners[j]->scalar())) + break; + } + } + + // the filter eliminated all matches, need to join with the NULL row + if (matchCount == 0 && tjoiners[j]->largeOuterJoin()) + { + newJoinerOutput.push_back(Row::Pointer(data->smallNullMemory[j].get())); + matchCount = 1; + } + + data->joinerOutput[j].swap(newJoinerOutput); + } + + // XXXPAT: This has gone through enough revisions it would benefit + // from refactoring + + // If anti-join, reverse the result + if (tjoiners[j]->antiJoin()) + { + matchCount = (matchCount ? 0 : 1); + } + + if (matchCount == 0) + { + data->joinerOutput[j].clear(); + break; + } + else if (!tjoiners[j]->scalar() && + (tjoiners[j]->antiJoin() || tjoiners[j]->semiJoin())) + { + data->joinerOutput[j].clear(); + data->joinerOutput[j].push_back(Row::Pointer(data->smallNullMemory[j].get())); + matchCount = 1; + } + } + + if (matchCount == 0 && tjoiners[j]->innerJoin()) + break; + + // Scalar check + if (tjoiners[j]->scalar() && matchCount > 1) + { + errorMessage(IDBErrorInfo::instance()->errorMsg(ERR_MORE_THAN_1_ROW)); + status(ERR_MORE_THAN_1_ROW); + abort(); + } + + if (tjoiners[j]->smallOuterJoin()) + tjoiners[j]->markMatches(threadID, data->joinerOutput[j]); + } + + if (matchCount > 0) + { + applyMapping(data->largeMapping, data->largeSideRow, &data->joinedBaseRow); + data->joinedBaseRow.setRid(data->largeSideRow.getRelRid()); + generateJoinResultSet(data->joinerOutput, data->joinedBaseRow, data->smallMappings, 0, + data->local_outputRG, data->joinedData, &rgDatav, + data->smallSideRows, data->postJoinRow); + + // Bug 3510: Don't let the join results buffer get out of control. Need + // to refactor this. All post-join processing needs to go here AND below + // for now. 
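+            // rgDatav.size() * getMaxDataSize() approximates the bytes buffered so
+            // far; ~50 MB is the flush threshold, so a join with a large fan-out
+            // cannot accumulate its whole result in memory.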
+ if (rgDatav.size() * data->local_outputRG.getMaxDataSize() > 50000000) + { + RowGroup out(data->local_outputRG); + + if (fe2 && !runFEonPM) + { + processFE2(out, data->local_fe2Output, data->postJoinRow, data->local_fe2OutRow, + &rgDatav, &data->local_fe2); + rgDataVecToDl(rgDatav, data->local_fe2Output, dlp); + } + else + rgDataVecToDl(rgDatav, out, dlp); + } + } + } // end of the for-loop in the join code + + if (data->local_outputRG.getRowCount() > 0) + { + rgDatav.push_back(data->joinedData); + } + } + else + { + rgDatav.push_back(rgData); + } + + // Execute UM F & E group 2 on rgDatav + if (fe2 && !runFEonPM && rgDatav.size() > 0 && !cancelled()) + { + processFE2(data->local_outputRG, data->local_fe2Output, data->postJoinRow, + data->local_fe2OutRow, &rgDatav, &data->local_fe2); + rgDataVecToDl(rgDatav, data->local_fe2Output, dlp); + } + + data->cachedIO_Thread += cachedIO; + data->physIO_Thread += physIO; + data->touchedBlocks_Thread += touchedBlocks; + + if (fOid >= 3000 && ffirstStepType == SCAN && bop == BOP_AND) + { + if (fColType.colWidth <= 8) + { + cpv.push_back(_CPInfo((int64_t) min, (int64_t) max, lbid, validCPData)); + } + else if (fColType.colWidth == 16) + { + cpv.push_back(_CPInfo(min, max, lbid, validCPData)); + } + } + } // end of the per-rowgroup processing loop + + // insert the resulting rowgroup data from a single bytestream into dlp + if (rgDatav.size() > 0) + { + if (fe2 && runFEonPM) + rgDataVecToDl(rgDatav, data->local_fe2Output, dlp); + else + rgDataVecToDl(rgDatav, data->local_outputRG, dlp); + } + } +} + +void TupleBPS::receiveMultiPrimitiveMessages() +{ + AnyDataListSPtr dl = fOutputJobStepAssociation.outAt(0); + RowGroupDL* dlp = (fDelivery ? deliveryDL.get() : dl->rowGroupDL()); - /* Thread-scoped F&E 2 var */ - Row postJoinRow; // postJoinRow is also used for joins - RowGroup local_fe2Output; - RGData local_fe2Data; - Row local_fe2OutRow; - funcexp::FuncExpWrapper local_fe2; StepTeleStats sts; sts.query_uuid = fQueryUuid; sts.step_uuid = fStepUuid; + + uint32_t size = 0; + + // Based on the type of `tupleBPS` operation - initialize the `JoinLocalDataPool`. + // We initialize the max possible number of threads, because the right number of parallel threads will be + // chosen right before the `vector of bytestream` processing based on `ThreadPool` workload. 
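+  // Only join and UM F&E-2 work is worth splitting across threads; for a plain
+  // pass-through scan a single pool slot suffices, and numOfThreads is likewise
+  // kept at 1 for it below.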
+ if (doJoin || fe2) + initializeJoinLocalDataPool(fMaxNumProcessorThreads); + else + initializeJoinLocalDataPool(1); + + vector> bsv; boost::unique_lock tplLock(tplMutex, boost::defer_lock); try { - if (doJoin || fe2) - { - local_outputRG.initRow(&postJoinRow); - } - - if (fe2) - { - local_fe2Output = fe2Output; - local_fe2Output.initRow(&local_fe2OutRow); - local_fe2Data.reinit(fe2Output); - local_fe2Output.setData(&local_fe2Data); - // local_fe2OutRow = fe2OutRow; - local_fe2 = *fe2; - } - - if (doJoin) - { - joinerOutput.resize(smallSideCount); - smallSideRows.reset(new Row[smallSideCount]); - smallNulls.reset(new Row[smallSideCount]); - smallMappings.resize(smallSideCount); - fergMappings.resize(smallSideCount + 1); - smallNullMemory.reset(new shared_array[smallSideCount]); - local_primRG.initRow(&largeSideRow); - local_outputRG.initRow(&joinedBaseRow, true); - joinedBaseRowData.reset(new uint8_t[joinedBaseRow.getSize()]); - joinedBaseRow.setData(joinedBaseRowData.get()); - joinedBaseRow.initToNull(); - largeMapping = makeMapping(local_primRG, local_outputRG); - - bool hasJoinFE = false; - - for (i = 0; i < smallSideCount; i++) - { - joinerMatchesRGs[i].initRow(&smallSideRows[i]); - smallMappings[i] = makeMapping(joinerMatchesRGs[i], local_outputRG); - -// if (tjoiners[i]->semiJoin() || tjoiners[i]->antiJoin()) { - if (tjoiners[i]->hasFEFilter()) - { - fergMappings[i] = makeMapping(joinerMatchesRGs[i], joinFERG); - hasJoinFE = true; - } - -// } - } - - if (hasJoinFE) - { - joinFERG.initRow(&joinFERow, true); - joinFERowData.reset(new uint8_t[joinFERow.getSize()]); - memset(joinFERowData.get(), 0, joinFERow.getSize()); - joinFERow.setData(joinFERowData.get()); - fergMappings[smallSideCount] = makeMapping(local_primRG, joinFERG); - } - - for (i = 0; i < smallSideCount; i++) - { - joinerMatchesRGs[i].initRow(&smallNulls[i], true); - smallNullMemory[i].reset(new uint8_t[smallNulls[i].getSize()]); - smallNulls[i].setData(smallNullMemory[i].get()); - smallNulls[i].initToNull(); - } - - local_primRG.initRow(&largeNull, true); - largeNullMemory.reset(new uint8_t[largeNull.getSize()]); - largeNull.setData(largeNullMemory.get()); - largeNull.initToNull(); - -#if 0 - - if (threadID == 0) - { - /* Some rowgroup debugging stuff. 
*/ - uint8_t* tmp8; - tmp8 = local_primRG.getData(); - local_primRG.setData(NULL); - cout << "large-side RG: " << local_primRG.toString() << endl; - local_primRG.setData(tmp8); - - for (i = 0; i < smallSideCount; i++) - { - tmp8 = joinerMatchesRGs[i].getData(); - joinerMatchesRGs[i].setData(NULL); - cout << "small-side[" << i << "] RG: " << joinerMatchesRGs[i].toString() << endl; - } - - tmp8 = local_outputRG.getData(); - local_outputRG.setData(NULL); - cout << "output RG: " << local_outputRG.toString() << endl; - local_outputRG.setData(tmp8); - - cout << "large mapping:\n"; - - for (i = 0; i < local_primRG.getColumnCount(); i++) - cout << largeMapping[i] << " "; - - cout << endl; - - for (uint32_t z = 0; z < smallSideCount; z++) - { - cout << "small mapping[" << z << "] :\n"; - - for (i = 0; i < joinerMatchesRGs[z].getColumnCount(); i++) - cout << smallMappings[z][i] << " "; - - cout << endl; - } - } - -#endif - } - tplLock.lock(); while (1) @@ -2096,32 +2289,13 @@ void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID) if (traceOn() && fOid >= 3000 && dlTimes.FirstReadTime().tv_sec == 0) dlTimes.setFirstReadTime(); - if (fOid >= 3000 && threadID == 0 && sts.msg_type == StepTeleStats::ST_INVALID && size > 0) + if (fOid >= 3000 && sts.msg_type == StepTeleStats::ST_INVALID && size > 0) { sts.msg_type = StepTeleStats::ST_START; sts.total_units_of_work = totalMsgs; postStepStartTele(sts); } - /* This is a simple ramp-up of the TBPS msg processing threads. - One thread is created by run(), and add'l threads are created - as needed. Right now, "as needed" means that flow control - is on, which means that the UM is not keeping up by definition, - or size > 5. We found that using flow control alone was not aggressive - enough when the messages were small. The 'size' parameter checks - the number of msgs waiting in the DEC buffers. Since each - message can be processed independently of the others, they can all - be processed in different threads. In benchmarking we found that - there was no end-to-end performance difference between using 1 - and 20 msgs as the threshold. Erring on the side of aggressiveness, - we chose '5'. - '5' still preserves the original goal of not starting MAX threads - for small queries or when the UM can keep up with the PMs with - fewer threads. Tweak as necessary. */ - - if ((size > 5 || flowControlOn) && fNumThreads < fMaxNumThreads) - startAggregationThread(); - for (uint32_t z = 0; z < size; z++) { if (bsv[z]->length() > 0 && fBPP->countThisMsg(*(bsv[z]))) @@ -2134,7 +2308,8 @@ void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID) (fMaxOutstandingRequests << LOGICAL_EXTENT_CONVERTER))) { condvarWakeupProducer.notify_one(); - THROTTLEDEBUG << "receiveMultiPrimitiveMessages wakes up sending side .. " << " msgsSent: " << msgsSent << " msgsRecvd = " << msgsRecvd << endl; + THROTTLEDEBUG << "receiveMultiPrimitiveMessages wakes up sending side .. " + << " msgsSent: " << msgsSent << " msgsRecvd = " << msgsRecvd << endl; } /* If there's an error and the joblist is being aborted, don't @@ -2157,256 +2332,91 @@ void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID) tplLock.unlock(); - for (i = 0; i < size && !cancelled(); i++) + vector> cpInfos; + // Calculate the work sizes. 
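+      // Example: size == 10 byte streams split over numOfThreads == 3 gives
+      // workSize == 3 with a remainder of 1, so workSizes becomes {4, 3, 3} and
+      // the threads process the slices [0,4), [4,7) and [7,10).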
+ const uint32_t issuedThreads = jobstepThreadPool.getIssuedThreads(); + const uint32_t maxNumOfThreads = jobstepThreadPool.getMaxThreads(); + uint32_t numOfThreads = 1; + + if (issuedThreads < maxNumOfThreads && (doJoin || fe2)) { - ByteStream* bs = bsv[i].get(); + // We cannot parallel more than allocated data we have. + numOfThreads = std::min(maxNumOfThreads - issuedThreads, fNumProcessorThreads); + } - // @bug 488. when PrimProc node is down. error out - //An error condition. We are not going to do anymore. - ISMPacketHeader* hdr = (ISMPacketHeader*)(bs->buf()); + uint32_t workSize = size / numOfThreads; + if (!workSize) + { + workSize = size; + numOfThreads = 1; + } - if (bs->length() == 0 || hdr->Status > 0) - { - /* PM errors mean this should abort right away instead of draining the PM backlog */ - tplLock.lock(); + vector workSizes; + // Calculate the work size for each thread. + workSizes.reserve(numOfThreads); + cpInfos.reserve(numOfThreads); - if (bs->length() == 0) - { - errorMessage(IDBErrorInfo::instance()->errorMsg(ERR_PRIMPROC_DOWN)); - status(ERR_PRIMPROC_DOWN); - } - else - { - string errMsg; + for (uint32_t i = 0; i < numOfThreads; ++i) + { + workSizes.push_back(workSize); + cpInfos.push_back(vector<_CPInfo>()); + } - bs->advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader)); - *bs >> errMsg; - status(hdr->Status); - errorMessage(errMsg); - } + const uint32_t moreWork = size % numOfThreads; + for (uint32_t i = 0; i < moreWork; ++i) + { + ++workSizes[i]; + } - abort_nolock(); - goto out; - } - - fromPrimProc.clear(); - fBPP->getRowGroupData(*bs, &fromPrimProc, &validCPData, &lbid, &min, &max, - &cachedIO, &physIO, &touchedBlocks, &unused, threadID, &hasBinaryColumn, fColType); - - /* Another layer of messiness. Need to refactor this fcn. */ - while (!fromPrimProc.empty() && !cancelled()) - { - rgData = fromPrimProc.back(); - fromPrimProc.pop_back(); - - local_primRG.setData(&rgData); - ridsReturned_Thread += local_primRG.getRowCount(); // TODO need the pre-join count even on PM joins... later - - /* TupleHashJoinStep::joinOneRG() is a port of the main join loop here. Any - * changes made here should also be made there and vice versa. 
*/ - if (hasUMJoin || !fBPP->pmSendsFinalResult()) - { - joinedData = RGData(local_outputRG); - local_outputRG.setData(&joinedData); - local_outputRG.resetRowGroup(local_primRG.getBaseRid()); - local_outputRG.setDBRoot(local_primRG.getDBRoot()); - local_primRG.getRow(0, &largeSideRow); - - for (k = 0; k < local_primRG.getRowCount() && !cancelled(); k++, largeSideRow.nextRow()) - { - matchCount = 0; - - for (j = 0; j < smallSideCount; j++) - { - tjoiners[j]->match(largeSideRow, k, threadID, &joinerOutput[j]); -#ifdef JLF_DEBUG - // Debugging code to print the matches - Row r; - joinerMatchesRGs[j].initRow(&r); - cout << joinerOutput[j].size() << " matches: \n"; - for (uint32_t z = 0; z < joinerOutput[j].size(); z++) { - r.setPointer(joinerOutput[j][z]); - cout << " " << r.toString() << endl; - } + uint32_t start = 0; +#ifdef DEBUG_BSV + cout << "Number of threads: " << workSizes.size() << endl; #endif - matchCount = joinerOutput[j].size(); + for (uint32_t i = 0, e = workSizes.size(); i < e; ++i) + { - if (tjoiners[j]->inUM()) - { - /* Count the # of rows that pass the join filter */ - if (tjoiners[j]->hasFEFilter() && matchCount > 0) - { - vector newJoinerOutput; - applyMapping(fergMappings[smallSideCount], largeSideRow, &joinFERow); +#ifdef DEBUG_BSV + cout << "Thread # " << i << " work size " << workSizes[i] << endl; +#endif + uint32_t end = start + workSizes[i]; + startProcessingThread(this, bsv, start, end, cpInfos[i], dlp, i); + start = end; + } - for (uint32_t z = 0; z < joinerOutput[j].size(); z++) - { - smallSideRows[j].setPointer(joinerOutput[j][z]); - applyMapping(fergMappings[j], smallSideRows[j], &joinFERow); + // Join threads. + for (uint32_t i = 0, e = fProcessorThreads.size(); i < e; ++i) + jobstepThreadPool.join(fProcessorThreads[i]); - if (!tjoiners[j]->evaluateFilter(joinFERow, threadID)) - matchCount--; - else - { - /* The first match includes it in a SEMI join result and excludes it from an ANTI join - * result. If it's SEMI & SCALAR however, it needs to continue. - */ - newJoinerOutput.push_back(joinerOutput[j][z]); - - if (tjoiners[j]->antiJoin() || (tjoiners[j]->semiJoin() && !tjoiners[j]->scalar())) - break; - } - } - - // the filter eliminated all matches, need to join with the NULL row - if (matchCount == 0 && tjoiners[j]->largeOuterJoin()) - { - newJoinerOutput.push_back(Row::Pointer(smallNullMemory[j].get())); - matchCount = 1; - } - - joinerOutput[j].swap(newJoinerOutput); - } - - // XXXPAT: This has gone through enough revisions it would benefit - // from refactoring - - /* If anti-join, reverse the result */ - if (tjoiners[j]->antiJoin()) - { - matchCount = (matchCount ? 
0 : 1); - } - - if (matchCount == 0) - { - joinerOutput[j].clear(); - break; - } - else if (!tjoiners[j]->scalar() && - (tjoiners[j]->antiJoin() || tjoiners[j]->semiJoin())) - { - joinerOutput[j].clear(); - joinerOutput[j].push_back(Row::Pointer(smallNullMemory[j].get())); - matchCount = 1; - } - } - - if (matchCount == 0 && tjoiners[j]->innerJoin()) - break; - - /* Scalar check */ - if (tjoiners[j]->scalar() && matchCount > 1) - { - errorMessage(IDBErrorInfo::instance()->errorMsg(ERR_MORE_THAN_1_ROW)); - status(ERR_MORE_THAN_1_ROW); - abort(); - } - - if (tjoiners[j]->smallOuterJoin()) - tjoiners[j]->markMatches(threadID, joinerOutput[j]); - - } - - if (matchCount > 0) - { - applyMapping(largeMapping, largeSideRow, &joinedBaseRow); - joinedBaseRow.setRid(largeSideRow.getRelRid()); - generateJoinResultSet(joinerOutput, joinedBaseRow, smallMappings, - 0, local_outputRG, joinedData, &rgDatav, smallSideRows, postJoinRow); - - /* Bug 3510: Don't let the join results buffer get out of control. Need - to refactor this. All post-join processing needs to go here AND below - for now. */ - if (rgDatav.size() * local_outputRG.getMaxDataSize() > 50000000) - { - RowGroup out(local_outputRG); - - if (fe2 && !runFEonPM) - { - processFE2(out, local_fe2Output, postJoinRow, - local_fe2OutRow, &rgDatav, &local_fe2); - rgDataVecToDl(rgDatav, local_fe2Output, dlp); - } - else - rgDataVecToDl(rgDatav, out, dlp); - } - } - } // end of the for-loop in the join code - - if (local_outputRG.getRowCount() > 0) - { - rgDatav.push_back(joinedData); - } - } - else - { - rgDatav.push_back(rgData); - } - - /* Execute UM F & E group 2 on rgDatav */ - if (fe2 && !runFEonPM && rgDatav.size() > 0 && !cancelled()) - { - processFE2(local_outputRG, local_fe2Output, postJoinRow, local_fe2OutRow, &rgDatav, &local_fe2); - rgDataVecToDl(rgDatav, local_fe2Output, dlp); - } - - cachedIO_Thread += cachedIO; - physIO_Thread += physIO; - touchedBlocks_Thread += touchedBlocks; - - if (fOid >= 3000 && ffirstStepType == SCAN && bop == BOP_AND) - { - if (fColType.colWidth <= 8) - { - cpv.push_back(_CPInfo((int64_t) min, (int64_t) max, lbid, validCPData)); - } - else if (fColType.colWidth == 16) - { - cpv.push_back(_CPInfo(min, max, lbid, validCPData)); - } - } - } // end of the per-rowgroup processing loop - - // insert the resulting rowgroup data from a single bytestream into dlp - if (rgDatav.size() > 0) - { - if (fe2 && runFEonPM) - rgDataVecToDl(rgDatav, local_fe2Output, dlp); - else - rgDataVecToDl(rgDatav, local_outputRG, dlp); - } - } // end of the per-bytestream loop + // Clear all. 
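+      // The thread handles and byte streams of this batch are dead once the
+      // workers are joined; dropping them releases the message memory before we
+      // block on the next queue read.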
+ fProcessorThreads.clear(); + bsv.clear(); // @bug 4562 if (traceOn() && fOid >= 3000) dlTimes.setFirstInsertTime(); - //update casual partition - size = cpv.size(); - - if (size > 0 && !cancelled()) + // update casual partition + for (const auto& cpv : cpInfos) { - cpMutex.lock(); - - for (i = 0; i < size; i++) + size = cpv.size(); + if (size > 0 && !cancelled()) { - if (fColType.colWidth > 8) + for (uint32_t i = 0; i < size; i++) { - lbidList->UpdateMinMax(cpv[i].bigMin, cpv[i].bigMax, cpv[i].LBID, fColType, - cpv[i].valid); - } - else - { - lbidList->UpdateMinMax(cpv[i].min, cpv[i].max, cpv[i].LBID, fColType, - cpv[i].valid); + if (fColType.colWidth > 8) + { + lbidList->UpdateMinMax(cpv[i].bigMin, cpv[i].bigMax, cpv[i].LBID, fColType, + cpv[i].valid); + } + else + { + lbidList->UpdateMinMax(cpv[i].min, cpv[i].max, cpv[i].LBID, fColType, + cpv[i].valid); + } } } - - cpMutex.unlock(); } - cpv.clear(); - tplLock.lock(); if (fOid >= 3000) @@ -2427,96 +2437,98 @@ void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID) } // done reading - }//try + } // try catch (...) { - handleException(std::current_exception(), - logging::ERR_TUPLE_BPS, - logging::ERR_ALWAYS_CRITICAL, - "st: " + std::to_string(fStepId) + - " TupleBPS::receiveMultiPrimitiveMessages()"); + handleException(std::current_exception(), logging::ERR_TUPLE_BPS, logging::ERR_ALWAYS_CRITICAL, + "st: " + std::to_string(fStepId) + " TupleBPS::receiveMultiPrimitiveMessages()"); abort_nolock(); } out: - if (++recvExited == fNumThreads) - { + // We have on thread here and do not need to notify any waiting producer threads, because we are done of + // consuming messages from queue. + tplLock.unlock(); + // Take just the first one. + auto data = getJoinLocalDataByIndex(0); + { if (doJoin && smallOuterJoiner != -1 && !cancelled()) { - tplLock.unlock(); - /* If this was a left outer join, this needs to put the unmatched - rows from the joiner into the output - XXXPAT: This might be a problem if later steps depend - on sensible rids and/or sensible ordering */ + vector rgDatav; + + // If this was a left outer join, this needs to put the unmatched + // rows from the joiner into the output + // XXXPAT: This might be a problem if later steps depend + // on sensible rids and/or sensible ordering vector unmatched; #ifdef JLF_DEBUG cout << "finishing small-outer join output\n"; #endif - i = smallOuterJoiner; + uint32_t i = smallOuterJoiner; tjoiners[i]->getUnmarkedRows(&unmatched); - joinedData = RGData(local_outputRG); - local_outputRG.setData(&joinedData); - local_outputRG.resetRowGroup(-1); - local_outputRG.getRow(0, &joinedBaseRow); + data->joinedData = RGData(data->local_outputRG); + data->local_outputRG.setData(&data->joinedData); + data->local_outputRG.resetRowGroup(-1); + data->local_outputRG.getRow(0, &data->joinedBaseRow); - for (j = 0; j < unmatched.size(); j++) + for (uint32_t j = 0; j < unmatched.size(); j++) { - smallSideRows[i].setPointer(unmatched[j]); + data->smallSideRows[i].setPointer(unmatched[j]); - for (k = 0; k < smallSideCount; k++) + for (uint32_t k = 0; k < smallSideCount; k++) { if (i == k) - applyMapping(smallMappings[i], smallSideRows[i], &joinedBaseRow); + applyMapping(data->smallMappings[i], data->smallSideRows[i], &data->joinedBaseRow); else - applyMapping(smallMappings[k], smallNulls[k], &joinedBaseRow); + applyMapping(data->smallMappings[k], data->smallNulls[k], &data->joinedBaseRow); } - applyMapping(largeMapping, largeNull, &joinedBaseRow); - joinedBaseRow.setRid(0); - joinedBaseRow.nextRow(); - 
local_outputRG.incRowCount(); + applyMapping(data->largeMapping, data->largeNull, &data->joinedBaseRow); + data->joinedBaseRow.setRid(0); + data->joinedBaseRow.nextRow(); + data->local_outputRG.incRowCount(); - if (local_outputRG.getRowCount() == 8192) + if (data->local_outputRG.getRowCount() == 8192) { if (fe2) { - rgDatav.push_back(joinedData); - processFE2(local_outputRG, local_fe2Output, postJoinRow, local_fe2OutRow, &rgDatav, &local_fe2); + rgDatav.push_back(data->joinedData); + processFE2(data->local_outputRG, data->local_fe2Output, data->postJoinRow, + data->local_fe2OutRow, &rgDatav, &data->local_fe2); if (rgDatav.size() > 0) - rgDataToDl(rgDatav[0], local_fe2Output, dlp); + rgDataToDl(rgDatav[0], data->local_fe2Output, dlp); rgDatav.clear(); } else - rgDataToDl(joinedData, local_outputRG, dlp); + rgDataToDl(data->joinedData, data->local_outputRG, dlp); - joinedData = RGData(local_outputRG); - local_outputRG.setData(&joinedData); - local_outputRG.resetRowGroup(-1); - local_outputRG.getRow(0, &joinedBaseRow); + data->joinedData = RGData(data->local_outputRG); + data->local_outputRG.setData(&data->joinedData); + data->local_outputRG.resetRowGroup(-1); + data->local_outputRG.getRow(0, &data->joinedBaseRow); } } - if (local_outputRG.getRowCount() > 0) + if (data->local_outputRG.getRowCount() > 0) { if (fe2) { - rgDatav.push_back(joinedData); - processFE2(local_outputRG, local_fe2Output, postJoinRow, local_fe2OutRow, &rgDatav, &local_fe2); + rgDatav.push_back(data->joinedData); + processFE2(data->local_outputRG, data->local_fe2Output, data->postJoinRow, + data->local_fe2OutRow, &rgDatav, &data->local_fe2); if (rgDatav.size() > 0) - rgDataToDl(rgDatav[0], local_fe2Output, dlp); + rgDataToDl(rgDatav[0], data->local_fe2Output, dlp); rgDatav.clear(); } else - rgDataToDl(joinedData, local_outputRG, dlp); + rgDataToDl(data->joinedData, data->local_outputRG, dlp); } - - tplLock.lock(); } if (traceOn() && fOid >= 3000) @@ -2560,32 +2572,29 @@ out: fMsgBytesOut = stats.dataSent(); fDec->removeQueue(uniqueID); tjoiners.clear(); - - lastThread = true; } //@Bug 1099 - ridsReturned += ridsReturned_Thread; - fPhysicalIO += physIO_Thread; - fCacheIO += cachedIO_Thread; - fBlockTouched += touchedBlocks_Thread; - tplLock.unlock(); + ridsReturned += data->ridsReturned_Thread; + fPhysicalIO += data->physIO_Thread; + fCacheIO += data->cachedIO_Thread; + fBlockTouched += data->touchedBlocks_Thread; - if (fTableOid >= 3000 && lastThread) + if (fTableOid >= 3000) { struct timeval tvbuf; gettimeofday(&tvbuf, 0); - FIFO >* pFifo = 0; - uint64_t totalBlockedReadCount = 0; + FIFO>* pFifo = 0; + uint64_t totalBlockedReadCount = 0; uint64_t totalBlockedWriteCount = 0; //...Sum up the blocked FIFO reads for all input associations - size_t inDlCnt = fInputJobStepAssociation.outSize(); + size_t inDlCnt = fInputJobStepAssociation.outSize(); for (size_t iDataList = 0; iDataList < inDlCnt; iDataList++) { - pFifo = dynamic_cast > *>( - fInputJobStepAssociation.outAt(iDataList)->rowGroupDL()); + pFifo = dynamic_cast>*>( + fInputJobStepAssociation.outAt(iDataList)->rowGroupDL()); if (pFifo) { @@ -2598,7 +2607,7 @@ out: for (size_t iDataList = 0; iDataList < outDlCnt; iDataList++) { - pFifo = dynamic_cast > *>(dlp); + pFifo = dynamic_cast>*>(dlp); if (pFifo) { @@ -2606,8 +2615,11 @@ out: } } + // Notify consumers as early as possible. 
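+      // endOfInput() used to be the very last statement of this function; raising
+      // it here lets downstream steps start draining the delivery datalist while
+      // the trace and syslog summaries below are still being written.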
+ dlp->endOfInput(); + //...Roundoff msg byte counts to nearest KB for display - uint64_t msgBytesInKB = fMsgBytesIn >> 10; + uint64_t msgBytesInKB = fMsgBytesIn >> 10; uint64_t msgBytesOutKB = fMsgBytesOut >> 10; if (fMsgBytesIn & 512) @@ -2620,44 +2632,40 @@ out: { // @bug 828 ostringstream logStr; - logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << - JSTimeStamp::format(tvbuf) << "; PhyI/O-" << fPhysicalIO << "; CacheI/O-" << - fCacheIO << "; MsgsSent-" << msgsSent << "; MsgsRvcd-" << msgsRecvd << - "; BlocksTouched-" << fBlockTouched << - "; BlockedFifoIn/Out-" << totalBlockedReadCount << - "/" << totalBlockedWriteCount << - "; output size-" << ridsReturned << endl << - "\tPartitionBlocksEliminated-" << fNumBlksSkipped << - "; MsgBytesIn-" << msgBytesInKB << "KB" << - "; MsgBytesOut-" << msgBytesOutKB << "KB" << - "; TotalMsgs-" << totalMsgs << endl << - "\t1st read " << dlTimes.FirstReadTimeString() << - "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-" << - JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << - "s\n\tUUID " << uuids::to_string(fStepUuid) << - "\n\tQuery UUID " << uuids::to_string(queryUuid()) << - "\n\tJob completion status " << status() << endl; + logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " + << JSTimeStamp::format(tvbuf) << "; PhyI/O-" << fPhysicalIO << "; CacheI/O-" << fCacheIO + << "; MsgsSent-" << msgsSent << "; MsgsRvcd-" << msgsRecvd << "; BlocksTouched-" + << fBlockTouched << "; BlockedFifoIn/Out-" << totalBlockedReadCount << "/" + << totalBlockedWriteCount << "; output size-" << ridsReturned << endl + << "\tPartitionBlocksEliminated-" << fNumBlksSkipped << "; MsgBytesIn-" << msgBytesInKB + << "KB" + << "; MsgBytesOut-" << msgBytesOutKB << "KB" + << "; TotalMsgs-" << totalMsgs << endl + << "\t1st read " << dlTimes.FirstReadTimeString() << "; EOI " + << dlTimes.EndOfInputTimeString() << "; runtime-" + << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) + << "s\n\tUUID " << uuids::to_string(fStepUuid) << "\n\tQuery UUID " + << uuids::to_string(queryUuid()) << "\n\tJob completion status " << status() << endl; logEnd(logStr.str().c_str()); - syslogReadBlockCounts(16, // exemgr sybsystem + syslogReadBlockCounts(16, // exemgr sybsystem fPhysicalIO, // # blocks read from disk fCacheIO, // # blocks read from cache fNumBlksSkipped); // # casual partition block hits - syslogProcessingTimes(16, // exemgr subsystem + syslogProcessingTimes(16, // exemgr subsystem dlTimes.FirstReadTime(), // first datalist read dlTimes.LastReadTime(), // last datalist read dlTimes.FirstInsertTime(), // first datalist write dlTimes.EndOfInputTime()); // last (endOfInput) datalist write - syslogEndStep(16, // exemgr subsystem - totalBlockedReadCount, // blocked datalist input - totalBlockedWriteCount, // blocked datalist output - fMsgBytesIn, // incoming msg byte count - fMsgBytesOut); // outgoing msg byte count + syslogEndStep(16, // exemgr subsystem + totalBlockedReadCount, // blocked datalist input + totalBlockedWriteCount, // blocked datalist output + fMsgBytesIn, // incoming msg byte count + fMsgBytesOut); // outgoing msg byte count fExtendedInfo += toString() + logStr.str(); formatMiniStats(); } - if (lastThread && fOid >= 3000) { sts.msg_type = StepTeleStats::ST_SUMMARY; sts.phy_io = fPhysicalIO; @@ -2672,15 +2680,14 @@ out: if (ffirstStepType == SCAN && bop == BOP_AND && !cancelled()) { - cpMutex.lock(); lbidList->UpdateAllPartitionInfo(fColType); - cpMutex.unlock(); } } - - 
// Bug 3136, let mini stats to be formatted if traceOn. - if (lastThread && !didEOF) + else + { + // Notify consumers. dlp->endOfInput(); + } } const string TupleBPS::toString() const @@ -3412,5 +3419,5 @@ bool TupleBPS::compareSingleValue(uint8_t COP, int64_t val1, int64_t va template bool TupleBPS::compareSingleValue(uint8_t COP, int128_t val1, int128_t val2) const; -} //namespace +} // namespace // vim:ts=4 sw=4: diff --git a/utils/joiner/tuplejoiner.cpp b/utils/joiner/tuplejoiner.cpp index 880d4662c..acb6f06f6 100644 --- a/utils/joiner/tuplejoiner.cpp +++ b/utils/joiner/tuplejoiner.cpp @@ -908,8 +908,7 @@ void TupleJoiner::setInUM(vector &rgs) } } -void TupleJoiner::setPMJoinResults(boost::shared_array > jr, - uint32_t threadID) +void TupleJoiner::setPMJoinResults(boost::shared_array> jr, uint32_t threadID) { pmJoinResults[threadID] = jr; } @@ -950,7 +949,7 @@ boost::shared_array > TupleJoiner::getPMJoinArrays(uint32_ void TupleJoiner::setThreadCount(uint32_t cnt) { threadCount = cnt; - pmJoinResults.reset(new boost::shared_array >[cnt]); + pmJoinResults.reset(new boost::shared_array>[cnt]); smallRow.reset(new Row[cnt]); for (uint32_t i = 0; i < cnt; i++) diff --git a/utils/threadpool/threadpool.h b/utils/threadpool/threadpool.h index a15adc09f..5d2c9b9b1 100644 --- a/utils/threadpool/threadpool.h +++ b/utils/threadpool/threadpool.h @@ -225,6 +225,10 @@ public: return fMaxThreads; } + /** @brief get the issued number of threads + */ + inline size_t getIssuedThreads() { return fIssued; } + /** @brief queue size accessor * */ @@ -292,6 +296,7 @@ public: fDebug = d; } + friend class ThreadPoolMonitor; protected:
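
For reference, the per-batch thread sizing that receiveMultiPrimitiveMessages() now performs can be modelled in isolation. The following is a minimal standalone sketch, not ColumnStore code: the pool is reduced to two counters standing in for the ThreadPool::getIssuedThreads()/getMaxThreads() accessors added above, and fNumProcessorThreads becomes a plain parameter. The real code additionally requires a join or UM F&E-2 stage before it parallelizes at all.

    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    // Stand-ins for ThreadPool::getIssuedThreads() and getMaxThreads().
    struct PoolStats
    {
        uint32_t issued;
        uint32_t maxThreads;
    };

    // Mirrors the split in TupleBPS::receiveMultiPrimitiveMessages(): pick a thread
    // count from the pool headroom, give every thread size/n byte streams and
    // spread the remainder over the first size%n threads.
    std::vector<uint32_t> splitBatch(uint32_t size, uint32_t numProcessorThreads, const PoolStats& pool)
    {
        uint32_t numOfThreads = 1;
        if (pool.issued < pool.maxThreads)
            numOfThreads = std::min(pool.maxThreads - pool.issued, numProcessorThreads);

        uint32_t workSize = size / numOfThreads;
        if (workSize == 0)  // fewer byte streams than threads: fall back to one worker
        {
            workSize = size;
            numOfThreads = 1;
        }

        std::vector<uint32_t> workSizes(numOfThreads, workSize);
        for (uint32_t i = 0; i < size % numOfThreads; ++i)
            ++workSizes[i];  // distribute the remainder one stream at a time

        return workSizes;
    }

    int main()
    {
        PoolStats pool{12, 16};  // 4 threads of headroom left in the job-step pool
        for (uint32_t w : splitBatch(10, 8, pool))  // 10 byte streams, up to 8 processor threads
            std::cout << w << ' ';  // prints "3 3 2 2"
        std::cout << '\n';
        return 0;
    }

A saturated pool never gets oversubscribed by a batch: with no headroom the batch is simply processed inline by the single aggregation thread, matching the numOfThreads = 1 default.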