You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
MCOL-5555 Add support for startreadonly
command.
This patch adds support for `startreadonly` command which waits until all active cpimport jobs are done and then puts controller node to readonly mode.
This commit is contained in:
committed by
Leonid Fedorov
parent
2b20e1de25
commit
3fcb9b66f5
@ -532,6 +532,11 @@ const uint8_t BULK_UPDATE_DBROOT = 100;
|
|||||||
const uint8_t GET_SYSTEM_CATALOG = 101;
|
const uint8_t GET_SYSTEM_CATALOG = 101;
|
||||||
const uint8_t BULK_WRITE_VB_ENTRY = 102;
|
const uint8_t BULK_WRITE_VB_ENTRY = 102;
|
||||||
|
|
||||||
|
const uint8_t NEW_CPIMPORT_JOB = 103;
|
||||||
|
const uint8_t FINISH_CPIMPORT_JOB = 104;
|
||||||
|
const uint8_t START_READONLY = 105;
|
||||||
|
const uint8_t FORCE_CLEAR_CPIMPORT_JOBS = 106;
|
||||||
|
|
||||||
/* Error codes returned by the DBRM functions. */
|
/* Error codes returned by the DBRM functions. */
|
||||||
/// The operation was successful
|
/// The operation was successful
|
||||||
const int8_t ERR_OK = 0;
|
const int8_t ERR_OK = 0;
|
||||||
|
@ -2733,6 +2733,25 @@ int DBRM::setReadOnly(bool b) DBRM_THROW
|
|||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int DBRM::startReadOnly() DBRM_THROW
|
||||||
|
{
|
||||||
|
ByteStream command, response;
|
||||||
|
uint8_t err;
|
||||||
|
|
||||||
|
command << START_READONLY;
|
||||||
|
err = send_recv(command, response);
|
||||||
|
|
||||||
|
if (err != ERR_OK)
|
||||||
|
return err;
|
||||||
|
|
||||||
|
if (response.length() != 1)
|
||||||
|
return ERR_NETWORK;
|
||||||
|
|
||||||
|
response >> err;
|
||||||
|
CHECK_EMPTY(response);
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
int DBRM::isReadWrite() throw()
|
int DBRM::isReadWrite() throw()
|
||||||
{
|
{
|
||||||
#ifdef BRM_INFO
|
#ifdef BRM_INFO
|
||||||
@ -3006,6 +3025,73 @@ const QueryContext DBRM::sysCatVerID()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint8_t DBRM::newCpimportJob(uint32_t& jobId)
|
||||||
|
{
|
||||||
|
ByteStream command, response;
|
||||||
|
uint8_t err;
|
||||||
|
command << NEW_CPIMPORT_JOB;
|
||||||
|
err = send_recv(command, response);
|
||||||
|
|
||||||
|
if (err != ERR_OK)
|
||||||
|
{
|
||||||
|
log("DBRM: SessionManager::newCpimportJob(): network error");
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (response.length() != 5)
|
||||||
|
{
|
||||||
|
log("DBRM: SessionManager::newCpimportJob(): bad response");
|
||||||
|
return ERR_READONLY;
|
||||||
|
}
|
||||||
|
|
||||||
|
response >> err;
|
||||||
|
response >> jobId;
|
||||||
|
return ERR_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
void DBRM::finishCpimportJob(uint32_t jobId)
|
||||||
|
{
|
||||||
|
ByteStream command, response;
|
||||||
|
uint8_t err;
|
||||||
|
|
||||||
|
command << FINISH_CPIMPORT_JOB << (uint32_t)jobId;
|
||||||
|
err = send_recv(command, response);
|
||||||
|
|
||||||
|
if (err != ERR_OK)
|
||||||
|
log("DBRM: error: SessionManager::finishCpimportJob() failed");
|
||||||
|
else if (response.length() != 1)
|
||||||
|
log("DBRM: error: SessionManager::finishCpimportJob() failed (bad response)", logging::LOG_TYPE_ERROR);
|
||||||
|
|
||||||
|
response >> err;
|
||||||
|
|
||||||
|
if (err != ERR_OK)
|
||||||
|
log("DBRM: error: SessionManager::finishCpimportJob() failed (valid error code)",
|
||||||
|
logging::LOG_TYPE_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
int DBRM::forceClearCpimportJobs() DBRM_THROW
|
||||||
|
{
|
||||||
|
ByteStream command, response;
|
||||||
|
uint8_t err;
|
||||||
|
|
||||||
|
command << FORCE_CLEAR_CPIMPORT_JOBS;
|
||||||
|
err = send_recv(command, response);
|
||||||
|
|
||||||
|
if (err != ERR_OK)
|
||||||
|
log("DBRM: error: SessionManager::forceClearAllCpimportJobs()) failed");
|
||||||
|
else if (response.length() != 1)
|
||||||
|
log("DBRM: error: SessionManager::forceClearAllCpimportJobs() failed (bad response)",
|
||||||
|
logging::LOG_TYPE_ERROR);
|
||||||
|
|
||||||
|
response >> err;
|
||||||
|
|
||||||
|
if (err != ERR_OK)
|
||||||
|
log("DBRM: error: SessionManager::forceClearAllCpimportJobs() failed (valid error code)",
|
||||||
|
logging::LOG_TYPE_ERROR);
|
||||||
|
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
const TxnID DBRM::newTxnID(const SessionManagerServer::SID session, bool block, bool isDDL)
|
const TxnID DBRM::newTxnID(const SessionManagerServer::SID session, bool block, bool isDDL)
|
||||||
{
|
{
|
||||||
#ifdef BRM_INFO
|
#ifdef BRM_INFO
|
||||||
@ -4458,11 +4544,12 @@ void DBRM::deleteAISequence(uint32_t OID)
|
|||||||
void DBRM::addToLBIDList(uint32_t sessionID, vector<LBID_t>& lbidList)
|
void DBRM::addToLBIDList(uint32_t sessionID, vector<LBID_t>& lbidList)
|
||||||
{
|
{
|
||||||
boost::shared_ptr<execplan::CalpontSystemCatalog> systemCatalogPtr =
|
boost::shared_ptr<execplan::CalpontSystemCatalog> systemCatalogPtr =
|
||||||
execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
|
execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
|
||||||
|
|
||||||
std::unordered_map<execplan::CalpontSystemCatalog::OID,
|
std::unordered_map<
|
||||||
std::unordered_map<execplan::CalpontSystemCatalog::OID,
|
execplan::CalpontSystemCatalog::OID,
|
||||||
std::vector<struct BRM::EMEntry>>> extentMap;
|
std::unordered_map<execplan::CalpontSystemCatalog::OID, std::vector<struct BRM::EMEntry>>>
|
||||||
|
extentMap;
|
||||||
|
|
||||||
int err = 0;
|
int err = 0;
|
||||||
|
|
||||||
@ -4487,18 +4574,15 @@ void DBRM::addToLBIDList(uint32_t sessionID, vector<LBID_t>& lbidList)
|
|||||||
throw runtime_error(os.str());
|
throw runtime_error(os.str());
|
||||||
}
|
}
|
||||||
|
|
||||||
execplan::CalpontSystemCatalog::OID tableOid =
|
execplan::CalpontSystemCatalog::OID tableOid = systemCatalogPtr->isAUXColumnOID(oid);
|
||||||
systemCatalogPtr->isAUXColumnOID(oid);
|
|
||||||
|
|
||||||
if (tableOid >= 3000)
|
if (tableOid >= 3000)
|
||||||
{
|
{
|
||||||
if (tableOidSet.find(tableOid) == tableOidSet.end())
|
if (tableOidSet.find(tableOid) == tableOidSet.end())
|
||||||
{
|
{
|
||||||
tableOidSet.insert(tableOid);
|
tableOidSet.insert(tableOid);
|
||||||
execplan::CalpontSystemCatalog::TableName tableName =
|
execplan::CalpontSystemCatalog::TableName tableName = systemCatalogPtr->tableName(tableOid);
|
||||||
systemCatalogPtr->tableName(tableOid);
|
execplan::CalpontSystemCatalog::RIDList tableColRidList = systemCatalogPtr->columnRIDs(tableName);
|
||||||
execplan::CalpontSystemCatalog::RIDList tableColRidList =
|
|
||||||
systemCatalogPtr->columnRIDs(tableName);
|
|
||||||
|
|
||||||
for (unsigned j = 0; j < tableColRidList.size(); j++)
|
for (unsigned j = 0; j < tableColRidList.size(); j++)
|
||||||
{
|
{
|
||||||
|
@ -753,6 +753,8 @@ class DBRM
|
|||||||
/* SessionManager interface */
|
/* SessionManager interface */
|
||||||
EXPORT const QueryContext verID();
|
EXPORT const QueryContext verID();
|
||||||
EXPORT const QueryContext sysCatVerID();
|
EXPORT const QueryContext sysCatVerID();
|
||||||
|
EXPORT uint8_t newCpimportJob(uint32_t &jobId);
|
||||||
|
EXPORT void finishCpimportJob(uint32_t jobId);
|
||||||
EXPORT const TxnID newTxnID(const SessionManagerServer::SID session, bool block, bool isDDL = false);
|
EXPORT const TxnID newTxnID(const SessionManagerServer::SID session, bool block, bool isDDL = false);
|
||||||
EXPORT void committed(BRM::TxnID& txnid);
|
EXPORT void committed(BRM::TxnID& txnid);
|
||||||
EXPORT void rolledback(BRM::TxnID& txnid);
|
EXPORT void rolledback(BRM::TxnID& txnid);
|
||||||
@ -826,6 +828,8 @@ class DBRM
|
|||||||
EXPORT int resume() DBRM_THROW;
|
EXPORT int resume() DBRM_THROW;
|
||||||
EXPORT int forceReload() DBRM_THROW;
|
EXPORT int forceReload() DBRM_THROW;
|
||||||
EXPORT int setReadOnly(bool b) DBRM_THROW;
|
EXPORT int setReadOnly(bool b) DBRM_THROW;
|
||||||
|
EXPORT int startReadOnly() DBRM_THROW;
|
||||||
|
EXPORT int forceClearCpimportJobs() DBRM_THROW;
|
||||||
EXPORT int isReadWrite() throw();
|
EXPORT int isReadWrite() throw();
|
||||||
|
|
||||||
EXPORT bool isEMEmpty() throw();
|
EXPORT bool isEMEmpty() throw();
|
||||||
|
@ -39,7 +39,10 @@ bool vflg;
|
|||||||
|
|
||||||
void usage(char* c)
|
void usage(char* c)
|
||||||
{
|
{
|
||||||
cerr << "Usage: " << c << " [-vh] status | halt | resume | readonly | readwrite | reload" << endl;
|
cerr << "Usage: " << c
|
||||||
|
<< " [-vh] status | halt | resume | readonly | readwrite | reload | startreadonly | "
|
||||||
|
"forceclearcpimportjobs"
|
||||||
|
<< endl;
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -115,6 +118,20 @@ void do_status()
|
|||||||
errMsg(err);
|
errMsg(err);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void start_readonly()
|
||||||
|
{
|
||||||
|
int err;
|
||||||
|
err = dbrm.startReadOnly();
|
||||||
|
errMsg(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
void force_clear_cpimport_jobs()
|
||||||
|
{
|
||||||
|
int err;
|
||||||
|
err = dbrm.forceClearCpimportJobs();
|
||||||
|
errMsg(err);
|
||||||
|
}
|
||||||
|
|
||||||
void do_sysstatus()
|
void do_sysstatus()
|
||||||
{
|
{
|
||||||
int err;
|
int err;
|
||||||
@ -172,6 +189,10 @@ int main(int argc, char** argv)
|
|||||||
set_readonly(false);
|
set_readonly(false);
|
||||||
else if (cmd == "reload")
|
else if (cmd == "reload")
|
||||||
do_reload();
|
do_reload();
|
||||||
|
else if (cmd == "startreadonly")
|
||||||
|
start_readonly();
|
||||||
|
else if (cmd == "forceclearcpimportjobs")
|
||||||
|
force_clear_cpimport_jobs();
|
||||||
else if (cmd == "sysstatus")
|
else if (cmd == "sysstatus")
|
||||||
do_sysstatus();
|
do_sysstatus();
|
||||||
else
|
else
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
#include "sessionmanager.h"
|
#include "sessionmanager.h"
|
||||||
#include "socketclosed.h"
|
#include "socketclosed.h"
|
||||||
@ -477,6 +478,17 @@ void MasterDBRMNode::msgProcessor()
|
|||||||
case GET_UNCOMMITTED_LBIDS: doGetUncommittedLbids(msg, p); continue;
|
case GET_UNCOMMITTED_LBIDS: doGetUncommittedLbids(msg, p); continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch (cmd)
|
||||||
|
{
|
||||||
|
case NEW_CPIMPORT_JOB: doNewCpimportJob(p); continue;
|
||||||
|
|
||||||
|
case FINISH_CPIMPORT_JOB: doFinishCpimportJob(msg, p); continue;
|
||||||
|
|
||||||
|
case START_READONLY: doStartReadOnly(p->sock); continue;
|
||||||
|
|
||||||
|
case FORCE_CLEAR_CPIMPORT_JOBS: doForceClearAllCpimportJobs(p->sock); continue;
|
||||||
|
}
|
||||||
|
|
||||||
/* Process TableLock calls */
|
/* Process TableLock calls */
|
||||||
switch (cmd)
|
switch (cmd)
|
||||||
{
|
{
|
||||||
@ -1149,11 +1161,66 @@ void MasterDBRMNode::doResume(messageqcpp::IOSocket* sock)
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
void MasterDBRMNode::doForceClearAllCpimportJobs(messageqcpp::IOSocket* sock)
|
||||||
|
{
|
||||||
|
#ifdef BMR_VERBOSE
|
||||||
|
std::cout << "doForceClearCpimprtJobs" << std::endl;
|
||||||
|
#endif
|
||||||
|
ByteStream reply;
|
||||||
|
std::unique_lock lk(cpimportMutex);
|
||||||
|
sm.clearAllCpimportJobs();
|
||||||
|
|
||||||
|
reply << (uint8_t)ERR_OK;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
sock->write(reply);
|
||||||
|
}
|
||||||
|
catch (exception&)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void MasterDBRMNode::doStartReadOnly(messageqcpp::IOSocket* sock)
|
||||||
|
{
|
||||||
|
#ifdef BRM_VERBOSE
|
||||||
|
std::cout << "doStartReadonly" << std::endl;
|
||||||
|
#endif
|
||||||
|
ByteStream reply;
|
||||||
|
|
||||||
|
// Spawn a new thread and detach it, we cannot block `msgProcessor`.
|
||||||
|
std::thread readonlyAdjuster(
|
||||||
|
[this]()
|
||||||
|
{
|
||||||
|
std::unique_lock lk(cpimportMutex);
|
||||||
|
if (sm.getCpimportJobsCount() != 0)
|
||||||
|
{
|
||||||
|
waitToFinishJobs = true;
|
||||||
|
// Wait until all cpimort jobs are done.
|
||||||
|
cpimportJobsCond.wait(lk, [this]() { return sm.getCpimportJobsCount() == 0; });
|
||||||
|
setReadOnly(true);
|
||||||
|
waitToFinishJobs = false;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
setReadOnly(true);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
readonlyAdjuster.detach();
|
||||||
|
|
||||||
|
reply << (uint8_t)ERR_OK;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
sock->write(reply);
|
||||||
|
}
|
||||||
|
catch (exception&)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void MasterDBRMNode::doSetReadOnly(messageqcpp::IOSocket* sock, bool b)
|
void MasterDBRMNode::doSetReadOnly(messageqcpp::IOSocket* sock, bool b)
|
||||||
{
|
{
|
||||||
ByteStream reply;
|
ByteStream reply;
|
||||||
|
|
||||||
setReadOnly(b);
|
setReadOnly(b);
|
||||||
reply << (uint8_t)ERR_OK;
|
reply << (uint8_t)ERR_OK;
|
||||||
|
|
||||||
@ -1358,6 +1425,103 @@ void MasterDBRMNode::doSysCatVerID(ByteStream& msg, ThreadParams* p)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void MasterDBRMNode::doNewCpimportJob(ThreadParams* p)
|
||||||
|
{
|
||||||
|
#ifdef BRM_VERBOSE
|
||||||
|
std::cerr << "doNewCpimportJob" << std::endl;
|
||||||
|
#endif
|
||||||
|
ByteStream reply;
|
||||||
|
uint32_t jobId;
|
||||||
|
|
||||||
|
std::unique_lock lk(cpimportMutex);
|
||||||
|
// That means that controller node is waiting untill all active cpimport jobs are done.
|
||||||
|
// We cannot block `msgProcessor` and cannot create a new job, so exit with `readonly` code.
|
||||||
|
if (waitToFinishJobs)
|
||||||
|
{
|
||||||
|
reply << (uint8_t)ERR_READONLY;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
p->sock->write(reply);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
jobId = sm.newCpimportJob();
|
||||||
|
}
|
||||||
|
catch (exception&)
|
||||||
|
{
|
||||||
|
reply.reset();
|
||||||
|
reply << (uint8_t)ERR_FAILURE;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
p->sock->write(reply);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
reply << (uint8_t)ERR_OK;
|
||||||
|
reply << (uint32_t)jobId;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
p->sock->write(reply);
|
||||||
|
}
|
||||||
|
catch (exception&)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void MasterDBRMNode::doFinishCpimportJob(ByteStream& msg, ThreadParams* p)
|
||||||
|
{
|
||||||
|
#ifdef BRM_VERBOSE
|
||||||
|
std::cout << "doFinishCpimportJob" << std::endl;
|
||||||
|
#endif
|
||||||
|
ByteStream reply;
|
||||||
|
uint32_t cpimportJob;
|
||||||
|
uint8_t cmd;
|
||||||
|
std::unique_lock lk(cpimportMutex);
|
||||||
|
|
||||||
|
msg >> cmd;
|
||||||
|
msg >> cpimportJob;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
sm.finishCpimortJob(cpimportJob);
|
||||||
|
}
|
||||||
|
catch (exception&)
|
||||||
|
{
|
||||||
|
reply << (uint8_t)ERR_FAILURE;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
p->sock->write(reply);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
reply << (uint8_t)ERR_OK;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
p->sock->write(reply);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
if (waitToFinishJobs && sm.getCpimportJobsCount() == 0)
|
||||||
|
cpimportJobsCond.notify_one();
|
||||||
|
}
|
||||||
|
|
||||||
void MasterDBRMNode::doNewTxnID(ByteStream& msg, ThreadParams* p)
|
void MasterDBRMNode::doNewTxnID(ByteStream& msg, ThreadParams* p)
|
||||||
{
|
{
|
||||||
ByteStream reply;
|
ByteStream reply;
|
||||||
|
@ -186,6 +186,7 @@ class MasterDBRMNode
|
|||||||
void doReload(messageqcpp::IOSocket* sock);
|
void doReload(messageqcpp::IOSocket* sock);
|
||||||
void doSetReadOnly(messageqcpp::IOSocket* sock, bool b);
|
void doSetReadOnly(messageqcpp::IOSocket* sock, bool b);
|
||||||
void doGetReadOnly(messageqcpp::IOSocket* sock);
|
void doGetReadOnly(messageqcpp::IOSocket* sock);
|
||||||
|
void doStartReadOnly(messageqcpp::IOSocket* sock);
|
||||||
|
|
||||||
/* SessionManager interface */
|
/* SessionManager interface */
|
||||||
SessionManagerServer sm;
|
SessionManagerServer sm;
|
||||||
@ -193,6 +194,9 @@ class MasterDBRMNode
|
|||||||
void doGetSystemCatalog(messageqcpp::ByteStream& msg, ThreadParams* p);
|
void doGetSystemCatalog(messageqcpp::ByteStream& msg, ThreadParams* p);
|
||||||
void doSysCatVerID(messageqcpp::ByteStream& msg, ThreadParams* p);
|
void doSysCatVerID(messageqcpp::ByteStream& msg, ThreadParams* p);
|
||||||
void doNewTxnID(messageqcpp::ByteStream& msg, ThreadParams* p);
|
void doNewTxnID(messageqcpp::ByteStream& msg, ThreadParams* p);
|
||||||
|
void doNewCpimportJob(ThreadParams* p);
|
||||||
|
void doFinishCpimportJob(messageqcpp::ByteStream& msg, ThreadParams* p);
|
||||||
|
void doForceClearAllCpimportJobs(messageqcpp::IOSocket* sock);
|
||||||
void doCommitted(messageqcpp::ByteStream& msg, ThreadParams* p);
|
void doCommitted(messageqcpp::ByteStream& msg, ThreadParams* p);
|
||||||
void doRolledBack(messageqcpp::ByteStream& msg, ThreadParams* p);
|
void doRolledBack(messageqcpp::ByteStream& msg, ThreadParams* p);
|
||||||
void doGetTxnID(messageqcpp::ByteStream& msg, ThreadParams* p);
|
void doGetTxnID(messageqcpp::ByteStream& msg, ThreadParams* p);
|
||||||
@ -247,11 +251,14 @@ class MasterDBRMNode
|
|||||||
boost::mutex mutex2; // protects params and the hand-off TODO: simplify
|
boost::mutex mutex2; // protects params and the hand-off TODO: simplify
|
||||||
boost::mutex slaveLock; // syncs communication with the slaves
|
boost::mutex slaveLock; // syncs communication with the slaves
|
||||||
boost::mutex serverLock; // kludge to synchronize reloading
|
boost::mutex serverLock; // kludge to synchronize reloading
|
||||||
|
std::mutex cpimportMutex;
|
||||||
|
std::condition_variable cpimportJobsCond;
|
||||||
int runners, NumWorkers;
|
int runners, NumWorkers;
|
||||||
ThreadParams* params;
|
ThreadParams* params;
|
||||||
volatile bool die, halting;
|
volatile bool die, halting;
|
||||||
bool reloadCmd;
|
bool reloadCmd;
|
||||||
mutable bool readOnly;
|
mutable bool readOnly;
|
||||||
|
mutable bool waitToFinishJobs{false};
|
||||||
struct timespec MSG_TIMEOUT;
|
struct timespec MSG_TIMEOUT;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -34,6 +34,7 @@
|
|||||||
#include <string>
|
#include <string>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
#include <limits>
|
#include <limits>
|
||||||
|
#include <unordered_set>
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
#include <boost/thread/mutex.hpp>
|
#include <boost/thread/mutex.hpp>
|
||||||
@ -256,6 +257,28 @@ const QueryContext SessionManagerServer::sysCatVerID()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint32_t SessionManagerServer::newCpimportJob()
|
||||||
|
{
|
||||||
|
std::scoped_lock lk(cpimportMutex);
|
||||||
|
activeCpimportJobs.insert(cpimportJobId);
|
||||||
|
auto ret = cpimportJobId;
|
||||||
|
++cpimportJobId;
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
void SessionManagerServer::finishCpimortJob(uint32_t jobId)
|
||||||
|
{
|
||||||
|
std::scoped_lock lk(cpimportMutex);
|
||||||
|
if (activeCpimportJobs.count(jobId))
|
||||||
|
activeCpimportJobs.erase(jobId);
|
||||||
|
}
|
||||||
|
|
||||||
|
void SessionManagerServer::clearAllCpimportJobs()
|
||||||
|
{
|
||||||
|
std::scoped_lock lk(cpimportMutex);
|
||||||
|
activeCpimportJobs.clear();
|
||||||
|
}
|
||||||
|
|
||||||
const TxnID SessionManagerServer::newTxnID(const SID session, bool block, bool isDDL)
|
const TxnID SessionManagerServer::newTxnID(const SID session, bool block, bool isDDL)
|
||||||
{
|
{
|
||||||
TxnID ret; // ctor must set valid = false
|
TxnID ret; // ctor must set valid = false
|
||||||
@ -383,6 +406,12 @@ void SessionManagerServer::clearSystemState(uint32_t state)
|
|||||||
saveSystemState();
|
saveSystemState();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint32_t SessionManagerServer::getCpimportJobsCount()
|
||||||
|
{
|
||||||
|
std::scoped_lock lk(cpimportMutex);
|
||||||
|
return activeCpimportJobs.size();
|
||||||
|
}
|
||||||
|
|
||||||
uint32_t SessionManagerServer::getTxnCount()
|
uint32_t SessionManagerServer::getTxnCount()
|
||||||
{
|
{
|
||||||
boost::mutex::scoped_lock lk(mutex);
|
boost::mutex::scoped_lock lk(mutex);
|
||||||
|
@ -27,8 +27,9 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <map>
|
#include <map>
|
||||||
|
#include <condition_variable>
|
||||||
|
|
||||||
|
#include <unordered_set>
|
||||||
#include <boost/thread/mutex.hpp>
|
#include <boost/thread/mutex.hpp>
|
||||||
#include <boost/thread/condition_variable.hpp>
|
#include <boost/thread/condition_variable.hpp>
|
||||||
|
|
||||||
@ -39,7 +40,6 @@
|
|||||||
|
|
||||||
#define EXPORT
|
#define EXPORT
|
||||||
|
|
||||||
|
|
||||||
namespace BRM
|
namespace BRM
|
||||||
{
|
{
|
||||||
/** @brief Issues transaction IDs and keeps track of the current system-wide version ID.
|
/** @brief Issues transaction IDs and keeps track of the current system-wide version ID.
|
||||||
@ -146,6 +146,15 @@ class SessionManagerServer
|
|||||||
*/
|
*/
|
||||||
EXPORT const TxnID newTxnID(const SID session, bool block = true, bool isDDL = false);
|
EXPORT const TxnID newTxnID(const SID session, bool block = true, bool isDDL = false);
|
||||||
|
|
||||||
|
// Adds a new job into `active` cpimport job list and return id of that job.
|
||||||
|
EXPORT uint32_t newCpimportJob();
|
||||||
|
|
||||||
|
// Removes the given `jobId` from `active` cpimort job list.
|
||||||
|
EXPORT void finishCpimortJob(uint32_t jobId);
|
||||||
|
|
||||||
|
// Clears all active cpimport jobs.
|
||||||
|
EXPORT void clearAllCpimportJobs();
|
||||||
|
|
||||||
/** @brief Record that a transaction has been committed
|
/** @brief Record that a transaction has been committed
|
||||||
*
|
*
|
||||||
* Record that a transaction has been committed.
|
* Record that a transaction has been committed.
|
||||||
@ -253,6 +262,9 @@ class SessionManagerServer
|
|||||||
*/
|
*/
|
||||||
EXPORT uint32_t getTxnCount();
|
EXPORT uint32_t getTxnCount();
|
||||||
|
|
||||||
|
|
||||||
|
EXPORT uint32_t getCpimportJobsCount();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
SessionManagerServer(const SessionManagerServer&);
|
SessionManagerServer(const SessionManagerServer&);
|
||||||
SessionManagerServer& operator=(const SessionManagerServer&);
|
SessionManagerServer& operator=(const SessionManagerServer&);
|
||||||
@ -277,9 +289,12 @@ class SessionManagerServer
|
|||||||
boost::mutex mutex;
|
boost::mutex mutex;
|
||||||
boost::condition_variable condvar; // used to synthesize a semaphore
|
boost::condition_variable condvar; // used to synthesize a semaphore
|
||||||
uint32_t semValue;
|
uint32_t semValue;
|
||||||
|
|
||||||
|
std::unordered_set<uint32_t> activeCpimportJobs;
|
||||||
|
uint32_t cpimportJobId{0};
|
||||||
|
std::mutex cpimportMutex;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace BRM
|
} // namespace BRM
|
||||||
|
|
||||||
|
|
||||||
#undef EXPORT
|
#undef EXPORT
|
||||||
|
@ -59,6 +59,7 @@ namespace
|
|||||||
char* pgmName = 0;
|
char* pgmName = 0;
|
||||||
const std::string IMPORT_PATH_CWD(".");
|
const std::string IMPORT_PATH_CWD(".");
|
||||||
bool bDebug = false;
|
bool bDebug = false;
|
||||||
|
uint32_t cpimportJobId = 0;
|
||||||
|
|
||||||
//@bug 4643: cpimport job ended during setup w/o any err msg.
|
//@bug 4643: cpimport job ended during setup w/o any err msg.
|
||||||
// Added a try/catch with logging to main() in case
|
// Added a try/catch with logging to main() in case
|
||||||
@ -192,6 +193,7 @@ void printUsage()
|
|||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
void handleSigTerm(int i)
|
void handleSigTerm(int i)
|
||||||
{
|
{
|
||||||
|
BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId);
|
||||||
std::cout << "Received SIGTERM to terminate the process..." << std::endl;
|
std::cout << "Received SIGTERM to terminate the process..." << std::endl;
|
||||||
BulkStatus::setJobStatus(EXIT_FAILURE);
|
BulkStatus::setJobStatus(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
@ -201,12 +203,32 @@ void handleSigTerm(int i)
|
|||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
void handleControlC(int i)
|
void handleControlC(int i)
|
||||||
{
|
{
|
||||||
|
BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId);
|
||||||
if (!BulkLoad::disableConsoleOutput())
|
if (!BulkLoad::disableConsoleOutput())
|
||||||
std::cout << "Received Control-C to terminate the process..." << std::endl;
|
std::cout << "Received Control-C to terminate the process..." << std::endl;
|
||||||
|
|
||||||
BulkStatus::setJobStatus(EXIT_FAILURE);
|
BulkStatus::setJobStatus(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
// Signal handler to catch SIGTERM signal to terminate the process
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
void handleSigSegv(int i)
|
||||||
|
{
|
||||||
|
BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId);
|
||||||
|
std::cout << "Received SIGSEGV to terminate the process..." << std::endl;
|
||||||
|
BulkStatus::setJobStatus(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
// Signal handler to catch SIGTERM signal to terminate the process
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
void handleSigAbrt(int i)
|
||||||
|
{
|
||||||
|
BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId);
|
||||||
|
std::cout << "Received SIGABRT to terminate the process..." << std::endl;
|
||||||
|
BulkStatus::setJobStatus(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
// If error occurs during startup, this function is called to log the specified
|
// If error occurs during startup, this function is called to log the specified
|
||||||
@ -214,6 +236,7 @@ void handleControlC(int i)
|
|||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
void startupError(const std::string& errMsg, bool showHint)
|
void startupError(const std::string& errMsg, bool showHint)
|
||||||
{
|
{
|
||||||
|
BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId);
|
||||||
// Log to console
|
// Log to console
|
||||||
if (!BulkLoad::disableConsoleOutput())
|
if (!BulkLoad::disableConsoleOutput())
|
||||||
cerr << errMsg << endl;
|
cerr << errMsg << endl;
|
||||||
@ -275,6 +298,17 @@ void setupSignalHandlers()
|
|||||||
memset(&act, 0, sizeof(act));
|
memset(&act, 0, sizeof(act));
|
||||||
act.sa_handler = handleSigTerm;
|
act.sa_handler = handleSigTerm;
|
||||||
sigaction(SIGTERM, &act, 0);
|
sigaction(SIGTERM, &act, 0);
|
||||||
|
|
||||||
|
// catch SIGSEGV signal to terminate the program
|
||||||
|
memset(&act, 0, sizeof(act));
|
||||||
|
act.sa_handler = handleSigSegv;
|
||||||
|
sigaction(SIGSEGV, &act, 0);
|
||||||
|
|
||||||
|
// catch SIGABRT signal to terminate the program
|
||||||
|
memset(&act, 0, sizeof(act));
|
||||||
|
act.sa_handler = handleSigAbrt;
|
||||||
|
sigaction(SIGABRT, &act, 0);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
@ -1270,6 +1304,16 @@ int main(int argc, char** argv)
|
|||||||
new boost::thread(utils::MonitorProcMem(0, checkPct, SUBSYSTEM_ID_WE_BULK));
|
new boost::thread(utils::MonitorProcMem(0, checkPct, SUBSYSTEM_ID_WE_BULK));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rc = BRMWrapper::getInstance()->newCpimportJob(cpimportJobId);
|
||||||
|
if (rc != NO_ERROR)
|
||||||
|
{
|
||||||
|
WErrorCodes ec;
|
||||||
|
std::ostringstream oss;
|
||||||
|
oss << "Error in creating new cpimport job on Controller node; " << ec.errorString(rc)
|
||||||
|
<< "; cpimport is terminating.";
|
||||||
|
startupError(oss.str(), false);
|
||||||
|
}
|
||||||
|
|
||||||
//--------------------------------------------------------------------------
|
//--------------------------------------------------------------------------
|
||||||
// This is the real business
|
// This is the real business
|
||||||
//--------------------------------------------------------------------------
|
//--------------------------------------------------------------------------
|
||||||
@ -1320,6 +1364,7 @@ int main(int argc, char** argv)
|
|||||||
rc = ERR_UNKNOWN;
|
rc = ERR_UNKNOWN;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId);
|
||||||
// Free up resources allocated by MY_INIT() above.
|
// Free up resources allocated by MY_INIT() above.
|
||||||
my_end(0);
|
my_end(0);
|
||||||
|
|
||||||
|
@ -60,8 +60,7 @@ BRMWrapper* volatile BRMWrapper::m_instance = NULL;
|
|||||||
boost::thread_specific_ptr<int> BRMWrapper::m_ThreadDataPtr;
|
boost::thread_specific_ptr<int> BRMWrapper::m_ThreadDataPtr;
|
||||||
boost::mutex BRMWrapper::m_instanceCreateMutex;
|
boost::mutex BRMWrapper::m_instanceCreateMutex;
|
||||||
|
|
||||||
|
bool BRMWrapper::m_useVb = true;
|
||||||
bool BRMWrapper::m_useVb = true;
|
|
||||||
OID BRMWrapper::m_curVBOid = INVALID_NUM;
|
OID BRMWrapper::m_curVBOid = INVALID_NUM;
|
||||||
IDBDataFile* BRMWrapper::m_curVBFile = NULL;
|
IDBDataFile* BRMWrapper::m_curVBFile = NULL;
|
||||||
boost::mutex vbFileLock;
|
boost::mutex vbFileLock;
|
||||||
@ -744,6 +743,16 @@ int BRMWrapper::copyVBBlock(IDBDataFile* pSourceFile, IDBDataFile* pTargetFile,
|
|||||||
return NO_ERROR;
|
return NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint8_t BRMWrapper::newCpimportJob(uint32_t &jobId)
|
||||||
|
{
|
||||||
|
return blockRsltnMgrPtr->newCpimportJob(jobId);
|
||||||
|
}
|
||||||
|
|
||||||
|
void BRMWrapper::finishCpimportJob(uint32_t jobId)
|
||||||
|
{
|
||||||
|
blockRsltnMgrPtr->finishCpimportJob(jobId);
|
||||||
|
}
|
||||||
|
|
||||||
int BRMWrapper::commit(const VER_t transID)
|
int BRMWrapper::commit(const VER_t transID)
|
||||||
{
|
{
|
||||||
int rc = blockRsltnMgrPtr->vbCommit(transID);
|
int rc = blockRsltnMgrPtr->vbCommit(transID);
|
||||||
|
@ -380,6 +380,8 @@ class BRMWrapper : public WEObj
|
|||||||
* @brief Commit the transaction
|
* @brief Commit the transaction
|
||||||
*/
|
*/
|
||||||
EXPORT int commit(const BRM::VER_t transID);
|
EXPORT int commit(const BRM::VER_t transID);
|
||||||
|
EXPORT uint8_t newCpimportJob(uint32_t &jobId);
|
||||||
|
EXPORT void finishCpimportJob(uint32_t jobId);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Copy blocks between write engine and version buffer
|
* @brief Copy blocks between write engine and version buffer
|
||||||
@ -466,7 +468,7 @@ class BRMWrapper : public WEObj
|
|||||||
static boost::thread_specific_ptr<int> m_ThreadDataPtr;
|
static boost::thread_specific_ptr<int> m_ThreadDataPtr;
|
||||||
static boost::mutex m_instanceCreateMutex;
|
static boost::mutex m_instanceCreateMutex;
|
||||||
|
|
||||||
EXPORT static bool m_useVb;
|
EXPORT static bool m_useVb;
|
||||||
|
|
||||||
static OID m_curVBOid;
|
static OID m_curVBOid;
|
||||||
static IDBDataFile* m_curVBFile;
|
static IDBDataFile* m_curVBFile;
|
||||||
|
Reference in New Issue
Block a user