1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-08 14:22:09 +03:00

fix(PP,BPP): reduced the BPPMap lock scope to avoid holding lock whilst calling heavy BPPV dtor. It became heavy b/c of CountingAllocator.

This commit is contained in:
drrtuy
2025-04-18 12:18:07 +00:00
parent e34f7d2530
commit 3d1c49bd6b

View File

@@ -1557,60 +1557,67 @@ struct BPPHandler
bs >> stepID; bs >> stepID;
bs >> uniqueID; bs >> uniqueID;
boost::unique_lock<shared_mutex> lk(getDJLock(uniqueID)); boost::shared_ptr<BPPV> bppv = nullptr;
boost::mutex::scoped_lock scoped(bppLock);
bppKeysIt = std::find(bppKeys.begin(), bppKeys.end(), uniqueID);
if (bppKeysIt != bppKeys.end())
{ {
bppKeys.erase(bppKeysIt); boost::unique_lock<shared_mutex> lk(getDJLock(uniqueID));
} boost::mutex::scoped_lock scoped(bppLock);
it = bppMap.find(uniqueID); bppKeysIt = std::find(bppKeys.begin(), bppKeys.end(), uniqueID);
if (it != bppMap.end()) if (bppKeysIt != bppKeys.end())
{
boost::shared_ptr<BPPV> bppv = it->second;
if (bppv->joinDataReceived)
{ {
bppv->abort(); bppKeys.erase(bppKeysIt);
bppMap.erase(it); }
it = bppMap.find(uniqueID);
if (it != bppMap.end())
{
bppv = it->second;
if (bppv->joinDataReceived)
{
bppMap.erase(it);
}
else
{
// MCOL-5. On ubuntu, a crash was happening. Checking
// joinDataReceived here fixes it.
// We're not ready for a destroy. Reschedule to wait
// for all joiners to arrive.
// TODO there might be no joiners if the query is canceled.
// The memory will leak.
// Rewind to the beginning of ByteStream buf b/c of the advance above.
bs.rewind();
return -1;
}
} }
else else
{ {
// MCOL-5. On ubuntu, a crash was happening. Checking if (posix_time::second_clock::universal_time() > dieTime)
// joinDataReceived here fixes it. {
// We're not ready for a destroy. Reschedule to wait cout << "destroyBPP: job for id " << uniqueID << " and sessionID " << sessionID << " has been killed."
// for all joiners to arrive. << endl;
// TODO there might be no joiners if the query is canceled. // If for some reason there are jobs for this uniqueID that arrived later
// The memory will leak. // they won't leave PP thread pool staying there forever.
// Rewind to the beginning of ByteStream buf b/c of the advance above. }
bs.rewind(); else
return -1; {
} bs.rewind();
} return -1;
else }
{
if (posix_time::second_clock::universal_time() > dieTime)
{
cout << "destroyBPP: job for id " << uniqueID << " and sessionID " << sessionID << " has been killed."
<< endl;
// If for some reason there are jobs for this uniqueID that arrived later
// they won't leave PP thread pool staying there forever.
}
else
{
bs.rewind();
return -1;
} }
fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(uniqueID);
fPrimitiveServerPtr->getOOBProcessorThreadPool()->removeJobs(uniqueID);
lk.unlock();
deleteDJLock(uniqueID);
} }
fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(uniqueID); if (bppv)
fPrimitiveServerPtr->getOOBProcessorThreadPool()->removeJobs(uniqueID); {
lk.unlock(); bppv->abort();
deleteDJLock(uniqueID); }
return 0; return 0;
} }