1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-27 21:01:50 +03:00

MCOL-4841 Fix for MCOL-5009

respondWait could be set to false
while other threads were waiting. With respondWait false, okToRrespond
wouldn't ever get notify_one(). Get rid of respondWait and use
fProcessorPool->blockedThreadCount to determine if any threads may be
waiting.
This commit is contained in:
David Hall
2022-03-07 15:25:00 -06:00
parent 27dea733c5
commit a98834e31c
3 changed files with 6 additions and 7 deletions

View File

@ -76,11 +76,9 @@ void BPPSendThread::sendResult(const Msg_t& msg, bool newConnection)
std::unique_lock<std::mutex> sl1(respondLock); std::unique_lock<std::mutex> sl1(respondLock);
while (currentByteSize >= maxByteSize && msgQueue.size() > 3 && !die) while (currentByteSize >= maxByteSize && msgQueue.size() > 3 && !die)
{ {
respondWait = true;
fProcessorPool->incBlockedThreads(); fProcessorPool->incBlockedThreads();
okToRespond.wait(sl1); okToRespond.wait(sl1);
fProcessorPool->decBlockedThreads(); fProcessorPool->decBlockedThreads();
respondWait = false;
} }
} }
if (die) if (die)
@ -123,11 +121,9 @@ void BPPSendThread::sendResults(const vector<Msg_t>& msgs, bool newConnection)
std::unique_lock<std::mutex> sl1(respondLock); std::unique_lock<std::mutex> sl1(respondLock);
while (currentByteSize >= maxByteSize && msgQueue.size() > 3 && !die) while (currentByteSize >= maxByteSize && msgQueue.size() > 3 && !die)
{ {
respondWait = true;
fProcessorPool->incBlockedThreads(); fProcessorPool->incBlockedThreads();
okToRespond.wait(sl1); okToRespond.wait(sl1);
fProcessorPool->decBlockedThreads(); fProcessorPool->decBlockedThreads();
respondWait = false;
} }
} }
if (die) if (die)
@ -279,7 +275,7 @@ void BPPSendThread::mainLoop()
msg[msgsSent].msg.reset(); msg[msgsSent].msg.reset();
} }
if (respondWait && currentByteSize < maxByteSize) if (fProcessorPool->blockedThreadCount() > 0 && currentByteSize < maxByteSize)
{ {
okToRespond.notify_one(); okToRespond.notify_one();
} }

View File

@ -118,7 +118,6 @@ class BPPSendThread
std::mutex ackLock; std::mutex ackLock;
std::condition_variable okToSend; std::condition_variable okToSend;
// Condition to prevent run away queue // Condition to prevent run away queue
bool respondWait;
std::mutex respondLock; std::mutex respondLock;
std::condition_variable okToRespond; std::condition_variable okToRespond;

View File

@ -112,7 +112,11 @@ class PriorityThreadPool
{ {
blockedThreads--; blockedThreads--;
} }
uint32_t blockedThreadCount()
{
return blockedThreads;
}
protected: protected:
private: private:
struct ThreadHelper struct ThreadHelper