1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

feat(RGData,StringStore): add counting allocator capabilities to those ctors used in BPP::execute()

This commit is contained in:
drrtuy
2024-11-30 18:51:29 +00:00
parent 02b8ea1331
commit 5f1bd3be12
14 changed files with 300 additions and 126 deletions

View File

@ -38,6 +38,7 @@
#include <string>
#include <sstream>
#include <set>
#include "rowgroup.h"
#include "serviceexemgr.h"
#include <cstdlib>
using namespace std;
@ -327,9 +328,14 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
typelessJoin.reset(new bool[joinerCount]);
tlSmallSideKeyLengths.reset(new uint32_t[joinerCount]);
storedKeyAllocators.reset(new PoolAllocator[joinerCount]);
// storedKeyAllocators.reset(new PoolAllocator[joinerCount]);
for (uint j = 0; j < joinerCount; ++j)
storedKeyAllocators[j].setUseLock(true);
{
// storedKeyAllocators[j].setUseLock(true);
// WIP use one copy of the allocator
auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<utils::PoolAllocatorBufType>();
storedKeyAllocators.emplace_back(PoolAllocator(&allocator, PoolAllocator::DEFAULT_WINDOW_SIZE, false, true));
}
joinNullValues.reset(new uint64_t[joinerCount]);
doMatchNulls.reset(new bool[joinerCount]);
@ -360,16 +366,16 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
if (!typelessJoin[i])
{
auto alloc = exemgr::globServiceExeMgr->getRm().getAllocator<TJoiner::value_type>();
auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<TJoiner::value_type>();
bs >> joinNullValues[i];
bs >> largeSideKeyColumns[i];
for (uint j = 0; j < processorThreads; ++j)
tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher(), alloc));
tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher(), allocator));
}
else
{
auto alloc = exemgr::globServiceExeMgr->getRm().getAllocator<TLJoiner::value_type>();
auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<TLJoiner::value_type>();
deserializeVector<uint32_t>(bs, tlLargeSideKeyColumns[i]);
bs >> tlSmallSideKeyLengths[i];
@ -392,7 +398,7 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
mSmallSideKeyColumnsPtr, mSmallSideRGPtr);
auto tlComparator = TupleJoiner::TypelessDataComparator(&outputRG, &tlLargeSideKeyColumns[i],
mSmallSideKeyColumnsPtr, mSmallSideRGPtr);
tlJoiners[i][j].reset(new TLJoiner(10, tlHasher, tlComparator, alloc));
tlJoiners[i][j].reset(new TLJoiner(10, tlHasher, tlComparator, allocator));
}
}
}
@ -1375,7 +1381,7 @@ uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid, RowGroup&
}
#ifdef PRIMPROC_STOPWATCH
void BatchPrimitiveProcessor::execute(StopWatch* stopwatch)
void BatchPrimitiveProcessor::execute(StopWatch* stopwatch, messageqcpp::SBS& bs)
#else
void BatchPrimitiveProcessor::execute(messageqcpp::SBS& bs)
#endif
@ -1603,7 +1609,6 @@ void BatchPrimitiveProcessor::execute(messageqcpp::SBS& bs)
{
for (j = 0; j < projectCount; ++j)
{
// cout << "projectionMap[" << j << "] = " << projectionMap[j] << endl;
if (projectionMap[j] != -1)
{
#ifdef PRIMPROC_STOPWATCH
@ -1614,11 +1619,6 @@ void BatchPrimitiveProcessor::execute(messageqcpp::SBS& bs)
projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
#endif
}
// else
// cout << " no target found for OID " <<
// projectSteps[j]->getOID()
//<< endl;
}
if (fe2)
{
@ -2233,8 +2233,8 @@ int BatchPrimitiveProcessor::operator()()
validCPData = false;
cpDataFromDictScan = false;
auto alloc = exemgr::globServiceExeMgr->getRm().getAllocator<messageqcpp::BSBufType>();
messageqcpp::SBS bs(new ByteStream(&alloc));
auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<messageqcpp::BSBufType>();
messageqcpp::SBS bs(new ByteStream(&allocator));
#ifdef PRIMPROC_STOPWATCH
stopwatch->start("BPP() execute");
@ -2295,17 +2295,19 @@ int BatchPrimitiveProcessor::operator()()
void BatchPrimitiveProcessor::allocLargeBuffers()
{
auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<rowgroup::RGDataBufType>();
if (ot == ROW_GROUP && !outRowGroupData)
{
// outputRG.setUseStringTable(true);
outRowGroupData.reset(new RGData(outputRG));
outRowGroupData.reset(new RGData(outputRG, &allocator));
outputRG.setData(outRowGroupData.get());
}
if (fe1 && !fe1Data)
{
// fe1Input.setUseStringTable(true);
fe1Data.reset(new RGData(fe1Input));
fe1Data.reset(new RGData(fe1Input, &allocator));
// fe1Data.reset(new uint8_t[fe1Input.getMaxDataSize()]);
fe1Input.setData(fe1Data.get());
}
@ -2313,14 +2315,14 @@ void BatchPrimitiveProcessor::allocLargeBuffers()
if (fe2 && !fe2Data)
{
// fe2Output.setUseStringTable(true);
fe2Data.reset(new RGData(fe2Output));
fe2Data.reset(new RGData(fe2Output, &allocator));
fe2Output.setData(fe2Data.get());
}
if (getTupleJoinRowGroupData && !joinedRGMem)
{
// joinedRG.setUseStringTable(true);
joinedRGMem.reset(new RGData(joinedRG));
joinedRGMem.reset(new RGData(joinedRG, &allocator));
joinedRG.setData(joinedRGMem.get());
}
}
@ -2488,73 +2490,6 @@ SBPP BatchPrimitiveProcessor::duplicate()
return bpp;
}
#if 0
bool BatchPrimitiveProcessor::operator==(const BatchPrimitiveProcessor& bpp) const
{
uint32_t i;
if (ot != bpp.ot)
return false;
if (versionInfo != bpp.versionInfo)
return false;
if (txnID != bpp.txnID)
return false;
if (sessionID != bpp.sessionID)
return false;
if (stepID != bpp.stepID)
return false;
if (uniqueID != bpp.uniqueID)
return false;
if (gotValues != bpp.gotValues)
return false;
if (gotAbsRids != bpp.gotAbsRids)
return false;
if (needStrValues != bpp.needStrValues)
return false;
if (filterCount != bpp.filterCount)
return false;
if (projectCount != bpp.projectCount)
return false;
if (sendRidsAtDelivery != bpp.sendRidsAtDelivery)
return false;
if (hasScan != bpp.hasScan)
return false;
if (hasFilterStep != bpp.hasFilterStep)
return false;
if (filtOnString != bpp.filtOnString)
return false;
if (doJoin != bpp.doJoin)
return false;
if (doJoin)
/* Join equality test is a bit out of date */
if (joiner != bpp.joiner || joinerSize != bpp.joinerSize)
return false;
for (i = 0; i < filterCount; i++)
if (*filterSteps[i] != *bpp.filterSteps[i])
return false;
return true;
}
#endif
void BatchPrimitiveProcessor::asyncLoadProjectColumns()
{
// relLBID is the LBID related to the primMsg->LBID,