1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

Merge pull request #2851 from denis0x0D/MCOL-5477

MCOL-5477 Disk join step improvement.
This commit is contained in:
Denis Khalikov
2023-06-26 11:02:20 +03:00
committed by GitHub
13 changed files with 309 additions and 165 deletions

View File

@ -85,6 +85,8 @@ CalpontSelectExecutionPlan::CalpontSelectExecutionPlan(const int location)
, fDJSSmallSideLimit(0) , fDJSSmallSideLimit(0)
, fDJSLargeSideLimit(0) , fDJSLargeSideLimit(0)
, fDJSPartitionSize(100 * 1024 * 1024) , fDJSPartitionSize(100 * 1024 * 1024)
, fDJSMaxPartitionTreeDepth(8)
, fDJSForceRun(false)
, // 100MB mem usage for disk based join, , // 100MB mem usage for disk based join,
fUMMemLimit(numeric_limits<int64_t>::max()) fUMMemLimit(numeric_limits<int64_t>::max())
, fIsDML(false) , fIsDML(false)
@ -457,6 +459,8 @@ void CalpontSelectExecutionPlan::serialize(messageqcpp::ByteStream& b) const
b << fDJSSmallSideLimit; b << fDJSSmallSideLimit;
b << fDJSLargeSideLimit; b << fDJSLargeSideLimit;
b << fDJSPartitionSize; b << fDJSPartitionSize;
b << fDJSMaxPartitionTreeDepth;
b << (uint8_t)fDJSForceRun;
b << fUMMemLimit; b << fUMMemLimit;
b << (uint8_t)fIsDML; b << (uint8_t)fIsDML;
messageqcpp::ByteStream::octbyte timeZone = fTimeZone; messageqcpp::ByteStream::octbyte timeZone = fTimeZone;
@ -652,6 +656,8 @@ void CalpontSelectExecutionPlan::unserialize(messageqcpp::ByteStream& b)
b >> fDJSSmallSideLimit; b >> fDJSSmallSideLimit;
b >> fDJSLargeSideLimit; b >> fDJSLargeSideLimit;
b >> fDJSPartitionSize; b >> fDJSPartitionSize;
b >> fDJSMaxPartitionTreeDepth;
b >> (uint8_t&)fDJSForceRun;
b >> fUMMemLimit; b >> fUMMemLimit;
b >> tmp8; b >> tmp8;
fIsDML = tmp8; fIsDML = tmp8;

View File

@ -688,6 +688,24 @@ class CalpontSelectExecutionPlan : public CalpontExecutionPlan
return fDJSPartitionSize; return fDJSPartitionSize;
} }
void djsMaxPartitionTreeDepth(uint32_t value)
{
fDJSMaxPartitionTreeDepth = value;
}
uint64_t djsMaxPartitionTreeDepth()
{
return fDJSMaxPartitionTreeDepth;
}
void djsForceRun(bool b)
{
fDJSForceRun = b;
}
bool djsForceRun()
{
return fDJSForceRun;
}
void umMemLimit(uint64_t l) void umMemLimit(uint64_t l)
{ {
fUMMemLimit = l; fUMMemLimit = l;
@ -920,6 +938,8 @@ class CalpontSelectExecutionPlan : public CalpontExecutionPlan
uint64_t fDJSSmallSideLimit = 0; uint64_t fDJSSmallSideLimit = 0;
uint64_t fDJSLargeSideLimit = 0; uint64_t fDJSLargeSideLimit = 0;
uint64_t fDJSPartitionSize = 100 * 1024 * 1024; uint64_t fDJSPartitionSize = 100 * 1024 * 1024;
uint32_t fDJSMaxPartitionTreeDepth = 8;
bool fDJSForceRun = false;
int64_t fUMMemLimit = numeric_limits<int64_t>::max(); int64_t fUMMemLimit = numeric_limits<int64_t>::max();
bool fIsDML = false; bool fIsDML = false;
long fTimeZone = 0; long fTimeZone = 0;

View File

@ -81,6 +81,7 @@ DiskJoinStep::DiskJoinStep(TupleHashJoinStep* t, int djsIndex, int joinIndex, bo
smallLimit = thjs->djsSmallLimit; smallLimit = thjs->djsSmallLimit;
largeLimit = thjs->djsLargeLimit; largeLimit = thjs->djsLargeLimit;
partitionSize = thjs->djsPartitionSize; partitionSize = thjs->djsPartitionSize;
maxPartitionTreeDepth = thjs->djsMaxPartitionTreeDepth;
if (smallLimit == 0) if (smallLimit == 0)
smallLimit = numeric_limits<int64_t>::max(); smallLimit = numeric_limits<int64_t>::max();
@ -91,7 +92,7 @@ DiskJoinStep::DiskJoinStep(TupleHashJoinStep* t, int djsIndex, int joinIndex, bo
uint64_t totalUMMemory = thjs->resourceManager->getConfiguredUMMemLimit(); uint64_t totalUMMemory = thjs->resourceManager->getConfiguredUMMemLimit();
jp.reset(new JoinPartition(largeRG, smallRG, smallKeyCols, largeKeyCols, typeless, jp.reset(new JoinPartition(largeRG, smallRG, smallKeyCols, largeKeyCols, typeless,
(joinType & ANTI) && (joinType & MATCHNULLS), (bool)fe, totalUMMemory, (joinType & ANTI) && (joinType & MATCHNULLS), (bool)fe, totalUMMemory,
partitionSize)); partitionSize, maxPartitionTreeDepth));
if (cancelled()) if (cancelled())
{ {
@ -163,7 +164,6 @@ void DiskJoinStep::smallReader()
RGData rgData; RGData rgData;
bool more = true; bool more = true;
int64_t memUsage = 0, combinedMemUsage = 0; int64_t memUsage = 0, combinedMemUsage = 0;
[[maybe_unused]] int rowCount = 0;
RowGroup l_smallRG = smallRG; RowGroup l_smallRG = smallRG;
try try
@ -175,7 +175,6 @@ void DiskJoinStep::smallReader()
if (more) if (more)
{ {
l_smallRG.setData(&rgData); l_smallRG.setData(&rgData);
rowCount += l_smallRG.getRowCount();
memUsage = jp->insertSmallSideRGData(rgData); memUsage = jp->insertSmallSideRGData(rgData);
combinedMemUsage = atomicops::atomicAdd(smallUsage.get(), memUsage); combinedMemUsage = atomicops::atomicAdd(smallUsage.get(), memUsage);
@ -224,7 +223,6 @@ void DiskJoinStep::largeReader()
RGData rgData; RGData rgData;
bool more = true; bool more = true;
int64_t largeSize = 0; int64_t largeSize = 0;
[[maybe_unused]] int rowCount = 0;
RowGroup l_largeRG = largeRG; RowGroup l_largeRG = largeRG;
largeIterationCount++; largeIterationCount++;
@ -239,7 +237,6 @@ void DiskJoinStep::largeReader()
if (more) if (more)
{ {
l_largeRG.setData(&rgData); l_largeRG.setData(&rgData);
rowCount += l_largeRG.getRowCount();
// cout << "large side raw data: " << largeRG.toString() << endl; // cout << "large side raw data: " << largeRG.toString() << endl;
largeSize += jp->insertLargeSideRGData(rgData); largeSize += jp->insertLargeSideRGData(rgData);
@ -268,22 +265,86 @@ void DiskJoinStep::largeReader()
void DiskJoinStep::loadFcn() void DiskJoinStep::loadFcn()
{ {
boost::shared_ptr<LoaderOutput> out; boost::shared_ptr<LoaderOutput> out;
bool ret; std::vector<JoinPartition*> joinPartitions;
// Collect all join partitions.
jp->collectJoinPartitions(joinPartitions);
#ifdef DEBUG_DJS
cout << "Collected join partitions: " << endl;
for (uint32_t i = 0; i < joinPartitions.size(); ++i)
cout << joinPartitions[i]->getUniqueID() << ", ";
cout << endl;
#endif
try try
{ {
do uint32_t partitionIndex = 0;
{ bool partitionDone = true;
out.reset(new LoaderOutput());
ret = jp->getNextPartition(&out->smallData, &out->partitionID, &out->jp);
if (ret) // Iterate over partitions.
while (partitionIndex < joinPartitions.size() && !cancelled())
{
uint64_t currentSize = 0;
auto* joinPartition = joinPartitions[partitionIndex];
out.reset(new LoaderOutput());
if (partitionDone)
joinPartition->setNextSmallOffset(0);
while (true)
{ {
// cout << "loaded partition " << out->partitionID << " smallData = " << out->smallData.size() << messageqcpp::ByteStream bs;
// endl; RowGroup rowGroup;
loadFIFO->insert(out); RGData rgData;
joinPartition->readByteStream(0, &bs);
if (!bs.length())
{
partitionDone = true;
break;
}
rgData.deserialize(bs);
rowGroup.setData(&rgData);
// Check that current `RowGroup` has rows.
if (!rowGroup.getRowCount())
{
partitionDone = true;
break;
}
currentSize += rowGroup.getRowCount() * rowGroup.getColumnCount() * 64;
out->smallData.push_back(rgData);
if (currentSize > partitionSize)
{
#ifdef DEBUG_DJS
cout << "Memory limit exceeded for the partition: " << joinPartition->getUniqueID() << endl;
cout << "Current size: " << currentSize << " Memory limit: " << partitionSize << endl;
#endif
partitionDone = false;
currentSize = 0;
break;
}
} }
} while (ret && !cancelled());
if (!out->smallData.size())
{
++partitionIndex;
partitionDone = true;
continue;
}
// Initialize `LoaderOutput` and add it to `FIFO`.
out->partitionID = joinPartition->getUniqueID();
out->jp = joinPartition;
loadFIFO->insert(out);
// If this partition is done - take a next one.
if (partitionDone)
++partitionIndex;
}
} }
catch (...) catch (...)
{ {

View File

@ -148,6 +148,7 @@ class DiskJoinStep : public JobStep
int64_t smallLimit; int64_t smallLimit;
int64_t largeLimit; int64_t largeLimit;
uint64_t partitionSize; uint64_t partitionSize;
uint32_t maxPartitionTreeDepth;
void reportStats(); void reportStats();

View File

@ -364,6 +364,8 @@ struct JobInfo
int64_t smallSideLimit; // need to get these from a session var in execplan int64_t smallSideLimit; // need to get these from a session var in execplan
int64_t largeSideLimit; int64_t largeSideLimit;
uint64_t partitionSize; uint64_t partitionSize;
uint32_t djsMaxPartitionTreeDepth;
bool djsForceRun;
bool isDML; bool isDML;
long timeZone; long timeZone;

View File

@ -2066,6 +2066,8 @@ SJLP makeJobList_(CalpontExecutionPlan* cplan, ResourceManager* rm,
jobInfo.smallSideLimit = csep->djsSmallSideLimit(); jobInfo.smallSideLimit = csep->djsSmallSideLimit();
jobInfo.largeSideLimit = csep->djsLargeSideLimit(); jobInfo.largeSideLimit = csep->djsLargeSideLimit();
jobInfo.partitionSize = csep->djsPartitionSize(); jobInfo.partitionSize = csep->djsPartitionSize();
jobInfo.djsMaxPartitionTreeDepth = csep->djsMaxPartitionTreeDepth();
jobInfo.djsForceRun = csep->djsForceRun();
jobInfo.umMemLimit.reset(new int64_t); jobInfo.umMemLimit.reset(new int64_t);
*(jobInfo.umMemLimit) = csep->umMemLimit(); *(jobInfo.umMemLimit) = csep->umMemLimit();
jobInfo.isDML = csep->isDML(); jobInfo.isDML = csep->isDML();

View File

@ -106,6 +106,8 @@ TupleHashJoinStep::TupleHashJoinStep(const JobInfo& jobInfo)
djsSmallLimit = jobInfo.smallSideLimit; djsSmallLimit = jobInfo.smallSideLimit;
djsLargeLimit = jobInfo.largeSideLimit; djsLargeLimit = jobInfo.largeSideLimit;
djsPartitionSize = jobInfo.partitionSize; djsPartitionSize = jobInfo.partitionSize;
djsMaxPartitionTreeDepth = jobInfo.djsMaxPartitionTreeDepth;
djsForceRun = jobInfo.djsForceRun;
isDML = jobInfo.isDML; isDML = jobInfo.isDML;
config::Config* config = config::Config::makeConfig(); config::Config* config = config::Config::makeConfig();
@ -1984,53 +1986,55 @@ void TupleHashJoinStep::segregateJoiners()
return; return;
} }
/* If they are all inner joins they can be segregated w/o respect to // Force all joins into disk based.
ordering; if they're not, the ordering has to stay consistent therefore if (djsForceRun)
the first joiner that isn't finished and everything after has to be
done by DJS. */
if (allInnerJoins)
{ {
for (i = 0; i < smallSideCount; i++) for (i = 0; i < smallSideCount; ++i)
{ {
// if (joiners[i]->isFinished() && (rand() % 2)) { // for debugging joinIsTooBig = true;
if (joiners[i]->isFinished()) joiners[i]->setConvertToDiskJoin();
{ djsJoiners.push_back(joiners[i]);
// cout << "1joiner " << i << " " << hex << (uint64_t) joiners[i].get() << dec << " -> TBPS" << endl; djsJoinerMap.push_back(i);
tbpsJoiners.push_back(joiners[i]);
}
else
{
joinIsTooBig = true;
joiners[i]->setConvertToDiskJoin();
// cout << "1joiner " << i << " " << hex << (uint64_t) joiners[i].get() << dec << " -> DJS" << endl;
djsJoiners.push_back(joiners[i]);
djsJoinerMap.push_back(i);
}
} }
} }
else else
{ {
// uint limit = rand() % smallSideCount; /* If they are all inner joins they can be segregated w/o respect to
for (i = 0; i < smallSideCount; i++) ordering; if they're not, the ordering has to stay consistent therefore
the first joiner that isn't finished and everything after has to be
done by DJS. */
if (allInnerJoins)
{ {
// if (joiners[i]->isFinished() && i < limit) { // debugging for (i = 0; i < smallSideCount; i++)
if (joiners[i]->isFinished())
{ {
// cout << "2joiner " << i << " " << hex << (uint64_t) joiners[i].get() << dec << " -> TBPS" << endl; if (joiners[i]->isFinished())
tbpsJoiners.push_back(joiners[i]); tbpsJoiners.push_back(joiners[i]);
else
{
joinIsTooBig = true;
joiners[i]->setConvertToDiskJoin();
djsJoiners.push_back(joiners[i]);
djsJoinerMap.push_back(i);
}
} }
else
break;
} }
else
for (; i < smallSideCount; i++)
{ {
joinIsTooBig = true; for (i = 0; i < smallSideCount; i++)
joiners[i]->setConvertToDiskJoin(); {
// cout << "2joiner " << i << " " << hex << (uint64_t) joiners[i].get() << dec << " -> DJS" << endl; if (joiners[i]->isFinished())
djsJoiners.push_back(joiners[i]); tbpsJoiners.push_back(joiners[i]);
djsJoinerMap.push_back(i); else
break;
}
for (; i < smallSideCount; i++)
{
joinIsTooBig = true;
joiners[i]->setConvertToDiskJoin();
djsJoiners.push_back(joiners[i]);
djsJoinerMap.push_back(i);
}
} }
} }
} }

View File

@ -620,6 +620,8 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep
int64_t djsSmallLimit; int64_t djsSmallLimit;
int64_t djsLargeLimit; int64_t djsLargeLimit;
uint64_t djsPartitionSize; uint64_t djsPartitionSize;
uint32_t djsMaxPartitionTreeDepth;
bool djsForceRun;
bool isDML; bool isDML;
bool allowDJS; bool allowDJS;

View File

@ -6640,7 +6640,8 @@ void setExecutionParams(gp_walk_info& gwi, SCSEP& csep)
csep->djsSmallSideLimit(get_diskjoin_smallsidelimit(gwi.thd) * 1024ULL * 1024); csep->djsSmallSideLimit(get_diskjoin_smallsidelimit(gwi.thd) * 1024ULL * 1024);
csep->djsLargeSideLimit(get_diskjoin_largesidelimit(gwi.thd) * 1024ULL * 1024); csep->djsLargeSideLimit(get_diskjoin_largesidelimit(gwi.thd) * 1024ULL * 1024);
csep->djsPartitionSize(get_diskjoin_bucketsize(gwi.thd) * 1024ULL * 1024); csep->djsPartitionSize(get_diskjoin_bucketsize(gwi.thd) * 1024ULL * 1024);
csep->djsMaxPartitionTreeDepth(get_diskjoin_max_partition_tree_depth(gwi.thd));
csep->djsForceRun(get_diskjoin_force_run(gwi.thd));
if (get_um_mem_limit(gwi.thd) == 0) if (get_um_mem_limit(gwi.thd) == 0)
csep->umMemLimit(numeric_limits<int64_t>::max()); csep->umMemLimit(numeric_limits<int64_t>::max());
else else

View File

@ -135,6 +135,12 @@ static MYSQL_THDVAR_ULONG(diskjoin_bucketsize, PLUGIN_VAR_RQCMDARG,
"The maximum size in MB of each 'small side' table in memory.", NULL, NULL, 100, 1, "The maximum size in MB of each 'small side' table in memory.", NULL, NULL, 100, 1,
~0U, 1); ~0U, 1);
static MYSQL_THDVAR_ULONG(diskjoin_max_partition_tree_depth, PLUGIN_VAR_RQCMDARG,
"The maximum size of partition tree depth.", NULL, NULL, 8, 1, ~0U, 1);
static MYSQL_THDVAR_BOOL(diskjoin_force_run, PLUGIN_VAR_RQCMDARG, "Force run for the disk join step.", NULL,
NULL, 0);
static MYSQL_THDVAR_ULONG(um_mem_limit, PLUGIN_VAR_RQCMDARG, static MYSQL_THDVAR_ULONG(um_mem_limit, PLUGIN_VAR_RQCMDARG,
"Per user Memory limit(MB). Switch to disk-based JOIN when limit is reached", NULL, "Per user Memory limit(MB). Switch to disk-based JOIN when limit is reached", NULL,
NULL, 0, 0, ~0U, 1); NULL, 0, 0, ~0U, 1);
@ -224,6 +230,8 @@ st_mysql_sys_var* mcs_system_variables[] = {MYSQL_SYSVAR(compression_type),
MYSQL_SYSVAR(diskjoin_smallsidelimit), MYSQL_SYSVAR(diskjoin_smallsidelimit),
MYSQL_SYSVAR(diskjoin_largesidelimit), MYSQL_SYSVAR(diskjoin_largesidelimit),
MYSQL_SYSVAR(diskjoin_bucketsize), MYSQL_SYSVAR(diskjoin_bucketsize),
MYSQL_SYSVAR(diskjoin_max_partition_tree_depth),
MYSQL_SYSVAR(diskjoin_force_run),
MYSQL_SYSVAR(um_mem_limit), MYSQL_SYSVAR(um_mem_limit),
MYSQL_SYSVAR(double_for_decimal_math), MYSQL_SYSVAR(double_for_decimal_math),
MYSQL_SYSVAR(decimal_overflow_check), MYSQL_SYSVAR(decimal_overflow_check),
@ -420,6 +428,24 @@ void set_diskjoin_bucketsize(THD* thd, ulong value)
THDVAR(thd, diskjoin_bucketsize) = value; THDVAR(thd, diskjoin_bucketsize) = value;
} }
ulong get_diskjoin_max_partition_tree_depth(THD* thd)
{
return (thd == NULL) ? 0 : THDVAR(thd, diskjoin_max_partition_tree_depth);
}
void set_diskjoin_max_partition_tree_depth(THD* thd, ulong value)
{
THDVAR(thd, diskjoin_max_partition_tree_depth) = value;
}
bool get_diskjoin_force_run(THD* thd)
{
return (thd == NULL) ? 0 : THDVAR(thd, diskjoin_force_run);
}
void set_diskjoin_force_run(THD* thd, bool value)
{
THDVAR(thd, diskjoin_force_run) = value;
}
ulong get_um_mem_limit(THD* thd) ulong get_um_mem_limit(THD* thd)
{ {
return (thd == NULL) ? 0 : THDVAR(thd, um_mem_limit); return (thd == NULL) ? 0 : THDVAR(thd, um_mem_limit);
@ -598,4 +624,4 @@ const char* get_s3_region(THD* thd)
void set_s3_region(THD* thd, char* value) void set_s3_region(THD* thd, char* value)
{ {
THDVAR(thd, s3_region) = value; THDVAR(thd, s3_region) = value;
} }

View File

@ -108,6 +108,12 @@ void set_diskjoin_largesidelimit(THD* thd, ulong value);
ulong get_diskjoin_bucketsize(THD* thd); ulong get_diskjoin_bucketsize(THD* thd);
void set_diskjoin_bucketsize(THD* thd, ulong value); void set_diskjoin_bucketsize(THD* thd, ulong value);
bool get_diskjoin_force_run(THD* thd);
void set_diskjoin_force_run(THD* thd, bool value);
ulong get_diskjoin_max_partition_tree_depth(THD* thd);
void set_diskjoin_max_partition_tree_depth(THD* thd, ulong value);
ulong get_um_mem_limit(THD* thd); ulong get_um_mem_limit(THD* thd);
void set_um_mem_limit(THD* thd, ulong value); void set_um_mem_limit(THD* thd, ulong value);

View File

@ -34,6 +34,7 @@ using namespace logging;
namespace joiner namespace joiner
{ {
// FIXME: Possible overflow, we have to null it after clearing files.
uint64_t uniqueNums = 0; uint64_t uniqueNums = 0;
JoinPartition::JoinPartition() JoinPartition::JoinPartition()
@ -44,7 +45,7 @@ JoinPartition::JoinPartition()
/* This is the ctor used by THJS */ /* This is the ctor used by THJS */
JoinPartition::JoinPartition(const RowGroup& lRG, const RowGroup& sRG, const vector<uint32_t>& smallKeys, JoinPartition::JoinPartition(const RowGroup& lRG, const RowGroup& sRG, const vector<uint32_t>& smallKeys,
const vector<uint32_t>& largeKeys, bool typeless, bool antiWMN, bool hasFEFilter, const vector<uint32_t>& largeKeys, bool typeless, bool antiWMN, bool hasFEFilter,
uint64_t totalUMMemory, uint64_t partitionSize) uint64_t totalUMMemory, uint64_t partitionSize, uint32_t maxPartitionTreeDepth)
: smallRG(sRG) : smallRG(sRG)
, largeRG(lRG) , largeRG(lRG)
, smallKeyCols(smallKeys) , smallKeyCols(smallKeys)
@ -54,6 +55,7 @@ JoinPartition::JoinPartition(const RowGroup& lRG, const RowGroup& sRG, const vec
, htSizeEstimate(0) , htSizeEstimate(0)
, htTargetSize(partitionSize) , htTargetSize(partitionSize)
, rootNode(true) , rootNode(true)
, canSplit(true)
, antiWithMatchNulls(antiWMN) , antiWithMatchNulls(antiWMN)
, needsAllNullRows(hasFEFilter) , needsAllNullRows(hasFEFilter)
, gotNullRow(false) , gotNullRow(false)
@ -63,6 +65,8 @@ JoinPartition::JoinPartition(const RowGroup& lRG, const RowGroup& sRG, const vec
, maxSmallSize(0) , maxSmallSize(0)
, nextSmallOffset(0) , nextSmallOffset(0)
, nextLargeOffset(0) , nextLargeOffset(0)
, currentPartitionTreeDepth(0)
, maxPartitionTreeDepth(maxPartitionTreeDepth)
{ {
config::Config* config = config::Config::makeConfig(); config::Config* config = config::Config::makeConfig();
string cfgTxt; string cfgTxt;
@ -105,20 +109,17 @@ JoinPartition::JoinPartition(const RowGroup& lRG, const RowGroup& sRG, const vec
} }
if (compressionType == "LZ4") if (compressionType == "LZ4")
{
compressor.reset(new compress::CompressInterfaceLZ4()); compressor.reset(new compress::CompressInterfaceLZ4());
}
else else
{
compressor.reset(new compress::CompressInterfaceSnappy()); compressor.reset(new compress::CompressInterfaceSnappy());
}
for (uint32_t i = 0; i < bucketCount; i++) for (uint32_t i = 0; i < bucketCount; i++)
buckets.push_back(boost::shared_ptr<JoinPartition>(new JoinPartition(*this, false))); buckets.push_back(
boost::shared_ptr<JoinPartition>(new JoinPartition(*this, false, currentPartitionTreeDepth + 1)));
} }
/* Ctor used by JoinPartition on expansion, creates JP's in filemode */ /* Ctor used by JoinPartition on expansion, creates JP's in filemode */
JoinPartition::JoinPartition(const JoinPartition& jp, bool splitMode) JoinPartition::JoinPartition(const JoinPartition& jp, bool splitMode, uint32_t currentPartitionTreeDepth)
: smallRG(jp.smallRG) : smallRG(jp.smallRG)
, largeRG(jp.largeRG) , largeRG(jp.largeRG)
, smallKeyCols(jp.smallKeyCols) , smallKeyCols(jp.smallKeyCols)
@ -131,6 +132,7 @@ JoinPartition::JoinPartition(const JoinPartition& jp, bool splitMode)
, htSizeEstimate(0) , htSizeEstimate(0)
, htTargetSize(jp.htTargetSize) , htTargetSize(jp.htTargetSize)
, rootNode(false) , rootNode(false)
, canSplit(true)
, antiWithMatchNulls(jp.antiWithMatchNulls) , antiWithMatchNulls(jp.antiWithMatchNulls)
, needsAllNullRows(jp.needsAllNullRows) , needsAllNullRows(jp.needsAllNullRows)
, gotNullRow(false) , gotNullRow(false)
@ -141,17 +143,12 @@ JoinPartition::JoinPartition(const JoinPartition& jp, bool splitMode)
, maxSmallSize(0) , maxSmallSize(0)
, nextSmallOffset(0) , nextSmallOffset(0)
, nextLargeOffset(0) , nextLargeOffset(0)
, currentPartitionTreeDepth(currentPartitionTreeDepth)
, maxPartitionTreeDepth(jp.maxPartitionTreeDepth)
{ {
ostringstream os; ostringstream os;
fileMode = true; fileMode = true;
// tuning issue: with the defaults, each 100MB bucket would split s.t. the children
// could store another 4GB total. Given a good hash and evenly distributed data,
// the first level of expansion would happen for all JPs at once, giving a total
// capacity of (4GB * 40) = 160GB, when actual usage at that point is a little over 4GB.
// Instead, each will double in size, giving a capacity of 8GB -> 16 -> 32, and so on.
// bucketCount = jp.bucketCount;
bucketCount = 2;
config::Config* config = config::Config::makeConfig(); config::Config* config = config::Config::makeConfig();
filenamePrefix = config->getTempFileDir(config::Config::TempDirPurpose::Joins); filenamePrefix = config->getTempFileDir(config::Config::TempDirPurpose::Joins);
@ -270,9 +267,6 @@ int64_t JoinPartition::doneInsertingSmallData()
smallSizeOnDisk += leafNodeIncrement; smallSizeOnDisk += leafNodeIncrement;
} }
// else
// cout << uniqueID << " htsizeestimate = " << htSizeEstimate << endl;
if (!rootNode) if (!rootNode)
{ {
buffer.reinit(largeRG); buffer.reinit(largeRG);
@ -314,25 +308,92 @@ int64_t JoinPartition::doneInsertingLargeData()
return ret; return ret;
} }
bool JoinPartition::canConvertToSplitMode()
{
// TODO: Make depth configurable.
if (!canSplit || currentPartitionTreeDepth >= maxPartitionTreeDepth)
return false;
ByteStream bs;
RowGroup& rg = smallRG;
Row& row = smallRow;
RGData rgData;
uint64_t totalRowCount = 0;
std::unordered_map<uint32_t, uint32_t> rowDist;
nextSmallOffset = 0;
while (1)
{
uint32_t hash;
readByteStream(0, &bs);
if (bs.length() == 0)
break;
rgData.deserialize(bs);
rg.setData(&rgData);
for (uint32_t j = 0, e = rg.getRowCount(); j < e; ++j)
{
rg.getRow(j, &row);
if (antiWithMatchNulls && hasNullJoinColumn(row))
continue;
uint64_t tmp;
if (typelessJoin)
hash = getHashOfTypelessKey(row, smallKeyCols, hashSeed) % bucketCount;
else
{
if (UNLIKELY(row.isUnsigned(smallKeyCols[0])))
tmp = row.getUintField(smallKeyCols[0]);
else
tmp = row.getIntField(smallKeyCols[0]);
hash = hasher((char*)&tmp, 8, hashSeed);
hash = hasher.finalize(hash, 8) % bucketCount;
}
totalRowCount++;
rowDist[hash]++;
}
}
for (const auto& [hash, currentRowCount] : rowDist)
{
if (currentRowCount == totalRowCount)
{
canSplit = false;
break;
}
}
rg.setData(&buffer);
rg.resetRowGroup(0);
rg.getRow(0, &row);
return canSplit;
}
int64_t JoinPartition::convertToSplitMode() int64_t JoinPartition::convertToSplitMode()
{ {
int i, j; #ifdef DEBUG_DJS
cout << "Convert to split mode " << endl;
#endif
ByteStream bs; ByteStream bs;
RGData rgData; RGData rgData;
uint32_t hash; uint32_t hash;
uint64_t tmp; uint64_t tmp;
int64_t ret = -(int64_t)smallSizeOnDisk; // smallFile gets deleted int64_t ret = -(int64_t)smallSizeOnDisk; // smallFile gets deleted
boost::scoped_array<uint32_t> rowDist(new uint32_t[bucketCount]);
uint32_t rowCount = 0;
memset(rowDist.get(), 0, sizeof(uint32_t) * bucketCount);
fileMode = false; fileMode = false;
htSizeEstimate = 0; htSizeEstimate = 0;
smallSizeOnDisk = 0; smallSizeOnDisk = 0;
buckets.reserve(bucketCount);
for (i = 0; i < (int)bucketCount; i++) buckets.reserve(bucketCount);
buckets.push_back(boost::shared_ptr<JoinPartition>(new JoinPartition(*this, false))); for (uint32_t i = 0; i < bucketCount; i++)
buckets.push_back(
boost::shared_ptr<JoinPartition>(new JoinPartition(*this, false, currentPartitionTreeDepth + 1)));
RowGroup& rg = smallRG; RowGroup& rg = smallRG;
Row& row = smallRow; Row& row = smallRow;
@ -348,7 +409,7 @@ int64_t JoinPartition::convertToSplitMode()
rgData.deserialize(bs); rgData.deserialize(bs);
rg.setData(&rgData); rg.setData(&rgData);
for (j = 0; j < (int)rg.getRowCount(); j++) for (uint32_t j = 0; j < rg.getRowCount(); j++)
{ {
rg.getRow(j, &row); rg.getRow(j, &row);
@ -356,7 +417,7 @@ int64_t JoinPartition::convertToSplitMode()
{ {
if (needsAllNullRows || !gotNullRow) if (needsAllNullRows || !gotNullRow)
{ {
for (j = 0; j < (int)bucketCount; j++) for (j = 0; j < bucketCount; j++)
ret += buckets[j]->insertSmallSideRow(row); ret += buckets[j]->insertSmallSideRow(row);
gotNullRow = true; gotNullRow = true;
@ -377,20 +438,14 @@ int64_t JoinPartition::convertToSplitMode()
hash = hasher((char*)&tmp, 8, hashSeed); hash = hasher((char*)&tmp, 8, hashSeed);
hash = hasher.finalize(hash, 8) % bucketCount; hash = hasher.finalize(hash, 8) % bucketCount;
} }
buckets[hash]->insertSmallSideRow(row);
rowCount++;
rowDist[hash]++;
ret += buckets[hash]->insertSmallSideRow(row);
} }
} }
boost::filesystem::remove(smallFilename); boost::filesystem::remove(smallFilename);
smallFilename.clear(); smallFilename.clear();
for (i = 0; i < (int)bucketCount; i++)
if (rowDist[i] == rowCount)
throw IDBExcept("All rows hashed to the same bucket", ERR_DBJ_DATA_DISTRIBUTION);
rg.setData(&buffer); rg.setData(&buffer);
rg.resetRowGroup(0); rg.resetRowGroup(0);
rg.getRow(0, &row); rg.getRow(0, &row);
@ -418,30 +473,18 @@ int64_t JoinPartition::processSmallBuffer(RGData& rgData)
int64_t ret = 0; int64_t ret = 0;
rg.setData(&rgData); rg.setData(&rgData);
// if (rootNode)
// cout << "smallside RGData: " << rg.toString() << endl;
if (fileMode) if (fileMode)
{ {
ByteStream bs; ByteStream bs;
rg.serializeRGData(bs); rg.serializeRGData(bs);
// cout << "writing RGData: " << rg.toString() << endl;
ret = writeByteStream(0, bs); ret = writeByteStream(0, bs);
// cout << "wrote " << ret << " bytes" << endl;
/* Check whether this partition is now too big -> convert to split mode. htSizeEstimate += rg.getRowCount() * rg.getColumnCount() * 64;
// Check whether this partition is now too big -> convert to split mode.
The current estimate is based on 100M 4-byte rows = 4GB. The total size is if (htTargetSize < htSizeEstimate && canConvertToSplitMode())
the amount stored in RowGroups in mem + the size of the hash table. The RowGroups
in that case use 600MB, so 3.4GB is used by the hash table. 3.4GB/100M rows = 34 bytes/row
*/
htSizeEstimate += rg.getDataSize() + (34 * rg.getRowCount());
if (htSizeEstimate > htTargetSize)
ret += convertToSplitMode(); ret += convertToSplitMode();
// cout << "wrote some data, returning " << ret << endl;
} }
else else
{ {
@ -478,19 +521,16 @@ int64_t JoinPartition::processSmallBuffer(RGData& rgData)
hash = hasher.finalize(hash, 8) % bucketCount; hash = hasher.finalize(hash, 8) % bucketCount;
} }
// cout << "hashing smallside row: " << row.toString() << endl;
ret += buckets[hash]->insertSmallSideRow(row); ret += buckets[hash]->insertSmallSideRow(row);
} }
// cout << "distributed rows, returning " << ret << endl;
} }
smallSizeOnDisk += ret; smallSizeOnDisk += ret;
return ret; return ret;
} }
/* the difference between processSmall & processLarge is mostly the names of // the difference between processSmall & processLarge is mostly the names of
variables being small* -> large*, template? */ // variables being small* -> large*, template? */
int64_t JoinPartition::processLargeBuffer() int64_t JoinPartition::processLargeBuffer()
{ {
@ -511,11 +551,8 @@ int64_t JoinPartition::processLargeBuffer(RGData& rgData)
rg.setData(&rgData); rg.setData(&rgData);
// if (rootNode) // Need to fail a query with an anti join, an FE filter, and a NULL row on the
// cout << "largeside RGData: " << rg.toString() << endl; // large side b/c it needs to be joined with the entire small side table.
/* Need to fail a query with an anti join, an FE filter, and a NULL row on the
large side b/c it needs to be joined with the entire small side table. */
if (antiWithMatchNulls && needsAllNullRows) if (antiWithMatchNulls && needsAllNullRows)
{ {
rg.getRow(0, &row); rg.getRow(0, &row);
@ -534,9 +571,7 @@ int64_t JoinPartition::processLargeBuffer(RGData& rgData)
{ {
ByteStream bs; ByteStream bs;
rg.serializeRGData(bs); rg.serializeRGData(bs);
// cout << "writing large RGData: " << rg.toString() << endl;
ret = writeByteStream(1, bs); ret = writeByteStream(1, bs);
// cout << "wrote " << ret << " bytes" << endl;
} }
else else
{ {
@ -560,7 +595,6 @@ int64_t JoinPartition::processLargeBuffer(RGData& rgData)
hash = hasher.finalize(hash, 8) % bucketCount; hash = hasher.finalize(hash, 8) % bucketCount;
} }
// cout << "large side hashing row: " << row.toString() << endl;
ret += buckets[hash]->insertLargeSideRow(row); ret += buckets[hash]->insertLargeSideRow(row);
} }
} }
@ -569,49 +603,18 @@ int64_t JoinPartition::processLargeBuffer(RGData& rgData)
return ret; return ret;
} }
bool JoinPartition::getNextPartition(vector<RGData>* smallData, uint64_t* partitionID, JoinPartition** jp) void JoinPartition::collectJoinPartitions(std::vector<JoinPartition*>& joinPartitions)
{ {
if (fileMode) if (fileMode)
{ {
ByteStream bs; joinPartitions.push_back(this);
RGData rgData; return;
if (nextPartitionToReturn > 0)
return false;
// cout << "reading the small side" << endl;
nextSmallOffset = 0;
while (1)
{
readByteStream(0, &bs);
if (bs.length() == 0)
break;
rgData.deserialize(bs);
// smallRG.setData(&rgData);
// cout << "read a smallRG with " << smallRG.getRowCount() << " rows" << endl;
smallData->push_back(rgData);
}
nextPartitionToReturn = 1;
*partitionID = uniqueID;
*jp = this;
return true;
} }
bool ret = false; for (uint32_t currentBucket = 0; currentBucket < bucketCount; ++currentBucket)
while (!ret && nextPartitionToReturn < bucketCount)
{ {
ret = buckets[nextPartitionToReturn]->getNextPartition(smallData, partitionID, jp); buckets[currentBucket]->collectJoinPartitions(joinPartitions);
if (!ret)
nextPartitionToReturn++;
} }
return ret;
} }
boost::shared_ptr<RGData> JoinPartition::getNextLargeRGData() boost::shared_ptr<RGData> JoinPartition::getNextLargeRGData()
@ -627,10 +630,7 @@ boost::shared_ptr<RGData> JoinPartition::getNextLargeRGData()
ret->deserialize(bs); ret->deserialize(bs);
} }
else else
{ nextLargeOffset = 0;
boost::filesystem::remove(largeFilename);
largeSizeOnDisk = 0;
}
return ret; return ret;
} }
@ -682,7 +682,6 @@ void JoinPartition::initForLargeSideFeed()
void JoinPartition::saveSmallSidePartition(vector<RGData>& rgData) void JoinPartition::saveSmallSidePartition(vector<RGData>& rgData)
{ {
// cout << "JP: saving partition: " << id << endl;
htSizeEstimate = 0; htSizeEstimate = 0;
smallSizeOnDisk = 0; smallSizeOnDisk = 0;
nextSmallOffset = 0; nextSmallOffset = 0;
@ -806,7 +805,7 @@ uint64_t JoinPartition::writeByteStream(int which, ByteStream& bs)
if (!useCompression) if (!useCompression)
{ {
ret = len + 4; ret = len + sizeof(len);
fs.write((char*)&len, sizeof(len)); fs.write((char*)&len, sizeof(len));
fs.write((char*)bs.buf(), len); fs.write((char*)bs.buf(), len);
saveErrno = errno; saveErrno = errno;
@ -828,7 +827,7 @@ uint64_t JoinPartition::writeByteStream(int which, ByteStream& bs)
boost::scoped_array<uint8_t> compressed(new uint8_t[maxSize]); boost::scoped_array<uint8_t> compressed(new uint8_t[maxSize]);
compressor->compress((char*)bs.buf(), len, (char*)compressed.get(), &actualSize); compressor->compress((char*)bs.buf(), len, (char*)compressed.get(), &actualSize);
ret = actualSize + 4 + 8; // sizeof (size_t) == 8. Why 4? ret = actualSize + sizeof(len); // sizeof (size_t) == 8. Why 4?
fs.write((char*)&actualSize, sizeof(actualSize)); fs.write((char*)&actualSize, sizeof(actualSize));
// Save uncompressed len. // Save uncompressed len.
fs.write((char*)&len, sizeof(len)); fs.write((char*)&len, sizeof(len));
@ -843,7 +842,7 @@ uint64_t JoinPartition::writeByteStream(int which, ByteStream& bs)
throw IDBExcept(os.str().c_str(), ERR_DBJ_FILE_IO_ERROR); throw IDBExcept(os.str().c_str(), ERR_DBJ_FILE_IO_ERROR);
} }
totalBytesWritten += sizeof(actualSize) + actualSize; totalBytesWritten += sizeof(len) + actualSize;
} }
bs.advance(len); bs.advance(len);

View File

@ -33,8 +33,8 @@ class JoinPartition
JoinPartition(const rowgroup::RowGroup& largeRG, const rowgroup::RowGroup& smallRG, JoinPartition(const rowgroup::RowGroup& largeRG, const rowgroup::RowGroup& smallRG,
const std::vector<uint32_t>& smallkeyCols, const std::vector<uint32_t>& largeKeyCols, const std::vector<uint32_t>& smallkeyCols, const std::vector<uint32_t>& largeKeyCols,
bool typeless, bool isAntiWithMatchNulls, bool hasFEFilter, uint64_t totalUMMemory, bool typeless, bool isAntiWithMatchNulls, bool hasFEFilter, uint64_t totalUMMemory,
uint64_t partitionSize); uint64_t partitionSize, uint32_t maxPartitionTreeDepth);
JoinPartition(const JoinPartition&, bool splitMode); JoinPartition(const JoinPartition&, bool splitMode, uint32_t depth);
virtual ~JoinPartition(); virtual ~JoinPartition();
@ -52,6 +52,8 @@ class JoinPartition
/* Returns true if there are more partitions to fetch, false otherwise */ /* Returns true if there are more partitions to fetch, false otherwise */
bool getNextPartition(std::vector<rowgroup::RGData>* smallData, uint64_t* partitionID, JoinPartition** jp); bool getNextPartition(std::vector<rowgroup::RGData>* smallData, uint64_t* partitionID, JoinPartition** jp);
void collectJoinPartitions(std::vector<JoinPartition*>& joinPartitions);
boost::shared_ptr<rowgroup::RGData> getNextLargeRGData(); boost::shared_ptr<rowgroup::RGData> getNextLargeRGData();
/* It's important to follow the sequence of operations to maintain the correct /* It's important to follow the sequence of operations to maintain the correct
@ -100,11 +102,21 @@ class JoinPartition
{ {
return maxSmallSize; return maxSmallSize;
} }
void readByteStream(int which, messageqcpp::ByteStream* bs);
uint64_t getUniqueID()
{
return uniqueID;
}
void setNextSmallOffset(size_t offset)
{
nextSmallOffset = offset;
}
protected: protected:
private: private:
void initBuffers(); void initBuffers();
int64_t convertToSplitMode(); int64_t convertToSplitMode();
bool canConvertToSplitMode();
int64_t processSmallBuffer(); int64_t processSmallBuffer();
int64_t processLargeBuffer(); int64_t processLargeBuffer();
@ -137,7 +149,7 @@ class JoinPartition
uint64_t largeSizeOnDisk; uint64_t largeSizeOnDisk;
utils::Hasher_r hasher; utils::Hasher_r hasher;
bool rootNode; bool rootNode;
bool canSplit;
/* Not-in antijoin hack. A small-side row with a null join column has to go into every partition or /* Not-in antijoin hack. A small-side row with a null join column has to go into every partition or
into one always resident partition (TBD). into one always resident partition (TBD).
@ -148,7 +160,6 @@ class JoinPartition
bool hasNullJoinColumn(rowgroup::Row&); bool hasNullJoinColumn(rowgroup::Row&);
// which = 0 -> smallFile, which = 1 -> largeFile // which = 0 -> smallFile, which = 1 -> largeFile
void readByteStream(int which, messageqcpp::ByteStream* bs);
uint64_t writeByteStream(int which, messageqcpp::ByteStream& bs); uint64_t writeByteStream(int which, messageqcpp::ByteStream& bs);
/* Compression support */ /* Compression support */
@ -163,6 +174,9 @@ class JoinPartition
/* file descriptor reduction */ /* file descriptor reduction */
size_t nextSmallOffset; size_t nextSmallOffset;
size_t nextLargeOffset; size_t nextLargeOffset;
};
// Options to control partition tree depth.
uint32_t currentPartitionTreeDepth;
uint32_t maxPartitionTreeDepth;
};
} // namespace joiner } // namespace joiner