You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
MCOL-3713 - A UM join mem overflow abort bug
It seems that segregateJoiners would double-lock a mutex; it is not clear why we are only seeing this now. This is a checkpoint commit and is probably not the final fix for this bug.
This commit is contained in:
@ -132,8 +132,9 @@ TupleHashJoinStep::~TupleHashJoinStep()
|
|||||||
if (ownsOutputDL)
|
if (ownsOutputDL)
|
||||||
delete outputDL;
|
delete outputDL;
|
||||||
|
|
||||||
for (uint i = 0 ; i < smallDLs.size(); i++)
|
if (memUsedByEachJoin)
|
||||||
resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit);
|
for (uint i = 0 ; i < smallDLs.size(); i++)
|
||||||
|
resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit);
|
||||||
|
|
||||||
//cout << "deallocated THJS, UM memory available: " << resourceManager.availableMemory() << endl;
|
//cout << "deallocated THJS, UM memory available: " << resourceManager.availableMemory() << endl;
|
||||||
}
|
}
|
||||||
@ -313,6 +314,8 @@ void TupleHashJoinStep::startSmallRunners(uint index)
|
|||||||
memTrackMutex.unlock();
|
memTrackMutex.unlock();
|
||||||
jobstepThreadPool.join(memMonitor);
|
jobstepThreadPool.join(memMonitor);
|
||||||
|
|
||||||
|
extendedInfo += "\n";
|
||||||
|
|
||||||
/* If there was an error or an abort, drain the input DL,
|
/* If there was an error or an abort, drain the input DL,
|
||||||
do endOfInput on the output */
|
do endOfInput on the output */
|
||||||
if (cancelled())
|
if (cancelled())
|
||||||
@ -334,35 +337,34 @@ void TupleHashJoinStep::startSmallRunners(uint index)
|
|||||||
" size = " << joiner->size() << endl;
|
" size = " << joiner->size() << endl;
|
||||||
*/
|
*/
|
||||||
|
|
||||||
extendedInfo += "\n";
|
|
||||||
|
|
||||||
ostringstream oss;
|
ostringstream oss;
|
||||||
if (joiner->inPM())
|
|
||||||
{
|
|
||||||
oss << "PM join (" << index << ")" << endl;
|
|
||||||
#ifdef JLF_DEBUG
|
|
||||||
cout << oss.str();
|
|
||||||
#endif
|
|
||||||
extendedInfo += oss.str();
|
|
||||||
}
|
|
||||||
else if (joiner->inUM() && !joiner->onDisk())
|
|
||||||
{
|
|
||||||
oss << "UM join (" << index << ")" << endl;
|
|
||||||
#ifdef JLF_DEBUG
|
|
||||||
cout << oss.str();
|
|
||||||
#endif
|
|
||||||
extendedInfo += oss.str();
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Trying to get the extended info to match the original version
|
|
||||||
It's kind of kludgey at the moment, need to clean it up at some point */
|
|
||||||
if (!joiner->onDisk())
|
if (!joiner->onDisk())
|
||||||
{
|
{
|
||||||
joiner->doneInserting();
|
// add extended info, and if not aborted then tell joiner
|
||||||
boost::mutex::scoped_lock lk(*fStatsMutexPtr);
|
// we're done reading the small side.
|
||||||
fExtendedInfo += extendedInfo;
|
if (joiner->inPM())
|
||||||
formatMiniStats(index);
|
{
|
||||||
|
oss << "PM join (" << index << ")" << endl;
|
||||||
|
#ifdef JLF_DEBUG
|
||||||
|
cout << oss.str();
|
||||||
|
#endif
|
||||||
|
extendedInfo += oss.str();
|
||||||
|
}
|
||||||
|
else if (joiner->inUM())
|
||||||
|
{
|
||||||
|
oss << "UM join (" << index << ")" << endl;
|
||||||
|
#ifdef JLF_DEBUG
|
||||||
|
cout << oss.str();
|
||||||
|
#endif
|
||||||
|
extendedInfo += oss.str();
|
||||||
|
}
|
||||||
|
if (!cancelled())
|
||||||
|
joiner->doneInserting();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boost::mutex::scoped_lock lk(*fStatsMutexPtr);
|
||||||
|
fExtendedInfo += extendedInfo;
|
||||||
|
formatMiniStats(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Index is which small input to read. */
|
/* Index is which small input to read. */
|
||||||
@ -404,9 +406,16 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t *
|
|||||||
gotMem = resourceManager->getMemory(rgSize, sessionMemLimit, false);
|
gotMem = resourceManager->getMemory(rgSize, sessionMemLimit, false);
|
||||||
if (!gotMem)
|
if (!gotMem)
|
||||||
{
|
{
|
||||||
|
/* Mem went over the limit.
|
||||||
|
If DML or a syscat query, abort.
|
||||||
|
if disk join is enabled, use it.
|
||||||
|
else abort.
|
||||||
|
*/
|
||||||
boost::unique_lock<boost::mutex> sl(saneErrMsg);
|
boost::unique_lock<boost::mutex> sl(saneErrMsg);
|
||||||
if (!joinIsTooBig && (isDML || !allowDJS || (fSessionId & 0x80000000) ||
|
if (cancelled())
|
||||||
(tableOid() < 3000 && tableOid() >= 1000)))
|
return;
|
||||||
|
if (!allowDJS || isDML || (fSessionId & 0x80000000) ||
|
||||||
|
(tableOid() < 3000 && tableOid() >= 1000))
|
||||||
{
|
{
|
||||||
joinIsTooBig = true;
|
joinIsTooBig = true;
|
||||||
fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_JOIN_TOO_BIG);
|
fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_JOIN_TOO_BIG);
|
||||||
@ -414,13 +423,10 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t *
|
|||||||
status(logging::ERR_JOIN_TOO_BIG);
|
status(logging::ERR_JOIN_TOO_BIG);
|
||||||
cout << "Join is too big, raise the UM join limit for now (small runner)" << endl;
|
cout << "Join is too big, raise the UM join limit for now (small runner)" << endl;
|
||||||
abort();
|
abort();
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
else
|
else if (allowDJS)
|
||||||
{
|
|
||||||
joiner->setConvertToDiskJoin();
|
joiner->setConvertToDiskJoin();
|
||||||
return;
|
return;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
joiner->insertRGData(smallRG, threadID);
|
joiner->insertRGData(smallRG, threadID);
|
||||||
@ -699,9 +705,12 @@ void TupleHashJoinStep::hjRunner()
|
|||||||
segregateJoiners();
|
segregateJoiners();
|
||||||
|
|
||||||
/* Need to clean this stuff up. If the query was cancelled before this, and this would have had
|
/* Need to clean this stuff up. If the query was cancelled before this, and this would have had
|
||||||
a disk join, it's still necessary to construct the DJS objects to finish the abort. */
|
a disk join, it's still necessary to construct the DJS objects to finish the abort.
|
||||||
|
Update: Is this more complicated than scanning joiners for either ondisk() or (not isFinished())
|
||||||
|
and draining the corresponding inputs & telling downstream EOF? */
|
||||||
if (!djsJoiners.empty())
|
if (!djsJoiners.empty())
|
||||||
{
|
{
|
||||||
|
cout << "Starting disk join" << endl;
|
||||||
joinIsTooBig = false;
|
joinIsTooBig = false;
|
||||||
|
|
||||||
if (!cancelled())
|
if (!cancelled())
|
||||||
@ -1894,8 +1903,6 @@ void TupleHashJoinStep::segregateJoiners()
|
|||||||
bool anyTooLarge = false;
|
bool anyTooLarge = false;
|
||||||
uint32_t smallSideCount = smallDLs.size();
|
uint32_t smallSideCount = smallDLs.size();
|
||||||
|
|
||||||
boost::mutex::scoped_lock sl(djsLock);
|
|
||||||
|
|
||||||
for (i = 0; i < smallSideCount; i++)
|
for (i = 0; i < smallSideCount; i++)
|
||||||
{
|
{
|
||||||
allInnerJoins &= (joinTypes[i] == INNER);
|
allInnerJoins &= (joinTypes[i] == INNER);
|
||||||
@ -1920,8 +1927,9 @@ void TupleHashJoinStep::segregateJoiners()
|
|||||||
// Debugging code, this makes all eligible joins disk-based.
|
// Debugging code, this makes all eligible joins disk-based.
|
||||||
else {
|
else {
|
||||||
cout << "making all joins disk-based" << endl;
|
cout << "making all joins disk-based" << endl;
|
||||||
|
joinIsTooBig = true;
|
||||||
for (i = 0; i < smallSideCount; i++) {
|
for (i = 0; i < smallSideCount; i++) {
|
||||||
joinIsTooBig = true;
|
joiner[i]->setConvertToDiskJoin();
|
||||||
djsJoiners.push_back(joiners[i]);
|
djsJoiners.push_back(joiners[i]);
|
||||||
djsJoinerMap.push_back(i);
|
djsJoinerMap.push_back(i);
|
||||||
}
|
}
|
||||||
@ -1938,6 +1946,7 @@ void TupleHashJoinStep::segregateJoiners()
|
|||||||
|
|
||||||
for (i = 0; i < smallSideCount; i++)
|
for (i = 0; i < smallSideCount; i++)
|
||||||
{
|
{
|
||||||
|
joiners[i]->setConvertToDiskJoin();
|
||||||
djsJoiners.push_back(joiners[i]);
|
djsJoiners.push_back(joiners[i]);
|
||||||
djsJoinerMap.push_back(i);
|
djsJoinerMap.push_back(i);
|
||||||
}
|
}
|
||||||
@ -1966,6 +1975,7 @@ void TupleHashJoinStep::segregateJoiners()
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
joinIsTooBig = true;
|
joinIsTooBig = true;
|
||||||
|
joiners[i]->setConvertToDiskJoin();
|
||||||
//cout << "1joiner " << i << " " << hex << (uint64_t) joiners[i].get() << dec << " -> DJS" << endl;
|
//cout << "1joiner " << i << " " << hex << (uint64_t) joiners[i].get() << dec << " -> DJS" << endl;
|
||||||
djsJoiners.push_back(joiners[i]);
|
djsJoiners.push_back(joiners[i]);
|
||||||
djsJoinerMap.push_back(i);
|
djsJoinerMap.push_back(i);
|
||||||
@ -1990,6 +2000,7 @@ void TupleHashJoinStep::segregateJoiners()
|
|||||||
for (; i < smallSideCount; i++)
|
for (; i < smallSideCount; i++)
|
||||||
{
|
{
|
||||||
joinIsTooBig = true;
|
joinIsTooBig = true;
|
||||||
|
joiners[i]->setConvertToDiskJoin();
|
||||||
//cout << "2joiner " << i << " " << hex << (uint64_t) joiners[i].get() << dec << " -> DJS" << endl;
|
//cout << "2joiner " << i << " " << hex << (uint64_t) joiners[i].get() << dec << " -> DJS" << endl;
|
||||||
djsJoiners.push_back(joiners[i]);
|
djsJoiners.push_back(joiners[i]);
|
||||||
djsJoinerMap.push_back(i);
|
djsJoinerMap.push_back(i);
|
||||||
|
Reference in New Issue
Block a user