diff --git a/dbcon/ddlpackageproc/altertableprocessor.cpp b/dbcon/ddlpackageproc/altertableprocessor.cpp index 29edd1112..f250da133 100644 --- a/dbcon/ddlpackageproc/altertableprocessor.cpp +++ b/dbcon/ddlpackageproc/altertableprocessor.cpp @@ -762,7 +762,7 @@ void AlterTableProcessor::addColumn(uint32_t sessionID, execplan::CalpontSystemC int pmNum = 1; OamCache* oamcache = OamCache::makeOamCache(); //boost::shared_ptr> > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = oamcache->getOwnerPM(); //*(*dbRootPMMap)[dbRoot].begin(); + pmNum = oamcache->getOwnerPM(dbRoot); //*(*dbRootPMMap)[dbRoot].begin(); boost::shared_ptr bsIn; // Will create files on each PM as needed. @@ -925,7 +925,7 @@ void AlterTableProcessor::addColumn(uint32_t sessionID, execplan::CalpontSystemC if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot "); - pmNum = *(*dbRootPMMap)[dbRoot].begin(); + pmNum = oamcache->getOwnerPM(dbRoot); bs.restart(); bs << (ByteStream::byte)WE_SVR_UPDATE_SYSTABLE_AUTO; bs << uniqueId; @@ -1379,7 +1379,7 @@ void AlterTableProcessor::dropColumn(uint32_t sessionID, execplan::CalpontSystem if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot "); - pmNum = *(*dbRootPMMap)[dbRoot].begin(); + pmNum = oamcache->getOwnerPM(dbRoot); bytestream.restart(); bytestream << (ByteStream::byte)WE_SVR_UPDATE_SYSTABLE_AUTO; bytestream << uniqueId; @@ -1452,7 +1452,7 @@ void AlterTableProcessor::dropColumn(uint32_t sessionID, execplan::CalpontSystem if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot "); - pmNum = *(*dbRootPMMap)[dbRoot].begin(); + pmNum = oamcache->getOwnerPM(dbRoot); try { @@ -1707,7 +1707,7 @@ void AlterTableProcessor::setColumnDefault(uint32_t sessionID, execplan::Calpont int pmNum = 1; OamCache* oamcache = OamCache::makeOamCache(); //boost::shared_ptr> > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = oamcacne->getOwnerPM(dbRoot); //*(*dbRootPMMap)[dbRoot].begin(); + pmNum = oamcache->getOwnerPM(dbRoot); //*(*dbRootPMMap)[dbRoot].begin(); boost::shared_ptr bsIn; @@ -2540,7 +2540,7 @@ void AlterTableProcessor::renameColumn(uint32_t sessionID, execplan::CalpontSyst if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot"); - pmNum = *(*dbRootPMMap)[dbRoot].begin(); + pmNum = oamcache->getOwnerPM(dbRoot); // send to WES to process try diff --git a/dbcon/ddlpackageproc/createtableprocessor.cpp b/dbcon/ddlpackageproc/createtableprocessor.cpp index 0b9f68f00..cd61df7a8 100644 --- a/dbcon/ddlpackageproc/createtableprocessor.cpp +++ b/dbcon/ddlpackageproc/createtableprocessor.cpp @@ -326,8 +326,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackageInternal( bytestream << (uint32_t)dbRoot; tableDef.serialize(bytestream); boost::shared_ptr bsIn; - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); // MCOL-66 The DBRM can't handle concurrent DDL boost::mutex::scoped_lock lk(dbrmMutex); @@ -450,7 +449,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackageInternal( bytestream << (uint32_t)dbRoot; tableDef.serialize(bytestream); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); try { @@ -666,7 +665,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackageInternal( return result; } - pmNum = (*dbRootPMMap)[useDBRoot]; + pmNum = oamcache->getOwnerPM(useDBRoot); try { diff --git a/dbcon/ddlpackageproc/droptableprocessor.cpp b/dbcon/ddlpackageproc/droptableprocessor.cpp index faa28b721..3720dbb5f 100644 --- a/dbcon/ddlpackageproc/droptableprocessor.cpp +++ b/dbcon/ddlpackageproc/droptableprocessor.cpp @@ -359,8 +359,7 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackageInternal(ddlpack return result; } - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); try { @@ -463,7 +462,7 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackageInternal(ddlpack return result; } - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); try { @@ -1274,8 +1273,7 @@ TruncTableProcessor::DDLResult TruncTableProcessor::processPackageInternal(ddlpa bytestream << (uint32_t)colType.compressionType; } - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[useDBRoot]; + pmNum = oamcache->getOwnerPM(useDBRoot); try { diff --git a/dbcon/dmlpackageproc/commandpackageprocessor.cpp b/dbcon/dmlpackageproc/commandpackageprocessor.cpp index 0554e9ec4..e57a80a54 100644 --- a/dbcon/dmlpackageproc/commandpackageprocessor.cpp +++ b/dbcon/dmlpackageproc/commandpackageprocessor.cpp @@ -845,20 +845,15 @@ void CommandPackageProcessor::clearTableLock(uint64_t uniqueId, const dmlpackage establishTableLockToClear(tableLockID, lockInfo); lockGrabbed = true; - oam::OamCache* oamCache = oam::OamCache::makeOamCache(); - //oam::OamCache::dbRootPMMap_t dbRootPmMap = oamCache->getDBRootToPMMap(); - //std::map::const_iterator mapIter; std::set pmSet; // Construct relevant list of PMs based on the DBRoots associated // with the tableLock. for (unsigned int k = 0; k < lockInfo.dbrootList.size(); k++) { - mapIter = dbRootPmMap->find(lockInfo.dbrootList[k]); - - if (!oamCache->isOffline(lockInfo.dbrootList[k])) + if (!oamcache()->isOffline(lockInfo.dbrootList[k])) { - int pm = oamCache->getOwnerPM(lockInfo.dbrootList[k]); + int pm = oamcache()->getOwnerPM(lockInfo.dbrootList[k]); pmSet.insert(pm); } else diff --git a/dbcon/dmlpackageproc/deletepackageprocessor.cpp b/dbcon/dmlpackageproc/deletepackageprocessor.cpp index 2653fd68d..033f709c2 100644 --- a/dbcon/dmlpackageproc/deletepackageprocessor.cpp +++ b/dbcon/dmlpackageproc/deletepackageprocessor.cpp @@ -643,7 +643,7 @@ bool DeletePackageProcessor::processRowgroup(ByteStream& aRowGroup, DMLResult& r { bool rc = false; // cout << "Get dbroot " << dbroot << endl; - int pmNum = (*fDbRootPMMap)[dbroot]; + int pmNum = oamcache()->getOwnerPM(dbroot); DMLTable* tablePtr = cpackage.get_Table(); ByteStream bytestream; bytestream << (ByteStream::byte)WE_SVR_DELETE; diff --git a/dbcon/dmlpackageproc/dmlpackageprocessor.h b/dbcon/dmlpackageproc/dmlpackageprocessor.h index 06d6b35d5..4379e9412 100644 --- a/dbcon/dmlpackageproc/dmlpackageprocessor.h +++ b/dbcon/dmlpackageproc/dmlpackageprocessor.h @@ -165,8 +165,6 @@ class DMLPackageProcessor std::cout << "Cannot make connection to WES" << std::endl; } - oam::OamCache* oamCache = oam::OamCache::makeOamCache(); - fDbRootPMMap = oamCache->getDBRootToPMMap(); fDbrm = aDbrm; fSessionID = sid; fExeMgr = new execplan::ClientRotator(fSessionID, "ExeMgr"); @@ -489,6 +487,8 @@ class DMLPackageProcessor uint32_t tableOid); int endTransaction(uint64_t uniqueId, BRM::TxnID txnID, bool success); + oam::OamCache* oamcache() { return oam::OamCache::makeOamCache(); } + /** @brief the Session Manager interface */ execplan::SessionManager fSessionManager; @@ -500,8 +500,6 @@ class DMLPackageProcessor uint32_t fPMCount; WriteEngine::WEClients* fWEClient; BRM::DBRM* fDbrm; - boost::shared_ptr> > fDbRootPMMap; - oam::Oam fOam; bool fRollbackPending; // When set, any derived object should stop what it's doing and cleanup in // preparation for a Rollback execplan::ClientRotator* fExeMgr; diff --git a/dbcon/dmlpackageproc/insertpackageprocessor.cpp b/dbcon/dmlpackageproc/insertpackageprocessor.cpp index 524acb677..9629a504c 100644 --- a/dbcon/dmlpackageproc/insertpackageprocessor.cpp +++ b/dbcon/dmlpackageproc/insertpackageprocessor.cpp @@ -275,8 +275,7 @@ DMLPackageProcessor::DMLResult InsertPackageProcessor::processPackageInternal( if (tmpSet) { dbroot = tmp.dbRoot; - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbroot]; + pmNum = oamcache->getOwnerPM(dbroot); //@Bug 4760. validate pm value if (pmNum == 0) diff --git a/dbcon/dmlpackageproc/updatepackageprocessor.cpp b/dbcon/dmlpackageproc/updatepackageprocessor.cpp index 651779a58..91508ca46 100644 --- a/dbcon/dmlpackageproc/updatepackageprocessor.cpp +++ b/dbcon/dmlpackageproc/updatepackageprocessor.cpp @@ -161,8 +161,7 @@ DMLPackageProcessor::DMLResult UpdatePackageProcessor::processPackageInternal( int32_t sessionId = fSessionID; std::string processName("DMLProc"); int i = 0; - OamCache* oamcache = OamCache::makeOamCache(); - std::vector pmList = oamcache->getModuleIds(); + std::vector pmList = oamcache()->getModuleIds(); std::vector pms; for (unsigned i = 0; i < pmList.size(); i++) @@ -437,8 +436,7 @@ uint64_t UpdatePackageProcessor::fixUpRows(dmlpackage::CalpontDMLPackage& cpacka uint64_t rowsProcessed = 0; uint32_t dbroot = 1; bool metaData = false; - oam::OamCache* oamCache = oam::OamCache::makeOamCache(); - std::vector fPMs = oamCache->getModuleIds(); + std::vector fPMs = oamcache()->getModuleIds(); std::map pmState; string emsg; string emsgStr; @@ -726,7 +724,7 @@ bool UpdatePackageProcessor::processRowgroup(ByteStream& aRowGroup, DMLResult& r { bool rc = false; // cout << "Get dbroot " << dbroot << endl; - uint32_t pmNum = (*fDbRootPMMap)[dbroot]; + uint32_t pmNum = oamcache()->getOwnerPM(dbroot); ByteStream bytestream; bytestream << (uint8_t)WE_SVR_UPDATE; diff --git a/oam/oamcpp/oamcache.cpp b/oam/oamcpp/oamcache.cpp index 78e0a83b0..11c7489de 100644 --- a/oam/oamcpp/oamcache.cpp +++ b/oam/oamcpp/oamcache.cpp @@ -39,7 +39,7 @@ using namespace boost; namespace oam { - +#if 0 struct CacheReloaded { CacheReloaded() @@ -246,5 +246,6 @@ bool OamCache::isOffline(int dbRoot) { return dbRootConnectionMap->find(scannedExtents[i].dbRoot) == dbRootConnectionMap->end(); } +#endif } /* namespace oam */ diff --git a/oam/oamcpp/oamcache.h b/oam/oamcpp/oamcache.h index 43ba9daeb..f019aab5b 100644 --- a/oam/oamcpp/oamcache.h +++ b/oam/oamcpp/oamcache.h @@ -51,6 +51,7 @@ class OamCache int getClosestConnection(int dbroot); // connection index to owner's PM or ours PM - who can access dbRoot. int getOwnerConnection(int dbroot); // connection index to owner's PM. int getOwnerPM(int dbroot); // Owner's PM index. + std::vector getPMDBRoots(int PM); // what DBRoots are owned by given PM. bool isAccessibleBy(int dbRoot, int pmId); //// a necessary DB root is offline //if (dbRootConnectionMap->find(scannedExtents[i].dbRoot) == dbRootConnectionMap->end()) diff --git a/utils/batchloader/batchloader.cpp b/utils/batchloader/batchloader.cpp index 0e9bfe836..0e1bb6a59 100644 --- a/utils/batchloader/batchloader.cpp +++ b/utils/batchloader/batchloader.cpp @@ -48,32 +48,7 @@ BatchLoader::BatchLoader(uint32_t tableOid, execplan::CalpontSystemCatalog::SCN fPMs = PMs; fSessionId = sessionId; fTableOid = tableOid; - OamCache* oamcache = OamCache::makeOamCache(); - oam::OamCache::PMDbrootsMap_t systemPmDbrootMap = oamcache->getPMToDbrootsMap(); - std::map::iterator iter = systemPmDbrootMap->begin(); - // cout << "fPMs size is " << fPMs.size() << endl; - fPmDbrootMap.reset(new OamCache::PMDbrootsMap_t::element_type()); - fDbrootPMmap.reset(new map()); - - for (uint32_t i = 0; i < fPMs.size(); i++) - { - iter = systemPmDbrootMap->find(fPMs[i]); - - if (iter != systemPmDbrootMap->end()) - { - fDbRoots.insert(fDbRoots.end(), (iter->second).begin(), (iter->second).end()); - (*fPmDbrootMap)[fPMs[i]] = iter->second; - } - } - - // Build dbroot to PM map - for (iter = fPmDbrootMap->begin(); iter != fPmDbrootMap->end(); iter++) - { - for (uint32_t i = 0; i < iter->second.size(); i++) - { - (*fDbrootPMmap)[iter->second[i]] = iter->first; - } - } + fOamCache = OamCache::makeOamCache(); } //------------------------------------------------------------------------------ // Select the first PM to send the first batch of rows. @@ -199,12 +174,7 @@ void BatchLoader::selectFirstPM(uint32_t& PMId) if (createdDbroot != 0) { - std::map::iterator iter = fDbrootPMmap->begin(); - - iter = fDbrootPMmap->find(createdDbroot); - - if (iter != fDbrootPMmap->end()) - PMId = iter->second; + PMId = fOamCache->getOwnerPM(createdDbroot); } // This will build the batch distribution sequence @@ -233,12 +203,12 @@ void BatchLoader::selectFirstPM(uint32_t& PMId) { PMRootInfo aEntry; aEntry.PMId = fPmDistSeq[j]; - iter = fPmDbrootMap->find(aEntry.PMId); + auto dbroots = fOamCache->getPMDBRoots(PMId); - for (unsigned k = 0; k < (iter->second).size(); k++) + for (unsigned k = 0; k < dbroots.size(); k++) { RootExtentsBlocks aRootInfo; - aRootInfo.DBRoot = (iter->second)[k]; + aRootInfo.DBRoot = dbroots[k]; aRootInfo.numExtents = rootExtents[aRootInfo.DBRoot]; aRootInfo.numBlocks = rootBlocks[aRootInfo.DBRoot]; // cout << "aRootInfo DBRoot:numExtents:numBlocks = " << @@ -410,17 +380,15 @@ void BatchLoader::buildBatchDistSeqVector() fPmDistSeq.clear(); BlIntVec aDbCntVec(fPMs.size()); - std::map::iterator iter = fPmDbrootMap->begin(); - for (uint32_t i = 0; i < fPMs.size(); i++) { - iter = fPmDbrootMap->find(fPMs[i]); + auto dbroots = fOamCache->getPMDBRoots(fPMs[i]); - if ((iter != fPmDbrootMap->end()) && ((iter->second).begin() != (iter->second).end())) + if (dbroots.size() > 0) { try { - aDbCntVec[i] = (iter->second).size(); + aDbCntVec[i] = dbroots.size(); // cout << "PM - "<getDBRootToPMMap(); - int pmId = (*dbrootToPM)[dbroot]; + int pmId = fOamCache->getOwnerPM(dbroot); ostringstream oss; oss << "pm" << pmId << "_WriteEngineServer"; diff --git a/writeengine/redistribute/we_redistributeworkerthread.cpp b/writeengine/redistribute/we_redistributeworkerthread.cpp index 1cd33c8a7..71fbb8425 100644 --- a/writeengine/redistribute/we_redistributeworkerthread.cpp +++ b/writeengine/redistribute/we_redistributeworkerthread.cpp @@ -134,11 +134,10 @@ void RedistributeWorkerThread::handleRequest() { memcpy(&fPlanEntry, fBs.buf(), sizeof(RedistributePlanEntry)); fBs.advance(sizeof(RedistributePlanEntry)); - OamCache::dbRootPMMap_t dbrootToPM = fOamCache->getDBRootToPMMap(); fMyId.first = fPlanEntry.source; - fMyId.second = (*dbrootToPM)[fMyId.first]; + fMyId.second = fOamCache->getOwnerPM(fMyId.first); fPeerId.first = fPlanEntry.destination; - fPeerId.second = (*dbrootToPM)[fPeerId.first]; + fPeerId.second = fOamCache->getOwnerPM(fPeerId.first); if (grabTableLock() == 0) {