1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-5555 Add support for startreadonly command. (#3081)

This patch adds support for `startreadonly` command which waits
until all active cpimport jobs are done and then puts controller node to readonly
mode.
This commit is contained in:
Denis Khalikov
2023-12-26 14:11:33 +03:00
committed by GitHub
parent 0c6876d8e4
commit 55ffacf546
11 changed files with 403 additions and 18 deletions

View File

@ -532,6 +532,11 @@ const uint8_t BULK_UPDATE_DBROOT = 100;
const uint8_t GET_SYSTEM_CATALOG = 101;
const uint8_t BULK_WRITE_VB_ENTRY = 102;
const uint8_t NEW_CPIMPORT_JOB = 103;
const uint8_t FINISH_CPIMPORT_JOB = 104;
const uint8_t START_READONLY = 105;
const uint8_t FORCE_CLEAR_CPIMPORT_JOBS = 106;
/* Error codes returned by the DBRM functions. */
/// The operation was successful
const int8_t ERR_OK = 0;

View File

@ -2733,6 +2733,25 @@ int DBRM::setReadOnly(bool b) DBRM_THROW
return err;
}
int DBRM::startReadOnly() DBRM_THROW
{
ByteStream command, response;
uint8_t err;
command << START_READONLY;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::isReadWrite() throw()
{
#ifdef BRM_INFO
@ -3006,6 +3025,73 @@ const QueryContext DBRM::sysCatVerID()
return ret;
}
uint8_t DBRM::newCpimportJob(uint32_t& jobId)
{
ByteStream command, response;
uint8_t err;
command << NEW_CPIMPORT_JOB;
err = send_recv(command, response);
if (err != ERR_OK)
{
log("DBRM: SessionManager::newCpimportJob(): network error");
return err;
}
if (response.length() != 5)
{
log("DBRM: SessionManager::newCpimportJob(): bad response");
return ERR_READONLY;
}
response >> err;
response >> jobId;
return ERR_OK;
}
void DBRM::finishCpimportJob(uint32_t jobId)
{
ByteStream command, response;
uint8_t err;
command << FINISH_CPIMPORT_JOB << (uint32_t)jobId;
err = send_recv(command, response);
if (err != ERR_OK)
log("DBRM: error: SessionManager::finishCpimportJob() failed");
else if (response.length() != 1)
log("DBRM: error: SessionManager::finishCpimportJob() failed (bad response)", logging::LOG_TYPE_ERROR);
response >> err;
if (err != ERR_OK)
log("DBRM: error: SessionManager::finishCpimportJob() failed (valid error code)",
logging::LOG_TYPE_ERROR);
}
int DBRM::forceClearCpimportJobs() DBRM_THROW
{
ByteStream command, response;
uint8_t err;
command << FORCE_CLEAR_CPIMPORT_JOBS;
err = send_recv(command, response);
if (err != ERR_OK)
log("DBRM: error: SessionManager::forceClearAllCpimportJobs()) failed");
else if (response.length() != 1)
log("DBRM: error: SessionManager::forceClearAllCpimportJobs() failed (bad response)",
logging::LOG_TYPE_ERROR);
response >> err;
if (err != ERR_OK)
log("DBRM: error: SessionManager::forceClearAllCpimportJobs() failed (valid error code)",
logging::LOG_TYPE_ERROR);
return err;
}
const TxnID DBRM::newTxnID(const SessionManagerServer::SID session, bool block, bool isDDL)
{
#ifdef BRM_INFO
@ -4460,9 +4546,10 @@ void DBRM::addToLBIDList(uint32_t sessionID, vector<LBID_t>& lbidList)
boost::shared_ptr<execplan::CalpontSystemCatalog> systemCatalogPtr =
execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
std::unordered_map<execplan::CalpontSystemCatalog::OID,
std::unordered_map<execplan::CalpontSystemCatalog::OID,
std::vector<struct BRM::EMEntry>>> extentMap;
std::unordered_map<
execplan::CalpontSystemCatalog::OID,
std::unordered_map<execplan::CalpontSystemCatalog::OID, std::vector<struct BRM::EMEntry>>>
extentMap;
int err = 0;
@ -4487,18 +4574,15 @@ void DBRM::addToLBIDList(uint32_t sessionID, vector<LBID_t>& lbidList)
throw runtime_error(os.str());
}
execplan::CalpontSystemCatalog::OID tableOid =
systemCatalogPtr->isAUXColumnOID(oid);
execplan::CalpontSystemCatalog::OID tableOid = systemCatalogPtr->isAUXColumnOID(oid);
if (tableOid >= 3000)
{
if (tableOidSet.find(tableOid) == tableOidSet.end())
{
tableOidSet.insert(tableOid);
execplan::CalpontSystemCatalog::TableName tableName =
systemCatalogPtr->tableName(tableOid);
execplan::CalpontSystemCatalog::RIDList tableColRidList =
systemCatalogPtr->columnRIDs(tableName);
execplan::CalpontSystemCatalog::TableName tableName = systemCatalogPtr->tableName(tableOid);
execplan::CalpontSystemCatalog::RIDList tableColRidList = systemCatalogPtr->columnRIDs(tableName);
for (unsigned j = 0; j < tableColRidList.size(); j++)
{

View File

@ -753,6 +753,8 @@ class DBRM
/* SessionManager interface */
EXPORT const QueryContext verID();
EXPORT const QueryContext sysCatVerID();
EXPORT uint8_t newCpimportJob(uint32_t &jobId);
EXPORT void finishCpimportJob(uint32_t jobId);
EXPORT const TxnID newTxnID(const SessionManagerServer::SID session, bool block, bool isDDL = false);
EXPORT void committed(BRM::TxnID& txnid);
EXPORT void rolledback(BRM::TxnID& txnid);
@ -826,6 +828,8 @@ class DBRM
EXPORT int resume() DBRM_THROW;
EXPORT int forceReload() DBRM_THROW;
EXPORT int setReadOnly(bool b) DBRM_THROW;
EXPORT int startReadOnly() DBRM_THROW;
EXPORT int forceClearCpimportJobs() DBRM_THROW;
EXPORT int isReadWrite() throw();
EXPORT bool isEMEmpty() throw();

View File

@ -39,7 +39,10 @@ bool vflg;
void usage(char* c)
{
cerr << "Usage: " << c << " [-vh] status | halt | resume | readonly | readwrite | reload" << endl;
cerr << "Usage: " << c
<< " [-vh] status | halt | resume | readonly | readwrite | reload | startreadonly | "
"forceclearcpimportjobs"
<< endl;
exit(1);
}
@ -115,6 +118,20 @@ void do_status()
errMsg(err);
}
void start_readonly()
{
int err;
err = dbrm.startReadOnly();
errMsg(err);
}
void force_clear_cpimport_jobs()
{
int err;
err = dbrm.forceClearCpimportJobs();
errMsg(err);
}
void do_sysstatus()
{
int err;
@ -172,6 +189,10 @@ int main(int argc, char** argv)
set_readonly(false);
else if (cmd == "reload")
do_reload();
else if (cmd == "startreadonly")
start_readonly();
else if (cmd == "forceclearcpimportjobs")
force_clear_cpimport_jobs();
else if (cmd == "sysstatus")
do_sysstatus();
else

View File

@ -24,6 +24,7 @@
#include <unistd.h>
#include <sys/types.h>
#include <sstream>
#include <thread>
#include "sessionmanager.h"
#include "socketclosed.h"
@ -477,6 +478,17 @@ void MasterDBRMNode::msgProcessor()
case GET_UNCOMMITTED_LBIDS: doGetUncommittedLbids(msg, p); continue;
}
switch (cmd)
{
case NEW_CPIMPORT_JOB: doNewCpimportJob(p); continue;
case FINISH_CPIMPORT_JOB: doFinishCpimportJob(msg, p); continue;
case START_READONLY: doStartReadOnly(p->sock); continue;
case FORCE_CLEAR_CPIMPORT_JOBS: doForceClearAllCpimportJobs(p->sock); continue;
}
/* Process TableLock calls */
switch (cmd)
{
@ -1149,11 +1161,66 @@ void MasterDBRMNode::doResume(messageqcpp::IOSocket* sock)
{
}
}
void MasterDBRMNode::doForceClearAllCpimportJobs(messageqcpp::IOSocket* sock)
{
#ifdef BMR_VERBOSE
std::cout << "doForceClearCpimprtJobs" << std::endl;
#endif
ByteStream reply;
std::unique_lock lk(cpimportMutex);
sm.clearAllCpimportJobs();
reply << (uint8_t)ERR_OK;
try
{
sock->write(reply);
}
catch (exception&)
{
}
}
void MasterDBRMNode::doStartReadOnly(messageqcpp::IOSocket* sock)
{
#ifdef BRM_VERBOSE
std::cout << "doStartReadonly" << std::endl;
#endif
ByteStream reply;
// Spawn a new thread and detach it, we cannot block `msgProcessor`.
std::thread readonlyAdjuster(
[this]()
{
std::unique_lock lk(cpimportMutex);
if (sm.getCpimportJobsCount() != 0)
{
waitToFinishJobs = true;
// Wait until all cpimort jobs are done.
cpimportJobsCond.wait(lk, [this]() { return sm.getCpimportJobsCount() == 0; });
setReadOnly(true);
waitToFinishJobs = false;
}
else
{
setReadOnly(true);
}
});
readonlyAdjuster.detach();
reply << (uint8_t)ERR_OK;
try
{
sock->write(reply);
}
catch (exception&)
{
}
}
void MasterDBRMNode::doSetReadOnly(messageqcpp::IOSocket* sock, bool b)
{
ByteStream reply;
setReadOnly(b);
reply << (uint8_t)ERR_OK;
@ -1358,6 +1425,103 @@ void MasterDBRMNode::doSysCatVerID(ByteStream& msg, ThreadParams* p)
}
}
void MasterDBRMNode::doNewCpimportJob(ThreadParams* p)
{
#ifdef BRM_VERBOSE
std::cerr << "doNewCpimportJob" << std::endl;
#endif
ByteStream reply;
uint32_t jobId;
std::unique_lock lk(cpimportMutex);
// That means that controller node is waiting untill all active cpimport jobs are done.
// We cannot block `msgProcessor` and cannot create a new job, so exit with `readonly` code.
if (waitToFinishJobs)
{
reply << (uint8_t)ERR_READONLY;
try
{
p->sock->write(reply);
}
catch (...)
{
}
return;
}
try
{
jobId = sm.newCpimportJob();
}
catch (exception&)
{
reply.reset();
reply << (uint8_t)ERR_FAILURE;
try
{
p->sock->write(reply);
}
catch (...)
{
}
return;
}
reply << (uint8_t)ERR_OK;
reply << (uint32_t)jobId;
try
{
p->sock->write(reply);
}
catch (exception&)
{
}
}
void MasterDBRMNode::doFinishCpimportJob(ByteStream& msg, ThreadParams* p)
{
#ifdef BRM_VERBOSE
std::cout << "doFinishCpimportJob" << std::endl;
#endif
ByteStream reply;
uint32_t cpimportJob;
uint8_t cmd;
std::unique_lock lk(cpimportMutex);
msg >> cmd;
msg >> cpimportJob;
try
{
sm.finishCpimortJob(cpimportJob);
}
catch (exception&)
{
reply << (uint8_t)ERR_FAILURE;
try
{
p->sock->write(reply);
}
catch (...)
{
}
return;
}
reply << (uint8_t)ERR_OK;
try
{
p->sock->write(reply);
}
catch (...)
{
}
if (waitToFinishJobs && sm.getCpimportJobsCount() == 0)
cpimportJobsCond.notify_one();
}
void MasterDBRMNode::doNewTxnID(ByteStream& msg, ThreadParams* p)
{
ByteStream reply;

View File

@ -186,6 +186,7 @@ class MasterDBRMNode
void doReload(messageqcpp::IOSocket* sock);
void doSetReadOnly(messageqcpp::IOSocket* sock, bool b);
void doGetReadOnly(messageqcpp::IOSocket* sock);
void doStartReadOnly(messageqcpp::IOSocket* sock);
/* SessionManager interface */
SessionManagerServer sm;
@ -193,6 +194,9 @@ class MasterDBRMNode
void doGetSystemCatalog(messageqcpp::ByteStream& msg, ThreadParams* p);
void doSysCatVerID(messageqcpp::ByteStream& msg, ThreadParams* p);
void doNewTxnID(messageqcpp::ByteStream& msg, ThreadParams* p);
void doNewCpimportJob(ThreadParams* p);
void doFinishCpimportJob(messageqcpp::ByteStream& msg, ThreadParams* p);
void doForceClearAllCpimportJobs(messageqcpp::IOSocket* sock);
void doCommitted(messageqcpp::ByteStream& msg, ThreadParams* p);
void doRolledBack(messageqcpp::ByteStream& msg, ThreadParams* p);
void doGetTxnID(messageqcpp::ByteStream& msg, ThreadParams* p);
@ -247,11 +251,14 @@ class MasterDBRMNode
boost::mutex mutex2; // protects params and the hand-off TODO: simplify
boost::mutex slaveLock; // syncs communication with the slaves
boost::mutex serverLock; // kludge to synchronize reloading
std::mutex cpimportMutex;
std::condition_variable cpimportJobsCond;
int runners, NumWorkers;
ThreadParams* params;
volatile bool die, halting;
bool reloadCmd;
mutable bool readOnly;
mutable bool waitToFinishJobs{false};
struct timespec MSG_TIMEOUT;
};

View File

@ -34,6 +34,7 @@
#include <string>
#include <stdexcept>
#include <limits>
#include <unordered_set>
using namespace std;
#include <boost/thread/mutex.hpp>
@ -256,6 +257,28 @@ const QueryContext SessionManagerServer::sysCatVerID()
return ret;
}
uint32_t SessionManagerServer::newCpimportJob()
{
std::scoped_lock lk(cpimportMutex);
activeCpimportJobs.insert(cpimportJobId);
auto ret = cpimportJobId;
++cpimportJobId;
return ret;
}
void SessionManagerServer::finishCpimortJob(uint32_t jobId)
{
std::scoped_lock lk(cpimportMutex);
if (activeCpimportJobs.count(jobId))
activeCpimportJobs.erase(jobId);
}
void SessionManagerServer::clearAllCpimportJobs()
{
std::scoped_lock lk(cpimportMutex);
activeCpimportJobs.clear();
}
const TxnID SessionManagerServer::newTxnID(const SID session, bool block, bool isDDL)
{
TxnID ret; // ctor must set valid = false
@ -383,6 +406,12 @@ void SessionManagerServer::clearSystemState(uint32_t state)
saveSystemState();
}
uint32_t SessionManagerServer::getCpimportJobsCount()
{
std::scoped_lock lk(cpimportMutex);
return activeCpimportJobs.size();
}
uint32_t SessionManagerServer::getTxnCount()
{
boost::mutex::scoped_lock lk(mutex);

View File

@ -27,8 +27,9 @@
#pragma once
#include <map>
#include <condition_variable>
#include <unordered_set>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
@ -39,7 +40,6 @@
#define EXPORT
namespace BRM
{
/** @brief Issues transaction IDs and keeps track of the current system-wide version ID.
@ -146,6 +146,15 @@ class SessionManagerServer
*/
EXPORT const TxnID newTxnID(const SID session, bool block = true, bool isDDL = false);
// Adds a new job into `active` cpimport job list and return id of that job.
EXPORT uint32_t newCpimportJob();
// Removes the given `jobId` from `active` cpimort job list.
EXPORT void finishCpimortJob(uint32_t jobId);
// Clears all active cpimport jobs.
EXPORT void clearAllCpimportJobs();
/** @brief Record that a transaction has been committed
*
* Record that a transaction has been committed.
@ -253,6 +262,9 @@ class SessionManagerServer
*/
EXPORT uint32_t getTxnCount();
EXPORT uint32_t getCpimportJobsCount();
private:
SessionManagerServer(const SessionManagerServer&);
SessionManagerServer& operator=(const SessionManagerServer&);
@ -277,9 +289,12 @@ class SessionManagerServer
boost::mutex mutex;
boost::condition_variable condvar; // used to synthesize a semaphore
uint32_t semValue;
std::unordered_set<uint32_t> activeCpimportJobs;
uint32_t cpimportJobId{0};
std::mutex cpimportMutex;
};
} // namespace BRM
#undef EXPORT

View File

@ -59,6 +59,7 @@ namespace
char* pgmName = 0;
const std::string IMPORT_PATH_CWD(".");
bool bDebug = false;
uint32_t cpimportJobId = 0;
//@bug 4643: cpimport job ended during setup w/o any err msg.
// Added a try/catch with logging to main() in case
@ -192,6 +193,7 @@ void printUsage()
//------------------------------------------------------------------------------
void handleSigTerm(int i)
{
BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId);
std::cout << "Received SIGTERM to terminate the process..." << std::endl;
BulkStatus::setJobStatus(EXIT_FAILURE);
}
@ -201,12 +203,32 @@ void handleSigTerm(int i)
//------------------------------------------------------------------------------
void handleControlC(int i)
{
BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId);
if (!BulkLoad::disableConsoleOutput())
std::cout << "Received Control-C to terminate the process..." << std::endl;
BulkStatus::setJobStatus(EXIT_FAILURE);
}
//------------------------------------------------------------------------------
// Signal handler to catch SIGTERM signal to terminate the process
//------------------------------------------------------------------------------
void handleSigSegv(int i)
{
BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId);
std::cout << "Received SIGSEGV to terminate the process..." << std::endl;
BulkStatus::setJobStatus(EXIT_FAILURE);
}
//------------------------------------------------------------------------------
// Signal handler to catch SIGTERM signal to terminate the process
//------------------------------------------------------------------------------
void handleSigAbrt(int i)
{
BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId);
std::cout << "Received SIGABRT to terminate the process..." << std::endl;
BulkStatus::setJobStatus(EXIT_FAILURE);
}
//------------------------------------------------------------------------------
// If error occurs during startup, this function is called to log the specified
@ -214,6 +236,7 @@ void handleControlC(int i)
//------------------------------------------------------------------------------
void startupError(const std::string& errMsg, bool showHint)
{
BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId);
// Log to console
if (!BulkLoad::disableConsoleOutput())
cerr << errMsg << endl;
@ -275,6 +298,17 @@ void setupSignalHandlers()
memset(&act, 0, sizeof(act));
act.sa_handler = handleSigTerm;
sigaction(SIGTERM, &act, 0);
// catch SIGSEGV signal to terminate the program
memset(&act, 0, sizeof(act));
act.sa_handler = handleSigSegv;
sigaction(SIGSEGV, &act, 0);
// catch SIGABRT signal to terminate the program
memset(&act, 0, sizeof(act));
act.sa_handler = handleSigAbrt;
sigaction(SIGABRT, &act, 0);
}
//------------------------------------------------------------------------------
@ -1270,6 +1304,16 @@ int main(int argc, char** argv)
new boost::thread(utils::MonitorProcMem(0, checkPct, SUBSYSTEM_ID_WE_BULK));
}
rc = BRMWrapper::getInstance()->newCpimportJob(cpimportJobId);
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "Error in creating new cpimport job on Controller node; " << ec.errorString(rc)
<< "; cpimport is terminating.";
startupError(oss.str(), false);
}
//--------------------------------------------------------------------------
// This is the real business
//--------------------------------------------------------------------------
@ -1320,6 +1364,7 @@ int main(int argc, char** argv)
rc = ERR_UNKNOWN;
}
BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId);
// Free up resources allocated by MY_INIT() above.
my_end(0);

View File

@ -60,8 +60,7 @@ BRMWrapper* volatile BRMWrapper::m_instance = NULL;
boost::thread_specific_ptr<int> BRMWrapper::m_ThreadDataPtr;
boost::mutex BRMWrapper::m_instanceCreateMutex;
bool BRMWrapper::m_useVb = true;
bool BRMWrapper::m_useVb = true;
OID BRMWrapper::m_curVBOid = INVALID_NUM;
IDBDataFile* BRMWrapper::m_curVBFile = NULL;
boost::mutex vbFileLock;
@ -744,6 +743,16 @@ int BRMWrapper::copyVBBlock(IDBDataFile* pSourceFile, IDBDataFile* pTargetFile,
return NO_ERROR;
}
uint8_t BRMWrapper::newCpimportJob(uint32_t &jobId)
{
return blockRsltnMgrPtr->newCpimportJob(jobId);
}
void BRMWrapper::finishCpimportJob(uint32_t jobId)
{
blockRsltnMgrPtr->finishCpimportJob(jobId);
}
int BRMWrapper::commit(const VER_t transID)
{
int rc = blockRsltnMgrPtr->vbCommit(transID);

View File

@ -380,6 +380,8 @@ class BRMWrapper : public WEObj
* @brief Commit the transaction
*/
EXPORT int commit(const BRM::VER_t transID);
EXPORT uint8_t newCpimportJob(uint32_t &jobId);
EXPORT void finishCpimportJob(uint32_t jobId);
/**
* @brief Copy blocks between write engine and version buffer