You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
Revert "MCOL-5555 Add support for startreadonly
command."
This reverts commit 441cd9d34f
.
This commit is contained in:
committed by
Leonid Fedorov
parent
041eb2ec8a
commit
6315546557
@ -532,11 +532,6 @@ const uint8_t BULK_UPDATE_DBROOT = 100;
|
|||||||
const uint8_t GET_SYSTEM_CATALOG = 101;
|
const uint8_t GET_SYSTEM_CATALOG = 101;
|
||||||
const uint8_t BULK_WRITE_VB_ENTRY = 102;
|
const uint8_t BULK_WRITE_VB_ENTRY = 102;
|
||||||
|
|
||||||
const uint8_t NEW_CPIMPORT_JOB = 103;
|
|
||||||
const uint8_t FINISH_CPIMPORT_JOB = 104;
|
|
||||||
const uint8_t START_READONLY = 105;
|
|
||||||
const uint8_t FORCE_CLEAR_CPIMPORT_JOBS = 106;
|
|
||||||
|
|
||||||
/* Error codes returned by the DBRM functions. */
|
/* Error codes returned by the DBRM functions. */
|
||||||
/// The operation was successful
|
/// The operation was successful
|
||||||
const int8_t ERR_OK = 0;
|
const int8_t ERR_OK = 0;
|
||||||
|
@ -2733,25 +2733,6 @@ int DBRM::setReadOnly(bool b) DBRM_THROW
|
|||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
int DBRM::startReadOnly() DBRM_THROW
|
|
||||||
{
|
|
||||||
ByteStream command, response;
|
|
||||||
uint8_t err;
|
|
||||||
|
|
||||||
command << START_READONLY;
|
|
||||||
err = send_recv(command, response);
|
|
||||||
|
|
||||||
if (err != ERR_OK)
|
|
||||||
return err;
|
|
||||||
|
|
||||||
if (response.length() != 1)
|
|
||||||
return ERR_NETWORK;
|
|
||||||
|
|
||||||
response >> err;
|
|
||||||
CHECK_EMPTY(response);
|
|
||||||
return err;
|
|
||||||
}
|
|
||||||
|
|
||||||
int DBRM::isReadWrite() throw()
|
int DBRM::isReadWrite() throw()
|
||||||
{
|
{
|
||||||
#ifdef BRM_INFO
|
#ifdef BRM_INFO
|
||||||
@ -3025,73 +3006,6 @@ const QueryContext DBRM::sysCatVerID()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint8_t DBRM::newCpimportJob(uint32_t& jobId)
|
|
||||||
{
|
|
||||||
ByteStream command, response;
|
|
||||||
uint8_t err;
|
|
||||||
command << NEW_CPIMPORT_JOB;
|
|
||||||
err = send_recv(command, response);
|
|
||||||
|
|
||||||
if (err != ERR_OK)
|
|
||||||
{
|
|
||||||
log("DBRM: SessionManager::newCpimportJob(): network error");
|
|
||||||
return err;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (response.length() != 5)
|
|
||||||
{
|
|
||||||
log("DBRM: SessionManager::newCpimportJob(): bad response");
|
|
||||||
return ERR_READONLY;
|
|
||||||
}
|
|
||||||
|
|
||||||
response >> err;
|
|
||||||
response >> jobId;
|
|
||||||
return ERR_OK;
|
|
||||||
}
|
|
||||||
|
|
||||||
void DBRM::finishCpimportJob(uint32_t jobId)
|
|
||||||
{
|
|
||||||
ByteStream command, response;
|
|
||||||
uint8_t err;
|
|
||||||
|
|
||||||
command << FINISH_CPIMPORT_JOB << (uint32_t)jobId;
|
|
||||||
err = send_recv(command, response);
|
|
||||||
|
|
||||||
if (err != ERR_OK)
|
|
||||||
log("DBRM: error: SessionManager::finishCpimportJob() failed");
|
|
||||||
else if (response.length() != 1)
|
|
||||||
log("DBRM: error: SessionManager::finishCpimportJob() failed (bad response)", logging::LOG_TYPE_ERROR);
|
|
||||||
|
|
||||||
response >> err;
|
|
||||||
|
|
||||||
if (err != ERR_OK)
|
|
||||||
log("DBRM: error: SessionManager::finishCpimportJob() failed (valid error code)",
|
|
||||||
logging::LOG_TYPE_ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
int DBRM::forceClearCpimportJobs() DBRM_THROW
|
|
||||||
{
|
|
||||||
ByteStream command, response;
|
|
||||||
uint8_t err;
|
|
||||||
|
|
||||||
command << FORCE_CLEAR_CPIMPORT_JOBS;
|
|
||||||
err = send_recv(command, response);
|
|
||||||
|
|
||||||
if (err != ERR_OK)
|
|
||||||
log("DBRM: error: SessionManager::forceClearAllCpimportJobs()) failed");
|
|
||||||
else if (response.length() != 1)
|
|
||||||
log("DBRM: error: SessionManager::forceClearAllCpimportJobs() failed (bad response)",
|
|
||||||
logging::LOG_TYPE_ERROR);
|
|
||||||
|
|
||||||
response >> err;
|
|
||||||
|
|
||||||
if (err != ERR_OK)
|
|
||||||
log("DBRM: error: SessionManager::forceClearAllCpimportJobs() failed (valid error code)",
|
|
||||||
logging::LOG_TYPE_ERROR);
|
|
||||||
|
|
||||||
return err;
|
|
||||||
}
|
|
||||||
|
|
||||||
const TxnID DBRM::newTxnID(const SessionManagerServer::SID session, bool block, bool isDDL)
|
const TxnID DBRM::newTxnID(const SessionManagerServer::SID session, bool block, bool isDDL)
|
||||||
{
|
{
|
||||||
#ifdef BRM_INFO
|
#ifdef BRM_INFO
|
||||||
@ -4544,12 +4458,11 @@ void DBRM::deleteAISequence(uint32_t OID)
|
|||||||
void DBRM::addToLBIDList(uint32_t sessionID, vector<LBID_t>& lbidList)
|
void DBRM::addToLBIDList(uint32_t sessionID, vector<LBID_t>& lbidList)
|
||||||
{
|
{
|
||||||
boost::shared_ptr<execplan::CalpontSystemCatalog> systemCatalogPtr =
|
boost::shared_ptr<execplan::CalpontSystemCatalog> systemCatalogPtr =
|
||||||
execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
|
execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
|
||||||
|
|
||||||
std::unordered_map<
|
std::unordered_map<execplan::CalpontSystemCatalog::OID,
|
||||||
execplan::CalpontSystemCatalog::OID,
|
std::unordered_map<execplan::CalpontSystemCatalog::OID,
|
||||||
std::unordered_map<execplan::CalpontSystemCatalog::OID, std::vector<struct BRM::EMEntry>>>
|
std::vector<struct BRM::EMEntry>>> extentMap;
|
||||||
extentMap;
|
|
||||||
|
|
||||||
int err = 0;
|
int err = 0;
|
||||||
|
|
||||||
@ -4574,15 +4487,18 @@ void DBRM::addToLBIDList(uint32_t sessionID, vector<LBID_t>& lbidList)
|
|||||||
throw runtime_error(os.str());
|
throw runtime_error(os.str());
|
||||||
}
|
}
|
||||||
|
|
||||||
execplan::CalpontSystemCatalog::OID tableOid = systemCatalogPtr->isAUXColumnOID(oid);
|
execplan::CalpontSystemCatalog::OID tableOid =
|
||||||
|
systemCatalogPtr->isAUXColumnOID(oid);
|
||||||
|
|
||||||
if (tableOid >= 3000)
|
if (tableOid >= 3000)
|
||||||
{
|
{
|
||||||
if (tableOidSet.find(tableOid) == tableOidSet.end())
|
if (tableOidSet.find(tableOid) == tableOidSet.end())
|
||||||
{
|
{
|
||||||
tableOidSet.insert(tableOid);
|
tableOidSet.insert(tableOid);
|
||||||
execplan::CalpontSystemCatalog::TableName tableName = systemCatalogPtr->tableName(tableOid);
|
execplan::CalpontSystemCatalog::TableName tableName =
|
||||||
execplan::CalpontSystemCatalog::RIDList tableColRidList = systemCatalogPtr->columnRIDs(tableName);
|
systemCatalogPtr->tableName(tableOid);
|
||||||
|
execplan::CalpontSystemCatalog::RIDList tableColRidList =
|
||||||
|
systemCatalogPtr->columnRIDs(tableName);
|
||||||
|
|
||||||
for (unsigned j = 0; j < tableColRidList.size(); j++)
|
for (unsigned j = 0; j < tableColRidList.size(); j++)
|
||||||
{
|
{
|
||||||
|
@ -753,8 +753,6 @@ class DBRM
|
|||||||
/* SessionManager interface */
|
/* SessionManager interface */
|
||||||
EXPORT const QueryContext verID();
|
EXPORT const QueryContext verID();
|
||||||
EXPORT const QueryContext sysCatVerID();
|
EXPORT const QueryContext sysCatVerID();
|
||||||
EXPORT uint8_t newCpimportJob(uint32_t &jobId);
|
|
||||||
EXPORT void finishCpimportJob(uint32_t jobId);
|
|
||||||
EXPORT const TxnID newTxnID(const SessionManagerServer::SID session, bool block, bool isDDL = false);
|
EXPORT const TxnID newTxnID(const SessionManagerServer::SID session, bool block, bool isDDL = false);
|
||||||
EXPORT void committed(BRM::TxnID& txnid);
|
EXPORT void committed(BRM::TxnID& txnid);
|
||||||
EXPORT void rolledback(BRM::TxnID& txnid);
|
EXPORT void rolledback(BRM::TxnID& txnid);
|
||||||
@ -828,8 +826,6 @@ class DBRM
|
|||||||
EXPORT int resume() DBRM_THROW;
|
EXPORT int resume() DBRM_THROW;
|
||||||
EXPORT int forceReload() DBRM_THROW;
|
EXPORT int forceReload() DBRM_THROW;
|
||||||
EXPORT int setReadOnly(bool b) DBRM_THROW;
|
EXPORT int setReadOnly(bool b) DBRM_THROW;
|
||||||
EXPORT int startReadOnly() DBRM_THROW;
|
|
||||||
EXPORT int forceClearCpimportJobs() DBRM_THROW;
|
|
||||||
EXPORT int isReadWrite() throw();
|
EXPORT int isReadWrite() throw();
|
||||||
|
|
||||||
EXPORT bool isEMEmpty() throw();
|
EXPORT bool isEMEmpty() throw();
|
||||||
|
@ -39,10 +39,7 @@ bool vflg;
|
|||||||
|
|
||||||
void usage(char* c)
|
void usage(char* c)
|
||||||
{
|
{
|
||||||
cerr << "Usage: " << c
|
cerr << "Usage: " << c << " [-vh] status | halt | resume | readonly | readwrite | reload" << endl;
|
||||||
<< " [-vh] status | halt | resume | readonly | readwrite | reload | startreadonly | "
|
|
||||||
"forceclearcpimportjobs"
|
|
||||||
<< endl;
|
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -118,20 +115,6 @@ void do_status()
|
|||||||
errMsg(err);
|
errMsg(err);
|
||||||
}
|
}
|
||||||
|
|
||||||
void start_readonly()
|
|
||||||
{
|
|
||||||
int err;
|
|
||||||
err = dbrm.startReadOnly();
|
|
||||||
errMsg(err);
|
|
||||||
}
|
|
||||||
|
|
||||||
void force_clear_cpimport_jobs()
|
|
||||||
{
|
|
||||||
int err;
|
|
||||||
err = dbrm.forceClearCpimportJobs();
|
|
||||||
errMsg(err);
|
|
||||||
}
|
|
||||||
|
|
||||||
void do_sysstatus()
|
void do_sysstatus()
|
||||||
{
|
{
|
||||||
int err;
|
int err;
|
||||||
@ -189,10 +172,6 @@ int main(int argc, char** argv)
|
|||||||
set_readonly(false);
|
set_readonly(false);
|
||||||
else if (cmd == "reload")
|
else if (cmd == "reload")
|
||||||
do_reload();
|
do_reload();
|
||||||
else if (cmd == "startreadonly")
|
|
||||||
start_readonly();
|
|
||||||
else if (cmd == "forceclearcpimportjobs")
|
|
||||||
force_clear_cpimport_jobs();
|
|
||||||
else if (cmd == "sysstatus")
|
else if (cmd == "sysstatus")
|
||||||
do_sysstatus();
|
do_sysstatus();
|
||||||
else
|
else
|
||||||
|
@ -24,7 +24,6 @@
|
|||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <thread>
|
|
||||||
|
|
||||||
#include "sessionmanager.h"
|
#include "sessionmanager.h"
|
||||||
#include "socketclosed.h"
|
#include "socketclosed.h"
|
||||||
@ -478,17 +477,6 @@ void MasterDBRMNode::msgProcessor()
|
|||||||
case GET_UNCOMMITTED_LBIDS: doGetUncommittedLbids(msg, p); continue;
|
case GET_UNCOMMITTED_LBIDS: doGetUncommittedLbids(msg, p); continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (cmd)
|
|
||||||
{
|
|
||||||
case NEW_CPIMPORT_JOB: doNewCpimportJob(p); continue;
|
|
||||||
|
|
||||||
case FINISH_CPIMPORT_JOB: doFinishCpimportJob(msg, p); continue;
|
|
||||||
|
|
||||||
case START_READONLY: doStartReadOnly(p->sock); continue;
|
|
||||||
|
|
||||||
case FORCE_CLEAR_CPIMPORT_JOBS: doForceClearAllCpimportJobs(p->sock); continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Process TableLock calls */
|
/* Process TableLock calls */
|
||||||
switch (cmd)
|
switch (cmd)
|
||||||
{
|
{
|
||||||
@ -1161,66 +1149,11 @@ void MasterDBRMNode::doResume(messageqcpp::IOSocket* sock)
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
void MasterDBRMNode::doForceClearAllCpimportJobs(messageqcpp::IOSocket* sock)
|
|
||||||
{
|
|
||||||
#ifdef BMR_VERBOSE
|
|
||||||
std::cout << "doForceClearCpimprtJobs" << std::endl;
|
|
||||||
#endif
|
|
||||||
ByteStream reply;
|
|
||||||
std::unique_lock lk(cpimportMutex);
|
|
||||||
sm.clearAllCpimportJobs();
|
|
||||||
|
|
||||||
reply << (uint8_t)ERR_OK;
|
|
||||||
try
|
|
||||||
{
|
|
||||||
sock->write(reply);
|
|
||||||
}
|
|
||||||
catch (exception&)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void MasterDBRMNode::doStartReadOnly(messageqcpp::IOSocket* sock)
|
|
||||||
{
|
|
||||||
#ifdef BRM_VERBOSE
|
|
||||||
std::cout << "doStartReadonly" << std::endl;
|
|
||||||
#endif
|
|
||||||
ByteStream reply;
|
|
||||||
|
|
||||||
// Spawn a new thread and detach it, we cannot block `msgProcessor`.
|
|
||||||
std::thread readonlyAdjuster(
|
|
||||||
[this]()
|
|
||||||
{
|
|
||||||
std::unique_lock lk(cpimportMutex);
|
|
||||||
if (sm.getCpimportJobsCount() != 0)
|
|
||||||
{
|
|
||||||
waitToFinishJobs = true;
|
|
||||||
// Wait until all cpimort jobs are done.
|
|
||||||
cpimportJobsCond.wait(lk, [this]() { return sm.getCpimportJobsCount() == 0; });
|
|
||||||
setReadOnly(true);
|
|
||||||
waitToFinishJobs = false;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
setReadOnly(true);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
readonlyAdjuster.detach();
|
|
||||||
|
|
||||||
reply << (uint8_t)ERR_OK;
|
|
||||||
try
|
|
||||||
{
|
|
||||||
sock->write(reply);
|
|
||||||
}
|
|
||||||
catch (exception&)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void MasterDBRMNode::doSetReadOnly(messageqcpp::IOSocket* sock, bool b)
|
void MasterDBRMNode::doSetReadOnly(messageqcpp::IOSocket* sock, bool b)
|
||||||
{
|
{
|
||||||
ByteStream reply;
|
ByteStream reply;
|
||||||
|
|
||||||
setReadOnly(b);
|
setReadOnly(b);
|
||||||
reply << (uint8_t)ERR_OK;
|
reply << (uint8_t)ERR_OK;
|
||||||
|
|
||||||
@ -1425,103 +1358,6 @@ void MasterDBRMNode::doSysCatVerID(ByteStream& msg, ThreadParams* p)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void MasterDBRMNode::doNewCpimportJob(ThreadParams* p)
|
|
||||||
{
|
|
||||||
#ifdef BRM_VERBOSE
|
|
||||||
std::cerr << "doNewCpimportJob" << std::endl;
|
|
||||||
#endif
|
|
||||||
ByteStream reply;
|
|
||||||
uint32_t jobId;
|
|
||||||
|
|
||||||
std::unique_lock lk(cpimportMutex);
|
|
||||||
// That means that controller node is waiting untill all active cpimport jobs are done.
|
|
||||||
// We cannot block `msgProcessor` and cannot create a new job, so exit with `readonly` code.
|
|
||||||
if (waitToFinishJobs)
|
|
||||||
{
|
|
||||||
reply << (uint8_t)ERR_READONLY;
|
|
||||||
try
|
|
||||||
{
|
|
||||||
p->sock->write(reply);
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
jobId = sm.newCpimportJob();
|
|
||||||
}
|
|
||||||
catch (exception&)
|
|
||||||
{
|
|
||||||
reply.reset();
|
|
||||||
reply << (uint8_t)ERR_FAILURE;
|
|
||||||
try
|
|
||||||
{
|
|
||||||
p->sock->write(reply);
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
reply << (uint8_t)ERR_OK;
|
|
||||||
reply << (uint32_t)jobId;
|
|
||||||
try
|
|
||||||
{
|
|
||||||
p->sock->write(reply);
|
|
||||||
}
|
|
||||||
catch (exception&)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void MasterDBRMNode::doFinishCpimportJob(ByteStream& msg, ThreadParams* p)
|
|
||||||
{
|
|
||||||
#ifdef BRM_VERBOSE
|
|
||||||
std::cout << "doFinishCpimportJob" << std::endl;
|
|
||||||
#endif
|
|
||||||
ByteStream reply;
|
|
||||||
uint32_t cpimportJob;
|
|
||||||
uint8_t cmd;
|
|
||||||
std::unique_lock lk(cpimportMutex);
|
|
||||||
|
|
||||||
msg >> cmd;
|
|
||||||
msg >> cpimportJob;
|
|
||||||
try
|
|
||||||
{
|
|
||||||
sm.finishCpimortJob(cpimportJob);
|
|
||||||
}
|
|
||||||
catch (exception&)
|
|
||||||
{
|
|
||||||
reply << (uint8_t)ERR_FAILURE;
|
|
||||||
try
|
|
||||||
{
|
|
||||||
p->sock->write(reply);
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
reply << (uint8_t)ERR_OK;
|
|
||||||
try
|
|
||||||
{
|
|
||||||
p->sock->write(reply);
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
if (waitToFinishJobs && sm.getCpimportJobsCount() == 0)
|
|
||||||
cpimportJobsCond.notify_one();
|
|
||||||
}
|
|
||||||
|
|
||||||
void MasterDBRMNode::doNewTxnID(ByteStream& msg, ThreadParams* p)
|
void MasterDBRMNode::doNewTxnID(ByteStream& msg, ThreadParams* p)
|
||||||
{
|
{
|
||||||
ByteStream reply;
|
ByteStream reply;
|
||||||
|
@ -186,7 +186,6 @@ class MasterDBRMNode
|
|||||||
void doReload(messageqcpp::IOSocket* sock);
|
void doReload(messageqcpp::IOSocket* sock);
|
||||||
void doSetReadOnly(messageqcpp::IOSocket* sock, bool b);
|
void doSetReadOnly(messageqcpp::IOSocket* sock, bool b);
|
||||||
void doGetReadOnly(messageqcpp::IOSocket* sock);
|
void doGetReadOnly(messageqcpp::IOSocket* sock);
|
||||||
void doStartReadOnly(messageqcpp::IOSocket* sock);
|
|
||||||
|
|
||||||
/* SessionManager interface */
|
/* SessionManager interface */
|
||||||
SessionManagerServer sm;
|
SessionManagerServer sm;
|
||||||
@ -194,9 +193,6 @@ class MasterDBRMNode
|
|||||||
void doGetSystemCatalog(messageqcpp::ByteStream& msg, ThreadParams* p);
|
void doGetSystemCatalog(messageqcpp::ByteStream& msg, ThreadParams* p);
|
||||||
void doSysCatVerID(messageqcpp::ByteStream& msg, ThreadParams* p);
|
void doSysCatVerID(messageqcpp::ByteStream& msg, ThreadParams* p);
|
||||||
void doNewTxnID(messageqcpp::ByteStream& msg, ThreadParams* p);
|
void doNewTxnID(messageqcpp::ByteStream& msg, ThreadParams* p);
|
||||||
void doNewCpimportJob(ThreadParams* p);
|
|
||||||
void doFinishCpimportJob(messageqcpp::ByteStream& msg, ThreadParams* p);
|
|
||||||
void doForceClearAllCpimportJobs(messageqcpp::IOSocket* sock);
|
|
||||||
void doCommitted(messageqcpp::ByteStream& msg, ThreadParams* p);
|
void doCommitted(messageqcpp::ByteStream& msg, ThreadParams* p);
|
||||||
void doRolledBack(messageqcpp::ByteStream& msg, ThreadParams* p);
|
void doRolledBack(messageqcpp::ByteStream& msg, ThreadParams* p);
|
||||||
void doGetTxnID(messageqcpp::ByteStream& msg, ThreadParams* p);
|
void doGetTxnID(messageqcpp::ByteStream& msg, ThreadParams* p);
|
||||||
@ -251,14 +247,11 @@ class MasterDBRMNode
|
|||||||
boost::mutex mutex2; // protects params and the hand-off TODO: simplify
|
boost::mutex mutex2; // protects params and the hand-off TODO: simplify
|
||||||
boost::mutex slaveLock; // syncs communication with the slaves
|
boost::mutex slaveLock; // syncs communication with the slaves
|
||||||
boost::mutex serverLock; // kludge to synchronize reloading
|
boost::mutex serverLock; // kludge to synchronize reloading
|
||||||
std::mutex cpimportMutex;
|
|
||||||
std::condition_variable cpimportJobsCond;
|
|
||||||
int runners, NumWorkers;
|
int runners, NumWorkers;
|
||||||
ThreadParams* params;
|
ThreadParams* params;
|
||||||
volatile bool die, halting;
|
volatile bool die, halting;
|
||||||
bool reloadCmd;
|
bool reloadCmd;
|
||||||
mutable bool readOnly;
|
mutable bool readOnly;
|
||||||
mutable bool waitToFinishJobs{false};
|
|
||||||
struct timespec MSG_TIMEOUT;
|
struct timespec MSG_TIMEOUT;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -34,7 +34,6 @@
|
|||||||
#include <string>
|
#include <string>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
#include <limits>
|
#include <limits>
|
||||||
#include <unordered_set>
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
#include <boost/thread/mutex.hpp>
|
#include <boost/thread/mutex.hpp>
|
||||||
@ -257,28 +256,6 @@ const QueryContext SessionManagerServer::sysCatVerID()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t SessionManagerServer::newCpimportJob()
|
|
||||||
{
|
|
||||||
std::scoped_lock lk(cpimportMutex);
|
|
||||||
activeCpimportJobs.insert(cpimportJobId);
|
|
||||||
auto ret = cpimportJobId;
|
|
||||||
++cpimportJobId;
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
void SessionManagerServer::finishCpimortJob(uint32_t jobId)
|
|
||||||
{
|
|
||||||
std::scoped_lock lk(cpimportMutex);
|
|
||||||
if (activeCpimportJobs.count(jobId))
|
|
||||||
activeCpimportJobs.erase(jobId);
|
|
||||||
}
|
|
||||||
|
|
||||||
void SessionManagerServer::clearAllCpimportJobs()
|
|
||||||
{
|
|
||||||
std::scoped_lock lk(cpimportMutex);
|
|
||||||
activeCpimportJobs.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
const TxnID SessionManagerServer::newTxnID(const SID session, bool block, bool isDDL)
|
const TxnID SessionManagerServer::newTxnID(const SID session, bool block, bool isDDL)
|
||||||
{
|
{
|
||||||
TxnID ret; // ctor must set valid = false
|
TxnID ret; // ctor must set valid = false
|
||||||
@ -406,12 +383,6 @@ void SessionManagerServer::clearSystemState(uint32_t state)
|
|||||||
saveSystemState();
|
saveSystemState();
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t SessionManagerServer::getCpimportJobsCount()
|
|
||||||
{
|
|
||||||
std::scoped_lock lk(cpimportMutex);
|
|
||||||
return activeCpimportJobs.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
uint32_t SessionManagerServer::getTxnCount()
|
uint32_t SessionManagerServer::getTxnCount()
|
||||||
{
|
{
|
||||||
boost::mutex::scoped_lock lk(mutex);
|
boost::mutex::scoped_lock lk(mutex);
|
||||||
|
@ -27,9 +27,8 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <condition_variable>
|
|
||||||
|
|
||||||
#include <unordered_set>
|
|
||||||
#include <boost/thread/mutex.hpp>
|
#include <boost/thread/mutex.hpp>
|
||||||
#include <boost/thread/condition_variable.hpp>
|
#include <boost/thread/condition_variable.hpp>
|
||||||
|
|
||||||
@ -40,6 +39,7 @@
|
|||||||
|
|
||||||
#define EXPORT
|
#define EXPORT
|
||||||
|
|
||||||
|
|
||||||
namespace BRM
|
namespace BRM
|
||||||
{
|
{
|
||||||
/** @brief Issues transaction IDs and keeps track of the current system-wide version ID.
|
/** @brief Issues transaction IDs and keeps track of the current system-wide version ID.
|
||||||
@ -146,15 +146,6 @@ class SessionManagerServer
|
|||||||
*/
|
*/
|
||||||
EXPORT const TxnID newTxnID(const SID session, bool block = true, bool isDDL = false);
|
EXPORT const TxnID newTxnID(const SID session, bool block = true, bool isDDL = false);
|
||||||
|
|
||||||
// Adds a new job into `active` cpimport job list and return id of that job.
|
|
||||||
EXPORT uint32_t newCpimportJob();
|
|
||||||
|
|
||||||
// Removes the given `jobId` from `active` cpimort job list.
|
|
||||||
EXPORT void finishCpimortJob(uint32_t jobId);
|
|
||||||
|
|
||||||
// Clears all active cpimport jobs.
|
|
||||||
EXPORT void clearAllCpimportJobs();
|
|
||||||
|
|
||||||
/** @brief Record that a transaction has been committed
|
/** @brief Record that a transaction has been committed
|
||||||
*
|
*
|
||||||
* Record that a transaction has been committed.
|
* Record that a transaction has been committed.
|
||||||
@ -262,9 +253,6 @@ class SessionManagerServer
|
|||||||
*/
|
*/
|
||||||
EXPORT uint32_t getTxnCount();
|
EXPORT uint32_t getTxnCount();
|
||||||
|
|
||||||
|
|
||||||
EXPORT uint32_t getCpimportJobsCount();
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
SessionManagerServer(const SessionManagerServer&);
|
SessionManagerServer(const SessionManagerServer&);
|
||||||
SessionManagerServer& operator=(const SessionManagerServer&);
|
SessionManagerServer& operator=(const SessionManagerServer&);
|
||||||
@ -289,12 +277,9 @@ class SessionManagerServer
|
|||||||
boost::mutex mutex;
|
boost::mutex mutex;
|
||||||
boost::condition_variable condvar; // used to synthesize a semaphore
|
boost::condition_variable condvar; // used to synthesize a semaphore
|
||||||
uint32_t semValue;
|
uint32_t semValue;
|
||||||
|
|
||||||
std::unordered_set<uint32_t> activeCpimportJobs;
|
|
||||||
uint32_t cpimportJobId{0};
|
|
||||||
std::mutex cpimportMutex;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace BRM
|
} // namespace BRM
|
||||||
|
|
||||||
|
|
||||||
#undef EXPORT
|
#undef EXPORT
|
||||||
|
@ -59,7 +59,6 @@ namespace
|
|||||||
char* pgmName = 0;
|
char* pgmName = 0;
|
||||||
const std::string IMPORT_PATH_CWD(".");
|
const std::string IMPORT_PATH_CWD(".");
|
||||||
bool bDebug = false;
|
bool bDebug = false;
|
||||||
uint32_t cpimportJobId = 0;
|
|
||||||
|
|
||||||
//@bug 4643: cpimport job ended during setup w/o any err msg.
|
//@bug 4643: cpimport job ended during setup w/o any err msg.
|
||||||
// Added a try/catch with logging to main() in case
|
// Added a try/catch with logging to main() in case
|
||||||
@ -193,7 +192,6 @@ void printUsage()
|
|||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
void handleSigTerm(int i)
|
void handleSigTerm(int i)
|
||||||
{
|
{
|
||||||
BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId);
|
|
||||||
std::cout << "Received SIGTERM to terminate the process..." << std::endl;
|
std::cout << "Received SIGTERM to terminate the process..." << std::endl;
|
||||||
BulkStatus::setJobStatus(EXIT_FAILURE);
|
BulkStatus::setJobStatus(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
@ -203,32 +201,12 @@ void handleSigTerm(int i)
|
|||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
void handleControlC(int i)
|
void handleControlC(int i)
|
||||||
{
|
{
|
||||||
BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId);
|
|
||||||
if (!BulkLoad::disableConsoleOutput())
|
if (!BulkLoad::disableConsoleOutput())
|
||||||
std::cout << "Received Control-C to terminate the process..." << std::endl;
|
std::cout << "Received Control-C to terminate the process..." << std::endl;
|
||||||
|
|
||||||
BulkStatus::setJobStatus(EXIT_FAILURE);
|
BulkStatus::setJobStatus(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
|
||||||
// Signal handler to catch SIGTERM signal to terminate the process
|
|
||||||
//------------------------------------------------------------------------------
|
|
||||||
void handleSigSegv(int i)
|
|
||||||
{
|
|
||||||
BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId);
|
|
||||||
std::cout << "Received SIGSEGV to terminate the process..." << std::endl;
|
|
||||||
BulkStatus::setJobStatus(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
|
||||||
// Signal handler to catch SIGTERM signal to terminate the process
|
|
||||||
//------------------------------------------------------------------------------
|
|
||||||
void handleSigAbrt(int i)
|
|
||||||
{
|
|
||||||
BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId);
|
|
||||||
std::cout << "Received SIGABRT to terminate the process..." << std::endl;
|
|
||||||
BulkStatus::setJobStatus(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
// If error occurs during startup, this function is called to log the specified
|
// If error occurs during startup, this function is called to log the specified
|
||||||
@ -236,7 +214,6 @@ void handleSigAbrt(int i)
|
|||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
void startupError(const std::string& errMsg, bool showHint)
|
void startupError(const std::string& errMsg, bool showHint)
|
||||||
{
|
{
|
||||||
BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId);
|
|
||||||
// Log to console
|
// Log to console
|
||||||
if (!BulkLoad::disableConsoleOutput())
|
if (!BulkLoad::disableConsoleOutput())
|
||||||
cerr << errMsg << endl;
|
cerr << errMsg << endl;
|
||||||
@ -298,17 +275,6 @@ void setupSignalHandlers()
|
|||||||
memset(&act, 0, sizeof(act));
|
memset(&act, 0, sizeof(act));
|
||||||
act.sa_handler = handleSigTerm;
|
act.sa_handler = handleSigTerm;
|
||||||
sigaction(SIGTERM, &act, 0);
|
sigaction(SIGTERM, &act, 0);
|
||||||
|
|
||||||
// catch SIGSEGV signal to terminate the program
|
|
||||||
memset(&act, 0, sizeof(act));
|
|
||||||
act.sa_handler = handleSigSegv;
|
|
||||||
sigaction(SIGSEGV, &act, 0);
|
|
||||||
|
|
||||||
// catch SIGABRT signal to terminate the program
|
|
||||||
memset(&act, 0, sizeof(act));
|
|
||||||
act.sa_handler = handleSigAbrt;
|
|
||||||
sigaction(SIGABRT, &act, 0);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
@ -1304,16 +1270,6 @@ int main(int argc, char** argv)
|
|||||||
new boost::thread(utils::MonitorProcMem(0, checkPct, SUBSYSTEM_ID_WE_BULK));
|
new boost::thread(utils::MonitorProcMem(0, checkPct, SUBSYSTEM_ID_WE_BULK));
|
||||||
}
|
}
|
||||||
|
|
||||||
rc = BRMWrapper::getInstance()->newCpimportJob(cpimportJobId);
|
|
||||||
if (rc != NO_ERROR)
|
|
||||||
{
|
|
||||||
WErrorCodes ec;
|
|
||||||
std::ostringstream oss;
|
|
||||||
oss << "Error in creating new cpimport job on Controller node; " << ec.errorString(rc)
|
|
||||||
<< "; cpimport is terminating.";
|
|
||||||
startupError(oss.str(), false);
|
|
||||||
}
|
|
||||||
|
|
||||||
//--------------------------------------------------------------------------
|
//--------------------------------------------------------------------------
|
||||||
// This is the real business
|
// This is the real business
|
||||||
//--------------------------------------------------------------------------
|
//--------------------------------------------------------------------------
|
||||||
@ -1364,7 +1320,6 @@ int main(int argc, char** argv)
|
|||||||
rc = ERR_UNKNOWN;
|
rc = ERR_UNKNOWN;
|
||||||
}
|
}
|
||||||
|
|
||||||
BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId);
|
|
||||||
// Free up resources allocated by MY_INIT() above.
|
// Free up resources allocated by MY_INIT() above.
|
||||||
my_end(0);
|
my_end(0);
|
||||||
|
|
||||||
|
@ -60,7 +60,8 @@ BRMWrapper* volatile BRMWrapper::m_instance = NULL;
|
|||||||
boost::thread_specific_ptr<int> BRMWrapper::m_ThreadDataPtr;
|
boost::thread_specific_ptr<int> BRMWrapper::m_ThreadDataPtr;
|
||||||
boost::mutex BRMWrapper::m_instanceCreateMutex;
|
boost::mutex BRMWrapper::m_instanceCreateMutex;
|
||||||
|
|
||||||
bool BRMWrapper::m_useVb = true;
|
|
||||||
|
bool BRMWrapper::m_useVb = true;
|
||||||
OID BRMWrapper::m_curVBOid = INVALID_NUM;
|
OID BRMWrapper::m_curVBOid = INVALID_NUM;
|
||||||
IDBDataFile* BRMWrapper::m_curVBFile = NULL;
|
IDBDataFile* BRMWrapper::m_curVBFile = NULL;
|
||||||
boost::mutex vbFileLock;
|
boost::mutex vbFileLock;
|
||||||
@ -743,16 +744,6 @@ int BRMWrapper::copyVBBlock(IDBDataFile* pSourceFile, IDBDataFile* pTargetFile,
|
|||||||
return NO_ERROR;
|
return NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint8_t BRMWrapper::newCpimportJob(uint32_t &jobId)
|
|
||||||
{
|
|
||||||
return blockRsltnMgrPtr->newCpimportJob(jobId);
|
|
||||||
}
|
|
||||||
|
|
||||||
void BRMWrapper::finishCpimportJob(uint32_t jobId)
|
|
||||||
{
|
|
||||||
blockRsltnMgrPtr->finishCpimportJob(jobId);
|
|
||||||
}
|
|
||||||
|
|
||||||
int BRMWrapper::commit(const VER_t transID)
|
int BRMWrapper::commit(const VER_t transID)
|
||||||
{
|
{
|
||||||
int rc = blockRsltnMgrPtr->vbCommit(transID);
|
int rc = blockRsltnMgrPtr->vbCommit(transID);
|
||||||
|
@ -380,8 +380,6 @@ class BRMWrapper : public WEObj
|
|||||||
* @brief Commit the transaction
|
* @brief Commit the transaction
|
||||||
*/
|
*/
|
||||||
EXPORT int commit(const BRM::VER_t transID);
|
EXPORT int commit(const BRM::VER_t transID);
|
||||||
EXPORT uint8_t newCpimportJob(uint32_t &jobId);
|
|
||||||
EXPORT void finishCpimportJob(uint32_t jobId);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Copy blocks between write engine and version buffer
|
* @brief Copy blocks between write engine and version buffer
|
||||||
@ -468,7 +466,7 @@ class BRMWrapper : public WEObj
|
|||||||
static boost::thread_specific_ptr<int> m_ThreadDataPtr;
|
static boost::thread_specific_ptr<int> m_ThreadDataPtr;
|
||||||
static boost::mutex m_instanceCreateMutex;
|
static boost::mutex m_instanceCreateMutex;
|
||||||
|
|
||||||
EXPORT static bool m_useVb;
|
EXPORT static bool m_useVb;
|
||||||
|
|
||||||
static OID m_curVBOid;
|
static OID m_curVBOid;
|
||||||
static IDBDataFile* m_curVBFile;
|
static IDBDataFile* m_curVBFile;
|
||||||
|
Reference in New Issue
Block a user