1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-08 14:22:09 +03:00

Work for almost three days - progress keeping commit

This commit is contained in:
Serguey Zefirov
2025-07-03 13:41:22 +00:00
parent 674c8a2a03
commit dc17e7cb3d
16 changed files with 43 additions and 89 deletions

View File

@@ -762,7 +762,7 @@ void AlterTableProcessor::addColumn(uint32_t sessionID, execplan::CalpontSystemC
int pmNum = 1; int pmNum = 1;
OamCache* oamcache = OamCache::makeOamCache(); OamCache* oamcache = OamCache::makeOamCache();
//boost::shared_ptr<std::map<int, std::set<int>> > dbRootPMMap = oamcache->getDBRootToPMMap(); //boost::shared_ptr<std::map<int, std::set<int>> > dbRootPMMap = oamcache->getDBRootToPMMap();
pmNum = oamcache->getOwnerPM(); //*(*dbRootPMMap)[dbRoot].begin(); pmNum = oamcache->getOwnerPM(dbRoot); //*(*dbRootPMMap)[dbRoot].begin();
boost::shared_ptr<messageqcpp::ByteStream> bsIn; boost::shared_ptr<messageqcpp::ByteStream> bsIn;
// Will create files on each PM as needed. // Will create files on each PM as needed.
@@ -925,7 +925,7 @@ void AlterTableProcessor::addColumn(uint32_t sessionID, execplan::CalpontSystemC
if (rc != 0) if (rc != 0)
throw std::runtime_error("Error while calling getSysCatDBRoot "); throw std::runtime_error("Error while calling getSysCatDBRoot ");
pmNum = *(*dbRootPMMap)[dbRoot].begin(); pmNum = oamcache->getOwnerPM(dbRoot);
bs.restart(); bs.restart();
bs << (ByteStream::byte)WE_SVR_UPDATE_SYSTABLE_AUTO; bs << (ByteStream::byte)WE_SVR_UPDATE_SYSTABLE_AUTO;
bs << uniqueId; bs << uniqueId;
@@ -1379,7 +1379,7 @@ void AlterTableProcessor::dropColumn(uint32_t sessionID, execplan::CalpontSystem
if (rc != 0) if (rc != 0)
throw std::runtime_error("Error while calling getSysCatDBRoot "); throw std::runtime_error("Error while calling getSysCatDBRoot ");
pmNum = *(*dbRootPMMap)[dbRoot].begin(); pmNum = oamcache->getOwnerPM(dbRoot);
bytestream.restart(); bytestream.restart();
bytestream << (ByteStream::byte)WE_SVR_UPDATE_SYSTABLE_AUTO; bytestream << (ByteStream::byte)WE_SVR_UPDATE_SYSTABLE_AUTO;
bytestream << uniqueId; bytestream << uniqueId;
@@ -1452,7 +1452,7 @@ void AlterTableProcessor::dropColumn(uint32_t sessionID, execplan::CalpontSystem
if (rc != 0) if (rc != 0)
throw std::runtime_error("Error while calling getSysCatDBRoot "); throw std::runtime_error("Error while calling getSysCatDBRoot ");
pmNum = *(*dbRootPMMap)[dbRoot].begin(); pmNum = oamcache->getOwnerPM(dbRoot);
try try
{ {
@@ -1707,7 +1707,7 @@ void AlterTableProcessor::setColumnDefault(uint32_t sessionID, execplan::Calpont
int pmNum = 1; int pmNum = 1;
OamCache* oamcache = OamCache::makeOamCache(); OamCache* oamcache = OamCache::makeOamCache();
//boost::shared_ptr<std::map<int, std::set<int>> > dbRootPMMap = oamcache->getDBRootToPMMap(); //boost::shared_ptr<std::map<int, std::set<int>> > dbRootPMMap = oamcache->getDBRootToPMMap();
pmNum = oamcacne->getOwnerPM(dbRoot); //*(*dbRootPMMap)[dbRoot].begin(); pmNum = oamcache->getOwnerPM(dbRoot); //*(*dbRootPMMap)[dbRoot].begin();
boost::shared_ptr<messageqcpp::ByteStream> bsIn; boost::shared_ptr<messageqcpp::ByteStream> bsIn;
@@ -2540,7 +2540,7 @@ void AlterTableProcessor::renameColumn(uint32_t sessionID, execplan::CalpontSyst
if (rc != 0) if (rc != 0)
throw std::runtime_error("Error while calling getSysCatDBRoot"); throw std::runtime_error("Error while calling getSysCatDBRoot");
pmNum = *(*dbRootPMMap)[dbRoot].begin(); pmNum = oamcache->getOwnerPM(dbRoot);
// send to WES to process // send to WES to process
try try

View File

@@ -326,8 +326,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackageInternal(
bytestream << (uint32_t)dbRoot; bytestream << (uint32_t)dbRoot;
tableDef.serialize(bytestream); tableDef.serialize(bytestream);
boost::shared_ptr<messageqcpp::ByteStream> bsIn; boost::shared_ptr<messageqcpp::ByteStream> bsIn;
boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap(); pmNum = oamcache->getOwnerPM(dbRoot);
pmNum = (*dbRootPMMap)[dbRoot];
// MCOL-66 The DBRM can't handle concurrent DDL // MCOL-66 The DBRM can't handle concurrent DDL
boost::mutex::scoped_lock lk(dbrmMutex); boost::mutex::scoped_lock lk(dbrmMutex);
@@ -450,7 +449,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackageInternal(
bytestream << (uint32_t)dbRoot; bytestream << (uint32_t)dbRoot;
tableDef.serialize(bytestream); tableDef.serialize(bytestream);
pmNum = (*dbRootPMMap)[dbRoot]; pmNum = oamcache->getOwnerPM(dbRoot);
try try
{ {
@@ -666,7 +665,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackageInternal(
return result; return result;
} }
pmNum = (*dbRootPMMap)[useDBRoot]; pmNum = oamcache->getOwnerPM(useDBRoot);
try try
{ {

View File

@@ -359,8 +359,7 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackageInternal(ddlpack
return result; return result;
} }
boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap(); pmNum = oamcache->getOwnerPM(dbRoot);
pmNum = (*dbRootPMMap)[dbRoot];
try try
{ {
@@ -463,7 +462,7 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackageInternal(ddlpack
return result; return result;
} }
pmNum = (*dbRootPMMap)[dbRoot]; pmNum = oamcache->getOwnerPM(dbRoot);
try try
{ {
@@ -1274,8 +1273,7 @@ TruncTableProcessor::DDLResult TruncTableProcessor::processPackageInternal(ddlpa
bytestream << (uint32_t)colType.compressionType; bytestream << (uint32_t)colType.compressionType;
} }
boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap(); pmNum = oamcache->getOwnerPM(useDBRoot);
pmNum = (*dbRootPMMap)[useDBRoot];
try try
{ {

View File

@@ -845,20 +845,15 @@ void CommandPackageProcessor::clearTableLock(uint64_t uniqueId, const dmlpackage
establishTableLockToClear(tableLockID, lockInfo); establishTableLockToClear(tableLockID, lockInfo);
lockGrabbed = true; lockGrabbed = true;
oam::OamCache* oamCache = oam::OamCache::makeOamCache();
//oam::OamCache::dbRootPMMap_t dbRootPmMap = oamCache->getDBRootToPMMap();
//std::map<int, int>::const_iterator mapIter;
std::set<int> pmSet; std::set<int> pmSet;
// Construct relevant list of PMs based on the DBRoots associated // Construct relevant list of PMs based on the DBRoots associated
// with the tableLock. // with the tableLock.
for (unsigned int k = 0; k < lockInfo.dbrootList.size(); k++) for (unsigned int k = 0; k < lockInfo.dbrootList.size(); k++)
{ {
mapIter = dbRootPmMap->find(lockInfo.dbrootList[k]); if (!oamcache()->isOffline(lockInfo.dbrootList[k]))
if (!oamCache->isOffline(lockInfo.dbrootList[k]))
{ {
int pm = oamCache->getOwnerPM(lockInfo.dbrootList[k]); int pm = oamcache()->getOwnerPM(lockInfo.dbrootList[k]);
pmSet.insert(pm); pmSet.insert(pm);
} }
else else

View File

@@ -643,7 +643,7 @@ bool DeletePackageProcessor::processRowgroup(ByteStream& aRowGroup, DMLResult& r
{ {
bool rc = false; bool rc = false;
// cout << "Get dbroot " << dbroot << endl; // cout << "Get dbroot " << dbroot << endl;
int pmNum = (*fDbRootPMMap)[dbroot]; int pmNum = oamcache()->getOwnerPM(dbroot);
DMLTable* tablePtr = cpackage.get_Table(); DMLTable* tablePtr = cpackage.get_Table();
ByteStream bytestream; ByteStream bytestream;
bytestream << (ByteStream::byte)WE_SVR_DELETE; bytestream << (ByteStream::byte)WE_SVR_DELETE;

View File

@@ -165,8 +165,6 @@ class DMLPackageProcessor
std::cout << "Cannot make connection to WES" << std::endl; std::cout << "Cannot make connection to WES" << std::endl;
} }
oam::OamCache* oamCache = oam::OamCache::makeOamCache();
fDbRootPMMap = oamCache->getDBRootToPMMap();
fDbrm = aDbrm; fDbrm = aDbrm;
fSessionID = sid; fSessionID = sid;
fExeMgr = new execplan::ClientRotator(fSessionID, "ExeMgr"); fExeMgr = new execplan::ClientRotator(fSessionID, "ExeMgr");
@@ -489,6 +487,8 @@ class DMLPackageProcessor
uint32_t tableOid); uint32_t tableOid);
int endTransaction(uint64_t uniqueId, BRM::TxnID txnID, bool success); int endTransaction(uint64_t uniqueId, BRM::TxnID txnID, bool success);
oam::OamCache* oamcache() { return oam::OamCache::makeOamCache(); }
/** @brief the Session Manager interface /** @brief the Session Manager interface
*/ */
execplan::SessionManager fSessionManager; execplan::SessionManager fSessionManager;
@@ -500,8 +500,6 @@ class DMLPackageProcessor
uint32_t fPMCount; uint32_t fPMCount;
WriteEngine::WEClients* fWEClient; WriteEngine::WEClients* fWEClient;
BRM::DBRM* fDbrm; BRM::DBRM* fDbrm;
boost::shared_ptr<std::map<int, std::set<int>> > fDbRootPMMap;
oam::Oam fOam;
bool fRollbackPending; // When set, any derived object should stop what it's doing and cleanup in bool fRollbackPending; // When set, any derived object should stop what it's doing and cleanup in
// preparation for a Rollback // preparation for a Rollback
execplan::ClientRotator* fExeMgr; execplan::ClientRotator* fExeMgr;

View File

@@ -275,8 +275,7 @@ DMLPackageProcessor::DMLResult InsertPackageProcessor::processPackageInternal(
if (tmpSet) if (tmpSet)
{ {
dbroot = tmp.dbRoot; dbroot = tmp.dbRoot;
boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap(); pmNum = oamcache->getOwnerPM(dbroot);
pmNum = (*dbRootPMMap)[dbroot];
//@Bug 4760. validate pm value //@Bug 4760. validate pm value
if (pmNum == 0) if (pmNum == 0)

View File

@@ -161,8 +161,7 @@ DMLPackageProcessor::DMLResult UpdatePackageProcessor::processPackageInternal(
int32_t sessionId = fSessionID; int32_t sessionId = fSessionID;
std::string processName("DMLProc"); std::string processName("DMLProc");
int i = 0; int i = 0;
OamCache* oamcache = OamCache::makeOamCache(); std::vector<int> pmList = oamcache()->getModuleIds();
std::vector<int> pmList = oamcache->getModuleIds();
std::vector<uint32_t> pms; std::vector<uint32_t> pms;
for (unsigned i = 0; i < pmList.size(); i++) for (unsigned i = 0; i < pmList.size(); i++)
@@ -437,8 +436,7 @@ uint64_t UpdatePackageProcessor::fixUpRows(dmlpackage::CalpontDMLPackage& cpacka
uint64_t rowsProcessed = 0; uint64_t rowsProcessed = 0;
uint32_t dbroot = 1; uint32_t dbroot = 1;
bool metaData = false; bool metaData = false;
oam::OamCache* oamCache = oam::OamCache::makeOamCache(); std::vector<int> fPMs = oamcache()->getModuleIds();
std::vector<int> fPMs = oamCache->getModuleIds();
std::map<unsigned, bool> pmState; std::map<unsigned, bool> pmState;
string emsg; string emsg;
string emsgStr; string emsgStr;
@@ -726,7 +724,7 @@ bool UpdatePackageProcessor::processRowgroup(ByteStream& aRowGroup, DMLResult& r
{ {
bool rc = false; bool rc = false;
// cout << "Get dbroot " << dbroot << endl; // cout << "Get dbroot " << dbroot << endl;
uint32_t pmNum = (*fDbRootPMMap)[dbroot]; uint32_t pmNum = oamcache()->getOwnerPM(dbroot);
ByteStream bytestream; ByteStream bytestream;
bytestream << (uint8_t)WE_SVR_UPDATE; bytestream << (uint8_t)WE_SVR_UPDATE;

View File

@@ -39,7 +39,7 @@ using namespace boost;
namespace oam namespace oam
{ {
#if 0
struct CacheReloaded struct CacheReloaded
{ {
CacheReloaded() CacheReloaded()
@@ -246,5 +246,6 @@ bool OamCache::isOffline(int dbRoot)
{ {
return dbRootConnectionMap->find(scannedExtents[i].dbRoot) == dbRootConnectionMap->end(); return dbRootConnectionMap->find(scannedExtents[i].dbRoot) == dbRootConnectionMap->end();
} }
#endif
} /* namespace oam */ } /* namespace oam */

View File

@@ -51,6 +51,7 @@ class OamCache
int getClosestConnection(int dbroot); // connection index to owner's PM or ours PM - who can access dbRoot. int getClosestConnection(int dbroot); // connection index to owner's PM or ours PM - who can access dbRoot.
int getOwnerConnection(int dbroot); // connection index to owner's PM. int getOwnerConnection(int dbroot); // connection index to owner's PM.
int getOwnerPM(int dbroot); // Owner's PM index. int getOwnerPM(int dbroot); // Owner's PM index.
std::vector<int> getPMDBRoots(int PM); // what DBRoots are owned by given PM.
bool isAccessibleBy(int dbRoot, int pmId); bool isAccessibleBy(int dbRoot, int pmId);
//// a necessary DB root is offline //// a necessary DB root is offline
//if (dbRootConnectionMap->find(scannedExtents[i].dbRoot) == dbRootConnectionMap->end()) //if (dbRootConnectionMap->find(scannedExtents[i].dbRoot) == dbRootConnectionMap->end())

View File

@@ -48,32 +48,7 @@ BatchLoader::BatchLoader(uint32_t tableOid, execplan::CalpontSystemCatalog::SCN
fPMs = PMs; fPMs = PMs;
fSessionId = sessionId; fSessionId = sessionId;
fTableOid = tableOid; fTableOid = tableOid;
OamCache* oamcache = OamCache::makeOamCache(); fOamCache = OamCache::makeOamCache();
oam::OamCache::PMDbrootsMap_t systemPmDbrootMap = oamcache->getPMToDbrootsMap();
std::map<int, OamCache::dbRoots>::iterator iter = systemPmDbrootMap->begin();
// cout << "fPMs size is " << fPMs.size() << endl;
fPmDbrootMap.reset(new OamCache::PMDbrootsMap_t::element_type());
fDbrootPMmap.reset(new map<int, int>());
for (uint32_t i = 0; i < fPMs.size(); i++)
{
iter = systemPmDbrootMap->find(fPMs[i]);
if (iter != systemPmDbrootMap->end())
{
fDbRoots.insert(fDbRoots.end(), (iter->second).begin(), (iter->second).end());
(*fPmDbrootMap)[fPMs[i]] = iter->second;
}
}
// Build dbroot to PM map
for (iter = fPmDbrootMap->begin(); iter != fPmDbrootMap->end(); iter++)
{
for (uint32_t i = 0; i < iter->second.size(); i++)
{
(*fDbrootPMmap)[iter->second[i]] = iter->first;
}
}
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// Select the first PM to send the first batch of rows. // Select the first PM to send the first batch of rows.
@@ -199,12 +174,7 @@ void BatchLoader::selectFirstPM(uint32_t& PMId)
if (createdDbroot != 0) if (createdDbroot != 0)
{ {
std::map<int, int>::iterator iter = fDbrootPMmap->begin(); PMId = fOamCache->getOwnerPM(createdDbroot);
iter = fDbrootPMmap->find(createdDbroot);
if (iter != fDbrootPMmap->end())
PMId = iter->second;
} }
// This will build the batch distribution sequence // This will build the batch distribution sequence
@@ -233,12 +203,12 @@ void BatchLoader::selectFirstPM(uint32_t& PMId)
{ {
PMRootInfo aEntry; PMRootInfo aEntry;
aEntry.PMId = fPmDistSeq[j]; aEntry.PMId = fPmDistSeq[j];
iter = fPmDbrootMap->find(aEntry.PMId); auto dbroots = fOamCache->getPMDBRoots(PMId);
for (unsigned k = 0; k < (iter->second).size(); k++) for (unsigned k = 0; k < dbroots.size(); k++)
{ {
RootExtentsBlocks aRootInfo; RootExtentsBlocks aRootInfo;
aRootInfo.DBRoot = (iter->second)[k]; aRootInfo.DBRoot = dbroots[k];
aRootInfo.numExtents = rootExtents[aRootInfo.DBRoot]; aRootInfo.numExtents = rootExtents[aRootInfo.DBRoot];
aRootInfo.numBlocks = rootBlocks[aRootInfo.DBRoot]; aRootInfo.numBlocks = rootBlocks[aRootInfo.DBRoot];
// cout << "aRootInfo DBRoot:numExtents:numBlocks = " << // cout << "aRootInfo DBRoot:numExtents:numBlocks = " <<
@@ -410,17 +380,15 @@ void BatchLoader::buildBatchDistSeqVector()
fPmDistSeq.clear(); fPmDistSeq.clear();
BlIntVec aDbCntVec(fPMs.size()); BlIntVec aDbCntVec(fPMs.size());
std::map<int, OamCache::dbRoots>::iterator iter = fPmDbrootMap->begin();
for (uint32_t i = 0; i < fPMs.size(); i++) for (uint32_t i = 0; i < fPMs.size(); i++)
{ {
iter = fPmDbrootMap->find(fPMs[i]); auto dbroots = fOamCache->getPMDBRoots(fPMs[i]);
if ((iter != fPmDbrootMap->end()) && ((iter->second).begin() != (iter->second).end())) if (dbroots.size() > 0)
{ {
try try
{ {
aDbCntVec[i] = (iter->second).size(); aDbCntVec[i] = dbroots.size();
// cout << "PM - "<<fPMs[i] << " Size = " << aDbCntVec[i] << endl; // cout << "PM - "<<fPMs[i] << " Size = " << aDbCntVec[i] << endl;
} }
catch (std::exception& exp) catch (std::exception& exp)
@@ -507,15 +475,13 @@ void BatchLoader::buildBatchDistSeqVector(uint32_t StartPm)
} }
} }
std::map<int, OamCache::dbRoots>::iterator iter = fPmDbrootMap->begin();
for (uint32_t i = 0; i < aPms.size(); i++) for (uint32_t i = 0; i < aPms.size(); i++)
{ {
iter = fPmDbrootMap->find(aPms[i]); auto dbroots = fOamCache->getPMDBRoots(aPms[i]);
if ((iter != fPmDbrootMap->end()) && ((iter->second).begin() != (iter->second).end())) if (dbroots.size())
{ {
aDbCntVec[i] = (iter->second).size(); aDbCntVec[i] = dbroots.size();
// cout << "PM - "<<aPms[i] << " Size = " << aDbCntVec[i] << endl; // cout << "PM - "<<aPms[i] << " Size = " << aDbCntVec[i] << endl;
} }
else else

View File

@@ -119,8 +119,7 @@ class BatchLoader
uint32_t fFirstPm; uint32_t fFirstPm;
execplan::CalpontSystemCatalog::SCN fSessionId; execplan::CalpontSystemCatalog::SCN fSessionId;
uint32_t fTableOid; uint32_t fTableOid;
oam::OamCache::PMDbrootsMap_t fPmDbrootMap; oam::OamCache* fOamCache;
oam::OamCache::dbRootPMMap_t fDbrootPMmap;
}; };
} // namespace batchloader } // namespace batchloader

View File

@@ -37,6 +37,7 @@ namespace WriteEngine
WE_DDLCommandClient::WE_DDLCommandClient() WE_DDLCommandClient::WE_DDLCommandClient()
{ {
fWEClient = new WEClients(WEClients::DDLPROC); fWEClient = new WEClients(WEClients::DDLPROC);
fOamCache = oam::OamCache->makeOamCache();
} }
WE_DDLCommandClient::~WE_DDLCommandClient() WE_DDLCommandClient::~WE_DDLCommandClient()
@@ -64,7 +65,7 @@ uint8_t WE_DDLCommandClient::UpdateSyscolumnNextval(uint32_t columnOid, uint64_t
try try
{ {
fOam.getDbrootPmConfig(dbRoot, pmNum); pmNum = fOamCache->getOwnerPM(dbRoot);
fWEClient->write(command, pmNum); fWEClient->write(command, pmNum);
while (1) while (1)

View File

@@ -26,6 +26,7 @@
#include "dbrm.h" #include "dbrm.h"
#include "liboamcpp.h" #include "liboamcpp.h"
#include "writeengine.h" #include "writeengine.h"
#include "oamcache.h"
#define EXPORT #define EXPORT
@@ -50,7 +51,7 @@ class WE_DDLCommandClient
private: private:
BRM::DBRM fDbrm; BRM::DBRM fDbrm;
WEClients* fWEClient; WEClients* fWEClient;
oam::Oam fOam; oam::OamCache* fOamCache;
}; };
} // namespace WriteEngine } // namespace WriteEngine

View File

@@ -758,8 +758,7 @@ int RedistributeControlThread::executeRedistributePlan()
int RedistributeControlThread::connectToWes(int dbroot) int RedistributeControlThread::connectToWes(int dbroot)
{ {
int ret = 0; int ret = 0;
OamCache::dbRootPMMap_t dbrootToPM = fOamCache->getDBRootToPMMap(); int pmId = fOamCache->getOwnerPM(dbroot);
int pmId = (*dbrootToPM)[dbroot];
ostringstream oss; ostringstream oss;
oss << "pm" << pmId << "_WriteEngineServer"; oss << "pm" << pmId << "_WriteEngineServer";

View File

@@ -134,11 +134,10 @@ void RedistributeWorkerThread::handleRequest()
{ {
memcpy(&fPlanEntry, fBs.buf(), sizeof(RedistributePlanEntry)); memcpy(&fPlanEntry, fBs.buf(), sizeof(RedistributePlanEntry));
fBs.advance(sizeof(RedistributePlanEntry)); fBs.advance(sizeof(RedistributePlanEntry));
OamCache::dbRootPMMap_t dbrootToPM = fOamCache->getDBRootToPMMap();
fMyId.first = fPlanEntry.source; fMyId.first = fPlanEntry.source;
fMyId.second = (*dbrootToPM)[fMyId.first]; fMyId.second = fOamCache->getOwnerPM(fMyId.first);
fPeerId.first = fPlanEntry.destination; fPeerId.first = fPlanEntry.destination;
fPeerId.second = (*dbrootToPM)[fPeerId.first]; fPeerId.second = fOamCache->getOwnerPM(fPeerId.first);
if (grabTableLock() == 0) if (grabTableLock() == 0)
{ {