1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

Fix pthread worker funcs to match the POSIX threads API

This commit is contained in:
Alexey Antipovsky
2020-11-11 05:35:06 +00:00
parent 83d3adf466
commit da691f7b7a
6 changed files with 104 additions and 87 deletions

View File

@ -71,10 +71,11 @@ typedef boost::tuple<ChildModuleList::iterator, string > threadInfo_t;
bool LOCAL = false;
void childReportThread(threadInfo_t& st)
void* childReportThread(threadInfo_t* st)
{
ChildModuleList::iterator& list = boost::get<0>(st);
string reportType = boost::get<1>(st);
assert(st);
ChildModuleList::iterator& list = boost::get<0>(*st);
string reportType = boost::get<1>(*st);
string remoteModuleName = (*list).moduleName;
string remoteModuleIP = (*list).moduleIP;
@ -144,9 +145,10 @@ void childReportThread(threadInfo_t& st)
pthread_exit(0);
}
void reportThread(string reporttype)
void* reportThread(string* reporttype)
{
string reportType = reporttype;
assert(reporttype);
string reportType = *reporttype;
Oam oam;

View File

@ -81,8 +81,8 @@ extern vector<string> downModuleList;
extern bool startFailOver;
extern bool gOAMParentModuleFlag;
static void messageThread(Configuration config);
static void alarmMessageThread(Configuration config);
static void* messageThread(Configuration* config);
static void* alarmMessageThread(Configuration* config);
static void sigUser1Handler(int sig);
static void startMgrProcessThread();
static void hdfsActiveAlarmsPushingThread();
@ -462,10 +462,11 @@ int main(int argc, char** argv)
* purpose: Read incoming messages
*
******************************************************************************************/
static void messageThread(Configuration config)
static void* messageThread(Configuration* config)
{
ProcessLog log;
ProcessManager processManager(config, log);
assert(config);
ProcessManager processManager(*config, log);
Oam oam;
//check for running active, then launch
@ -536,6 +537,7 @@ static void messageThread(Configuration config)
sleep(60);
}
}
return NULL;
}
/******************************************************************************************
@ -544,10 +546,11 @@ static void messageThread(Configuration config)
* purpose: Read incoming alarm messages
*
******************************************************************************************/
static void alarmMessageThread(Configuration config)
static void* alarmMessageThread(Configuration* config)
{
ProcessLog log;
ProcessManager processManager(config, log);
assert(config);
ProcessManager processManager(*config, log);
Oam oam;
ByteStream msg;
@ -677,6 +680,7 @@ static void alarmMessageThread(Configuration config)
sleep(1);
}
}
return NULL;
}
/******************************************************************************************

View File

@ -345,7 +345,7 @@ ProcessManager::~ProcessManager()
*
******************************************************************************************/
//void ProcessManager::processMSG( messageqcpp::IOSocket fIos, messageqcpp::ByteStream msg)
void processMSG(messageqcpp::IOSocket* cfIos)
void* processMSG(messageqcpp::IOSocket* cfIos)
{
messageqcpp::IOSocket fIos = *cfIos;
@ -3382,6 +3382,7 @@ void processMSG(messageqcpp::IOSocket* cfIos)
fIos.close();
pthread_detach (ThreadId);
pthread_exit(0);
return NULL;
}
/******************************************************************************************
@ -6800,9 +6801,10 @@ bool ProcessManager::createModuleFile(string remoteModuleName)
* purpose: Send Messages to Module Process Monitors to start Processes
*
*****************************************************************************************/
void startSystemThread(oam::DeviceNetworkList Devicenetworklist)
void* startSystemThread(oam::DeviceNetworkList* Devicenetworklist)
{
oam::DeviceNetworkList devicenetworklist = Devicenetworklist;
assert(Devicenetworklist);
oam::DeviceNetworkList devicenetworklist = *Devicenetworklist;
ProcessLog log;
Configuration config;
@ -7410,11 +7412,11 @@ void startSystemThread(oam::DeviceNetworkList Devicenetworklist)
* purpose: Send Messages to Module Process Monitors to start Processes
*
*****************************************************************************************/
void startModuleThread(string module)
void* startModuleThread(string* module)
{
assert(module);
//store in a local variable
string moduleName = module;
string moduleName = *module;
ProcessLog log;
Configuration config;
@ -7516,9 +7518,10 @@ void startModuleThread(string module)
* purpose: Send Messages to Module Process Monitors to stop Processes
*
*****************************************************************************************/
void stopSystemThread(oam::DeviceNetworkList Devicenetworklist)
void* stopSystemThread(oam::DeviceNetworkList* Devicenetworklist)
{
oam::DeviceNetworkList devicenetworklist = Devicenetworklist;
assert(Devicenetworklist);
oam::DeviceNetworkList devicenetworklist = *Devicenetworklist;
ProcessLog log;
Configuration config;
@ -7759,10 +7762,11 @@ void stopSystemThread(oam::DeviceNetworkList Devicenetworklist)
* purpose: Send Messages to Module Process Monitors to stop Processes
*
*****************************************************************************************/
void stopModuleThread(string module)
void* stopModuleThread(string* module)
{
assert(module);
//store in a local variable
string moduleName = module;
string moduleName = *module;
ProcessLog log;
Configuration config;

View File

@ -75,11 +75,11 @@ void pingDeviceThread();
namespace processmanager
{
void startSystemThread(oam::DeviceNetworkList devicenetworklist);
void stopSystemThread(oam::DeviceNetworkList devicenetworklist);
void startModuleThread(std::string moduleName);
void stopModuleThread(std::string moduleName);
void processMSG(messageqcpp::IOSocket* fIos);
void* startSystemThread(oam::DeviceNetworkList* devicenetworklist);
void* stopSystemThread(oam::DeviceNetworkList* devicenetworklist);
void* startModuleThread(std::string* moduleName);
void* stopModuleThread(std::string* moduleName);
void* processMSG(messageqcpp::IOSocket* fIos);
/** @brief Timeset for Milleseconds
*/

View File

@ -41,13 +41,13 @@ using namespace idbdatafile;
//using namespace procheartbeat;
static void messageThread(MonitorConfig config);
static void statusControlThread();
static void sigchldHandleThread();
static void* messageThread(MonitorConfig* config);
static void* statusControlThread(void*);
static void* sigchldHandleThread(void*);
static void SIGCHLDHandler(int signal_number);
static void chldHandleThread(MonitorConfig config);
static void* chldHandleThread(MonitorConfig* config);
static void sigHupHandler(int sig);
static void mysqlMonitorThread(MonitorConfig config);
static void* mysqlMonitorThread(MonitorConfig* config);
string systemOAM;
string dm_server;
string cloud;
@ -190,7 +190,7 @@ int main(int argc, char** argv)
// create message thread
pthread_t MessageThread;
int ret = pthread_create (&MessageThread, NULL, (void* (*)(void*)) &messageThread, &config);
int ret = pthread_create (&MessageThread, NULL, (void*(*)(void*))&messageThread, &config);
if ( ret != 0 )
{
@ -631,7 +631,7 @@ int main(int argc, char** argv)
//launch Status table control thread on 'pm' modules
pthread_t statusThread;
int ret = pthread_create (&statusThread, NULL, (void* (*)(void*)) &statusControlThread, NULL);
int ret = pthread_create (&statusThread, NULL, &statusControlThread, NULL);
if ( ret != 0 )
log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR);
@ -827,7 +827,7 @@ int main(int argc, char** argv)
//handle SIGCHLD signal
pthread_t signalThread;
ret = pthread_create (&signalThread, NULL, (void* (*)(void*)) &sigchldHandleThread, NULL);
ret = pthread_create (&signalThread, NULL, &sigchldHandleThread, NULL);
if ( ret != 0 )
log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR);
@ -838,7 +838,7 @@ int main(int argc, char** argv)
( config.moduleType() == "pm" && PMwithUM == "y") )
{
pthread_t mysqlThread;
ret = pthread_create (&mysqlThread, NULL, (void* (*)(void*)) &mysqlMonitorThread, NULL);
ret = pthread_create (&mysqlThread, NULL, (void*(*)(void*))&mysqlMonitorThread, NULL);
if ( ret != 0 )
log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR);
@ -1091,7 +1091,7 @@ int main(int argc, char** argv)
// create process health (monitor) thread
pthread_t processHealthThread;
ret = pthread_create (&processHealthThread, NULL, (void* (*)(void*)) &chldHandleThread, &config);
ret = pthread_create (&processHealthThread, NULL, (void*(*)(void*))&chldHandleThread, &config);
if ( ret != 0 )
log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR);
@ -1164,15 +1164,16 @@ int main(int argc, char** argv)
* purpose: Read incoming messages
*
******************************************************************************************/
static void messageThread(MonitorConfig config)
static void* messageThread(MonitorConfig* config)
{
//ProcMon log file
MonitorLog log;
ProcessMonitor aMonitor(config, log);
assert(config);
ProcessMonitor aMonitor(*config, log);
log.writeLog(__LINE__, "Message Thread started ..", LOG_TYPE_DEBUG);
Oam oam;
string msgPort = config.moduleName() + "_ProcessMonitor";
string msgPort = config->moduleName() + "_ProcessMonitor";
string port = "";
//ProcMon will wait for request
@ -1264,7 +1265,7 @@ static void messageThread(MonitorConfig config)
}
}
return;
return NULL;
}
/******************************************************************************************
@ -1273,10 +1274,11 @@ static void messageThread(MonitorConfig config)
* purpose: monitor mysqld by getting status
*
******************************************************************************************/
static void mysqlMonitorThread(MonitorConfig config)
static void* mysqlMonitorThread(MonitorConfig* config)
{
MonitorLog log;
ProcessMonitor aMonitor(config, log);
assert(config);
ProcessMonitor aMonitor(*config, log);
log.writeLog(__LINE__, "mysqld Monitoring Thread started ..", LOG_TYPE_DEBUG);
Oam oam;
@ -1293,6 +1295,7 @@ static void mysqlMonitorThread(MonitorConfig config)
sleep(5);
}
return NULL;
}
/******************************************************************************************
@ -1301,13 +1304,13 @@ static void mysqlMonitorThread(MonitorConfig config)
* purpose: Catch and process dieing child processes
*
******************************************************************************************/
static void sigchldHandleThread()
static void* sigchldHandleThread(void*)
{
struct sigaction sigchld_action;
memset (&sigchld_action, 0, sizeof (sigchld_action));
sigchld_action.sa_handler = &SIGCHLDHandler;
sigaction(SIGCHLD, &sigchld_action, NULL);
return;
return NULL;
}
static void SIGCHLDHandler(int signal_number)
@ -1326,18 +1329,19 @@ static void SIGCHLDHandler(int signal_number)
* Also validate the internal Process status with the Process-Status disk file
*
******************************************************************************************/
static void chldHandleThread(MonitorConfig config)
static void* chldHandleThread(MonitorConfig* config)
{
//ProcMon log file
MonitorLog log;
ProcessMonitor aMonitor(config, log);
assert(config);
ProcessMonitor aMonitor(*config, log);
log.writeLog(__LINE__, "Child Process Monitoring Thread started ..", LOG_TYPE_DEBUG);
Oam oam;
SystemProcessStatus systemprocessstatus;
//Loop through the process list to check the process current state
processList::iterator listPtr;
processList* aPtr = config.monitoredListPtr();
processList* aPtr = config->monitoredListPtr();
//get dbhealth flag
string DBFunctionalMonitorFlag;
@ -1387,7 +1391,7 @@ static void chldHandleThread(MonitorConfig config)
try
{
ProcessStatus procstat;
oam.getProcessStatus((*listPtr).ProcessName, config.moduleName(), procstat);
oam.getProcessStatus((*listPtr).ProcessName, config->moduleName(), procstat);
state = procstat.ProcessOpState;
PID = procstat.ProcessID;
@ -1472,7 +1476,7 @@ static void chldHandleThread(MonitorConfig config)
//setModule status to failed
try
{
oam.setModuleStatus(config.moduleName(), oam::FAILED);
oam.setModuleStatus(config->moduleName(), oam::FAILED);
}
catch (exception& ex)
{
@ -1533,18 +1537,18 @@ static void chldHandleThread(MonitorConfig config)
processRestartCount == 0)
{
// don't restart it
config.buildList((*listPtr).ProcessModuleType,
(*listPtr).ProcessName,
(*listPtr).ProcessLocation,
(*listPtr).ProcessArgs,
(*listPtr).launchID,
0,
oam::AUTO_OFFLINE,
(*listPtr).BootLaunch,
(*listPtr).RunType,
(*listPtr).DepProcessName,
(*listPtr).DepModuleName,
(*listPtr).LogFile);
config->buildList((*listPtr).ProcessModuleType,
(*listPtr).ProcessName,
(*listPtr).ProcessLocation,
(*listPtr).ProcessArgs,
(*listPtr).launchID,
0,
oam::AUTO_OFFLINE,
(*listPtr).BootLaunch,
(*listPtr).RunType,
(*listPtr).DepProcessName,
(*listPtr).DepModuleName,
(*listPtr).LogFile);
//Set the alarm
aMonitor.sendAlarm((*listPtr).ProcessName, PROCESS_DOWN_AUTO, SET);
@ -1563,13 +1567,13 @@ static void chldHandleThread(MonitorConfig config)
{
bool degraded;
int moduleStatus;
oam.getModuleStatus(config.moduleName(), moduleStatus, degraded);
oam.getModuleStatus(config->moduleName(), moduleStatus, degraded);
if ( moduleStatus == oam::ACTIVE)
{
try
{
oam.setModuleStatus(config.moduleName(), oam::DEGRADED);
oam.setModuleStatus(config->moduleName(), oam::DEGRADED);
}
catch (exception& ex)
{
@ -1621,18 +1625,18 @@ static void chldHandleThread(MonitorConfig config)
initStatus = oam::STANDBY;
//record the process information into processList
config.buildList((*listPtr).ProcessModuleType,
(*listPtr).ProcessName,
(*listPtr).ProcessLocation,
(*listPtr).ProcessArgs,
(*listPtr).launchID,
0,
oam::AUTO_OFFLINE,
(*listPtr).BootLaunch,
(*listPtr).RunType,
(*listPtr).DepProcessName,
(*listPtr).DepModuleName,
(*listPtr).LogFile);
config->buildList((*listPtr).ProcessModuleType,
(*listPtr).ProcessName,
(*listPtr).ProcessLocation,
(*listPtr).ProcessArgs,
(*listPtr).launchID,
0,
oam::AUTO_OFFLINE,
(*listPtr).BootLaunch,
(*listPtr).RunType,
(*listPtr).DepProcessName,
(*listPtr).DepModuleName,
(*listPtr).LogFile);
//Set the alarm
aMonitor.sendAlarm((*listPtr).ProcessName, PROCESS_DOWN_AUTO, SET);
@ -1699,13 +1703,13 @@ static void chldHandleThread(MonitorConfig config)
{
bool degraded;
int moduleStatus;
oam.getModuleStatus(config.moduleName(), moduleStatus, degraded);
oam.getModuleStatus(config->moduleName(), moduleStatus, degraded);
if ( moduleStatus == oam::ACTIVE)
{
try
{
oam.setModuleStatus(config.moduleName(), oam::DEGRADED);
oam.setModuleStatus(config->moduleName(), oam::DEGRADED);
}
catch (exception& ex)
{
@ -1766,6 +1770,7 @@ static void chldHandleThread(MonitorConfig config)
sleep(5);
}
return NULL;
}
/******************************************************************************************
@ -1799,7 +1804,7 @@ int processNumber = 0;
boost::interprocess::shared_memory_object fSysStatShmobj;
boost::interprocess::mapped_region fSysStatMapreg;
void processStatusMSG(messageqcpp::IOSocket* fIos);
void* processStatusMSG(messageqcpp::IOSocket* fIos);
processStatusList* aPtr;
SystemProcessConfig systemprocessconfig;
@ -1833,7 +1838,7 @@ processStatusList* statusListPtr()
* into the Status Shared-Memory table
*
******************************************************************************************/
static void statusControlThread()
static void* statusControlThread(void*)
{
MonitorLog log;
MonitorConfig config;
@ -2051,7 +2056,7 @@ static void statusControlThread()
memset(fShmSystemStatus, 0, SYSTEMSTATshmsize);
//set system status
memcpy(fShmSystemStatus[0].Name, "system", NAMESIZE);
memcpy(fShmSystemStatus[0].Name, "system", sizeof("system"));
if (runStandby)
{
@ -2334,7 +2339,7 @@ static void statusControlThread()
{
//log.writeLog(__LINE__, "***before create thread", LOG_TYPE_DEBUG);
pthread_t messagethread;
int status = pthread_create (&messagethread, NULL, (void* (*)(void*)) &processStatusMSG, fIos);
int status = pthread_create (&messagethread, NULL, (void*(*)(void*))&processStatusMSG, fIos);
//log.writeLog(__LINE__, "***after create thread", LOG_TYPE_DEBUG);
@ -2366,6 +2371,7 @@ static void statusControlThread()
}
}
} // end of for loop
return NULL;
}
/******************************************************************************************
@ -2374,7 +2380,7 @@ static void statusControlThread()
* purpose: Process the status message
*
******************************************************************************************/
void processStatusMSG(messageqcpp::IOSocket* cfIos)
void* processStatusMSG(messageqcpp::IOSocket* cfIos)
{
messageqcpp::IOSocket* fIos = cfIos;
@ -3583,6 +3589,7 @@ void processStatusMSG(messageqcpp::IOSocket* cfIos)
delete msg;
pthread_detach (ThreadId);
pthread_exit(0);
return NULL;
}
/******************************************************************************************

View File

@ -76,8 +76,8 @@ pthread_mutex_t PROCESS_LOCK;
namespace processmonitor
{
void sendAlarmThread (sendAlarmInfo_t* t);
void sendProcessThread (sendProcessInfo_t* t);
void* sendAlarmThread (sendAlarmInfo_t* t);
void* sendProcessThread (sendProcessInfo_t* t);
using namespace oam;
@ -2961,7 +2961,7 @@ void ProcessMonitor::sendAlarm(string alarmItem, ALARMS alarmID, int action)
* purpose: send a trap and log the process information
*
******************************************************************************************/
void sendAlarmThread(sendAlarmInfo_t* t)
void* sendAlarmThread(sendAlarmInfo_t* t)
{
MonitorLog log;
Oam oam;
@ -3052,7 +3052,7 @@ bool ProcessMonitor::updateProcessInfo(std::string processName, int state, pid_t
* purpose: Send msg to update process state and status change time on disk
*
******************************************************************************************/
void sendProcessThread(sendProcessInfo_t* t)
void* sendProcessThread(sendProcessInfo_t* t)
{
MonitorLog log;
MonitorConfig config;
@ -3081,7 +3081,7 @@ void sendProcessThread(sendProcessInfo_t* t)
delete t;
// pthread_mutex_unlock(&PROCESS_LOCK);
return;
return NULL;
}
/******************************************************************************************