1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-4173 This patch adds support for wide-DECIMAL INNER, OUTER, SEMI, functional JOINs

based on top of TypelessData
This commit is contained in:
Roman Nozdrin
2021-02-16 10:23:49 +00:00
parent 4ecd561878
commit bed0b7c6bc
22 changed files with 2347 additions and 228 deletions

View File

@ -129,6 +129,9 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor() :
hasFilterStep(false),
filtOnString(false),
prefetchThreshold(0),
mJOINHasSkewedKeyColumn(false),
mSmallSideRGPtr(nullptr),
mSmallSideKeyColumnsPtr(nullptr),
hasDictStep(false),
sockIndex(0),
endOfJoinerRan(false),
@ -175,6 +178,9 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch,
hasFilterStep(false),
filtOnString(false),
prefetchThreshold(prefetch),
mJOINHasSkewedKeyColumn(false),
mSmallSideRGPtr(nullptr),
mSmallSideKeyColumnsPtr(nullptr),
hasDictStep(false),
sockIndex(0),
endOfJoinerRan(false),
@ -297,7 +303,6 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
for (uint j = 0; j < joinerCount; ++j)
tJoiners[j].reset(new boost::shared_ptr<TJoiner>[processorThreads]);
//_pools.reset(new boost::shared_ptr<utils::SimplePool>[joinerCount]);
tlJoiners.reset(new boost::shared_array<boost::shared_ptr<TLJoiner> >[joinerCount]);
for (uint j = 0; j < joinerCount; ++j)
tlJoiners[j].reset(new boost::shared_ptr<TLJoiner>[processorThreads]);
@ -310,8 +315,9 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
tJoinerSizes.reset(new std::atomic<uint32_t>[joinerCount]);
largeSideKeyColumns.reset(new uint32_t[joinerCount]);
tlLargeSideKeyColumns.reset(new vector<uint32_t>[joinerCount]);
tlSmallSideKeyColumns.reset(new std::vector<uint32_t>);
typelessJoin.reset(new bool[joinerCount]);
tlKeyLengths.reset(new uint32_t[joinerCount]);
tlSmallSideKeyLengths.reset(new uint32_t[joinerCount]);
storedKeyAllocators.reset(new PoolAllocator[joinerCount]);
for (uint j = 0; j < joinerCount; ++j)
@ -322,6 +328,7 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
joinFEFilters.reset(new scoped_ptr<FuncExpWrapper>[joinerCount]);
hasJoinFEFilters = false;
hasSmallOuterJoin = false;
bool smallSideRGRecvd = false;
for (i = 0; i < joinerCount; i++)
{
@ -356,14 +363,31 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
else
{
deserializeVector<uint32_t>(bs, tlLargeSideKeyColumns[i]);
bs >> tlKeyLengths[i];
//storedKeyAllocators[i] = PoolAllocator();
bs >> tlSmallSideKeyLengths[i];
bs >> mJOINHasSkewedKeyColumn;
// Deser smallSideRG if key data types are different, e.g. INT vs wide-DECIMAL.
if (mJOINHasSkewedKeyColumn && !smallSideRGRecvd)
{
smallSideRGs.emplace_back(rowgroup::RowGroup(bs));
// LargeSide key columns number equals to SmallSide key columns number.
deserializeVector<uint32_t>(bs, *tlSmallSideKeyColumns);
mSmallSideRGPtr = &smallSideRGs[0];
mSmallSideKeyColumnsPtr = &(*tlSmallSideKeyColumns);
smallSideRGRecvd = true;
}
for (uint j = 0; j < processorThreads; ++j)
tlJoiners[i][j].reset(new TLJoiner(10,
TupleJoiner::TypelessDataHasher(&outputRG,
&tlLargeSideKeyColumns[i]),
TupleJoiner::TypelessDataComparator(&outputRG,
&tlLargeSideKeyColumns[i])));
{
auto tlHasher = TupleJoiner::TypelessDataHasher(&outputRG,
&tlLargeSideKeyColumns[i],
mSmallSideKeyColumnsPtr,
mSmallSideRGPtr);
auto tlComparator = TupleJoiner::TypelessDataComparator(&outputRG,
&tlLargeSideKeyColumns[i],
mSmallSideKeyColumnsPtr,
mSmallSideRGPtr);
tlJoiners[i][j].reset(new TLJoiner(10, tlHasher, tlComparator));
}
}
}
@ -610,7 +634,6 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
if (typelessJoin[joinerNum])
{
utils::VLArray<vector<pair<TypelessData, uint32_t> > > tmpBuckets(processorThreads);
TypelessData tlLargeKey;
uint8_t nullFlag;
PoolAllocator &storedKeyAllocator = storedKeyAllocators[joinerNum];
// this first loop hashes incoming values into vectors that parallel the hash tables.
@ -620,10 +643,20 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
bs >> nullFlag;
if (nullFlag == 0)
{
tlLargeKey.deserialize(bs, storedKeyAllocator);
TypelessData tlSmallSideKey(bs, storedKeyAllocator);
if (mJOINHasSkewedKeyColumn)
tlSmallSideKey.setSmallSideWithSkewedData();
else
tlSmallSideKey.setSmallSide();
bs >> tlIndex;
bucket = tlLargeKey.hash(outputRG, tlLargeSideKeyColumns[joinerNum]) & ptMask;
tmpBuckets[bucket].push_back(make_pair(tlLargeKey, tlIndex));
// The bucket number corresponds with the index used later inserting TL keys into permanent JOIN hash map.
auto ha = tlSmallSideKey.hash(outputRG,
tlLargeSideKeyColumns[joinerNum],
mSmallSideKeyColumnsPtr,
mSmallSideRGPtr);
bucket = ha & ptMask;
tmpBuckets[bucket].push_back(make_pair(tlSmallSideKey, tlIndex));
}
else
++nullCount;
@ -914,11 +947,6 @@ void BatchPrimitiveProcessor::initProcessor()
{
outputRG.initRow(&oldRow);
outputRG.initRow(&newRow);
tmpKeyAllocators.reset(new FixedAllocator[joinerCount]);
for (i = 0; i < joinerCount; i++)
if (typelessJoin[i])
tmpKeyAllocators[i] = FixedAllocator(tlKeyLengths[i], true);
tSmallSideMatches.reset(new MatchedData[joinerCount]);
keyColumnProj.reset(new bool[projectCount]);
@ -1126,7 +1154,6 @@ void BatchPrimitiveProcessor::executeTupleJoin()
uint32_t newRowCount = 0, i, j;
vector<uint32_t> matches;
uint64_t largeKey;
TypelessData tlLargeKey;
outputRG.getRow(0, &oldRow);
outputRG.getRow(0, &newRow);
@ -1195,8 +1222,10 @@ void BatchPrimitiveProcessor::executeTupleJoin()
{
//cout << " typeless join\n";
// the null values are not sent by UM in typeless case. null -> !found
tlLargeKey = TypelessData(&oldRow);
uint bucket = oldRow.hashTypeless(tlLargeSideKeyColumns[j]) & ptMask;
TypelessData tlLargeKey(&oldRow);
uint bucket = oldRow.hashTypeless(tlLargeSideKeyColumns[j],
mSmallSideKeyColumnsPtr,
mSmallSideRGPtr ? &mSmallSideRGPtr->getColWidths() : nullptr) & ptMask;
found = tlJoiners[j][bucket]->find(tlLargeKey) != tlJoiners[j][bucket]->end();
if ((!found && !(joinTypes[j] & (LARGEOUTER | ANTI))) ||
@ -1335,21 +1364,23 @@ void BatchPrimitiveProcessor::executeTupleJoin()
/* Finally, copy the row into the output */
if (j == joinerCount)
{
// We need to update 8 and 16 bytes in values and wide128Values buffers
// otherwise unrelated values will be observed in the JOIN-ed output RGData.
if (i != newRowCount)
{
values[newRowCount] = values[i];
if (mJOINHasSkewedKeyColumn)
wide128Values[newRowCount] = wide128Values[i];
relRids[newRowCount] = relRids[i];
copyRow(oldRow, &newRow);
//cout << "joined row: " << newRow.toString() << endl;
//memcpy(newRow.getData(), oldRow.getData(), oldRow.getSize());
//cout << "joined row: " << newRow.toString() << endl;
}
newRowCount++;
newRow.nextRow();
}
//else
// cout << "j != joinerCount\n";
// cout << "j != joinerCount\n";
}
}
@ -2220,7 +2251,6 @@ int BatchPrimitiveProcessor::operator()()
}
catch (std::exception& e)
{
cerr << "BPP::sendResponse(): " << e.what() << endl;
break; // If we make this throw, be sure to do the cleanup at the end
}
@ -2382,13 +2412,22 @@ SBPP BatchPrimitiveProcessor::duplicate()
//bpp->_pools = _pools;
bpp->typelessJoin = typelessJoin;
bpp->tlLargeSideKeyColumns = tlLargeSideKeyColumns;
bpp->tlSmallSideKeyColumns = tlSmallSideKeyColumns;
bpp->tlJoiners = tlJoiners;
bpp->tlKeyLengths = tlKeyLengths;
bpp->tlSmallSideKeyLengths = tlSmallSideKeyLengths;
bpp->storedKeyAllocators = storedKeyAllocators;
bpp->joinNullValues = joinNullValues;
bpp->doMatchNulls = doMatchNulls;
bpp->hasJoinFEFilters = hasJoinFEFilters;
bpp->hasSmallOuterJoin = hasSmallOuterJoin;
bpp->mJOINHasSkewedKeyColumn = mJOINHasSkewedKeyColumn;
bpp->mSmallSideRGPtr = mSmallSideRGPtr;
bpp->mSmallSideKeyColumnsPtr = mSmallSideKeyColumnsPtr;
if (!getTupleJoinRowGroupData && mJOINHasSkewedKeyColumn)
{
idbassert(!smallSideRGs.empty());
bpp->smallSideRGs.push_back(smallSideRGs[0]);
}
if (hasJoinFEFilters)
{
@ -2714,7 +2753,9 @@ inline void BatchPrimitiveProcessor::getJoinResults(const Row& r, uint32_t jInde
}
TypelessData largeKey(&r);
bucket = r.hashTypeless(tlLargeSideKeyColumns[jIndex]) & ptMask;
bucket = r.hashTypeless(tlLargeSideKeyColumns[jIndex],
mSmallSideKeyColumnsPtr,
mSmallSideRGPtr ? &mSmallSideRGPtr->getColWidths() : nullptr) & ptMask;
pair<TLJoiner::iterator, TLJoiner::iterator> range =
tlJoiners[jIndex][bucket]->equal_range(largeKey);
for (; range.first != range.second; ++range.first)