1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

fix(PrimProc): MCOL-5651 Add a workaround to avoid choosing an incorr… (#3320)

* fix(PrimProc): MCOL-5651 Add a workaround to avoid choosing an incorrect TupleHashJoinStep as a joiner
This commit is contained in:
Alexey Antipovsky
2024-11-08 18:44:20 +01:00
committed by GitHub
parent 42be2cb7e0
commit 842a3c8a40
7 changed files with 242 additions and 118 deletions

View File

@ -39,7 +39,7 @@
#include <sstream>
#include <set>
#include "serviceexemgr.h"
#include <stdlib.h>
#include <cstdlib>
using namespace std;
#include <boost/thread.hpp>
@ -148,7 +148,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor()
pp.setLogicalBlockMode(true);
pp.setBlockPtr((int*)blockData);
pp.setBlockPtrAux((int*)blockDataAux);
pthread_mutex_init(&objLock, NULL);
pthread_mutex_init(&objLock, nullptr);
}
BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch,
@ -209,7 +209,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch,
pp.setBlockPtr((int*)blockData);
pp.setBlockPtrAux((int*)blockDataAux);
sendThread = bppst;
pthread_mutex_init(&objLock, NULL);
pthread_mutex_init(&objLock, nullptr);
initBPP(b);
}
@ -417,7 +417,6 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
for (i = 0; i < joinerCount; i++)
{
smallSideRowLengths[i] = smallSideRGs[i].getRowSize();
;
smallSideRowData[i] = RGData(smallSideRGs[i], tJoinerSizes[i]);
smallSideRGs[i].setData(&smallSideRowData[i]);
smallSideRGs[i].resetRowGroup(0);
@ -467,7 +466,7 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
{
hasFilterStep = true;
if (dynamic_cast<StrFilterCmd*>(filterSteps[i].get()) != NULL)
if (dynamic_cast<StrFilterCmd*>(filterSteps[i].get()) != nullptr)
filtOnString = true;
}
else if (type == Command::DICT_STEP || type == Command::RID_TO_STRING)
@ -498,10 +497,9 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
bs >> *(fAggregator.get());
// If there's UDAF involved, set up for PM processing
for (uint64_t i = 0; i < fAggregator->getAggFunctions().size(); i++)
for (const auto & pcol : fAggregator->getAggFunctions())
{
RowUDAFFunctionCol* rowUDAF =
dynamic_cast<RowUDAFFunctionCol*>(fAggregator->getAggFunctions()[i].get());
auto* rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(pcol.get());
if (rowUDAF)
{
@ -553,10 +551,10 @@ void BatchPrimitiveProcessor::resetBPP(ByteStream& bs, const SP_UM_MUTEX& w, con
ridMap = 0;
baseRid = absRids[0] & 0xffffffffffffe000ULL;
for (uint32_t i = 0; i < ridCount; i++)
for (uint32_t j = 0; j < ridCount; j++)
{
relRids[i] = absRids[i] - baseRid;
ridMap |= 1 << (relRids[i] >> 9);
relRids[j] = absRids[j] - baseRid;
ridMap |= 1 << (relRids[j] >> 9);
}
}
else
@ -583,7 +581,7 @@ void BatchPrimitiveProcessor::resetBPP(ByteStream& bs, const SP_UM_MUTEX& w, con
projectSteps[i]->resetCommand(bs);
}
idbassert(bs.length() == 0);
idbassert(bs.empty());
/* init vars not part of the BS */
currentBlockOffset = 0;
@ -1098,7 +1096,7 @@ void BatchPrimitiveProcessor::initProcessor()
}
}
if (fAggregator.get() != NULL)
if (fAggregator.get() != nullptr)
{
fAggRowGroupData.reinit(fAggregateRG);
fAggregateRG.setData(&fAggRowGroupData);
@ -1164,7 +1162,6 @@ uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid, RowGroup&
for (j = 0; j < joinerCount; j++)
{
bool found;
if (UNLIKELY(joinTypes[j] & ANTI))
{
if (joinTypes[j] & WITHFCNEXP)
@ -1184,7 +1181,7 @@ uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid, RowGroup&
largeKey = oldRow.getIntField(colIndex);
uint bucket = bucketPicker((char*)&largeKey, 8, bpSeed) & ptMask;
bool joinerIsEmpty = tJoiners[j][bucket]->empty() ? true : false;
bool joinerIsEmpty = tJoiners[j][bucket]->empty();
found = (tJoiners[j][bucket]->find(largeKey) != tJoiners[j][bucket]->end());
isNull = oldRow.isNullValue(colIndex);
@ -1221,8 +1218,8 @@ uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid, RowGroup&
{
bool hasNull = false;
for (uint32_t z = 0; z < tlLargeSideKeyColumns[j].size(); z++)
if (oldRow.isNullValue(tlLargeSideKeyColumns[j][z]))
for (unsigned int column: tlLargeSideKeyColumns[j])
if (oldRow.isNullValue(column))
{
hasNull = true;
break;
@ -1396,7 +1393,7 @@ void BatchPrimitiveProcessor::execute()
{
ColumnCommand* col = dynamic_cast<ColumnCommand*>(filterSteps[0].get());
if ((col != NULL) && (col->getFilterCount() == 0) && (col->getLBID() != 0))
if ((col != nullptr) && (col->getFilterCount() == 0) && (col->getLBID() != 0))
{
// stored in last pos in relLBID[] and asyncLoaded[]
uint64_t p = projectCount;
@ -2452,7 +2449,7 @@ SBPP BatchPrimitiveProcessor::duplicate()
for (i = 0; i < projectCount; ++i)
bpp->projectSteps[i] = projectSteps[i]->duplicate();
if (fAggregator.get() != NULL)
if (fAggregator.get() != nullptr)
{
bpp->fAggregateRG = fAggregateRG;
bpp->fAggregator.reset(new RowAggregation(fAggregator->getGroupByCols(), fAggregator->getAggFunctions()));
@ -2551,7 +2548,7 @@ void BatchPrimitiveProcessor::asyncLoadProjectColumns()
// only care about column commands
ColumnCommand* col = dynamic_cast<ColumnCommand*>(projectSteps[i].get());
if (col != NULL)
if (col != nullptr)
{
asyncLoaded[i] = asyncLoaded[i] && (relLBID[i] % blocksReadAhead != 0);
relLBID[i] += col->getWidth();
@ -2706,12 +2703,14 @@ inline void BatchPrimitiveProcessor::getJoinResults(const Row& r, uint32_t jInde
{
bool hasNullValue = false;
for (uint32_t i = 0; i < tlLargeSideKeyColumns[jIndex].size(); i++)
if (r.isNullValue(tlLargeSideKeyColumns[jIndex][i]))
for (unsigned int column: tlLargeSideKeyColumns[jIndex])
{
if (r.isNullValue(column))
{
hasNullValue = true;
break;
}
}
if (hasNullValue)
{