1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

feat(): TupleHashJoin now handles bad_alloc case switching to disk-based if it is enabled

This commit is contained in:
drrtuy
2025-01-22 15:17:25 +00:00
parent f594d27685
commit 1aa2f3a42b
5 changed files with 110 additions and 35 deletions

View File

@ -282,8 +282,8 @@ void TupleHashJoinStep::startSmallRunners(uint index)
} }
else else
{ {
joiners[index].reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index][0], largeSideKeys[index][0], joiners[index].reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index][0],
jt, &jobstepThreadPool, resourceManager, numCores)); largeSideKeys[index][0], jt, &jobstepThreadPool, resourceManager, numCores));
} }
joiners[index]->setUniqueLimit(uniqueLimit); joiners[index]->setUniqueLimit(uniqueLimit);
@ -397,6 +397,54 @@ void TupleHashJoinStep::startSmallRunners(uint index)
} }
} }
void TupleHashJoinStep::outOfMemoryHandler(std::shared_ptr<joiner::TupleJoiner> joiner)
{
boost::unique_lock<boost::mutex> sl(saneErrMsg);
if (cancelled())
return;
if (!allowDJS || isDML || (fSessionId & 0x80000000) || (tableOid() < 3000 && tableOid() >= 1000))
{
joinIsTooBig = true;
ostringstream oss;
oss << "(" << __LINE__ << ") " << logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG);
fLogger->logMessage(logging::LOG_TYPE_INFO, oss.str());
errorMessage(oss.str());
status(logging::ERR_JOIN_TOO_BIG);
cout << "Join is too big, raise the UM join limit for now (small runner)" << endl;
abort();
}
else if (allowDJS)
{
joiner->setConvertToDiskJoin();
// TODO RGData that triggers this path is lost. Need to store it to pass it future.
}
}
void TupleHashJoinStep::outOfMemoryHandler(std::shared_ptr<joiner::TupleJoiner> joiner)
{
boost::unique_lock<boost::mutex> sl(saneErrMsg);
if (cancelled())
return;
if (!allowDJS || isDML || (fSessionId & 0x80000000) || (tableOid() < 3000 && tableOid() >= 1000))
{
joinIsTooBig = true;
ostringstream oss;
oss << "(" << __LINE__ << ") " << logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG);
fLogger->logMessage(logging::LOG_TYPE_INFO, oss.str());
errorMessage(oss.str());
status(logging::ERR_JOIN_TOO_BIG);
cout << "Join is too big, raise the UM join limit for now (small runner)" << endl;
abort();
}
else if (allowDJS)
{
joiner->setConvertToDiskJoin();
// TODO RGData that triggers this path is lost. Need to store it to pass it future.
}
}
/* Index is which small input to read. */ /* Index is which small input to read. */
void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t* jobs) void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t* jobs)
{ {
@ -420,6 +468,7 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t*
ssize_t rgSize; ssize_t rgSize;
bool gotMem; bool gotMem;
goto next; goto next;
// TODO need to quit this loop early of on-disk flag is set by any of the small size threads.
while (more && !cancelled()) while (more && !cancelled())
{ {
smallRG.setData(&oneRG); smallRG.setData(&oneRG);
@ -444,25 +493,28 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t*
if disk join is enabled, use it. if disk join is enabled, use it.
else abort. else abort.
*/ */
boost::unique_lock<boost::mutex> sl(saneErrMsg); // boost::unique_lock<boost::mutex> sl(saneErrMsg);
if (cancelled())
return;
if (!allowDJS || isDML || (fSessionId & 0x80000000) || (tableOid() < 3000 && tableOid() >= 1000))
{
joinIsTooBig = true;
ostringstream oss;
oss << "(" << __LINE__ << ") "
<< logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG);
fLogger->logMessage(logging::LOG_TYPE_INFO, oss.str());
errorMessage(oss.str());
status(logging::ERR_JOIN_TOO_BIG);
cout << "Join is too big, raise the UM join limit for now (small runner)" << endl;
abort();
}
else if (allowDJS)
joiner->setConvertToDiskJoin();
return; // if (cancelled())
// return;
// if (!allowDJS || isDML || (fSessionId & 0x80000000) || (tableOid() < 3000 && tableOid() >= 1000))
// {
// joinIsTooBig = true;
// ostringstream oss;
// oss << "(" << __LINE__ << ") "
// << logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG);
// fLogger->logMessage(logging::LOG_TYPE_INFO, oss.str());
// errorMessage(oss.str());
// status(logging::ERR_JOIN_TOO_BIG);
// cout << "Join is too big, raise the UM join limit for now (small runner)" << endl;
// abort();
// }
// else if (allowDJS)
// {
// joiner->setConvertToDiskJoin();
// // TODO RGData that triggers this path is lost. Need to store it to pass it future.
// }
return outOfMemoryHandler(joiner);
} }
joiner->insertRGData(smallRG, threadID); joiner->insertRGData(smallRG, threadID);
if (!joiner->inUM() && (memUsedByEachJoin[index] > pmMemLimit)) if (!joiner->inUM() && (memUsedByEachJoin[index] > pmMemLimit))
@ -483,19 +535,38 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t*
} }
catch (std::bad_alloc& exc) catch (std::bad_alloc& exc)
{ {
if (!joinIsTooBig && // boost::unique_lock<boost::mutex> sl(saneErrMsg);
(isDML || !allowDJS || (fSessionId & 0x80000000) || (tableOid() < 3000 && tableOid() >= 1000)))
{ // if (!joinIsTooBig &&
joinIsTooBig = true; // (isDML || !allowDJS || (fSessionId & 0x80000000) || (tableOid() < 3000 && tableOid() >= 1000)))
ostringstream oss; // {
oss << "(" << __LINE__ << ") " // joinIsTooBig = true;
<< logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG); // ostringstream oss;
fLogger->logMessage(logging::LOG_TYPE_INFO, oss.str()); // oss << "(" << __LINE__ << ") "
errorMessage(oss.str()); // << logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG);
status(logging::ERR_JOIN_TOO_BIG); // fLogger->logMessage(logging::LOG_TYPE_INFO, oss.str());
cout << "Join is too big, raise the UM join limit for now" << endl; // errorMessage(oss.str());
abort(); // status(logging::ERR_JOIN_TOO_BIG);
} // cout << "Join is too big, raise the UM join limit for now" << endl;
// abort();
// }
// else if (allowDJS)
// {
// joiner->setConvertToDiskJoin();
// // RGData that triggers OOM is saved but the hash table is too big.
// // TBD need to store RGData if counting allocator will be used for RGData.
// // Need to clean hash tables and vec.
// // joiner->clearData(); // `finished` flag can implicitly interfere.
// }
// return;
// RGData that triggers OOM is saved but the hash table is too big.
// TBD need to store RGData if counting allocator will be used for RGData.
// Need to clean hash tables and vec.
// joiner->clearData(); // `finished` flag can implicitly interfere.
return outOfMemoryHandler(joiner);
} }
catch (...) catch (...)
{ {

View File

@ -650,6 +650,7 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep
bool stopMemTracking; bool stopMemTracking;
void trackMem(uint index); void trackMem(uint index);
void startSmallRunners(uint index); void startSmallRunners(uint index);
void outOfMemoryHandler(std::shared_ptr<joiner::TupleJoiner> joiner);
friend class DiskJoinStep; friend class DiskJoinStep;
}; };

View File

@ -2227,8 +2227,9 @@ int BatchPrimitiveProcessor::operator()()
validCPData = false; validCPData = false;
cpDataFromDictScan = false; cpDataFromDictScan = false;
auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<messageqcpp::BSBufType>(); // auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<messageqcpp::BSBufType>();
messageqcpp::SBS bs(new ByteStream(allocator)); // messageqcpp::SBS bs(new ByteStream(allocator));
messageqcpp::SBS bs(new ByteStream());
#ifdef PRIMPROC_STOPWATCH #ifdef PRIMPROC_STOPWATCH
stopwatch->start("BPP() execute"); stopwatch->start("BPP() execute");

View File

@ -81,6 +81,7 @@ if (WITH_UNITTESTS)
target_link_libraries(counting_allocator ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES}) target_link_libraries(counting_allocator ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES})
gtest_add_tests(TARGET counting_allocator TEST_PREFIX columnstore:) gtest_add_tests(TARGET counting_allocator TEST_PREFIX columnstore:)
set_source_files_properties(poolallocator.cpp PROPERTIES COMPILE_FLAGS "-Wno-sign-compare")
add_executable(poolallocator poolallocator.cpp) add_executable(poolallocator poolallocator.cpp)
add_dependencies(poolallocator googletest) add_dependencies(poolallocator googletest)
target_link_libraries(poolallocator ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES}) target_link_libraries(poolallocator ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES})

View File

@ -305,6 +305,7 @@ void TupleJoiner::bucketsToTables(buckets_t* buckets, hash_table_t* tables)
wasProductive = true; wasProductive = true;
buckets[i].clear(); buckets[i].clear();
} }
// TODO use CV here instead of busy sleep
if (!done && !wasProductive) if (!done && !wasProductive)
::usleep(1000 * numCores); ::usleep(1000 * numCores);
} }