diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index 8678e1381..6f15cb7dd 100755 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -112,6 +112,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor() : { pp.setLogicalBlockMode(true); pp.setBlockPtr((int *) blockData); + pthread_mutex_init(&objLock,NULL); } BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream &b, double prefetch, @@ -153,6 +154,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream &b, double prefetch, pp.setLogicalBlockMode(true); pp.setBlockPtr((int *) blockData); sendThread = bppst; + pthread_mutex_init(&objLock, NULL); initBPP(b); // cerr << "made a BPP\n"; } @@ -175,6 +177,7 @@ BatchPrimitiveProcessor::~BatchPrimitiveProcessor() counterLock.lock(); } counterLock.unlock(); + pthread_mutex_destroy(&objLock); } /** @@ -233,7 +236,7 @@ void BatchPrimitiveProcessor::initBPP(ByteStream &bs) } if (doJoin) { - objLock.lock(); + pthread_mutex_lock(&objLock); if (ot == ROW_GROUP) { bs >> joinerCount; // cout << "joinerCount = " << joinerCount << endl; @@ -325,7 +328,7 @@ void BatchPrimitiveProcessor::initBPP(ByteStream &bs) joiner.reset(new Joiner((bool) tmp8)); } #ifdef __FreeBSD__ - objLock.unlock(); + pthread_mutex_unlock(&objLock); #endif } @@ -401,7 +404,7 @@ void BatchPrimitiveProcessor::resetBPP(ByteStream &bs, const SP_UM_MUTEX& w, uint32_t i; vector preloads; - objLock.lock(); + pthread_mutex_lock(&objLock); writelock = w; sock = s; @@ -452,7 +455,7 @@ void BatchPrimitiveProcessor::resetBPP(ByteStream &bs, const SP_UM_MUTEX& w, buildVSSCache(count); #ifdef __FreeBSD__ - objLock.unlock(); + pthread_mutex_unlock(&objLock); #endif } @@ -599,7 +602,7 @@ int BatchPrimitiveProcessor::endOfJoiner() #endif #ifndef __FreeBSD__ - objLock.unlock(); + pthread_mutex_unlock(&objLock); #endif return 0; } @@ -1486,7 +1489,7 @@ void BatchPrimitiveProcessor::execute() #endif #ifndef __FreeBSD__ - objLock.unlock(); + pthread_mutex_unlock(&objLock); #endif throw n; // need to pass this through to BPPSeeder } @@ -1777,12 +1780,7 @@ int BatchPrimitiveProcessor::operator()() vssCache.clear(); #ifndef __FreeBSD__ - // If we've been aborted the lock *may* have been released already - // By doing try_lock, we ensure the unlock will work whether it was - // locked or not. - if (sendThread->aborted()) - objLock.try_lock(); - objLock.unlock(); + pthread_mutex_unlock(&objLock); #endif freeLargeBuffers(); #ifdef PRIMPROC_STOPWATCH @@ -1881,7 +1879,7 @@ SBPP BatchPrimitiveProcessor::duplicate() } bpp->doJoin = doJoin; if (doJoin) { - bpp->objLock.lock(); + pthread_mutex_lock(&bpp->objLock); bpp->joinerSize = joinerSize; if (ot == ROW_GROUP) { /* There are add'l join vars, but only these are necessary for processing @@ -1921,7 +1919,7 @@ SBPP BatchPrimitiveProcessor::duplicate() else bpp->joiner = joiner; #ifdef __FreeBSD__ - bpp->objLock.unlock(); + pthread_mutex_unlock(&bpp->objLock); #endif } diff --git a/primitives/primproc/batchprimitiveprocessor.h b/primitives/primproc/batchprimitiveprocessor.h index c57479ef9..ca782b3c7 100755 --- a/primitives/primproc/batchprimitiveprocessor.h +++ b/primitives/primproc/batchprimitiveprocessor.h @@ -121,7 +121,7 @@ class BatchPrimitiveProcessor // these two functions are used by BPPV to create BPP instances // on demand. TRY not to use unlock() for anything else. - void unlock() { objLock.try_lock(); objLock.unlock(); } + void unlock() { pthread_mutex_unlock(&objLock); } bool hasJoin() { return doJoin; } private: BatchPrimitiveProcessor(); @@ -200,7 +200,12 @@ class BatchPrimitiveProcessor messageqcpp::SBS serialized; SP_UM_MUTEX writelock; - boost::mutex objLock; + // MCOL-744 using pthread mutex instead of Boost mutex because + // in it is possible that this lock could be unlocked when it is + // already unlocked. In Ubuntu 16.04's Boost this triggers a + // crash. Whilst it is very hard to hit this it is still bad. + // Longer term TODO: fix/remove objLock and/or refactor BPP + pthread_mutex_t objLock; bool LBIDTrace; bool fBusy;