From b4e64f24f338a8156fb1d6d6fd29c11f68f0e9eb Mon Sep 17 00:00:00 2001 From: david hill Date: Fri, 2 Feb 2018 09:04:07 -0600 Subject: [PATCH] MCOL-1185 MCOL-436 -send alarms to procmgr to be processed --- oam/etc/Columnstore.xml | 4 + oam/etc/Columnstore.xml.singleserver | 4 + oamapps/alarmmanager/alarmglobal.h | 2 +- oamapps/alarmmanager/alarmmanager.cpp | 83 ++++++++++++--- oamapps/alarmmanager/alarmmanager.h | 2 + oamapps/postConfigure/postConfigure.cpp | 4 +- procmgr/main.cpp | 121 ++++++++++++++++++--- procmgr/processmanager.cpp | 135 ++++++++++++++++++------ procmgr/processmanager.h | 16 +-- procmon/main.cpp | 1 + tools/configMgt/autoConfigure.cpp | 1 + 11 files changed, 307 insertions(+), 66 deletions(-) diff --git a/oam/etc/Columnstore.xml b/oam/etc/Columnstore.xml index c3aa79c42..ec985028f 100644 --- a/oam/etc/Columnstore.xml +++ b/oam/etc/Columnstore.xml @@ -19,6 +19,10 @@ 0.0.0.0 8603 + + 127.0.0.1 + 8606 + 0.0.0.0 8604 diff --git a/oam/etc/Columnstore.xml.singleserver b/oam/etc/Columnstore.xml.singleserver index 13f2b3fc7..58bb10ebd 100644 --- a/oam/etc/Columnstore.xml.singleserver +++ b/oam/etc/Columnstore.xml.singleserver @@ -14,6 +14,10 @@ 127.0.0.1 8603 + + 127.0.0.1 + 8606 + 127.0.0.1 8604 diff --git a/oamapps/alarmmanager/alarmglobal.h b/oamapps/alarmmanager/alarmglobal.h index fd22d3d97..af01e149c 100644 --- a/oamapps/alarmmanager/alarmglobal.h +++ b/oamapps/alarmmanager/alarmglobal.h @@ -38,7 +38,7 @@ const std::string ACTIVE_ALARM_FILE = "/var/log/mariadb/columnstore/activeAlarms const std::string ALARM_FILE = "/var/log/mariadb/columnstore/alarm.log"; const std::string ALARM_ARCHIVE_FILE = "/var/log/mariadb/columnstore/archive"; -const bool ALARM_DEBUG = false; +const bool ALARM_DEBUG = true; const uint16_t INVALID_ALARM_ID = 0; } diff --git a/oamapps/alarmmanager/alarmmanager.cpp b/oamapps/alarmmanager/alarmmanager.cpp index e03f38b4d..b9ba4f702 100644 --- a/oamapps/alarmmanager/alarmmanager.cpp +++ b/oamapps/alarmmanager/alarmmanager.cpp @@ -28,10 +28,10 @@ #include #include -#include "messagequeue.h" #include "alarmglobal.h" #include "liboamcpp.h" #include "installdir.h" +#include "messagequeue.h" using namespace std; using namespace oam; @@ -373,11 +373,11 @@ void configAlarm (Alarm& calAlarm) /***************************************************************************************** * @brief sendAlarmReport API * -* purpose: Process Alarm Report +* purpose: Send Alarm Report * *****************************************************************************************/ void ALARMManager::sendAlarmReport (const char* componentID, int alarmID, int state, - std::string repModuleName, std::string repProcessName) + std::string repModuleName, std::string repProcessName) { #ifdef SKIP_ALARM @@ -438,16 +438,73 @@ void ALARMManager::sendAlarmReport (const char* componentID, int alarmID, int st else processName = repProcessName; - Alarm calAlarm; - - calAlarm.setAlarmID (alarmID); - calAlarm.setComponentID (componentID); - calAlarm.setState (state); - calAlarm.setSname (ModuleName); - calAlarm.setPname (processName); - calAlarm.setPid (pid); - calAlarm.setTid (tid); + int returnStatus = API_SUCCESS; //default + ByteStream msg1; + // setup message + msg1 << (ByteStream::byte) alarmID; + msg1 << (std::string) componentID; + msg1 << (ByteStream::byte) state; + msg1 << (std::string) ModuleName; + msg1 << (std::string) processName; + msg1 << (ByteStream::byte) pid; + msg1 << (ByteStream::byte) tid; + + try + { + //send the msg to Process Manager + MessageQueueClient procmgr("ProcMgr_Alarm"); + procmgr.write(msg1); + + // shutdown connection + procmgr.shutdown(); + } + catch (std::runtime_error& e) + { + LoggingID lid(11); + MessageLog ml(lid); + Message msg; + Message::Args args; + args.add("sendAlarmReport error:"); + args.add(e.what()); + msg.format(args); + ml.logErrorMessage(msg); + } + catch (std::exception& e) + { + LoggingID lid(11); + MessageLog ml(lid); + Message msg; + Message::Args args; + args.add("sendAlarmReport error:"); + args.add(e.what()); + msg.format(args); + ml.logErrorMessage(msg); + } + catch (...) + { + LoggingID lid(11); + MessageLog ml(lid); + Message msg; + Message::Args args; + args.add("sendAlarmReport error:"); + args.add("general failure"); + msg.format(args); + ml.logErrorMessage(msg); + } + + return; +#endif //SKIP_ALARM +} + +/***************************************************************************************** +* @brief processAlarmReport API +* +* purpose: Process Alarm Report +* +*****************************************************************************************/ +void ALARMManager::processAlarmReport (Alarm& calAlarm) +{ // Get alarm configuration try { configAlarm (calAlarm); @@ -464,7 +521,7 @@ void ALARMManager::sendAlarmReport (const char* componentID, int alarmID, int st } return; -#endif //SKIP_ALARM + } /***************************************************************************************** diff --git a/oamapps/alarmmanager/alarmmanager.h b/oamapps/alarmmanager/alarmmanager.h index 28909cbb7..5f79ccf9a 100644 --- a/oamapps/alarmmanager/alarmmanager.h +++ b/oamapps/alarmmanager/alarmmanager.h @@ -77,6 +77,8 @@ public: std::string repProcessName = ""); + EXPORT void processAlarmReport (Alarm& calAlarm); + /** @brief return active alarm list * * @param AlarmList the alarm map reference to store alarms diff --git a/oamapps/postConfigure/postConfigure.cpp b/oamapps/postConfigure/postConfigure.cpp index a12784039..50685ae39 100644 --- a/oamapps/postConfigure/postConfigure.cpp +++ b/oamapps/postConfigure/postConfigure.cpp @@ -2028,8 +2028,8 @@ int main(int argc, char *argv[]) sysConfig->setConfig(parentProcessMonitor, "IPAddr", parentOAMModuleIPAddr); sysConfig->setConfig(parentProcessMonitor, "Port", "8800"); sysConfig->setConfig("ProcMgr", "IPAddr", parentOAMModuleIPAddr); - //sysConfig->setConfig("ProcHeartbeatControl", "IPAddr", parentOAMModuleIPAddr); - sysConfig->setConfig("ProcStatusControl", "IPAddr", parentOAMModuleIPAddr); + sysConfig->setConfig("ProcHeartbeatControl", "IPAddr", parentOAMModuleIPAddr); + sysConfig->setConfig("ProcMgr_Alarm", "IPAddr", parentOAMModuleIPAddr); string parentServerMonitor = parentOAMModuleName + "_ServerMonitor"; sysConfig->setConfig(parentServerMonitor, "IPAddr", parentOAMModuleIPAddr); string portName = parentOAMModuleName + "_WriteEngineServer"; diff --git a/procmgr/main.cpp b/procmgr/main.cpp index 01153d11f..ec90a1eeb 100644 --- a/procmgr/main.cpp +++ b/procmgr/main.cpp @@ -78,6 +78,7 @@ extern bool startFailOver; extern bool gOAMParentModuleFlag; static void messageThread(Configuration config); +static void alarmMessageThread(Configuration config); static void sigUser1Handler(int sig); static void startMgrProcessThread(); static void hdfsActiveAlarmsPushingThread(); @@ -265,6 +266,12 @@ int main(int argc, char **argv) if ( ret != 0 ) log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR); + // create alarm message thread + pthread_t AlarmMessageThread; + ret = pthread_create (&AlarmMessageThread, NULL, (void*(*)(void*)) &alarmMessageThread, &config); + if ( ret != 0 ) + log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR); + //monitor OAM Parent Module for failover while(true) { @@ -351,6 +358,7 @@ int main(int argc, char **argv) string IPaddr = (*pt1).IPAddr; sysConfig->setConfig("ProcMgr", "IPAddr", IPaddr); + sysConfig->setConfig("ProcMgr_Alarm", "IPAddr", IPaddr); log.writeLog(__LINE__, "set ProcMgr IPaddr to " + IPaddr, LOG_TYPE_DEBUG); //update Calpont Config table @@ -378,6 +386,12 @@ int main(int argc, char **argv) int ret = pthread_create (&MessageThread, NULL, (void*(*)(void*)) &messageThread, &config); if ( ret != 0 ) log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR); + + // create alarm message thread + pthread_t AlarmMessageThread; + ret = pthread_create (&AlarmMessageThread, NULL, (void*(*)(void*)) &alarmMessageThread, &config); + if ( ret != 0 ) + log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR); } // @@ -463,21 +477,102 @@ static void messageThread(Configuration config) } } - catch (exception& ex) - { - string error = ex.what(); - log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueServer for ProcMgr:" + error, LOG_TYPE_ERROR); + catch (exception& ex) + { + string error = ex.what(); + log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueServer for ProcMgr:" + error, LOG_TYPE_ERROR); - // takes 2 - 4 minites to free sockets, sleep and retry - sleep(60); - } - catch(...) - { - log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueServer for ProcMgr: Caught unknown exception!", LOG_TYPE_ERROR); + // takes 2 - 4 minites to free sockets, sleep and retry + sleep(60); + } + catch(...) + { + log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueServer for ProcMgr: Caught unknown exception!", LOG_TYPE_ERROR); - // takes 2 - 4 minites to free sockets, sleep and retry - sleep(60); - } + // takes 2 - 4 minites to free sockets, sleep and retry + sleep(60); + } + } + return; +} + +/****************************************************************************************** +* @brief alarmMesssageThread +* +* purpose: Read incoming alarm messages +* +******************************************************************************************/ +static void alarmMessageThread(Configuration config) +{ + ProcessLog log; + ProcessManager processManager(config, log); + Oam oam; + + //check for running active, then launch + while(true) + { + if ( !runStandby) + break; + sleep (1); + } + + log.writeLog(__LINE__, "Alarm Message Thread started ..", LOG_TYPE_DEBUG); + + //read and cleanup port before trying to use + try { + Config* sysConfig = Config::makeConfig(); + string port = sysConfig->getConfig("ProcMgr_Alarm", "Port"); + string cmd = "fuser -k " + port + "/tcp >/dev/null 2>&1"; + if ( !rootUser) + cmd = "sudo fuser -k " + port + "/tcp >/dev/null 2>&1"; + + + system(cmd.c_str()); + } + catch(...) + { + } + + // + //waiting for request + // + IOSocket fIos; + + for (;;) + { + try + { + MessageQueueServer procmgr("ProcMgr_Alarm"); + for (;;) + { + try + { + fIos = procmgr.accept(); + + pthread_t alarmMessagethread; + int status = pthread_create (&alarmMessagethread, NULL, (void*(*)(void*)) &processAlarmMSG, &fIos); + + if ( status != 0 ) + log.writeLog(__LINE__, "alarmmessagethread: pthread_create failed, return status = " + oam.itoa(status), LOG_TYPE_ERROR); + } + catch(...) + {} + + } + } + catch (exception& ex) + { + string error = ex.what(); + log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueServer for ProcMgr_Alarm:" + error, LOG_TYPE_ERROR); + + sleep(1); + } + catch(...) + { + log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueServer for ProcMgr_Alarm: Caught unknown exception!", LOG_TYPE_ERROR); + + sleep(1); + } } return; } diff --git a/procmgr/processmanager.cpp b/procmgr/processmanager.cpp index c4abe6793..24fc09f72 100755 --- a/procmgr/processmanager.cpp +++ b/procmgr/processmanager.cpp @@ -301,6 +301,79 @@ ProcessManager::~ProcessManager() { } +/****************************************************************************************** +* @brief processAlarmMSG +* +* purpose: Process the Alarm message +* +******************************************************************************************/ +//void ProcessManager::processAlarmMSG( messageqcpp::IOSocket fIos, messageqcpp::ByteStream msg) +void processAlarmMSG(messageqcpp::IOSocket* cfIos) +{ + messageqcpp::IOSocket afIos = *cfIos; + + pthread_t ThreadId; + ThreadId = pthread_self(); + + ByteStream msg; + + try{ + msg = afIos.read(); + } + catch(...) + { + pthread_detach (ThreadId); + pthread_exit(0); + } + + if (msg.length() <= 0) { + afIos.close(); + pthread_detach (ThreadId); + pthread_exit(0); + } + + Oam oam; + ProcessLog log; + + Configuration config; + ProcessManager processManager(config, log); + + log.writeLog(__LINE__, "MSG RECEIVED: Process Alarm Message"); + + ByteStream::byte alarmID; + std::string componentID; + ByteStream::byte state; + std::string ModuleName; + std::string processName; + ByteStream::byte pid; + ByteStream::byte tid; + + msg >> alarmID; + msg >> componentID; + msg >> state; + msg >> ModuleName; + msg >> processName; + msg >> pid; + msg >> tid; + + Alarm calAlarm; + + calAlarm.setAlarmID (alarmID); + calAlarm.setComponentID (componentID); + calAlarm.setState (state); + calAlarm.setSname (ModuleName); + calAlarm.setPname (processName); + calAlarm.setPid (pid); + calAlarm.setTid (tid); + + ALARMManager aManager; + aManager.processAlarmReport(calAlarm); + + afIos.close(); + pthread_detach (ThreadId); + pthread_exit(0); +} + /****************************************************************************************** * @brief processMSG * @@ -2629,44 +2702,44 @@ void processMSG(messageqcpp::IOSocket* cfIos) break; } -/* - case PROCESSALARM: - { - log.writeLog(__LINE__, "MSG RECEIVED: Process Alarm Message"); - ByteStream::byte alarmID; - std::string componentID; - ByteStream::byte state; - std::string ModuleName; - std::string processName; - ByteStream::byte pid; - ByteStream::byte tid; +/* case PROCESSALARM: + { + log.writeLog(__LINE__, "MSG RECEIVED: Process Alarm Message"); - msg >> alarmID; - msg >> componentID; - msg >> state; - msg >> ModuleName; - msg >> processName; - msg >> pid; - msg >> tid; + ByteStream::byte alarmID; + std::string componentID; + ByteStream::byte state; + std::string ModuleName; + std::string processName; + ByteStream::byte pid; + ByteStream::byte tid; - Alarm calAlarm; + msg >> alarmID; + msg >> componentID; + msg >> state; + msg >> ModuleName; + msg >> processName; + msg >> pid; + msg >> tid; - calAlarm.setAlarmID (alarmID); - calAlarm.setComponentID (componentID); - calAlarm.setState (state); - calAlarm.setSname (ModuleName); - calAlarm.setPname (processName); - calAlarm.setPid (pid); - calAlarm.setTid (tid); + Alarm calAlarm; - ALARMManager aManager; - aManager.processAlarmReport(calAlarm); + calAlarm.setAlarmID (alarmID); + calAlarm.setComponentID (componentID); + calAlarm.setState (state); + calAlarm.setSname (ModuleName); + calAlarm.setPname (processName); + calAlarm.setPid (pid); + calAlarm.setTid (tid); - break; - } + ALARMManager aManager; + aManager.processAlarmReport(calAlarm); + break; + } */ + default: log.writeLog(__LINE__, "MSG RECEIVED: Invalid type" ); break; @@ -8720,6 +8793,7 @@ int ProcessManager::switchParentOAMModule(std::string newActiveModuleName) newActiveIPaddr = (*pt2).IPAddr; sysConfig4->setConfig("ProcMgr", "IPAddr", newActiveIPaddr); + sysConfig4->setConfig("ProcMgr_Alarm", "IPAddr", newActiveIPaddr); sysConfig4->setConfig("ProcStatusControl", "IPAddr", newActiveIPaddr); sysConfig4->setConfig("DBRM_Controller", "IPAddr", newActiveIPaddr); @@ -9296,6 +9370,7 @@ int ProcessManager::OAMParentModuleChange() localIPaddr = (*pt1).IPAddr; sysConfig4->setConfig("ProcMgr", "IPAddr", localIPaddr); + sysConfig4->setConfig("ProcMgr_Alarm", "IPAddr", localIPaddr); sysConfig4->setConfig("ProcStatusControl", "IPAddr", localIPaddr); sysConfig4->setConfig("DBRM_Controller", "IPAddr", localIPaddr); diff --git a/procmgr/processmanager.h b/procmgr/processmanager.h index f1dd878ee..4339add26 100644 --- a/procmgr/processmanager.h +++ b/procmgr/processmanager.h @@ -79,6 +79,8 @@ namespace processmanager{ void startModuleThread(std::string moduleName); void stopModuleThread(std::string moduleName); void processMSG(messageqcpp::IOSocket* fIos); + void processAlarmMSG(messageqcpp::IOSocket* fIos); + void sendUpgradeRequest(); /** @brief Timeset for Milleseconds @@ -89,16 +91,16 @@ namespace processmanager{ { std::string ModuleName; //!< Module Name std::string ProcessName; //!< Process Name - int ID; //!< Heartbeat ID - bool receiveFlag; //!< Heartbeat indication flag - }; + int ID; //!< Heartbeat ID + bool receiveFlag; //!< Heartbeat indication flag + }; - typedef std::list HeartBeatProcList; + typedef std::list HeartBeatProcList; - typedef std::map srvStateList; + typedef std::map srvStateList; - const int MAX_ARGUMENTS = 10; - const std::string DEFAULT_LOG_FILE = "/var/log/mariadb/columnstore/ProcessManager.log"; + const int MAX_ARGUMENTS = 10; + const std::string DEFAULT_LOG_FILE = "/var/log/mariadb/columnstore/ProcessManager.log"; /** diff --git a/procmon/main.cpp b/procmon/main.cpp index 2f9cbce18..c1c4dd424 100644 --- a/procmon/main.cpp +++ b/procmon/main.cpp @@ -339,6 +339,7 @@ int main(int argc, char **argv) string IPaddr = (*pt1).IPAddr; sysConfig->setConfig("ProcMgr", "IPAddr", IPaddr); + sysConfig->setConfig("ProcMgr_Alarm", "IPAddr", IPaddr); log.writeLog(__LINE__, "set ProcMgr IPaddr to Old Standby Module: " + IPaddr, LOG_TYPE_DEBUG); //update Calpont Config table diff --git a/tools/configMgt/autoConfigure.cpp b/tools/configMgt/autoConfigure.cpp index 3e9ad5b34..1e7f98fdd 100755 --- a/tools/configMgt/autoConfigure.cpp +++ b/tools/configMgt/autoConfigure.cpp @@ -938,6 +938,7 @@ int main(int argc, char *argv[]) sysConfigNew->setConfig(parentProcessMonitor, "IPAddr", parentOAMModuleIPAddr); sysConfigNew->setConfig(parentProcessMonitor, "Port", "8800"); sysConfigNew->setConfig("ProcMgr", "IPAddr", parentOAMModuleIPAddr); + sysConfigNew->setConfig("ProcMgr_Alarm", "IPAddr", parentOAMModuleIPAddr); sysConfigNew->setConfig("ProcStatusControl", "IPAddr", parentOAMModuleIPAddr); string parentServerMonitor = systemParentOAMModuleName + "_ServerMonitor"; sysConfigNew->setConfig(parentServerMonitor, "IPAddr", parentOAMModuleIPAddr);