1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-27 21:01:50 +03:00

MCOL-4170 Refactor services/systemd units to finish their bootstrap ...

This commit is contained in:
Alexander Barkov
2020-10-07 12:38:09 +04:00
parent 2992ee3c31
commit ab44ef6ddb
21 changed files with 875 additions and 181 deletions

View File

@ -66,8 +66,76 @@ using namespace execplan;
#include "collation.h"
#include "service.h"
namespace
{
class Opt
{
public:
int m_debug;
bool m_fg;
Opt(int argc, char *argv[])
:m_debug(0),
m_fg(false)
{
int c;
while ((c = getopt(argc, argv, "df")) != EOF)
{
switch(c)
{
case 'd':
m_debug++; // TODO: not really used yes
break;
case 'f':
m_fg= true;
break;
case '?':
default:
break;
}
}
}
};
class ServiceDDLProc: public Service, public Opt
{
protected:
void setupChildSignalHandlers();
void log(logging::LOG_TYPE type, const std::string &str)
{
LoggingID logid(23, 0, 0);
Message::Args args;
Message message(8);
args.add(str);
message.format(args);
logging::Logger logger(logid.fSubsysID);
logger.logMessage(LOG_TYPE_CRITICAL, message, logid);
}
public:
ServiceDDLProc(const Opt &opt)
:Service("DDLProc"), Opt(opt)
{ }
void LogErrno() override
{
log(LOG_TYPE_CRITICAL, std::string(strerror(errno)));
}
void ParentLogChildMessage(const std::string &str) override
{
log(LOG_TYPE_INFO, str);
}
int Child() override;
int Run()
{
return m_fg ? Child() : RunForking();
}
};
DistributedEngineComm* Dec;
int8_t setupCwd()
@ -95,7 +163,7 @@ void added_a_pm(int)
}
static void setupSignalHandlers()
void ServiceDDLProc::setupChildSignalHandlers()
{
#ifndef _MSC_VER
/* set up some signal handlers */
@ -114,16 +182,8 @@ static void setupSignalHandlers()
}
int main(int argc, char* argv[])
int ServiceDDLProc::Child()
{
// Set locale language
setlocale(LC_ALL, "");
setlocale(LC_NUMERIC, "C");
// Initialize the charset library
my_init();
// This is unset due to the way we start it
program_invocation_short_name = const_cast<char*>("DDLProc");
if ( setupCwd() < 0 )
{
LoggingID logid(23, 0, 0);
@ -133,6 +193,7 @@ int main(int argc, char* argv[])
msg.format( args1 );
logging::Logger logger(logid.fSubsysID);
logger.logMessage(LOG_TYPE_CRITICAL, msg, logid);
NotifyServiceInitializationFailed();
return 1;
}
@ -147,10 +208,12 @@ int main(int argc, char* argv[])
ResourceManager* rm = ResourceManager::instance();
Dec = DistributedEngineComm::instance(rm);
setupSignalHandlers();
setupChildSignalHandlers();
ddlprocessor::DDLProcessor ddlprocessor(1, 20);
NotifyServiceStarted();
{
Oam oam;
@ -218,5 +281,20 @@ int main(int argc, char* argv[])
return 0;
}
int main(int argc, char** argv)
{
Opt opt(argc, argv);
// Set locale language
setlocale(LC_ALL, "");
setlocale(LC_NUMERIC, "C");
// This is unset due to the way we start it
program_invocation_short_name = const_cast<char*>("DDLProc");
// Initialize the charset library
my_init();
return ServiceDDLProc(opt).Run();
}
// vim:ts=4 sw=4:

View File

@ -86,10 +86,90 @@ using namespace joblist;
#include "collation.h"
#include "service.h"
threadpool::ThreadPool DMLServer::fDmlPackagepool(10, 0);
namespace
{
class Opt
{
public:
int m_debug;
bool m_fg;
Opt(int argc, char *argv[])
:m_debug(0),
m_fg(false)
{
int c;
while ((c = getopt(argc, argv, "df")) != EOF)
{
switch(c)
{
case 'd':
m_debug++; // TODO: not really used yes
break;
case 'f':
m_fg= true;
break;
case '?':
default:
break;
}
}
}
};
class ServiceDMLProc: public Service, public Opt
{
protected:
int Parent() override
{
/*
Need to shutdown TheadPool,
otherwise it would get stuck when trying to join fPruneThread.
*/
joblist::JobStep::jobstepThreadPool.stop();
return Service::Parent();
}
void log(logging::LOG_TYPE type, const std::string &str)
{
LoggingID logid(23, 0, 0);
Message::Args args;
Message message(8);
args.add(str);
message.format(args);
logging::Logger logger(logid.fSubsysID);
logger.logMessage(LOG_TYPE_CRITICAL, message, logid);
}
void setupChildSignalHandlers();
public:
ServiceDMLProc(const Opt &opt)
:Service("DMLProc"), Opt(opt)
{ }
void LogErrno() override
{
log(LOG_TYPE_CRITICAL, std::string(strerror(errno)));
}
void ParentLogChildMessage(const std::string &str) override
{
log(LOG_TYPE_INFO, str);
}
int Child() override;
int Run()
{
return m_fg ? Child() : RunForking();
}
};
DistributedEngineComm* Dec;
void added_a_pm(int)
@ -510,7 +590,7 @@ int8_t setupCwd()
} // Namewspace
static void setupSignalHandlers()
void ServiceDMLProc::setupChildSignalHandlers()
{
#ifndef _MSC_VER
/* set up some signal handlers */
@ -530,18 +610,10 @@ static void setupSignalHandlers()
}
int main(int argc, char* argv[])
int ServiceDMLProc::Child()
{
BRM::DBRM dbrm;
Oam oam;
// Set locale language
setlocale(LC_ALL, "");
setlocale(LC_NUMERIC, "C");
// Initialize the charset library
my_init();
// This is unset due to the way we start it
program_invocation_short_name = const_cast<char*>("DMLProc");
Config* cf = Config::makeConfig();
@ -554,6 +626,7 @@ int main(int argc, char* argv[])
msg.format( args1 );
logging::Logger logger(logid.fSubsysID);
logger.logMessage(LOG_TYPE_CRITICAL, msg, logid);
NotifyServiceInitializationFailed();
return 1;
}
@ -581,6 +654,7 @@ int main(int argc, char* argv[])
msg.format( args1 );
logging::Logger logger(logid.fSubsysID);
logger.logMessage(LOG_TYPE_CRITICAL, msg, logid);
NotifyServiceInitializationFailed();
return 1;
}
catch (...)
@ -593,6 +667,7 @@ int main(int argc, char* argv[])
msg.format( args1 );
logging::Logger logger(logid.fSubsysID);
logger.logMessage(LOG_TYPE_CRITICAL, msg, logid);
NotifyServiceInitializationFailed();
return 1;
}
@ -623,6 +698,7 @@ int main(int argc, char* argv[])
ml.logCriticalMessage( message );
cerr << "DMLProc failed to start due to : " << e.what() << endl;
NotifyServiceInitializationFailed();
return 1;
}
@ -664,6 +740,9 @@ int main(int argc, char* argv[])
}
DMLServer dmlserver(serverThreads, serverQueueSize, &dbrm);
NotifyServiceStarted();
ResourceManager* rm = ResourceManager::instance();
// jobstepThreadPool is used by other processes. We can't call
@ -715,11 +794,28 @@ int main(int argc, char* argv[])
Dec = DistributedEngineComm::instance(rm);
setupSignalHandlers();
setupChildSignalHandlers();
dmlserver.start();
return 1;
}
int main(int argc, char** argv)
{
Opt opt(argc, argv);
// Set locale language
setlocale(LC_ALL, "");
setlocale(LC_NUMERIC, "C");
// This is unset due to the way we start it
program_invocation_short_name = const_cast<char*>("DMLProc");
// Initialize the charset library
my_init();
return ServiceDMLProc(opt).Run();
}
// vim:ts=4 sw=4:

View File

@ -69,6 +69,7 @@
#include "liboamcpp.h"
#include "crashtrace.h"
#include "utils_utf8.h"
#include "service.h"
#include <mutex>
#include <thread>
@ -78,6 +79,79 @@
#include "collation.h"
class Opt
{
public:
int m_debug;
bool m_e;
bool m_fg;
Opt(int argc, char *argv[])
:m_debug(0),
m_e(false),
m_fg(false)
{
int c;
while ((c = getopt(argc, argv, "edf")) != EOF)
{
switch (c)
{
case 'd':
m_debug++;
break;
case 'e':
m_e= true;
break;
case 'f':
m_fg= true;
break;
case '?':
default:
break;
}
}
}
};
class ServiceExeMgr: public Service, public Opt
{
protected:
void log(logging::LOG_TYPE type, const std::string &str)
{
logging::LoggingID logid(16);
logging::Message::Args args;
logging::Message message(8);
args.add(strerror(errno));
message.format(args);
logging::Logger logger(logid.fSubsysID);
logger.logMessage(type, message, logid);
}
public:
ServiceExeMgr(const Opt &opt)
:Service("ExeMgr"), Opt(opt)
{ }
void LogErrno() override
{
log(logging::LOG_TYPE_CRITICAL, std::string(strerror(errno)));
}
void ParentLogChildMessage(const std::string &str) override
{
log(logging::LOG_TYPE_INFO, str);
}
int Child() override;
int Run()
{
return m_fg ? Child() : RunForking();
}
};
namespace
{
@ -1435,38 +1509,10 @@ void cleanTempDir()
}
int main(int argc, char* argv[])
int ServiceExeMgr::Child()
{
// Set locale language
setlocale(LC_ALL, "");
setlocale(LC_NUMERIC, "C");
// Initialize the charset library
my_init();
// This is unset due to the way we start it
program_invocation_short_name = const_cast<char*>("ExeMgr");
gDebug = 0;
bool eFlg = false;
int c;
opterr = 0;
while ((c = getopt(argc, argv, "ed")) != EOF)
switch (c)
{
case 'd':
gDebug++;
break;
case 'e':
eFlg = true;
break;
case '?':
default:
break;
}
gDebug= m_debug;
//set BUSY_INIT state
{
@ -1486,13 +1532,13 @@ int main(int argc, char* argv[])
#else
// Make sure CSC thinks it's on a UM or else bucket reuse stuff below will stall
if (!eFlg)
if (!m_e)
setenv("CALPONT_CSC_IDENT", "um", 1);
#endif
setupSignalHandlers();
int err = 0;
if (!gDebug)
if (!m_debug)
err = setupResources();
std::string errMsg;
@ -1538,6 +1584,7 @@ int main(int argc, char* argv[])
{
}
NotifyServiceInitializationFailed();
return 2;
}
@ -1633,6 +1680,8 @@ int main(int argc, char* argv[])
}
}
NotifyServiceStarted();
std::cout << "Starting ExeMgr: st = " << serverThreads <<
", qs = " << rm->getEmExecQueueSize() << ", mx = " << maxPct << ", cf = " <<
rm->getConfig()->configFile() << std::endl;
@ -1676,4 +1725,24 @@ int main(int argc, char* argv[])
return 0;
}
int main(int argc, char* argv[])
{
opterr = 0;
Opt opt(argc, argv);
// Set locale language
setlocale(LC_ALL, "");
setlocale(LC_NUMERIC, "C");
// This is unset due to the way we start it
program_invocation_short_name = const_cast<char*>("ExeMgr");
// Initialize the charset library
my_init();
return ServiceExeMgr(opt).Run();
}
// vim:ts=4 sw=4:

View File

@ -6,15 +6,14 @@ PartOf=mcs-workernode@1.service
After=network.target mcs-workernode@1.service
[Service]
Type=simple
Type=forking
User=@DEFAULT_USER@
Group=@DEFAULT_GROUP@
LimitNOFILE=65536
LimitNPROC=65536
ExecStartPre=/bin/sleep 7
ExecStart=@ENGINE_BINDIR@/controllernode fg
ExecStart=@ENGINE_BINDIR@/controllernode
ExecStop=@ENGINE_BINDIR@/mcs-stop-controllernode.sh $MAINPID
Restart=on-failure

View File

@ -6,7 +6,7 @@ PartOf=mcs-writeengineserver.service
After=network.target mcs-dmlproc.service
[Service]
Type=simple
Type=forking
User=@DEFAULT_USER@
Group=@DEFAULT_GROUP@

View File

@ -6,7 +6,7 @@ PartOf=mcs-writeengineserver.service
After=network.target mcs-writeengineserver.service
[Service]
Type=simple
Type=forking
User=@DEFAULT_USER@
Group=@DEFAULT_GROUP@

View File

@ -6,7 +6,7 @@ PartOf=mcs-primproc.service
After=network.target mcs-primproc.service
[Service]
Type=simple
Type=forking
User=@DEFAULT_USER@
Group=@DEFAULT_GROUP@

View File

@ -7,7 +7,7 @@ PartOf=mcs-controllernode.service
After=network.target mcs-workernode@1.service mcs-workernode@2.service mcs-controllernode.service
[Service]
Type=simple
Type=forking
User=@DEFAULT_USER@
Group=@DEFAULT_GROUP@

View File

@ -2,7 +2,7 @@
Description=storagemanager
[Service]
Type=simple
Type=forking
User=@DEFAULT_USER@
Group=@DEFAULT_GROUP@

View File

@ -3,14 +3,14 @@ Description=mcs-workernode
After=network.target mcs-loadbrm.service
[Service]
Type=simple
Type=forking
User=@DEFAULT_USER@
Group=@DEFAULT_GROUP@
LimitNOFILE=65536
LimitNPROC=65536
ExecStart=@ENGINE_BINDIR@/workernode DBRM_Worker%i fg
ExecStart=@ENGINE_BINDIR@/workernode DBRM_Worker%i
ExecStopPost=@ENGINE_BINDIR@/mcs-savebrm.py
ExecStopPost=/usr/bin/env bash -c "clearShm > /dev/null 2>&1"

View File

@ -6,7 +6,7 @@ PartOf=mcs-exemgr.service
After=network.target mcs-exemgr.service
[Service]
Type=simple
Type=forking
User=@DEFAULT_USER@
Group=@DEFAULT_GROUP@

View File

@ -2493,7 +2493,7 @@ PrimitiveServer::~PrimitiveServer()
{
}
void PrimitiveServer::start()
void PrimitiveServer::start(Service *service)
{
// start all the server threads
for ( int i = 1; i <= fServerThreads; i++)
@ -2515,6 +2515,8 @@ void PrimitiveServer::start()
catch (...) {}
}
service->NotifyServiceStarted();
fServerpool.wait();
cerr << "PrimitiveServer::start() exiting!" << endl;

View File

@ -41,6 +41,8 @@
#include "blockrequestprocessor.h"
#include "batchprimitiveprocessor.h"
#include "service.h"
//#define PRIMPROC_STOPWATCH
#ifdef PRIMPROC_STOPWATCH
#include "stopwatch.h"
@ -140,7 +142,7 @@ public:
/** @brief start the primitive server
*
*/
void start();
void start(Service *p);
/** @brief get a pointer the shared processor thread pool
*/

View File

@ -76,6 +76,61 @@ using namespace idbdatafile;
#include "collation.h"
#include "service.h"
class Opt
{
public:
int m_debug;
bool m_fg;
Opt(int argc, char *argv[])
:m_debug(0),
m_fg(false)
{
int c;
while ((c = getopt(argc, argv, "df")) != EOF)
{
switch(c)
{
case 'd':
m_debug++;
break;
case 'f':
m_fg= true;
break;
case '?':
default:
break;
}
}
}
};
class ServicePrimProc: public Service, public Opt
{
public:
ServicePrimProc(const Opt &opt)
:Service("PrimProc"), Opt(opt)
{ }
void LogErrno() override
{
cerr << strerror(errno) << endl;
}
void ParentLogChildMessage(const std::string &str) override
{
cout << str << endl;
}
int Child() override;
int Run()
{
return m_fg ? Child() : RunForking();
}
};
namespace primitiveprocessor
{
@ -311,33 +366,9 @@ void* waitForSIGUSR1(void* p)
}
int main(int argc, char* argv[])
int ServicePrimProc::Child()
{
// This is unset due to the way we start it
program_invocation_short_name = const_cast<char*>("PrimProc");
// Set locale language
setlocale(LC_ALL, "");
setlocale(LC_NUMERIC, "C");
// Initialize the charset library
my_init();
int gDebug = 0;
int c;
while ((c = getopt(argc, argv, "d")) != EOF)
{
switch(c)
{
case 'd':
gDebug++;
break;
case '?':
default:
break;
}
}
Config* cf = Config::makeConfig();
setupSignalHandlers();
@ -347,7 +378,7 @@ int main(int argc, char* argv[])
mlp = new primitiveprocessor::Logger();
if (!gDebug)
if (!m_debug)
err = setupResources();
string errMsg;
@ -388,6 +419,7 @@ int main(int argc, char* argv[])
{
}
NotifyServiceInitializationFailed();
return 2;
}
@ -767,10 +799,26 @@ int main(int argc, char* argv[])
}
#endif
server.start();
server.start(this);
cerr << "server.start() exited!" << endl;
return 1;
}
int main(int argc, char** argv)
{
Opt opt(argc, argv);
// Set locale language
setlocale(LC_ALL, "");
setlocale(LC_NUMERIC, "C");
// This is unset due to the way we start it
program_invocation_short_name = const_cast<char*>("PrimProc");
// Initialize the charset library
my_init();
return ServicePrimProc(opt).Run();
}
// vim:ts=4 sw=4:

View File

@ -35,9 +35,48 @@ using namespace std;
#include "Synchronizer.h"
#include "Replicator.h"
#include "crashtrace.h"
#include "service.h"
using namespace storagemanager;
class Opt
{
public:
const char *m_progname;
bool m_fg;
Opt(int argc, char **argv)
:m_progname(argv[0]),
m_fg(argc >= 2 && string(argv[1]) == "fg")
{ }
};
class ServiceStorageManager: public Service, public Opt
{
protected:
void setupChildSignalHandlers();
public:
ServiceStorageManager(const Opt &opt)
:Service("StorageManager"), Opt(opt)
{ }
void LogErrno() override
{
SMLogging::get()->log(LOG_ERR, "%s", strerror(errno));
}
void ParentLogChildMessage(const std::string &str) override
{
SMLogging::get()->log(LOG_INFO, "%.*s", (int) str.length(), str.data());
}
int Child() override;
int Run()
{
return m_fg ? Child() : RunForking();
}
};
bool signalCaught = false;
void printCacheUsage(int sig)
@ -75,7 +114,7 @@ void coreSM(int sig)
}
static void setupSignalHandlers()
void ServiceStorageManager::setupChildSignalHandlers()
{
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
@ -106,7 +145,7 @@ static void setupSignalHandlers()
}
int main(int argc, char** argv)
int ServiceStorageManager::Child()
{
SMLogging* logger = SMLogging::get();
@ -129,7 +168,7 @@ int main(int argc, char** argv)
return -1;
}
setupSignalHandlers();
setupChildSignalHandlers();
int ret = 0;
@ -137,6 +176,8 @@ int main(int argc, char** argv)
SessionManager* sm = SessionManager::get();
NotifyServiceStarted();
ret = sm->start();
cache->shutdown();
@ -150,3 +191,8 @@ int main(int argc, char** argv)
return ret;
}
int main(int argc, char** argv)
{
return ServiceStorageManager(Opt(argc, argv)).Run();
}

97
utils/common/pipe.h Normal file
View File

@ -0,0 +1,97 @@
/* Copyright (C) 2020 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#ifndef PIPE_H_INCLUDED
#define PIPE_H_INCLUDED
/*
A helper class to hold the file descriptors returned from a pipe() call.
*/
class Pipe
{
int fd[2];
public:
Pipe()
{
fd[0]= 0;
fd[1]= 0;
}
~Pipe()
{
close();
}
bool is_open_for_read() const
{
return fd[0] > 0;
}
bool is_open_for_write() const
{
return fd[1] > 0;
}
bool open()
{
return ::pipe(fd) == -1;
}
bool init() // TODO: remove this
{
return open();
}
ssize_t read(char *str, size_t nbytes)
{
return ::read(fd[0], str, nbytes);
}
ssize_t readtm(const struct timeval &tv, void *buf, size_t nbytes)
{
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(fd[0], &rfds);
struct timeval tmptv = tv;
int retval = select(fd[0] + 1, &rfds, NULL, NULL, &tmptv);
if (retval == -1)
return -1;
if (!retval)
{
errno= ETIME;
return -1;
}
return ::read(fd[0], buf, nbytes);
}
ssize_t write(const char *str, size_t nbytes)
{
return ::write(fd[1], str, nbytes);
}
ssize_t write(const std::string &str)
{
return write(str.data(), str.length());
}
void close()
{
if (fd[0])
{
::close(fd[0]);
fd[0]= 0;
}
if (fd[1])
{
::close(fd[1]);
fd[1]= 0;
}
}
};
#endif // PIPE_H_INCLUDED

116
utils/common/service.h Normal file
View File

@ -0,0 +1,116 @@
/* Copyright (C) 2020 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#ifndef SERVICE_H_INCLUDED
#define SERVICE_H_INCLUDED
#include <signal.h>
#include "pipe.h"
class Service
{
protected:
// The service name, for logging
const std::string m_name;
// The pipe to send messages from the child to the parent
Pipe m_pipe;
static void common_signal_handler_CHLD(int sig)
{ }
void InitCommonSignalHandlers()
{
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
sa.sa_handler = common_signal_handler_CHLD;
sigaction(SIGCHLD, &sa, NULL);
}
int RunForking()
{
int err;
InitCommonSignalHandlers();
if (m_pipe.open() || (err = fork()) < 0)
{
// Pipe or fork failed
LogErrno();
return 1;
}
if (err > 0) // Parent
return Parent();
return Child();
}
virtual int Parent()
{
char str[100];
// Read the message from the child
ssize_t nbytes= m_pipe.readtm({120,0}, str, sizeof(str));
if (nbytes >= 0)
{
ParentLogChildMessage(std::string(str, nbytes));
}
else
{
// read() failed
LogErrno();
return 1;
}
return 0;
}
public:
Service(const std::string &name)
:m_name(name)
{ }
virtual ~Service() { }
void NotifyServiceStarted()
{
if (m_pipe.is_open_for_write())
{
std::ostringstream str;
str << m_name << " main process has started";
m_pipe.write(str.str());
}
}
void NotifyServiceInitializationFailed()
{
if (m_pipe.is_open_for_write())
{
std::ostringstream str;
str << m_name << " main process initialization failed";
m_pipe.write(str.str());
}
}
// Used by both Parent and Child to log errors
virtual void LogErrno()= 0;
// Used by Parent to log an initialization notification message from child
virtual void ParentLogChildMessage(const std::string &str)= 0;
// The main service process job
virtual int Child()= 0;
};
#endif // SERVICE_H_INCLUDED

View File

@ -130,6 +130,8 @@ void ThreadPool::setMaxThreads(size_t maxThreads)
void ThreadPool::stop()
{
boost::mutex::scoped_lock lock1(fMutex);
if (fStop)
return; // Was stopped earlier
fStop = true;
lock1.unlock();

View File

@ -35,13 +35,15 @@
#include "IDBPolicy.h"
#include "brmtypes.h"
#include "utils_utf8.h"
#include "service.h"
#include "jobstep.h"
#include "crashtrace.h"
#define MAX_RETRIES 10
BRM::MasterDBRMNode* m;
bool die;
bool die= false;
using namespace std;
using namespace BRM;
@ -60,6 +62,48 @@ void fail()
}
}
class Opt
{
protected:
const char *m_progname;
bool m_fg;
public:
Opt(int argc, char **argv)
:m_progname(argv[0]),
m_fg(argc >= 2 && string(argv[1]) == "fg")
{ }
};
class ServiceControllerNode: public Service, public Opt
{
protected:
void setupChildSignalHandlers();
public:
ServiceControllerNode(const Opt &opt)
:Service("ControllerNode"), Opt(opt)
{ }
void LogErrno() override
{
perror(m_progname);
log_errno(std::string(m_progname));
fail();
}
void ParentLogChildMessage(const std::string &str) override
{
log(str, logging::LOG_TYPE_INFO);
}
int Child() override;
int Run()
{
return m_fg ? Child() : RunForking();
}
};
void stop(int num)
{
#ifdef BRM_VERBOSE
@ -101,7 +145,7 @@ void reload(int num)
*/
static void setupSignalHandlers()
void ServiceControllerNode::setupChildSignalHandlers()
{
/* XXXPAT: we might want to install signal handlers for every signal */
@ -122,37 +166,13 @@ static void setupSignalHandlers()
}
int main(int argc, char** argv)
int ServiceControllerNode::Child()
{
// Set locale language
setlocale(LC_ALL, "");
setlocale(LC_NUMERIC, "C");
BRM::logInit ( BRM::SubSystemLogId_controllerNode );
int retries = 0, err;
std::string arg;
die = false;
if (!(argc >= 2 && (arg = argv[1]) == "fg"))
{
if ((err = fork()) < 0)
{
perror(argv[0]);
log_errno(string(argv[0]));
fail();
exit(1);
}
if (err > 0)
exit(0);
}
int retries = 0;
(void)config::Config::makeConfig();
setupSignalHandlers();
setupChildSignalHandlers();
idbdatafile::IDBPolicy::configIDBPolicy();
@ -167,6 +187,8 @@ int main(int argc, char** argv)
m = new BRM::MasterDBRMNode();
NotifyServiceStarted();
try
{
oam::Oam oam;
@ -203,11 +225,31 @@ int main(int argc, char** argv)
if (retries == MAX_RETRIES)
{
NotifyServiceInitializationFailed();
log(string("Exiting after too many errors"));
fail();
return 1;
}
std::cerr << "Exiting..." << std::endl;
exit(0);
return 0;
}
int main(int argc, char** argv)
{
Opt opt(argc, argv);
// Set locale language
setlocale(LC_ALL, "");
setlocale(LC_NUMERIC, "C");
BRM::logInit ( BRM::SubSystemLogId_controllerNode );
/*
Need to shutdown TheadPool before fork(),
otherwise it would get stuck when trying to join fPruneThread.
*/
joblist::JobStep::jobstepThreadPool.stop();
return ServiceControllerNode(opt).Run();
}

View File

@ -39,12 +39,14 @@
#include "IDBPolicy.h"
#include "crashtrace.h"
#include "service.h"
#include "jobstep.h"
using namespace BRM;
using namespace std;
SlaveComm* comm;
bool die;
bool die= false;
boost::thread_group monitorThreads;
void fail()
@ -61,6 +63,49 @@ void fail()
}
}
class Opt
{
protected:
const char *m_progname;
const char *m_nodename;
bool m_fg;
public:
Opt(int argc, char **argv)
:m_progname(argv[0]),
m_nodename(argc > 1 ? argv[1] : nullptr),
m_fg(argc > 2 && string(argv[2]) == "fg")
{ }
};
class ServiceWorkerNode: public Service, public Opt
{
protected:
void setupChildSignalHandlers();
public:
ServiceWorkerNode(const Opt &opt)
:Service("WorkerNode"), Opt(opt)
{ }
void LogErrno() override
{
perror(m_progname);
log_errno(std::string(m_progname));
fail();
}
void ParentLogChildMessage(const std::string &str) override
{
log(str, logging::LOG_TYPE_INFO);
}
int Child() override;
int Run()
{
return m_fg ? Child() : RunForking();
}
};
void stop(int sig)
{
if (!die)
@ -77,7 +122,7 @@ void reset(int sig)
}
static void setupSignalHandlers()
void ServiceWorkerNode::setupChildSignalHandlers()
{
#ifdef SIGHUP
signal(SIGHUP, reset);
@ -97,16 +142,17 @@ static void setupSignalHandlers()
}
static int child(const string& nodeName)
int ServiceWorkerNode::Child()
{
setupSignalHandlers();
setupChildSignalHandlers();
SlaveDBRMNode slave;
ShmKeys keys;
try
{
comm = new SlaveComm(nodeName, &slave);
comm = new SlaveComm(std::string(m_nodename), &slave);
NotifyServiceStarted();
}
catch (exception& e)
{
@ -115,7 +161,8 @@ static int child(const string& nodeName)
cerr << os.str() << endl;
log(os.str());
fail();
exit(1);
NotifyServiceInitializationFailed();
return 1;
}
/* Start 4 threads to monitor write lock state */
@ -161,14 +208,19 @@ static int child(const string& nodeName)
int main(int argc, char** argv)
{
Opt opt(argc, argv);
// Set locale language
setlocale(LC_ALL, "");
setlocale(LC_NUMERIC, "C");
BRM::logInit ( BRM::SubSystemLogId_workerNode );
string arg;
int err = 0;
/*
Need to shutdown TheadPool before fork(),
otherwise it would get stuck when trying to join fPruneThread.
*/
joblist::JobStep::jobstepThreadPool.stop();
if (argc < 2)
{
@ -180,21 +232,8 @@ int main(int argc, char** argv)
exit(1);
}
// TODO: this should move to child() probably, like in masternode.cpp
idbdatafile::IDBPolicy::configIDBPolicy();
if (!(argc >= 3 && (arg = argv[2]) == "fg"))
err = fork();
if (err == 0)
{
return child(argv[1]);
}
else if (err < 0)
{
perror(argv[0]);
log_errno(string(argv[0]));
fail();
}
exit(0);
return ServiceWorkerNode(opt).Run();
}

View File

@ -54,10 +54,73 @@ using namespace oam;
#include "collation.h"
#include "service.h"
using namespace WriteEngine;
namespace
{
class Opt
{
public:
int m_debug;
bool m_fg;
Opt(int argc, char *argv[])
:m_debug(0),
m_fg(false)
{
int c;
while ((c = getopt(argc, argv, "df")) != EOF)
{
switch(c)
{
case 'd':
m_debug++;
break;
case 'f':
m_fg= true;
break;
case '?':
default:
break;
}
}
}
};
class ServiceWriteEngine: public Service, public Opt
{
void log(logging::LOG_TYPE type, const std::string &str)
{
logging::LoggingID logid(SUBSYSTEM_ID_WE_SRV);
logging::Message::Args args;
logging::Message msg(1);
args.add(str);
msg.format(args);
logging::Logger logger(logid.fSubsysID);
logger.logMessage(type, msg, logid);
}
int setupResources();
void setupChildSignalHandlers();
public:
ServiceWriteEngine(const Opt &opt)
:Service("WriteEngine"), Opt(opt)
{ }
void LogErrno() override
{
log(logging::LOG_TYPE_CRITICAL, strerror(errno));
}
void ParentLogChildMessage(const std::string &str)
{
log(logging::LOG_TYPE_INFO, str);
}
int Child() override;
int Run() { return m_fg ? Child() : RunForking(); }
};
void added_a_pm(int)
{
logging::LoggingID logid(21, 0, 0);
@ -71,7 +134,7 @@ void added_a_pm(int)
}
}
int setupResources()
int ServiceWriteEngine::setupResources()
{
#ifndef _MSC_VER
struct rlimit rlim;
@ -103,7 +166,7 @@ int setupResources()
}
static void setupSignalHandlers()
void ServiceWriteEngine::setupChildSignalHandlers()
{
#ifndef _MSC_VER
struct sigaction sa;
@ -122,31 +185,8 @@ static void setupSignalHandlers()
}
int main(int argc, char** argv)
int ServiceWriteEngine::Child()
{
// Set locale language
setlocale(LC_ALL, "");
setlocale(LC_NUMERIC, "C");
// Initialize the charset library
my_init();
// This is unset due to the way we start it
program_invocation_short_name = const_cast<char*>("WriteEngineServ");
int gDebug = 0;
int c;
while ((c = getopt(argc, argv, "d")) != EOF)
{
switch (c)
{
case 'd':
gDebug++;
break;
case '?':
default:
break;
}
}
//set BUSY_INIT state
{
@ -162,7 +202,7 @@ int main(int argc, char** argv)
}
}
setupSignalHandlers();
setupChildSignalHandlers();
// Init WriteEngine Wrapper (including Config Columnstore.xml cache)
WriteEngine::WriteEngineWrapper::init( WriteEngine::SUBSYSTEM_ID_WE_SRV );
@ -228,14 +268,14 @@ int main(int argc, char** argv)
logging::LoggingID lid(SUBSYSTEM_ID_WE_SRV);
logging::MessageLog ml(lid);
ml.logCriticalMessage( message );
NotifyServiceInitializationFailed();
return 2;
}
}
}
int err = 0;
if (!gDebug)
if (!m_debug)
err = setupResources();
string errMsg;
@ -278,6 +318,7 @@ int main(int argc, char** argv)
{
}
NotifyServiceInitializationFailed();
return 2;
}
@ -300,6 +341,8 @@ int main(int argc, char** argv)
}
}
cout << "WriteEngineServer is ready" << endl;
NotifyServiceStarted();
BRM::DBRM dbrm;
for (;;)
@ -339,3 +382,18 @@ int main(int argc, char** argv)
return 1;
}
int main(int argc, char** argv)
{
Opt opt(argc, argv);
// Set locale language
setlocale(LC_ALL, "");
setlocale(LC_NUMERIC, "C");
// This is unset due to the way we start it
program_invocation_short_name = const_cast<char*>("WriteEngineServ");
// Initialize the charset library
my_init();
return ServiceWriteEngine(opt).Run();
}