1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

feat(): Replacing STLPoolAllocator with CountingAllocator for in-memory joins

This commit is contained in:
drrtuy
2025-02-14 12:08:28 +00:00
parent 101a07d90b
commit 4c1d9bceb7
6 changed files with 86 additions and 240 deletions

View File

@ -1397,7 +1397,7 @@ bool BatchPrimitiveProcessorJL::pickNextJoinerNum()
for (i = 0; i < PMJoinerCount; i++)
{
joinerNum = (joinerNum + 1) % PMJoinerCount;
if (posByJoinerNum[joinerNum] != tJoiners[joinerNum]->getSmallSide()->size())
if (posByJoinerNum[joinerNum] != tJoiners[joinerNum]->getSmallSide().size())
break;
}
if (i == PMJoinerCount)
@ -1410,10 +1410,9 @@ bool BatchPrimitiveProcessorJL::pickNextJoinerNum()
/* XXXPAT: Going to interleave across joiners to take advantage of the new locking env in PrimProc */
bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)
{
uint32_t size = 0, toSend, i, j;
uint32_t toSend, i, j;
ISMPacketHeader ism;
Row r;
vector<Row::Pointer>* tSmallSide;
joiner::TypelessData tlData;
uint32_t smallKeyCol;
uint32_t largeKeyCol;
@ -1436,8 +1435,8 @@ bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)
}
memset((void*)&ism, 0, sizeof(ism));
tSmallSide = tJoiners[joinerNum]->getSmallSide();
size = tSmallSide->size();
auto& tSmallSide = tJoiners[joinerNum]->getSmallSide();
auto size = tSmallSide.size();
#if 0
if (joinerNum == PMJoinerCount - 1 && pos == size)
@ -1487,7 +1486,7 @@ bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)
for (i = pos; i < pos + toSend; i++)
{
r.setPointer((*tSmallSide)[i]);
r.setPointer(tSmallSide[i]);
isNull = false;
bSignedUnsigned = tJoiners[joinerNum]->isSignedUnsignedJoin();
@ -1554,7 +1553,7 @@ bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)
for (i = pos, j = 0; i < pos + toSend; ++i, ++j)
{
r.setPointer((*tSmallSide)[i]);
r.setPointer(tSmallSide[i]);
if (r.getColType(smallKeyCol) == CalpontSystemCatalog::LONGDOUBLE)
{
@ -1627,7 +1626,7 @@ bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)
for (i = pos; i < pos + toSend; i++, tmpRow.nextRow())
{
r.setPointer((*tSmallSide)[i]);
r.setPointer(tSmallSide[i]);
copyRow(r, &tmpRow);
}

View File

@ -207,60 +207,6 @@ void TupleHashJoinStep::join()
}
}
// simple sol'n. Poll mem usage of Joiner once per second. Request mem
// increase after the fact. Failure to get mem will be detected and handled by
// the threads inserting into Joiner.
void TupleHashJoinStep::trackMem(uint index)
{
auto joiner = joiners[index];
ssize_t memBefore = 0, memAfter = 0;
bool gotMem;
boost::unique_lock<boost::mutex> scoped(memTrackMutex);
while (!stopMemTracking)
{
memAfter = joiner->getMemUsage();
if (memAfter != memBefore)
{
gotMem = resourceManager->getMemory(memAfter - memBefore, sessionMemLimit, true);
if (gotMem)
atomicops::atomicAdd(&memUsedByEachJoin[index], memAfter - memBefore);
else
return;
memBefore = memAfter;
}
memTrackDone.timed_wait(scoped, boost::posix_time::seconds(1));
}
// one more iteration to capture mem usage since last poll, for this one
// raise an error if mem went over the limit
memAfter = joiner->getMemUsage();
if (memAfter == memBefore)
return;
gotMem = resourceManager->getMemory(memAfter - memBefore, sessionMemLimit, true);
if (gotMem)
{
atomicops::atomicAdd(&memUsedByEachJoin[index], memAfter - memBefore);
}
else
{
if (!joinIsTooBig &&
(isDML || !allowDJS || (fSessionId & 0x80000000) || (tableOid() < 3000 && tableOid() >= 1000)))
{
joinIsTooBig = true;
ostringstream oss;
oss << "(" << __LINE__ << ") "
<< logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG);
fLogger->logMessage(logging::LOG_TYPE_INFO, oss.str());
errorMessage(oss.str());
status(logging::ERR_JOIN_TOO_BIG);
cout << "Join is too big, raise the UM join limit for now (monitor thread)" << endl;
abort();
}
}
}
void TupleHashJoinStep::startSmallRunners(uint index)
{
utils::setThreadName("HJSStartSmall");
@ -302,7 +248,6 @@ void TupleHashJoinStep::startSmallRunners(uint index)
stopMemTracking = false;
utils::VLArray<uint64_t> jobs(numCores);
// uint64_t memMonitor = jobstepThreadPool.invoke([this, index] { this->trackMem(index); });
// starting 1 thread when in PM mode, since it's only inserting into a
// vector of rows. The rest will be started when converted to UM mode.
if (joiners[index]->inUM())