From 3d1c49bd6bdb5c3ae2a7da0b35df61e5a6dcc007 Mon Sep 17 00:00:00 2001 From: drrtuy Date: Fri, 18 Apr 2025 12:18:07 +0000 Subject: [PATCH] fix(PP,BPP): reduced the BPPMap lock scope to avoid holding lock whilst calling heavy BPPV dtor. It became heavy b/c of CountingAllocator. --- primitives/primproc/primitiveserver.cpp | 95 +++++++++++++------------ 1 file changed, 51 insertions(+), 44 deletions(-) diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 1c93f7f29..c94294791 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -1557,60 +1557,67 @@ struct BPPHandler bs >> stepID; bs >> uniqueID; - boost::unique_lock lk(getDJLock(uniqueID)); - boost::mutex::scoped_lock scoped(bppLock); - - bppKeysIt = std::find(bppKeys.begin(), bppKeys.end(), uniqueID); - - if (bppKeysIt != bppKeys.end()) + boost::shared_ptr bppv = nullptr; { - bppKeys.erase(bppKeysIt); - } + boost::unique_lock lk(getDJLock(uniqueID)); + boost::mutex::scoped_lock scoped(bppLock); - it = bppMap.find(uniqueID); + bppKeysIt = std::find(bppKeys.begin(), bppKeys.end(), uniqueID); - if (it != bppMap.end()) - { - boost::shared_ptr bppv = it->second; - - if (bppv->joinDataReceived) + if (bppKeysIt != bppKeys.end()) { - bppv->abort(); - bppMap.erase(it); + bppKeys.erase(bppKeysIt); + } + + it = bppMap.find(uniqueID); + + if (it != bppMap.end()) + { + bppv = it->second; + + if (bppv->joinDataReceived) + { + bppMap.erase(it); + } + else + { + // MCOL-5. On ubuntu, a crash was happening. Checking + // joinDataReceived here fixes it. + // We're not ready for a destroy. Reschedule to wait + // for all joiners to arrive. + // TODO there might be no joiners if the query is canceled. + // The memory will leak. + // Rewind to the beginning of ByteStream buf b/c of the advance above. + bs.rewind(); + return -1; + } } else { - // MCOL-5. On ubuntu, a crash was happening. Checking - // joinDataReceived here fixes it. - // We're not ready for a destroy. Reschedule to wait - // for all joiners to arrive. - // TODO there might be no joiners if the query is canceled. - // The memory will leak. - // Rewind to the beginning of ByteStream buf b/c of the advance above. - bs.rewind(); - return -1; - } - } - else - { - if (posix_time::second_clock::universal_time() > dieTime) - { - cout << "destroyBPP: job for id " << uniqueID << " and sessionID " << sessionID << " has been killed." - << endl; - // If for some reason there are jobs for this uniqueID that arrived later - // they won't leave PP thread pool staying there forever. - } - else - { - bs.rewind(); - return -1; + if (posix_time::second_clock::universal_time() > dieTime) + { + cout << "destroyBPP: job for id " << uniqueID << " and sessionID " << sessionID << " has been killed." + << endl; + // If for some reason there are jobs for this uniqueID that arrived later + // they won't leave PP thread pool staying there forever. + } + else + { + bs.rewind(); + return -1; + } } + + fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(uniqueID); + fPrimitiveServerPtr->getOOBProcessorThreadPool()->removeJobs(uniqueID); + lk.unlock(); + deleteDJLock(uniqueID); } - fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(uniqueID); - fPrimitiveServerPtr->getOOBProcessorThreadPool()->removeJobs(uniqueID); - lk.unlock(); - deleteDJLock(uniqueID); + if (bppv) + { + bppv->abort(); + } return 0; }