1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-01 06:46:55 +03:00

Merge branch 'columnstore-1.4.3' into merge-hotfix-patches-1.5

Conflicts:
	VERSION
	dbcon/mysql/ha_mcs.cpp
	dbcon/mysql/ha_mcs_execplan.cpp
	dbcon/mysql/ha_mcs_impl.cpp
	dbcon/mysql/ha_mcs_pushdown.cpp
	oam/install_scripts/columnstore-post-install.in
	oam/install_scripts/columnstore-pre-uninstall.in
	oam/install_scripts/columnstore.in
	oam/install_scripts/post-mysql-install
This commit is contained in:
Patrick LeBlanc
2020-04-27 17:17:55 -04:00
11 changed files with 360 additions and 134 deletions

View File

@ -2291,6 +2291,21 @@ int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows)
cal_impl_if::gp_walk_info gwi;
gwi.thd = thd;
if (thd->slave_thread && !get_replication_slave(thd) && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
{
if (affected_rows)
*affected_rows = 0;
return 0;
}
if (execute)
{
rc = doUpdateDelete(thd, gwi);
@ -4919,6 +4934,17 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
return 0;
}
if (thd->slave_thread && !get_replication_slave(thd) && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return 0;
//Update and delete code
if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
return doUpdateDelete(thd, gwi);

View File

@ -32,6 +32,7 @@ if [ ! -f @ENGINE_SYSCONFDIR@/columnstore/Columnstore.xml ]; then
fi
if [ -f @ENGINE_SYSCONFDIR@/columnstore/storagemanager.cnf.rpmsave ]; then
/bin/cp -f @ENGINE_SYSCONFDIR@/columnstore/storagemanager.cnf @ENGINE_SYSCONFDIR@/columnstore/storagemanager.cnf.new
/bin/cp -f @ENGINE_SYSCONFDIR@/columnstore/storagemanager.cnf.rpmsave @ENGINE_SYSCONFDIR@/columnstore/storagemanager.cnf
fi

View File

@ -5996,12 +5996,12 @@ bool Oam::autoMovePmDbroot(std::string residePM)
if ( ret != 0 )
{
writeLog("FAILURE: Error assigning gluster dbroot# " + itoa(dbrootID), LOG_TYPE_ERROR );
writeLog("FAILURE: Error assigning gluster dbroot# " + itoa(dbrootID) +" ret: " + itoa(ret), LOG_TYPE_ERROR );
}
}
catch (...)
{
writeLog("FAILURE: Error assigning gluster dbroot# " + itoa(dbrootID), LOG_TYPE_ERROR );
writeLog("EXCEPTION FAILURE: Error assigning gluster dbroot# " + itoa(dbrootID), LOG_TYPE_ERROR );
}
// check if a copy is available when residePM returns
@ -7448,15 +7448,15 @@ void Oam::removeDbroot(DBRootConfigList& dbrootlist)
catch (exception& e)
{
cout << endl << "**** glusterctl API exception: " << e.what() << endl;
cerr << "FAILURE: Error assigning gluster dbroot# " + itoa(dbrootID) << endl;
writeLog("FAILURE: Error assigning gluster dbroot# " + itoa(dbrootID), LOG_TYPE_ERROR );
cerr << "FAILURE: Error deleting gluster dbroot# " + itoa(dbrootID) << endl;
writeLog("FAILURE: Error deleting gluster dbroot# " + itoa(dbrootID), LOG_TYPE_ERROR );
exceptionControl("removeDbroot", API_FAILURE);
}
catch (...)
{
cout << endl << "**** glusterctl API exception: UNKNOWN" << endl;
cerr << "FAILURE: Error assigning gluster dbroot# " + itoa(dbrootID) << endl;
writeLog("FAILURE: Error assigning gluster dbroot# " + itoa(dbrootID), LOG_TYPE_ERROR );
cerr << "FAILURE: Error deleting gluster dbroot# " + itoa(dbrootID) << endl;
writeLog("FAILURE: Error deleting gluster dbroot# " + itoa(dbrootID), LOG_TYPE_ERROR );
exceptionControl("removeDbroot", API_FAILURE);
}
}
@ -9151,7 +9151,7 @@ int Oam::glusterctl(GLUSTER_COMMANDS command, std::string argument1, std::string
string dbr = sysConfig->getConfig("SystemModuleConfig", ModuleDBRootID);
string command = "" + DataRedundancyConfigs[pm].pmIpAddr +
":/dbroot" + dbr + " /var/lib/columnstore/data" + dbr +
" glusterfs defaults,direct-io-mode=enable 00";
" glusterfs defaults,direct-io-mode=enable 0 0";
string toPM = "pm" + itoa(pm + 1);
distributeFstabUpdates(command, toPM);
}

View File

@ -155,6 +155,7 @@ void OamCache::checkReload()
}
sleep(1);
++retry;
//cout << "pm " << *it << " -> connection " << (i-1) << endl;
}

View File

@ -47,6 +47,7 @@ using namespace threadpool;
using namespace config;
bool runStandby = false;
bool MsgThreadActive = false;
bool runCold = false;
string systemName = "system";
string iface_name;
@ -412,6 +413,9 @@ int main(int argc, char** argv)
log.writeLog(__LINE__, "ERROR: makeConfig failed", LOG_TYPE_ERROR);
}
// TODO: This is called before MessageThread is created.
// Doesn't break anything but can be removed as it's done after MessageThread creation.
try
{
oam.distributeConfigFile();
@ -474,8 +478,6 @@ static void messageThread(Configuration config)
sleep (1);
}
log.writeLog(__LINE__, "Message Thread started ..", LOG_TYPE_DEBUG);
//read and cleanup port before trying to use
try
{
@ -489,6 +491,8 @@ static void messageThread(Configuration config)
{
}
log.writeLog(__LINE__, "Message Thread started ..", LOG_TYPE_DEBUG);
//
//waiting for request
//
@ -499,7 +503,7 @@ static void messageThread(Configuration config)
try
{
MessageQueueServer procmgr("ProcMgr");
MsgThreadActive = true;
for (;;)
{
try
@ -1645,17 +1649,17 @@ void pingDeviceThread()
if (busy)
break;
//set query system state not ready
processManager.setQuerySystemState(false);
processManager.setSystemState(oam::BUSY_INIT);
processManager.reinitProcessType("cpimport");
// halt the dbrm
oam.dbrmctl("halt");
log.writeLog(__LINE__, "'dbrmctl halt' done", LOG_TYPE_DEBUG);
//set query system state not ready
processManager.setQuerySystemState(false);
processManager.setSystemState(oam::BUSY_INIT);
aManager.sendAlarmReport(moduleName.c_str(), MODULE_DOWN_AUTO, CLEAR);
//send notification
@ -1720,6 +1724,36 @@ void pingDeviceThread()
//set query system state ready
processManager.setQuerySystemState(true);
// waiting until dml are ACTIVE
// disableModule is going to trigger DMLProc to restart wait for it
int retry = 0;
while (retry < 30)
{
ProcessStatus DMLprocessstatus;
try
{
oam.getProcessStatus("DMLProc", config.moduleName(), DMLprocessstatus);
}
catch (exception& ex)
{}
catch (...)
{}
if (DMLprocessstatus.ProcessOpState == oam::BUSY_INIT)
log.writeLog(__LINE__, "Waiting for DMLProc to finish rollback", LOG_TYPE_DEBUG);
if (DMLprocessstatus.ProcessOpState == oam::ACTIVE)
break;
if (DMLprocessstatus.ProcessOpState == oam::FAILED)
break;
// wait some more
sleep(2);
++retry;
}
goto break_case;
}
}
@ -2012,6 +2046,7 @@ void pingDeviceThread()
log.writeLog(__LINE__, "Module failed to auto start: " + moduleName, LOG_TYPE_CRITICAL);
if ( amazon )
processManager.setSystemState(oam::FAILED);
else
@ -2020,6 +2055,35 @@ void pingDeviceThread()
//set query system state ready
processManager.setQuerySystemState(true);
// waiting until dml are ACTIVE
// disableModule is going to trigger DMLProc to restart wait for it
int retry = 0;
while (retry < 30)
{
ProcessStatus DMLprocessstatus;
try
{
oam.getProcessStatus("DMLProc", config.moduleName(), DMLprocessstatus);
}
catch (exception& ex)
{}
catch (...)
{}
if (DMLprocessstatus.ProcessOpState == oam::BUSY_INIT)
log.writeLog(__LINE__, "Waiting for DMLProc to finish rollback", LOG_TYPE_DEBUG);
if (DMLprocessstatus.ProcessOpState == oam::ACTIVE)
break;
if (DMLprocessstatus.ProcessOpState == oam::FAILED)
break;
// wait some more
sleep(2);
++retry;
}
//clear count
moduleInfoList[moduleName] = 0;
}
@ -2081,21 +2145,22 @@ void pingDeviceThread()
Configuration config;
log.writeLog(__LINE__, "*** module is down: " + moduleName, LOG_TYPE_CRITICAL);
//set query system state not ready
processManager.setQuerySystemState(false);
processManager.setSystemState(oam::BUSY_INIT);
processManager.reinitProcessType("cpimport");
// halt the dbrm
oam.dbrmctl("halt");
log.writeLog(__LINE__, "'dbrmctl halt' done", LOG_TYPE_DEBUG);
//set query system state not ready
processManager.setQuerySystemState(false);
processManager.setSystemState(oam::BUSY_INIT);
//string cmd = "/etc/init.d/glusterd restart > /dev/null 2>&1";
//system(cmd.c_str());
// call for a reload in case cpimport was running and
// some cleanup is needed on dbrmcontroller thats active before continuing
oam.dbrmctl("reload");
log.writeLog(__LINE__, "'dbrmctl reload' done", LOG_TYPE_DEBUG);
//send notification
oam.sendDeviceNotification(moduleName, MODULE_DOWN);
@ -2109,9 +2174,7 @@ void pingDeviceThread()
//set module to disable state
processManager.disableModule(moduleName, false);
//call dbrm control
oam.dbrmctl("reload");
log.writeLog(__LINE__, "'dbrmctl reload' done", LOG_TYPE_DEBUG);
// if pm, move dbroots to other pms
if ( ( moduleName.find("pm") == 0 && !amazon && ( DBRootStorageType != "internal") ) ||
@ -2142,6 +2205,9 @@ void pingDeviceThread()
{
processManager.setModuleState(moduleName, oam::AUTO_DISABLED);
//call dbrm control
oam.dbrmctl("reload");
log.writeLog(__LINE__, "'dbrmctl reload' done", LOG_TYPE_DEBUG);
// resume the dbrm
oam.dbrmctl("resume");
log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG);
@ -2149,6 +2215,36 @@ void pingDeviceThread()
//set query system state ready
processManager.setQuerySystemState(true);
// waiting until dml are ACTIVE
// disableModule is going to trigger DMLProc to restart wait for it
int retry = 0;
while (retry < 30)
{
ProcessStatus DMLprocessstatus;
try
{
oam.getProcessStatus("DMLProc", config.moduleName(), DMLprocessstatus);
}
catch (exception& ex)
{}
catch (...)
{}
if (DMLprocessstatus.ProcessOpState == oam::BUSY_INIT)
log.writeLog(__LINE__, "Waiting for DMLProc to finish rollback", LOG_TYPE_DEBUG);
if (DMLprocessstatus.ProcessOpState == oam::ACTIVE)
break;
if (DMLprocessstatus.ProcessOpState == oam::FAILED)
break;
// wait some more
sleep(2);
++retry;
}
break;
}
}
@ -2160,6 +2256,9 @@ void pingDeviceThread()
{
if ( moduleName.find("um") == 0 )
{
//call dbrm control
oam.dbrmctl("reload");
log.writeLog(__LINE__, "'dbrmctl reload' done", LOG_TYPE_DEBUG);
// resume the dbrm
oam.dbrmctl("resume");
log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG);
@ -2350,8 +2449,8 @@ void pingDeviceThread()
}
}
//set recycle process
processManager.recycleProcess(moduleName);
//set reinit process
processManager.reinitProcesses();
//set query system state ready
processManager.setQuerySystemState(true);
@ -2365,6 +2464,9 @@ void pingDeviceThread()
( opState != oam::AUTO_DISABLED ) )
{
//call dbrm control
oam.dbrmctl("reload");
log.writeLog(__LINE__, "'dbrmctl reload' done", LOG_TYPE_DEBUG);
// resume the dbrm
oam.dbrmctl("resume");
log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG);
@ -2376,13 +2478,45 @@ void pingDeviceThread()
else
{
// non-amazon
//call dbrm control
oam.dbrmctl("reload");
log.writeLog(__LINE__, "'dbrmctl reload' done", LOG_TYPE_DEBUG);
// resume the dbrm
oam.dbrmctl("resume");
log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG);
//set recycle process
processManager.recycleProcess(moduleName);
// waiting until dml are ACTIVE
// disableModule is going to trigger DMLProc to restart wait for it
int retry = 0;
while (retry < 30)
{
ProcessStatus DMLprocessstatus;
try
{
oam.getProcessStatus("DMLProc", config.moduleName(), DMLprocessstatus);
}
catch (exception& ex)
{}
catch (...)
{}
if (DMLprocessstatus.ProcessOpState == oam::BUSY_INIT)
log.writeLog(__LINE__, "Waiting for DMLProc to finish rollback", LOG_TYPE_DEBUG);
if (DMLprocessstatus.ProcessOpState == oam::ACTIVE)
break;
if (DMLprocessstatus.ProcessOpState == oam::FAILED)
break;
// wait some more
sleep(2);
++retry;
}
// restart DMLProc again to retrigger rollback with all dbroots connected
processManager.restartProcessType("DMLProc");
//set query system state ready
processManager.setQuerySystemState(true);
}

View File

@ -53,6 +53,7 @@ pthread_mutex_t THREAD_LOCK;
extern string cloud;
extern bool amazon;
extern bool runStandby;
extern bool MsgThreadActive;
extern string iface_name;
extern string PMInstanceType;
extern string UMInstanceType;
@ -920,8 +921,6 @@ void processMSG(messageqcpp::IOSocket* cfIos)
status = processManager.disableModule(moduleName, true);
log.writeLog(__LINE__, "Disable Module Completed on " + moduleName, LOG_TYPE_INFO);
processManager.recycleProcess(moduleName);
//check for SIMPLEX Processes on mate might need to be started
processManager.checkSimplexModule(moduleName);
@ -3806,8 +3805,6 @@ int ProcessManager::disableModule(string target, bool manualFlag)
//set Columnstore.xml enable state
setEnableState( target, SnewState);
log.writeLog(__LINE__, "disableModule - setEnableState", LOG_TYPE_DEBUG);
//sleep a bit to give time for the state change to apply
sleep(1);
@ -3816,8 +3813,6 @@ int ProcessManager::disableModule(string target, bool manualFlag)
{
if ( updatePMSconfig() != API_SUCCESS )
return API_FAILURE;
log.writeLog(__LINE__, "disableModule - Updated PM server Count", LOG_TYPE_DEBUG);
}
//Update DBRM section of Columnstore.xml
@ -3825,19 +3820,36 @@ int ProcessManager::disableModule(string target, bool manualFlag)
{
return API_FAILURE;
}
processManager.recycleProcess(target);
//check for SIMPLEX Processes on mate might need to be started
processManager.checkSimplexModule(target);
//distribute config file
distributeConfigFile("system");
processManager.reinitProcesses();
log.writeLog(__LINE__, "disableModule successfully complete for " + target, LOG_TYPE_DEBUG);
return API_SUCCESS;
}
void ProcessManager::reinitProcesses(std::string skipModule)
{
Oam oam;
log.writeLog(__LINE__, "reinitProcesses... ", LOG_TYPE_DEBUG);
reinitProcessType("DBRMWorkerNode");
reinitProcessType("WriteEngineServer");
restartProcessType("ExeMgr",skipModule);
sleep(1);
restartProcessType("DDLProc",skipModule);
sleep(1);
restartProcessType("DMLProc",skipModule);
sleep(3);
log.writeLog(__LINE__, "reinitProcesses complete", LOG_TYPE_DEBUG);
}
/******************************************************************************************
* @brief recycleProcess
*
@ -4886,6 +4898,7 @@ int ProcessManager::reinitProcessType( std::string processName )
if ( systemprocessstatus.processstatus[i].ProcessName == "ServerMonitor" )
{
// found one, request reinit of it
log.writeLog(__LINE__, "reinitProcessType: cpimport" + systemprocessstatus.processstatus[i].Module, LOG_TYPE_DEBUG);
retStatus = processManager.reinitProcess(systemprocessstatus.processstatus[i].Module,
"cpimport");
log.writeLog(__LINE__, "reinitProcessType: ACK received from Process-Monitor, return status = " + oam.itoa(retStatus), LOG_TYPE_DEBUG);
@ -6514,12 +6527,12 @@ int ProcessManager::sendMsgProcMon( std::string module, ByteStream msg, int requ
catch (SocketClosed& ex)
{
string error = ex.what();
// log.writeLog(__LINE__, "EXCEPTION ERROR on mqRequest.read, module " + module + " : " + error, LOG_TYPE_ERROR);
log.writeLog(__LINE__, "EXCEPTION ERROR on mqRequest.read, module " + module + " : " + error, LOG_TYPE_ERROR);
return returnStatus;
}
catch (...)
{
// log.writeLog(__LINE__, "EXCEPTION ERROR on mqRequest.read: Caught unknown exception! module " + module, LOG_TYPE_ERROR);
log.writeLog(__LINE__, "EXCEPTION ERROR on mqRequest.read: Caught unknown exception! module " + module, LOG_TYPE_ERROR);
return returnStatus;
}
@ -6567,11 +6580,11 @@ int ProcessManager::sendMsgProcMon( std::string module, ByteStream msg, int requ
catch (exception& ex)
{
string error = ex.what();
// log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueClient: " + error, LOG_TYPE_ERROR);
log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueClient: " + error, LOG_TYPE_ERROR);
}
catch (...)
{
// log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueClient: Caught unknown exception!", LOG_TYPE_ERROR);
log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueClient: Caught unknown exception!", LOG_TYPE_ERROR);
}
return returnStatus;
@ -6734,8 +6747,6 @@ void ProcessManager::setQuerySystemState(bool set)
Oam oam;
BRM::DBRM dbrm;
log.writeLog(__LINE__, "setQuerySystemState called = " + oam.itoa(set), LOG_TYPE_DEBUG);
try
{
dbrm.setSystemQueryReady(set);
@ -8005,8 +8016,6 @@ int ProcessManager::updatePMSconfig( bool check )
vector<string> IpAddrs;
vector<int> nicIDs;
log.writeLog(__LINE__, "updatePMSconfig Started", LOG_TYPE_DEBUG);
pthread_mutex_lock(&THREAD_LOCK);
ModuleTypeConfig moduletypeconfig;
@ -8166,8 +8175,6 @@ int ProcessManager::updatePMSconfig( bool check )
sysConfig1->write();
pthread_mutex_unlock(&THREAD_LOCK);
log.writeLog(__LINE__, "updatePMSconfig completed", LOG_TYPE_DEBUG);
return API_SUCCESS;
}
catch (...)
@ -8195,8 +8202,6 @@ int ProcessManager::updateWorkerNodeconfig()
vector <string> module;
vector <string> ipadr;
log.writeLog(__LINE__, "updateWorkerNodeconfig Started", LOG_TYPE_DEBUG);
pthread_mutex_lock(&THREAD_LOCK);
//setup current module as work-node #1 by entering it in first
@ -8309,8 +8314,6 @@ int ProcessManager::updateWorkerNodeconfig()
sysConfig3->write();
pthread_mutex_unlock(&THREAD_LOCK);
log.writeLog(__LINE__, "updateWorkerNodeconfig completed", LOG_TYPE_DEBUG);
return API_SUCCESS;
}
@ -8511,8 +8514,6 @@ int ProcessManager::setPMProcIPs( std::string moduleName, std::string processNam
Oam oam;
ModuleConfig moduleconfig;
log.writeLog(__LINE__, "setPMProcIPs called for " + moduleName, LOG_TYPE_DEBUG);
pthread_mutex_lock(&THREAD_LOCK);
if ( processName == oam::UnassignedName || processName == "DDLProc")
@ -8631,8 +8632,6 @@ int ProcessManager::distributeConfigFile(std::string name, std::string file)
Oam oam;
int returnStatus = oam::API_SUCCESS;
log.writeLog(__LINE__, "distributeConfigFile called for " + name + " file = " + file, LOG_TYPE_DEBUG);
string dirName = std::string(MCSSYSCONFDIR) + "/columnstore/";
string fileName = dirName + file;
@ -8817,7 +8816,6 @@ int ProcessManager::getDBRMData(messageqcpp::IOSocket fIos, std::string moduleNa
// StorageManager: Need to make these existence checks use an idbfilesystem op if we
// decide to put the BRM-managed files in cloud storage
string currentDbrmFile;
log.writeLog(__LINE__, "I declare that I am ProcMgr, and I am running getDBRMData!", LOG_TYPE_DEBUG);
IDBFileSystem &fs = IDBPolicy::getFs(currentFileName.c_str());
boost::scoped_ptr<IDBDataFile> oldFile(IDBDataFile::open(IDBPolicy::getType(currentFileName.c_str(),
IDBPolicy::WRITEENG),
@ -9198,6 +9196,14 @@ int ProcessManager::switchParentOAMModule(std::string newActiveModuleName)
//clear run standby flag;
runStandby = false;
int retryCount = 0;
//sleep, give time for message thread to startup
while (!MsgThreadActive && retryCount < 10)
{
log.writeLog(__LINE__, "Waiting for Message Thread...", LOG_TYPE_DEBUG);
sleep(5);
++retryCount;
}
int moduleID = atoi(newActiveModuleName.substr(MAX_MODULE_TYPE_SIZE, MAX_MODULE_ID_SIZE).c_str());
@ -9883,9 +9889,6 @@ int ProcessManager::OAMParentModuleChange()
log.writeLog(__LINE__, " ", LOG_TYPE_DEBUG);
log.writeLog(__LINE__, "*** OAMParentModule outage, OAM Parent Module change-over started ***", LOG_TYPE_DEBUG);
//run save.brm script
processManager.saveBRM(true, false);
gdownActiveOAMModule = downOAMParentName;
// update Columnstore.xml entries
@ -9933,10 +9936,19 @@ int ProcessManager::OAMParentModuleChange()
//clear run standby flag;
runStandby = false;
int retryCount = 0;
//sleep, give time for message thread to startup
while (!MsgThreadActive && retryCount < 10)
{
log.writeLog(__LINE__, "Waiting for Message Thread...", LOG_TYPE_DEBUG);
sleep(5);
++retryCount;
}
//run save.brm script
//Nope turns out this has to be done first...
processManager.saveBRM(false);
try
{
oam.autoMovePmDbroot(downOAMParentName);
@ -10032,19 +10044,23 @@ int ProcessManager::OAMParentModuleChange()
//do it here to get current processes active faster to process queries faster
processManager.setProcessStates(downOAMParentName, oam::AUTO_OFFLINE);
//set other down modules to disable state
//set OTHER down modules to disable state
vector<string>::iterator pt1 = downModuleList.begin();
for ( ; pt1 != downModuleList.end() ; pt1++)
{
// Don't do this again for downOAMParentName we just did it 3 lines ago
if (*pt1 != downOAMParentName)
{
disableModule(*pt1, false);
processManager.setProcessStates(*pt1, oam::AUTO_OFFLINE);
}
}
//distribute config file
distributeConfigFile("system");
//restart local module
//restart local module WHY??
processManager.stopModule(config.moduleName(), oam::FORCEFUL, true);
string localModule = config.moduleName();
@ -10060,8 +10076,11 @@ int ProcessManager::OAMParentModuleChange()
status = startsystemthreadStatus;
}
reinitProcessType("cpimport");
// waiting until dml are ACTIVE
while (true)
int retry = 0;
while (retry < 30)
{
ProcessStatus DMLprocessstatus;
@ -10085,25 +10104,18 @@ int ProcessManager::OAMParentModuleChange()
// wait some more
sleep(2);
++retry;
}
//set recycle process
processManager.recycleProcess(downOAMParentName);
//restart/reinit processes to force their release of the controller node port
if ( ( config.ServerInstallType() == oam::INSTALL_COMBINE_DM_UM_PM) &&
( moduleNameList.size() <= 0 && config.moduleType() == "pm") )
{
status = 0;
// Do Nothing
}
else
{
// processManager.restartProcessType("mysql", localModule);
// processManager.restartProcessType("ExeMgr", localModule);
// processManager.restartProcessType("WriteEngineServer", localModule);
// processManager.reinitProcessType("DBRMWorkerNode");
//send message to start new Standby Process-Manager, if needed
newStandbyModule = getStandbyModule();
@ -10184,15 +10196,6 @@ int ProcessManager::OAMParentModuleChange()
}
}
//restart DDLProc/DMLProc to perform any rollbacks, if needed
//dont rollback in amazon, wait until down pm recovers
// if ( ( config.ServerInstallType() != oam::INSTALL_COMBINE_DM_UM_PM )
// && !amazon ) {
// processManager.restartProcessType("DDLProc", config.moduleName());
// sleep(1);
// processManager.restartProcessType("DMLProc", config.moduleName());
// }
if ( config.ServerInstallType() == oam::INSTALL_COMBINE_DM_UM_PM )
{
//change master MySQL Replication setup
@ -10201,8 +10204,38 @@ int ProcessManager::OAMParentModuleChange()
processManager.setMySQLReplication(devicenetworklist, config.moduleName());
}
//set query system state not ready
processManager.setQuerySystemState(true);
processManager.restartProcessType("DBRMControllerNode");
processManager.reinitProcesses();
// waiting until dml are ACTIVE
retry = 0;
while (retry < 30)
{
ProcessStatus DMLprocessstatus;
try
{
oam.getProcessStatus("DMLProc", config.moduleName(), DMLprocessstatus);
}
catch (exception& ex)
{}
catch (...)
{}
if (DMLprocessstatus.ProcessOpState == oam::BUSY_INIT)
log.writeLog(__LINE__, "Waiting for DMLProc to finish rollback", LOG_TYPE_DEBUG);
if (DMLprocessstatus.ProcessOpState == oam::ACTIVE)
break;
if (DMLprocessstatus.ProcessOpState == oam::FAILED)
break;
// wait some more
sleep(2);
++retry;
}
// clear alarm
aManager.sendAlarmReport(config.moduleName().c_str(), MODULE_SWITCH_ACTIVE, CLEAR);
@ -10301,11 +10334,29 @@ std::string ProcessManager::getStandbyModule()
string backupStandbyModule = "NONE";
string newStandbyModule = "NONE";
log.writeLog(__LINE__, "getStandbyModule called", LOG_TYPE_DEBUG);
//check if gluster, if so then find PMs that have copies of DBROOT #1
string pmList = "";
try
{
oam.getProcessStatus(systemprocessstatus);
for ( unsigned int i = 0 ; i < systemprocessstatus.processstatus.size(); i++)
{
if ( systemprocessstatus.processstatus[i].ProcessName == "ProcessManager" &&
systemprocessstatus.processstatus[i].ProcessOpState == oam::STANDBY )
//already have a hot-standby
return "";
}
}
catch (exception& ex)
{
log.writeLog(__LINE__, "EXCEPTION ERROR on getProcessStatus: " + string(ex.what()), LOG_TYPE_ERROR);
}
catch (...)
{
log.writeLog(__LINE__, "EXCEPTION ERROR on getProcessStatus: Caught unknown exception!", LOG_TYPE_ERROR);
}
if (DataRedundancyConfig == "y")
{
@ -10314,8 +10365,6 @@ std::string ProcessManager::getStandbyModule()
string errmsg;
oam.glusterctl(oam::GLUSTER_WHOHAS, "1", pmList, errmsg);
log.writeLog(__LINE__, "GLUSTER_WHOHAS called:" + pmList, LOG_TYPE_DEBUG);
boost::char_separator<char> sep(" ");
boost::tokenizer< boost::char_separator<char> > tokens(pmList, sep);
@ -10357,8 +10406,6 @@ std::string ProcessManager::getStandbyModule()
//not gluster, check by status
try
{
oam.getProcessStatus(systemprocessstatus);
for ( unsigned int i = 0 ; i < systemprocessstatus.processstatus.size(); i++)
{
if ( systemprocessstatus.processstatus[i].ProcessName == "ProcessManager" &&
@ -10431,8 +10478,6 @@ bool ProcessManager::setStandbyModule(std::string newStandbyModule, bool send)
{
Oam oam;
log.writeLog(__LINE__, "setStandbyModule called", LOG_TYPE_DEBUG);
if ( newStandbyModule.empty() )
return true;
@ -10502,8 +10547,6 @@ bool ProcessManager::clearStandbyModule()
{
Oam oam;
log.writeLog(__LINE__, "clearStandbyModule called", LOG_TYPE_DEBUG);
pthread_mutex_lock(&THREAD_LOCK);
Configuration config;
@ -10793,8 +10836,6 @@ int ProcessManager::setMySQLReplication(oam::DeviceNetworkList devicenetworklist
{
Oam oam;
log.writeLog(__LINE__, "setMySQLReplication called", LOG_TYPE_DEBUG);
string MySQLRep;
try
@ -11121,7 +11162,15 @@ int ProcessManager::glusterAssign(std::string moduleName, std::string dbroot)
msg << dbroot;
int returnStatus = sendMsgProcMon( moduleName, msg, requestID, 30 );
int retry = 0;
// Try this for a minute because in failover the node returning to service may not be listening yet
while(returnStatus != API_SUCCESS && retry < 60)
{
log.writeLog(__LINE__, "glusterAssign retrying...", LOG_TYPE_DEBUG);
returnStatus = sendMsgProcMon( moduleName, msg, requestID, 30 );
sleep(1);
++retry;
}
if ( returnStatus == API_SUCCESS)
{
//log the success event
@ -11151,7 +11200,15 @@ int ProcessManager::glusterUnassign(std::string moduleName, std::string dbroot)
msg << dbroot;
int returnStatus = sendMsgProcMon( moduleName, msg, requestID, 30 );
int retry = 0;
// Try this for a minute because in failover the node returning to service may not be listening yet
while(returnStatus != API_SUCCESS && retry < 60)
{
log.writeLog(__LINE__, "glusterUnassign retrying...", LOG_TYPE_DEBUG);
returnStatus = sendMsgProcMon( moduleName, msg, requestID, 30 );
sleep(1);
++retry;
}
if ( returnStatus == API_SUCCESS)
{
//log the success event

View File

@ -299,6 +299,10 @@ public:
*/
int disableModule(std::string target, bool manualFlag);
/**
*@brief reinit Processes trying to replace recycleProcess
*/
void reinitProcesses(std::string skipModule = "none");
/**
*@brief recycle Processes
*/

View File

@ -3210,6 +3210,7 @@ void processStatusMSG(messageqcpp::IOSocket* cfIos)
oam::DeviceNetworkConfig devicenetworkconfig;
oam::DeviceNetworkList devicenetworklist;
string value;
MonitorConfig currentConfig;
*msg >> moduleCount;
@ -3223,7 +3224,7 @@ void processStatusMSG(messageqcpp::IOSocket* cfIos)
log.writeLog(__LINE__, "statusControl: REQUEST RECEIVED: Add Module");
string moduleType = devicenetworkconfig.DeviceName.substr(0, MAX_MODULE_TYPE_SIZE);
string OAMParentModuleType = config.OAMParentName().substr(0, 2);
string OAMParentModuleType = currentConfig.OAMParentName().substr(0, 2);
// add to module status shared memory
DeviceNetworkList::iterator pt = devicenetworklist.begin();

View File

@ -460,6 +460,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
{
Oam oam;
ByteStream ackMsg;
MonitorConfig currentConfig;
ByteStream::byte messageType;
ByteStream::byte requestID;
@ -809,6 +810,24 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
if ( processName == "cpimport" )
{
system("pkill -sighup cpimport");
for (int i=0; i < 10; i++)
{
//get pid
char buf[512];
FILE *cmd_pipe = popen("pidof -s cpimport", "r");
fgets(buf, 512, cmd_pipe);
pid_t pid = strtoul(buf, NULL, 10);
pclose( cmd_pipe );
if (pid)
sleep(2);
else
break;
}
// kill other processes
system("pkill -9 cpimport.bin");
}
else
{
@ -2217,6 +2236,7 @@ pid_t ProcessMonitor::startProcess(string processModuleType, string processName,
char* argList[MAXARGUMENTS];
unsigned int i = 0;
MonitorLog log;
MonitorConfig currentConfig;
unsigned int numAugs = 0;
Oam oam;
SystemProcessStatus systemprocessstatus;
@ -3736,7 +3756,7 @@ int ProcessMonitor::updateConfig()
{
//ProcMon log file
MonitorLog log;
// MonitorConfig config;
MonitorConfig currentConfig;
// ProcessMonitor aMonitor(config, log);
Oam oam;
@ -3758,7 +3778,7 @@ int ProcessMonitor::updateConfig()
}
//Update a map for application launch ID for this Process-Monitor
string OAMParentModuleType = config.OAMParentName().substr(0, MAX_MODULE_TYPE_SIZE);
string OAMParentModuleType = currentConfig.OAMParentName().substr(0, MAX_MODULE_TYPE_SIZE);
string systemModuleType = config.moduleName().substr(0, MAX_MODULE_TYPE_SIZE);
for ( unsigned int i = 0 ; i < systemprocessconfig.processconfig.size(); i++)
@ -4117,8 +4137,6 @@ int ProcessMonitor::createDataDirs(std::string cloud)
MonitorLog log;
Oam oam;
log.writeLog(__LINE__, "createDataDirs called", LOG_TYPE_DEBUG);
if ( config.moduleType() == "um" &&
( cloud == "amazon-ec2" || cloud == "amazon-vpc") )
{
@ -4278,8 +4296,6 @@ int ProcessMonitor::getDBRMdata(string *path)
Oam oam;
ByteStream msg;
log.writeLog(__LINE__, "getDBRMdata called", LOG_TYPE_DEBUG);
int returnStatus = API_FAILURE;
msg << (ByteStream::byte) GETDBRMDATA;
@ -4441,9 +4457,9 @@ int ProcessMonitor::getDBRMdata(string *path)
//create journal file if none come across
if ( !journalFile)
{
string journalFilename = "/var/lib/columnstore/data1/systemFiles/dbrm/BRM_saves_journal";
IDBDataFile *idbJournalFile = IDBDataFile::open(IDBPolicy::getType(journalFilename.c_str(),
IDBPolicy::WRITEENG), journalFilename.c_str(), "w", 0);
bf::path pJournalFilename(pTmp / "BRM_saves_journal");
IDBDataFile *idbJournalFile = IDBDataFile::open(IDBPolicy::getType(pJournalFilename.string().c_str(),
IDBPolicy::WRITEENG), pJournalFilename.string().c_str(), "w", 0);
delete idbJournalFile;
//string cmd = "touch " + startup::StartUp::installDir() + "/data1/systemFiles/dbrm/BRM_saves_journal";
//system(cmd.c_str());
@ -4964,8 +4980,6 @@ int ProcessMonitor::runMasterRep(std::string& masterLogFile, std::string& master
{
Oam oam;
log.writeLog(__LINE__, "runMasterRep function called", LOG_TYPE_DEBUG);
SystemModuleTypeConfig systemModuleTypeConfig;
try
@ -5140,8 +5154,6 @@ int ProcessMonitor::runSlaveRep(std::string& masterLogFile, std::string& masterL
{
Oam oam;
log.writeLog(__LINE__, "runSlaveRep function called", LOG_TYPE_DEBUG);
// get master replicaion module IP Address
string PrimaryUMModuleName;
oam.getSystemConfig("PrimaryUMModuleName", PrimaryUMModuleName);
@ -5224,8 +5236,6 @@ int ProcessMonitor::runDisableRep()
{
Oam oam;
log.writeLog(__LINE__, "runDisableRep function called", LOG_TYPE_DEBUG);
// mysql port number
string MySQLPort;
@ -5279,8 +5289,6 @@ int ProcessMonitor::runMasterDist(std::string& password, std::string& slaveModul
{
Oam oam;
log.writeLog(__LINE__, "runMasterDist function called", LOG_TYPE_DEBUG);
SystemModuleTypeConfig systemModuleTypeConfig;
try
@ -5405,8 +5413,6 @@ bool ProcessMonitor::amazonIPCheck()
MonitorLog log;
Oam oam;
log.writeLog(__LINE__, "amazonIPCheck function called", LOG_TYPE_DEBUG);
// delete description file so it will create a new one
string tmpLog = tmpLogDir + "/describeInstance.log";
unlink(tmpLog.c_str());
@ -5490,9 +5496,11 @@ bool ProcessMonitor::amazonIPCheck()
// get all ips if parent oam
// get just parent and local if not parent oam
if ( config.moduleName() == config.OAMParentName() ||
MonitorConfig currentConfig;
if ( config.moduleName() == currentConfig.OAMParentName() ||
moduleName == config.moduleName() ||
moduleName == config.OAMParentName() )
moduleName == currentConfig.OAMParentName() )
{
HostConfigList::iterator pt1 = (*pt).hostConfigList.begin();
@ -5700,8 +5708,6 @@ void ProcessMonitor::unmountExtraDBroots()
ModuleConfig moduleconfig;
Oam oam;
log.writeLog(__LINE__, "unmountExtraDBroots called ", LOG_TYPE_DEBUG);
string DBRootStorageType = "internal";
try
@ -5797,8 +5803,6 @@ int ProcessMonitor::checkDataMount()
//check/update the pmMount files
log.writeLog(__LINE__, "checkDataMount called ", LOG_TYPE_DEBUG);
string DBRootStorageType = "internal";
vector <string> dbrootList;
@ -6031,8 +6035,6 @@ void ProcessMonitor::calTotalUmMemory()
//check/update the pmMount files
log.writeLog(__LINE__, "calTotalUmMemory called ", LOG_TYPE_DEBUG);
try
{
sysinfo(&myinfo);
@ -6158,7 +6160,7 @@ int ProcessMonitor::glusterAssign(std::string dbrootID)
if ( WEXITSTATUS(ret) != 0 )
{
log.writeLog(__LINE__, "glusterAssign mount failure: dbroot: " + dbrootID + " error: " + oam.itoa(WEXITSTATUS(ret)), LOG_TYPE_ERROR);
//log.writeLog(__LINE__, "glusterAssign mount failure: dbroot: " + dbrootID + " error: " + oam.itoa(WEXITSTATUS(ret)), LOG_TYPE_ERROR);
ifstream in(tmpLog.c_str());
in.seekg(0, std::ios::end);
@ -6203,7 +6205,7 @@ int ProcessMonitor::glusterUnassign(std::string dbrootID)
if ( WEXITSTATUS(ret) != 0 )
{
log.writeLog(__LINE__, "glusterUnassign mount failure: dbroot: " + dbrootID + " error: " + oam.itoa(WEXITSTATUS(ret)), LOG_TYPE_ERROR);
//log.writeLog(__LINE__, "glusterUnassign mount failure: dbroot: " + dbrootID + " error: " + oam.itoa(WEXITSTATUS(ret)), LOG_TYPE_ERROR);
ifstream in(tmpLog.c_str());
in.seekg(0, std::ios::end);

View File

@ -1278,7 +1278,7 @@ void MasterDBRMNode::doReload(messageqcpp::IOSocket* sock)
for (i = 0; i < (int) slaves.size(); i++)
{
MessageQueueClientPool::releaseInstance(slaves[i]);
MessageQueueClientPool::deleteInstance(slaves[i]);
slaves[i] = NULL;
}

View File

@ -2195,7 +2195,7 @@ int SlaveComm::replayJournal(string prefix)
fName = prefix + "_journal";
}
const char* filename = journalName.c_str();
const char* filename = fName.c_str();
IDBDataFile* journalf = IDBDataFile::open(
IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "rb", 0);