1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

mcol-833 - merge code from 1.0 for missing file fix

This commit is contained in:
david hill
2017-07-26 15:03:52 -05:00
parent 7691f05191
commit 26ac4aa31c
4 changed files with 89 additions and 59 deletions

View File

@ -336,7 +336,7 @@ void DistributedEngineComm::Listen(boost::shared_ptr<MessageQueueClient> client,
Error: Error:
// @bug 488 - error condition! push 0 length bs to messagequeuemap and // @bug 488 - error condition! push 0 length bs to messagequeuemap and
// eventually let jobstep error out. // eventually let jobstep error out.
mutex::scoped_lock lk(fMlock); /* mutex::scoped_lock lk(fMlock);
//cout << "WARNING: DEC READ 0 LENGTH BS FROM " << client->otherEnd()<< endl; //cout << "WARNING: DEC READ 0 LENGTH BS FROM " << client->otherEnd()<< endl;
MessageQueueMap::iterator map_tok; MessageQueueMap::iterator map_tok;
@ -370,7 +370,7 @@ Error:
fPmConnections.swap(tempConns); fPmConnections.swap(tempConns);
pmCount = (pmCount == 0 ? 0 : pmCount - 1); pmCount = (pmCount == 0 ? 0 : pmCount - 1);
//cout << "PMCOUNT=" << pmCount << endl; //cout << "PMCOUNT=" << pmCount << endl;
*/
// send alarm & log it // send alarm & log it
ALARMManager alarmMgr; ALARMManager alarmMgr;
string alarmItem = client->addr2String(); string alarmItem = client->addr2String();
@ -861,7 +861,7 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin
{ {
// @bug 488. error out under such condition instead of re-trying other connection, // @bug 488. error out under such condition instead of re-trying other connection,
// by pushing 0 size bytestream to messagequeue and throw exception // by pushing 0 size bytestream to messagequeue and throw exception
SBS sbs; /* SBS sbs;
lk.lock(); lk.lock();
//cout << "WARNING: DEC WRITE BROKEN PIPE. PMS index = " << index << endl; //cout << "WARNING: DEC WRITE BROKEN PIPE. PMS index = " << index << endl;
MessageQueueMap::iterator map_tok; MessageQueueMap::iterator map_tok;
@ -894,7 +894,7 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin
fPmConnections.swap(tempConns); fPmConnections.swap(tempConns);
pmCount = (pmCount == 0 ? 0 : pmCount - 1); pmCount = (pmCount == 0 ? 0 : pmCount - 1);
} }
*/
// send alarm // send alarm
ALARMManager alarmMgr; ALARMManager alarmMgr;
string alarmItem("UNKNOWN"); string alarmItem("UNKNOWN");

View File

@ -58,6 +58,7 @@ bool HDFS = false;
string localHostName; string localHostName;
string PMwithUM = "n"; string PMwithUM = "n";
string MySQLRep = "n"; string MySQLRep = "n";
string DBRootStorageType = "internal";
// pushing the ACTIVE_ALARMS_FILE to all nodes every 10 seconds. // pushing the ACTIVE_ALARMS_FILE to all nodes every 10 seconds.
const int ACTIVE_ALARMS_PUSHING_INTERVAL = 10; const int ACTIVE_ALARMS_PUSHING_INTERVAL = 10;
@ -1365,6 +1366,9 @@ void pingDeviceThread()
break; break;
//set query system state not ready //set query system state not ready
BRM::DBRM dbrm;
dbrm.setSystemQueryReady(false);
processManager.setQuerySystemState(false); processManager.setQuerySystemState(false);
processManager.setSystemState(oam::BUSY_INIT); processManager.setSystemState(oam::BUSY_INIT);
@ -1380,19 +1384,19 @@ void pingDeviceThread()
//send notification //send notification
oam.sendDeviceNotification(config.moduleName(), MODULE_UP); oam.sendDeviceNotification(config.moduleName(), MODULE_UP);
//set module to enable state
processManager.enableModule(moduleName, oam::AUTO_OFFLINE);
int status; int status;
// if pm, move dbroots back to pm // if shared pm, move dbroots back to pm
if ( ( moduleName.find("pm") == 0 && !amazon ) || if ( ( moduleName.find("pm") == 0 && !amazon && ( DBRootStorageType != "internal") ) ||
( moduleName.find("pm") == 0 && amazon && downActiveOAMModule ) || ( moduleName.find("pm") == 0 && amazon && downActiveOAMModule ) ||
( moduleName.find("pm") == 0 && amazon && AmazonPMFailover == "y") ) { ( moduleName.find("pm") == 0 && amazon && AmazonPMFailover == "y") ) {
//restart to get the versionbuffer files closed so it can be unmounted //restart to get the versionbuffer files closed so it can be unmounted
processManager.restartProcessType("WriteEngineServer", moduleName); processManager.restartProcessType("WriteEngineServer", moduleName);
//set module to enable state
processManager.enableModule(moduleName, oam::AUTO_OFFLINE);
downActiveOAMModule = false; downActiveOAMModule = false;
int retry; int retry;
for ( retry = 0 ; retry < 5 ; retry++ ) for ( retry = 0 ; retry < 5 ; retry++ )
@ -1484,6 +1488,9 @@ void pingDeviceThread()
break; break;
} }
} }
else
//set module to enable state
processManager.enableModule(moduleName, oam::AUTO_OFFLINE);
//restart module processes //restart module processes
int retry = 0; int retry = 0;
@ -1584,14 +1591,6 @@ void pingDeviceThread()
continue; continue;
} }
//call dbrm control, need to resume before start so the getdbrmfiles halt doesn't hang
oam.dbrmctl("reload");
log.writeLog(__LINE__, "'dbrmctl reload' done", LOG_TYPE_DEBUG);
// resume the dbrm
oam.dbrmctl("resume");
log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG);
// next, startmodule // next, startmodule
status = processManager.startModule(moduleName, oam::FORCEFUL, oam::AUTO_OFFLINE); status = processManager.startModule(moduleName, oam::FORCEFUL, oam::AUTO_OFFLINE);
if ( status == oam::API_SUCCESS ) if ( status == oam::API_SUCCESS )
@ -1606,6 +1605,14 @@ void pingDeviceThread()
if ( retry < ModuleProcMonWaitCount ) if ( retry < ModuleProcMonWaitCount )
{ // module successfully started { // module successfully started
//call dbrm control, need to resume before start so the getdbrmfiles halt doesn't hang
oam.dbrmctl("reload");
log.writeLog(__LINE__, "'dbrmctl reload' done", LOG_TYPE_DEBUG);
// resume the dbrm
oam.dbrmctl("resume");
log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG);
//distribute config file //distribute config file
processManager.distributeConfigFile("system"); processManager.distributeConfigFile("system");
sleep(1); sleep(1);
@ -1647,6 +1654,9 @@ void pingDeviceThread()
processManager.restartProcessType("DMLProc", moduleName); processManager.restartProcessType("DMLProc", moduleName);
} }
//enable query stats
dbrm.setSystemQueryReady(true);
//set query system state ready //set query system state ready
processManager.setQuerySystemState(true); processManager.setQuerySystemState(true);
@ -1664,8 +1674,9 @@ void pingDeviceThread()
aManager.sendAlarmReport(moduleName.c_str(), MODULE_DOWN_AUTO, SET); aManager.sendAlarmReport(moduleName.c_str(), MODULE_DOWN_AUTO, SET);
// if pm, move dbroots back to pm // if pm, move dbroots back to pm
if ( ( moduleName.find("pm") == 0 && !amazon ) || if ( ( moduleName.find("pm") == 0 && !amazon && ( DBRootStorageType != "internal") ) ||
( moduleName.find("pm") == 0 && amazon && downActiveOAMModule ) ) { ( moduleName.find("pm") == 0 && amazon && downActiveOAMModule ) ||
( moduleName.find("pm") == 0 && amazon && AmazonPMFailover == "y") ) {
//move dbroots to other modules //move dbroots to other modules
try { try {
log.writeLog(__LINE__, "Call autoMovePmDbroot", LOG_TYPE_DEBUG); log.writeLog(__LINE__, "Call autoMovePmDbroot", LOG_TYPE_DEBUG);
@ -1703,6 +1714,9 @@ void pingDeviceThread()
else else
processManager.setSystemState(oam::ACTIVE); processManager.setSystemState(oam::ACTIVE);
//enable query stats
dbrm.setSystemQueryReady(true);
//set query system state ready //set query system state ready
processManager.setQuerySystemState(true); processManager.setQuerySystemState(true);
@ -1741,8 +1755,13 @@ void pingDeviceThread()
log.writeLog(__LINE__, "module is down: " + moduleName, LOG_TYPE_CRITICAL); log.writeLog(__LINE__, "module is down: " + moduleName, LOG_TYPE_CRITICAL);
//set query system state not ready //set query system state not ready
BRM::DBRM dbrm;
dbrm.setSystemQueryReady(false);
processManager.setQuerySystemState(false); processManager.setQuerySystemState(false);
processManager.setSystemState(oam::BUSY_INIT);
processManager.reinitProcessType("cpimport"); processManager.reinitProcessType("cpimport");
// halt the dbrm // halt the dbrm
@ -1771,25 +1790,24 @@ void pingDeviceThread()
log.writeLog(__LINE__, "'dbrmctl reload' done", LOG_TYPE_DEBUG); log.writeLog(__LINE__, "'dbrmctl reload' done", LOG_TYPE_DEBUG);
// if pm, move dbroots to other pms // if pm, move dbroots to other pms
if ( !amazon || if ( ( moduleName.find("pm") == 0 && !amazon && ( DBRootStorageType != "internal") ) ||
( amazon && AmazonPMFailover == "y") ) { ( moduleName.find("pm") == 0 && amazon && downActiveOAMModule ) ||
if( moduleName.find("pm") == 0 ) { ( moduleName.find("pm") == 0 && amazon && AmazonPMFailover == "y") ) {
try { try {
log.writeLog(__LINE__, "Call autoMovePmDbroot", LOG_TYPE_DEBUG); log.writeLog(__LINE__, "Call autoMovePmDbroot", LOG_TYPE_DEBUG);
oam.autoMovePmDbroot(moduleName); oam.autoMovePmDbroot(moduleName);
log.writeLog(__LINE__, "autoMovePmDbroot success", LOG_TYPE_DEBUG); log.writeLog(__LINE__, "autoMovePmDbroot success", LOG_TYPE_DEBUG);
//distribute config file //distribute config file
processManager.distributeConfigFile("system"); processManager.distributeConfigFile("system");
} }
catch (exception& ex) catch (exception& ex)
{ {
string error = ex.what(); string error = ex.what();
log.writeLog(__LINE__, "EXCEPTION ERROR on autoMovePmDbroot: " + error, LOG_TYPE_DEBUG); log.writeLog(__LINE__, "EXCEPTION ERROR on autoMovePmDbroot: " + error, LOG_TYPE_DEBUG);
} }
catch(...) catch(...)
{ {
log.writeLog(__LINE__, "EXCEPTION ERROR on autoMovePmDbroot: Caught unknown exception!", LOG_TYPE_ERROR); log.writeLog(__LINE__, "EXCEPTION ERROR on autoMovePmDbroot: Caught unknown exception!", LOG_TYPE_ERROR);
}
} }
} }
@ -1968,7 +1986,9 @@ void pingDeviceThread()
processManager.removeModule(devicenetworklist, false); processManager.removeModule(devicenetworklist, false);
// if pm, move dbroots to other pms // if pm, move dbroots to other pms
if( moduleName.find("pm") == 0 ) { if ( ( moduleName.find("pm") == 0 && !amazon && ( DBRootStorageType != "internal") ) ||
( moduleName.find("pm") == 0 && amazon && downActiveOAMModule ) ||
( moduleName.find("pm") == 0 && amazon && AmazonPMFailover == "y") ) {
try { try {
log.writeLog(__LINE__, "Call autoMovePmDbroot", LOG_TYPE_DEBUG); log.writeLog(__LINE__, "Call autoMovePmDbroot", LOG_TYPE_DEBUG);
oam.autoMovePmDbroot(moduleName); oam.autoMovePmDbroot(moduleName);
@ -1990,6 +2010,9 @@ void pingDeviceThread()
//set recycle process //set recycle process
processManager.recycleProcess(moduleName); processManager.recycleProcess(moduleName);
//enable query stats
dbrm.setSystemQueryReady(true);
//set query system state ready //set query system state ready
processManager.setQuerySystemState(true); processManager.setQuerySystemState(true);
@ -2004,6 +2027,9 @@ void pingDeviceThread()
oam.dbrmctl("resume"); oam.dbrmctl("resume");
log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG); log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG);
//enable query stats
dbrm.setSystemQueryReady(true);
//set query system state ready //set query system state ready
processManager.setQuerySystemState(true); processManager.setQuerySystemState(true);
} }
@ -2017,6 +2043,9 @@ void pingDeviceThread()
//set recycle process //set recycle process
processManager.recycleProcess(moduleName); processManager.recycleProcess(moduleName);
//enable query stats
dbrm.setSystemQueryReady(true);
//set query system state ready //set query system state ready
processManager.setQuerySystemState(true); processManager.setQuerySystemState(true);

View File

@ -72,6 +72,7 @@ bool startsystemthreadRunning = false;
string gdownActiveOAMModule; string gdownActiveOAMModule;
vector<string> downModuleList; vector<string> downModuleList;
bool startFailOver = false; bool startFailOver = false;
extern string DBRootStorageType;
string masterLogFile = oam::UnassignedName; string masterLogFile = oam::UnassignedName;
string masterLogPos = oam::UnassignedName; string masterLogPos = oam::UnassignedName;
@ -2791,6 +2792,16 @@ void processMSG(messageqcpp::IOSocket* cfIos)
log.writeLog(__LINE__, "MSG RECEIVED: Process Restarted on " + moduleName + "/" + processName); log.writeLog(__LINE__, "MSG RECEIVED: Process Restarted on " + moduleName + "/" + processName);
//set query system states not ready
BRM::DBRM dbrm;
dbrm.setSystemQueryReady(false);
processManager.setQuerySystemState(false);
processManager.setSystemState(oam::BUSY_INIT);
processManager.reinitProcessType("cpimport");
//request reinit after Process is active //request reinit after Process is active
for ( int i = 0; i < 600 ; i++ ) { for ( int i = 0; i < 600 ; i++ ) {
try { try {
@ -2916,6 +2927,13 @@ void processMSG(messageqcpp::IOSocket* cfIos)
break; break;
} }
} }
//enable query stats
dbrm.setSystemQueryReady(true);
processManager.setQuerySystemState(true);
processManager.setSystemState(oam::ACTIVE);
} }
break; break;
@ -8525,14 +8543,6 @@ int ProcessManager::switchParentOAMModule(std::string newActiveModuleName)
log.writeLog(__LINE__, "switchParentOAMModule Function Started", LOG_TYPE_DEBUG); log.writeLog(__LINE__, "switchParentOAMModule Function Started", LOG_TYPE_DEBUG);
string DBRootStorageType = "internal";
{
try{
oam.getSystemConfig("DBRootStorageType", DBRootStorageType);
}
catch(...) {}
}
if ( DBRootStorageType == "internal" && GlusterConfig == "n") { if ( DBRootStorageType == "internal" && GlusterConfig == "n") {
log.writeLog(__LINE__, "ERROR: DBRootStorageType = internal", LOG_TYPE_ERROR); log.writeLog(__LINE__, "ERROR: DBRootStorageType = internal", LOG_TYPE_ERROR);
pthread_mutex_unlock(&THREAD_LOCK); pthread_mutex_unlock(&THREAD_LOCK);
@ -8818,15 +8828,6 @@ int ProcessManager::OAMParentModuleChange()
log.writeLog(__LINE__, "EXCEPTION ERROR on getSystemConfig: Caught unknown exception!", LOG_TYPE_ERROR); log.writeLog(__LINE__, "EXCEPTION ERROR on getSystemConfig: Caught unknown exception!", LOG_TYPE_ERROR);
} }
// dbroot storage type, do different failover if internal
string DBRootStorageType = "internal";
{
try{
oam.getSystemConfig("DBRootStorageType", DBRootStorageType);
}
catch(...) {}
}
string cmdLine = "ping "; string cmdLine = "ping ";
string cmdOption = " -c 1 -w 5 >> /dev/null"; string cmdOption = " -c 1 -w 5 >> /dev/null";
string cmd; string cmd;

View File

@ -595,7 +595,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
log.writeLog(__LINE__, "START: process already active " + processName); log.writeLog(__LINE__, "START: process already active " + processName);
//Inform Process Manager that Process restarted //Inform Process Manager that Process restarted
processRestarted(processName); //processRestarted(processName);
ackMsg << (ByteStream::byte) ACK; ackMsg << (ByteStream::byte) ACK;
ackMsg << (ByteStream::byte) START; ackMsg << (ByteStream::byte) START;
@ -694,7 +694,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
} }
//Inform Process Manager that Process restarted //Inform Process Manager that Process restarted
processRestarted(processName); //processRestarted(processName);
ackMsg << (ByteStream::byte) ACK; ackMsg << (ByteStream::byte) ACK;
ackMsg << (ByteStream::byte) RESTART; ackMsg << (ByteStream::byte) RESTART;