1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-1185 MCOL-436 -send alarms to procmgr to be processed

This commit is contained in:
david hill
2018-02-02 09:04:07 -06:00
parent c792188a5f
commit b4e64f24f3
11 changed files with 307 additions and 66 deletions

View File

@ -19,6 +19,10 @@
<IPAddr>0.0.0.0</IPAddr> <IPAddr>0.0.0.0</IPAddr>
<Port>8603</Port> <Port>8603</Port>
</ProcMgr> </ProcMgr>
<ProcMgr_Alarm>
<IPAddr>127.0.0.1</IPAddr>
<Port>8606</Port>
</ProcMgr_Alarm>
<ProcStatusControl> <ProcStatusControl>
<IPAddr>0.0.0.0</IPAddr> <IPAddr>0.0.0.0</IPAddr>
<Port>8604</Port> <Port>8604</Port>

View File

@ -14,6 +14,10 @@
<IPAddr>127.0.0.1</IPAddr> <IPAddr>127.0.0.1</IPAddr>
<Port>8603</Port> <Port>8603</Port>
</ProcMgr> </ProcMgr>
<ProcMgr_Alarm>
<IPAddr>127.0.0.1</IPAddr>
<Port>8606</Port>
</ProcMgr_Alarm>
<ProcStatusControl> <ProcStatusControl>
<IPAddr>127.0.0.1</IPAddr> <IPAddr>127.0.0.1</IPAddr>
<Port>8604</Port> <Port>8604</Port>

View File

@ -38,7 +38,7 @@ const std::string ACTIVE_ALARM_FILE = "/var/log/mariadb/columnstore/activeAlarms
const std::string ALARM_FILE = "/var/log/mariadb/columnstore/alarm.log"; const std::string ALARM_FILE = "/var/log/mariadb/columnstore/alarm.log";
const std::string ALARM_ARCHIVE_FILE = "/var/log/mariadb/columnstore/archive"; const std::string ALARM_ARCHIVE_FILE = "/var/log/mariadb/columnstore/archive";
const bool ALARM_DEBUG = false; const bool ALARM_DEBUG = true;
const uint16_t INVALID_ALARM_ID = 0; const uint16_t INVALID_ALARM_ID = 0;
} }

View File

@ -28,10 +28,10 @@
#include <vector> #include <vector>
#include <iterator> #include <iterator>
#include "messagequeue.h"
#include "alarmglobal.h" #include "alarmglobal.h"
#include "liboamcpp.h" #include "liboamcpp.h"
#include "installdir.h" #include "installdir.h"
#include "messagequeue.h"
using namespace std; using namespace std;
using namespace oam; using namespace oam;
@ -373,11 +373,11 @@ void configAlarm (Alarm& calAlarm)
/***************************************************************************************** /*****************************************************************************************
* @brief sendAlarmReport API * @brief sendAlarmReport API
* *
* purpose: Process Alarm Report * purpose: Send Alarm Report
* *
*****************************************************************************************/ *****************************************************************************************/
void ALARMManager::sendAlarmReport (const char* componentID, int alarmID, int state, void ALARMManager::sendAlarmReport (const char* componentID, int alarmID, int state,
std::string repModuleName, std::string repProcessName) std::string repModuleName, std::string repProcessName)
{ {
#ifdef SKIP_ALARM #ifdef SKIP_ALARM
@ -438,16 +438,73 @@ void ALARMManager::sendAlarmReport (const char* componentID, int alarmID, int st
else else
processName = repProcessName; processName = repProcessName;
Alarm calAlarm; int returnStatus = API_SUCCESS; //default
ByteStream msg1;
calAlarm.setAlarmID (alarmID);
calAlarm.setComponentID (componentID);
calAlarm.setState (state);
calAlarm.setSname (ModuleName);
calAlarm.setPname (processName);
calAlarm.setPid (pid);
calAlarm.setTid (tid);
// setup message
msg1 << (ByteStream::byte) alarmID;
msg1 << (std::string) componentID;
msg1 << (ByteStream::byte) state;
msg1 << (std::string) ModuleName;
msg1 << (std::string) processName;
msg1 << (ByteStream::byte) pid;
msg1 << (ByteStream::byte) tid;
try
{
//send the msg to Process Manager
MessageQueueClient procmgr("ProcMgr_Alarm");
procmgr.write(msg1);
// shutdown connection
procmgr.shutdown();
}
catch (std::runtime_error& e)
{
LoggingID lid(11);
MessageLog ml(lid);
Message msg;
Message::Args args;
args.add("sendAlarmReport error:");
args.add(e.what());
msg.format(args);
ml.logErrorMessage(msg);
}
catch (std::exception& e)
{
LoggingID lid(11);
MessageLog ml(lid);
Message msg;
Message::Args args;
args.add("sendAlarmReport error:");
args.add(e.what());
msg.format(args);
ml.logErrorMessage(msg);
}
catch (...)
{
LoggingID lid(11);
MessageLog ml(lid);
Message msg;
Message::Args args;
args.add("sendAlarmReport error:");
args.add("general failure");
msg.format(args);
ml.logErrorMessage(msg);
}
return;
#endif //SKIP_ALARM
}
/*****************************************************************************************
* @brief processAlarmReport API
*
* purpose: Process Alarm Report
*
*****************************************************************************************/
void ALARMManager::processAlarmReport (Alarm& calAlarm)
{
// Get alarm configuration // Get alarm configuration
try { try {
configAlarm (calAlarm); configAlarm (calAlarm);
@ -464,7 +521,7 @@ void ALARMManager::sendAlarmReport (const char* componentID, int alarmID, int st
} }
return; return;
#endif //SKIP_ALARM
} }
/***************************************************************************************** /*****************************************************************************************

View File

@ -77,6 +77,8 @@ public:
std::string repProcessName = ""); std::string repProcessName = "");
EXPORT void processAlarmReport (Alarm& calAlarm);
/** @brief return active alarm list /** @brief return active alarm list
* *
* @param AlarmList the alarm map reference to store alarms * @param AlarmList the alarm map reference to store alarms

View File

@ -2028,8 +2028,8 @@ int main(int argc, char *argv[])
sysConfig->setConfig(parentProcessMonitor, "IPAddr", parentOAMModuleIPAddr); sysConfig->setConfig(parentProcessMonitor, "IPAddr", parentOAMModuleIPAddr);
sysConfig->setConfig(parentProcessMonitor, "Port", "8800"); sysConfig->setConfig(parentProcessMonitor, "Port", "8800");
sysConfig->setConfig("ProcMgr", "IPAddr", parentOAMModuleIPAddr); sysConfig->setConfig("ProcMgr", "IPAddr", parentOAMModuleIPAddr);
//sysConfig->setConfig("ProcHeartbeatControl", "IPAddr", parentOAMModuleIPAddr); sysConfig->setConfig("ProcHeartbeatControl", "IPAddr", parentOAMModuleIPAddr);
sysConfig->setConfig("ProcStatusControl", "IPAddr", parentOAMModuleIPAddr); sysConfig->setConfig("ProcMgr_Alarm", "IPAddr", parentOAMModuleIPAddr);
string parentServerMonitor = parentOAMModuleName + "_ServerMonitor"; string parentServerMonitor = parentOAMModuleName + "_ServerMonitor";
sysConfig->setConfig(parentServerMonitor, "IPAddr", parentOAMModuleIPAddr); sysConfig->setConfig(parentServerMonitor, "IPAddr", parentOAMModuleIPAddr);
string portName = parentOAMModuleName + "_WriteEngineServer"; string portName = parentOAMModuleName + "_WriteEngineServer";

View File

@ -78,6 +78,7 @@ extern bool startFailOver;
extern bool gOAMParentModuleFlag; extern bool gOAMParentModuleFlag;
static void messageThread(Configuration config); static void messageThread(Configuration config);
static void alarmMessageThread(Configuration config);
static void sigUser1Handler(int sig); static void sigUser1Handler(int sig);
static void startMgrProcessThread(); static void startMgrProcessThread();
static void hdfsActiveAlarmsPushingThread(); static void hdfsActiveAlarmsPushingThread();
@ -265,6 +266,12 @@ int main(int argc, char **argv)
if ( ret != 0 ) if ( ret != 0 )
log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR); log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR);
// create alarm message thread
pthread_t AlarmMessageThread;
ret = pthread_create (&AlarmMessageThread, NULL, (void*(*)(void*)) &alarmMessageThread, &config);
if ( ret != 0 )
log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR);
//monitor OAM Parent Module for failover //monitor OAM Parent Module for failover
while(true) while(true)
{ {
@ -351,6 +358,7 @@ int main(int argc, char **argv)
string IPaddr = (*pt1).IPAddr; string IPaddr = (*pt1).IPAddr;
sysConfig->setConfig("ProcMgr", "IPAddr", IPaddr); sysConfig->setConfig("ProcMgr", "IPAddr", IPaddr);
sysConfig->setConfig("ProcMgr_Alarm", "IPAddr", IPaddr);
log.writeLog(__LINE__, "set ProcMgr IPaddr to " + IPaddr, LOG_TYPE_DEBUG); log.writeLog(__LINE__, "set ProcMgr IPaddr to " + IPaddr, LOG_TYPE_DEBUG);
//update Calpont Config table //update Calpont Config table
@ -378,6 +386,12 @@ int main(int argc, char **argv)
int ret = pthread_create (&MessageThread, NULL, (void*(*)(void*)) &messageThread, &config); int ret = pthread_create (&MessageThread, NULL, (void*(*)(void*)) &messageThread, &config);
if ( ret != 0 ) if ( ret != 0 )
log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR); log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR);
// create alarm message thread
pthread_t AlarmMessageThread;
ret = pthread_create (&AlarmMessageThread, NULL, (void*(*)(void*)) &alarmMessageThread, &config);
if ( ret != 0 )
log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR);
} }
// //
@ -463,21 +477,102 @@ static void messageThread(Configuration config)
} }
} }
catch (exception& ex) catch (exception& ex)
{ {
string error = ex.what(); string error = ex.what();
log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueServer for ProcMgr:" + error, LOG_TYPE_ERROR); log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueServer for ProcMgr:" + error, LOG_TYPE_ERROR);
// takes 2 - 4 minites to free sockets, sleep and retry // takes 2 - 4 minites to free sockets, sleep and retry
sleep(60); sleep(60);
} }
catch(...) catch(...)
{ {
log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueServer for ProcMgr: Caught unknown exception!", LOG_TYPE_ERROR); log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueServer for ProcMgr: Caught unknown exception!", LOG_TYPE_ERROR);
// takes 2 - 4 minites to free sockets, sleep and retry // takes 2 - 4 minites to free sockets, sleep and retry
sleep(60); sleep(60);
} }
}
return;
}
/******************************************************************************************
* @brief alarmMesssageThread
*
* purpose: Read incoming alarm messages
*
******************************************************************************************/
static void alarmMessageThread(Configuration config)
{
ProcessLog log;
ProcessManager processManager(config, log);
Oam oam;
//check for running active, then launch
while(true)
{
if ( !runStandby)
break;
sleep (1);
}
log.writeLog(__LINE__, "Alarm Message Thread started ..", LOG_TYPE_DEBUG);
//read and cleanup port before trying to use
try {
Config* sysConfig = Config::makeConfig();
string port = sysConfig->getConfig("ProcMgr_Alarm", "Port");
string cmd = "fuser -k " + port + "/tcp >/dev/null 2>&1";
if ( !rootUser)
cmd = "sudo fuser -k " + port + "/tcp >/dev/null 2>&1";
system(cmd.c_str());
}
catch(...)
{
}
//
//waiting for request
//
IOSocket fIos;
for (;;)
{
try
{
MessageQueueServer procmgr("ProcMgr_Alarm");
for (;;)
{
try
{
fIos = procmgr.accept();
pthread_t alarmMessagethread;
int status = pthread_create (&alarmMessagethread, NULL, (void*(*)(void*)) &processAlarmMSG, &fIos);
if ( status != 0 )
log.writeLog(__LINE__, "alarmmessagethread: pthread_create failed, return status = " + oam.itoa(status), LOG_TYPE_ERROR);
}
catch(...)
{}
}
}
catch (exception& ex)
{
string error = ex.what();
log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueServer for ProcMgr_Alarm:" + error, LOG_TYPE_ERROR);
sleep(1);
}
catch(...)
{
log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueServer for ProcMgr_Alarm: Caught unknown exception!", LOG_TYPE_ERROR);
sleep(1);
}
} }
return; return;
} }

View File

@ -301,6 +301,79 @@ ProcessManager::~ProcessManager()
{ {
} }
/******************************************************************************************
* @brief processAlarmMSG
*
* purpose: Process the Alarm message
*
******************************************************************************************/
//void ProcessManager::processAlarmMSG( messageqcpp::IOSocket fIos, messageqcpp::ByteStream msg)
/**
 * Thread entry point that handles one alarm connection.
 *
 * Copies the socket it is given, reads a single message from it, rebuilds the Alarm
 * described by that message and passes it to ALARMManager::processAlarmReport. The
 * function never returns to its caller: every path closes the socket, detaches the
 * calling thread and ends it with pthread_exit.
 *
 * @param cfIos socket of the accepted connection; only read to make a private copy
 */
void processAlarmMSG(messageqcpp::IOSocket* cfIos)
{
    // Work on a private copy of the socket object.
    messageqcpp::IOSocket afIos = *cfIos;

    pthread_t ThreadId = pthread_self();

    ByteStream msg;
    bool readFailed = false;

    try {
        msg = afIos.read();
    }
    catch(...)
    {
        // Handled below, outside the handler, so the socket is closed on this path too.
        readFailed = true;
    }

    if (readFailed || msg.length() <= 0) {
        try {
            afIos.close();
        }
        catch(...)
        {}

        pthread_detach (ThreadId);
        pthread_exit(0);
    }

    Oam oam;
    ProcessLog log;
    Configuration config;
    ProcessManager processManager(config, log);

    log.writeLog(__LINE__, "MSG RECEIVED: Process Alarm Message");

    // Nothing thrown while decoding or processing the message may leave this thread's
    // entry function: it would bypass the socket cleanup below. pthread_exit is
    // deliberately kept outside of the try block.
    try
    {
        // NOTE(review): every numeric field is carried as a single byte, matching what
        // ALARMManager::sendAlarmReport writes; pid and tid therefore look truncated to
        // 0-255 on the wire - confirm whether that is intended.
        ByteStream::byte alarmID;
        std::string componentID;
        ByteStream::byte state;
        std::string ModuleName;
        std::string processName;
        ByteStream::byte pid;
        ByteStream::byte tid;

        // Extraction order must match the order in which the fields were written.
        msg >> alarmID;
        msg >> componentID;
        msg >> state;
        msg >> ModuleName;
        msg >> processName;
        msg >> pid;
        msg >> tid;

        Alarm calAlarm;

        calAlarm.setAlarmID (alarmID);
        calAlarm.setComponentID (componentID);
        calAlarm.setState (state);
        calAlarm.setSname (ModuleName);
        calAlarm.setPname (processName);
        calAlarm.setPid (pid);
        calAlarm.setTid (tid);

        ALARMManager aManager;
        aManager.processAlarmReport(calAlarm);
    }
    catch (const std::exception& ex)
    {
        log.writeLog(__LINE__, std::string("EXCEPTION ERROR processing Alarm Message: ") + ex.what());
    }
    catch(...)
    {
        log.writeLog(__LINE__, "EXCEPTION ERROR processing Alarm Message: Caught unknown exception!");
    }

    try {
        afIos.close();
    }
    catch(...)
    {}

    pthread_detach (ThreadId);
    pthread_exit(0);
}
/****************************************************************************************** /******************************************************************************************
* @brief processMSG * @brief processMSG
* *
@ -2629,44 +2702,44 @@ void processMSG(messageqcpp::IOSocket* cfIos)
break; break;
} }
/*
case PROCESSALARM:
{
log.writeLog(__LINE__, "MSG RECEIVED: Process Alarm Message");
ByteStream::byte alarmID; /* case PROCESSALARM:
std::string componentID; {
ByteStream::byte state; log.writeLog(__LINE__, "MSG RECEIVED: Process Alarm Message");
std::string ModuleName;
std::string processName;
ByteStream::byte pid;
ByteStream::byte tid;
msg >> alarmID; ByteStream::byte alarmID;
msg >> componentID; std::string componentID;
msg >> state; ByteStream::byte state;
msg >> ModuleName; std::string ModuleName;
msg >> processName; std::string processName;
msg >> pid; ByteStream::byte pid;
msg >> tid; ByteStream::byte tid;
Alarm calAlarm; msg >> alarmID;
msg >> componentID;
msg >> state;
msg >> ModuleName;
msg >> processName;
msg >> pid;
msg >> tid;
calAlarm.setAlarmID (alarmID); Alarm calAlarm;
calAlarm.setComponentID (componentID);
calAlarm.setState (state);
calAlarm.setSname (ModuleName);
calAlarm.setPname (processName);
calAlarm.setPid (pid);
calAlarm.setTid (tid);
ALARMManager aManager; calAlarm.setAlarmID (alarmID);
aManager.processAlarmReport(calAlarm); calAlarm.setComponentID (componentID);
calAlarm.setState (state);
calAlarm.setSname (ModuleName);
calAlarm.setPname (processName);
calAlarm.setPid (pid);
calAlarm.setTid (tid);
break; ALARMManager aManager;
} aManager.processAlarmReport(calAlarm);
break;
}
*/ */
default: default:
log.writeLog(__LINE__, "MSG RECEIVED: Invalid type" ); log.writeLog(__LINE__, "MSG RECEIVED: Invalid type" );
break; break;
@ -8720,6 +8793,7 @@ int ProcessManager::switchParentOAMModule(std::string newActiveModuleName)
newActiveIPaddr = (*pt2).IPAddr; newActiveIPaddr = (*pt2).IPAddr;
sysConfig4->setConfig("ProcMgr", "IPAddr", newActiveIPaddr); sysConfig4->setConfig("ProcMgr", "IPAddr", newActiveIPaddr);
sysConfig4->setConfig("ProcMgr_Alarm", "IPAddr", newActiveIPaddr);
sysConfig4->setConfig("ProcStatusControl", "IPAddr", newActiveIPaddr); sysConfig4->setConfig("ProcStatusControl", "IPAddr", newActiveIPaddr);
sysConfig4->setConfig("DBRM_Controller", "IPAddr", newActiveIPaddr); sysConfig4->setConfig("DBRM_Controller", "IPAddr", newActiveIPaddr);
@ -9296,6 +9370,7 @@ int ProcessManager::OAMParentModuleChange()
localIPaddr = (*pt1).IPAddr; localIPaddr = (*pt1).IPAddr;
sysConfig4->setConfig("ProcMgr", "IPAddr", localIPaddr); sysConfig4->setConfig("ProcMgr", "IPAddr", localIPaddr);
sysConfig4->setConfig("ProcMgr_Alarm", "IPAddr", localIPaddr);
sysConfig4->setConfig("ProcStatusControl", "IPAddr", localIPaddr); sysConfig4->setConfig("ProcStatusControl", "IPAddr", localIPaddr);
sysConfig4->setConfig("DBRM_Controller", "IPAddr", localIPaddr); sysConfig4->setConfig("DBRM_Controller", "IPAddr", localIPaddr);

View File

@ -79,6 +79,8 @@ namespace processmanager{
void startModuleThread(std::string moduleName); void startModuleThread(std::string moduleName);
void stopModuleThread(std::string moduleName); void stopModuleThread(std::string moduleName);
void processMSG(messageqcpp::IOSocket* fIos); void processMSG(messageqcpp::IOSocket* fIos);
void processAlarmMSG(messageqcpp::IOSocket* fIos);
void sendUpgradeRequest(); void sendUpgradeRequest();
/** @brief Timeset for Milleseconds /** @brief Timeset for Milleseconds
@ -89,16 +91,16 @@ namespace processmanager{
{ {
std::string ModuleName; //!< Module Name std::string ModuleName; //!< Module Name
std::string ProcessName; //!< Process Name std::string ProcessName; //!< Process Name
int ID; //!< Heartbeat ID int ID; //!< Heartbeat ID
bool receiveFlag; //!< Heartbeat indication flag bool receiveFlag; //!< Heartbeat indication flag
}; };
typedef std::list<HeartBeatProc> HeartBeatProcList; typedef std::list<HeartBeatProc> HeartBeatProcList;
typedef std::map<std::string, std::string> srvStateList; typedef std::map<std::string, std::string> srvStateList;
const int MAX_ARGUMENTS = 10; const int MAX_ARGUMENTS = 10;
const std::string DEFAULT_LOG_FILE = "/var/log/mariadb/columnstore/ProcessManager.log"; const std::string DEFAULT_LOG_FILE = "/var/log/mariadb/columnstore/ProcessManager.log";
/** /**

View File

@ -339,6 +339,7 @@ int main(int argc, char **argv)
string IPaddr = (*pt1).IPAddr; string IPaddr = (*pt1).IPAddr;
sysConfig->setConfig("ProcMgr", "IPAddr", IPaddr); sysConfig->setConfig("ProcMgr", "IPAddr", IPaddr);
sysConfig->setConfig("ProcMgr_Alarm", "IPAddr", IPaddr);
log.writeLog(__LINE__, "set ProcMgr IPaddr to Old Standby Module: " + IPaddr, LOG_TYPE_DEBUG); log.writeLog(__LINE__, "set ProcMgr IPaddr to Old Standby Module: " + IPaddr, LOG_TYPE_DEBUG);
//update Calpont Config table //update Calpont Config table

View File

@ -938,6 +938,7 @@ int main(int argc, char *argv[])
sysConfigNew->setConfig(parentProcessMonitor, "IPAddr", parentOAMModuleIPAddr); sysConfigNew->setConfig(parentProcessMonitor, "IPAddr", parentOAMModuleIPAddr);
sysConfigNew->setConfig(parentProcessMonitor, "Port", "8800"); sysConfigNew->setConfig(parentProcessMonitor, "Port", "8800");
sysConfigNew->setConfig("ProcMgr", "IPAddr", parentOAMModuleIPAddr); sysConfigNew->setConfig("ProcMgr", "IPAddr", parentOAMModuleIPAddr);
sysConfigNew->setConfig("ProcMgr_Alarm", "IPAddr", parentOAMModuleIPAddr);
sysConfigNew->setConfig("ProcStatusControl", "IPAddr", parentOAMModuleIPAddr); sysConfigNew->setConfig("ProcStatusControl", "IPAddr", parentOAMModuleIPAddr);
string parentServerMonitor = systemParentOAMModuleName + "_ServerMonitor"; string parentServerMonitor = systemParentOAMModuleName + "_ServerMonitor";
sysConfigNew->setConfig(parentServerMonitor, "IPAddr", parentOAMModuleIPAddr); sysConfigNew->setConfig(parentServerMonitor, "IPAddr", parentOAMModuleIPAddr);