You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-01 06:46:55 +03:00
MCOL-5477 Disk join step improvement.
This patch: 1. Handles the corner case where a bucket exceeds the memory limit but its data cannot be redistributed into new buckets by the hash algorithm, because the rows have the same values. 2. Adds a force option for the disk join step. 3. Adds an option to control the depth of the partition tree.
This commit is contained in:
@ -34,6 +34,7 @@ using namespace logging;
|
||||
|
||||
namespace joiner
|
||||
{
|
||||
// FIXME: Possible overflow; this counter has to be reset to zero after the files are cleared.
|
||||
uint64_t uniqueNums = 0;
|
||||
|
||||
JoinPartition::JoinPartition()
|
||||
@ -44,7 +45,7 @@ JoinPartition::JoinPartition()
|
||||
/* This is the ctor used by THJS */
|
||||
JoinPartition::JoinPartition(const RowGroup& lRG, const RowGroup& sRG, const vector<uint32_t>& smallKeys,
|
||||
const vector<uint32_t>& largeKeys, bool typeless, bool antiWMN, bool hasFEFilter,
|
||||
uint64_t totalUMMemory, uint64_t partitionSize)
|
||||
uint64_t totalUMMemory, uint64_t partitionSize, uint32_t maxPartitionTreeDepth)
|
||||
: smallRG(sRG)
|
||||
, largeRG(lRG)
|
||||
, smallKeyCols(smallKeys)
|
||||
@ -54,6 +55,7 @@ JoinPartition::JoinPartition(const RowGroup& lRG, const RowGroup& sRG, const vec
|
||||
, htSizeEstimate(0)
|
||||
, htTargetSize(partitionSize)
|
||||
, rootNode(true)
|
||||
, canSplit(true)
|
||||
, antiWithMatchNulls(antiWMN)
|
||||
, needsAllNullRows(hasFEFilter)
|
||||
, gotNullRow(false)
|
||||
@ -63,6 +65,8 @@ JoinPartition::JoinPartition(const RowGroup& lRG, const RowGroup& sRG, const vec
|
||||
, maxSmallSize(0)
|
||||
, nextSmallOffset(0)
|
||||
, nextLargeOffset(0)
|
||||
, currentPartitionTreeDepth(0)
|
||||
, maxPartitionTreeDepth(maxPartitionTreeDepth)
|
||||
{
|
||||
config::Config* config = config::Config::makeConfig();
|
||||
string cfgTxt;
|
||||
@ -105,20 +109,17 @@ JoinPartition::JoinPartition(const RowGroup& lRG, const RowGroup& sRG, const vec
|
||||
}
|
||||
|
||||
if (compressionType == "LZ4")
|
||||
{
|
||||
compressor.reset(new compress::CompressInterfaceLZ4());
|
||||
}
|
||||
else
|
||||
{
|
||||
compressor.reset(new compress::CompressInterfaceSnappy());
|
||||
}
|
||||
|
||||
for (uint32_t i = 0; i < bucketCount; i++)
|
||||
buckets.push_back(boost::shared_ptr<JoinPartition>(new JoinPartition(*this, false)));
|
||||
buckets.push_back(
|
||||
boost::shared_ptr<JoinPartition>(new JoinPartition(*this, false, currentPartitionTreeDepth + 1)));
|
||||
}
|
||||
|
||||
/* Ctor used by JoinPartition on expansion, creates JP's in filemode */
|
||||
JoinPartition::JoinPartition(const JoinPartition& jp, bool splitMode)
|
||||
JoinPartition::JoinPartition(const JoinPartition& jp, bool splitMode, uint32_t currentPartitionTreeDepth)
|
||||
: smallRG(jp.smallRG)
|
||||
, largeRG(jp.largeRG)
|
||||
, smallKeyCols(jp.smallKeyCols)
|
||||
@ -131,6 +132,7 @@ JoinPartition::JoinPartition(const JoinPartition& jp, bool splitMode)
|
||||
, htSizeEstimate(0)
|
||||
, htTargetSize(jp.htTargetSize)
|
||||
, rootNode(false)
|
||||
, canSplit(true)
|
||||
, antiWithMatchNulls(jp.antiWithMatchNulls)
|
||||
, needsAllNullRows(jp.needsAllNullRows)
|
||||
, gotNullRow(false)
|
||||
@ -141,17 +143,12 @@ JoinPartition::JoinPartition(const JoinPartition& jp, bool splitMode)
|
||||
, maxSmallSize(0)
|
||||
, nextSmallOffset(0)
|
||||
, nextLargeOffset(0)
|
||||
, currentPartitionTreeDepth(currentPartitionTreeDepth)
|
||||
, maxPartitionTreeDepth(jp.maxPartitionTreeDepth)
|
||||
{
|
||||
ostringstream os;
|
||||
|
||||
fileMode = true;
|
||||
// tuning issue: with the defaults, each 100MB bucket would split s.t. the children
|
||||
// could store another 4GB total. Given a good hash and evenly distributed data,
|
||||
// the first level of expansion would happen for all JPs at once, giving a total
|
||||
// capacity of (4GB * 40) = 160GB, when actual usage at that point is a little over 4GB.
|
||||
// Instead, each will double in size, giving a capacity of 8GB -> 16 -> 32, and so on.
|
||||
// bucketCount = jp.bucketCount;
|
||||
bucketCount = 2;
|
||||
config::Config* config = config::Config::makeConfig();
|
||||
filenamePrefix = config->getTempFileDir(config::Config::TempDirPurpose::Joins);
|
||||
|
||||
@ -270,9 +267,6 @@ int64_t JoinPartition::doneInsertingSmallData()
|
||||
smallSizeOnDisk += leafNodeIncrement;
|
||||
}
|
||||
|
||||
// else
|
||||
// cout << uniqueID << " htsizeestimate = " << htSizeEstimate << endl;
|
||||
|
||||
if (!rootNode)
|
||||
{
|
||||
buffer.reinit(largeRG);
|
||||
@ -314,25 +308,92 @@ int64_t JoinPartition::doneInsertingLargeData()
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool JoinPartition::canConvertToSplitMode()
|
||||
{
|
||||
// TODO: Make depth configurable.
|
||||
if (!canSplit || currentPartitionTreeDepth >= maxPartitionTreeDepth)
|
||||
return false;
|
||||
|
||||
ByteStream bs;
|
||||
RowGroup& rg = smallRG;
|
||||
Row& row = smallRow;
|
||||
RGData rgData;
|
||||
uint64_t totalRowCount = 0;
|
||||
std::unordered_map<uint32_t, uint32_t> rowDist;
|
||||
|
||||
nextSmallOffset = 0;
|
||||
while (1)
|
||||
{
|
||||
uint32_t hash;
|
||||
readByteStream(0, &bs);
|
||||
|
||||
if (bs.length() == 0)
|
||||
break;
|
||||
|
||||
rgData.deserialize(bs);
|
||||
rg.setData(&rgData);
|
||||
|
||||
for (uint32_t j = 0, e = rg.getRowCount(); j < e; ++j)
|
||||
{
|
||||
rg.getRow(j, &row);
|
||||
|
||||
if (antiWithMatchNulls && hasNullJoinColumn(row))
|
||||
continue;
|
||||
|
||||
uint64_t tmp;
|
||||
if (typelessJoin)
|
||||
hash = getHashOfTypelessKey(row, smallKeyCols, hashSeed) % bucketCount;
|
||||
else
|
||||
{
|
||||
if (UNLIKELY(row.isUnsigned(smallKeyCols[0])))
|
||||
tmp = row.getUintField(smallKeyCols[0]);
|
||||
else
|
||||
tmp = row.getIntField(smallKeyCols[0]);
|
||||
|
||||
hash = hasher((char*)&tmp, 8, hashSeed);
|
||||
hash = hasher.finalize(hash, 8) % bucketCount;
|
||||
}
|
||||
|
||||
totalRowCount++;
|
||||
rowDist[hash]++;
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& [hash, currentRowCount] : rowDist)
|
||||
{
|
||||
if (currentRowCount == totalRowCount)
|
||||
{
|
||||
canSplit = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
rg.setData(&buffer);
|
||||
rg.resetRowGroup(0);
|
||||
rg.getRow(0, &row);
|
||||
|
||||
return canSplit;
|
||||
}
|
||||
|
||||
int64_t JoinPartition::convertToSplitMode()
|
||||
{
|
||||
int i, j;
|
||||
#ifdef DEBUG_DJS
|
||||
cout << "Convert to split mode " << endl;
|
||||
#endif
|
||||
ByteStream bs;
|
||||
RGData rgData;
|
||||
uint32_t hash;
|
||||
uint64_t tmp;
|
||||
int64_t ret = -(int64_t)smallSizeOnDisk; // smallFile gets deleted
|
||||
boost::scoped_array<uint32_t> rowDist(new uint32_t[bucketCount]);
|
||||
uint32_t rowCount = 0;
|
||||
|
||||
memset(rowDist.get(), 0, sizeof(uint32_t) * bucketCount);
|
||||
fileMode = false;
|
||||
|
||||
htSizeEstimate = 0;
|
||||
smallSizeOnDisk = 0;
|
||||
buckets.reserve(bucketCount);
|
||||
|
||||
for (i = 0; i < (int)bucketCount; i++)
|
||||
buckets.push_back(boost::shared_ptr<JoinPartition>(new JoinPartition(*this, false)));
|
||||
buckets.reserve(bucketCount);
|
||||
for (uint32_t i = 0; i < bucketCount; i++)
|
||||
buckets.push_back(
|
||||
boost::shared_ptr<JoinPartition>(new JoinPartition(*this, false, currentPartitionTreeDepth + 1)));
|
||||
|
||||
RowGroup& rg = smallRG;
|
||||
Row& row = smallRow;
|
||||
@ -348,7 +409,7 @@ int64_t JoinPartition::convertToSplitMode()
|
||||
rgData.deserialize(bs);
|
||||
rg.setData(&rgData);
|
||||
|
||||
for (j = 0; j < (int)rg.getRowCount(); j++)
|
||||
for (uint32_t j = 0; j < rg.getRowCount(); j++)
|
||||
{
|
||||
rg.getRow(j, &row);
|
||||
|
||||
@ -356,7 +417,7 @@ int64_t JoinPartition::convertToSplitMode()
|
||||
{
|
||||
if (needsAllNullRows || !gotNullRow)
|
||||
{
|
||||
for (j = 0; j < (int)bucketCount; j++)
|
||||
for (j = 0; j < bucketCount; j++)
|
||||
ret += buckets[j]->insertSmallSideRow(row);
|
||||
|
||||
gotNullRow = true;
|
||||
@ -377,20 +438,14 @@ int64_t JoinPartition::convertToSplitMode()
|
||||
hash = hasher((char*)&tmp, 8, hashSeed);
|
||||
hash = hasher.finalize(hash, 8) % bucketCount;
|
||||
}
|
||||
|
||||
rowCount++;
|
||||
rowDist[hash]++;
|
||||
ret += buckets[hash]->insertSmallSideRow(row);
|
||||
buckets[hash]->insertSmallSideRow(row);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
boost::filesystem::remove(smallFilename);
|
||||
smallFilename.clear();
|
||||
|
||||
for (i = 0; i < (int)bucketCount; i++)
|
||||
if (rowDist[i] == rowCount)
|
||||
throw IDBExcept("All rows hashed to the same bucket", ERR_DBJ_DATA_DISTRIBUTION);
|
||||
|
||||
rg.setData(&buffer);
|
||||
rg.resetRowGroup(0);
|
||||
rg.getRow(0, &row);
|
||||
@ -418,30 +473,18 @@ int64_t JoinPartition::processSmallBuffer(RGData& rgData)
|
||||
int64_t ret = 0;
|
||||
|
||||
rg.setData(&rgData);
|
||||
// if (rootNode)
|
||||
// cout << "smallside RGData: " << rg.toString() << endl;
|
||||
|
||||
if (fileMode)
|
||||
{
|
||||
ByteStream bs;
|
||||
rg.serializeRGData(bs);
|
||||
// cout << "writing RGData: " << rg.toString() << endl;
|
||||
|
||||
ret = writeByteStream(0, bs);
|
||||
// cout << "wrote " << ret << " bytes" << endl;
|
||||
|
||||
/* Check whether this partition is now too big -> convert to split mode.
|
||||
|
||||
The current estimate is based on 100M 4-byte rows = 4GB. The total size is
|
||||
the amount stored in RowGroups in mem + the size of the hash table. The RowGroups
|
||||
in that case use 600MB, so 3.4GB is used by the hash table. 3.4GB/100M rows = 34 bytes/row
|
||||
*/
|
||||
htSizeEstimate += rg.getDataSize() + (34 * rg.getRowCount());
|
||||
|
||||
if (htSizeEstimate > htTargetSize)
|
||||
htSizeEstimate += rg.getRowCount() * rg.getColumnCount() * 64;
|
||||
// Check whether this partition is now too big -> convert to split mode.
|
||||
if (htTargetSize < htSizeEstimate && canConvertToSplitMode())
|
||||
ret += convertToSplitMode();
|
||||
|
||||
// cout << "wrote some data, returning " << ret << endl;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -478,19 +521,16 @@ int64_t JoinPartition::processSmallBuffer(RGData& rgData)
|
||||
hash = hasher.finalize(hash, 8) % bucketCount;
|
||||
}
|
||||
|
||||
// cout << "hashing smallside row: " << row.toString() << endl;
|
||||
ret += buckets[hash]->insertSmallSideRow(row);
|
||||
}
|
||||
|
||||
// cout << "distributed rows, returning " << ret << endl;
|
||||
}
|
||||
|
||||
smallSizeOnDisk += ret;
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* the difference between processSmall & processLarge is mostly the names of
|
||||
variables being small* -> large*, template? */
|
||||
// the difference between processSmall & processLarge is mostly the names of
|
||||
// variables being small* -> large*; could they share a template?
|
||||
|
||||
int64_t JoinPartition::processLargeBuffer()
|
||||
{
|
||||
@ -511,11 +551,8 @@ int64_t JoinPartition::processLargeBuffer(RGData& rgData)
|
||||
|
||||
rg.setData(&rgData);
|
||||
|
||||
// if (rootNode)
|
||||
// cout << "largeside RGData: " << rg.toString() << endl;
|
||||
|
||||
/* Need to fail a query with an anti join, an FE filter, and a NULL row on the
|
||||
large side b/c it needs to be joined with the entire small side table. */
|
||||
// Need to fail a query with an anti join, an FE filter, and a NULL row on the
|
||||
// large side b/c it needs to be joined with the entire small side table.
|
||||
if (antiWithMatchNulls && needsAllNullRows)
|
||||
{
|
||||
rg.getRow(0, &row);
|
||||
@ -534,9 +571,7 @@ int64_t JoinPartition::processLargeBuffer(RGData& rgData)
|
||||
{
|
||||
ByteStream bs;
|
||||
rg.serializeRGData(bs);
|
||||
// cout << "writing large RGData: " << rg.toString() << endl;
|
||||
ret = writeByteStream(1, bs);
|
||||
// cout << "wrote " << ret << " bytes" << endl;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -560,7 +595,6 @@ int64_t JoinPartition::processLargeBuffer(RGData& rgData)
|
||||
hash = hasher.finalize(hash, 8) % bucketCount;
|
||||
}
|
||||
|
||||
// cout << "large side hashing row: " << row.toString() << endl;
|
||||
ret += buckets[hash]->insertLargeSideRow(row);
|
||||
}
|
||||
}
|
||||
@ -569,49 +603,18 @@ int64_t JoinPartition::processLargeBuffer(RGData& rgData)
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool JoinPartition::getNextPartition(vector<RGData>* smallData, uint64_t* partitionID, JoinPartition** jp)
|
||||
void JoinPartition::collectJoinPartitions(std::vector<JoinPartition*>& joinPartitions)
|
||||
{
|
||||
if (fileMode)
|
||||
{
|
||||
ByteStream bs;
|
||||
RGData rgData;
|
||||
|
||||
if (nextPartitionToReturn > 0)
|
||||
return false;
|
||||
|
||||
// cout << "reading the small side" << endl;
|
||||
nextSmallOffset = 0;
|
||||
|
||||
while (1)
|
||||
{
|
||||
readByteStream(0, &bs);
|
||||
|
||||
if (bs.length() == 0)
|
||||
break;
|
||||
|
||||
rgData.deserialize(bs);
|
||||
// smallRG.setData(&rgData);
|
||||
// cout << "read a smallRG with " << smallRG.getRowCount() << " rows" << endl;
|
||||
smallData->push_back(rgData);
|
||||
}
|
||||
|
||||
nextPartitionToReturn = 1;
|
||||
*partitionID = uniqueID;
|
||||
*jp = this;
|
||||
return true;
|
||||
joinPartitions.push_back(this);
|
||||
return;
|
||||
}
|
||||
|
||||
bool ret = false;
|
||||
|
||||
while (!ret && nextPartitionToReturn < bucketCount)
|
||||
for (uint32_t currentBucket = 0; currentBucket < bucketCount; ++currentBucket)
|
||||
{
|
||||
ret = buckets[nextPartitionToReturn]->getNextPartition(smallData, partitionID, jp);
|
||||
|
||||
if (!ret)
|
||||
nextPartitionToReturn++;
|
||||
buckets[currentBucket]->collectJoinPartitions(joinPartitions);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
boost::shared_ptr<RGData> JoinPartition::getNextLargeRGData()
|
||||
@ -627,10 +630,7 @@ boost::shared_ptr<RGData> JoinPartition::getNextLargeRGData()
|
||||
ret->deserialize(bs);
|
||||
}
|
||||
else
|
||||
{
|
||||
boost::filesystem::remove(largeFilename);
|
||||
largeSizeOnDisk = 0;
|
||||
}
|
||||
nextLargeOffset = 0;
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -682,7 +682,6 @@ void JoinPartition::initForLargeSideFeed()
|
||||
|
||||
void JoinPartition::saveSmallSidePartition(vector<RGData>& rgData)
|
||||
{
|
||||
// cout << "JP: saving partition: " << id << endl;
|
||||
htSizeEstimate = 0;
|
||||
smallSizeOnDisk = 0;
|
||||
nextSmallOffset = 0;
|
||||
@ -806,7 +805,7 @@ uint64_t JoinPartition::writeByteStream(int which, ByteStream& bs)
|
||||
|
||||
if (!useCompression)
|
||||
{
|
||||
ret = len + 4;
|
||||
ret = len + sizeof(len);
|
||||
fs.write((char*)&len, sizeof(len));
|
||||
fs.write((char*)bs.buf(), len);
|
||||
saveErrno = errno;
|
||||
@ -828,7 +827,7 @@ uint64_t JoinPartition::writeByteStream(int which, ByteStream& bs)
|
||||
boost::scoped_array<uint8_t> compressed(new uint8_t[maxSize]);
|
||||
|
||||
compressor->compress((char*)bs.buf(), len, (char*)compressed.get(), &actualSize);
|
||||
ret = actualSize + 4 + 8; // sizeof (size_t) == 8. Why 4?
|
||||
ret = actualSize + sizeof(len); // sizeof (size_t) == 8. Why 4?
|
||||
fs.write((char*)&actualSize, sizeof(actualSize));
|
||||
// Save uncompressed len.
|
||||
fs.write((char*)&len, sizeof(len));
|
||||
@ -843,7 +842,7 @@ uint64_t JoinPartition::writeByteStream(int which, ByteStream& bs)
|
||||
throw IDBExcept(os.str().c_str(), ERR_DBJ_FILE_IO_ERROR);
|
||||
}
|
||||
|
||||
totalBytesWritten += sizeof(actualSize) + actualSize;
|
||||
totalBytesWritten += sizeof(len) + actualSize;
|
||||
}
|
||||
|
||||
bs.advance(len);
|
||||
|
Reference in New Issue
Block a user