/* Copyright (C) 2022 Mariadb Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #pragma once #include #include #include #include #include #include #undef root_name #include #include "calpontselectexecutionplan.h" #include "mcsanalyzetableexecutionplan.h" #include "activestatementcounter.h" #include "distributedenginecomm.h" #include "resourcemanager.h" #include "configcpp.h" #include "queryteleserverparms.h" #include "iosocket.h" #include "joblist.h" #include "joblistfactory.h" #include "oamcache.h" #include "simplecolumn.h" #include "bytestream.h" #include "telestats.h" #include "messageobj.h" #include "messagelog.h" #include "sqllogger.h" #include "femsghandler.h" #include "idberrorinfo.h" #include "MonitorProcMem.h" #include "liboamcpp.h" #include "crashtrace.h" #include "service.h" #include #include #include #include "dbrm.h" #include "mariadb_my_sys.h" #include "statistics.h" namespace exemgr { using SharedPtrEMSock = boost::shared_ptr; class Opt { public: int m_debug; bool m_e; bool m_fg; Opt() : m_debug(0), m_e(false), m_fg(false) {}; Opt(int argc, char* argv[]) : m_debug(0), m_e(false), m_fg(false) { int c; while ((c = getopt(argc, argv, "edf")) != EOF) { switch (c) { case 'd': m_debug++; break; case 'e': m_e = true; break; case 'f': m_fg = true; break; case '?': default: break; } } } int getDebugLevel() const { return m_debug; } }; class ServiceExeMgr : public Service, public Opt { using SessionMemMap_t = std::map; using ThreadCntPerSessionMap_t = std::map; protected: void log(logging::LOG_TYPE type, const std::string& str) { logging::LoggingID logid(16); logging::Message::Args args; logging::Message message(8); args.add(strerror(errno)); message.format(args); logging::Logger logger(logid.fSubsysID); logger.logMessage(type, message, logid); } public: ServiceExeMgr(const Opt& opt) : Service("ExeMgr"), Opt(opt), msgLog_(logging::Logger(16)) { bool runningWithExeMgr = true; rm_ = joblist::ResourceManager::instance(runningWithExeMgr); } ServiceExeMgr(const Opt& opt, config::Config* aConfig) : Service("ExeMgr"), Opt(opt), msgLog_(logging::Logger(16)) { bool runningWithExeMgr = true; rm_ = joblist::ResourceManager::instance(runningWithExeMgr, aConfig); } void LogErrno() override { log(logging::LOG_TYPE_CRITICAL, std::string(strerror(errno))); } void ParentLogChildMessage(const std::string& str) override { log(logging::LOG_TYPE_INFO, str); } int Child() override; int Run() { return m_fg ? Child() : RunForking(); } static const constexpr unsigned logDefaultMsg = logging::M0000; static const constexpr unsigned logDbProfStartStatement = logging::M0028; static const constexpr unsigned logDbProfEndStatement = logging::M0029; static const constexpr unsigned logStartSql = logging::M0041; static const constexpr unsigned logEndSql = logging::M0042; static const constexpr unsigned logRssTooBig = logging::M0044; static const constexpr unsigned logDbProfQueryStats = logging::M0047; static const constexpr unsigned logExeMgrExcpt = logging::M0055; // If any flags other than the table mode flags are set, produce output to screeen static const constexpr uint32_t flagsWantOutput = (0xffffffff & ~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_AUTOSWITCH & ~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF); logging::Logger& getLogger() { return msgLog_; } void updateSessionMap(const size_t pct) { std::lock_guard lk(sessionMemMapMutex_); for (auto mapIter = sessionMemMap_.begin(); mapIter != sessionMemMap_.end(); ++mapIter) { if (pct > mapIter->second) { mapIter->second = pct; } } } ThreadCntPerSessionMap_t& getThreadCntPerSessionMap() { return threadCntPerSessionMap_; } std::mutex& getThreadCntPerSessionMapMutex() { return threadCntPerSessionMapMutex_; } void initMaxMemPct(uint32_t sessionId) { if (sessionId < 0x80000000) { std::lock_guard lk(sessionMemMapMutex_); auto mapIter = sessionMemMap_.find(sessionId); if (mapIter == sessionMemMap_.end()) { sessionMemMap_[sessionId] = 0; } else { mapIter->second = 0; } } } uint64_t getMaxMemPct(const uint32_t sessionId) { uint64_t maxMemoryPct = 0; if (sessionId < 0x80000000) { std::lock_guard lk(sessionMemMapMutex_); auto mapIter = sessionMemMap_.find(sessionId); if (mapIter != sessionMemMap_.end()) { maxMemoryPct = (uint64_t)mapIter->second; } } return maxMemoryPct; } void deleteMaxMemPct(uint32_t sessionId) { if (sessionId < 0x80000000) { std::lock_guard lk(sessionMemMapMutex_); auto mapIter = sessionMemMap_.find(sessionId); if (mapIter != sessionMemMap_.end()) { sessionMemMap_.erase(sessionId); } } } //...Increment the number of threads using the specified sessionId void incThreadCntPerSession(uint32_t sessionId) { std::lock_guard lk(threadCntPerSessionMapMutex_); auto mapIter = threadCntPerSessionMap_.find(sessionId); if (mapIter == threadCntPerSessionMap_.end()) threadCntPerSessionMap_.insert(ThreadCntPerSessionMap_t::value_type(sessionId, 1)); else mapIter->second++; } //...Decrement the number of threads using the specified sessionId. //...When the thread count for a sessionId reaches 0, the corresponding //...CalpontSystemCatalog objects are deleted. //...The user query and its associated catalog query have a different //...session Id where the highest bit is flipped. //...The object with id(sessionId | 0x80000000) cannot be removed before //...user query session completes because the sysdata may be used for //...debugging/stats purpose, such as result graph, etc. void decThreadCntPerSession(uint32_t sessionId) { std::lock_guard lk(threadCntPerSessionMapMutex_); auto mapIter = threadCntPerSessionMap_.find(sessionId); if (mapIter != threadCntPerSessionMap_.end()) { if (--mapIter->second == 0) { threadCntPerSessionMap_.erase(mapIter); execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId); execplan::CalpontSystemCatalog::removeCalpontSystemCatalog((sessionId ^ 0x80000000)); } } } ActiveStatementCounter* getStatementsRunningCount() { return statementsRunningCount_; } joblist::DistributedEngineComm* getDec() { return dec_; } int toInt(const std::string& val) { if (val.length() == 0) return -1; return static_cast(config::Config::fromText(val)); } const std::string prettyPrintMiniInfo(const std::string& in); const std::string timeNow() { time_t outputTime = time(0); struct tm ltm; char buf[32]; // ctime(3) says at least 26 size_t len = 0; asctime_r(localtime_r(&outputTime, <m), buf); len = strlen(buf); if (len > 0) --len; if (buf[len] == '\n') buf[len] = 0; return buf; } querytele::QueryTeleServerParms& getTeleServerParms() { return teleServerParms_; } joblist::ResourceManager& getRm() { return *rm_; } bool isLocalNodeSock(SharedPtrEMSock& sock) const { for (auto& sin : localNetIfaceSins_) { if (sock->isSameAddr(sin)) { return true; } } return false; } private: void setupSignalHandlers(); int8_t setupCwd() { std::string workdir = rm_->getScWorkingDir(); int8_t rc = chdir(workdir.c_str()); if (rc < 0 || access(".", W_OK) != 0) rc = chdir("/tmp"); return (rc < 0) ? -5 : rc; } int setupResources() { struct rlimit rlim; if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) { return -1; } rlim.rlim_cur = rlim.rlim_max = 65536; if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) { return -2; } if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) { return -3; } if (rlim.rlim_cur != 65536) { return -4; } return 0; } void getLocalNetIfacesSins() { string ipAddress = "Unable to get IP Address"; struct ifaddrs* netIfacesList = nullptr; struct ifaddrs* ifaceListMembPtr = nullptr; int success = 0; // retrieve the current interfaces - returns 0 on success success = getifaddrs(&netIfacesList); if (success == 0) { ifaceListMembPtr = netIfacesList; for (; ifaceListMembPtr; ifaceListMembPtr = ifaceListMembPtr->ifa_next) { if (ifaceListMembPtr->ifa_addr->sa_family == AF_INET) { localNetIfaceSins_.push_back(((struct sockaddr_in*)ifaceListMembPtr->ifa_addr)->sin_addr); } } } freeifaddrs(netIfacesList); } logging::Logger msgLog_; SessionMemMap_t sessionMemMap_; // track memory% usage during a query std::mutex sessionMemMapMutex_; //...The FrontEnd may establish more than 1 connection (which results in // more than 1 ExeMgr thread) per session. These threads will share // the same CalpontSystemCatalog object for that session. Here, we // define a std::map to track how many threads are sharing each session, so // that we know when we can safely delete a CalpontSystemCatalog object // shared by multiple threads per session. ThreadCntPerSessionMap_t threadCntPerSessionMap_; std::mutex threadCntPerSessionMapMutex_; ActiveStatementCounter* statementsRunningCount_ = nullptr; joblist::DistributedEngineComm* dec_ = nullptr; joblist::ResourceManager* rm_ = nullptr; // Its attributes are set in Child() querytele::QueryTeleServerParms teleServerParms_; std::vector localNetIfaceSins_; }; extern ServiceExeMgr* globServiceExeMgr; }