From ab44ef6ddb09b3b2f2a5e6b68944d5961a438374 Mon Sep 17 00:00:00 2001 From: Alexander Barkov Date: Wed, 7 Oct 2020 12:38:09 +0400 Subject: [PATCH] MCOL-4170 Refactor services/systemd units to finish their bootstrap ... --- ddlproc/ddlproc.cpp | 100 +++++++++++-- dmlproc/dmlproc.cpp | 118 ++++++++++++++-- exemgr/main.cpp | 133 +++++++++++++----- .../mcs-controllernode.service.in | 5 +- oam/install_scripts/mcs-ddlproc.service.in | 2 +- oam/install_scripts/mcs-dmlproc.service.in | 2 +- oam/install_scripts/mcs-exemgr.service.in | 2 +- oam/install_scripts/mcs-primproc.service.in | 2 +- .../mcs-storagemanager.service.in | 2 +- oam/install_scripts/mcs-workernode.service.in | 4 +- .../mcs-writeengineserver.service.in | 2 +- primitives/primproc/primitiveserver.cpp | 4 +- primitives/primproc/primitiveserver.h | 4 +- primitives/primproc/primproc.cpp | 104 ++++++++++---- storage-manager/src/main.cpp | 54 ++++++- utils/common/pipe.h | 97 +++++++++++++ utils/common/service.h | 116 +++++++++++++++ utils/threadpool/threadpool.cpp | 2 + versioning/BRM/masternode.cpp | 102 ++++++++++---- versioning/BRM/slavenode.cpp | 85 ++++++++--- writeengine/server/we_server.cpp | 116 +++++++++++---- 21 files changed, 875 insertions(+), 181 deletions(-) create mode 100644 utils/common/pipe.h create mode 100644 utils/common/service.h diff --git a/ddlproc/ddlproc.cpp b/ddlproc/ddlproc.cpp index 8d75dd5c5..0c9970bd9 100644 --- a/ddlproc/ddlproc.cpp +++ b/ddlproc/ddlproc.cpp @@ -66,8 +66,76 @@ using namespace execplan; #include "collation.h" +#include "service.h" + namespace { + +class Opt +{ +public: + int m_debug; + bool m_fg; + Opt(int argc, char *argv[]) + :m_debug(0), + m_fg(false) + { + int c; + while ((c = getopt(argc, argv, "df")) != EOF) + { + switch(c) + { + case 'd': + m_debug++; // TODO: not really used yes + break; + case 'f': + m_fg= true; + break; + case '?': + default: + break; + } + } + } +}; + + +class ServiceDDLProc: public Service, public Opt +{ +protected: + void 
setupChildSignalHandlers(); + + void log(logging::LOG_TYPE type, const std::string &str) + { + LoggingID logid(23, 0, 0); + Message::Args args; + Message message(8); + args.add(str); + message.format(args); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_CRITICAL, message, logid); + } + +public: + ServiceDDLProc(const Opt &opt) + :Service("DDLProc"), Opt(opt) + { } + void LogErrno() override + { + log(LOG_TYPE_CRITICAL, std::string(strerror(errno))); + } + void ParentLogChildMessage(const std::string &str) override + { + log(LOG_TYPE_INFO, str); + } + int Child() override; + int Run() + { + return m_fg ? Child() : RunForking(); + } +}; + + DistributedEngineComm* Dec; int8_t setupCwd() @@ -95,7 +163,7 @@ void added_a_pm(int) } -static void setupSignalHandlers() +void ServiceDDLProc::setupChildSignalHandlers() { #ifndef _MSC_VER /* set up some signal handlers */ @@ -114,16 +182,8 @@ static void setupSignalHandlers() } -int main(int argc, char* argv[]) +int ServiceDDLProc::Child() { - // Set locale language - setlocale(LC_ALL, ""); - setlocale(LC_NUMERIC, "C"); - // Initialize the charset library - my_init(); - // This is unset due to the way we start it - program_invocation_short_name = const_cast("DDLProc"); - if ( setupCwd() < 0 ) { LoggingID logid(23, 0, 0); @@ -133,6 +193,7 @@ int main(int argc, char* argv[]) msg.format( args1 ); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_CRITICAL, msg, logid); + NotifyServiceInitializationFailed(); return 1; } @@ -147,10 +208,12 @@ int main(int argc, char* argv[]) ResourceManager* rm = ResourceManager::instance(); Dec = DistributedEngineComm::instance(rm); - setupSignalHandlers(); + setupChildSignalHandlers(); ddlprocessor::DDLProcessor ddlprocessor(1, 20); + NotifyServiceStarted(); + { Oam oam; @@ -218,5 +281,20 @@ int main(int argc, char* argv[]) return 0; } + + +int main(int argc, char** argv) +{ + Opt opt(argc, argv); + // Set locale language + setlocale(LC_ALL, ""); + 
setlocale(LC_NUMERIC, "C"); + // This is unset due to the way we start it + program_invocation_short_name = const_cast("DDLProc"); + // Initialize the charset library + my_init(); + + return ServiceDDLProc(opt).Run(); +} // vim:ts=4 sw=4: diff --git a/dmlproc/dmlproc.cpp b/dmlproc/dmlproc.cpp index 149da4d54..a8b44ca25 100644 --- a/dmlproc/dmlproc.cpp +++ b/dmlproc/dmlproc.cpp @@ -86,10 +86,90 @@ using namespace joblist; #include "collation.h" +#include "service.h" + + threadpool::ThreadPool DMLServer::fDmlPackagepool(10, 0); namespace { + + +class Opt +{ +public: + int m_debug; + bool m_fg; + Opt(int argc, char *argv[]) + :m_debug(0), + m_fg(false) + { + int c; + while ((c = getopt(argc, argv, "df")) != EOF) + { + switch(c) + { + case 'd': + m_debug++; // TODO: not really used yes + break; + case 'f': + m_fg= true; + break; + case '?': + default: + break; + } + } + } +}; + + +class ServiceDMLProc: public Service, public Opt +{ +protected: + int Parent() override + { + /* + Need to shutdown TheadPool, + otherwise it would get stuck when trying to join fPruneThread. + */ + joblist::JobStep::jobstepThreadPool.stop(); + return Service::Parent(); + } + + void log(logging::LOG_TYPE type, const std::string &str) + { + LoggingID logid(23, 0, 0); + Message::Args args; + Message message(8); + args.add(str); + message.format(args); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_CRITICAL, message, logid); + } + + void setupChildSignalHandlers(); + +public: + ServiceDMLProc(const Opt &opt) + :Service("DMLProc"), Opt(opt) + { } + void LogErrno() override + { + log(LOG_TYPE_CRITICAL, std::string(strerror(errno))); + } + void ParentLogChildMessage(const std::string &str) override + { + log(LOG_TYPE_INFO, str); + } + int Child() override; + int Run() + { + return m_fg ? 
Child() : RunForking(); + } +}; + + DistributedEngineComm* Dec; void added_a_pm(int) @@ -510,7 +590,7 @@ int8_t setupCwd() } // Namewspace -static void setupSignalHandlers() +void ServiceDMLProc::setupChildSignalHandlers() { #ifndef _MSC_VER /* set up some signal handlers */ @@ -530,18 +610,10 @@ static void setupSignalHandlers() } -int main(int argc, char* argv[]) +int ServiceDMLProc::Child() { BRM::DBRM dbrm; Oam oam; - // Set locale language - setlocale(LC_ALL, ""); - setlocale(LC_NUMERIC, "C"); - // Initialize the charset library - my_init(); - - // This is unset due to the way we start it - program_invocation_short_name = const_cast("DMLProc"); Config* cf = Config::makeConfig(); @@ -554,6 +626,7 @@ int main(int argc, char* argv[]) msg.format( args1 ); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_CRITICAL, msg, logid); + NotifyServiceInitializationFailed(); return 1; } @@ -581,6 +654,7 @@ int main(int argc, char* argv[]) msg.format( args1 ); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_CRITICAL, msg, logid); + NotifyServiceInitializationFailed(); return 1; } catch (...) @@ -593,6 +667,7 @@ int main(int argc, char* argv[]) msg.format( args1 ); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_CRITICAL, msg, logid); + NotifyServiceInitializationFailed(); return 1; } @@ -623,6 +698,7 @@ int main(int argc, char* argv[]) ml.logCriticalMessage( message ); cerr << "DMLProc failed to start due to : " << e.what() << endl; + NotifyServiceInitializationFailed(); return 1; } @@ -664,6 +740,9 @@ int main(int argc, char* argv[]) } DMLServer dmlserver(serverThreads, serverQueueSize, &dbrm); + + NotifyServiceStarted(); + ResourceManager* rm = ResourceManager::instance(); // jobstepThreadPool is used by other processes. 
We can't call @@ -715,11 +794,28 @@ int main(int argc, char* argv[]) Dec = DistributedEngineComm::instance(rm); - setupSignalHandlers(); + setupChildSignalHandlers(); dmlserver.start(); return 1; } + + +int main(int argc, char** argv) +{ + Opt opt(argc, argv); + + // Set locale language + setlocale(LC_ALL, ""); + setlocale(LC_NUMERIC, "C"); + // This is unset due to the way we start it + program_invocation_short_name = const_cast("DMLProc"); + // Initialize the charset library + my_init(); + + return ServiceDMLProc(opt).Run(); +} + // vim:ts=4 sw=4: diff --git a/exemgr/main.cpp b/exemgr/main.cpp index 036fbbb75..b9b02c908 100644 --- a/exemgr/main.cpp +++ b/exemgr/main.cpp @@ -69,6 +69,7 @@ #include "liboamcpp.h" #include "crashtrace.h" #include "utils_utf8.h" +#include "service.h" #include #include @@ -78,6 +79,79 @@ #include "collation.h" + +class Opt +{ +public: + int m_debug; + bool m_e; + bool m_fg; + Opt(int argc, char *argv[]) + :m_debug(0), + m_e(false), + m_fg(false) + { + int c; + while ((c = getopt(argc, argv, "edf")) != EOF) + { + switch (c) + { + case 'd': + m_debug++; + break; + + case 'e': + m_e= true; + break; + + case 'f': + m_fg= true; + break; + + case '?': + default: + break; + } + } + } +}; + + +class ServiceExeMgr: public Service, public Opt +{ +protected: + + void log(logging::LOG_TYPE type, const std::string &str) + { + logging::LoggingID logid(16); + logging::Message::Args args; + logging::Message message(8); + args.add(strerror(errno)); + message.format(args); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(type, message, logid); + } + +public: + ServiceExeMgr(const Opt &opt) + :Service("ExeMgr"), Opt(opt) + { } + void LogErrno() override + { + log(logging::LOG_TYPE_CRITICAL, std::string(strerror(errno))); + } + void ParentLogChildMessage(const std::string &str) override + { + log(logging::LOG_TYPE_INFO, str); + } + int Child() override; + int Run() + { + return m_fg ? 
Child() : RunForking(); + } +}; + + namespace { @@ -1435,38 +1509,10 @@ void cleanTempDir() } -int main(int argc, char* argv[]) +int ServiceExeMgr::Child() { - // Set locale language - setlocale(LC_ALL, ""); - setlocale(LC_NUMERIC, "C"); - // Initialize the charset library - my_init(); - // This is unset due to the way we start it - program_invocation_short_name = const_cast("ExeMgr"); - - gDebug = 0; - bool eFlg = false; - int c; - - opterr = 0; - - while ((c = getopt(argc, argv, "ed")) != EOF) - switch (c) - { - case 'd': - gDebug++; - break; - - case 'e': - eFlg = true; - break; - - case '?': - default: - break; - } + gDebug= m_debug; //set BUSY_INIT state { @@ -1486,13 +1532,13 @@ int main(int argc, char* argv[]) #else // Make sure CSC thinks it's on a UM or else bucket reuse stuff below will stall - if (!eFlg) + if (!m_e) setenv("CALPONT_CSC_IDENT", "um", 1); #endif setupSignalHandlers(); int err = 0; - if (!gDebug) + if (!m_debug) err = setupResources(); std::string errMsg; @@ -1538,6 +1584,7 @@ int main(int argc, char* argv[]) { } + NotifyServiceInitializationFailed(); return 2; } @@ -1633,6 +1680,8 @@ int main(int argc, char* argv[]) } } + NotifyServiceStarted(); + std::cout << "Starting ExeMgr: st = " << serverThreads << ", qs = " << rm->getEmExecQueueSize() << ", mx = " << maxPct << ", cf = " << rm->getConfig()->configFile() << std::endl; @@ -1676,4 +1725,24 @@ int main(int argc, char* argv[]) return 0; } + + +int main(int argc, char* argv[]) +{ + opterr = 0; + Opt opt(argc, argv); + + // Set locale language + setlocale(LC_ALL, ""); + setlocale(LC_NUMERIC, "C"); + + // This is unset due to the way we start it + program_invocation_short_name = const_cast("ExeMgr"); + + // Initialize the charset library + my_init(); + + return ServiceExeMgr(opt).Run(); +} + // vim:ts=4 sw=4: diff --git a/oam/install_scripts/mcs-controllernode.service.in b/oam/install_scripts/mcs-controllernode.service.in index 4764c9fb0..82f7cc51f 100644 --- 
a/oam/install_scripts/mcs-controllernode.service.in +++ b/oam/install_scripts/mcs-controllernode.service.in @@ -6,15 +6,14 @@ PartOf=mcs-workernode@1.service After=network.target mcs-workernode@1.service [Service] -Type=simple +Type=forking User=@DEFAULT_USER@ Group=@DEFAULT_GROUP@ LimitNOFILE=65536 LimitNPROC=65536 -ExecStartPre=/bin/sleep 7 -ExecStart=@ENGINE_BINDIR@/controllernode fg +ExecStart=@ENGINE_BINDIR@/controllernode ExecStop=@ENGINE_BINDIR@/mcs-stop-controllernode.sh $MAINPID Restart=on-failure diff --git a/oam/install_scripts/mcs-ddlproc.service.in b/oam/install_scripts/mcs-ddlproc.service.in index 3d28c47cb..d5e3aa220 100644 --- a/oam/install_scripts/mcs-ddlproc.service.in +++ b/oam/install_scripts/mcs-ddlproc.service.in @@ -6,7 +6,7 @@ PartOf=mcs-writeengineserver.service After=network.target mcs-dmlproc.service [Service] -Type=simple +Type=forking User=@DEFAULT_USER@ Group=@DEFAULT_GROUP@ diff --git a/oam/install_scripts/mcs-dmlproc.service.in b/oam/install_scripts/mcs-dmlproc.service.in index c653f3748..2ccaf29af 100644 --- a/oam/install_scripts/mcs-dmlproc.service.in +++ b/oam/install_scripts/mcs-dmlproc.service.in @@ -6,7 +6,7 @@ PartOf=mcs-writeengineserver.service After=network.target mcs-writeengineserver.service [Service] -Type=simple +Type=forking User=@DEFAULT_USER@ Group=@DEFAULT_GROUP@ diff --git a/oam/install_scripts/mcs-exemgr.service.in b/oam/install_scripts/mcs-exemgr.service.in index 44f436ded..8bf24431c 100644 --- a/oam/install_scripts/mcs-exemgr.service.in +++ b/oam/install_scripts/mcs-exemgr.service.in @@ -6,7 +6,7 @@ PartOf=mcs-primproc.service After=network.target mcs-primproc.service [Service] -Type=simple +Type=forking User=@DEFAULT_USER@ Group=@DEFAULT_GROUP@ diff --git a/oam/install_scripts/mcs-primproc.service.in b/oam/install_scripts/mcs-primproc.service.in index f19b68ddf..d963e3e69 100644 --- a/oam/install_scripts/mcs-primproc.service.in +++ b/oam/install_scripts/mcs-primproc.service.in @@ -7,7 +7,7 @@ 
PartOf=mcs-controllernode.service After=network.target mcs-workernode@1.service mcs-workernode@2.service mcs-controllernode.service [Service] -Type=simple +Type=forking User=@DEFAULT_USER@ Group=@DEFAULT_GROUP@ diff --git a/oam/install_scripts/mcs-storagemanager.service.in b/oam/install_scripts/mcs-storagemanager.service.in index 7acc6cd62..cf457dfdf 100644 --- a/oam/install_scripts/mcs-storagemanager.service.in +++ b/oam/install_scripts/mcs-storagemanager.service.in @@ -2,7 +2,7 @@ Description=storagemanager [Service] -Type=simple +Type=forking User=@DEFAULT_USER@ Group=@DEFAULT_GROUP@ diff --git a/oam/install_scripts/mcs-workernode.service.in b/oam/install_scripts/mcs-workernode.service.in index c235f478c..c75c6788b 100644 --- a/oam/install_scripts/mcs-workernode.service.in +++ b/oam/install_scripts/mcs-workernode.service.in @@ -3,14 +3,14 @@ Description=mcs-workernode After=network.target mcs-loadbrm.service [Service] -Type=simple +Type=forking User=@DEFAULT_USER@ Group=@DEFAULT_GROUP@ LimitNOFILE=65536 LimitNPROC=65536 -ExecStart=@ENGINE_BINDIR@/workernode DBRM_Worker%i fg +ExecStart=@ENGINE_BINDIR@/workernode DBRM_Worker%i ExecStopPost=@ENGINE_BINDIR@/mcs-savebrm.py ExecStopPost=/usr/bin/env bash -c "clearShm > /dev/null 2>&1" diff --git a/oam/install_scripts/mcs-writeengineserver.service.in b/oam/install_scripts/mcs-writeengineserver.service.in index 801d8b7fb..2e783a286 100644 --- a/oam/install_scripts/mcs-writeengineserver.service.in +++ b/oam/install_scripts/mcs-writeengineserver.service.in @@ -6,7 +6,7 @@ PartOf=mcs-exemgr.service After=network.target mcs-exemgr.service [Service] -Type=simple +Type=forking User=@DEFAULT_USER@ Group=@DEFAULT_GROUP@ diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index b41736566..8e2ec1155 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -2493,7 +2493,7 @@ PrimitiveServer::~PrimitiveServer() { } -void PrimitiveServer::start() 
+void PrimitiveServer::start(Service *service) { // start all the server threads for ( int i = 1; i <= fServerThreads; i++) @@ -2515,6 +2515,8 @@ void PrimitiveServer::start() catch (...) {} } + service->NotifyServiceStarted(); + fServerpool.wait(); cerr << "PrimitiveServer::start() exiting!" << endl; diff --git a/primitives/primproc/primitiveserver.h b/primitives/primproc/primitiveserver.h index 6fa680475..033b9fbf8 100644 --- a/primitives/primproc/primitiveserver.h +++ b/primitives/primproc/primitiveserver.h @@ -41,6 +41,8 @@ #include "blockrequestprocessor.h" #include "batchprimitiveprocessor.h" +#include "service.h" + //#define PRIMPROC_STOPWATCH #ifdef PRIMPROC_STOPWATCH #include "stopwatch.h" @@ -140,7 +142,7 @@ public: /** @brief start the primitive server * */ - void start(); + void start(Service *p); /** @brief get a pointer the shared processor thread pool */ diff --git a/primitives/primproc/primproc.cpp b/primitives/primproc/primproc.cpp index 6a10c1c95..f282f8a7a 100644 --- a/primitives/primproc/primproc.cpp +++ b/primitives/primproc/primproc.cpp @@ -76,6 +76,61 @@ using namespace idbdatafile; #include "collation.h" +#include "service.h" + + +class Opt +{ +public: + int m_debug; + bool m_fg; + Opt(int argc, char *argv[]) + :m_debug(0), + m_fg(false) + { + int c; + + while ((c = getopt(argc, argv, "df")) != EOF) + { + switch(c) + { + case 'd': + m_debug++; + break; + case 'f': + m_fg= true; + break; + case '?': + default: + break; + } + } + } +}; + + +class ServicePrimProc: public Service, public Opt +{ +public: + ServicePrimProc(const Opt &opt) + :Service("PrimProc"), Opt(opt) + { } + void LogErrno() override + { + cerr << strerror(errno) << endl; + } + void ParentLogChildMessage(const std::string &str) override + { + cout << str << endl; + } + int Child() override; + int Run() + { + return m_fg ? 
Child() : RunForking(); + } +}; + + namespace primitiveprocessor { @@ -311,33 +366,9 @@ void* waitForSIGUSR1(void* p) } -int main(int argc, char* argv[]) + +int ServicePrimProc::Child() { - // This is unset due to the way we start it - program_invocation_short_name = const_cast("PrimProc"); - - // Set locale language - setlocale(LC_ALL, ""); - setlocale(LC_NUMERIC, "C"); - // Initialize the charset library - my_init(); - - int gDebug = 0; - int c; - - while ((c = getopt(argc, argv, "d")) != EOF) - { - switch(c) - { - case 'd': - gDebug++; - break; - case '?': - default: - break; - } - } - Config* cf = Config::makeConfig(); setupSignalHandlers(); @@ -347,7 +378,7 @@ int main(int argc, char* argv[]) mlp = new primitiveprocessor::Logger(); - if (!gDebug) + if (!m_debug) err = setupResources(); string errMsg; @@ -388,6 +419,7 @@ int main(int argc, char* argv[]) { } + NotifyServiceInitializationFailed(); return 2; } @@ -767,10 +799,26 @@ int main(int argc, char* argv[]) } #endif - server.start(); + server.start(this); cerr << "server.start() exited!" 
<< endl; return 1; } + + +int main(int argc, char** argv) +{ + Opt opt(argc, argv); + + // Set locale language + setlocale(LC_ALL, ""); + setlocale(LC_NUMERIC, "C"); + // This is unset due to the way we start it + program_invocation_short_name = const_cast<char*>("PrimProc"); + // Initialize the charset library + my_init(); + + return ServicePrimProc(opt).Run(); +} // vim:ts=4 sw=4: diff --git a/storage-manager/src/main.cpp b/storage-manager/src/main.cpp index 63f09923f..f6a85dc1a 100644 --- a/storage-manager/src/main.cpp +++ b/storage-manager/src/main.cpp @@ -35,9 +35,48 @@ using namespace std; #include "Synchronizer.h" #include "Replicator.h" #include "crashtrace.h" +#include "service.h" using namespace storagemanager; + +class Opt +{ +public: + const char *m_progname; + bool m_fg; + Opt(int argc, char **argv) + :m_progname(argv[0]), + m_fg(argc >= 2 && string(argv[1]) == "fg") + { } +}; + + +class ServiceStorageManager: public Service, public Opt +{ +protected: + void setupChildSignalHandlers(); + +public: + ServiceStorageManager(const Opt &opt) + :Service("StorageManager"), Opt(opt) + { } + void LogErrno() override + { + SMLogging::get()->log(LOG_ERR, "%s", strerror(errno)); + } + void ParentLogChildMessage(const std::string &str) override + { + SMLogging::get()->log(LOG_INFO, "%.*s", (int) str.length(), str.data()); + } + int Child() override; + int Run() + { + return m_fg ? 
Child() : RunForking(); + } +}; + + bool signalCaught = false; void printCacheUsage(int sig) @@ -75,7 +114,7 @@ void coreSM(int sig) } -static void setupSignalHandlers() +void ServiceStorageManager::setupChildSignalHandlers() { struct sigaction sa; memset(&sa, 0, sizeof(sa)); @@ -106,7 +145,7 @@ static void setupSignalHandlers() } -int main(int argc, char** argv) +int ServiceStorageManager::Child() { SMLogging* logger = SMLogging::get(); @@ -129,14 +168,16 @@ int main(int argc, char** argv) return -1; } - setupSignalHandlers(); - + setupChildSignalHandlers(); + int ret = 0; logger->log(LOG_NOTICE,"StorageManager started."); SessionManager* sm = SessionManager::get(); + NotifyServiceStarted(); + ret = sm->start(); cache->shutdown(); @@ -150,3 +191,8 @@ int main(int argc, char** argv) return ret; } + +int main(int argc, char** argv) +{ + return ServiceStorageManager(Opt(argc, argv)).Run(); +} diff --git a/utils/common/pipe.h b/utils/common/pipe.h new file mode 100644 index 000000000..427f17ac0 --- /dev/null +++ b/utils/common/pipe.h @@ -0,0 +1,97 @@ +/* Copyright (C) 2020 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#ifndef PIPE_H_INCLUDED +#define PIPE_H_INCLUDED + +/* + A helper class to hold the file descriptors returned from a pipe() call. 
+*/ +class Pipe +{ + int fd[2]; +public: + Pipe() + { + fd[0]= 0; + fd[1]= 0; + } + ~Pipe() + { + close(); + } + bool is_open_for_read() const + { + return fd[0] > 0; + } + bool is_open_for_write() const + { + return fd[1] > 0; + } + bool open() + { + return ::pipe(fd) == -1; + } + bool init() // TODO: remove this + { + return open(); + } + ssize_t read(char *str, size_t nbytes) + { + return ::read(fd[0], str, nbytes); + } + ssize_t readtm(const struct timeval &tv, void *buf, size_t nbytes) + { + fd_set rfds; + FD_ZERO(&rfds); + FD_SET(fd[0], &rfds); + struct timeval tmptv = tv; + int retval = select(fd[0] + 1, &rfds, NULL, NULL, &tmptv); + if (retval == -1) + return -1; + if (!retval) + { + errno= ETIME; + return -1; + } + return ::read(fd[0], buf, nbytes); + } + ssize_t write(const char *str, size_t nbytes) + { + return ::write(fd[1], str, nbytes); + } + ssize_t write(const std::string &str) + { + return write(str.data(), str.length()); + } + void close() + { + if (fd[0]) + { + ::close(fd[0]); + fd[0]= 0; + } + if (fd[1]) + { + ::close(fd[1]); + fd[1]= 0; + } + } +}; + + +#endif // PIPE_H_INCLUDED diff --git a/utils/common/service.h b/utils/common/service.h new file mode 100644 index 000000000..041fd1eda --- /dev/null +++ b/utils/common/service.h @@ -0,0 +1,116 @@ +/* Copyright (C) 2020 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. 
*/ + +#ifndef SERVICE_H_INCLUDED +#define SERVICE_H_INCLUDED + +#include +#include "pipe.h" + + +class Service +{ +protected: + // The service name, for logging + const std::string m_name; + // The pipe to send messages from the child to the parent + Pipe m_pipe; + + + static void common_signal_handler_CHLD(int sig) + { } + + void InitCommonSignalHandlers() + { + struct sigaction sa; + memset(&sa, 0, sizeof(sa)); + sa.sa_handler = common_signal_handler_CHLD; + sigaction(SIGCHLD, &sa, NULL); + } + + int RunForking() + { + int err; + InitCommonSignalHandlers(); + + if (m_pipe.open() || (err = fork()) < 0) + { + // Pipe or fork failed + LogErrno(); + return 1; + } + + if (err > 0) // Parent + return Parent(); + + return Child(); + } + + virtual int Parent() + { + char str[100]; + // Read the message from the child + ssize_t nbytes= m_pipe.readtm({120,0}, str, sizeof(str)); + if (nbytes >= 0) + { + ParentLogChildMessage(std::string(str, nbytes)); + } + else + { + // read() failed + LogErrno(); + return 1; + } + return 0; + } + + +public: + Service(const std::string &name) + :m_name(name) + { } + virtual ~Service() { } + + void NotifyServiceStarted() + { + if (m_pipe.is_open_for_write()) + { + std::ostringstream str; + str << m_name << " main process has started"; + m_pipe.write(str.str()); + } + } + + void NotifyServiceInitializationFailed() + { + if (m_pipe.is_open_for_write()) + { + std::ostringstream str; + str << m_name << " main process initialization failed"; + m_pipe.write(str.str()); + } + } + + // Used by both Parent and Child to log errors + virtual void LogErrno()= 0; + // Used by Parent to log an initialization notification message from child + virtual void ParentLogChildMessage(const std::string &str)= 0; + // The main service process job + virtual int Child()= 0; +}; + +#endif // SERVICE_H_INCLUDED diff --git a/utils/threadpool/threadpool.cpp b/utils/threadpool/threadpool.cpp index fbaadbd93..40a107424 100644 --- a/utils/threadpool/threadpool.cpp +++ 
b/utils/threadpool/threadpool.cpp @@ -130,6 +130,8 @@ void ThreadPool::setMaxThreads(size_t maxThreads) void ThreadPool::stop() { boost::mutex::scoped_lock lock1(fMutex); + if (fStop) + return; // Was stopped earlier fStop = true; lock1.unlock(); diff --git a/versioning/BRM/masternode.cpp b/versioning/BRM/masternode.cpp index e9826e367..7b85ae1b5 100644 --- a/versioning/BRM/masternode.cpp +++ b/versioning/BRM/masternode.cpp @@ -35,13 +35,15 @@ #include "IDBPolicy.h" #include "brmtypes.h" #include "utils_utf8.h" +#include "service.h" +#include "jobstep.h" #include "crashtrace.h" #define MAX_RETRIES 10 BRM::MasterDBRMNode* m; -bool die; +bool die= false; using namespace std; using namespace BRM; @@ -60,6 +62,48 @@ void fail() } } + +class Opt +{ +protected: + const char *m_progname; + bool m_fg; +public: + Opt(int argc, char **argv) + :m_progname(argv[0]), + m_fg(argc >= 2 && string(argv[1]) == "fg") + { } +}; + + +class ServiceControllerNode: public Service, public Opt +{ +protected: + void setupChildSignalHandlers(); + +public: + ServiceControllerNode(const Opt &opt) + :Service("ControllerNode"), Opt(opt) + { } + void LogErrno() override + { + perror(m_progname); + log_errno(std::string(m_progname)); + fail(); + } + void ParentLogChildMessage(const std::string &str) override + { + log(str, logging::LOG_TYPE_INFO); + } + int Child() override; + int Run() + { + return m_fg ? 
Child() : RunForking(); + } +}; + + + void stop(int num) { #ifdef BRM_VERBOSE @@ -101,7 +145,7 @@ void reload(int num) */ -static void setupSignalHandlers() +void ServiceControllerNode::setupChildSignalHandlers() { /* XXXPAT: we might want to install signal handlers for every signal */ @@ -122,37 +166,13 @@ static void setupSignalHandlers() } -int main(int argc, char** argv) +int ServiceControllerNode::Child() { - // Set locale language - setlocale(LC_ALL, ""); - setlocale(LC_NUMERIC, "C"); - - - BRM::logInit ( BRM::SubSystemLogId_controllerNode ); - - int retries = 0, err; - std::string arg; - - die = false; - - if (!(argc >= 2 && (arg = argv[1]) == "fg")) - { - if ((err = fork()) < 0) - { - perror(argv[0]); - log_errno(string(argv[0])); - fail(); - exit(1); - } - - if (err > 0) - exit(0); - } + int retries = 0; (void)config::Config::makeConfig(); - setupSignalHandlers(); + setupChildSignalHandlers(); idbdatafile::IDBPolicy::configIDBPolicy(); @@ -167,6 +187,8 @@ int main(int argc, char** argv) m = new BRM::MasterDBRMNode(); + NotifyServiceStarted(); + try { oam::Oam oam; @@ -203,11 +225,31 @@ int main(int argc, char** argv) if (retries == MAX_RETRIES) { + NotifyServiceInitializationFailed(); log(string("Exiting after too many errors")); fail(); + return 1; } std::cerr << "Exiting..." << std::endl; - exit(0); + return 0; } + +int main(int argc, char** argv) +{ + Opt opt(argc, argv); + // Set locale language + setlocale(LC_ALL, ""); + setlocale(LC_NUMERIC, "C"); + + BRM::logInit ( BRM::SubSystemLogId_controllerNode ); + + /* + Need to shutdown ThreadPool before fork(), + otherwise it would get stuck when trying to join fPruneThread. 
+ */ + joblist::JobStep::jobstepThreadPool.stop(); + + return ServiceControllerNode(opt).Run(); +} diff --git a/versioning/BRM/slavenode.cpp b/versioning/BRM/slavenode.cpp index e9335dad0..b8db2c3ae 100644 --- a/versioning/BRM/slavenode.cpp +++ b/versioning/BRM/slavenode.cpp @@ -39,12 +39,14 @@ #include "IDBPolicy.h" #include "crashtrace.h" +#include "service.h" +#include "jobstep.h" using namespace BRM; using namespace std; SlaveComm* comm; -bool die; +bool die= false; boost::thread_group monitorThreads; void fail() @@ -61,6 +63,49 @@ void fail() } } + +class Opt +{ +protected: + const char *m_progname; + const char *m_nodename; + bool m_fg; +public: + Opt(int argc, char **argv) + :m_progname(argv[0]), + m_nodename(argc > 1 ? argv[1] : nullptr), + m_fg(argc > 2 && string(argv[2]) == "fg") + { } +}; + + +class ServiceWorkerNode: public Service, public Opt +{ +protected: + void setupChildSignalHandlers(); + +public: + ServiceWorkerNode(const Opt &opt) + :Service("WorkerNode"), Opt(opt) + { } + void LogErrno() override + { + perror(m_progname); + log_errno(std::string(m_progname)); + fail(); + } + void ParentLogChildMessage(const std::string &str) override + { + log(str, logging::LOG_TYPE_INFO); + } + int Child() override; + int Run() + { + return m_fg ? 
Child() : RunForking(); + } +}; + + void stop(int sig) { if (!die) @@ -77,7 +122,7 @@ void reset(int sig) } -static void setupSignalHandlers() +void ServiceWorkerNode::setupChildSignalHandlers() { #ifdef SIGHUP signal(SIGHUP, reset); @@ -97,16 +142,17 @@ static void setupSignalHandlers() } -static int child(const string& nodeName) +int ServiceWorkerNode::Child() { - setupSignalHandlers(); + setupChildSignalHandlers(); SlaveDBRMNode slave; ShmKeys keys; try { - comm = new SlaveComm(nodeName, &slave); + comm = new SlaveComm(std::string(m_nodename), &slave); + NotifyServiceStarted(); } catch (exception& e) { @@ -115,7 +161,8 @@ static int child(const string& nodeName) cerr << os.str() << endl; log(os.str()); fail(); - exit(1); + NotifyServiceInitializationFailed(); + return 1; } /* Start 4 threads to monitor write lock state */ @@ -161,14 +208,19 @@ static int child(const string& nodeName) int main(int argc, char** argv) { + Opt opt(argc, argv); + // Set locale language setlocale(LC_ALL, ""); setlocale(LC_NUMERIC, "C"); BRM::logInit ( BRM::SubSystemLogId_workerNode ); - string arg; - int err = 0; + /* + Need to shutdown ThreadPool before fork(), + otherwise it would get stuck when trying to join fPruneThread. 
+ */ + joblist::JobStep::jobstepThreadPool.stop(); if (argc < 2) { @@ -180,21 +232,8 @@ int main(int argc, char** argv) exit(1); } + // TODO: this should move to child() probably, like in masternode.cpp idbdatafile::IDBPolicy::configIDBPolicy(); - if (!(argc >= 3 && (arg = argv[2]) == "fg")) - err = fork(); - - if (err == 0) - { - return child(argv[1]); - } - else if (err < 0) - { - perror(argv[0]); - log_errno(string(argv[0])); - fail(); - } - - exit(0); + return ServiceWorkerNode(opt).Run(); } diff --git a/writeengine/server/we_server.cpp b/writeengine/server/we_server.cpp index 321a7ba0f..40f3ef5a2 100644 --- a/writeengine/server/we_server.cpp +++ b/writeengine/server/we_server.cpp @@ -54,10 +54,73 @@ using namespace oam; #include "collation.h" +#include "service.h" + using namespace WriteEngine; namespace { + +class Opt +{ +public: + int m_debug; + bool m_fg; + Opt(int argc, char *argv[]) + :m_debug(0), + m_fg(false) + { + int c; + while ((c = getopt(argc, argv, "df")) != EOF) + { + switch(c) + { + case 'd': + m_debug++; + break; + case 'f': + m_fg= true; + break; + case '?': + default: + break; + } + } + } +}; + + +class ServiceWriteEngine: public Service, public Opt +{ + void log(logging::LOG_TYPE type, const std::string &str) + { + logging::LoggingID logid(SUBSYSTEM_ID_WE_SRV); + logging::Message::Args args; + logging::Message msg(1); + args.add(str); + msg.format(args); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(type, msg, logid); + } + int setupResources(); + void setupChildSignalHandlers(); +public: + ServiceWriteEngine(const Opt &opt) + :Service("WriteEngine"), Opt(opt) + { } + void LogErrno() override + { + log(logging::LOG_TYPE_CRITICAL, strerror(errno)); + } + void ParentLogChildMessage(const std::string &str) + { + log(logging::LOG_TYPE_INFO, str); + } + int Child() override; + int Run() { return m_fg ? 
Child() : RunForking(); } +}; + + void added_a_pm(int) { logging::LoggingID logid(21, 0, 0); @@ -71,7 +134,7 @@ void added_a_pm(int) } } -int setupResources() +int ServiceWriteEngine::setupResources() { #ifndef _MSC_VER struct rlimit rlim; @@ -103,7 +166,7 @@ int setupResources() } -static void setupSignalHandlers() +void ServiceWriteEngine::setupChildSignalHandlers() { #ifndef _MSC_VER struct sigaction sa; @@ -122,31 +185,8 @@ static void setupSignalHandlers() } -int main(int argc, char** argv) +int ServiceWriteEngine::Child() { - // Set locale language - setlocale(LC_ALL, ""); - setlocale(LC_NUMERIC, "C"); - // Initialize the charset library - my_init(); - - // This is unset due to the way we start it - program_invocation_short_name = const_cast("WriteEngineServ"); - - int gDebug = 0; - int c; - while ((c = getopt(argc, argv, "d")) != EOF) - { - switch (c) - { - case 'd': - gDebug++; - break; - case '?': - default: - break; - } - } //set BUSY_INIT state { @@ -162,7 +202,7 @@ int main(int argc, char** argv) } } - setupSignalHandlers(); + setupChildSignalHandlers(); // Init WriteEngine Wrapper (including Config Columnstore.xml cache) WriteEngine::WriteEngineWrapper::init( WriteEngine::SUBSYSTEM_ID_WE_SRV ); @@ -228,14 +268,14 @@ int main(int argc, char** argv) logging::LoggingID lid(SUBSYSTEM_ID_WE_SRV); logging::MessageLog ml(lid); ml.logCriticalMessage( message ); - + NotifyServiceInitializationFailed(); return 2; } } } int err = 0; - if (!gDebug) + if (!m_debug) err = setupResources(); string errMsg; @@ -278,6 +318,7 @@ int main(int argc, char** argv) { } + NotifyServiceInitializationFailed(); return 2; } @@ -300,6 +341,8 @@ int main(int argc, char** argv) } } cout << "WriteEngineServer is ready" << endl; + NotifyServiceStarted(); + BRM::DBRM dbrm; for (;;) @@ -339,3 +382,18 @@ int main(int argc, char** argv) return 1; } + +int main(int argc, char** argv) +{ + Opt opt(argc, argv); + + // Set locale language + setlocale(LC_ALL, ""); + setlocale(LC_NUMERIC, "C"); 
+ // This is unset due to the way we start it + program_invocation_short_name = const_cast<char*>("WriteEngineServ"); + // Initialize the charset library + my_init(); + + return ServiceWriteEngine(opt).Run(); +}