1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-01 06:46:55 +03:00

feat(MCOL-6082): Multiple readers of dbroots using OamCache logic

This patch introduces centralized logic of selecting what dbroot is
accessible in PrimProc on what node. The logic is in OamCache for time
being and can be moved later.
This commit is contained in:
Serguey Zefirov
2025-07-10 11:31:32 +00:00
parent 2753743762
commit 5aa2a824c2
33 changed files with 232 additions and 194 deletions

View File

@ -761,8 +761,7 @@ void AlterTableProcessor::addColumn(uint32_t sessionID, execplan::CalpontSystemC
int pmNum = 1;
OamCache* oamcache = OamCache::makeOamCache();
boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
pmNum = (*dbRootPMMap)[dbRoot];
pmNum = oamcache->getOwnerPM(dbRoot);
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
// Will create files on each PM as needed.
@ -925,7 +924,7 @@ void AlterTableProcessor::addColumn(uint32_t sessionID, execplan::CalpontSystemC
if (rc != 0)
throw std::runtime_error("Error while calling getSysCatDBRoot ");
pmNum = (*dbRootPMMap)[dbRoot];
pmNum = oamcache->getOwnerPM(dbRoot);
bs.restart();
bs << (ByteStream::byte)WE_SVR_UPDATE_SYSTABLE_AUTO;
bs << uniqueId;
@ -1317,8 +1316,7 @@ void AlterTableProcessor::dropColumn(uint32_t sessionID, execplan::CalpontSystem
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
OamCache* oamcache = OamCache::makeOamCache();
boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
pmNum = (*dbRootPMMap)[dbRoot];
pmNum = oamcache->getOwnerPM(dbRoot);
// MCOL-66 The DBRM can't handle concurrent DDL
boost::mutex::scoped_lock lk(dbrmMutex);
@ -1379,7 +1377,7 @@ void AlterTableProcessor::dropColumn(uint32_t sessionID, execplan::CalpontSystem
if (rc != 0)
throw std::runtime_error("Error while calling getSysCatDBRoot ");
pmNum = (*dbRootPMMap)[dbRoot];
pmNum = oamcache->getOwnerPM(dbRoot);
bytestream.restart();
bytestream << (ByteStream::byte)WE_SVR_UPDATE_SYSTABLE_AUTO;
bytestream << uniqueId;
@ -1452,7 +1450,7 @@ void AlterTableProcessor::dropColumn(uint32_t sessionID, execplan::CalpontSystem
if (rc != 0)
throw std::runtime_error("Error while calling getSysCatDBRoot ");
pmNum = (*dbRootPMMap)[dbRoot];
pmNum = oamcache->getOwnerPM(dbRoot);
try
{
@ -1706,8 +1704,7 @@ void AlterTableProcessor::setColumnDefault(uint32_t sessionID, execplan::Calpont
int pmNum = 1;
OamCache* oamcache = OamCache::makeOamCache();
boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
pmNum = (*dbRootPMMap)[dbRoot];
pmNum = oamcache->getOwnerPM(dbRoot);
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
@ -1795,8 +1792,7 @@ void AlterTableProcessor::dropColumnDefault(uint32_t sessionID, execplan::Calpon
int pmNum = 1;
OamCache* oamcache = OamCache::makeOamCache();
boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
pmNum = (*dbRootPMMap)[dbRoot];
pmNum = oamcache->getOwnerPM(dbRoot);
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
@ -2001,8 +1997,7 @@ void AlterTableProcessor::renameTable(uint32_t sessionID, execplan::CalpontSyste
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
OamCache* oamcache = OamCache::makeOamCache();
boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
pmNum = (*dbRootPMMap)[dbRoot];
pmNum = oamcache->getOwnerPM(dbRoot);
try
{
@ -2067,7 +2062,7 @@ void AlterTableProcessor::renameTable(uint32_t sessionID, execplan::CalpontSyste
if (rc != 0)
throw std::runtime_error("Error while calling getSysCatDBRoot");
pmNum = (*dbRootPMMap)[dbRoot];
pmNum = oamcache->getOwnerPM(dbRoot);
try
{
@ -2131,8 +2126,7 @@ void AlterTableProcessor::tableComment(uint32_t sessionID, execplan::CalpontSyst
int pmNum = 1;
rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);
OamCache* oamcache = OamCache::makeOamCache();
boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
pmNum = (*dbRootPMMap)[dbRoot];
pmNum = oamcache->getOwnerPM(dbRoot);
if (rc != 0)
throw std::runtime_error("Error while calling getSysCatDBRoot");
@ -2354,8 +2348,7 @@ void AlterTableProcessor::renameColumn(uint32_t sessionID, execplan::CalpontSyst
int pmNum = 1;
OamCache* oamcache = OamCache::makeOamCache();
boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
pmNum = (*dbRootPMMap)[dbRoot];
pmNum = oamcache->getOwnerPM(dbRoot);
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
CalpontSystemCatalog::TableName tableName;
@ -2539,7 +2532,7 @@ void AlterTableProcessor::renameColumn(uint32_t sessionID, execplan::CalpontSyst
if (rc != 0)
throw std::runtime_error("Error while calling getSysCatDBRoot");
pmNum = (*dbRootPMMap)[dbRoot];
pmNum = oamcache->getOwnerPM(dbRoot);
// send to WES to process
try

View File

@ -326,8 +326,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackageInternal(
bytestream << (uint32_t)dbRoot;
tableDef.serialize(bytestream);
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
pmNum = (*dbRootPMMap)[dbRoot];
pmNum = oamcache->getOwnerPM(dbRoot);
// MCOL-66 The DBRM can't handle concurrent DDL
boost::mutex::scoped_lock lk(dbrmMutex);
@ -450,7 +449,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackageInternal(
bytestream << (uint32_t)dbRoot;
tableDef.serialize(bytestream);
pmNum = (*dbRootPMMap)[dbRoot];
pmNum = oamcache->getOwnerPM(dbRoot);
try
{
@ -666,7 +665,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackageInternal(
return result;
}
pmNum = (*dbRootPMMap)[useDBRoot];
pmNum = oamcache->getOwnerPM(useDBRoot);
try
{

View File

@ -611,8 +611,7 @@ void DDLPackageProcessor::createFiles(CalpontSystemCatalog::TableName aTableName
try
{
OamCache* oamcache = OamCache::makeOamCache();
boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
int pmNum = (*dbRootPMMap)[useDBRoot];
int pmNum = oamcache->getOwnerPM(useDBRoot);
fWEClient->write(bytestream, (uint32_t)pmNum);
bsIn.reset(new ByteStream());

View File

@ -359,8 +359,7 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackageInternal(ddlpack
return result;
}
boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
pmNum = (*dbRootPMMap)[dbRoot];
pmNum = oamcache->getOwnerPM(dbRoot);
try
{
@ -464,7 +463,7 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackageInternal(ddlpack
return result;
}
pmNum = (*dbRootPMMap)[dbRoot];
pmNum = oamcache->getOwnerPM(dbRoot);
try
{
@ -1275,8 +1274,7 @@ TruncTableProcessor::DDLResult TruncTableProcessor::processPackageInternal(ddlpa
bytestream << (uint32_t)colType.compressionType;
}
boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
pmNum = (*dbRootPMMap)[useDBRoot];
pmNum = oamcache->getOwnerPM(useDBRoot);
try
{

View File

@ -845,20 +845,15 @@ void CommandPackageProcessor::clearTableLock(uint64_t uniqueId, const dmlpackage
establishTableLockToClear(tableLockID, lockInfo);
lockGrabbed = true;
oam::OamCache* oamCache = oam::OamCache::makeOamCache();
oam::OamCache::dbRootPMMap_t dbRootPmMap = oamCache->getDBRootToPMMap();
std::map<int, int>::const_iterator mapIter;
std::set<int> pmSet;
// Construct relevant list of PMs based on the DBRoots associated
// with the tableLock.
for (unsigned int k = 0; k < lockInfo.dbrootList.size(); k++)
{
mapIter = dbRootPmMap->find(lockInfo.dbrootList[k]);
if (mapIter != dbRootPmMap->end())
if (!oamcache()->isOffline(lockInfo.dbrootList[k]))
{
int pm = mapIter->second;
int pm = oamcache()->getOwnerPM(lockInfo.dbrootList[k]);
pmSet.insert(pm);
}
else

View File

@ -643,7 +643,7 @@ bool DeletePackageProcessor::processRowgroup(ByteStream& aRowGroup, DMLResult& r
{
bool rc = false;
// cout << "Get dbroot " << dbroot << endl;
int pmNum = (*fDbRootPMMap)[dbroot];
int pmNum = oamcache()->getOwnerPM(dbroot);
DMLTable* tablePtr = cpackage.get_Table();
ByteStream bytestream;
bytestream << (ByteStream::byte)WE_SVR_DELETE;

View File

@ -165,8 +165,6 @@ class DMLPackageProcessor
std::cout << "Cannot make connection to WES" << std::endl;
}
oam::OamCache* oamCache = oam::OamCache::makeOamCache();
fDbRootPMMap = oamCache->getDBRootToPMMap();
fDbrm = aDbrm;
fSessionID = sid;
fExeMgr = new execplan::ClientRotator(fSessionID, "ExeMgr");
@ -489,6 +487,8 @@ class DMLPackageProcessor
uint32_t tableOid);
int endTransaction(uint64_t uniqueId, BRM::TxnID txnID, bool success);
oam::OamCache* oamcache() { return oam::OamCache::makeOamCache(); }
/** @brief the Session Manager interface
*/
execplan::SessionManager fSessionManager;
@ -500,8 +500,6 @@ class DMLPackageProcessor
uint32_t fPMCount;
WriteEngine::WEClients* fWEClient;
BRM::DBRM* fDbrm;
boost::shared_ptr<std::map<int, int> > fDbRootPMMap;
oam::Oam fOam;
bool fRollbackPending; // When set, any derived object should stop what it's doing and cleanup in
// preparation for a Rollback
execplan::ClientRotator* fExeMgr;

View File

@ -275,8 +275,7 @@ DMLPackageProcessor::DMLResult InsertPackageProcessor::processPackageInternal(
if (tmpSet)
{
dbroot = tmp.dbRoot;
boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
pmNum = (*dbRootPMMap)[dbroot];
pmNum = oamcache->getOwnerPM(dbroot);
//@Bug 4760. validate pm value
if (pmNum == 0)

View File

@ -161,8 +161,7 @@ DMLPackageProcessor::DMLResult UpdatePackageProcessor::processPackageInternal(
int32_t sessionId = fSessionID;
std::string processName("DMLProc");
int i = 0;
OamCache* oamcache = OamCache::makeOamCache();
std::vector<int> pmList = oamcache->getModuleIds();
std::vector<int> pmList = oamcache()->getModuleIds();
std::vector<uint32_t> pms;
for (unsigned i = 0; i < pmList.size(); i++)
@ -437,8 +436,7 @@ uint64_t UpdatePackageProcessor::fixUpRows(dmlpackage::CalpontDMLPackage& cpacka
uint64_t rowsProcessed = 0;
uint32_t dbroot = 1;
bool metaData = false;
oam::OamCache* oamCache = oam::OamCache::makeOamCache();
std::vector<int> fPMs = oamCache->getModuleIds();
std::vector<int> fPMs = oamcache()->getModuleIds();
std::map<unsigned, bool> pmState;
string emsg;
string emsgStr;
@ -726,7 +724,7 @@ bool UpdatePackageProcessor::processRowgroup(ByteStream& aRowGroup, DMLResult& r
{
bool rc = false;
// cout << "Get dbroot " << dbroot << endl;
uint32_t pmNum = (*fDbRootPMMap)[dbroot];
uint32_t pmNum = oamcache()->getOwnerPM(dbroot);
ByteStream bytestream;
bytestream << (uint8_t)WE_SVR_UPDATE;

View File

@ -330,16 +330,11 @@ void pDictionaryScan::sendPrimitiveMessages()
uint32_t partNum;
uint16_t segNum;
BRM::OID_t oid;
boost::shared_ptr<map<int, int> > dbRootConnectionMap;
boost::shared_ptr<map<int, int> > dbRootPMMap;
oam::OamCache* oamCache = oam::OamCache::makeOamCache();
int localPMId = oamCache->getLocalPMId();
try
{
dbRootConnectionMap = oamCache->getDBRootToConnectionMap();
dbRootPMMap = oamCache->getDBRootToPMMap();
it = fDictlbids.begin();
for (; it != fDictlbids.end() && !cancelled(); it++)
@ -350,10 +345,11 @@ void pDictionaryScan::sendPrimitiveMessages()
// Bug5741 If we are local only and this doesn't belongs to us, skip it
if (fLocalQuery == execplan::CalpontSelectExecutionPlan::LOCAL_QUERY)
{
// XXX SZ: this may trigger.
if (localPMId == 0)
throw IDBExcept(ERR_LOCAL_QUERY_UM);
if (dbRootPMMap->find(dbroot)->second != localPMId)
if (!oamCache->isAccessibleBy(dbroot, localPMId))
continue;
}
@ -374,23 +370,22 @@ void pDictionaryScan::sendPrimitiveMessages()
if (remainingLbids < msgLbidCount)
msgLbidCount = remainingLbids;
if (dbRootConnectionMap->find(dbroot) == dbRootConnectionMap->end())
if (oamCache->isOffline(dbroot))
{
// MCOL-259 force a reload of the xml. This usualy fixes it.
Logger log;
log.logMessage(logging::LOG_TYPE_DEBUG,
"dictionary forcing reload of columnstore.xml for dbRootConnectionMap");
oamCache->forceReload();
dbRootConnectionMap = oamCache->getDBRootToConnectionMap();
if (dbRootConnectionMap->find(dbroot) == dbRootConnectionMap->end())
if (oamCache->isOffline(dbroot))
{
log.logMessage(logging::LOG_TYPE_DEBUG, "dictionary still not in dbRootConnectionMap");
throw IDBExcept(ERR_DATA_OFFLINE);
}
}
sendAPrimitiveMessage(msgLbidStart, msgLbidCount, (*dbRootConnectionMap)[dbroot]);
sendAPrimitiveMessage(msgLbidStart, msgLbidCount, oamCache->getClosestConnection(dbroot));
mutex.lock();
msgsSent += msgLbidCount;

View File

@ -61,6 +61,7 @@
#include "rowgroup.h"
#include "rowaggregation.h"
#include "funcexpwrapper.h"
#include "oamcache.h"
namespace joblist
{
@ -1458,7 +1459,7 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep
uint32_t blocksPerJob;
/* Pseudo column filter processing. Think about refactoring into a separate class. */
bool processPseudoColFilters(uint32_t extentIndex, boost::shared_ptr<std::map<int, int>> dbRootPMMap) const;
bool processPseudoColFilters(uint32_t extentIndex, oam::OamCache* oamCache) const;
template <typename T>
bool processOneFilterType(int8_t colWidth, T value, uint32_t type) const;
template <typename T>
@ -1472,6 +1473,7 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep
bool compareRange(uint8_t COP, int64_t min, int64_t max, int64_t val) const;
bool hasPCFilter, hasPMFilter, hasRIDFilter, hasSegmentFilter, hasDBRootFilter, hasSegmentDirFilter,
hasPartitionFilter, hasMaxFilter, hasMinFilter, hasLBIDFilter, hasExtentIDFilter;
int findClosestPM(const std::map<int, std::set<int>>& dbrootConnMap, int dbroot);
};
/** @brief class FilterStep

View File

@ -1936,7 +1936,7 @@ bool TupleBPS::processLBIDFilter(const EMEntry& emEntry) const
}
bool TupleBPS::processPseudoColFilters(uint32_t extentIndex,
boost::shared_ptr<map<int, int>> dbRootPMMap) const
oam::OamCache* oamCache) const
{
if (!hasPCFilter)
return true;
@ -1946,7 +1946,7 @@ bool TupleBPS::processPseudoColFilters(uint32_t extentIndex,
if (bop == BOP_AND)
{
/* All Pseudocolumns have been promoted to 8-bytes except the casual partitioning filters */
return (!hasPMFilter || processOneFilterType(8, (*dbRootPMMap)[emEntry.dbRoot], PSEUDO_PM)) &&
return (!hasPMFilter || processOneFilterType(8, oamCache->getClosestPM(emEntry.dbRoot), PSEUDO_PM)) &&
(!hasSegmentFilter || processOneFilterType(8, emEntry.segmentNum, PSEUDO_SEGMENT)) &&
(!hasDBRootFilter || processOneFilterType(8, emEntry.dbRoot, PSEUDO_DBROOT)) &&
(!hasSegmentDirFilter || processOneFilterType(8, emEntry.partitionNum, PSEUDO_SEGMENTDIR)) &&
@ -1971,7 +1971,7 @@ bool TupleBPS::processPseudoColFilters(uint32_t extentIndex,
}
else
{
return (hasPMFilter && processOneFilterType(8, (*dbRootPMMap)[emEntry.dbRoot], PSEUDO_PM)) ||
return (hasPMFilter && processOneFilterType(8, oamCache->getClosestPM(emEntry.dbRoot), PSEUDO_PM)) ||
(hasSegmentFilter && processOneFilterType(8, emEntry.segmentNum, PSEUDO_SEGMENT)) ||
(hasDBRootFilter && processOneFilterType(8, emEntry.dbRoot, PSEUDO_DBROOT)) ||
(hasSegmentDirFilter && processOneFilterType(8, emEntry.partitionNum, PSEUDO_SEGMENTDIR)) ||
@ -2004,8 +2004,6 @@ void TupleBPS::makeJobs(vector<Job>* jobs)
uint32_t blocksToScan;
LBID_t startingLBID;
oam::OamCache* oamCache = oam::OamCache::makeOamCache();
boost::shared_ptr<map<int, int>> dbRootConnectionMap = oamCache->getDBRootToConnectionMap();
boost::shared_ptr<map<int, int>> dbRootPMMap = oamCache->getDBRootToPMMap();
int localPMId = oamCache->getLocalPMId();
idbassert(ffirstStepType == SCAN);
@ -2043,7 +2041,7 @@ void TupleBPS::makeJobs(vector<Job>* jobs)
continue;
}
if (!processPseudoColFilters(i, dbRootPMMap))
if (!processPseudoColFilters(i, oamCache))
{
fNumBlksSkipped += lbidsToScan;
continue;
@ -2066,20 +2064,19 @@ void TupleBPS::makeJobs(vector<Job>* jobs)
throw IDBExcept(ERR_LOCAL_QUERY_UM);
}
if (dbRootPMMap->find(scannedExtents[i].dbRoot)->second != localPMId)
if (!oamCache->isAccessibleBy(scannedExtents[i].dbRoot, localPMId))
continue;
}
// a necessary DB root is offline
if (dbRootConnectionMap->find(scannedExtents[i].dbRoot) == dbRootConnectionMap->end())
if (oamCache->isOffline(scannedExtents[i].dbRoot))
{
// MCOL-259 force a reload of the xml. This usualy fixes it.
Logger log;
log.logMessage(logging::LOG_TYPE_WARNING, "forcing reload of columnstore.xml for dbRootConnectionMap");
oamCache->forceReload();
dbRootConnectionMap = oamCache->getDBRootToConnectionMap();
if (dbRootConnectionMap->find(scannedExtents[i].dbRoot) == dbRootConnectionMap->end())
if (oamCache->isOffline(scannedExtents[i].dbRoot))
{
log.logMessage(logging::LOG_TYPE_WARNING, "dbroot still not in dbRootConnectionMap");
throw IDBExcept(ERR_DATA_OFFLINE);
@ -2106,9 +2103,10 @@ void TupleBPS::makeJobs(vector<Job>* jobs)
fBPP->setLBID(startingLBID, scannedExtents[i]);
fBPP->setCount(blocksThisJob);
bs.reset(new ByteStream());
fBPP->runBPP(*bs, (*dbRootConnectionMap)[scannedExtents[i].dbRoot], isExeMgrDEC);
int connIndex = oamCache->getClosestConnection(scannedExtents[i].dbRoot);
fBPP->runBPP(*bs, connIndex, isExeMgrDEC);
jobs->push_back(
Job(scannedExtents[i].dbRoot, (*dbRootConnectionMap)[scannedExtents[i].dbRoot], blocksThisJob, bs));
Job(scannedExtents[i].dbRoot, connIndex, blocksThisJob, bs));
blocksToScan -= blocksThisJob;
startingLBID += fColType.colWidth * blocksThisJob;
fBPP->reset();

View File

@ -33,6 +33,7 @@
#include "we_brm.h"
#include "bytestream.h"
#include "liboamcpp.h"
#include "oamcache.h"
#include "messagequeue.h"
#include "messagequeuepool.h"
#include "we_messages.h"
@ -119,7 +120,7 @@ static int generate_result(BRM::OID_t oid, BRM::DBRM* emp, TABLE* table, THD* th
off_t compressedFileSize = 0;
we_config.initConfigCache();
messageqcpp::MessageQueueClient* msgQueueClient;
oam::Oam oam_instance;
oam::OamCache* oamcache = oam::OamCache::makeOamCache();
int pmId = 0;
int rc;
@ -141,7 +142,7 @@ static int generate_result(BRM::OID_t oid, BRM::DBRM* emp, TABLE* table, THD* th
try
{
oam_instance.getDbrootPmConfig(iter->dbRoot, pmId);
pmId = oamcache->getOwnerPM(iter->dbRoot);
}
catch (std::runtime_error&)
{