You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
MCOL-834 Cleanup BPP threads on ExeMgr disconnect
If ExeMgr disconnects (such as a crash) whilst queries are being executed some BPP threads get orphaned. This patch tracks the BPP usage for each threads and cleans up appropriately.
This commit is contained in:
@ -1145,6 +1145,30 @@ struct BPPHandler
|
|||||||
{
|
{
|
||||||
BPPHandler(PrimitiveServer* ps) : fPrimitiveServerPtr(ps) { }
|
BPPHandler(PrimitiveServer* ps) : fPrimitiveServerPtr(ps) { }
|
||||||
|
|
||||||
|
// Keep a list of keys so that if connection fails we don't leave BPP
|
||||||
|
// threads lying around
|
||||||
|
std::vector<uint32_t> bppKeys;
|
||||||
|
std::vector<uint32_t>::iterator bppKeysIt;
|
||||||
|
|
||||||
|
~BPPHandler()
|
||||||
|
{
|
||||||
|
for (bppKeysIt = bppKeys.begin() ; bppKeysIt != bppKeys.end(); ++bppKeysIt)
|
||||||
|
{
|
||||||
|
uint32_t key = *bppKeysIt;
|
||||||
|
BPPMap::iterator it;
|
||||||
|
|
||||||
|
mutex::scoped_lock scoped(bppLock);
|
||||||
|
it = bppMap.find(key);
|
||||||
|
if (it != bppMap.end()) {
|
||||||
|
it->second->abort();
|
||||||
|
bppMap.erase(it);
|
||||||
|
}
|
||||||
|
scoped.unlock();
|
||||||
|
fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(key);
|
||||||
|
OOBPool->removeJobs(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct BPPHandlerFunctor : public PriorityThreadPool::Functor {
|
struct BPPHandlerFunctor : public PriorityThreadPool::Functor {
|
||||||
BPPHandlerFunctor(boost::shared_ptr<BPPHandler> r, SBS b) : bs(b)
|
BPPHandlerFunctor(boost::shared_ptr<BPPHandler> r, SBS b) : bs(b)
|
||||||
{
|
{
|
||||||
@ -1200,6 +1224,10 @@ struct BPPHandler
|
|||||||
|
|
||||||
bs.advance(sizeof(ISMPacketHeader));
|
bs.advance(sizeof(ISMPacketHeader));
|
||||||
bs >> key;
|
bs >> key;
|
||||||
|
bppKeysIt = std::find(bppKeys.begin(), bppKeys.end(), key);
|
||||||
|
if (bppKeysIt != bppKeys.end()) {
|
||||||
|
bppKeys.erase(bppKeysIt);
|
||||||
|
}
|
||||||
mutex::scoped_lock scoped(bppLock);
|
mutex::scoped_lock scoped(bppLock);
|
||||||
it = bppMap.find(key);
|
it = bppMap.find(key);
|
||||||
if (it != bppMap.end()) {
|
if (it != bppMap.end()) {
|
||||||
@ -1273,6 +1301,7 @@ struct BPPHandler
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
key = bpp->getUniqueID();
|
key = bpp->getUniqueID();
|
||||||
|
bppKeys.push_back(key);
|
||||||
mutex::scoped_lock scoped(bppLock);
|
mutex::scoped_lock scoped(bppLock);
|
||||||
bool newInsert;
|
bool newInsert;
|
||||||
newInsert = bppMap.insert(pair<uint32_t, SBPPV>(key, bppv)).second;
|
newInsert = bppMap.insert(pair<uint32_t, SBPPV>(key, bppv)).second;
|
||||||
@ -1403,6 +1432,11 @@ struct BPPHandler
|
|||||||
mutex::scoped_lock lk(djLock);
|
mutex::scoped_lock lk(djLock);
|
||||||
mutex::scoped_lock scoped(bppLock);
|
mutex::scoped_lock scoped(bppLock);
|
||||||
|
|
||||||
|
bppKeysIt = std::find(bppKeys.begin(), bppKeys.end(), uniqueID);
|
||||||
|
if (bppKeysIt != bppKeys.end()) {
|
||||||
|
bppKeys.erase(bppKeysIt);
|
||||||
|
}
|
||||||
|
|
||||||
it = bppMap.find(uniqueID);
|
it = bppMap.find(uniqueID);
|
||||||
if (it != bppMap.end()) {
|
if (it != bppMap.end()) {
|
||||||
boost::shared_ptr<BPPV> bppv = it->second;
|
boost::shared_ptr<BPPV> bppv = it->second;
|
||||||
|
Reference in New Issue
Block a user