From f6c479ab40c5b09bd230a35bdca8cde83ca03082 Mon Sep 17 00:00:00 2001 From: Leonid Fedorov Date: Mon, 27 Oct 2025 15:40:05 +0000 Subject: [PATCH] fix(correctness): replace volatiles with atomics --- dbcon/joblist/distributedenginecomm.cpp | 44 +++++++-------- dbcon/joblist/distributedenginecomm.h | 11 ++-- dbcon/joblist/fifo.h | 3 +- dbcon/joblist/joblist.cpp | 10 ++-- dbcon/joblist/joblist.h | 7 ++- dbcon/joblist/jobstep.cpp | 41 ++++++++++++++ dbcon/joblist/jobstep.h | 7 ++- dbcon/joblist/largehashjoin.h | 15 ++--- dbcon/joblist/primitivestep.h | 2 +- dbcon/joblist/threadsafequeue.h | 3 +- dbcon/joblist/tuple-bps.cpp | 6 +- dbcon/joblist/tuplehashjoin.cpp | 2 +- primitives/primproc/bppsendthread.cpp | 56 +++++++++---------- primitives/primproc/bppsendthread.h | 13 +++-- primitives/primproc/primitiveserver.cpp | 26 ++++----- storage-manager/src/Ownership.cpp | 18 +++--- storage-manager/src/Ownership.h | 5 +- utils/cacheutils/cacheutils.cpp | 11 ++-- utils/querytele/queryteleprotoimpl.cpp | 9 +-- versioning/BRM/masterdbrmnode.h | 6 +- versioning/BRM/sessionmanagerserver.cpp | 2 +- versioning/BRM/sessionmanagerserver.h | 13 +++-- writeengine/bulk/we_bulkstatus.cpp | 2 +- writeengine/bulk/we_bulkstatus.h | 10 ++-- writeengine/bulk/we_columninfo.cpp | 2 +- writeengine/bulk/we_columninfo.h | 6 +- writeengine/bulk/we_tableinfo.h | 11 ++-- writeengine/client/we_clients.cpp | 15 ++--- writeengine/client/we_clients.h | 10 ++-- .../we_redistributecontrolthread.cpp | 4 +- .../we_redistributecontrolthread.h | 4 +- .../we_redistributeworkerthread.cpp | 4 +- .../we_redistributeworkerthread.h | 6 +- writeengine/server/we_getfilesizes.h | 9 ++- writeengine/shared/we_brm.cpp | 19 +++---- writeengine/shared/we_brm.h | 2 +- 36 files changed, 233 insertions(+), 181 deletions(-) diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index da3a9b2d3..d417cebed 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ 
b/dbcon/joblist/distributedenginecomm.cpp @@ -219,7 +219,7 @@ void DistributedEngineComm::reset() } DistributedEngineComm::DistributedEngineComm(ResourceManager* rm, bool isExeMgr) - : fRm(rm), pmCount(0), fIsExeMgr(isExeMgr) + : fRm(rm), pmCount{0}, fIsExeMgr(isExeMgr) { if (fIsExeMgr) { @@ -369,9 +369,8 @@ int32_t DistributedEngineComm::Setup() fWlock.swap(newLocks); fPmConnections.swap(newClients); - // memory barrier to prevent the pmCount assignment migrating upward - atomicops::atomicMb(); - pmCount = newPmCount; + // atomic store with release semantics ensures all previous writes are visible + pmCount.store(newPmCount, std::memory_order_release); newLocks.clear(); newClients.clear(); @@ -432,18 +431,18 @@ Error: for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok) { map_tok->second->queue.clear(); - (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]); + map_tok->second->unackedWork[0].fetch_add(1, std::memory_order_relaxed); map_tok->second->queue.push(sbs); } lk.unlock(); if (fIsExeMgr) { - decltype(pmCount) originalPMCount = pmCount; + uint32_t originalPMCount = pmCount.load(std::memory_order_relaxed); // Re-establish if a remote PM restarted. 
std::this_thread::sleep_for(std::chrono::seconds(3)); auto rc = Setup(); - if (rc || originalPMCount != pmCount) + if (rc || originalPMCount != pmCount.load(std::memory_order_relaxed)) { ostringstream os; os << "DEC: lost connection to " << client->addr2String(); @@ -461,9 +460,7 @@ void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs) condition* cond = new condition(); uint32_t firstPMInterleavedConnectionId = key % (fPmConnections.size() / pmCount) * fDECConnectionsPerQuery * pmCount % fPmConnections.size(); - boost::shared_ptr mqe(new MQE(pmCount, firstPMInterleavedConnectionId, flowControlEnableBytesThresh)); - - mqe->queue = StepMsgQueue(lock, cond); + boost::shared_ptr mqe(new MQE(pmCount, firstPMInterleavedConnectionId, flowControlEnableBytesThresh, lock, cond)); mqe->sendACKs = sendACKs; mqe->throttled = false; @@ -757,7 +754,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, uint64_t totalUnackedWork = 0; for (uint32_t i = 0; i < pmCount; ++i) - totalUnackedWork += mqe->unackedWork[i]; + totalUnackedWork += mqe->unackedWork[i].load(std::memory_order_relaxed); if (totalUnackedWork == 0) { @@ -784,9 +781,9 @@ void DistributedEngineComm::nextPMToACK(boost::shared_ptr mqe, uint32_t max * the locking env, mqe->unackedWork can only grow; whatever gets latched in this fcn * is a safe minimum at the point of use. */ - if (mqe->unackedWork[nextIndex] >= maxAck) + if (mqe->unackedWork[nextIndex].load(std::memory_order_relaxed) >= maxAck) { - (void)atomicops::atomicSub(&mqe->unackedWork[nextIndex], maxAck); + mqe->unackedWork[nextIndex].fetch_sub(maxAck, std::memory_order_relaxed); *sockIndex = nextIndex; // FIXME: we're going to truncate here from 32 to 16 bits. Hopefully this will always fit... 
*numToAck = maxAck; @@ -800,12 +797,12 @@ void DistributedEngineComm::nextPMToACK(boost::shared_ptr mqe, uint32_t max { for (int i = pmCount - 1; i >= 0; --i) { - uint32_t curVal = mqe->unackedWork[nextIndex]; + uint32_t curVal = mqe->unackedWork[nextIndex].load(std::memory_order_relaxed); uint32_t unackedWork = (curVal > maxAck ? maxAck : curVal); if (unackedWork > 0) { - (void)atomicops::atomicSub(&mqe->unackedWork[nextIndex], unackedWork); + mqe->unackedWork[nextIndex].fetch_sub(unackedWork, std::memory_order_relaxed); *sockIndex = nextIndex; *numToAck = unackedWork; @@ -822,7 +819,7 @@ void DistributedEngineComm::nextPMToACK(boost::shared_ptr mqe, uint32_t max cerr << "DEC::nextPMToACK(): Couldn't find a PM to ACK! "; for (int i = pmCount - 1; i >= 0; --i) - cerr << mqe->unackedWork[i] << " "; + cerr << mqe->unackedWork[i].load(std::memory_order_relaxed) << " "; cerr << " max: " << maxAck; cerr << endl; @@ -986,7 +983,7 @@ void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats* if (pmCount > 0) { - (void)atomicops::atomicInc(&mqe->unackedWork[connIndex % pmCount]); + mqe->unackedWork[connIndex % pmCount].fetch_add(1, std::memory_order_relaxed); } TSQSize_t queueSize = mqe->queue.push(sbs); @@ -1108,7 +1105,7 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_ for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok) { map_tok->second->queue.clear(); - (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]); + map_tok->second->unackedWork[0].fetch_add(1, std::memory_order_relaxed); map_tok->second->queue.push(sbs); } @@ -1153,7 +1150,8 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_ } if (tempConns.size() == fPmConnections.size()) return 0; fPmConnections.swap(tempConns); - pmCount = (pmCount == 0 ? 0 : pmCount - 1); + uint32_t oldCount = pmCount.load(std::memory_order_relaxed); + pmCount.store((oldCount == 0 ? 
0 : oldCount - 1), std::memory_order_relaxed); } // send alarm ALARMManager alarmMgr; @@ -1219,12 +1217,12 @@ Stats DistributedEngineComm::getNetworkStats(uint32_t uniqueID) } DistributedEngineComm::MQE::MQE(const uint32_t pCount, const uint32_t initialInterleaverValue, - const uint64_t flowControlEnableBytesThresh) - : ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false), targetQueueSize(flowControlEnableBytesThresh) + const uint64_t flowControlEnableBytesThresh, + boost::mutex* lock, boost::condition* cond) + : queue(lock, cond), ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false), targetQueueSize(flowControlEnableBytesThresh), unackedWork(pCount) { - unackedWork.reset(new volatile uint32_t[pmCount]); + // unackedWork vector is default-initialized to 0 interleaver.reset(new uint32_t[pmCount]); - memset((void*)unackedWork.get(), 0, pmCount * sizeof(uint32_t)); uint32_t interleaverValue = initialInterleaverValue; initialConnectionId = initialInterleaverValue; for (size_t pmId = 0; pmId < pmCount; ++pmId) diff --git a/dbcon/joblist/distributedenginecomm.h b/dbcon/joblist/distributedenginecomm.h index 2c6c6ac86..6928abc85 100644 --- a/dbcon/joblist/distributedenginecomm.h +++ b/dbcon/joblist/distributedenginecomm.h @@ -238,15 +238,13 @@ class DistributedEngineComm /* To keep some state associated with the connection. These aren't copyable. 
*/ struct MQE : public boost::noncopyable { - MQE(const uint32_t pmCount, const uint32_t initialInterleaverValue, const uint64_t recvQueueSize); + MQE(const uint32_t pmCount, const uint32_t initialInterleaverValue, const uint64_t recvQueueSize, + boost::mutex* lock = nullptr, boost::condition* cond = nullptr); uint32_t getNextConnectionId(const size_t pmIndex, const size_t pmConnectionsNumber, const uint32_t DECConnectionsPerQuery); messageqcpp::Stats stats; StepMsgQueue queue; uint32_t ackSocketIndex; - boost::scoped_array unackedWork; - boost::scoped_array interleaver; - uint32_t initialConnectionId; uint32_t pmCount; // non-BPP primitives don't do ACKs bool sendACKs; @@ -261,6 +259,9 @@ class DistributedEngineComm bool hasBigMsgs; uint64_t targetQueueSize; + std::vector> unackedWork; + boost::scoped_array interleaver; + uint32_t initialConnectionId; }; // The mapping of session ids to StepMsgQueueLists @@ -292,7 +293,7 @@ class DistributedEngineComm std::mutex fMlock; // sessionMessages mutex std::vector> fWlock; // PrimProc socket write mutexes bool fBusy; - volatile uint32_t pmCount; + std::atomic pmCount; boost::mutex fOnErrMutex; // to lock function scope to reset pmconnections under error condition boost::mutex fSetupMutex; diff --git a/dbcon/joblist/fifo.h b/dbcon/joblist/fifo.h index e9f1c9bea..b88bbac75 100644 --- a/dbcon/joblist/fifo.h +++ b/dbcon/joblist/fifo.h @@ -28,6 +28,7 @@ #include #include +#include #include #include #include @@ -157,7 +158,7 @@ class FIFO : public DataListImpl, element_t> uint64_t fTotSize; bool fInOrder; uint64_t fConsumerFinishedCount; - volatile bool fConsumptionStarted; + std::atomic fConsumptionStarted; uint32_t fElementMode; uint64_t fNumFiles; uint64_t fNumBytes; diff --git a/dbcon/joblist/joblist.cpp b/dbcon/joblist/joblist.cpp index b902f5bf1..2dbc511cb 100644 --- a/dbcon/joblist/joblist.cpp +++ b/dbcon/joblist/joblist.cpp @@ -74,7 +74,7 @@ struct JSJoiner }; JobList::JobList(bool isEM) - : fIsRunning(false), 
fIsExeMgr(isEM), fPmsConnected(0), projectingTableOID(0), fAborted(0), fPriority(50) + : fIsRunning(false), fIsExeMgr(isEM), fPmsConnected(0), projectingTableOID(0), fAborted{0}, fPriority(50) { } @@ -615,7 +615,8 @@ void JobList::abort() uint32_t i; // If we're not currently aborting, then start aborting... - if (atomicops::atomicCAS(&fAborted, 0, 1)) + uint32_t expected = 0; + if (fAborted.compare_exchange_strong(expected, 1, std::memory_order_relaxed)) { for (i = 0; i < fQuery.size(); i++) fQuery[i]->abort(); @@ -628,7 +629,8 @@ void JobList::abort() void JobList::abortOnLimit(JobStep* js) { // If we're not currently aborting, then start aborting... - if (atomicops::atomicCAS(&fAborted, 0, 1)) + uint32_t expected = 0; + if (fAborted.compare_exchange_strong(expected, 1, std::memory_order_relaxed)) { // @bug4848, enhance and unify limit handling. for (uint32_t i = 0; i < fQuery.size(); i++) @@ -669,7 +671,7 @@ TupleJobList::~TupleJobList() void TupleJobList::abort() { - if (fAborted == 0 && fIsRunning) + if (fAborted.load() == 0 && fIsRunning) { JobList::abort(); messageqcpp::ByteStream bs; diff --git a/dbcon/joblist/joblist.h b/dbcon/joblist/joblist.h index 52b935a69..70af9e5bf 100644 --- a/dbcon/joblist/joblist.h +++ b/dbcon/joblist/joblist.h @@ -25,6 +25,9 @@ #include #include #include +#include +#include +#include #include #include "calpontsystemcatalog.h" @@ -163,7 +166,7 @@ class JobList EXPORT virtual void abort(); EXPORT virtual bool aborted() { - return (fAborted != 0); + return (fAborted.load() != 0); } std::string toString() const; @@ -209,7 +212,7 @@ class JobList std::string fMiniInfo; std::vector subqueryJoblists; - volatile uint32_t fAborted; + std::atomic fAborted; uint32_t fPriority; // higher #s = higher priority }; diff --git a/dbcon/joblist/jobstep.cpp b/dbcon/joblist/jobstep.cpp index b917262ef..d8cfbd58f 100644 --- a/dbcon/joblist/jobstep.cpp +++ b/dbcon/joblist/jobstep.cpp @@ -112,6 +112,47 @@ JobStep::JobStep(const JobInfo& j) fStepUuid = 
QueryTeleClient::genUUID(); } +//------------------------------------------------------------------------------ +// Copy constructor - needed because fDie is std::atomic which is not copyable +//------------------------------------------------------------------------------ +JobStep::JobStep(const JobStep& rhs) + : fInputJobStepAssociation(rhs.fInputJobStepAssociation) + , fOutputJobStepAssociation(rhs.fOutputJobStepAssociation) + , fSessionId(rhs.fSessionId) + , fTxnId(rhs.fTxnId) + , fVerId(rhs.fVerId) + , fStatementId(rhs.fStatementId) + , fStepId(rhs.fStepId) + , fTupleId(rhs.fTupleId) + , fAlias(rhs.fAlias) + , fView(rhs.fView) + , fPartitions(rhs.fPartitions) + , fName(rhs.fName) + , fSchema(rhs.fSchema) + , fTraceFlags(rhs.fTraceFlags) + , fCardinality(rhs.fCardinality) + , fDelayedRunFlag(rhs.fDelayedRunFlag) + , fDelivery(rhs.fDelivery) + , fOnClauseFilter(rhs.fOnClauseFilter) + , fDie(rhs.fDie.load(std::memory_order_relaxed)) + , fWaitToRunStepCnt(rhs.fWaitToRunStepCnt) + , fExtendedInfo(rhs.fExtendedInfo) + , fMiniInfo(rhs.fMiniInfo) + , fPriority(rhs.fPriority) + , fErrorInfo(rhs.fErrorInfo) + , fLogger(rhs.fLogger) + , fLocalQuery(rhs.fLocalQuery) + , fQueryUuid(rhs.fQueryUuid) + , fStepUuid(rhs.fStepUuid) + , fQtc(rhs.fQtc) + , fProgress(rhs.fProgress) + , fStartTime(rhs.fStartTime) + , fLastStepTeleTime(rhs.fLastStepTeleTime) + , fTimeZone(rhs.fTimeZone) + , fMaxPmJoinResultCount(rhs.fMaxPmJoinResultCount) +{ +} + //------------------------------------------------------------------------------ // Log a syslog msg for the start of this specified job step //------------------------------------------------------------------------------ diff --git a/dbcon/joblist/jobstep.h b/dbcon/joblist/jobstep.h index 0efb47756..8c4155487 100644 --- a/dbcon/joblist/jobstep.h +++ b/dbcon/joblist/jobstep.h @@ -125,6 +125,7 @@ class JobStep */ JobStep() = default; explicit JobStep(const JobInfo&); + JobStep(const JobStep&); /** destructor */ virtual ~JobStep() @@ -135,7 
+136,7 @@ class JobStep virtual void run() = 0; virtual void abort() { - fDie = true; + fDie.store(true, std::memory_order_relaxed); } /** @brief virtual void join method */ @@ -388,7 +389,7 @@ class JobStep bool cancelled() { - return (fErrorInfo->errCode > 0 || fDie); + return (fErrorInfo->errCode > 0 || fDie.load()); } virtual bool stringTableFriendly() @@ -482,7 +483,7 @@ class JobStep bool fDelayedRunFlag; bool fDelivery; bool fOnClauseFilter; - volatile bool fDie; + std::atomic fDie; uint32_t fWaitToRunStepCnt; std::string fExtendedInfo; std::string fMiniInfo; diff --git a/dbcon/joblist/largehashjoin.h b/dbcon/joblist/largehashjoin.h index f85f091ba..07d576bf3 100644 --- a/dbcon/joblist/largehashjoin.h +++ b/dbcon/joblist/largehashjoin.h @@ -29,6 +29,7 @@ /** @file */ +#include #include #include #include @@ -135,12 +136,12 @@ class HashJoin uint32_t thrIdx; TimeSet timeset; JSTimeStamp dlTimes; - volatile bool* die; + std::atomic* die; } thrParams_t; HashJoin(joblist::BDLWrapper& set1, joblist::BDLWrapper& set2, joblist::DataList* result1, joblist::DataList* result2, JoinType joinType, - JSTimeStamp* dlTimes, const SErrorInfo& status, uint32_t sessionId, volatile bool* die); + JSTimeStamp* dlTimes, const SErrorInfo& status, uint32_t sessionId, std::atomic* die); HashJoin(); HashJoin(const HashJoin& hj); @@ -213,7 +214,7 @@ class HashJoin void createHash(BucketDL* bdlptr, hash_t* destHashTbl, const uint32_t idx, bool populateResult, // true if bdlptr is opposite an outer join joblist::DataList* result, // populated if populateResult true - JSTimeStamp& thrDlTimes, volatile bool* die); + JSTimeStamp& thrDlTimes, std::atomic* die); void init(); TimeSet* getTimeSet() { @@ -252,7 +253,7 @@ class HashJoin TimeSet fTimeSet; SErrorInfo fStatus; uint32_t fSessionId; - volatile bool* die; + std::atomic* die; }; template @@ -262,7 +263,7 @@ template HashJoin::HashJoin(joblist::BDLWrapper& set1, joblist::BDLWrapper& set2, joblist::DataList* result1, joblist::DataList* 
result2, JoinType joinType, JSTimeStamp* dlt, const SErrorInfo& status, - uint32_t sessionId, volatile bool* d) + uint32_t sessionId, std::atomic* d) : fTimeSet(), fStatus(status), fSessionId(sessionId) { fSet1 = set1; @@ -464,7 +465,7 @@ template void HashJoin::createHash(BucketDL* srcBucketDL, hash_t* destHashTbl, const uint32_t bucketNum, bool populateResult, joblist::DataList* result, JSTimeStamp& thrDlTimes, - volatile bool* die) + std::atomic* die) { bool more; element_t e; @@ -485,7 +486,7 @@ void HashJoin::createHash(BucketDL* srcBucketDL, hash_t* d thrDlTimes.setFirstReadTime(); } - for (; more & !(*die); more = srcBucketDL->next(bucketNum, bucketIter, &e)) + for (; more & !die->load(); more = srcBucketDL->next(bucketNum, bucketIter, &e)) { #ifdef DEBUG cout << "createHash() bkt " << bucketNum << " idx " << idx << " find(" << e.second << ")" << endl; diff --git a/dbcon/joblist/primitivestep.h b/dbcon/joblist/primitivestep.h index 23eae6073..b7b88f951 100644 --- a/dbcon/joblist/primitivestep.h +++ b/dbcon/joblist/primitivestep.h @@ -1250,7 +1250,7 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep uint64_t totalMsgs; uint64_t msgsSent; uint64_t msgsRecvd; - volatile bool finishedSending; + std::atomic finishedSending; bool firstRead; bool sendWaiting; uint32_t recvWaiting; diff --git a/dbcon/joblist/threadsafequeue.h b/dbcon/joblist/threadsafequeue.h index 71d6ca44c..44ac478ff 100644 --- a/dbcon/joblist/threadsafequeue.h +++ b/dbcon/joblist/threadsafequeue.h @@ -22,6 +22,7 @@ /** @file */ #pragma once +#include #include #include #include @@ -325,7 +326,7 @@ class ThreadSafeQueue impl_type fImpl; SPBM fPimplLock; SPBC fPimplCond; - volatile bool fShutdown; + std::atomic fShutdown; T fBs0; size_t bytes; uint32_t zeroCount; // counts the # of times read_some returned 0 diff --git a/dbcon/joblist/tuple-bps.cpp b/dbcon/joblist/tuple-bps.cpp index 46d674130..010df1a11 100644 --- a/dbcon/joblist/tuple-bps.cpp +++ b/dbcon/joblist/tuple-bps.cpp @@ 
-1679,7 +1679,7 @@ void TupleBPS::sendJobs(const vector& jobs) condvar.notify_all(); // Send not more than fMaxOutstandingRequests jobs out. min(blocksPerJob) = 16 - while ((msgsSent - msgsRecvd > fMaxOutstandingRequests * (blocksPerJob >> 1)) && !fDie) + while ((msgsSent - msgsRecvd > fMaxOutstandingRequests * (blocksPerJob >> 1)) && !fDie.load()) { sendWaiting = true; condvarWakeupProducer.wait(tplLock); @@ -2843,7 +2843,7 @@ const string TupleBPS::toString() const if (bop == BOP_OR) oss << " BOP_OR "; - if (fDie) + if (fDie.load()) oss << " aborting " << msgsSent << "/" << msgsRecvd << " " << uniqueID << " "; if (fOutputJobStepAssociation.outSize() > 0) @@ -3357,7 +3357,7 @@ void TupleBPS::dec(DistributedEngineComm* dec) void TupleBPS::abort_nolock() { - if (fDie) + if (fDie.load()) return; JobStep::abort(); diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp index d56f0facf..db1786b62 100644 --- a/dbcon/joblist/tuplehashjoin.cpp +++ b/dbcon/joblist/tuplehashjoin.cpp @@ -675,7 +675,7 @@ void TupleHashJoinStep::hjRunner() errorMessage("too many threads"); status(logging::threadResourceErr); errorLogging(emsg, logging::threadResourceErr); - fDie = true; + fDie.store(true, std::memory_order_relaxed); deliverMutex.unlock(); } diff --git a/primitives/primproc/bppsendthread.cpp b/primitives/primproc/bppsendthread.cpp index 3b45928e2..22a953240 100644 --- a/primitives/primproc/bppsendthread.cpp +++ b/primitives/primproc/bppsendthread.cpp @@ -52,22 +52,22 @@ void BPPSendThread::sendResult(const Msg_t& msg, bool newConnection) if (sizeTooBig()) { std::unique_lock sl1(respondLock); - while (currentByteSize >= queueBytesThresh && msgQueue.size() > 3 && !die) + while (currentByteSize.load() >= queueBytesThresh && msgQueue.size() > 3 && !die.load()) { fProcessorPool->incBlockedThreads(); okToRespond.wait(sl1); fProcessorPool->decBlockedThreads(); } } - if (die) + if (die.load()) return; std::unique_lock sl(msgQueueLock); - if (gotException) + if 
(gotException.load()) throw std::runtime_error(exceptionString); - (void)atomicops::atomicAdd(&currentByteSize, msg.msg->lengthWithHdrOverhead()); + currentByteSize.fetch_add(msg.msg->lengthWithHdrOverhead(), std::memory_order_relaxed); msgQueue.push(msg); if (!sawAllConnections && newConnection) @@ -87,7 +87,7 @@ void BPPSendThread::sendResult(const Msg_t& msg, bool newConnection) } } - if (mainThreadWaiting) + if (mainThreadWaiting.load()) queueNotEmpty.notify_one(); } @@ -97,19 +97,19 @@ void BPPSendThread::sendResults(const std::vector& msgs, bool newConnecti if (sizeTooBig()) { std::unique_lock sl1(respondLock); - while (currentByteSize >= queueBytesThresh && msgQueue.size() > 3 && !die) + while (currentByteSize.load() >= queueBytesThresh && msgQueue.size() > 3 && !die.load()) { fProcessorPool->incBlockedThreads(); okToRespond.wait(sl1); fProcessorPool->decBlockedThreads(); } } - if (die) + if (die.load()) return; std::unique_lock sl(msgQueueLock); - if (gotException) + if (gotException.load()) throw std::runtime_error(exceptionString); if (!sawAllConnections && newConnection) @@ -132,11 +132,11 @@ void BPPSendThread::sendResults(const std::vector& msgs, bool newConnecti for (uint32_t i = 0; i < msgs.size(); i++) { - (void)atomicops::atomicAdd(&currentByteSize, msgs[i].msg->lengthWithHdrOverhead()); + currentByteSize.fetch_add(msgs[i].msg->lengthWithHdrOverhead(), std::memory_order_relaxed); msgQueue.push(msgs[i]); } - if (mainThreadWaiting) + if (mainThreadWaiting.load()) queueNotEmpty.notify_one(); } @@ -145,14 +145,14 @@ void BPPSendThread::sendMore(int num) std::unique_lock sl(ackLock); if (num == -1) - fcEnabled = false; + fcEnabled.store(false, std::memory_order_relaxed); else if (num == 0) { - fcEnabled = true; - msgsLeft = 0; + fcEnabled.store(true, std::memory_order_relaxed); + msgsLeft.store(0, std::memory_order_relaxed); } else - (void)atomicops::atomicAdd(&msgsLeft, num); + msgsLeft.fetch_add(num, std::memory_order_relaxed); sl.unlock(); if (waiting) @@ -161,7 
+161,7 @@ void BPPSendThread::sendMore(int num) bool BPPSendThread::flowControlEnabled() { - return fcEnabled; + return fcEnabled.load(); } void BPPSendThread::mainLoop() @@ -175,15 +175,15 @@ void BPPSendThread::mainLoop() msg.reset(new Msg_t[msgCap]); - while (!die) + while (!die.load()) { std::unique_lock sl(msgQueueLock); - if (msgQueue.empty() && !die) + if (msgQueue.empty() && !die.load()) { - mainThreadWaiting = true; + mainThreadWaiting.store(true, std::memory_order_relaxed); queueNotEmpty.wait(sl); - mainThreadWaiting = false; + mainThreadWaiting.store(false, std::memory_order_relaxed); continue; } @@ -202,14 +202,14 @@ void BPPSendThread::mainLoop() * i how many msgs are sent by 1 run of the loop, limited by msgCount or msgsLeft. */ msgsSent = 0; - while (msgsSent < msgCount && !die) + while (msgsSent < msgCount && !die.load()) { uint64_t bsSize; - if (msgsLeft <= 0 && fcEnabled && !die) + if (msgsLeft.load() <= 0 && fcEnabled.load() && !die.load()) { std::unique_lock sl2(ackLock); - while (msgsLeft <= 0 && fcEnabled && !die) + while (msgsLeft.load() <= 0 && fcEnabled.load() && !die.load()) { waiting = true; okToSend.wait(sl2); @@ -217,7 +217,7 @@ void BPPSendThread::mainLoop() } } - for (i = 0; msgsSent < msgCount && ((fcEnabled && msgsLeft > 0) || !fcEnabled) && !die; msgsSent++, i++) + for (i = 0; msgsSent < msgCount && ((fcEnabled.load() && msgsLeft.load() > 0) || !fcEnabled.load()) && !die.load(); msgsSent++, i++) { if (doLoadBalancing) { @@ -249,17 +249,17 @@ void BPPSendThread::mainLoop() { sl.lock(); exceptionString = e.what(); - gotException = true; + gotException.store(true, std::memory_order_relaxed); return; } } - (void)atomicops::atomicDec(&msgsLeft); - (void)atomicops::atomicSub(&currentByteSize, bsSize); + msgsLeft.fetch_sub(1, std::memory_order_relaxed); + currentByteSize.fetch_sub(bsSize, std::memory_order_relaxed); msg[msgsSent].msg.reset(); } - if (fProcessorPool->blockedThreadCount() > 0 && currentByteSize < queueBytesThresh) + if 
(fProcessorPool->blockedThreadCount() > 0 && currentByteSize.load() < queueBytesThresh) { okToRespond.notify_one(); } @@ -273,7 +273,7 @@ void BPPSendThread::abort() std::lock_guard sl2(ackLock); std::lock_guard sl3(respondLock); - die = true; + die.store(true, std::memory_order_relaxed); queueNotEmpty.notify_all(); okToSend.notify_all(); diff --git a/primitives/primproc/bppsendthread.h b/primitives/primproc/bppsendthread.h index 7f2028917..4adc37eea 100644 --- a/primitives/primproc/bppsendthread.h +++ b/primitives/primproc/bppsendthread.h @@ -26,6 +26,7 @@ #include "fair_threadpool.h" #include "umsocketselector.h" +#include #include #include #include @@ -103,12 +104,12 @@ class BPPSendThread std::queue msgQueue; std::mutex msgQueueLock; std::condition_variable queueNotEmpty; - volatile bool die = false; - volatile bool gotException = false; - volatile bool mainThreadWaiting = false; + std::atomic die{false}; + std::atomic gotException{false}; + std::atomic mainThreadWaiting{false}; std::string exceptionString; uint32_t queueMsgThresh = 0; - volatile int32_t msgsLeft = -1; + std::atomic msgsLeft{-1}; bool waiting = false; std::mutex ackLock; std::condition_variable okToSend; @@ -136,10 +137,10 @@ class BPPSendThread std::set connections_s; std::vector connections_v; bool sawAllConnections = false; - volatile bool fcEnabled = false; + std::atomic fcEnabled{false}; /* secondary queue size restriction based on byte size */ - volatile uint64_t currentByteSize = 0; + std::atomic currentByteSize{0}; uint64_t queueBytesThresh; // Used to tell the ThreadPool It should consider additional threads because a // queue full event has happened and a thread has been blocked. diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 8b48452d4..ae97291e6 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -145,7 +145,7 @@ boost::mutex bppLock; boost::mutex djMutex; // lock for djLock, lol. 
std::map djLock; // djLock synchronizes destroy and joiner msgs, see bug 2619 -volatile int32_t asyncCounter; +std::atomic asyncCounter; const int asyncMax = 20; // current number of asynchronous loads struct preFetchCond @@ -858,8 +858,8 @@ struct AsynchLoader { sendThread->abort(); cerr << "AsynchLoader caught loadBlock exception: " << ex.what() << endl; - idbassert(asyncCounter > 0); - (void)atomicops::atomicDec(&asyncCounter); + idbassert(asyncCounter.load() > 0); + asyncCounter.fetch_sub(1, std::memory_order_relaxed); mutex->lock(); --(*busyLoaders); mutex->unlock(); @@ -874,8 +874,8 @@ struct AsynchLoader sendThread->abort(); cerr << "AsynchLoader caught unknown exception: " << endl; // FIXME Use a locked processor primitive? - idbassert(asyncCounter > 0); - (void)atomicops::atomicDec(&asyncCounter); + idbassert(asyncCounter.load() > 0); + asyncCounter.fetch_sub(1, std::memory_order_relaxed); mutex->lock(); --(*busyLoaders); mutex->unlock(); @@ -885,8 +885,8 @@ struct AsynchLoader return; } - idbassert(asyncCounter > 0); - (void)atomicops::atomicDec(&asyncCounter); + idbassert(asyncCounter.load() > 0); + asyncCounter.fetch_sub(1, std::memory_order_relaxed); mutex->lock(); if (cached) @@ -943,12 +943,10 @@ void loadBlockAsync(uint64_t lbid, const QueryContext& c, uint32_t txn, int comp return; /* a quick and easy stand-in for a threadpool for loaders */ - atomicops::atomicMb(); - - if (asyncCounter >= asyncMax) + if (asyncCounter.load(std::memory_order_relaxed) >= asyncMax) return; - (void)atomicops::atomicInc(&asyncCounter); + asyncCounter.fetch_add(1, std::memory_order_relaxed); boost::mutex::scoped_lock sl(*m); @@ -961,8 +959,8 @@ void loadBlockAsync(uint64_t lbid, const QueryContext& c, uint32_t txn, int comp catch (boost::thread_resource_error& e) { cerr << "AsynchLoader: caught a thread resource error, need to lower asyncMax\n"; - idbassert(asyncCounter > 0); - (void)atomicops::atomicDec(&asyncCounter); + idbassert(asyncCounter.load() > 0); + 
asyncCounter.fetch_sub(1, std::memory_order_relaxed); } } @@ -2272,7 +2270,7 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro // that can reschedule jobs, and an unlimited non-blocking queue fOOBPool.reset(new threadpool::PriorityThreadPool(1, 5, 0, 0, 1)); - asyncCounter = 0; + asyncCounter.store(0, std::memory_order_relaxed); brm = new DBRM(); diff --git a/storage-manager/src/Ownership.cpp b/storage-manager/src/Ownership.cpp index ddb26482f..57b0f8486 100644 --- a/storage-manager/src/Ownership.cpp +++ b/storage-manager/src/Ownership.cpp @@ -141,9 +141,9 @@ bf::path Ownership::get(const bf::path& p, bool getOwnership) #define DELETE(p, f) ::unlink((metadataPrefix / p / f).string().c_str()); -void Ownership::touchFlushing(const bf::path& prefix, volatile bool* doneFlushing) const +void Ownership::touchFlushing(const bf::path& prefix, std::atomic* doneFlushing) const { - while (!*doneFlushing) + while (!doneFlushing->load()) { TOUCH(prefix, "FLUSHING"); try @@ -182,13 +182,13 @@ void Ownership::releaseOwnership(const bf::path& p, bool isDtor) s.unlock(); - volatile bool done = false; + std::atomic done{false}; // start flushing boost::thread xfer([this, &p, &done] { this->touchFlushing(p, &done); }); Synchronizer::get()->dropPrefix(p); Cache::get()->dropPrefix(p); - done = true; + done.store(true); xfer.interrupt(); xfer.join(); @@ -271,14 +271,14 @@ void Ownership::takeOwnership(const bf::path& p) _takeOwnership(p); } -Ownership::Monitor::Monitor(Ownership* _owner) : owner(_owner), stop(false) +Ownership::Monitor::Monitor(Ownership* _owner) : owner(_owner), stop{false} { thread = boost::thread([this] { this->watchForInterlopers(); }); } Ownership::Monitor::~Monitor() { - stop = true; + stop.store(true); thread.interrupt(); thread.join(); } @@ -291,14 +291,14 @@ void Ownership::Monitor::watchForInterlopers() char buf[80]; vector releaseList; - while (!stop) + while (!stop.load()) { releaseList.clear(); boost::unique_lock 
s(owner->mutex); for (auto& prefix : owner->ownedPrefixes) { - if (stop) + if (stop.load()) break; if (prefix.second == false) continue; @@ -318,7 +318,7 @@ void Ownership::Monitor::watchForInterlopers() for (auto& prefix : releaseList) owner->releaseOwnership(prefix); - if (stop) + if (stop.load()) break; try { diff --git a/storage-manager/src/Ownership.h b/storage-manager/src/Ownership.h index 42f636252..a5b29ca9a 100644 --- a/storage-manager/src/Ownership.h +++ b/storage-manager/src/Ownership.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include #include @@ -46,7 +47,7 @@ class Ownership : public boost::noncopyable boost::filesystem::path metadataPrefix; SMLogging* logger; - void touchFlushing(const boost::filesystem::path&, volatile bool*) const; + void touchFlushing(const boost::filesystem::path&, std::atomic*) const; void takeOwnership(const boost::filesystem::path&); void releaseOwnership(const boost::filesystem::path&, bool isDtor = false); void _takeOwnership(const boost::filesystem::path&); @@ -57,7 +58,7 @@ class Ownership : public boost::noncopyable ~Monitor(); boost::thread thread; Ownership* owner; - volatile bool stop; + std::atomic stop; void watchForInterlopers(); }; diff --git a/utils/cacheutils/cacheutils.cpp b/utils/cacheutils/cacheutils.cpp index c94eb77c5..350e8b0e2 100644 --- a/utils/cacheutils/cacheutils.cpp +++ b/utils/cacheutils/cacheutils.cpp @@ -54,7 +54,7 @@ namespace boost::mutex CacheOpsMutex; // This global is updated only w/ atomic ops -volatile uint32_t MultiReturnCode; +std::atomic MultiReturnCode; int32_t extractRespCode(const ByteStream& bs) { @@ -95,7 +95,10 @@ class CacheOpThread } if (rc != 0) - atomicops::atomicCAS(&MultiReturnCode, 0, 1); + { + uint32_t expected = 0; + MultiReturnCode.compare_exchange_strong(expected, 1, std::memory_order_relaxed); + } } private: @@ -123,7 +126,7 @@ int sendToAll(const ByteStream& outBs) thread_group tg; int rc = 0; - MultiReturnCode = 0; + MultiReturnCode.store(0, 
std::memory_order_relaxed); for (int i = 0; i < cnt; i++) { @@ -134,7 +137,7 @@ int sendToAll(const ByteStream& outBs) tg.join_all(); - if (MultiReturnCode != 0) + if (MultiReturnCode.load(std::memory_order_relaxed) != 0) rc = -1; return rc; diff --git a/utils/querytele/queryteleprotoimpl.cpp b/utils/querytele/queryteleprotoimpl.cpp index d5b5e9e7e..80d83883c 100644 --- a/utils/querytele/queryteleprotoimpl.cpp +++ b/utils/querytele/queryteleprotoimpl.cpp @@ -58,7 +58,7 @@ TsTeleQueue stQueue; TsTeleQueue qtQueue; TsTeleQueue itQueue; -volatile bool isInited = false; +std::atomic isInited{false}; boost::mutex initMux; std::shared_ptr fSocket; @@ -337,9 +337,7 @@ QueryTeleProtoImpl::QueryTeleProtoImpl(const QueryTeleServerParms& sp) : fServer boost::mutex::scoped_lock lk(initMux); - atomicops::atomicMb(); - - if (isInited) + if (isInited.load(std::memory_order_acquire)) return; fSocket.reset(new att::TSocket(fServerParms.host, fServerParms.port)); @@ -348,8 +346,7 @@ QueryTeleProtoImpl::QueryTeleProtoImpl(const QueryTeleServerParms& sp) : fServer consThd = new boost::thread(&TeleConsumer); - atomicops::atomicMb(); - isInited = true; + isInited.store(true, std::memory_order_release); } int QueryTeleProtoImpl::enqStepTele(const StepTele& stdata) diff --git a/versioning/BRM/masterdbrmnode.h b/versioning/BRM/masterdbrmnode.h index 407f0b3dc..fd9ca77c9 100644 --- a/versioning/BRM/masterdbrmnode.h +++ b/versioning/BRM/masterdbrmnode.h @@ -26,6 +26,9 @@ #pragma once +#include +#include +#include #include #include @@ -255,7 +258,8 @@ class MasterDBRMNode std::condition_variable cpimportJobsCond; int runners, NumWorkers; ThreadParams* params; - volatile bool die, halting; + std::atomic die; + std::atomic halting; bool reloadCmd; mutable bool readOnly; // Maximum time to wait for worker responses/reconfigure before forcing read-only diff --git a/versioning/BRM/sessionmanagerserver.cpp b/versioning/BRM/sessionmanagerserver.cpp index bf0c7ccf9..7547fb45f 100644 --- 
a/versioning/BRM/sessionmanagerserver.cpp +++ b/versioning/BRM/sessionmanagerserver.cpp @@ -86,7 +86,7 @@ const uint32_t SessionManagerServer::SS_FORCE = const uint32_t SessionManagerServer::SS_QUERY_READY = 1 << 6; // Set by ProcManager when system is ready for queries -SessionManagerServer::SessionManagerServer() : unique32(0), unique64(0) +SessionManagerServer::SessionManagerServer() : unique32{0}, unique64{0} { config::Config* conf; string stmp; diff --git a/versioning/BRM/sessionmanagerserver.h b/versioning/BRM/sessionmanagerserver.h index ab4486d6e..257454757 100644 --- a/versioning/BRM/sessionmanagerserver.h +++ b/versioning/BRM/sessionmanagerserver.h @@ -26,9 +26,10 @@ #pragma once +#include +#include +#include #include -#include - #include #include #include @@ -209,7 +210,7 @@ class SessionManagerServer */ uint32_t getUnique32() { - return atomicops::atomicInc(&unique32); + return unique32.fetch_add(1, std::memory_order_relaxed) + 1; } /** @@ -217,7 +218,7 @@ class SessionManagerServer */ uint64_t getUnique64() { - return atomicops::atomicInc(&unique64); + return unique64.fetch_add(1, std::memory_order_relaxed) + 1; } /** @brief Resets the semaphores to their original state. For testing only. 
@@ -274,8 +275,8 @@ class SessionManagerServer void finishTransaction(TxnID& txn); void saveSMTxnIDAndState(); - volatile uint32_t unique32; - volatile uint64_t unique64; + std::atomic unique32; + std::atomic unique64; int maxTxns; // the maximum number of concurrent transactions std::string txnidFilename; diff --git a/writeengine/bulk/we_bulkstatus.cpp b/writeengine/bulk/we_bulkstatus.cpp index 5d5799a1e..ac60c8b26 100644 --- a/writeengine/bulk/we_bulkstatus.cpp +++ b/writeengine/bulk/we_bulkstatus.cpp @@ -30,5 +30,5 @@ namespace WriteEngine { /*static*/ -volatile int BulkStatus::fJobStatus = EXIT_SUCCESS; +std::atomic BulkStatus::fJobStatus{EXIT_SUCCESS}; } // namespace WriteEngine diff --git a/writeengine/bulk/we_bulkstatus.h b/writeengine/bulk/we_bulkstatus.h index cec56f8ac..10b07ac9f 100644 --- a/writeengine/bulk/we_bulkstatus.h +++ b/writeengine/bulk/we_bulkstatus.h @@ -23,6 +23,8 @@ #pragma once +#include + #if 0 // defined(_MSC_VER) && defined(WE_BULKSTATUS_DLLEXPORT) #define EXPORT __declspec(dllexport) #else @@ -48,12 +50,10 @@ class BulkStatus private: /* @brief Global job status flag. - * Declared volatile to insure that all threads see when this flag is - * changed. We don't worry about using a mutex since we are just using - * as a flag. Making the variable volatile should suffice, to make it - * work with multiple threads. + * Using std::atomic to ensure thread-safe access without requiring a mutex. + * atomic provides proper memory ordering guarantees for multi-threaded access. 
*/ - static volatile int fJobStatus; + static std::atomic fJobStatus; }; } // namespace WriteEngine diff --git a/writeengine/bulk/we_columninfo.cpp b/writeengine/bulk/we_columninfo.cpp index f2e4d0e7d..76bd1ea2d 100644 --- a/writeengine/bulk/we_columninfo.cpp +++ b/writeengine/bulk/we_columninfo.cpp @@ -152,7 +152,7 @@ ColumnInfo::ColumnInfo(Log* logger, int idIn, const JobColumn& columnIn, DBRootE , fMaxNumRowsPerSegFile(0) , fStore(0) , fAutoIncLastValue(0) - , fSaturatedRowCnt(0) + , fSaturatedRowCnt{0} , fpTableInfo(pTableInfo) , fAutoIncMgr(0) , fDbRootExtTrk(pDBRootExtTrk) diff --git a/writeengine/bulk/we_columninfo.h b/writeengine/bulk/we_columninfo.h index 9e780b62c..f6bcc5aa8 100644 --- a/writeengine/bulk/we_columninfo.h +++ b/writeengine/bulk/we_columninfo.h @@ -472,7 +472,7 @@ class ColumnInfo : public WeUIDGID // For autoincrement column only... Tracks latest autoincrement value used long long fAutoIncLastValue; - volatile int64_t fSaturatedRowCnt; // No. of rows with saturated values + std::atomic fSaturatedRowCnt; // No. of rows with saturated values // List of segment files updated during an import; used to track infor- // mation necessary to update the ExtentMap at the "end" of the import. 
@@ -517,7 +517,7 @@ inline int64_t ColumnInfo::getFileSize() const inline void ColumnInfo::incSaturatedCnt(int64_t satIncCnt) { - (void)atomicops::atomicAdd(&fSaturatedRowCnt, satIncCnt); + fSaturatedRowCnt.fetch_add(satIncCnt, std::memory_order_relaxed); } inline bool ColumnInfo::isAbbrevExtent() @@ -537,7 +537,7 @@ inline void ColumnInfo::printCPInfo(JobColumn column) inline long long ColumnInfo::saturatedCnt() { - return fSaturatedRowCnt; + return fSaturatedRowCnt.load(std::memory_order_relaxed); } inline void ColumnInfo::relativeColWidthFactor(int colWidFactor) diff --git a/writeengine/bulk/we_tableinfo.h b/writeengine/bulk/we_tableinfo.h index 3cbfe204e..a0cd52168 100644 --- a/writeengine/bulk/we_tableinfo.h +++ b/writeengine/bulk/we_tableinfo.h @@ -21,6 +21,7 @@ *******************************************************************************/ #pragma once +#include #include #include #include @@ -69,10 +70,10 @@ class TableInfo : public WeUIDGID // to read import files. Comes from // writeBufferSize tag in job xml file char fColDelim; // Used to delimit col values in a row - volatile Status fStatusTI; // Status of table. Made volatile to - // insure BulkLoad methods can access - // (thru getStatusTI()) correctly w/o - // having to go through a mutex lock. + std::atomic fStatusTI; // Status of table. Using atomic to + // ensure BulkLoad methods can access + // correctly across threads with proper + // memory ordering guarantees. int fReadBufCount; // Number of read buffers // (size of fBuffers vector) unsigned fNumberOfColumns; // Number of ColumnInfo objs in this tbl @@ -134,7 +135,7 @@ class TableInfo : public WeUIDGID // to use for TIMESTAMP data type. For example, // for EST which is UTC-5:00, offset will be -18000s. 
- volatile bool fTableLocked; // Do we have db table lock + std::atomic fTableLocked; // Do we have db table lock bool fReadFromStdin; // Read import file from STDIN bool fReadFromS3; // Read import file from S3 diff --git a/writeengine/client/we_clients.cpp b/writeengine/client/we_clients.cpp index 223947ed5..262251ac3 100644 --- a/writeengine/client/we_clients.cpp +++ b/writeengine/client/we_clients.cpp @@ -173,9 +173,8 @@ bool isWESConfigured(config::Config* config, const std::string& fOtherEnd) namespace WriteEngine { -WEClients::WEClients(int PrgmID) : fPrgmID(PrgmID), pmCount(0) +WEClients::WEClients(int PrgmID) : fPrgmID(PrgmID), closingConnection{0}, pmCount(0) { - closingConnection = 0; Setup(); } @@ -317,7 +316,7 @@ bool WEClients::isConnectionReadonly(uint32_t connection) int WEClients::Close() { makeBusy(false); - closingConnection = 1; + closingConnection.store(1, std::memory_order_relaxed); ByteStream bs; bs << (ByteStream::byte)WE_SVR_CLOSE_CONNECTION; write_to_all(bs); @@ -356,7 +355,7 @@ void WEClients::Listen(boost::shared_ptr client, uint32_t co } else // got zero bytes on read, nothing more will come { - if (closingConnection > 0) + if (closingConnection.load() > 0) { return; } @@ -390,7 +389,7 @@ Error: for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok) { map_tok->second->queue.clear(); - (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]); + map_tok->second->unackedWork[0].fetch_add(1, std::memory_order_relaxed); map_tok->second->queue.push(sbs); } @@ -425,9 +424,7 @@ void WEClients::addQueue(uint32_t key) boost::mutex* lock = new boost::mutex(); condition* cond = new condition(); - boost::shared_ptr mqe(new MQE(pmCount)); - - mqe->queue = WESMsgQueue(lock, cond); + boost::shared_ptr mqe(new MQE(pmCount, lock, cond)); boost::mutex::scoped_lock lk(fMlock); b = fSessionMessages.insert(pair >(key, mqe)).second; @@ -562,7 +559,7 @@ void WEClients::addDataToOutput(SBS sbs, uint32_t connIndex) if (pmCount 
> 0) { - atomicops::atomicInc(&mqe->unackedWork[connIndex % pmCount]); + mqe->unackedWork[connIndex % pmCount].fetch_add(1, std::memory_order_relaxed); } (void)mqe->queue.push(sbs); diff --git a/writeengine/client/we_clients.h b/writeengine/client/we_clients.h index da315c6de..fda869bba 100644 --- a/writeengine/client/we_clients.h +++ b/writeengine/client/we_clients.h @@ -138,15 +138,15 @@ class WEClients /* To keep some state associated with the connection */ struct MQE { - MQE(uint32_t pCount) : ackSocketIndex(0), pmCount(pCount) + MQE(uint32_t pCount, boost::mutex* lock = nullptr, boost::condition* cond = nullptr) + : queue(lock, cond), ackSocketIndex(0), pmCount(pCount), unackedWork(pCount) { - unackedWork.reset(new volatile uint32_t[pmCount]); - memset((void*)unackedWork.get(), 0, pmCount * sizeof(uint32_t)); + // unackedWork vector is default-initialized to 0 } WESMsgQueue queue; uint32_t ackSocketIndex; - boost::scoped_array unackedWork; uint32_t pmCount; + std::vector> unackedWork; }; // The mapping of session ids to StepMsgQueueLists @@ -168,7 +168,7 @@ class WEClients boost::mutex fMlock; // sessionMessages mutex std::vector > fWlock; // WES socket write mutexes bool fBusy; - volatile uint32_t closingConnection; + std::atomic closingConnection; uint32_t pmCount; boost::mutex fOnErrMutex; // to lock function scope to reset pmconnections under error condition diff --git a/writeengine/redistribute/we_redistributecontrolthread.cpp b/writeengine/redistribute/we_redistributecontrolthread.cpp index c62a53ae7..a9ce0d496 100644 --- a/writeengine/redistribute/we_redistributecontrolthread.cpp +++ b/writeengine/redistribute/we_redistributecontrolthread.cpp @@ -65,13 +65,13 @@ namespace redistribute { // static variables boost::mutex RedistributeControlThread::fActionMutex; -volatile bool RedistributeControlThread::fStopAction = false; +std::atomic RedistributeControlThread::fStopAction{false}; string RedistributeControlThread::fWesInUse; void 
RedistributeControlThread::setStopAction(bool s) { boost::mutex::scoped_lock lock(fActionMutex); - fStopAction = s; + fStopAction.store(s, std::memory_order_relaxed); } RedistributeControlThread::RedistributeControlThread(uint32_t act) diff --git a/writeengine/redistribute/we_redistributecontrolthread.h b/writeengine/redistribute/we_redistributecontrolthread.h index 9b4c88ac8..640a7e7fe 100644 --- a/writeengine/redistribute/we_redistributecontrolthread.h +++ b/writeengine/redistribute/we_redistributecontrolthread.h @@ -24,6 +24,8 @@ #include #include #include +#include +#include #include "boost/shared_ptr.hpp" #include "boost/thread/mutex.hpp" @@ -119,7 +121,7 @@ class RedistributeControlThread RedistributeControl* fControl; static boost::mutex fActionMutex; - static volatile bool fStopAction; + static std::atomic fStopAction; static std::string fWesInUse; }; diff --git a/writeengine/redistribute/we_redistributeworkerthread.cpp b/writeengine/redistribute/we_redistributeworkerthread.cpp index 71fbb8425..b6af33940 100644 --- a/writeengine/redistribute/we_redistributeworkerthread.cpp +++ b/writeengine/redistribute/we_redistributeworkerthread.cpp @@ -72,8 +72,8 @@ namespace redistribute { // static variables boost::mutex RedistributeWorkerThread::fActionMutex; -volatile bool RedistributeWorkerThread::fStopAction = false; -volatile bool RedistributeWorkerThread::fCommitted = false; +std::atomic RedistributeWorkerThread::fStopAction{false}; +std::atomic RedistributeWorkerThread::fCommitted{false}; string RedistributeWorkerThread::fWesInUse; RedistributeWorkerThread::RedistributeWorkerThread(ByteStream& bs, IOSocket& ios) diff --git a/writeengine/redistribute/we_redistributeworkerthread.h b/writeengine/redistribute/we_redistributeworkerthread.h index fb787f498..3ef99fd72 100644 --- a/writeengine/redistribute/we_redistributeworkerthread.h +++ b/writeengine/redistribute/we_redistributeworkerthread.h @@ -25,9 +25,11 @@ #include #include #include +#include #include 
"boost/shared_ptr.hpp" #include "boost/thread/mutex.hpp" +#include "boost/thread.hpp" #include "brmtypes.h" #include "we_redistributedef.h" @@ -133,8 +135,8 @@ class RedistributeWorkerThread // uint64_t fSegPerRoot; static boost::mutex fActionMutex; - static volatile bool fStopAction; - static volatile bool fCommitted; + static std::atomic fStopAction; + static std::atomic fCommitted; static std::string fWesInUse; }; diff --git a/writeengine/server/we_getfilesizes.h b/writeengine/server/we_getfilesizes.h index fbeb3a7b7..695574cf1 100644 --- a/writeengine/server/we_getfilesizes.h +++ b/writeengine/server/we_getfilesizes.h @@ -55,13 +55,12 @@ class ActiveThreadCounter for (;;) { - atomicops::atomicMb(); - atc = factiveThreadCount; + atc = factiveThreadCount.load(std::memory_order_relaxed); if (atc <= 0) // hopefully atc will never be < 0! return; - if (atomicops::atomicCAS(&factiveThreadCount, atc, (atc - 1))) + if (factiveThreadCount.compare_exchange_weak(atc, atc - 1, std::memory_order_relaxed)) return; atomicops::atomicYield(); @@ -70,14 +69,14 @@ class ActiveThreadCounter uint32_t cur() { - return factiveThreadCount; + return factiveThreadCount.load(std::memory_order_relaxed); } private: ActiveThreadCounter(const ActiveThreadCounter& rhs); ActiveThreadCounter& operator=(const ActiveThreadCounter& rhs); - volatile int32_t factiveThreadCount; + std::atomic factiveThreadCount; }; } // namespace WriteEngine diff --git a/writeengine/shared/we_brm.cpp b/writeengine/shared/we_brm.cpp index ec4d9f2a1..b361ad46a 100644 --- a/writeengine/shared/we_brm.cpp +++ b/writeengine/shared/we_brm.cpp @@ -56,7 +56,7 @@ using namespace execplan; /** Namespace WriteEngine */ namespace WriteEngine { -BRMWrapper* volatile BRMWrapper::m_instance = NULL; +std::atomic BRMWrapper::m_instance{nullptr}; std::atomic BRMWrapper::finishReported(false); thread_local int BRMWrapper::m_brmRc = 0; boost::mutex BRMWrapper::m_instanceCreateMutex; @@ -298,22 +298,19 @@ int 
BRMWrapper::getFboOffset(const uint64_t lbid, int& oid, uint16_t& dbRoot, ui //------------------------------------------------------------------------------ BRMWrapper* BRMWrapper::getInstance() { - if (m_instance == 0) + BRMWrapper* tmp = m_instance.load(std::memory_order_acquire); + if (tmp == nullptr) { boost::mutex::scoped_lock lock(m_instanceCreateMutex); - - if (m_instance == 0) + tmp = m_instance.load(std::memory_order_relaxed); + if (tmp == nullptr) { - BRMWrapper* tmp = new BRMWrapper(); - - // Memory barrier makes sure the m_instance assignment is not - // mingled with the constructor code - atomicops::atomicMb(); - m_instance = tmp; + tmp = new BRMWrapper(); + m_instance.store(tmp, std::memory_order_release); } } - return m_instance; + return tmp; } //------------------------------------------------------------------------------ diff --git a/writeengine/shared/we_brm.h b/writeengine/shared/we_brm.h index 275d930c3..a266b37eb 100644 --- a/writeengine/shared/we_brm.h +++ b/writeengine/shared/we_brm.h @@ -465,7 +465,7 @@ class BRMWrapper : public WEObj // Private data members //-------------------------------------------------------------------------- - static BRMWrapper* volatile m_instance; + static std::atomic m_instance; static thread_local int m_brmRc; static boost::mutex m_instanceCreateMutex;