
[MCOL-4849] Parallelize the processing of the bytestream vector.

This patch changes the logic of the `receiveMultiPrimitiveMessages`
function in the following way:

1. There is now only one aggregation thread, which reads data from the queue
(populated by messages from the BPPs).
2. The received `bytestream` vector can be processed in parallel, depending on
the type of `TupleBPS` operation (join, fe2, ...) and the actual thread pool
workload.

The motivation is to reduce the number of context switches.
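
The scheme, as a hedged standalone sketch (all names below are illustrative; the real code lives in `receiveMultiPrimitiveMessages` and `processByteStreamVector`): a single reader drains the message queue into a batch, then splits the batch into ranges and hands them to short-lived workers, so receivers no longer contend on the queue lock for every message.

#include <algorithm>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <numeric>
#include <queue>
#include <thread>
#include <vector>

using ByteStream = std::vector<uint8_t>;  // stand-in for messageqcpp::ByteStream

// Models processByteStreamVector(): handle the half-open slice [begin, end).
uint64_t processRange(const std::vector<ByteStream>& bsv, size_t begin, size_t end)
{
    uint64_t bytes = 0;
    for (size_t i = begin; i < end; ++i)
        bytes += bsv[i].size();
    return bytes;
}

int main()
{
    std::queue<ByteStream> q;
    std::mutex m;
    std::condition_variable cv;
    bool producerDone = false;

    // Stands in for the BPPs pushing result messages onto the queue.
    std::thread producer([&] {
        for (int i = 0; i < 1000; ++i)
        {
            {
                std::lock_guard<std::mutex> lk(m);
                q.emplace(64);  // a 64-byte dummy message
            }
            cv.notify_one();
        }
        {
            std::lock_guard<std::mutex> lk(m);
            producerDone = true;
        }
        cv.notify_one();
    });

    // The single aggregation thread: drain the queue into a batch, then fan
    // the batch out instead of waking several receiver threads per message.
    uint64_t totalBytes = 0;
    for (;;)
    {
        std::vector<ByteStream> batch;
        {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [&] { return !q.empty() || producerDone; });
            while (!q.empty())
            {
                batch.push_back(std::move(q.front()));
                q.pop();
            }
            if (batch.empty() && producerDone)
                break;
        }
        // In the real code the fan-out width depends on the operation type
        // (join, fe2, ...) and the current thread pool load.
        const size_t nWorkers = 4;
        const size_t step = (batch.size() + nWorkers - 1) / nWorkers;
        std::vector<std::thread> workers;
        std::vector<uint64_t> partial(nWorkers, 0);
        for (size_t t = 0; t < nWorkers && t * step < batch.size(); ++t)
        {
            const size_t b = t * step;
            const size_t e = std::min(batch.size(), b + step);
            workers.emplace_back([&, t, b, e] { partial[t] = processRange(batch, b, e); });
        }
        for (auto& w : workers)
            w.join();
        totalBytes += std::accumulate(partial.begin(), partial.end(), uint64_t{0});
    }
    producer.join();
    std::cout << "processed " << totalBytes << " bytes\n";
    return 0;
}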
Author: Denis Khalikov
Date: 2021-10-13 15:31:05 +03:00
Parent: 650d45fcc1
Commit: b382f681a1
4 changed files with 786 additions and 665 deletions


@@ -1075,6 +1075,24 @@ public:
virtual void setFE23Output(const rowgroup::RowGroup& rg) = 0;
};
struct _CPInfo
{
_CPInfo(int64_t MIN, int64_t MAX, uint64_t l, bool val) : min(MIN), max(MAX), LBID(l), valid(val){};
_CPInfo(int128_t BIGMIN, int128_t BIGMAX, uint64_t l, bool val)
: bigMin(BIGMIN), bigMax(BIGMAX), LBID(l), valid(val){};
union
{
int128_t bigMin;
int64_t min;
};
union
{
int128_t bigMax;
int64_t max;
};
uint64_t LBID;
bool valid;
};
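
As a usage note (a hedged sketch, not code from the patch): the unions let 64-bit and 128-bit (wide DECIMAL) column types share one casual-partitioning record, and which member is meaningful is implied by which constructor ran, so callers must track the column width out of band.

// Assumes the _CPInfo definition above; in ColumnStore builds int128_t is
// the compiler's __int128. Which union member is valid depends on the ctor.
#include <cstdint>

int main()
{
    // 64-bit column: min/max are the meaningful union members.
    _CPInfo narrow(int64_t{-5}, int64_t{42}, uint64_t{1024}, true);
    // 128-bit column (e.g. wide DECIMAL): bigMin/bigMax are meaningful.
    _CPInfo wide(int128_t{-5}, int128_t{42}, uint64_t{2048}, true);
    return (narrow.valid && wide.valid) ? 0 : 1;
}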
/** @brief class TupleBPS
*
@@ -1113,7 +1131,12 @@ public:
*
* The main loop for the receive-side thread. Don't call it directly.
*/
void receiveMultiPrimitiveMessages();
// Processes the vector of `bytestream`s from index `begin` up to, but not including, index `end`.
void processByteStreamVector(vector<boost::shared_ptr<messageqcpp::ByteStream>>& bsv,
const uint32_t begin, const uint32_t end, vector<_CPInfo>& cpv,
RowGroupDL* dlp, const uint32_t threadID);
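
For illustration, a hedged helper (not from the patch) showing how callers of such a function typically compute non-overlapping [begin, end) slices that cover the whole vector exactly once:

#include <algorithm>
#include <cstdint>
#include <utility>

// Half-open slice [begin, end) for thread `threadID` of `numThreads` over
// `size` elements; the first `size % numThreads` threads take one extra item.
std::pair<uint32_t, uint32_t> sliceFor(uint32_t threadID, uint32_t numThreads, uint32_t size)
{
    const uint32_t base = size / numThreads;
    const uint32_t extra = size % numThreads;
    const uint32_t begin = threadID * base + std::min(threadID, extra);
    const uint32_t end = begin + base + (threadID < extra ? 1 : 0);
    return {begin, end};
}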
/** @brief Add a filter when the column is anything but a 4-byte float type.
*
@@ -1348,158 +1371,246 @@ protected:
void sendError(uint16_t status);
private:
void formatMiniStats();
void startPrimitiveThread();
void startAggregationThread();
// Processes the vector of `bytestream`s from index `begin` up to, but not including, index `end`.
void startProcessingThread(TupleBPS* tbps, vector<boost::shared_ptr<messageqcpp::ByteStream>>& bsv,
const uint32_t begin, const uint32_t end, vector<_CPInfo>& cpv, RowGroupDL* dlp,
const uint32_t threadID);
void initializeConfigParms();
uint64_t getFBO(uint64_t lbid);
void checkDupOutputColumns(const rowgroup::RowGroup& rg);
void dupOutputColumns(rowgroup::RowGroup&);
void dupOutputColumns(rowgroup::RGData&, rowgroup::RowGroup&);
void rgDataToDl(rowgroup::RGData&, rowgroup::RowGroup&, RowGroupDL*);
void rgDataVecToDl(std::vector<rowgroup::RGData>&, rowgroup::RowGroup&, RowGroupDL*);
DistributedEngineComm* fDec;
boost::shared_ptr<BatchPrimitiveProcessorJL> fBPP;
uint16_t fNumSteps;
int fColWidth;
uint32_t fStepCount;
bool fCPEvaluated; // @bug 2123
uint64_t fEstimatedRows; // @bug 2123
/// number of threads on the receive side
uint32_t fMaxNumThreads;
uint32_t fNumThreads;
PrimitiveStepType ffirstStepType;
bool isFilterFeeder;
std::vector<uint64_t> fProducerThreads; // thread pool handles
std::vector<uint64_t> fProcessorThreads;
messageqcpp::ByteStream fFilterString;
uint32_t fFilterCount;
execplan::CalpontSystemCatalog::ColType fColType;
execplan::CalpontSystemCatalog::OID fOid;
execplan::CalpontSystemCatalog::OID fTableOid;
uint64_t fLastTupleId;
BRM::LBIDRange_v lbidRanges;
std::vector<int32_t> lastExtent;
std::vector<BRM::LBID_t> lastScannedLBID;
BRM::DBRM dbrm;
SP_LBIDList lbidList;
uint64_t ridsRequested;
uint64_t totalMsgs;
volatile uint64_t msgsSent;
volatile uint64_t msgsRecvd;
volatile bool finishedSending;
bool firstRead;
bool sendWaiting;
uint32_t recvWaiting;
uint32_t recvExited;
uint64_t ridsReturned;
std::map<execplan::CalpontSystemCatalog::OID, std::tr1::unordered_map<int64_t, struct BRM::EMEntry>> extentsMap;
std::vector<BRM::EMEntry> scannedExtents;
OIDVector projectOids;
uint32_t extentSize, divShift, rpbShift, numExtents, modMask;
uint32_t fRequestSize; // the number of logical extents per batch of requests sent to PrimProc.
uint32_t fProcessorThreadsPerScan; // The number of messages sent per logical extent.
bool fSwallowRows;
uint32_t fMaxOutstandingRequests; // The number of logical extents not yet processed by PrimProc
uint64_t fPhysicalIO; // total physical I/O count
uint64_t fCacheIO; // total cache I/O count
uint64_t fNumBlksSkipped; // total number of block scans skipped due to CP
uint64_t fMsgBytesIn; // total byte count for incoming messages
uint64_t fMsgBytesOut; // total byte count for outgoing messages
uint64_t fBlockTouched; // total blocks touched
uint32_t fExtentsPerSegFile; // config num of Extents Per Segment File
// uint64_t cThread; //consumer thread. thread handle from thread pool
uint64_t pThread; // producer thread. thread handle from thread pool
boost::mutex tplMutex;
boost::mutex dlMutex;
boost::mutex cpMutex;
boost::mutex serializeJoinerMutex;
boost::condition condvarWakeupProducer, condvar;
std::vector<bool> scanFlags; // use to keep track of which extents to eliminate from this step
bool BPPIsAllocated;
uint32_t uniqueID;
ResourceManager* fRm;
/* HashJoin support */
void serializeJoiner();
void serializeJoiner(uint32_t connectionNumber);
void generateJoinResultSet(const std::vector<std::vector<rowgroup::Row::Pointer>>& joinerOutput,
rowgroup::Row& baseRow, const std::vector<boost::shared_array<int>>& mappings,
const uint32_t depth, rowgroup::RowGroup& outputRG, rowgroup::RGData& rgData,
std::vector<rowgroup::RGData>* outputData,
const boost::scoped_array<rowgroup::Row>& smallRows, rowgroup::Row& joinedRow);
std::vector<boost::shared_ptr<joiner::TupleJoiner>> tjoiners;
bool doJoin, hasPMJoin, hasUMJoin;
std::vector<rowgroup::RowGroup> joinerMatchesRGs; // parses the small-side matches from joiner
uint32_t smallSideCount;
int smallOuterJoiner;
bool fRunExecuted; // was the run method executed for this step
rowgroup::RowGroup inputRowGroup; // for parsing the data read from the datalist
rowgroup::RowGroup primRowGroup; // for parsing the data received from the PM
rowgroup::RowGroup outputRowGroup; // if there's a join, these are the joined
// result, otherwise it's = to primRowGroup
// aggregation support
rowgroup::SP_ROWAGG_PM_t fAggregatorPm;
rowgroup::RowGroup fAggRowGroupPm;
// OR hacks
uint8_t bop; // BOP_AND or BOP_OR
// temporary hack to make sure JobList only calls run and join once
boost::mutex jlLock;
bool runRan;
bool joinRan;
// bug 1965, trace duplicate columns in delivery list <dest, src>
std::vector<std::pair<uint32_t, uint32_t>> dupColumns;
/* Functions & Expressions vars */
boost::shared_ptr<funcexp::FuncExpWrapper> fe1, fe2;
rowgroup::RowGroup fe1Input, fe2Output;
boost::shared_array<int> fe2Mapping;
bool runFEonPM;
/* for UM F & E 2 processing */
rowgroup::RGData fe2Data;
rowgroup::Row fe2InRow, fe2OutRow;
void processFE2(rowgroup::RowGroup& input, rowgroup::RowGroup& output, rowgroup::Row& inRow, rowgroup::Row& outRow,
std::vector<rowgroup::RGData>* rgData, funcexp::FuncExpWrapper* localFE2);
void processFE2_oneRG(rowgroup::RowGroup& input, rowgroup::RowGroup& output, rowgroup::Row& inRow,
rowgroup::Row& outRow, funcexp::FuncExpWrapper* localFE2);
/* Runtime Casual Partitioning adjustments. The CP code is needlessly complicated;
* to avoid making it worse, decided to designate 'scanFlags' as the static
* component and this new array as the runtime component. The final CP decision
* is scanFlags & runtimeCP.
*/
std::vector<bool> runtimeCPFlags;
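
Concretely, the final decision described in that comment is an element-wise AND of the two vectors; a minimal sketch with a hypothetical helper name:

#include <vector>

// Effective casual-partitioning decision per extent: an extent is scanned
// only if both the static and the runtime component allow it.
std::vector<bool> effectiveScanFlags(const std::vector<bool>& scanFlags,
                                     const std::vector<bool>& runtimeCPFlags)
{
    std::vector<bool> out(scanFlags.size());
    for (size_t i = 0; i < scanFlags.size(); ++i)
        out[i] = scanFlags[i] && runtimeCPFlags[i];
    return out;
}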
/* semijoin vars */
rowgroup::RowGroup joinFERG;
boost::shared_ptr<RowGroupDL> deliveryDL;
uint32_t deliveryIt;
struct JoinLocalData
{
JoinLocalData() = delete;
JoinLocalData(const JoinLocalData&) = delete;
JoinLocalData(JoinLocalData&&) = delete;
JoinLocalData& operator=(const JoinLocalData&) = delete;
JoinLocalData& operator=(JoinLocalData&&) = delete;
~JoinLocalData() = default;
JoinLocalData(rowgroup::RowGroup& primRowGroup, rowgroup::RowGroup& outputRowGroup,
boost::shared_ptr<funcexp::FuncExpWrapper>& fe2, rowgroup::RowGroup& fe2Output,
std::vector<rowgroup::RowGroup>& joinerMatchesRGs, rowgroup::RowGroup& joinFERG,
std::vector<boost::shared_ptr<joiner::TupleJoiner>>& tjoiners, uint32_t smallSideCount,
bool doJoin);
rowgroup::RowGroup local_primRG;
rowgroup::RowGroup local_outputRG;
uint32_t cachedIO_Thread = 0;
uint32_t physIO_Thread = 0;
uint32_t touchedBlocks_Thread = 0;
int64_t ridsReturned_Thread = 0;
// On init.
bool doJoin;
boost::shared_ptr<funcexp::FuncExpWrapper> fe2;
rowgroup::RowGroup fe2Output;
uint32_t smallSideCount;
std::vector<rowgroup::RowGroup> joinerMatchesRGs;
rowgroup::RowGroup joinFERG;
std::vector<boost::shared_ptr<joiner::TupleJoiner>> tjoiners;
// Join vars.
vector<vector<rowgroup::Row::Pointer>> joinerOutput;
rowgroup::Row largeSideRow;
rowgroup::Row joinedBaseRow;
rowgroup::Row largeNull;
rowgroup::Row joinFERow; // LSR clean
boost::scoped_array<rowgroup::Row> smallSideRows;
boost::scoped_array<rowgroup::Row> smallNulls;
boost::scoped_array<uint8_t> joinedBaseRowData;
boost::scoped_array<uint8_t> joinFERowData;
boost::shared_array<int> largeMapping;
vector<boost::shared_array<int>> smallMappings;
vector<boost::shared_array<int>> fergMappings;
rowgroup::RGData joinedData;
boost::scoped_array<uint8_t> largeNullMemory;
boost::scoped_array<boost::shared_array<uint8_t>> smallNullMemory;
uint32_t matchCount;
rowgroup::Row postJoinRow;
rowgroup::RowGroup local_fe2Output;
rowgroup::RGData local_fe2Data;
rowgroup::Row local_fe2OutRow;
funcexp::FuncExpWrapper local_fe2;
};
// The actual value is initialized in TupleBPS's `initializeConfigParms` function.
uint32_t fMaxNumProcessorThreads = 16;
// Set at runtime for each `TupleBPS` operation, based on the current `ThreadPool` workload.
uint32_t fNumProcessorThreads = 1;
std::shared_ptr<JoinLocalData> getJoinLocalDataByIndex(uint32_t index)
{
idbassert(index < fNumProcessorThreads && joinLocalDataPool.size() == fNumProcessorThreads);
return joinLocalDataPool[index];
}
void initializeJoinLocalDataPool(uint32_t numThreads)
{
idbassert(numThreads <= fMaxNumProcessorThreads);
for (uint32_t i = 0; i < numThreads; ++i)
{
joinLocalDataPool.push_back(std::shared_ptr<JoinLocalData>(
new JoinLocalData(primRowGroup, outputRowGroup, fe2, fe2Output, joinerMatchesRGs, joinFERG,
tjoiners, smallSideCount, doJoin)));
}
fNumProcessorThreads = numThreads;
}
// Join local data vector.
std::vector<std::shared_ptr<JoinLocalData>> joinLocalDataPool;
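
The pool gives each processing-thread index a private scratch object, so the join-local state needs no locking. A generic sketch of the same pattern, with hypothetical names:

#include <cstdint>
#include <memory>
#include <thread>
#include <vector>

struct ScratchState  // stands in for JoinLocalData
{
    uint64_t rowsSeen = 0;
};

int main()
{
    const uint32_t numThreads = 4;
    // One state object per thread index, built once up front, as in
    // initializeJoinLocalDataPool().
    std::vector<std::shared_ptr<ScratchState>> pool;
    for (uint32_t i = 0; i < numThreads; ++i)
        pool.push_back(std::make_shared<ScratchState>());

    std::vector<std::thread> workers;
    for (uint32_t t = 0; t < numThreads; ++t)
        workers.emplace_back([state = pool[t]] {
            // Each worker touches only its own state: no mutex required.
            for (int i = 0; i < 1000; ++i)
                ++state->rowsSeen;
        });
    for (auto& w : workers)
        w.join();
    return 0;
}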
/* shared nothing support */
struct Job
{
Job(uint32_t d, uint32_t n, uint32_t b, boost::shared_ptr<messageqcpp::ByteStream>& bs)
: dbroot(d), connectionNum(n), expectedResponses(b), msg(bs)
{
}
uint32_t dbroot;
uint32_t connectionNum;
uint32_t expectedResponses;
boost::shared_ptr<messageqcpp::ByteStream> msg;
};
void prepCasualPartitioning();
@@ -1523,8 +1634,7 @@ private:
bool compareRange(uint8_t COP, int64_t min, int64_t max, int64_t val) const;
bool hasPCFilter, hasPMFilter, hasRIDFilter, hasSegmentFilter, hasDBRootFilter, hasSegmentDirFilter,
hasPartitionFilter, hasMaxFilter, hasMinFilter, hasLBIDFilter, hasExtentIDFilter;
};
/** @brief class FilterStep
*

File diff suppressed because it is too large


@@ -908,8 +908,7 @@ void TupleJoiner::setInUM(vector<RGData> &rgs)
}
}
void TupleJoiner::setPMJoinResults(boost::shared_array<vector<uint32_t>> jr, uint32_t threadID)
{
pmJoinResults[threadID] = jr;
}
@@ -950,7 +949,7 @@ boost::shared_array<std::vector<uint32_t> > TupleJoiner::getPMJoinArrays(uint32_
void TupleJoiner::setThreadCount(uint32_t cnt)
{
threadCount = cnt;
pmJoinResults.reset(new boost::shared_array<vector<uint32_t>>[cnt]);
smallRow.reset(new Row[cnt]);
for (uint32_t i = 0; i < cnt; i++)


@@ -225,6 +225,10 @@ public:
return fMaxThreads;
}
/** @brief get the number of issued threads
*/
inline size_t getIssuedThreads() { return fIssued; }
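
A plausible use on the caller's side (illustrative only; the accessor and `fIssued` are real, the helper below is not): derive a parallelism level from how much of the pool is already busy.

#include <algorithm>
#include <cstdint>

// Hypothetical helper: pick a parallelism level from current pool pressure.
// `issued` would come from ThreadPool::getIssuedThreads(), `maxThreads` from
// the pool's configured maximum, `cap` from fMaxNumProcessorThreads.
uint32_t pickProcessorThreads(size_t issued, size_t maxThreads, uint32_t cap)
{
    const size_t spare = maxThreads > issued ? maxThreads - issued : 0;
    // Fall back to single-threaded processing when the pool is saturated.
    return std::max<uint32_t>(1, std::min<uint32_t>(cap, static_cast<uint32_t>(spare)));
}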
/** @brief queue size accessor
*
*/
@@ -292,6 +296,7 @@ public:
fDebug = d;
}
friend class ThreadPoolMonitor;
protected: