diff --git a/oam/etc/Columnstore.xml b/oam/etc/Columnstore.xml
index c3aa79c42..ec985028f 100644
--- a/oam/etc/Columnstore.xml
+++ b/oam/etc/Columnstore.xml
@@ -19,6 +19,10 @@
0.0.0.0
8603
+
+ 127.0.0.1
+ 8606
+
0.0.0.0
8604
diff --git a/oam/etc/Columnstore.xml.singleserver b/oam/etc/Columnstore.xml.singleserver
index 13f2b3fc7..58bb10ebd 100644
--- a/oam/etc/Columnstore.xml.singleserver
+++ b/oam/etc/Columnstore.xml.singleserver
@@ -14,6 +14,10 @@
127.0.0.1
8603
+
+ 127.0.0.1
+ 8606
+
127.0.0.1
8604
diff --git a/oamapps/alarmmanager/alarmglobal.h b/oamapps/alarmmanager/alarmglobal.h
index fd22d3d97..af01e149c 100644
--- a/oamapps/alarmmanager/alarmglobal.h
+++ b/oamapps/alarmmanager/alarmglobal.h
@@ -38,7 +38,7 @@ const std::string ACTIVE_ALARM_FILE = "/var/log/mariadb/columnstore/activeAlarms
const std::string ALARM_FILE = "/var/log/mariadb/columnstore/alarm.log";
const std::string ALARM_ARCHIVE_FILE = "/var/log/mariadb/columnstore/archive";
-const bool ALARM_DEBUG = false;
+const bool ALARM_DEBUG = true;
const uint16_t INVALID_ALARM_ID = 0;
}
diff --git a/oamapps/alarmmanager/alarmmanager.cpp b/oamapps/alarmmanager/alarmmanager.cpp
index e03f38b4d..b9ba4f702 100644
--- a/oamapps/alarmmanager/alarmmanager.cpp
+++ b/oamapps/alarmmanager/alarmmanager.cpp
@@ -28,10 +28,10 @@
#include
#include
-#include "messagequeue.h"
#include "alarmglobal.h"
#include "liboamcpp.h"
#include "installdir.h"
+#include "messagequeue.h"
using namespace std;
using namespace oam;
@@ -373,11 +373,11 @@ void configAlarm (Alarm& calAlarm)
/*****************************************************************************************
* @brief sendAlarmReport API
*
-* purpose: Process Alarm Report
+* purpose: Send Alarm Report
*
*****************************************************************************************/
void ALARMManager::sendAlarmReport (const char* componentID, int alarmID, int state,
- std::string repModuleName, std::string repProcessName)
+ std::string repModuleName, std::string repProcessName)
{
#ifdef SKIP_ALARM
@@ -438,16 +438,73 @@ void ALARMManager::sendAlarmReport (const char* componentID, int alarmID, int st
else
processName = repProcessName;
- Alarm calAlarm;
-
- calAlarm.setAlarmID (alarmID);
- calAlarm.setComponentID (componentID);
- calAlarm.setState (state);
- calAlarm.setSname (ModuleName);
- calAlarm.setPname (processName);
- calAlarm.setPid (pid);
- calAlarm.setTid (tid);
+ int returnStatus = API_SUCCESS; //default
+ ByteStream msg1;
+ // setup message
+ msg1 << (ByteStream::byte) alarmID;
+ msg1 << (std::string) componentID;
+ msg1 << (ByteStream::byte) state;
+ msg1 << (std::string) ModuleName;
+ msg1 << (std::string) processName;
+ msg1 << (ByteStream::byte) pid;
+ msg1 << (ByteStream::byte) tid;
+
+ try
+ {
+ //send the msg to Process Manager
+ MessageQueueClient procmgr("ProcMgr_Alarm");
+ procmgr.write(msg1);
+
+ // shutdown connection
+ procmgr.shutdown();
+ }
+ catch (std::runtime_error& e)
+ {
+ LoggingID lid(11);
+ MessageLog ml(lid);
+ Message msg;
+ Message::Args args;
+ args.add("sendAlarmReport error:");
+ args.add(e.what());
+ msg.format(args);
+ ml.logErrorMessage(msg);
+ }
+ catch (std::exception& e)
+ {
+ LoggingID lid(11);
+ MessageLog ml(lid);
+ Message msg;
+ Message::Args args;
+ args.add("sendAlarmReport error:");
+ args.add(e.what());
+ msg.format(args);
+ ml.logErrorMessage(msg);
+ }
+ catch (...)
+ {
+ LoggingID lid(11);
+ MessageLog ml(lid);
+ Message msg;
+ Message::Args args;
+ args.add("sendAlarmReport error:");
+ args.add("general failure");
+ msg.format(args);
+ ml.logErrorMessage(msg);
+ }
+
+ return;
+#endif //SKIP_ALARM
+}
+
+/*****************************************************************************************
+* @brief processAlarmReport API
+*
+* purpose: Process Alarm Report
+*
+*****************************************************************************************/
+void ALARMManager::processAlarmReport (Alarm& calAlarm)
+{
// Get alarm configuration
try {
configAlarm (calAlarm);
@@ -464,7 +521,7 @@ void ALARMManager::sendAlarmReport (const char* componentID, int alarmID, int st
}
return;
-#endif //SKIP_ALARM
+
}
/*****************************************************************************************
diff --git a/oamapps/alarmmanager/alarmmanager.h b/oamapps/alarmmanager/alarmmanager.h
index 28909cbb7..5f79ccf9a 100644
--- a/oamapps/alarmmanager/alarmmanager.h
+++ b/oamapps/alarmmanager/alarmmanager.h
@@ -77,6 +77,8 @@ public:
std::string repProcessName = "");
+ EXPORT void processAlarmReport (Alarm& calAlarm);
+
/** @brief return active alarm list
*
* @param AlarmList the alarm map reference to store alarms
diff --git a/oamapps/postConfigure/postConfigure.cpp b/oamapps/postConfigure/postConfigure.cpp
index a12784039..50685ae39 100644
--- a/oamapps/postConfigure/postConfigure.cpp
+++ b/oamapps/postConfigure/postConfigure.cpp
@@ -2028,8 +2028,8 @@ int main(int argc, char *argv[])
sysConfig->setConfig(parentProcessMonitor, "IPAddr", parentOAMModuleIPAddr);
sysConfig->setConfig(parentProcessMonitor, "Port", "8800");
sysConfig->setConfig("ProcMgr", "IPAddr", parentOAMModuleIPAddr);
- //sysConfig->setConfig("ProcHeartbeatControl", "IPAddr", parentOAMModuleIPAddr);
- sysConfig->setConfig("ProcStatusControl", "IPAddr", parentOAMModuleIPAddr);
+ sysConfig->setConfig("ProcHeartbeatControl", "IPAddr", parentOAMModuleIPAddr);
+ sysConfig->setConfig("ProcMgr_Alarm", "IPAddr", parentOAMModuleIPAddr);
string parentServerMonitor = parentOAMModuleName + "_ServerMonitor";
sysConfig->setConfig(parentServerMonitor, "IPAddr", parentOAMModuleIPAddr);
string portName = parentOAMModuleName + "_WriteEngineServer";
diff --git a/procmgr/main.cpp b/procmgr/main.cpp
index 01153d11f..ec90a1eeb 100644
--- a/procmgr/main.cpp
+++ b/procmgr/main.cpp
@@ -78,6 +78,7 @@ extern bool startFailOver;
extern bool gOAMParentModuleFlag;
static void messageThread(Configuration config);
+static void alarmMessageThread(Configuration config);
static void sigUser1Handler(int sig);
static void startMgrProcessThread();
static void hdfsActiveAlarmsPushingThread();
@@ -265,6 +266,12 @@ int main(int argc, char **argv)
if ( ret != 0 )
log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR);
+ // create alarm message thread
+ pthread_t AlarmMessageThread;
+ ret = pthread_create (&AlarmMessageThread, NULL, (void*(*)(void*)) &alarmMessageThread, &config);
+ if ( ret != 0 )
+ log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR);
+
//monitor OAM Parent Module for failover
while(true)
{
@@ -351,6 +358,7 @@ int main(int argc, char **argv)
string IPaddr = (*pt1).IPAddr;
sysConfig->setConfig("ProcMgr", "IPAddr", IPaddr);
+ sysConfig->setConfig("ProcMgr_Alarm", "IPAddr", IPaddr);
log.writeLog(__LINE__, "set ProcMgr IPaddr to " + IPaddr, LOG_TYPE_DEBUG);
//update Calpont Config table
@@ -378,6 +386,12 @@ int main(int argc, char **argv)
int ret = pthread_create (&MessageThread, NULL, (void*(*)(void*)) &messageThread, &config);
if ( ret != 0 )
log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR);
+
+ // create alarm message thread
+ pthread_t AlarmMessageThread;
+ ret = pthread_create (&AlarmMessageThread, NULL, (void*(*)(void*)) &alarmMessageThread, &config);
+ if ( ret != 0 )
+ log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR);
}
//
@@ -463,21 +477,102 @@ static void messageThread(Configuration config)
}
}
- catch (exception& ex)
- {
- string error = ex.what();
- log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueServer for ProcMgr:" + error, LOG_TYPE_ERROR);
+ catch (exception& ex)
+ {
+ string error = ex.what();
+ log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueServer for ProcMgr:" + error, LOG_TYPE_ERROR);
- // takes 2 - 4 minites to free sockets, sleep and retry
- sleep(60);
- }
- catch(...)
- {
- log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueServer for ProcMgr: Caught unknown exception!", LOG_TYPE_ERROR);
+ // takes 2 - 4 minutes to free sockets, sleep and retry
+ sleep(60);
+ }
+ catch(...)
+ {
+ log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueServer for ProcMgr: Caught unknown exception!", LOG_TYPE_ERROR);
- // takes 2 - 4 minites to free sockets, sleep and retry
- sleep(60);
- }
+ // takes 2 - 4 minutes to free sockets, sleep and retry
+ sleep(60);
+ }
+ }
+ return;
+}
+
+/******************************************************************************************
+* @brief alarmMessageThread
+*
+* purpose: Read incoming alarm messages
+*
+* Idles while runStandby is set, makes a best-effort attempt to free the configured
+* "ProcMgr_Alarm" port with fuser, then loops forever accepting connections on the
+* "ProcMgr_Alarm" message queue. Each accepted connection is handed to a new
+* processAlarmMSG thread. If the MessageQueueServer throws, the error is logged and the
+* server is rebuilt after a one second pause.
+*
+* @param config configuration used to construct the local ProcessManager
+*
+* NOTE(review): every accepted connection is stored in the single local fIos and passed
+* to the worker by address; a later accept() could overwrite it before the worker has
+* taken its copy. Confirm whether a per-connection object is needed.
+*
+* NOTE(review): this function takes Configuration by value but is started through
+* pthread_create with a function-pointer cast and the address of a Configuration.
+* Confirm this is intended.
+*
+******************************************************************************************/
+static void alarmMessageThread(Configuration config)
+{
+    ProcessLog log;
+    // NOTE(review): processManager is not referenced below; kept in case its
+    // construction matters.
+    ProcessManager processManager(config, log);
+    Oam oam;
+
+    // check for running active, then launch: stay idle while in standby
+    while ( runStandby )
+        sleep (1);
+
+    log.writeLog(__LINE__, "Alarm Message Thread started ..", LOG_TYPE_DEBUG);
+
+    // read and cleanup port before trying to use (best effort, failures are ignored)
+    try {
+        Config* sysConfig = Config::makeConfig();
+        string port = sysConfig->getConfig("ProcMgr_Alarm", "Port");
+
+        // without a configured port the command would degenerate to "fuser -k /tcp"
+        if ( !port.empty() )
+        {
+            string cmd = "fuser -k " + port + "/tcp >/dev/null 2>&1";
+            if ( !rootUser)
+                cmd = "sudo fuser -k " + port + "/tcp >/dev/null 2>&1";
+
+            system(cmd.c_str());
+        }
+    }
+    catch(...)
+    {
+    }
+
+    //
+    // waiting for request
+    //
+    IOSocket fIos;
+
+    for (;;)
+    {
+        try
+        {
+            MessageQueueServer procmgr("ProcMgr_Alarm");
+            for (;;)
+            {
+                try
+                {
+                    fIos = procmgr.accept();
+
+                    pthread_t alarmMessagethread;
+                    int status = pthread_create (&alarmMessagethread, NULL, (void*(*)(void*)) &processAlarmMSG, &fIos);
+
+                    if ( status != 0 )
+                    {
+                        log.writeLog(__LINE__, "alarmmessagethread: pthread_create failed, return status = " + oam.itoa(status), LOG_TYPE_ERROR);
+
+                        // no worker was started, so nobody else will close this connection
+                        fIos.close();
+                    }
+                }
+                catch(...)
+                {
+                    // report the failure and pause so a persistent error cannot spin silently
+                    log.writeLog(__LINE__, "EXCEPTION ERROR on accept for ProcMgr_Alarm: Caught unknown exception!", LOG_TYPE_ERROR);
+
+                    sleep(1);
+                }
+
+            }
+        }
+        catch (exception& ex)
+        {
+            string error = ex.what();
+            log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueServer for ProcMgr_Alarm:" + error, LOG_TYPE_ERROR);
+
+            sleep(1);
+        }
+        catch(...)
+        {
+            log.writeLog(__LINE__, "EXCEPTION ERROR on MessageQueueServer for ProcMgr_Alarm: Caught unknown exception!", LOG_TYPE_ERROR);
+
+            sleep(1);
+        }
}
return;
}
diff --git a/procmgr/processmanager.cpp b/procmgr/processmanager.cpp
index c4abe6793..24fc09f72 100755
--- a/procmgr/processmanager.cpp
+++ b/procmgr/processmanager.cpp
@@ -301,6 +301,79 @@ ProcessManager::~ProcessManager()
{
}
+/******************************************************************************************
+* @brief processAlarmMSG
+*
+* purpose: Process the Alarm message
+*
+* Thread entry point started once per accepted connection. Reads one message from the
+* connection, unpacks the alarm fields (alarm id, component id, state, module name,
+* process name, pid, tid) into an Alarm and passes it to
+* ALARMManager::processAlarmReport. The connection is closed and the thread detaches
+* itself and exits on every path.
+*
+* @param cfIos connection to read from; copied on entry
+*
+* No exception is allowed to leave this function: a failed read ends the thread quietly,
+* and a failure while decoding or processing the message is logged.
+*
+******************************************************************************************/
+void processAlarmMSG(messageqcpp::IOSocket* cfIos)
+{
+    // work on a private copy of the connection handed over by the accepting thread
+    messageqcpp::IOSocket afIos = *cfIos;
+
+    pthread_t ThreadId;
+    ThreadId = pthread_self();
+
+    ByteStream msg;
+    bool msgRead = false;
+
+    try {
+        msg = afIos.read();
+        msgRead = true;
+    }
+    catch(...)
+    {
+        // read failed: nothing to process, fall through to the common cleanup
+    }
+
+    if (msgRead && msg.length() > 0) {
+        ProcessLog log;
+
+        // pthread_exit is deliberately kept outside this try block so that thread
+        // termination never has to pass through the catch(...) below
+        try {
+            // NOTE(review): oam, config and processManager are not referenced below;
+            // kept in case their construction matters.
+            Oam oam;
+            Configuration config;
+            ProcessManager processManager(config, log);
+
+            log.writeLog(__LINE__, "MSG RECEIVED: Process Alarm Message");
+
+            // NOTE(review): pid and tid are extracted as ByteStream::byte, like alarmID
+            // and state. If byte is 8 bits wide, real process/thread ids will not fit;
+            // changing this requires the sender to change as well.
+            ByteStream::byte alarmID;
+            std::string componentID;
+            ByteStream::byte state;
+            std::string ModuleName;
+            std::string processName;
+            ByteStream::byte pid;
+            ByteStream::byte tid;
+
+            msg >> alarmID;
+            msg >> componentID;
+            msg >> state;
+            msg >> ModuleName;
+            msg >> processName;
+            msg >> pid;
+            msg >> tid;
+
+            Alarm calAlarm;
+
+            calAlarm.setAlarmID (alarmID);
+            calAlarm.setComponentID (componentID);
+            calAlarm.setState (state);
+            calAlarm.setSname (ModuleName);
+            calAlarm.setPname (processName);
+            calAlarm.setPid (pid);
+            calAlarm.setTid (tid);
+
+            ALARMManager aManager;
+            aManager.processAlarmReport(calAlarm);
+        }
+        catch(...)
+        {
+            // a malformed message or a processing failure must not take the process down
+            log.writeLog(__LINE__, "EXCEPTION ERROR on processAlarmMSG: Caught unknown exception!", LOG_TYPE_ERROR);
+        }
+    }
+
+    // common cleanup for every path, including a failed read
+    try {
+        afIos.close();
+    }
+    catch(...)
+    {
+    }
+
+    pthread_detach (ThreadId);
+    pthread_exit(0);
+}
+
/******************************************************************************************
* @brief processMSG
*
@@ -2629,44 +2702,44 @@ void processMSG(messageqcpp::IOSocket* cfIos)
break;
}
-/*
- case PROCESSALARM:
- {
- log.writeLog(__LINE__, "MSG RECEIVED: Process Alarm Message");
- ByteStream::byte alarmID;
- std::string componentID;
- ByteStream::byte state;
- std::string ModuleName;
- std::string processName;
- ByteStream::byte pid;
- ByteStream::byte tid;
+/* case PROCESSALARM:
+ {
+ log.writeLog(__LINE__, "MSG RECEIVED: Process Alarm Message");
- msg >> alarmID;
- msg >> componentID;
- msg >> state;
- msg >> ModuleName;
- msg >> processName;
- msg >> pid;
- msg >> tid;
+ ByteStream::byte alarmID;
+ std::string componentID;
+ ByteStream::byte state;
+ std::string ModuleName;
+ std::string processName;
+ ByteStream::byte pid;
+ ByteStream::byte tid;
- Alarm calAlarm;
+ msg >> alarmID;
+ msg >> componentID;
+ msg >> state;
+ msg >> ModuleName;
+ msg >> processName;
+ msg >> pid;
+ msg >> tid;
- calAlarm.setAlarmID (alarmID);
- calAlarm.setComponentID (componentID);
- calAlarm.setState (state);
- calAlarm.setSname (ModuleName);
- calAlarm.setPname (processName);
- calAlarm.setPid (pid);
- calAlarm.setTid (tid);
+ Alarm calAlarm;
- ALARMManager aManager;
- aManager.processAlarmReport(calAlarm);
+ calAlarm.setAlarmID (alarmID);
+ calAlarm.setComponentID (componentID);
+ calAlarm.setState (state);
+ calAlarm.setSname (ModuleName);
+ calAlarm.setPname (processName);
+ calAlarm.setPid (pid);
+ calAlarm.setTid (tid);
- break;
- }
+ ALARMManager aManager;
+ aManager.processAlarmReport(calAlarm);
+ break;
+ }
*/
+
default:
log.writeLog(__LINE__, "MSG RECEIVED: Invalid type" );
break;
@@ -8720,6 +8793,7 @@ int ProcessManager::switchParentOAMModule(std::string newActiveModuleName)
newActiveIPaddr = (*pt2).IPAddr;
sysConfig4->setConfig("ProcMgr", "IPAddr", newActiveIPaddr);
+ sysConfig4->setConfig("ProcMgr_Alarm", "IPAddr", newActiveIPaddr);
sysConfig4->setConfig("ProcStatusControl", "IPAddr", newActiveIPaddr);
sysConfig4->setConfig("DBRM_Controller", "IPAddr", newActiveIPaddr);
@@ -9296,6 +9370,7 @@ int ProcessManager::OAMParentModuleChange()
localIPaddr = (*pt1).IPAddr;
sysConfig4->setConfig("ProcMgr", "IPAddr", localIPaddr);
+ sysConfig4->setConfig("ProcMgr_Alarm", "IPAddr", localIPaddr);
sysConfig4->setConfig("ProcStatusControl", "IPAddr", localIPaddr);
sysConfig4->setConfig("DBRM_Controller", "IPAddr", localIPaddr);
diff --git a/procmgr/processmanager.h b/procmgr/processmanager.h
index f1dd878ee..4339add26 100644
--- a/procmgr/processmanager.h
+++ b/procmgr/processmanager.h
@@ -79,6 +79,8 @@ namespace processmanager{
void startModuleThread(std::string moduleName);
void stopModuleThread(std::string moduleName);
void processMSG(messageqcpp::IOSocket* fIos);
+ void processAlarmMSG(messageqcpp::IOSocket* fIos);
+
void sendUpgradeRequest();
/** @brief Timeset for Milleseconds
@@ -89,16 +91,16 @@ namespace processmanager{
{
std::string ModuleName; //!< Module Name
std::string ProcessName; //!< Process Name
- int ID; //!< Heartbeat ID
- bool receiveFlag; //!< Heartbeat indication flag
- };
+ int ID; //!< Heartbeat ID
+ bool receiveFlag; //!< Heartbeat indication flag
+ };
- typedef std::list HeartBeatProcList;
+ typedef std::list HeartBeatProcList;
- typedef std::map srvStateList;
+ typedef std::map srvStateList;
- const int MAX_ARGUMENTS = 10;
- const std::string DEFAULT_LOG_FILE = "/var/log/mariadb/columnstore/ProcessManager.log";
+ const int MAX_ARGUMENTS = 10;
+ const std::string DEFAULT_LOG_FILE = "/var/log/mariadb/columnstore/ProcessManager.log";
/**
diff --git a/procmon/main.cpp b/procmon/main.cpp
index 2f9cbce18..c1c4dd424 100644
--- a/procmon/main.cpp
+++ b/procmon/main.cpp
@@ -339,6 +339,7 @@ int main(int argc, char **argv)
string IPaddr = (*pt1).IPAddr;
sysConfig->setConfig("ProcMgr", "IPAddr", IPaddr);
+ sysConfig->setConfig("ProcMgr_Alarm", "IPAddr", IPaddr);
log.writeLog(__LINE__, "set ProcMgr IPaddr to Old Standby Module: " + IPaddr, LOG_TYPE_DEBUG);
//update Calpont Config table
diff --git a/tools/configMgt/autoConfigure.cpp b/tools/configMgt/autoConfigure.cpp
index 3e9ad5b34..1e7f98fdd 100755
--- a/tools/configMgt/autoConfigure.cpp
+++ b/tools/configMgt/autoConfigure.cpp
@@ -938,6 +938,7 @@ int main(int argc, char *argv[])
sysConfigNew->setConfig(parentProcessMonitor, "IPAddr", parentOAMModuleIPAddr);
sysConfigNew->setConfig(parentProcessMonitor, "Port", "8800");
sysConfigNew->setConfig("ProcMgr", "IPAddr", parentOAMModuleIPAddr);
+ sysConfigNew->setConfig("ProcMgr_Alarm", "IPAddr", parentOAMModuleIPAddr);
sysConfigNew->setConfig("ProcStatusControl", "IPAddr", parentOAMModuleIPAddr);
string parentServerMonitor = systemParentOAMModuleName + "_ServerMonitor";
sysConfigNew->setConfig(parentServerMonitor, "IPAddr", parentOAMModuleIPAddr);