You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-11-19 22:02:09 +03:00
fix(correctness): replace volatiles with atomics
This commit is contained in:
committed by
Leonid Fedorov
parent
c5e3b847ab
commit
2f0f5a79b6
@@ -219,7 +219,7 @@ void DistributedEngineComm::reset()
|
|||||||
}
|
}
|
||||||
|
|
||||||
DistributedEngineComm::DistributedEngineComm(ResourceManager* rm, bool isExeMgr)
|
DistributedEngineComm::DistributedEngineComm(ResourceManager* rm, bool isExeMgr)
|
||||||
: fRm(rm), pmCount(0), fIsExeMgr(isExeMgr)
|
: fRm(rm), pmCount{0}, fIsExeMgr(isExeMgr)
|
||||||
{
|
{
|
||||||
if (fIsExeMgr)
|
if (fIsExeMgr)
|
||||||
{
|
{
|
||||||
@@ -369,9 +369,8 @@ int32_t DistributedEngineComm::Setup()
|
|||||||
|
|
||||||
fWlock.swap(newLocks);
|
fWlock.swap(newLocks);
|
||||||
fPmConnections.swap(newClients);
|
fPmConnections.swap(newClients);
|
||||||
// memory barrier to prevent the pmCount assignment migrating upward
|
// atomic store with release semantics ensures all previous writes are visible
|
||||||
atomicops::atomicMb();
|
pmCount.store(newPmCount, std::memory_order_release);
|
||||||
pmCount = newPmCount;
|
|
||||||
|
|
||||||
newLocks.clear();
|
newLocks.clear();
|
||||||
newClients.clear();
|
newClients.clear();
|
||||||
@@ -432,18 +431,18 @@ Error:
|
|||||||
for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
|
for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
|
||||||
{
|
{
|
||||||
map_tok->second->queue.clear();
|
map_tok->second->queue.clear();
|
||||||
(void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
|
map_tok->second->unackedWork[0].fetch_add(1, std::memory_order_relaxed);
|
||||||
map_tok->second->queue.push(sbs);
|
map_tok->second->queue.push(sbs);
|
||||||
}
|
}
|
||||||
lk.unlock();
|
lk.unlock();
|
||||||
|
|
||||||
if (fIsExeMgr)
|
if (fIsExeMgr)
|
||||||
{
|
{
|
||||||
decltype(pmCount) originalPMCount = pmCount;
|
uint32_t originalPMCount = pmCount.load(std::memory_order_relaxed);
|
||||||
// Re-establish if a remote PM restarted.
|
// Re-establish if a remote PM restarted.
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(3));
|
std::this_thread::sleep_for(std::chrono::seconds(3));
|
||||||
auto rc = Setup();
|
auto rc = Setup();
|
||||||
if (rc || originalPMCount != pmCount)
|
if (rc || originalPMCount != pmCount.load(std::memory_order_relaxed))
|
||||||
{
|
{
|
||||||
ostringstream os;
|
ostringstream os;
|
||||||
os << "DEC: lost connection to " << client->addr2String();
|
os << "DEC: lost connection to " << client->addr2String();
|
||||||
@@ -461,9 +460,7 @@ void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs)
|
|||||||
condition* cond = new condition();
|
condition* cond = new condition();
|
||||||
uint32_t firstPMInterleavedConnectionId =
|
uint32_t firstPMInterleavedConnectionId =
|
||||||
key % (fPmConnections.size() / pmCount) * fDECConnectionsPerQuery * pmCount % fPmConnections.size();
|
key % (fPmConnections.size() / pmCount) * fDECConnectionsPerQuery * pmCount % fPmConnections.size();
|
||||||
boost::shared_ptr<MQE> mqe(new MQE(pmCount, firstPMInterleavedConnectionId, flowControlEnableBytesThresh));
|
boost::shared_ptr<MQE> mqe(new MQE(pmCount, firstPMInterleavedConnectionId, flowControlEnableBytesThresh, lock, cond));
|
||||||
|
|
||||||
mqe->queue = StepMsgQueue(lock, cond);
|
|
||||||
mqe->sendACKs = sendACKs;
|
mqe->sendACKs = sendACKs;
|
||||||
mqe->throttled = false;
|
mqe->throttled = false;
|
||||||
|
|
||||||
@@ -757,7 +754,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
|
|||||||
uint64_t totalUnackedWork = 0;
|
uint64_t totalUnackedWork = 0;
|
||||||
|
|
||||||
for (uint32_t i = 0; i < pmCount; ++i)
|
for (uint32_t i = 0; i < pmCount; ++i)
|
||||||
totalUnackedWork += mqe->unackedWork[i];
|
totalUnackedWork += mqe->unackedWork[i].load(std::memory_order_relaxed);
|
||||||
|
|
||||||
if (totalUnackedWork == 0)
|
if (totalUnackedWork == 0)
|
||||||
{
|
{
|
||||||
@@ -784,9 +781,9 @@ void DistributedEngineComm::nextPMToACK(boost::shared_ptr<MQE> mqe, uint32_t max
|
|||||||
* the locking env, mqe->unackedWork can only grow; whatever gets latched in this fcn
|
* the locking env, mqe->unackedWork can only grow; whatever gets latched in this fcn
|
||||||
* is a safe minimum at the point of use. */
|
* is a safe minimum at the point of use. */
|
||||||
|
|
||||||
if (mqe->unackedWork[nextIndex] >= maxAck)
|
if (mqe->unackedWork[nextIndex].load(std::memory_order_relaxed) >= maxAck)
|
||||||
{
|
{
|
||||||
(void)atomicops::atomicSub(&mqe->unackedWork[nextIndex], maxAck);
|
mqe->unackedWork[nextIndex].fetch_sub(maxAck, std::memory_order_relaxed);
|
||||||
*sockIndex = nextIndex;
|
*sockIndex = nextIndex;
|
||||||
// FIXME: we're going to truncate here from 32 to 16 bits. Hopefully this will always fit...
|
// FIXME: we're going to truncate here from 32 to 16 bits. Hopefully this will always fit...
|
||||||
*numToAck = maxAck;
|
*numToAck = maxAck;
|
||||||
@@ -800,12 +797,12 @@ void DistributedEngineComm::nextPMToACK(boost::shared_ptr<MQE> mqe, uint32_t max
|
|||||||
{
|
{
|
||||||
for (int i = pmCount - 1; i >= 0; --i)
|
for (int i = pmCount - 1; i >= 0; --i)
|
||||||
{
|
{
|
||||||
uint32_t curVal = mqe->unackedWork[nextIndex];
|
uint32_t curVal = mqe->unackedWork[nextIndex].load(std::memory_order_relaxed);
|
||||||
uint32_t unackedWork = (curVal > maxAck ? maxAck : curVal);
|
uint32_t unackedWork = (curVal > maxAck ? maxAck : curVal);
|
||||||
|
|
||||||
if (unackedWork > 0)
|
if (unackedWork > 0)
|
||||||
{
|
{
|
||||||
(void)atomicops::atomicSub(&mqe->unackedWork[nextIndex], unackedWork);
|
mqe->unackedWork[nextIndex].fetch_sub(unackedWork, std::memory_order_relaxed);
|
||||||
*sockIndex = nextIndex;
|
*sockIndex = nextIndex;
|
||||||
*numToAck = unackedWork;
|
*numToAck = unackedWork;
|
||||||
|
|
||||||
@@ -822,7 +819,7 @@ void DistributedEngineComm::nextPMToACK(boost::shared_ptr<MQE> mqe, uint32_t max
|
|||||||
cerr << "DEC::nextPMToACK(): Couldn't find a PM to ACK! ";
|
cerr << "DEC::nextPMToACK(): Couldn't find a PM to ACK! ";
|
||||||
|
|
||||||
for (int i = pmCount - 1; i >= 0; --i)
|
for (int i = pmCount - 1; i >= 0; --i)
|
||||||
cerr << mqe->unackedWork[i] << " ";
|
cerr << mqe->unackedWork[i].load(std::memory_order_relaxed) << " ";
|
||||||
|
|
||||||
cerr << " max: " << maxAck;
|
cerr << " max: " << maxAck;
|
||||||
cerr << endl;
|
cerr << endl;
|
||||||
@@ -986,7 +983,7 @@ void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats*
|
|||||||
|
|
||||||
if (pmCount > 0)
|
if (pmCount > 0)
|
||||||
{
|
{
|
||||||
(void)atomicops::atomicInc(&mqe->unackedWork[connIndex % pmCount]);
|
mqe->unackedWork[connIndex % pmCount].fetch_add(1, std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
TSQSize_t queueSize = mqe->queue.push(sbs);
|
TSQSize_t queueSize = mqe->queue.push(sbs);
|
||||||
@@ -1108,7 +1105,7 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_
|
|||||||
for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
|
for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
|
||||||
{
|
{
|
||||||
map_tok->second->queue.clear();
|
map_tok->second->queue.clear();
|
||||||
(void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
|
map_tok->second->unackedWork[0].fetch_add(1, std::memory_order_relaxed);
|
||||||
map_tok->second->queue.push(sbs);
|
map_tok->second->queue.push(sbs);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1153,7 +1150,8 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_
|
|||||||
}
|
}
|
||||||
if (tempConns.size() == fPmConnections.size()) return 0;
|
if (tempConns.size() == fPmConnections.size()) return 0;
|
||||||
fPmConnections.swap(tempConns);
|
fPmConnections.swap(tempConns);
|
||||||
pmCount = (pmCount == 0 ? 0 : pmCount - 1);
|
uint32_t oldCount = pmCount.load(std::memory_order_relaxed);
|
||||||
|
pmCount.store((oldCount == 0 ? 0 : oldCount - 1), std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
// send alarm
|
// send alarm
|
||||||
ALARMManager alarmMgr;
|
ALARMManager alarmMgr;
|
||||||
@@ -1219,12 +1217,12 @@ Stats DistributedEngineComm::getNetworkStats(uint32_t uniqueID)
|
|||||||
}
|
}
|
||||||
|
|
||||||
DistributedEngineComm::MQE::MQE(const uint32_t pCount, const uint32_t initialInterleaverValue,
|
DistributedEngineComm::MQE::MQE(const uint32_t pCount, const uint32_t initialInterleaverValue,
|
||||||
const uint64_t flowControlEnableBytesThresh)
|
const uint64_t flowControlEnableBytesThresh,
|
||||||
: ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false), targetQueueSize(flowControlEnableBytesThresh)
|
boost::mutex* lock, boost::condition* cond)
|
||||||
|
: queue(lock, cond), ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false), targetQueueSize(flowControlEnableBytesThresh), unackedWork(pCount)
|
||||||
{
|
{
|
||||||
unackedWork.reset(new volatile uint32_t[pmCount]);
|
// unackedWork vector is default-initialized to 0
|
||||||
interleaver.reset(new uint32_t[pmCount]);
|
interleaver.reset(new uint32_t[pmCount]);
|
||||||
memset((void*)unackedWork.get(), 0, pmCount * sizeof(uint32_t));
|
|
||||||
uint32_t interleaverValue = initialInterleaverValue;
|
uint32_t interleaverValue = initialInterleaverValue;
|
||||||
initialConnectionId = initialInterleaverValue;
|
initialConnectionId = initialInterleaverValue;
|
||||||
for (size_t pmId = 0; pmId < pmCount; ++pmId)
|
for (size_t pmId = 0; pmId < pmCount; ++pmId)
|
||||||
|
|||||||
@@ -238,15 +238,13 @@ class DistributedEngineComm
|
|||||||
/* To keep some state associated with the connection. These aren't copyable. */
|
/* To keep some state associated with the connection. These aren't copyable. */
|
||||||
struct MQE : public boost::noncopyable
|
struct MQE : public boost::noncopyable
|
||||||
{
|
{
|
||||||
MQE(const uint32_t pmCount, const uint32_t initialInterleaverValue, const uint64_t recvQueueSize);
|
MQE(const uint32_t pmCount, const uint32_t initialInterleaverValue, const uint64_t recvQueueSize,
|
||||||
|
boost::mutex* lock = nullptr, boost::condition* cond = nullptr);
|
||||||
uint32_t getNextConnectionId(const size_t pmIndex, const size_t pmConnectionsNumber,
|
uint32_t getNextConnectionId(const size_t pmIndex, const size_t pmConnectionsNumber,
|
||||||
const uint32_t DECConnectionsPerQuery);
|
const uint32_t DECConnectionsPerQuery);
|
||||||
messageqcpp::Stats stats;
|
messageqcpp::Stats stats;
|
||||||
StepMsgQueue queue;
|
StepMsgQueue queue;
|
||||||
uint32_t ackSocketIndex;
|
uint32_t ackSocketIndex;
|
||||||
boost::scoped_array<volatile uint32_t> unackedWork;
|
|
||||||
boost::scoped_array<uint32_t> interleaver;
|
|
||||||
uint32_t initialConnectionId;
|
|
||||||
uint32_t pmCount;
|
uint32_t pmCount;
|
||||||
// non-BPP primitives don't do ACKs
|
// non-BPP primitives don't do ACKs
|
||||||
bool sendACKs;
|
bool sendACKs;
|
||||||
@@ -261,6 +259,9 @@ class DistributedEngineComm
|
|||||||
bool hasBigMsgs;
|
bool hasBigMsgs;
|
||||||
|
|
||||||
uint64_t targetQueueSize;
|
uint64_t targetQueueSize;
|
||||||
|
std::vector<std::atomic<uint32_t>> unackedWork;
|
||||||
|
boost::scoped_array<uint32_t> interleaver;
|
||||||
|
uint32_t initialConnectionId;
|
||||||
};
|
};
|
||||||
|
|
||||||
// The mapping of session ids to StepMsgQueueLists
|
// The mapping of session ids to StepMsgQueueLists
|
||||||
@@ -292,7 +293,7 @@ class DistributedEngineComm
|
|||||||
std::mutex fMlock; // sessionMessages mutex
|
std::mutex fMlock; // sessionMessages mutex
|
||||||
std::vector<std::shared_ptr<std::mutex>> fWlock; // PrimProc socket write mutexes
|
std::vector<std::shared_ptr<std::mutex>> fWlock; // PrimProc socket write mutexes
|
||||||
bool fBusy;
|
bool fBusy;
|
||||||
volatile uint32_t pmCount;
|
std::atomic<uint32_t> pmCount;
|
||||||
boost::mutex fOnErrMutex; // to lock function scope to reset pmconnections under error condition
|
boost::mutex fOnErrMutex; // to lock function scope to reset pmconnections under error condition
|
||||||
boost::mutex fSetupMutex;
|
boost::mutex fSetupMutex;
|
||||||
|
|
||||||
|
|||||||
@@ -28,6 +28,7 @@
|
|||||||
|
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
#include <atomic>
|
||||||
#include <boost/thread.hpp>
|
#include <boost/thread.hpp>
|
||||||
#include <boost/thread/condition.hpp>
|
#include <boost/thread/condition.hpp>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
@@ -157,7 +158,7 @@ class FIFO : public DataListImpl<std::vector<element_t>, element_t>
|
|||||||
uint64_t fTotSize;
|
uint64_t fTotSize;
|
||||||
bool fInOrder;
|
bool fInOrder;
|
||||||
uint64_t fConsumerFinishedCount;
|
uint64_t fConsumerFinishedCount;
|
||||||
volatile bool fConsumptionStarted;
|
std::atomic<bool> fConsumptionStarted;
|
||||||
uint32_t fElementMode;
|
uint32_t fElementMode;
|
||||||
uint64_t fNumFiles;
|
uint64_t fNumFiles;
|
||||||
uint64_t fNumBytes;
|
uint64_t fNumBytes;
|
||||||
|
|||||||
@@ -74,7 +74,7 @@ struct JSJoiner
|
|||||||
};
|
};
|
||||||
|
|
||||||
JobList::JobList(bool isEM)
|
JobList::JobList(bool isEM)
|
||||||
: fIsRunning(false), fIsExeMgr(isEM), fPmsConnected(0), projectingTableOID(0), fAborted(0), fPriority(50)
|
: fIsRunning(false), fIsExeMgr(isEM), fPmsConnected(0), projectingTableOID(0), fAborted{0}, fPriority(50)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -615,7 +615,8 @@ void JobList::abort()
|
|||||||
uint32_t i;
|
uint32_t i;
|
||||||
|
|
||||||
// If we're not currently aborting, then start aborting...
|
// If we're not currently aborting, then start aborting...
|
||||||
if (atomicops::atomicCAS<uint32_t>(&fAborted, 0, 1))
|
uint32_t expected = 0;
|
||||||
|
if (fAborted.compare_exchange_strong(expected, 1, std::memory_order_relaxed))
|
||||||
{
|
{
|
||||||
for (i = 0; i < fQuery.size(); i++)
|
for (i = 0; i < fQuery.size(); i++)
|
||||||
fQuery[i]->abort();
|
fQuery[i]->abort();
|
||||||
@@ -628,7 +629,8 @@ void JobList::abort()
|
|||||||
void JobList::abortOnLimit(JobStep* js)
|
void JobList::abortOnLimit(JobStep* js)
|
||||||
{
|
{
|
||||||
// If we're not currently aborting, then start aborting...
|
// If we're not currently aborting, then start aborting...
|
||||||
if (atomicops::atomicCAS<uint32_t>(&fAborted, 0, 1))
|
uint32_t expected = 0;
|
||||||
|
if (fAborted.compare_exchange_strong(expected, 1, std::memory_order_relaxed))
|
||||||
{
|
{
|
||||||
// @bug4848, enhance and unify limit handling.
|
// @bug4848, enhance and unify limit handling.
|
||||||
for (uint32_t i = 0; i < fQuery.size(); i++)
|
for (uint32_t i = 0; i < fQuery.size(); i++)
|
||||||
@@ -669,7 +671,7 @@ TupleJobList::~TupleJobList()
|
|||||||
|
|
||||||
void TupleJobList::abort()
|
void TupleJobList::abort()
|
||||||
{
|
{
|
||||||
if (fAborted == 0 && fIsRunning)
|
if (fAborted.load() == 0 && fIsRunning)
|
||||||
{
|
{
|
||||||
JobList::abort();
|
JobList::abort();
|
||||||
messageqcpp::ByteStream bs;
|
messageqcpp::ByteStream bs;
|
||||||
|
|||||||
@@ -25,6 +25,9 @@
|
|||||||
#include <string>
|
#include <string>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <map>
|
#include <map>
|
||||||
|
#include <atomic>
|
||||||
|
#include <iostream>
|
||||||
|
#include <boost/uuid/uuid_io.hpp>
|
||||||
#include <boost/shared_ptr.hpp>
|
#include <boost/shared_ptr.hpp>
|
||||||
|
|
||||||
#include "calpontsystemcatalog.h"
|
#include "calpontsystemcatalog.h"
|
||||||
@@ -163,7 +166,7 @@ class JobList
|
|||||||
EXPORT virtual void abort();
|
EXPORT virtual void abort();
|
||||||
EXPORT virtual bool aborted()
|
EXPORT virtual bool aborted()
|
||||||
{
|
{
|
||||||
return (fAborted != 0);
|
return (fAborted.load() != 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string toString() const;
|
std::string toString() const;
|
||||||
@@ -209,7 +212,7 @@ class JobList
|
|||||||
std::string fMiniInfo;
|
std::string fMiniInfo;
|
||||||
std::vector<SJLP> subqueryJoblists;
|
std::vector<SJLP> subqueryJoblists;
|
||||||
|
|
||||||
volatile uint32_t fAborted;
|
std::atomic<uint32_t> fAborted;
|
||||||
|
|
||||||
uint32_t fPriority; // higher #s = higher priority
|
uint32_t fPriority; // higher #s = higher priority
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -112,6 +112,47 @@ JobStep::JobStep(const JobInfo& j)
|
|||||||
fStepUuid = QueryTeleClient::genUUID();
|
fStepUuid = QueryTeleClient::genUUID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
// Copy constructor - needed because fDie is std::atomic which is not copyable
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
JobStep::JobStep(const JobStep& rhs)
|
||||||
|
: fInputJobStepAssociation(rhs.fInputJobStepAssociation)
|
||||||
|
, fOutputJobStepAssociation(rhs.fOutputJobStepAssociation)
|
||||||
|
, fSessionId(rhs.fSessionId)
|
||||||
|
, fTxnId(rhs.fTxnId)
|
||||||
|
, fVerId(rhs.fVerId)
|
||||||
|
, fStatementId(rhs.fStatementId)
|
||||||
|
, fStepId(rhs.fStepId)
|
||||||
|
, fTupleId(rhs.fTupleId)
|
||||||
|
, fAlias(rhs.fAlias)
|
||||||
|
, fView(rhs.fView)
|
||||||
|
, fPartitions(rhs.fPartitions)
|
||||||
|
, fName(rhs.fName)
|
||||||
|
, fSchema(rhs.fSchema)
|
||||||
|
, fTraceFlags(rhs.fTraceFlags)
|
||||||
|
, fCardinality(rhs.fCardinality)
|
||||||
|
, fDelayedRunFlag(rhs.fDelayedRunFlag)
|
||||||
|
, fDelivery(rhs.fDelivery)
|
||||||
|
, fOnClauseFilter(rhs.fOnClauseFilter)
|
||||||
|
, fDie(rhs.fDie.load(std::memory_order_relaxed))
|
||||||
|
, fWaitToRunStepCnt(rhs.fWaitToRunStepCnt)
|
||||||
|
, fExtendedInfo(rhs.fExtendedInfo)
|
||||||
|
, fMiniInfo(rhs.fMiniInfo)
|
||||||
|
, fPriority(rhs.fPriority)
|
||||||
|
, fErrorInfo(rhs.fErrorInfo)
|
||||||
|
, fLogger(rhs.fLogger)
|
||||||
|
, fLocalQuery(rhs.fLocalQuery)
|
||||||
|
, fQueryUuid(rhs.fQueryUuid)
|
||||||
|
, fStepUuid(rhs.fStepUuid)
|
||||||
|
, fQtc(rhs.fQtc)
|
||||||
|
, fProgress(rhs.fProgress)
|
||||||
|
, fStartTime(rhs.fStartTime)
|
||||||
|
, fLastStepTeleTime(rhs.fLastStepTeleTime)
|
||||||
|
, fTimeZone(rhs.fTimeZone)
|
||||||
|
, fMaxPmJoinResultCount(rhs.fMaxPmJoinResultCount)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
// Log a syslog msg for the start of this specified job step
|
// Log a syslog msg for the start of this specified job step
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|||||||
@@ -125,6 +125,7 @@ class JobStep
|
|||||||
*/
|
*/
|
||||||
JobStep() = default;
|
JobStep() = default;
|
||||||
explicit JobStep(const JobInfo&);
|
explicit JobStep(const JobInfo&);
|
||||||
|
JobStep(const JobStep&);
|
||||||
/** destructor
|
/** destructor
|
||||||
*/
|
*/
|
||||||
virtual ~JobStep()
|
virtual ~JobStep()
|
||||||
@@ -135,7 +136,7 @@ class JobStep
|
|||||||
virtual void run() = 0;
|
virtual void run() = 0;
|
||||||
virtual void abort()
|
virtual void abort()
|
||||||
{
|
{
|
||||||
fDie = true;
|
fDie.store(true, std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
/** @brief virtual void join method
|
/** @brief virtual void join method
|
||||||
*/
|
*/
|
||||||
@@ -388,7 +389,7 @@ class JobStep
|
|||||||
|
|
||||||
bool cancelled()
|
bool cancelled()
|
||||||
{
|
{
|
||||||
return (fErrorInfo->errCode > 0 || fDie);
|
return (fErrorInfo->errCode > 0 || fDie.load());
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual bool stringTableFriendly()
|
virtual bool stringTableFriendly()
|
||||||
@@ -482,7 +483,7 @@ class JobStep
|
|||||||
bool fDelayedRunFlag;
|
bool fDelayedRunFlag;
|
||||||
bool fDelivery;
|
bool fDelivery;
|
||||||
bool fOnClauseFilter;
|
bool fOnClauseFilter;
|
||||||
volatile bool fDie;
|
std::atomic<bool> fDie;
|
||||||
uint32_t fWaitToRunStepCnt;
|
uint32_t fWaitToRunStepCnt;
|
||||||
std::string fExtendedInfo;
|
std::string fExtendedInfo;
|
||||||
std::string fMiniInfo;
|
std::string fMiniInfo;
|
||||||
|
|||||||
@@ -29,6 +29,7 @@
|
|||||||
|
|
||||||
/** @file */
|
/** @file */
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <list>
|
#include <list>
|
||||||
@@ -135,12 +136,12 @@ class HashJoin
|
|||||||
uint32_t thrIdx;
|
uint32_t thrIdx;
|
||||||
TimeSet timeset;
|
TimeSet timeset;
|
||||||
JSTimeStamp dlTimes;
|
JSTimeStamp dlTimes;
|
||||||
volatile bool* die;
|
std::atomic<bool>* die;
|
||||||
} thrParams_t;
|
} thrParams_t;
|
||||||
|
|
||||||
HashJoin(joblist::BDLWrapper<element_t>& set1, joblist::BDLWrapper<element_t>& set2,
|
HashJoin(joblist::BDLWrapper<element_t>& set1, joblist::BDLWrapper<element_t>& set2,
|
||||||
joblist::DataList<element_t>* result1, joblist::DataList<element_t>* result2, JoinType joinType,
|
joblist::DataList<element_t>* result1, joblist::DataList<element_t>* result2, JoinType joinType,
|
||||||
JSTimeStamp* dlTimes, const SErrorInfo& status, uint32_t sessionId, volatile bool* die);
|
JSTimeStamp* dlTimes, const SErrorInfo& status, uint32_t sessionId, std::atomic<bool>* die);
|
||||||
|
|
||||||
HashJoin();
|
HashJoin();
|
||||||
HashJoin(const HashJoin& hj);
|
HashJoin(const HashJoin& hj);
|
||||||
@@ -213,7 +214,7 @@ class HashJoin
|
|||||||
void createHash(BucketDL<element_t>* bdlptr, hash_t* destHashTbl, const uint32_t idx,
|
void createHash(BucketDL<element_t>* bdlptr, hash_t* destHashTbl, const uint32_t idx,
|
||||||
bool populateResult, // true if bdlptr is opposite an outer join
|
bool populateResult, // true if bdlptr is opposite an outer join
|
||||||
joblist::DataList<element_t>* result, // populated if populateResult true
|
joblist::DataList<element_t>* result, // populated if populateResult true
|
||||||
JSTimeStamp& thrDlTimes, volatile bool* die);
|
JSTimeStamp& thrDlTimes, std::atomic<bool>* die);
|
||||||
void init();
|
void init();
|
||||||
TimeSet* getTimeSet()
|
TimeSet* getTimeSet()
|
||||||
{
|
{
|
||||||
@@ -252,7 +253,7 @@ class HashJoin
|
|||||||
TimeSet fTimeSet;
|
TimeSet fTimeSet;
|
||||||
SErrorInfo fStatus;
|
SErrorInfo fStatus;
|
||||||
uint32_t fSessionId;
|
uint32_t fSessionId;
|
||||||
volatile bool* die;
|
std::atomic<bool>* die;
|
||||||
};
|
};
|
||||||
|
|
||||||
template <typename element_t>
|
template <typename element_t>
|
||||||
@@ -262,7 +263,7 @@ template <typename element_t>
|
|||||||
HashJoin<element_t>::HashJoin(joblist::BDLWrapper<element_t>& set1, joblist::BDLWrapper<element_t>& set2,
|
HashJoin<element_t>::HashJoin(joblist::BDLWrapper<element_t>& set1, joblist::BDLWrapper<element_t>& set2,
|
||||||
joblist::DataList<element_t>* result1, joblist::DataList<element_t>* result2,
|
joblist::DataList<element_t>* result1, joblist::DataList<element_t>* result2,
|
||||||
JoinType joinType, JSTimeStamp* dlt, const SErrorInfo& status,
|
JoinType joinType, JSTimeStamp* dlt, const SErrorInfo& status,
|
||||||
uint32_t sessionId, volatile bool* d)
|
uint32_t sessionId, std::atomic<bool>* d)
|
||||||
: fTimeSet(), fStatus(status), fSessionId(sessionId)
|
: fTimeSet(), fStatus(status), fSessionId(sessionId)
|
||||||
{
|
{
|
||||||
fSet1 = set1;
|
fSet1 = set1;
|
||||||
@@ -464,7 +465,7 @@ template <typename element_t>
|
|||||||
void HashJoin<element_t>::createHash(BucketDL<element_t>* srcBucketDL, hash_t* destHashTbl,
|
void HashJoin<element_t>::createHash(BucketDL<element_t>* srcBucketDL, hash_t* destHashTbl,
|
||||||
const uint32_t bucketNum, bool populateResult,
|
const uint32_t bucketNum, bool populateResult,
|
||||||
joblist::DataList<element_t>* result, JSTimeStamp& thrDlTimes,
|
joblist::DataList<element_t>* result, JSTimeStamp& thrDlTimes,
|
||||||
volatile bool* die)
|
std::atomic<bool>* die)
|
||||||
{
|
{
|
||||||
bool more;
|
bool more;
|
||||||
element_t e;
|
element_t e;
|
||||||
@@ -485,7 +486,7 @@ void HashJoin<element_t>::createHash(BucketDL<element_t>* srcBucketDL, hash_t* d
|
|||||||
thrDlTimes.setFirstReadTime();
|
thrDlTimes.setFirstReadTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
for (; more & !(*die); more = srcBucketDL->next(bucketNum, bucketIter, &e))
|
for (; more & !die->load(); more = srcBucketDL->next(bucketNum, bucketIter, &e))
|
||||||
{
|
{
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
cout << "createHash() bkt " << bucketNum << " idx " << idx << " find(" << e.second << ")" << endl;
|
cout << "createHash() bkt " << bucketNum << " idx " << idx << " find(" << e.second << ")" << endl;
|
||||||
|
|||||||
@@ -1250,7 +1250,7 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep
|
|||||||
uint64_t totalMsgs;
|
uint64_t totalMsgs;
|
||||||
uint64_t msgsSent;
|
uint64_t msgsSent;
|
||||||
uint64_t msgsRecvd;
|
uint64_t msgsRecvd;
|
||||||
volatile bool finishedSending;
|
std::atomic<bool> finishedSending;
|
||||||
bool firstRead;
|
bool firstRead;
|
||||||
bool sendWaiting;
|
bool sendWaiting;
|
||||||
uint32_t recvWaiting;
|
uint32_t recvWaiting;
|
||||||
|
|||||||
@@ -22,6 +22,7 @@
|
|||||||
/** @file */
|
/** @file */
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <queue>
|
#include <queue>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
@@ -325,7 +326,7 @@ class ThreadSafeQueue
|
|||||||
impl_type fImpl;
|
impl_type fImpl;
|
||||||
SPBM fPimplLock;
|
SPBM fPimplLock;
|
||||||
SPBC fPimplCond;
|
SPBC fPimplCond;
|
||||||
volatile bool fShutdown;
|
std::atomic<bool> fShutdown;
|
||||||
T fBs0;
|
T fBs0;
|
||||||
size_t bytes;
|
size_t bytes;
|
||||||
uint32_t zeroCount; // counts the # of times read_some returned 0
|
uint32_t zeroCount; // counts the # of times read_some returned 0
|
||||||
|
|||||||
@@ -1679,7 +1679,7 @@ void TupleBPS::sendJobs(const vector<Job>& jobs)
|
|||||||
condvar.notify_all();
|
condvar.notify_all();
|
||||||
|
|
||||||
// Send not more than fMaxOutstandingRequests jobs out. min(blocksPerJob) = 16
|
// Send not more than fMaxOutstandingRequests jobs out. min(blocksPerJob) = 16
|
||||||
while ((msgsSent - msgsRecvd > fMaxOutstandingRequests * (blocksPerJob >> 1)) && !fDie)
|
while ((msgsSent - msgsRecvd > fMaxOutstandingRequests * (blocksPerJob >> 1)) && !fDie.load())
|
||||||
{
|
{
|
||||||
sendWaiting = true;
|
sendWaiting = true;
|
||||||
condvarWakeupProducer.wait(tplLock);
|
condvarWakeupProducer.wait(tplLock);
|
||||||
@@ -2843,7 +2843,7 @@ const string TupleBPS::toString() const
|
|||||||
if (bop == BOP_OR)
|
if (bop == BOP_OR)
|
||||||
oss << " BOP_OR ";
|
oss << " BOP_OR ";
|
||||||
|
|
||||||
if (fDie)
|
if (fDie.load())
|
||||||
oss << " aborting " << msgsSent << "/" << msgsRecvd << " " << uniqueID << " ";
|
oss << " aborting " << msgsSent << "/" << msgsRecvd << " " << uniqueID << " ";
|
||||||
|
|
||||||
if (fOutputJobStepAssociation.outSize() > 0)
|
if (fOutputJobStepAssociation.outSize() > 0)
|
||||||
@@ -3357,7 +3357,7 @@ void TupleBPS::dec(DistributedEngineComm* dec)
|
|||||||
|
|
||||||
void TupleBPS::abort_nolock()
|
void TupleBPS::abort_nolock()
|
||||||
{
|
{
|
||||||
if (fDie)
|
if (fDie.load())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
JobStep::abort();
|
JobStep::abort();
|
||||||
|
|||||||
@@ -675,7 +675,7 @@ void TupleHashJoinStep::hjRunner()
|
|||||||
errorMessage("too many threads");
|
errorMessage("too many threads");
|
||||||
status(logging::threadResourceErr);
|
status(logging::threadResourceErr);
|
||||||
errorLogging(emsg, logging::threadResourceErr);
|
errorLogging(emsg, logging::threadResourceErr);
|
||||||
fDie = true;
|
fDie.store(true, std::memory_order_relaxed);
|
||||||
deliverMutex.unlock();
|
deliverMutex.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -52,22 +52,22 @@ void BPPSendThread::sendResult(const Msg_t& msg, bool newConnection)
|
|||||||
if (sizeTooBig())
|
if (sizeTooBig())
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> sl1(respondLock);
|
std::unique_lock<std::mutex> sl1(respondLock);
|
||||||
while (currentByteSize >= queueBytesThresh && msgQueue.size() > 3 && !die)
|
while (currentByteSize.load() >= queueBytesThresh && msgQueue.size() > 3 && !die.load())
|
||||||
{
|
{
|
||||||
fProcessorPool->incBlockedThreads();
|
fProcessorPool->incBlockedThreads();
|
||||||
okToRespond.wait(sl1);
|
okToRespond.wait(sl1);
|
||||||
fProcessorPool->decBlockedThreads();
|
fProcessorPool->decBlockedThreads();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (die)
|
if (die.load())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
std::unique_lock<std::mutex> sl(msgQueueLock);
|
std::unique_lock<std::mutex> sl(msgQueueLock);
|
||||||
|
|
||||||
if (gotException)
|
if (gotException.load())
|
||||||
throw std::runtime_error(exceptionString);
|
throw std::runtime_error(exceptionString);
|
||||||
|
|
||||||
(void)atomicops::atomicAdd<uint64_t>(¤tByteSize, msg.msg->lengthWithHdrOverhead());
|
currentByteSize.fetch_add(msg.msg->lengthWithHdrOverhead(), std::memory_order_relaxed);
|
||||||
msgQueue.push(msg);
|
msgQueue.push(msg);
|
||||||
|
|
||||||
if (!sawAllConnections && newConnection)
|
if (!sawAllConnections && newConnection)
|
||||||
@@ -87,7 +87,7 @@ void BPPSendThread::sendResult(const Msg_t& msg, bool newConnection)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mainThreadWaiting)
|
if (mainThreadWaiting.load())
|
||||||
queueNotEmpty.notify_one();
|
queueNotEmpty.notify_one();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -97,19 +97,19 @@ void BPPSendThread::sendResults(const std::vector<Msg_t>& msgs, bool newConnecti
|
|||||||
if (sizeTooBig())
|
if (sizeTooBig())
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> sl1(respondLock);
|
std::unique_lock<std::mutex> sl1(respondLock);
|
||||||
while (currentByteSize >= queueBytesThresh && msgQueue.size() > 3 && !die)
|
while (currentByteSize.load() >= queueBytesThresh && msgQueue.size() > 3 && !die.load())
|
||||||
{
|
{
|
||||||
fProcessorPool->incBlockedThreads();
|
fProcessorPool->incBlockedThreads();
|
||||||
okToRespond.wait(sl1);
|
okToRespond.wait(sl1);
|
||||||
fProcessorPool->decBlockedThreads();
|
fProcessorPool->decBlockedThreads();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (die)
|
if (die.load())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
std::unique_lock<std::mutex> sl(msgQueueLock);
|
std::unique_lock<std::mutex> sl(msgQueueLock);
|
||||||
|
|
||||||
if (gotException)
|
if (gotException.load())
|
||||||
throw std::runtime_error(exceptionString);
|
throw std::runtime_error(exceptionString);
|
||||||
|
|
||||||
if (!sawAllConnections && newConnection)
|
if (!sawAllConnections && newConnection)
|
||||||
@@ -132,11 +132,11 @@ void BPPSendThread::sendResults(const std::vector<Msg_t>& msgs, bool newConnecti
|
|||||||
|
|
||||||
for (uint32_t i = 0; i < msgs.size(); i++)
|
for (uint32_t i = 0; i < msgs.size(); i++)
|
||||||
{
|
{
|
||||||
(void)atomicops::atomicAdd<uint64_t>(¤tByteSize, msgs[i].msg->lengthWithHdrOverhead());
|
currentByteSize.fetch_add(msgs[i].msg->lengthWithHdrOverhead(), std::memory_order_relaxed);
|
||||||
msgQueue.push(msgs[i]);
|
msgQueue.push(msgs[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mainThreadWaiting)
|
if (mainThreadWaiting.load())
|
||||||
queueNotEmpty.notify_one();
|
queueNotEmpty.notify_one();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -145,14 +145,14 @@ void BPPSendThread::sendMore(int num)
|
|||||||
std::unique_lock<std::mutex> sl(ackLock);
|
std::unique_lock<std::mutex> sl(ackLock);
|
||||||
|
|
||||||
if (num == -1)
|
if (num == -1)
|
||||||
fcEnabled = false;
|
fcEnabled.store(false, std::memory_order_relaxed);
|
||||||
else if (num == 0)
|
else if (num == 0)
|
||||||
{
|
{
|
||||||
fcEnabled = true;
|
fcEnabled.store(true, std::memory_order_relaxed);
|
||||||
msgsLeft = 0;
|
msgsLeft.store(0, std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
(void)atomicops::atomicAdd(&msgsLeft, num);
|
msgsLeft.fetch_add(num, std::memory_order_relaxed);
|
||||||
|
|
||||||
sl.unlock();
|
sl.unlock();
|
||||||
if (waiting)
|
if (waiting)
|
||||||
@@ -161,7 +161,7 @@ void BPPSendThread::sendMore(int num)
|
|||||||
|
|
||||||
bool BPPSendThread::flowControlEnabled()
|
bool BPPSendThread::flowControlEnabled()
|
||||||
{
|
{
|
||||||
return fcEnabled;
|
return fcEnabled.load();
|
||||||
}
|
}
|
||||||
|
|
||||||
void BPPSendThread::mainLoop()
|
void BPPSendThread::mainLoop()
|
||||||
@@ -175,15 +175,15 @@ void BPPSendThread::mainLoop()
|
|||||||
|
|
||||||
msg.reset(new Msg_t[msgCap]);
|
msg.reset(new Msg_t[msgCap]);
|
||||||
|
|
||||||
while (!die)
|
while (!die.load())
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> sl(msgQueueLock);
|
std::unique_lock<std::mutex> sl(msgQueueLock);
|
||||||
|
|
||||||
if (msgQueue.empty() && !die)
|
if (msgQueue.empty() && !die.load())
|
||||||
{
|
{
|
||||||
mainThreadWaiting = true;
|
mainThreadWaiting.store(true, std::memory_order_relaxed);
|
||||||
queueNotEmpty.wait(sl);
|
queueNotEmpty.wait(sl);
|
||||||
mainThreadWaiting = false;
|
mainThreadWaiting.store(false, std::memory_order_relaxed);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -202,14 +202,14 @@ void BPPSendThread::mainLoop()
|
|||||||
* i how many msgs are sent by 1 run of the loop, limited by msgCount or msgsLeft. */
|
* i how many msgs are sent by 1 run of the loop, limited by msgCount or msgsLeft. */
|
||||||
msgsSent = 0;
|
msgsSent = 0;
|
||||||
|
|
||||||
while (msgsSent < msgCount && !die)
|
while (msgsSent < msgCount && !die.load())
|
||||||
{
|
{
|
||||||
uint64_t bsSize;
|
uint64_t bsSize;
|
||||||
|
|
||||||
if (msgsLeft <= 0 && fcEnabled && !die)
|
if (msgsLeft.load() <= 0 && fcEnabled.load() && !die.load())
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> sl2(ackLock);
|
std::unique_lock<std::mutex> sl2(ackLock);
|
||||||
while (msgsLeft <= 0 && fcEnabled && !die)
|
while (msgsLeft.load() <= 0 && fcEnabled.load() && !die.load())
|
||||||
{
|
{
|
||||||
waiting = true;
|
waiting = true;
|
||||||
okToSend.wait(sl2);
|
okToSend.wait(sl2);
|
||||||
@@ -217,7 +217,7 @@ void BPPSendThread::mainLoop()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (i = 0; msgsSent < msgCount && ((fcEnabled && msgsLeft > 0) || !fcEnabled) && !die; msgsSent++, i++)
|
for (i = 0; msgsSent < msgCount && ((fcEnabled.load() && msgsLeft.load() > 0) || !fcEnabled.load()) && !die.load(); msgsSent++, i++)
|
||||||
{
|
{
|
||||||
if (doLoadBalancing)
|
if (doLoadBalancing)
|
||||||
{
|
{
|
||||||
@@ -249,17 +249,17 @@ void BPPSendThread::mainLoop()
|
|||||||
{
|
{
|
||||||
sl.lock();
|
sl.lock();
|
||||||
exceptionString = e.what();
|
exceptionString = e.what();
|
||||||
gotException = true;
|
gotException.store(true, std::memory_order_relaxed);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)atomicops::atomicDec(&msgsLeft);
|
msgsLeft.fetch_sub(1, std::memory_order_relaxed);
|
||||||
(void)atomicops::atomicSub(¤tByteSize, bsSize);
|
currentByteSize.fetch_sub(bsSize, std::memory_order_relaxed);
|
||||||
msg[msgsSent].msg.reset();
|
msg[msgsSent].msg.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fProcessorPool->blockedThreadCount() > 0 && currentByteSize < queueBytesThresh)
|
if (fProcessorPool->blockedThreadCount() > 0 && currentByteSize.load() < queueBytesThresh)
|
||||||
{
|
{
|
||||||
okToRespond.notify_one();
|
okToRespond.notify_one();
|
||||||
}
|
}
|
||||||
@@ -273,7 +273,7 @@ void BPPSendThread::abort()
|
|||||||
std::lock_guard<std::mutex> sl2(ackLock);
|
std::lock_guard<std::mutex> sl2(ackLock);
|
||||||
std::lock_guard<std::mutex> sl3(respondLock);
|
std::lock_guard<std::mutex> sl3(respondLock);
|
||||||
|
|
||||||
die = true;
|
die.store(true, std::memory_order_relaxed);
|
||||||
|
|
||||||
queueNotEmpty.notify_all();
|
queueNotEmpty.notify_all();
|
||||||
okToSend.notify_all();
|
okToSend.notify_all();
|
||||||
|
|||||||
@@ -26,6 +26,7 @@
|
|||||||
|
|
||||||
#include "fair_threadpool.h"
|
#include "fair_threadpool.h"
|
||||||
#include "umsocketselector.h"
|
#include "umsocketselector.h"
|
||||||
|
#include <atomic>
|
||||||
#include <queue>
|
#include <queue>
|
||||||
#include <set>
|
#include <set>
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
@@ -103,12 +104,12 @@ class BPPSendThread
|
|||||||
std::queue<Msg_t> msgQueue;
|
std::queue<Msg_t> msgQueue;
|
||||||
std::mutex msgQueueLock;
|
std::mutex msgQueueLock;
|
||||||
std::condition_variable queueNotEmpty;
|
std::condition_variable queueNotEmpty;
|
||||||
volatile bool die = false;
|
std::atomic<bool> die{false};
|
||||||
volatile bool gotException = false;
|
std::atomic<bool> gotException{false};
|
||||||
volatile bool mainThreadWaiting = false;
|
std::atomic<bool> mainThreadWaiting{false};
|
||||||
std::string exceptionString;
|
std::string exceptionString;
|
||||||
uint32_t queueMsgThresh = 0;
|
uint32_t queueMsgThresh = 0;
|
||||||
volatile int32_t msgsLeft = -1;
|
std::atomic<int32_t> msgsLeft{-1};
|
||||||
bool waiting = false;
|
bool waiting = false;
|
||||||
std::mutex ackLock;
|
std::mutex ackLock;
|
||||||
std::condition_variable okToSend;
|
std::condition_variable okToSend;
|
||||||
@@ -136,10 +137,10 @@ class BPPSendThread
|
|||||||
std::set<Connection_t> connections_s;
|
std::set<Connection_t> connections_s;
|
||||||
std::vector<Connection_t> connections_v;
|
std::vector<Connection_t> connections_v;
|
||||||
bool sawAllConnections = false;
|
bool sawAllConnections = false;
|
||||||
volatile bool fcEnabled = false;
|
std::atomic<bool> fcEnabled{false};
|
||||||
|
|
||||||
/* secondary queue size restriction based on byte size */
|
/* secondary queue size restriction based on byte size */
|
||||||
volatile uint64_t currentByteSize = 0;
|
std::atomic<uint64_t> currentByteSize{0};
|
||||||
uint64_t queueBytesThresh;
|
uint64_t queueBytesThresh;
|
||||||
// Used to tell the ThreadPool It should consider additional threads because a
|
// Used to tell the ThreadPool It should consider additional threads because a
|
||||||
// queue full event has happened and a thread has been blocked.
|
// queue full event has happened and a thread has been blocked.
|
||||||
|
|||||||
@@ -145,7 +145,7 @@ boost::mutex bppLock;
|
|||||||
boost::mutex djMutex; // lock for djLock, lol.
|
boost::mutex djMutex; // lock for djLock, lol.
|
||||||
std::map<uint64_t, boost::shared_mutex*> djLock; // djLock synchronizes destroy and joiner msgs, see bug 2619
|
std::map<uint64_t, boost::shared_mutex*> djLock; // djLock synchronizes destroy and joiner msgs, see bug 2619
|
||||||
|
|
||||||
volatile int32_t asyncCounter;
|
std::atomic<int32_t> asyncCounter;
|
||||||
const int asyncMax = 20; // current number of asynchronous loads
|
const int asyncMax = 20; // current number of asynchronous loads
|
||||||
|
|
||||||
struct preFetchCond
|
struct preFetchCond
|
||||||
@@ -858,8 +858,8 @@ struct AsynchLoader
|
|||||||
{
|
{
|
||||||
sendThread->abort();
|
sendThread->abort();
|
||||||
cerr << "AsynchLoader caught loadBlock exception: " << ex.what() << endl;
|
cerr << "AsynchLoader caught loadBlock exception: " << ex.what() << endl;
|
||||||
idbassert(asyncCounter > 0);
|
idbassert(asyncCounter.load() > 0);
|
||||||
(void)atomicops::atomicDec(&asyncCounter);
|
asyncCounter.fetch_sub(1, std::memory_order_relaxed);
|
||||||
mutex->lock();
|
mutex->lock();
|
||||||
--(*busyLoaders);
|
--(*busyLoaders);
|
||||||
mutex->unlock();
|
mutex->unlock();
|
||||||
@@ -874,8 +874,8 @@ struct AsynchLoader
|
|||||||
sendThread->abort();
|
sendThread->abort();
|
||||||
cerr << "AsynchLoader caught unknown exception: " << endl;
|
cerr << "AsynchLoader caught unknown exception: " << endl;
|
||||||
// FIXME Use a locked processor primitive?
|
// FIXME Use a locked processor primitive?
|
||||||
idbassert(asyncCounter > 0);
|
idbassert(asyncCounter.load() > 0);
|
||||||
(void)atomicops::atomicDec(&asyncCounter);
|
asyncCounter.fetch_sub(1, std::memory_order_relaxed);
|
||||||
mutex->lock();
|
mutex->lock();
|
||||||
--(*busyLoaders);
|
--(*busyLoaders);
|
||||||
mutex->unlock();
|
mutex->unlock();
|
||||||
@@ -885,8 +885,8 @@ struct AsynchLoader
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
idbassert(asyncCounter > 0);
|
idbassert(asyncCounter.load() > 0);
|
||||||
(void)atomicops::atomicDec(&asyncCounter);
|
asyncCounter.fetch_sub(1, std::memory_order_relaxed);
|
||||||
mutex->lock();
|
mutex->lock();
|
||||||
|
|
||||||
if (cached)
|
if (cached)
|
||||||
@@ -943,12 +943,10 @@ void loadBlockAsync(uint64_t lbid, const QueryContext& c, uint32_t txn, int comp
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
/* a quick and easy stand-in for a threadpool for loaders */
|
/* a quick and easy stand-in for a threadpool for loaders */
|
||||||
atomicops::atomicMb();
|
if (asyncCounter.load(std::memory_order_relaxed) >= asyncMax)
|
||||||
|
|
||||||
if (asyncCounter >= asyncMax)
|
|
||||||
return;
|
return;
|
||||||
|
|
||||||
(void)atomicops::atomicInc(&asyncCounter);
|
asyncCounter.fetch_add(1, std::memory_order_relaxed);
|
||||||
|
|
||||||
boost::mutex::scoped_lock sl(*m);
|
boost::mutex::scoped_lock sl(*m);
|
||||||
|
|
||||||
@@ -961,8 +959,8 @@ void loadBlockAsync(uint64_t lbid, const QueryContext& c, uint32_t txn, int comp
|
|||||||
catch (boost::thread_resource_error& e)
|
catch (boost::thread_resource_error& e)
|
||||||
{
|
{
|
||||||
cerr << "AsynchLoader: caught a thread resource error, need to lower asyncMax\n";
|
cerr << "AsynchLoader: caught a thread resource error, need to lower asyncMax\n";
|
||||||
idbassert(asyncCounter > 0);
|
idbassert(asyncCounter.load() > 0);
|
||||||
(void)atomicops::atomicDec(&asyncCounter);
|
asyncCounter.fetch_sub(1, std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2272,7 +2270,7 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro
|
|||||||
// that can reschedule jobs, and an unlimited non-blocking queue
|
// that can reschedule jobs, and an unlimited non-blocking queue
|
||||||
fOOBPool.reset(new threadpool::PriorityThreadPool(1, 5, 0, 0, 1));
|
fOOBPool.reset(new threadpool::PriorityThreadPool(1, 5, 0, 0, 1));
|
||||||
|
|
||||||
asyncCounter = 0;
|
asyncCounter.store(0, std::memory_order_relaxed);
|
||||||
|
|
||||||
brm = new DBRM();
|
brm = new DBRM();
|
||||||
|
|
||||||
|
|||||||
@@ -141,9 +141,9 @@ bf::path Ownership::get(const bf::path& p, bool getOwnership)
|
|||||||
|
|
||||||
#define DELETE(p, f) ::unlink((metadataPrefix / p / f).string().c_str());
|
#define DELETE(p, f) ::unlink((metadataPrefix / p / f).string().c_str());
|
||||||
|
|
||||||
void Ownership::touchFlushing(const bf::path& prefix, volatile bool* doneFlushing) const
|
void Ownership::touchFlushing(const bf::path& prefix, std::atomic<bool>* doneFlushing) const
|
||||||
{
|
{
|
||||||
while (!*doneFlushing)
|
while (!doneFlushing->load())
|
||||||
{
|
{
|
||||||
TOUCH(prefix, "FLUSHING");
|
TOUCH(prefix, "FLUSHING");
|
||||||
try
|
try
|
||||||
@@ -182,13 +182,13 @@ void Ownership::releaseOwnership(const bf::path& p, bool isDtor)
|
|||||||
|
|
||||||
s.unlock();
|
s.unlock();
|
||||||
|
|
||||||
volatile bool done = false;
|
std::atomic<bool> done{false};
|
||||||
|
|
||||||
// start flushing
|
// start flushing
|
||||||
boost::thread xfer([this, &p, &done] { this->touchFlushing(p, &done); });
|
boost::thread xfer([this, &p, &done] { this->touchFlushing(p, &done); });
|
||||||
Synchronizer::get()->dropPrefix(p);
|
Synchronizer::get()->dropPrefix(p);
|
||||||
Cache::get()->dropPrefix(p);
|
Cache::get()->dropPrefix(p);
|
||||||
done = true;
|
done.store(true);
|
||||||
xfer.interrupt();
|
xfer.interrupt();
|
||||||
xfer.join();
|
xfer.join();
|
||||||
|
|
||||||
@@ -271,14 +271,14 @@ void Ownership::takeOwnership(const bf::path& p)
|
|||||||
_takeOwnership(p);
|
_takeOwnership(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ownership::Monitor::Monitor(Ownership* _owner) : owner(_owner), stop(false)
|
Ownership::Monitor::Monitor(Ownership* _owner) : owner(_owner), stop{false}
|
||||||
{
|
{
|
||||||
thread = boost::thread([this] { this->watchForInterlopers(); });
|
thread = boost::thread([this] { this->watchForInterlopers(); });
|
||||||
}
|
}
|
||||||
|
|
||||||
Ownership::Monitor::~Monitor()
|
Ownership::Monitor::~Monitor()
|
||||||
{
|
{
|
||||||
stop = true;
|
stop.store(true);
|
||||||
thread.interrupt();
|
thread.interrupt();
|
||||||
thread.join();
|
thread.join();
|
||||||
}
|
}
|
||||||
@@ -291,14 +291,14 @@ void Ownership::Monitor::watchForInterlopers()
|
|||||||
char buf[80];
|
char buf[80];
|
||||||
vector<bf::path> releaseList;
|
vector<bf::path> releaseList;
|
||||||
|
|
||||||
while (!stop)
|
while (!stop.load())
|
||||||
{
|
{
|
||||||
releaseList.clear();
|
releaseList.clear();
|
||||||
boost::unique_lock<boost::mutex> s(owner->mutex);
|
boost::unique_lock<boost::mutex> s(owner->mutex);
|
||||||
|
|
||||||
for (auto& prefix : owner->ownedPrefixes)
|
for (auto& prefix : owner->ownedPrefixes)
|
||||||
{
|
{
|
||||||
if (stop)
|
if (stop.load())
|
||||||
break;
|
break;
|
||||||
if (prefix.second == false)
|
if (prefix.second == false)
|
||||||
continue;
|
continue;
|
||||||
@@ -318,7 +318,7 @@ void Ownership::Monitor::watchForInterlopers()
|
|||||||
|
|
||||||
for (auto& prefix : releaseList)
|
for (auto& prefix : releaseList)
|
||||||
owner->releaseOwnership(prefix);
|
owner->releaseOwnership(prefix);
|
||||||
if (stop)
|
if (stop.load())
|
||||||
break;
|
break;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -17,6 +17,7 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
#include <boost/filesystem/path.hpp>
|
#include <boost/filesystem/path.hpp>
|
||||||
#include <boost/noncopyable.hpp>
|
#include <boost/noncopyable.hpp>
|
||||||
#include <boost/thread.hpp>
|
#include <boost/thread.hpp>
|
||||||
@@ -46,7 +47,7 @@ class Ownership : public boost::noncopyable
|
|||||||
boost::filesystem::path metadataPrefix;
|
boost::filesystem::path metadataPrefix;
|
||||||
SMLogging* logger;
|
SMLogging* logger;
|
||||||
|
|
||||||
void touchFlushing(const boost::filesystem::path&, volatile bool*) const;
|
void touchFlushing(const boost::filesystem::path&, std::atomic<bool>*) const;
|
||||||
void takeOwnership(const boost::filesystem::path&);
|
void takeOwnership(const boost::filesystem::path&);
|
||||||
void releaseOwnership(const boost::filesystem::path&, bool isDtor = false);
|
void releaseOwnership(const boost::filesystem::path&, bool isDtor = false);
|
||||||
void _takeOwnership(const boost::filesystem::path&);
|
void _takeOwnership(const boost::filesystem::path&);
|
||||||
@@ -57,7 +58,7 @@ class Ownership : public boost::noncopyable
|
|||||||
~Monitor();
|
~Monitor();
|
||||||
boost::thread thread;
|
boost::thread thread;
|
||||||
Ownership* owner;
|
Ownership* owner;
|
||||||
volatile bool stop;
|
std::atomic<bool> stop;
|
||||||
void watchForInterlopers();
|
void watchForInterlopers();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -54,7 +54,7 @@ namespace
|
|||||||
boost::mutex CacheOpsMutex;
|
boost::mutex CacheOpsMutex;
|
||||||
|
|
||||||
// This global is updated only w/ atomic ops
|
// This global is updated only w/ atomic ops
|
||||||
volatile uint32_t MultiReturnCode;
|
std::atomic<uint32_t> MultiReturnCode;
|
||||||
|
|
||||||
int32_t extractRespCode(const ByteStream& bs)
|
int32_t extractRespCode(const ByteStream& bs)
|
||||||
{
|
{
|
||||||
@@ -95,7 +95,10 @@ class CacheOpThread
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (rc != 0)
|
if (rc != 0)
|
||||||
atomicops::atomicCAS<uint32_t>(&MultiReturnCode, 0, 1);
|
{
|
||||||
|
uint32_t expected = 0;
|
||||||
|
MultiReturnCode.compare_exchange_strong(expected, 1, std::memory_order_relaxed);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@@ -123,7 +126,7 @@ int sendToAll(const ByteStream& outBs)
|
|||||||
|
|
||||||
thread_group tg;
|
thread_group tg;
|
||||||
int rc = 0;
|
int rc = 0;
|
||||||
MultiReturnCode = 0;
|
MultiReturnCode.store(0, std::memory_order_relaxed);
|
||||||
|
|
||||||
for (int i = 0; i < cnt; i++)
|
for (int i = 0; i < cnt; i++)
|
||||||
{
|
{
|
||||||
@@ -134,7 +137,7 @@ int sendToAll(const ByteStream& outBs)
|
|||||||
|
|
||||||
tg.join_all();
|
tg.join_all();
|
||||||
|
|
||||||
if (MultiReturnCode != 0)
|
if (MultiReturnCode.load(std::memory_order_relaxed) != 0)
|
||||||
rc = -1;
|
rc = -1;
|
||||||
|
|
||||||
return rc;
|
return rc;
|
||||||
|
|||||||
@@ -58,7 +58,7 @@ TsTeleQueue<querytele::StepTele> stQueue;
|
|||||||
TsTeleQueue<querytele::QueryTele> qtQueue;
|
TsTeleQueue<querytele::QueryTele> qtQueue;
|
||||||
TsTeleQueue<querytele::ImportTele> itQueue;
|
TsTeleQueue<querytele::ImportTele> itQueue;
|
||||||
|
|
||||||
volatile bool isInited = false;
|
std::atomic<bool> isInited{false};
|
||||||
boost::mutex initMux;
|
boost::mutex initMux;
|
||||||
|
|
||||||
std::shared_ptr<att::TSocket> fSocket;
|
std::shared_ptr<att::TSocket> fSocket;
|
||||||
@@ -337,9 +337,7 @@ QueryTeleProtoImpl::QueryTeleProtoImpl(const QueryTeleServerParms& sp) : fServer
|
|||||||
|
|
||||||
boost::mutex::scoped_lock lk(initMux);
|
boost::mutex::scoped_lock lk(initMux);
|
||||||
|
|
||||||
atomicops::atomicMb();
|
if (isInited.load(std::memory_order_acquire))
|
||||||
|
|
||||||
if (isInited)
|
|
||||||
return;
|
return;
|
||||||
|
|
||||||
fSocket.reset(new att::TSocket(fServerParms.host, fServerParms.port));
|
fSocket.reset(new att::TSocket(fServerParms.host, fServerParms.port));
|
||||||
@@ -348,8 +346,7 @@ QueryTeleProtoImpl::QueryTeleProtoImpl(const QueryTeleServerParms& sp) : fServer
|
|||||||
|
|
||||||
consThd = new boost::thread(&TeleConsumer);
|
consThd = new boost::thread(&TeleConsumer);
|
||||||
|
|
||||||
atomicops::atomicMb();
|
isInited.store(true, std::memory_order_release);
|
||||||
isInited = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int QueryTeleProtoImpl::enqStepTele(const StepTele& stdata)
|
int QueryTeleProtoImpl::enqStepTele(const StepTele& stdata)
|
||||||
|
|||||||
@@ -26,6 +26,9 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <condition_variable>
|
||||||
|
#include <mutex>
|
||||||
#include <boost/thread.hpp>
|
#include <boost/thread.hpp>
|
||||||
#include <boost/scoped_ptr.hpp>
|
#include <boost/scoped_ptr.hpp>
|
||||||
|
|
||||||
@@ -255,7 +258,8 @@ class MasterDBRMNode
|
|||||||
std::condition_variable cpimportJobsCond;
|
std::condition_variable cpimportJobsCond;
|
||||||
int runners, NumWorkers;
|
int runners, NumWorkers;
|
||||||
ThreadParams* params;
|
ThreadParams* params;
|
||||||
volatile bool die, halting;
|
std::atomic<bool> die;
|
||||||
|
std::atomic<bool> halting;
|
||||||
bool reloadCmd;
|
bool reloadCmd;
|
||||||
mutable bool readOnly;
|
mutable bool readOnly;
|
||||||
// Maximum time to wait for worker responses/reconfigure before forcing read-only
|
// Maximum time to wait for worker responses/reconfigure before forcing read-only
|
||||||
|
|||||||
@@ -86,7 +86,7 @@ const uint32_t SessionManagerServer::SS_FORCE =
|
|||||||
const uint32_t SessionManagerServer::SS_QUERY_READY =
|
const uint32_t SessionManagerServer::SS_QUERY_READY =
|
||||||
1 << 6; // Set by ProcManager when system is ready for queries
|
1 << 6; // Set by ProcManager when system is ready for queries
|
||||||
|
|
||||||
SessionManagerServer::SessionManagerServer() : unique32(0), unique64(0)
|
SessionManagerServer::SessionManagerServer() : unique32{0}, unique64{0}
|
||||||
{
|
{
|
||||||
config::Config* conf;
|
config::Config* conf;
|
||||||
string stmp;
|
string stmp;
|
||||||
|
|||||||
@@ -26,9 +26,10 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <string>
|
||||||
|
#include <set>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <condition_variable>
|
|
||||||
|
|
||||||
#include <unordered_set>
|
#include <unordered_set>
|
||||||
#include <boost/thread/mutex.hpp>
|
#include <boost/thread/mutex.hpp>
|
||||||
#include <boost/thread/condition_variable.hpp>
|
#include <boost/thread/condition_variable.hpp>
|
||||||
@@ -209,7 +210,7 @@ class SessionManagerServer
|
|||||||
*/
|
*/
|
||||||
uint32_t getUnique32()
|
uint32_t getUnique32()
|
||||||
{
|
{
|
||||||
return atomicops::atomicInc(&unique32);
|
return unique32.fetch_add(1, std::memory_order_relaxed) + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -217,7 +218,7 @@ class SessionManagerServer
|
|||||||
*/
|
*/
|
||||||
uint64_t getUnique64()
|
uint64_t getUnique64()
|
||||||
{
|
{
|
||||||
return atomicops::atomicInc(&unique64);
|
return unique64.fetch_add(1, std::memory_order_relaxed) + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @brief Resets the semaphores to their original state. For testing only.
|
/** @brief Resets the semaphores to their original state. For testing only.
|
||||||
@@ -274,8 +275,8 @@ class SessionManagerServer
|
|||||||
void finishTransaction(TxnID& txn);
|
void finishTransaction(TxnID& txn);
|
||||||
void saveSMTxnIDAndState();
|
void saveSMTxnIDAndState();
|
||||||
|
|
||||||
volatile uint32_t unique32;
|
std::atomic<uint32_t> unique32;
|
||||||
volatile uint64_t unique64;
|
std::atomic<uint64_t> unique64;
|
||||||
|
|
||||||
int maxTxns; // the maximum number of concurrent transactions
|
int maxTxns; // the maximum number of concurrent transactions
|
||||||
std::string txnidFilename;
|
std::string txnidFilename;
|
||||||
|
|||||||
@@ -30,5 +30,5 @@
|
|||||||
namespace WriteEngine
|
namespace WriteEngine
|
||||||
{
|
{
|
||||||
/*static*/
|
/*static*/
|
||||||
volatile int BulkStatus::fJobStatus = EXIT_SUCCESS;
|
std::atomic<int> BulkStatus::fJobStatus{EXIT_SUCCESS};
|
||||||
} // namespace WriteEngine
|
} // namespace WriteEngine
|
||||||
|
|||||||
@@ -23,6 +23,8 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
|
||||||
#if 0 // defined(_MSC_VER) && defined(WE_BULKSTATUS_DLLEXPORT)
|
#if 0 // defined(_MSC_VER) && defined(WE_BULKSTATUS_DLLEXPORT)
|
||||||
#define EXPORT __declspec(dllexport)
|
#define EXPORT __declspec(dllexport)
|
||||||
#else
|
#else
|
||||||
@@ -48,12 +50,10 @@ class BulkStatus
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
/* @brief Global job status flag.
|
/* @brief Global job status flag.
|
||||||
* Declared volatile to insure that all threads see when this flag is
|
* Using std::atomic to ensure thread-safe access without requiring a mutex.
|
||||||
* changed. We don't worry about using a mutex since we are just using
|
* atomic provides proper memory ordering guarantees for multi-threaded access.
|
||||||
* as a flag. Making the variable volatile should suffice, to make it
|
|
||||||
* work with multiple threads.
|
|
||||||
*/
|
*/
|
||||||
static volatile int fJobStatus;
|
static std::atomic<int> fJobStatus;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace WriteEngine
|
} // namespace WriteEngine
|
||||||
|
|||||||
@@ -152,7 +152,7 @@ ColumnInfo::ColumnInfo(Log* logger, int idIn, const JobColumn& columnIn, DBRootE
|
|||||||
, fMaxNumRowsPerSegFile(0)
|
, fMaxNumRowsPerSegFile(0)
|
||||||
, fStore(0)
|
, fStore(0)
|
||||||
, fAutoIncLastValue(0)
|
, fAutoIncLastValue(0)
|
||||||
, fSaturatedRowCnt(0)
|
, fSaturatedRowCnt{0}
|
||||||
, fpTableInfo(pTableInfo)
|
, fpTableInfo(pTableInfo)
|
||||||
, fAutoIncMgr(0)
|
, fAutoIncMgr(0)
|
||||||
, fDbRootExtTrk(pDBRootExtTrk)
|
, fDbRootExtTrk(pDBRootExtTrk)
|
||||||
|
|||||||
@@ -472,7 +472,7 @@ class ColumnInfo : public WeUIDGID
|
|||||||
// For autoincrement column only... Tracks latest autoincrement value used
|
// For autoincrement column only... Tracks latest autoincrement value used
|
||||||
long long fAutoIncLastValue;
|
long long fAutoIncLastValue;
|
||||||
|
|
||||||
volatile int64_t fSaturatedRowCnt; // No. of rows with saturated values
|
std::atomic<int64_t> fSaturatedRowCnt; // No. of rows with saturated values
|
||||||
|
|
||||||
// List of segment files updated during an import; used to track infor-
|
// List of segment files updated during an import; used to track infor-
|
||||||
// mation necessary to update the ExtentMap at the "end" of the import.
|
// mation necessary to update the ExtentMap at the "end" of the import.
|
||||||
@@ -517,7 +517,7 @@ inline int64_t ColumnInfo::getFileSize() const
|
|||||||
|
|
||||||
inline void ColumnInfo::incSaturatedCnt(int64_t satIncCnt)
|
inline void ColumnInfo::incSaturatedCnt(int64_t satIncCnt)
|
||||||
{
|
{
|
||||||
(void)atomicops::atomicAdd(&fSaturatedRowCnt, satIncCnt);
|
fSaturatedRowCnt.fetch_add(satIncCnt, std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline bool ColumnInfo::isAbbrevExtent()
|
inline bool ColumnInfo::isAbbrevExtent()
|
||||||
@@ -537,7 +537,7 @@ inline void ColumnInfo::printCPInfo(JobColumn column)
|
|||||||
|
|
||||||
inline long long ColumnInfo::saturatedCnt()
|
inline long long ColumnInfo::saturatedCnt()
|
||||||
{
|
{
|
||||||
return fSaturatedRowCnt;
|
return fSaturatedRowCnt.load(std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void ColumnInfo::relativeColWidthFactor(int colWidFactor)
|
inline void ColumnInfo::relativeColWidthFactor(int colWidFactor)
|
||||||
|
|||||||
@@ -21,6 +21,7 @@
|
|||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
#include <sys/time.h>
|
#include <sys/time.h>
|
||||||
#include <fstream>
|
#include <fstream>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
@@ -69,10 +70,10 @@ class TableInfo : public WeUIDGID
|
|||||||
// to read import files. Comes from
|
// to read import files. Comes from
|
||||||
// writeBufferSize tag in job xml file
|
// writeBufferSize tag in job xml file
|
||||||
char fColDelim; // Used to delimit col values in a row
|
char fColDelim; // Used to delimit col values in a row
|
||||||
volatile Status fStatusTI; // Status of table. Made volatile to
|
std::atomic<Status> fStatusTI; // Status of table. Using atomic to
|
||||||
// insure BulkLoad methods can access
|
// ensure BulkLoad methods can access
|
||||||
// (thru getStatusTI()) correctly w/o
|
// correctly across threads with proper
|
||||||
// having to go through a mutex lock.
|
// memory ordering guarantees.
|
||||||
int fReadBufCount; // Number of read buffers
|
int fReadBufCount; // Number of read buffers
|
||||||
// (size of fBuffers vector)
|
// (size of fBuffers vector)
|
||||||
unsigned fNumberOfColumns; // Number of ColumnInfo objs in this tbl
|
unsigned fNumberOfColumns; // Number of ColumnInfo objs in this tbl
|
||||||
@@ -134,7 +135,7 @@ class TableInfo : public WeUIDGID
|
|||||||
// to use for TIMESTAMP data type. For example,
|
// to use for TIMESTAMP data type. For example,
|
||||||
// for EST which is UTC-5:00, offset will be -18000s.
|
// for EST which is UTC-5:00, offset will be -18000s.
|
||||||
|
|
||||||
volatile bool fTableLocked; // Do we have db table lock
|
std::atomic<bool> fTableLocked; // Do we have db table lock
|
||||||
|
|
||||||
bool fReadFromStdin; // Read import file from STDIN
|
bool fReadFromStdin; // Read import file from STDIN
|
||||||
bool fReadFromS3; // Read import file from S3
|
bool fReadFromS3; // Read import file from S3
|
||||||
|
|||||||
@@ -173,9 +173,8 @@ bool isWESConfigured(config::Config* config, const std::string& fOtherEnd)
|
|||||||
|
|
||||||
namespace WriteEngine
|
namespace WriteEngine
|
||||||
{
|
{
|
||||||
WEClients::WEClients(int PrgmID) : fPrgmID(PrgmID), pmCount(0)
|
WEClients::WEClients(int PrgmID) : fPrgmID(PrgmID), closingConnection{0}, pmCount(0)
|
||||||
{
|
{
|
||||||
closingConnection = 0;
|
|
||||||
Setup();
|
Setup();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -317,7 +316,7 @@ bool WEClients::isConnectionReadonly(uint32_t connection)
|
|||||||
int WEClients::Close()
|
int WEClients::Close()
|
||||||
{
|
{
|
||||||
makeBusy(false);
|
makeBusy(false);
|
||||||
closingConnection = 1;
|
closingConnection.store(1, std::memory_order_relaxed);
|
||||||
ByteStream bs;
|
ByteStream bs;
|
||||||
bs << (ByteStream::byte)WE_SVR_CLOSE_CONNECTION;
|
bs << (ByteStream::byte)WE_SVR_CLOSE_CONNECTION;
|
||||||
write_to_all(bs);
|
write_to_all(bs);
|
||||||
@@ -356,7 +355,7 @@ void WEClients::Listen(boost::shared_ptr<MessageQueueClient> client, uint32_t co
|
|||||||
}
|
}
|
||||||
else // got zero bytes on read, nothing more will come
|
else // got zero bytes on read, nothing more will come
|
||||||
{
|
{
|
||||||
if (closingConnection > 0)
|
if (closingConnection.load() > 0)
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -390,7 +389,7 @@ Error:
|
|||||||
for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
|
for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
|
||||||
{
|
{
|
||||||
map_tok->second->queue.clear();
|
map_tok->second->queue.clear();
|
||||||
(void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
|
map_tok->second->unackedWork[0].fetch_add(1, std::memory_order_relaxed);
|
||||||
map_tok->second->queue.push(sbs);
|
map_tok->second->queue.push(sbs);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -425,9 +424,7 @@ void WEClients::addQueue(uint32_t key)
|
|||||||
|
|
||||||
boost::mutex* lock = new boost::mutex();
|
boost::mutex* lock = new boost::mutex();
|
||||||
condition* cond = new condition();
|
condition* cond = new condition();
|
||||||
boost::shared_ptr<MQE> mqe(new MQE(pmCount));
|
boost::shared_ptr<MQE> mqe(new MQE(pmCount, lock, cond));
|
||||||
|
|
||||||
mqe->queue = WESMsgQueue(lock, cond);
|
|
||||||
|
|
||||||
boost::mutex::scoped_lock lk(fMlock);
|
boost::mutex::scoped_lock lk(fMlock);
|
||||||
b = fSessionMessages.insert(pair<uint32_t, boost::shared_ptr<MQE> >(key, mqe)).second;
|
b = fSessionMessages.insert(pair<uint32_t, boost::shared_ptr<MQE> >(key, mqe)).second;
|
||||||
@@ -562,7 +559,7 @@ void WEClients::addDataToOutput(SBS sbs, uint32_t connIndex)
|
|||||||
|
|
||||||
if (pmCount > 0)
|
if (pmCount > 0)
|
||||||
{
|
{
|
||||||
atomicops::atomicInc(&mqe->unackedWork[connIndex % pmCount]);
|
mqe->unackedWork[connIndex % pmCount].fetch_add(1, std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)mqe->queue.push(sbs);
|
(void)mqe->queue.push(sbs);
|
||||||
|
|||||||
@@ -138,15 +138,15 @@ class WEClients
|
|||||||
/* To keep some state associated with the connection */
|
/* To keep some state associated with the connection */
|
||||||
struct MQE
|
struct MQE
|
||||||
{
|
{
|
||||||
MQE(uint32_t pCount) : ackSocketIndex(0), pmCount(pCount)
|
MQE(uint32_t pCount, boost::mutex* lock = nullptr, boost::condition* cond = nullptr)
|
||||||
|
: queue(lock, cond), ackSocketIndex(0), pmCount(pCount), unackedWork(pCount)
|
||||||
{
|
{
|
||||||
unackedWork.reset(new volatile uint32_t[pmCount]);
|
// unackedWork vector is default-initialized to 0
|
||||||
memset((void*)unackedWork.get(), 0, pmCount * sizeof(uint32_t));
|
|
||||||
}
|
}
|
||||||
WESMsgQueue queue;
|
WESMsgQueue queue;
|
||||||
uint32_t ackSocketIndex;
|
uint32_t ackSocketIndex;
|
||||||
boost::scoped_array<volatile uint32_t> unackedWork;
|
|
||||||
uint32_t pmCount;
|
uint32_t pmCount;
|
||||||
|
std::vector<std::atomic<uint32_t>> unackedWork;
|
||||||
};
|
};
|
||||||
|
|
||||||
// The mapping of session ids to StepMsgQueueLists
|
// The mapping of session ids to StepMsgQueueLists
|
||||||
@@ -168,7 +168,7 @@ class WEClients
|
|||||||
boost::mutex fMlock; // sessionMessages mutex
|
boost::mutex fMlock; // sessionMessages mutex
|
||||||
std::vector<boost::shared_ptr<boost::mutex> > fWlock; // WES socket write mutexes
|
std::vector<boost::shared_ptr<boost::mutex> > fWlock; // WES socket write mutexes
|
||||||
bool fBusy;
|
bool fBusy;
|
||||||
volatile uint32_t closingConnection;
|
std::atomic<uint32_t> closingConnection;
|
||||||
uint32_t pmCount;
|
uint32_t pmCount;
|
||||||
boost::mutex fOnErrMutex; // to lock function scope to reset pmconnections under error condition
|
boost::mutex fOnErrMutex; // to lock function scope to reset pmconnections under error condition
|
||||||
|
|
||||||
|
|||||||
@@ -65,13 +65,13 @@ namespace redistribute
|
|||||||
{
|
{
|
||||||
// static variables
|
// static variables
|
||||||
boost::mutex RedistributeControlThread::fActionMutex;
|
boost::mutex RedistributeControlThread::fActionMutex;
|
||||||
volatile bool RedistributeControlThread::fStopAction = false;
|
std::atomic<bool> RedistributeControlThread::fStopAction{false};
|
||||||
string RedistributeControlThread::fWesInUse;
|
string RedistributeControlThread::fWesInUse;
|
||||||
|
|
||||||
void RedistributeControlThread::setStopAction(bool s)
|
void RedistributeControlThread::setStopAction(bool s)
|
||||||
{
|
{
|
||||||
boost::mutex::scoped_lock lock(fActionMutex);
|
boost::mutex::scoped_lock lock(fActionMutex);
|
||||||
fStopAction = s;
|
fStopAction.store(s, std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
RedistributeControlThread::RedistributeControlThread(uint32_t act)
|
RedistributeControlThread::RedistributeControlThread(uint32_t act)
|
||||||
|
|||||||
@@ -24,6 +24,8 @@
|
|||||||
#include <map>
|
#include <map>
|
||||||
#include <set>
|
#include <set>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
#include <atomic>
|
||||||
|
#include <boost/thread.hpp>
|
||||||
|
|
||||||
#include "boost/shared_ptr.hpp"
|
#include "boost/shared_ptr.hpp"
|
||||||
#include "boost/thread/mutex.hpp"
|
#include "boost/thread/mutex.hpp"
|
||||||
@@ -119,7 +121,7 @@ class RedistributeControlThread
|
|||||||
RedistributeControl* fControl;
|
RedistributeControl* fControl;
|
||||||
|
|
||||||
static boost::mutex fActionMutex;
|
static boost::mutex fActionMutex;
|
||||||
static volatile bool fStopAction;
|
static std::atomic<bool> fStopAction;
|
||||||
static std::string fWesInUse;
|
static std::string fWesInUse;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -72,8 +72,8 @@ namespace redistribute
|
|||||||
{
|
{
|
||||||
// static variables
|
// static variables
|
||||||
boost::mutex RedistributeWorkerThread::fActionMutex;
|
boost::mutex RedistributeWorkerThread::fActionMutex;
|
||||||
volatile bool RedistributeWorkerThread::fStopAction = false;
|
std::atomic<bool> RedistributeWorkerThread::fStopAction{false};
|
||||||
volatile bool RedistributeWorkerThread::fCommitted = false;
|
std::atomic<bool> RedistributeWorkerThread::fCommitted{false};
|
||||||
string RedistributeWorkerThread::fWesInUse;
|
string RedistributeWorkerThread::fWesInUse;
|
||||||
|
|
||||||
RedistributeWorkerThread::RedistributeWorkerThread(ByteStream& bs, IOSocket& ios)
|
RedistributeWorkerThread::RedistributeWorkerThread(ByteStream& bs, IOSocket& ios)
|
||||||
|
|||||||
@@ -25,9 +25,11 @@
|
|||||||
#include <set>
|
#include <set>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <cstdio>
|
#include <cstdio>
|
||||||
|
#include <atomic>
|
||||||
|
|
||||||
#include "boost/shared_ptr.hpp"
|
#include "boost/shared_ptr.hpp"
|
||||||
#include "boost/thread/mutex.hpp"
|
#include "boost/thread/mutex.hpp"
|
||||||
|
#include "boost/thread.hpp"
|
||||||
|
|
||||||
#include "brmtypes.h"
|
#include "brmtypes.h"
|
||||||
#include "we_redistributedef.h"
|
#include "we_redistributedef.h"
|
||||||
@@ -133,8 +135,8 @@ class RedistributeWorkerThread
|
|||||||
// uint64_t fSegPerRoot;
|
// uint64_t fSegPerRoot;
|
||||||
|
|
||||||
static boost::mutex fActionMutex;
|
static boost::mutex fActionMutex;
|
||||||
static volatile bool fStopAction;
|
static std::atomic<bool> fStopAction;
|
||||||
static volatile bool fCommitted;
|
static std::atomic<bool> fCommitted;
|
||||||
static std::string fWesInUse;
|
static std::string fWesInUse;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -55,13 +55,12 @@ class ActiveThreadCounter
|
|||||||
|
|
||||||
for (;;)
|
for (;;)
|
||||||
{
|
{
|
||||||
atomicops::atomicMb();
|
atc = factiveThreadCount.load(std::memory_order_relaxed);
|
||||||
atc = factiveThreadCount;
|
|
||||||
|
|
||||||
if (atc <= 0) // hopefully atc will never be < 0!
|
if (atc <= 0) // hopefully atc will never be < 0!
|
||||||
return;
|
return;
|
||||||
|
|
||||||
if (atomicops::atomicCAS(&factiveThreadCount, atc, (atc - 1)))
|
if (factiveThreadCount.compare_exchange_weak(atc, atc - 1, std::memory_order_relaxed))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
atomicops::atomicYield();
|
atomicops::atomicYield();
|
||||||
@@ -70,14 +69,14 @@ class ActiveThreadCounter
|
|||||||
|
|
||||||
uint32_t cur()
|
uint32_t cur()
|
||||||
{
|
{
|
||||||
return factiveThreadCount;
|
return factiveThreadCount.load(std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
ActiveThreadCounter(const ActiveThreadCounter& rhs);
|
ActiveThreadCounter(const ActiveThreadCounter& rhs);
|
||||||
ActiveThreadCounter& operator=(const ActiveThreadCounter& rhs);
|
ActiveThreadCounter& operator=(const ActiveThreadCounter& rhs);
|
||||||
|
|
||||||
volatile int32_t factiveThreadCount;
|
std::atomic<int32_t> factiveThreadCount;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace WriteEngine
|
} // namespace WriteEngine
|
||||||
|
|||||||
@@ -56,7 +56,7 @@ using namespace execplan;
|
|||||||
/** Namespace WriteEngine */
|
/** Namespace WriteEngine */
|
||||||
namespace WriteEngine
|
namespace WriteEngine
|
||||||
{
|
{
|
||||||
BRMWrapper* volatile BRMWrapper::m_instance = NULL;
|
std::atomic<BRMWrapper*> BRMWrapper::m_instance{nullptr};
|
||||||
std::atomic<bool> BRMWrapper::finishReported(false);
|
std::atomic<bool> BRMWrapper::finishReported(false);
|
||||||
thread_local int BRMWrapper::m_brmRc = 0;
|
thread_local int BRMWrapper::m_brmRc = 0;
|
||||||
boost::mutex BRMWrapper::m_instanceCreateMutex;
|
boost::mutex BRMWrapper::m_instanceCreateMutex;
|
||||||
@@ -298,22 +298,19 @@ int BRMWrapper::getFboOffset(const uint64_t lbid, int& oid, uint16_t& dbRoot, ui
|
|||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
BRMWrapper* BRMWrapper::getInstance()
|
BRMWrapper* BRMWrapper::getInstance()
|
||||||
{
|
{
|
||||||
if (m_instance == 0)
|
BRMWrapper* tmp = m_instance.load(std::memory_order_acquire);
|
||||||
|
if (tmp == nullptr)
|
||||||
{
|
{
|
||||||
boost::mutex::scoped_lock lock(m_instanceCreateMutex);
|
boost::mutex::scoped_lock lock(m_instanceCreateMutex);
|
||||||
|
tmp = m_instance.load(std::memory_order_relaxed);
|
||||||
if (m_instance == 0)
|
if (tmp == nullptr)
|
||||||
{
|
{
|
||||||
BRMWrapper* tmp = new BRMWrapper();
|
tmp = new BRMWrapper();
|
||||||
|
m_instance.store(tmp, std::memory_order_release);
|
||||||
// Memory barrier makes sure the m_instance assignment is not
|
|
||||||
// mingled with the constructor code
|
|
||||||
atomicops::atomicMb();
|
|
||||||
m_instance = tmp;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return m_instance;
|
return tmp;
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|||||||
@@ -465,7 +465,7 @@ class BRMWrapper : public WEObj
|
|||||||
// Private data members
|
// Private data members
|
||||||
//--------------------------------------------------------------------------
|
//--------------------------------------------------------------------------
|
||||||
|
|
||||||
static BRMWrapper* volatile m_instance;
|
static std::atomic<BRMWrapper*> m_instance;
|
||||||
static thread_local int m_brmRc;
|
static thread_local int m_brmRc;
|
||||||
static boost::mutex m_instanceCreateMutex;
|
static boost::mutex m_instanceCreateMutex;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user