diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index deb4249ed..0fbfb0ca1 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -2291,6 +2291,21 @@ int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows) cal_impl_if::gp_walk_info gwi; gwi.thd = thd; + if (thd->slave_thread && !get_replication_slave(thd) && ( + thd->lex->sql_command == SQLCOM_INSERT || + thd->lex->sql_command == SQLCOM_INSERT_SELECT || + thd->lex->sql_command == SQLCOM_UPDATE || + thd->lex->sql_command == SQLCOM_UPDATE_MULTI || + thd->lex->sql_command == SQLCOM_DELETE || + thd->lex->sql_command == SQLCOM_DELETE_MULTI || + thd->lex->sql_command == SQLCOM_TRUNCATE || + thd->lex->sql_command == SQLCOM_LOAD)) + { + if (affected_rows) + *affected_rows = 0; + return 0; + } + if (execute) { rc = doUpdateDelete(thd, gwi); @@ -4919,6 +4934,17 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table) return 0; } + if (thd->slave_thread && !get_replication_slave(thd) && ( + thd->lex->sql_command == SQLCOM_INSERT || + thd->lex->sql_command == SQLCOM_INSERT_SELECT || + thd->lex->sql_command == SQLCOM_UPDATE || + thd->lex->sql_command == SQLCOM_UPDATE_MULTI || + thd->lex->sql_command == SQLCOM_DELETE || + thd->lex->sql_command == SQLCOM_DELETE_MULTI || + thd->lex->sql_command == SQLCOM_TRUNCATE || + thd->lex->sql_command == SQLCOM_LOAD)) + return 0; + //Update and delete code if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI)) return doUpdateDelete(thd, gwi); diff --git a/oam/install_scripts/columnstore-post-install.in b/oam/install_scripts/columnstore-post-install.in index adac6f228..5cb22c35e 100755 --- a/oam/install_scripts/columnstore-post-install.in +++ b/oam/install_scripts/columnstore-post-install.in @@ -32,6 +32,7 @@ if [ ! 
-f @ENGINE_SYSCONFDIR@/columnstore/Columnstore.xml ]; then fi if [ -f @ENGINE_SYSCONFDIR@/columnstore/storagemanager.cnf.rpmsave ]; then + /bin/cp -f @ENGINE_SYSCONFDIR@/columnstore/storagemanager.cnf @ENGINE_SYSCONFDIR@/columnstore/storagemanager.cnf.new /bin/cp -f @ENGINE_SYSCONFDIR@/columnstore/storagemanager.cnf.rpmsave @ENGINE_SYSCONFDIR@/columnstore/storagemanager.cnf fi diff --git a/oam/oamcpp/liboamcpp.cpp b/oam/oamcpp/liboamcpp.cpp index a23039f61..8b3454f33 100644 --- a/oam/oamcpp/liboamcpp.cpp +++ b/oam/oamcpp/liboamcpp.cpp @@ -5996,12 +5996,12 @@ bool Oam::autoMovePmDbroot(std::string residePM) if ( ret != 0 ) { - writeLog("FAILURE: Error assigning gluster dbroot# " + itoa(dbrootID), LOG_TYPE_ERROR ); + writeLog("FAILURE: Error assigning gluster dbroot# " + itoa(dbrootID) +" ret: " + itoa(ret), LOG_TYPE_ERROR ); } } catch (...) { - writeLog("FAILURE: Error assigning gluster dbroot# " + itoa(dbrootID), LOG_TYPE_ERROR ); + writeLog("EXCEPTION FAILURE: Error assigning gluster dbroot# " + itoa(dbrootID), LOG_TYPE_ERROR ); } // check if a copy is available when residePM returns @@ -7448,15 +7448,15 @@ void Oam::removeDbroot(DBRootConfigList& dbrootlist) catch (exception& e) { cout << endl << "**** glusterctl API exception: " << e.what() << endl; - cerr << "FAILURE: Error assigning gluster dbroot# " + itoa(dbrootID) << endl; - writeLog("FAILURE: Error assigning gluster dbroot# " + itoa(dbrootID), LOG_TYPE_ERROR ); + cerr << "FAILURE: Error deleting gluster dbroot# " + itoa(dbrootID) << endl; + writeLog("FAILURE: Error deleting gluster dbroot# " + itoa(dbrootID), LOG_TYPE_ERROR ); exceptionControl("removeDbroot", API_FAILURE); } catch (...) 
{ cout << endl << "**** glusterctl API exception: UNKNOWN" << endl; - cerr << "FAILURE: Error assigning gluster dbroot# " + itoa(dbrootID) << endl; - writeLog("FAILURE: Error assigning gluster dbroot# " + itoa(dbrootID), LOG_TYPE_ERROR ); + cerr << "FAILURE: Error deleting gluster dbroot# " + itoa(dbrootID) << endl; + writeLog("FAILURE: Error deleting gluster dbroot# " + itoa(dbrootID), LOG_TYPE_ERROR ); exceptionControl("removeDbroot", API_FAILURE); } } @@ -9151,7 +9151,7 @@ int Oam::glusterctl(GLUSTER_COMMANDS command, std::string argument1, std::string string dbr = sysConfig->getConfig("SystemModuleConfig", ModuleDBRootID); string command = "" + DataRedundancyConfigs[pm].pmIpAddr + ":/dbroot" + dbr + " /var/lib/columnstore/data" + dbr + - " glusterfs defaults,direct-io-mode=enable 00"; + " glusterfs defaults,direct-io-mode=enable 0 0"; string toPM = "pm" + itoa(pm + 1); distributeFstabUpdates(command, toPM); } diff --git a/oam/oamcpp/oamcache.cpp b/oam/oamcpp/oamcache.cpp index bd2957764..aae381b18 100644 --- a/oam/oamcpp/oamcache.cpp +++ b/oam/oamcpp/oamcache.cpp @@ -155,6 +155,7 @@ void OamCache::checkReload() } sleep(1); + ++retry; //cout << "pm " << *it << " -> connection " << (i-1) << endl; } diff --git a/procmgr/main.cpp b/procmgr/main.cpp index e2a5dd10e..f1f7bfb47 100644 --- a/procmgr/main.cpp +++ b/procmgr/main.cpp @@ -47,6 +47,7 @@ using namespace threadpool; using namespace config; bool runStandby = false; +bool MsgThreadActive = false; bool runCold = false; string systemName = "system"; string iface_name; @@ -412,6 +413,9 @@ int main(int argc, char** argv) log.writeLog(__LINE__, "ERROR: makeConfig failed", LOG_TYPE_ERROR); } + + // TODO: This is called before MessageThread is created. + // Doesn't break anything but can be removed as it's done after MessageThread creation. 
try { oam.distributeConfigFile(); @@ -474,8 +478,6 @@ static void messageThread(Configuration config) sleep (1); } - log.writeLog(__LINE__, "Message Thread started ..", LOG_TYPE_DEBUG); - //read and cleanup port before trying to use try { @@ -489,6 +491,8 @@ static void messageThread(Configuration config) { } + log.writeLog(__LINE__, "Message Thread started ..", LOG_TYPE_DEBUG); + // //waiting for request // @@ -499,7 +503,7 @@ static void messageThread(Configuration config) try { MessageQueueServer procmgr("ProcMgr"); - + MsgThreadActive = true; for (;;) { try @@ -1645,17 +1649,17 @@ void pingDeviceThread() if (busy) break; - //set query system state not ready - processManager.setQuerySystemState(false); - - processManager.setSystemState(oam::BUSY_INIT); - processManager.reinitProcessType("cpimport"); // halt the dbrm oam.dbrmctl("halt"); log.writeLog(__LINE__, "'dbrmctl halt' done", LOG_TYPE_DEBUG); + //set query system state not ready + processManager.setQuerySystemState(false); + + processManager.setSystemState(oam::BUSY_INIT); + aManager.sendAlarmReport(moduleName.c_str(), MODULE_DOWN_AUTO, CLEAR); //send notification @@ -1720,6 +1724,36 @@ void pingDeviceThread() //set query system state ready processManager.setQuerySystemState(true); + // waiting until dml are ACTIVE + // disableModule is going to trigger DMLProc to restart wait for it + int retry = 0; + while (retry < 30) + { + ProcessStatus DMLprocessstatus; + + try + { + oam.getProcessStatus("DMLProc", config.moduleName(), DMLprocessstatus); + } + catch (exception& ex) + {} + catch (...) 
+ {} + + if (DMLprocessstatus.ProcessOpState == oam::BUSY_INIT) + log.writeLog(__LINE__, "Waiting for DMLProc to finish rollback", LOG_TYPE_DEBUG); + + if (DMLprocessstatus.ProcessOpState == oam::ACTIVE) + break; + + if (DMLprocessstatus.ProcessOpState == oam::FAILED) + break; + + // wait some more + sleep(2); + ++retry; + } + goto break_case; } } @@ -2012,6 +2046,7 @@ void pingDeviceThread() log.writeLog(__LINE__, "Module failed to auto start: " + moduleName, LOG_TYPE_CRITICAL); + if ( amazon ) processManager.setSystemState(oam::FAILED); else @@ -2020,6 +2055,35 @@ void pingDeviceThread() //set query system state ready processManager.setQuerySystemState(true); + // waiting until dml are ACTIVE + // disableModule is going to trigger DMLProc to restart wait for it + int retry = 0; + while (retry < 30) + { + ProcessStatus DMLprocessstatus; + + try + { + oam.getProcessStatus("DMLProc", config.moduleName(), DMLprocessstatus); + } + catch (exception& ex) + {} + catch (...) + {} + + if (DMLprocessstatus.ProcessOpState == oam::BUSY_INIT) + log.writeLog(__LINE__, "Waiting for DMLProc to finish rollback", LOG_TYPE_DEBUG); + + if (DMLprocessstatus.ProcessOpState == oam::ACTIVE) + break; + + if (DMLprocessstatus.ProcessOpState == oam::FAILED) + break; + + // wait some more + sleep(2); + ++retry; + } //clear count moduleInfoList[moduleName] = 0; } @@ -2081,21 +2145,22 @@ void pingDeviceThread() Configuration config; log.writeLog(__LINE__, "*** module is down: " + moduleName, LOG_TYPE_CRITICAL); - //set query system state not ready - processManager.setQuerySystemState(false); - - processManager.setSystemState(oam::BUSY_INIT); - processManager.reinitProcessType("cpimport"); // halt the dbrm oam.dbrmctl("halt"); log.writeLog(__LINE__, "'dbrmctl halt' done", LOG_TYPE_DEBUG); + //set query system state not ready + processManager.setQuerySystemState(false); + processManager.setSystemState(oam::BUSY_INIT); - //string cmd = "/etc/init.d/glusterd restart > /dev/null 2>&1"; - 
//system(cmd.c_str()); + // call for a reload in case cpimport was running and + // some cleanup is needed on dbrmcontroller thats active before continuing + oam.dbrmctl("reload"); + log.writeLog(__LINE__, "'dbrmctl reload' done", LOG_TYPE_DEBUG); + //send notification oam.sendDeviceNotification(moduleName, MODULE_DOWN); @@ -2109,9 +2174,7 @@ void pingDeviceThread() //set module to disable state processManager.disableModule(moduleName, false); - //call dbrm control - oam.dbrmctl("reload"); - log.writeLog(__LINE__, "'dbrmctl reload' done", LOG_TYPE_DEBUG); + // if pm, move dbroots to other pms if ( ( moduleName.find("pm") == 0 && !amazon && ( DBRootStorageType != "internal") ) || @@ -2142,6 +2205,9 @@ void pingDeviceThread() { processManager.setModuleState(moduleName, oam::AUTO_DISABLED); + //call dbrm control + oam.dbrmctl("reload"); + log.writeLog(__LINE__, "'dbrmctl reload' done", LOG_TYPE_DEBUG); // resume the dbrm oam.dbrmctl("resume"); log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG); @@ -2149,6 +2215,36 @@ void pingDeviceThread() //set query system state ready processManager.setQuerySystemState(true); + // waiting until dml are ACTIVE + // disableModule is going to trigger DMLProc to restart wait for it + int retry = 0; + while (retry < 30) + { + ProcessStatus DMLprocessstatus; + + try + { + oam.getProcessStatus("DMLProc", config.moduleName(), DMLprocessstatus); + } + catch (exception& ex) + {} + catch (...) 
+ {} + + if (DMLprocessstatus.ProcessOpState == oam::BUSY_INIT) + log.writeLog(__LINE__, "Waiting for DMLProc to finish rollback", LOG_TYPE_DEBUG); + + if (DMLprocessstatus.ProcessOpState == oam::ACTIVE) + break; + + if (DMLprocessstatus.ProcessOpState == oam::FAILED) + break; + + // wait some more + sleep(2); + ++retry; + } + break; } } @@ -2160,6 +2256,9 @@ void pingDeviceThread() { if ( moduleName.find("um") == 0 ) { + //call dbrm control + oam.dbrmctl("reload"); + log.writeLog(__LINE__, "'dbrmctl reload' done", LOG_TYPE_DEBUG); // resume the dbrm oam.dbrmctl("resume"); log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG); @@ -2350,8 +2449,8 @@ void pingDeviceThread() } } - //set recycle process - processManager.recycleProcess(moduleName); + //set reinit process + processManager.reinitProcesses(); //set query system state ready processManager.setQuerySystemState(true); @@ -2365,6 +2464,9 @@ void pingDeviceThread() ( opState != oam::AUTO_DISABLED ) ) { + //call dbrm control + oam.dbrmctl("reload"); + log.writeLog(__LINE__, "'dbrmctl reload' done", LOG_TYPE_DEBUG); // resume the dbrm oam.dbrmctl("resume"); log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG); @@ -2376,13 +2478,45 @@ void pingDeviceThread() else { // non-amazon + //call dbrm control + oam.dbrmctl("reload"); + log.writeLog(__LINE__, "'dbrmctl reload' done", LOG_TYPE_DEBUG); // resume the dbrm oam.dbrmctl("resume"); log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG); //set recycle process - processManager.recycleProcess(moduleName); + // waiting until dml are ACTIVE + // disableModule is going to trigger DMLProc to restart wait for it + int retry = 0; + while (retry < 30) + { + ProcessStatus DMLprocessstatus; + try + { + oam.getProcessStatus("DMLProc", config.moduleName(), DMLprocessstatus); + } + catch (exception& ex) + {} + catch (...) 
+ {} + + if (DMLprocessstatus.ProcessOpState == oam::BUSY_INIT) + log.writeLog(__LINE__, "Waiting for DMLProc to finish rollback", LOG_TYPE_DEBUG); + + if (DMLprocessstatus.ProcessOpState == oam::ACTIVE) + break; + + if (DMLprocessstatus.ProcessOpState == oam::FAILED) + break; + + // wait some more + sleep(2); + ++retry; + } + // restart DMLProc again to retrigger rollback with all dbroots connected + processManager.restartProcessType("DMLProc"); //set query system state ready processManager.setQuerySystemState(true); } diff --git a/procmgr/processmanager.cpp b/procmgr/processmanager.cpp index faffb62fd..4dd6ac9f9 100644 --- a/procmgr/processmanager.cpp +++ b/procmgr/processmanager.cpp @@ -53,6 +53,7 @@ pthread_mutex_t THREAD_LOCK; extern string cloud; extern bool amazon; extern bool runStandby; +extern bool MsgThreadActive; extern string iface_name; extern string PMInstanceType; extern string UMInstanceType; @@ -920,8 +921,6 @@ void processMSG(messageqcpp::IOSocket* cfIos) status = processManager.disableModule(moduleName, true); log.writeLog(__LINE__, "Disable Module Completed on " + moduleName, LOG_TYPE_INFO); - processManager.recycleProcess(moduleName); - //check for SIMPLEX Processes on mate might need to be started processManager.checkSimplexModule(moduleName); @@ -3806,8 +3805,6 @@ int ProcessManager::disableModule(string target, bool manualFlag) //set Columnstore.xml enable state setEnableState( target, SnewState); - log.writeLog(__LINE__, "disableModule - setEnableState", LOG_TYPE_DEBUG); - //sleep a bit to give time for the state change to apply sleep(1); @@ -3816,8 +3813,6 @@ int ProcessManager::disableModule(string target, bool manualFlag) { if ( updatePMSconfig() != API_SUCCESS ) return API_FAILURE; - - log.writeLog(__LINE__, "disableModule - Updated PM server Count", LOG_TYPE_DEBUG); } //Update DBRM section of Columnstore.xml @@ -3825,19 +3820,36 @@ int ProcessManager::disableModule(string target, bool manualFlag) { return API_FAILURE; } - 
processManager.recycleProcess(target); - - //check for SIMPLEX Processes on mate might need to be started - processManager.checkSimplexModule(target); //distribute config file distributeConfigFile("system"); + processManager.reinitProcesses(); + log.writeLog(__LINE__, "disableModule successfully complete for " + target, LOG_TYPE_DEBUG); return API_SUCCESS; } + +void ProcessManager::reinitProcesses(std::string skipModule) +{ + Oam oam; + // reinit DBRMWorkerNode and WriteEngineServer, then restart ExeMgr, DDLProc and DMLProc in that order; skipModule is forwarded to each restartProcessType call + log.writeLog(__LINE__, "reinitProcesses... ", LOG_TYPE_DEBUG); + + reinitProcessType("DBRMWorkerNode"); + reinitProcessType("WriteEngineServer"); + restartProcessType("ExeMgr",skipModule); + sleep(1); + restartProcessType("DDLProc",skipModule); + sleep(1); + restartProcessType("DMLProc",skipModule); + sleep(3); + + log.writeLog(__LINE__, "reinitProcesses complete", LOG_TYPE_DEBUG); +} + /****************************************************************************************** * @brief recycleProcess * @@ -3895,7 +3907,7 @@ void ProcessManager::recycleProcess(string module, bool enableModule) startProcessType("mysqld"); - return; + return; } /****************************************************************************************** @@ -4886,6 +4898,7 @@ int ProcessManager::reinitProcessType( std::string processName ) if ( systemprocessstatus.processstatus[i].ProcessName == "ServerMonitor" ) { // found one, request reinit of it + log.writeLog(__LINE__, "reinitProcessType: cpimport on " + systemprocessstatus.processstatus[i].Module, LOG_TYPE_DEBUG); retStatus = processManager.reinitProcess(systemprocessstatus.processstatus[i].Module, "cpimport"); log.writeLog(__LINE__, "reinitProcessType: ACK received from Process-Monitor, return status = " + oam.itoa(retStatus), LOG_TYPE_DEBUG); @@ -6514,12 +6527,12 @@ int ProcessManager::sendMsgProcMon( std::string module, ByteStream msg, int requ catch (SocketClosed& ex) { string error = ex.what(); -// log.writeLog(__LINE__, "EXCEPTION ERROR on mqRequest.read, module " + module + " : " + error,
LOG_TYPE_ERROR); + log.writeLog(__LINE__, "EXCEPTION ERROR on mqRequest.read, module " + module + " : " + error, LOG_TYPE_ERROR); return returnStatus; } catch (...) { -// log.writeLog(__LINE__, "EXCEPTION ERROR on mqRequest.read: Caught unknown exception! module " + module, LOG_TYPE_ERROR); + log.writeLog(__LINE__, "EXCEPTION ERROR on mqRequest.read: Caught unknown exception! module " + module, LOG_TYPE_ERROR); return returnStatus; } @@ -6567,11 +6580,11 @@ int ProcessManager::sendMsgProcMon( std::string module, ByteStream msg, int requ catch (exception& ex) { string error = ex.what(); -// log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueClient: " + error, LOG_TYPE_ERROR); + log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueClient: " + error, LOG_TYPE_ERROR); } catch (...) { -// log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueClient: Caught unknown exception!", LOG_TYPE_ERROR); + log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueClient: Caught unknown exception!", LOG_TYPE_ERROR); } return returnStatus; @@ -6734,8 +6747,6 @@ void ProcessManager::setQuerySystemState(bool set) Oam oam; BRM::DBRM dbrm; - log.writeLog(__LINE__, "setQuerySystemState called = " + oam.itoa(set), LOG_TYPE_DEBUG); - try { dbrm.setSystemQueryReady(set); @@ -8005,8 +8016,6 @@ int ProcessManager::updatePMSconfig( bool check ) vector IpAddrs; vector nicIDs; - log.writeLog(__LINE__, "updatePMSconfig Started", LOG_TYPE_DEBUG); - pthread_mutex_lock(&THREAD_LOCK); ModuleTypeConfig moduletypeconfig; @@ -8166,8 +8175,6 @@ int ProcessManager::updatePMSconfig( bool check ) sysConfig1->write(); pthread_mutex_unlock(&THREAD_LOCK); - log.writeLog(__LINE__, "updatePMSconfig completed", LOG_TYPE_DEBUG); - return API_SUCCESS; } catch (...) 
@@ -8195,8 +8202,6 @@ int ProcessManager::updateWorkerNodeconfig() vector module; vector ipadr; - log.writeLog(__LINE__, "updateWorkerNodeconfig Started", LOG_TYPE_DEBUG); - pthread_mutex_lock(&THREAD_LOCK); //setup current module as work-node #1 by entering it in first @@ -8309,8 +8314,6 @@ int ProcessManager::updateWorkerNodeconfig() sysConfig3->write(); pthread_mutex_unlock(&THREAD_LOCK); - log.writeLog(__LINE__, "updateWorkerNodeconfig completed", LOG_TYPE_DEBUG); - return API_SUCCESS; } @@ -8511,8 +8514,6 @@ int ProcessManager::setPMProcIPs( std::string moduleName, std::string processNam Oam oam; ModuleConfig moduleconfig; - log.writeLog(__LINE__, "setPMProcIPs called for " + moduleName, LOG_TYPE_DEBUG); - pthread_mutex_lock(&THREAD_LOCK); if ( processName == oam::UnassignedName || processName == "DDLProc") @@ -8631,8 +8632,6 @@ int ProcessManager::distributeConfigFile(std::string name, std::string file) Oam oam; int returnStatus = oam::API_SUCCESS; - log.writeLog(__LINE__, "distributeConfigFile called for " + name + " file = " + file, LOG_TYPE_DEBUG); - string dirName = std::string(MCSSYSCONFDIR) + "/columnstore/"; string fileName = dirName + file; @@ -8817,7 +8816,6 @@ int ProcessManager::getDBRMData(messageqcpp::IOSocket fIos, std::string moduleNa // StorageManager: Need to make these existence checks use an idbfilesystem op if we // decide to put the BRM-managed files in cloud storage string currentDbrmFile; - log.writeLog(__LINE__, "I declare that I am ProcMgr, and I am running getDBRMData!", LOG_TYPE_DEBUG); IDBFileSystem &fs = IDBPolicy::getFs(currentFileName.c_str()); boost::scoped_ptr oldFile(IDBDataFile::open(IDBPolicy::getType(currentFileName.c_str(), IDBPolicy::WRITEENG), @@ -9198,6 +9196,14 @@ int ProcessManager::switchParentOAMModule(std::string newActiveModuleName) //clear run standby flag; runStandby = false; + int retryCount = 0; + //sleep, give time for message thread to startup + while (!MsgThreadActive && retryCount < 10) + { + 
log.writeLog(__LINE__, "Waiting for Message Thread...", LOG_TYPE_DEBUG); + sleep(5); + ++retryCount; + } int moduleID = atoi(newActiveModuleName.substr(MAX_MODULE_TYPE_SIZE, MAX_MODULE_ID_SIZE).c_str()); @@ -9883,9 +9889,6 @@ int ProcessManager::OAMParentModuleChange() log.writeLog(__LINE__, " ", LOG_TYPE_DEBUG); log.writeLog(__LINE__, "*** OAMParentModule outage, OAM Parent Module change-over started ***", LOG_TYPE_DEBUG); - //run save.brm script - processManager.saveBRM(true, false); - gdownActiveOAMModule = downOAMParentName; // update Columnstore.xml entries @@ -9933,10 +9936,19 @@ int ProcessManager::OAMParentModuleChange() //clear run standby flag; runStandby = false; - + int retryCount = 0; //sleep, give time for message thread to startup - sleep(5); + while (!MsgThreadActive && retryCount < 10) + { + log.writeLog(__LINE__, "Waiting for Message Thread...", LOG_TYPE_DEBUG); + sleep(5); + ++retryCount; + } + //run save.brm script + //Nope turns out this has to be done first... + + processManager.saveBRM(false); try { oam.autoMovePmDbroot(downOAMParentName); @@ -10032,19 +10044,23 @@ int ProcessManager::OAMParentModuleChange() //do it here to get current processes active faster to process queries faster processManager.setProcessStates(downOAMParentName, oam::AUTO_OFFLINE); - //set other down modules to disable state + //set OTHER down modules to disable state vector::iterator pt1 = downModuleList.begin(); for ( ; pt1 != downModuleList.end() ; pt1++) { - disableModule(*pt1, false); - processManager.setProcessStates(*pt1, oam::AUTO_OFFLINE); + // Don't do this again for downOAMParentName we just did it 3 lines ago + if (*pt1 != downOAMParentName) + { + disableModule(*pt1, false); + processManager.setProcessStates(*pt1, oam::AUTO_OFFLINE); + } } //distribute config file distributeConfigFile("system"); - //restart local module + //restart local module WHY?? 
processManager.stopModule(config.moduleName(), oam::FORCEFUL, true); string localModule = config.moduleName(); @@ -10060,8 +10076,11 @@ int ProcessManager::OAMParentModuleChange() status = startsystemthreadStatus; } + reinitProcessType("cpimport"); + // waiting until dml are ACTIVE - while (true) + int retry = 0; + while (retry < 30) { ProcessStatus DMLprocessstatus; @@ -10085,25 +10104,18 @@ int ProcessManager::OAMParentModuleChange() // wait some more sleep(2); + ++retry; } - //set recycle process - processManager.recycleProcess(downOAMParentName); //restart/reinit processes to force their release of the controller node port if ( ( config.ServerInstallType() == oam::INSTALL_COMBINE_DM_UM_PM) && ( moduleNameList.size() <= 0 && config.moduleType() == "pm") ) { - status = 0; + // Do Nothing } else { -// processManager.restartProcessType("mysql", localModule); -// processManager.restartProcessType("ExeMgr", localModule); -// processManager.restartProcessType("WriteEngineServer", localModule); - -// processManager.reinitProcessType("DBRMWorkerNode"); - //send message to start new Standby Process-Manager, if needed newStandbyModule = getStandbyModule(); @@ -10184,15 +10196,6 @@ int ProcessManager::OAMParentModuleChange() } } - //restart DDLProc/DMLProc to perform any rollbacks, if needed - //dont rollback in amazon, wait until down pm recovers -// if ( ( config.ServerInstallType() != oam::INSTALL_COMBINE_DM_UM_PM ) -// && !amazon ) { -// processManager.restartProcessType("DDLProc", config.moduleName()); -// sleep(1); -// processManager.restartProcessType("DMLProc", config.moduleName()); -// } - if ( config.ServerInstallType() == oam::INSTALL_COMBINE_DM_UM_PM ) { //change master MySQL Replication setup @@ -10201,8 +10204,38 @@ int ProcessManager::OAMParentModuleChange() processManager.setMySQLReplication(devicenetworklist, config.moduleName()); } - //set query system state not ready - processManager.setQuerySystemState(true); + 
processManager.restartProcessType("DBRMControllerNode"); + + processManager.reinitProcesses(); + + // waiting until dml are ACTIVE + retry = 0; + while (retry < 30) + { + ProcessStatus DMLprocessstatus; + + try + { + oam.getProcessStatus("DMLProc", config.moduleName(), DMLprocessstatus); + } + catch (exception& ex) + {} + catch (...) + {} + + if (DMLprocessstatus.ProcessOpState == oam::BUSY_INIT) + log.writeLog(__LINE__, "Waiting for DMLProc to finish rollback", LOG_TYPE_DEBUG); + + if (DMLprocessstatus.ProcessOpState == oam::ACTIVE) + break; + + if (DMLprocessstatus.ProcessOpState == oam::FAILED) + break; + + // wait some more + sleep(2); + ++retry; + } // clear alarm aManager.sendAlarmReport(config.moduleName().c_str(), MODULE_SWITCH_ACTIVE, CLEAR); @@ -10301,11 +10334,29 @@ std::string ProcessManager::getStandbyModule() string backupStandbyModule = "NONE"; string newStandbyModule = "NONE"; - log.writeLog(__LINE__, "getStandbyModule called", LOG_TYPE_DEBUG); - //check if gluster, if so then find PMs that have copies of DBROOT #1 string pmList = ""; + try + { + oam.getProcessStatus(systemprocessstatus); + for ( unsigned int i = 0 ; i < systemprocessstatus.processstatus.size(); i++) + { + if ( systemprocessstatus.processstatus[i].ProcessName == "ProcessManager" && + systemprocessstatus.processstatus[i].ProcessOpState == oam::STANDBY ) + //already have a hot-standby + return ""; + } + } + catch (exception& ex) + { + log.writeLog(__LINE__, "EXCEPTION ERROR on getProcessStatus: " + string(ex.what()), LOG_TYPE_ERROR); + } + catch (...) 
+ { + log.writeLog(__LINE__, "EXCEPTION ERROR on getProcessStatus: Caught unknown exception!", LOG_TYPE_ERROR); + } + if (DataRedundancyConfig == "y") { @@ -10314,8 +10365,6 @@ std::string ProcessManager::getStandbyModule() string errmsg; oam.glusterctl(oam::GLUSTER_WHOHAS, "1", pmList, errmsg); - log.writeLog(__LINE__, "GLUSTER_WHOHAS called:" + pmList, LOG_TYPE_DEBUG); - boost::char_separator sep(" "); boost::tokenizer< boost::char_separator > tokens(pmList, sep); @@ -10357,8 +10406,6 @@ std::string ProcessManager::getStandbyModule() //not gluster, check by status try { - oam.getProcessStatus(systemprocessstatus); - for ( unsigned int i = 0 ; i < systemprocessstatus.processstatus.size(); i++) { if ( systemprocessstatus.processstatus[i].ProcessName == "ProcessManager" && @@ -10431,8 +10478,6 @@ bool ProcessManager::setStandbyModule(std::string newStandbyModule, bool send) { Oam oam; - log.writeLog(__LINE__, "setStandbyModule called", LOG_TYPE_DEBUG); - if ( newStandbyModule.empty() ) return true; @@ -10502,8 +10547,6 @@ bool ProcessManager::clearStandbyModule() { Oam oam; - log.writeLog(__LINE__, "clearStandbyModule called", LOG_TYPE_DEBUG); - pthread_mutex_lock(&THREAD_LOCK); Configuration config; @@ -10793,8 +10836,6 @@ int ProcessManager::setMySQLReplication(oam::DeviceNetworkList devicenetworklist { Oam oam; - log.writeLog(__LINE__, "setMySQLReplication called", LOG_TYPE_DEBUG); - string MySQLRep; try @@ -11121,7 +11162,15 @@ int ProcessManager::glusterAssign(std::string moduleName, std::string dbroot) msg << dbroot; int returnStatus = sendMsgProcMon( moduleName, msg, requestID, 30 ); - + int retry = 0; + // Try this for a minute because in failover the node returning to service may not be listening yet + while(returnStatus != API_SUCCESS && retry < 60) + { + log.writeLog(__LINE__, "glusterAssign retrying...", LOG_TYPE_DEBUG); + returnStatus = sendMsgProcMon( moduleName, msg, requestID, 30 ); + sleep(1); + ++retry; + } if ( returnStatus == API_SUCCESS) { //log 
the success event @@ -11151,7 +11200,15 @@ int ProcessManager::glusterUnassign(std::string moduleName, std::string dbroot) msg << dbroot; int returnStatus = sendMsgProcMon( moduleName, msg, requestID, 30 ); - + int retry = 0; + // Try this for a minute because in failover the node returning to service may not be listening yet + while(returnStatus != API_SUCCESS && retry < 60) + { + log.writeLog(__LINE__, "glusterUnassign retrying...", LOG_TYPE_DEBUG); + returnStatus = sendMsgProcMon( moduleName, msg, requestID, 30 ); + sleep(1); + ++retry; + } if ( returnStatus == API_SUCCESS) { //log the success event diff --git a/procmgr/processmanager.h b/procmgr/processmanager.h index 9c8e1378d..76e473488 100644 --- a/procmgr/processmanager.h +++ b/procmgr/processmanager.h @@ -299,6 +299,10 @@ public: */ int disableModule(std::string target, bool manualFlag); + /** + *@brief reinit Processes trying to replace recycleProcess + */ + void reinitProcesses(std::string skipModule = "none"); /** *@brief recycle Processes */ diff --git a/procmon/main.cpp b/procmon/main.cpp index d091b7db7..9112d3962 100644 --- a/procmon/main.cpp +++ b/procmon/main.cpp @@ -3210,6 +3210,7 @@ void processStatusMSG(messageqcpp::IOSocket* cfIos) oam::DeviceNetworkConfig devicenetworkconfig; oam::DeviceNetworkList devicenetworklist; string value; + MonitorConfig currentConfig; *msg >> moduleCount; @@ -3223,7 +3224,7 @@ void processStatusMSG(messageqcpp::IOSocket* cfIos) log.writeLog(__LINE__, "statusControl: REQUEST RECEIVED: Add Module"); string moduleType = devicenetworkconfig.DeviceName.substr(0, MAX_MODULE_TYPE_SIZE); - string OAMParentModuleType = config.OAMParentName().substr(0, 2); + string OAMParentModuleType = currentConfig.OAMParentName().substr(0, 2); // add to module status shared memory DeviceNetworkList::iterator pt = devicenetworklist.begin(); diff --git a/procmon/processmonitor.cpp b/procmon/processmonitor.cpp index e2da321ee..468ce8891 100644 --- a/procmon/processmonitor.cpp +++ 
b/procmon/processmonitor.cpp @@ -460,6 +460,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO { Oam oam; ByteStream ackMsg; + MonitorConfig currentConfig; ByteStream::byte messageType; ByteStream::byte requestID; @@ -809,6 +810,24 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO if ( processName == "cpimport" ) { system("pkill -sighup cpimport"); + for (int i=0; i < 10; i++) + { + //get pid; buf is zeroed so that empty pidof output or a failed popen parses as pid 0 and ends the wait + char buf[512] = {0}; + FILE *cmd_pipe = popen("pidof -s cpimport", "r"); + + if (cmd_pipe == NULL || fgets(buf, 512, cmd_pipe) == NULL) buf[0] = '\0'; + pid_t pid = strtoul(buf, NULL, 10); + + if (cmd_pipe != NULL) pclose( cmd_pipe ); + + if (pid) + sleep(2); + else + break; + } + // kill other processes + system("pkill -9 cpimport.bin"); } else { @@ -2217,6 +2236,7 @@ pid_t ProcessMonitor::startProcess(string processModuleType, string processName, char* argList[MAXARGUMENTS]; unsigned int i = 0; MonitorLog log; + MonitorConfig currentConfig; unsigned int numAugs = 0; Oam oam; SystemProcessStatus systemprocessstatus; @@ -3736,7 +3756,7 @@ int ProcessMonitor::updateConfig() { //ProcMon log file MonitorLog log; -// MonitorConfig config; + MonitorConfig currentConfig; // ProcessMonitor aMonitor(config, log); Oam oam; @@ -3758,7 +3778,7 @@ int ProcessMonitor::updateConfig() } //Update a map for application launch ID for this Process-Monitor - string OAMParentModuleType = config.OAMParentName().substr(0, MAX_MODULE_TYPE_SIZE); + string OAMParentModuleType = currentConfig.OAMParentName().substr(0, MAX_MODULE_TYPE_SIZE); string systemModuleType = config.moduleName().substr(0, MAX_MODULE_TYPE_SIZE); for ( unsigned int i = 0 ; i < systemprocessconfig.processconfig.size(); i++) @@ -4117,8 +4137,6 @@ int ProcessMonitor::createDataDirs(std::string cloud) MonitorLog log; Oam oam; - log.writeLog(__LINE__, "createDataDirs called", LOG_TYPE_DEBUG); - if ( config.moduleType() == "um" && ( cloud == "amazon-ec2" || cloud == "amazon-vpc") ) { @@ -4278,8 +4296,6 @@ int
ProcessMonitor::getDBRMdata(string *path) Oam oam; ByteStream msg; - log.writeLog(__LINE__, "getDBRMdata called", LOG_TYPE_DEBUG); - int returnStatus = API_FAILURE; msg << (ByteStream::byte) GETDBRMDATA; @@ -4441,9 +4457,9 @@ int ProcessMonitor::getDBRMdata(string *path) //create journal file if none come across if ( !journalFile) { - string journalFilename = "/var/lib/columnstore/data1/systemFiles/dbrm/BRM_saves_journal"; - IDBDataFile *idbJournalFile = IDBDataFile::open(IDBPolicy::getType(journalFilename.c_str(), - IDBPolicy::WRITEENG), journalFilename.c_str(), "w", 0); + bf::path pJournalFilename(pTmp / "BRM_saves_journal"); + IDBDataFile *idbJournalFile = IDBDataFile::open(IDBPolicy::getType(pJournalFilename.string().c_str(), + IDBPolicy::WRITEENG), pJournalFilename.string().c_str(), "w", 0); delete idbJournalFile; //string cmd = "touch " + startup::StartUp::installDir() + "/data1/systemFiles/dbrm/BRM_saves_journal"; //system(cmd.c_str()); @@ -4964,8 +4980,6 @@ int ProcessMonitor::runMasterRep(std::string& masterLogFile, std::string& master { Oam oam; - log.writeLog(__LINE__, "runMasterRep function called", LOG_TYPE_DEBUG); - SystemModuleTypeConfig systemModuleTypeConfig; try @@ -5140,8 +5154,6 @@ int ProcessMonitor::runSlaveRep(std::string& masterLogFile, std::string& masterL { Oam oam; - log.writeLog(__LINE__, "runSlaveRep function called", LOG_TYPE_DEBUG); - // get master replicaion module IP Address string PrimaryUMModuleName; oam.getSystemConfig("PrimaryUMModuleName", PrimaryUMModuleName); @@ -5224,8 +5236,6 @@ int ProcessMonitor::runDisableRep() { Oam oam; - log.writeLog(__LINE__, "runDisableRep function called", LOG_TYPE_DEBUG); - // mysql port number string MySQLPort; @@ -5279,8 +5289,6 @@ int ProcessMonitor::runMasterDist(std::string& password, std::string& slaveModul { Oam oam; - log.writeLog(__LINE__, "runMasterDist function called", LOG_TYPE_DEBUG); - SystemModuleTypeConfig systemModuleTypeConfig; try @@ -5405,8 +5413,6 @@ bool 
ProcessMonitor::amazonIPCheck() MonitorLog log; Oam oam; - log.writeLog(__LINE__, "amazonIPCheck function called", LOG_TYPE_DEBUG); - // delete description file so it will create a new one string tmpLog = tmpLogDir + "/describeInstance.log"; unlink(tmpLog.c_str()); @@ -5490,9 +5496,11 @@ bool ProcessMonitor::amazonIPCheck() // get all ips if parent oam // get just parent and local if not parent oam - if ( config.moduleName() == config.OAMParentName() || + MonitorConfig currentConfig; + + if ( config.moduleName() == currentConfig.OAMParentName() || moduleName == config.moduleName() || - moduleName == config.OAMParentName() ) + moduleName == currentConfig.OAMParentName() ) { HostConfigList::iterator pt1 = (*pt).hostConfigList.begin(); @@ -5700,8 +5708,6 @@ void ProcessMonitor::unmountExtraDBroots() ModuleConfig moduleconfig; Oam oam; - log.writeLog(__LINE__, "unmountExtraDBroots called ", LOG_TYPE_DEBUG); - string DBRootStorageType = "internal"; try @@ -5797,8 +5803,6 @@ int ProcessMonitor::checkDataMount() //check/update the pmMount files - log.writeLog(__LINE__, "checkDataMount called ", LOG_TYPE_DEBUG); - string DBRootStorageType = "internal"; vector dbrootList; @@ -6031,8 +6035,6 @@ void ProcessMonitor::calTotalUmMemory() //check/update the pmMount files - log.writeLog(__LINE__, "calTotalUmMemory called ", LOG_TYPE_DEBUG); - try { sysinfo(&myinfo); @@ -6158,7 +6160,7 @@ int ProcessMonitor::glusterAssign(std::string dbrootID) if ( WEXITSTATUS(ret) != 0 ) { - log.writeLog(__LINE__, "glusterAssign mount failure: dbroot: " + dbrootID + " error: " + oam.itoa(WEXITSTATUS(ret)), LOG_TYPE_ERROR); + //log.writeLog(__LINE__, "glusterAssign mount failure: dbroot: " + dbrootID + " error: " + oam.itoa(WEXITSTATUS(ret)), LOG_TYPE_ERROR); ifstream in(tmpLog.c_str()); in.seekg(0, std::ios::end); @@ -6203,7 +6205,7 @@ int ProcessMonitor::glusterUnassign(std::string dbrootID) if ( WEXITSTATUS(ret) != 0 ) { - log.writeLog(__LINE__, "glusterUnassign mount failure: dbroot: " + 
dbrootID + " error: " + oam.itoa(WEXITSTATUS(ret)), LOG_TYPE_ERROR); + //log.writeLog(__LINE__, "glusterUnassign mount failure: dbroot: " + dbrootID + " error: " + oam.itoa(WEXITSTATUS(ret)), LOG_TYPE_ERROR); ifstream in(tmpLog.c_str()); in.seekg(0, std::ios::end); diff --git a/versioning/BRM/masterdbrmnode.cpp b/versioning/BRM/masterdbrmnode.cpp index 0fa5a2a51..68add1c8b 100644 --- a/versioning/BRM/masterdbrmnode.cpp +++ b/versioning/BRM/masterdbrmnode.cpp @@ -1278,7 +1278,7 @@ void MasterDBRMNode::doReload(messageqcpp::IOSocket* sock) for (i = 0; i < (int) slaves.size(); i++) { - MessageQueueClientPool::releaseInstance(slaves[i]); + MessageQueueClientPool::deleteInstance(slaves[i]); slaves[i] = NULL; } diff --git a/versioning/BRM/slavecomm.cpp b/versioning/BRM/slavecomm.cpp index eef66bea4..097cf3959 100644 --- a/versioning/BRM/slavecomm.cpp +++ b/versioning/BRM/slavecomm.cpp @@ -2195,7 +2195,7 @@ int SlaveComm::replayJournal(string prefix) fName = prefix + "_journal"; } - const char* filename = journalName.c_str(); + const char* filename = fName.c_str(); IDBDataFile* journalf = IDBDataFile::open( IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "rb", 0);