From a98834e31c272ddda715ebf79aac7fc2f19b25ca Mon Sep 17 00:00:00 2001 From: David Hall Date: Mon, 7 Mar 2022 15:25:00 -0600 Subject: [PATCH] MCOL-4841 Fix for MCOL-5009 respondWait could be set to false while other threads were waiting. With respondWait false, okToRrespond wouldn't ever get notify_one(). Get rid of respondWait and use fProcessorPool->blockedThreadCount to determine if any threads may be waiting. --- primitives/primproc/bppsendthread.cpp | 6 +----- primitives/primproc/bppsendthread.h | 1 - utils/threadpool/prioritythreadpool.h | 6 +++++- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/primitives/primproc/bppsendthread.cpp b/primitives/primproc/bppsendthread.cpp index ba134a479..356342d3b 100644 --- a/primitives/primproc/bppsendthread.cpp +++ b/primitives/primproc/bppsendthread.cpp @@ -76,11 +76,9 @@ void BPPSendThread::sendResult(const Msg_t& msg, bool newConnection) std::unique_lock sl1(respondLock); while (currentByteSize >= maxByteSize && msgQueue.size() > 3 && !die) { - respondWait = true; fProcessorPool->incBlockedThreads(); okToRespond.wait(sl1); fProcessorPool->decBlockedThreads(); - respondWait = false; } } if (die) @@ -123,11 +121,9 @@ void BPPSendThread::sendResults(const vector& msgs, bool newConnection) std::unique_lock sl1(respondLock); while (currentByteSize >= maxByteSize && msgQueue.size() > 3 && !die) { - respondWait = true; fProcessorPool->incBlockedThreads(); okToRespond.wait(sl1); fProcessorPool->decBlockedThreads(); - respondWait = false; } } if (die) @@ -279,7 +275,7 @@ void BPPSendThread::mainLoop() msg[msgsSent].msg.reset(); } - if (respondWait && currentByteSize < maxByteSize) + if (fProcessorPool->blockedThreadCount() > 0 && currentByteSize < maxByteSize) { okToRespond.notify_one(); } diff --git a/primitives/primproc/bppsendthread.h b/primitives/primproc/bppsendthread.h index 8c3a3e9a1..77856ee90 100644 --- a/primitives/primproc/bppsendthread.h +++ b/primitives/primproc/bppsendthread.h @@ -118,7 +118,6 @@ class BPPSendThread std::mutex ackLock; std::condition_variable okToSend; // Condition to prevent run away queue - bool respondWait; std::mutex respondLock; std::condition_variable okToRespond; diff --git a/utils/threadpool/prioritythreadpool.h b/utils/threadpool/prioritythreadpool.h index 842d8f5c0..7608b4166 100644 --- a/utils/threadpool/prioritythreadpool.h +++ b/utils/threadpool/prioritythreadpool.h @@ -112,7 +112,11 @@ class PriorityThreadPool { blockedThreads--; } - + uint32_t blockedThreadCount() + { + return blockedThreads; + } + protected: private: struct ThreadHelper