You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			377 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			377 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* Copyright (C) 2022 Mariadb Corporation.
 | 
						|
 | 
						|
   This program is free software; you can redistribute it and/or
 | 
						|
   modify it under the terms of the GNU General Public License
 | 
						|
   as published by the Free Software Foundation; version 2 of
 | 
						|
   the License.
 | 
						|
 | 
						|
   This program is distributed in the hope that it will be useful,
 | 
						|
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
   GNU General Public License for more details.
 | 
						|
 | 
						|
   You should have received a copy of the GNU General Public License
 | 
						|
   along with this program; if not, write to the Free Software
 | 
						|
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | 
						|
   MA 02110-1301, USA. */
 | 
						|
 | 
						|
#pragma once
 | 
						|
 | 
						|
#include <iostream>
 | 
						|
#include <cstdint>
 | 
						|
#include <csignal>
 | 
						|
#include <ifaddrs.h>
 | 
						|
#include <sys/resource.h>
 | 
						|
#include <sys/types.h>
 | 
						|
 | 
						|
#undef root_name
 | 
						|
#include <boost/filesystem.hpp>
 | 
						|
 | 
						|
#include "calpontselectexecutionplan.h"
 | 
						|
#include "mcsanalyzetableexecutionplan.h"
 | 
						|
#include "activestatementcounter.h"
 | 
						|
#include "distributedenginecomm.h"
 | 
						|
#include "resourcemanager.h"
 | 
						|
#include "configcpp.h"
 | 
						|
#include "queryteleserverparms.h"
 | 
						|
#include "iosocket.h"
 | 
						|
#include "joblist.h"
 | 
						|
#include "joblistfactory.h"
 | 
						|
#include "oamcache.h"
 | 
						|
#include "simplecolumn.h"
 | 
						|
#include "bytestream.h"
 | 
						|
#include "telestats.h"
 | 
						|
#include "messageobj.h"
 | 
						|
#include "messagelog.h"
 | 
						|
#include "sqllogger.h"
 | 
						|
#include "femsghandler.h"
 | 
						|
#include "idberrorinfo.h"
 | 
						|
#include "MonitorProcMem.h"
 | 
						|
#include "liboamcpp.h"
 | 
						|
#include "crashtrace.h"
 | 
						|
#include "service.h"
 | 
						|
 | 
						|
#include <mutex>
 | 
						|
#include <thread>
 | 
						|
#include <condition_variable>
 | 
						|
 | 
						|
#include "dbrm.h"
 | 
						|
 | 
						|
#include "mariadb_my_sys.h"
 | 
						|
#include "statistics.h"
 | 
						|
 | 
						|
namespace exemgr
 | 
						|
{
 | 
						|
using SharedPtrEMSock = boost::shared_ptr<messageqcpp::IOSocket>;
 | 
						|
class Opt
 | 
						|
{
 | 
						|
 public:
 | 
						|
  int m_debug;
 | 
						|
  bool m_e;
 | 
						|
  bool m_fg;
 | 
						|
  Opt() : m_debug(0), m_e(false), m_fg(false){};
 | 
						|
  Opt(int argc, char* argv[]) : m_debug(0), m_e(false), m_fg(false)
 | 
						|
  {
 | 
						|
    int c;
 | 
						|
    while ((c = getopt(argc, argv, "edf")) != EOF)
 | 
						|
    {
 | 
						|
      switch (c)
 | 
						|
      {
 | 
						|
        case 'd': m_debug++; break;
 | 
						|
 | 
						|
        case 'e': m_e = true; break;
 | 
						|
 | 
						|
        case 'f': m_fg = true; break;
 | 
						|
 | 
						|
        case '?':
 | 
						|
        default: break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  int getDebugLevel() const
 | 
						|
  {
 | 
						|
    return m_debug;
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
  void printTotalUmMemory(int sig);
 | 
						|
 | 
						|
class ServiceExeMgr : public Service, public Opt
 | 
						|
{
 | 
						|
  using SessionMemMap_t = std::map<uint32_t, size_t>;
 | 
						|
  using ThreadCntPerSessionMap_t = std::map<uint32_t, uint32_t>;
 | 
						|
 | 
						|
 protected:
 | 
						|
  void log(logging::LOG_TYPE type, const std::string& str)
 | 
						|
  {
 | 
						|
    logging::LoggingID logid(16);
 | 
						|
    logging::Message::Args args;
 | 
						|
    logging::Message message(8);
 | 
						|
    args.add(strerror(errno));
 | 
						|
    message.format(args);
 | 
						|
    logging::Logger logger(logid.fSubsysID);
 | 
						|
    logger.logMessage(type, message, logid);
 | 
						|
  }
 | 
						|
 | 
						|
  public:
 | 
						|
    ServiceExeMgr(const Opt& opt, joblist::ResourceManager* rm) : Service("ExeMgr"), Opt(opt), msgLog_(logging::Logger(16)), rm_(rm)
 | 
						|
    { }
 | 
						|
    void LogErrno() override
 | 
						|
    {
 | 
						|
      log(logging::LOG_TYPE_CRITICAL, std::string(strerror(errno)));
 | 
						|
    }
 | 
						|
    void ParentLogChildMessage(const std::string& str) override
 | 
						|
    {
 | 
						|
      log(logging::LOG_TYPE_INFO, str);
 | 
						|
    }
 | 
						|
    int Child() override;
 | 
						|
    int Run()
 | 
						|
    {
 | 
						|
      return m_fg ? Child() : RunForking();
 | 
						|
    }
 | 
						|
    static const constexpr unsigned logDefaultMsg = logging::M0000;
 | 
						|
    static const constexpr unsigned logDbProfStartStatement = logging::M0028;
 | 
						|
    static const constexpr unsigned logDbProfEndStatement = logging::M0029;
 | 
						|
    static const constexpr unsigned logStartSql = logging::M0041;
 | 
						|
    static const constexpr unsigned logEndSql = logging::M0042;
 | 
						|
    static const constexpr unsigned logRssTooBig = logging::M0044;
 | 
						|
    static const constexpr unsigned logDbProfQueryStats = logging::M0047;
 | 
						|
    static const constexpr unsigned logExeMgrExcpt = logging::M0055;
 | 
						|
    // If any flags other than the table mode flags are set, produce output to screeen
 | 
						|
    static const constexpr uint32_t flagsWantOutput = (0xffffffff & ~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_AUTOSWITCH &
 | 
						|
      ~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF);
 | 
						|
    logging::Logger& getLogger()
 | 
						|
    {
 | 
						|
      return msgLog_;
 | 
						|
    }
 | 
						|
    void updateSessionMap(const size_t pct)
 | 
						|
    {
 | 
						|
      std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
 | 
						|
 | 
						|
    for (auto mapIter = sessionMemMap_.begin(); mapIter != sessionMemMap_.end(); ++mapIter)
 | 
						|
    {
 | 
						|
      if (pct > mapIter->second)
 | 
						|
      {
 | 
						|
        mapIter->second = pct;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  ThreadCntPerSessionMap_t& getThreadCntPerSessionMap()
 | 
						|
  {
 | 
						|
    return threadCntPerSessionMap_;
 | 
						|
  }
 | 
						|
  std::mutex& getThreadCntPerSessionMapMutex()
 | 
						|
  {
 | 
						|
    return threadCntPerSessionMapMutex_;
 | 
						|
  }
 | 
						|
  void initMaxMemPct(uint32_t sessionId)
 | 
						|
  {
 | 
						|
    if (sessionId < 0x80000000)
 | 
						|
    {
 | 
						|
      std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
 | 
						|
      auto mapIter = sessionMemMap_.find(sessionId);
 | 
						|
 | 
						|
      if (mapIter == sessionMemMap_.end())
 | 
						|
      {
 | 
						|
        sessionMemMap_[sessionId] = 0;
 | 
						|
      }
 | 
						|
      else
 | 
						|
      {
 | 
						|
        mapIter->second = 0;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  uint64_t getMaxMemPct(const uint32_t sessionId)
 | 
						|
  {
 | 
						|
    uint64_t maxMemoryPct = 0;
 | 
						|
    if (sessionId < 0x80000000)
 | 
						|
    {
 | 
						|
      std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
 | 
						|
      auto mapIter = sessionMemMap_.find(sessionId);
 | 
						|
 | 
						|
      if (mapIter != sessionMemMap_.end())
 | 
						|
      {
 | 
						|
        maxMemoryPct = (uint64_t)mapIter->second;
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    return maxMemoryPct;
 | 
						|
  }
 | 
						|
  void deleteMaxMemPct(uint32_t sessionId)
 | 
						|
  {
 | 
						|
    if (sessionId < 0x80000000)
 | 
						|
    {
 | 
						|
      std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
 | 
						|
      auto mapIter = sessionMemMap_.find(sessionId);
 | 
						|
 | 
						|
      if (mapIter != sessionMemMap_.end())
 | 
						|
      {
 | 
						|
        sessionMemMap_.erase(sessionId);
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  //...Increment the number of threads using the specified sessionId
 | 
						|
  void incThreadCntPerSession(uint32_t sessionId)
 | 
						|
  {
 | 
						|
    std::lock_guard<std::mutex> lk(threadCntPerSessionMapMutex_);
 | 
						|
    auto mapIter = threadCntPerSessionMap_.find(sessionId);
 | 
						|
 | 
						|
    if (mapIter == threadCntPerSessionMap_.end())
 | 
						|
      threadCntPerSessionMap_.insert(ThreadCntPerSessionMap_t::value_type(sessionId, 1));
 | 
						|
    else
 | 
						|
      mapIter->second++;
 | 
						|
  }
 | 
						|
  //...Decrement the number of threads using the specified sessionId.
 | 
						|
  //...When the thread count for a sessionId reaches 0, the corresponding
 | 
						|
  //...CalpontSystemCatalog objects are deleted.
 | 
						|
  //...The user query and its associated catalog query have a different
 | 
						|
  //...session Id where the highest bit is flipped.
 | 
						|
  //...The object with id(sessionId | 0x80000000) cannot be removed before
 | 
						|
  //...user query session completes because the sysdata may be used for
 | 
						|
  //...debugging/stats purpose, such as result graph, etc.
 | 
						|
  void decThreadCntPerSession(uint32_t sessionId)
 | 
						|
  {
 | 
						|
    std::lock_guard<std::mutex> lk(threadCntPerSessionMapMutex_);
 | 
						|
    auto mapIter = threadCntPerSessionMap_.find(sessionId);
 | 
						|
 | 
						|
    if (mapIter != threadCntPerSessionMap_.end())
 | 
						|
    {
 | 
						|
      if (--mapIter->second == 0)
 | 
						|
      {
 | 
						|
        threadCntPerSessionMap_.erase(mapIter);
 | 
						|
        execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
 | 
						|
        execplan::CalpontSystemCatalog::removeCalpontSystemCatalog((sessionId ^ 0x80000000));
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  ActiveStatementCounter* getStatementsRunningCount()
 | 
						|
  {
 | 
						|
    return statementsRunningCount_;
 | 
						|
  }
 | 
						|
  joblist::DistributedEngineComm* getDec()
 | 
						|
  {
 | 
						|
    return dec_;
 | 
						|
  }
 | 
						|
  int toInt(const std::string& val)
 | 
						|
  {
 | 
						|
    if (val.length() == 0)
 | 
						|
      return -1;
 | 
						|
 | 
						|
    return static_cast<int>(config::Config::fromText(val));
 | 
						|
  }
 | 
						|
  const std::string prettyPrintMiniInfo(const std::string& in);
 | 
						|
 | 
						|
  const std::string timeNow()
 | 
						|
  {
 | 
						|
    time_t outputTime = time(nullptr);
 | 
						|
    struct tm ltm;
 | 
						|
    char buf[32];  // ctime(3) says at least 26
 | 
						|
    size_t len = 0;
 | 
						|
    asctime_r(localtime_r(&outputTime, <m), buf);
 | 
						|
    len = strlen(buf);
 | 
						|
 | 
						|
    if (len > 0)
 | 
						|
      --len;
 | 
						|
 | 
						|
    if (buf[len] == '\n')
 | 
						|
      buf[len] = 0;
 | 
						|
 | 
						|
    return buf;
 | 
						|
  }
 | 
						|
  querytele::QueryTeleServerParms& getTeleServerParms()
 | 
						|
  {
 | 
						|
    return teleServerParms_;
 | 
						|
  }
 | 
						|
  joblist::ResourceManager& getRm()
 | 
						|
  {
 | 
						|
    return *rm_;
 | 
						|
  }
 | 
						|
  bool isLocalNodeSock(SharedPtrEMSock& sock) const
 | 
						|
  {
 | 
						|
    for (auto& sin : localNetIfaceSins_)
 | 
						|
    {
 | 
						|
      if (sock->isSameAddr(sin))
 | 
						|
      {
 | 
						|
        return true;
 | 
						|
      }
 | 
						|
    }
 | 
						|
    return false;
 | 
						|
  }
 | 
						|
 | 
						|
 private:
 | 
						|
  void setupSignalHandlers();
 | 
						|
  int8_t setupCwd()
 | 
						|
  {
 | 
						|
    std::string workdir = rm_->getScWorkingDir();
 | 
						|
    int8_t rc = chdir(workdir.c_str());
 | 
						|
 | 
						|
    if (rc < 0 || access(".", W_OK) != 0)
 | 
						|
      rc = chdir("/tmp");
 | 
						|
 | 
						|
    return (rc < 0) ? -5 : rc;
 | 
						|
  }
 | 
						|
  int setupResources()
 | 
						|
  {
 | 
						|
    struct rlimit rlim;
 | 
						|
 | 
						|
    if (getrlimit(RLIMIT_NOFILE, &rlim) != 0)
 | 
						|
    {
 | 
						|
      return -1;
 | 
						|
    }
 | 
						|
    rlim.rlim_cur = rlim.rlim_max = 65536;
 | 
						|
 | 
						|
    if (setrlimit(RLIMIT_NOFILE, &rlim) != 0)
 | 
						|
    {
 | 
						|
      return -2;
 | 
						|
    }
 | 
						|
    if (getrlimit(RLIMIT_NOFILE, &rlim) != 0)
 | 
						|
    {
 | 
						|
      return -3;
 | 
						|
    }
 | 
						|
    if (rlim.rlim_cur != 65536)
 | 
						|
    {
 | 
						|
      return -4;
 | 
						|
    }
 | 
						|
    return 0;
 | 
						|
  }
 | 
						|
  void getLocalNetIfacesSins()
 | 
						|
  {
 | 
						|
    string ipAddress = "Unable to get IP Address";
 | 
						|
    struct ifaddrs* netIfacesList = nullptr;
 | 
						|
    struct ifaddrs* ifaceListMembPtr = nullptr;
 | 
						|
    int success = 0;
 | 
						|
    // retrieve the current interfaces - returns 0 on success
 | 
						|
    success = getifaddrs(&netIfacesList);
 | 
						|
    if (success == 0)
 | 
						|
    {
 | 
						|
      ifaceListMembPtr = netIfacesList;
 | 
						|
      for (; ifaceListMembPtr; ifaceListMembPtr = ifaceListMembPtr->ifa_next)
 | 
						|
      {
 | 
						|
        if (ifaceListMembPtr->ifa_addr->sa_family == AF_INET)
 | 
						|
        {
 | 
						|
          localNetIfaceSins_.push_back(((struct sockaddr_in*)ifaceListMembPtr->ifa_addr)->sin_addr);
 | 
						|
        }
 | 
						|
      }
 | 
						|
    }
 | 
						|
    freeifaddrs(netIfacesList);
 | 
						|
  }
 | 
						|
  logging::Logger msgLog_;
 | 
						|
  SessionMemMap_t sessionMemMap_;  // track memory% usage during a query
 | 
						|
  std::mutex sessionMemMapMutex_;
 | 
						|
  //...The FrontEnd may establish more than 1 connection (which results in
 | 
						|
  //   more than 1 ExeMgr thread) per session.  These threads will share
 | 
						|
  //   the same CalpontSystemCatalog object for that session.  Here, we
 | 
						|
  //   define a std::map to track how many threads are sharing each session, so
 | 
						|
  //   that we know when we can safely delete a CalpontSystemCatalog object
 | 
						|
  //   shared by multiple threads per session.
 | 
						|
  ThreadCntPerSessionMap_t threadCntPerSessionMap_;
 | 
						|
  std::mutex threadCntPerSessionMapMutex_;
 | 
						|
  ActiveStatementCounter* statementsRunningCount_ = nullptr;
 | 
						|
  joblist::DistributedEngineComm* dec_ = nullptr;
 | 
						|
  joblist::ResourceManager* rm_ = nullptr;
 | 
						|
  // Its attributes are set in Child()
 | 
						|
  querytele::QueryTeleServerParms teleServerParms_;
 | 
						|
  std::vector<struct in_addr> localNetIfaceSins_;
 | 
						|
};
 | 
						|
extern ServiceExeMgr* globServiceExeMgr;
 | 
						|
}  // namespace exemgr
 |