mariadb-columnstore-engine/writeengine/redistribute/we_redistributecontrolthread.cpp
Serguey Zefirov 5aa2a824c2 feat(MCOL-6082): Multiple readers of dbroots using OamCache logic
This patch introduces centralized logic for selecting which dbroot is
accessible in PrimProc on which node. The logic lives in OamCache for the
time being and can be moved later.
2025-07-21 14:32:39 +03:00


/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/*
* $Id: we_redistributecontrolthread.cpp 4450 2013-01-21 14:13:24Z rdempsey $
*/
#include <iostream>
#include <set>
#include <vector>
#include <cassert>
#include <stdexcept>
#include <sstream>
#include <string>
#include <unistd.h>
using namespace std;
#include "boost/scoped_ptr.hpp"
#include "boost/scoped_array.hpp"
#include "boost/thread/mutex.hpp"
#include "boost/filesystem.hpp"
using namespace boost;
#include "mcsconfig.h"
#include "installdir.h"
#include "configcpp.h"
using namespace config;
#include "liboamcpp.h"
#include "oamcache.h"
using namespace oam;
#include "dbrm.h"
using namespace BRM;
#include "messagequeue.h"
#include "bytestream.h"
using namespace messageqcpp;
#include "calpontsystemcatalog.h"
using namespace execplan;
#include "we_messages.h"
#include "we_redistributedef.h"
#include "we_redistributecontrol.h"
#include "we_redistributecontrolthread.h"
namespace redistribute
{
// static variables
boost::mutex RedistributeControlThread::fActionMutex;
volatile bool RedistributeControlThread::fStopAction = false;
string RedistributeControlThread::fWesInUse;
void RedistributeControlThread::setStopAction(bool s)
{
boost::mutex::scoped_lock lock(fActionMutex);
fStopAction = s;
}
RedistributeControlThread::RedistributeControlThread(uint32_t act)
: fAction(act), fMaxDbroot(0), fEntryCount(0), fErrorCode(RED_EC_OK)
{
}
RedistributeControlThread::~RedistributeControlThread()
{
// fWEClient->removeQueue(uniqueId);
}
void RedistributeControlThread::operator()()
{
if (fAction == RED_CNTL_START)
doRedistribute();
else if (fAction == RED_CNTL_STOP)
doStopAction();
}
void RedistributeControlThread::doRedistribute()
{
if (setup() != 0)
fErrorCode = RED_EC_CNTL_SETUP_FAIL;
else if (makeRedistributePlan() != 0)
fErrorCode = RED_EC_MAKEPLAN_FAIL;
try
{
if (fErrorCode == RED_EC_OK && !fStopAction && fEntryCount > 0)
executeRedistributePlan();
}
catch (const std::exception& ex)
{
fErrorMsg += ex.what();
fErrorCode = RED_EC_EXECUTE_FAIL;
}
catch (...)
{
fErrorMsg += "Error when executing the plan.";
fErrorCode = RED_EC_EXECUTE_FAIL;
}
uint32_t state = RED_STATE_FINISH;
if (fErrorCode != RED_EC_OK)
state = RED_STATE_FAILED;
try
{
if (!fStopAction)
fControl->updateState(state);
}
catch (const std::exception& ex)
{
fErrorMsg += ex.what();
if (fErrorCode == RED_EC_OK)
fErrorCode = RED_EC_UPDATE_STATE;
}
catch (...)
{
fErrorMsg += "Error when updating state.";
if (fErrorCode == RED_EC_OK)
fErrorCode = RED_EC_UPDATE_STATE;
}
if (fErrorMsg.empty())
fControl->logMessage("finished @controlThread::doRedistribute");
else
fControl->logMessage(fErrorMsg + " @controlThread::doRedistribute");
{
boost::mutex::scoped_lock lock(fActionMutex);
fWesInUse.clear();
}
}
int RedistributeControlThread::setup()
{
int ret = 0;
try
{
// fUniqueId = fDbrm.getUnique64();
// fWEClient = WriteEngine::WEClients::instance(WriteEngine::WEClients::REDISTRIBUTE);
// fWEClient->addQueue(uniqueId);
fConfig = Config::makeConfig();
fOamCache = oam::OamCache::makeOamCache();
fControl = RedistributeControl::instance();
// fOam.reset(new oam::Oam);
// fDbrm.reset(new BRM::DBRM);
vector<int>::iterator i = fControl->fSourceList.begin();
for (; i != fControl->fSourceList.end(); i++)
{
fSourceSet.insert(*i);
fDbrootSet.insert(*i);
if (*i > fMaxDbroot)
fMaxDbroot = *i;
}
vector<int>::iterator j = fControl->fDestinationList.begin();
for (; j != fControl->fDestinationList.end(); j++)
{
fTargetSet.insert(*j);
if (fDbrootSet.find(*j) == fDbrootSet.end())
{
fDbrootSet.insert(*j);
}
// if (*j > fMaxDbroot)
// fMaxDbroot = *j;
}
}
catch (const std::exception& ex)
{
fErrorMsg += ex.what();
ret = 1;
}
catch (...)
{
ret = 1;
}
return ret;
}
int RedistributeControlThread::makeRedistributePlan()
{
int ret = 0;
try
{
if (fControl->fPlanFilePtr != NULL)
{
// should not happen, just in case.
fclose(fControl->fPlanFilePtr);
fControl->fPlanFilePtr = NULL;
}
// get all user table oids
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(0);
vector<pair<CalpontSystemCatalog::OID, CalpontSystemCatalog::TableName> > tables = csc->getTables();
vector<pair<CalpontSystemCatalog::OID, CalpontSystemCatalog::TableName> >::iterator i;
for (i = tables.begin(); i != tables.end(); i++)
{
// in case the action is cancelled.
if (fStopAction)
break;
// column oids
CalpontSystemCatalog::RIDList cols = csc->columnRIDs(i->second, true);
typedef std::map<PartitionInfo, RedistributeExtentEntry> PartitionExtentMap;
PartitionExtentMap partitionMap;
vector<EMEntry> entries;
// sample the first column
int rc = fControl->fDbrm->getExtents(cols[0].objnum, entries, false, false, true);
if (rc != 0 || entries.size() == 0)
{
ostringstream oss;
oss << "Error in DBRM getExtents; oid:" << cols[0].objnum << "; returnCode: " << rc;
throw runtime_error(oss.str());
}
for (vector<EMEntry>::iterator j = entries.begin(); j != entries.end(); j++)
{
RedistributeExtentEntry redEntry;
redEntry.oid = cols[0].objnum;
redEntry.dbroot = j->dbRoot;
redEntry.partition = j->partitionNum;
redEntry.segment = j->segmentNum;
redEntry.lbid = j->range.start;
redEntry.range = j->range.size * 1024;
PartitionInfo partInfo(j->dbRoot, j->partitionNum);
partitionMap.insert(make_pair(partInfo, redEntry));
}
// group partitions by dbroot
vector<vector<int> > dbPartVec(fMaxDbroot + 1);
uint64_t totalPartitionCount = 0;
int maxPartitionId = 0;
for (PartitionExtentMap::iterator j = partitionMap.begin(); j != partitionMap.end(); j++)
{
int dbroot = j->first.dbroot;
if (fSourceSet.find(dbroot) != fSourceSet.end())
{
// only dbroot in source list needs attention
dbPartVec[dbroot].push_back(j->first.partition);
if (j->first.partition > maxPartitionId)
maxPartitionId = j->first.partition;
totalPartitionCount++;
}
}
// sort the partitions
for (vector<vector<int> >::iterator k = dbPartVec.begin(); k != dbPartVec.end(); k++)
sort(k->begin(), k->end());
// divide the dbroots into the source and target sets
uint64_t average = totalPartitionCount / fTargetSet.size();
// Remainder is the number of partitions that must be spread across some
// of the dbroots such that no dbroot has more than average+1 partitions.
uint64_t remainder = totalPartitionCount % fTargetSet.size();
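// Worked example (hypothetical numbers): with totalPartitionCount = 10 and three
// target dbroots, average = 3 and remainder = 1, so after redistribution one target
// ends up with average + 1 = 4 partitions and the other two with 3 each.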
set<int> sourceDbroots;
set<int> targetDbroots;
// list<pair<size_t, int> > targetList; // to be ordered by partition size
int64_t extra = remainder;
for (set<int>::iterator j = fDbrootSet.begin(); j != fDbrootSet.end(); ++j)
{
if (fTargetSet.find(*j) == fTargetSet.end())
{
// Not a target (removed on command line). Always a source.
sourceDbroots.insert(*j);
continue;
}
// If a dbroot has exactly average+1 partitions and there are extras to be had,
// then it is neither a source nor a target.
if ((dbPartVec[*j].size() == average + 1) && extra)
{
--extra;
continue;
}
if (dbPartVec[*j].size() > average)
{
// Sources are those dbroots with more than average partitions
sourceDbroots.insert(*j);
}
else
{
// Targets are those with room ( <= average )
targetDbroots.insert(*j);
}
}
// At this point, there are two concepts of target: (1) those in fTargetSet, which is the
// set of dbroots the user wants partitions on and (2) those in targetDbroots, a subset of
// fTargetSet, which is those that actually have room, based on average, for more data.
// After redistribution, partition count for each dbroot is average or average+1.
// When remainder > 0, some targets will have (average+1) partitions.
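// For instance (assumed counts): redistributing across three listed targets holding
// {5, 3, 1} partitions gives average = 3 and remainder = 0; the dbroot with 5 becomes
// a source, the ones with 3 and 1 become targets, and the loop below only moves
// partitions to the dbroot holding 1, since the one holding 3 is already at average.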
// loop through target dbroots and find partitions from sources to move to each.
set<int>::iterator sourceDbroot = sourceDbroots.begin();
int sourceCnt = sourceDbroots.size();
for (set<int>::iterator targetDbroot = targetDbroots.begin(); targetDbroot != targetDbroots.end();
++targetDbroot)
{
// check if this target will have average + 1 partitions.
uint64_t e = 0;
if (extra-- > 0)
e = 1;
// A set of the partitions already on the target. We try not to move the same partition here.
set<int> targetParts(dbPartVec[*targetDbroot].begin(), dbPartVec[*targetDbroot].end());
if (targetParts.size() >= (average + e))
continue; // Don't move any partitions to this target
// partitions to be moved to this target
vector<PartitionInfo> planVec;
// look for source partitions, starting from partition 0
bool done = false; // set to true once the target has enough partitions.
int loop = 0; // avoid infinite loop. maxPartitionId is the last partition of one of the dbroots.
// It's a place to stop if all else fails.
while (!done && loop <= maxPartitionId)
{
for (int p = loop++; p <= maxPartitionId && !done; ++p)
{
bool found = false;
if (targetParts.find(p) ==
targetParts.end()) // True if the partition is not on the target already
{
// try to find partition p in one of the source dbroots
for (int x = 0; x < sourceCnt && !found; ++x)
{
vector<int>& sourceParts = dbPartVec[*sourceDbroot];
// This partition needs to move if:
// 1) source still has more than average partitions, or
// 2) source is not a listed target (we want to empty source).
bool bNotTarget =
fTargetSet.find(*sourceDbroot) == fTargetSet.end(); // true if source not in target list
if (sourceParts.size() >= average || bNotTarget)
{
vector<int>::iterator y = find(sourceParts.begin(), sourceParts.end(), p);
if ((y != sourceParts.end()) && (sourceParts.size() > targetParts.size() || bNotTarget))
{
targetParts.insert(p);
planVec.push_back(PartitionInfo(*sourceDbroot, p));
found = true;
// update the source
sourceParts.erase(y);
}
}
if (++sourceDbroot == sourceDbroots.end())
sourceDbroot = sourceDbroots.begin();
} // for source
if (targetParts.size() == (average + e))
done = true;
} // !find p
} // for p
} // while loop
// dump the plan for the target to file
dumpPlanToFile(i->first, planVec, *targetDbroot);
} // for target
// It's possible that a source that is "removed" on the command line is not empty.
// This can happen if a partition exists on all dbroots.
// MCOL-786: use nextDbroot to start the loop looking for a suitable target
// where we left off with the previous partition. This gives each target
// an opportunity to get some of the data. In the case of dbroot removal,
// there is often the same number of partitions on each remaining dbroot.
// This logic tends to roundrobin which dbroot gets the next batch.
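// Illustrative case (assumed layout): dbroot 3 is removed while partitions 0..3 exist
// on every dbroot, so the pass above could not place them on targets that already hold
// them; this pass then tends to alternate, giving partition 0 to the first target,
// partition 1 to the next, and so on, spreading the leftover data across targets
// rather than dumping it all on one dbroot.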
set<int>::iterator nextDbroot = targetDbroots.begin();
set<int>::iterator targetDbroot;
int targetCnt = (int)targetDbroots.size();
// Loop through the sources, looking for dbroots that are not targets that also still contain partitions
for (set<int>::iterator sourceDbroot = sourceDbroots.begin(); sourceDbroot != sourceDbroots.end();
++sourceDbroot)
{
// Is this source in target list? If so, do nothing.
if (fTargetSet.find(*sourceDbroot) != fTargetSet.end())
continue;
vector<int>& sourceParts = dbPartVec[*sourceDbroot]; // Partitions still on source
// We can't erase from a vector we're iterating, so we need a kludge:
for (int p = 0; p <= maxPartitionId; ++p)
{
vector<int>::iterator sourcePart = find(sourceParts.begin(), sourceParts.end(), p);
if (sourcePart == sourceParts.end())
{
continue;
}
// Look through targets to see which can accept this partition. Find the one with the least
// number of partitions. Someday we want to put with the dbroot having the fewest segments of
// the partition.
uint64_t partCount = std::numeric_limits<uint64_t>::max();
int tdbroot = 0;
targetDbroot = nextDbroot;
// MCOL-786. Start at targetDbroot and loop around back to the same spot.
for (int tbd = 0; tbd < targetCnt; ++tbd)
{
if (targetDbroot == targetDbroots.end())
{
targetDbroot = targetDbroots.begin();
}
if (dbPartVec[*targetDbroot].size() < partCount)
{
tdbroot = *targetDbroot;
partCount = dbPartVec[*targetDbroot].size();
nextDbroot = targetDbroot;
++nextDbroot;
}
++targetDbroot;
}
if (tdbroot == 0)
{
continue;
}
set<int> targetParts(dbPartVec[tdbroot].begin(), dbPartVec[tdbroot].end());
vector<PartitionInfo> planVec; // partitions to be moved to this target
targetParts.insert(p);
planVec.push_back(PartitionInfo(*sourceDbroot, p));
sourceParts.erase(sourcePart);
dumpPlanToFile(i->first, planVec, tdbroot);
} // for sourceParts
} // for source
} // for tables
}
catch (const std::exception& ex)
{
fErrorMsg += ex.what();
ret = 2;
}
catch (...)
{
ret = 2;
}
displayPlan();
return ret;
}
void RedistributeControlThread::dumpPlanToFile(uint64_t oid, vector<PartitionInfo>& vec, int target)
{
// open the plan file, if not already opened, to write.
if (fControl->fPlanFilePtr == NULL)
{
errno = 0;
fControl->fPlanFilePtr = fopen(fControl->fPlanFilePath.c_str(), "w+");
if (fControl->fPlanFilePtr == NULL)
{
int e = errno;
ostringstream oss;
oss << "Failed to open redistribute.plan: " << strerror(e) << " (" << e << ")";
throw runtime_error(oss.str());
}
}
size_t entryNum = vec.size();
scoped_array<RedistributePlanEntry> entries(new RedistributePlanEntry[entryNum]);
for (uint64_t i = 0; i < entryNum; ++i)
{
entries[i].table = oid;
entries[i].source = vec[i].dbroot;
entries[i].partition = vec[i].partition;
entries[i].destination = target;
entries[i].status = RED_TRANS_READY;
}
errno = 0;
size_t n = fwrite(entries.get(), sizeof(RedistributePlanEntry), entryNum, fControl->fPlanFilePtr);
if (n != entryNum) // need retry
{
int e = errno;
ostringstream oss;
oss << "Failed to write into redistribute.plan: " << strerror(e) << " (" << e << ")";
throw runtime_error(oss.str());
}
fEntryCount += entryNum;
}
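// Note on the plan file written above: redistribute.plan is a flat binary array of
// RedistributePlanEntry records. displayPlan() and executeRedistributePlan() below
// read it back sequentially with fread() using the same record size, and
// executeRedistributePlan() seeks back to rewrite each record in place with its
// updated status and timestamps.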
void RedistributeControlThread::displayPlan()
{
// start from the first entry
try
{
if (!fControl->fPlanFilePtr)
{
ostringstream oss;
oss << "No data is schefuled to be moved" << endl;
fControl->logMessage(oss.str());
return;
}
rewind(fControl->fPlanFilePtr);
ByteStream bs;
uint32_t entryId = 0;
long entrySize = sizeof(RedistributePlanEntry);
fControl->logMessage(string("Redistribution Plan:"));
while (entryId++ < fEntryCount)
{
RedistributePlanEntry entry;
errno = 0;
size_t n = fread(&entry, entrySize, 1, fControl->fPlanFilePtr);
if (n != 1)
{
int e = errno;
ostringstream oss;
oss << "Failed to read from redistribute.plan: " << strerror(e) << " (" << e << ")";
throw runtime_error(oss.str());
}
// Print this plan entry
ostringstream oss;
oss << "table oid " << entry.table << " partition " << entry.partition << " moves from dbroot "
<< entry.source << " to " << entry.destination << endl;
fControl->logMessage(oss.str());
}
}
catch (std::exception& e)
{
ostringstream oss;
oss << "exception during display of plan: " << e.what() << endl;
fControl->logMessage(oss.str());
}
catch (...)
{
ostringstream oss;
oss << "exception during display of plan" << endl;
fControl->logMessage(oss.str());
}
}
int RedistributeControlThread::executeRedistributePlan()
{
// update the info with total partitions to move
fControl->setEntryCount(fEntryCount);
// start from the first entry
rewind(fControl->fPlanFilePtr);
ByteStream bs;
uint32_t entryId = 0;
long entrySize = sizeof(RedistributePlanEntry);
while (entryId++ < fEntryCount)
{
try
{
// skip the system status check when OAM is not available
/*
if (getenv("SKIP_OAM_INIT") == NULL)
{
// make sure system is in active state
bool isActive = false;
while (!isActive)
{
bool noExcept = true;
SystemStatus systemstatus;
try
{
fControl->fOam->getSystemStatus(systemstatus);
}
catch (const std::exception& ex)
{
fErrorMsg += ex.what();
noExcept = false;
}
catch (...)
{
noExcept = false;
}
if (noExcept && ((isActive = (systemstatus.SystemOpState == oam::ACTIVE)) == false))
sleep(1);
}
}
*/
if (fStopAction)
return RED_EC_USER_STOP;
RedistributePlanEntry entry;
errno = 0;
size_t n = fread(&entry, entrySize, 1, fControl->fPlanFilePtr);
if (n != 1)
{
int e = errno;
ostringstream oss;
oss << "Failed to read from redistribute.plan: " << strerror(e) << " (" << e << ")";
throw runtime_error(oss.str());
}
if (entry.status != (int)RED_TRANS_READY)
continue;
// send the job to source dbroot
size_t headerSize = sizeof(RedistributeMsgHeader);
size_t entrySize = sizeof(RedistributePlanEntry);
RedistributeMsgHeader header(entry.destination, entry.source, entryId, RED_ACTN_REQUEST);
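// Request layout assembled below: a one-byte WE_SVR_REDISTRIBUTE message id followed
// by the raw RedistributeMsgHeader and RedistributePlanEntry structs. The reply from
// the WriteEngineServer is expected in the same shape, with the returned entry
// carrying the resulting transaction status.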
if (connectToWes(header.source) == 0)
{
bs.restart();
entry.starttime = time(NULL);
bs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;
bs.append((const ByteStream::byte*)&header, headerSize);
bs.append((const ByteStream::byte*)&entry, entrySize);
fMsgQueueClient->write(bs);
SBS sbs = fMsgQueueClient->read();
entry.status = RED_TRANS_FAILED;
if (sbs->length() == 0)
{
ostringstream oss;
oss << "Zero byte read, Network error. entryID=" << entryId;
fErrorMsg = oss.str();
}
else if (sbs->length() < (headerSize + entrySize + 1))
{
ostringstream oss;
oss << "Short message, length=" << sbs->length() << ". entryID=" << entryId;
fErrorMsg = oss.str();
}
else
{
ByteStream::byte wesMsgId;
*sbs >> wesMsgId;
// Need to check the header info
// const RedistributeMsgHeader* h = (const RedistributeMsgHeader*) sbs->buf();
sbs->advance(headerSize);
const RedistributePlanEntry* e = (const RedistributePlanEntry*)sbs->buf();
sbs->advance(entrySize);
entry.status = e->status;
entry.endtime = time(NULL);
// if (entry.status == (int32_t) RED_TRANS_FAILED)
// *sbs >> fErrorMsg;
}
// done with this connection; may consider reusing it later.
fMsgQueueClient.reset();
}
else
{
entry.status = RED_TRANS_FAILED;
ostringstream oss;
oss << "Connect to PM failed."
<< ". entryID=" << entryId;
fErrorMsg += oss.str();
}
if (!fErrorMsg.empty())
throw runtime_error(fErrorMsg);
errno = 0;
int rc = fseek(fControl->fPlanFilePtr, -((long)entrySize), SEEK_CUR);
if (rc != 0)
{
int e = errno;
ostringstream oss;
oss << "fseek is failed: " << strerror(e) << " (" << e << "); entry id=" << entryId;
throw runtime_error(oss.str());
}
errno = 0;
n = fwrite(&entry, entrySize, 1, fControl->fPlanFilePtr);
if (n != 1) // need retry
{
int e = errno;
ostringstream oss;
oss << "Failed to update redistribute.plan: " << strerror(e) << " (" << e
<< "); entry id=" << entryId;
throw runtime_error(oss.str());
}
fflush(fControl->fPlanFilePtr);
fControl->updateProgressInfo(entry.status, entry.endtime);
}
catch (const std::exception& ex)
{
fControl->logMessage(string("got exception when executing plan:") + ex.what());
}
catch (...)
{
fControl->logMessage("got unknown exception when executing plan.");
}
}
return 0;
}
int RedistributeControlThread::connectToWes(int dbroot)
{
int ret = 0;
int pmId = fOamCache->getOwnerPM(dbroot);
ostringstream oss;
oss << "pm" << pmId << "_WriteEngineServer";
try
{
boost::mutex::scoped_lock lock(fActionMutex);
fWesInUse = oss.str();
fMsgQueueClient.reset(new MessageQueueClient(fWesInUse, fConfig));
}
catch (const std::exception& ex)
{
fErrorMsg = "Caught exception when connecting to " + oss.str() + " -- " + ex.what();
ret = 1;
}
catch (...)
{
fErrorMsg = "Caught exception when connecting to " + oss.str() + " -- unknown";
ret = 2;
}
if (ret != 0)
{
boost::mutex::scoped_lock lock(fActionMutex);
fWesInUse.clear();
fMsgQueueClient.reset();
}
return ret;
}
void RedistributeControlThread::doStopAction()
{
fConfig = Config::makeConfig();
fControl = RedistributeControl::instance();
boost::mutex::scoped_lock lock(fActionMutex);
if (!fWesInUse.empty())
{
// send the stop message to dbroots
size_t headerSize = sizeof(RedistributeMsgHeader);
RedistributeMsgHeader header(-1, -1, -1, RED_ACTN_STOP);
try
{
fMsgQueueClient.reset(new MessageQueueClient(fWesInUse, fConfig));
ByteStream bs;
bs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;
bs.append((const ByteStream::byte*)&header, headerSize);
fMsgQueueClient->write(bs);
SBS sbs;
sbs = fMsgQueueClient->read();
// no retry yet.
}
catch (const std::exception& ex)
{
fErrorMsg = "Caught exception when connecting to " + fWesInUse + " -- " + ex.what();
}
catch (...)
{
fErrorMsg = "Caught exception when connecting to " + fWesInUse + " -- unknown";
}
}
if (!fErrorMsg.empty())
fControl->logMessage(fErrorMsg + " @controlThread::doStop");
else
fControl->logMessage("User stop @controlThread::doStop");
fWesInUse.clear();
fMsgQueueClient.reset();
}
} // namespace redistribute