From a55495a73f14b4c022c704c143439ea20e1668e6 Mon Sep 17 00:00:00 2001 From: Patrick LeBlanc Date: Mon, 6 Jan 2020 18:03:45 -0500 Subject: [PATCH] MCOL-3713 - A UM join mem overflow abort bug Seems that segregateJoiners would double-lock a mutex. Not sure why we're only seeing it now. This is a checkpoint commit, probably not final for this bug. --- dbcon/joblist/tuplehashjoin.cpp | 87 +++++++++++++++++++-------------- 1 file changed, 49 insertions(+), 38 deletions(-) diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp index ebc657d58..0b1bc13c8 100644 --- a/dbcon/joblist/tuplehashjoin.cpp +++ b/dbcon/joblist/tuplehashjoin.cpp @@ -132,8 +132,9 @@ TupleHashJoinStep::~TupleHashJoinStep() if (ownsOutputDL) delete outputDL; - for (uint i = 0 ; i < smallDLs.size(); i++) - resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit); + if (memUsedByEachJoin) + for (uint i = 0 ; i < smallDLs.size(); i++) + resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit); //cout << "deallocated THJS, UM memory available: " << resourceManager.availableMemory() << endl; } @@ -313,6 +314,8 @@ void TupleHashJoinStep::startSmallRunners(uint index) memTrackMutex.unlock(); jobstepThreadPool.join(memMonitor); + extendedInfo += "\n"; + /* If there was an error or an abort, drain the input DL, do endOfInput on the output */ if (cancelled()) @@ -334,35 +337,34 @@ void TupleHashJoinStep::startSmallRunners(uint index) " size = " << joiner->size() << endl; */ - extendedInfo += "\n"; - ostringstream oss; - if (joiner->inPM()) - { - oss << "PM join (" << index << ")" << endl; - #ifdef JLF_DEBUG - cout << oss.str(); - #endif - extendedInfo += oss.str(); - } - else if (joiner->inUM() && !joiner->onDisk()) - { - oss << "UM join (" << index << ")" << endl; - #ifdef JLF_DEBUG - cout << oss.str(); - #endif - extendedInfo += oss.str(); - } - - /* Trying to get the extended info to match the original version - It's kind of kludgey at the moment, need to clean it up at some point */ if (!joiner->onDisk()) { - joiner->doneInserting(); - boost::mutex::scoped_lock lk(*fStatsMutexPtr); - fExtendedInfo += extendedInfo; - formatMiniStats(index); + // add extended info, and if not aborted then tell joiner + // we're done reading the small side. + if (joiner->inPM()) + { + oss << "PM join (" << index << ")" << endl; + #ifdef JLF_DEBUG + cout << oss.str(); + #endif + extendedInfo += oss.str(); + } + else if (joiner->inUM()) + { + oss << "UM join (" << index << ")" << endl; + #ifdef JLF_DEBUG + cout << oss.str(); + #endif + extendedInfo += oss.str(); + } + if (!cancelled()) + joiner->doneInserting(); } + + boost::mutex::scoped_lock lk(*fStatsMutexPtr); + fExtendedInfo += extendedInfo; + formatMiniStats(index); } /* Index is which small input to read. */ @@ -404,9 +406,16 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t * gotMem = resourceManager->getMemory(rgSize, sessionMemLimit, false); if (!gotMem) { + /* Mem went over the limit. + If DML or a syscat query, abort. + if disk join is enabled, use it. + else abort. + */ boost::unique_lock sl(saneErrMsg); - if (!joinIsTooBig && (isDML || !allowDJS || (fSessionId & 0x80000000) || - (tableOid() < 3000 && tableOid() >= 1000))) + if (cancelled()) + return; + if (!allowDJS || isDML || (fSessionId & 0x80000000) || + (tableOid() < 3000 && tableOid() >= 1000)) { joinIsTooBig = true; fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_JOIN_TOO_BIG); @@ -414,13 +423,10 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t * status(logging::ERR_JOIN_TOO_BIG); cout << "Join is too big, raise the UM join limit for now (small runner)" << endl; abort(); - break; } - else - { + else if (allowDJS) joiner->setConvertToDiskJoin(); - return; - } + return; } joiner->insertRGData(smallRG, threadID); @@ -699,9 +705,12 @@ void TupleHashJoinStep::hjRunner() segregateJoiners(); /* Need to clean this stuff up. If the query was cancelled before this, and this would have had - a disk join, it's still necessary to construct the DJS objects to finish the abort. */ + a disk join, it's still necessary to construct the DJS objects to finish the abort. + Update: Is this more complicated than scanning joiners for either ondisk() or (not isFinished()) + and draining the corresponding inputs & telling downstream EOF? */ if (!djsJoiners.empty()) { + cout << "Starting disk join" << endl; joinIsTooBig = false; if (!cancelled()) @@ -1894,8 +1903,6 @@ void TupleHashJoinStep::segregateJoiners() bool anyTooLarge = false; uint32_t smallSideCount = smallDLs.size(); - boost::mutex::scoped_lock sl(djsLock); - for (i = 0; i < smallSideCount; i++) { allInnerJoins &= (joinTypes[i] == INNER); @@ -1920,8 +1927,9 @@ void TupleHashJoinStep::segregateJoiners() // Debugging code, this makes all eligible joins disk-based. else { cout << "making all joins disk-based" << endl; + joinIsTooBig = true; for (i = 0; i < smallSideCount; i++) { - joinIsTooBig = true; + joiner[i]->setConvertToDiskJoin(); djsJoiners.push_back(joiners[i]); djsJoinerMap.push_back(i); } @@ -1938,6 +1946,7 @@ void TupleHashJoinStep::segregateJoiners() for (i = 0; i < smallSideCount; i++) { + joiners[i]->setConvertToDiskJoin(); djsJoiners.push_back(joiners[i]); djsJoinerMap.push_back(i); } @@ -1966,6 +1975,7 @@ void TupleHashJoinStep::segregateJoiners() else { joinIsTooBig = true; + joiners[i]->setConvertToDiskJoin(); //cout << "1joiner " << i << " " << hex << (uint64_t) joiners[i].get() << dec << " -> DJS" << endl; djsJoiners.push_back(joiners[i]); djsJoinerMap.push_back(i); @@ -1990,6 +2000,7 @@ void TupleHashJoinStep::segregateJoiners() for (; i < smallSideCount; i++) { joinIsTooBig = true; + joiners[i]->setConvertToDiskJoin(); //cout << "2joiner " << i << " " << hex << (uint64_t) joiners[i].get() << dec << " -> DJS" << endl; djsJoiners.push_back(joiners[i]); djsJoinerMap.push_back(i);