1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
Roman Nozdrin e174696351 MCOL-5001 This patch merges ExeMgr and PrimProc runtimes
EM and PP are most resource-hungry runtimes.
        The merge makes it possible to control their cumulative
        resource consumption, thread allocation + enables
        zero-copy data exchange b/w local EM and PP facilities.
2022-04-04 11:46:33 +00:00

458 lines
13 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2019-22 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/**********************************************************************
* $Id: main.cpp 1000 2013-07-24 21:05:51Z pleblanc $
*
*
***********************************************************************/
/**
* @brief execution plan manager main program
*
* This is the "main" program for dealing with execution plans and
* result sets. It sits in a loop waiting for CalpontExecutionPlan
* from the FEP. It then passes the CalpontExecutionPlan to the
* JobListFactory from which a JobList is obtained. This is passed
* to the Query Manager running in the real-time portion of the
* EC. The ExecutionPlanManager waits until the Query Manager
* returns a result set for the job list. These results are passed
* into the CalpontResultFactory, which outputs a CalpontResultSet.
* The ExecutionPlanManager passes the CalpontResultSet into the
* VendorResultFactory which produces a result set tailored to the
* specific DBMS front end in use. The ExecutionPlanManager then
* sends the VendorResultSet back to the Calpont Database Connector
* on the Front-End Processor where it is returned to the DBMS
* front-end.
*/
#include <iostream>
#include <cstdint>
#include <csignal>
#include <sys/resource.h>
#undef root_name
#include <boost/filesystem.hpp>
#include "calpontselectexecutionplan.h"
#include "mcsanalyzetableexecutionplan.h"
#include "activestatementcounter.h"
#include "distributedenginecomm.h"
#include "resourcemanager.h"
#include "configcpp.h"
#include "queryteleserverparms.h"
#include "iosocket.h"
#include "joblist.h"
#include "joblistfactory.h"
#include "oamcache.h"
#include "simplecolumn.h"
#include "bytestream.h"
#include "telestats.h"
#include "messageobj.h"
#include "messagelog.h"
#include "sqllogger.h"
#include "femsghandler.h"
#include "idberrorinfo.h"
#include "MonitorProcMem.h"
#include "liboamcpp.h"
#include "crashtrace.h"
#include "service.h"
#include <mutex>
#include <thread>
#include <condition_variable>
#include "dbrm.h"
#include "mariadb_my_sys.h"
#include "statistics.h"
#include "serviceexemgr.h"
#include "sqlfrontsessionthread.h"
namespace exemgr
{
// Process-wide pointer to the ExeMgr service object. The signal handlers defined
// in this file dereference it without a null check, so it must be assigned
// before any of those signals can be delivered.
// NOTE(review): the assignment is not visible in this file section - confirm where it happens.
ServiceExeMgr* globServiceExeMgr = nullptr;
// Forward declaration only; ServiceExeMgr::Child() calls this with the configured
// maxPct / pauseSeconds when maxPct > 0. According to the comment at that call
// site the definition lives in rssmonfcn.cpp.
void startRssMon(size_t maxPct, int pauseSeconds);
void added_a_pm(int)
{
logging::LoggingID logid(21, 0, 0);
logging::Message::Args args1;
logging::Message msg(1);
args1.add("exeMgr caught SIGHUP. Resetting connections");
msg.format(args1);
std::cout << msg.msg().c_str() << std::endl;
logging::Logger logger(logid.fSubsysID);
logger.logMessage(logging::LOG_TYPE_DEBUG, msg, logid);
auto* dec = exemgr::globServiceExeMgr->getDec();
if (dec)
{
oam::OamCache* oamCache = oam::OamCache::makeOamCache();
oamCache->forceReload();
dec->Setup();
}
}
/**
 * @brief Signal handler installed for SIGUSR1 by ServiceExeMgr::setupSignalHandlers().
 *
 * Asks the global service object's resource manager for its available memory
 * and prints the value (as a signed 64-bit integer) to stdout.
 *
 * @param sig signal number; not used.
 */
void printTotalUmMemory(int sig)
{
  auto&& resourceMgr = globServiceExeMgr->getRm();
  // Keep the int64_t conversion: it determines how the value is printed.
  const int64_t available = resourceMgr.availableMemory();
  std::cout << "Total UM memory available: " << available << std::endl;
}
/**
 * @brief Installs the process signal dispositions used by ExeMgr.
 *
 * - SIGPIPE is ignored.
 * - SIGHUP  -> exemgr::added_a_pm
 * - SIGUSR1 -> exemgr::printTotalUmMemory
 * - SIGSEGV, SIGABRT, SIGFPE -> fatalHandler
 *
 * Every disposition is installed with an all-zero sigaction structure apart
 * from the handler itself. Return values of sigaction() are not checked.
 */
void ServiceExeMgr::setupSignalHandlers()
{
  // Registers `handler` for `signo` with a zero-filled sigaction.
  const auto install = [](int signo, void (*handler)(int))
  {
    struct sigaction action;
    memset(&action, 0, sizeof(action));
    action.sa_handler = handler;
    sigaction(signo, &action, 0);
  };

  install(SIGPIPE, SIG_IGN);

  install(SIGHUP, exemgr::added_a_pm);
  install(SIGUSR1, exemgr::printTotalUmMemory);

  // Fatal signals all share one handler.
  install(SIGSEGV, fatalHandler);
  install(SIGABRT, fatalHandler);
  install(SIGFPE, fatalHandler);
}
void cleanTempDir()
{
using TempDirPurpose = config::Config::TempDirPurpose;
struct Dirs
{
std::string section;
std::string allowed;
TempDirPurpose purpose;
};
std::vector<Dirs> dirs{{"HashJoin", "AllowDiskBasedJoin", TempDirPurpose::Joins},
{"RowAggregation", "AllowDiskBasedAggregation", TempDirPurpose::Aggregates}};
const auto config = config::Config::makeConfig();
for (const auto& dir : dirs)
{
std::string allowStr = config->getConfig(dir.section, dir.allowed);
bool allow = (allowStr == "Y" || allowStr == "y");
std::string tmpPrefix = config->getTempFileDir(dir.purpose);
if (allow && tmpPrefix.empty())
{
std::cerr << "Empty tmp directory name for " << dir.section << std::endl;
logging::LoggingID logid(16, 0, 0);
logging::Message::Args args;
logging::Message message(8);
args.add("Empty tmp directory name for:");
args.add(dir.section);
message.format(args);
logging::Logger logger(logid.fSubsysID);
logger.logMessage(logging::LOG_TYPE_CRITICAL, message, logid);
}
tmpPrefix += "/";
idbassert(tmpPrefix != "/");
/* This is quite scary as ExeMgr usually runs as root */
try
{
if (allow)
{
boost::filesystem::remove_all(tmpPrefix);
}
boost::filesystem::create_directories(tmpPrefix);
}
catch (const std::exception& ex)
{
std::cerr << ex.what() << std::endl;
logging::LoggingID logid(16, 0, 0);
logging::Message::Args args;
logging::Message message(8);
args.add("Exception whilst cleaning tmpdir: ");
args.add(ex.what());
message.format(args);
logging::Logger logger(logid.fSubsysID);
logger.logMessage(logging::LOG_TYPE_WARNING, message, logid);
}
catch (...)
{
std::cerr << "Caught unknown exception during tmpdir cleanup" << std::endl;
logging::LoggingID logid(16, 0, 0);
logging::Message::Args args;
logging::Message message(8);
args.add("Unknown exception whilst cleaning tmpdir");
message.format(args);
logging::Logger logger(logid.fSubsysID);
logger.logMessage(logging::LOG_TYPE_WARNING, message, logid);
}
}
}
const std::string ServiceExeMgr::prettyPrintMiniInfo(const std::string& in)
{
// 1. take the std::string and tok it by '\n'
// 2. for each part in each line calc the longest part
// 3. padding to each longest value, output a header and the lines
using CharTraits = std::basic_string<char>::traits_type;
using CharSeparator = boost::char_separator<char, CharTraits>;
using Tokeniser = boost::tokenizer<boost::char_separator<char, CharTraits>>;
CharSeparator sep1("\n");
Tokeniser tok1(in, sep1);
std::vector<std::string> lines;
std::string header = "Desc Mode Table TableOID ReferencedColumns PIO LIO PBE Elapsed Rows";
const int header_parts = 10;
lines.push_back(header);
for (auto iter1 = tok1.begin(); iter1 != tok1.end(); ++iter1)
{
if (!iter1->empty())
lines.push_back(*iter1);
}
std::vector<unsigned> lens;
for (int i = 0; i < header_parts; i++)
lens.push_back(0);
std::vector<std::vector<std::string>> lineparts;
std::vector<std::string>::iterator iter2;
int j;
for (iter2 = lines.begin(), j = 0; iter2 != lines.end(); ++iter2, j++)
{
CharSeparator sep2(" ");
Tokeniser tok2(*iter2, sep2);
int i;
std::vector<std::string> parts;
Tokeniser::iterator iter3;
for (iter3 = tok2.begin(), i = 0; iter3 != tok2.end(); ++iter3, i++)
{
if (i >= header_parts)
break;
std::string part(*iter3);
if (j != 0 && i == 8)
part.resize(part.size() - 3);
assert(i < header_parts);
if (part.size() > lens[i])
lens[i] = part.size();
parts.push_back(part);
}
assert(i == header_parts);
lineparts.push_back(parts);
}
std::ostringstream oss;
std::vector<std::vector<std::string>>::iterator iter1 = lineparts.begin();
std::vector<std::vector<std::string>>::iterator end1 = lineparts.end();
oss << "\n";
while (iter1 != end1)
{
std::vector<std::string>::iterator iter2 = iter1->begin();
std::vector<std::string>::iterator end2 = iter1->end();
assert(distance(iter2, end2) == header_parts);
int i = 0;
while (iter2 != end2)
{
assert(i < header_parts);
oss << std::setw(lens[i]) << std::left << *iter2 << " ";
++iter2;
i++;
}
oss << "\n";
++iter1;
}
return oss.str();
}
int ServiceExeMgr::Child()
{
// Make sure CSC thinks it's on a UM or else bucket reuse stuff below will stall
if (!m_e)
setenv("CALPONT_CSC_IDENT", "um", 1);
setupSignalHandlers();
int err = 0;
if (!m_debug)
err = setupResources();
std::string errMsg;
switch (err)
{
case -1:
case -3: errMsg = "Error getting file limits, please see non-root install documentation"; break;
case -2: errMsg = "Error setting file limits, please see non-root install documentation"; break;
case -4:
errMsg = "Could not install file limits to required value, please see non-root install documentation";
break;
default: errMsg = "Couldn't change working directory or unknown error"; break;
}
cleanTempDir();
logging::MsgMap msgMap;
msgMap[logDefaultMsg] = logging::Message(logDefaultMsg);
msgMap[logDbProfStartStatement] = logging::Message(logDbProfStartStatement);
msgMap[logDbProfEndStatement] = logging::Message(logDbProfEndStatement);
msgMap[logStartSql] = logging::Message(logStartSql);
msgMap[logEndSql] = logging::Message(logEndSql);
msgMap[logRssTooBig] = logging::Message(logRssTooBig);
msgMap[logDbProfQueryStats] = logging::Message(logDbProfQueryStats);
msgMap[logExeMgrExcpt] = logging::Message(logExeMgrExcpt);
msgLog_.msgMap(msgMap);
dec_ = joblist::DistributedEngineComm::instance(rm_, true);
dec_->Open();
bool tellUser = true;
messageqcpp::MessageQueueServer* mqs;
statementsRunningCount_ = new ActiveStatementCounter(rm_->getEmExecQueueSize());
const std::string ExeMgr = "ExeMgr1";
for (;;)
{
try
{
mqs = new messageqcpp::MessageQueueServer(ExeMgr, rm_->getConfig(), messageqcpp::ByteStream::BlockSize,
64);
break;
}
catch (std::runtime_error& re)
{
std::string what = re.what();
if (what.find("Address already in use") != std::string::npos)
{
if (tellUser)
{
std::cerr << "Address already in use, retrying..." << std::endl;
tellUser = false;
}
sleep(5);
}
else
{
throw;
}
}
}
// class jobstepThreadPool is used by other processes. We can't call
// resourcemanaager (rm) functions during the static creation of threadpool
// because rm has a "isExeMgr" flag that is set upon creation (rm is a singleton).
// From the pools perspective, it has no idea if it is ExeMgr doing the
// creation, so it has no idea which way to set the flag. So we set the max here.
joblist::JobStep::jobstepThreadPool.setMaxThreads(rm_->getJLThreadPoolSize());
joblist::JobStep::jobstepThreadPool.setName("ExeMgrJobList");
if (rm_->getJlThreadPoolDebug() == "Y" || rm_->getJlThreadPoolDebug() == "y")
{
joblist::JobStep::jobstepThreadPool.setDebug(true);
joblist::JobStep::jobstepThreadPool.invoke(
threadpool::ThreadPoolMonitor(&joblist::JobStep::jobstepThreadPool));
}
int serverThreads = rm_->getEmServerThreads();
int maxPct = rm_->getEmMaxPct();
int pauseSeconds = rm_->getEmSecondsBetweenMemChecks();
int priority = rm_->getEmPriority();
FEMsgHandler::threadPool.setMaxThreads(serverThreads);
FEMsgHandler::threadPool.setName("FEMsgHandler");
if (maxPct > 0)
{
// Defined in rssmonfcn.cpp
exemgr::startRssMon(maxPct, pauseSeconds);
}
setpriority(PRIO_PROCESS, 0, priority);
std::string teleServerHost(rm_->getConfig()->getConfig("QueryTele", "Host"));
if (!teleServerHost.empty())
{
int teleServerPort = toInt(rm_->getConfig()->getConfig("QueryTele", "Port"));
if (teleServerPort > 0)
{
teleServerParms_ = querytele::QueryTeleServerParms(teleServerHost, teleServerPort);
}
}
std::cout << "Starting ExeMgr: st = " << serverThreads << ", qs = " << rm_->getEmExecQueueSize()
<< ", mx = " << maxPct << ", cf = " << rm_->getConfig()->configFile() << std::endl;
{
BRM::DBRM* dbrm = new BRM::DBRM();
dbrm->setSystemQueryReady(true);
delete dbrm;
}
threadpool::ThreadPool exeMgrThreadPool(serverThreads, 0);
exeMgrThreadPool.setName("ExeMgrServer");
if (rm_->getExeMgrThreadPoolDebug() == "Y" || rm_->getExeMgrThreadPoolDebug() == "y")
{
exeMgrThreadPool.setDebug(true);
exeMgrThreadPool.invoke(threadpool::ThreadPoolMonitor(&exeMgrThreadPool));
}
// Load statistics.
try
{
statistics::StatisticsManager::instance()->loadFromFile();
}
catch (...)
{
std::cerr << "Cannot load statistics from file " << std::endl;
}
for (;;)
{
messageqcpp::IOSocket ios;
ios = mqs->accept();
exeMgrThreadPool.invoke(exemgr::SQLFrontSessionThread(ios, dec_, rm_));
}
exeMgrThreadPool.wait();
return 0;
}
} // namespace exemgr