1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
2025-02-21 20:02:38 +04:00

385 lines
11 KiB
C++

/* Copyright (C) 2022 Mariadb Corporation.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#pragma once
#include <iostream>
#include <cstdint>
#include <csignal>
#include <ifaddrs.h>
#include <sys/resource.h>
#include <sys/types.h>
#undef root_name
#include <boost/filesystem.hpp>
#include "calpontselectexecutionplan.h"
#include "mcsanalyzetableexecutionplan.h"
#include "activestatementcounter.h"
#include "distributedenginecomm.h"
#include "resourcemanager.h"
#include "configcpp.h"
#include "queryteleserverparms.h"
#include "iosocket.h"
#include "joblist.h"
#include "joblistfactory.h"
#include "oamcache.h"
#include "simplecolumn.h"
#include "bytestream.h"
#include "telestats.h"
#include "messageobj.h"
#include "messagelog.h"
#include "sqllogger.h"
#include "femsghandler.h"
#include "idberrorinfo.h"
#include "MonitorProcMem.h"
#include "liboamcpp.h"
#include "crashtrace.h"
#include "service.h"
#include <mutex>
#include <thread>
#include <condition_variable>
#include "dbrm.h"
#include "mariadb_my_sys.h"
#include "statistics.h"
namespace exemgr
{
using SharedPtrEMSock = boost::shared_ptr<messageqcpp::IOSocket>;
class Opt
{
public:
int m_debug;
bool m_e;
bool m_fg;
Opt() : m_debug(0), m_e(false), m_fg(false){};
Opt(int argc, char* argv[]) : m_debug(0), m_e(false), m_fg(false)
{
int c;
while ((c = getopt(argc, argv, "edf")) != EOF)
{
switch (c)
{
case 'd': m_debug++; break;
case 'e': m_e = true; break;
case 'f': m_fg = true; break;
case '?':
default: break;
}
}
}
int getDebugLevel() const
{
return m_debug;
}
};
class ServiceExeMgr : public Service, public Opt
{
using SessionMemMap_t = std::map<uint32_t, size_t>;
using ThreadCntPerSessionMap_t = std::map<uint32_t, uint32_t>;
protected:
void log(logging::LOG_TYPE type, const std::string& str)
{
logging::LoggingID logid(16);
logging::Message::Args args;
logging::Message message(8);
args.add(strerror(errno));
message.format(args);
logging::Logger logger(logid.fSubsysID);
logger.logMessage(type, message, logid);
}
public:
ServiceExeMgr(const Opt& opt) : Service("ExeMgr"), Opt(opt), msgLog_(logging::Logger(16))
{
bool runningWithExeMgr = true;
rm_ = joblist::ResourceManager::instance(runningWithExeMgr);
}
ServiceExeMgr(const Opt& opt, config::Config* aConfig)
: Service("ExeMgr"), Opt(opt), msgLog_(logging::Logger(16))
{
bool runningWithExeMgr = true;
rm_ = joblist::ResourceManager::instance(runningWithExeMgr, aConfig);
}
void LogErrno() override
{
log(logging::LOG_TYPE_CRITICAL, std::string(strerror(errno)));
}
void ParentLogChildMessage(const std::string& str) override
{
log(logging::LOG_TYPE_INFO, str);
}
int Child() override;
int Run()
{
return m_fg ? Child() : RunForking();
}
static const constexpr unsigned logDefaultMsg = logging::M0000;
static const constexpr unsigned logDbProfStartStatement = logging::M0028;
static const constexpr unsigned logDbProfEndStatement = logging::M0029;
static const constexpr unsigned logStartSql = logging::M0041;
static const constexpr unsigned logEndSql = logging::M0042;
static const constexpr unsigned logRssTooBig = logging::M0044;
static const constexpr unsigned logDbProfQueryStats = logging::M0047;
static const constexpr unsigned logExeMgrExcpt = logging::M0055;
// If any flags other than the table mode flags are set, produce output to screeen
static const constexpr uint32_t flagsWantOutput =
(0xffffffff & ~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_AUTOSWITCH &
~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF);
logging::Logger& getLogger()
{
return msgLog_;
}
void updateSessionMap(const size_t pct)
{
std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
for (auto mapIter = sessionMemMap_.begin(); mapIter != sessionMemMap_.end(); ++mapIter)
{
if (pct > mapIter->second)
{
mapIter->second = pct;
}
}
}
ThreadCntPerSessionMap_t& getThreadCntPerSessionMap()
{
return threadCntPerSessionMap_;
}
std::mutex& getThreadCntPerSessionMapMutex()
{
return threadCntPerSessionMapMutex_;
}
void initMaxMemPct(uint32_t sessionId)
{
if (sessionId < 0x80000000)
{
std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
auto mapIter = sessionMemMap_.find(sessionId);
if (mapIter == sessionMemMap_.end())
{
sessionMemMap_[sessionId] = 0;
}
else
{
mapIter->second = 0;
}
}
}
uint64_t getMaxMemPct(const uint32_t sessionId)
{
uint64_t maxMemoryPct = 0;
if (sessionId < 0x80000000)
{
std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
auto mapIter = sessionMemMap_.find(sessionId);
if (mapIter != sessionMemMap_.end())
{
maxMemoryPct = (uint64_t)mapIter->second;
}
}
return maxMemoryPct;
}
void deleteMaxMemPct(uint32_t sessionId)
{
if (sessionId < 0x80000000)
{
std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
auto mapIter = sessionMemMap_.find(sessionId);
if (mapIter != sessionMemMap_.end())
{
sessionMemMap_.erase(sessionId);
}
}
}
//...Increment the number of threads using the specified sessionId
void incThreadCntPerSession(uint32_t sessionId)
{
std::lock_guard<std::mutex> lk(threadCntPerSessionMapMutex_);
auto mapIter = threadCntPerSessionMap_.find(sessionId);
if (mapIter == threadCntPerSessionMap_.end())
threadCntPerSessionMap_.insert(ThreadCntPerSessionMap_t::value_type(sessionId, 1));
else
mapIter->second++;
}
//...Decrement the number of threads using the specified sessionId.
//...When the thread count for a sessionId reaches 0, the corresponding
//...CalpontSystemCatalog objects are deleted.
//...The user query and its associated catalog query have a different
//...session Id where the highest bit is flipped.
//...The object with id(sessionId | 0x80000000) cannot be removed before
//...user query session completes because the sysdata may be used for
//...debugging/stats purpose, such as result graph, etc.
void decThreadCntPerSession(uint32_t sessionId)
{
std::lock_guard<std::mutex> lk(threadCntPerSessionMapMutex_);
auto mapIter = threadCntPerSessionMap_.find(sessionId);
if (mapIter != threadCntPerSessionMap_.end())
{
if (--mapIter->second == 0)
{
threadCntPerSessionMap_.erase(mapIter);
execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
execplan::CalpontSystemCatalog::removeCalpontSystemCatalog((sessionId ^ 0x80000000));
}
}
}
ActiveStatementCounter* getStatementsRunningCount()
{
return statementsRunningCount_;
}
joblist::DistributedEngineComm* getDec()
{
return dec_;
}
int toInt(const std::string& val)
{
if (val.length() == 0)
return -1;
return static_cast<int>(config::Config::fromText(val));
}
const std::string prettyPrintMiniInfo(const std::string& in);
const std::string timeNow()
{
time_t outputTime = time(nullptr);
struct tm ltm;
char buf[32]; // ctime(3) says at least 26
size_t len = 0;
asctime_r(localtime_r(&outputTime, &ltm), buf);
len = strlen(buf);
if (len > 0)
--len;
if (buf[len] == '\n')
buf[len] = 0;
return buf;
}
querytele::QueryTeleServerParms& getTeleServerParms()
{
return teleServerParms_;
}
joblist::ResourceManager& getRm()
{
return *rm_;
}
bool isLocalNodeSock(SharedPtrEMSock& sock) const
{
for (auto& sin : localNetIfaceSins_)
{
if (sock->isSameAddr(sin))
{
return true;
}
}
return false;
}
private:
void setupSignalHandlers();
int8_t setupCwd()
{
std::string workdir = rm_->getScWorkingDir();
int8_t rc = chdir(workdir.c_str());
if (rc < 0 || access(".", W_OK) != 0)
rc = chdir("/tmp");
return (rc < 0) ? -5 : rc;
}
int setupResources()
{
struct rlimit rlim;
if (getrlimit(RLIMIT_NOFILE, &rlim) != 0)
{
return -1;
}
rlim.rlim_cur = rlim.rlim_max = 65536;
if (setrlimit(RLIMIT_NOFILE, &rlim) != 0)
{
return -2;
}
if (getrlimit(RLIMIT_NOFILE, &rlim) != 0)
{
return -3;
}
if (rlim.rlim_cur != 65536)
{
return -4;
}
return 0;
}
void getLocalNetIfacesSins()
{
string ipAddress = "Unable to get IP Address";
struct ifaddrs* netIfacesList = nullptr;
struct ifaddrs* ifaceListMembPtr = nullptr;
int success = 0;
// retrieve the current interfaces - returns 0 on success
success = getifaddrs(&netIfacesList);
if (success == 0)
{
ifaceListMembPtr = netIfacesList;
for (; ifaceListMembPtr; ifaceListMembPtr = ifaceListMembPtr->ifa_next)
{
if (ifaceListMembPtr->ifa_addr->sa_family == AF_INET)
{
localNetIfaceSins_.push_back(((struct sockaddr_in*)ifaceListMembPtr->ifa_addr)->sin_addr);
}
}
}
freeifaddrs(netIfacesList);
}
logging::Logger msgLog_;
SessionMemMap_t sessionMemMap_; // track memory% usage during a query
std::mutex sessionMemMapMutex_;
//...The FrontEnd may establish more than 1 connection (which results in
// more than 1 ExeMgr thread) per session. These threads will share
// the same CalpontSystemCatalog object for that session. Here, we
// define a std::map to track how many threads are sharing each session, so
// that we know when we can safely delete a CalpontSystemCatalog object
// shared by multiple threads per session.
ThreadCntPerSessionMap_t threadCntPerSessionMap_;
std::mutex threadCntPerSessionMapMutex_;
ActiveStatementCounter* statementsRunningCount_ = nullptr;
joblist::DistributedEngineComm* dec_ = nullptr;
joblist::ResourceManager* rm_ = nullptr;
// Its attributes are set in Child()
querytele::QueryTeleServerParms teleServerParms_;
std::vector<struct in_addr> localNetIfaceSins_;
};
extern ServiceExeMgr* globServiceExeMgr;
} // namespace exemgr