You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-07 03:22:57 +03:00
MCOL-5009 fix deadlock
respondWait could be set to false while other threads were waiting. With respondWait false, okToRrespond wouldn't ever get notify_one(). Get rid of respondWait and use fProcessorPool->blockedThreadCount to determine if any threads may be waiting.
This commit is contained in:
@@ -75,11 +75,9 @@ void BPPSendThread::sendResult(const Msg_t& msg, bool newConnection)
|
|||||||
std::unique_lock<std::mutex> sl1(respondLock);
|
std::unique_lock<std::mutex> sl1(respondLock);
|
||||||
while (currentByteSize >= maxByteSize && msgQueue.size() > 3 && !die)
|
while (currentByteSize >= maxByteSize && msgQueue.size() > 3 && !die)
|
||||||
{
|
{
|
||||||
respondWait = true;
|
|
||||||
fProcessorPool->incBlockedThreads();
|
fProcessorPool->incBlockedThreads();
|
||||||
okToRespond.wait(sl1);
|
okToRespond.wait(sl1);
|
||||||
fProcessorPool->decBlockedThreads();
|
fProcessorPool->decBlockedThreads();
|
||||||
respondWait = false;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (die)
|
if (die)
|
||||||
@@ -122,11 +120,9 @@ void BPPSendThread::sendResults(const vector<Msg_t>& msgs, bool newConnection)
|
|||||||
std::unique_lock<std::mutex> sl1(respondLock);
|
std::unique_lock<std::mutex> sl1(respondLock);
|
||||||
while (currentByteSize >= maxByteSize && msgQueue.size() > 3 && !die)
|
while (currentByteSize >= maxByteSize && msgQueue.size() > 3 && !die)
|
||||||
{
|
{
|
||||||
respondWait = true;
|
|
||||||
fProcessorPool->incBlockedThreads();
|
fProcessorPool->incBlockedThreads();
|
||||||
okToRespond.wait(sl1);
|
okToRespond.wait(sl1);
|
||||||
fProcessorPool->decBlockedThreads();
|
fProcessorPool->decBlockedThreads();
|
||||||
respondWait = false;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (die)
|
if (die)
|
||||||
@@ -278,7 +274,7 @@ void BPPSendThread::mainLoop()
|
|||||||
msg[msgsSent].msg.reset();
|
msg[msgsSent].msg.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (respondWait && currentByteSize < maxByteSize)
|
if (fProcessorPool->blockedThreadCount() > 0 && currentByteSize < maxByteSize)
|
||||||
{
|
{
|
||||||
okToRespond.notify_one();
|
okToRespond.notify_one();
|
||||||
}
|
}
|
||||||
|
@@ -120,7 +120,6 @@ class BPPSendThread
|
|||||||
std::mutex ackLock;
|
std::mutex ackLock;
|
||||||
std::condition_variable okToSend;
|
std::condition_variable okToSend;
|
||||||
// Condition to prevent run away queue
|
// Condition to prevent run away queue
|
||||||
bool respondWait;
|
|
||||||
std::mutex respondLock;
|
std::mutex respondLock;
|
||||||
std::condition_variable okToRespond;
|
std::condition_variable okToRespond;
|
||||||
|
|
||||||
|
@@ -113,6 +113,10 @@ class PriorityThreadPool
|
|||||||
{
|
{
|
||||||
blockedThreads--;
|
blockedThreads--;
|
||||||
}
|
}
|
||||||
|
uint32_t blockedThreadCount()
|
||||||
|
{
|
||||||
|
return blockedThreads;
|
||||||
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
private:
|
private:
|
||||||
|
Reference in New Issue
Block a user