From 7453db6b02cf65d974704fedb51fdbfeea664174 Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Mon, 4 Apr 2022 12:50:42 +0000 Subject: [PATCH] MCOL-5001 This patch removes ExeMgr traces --- CMakeLists.txt | 1 - debian/mariadb-plugin-columnstore.install | 1 - exemgr/CMakeLists.txt | 20 - exemgr/activestatementcounter.cpp | 59 -- exemgr/activestatementcounter.h | 64 -- exemgr/exemgr.cpp | 102 -- exemgr/femsghandler.cpp | 143 --- exemgr/femsghandler.h | 47 - exemgr/rssmonfcn.cpp | 69 -- exemgr/rssmonfcn.h | 36 - exemgr/serviceexemgr.cpp | 475 --------- exemgr/serviceexemgr.h | 343 ------ exemgr/sqlfrontsessionthread.cpp | 985 ------------------ exemgr/sqlfrontsessionthread.h | 131 --- oam/install_scripts/CMakeLists.txt | 14 +- .../columnstore-post-install.in | 26 +- .../columnstore-pre-uninstall.in | 14 +- .../mariadb-columnstore-start.sh.in | 1 - .../mariadb-columnstore-stop.sh | 1 - oam/install_scripts/mcs-exemgr.service.in | 23 - .../mcs-writeengineserver.service.in | 6 +- 21 files changed, 30 insertions(+), 2531 deletions(-) delete mode 100644 exemgr/CMakeLists.txt delete mode 100644 exemgr/activestatementcounter.cpp delete mode 100644 exemgr/activestatementcounter.h delete mode 100644 exemgr/exemgr.cpp delete mode 100644 exemgr/femsghandler.cpp delete mode 100644 exemgr/femsghandler.h delete mode 100644 exemgr/rssmonfcn.cpp delete mode 100644 exemgr/rssmonfcn.h delete mode 100644 exemgr/serviceexemgr.cpp delete mode 100644 exemgr/serviceexemgr.h delete mode 100644 exemgr/sqlfrontsessionthread.cpp delete mode 100644 exemgr/sqlfrontsessionthread.h delete mode 100644 oam/install_scripts/mcs-exemgr.service.in diff --git a/CMakeLists.txt b/CMakeLists.txt index 1be5206fd..a2fcc7954 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -343,7 +343,6 @@ ADD_SUBDIRECTORY(dbcon/ddlpackage) ADD_SUBDIRECTORY(dbcon/ddlpackageproc) ADD_SUBDIRECTORY(dbcon/dmlpackage) ADD_SUBDIRECTORY(dbcon/dmlpackageproc) -ADD_SUBDIRECTORY(exemgr) ADD_SUBDIRECTORY(ddlproc) ADD_SUBDIRECTORY(dmlproc) ADD_SUBDIRECTORY(oamapps) diff --git a/debian/mariadb-plugin-columnstore.install b/debian/mariadb-plugin-columnstore.install index 20241ff57..a921e36f2 100644 --- a/debian/mariadb-plugin-columnstore.install +++ b/debian/mariadb-plugin-columnstore.install @@ -105,7 +105,6 @@ usr/share/columnstore/mariadb-columnstore.service usr/share/columnstore/mcs-controllernode.service usr/share/columnstore/mcs-ddlproc.service usr/share/columnstore/mcs-dmlproc.service -usr/share/columnstore/mcs-exemgr.service usr/share/columnstore/mcs-loadbrm.service usr/share/columnstore/mcs-primproc.service usr/share/columnstore/mcs-storagemanager.service diff --git a/exemgr/CMakeLists.txt b/exemgr/CMakeLists.txt deleted file mode 100644 index e0d17d205..000000000 --- a/exemgr/CMakeLists.txt +++ /dev/null @@ -1,20 +0,0 @@ - -# include_directories( ${ENGINE_COMMON_INCLUDES} ) - - -########### next target ############### - -# set(ExeMgr_SRCS serviceexemgr.cpp sqlfrontsessionthread.cpp rssmonfcn.cpp exemgr.cpp activestatementcounter.cpp femsghandler.cpp ../utils/common/crashtrace.cpp) - -# add_executable(ExeMgr ${ExeMgr_SRCS}) - -# target_link_libraries(ExeMgr ${ENGINE_LDFLAGS} ${ENGINE_EXEC_LIBS} ${NETSNMP_LIBRARIES} ${MARIADB_CLIENT_LIBS} cacheutils threadpool) - -# target_include_directories(ExeMgr PRIVATE ${Boost_INCLUDE_DIRS}) - -# install(TARGETS ExeMgr DESTINATION ${ENGINE_BINDIR} COMPONENT columnstore-engine) - - -########### install files ############### - - diff --git a/exemgr/activestatementcounter.cpp b/exemgr/activestatementcounter.cpp deleted file mode 100644 index b25e61549..000000000 --- a/exemgr/activestatementcounter.cpp +++ /dev/null @@ -1,59 +0,0 @@ -/* Copyright (C) 2014 InfiniDB, Inc. - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, - MA 02110-1301, USA. */ - -// $Id: activestatementcounter.cpp 940 2013-01-21 14:11:31Z rdempsey $ -// - -#include -#include -using namespace boost; - -#include "activestatementcounter.h" - -void ActiveStatementCounter::incr(bool& counted) -{ - if (counted) - return; - - counted = true; - boost::mutex::scoped_lock lk(fMutex); - - if (upperLimit > 0) - while (fStatementCount >= upperLimit) - { - fStatementsWaiting++; - condvar.wait(lk); - --fStatementsWaiting; - } - - fStatementCount++; -} - -void ActiveStatementCounter::decr(bool& counted) -{ - if (!counted) - return; - - counted = false; - boost::mutex::scoped_lock lk(fMutex); - - if (fStatementCount == 0) - return; - - --fStatementCount; - condvar.notify_one(); -} diff --git a/exemgr/activestatementcounter.h b/exemgr/activestatementcounter.h deleted file mode 100644 index 87e7163aa..000000000 --- a/exemgr/activestatementcounter.h +++ /dev/null @@ -1,64 +0,0 @@ -/* Copyright (C) 2014 InfiniDB, Inc. - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, - MA 02110-1301, USA. */ - -// $Id: activestatementcounter.h 940 2013-01-21 14:11:31Z rdempsey $ -// -/** @file */ - -#pragma once - -#include - -#include -#include - -#include "vss.h" - -class ActiveStatementCounter -{ - public: - ActiveStatementCounter(uint32_t limit) : fStatementCount(0), upperLimit(limit), fStatementsWaiting(0) - { - } - - virtual ~ActiveStatementCounter() - { - } - - void incr(bool& counted); - void decr(bool& counted); - uint32_t cur() const - { - return fStatementCount; - } - uint32_t waiting() const - { - return fStatementsWaiting; - } - - private: - ActiveStatementCounter(const ActiveStatementCounter& rhs); - ActiveStatementCounter& operator=(const ActiveStatementCounter& rhs); - - uint32_t fStatementCount; - uint32_t upperLimit; - uint32_t fStatementsWaiting; - boost::mutex fMutex; - boost::condition condvar; - BRM::VSS fVss; -}; - diff --git a/exemgr/exemgr.cpp b/exemgr/exemgr.cpp deleted file mode 100644 index fa1acec8d..000000000 --- a/exemgr/exemgr.cpp +++ /dev/null @@ -1,102 +0,0 @@ -/* Copyright (C) 2014 InfiniDB, Inc. - Copyright (C) 2019 MariaDB Corporation - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, - MA 02110-1301, USA. */ - -/********************************************************************** - * $Id: main.cpp 1000 2013-07-24 21:05:51Z pleblanc $ - * - * - ***********************************************************************/ -/** - * @brief execution plan manager main program - * - * This is the ?main? program for dealing with execution plans and - * result sets. It sits in a loop waiting for CalpontExecutionPlan - * from the FEP. It then passes the CalpontExecutionPlan to the - * JobListFactory from which a JobList is obtained. This is passed - * to to the Query Manager running in the real-time portion of the - * EC. The ExecutionPlanManager waits until the Query Manager - * returns a result set for the job list. These results are passed - * into the CalpontResultFactory, which outputs a CalpontResultSet. - * The ExecutionPlanManager passes the CalpontResultSet into the - * VendorResultFactory which produces a result set tailored to the - * specific DBMS front end in use. The ExecutionPlanManager then - * sends the VendorResultSet back to the Calpont Database Connector - * on the Front-End Processor where it is returned to the DBMS - * front-end. - */ -#include -#include -#include -#include - -#undef root_name -#include - -#include "calpontselectexecutionplan.h" -#include "mcsanalyzetableexecutionplan.h" -#include "activestatementcounter.h" -#include "distributedenginecomm.h" -#include "resourcemanager.h" -#include "configcpp.h" -#include "queryteleserverparms.h" -#include "iosocket.h" -#include "joblist.h" -#include "joblistfactory.h" -#include "oamcache.h" -#include "simplecolumn.h" -#include "bytestream.h" -#include "telestats.h" -#include "messageobj.h" -#include "messagelog.h" -#include "sqllogger.h" -#include "femsghandler.h" -#include "idberrorinfo.h" -#include "MonitorProcMem.h" -#include "liboamcpp.h" -#include "crashtrace.h" -#include "service.h" - -#include -#include -#include - -#include "dbrm.h" - -#include "mariadb_my_sys.h" -#include "statistics.h" -#include "serviceexemgr.h" - -int main(int argc, char* argv[]) -{ - opterr = 0; - exemgr::Opt opt(argc, argv); - - // Set locale language - setlocale(LC_ALL, ""); - setlocale(LC_NUMERIC, "C"); - - // This is unset due to the way we start it - // program_invocation_short_name = const_cast("ExeMgr"); - - // Initialize the charset library - MY_INIT(argv[0]); - // global ptr defined in serviceexemgr.cpp - exemgr::globServiceExeMgr = new exemgr::ServiceExeMgr(opt); - return exemgr::globServiceExeMgr->Run(); -} - diff --git a/exemgr/femsghandler.cpp b/exemgr/femsghandler.cpp deleted file mode 100644 index 0ce22519f..000000000 --- a/exemgr/femsghandler.cpp +++ /dev/null @@ -1,143 +0,0 @@ -/* Copyright (C) 2014 InfiniDB, Inc. - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, - MA 02110-1301, USA. */ - -#include "messagequeue.h" -#include "iosocket.h" - -#include "femsghandler.h" - -using namespace std; -using namespace joblist; -using namespace messageqcpp; - -threadpool::ThreadPool FEMsgHandler::threadPool; - -namespace -{ -class Runner -{ - public: - Runner(FEMsgHandler* f) : target(f) - { - } - void operator()() - { - target->threadFcn(); - } - FEMsgHandler* target; -}; - -} // namespace - -FEMsgHandler::FEMsgHandler() : die(false), running(false), sawData(false), sock(NULL) -{ -} - -FEMsgHandler::FEMsgHandler(boost::shared_ptr j, IOSocket* s) - : die(false), running(false), sawData(false), jl(j) -{ - sock = s; - assert(sock); -} - -FEMsgHandler::~FEMsgHandler() -{ - stop(); - threadPool.join(thr); -} - -void FEMsgHandler::start() -{ - if (!running) - { - running = true; - thr = threadPool.invoke(Runner(this)); - } -} - -void FEMsgHandler::stop() -{ - die = true; - jl.reset(); -} - -void FEMsgHandler::setJobList(boost::shared_ptr j) -{ - jl = j; -} - -void FEMsgHandler::setSocket(IOSocket* i) -{ - sock = i; - assert(sock); -} - -/* Note, the next two fcns strongly depend on ExeMgr's current implementation. There's a - * good chance that if ExeMgr's table send loop is changed, these will need to be - * updated to match. - */ - -/* This is currently only called if InetStreamSocket::write() throws, implying - * a connection error. It might not make sense in other contexts. - */ -bool FEMsgHandler::aborted() -{ - if (sawData) - return true; - - boost::mutex::scoped_lock sl(mutex); - int err; - int connectionNum = sock->getConnectionNum(); - - err = InetStreamSocket::pollConnection(connectionNum, 1000); - - if (err == 1) - { - sawData = true; - return true; - } - - return false; -} - -void FEMsgHandler::threadFcn() -{ - int err = 0; - int connectionNum = sock->getConnectionNum(); - - /* This waits for the next readable event on sock. An abort is signaled - * by sending something (anything at the moment), then dropping the connection. - * This fcn exits on all other events. - */ - while (!die && err == 0) - { - boost::mutex::scoped_lock sl(mutex); - err = InetStreamSocket::pollConnection(connectionNum, 1000); - } - - if (err == 1) - sawData = true; // there's data to read, must be the abort signal - - if (!die && (err == 2 || err == 1)) - { - die = true; - jl->abort(); - jl.reset(); - } - - running = false; -} diff --git a/exemgr/femsghandler.h b/exemgr/femsghandler.h deleted file mode 100644 index baa91ad90..000000000 --- a/exemgr/femsghandler.h +++ /dev/null @@ -1,47 +0,0 @@ -/* Copyright (C) 2014 InfiniDB, Inc. - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, - MA 02110-1301, USA. */ - -#pragma once - -#include "joblist.h" -#include "inetstreamsocket.h" -#include "threadpool.h" - -class FEMsgHandler -{ - public: - FEMsgHandler(); - FEMsgHandler(boost::shared_ptr, messageqcpp::IOSocket*); - virtual ~FEMsgHandler(); - - void start(); - void stop(); - void setJobList(boost::shared_ptr); - void setSocket(messageqcpp::IOSocket*); - bool aborted(); - - void threadFcn(); - - static threadpool::ThreadPool threadPool; - - private: - bool die, running, sawData; - messageqcpp::IOSocket* sock; - boost::shared_ptr jl; - boost::mutex mutex; - uint64_t thr; -}; diff --git a/exemgr/rssmonfcn.cpp b/exemgr/rssmonfcn.cpp deleted file mode 100644 index 8a9b62d83..000000000 --- a/exemgr/rssmonfcn.cpp +++ /dev/null @@ -1,69 +0,0 @@ -/* Copyright (C) 2022 MariaDB Corporation - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, - MA 02110-1301, USA. */ - -#include - -#include "rssmonfcn.h" -#include "serviceexemgr.h" - -namespace exemgr -{ - void RssMonFcn::operator()() const - { - logging::Logger& msgLog = globServiceExeMgr->getLogger(); - auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount(); - for (;;) - { - size_t rssMb = rss(); - size_t pct = rssMb * 100 / fMemTotal; - - if (pct > fMaxPct) - { - if (fMaxPct >= 95) - { - std::cerr << "Too much memory allocated!" << std::endl; - logging::Message::Args args; - args.add((int)pct); - args.add((int)fMaxPct); - msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logRssTooBig, args, logging::LoggingID(16)); - exit(1); - } - - if (statementsRunningCount->cur() == 0) - { - std::cerr << "Too much memory allocated!" << std::endl; - logging::Message::Args args; - args.add((int)pct); - args.add((int)fMaxPct); - msgLog.logMessage(logging::LOG_TYPE_WARNING, ServiceExeMgr::logRssTooBig, args, logging::LoggingID(16)); - exit(1); - } - - std::cerr << "Too much memory allocated, but stmts running" << std::endl; - } - - // Update sessionMemMap entries lower than current mem % use - globServiceExeMgr->updateSessionMap(pct); - - pause_(); - } - } - void startRssMon(size_t maxPct, int pauseSeconds) - { - new std::thread(RssMonFcn(maxPct, pauseSeconds)); - } -} // namespace \ No newline at end of file diff --git a/exemgr/rssmonfcn.h b/exemgr/rssmonfcn.h deleted file mode 100644 index 341038e91..000000000 --- a/exemgr/rssmonfcn.h +++ /dev/null @@ -1,36 +0,0 @@ -/* Copyright (C) 2022 MariaDB Corporation - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, - MA 02110-1301, USA. */ - -#pragma once - -#include - -#include "MonitorProcMem.h" - -namespace exemgr -{ -class RssMonFcn : public utils::MonitorProcMem -{ - public: - RssMonFcn(size_t maxPct, int pauseSeconds) : MonitorProcMem(maxPct, 0, 21, pauseSeconds) - { - } - - void operator()() const; -}; - -} // namespace \ No newline at end of file diff --git a/exemgr/serviceexemgr.cpp b/exemgr/serviceexemgr.cpp deleted file mode 100644 index 33ef3d8d2..000000000 --- a/exemgr/serviceexemgr.cpp +++ /dev/null @@ -1,475 +0,0 @@ -/* Copyright (C) 2014 InfiniDB, Inc. - Copyright (C) 2019-22 MariaDB Corporation - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, - MA 02110-1301, USA. */ - -/********************************************************************** - * $Id: main.cpp 1000 2013-07-24 21:05:51Z pleblanc $ - * - * - ***********************************************************************/ -/** - * @brief execution plan manager main program - * - * This is the ?main? program for dealing with execution plans and - * result sets. It sits in a loop waiting for CalpontExecutionPlan - * from the FEP. It then passes the CalpontExecutionPlan to the - * JobListFactory from which a JobList is obtained. This is passed - * to to the Query Manager running in the real-time portion of the - * EC. The ExecutionPlanManager waits until the Query Manager - * returns a result set for the job list. These results are passed - * into the CalpontResultFactory, which outputs a CalpontResultSet. - * The ExecutionPlanManager passes the CalpontResultSet into the - * VendorResultFactory which produces a result set tailored to the - * specific DBMS front end in use. The ExecutionPlanManager then - * sends the VendorResultSet back to the Calpont Database Connector - * on the Front-End Processor where it is returned to the DBMS - * front-end. - */ -#include -#include -#include -#include - -#undef root_name -#include - -#include "calpontselectexecutionplan.h" -#include "mcsanalyzetableexecutionplan.h" -#include "activestatementcounter.h" -#include "distributedenginecomm.h" -#include "resourcemanager.h" -#include "configcpp.h" -#include "queryteleserverparms.h" -#include "iosocket.h" -#include "joblist.h" -#include "joblistfactory.h" -#include "oamcache.h" -#include "simplecolumn.h" -#include "bytestream.h" -#include "telestats.h" -#include "messageobj.h" -#include "messagelog.h" -#include "sqllogger.h" -#include "femsghandler.h" -#include "idberrorinfo.h" -#include "MonitorProcMem.h" -#include "liboamcpp.h" -#include "crashtrace.h" -#include "service.h" - -#include -#include -#include - -#include "dbrm.h" - -#include "mariadb_my_sys.h" -#include "statistics.h" -#include "serviceexemgr.h" -#include "sqlfrontsessionthread.h" - - -namespace exemgr -{ - ServiceExeMgr* globServiceExeMgr = nullptr; - void startRssMon(size_t maxPct, int pauseSeconds); - - void added_a_pm(int) - { - logging::LoggingID logid(21, 0, 0); - logging::Message::Args args1; - logging::Message msg(1); - args1.add("exeMgr caught SIGHUP. Resetting connections"); - msg.format(args1); - std::cout << msg.msg().c_str() << std::endl; - logging::Logger logger(logid.fSubsysID); - logger.logMessage(logging::LOG_TYPE_DEBUG, msg, logid); - - auto* dec = exemgr::globServiceExeMgr->getDec(); - if (dec) - { - oam::OamCache* oamCache = oam::OamCache::makeOamCache(); - oamCache->forceReload(); - dec->Setup(); - } - } - - void printTotalUmMemory(int sig) - { - int64_t num = globServiceExeMgr->getRm().availableMemory(); - std::cout << "Total UM memory available: " << num << std::endl; - } - - void ServiceExeMgr::setupSignalHandlers() - { - struct sigaction ign; - - memset(&ign, 0, sizeof(ign)); - ign.sa_handler = SIG_IGN; - - sigaction(SIGPIPE, &ign, 0); - - memset(&ign, 0, sizeof(ign)); - ign.sa_handler = exemgr::added_a_pm; - sigaction(SIGHUP, &ign, 0); - ign.sa_handler = exemgr::printTotalUmMemory; - sigaction(SIGUSR1, &ign, 0); - memset(&ign, 0, sizeof(ign)); - ign.sa_handler = fatalHandler; - sigaction(SIGSEGV, &ign, 0); - sigaction(SIGABRT, &ign, 0); - sigaction(SIGFPE, &ign, 0); - } - - void cleanTempDir() - { - using TempDirPurpose = config::Config::TempDirPurpose; - struct Dirs - { - std::string section; - std::string allowed; - TempDirPurpose purpose; - }; - std::vector dirs{{"HashJoin", "AllowDiskBasedJoin", TempDirPurpose::Joins}, - {"RowAggregation", "AllowDiskBasedAggregation", TempDirPurpose::Aggregates}}; - const auto config = config::Config::makeConfig(); - - for (const auto& dir : dirs) - { - std::string allowStr = config->getConfig(dir.section, dir.allowed); - bool allow = (allowStr == "Y" || allowStr == "y"); - - std::string tmpPrefix = config->getTempFileDir(dir.purpose); - - if (allow && tmpPrefix.empty()) - { - std::cerr << "Empty tmp directory name for " << dir.section << std::endl; - logging::LoggingID logid(16, 0, 0); - logging::Message::Args args; - logging::Message message(8); - args.add("Empty tmp directory name for:"); - args.add(dir.section); - message.format(args); - logging::Logger logger(logid.fSubsysID); - logger.logMessage(logging::LOG_TYPE_CRITICAL, message, logid); - } - - tmpPrefix += "/"; - - idbassert(tmpPrefix != "/"); - - /* This is quite scary as ExeMgr usually runs as root */ - try - { - if (allow) - { - boost::filesystem::remove_all(tmpPrefix); - } - boost::filesystem::create_directories(tmpPrefix); - } - catch (const std::exception& ex) - { - std::cerr << ex.what() << std::endl; - logging::LoggingID logid(16, 0, 0); - logging::Message::Args args; - logging::Message message(8); - args.add("Exception whilst cleaning tmpdir: "); - args.add(ex.what()); - message.format(args); - logging::Logger logger(logid.fSubsysID); - logger.logMessage(logging::LOG_TYPE_WARNING, message, logid); - } - catch (...) - { - std::cerr << "Caught unknown exception during tmpdir cleanup" << std::endl; - logging::LoggingID logid(16, 0, 0); - logging::Message::Args args; - logging::Message message(8); - args.add("Unknown exception whilst cleaning tmpdir"); - message.format(args); - logging::Logger logger(logid.fSubsysID); - logger.logMessage(logging::LOG_TYPE_WARNING, message, logid); - } - } - } - - const std::string ServiceExeMgr::prettyPrintMiniInfo(const std::string& in) - { - // 1. take the std::string and tok it by '\n' - // 2. for each part in each line calc the longest part - // 3. padding to each longest value, output a header and the lines - typedef boost::tokenizer > my_tokenizer; - boost::char_separator sep1("\n"); - my_tokenizer tok1(in, sep1); - std::vector lines; - std::string header = "Desc Mode Table TableOID ReferencedColumns PIO LIO PBE Elapsed Rows"; - const int header_parts = 10; - lines.push_back(header); - - for (my_tokenizer::const_iterator iter1 = tok1.begin(); iter1 != tok1.end(); ++iter1) - { - if (!iter1->empty()) - lines.push_back(*iter1); - } - - std::vector lens; - - for (int i = 0; i < header_parts; i++) - lens.push_back(0); - - std::vector > lineparts; - std::vector::iterator iter2; - int j; - - for (iter2 = lines.begin(), j = 0; iter2 != lines.end(); ++iter2, j++) - { - boost::char_separator sep2(" "); - my_tokenizer tok2(*iter2, sep2); - int i; - std::vector parts; - my_tokenizer::iterator iter3; - - for (iter3 = tok2.begin(), i = 0; iter3 != tok2.end(); ++iter3, i++) - { - if (i >= header_parts) - break; - - std::string part(*iter3); - - if (j != 0 && i == 8) - part.resize(part.size() - 3); - - assert(i < header_parts); - - if (part.size() > lens[i]) - lens[i] = part.size(); - - parts.push_back(part); - } - - assert(i == header_parts); - lineparts.push_back(parts); - } - - std::ostringstream oss; - - std::vector >::iterator iter1 = lineparts.begin(); - std::vector >::iterator end1 = lineparts.end(); - - oss << "\n"; - - while (iter1 != end1) - { - std::vector::iterator iter2 = iter1->begin(); - std::vector::iterator end2 = iter1->end(); - assert(distance(iter2, end2) == header_parts); - int i = 0; - - while (iter2 != end2) - { - assert(i < header_parts); - oss << std::setw(lens[i]) << std::left << *iter2 << " "; - ++iter2; - i++; - } - - oss << "\n"; - ++iter1; - } - - return oss.str(); - } - - int ServiceExeMgr::Child() - { - // Make sure CSC thinks it's on a UM or else bucket reuse stuff below will stall - if (!m_e) - setenv("CALPONT_CSC_IDENT", "um", 1); - - setupSignalHandlers(); - int err = 0; - if (!m_debug) - err = setupResources(); - std::string errMsg; - - switch (err) - { - case -1: - case -3: errMsg = "Error getting file limits, please see non-root install documentation"; break; - - case -2: errMsg = "Error setting file limits, please see non-root install documentation"; break; - - case -4: - errMsg = "Could not install file limits to required value, please see non-root install documentation"; - break; - - default: errMsg = "Couldn't change working directory or unknown error"; break; - } - - err = setupCwd(); - - if (err < 0) - { - oam::Oam oam; - logging::Message::Args args; - logging::Message message; - args.add(errMsg); - message.format(args); - logging::LoggingID lid(16); - logging::MessageLog ml(lid); - ml.logCriticalMessage(message); - std::cerr << errMsg << std::endl; - - NotifyServiceInitializationFailed(); - return 2; - } - - cleanTempDir(); - - logging::MsgMap msgMap; - msgMap[logDefaultMsg] = logging::Message(logDefaultMsg); - msgMap[logDbProfStartStatement] = logging::Message(logDbProfStartStatement); - msgMap[logDbProfEndStatement] = logging::Message(logDbProfEndStatement); - msgMap[logStartSql] = logging::Message(logStartSql); - msgMap[logEndSql] = logging::Message(logEndSql); - msgMap[logRssTooBig] = logging::Message(logRssTooBig); - msgMap[logDbProfQueryStats] = logging::Message(logDbProfQueryStats); - msgMap[logExeMgrExcpt] = logging::Message(logExeMgrExcpt); - msgLog_.msgMap(msgMap); - - dec_ = joblist::DistributedEngineComm::instance(rm_, true); - dec_->Open(); - - bool tellUser = true; - - messageqcpp::MessageQueueServer* mqs; - - statementsRunningCount_ = new ActiveStatementCounter(rm_->getEmExecQueueSize()); - const std::string ExeMgr = "ExeMgr1"; - for (;;) - { - try - { - mqs = new messageqcpp::MessageQueueServer(ExeMgr, rm_->getConfig(), messageqcpp::ByteStream::BlockSize, 64); - break; - } - catch (std::runtime_error& re) - { - std::string what = re.what(); - - if (what.find("Address already in use") != std::string::npos) - { - if (tellUser) - { - std::cerr << "Address already in use, retrying..." << std::endl; - tellUser = false; - } - - sleep(5); - } - else - { - throw; - } - } - } - - // class jobstepThreadPool is used by other processes. We can't call - // resourcemanaager (rm) functions during the static creation of threadpool - // because rm has a "isExeMgr" flag that is set upon creation (rm is a singleton). - // From the pools perspective, it has no idea if it is ExeMgr doing the - // creation, so it has no idea which way to set the flag. So we set the max here. - joblist::JobStep::jobstepThreadPool.setMaxThreads(rm_->getJLThreadPoolSize()); - joblist::JobStep::jobstepThreadPool.setName("ExeMgrJobList"); - - if (rm_->getJlThreadPoolDebug() == "Y" || rm_->getJlThreadPoolDebug() == "y") - { - joblist::JobStep::jobstepThreadPool.setDebug(true); - joblist::JobStep::jobstepThreadPool.invoke( - threadpool::ThreadPoolMonitor(&joblist::JobStep::jobstepThreadPool)); - } - - int serverThreads = rm_->getEmServerThreads(); - int maxPct = rm_->getEmMaxPct(); - int pauseSeconds = rm_->getEmSecondsBetweenMemChecks(); - int priority = rm_->getEmPriority(); - - FEMsgHandler::threadPool.setMaxThreads(serverThreads); - FEMsgHandler::threadPool.setName("FEMsgHandler"); - - if (maxPct > 0) - { - // Defined in rssmonfcn.cpp - exemgr::startRssMon(maxPct, pauseSeconds); - } - - setpriority(PRIO_PROCESS, 0, priority); - - std::string teleServerHost(rm_->getConfig()->getConfig("QueryTele", "Host")); - - if (!teleServerHost.empty()) - { - int teleServerPort = toInt(rm_->getConfig()->getConfig("QueryTele", "Port")); - - if (teleServerPort > 0) - { - teleServerParms_ = querytele::QueryTeleServerParms(teleServerHost, teleServerPort); - } - } - - NotifyServiceStarted(); - - std::cout << "Starting ExeMgr: st = " << serverThreads << ", qs = " << rm_->getEmExecQueueSize() - << ", mx = " << maxPct << ", cf = " << rm_->getConfig()->configFile() << std::endl; - - { - BRM::DBRM* dbrm = new BRM::DBRM(); - dbrm->setSystemQueryReady(true); - delete dbrm; - } - - threadpool::ThreadPool exeMgrThreadPool(serverThreads, 0); - exeMgrThreadPool.setName("ExeMgrServer"); - - if (rm_->getExeMgrThreadPoolDebug() == "Y" || rm_->getExeMgrThreadPoolDebug() == "y") - { - exeMgrThreadPool.setDebug(true); - exeMgrThreadPool.invoke(threadpool::ThreadPoolMonitor(&exeMgrThreadPool)); - } - - // Load statistics. - try - { - statistics::StatisticsManager::instance()->loadFromFile(); - } - catch (...) - { - std::cerr << "Cannot load statistics from file " << std::endl; - } - - for (;;) - { - messageqcpp::IOSocket ios; - ios = mqs->accept(); - exeMgrThreadPool.invoke(exemgr::SQLFrontSessionThread(ios, dec_, rm_)); - } - - exeMgrThreadPool.wait(); - - return 0; - } -} // namespace diff --git a/exemgr/serviceexemgr.h b/exemgr/serviceexemgr.h deleted file mode 100644 index afcf07914..000000000 --- a/exemgr/serviceexemgr.h +++ /dev/null @@ -1,343 +0,0 @@ -/* Copyright (C) 2022 Mariadb Corporation. - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, - MA 02110-1301, USA. */ - -#pragma once - -#include -#include -#include -#include - -#undef root_name -#include - -#include "calpontselectexecutionplan.h" -#include "mcsanalyzetableexecutionplan.h" -#include "activestatementcounter.h" -#include "distributedenginecomm.h" -#include "resourcemanager.h" -#include "configcpp.h" -#include "queryteleserverparms.h" -#include "iosocket.h" -#include "joblist.h" -#include "joblistfactory.h" -#include "oamcache.h" -#include "simplecolumn.h" -#include "bytestream.h" -#include "telestats.h" -#include "messageobj.h" -#include "messagelog.h" -#include "sqllogger.h" -#include "femsghandler.h" -#include "idberrorinfo.h" -#include "MonitorProcMem.h" -#include "liboamcpp.h" -#include "crashtrace.h" -#include "service.h" - -#include -#include -#include - -#include "dbrm.h" - -#include "mariadb_my_sys.h" -#include "statistics.h" - -namespace exemgr -{ - class Opt - { - public: - int m_debug; - bool m_e; - bool m_fg; - Opt(int argc, char* argv[]) : m_debug(0), m_e(false), m_fg(false) - { - int c; - while ((c = getopt(argc, argv, "edf")) != EOF) - { - switch (c) - { - case 'd': m_debug++; break; - - case 'e': m_e = true; break; - - case 'f': m_fg = true; break; - - case '?': - default: break; - } - } - } - Opt(): m_debug(0), m_e(false), m_fg(false) {}; - int getDebugLevel() const - { - return m_debug; - } - }; - - class ServiceExeMgr : public Service, public Opt - { - using SessionMemMap_t = std::map; - using ThreadCntPerSessionMap_t = std::map; - protected: - void log(logging::LOG_TYPE type, const std::string& str) - { - logging::LoggingID logid(16); - logging::Message::Args args; - logging::Message message(8); - args.add(strerror(errno)); - message.format(args); - logging::Logger logger(logid.fSubsysID); - logger.logMessage(type, message, logid); - } - - public: - ServiceExeMgr(const Opt& opt) : Service("ExeMgr"), Opt(opt), msgLog_(logging::Logger(16)) - { - bool runningWithExeMgr = true; - rm_ = joblist::ResourceManager::instance(runningWithExeMgr); - } - void LogErrno() override - { - log(logging::LOG_TYPE_CRITICAL, std::string(strerror(errno))); - } - void ParentLogChildMessage(const std::string& str) override - { - log(logging::LOG_TYPE_INFO, str); - } - int Child() override; - int Run() - { - return m_fg ? Child() : RunForking(); - } - static const constexpr unsigned logDefaultMsg = logging::M0000; - static const constexpr unsigned logDbProfStartStatement = logging::M0028; - static const constexpr unsigned logDbProfEndStatement = logging::M0029; - static const constexpr unsigned logStartSql = logging::M0041; - static const constexpr unsigned logEndSql = logging::M0042; - static const constexpr unsigned logRssTooBig = logging::M0044; - static const constexpr unsigned logDbProfQueryStats = logging::M0047; - static const constexpr unsigned logExeMgrExcpt = logging::M0055; - // If any flags other than the table mode flags are set, produce output to screeen - static const constexpr uint32_t flagsWantOutput = (0xffffffff & ~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_AUTOSWITCH & - ~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF); - logging::Logger& getLogger() - { - return msgLog_; - } - void updateSessionMap(const size_t pct) - { - std::lock_guard lk(sessionMemMapMutex_); - - for (auto mapIter = sessionMemMap_.begin(); mapIter != sessionMemMap_.end(); ++mapIter) - { - if (pct > mapIter->second) - { - mapIter->second = pct; - } - } - } - ThreadCntPerSessionMap_t& getThreadCntPerSessionMap() - { - return threadCntPerSessionMap_; - } - std::mutex& getThreadCntPerSessionMapMutex() - { - return threadCntPerSessionMapMutex_; - } - void initMaxMemPct(uint32_t sessionId) - { - // WIP - if (sessionId < 0x80000000) - { - std::lock_guard lk(sessionMemMapMutex_); - auto mapIter = sessionMemMap_.find(sessionId); - - if (mapIter == sessionMemMap_.end()) - { - sessionMemMap_[sessionId] = 0; - } - else - { - mapIter->second = 0; - } - } - } - uint64_t getMaxMemPct(const uint32_t sessionId) - { - uint64_t maxMemoryPct = 0; - // WIP - if (sessionId < 0x80000000) - { - std::lock_guard lk(sessionMemMapMutex_); - auto mapIter = sessionMemMap_.find(sessionId); - - if (mapIter != sessionMemMap_.end()) - { - maxMemoryPct = (uint64_t)mapIter->second; - } - } - - return maxMemoryPct; - } - void deleteMaxMemPct(uint32_t sessionId) - { - if (sessionId < 0x80000000) - { - std::lock_guard lk(sessionMemMapMutex_); - auto mapIter = sessionMemMap_.find(sessionId); - - if (mapIter != sessionMemMap_.end()) - { - sessionMemMap_.erase(sessionId); - } - } - } - //...Increment the number of threads using the specified sessionId - void incThreadCntPerSession(uint32_t sessionId) - { - std::lock_guard lk(threadCntPerSessionMapMutex_); - auto mapIter = threadCntPerSessionMap_.find(sessionId); - - if (mapIter == threadCntPerSessionMap_.end()) - threadCntPerSessionMap_.insert(ThreadCntPerSessionMap_t::value_type(sessionId, 1)); - else - mapIter->second++; - } - //...Decrement the number of threads using the specified sessionId. - //...When the thread count for a sessionId reaches 0, the corresponding - //...CalpontSystemCatalog objects are deleted. - //...The user query and its associated catalog query have a different - //...session Id where the highest bit is flipped. - //...The object with id(sessionId | 0x80000000) cannot be removed before - //...user query session completes because the sysdata may be used for - //...debugging/stats purpose, such as result graph, etc. - void decThreadCntPerSession(uint32_t sessionId) - { - std::lock_guard lk(threadCntPerSessionMapMutex_); - auto mapIter = threadCntPerSessionMap_.find(sessionId); - - if (mapIter != threadCntPerSessionMap_.end()) - { - if (--mapIter->second == 0) - { - threadCntPerSessionMap_.erase(mapIter); - execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId); - execplan::CalpontSystemCatalog::removeCalpontSystemCatalog((sessionId ^ 0x80000000)); - } - } - } - ActiveStatementCounter* getStatementsRunningCount() - { - return statementsRunningCount_; - } - joblist::DistributedEngineComm* getDec() - { - return dec_; - } - int toInt(const std::string& val) - { - if (val.length() == 0) - return -1; - - return static_cast(config::Config::fromText(val)); - } - const std::string prettyPrintMiniInfo(const std::string& in); - - const std::string timeNow() - { - time_t outputTime = time(0); - struct tm ltm; - char buf[32]; // ctime(3) says at least 26 - size_t len = 0; - asctime_r(localtime_r(&outputTime, <m), buf); - len = strlen(buf); - - if (len > 0) - --len; - - if (buf[len] == '\n') - buf[len] = 0; - - return buf; - } - querytele::QueryTeleServerParms& getTeleServerParms() - { - return teleServerParms_; - } - joblist::ResourceManager& getRm() - { - return *rm_; - } - private: - void setupSignalHandlers(); - int8_t setupCwd() - { - std::string workdir = rm_->getScWorkingDir(); - int8_t rc = chdir(workdir.c_str()); - - if (rc < 0 || access(".", W_OK) != 0) - rc = chdir("/tmp"); - - return (rc < 0) ? -5 : rc; - } - int setupResources() - { - struct rlimit rlim; - - if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) - { - return -1; - } - rlim.rlim_cur = rlim.rlim_max = 65536; - - if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) - { - return -2; - } - if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) - { - return -3; - } - if (rlim.rlim_cur != 65536) - { - return -4; - } - return 0; - } - - logging::Logger msgLog_; - SessionMemMap_t sessionMemMap_; // track memory% usage during a query - std::mutex sessionMemMapMutex_; - //...The FrontEnd may establish more than 1 connection (which results in - // more than 1 ExeMgr thread) per session. These threads will share - // the same CalpontSystemCatalog object for that session. Here, we - // define a std::map to track how many threads are sharing each session, so - // that we know when we can safely delete a CalpontSystemCatalog object - // shared by multiple threads per session. - ThreadCntPerSessionMap_t threadCntPerSessionMap_; - std::mutex threadCntPerSessionMapMutex_; - ActiveStatementCounter* statementsRunningCount_; - joblist::DistributedEngineComm* dec_; - joblist::ResourceManager* rm_; - // Its attributes are set in Child() - querytele::QueryTeleServerParms teleServerParms_; - }; - extern ServiceExeMgr* globServiceExeMgr; -} \ No newline at end of file diff --git a/exemgr/sqlfrontsessionthread.cpp b/exemgr/sqlfrontsessionthread.cpp deleted file mode 100644 index 3b5094e7a..000000000 --- a/exemgr/sqlfrontsessionthread.cpp +++ /dev/null @@ -1,985 +0,0 @@ -/* Copyright (C) 2022 Mariadb Corporation. - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, - MA 02110-1301, USA. */ - -#include "sqlfrontsessionthread.h" - -namespace exemgr -{ - uint64_t SQLFrontSessionThread::getMaxMemPct(uint32_t sessionId) - { - return globServiceExeMgr->getMaxMemPct(sessionId); - } - void SQLFrontSessionThread::deleteMaxMemPct(uint32_t sessionId) - { - return globServiceExeMgr->deleteMaxMemPct(sessionId); - } - void SQLFrontSessionThread::incThreadCntPerSession(uint32_t sessionId) - { - return globServiceExeMgr->incThreadCntPerSession(sessionId); - } - void SQLFrontSessionThread::decThreadCntPerSession(uint32_t sessionId) - { - return globServiceExeMgr->decThreadCntPerSession(sessionId); - } - - void SQLFrontSessionThread::initMaxMemPct(uint32_t sessionId) - { - return globServiceExeMgr->initMaxMemPct(sessionId); - } - //...Get and log query stats to specified output stream - const std::string SQLFrontSessionThread::formatQueryStats( - joblist::SJLP& jl, // joblist associated with query - const std::string& label, // header label to print in front of log output - bool includeNewLine, // include line breaks in query stats std::string - bool vtableModeOn, bool wantExtendedStats, uint64_t rowsReturned) - { - std::ostringstream os; - - // Get stats if not already acquired for current query - if (!fStatsRetrieved) - { - if (wantExtendedStats) - { - // wait for the ei data to be written by another thread (brain-dead) - struct timespec req = {0, 250000}; // 250 usec - nanosleep(&req, 0); - } - - // Get % memory usage during current query for sessionId - jl->querySummary(wantExtendedStats); - fStats = jl->queryStats(); - fStats.fMaxMemPct = getMaxMemPct(fStats.fSessionID); - fStats.fRows = rowsReturned; - fStatsRetrieved = true; - } - - std::string queryMode; - queryMode = (vtableModeOn ? "Distributed" : "Standard"); - - // Log stats to specified output stream - os << label << ": MaxMemPct-" << fStats.fMaxMemPct << "; NumTempFiles-" << fStats.fNumFiles - << "; TempFileSpace-" << roundBytes(fStats.fFileBytes) << "; ApproxPhyI/O-" << fStats.fPhyIO - << "; CacheI/O-" << fStats.fCacheIO << "; BlocksTouched-" << fStats.fMsgRcvCnt; - - if (includeNewLine) - os << std::endl << " "; // insert line break - else - os << "; "; // continue without line break - - os << "PartitionBlocksEliminated-" << fStats.fCPBlocksSkipped << "; MsgBytesIn-" - << roundBytes(fStats.fMsgBytesIn) << "; MsgBytesOut-" << roundBytes(fStats.fMsgBytesOut) << "; Mode-" - << queryMode; - - return os.str(); - } - - //... Round off to human readable format (KB, MB, or GB). - const std::string SQLFrontSessionThread::roundBytes(uint64_t value) const - { - const char* units[] = {"B", "KB", "MB", "GB", "TB"}; - uint64_t i = 0, up = 0; - uint64_t roundedValue = value; - - while (roundedValue > 1024 && i < 4) - { - up = (roundedValue & 512); - roundedValue /= 1024; - i++; - } - - if (up) - roundedValue++; - - std::ostringstream oss; - oss << roundedValue << units[i]; - return oss.str(); - } - - //...Round off to nearest (1024*1024) MB - uint64_t SQLFrontSessionThread::roundMB(uint64_t value) const - { - uint64_t roundedValue = value >> 20; - - if (value & 524288) - roundedValue++; - - return roundedValue; - } - - void SQLFrontSessionThread::setRMParms(const execplan::CalpontSelectExecutionPlan::RMParmVec& parms) - { - for (execplan::CalpontSelectExecutionPlan::RMParmVec::const_iterator it = parms.begin(); - it != parms.end(); ++it) - { - switch (it->id) - { - case execplan::PMSMALLSIDEMEMORY: - { - globServiceExeMgr->getRm().addHJPmMaxSmallSideMap(it->sessionId, it->value); - break; - } - - case execplan::UMSMALLSIDEMEMORY: - { - globServiceExeMgr->getRm().addHJUmMaxSmallSideMap(it->sessionId, it->value); - break; - } - - default:; - } - } - } - - void SQLFrontSessionThread::buildSysCache(const execplan::CalpontSelectExecutionPlan& csep, - boost::shared_ptr csc) - { - const execplan::CalpontSelectExecutionPlan::ColumnMap& colMap = csep.columnMap(); - std::string schemaName; - - for (auto it = colMap.begin(); it != colMap.end(); ++it) - { - const auto sc = dynamic_cast((it->second).get()); - - if (sc) - { - schemaName = sc->schemaName(); - - // only the first time a schema is got will actually query - // system catalog. System catalog keeps a schema name std::map. - // if a schema exists, the call getSchemaInfo returns without - // doing anything. - if (!schemaName.empty()) - csc->getSchemaInfo(schemaName); - } - } - - execplan::CalpontSelectExecutionPlan::SelectList::const_iterator subIt; - - for (subIt = csep.derivedTableList().begin(); subIt != csep.derivedTableList().end(); ++subIt) - { - buildSysCache(*(dynamic_cast(subIt->get())), csc); - } - } - - void SQLFrontSessionThread::writeCodeAndError(messageqcpp::ByteStream::quadbyte code, const std::string emsg) - { - messageqcpp::ByteStream emsgBs; - messageqcpp::ByteStream tbs; - tbs << code; - fIos.write(tbs); - emsgBs << emsg; - fIos.write(emsgBs); - } - - void SQLFrontSessionThread::analyzeTableExecute(messageqcpp::ByteStream& bs, joblist::SJLP& jl, bool& stmtCounted) - { - auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount(); - messageqcpp::ByteStream::quadbyte qb; - execplan::MCSAnalyzeTableExecutionPlan caep; - - bs = fIos.read(); - caep.unserialize(bs); - - statementsRunningCount->incr(stmtCounted); - jl = joblist::JobListFactory::makeJobList(&caep, fRm, false, true); - - // Joblist is empty. - if (jl->status() == logging::statisticsJobListEmpty) - { - if (caep.traceOn()) - std::cout << "JobList is empty " << std::endl; - - jl.reset(); - bs.restart(); - qb = ANALYZE_TABLE_SUCCESS; - bs << qb; - fIos.write(bs); - bs.reset(); - statementsRunningCount->decr(stmtCounted); - return; - } - - if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers())) - { - std::cout << "fEc setup " << std::endl; - fEc->Setup(); - } - - if (jl->status() == 0) - { - std::string emsg; - - if (jl->putEngineComm(fEc) != 0) - throw std::runtime_error(jl->errMsg()); - } - else - { - throw std::runtime_error("ExeMgr: could not build a JobList!"); - } - - // Execute a joblist. - jl->doQuery(); - - FEMsgHandler msgHandler(jl, &fIos); - - msgHandler.start(); - auto rowCount = jl->projectTable(100, bs); - msgHandler.stop(); - - auto outRG = (static_cast(jl.get()))->getOutputRowGroup(); - - if (caep.traceOn()) - std::cout << "Row count " << rowCount << std::endl; - - // Process `RowGroup`, increase an epoch and save statistics to the file. - auto* statisticsManager = statistics::StatisticsManager::instance(); - statisticsManager->analyzeColumnKeyTypes(outRG, caep.traceOn()); - statisticsManager->incEpoch(); - statisticsManager->saveToFile(); - - // Distribute statistics across all ExeMgr clients if possible. - statistics::StatisticsDistributor::instance()->distributeStatistics(); - - // Send the signal back to front-end. - bs.restart(); - qb = ANALYZE_TABLE_SUCCESS; - bs << qb; - fIos.write(bs); - bs.reset(); - statementsRunningCount->decr(stmtCounted); - } - - void SQLFrontSessionThread::analyzeTableHandleStats(messageqcpp::ByteStream& bs) - { - messageqcpp::ByteStream::quadbyte qb; -#ifdef DEBUG_STATISTICS - std::cout << "Get distributed statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl; -#endif - bs = fIos.read(); -#ifdef DEBUG_STATISTICS - std::cout << "Read the hash from statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl; -#endif - uint64_t dataHashRec; - bs >> dataHashRec; - - uint64_t dataHash = statistics::StatisticsManager::instance()->computeHashFromStats(); - // The stats are the same. - if (dataHash == dataHashRec) - { -#ifdef DEBUG_STATISTICS - std::cout << "The hash is the same as rec hash on ExeMgr(Client) from ExeMgr(Server) " << std::endl; -#endif - qb = ANALYZE_TABLE_SUCCESS; - bs << qb; - fIos.write(bs); - bs.reset(); - return; - } - - bs.restart(); - qb = ANALYZE_TABLE_NEED_STATS; - bs << qb; - fIos.write(bs); - - bs.restart(); - bs = fIos.read(); -#ifdef DEBUG_STATISTICS - std::cout << "Read statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl; -#endif - statistics::StatisticsManager::instance()->unserialize(bs); - statistics::StatisticsManager::instance()->saveToFile(); - -#ifdef DEBUG_STATISTICS - std::cout << "Write flag on ExeMgr(Client) to ExeMgr(Server)" << std::endl; -#endif - qb = ANALYZE_TABLE_SUCCESS; - bs << qb; - fIos.write(bs); - bs.reset(); - } - - void SQLFrontSessionThread::operator()() - { - messageqcpp::ByteStream bs, inbs; - execplan::CalpontSelectExecutionPlan csep; - csep.sessionID(0); - joblist::SJLP jl; - bool incSQLFrontSessionThreadCnt = true; - std::mutex jlMutex; - std::condition_variable jlCleanupDone; - int destructing = 0; - int gDebug = globServiceExeMgr->getDebugLevel(); - logging::Logger& msgLog = globServiceExeMgr->getLogger(); - - bool selfJoin = false; - bool tryTuples = false; - bool usingTuples = false; - bool stmtCounted = false; - auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount(); - - try - { - for (;;) - { - selfJoin = false; - tryTuples = false; - usingTuples = false; - - if (jl) - { - // puts the real destruction in another thread to avoid - // making the whole session wait. It can take several seconds. - std::unique_lock scoped(jlMutex); - destructing++; - std::thread bgdtor( - [jl, &jlMutex, &jlCleanupDone, &destructing] - { - std::unique_lock scoped(jlMutex); - const_cast(jl).reset(); // this happens second; does real destruction - if (--destructing == 0) - jlCleanupDone.notify_one(); - }); - jl.reset(); // this runs first - bgdtor.detach(); - } - - bs = fIos.read(); - - if (bs.length() == 0) - { - if (gDebug > 1 || (gDebug && !csep.isInternal())) - std::cout << "### Got a close(1) for session id " << csep.sessionID() << std::endl; - - // connection closed by client - fIos.close(); - break; - } - else if (bs.length() < 4) // Not a CalpontSelectExecutionPlan - { - if (gDebug) - std::cout << "### Got a not-a-plan for session id " << csep.sessionID() << " with length " - << bs.length() << std::endl; - - fIos.close(); - break; - } - else if (bs.length() == 4) // possible tuple flag - { - messageqcpp::ByteStream::quadbyte qb; - bs >> qb; - - if (qb == 4) // UM wants new tuple i/f - { - if (gDebug) - std::cout << "### UM wants tuples" << std::endl; - - tryTuples = true; - // now wait for the CSEP... - bs = fIos.read(); - } - else if (qb == 5) // somebody wants stats - { - bs.restart(); - qb = statementsRunningCount->cur(); - bs << qb; - qb = statementsRunningCount->waiting(); - bs << qb; - fIos.write(bs); - fIos.close(); - break; - } - else if (qb == ANALYZE_TABLE_EXECUTE) - { - analyzeTableExecute(bs, jl, stmtCounted); - continue; - } - else if (qb == ANALYZE_TABLE_REC_STATS) - { - analyzeTableHandleStats(bs); - continue; - } - else - { - if (gDebug) - std::cout << "### Got a not-a-plan value " << qb << std::endl; - - fIos.close(); - break; - } - } - - new_plan: - try - { - csep.unserialize(bs); - } - catch (logging::IDBExcept& ex) - { - // We can get here on illegal function parameter data type, e.g. - // SELECT blob_column|1 FROM t1; - statementsRunningCount->decr(stmtCounted); - writeCodeAndError(ex.errorCode(), std::string(ex.what())); - continue; - } - - querytele::QueryTeleStats qts; - - if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT")) - { - qts.query_uuid = csep.uuid(); - qts.msg_type = querytele::QueryTeleStats::QT_START; - qts.start_time = querytele::QueryTeleClient::timeNowms(); - qts.query = csep.data(); - qts.session_id = csep.sessionID(); - qts.query_type = csep.queryType(); - qts.system_name = fOamCachePtr->getSystemName(); - qts.module_name = fOamCachePtr->getModuleName(); - qts.local_query = csep.localQuery(); - qts.schema_name = csep.schemaName(); - fTeleClient.postQueryTele(qts); - } - - if (gDebug > 1 || (gDebug && !csep.isInternal())) - std::cout << "### For session id " << csep.sessionID() << ", got a CSEP" << std::endl; - - setRMParms(csep.rmParms()); - // Re-establish lost PP connections. - if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers())) - { - fEc->Setup(); - } - // @bug 1021. try to get schema cache for a come in query. - // skip system catalog queries. - if (!csep.isInternal()) - { - boost::shared_ptr csc = - execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(csep.sessionID()); - buildSysCache(csep, csc); - } - - // As soon as we have a session id for this thread, update the - // thread count per session; only do this once per thread. - // Mask 0x80000000 is for associate user query and csc query - if (incSQLFrontSessionThreadCnt) - { - // WIP - incThreadCntPerSession(csep.sessionID() | 0x80000000); - incSQLFrontSessionThreadCnt = false; - } - - bool needDbProfEndStatementMsg = false; - logging::Message::Args args; - std::string sqlText = csep.data(); - logging::LoggingID li(16, csep.sessionID(), csep.txnID()); - - // Initialize stats for this query, including - // init sessionMemMap entry for this session to 0 memory %. - // We will need this later for traceOn() or if we receive a - // table request with qb=3 (see below). This is also recorded - // as query start time. - initStats(csep.sessionID(), sqlText); - fStats.fQueryType = csep.queryType(); - - // Log start and end statement if tracing is enabled. Keep in - // mind the trace flag won't be set for system catalog queries. - if (csep.traceOn()) - { - args.reset(); - args.add((int)csep.statementID()); - args.add((int)csep.verID().currentScn); - args.add(sqlText); - msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfStartStatement, args, li); - needDbProfEndStatementMsg = true; - } - - // Don't log subsequent self joins after first. - if (selfJoin) - sqlText = ""; - - std::ostringstream oss; - oss << sqlText << "; |" << csep.schemaName() << "|"; - logging::SQLLogger sqlLog(oss.str(), li); - - statementsRunningCount->incr(stmtCounted); - - if (tryTuples) - { - try // @bug2244: try/catch around fIos.write() calls responding to makeTupleList - { - jl = joblist::JobListFactory::makeJobList(&csep, fRm, true, true); - // assign query stats - jl->queryStats(fStats); - - if ((jl->status()) == 0 && (jl->putEngineComm(fEc) == 0)) - { - usingTuples = true; - - // Tell the FE that we're sending tuples back, not TableBands - writeCodeAndError(0, "NOERROR"); - auto tjlp = dynamic_cast(jl.get()); - assert(tjlp); - messageqcpp::ByteStream tbs; - tbs << tjlp->getOutputRowGroup(); - fIos.write(tbs); - } - else - { - const std::string emsg = jl->errMsg(); - statementsRunningCount->decr(stmtCounted); - writeCodeAndError(jl->status(), emsg); - std::cerr << "ExeMgr: could not build a tuple joblist: " << emsg << std::endl; - continue; - } - } - catch (std::exception& ex) - { - std::ostringstream errMsg; - errMsg << "ExeMgr: error writing makeJoblist " - "response; " - << ex.what(); - throw std::runtime_error(errMsg.str()); - } - catch (...) - { - std::ostringstream errMsg; - errMsg << "ExeMgr: unknown error writing makeJoblist " - "response; "; - throw std::runtime_error(errMsg.str()); - } - - if (!usingTuples) - { - if (gDebug) - std::cout << "### UM wanted tuples but it didn't work out :-(" << std::endl; - } - else - { - if (gDebug) - std::cout << "### UM wanted tuples and we'll do our best;-)" << std::endl; - } - } - else - { - usingTuples = false; - jl = joblist::JobListFactory::makeJobList(&csep, fRm, false, true); - - if (jl->status() == 0) - { - std::string emsg; - - if (jl->putEngineComm(fEc) != 0) - throw std::runtime_error(jl->errMsg()); - } - else - { - throw std::runtime_error("ExeMgr: could not build a JobList!"); - } - } - - jl->doQuery(); - - execplan::CalpontSystemCatalog::OID tableOID; - bool swallowRows = false; - joblist::DeliveredTableMap tm; - uint64_t totalBytesSent = 0; - uint64_t totalRowCount = 0; - - // Project each table as the FE asks for it - for (;;) - { - bs = fIos.read(); - - if (bs.length() == 0) - { - if (gDebug > 1 || (gDebug && !csep.isInternal())) - std::cout << "### Got a close(2) for session id " << csep.sessionID() << std::endl; - - break; - } - - if (gDebug && bs.length() > 4) - std::cout << "### For session id " << csep.sessionID() << ", got too many bytes = " << bs.length() - << std::endl; - - // TODO: Holy crud! Can this be right? - //@bug 1444 Yes, if there is a self-join - if (bs.length() > 4) - { - selfJoin = true; - statementsRunningCount->decr(stmtCounted); - goto new_plan; - } - - assert(bs.length() == 4); - - messageqcpp::ByteStream::quadbyte qb; - - try // @bug2244: try/catch around fIos.write() calls responding to qb command - { - bs >> qb; - - if (gDebug > 1 || (gDebug && !csep.isInternal())) - std::cout << "### For session id " << csep.sessionID() << ", got a command = " << qb - << std::endl; - - if (qb == 0) - { - // No more tables, query is done - break; - } - else if (qb == 1) - { - // super-secret flag indicating that the UM is going to scarf down all the rows in the - // query. - swallowRows = true; - tm = jl->deliveredTables(); - continue; - } - else if (qb == 2) - { - // UM just wants any table - assert(swallowRows); - auto iter = tm.begin(); - - if (iter == tm.end()) - { - if (gDebug > 1 || (gDebug && !csep.isInternal())) - std::cout << "### For session id " << csep.sessionID() << ", returning end flag" - << std::endl; - - bs.restart(); - bs << (messageqcpp::ByteStream::byte)1; - fIos.write(bs); - continue; - } - - tableOID = iter->first; - } - else if (qb == 3) // special option-UM wants job stats std::string - { - std::string statsString; - - // Log stats std::string to be sent back to front end - statsString = formatQueryStats( - jl, "Query Stats", false, - !(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF), - (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG), totalRowCount); - - bs.restart(); - bs << statsString; - - if ((csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG) != 0) - { - bs << jl->extendedInfo(); - bs << globServiceExeMgr->prettyPrintMiniInfo(jl->miniInfo()); - } - else - { - std::string empty; - bs << empty; - bs << empty; - } - - // send stats to connector for inserting to the querystats table - fStats.serialize(bs); - fIos.write(bs); - continue; - } - // for table mode handling - else if (qb == 4) - { - statementsRunningCount->decr(stmtCounted); - bs = fIos.read(); - goto new_plan; - } - else // (qb > 3) - { - // Return table bands for the requested tableOID - tableOID = static_cast(qb); - } - } - catch (std::exception& ex) - { - std::ostringstream errMsg; - errMsg << "ExeMgr: error writing qb response " - "for qb cmd " - << qb << "; " << ex.what(); - throw std::runtime_error(errMsg.str()); - } - catch (...) - { - std::ostringstream errMsg; - errMsg << "ExeMgr: unknown error writing qb response " - "for qb cmd " - << qb; - throw std::runtime_error(errMsg.str()); - } - - if (swallowRows) - tm.erase(tableOID); - - FEMsgHandler msgHandler(jl, &fIos); - - if (tableOID == 100) - msgHandler.start(); - - //...Loop serializing table bands projected for the tableOID - for (;;) - { - uint32_t rowCount; - - rowCount = jl->projectTable(tableOID, bs); - - msgHandler.stop(); - - if (jl->status()) - { - const auto errInfo = logging::IDBErrorInfo::instance(); - - if (jl->errMsg().length() != 0) - bs << jl->errMsg(); - else - bs << errInfo->errorMsg(jl->status()); - } - - try // @bug2244: try/catch around fIos.write() calls projecting rows - { - if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3) - { - // Skip the write to the front end until the last empty band. Used to time queries - // through without any front end waiting. - if (tableOID < 3000 || rowCount == 0) - fIos.write(bs); - } - else - { - fIos.write(bs); - } - } - catch (std::exception& ex) - { - msgHandler.stop(); - std::ostringstream errMsg; - errMsg << "ExeMgr: error projecting rows " - "for tableOID: " - << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount << "; " - << ex.what(); - jl->abort(); - - while (rowCount) - rowCount = jl->projectTable(tableOID, bs); - - if (tableOID == 100 && msgHandler.aborted()) - { - /* TODO: modularize the cleanup code, as well as - * the rest of this fcn */ - - decThreadCntPerSession(csep.sessionID() | 0x80000000); - statementsRunningCount->decr(stmtCounted); - fIos.close(); - return; - } - - // std::cout << "connection drop\n"; - throw std::runtime_error(errMsg.str()); - } - catch (...) - { - std::ostringstream errMsg; - msgHandler.stop(); - errMsg << "ExeMgr: unknown error projecting rows " - "for tableOID: " - << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount; - jl->abort(); - - while (rowCount) - rowCount = jl->projectTable(tableOID, bs); - - throw std::runtime_error(errMsg.str()); - } - - totalRowCount += rowCount; - totalBytesSent += bs.length(); - - if (rowCount == 0) - { - msgHandler.stop(); - // No more bands, table is done - bs.reset(); - - // @bug 2083 decr active statement count here for table mode. - if (!usingTuples) - statementsRunningCount->decr(stmtCounted); - - break; - } - else - { - bs.restart(); - } - } // End of loop to project and serialize table bands for a table - } // End of loop to process tables - - // @bug 828 - if (csep.traceOn()) - jl->graph(csep.sessionID()); - - if (needDbProfEndStatementMsg) - { - std::string ss; - std::ostringstream prefix; - prefix << "ses:" << csep.sessionID() << " Query Totals"; - - // Log stats std::string to standard out - ss = formatQueryStats(jl, prefix.str(), true, - !(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF), - (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG), - totalRowCount); - //@Bug 1306. Added timing info for real time tracking. - std::cout << ss << " at " << globServiceExeMgr->timeNow() << std::endl; - - // log query stats to debug log file - args.reset(); - args.add((int)csep.statementID()); - args.add(fStats.fMaxMemPct); - args.add(fStats.fNumFiles); - args.add(fStats.fFileBytes); // log raw byte count instead of MB - args.add(fStats.fPhyIO); - args.add(fStats.fCacheIO); - args.add(fStats.fMsgRcvCnt); - args.add(fStats.fMsgBytesIn); - args.add(fStats.fMsgBytesOut); - args.add(fStats.fCPBlocksSkipped); - msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfQueryStats, args, li); - //@bug 1327 - deleteMaxMemPct(csep.sessionID()); - // Calling reset here, will cause joblist destructor to be - // called, which "joins" the threads. We need to do that - // here to make sure all syslogging from all the threads - // are complete; and that our logDbProfEndStatement will - // appear "last" in the syslog for this SQL statement. - // puts the real destruction in another thread to avoid - // making the whole session wait. It can take several seconds. - int stmtID = csep.statementID(); - std::unique_lock scoped(jlMutex); - destructing++; - std::thread bgdtor( - [jl, &jlMutex, &jlCleanupDone, stmtID, &li, &destructing, &msgLog] - { - std::unique_lock scoped(jlMutex); - const_cast(jl).reset(); // this happens second; does real destruction - logging::Message::Args args; - args.add(stmtID); - msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfEndStatement, args, li); - if (--destructing == 0) - jlCleanupDone.notify_one(); - }); - jl.reset(); // this happens first - bgdtor.detach(); - } - else - // delete sessionMemMap entry for this session's memory % use - deleteMaxMemPct(csep.sessionID()); - - std::string endtime(globServiceExeMgr->timeNow()); - - if ((csep.traceFlags() & globServiceExeMgr->flagsWantOutput) && (csep.sessionID() < 0x80000000)) - { - std::cout << "For session " << csep.sessionID() << ": " << totalBytesSent << " bytes sent back at " - << endtime << std::endl; - - // @bug 663 - Implemented caltraceon(16) to replace the - // $FIFO_SINK compiler definition in pColStep. - // This option consumes rows in the project steps. - if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS4) - { - std::cout << std::endl; - std::cout << "**** No data returned to DM. Rows consumed " - "in ProjectSteps - caltrace(16) is on (FIFO_SINK)." - " ****" - << std::endl; - std::cout << std::endl; - } - else if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3) - { - std::cout << std::endl; - std::cout << "**** No data returned to DM - caltrace(8) is " - "on (SWALLOW_ROWS_EXEMGR). ****" - << std::endl; - std::cout << std::endl; - } - } - - statementsRunningCount->decr(stmtCounted); - - if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT")) - { - qts.msg_type = querytele::QueryTeleStats::QT_SUMMARY; - qts.max_mem_pct = fStats.fMaxMemPct; - qts.num_files = fStats.fNumFiles; - qts.phy_io = fStats.fPhyIO; - qts.cache_io = fStats.fCacheIO; - qts.msg_rcv_cnt = fStats.fMsgRcvCnt; - qts.cp_blocks_skipped = fStats.fCPBlocksSkipped; - qts.msg_bytes_in = fStats.fMsgBytesIn; - qts.msg_bytes_out = fStats.fMsgBytesOut; - qts.rows = totalRowCount; - qts.end_time = querytele::QueryTeleClient::timeNowms(); - qts.session_id = csep.sessionID(); - qts.query_type = csep.queryType(); - qts.query = csep.data(); - qts.system_name = fOamCachePtr->getSystemName(); - qts.module_name = fOamCachePtr->getModuleName(); - qts.local_query = csep.localQuery(); - fTeleClient.postQueryTele(qts); - } - } - - // Release CSC object (for sessionID) that was added by makeJobList() - // Mask 0x80000000 is for associate user query and csc query. - // (actual joblist destruction happens at the top of this loop) - decThreadCntPerSession(csep.sessionID() | 0x80000000); - } - catch (std::exception& ex) - { - decThreadCntPerSession(csep.sessionID() | 0x80000000); - statementsRunningCount->decr(stmtCounted); - std::cerr << "### ExeMgr ses:" << csep.sessionID() << " caught: " << ex.what() << std::endl; - logging::Message::Args args; - logging::LoggingID li(16, csep.sessionID(), csep.txnID()); - args.add(ex.what()); - msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li); - fIos.close(); - } - catch (...) - { - decThreadCntPerSession(csep.sessionID() | 0x80000000); - statementsRunningCount->decr(stmtCounted); - std::cerr << "### Exception caught!" << std::endl; - logging::Message::Args args; - logging::LoggingID li(16, csep.sessionID(), csep.txnID()); - args.add("ExeMgr caught unknown exception"); - msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li); - fIos.close(); - } - - // make sure we don't leave scope while joblists are being destroyed - std::unique_lock scoped(jlMutex); - while (destructing > 0) - jlCleanupDone.wait(scoped); -} -}; // namespace exemgr \ No newline at end of file diff --git a/exemgr/sqlfrontsessionthread.h b/exemgr/sqlfrontsessionthread.h deleted file mode 100644 index ca605ea79..000000000 --- a/exemgr/sqlfrontsessionthread.h +++ /dev/null @@ -1,131 +0,0 @@ -/* Copyright (C) 2022 Mariadb Corporation. - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, - MA 02110-1301, USA. */ - -#pragma once - -#include -#include -#include -#include - -#undef root_name -#include - -#include "calpontselectexecutionplan.h" -#include "mcsanalyzetableexecutionplan.h" -#include "activestatementcounter.h" -#include "distributedenginecomm.h" -#include "resourcemanager.h" -#include "configcpp.h" -#include "queryteleserverparms.h" -#include "iosocket.h" -#include "joblist.h" -#include "joblistfactory.h" -#include "oamcache.h" -#include "simplecolumn.h" -#include "bytestream.h" -#include "telestats.h" -#include "messageobj.h" -#include "messagelog.h" -#include "sqllogger.h" -#include "femsghandler.h" -#include "idberrorinfo.h" -#include "MonitorProcMem.h" -#include "liboamcpp.h" -#include "crashtrace.h" -#include "service.h" - -#include -#include -#include - -#include "dbrm.h" - -#include "mariadb_my_sys.h" -#include "statistics.h" -#include "serviceexemgr.h" - -namespace exemgr -{ - class SQLFrontSessionThread - { - public: - SQLFrontSessionThread(const messageqcpp::IOSocket& ios, joblist::DistributedEngineComm* ec, - joblist::ResourceManager* rm) - : fIos(ios) - , fEc(ec) - , fRm(rm) - , fStatsRetrieved(false) - , fTeleClient(globServiceExeMgr->getTeleServerParms()) - , fOamCachePtr(oam::OamCache::makeOamCache()) - { - } - - private: - messageqcpp::IOSocket fIos; - joblist::DistributedEngineComm* fEc; - joblist::ResourceManager* fRm; - querystats::QueryStats fStats; - - // Variables used to store return stats - bool fStatsRetrieved; - - querytele::QueryTeleClient fTeleClient; - - oam::OamCache* fOamCachePtr; // this ptr is copyable... - - //...Reinitialize stats for start of a new query - void initStats(uint32_t sessionId, std::string& sqlText) - { - initMaxMemPct(sessionId); - - fStats.reset(); - fStats.setStartTime(); - fStats.fSessionID = sessionId; - fStats.fQuery = sqlText; - fStatsRetrieved = false; - } - //...Get % memory usage during latest query for sesssionId. - //...SessionId >= 0x80000000 is system catalog query we can ignore. - static uint64_t getMaxMemPct(uint32_t sessionId); - //...Delete sessionMemMap entry for the specified session's memory % use. - //...SessionId >= 0x80000000 is system catalog query we can ignore. - static void deleteMaxMemPct(uint32_t sessionId); - //...Get and log query stats to specified output stream - const std::string formatQueryStats( - joblist::SJLP& jl, // joblist associated with query - const std::string& label, // header label to print in front of log output - bool includeNewLine, // include line breaks in query stats std::string - bool vtableModeOn, bool wantExtendedStats, uint64_t rowsReturned); - static void incThreadCntPerSession(uint32_t sessionId); - static void decThreadCntPerSession(uint32_t sessionId); - //...Init sessionMemMap entry for specified session to 0 memory %. - //...SessionId >= 0x80000000 is system catalog query we can ignore. - static void initMaxMemPct(uint32_t sessionId); - //... Round off to human readable format (KB, MB, or GB). - const std::string roundBytes(uint64_t value) const; - void setRMParms(const execplan::CalpontSelectExecutionPlan::RMParmVec& parms); - void buildSysCache(const execplan::CalpontSelectExecutionPlan& csep, - boost::shared_ptr csc); - void writeCodeAndError(messageqcpp::ByteStream::quadbyte code, const std::string emsg); - void analyzeTableExecute(messageqcpp::ByteStream& bs, joblist::SJLP& jl, bool& stmtCounted); - void analyzeTableHandleStats(messageqcpp::ByteStream& bs); - uint64_t roundMB(uint64_t value) const; - public: - void operator()(); - }; -} \ No newline at end of file diff --git a/oam/install_scripts/CMakeLists.txt b/oam/install_scripts/CMakeLists.txt index b010b033e..80ca95be9 100644 --- a/oam/install_scripts/CMakeLists.txt +++ b/oam/install_scripts/CMakeLists.txt @@ -10,7 +10,6 @@ configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mariadb-columnstore-start.sh.in" "${ configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-workernode.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-workernode.service" @ONLY) configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-controllernode.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-controllernode.service" @ONLY) configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-primproc.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-primproc.service" @ONLY) -configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-exemgr.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-exemgr.service" @ONLY) configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-writeengineserver.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-writeengineserver.service" @ONLY) configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-dmlproc.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-dmlproc.service" @ONLY) configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-ddlproc.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-ddlproc.service" @ONLY) @@ -21,12 +20,12 @@ configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-loadbrm.py.in" "${CMAKE_CURRENT_ configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-savebrm.py.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-savebrm.py" @ONLY) configure_file("${CMAKE_CURRENT_SOURCE_DIR}/columnstoreSyslog.in" "${CMAKE_CURRENT_SOURCE_DIR}/columnstoreSyslog" @ONLY) -install(PROGRAMS columnstore-post-install - columnstore-pre-uninstall - columnstore_run.sh - post-mysql-install - post-mysqld-install - columnstoreSyslogSetup.sh +install(PROGRAMS columnstore-post-install + columnstore-pre-uninstall + columnstore_run.sh + post-mysql-install + post-mysqld-install + columnstoreSyslogSetup.sh mcs-stop-controllernode.sh mcs-loadbrm.py mcs-savebrm.py @@ -45,7 +44,6 @@ install(FILES mariadb-columnstore.service mcs-workernode.service mcs-controllernode.service mcs-primproc.service - mcs-exemgr.service mcs-writeengineserver.service mcs-dmlproc.service mcs-ddlproc.service diff --git a/oam/install_scripts/columnstore-post-install.in b/oam/install_scripts/columnstore-post-install.in index dea0eba12..8f8d33832 100755 --- a/oam/install_scripts/columnstore-post-install.in +++ b/oam/install_scripts/columnstore-post-install.in @@ -70,7 +70,7 @@ quiet=0 stop_mysqld=0 if [ -z "$(pgrep -x mariadbd)" ];then - # Startup mysqld + # Startup mysqld systemctl cat mariadb.service > /dev/null 2>&1 if [ $? -eq 0 ] && [ $(running_systemd) -eq 0 ]; then systemctl start mariadb.service @@ -165,8 +165,6 @@ if [ $user = "root" ]; then cp @ENGINE_SUPPORTDIR@/mcs-ddlproc.service /lib/systemd/system/. >/dev/null 2>&1 cp @ENGINE_SUPPORTDIR@/mcs-dmlproc.service /usr/lib/systemd/system/. >/dev/null 2>&1 cp @ENGINE_SUPPORTDIR@/mcs-dmlproc.service /lib/systemd/system/. >/dev/null 2>&1 - cp @ENGINE_SUPPORTDIR@/mcs-exemgr.service /usr/lib/systemd/system/. >/dev/null 2>&1 - cp @ENGINE_SUPPORTDIR@/mcs-exemgr.service /lib/systemd/system/. >/dev/null 2>&1 cp @ENGINE_SUPPORTDIR@/mcs-primproc.service /usr/lib/systemd/system/. >/dev/null 2>&1 cp @ENGINE_SUPPORTDIR@/mcs-primproc.service /lib/systemd/system/. >/dev/null 2>&1 cp @ENGINE_SUPPORTDIR@/mcs-workernode.service /usr/lib/systemd/system/mcs-workernode@.service >/dev/null 2>&1 @@ -203,7 +201,7 @@ if [ $user = "root" ]; then update-rc.d columnstore defaults 99 > /dev/null 2>&1 else echo "" - echo "Package 'systemctl', 'chkconfig' or 'update-rc.d' not installed, contact your sysadmin if you want to setup to autostart for columnstore" + echo "Package 'systemctl', 'chkconfig' or 'update-rc.d' not installed, contact your sysadmin if you want to setup to autostart for columnstore" fi fi fi @@ -225,7 +223,7 @@ if [ $user = "root" ]; then fi else chown $user:$user @ENGINE_SYSCONFDIR@/columnstore/Columnstore.xml - + cat < /dev/null 2>&1 diff --git a/oam/install_scripts/columnstore-pre-uninstall.in b/oam/install_scripts/columnstore-pre-uninstall.in index 941d9aa7a..82e2bd570 100755 --- a/oam/install_scripts/columnstore-pre-uninstall.in +++ b/oam/install_scripts/columnstore-pre-uninstall.in @@ -35,7 +35,7 @@ systemctl cat mariadb-columnstore.service > /dev/null 2>&1 if [ $? -eq 0 ] && [ $(running_systemd) -eq 0 ]; then systemctl stop mariadb-columnstore >/dev/null 2>&1 else - PROGS='PrimProc ExeMgr DMLProc DDLProc WriteEngineServer StorageManager controllernode workernode' + PROGS='PrimProc ExeMgr DMLProc DDLProc WriteEngineServer StorageManager controllernode workernode' kill $(pidof $PROGS) > /dev/null sleep 3 kill -9 $(pidof $PROGS) > /dev/null @@ -43,7 +43,7 @@ else fi if [ -n "$(pgrep -x ProcMon)" ] || [ -n "$(pgrep -x ProcMgr)" ];then - # Old system must be running, kill ProcMon/ProcMgr + # Old system must be running, kill ProcMon/ProcMgr pkill ProcMon pkill ProcMgr fi @@ -94,8 +94,12 @@ if [ -n "$systemctl" ] && [ $(running_systemd) -eq 0 ]; then rm -f /lib/systemd/system/mcs-ddlproc.service rm -f /usr/lib/systemd/system/mcs-dmlproc.service rm -f /lib/systemd/system/mcs-dmlproc.service - rm -f /usr/lib/systemd/system/mcs-exemgr.service - rm -f /lib/systemd/system/mcs-exemgr.service + if [[ -f /usr/lib/systemd/system/mcs-exemgr.service ]] + rm -f /usr/lib/systemd/system/mcs-exemgr.service + fi + if [[ -f /lib/systemd/system/mcs-exemgr.service ]] + rm -f /lib/systemd/system/mcs-exemgr.service + fi rm -f /usr/lib/systemd/system/mcs-primproc.service rm -f /lib/systemd/system/mcs-primproc.service rm -f /usr/lib/systemd/system/mcs-workernode@.service @@ -123,7 +127,7 @@ else updaterc=`which update-rc.d 2>/dev/null` if [ -n "$updaterc" ]; then update-rc.d -f columnstore remove > /dev/null 2>&1 - rm -f /etc/init.d/columnstore > /dev/null 2>&1 + rm -f /etc/init.d/columnstore > /dev/null 2>&1 fi fi fi diff --git a/oam/install_scripts/mariadb-columnstore-start.sh.in b/oam/install_scripts/mariadb-columnstore-start.sh.in index 351512d62..f41b6647d 100644 --- a/oam/install_scripts/mariadb-columnstore-start.sh.in +++ b/oam/install_scripts/mariadb-columnstore-start.sh.in @@ -12,7 +12,6 @@ flock -n "$fd_lock" || exit 0 /bin/systemctl start mcs-controllernode /bin/systemctl start mcs-primproc /bin/systemctl start mcs-writeengineserver -/bin/systemctl start mcs-exemgr /bin/systemctl start mcs-dmlproc /bin/systemctl start mcs-ddlproc su -s /bin/sh -c '@ENGINE_BINDIR@/dbbuilder 7' @DEFAULT_USER@ 1> @ENGINE_LOGDIR@/install/dbbuilder.log diff --git a/oam/install_scripts/mariadb-columnstore-stop.sh b/oam/install_scripts/mariadb-columnstore-stop.sh index 9bce80618..9e1a879ab 100644 --- a/oam/install_scripts/mariadb-columnstore-stop.sh +++ b/oam/install_scripts/mariadb-columnstore-stop.sh @@ -4,7 +4,6 @@ /bin/systemctl stop mcs-dmlproc /bin/systemctl stop mcs-ddlproc -/bin/systemctl stop mcs-exemgr /bin/systemctl stop mcs-writeengineserver /bin/systemctl stop mcs-primproc /bin/systemctl stop mcs-controllernode diff --git a/oam/install_scripts/mcs-exemgr.service.in b/oam/install_scripts/mcs-exemgr.service.in deleted file mode 100644 index 4cbddfc4a..000000000 --- a/oam/install_scripts/mcs-exemgr.service.in +++ /dev/null @@ -1,23 +0,0 @@ -[Unit] -Description=mcs-exemgr - -# restart/start mcs-exemgr on restart/start of mcs-primproc -PartOf=mcs-primproc.service -After=network.target mcs-primproc.service - -[Service] -Type=oneshot - -User=@DEFAULT_USER@ -Group=@DEFAULT_GROUP@ -LimitNOFILE=65536 -LimitNPROC=65536 - -#ExecStartPre=/usr/bin/env bash -c "ldconfig -p | grep -m1 libjemalloc > /dev/null || echo 'Please install jemalloc to avoid ColumnStore performance degradation and unexpected service interruptions.'" -#ExecStart=/usr/bin/env bash -c "LD_PRELOAD=$(ldconfig -p | grep -m1 libjemalloc | awk '{print $1}') exec @ENGINE_BINDIR@/ExeMgr" -ExecStart=/bin/echo 'EM dummy start' - -RemainAfterExit=yes - -Restart=on-failure -TimeoutStopSec=2 diff --git a/oam/install_scripts/mcs-writeengineserver.service.in b/oam/install_scripts/mcs-writeengineserver.service.in index 32564fc61..2e1470022 100644 --- a/oam/install_scripts/mcs-writeengineserver.service.in +++ b/oam/install_scripts/mcs-writeengineserver.service.in @@ -1,9 +1,9 @@ [Unit] Description=WriteEngineServer -# restart/stop mcs-writeengineserver on restart/stop of mcs-exemgr -PartOf=mcs-exemgr.service -After=network.target mcs-exemgr.service +# restart/stop mcs-writeengineserver on restart/stop of mcs-primproc +PartOf=mcs-primproc.service +After=network.target mcs-primproc.service [Service] Type=forking