You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-07 03:22:57 +03:00
MCOL-1185 MCOL-436 - make processing alarms single thread process
This commit is contained in:
@@ -521,8 +521,7 @@ namespace oam
|
|||||||
ENABLEMYSQLREP,
|
ENABLEMYSQLREP,
|
||||||
DISABLEMYSQLREP,
|
DISABLEMYSQLREP,
|
||||||
GLUSTERASSIGN,
|
GLUSTERASSIGN,
|
||||||
GLUSTERUNASSIGN,
|
GLUSTERUNASSIGN
|
||||||
PROCESSALARM
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/** @brief Process Management - Mgr to Mon request options
|
/** @brief Process Management - Mgr to Mon request options
|
||||||
|
@@ -38,7 +38,7 @@ const std::string ACTIVE_ALARM_FILE = "/var/log/mariadb/columnstore/activeAlarms
|
|||||||
const std::string ALARM_FILE = "/var/log/mariadb/columnstore/alarm.log";
|
const std::string ALARM_FILE = "/var/log/mariadb/columnstore/alarm.log";
|
||||||
const std::string ALARM_ARCHIVE_FILE = "/var/log/mariadb/columnstore/archive";
|
const std::string ALARM_ARCHIVE_FILE = "/var/log/mariadb/columnstore/archive";
|
||||||
|
|
||||||
const bool ALARM_DEBUG = true;
|
const bool ALARM_DEBUG = false;
|
||||||
const uint16_t INVALID_ALARM_ID = 0;
|
const uint16_t INVALID_ALARM_ID = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -493,7 +493,6 @@ static void messageThread(Configuration config)
|
|||||||
sleep(60);
|
sleep(60);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/******************************************************************************************
|
/******************************************************************************************
|
||||||
@@ -508,6 +507,8 @@ static void alarmMessageThread(Configuration config)
|
|||||||
ProcessManager processManager(config, log);
|
ProcessManager processManager(config, log);
|
||||||
Oam oam;
|
Oam oam;
|
||||||
|
|
||||||
|
ByteStream msg;
|
||||||
|
|
||||||
//check for running active, then launch
|
//check for running active, then launch
|
||||||
while(true)
|
while(true)
|
||||||
{
|
{
|
||||||
@@ -525,8 +526,7 @@ static void alarmMessageThread(Configuration config)
|
|||||||
string cmd = "fuser -k " + port + "/tcp >/dev/null 2>&1";
|
string cmd = "fuser -k " + port + "/tcp >/dev/null 2>&1";
|
||||||
if ( !rootUser)
|
if ( !rootUser)
|
||||||
cmd = "sudo fuser -k " + port + "/tcp >/dev/null 2>&1";
|
cmd = "sudo fuser -k " + port + "/tcp >/dev/null 2>&1";
|
||||||
|
|
||||||
|
|
||||||
system(cmd.c_str());
|
system(cmd.c_str());
|
||||||
}
|
}
|
||||||
catch(...)
|
catch(...)
|
||||||
@@ -549,15 +549,66 @@ static void alarmMessageThread(Configuration config)
|
|||||||
{
|
{
|
||||||
fIos = procmgr.accept();
|
fIos = procmgr.accept();
|
||||||
|
|
||||||
pthread_t alarmMessagethread;
|
try{
|
||||||
int status = pthread_create (&alarmMessagethread, NULL, (void*(*)(void*)) &processAlarmMSG, &fIos);
|
msg = fIos.read();
|
||||||
|
|
||||||
if ( status != 0 )
|
if (msg.length() <= 0)
|
||||||
log.writeLog(__LINE__, "alarmmessagethread: pthread_create failed, return status = " + oam.itoa(status), LOG_TYPE_ERROR);
|
continue;
|
||||||
|
|
||||||
|
//log.writeLog(__LINE__, "MSG RECEIVED: Process Alarm Message");
|
||||||
|
|
||||||
|
ByteStream::byte alarmID;
|
||||||
|
std::string componentID;
|
||||||
|
ByteStream::byte state;
|
||||||
|
std::string ModuleName;
|
||||||
|
std::string processName;
|
||||||
|
ByteStream::byte pid;
|
||||||
|
ByteStream::byte tid;
|
||||||
|
|
||||||
|
msg >> alarmID;
|
||||||
|
msg >> componentID;
|
||||||
|
msg >> state;
|
||||||
|
msg >> ModuleName;
|
||||||
|
msg >> processName;
|
||||||
|
msg >> pid;
|
||||||
|
msg >> tid;
|
||||||
|
|
||||||
|
Alarm calAlarm;
|
||||||
|
|
||||||
|
calAlarm.setAlarmID (alarmID);
|
||||||
|
calAlarm.setComponentID (componentID);
|
||||||
|
calAlarm.setState (state);
|
||||||
|
calAlarm.setSname (ModuleName);
|
||||||
|
calAlarm.setPname (processName);
|
||||||
|
calAlarm.setPid (pid);
|
||||||
|
calAlarm.setTid (tid);
|
||||||
|
|
||||||
|
ALARMManager aManager;
|
||||||
|
aManager.processAlarmReport(calAlarm);
|
||||||
|
}
|
||||||
|
catch (exception& ex)
|
||||||
|
{
|
||||||
|
string error = ex.what();
|
||||||
|
log.writeLog(__LINE__, "EXCEPTION ERROR on read for ProcMgr_Alarm:" + error, LOG_TYPE_ERROR);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
catch(...)
|
||||||
|
{
|
||||||
|
log.writeLog(__LINE__, "EXCEPTION ERROR on read for ProcMgr_Alarm: Caught unknown exception!", LOG_TYPE_ERROR);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (exception& ex)
|
||||||
|
{
|
||||||
|
string error = ex.what();
|
||||||
|
log.writeLog(__LINE__, "EXCEPTION ERROR on accept for ProcMgr_Alarm:" + error, LOG_TYPE_ERROR);
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
catch(...)
|
catch(...)
|
||||||
{}
|
{
|
||||||
|
log.writeLog(__LINE__, "EXCEPTION ERROR on accept for ProcMgr_Alarm: Caught unknown exception!", LOG_TYPE_ERROR);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (exception& ex)
|
catch (exception& ex)
|
||||||
@@ -574,7 +625,6 @@ static void alarmMessageThread(Configuration config)
|
|||||||
sleep(1);
|
sleep(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/******************************************************************************************
|
/******************************************************************************************
|
||||||
|
@@ -301,79 +301,6 @@ ProcessManager::~ProcessManager()
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
/******************************************************************************************
|
|
||||||
* @brief processAlarmMSG
|
|
||||||
*
|
|
||||||
* purpose: Process the Alarm message
|
|
||||||
*
|
|
||||||
******************************************************************************************/
|
|
||||||
//void ProcessManager::processAlarmMSG( messageqcpp::IOSocket fIos, messageqcpp::ByteStream msg)
|
|
||||||
void processAlarmMSG(messageqcpp::IOSocket* cfIos)
|
|
||||||
{
|
|
||||||
messageqcpp::IOSocket afIos = *cfIos;
|
|
||||||
|
|
||||||
pthread_t ThreadId;
|
|
||||||
ThreadId = pthread_self();
|
|
||||||
|
|
||||||
ByteStream msg;
|
|
||||||
|
|
||||||
try{
|
|
||||||
msg = afIos.read();
|
|
||||||
}
|
|
||||||
catch(...)
|
|
||||||
{
|
|
||||||
pthread_detach (ThreadId);
|
|
||||||
pthread_exit(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (msg.length() <= 0) {
|
|
||||||
afIos.close();
|
|
||||||
pthread_detach (ThreadId);
|
|
||||||
pthread_exit(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
Oam oam;
|
|
||||||
ProcessLog log;
|
|
||||||
|
|
||||||
Configuration config;
|
|
||||||
ProcessManager processManager(config, log);
|
|
||||||
|
|
||||||
log.writeLog(__LINE__, "MSG RECEIVED: Process Alarm Message");
|
|
||||||
|
|
||||||
ByteStream::byte alarmID;
|
|
||||||
std::string componentID;
|
|
||||||
ByteStream::byte state;
|
|
||||||
std::string ModuleName;
|
|
||||||
std::string processName;
|
|
||||||
ByteStream::byte pid;
|
|
||||||
ByteStream::byte tid;
|
|
||||||
|
|
||||||
msg >> alarmID;
|
|
||||||
msg >> componentID;
|
|
||||||
msg >> state;
|
|
||||||
msg >> ModuleName;
|
|
||||||
msg >> processName;
|
|
||||||
msg >> pid;
|
|
||||||
msg >> tid;
|
|
||||||
|
|
||||||
Alarm calAlarm;
|
|
||||||
|
|
||||||
calAlarm.setAlarmID (alarmID);
|
|
||||||
calAlarm.setComponentID (componentID);
|
|
||||||
calAlarm.setState (state);
|
|
||||||
calAlarm.setSname (ModuleName);
|
|
||||||
calAlarm.setPname (processName);
|
|
||||||
calAlarm.setPid (pid);
|
|
||||||
calAlarm.setTid (tid);
|
|
||||||
|
|
||||||
ALARMManager aManager;
|
|
||||||
aManager.processAlarmReport(calAlarm);
|
|
||||||
|
|
||||||
afIos.close();
|
|
||||||
pthread_detach (ThreadId);
|
|
||||||
pthread_exit(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
/******************************************************************************************
|
/******************************************************************************************
|
||||||
* @brief processMSG
|
* @brief processMSG
|
||||||
*
|
*
|
||||||
@@ -2703,43 +2630,6 @@ void processMSG(messageqcpp::IOSocket* cfIos)
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* case PROCESSALARM:
|
|
||||||
{
|
|
||||||
log.writeLog(__LINE__, "MSG RECEIVED: Process Alarm Message");
|
|
||||||
|
|
||||||
ByteStream::byte alarmID;
|
|
||||||
std::string componentID;
|
|
||||||
ByteStream::byte state;
|
|
||||||
std::string ModuleName;
|
|
||||||
std::string processName;
|
|
||||||
ByteStream::byte pid;
|
|
||||||
ByteStream::byte tid;
|
|
||||||
|
|
||||||
msg >> alarmID;
|
|
||||||
msg >> componentID;
|
|
||||||
msg >> state;
|
|
||||||
msg >> ModuleName;
|
|
||||||
msg >> processName;
|
|
||||||
msg >> pid;
|
|
||||||
msg >> tid;
|
|
||||||
|
|
||||||
Alarm calAlarm;
|
|
||||||
|
|
||||||
calAlarm.setAlarmID (alarmID);
|
|
||||||
calAlarm.setComponentID (componentID);
|
|
||||||
calAlarm.setState (state);
|
|
||||||
calAlarm.setSname (ModuleName);
|
|
||||||
calAlarm.setPname (processName);
|
|
||||||
calAlarm.setPid (pid);
|
|
||||||
calAlarm.setTid (tid);
|
|
||||||
|
|
||||||
ALARMManager aManager;
|
|
||||||
aManager.processAlarmReport(calAlarm);
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
default:
|
default:
|
||||||
log.writeLog(__LINE__, "MSG RECEIVED: Invalid type" );
|
log.writeLog(__LINE__, "MSG RECEIVED: Invalid type" );
|
||||||
break;
|
break;
|
||||||
|
@@ -79,7 +79,6 @@ namespace processmanager{
|
|||||||
void startModuleThread(std::string moduleName);
|
void startModuleThread(std::string moduleName);
|
||||||
void stopModuleThread(std::string moduleName);
|
void stopModuleThread(std::string moduleName);
|
||||||
void processMSG(messageqcpp::IOSocket* fIos);
|
void processMSG(messageqcpp::IOSocket* fIos);
|
||||||
void processAlarmMSG(messageqcpp::IOSocket* fIos);
|
|
||||||
|
|
||||||
void sendUpgradeRequest();
|
void sendUpgradeRequest();
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user