From 5aa2a824c25a46d70453eabe2f7594939efd53ce Mon Sep 17 00:00:00 2001 From: Serguey Zefirov Date: Thu, 10 Jul 2025 11:31:32 +0000 Subject: [PATCH] feat(MCOL-6082): Multiple readers of dbroots using OamCache logic This patch introduces centralized logic of selecting what dbroot is accessible in PrimProc on what node. The logic is in OamCache for time being and can be moved later. --- dbcon/ddlpackageproc/altertableprocessor.cpp | 31 ++--- dbcon/ddlpackageproc/createtableprocessor.cpp | 7 +- dbcon/ddlpackageproc/ddlpackageprocessor.cpp | 3 +- dbcon/ddlpackageproc/droptableprocessor.cpp | 8 +- .../commandpackageprocessor.cpp | 9 +- .../dmlpackageproc/deletepackageprocessor.cpp | 2 +- dbcon/dmlpackageproc/dmlpackageprocessor.h | 6 +- .../dmlpackageproc/insertpackageprocessor.cpp | 3 +- .../dmlpackageproc/updatepackageprocessor.cpp | 8 +- dbcon/joblist/pdictionaryscan.cpp | 15 +- dbcon/joblist/primitivestep.h | 4 +- dbcon/joblist/tuple-bps.cpp | 22 ++- dbcon/mysql/is_columnstore_files.cpp | 5 +- dmlproc/batchinsertprocessor.cpp | 13 +- dmlproc/batchinsertprocessor.h | 1 + dmlproc/dmlproc.cpp | 3 +- dmlproc/dmlprocessor.cpp | 10 +- dmlproc/dmlprocessor.h | 2 +- oam/oamcpp/liboamcpp.cpp | 11 +- oam/oamcpp/liboamcpp.h | 2 +- oam/oamcpp/oamcache.cpp | 129 ++++++++++++++---- oam/oamcpp/oamcache.h | 18 ++- tools/cleartablelock/cleartablelock.cpp | 13 +- utils/batchloader/batchloader.cpp | 57 ++------ utils/batchloader/batchloader.h | 3 +- versioning/BRM/dbrm.cpp | 3 +- versioning/BRM/extentmap.cpp | 16 +-- writeengine/client/we_clients.cpp | 3 +- writeengine/client/we_ddlcommandclient.cpp | 3 +- writeengine/client/we_ddlcommandclient.h | 3 +- .../we_redistributecontrolthread.cpp | 3 +- .../we_redistributeworkerthread.cpp | 5 +- writeengine/shared/we_define.cpp | 5 +- 33 files changed, 232 insertions(+), 194 deletions(-) diff --git a/dbcon/ddlpackageproc/altertableprocessor.cpp b/dbcon/ddlpackageproc/altertableprocessor.cpp index 29e27a97f..92cffe90d 100644 --- a/dbcon/ddlpackageproc/altertableprocessor.cpp +++ b/dbcon/ddlpackageproc/altertableprocessor.cpp @@ -761,8 +761,7 @@ void AlterTableProcessor::addColumn(uint32_t sessionID, execplan::CalpontSystemC int pmNum = 1; OamCache* oamcache = OamCache::makeOamCache(); - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); boost::shared_ptr bsIn; // Will create files on each PM as needed. @@ -925,7 +924,7 @@ void AlterTableProcessor::addColumn(uint32_t sessionID, execplan::CalpontSystemC if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot "); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); bs.restart(); bs << (ByteStream::byte)WE_SVR_UPDATE_SYSTABLE_AUTO; bs << uniqueId; @@ -1317,8 +1316,7 @@ void AlterTableProcessor::dropColumn(uint32_t sessionID, execplan::CalpontSystem boost::shared_ptr bsIn; OamCache* oamcache = OamCache::makeOamCache(); - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); // MCOL-66 The DBRM can't handle concurrent DDL boost::mutex::scoped_lock lk(dbrmMutex); @@ -1379,7 +1377,7 @@ void AlterTableProcessor::dropColumn(uint32_t sessionID, execplan::CalpontSystem if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot "); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); bytestream.restart(); bytestream << (ByteStream::byte)WE_SVR_UPDATE_SYSTABLE_AUTO; bytestream << uniqueId; @@ -1452,7 +1450,7 @@ void AlterTableProcessor::dropColumn(uint32_t sessionID, execplan::CalpontSystem if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot "); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); try { @@ -1706,8 +1704,7 @@ void AlterTableProcessor::setColumnDefault(uint32_t sessionID, execplan::Calpont int pmNum = 1; OamCache* oamcache = OamCache::makeOamCache(); - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); boost::shared_ptr bsIn; @@ -1795,8 +1792,7 @@ void AlterTableProcessor::dropColumnDefault(uint32_t sessionID, execplan::Calpon int pmNum = 1; OamCache* oamcache = OamCache::makeOamCache(); - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); boost::shared_ptr bsIn; @@ -2001,8 +1997,7 @@ void AlterTableProcessor::renameTable(uint32_t sessionID, execplan::CalpontSyste boost::shared_ptr bsIn; OamCache* oamcache = OamCache::makeOamCache(); - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); try { @@ -2067,7 +2062,7 @@ void AlterTableProcessor::renameTable(uint32_t sessionID, execplan::CalpontSyste if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot"); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); try { @@ -2131,8 +2126,7 @@ void AlterTableProcessor::tableComment(uint32_t sessionID, execplan::CalpontSyst int pmNum = 1; rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot); OamCache* oamcache = OamCache::makeOamCache(); - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot"); @@ -2354,8 +2348,7 @@ void AlterTableProcessor::renameColumn(uint32_t sessionID, execplan::CalpontSyst int pmNum = 1; OamCache* oamcache = OamCache::makeOamCache(); - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); boost::shared_ptr bsIn; CalpontSystemCatalog::TableName tableName; @@ -2539,7 +2532,7 @@ void AlterTableProcessor::renameColumn(uint32_t sessionID, execplan::CalpontSyst if (rc != 0) throw std::runtime_error("Error while calling getSysCatDBRoot"); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); // send to WES to process try diff --git a/dbcon/ddlpackageproc/createtableprocessor.cpp b/dbcon/ddlpackageproc/createtableprocessor.cpp index 0b9f68f00..cd61df7a8 100644 --- a/dbcon/ddlpackageproc/createtableprocessor.cpp +++ b/dbcon/ddlpackageproc/createtableprocessor.cpp @@ -326,8 +326,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackageInternal( bytestream << (uint32_t)dbRoot; tableDef.serialize(bytestream); boost::shared_ptr bsIn; - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); // MCOL-66 The DBRM can't handle concurrent DDL boost::mutex::scoped_lock lk(dbrmMutex); @@ -450,7 +449,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackageInternal( bytestream << (uint32_t)dbRoot; tableDef.serialize(bytestream); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); try { @@ -666,7 +665,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackageInternal( return result; } - pmNum = (*dbRootPMMap)[useDBRoot]; + pmNum = oamcache->getOwnerPM(useDBRoot); try { diff --git a/dbcon/ddlpackageproc/ddlpackageprocessor.cpp b/dbcon/ddlpackageproc/ddlpackageprocessor.cpp index e4b8786d8..7dc815de0 100644 --- a/dbcon/ddlpackageproc/ddlpackageprocessor.cpp +++ b/dbcon/ddlpackageproc/ddlpackageprocessor.cpp @@ -611,8 +611,7 @@ void DDLPackageProcessor::createFiles(CalpontSystemCatalog::TableName aTableName try { OamCache* oamcache = OamCache::makeOamCache(); - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - int pmNum = (*dbRootPMMap)[useDBRoot]; + int pmNum = oamcache->getOwnerPM(useDBRoot); fWEClient->write(bytestream, (uint32_t)pmNum); bsIn.reset(new ByteStream()); diff --git a/dbcon/ddlpackageproc/droptableprocessor.cpp b/dbcon/ddlpackageproc/droptableprocessor.cpp index cd259f586..b8d67fe7e 100644 --- a/dbcon/ddlpackageproc/droptableprocessor.cpp +++ b/dbcon/ddlpackageproc/droptableprocessor.cpp @@ -359,8 +359,7 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackageInternal(ddlpack return result; } - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); try { @@ -464,7 +463,7 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackageInternal(ddlpack return result; } - pmNum = (*dbRootPMMap)[dbRoot]; + pmNum = oamcache->getOwnerPM(dbRoot); try { @@ -1275,8 +1274,7 @@ TruncTableProcessor::DDLResult TruncTableProcessor::processPackageInternal(ddlpa bytestream << (uint32_t)colType.compressionType; } - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[useDBRoot]; + pmNum = oamcache->getOwnerPM(useDBRoot); try { diff --git a/dbcon/dmlpackageproc/commandpackageprocessor.cpp b/dbcon/dmlpackageproc/commandpackageprocessor.cpp index 7c3c4794d..e57a80a54 100644 --- a/dbcon/dmlpackageproc/commandpackageprocessor.cpp +++ b/dbcon/dmlpackageproc/commandpackageprocessor.cpp @@ -845,20 +845,15 @@ void CommandPackageProcessor::clearTableLock(uint64_t uniqueId, const dmlpackage establishTableLockToClear(tableLockID, lockInfo); lockGrabbed = true; - oam::OamCache* oamCache = oam::OamCache::makeOamCache(); - oam::OamCache::dbRootPMMap_t dbRootPmMap = oamCache->getDBRootToPMMap(); - std::map::const_iterator mapIter; std::set pmSet; // Construct relevant list of PMs based on the DBRoots associated // with the tableLock. for (unsigned int k = 0; k < lockInfo.dbrootList.size(); k++) { - mapIter = dbRootPmMap->find(lockInfo.dbrootList[k]); - - if (mapIter != dbRootPmMap->end()) + if (!oamcache()->isOffline(lockInfo.dbrootList[k])) { - int pm = mapIter->second; + int pm = oamcache()->getOwnerPM(lockInfo.dbrootList[k]); pmSet.insert(pm); } else diff --git a/dbcon/dmlpackageproc/deletepackageprocessor.cpp b/dbcon/dmlpackageproc/deletepackageprocessor.cpp index 2653fd68d..033f709c2 100644 --- a/dbcon/dmlpackageproc/deletepackageprocessor.cpp +++ b/dbcon/dmlpackageproc/deletepackageprocessor.cpp @@ -643,7 +643,7 @@ bool DeletePackageProcessor::processRowgroup(ByteStream& aRowGroup, DMLResult& r { bool rc = false; // cout << "Get dbroot " << dbroot << endl; - int pmNum = (*fDbRootPMMap)[dbroot]; + int pmNum = oamcache()->getOwnerPM(dbroot); DMLTable* tablePtr = cpackage.get_Table(); ByteStream bytestream; bytestream << (ByteStream::byte)WE_SVR_DELETE; diff --git a/dbcon/dmlpackageproc/dmlpackageprocessor.h b/dbcon/dmlpackageproc/dmlpackageprocessor.h index ec7956c81..4379e9412 100644 --- a/dbcon/dmlpackageproc/dmlpackageprocessor.h +++ b/dbcon/dmlpackageproc/dmlpackageprocessor.h @@ -165,8 +165,6 @@ class DMLPackageProcessor std::cout << "Cannot make connection to WES" << std::endl; } - oam::OamCache* oamCache = oam::OamCache::makeOamCache(); - fDbRootPMMap = oamCache->getDBRootToPMMap(); fDbrm = aDbrm; fSessionID = sid; fExeMgr = new execplan::ClientRotator(fSessionID, "ExeMgr"); @@ -489,6 +487,8 @@ class DMLPackageProcessor uint32_t tableOid); int endTransaction(uint64_t uniqueId, BRM::TxnID txnID, bool success); + oam::OamCache* oamcache() { return oam::OamCache::makeOamCache(); } + /** @brief the Session Manager interface */ execplan::SessionManager fSessionManager; @@ -500,8 +500,6 @@ class DMLPackageProcessor uint32_t fPMCount; WriteEngine::WEClients* fWEClient; BRM::DBRM* fDbrm; - boost::shared_ptr > fDbRootPMMap; - oam::Oam fOam; bool fRollbackPending; // When set, any derived object should stop what it's doing and cleanup in // preparation for a Rollback execplan::ClientRotator* fExeMgr; diff --git a/dbcon/dmlpackageproc/insertpackageprocessor.cpp b/dbcon/dmlpackageproc/insertpackageprocessor.cpp index 524acb677..9629a504c 100644 --- a/dbcon/dmlpackageproc/insertpackageprocessor.cpp +++ b/dbcon/dmlpackageproc/insertpackageprocessor.cpp @@ -275,8 +275,7 @@ DMLPackageProcessor::DMLResult InsertPackageProcessor::processPackageInternal( if (tmpSet) { dbroot = tmp.dbRoot; - boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); - pmNum = (*dbRootPMMap)[dbroot]; + pmNum = oamcache->getOwnerPM(dbroot); //@Bug 4760. validate pm value if (pmNum == 0) diff --git a/dbcon/dmlpackageproc/updatepackageprocessor.cpp b/dbcon/dmlpackageproc/updatepackageprocessor.cpp index 651779a58..91508ca46 100644 --- a/dbcon/dmlpackageproc/updatepackageprocessor.cpp +++ b/dbcon/dmlpackageproc/updatepackageprocessor.cpp @@ -161,8 +161,7 @@ DMLPackageProcessor::DMLResult UpdatePackageProcessor::processPackageInternal( int32_t sessionId = fSessionID; std::string processName("DMLProc"); int i = 0; - OamCache* oamcache = OamCache::makeOamCache(); - std::vector pmList = oamcache->getModuleIds(); + std::vector pmList = oamcache()->getModuleIds(); std::vector pms; for (unsigned i = 0; i < pmList.size(); i++) @@ -437,8 +436,7 @@ uint64_t UpdatePackageProcessor::fixUpRows(dmlpackage::CalpontDMLPackage& cpacka uint64_t rowsProcessed = 0; uint32_t dbroot = 1; bool metaData = false; - oam::OamCache* oamCache = oam::OamCache::makeOamCache(); - std::vector fPMs = oamCache->getModuleIds(); + std::vector fPMs = oamcache()->getModuleIds(); std::map pmState; string emsg; string emsgStr; @@ -726,7 +724,7 @@ bool UpdatePackageProcessor::processRowgroup(ByteStream& aRowGroup, DMLResult& r { bool rc = false; // cout << "Get dbroot " << dbroot << endl; - uint32_t pmNum = (*fDbRootPMMap)[dbroot]; + uint32_t pmNum = oamcache()->getOwnerPM(dbroot); ByteStream bytestream; bytestream << (uint8_t)WE_SVR_UPDATE; diff --git a/dbcon/joblist/pdictionaryscan.cpp b/dbcon/joblist/pdictionaryscan.cpp index c710466cf..2e1a00e10 100644 --- a/dbcon/joblist/pdictionaryscan.cpp +++ b/dbcon/joblist/pdictionaryscan.cpp @@ -330,16 +330,11 @@ void pDictionaryScan::sendPrimitiveMessages() uint32_t partNum; uint16_t segNum; BRM::OID_t oid; - boost::shared_ptr > dbRootConnectionMap; - boost::shared_ptr > dbRootPMMap; oam::OamCache* oamCache = oam::OamCache::makeOamCache(); int localPMId = oamCache->getLocalPMId(); try { - dbRootConnectionMap = oamCache->getDBRootToConnectionMap(); - dbRootPMMap = oamCache->getDBRootToPMMap(); - it = fDictlbids.begin(); for (; it != fDictlbids.end() && !cancelled(); it++) @@ -350,10 +345,11 @@ void pDictionaryScan::sendPrimitiveMessages() // Bug5741 If we are local only and this doesn't belongs to us, skip it if (fLocalQuery == execplan::CalpontSelectExecutionPlan::LOCAL_QUERY) { + // XXX SZ: this may trigger. if (localPMId == 0) throw IDBExcept(ERR_LOCAL_QUERY_UM); - if (dbRootPMMap->find(dbroot)->second != localPMId) + if (!oamCache->isAccessibleBy(dbroot, localPMId)) continue; } @@ -374,23 +370,22 @@ void pDictionaryScan::sendPrimitiveMessages() if (remainingLbids < msgLbidCount) msgLbidCount = remainingLbids; - if (dbRootConnectionMap->find(dbroot) == dbRootConnectionMap->end()) + if (oamCache->isOffline(dbroot)) { // MCOL-259 force a reload of the xml. This usualy fixes it. Logger log; log.logMessage(logging::LOG_TYPE_DEBUG, "dictionary forcing reload of columnstore.xml for dbRootConnectionMap"); oamCache->forceReload(); - dbRootConnectionMap = oamCache->getDBRootToConnectionMap(); - if (dbRootConnectionMap->find(dbroot) == dbRootConnectionMap->end()) + if (oamCache->isOffline(dbroot)) { log.logMessage(logging::LOG_TYPE_DEBUG, "dictionary still not in dbRootConnectionMap"); throw IDBExcept(ERR_DATA_OFFLINE); } } - sendAPrimitiveMessage(msgLbidStart, msgLbidCount, (*dbRootConnectionMap)[dbroot]); + sendAPrimitiveMessage(msgLbidStart, msgLbidCount, oamCache->getClosestConnection(dbroot)); mutex.lock(); msgsSent += msgLbidCount; diff --git a/dbcon/joblist/primitivestep.h b/dbcon/joblist/primitivestep.h index 008a0e81f..4ba0af992 100644 --- a/dbcon/joblist/primitivestep.h +++ b/dbcon/joblist/primitivestep.h @@ -61,6 +61,7 @@ #include "rowgroup.h" #include "rowaggregation.h" #include "funcexpwrapper.h" +#include "oamcache.h" namespace joblist { @@ -1458,7 +1459,7 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep uint32_t blocksPerJob; /* Pseudo column filter processing. Think about refactoring into a separate class. */ - bool processPseudoColFilters(uint32_t extentIndex, boost::shared_ptr> dbRootPMMap) const; + bool processPseudoColFilters(uint32_t extentIndex, oam::OamCache* oamCache) const; template bool processOneFilterType(int8_t colWidth, T value, uint32_t type) const; template @@ -1472,6 +1473,7 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep bool compareRange(uint8_t COP, int64_t min, int64_t max, int64_t val) const; bool hasPCFilter, hasPMFilter, hasRIDFilter, hasSegmentFilter, hasDBRootFilter, hasSegmentDirFilter, hasPartitionFilter, hasMaxFilter, hasMinFilter, hasLBIDFilter, hasExtentIDFilter; + int findClosestPM(const std::map>& dbrootConnMap, int dbroot); }; /** @brief class FilterStep diff --git a/dbcon/joblist/tuple-bps.cpp b/dbcon/joblist/tuple-bps.cpp index 1e8e0f93e..46d674130 100644 --- a/dbcon/joblist/tuple-bps.cpp +++ b/dbcon/joblist/tuple-bps.cpp @@ -1936,7 +1936,7 @@ bool TupleBPS::processLBIDFilter(const EMEntry& emEntry) const } bool TupleBPS::processPseudoColFilters(uint32_t extentIndex, - boost::shared_ptr> dbRootPMMap) const + oam::OamCache* oamCache) const { if (!hasPCFilter) return true; @@ -1946,7 +1946,7 @@ bool TupleBPS::processPseudoColFilters(uint32_t extentIndex, if (bop == BOP_AND) { /* All Pseudocolumns have been promoted to 8-bytes except the casual partitioning filters */ - return (!hasPMFilter || processOneFilterType(8, (*dbRootPMMap)[emEntry.dbRoot], PSEUDO_PM)) && + return (!hasPMFilter || processOneFilterType(8, oamCache->getClosestPM(emEntry.dbRoot), PSEUDO_PM)) && (!hasSegmentFilter || processOneFilterType(8, emEntry.segmentNum, PSEUDO_SEGMENT)) && (!hasDBRootFilter || processOneFilterType(8, emEntry.dbRoot, PSEUDO_DBROOT)) && (!hasSegmentDirFilter || processOneFilterType(8, emEntry.partitionNum, PSEUDO_SEGMENTDIR)) && @@ -1971,7 +1971,7 @@ bool TupleBPS::processPseudoColFilters(uint32_t extentIndex, } else { - return (hasPMFilter && processOneFilterType(8, (*dbRootPMMap)[emEntry.dbRoot], PSEUDO_PM)) || + return (hasPMFilter && processOneFilterType(8, oamCache->getClosestPM(emEntry.dbRoot), PSEUDO_PM)) || (hasSegmentFilter && processOneFilterType(8, emEntry.segmentNum, PSEUDO_SEGMENT)) || (hasDBRootFilter && processOneFilterType(8, emEntry.dbRoot, PSEUDO_DBROOT)) || (hasSegmentDirFilter && processOneFilterType(8, emEntry.partitionNum, PSEUDO_SEGMENTDIR)) || @@ -2004,8 +2004,6 @@ void TupleBPS::makeJobs(vector* jobs) uint32_t blocksToScan; LBID_t startingLBID; oam::OamCache* oamCache = oam::OamCache::makeOamCache(); - boost::shared_ptr> dbRootConnectionMap = oamCache->getDBRootToConnectionMap(); - boost::shared_ptr> dbRootPMMap = oamCache->getDBRootToPMMap(); int localPMId = oamCache->getLocalPMId(); idbassert(ffirstStepType == SCAN); @@ -2043,7 +2041,7 @@ void TupleBPS::makeJobs(vector* jobs) continue; } - if (!processPseudoColFilters(i, dbRootPMMap)) + if (!processPseudoColFilters(i, oamCache)) { fNumBlksSkipped += lbidsToScan; continue; @@ -2066,20 +2064,19 @@ void TupleBPS::makeJobs(vector* jobs) throw IDBExcept(ERR_LOCAL_QUERY_UM); } - if (dbRootPMMap->find(scannedExtents[i].dbRoot)->second != localPMId) + if (!oamCache->isAccessibleBy(scannedExtents[i].dbRoot, localPMId)) continue; } // a necessary DB root is offline - if (dbRootConnectionMap->find(scannedExtents[i].dbRoot) == dbRootConnectionMap->end()) + if (oamCache->isOffline(scannedExtents[i].dbRoot)) { // MCOL-259 force a reload of the xml. This usualy fixes it. Logger log; log.logMessage(logging::LOG_TYPE_WARNING, "forcing reload of columnstore.xml for dbRootConnectionMap"); oamCache->forceReload(); - dbRootConnectionMap = oamCache->getDBRootToConnectionMap(); - if (dbRootConnectionMap->find(scannedExtents[i].dbRoot) == dbRootConnectionMap->end()) + if (oamCache->isOffline(scannedExtents[i].dbRoot)) { log.logMessage(logging::LOG_TYPE_WARNING, "dbroot still not in dbRootConnectionMap"); throw IDBExcept(ERR_DATA_OFFLINE); @@ -2106,9 +2103,10 @@ void TupleBPS::makeJobs(vector* jobs) fBPP->setLBID(startingLBID, scannedExtents[i]); fBPP->setCount(blocksThisJob); bs.reset(new ByteStream()); - fBPP->runBPP(*bs, (*dbRootConnectionMap)[scannedExtents[i].dbRoot], isExeMgrDEC); + int connIndex = oamCache->getClosestConnection(scannedExtents[i].dbRoot); + fBPP->runBPP(*bs, connIndex, isExeMgrDEC); jobs->push_back( - Job(scannedExtents[i].dbRoot, (*dbRootConnectionMap)[scannedExtents[i].dbRoot], blocksThisJob, bs)); + Job(scannedExtents[i].dbRoot, connIndex, blocksThisJob, bs)); blocksToScan -= blocksThisJob; startingLBID += fColType.colWidth * blocksThisJob; fBPP->reset(); diff --git a/dbcon/mysql/is_columnstore_files.cpp b/dbcon/mysql/is_columnstore_files.cpp index 19aea021b..9c29a19b7 100644 --- a/dbcon/mysql/is_columnstore_files.cpp +++ b/dbcon/mysql/is_columnstore_files.cpp @@ -33,6 +33,7 @@ #include "we_brm.h" #include "bytestream.h" #include "liboamcpp.h" +#include "oamcache.h" #include "messagequeue.h" #include "messagequeuepool.h" #include "we_messages.h" @@ -119,7 +120,7 @@ static int generate_result(BRM::OID_t oid, BRM::DBRM* emp, TABLE* table, THD* th off_t compressedFileSize = 0; we_config.initConfigCache(); messageqcpp::MessageQueueClient* msgQueueClient; - oam::Oam oam_instance; + oam::OamCache* oamcache = oam::OamCache::makeOamCache(); int pmId = 0; int rc; @@ -141,7 +142,7 @@ static int generate_result(BRM::OID_t oid, BRM::DBRM* emp, TABLE* table, THD* th try { - oam_instance.getDbrootPmConfig(iter->dbRoot, pmId); + pmId = oamcache->getOwnerPM(iter->dbRoot); } catch (std::runtime_error&) { diff --git a/dmlproc/batchinsertprocessor.cpp b/dmlproc/batchinsertprocessor.cpp index 785a31225..d53183a26 100644 --- a/dmlproc/batchinsertprocessor.cpp +++ b/dmlproc/batchinsertprocessor.cpp @@ -230,6 +230,15 @@ void BatchInsertProc::buildLastPkg(messageqcpp::ByteStream& bs) bs << rt; } +uint32_t BatchInsertProc::selectNextPM() +{ + uint32_t pm; + do + { + pm = fBatchLoader->selectNextPM(); + } while (pm != 0 && fWEClient->isConnectionReadonly(pm)); + return pm; +} void BatchInsertProc::sendFirstBatch() { uint32_t firstPmId = 0; @@ -237,7 +246,7 @@ void BatchInsertProc::sendFirstBatch() try { - firstPmId = fBatchLoader->selectNextPM(); + firstPmId = selectNextPM(); } catch (std::exception& ex) { @@ -268,7 +277,7 @@ void BatchInsertProc::sendNextBatch() try { - fCurrentPMid = fBatchLoader->selectNextPM(); + fCurrentPMid = selectNextPM(); } catch (std::exception& ex) { diff --git a/dmlproc/batchinsertprocessor.h b/dmlproc/batchinsertprocessor.h index a950b0986..ddb2a229e 100644 --- a/dmlproc/batchinsertprocessor.h +++ b/dmlproc/batchinsertprocessor.h @@ -68,6 +68,7 @@ class BatchInsertProc void setHwm(); void receiveAllMsg(); void receiveOutstandingMsg(); + uint32_t selectNextPM(); private: SP_PKG fInsertPkgQueue; diff --git a/dmlproc/dmlproc.cpp b/dmlproc/dmlproc.cpp index 0bc59e2e0..b60767fd7 100644 --- a/dmlproc/dmlproc.cpp +++ b/dmlproc/dmlproc.cpp @@ -253,7 +253,6 @@ void rollbackAll(DBRM* dbrm) message5.format(args5); ml.logInfoMessage(message5); OamCache* oamcache = OamCache::makeOamCache(); - OamCache::dbRootPMMap_t dbRootPMMap = oamcache->getDBRootToPMMap(); int errorTxn = 0; for (i = 0; i < tableLocks.size(); i++) @@ -393,7 +392,7 @@ void rollbackAll(DBRM* dbrm) try { - rollbackProcessor.processBulkRollback(tableLocks[i], dbrm, uniqueId, dbRootPMMap, lockReleased); + rollbackProcessor.processBulkRollback(tableLocks[i], dbrm, uniqueId, oamcache, lockReleased); ostringstream oss; oss << "DMLProc started bulkrollback on table OID " << tableLocks[i].tableOID << " and table lock id " << tableLocks[i].id << " finished and tablelock is released."; diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index e905095e5..974d6ce5c 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -1155,8 +1155,6 @@ void PackageHandler::run() logging::Message message(1); args.add("dmlprocessor.cpp PackageHandler::run() package type"); args.add((uint64_t)fPackageType); - args.add(" ,transaction ID: "); - args.add(fTxnid); args.add(e.what()); message.format(args); ml.logErrorMessage(message); @@ -1404,7 +1402,9 @@ void DMLProcessor::operator()() messageqcpp::ByteStream::byte status = 255; messageqcpp::ByteStream::octbyte rowCount = 0; - if (fDbrm->getSystemState(stateFlags) > + int rr = fDbrm->getSystemState(stateFlags); + + if (rr > 0) // > 0 implies succesful retrieval. It doesn't imply anything about the contents { messageqcpp::ByteStream results; @@ -1845,7 +1845,7 @@ void DMLProcessor::operator()() void RollbackTransactionProcessor::processBulkRollback(BRM::TableLockInfo lockInfo, BRM::DBRM* dbrm, uint64_t uniqueId, - OamCache::dbRootPMMap_t& dbRootPMMap, + OamCache* oamcache, bool& lockReleased) { // Take over ownership of stale lock. @@ -1884,7 +1884,7 @@ void RollbackTransactionProcessor::processBulkRollback(BRM::TableLockInfo lockIn for (uint32_t i = 0; i < lockInfo.dbrootList.size(); i++) { - pmId = (*dbRootPMMap)[lockInfo.dbrootList[i]]; + pmId = oamcache->getOwnerPM(lockInfo.dbrootList[i]); pmSet.insert(pmId); } diff --git a/dmlproc/dmlprocessor.h b/dmlproc/dmlprocessor.h index 3492bcfa5..6d020770b 100644 --- a/dmlproc/dmlprocessor.h +++ b/dmlproc/dmlprocessor.h @@ -321,7 +321,7 @@ class RollbackTransactionProcessor : public dmlpackageprocessor::DMLPackageProce } void processBulkRollback(BRM::TableLockInfo lockInfo, BRM::DBRM* dbrm, uint64_t uniqueId, - oam::OamCache::dbRootPMMap_t& dbRootPMMap, bool& lockReleased); + oam::OamCache* oamcache, bool& lockReleased); protected: private: diff --git a/oam/oamcpp/liboamcpp.cpp b/oam/oamcpp/liboamcpp.cpp index 60b704780..5cc5856a9 100644 --- a/oam/oamcpp/liboamcpp.cpp +++ b/oam/oamcpp/liboamcpp.cpp @@ -684,12 +684,14 @@ void Oam::getPmDbrootConfig(const int pmid, DBRootConfigList& dbrootconfiglist) * Get DBRoot - PM Config data * ********************************************************************/ -void Oam::getDbrootPmConfig(const int dbrootid, int& pmid) +void Oam::getDbrootPmConfig(const int dbrootid, set& pmids) { SystemModuleTypeConfig systemmoduletypeconfig; ModuleTypeConfig moduletypeconfig; ModuleConfig moduleconfig; + pmids.clear(); + try { getSystemConfig(systemmoduletypeconfig); @@ -716,13 +718,16 @@ void Oam::getDbrootPmConfig(const int dbrootid, int& pmid) { if (*pt1 == dbrootid) { - pmid = (*pt).DeviceID; - return; + pmids.insert((*pt).DeviceID); } } } } } + if (!pmids.empty()) + { + return; + } // dbrootid not found, return with error exceptionControl("getDbrootPmConfig", API_INVALID_PARAMETER); diff --git a/oam/oamcpp/liboamcpp.h b/oam/oamcpp/liboamcpp.h index 8ab935b8e..6f9e41c99 100644 --- a/oam/oamcpp/liboamcpp.h +++ b/oam/oamcpp/liboamcpp.h @@ -384,7 +384,7 @@ class Oam /** *@brief Get DBRoot - PM Config data */ - EXPORT void getDbrootPmConfig(const int dbrootid, int& pmid); + EXPORT void getDbrootPmConfig(const int dbrootid, set& pmid); /** *@brief Get System DBRoot Config data diff --git a/oam/oamcpp/oamcache.cpp b/oam/oamcpp/oamcache.cpp index 607c45528..3d53a49f7 100644 --- a/oam/oamcpp/oamcache.cpp +++ b/oam/oamcpp/oamcache.cpp @@ -39,7 +39,6 @@ using namespace boost; namespace oam { - struct CacheReloaded { CacheReloaded() @@ -56,11 +55,20 @@ OamCache* OamCache::makeOamCache() return &cache.oamcache; } +static bool isWESConfigured(config::Config* config, int moduleID) +{ + char buff[200]; + snprintf(buff, sizeof(buff), "pm%u_WriteEngineServer", moduleID); + string fServer(buff); + // Check if WES IP address record exists in the config (if not, this is a read-only node) + std::string otherEndDnOrIPStr = config->getConfig(fServer, "IPAddr"); + return !(otherEndDnOrIPStr.empty() || otherEndDnOrIPStr == "unassigned"); +} void OamCache::checkReload() { Oam oam; config::Config* config = config::Config::makeConfig(); - int temp; + set temp; if (config->getCurrentMTime() == mtime) return; @@ -73,7 +81,7 @@ void OamCache::checkReload() idbassert(txt != ""); numDBRoots = config->fromText(txt); - dbRootPMMap.reset(new map()); + dbRootPMMap.reset(new map>()); // cerr << "reloading oamcache\n"; for (uint32_t i = 0; i < dbroots.size(); i++) @@ -88,39 +96,32 @@ void OamCache::checkReload() oam.getSystemConfig("pm", moduletypeconfig); int moduleID = 0; + rwPMs.clear(); for (unsigned i = 0; i < moduletypeconfig.ModuleCount; i++) { moduleID = atoi((moduletypeconfig.ModuleNetworkList[i]) .DeviceName.substr(MAX_MODULE_TYPE_SIZE, MAX_MODULE_ID_SIZE) .c_str()); uniquePids.insert(moduleID); + if (isWESConfigured(config, moduleID)) + { + rwPMs.insert(moduleID); + } } std::set::const_iterator it = uniquePids.begin(); moduleIds.clear(); uint32_t i = 0; - map pmToConnectionMap; + pmConnectionMap.clear(); // Restore for Windows when we support multiple PMs while (it != uniquePids.end()) { - pmToConnectionMap[*it] = i++; + pmConnectionMap[*it] = i++; moduleIds.push_back(*it); it++; } - dbRootConnectionMap.reset(new map()); - - for (i = 0; i < dbroots.size(); i++) - { - map::iterator pmIter = pmToConnectionMap.find((*dbRootPMMap)[dbroots[i]]); - - if (pmIter != pmToConnectionMap.end()) - { - (*dbRootConnectionMap)[dbroots[i]] = (*pmIter).second; - } - } - pmDbrootsMap.reset(new OamCache::PMDbrootsMap_t::element_type()); systemStorageInfo_t t; t = oam.getStorageConfig(); @@ -142,21 +143,18 @@ void OamCache::checkReload() tm = oam.getModuleInfo(); OAMParentModuleName = boost::get<3>(tm); systemName = config->getConfig("SystemConfig", "SystemName"); -} + dbRootConnectionMap.clear(); -OamCache::dbRootPMMap_t OamCache::getDBRootToPMMap() -{ - return dbRootPMMap; -} + for (i = 0; i < dbroots.size(); i++) + { + auto pmIter = pmConnectionMap.find(getOwnerPM(dbroots[i])); -OamCache::dbRootPMMap_t OamCache::getDBRootToConnectionMap() -{ - return dbRootConnectionMap; -} + if (pmIter != pmConnectionMap.end()) + { + dbRootConnectionMap[dbroots[i]] = (*pmIter).second; + } + } -OamCache::PMDbrootsMap_t OamCache::getPMToDbrootsMap() -{ - return pmDbrootsMap; } uint32_t OamCache::getDBRootCount() @@ -237,4 +235,77 @@ string OamCache::getModuleName() return moduleName; } +bool OamCache::isAccessibleBy(int dbRoot, int pmId) +{ + return (*dbRootPMMap)[dbRoot].contains(pmId); +} + +bool OamCache::isOffline(int dbRoot) +{ + return dbRootConnectionMap.find(dbRoot) == dbRootConnectionMap.end(); +} + +int OamCache::getClosestPM(int dbroot) // who can access dbroot's records for read requests - either owner or us. +{ + if ((*dbRootPMMap)[dbroot].contains(mLocalPMId)) + { + return mLocalPMId; + } + for(auto j : (*dbRootPMMap)[dbroot]) + { + int pm = j; + if (rwPMs.contains(pm)) + { + return pm; + } + } + idbassert_s(0, "dbroot " << dbroot << " has empty set of PM's"); +} + +int OamCache::getClosestConnection(int dbroot) // connection index to owner's PM or ours PM - who can access dbRoot. +{ + return pmConnectionMap[getClosestPM(dbroot)]; +} + +int OamCache::getOwnerConnection(int dbroot) // connection index to owner's PM. +{ + return pmConnectionMap[getOwnerPM(dbroot)]; +} + +int OamCache::getOwnerPM(int dbroot) // Owner's PM index. +{ + for(auto j : (*dbRootPMMap)[dbroot]) + { + int pm = j; + if (rwPMs.contains(pm)) + { + return pm; + } + } + idbassert_s(0, "cannot find owner for dbroot " << dbroot); +} + +std::vector OamCache::getPMDBRoots(int PM) // what DBRoots are owned by given PM. +{ + std::vector result; + for (const auto& dbroot : (*dbRootPMMap)) + { + if (dbroot.second.find(PM) != dbroot.second.end()) + { + result.push_back(dbroot.first); + } + } + return result; +} + +std::vector OamCache::getAllDBRoots() // get all DBRoots. +{ + std::vector result; + for (const auto& dbroot : (*dbRootPMMap)) + { + result.push_back(dbroot.first); + } + return result; +} + } /* namespace oam */ diff --git a/oam/oamcpp/oamcache.h b/oam/oamcpp/oamcache.h index 17f432219..34eb115ac 100644 --- a/oam/oamcpp/oamcache.h +++ b/oam/oamcpp/oamcache.h @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -31,7 +32,7 @@ namespace oam class OamCache { public: - typedef boost::shared_ptr > dbRootPMMap_t; + typedef boost::shared_ptr> > dbRootPMMap_t; typedef std::vector dbRoots; typedef boost::shared_ptr > PMDbrootsMap_t; virtual ~OamCache() = default; @@ -42,9 +43,14 @@ class OamCache mtime = 0; } - dbRootPMMap_t getDBRootToPMMap(); - dbRootPMMap_t getDBRootToConnectionMap(); - PMDbrootsMap_t getPMToDbrootsMap(); + int getClosestPM(int dbroot); // who can access dbroot's records for read requests - either owner or us. + int getClosestConnection(int dbroot); // connection index to owner's PM or ours PM - who can access dbRoot. + int getOwnerConnection(int dbroot); // connection index to owner's PM. + int getOwnerPM(int dbroot); // Owner's PM index. + std::vector getPMDBRoots(int PM); // what DBRoots are owned by given PM. + std::vector getAllDBRoots(); // get all DBRoots. + bool isAccessibleBy(int dbRoot, int pmId); + bool isOffline(int dbRoot); // not registered in map. uint32_t getDBRootCount(); DBRootConfigList& getDBRootNums(); std::vector& getModuleIds(); @@ -61,7 +67,7 @@ class OamCache OamCache& operator=(const OamCache&) const = delete; dbRootPMMap_t dbRootPMMap; - dbRootPMMap_t dbRootConnectionMap; + map dbRootConnectionMap; PMDbrootsMap_t pmDbrootsMap; uint32_t numDBRoots = 1; time_t mtime = 0; @@ -71,6 +77,8 @@ class OamCache int mLocalPMId = 0; // The PM id running on this machine std::string systemName; std::string moduleName; + map pmConnectionMap; + set rwPMs; }; } // namespace oam diff --git a/tools/cleartablelock/cleartablelock.cpp b/tools/cleartablelock/cleartablelock.cpp index 361d354a6..5620bbb56 100644 --- a/tools/cleartablelock/cleartablelock.cpp +++ b/tools/cleartablelock/cleartablelock.cpp @@ -159,19 +159,22 @@ int constructPMList(const std::vector& dbRootList, std::vector pmSet; // used to filter out duplicates - int pm; + std::set pms; for (unsigned j = 0; j < dbRootList.size(); j++) { dbRoot = dbRootList[j]; - oam.getDbrootPmConfig(dbRootList[j], pm); - pmSet.insert(pm); + oam.getDbrootPmConfig(dbRootList[j], pms); + for (auto pm : pms) + { + pmSet.insert(pm); + } } // Store unique set of PM IDs into output vector - for (std::set::const_iterator iter = pmSet.begin(); iter != pmSet.end(); ++iter) + for (auto pm : pmSet) { - pmList.push_back(*iter); + pmList.push_back(pm); } } catch (std::exception& ex) diff --git a/utils/batchloader/batchloader.cpp b/utils/batchloader/batchloader.cpp index 0e9bfe836..a1e7697d7 100644 --- a/utils/batchloader/batchloader.cpp +++ b/utils/batchloader/batchloader.cpp @@ -48,31 +48,11 @@ BatchLoader::BatchLoader(uint32_t tableOid, execplan::CalpontSystemCatalog::SCN fPMs = PMs; fSessionId = sessionId; fTableOid = tableOid; - OamCache* oamcache = OamCache::makeOamCache(); - oam::OamCache::PMDbrootsMap_t systemPmDbrootMap = oamcache->getPMToDbrootsMap(); - std::map::iterator iter = systemPmDbrootMap->begin(); - // cout << "fPMs size is " << fPMs.size() << endl; - fPmDbrootMap.reset(new OamCache::PMDbrootsMap_t::element_type()); - fDbrootPMmap.reset(new map()); - - for (uint32_t i = 0; i < fPMs.size(); i++) + fOamCache = OamCache::makeOamCache(); + auto allDBRoots = fOamCache->getAllDBRoots(); + for (auto dbr : allDBRoots) { - iter = systemPmDbrootMap->find(fPMs[i]); - - if (iter != systemPmDbrootMap->end()) - { - fDbRoots.insert(fDbRoots.end(), (iter->second).begin(), (iter->second).end()); - (*fPmDbrootMap)[fPMs[i]] = iter->second; - } - } - - // Build dbroot to PM map - for (iter = fPmDbrootMap->begin(); iter != fPmDbrootMap->end(); iter++) - { - for (uint32_t i = 0; i < iter->second.size(); i++) - { - (*fDbrootPMmap)[iter->second[i]] = iter->first; - } + fDbRoots.push_back(dbr); } } //------------------------------------------------------------------------------ @@ -199,12 +179,7 @@ void BatchLoader::selectFirstPM(uint32_t& PMId) if (createdDbroot != 0) { - std::map::iterator iter = fDbrootPMmap->begin(); - - iter = fDbrootPMmap->find(createdDbroot); - - if (iter != fDbrootPMmap->end()) - PMId = iter->second; + PMId = fOamCache->getOwnerPM(createdDbroot); } // This will build the batch distribution sequence @@ -233,12 +208,12 @@ void BatchLoader::selectFirstPM(uint32_t& PMId) { PMRootInfo aEntry; aEntry.PMId = fPmDistSeq[j]; - iter = fPmDbrootMap->find(aEntry.PMId); + auto dbroots = fOamCache->getPMDBRoots(PMId); - for (unsigned k = 0; k < (iter->second).size(); k++) + for (unsigned k = 0; k < dbroots.size(); k++) { RootExtentsBlocks aRootInfo; - aRootInfo.DBRoot = (iter->second)[k]; + aRootInfo.DBRoot = dbroots[k]; aRootInfo.numExtents = rootExtents[aRootInfo.DBRoot]; aRootInfo.numBlocks = rootBlocks[aRootInfo.DBRoot]; // cout << "aRootInfo DBRoot:numExtents:numBlocks = " << @@ -410,17 +385,15 @@ void BatchLoader::buildBatchDistSeqVector() fPmDistSeq.clear(); BlIntVec aDbCntVec(fPMs.size()); - std::map::iterator iter = fPmDbrootMap->begin(); - for (uint32_t i = 0; i < fPMs.size(); i++) { - iter = fPmDbrootMap->find(fPMs[i]); + auto dbroots = fOamCache->getPMDBRoots(fPMs[i]); - if ((iter != fPmDbrootMap->end()) && ((iter->second).begin() != (iter->second).end())) + if (dbroots.size() > 0) { try { - aDbCntVec[i] = (iter->second).size(); + aDbCntVec[i] = dbroots.size(); // cout << "PM - "<& dbRootVec) { oam::OamCache* oamcache = oam::OamCache::makeOamCache(); - oam::OamCache::PMDbrootsMap_t pmDbroots = oamcache->getPMToDbrootsMap(); - dbRootVec.clear(); - dbRootVec = (*pmDbroots)[pm]; + dbRootVec = oamcache->getPMDBRoots(pm); } DBRootVec ExtentMap::getAllDbRoots() @@ -6189,13 +6187,11 @@ DBRootVec ExtentMap::getAllDbRoots() DBRootVec dbRootResultVec; oam::OamCache* oamcache = oam::OamCache::makeOamCache(); // NB The routine uses int for dbroot id that contradicts with the type used here, namely uint16_t - oam::OamCache::PMDbrootsMap_t pmDbroots = oamcache->getPMToDbrootsMap(); - auto& pmDbrootsRef = *pmDbroots; + auto pmDbroots = oamcache->getAllDBRoots(); - for (auto& pmDBRootPair : pmDbrootsRef) + for (auto& DBRoot : pmDbroots) { - for (auto dbRootId : pmDBRootPair.second) - dbRootResultVec.push_back(dbRootId); + dbRootResultVec.push_back(DBRoot); } return dbRootResultVec; } diff --git a/writeengine/client/we_clients.cpp b/writeengine/client/we_clients.cpp index 073d13744..223947ed5 100644 --- a/writeengine/client/we_clients.cpp +++ b/writeengine/client/we_clients.cpp @@ -505,8 +505,9 @@ void WEClients::write(const messageqcpp::ByteStream& msg, uint32_t connection) fPmConnections[connection]->write(msg); else { + // new behavior: connection client is nullptr means it is read-only. ostringstream os; - os << "Lost connection to WriteEngineServer on pm" << connection; + os << "Connection to readonly pm" << connection; throw runtime_error(os.str()); } } diff --git a/writeengine/client/we_ddlcommandclient.cpp b/writeengine/client/we_ddlcommandclient.cpp index e17a0604d..03c0bd7ac 100644 --- a/writeengine/client/we_ddlcommandclient.cpp +++ b/writeengine/client/we_ddlcommandclient.cpp @@ -37,6 +37,7 @@ namespace WriteEngine WE_DDLCommandClient::WE_DDLCommandClient() { fWEClient = new WEClients(WEClients::DDLPROC); + fOamCache = oam::OamCache::makeOamCache(); } WE_DDLCommandClient::~WE_DDLCommandClient() @@ -64,7 +65,7 @@ uint8_t WE_DDLCommandClient::UpdateSyscolumnNextval(uint32_t columnOid, uint64_t try { - fOam.getDbrootPmConfig(dbRoot, pmNum); + pmNum = fOamCache->getOwnerPM(dbRoot); fWEClient->write(command, pmNum); while (1) diff --git a/writeengine/client/we_ddlcommandclient.h b/writeengine/client/we_ddlcommandclient.h index b4ef7ce4d..98ee0204a 100644 --- a/writeengine/client/we_ddlcommandclient.h +++ b/writeengine/client/we_ddlcommandclient.h @@ -26,6 +26,7 @@ #include "dbrm.h" #include "liboamcpp.h" #include "writeengine.h" +#include "oamcache.h" #define EXPORT @@ -50,7 +51,7 @@ class WE_DDLCommandClient private: BRM::DBRM fDbrm; WEClients* fWEClient; - oam::Oam fOam; + oam::OamCache* fOamCache; }; } // namespace WriteEngine diff --git a/writeengine/redistribute/we_redistributecontrolthread.cpp b/writeengine/redistribute/we_redistributecontrolthread.cpp index 7caa14b05..c62a53ae7 100644 --- a/writeengine/redistribute/we_redistributecontrolthread.cpp +++ b/writeengine/redistribute/we_redistributecontrolthread.cpp @@ -758,8 +758,7 @@ int RedistributeControlThread::executeRedistributePlan() int RedistributeControlThread::connectToWes(int dbroot) { int ret = 0; - OamCache::dbRootPMMap_t dbrootToPM = fOamCache->getDBRootToPMMap(); - int pmId = (*dbrootToPM)[dbroot]; + int pmId = fOamCache->getOwnerPM(dbroot); ostringstream oss; oss << "pm" << pmId << "_WriteEngineServer"; diff --git a/writeengine/redistribute/we_redistributeworkerthread.cpp b/writeengine/redistribute/we_redistributeworkerthread.cpp index 1cd33c8a7..71fbb8425 100644 --- a/writeengine/redistribute/we_redistributeworkerthread.cpp +++ b/writeengine/redistribute/we_redistributeworkerthread.cpp @@ -134,11 +134,10 @@ void RedistributeWorkerThread::handleRequest() { memcpy(&fPlanEntry, fBs.buf(), sizeof(RedistributePlanEntry)); fBs.advance(sizeof(RedistributePlanEntry)); - OamCache::dbRootPMMap_t dbrootToPM = fOamCache->getDBRootToPMMap(); fMyId.first = fPlanEntry.source; - fMyId.second = (*dbrootToPM)[fMyId.first]; + fMyId.second = fOamCache->getOwnerPM(fMyId.first); fPeerId.first = fPlanEntry.destination; - fPeerId.second = (*dbrootToPM)[fPeerId.first]; + fPeerId.second = fOamCache->getOwnerPM(fPeerId.first); if (grabTableLock() == 0) { diff --git a/writeengine/shared/we_define.cpp b/writeengine/shared/we_define.cpp index 49bfeac18..382b49456 100644 --- a/writeengine/shared/we_define.cpp +++ b/writeengine/shared/we_define.cpp @@ -154,7 +154,7 @@ WErrorCodes::WErrorCodes() : fErrorCodes() fErrorCodes[ERR_BULK_SEND_MSG_ERR] = " in a bulk load send msg"; fErrorCodes[ERR_BULK_MISSING_EXTENT_ENTRY] = " missing Extent Entry when trying to save LBID info for CP"; fErrorCodes[ERR_BULK_MISSING_EXTENT_ROW] = " missing Extent Row when trying to save LBID info for CP"; - fErrorCodes[ERR_BULK_ROW_FILL_BUFFER] = " Single row fills read buffer; try larger read buffer via -c flag in cpimport"; + fErrorCodes[ERR_BULK_ROW_FILL_BUFFER] = " Single row fills read buffer; try larger read buffer."; fErrorCodes[ERR_BULK_DBROOT_CHANGE] = " Local PM DBRoot settings changed during bulk load."; fErrorCodes[ERR_BULK_ROLLBACK_MISS_ROOT] = " Mode3 automatic rollback not performed. DBRoot missing."; fErrorCodes[ERR_BULK_ROLLBACK_SEG_LIST] = " Error building segment file list in a directory."; @@ -284,7 +284,8 @@ std::string WErrorCodes::errorString(int code) case ERR_FILE_DISK_SPACE: { logging::Message::Args args; - args.add("configured by WriteEngine.MaxFileSystemDiskUsagePct in Columnstore.xml"); + std::string msgArg; // empty str arg; no extra info in this context + args.add(msgArg); return logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_EXTENT_DISK_SPACE, args); break; }