diff --git a/versioning/BRM/brmtypes.h b/versioning/BRM/brmtypes.h index b5472d711..594319b49 100644 --- a/versioning/BRM/brmtypes.h +++ b/versioning/BRM/brmtypes.h @@ -532,11 +532,6 @@ const uint8_t BULK_UPDATE_DBROOT = 100; const uint8_t GET_SYSTEM_CATALOG = 101; const uint8_t BULK_WRITE_VB_ENTRY = 102; -const uint8_t NEW_CPIMPORT_JOB = 103; -const uint8_t FINISH_CPIMPORT_JOB = 104; -const uint8_t START_READONLY = 105; -const uint8_t FORCE_CLEAR_CPIMPORT_JOBS = 106; - /* Error codes returned by the DBRM functions. */ /// The operation was successful const int8_t ERR_OK = 0; diff --git a/versioning/BRM/dbrm.cpp b/versioning/BRM/dbrm.cpp index 27e36067a..ae344602d 100644 --- a/versioning/BRM/dbrm.cpp +++ b/versioning/BRM/dbrm.cpp @@ -2733,25 +2733,6 @@ int DBRM::setReadOnly(bool b) DBRM_THROW return err; } -int DBRM::startReadOnly() DBRM_THROW -{ - ByteStream command, response; - uint8_t err; - - command << START_READONLY; - err = send_recv(command, response); - - if (err != ERR_OK) - return err; - - if (response.length() != 1) - return ERR_NETWORK; - - response >> err; - CHECK_EMPTY(response); - return err; -} - int DBRM::isReadWrite() throw() { #ifdef BRM_INFO @@ -3025,73 +3006,6 @@ const QueryContext DBRM::sysCatVerID() return ret; } -uint8_t DBRM::newCpimportJob(uint32_t& jobId) -{ - ByteStream command, response; - uint8_t err; - command << NEW_CPIMPORT_JOB; - err = send_recv(command, response); - - if (err != ERR_OK) - { - log("DBRM: SessionManager::newCpimportJob(): network error"); - return err; - } - - if (response.length() != 5) - { - log("DBRM: SessionManager::newCpimportJob(): bad response"); - return ERR_READONLY; - } - - response >> err; - response >> jobId; - return ERR_OK; -} - -void DBRM::finishCpimportJob(uint32_t jobId) -{ - ByteStream command, response; - uint8_t err; - - command << FINISH_CPIMPORT_JOB << (uint32_t)jobId; - err = send_recv(command, response); - - if (err != ERR_OK) - log("DBRM: error: SessionManager::finishCpimportJob() failed"); - else if (response.length() != 1) - log("DBRM: error: SessionManager::finishCpimportJob() failed (bad response)", logging::LOG_TYPE_ERROR); - - response >> err; - - if (err != ERR_OK) - log("DBRM: error: SessionManager::finishCpimportJob() failed (valid error code)", - logging::LOG_TYPE_ERROR); -} - -int DBRM::forceClearCpimportJobs() DBRM_THROW -{ - ByteStream command, response; - uint8_t err; - - command << FORCE_CLEAR_CPIMPORT_JOBS; - err = send_recv(command, response); - - if (err != ERR_OK) - log("DBRM: error: SessionManager::forceClearAllCpimportJobs()) failed"); - else if (response.length() != 1) - log("DBRM: error: SessionManager::forceClearAllCpimportJobs() failed (bad response)", - logging::LOG_TYPE_ERROR); - - response >> err; - - if (err != ERR_OK) - log("DBRM: error: SessionManager::forceClearAllCpimportJobs() failed (valid error code)", - logging::LOG_TYPE_ERROR); - - return err; -} - const TxnID DBRM::newTxnID(const SessionManagerServer::SID session, bool block, bool isDDL) { #ifdef BRM_INFO @@ -4544,12 +4458,11 @@ void DBRM::deleteAISequence(uint32_t OID) void DBRM::addToLBIDList(uint32_t sessionID, vector& lbidList) { boost::shared_ptr systemCatalogPtr = - execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); + execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); - std::unordered_map< - execplan::CalpontSystemCatalog::OID, - std::unordered_map>> - extentMap; + std::unordered_map>> extentMap; int err = 0; @@ -4574,15 +4487,18 @@ void DBRM::addToLBIDList(uint32_t sessionID, vector& lbidList) throw runtime_error(os.str()); } - execplan::CalpontSystemCatalog::OID tableOid = systemCatalogPtr->isAUXColumnOID(oid); + execplan::CalpontSystemCatalog::OID tableOid = + systemCatalogPtr->isAUXColumnOID(oid); if (tableOid >= 3000) { if (tableOidSet.find(tableOid) == tableOidSet.end()) { tableOidSet.insert(tableOid); - execplan::CalpontSystemCatalog::TableName tableName = systemCatalogPtr->tableName(tableOid); - execplan::CalpontSystemCatalog::RIDList tableColRidList = systemCatalogPtr->columnRIDs(tableName); + execplan::CalpontSystemCatalog::TableName tableName = + systemCatalogPtr->tableName(tableOid); + execplan::CalpontSystemCatalog::RIDList tableColRidList = + systemCatalogPtr->columnRIDs(tableName); for (unsigned j = 0; j < tableColRidList.size(); j++) { diff --git a/versioning/BRM/dbrm.h b/versioning/BRM/dbrm.h index eb5dc557e..6fa1836a6 100644 --- a/versioning/BRM/dbrm.h +++ b/versioning/BRM/dbrm.h @@ -753,8 +753,6 @@ class DBRM /* SessionManager interface */ EXPORT const QueryContext verID(); EXPORT const QueryContext sysCatVerID(); - EXPORT uint8_t newCpimportJob(uint32_t &jobId); - EXPORT void finishCpimportJob(uint32_t jobId); EXPORT const TxnID newTxnID(const SessionManagerServer::SID session, bool block, bool isDDL = false); EXPORT void committed(BRM::TxnID& txnid); EXPORT void rolledback(BRM::TxnID& txnid); @@ -828,8 +826,6 @@ class DBRM EXPORT int resume() DBRM_THROW; EXPORT int forceReload() DBRM_THROW; EXPORT int setReadOnly(bool b) DBRM_THROW; - EXPORT int startReadOnly() DBRM_THROW; - EXPORT int forceClearCpimportJobs() DBRM_THROW; EXPORT int isReadWrite() throw(); EXPORT bool isEMEmpty() throw(); diff --git a/versioning/BRM/dbrmctl.cpp b/versioning/BRM/dbrmctl.cpp index 312e6119c..68274c140 100644 --- a/versioning/BRM/dbrmctl.cpp +++ b/versioning/BRM/dbrmctl.cpp @@ -39,10 +39,7 @@ bool vflg; void usage(char* c) { - cerr << "Usage: " << c - << " [-vh] status | halt | resume | readonly | readwrite | reload | startreadonly | " - "forceclearcpimportjobs" - << endl; + cerr << "Usage: " << c << " [-vh] status | halt | resume | readonly | readwrite | reload" << endl; exit(1); } @@ -118,20 +115,6 @@ void do_status() errMsg(err); } -void start_readonly() -{ - int err; - err = dbrm.startReadOnly(); - errMsg(err); -} - -void force_clear_cpimport_jobs() -{ - int err; - err = dbrm.forceClearCpimportJobs(); - errMsg(err); -} - void do_sysstatus() { int err; @@ -189,10 +172,6 @@ int main(int argc, char** argv) set_readonly(false); else if (cmd == "reload") do_reload(); - else if (cmd == "startreadonly") - start_readonly(); - else if (cmd == "forceclearcpimportjobs") - force_clear_cpimport_jobs(); else if (cmd == "sysstatus") do_sysstatus(); else diff --git a/versioning/BRM/masterdbrmnode.cpp b/versioning/BRM/masterdbrmnode.cpp index 418a82c76..1c28e4b71 100644 --- a/versioning/BRM/masterdbrmnode.cpp +++ b/versioning/BRM/masterdbrmnode.cpp @@ -24,7 +24,6 @@ #include #include #include -#include #include "sessionmanager.h" #include "socketclosed.h" @@ -478,17 +477,6 @@ void MasterDBRMNode::msgProcessor() case GET_UNCOMMITTED_LBIDS: doGetUncommittedLbids(msg, p); continue; } - switch (cmd) - { - case NEW_CPIMPORT_JOB: doNewCpimportJob(p); continue; - - case FINISH_CPIMPORT_JOB: doFinishCpimportJob(msg, p); continue; - - case START_READONLY: doStartReadOnly(p->sock); continue; - - case FORCE_CLEAR_CPIMPORT_JOBS: doForceClearAllCpimportJobs(p->sock); continue; - } - /* Process TableLock calls */ switch (cmd) { @@ -1161,66 +1149,11 @@ void MasterDBRMNode::doResume(messageqcpp::IOSocket* sock) { } } -void MasterDBRMNode::doForceClearAllCpimportJobs(messageqcpp::IOSocket* sock) -{ -#ifdef BMR_VERBOSE - std::cout << "doForceClearCpimprtJobs" << std::endl; -#endif - ByteStream reply; - std::unique_lock lk(cpimportMutex); - sm.clearAllCpimportJobs(); - - reply << (uint8_t)ERR_OK; - try - { - sock->write(reply); - } - catch (exception&) - { - } -} - -void MasterDBRMNode::doStartReadOnly(messageqcpp::IOSocket* sock) -{ -#ifdef BRM_VERBOSE - std::cout << "doStartReadonly" << std::endl; -#endif - ByteStream reply; - - // Spawn a new thread and detach it, we cannot block `msgProcessor`. - std::thread readonlyAdjuster( - [this]() - { - std::unique_lock lk(cpimportMutex); - if (sm.getCpimportJobsCount() != 0) - { - waitToFinishJobs = true; - // Wait until all cpimort jobs are done. - cpimportJobsCond.wait(lk, [this]() { return sm.getCpimportJobsCount() == 0; }); - setReadOnly(true); - waitToFinishJobs = false; - } - else - { - setReadOnly(true); - } - }); - - readonlyAdjuster.detach(); - - reply << (uint8_t)ERR_OK; - try - { - sock->write(reply); - } - catch (exception&) - { - } -} void MasterDBRMNode::doSetReadOnly(messageqcpp::IOSocket* sock, bool b) { ByteStream reply; + setReadOnly(b); reply << (uint8_t)ERR_OK; @@ -1425,103 +1358,6 @@ void MasterDBRMNode::doSysCatVerID(ByteStream& msg, ThreadParams* p) } } -void MasterDBRMNode::doNewCpimportJob(ThreadParams* p) -{ -#ifdef BRM_VERBOSE - std::cerr << "doNewCpimportJob" << std::endl; -#endif - ByteStream reply; - uint32_t jobId; - - std::unique_lock lk(cpimportMutex); - // That means that controller node is waiting untill all active cpimport jobs are done. - // We cannot block `msgProcessor` and cannot create a new job, so exit with `readonly` code. - if (waitToFinishJobs) - { - reply << (uint8_t)ERR_READONLY; - try - { - p->sock->write(reply); - } - catch (...) - { - } - return; - } - - try - { - jobId = sm.newCpimportJob(); - } - catch (exception&) - { - reply.reset(); - reply << (uint8_t)ERR_FAILURE; - try - { - p->sock->write(reply); - } - catch (...) - { - } - - return; - } - - reply << (uint8_t)ERR_OK; - reply << (uint32_t)jobId; - try - { - p->sock->write(reply); - } - catch (exception&) - { - } -} - -void MasterDBRMNode::doFinishCpimportJob(ByteStream& msg, ThreadParams* p) -{ -#ifdef BRM_VERBOSE - std::cout << "doFinishCpimportJob" << std::endl; -#endif - ByteStream reply; - uint32_t cpimportJob; - uint8_t cmd; - std::unique_lock lk(cpimportMutex); - - msg >> cmd; - msg >> cpimportJob; - try - { - sm.finishCpimortJob(cpimportJob); - } - catch (exception&) - { - reply << (uint8_t)ERR_FAILURE; - try - { - p->sock->write(reply); - } - catch (...) - { - } - - return; - } - - reply << (uint8_t)ERR_OK; - try - { - p->sock->write(reply); - } - catch (...) - { - } - - if (waitToFinishJobs && sm.getCpimportJobsCount() == 0) - cpimportJobsCond.notify_one(); -} - void MasterDBRMNode::doNewTxnID(ByteStream& msg, ThreadParams* p) { ByteStream reply; diff --git a/versioning/BRM/masterdbrmnode.h b/versioning/BRM/masterdbrmnode.h index 50dd13131..a3dea951c 100644 --- a/versioning/BRM/masterdbrmnode.h +++ b/versioning/BRM/masterdbrmnode.h @@ -186,7 +186,6 @@ class MasterDBRMNode void doReload(messageqcpp::IOSocket* sock); void doSetReadOnly(messageqcpp::IOSocket* sock, bool b); void doGetReadOnly(messageqcpp::IOSocket* sock); - void doStartReadOnly(messageqcpp::IOSocket* sock); /* SessionManager interface */ SessionManagerServer sm; @@ -194,9 +193,6 @@ class MasterDBRMNode void doGetSystemCatalog(messageqcpp::ByteStream& msg, ThreadParams* p); void doSysCatVerID(messageqcpp::ByteStream& msg, ThreadParams* p); void doNewTxnID(messageqcpp::ByteStream& msg, ThreadParams* p); - void doNewCpimportJob(ThreadParams* p); - void doFinishCpimportJob(messageqcpp::ByteStream& msg, ThreadParams* p); - void doForceClearAllCpimportJobs(messageqcpp::IOSocket* sock); void doCommitted(messageqcpp::ByteStream& msg, ThreadParams* p); void doRolledBack(messageqcpp::ByteStream& msg, ThreadParams* p); void doGetTxnID(messageqcpp::ByteStream& msg, ThreadParams* p); @@ -251,14 +247,11 @@ class MasterDBRMNode boost::mutex mutex2; // protects params and the hand-off TODO: simplify boost::mutex slaveLock; // syncs communication with the slaves boost::mutex serverLock; // kludge to synchronize reloading - std::mutex cpimportMutex; - std::condition_variable cpimportJobsCond; int runners, NumWorkers; ThreadParams* params; volatile bool die, halting; bool reloadCmd; mutable bool readOnly; - mutable bool waitToFinishJobs{false}; struct timespec MSG_TIMEOUT; }; diff --git a/versioning/BRM/sessionmanagerserver.cpp b/versioning/BRM/sessionmanagerserver.cpp index 427bafd69..b0a5c3968 100644 --- a/versioning/BRM/sessionmanagerserver.cpp +++ b/versioning/BRM/sessionmanagerserver.cpp @@ -34,7 +34,6 @@ #include #include #include -#include using namespace std; #include @@ -257,28 +256,6 @@ const QueryContext SessionManagerServer::sysCatVerID() return ret; } -uint32_t SessionManagerServer::newCpimportJob() -{ - std::scoped_lock lk(cpimportMutex); - activeCpimportJobs.insert(cpimportJobId); - auto ret = cpimportJobId; - ++cpimportJobId; - return ret; -} - -void SessionManagerServer::finishCpimortJob(uint32_t jobId) -{ - std::scoped_lock lk(cpimportMutex); - if (activeCpimportJobs.count(jobId)) - activeCpimportJobs.erase(jobId); -} - -void SessionManagerServer::clearAllCpimportJobs() -{ - std::scoped_lock lk(cpimportMutex); - activeCpimportJobs.clear(); -} - const TxnID SessionManagerServer::newTxnID(const SID session, bool block, bool isDDL) { TxnID ret; // ctor must set valid = false @@ -406,12 +383,6 @@ void SessionManagerServer::clearSystemState(uint32_t state) saveSystemState(); } -uint32_t SessionManagerServer::getCpimportJobsCount() -{ - std::scoped_lock lk(cpimportMutex); - return activeCpimportJobs.size(); -} - uint32_t SessionManagerServer::getTxnCount() { boost::mutex::scoped_lock lk(mutex); diff --git a/versioning/BRM/sessionmanagerserver.h b/versioning/BRM/sessionmanagerserver.h index ab4486d6e..84164ccd0 100644 --- a/versioning/BRM/sessionmanagerserver.h +++ b/versioning/BRM/sessionmanagerserver.h @@ -27,9 +27,8 @@ #pragma once #include -#include -#include + #include #include @@ -40,6 +39,7 @@ #define EXPORT + namespace BRM { /** @brief Issues transaction IDs and keeps track of the current system-wide version ID. @@ -146,15 +146,6 @@ class SessionManagerServer */ EXPORT const TxnID newTxnID(const SID session, bool block = true, bool isDDL = false); - // Adds a new job into `active` cpimport job list and return id of that job. - EXPORT uint32_t newCpimportJob(); - - // Removes the given `jobId` from `active` cpimort job list. - EXPORT void finishCpimortJob(uint32_t jobId); - - // Clears all active cpimport jobs. - EXPORT void clearAllCpimportJobs(); - /** @brief Record that a transaction has been committed * * Record that a transaction has been committed. @@ -262,9 +253,6 @@ class SessionManagerServer */ EXPORT uint32_t getTxnCount(); - - EXPORT uint32_t getCpimportJobsCount(); - private: SessionManagerServer(const SessionManagerServer&); SessionManagerServer& operator=(const SessionManagerServer&); @@ -289,12 +277,9 @@ class SessionManagerServer boost::mutex mutex; boost::condition_variable condvar; // used to synthesize a semaphore uint32_t semValue; - - std::unordered_set activeCpimportJobs; - uint32_t cpimportJobId{0}; - std::mutex cpimportMutex; }; } // namespace BRM + #undef EXPORT diff --git a/writeengine/bulk/cpimport.cpp b/writeengine/bulk/cpimport.cpp index a4b7a5f54..a1a0aebcb 100644 --- a/writeengine/bulk/cpimport.cpp +++ b/writeengine/bulk/cpimport.cpp @@ -59,7 +59,6 @@ namespace char* pgmName = 0; const std::string IMPORT_PATH_CWD("."); bool bDebug = false; -uint32_t cpimportJobId = 0; //@bug 4643: cpimport job ended during setup w/o any err msg. // Added a try/catch with logging to main() in case @@ -193,7 +192,6 @@ void printUsage() //------------------------------------------------------------------------------ void handleSigTerm(int i) { - BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId); std::cout << "Received SIGTERM to terminate the process..." << std::endl; BulkStatus::setJobStatus(EXIT_FAILURE); } @@ -203,32 +201,12 @@ void handleSigTerm(int i) //------------------------------------------------------------------------------ void handleControlC(int i) { - BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId); if (!BulkLoad::disableConsoleOutput()) std::cout << "Received Control-C to terminate the process..." << std::endl; BulkStatus::setJobStatus(EXIT_FAILURE); } -//------------------------------------------------------------------------------ -// Signal handler to catch SIGTERM signal to terminate the process -//------------------------------------------------------------------------------ -void handleSigSegv(int i) -{ - BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId); - std::cout << "Received SIGSEGV to terminate the process..." << std::endl; - BulkStatus::setJobStatus(EXIT_FAILURE); -} - -//------------------------------------------------------------------------------ -// Signal handler to catch SIGTERM signal to terminate the process -//------------------------------------------------------------------------------ -void handleSigAbrt(int i) -{ - BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId); - std::cout << "Received SIGABRT to terminate the process..." << std::endl; - BulkStatus::setJobStatus(EXIT_FAILURE); -} //------------------------------------------------------------------------------ // If error occurs during startup, this function is called to log the specified @@ -236,7 +214,6 @@ void handleSigAbrt(int i) //------------------------------------------------------------------------------ void startupError(const std::string& errMsg, bool showHint) { - BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId); // Log to console if (!BulkLoad::disableConsoleOutput()) cerr << errMsg << endl; @@ -298,17 +275,6 @@ void setupSignalHandlers() memset(&act, 0, sizeof(act)); act.sa_handler = handleSigTerm; sigaction(SIGTERM, &act, 0); - - // catch SIGSEGV signal to terminate the program - memset(&act, 0, sizeof(act)); - act.sa_handler = handleSigSegv; - sigaction(SIGSEGV, &act, 0); - - // catch SIGABRT signal to terminate the program - memset(&act, 0, sizeof(act)); - act.sa_handler = handleSigAbrt; - sigaction(SIGABRT, &act, 0); - } //------------------------------------------------------------------------------ @@ -1304,16 +1270,6 @@ int main(int argc, char** argv) new boost::thread(utils::MonitorProcMem(0, checkPct, SUBSYSTEM_ID_WE_BULK)); } - rc = BRMWrapper::getInstance()->newCpimportJob(cpimportJobId); - if (rc != NO_ERROR) - { - WErrorCodes ec; - std::ostringstream oss; - oss << "Error in creating new cpimport job on Controller node; " << ec.errorString(rc) - << "; cpimport is terminating."; - startupError(oss.str(), false); - } - //-------------------------------------------------------------------------- // This is the real business //-------------------------------------------------------------------------- @@ -1364,7 +1320,6 @@ int main(int argc, char** argv) rc = ERR_UNKNOWN; } - BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId); // Free up resources allocated by MY_INIT() above. my_end(0); diff --git a/writeengine/shared/we_brm.cpp b/writeengine/shared/we_brm.cpp index e065630b5..ccd116e0b 100644 --- a/writeengine/shared/we_brm.cpp +++ b/writeengine/shared/we_brm.cpp @@ -60,7 +60,8 @@ BRMWrapper* volatile BRMWrapper::m_instance = NULL; boost::thread_specific_ptr BRMWrapper::m_ThreadDataPtr; boost::mutex BRMWrapper::m_instanceCreateMutex; -bool BRMWrapper::m_useVb = true; + + bool BRMWrapper::m_useVb = true; OID BRMWrapper::m_curVBOid = INVALID_NUM; IDBDataFile* BRMWrapper::m_curVBFile = NULL; boost::mutex vbFileLock; @@ -743,16 +744,6 @@ int BRMWrapper::copyVBBlock(IDBDataFile* pSourceFile, IDBDataFile* pTargetFile, return NO_ERROR; } -uint8_t BRMWrapper::newCpimportJob(uint32_t &jobId) -{ - return blockRsltnMgrPtr->newCpimportJob(jobId); -} - -void BRMWrapper::finishCpimportJob(uint32_t jobId) -{ - blockRsltnMgrPtr->finishCpimportJob(jobId); -} - int BRMWrapper::commit(const VER_t transID) { int rc = blockRsltnMgrPtr->vbCommit(transID); diff --git a/writeengine/shared/we_brm.h b/writeengine/shared/we_brm.h index d8bc403ae..30d1eb774 100644 --- a/writeengine/shared/we_brm.h +++ b/writeengine/shared/we_brm.h @@ -380,8 +380,6 @@ class BRMWrapper : public WEObj * @brief Commit the transaction */ EXPORT int commit(const BRM::VER_t transID); - EXPORT uint8_t newCpimportJob(uint32_t &jobId); - EXPORT void finishCpimportJob(uint32_t jobId); /** * @brief Copy blocks between write engine and version buffer @@ -468,7 +466,7 @@ class BRMWrapper : public WEObj static boost::thread_specific_ptr m_ThreadDataPtr; static boost::mutex m_instanceCreateMutex; - EXPORT static bool m_useVb; + EXPORT static bool m_useVb; static OID m_curVBOid; static IDBDataFile* m_curVBFile;