diff --git a/dbcon/ddlpackageproc/altertableprocessor.cpp b/dbcon/ddlpackageproc/altertableprocessor.cpp index 29e27a97f..92cffe90d 100644 --- a/dbcon/ddlpackageproc/altertableprocessor.cpp +++ b/dbcon/ddlpackageproc/altertableprocessor.cpp @@ -761,8 +761,7 @@ void AlterTableProcessor::addColumn(uint32_t sessionID, execplan::CalpontSystemC int pmNum = 1; OamCache* oamcache = OamCache::makeOamCache(); - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); boost::shared_ptr bsIn; // Will create files on each PM as needed. @@ -925,7 +924,7 @@ void AlterTableProcessor::addColumn(uint32_t sessionID, execplan::CalpontSystemC if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot "); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); bs.restart(); bs << (ByteStream::byte)WE_SVR_UPDATE_SYSTABLE_AUTO; bs << uniqueId; @@ -1317,8 +1316,7 @@ void AlterTableProcessor::dropColumn(uint32_t sessionID, execplan::CalpontSystem boost::shared_ptr bsIn; OamCache* oamcache = OamCache::makeOamCache(); - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); // MCOL-66 The DBRM can't handle concurrent DDL boost::mutex::scoped_lock lk(dbrmMutex); @@ -1379,7 +1377,7 @@ void AlterTableProcessor::dropColumn(uint32_t sessionID, execplan::CalpontSystem if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot "); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); bytestream.restart(); bytestream << (ByteStream::byte)WE_SVR_UPDATE_SYSTABLE_AUTO; bytestream << uniqueId; @@ -1452,7 +1450,7 @@ void AlterTableProcessor::dropColumn(uint32_t sessionID, execplan::CalpontSystem if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot "); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); try { @@ -1706,8 +1704,7 @@ void AlterTableProcessor::setColumnDefault(uint32_t sessionID, execplan::Calpont int pmNum = 1; OamCache* oamcache = OamCache::makeOamCache(); - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); boost::shared_ptr bsIn; @@ -1795,8 +1792,7 @@ void AlterTableProcessor::dropColumnDefault(uint32_t sessionID, execplan::Calpon int pmNum = 1; OamCache* oamcache = OamCache::makeOamCache(); - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); boost::shared_ptr bsIn; @@ -2001,8 +1997,7 @@ void AlterTableProcessor::renameTable(uint32_t sessionID, execplan::CalpontSyste boost::shared_ptr bsIn; OamCache* oamcache = OamCache::makeOamCache(); - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); try { @@ -2067,7 +2062,7 @@ void AlterTableProcessor::renameTable(uint32_t sessionID, execplan::CalpontSyste if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot"); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); try { @@ -2131,8 +2126,7 @@ void AlterTableProcessor::tableComment(uint32_t sessionID, execplan::CalpontSyst int pmNum = 1; rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot); OamCache* oamcache = OamCache::makeOamCache(); - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot"); @@ -2354,8 +2348,7 @@ void AlterTableProcessor::renameColumn(uint32_t sessionID, execplan::CalpontSyst int pmNum = 1; OamCache* oamcache = OamCache::makeOamCache(); - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); boost::shared_ptr bsIn; CalpontSystemCatalog::TableName tableName; @@ -2539,7 +2532,7 @@ void AlterTableProcessor::renameColumn(uint32_t sessionID, execplan::CalpontSyst if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot"); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); // send to WES to process try diff --git a/dbcon/ddlpackageproc/createtableprocessor.cpp b/dbcon/ddlpackageproc/createtableprocessor.cpp index 0b9f68f00..cd61df7a8 100644 --- a/dbcon/ddlpackageproc/createtableprocessor.cpp +++ b/dbcon/ddlpackageproc/createtableprocessor.cpp @@ -326,8 +326,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackageInternal( bytestream << (uint32_t)dbRoot; tableDef.serialize(bytestream); boost::shared_ptr bsIn; - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); // MCOL-66 The DBRM can't handle concurrent DDL boost::mutex::scoped_lock lk(dbrmMutex); @@ -450,7 +449,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackageInternal( bytestream << (uint32_t)dbRoot; tableDef.serialize(bytestream); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); try { @@ -666,7 +665,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackageInternal( return result; } - pmNum = (*dbRootPMMap)[useDBRoot]; + pmNum = oamcache->getOwnerPM(useDBRoot); try { diff --git a/dbcon/ddlpackageproc/ddlpackageprocessor.cpp b/dbcon/ddlpackageproc/ddlpackageprocessor.cpp index e4b8786d8..7dc815de0 100644 --- a/dbcon/ddlpackageproc/ddlpackageprocessor.cpp +++ b/dbcon/ddlpackageproc/ddlpackageprocessor.cpp @@ -611,8 +611,7 @@ void DDLPackageProcessor::createFiles(CalpontSystemCatalog::TableName aTableName try { OamCache* oamcache = OamCache::makeOamCache(); - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - int pmNum = (*dbRootPMMap)[useDBRoot]; + int pmNum = oamcache->getOwnerPM(useDBRoot); fWEClient->write(bytestream, (uint32_t)pmNum); bsIn.reset(new ByteStream()); diff --git a/dbcon/ddlpackageproc/droptableprocessor.cpp b/dbcon/ddlpackageproc/droptableprocessor.cpp index cd259f586..b8d67fe7e 100644 --- a/dbcon/ddlpackageproc/droptableprocessor.cpp +++ b/dbcon/ddlpackageproc/droptableprocessor.cpp @@ -359,8 +359,7 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackageInternal(ddlpack return result; } - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); try { @@ -464,7 +463,7 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackageInternal(ddlpack return result; } - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); try { @@ -1275,8 +1274,7 @@ TruncTableProcessor::DDLResult TruncTableProcessor::processPackageInternal(ddlpa bytestream << (uint32_t)colType.compressionType; } - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[useDBRoot]; + pmNum = oamcache->getOwnerPM(useDBRoot); try { diff --git a/dbcon/dmlpackageproc/commandpackageprocessor.cpp b/dbcon/dmlpackageproc/commandpackageprocessor.cpp index 7c3c4794d..e57a80a54 100644 --- a/dbcon/dmlpackageproc/commandpackageprocessor.cpp +++ b/dbcon/dmlpackageproc/commandpackageprocessor.cpp @@ -845,20 +845,15 @@ void CommandPackageProcessor::clearTableLock(uint64_t uniqueId, const dmlpackage establishTableLockToClear(tableLockID, lockInfo); lockGrabbed = true; - oam::OamCache* oamCache = oam::OamCache::makeOamCache(); - oam::OamCache::dbRootPMMap_t dbRootPmMap = oamCache->getDBRootToPMMap(); - std::map::const_iterator mapIter; std::set pmSet; // Construct relevant list of PMs based on the DBRoots associated // with the tableLock. for (unsigned int k = 0; k < lockInfo.dbrootList.size(); k++) { - mapIter = dbRootPmMap->find(lockInfo.dbrootList[k]); - - if (mapIter != dbRootPmMap->end()) + if (!oamcache()->isOffline(lockInfo.dbrootList[k])) { - int pm = mapIter->second; + int pm = oamcache()->getOwnerPM(lockInfo.dbrootList[k]); pmSet.insert(pm); } else diff --git a/dbcon/dmlpackageproc/deletepackageprocessor.cpp b/dbcon/dmlpackageproc/deletepackageprocessor.cpp index 2653fd68d..033f709c2 100644 --- a/dbcon/dmlpackageproc/deletepackageprocessor.cpp +++ b/dbcon/dmlpackageproc/deletepackageprocessor.cpp @@ -643,7 +643,7 @@ bool DeletePackageProcessor::processRowgroup(ByteStream& aRowGroup, DMLResult& r { bool rc = false; // cout << "Get dbroot " << dbroot << endl; - int pmNum = (*fDbRootPMMap)[dbroot]; + int pmNum = oamcache()->getOwnerPM(dbroot); DMLTable* tablePtr = cpackage.get_Table(); ByteStream bytestream; bytestream << (ByteStream::byte)WE_SVR_DELETE; diff --git a/dbcon/dmlpackageproc/dmlpackageprocessor.h b/dbcon/dmlpackageproc/dmlpackageprocessor.h index ec7956c81..4379e9412 100644 --- a/dbcon/dmlpackageproc/dmlpackageprocessor.h +++ b/dbcon/dmlpackageproc/dmlpackageprocessor.h @@ -165,8 +165,6 @@ class DMLPackageProcessor std::cout << "Cannot make connection to WES" << std::endl; } - oam::OamCache* oamCache = oam::OamCache::makeOamCache(); - fDbRootPMMap = oamCache->getDBRootToPMMap(); fDbrm = aDbrm; fSessionID = sid; fExeMgr = new execplan::ClientRotator(fSessionID, "ExeMgr"); @@ -489,6 +487,8 @@ class DMLPackageProcessor uint32_t tableOid); int endTransaction(uint64_t uniqueId, BRM::TxnID txnID, bool success); + oam::OamCache* oamcache() { return oam::OamCache::makeOamCache(); } + /** @brief the Session Manager interface */ execplan::SessionManager fSessionManager; @@ -500,8 +500,6 @@ class DMLPackageProcessor uint32_t fPMCount; WriteEngine::WEClients* fWEClient; BRM::DBRM* fDbrm; - boost::shared_ptr > fDbRootPMMap; - oam::Oam fOam; bool fRollbackPending; // When set, any derived object should stop what it's doing and cleanup in // preparation for a Rollback execplan::ClientRotator* fExeMgr; diff --git a/dbcon/dmlpackageproc/insertpackageprocessor.cpp b/dbcon/dmlpackageproc/insertpackageprocessor.cpp index 524acb677..9629a504c 100644 --- a/dbcon/dmlpackageproc/insertpackageprocessor.cpp +++ b/dbcon/dmlpackageproc/insertpackageprocessor.cpp @@ -275,8 +275,7 @@ DMLPackageProcessor::DMLResult InsertPackageProcessor::processPackageInternal( if (tmpSet) { dbroot = tmp.dbRoot; - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbroot]; + pmNum = oamcache->getOwnerPM(dbroot); //@Bug 4760. validate pm value if (pmNum == 0) diff --git a/dbcon/dmlpackageproc/updatepackageprocessor.cpp b/dbcon/dmlpackageproc/updatepackageprocessor.cpp index 651779a58..91508ca46 100644 --- a/dbcon/dmlpackageproc/updatepackageprocessor.cpp +++ b/dbcon/dmlpackageproc/updatepackageprocessor.cpp @@ -161,8 +161,7 @@ DMLPackageProcessor::DMLResult UpdatePackageProcessor::processPackageInternal( int32_t sessionId = fSessionID; std::string processName("DMLProc"); int i = 0; - OamCache* oamcache = OamCache::makeOamCache(); - std::vector pmList = oamcache->getModuleIds(); + std::vector pmList = oamcache()->getModuleIds(); std::vector pms; for (unsigned i = 0; i < pmList.size(); i++) @@ -437,8 +436,7 @@ uint64_t UpdatePackageProcessor::fixUpRows(dmlpackage::CalpontDMLPackage& cpacka uint64_t rowsProcessed = 0; uint32_t dbroot = 1; bool metaData = false; - oam::OamCache* oamCache = oam::OamCache::makeOamCache(); - std::vector fPMs = oamCache->getModuleIds(); + std::vector fPMs = oamcache()->getModuleIds(); std::map pmState; string emsg; string emsgStr; @@ -726,7 +724,7 @@ bool UpdatePackageProcessor::processRowgroup(ByteStream& aRowGroup, DMLResult& r { bool rc = false; // cout << "Get dbroot " << dbroot << endl; - uint32_t pmNum = (*fDbRootPMMap)[dbroot]; + uint32_t pmNum = oamcache()->getOwnerPM(dbroot); ByteStream bytestream; bytestream << (uint8_t)WE_SVR_UPDATE; diff --git a/dbcon/joblist/pdictionaryscan.cpp b/dbcon/joblist/pdictionaryscan.cpp index c710466cf..2e1a00e10 100644 --- a/dbcon/joblist/pdictionaryscan.cpp +++ b/dbcon/joblist/pdictionaryscan.cpp @@ -330,16 +330,11 @@ void pDictionaryScan::sendPrimitiveMessages() uint32_t partNum; uint16_t segNum; BRM::OID_t oid; - boost::shared_ptr > dbRootConnectionMap; - boost::shared_ptr > dbRootPMMap; oam::OamCache* oamCache = oam::OamCache::makeOamCache(); int localPMId = oamCache->getLocalPMId(); try { - dbRootConnectionMap = oamCache->getDBRootToConnectionMap(); - dbRootPMMap = oamCache->getDBRootToPMMap(); - it = fDictlbids.begin(); for (; it != fDictlbids.end() && !cancelled(); it++) @@ -350,10 +345,11 @@ void pDictionaryScan::sendPrimitiveMessages() // Bug5741 If we are local only and this doesn't belongs to us, skip it if (fLocalQuery == execplan::CalpontSelectExecutionPlan::LOCAL_QUERY) { + // XXX SZ: this may trigger. if (localPMId == 0) throw IDBExcept(ERR_LOCAL_QUERY_UM); - if (dbRootPMMap->find(dbroot)->second != localPMId) + if (!oamCache->isAccessibleBy(dbroot, localPMId)) continue; } @@ -374,23 +370,22 @@ void pDictionaryScan::sendPrimitiveMessages() if (remainingLbids < msgLbidCount) msgLbidCount = remainingLbids; - if (dbRootConnectionMap->find(dbroot) == dbRootConnectionMap->end()) + if (oamCache->isOffline(dbroot)) { // MCOL-259 force a reload of the xml. This usualy fixes it. Logger log; log.logMessage(logging::LOG_TYPE_DEBUG, "dictionary forcing reload of columnstore.xml for dbRootConnectionMap"); oamCache->forceReload(); - dbRootConnectionMap = oamCache->getDBRootToConnectionMap(); - if (dbRootConnectionMap->find(dbroot) == dbRootConnectionMap->end()) + if (oamCache->isOffline(dbroot)) { log.logMessage(logging::LOG_TYPE_DEBUG, "dictionary still not in dbRootConnectionMap"); throw IDBExcept(ERR_DATA_OFFLINE); } } - sendAPrimitiveMessage(msgLbidStart, msgLbidCount, (*dbRootConnectionMap)[dbroot]); + sendAPrimitiveMessage(msgLbidStart, msgLbidCount, oamCache->getClosestConnection(dbroot)); mutex.lock(); msgsSent += msgLbidCount; diff --git a/dbcon/joblist/primitivestep.h b/dbcon/joblist/primitivestep.h index 008a0e81f..4ba0af992 100644 --- a/dbcon/joblist/primitivestep.h +++ b/dbcon/joblist/primitivestep.h @@ -61,6 +61,7 @@ #include "rowgroup.h" #include "rowaggregation.h" #include "funcexpwrapper.h" +#include "oamcache.h" namespace joblist { @@ -1458,7 +1459,7 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep uint32_t blocksPerJob; /* Pseudo column filter processing. Think about refactoring into a separate class. */ - bool processPseudoColFilters(uint32_t extentIndex, boost::shared_ptr> dbRootPMMap) const; + bool processPseudoColFilters(uint32_t extentIndex, oam::OamCache* oamCache) const; template bool processOneFilterType(int8_t colWidth, T value, uint32_t type) const; template @@ -1472,6 +1473,7 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep bool compareRange(uint8_t COP, int64_t min, int64_t max, int64_t val) const; bool hasPCFilter, hasPMFilter, hasRIDFilter, hasSegmentFilter, hasDBRootFilter, hasSegmentDirFilter, hasPartitionFilter, hasMaxFilter, hasMinFilter, hasLBIDFilter, hasExtentIDFilter; + int findClosestPM(const std::map>& dbrootConnMap, int dbroot); }; /** @brief class FilterStep diff --git a/dbcon/joblist/tuple-bps.cpp b/dbcon/joblist/tuple-bps.cpp index 1e8e0f93e..46d674130 100644 --- a/dbcon/joblist/tuple-bps.cpp +++ b/dbcon/joblist/tuple-bps.cpp @@ -1936,7 +1936,7 @@ bool TupleBPS::processLBIDFilter(const EMEntry& emEntry) const } bool TupleBPS::processPseudoColFilters(uint32_t extentIndex, - boost::shared_ptr> dbRootPMMap) const + oam::OamCache* oamCache) const { if (!hasPCFilter) return true; @@ -1946,7 +1946,7 @@ bool TupleBPS::processPseudoColFilters(uint32_t extentIndex, if (bop == BOP_AND) { /* All Pseudocolumns have been promoted to 8-bytes except the casual partitioning filters */ - return (!hasPMFilter || processOneFilterType(8, (*dbRootPMMap)[emEntry.dbRoot], PSEUDO_PM)) && + return (!hasPMFilter || processOneFilterType(8, oamCache->getClosestPM(emEntry.dbRoot), PSEUDO_PM)) && (!hasSegmentFilter || processOneFilterType(8, emEntry.segmentNum, PSEUDO_SEGMENT)) && (!hasDBRootFilter || processOneFilterType(8, emEntry.dbRoot, PSEUDO_DBROOT)) && (!hasSegmentDirFilter || processOneFilterType(8, emEntry.partitionNum, PSEUDO_SEGMENTDIR)) && @@ -1971,7 +1971,7 @@ bool TupleBPS::processPseudoColFilters(uint32_t extentIndex, } else { - return (hasPMFilter && processOneFilterType(8, (*dbRootPMMap)[emEntry.dbRoot], PSEUDO_PM)) || + return (hasPMFilter && processOneFilterType(8, oamCache->getClosestPM(emEntry.dbRoot), PSEUDO_PM)) || (hasSegmentFilter && processOneFilterType(8, emEntry.segmentNum, PSEUDO_SEGMENT)) || (hasDBRootFilter && processOneFilterType(8, emEntry.dbRoot, PSEUDO_DBROOT)) || (hasSegmentDirFilter && processOneFilterType(8, emEntry.partitionNum, PSEUDO_SEGMENTDIR)) || @@ -2004,8 +2004,6 @@ void TupleBPS::makeJobs(vector* jobs) uint32_t blocksToScan; LBID_t startingLBID; oam::OamCache* oamCache = oam::OamCache::makeOamCache(); - boost::shared_ptr> dbRootConnectionMap = oamCache->getDBRootToConnectionMap(); - boost::shared_ptr> dbRootPMMap = oamCache->getDBRootToPMMap(); int localPMId = oamCache->getLocalPMId(); idbassert(ffirstStepType == SCAN); @@ -2043,7 +2041,7 @@ void TupleBPS::makeJobs(vector* jobs) continue; } - if (!processPseudoColFilters(i, dbRootPMMap)) + if (!processPseudoColFilters(i, oamCache)) { fNumBlksSkipped += lbidsToScan; continue; @@ -2066,20 +2064,19 @@ void TupleBPS::makeJobs(vector* jobs) throw IDBExcept(ERR_LOCAL_QUERY_UM); } - if (dbRootPMMap->find(scannedExtents[i].dbRoot)->second != localPMId) + if (!oamCache->isAccessibleBy(scannedExtents[i].dbRoot, localPMId)) continue; } // a necessary DB root is offline - if (dbRootConnectionMap->find(scannedExtents[i].dbRoot) == dbRootConnectionMap->end()) + if (oamCache->isOffline(scannedExtents[i].dbRoot)) { // MCOL-259 force a reload of the xml. This usualy fixes it. Logger log; log.logMessage(logging::LOG_TYPE_WARNING, "forcing reload of columnstore.xml for dbRootConnectionMap"); oamCache->forceReload(); - dbRootConnectionMap = oamCache->getDBRootToConnectionMap(); - if (dbRootConnectionMap->find(scannedExtents[i].dbRoot) == dbRootConnectionMap->end()) + if (oamCache->isOffline(scannedExtents[i].dbRoot)) { log.logMessage(logging::LOG_TYPE_WARNING, "dbroot still not in dbRootConnectionMap"); throw IDBExcept(ERR_DATA_OFFLINE); @@ -2106,9 +2103,10 @@ void TupleBPS::makeJobs(vector* jobs) fBPP->setLBID(startingLBID, scannedExtents[i]); fBPP->setCount(blocksThisJob); bs.reset(new ByteStream()); - fBPP->runBPP(*bs, (*dbRootConnectionMap)[scannedExtents[i].dbRoot], isExeMgrDEC); + int connIndex = oamCache->getClosestConnection(scannedExtents[i].dbRoot); + fBPP->runBPP(*bs, connIndex, isExeMgrDEC); jobs->push_back( - Job(scannedExtents[i].dbRoot, (*dbRootConnectionMap)[scannedExtents[i].dbRoot], blocksThisJob, bs)); + Job(scannedExtents[i].dbRoot, connIndex, blocksThisJob, bs)); blocksToScan -= blocksThisJob; startingLBID += fColType.colWidth * blocksThisJob; fBPP->reset(); diff --git a/dbcon/mysql/is_columnstore_files.cpp b/dbcon/mysql/is_columnstore_files.cpp index 19aea021b..9c29a19b7 100644 --- a/dbcon/mysql/is_columnstore_files.cpp +++ b/dbcon/mysql/is_columnstore_files.cpp @@ -33,6 +33,7 @@ #include "we_brm.h" #include "bytestream.h" #include "liboamcpp.h" +#include "oamcache.h" #include "messagequeue.h" #include "messagequeuepool.h" #include "we_messages.h" @@ -119,7 +120,7 @@ static int generate_result(BRM::OID_t oid, BRM::DBRM* emp, TABLE* table, THD* th off_t compressedFileSize = 0; we_config.initConfigCache(); messageqcpp::MessageQueueClient* msgQueueClient; - oam::Oam oam_instance; + oam::OamCache* oamcache = oam::OamCache::makeOamCache(); int pmId = 0; int rc; @@ -141,7 +142,7 @@ static int generate_result(BRM::OID_t oid, BRM::DBRM* emp, TABLE* table, THD* th try { - oam_instance.getDbrootPmConfig(iter->dbRoot, pmId); + pmId = oamcache->getOwnerPM(iter->dbRoot); } catch (std::runtime_error&) { diff --git a/dmlproc/batchinsertprocessor.cpp b/dmlproc/batchinsertprocessor.cpp index 785a31225..d53183a26 100644 --- a/dmlproc/batchinsertprocessor.cpp +++ b/dmlproc/batchinsertprocessor.cpp @@ -230,6 +230,15 @@ void BatchInsertProc::buildLastPkg(messageqcpp::ByteStream& bs) bs << rt; } +uint32_t BatchInsertProc::selectNextPM() +{ + uint32_t pm; + do + { + pm = fBatchLoader->selectNextPM(); + } while (pm != 0 && fWEClient->isConnectionReadonly(pm)); + return pm; +} void BatchInsertProc::sendFirstBatch() { uint32_t firstPmId = 0; @@ -237,7 +246,7 @@ void BatchInsertProc::sendFirstBatch() try { - firstPmId = fBatchLoader->selectNextPM(); + firstPmId = selectNextPM(); } catch (std::exception& ex) { @@ -268,7 +277,7 @@ void BatchInsertProc::sendNextBatch() try { - fCurrentPMid = fBatchLoader->selectNextPM(); + fCurrentPMid = selectNextPM(); } catch (std::exception& ex) { diff --git a/dmlproc/batchinsertprocessor.h b/dmlproc/batchinsertprocessor.h index a950b0986..ddb2a229e 100644 --- a/dmlproc/batchinsertprocessor.h +++ b/dmlproc/batchinsertprocessor.h @@ -68,6 +68,7 @@ class BatchInsertProc void setHwm(); void receiveAllMsg(); void receiveOutstandingMsg(); + uint32_t selectNextPM(); private: SP_PKG fInsertPkgQueue; diff --git a/dmlproc/dmlproc.cpp b/dmlproc/dmlproc.cpp index 0bc59e2e0..b60767fd7 100644 --- a/dmlproc/dmlproc.cpp +++ b/dmlproc/dmlproc.cpp @@ -253,7 +253,6 @@ void rollbackAll(DBRM* dbrm) message5.format(args5); ml.logInfoMessage(message5); OamCache* oamcache = OamCache::makeOamCache(); - OamCache::dbRootPMMap_t dbRootPMMap = oamcache->getDBRootToPMMap(); int errorTxn = 0; for (i = 0; i < tableLocks.size(); i++) @@ -393,7 +392,7 @@ void rollbackAll(DBRM* dbrm) try { - rollbackProcessor.processBulkRollback(tableLocks[i], dbrm, uniqueId, dbRootPMMap, lockReleased); + rollbackProcessor.processBulkRollback(tableLocks[i], dbrm, uniqueId, oamcache, lockReleased); ostringstream oss; oss << "DMLProc started bulkrollback on table OID " << tableLocks[i].tableOID << " and table lock id " << tableLocks[i].id << " finished and tablelock is released."; diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index e905095e5..974d6ce5c 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -1155,8 +1155,6 @@ void PackageHandler::run() logging::Message message(1); args.add("dmlprocessor.cpp PackageHandler::run() package type"); args.add((uint64_t)fPackageType); - args.add(" ,transaction ID: "); - args.add(fTxnid); args.add(e.what()); message.format(args); ml.logErrorMessage(message); @@ -1404,7 +1402,9 @@ void DMLProcessor::operator()() messageqcpp::ByteStream::byte status = 255; messageqcpp::ByteStream::octbyte rowCount = 0; - if (fDbrm->getSystemState(stateFlags) > + int rr = fDbrm->getSystemState(stateFlags); + + if (rr > 0) // > 0 implies succesful retrieval. It doesn't imply anything about the contents { messageqcpp::ByteStream results; @@ -1845,7 +1845,7 @@ void DMLProcessor::operator()() void RollbackTransactionProcessor::processBulkRollback(BRM::TableLockInfo lockInfo, BRM::DBRM* dbrm, uint64_t uniqueId, - OamCache::dbRootPMMap_t& dbRootPMMap, + OamCache* oamcache, bool& lockReleased) { // Take over ownership of stale lock. @@ -1884,7 +1884,7 @@ void RollbackTransactionProcessor::processBulkRollback(BRM::TableLockInfo lockIn for (uint32_t i = 0; i < lockInfo.dbrootList.size(); i++) { - pmId = (*dbRootPMMap)[lockInfo.dbrootList[i]]; + pmId = oamcache->getOwnerPM(lockInfo.dbrootList[i]); pmSet.insert(pmId); } diff --git a/dmlproc/dmlprocessor.h b/dmlproc/dmlprocessor.h index 3492bcfa5..6d020770b 100644 --- a/dmlproc/dmlprocessor.h +++ b/dmlproc/dmlprocessor.h @@ -321,7 +321,7 @@ class RollbackTransactionProcessor : public dmlpackageprocessor::DMLPackageProce } void processBulkRollback(BRM::TableLockInfo lockInfo, BRM::DBRM* dbrm, uint64_t uniqueId, - oam::OamCache::dbRootPMMap_t& dbRootPMMap, bool& lockReleased); + oam::OamCache* oamcache, bool& lockReleased); protected: private: diff --git a/oam/oamcpp/liboamcpp.cpp b/oam/oamcpp/liboamcpp.cpp index 60b704780..5cc5856a9 100644 --- a/oam/oamcpp/liboamcpp.cpp +++ b/oam/oamcpp/liboamcpp.cpp @@ -684,12 +684,14 @@ void Oam::getPmDbrootConfig(const int pmid, DBRootConfigList& dbrootconfiglist) * Get DBRoot - PM Config data * ********************************************************************/ -void Oam::getDbrootPmConfig(const int dbrootid, int& pmid) +void Oam::getDbrootPmConfig(const int dbrootid, set& pmids) { SystemModuleTypeConfig systemmoduletypeconfig; ModuleTypeConfig moduletypeconfig; ModuleConfig moduleconfig; + pmids.clear(); + try { getSystemConfig(systemmoduletypeconfig); @@ -716,13 +718,16 @@ void Oam::getDbrootPmConfig(const int dbrootid, int& pmid) { if (*pt1 == dbrootid) { - pmid = (*pt).DeviceID; - return; + pmids.insert((*pt).DeviceID); } } } } } + if (!pmids.empty()) + { + return; + } // dbrootid not found, return with error exceptionControl("getDbrootPmConfig", API_INVALID_PARAMETER); diff --git a/oam/oamcpp/liboamcpp.h b/oam/oamcpp/liboamcpp.h index 8ab935b8e..6f9e41c99 100644 --- a/oam/oamcpp/liboamcpp.h +++ b/oam/oamcpp/liboamcpp.h @@ -384,7 +384,7 @@ class Oam /** *@brief Get DBRoot - PM Config data */ - EXPORT void getDbrootPmConfig(const int dbrootid, int& pmid); + EXPORT void getDbrootPmConfig(const int dbrootid, set& pmid); /** *@brief Get System DBRoot Config data diff --git a/oam/oamcpp/oamcache.cpp b/oam/oamcpp/oamcache.cpp index 607c45528..3d53a49f7 100644 --- a/oam/oamcpp/oamcache.cpp +++ b/oam/oamcpp/oamcache.cpp @@ -39,7 +39,6 @@ using namespace boost; namespace oam { - struct CacheReloaded { CacheReloaded() @@ -56,11 +55,20 @@ OamCache* OamCache::makeOamCache() return &cache.oamcache; } +static bool isWESConfigured(config::Config* config, int moduleID) +{ + char buff[200]; + snprintf(buff, sizeof(buff), "pm%u_WriteEngineServer", moduleID); + string fServer(buff); + // Check if WES IP address record exists in the config (if not, this is a read-only node) + std::string otherEndDnOrIPStr = config->getConfig(fServer, "IPAddr"); + return !(otherEndDnOrIPStr.empty() || otherEndDnOrIPStr == "unassigned"); +} void OamCache::checkReload() { Oam oam; config::Config* config = config::Config::makeConfig(); - int temp; + set temp; if (config->getCurrentMTime() == mtime) return; @@ -73,7 +81,7 @@ void OamCache::checkReload() idbassert(txt != ""); numDBRoots = config->fromText(txt); - dbRootPMMap.reset(new map()); + dbRootPMMap.reset(new map>()); // cerr << "reloading oamcache\n"; for (uint32_t i = 0; i < dbroots.size(); i++) @@ -88,39 +96,32 @@ void OamCache::checkReload() oam.getSystemConfig("pm", moduletypeconfig); int moduleID = 0; + rwPMs.clear(); for (unsigned i = 0; i < moduletypeconfig.ModuleCount; i++) { moduleID = atoi((moduletypeconfig.ModuleNetworkList[i]) .DeviceName.substr(MAX_MODULE_TYPE_SIZE, MAX_MODULE_ID_SIZE) .c_str()); uniquePids.insert(moduleID); + if (isWESConfigured(config, moduleID)) + { + rwPMs.insert(moduleID); + } } std::set::const_iterator it = uniquePids.begin(); moduleIds.clear(); uint32_t i = 0; - map pmToConnectionMap; + pmConnectionMap.clear(); // Restore for Windows when we support multiple PMs while (it != uniquePids.end()) { - pmToConnectionMap[*it] = i++; + pmConnectionMap[*it] = i++; moduleIds.push_back(*it); it++; } - dbRootConnectionMap.reset(new map()); - - for (i = 0; i < dbroots.size(); i++) - { - map::iterator pmIter = pmToConnectionMap.find((*dbRootPMMap)[dbroots[i]]); - - if (pmIter != pmToConnectionMap.end()) - { - (*dbRootConnectionMap)[dbroots[i]] = (*pmIter).second; - } - } - pmDbrootsMap.reset(new OamCache::PMDbrootsMap_t::element_type()); systemStorageInfo_t t; t = oam.getStorageConfig(); @@ -142,21 +143,18 @@ void OamCache::checkReload() tm = oam.getModuleInfo(); OAMParentModuleName = boost::get<3>(tm); systemName = config->getConfig("SystemConfig", "SystemName"); -} + dbRootConnectionMap.clear(); -OamCache::dbRootPMMap_t OamCache::getDBRootToPMMap() -{ - return dbRootPMMap; -} + for (i = 0; i < dbroots.size(); i++) + { + auto pmIter = pmConnectionMap.find(getOwnerPM(dbroots[i])); -OamCache::dbRootPMMap_t OamCache::getDBRootToConnectionMap() -{ - return dbRootConnectionMap; -} + if (pmIter != pmConnectionMap.end()) + { + dbRootConnectionMap[dbroots[i]] = (*pmIter).second; + } + } -OamCache::PMDbrootsMap_t OamCache::getPMToDbrootsMap() -{ - return pmDbrootsMap; } uint32_t OamCache::getDBRootCount() @@ -237,4 +235,77 @@ string OamCache::getModuleName() return moduleName; } +bool OamCache::isAccessibleBy(int dbRoot, int pmId) +{ + return (*dbRootPMMap)[dbRoot].contains(pmId); +} + +bool OamCache::isOffline(int dbRoot) +{ + return dbRootConnectionMap.find(dbRoot) == dbRootConnectionMap.end(); +} + +int OamCache::getClosestPM(int dbroot) // who can access dbroot's records for read requests - either owner or us. +{ + if ((*dbRootPMMap)[dbroot].contains(mLocalPMId)) + { + return mLocalPMId; + } + for(auto j : (*dbRootPMMap)[dbroot]) + { + int pm = j; + if (rwPMs.contains(pm)) + { + return pm; + } + } + idbassert_s(0, "dbroot " << dbroot << " has empty set of PM's"); +} + +int OamCache::getClosestConnection(int dbroot) // connection index to owner's PM or ours PM - who can access dbRoot. +{ + return pmConnectionMap[getClosestPM(dbroot)]; +} + +int OamCache::getOwnerConnection(int dbroot) // connection index to owner's PM. +{ + return pmConnectionMap[getOwnerPM(dbroot)]; +} + +int OamCache::getOwnerPM(int dbroot) // Owner's PM index. +{ + for(auto j : (*dbRootPMMap)[dbroot]) + { + int pm = j; + if (rwPMs.contains(pm)) + { + return pm; + } + } + idbassert_s(0, "cannot find owner for dbroot " << dbroot); +} + +std::vector OamCache::getPMDBRoots(int PM) // what DBRoots are owned by given PM. +{ + std::vector result; + for (const auto& dbroot : (*dbRootPMMap)) + { + if (dbroot.second.find(PM) != dbroot.second.end()) + { + result.push_back(dbroot.first); + } + } + return result; +} + +std::vector OamCache::getAllDBRoots() // get all DBRoots. +{ + std::vector result; + for (const auto& dbroot : (*dbRootPMMap)) + { + result.push_back(dbroot.first); + } + return result; +} + } /* namespace oam */ diff --git a/oam/oamcpp/oamcache.h b/oam/oamcpp/oamcache.h index 17f432219..34eb115ac 100644 --- a/oam/oamcpp/oamcache.h +++ b/oam/oamcpp/oamcache.h @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -31,7 +32,7 @@ namespace oam class OamCache { public: - typedef boost::shared_ptr > dbRootPMMap_t; + typedef boost::shared_ptr> > dbRootPMMap_t; typedef std::vector dbRoots; typedef boost::shared_ptr > PMDbrootsMap_t; virtual ~OamCache() = default; @@ -42,9 +43,14 @@ class OamCache mtime = 0; } - dbRootPMMap_t getDBRootToPMMap(); - dbRootPMMap_t getDBRootToConnectionMap(); - PMDbrootsMap_t getPMToDbrootsMap(); + int getClosestPM(int dbroot); // who can access dbroot's records for read requests - either owner or us. + int getClosestConnection(int dbroot); // connection index to owner's PM or ours PM - who can access dbRoot. + int getOwnerConnection(int dbroot); // connection index to owner's PM. + int getOwnerPM(int dbroot); // Owner's PM index. + std::vector getPMDBRoots(int PM); // what DBRoots are owned by given PM. + std::vector getAllDBRoots(); // get all DBRoots. + bool isAccessibleBy(int dbRoot, int pmId); + bool isOffline(int dbRoot); // not registered in map. uint32_t getDBRootCount(); DBRootConfigList& getDBRootNums(); std::vector& getModuleIds(); @@ -61,7 +67,7 @@ class OamCache OamCache& operator=(const OamCache&) const = delete; dbRootPMMap_t dbRootPMMap; - dbRootPMMap_t dbRootConnectionMap; + map dbRootConnectionMap; PMDbrootsMap_t pmDbrootsMap; uint32_t numDBRoots = 1; time_t mtime = 0; @@ -71,6 +77,8 @@ class OamCache int mLocalPMId = 0; // The PM id running on this machine std::string systemName; std::string moduleName; + map pmConnectionMap; + set rwPMs; }; } // namespace oam diff --git a/tools/cleartablelock/cleartablelock.cpp b/tools/cleartablelock/cleartablelock.cpp index 361d354a6..5620bbb56 100644 --- a/tools/cleartablelock/cleartablelock.cpp +++ b/tools/cleartablelock/cleartablelock.cpp @@ -159,19 +159,22 @@ int constructPMList(const std::vector& dbRootList, std::vector pmSet; // used to filter out duplicates - int pm; + std::set pms; for (unsigned j = 0; j < dbRootList.size(); j++) { dbRoot = dbRootList[j]; - oam.getDbrootPmConfig(dbRootList[j], pm); - pmSet.insert(pm); + oam.getDbrootPmConfig(dbRootList[j], pms); + for (auto pm : pms) + { + pmSet.insert(pm); + } } // Store unique set of PM IDs into output vector - for (std::set::const_iterator iter = pmSet.begin(); iter != pmSet.end(); ++iter) + for (auto pm : pmSet) { - pmList.push_back(*iter); + pmList.push_back(pm); } } catch (std::exception& ex) diff --git a/utils/batchloader/batchloader.cpp b/utils/batchloader/batchloader.cpp index 0e9bfe836..a1e7697d7 100644 --- a/utils/batchloader/batchloader.cpp +++ b/utils/batchloader/batchloader.cpp @@ -48,31 +48,11 @@ BatchLoader::BatchLoader(uint32_t tableOid, execplan::CalpontSystemCatalog::SCN fPMs = PMs; fSessionId = sessionId; fTableOid = tableOid; - OamCache* oamcache = OamCache::makeOamCache(); - oam::OamCache::PMDbrootsMap_t systemPmDbrootMap = oamcache->getPMToDbrootsMap(); - std::map::iterator iter = systemPmDbrootMap->begin(); - // cout << "fPMs size is " << fPMs.size() << endl; - fPmDbrootMap.reset(new OamCache::PMDbrootsMap_t::element_type()); - fDbrootPMmap.reset(new map()); - - for (uint32_t i = 0; i < fPMs.size(); i++) + fOamCache = OamCache::makeOamCache(); + auto allDBRoots = fOamCache->getAllDBRoots(); + for (auto dbr : allDBRoots) { - iter = systemPmDbrootMap->find(fPMs[i]); - - if (iter != systemPmDbrootMap->end()) - { - fDbRoots.insert(fDbRoots.end(), (iter->second).begin(), (iter->second).end()); - (*fPmDbrootMap)[fPMs[i]] = iter->second; - } - } - - // Build dbroot to PM map - for (iter = fPmDbrootMap->begin(); iter != fPmDbrootMap->end(); iter++) - { - for (uint32_t i = 0; i < iter->second.size(); i++) - { - (*fDbrootPMmap)[iter->second[i]] = iter->first; - } + fDbRoots.push_back(dbr); } } //------------------------------------------------------------------------------ @@ -199,12 +179,7 @@ void BatchLoader::selectFirstPM(uint32_t& PMId) if (createdDbroot != 0) { - std::map::iterator iter = fDbrootPMmap->begin(); - - iter = fDbrootPMmap->find(createdDbroot); - - if (iter != fDbrootPMmap->end()) - PMId = iter->second; + PMId = fOamCache->getOwnerPM(createdDbroot); } // This will build the batch distribution sequence @@ -233,12 +208,12 @@ void BatchLoader::selectFirstPM(uint32_t& PMId) { PMRootInfo aEntry; aEntry.PMId = fPmDistSeq[j]; - iter = fPmDbrootMap->find(aEntry.PMId); + auto dbroots = fOamCache->getPMDBRoots(PMId); - for (unsigned k = 0; k < (iter->second).size(); k++) + for (unsigned k = 0; k < dbroots.size(); k++) { RootExtentsBlocks aRootInfo; - aRootInfo.DBRoot = (iter->second)[k]; + aRootInfo.DBRoot = dbroots[k]; aRootInfo.numExtents = rootExtents[aRootInfo.DBRoot]; aRootInfo.numBlocks = rootBlocks[aRootInfo.DBRoot]; // cout << "aRootInfo DBRoot:numExtents:numBlocks = " << @@ -410,17 +385,15 @@ void BatchLoader::buildBatchDistSeqVector() fPmDistSeq.clear(); BlIntVec aDbCntVec(fPMs.size()); - std::map::iterator iter = fPmDbrootMap->begin(); - for (uint32_t i = 0; i < fPMs.size(); i++) { - iter = fPmDbrootMap->find(fPMs[i]); + auto dbroots = fOamCache->getPMDBRoots(fPMs[i]); - if ((iter != fPmDbrootMap->end()) && ((iter->second).begin() != (iter->second).end())) + if (dbroots.size() > 0) { try { - aDbCntVec[i] = (iter->second).size(); + aDbCntVec[i] = dbroots.size(); // cout << "PM - "<& dbRootVec) { oam::OamCache* oamcache = oam::OamCache::makeOamCache(); - oam::OamCache::PMDbrootsMap_t pmDbroots = oamcache->getPMToDbrootsMap(); - dbRootVec.clear(); - dbRootVec = (*pmDbroots)[pm]; + dbRootVec = oamcache->getPMDBRoots(pm); } DBRootVec ExtentMap::getAllDbRoots() @@ -6189,13 +6187,11 @@ DBRootVec ExtentMap::getAllDbRoots() DBRootVec dbRootResultVec; oam::OamCache* oamcache = oam::OamCache::makeOamCache(); // NB The routine uses int for dbroot id that contradicts with the type used here, namely uint16_t - oam::OamCache::PMDbrootsMap_t pmDbroots = oamcache->getPMToDbrootsMap(); - auto& pmDbrootsRef = *pmDbroots; + auto pmDbroots = oamcache->getAllDBRoots(); - for (auto& pmDBRootPair : pmDbrootsRef) + for (auto& DBRoot : pmDbroots) { - for (auto dbRootId : pmDBRootPair.second) - dbRootResultVec.push_back(dbRootId); + dbRootResultVec.push_back(DBRoot); } return dbRootResultVec; } diff --git a/writeengine/client/we_clients.cpp b/writeengine/client/we_clients.cpp index 073d13744..223947ed5 100644 --- a/writeengine/client/we_clients.cpp +++ b/writeengine/client/we_clients.cpp @@ -505,8 +505,9 @@ void WEClients::write(const messageqcpp::ByteStream& msg, uint32_t connection) fPmConnections[connection]->write(msg); else { + // new behavior: connection client is nullptr means it is read-only. ostringstream os; - os << "Lost connection to WriteEngineServer on pm" << connection; + os << "Connection to readonly pm" << connection; throw runtime_error(os.str()); } } diff --git a/writeengine/client/we_ddlcommandclient.cpp b/writeengine/client/we_ddlcommandclient.cpp index e17a0604d..03c0bd7ac 100644 --- a/writeengine/client/we_ddlcommandclient.cpp +++ b/writeengine/client/we_ddlcommandclient.cpp @@ -37,6 +37,7 @@ namespace WriteEngine WE_DDLCommandClient::WE_DDLCommandClient() { fWEClient = new WEClients(WEClients::DDLPROC); + fOamCache = oam::OamCache::makeOamCache(); } WE_DDLCommandClient::~WE_DDLCommandClient() @@ -64,7 +65,7 @@ uint8_t WE_DDLCommandClient::UpdateSyscolumnNextval(uint32_t columnOid, uint64_t try { - fOam.getDbrootPmConfig(dbRoot, pmNum); + pmNum = fOamCache->getOwnerPM(dbRoot); fWEClient->write(command, pmNum); while (1) diff --git a/writeengine/client/we_ddlcommandclient.h b/writeengine/client/we_ddlcommandclient.h index b4ef7ce4d..98ee0204a 100644 --- a/writeengine/client/we_ddlcommandclient.h +++ b/writeengine/client/we_ddlcommandclient.h @@ -26,6 +26,7 @@ #include "dbrm.h" #include "liboamcpp.h" #include "writeengine.h" +#include "oamcache.h" #define EXPORT @@ -50,7 +51,7 @@ class WE_DDLCommandClient private: BRM::DBRM fDbrm; WEClients* fWEClient; - oam::Oam fOam; + oam::OamCache* fOamCache; }; } // namespace WriteEngine diff --git a/writeengine/redistribute/we_redistributecontrolthread.cpp b/writeengine/redistribute/we_redistributecontrolthread.cpp index 7caa14b05..c62a53ae7 100644 --- a/writeengine/redistribute/we_redistributecontrolthread.cpp +++ b/writeengine/redistribute/we_redistributecontrolthread.cpp @@ -758,8 +758,7 @@ int RedistributeControlThread::executeRedistributePlan() int RedistributeControlThread::connectToWes(int dbroot) { int ret = 0; - OamCache::dbRootPMMap_t dbrootToPM = fOamCache->getDBRootToPMMap(); - int pmId = (*dbrootToPM)[dbroot]; + int pmId = fOamCache->getOwnerPM(dbroot); ostringstream oss; oss << "pm" << pmId << "_WriteEngineServer"; diff --git a/writeengine/redistribute/we_redistributeworkerthread.cpp b/writeengine/redistribute/we_redistributeworkerthread.cpp index 1cd33c8a7..71fbb8425 100644 --- a/writeengine/redistribute/we_redistributeworkerthread.cpp +++ b/writeengine/redistribute/we_redistributeworkerthread.cpp @@ -134,11 +134,10 @@ void RedistributeWorkerThread::handleRequest() { memcpy(&fPlanEntry, fBs.buf(), sizeof(RedistributePlanEntry)); fBs.advance(sizeof(RedistributePlanEntry)); - OamCache::dbRootPMMap_t dbrootToPM = fOamCache->getDBRootToPMMap(); fMyId.first = fPlanEntry.source; - fMyId.second = (*dbrootToPM)[fMyId.first]; + fMyId.second = fOamCache->getOwnerPM(fMyId.first); fPeerId.first = fPlanEntry.destination; - fPeerId.second = (*dbrootToPM)[fPeerId.first]; + fPeerId.second = fOamCache->getOwnerPM(fPeerId.first); if (grabTableLock() == 0) { diff --git a/writeengine/shared/we_define.cpp b/writeengine/shared/we_define.cpp index 49bfeac18..382b49456 100644 --- a/writeengine/shared/we_define.cpp +++ b/writeengine/shared/we_define.cpp @@ -154,7 +154,7 @@ WErrorCodes::WErrorCodes() : fErrorCodes() fErrorCodes[ERR_BULK_SEND_MSG_ERR] = " in a bulk load send msg"; fErrorCodes[ERR_BULK_MISSING_EXTENT_ENTRY] = " missing Extent Entry when trying to save LBID info for CP"; fErrorCodes[ERR_BULK_MISSING_EXTENT_ROW] = " missing Extent Row when trying to save LBID info for CP"; - fErrorCodes[ERR_BULK_ROW_FILL_BUFFER] = " Single row fills read buffer; try larger read buffer via -c flag in cpimport"; + fErrorCodes[ERR_BULK_ROW_FILL_BUFFER] = " Single row fills read buffer; try larger read buffer."; fErrorCodes[ERR_BULK_DBROOT_CHANGE] = " Local PM DBRoot settings changed during bulk load."; fErrorCodes[ERR_BULK_ROLLBACK_MISS_ROOT] = " Mode3 automatic rollback not performed. DBRoot missing."; fErrorCodes[ERR_BULK_ROLLBACK_SEG_LIST] = " Error building segment file list in a directory."; @@ -284,7 +284,8 @@ std::string WErrorCodes::errorString(int code) case ERR_FILE_DISK_SPACE: { logging::Message::Args args; - args.add("configured by WriteEngine.MaxFileSystemDiskUsagePct in Columnstore.xml"); + std::string msgArg; // empty str arg; no extra info in this context + args.add(msgArg); return logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_EXTENT_DISK_SPACE, args); break; }