From 586391e1ca1148a88b0b4c8c3a6440f758cf4987 Mon Sep 17 00:00:00 2001 From: Sergei Golubchik Date: Thu, 19 Dec 2019 18:13:10 +0100 Subject: [PATCH] compilation failure error: reference to 'mutex' is ambiguous note: candidates are: 'class boost::mutex' note: 'class std::mutex' --- dbcon/execplan/clientrotator.cpp | 2 +- dbcon/joblist/distributedenginecomm.cpp | 52 +++++++++---------- dbcon/joblist/resourcemanager.cpp | 4 +- dbcon/joblist/tuple-bps.cpp | 4 +- dbcon/joblist/tupleaggregatestep.cpp | 2 +- dbcon/joblist/tuplehashjoin.cpp | 4 +- dbcon/joblist/tupleunion.cpp | 12 ++--- dmlproc/batchinsertprocessor.cpp | 10 ++-- dmlproc/dmlprocessor.cpp | 4 +- exemgr/activestatementcounter.cpp | 4 +- oam/oamcpp/oamcache.cpp | 24 ++++----- primitives/blockcache/filebuffermgr.cpp | 26 +++++----- primitives/blockcache/stats.cpp | 8 +-- .../primproc/batchprimitiveprocessor.cpp | 4 +- primitives/primproc/bppsendthread.cpp | 20 +++---- primitives/primproc/primitiveserver.cpp | 36 ++++++------- storage-manager/src/MetadataFile.cpp | 12 ++--- utils/cacheutils/cacheutils.cpp | 14 ++--- utils/configcpp/configcpp.cpp | 22 ++++---- utils/loggingcpp/idberrorinfo.cpp | 4 +- utils/loggingcpp/logger.cpp | 4 +- utils/loggingcpp/message.cpp | 4 +- utils/rowgroup/rowgroup.cpp | 4 +- utils/startup/installdir.cpp | 4 +- utils/threadpool/prioritythreadpool.cpp | 6 +-- versioning/BRM/autoincrementmanager.cpp | 12 ++--- versioning/BRM/extentmap.cpp | 4 +- versioning/BRM/sessionmanagerserver.cpp | 18 +++---- versioning/BRM/tablelockserver.cpp | 16 +++--- writeengine/client/we_clients.cpp | 16 +++--- .../redistribute/we_redistributecontrol.cpp | 14 ++--- .../we_redistributecontrolthread.cpp | 10 ++-- .../we_redistributeworkerthread.cpp | 10 ++-- writeengine/server/we_cpifeederthread.cpp | 10 ++-- writeengine/server/we_dataloader.cpp | 32 ++++++------ writeengine/shared/we_brm.cpp | 2 +- writeengine/splitter/we_filereadthread.cpp | 8 +-- writeengine/splitter/we_sdhandler.cpp | 30 +++++------ 
writeengine/splitter/we_splclient.cpp | 8 +-- 39 files changed, 240 insertions(+), 240 deletions(-) diff --git a/dbcon/execplan/clientrotator.cpp b/dbcon/execplan/clientrotator.cpp index 600f6f8e3..bb9f80bfa 100644 --- a/dbcon/execplan/clientrotator.cpp +++ b/dbcon/execplan/clientrotator.cpp @@ -214,7 +214,7 @@ void ClientRotator::write(const ByteStream& msg) ByteStream ClientRotator::read() { - mutex::scoped_lock lk(fClientLock); + boost::mutex::scoped_lock lk(fClientLock); ByteStream bs; diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index 056dbfe9a..6285b187b 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -224,7 +224,7 @@ DistributedEngineComm::~DistributedEngineComm() void DistributedEngineComm::Setup() { // This is here to ensure that this function does not get invoked multiple times simultaneously. - mutex::scoped_lock setupLock(fSetupMutex); + boost::mutex::scoped_lock setupLock(fSetupMutex); makeBusy(true); @@ -310,7 +310,7 @@ void DistributedEngineComm::Setup() // for every entry in newClients up to newPmCount, scan for the same ip in the // first pmCount. If there is no match, it's a new node, // call the event listeners' newPMOnline() callbacks. - mutex::scoped_lock lock(eventListenerLock); + boost::mutex::scoped_lock lock(eventListenerLock); for (uint32_t i = 0; i < newPmCount; i++) { @@ -386,7 +386,7 @@ void DistributedEngineComm::Listen(boost::shared_ptr client, Error: // @bug 488 - error condition! push 0 length bs to messagequeuemap and // eventually let jobstep error out. 
- /* mutex::scoped_lock lk(fMlock); + /* boost::mutex::scoped_lock lk(fMlock); //cout << "WARNING: DEC READ 0 LENGTH BS FROM " << client->otherEnd()<< endl; MessageQueueMap::iterator map_tok; @@ -404,7 +404,7 @@ Error: ClientList tempConns; { - mutex::scoped_lock onErrLock(fOnErrMutex); + boost::mutex::scoped_lock onErrLock(fOnErrMutex); string moduleName = client->moduleName(); //cout << "moduleName=" << moduleName << endl; for ( uint32_t i = 0; i < fPmConnections.size(); i++) @@ -438,7 +438,7 @@ void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs) { bool b; - mutex* lock = new mutex(); + boost::mutex* lock = new boost::mutex(); condition* cond = new condition(); boost::shared_ptr mqe(new MQE(pmCount)); @@ -446,7 +446,7 @@ void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs) mqe->sendACKs = sendACKs; mqe->throttled = false; - mutex::scoped_lock lk ( fMlock ); + boost::mutex::scoped_lock lk ( fMlock ); b = fSessionMessages.insert(pair >(key, mqe)).second; if (!b) @@ -459,7 +459,7 @@ void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs) void DistributedEngineComm::removeQueue(uint32_t key) { - mutex::scoped_lock lk(fMlock); + boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(key); if (map_tok == fSessionMessages.end()) @@ -472,7 +472,7 @@ void DistributedEngineComm::removeQueue(uint32_t key) void DistributedEngineComm::shutdownQueue(uint32_t key) { - mutex::scoped_lock lk(fMlock); + boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(key); if (map_tok == fSessionMessages.end()) @@ -487,7 +487,7 @@ void DistributedEngineComm::read(uint32_t key, SBS& bs) boost::shared_ptr mqe; //Find the StepMsgQueueList for this session - mutex::scoped_lock lk(fMlock); + boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(key); if (map_tok == fSessionMessages.end()) @@ -506,7 +506,7 @@ void 
DistributedEngineComm::read(uint32_t key, SBS& bs) if (bs && mqe->sendACKs) { - mutex::scoped_lock lk(ackLock); + boost::mutex::scoped_lock lk(ackLock); if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold) setFlowControl(false, key, mqe); @@ -526,7 +526,7 @@ const ByteStream DistributedEngineComm::read(uint32_t key) boost::shared_ptr mqe; //Find the StepMsgQueueList for this session - mutex::scoped_lock lk(fMlock); + boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(key); if (map_tok == fSessionMessages.end()) @@ -544,7 +544,7 @@ const ByteStream DistributedEngineComm::read(uint32_t key) if (sbs && mqe->sendACKs) { - mutex::scoped_lock lk(ackLock); + boost::mutex::scoped_lock lk(ackLock); if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold) setFlowControl(false, key, mqe); @@ -564,7 +564,7 @@ void DistributedEngineComm::read_all(uint32_t key, vector& v) { boost::shared_ptr mqe; - mutex::scoped_lock lk(fMlock); + boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(key); if (map_tok == fSessionMessages.end()) @@ -581,7 +581,7 @@ void DistributedEngineComm::read_all(uint32_t key, vector& v) if (mqe->sendACKs) { - mutex::scoped_lock lk(ackLock); + boost::mutex::scoped_lock lk(ackLock); sendAcks(key, v, mqe, 0); } } @@ -591,7 +591,7 @@ void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vector mqe; - mutex::scoped_lock lk(fMlock); + boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(key); if (map_tok == fSessionMessages.end()) @@ -612,7 +612,7 @@ void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vectorsendACKs) { - mutex::scoped_lock lk(ackLock); + boost::mutex::scoped_lock lk(ackLock); if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold) setFlowControl(false, key, mqe); @@ -876,7 +876,7 @@ void 
DistributedEngineComm::write(messageqcpp::ByteStream& msg, uint32_t connect PrimitiveHeader* pm = (PrimitiveHeader*) (ism + 1); uint32_t senderID = pm->UniqueID; - mutex::scoped_lock lk(fMlock, defer_lock_t()); + boost::mutex::scoped_lock lk(fMlock, boost::defer_lock_t()); MessageQueueMap::iterator it; // This keeps mqe's stats from being freed until end of function boost::shared_ptr mqe; @@ -909,7 +909,7 @@ void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats* uint32_t uniqueId = p->UniqueID; boost::shared_ptr mqe; - mutex::scoped_lock lk(fMlock); + boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId); if (map_tok == fSessionMessages.end()) @@ -931,7 +931,7 @@ void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats* if (mqe->sendACKs) { - mutex::scoped_lock lk(ackLock); + boost::mutex::scoped_lock lk(ackLock); uint64_t msgSize = sbs->lengthWithHdrOverhead(); if (!mqe->throttled && msgSize > (targetRecvQueueSize / 2)) @@ -956,7 +956,7 @@ void DistributedEngineComm::doHasBigMsgs(boost::shared_ptr mqe, uint64_t ta int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uint32_t sender, bool doInterleaving) { - mutex::scoped_lock lk(fMlock, defer_lock_t()); + boost::mutex::scoped_lock lk(fMlock, boost::defer_lock_t()); MessageQueueMap::iterator it; // Keep mqe's stats from being freed early boost::shared_ptr mqe; @@ -992,7 +992,7 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin if (!client->isAvailable()) return 0; - mutex::scoped_lock lk(*(fWlock[index])); + boost::mutex::scoped_lock lk(*(fWlock[index])); client->write(bs, NULL, senderStats); return 0; } @@ -1019,7 +1019,7 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin ClientList tempConns; { //cout << "WARNING: DEC WRITE BROKEN PIPE " << fPmConnections[index]->otherEnd()<< endl; - mutex::scoped_lock onErrLock(fOnErrMutex); + 
boost::mutex::scoped_lock onErrLock(fOnErrMutex); string moduleName = fPmConnections[index]->moduleName(); //cout << "module name = " << moduleName << endl; if (index >= fPmConnections.size()) return 0; @@ -1051,7 +1051,7 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin uint32_t DistributedEngineComm::size(uint32_t key) { - mutex::scoped_lock lk(fMlock); + boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(key); if (map_tok == fSessionMessages.end()) @@ -1065,13 +1065,13 @@ uint32_t DistributedEngineComm::size(uint32_t key) void DistributedEngineComm::addDECEventListener(DECEventListener* l) { - mutex::scoped_lock lk(eventListenerLock); + boost::mutex::scoped_lock lk(eventListenerLock); eventListeners.push_back(l); } void DistributedEngineComm::removeDECEventListener(DECEventListener* l) { - mutex::scoped_lock lk(eventListenerLock); + boost::mutex::scoped_lock lk(eventListenerLock); std::vector newListeners; uint32_t s = eventListeners.size(); @@ -1084,7 +1084,7 @@ void DistributedEngineComm::removeDECEventListener(DECEventListener* l) Stats DistributedEngineComm::getNetworkStats(uint32_t uniqueID) { - mutex::scoped_lock lk(fMlock); + boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator it; Stats empty; diff --git a/dbcon/joblist/resourcemanager.cpp b/dbcon/joblist/resourcemanager.cpp index a5b8af16d..e34221131 100644 --- a/dbcon/joblist/resourcemanager.cpp +++ b/dbcon/joblist/resourcemanager.cpp @@ -57,11 +57,11 @@ const string ResourceManager::fExtentMapStr("ExtentMap"); const string ResourceManager::fOrderByLimitStr("OrderByLimit"); ResourceManager* ResourceManager::fInstance = NULL; -mutex mx; +boost::mutex mx; ResourceManager* ResourceManager::instance(bool runningInExeMgr) { - mutex::scoped_lock lk(mx); + boost::mutex::scoped_lock lk(mx); if (!fInstance) fInstance = new ResourceManager(runningInExeMgr); diff --git a/dbcon/joblist/tuple-bps.cpp 
b/dbcon/joblist/tuple-bps.cpp index e60d4db40..f70bb7b8e 100644 --- a/dbcon/joblist/tuple-bps.cpp +++ b/dbcon/joblist/tuple-bps.cpp @@ -927,7 +927,7 @@ void TupleBPS::prepCasualPartitioning() { uint32_t i; int64_t min, max, seq; - mutex::scoped_lock lk(cpMutex); + boost::mutex::scoped_lock lk(cpMutex); for (i = 0; i < scannedExtents.size(); i++) { @@ -3312,7 +3312,7 @@ void TupleBPS::abort_nolock() void TupleBPS::abort() { - mutex::scoped_lock scoped(mutex); + boost::mutex::scoped_lock scoped(mutex); abort_nolock(); } diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index ec9562b6d..24c1efa7e 100644 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -414,7 +414,7 @@ void TupleAggregateStep::initializeMultiThread() for (i = 0; i < fNumOfBuckets; i++) { - mutex* lock = new mutex(); + boost::mutex* lock = new boost::mutex(); fAgg_mutex.push_back(lock); fRowGroupOuts[i] = fRowGroupOut; rgData.reinit(fRowGroupOut); diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp index 20967e15f..ebc657d58 100644 --- a/dbcon/joblist/tuplehashjoin.cpp +++ b/dbcon/joblist/tuplehashjoin.cpp @@ -142,7 +142,7 @@ void TupleHashJoinStep::run() { uint32_t i; - mutex::scoped_lock lk(jlLock); + boost::mutex::scoped_lock lk(jlLock); if (runRan) return; @@ -183,7 +183,7 @@ void TupleHashJoinStep::run() void TupleHashJoinStep::join() { - mutex::scoped_lock lk(jlLock); + boost::mutex::scoped_lock lk(jlLock); if (joinRan) return; diff --git a/dbcon/joblist/tupleunion.cpp b/dbcon/joblist/tupleunion.cpp index f3cab8ddf..f62c94b9d 100644 --- a/dbcon/joblist/tupleunion.cpp +++ b/dbcon/joblist/tupleunion.cpp @@ -226,7 +226,7 @@ void TupleUnion::readInput(uint32_t which) l_tmpRG.getRow(0, &tmpRow); { - mutex::scoped_lock lk(uniquerMutex); + boost::mutex::scoped_lock lk(uniquerMutex); getOutput(&l_outputRG, &outRow, &outRGData); memUsageBefore = allocator.getMemUsage(); @@ -294,8 -294,8
@@ void TupleUnion::readInput(uint32_t which) more = dl->next(it, &inRGData); { - mutex::scoped_lock lock1(uniquerMutex); - mutex::scoped_lock lock2(sMutex); + boost::mutex::scoped_lock lock1(uniquerMutex); + boost::mutex::scoped_lock lock2(sMutex); if (!distinct && l_outputRG.getRowCount() > 0) output->insert(outRGData); @@ -395,7 +395,7 @@ void TupleUnion::addToOutput(Row* r, RowGroup* rg, bool keepit, if (rg->getRowCount() == 8192) { { - mutex::scoped_lock lock(sMutex); + boost::mutex::scoped_lock lock(sMutex); output->insert(data); } data = RGData(*rg); @@ -1182,7 +1182,7 @@ void TupleUnion::run() { uint32_t i; - mutex::scoped_lock lk(jlLock); + boost::mutex::scoped_lock lk(jlLock); if (runRan) return; @@ -1225,7 +1225,7 @@ void TupleUnion::run() void TupleUnion::join() { - mutex::scoped_lock lk(jlLock); + boost::mutex::scoped_lock lk(jlLock); if (joinRan) return; diff --git a/dmlproc/batchinsertprocessor.cpp b/dmlproc/batchinsertprocessor.cpp index a415e8248..1f7fbcda2 100644 --- a/dmlproc/batchinsertprocessor.cpp +++ b/dmlproc/batchinsertprocessor.cpp @@ -177,7 +177,7 @@ uint64_t BatchInsertProc::grabTableLock(int32_t sessionId) BatchInsertProc::SP_PKG BatchInsertProc::getInsertQueue() { - mutex::scoped_lock lk(fLock); + boost::mutex::scoped_lock lk(fLock); return fInsertPkgQueue; } @@ -187,7 +187,7 @@ void BatchInsertProc::setLastPkg (bool lastPkg) } void BatchInsertProc::addPkg(messageqcpp::ByteStream& insertBs) { - mutex::scoped_lock lk(fLock); + boost::mutex::scoped_lock lk(fLock); fInsertPkgQueue->push(insertBs); } @@ -195,7 +195,7 @@ void BatchInsertProc::addPkg(messageqcpp::ByteStream& insertBs) messageqcpp::ByteStream BatchInsertProc::getPkg() { messageqcpp::ByteStream bs; - mutex::scoped_lock lk(fLock); + boost::mutex::scoped_lock lk(fLock); bs = fInsertPkgQueue->front(); fInsertPkgQueue->pop(); return bs; @@ -535,13 +535,13 @@ void BatchInsertProc::setHwm() void BatchInsertProc::setError(int errorCode, std::string errMsg) { - mutex::scoped_lock 
lk(fLock); + boost::mutex::scoped_lock lk(fLock); fErrorCode = errorCode; fErrMsg = errMsg; } void BatchInsertProc::getError(int& errorCode, std::string& errMsg) { - mutex::scoped_lock lk(fLock); + boost::mutex::scoped_lock lk(fLock); errorCode = fErrorCode; errMsg = fErrMsg; } diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index 9e1601a36..ba13c4a21 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -1689,7 +1689,7 @@ void DMLProcessor::operator()() fConcurrentSupport, maxDeleteRows, sessionID, txnid.id, fDbrm, fQtc, csc)); // We put the packageHandler into a map so that if we receive a // message to affect the previous command, we can find it. - boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, defer_lock); + boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, boost::defer_lock); lk2.lock(); packageHandlerMap[sessionID] = php; @@ -1731,7 +1731,7 @@ void DMLProcessor::operator()() fConcurrentSupport, maxDeleteRows, sessionID, 0, fDbrm, fQtc, csc)); // We put the packageHandler into a map so that if we receive a // message to affect the previous command, we can find it. 
- boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, defer_lock); + boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, boost::defer_lock); lk2.lock(); packageHandlerMap[sessionID] = php; diff --git a/exemgr/activestatementcounter.cpp b/exemgr/activestatementcounter.cpp index 827bf46df..0b26bdff0 100644 --- a/exemgr/activestatementcounter.cpp +++ b/exemgr/activestatementcounter.cpp @@ -32,7 +32,7 @@ void ActiveStatementCounter::incr(bool& counted) return; counted = true; - mutex::scoped_lock lk(fMutex); + boost::mutex::scoped_lock lk(fMutex); if (upperLimit > 0) while (fStatementCount >= upperLimit) @@ -51,7 +51,7 @@ void ActiveStatementCounter::decr(bool& counted) return; counted = false; - mutex::scoped_lock lk(fMutex); + boost::mutex::scoped_lock lk(fMutex); if (fStatementCount == 0) return; diff --git a/oam/oamcpp/oamcache.cpp b/oam/oamcpp/oamcache.cpp index bf15b000b..ed5097fda 100644 --- a/oam/oamcpp/oamcache.cpp +++ b/oam/oamcpp/oamcache.cpp @@ -39,7 +39,7 @@ using namespace boost; namespace { oam::OamCache* oamCache = 0; -mutex cacheLock; +boost::mutex cacheLock; } namespace oam @@ -47,7 +47,7 @@ namespace oam OamCache* OamCache::makeOamCache() { - mutex::scoped_lock lk(cacheLock); + boost::mutex::scoped_lock lk(cacheLock); if (oamCache == 0) oamCache = new OamCache(); @@ -226,7 +226,7 @@ void OamCache::checkReload() OamCache::dbRootPMMap_t OamCache::getDBRootToPMMap() { - mutex::scoped_lock lk(cacheLock); + boost::mutex::scoped_lock lk(cacheLock); checkReload(); return dbRootPMMap; @@ -234,7 +234,7 @@ OamCache::dbRootPMMap_t OamCache::getDBRootToPMMap() OamCache::dbRootPMMap_t OamCache::getDBRootToConnectionMap() { - mutex::scoped_lock lk(cacheLock); + boost::mutex::scoped_lock lk(cacheLock); checkReload(); return dbRootConnectionMap; @@ -242,7 +242,7 @@ OamCache::dbRootPMMap_t OamCache::getDBRootToConnectionMap() OamCache::PMDbrootsMap_t OamCache::getPMToDbrootsMap() { - mutex::scoped_lock lk(cacheLock); + 
boost::mutex::scoped_lock lk(cacheLock); checkReload(); return pmDbrootsMap; @@ -250,7 +250,7 @@ OamCache::PMDbrootsMap_t OamCache::getPMToDbrootsMap() uint32_t OamCache::getDBRootCount() { - mutex::scoped_lock lk(cacheLock); + boost::mutex::scoped_lock lk(cacheLock); checkReload(); return numDBRoots; @@ -258,7 +258,7 @@ uint32_t OamCache::getDBRootCount() DBRootConfigList& OamCache::getDBRootNums() { - mutex::scoped_lock lk(cacheLock); + boost::mutex::scoped_lock lk(cacheLock); checkReload(); return dbroots; @@ -266,7 +266,7 @@ DBRootConfigList& OamCache::getDBRootNums() std::vector& OamCache::getModuleIds() { - mutex::scoped_lock lk(cacheLock); + boost::mutex::scoped_lock lk(cacheLock); checkReload(); return moduleIds; @@ -274,7 +274,7 @@ std::vector& OamCache::getModuleIds() std::string OamCache::getOAMParentModuleName() { - mutex::scoped_lock lk(cacheLock); + boost::mutex::scoped_lock lk(cacheLock); checkReload(); return OAMParentModuleName; @@ -282,7 +282,7 @@ std::string OamCache::getOAMParentModuleName() int OamCache::getLocalPMId() { - mutex::scoped_lock lk(cacheLock); + boost::mutex::scoped_lock lk(cacheLock); // This comes from the file /var/lib/columnstore/local/module, not from the xml. // Thus, it's not refreshed during checkReload(). 
@@ -324,7 +324,7 @@ int OamCache::getLocalPMId() string OamCache::getSystemName() { - mutex::scoped_lock lk(cacheLock); + boost::mutex::scoped_lock lk(cacheLock); checkReload(); return systemName; @@ -332,7 +332,7 @@ string OamCache::getSystemName() string OamCache::getModuleName() { - mutex::scoped_lock lk(cacheLock); + boost::mutex::scoped_lock lk(cacheLock); if (!moduleName.empty()) return moduleName; diff --git a/primitives/blockcache/filebuffermgr.cpp b/primitives/blockcache/filebuffermgr.cpp index fa7071942..6deccf8fc 100644 --- a/primitives/blockcache/filebuffermgr.cpp +++ b/primitives/blockcache/filebuffermgr.cpp @@ -105,7 +105,7 @@ void FileBufferMgr::setReportingFrequency(const uint32_t d) void FileBufferMgr::flushCache() { - mutex::scoped_lock lk(fWLock); + boost::mutex::scoped_lock lk(fWLock); { filebuffer_uset_t sEmpty; filebuffer_list_t lEmpty; @@ -131,7 +131,7 @@ void FileBufferMgr::flushCache() void FileBufferMgr::flushOne(const BRM::LBID_t lbid, const BRM::VER_t ver) { //similar in function to depleteCache() - mutex::scoped_lock lk(fWLock); + boost::mutex::scoped_lock lk(fWLock); filebuffer_uset_iter_t iter = fbSet.find(HashObject_t(lbid, ver, 0)); @@ -152,7 +152,7 @@ void FileBufferMgr::flushOne(const BRM::LBID_t lbid, const BRM::VER_t ver) void FileBufferMgr::flushMany(const LbidAtVer* laVptr, uint32_t cnt) { - mutex::scoped_lock lk(fWLock); + boost::mutex::scoped_lock lk(fWLock); BRM::LBID_t lbid; BRM::VER_t ver; @@ -199,7 +199,7 @@ void FileBufferMgr::flushManyAllversion(const LBID_t* laVptr, uint32_t cnt) tr1::unordered_set uniquer; tr1::unordered_set::iterator uit; - mutex::scoped_lock lk(fWLock); + boost::mutex::scoped_lock lk(fWLock); if (fReportFrequency) { @@ -264,7 +264,7 @@ void FileBufferMgr::flushOIDs(const uint32_t* oids, uint32_t count) // If there are more than this # of extents to drop, the whole cache will be cleared const uint32_t clearThreshold = 50000; - mutex::scoped_lock lk(fWLock); + boost::mutex::scoped_lock lk(fWLock); if 
(fCacheSize == 0 || count == 0) return; @@ -323,7 +323,7 @@ void FileBufferMgr::flushPartition(const vector& oids, const setmarkEvent(keyFb.lbid, pthread_self(), gSession, 'L'); #endif - mutex::scoped_lock lk(fWLock); + boost::mutex::scoped_lock lk(fWLock); if (gPMProfOn && gPMStatsPtr) #ifdef _MSC_VER @@ -497,7 +497,7 @@ uint32_t FileBufferMgr::bulkFind(const BRM::LBID_t* lbids, const BRM::VER_t* ver } } - mutex::scoped_lock lk(fWLock); + boost::mutex::scoped_lock lk(fWLock); if (gPMProfOn && gPMStatsPtr) { @@ -558,7 +558,7 @@ uint32_t FileBufferMgr::bulkFind(const BRM::LBID_t* lbids, const BRM::VER_t* ver bool FileBufferMgr::exists(const HashObject_t& fb) const { bool find_bool = false; - mutex::scoped_lock lk(fWLock); + boost::mutex::scoped_lock lk(fWLock); filebuffer_uset_iter_t it = fbSet.find(fb); @@ -590,7 +590,7 @@ int FileBufferMgr::insert(const BRM::LBID_t lbid, const BRM::VER_t ver, const ui gPMStatsPtr->markEvent(lbid, pthread_self(), gSession, 'I'); #endif - mutex::scoped_lock lk(fWLock); + boost::mutex::scoped_lock lk(fWLock); HashObject_t fbIndex(lbid, ver, 0); filebuffer_pair_t pr = fbSet.insert(fbIndex); @@ -796,7 +796,7 @@ int FileBufferMgr::bulkInsert(const vector& ops) int32_t pi; int ret = 0; - mutex::scoped_lock lk(fWLock); + boost::mutex::scoped_lock lk(fWLock); if (fReportFrequency) { diff --git a/primitives/blockcache/stats.cpp b/primitives/blockcache/stats.cpp index 29ab88ccd..f0ada3532 100644 --- a/primitives/blockcache/stats.cpp +++ b/primitives/blockcache/stats.cpp @@ -179,7 +179,7 @@ typedef map TraceFileMap_t; TraceFileMap_t traceFileMap; //map mutex -mutex traceFileMapMutex; +boost::mutex traceFileMapMutex; class StatMon { @@ -198,7 +198,7 @@ public: void operator()() const { //struct timespec ts = { 60 * 1, 0 }; - mutex::scoped_lock lk(traceFileMapMutex); + boost::mutex::scoped_lock lk(traceFileMapMutex); TraceFileMap_t::iterator iter; TraceFileMap_t::iterator end; @@ -259,7 +259,7 @@ void Stats::touchedLBID(uint64_t lbid, pthread_t 
thdid, uint32_t session) { if (session == 0) return; - mutex::scoped_lock lk(traceFileMapMutex); + boost::mutex::scoped_lock lk(traceFileMapMutex); TraceFileMap_t::iterator iter = traceFileMap.find(session); if (iter == traceFileMap.end()) @@ -276,7 +276,7 @@ void Stats::markEvent(const uint64_t lbid, const pthread_t thdid, const uint32_t { if (session == 0) return; - mutex::scoped_lock lk(traceFileMapMutex); + boost::mutex::scoped_lock lk(traceFileMapMutex); TraceFileMap_t::iterator iter = traceFileMap.find(session); if (iter == traceFileMap.end()) diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index 7ae382830..b08c27de5 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -773,7 +773,7 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs) // TODO: write an RGData fcn to let it interpret data within a ByteStream to avoid // the extra copying. offTheWire.deserialize(bs); - mutex::scoped_lock lk(smallSideDataLocks[joinerNum]); + boost::mutex::scoped_lock lk(smallSideDataLocks[joinerNum]); smallSide.setData(&smallSideRowData[joinerNum]); smallSide.append(offTheWire, startPos); @@ -790,7 +790,7 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs) { joblist::ElementType *et = (joblist::ElementType*) bs.buf(); - mutex::scoped_lock lk(addToJoinerLocks[0][0]); + boost::mutex::scoped_lock lk(addToJoinerLocks[0][0]); for (i = 0; i < count; i++) { // cout << "BPP: adding <" << et[i].first << ", " << et[i].second << "> to Joiner\n"; diff --git a/primitives/primproc/bppsendthread.cpp b/primitives/primproc/bppsendthread.cpp index e025bc943..75ab27c38 100644 --- a/primitives/primproc/bppsendthread.cpp +++ b/primitives/primproc/bppsendthread.cpp @@ -51,8 +51,8 @@ BPPSendThread::BPPSendThread(uint32_t initMsgsLeft) : die(false), gotException(f BPPSendThread::~BPPSendThread() { - mutex::scoped_lock sl(msgQueueLock); - mutex::scoped_lock 
sl2(ackLock); + boost::mutex::scoped_lock sl(msgQueueLock); + boost::mutex::scoped_lock sl2(ackLock); die = true; queueNotEmpty.notify_one(); okToSend.notify_one(); @@ -74,7 +74,7 @@ void BPPSendThread::sendResult(const Msg_t& msg, bool newConnection) if (die) return; - mutex::scoped_lock sl(msgQueueLock); + boost::mutex::scoped_lock sl(msgQueueLock); if (gotException) throw runtime_error(exceptionString); @@ -108,7 +108,7 @@ void BPPSendThread::sendResults(const vector& msgs, bool newConnection) if (die) return; - mutex::scoped_lock sl(msgQueueLock); + boost::mutex::scoped_lock sl(msgQueueLock); if (gotException) throw runtime_error(exceptionString); @@ -143,7 +143,7 @@ void BPPSendThread::sendResults(const vector& msgs, bool newConnection) void BPPSendThread::sendMore(int num) { - mutex::scoped_lock sl(ackLock); + boost::mutex::scoped_lock sl(ackLock); // cout << "got an ACK for " << num << " msgsLeft=" << msgsLeft << endl; if (num == -1) @@ -178,7 +178,7 @@ void BPPSendThread::mainLoop() while (!die) { - mutex::scoped_lock sl(msgQueueLock); + boost::mutex::scoped_lock sl(msgQueueLock); if (msgQueue.empty() && !die) { @@ -209,7 +209,7 @@ void BPPSendThread::mainLoop() if (msgsLeft <= 0 && fcEnabled && !die) { - mutex::scoped_lock sl2(ackLock); + boost::mutex::scoped_lock sl2(ackLock); while (msgsLeft <= 0 && fcEnabled && !die) { @@ -238,7 +238,7 @@ void BPPSendThread::mainLoop() try { - mutex::scoped_lock sl2(*lock); + boost::mutex::scoped_lock sl2(*lock); sock->write(*msg[msgsSent].msg); //cout << "sent 1 msg\n"; } @@ -260,8 +260,8 @@ void BPPSendThread::mainLoop() void BPPSendThread::abort() { - mutex::scoped_lock sl(msgQueueLock); - mutex::scoped_lock sl2(ackLock); + boost::mutex::scoped_lock sl(msgQueueLock); + boost::mutex::scoped_lock sl2(ackLock); die = true; queueNotEmpty.notify_one(); okToSend.notify_one(); diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 2061d832f..13abe2a04 100644 --- 
a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -150,11 +150,11 @@ int noVB = 0; const uint8_t fMaxColWidth(8); BPPMap bppMap; -mutex bppLock; +boost::mutex bppLock; #define DJLOCK_READ 0 #define DJLOCK_WRITE 1 -mutex djMutex; // lock for djLock, lol. +boost::mutex djMutex; // lock for djLock, lol. std::map djLock; // djLock synchronizes destroy and joiner msgs, see bug 2619 volatile int32_t asyncCounter; @@ -187,7 +187,7 @@ pfBlockMap_t pfExtentMap; boost::mutex pfMutex; // = PTHREAD_MUTEX_INITIALIZER; map > dictEqualityFilters; -mutex eqFilterMutex; +boost::mutex eqFilterMutex; uint32_t cacheNum(uint64_t lbid) { @@ -1047,7 +1047,7 @@ void loadBlockAsync(uint64_t lbid, (void)atomicops::atomicInc(&asyncCounter); - mutex::scoped_lock sl(*m); + boost::mutex::scoped_lock sl(*m); try { @@ -1134,7 +1134,7 @@ DictScanJob::~DictScanJob() void DictScanJob::write(const ByteStream& bs) { - mutex::scoped_lock lk(*fWriteLock); + boost::mutex::scoped_lock lk(*fWriteLock); fIos->write(bs); } @@ -1178,7 +1178,7 @@ int DictScanJob::operator()() /* Grab the equality filter if one is specified */ if (cmd->flags & HAS_EQ_FILTER) { - mutex::scoped_lock sl(eqFilterMutex); + boost::mutex::scoped_lock sl(eqFilterMutex); map >::iterator it; it = dictEqualityFilters.find(uniqueId); @@ -1280,7 +1280,7 @@ struct BPPHandler ~BPPHandler() { - mutex::scoped_lock scoped(bppLock); + boost::mutex::scoped_lock scoped(bppLock); for (bppKeysIt = bppKeys.begin() ; bppKeysIt != bppKeys.end(); ++bppKeysIt) { @@ -1383,7 +1383,7 @@ struct BPPHandler return -1; } - mutex::scoped_lock scoped(bppLock); + boost::mutex::scoped_lock scoped(bppLock); bppKeysIt = std::find(bppKeys.begin(), bppKeys.end(), key); if (bppKeysIt != bppKeys.end()) @@ -1475,7 +1475,7 @@ struct BPPHandler } } - mutex::scoped_lock scoped(bppLock); + boost::mutex::scoped_lock scoped(bppLock); key = bpp->getUniqueID(); bppKeys.push_back(key); bool newInsert; @@ -1504,7 +1504,7 @@ struct BPPHandler 
*/ SBPPV ret; - mutex::scoped_lock scoped(bppLock); + boost::mutex::scoped_lock scoped(bppLock); it = bppMap.find(uniqueID); if (it != bppMap.end()) @@ -1533,7 +1533,7 @@ struct BPPHandler inline shared_mutex & getDJLock(uint32_t uniqueID) { - mutex::scoped_lock lk(djMutex); + boost::mutex::scoped_lock lk(djMutex); auto it = djLock.find(uniqueID); if (it != djLock.end()) return *it->second; @@ -1546,7 +1546,7 @@ struct BPPHandler inline void deleteDJLock(uint32_t uniqueID) { - mutex::scoped_lock lk(djMutex); + boost::mutex::scoped_lock lk(djMutex); auto it = djLock.find(uniqueID); if (it != djLock.end()) { @@ -1607,7 +1607,7 @@ struct BPPHandler return -1; } - unique_lock lk(getDJLock(uniqueID)); + boost::unique_lock lk(getDJLock(uniqueID)); for (i = 0; i < bppv->get().size(); i++) @@ -1653,8 +1653,8 @@ struct BPPHandler return -1; } - unique_lock lk(getDJLock(uniqueID)); - mutex::scoped_lock scoped(bppLock); + boost::unique_lock lk(getDJLock(uniqueID)); + boost::mutex::scoped_lock scoped(bppLock); bppKeysIt = std::find(bppKeys.begin(), bppKeys.end(), uniqueID); @@ -1821,7 +1821,7 @@ private: filter->insert(str); } - mutex::scoped_lock sl(eqFilterMutex); + boost::mutex::scoped_lock sl(eqFilterMutex); dictEqualityFilters[uniqueID] = filter; } }; @@ -1843,7 +1843,7 @@ public: bs->advance(sizeof(ISMPacketHeader)); *bs >> uniqueID; - mutex::scoped_lock sl(eqFilterMutex); + boost::mutex::scoped_lock sl(eqFilterMutex); it = dictEqualityFilters.find(uniqueID); if (it != dictEqualityFilters.end()) @@ -2001,7 +2001,7 @@ struct ReadThread // IOSocket. If we end up rotating through multiple output sockets // for the same UM, we will use UmSocketSelector to select output. 
SP_UM_IOSOCK outIosDefault(new IOSocket(fIos)); - SP_UM_MUTEX writeLockDefault(new mutex()); + SP_UM_MUTEX writeLockDefault(new boost::mutex()); bool bRotateDest = fPrimitiveServerPtr->rotatingDestination(); diff --git a/storage-manager/src/MetadataFile.cpp b/storage-manager/src/MetadataFile.cpp index e79a88be6..028b1d22c 100644 --- a/storage-manager/src/MetadataFile.cpp +++ b/storage-manager/src/MetadataFile.cpp @@ -38,7 +38,7 @@ namespace bf = boost::filesystem; namespace { - boost::mutex mutex; + boost::mutex mdfLock; storagemanager::MetadataFile::MetadataConfig *inst = NULL; uint64_t metadataFilesAccessed = 0; } @@ -50,7 +50,7 @@ MetadataFile::MetadataConfig * MetadataFile::MetadataConfig::get() { if (inst) return inst; - boost::unique_lock s(mutex); + boost::unique_lock s(mdfLock); if (inst) return inst; inst = new MetadataConfig(); @@ -380,9 +380,9 @@ void MetadataFile::breakout(const string &key, vector &ret) string MetadataFile::getNewKeyFromOldKey(const string &key, size_t length) { - mutex.lock(); + mdfLock.lock(); boost::uuids::uuid u = boost::uuids::random_generator()(); - mutex.unlock(); + mdfLock.unlock(); vector split; breakout(key, split); @@ -393,9 +393,9 @@ string MetadataFile::getNewKeyFromOldKey(const string &key, size_t length) string MetadataFile::getNewKey(string sourceName, size_t offset, size_t length) { - mutex.lock(); + mdfLock.lock(); boost::uuids::uuid u = boost::uuids::random_generator()(); - mutex.unlock(); + mdfLock.unlock(); stringstream ss; for (uint i = 0; i < sourceName.length(); i++) diff --git a/utils/cacheutils/cacheutils.cpp b/utils/cacheutils/cacheutils.cpp index ca56e2da0..b53ef7875 100644 --- a/utils/cacheutils/cacheutils.cpp +++ b/utils/cacheutils/cacheutils.cpp @@ -52,7 +52,7 @@ namespace { //Only one of the cacheutils fcns can run at a time -mutex CacheOpsMutex; +boost::mutex CacheOpsMutex; //This global is updated only w/ atomic ops volatile uint32_t MultiReturnCode; @@ -148,7 +148,7 @@ namespace cacheutils */ int 
flushPrimProcCache() { - mutex::scoped_lock lk(CacheOpsMutex); + boost::mutex::scoped_lock lk(CacheOpsMutex); try { @@ -176,7 +176,7 @@ int flushPrimProcBlocks(const BRM::BlockList_t& list) { if (list.empty()) return 0; - mutex::scoped_lock lk(CacheOpsMutex); + boost::mutex::scoped_lock lk(CacheOpsMutex); #if defined(__LP64__) || defined(_WIN64) @@ -233,7 +233,7 @@ int flushPrimProcAllverBlocks(const vector& list) try { - mutex::scoped_lock lk(CacheOpsMutex); + boost::mutex::scoped_lock lk(CacheOpsMutex); rc = sendToAll(bs); return rc; } @@ -252,7 +252,7 @@ int flushOIDsFromCache(const vector& oids) * uint32_t * - OID array */ - mutex::scoped_lock lk(CacheOpsMutex, defer_lock_t()); + boost::mutex::scoped_lock lk(CacheOpsMutex, boost::defer_lock_t()); ByteStream bs; ISMPacketHeader ism; @@ -281,7 +281,7 @@ int flushPartition(const std::vector& oids, set& values) { - recursive_mutex::scoped_lock lk(fLock); + boost::recursive_mutex::scoped_lock lk(fLock); if (section.length() == 0) throw invalid_argument("Config::getConfig: section must have a length"); @@ -270,7 +270,7 @@ void Config::getConfig(const string& section, const string& name, vector void Config::setConfig(const string& section, const string& name, const string& value) { - recursive_mutex::scoped_lock lk(fLock); + boost::recursive_mutex::scoped_lock lk(fLock); if (section.length() == 0 || name.length() == 0 ) throw invalid_argument("Config::setConfig: all of section and name must have a length"); @@ -300,7 +300,7 @@ void Config::setConfig(const string& section, const string& name, const string& void Config::delConfig(const string& section, const string& name) { - recursive_mutex::scoped_lock lk(fLock); + boost::recursive_mutex::scoped_lock lk(fLock); if (section.length() == 0 || name.length() == 0) throw invalid_argument("Config::delConfig: both section and name must have a length"); @@ -328,7 +328,7 @@ void Config::delConfig(const string& section, const string& name) void Config::writeConfig(const string& 
configFile) const { - recursive_mutex::scoped_lock lk(fLock); + boost::recursive_mutex::scoped_lock lk(fLock); FILE* fi; if (fDoc == 0) @@ -445,7 +445,7 @@ void Config::writeConfig(const string& configFile) const void Config::write(void) const { - mutex::scoped_lock lk(fWriteXmlLock); + boost::mutex::scoped_lock lk(fWriteXmlLock); #ifdef _MSC_VER writeConfig(fConfigFile); #else @@ -540,7 +540,7 @@ void Config::writeConfigFile(messageqcpp::ByteStream msg) const /* static */ void Config::deleteInstanceMap() { - mutex::scoped_lock lk(fInstanceMapMutex); + boost::mutex::scoped_lock lk(fInstanceMapMutex); for (Config::configMap_t::iterator iter = fInstanceMap.begin(); iter != fInstanceMap.end(); ++iter) @@ -602,7 +602,7 @@ int64_t Config::fromText(const std::string& text) time_t Config::getCurrentMTime() { - recursive_mutex::scoped_lock lk(fLock); + boost::recursive_mutex::scoped_lock lk(fLock); struct stat statbuf; @@ -614,7 +614,7 @@ time_t Config::getCurrentMTime() const vector Config::enumConfig() { - recursive_mutex::scoped_lock lk(fLock); + boost::recursive_mutex::scoped_lock lk(fLock); if (fDoc == 0) { @@ -638,7 +638,7 @@ const vector Config::enumConfig() const vector Config::enumSection(const string& section) { - recursive_mutex::scoped_lock lk(fLock); + boost::recursive_mutex::scoped_lock lk(fLock); if (fDoc == 0) { diff --git a/utils/loggingcpp/idberrorinfo.cpp b/utils/loggingcpp/idberrorinfo.cpp index fd562ae1d..0fe755fe1 100644 --- a/utils/loggingcpp/idberrorinfo.cpp +++ b/utils/loggingcpp/idberrorinfo.cpp @@ -47,11 +47,11 @@ namespace logging { IDBErrorInfo* IDBErrorInfo::fInstance = 0; -mutex mx; +boost::mutex mx; IDBErrorInfo* IDBErrorInfo::instance() { - mutex::scoped_lock lk(mx); + boost::mutex::scoped_lock lk(mx); if (!fInstance) fInstance = new IDBErrorInfo(); diff --git a/utils/loggingcpp/logger.cpp b/utils/loggingcpp/logger.cpp index 15b72d865..1f3c5ad3f 100644 --- a/utils/loggingcpp/logger.cpp +++ b/utils/loggingcpp/logger.cpp @@ -51,7 +51,7 @@ 
const string Logger::logMessage(LOG_TYPE logLevel, Message::MessageID mid, const return logMessage(logLevel, msg, logInfo); /* - mutex::scoped_lock lk(fLogLock); + boost::mutex::scoped_lock lk(fLogLock); fMl1.logData(logInfo); switch (logLevel) @@ -79,7 +79,7 @@ const string Logger::logMessage(LOG_TYPE logLevel, Message::MessageID mid, const const std::string Logger::logMessage(LOG_TYPE logLevel, const Message& msg, const LoggingID& logInfo) { - mutex::scoped_lock lk(fLogLock); + boost::mutex::scoped_lock lk(fLogLock); fMl1.logData(logInfo); switch (logLevel) diff --git a/utils/loggingcpp/message.cpp b/utils/loggingcpp/message.cpp index 5b7ec6c4f..3f7ba1274 100644 --- a/utils/loggingcpp/message.cpp +++ b/utils/loggingcpp/message.cpp @@ -44,7 +44,7 @@ using namespace config; namespace { -mutex mx; +boost::mutex mx; bool catalogLoaded = false; typedef map CatMap; @@ -190,7 +190,7 @@ const string Message::lookupMessage(const MessageID& msgid) { if (!catalogLoaded) { - mutex::scoped_lock lock(mx); + boost::mutex::scoped_lock lock(mx); if (!catalogLoaded) { diff --git a/utils/rowgroup/rowgroup.cpp b/utils/rowgroup/rowgroup.cpp index cefae07f4..a8bae1086 100644 --- a/utils/rowgroup/rowgroup.cpp +++ b/utils/rowgroup/rowgroup.cpp @@ -98,7 +98,7 @@ uint64_t StringStore::storeString(const uint8_t* data, uint32_t len) return numeric_limits::max(); //@bug6065, make StringStore::storeString() thread safe - boost::mutex::scoped_lock lk(fMutex, defer_lock); + boost::mutex::scoped_lock lk(fMutex, boost::defer_lock); if (fUseStoreStringMutex) lk.lock(); @@ -254,7 +254,7 @@ uint32_t UserDataStore::storeUserData(mcsv1sdk::mcsv1Context& context, return numeric_limits::max(); } - boost::mutex::scoped_lock lk(fMutex, defer_lock); + boost::mutex::scoped_lock lk(fMutex, boost::defer_lock); if (fUseUserDataMutex) lk.lock(); diff --git a/utils/startup/installdir.cpp b/utils/startup/installdir.cpp index fb94ac93d..1c1c0e453 100644 --- a/utils/startup/installdir.cpp +++ 
b/utils/startup/installdir.cpp @@ -43,14 +43,14 @@ namespace startup { /* static */ -mutex StartUp::fTmpDirLock; +boost::mutex StartUp::fTmpDirLock; /* static */ string* StartUp::fTmpDirp = 0; /* static */ const string StartUp::tmpDir() { - mutex::scoped_lock lk(fTmpDirLock); + boost::mutex::scoped_lock lk(fTmpDirLock); if (fTmpDirp) return *fTmpDirp; diff --git a/utils/threadpool/prioritythreadpool.cpp b/utils/threadpool/prioritythreadpool.cpp index f752ded3b..ebed4e9c9 100644 --- a/utils/threadpool/prioritythreadpool.cpp +++ b/utils/threadpool/prioritythreadpool.cpp @@ -73,7 +73,7 @@ PriorityThreadPool::~PriorityThreadPool() void PriorityThreadPool::addJob(const Job& job, bool useLock) { boost::thread* newThread; - mutex::scoped_lock lk(mutex, defer_lock_t()); + boost::mutex::scoped_lock lk(mutex, boost::defer_lock_t()); if (useLock) lk.lock(); @@ -115,7 +115,7 @@ void PriorityThreadPool::removeJobs(uint32_t id) { list::iterator it; - mutex::scoped_lock lk(mutex); + boost::mutex::scoped_lock lk(mutex); for (uint32_t i = 0; i < _COUNT; i++) for (it = jobQueues[i].begin(); it != jobQueues[i].end();) @@ -152,7 +152,7 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() while (!_stop) { - mutex::scoped_lock lk(mutex); + boost::mutex::scoped_lock lk(mutex); queue = pickAQueue(preferredQueue); diff --git a/versioning/BRM/autoincrementmanager.cpp b/versioning/BRM/autoincrementmanager.cpp index a71f428af..b62eaa2ab 100644 --- a/versioning/BRM/autoincrementmanager.cpp +++ b/versioning/BRM/autoincrementmanager.cpp @@ -45,7 +45,7 @@ AutoincrementManager::~AutoincrementManager() void AutoincrementManager::startSequence(uint32_t oid, uint64_t firstNum, uint32_t colWidth, execplan::CalpontSystemCatalog::ColDataType colDataType) { - mutex::scoped_lock lk(lock); + boost::mutex::scoped_lock lk(lock); map::iterator it; sequence s; @@ -70,7 +70,7 @@ void AutoincrementManager::startSequence(uint32_t oid, uint64_t firstNum, uint32 bool 
AutoincrementManager::getAIRange(uint32_t oid, uint64_t count, uint64_t* firstNum) { - mutex::scoped_lock lk(lock); + boost::mutex::scoped_lock lk(lock); map::iterator it; it = sequences.find(oid); @@ -91,7 +91,7 @@ bool AutoincrementManager::getAIRange(uint32_t oid, uint64_t count, uint64_t* fi void AutoincrementManager::resetSequence(uint32_t oid, uint64_t value) { - mutex::scoped_lock lk(lock); + boost::mutex::scoped_lock lk(lock); map::iterator it; it = sequences.find(oid); @@ -104,7 +104,7 @@ void AutoincrementManager::resetSequence(uint32_t oid, uint64_t value) void AutoincrementManager::getLock(uint32_t oid) { - mutex::scoped_lock lk(lock); + boost::mutex::scoped_lock lk(lock); map::iterator it; ptime stealTime = microsec_clock::local_time() + seconds(lockTime); @@ -130,7 +130,7 @@ void AutoincrementManager::getLock(uint32_t oid) void AutoincrementManager::releaseLock(uint32_t oid) { - mutex::scoped_lock lk(lock); + boost::mutex::scoped_lock lk(lock); map::iterator it; it = sequences.find(oid); @@ -145,7 +145,7 @@ void AutoincrementManager::releaseLock(uint32_t oid) void AutoincrementManager::deleteSequence(uint32_t oid) { - mutex::scoped_lock lk(lock); + boost::mutex::scoped_lock lk(lock); map::iterator it; it = sequences.find(oid); diff --git a/versioning/BRM/extentmap.cpp b/versioning/BRM/extentmap.cpp index ea43a1053..2b79fc6ea 100644 --- a/versioning/BRM/extentmap.cpp +++ b/versioning/BRM/extentmap.cpp @@ -1439,7 +1439,7 @@ void ExtentMap::save(const string& filename) /* always returns holding the EM lock, and with the EM seg mapped */ void ExtentMap::grabEMEntryTable(OPS op) { - mutex::scoped_lock lk(mutex); + boost::mutex::scoped_lock lk(mutex); if (op == READ) fEMShminfo = fMST.getTable_read(MasterSegmentTable::EMTable); @@ -1496,7 +1496,7 @@ void ExtentMap::grabEMEntryTable(OPS op) /* always returns holding the FL lock */ void ExtentMap::grabFreeList(OPS op) { - mutex::scoped_lock lk(mutex, defer_lock); + boost::mutex::scoped_lock lk(mutex, 
boost::defer_lock); if (op == READ) { diff --git a/versioning/BRM/sessionmanagerserver.cpp b/versioning/BRM/sessionmanagerserver.cpp index b98ca2d87..7d5f3b8be 100644 --- a/versioning/BRM/sessionmanagerserver.cpp +++ b/versioning/BRM/sessionmanagerserver.cpp @@ -237,7 +237,7 @@ const QueryContext SessionManagerServer::verID() { QueryContext ret; - mutex::scoped_lock lk(mutex); + boost::mutex::scoped_lock lk(mutex); ret.currentScn = _verID; for (iterator i = activeTxns.begin(); i != activeTxns.end(); ++i) @@ -250,7 +250,7 @@ const QueryContext SessionManagerServer::sysCatVerID() { QueryContext ret; - mutex::scoped_lock lk(mutex); + boost::mutex::scoped_lock lk(mutex); ret.currentScn = _sysCatVerID; for (iterator i = activeTxns.begin(); i != activeTxns.end(); ++i) @@ -264,7 +264,7 @@ const TxnID SessionManagerServer::newTxnID(const SID session, bool block, bool i TxnID ret; //ctor must set valid = false iterator it; - mutex::scoped_lock lk(mutex); + boost::mutex::scoped_lock lk(mutex); // if it already has a txn... 
it = activeTxns.find(session); @@ -299,7 +299,7 @@ const TxnID SessionManagerServer::newTxnID(const SID session, bool block, bool i void SessionManagerServer::finishTransaction(TxnID& txn) { iterator it; - mutex::scoped_lock lk(mutex); + boost::mutex::scoped_lock lk(mutex); bool found = false; if (!txn.valid) @@ -335,7 +335,7 @@ const TxnID SessionManagerServer::getTxnID(const SID session) TxnID ret; iterator it; - mutex::scoped_lock lk(mutex); + boost::mutex::scoped_lock lk(mutex); it = activeTxns.find(session); @@ -352,7 +352,7 @@ shared_array SessionManagerServer::SIDTIDMap(int& len) { int j; shared_array ret; - mutex::scoped_lock lk(mutex); + boost::mutex::scoped_lock lk(mutex); iterator it; ret.reset(new SIDTIDEntry[activeTxns.size()]); @@ -371,7 +371,7 @@ shared_array SessionManagerServer::SIDTIDMap(int& len) void SessionManagerServer::setSystemState(uint32_t state) { - mutex::scoped_lock lk(mutex); + boost::mutex::scoped_lock lk(mutex); systemState |= state; saveSystemState(); @@ -379,7 +379,7 @@ void SessionManagerServer::setSystemState(uint32_t state) void SessionManagerServer::clearSystemState(uint32_t state) { - mutex::scoped_lock lk(mutex); + boost::mutex::scoped_lock lk(mutex); systemState &= ~state; saveSystemState(); @@ -387,7 +387,7 @@ void SessionManagerServer::clearSystemState(uint32_t state) uint32_t SessionManagerServer::getTxnCount() { - mutex::scoped_lock lk(mutex); + boost::mutex::scoped_lock lk(mutex); return activeTxns.size(); } diff --git a/versioning/BRM/tablelockserver.cpp b/versioning/BRM/tablelockserver.cpp index 291001b14..5772e7135 100644 --- a/versioning/BRM/tablelockserver.cpp +++ b/versioning/BRM/tablelockserver.cpp @@ -40,7 +40,7 @@ namespace BRM TableLockServer::TableLockServer(SessionManagerServer* sm) : sms(sm) { - mutex::scoped_lock lk(mutex); + boost::mutex::scoped_lock lk(mutex); config::Config* config = config::Config::makeConfig(); filename = config->getConfig("SystemConfig", "TableLockSaveFile"); @@ -135,7 +135,7 @@ 
uint64_t TableLockServer::lock(TableLockInfo* tli) set dbroots; lit_t it; uint32_t i; - mutex::scoped_lock lk(mutex); + boost::mutex::scoped_lock lk(mutex); for (i = 0; i < tli->dbrootList.size(); i++) dbroots.insert(tli->dbrootList[i]); @@ -177,7 +177,7 @@ bool TableLockServer::unlock(uint64_t id) std::map::iterator it; TableLockInfo tli; - mutex::scoped_lock lk(mutex); + boost::mutex::scoped_lock lk(mutex); it = locks.find(id); if (it != locks.end()) @@ -204,7 +204,7 @@ bool TableLockServer::unlock(uint64_t id) bool TableLockServer::changeState(uint64_t id, LockState state) { lit_t it; - mutex::scoped_lock lk(mutex); + boost::mutex::scoped_lock lk(mutex); LockState old; it = locks.find(id); @@ -232,7 +232,7 @@ bool TableLockServer::changeOwner(uint64_t id, const string& ownerName, uint32_t int32_t txnID) { lit_t it; - mutex::scoped_lock lk(mutex); + boost::mutex::scoped_lock lk(mutex); string oldName; uint32_t oldPID; int32_t oldSession; @@ -271,7 +271,7 @@ bool TableLockServer::changeOwner(uint64_t id, const string& ownerName, uint32_t vector TableLockServer::getAllLocks() const { vector ret; - mutex::scoped_lock lk(mutex); + boost::mutex::scoped_lock lk(mutex); constlit_t it; for (it = locks.begin(); it != locks.end(); ++it) @@ -284,7 +284,7 @@ void TableLockServer::releaseAllLocks() { std::map tmp; - mutex::scoped_lock lk(mutex); + boost::mutex::scoped_lock lk(mutex); tmp.swap(locks); try @@ -301,7 +301,7 @@ void TableLockServer::releaseAllLocks() bool TableLockServer::getLockInfo(uint64_t id, TableLockInfo* out) const { constlit_t it; - mutex::scoped_lock lk(mutex); + boost::mutex::scoped_lock lk(mutex); it = locks.find(id); diff --git a/writeengine/client/we_clients.cpp b/writeengine/client/we_clients.cpp index c82b99c10..92b7c7f9c 100644 --- a/writeengine/client/we_clients.cpp +++ b/writeengine/client/we_clients.cpp @@ -382,7 +382,7 @@ void WEClients::Listen ( boost::shared_ptr client, uint32_t Error: // error condition! 
push 0 length bs to messagequeuemap and // eventually let jobstep error out. - mutex::scoped_lock lk(fMlock); + boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator map_tok; sbs.reset(new ByteStream(0)); @@ -398,7 +398,7 @@ Error: // reset the pmconnection map { - mutex::scoped_lock onErrLock(fOnErrMutex); + boost::mutex::scoped_lock onErrLock(fOnErrMutex); string moduleName = client->moduleName(); ClientList::iterator itor = fPmConnections.begin(); @@ -430,13 +430,13 @@ void WEClients::addQueue(uint32_t key) { bool b; - mutex* lock = new mutex(); + boost::mutex* lock = new boost::mutex(); condition* cond = new condition(); boost::shared_ptr mqe(new MQE(pmCount)); mqe->queue = WESMsgQueue(lock, cond); - mutex::scoped_lock lk ( fMlock ); + boost::mutex::scoped_lock lk ( fMlock ); b = fSessionMessages.insert(pair >(key, mqe)).second; if (!b) @@ -449,7 +449,7 @@ void WEClients::addQueue(uint32_t key) void WEClients::removeQueue(uint32_t key) { - mutex::scoped_lock lk(fMlock); + boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(key); if (map_tok == fSessionMessages.end()) @@ -462,7 +462,7 @@ void WEClients::removeQueue(uint32_t key) void WEClients::shutdownQueue(uint32_t key) { - mutex::scoped_lock lk(fMlock); + boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(key); if (map_tok == fSessionMessages.end()) @@ -477,7 +477,7 @@ void WEClients::read(uint32_t key, SBS& bs) boost::shared_ptr mqe; //Find the StepMsgQueueList for this session - mutex::scoped_lock lk(fMlock); + boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(key); if (map_tok == fSessionMessages.end()) @@ -557,7 +557,7 @@ void WEClients::addDataToOutput(SBS sbs, uint32_t connIndex) *sbs >> uniqueId; boost::shared_ptr mqe; - mutex::scoped_lock lk(fMlock); + boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId); 
if (map_tok == fSessionMessages.end()) diff --git a/writeengine/redistribute/we_redistributecontrol.cpp b/writeengine/redistribute/we_redistributecontrol.cpp index 371c14858..59e42bee5 100644 --- a/writeengine/redistribute/we_redistributecontrol.cpp +++ b/writeengine/redistribute/we_redistributecontrol.cpp @@ -77,7 +77,7 @@ const string PlanFileName("/redistribute.plan"); RedistributeControl* RedistributeControl::instance() { // The constructor is protected by instanceMutex lock. - mutex::scoped_lock lock(instanceMutex); + boost::mutex::scoped_lock lock(instanceMutex); if (fInstance == NULL) fInstance = new RedistributeControl(); @@ -132,7 +132,7 @@ RedistributeControl::~RedistributeControl() int RedistributeControl::handleUIMsg(messageqcpp::ByteStream& bs, messageqcpp::IOSocket& so) { - mutex::scoped_lock sessionLock(fSessionMutex); + boost::mutex::scoped_lock sessionLock(fSessionMutex); uint32_t status = RED_STATE_UNDEF; const RedistributeMsgHeader* h = (const RedistributeMsgHeader*) bs.buf(); @@ -402,7 +402,7 @@ uint32_t RedistributeControl::getCurrentState() { uint32_t status = RED_STATE_UNDEF; ostringstream oss; - mutex::scoped_lock lock(fInfoFileMutex); + boost::mutex::scoped_lock lock(fInfoFileMutex); if (!fInfoFilePtr) { @@ -475,7 +475,7 @@ bool RedistributeControl::getStartOptions(messageqcpp::ByteStream& bs) void RedistributeControl::updateState(uint32_t s) { - mutex::scoped_lock lock(fInfoFileMutex); + boost::mutex::scoped_lock lock(fInfoFileMutex); // allowed state change: // idle -> active @@ -666,7 +666,7 @@ void RedistributeControl::updateState(uint32_t s) void RedistributeControl::setEntryCount(uint32_t entryCount) { - mutex::scoped_lock lock(fInfoFileMutex); + boost::mutex::scoped_lock lock(fInfoFileMutex); fRedistributeInfo.planned = entryCount; rewind(fInfoFilePtr); @@ -677,7 +677,7 @@ void RedistributeControl::setEntryCount(uint32_t entryCount) void RedistributeControl::updateProgressInfo(uint32_t s, time_t t) { - mutex::scoped_lock 
lock(fInfoFileMutex); + boost::mutex::scoped_lock lock(fInfoFileMutex); fRedistributeInfo.endTime = t; switch (s) @@ -703,7 +703,7 @@ void RedistributeControl::updateProgressInfo(uint32_t s, time_t t) int RedistributeControl::handleJobMsg(messageqcpp::ByteStream& bs, messageqcpp::IOSocket& so) { -// mutex::scoped_lock jobLock(fJobMutex); +// boost::mutex::scoped_lock jobLock(fJobMutex); uint32_t status = RED_TRANS_SUCCESS; diff --git a/writeengine/redistribute/we_redistributecontrolthread.cpp b/writeengine/redistribute/we_redistributecontrolthread.cpp index 07af0112c..1ce786a75 100644 --- a/writeengine/redistribute/we_redistributecontrolthread.cpp +++ b/writeengine/redistribute/we_redistributecontrolthread.cpp @@ -73,7 +73,7 @@ string RedistributeControlThread::fWesInUse; void RedistributeControlThread::setStopAction(bool s) { - mutex::scoped_lock lock(fActionMutex); + boost::mutex::scoped_lock lock(fActionMutex); fStopAction = s; } @@ -153,7 +153,7 @@ void RedistributeControlThread::doRedistribute() fControl->logMessage(fErrorMsg + " @controlThread::doRedistribute"); { - mutex::scoped_lock lock(fActionMutex); + boost::mutex::scoped_lock lock(fActionMutex); fWesInUse.clear(); } } @@ -776,7 +776,7 @@ int RedistributeControlThread::connectToWes(int dbroot) try { - mutex::scoped_lock lock(fActionMutex); + boost::mutex::scoped_lock lock(fActionMutex); fWesInUse = oss.str(); fMsgQueueClient.reset(new MessageQueueClient(fWesInUse, fConfig)); } @@ -793,7 +793,7 @@ int RedistributeControlThread::connectToWes(int dbroot) if (ret != 0) { - mutex::scoped_lock lock(fActionMutex); + boost::mutex::scoped_lock lock(fActionMutex); fWesInUse.clear(); fMsgQueueClient.reset(); @@ -808,7 +808,7 @@ void RedistributeControlThread::doStopAction() fConfig = Config::makeConfig(); fControl = RedistributeControl::instance(); - mutex::scoped_lock lock(fActionMutex); + boost::mutex::scoped_lock lock(fActionMutex); if (!fWesInUse.empty()) { diff --git 
a/writeengine/redistribute/we_redistributeworkerthread.cpp b/writeengine/redistribute/we_redistributeworkerthread.cpp index b2ed7ba47..dd8eb8cea 100644 --- a/writeengine/redistribute/we_redistributeworkerthread.cpp +++ b/writeengine/redistribute/we_redistributeworkerthread.cpp @@ -99,7 +99,7 @@ RedistributeWorkerThread::RedistributeWorkerThread(ByteStream& bs, IOSocket& ios RedistributeWorkerThread::~RedistributeWorkerThread() { - mutex::scoped_lock lock(fActionMutex); + boost::mutex::scoped_lock lock(fActionMutex); if (fNewFilePtr) closeFile(fNewFilePtr); @@ -140,7 +140,7 @@ void RedistributeWorkerThread::handleRequest() { // clear stop flag if ever set. { - mutex::scoped_lock lock(fActionMutex); + boost::mutex::scoped_lock lock(fActionMutex); fStopAction = false; fCommitted = false; } @@ -188,7 +188,7 @@ void RedistributeWorkerThread::handleRequest() sendResponse(RED_ACTN_REQUEST); - mutex::scoped_lock lock(fActionMutex); + boost::mutex::scoped_lock lock(fActionMutex); fWesInUse.clear(); fMsgQueueClient.reset(); @@ -910,7 +910,7 @@ int RedistributeWorkerThread::updateDbrm() { int rc1 = BRM::ERR_OK; int rc2 = BRM::ERR_OK; - mutex::scoped_lock lock(fActionMutex); + boost::mutex::scoped_lock lock(fActionMutex); // cannot stop after extent map is updated. if (!fStopAction) @@ -1113,7 +1113,7 @@ void RedistributeWorkerThread::addToDirSet(const char* fileName, bool isSource) void RedistributeWorkerThread::handleStop() { - mutex::scoped_lock lock(fActionMutex); + boost::mutex::scoped_lock lock(fActionMutex); // cannot stop after extent map is updated. 
if (!fCommitted) diff --git a/writeengine/server/we_cpifeederthread.cpp b/writeengine/server/we_cpifeederthread.cpp index cf271fec2..309d20212 100644 --- a/writeengine/server/we_cpifeederthread.cpp +++ b/writeengine/server/we_cpifeederthread.cpp @@ -96,7 +96,7 @@ void WECpiFeederThread::add2MsgQueue(ByteStream& Ibs) //TODO creating copy is NOT good; later read from socket using a SBS messageqcpp::SBS aSbs(new messageqcpp::ByteStream(Ibs)); Ibs.reset(); //forcefully clearing it - mutex::scoped_lock aLock(fMsgQMutex); + boost::mutex::scoped_lock aLock(fMsgQMutex); //cout << "pushing to the MsgQueue" << endl; fMsgQueue.push(aSbs); fFeederCond.notify_one(); // as per preference of Damon @@ -111,7 +111,7 @@ void WECpiFeederThread::feedData2Cpi() while (isContinue()) { - mutex::scoped_lock aLock(fMsgQMutex); + boost::mutex::scoped_lock aLock(fMsgQMutex); if (fMsgQueue.empty()) { @@ -163,7 +163,7 @@ void WECpiFeederThread::feedData2Cpi() bool WECpiFeederThread::isMsgQueueEmpty() { bool aRet = false; - mutex::scoped_lock aLock(fMsgQMutex); + boost::mutex::scoped_lock aLock(fMsgQMutex); aRet = fMsgQueue.empty(); aLock.unlock(); return aRet; @@ -173,7 +173,7 @@ bool WECpiFeederThread::isMsgQueueEmpty() void WECpiFeederThread::stopThread() { - mutex::scoped_lock aCondLock(fContMutex); + boost::mutex::scoped_lock aCondLock(fContMutex); fContinue = false; aCondLock.unlock(); fFeederCond.notify_all(); @@ -184,7 +184,7 @@ void WECpiFeederThread::stopThread() bool WECpiFeederThread::isContinue() { - mutex::scoped_lock aCondLock(fContMutex); + boost::mutex::scoped_lock aCondLock(fContMutex); return fContinue; } diff --git a/writeengine/server/we_dataloader.cpp b/writeengine/server/we_dataloader.cpp index 10a0d0c76..94a9c8553 100644 --- a/writeengine/server/we_dataloader.cpp +++ b/writeengine/server/we_dataloader.cpp @@ -585,7 +585,7 @@ void WEDataLoader::onCpimportSuccess() if (aRet) { - mutex::scoped_lock aLock(fClntMsgMutex); + boost::mutex::scoped_lock aLock(fClntMsgMutex); 
//aBrmRptParser.unserialize(obs); - was for testing updateTxBytes(obs.length()); @@ -622,7 +622,7 @@ void WEDataLoader::onCpimportSuccess() obs.reset(); obs << (ByteStream::byte)WE_CLT_SRV_CPIPASS; obs << (ByteStream::byte)fPmId; // PM id - mutex::scoped_lock aLock(fClntMsgMutex); + boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try @@ -679,7 +679,7 @@ void WEDataLoader::onCpimportFailure() if (aRet) { - mutex::scoped_lock aLock(fClntMsgMutex); + boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try @@ -727,7 +727,7 @@ void WEDataLoader::sendCpimportFailureNotice() ByteStream obs; obs << (ByteStream::byte)WE_CLT_SRV_CPIFAIL; obs << (ByteStream::byte)fPmId; // PM id - mutex::scoped_lock aLock(fClntMsgMutex); + boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try @@ -773,7 +773,7 @@ void WEDataLoader::onReceiveKeepAlive(ByteStream& Ibs) ByteStream obs; obs << (ByteStream::byte)WE_CLT_SRV_KEEPALIVE; obs << (ByteStream::byte)fPmId; // PM id - mutex::scoped_lock aLock(fClntMsgMutex); + boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try @@ -832,7 +832,7 @@ void WEDataLoader::onReceiveKeepAlive(ByteStream& Ibs) ByteStream obs; obs << (ByteStream::byte)WE_CLT_SRV_EOD; obs << (ByteStream::byte)fPmId; // PM id - mutex::scoped_lock aLock(fClntMsgMutex); + boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try @@ -956,7 +956,7 @@ void WEDataLoader::onReceiveEod(ByteStream& Ibs) ByteStream obs; obs << (ByteStream::byte)WE_CLT_SRV_EOD; obs << (ByteStream::byte)fPmId; // PM id - mutex::scoped_lock aLock(fClntMsgMutex); + boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try @@ -1058,7 +1058,7 @@ void WEDataLoader::onReceiveMode(ByteStream& Ibs) aObs << (ByteStream::byte)WE_CLT_SRV_DBRCNT; aObs << (ByteStream::byte)fPmId; aObs << (ByteStream::byte)aDbCnt; - mutex::scoped_lock aLock(fClntMsgMutex); + 
boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(aObs.length()); try @@ -1151,7 +1151,7 @@ void WEDataLoader::onReceiveCmdLineArgs(ByteStream& Ibs) } obs << (ByteStream::byte) fPmId; // PM id - mutex::scoped_lock aLock(fClntMsgMutex); + boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try @@ -1290,7 +1290,7 @@ void WEDataLoader::onReceiveCleanup(ByteStream& Ibs) else obs << (ByteStream::byte)0; // cleanup failed - mutex::scoped_lock aLock(fClntMsgMutex); + boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try @@ -1347,7 +1347,7 @@ void WEDataLoader::onReceiveRollback(ByteStream& Ibs) else obs << (ByteStream::byte)0; // Rollback failed - mutex::scoped_lock aLock(fClntMsgMutex); + boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try @@ -1392,7 +1392,7 @@ void WEDataLoader::onReceiveImportFileName(ByteStream& Ibs) obs << (ByteStream::byte)WE_CLT_SRV_IMPFILEERROR; obs << (ByteStream::byte)fPmId; updateTxBytes(obs.length()); - mutex::scoped_lock aLock(fClntMsgMutex); + boost::mutex::scoped_lock aLock(fClntMsgMutex); try { @@ -1430,7 +1430,7 @@ void WEDataLoader::onReceiveImportFileName(ByteStream& Ibs) ByteStream obs; obs << (ByteStream::byte)WE_CLT_SRV_IMPFILEERROR; obs << (ByteStream::byte)fPmId; // PM id - mutex::scoped_lock aLock(fClntMsgMutex); + boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try @@ -1512,7 +1512,7 @@ void WEDataLoader::onReceiveErrFileRqst(ByteStream& Ibs) if (aRet) { - mutex::scoped_lock aLock(fClntMsgMutex); + boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try @@ -1561,7 +1561,7 @@ void WEDataLoader::onReceiveBadFileRqst(ByteStream& Ibs) if (aRet) { - mutex::scoped_lock aLock(fClntMsgMutex); + boost::mutex::scoped_lock aLock(fClntMsgMutex); updateTxBytes(obs.length()); try @@ -1614,7 +1614,7 @@ void WEDataLoader::sendDataRequest() } } - mutex::scoped_lock aLock(fClntMsgMutex); + 
boost::mutex::scoped_lock aLock(fClntMsgMutex); ByteStream obs; obs << (ByteStream::byte)WE_CLT_SRV_DATARQST; obs << (ByteStream::byte)fPmId; // PM id diff --git a/writeengine/shared/we_brm.cpp b/writeengine/shared/we_brm.cpp index 7c90b86f4..5b4f9339b 100644 --- a/writeengine/shared/we_brm.cpp +++ b/writeengine/shared/we_brm.cpp @@ -1691,7 +1691,7 @@ int BRMWrapper::writeVB(IDBDataFile* pSourceFile, const VER_t transID, const OID fileInfo.fSegment = 0; // fileInfo.fDbRoot = (freeList[0].vbOID % rootCnt) + 1; fileInfo.fDbRoot = dbRoot; - mutex::scoped_lock lk(vbFileLock); + boost::mutex::scoped_lock lk(vbFileLock); pTargetFile = openFile(fileInfo, "r+b", true); if (pTargetFile == NULL) diff --git a/writeengine/splitter/we_filereadthread.cpp b/writeengine/splitter/we_filereadthread.cpp index e455ff518..2d7fb4731 100644 --- a/writeengine/splitter/we_filereadthread.cpp +++ b/writeengine/splitter/we_filereadthread.cpp @@ -263,7 +263,7 @@ void WEFileReadThread::add2InputDataFileList(std::string& FileName) void WEFileReadThread::shutdown() { this->setContinue(false); - mutex::scoped_lock aLock(fFileMutex); //wait till readDataFile() finish + boost::mutex::scoped_lock aLock(fFileMutex); //wait till readDataFile() finish //if(fInFile.is_open()) fInFile.close(); //@BUG 4326 if (fIfFile.is_open()) fIfFile.close(); @@ -300,7 +300,7 @@ void WEFileReadThread::feedData() //cout << "Length " << aSbs->length() <fWriteMutex); + boost::mutex::scoped_lock aLock(fWeSplClients[aIdx]->fWriteMutex); fWeSplClients[aIdx]->write(Bs); aLock.unlock(); } @@ -288,7 +288,7 @@ void WESDHandler::send2Pm(ByteStream& Bs, unsigned int PmId) } else { - mutex::scoped_lock aLock(fWeSplClients[PmId]->fWriteMutex); + boost::mutex::scoped_lock aLock(fWeSplClients[PmId]->fWriteMutex); fWeSplClients[PmId]->write(Bs); aLock.unlock(); } @@ -310,7 +310,7 @@ void WESDHandler::send2Pm(messageqcpp::SBS& Sbs, unsigned int PmId) { if (fWeSplClients[aIdx] != 0) { - mutex::scoped_lock 
aLock(fWeSplClients[aIdx]->fSentQMutex); + boost::mutex::scoped_lock aLock(fWeSplClients[aIdx]->fSentQMutex); fWeSplClients[aIdx]->add2SendQueue(Sbs); aLock.unlock(); @@ -319,7 +319,7 @@ void WESDHandler::send2Pm(messageqcpp::SBS& Sbs, unsigned int PmId) } else { - mutex::scoped_lock aLock(fWeSplClients[PmId]->fSentQMutex); + boost::mutex::scoped_lock aLock(fWeSplClients[PmId]->fSentQMutex); fWeSplClients[PmId]->add2SendQueue(Sbs); aLock.unlock(); } @@ -363,7 +363,7 @@ void WESDHandler::checkForRespMsgs() while (isContinue()) { - mutex::scoped_lock aLock(fRespMutex); + boost::mutex::scoped_lock aLock(fRespMutex); //NOTE - if isContinue is not checked thread will hang on shutdown // by locking again on fRespList.empty() @@ -458,7 +458,7 @@ void WESDHandler::checkForRespMsgs() void WESDHandler::add2RespQueue(const messageqcpp::SBS& Sbs) { - mutex::scoped_lock aLock(fRespMutex); + boost::mutex::scoped_lock aLock(fRespMutex); fRespList.push_back(Sbs); aLock.unlock(); //cout <<"Notifing from add2RespQueue" << endl; @@ -891,7 +891,7 @@ void WESDHandler::cancelOutstandingCpimports() if (fWeSplClients[aCnt]->isConnected()) { - mutex::scoped_lock aLock(fWeSplClients[aCnt]->fWriteMutex); + boost::mutex::scoped_lock aLock(fWeSplClients[aCnt]->fWriteMutex); fWeSplClients[aCnt]->write(aBs); aLock.unlock(); } @@ -1045,7 +1045,7 @@ void WESDHandler::shutdown() } - mutex::scoped_lock aLock(fRespMutex); + boost::mutex::scoped_lock aLock(fRespMutex); this->setContinue(false); usleep(100000); // so that response thread get updated. 
fRespCond.notify_all(); @@ -2015,7 +2015,7 @@ void WESDHandler::doRollback() aBs << (ByteStream::quadbyte) fTableOId; aBs << fRef.fCmdArgs.getTableName(); aBs << aAppName; - mutex::scoped_lock aLock(fSendMutex); + boost::mutex::scoped_lock aLock(fSendMutex); send2Pm(aBs); aLock.unlock(); @@ -2032,7 +2032,7 @@ void WESDHandler::doCleanup(bool deleteHdfsTempDbFiles) aBs << (ByteStream::byte) WE_CLT_SRV_CLEANUP; aBs << (ByteStream::quadbyte) fTableOId; aBs << (ByteStream::byte) deleteHdfsTempDbFiles; - mutex::scoped_lock aLock(fSendMutex); + boost::mutex::scoped_lock aLock(fSendMutex); send2Pm(aBs); aLock.unlock(); } diff --git a/writeengine/splitter/we_splclient.cpp b/writeengine/splitter/we_splclient.cpp index 59b776888..5f7b096c6 100644 --- a/writeengine/splitter/we_splclient.cpp +++ b/writeengine/splitter/we_splclient.cpp @@ -215,7 +215,7 @@ void WESplClient::send() cout << "DataRqstCnt [" << getPmId() << "] = " << getDataRqstCount() << endl; - mutex::scoped_lock aLock(fSentQMutex); + boost::mutex::scoped_lock aLock(fSentQMutex); messageqcpp::SBS aSbs = fSendQueue.front(); fSendQueue.pop(); aLock.unlock(); @@ -223,7 +223,7 @@ void WESplClient::send() if (aLen > 0) { - mutex::scoped_lock aLock(fWriteMutex); + boost::mutex::scoped_lock aLock(fWriteMutex); setBytesTx(getBytesTx() + aLen); try @@ -323,7 +323,7 @@ void WESplClient::add2SendQueue(const messageqcpp::SBS& Sbs) void WESplClient::clearSendQueue() { - mutex::scoped_lock aLock(fSentQMutex); + boost::mutex::scoped_lock aLock(fSentQMutex); while (!fSendQueue.empty()) fSendQueue.pop(); @@ -334,7 +334,7 @@ void WESplClient::clearSendQueue() int WESplClient::getSendQSize() { int aQSize = 0; - mutex::scoped_lock aLock(fSentQMutex); + boost::mutex::scoped_lock aLock(fSentQMutex); aQSize = fSendQueue.size(); aLock.unlock(); return aQSize;