mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
385 lines
11 KiB
C++
385 lines
11 KiB
C++
/* Copyright (C) 2022 Mariadb Corporation.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
#pragma once
|
|
|
|
#include <iostream>
|
|
#include <cstdint>
|
|
#include <csignal>
|
|
#include <ifaddrs.h>
|
|
#include <sys/resource.h>
|
|
#include <sys/types.h>
|
|
|
|
#undef root_name
|
|
#include <boost/filesystem.hpp>
|
|
|
|
#include "calpontselectexecutionplan.h"
|
|
#include "mcsanalyzetableexecutionplan.h"
|
|
#include "activestatementcounter.h"
|
|
#include "distributedenginecomm.h"
|
|
#include "resourcemanager.h"
|
|
#include "configcpp.h"
|
|
#include "queryteleserverparms.h"
|
|
#include "iosocket.h"
|
|
#include "joblist.h"
|
|
#include "joblistfactory.h"
|
|
#include "oamcache.h"
|
|
#include "simplecolumn.h"
|
|
#include "bytestream.h"
|
|
#include "telestats.h"
|
|
#include "messageobj.h"
|
|
#include "messagelog.h"
|
|
#include "sqllogger.h"
|
|
#include "femsghandler.h"
|
|
#include "idberrorinfo.h"
|
|
#include "MonitorProcMem.h"
|
|
#include "liboamcpp.h"
|
|
#include "crashtrace.h"
|
|
#include "service.h"
|
|
|
|
#include <mutex>
|
|
#include <thread>
|
|
#include <condition_variable>
|
|
|
|
#include "dbrm.h"
|
|
|
|
#include "mariadb_my_sys.h"
|
|
#include "statistics.h"
|
|
|
|
namespace exemgr
|
|
{
|
|
using SharedPtrEMSock = boost::shared_ptr<messageqcpp::IOSocket>;
|
|
class Opt
|
|
{
|
|
public:
|
|
int m_debug;
|
|
bool m_e;
|
|
bool m_fg;
|
|
Opt() : m_debug(0), m_e(false), m_fg(false){};
|
|
Opt(int argc, char* argv[]) : m_debug(0), m_e(false), m_fg(false)
|
|
{
|
|
int c;
|
|
while ((c = getopt(argc, argv, "edf")) != EOF)
|
|
{
|
|
switch (c)
|
|
{
|
|
case 'd': m_debug++; break;
|
|
|
|
case 'e': m_e = true; break;
|
|
|
|
case 'f': m_fg = true; break;
|
|
|
|
case '?':
|
|
default: break;
|
|
}
|
|
}
|
|
}
|
|
int getDebugLevel() const
|
|
{
|
|
return m_debug;
|
|
}
|
|
};
|
|
|
|
class ServiceExeMgr : public Service, public Opt
|
|
{
|
|
using SessionMemMap_t = std::map<uint32_t, size_t>;
|
|
using ThreadCntPerSessionMap_t = std::map<uint32_t, uint32_t>;
|
|
|
|
protected:
|
|
void log(logging::LOG_TYPE type, const std::string& str)
|
|
{
|
|
logging::LoggingID logid(16);
|
|
logging::Message::Args args;
|
|
logging::Message message(8);
|
|
args.add(strerror(errno));
|
|
message.format(args);
|
|
logging::Logger logger(logid.fSubsysID);
|
|
logger.logMessage(type, message, logid);
|
|
}
|
|
|
|
public:
|
|
ServiceExeMgr(const Opt& opt) : Service("ExeMgr"), Opt(opt), msgLog_(logging::Logger(16))
|
|
{
|
|
bool runningWithExeMgr = true;
|
|
rm_ = joblist::ResourceManager::instance(runningWithExeMgr);
|
|
}
|
|
ServiceExeMgr(const Opt& opt, config::Config* aConfig)
|
|
: Service("ExeMgr"), Opt(opt), msgLog_(logging::Logger(16))
|
|
{
|
|
bool runningWithExeMgr = true;
|
|
rm_ = joblist::ResourceManager::instance(runningWithExeMgr, aConfig);
|
|
}
|
|
void LogErrno() override
|
|
{
|
|
log(logging::LOG_TYPE_CRITICAL, std::string(strerror(errno)));
|
|
}
|
|
void ParentLogChildMessage(const std::string& str) override
|
|
{
|
|
log(logging::LOG_TYPE_INFO, str);
|
|
}
|
|
int Child() override;
|
|
int Run()
|
|
{
|
|
return m_fg ? Child() : RunForking();
|
|
}
|
|
static const constexpr unsigned logDefaultMsg = logging::M0000;
|
|
static const constexpr unsigned logDbProfStartStatement = logging::M0028;
|
|
static const constexpr unsigned logDbProfEndStatement = logging::M0029;
|
|
static const constexpr unsigned logStartSql = logging::M0041;
|
|
static const constexpr unsigned logEndSql = logging::M0042;
|
|
static const constexpr unsigned logRssTooBig = logging::M0044;
|
|
static const constexpr unsigned logDbProfQueryStats = logging::M0047;
|
|
static const constexpr unsigned logExeMgrExcpt = logging::M0055;
|
|
// If any flags other than the table mode flags are set, produce output to screeen
|
|
static const constexpr uint32_t flagsWantOutput =
|
|
(0xffffffff & ~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_AUTOSWITCH &
|
|
~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF);
|
|
logging::Logger& getLogger()
|
|
{
|
|
return msgLog_;
|
|
}
|
|
void updateSessionMap(const size_t pct)
|
|
{
|
|
std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
|
|
|
|
for (auto mapIter = sessionMemMap_.begin(); mapIter != sessionMemMap_.end(); ++mapIter)
|
|
{
|
|
if (pct > mapIter->second)
|
|
{
|
|
mapIter->second = pct;
|
|
}
|
|
}
|
|
}
|
|
ThreadCntPerSessionMap_t& getThreadCntPerSessionMap()
|
|
{
|
|
return threadCntPerSessionMap_;
|
|
}
|
|
std::mutex& getThreadCntPerSessionMapMutex()
|
|
{
|
|
return threadCntPerSessionMapMutex_;
|
|
}
|
|
void initMaxMemPct(uint32_t sessionId)
|
|
{
|
|
if (sessionId < 0x80000000)
|
|
{
|
|
std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
|
|
auto mapIter = sessionMemMap_.find(sessionId);
|
|
|
|
if (mapIter == sessionMemMap_.end())
|
|
{
|
|
sessionMemMap_[sessionId] = 0;
|
|
}
|
|
else
|
|
{
|
|
mapIter->second = 0;
|
|
}
|
|
}
|
|
}
|
|
uint64_t getMaxMemPct(const uint32_t sessionId)
|
|
{
|
|
uint64_t maxMemoryPct = 0;
|
|
if (sessionId < 0x80000000)
|
|
{
|
|
std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
|
|
auto mapIter = sessionMemMap_.find(sessionId);
|
|
|
|
if (mapIter != sessionMemMap_.end())
|
|
{
|
|
maxMemoryPct = (uint64_t)mapIter->second;
|
|
}
|
|
}
|
|
|
|
return maxMemoryPct;
|
|
}
|
|
void deleteMaxMemPct(uint32_t sessionId)
|
|
{
|
|
if (sessionId < 0x80000000)
|
|
{
|
|
std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
|
|
auto mapIter = sessionMemMap_.find(sessionId);
|
|
|
|
if (mapIter != sessionMemMap_.end())
|
|
{
|
|
sessionMemMap_.erase(sessionId);
|
|
}
|
|
}
|
|
}
|
|
//...Increment the number of threads using the specified sessionId
|
|
void incThreadCntPerSession(uint32_t sessionId)
|
|
{
|
|
std::lock_guard<std::mutex> lk(threadCntPerSessionMapMutex_);
|
|
auto mapIter = threadCntPerSessionMap_.find(sessionId);
|
|
|
|
if (mapIter == threadCntPerSessionMap_.end())
|
|
threadCntPerSessionMap_.insert(ThreadCntPerSessionMap_t::value_type(sessionId, 1));
|
|
else
|
|
mapIter->second++;
|
|
}
|
|
//...Decrement the number of threads using the specified sessionId.
|
|
//...When the thread count for a sessionId reaches 0, the corresponding
|
|
//...CalpontSystemCatalog objects are deleted.
|
|
//...The user query and its associated catalog query have a different
|
|
//...session Id where the highest bit is flipped.
|
|
//...The object with id(sessionId | 0x80000000) cannot be removed before
|
|
//...user query session completes because the sysdata may be used for
|
|
//...debugging/stats purpose, such as result graph, etc.
|
|
void decThreadCntPerSession(uint32_t sessionId)
|
|
{
|
|
std::lock_guard<std::mutex> lk(threadCntPerSessionMapMutex_);
|
|
auto mapIter = threadCntPerSessionMap_.find(sessionId);
|
|
|
|
if (mapIter != threadCntPerSessionMap_.end())
|
|
{
|
|
if (--mapIter->second == 0)
|
|
{
|
|
threadCntPerSessionMap_.erase(mapIter);
|
|
execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
|
|
execplan::CalpontSystemCatalog::removeCalpontSystemCatalog((sessionId ^ 0x80000000));
|
|
}
|
|
}
|
|
}
|
|
ActiveStatementCounter* getStatementsRunningCount()
|
|
{
|
|
return statementsRunningCount_;
|
|
}
|
|
joblist::DistributedEngineComm* getDec()
|
|
{
|
|
return dec_;
|
|
}
|
|
int toInt(const std::string& val)
|
|
{
|
|
if (val.length() == 0)
|
|
return -1;
|
|
|
|
return static_cast<int>(config::Config::fromText(val));
|
|
}
|
|
const std::string prettyPrintMiniInfo(const std::string& in);
|
|
|
|
const std::string timeNow()
|
|
{
|
|
time_t outputTime = time(nullptr);
|
|
struct tm ltm;
|
|
char buf[32]; // ctime(3) says at least 26
|
|
size_t len = 0;
|
|
asctime_r(localtime_r(&outputTime, <m), buf);
|
|
len = strlen(buf);
|
|
|
|
if (len > 0)
|
|
--len;
|
|
|
|
if (buf[len] == '\n')
|
|
buf[len] = 0;
|
|
|
|
return buf;
|
|
}
|
|
querytele::QueryTeleServerParms& getTeleServerParms()
|
|
{
|
|
return teleServerParms_;
|
|
}
|
|
joblist::ResourceManager& getRm()
|
|
{
|
|
return *rm_;
|
|
}
|
|
bool isLocalNodeSock(SharedPtrEMSock& sock) const
|
|
{
|
|
for (auto& sin : localNetIfaceSins_)
|
|
{
|
|
if (sock->isSameAddr(sin))
|
|
{
|
|
return true;
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
private:
|
|
void setupSignalHandlers();
|
|
int8_t setupCwd()
|
|
{
|
|
std::string workdir = rm_->getScWorkingDir();
|
|
int8_t rc = chdir(workdir.c_str());
|
|
|
|
if (rc < 0 || access(".", W_OK) != 0)
|
|
rc = chdir("/tmp");
|
|
|
|
return (rc < 0) ? -5 : rc;
|
|
}
|
|
int setupResources()
|
|
{
|
|
struct rlimit rlim;
|
|
|
|
if (getrlimit(RLIMIT_NOFILE, &rlim) != 0)
|
|
{
|
|
return -1;
|
|
}
|
|
rlim.rlim_cur = rlim.rlim_max = 65536;
|
|
|
|
if (setrlimit(RLIMIT_NOFILE, &rlim) != 0)
|
|
{
|
|
return -2;
|
|
}
|
|
if (getrlimit(RLIMIT_NOFILE, &rlim) != 0)
|
|
{
|
|
return -3;
|
|
}
|
|
if (rlim.rlim_cur != 65536)
|
|
{
|
|
return -4;
|
|
}
|
|
return 0;
|
|
}
|
|
void getLocalNetIfacesSins()
|
|
{
|
|
string ipAddress = "Unable to get IP Address";
|
|
struct ifaddrs* netIfacesList = nullptr;
|
|
struct ifaddrs* ifaceListMembPtr = nullptr;
|
|
int success = 0;
|
|
// retrieve the current interfaces - returns 0 on success
|
|
success = getifaddrs(&netIfacesList);
|
|
if (success == 0)
|
|
{
|
|
ifaceListMembPtr = netIfacesList;
|
|
for (; ifaceListMembPtr; ifaceListMembPtr = ifaceListMembPtr->ifa_next)
|
|
{
|
|
if (ifaceListMembPtr->ifa_addr->sa_family == AF_INET)
|
|
{
|
|
localNetIfaceSins_.push_back(((struct sockaddr_in*)ifaceListMembPtr->ifa_addr)->sin_addr);
|
|
}
|
|
}
|
|
}
|
|
freeifaddrs(netIfacesList);
|
|
}
|
|
logging::Logger msgLog_;
|
|
SessionMemMap_t sessionMemMap_; // track memory% usage during a query
|
|
std::mutex sessionMemMapMutex_;
|
|
//...The FrontEnd may establish more than 1 connection (which results in
|
|
// more than 1 ExeMgr thread) per session. These threads will share
|
|
// the same CalpontSystemCatalog object for that session. Here, we
|
|
// define a std::map to track how many threads are sharing each session, so
|
|
// that we know when we can safely delete a CalpontSystemCatalog object
|
|
// shared by multiple threads per session.
|
|
ThreadCntPerSessionMap_t threadCntPerSessionMap_;
|
|
std::mutex threadCntPerSessionMapMutex_;
|
|
ActiveStatementCounter* statementsRunningCount_ = nullptr;
|
|
joblist::DistributedEngineComm* dec_ = nullptr;
|
|
joblist::ResourceManager* rm_ = nullptr;
|
|
// Its attributes are set in Child()
|
|
querytele::QueryTeleServerParms teleServerParms_;
|
|
std::vector<struct in_addr> localNetIfaceSins_;
|
|
};
|
|
extern ServiceExeMgr* globServiceExeMgr;
|
|
} // namespace exemgr
|