diff --git a/primitives/primproc/bppsendthread.cpp b/primitives/primproc/bppsendthread.cpp index 580598dfc..11b00ec6e 100644 --- a/primitives/primproc/bppsendthread.cpp +++ b/primitives/primproc/bppsendthread.cpp @@ -75,11 +75,9 @@ void BPPSendThread::sendResult(const Msg_t& msg, bool newConnection) std::unique_lock sl1(respondLock); while (currentByteSize >= maxByteSize && msgQueue.size() > 3 && !die) { - respondWait = true; fProcessorPool->incBlockedThreads(); okToRespond.wait(sl1); fProcessorPool->decBlockedThreads(); - respondWait = false; } } if (die) @@ -122,11 +120,9 @@ void BPPSendThread::sendResults(const vector& msgs, bool newConnection) std::unique_lock sl1(respondLock); while (currentByteSize >= maxByteSize && msgQueue.size() > 3 && !die) { - respondWait = true; fProcessorPool->incBlockedThreads(); okToRespond.wait(sl1); fProcessorPool->decBlockedThreads(); - respondWait = false; } } if (die) @@ -278,7 +274,7 @@ void BPPSendThread::mainLoop() msg[msgsSent].msg.reset(); } - if (respondWait && currentByteSize < maxByteSize) + if (fProcessorPool->blockedThreadCount() > 0 && currentByteSize < maxByteSize) { okToRespond.notify_one(); } diff --git a/primitives/primproc/bppsendthread.h b/primitives/primproc/bppsendthread.h index deb6ff3d6..17994617a 100644 --- a/primitives/primproc/bppsendthread.h +++ b/primitives/primproc/bppsendthread.h @@ -120,7 +120,6 @@ class BPPSendThread std::mutex ackLock; std::condition_variable okToSend; // Condition to prevent run away queue - bool respondWait; std::mutex respondLock; std::condition_variable okToRespond; diff --git a/utils/threadpool/prioritythreadpool.h b/utils/threadpool/prioritythreadpool.h index ac5b74821..c37990416 100644 --- a/utils/threadpool/prioritythreadpool.h +++ b/utils/threadpool/prioritythreadpool.h @@ -113,6 +113,10 @@ class PriorityThreadPool { blockedThreads--; } + uint32_t blockedThreadCount() + { + return blockedThreads; + } protected: private: