mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-27 21:01:50 +03:00

compilation failure

error: reference to 'mutex' is ambiguous
note: candidates are: 'class boost::mutex'
note:                 'class std::mutex'
Sergei Golubchik
2019-12-19 18:13:10 +01:00
parent 36a44ff136
commit 586391e1ca
39 changed files with 240 additions and 240 deletions
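The failure happens because the affected translation units pull in both Boost and standard-library names (the error lists both 'class boost::mutex' and 'class std::mutex' as candidates), so the unqualified name 'mutex' matches two different classes; the commit resolves this by qualifying every such use. A minimal sketch of the ambiguity and the fix, assuming only a toolchain with Boost.Thread available (the names below are illustrative, not taken from the ColumnStore sources):

#include <mutex>
#include <boost/thread/mutex.hpp>
#include <boost/thread/locks.hpp>

using namespace std;
using namespace boost;

// With both using-directives active, the unqualified name 'mutex' could be
// either std::mutex or boost::mutex:
//     mutex m;   // error: reference to 'mutex' is ambiguous

// Qualifying the name, as this commit does throughout, resolves the lookup:
boost::mutex bm;
std::mutex sm;

int main()
{
    boost::mutex::scoped_lock lk(bm);    // Boost's RAII lock, used throughout ColumnStore
    std::lock_guard<std::mutex> lg(sm);  // standard-library equivalent
    return 0;
}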

View File

@@ -214,7 +214,7 @@ void ClientRotator::write(const ByteStream& msg)
 ByteStream ClientRotator::read()
 {
-mutex::scoped_lock lk(fClientLock);
+boost::mutex::scoped_lock lk(fClientLock);
 ByteStream bs;

View File

@@ -224,7 +224,7 @@ DistributedEngineComm::~DistributedEngineComm()
 void DistributedEngineComm::Setup()
 {
 // This is here to ensure that this function does not get invoked multiple times simultaneously.
-mutex::scoped_lock setupLock(fSetupMutex);
+boost::mutex::scoped_lock setupLock(fSetupMutex);
 makeBusy(true);
@@ -310,7 +310,7 @@ void DistributedEngineComm::Setup()
 // for every entry in newClients up to newPmCount, scan for the same ip in the
 // first pmCount. If there is no match, it's a new node,
 // call the event listeners' newPMOnline() callbacks.
-mutex::scoped_lock lock(eventListenerLock);
+boost::mutex::scoped_lock lock(eventListenerLock);
 for (uint32_t i = 0; i < newPmCount; i++)
 {
@@ -386,7 +386,7 @@ void DistributedEngineComm::Listen(boost::shared_ptr<MessageQueueClient> client,
 Error:
 // @bug 488 - error condition! push 0 length bs to messagequeuemap and
 // eventually let jobstep error out.
-/* mutex::scoped_lock lk(fMlock);
+/* boost::mutex::scoped_lock lk(fMlock);
 //cout << "WARNING: DEC READ 0 LENGTH BS FROM " << client->otherEnd()<< endl;
 MessageQueueMap::iterator map_tok;
@@ -404,7 +404,7 @@ Error:
 ClientList tempConns;
 {
-mutex::scoped_lock onErrLock(fOnErrMutex);
+boost::mutex::scoped_lock onErrLock(fOnErrMutex);
 string moduleName = client->moduleName();
 //cout << "moduleName=" << moduleName << endl;
 for ( uint32_t i = 0; i < fPmConnections.size(); i++)
@@ -438,7 +438,7 @@ void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs)
 {
 bool b;
-mutex* lock = new mutex();
+boost::mutex* lock = new boost::mutex();
 condition* cond = new condition();
 boost::shared_ptr<MQE> mqe(new MQE(pmCount));
@@ -446,7 +446,7 @@ void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs)
 mqe->sendACKs = sendACKs;
 mqe->throttled = false;
-mutex::scoped_lock lk ( fMlock );
+boost::mutex::scoped_lock lk ( fMlock );
 b = fSessionMessages.insert(pair<uint32_t, boost::shared_ptr<MQE> >(key, mqe)).second;
 if (!b)
@@ -459,7 +459,7 @@ void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs)
 void DistributedEngineComm::removeQueue(uint32_t key)
 {
-mutex::scoped_lock lk(fMlock);
+boost::mutex::scoped_lock lk(fMlock);
 MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
 if (map_tok == fSessionMessages.end())
@@ -472,7 +472,7 @@ void DistributedEngineComm::removeQueue(uint32_t key)
 void DistributedEngineComm::shutdownQueue(uint32_t key)
 {
-mutex::scoped_lock lk(fMlock);
+boost::mutex::scoped_lock lk(fMlock);
 MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
 if (map_tok == fSessionMessages.end())
@@ -487,7 +487,7 @@ void DistributedEngineComm::read(uint32_t key, SBS& bs)
 boost::shared_ptr<MQE> mqe;
 //Find the StepMsgQueueList for this session
-mutex::scoped_lock lk(fMlock);
+boost::mutex::scoped_lock lk(fMlock);
 MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
 if (map_tok == fSessionMessages.end())
@@ -506,7 +506,7 @@ void DistributedEngineComm::read(uint32_t key, SBS& bs)
 if (bs && mqe->sendACKs)
 {
-mutex::scoped_lock lk(ackLock);
+boost::mutex::scoped_lock lk(ackLock);
 if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
 setFlowControl(false, key, mqe);
@@ -526,7 +526,7 @@ const ByteStream DistributedEngineComm::read(uint32_t key)
 boost::shared_ptr<MQE> mqe;
 //Find the StepMsgQueueList for this session
-mutex::scoped_lock lk(fMlock);
+boost::mutex::scoped_lock lk(fMlock);
 MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
 if (map_tok == fSessionMessages.end())
@@ -544,7 +544,7 @@ const ByteStream DistributedEngineComm::read(uint32_t key)
 if (sbs && mqe->sendACKs)
 {
-mutex::scoped_lock lk(ackLock);
+boost::mutex::scoped_lock lk(ackLock);
 if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
 setFlowControl(false, key, mqe);
@@ -564,7 +564,7 @@ void DistributedEngineComm::read_all(uint32_t key, vector<SBS>& v)
 {
 boost::shared_ptr<MQE> mqe;
-mutex::scoped_lock lk(fMlock);
+boost::mutex::scoped_lock lk(fMlock);
 MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
 if (map_tok == fSessionMessages.end())
@@ -581,7 +581,7 @@ void DistributedEngineComm::read_all(uint32_t key, vector<SBS>& v)
 if (mqe->sendACKs)
 {
-mutex::scoped_lock lk(ackLock);
+boost::mutex::scoped_lock lk(ackLock);
 sendAcks(key, v, mqe, 0);
 }
 }
@@ -591,7 +591,7 @@ void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vector<SBS
 {
 boost::shared_ptr<MQE> mqe;
-mutex::scoped_lock lk(fMlock);
+boost::mutex::scoped_lock lk(fMlock);
 MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
 if (map_tok == fSessionMessages.end())
@@ -612,7 +612,7 @@ void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vector<SBS
 if (mqe->sendACKs)
 {
-mutex::scoped_lock lk(ackLock);
+boost::mutex::scoped_lock lk(ackLock);
 if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
 setFlowControl(false, key, mqe);
@@ -876,7 +876,7 @@ void DistributedEngineComm::write(messageqcpp::ByteStream& msg, uint32_t connect
 PrimitiveHeader* pm = (PrimitiveHeader*) (ism + 1);
 uint32_t senderID = pm->UniqueID;
-mutex::scoped_lock lk(fMlock, defer_lock_t());
+boost::mutex::scoped_lock lk(fMlock, boost::defer_lock_t());
 MessageQueueMap::iterator it;
 // This keeps mqe's stats from being freed until end of function
 boost::shared_ptr<MQE> mqe;
@@ -909,7 +909,7 @@ void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats*
 uint32_t uniqueId = p->UniqueID;
 boost::shared_ptr<MQE> mqe;
-mutex::scoped_lock lk(fMlock);
+boost::mutex::scoped_lock lk(fMlock);
 MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId);
 if (map_tok == fSessionMessages.end())
@@ -931,7 +931,7 @@ void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats*
 if (mqe->sendACKs)
 {
-mutex::scoped_lock lk(ackLock);
+boost::mutex::scoped_lock lk(ackLock);
 uint64_t msgSize = sbs->lengthWithHdrOverhead();
 if (!mqe->throttled && msgSize > (targetRecvQueueSize / 2))
@@ -956,7 +956,7 @@ void DistributedEngineComm::doHasBigMsgs(boost::shared_ptr<MQE> mqe, uint64_t ta
 int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uint32_t sender, bool doInterleaving)
 {
-mutex::scoped_lock lk(fMlock, defer_lock_t());
+boost::mutex::scoped_lock lk(fMlock, boost::defer_lock_t());
 MessageQueueMap::iterator it;
 // Keep mqe's stats from being freed early
 boost::shared_ptr<MQE> mqe;
@@ -992,7 +992,7 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin
 if (!client->isAvailable()) return 0;
-mutex::scoped_lock lk(*(fWlock[index]));
+boost::mutex::scoped_lock lk(*(fWlock[index]));
 client->write(bs, NULL, senderStats);
 return 0;
 }
@@ -1019,7 +1019,7 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin
 ClientList tempConns;
 {
 //cout << "WARNING: DEC WRITE BROKEN PIPE " << fPmConnections[index]->otherEnd()<< endl;
-mutex::scoped_lock onErrLock(fOnErrMutex);
+boost::mutex::scoped_lock onErrLock(fOnErrMutex);
 string moduleName = fPmConnections[index]->moduleName();
 //cout << "module name = " << moduleName << endl;
 if (index >= fPmConnections.size()) return 0;
@@ -1051,7 +1051,7 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin
 uint32_t DistributedEngineComm::size(uint32_t key)
 {
-mutex::scoped_lock lk(fMlock);
+boost::mutex::scoped_lock lk(fMlock);
 MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
 if (map_tok == fSessionMessages.end())
@@ -1065,13 +1065,13 @@ uint32_t DistributedEngineComm::size(uint32_t key)
 void DistributedEngineComm::addDECEventListener(DECEventListener* l)
 {
-mutex::scoped_lock lk(eventListenerLock);
+boost::mutex::scoped_lock lk(eventListenerLock);
 eventListeners.push_back(l);
 }
 void DistributedEngineComm::removeDECEventListener(DECEventListener* l)
 {
-mutex::scoped_lock lk(eventListenerLock);
+boost::mutex::scoped_lock lk(eventListenerLock);
 std::vector<DECEventListener*> newListeners;
 uint32_t s = eventListeners.size();
@@ -1084,7 +1084,7 @@ void DistributedEngineComm::removeDECEventListener(DECEventListener* l)
 Stats DistributedEngineComm::getNetworkStats(uint32_t uniqueID)
 {
-mutex::scoped_lock lk(fMlock);
+boost::mutex::scoped_lock lk(fMlock);
 MessageQueueMap::iterator it;
 Stats empty;
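Several hunks in this file also qualify the deferred-locking tag, since std:: and boost:: each define a defer_lock_t and the same using-directives make both visible. A small illustration of the deferred-lock pattern used above, with a placeholder mutex standing in for members such as fMlock (a hedged sketch assuming Boost.Thread, not the ColumnStore code itself):

#include <boost/thread/mutex.hpp>
#include <boost/thread/locks.hpp>

// Placeholder for members such as fMlock; the real objects live in
// DistributedEngineComm and are not shown in this diff.
static boost::mutex exampleLock;

void deferredLockingSketch(bool needLock)
{
    // Construct the guard without acquiring the mutex; both std::defer_lock_t
    // and boost::defer_lock_t exist, so the tag is qualified like the mutex.
    boost::mutex::scoped_lock lk(exampleLock, boost::defer_lock_t());

    if (needLock)
        lk.lock();   // acquire only on the paths that actually need it
    // lk releases the mutex on destruction only if it owns it
}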

View File

@@ -57,11 +57,11 @@ const string ResourceManager::fExtentMapStr("ExtentMap");
 const string ResourceManager::fOrderByLimitStr("OrderByLimit");
 ResourceManager* ResourceManager::fInstance = NULL;
-mutex mx;
+boost::mutex mx;
 ResourceManager* ResourceManager::instance(bool runningInExeMgr)
 {
-mutex::scoped_lock lk(mx);
+boost::mutex::scoped_lock lk(mx);
 if (!fInstance)
 fInstance = new ResourceManager(runningInExeMgr);

View File

@@ -927,7 +927,7 @@ void TupleBPS::prepCasualPartitioning()
 {
 uint32_t i;
 int64_t min, max, seq;
-mutex::scoped_lock lk(cpMutex);
+boost::mutex::scoped_lock lk(cpMutex);
 for (i = 0; i < scannedExtents.size(); i++)
 {
@@ -3312,7 +3312,7 @@ void TupleBPS::abort_nolock()
 void TupleBPS::abort()
 {
-boost::mutex::scoped_lock scoped(mutex);
+boost::mutex::scoped_lock scoped(boost::mutex);
 abort_nolock();
 }

View File

@@ -414,7 +414,7 @@ void TupleAggregateStep::initializeMultiThread()
 for (i = 0; i < fNumOfBuckets; i++)
 {
-mutex* lock = new mutex();
+boost::mutex* lock = new boost::mutex();
 fAgg_mutex.push_back(lock);
 fRowGroupOuts[i] = fRowGroupOut;
 rgData.reinit(fRowGroupOut);

View File

@@ -142,7 +142,7 @@ void TupleHashJoinStep::run()
 {
 uint32_t i;
-mutex::scoped_lock lk(jlLock);
+boost::mutex::scoped_lock lk(jlLock);
 if (runRan)
 return;
@@ -183,7 +183,7 @@ void TupleHashJoinStep::run()
 void TupleHashJoinStep::join()
 {
-mutex::scoped_lock lk(jlLock);
+boost::mutex::scoped_lock lk(jlLock);
 if (joinRan)
 return;

View File

@@ -226,7 +226,7 @@ void TupleUnion::readInput(uint32_t which)
 l_tmpRG.getRow(0, &tmpRow);
 {
-mutex::scoped_lock lk(uniquerMutex);
+boost::mutex::scoped_lock lk(uniquerMutex);
 getOutput(&l_outputRG, &outRow, &outRGData);
 memUsageBefore = allocator.getMemUsage();
@@ -294,8 +294,8 @@ void TupleUnion::readInput(uint32_t which)
 more = dl->next(it, &inRGData);
 {
-mutex::scoped_lock lock1(uniquerMutex);
-mutex::scoped_lock lock2(sMutex);
+boost::mutex::scoped_lock lock1(uniquerMutex);
+boost::mutex::scoped_lock lock2(sMutex);
 if (!distinct && l_outputRG.getRowCount() > 0)
 output->insert(outRGData);
@@ -395,7 +395,7 @@ void TupleUnion::addToOutput(Row* r, RowGroup* rg, bool keepit,
 if (rg->getRowCount() == 8192)
 {
 {
-mutex::scoped_lock lock(sMutex);
+boost::mutex::scoped_lock lock(sMutex);
 output->insert(data);
 }
 data = RGData(*rg);
@@ -1182,7 +1182,7 @@ void TupleUnion::run()
 {
 uint32_t i;
-mutex::scoped_lock lk(jlLock);
+boost::mutex::scoped_lock lk(jlLock);
 if (runRan)
 return;
@@ -1225,7 +1225,7 @@ void TupleUnion::run()
 void TupleUnion::join()
 {
-mutex::scoped_lock lk(jlLock);
+boost::mutex::scoped_lock lk(jlLock);
 if (joinRan)
 return;

View File

@@ -177,7 +177,7 @@ uint64_t BatchInsertProc::grabTableLock(int32_t sessionId)
 BatchInsertProc::SP_PKG BatchInsertProc::getInsertQueue()
 {
-mutex::scoped_lock lk(fLock);
+boost::mutex::scoped_lock lk(fLock);
 return fInsertPkgQueue;
 }
@@ -187,7 +187,7 @@ void BatchInsertProc::setLastPkg (bool lastPkg)
 }
 void BatchInsertProc::addPkg(messageqcpp::ByteStream& insertBs)
 {
-mutex::scoped_lock lk(fLock);
+boost::mutex::scoped_lock lk(fLock);
 fInsertPkgQueue->push(insertBs);
 }
@@ -195,7 +195,7 @@ void BatchInsertProc::addPkg(messageqcpp::ByteStream& insertBs)
 messageqcpp::ByteStream BatchInsertProc::getPkg()
 {
 messageqcpp::ByteStream bs;
-mutex::scoped_lock lk(fLock);
+boost::mutex::scoped_lock lk(fLock);
 bs = fInsertPkgQueue->front();
 fInsertPkgQueue->pop();
 return bs;
@@ -535,13 +535,13 @@ void BatchInsertProc::setHwm()
 void BatchInsertProc::setError(int errorCode, std::string errMsg)
 {
-mutex::scoped_lock lk(fLock);
+boost::mutex::scoped_lock lk(fLock);
 fErrorCode = errorCode;
 fErrMsg = errMsg;
 }
 void BatchInsertProc::getError(int& errorCode, std::string& errMsg)
 {
-mutex::scoped_lock lk(fLock);
+boost::mutex::scoped_lock lk(fLock);
 errorCode = fErrorCode;
 errMsg = fErrMsg;
 }

View File

@@ -1689,7 +1689,7 @@ void DMLProcessor::operator()()
 fConcurrentSupport, maxDeleteRows, sessionID, txnid.id, fDbrm, fQtc, csc));
 // We put the packageHandler into a map so that if we receive a
 // message to affect the previous command, we can find it.
-boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, defer_lock);
+boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, boost::defer_lock);
 lk2.lock();
 packageHandlerMap[sessionID] = php;
@@ -1731,7 +1731,7 @@ void DMLProcessor::operator()()
 fConcurrentSupport, maxDeleteRows, sessionID, 0, fDbrm, fQtc, csc));
 // We put the packageHandler into a map so that if we receive a
 // message to affect the previous command, we can find it.
-boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, defer_lock);
+boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, boost::defer_lock);
 lk2.lock();
 packageHandlerMap[sessionID] = php;

View File

@@ -32,7 +32,7 @@ void ActiveStatementCounter::incr(bool& counted)
 return;
 counted = true;
-mutex::scoped_lock lk(fMutex);
+boost::mutex::scoped_lock lk(fMutex);
 if (upperLimit > 0)
 while (fStatementCount >= upperLimit)
@@ -51,7 +51,7 @@ void ActiveStatementCounter::decr(bool& counted)
 return;
 counted = false;
-mutex::scoped_lock lk(fMutex);
+boost::mutex::scoped_lock lk(fMutex);
 if (fStatementCount == 0)
 return;

View File

@@ -39,7 +39,7 @@ using namespace boost;
 namespace
 {
 oam::OamCache* oamCache = 0;
-mutex cacheLock;
+boost::mutex cacheLock;
 }
 namespace oam
@@ -47,7 +47,7 @@ namespace oam
 OamCache* OamCache::makeOamCache()
 {
-mutex::scoped_lock lk(cacheLock);
+boost::mutex::scoped_lock lk(cacheLock);
 if (oamCache == 0)
 oamCache = new OamCache();
@@ -226,7 +226,7 @@ void OamCache::checkReload()
 OamCache::dbRootPMMap_t OamCache::getDBRootToPMMap()
 {
-mutex::scoped_lock lk(cacheLock);
+boost::mutex::scoped_lock lk(cacheLock);
 checkReload();
 return dbRootPMMap;
@@ -234,7 +234,7 @@ OamCache::dbRootPMMap_t OamCache::getDBRootToPMMap()
 OamCache::dbRootPMMap_t OamCache::getDBRootToConnectionMap()
 {
-mutex::scoped_lock lk(cacheLock);
+boost::mutex::scoped_lock lk(cacheLock);
 checkReload();
 return dbRootConnectionMap;
@@ -242,7 +242,7 @@ OamCache::dbRootPMMap_t OamCache::getDBRootToConnectionMap()
 OamCache::PMDbrootsMap_t OamCache::getPMToDbrootsMap()
 {
-mutex::scoped_lock lk(cacheLock);
+boost::mutex::scoped_lock lk(cacheLock);
 checkReload();
 return pmDbrootsMap;
@@ -250,7 +250,7 @@ OamCache::PMDbrootsMap_t OamCache::getPMToDbrootsMap()
 uint32_t OamCache::getDBRootCount()
 {
-mutex::scoped_lock lk(cacheLock);
+boost::mutex::scoped_lock lk(cacheLock);
 checkReload();
 return numDBRoots;
@@ -258,7 +258,7 @@ uint32_t OamCache::getDBRootCount()
 DBRootConfigList& OamCache::getDBRootNums()
 {
-mutex::scoped_lock lk(cacheLock);
+boost::mutex::scoped_lock lk(cacheLock);
 checkReload();
 return dbroots;
@@ -266,7 +266,7 @@ DBRootConfigList& OamCache::getDBRootNums()
 std::vector<int>& OamCache::getModuleIds()
 {
-mutex::scoped_lock lk(cacheLock);
+boost::mutex::scoped_lock lk(cacheLock);
 checkReload();
 return moduleIds;
@@ -274,7 +274,7 @@ std::vector<int>& OamCache::getModuleIds()
 std::string OamCache::getOAMParentModuleName()
 {
-mutex::scoped_lock lk(cacheLock);
+boost::mutex::scoped_lock lk(cacheLock);
 checkReload();
 return OAMParentModuleName;
@@ -282,7 +282,7 @@ std::string OamCache::getOAMParentModuleName()
 int OamCache::getLocalPMId()
 {
-mutex::scoped_lock lk(cacheLock);
+boost::mutex::scoped_lock lk(cacheLock);
 // This comes from the file /var/lib/columnstore/local/module, not from the xml.
 // Thus, it's not refreshed during checkReload().
@@ -324,7 +324,7 @@ int OamCache::getLocalPMId()
 string OamCache::getSystemName()
 {
-mutex::scoped_lock lk(cacheLock);
+boost::mutex::scoped_lock lk(cacheLock);
 checkReload();
 return systemName;
@@ -332,7 +332,7 @@ string OamCache::getSystemName()
 string OamCache::getModuleName()
 {
-mutex::scoped_lock lk(cacheLock);
+boost::mutex::scoped_lock lk(cacheLock);
 if (!moduleName.empty())
 return moduleName;

View File

@@ -105,7 +105,7 @@ void FileBufferMgr::setReportingFrequency(const uint32_t d)
 void FileBufferMgr::flushCache()
 {
-mutex::scoped_lock lk(fWLock);
+boost::mutex::scoped_lock lk(fWLock);
 {
 filebuffer_uset_t sEmpty;
 filebuffer_list_t lEmpty;
@@ -131,7 +131,7 @@ void FileBufferMgr::flushCache()
 void FileBufferMgr::flushOne(const BRM::LBID_t lbid, const BRM::VER_t ver)
 {
 //similar in function to depleteCache()
-mutex::scoped_lock lk(fWLock);
+boost::mutex::scoped_lock lk(fWLock);
 filebuffer_uset_iter_t iter = fbSet.find(HashObject_t(lbid, ver, 0));
@@ -152,7 +152,7 @@ void FileBufferMgr::flushOne(const BRM::LBID_t lbid, const BRM::VER_t ver)
 void FileBufferMgr::flushMany(const LbidAtVer* laVptr, uint32_t cnt)
 {
-mutex::scoped_lock lk(fWLock);
+boost::mutex::scoped_lock lk(fWLock);
 BRM::LBID_t lbid;
 BRM::VER_t ver;
@@ -199,7 +199,7 @@ void FileBufferMgr::flushManyAllversion(const LBID_t* laVptr, uint32_t cnt)
 tr1::unordered_set<LBID_t> uniquer;
 tr1::unordered_set<LBID_t>::iterator uit;
-mutex::scoped_lock lk(fWLock);
+boost::mutex::scoped_lock lk(fWLock);
 if (fReportFrequency)
 {
@@ -264,7 +264,7 @@ void FileBufferMgr::flushOIDs(const uint32_t* oids, uint32_t count)
 // If there are more than this # of extents to drop, the whole cache will be cleared
 const uint32_t clearThreshold = 50000;
-mutex::scoped_lock lk(fWLock);
+boost::mutex::scoped_lock lk(fWLock);
 if (fCacheSize == 0 || count == 0)
 return;
@@ -323,7 +323,7 @@ void FileBufferMgr::flushPartition(const vector<OID_t>& oids, const set<BRM::Log
 filebuffer_uset_t::iterator it;
 uint32_t count = oids.size();
-mutex::scoped_lock lk(fWLock);
+boost::mutex::scoped_lock lk(fWLock);
 if (fReportFrequency)
 {
@@ -398,7 +398,7 @@ bool FileBufferMgr::exists(const BRM::LBID_t& lbid, const BRM::VER_t& ver) const
 FileBuffer* FileBufferMgr::findPtr(const HashObject_t& keyFb)
 {
-mutex::scoped_lock lk(fWLock);
+boost::mutex::scoped_lock lk(fWLock);
 filebuffer_uset_iter_t it = fbSet.find(keyFb);
@@ -418,7 +418,7 @@ bool FileBufferMgr::find(const HashObject_t& keyFb, FileBuffer& fb)
 {
 bool ret = false;
-mutex::scoped_lock lk(fWLock);
+boost::mutex::scoped_lock lk(fWLock);
 filebuffer_uset_iter_t it = fbSet.find(keyFb);
@@ -444,7 +444,7 @@ bool FileBufferMgr::find(const HashObject_t& keyFb, void* bufferPtr)
 #else
 gPMStatsPtr->markEvent(keyFb.lbid, pthread_self(), gSession, 'L');
 #endif
-mutex::scoped_lock lk(fWLock);
+boost::mutex::scoped_lock lk(fWLock);
 if (gPMProfOn && gPMStatsPtr)
 #ifdef _MSC_VER
@@ -497,7 +497,7 @@ uint32_t FileBufferMgr::bulkFind(const BRM::LBID_t* lbids, const BRM::VER_t* ver
 }
 }
-mutex::scoped_lock lk(fWLock);
+boost::mutex::scoped_lock lk(fWLock);
 if (gPMProfOn && gPMStatsPtr)
 {
@@ -558,7 +558,7 @@ uint32_t FileBufferMgr::bulkFind(const BRM::LBID_t* lbids, const BRM::VER_t* ver
 bool FileBufferMgr::exists(const HashObject_t& fb) const
 {
 bool find_bool = false;
-mutex::scoped_lock lk(fWLock);
+boost::mutex::scoped_lock lk(fWLock);
 filebuffer_uset_iter_t it = fbSet.find(fb);
@@ -590,7 +590,7 @@ int FileBufferMgr::insert(const BRM::LBID_t lbid, const BRM::VER_t ver, const ui
 gPMStatsPtr->markEvent(lbid, pthread_self(), gSession, 'I');
 #endif
-mutex::scoped_lock lk(fWLock);
+boost::mutex::scoped_lock lk(fWLock);
 HashObject_t fbIndex(lbid, ver, 0);
 filebuffer_pair_t pr = fbSet.insert(fbIndex);
@@ -796,7 +796,7 @@ int FileBufferMgr::bulkInsert(const vector<CacheInsert_t>& ops)
 int32_t pi;
 int ret = 0;
-mutex::scoped_lock lk(fWLock);
+boost::mutex::scoped_lock lk(fWLock);
 if (fReportFrequency)
 {

View File

@@ -179,7 +179,7 @@ typedef map<uint32_t, TraceFileInfo> TraceFileMap_t;
 TraceFileMap_t traceFileMap;
 //map mutex
-mutex traceFileMapMutex;
+boost::mutex traceFileMapMutex;
 class StatMon
 {
@@ -198,7 +198,7 @@ public:
 void operator()() const
 {
 //struct timespec ts = { 60 * 1, 0 };
-mutex::scoped_lock lk(traceFileMapMutex);
+boost::mutex::scoped_lock lk(traceFileMapMutex);
 TraceFileMap_t::iterator iter;
 TraceFileMap_t::iterator end;
@@ -259,7 +259,7 @@ void Stats::touchedLBID(uint64_t lbid, pthread_t thdid, uint32_t session)
 {
 if (session == 0) return;
-mutex::scoped_lock lk(traceFileMapMutex);
+boost::mutex::scoped_lock lk(traceFileMapMutex);
 TraceFileMap_t::iterator iter = traceFileMap.find(session);
 if (iter == traceFileMap.end())
@@ -276,7 +276,7 @@ void Stats::markEvent(const uint64_t lbid, const pthread_t thdid, const uint32_t
 {
 if (session == 0) return;
-mutex::scoped_lock lk(traceFileMapMutex);
+boost::mutex::scoped_lock lk(traceFileMapMutex);
 TraceFileMap_t::iterator iter = traceFileMap.find(session);
 if (iter == traceFileMap.end())

View File

@@ -773,7 +773,7 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
 // TODO: write an RGData fcn to let it interpret data within a ByteStream to avoid
 // the extra copying.
 offTheWire.deserialize(bs);
-mutex::scoped_lock lk(smallSideDataLocks[joinerNum]);
+boost::mutex::scoped_lock lk(smallSideDataLocks[joinerNum]);
 smallSide.setData(&smallSideRowData[joinerNum]);
 smallSide.append(offTheWire, startPos);
@@ -790,7 +790,7 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
 {
 joblist::ElementType *et = (joblist::ElementType*) bs.buf();
-mutex::scoped_lock lk(addToJoinerLocks[0][0]);
+boost::mutex::scoped_lock lk(addToJoinerLocks[0][0]);
 for (i = 0; i < count; i++)
 {
 // cout << "BPP: adding <" << et[i].first << ", " << et[i].second << "> to Joiner\n";

View File

@@ -51,8 +51,8 @@ BPPSendThread::BPPSendThread(uint32_t initMsgsLeft) : die(false), gotException(f
 BPPSendThread::~BPPSendThread()
 {
-mutex::scoped_lock sl(msgQueueLock);
-mutex::scoped_lock sl2(ackLock);
+boost::mutex::scoped_lock sl(msgQueueLock);
+boost::mutex::scoped_lock sl2(ackLock);
 die = true;
 queueNotEmpty.notify_one();
 okToSend.notify_one();
@@ -74,7 +74,7 @@ void BPPSendThread::sendResult(const Msg_t& msg, bool newConnection)
 if (die)
 return;
-mutex::scoped_lock sl(msgQueueLock);
+boost::mutex::scoped_lock sl(msgQueueLock);
 if (gotException)
 throw runtime_error(exceptionString);
@@ -108,7 +108,7 @@ void BPPSendThread::sendResults(const vector<Msg_t>& msgs, bool newConnection)
 if (die)
 return;
-mutex::scoped_lock sl(msgQueueLock);
+boost::mutex::scoped_lock sl(msgQueueLock);
 if (gotException)
 throw runtime_error(exceptionString);
@@ -143,7 +143,7 @@ void BPPSendThread::sendResults(const vector<Msg_t>& msgs, bool newConnection)
 void BPPSendThread::sendMore(int num)
 {
-mutex::scoped_lock sl(ackLock);
+boost::mutex::scoped_lock sl(ackLock);
 // cout << "got an ACK for " << num << " msgsLeft=" << msgsLeft << endl;
 if (num == -1)
@@ -178,7 +178,7 @@ void BPPSendThread::mainLoop()
 while (!die)
 {
-mutex::scoped_lock sl(msgQueueLock);
+boost::mutex::scoped_lock sl(msgQueueLock);
 if (msgQueue.empty() && !die)
 {
@@ -209,7 +209,7 @@ void BPPSendThread::mainLoop()
 if (msgsLeft <= 0 && fcEnabled && !die)
 {
-mutex::scoped_lock sl2(ackLock);
+boost::mutex::scoped_lock sl2(ackLock);
 while (msgsLeft <= 0 && fcEnabled && !die)
 {
@@ -238,7 +238,7 @@ void BPPSendThread::mainLoop()
 try
 {
-mutex::scoped_lock sl2(*lock);
+boost::mutex::scoped_lock sl2(*lock);
 sock->write(*msg[msgsSent].msg);
 //cout << "sent 1 msg\n";
 }
@@ -260,8 +260,8 @@ void BPPSendThread::mainLoop()
 void BPPSendThread::abort()
 {
-mutex::scoped_lock sl(msgQueueLock);
-mutex::scoped_lock sl2(ackLock);
+boost::mutex::scoped_lock sl(msgQueueLock);
+boost::mutex::scoped_lock sl2(ackLock);
 die = true;
 queueNotEmpty.notify_one();
 okToSend.notify_one();

View File

@@ -150,11 +150,11 @@ int noVB = 0;
 const uint8_t fMaxColWidth(8);
 BPPMap bppMap;
-mutex bppLock;
+boost::mutex bppLock;
 #define DJLOCK_READ 0
 #define DJLOCK_WRITE 1
-mutex djMutex; // lock for djLock, lol.
+boost::mutex djMutex; // lock for djLock, lol.
 std::map<uint64_t, shared_mutex *> djLock; // djLock synchronizes destroy and joiner msgs, see bug 2619
 volatile int32_t asyncCounter;
@@ -187,7 +187,7 @@ pfBlockMap_t pfExtentMap;
 boost::mutex pfMutex; // = PTHREAD_MUTEX_INITIALIZER;
 map<uint32_t, boost::shared_ptr<DictEqualityFilter> > dictEqualityFilters;
-mutex eqFilterMutex;
+boost::mutex eqFilterMutex;
 uint32_t cacheNum(uint64_t lbid)
 {
@@ -1047,7 +1047,7 @@ void loadBlockAsync(uint64_t lbid,
 (void)atomicops::atomicInc(&asyncCounter);
-mutex::scoped_lock sl(*m);
+boost::mutex::scoped_lock sl(*m);
 try
 {
@@ -1134,7 +1134,7 @@ DictScanJob::~DictScanJob()
 void DictScanJob::write(const ByteStream& bs)
 {
-mutex::scoped_lock lk(*fWriteLock);
+boost::mutex::scoped_lock lk(*fWriteLock);
 fIos->write(bs);
 }
@@ -1178,7 +1178,7 @@ int DictScanJob::operator()()
 /* Grab the equality filter if one is specified */
 if (cmd->flags & HAS_EQ_FILTER)
 {
-mutex::scoped_lock sl(eqFilterMutex);
+boost::mutex::scoped_lock sl(eqFilterMutex);
 map<uint32_t, boost::shared_ptr<DictEqualityFilter> >::iterator it;
 it = dictEqualityFilters.find(uniqueId);
@@ -1280,7 +1280,7 @@ struct BPPHandler
 ~BPPHandler()
 {
-mutex::scoped_lock scoped(bppLock);
+boost::mutex::scoped_lock scoped(bppLock);
 for (bppKeysIt = bppKeys.begin() ; bppKeysIt != bppKeys.end(); ++bppKeysIt)
 {
@@ -1383,7 +1383,7 @@ struct BPPHandler
 return -1;
 }
-mutex::scoped_lock scoped(bppLock);
+boost::mutex::scoped_lock scoped(bppLock);
 bppKeysIt = std::find(bppKeys.begin(), bppKeys.end(), key);
 if (bppKeysIt != bppKeys.end())
@@ -1475,7 +1475,7 @@ struct BPPHandler
 }
 }
-mutex::scoped_lock scoped(bppLock);
+boost::mutex::scoped_lock scoped(bppLock);
 key = bpp->getUniqueID();
 bppKeys.push_back(key);
 bool newInsert;
@@ -1504,7 +1504,7 @@ struct BPPHandler
 */
 SBPPV ret;
-mutex::scoped_lock scoped(bppLock);
+boost::mutex::scoped_lock scoped(bppLock);
 it = bppMap.find(uniqueID);
 if (it != bppMap.end())
@@ -1533,7 +1533,7 @@ struct BPPHandler
 inline shared_mutex & getDJLock(uint32_t uniqueID)
 {
-mutex::scoped_lock lk(djMutex);
+boost::mutex::scoped_lock lk(djMutex);
 auto it = djLock.find(uniqueID);
 if (it != djLock.end())
 return *it->second;
@@ -1546,7 +1546,7 @@ struct BPPHandler
 inline void deleteDJLock(uint32_t uniqueID)
 {
-mutex::scoped_lock lk(djMutex);
+boost::mutex::scoped_lock lk(djMutex);
 auto it = djLock.find(uniqueID);
 if (it != djLock.end())
 {
@@ -1607,7 +1607,7 @@ struct BPPHandler
 return -1;
 }
-unique_lock<shared_mutex> lk(getDJLock(uniqueID));
+boost::unique_lock<shared_mutex> lk(getDJLock(uniqueID));
 for (i = 0; i < bppv->get().size(); i++)
@@ -1653,8 +1653,8 @@ struct BPPHandler
 return -1;
 }
-unique_lock<shared_mutex> lk(getDJLock(uniqueID));
-mutex::scoped_lock scoped(bppLock);
+boost::unique_lock<shared_mutex> lk(getDJLock(uniqueID));
+boost::mutex::scoped_lock scoped(bppLock);
 bppKeysIt = std::find(bppKeys.begin(), bppKeys.end(), uniqueID);
@@ -1821,7 +1821,7 @@ private:
 filter->insert(str);
 }
-mutex::scoped_lock sl(eqFilterMutex);
+boost::mutex::scoped_lock sl(eqFilterMutex);
 dictEqualityFilters[uniqueID] = filter;
 }
 };
@@ -1843,7 +1843,7 @@ public:
 bs->advance(sizeof(ISMPacketHeader));
 *bs >> uniqueID;
-mutex::scoped_lock sl(eqFilterMutex);
+boost::mutex::scoped_lock sl(eqFilterMutex);
 it = dictEqualityFilters.find(uniqueID);
 if (it != dictEqualityFilters.end())
@@ -2001,7 +2001,7 @@ struct ReadThread
 // IOSocket. If we end up rotating through multiple output sockets
 // for the same UM, we will use UmSocketSelector to select output.
 SP_UM_IOSOCK outIosDefault(new IOSocket(fIos));
-SP_UM_MUTEX writeLockDefault(new mutex());
+SP_UM_MUTEX writeLockDefault(new boost::mutex());
 bool bRotateDest = fPrimitiveServerPtr->rotatingDestination();
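The getDJLock()/deleteDJLock() hunks above also qualify unique_lock, because std::unique_lock (from <mutex>) and boost::unique_lock are both visible in this file, while shared_mutex stays unqualified since only the Boost type is in scope here. A hedged sketch of the reader/writer pattern these locks implement (the names are illustrative, not the primproc code):

#include <mutex>                          // brings std::unique_lock into view
#include <boost/thread/shared_mutex.hpp>
#include <boost/thread/locks.hpp>

using namespace std;
using namespace boost;

static boost::shared_mutex exampleDJLock;   // stand-in for one entry of djLock

void destroyPath()
{
    // Exclusive ownership; an unqualified 'unique_lock' would be ambiguous here,
    // while 'shared_mutex' is not because only Boost declares it in this sketch.
    boost::unique_lock<shared_mutex> lk(exampleDJLock);
    // ... tear-down work guarded against concurrent joiner messages ...
}

void joinerPath()
{
    boost::shared_lock<shared_mutex> lk(exampleDJLock);  // many readers may hold this
    // ... joiner work that may run concurrently with other joiners ...
}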

View File

@@ -38,7 +38,7 @@ namespace bf = boost::filesystem;
 namespace
 {
-boost::mutex mutex;
+boost::mutex mdfLock;
 storagemanager::MetadataFile::MetadataConfig *inst = NULL;
 uint64_t metadataFilesAccessed = 0;
 }
@@ -50,7 +50,7 @@ MetadataFile::MetadataConfig * MetadataFile::MetadataConfig::get()
 {
 if (inst)
 return inst;
-boost::unique_lock<boost::mutex> s(mutex);
+boost::unique_lock<boost::mutex> s(mdfLock);
 if (inst)
 return inst;
 inst = new MetadataConfig();
@@ -380,9 +380,9 @@ void MetadataFile::breakout(const string &key, vector<string> &ret)
 string MetadataFile::getNewKeyFromOldKey(const string &key, size_t length)
 {
-mutex.lock();
+mdfLock.lock();
 boost::uuids::uuid u = boost::uuids::random_generator()();
-mutex.unlock();
+mdfLock.unlock();
 vector<string> split;
 breakout(key, split);
@@ -393,9 +393,9 @@ string MetadataFile::getNewKeyFromOldKey(const string &key, size_t length)
 string MetadataFile::getNewKey(string sourceName, size_t offset, size_t length)
 {
-mutex.lock();
+mdfLock.lock();
 boost::uuids::uuid u = boost::uuids::random_generator()();
-mutex.unlock();
+mdfLock.unlock();
 stringstream ss;
 for (uint i = 0; i < sourceName.length(); i++)
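This file takes a different route from the rest of the commit: instead of qualifying uses, it renames a file-scope boost::mutex that was itself called 'mutex', since that identifier presumably clashed with the boost::mutex/std::mutex class names made visible elsewhere in the file, so unqualified uses such as 'mutex.lock()' stopped compiling. A rough sketch of the renamed pattern (simplified, not the real MetadataFile code):

#include <boost/thread/mutex.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/random_generator.hpp>

namespace
{
    // Renamed from 'mutex' so the variable no longer competes with the class
    // names boost::mutex and std::mutex, mirroring the hunk above.
    boost::mutex mdfLock;
}

// Hypothetical helper mirroring the guarded UUID generation seen in the diff;
// the original code serializes calls to the generator with this lock.
boost::uuids::uuid makeUuidLocked()
{
    mdfLock.lock();
    boost::uuids::uuid u = boost::uuids::random_generator()();
    mdfLock.unlock();
    return u;
}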

View File

@@ -52,7 +52,7 @@ namespace
 {
 //Only one of the cacheutils fcns can run at a time
-mutex CacheOpsMutex;
+boost::mutex CacheOpsMutex;
 //This global is updated only w/ atomic ops
 volatile uint32_t MultiReturnCode;
@@ -148,7 +148,7 @@ namespace cacheutils
 */
 int flushPrimProcCache()
 {
-mutex::scoped_lock lk(CacheOpsMutex);
+boost::mutex::scoped_lock lk(CacheOpsMutex);
 try
 {
@@ -176,7 +176,7 @@ int flushPrimProcBlocks(const BRM::BlockList_t& list)
 {
 if (list.empty()) return 0;
-mutex::scoped_lock lk(CacheOpsMutex);
+boost::mutex::scoped_lock lk(CacheOpsMutex);
 #if defined(__LP64__) || defined(_WIN64)
@@ -233,7 +233,7 @@ int flushPrimProcAllverBlocks(const vector<LBID_t>& list)
 try
 {
-mutex::scoped_lock lk(CacheOpsMutex);
+boost::mutex::scoped_lock lk(CacheOpsMutex);
 rc = sendToAll(bs);
 return rc;
 }
@@ -252,7 +252,7 @@ int flushOIDsFromCache(const vector<BRM::OID_t>& oids)
 * uint32_t * - OID array
 */
-mutex::scoped_lock lk(CacheOpsMutex, defer_lock_t());
+boost::mutex::scoped_lock lk(CacheOpsMutex, boost::defer_lock_t());
 ByteStream bs;
 ISMPacketHeader ism;
@@ -281,7 +281,7 @@ int flushPartition(const std::vector<BRM::OID_t>& oids, set<BRM::LogicalPartitio
 * uint32_t * - OID array
 */
-mutex::scoped_lock lk(CacheOpsMutex, defer_lock_t());
+boost::mutex::scoped_lock lk(CacheOpsMutex, boost::defer_lock_t());
 ByteStream bs;
 ISMPacketHeader ism;
@@ -309,7 +309,7 @@ int dropPrimProcFdCache()
 try
 {
-mutex::scoped_lock lk(CacheOpsMutex);
+boost::mutex::scoped_lock lk(CacheOpsMutex);
 int rc = sendToAll(bs);
 return rc;
 }

View File

@@ -81,7 +81,7 @@ Config* Config::makeConfig(const string& cf)
 Config* Config::makeConfig(const char* cf)
 {
-mutex::scoped_lock lk(fInstanceMapMutex);
+boost::mutex::scoped_lock lk(fInstanceMapMutex);
 static string defaultFilePath;
@@ -218,7 +218,7 @@ void Config::closeConfig(void)
 const string Config::getConfig(const string& section, const string& name)
 {
-recursive_mutex::scoped_lock lk(fLock);
+boost::recursive_mutex::scoped_lock lk(fLock);
 if (section.length() == 0 || name.length() == 0)
 throw invalid_argument("Config::getConfig: both section and name must have a length");
@@ -245,7 +245,7 @@ const string Config::getConfig(const string& section, const string& name)
 void Config::getConfig(const string& section, const string& name, vector<string>& values)
 {
-recursive_mutex::scoped_lock lk(fLock);
+boost::recursive_mutex::scoped_lock lk(fLock);
 if (section.length() == 0)
 throw invalid_argument("Config::getConfig: section must have a length");
@@ -270,7 +270,7 @@ void Config::getConfig(const string& section, const string& name, vector<string>
 void Config::setConfig(const string& section, const string& name, const string& value)
 {
-recursive_mutex::scoped_lock lk(fLock);
+boost::recursive_mutex::scoped_lock lk(fLock);
 if (section.length() == 0 || name.length() == 0 )
 throw invalid_argument("Config::setConfig: all of section and name must have a length");
@@ -300,7 +300,7 @@ void Config::setConfig(const string& section, const string& name, const string&
 void Config::delConfig(const string& section, const string& name)
 {
-recursive_mutex::scoped_lock lk(fLock);
+boost::recursive_mutex::scoped_lock lk(fLock);
 if (section.length() == 0 || name.length() == 0)
 throw invalid_argument("Config::delConfig: both section and name must have a length");
@@ -328,7 +328,7 @@ void Config::delConfig(const string& section, const string& name)
 void Config::writeConfig(const string& configFile) const
 {
-recursive_mutex::scoped_lock lk(fLock);
+boost::recursive_mutex::scoped_lock lk(fLock);
 FILE* fi;
 if (fDoc == 0)
@@ -445,7 +445,7 @@ void Config::writeConfig(const string& configFile) const
 void Config::write(void) const
 {
-mutex::scoped_lock lk(fWriteXmlLock);
+boost::mutex::scoped_lock lk(fWriteXmlLock);
 #ifdef _MSC_VER
 writeConfig(fConfigFile);
 #else
@@ -540,7 +540,7 @@ void Config::writeConfigFile(messageqcpp::ByteStream msg) const
 /* static */
 void Config::deleteInstanceMap()
 {
-mutex::scoped_lock lk(fInstanceMapMutex);
+boost::mutex::scoped_lock lk(fInstanceMapMutex);
 for (Config::configMap_t::iterator iter = fInstanceMap.begin();
 iter != fInstanceMap.end(); ++iter)
@@ -602,7 +602,7 @@ int64_t Config::fromText(const std::string& text)
 time_t Config::getCurrentMTime()
 {
-recursive_mutex::scoped_lock lk(fLock);
+boost::recursive_mutex::scoped_lock lk(fLock);
 struct stat statbuf;
@@ -614,7 +614,7 @@ time_t Config::getCurrentMTime()
 const vector<string> Config::enumConfig()
 {
-recursive_mutex::scoped_lock lk(fLock);
+boost::recursive_mutex::scoped_lock lk(fLock);
 if (fDoc == 0)
 {
@@ -638,7 +638,7 @@ const vector<string> Config::enumConfig()
 const vector<string> Config::enumSection(const string& section)
 {
-recursive_mutex::scoped_lock lk(fLock);
+boost::recursive_mutex::scoped_lock lk(fLock);
 if (fDoc == 0)
 {

View File

@@ -47,11 +47,11 @@ namespace logging
 {
 IDBErrorInfo* IDBErrorInfo::fInstance = 0;
-mutex mx;
+boost::mutex mx;
 IDBErrorInfo* IDBErrorInfo::instance()
 {
-mutex::scoped_lock lk(mx);
+boost::mutex::scoped_lock lk(mx);
 if (!fInstance)
 fInstance = new IDBErrorInfo();

View File

@@ -51,7 +51,7 @@ const string Logger::logMessage(LOG_TYPE logLevel, Message::MessageID mid, const
 return logMessage(logLevel, msg, logInfo);
 /*
-mutex::scoped_lock lk(fLogLock);
+boost::mutex::scoped_lock lk(fLogLock);
 fMl1.logData(logInfo);
 switch (logLevel)
@@ -79,7 +79,7 @@ const string Logger::logMessage(LOG_TYPE logLevel, Message::MessageID mid, const
 const std::string Logger::logMessage(LOG_TYPE logLevel, const Message& msg, const LoggingID& logInfo)
 {
-mutex::scoped_lock lk(fLogLock);
+boost::mutex::scoped_lock lk(fLogLock);
 fMl1.logData(logInfo);
 switch (logLevel)

View File

@@ -44,7 +44,7 @@ using namespace config;
 namespace
 {
-mutex mx;
+boost::mutex mx;
 bool catalogLoaded = false;
 typedef map<int, string> CatMap;
@@ -190,7 +190,7 @@ const string Message::lookupMessage(const MessageID& msgid)
 {
 if (!catalogLoaded)
 {
-mutex::scoped_lock lock(mx);
+boost::mutex::scoped_lock lock(mx);
 if (!catalogLoaded)
 {

View File

@@ -98,7 +98,7 @@ uint64_t StringStore::storeString(const uint8_t* data, uint32_t len)
 return numeric_limits<uint64_t>::max();
 //@bug6065, make StringStore::storeString() thread safe
-boost::mutex::scoped_lock lk(fMutex, defer_lock);
+boost::mutex::scoped_lock lk(fMutex, boost::defer_lock);
 if (fUseStoreStringMutex)
 lk.lock();
@@ -254,7 +254,7 @@ uint32_t UserDataStore::storeUserData(mcsv1sdk::mcsv1Context& context,
 return numeric_limits<uint32_t>::max();
 }
-boost::mutex::scoped_lock lk(fMutex, defer_lock);
+boost::mutex::scoped_lock lk(fMutex, boost::defer_lock);
 if (fUseUserDataMutex)
 lk.lock();

View File

@@ -43,14 +43,14 @@ namespace startup
 {
 /* static */
-mutex StartUp::fTmpDirLock;
+boost::mutex StartUp::fTmpDirLock;
 /* static */
 string* StartUp::fTmpDirp = 0;
 /* static */
 const string StartUp::tmpDir()
 {
-mutex::scoped_lock lk(fTmpDirLock);
+boost::mutex::scoped_lock lk(fTmpDirLock);
 if (fTmpDirp)
 return *fTmpDirp;

View File

@@ -73,7 +73,7 @@ PriorityThreadPool::~PriorityThreadPool()
 void PriorityThreadPool::addJob(const Job& job, bool useLock)
 {
 boost::thread* newThread;
-mutex::scoped_lock lk(mutex, defer_lock_t());
+boost::mutex::scoped_lock lk(mutex, boost::defer_lock_t());
 if (useLock)
 lk.lock();
@@ -115,7 +115,7 @@ void PriorityThreadPool::removeJobs(uint32_t id)
 {
 list<Job>::iterator it;
-mutex::scoped_lock lk(mutex);
+boost::mutex::scoped_lock lk(mutex);
 for (uint32_t i = 0; i < _COUNT; i++)
 for (it = jobQueues[i].begin(); it != jobQueues[i].end();)
@@ -152,7 +152,7 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw()
 while (!_stop)
 {
-mutex::scoped_lock lk(mutex);
+boost::mutex::scoped_lock lk(mutex);
 queue = pickAQueue(preferredQueue);

View File

@@ -45,7 +45,7 @@ AutoincrementManager::~AutoincrementManager()
 void AutoincrementManager::startSequence(uint32_t oid, uint64_t firstNum, uint32_t colWidth,
 execplan::CalpontSystemCatalog::ColDataType colDataType)
 {
-mutex::scoped_lock lk(lock);
+boost::mutex::scoped_lock lk(lock);
 map<uint64_t, sequence>::iterator it;
 sequence s;
@@ -70,7 +70,7 @@ void AutoincrementManager::startSequence(uint32_t oid, uint64_t firstNum, uint32
 bool AutoincrementManager::getAIRange(uint32_t oid, uint64_t count, uint64_t* firstNum)
 {
-mutex::scoped_lock lk(lock);
+boost::mutex::scoped_lock lk(lock);
 map<uint64_t, sequence>::iterator it;
 it = sequences.find(oid);
@@ -91,7 +91,7 @@ bool AutoincrementManager::getAIRange(uint32_t oid, uint64_t count, uint64_t* fi
 void AutoincrementManager::resetSequence(uint32_t oid, uint64_t value)
 {
-mutex::scoped_lock lk(lock);
+boost::mutex::scoped_lock lk(lock);
 map<uint64_t, sequence>::iterator it;
 it = sequences.find(oid);
@@ -104,7 +104,7 @@ void AutoincrementManager::resetSequence(uint32_t oid, uint64_t value)
 void AutoincrementManager::getLock(uint32_t oid)
 {
-mutex::scoped_lock lk(lock);
+boost::mutex::scoped_lock lk(lock);
 map<uint64_t, sequence>::iterator it;
 ptime stealTime = microsec_clock::local_time() + seconds(lockTime);
@@ -130,7 +130,7 @@ void AutoincrementManager::getLock(uint32_t oid)
 void AutoincrementManager::releaseLock(uint32_t oid)
 {
-mutex::scoped_lock lk(lock);
+boost::mutex::scoped_lock lk(lock);
 map<uint64_t, sequence>::iterator it;
 it = sequences.find(oid);
@@ -145,7 +145,7 @@ void AutoincrementManager::releaseLock(uint32_t oid)
 void AutoincrementManager::deleteSequence(uint32_t oid)
 {
-mutex::scoped_lock lk(lock);
+boost::mutex::scoped_lock lk(lock);
 map<uint64_t, sequence>::iterator it;
 it = sequences.find(oid);

View File

@@ -1439,7 +1439,7 @@ void ExtentMap::save(const string& filename)
 /* always returns holding the EM lock, and with the EM seg mapped */
 void ExtentMap::grabEMEntryTable(OPS op)
 {
-mutex::scoped_lock lk(mutex);
+boost::mutex::scoped_lock lk(mutex);
 if (op == READ)
 fEMShminfo = fMST.getTable_read(MasterSegmentTable::EMTable);
@@ -1496,7 +1496,7 @@ void ExtentMap::grabEMEntryTable(OPS op)
 /* always returns holding the FL lock */
 void ExtentMap::grabFreeList(OPS op)
 {
-mutex::scoped_lock lk(mutex, defer_lock);
+boost::mutex::scoped_lock lk(mutex, boost::defer_lock);
 if (op == READ)
 {

View File

@@ -237,7 +237,7 @@ const QueryContext SessionManagerServer::verID()
 {
 QueryContext ret;
-mutex::scoped_lock lk(mutex);
+boost::mutex::scoped_lock lk(mutex);
 ret.currentScn = _verID;
 for (iterator i = activeTxns.begin(); i != activeTxns.end(); ++i)
@@ -250,7 +250,7 @@ const QueryContext SessionManagerServer::sysCatVerID()
 {
 QueryContext ret;
-mutex::scoped_lock lk(mutex);
+boost::mutex::scoped_lock lk(mutex);
 ret.currentScn = _sysCatVerID;
 for (iterator i = activeTxns.begin(); i != activeTxns.end(); ++i)
@@ -264,7 +264,7 @@ const TxnID SessionManagerServer::newTxnID(const SID session, bool block, bool i
 TxnID ret; //ctor must set valid = false
 iterator it;
-mutex::scoped_lock lk(mutex);
+boost::mutex::scoped_lock lk(mutex);
 // if it already has a txn...
 it = activeTxns.find(session);
@@ -299,7 +299,7 @@ const TxnID SessionManagerServer::newTxnID(const SID session, bool block, bool i
 void SessionManagerServer::finishTransaction(TxnID& txn)
 {
 iterator it;
-mutex::scoped_lock lk(mutex);
+boost::mutex::scoped_lock lk(mutex);
 bool found = false;
 if (!txn.valid)
@@ -335,7 +335,7 @@ const TxnID SessionManagerServer::getTxnID(const SID session)
 TxnID ret;
 iterator it;
-mutex::scoped_lock lk(mutex);
+boost::mutex::scoped_lock lk(mutex);
 it = activeTxns.find(session);
@@ -352,7 +352,7 @@ shared_array<SIDTIDEntry> SessionManagerServer::SIDTIDMap(int& len)
 {
 int j;
 shared_array<SIDTIDEntry> ret;
-mutex::scoped_lock lk(mutex);
+boost::mutex::scoped_lock lk(mutex);
 iterator it;
 ret.reset(new SIDTIDEntry[activeTxns.size()]);
@@ -371,7 +371,7 @@ shared_array<SIDTIDEntry> SessionManagerServer::SIDTIDMap(int& len)
 void SessionManagerServer::setSystemState(uint32_t state)
 {
-mutex::scoped_lock lk(mutex);
+boost::mutex::scoped_lock lk(mutex);
 systemState |= state;
 saveSystemState();
@@ -379,7 +379,7 @@ void SessionManagerServer::setSystemState(uint32_t state)
 void SessionManagerServer::clearSystemState(uint32_t state)
 {
-mutex::scoped_lock lk(mutex);
+boost::mutex::scoped_lock lk(mutex);
 systemState &= ~state;
 saveSystemState();
@@ -387,7 +387,7 @@ void SessionManagerServer::clearSystemState(uint32_t state)
 uint32_t SessionManagerServer::getTxnCount()
 {
-mutex::scoped_lock lk(mutex);
+boost::mutex::scoped_lock lk(mutex);
 return activeTxns.size();
 }

View File

@ -40,7 +40,7 @@ namespace BRM
TableLockServer::TableLockServer(SessionManagerServer* sm) : sms(sm) TableLockServer::TableLockServer(SessionManagerServer* sm) : sms(sm)
{ {
mutex::scoped_lock lk(mutex); boost::mutex::scoped_lock lk(mutex);
config::Config* config = config::Config::makeConfig(); config::Config* config = config::Config::makeConfig();
filename = config->getConfig("SystemConfig", "TableLockSaveFile"); filename = config->getConfig("SystemConfig", "TableLockSaveFile");
@ -135,7 +135,7 @@ uint64_t TableLockServer::lock(TableLockInfo* tli)
set<uint32_t> dbroots; set<uint32_t> dbroots;
lit_t it; lit_t it;
uint32_t i; uint32_t i;
mutex::scoped_lock lk(mutex); boost::mutex::scoped_lock lk(mutex);
for (i = 0; i < tli->dbrootList.size(); i++) for (i = 0; i < tli->dbrootList.size(); i++)
dbroots.insert(tli->dbrootList[i]); dbroots.insert(tli->dbrootList[i]);
@ -177,7 +177,7 @@ bool TableLockServer::unlock(uint64_t id)
std::map<uint64_t, TableLockInfo>::iterator it; std::map<uint64_t, TableLockInfo>::iterator it;
TableLockInfo tli; TableLockInfo tli;
mutex::scoped_lock lk(mutex); boost::mutex::scoped_lock lk(mutex);
it = locks.find(id); it = locks.find(id);
if (it != locks.end()) if (it != locks.end())
@ -204,7 +204,7 @@ bool TableLockServer::unlock(uint64_t id)
bool TableLockServer::changeState(uint64_t id, LockState state) bool TableLockServer::changeState(uint64_t id, LockState state)
{ {
lit_t it; lit_t it;
mutex::scoped_lock lk(mutex); boost::mutex::scoped_lock lk(mutex);
LockState old; LockState old;
it = locks.find(id); it = locks.find(id);
@ -232,7 +232,7 @@ bool TableLockServer::changeOwner(uint64_t id, const string& ownerName, uint32_t
int32_t txnID) int32_t txnID)
{ {
lit_t it; lit_t it;
mutex::scoped_lock lk(mutex); boost::mutex::scoped_lock lk(mutex);
string oldName; string oldName;
uint32_t oldPID; uint32_t oldPID;
int32_t oldSession; int32_t oldSession;
@ -271,7 +271,7 @@ bool TableLockServer::changeOwner(uint64_t id, const string& ownerName, uint32_t
vector<TableLockInfo> TableLockServer::getAllLocks() const vector<TableLockInfo> TableLockServer::getAllLocks() const
{ {
vector<TableLockInfo> ret; vector<TableLockInfo> ret;
mutex::scoped_lock lk(mutex); boost::mutex::scoped_lock lk(mutex);
constlit_t it; constlit_t it;
for (it = locks.begin(); it != locks.end(); ++it) for (it = locks.begin(); it != locks.end(); ++it)
@ -284,7 +284,7 @@ void TableLockServer::releaseAllLocks()
{ {
std::map<uint64_t, TableLockInfo> tmp; std::map<uint64_t, TableLockInfo> tmp;
mutex::scoped_lock lk(mutex); boost::mutex::scoped_lock lk(mutex);
tmp.swap(locks); tmp.swap(locks);
try try
@ -301,7 +301,7 @@ void TableLockServer::releaseAllLocks()
bool TableLockServer::getLockInfo(uint64_t id, TableLockInfo* out) const bool TableLockServer::getLockInfo(uint64_t id, TableLockInfo* out) const
{ {
constlit_t it; constlit_t it;
mutex::scoped_lock lk(mutex); boost::mutex::scoped_lock lk(mutex);
it = locks.find(id); it = locks.find(id);

View File

@ -382,7 +382,7 @@ void WEClients::Listen ( boost::shared_ptr<MessageQueueClient> client, uint32_t
Error: Error:
// error condition! push 0 length bs to messagequeuemap and // error condition! push 0 length bs to messagequeuemap and
// eventually let jobstep error out. // eventually let jobstep error out.
mutex::scoped_lock lk(fMlock); boost::mutex::scoped_lock lk(fMlock);
MessageQueueMap::iterator map_tok; MessageQueueMap::iterator map_tok;
sbs.reset(new ByteStream(0)); sbs.reset(new ByteStream(0));
@ -398,7 +398,7 @@ Error:
// reset the pmconnection map // reset the pmconnection map
{ {
mutex::scoped_lock onErrLock(fOnErrMutex); boost::mutex::scoped_lock onErrLock(fOnErrMutex);
string moduleName = client->moduleName(); string moduleName = client->moduleName();
ClientList::iterator itor = fPmConnections.begin(); ClientList::iterator itor = fPmConnections.begin();
@ -430,13 +430,13 @@ void WEClients::addQueue(uint32_t key)
{ {
bool b; bool b;
mutex* lock = new mutex(); boost::mutex* lock = new boost::mutex();
condition* cond = new condition(); condition* cond = new condition();
boost::shared_ptr<MQE> mqe(new MQE(pmCount)); boost::shared_ptr<MQE> mqe(new MQE(pmCount));
mqe->queue = WESMsgQueue(lock, cond); mqe->queue = WESMsgQueue(lock, cond);
mutex::scoped_lock lk ( fMlock ); boost::mutex::scoped_lock lk ( fMlock );
b = fSessionMessages.insert(pair<uint32_t, boost::shared_ptr<MQE> >(key, mqe)).second; b = fSessionMessages.insert(pair<uint32_t, boost::shared_ptr<MQE> >(key, mqe)).second;
if (!b) if (!b)
@ -449,7 +449,7 @@ void WEClients::addQueue(uint32_t key)
void WEClients::removeQueue(uint32_t key) void WEClients::removeQueue(uint32_t key)
{ {
mutex::scoped_lock lk(fMlock); boost::mutex::scoped_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(key); MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
if (map_tok == fSessionMessages.end()) if (map_tok == fSessionMessages.end())
@ -462,7 +462,7 @@ void WEClients::removeQueue(uint32_t key)
void WEClients::shutdownQueue(uint32_t key) void WEClients::shutdownQueue(uint32_t key)
{ {
mutex::scoped_lock lk(fMlock); boost::mutex::scoped_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(key); MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
if (map_tok == fSessionMessages.end()) if (map_tok == fSessionMessages.end())
@ -477,7 +477,7 @@ void WEClients::read(uint32_t key, SBS& bs)
boost::shared_ptr<MQE> mqe; boost::shared_ptr<MQE> mqe;
//Find the StepMsgQueueList for this session //Find the StepMsgQueueList for this session
mutex::scoped_lock lk(fMlock); boost::mutex::scoped_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(key); MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
if (map_tok == fSessionMessages.end()) if (map_tok == fSessionMessages.end())
@ -557,7 +557,7 @@ void WEClients::addDataToOutput(SBS sbs, uint32_t connIndex)
*sbs >> uniqueId; *sbs >> uniqueId;
boost::shared_ptr<MQE> mqe; boost::shared_ptr<MQE> mqe;
mutex::scoped_lock lk(fMlock); boost::mutex::scoped_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId); MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId);
if (map_tok == fSessionMessages.end()) if (map_tok == fSessionMessages.end())
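
The addQueue() hunk above allocates a boost::mutex and a condition on the heap and hands the pair to a per-key queue object before registering it in the session map under fMlock. A rough sketch of that shape (the real MQE/WESMsgQueue types carry more state; shared_ptr is used here only to keep the example leak-free):

#include <boost/shared_ptr.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/mutex.hpp>
#include <map>
#include <queue>
#include <utility>

// Illustrative stand-in for a per-session message queue entry.
struct QueueEntry
{
    boost::shared_ptr<boost::mutex>     lock;      // guards 'messages'
    boost::shared_ptr<boost::condition> cond;      // signals arrival of messages
    std::queue<int>                     messages;  // payload type is a placeholder
};

std::map<unsigned, QueueEntry> sessionQueues;
boost::mutex                   mapLock;            // guards the map itself

void addQueue(unsigned key)
{
    QueueEntry entry;
    entry.lock.reset(new boost::mutex());
    entry.cond.reset(new boost::condition());

    boost::mutex::scoped_lock lk(mapLock);         // qualified, as in the hunks above
    sessionQueues.insert(std::make_pair(key, entry));
}

int main()
{
    addQueue(42);
    return sessionQueues.count(42) == 1 ? 0 : 1;
}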

View File

@ -77,7 +77,7 @@ const string PlanFileName("/redistribute.plan");
RedistributeControl* RedistributeControl::instance() RedistributeControl* RedistributeControl::instance()
{ {
// The constructor is protected by instanceMutex lock. // The constructor is protected by instanceMutex lock.
mutex::scoped_lock lock(instanceMutex); boost::mutex::scoped_lock lock(instanceMutex);
if (fInstance == NULL) if (fInstance == NULL)
fInstance = new RedistributeControl(); fInstance = new RedistributeControl();
@ -132,7 +132,7 @@ RedistributeControl::~RedistributeControl()
int RedistributeControl::handleUIMsg(messageqcpp::ByteStream& bs, messageqcpp::IOSocket& so) int RedistributeControl::handleUIMsg(messageqcpp::ByteStream& bs, messageqcpp::IOSocket& so)
{ {
mutex::scoped_lock sessionLock(fSessionMutex); boost::mutex::scoped_lock sessionLock(fSessionMutex);
uint32_t status = RED_STATE_UNDEF; uint32_t status = RED_STATE_UNDEF;
const RedistributeMsgHeader* h = (const RedistributeMsgHeader*) bs.buf(); const RedistributeMsgHeader* h = (const RedistributeMsgHeader*) bs.buf();
@ -402,7 +402,7 @@ uint32_t RedistributeControl::getCurrentState()
{ {
uint32_t status = RED_STATE_UNDEF; uint32_t status = RED_STATE_UNDEF;
ostringstream oss; ostringstream oss;
mutex::scoped_lock lock(fInfoFileMutex); boost::mutex::scoped_lock lock(fInfoFileMutex);
if (!fInfoFilePtr) if (!fInfoFilePtr)
{ {
@ -475,7 +475,7 @@ bool RedistributeControl::getStartOptions(messageqcpp::ByteStream& bs)
void RedistributeControl::updateState(uint32_t s) void RedistributeControl::updateState(uint32_t s)
{ {
mutex::scoped_lock lock(fInfoFileMutex); boost::mutex::scoped_lock lock(fInfoFileMutex);
// allowed state change: // allowed state change:
// idle -> active // idle -> active
@ -666,7 +666,7 @@ void RedistributeControl::updateState(uint32_t s)
void RedistributeControl::setEntryCount(uint32_t entryCount) void RedistributeControl::setEntryCount(uint32_t entryCount)
{ {
mutex::scoped_lock lock(fInfoFileMutex); boost::mutex::scoped_lock lock(fInfoFileMutex);
fRedistributeInfo.planned = entryCount; fRedistributeInfo.planned = entryCount;
rewind(fInfoFilePtr); rewind(fInfoFilePtr);
@ -677,7 +677,7 @@ void RedistributeControl::setEntryCount(uint32_t entryCount)
void RedistributeControl::updateProgressInfo(uint32_t s, time_t t) void RedistributeControl::updateProgressInfo(uint32_t s, time_t t)
{ {
mutex::scoped_lock lock(fInfoFileMutex); boost::mutex::scoped_lock lock(fInfoFileMutex);
fRedistributeInfo.endTime = t; fRedistributeInfo.endTime = t;
switch (s) switch (s)
@ -703,7 +703,7 @@ void RedistributeControl::updateProgressInfo(uint32_t s, time_t t)
int RedistributeControl::handleJobMsg(messageqcpp::ByteStream& bs, messageqcpp::IOSocket& so) int RedistributeControl::handleJobMsg(messageqcpp::ByteStream& bs, messageqcpp::IOSocket& so)
{ {
// mutex::scoped_lock jobLock(fJobMutex); // boost::mutex::scoped_lock jobLock(fJobMutex);
uint32_t status = RED_TRANS_SUCCESS; uint32_t status = RED_TRANS_SUCCESS;
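
RedistributeControl::instance() above is a lazily constructed singleton whose first construction is serialized by a mutex. A bare-bones version of that pattern, with invented names:

#include <boost/thread/mutex.hpp>

class Control
{
public:
    static Control* instance()
    {
        boost::mutex::scoped_lock lock(instanceMutex);  // serializes first construction
        if (fInstance == 0)
            fInstance = new Control();                  // built at most once
        return fInstance;
    }

private:
    Control() {}
    static boost::mutex instanceMutex;
    static Control*     fInstance;
};

boost::mutex Control::instanceMutex;
Control*     Control::fInstance = 0;

int main()
{
    return Control::instance() == Control::instance() ? 0 : 1;
}

Taking the lock on every call is the simple, safe choice; the cost is one uncontended lock per instance() call.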

View File

@ -73,7 +73,7 @@ string RedistributeControlThread::fWesInUse;
void RedistributeControlThread::setStopAction(bool s) void RedistributeControlThread::setStopAction(bool s)
{ {
mutex::scoped_lock lock(fActionMutex); boost::mutex::scoped_lock lock(fActionMutex);
fStopAction = s; fStopAction = s;
} }
@ -153,7 +153,7 @@ void RedistributeControlThread::doRedistribute()
fControl->logMessage(fErrorMsg + " @controlThread::doRedistribute"); fControl->logMessage(fErrorMsg + " @controlThread::doRedistribute");
{ {
mutex::scoped_lock lock(fActionMutex); boost::mutex::scoped_lock lock(fActionMutex);
fWesInUse.clear(); fWesInUse.clear();
} }
} }
@ -776,7 +776,7 @@ int RedistributeControlThread::connectToWes(int dbroot)
try try
{ {
mutex::scoped_lock lock(fActionMutex); boost::mutex::scoped_lock lock(fActionMutex);
fWesInUse = oss.str(); fWesInUse = oss.str();
fMsgQueueClient.reset(new MessageQueueClient(fWesInUse, fConfig)); fMsgQueueClient.reset(new MessageQueueClient(fWesInUse, fConfig));
} }
@ -793,7 +793,7 @@ int RedistributeControlThread::connectToWes(int dbroot)
if (ret != 0) if (ret != 0)
{ {
mutex::scoped_lock lock(fActionMutex); boost::mutex::scoped_lock lock(fActionMutex);
fWesInUse.clear(); fWesInUse.clear();
fMsgQueueClient.reset(); fMsgQueueClient.reset();
@ -808,7 +808,7 @@ void RedistributeControlThread::doStopAction()
fConfig = Config::makeConfig(); fConfig = Config::makeConfig();
fControl = RedistributeControl::instance(); fControl = RedistributeControl::instance();
mutex::scoped_lock lock(fActionMutex); boost::mutex::scoped_lock lock(fActionMutex);
if (!fWesInUse.empty()) if (!fWesInUse.empty())
{ {

View File

@ -99,7 +99,7 @@ RedistributeWorkerThread::RedistributeWorkerThread(ByteStream& bs, IOSocket& ios
RedistributeWorkerThread::~RedistributeWorkerThread() RedistributeWorkerThread::~RedistributeWorkerThread()
{ {
mutex::scoped_lock lock(fActionMutex); boost::mutex::scoped_lock lock(fActionMutex);
if (fNewFilePtr) if (fNewFilePtr)
closeFile(fNewFilePtr); closeFile(fNewFilePtr);
@ -140,7 +140,7 @@ void RedistributeWorkerThread::handleRequest()
{ {
// clear stop flag if ever set. // clear stop flag if ever set.
{ {
mutex::scoped_lock lock(fActionMutex); boost::mutex::scoped_lock lock(fActionMutex);
fStopAction = false; fStopAction = false;
fCommitted = false; fCommitted = false;
} }
@ -188,7 +188,7 @@ void RedistributeWorkerThread::handleRequest()
sendResponse(RED_ACTN_REQUEST); sendResponse(RED_ACTN_REQUEST);
mutex::scoped_lock lock(fActionMutex); boost::mutex::scoped_lock lock(fActionMutex);
fWesInUse.clear(); fWesInUse.clear();
fMsgQueueClient.reset(); fMsgQueueClient.reset();
@ -910,7 +910,7 @@ int RedistributeWorkerThread::updateDbrm()
{ {
int rc1 = BRM::ERR_OK; int rc1 = BRM::ERR_OK;
int rc2 = BRM::ERR_OK; int rc2 = BRM::ERR_OK;
mutex::scoped_lock lock(fActionMutex); boost::mutex::scoped_lock lock(fActionMutex);
// cannot stop after extent map is updated. // cannot stop after extent map is updated.
if (!fStopAction) if (!fStopAction)
@ -1113,7 +1113,7 @@ void RedistributeWorkerThread::addToDirSet(const char* fileName, bool isSource)
void RedistributeWorkerThread::handleStop() void RedistributeWorkerThread::handleStop()
{ {
mutex::scoped_lock lock(fActionMutex); boost::mutex::scoped_lock lock(fActionMutex);
// cannot stop after extent map is updated. // cannot stop after extent map is updated.
if (!fCommitted) if (!fCommitted)

View File

@ -96,7 +96,7 @@ void WECpiFeederThread::add2MsgQueue(ByteStream& Ibs)
//TODO creating copy is NOT good; later read from socket using a SBS //TODO creating copy is NOT good; later read from socket using a SBS
messageqcpp::SBS aSbs(new messageqcpp::ByteStream(Ibs)); messageqcpp::SBS aSbs(new messageqcpp::ByteStream(Ibs));
Ibs.reset(); //forcefully clearing it Ibs.reset(); //forcefully clearing it
mutex::scoped_lock aLock(fMsgQMutex); boost::mutex::scoped_lock aLock(fMsgQMutex);
//cout << "pushing to the MsgQueue" << endl; //cout << "pushing to the MsgQueue" << endl;
fMsgQueue.push(aSbs); fMsgQueue.push(aSbs);
fFeederCond.notify_one(); // as per preference of Damon fFeederCond.notify_one(); // as per preference of Damon
@ -111,7 +111,7 @@ void WECpiFeederThread::feedData2Cpi()
while (isContinue()) while (isContinue())
{ {
mutex::scoped_lock aLock(fMsgQMutex); boost::mutex::scoped_lock aLock(fMsgQMutex);
if (fMsgQueue.empty()) if (fMsgQueue.empty())
{ {
@ -163,7 +163,7 @@ void WECpiFeederThread::feedData2Cpi()
bool WECpiFeederThread::isMsgQueueEmpty() bool WECpiFeederThread::isMsgQueueEmpty()
{ {
bool aRet = false; bool aRet = false;
mutex::scoped_lock aLock(fMsgQMutex); boost::mutex::scoped_lock aLock(fMsgQMutex);
aRet = fMsgQueue.empty(); aRet = fMsgQueue.empty();
aLock.unlock(); aLock.unlock();
return aRet; return aRet;
@ -173,7 +173,7 @@ bool WECpiFeederThread::isMsgQueueEmpty()
void WECpiFeederThread::stopThread() void WECpiFeederThread::stopThread()
{ {
mutex::scoped_lock aCondLock(fContMutex); boost::mutex::scoped_lock aCondLock(fContMutex);
fContinue = false; fContinue = false;
aCondLock.unlock(); aCondLock.unlock();
fFeederCond.notify_all(); fFeederCond.notify_all();
@ -184,7 +184,7 @@ void WECpiFeederThread::stopThread()
bool WECpiFeederThread::isContinue() bool WECpiFeederThread::isContinue()
{ {
mutex::scoped_lock aCondLock(fContMutex); boost::mutex::scoped_lock aCondLock(fContMutex);
return fContinue; return fContinue;
} }
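
WECpiFeederThread above is one half of a producer/consumer handshake: add2MsgQueue() pushes under fMsgQMutex and signals fFeederCond, while feedData2Cpi() blocks on the same condition until data arrives or the thread is told to stop. A self-contained sketch of that pattern (names and payload type are placeholders):

#include <boost/thread/condition.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
#include <queue>

std::queue<int>  msgQueue;           // placeholder payload
boost::mutex     msgQMutex;
boost::condition feederCond;
bool             keepRunning = true;

void produce(int value)
{
    boost::mutex::scoped_lock lk(msgQMutex);
    msgQueue.push(value);
    feederCond.notify_one();         // wake the waiting consumer
}

void consume()
{
    for (;;)
    {
        boost::mutex::scoped_lock lk(msgQMutex);
        while (msgQueue.empty() && keepRunning)
            feederCond.wait(lk);     // releases msgQMutex while blocked
        if (msgQueue.empty())
            return;                  // shutdown requested and nothing left to do
        int value = msgQueue.front();
        msgQueue.pop();
        lk.unlock();                 // do the slow work outside the critical section
        (void)value;                 // ... feed the value onward ...
    }
}

int main()
{
    boost::thread consumer(consume);
    for (int i = 0; i < 3; ++i)
        produce(i);
    {
        boost::mutex::scoped_lock lk(msgQMutex);
        keepRunning = false;
    }
    feederCond.notify_all();         // let the consumer observe the stop flag
    consumer.join();
    return 0;
}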

View File

@ -585,7 +585,7 @@ void WEDataLoader::onCpimportSuccess()
if (aRet) if (aRet)
{ {
mutex::scoped_lock aLock(fClntMsgMutex); boost::mutex::scoped_lock aLock(fClntMsgMutex);
//aBrmRptParser.unserialize(obs); - was for testing //aBrmRptParser.unserialize(obs); - was for testing
updateTxBytes(obs.length()); updateTxBytes(obs.length());
@ -622,7 +622,7 @@ void WEDataLoader::onCpimportSuccess()
obs.reset(); obs.reset();
obs << (ByteStream::byte)WE_CLT_SRV_CPIPASS; obs << (ByteStream::byte)WE_CLT_SRV_CPIPASS;
obs << (ByteStream::byte)fPmId; // PM id obs << (ByteStream::byte)fPmId; // PM id
mutex::scoped_lock aLock(fClntMsgMutex); boost::mutex::scoped_lock aLock(fClntMsgMutex);
updateTxBytes(obs.length()); updateTxBytes(obs.length());
try try
@ -679,7 +679,7 @@ void WEDataLoader::onCpimportFailure()
if (aRet) if (aRet)
{ {
mutex::scoped_lock aLock(fClntMsgMutex); boost::mutex::scoped_lock aLock(fClntMsgMutex);
updateTxBytes(obs.length()); updateTxBytes(obs.length());
try try
@ -727,7 +727,7 @@ void WEDataLoader::sendCpimportFailureNotice()
ByteStream obs; ByteStream obs;
obs << (ByteStream::byte)WE_CLT_SRV_CPIFAIL; obs << (ByteStream::byte)WE_CLT_SRV_CPIFAIL;
obs << (ByteStream::byte)fPmId; // PM id obs << (ByteStream::byte)fPmId; // PM id
mutex::scoped_lock aLock(fClntMsgMutex); boost::mutex::scoped_lock aLock(fClntMsgMutex);
updateTxBytes(obs.length()); updateTxBytes(obs.length());
try try
@ -773,7 +773,7 @@ void WEDataLoader::onReceiveKeepAlive(ByteStream& Ibs)
ByteStream obs; ByteStream obs;
obs << (ByteStream::byte)WE_CLT_SRV_KEEPALIVE; obs << (ByteStream::byte)WE_CLT_SRV_KEEPALIVE;
obs << (ByteStream::byte)fPmId; // PM id obs << (ByteStream::byte)fPmId; // PM id
mutex::scoped_lock aLock(fClntMsgMutex); boost::mutex::scoped_lock aLock(fClntMsgMutex);
updateTxBytes(obs.length()); updateTxBytes(obs.length());
try try
@ -832,7 +832,7 @@ void WEDataLoader::onReceiveKeepAlive(ByteStream& Ibs)
ByteStream obs; ByteStream obs;
obs << (ByteStream::byte)WE_CLT_SRV_EOD; obs << (ByteStream::byte)WE_CLT_SRV_EOD;
obs << (ByteStream::byte)fPmId; // PM id obs << (ByteStream::byte)fPmId; // PM id
mutex::scoped_lock aLock(fClntMsgMutex); boost::mutex::scoped_lock aLock(fClntMsgMutex);
updateTxBytes(obs.length()); updateTxBytes(obs.length());
try try
@ -956,7 +956,7 @@ void WEDataLoader::onReceiveEod(ByteStream& Ibs)
ByteStream obs; ByteStream obs;
obs << (ByteStream::byte)WE_CLT_SRV_EOD; obs << (ByteStream::byte)WE_CLT_SRV_EOD;
obs << (ByteStream::byte)fPmId; // PM id obs << (ByteStream::byte)fPmId; // PM id
mutex::scoped_lock aLock(fClntMsgMutex); boost::mutex::scoped_lock aLock(fClntMsgMutex);
updateTxBytes(obs.length()); updateTxBytes(obs.length());
try try
@ -1058,7 +1058,7 @@ void WEDataLoader::onReceiveMode(ByteStream& Ibs)
aObs << (ByteStream::byte)WE_CLT_SRV_DBRCNT; aObs << (ByteStream::byte)WE_CLT_SRV_DBRCNT;
aObs << (ByteStream::byte)fPmId; aObs << (ByteStream::byte)fPmId;
aObs << (ByteStream::byte)aDbCnt; aObs << (ByteStream::byte)aDbCnt;
mutex::scoped_lock aLock(fClntMsgMutex); boost::mutex::scoped_lock aLock(fClntMsgMutex);
updateTxBytes(aObs.length()); updateTxBytes(aObs.length());
try try
@ -1151,7 +1151,7 @@ void WEDataLoader::onReceiveCmdLineArgs(ByteStream& Ibs)
} }
obs << (ByteStream::byte) fPmId; // PM id obs << (ByteStream::byte) fPmId; // PM id
mutex::scoped_lock aLock(fClntMsgMutex); boost::mutex::scoped_lock aLock(fClntMsgMutex);
updateTxBytes(obs.length()); updateTxBytes(obs.length());
try try
@ -1290,7 +1290,7 @@ void WEDataLoader::onReceiveCleanup(ByteStream& Ibs)
else else
obs << (ByteStream::byte)0; // cleanup failed obs << (ByteStream::byte)0; // cleanup failed
mutex::scoped_lock aLock(fClntMsgMutex); boost::mutex::scoped_lock aLock(fClntMsgMutex);
updateTxBytes(obs.length()); updateTxBytes(obs.length());
try try
@ -1347,7 +1347,7 @@ void WEDataLoader::onReceiveRollback(ByteStream& Ibs)
else else
obs << (ByteStream::byte)0; // Rollback failed obs << (ByteStream::byte)0; // Rollback failed
mutex::scoped_lock aLock(fClntMsgMutex); boost::mutex::scoped_lock aLock(fClntMsgMutex);
updateTxBytes(obs.length()); updateTxBytes(obs.length());
try try
@ -1392,7 +1392,7 @@ void WEDataLoader::onReceiveImportFileName(ByteStream& Ibs)
obs << (ByteStream::byte)WE_CLT_SRV_IMPFILEERROR; obs << (ByteStream::byte)WE_CLT_SRV_IMPFILEERROR;
obs << (ByteStream::byte)fPmId; obs << (ByteStream::byte)fPmId;
updateTxBytes(obs.length()); updateTxBytes(obs.length());
mutex::scoped_lock aLock(fClntMsgMutex); boost::mutex::scoped_lock aLock(fClntMsgMutex);
try try
{ {
@ -1430,7 +1430,7 @@ void WEDataLoader::onReceiveImportFileName(ByteStream& Ibs)
ByteStream obs; ByteStream obs;
obs << (ByteStream::byte)WE_CLT_SRV_IMPFILEERROR; obs << (ByteStream::byte)WE_CLT_SRV_IMPFILEERROR;
obs << (ByteStream::byte)fPmId; // PM id obs << (ByteStream::byte)fPmId; // PM id
mutex::scoped_lock aLock(fClntMsgMutex); boost::mutex::scoped_lock aLock(fClntMsgMutex);
updateTxBytes(obs.length()); updateTxBytes(obs.length());
try try
@ -1512,7 +1512,7 @@ void WEDataLoader::onReceiveErrFileRqst(ByteStream& Ibs)
if (aRet) if (aRet)
{ {
mutex::scoped_lock aLock(fClntMsgMutex); boost::mutex::scoped_lock aLock(fClntMsgMutex);
updateTxBytes(obs.length()); updateTxBytes(obs.length());
try try
@ -1561,7 +1561,7 @@ void WEDataLoader::onReceiveBadFileRqst(ByteStream& Ibs)
if (aRet) if (aRet)
{ {
mutex::scoped_lock aLock(fClntMsgMutex); boost::mutex::scoped_lock aLock(fClntMsgMutex);
updateTxBytes(obs.length()); updateTxBytes(obs.length());
try try
@ -1614,7 +1614,7 @@ void WEDataLoader::sendDataRequest()
} }
} }
mutex::scoped_lock aLock(fClntMsgMutex); boost::mutex::scoped_lock aLock(fClntMsgMutex);
ByteStream obs; ByteStream obs;
obs << (ByteStream::byte)WE_CLT_SRV_DATARQST; obs << (ByteStream::byte)WE_CLT_SRV_DATARQST;
obs << (ByteStream::byte)fPmId; // PM id obs << (ByteStream::byte)fPmId; // PM id

View File

@ -1691,7 +1691,7 @@ int BRMWrapper::writeVB(IDBDataFile* pSourceFile, const VER_t transID, const OID
fileInfo.fSegment = 0; fileInfo.fSegment = 0;
// fileInfo.fDbRoot = (freeList[0].vbOID % rootCnt) + 1; // fileInfo.fDbRoot = (freeList[0].vbOID % rootCnt) + 1;
fileInfo.fDbRoot = dbRoot; fileInfo.fDbRoot = dbRoot;
mutex::scoped_lock lk(vbFileLock); boost::mutex::scoped_lock lk(vbFileLock);
pTargetFile = openFile(fileInfo, "r+b", true); pTargetFile = openFile(fileInfo, "r+b", true);
if (pTargetFile == NULL) if (pTargetFile == NULL)

View File

@ -263,7 +263,7 @@ void WEFileReadThread::add2InputDataFileList(std::string& FileName)
void WEFileReadThread::shutdown() void WEFileReadThread::shutdown()
{ {
this->setContinue(false); this->setContinue(false);
mutex::scoped_lock aLock(fFileMutex); //wait till readDataFile() finish boost::mutex::scoped_lock aLock(fFileMutex); //wait till readDataFile() finish
//if(fInFile.is_open()) fInFile.close(); //@BUG 4326 //if(fInFile.is_open()) fInFile.close(); //@BUG 4326
if (fIfFile.is_open()) fIfFile.close(); if (fIfFile.is_open()) fIfFile.close();
@ -300,7 +300,7 @@ void WEFileReadThread::feedData()
//cout << "Length " << aSbs->length() <<endl; - for debug //cout << "Length " << aSbs->length() <<endl; - for debug
fSdh.updateRowTx(aRowCnt, TgtPmId); fSdh.updateRowTx(aRowCnt, TgtPmId);
mutex::scoped_lock aLock(fSdh.fSendMutex); boost::mutex::scoped_lock aLock(fSdh.fSendMutex);
fSdh.send2Pm(aSbs, TgtPmId); fSdh.send2Pm(aSbs, TgtPmId);
aLock.unlock(); aLock.unlock();
setTgtPmId(0); //reset PmId. Send the data to next least data setTgtPmId(0); //reset PmId. Send the data to next least data
@ -345,7 +345,7 @@ void WEFileReadThread::feedData()
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
unsigned int WEFileReadThread::readDataFile(messageqcpp::SBS& Sbs) unsigned int WEFileReadThread::readDataFile(messageqcpp::SBS& Sbs)
{ {
mutex::scoped_lock aLock(fFileMutex); boost::mutex::scoped_lock aLock(fFileMutex);
// For now we are going to send KEEPALIVES // For now we are going to send KEEPALIVES
//*Sbs << (ByteStream::byte)(WE_CLT_SRV_KEEPALIVE); //*Sbs << (ByteStream::byte)(WE_CLT_SRV_KEEPALIVE);
@ -409,7 +409,7 @@ unsigned int WEFileReadThread::readDataFile(messageqcpp::SBS& Sbs)
unsigned int WEFileReadThread::readBinaryDataFile(messageqcpp::SBS& Sbs, unsigned int WEFileReadThread::readBinaryDataFile(messageqcpp::SBS& Sbs,
unsigned int recLen) unsigned int recLen)
{ {
mutex::scoped_lock aLock(fFileMutex); boost::mutex::scoped_lock aLock(fFileMutex);
if ((fInFile.good()) && (!fInFile.eof())) if ((fInFile.good()) && (!fInFile.eof()))
{ {

View File

@ -85,7 +85,7 @@ namespace WriteEngine
void WEPmList::addPm2List(int PmId) void WEPmList::addPm2List(int PmId)
{ {
mutex::scoped_lock aLock(fListMutex); boost::mutex::scoped_lock aLock(fListMutex);
fPmList.push_back(PmId); fPmList.push_back(PmId);
aLock.unlock(); aLock.unlock();
} }
@ -94,7 +94,7 @@ void WEPmList::addPm2List(int PmId)
void WEPmList::addPriorityPm2List(int PmId) void WEPmList::addPriorityPm2List(int PmId)
{ {
mutex::scoped_lock aLock(fListMutex); boost::mutex::scoped_lock aLock(fListMutex);
fPmList.push_front(PmId); fPmList.push_front(PmId);
aLock.unlock(); aLock.unlock();
} }
@ -102,7 +102,7 @@ void WEPmList::addPriorityPm2List(int PmId)
int WEPmList::getNextPm() int WEPmList::getNextPm()
{ {
mutex::scoped_lock aLock(fListMutex); boost::mutex::scoped_lock aLock(fListMutex);
int aPmId = 0; int aPmId = 0;
if (!fPmList.empty()) if (!fPmList.empty())
@ -119,7 +119,7 @@ int WEPmList::getNextPm()
// so that the sendingthreads will not keep sending data. // so that the sendingthreads will not keep sending data.
void WEPmList::clearPmList() void WEPmList::clearPmList()
{ {
mutex::scoped_lock aLock(fListMutex); boost::mutex::scoped_lock aLock(fListMutex);
if (!fPmList.empty()) if (!fPmList.empty())
fPmList.clear(); fPmList.clear();
@ -131,7 +131,7 @@ void WEPmList::clearPmList()
bool WEPmList::check4Pm(int PmId) bool WEPmList::check4Pm(int PmId)
{ {
mutex::scoped_lock aLock(fListMutex); boost::mutex::scoped_lock aLock(fListMutex);
WePmList::iterator aIt = fPmList.begin(); WePmList::iterator aIt = fPmList.begin();
bool aFound = false; bool aFound = false;
@ -280,7 +280,7 @@ void WESDHandler::send2Pm(ByteStream& Bs, unsigned int PmId)
{ {
if (fWeSplClients[aIdx] != 0) if (fWeSplClients[aIdx] != 0)
{ {
mutex::scoped_lock aLock(fWeSplClients[aIdx]->fWriteMutex); boost::mutex::scoped_lock aLock(fWeSplClients[aIdx]->fWriteMutex);
fWeSplClients[aIdx]->write(Bs); fWeSplClients[aIdx]->write(Bs);
aLock.unlock(); aLock.unlock();
} }
@ -288,7 +288,7 @@ void WESDHandler::send2Pm(ByteStream& Bs, unsigned int PmId)
} }
else else
{ {
mutex::scoped_lock aLock(fWeSplClients[PmId]->fWriteMutex); boost::mutex::scoped_lock aLock(fWeSplClients[PmId]->fWriteMutex);
fWeSplClients[PmId]->write(Bs); fWeSplClients[PmId]->write(Bs);
aLock.unlock(); aLock.unlock();
} }
@ -310,7 +310,7 @@ void WESDHandler::send2Pm(messageqcpp::SBS& Sbs, unsigned int PmId)
{ {
if (fWeSplClients[aIdx] != 0) if (fWeSplClients[aIdx] != 0)
{ {
mutex::scoped_lock aLock(fWeSplClients[aIdx]->fSentQMutex); boost::mutex::scoped_lock aLock(fWeSplClients[aIdx]->fSentQMutex);
fWeSplClients[aIdx]->add2SendQueue(Sbs); fWeSplClients[aIdx]->add2SendQueue(Sbs);
aLock.unlock(); aLock.unlock();
@ -319,7 +319,7 @@ void WESDHandler::send2Pm(messageqcpp::SBS& Sbs, unsigned int PmId)
} }
else else
{ {
mutex::scoped_lock aLock(fWeSplClients[PmId]->fSentQMutex); boost::mutex::scoped_lock aLock(fWeSplClients[PmId]->fSentQMutex);
fWeSplClients[PmId]->add2SendQueue(Sbs); fWeSplClients[PmId]->add2SendQueue(Sbs);
aLock.unlock(); aLock.unlock();
} }
@ -363,7 +363,7 @@ void WESDHandler::checkForRespMsgs()
while (isContinue()) while (isContinue())
{ {
mutex::scoped_lock aLock(fRespMutex); boost::mutex::scoped_lock aLock(fRespMutex);
//NOTE - if isContinue is not checked thread will hang on shutdown //NOTE - if isContinue is not checked thread will hang on shutdown
// by locking again on fRespList.empty() // by locking again on fRespList.empty()
@ -458,7 +458,7 @@ void WESDHandler::checkForRespMsgs()
void WESDHandler::add2RespQueue(const messageqcpp::SBS& Sbs) void WESDHandler::add2RespQueue(const messageqcpp::SBS& Sbs)
{ {
mutex::scoped_lock aLock(fRespMutex); boost::mutex::scoped_lock aLock(fRespMutex);
fRespList.push_back(Sbs); fRespList.push_back(Sbs);
aLock.unlock(); aLock.unlock();
//cout <<"Notifing from add2RespQueue" << endl; //cout <<"Notifing from add2RespQueue" << endl;
@ -891,7 +891,7 @@ void WESDHandler::cancelOutstandingCpimports()
if (fWeSplClients[aCnt]->isConnected()) if (fWeSplClients[aCnt]->isConnected())
{ {
mutex::scoped_lock aLock(fWeSplClients[aCnt]->fWriteMutex); boost::mutex::scoped_lock aLock(fWeSplClients[aCnt]->fWriteMutex);
fWeSplClients[aCnt]->write(aBs); fWeSplClients[aCnt]->write(aBs);
aLock.unlock(); aLock.unlock();
} }
@ -1045,7 +1045,7 @@ void WESDHandler::shutdown()
} }
mutex::scoped_lock aLock(fRespMutex); boost::mutex::scoped_lock aLock(fRespMutex);
this->setContinue(false); this->setContinue(false);
usleep(100000); // so that response thread get updated. usleep(100000); // so that response thread get updated.
fRespCond.notify_all(); fRespCond.notify_all();
@ -2015,7 +2015,7 @@ void WESDHandler::doRollback()
aBs << (ByteStream::quadbyte) fTableOId; aBs << (ByteStream::quadbyte) fTableOId;
aBs << fRef.fCmdArgs.getTableName(); aBs << fRef.fCmdArgs.getTableName();
aBs << aAppName; aBs << aAppName;
mutex::scoped_lock aLock(fSendMutex); boost::mutex::scoped_lock aLock(fSendMutex);
send2Pm(aBs); send2Pm(aBs);
aLock.unlock(); aLock.unlock();
@ -2032,7 +2032,7 @@ void WESDHandler::doCleanup(bool deleteHdfsTempDbFiles)
aBs << (ByteStream::byte) WE_CLT_SRV_CLEANUP; aBs << (ByteStream::byte) WE_CLT_SRV_CLEANUP;
aBs << (ByteStream::quadbyte) fTableOId; aBs << (ByteStream::quadbyte) fTableOId;
aBs << (ByteStream::byte) deleteHdfsTempDbFiles; aBs << (ByteStream::byte) deleteHdfsTempDbFiles;
mutex::scoped_lock aLock(fSendMutex); boost::mutex::scoped_lock aLock(fSendMutex);
send2Pm(aBs); send2Pm(aBs);
aLock.unlock(); aLock.unlock();
} }

View File

@ -215,7 +215,7 @@ void WESplClient::send()
cout << "DataRqstCnt [" << getPmId() << "] = " cout << "DataRqstCnt [" << getPmId() << "] = "
<< getDataRqstCount() << endl; << getDataRqstCount() << endl;
mutex::scoped_lock aLock(fSentQMutex); boost::mutex::scoped_lock aLock(fSentQMutex);
messageqcpp::SBS aSbs = fSendQueue.front(); messageqcpp::SBS aSbs = fSendQueue.front();
fSendQueue.pop(); fSendQueue.pop();
aLock.unlock(); aLock.unlock();
@ -223,7 +223,7 @@ void WESplClient::send()
if (aLen > 0) if (aLen > 0)
{ {
mutex::scoped_lock aLock(fWriteMutex); boost::mutex::scoped_lock aLock(fWriteMutex);
setBytesTx(getBytesTx() + aLen); setBytesTx(getBytesTx() + aLen);
try try
@ -323,7 +323,7 @@ void WESplClient::add2SendQueue(const messageqcpp::SBS& Sbs)
void WESplClient::clearSendQueue() void WESplClient::clearSendQueue()
{ {
mutex::scoped_lock aLock(fSentQMutex); boost::mutex::scoped_lock aLock(fSentQMutex);
while (!fSendQueue.empty()) while (!fSendQueue.empty())
fSendQueue.pop(); fSendQueue.pop();
@ -334,7 +334,7 @@ void WESplClient::clearSendQueue()
int WESplClient::getSendQSize() int WESplClient::getSendQSize()
{ {
int aQSize = 0; int aQSize = 0;
mutex::scoped_lock aLock(fSentQMutex); boost::mutex::scoped_lock aLock(fSentQMutex);
aQSize = fSendQueue.size(); aQSize = fSendQueue.size();
aLock.unlock(); aLock.unlock();
return aQSize; return aQSize;
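
A recurring idiom in the last two files is taking a scoped_lock and releasing it explicitly with unlock() as soon as the shared state has been read or updated, so that slower work (socket writes, logging) happens outside the critical section. A compact sketch, with invented names:

#include <boost/thread/mutex.hpp>
#include <queue>

std::queue<int> sendQueue;           // placeholder for the real send queue
boost::mutex    sentQMutex;

int getSendQSize()
{
    int size = 0;
    boost::mutex::scoped_lock lk(sentQMutex);
    size = static_cast<int>(sendQueue.size());
    lk.unlock();                     // release early; the destructor is then a no-op
    return size;
}

int main()
{
    sendQueue.push(1);
    return getSendQSize() == 1 ? 0 : 1;
}

The early unlock() is safe because the guard's destructor only releases a lock it still owns.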