diff --git a/dbcon/joblist/diskjoinstep.cpp b/dbcon/joblist/diskjoinstep.cpp index 956aa56ee..86e6fa886 100644 --- a/dbcon/joblist/diskjoinstep.cpp +++ b/dbcon/joblist/diskjoinstep.cpp @@ -399,12 +399,12 @@ void DiskJoinStep::joinFcn(const uint32_t threadID) RowGroup l_outputRG = outputRG; Row l_largeRow; Row l_joinFERow, l_outputRow, baseRow; - vector > joinMatches; + vector> joinMatches; auto new_row = new Row[1]; std::shared_ptr smallRowTemplates(new_row); vector> joiners; std::shared_ptr[]> colMappings, fergMappings; - boost::scoped_array > smallNullMem; + boost::scoped_array> smallNullMem; boost::scoped_array joinFEMem; Row smallNullRow; @@ -515,21 +515,20 @@ void DiskJoinStep::joinFcn(const uint32_t threadID) if (l_outputRG.getRowCount() == 8192) { - outputDL->insert(rgData); + outputResult(rgData); // cout << "inserting a full RG" << endl; if (thjs) { + // FIXME: Possible false positive. Something wrong with this calculation, just put a warning + // until fixed. if (!thjs->getMemory(l_outputRG.getMaxDataSize())) { + // FIXME: This is also looks wrong. // calculate guess of size required for error message uint64_t memReqd = (unmatched.size() * outputRG.getDataSize(1)) / 1048576; - Message::Args args; - args.add(memReqd); - args.add(thjs->resourceManager->getConfiguredUMMemLimit() / 1048576); - std::cerr << logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_RESULT_TOO_BIG, - args) - << " @" << __FILE__ << ":" << __LINE__; - throw logging::IDBExcept(logging::ERR_JOIN_RESULT_TOO_BIG, args); + uint64_t memLimit = thjs->resourceManager->getConfiguredUMMemLimit() / 1048576; + std::cerr << "DiskJoin::joinFcn() possible OOM for the join result, mem required: " + << memReqd << " mem limit: " << memLimit << std::endl; } } @@ -543,7 +542,7 @@ void DiskJoinStep::joinFcn(const uint32_t threadID) } if (l_outputRG.getRowCount()) - outputResult({rgData}); + outputResult(rgData); if (thjs) thjs->returnMemory(); } @@ -625,10 +624,16 @@ void DiskJoinStep::prepareJobs(const std::vector& joinPartitions void DiskJoinStep::outputResult(const std::vector& result) { std::lock_guard lk(outputMutex); - for (const auto &rgData : result) + for (const auto& rgData : result) outputDL->insert(rgData); } +void DiskJoinStep::outputResult(const rowgroup::RGData& result) +{ + std::lock_guard lk(outputMutex); + outputDL->insert(result); +} + void DiskJoinStep::spawnJobs(const std::vector>& joinPartitionsJobs, const uint32_t smallSideSizeLimitPerThread) { diff --git a/dbcon/joblist/diskjoinstep.h b/dbcon/joblist/diskjoinstep.h index 0a6f90586..1756fb46d 100644 --- a/dbcon/joblist/diskjoinstep.h +++ b/dbcon/joblist/diskjoinstep.h @@ -53,6 +53,7 @@ class DiskJoinStep : public JobStep void prepareJobs(const std::vector& joinPartitions, JoinPartitionJobs& joinPartitionsJobs); void outputResult(const std::vector& result); + void outputResult(const rowgroup::RGData& result); void spawnJobs(const std::vector>& joinPartitionsJobs, const uint32_t smallSideSizeLimitPerThread); boost::shared_ptr jp;