diff --git a/dbcon/execplan/calpontselectexecutionplan.cpp b/dbcon/execplan/calpontselectexecutionplan.cpp
index 5f416b6fc..4354fd187 100644
--- a/dbcon/execplan/calpontselectexecutionplan.cpp
+++ b/dbcon/execplan/calpontselectexecutionplan.cpp
@@ -85,6 +85,8 @@ CalpontSelectExecutionPlan::CalpontSelectExecutionPlan(const int location)
   , fDJSSmallSideLimit(0)
   , fDJSLargeSideLimit(0)
   , fDJSPartitionSize(100 * 1024 * 1024)
+  , fDJSMaxPartitionTreeDepth(8)
+  , fDJSForceRun(false)
   ,  // 100MB mem usage for disk based join,
   fUMMemLimit(numeric_limits::max())
   , fIsDML(false)
@@ -457,6 +459,8 @@ void CalpontSelectExecutionPlan::serialize(messageqcpp::ByteStream& b) const
   b << fDJSSmallSideLimit;
   b << fDJSLargeSideLimit;
   b << fDJSPartitionSize;
+  b << fDJSMaxPartitionTreeDepth;
+  b << (uint8_t)fDJSForceRun;
   b << fUMMemLimit;
   b << (uint8_t)fIsDML;
   messageqcpp::ByteStream::octbyte timeZone = fTimeZone;
@@ -652,6 +656,9 @@ void CalpontSelectExecutionPlan::unserialize(messageqcpp::ByteStream& b)
   b >> fDJSSmallSideLimit;
   b >> fDJSLargeSideLimit;
   b >> fDJSPartitionSize;
+  b >> fDJSMaxPartitionTreeDepth;
+  b >> tmp8;  // read the flag as a byte, then convert, exactly like fIsDML below
+  fDJSForceRun = tmp8;
   b >> fUMMemLimit;
   b >> tmp8;
   fIsDML = tmp8;
diff --git a/dbcon/execplan/calpontselectexecutionplan.h b/dbcon/execplan/calpontselectexecutionplan.h
index b57889aff..992e2b41f 100644
--- a/dbcon/execplan/calpontselectexecutionplan.h
+++ b/dbcon/execplan/calpontselectexecutionplan.h
@@ -688,6 +688,24 @@ class CalpontSelectExecutionPlan : public CalpontExecutionPlan
     return fDJSPartitionSize;
   }
 
+  void djsMaxPartitionTreeDepth(uint32_t value)
+  {
+    fDJSMaxPartitionTreeDepth = value;
+  }
+  uint32_t djsMaxPartitionTreeDepth()
+  {
+    return fDJSMaxPartitionTreeDepth;
+  }
+
+  void djsForceRun(bool b)
+  {
+    fDJSForceRun = b;
+  }
+  bool djsForceRun()
+  {
+    return fDJSForceRun;
+  }
+
   void umMemLimit(uint64_t l)
   {
     fUMMemLimit = l;
@@ -920,6 +938,8 @@ class CalpontSelectExecutionPlan : public CalpontExecutionPlan
   uint64_t fDJSSmallSideLimit = 0;
   uint64_t fDJSLargeSideLimit = 0;
   uint64_t fDJSPartitionSize = 100 * 1024 * 1024;
+  uint32_t fDJSMaxPartitionTreeDepth = 8;
+  bool fDJSForceRun = false;
   int64_t fUMMemLimit = numeric_limits::max();
   bool fIsDML = false;
   long fTimeZone = 0;
diff --git a/dbcon/joblist/diskjoinstep.cpp b/dbcon/joblist/diskjoinstep.cpp
index aa84deea8..b83d9a6c6 100644
--- a/dbcon/joblist/diskjoinstep.cpp
+++ b/dbcon/joblist/diskjoinstep.cpp
@@ -81,6 +81,7 @@ DiskJoinStep::DiskJoinStep(TupleHashJoinStep* t, int djsIndex, int joinIndex, bo
   smallLimit = thjs->djsSmallLimit;
   largeLimit = thjs->djsLargeLimit;
   partitionSize = thjs->djsPartitionSize;
+  maxPartitionTreeDepth = thjs->djsMaxPartitionTreeDepth;
 
   if (smallLimit == 0)
     smallLimit = numeric_limits::max();
@@ -91,7 +92,7 @@ DiskJoinStep::DiskJoinStep(TupleHashJoinStep* t, int djsIndex, int joinIndex, bo
   uint64_t totalUMMemory = thjs->resourceManager->getConfiguredUMMemLimit();
   jp.reset(new JoinPartition(largeRG, smallRG, smallKeyCols, largeKeyCols, typeless,
                              (joinType & ANTI) && (joinType & MATCHNULLS), (bool)fe, totalUMMemory,
-                             partitionSize));
+                             partitionSize, maxPartitionTreeDepth));
 
   if (cancelled())
   {
@@ -163,7 +164,6 @@ void DiskJoinStep::smallReader()
   RGData rgData;
   bool more = true;
   int64_t memUsage = 0, combinedMemUsage = 0;
-  [[maybe_unused]] int rowCount = 0;
   RowGroup l_smallRG = smallRG;
 
   try
@@ -175,7 +175,6 @@ void DiskJoinStep::smallReader()
       if (more)
       {
         l_smallRG.setData(&rgData);
-        rowCount += l_smallRG.getRowCount();
         memUsage = jp->insertSmallSideRGData(rgData);
         combinedMemUsage = atomicops::atomicAdd(smallUsage.get(), memUsage);
 
@@ -224,7 +223,6 @@ void DiskJoinStep::largeReader()
   RGData rgData;
   bool more = true;
   int64_t largeSize = 0;
-  [[maybe_unused]] int rowCount = 0;
   RowGroup l_largeRG = largeRG;
 
   largeIterationCount++;
@@ -239,7 +237,6 @@ void DiskJoinStep::largeReader()
       if (more)
       {
         l_largeRG.setData(&rgData);
-        rowCount += l_largeRG.getRowCount();
 
         // cout << "large side raw data: " << largeRG.toString() << endl;
         largeSize += jp->insertLargeSideRGData(rgData);
@@ -268,22 +265,86 @@ void DiskJoinStep::largeReader()
 void DiskJoinStep::loadFcn()
 {
   boost::shared_ptr out;
-  bool ret;
+  std::vector joinPartitions;
+  // Collect all join partitions.
+  jp->collectJoinPartitions(joinPartitions);
+
+#ifdef DEBUG_DJS
+  cout << "Collected join partitions: " << endl;
+  for (uint32_t i = 0; i < joinPartitions.size(); ++i)
+    cout << joinPartitions[i]->getUniqueID() << ", ";
+  cout << endl;
+#endif
 
   try
   {
-    do
-    {
-      out.reset(new LoaderOutput());
-      ret = jp->getNextPartition(&out->smallData, &out->partitionID, &out->jp);
+    uint32_t partitionIndex = 0;
+    bool partitionDone = true;
 
-      if (ret)
+    // Iterate over partitions.
+    while (partitionIndex < joinPartitions.size() && !cancelled())
+    {
+      uint64_t currentSize = 0;
+      auto* joinPartition = joinPartitions[partitionIndex];
+      out.reset(new LoaderOutput());
+
+      if (partitionDone)
+        joinPartition->setNextSmallOffset(0);
+
+      while (true)
       {
-        // cout << "loaded partition " << out->partitionID << " smallData = " << out->smallData.size() <<
-        // endl;
-        loadFIFO->insert(out);
+        messageqcpp::ByteStream bs;
+        RowGroup rowGroup;
+        RGData rgData;
+
+        joinPartition->readByteStream(0, &bs);
+        if (!bs.length())
+        {
+          partitionDone = true;
+          break;
+        }
+
+        rgData.deserialize(bs);
+        rowGroup.setData(&rgData);
+
+        // Check that current `RowGroup` has rows.
+        if (!rowGroup.getRowCount())
+        {
+          partitionDone = true;
+          break;
+        }
+
+        currentSize += rowGroup.getRowCount() * rowGroup.getColumnCount() * 64;
+        out->smallData.push_back(rgData);
+
+        if (currentSize > partitionSize)
+        {
+#ifdef DEBUG_DJS
+          cout << "Memory limit exceeded for the partition: " << joinPartition->getUniqueID() << endl;
+          cout << "Current size: " << currentSize << " Memory limit: " << partitionSize << endl;
+#endif
+          partitionDone = false;
+          currentSize = 0;
+          break;
+        }
       }
-    } while (ret && !cancelled());
+
+      if (!out->smallData.size())
+      {
+        ++partitionIndex;
+        partitionDone = true;
+        continue;
+      }
+
+      // Initialize `LoaderOutput` and add it to `FIFO`.
+      out->partitionID = joinPartition->getUniqueID();
+      out->jp = joinPartition;
+      loadFIFO->insert(out);
+
+      // If this partition is done - take a next one.
+      if (partitionDone)
+        ++partitionIndex;
+    }
   }
   catch (...)
   {
diff --git a/dbcon/joblist/diskjoinstep.h b/dbcon/joblist/diskjoinstep.h
index a6817439f..e719acba1 100644
--- a/dbcon/joblist/diskjoinstep.h
+++ b/dbcon/joblist/diskjoinstep.h
@@ -148,6 +148,7 @@ class DiskJoinStep : public JobStep
   int64_t smallLimit;
   int64_t largeLimit;
   uint64_t partitionSize;
+  uint32_t maxPartitionTreeDepth;
 
   void reportStats();
 
diff --git a/dbcon/joblist/jlf_common.h b/dbcon/joblist/jlf_common.h
index a2c7a4881..3073be233 100644
--- a/dbcon/joblist/jlf_common.h
+++ b/dbcon/joblist/jlf_common.h
@@ -364,6 +364,8 @@ struct JobInfo
   int64_t smallSideLimit;  // need to get these from a session var in execplan
   int64_t largeSideLimit;
   uint64_t partitionSize;
+  uint32_t djsMaxPartitionTreeDepth;
+  bool djsForceRun;
   bool isDML;
   long timeZone;
 
diff --git a/dbcon/joblist/joblistfactory.cpp b/dbcon/joblist/joblistfactory.cpp
index 11308612e..e48d5a02d 100644
--- a/dbcon/joblist/joblistfactory.cpp
+++ b/dbcon/joblist/joblistfactory.cpp
@@ -2066,6 +2066,8 @@ SJLP makeJobList_(CalpontExecutionPlan* cplan, ResourceManager* rm,
     jobInfo.smallSideLimit = csep->djsSmallSideLimit();
     jobInfo.largeSideLimit = csep->djsLargeSideLimit();
     jobInfo.partitionSize = csep->djsPartitionSize();
+    jobInfo.djsMaxPartitionTreeDepth = csep->djsMaxPartitionTreeDepth();
+    jobInfo.djsForceRun = csep->djsForceRun();
     jobInfo.umMemLimit.reset(new int64_t);
     *(jobInfo.umMemLimit) = csep->umMemLimit();
     jobInfo.isDML = csep->isDML();
diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp
index 4d9959d8d..f6b154114 100644
--- a/dbcon/joblist/tuplehashjoin.cpp
+++ b/dbcon/joblist/tuplehashjoin.cpp
@@ -106,6 +106,8 @@ TupleHashJoinStep::TupleHashJoinStep(const JobInfo& jobInfo)
   djsSmallLimit = jobInfo.smallSideLimit;
   djsLargeLimit = jobInfo.largeSideLimit;
   djsPartitionSize = jobInfo.partitionSize;
+  djsMaxPartitionTreeDepth = jobInfo.djsMaxPartitionTreeDepth;
+  djsForceRun = jobInfo.djsForceRun;
   isDML = jobInfo.isDML;
 
   config::Config* config = config::Config::makeConfig();
@@ -1971,53 +1973,55 @@ void TupleHashJoinStep::segregateJoiners()
     return;
   }
 
-  /* If they are all inner joins they can be segregated w/o respect to
-     ordering; if they're not, the ordering has to stay consistent therefore
-     the first joiner that isn't finished and everything after has to be
-     done by DJS. */
-
-  if (allInnerJoins)
+  // Force all joins into disk based.
+  if (djsForceRun)
   {
-    for (i = 0; i < smallSideCount; i++)
+    for (i = 0; i < smallSideCount; ++i)
     {
-      // if (joiners[i]->isFinished() && (rand() % 2)) { // for debugging
-      if (joiners[i]->isFinished())
-      {
-        // cout << "1joiner " << i << " " << hex << (uint64_t) joiners[i].get() << dec << " -> TBPS" << endl;
-        tbpsJoiners.push_back(joiners[i]);
-      }
-      else
-      {
-        joinIsTooBig = true;
-        joiners[i]->setConvertToDiskJoin();
-        // cout << "1joiner " << i << " " << hex << (uint64_t) joiners[i].get() << dec << " -> DJS" << endl;
-        djsJoiners.push_back(joiners[i]);
-        djsJoinerMap.push_back(i);
-      }
+      joinIsTooBig = true;
+      joiners[i]->setConvertToDiskJoin();
+      djsJoiners.push_back(joiners[i]);
+      djsJoinerMap.push_back(i);
     }
   }
   else
   {
-    // uint limit = rand() % smallSideCount;
-    for (i = 0; i < smallSideCount; i++)
+    /* If they are all inner joins they can be segregated w/o respect to
+       ordering; if they're not, the ordering has to stay consistent therefore
+       the first joiner that isn't finished and everything after has to be
+       done by DJS. */
+    if (allInnerJoins)
     {
-      // if (joiners[i]->isFinished() && i < limit) { // debugging
-      if (joiners[i]->isFinished())
+      for (i = 0; i < smallSideCount; i++)
       {
-        // cout << "2joiner " << i << " " << hex << (uint64_t) joiners[i].get() << dec << " -> TBPS" << endl;
-        tbpsJoiners.push_back(joiners[i]);
+        if (joiners[i]->isFinished())
+          tbpsJoiners.push_back(joiners[i]);
+        else
+        {
+          joinIsTooBig = true;
+          joiners[i]->setConvertToDiskJoin();
+          djsJoiners.push_back(joiners[i]);
+          djsJoinerMap.push_back(i);
+        }
       }
-      else
-        break;
     }
-
-    for (; i < smallSideCount; i++)
+    else
     {
-      joinIsTooBig = true;
-      joiners[i]->setConvertToDiskJoin();
-      // cout << "2joiner " << i << " " << hex << (uint64_t) joiners[i].get() << dec << " -> DJS" << endl;
-      djsJoiners.push_back(joiners[i]);
-      djsJoinerMap.push_back(i);
+      for (i = 0; i < smallSideCount; i++)
+      {
+        if (joiners[i]->isFinished())
+          tbpsJoiners.push_back(joiners[i]);
+        else
+          break;
+      }
+
+      for (; i < smallSideCount; i++)
+      {
+        joinIsTooBig = true;
+        joiners[i]->setConvertToDiskJoin();
+        djsJoiners.push_back(joiners[i]);
+        djsJoinerMap.push_back(i);
+      }
     }
   }
 }
diff --git a/dbcon/joblist/tuplehashjoin.h b/dbcon/joblist/tuplehashjoin.h
index 914e230d8..a1e2fb2a1 100644
--- a/dbcon/joblist/tuplehashjoin.h
+++ b/dbcon/joblist/tuplehashjoin.h
@@ -620,6 +620,8 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep
   int64_t djsSmallLimit;
   int64_t djsLargeLimit;
   uint64_t djsPartitionSize;
+  uint32_t djsMaxPartitionTreeDepth;
+  bool djsForceRun;
   bool isDML;
   bool allowDJS;
 
diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp
index da5250144..290c5f101 100644
--- a/dbcon/mysql/ha_mcs_execplan.cpp
+++ b/dbcon/mysql/ha_mcs_execplan.cpp
@@ -6640,7 +6640,8 @@ void setExecutionParams(gp_walk_info& gwi, SCSEP& csep)
   csep->djsSmallSideLimit(get_diskjoin_smallsidelimit(gwi.thd) * 1024ULL * 1024);
   csep->djsLargeSideLimit(get_diskjoin_largesidelimit(gwi.thd) * 1024ULL * 1024);
   csep->djsPartitionSize(get_diskjoin_bucketsize(gwi.thd) * 1024ULL * 1024);
-
+  csep->djsMaxPartitionTreeDepth(get_diskjoin_max_partition_tree_depth(gwi.thd));
+  csep->djsForceRun(get_diskjoin_force_run(gwi.thd));
   if (get_um_mem_limit(gwi.thd) == 0)
     csep->umMemLimit(numeric_limits::max());
   else
diff --git a/dbcon/mysql/ha_mcs_sysvars.cpp b/dbcon/mysql/ha_mcs_sysvars.cpp
index 9eecc6d73..d597e772e 100644
--- a/dbcon/mysql/ha_mcs_sysvars.cpp
+++ b/dbcon/mysql/ha_mcs_sysvars.cpp
@@ -135,6 +135,12 @@ static MYSQL_THDVAR_ULONG(diskjoin_bucketsize, PLUGIN_VAR_RQCMDARG,
                           "The maximum size in MB of each 'small side' table in memory.", NULL, NULL, 100,
                           1, ~0U, 1);
 
+static MYSQL_THDVAR_ULONG(diskjoin_max_partition_tree_depth, PLUGIN_VAR_RQCMDARG,
+                          "The maximum size of partition tree depth.", NULL, NULL, 8, 1, ~0U, 1);
+
+static MYSQL_THDVAR_BOOL(diskjoin_force_run, PLUGIN_VAR_RQCMDARG, "Force run for the disk join step.", NULL,
+                         NULL, 0);
+
 static MYSQL_THDVAR_ULONG(um_mem_limit, PLUGIN_VAR_RQCMDARG,
                           "Per user Memory limit(MB). Switch to disk-based JOIN when limit is reached", NULL,
                           NULL, 0, 0, ~0U, 1);
@@ -224,6 +230,8 @@ st_mysql_sys_var* mcs_system_variables[] = {MYSQL_SYSVAR(compression_type),
                                             MYSQL_SYSVAR(diskjoin_smallsidelimit),
                                             MYSQL_SYSVAR(diskjoin_largesidelimit),
                                             MYSQL_SYSVAR(diskjoin_bucketsize),
+                                            MYSQL_SYSVAR(diskjoin_max_partition_tree_depth),
+                                            MYSQL_SYSVAR(diskjoin_force_run),
                                             MYSQL_SYSVAR(um_mem_limit),
                                             MYSQL_SYSVAR(double_for_decimal_math),
                                             MYSQL_SYSVAR(decimal_overflow_check),
@@ -420,6 +428,24 @@ void set_diskjoin_bucketsize(THD* thd, ulong value)
   THDVAR(thd, diskjoin_bucketsize) = value;
 }
 
+ulong get_diskjoin_max_partition_tree_depth(THD* thd)
+{
+  return (thd == NULL) ? 0 : THDVAR(thd, diskjoin_max_partition_tree_depth);
+}
+void set_diskjoin_max_partition_tree_depth(THD* thd, ulong value)
+{
+  THDVAR(thd, diskjoin_max_partition_tree_depth) = value;
+}
+
+bool get_diskjoin_force_run(THD* thd)
+{
+  return (thd == NULL) ? 0 : THDVAR(thd, diskjoin_force_run);
+}
+void set_diskjoin_force_run(THD* thd, bool value)
+{
+  THDVAR(thd, diskjoin_force_run) = value;
+}
+
 ulong get_um_mem_limit(THD* thd)
 {
   return (thd == NULL) ? 0 : THDVAR(thd, um_mem_limit);
@@ -598,4 +624,4 @@ const char* get_s3_region(THD* thd)
 void set_s3_region(THD* thd, char* value)
 {
   THDVAR(thd, s3_region) = value;
-}
\ No newline at end of file
+}
diff --git a/dbcon/mysql/ha_mcs_sysvars.h b/dbcon/mysql/ha_mcs_sysvars.h
index ec116987d..03abc67e2 100644
--- a/dbcon/mysql/ha_mcs_sysvars.h
+++ b/dbcon/mysql/ha_mcs_sysvars.h
@@ -108,6 +108,12 @@ void set_diskjoin_largesidelimit(THD* thd, ulong value);
 ulong get_diskjoin_bucketsize(THD* thd);
 void set_diskjoin_bucketsize(THD* thd, ulong value);
 
+bool get_diskjoin_force_run(THD* thd);
+void set_diskjoin_force_run(THD* thd, bool value);
+
+ulong get_diskjoin_max_partition_tree_depth(THD* thd);
+void set_diskjoin_max_partition_tree_depth(THD* thd, ulong value);
+
 ulong get_um_mem_limit(THD* thd);
 void set_um_mem_limit(THD* thd, ulong value);
 
diff --git a/utils/joiner/joinpartition.cpp b/utils/joiner/joinpartition.cpp
index 1b9d414a4..7108a9407 100644
--- a/utils/joiner/joinpartition.cpp
+++ b/utils/joiner/joinpartition.cpp
@@ -34,6 +34,7 @@ using namespace logging;
 
 namespace joiner
 {
+// FIXME: Possible overflow, we have to null it after clearing files.
 uint64_t uniqueNums = 0;
 
 JoinPartition::JoinPartition()
@@ -44,7 +45,7 @@ JoinPartition::JoinPartition()
 /* This is the ctor used by THJS */
 JoinPartition::JoinPartition(const RowGroup& lRG, const RowGroup& sRG, const vector& smallKeys,
                              const vector& largeKeys, bool typeless, bool antiWMN, bool hasFEFilter,
-                             uint64_t totalUMMemory, uint64_t partitionSize)
+                             uint64_t totalUMMemory, uint64_t partitionSize, uint32_t maxPartitionTreeDepth)
   : smallRG(sRG)
   , largeRG(lRG)
   , smallKeyCols(smallKeys)
@@ -54,6 +55,7 @@ JoinPartition::JoinPartition(const RowGroup& lRG, const RowGroup& sRG, const vec
   , htSizeEstimate(0)
   , htTargetSize(partitionSize)
   , rootNode(true)
+  , canSplit(true)
   , antiWithMatchNulls(antiWMN)
   , needsAllNullRows(hasFEFilter)
   , gotNullRow(false)
@@ -63,6 +65,8 @@ JoinPartition::JoinPartition(const RowGroup& lRG, const RowGroup& sRG, const vec
   , maxSmallSize(0)
   , nextSmallOffset(0)
   , nextLargeOffset(0)
+  , currentPartitionTreeDepth(0)
+  , maxPartitionTreeDepth(maxPartitionTreeDepth)
 {
   config::Config* config = config::Config::makeConfig();
   string cfgTxt;
@@ -105,20 +109,17 @@ JoinPartition::JoinPartition(const RowGroup& lRG, const RowGroup& sRG, const vec
   }
 
   if (compressionType == "LZ4")
-  {
     compressor.reset(new compress::CompressInterfaceLZ4());
-  }
   else
-  {
     compressor.reset(new compress::CompressInterfaceSnappy());
-  }
 
   for (uint32_t i = 0; i < bucketCount; i++)
-    buckets.push_back(boost::shared_ptr(new JoinPartition(*this, false)));
+    buckets.push_back(
+        boost::shared_ptr(new JoinPartition(*this, false, currentPartitionTreeDepth + 1)));
 }
 
 /* Ctor used by JoinPartition on expansion, creates JP's in filemode */
-JoinPartition::JoinPartition(const JoinPartition& jp, bool splitMode)
+JoinPartition::JoinPartition(const JoinPartition& jp, bool splitMode, uint32_t currentPartitionTreeDepth)
   : smallRG(jp.smallRG)
   , largeRG(jp.largeRG)
   , smallKeyCols(jp.smallKeyCols)
@@ -131,6 +132,7 @@ JoinPartition::JoinPartition(const JoinPartition& jp, bool splitMode)
   , htSizeEstimate(0)
   , htTargetSize(jp.htTargetSize)
   , rootNode(false)
+  , canSplit(true)
   , antiWithMatchNulls(jp.antiWithMatchNulls)
   , needsAllNullRows(jp.needsAllNullRows)
   , gotNullRow(false)
@@ -141,17 +143,12 @@ JoinPartition::JoinPartition(const JoinPartition& jp, bool splitMode)
   , maxSmallSize(0)
   , nextSmallOffset(0)
   , nextLargeOffset(0)
+  , currentPartitionTreeDepth(currentPartitionTreeDepth)
+  , maxPartitionTreeDepth(jp.maxPartitionTreeDepth)
 {
   ostringstream os;
 
   fileMode = true;
-  // tuning issue: with the defaults, each 100MB bucket would split s.t. the children
-  // could store another 4GB total. Given a good hash and evenly distributed data,
-  // the first level of expansion would happen for all JPs at once, giving a total
-  // capacity of (4GB * 40) = 160GB, when actual usage at that point is a little over 4GB.
-  // Instead, each will double in size, giving a capacity of 8GB -> 16 -> 32, and so on.
-  // bucketCount = jp.bucketCount;
-  bucketCount = 2;
   config::Config* config = config::Config::makeConfig();
   filenamePrefix = config->getTempFileDir(config::Config::TempDirPurpose::Joins);
 
@@ -270,9 +267,6 @@ int64_t JoinPartition::doneInsertingSmallData()
     smallSizeOnDisk += leafNodeIncrement;
   }
 
-  // else
-  // cout << uniqueID << " htsizeestimate = " << htSizeEstimate << endl;
-
   if (!rootNode)
   {
     buffer.reinit(largeRG);
@@ -314,25 +308,92 @@ int64_t JoinPartition::doneInsertingLargeData()
   return ret;
 }
 
+bool JoinPartition::canConvertToSplitMode()
+{
+  // Never split past the configured maximum partition tree depth.
+  if (!canSplit || currentPartitionTreeDepth >= maxPartitionTreeDepth)
+    return false;
+
+  ByteStream bs;
+  RowGroup& rg = smallRG;
+  Row& row = smallRow;
+  RGData rgData;
+  uint64_t totalRowCount = 0;
+  std::unordered_map rowDist;
+
+  nextSmallOffset = 0;
+  while (1)
+  {
+    uint32_t hash;
+    readByteStream(0, &bs);
+
+    if (bs.length() == 0)
+      break;
+
+    rgData.deserialize(bs);
+    rg.setData(&rgData);
+
+    for (uint32_t j = 0, e = rg.getRowCount(); j < e; ++j)
+    {
+      rg.getRow(j, &row);
+
+      if (antiWithMatchNulls && hasNullJoinColumn(row))
+        continue;
+
+      uint64_t tmp;
+      if (typelessJoin)
+        hash = getHashOfTypelessKey(row, smallKeyCols, hashSeed) % bucketCount;
+      else
+      {
+        if (UNLIKELY(row.isUnsigned(smallKeyCols[0])))
+          tmp = row.getUintField(smallKeyCols[0]);
+        else
+          tmp = row.getIntField(smallKeyCols[0]);
+
+        hash = hasher((char*)&tmp, 8, hashSeed);
+        hash = hasher.finalize(hash, 8) % bucketCount;
+      }
+
+      totalRowCount++;
+      rowDist[hash]++;
+    }
+  }
+
+  for (const auto& [hash, currentRowCount] : rowDist)
+  {
+    if (currentRowCount == totalRowCount)
+    {
+      canSplit = false;
+      break;
+    }
+  }
+
+  rg.setData(&buffer);
+  rg.resetRowGroup(0);
+  rg.getRow(0, &row);
+
+  return canSplit;
+}
+
 int64_t JoinPartition::convertToSplitMode()
 {
-  int i, j;
+#ifdef DEBUG_DJS
+  cout << "Convert to split mode " << endl;
+#endif
   ByteStream bs;
   RGData rgData;
   uint32_t hash;
   uint64_t tmp;
   int64_t ret = -(int64_t)smallSizeOnDisk;  // smallFile gets deleted
-  boost::scoped_array rowDist(new uint32_t[bucketCount]);
-  uint32_t rowCount = 0;
-
-  memset(rowDist.get(), 0, sizeof(uint32_t) * bucketCount);
   fileMode = false;
 
+  htSizeEstimate = 0;
   smallSizeOnDisk = 0;
-  buckets.reserve(bucketCount);
 
-  for (i = 0; i < (int)bucketCount; i++)
-    buckets.push_back(boost::shared_ptr(new JoinPartition(*this, false)));
+  buckets.reserve(bucketCount);
+  for (uint32_t i = 0; i < bucketCount; i++)
+    buckets.push_back(
+        boost::shared_ptr(new JoinPartition(*this, false, currentPartitionTreeDepth + 1)));
 
   RowGroup& rg = smallRG;
   Row& row = smallRow;
@@ -348,7 +409,7 @@ int64_t JoinPartition::convertToSplitMode()
     rgData.deserialize(bs);
     rg.setData(&rgData);
 
-    for (j = 0; j < (int)rg.getRowCount(); j++)
+    for (uint32_t j = 0; j < rg.getRowCount(); j++)
     {
       rg.getRow(j, &row);
 
@@ -356,7 +417,7 @@ int64_t JoinPartition::convertToSplitMode()
       {
         if (needsAllNullRows || !gotNullRow)
         {
-          for (j = 0; j < (int)bucketCount; j++)
-            ret += buckets[j]->insertSmallSideRow(row);
+          for (uint32_t k = 0; k < bucketCount; k++)  // own index: must not clobber the row index `j`
+            ret += buckets[k]->insertSmallSideRow(row);
 
           gotNullRow = true;
@@ -377,20 +438,14 @@ int64_t JoinPartition::convertToSplitMode()
         hash = hasher((char*)&tmp, 8, hashSeed);
         hash = hasher.finalize(hash, 8) % bucketCount;
       }
-
-      rowCount++;
-      rowDist[hash]++;
-      ret += buckets[hash]->insertSmallSideRow(row);
+      buckets[hash]->insertSmallSideRow(row);
     }
   }
 
+
   boost::filesystem::remove(smallFilename);
   smallFilename.clear();
 
-  for (i = 0; i < (int)bucketCount; i++)
-    if (rowDist[i] == rowCount)
-      throw IDBExcept("All rows hashed to the same bucket", ERR_DBJ_DATA_DISTRIBUTION);
-
   rg.setData(&buffer);
   rg.resetRowGroup(0);
   rg.getRow(0, &row);
@@ -418,30 +473,18 @@ int64_t JoinPartition::processSmallBuffer(RGData& rgData)
   int64_t ret = 0;
 
   rg.setData(&rgData);
-  // if (rootNode)
-  // cout << "smallside RGData: " << rg.toString() << endl;
 
   if (fileMode)
   {
     ByteStream bs;
     rg.serializeRGData(bs);
-    // cout << "writing RGData: " << rg.toString() << endl;
 
     ret = writeByteStream(0, bs);
-    // cout << "wrote " << ret << " bytes" << endl;
 
-    /* Check whether this partition is now too big -> convert to split mode.
-
-    The current estimate is based on 100M 4-byte rows = 4GB. The total size is
-    the amount stored in RowGroups in mem + the size of the hash table. The RowGroups
-    in that case use 600MB, so 3.4GB is used by the hash table. 3.4GB/100M rows = 34 bytes/row
-    */
-    htSizeEstimate += rg.getDataSize() + (34 * rg.getRowCount());
-
-    if (htSizeEstimate > htTargetSize)
+    htSizeEstimate += rg.getRowCount() * rg.getColumnCount() * 64;
+    // Check whether this partition is now too big -> convert to split mode.
+    if (htTargetSize < htSizeEstimate && canConvertToSplitMode())
       ret += convertToSplitMode();
-
-    // cout << "wrote some data, returning " << ret << endl;
   }
   else
   {
@@ -478,19 +521,16 @@ int64_t JoinPartition::processSmallBuffer(RGData& rgData)
         hash = hasher.finalize(hash, 8) % bucketCount;
       }
 
-      // cout << "hashing smallside row: " << row.toString() << endl;
       ret += buckets[hash]->insertSmallSideRow(row);
     }
-
-    // cout << "distributed rows, returning " << ret << endl;
   }
 
   smallSizeOnDisk += ret;
   return ret;
 }
 
-/* the difference between processSmall & processLarge is mostly the names of
-   variables being small* -> large*, template? */
+// the difference between processSmall & processLarge is mostly the names of
+// variables being small* -> large*, template?
 
 int64_t JoinPartition::processLargeBuffer()
 {
@@ -511,11 +551,8 @@ int64_t JoinPartition::processLargeBuffer(RGData& rgData)
 
   rg.setData(&rgData);
 
-  // if (rootNode)
-  // cout << "largeside RGData: " << rg.toString() << endl;
-
-  /* Need to fail a query with an anti join, an FE filter, and a NULL row on the
-     large side b/c it needs to be joined with the entire small side table. */
+  // Need to fail a query with an anti join, an FE filter, and a NULL row on the
+  // large side b/c it needs to be joined with the entire small side table.
   if (antiWithMatchNulls && needsAllNullRows)
   {
     rg.getRow(0, &row);
@@ -534,9 +571,7 @@ int64_t JoinPartition::processLargeBuffer(RGData& rgData)
   {
     ByteStream bs;
     rg.serializeRGData(bs);
-    // cout << "writing large RGData: " << rg.toString() << endl;
     ret = writeByteStream(1, bs);
-    // cout << "wrote " << ret << " bytes" << endl;
   }
   else
   {
@@ -560,7 +595,6 @@ int64_t JoinPartition::processLargeBuffer(RGData& rgData)
         hash = hasher.finalize(hash, 8) % bucketCount;
       }
 
-      // cout << "large side hashing row: " << row.toString() << endl;
       ret += buckets[hash]->insertLargeSideRow(row);
     }
   }
@@ -569,49 +603,18 @@ int64_t JoinPartition::processLargeBuffer(RGData& rgData)
   return ret;
 }
 
-bool JoinPartition::getNextPartition(vector* smallData, uint64_t* partitionID, JoinPartition** jp)
+void JoinPartition::collectJoinPartitions(std::vector& joinPartitions)
 {
   if (fileMode)
   {
-    ByteStream bs;
-    RGData rgData;
-
-    if (nextPartitionToReturn > 0)
-      return false;
-
-    // cout << "reading the small side" << endl;
-    nextSmallOffset = 0;
-
-    while (1)
-    {
-      readByteStream(0, &bs);
-
-      if (bs.length() == 0)
-        break;
-
-      rgData.deserialize(bs);
-      // smallRG.setData(&rgData);
-      // cout << "read a smallRG with " << smallRG.getRowCount() << " rows" << endl;
-      smallData->push_back(rgData);
-    }
-
-    nextPartitionToReturn = 1;
-    *partitionID = uniqueID;
-    *jp = this;
-    return true;
+    joinPartitions.push_back(this);
+    return;
   }
 
-  bool ret = false;
-
-  while (!ret && nextPartitionToReturn < bucketCount)
+  for (uint32_t currentBucket = 0; currentBucket < bucketCount; ++currentBucket)
   {
-    ret = buckets[nextPartitionToReturn]->getNextPartition(smallData, partitionID, jp);
-
-    if (!ret)
-      nextPartitionToReturn++;
+    buckets[currentBucket]->collectJoinPartitions(joinPartitions);
   }
-
-  return ret;
 }
 
 boost::shared_ptr JoinPartition::getNextLargeRGData()
@@ -627,10 +630,7 @@ boost::shared_ptr JoinPartition::getNextLargeRGData()
     ret->deserialize(bs);
   }
   else
-  {
-    boost::filesystem::remove(largeFilename);
-    largeSizeOnDisk = 0;
-  }
+    nextLargeOffset = 0;
 
   return ret;
 }
@@ -682,7 +682,6 @@ void JoinPartition::initForLargeSideFeed()
 
 void JoinPartition::saveSmallSidePartition(vector& rgData)
 {
-  // cout << "JP: saving partition: " << id << endl;
   htSizeEstimate = 0;
   smallSizeOnDisk = 0;
   nextSmallOffset = 0;
@@ -806,7 +805,7 @@ uint64_t JoinPartition::writeByteStream(int which, ByteStream& bs)
 
   if (!useCompression)
   {
-    ret = len + 4;
+    ret = len + sizeof(len);
     fs.write((char*)&len, sizeof(len));
     fs.write((char*)bs.buf(), len);
     saveErrno = errno;
@@ -828,7 +827,7 @@ uint64_t JoinPartition::writeByteStream(int which, ByteStream& bs)
     boost::scoped_array compressed(new uint8_t[maxSize]);
 
     compressor->compress((char*)bs.buf(), len, (char*)compressed.get(), &actualSize);
-    ret = actualSize + 4 + 8;  // sizeof (size_t) == 8. Why 4?
+    ret = actualSize + sizeof(actualSize) + sizeof(len);  // payload + both size headers written below
     fs.write((char*)&actualSize, sizeof(actualSize));
     // Save uncompressed len.
     fs.write((char*)&len, sizeof(len));
@@ -843,7 +842,7 @@ uint64_t JoinPartition::writeByteStream(int which, ByteStream& bs)
       throw IDBExcept(os.str().c_str(), ERR_DBJ_FILE_IO_ERROR);
     }
 
-    totalBytesWritten += sizeof(actualSize) + actualSize;
+    totalBytesWritten += sizeof(actualSize) + sizeof(len) + actualSize;
   }
 
   bs.advance(len);
diff --git a/utils/joiner/joinpartition.h b/utils/joiner/joinpartition.h
index 11f341aef..928cd7e36 100644
--- a/utils/joiner/joinpartition.h
+++ b/utils/joiner/joinpartition.h
@@ -33,8 +33,8 @@ class JoinPartition
   JoinPartition(const rowgroup::RowGroup& largeRG, const rowgroup::RowGroup& smallRG,
                 const std::vector& smallkeyCols, const std::vector& largeKeyCols, bool typeless,
                 bool isAntiWithMatchNulls, bool hasFEFilter, uint64_t totalUMMemory,
-                uint64_t partitionSize);
-  JoinPartition(const JoinPartition&, bool splitMode);
+                uint64_t partitionSize, uint32_t maxPartitionTreeDepth);
+  JoinPartition(const JoinPartition&, bool splitMode, uint32_t depth);
 
   virtual ~JoinPartition();
 
@@ -52,6 +52,6 @@ class JoinPartition
-  /* Returns true if there are more partitions to fetch, false otherwise */
-  bool getNextPartition(std::vector* smallData, uint64_t* partitionID, JoinPartition** jp);
+  /* Appends every file-mode (leaf) partition reachable from this node to joinPartitions */
+  void collectJoinPartitions(std::vector& joinPartitions);
 
   boost::shared_ptr getNextLargeRGData();
 
   /* It's important to follow the sequence of operations to maintain the correct
@@ -100,11 +100,21 @@ class JoinPartition
   {
     return maxSmallSize;
   }
+  void readByteStream(int which, messageqcpp::ByteStream* bs);
+  uint64_t getUniqueID()
+  {
+    return uniqueID;
+  }
+  void setNextSmallOffset(size_t offset)
+  {
+    nextSmallOffset = offset;
+  }
 
  protected:
  private:
   void initBuffers();
   int64_t convertToSplitMode();
+  bool canConvertToSplitMode();
   int64_t processSmallBuffer();
   int64_t processLargeBuffer();
 
@@ -137,7 +147,7 @@ class JoinPartition
   uint64_t largeSizeOnDisk;
   utils::Hasher_r hasher;
   bool rootNode;
-
+  bool canSplit;
   /* Not-in antijoin hack.  A small-side row with a null join column has to go into every partition or
   into one always resident partition (TBD).
 
@@ -148,7 +158,6 @@ class JoinPartition
   bool hasNullJoinColumn(rowgroup::Row&);
 
   // which = 0 -> smallFile, which = 1 -> largeFile
-  void readByteStream(int which, messageqcpp::ByteStream* bs);
   uint64_t writeByteStream(int which, messageqcpp::ByteStream& bs);
 
   /* Compression support */
@@ -163,6 +172,9 @@ class JoinPartition
   /* file descriptor reduction */
   size_t nextSmallOffset;
   size_t nextLargeOffset;
-};
 
+  // Options to control partition tree depth.
+  uint32_t currentPartitionTreeDepth;
+  uint32_t maxPartitionTreeDepth;
+};
 }  // namespace joiner