1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-5264 This patch replaces boost mutex locks with std analogs

boost::unique_lock's dtor calls fancy unlock logic that can throw in two cases:
first if the mutex is 0, and second if the lock doesn't own the mutex.
The first condition's failure causes an unhandled exception for one of the clients
in DEC::writeToClient(). I was unable to find out why Linux can have a 0
mutex, and replaced boost::mutex with std::mutex because libstdc++ should
be more stable compared with boost.
This commit is contained in:
Roman Nozdrin
2022-11-24 17:09:44 +00:00
parent 93a2e7eb8b
commit bfbe5bf315
2 changed files with 35 additions and 38 deletions

View File

@ -283,7 +283,7 @@ void DistributedEngineComm::Setup()
{
cl->atTheSameHost(true);
}
boost::shared_ptr<boost::mutex> nl(new boost::mutex());
std::shared_ptr<std::mutex> nl(new std::mutex());
try
{
@ -411,7 +411,7 @@ void DistributedEngineComm::Listen(boost::shared_ptr<MessageQueueClient> client,
Error:
// @bug 488 - error condition! push 0 length bs to messagequeuemap and
// eventually let jobstep error out.
boost::mutex::scoped_lock lk(fMlock);
std::unique_lock lk(fMlock);
MessageQueueMap::iterator map_tok;
sbs.reset(new ByteStream(0));
@ -480,7 +480,7 @@ void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs)
mqe->sendACKs = sendACKs;
mqe->throttled = false;
boost::mutex::scoped_lock lk(fMlock);
std::lock_guard lk(fMlock);
b = fSessionMessages.insert(pair<uint32_t, boost::shared_ptr<MQE> >(key, mqe)).second;
if (!b)
@ -493,7 +493,7 @@ void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs)
void DistributedEngineComm::removeQueue(uint32_t key)
{
boost::mutex::scoped_lock lk(fMlock);
std::lock_guard lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
if (map_tok == fSessionMessages.end())
@ -506,7 +506,7 @@ void DistributedEngineComm::removeQueue(uint32_t key)
void DistributedEngineComm::shutdownQueue(uint32_t key)
{
boost::mutex::scoped_lock lk(fMlock);
std::lock_guard lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
if (map_tok == fSessionMessages.end())
@ -521,7 +521,7 @@ void DistributedEngineComm::read(uint32_t key, SBS& bs)
boost::shared_ptr<MQE> mqe;
// Find the StepMsgQueueList for this session
boost::mutex::scoped_lock lk(fMlock);
std::unique_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
if (map_tok == fSessionMessages.end())
@ -540,7 +540,7 @@ void DistributedEngineComm::read(uint32_t key, SBS& bs)
if (bs && mqe->sendACKs)
{
boost::mutex::scoped_lock lk(ackLock);
std::unique_lock lk(ackLock);
if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
setFlowControl(false, key, mqe);
@ -560,7 +560,7 @@ const ByteStream DistributedEngineComm::read(uint32_t key)
boost::shared_ptr<MQE> mqe;
// Find the StepMsgQueueList for this session
boost::mutex::scoped_lock lk(fMlock);
std::unique_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
if (map_tok == fSessionMessages.end())
@ -578,7 +578,7 @@ const ByteStream DistributedEngineComm::read(uint32_t key)
if (sbs && mqe->sendACKs)
{
boost::mutex::scoped_lock lk(ackLock);
std::unique_lock lk(ackLock);
if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
setFlowControl(false, key, mqe);
@ -598,7 +598,7 @@ void DistributedEngineComm::read_all(uint32_t key, vector<SBS>& v)
{
boost::shared_ptr<MQE> mqe;
boost::mutex::scoped_lock lk(fMlock);
std::unique_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
if (map_tok == fSessionMessages.end())
@ -615,7 +615,7 @@ void DistributedEngineComm::read_all(uint32_t key, vector<SBS>& v)
if (mqe->sendACKs)
{
boost::mutex::scoped_lock lk(ackLock);
std::unique_lock lk(ackLock);
sendAcks(key, v, mqe, 0);
}
}
@ -624,7 +624,7 @@ void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vector<SBS
{
boost::shared_ptr<MQE> mqe;
boost::mutex::scoped_lock lk(fMlock);
std::unique_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
if (map_tok == fSessionMessages.end())
@ -645,7 +645,7 @@ void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vector<SBS
if (mqe->sendACKs)
{
boost::mutex::scoped_lock lk(ackLock);
std::unique_lock lk(ackLock);
if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
setFlowControl(false, key, mqe);
@ -954,13 +954,12 @@ void DistributedEngineComm::write(messageqcpp::ByteStream& msg, uint32_t connect
PrimitiveHeader* pm = (PrimitiveHeader*)(ism + 1);
uint32_t senderID = pm->UniqueID;
boost::mutex::scoped_lock lk(fMlock, boost::defer_lock_t());
MessageQueueMap::iterator it;
// This keeps mqe's stats from being freed until end of function
boost::shared_ptr<MQE> mqe;
Stats* senderStats = NULL;
lk.lock();
std::unique_lock lk(fMlock);
it = fSessionMessages.find(senderID);
if (it != fSessionMessages.end())
@ -987,7 +986,7 @@ void DistributedEngineComm::addDataToOutput(SBS sbs)
uint32_t uniqueId = p->UniqueID;
boost::shared_ptr<MQE> mqe;
boost::mutex::scoped_lock lk(fMlock);
std::unique_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId);
// The message for a session that doesn't exist.
@ -1020,7 +1019,7 @@ void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats*
PrimitiveHeader* p = (PrimitiveHeader*)(hdr + 1);
uint32_t uniqueId = p->UniqueID;
boost::shared_ptr<MQE> mqe;
boost::mutex::scoped_lock lk(fMlock);
std::unique_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId);
if (map_tok == fSessionMessages.end())
@ -1043,7 +1042,7 @@ void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats*
if (mqe->sendACKs)
{
boost::mutex::scoped_lock lk(ackLock);
std::lock_guard lk(ackLock);
uint64_t msgSize = sbs->lengthWithHdrOverhead();
if (!mqe->throttled && msgSize > (targetRecvQueueSize / 2))
@ -1105,7 +1104,6 @@ void DistributedEngineComm::pushToTheLocalQueueAndNotifyRecv(const messageqcpp::
int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_t senderUniqueID,
bool doInterleaving)
{
boost::mutex::scoped_lock lk(fMlock, boost::defer_lock_t());
MessageQueueMap::iterator it;
// Keep mqe's stats from being freed early
boost::shared_ptr<MQE> mqe;
@ -1125,7 +1123,7 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_
if (senderUniqueID != numeric_limits<uint32_t>::max())
{
lk.lock();
std::lock_guard lk(fMlock);
it = fSessionMessages.find(senderUniqueID);
if (it != fSessionMessages.end())
@ -1135,8 +1133,6 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_
size_t pmIndex = aPMIndex % mqe->pmCount;
connectionId = it->second->getNextConnectionId(pmIndex, fPmConnections.size(), fDECConnectionsPerQuery);
}
lk.unlock();
}
try
@ -1146,7 +1142,7 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_
if (!client->isAvailable())
return 0;
boost::mutex::scoped_lock lk(*(fWlock[connectionId]));
std::lock_guard lk(*(fWlock[connectionId]));
client->write(bs, NULL, senderStats);
return 0;
}
@ -1155,7 +1151,7 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_
// @bug 488. error out under such condition instead of re-trying other connection,
// by pushing 0 size bytestream to messagequeue and throw exception
SBS sbs;
lk.lock();
std::unique_lock lk(fMlock);
// std::cout << "WARNING: DEC WRITE BROKEN PIPE. PMS index = " << index << std::endl;
MessageQueueMap::iterator map_tok;
sbs.reset(new ByteStream(0));
@ -1205,7 +1201,7 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_
uint32_t DistributedEngineComm::size(uint32_t key)
{
boost::mutex::scoped_lock lk(fMlock);
std::unique_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
if (map_tok == fSessionMessages.end())
@ -1238,7 +1234,7 @@ void DistributedEngineComm::removeDECEventListener(DECEventListener* l)
Stats DistributedEngineComm::getNetworkStats(uint32_t uniqueID)
{
boost::mutex::scoped_lock lk(fMlock);
std::lock_guard lk(fMlock);
MessageQueueMap::iterator it;
Stats empty;

View File

@ -32,16 +32,17 @@
#pragma once
#include <ifaddrs.h>
#include <condition_variable>
#include <iostream>
#include <vector>
#include <queue>
#include <string>
#include <map>
#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
#include <boost/scoped_array.hpp>
#include <condition_variable>
#include <ifaddrs.h>
#include <iostream>
#include <map>
#include <mutex>
#include <string>
#include <queue>
#include <vector>
#include "bytestream.h"
#include "primitivemsg.h"
@ -224,7 +225,7 @@ class DistributedEngineComm
private:
typedef std::vector<boost::thread*> ReaderList;
typedef std::vector<boost::shared_ptr<messageqcpp::MessageQueueClient> > ClientList;
typedef std::vector<boost::shared_ptr<messageqcpp::MessageQueueClient>> ClientList;
// A queue of ByteStreams coming in from PrimProc heading for a JobStep
typedef ThreadSafeQueue<messageqcpp::SBS> StepMsgQueue;
@ -258,7 +259,7 @@ class DistributedEngineComm
};
// The mapping of session ids to StepMsgQueueLists
typedef std::map<unsigned, boost::shared_ptr<MQE> > MessageQueueMap;
typedef std::map<unsigned, boost::shared_ptr<MQE>> MessageQueueMap;
explicit DistributedEngineComm(ResourceManager* rm, bool isExeMgr);
@ -283,8 +284,8 @@ class DistributedEngineComm
ReaderList fPmReader; // all the reader threads for the pm servers
MessageQueueMap
fSessionMessages; // place to put messages from the pm server to be returned by the Read method
boost::mutex fMlock; // sessionMessages mutex
std::vector<boost::shared_ptr<boost::mutex> > fWlock; // PrimProc socket write mutexes
std::mutex fMlock; // sessionMessages mutex
std::vector<std::shared_ptr<std::mutex>> fWlock; // PrimProc socket write mutexes
bool fBusy;
volatile uint32_t pmCount;
boost::mutex fOnErrMutex; // to lock function scope to reset pmconnections under error condition
@ -295,7 +296,7 @@ class DistributedEngineComm
boost::mutex eventListenerLock;
ClientList newClients;
std::vector<boost::shared_ptr<boost::mutex> > newLocks;
std::vector<std::shared_ptr<std::mutex>> newLocks;
bool fIsExeMgr;