1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-08 14:22:09 +03:00

feat(): propagated changes into SLTPoolAllocator and friends

This commit is contained in:
drrtuy
2025-01-10 18:53:49 +00:00
parent a6de8ec1ac
commit 90b4322470
18 changed files with 516 additions and 129 deletions

View File

@@ -277,22 +277,21 @@ void TupleHashJoinStep::startSmallRunners(uint index)
if (typelessJoin[index])
{
joiner.reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index], largeSideKeys[index], jt,
&jobstepThreadPool, numCores));
joiners[index].reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index], largeSideKeys[index],
jt, &jobstepThreadPool, resourceManager, numCores));
}
else
{
joiner.reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index][0], largeSideKeys[index][0],
jt, &jobstepThreadPool, numCores));
joiners[index].reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index][0], largeSideKeys[index][0],
jt, &jobstepThreadPool, resourceManager, numCores));
}
joiner->setUniqueLimit(uniqueLimit);
joiner->setTableName(smallTableNames[index]);
joiners[index] = joiner;
joiners[index]->setUniqueLimit(uniqueLimit);
joiners[index]->setTableName(smallTableNames[index]);
/* check for join types unsupported on the PM. */
if (!largeBPS || !isExeMgr)
joiner->setInUM(rgData[index]);
joiners[index]->setInUM(rgData[index]);
/*
start the small runners
@@ -306,7 +305,7 @@ void TupleHashJoinStep::startSmallRunners(uint index)
uint64_t memMonitor = jobstepThreadPool.invoke([this, index] { this->trackMem(index); });
// starting 1 thread when in PM mode, since it's only inserting into a
// vector of rows. The rest will be started when converted to UM mode.
if (joiner->inUM())
if (joiners[index]->inUM())
{
for (int i = 0; i < numCores; i++)
{
@@ -320,7 +319,7 @@ void TupleHashJoinStep::startSmallRunners(uint index)
// wait for the first thread to join, then decide whether the others exist and need joining
jobstepThreadPool.join(jobs[0]);
if (joiner->inUM())
if (joiners[index]->inUM())
{
for (int i = 1; i < numCores; i++)
{
@@ -352,7 +351,7 @@ void TupleHashJoinStep::startSmallRunners(uint index)
end_time = boost::posix_time::microsec_clock::universal_time();
if (!(fSessionId & 0x80000000))
cout << "hash table construction time = " << end_time - start_time <<
" size = " << joiner->size() << endl;
" size = " << joiners[index]->size() << endl;
*/
if (traceOn())
@@ -361,13 +360,13 @@ void TupleHashJoinStep::startSmallRunners(uint index)
}
ostringstream oss;
if (!joiner->onDisk())
if (!joiners[index]->onDisk())
{
// add extended info, and if not aborted then tell joiner
// we're done reading the small side.
if (traceOn())
{
if (joiner->inPM())
if (joiners[index]->inPM())
{
{
oss << "PM join (" << index << ")" << endl;
@@ -377,7 +376,7 @@ void TupleHashJoinStep::startSmallRunners(uint index)
extendedInfo += oss.str();
}
}
else if (joiner->inUM())
else if (joiners[index]->inUM())
{
oss << "UM join (" << index << ")" << endl;
#ifdef JLF_DEBUG
@@ -387,7 +386,7 @@ void TupleHashJoinStep::startSmallRunners(uint index)
}
}
if (!cancelled())
joiner->doneInserting();
joiners[index]->doneInserting();
}
if (traceOn())