diff --git a/dbcon/joblist/primitivestep.h b/dbcon/joblist/primitivestep.h index 9c24548e3..be4e5cee0 100644 --- a/dbcon/joblist/primitivestep.h +++ b/dbcon/joblist/primitivestep.h @@ -1099,7 +1099,7 @@ private: uint32_t fExtentsPerSegFile;//config num of Extents Per Segment File boost::shared_ptr cThread; //consumer thread boost::shared_ptr pThread; //producer thread - boost::mutex mutex; + boost::mutex tplMutex; boost::mutex dlMutex; boost::mutex cpMutex; boost::mutex serializeJoinerMutex; diff --git a/dbcon/joblist/tuple-bps.cpp b/dbcon/joblist/tuple-bps.cpp index 023a0b72d..adb2c5d82 100644 --- a/dbcon/joblist/tuple-bps.cpp +++ b/dbcon/joblist/tuple-bps.cpp @@ -1147,9 +1147,9 @@ void TupleBPS::join() { if (msgsRecvd < msgsSent) { // wake up the sending thread, it should drain the input dl and exit - mutex.lock(); + boost::unique_lock tplLock(tplMutex); condvarWakeupProducer.notify_all(); - mutex.unlock(); + tplLock.unlock(); } if (pThread) @@ -1291,21 +1291,22 @@ void TupleBPS::interleaveJobs(vector *jobs) const void TupleBPS::sendJobs(const vector &jobs) { uint32_t i; + boost::unique_lock tplLock(tplMutex, boost::defer_lock); for (i = 0; i < jobs.size() && !cancelled(); i++) { //cout << "sending a job for dbroot " << jobs[i].dbroot << ", PM " << jobs[i].connectionNum << endl; fDec->write(uniqueID, *(jobs[i].msg)); - mutex.lock(); + tplLock.lock(); msgsSent += jobs[i].expectedResponses; if (recvWaiting) condvar.notify_all(); while ((msgsSent - msgsRecvd > fMaxOutstandingRequests << LOGICAL_EXTENT_CONVERTER) && !fDie) { sendWaiting = true; - condvarWakeupProducer.wait(mutex); + condvarWakeupProducer.wait(tplLock); sendWaiting = false; } - mutex.unlock(); + tplLock.unlock(); } } @@ -1695,10 +1696,10 @@ void TupleBPS::sendPrimitiveMessages() } abort: - mutex.lock(); + boost::unique_lock tplLock(tplMutex); finishedSending = true; condvar.notify_all(); - mutex.unlock(); + tplLock.unlock(); } struct _CPInfo { @@ -1763,6 +1764,7 @@ void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID) StepTeleStats sts; sts.query_uuid = fQueryUuid; sts.step_uuid = fStepUuid; + boost::unique_lock tplLock(tplMutex, boost::defer_lock); try { @@ -1853,13 +1855,13 @@ try #endif } - mutex.lock(); + tplLock.lock(); while (1) { // sync with the send side while (!finishedSending && msgsSent == msgsRecvd) { recvWaiting++; - condvar.wait(mutex); + condvar.wait(tplLock); recvWaiting--; } @@ -1920,13 +1922,13 @@ try break; } if (size == 0) { - mutex.unlock(); + tplLock.unlock(); usleep(2000 * fNumThreads); - mutex.lock(); + tplLock.lock(); continue; } - mutex.unlock(); + tplLock.unlock(); // cout << "thread " << threadID << " has " << size << " Bytestreams\n"; for (i = 0; i < size && !cancelled(); i++) { @@ -1938,7 +1940,7 @@ try if (bs->length() == 0 || hdr->Status > 0) { /* PM errors mean this should abort right away instead of draining the PM backlog */ - mutex.lock(); + tplLock.lock(); if (bs->length() == 0) { errorMessage(IDBErrorInfo::instance()->errorMsg(ERR_PRIMPROC_DOWN)); @@ -2129,7 +2131,7 @@ try } cpv.clear(); - mutex.lock(); + tplLock.lock(); if (fOid >= 3000) { @@ -2162,7 +2164,7 @@ out: if (++recvExited == fNumThreads) { if (doJoin && smallOuterJoiner != -1 && !cancelled()) { - mutex.unlock(); + tplLock.unlock(); /* If this was a left outer join, this needs to put the unmatched rows from the joiner into the output XXXPAT: This might be a problem if later steps depend @@ -2220,7 +2222,7 @@ out: else rgDataToDl(joinedData, local_outputRG, dlp); } - mutex.unlock(); + tplLock.lock(); } if (traceOn() && fOid>=3000) { @@ -2245,7 +2247,7 @@ out: BPPIsAllocated = false; } } - // catch and do nothing. Let it continues with the clean up and profiling + // catch and do nothing. Let it continue with the clean up and profiling catch (const std::exception& e) { cerr << "tuple-bps caught: " << e.what() << endl; @@ -2268,7 +2270,7 @@ out: fPhysicalIO += physIO_Thread; fCacheIO += cachedIO_Thread; fBlockTouched += touchedBlocks_Thread; - mutex.unlock(); + tplLock.unlock(); if (fTableOid >= 3000 && lastThread) {