diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp
index 639054c06..d5fbf133e 100644
--- a/dbcon/joblist/tuplehashjoin.cpp
+++ b/dbcon/joblist/tuplehashjoin.cpp
@@ -282,8 +282,8 @@ void TupleHashJoinStep::startSmallRunners(uint index)
   }
   else
   {
-    joiners[index].reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index][0], largeSideKeys[index][0],
-                                         jt, &jobstepThreadPool, resourceManager, numCores));
+    joiners[index].reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index][0],
+                                         largeSideKeys[index][0], jt, &jobstepThreadPool, resourceManager, numCores));
   }
 
   joiners[index]->setUniqueLimit(uniqueLimit);
@@ -397,6 +397,33 @@ void TupleHashJoinStep::startSmallRunners(uint index)
   }
 }
 
+// Shared out-of-memory path for the small-side join runners: either flags the
+// join as too big and aborts the query (DML, system catalog, or disk join
+// disabled), or switches the joiner over to a disk-based join when allowed.
+void TupleHashJoinStep::outOfMemoryHandler(std::shared_ptr<joiner::TupleJoiner> joiner)
+{
+  boost::unique_lock<boost::mutex> sl(saneErrMsg);
+
+  if (cancelled())
+    return;
+  if (!allowDJS || isDML || (fSessionId & 0x80000000) || (tableOid() < 3000 && tableOid() >= 1000))
+  {
+    joinIsTooBig = true;
+    ostringstream oss;
+    oss << "(" << __LINE__ << ") " << logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG);
+    fLogger->logMessage(logging::LOG_TYPE_INFO, oss.str());
+    errorMessage(oss.str());
+    status(logging::ERR_JOIN_TOO_BIG);
+    cout << "Join is too big, raise the UM join limit for now (small runner)" << endl;
+    abort();
+  }
+  else if (allowDJS)
+  {
+    joiner->setConvertToDiskJoin();
+    // TODO RGData that triggers this path is lost. Need to store it to pass it along in the future.
+  }
+}
+
 /* Index is which small input to read. */
 void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t* jobs)
 {
@@ -420,6 +447,7 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t*
     ssize_t rgSize;
     bool gotMem;
     goto next;
+    // TODO need to quit this loop early if the on-disk flag is set by any of the small side threads.
     while (more && !cancelled())
     {
       smallRG.setData(&oneRG);
@@ -444,25 +472,28 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t*
          if the query can be converted to a disk-based join, do that.
          if disk join is enabled, use it.
          else abort.
       */
-      boost::unique_lock<boost::mutex> sl(saneErrMsg);
-      if (cancelled())
-        return;
-      if (!allowDJS || isDML || (fSessionId & 0x80000000) || (tableOid() < 3000 && tableOid() >= 1000))
-      {
-        joinIsTooBig = true;
-        ostringstream oss;
-        oss << "(" << __LINE__ << ") "
-            << logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG);
-        fLogger->logMessage(logging::LOG_TYPE_INFO, oss.str());
-        errorMessage(oss.str());
-        status(logging::ERR_JOIN_TOO_BIG);
-        cout << "Join is too big, raise the UM join limit for now (small runner)" << endl;
-        abort();
-      }
-      else if (allowDJS)
-        joiner->setConvertToDiskJoin();
+      // boost::unique_lock<boost::mutex> sl(saneErrMsg);
-      return;
+      // if (cancelled())
+      //   return;
+      // if (!allowDJS || isDML || (fSessionId & 0x80000000) || (tableOid() < 3000 && tableOid() >= 1000))
+      // {
+      //   joinIsTooBig = true;
+      //   ostringstream oss;
+      //   oss << "(" << __LINE__ << ") "
+      //       << logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG);
+      //   fLogger->logMessage(logging::LOG_TYPE_INFO, oss.str());
+      //   errorMessage(oss.str());
+      //   status(logging::ERR_JOIN_TOO_BIG);
+      //   cout << "Join is too big, raise the UM join limit for now (small runner)" << endl;
+      //   abort();
+      // }
+      // else if (allowDJS)
+      // {
+      //   joiner->setConvertToDiskJoin();
+      //   // TODO RGData that triggers this path is lost. Need to store it to pass it along in the future.
+      // }
+      return outOfMemoryHandler(joiner);
     }
     joiner->insertRGData(smallRG, threadID);
     if (!joiner->inUM() && (memUsedByEachJoin[index] > pmMemLimit))
@@ -483,19 +514,38 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t*
   }
   catch (std::bad_alloc& exc)
   {
-    if (!joinIsTooBig &&
-        (isDML || !allowDJS || (fSessionId & 0x80000000) || (tableOid() < 3000 && tableOid() >= 1000)))
-    {
-      joinIsTooBig = true;
-      ostringstream oss;
-      oss << "(" << __LINE__ << ") "
-          << logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG);
-      fLogger->logMessage(logging::LOG_TYPE_INFO, oss.str());
-      errorMessage(oss.str());
-      status(logging::ERR_JOIN_TOO_BIG);
-      cout << "Join is too big, raise the UM join limit for now" << endl;
-      abort();
-    }
+    // boost::unique_lock<boost::mutex> sl(saneErrMsg);
+
+    // if (!joinIsTooBig &&
+    //     (isDML || !allowDJS || (fSessionId & 0x80000000) || (tableOid() < 3000 && tableOid() >= 1000)))
+    // {
+    //   joinIsTooBig = true;
+    //   ostringstream oss;
+    //   oss << "(" << __LINE__ << ") "
+    //       << logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG);
+    //   fLogger->logMessage(logging::LOG_TYPE_INFO, oss.str());
+    //   errorMessage(oss.str());
+    //   status(logging::ERR_JOIN_TOO_BIG);
+    //   cout << "Join is too big, raise the UM join limit for now" << endl;
+    //   abort();
+    // }
+    // else if (allowDJS)
+    // {
+    //   joiner->setConvertToDiskJoin();
+    //   // RGData that triggers OOM is saved but the hash table is too big.
+    //   // TBD need to store RGData if counting allocator will be used for RGData.
+
+    //   // Need to clean hash tables and vec.
+    //   // joiner->clearData(); // `finished` flag can implicitly interfere.
+    // }
+    // return;
+
+    // RGData that triggers OOM is saved but the hash table is too big.
+    // TBD need to store RGData if counting allocator will be used for RGData.
+
+    // Need to clean hash tables and vec.
+    // joiner->clearData(); // `finished` flag can implicitly interfere.
+    return outOfMemoryHandler(joiner);
   }
   catch (...)
   {
diff --git a/dbcon/joblist/tuplehashjoin.h b/dbcon/joblist/tuplehashjoin.h
index 9b8adafe0..f65391e25 100644
--- a/dbcon/joblist/tuplehashjoin.h
+++ b/dbcon/joblist/tuplehashjoin.h
@@ -650,6 +650,7 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep
   bool stopMemTracking;
   void trackMem(uint index);
   void startSmallRunners(uint index);
+  void outOfMemoryHandler(std::shared_ptr<joiner::TupleJoiner> joiner);
 
   friend class DiskJoinStep;
 };
diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp
index a39459e95..fcff86b61 100644
--- a/primitives/primproc/batchprimitiveprocessor.cpp
+++ b/primitives/primproc/batchprimitiveprocessor.cpp
@@ -2227,8 +2227,9 @@ int BatchPrimitiveProcessor::operator()()
   validCPData = false;
   cpDataFromDictScan = false;
 
-  auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator();
-  messageqcpp::SBS bs(new ByteStream(allocator));
+  // auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator();
+  // messageqcpp::SBS bs(new ByteStream(allocator));
+  messageqcpp::SBS bs(new ByteStream());
 
 #ifdef PRIMPROC_STOPWATCH
   stopwatch->start("BPP() execute");
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 67578a98d..62b91a36e 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -81,6 +81,7 @@ if (WITH_UNITTESTS)
     target_link_libraries(counting_allocator ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES})
     gtest_add_tests(TARGET counting_allocator TEST_PREFIX columnstore:)
 
+    set_source_files_properties(poolallocator.cpp PROPERTIES COMPILE_FLAGS "-Wno-sign-compare")
     add_executable(poolallocator poolallocator.cpp)
     add_dependencies(poolallocator googletest)
     target_link_libraries(poolallocator ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES})
diff --git a/utils/joiner/tuplejoiner.cpp b/utils/joiner/tuplejoiner.cpp
index 1818a5373..8125d5d18 100644
--- a/utils/joiner/tuplejoiner.cpp
+++ b/utils/joiner/tuplejoiner.cpp
@@ -305,6 +305,7 @@ void TupleJoiner::bucketsToTables(buckets_t* buckets, hash_table_t* tables)
       wasProductive = true;
       buckets[i].clear();
     }
+    // TODO use a condition variable here instead of a busy sleep
     if (!done && !wasProductive)
       ::usleep(1000 * numCores);
   }
 }