From 29ac5fe2b27d4eeabc5b7ec7bdae84c47303673d Mon Sep 17 00:00:00 2001 From: Andrew Hutchings Date: Wed, 26 Jul 2017 11:50:26 +0100 Subject: [PATCH] MCOL-834 Cleanup BPP threads on ExeMgr disconnect If ExeMgr disconnects (such as a crash) whilst queries are being executed some BPP threads get orphaned. This patch tracks the BPP usage for each threads and cleans up appropriately. --- primitives/primproc/primitiveserver.cpp | 34 +++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 1f6145733..6ef3d7e34 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -1145,6 +1145,30 @@ struct BPPHandler { BPPHandler(PrimitiveServer* ps) : fPrimitiveServerPtr(ps) { } + // Keep a list of keys so that if connection fails we don't leave BPP + // threads lying around + std::vector bppKeys; + std::vector::iterator bppKeysIt; + + ~BPPHandler() + { + for (bppKeysIt = bppKeys.begin() ; bppKeysIt != bppKeys.end(); ++bppKeysIt) + { + uint32_t key = *bppKeysIt; + BPPMap::iterator it; + + mutex::scoped_lock scoped(bppLock); + it = bppMap.find(key); + if (it != bppMap.end()) { + it->second->abort(); + bppMap.erase(it); + } + scoped.unlock(); + fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(key); + OOBPool->removeJobs(key); + } + } + struct BPPHandlerFunctor : public PriorityThreadPool::Functor { BPPHandlerFunctor(boost::shared_ptr r, SBS b) : bs(b) { @@ -1200,6 +1224,10 @@ struct BPPHandler bs.advance(sizeof(ISMPacketHeader)); bs >> key; + bppKeysIt = std::find(bppKeys.begin(), bppKeys.end(), key); + if (bppKeysIt != bppKeys.end()) { + bppKeys.erase(bppKeysIt); + } mutex::scoped_lock scoped(bppLock); it = bppMap.find(key); if (it != bppMap.end()) { @@ -1273,6 +1301,7 @@ struct BPPHandler } } key = bpp->getUniqueID(); + bppKeys.push_back(key); mutex::scoped_lock scoped(bppLock); bool newInsert; newInsert = bppMap.insert(pair(key, bppv)).second; @@ -1403,6 +1432,11 @@ struct BPPHandler mutex::scoped_lock lk(djLock); mutex::scoped_lock scoped(bppLock); + bppKeysIt = std::find(bppKeys.begin(), bppKeys.end(), uniqueID); + if (bppKeysIt != bppKeys.end()) { + bppKeys.erase(bppKeysIt); + } + it = bppMap.find(uniqueID); if (it != bppMap.end()) { boost::shared_ptr bppv = it->second;