diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index f934e5e2a..ab2d5cd7b 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -1765,6 +1765,8 @@ int BatchPrimitiveProcessor::operator()() vssCache.clear(); #ifndef __FreeBSD__ + if (sendThread->aborted()) + objLock.try_lock(); objLock.unlock(); #endif freeLargeBuffers(); diff --git a/primitives/primproc/batchprimitiveprocessor.h b/primitives/primproc/batchprimitiveprocessor.h index c7d8d815c..c57479ef9 100644 --- a/primitives/primproc/batchprimitiveprocessor.h +++ b/primitives/primproc/batchprimitiveprocessor.h @@ -121,7 +121,7 @@ class BatchPrimitiveProcessor // these two functions are used by BPPV to create BPP instances // on demand. TRY not to use unlock() for anything else. - void unlock() { objLock.unlock(); } + void unlock() { objLock.try_lock(); objLock.unlock(); } bool hasJoin() { return doJoin; } private: BatchPrimitiveProcessor(); diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 6ef3d7e34..1d477be86 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #ifdef _MSC_VER #include typedef int pthread_t; @@ -1152,21 +1153,21 @@ struct BPPHandler ~BPPHandler() { + mutex::scoped_lock scoped(bppLock); for (bppKeysIt = bppKeys.begin() ; bppKeysIt != bppKeys.end(); ++bppKeysIt) { uint32_t key = *bppKeysIt; BPPMap::iterator it; - mutex::scoped_lock scoped(bppLock); it = bppMap.find(key); if (it != bppMap.end()) { it->second->abort(); bppMap.erase(it); } - scoped.unlock(); fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(key); OOBPool->removeJobs(key); } + scoped.unlock(); } struct BPPHandlerFunctor : public PriorityThreadPool::Functor { @@ -1224,11 +1225,11 @@ struct BPPHandler bs.advance(sizeof(ISMPacketHeader)); bs >> key; + mutex::scoped_lock scoped(bppLock); bppKeysIt = std::find(bppKeys.begin(), bppKeys.end(), key); if (bppKeysIt != bppKeys.end()) { bppKeys.erase(bppKeysIt); } - mutex::scoped_lock scoped(bppLock); it = bppMap.find(key); if (it != bppMap.end()) { it->second->abort(); @@ -1300,9 +1301,9 @@ struct BPPHandler bppv->add(dup); } } - key = bpp->getUniqueID(); + mutex::scoped_lock scoped(bppLock); + key = bpp->getUniqueID(); bppKeys.push_back(key); - mutex::scoped_lock scoped(bppLock); bool newInsert; newInsert = bppMap.insert(pair(key, bppv)).second; //cout << "creating BPP # " << key << endl; @@ -2210,6 +2211,10 @@ boost::shared_ptr BPPV::next() void BPPV::abort() { sendThread->abort(); + BOOST_FOREACH( boost::shared_ptr bpp, v ) + { + bpp->unlock(); + } } bool BPPV::aborted()