diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index 67f40f268..a960e7374 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -292,7 +292,7 @@ void DistributedEngineComm::Setup() catch (std::exception& ex) { if (i < newPmCount) - newPmCount--; + newPmCount = newPmCount > 1 ? newPmCount-1 : 1; // We can't afford to reduce newPmCount to 0 writeToLog(__FILE__, __LINE__, "Could not connect to PMS" + std::to_string(connectionId) + ": " + ex.what(), @@ -302,7 +302,7 @@ void DistributedEngineComm::Setup() catch (...) { if (i < newPmCount) - newPmCount--; + newPmCount = newPmCount > 1 ? newPmCount-1 : 1; // We can't afford to reduce newPmCount to 0 writeToLog(__FILE__, __LINE__, "Could not connect to PMS" + std::to_string(connectionId), LOG_TYPE_ERROR); diff --git a/dbcon/joblist/resourcemanager.cpp b/dbcon/joblist/resourcemanager.cpp index da041efa6..be03066c8 100644 --- a/dbcon/joblist/resourcemanager.cpp +++ b/dbcon/joblist/resourcemanager.cpp @@ -1,4 +1,5 @@ /* Copyright (C) 2014 InfiniDB, Inc. + Copyright (C) 2022 Mariadb Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -40,39 +41,25 @@ using namespace config; namespace joblist { -// const string ResourceManager::fExeMgrStr("ExeMgr1"); -const string ResourceManager::fHashJoinStr("HashJoin"); -const string ResourceManager::fHashBucketReuseStr("HashBucketReuse"); -const string ResourceManager::fJobListStr("JobList"); -const string ResourceManager::fPrimitiveServersStr("PrimitiveServers"); -// const string ResourceManager::fSystemConfigStr("SystemConfig"); -const string ResourceManager::fTupleWSDLStr("TupleWSDL"); -const string ResourceManager::fZDLStr("ZDL"); -const string ResourceManager::fExtentMapStr("ExtentMap"); -// const string ResourceManager::fDMLProcStr("DMLProc"); -// const string ResourceManager::fBatchInsertStr("BatchInsert"); -const string ResourceManager::fOrderByLimitStr("OrderByLimit"); -const string ResourceManager::fRowAggregationStr("RowAggregation"); - ResourceManager* ResourceManager::fInstance = NULL; boost::mutex mx; -ResourceManager* ResourceManager::instance(bool runningInExeMgr) +ResourceManager* ResourceManager::instance(bool runningInExeMgr, config::Config* aConfig) { boost::mutex::scoped_lock lk(mx); if (!fInstance) - fInstance = new ResourceManager(runningInExeMgr); + fInstance = new ResourceManager(runningInExeMgr, aConfig); return fInstance; } -ResourceManager::ResourceManager(bool runningInExeMgr) +ResourceManager::ResourceManager(bool runningInExeMgr, config::Config* aConfig) : fExeMgrStr("ExeMgr1") , fSystemConfigStr("SystemConfig") , fDMLProcStr("DMLProc") , fBatchInsertStr("BatchInsert") - , fConfig(Config::makeConfig()) + , fConfig(aConfig == nullptr ? Config::makeConfig() : aConfig) , fNumCores(8) , fHjNumThreads(defaultNumThreads) , fJlProcessorThreadsPerScan(defaultProcessorThreadsPerScan) diff --git a/dbcon/joblist/resourcemanager.h b/dbcon/joblist/resourcemanager.h index 64ff17a55..76b08760b 100644 --- a/dbcon/joblist/resourcemanager.h +++ b/dbcon/joblist/resourcemanager.h @@ -1,4 +1,5 @@ /* Copyright (C) 2014 InfiniDB, Inc. + Copyright (C) 2022 Mariadb Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -138,11 +139,8 @@ class ResourceManager /** @brief ctor * */ - EXPORT ResourceManager(bool runningInExeMgr = false); - static ResourceManager* instance(bool runningInExeMgr = false); - // ResourceManager(const config::Config *cf); - // ResourceManager(const std::string& config); - // passed by ExeMgr and DistributedEngineComm to MessageQueueServer or -Client + EXPORT ResourceManager(bool runningInExeMgr = false, config::Config *aConfig = nullptr); + static ResourceManager* instance(bool runningInExeMgr = false, config::Config *aConfig = nullptr); config::Config* getConfig() { return fConfig; @@ -577,19 +575,19 @@ class ResourceManager void logMessage(logging::LOG_TYPE logLevel, logging::Message::MessageID mid, uint64_t value = 0, uint32_t sessionId = 0); - /*static const*/ std::string fExeMgrStr; - static const std::string fHashJoinStr; - static const std::string fHashBucketReuseStr; - static const std::string fJobListStr; - static const std::string fPrimitiveServersStr; + std::string fExeMgrStr; + inline static const std::string fHashJoinStr = "HashJoin"; + inline static const std::string fHashBucketReuseStr = "HashBucketReuse"; + inline static const std::string fJobListStr = "JobList"; + inline static const std::string fPrimitiveServersStr = "PrimitiveServers"; /*static const*/ std::string fSystemConfigStr; - static const std::string fTupleWSDLStr; - static const std::string fZDLStr; - static const std::string fExtentMapStr; + inline static const std::string fTupleWSDLStr = "TupleWSDL"; + inline static const std::string fZDLStr = "ZDL"; + inline static const std::string fExtentMapStr = "ExtentMap"; /*static const*/ std::string fDMLProcStr; /*static const*/ std::string fBatchInsertStr; - static const std::string fOrderByLimitStr; - static const std::string fRowAggregationStr; + inline static const std::string fOrderByLimitStr = "OrderByLimit"; + inline static const std::string fRowAggregationStr = "RowAggregation"; config::Config* fConfig; static ResourceManager* fInstance; uint32_t fTraceFlags; diff --git a/debian/mariadb-plugin-columnstore.install b/debian/mariadb-plugin-columnstore.install index 39f4e8c72..20241ff57 100644 --- a/debian/mariadb-plugin-columnstore.install +++ b/debian/mariadb-plugin-columnstore.install @@ -6,7 +6,6 @@ etc/mysql/mariadb.conf.d/columnstore.cnf usr/bin/mcsRebuildEM usr/bin/DDLProc usr/bin/DMLProc -usr/bin/ExeMgr usr/bin/PrimProc usr/bin/StorageManager usr/bin/WriteEngineServer diff --git a/exemgr/CMakeLists.txt b/exemgr/CMakeLists.txt index 729cc98cd..e0d17d205 100644 --- a/exemgr/CMakeLists.txt +++ b/exemgr/CMakeLists.txt @@ -1,18 +1,18 @@ -include_directories( ${ENGINE_COMMON_INCLUDES} ) +# include_directories( ${ENGINE_COMMON_INCLUDES} ) ########### next target ############### -set(ExeMgr_SRCS serviceexemgr.cpp sqlfrontsessionthread.cpp rssmonfcn.cpp exemgr.cpp activestatementcounter.cpp femsghandler.cpp ../utils/common/crashtrace.cpp) +# set(ExeMgr_SRCS serviceexemgr.cpp sqlfrontsessionthread.cpp rssmonfcn.cpp exemgr.cpp activestatementcounter.cpp femsghandler.cpp ../utils/common/crashtrace.cpp) -add_executable(ExeMgr ${ExeMgr_SRCS}) +# add_executable(ExeMgr ${ExeMgr_SRCS}) -target_link_libraries(ExeMgr ${ENGINE_LDFLAGS} ${ENGINE_EXEC_LIBS} ${NETSNMP_LIBRARIES} ${MARIADB_CLIENT_LIBS} cacheutils threadpool) +# target_link_libraries(ExeMgr ${ENGINE_LDFLAGS} ${ENGINE_EXEC_LIBS} ${NETSNMP_LIBRARIES} ${MARIADB_CLIENT_LIBS} cacheutils threadpool) -target_include_directories(ExeMgr PRIVATE ${Boost_INCLUDE_DIRS}) +# target_include_directories(ExeMgr PRIVATE ${Boost_INCLUDE_DIRS}) -install(TARGETS ExeMgr DESTINATION ${ENGINE_BINDIR} COMPONENT columnstore-engine) +# install(TARGETS ExeMgr DESTINATION ${ENGINE_BINDIR} COMPONENT columnstore-engine) ########### install files ############### diff --git a/exemgr/serviceexemgr.h b/exemgr/serviceexemgr.h index 544978ee7..afcf07914 100644 --- a/exemgr/serviceexemgr.h +++ b/exemgr/serviceexemgr.h @@ -84,6 +84,7 @@ namespace exemgr } } } + Opt(): m_debug(0), m_e(false), m_fg(false) {}; int getDebugLevel() const { return m_debug; diff --git a/oam/install_scripts/mcs-exemgr.service.in b/oam/install_scripts/mcs-exemgr.service.in index b9a36fcaf..4cbddfc4a 100644 --- a/oam/install_scripts/mcs-exemgr.service.in +++ b/oam/install_scripts/mcs-exemgr.service.in @@ -6,15 +6,18 @@ PartOf=mcs-primproc.service After=network.target mcs-primproc.service [Service] -Type=forking +Type=oneshot User=@DEFAULT_USER@ Group=@DEFAULT_GROUP@ LimitNOFILE=65536 LimitNPROC=65536 -ExecStartPre=/usr/bin/env bash -c "ldconfig -p | grep -m1 libjemalloc > /dev/null || echo 'Please install jemalloc to avoid ColumnStore performance degradation and unexpected service interruptions.'" -ExecStart=/usr/bin/env bash -c "LD_PRELOAD=$(ldconfig -p | grep -m1 libjemalloc | awk '{print $1}') exec @ENGINE_BINDIR@/ExeMgr" +#ExecStartPre=/usr/bin/env bash -c "ldconfig -p | grep -m1 libjemalloc > /dev/null || echo 'Please install jemalloc to avoid ColumnStore performance degradation and unexpected service interruptions.'" +#ExecStart=/usr/bin/env bash -c "LD_PRELOAD=$(ldconfig -p | grep -m1 libjemalloc | awk '{print $1}') exec @ENGINE_BINDIR@/ExeMgr" +ExecStart=/bin/echo 'EM dummy start' + +RemainAfterExit=yes Restart=on-failure TimeoutStopSec=2 diff --git a/primitives/primproc/CMakeLists.txt b/primitives/primproc/CMakeLists.txt index b8d2e3741..69d83f464 100644 --- a/primitives/primproc/CMakeLists.txt +++ b/primitives/primproc/CMakeLists.txt @@ -19,14 +19,17 @@ set(PrimProc_SRCS pseudocc.cpp rtscommand.cpp umsocketselector.cpp + serviceexemgr.cpp + sqlfrontsessionthread.cpp + rssmonfcn.cpp + activestatementcounter.cpp + femsghandler.cpp ../../utils/common/crashtrace.cpp) add_executable(PrimProc ${PrimProc_SRCS}) add_dependencies(PrimProc loggingcpp) - +target_include_directories(PrimProc PRIVATE ${Boost_INCLUDE_DIRS}) target_link_libraries(PrimProc ${ENGINE_LDFLAGS} ${NETSNMP_LIBRARIES} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} threadpool cacheutils dbbc processor) -install(TARGETS PrimProc DESTINATION ${ENGINE_BINDIR} COMPONENT columnstore-engine) - - +install(TARGETS PrimProc DESTINATION ${ENGINE_BINDIR} COMPONENT columnstore-engine) \ No newline at end of file diff --git a/primitives/primproc/activestatementcounter.cpp b/primitives/primproc/activestatementcounter.cpp new file mode 100644 index 000000000..b25e61549 --- /dev/null +++ b/primitives/primproc/activestatementcounter.cpp @@ -0,0 +1,59 @@ +/* Copyright (C) 2014 InfiniDB, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +// $Id: activestatementcounter.cpp 940 2013-01-21 14:11:31Z rdempsey $ +// + +#include +#include +using namespace boost; + +#include "activestatementcounter.h" + +void ActiveStatementCounter::incr(bool& counted) +{ + if (counted) + return; + + counted = true; + boost::mutex::scoped_lock lk(fMutex); + + if (upperLimit > 0) + while (fStatementCount >= upperLimit) + { + fStatementsWaiting++; + condvar.wait(lk); + --fStatementsWaiting; + } + + fStatementCount++; +} + +void ActiveStatementCounter::decr(bool& counted) +{ + if (!counted) + return; + + counted = false; + boost::mutex::scoped_lock lk(fMutex); + + if (fStatementCount == 0) + return; + + --fStatementCount; + condvar.notify_one(); +} diff --git a/primitives/primproc/activestatementcounter.h b/primitives/primproc/activestatementcounter.h new file mode 100644 index 000000000..87e7163aa --- /dev/null +++ b/primitives/primproc/activestatementcounter.h @@ -0,0 +1,64 @@ +/* Copyright (C) 2014 InfiniDB, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +// $Id: activestatementcounter.h 940 2013-01-21 14:11:31Z rdempsey $ +// +/** @file */ + +#pragma once + +#include + +#include +#include + +#include "vss.h" + +class ActiveStatementCounter +{ + public: + ActiveStatementCounter(uint32_t limit) : fStatementCount(0), upperLimit(limit), fStatementsWaiting(0) + { + } + + virtual ~ActiveStatementCounter() + { + } + + void incr(bool& counted); + void decr(bool& counted); + uint32_t cur() const + { + return fStatementCount; + } + uint32_t waiting() const + { + return fStatementsWaiting; + } + + private: + ActiveStatementCounter(const ActiveStatementCounter& rhs); + ActiveStatementCounter& operator=(const ActiveStatementCounter& rhs); + + uint32_t fStatementCount; + uint32_t upperLimit; + uint32_t fStatementsWaiting; + boost::mutex fMutex; + boost::condition condvar; + BRM::VSS fVss; +}; + diff --git a/primitives/primproc/femsghandler.cpp b/primitives/primproc/femsghandler.cpp new file mode 100644 index 000000000..0ce22519f --- /dev/null +++ b/primitives/primproc/femsghandler.cpp @@ -0,0 +1,143 @@ +/* Copyright (C) 2014 InfiniDB, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include "messagequeue.h" +#include "iosocket.h" + +#include "femsghandler.h" + +using namespace std; +using namespace joblist; +using namespace messageqcpp; + +threadpool::ThreadPool FEMsgHandler::threadPool; + +namespace +{ +class Runner +{ + public: + Runner(FEMsgHandler* f) : target(f) + { + } + void operator()() + { + target->threadFcn(); + } + FEMsgHandler* target; +}; + +} // namespace + +FEMsgHandler::FEMsgHandler() : die(false), running(false), sawData(false), sock(NULL) +{ +} + +FEMsgHandler::FEMsgHandler(boost::shared_ptr j, IOSocket* s) + : die(false), running(false), sawData(false), jl(j) +{ + sock = s; + assert(sock); +} + +FEMsgHandler::~FEMsgHandler() +{ + stop(); + threadPool.join(thr); +} + +void FEMsgHandler::start() +{ + if (!running) + { + running = true; + thr = threadPool.invoke(Runner(this)); + } +} + +void FEMsgHandler::stop() +{ + die = true; + jl.reset(); +} + +void FEMsgHandler::setJobList(boost::shared_ptr j) +{ + jl = j; +} + +void FEMsgHandler::setSocket(IOSocket* i) +{ + sock = i; + assert(sock); +} + +/* Note, the next two fcns strongly depend on ExeMgr's current implementation. There's a + * good chance that if ExeMgr's table send loop is changed, these will need to be + * updated to match. + */ + +/* This is currently only called if InetStreamSocket::write() throws, implying + * a connection error. It might not make sense in other contexts. + */ +bool FEMsgHandler::aborted() +{ + if (sawData) + return true; + + boost::mutex::scoped_lock sl(mutex); + int err; + int connectionNum = sock->getConnectionNum(); + + err = InetStreamSocket::pollConnection(connectionNum, 1000); + + if (err == 1) + { + sawData = true; + return true; + } + + return false; +} + +void FEMsgHandler::threadFcn() +{ + int err = 0; + int connectionNum = sock->getConnectionNum(); + + /* This waits for the next readable event on sock. An abort is signaled + * by sending something (anything at the moment), then dropping the connection. + * This fcn exits on all other events. + */ + while (!die && err == 0) + { + boost::mutex::scoped_lock sl(mutex); + err = InetStreamSocket::pollConnection(connectionNum, 1000); + } + + if (err == 1) + sawData = true; // there's data to read, must be the abort signal + + if (!die && (err == 2 || err == 1)) + { + die = true; + jl->abort(); + jl.reset(); + } + + running = false; +} diff --git a/primitives/primproc/femsghandler.h b/primitives/primproc/femsghandler.h new file mode 100644 index 000000000..baa91ad90 --- /dev/null +++ b/primitives/primproc/femsghandler.h @@ -0,0 +1,47 @@ +/* Copyright (C) 2014 InfiniDB, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#pragma once + +#include "joblist.h" +#include "inetstreamsocket.h" +#include "threadpool.h" + +class FEMsgHandler +{ + public: + FEMsgHandler(); + FEMsgHandler(boost::shared_ptr, messageqcpp::IOSocket*); + virtual ~FEMsgHandler(); + + void start(); + void stop(); + void setJobList(boost::shared_ptr); + void setSocket(messageqcpp::IOSocket*); + bool aborted(); + + void threadFcn(); + + static threadpool::ThreadPool threadPool; + + private: + bool die, running, sawData; + messageqcpp::IOSocket* sock; + boost::shared_ptr jl; + boost::mutex mutex; + uint64_t thr; +}; diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index ad073ddc7..89c6b70f5 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -945,7 +945,6 @@ struct AsynchLoader QueryContext ver; uint32_t txn; int compType; - uint8_t dataWidth; bool LBIDTrace; uint32_t sessionID; uint32_t* cacheCount; @@ -1420,8 +1419,8 @@ struct BPPHandler /* Uncomment these lines to verify duplicate(). == op might need updating */ // if (*bpp != *dup) - // cerr << "createBPP: duplicate mismatch at index " << i << - // endl; + // cerr << "createBPP: duplicate mismatch at index " << i + // << endl; // idbassert(*bpp == *dup); bppv->add(dup); } @@ -2441,7 +2440,7 @@ PrimitiveServer::~PrimitiveServer() { } -void PrimitiveServer::start(Service* service) +void PrimitiveServer::start(Service* service, utils::USpaceSpinLock& startupRaceLock) { // start all the server threads for (int i = 1; i <= fServerThreads; i++) @@ -2452,7 +2451,7 @@ void PrimitiveServer::start(Service* service) fServerpool.invoke(ServerThread(oss.str(), this)); } - + startupRaceLock.release(); service->NotifyServiceStarted(); fServerpool.wait(); diff --git a/primitives/primproc/primitiveserver.h b/primitives/primproc/primitiveserver.h index 5f35911e4..c79c0f807 100644 --- a/primitives/primproc/primitiveserver.h +++ b/primitives/primproc/primitiveserver.h @@ -124,7 +124,7 @@ class PrimitiveServer /** @brief start the primitive server * */ - void start(Service* p); + void start(Service* p, utils::USpaceSpinLock& startupRaceLock); /** @brief get a pointer the shared processor thread pool */ diff --git a/primitives/primproc/primproc.cpp b/primitives/primproc/primproc.cpp index b65846533..260a3ca96 100644 --- a/primitives/primproc/primproc.cpp +++ b/primitives/primproc/primproc.cpp @@ -1,5 +1,5 @@ /* Copyright (C) 2014 InfiniDB, Inc. - Copyright (C) 2016 MariaDB Corporation + Copyright (C) 2016-2022 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -30,15 +30,15 @@ #endif #include #include -#ifndef _MSC_VER #include #include -#else -#include -#endif + #include #include #include +#include +#include +#include //#define NDEBUG #include using namespace std; @@ -72,7 +72,9 @@ using namespace idbdatafile; #include "mariadb_my_sys.h" +#include "spinlock.h" #include "service.h" +#include "serviceexemgr.h" class Opt { @@ -115,6 +117,14 @@ class ServicePrimProc : public Service, public Opt { return m_fg ? Child() : RunForking(); } + std::atomic_flag& getStartupRaceFlag() + { + return startupRaceFlag_; + } + + private: + // Since C++20 flag's init value is false. + std::atomic_flag startupRaceFlag_; }; namespace primitiveprocessor @@ -152,28 +162,11 @@ int toInt(const string& val) void setupSignalHandlers() { #ifndef _MSC_VER - signal(SIGHUP, SIG_IGN); - struct sigaction ign; - - memset(&ign, 0, sizeof(ign)); - ign.sa_handler = SIG_IGN; - sigaction(SIGPIPE, &ign, 0); - - memset(&ign, 0, sizeof(ign)); - ign.sa_handler = SIG_IGN; - sigaction(SIGUSR1, &ign, 0); - memset(&ign, 0, sizeof(ign)); ign.sa_handler = SIG_IGN; sigaction(SIGUSR2, &ign, 0); - memset(&ign, 0, sizeof(ign)); - ign.sa_handler = fatalHandler; - sigaction(SIGSEGV, &ign, 0); - sigaction(SIGABRT, &ign, 0); - sigaction(SIGFPE, &ign, 0); - sigset_t sigset; sigemptyset(&sigset); sigaddset(&sigset, SIGPIPE); @@ -382,6 +375,18 @@ int ServicePrimProc::Child() NotifyServiceInitializationFailed(); return 2; } + utils::USpaceSpinLock startupRaceLock(getStartupRaceFlag()); + std::thread exeMgrThread( + [this, cf]() + { + exemgr::Opt opt; + exemgr::globServiceExeMgr = new exemgr::ServiceExeMgr(opt, cf); + // primitive delay to avoid 'not connected to PM' log error messages + // from EM. PrimitiveServer::start() releases SpinLock after sockets + // are available. + utils::USpaceSpinLock startupRaceLock(this->getStartupRaceFlag()); + exemgr::globServiceExeMgr->Child(); + }); int serverThreads = 1; int serverQueueSize = 10; @@ -395,7 +400,6 @@ int ServicePrimProc::Child() bool rotatingDestination = false; uint32_t deleteBlocks = 128; bool PTTrace = false; - int temp; string strTemp; int priority = -1; const string primitiveServers("PrimitiveServers"); @@ -414,7 +418,7 @@ int ServicePrimProc::Child() gDebugLevel = primitiveprocessor::NONE; - temp = toInt(cf->getConfig(primitiveServers, "ServerThreads")); + int temp = toInt(cf->getConfig(primitiveServers, "ServerThreads")); if (temp > 0) serverThreads = temp; @@ -761,7 +765,7 @@ int ServicePrimProc::Child() } #endif - server.start(this); + server.start(this, startupRaceLock); cerr << "server.start() exited!" << endl; diff --git a/primitives/primproc/rssmonfcn.cpp b/primitives/primproc/rssmonfcn.cpp new file mode 100644 index 000000000..8a9b62d83 --- /dev/null +++ b/primitives/primproc/rssmonfcn.cpp @@ -0,0 +1,69 @@ +/* Copyright (C) 2022 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include + +#include "rssmonfcn.h" +#include "serviceexemgr.h" + +namespace exemgr +{ + void RssMonFcn::operator()() const + { + logging::Logger& msgLog = globServiceExeMgr->getLogger(); + auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount(); + for (;;) + { + size_t rssMb = rss(); + size_t pct = rssMb * 100 / fMemTotal; + + if (pct > fMaxPct) + { + if (fMaxPct >= 95) + { + std::cerr << "Too much memory allocated!" << std::endl; + logging::Message::Args args; + args.add((int)pct); + args.add((int)fMaxPct); + msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logRssTooBig, args, logging::LoggingID(16)); + exit(1); + } + + if (statementsRunningCount->cur() == 0) + { + std::cerr << "Too much memory allocated!" << std::endl; + logging::Message::Args args; + args.add((int)pct); + args.add((int)fMaxPct); + msgLog.logMessage(logging::LOG_TYPE_WARNING, ServiceExeMgr::logRssTooBig, args, logging::LoggingID(16)); + exit(1); + } + + std::cerr << "Too much memory allocated, but stmts running" << std::endl; + } + + // Update sessionMemMap entries lower than current mem % use + globServiceExeMgr->updateSessionMap(pct); + + pause_(); + } + } + void startRssMon(size_t maxPct, int pauseSeconds) + { + new std::thread(RssMonFcn(maxPct, pauseSeconds)); + } +} // namespace \ No newline at end of file diff --git a/primitives/primproc/rssmonfcn.h b/primitives/primproc/rssmonfcn.h new file mode 100644 index 000000000..341038e91 --- /dev/null +++ b/primitives/primproc/rssmonfcn.h @@ -0,0 +1,36 @@ +/* Copyright (C) 2022 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#pragma once + +#include + +#include "MonitorProcMem.h" + +namespace exemgr +{ +class RssMonFcn : public utils::MonitorProcMem +{ + public: + RssMonFcn(size_t maxPct, int pauseSeconds) : MonitorProcMem(maxPct, 0, 21, pauseSeconds) + { + } + + void operator()() const; +}; + +} // namespace \ No newline at end of file diff --git a/primitives/primproc/serviceexemgr.cpp b/primitives/primproc/serviceexemgr.cpp new file mode 100644 index 000000000..8746b68f7 --- /dev/null +++ b/primitives/primproc/serviceexemgr.cpp @@ -0,0 +1,457 @@ +/* Copyright (C) 2014 InfiniDB, Inc. + Copyright (C) 2019-22 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +/********************************************************************** + * $Id: main.cpp 1000 2013-07-24 21:05:51Z pleblanc $ + * + * + ***********************************************************************/ +/** + * @brief execution plan manager main program + * + * This is the ?main? program for dealing with execution plans and + * result sets. It sits in a loop waiting for CalpontExecutionPlan + * from the FEP. It then passes the CalpontExecutionPlan to the + * JobListFactory from which a JobList is obtained. This is passed + * to to the Query Manager running in the real-time portion of the + * EC. The ExecutionPlanManager waits until the Query Manager + * returns a result set for the job list. These results are passed + * into the CalpontResultFactory, which outputs a CalpontResultSet. + * The ExecutionPlanManager passes the CalpontResultSet into the + * VendorResultFactory which produces a result set tailored to the + * specific DBMS front end in use. The ExecutionPlanManager then + * sends the VendorResultSet back to the Calpont Database Connector + * on the Front-End Processor where it is returned to the DBMS + * front-end. + */ +#include +#include +#include +#include + +#undef root_name +#include + +#include "calpontselectexecutionplan.h" +#include "mcsanalyzetableexecutionplan.h" +#include "activestatementcounter.h" +#include "distributedenginecomm.h" +#include "resourcemanager.h" +#include "configcpp.h" +#include "queryteleserverparms.h" +#include "iosocket.h" +#include "joblist.h" +#include "joblistfactory.h" +#include "oamcache.h" +#include "simplecolumn.h" +#include "bytestream.h" +#include "telestats.h" +#include "messageobj.h" +#include "messagelog.h" +#include "sqllogger.h" +#include "femsghandler.h" +#include "idberrorinfo.h" +#include "MonitorProcMem.h" +#include "liboamcpp.h" +#include "crashtrace.h" +#include "service.h" + +#include +#include +#include + +#include "dbrm.h" + +#include "mariadb_my_sys.h" +#include "statistics.h" +#include "serviceexemgr.h" +#include "sqlfrontsessionthread.h" + +namespace exemgr +{ +ServiceExeMgr* globServiceExeMgr = nullptr; +void startRssMon(size_t maxPct, int pauseSeconds); + +void added_a_pm(int) +{ + logging::LoggingID logid(21, 0, 0); + logging::Message::Args args1; + logging::Message msg(1); + args1.add("exeMgr caught SIGHUP. Resetting connections"); + msg.format(args1); + std::cout << msg.msg().c_str() << std::endl; + logging::Logger logger(logid.fSubsysID); + logger.logMessage(logging::LOG_TYPE_DEBUG, msg, logid); + + auto* dec = exemgr::globServiceExeMgr->getDec(); + if (dec) + { + oam::OamCache* oamCache = oam::OamCache::makeOamCache(); + oamCache->forceReload(); + dec->Setup(); + } +} + +void printTotalUmMemory(int sig) +{ + int64_t num = globServiceExeMgr->getRm().availableMemory(); + std::cout << "Total UM memory available: " << num << std::endl; +} + +void ServiceExeMgr::setupSignalHandlers() +{ + struct sigaction ign; + + memset(&ign, 0, sizeof(ign)); + ign.sa_handler = SIG_IGN; + + sigaction(SIGPIPE, &ign, 0); + + memset(&ign, 0, sizeof(ign)); + ign.sa_handler = exemgr::added_a_pm; + sigaction(SIGHUP, &ign, 0); + ign.sa_handler = exemgr::printTotalUmMemory; + sigaction(SIGUSR1, &ign, 0); + memset(&ign, 0, sizeof(ign)); + ign.sa_handler = fatalHandler; + sigaction(SIGSEGV, &ign, 0); + sigaction(SIGABRT, &ign, 0); + sigaction(SIGFPE, &ign, 0); +} + +void cleanTempDir() +{ + using TempDirPurpose = config::Config::TempDirPurpose; + struct Dirs + { + std::string section; + std::string allowed; + TempDirPurpose purpose; + }; + std::vector dirs{{"HashJoin", "AllowDiskBasedJoin", TempDirPurpose::Joins}, + {"RowAggregation", "AllowDiskBasedAggregation", TempDirPurpose::Aggregates}}; + const auto config = config::Config::makeConfig(); + + for (const auto& dir : dirs) + { + std::string allowStr = config->getConfig(dir.section, dir.allowed); + bool allow = (allowStr == "Y" || allowStr == "y"); + + std::string tmpPrefix = config->getTempFileDir(dir.purpose); + + if (allow && tmpPrefix.empty()) + { + std::cerr << "Empty tmp directory name for " << dir.section << std::endl; + logging::LoggingID logid(16, 0, 0); + logging::Message::Args args; + logging::Message message(8); + args.add("Empty tmp directory name for:"); + args.add(dir.section); + message.format(args); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(logging::LOG_TYPE_CRITICAL, message, logid); + } + + tmpPrefix += "/"; + + idbassert(tmpPrefix != "/"); + + /* This is quite scary as ExeMgr usually runs as root */ + try + { + if (allow) + { + boost::filesystem::remove_all(tmpPrefix); + } + boost::filesystem::create_directories(tmpPrefix); + } + catch (const std::exception& ex) + { + std::cerr << ex.what() << std::endl; + logging::LoggingID logid(16, 0, 0); + logging::Message::Args args; + logging::Message message(8); + args.add("Exception whilst cleaning tmpdir: "); + args.add(ex.what()); + message.format(args); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(logging::LOG_TYPE_WARNING, message, logid); + } + catch (...) + { + std::cerr << "Caught unknown exception during tmpdir cleanup" << std::endl; + logging::LoggingID logid(16, 0, 0); + logging::Message::Args args; + logging::Message message(8); + args.add("Unknown exception whilst cleaning tmpdir"); + message.format(args); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(logging::LOG_TYPE_WARNING, message, logid); + } + } +} + +const std::string ServiceExeMgr::prettyPrintMiniInfo(const std::string& in) +{ + // 1. take the std::string and tok it by '\n' + // 2. for each part in each line calc the longest part + // 3. padding to each longest value, output a header and the lines + using CharTraits = std::basic_string::traits_type; + using CharSeparator = boost::char_separator; + using Tokeniser = boost::tokenizer>; + CharSeparator sep1("\n"); + Tokeniser tok1(in, sep1); + std::vector lines; + std::string header = "Desc Mode Table TableOID ReferencedColumns PIO LIO PBE Elapsed Rows"; + const int header_parts = 10; + lines.push_back(header); + + for (auto iter1 = tok1.begin(); iter1 != tok1.end(); ++iter1) + { + if (!iter1->empty()) + lines.push_back(*iter1); + } + + std::vector lens; + + for (int i = 0; i < header_parts; i++) + lens.push_back(0); + + std::vector> lineparts; + std::vector::iterator iter2; + int j; + + for (iter2 = lines.begin(), j = 0; iter2 != lines.end(); ++iter2, j++) + { + CharSeparator sep2(" "); + Tokeniser tok2(*iter2, sep2); + int i; + std::vector parts; + Tokeniser::iterator iter3; + + for (iter3 = tok2.begin(), i = 0; iter3 != tok2.end(); ++iter3, i++) + { + if (i >= header_parts) + break; + + std::string part(*iter3); + + if (j != 0 && i == 8) + part.resize(part.size() - 3); + + assert(i < header_parts); + + if (part.size() > lens[i]) + lens[i] = part.size(); + + parts.push_back(part); + } + + assert(i == header_parts); + lineparts.push_back(parts); + } + + std::ostringstream oss; + + std::vector>::iterator iter1 = lineparts.begin(); + std::vector>::iterator end1 = lineparts.end(); + + oss << "\n"; + + while (iter1 != end1) + { + std::vector::iterator iter2 = iter1->begin(); + std::vector::iterator end2 = iter1->end(); + assert(distance(iter2, end2) == header_parts); + int i = 0; + + while (iter2 != end2) + { + assert(i < header_parts); + oss << std::setw(lens[i]) << std::left << *iter2 << " "; + ++iter2; + i++; + } + + oss << "\n"; + ++iter1; + } + + return oss.str(); +} + +int ServiceExeMgr::Child() +{ + // Make sure CSC thinks it's on a UM or else bucket reuse stuff below will stall + if (!m_e) + setenv("CALPONT_CSC_IDENT", "um", 1); + + setupSignalHandlers(); + int err = 0; + if (!m_debug) + err = setupResources(); + std::string errMsg; + + switch (err) + { + case -1: + case -3: errMsg = "Error getting file limits, please see non-root install documentation"; break; + + case -2: errMsg = "Error setting file limits, please see non-root install documentation"; break; + + case -4: + errMsg = "Could not install file limits to required value, please see non-root install documentation"; + break; + + default: errMsg = "Couldn't change working directory or unknown error"; break; + } + + cleanTempDir(); + + logging::MsgMap msgMap; + msgMap[logDefaultMsg] = logging::Message(logDefaultMsg); + msgMap[logDbProfStartStatement] = logging::Message(logDbProfStartStatement); + msgMap[logDbProfEndStatement] = logging::Message(logDbProfEndStatement); + msgMap[logStartSql] = logging::Message(logStartSql); + msgMap[logEndSql] = logging::Message(logEndSql); + msgMap[logRssTooBig] = logging::Message(logRssTooBig); + msgMap[logDbProfQueryStats] = logging::Message(logDbProfQueryStats); + msgMap[logExeMgrExcpt] = logging::Message(logExeMgrExcpt); + msgLog_.msgMap(msgMap); + + dec_ = joblist::DistributedEngineComm::instance(rm_, true); + dec_->Open(); + + bool tellUser = true; + + messageqcpp::MessageQueueServer* mqs; + + statementsRunningCount_ = new ActiveStatementCounter(rm_->getEmExecQueueSize()); + const std::string ExeMgr = "ExeMgr1"; + for (;;) + { + try + { + mqs = new messageqcpp::MessageQueueServer(ExeMgr, rm_->getConfig(), messageqcpp::ByteStream::BlockSize, + 64); + break; + } + catch (std::runtime_error& re) + { + std::string what = re.what(); + + if (what.find("Address already in use") != std::string::npos) + { + if (tellUser) + { + std::cerr << "Address already in use, retrying..." << std::endl; + tellUser = false; + } + + sleep(5); + } + else + { + throw; + } + } + } + + // class jobstepThreadPool is used by other processes. We can't call + // resourcemanaager (rm) functions during the static creation of threadpool + // because rm has a "isExeMgr" flag that is set upon creation (rm is a singleton). + // From the pools perspective, it has no idea if it is ExeMgr doing the + // creation, so it has no idea which way to set the flag. So we set the max here. + joblist::JobStep::jobstepThreadPool.setMaxThreads(rm_->getJLThreadPoolSize()); + joblist::JobStep::jobstepThreadPool.setName("ExeMgrJobList"); + + if (rm_->getJlThreadPoolDebug() == "Y" || rm_->getJlThreadPoolDebug() == "y") + { + joblist::JobStep::jobstepThreadPool.setDebug(true); + joblist::JobStep::jobstepThreadPool.invoke( + threadpool::ThreadPoolMonitor(&joblist::JobStep::jobstepThreadPool)); + } + + int serverThreads = rm_->getEmServerThreads(); + int maxPct = rm_->getEmMaxPct(); + int pauseSeconds = rm_->getEmSecondsBetweenMemChecks(); + int priority = rm_->getEmPriority(); + + FEMsgHandler::threadPool.setMaxThreads(serverThreads); + FEMsgHandler::threadPool.setName("FEMsgHandler"); + + if (maxPct > 0) + { + // Defined in rssmonfcn.cpp + exemgr::startRssMon(maxPct, pauseSeconds); + } + + setpriority(PRIO_PROCESS, 0, priority); + + std::string teleServerHost(rm_->getConfig()->getConfig("QueryTele", "Host")); + + if (!teleServerHost.empty()) + { + int teleServerPort = toInt(rm_->getConfig()->getConfig("QueryTele", "Port")); + + if (teleServerPort > 0) + { + teleServerParms_ = querytele::QueryTeleServerParms(teleServerHost, teleServerPort); + } + } + + std::cout << "Starting ExeMgr: st = " << serverThreads << ", qs = " << rm_->getEmExecQueueSize() + << ", mx = " << maxPct << ", cf = " << rm_->getConfig()->configFile() << std::endl; + + { + BRM::DBRM* dbrm = new BRM::DBRM(); + dbrm->setSystemQueryReady(true); + delete dbrm; + } + + threadpool::ThreadPool exeMgrThreadPool(serverThreads, 0); + exeMgrThreadPool.setName("ExeMgrServer"); + + if (rm_->getExeMgrThreadPoolDebug() == "Y" || rm_->getExeMgrThreadPoolDebug() == "y") + { + exeMgrThreadPool.setDebug(true); + exeMgrThreadPool.invoke(threadpool::ThreadPoolMonitor(&exeMgrThreadPool)); + } + + // Load statistics. + try + { + statistics::StatisticsManager::instance()->loadFromFile(); + } + catch (...) + { + std::cerr << "Cannot load statistics from file " << std::endl; + } + + for (;;) + { + messageqcpp::IOSocket ios; + ios = mqs->accept(); + exeMgrThreadPool.invoke(exemgr::SQLFrontSessionThread(ios, dec_, rm_)); + } + + exeMgrThreadPool.wait(); + + return 0; +} +} // namespace exemgr diff --git a/primitives/primproc/serviceexemgr.h b/primitives/primproc/serviceexemgr.h new file mode 100644 index 000000000..606ae48db --- /dev/null +++ b/primitives/primproc/serviceexemgr.h @@ -0,0 +1,348 @@ +/* Copyright (C) 2022 Mariadb Corporation. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#pragma once + +#include +#include +#include +#include + +#undef root_name +#include + +#include "calpontselectexecutionplan.h" +#include "mcsanalyzetableexecutionplan.h" +#include "activestatementcounter.h" +#include "distributedenginecomm.h" +#include "resourcemanager.h" +#include "configcpp.h" +#include "queryteleserverparms.h" +#include "iosocket.h" +#include "joblist.h" +#include "joblistfactory.h" +#include "oamcache.h" +#include "simplecolumn.h" +#include "bytestream.h" +#include "telestats.h" +#include "messageobj.h" +#include "messagelog.h" +#include "sqllogger.h" +#include "femsghandler.h" +#include "idberrorinfo.h" +#include "MonitorProcMem.h" +#include "liboamcpp.h" +#include "crashtrace.h" +#include "service.h" + +#include +#include +#include + +#include "dbrm.h" + +#include "mariadb_my_sys.h" +#include "statistics.h" + +namespace exemgr +{ + class Opt + { + public: + int m_debug; + bool m_e; + bool m_fg; + Opt() : m_debug(0), m_e(false), m_fg(false) {}; + Opt(int argc, char* argv[]) : m_debug(0), m_e(false), m_fg(false) + { + int c; + while ((c = getopt(argc, argv, "edf")) != EOF) + { + switch (c) + { + case 'd': m_debug++; break; + + case 'e': m_e = true; break; + + case 'f': m_fg = true; break; + + case '?': + default: break; + } + } + } + int getDebugLevel() const + { + return m_debug; + } + }; + + class ServiceExeMgr : public Service, public Opt + { + using SessionMemMap_t = std::map; + using ThreadCntPerSessionMap_t = std::map; + protected: + void log(logging::LOG_TYPE type, const std::string& str) + { + logging::LoggingID logid(16); + logging::Message::Args args; + logging::Message message(8); + args.add(strerror(errno)); + message.format(args); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(type, message, logid); + } + + public: + ServiceExeMgr(const Opt& opt) : Service("ExeMgr"), Opt(opt), msgLog_(logging::Logger(16)) + { + bool runningWithExeMgr = true; + rm_ = joblist::ResourceManager::instance(runningWithExeMgr); + } + ServiceExeMgr(const Opt& opt, config::Config* aConfig) : Service("ExeMgr"), Opt(opt), msgLog_(logging::Logger(16)) + { + bool runningWithExeMgr = true; + rm_ = joblist::ResourceManager::instance(runningWithExeMgr, aConfig); + } + void LogErrno() override + { + log(logging::LOG_TYPE_CRITICAL, std::string(strerror(errno))); + } + void ParentLogChildMessage(const std::string& str) override + { + log(logging::LOG_TYPE_INFO, str); + } + int Child() override; + int Run() + { + return m_fg ? Child() : RunForking(); + } + static const constexpr unsigned logDefaultMsg = logging::M0000; + static const constexpr unsigned logDbProfStartStatement = logging::M0028; + static const constexpr unsigned logDbProfEndStatement = logging::M0029; + static const constexpr unsigned logStartSql = logging::M0041; + static const constexpr unsigned logEndSql = logging::M0042; + static const constexpr unsigned logRssTooBig = logging::M0044; + static const constexpr unsigned logDbProfQueryStats = logging::M0047; + static const constexpr unsigned logExeMgrExcpt = logging::M0055; + // If any flags other than the table mode flags are set, produce output to screeen + static const constexpr uint32_t flagsWantOutput = (0xffffffff & ~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_AUTOSWITCH & + ~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF); + logging::Logger& getLogger() + { + return msgLog_; + } + void updateSessionMap(const size_t pct) + { + std::lock_guard lk(sessionMemMapMutex_); + + for (auto mapIter = sessionMemMap_.begin(); mapIter != sessionMemMap_.end(); ++mapIter) + { + if (pct > mapIter->second) + { + mapIter->second = pct; + } + } + } + ThreadCntPerSessionMap_t& getThreadCntPerSessionMap() + { + return threadCntPerSessionMap_; + } + std::mutex& getThreadCntPerSessionMapMutex() + { + return threadCntPerSessionMapMutex_; + } + void initMaxMemPct(uint32_t sessionId) + { + // WIP + if (sessionId < 0x80000000) + { + std::lock_guard lk(sessionMemMapMutex_); + auto mapIter = sessionMemMap_.find(sessionId); + + if (mapIter == sessionMemMap_.end()) + { + sessionMemMap_[sessionId] = 0; + } + else + { + mapIter->second = 0; + } + } + } + uint64_t getMaxMemPct(const uint32_t sessionId) + { + uint64_t maxMemoryPct = 0; + // WIP + if (sessionId < 0x80000000) + { + std::lock_guard lk(sessionMemMapMutex_); + auto mapIter = sessionMemMap_.find(sessionId); + + if (mapIter != sessionMemMap_.end()) + { + maxMemoryPct = (uint64_t)mapIter->second; + } + } + + return maxMemoryPct; + } + void deleteMaxMemPct(uint32_t sessionId) + { + if (sessionId < 0x80000000) + { + std::lock_guard lk(sessionMemMapMutex_); + auto mapIter = sessionMemMap_.find(sessionId); + + if (mapIter != sessionMemMap_.end()) + { + sessionMemMap_.erase(sessionId); + } + } + } + //...Increment the number of threads using the specified sessionId + void incThreadCntPerSession(uint32_t sessionId) + { + std::lock_guard lk(threadCntPerSessionMapMutex_); + auto mapIter = threadCntPerSessionMap_.find(sessionId); + + if (mapIter == threadCntPerSessionMap_.end()) + threadCntPerSessionMap_.insert(ThreadCntPerSessionMap_t::value_type(sessionId, 1)); + else + mapIter->second++; + } + //...Decrement the number of threads using the specified sessionId. + //...When the thread count for a sessionId reaches 0, the corresponding + //...CalpontSystemCatalog objects are deleted. + //...The user query and its associated catalog query have a different + //...session Id where the highest bit is flipped. + //...The object with id(sessionId | 0x80000000) cannot be removed before + //...user query session completes because the sysdata may be used for + //...debugging/stats purpose, such as result graph, etc. + void decThreadCntPerSession(uint32_t sessionId) + { + std::lock_guard lk(threadCntPerSessionMapMutex_); + auto mapIter = threadCntPerSessionMap_.find(sessionId); + + if (mapIter != threadCntPerSessionMap_.end()) + { + if (--mapIter->second == 0) + { + threadCntPerSessionMap_.erase(mapIter); + execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId); + execplan::CalpontSystemCatalog::removeCalpontSystemCatalog((sessionId ^ 0x80000000)); + } + } + } + ActiveStatementCounter* getStatementsRunningCount() + { + return statementsRunningCount_; + } + joblist::DistributedEngineComm* getDec() + { + return dec_; + } + int toInt(const std::string& val) + { + if (val.length() == 0) + return -1; + + return static_cast(config::Config::fromText(val)); + } + const std::string prettyPrintMiniInfo(const std::string& in); + + const std::string timeNow() + { + time_t outputTime = time(0); + struct tm ltm; + char buf[32]; // ctime(3) says at least 26 + size_t len = 0; + asctime_r(localtime_r(&outputTime, <m), buf); + len = strlen(buf); + + if (len > 0) + --len; + + if (buf[len] == '\n') + buf[len] = 0; + + return buf; + } + querytele::QueryTeleServerParms& getTeleServerParms() + { + return teleServerParms_; + } + joblist::ResourceManager& getRm() + { + return *rm_; + } + private: + void setupSignalHandlers(); + int8_t setupCwd() + { + std::string workdir = rm_->getScWorkingDir(); + int8_t rc = chdir(workdir.c_str()); + + if (rc < 0 || access(".", W_OK) != 0) + rc = chdir("/tmp"); + + return (rc < 0) ? -5 : rc; + } + int setupResources() + { + struct rlimit rlim; + + if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) + { + return -1; + } + rlim.rlim_cur = rlim.rlim_max = 65536; + + if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) + { + return -2; + } + if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) + { + return -3; + } + if (rlim.rlim_cur != 65536) + { + return -4; + } + return 0; + } + + logging::Logger msgLog_; + SessionMemMap_t sessionMemMap_; // track memory% usage during a query + std::mutex sessionMemMapMutex_; + //...The FrontEnd may establish more than 1 connection (which results in + // more than 1 ExeMgr thread) per session. These threads will share + // the same CalpontSystemCatalog object for that session. Here, we + // define a std::map to track how many threads are sharing each session, so + // that we know when we can safely delete a CalpontSystemCatalog object + // shared by multiple threads per session. + ThreadCntPerSessionMap_t threadCntPerSessionMap_; + std::mutex threadCntPerSessionMapMutex_; + ActiveStatementCounter* statementsRunningCount_; + joblist::DistributedEngineComm* dec_; + joblist::ResourceManager* rm_; + // Its attributes are set in Child() + querytele::QueryTeleServerParms teleServerParms_; + }; + extern ServiceExeMgr* globServiceExeMgr; +} \ No newline at end of file diff --git a/primitives/primproc/sqlfrontsessionthread.cpp b/primitives/primproc/sqlfrontsessionthread.cpp new file mode 100644 index 000000000..3b5094e7a --- /dev/null +++ b/primitives/primproc/sqlfrontsessionthread.cpp @@ -0,0 +1,985 @@ +/* Copyright (C) 2022 Mariadb Corporation. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include "sqlfrontsessionthread.h" + +namespace exemgr +{ + uint64_t SQLFrontSessionThread::getMaxMemPct(uint32_t sessionId) + { + return globServiceExeMgr->getMaxMemPct(sessionId); + } + void SQLFrontSessionThread::deleteMaxMemPct(uint32_t sessionId) + { + return globServiceExeMgr->deleteMaxMemPct(sessionId); + } + void SQLFrontSessionThread::incThreadCntPerSession(uint32_t sessionId) + { + return globServiceExeMgr->incThreadCntPerSession(sessionId); + } + void SQLFrontSessionThread::decThreadCntPerSession(uint32_t sessionId) + { + return globServiceExeMgr->decThreadCntPerSession(sessionId); + } + + void SQLFrontSessionThread::initMaxMemPct(uint32_t sessionId) + { + return globServiceExeMgr->initMaxMemPct(sessionId); + } + //...Get and log query stats to specified output stream + const std::string SQLFrontSessionThread::formatQueryStats( + joblist::SJLP& jl, // joblist associated with query + const std::string& label, // header label to print in front of log output + bool includeNewLine, // include line breaks in query stats std::string + bool vtableModeOn, bool wantExtendedStats, uint64_t rowsReturned) + { + std::ostringstream os; + + // Get stats if not already acquired for current query + if (!fStatsRetrieved) + { + if (wantExtendedStats) + { + // wait for the ei data to be written by another thread (brain-dead) + struct timespec req = {0, 250000}; // 250 usec + nanosleep(&req, 0); + } + + // Get % memory usage during current query for sessionId + jl->querySummary(wantExtendedStats); + fStats = jl->queryStats(); + fStats.fMaxMemPct = getMaxMemPct(fStats.fSessionID); + fStats.fRows = rowsReturned; + fStatsRetrieved = true; + } + + std::string queryMode; + queryMode = (vtableModeOn ? "Distributed" : "Standard"); + + // Log stats to specified output stream + os << label << ": MaxMemPct-" << fStats.fMaxMemPct << "; NumTempFiles-" << fStats.fNumFiles + << "; TempFileSpace-" << roundBytes(fStats.fFileBytes) << "; ApproxPhyI/O-" << fStats.fPhyIO + << "; CacheI/O-" << fStats.fCacheIO << "; BlocksTouched-" << fStats.fMsgRcvCnt; + + if (includeNewLine) + os << std::endl << " "; // insert line break + else + os << "; "; // continue without line break + + os << "PartitionBlocksEliminated-" << fStats.fCPBlocksSkipped << "; MsgBytesIn-" + << roundBytes(fStats.fMsgBytesIn) << "; MsgBytesOut-" << roundBytes(fStats.fMsgBytesOut) << "; Mode-" + << queryMode; + + return os.str(); + } + + //... Round off to human readable format (KB, MB, or GB). + const std::string SQLFrontSessionThread::roundBytes(uint64_t value) const + { + const char* units[] = {"B", "KB", "MB", "GB", "TB"}; + uint64_t i = 0, up = 0; + uint64_t roundedValue = value; + + while (roundedValue > 1024 && i < 4) + { + up = (roundedValue & 512); + roundedValue /= 1024; + i++; + } + + if (up) + roundedValue++; + + std::ostringstream oss; + oss << roundedValue << units[i]; + return oss.str(); + } + + //...Round off to nearest (1024*1024) MB + uint64_t SQLFrontSessionThread::roundMB(uint64_t value) const + { + uint64_t roundedValue = value >> 20; + + if (value & 524288) + roundedValue++; + + return roundedValue; + } + + void SQLFrontSessionThread::setRMParms(const execplan::CalpontSelectExecutionPlan::RMParmVec& parms) + { + for (execplan::CalpontSelectExecutionPlan::RMParmVec::const_iterator it = parms.begin(); + it != parms.end(); ++it) + { + switch (it->id) + { + case execplan::PMSMALLSIDEMEMORY: + { + globServiceExeMgr->getRm().addHJPmMaxSmallSideMap(it->sessionId, it->value); + break; + } + + case execplan::UMSMALLSIDEMEMORY: + { + globServiceExeMgr->getRm().addHJUmMaxSmallSideMap(it->sessionId, it->value); + break; + } + + default:; + } + } + } + + void SQLFrontSessionThread::buildSysCache(const execplan::CalpontSelectExecutionPlan& csep, + boost::shared_ptr csc) + { + const execplan::CalpontSelectExecutionPlan::ColumnMap& colMap = csep.columnMap(); + std::string schemaName; + + for (auto it = colMap.begin(); it != colMap.end(); ++it) + { + const auto sc = dynamic_cast((it->second).get()); + + if (sc) + { + schemaName = sc->schemaName(); + + // only the first time a schema is got will actually query + // system catalog. System catalog keeps a schema name std::map. + // if a schema exists, the call getSchemaInfo returns without + // doing anything. + if (!schemaName.empty()) + csc->getSchemaInfo(schemaName); + } + } + + execplan::CalpontSelectExecutionPlan::SelectList::const_iterator subIt; + + for (subIt = csep.derivedTableList().begin(); subIt != csep.derivedTableList().end(); ++subIt) + { + buildSysCache(*(dynamic_cast(subIt->get())), csc); + } + } + + void SQLFrontSessionThread::writeCodeAndError(messageqcpp::ByteStream::quadbyte code, const std::string emsg) + { + messageqcpp::ByteStream emsgBs; + messageqcpp::ByteStream tbs; + tbs << code; + fIos.write(tbs); + emsgBs << emsg; + fIos.write(emsgBs); + } + + void SQLFrontSessionThread::analyzeTableExecute(messageqcpp::ByteStream& bs, joblist::SJLP& jl, bool& stmtCounted) + { + auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount(); + messageqcpp::ByteStream::quadbyte qb; + execplan::MCSAnalyzeTableExecutionPlan caep; + + bs = fIos.read(); + caep.unserialize(bs); + + statementsRunningCount->incr(stmtCounted); + jl = joblist::JobListFactory::makeJobList(&caep, fRm, false, true); + + // Joblist is empty. + if (jl->status() == logging::statisticsJobListEmpty) + { + if (caep.traceOn()) + std::cout << "JobList is empty " << std::endl; + + jl.reset(); + bs.restart(); + qb = ANALYZE_TABLE_SUCCESS; + bs << qb; + fIos.write(bs); + bs.reset(); + statementsRunningCount->decr(stmtCounted); + return; + } + + if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers())) + { + std::cout << "fEc setup " << std::endl; + fEc->Setup(); + } + + if (jl->status() == 0) + { + std::string emsg; + + if (jl->putEngineComm(fEc) != 0) + throw std::runtime_error(jl->errMsg()); + } + else + { + throw std::runtime_error("ExeMgr: could not build a JobList!"); + } + + // Execute a joblist. + jl->doQuery(); + + FEMsgHandler msgHandler(jl, &fIos); + + msgHandler.start(); + auto rowCount = jl->projectTable(100, bs); + msgHandler.stop(); + + auto outRG = (static_cast(jl.get()))->getOutputRowGroup(); + + if (caep.traceOn()) + std::cout << "Row count " << rowCount << std::endl; + + // Process `RowGroup`, increase an epoch and save statistics to the file. + auto* statisticsManager = statistics::StatisticsManager::instance(); + statisticsManager->analyzeColumnKeyTypes(outRG, caep.traceOn()); + statisticsManager->incEpoch(); + statisticsManager->saveToFile(); + + // Distribute statistics across all ExeMgr clients if possible. + statistics::StatisticsDistributor::instance()->distributeStatistics(); + + // Send the signal back to front-end. + bs.restart(); + qb = ANALYZE_TABLE_SUCCESS; + bs << qb; + fIos.write(bs); + bs.reset(); + statementsRunningCount->decr(stmtCounted); + } + + void SQLFrontSessionThread::analyzeTableHandleStats(messageqcpp::ByteStream& bs) + { + messageqcpp::ByteStream::quadbyte qb; +#ifdef DEBUG_STATISTICS + std::cout << "Get distributed statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl; +#endif + bs = fIos.read(); +#ifdef DEBUG_STATISTICS + std::cout << "Read the hash from statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl; +#endif + uint64_t dataHashRec; + bs >> dataHashRec; + + uint64_t dataHash = statistics::StatisticsManager::instance()->computeHashFromStats(); + // The stats are the same. + if (dataHash == dataHashRec) + { +#ifdef DEBUG_STATISTICS + std::cout << "The hash is the same as rec hash on ExeMgr(Client) from ExeMgr(Server) " << std::endl; +#endif + qb = ANALYZE_TABLE_SUCCESS; + bs << qb; + fIos.write(bs); + bs.reset(); + return; + } + + bs.restart(); + qb = ANALYZE_TABLE_NEED_STATS; + bs << qb; + fIos.write(bs); + + bs.restart(); + bs = fIos.read(); +#ifdef DEBUG_STATISTICS + std::cout << "Read statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl; +#endif + statistics::StatisticsManager::instance()->unserialize(bs); + statistics::StatisticsManager::instance()->saveToFile(); + +#ifdef DEBUG_STATISTICS + std::cout << "Write flag on ExeMgr(Client) to ExeMgr(Server)" << std::endl; +#endif + qb = ANALYZE_TABLE_SUCCESS; + bs << qb; + fIos.write(bs); + bs.reset(); + } + + void SQLFrontSessionThread::operator()() + { + messageqcpp::ByteStream bs, inbs; + execplan::CalpontSelectExecutionPlan csep; + csep.sessionID(0); + joblist::SJLP jl; + bool incSQLFrontSessionThreadCnt = true; + std::mutex jlMutex; + std::condition_variable jlCleanupDone; + int destructing = 0; + int gDebug = globServiceExeMgr->getDebugLevel(); + logging::Logger& msgLog = globServiceExeMgr->getLogger(); + + bool selfJoin = false; + bool tryTuples = false; + bool usingTuples = false; + bool stmtCounted = false; + auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount(); + + try + { + for (;;) + { + selfJoin = false; + tryTuples = false; + usingTuples = false; + + if (jl) + { + // puts the real destruction in another thread to avoid + // making the whole session wait. It can take several seconds. + std::unique_lock scoped(jlMutex); + destructing++; + std::thread bgdtor( + [jl, &jlMutex, &jlCleanupDone, &destructing] + { + std::unique_lock scoped(jlMutex); + const_cast(jl).reset(); // this happens second; does real destruction + if (--destructing == 0) + jlCleanupDone.notify_one(); + }); + jl.reset(); // this runs first + bgdtor.detach(); + } + + bs = fIos.read(); + + if (bs.length() == 0) + { + if (gDebug > 1 || (gDebug && !csep.isInternal())) + std::cout << "### Got a close(1) for session id " << csep.sessionID() << std::endl; + + // connection closed by client + fIos.close(); + break; + } + else if (bs.length() < 4) // Not a CalpontSelectExecutionPlan + { + if (gDebug) + std::cout << "### Got a not-a-plan for session id " << csep.sessionID() << " with length " + << bs.length() << std::endl; + + fIos.close(); + break; + } + else if (bs.length() == 4) // possible tuple flag + { + messageqcpp::ByteStream::quadbyte qb; + bs >> qb; + + if (qb == 4) // UM wants new tuple i/f + { + if (gDebug) + std::cout << "### UM wants tuples" << std::endl; + + tryTuples = true; + // now wait for the CSEP... + bs = fIos.read(); + } + else if (qb == 5) // somebody wants stats + { + bs.restart(); + qb = statementsRunningCount->cur(); + bs << qb; + qb = statementsRunningCount->waiting(); + bs << qb; + fIos.write(bs); + fIos.close(); + break; + } + else if (qb == ANALYZE_TABLE_EXECUTE) + { + analyzeTableExecute(bs, jl, stmtCounted); + continue; + } + else if (qb == ANALYZE_TABLE_REC_STATS) + { + analyzeTableHandleStats(bs); + continue; + } + else + { + if (gDebug) + std::cout << "### Got a not-a-plan value " << qb << std::endl; + + fIos.close(); + break; + } + } + + new_plan: + try + { + csep.unserialize(bs); + } + catch (logging::IDBExcept& ex) + { + // We can get here on illegal function parameter data type, e.g. + // SELECT blob_column|1 FROM t1; + statementsRunningCount->decr(stmtCounted); + writeCodeAndError(ex.errorCode(), std::string(ex.what())); + continue; + } + + querytele::QueryTeleStats qts; + + if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT")) + { + qts.query_uuid = csep.uuid(); + qts.msg_type = querytele::QueryTeleStats::QT_START; + qts.start_time = querytele::QueryTeleClient::timeNowms(); + qts.query = csep.data(); + qts.session_id = csep.sessionID(); + qts.query_type = csep.queryType(); + qts.system_name = fOamCachePtr->getSystemName(); + qts.module_name = fOamCachePtr->getModuleName(); + qts.local_query = csep.localQuery(); + qts.schema_name = csep.schemaName(); + fTeleClient.postQueryTele(qts); + } + + if (gDebug > 1 || (gDebug && !csep.isInternal())) + std::cout << "### For session id " << csep.sessionID() << ", got a CSEP" << std::endl; + + setRMParms(csep.rmParms()); + // Re-establish lost PP connections. + if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers())) + { + fEc->Setup(); + } + // @bug 1021. try to get schema cache for a come in query. + // skip system catalog queries. + if (!csep.isInternal()) + { + boost::shared_ptr csc = + execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(csep.sessionID()); + buildSysCache(csep, csc); + } + + // As soon as we have a session id for this thread, update the + // thread count per session; only do this once per thread. + // Mask 0x80000000 is for associate user query and csc query + if (incSQLFrontSessionThreadCnt) + { + // WIP + incThreadCntPerSession(csep.sessionID() | 0x80000000); + incSQLFrontSessionThreadCnt = false; + } + + bool needDbProfEndStatementMsg = false; + logging::Message::Args args; + std::string sqlText = csep.data(); + logging::LoggingID li(16, csep.sessionID(), csep.txnID()); + + // Initialize stats for this query, including + // init sessionMemMap entry for this session to 0 memory %. + // We will need this later for traceOn() or if we receive a + // table request with qb=3 (see below). This is also recorded + // as query start time. + initStats(csep.sessionID(), sqlText); + fStats.fQueryType = csep.queryType(); + + // Log start and end statement if tracing is enabled. Keep in + // mind the trace flag won't be set for system catalog queries. + if (csep.traceOn()) + { + args.reset(); + args.add((int)csep.statementID()); + args.add((int)csep.verID().currentScn); + args.add(sqlText); + msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfStartStatement, args, li); + needDbProfEndStatementMsg = true; + } + + // Don't log subsequent self joins after first. + if (selfJoin) + sqlText = ""; + + std::ostringstream oss; + oss << sqlText << "; |" << csep.schemaName() << "|"; + logging::SQLLogger sqlLog(oss.str(), li); + + statementsRunningCount->incr(stmtCounted); + + if (tryTuples) + { + try // @bug2244: try/catch around fIos.write() calls responding to makeTupleList + { + jl = joblist::JobListFactory::makeJobList(&csep, fRm, true, true); + // assign query stats + jl->queryStats(fStats); + + if ((jl->status()) == 0 && (jl->putEngineComm(fEc) == 0)) + { + usingTuples = true; + + // Tell the FE that we're sending tuples back, not TableBands + writeCodeAndError(0, "NOERROR"); + auto tjlp = dynamic_cast(jl.get()); + assert(tjlp); + messageqcpp::ByteStream tbs; + tbs << tjlp->getOutputRowGroup(); + fIos.write(tbs); + } + else + { + const std::string emsg = jl->errMsg(); + statementsRunningCount->decr(stmtCounted); + writeCodeAndError(jl->status(), emsg); + std::cerr << "ExeMgr: could not build a tuple joblist: " << emsg << std::endl; + continue; + } + } + catch (std::exception& ex) + { + std::ostringstream errMsg; + errMsg << "ExeMgr: error writing makeJoblist " + "response; " + << ex.what(); + throw std::runtime_error(errMsg.str()); + } + catch (...) + { + std::ostringstream errMsg; + errMsg << "ExeMgr: unknown error writing makeJoblist " + "response; "; + throw std::runtime_error(errMsg.str()); + } + + if (!usingTuples) + { + if (gDebug) + std::cout << "### UM wanted tuples but it didn't work out :-(" << std::endl; + } + else + { + if (gDebug) + std::cout << "### UM wanted tuples and we'll do our best;-)" << std::endl; + } + } + else + { + usingTuples = false; + jl = joblist::JobListFactory::makeJobList(&csep, fRm, false, true); + + if (jl->status() == 0) + { + std::string emsg; + + if (jl->putEngineComm(fEc) != 0) + throw std::runtime_error(jl->errMsg()); + } + else + { + throw std::runtime_error("ExeMgr: could not build a JobList!"); + } + } + + jl->doQuery(); + + execplan::CalpontSystemCatalog::OID tableOID; + bool swallowRows = false; + joblist::DeliveredTableMap tm; + uint64_t totalBytesSent = 0; + uint64_t totalRowCount = 0; + + // Project each table as the FE asks for it + for (;;) + { + bs = fIos.read(); + + if (bs.length() == 0) + { + if (gDebug > 1 || (gDebug && !csep.isInternal())) + std::cout << "### Got a close(2) for session id " << csep.sessionID() << std::endl; + + break; + } + + if (gDebug && bs.length() > 4) + std::cout << "### For session id " << csep.sessionID() << ", got too many bytes = " << bs.length() + << std::endl; + + // TODO: Holy crud! Can this be right? + //@bug 1444 Yes, if there is a self-join + if (bs.length() > 4) + { + selfJoin = true; + statementsRunningCount->decr(stmtCounted); + goto new_plan; + } + + assert(bs.length() == 4); + + messageqcpp::ByteStream::quadbyte qb; + + try // @bug2244: try/catch around fIos.write() calls responding to qb command + { + bs >> qb; + + if (gDebug > 1 || (gDebug && !csep.isInternal())) + std::cout << "### For session id " << csep.sessionID() << ", got a command = " << qb + << std::endl; + + if (qb == 0) + { + // No more tables, query is done + break; + } + else if (qb == 1) + { + // super-secret flag indicating that the UM is going to scarf down all the rows in the + // query. + swallowRows = true; + tm = jl->deliveredTables(); + continue; + } + else if (qb == 2) + { + // UM just wants any table + assert(swallowRows); + auto iter = tm.begin(); + + if (iter == tm.end()) + { + if (gDebug > 1 || (gDebug && !csep.isInternal())) + std::cout << "### For session id " << csep.sessionID() << ", returning end flag" + << std::endl; + + bs.restart(); + bs << (messageqcpp::ByteStream::byte)1; + fIos.write(bs); + continue; + } + + tableOID = iter->first; + } + else if (qb == 3) // special option-UM wants job stats std::string + { + std::string statsString; + + // Log stats std::string to be sent back to front end + statsString = formatQueryStats( + jl, "Query Stats", false, + !(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF), + (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG), totalRowCount); + + bs.restart(); + bs << statsString; + + if ((csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG) != 0) + { + bs << jl->extendedInfo(); + bs << globServiceExeMgr->prettyPrintMiniInfo(jl->miniInfo()); + } + else + { + std::string empty; + bs << empty; + bs << empty; + } + + // send stats to connector for inserting to the querystats table + fStats.serialize(bs); + fIos.write(bs); + continue; + } + // for table mode handling + else if (qb == 4) + { + statementsRunningCount->decr(stmtCounted); + bs = fIos.read(); + goto new_plan; + } + else // (qb > 3) + { + // Return table bands for the requested tableOID + tableOID = static_cast(qb); + } + } + catch (std::exception& ex) + { + std::ostringstream errMsg; + errMsg << "ExeMgr: error writing qb response " + "for qb cmd " + << qb << "; " << ex.what(); + throw std::runtime_error(errMsg.str()); + } + catch (...) + { + std::ostringstream errMsg; + errMsg << "ExeMgr: unknown error writing qb response " + "for qb cmd " + << qb; + throw std::runtime_error(errMsg.str()); + } + + if (swallowRows) + tm.erase(tableOID); + + FEMsgHandler msgHandler(jl, &fIos); + + if (tableOID == 100) + msgHandler.start(); + + //...Loop serializing table bands projected for the tableOID + for (;;) + { + uint32_t rowCount; + + rowCount = jl->projectTable(tableOID, bs); + + msgHandler.stop(); + + if (jl->status()) + { + const auto errInfo = logging::IDBErrorInfo::instance(); + + if (jl->errMsg().length() != 0) + bs << jl->errMsg(); + else + bs << errInfo->errorMsg(jl->status()); + } + + try // @bug2244: try/catch around fIos.write() calls projecting rows + { + if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3) + { + // Skip the write to the front end until the last empty band. Used to time queries + // through without any front end waiting. + if (tableOID < 3000 || rowCount == 0) + fIos.write(bs); + } + else + { + fIos.write(bs); + } + } + catch (std::exception& ex) + { + msgHandler.stop(); + std::ostringstream errMsg; + errMsg << "ExeMgr: error projecting rows " + "for tableOID: " + << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount << "; " + << ex.what(); + jl->abort(); + + while (rowCount) + rowCount = jl->projectTable(tableOID, bs); + + if (tableOID == 100 && msgHandler.aborted()) + { + /* TODO: modularize the cleanup code, as well as + * the rest of this fcn */ + + decThreadCntPerSession(csep.sessionID() | 0x80000000); + statementsRunningCount->decr(stmtCounted); + fIos.close(); + return; + } + + // std::cout << "connection drop\n"; + throw std::runtime_error(errMsg.str()); + } + catch (...) + { + std::ostringstream errMsg; + msgHandler.stop(); + errMsg << "ExeMgr: unknown error projecting rows " + "for tableOID: " + << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount; + jl->abort(); + + while (rowCount) + rowCount = jl->projectTable(tableOID, bs); + + throw std::runtime_error(errMsg.str()); + } + + totalRowCount += rowCount; + totalBytesSent += bs.length(); + + if (rowCount == 0) + { + msgHandler.stop(); + // No more bands, table is done + bs.reset(); + + // @bug 2083 decr active statement count here for table mode. + if (!usingTuples) + statementsRunningCount->decr(stmtCounted); + + break; + } + else + { + bs.restart(); + } + } // End of loop to project and serialize table bands for a table + } // End of loop to process tables + + // @bug 828 + if (csep.traceOn()) + jl->graph(csep.sessionID()); + + if (needDbProfEndStatementMsg) + { + std::string ss; + std::ostringstream prefix; + prefix << "ses:" << csep.sessionID() << " Query Totals"; + + // Log stats std::string to standard out + ss = formatQueryStats(jl, prefix.str(), true, + !(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF), + (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG), + totalRowCount); + //@Bug 1306. Added timing info for real time tracking. + std::cout << ss << " at " << globServiceExeMgr->timeNow() << std::endl; + + // log query stats to debug log file + args.reset(); + args.add((int)csep.statementID()); + args.add(fStats.fMaxMemPct); + args.add(fStats.fNumFiles); + args.add(fStats.fFileBytes); // log raw byte count instead of MB + args.add(fStats.fPhyIO); + args.add(fStats.fCacheIO); + args.add(fStats.fMsgRcvCnt); + args.add(fStats.fMsgBytesIn); + args.add(fStats.fMsgBytesOut); + args.add(fStats.fCPBlocksSkipped); + msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfQueryStats, args, li); + //@bug 1327 + deleteMaxMemPct(csep.sessionID()); + // Calling reset here, will cause joblist destructor to be + // called, which "joins" the threads. We need to do that + // here to make sure all syslogging from all the threads + // are complete; and that our logDbProfEndStatement will + // appear "last" in the syslog for this SQL statement. + // puts the real destruction in another thread to avoid + // making the whole session wait. It can take several seconds. + int stmtID = csep.statementID(); + std::unique_lock scoped(jlMutex); + destructing++; + std::thread bgdtor( + [jl, &jlMutex, &jlCleanupDone, stmtID, &li, &destructing, &msgLog] + { + std::unique_lock scoped(jlMutex); + const_cast(jl).reset(); // this happens second; does real destruction + logging::Message::Args args; + args.add(stmtID); + msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfEndStatement, args, li); + if (--destructing == 0) + jlCleanupDone.notify_one(); + }); + jl.reset(); // this happens first + bgdtor.detach(); + } + else + // delete sessionMemMap entry for this session's memory % use + deleteMaxMemPct(csep.sessionID()); + + std::string endtime(globServiceExeMgr->timeNow()); + + if ((csep.traceFlags() & globServiceExeMgr->flagsWantOutput) && (csep.sessionID() < 0x80000000)) + { + std::cout << "For session " << csep.sessionID() << ": " << totalBytesSent << " bytes sent back at " + << endtime << std::endl; + + // @bug 663 - Implemented caltraceon(16) to replace the + // $FIFO_SINK compiler definition in pColStep. + // This option consumes rows in the project steps. + if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS4) + { + std::cout << std::endl; + std::cout << "**** No data returned to DM. Rows consumed " + "in ProjectSteps - caltrace(16) is on (FIFO_SINK)." + " ****" + << std::endl; + std::cout << std::endl; + } + else if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3) + { + std::cout << std::endl; + std::cout << "**** No data returned to DM - caltrace(8) is " + "on (SWALLOW_ROWS_EXEMGR). ****" + << std::endl; + std::cout << std::endl; + } + } + + statementsRunningCount->decr(stmtCounted); + + if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT")) + { + qts.msg_type = querytele::QueryTeleStats::QT_SUMMARY; + qts.max_mem_pct = fStats.fMaxMemPct; + qts.num_files = fStats.fNumFiles; + qts.phy_io = fStats.fPhyIO; + qts.cache_io = fStats.fCacheIO; + qts.msg_rcv_cnt = fStats.fMsgRcvCnt; + qts.cp_blocks_skipped = fStats.fCPBlocksSkipped; + qts.msg_bytes_in = fStats.fMsgBytesIn; + qts.msg_bytes_out = fStats.fMsgBytesOut; + qts.rows = totalRowCount; + qts.end_time = querytele::QueryTeleClient::timeNowms(); + qts.session_id = csep.sessionID(); + qts.query_type = csep.queryType(); + qts.query = csep.data(); + qts.system_name = fOamCachePtr->getSystemName(); + qts.module_name = fOamCachePtr->getModuleName(); + qts.local_query = csep.localQuery(); + fTeleClient.postQueryTele(qts); + } + } + + // Release CSC object (for sessionID) that was added by makeJobList() + // Mask 0x80000000 is for associate user query and csc query. + // (actual joblist destruction happens at the top of this loop) + decThreadCntPerSession(csep.sessionID() | 0x80000000); + } + catch (std::exception& ex) + { + decThreadCntPerSession(csep.sessionID() | 0x80000000); + statementsRunningCount->decr(stmtCounted); + std::cerr << "### ExeMgr ses:" << csep.sessionID() << " caught: " << ex.what() << std::endl; + logging::Message::Args args; + logging::LoggingID li(16, csep.sessionID(), csep.txnID()); + args.add(ex.what()); + msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li); + fIos.close(); + } + catch (...) + { + decThreadCntPerSession(csep.sessionID() | 0x80000000); + statementsRunningCount->decr(stmtCounted); + std::cerr << "### Exception caught!" << std::endl; + logging::Message::Args args; + logging::LoggingID li(16, csep.sessionID(), csep.txnID()); + args.add("ExeMgr caught unknown exception"); + msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li); + fIos.close(); + } + + // make sure we don't leave scope while joblists are being destroyed + std::unique_lock scoped(jlMutex); + while (destructing > 0) + jlCleanupDone.wait(scoped); +} +}; // namespace exemgr \ No newline at end of file diff --git a/primitives/primproc/sqlfrontsessionthread.h b/primitives/primproc/sqlfrontsessionthread.h new file mode 100644 index 000000000..ca605ea79 --- /dev/null +++ b/primitives/primproc/sqlfrontsessionthread.h @@ -0,0 +1,131 @@ +/* Copyright (C) 2022 Mariadb Corporation. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#pragma once + +#include +#include +#include +#include + +#undef root_name +#include + +#include "calpontselectexecutionplan.h" +#include "mcsanalyzetableexecutionplan.h" +#include "activestatementcounter.h" +#include "distributedenginecomm.h" +#include "resourcemanager.h" +#include "configcpp.h" +#include "queryteleserverparms.h" +#include "iosocket.h" +#include "joblist.h" +#include "joblistfactory.h" +#include "oamcache.h" +#include "simplecolumn.h" +#include "bytestream.h" +#include "telestats.h" +#include "messageobj.h" +#include "messagelog.h" +#include "sqllogger.h" +#include "femsghandler.h" +#include "idberrorinfo.h" +#include "MonitorProcMem.h" +#include "liboamcpp.h" +#include "crashtrace.h" +#include "service.h" + +#include +#include +#include + +#include "dbrm.h" + +#include "mariadb_my_sys.h" +#include "statistics.h" +#include "serviceexemgr.h" + +namespace exemgr +{ + class SQLFrontSessionThread + { + public: + SQLFrontSessionThread(const messageqcpp::IOSocket& ios, joblist::DistributedEngineComm* ec, + joblist::ResourceManager* rm) + : fIos(ios) + , fEc(ec) + , fRm(rm) + , fStatsRetrieved(false) + , fTeleClient(globServiceExeMgr->getTeleServerParms()) + , fOamCachePtr(oam::OamCache::makeOamCache()) + { + } + + private: + messageqcpp::IOSocket fIos; + joblist::DistributedEngineComm* fEc; + joblist::ResourceManager* fRm; + querystats::QueryStats fStats; + + // Variables used to store return stats + bool fStatsRetrieved; + + querytele::QueryTeleClient fTeleClient; + + oam::OamCache* fOamCachePtr; // this ptr is copyable... + + //...Reinitialize stats for start of a new query + void initStats(uint32_t sessionId, std::string& sqlText) + { + initMaxMemPct(sessionId); + + fStats.reset(); + fStats.setStartTime(); + fStats.fSessionID = sessionId; + fStats.fQuery = sqlText; + fStatsRetrieved = false; + } + //...Get % memory usage during latest query for sesssionId. + //...SessionId >= 0x80000000 is system catalog query we can ignore. + static uint64_t getMaxMemPct(uint32_t sessionId); + //...Delete sessionMemMap entry for the specified session's memory % use. + //...SessionId >= 0x80000000 is system catalog query we can ignore. + static void deleteMaxMemPct(uint32_t sessionId); + //...Get and log query stats to specified output stream + const std::string formatQueryStats( + joblist::SJLP& jl, // joblist associated with query + const std::string& label, // header label to print in front of log output + bool includeNewLine, // include line breaks in query stats std::string + bool vtableModeOn, bool wantExtendedStats, uint64_t rowsReturned); + static void incThreadCntPerSession(uint32_t sessionId); + static void decThreadCntPerSession(uint32_t sessionId); + //...Init sessionMemMap entry for specified session to 0 memory %. + //...SessionId >= 0x80000000 is system catalog query we can ignore. + static void initMaxMemPct(uint32_t sessionId); + //... Round off to human readable format (KB, MB, or GB). + const std::string roundBytes(uint64_t value) const; + void setRMParms(const execplan::CalpontSelectExecutionPlan::RMParmVec& parms); + void buildSysCache(const execplan::CalpontSelectExecutionPlan& csep, + boost::shared_ptr csc); + void writeCodeAndError(messageqcpp::ByteStream::quadbyte code, const std::string emsg); + void analyzeTableExecute(messageqcpp::ByteStream& bs, joblist::SJLP& jl, bool& stmtCounted); + void analyzeTableHandleStats(messageqcpp::ByteStream& bs); + uint64_t roundMB(uint64_t value) const; + public: + void operator()(); + }; +} \ No newline at end of file diff --git a/utils/common/spinlock.h b/utils/common/spinlock.h index 2549230c7..8fab8661f 100644 --- a/utils/common/spinlock.h +++ b/utils/common/spinlock.h @@ -2,6 +2,10 @@ #include +// To be refactored. There are two issues with this implentation. +// 1. It can be replaced with a lock despite the memory_order_acquire/release mem order. Needs +// std::atomic_flag instead. +// 2. https://www.realworldtech.com/forum/?threadid=189711&curpostid=189723 namespace utils { inline void getSpinlock(std::atomic& lock) @@ -23,4 +27,29 @@ inline void releaseSpinlock(std::atomic& lock) lock.store(false, std::memory_order_release); } +// c++20 offers a combination of wait/notify methods but +// I prefer to use a simpler version of uspace spin lock. +class USpaceSpinLock +{ + public: + USpaceSpinLock(std::atomic_flag& flag) : flag_(flag) + { + while (flag_.test_and_set(std::memory_order_acquire)) + { + ; + } + }; + ~USpaceSpinLock() + { + release(); + }; + inline void release() + { + flag_.clear(std::memory_order_release); + } + + private: + std::atomic_flag& flag_; +}; + } // namespace utils diff --git a/utils/joiner/joiner.h b/utils/joiner/joiner.h index e61370dd6..58661489b 100644 --- a/utils/joiner/joiner.h +++ b/utils/joiner/joiner.h @@ -20,41 +20,11 @@ #include #include #include -#ifdef _MSC_VER -#include -#else + #include -#endif #include "../common/simpleallocator.h" - -#ifndef _HASHFIX_ -#define _HASHFIX_ -#ifndef __LP64__ -#if __GNUC__ == 4 && __GNUC_MINOR__ < 2 -// This is needed for /usr/include/c++/4.1.1/tr1/functional on 32-bit compiles -// tr1_hashtable_define_trivial_hash(long long unsigned int); -namespace std -{ -namespace tr1 -{ -template <> -struct hash : public std::unary_function -{ - std::size_t operator()(long long unsigned int val) const - { - return static_cast(val); - } -}; -} // namespace tr1 -} // namespace std -#endif -#endif -#endif - -#define NO_DATALISTS #include "../joblist/elementtype.h" -#undef NO_DATALISTS namespace joiner {