diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index b594515ce..2505ed497 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -283,7 +283,7 @@ void DistributedEngineComm::Setup() { cl->atTheSameHost(true); } - boost::shared_ptr<boost::mutex> nl(new boost::mutex()); + std::shared_ptr<std::mutex> nl(new std::mutex()); try { @@ -411,7 +411,7 @@ void DistributedEngineComm::Listen(boost::shared_ptr<MessageQueueClient> client, Error: // @bug 488 - error condition! push 0 length bs to messagequeuemap and // eventually let jobstep error out. - boost::mutex::scoped_lock lk(fMlock); + std::unique_lock lk(fMlock); MessageQueueMap::iterator map_tok; sbs.reset(new ByteStream(0)); @@ -480,7 +480,7 @@ void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs) mqe->sendACKs = sendACKs; mqe->throttled = false; - boost::mutex::scoped_lock lk(fMlock); + std::lock_guard lk(fMlock); b = fSessionMessages.insert(pair<unsigned, boost::shared_ptr<MQE> >(key, mqe)).second; if (!b) @@ -493,7 +493,7 @@ void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs) void DistributedEngineComm::removeQueue(uint32_t key) { - boost::mutex::scoped_lock lk(fMlock); + std::lock_guard lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(key); if (map_tok == fSessionMessages.end()) @@ -506,7 +506,7 @@ void DistributedEngineComm::removeQueue(uint32_t key) void DistributedEngineComm::shutdownQueue(uint32_t key) { - boost::mutex::scoped_lock lk(fMlock); + std::lock_guard lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(key); if (map_tok == fSessionMessages.end()) @@ -521,7 +521,7 @@ void DistributedEngineComm::read(uint32_t key, SBS& bs) boost::shared_ptr<MQE> mqe; // Find the StepMsgQueueList for this session - boost::mutex::scoped_lock lk(fMlock); + std::unique_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(key); if (map_tok == fSessionMessages.end()) @@ -540,7 +540,7 @@ void DistributedEngineComm::read(uint32_t 
key, SBS& bs) if (bs && mqe->sendACKs) { - boost::mutex::scoped_lock lk(ackLock); + std::unique_lock lk(ackLock); if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold) setFlowControl(false, key, mqe); @@ -560,7 +560,7 @@ const ByteStream DistributedEngineComm::read(uint32_t key) boost::shared_ptr mqe; // Find the StepMsgQueueList for this session - boost::mutex::scoped_lock lk(fMlock); + std::unique_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(key); if (map_tok == fSessionMessages.end()) @@ -578,7 +578,7 @@ const ByteStream DistributedEngineComm::read(uint32_t key) if (sbs && mqe->sendACKs) { - boost::mutex::scoped_lock lk(ackLock); + std::unique_lock lk(ackLock); if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold) setFlowControl(false, key, mqe); @@ -598,7 +598,7 @@ void DistributedEngineComm::read_all(uint32_t key, vector& v) { boost::shared_ptr mqe; - boost::mutex::scoped_lock lk(fMlock); + std::unique_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(key); if (map_tok == fSessionMessages.end()) @@ -615,7 +615,7 @@ void DistributedEngineComm::read_all(uint32_t key, vector& v) if (mqe->sendACKs) { - boost::mutex::scoped_lock lk(ackLock); + std::unique_lock lk(ackLock); sendAcks(key, v, mqe, 0); } } @@ -624,7 +624,7 @@ void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vector mqe; - boost::mutex::scoped_lock lk(fMlock); + std::unique_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(key); if (map_tok == fSessionMessages.end()) @@ -645,7 +645,7 @@ void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vectorsendACKs) { - boost::mutex::scoped_lock lk(ackLock); + std::unique_lock lk(ackLock); if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold) setFlowControl(false, key, mqe); @@ -954,13 +954,12 @@ void DistributedEngineComm::write(messageqcpp::ByteStream& msg, uint32_t connect 
PrimitiveHeader* pm = (PrimitiveHeader*)(ism + 1); uint32_t senderID = pm->UniqueID; - boost::mutex::scoped_lock lk(fMlock, boost::defer_lock_t()); MessageQueueMap::iterator it; // This keeps mqe's stats from being freed until end of function boost::shared_ptr mqe; Stats* senderStats = NULL; - lk.lock(); + std::unique_lock lk(fMlock); it = fSessionMessages.find(senderID); if (it != fSessionMessages.end()) @@ -987,7 +986,7 @@ void DistributedEngineComm::addDataToOutput(SBS sbs) uint32_t uniqueId = p->UniqueID; boost::shared_ptr mqe; - boost::mutex::scoped_lock lk(fMlock); + std::unique_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId); // The message for a session that doesn't exist. @@ -1020,7 +1019,7 @@ void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats* PrimitiveHeader* p = (PrimitiveHeader*)(hdr + 1); uint32_t uniqueId = p->UniqueID; boost::shared_ptr mqe; - boost::mutex::scoped_lock lk(fMlock); + std::unique_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId); if (map_tok == fSessionMessages.end()) @@ -1043,7 +1042,7 @@ void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats* if (mqe->sendACKs) { - boost::mutex::scoped_lock lk(ackLock); + std::lock_guard lk(ackLock); uint64_t msgSize = sbs->lengthWithHdrOverhead(); if (!mqe->throttled && msgSize > (targetRecvQueueSize / 2)) @@ -1105,7 +1104,6 @@ void DistributedEngineComm::pushToTheLocalQueueAndNotifyRecv(const messageqcpp:: int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_t senderUniqueID, bool doInterleaving) { - boost::mutex::scoped_lock lk(fMlock, boost::defer_lock_t()); MessageQueueMap::iterator it; // Keep mqe's stats from being freed early boost::shared_ptr mqe; @@ -1125,7 +1123,7 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_ if (senderUniqueID != numeric_limits::max()) { - lk.lock(); + std::lock_guard lk(fMlock); it = 
fSessionMessages.find(senderUniqueID); if (it != fSessionMessages.end()) @@ -1135,8 +1133,6 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_ size_t pmIndex = aPMIndex % mqe->pmCount; connectionId = it->second->getNextConnectionId(pmIndex, fPmConnections.size(), fDECConnectionsPerQuery); } - - lk.unlock(); } try @@ -1146,7 +1142,7 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_ if (!client->isAvailable()) return 0; - boost::mutex::scoped_lock lk(*(fWlock[connectionId])); + std::lock_guard lk(*(fWlock[connectionId])); client->write(bs, NULL, senderStats); return 0; } @@ -1155,7 +1151,7 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_ // @bug 488. error out under such condition instead of re-trying other connection, // by pushing 0 size bytestream to messagequeue and throw exception SBS sbs; - lk.lock(); + std::unique_lock lk(fMlock); // std::cout << "WARNING: DEC WRITE BROKEN PIPE. PMS index = " << index << std::endl; MessageQueueMap::iterator map_tok; sbs.reset(new ByteStream(0)); @@ -1205,7 +1201,7 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_ uint32_t DistributedEngineComm::size(uint32_t key) { - boost::mutex::scoped_lock lk(fMlock); + std::unique_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(key); if (map_tok == fSessionMessages.end()) @@ -1238,7 +1234,7 @@ void DistributedEngineComm::removeDECEventListener(DECEventListener* l) Stats DistributedEngineComm::getNetworkStats(uint32_t uniqueID) { - boost::mutex::scoped_lock lk(fMlock); + std::lock_guard lk(fMlock); MessageQueueMap::iterator it; Stats empty; diff --git a/dbcon/joblist/distributedenginecomm.h b/dbcon/joblist/distributedenginecomm.h index ad2692d40..361180a35 100644 --- a/dbcon/joblist/distributedenginecomm.h +++ b/dbcon/joblist/distributedenginecomm.h @@ -32,16 +32,17 @@ #pragma once -#include -#include -#include -#include -#include 
-#include -#include #include #include #include +#include +#include +#include +#include +#include +#include +#include +#include #include "bytestream.h" #include "primitivemsg.h" @@ -224,7 +225,7 @@ class DistributedEngineComm private: typedef std::vector ReaderList; - typedef std::vector<boost::shared_ptr<MessageQueueClient> > ClientList; + typedef std::vector<boost::shared_ptr<MessageQueueClient>> ClientList; // A queue of ByteStreams coming in from PrimProc heading for a JobStep typedef ThreadSafeQueue StepMsgQueue; @@ -258,7 +259,7 @@ class DistributedEngineComm }; // The mapping of session ids to StepMsgQueueLists - typedef std::map<unsigned, boost::shared_ptr<MQE> > MessageQueueMap; + typedef std::map<unsigned, boost::shared_ptr<MQE>> MessageQueueMap; explicit DistributedEngineComm(ResourceManager* rm, bool isExeMgr); @@ -283,8 +284,8 @@ class DistributedEngineComm ReaderList fPmReader; // all the reader threads for the pm servers MessageQueueMap fSessionMessages; // place to put messages from the pm server to be returned by the Read method - boost::mutex fMlock; // sessionMessages mutex - std::vector<boost::shared_ptr<boost::mutex> > fWlock; // PrimProc socket write mutexes + std::mutex fMlock; // sessionMessages mutex + std::vector<std::shared_ptr<std::mutex>> fWlock; // PrimProc socket write mutexes bool fBusy; volatile uint32_t pmCount; boost::mutex fOnErrMutex; // to lock function scope to reset pmconnections under error condition @@ -295,7 +296,7 @@ class DistributedEngineComm boost::mutex eventListenerLock; ClientList newClients; - std::vector<boost::shared_ptr<boost::mutex> > newLocks; + std::vector<std::shared_ptr<std::mutex>> newLocks; bool fIsExeMgr;