1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-5001 This patch merges ExeMgr and PrimProc runtimes

EM and PP are the most resource-hungry runtimes.
        The merge makes it possible to control their cumulative
        resource consumption and thread allocation, and enables
        zero-copy data exchange between local EM and PP facilities.
This commit is contained in:
Roman Nozdrin
2022-03-02 17:13:29 +00:00
committed by Roman Nozdrin
parent 2ec502aaf3
commit e174696351
23 changed files with 2443 additions and 111 deletions

View File

@ -292,7 +292,7 @@ void DistributedEngineComm::Setup()
catch (std::exception& ex)
{
if (i < newPmCount)
newPmCount--;
newPmCount = newPmCount > 1 ? newPmCount-1 : 1; // We can't afford to reduce newPmCount to 0
writeToLog(__FILE__, __LINE__,
"Could not connect to PMS" + std::to_string(connectionId) + ": " + ex.what(),
@ -302,7 +302,7 @@ void DistributedEngineComm::Setup()
catch (...)
{
if (i < newPmCount)
newPmCount--;
newPmCount = newPmCount > 1 ? newPmCount-1 : 1; // We can't afford to reduce newPmCount to 0
writeToLog(__FILE__, __LINE__, "Could not connect to PMS" + std::to_string(connectionId),
LOG_TYPE_ERROR);

View File

@ -1,4 +1,5 @@
/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2022 Mariadb Corporation.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -40,39 +41,25 @@ using namespace config;
namespace joblist
{
// const string ResourceManager::fExeMgrStr("ExeMgr1");
const string ResourceManager::fHashJoinStr("HashJoin");
const string ResourceManager::fHashBucketReuseStr("HashBucketReuse");
const string ResourceManager::fJobListStr("JobList");
const string ResourceManager::fPrimitiveServersStr("PrimitiveServers");
// const string ResourceManager::fSystemConfigStr("SystemConfig");
const string ResourceManager::fTupleWSDLStr("TupleWSDL");
const string ResourceManager::fZDLStr("ZDL");
const string ResourceManager::fExtentMapStr("ExtentMap");
// const string ResourceManager::fDMLProcStr("DMLProc");
// const string ResourceManager::fBatchInsertStr("BatchInsert");
const string ResourceManager::fOrderByLimitStr("OrderByLimit");
const string ResourceManager::fRowAggregationStr("RowAggregation");
ResourceManager* ResourceManager::fInstance = NULL;
boost::mutex mx;
ResourceManager* ResourceManager::instance(bool runningInExeMgr)
ResourceManager* ResourceManager::instance(bool runningInExeMgr, config::Config* aConfig)
{
boost::mutex::scoped_lock lk(mx);
if (!fInstance)
fInstance = new ResourceManager(runningInExeMgr);
fInstance = new ResourceManager(runningInExeMgr, aConfig);
return fInstance;
}
ResourceManager::ResourceManager(bool runningInExeMgr)
ResourceManager::ResourceManager(bool runningInExeMgr, config::Config* aConfig)
: fExeMgrStr("ExeMgr1")
, fSystemConfigStr("SystemConfig")
, fDMLProcStr("DMLProc")
, fBatchInsertStr("BatchInsert")
, fConfig(Config::makeConfig())
, fConfig(aConfig == nullptr ? Config::makeConfig() : aConfig)
, fNumCores(8)
, fHjNumThreads(defaultNumThreads)
, fJlProcessorThreadsPerScan(defaultProcessorThreadsPerScan)

View File

@ -1,4 +1,5 @@
/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2022 Mariadb Corporation.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -138,11 +139,8 @@ class ResourceManager
/** @brief ctor
*
*/
EXPORT ResourceManager(bool runningInExeMgr = false);
static ResourceManager* instance(bool runningInExeMgr = false);
// ResourceManager(const config::Config *cf);
// ResourceManager(const std::string& config);
// passed by ExeMgr and DistributedEngineComm to MessageQueueServer or -Client
EXPORT ResourceManager(bool runningInExeMgr = false, config::Config *aConfig = nullptr);
static ResourceManager* instance(bool runningInExeMgr = false, config::Config *aConfig = nullptr);
config::Config* getConfig()
{
return fConfig;
@ -577,19 +575,19 @@ class ResourceManager
void logMessage(logging::LOG_TYPE logLevel, logging::Message::MessageID mid, uint64_t value = 0,
uint32_t sessionId = 0);
/*static const*/ std::string fExeMgrStr;
static const std::string fHashJoinStr;
static const std::string fHashBucketReuseStr;
static const std::string fJobListStr;
static const std::string fPrimitiveServersStr;
std::string fExeMgrStr;
inline static const std::string fHashJoinStr = "HashJoin";
inline static const std::string fHashBucketReuseStr = "HashBucketReuse";
inline static const std::string fJobListStr = "JobList";
inline static const std::string fPrimitiveServersStr = "PrimitiveServers";
/*static const*/ std::string fSystemConfigStr;
static const std::string fTupleWSDLStr;
static const std::string fZDLStr;
static const std::string fExtentMapStr;
inline static const std::string fTupleWSDLStr = "TupleWSDL";
inline static const std::string fZDLStr = "ZDL";
inline static const std::string fExtentMapStr = "ExtentMap";
/*static const*/ std::string fDMLProcStr;
/*static const*/ std::string fBatchInsertStr;
static const std::string fOrderByLimitStr;
static const std::string fRowAggregationStr;
inline static const std::string fOrderByLimitStr = "OrderByLimit";
inline static const std::string fRowAggregationStr = "RowAggregation";
config::Config* fConfig;
static ResourceManager* fInstance;
uint32_t fTraceFlags;

View File

@ -6,7 +6,6 @@ etc/mysql/mariadb.conf.d/columnstore.cnf
usr/bin/mcsRebuildEM
usr/bin/DDLProc
usr/bin/DMLProc
usr/bin/ExeMgr
usr/bin/PrimProc
usr/bin/StorageManager
usr/bin/WriteEngineServer

View File

@ -1,18 +1,18 @@
include_directories( ${ENGINE_COMMON_INCLUDES} )
# include_directories( ${ENGINE_COMMON_INCLUDES} )
########### next target ###############
set(ExeMgr_SRCS serviceexemgr.cpp sqlfrontsessionthread.cpp rssmonfcn.cpp exemgr.cpp activestatementcounter.cpp femsghandler.cpp ../utils/common/crashtrace.cpp)
# set(ExeMgr_SRCS serviceexemgr.cpp sqlfrontsessionthread.cpp rssmonfcn.cpp exemgr.cpp activestatementcounter.cpp femsghandler.cpp ../utils/common/crashtrace.cpp)
add_executable(ExeMgr ${ExeMgr_SRCS})
# add_executable(ExeMgr ${ExeMgr_SRCS})
target_link_libraries(ExeMgr ${ENGINE_LDFLAGS} ${ENGINE_EXEC_LIBS} ${NETSNMP_LIBRARIES} ${MARIADB_CLIENT_LIBS} cacheutils threadpool)
# target_link_libraries(ExeMgr ${ENGINE_LDFLAGS} ${ENGINE_EXEC_LIBS} ${NETSNMP_LIBRARIES} ${MARIADB_CLIENT_LIBS} cacheutils threadpool)
target_include_directories(ExeMgr PRIVATE ${Boost_INCLUDE_DIRS})
# target_include_directories(ExeMgr PRIVATE ${Boost_INCLUDE_DIRS})
install(TARGETS ExeMgr DESTINATION ${ENGINE_BINDIR} COMPONENT columnstore-engine)
# install(TARGETS ExeMgr DESTINATION ${ENGINE_BINDIR} COMPONENT columnstore-engine)
########### install files ###############

View File

@ -84,6 +84,7 @@ namespace exemgr
}
}
}
Opt(): m_debug(0), m_e(false), m_fg(false) {};
int getDebugLevel() const
{
return m_debug;

View File

@ -6,15 +6,18 @@ PartOf=mcs-primproc.service
After=network.target mcs-primproc.service
[Service]
Type=forking
Type=oneshot
User=@DEFAULT_USER@
Group=@DEFAULT_GROUP@
LimitNOFILE=65536
LimitNPROC=65536
ExecStartPre=/usr/bin/env bash -c "ldconfig -p | grep -m1 libjemalloc > /dev/null || echo 'Please install jemalloc to avoid ColumnStore performance degradation and unexpected service interruptions.'"
ExecStart=/usr/bin/env bash -c "LD_PRELOAD=$(ldconfig -p | grep -m1 libjemalloc | awk '{print $1}') exec @ENGINE_BINDIR@/ExeMgr"
#ExecStartPre=/usr/bin/env bash -c "ldconfig -p | grep -m1 libjemalloc > /dev/null || echo 'Please install jemalloc to avoid ColumnStore performance degradation and unexpected service interruptions.'"
#ExecStart=/usr/bin/env bash -c "LD_PRELOAD=$(ldconfig -p | grep -m1 libjemalloc | awk '{print $1}') exec @ENGINE_BINDIR@/ExeMgr"
ExecStart=/bin/echo 'EM dummy start'
RemainAfterExit=yes
Restart=on-failure
TimeoutStopSec=2

View File

@ -19,14 +19,17 @@ set(PrimProc_SRCS
pseudocc.cpp
rtscommand.cpp
umsocketselector.cpp
serviceexemgr.cpp
sqlfrontsessionthread.cpp
rssmonfcn.cpp
activestatementcounter.cpp
femsghandler.cpp
../../utils/common/crashtrace.cpp)
add_executable(PrimProc ${PrimProc_SRCS})
add_dependencies(PrimProc loggingcpp)
target_include_directories(PrimProc PRIVATE ${Boost_INCLUDE_DIRS})
target_link_libraries(PrimProc ${ENGINE_LDFLAGS} ${NETSNMP_LIBRARIES} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} threadpool cacheutils dbbc processor)
install(TARGETS PrimProc DESTINATION ${ENGINE_BINDIR} COMPONENT columnstore-engine)

View File

@ -0,0 +1,59 @@
/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: activestatementcounter.cpp 940 2013-01-21 14:11:31Z rdempsey $
//
#include <unistd.h>
#include <boost/thread/mutex.hpp>
using namespace boost;
#include "activestatementcounter.h"
void ActiveStatementCounter::incr(bool& counted)
{
if (counted)
return;
counted = true;
boost::mutex::scoped_lock lk(fMutex);
if (upperLimit > 0)
while (fStatementCount >= upperLimit)
{
fStatementsWaiting++;
condvar.wait(lk);
--fStatementsWaiting;
}
fStatementCount++;
}
// Unregisters one running statement for the caller.
//
// Does nothing when `counted` is false; otherwise clears the flag, lowers the
// running count (never below zero) and wakes one waiter blocked in incr().
void ActiveStatementCounter::decr(bool& counted)
{
  if (!counted)
  {
    return;
  }

  counted = false;
  boost::mutex::scoped_lock guard(fMutex);

  if (fStatementCount > 0)
  {
    --fStatementCount;
    condvar.notify_one();
  }
}

View File

@ -0,0 +1,64 @@
/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: activestatementcounter.h 940 2013-01-21 14:11:31Z rdempsey $
//
/** @file */
#pragma once
#include <stdint.h>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include "vss.h"
/** @brief Counter of currently running statements with an optional upper limit.
 *
 * incr()/decr() are declared here and defined out of line. The object owns a
 * mutex and a condition variable and is therefore non-copyable.
 */
class ActiveStatementCounter
{
 public:
  /** @param limit value stored in upperLimit; both counters start at zero. */
  ActiveStatementCounter(uint32_t limit) : fStatementCount(0), upperLimit(limit), fStatementsWaiting(0)
  {
  }
  virtual ~ActiveStatementCounter() = default;

  // Copying is rejected at compile time instead of relying on declared-but-undefined
  // private members, which only failed at link time for members and friends.
  ActiveStatementCounter(const ActiveStatementCounter& rhs) = delete;
  ActiveStatementCounter& operator=(const ActiveStatementCounter& rhs) = delete;

  void incr(bool& counted);
  void decr(bool& counted);

  /** @return the current value of fStatementCount (read without taking fMutex). */
  uint32_t cur() const
  {
    return fStatementCount;
  }

  /** @return the current value of fStatementsWaiting (read without taking fMutex). */
  uint32_t waiting() const
  {
    return fStatementsWaiting;
  }

 private:
  uint32_t fStatementCount;
  uint32_t upperLimit;
  uint32_t fStatementsWaiting;
  boost::mutex fMutex;
  boost::condition condvar;
  // NOTE(review): not referenced by any member function declared in this class;
  // confirm whether constructing it is required before removing.
  BRM::VSS fVss;
};

View File

@ -0,0 +1,143 @@
/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include "messagequeue.h"
#include "iosocket.h"
#include "femsghandler.h"
using namespace std;
using namespace joblist;
using namespace messageqcpp;
// Out-of-class definition of the static pool shared by all FEMsgHandler objects;
// start() submits its polling task to this pool.
threadpool::ThreadPool FEMsgHandler::threadPool;
namespace
{
// Copyable functor handed to the thread pool; invoking it runs
// FEMsgHandler::threadFcn() on the handler it was built with.
class Runner
{
 public:
  Runner(FEMsgHandler* handler) : target(handler)
  {
  }

  void operator()()
  {
    target->threadFcn();
  }

  // Non-owning pointer to the handler whose loop is executed.
  FEMsgHandler* target;
};
}  // namespace
// Default constructor: no socket and no job list yet; use setSocket()/setJobList()
// before start().
//
// thr is initialised to 0 so that the destructor's threadPool.join(thr) never
// reads an indeterminate value when start() was not called.
// NOTE(review): assumes the pool never hands out 0 as a real handle - confirm
// against ThreadPool::invoke().
FEMsgHandler::FEMsgHandler() : die(false), running(false), sawData(false), sock(nullptr), thr(0)
{
}

// Constructs a handler bound to job list `j` and socket `s`.
// `s` is not owned and must be non-null (checked with assert).
FEMsgHandler::FEMsgHandler(boost::shared_ptr<JobList> j, IOSocket* s)
 : die(false), running(false), sawData(false), sock(s), jl(j), thr(0)
{
  assert(sock);
}
// Requests termination through stop() and then asks the shared pool to join
// the handle stored in thr.
FEMsgHandler::~FEMsgHandler()
{
stop();
threadPool.join(thr);
}
// Submits the polling loop to the shared thread pool unless it is already
// marked as running; the returned pool handle is kept in thr.
void FEMsgHandler::start()
{
  if (running)
  {
    return;
  }

  running = true;
  thr = threadPool.invoke(Runner(this));
}
// Sets the die flag (the polling loop in threadFcn() checks it on every
// iteration) and drops this handler's reference to the job list.
void FEMsgHandler::stop()
{
die = true;
jl.reset();
}
// Replaces the job list that threadFcn() aborts when it detects an abort or
// connection event.
void FEMsgHandler::setJobList(boost::shared_ptr<JobList> j)
{
jl = j;
}
// Stores the (non-owned) socket to poll; a null pointer trips the assert.
void FEMsgHandler::setSocket(IOSocket* i)
{
sock = i;
assert(sock);
}
/* Note, the next two fcns strongly depend on ExeMgr's current implementation. There's a
* good chance that if ExeMgr's table send loop is changed, these will need to be
* updated to match.
*/
/* This is currently only called if InetStreamSocket::write() throws, implying
* a connection error. It might not make sense in other contexts.
*/
// Returns true when data has been seen on the socket, either earlier (sawData
// already set) or by a poll of up to 1000 (pollConnection's timeout argument)
// performed here under the handler mutex.
bool FEMsgHandler::aborted()
{
  if (sawData)
  {
    return true;
  }

  boost::mutex::scoped_lock guard(mutex);
  const int connectionNum = sock->getConnectionNum();
  const int pollResult = InetStreamSocket::pollConnection(connectionNum, 1000);

  if (pollResult != 1)
  {
    return false;
  }

  // A result of 1 is treated as "readable data present".
  sawData = true;
  return true;
}
// Body of the pool task started by start().
//
// Polls the socket until either stop() was requested or the poll reports
// something other than 0. An abort is signalled by the peer sending something
// (anything at the moment) and then dropping the connection; every other
// event simply ends the loop.
void FEMsgHandler::threadFcn()
{
  const int connectionNum = sock->getConnectionNum();
  int pollResult = 0;

  while (!die && pollResult == 0)
  {
    // The mutex is taken per iteration, so aborted() can interleave its own poll.
    boost::mutex::scoped_lock guard(mutex);
    pollResult = InetStreamSocket::pollConnection(connectionNum, 1000);
  }

  const bool dataArrived = (pollResult == 1);

  if (dataArrived)
  {
    sawData = true;  // there's data to read, must be the abort signal
  }

  // NOTE(review): result 2 is handled like an abort here; presumably it means
  // the connection was dropped - confirm against pollConnection().
  // NOTE(review): die and jl are also written by stop() without a lock.
  if (!die && (dataArrived || pollResult == 2))
  {
    die = true;
    jl->abort();
    jl.reset();
  }

  running = false;
}

View File

@ -0,0 +1,47 @@
/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#pragma once
#include "joblist.h"
#include "inetstreamsocket.h"
#include "threadpool.h"
/** @brief Watches a front-end socket while a job list runs and aborts the job
 *  list when the polling loop sees an abort or connection event.
 */
class FEMsgHandler
{
 public:
  FEMsgHandler();
  FEMsgHandler(boost::shared_ptr<joblist::JobList>, messageqcpp::IOSocket*);
  virtual ~FEMsgHandler();

  // Lifecycle of the polling task.
  void start();
  void stop();

  // Late binding of the collaborators for default-constructed handlers.
  void setJobList(boost::shared_ptr<joblist::JobList>);
  void setSocket(messageqcpp::IOSocket*);

  bool aborted();

  // Entry point executed on a pool thread.
  void threadFcn();

  // Pool shared by every handler instance.
  static threadpool::ThreadPool threadPool;

 private:
  bool die;
  bool running;
  bool sawData;
  messageqcpp::IOSocket* sock;  // not owned
  boost::shared_ptr<joblist::JobList> jl;
  boost::mutex mutex;
  uint64_t thr;  // handle returned by threadPool.invoke()
};

View File

@ -945,7 +945,6 @@ struct AsynchLoader
QueryContext ver;
uint32_t txn;
int compType;
uint8_t dataWidth;
bool LBIDTrace;
uint32_t sessionID;
uint32_t* cacheCount;
@ -1420,8 +1419,8 @@ struct BPPHandler
/* Uncomment these lines to verify duplicate(). == op might need updating */
// if (*bpp != *dup)
// cerr << "createBPP: duplicate mismatch at index " << i <<
// endl;
// cerr << "createBPP: duplicate mismatch at index " << i
// << endl;
// idbassert(*bpp == *dup);
bppv->add(dup);
}
@ -2441,7 +2440,7 @@ PrimitiveServer::~PrimitiveServer()
{
}
void PrimitiveServer::start(Service* service)
void PrimitiveServer::start(Service* service, utils::USpaceSpinLock& startupRaceLock)
{
// start all the server threads
for (int i = 1; i <= fServerThreads; i++)
@ -2452,7 +2451,7 @@ void PrimitiveServer::start(Service* service)
fServerpool.invoke(ServerThread(oss.str(), this));
}
startupRaceLock.release();
service->NotifyServiceStarted();
fServerpool.wait();

View File

@ -124,7 +124,7 @@ class PrimitiveServer
/** @brief start the primitive server
*
*/
void start(Service* p);
void start(Service* p, utils::USpaceSpinLock& startupRaceLock);
/** @brief get a pointer the shared processor thread pool
*/

View File

@ -1,5 +1,5 @@
/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016 MariaDB Corporation
Copyright (C) 2016-2022 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -30,15 +30,15 @@
#endif
#include <csignal>
#include <sys/time.h>
#ifndef _MSC_VER
#include <sys/resource.h>
#include <tr1/unordered_set>
#else
#include <unordered_set>
#endif
#include <clocale>
#include <iterator>
#include <algorithm>
#include <thread>
#include <mutex>
#include <condition_variable>
//#define NDEBUG
#include <cassert>
using namespace std;
@ -72,7 +72,9 @@ using namespace idbdatafile;
#include "mariadb_my_sys.h"
#include "spinlock.h"
#include "service.h"
#include "serviceexemgr.h"
class Opt
{
@ -115,6 +117,14 @@ class ServicePrimProc : public Service, public Opt
{
return m_fg ? Child() : RunForking();
}
std::atomic_flag& getStartupRaceFlag()
{
return startupRaceFlag_;
}
private:
// Since C++20 flag's init value is false.
std::atomic_flag startupRaceFlag_;
};
namespace primitiveprocessor
@ -152,28 +162,11 @@ int toInt(const string& val)
void setupSignalHandlers()
{
#ifndef _MSC_VER
signal(SIGHUP, SIG_IGN);
struct sigaction ign;
memset(&ign, 0, sizeof(ign));
ign.sa_handler = SIG_IGN;
sigaction(SIGPIPE, &ign, 0);
memset(&ign, 0, sizeof(ign));
ign.sa_handler = SIG_IGN;
sigaction(SIGUSR1, &ign, 0);
memset(&ign, 0, sizeof(ign));
ign.sa_handler = SIG_IGN;
sigaction(SIGUSR2, &ign, 0);
memset(&ign, 0, sizeof(ign));
ign.sa_handler = fatalHandler;
sigaction(SIGSEGV, &ign, 0);
sigaction(SIGABRT, &ign, 0);
sigaction(SIGFPE, &ign, 0);
sigset_t sigset;
sigemptyset(&sigset);
sigaddset(&sigset, SIGPIPE);
@ -382,6 +375,18 @@ int ServicePrimProc::Child()
NotifyServiceInitializationFailed();
return 2;
}
utils::USpaceSpinLock startupRaceLock(getStartupRaceFlag());
std::thread exeMgrThread(
[this, cf]()
{
exemgr::Opt opt;
exemgr::globServiceExeMgr = new exemgr::ServiceExeMgr(opt, cf);
// primitive delay to avoid 'not connected to PM' log error messages
// from EM. PrimitiveServer::start() releases SpinLock after sockets
// are available.
utils::USpaceSpinLock startupRaceLock(this->getStartupRaceFlag());
exemgr::globServiceExeMgr->Child();
});
int serverThreads = 1;
int serverQueueSize = 10;
@ -395,7 +400,6 @@ int ServicePrimProc::Child()
bool rotatingDestination = false;
uint32_t deleteBlocks = 128;
bool PTTrace = false;
int temp;
string strTemp;
int priority = -1;
const string primitiveServers("PrimitiveServers");
@ -414,7 +418,7 @@ int ServicePrimProc::Child()
gDebugLevel = primitiveprocessor::NONE;
temp = toInt(cf->getConfig(primitiveServers, "ServerThreads"));
int temp = toInt(cf->getConfig(primitiveServers, "ServerThreads"));
if (temp > 0)
serverThreads = temp;
@ -761,7 +765,7 @@ int ServicePrimProc::Child()
}
#endif
server.start(this);
server.start(this, startupRaceLock);
cerr << "server.start() exited!" << endl;

View File

@ -0,0 +1,69 @@
/* Copyright (C) 2022 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <thread>
#include "rssmonfcn.h"
#include "serviceexemgr.h"
namespace exemgr
{
// Endless monitoring loop.
//
// Each round computes the resident-set percentage from rss() and fMemTotal.
// When it exceeds fMaxPct the process exits with status 1 if either fMaxPct is
// 95 or more (logged as critical) or no statement is currently running (logged
// as a warning); otherwise only a notice is written to stderr. Every round
// ends with a session-map update and a pause.
void RssMonFcn::operator()() const
{
  logging::Logger& msgLog = globServiceExeMgr->getLogger();
  auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount();

  while (true)
  {
    const size_t rssMb = rss();
    const size_t pct = rssMb * 100 / fMemTotal;

    if (pct > fMaxPct)
    {
      const bool hardLimit = fMaxPct >= 95;

      // Short-circuit keeps the statement counter untouched when hardLimit holds.
      if (hardLimit || statementsRunningCount->cur() == 0)
      {
        std::cerr << "Too much memory allocated!" << std::endl;
        logging::Message::Args args;
        args.add(static_cast<int>(pct));
        args.add(static_cast<int>(fMaxPct));
        msgLog.logMessage(hardLimit ? logging::LOG_TYPE_CRITICAL : logging::LOG_TYPE_WARNING,
                          ServiceExeMgr::logRssTooBig, args, logging::LoggingID(16));
        exit(1);
      }

      std::cerr << "Too much memory allocated, but stmts running" << std::endl;
    }

    // Update sessionMemMap entries lower than current mem % use
    globServiceExeMgr->updateSessionMap(pct);
    pause_();
  }
}
// Starts the RSS monitor on its own thread.
//
// @param maxPct        forwarded to RssMonFcn as its percentage limit
// @param pauseSeconds  forwarded to RssMonFcn as the pause between checks
//
// The monitor loop never returns, so the thread is detached instead of being
// held through a heap-allocated std::thread that was never freed or joined.
void startRssMon(size_t maxPct, int pauseSeconds)
{
  std::thread(RssMonFcn(maxPct, pauseSeconds)).detach();
}
} // namespace

View File

@ -0,0 +1,36 @@
/* Copyright (C) 2022 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#pragma once
#include <iostream>
#include "MonitorProcMem.h"
namespace exemgr
{
// Memory monitor functor: a utils::MonitorProcMem whose call operator
// (defined out of line) runs the check loop.
class RssMonFcn : public utils::MonitorProcMem
{
 public:
  // Forwards to MonitorProcMem(maxPct, 0, 21, pauseSeconds).
  // NOTE(review): the meaning of the fixed 0 and 21 arguments is defined by
  // MonitorProcMem - confirm there.
  RssMonFcn(size_t maxPct, int pauseSeconds)
   : MonitorProcMem(maxPct, 0, 21, pauseSeconds)
  {
  }

  void operator()() const;
};
} // namespace

View File

@ -0,0 +1,457 @@
/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2019-22 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/**********************************************************************
* $Id: main.cpp 1000 2013-07-24 21:05:51Z pleblanc $
*
*
***********************************************************************/
/**
* @brief execution plan manager main program
*
* This is the "main" program for dealing with execution plans and
* result sets. It sits in a loop waiting for CalpontExecutionPlan
* from the FEP. It then passes the CalpontExecutionPlan to the
* JobListFactory from which a JobList is obtained. This is passed
* to the Query Manager running in the real-time portion of the
* EC. The ExecutionPlanManager waits until the Query Manager
* returns a result set for the job list. These results are passed
* into the CalpontResultFactory, which outputs a CalpontResultSet.
* The ExecutionPlanManager passes the CalpontResultSet into the
* VendorResultFactory which produces a result set tailored to the
* specific DBMS front end in use. The ExecutionPlanManager then
* sends the VendorResultSet back to the Calpont Database Connector
* on the Front-End Processor where it is returned to the DBMS
* front-end.
*/
#include <iostream>
#include <cstdint>
#include <csignal>
#include <sys/resource.h>
#undef root_name
#include <boost/filesystem.hpp>
#include "calpontselectexecutionplan.h"
#include "mcsanalyzetableexecutionplan.h"
#include "activestatementcounter.h"
#include "distributedenginecomm.h"
#include "resourcemanager.h"
#include "configcpp.h"
#include "queryteleserverparms.h"
#include "iosocket.h"
#include "joblist.h"
#include "joblistfactory.h"
#include "oamcache.h"
#include "simplecolumn.h"
#include "bytestream.h"
#include "telestats.h"
#include "messageobj.h"
#include "messagelog.h"
#include "sqllogger.h"
#include "femsghandler.h"
#include "idberrorinfo.h"
#include "MonitorProcMem.h"
#include "liboamcpp.h"
#include "crashtrace.h"
#include "service.h"
#include <mutex>
#include <thread>
#include <condition_variable>
#include "dbrm.h"
#include "mariadb_my_sys.h"
#include "statistics.h"
#include "serviceexemgr.h"
#include "sqlfrontsessionthread.h"
namespace exemgr
{
// Process-wide pointer to the ExeMgr service object; null until it is assigned.
// The signal handlers below dereference it.
ServiceExeMgr* globServiceExeMgr = nullptr;
// Forward declaration of the RSS monitor launcher used by ServiceExeMgr::Child().
void startRssMon(size_t maxPct, int pauseSeconds);
void added_a_pm(int)
{
logging::LoggingID logid(21, 0, 0);
logging::Message::Args args1;
logging::Message msg(1);
args1.add("exeMgr caught SIGHUP. Resetting connections");
msg.format(args1);
std::cout << msg.msg().c_str() << std::endl;
logging::Logger logger(logid.fSubsysID);
logger.logMessage(logging::LOG_TYPE_DEBUG, msg, logid);
auto* dec = exemgr::globServiceExeMgr->getDec();
if (dec)
{
oam::OamCache* oamCache = oam::OamCache::makeOamCache();
oamCache->forceReload();
dec->Setup();
}
}
// SIGUSR1 handler: prints the resource manager's availableMemory() to stdout.
// The signal number is not used.
void printTotalUmMemory(int sig)
{
  const int64_t available = globServiceExeMgr->getRm().availableMemory();
  std::cout << "Total UM memory available: " << available << std::endl;
}
// Installs the service's signal dispositions:
//   SIGPIPE                   -> ignored
//   SIGHUP                    -> added_a_pm
//   SIGUSR1                   -> printTotalUmMemory
//   SIGSEGV, SIGABRT, SIGFPE  -> fatalHandler
// Every registration uses an otherwise zeroed struct sigaction.
void ServiceExeMgr::setupSignalHandlers()
{
  const auto install = [](int signum, void (*handler)(int))
  {
    struct sigaction action;
    memset(&action, 0, sizeof(action));
    action.sa_handler = handler;
    sigaction(signum, &action, 0);
  };

  install(SIGPIPE, SIG_IGN);
  install(SIGHUP, exemgr::added_a_pm);
  install(SIGUSR1, exemgr::printTotalUmMemory);
  install(SIGSEGV, fatalHandler);
  install(SIGABRT, fatalHandler);
  install(SIGFPE, fatalHandler);
}
void cleanTempDir()
{
using TempDirPurpose = config::Config::TempDirPurpose;
struct Dirs
{
std::string section;
std::string allowed;
TempDirPurpose purpose;
};
std::vector<Dirs> dirs{{"HashJoin", "AllowDiskBasedJoin", TempDirPurpose::Joins},
{"RowAggregation", "AllowDiskBasedAggregation", TempDirPurpose::Aggregates}};
const auto config = config::Config::makeConfig();
for (const auto& dir : dirs)
{
std::string allowStr = config->getConfig(dir.section, dir.allowed);
bool allow = (allowStr == "Y" || allowStr == "y");
std::string tmpPrefix = config->getTempFileDir(dir.purpose);
if (allow && tmpPrefix.empty())
{
std::cerr << "Empty tmp directory name for " << dir.section << std::endl;
logging::LoggingID logid(16, 0, 0);
logging::Message::Args args;
logging::Message message(8);
args.add("Empty tmp directory name for:");
args.add(dir.section);
message.format(args);
logging::Logger logger(logid.fSubsysID);
logger.logMessage(logging::LOG_TYPE_CRITICAL, message, logid);
}
tmpPrefix += "/";
idbassert(tmpPrefix != "/");
/* This is quite scary as ExeMgr usually runs as root */
try
{
if (allow)
{
boost::filesystem::remove_all(tmpPrefix);
}
boost::filesystem::create_directories(tmpPrefix);
}
catch (const std::exception& ex)
{
std::cerr << ex.what() << std::endl;
logging::LoggingID logid(16, 0, 0);
logging::Message::Args args;
logging::Message message(8);
args.add("Exception whilst cleaning tmpdir: ");
args.add(ex.what());
message.format(args);
logging::Logger logger(logid.fSubsysID);
logger.logMessage(logging::LOG_TYPE_WARNING, message, logid);
}
catch (...)
{
std::cerr << "Caught unknown exception during tmpdir cleanup" << std::endl;
logging::LoggingID logid(16, 0, 0);
logging::Message::Args args;
logging::Message message(8);
args.add("Unknown exception whilst cleaning tmpdir");
message.format(args);
logging::Logger logger(logid.fSubsysID);
logger.logMessage(logging::LOG_TYPE_WARNING, message, logid);
}
}
}
// Renders the per-step mini statistics as a left-aligned, column-padded table.
//
// @param in  newline-separated rows; each row holds space-separated fields in
//            the order named by the header below.
// @return    a string starting with "\n", followed by the header row and one
//            row per non-empty input line, each field padded to the widest
//            value of its column.
//
// Only the first 10 fields of a row are used. For data rows the last three
// characters of the Elapsed field (column 8) are dropped; a field shorter than
// three characters is emptied instead of underflowing the size computation.
// Rows with fewer than 10 fields trip the asserts in debug builds.
const std::string ServiceExeMgr::prettyPrintMiniInfo(const std::string& in)
{
  using CharTraits = std::basic_string<char>::traits_type;
  using CharSeparator = boost::char_separator<char, CharTraits>;
  using Tokeniser = boost::tokenizer<boost::char_separator<char, CharTraits>>;

  const std::string header = "Desc Mode Table TableOID ReferencedColumns PIO LIO PBE Elapsed Rows";
  const int header_parts = 10;
  const int elapsedColumn = 8;
  const std::string::size_type elapsedSuffixLen = 3;

  // 1. Split the input into lines, with the header as line 0.
  CharSeparator sep1("\n");
  Tokeniser tok1(in, sep1);
  std::vector<std::string> lines;
  lines.push_back(header);

  for (auto iter1 = tok1.begin(); iter1 != tok1.end(); ++iter1)
  {
    if (!iter1->empty())
      lines.push_back(*iter1);
  }

  // 2. Split every line into fields and track the widest value per column.
  std::vector<unsigned> lens(header_parts, 0);
  std::vector<std::vector<std::string>> lineparts;
  int j = 0;

  for (auto iter2 = lines.begin(); iter2 != lines.end(); ++iter2, j++)
  {
    CharSeparator sep2(" ");
    Tokeniser tok2(*iter2, sep2);
    std::vector<std::string> parts;
    int i = 0;

    for (auto iter3 = tok2.begin(); iter3 != tok2.end(); ++iter3, i++)
    {
      if (i >= header_parts)
        break;

      std::string part(*iter3);

      if (j != 0 && i == elapsedColumn)
      {
        // Never subtract past zero: size() is unsigned.
        part.resize(part.size() > elapsedSuffixLen ? part.size() - elapsedSuffixLen : 0);
      }

      assert(i < header_parts);

      if (part.size() > lens[i])
        lens[i] = part.size();

      parts.push_back(part);
    }

    assert(i == header_parts);
    lineparts.push_back(parts);
  }

  // 3. Emit the padded table.
  std::ostringstream oss;
  oss << "\n";

  for (auto row = lineparts.begin(); row != lineparts.end(); ++row)
  {
    assert(distance(row->begin(), row->end()) == header_parts);
    int i = 0;

    for (auto field = row->begin(); field != row->end(); ++field, i++)
    {
      assert(i < header_parts);
      oss << std::setw(lens[i]) << std::left << *field << " ";
    }

    oss << "\n";
  }

  return oss.str();
}
int ServiceExeMgr::Child()
{
// Make sure CSC thinks it's on a UM or else bucket reuse stuff below will stall
if (!m_e)
setenv("CALPONT_CSC_IDENT", "um", 1);
setupSignalHandlers();
int err = 0;
if (!m_debug)
err = setupResources();
std::string errMsg;
switch (err)
{
case -1:
case -3: errMsg = "Error getting file limits, please see non-root install documentation"; break;
case -2: errMsg = "Error setting file limits, please see non-root install documentation"; break;
case -4:
errMsg = "Could not install file limits to required value, please see non-root install documentation";
break;
default: errMsg = "Couldn't change working directory or unknown error"; break;
}
cleanTempDir();
logging::MsgMap msgMap;
msgMap[logDefaultMsg] = logging::Message(logDefaultMsg);
msgMap[logDbProfStartStatement] = logging::Message(logDbProfStartStatement);
msgMap[logDbProfEndStatement] = logging::Message(logDbProfEndStatement);
msgMap[logStartSql] = logging::Message(logStartSql);
msgMap[logEndSql] = logging::Message(logEndSql);
msgMap[logRssTooBig] = logging::Message(logRssTooBig);
msgMap[logDbProfQueryStats] = logging::Message(logDbProfQueryStats);
msgMap[logExeMgrExcpt] = logging::Message(logExeMgrExcpt);
msgLog_.msgMap(msgMap);
dec_ = joblist::DistributedEngineComm::instance(rm_, true);
dec_->Open();
bool tellUser = true;
messageqcpp::MessageQueueServer* mqs;
statementsRunningCount_ = new ActiveStatementCounter(rm_->getEmExecQueueSize());
const std::string ExeMgr = "ExeMgr1";
for (;;)
{
try
{
mqs = new messageqcpp::MessageQueueServer(ExeMgr, rm_->getConfig(), messageqcpp::ByteStream::BlockSize,
64);
break;
}
catch (std::runtime_error& re)
{
std::string what = re.what();
if (what.find("Address already in use") != std::string::npos)
{
if (tellUser)
{
std::cerr << "Address already in use, retrying..." << std::endl;
tellUser = false;
}
sleep(5);
}
else
{
throw;
}
}
}
// class jobstepThreadPool is used by other processes. We can't call
// resourcemanaager (rm) functions during the static creation of threadpool
// because rm has a "isExeMgr" flag that is set upon creation (rm is a singleton).
// From the pools perspective, it has no idea if it is ExeMgr doing the
// creation, so it has no idea which way to set the flag. So we set the max here.
joblist::JobStep::jobstepThreadPool.setMaxThreads(rm_->getJLThreadPoolSize());
joblist::JobStep::jobstepThreadPool.setName("ExeMgrJobList");
if (rm_->getJlThreadPoolDebug() == "Y" || rm_->getJlThreadPoolDebug() == "y")
{
joblist::JobStep::jobstepThreadPool.setDebug(true);
joblist::JobStep::jobstepThreadPool.invoke(
threadpool::ThreadPoolMonitor(&joblist::JobStep::jobstepThreadPool));
}
int serverThreads = rm_->getEmServerThreads();
int maxPct = rm_->getEmMaxPct();
int pauseSeconds = rm_->getEmSecondsBetweenMemChecks();
int priority = rm_->getEmPriority();
FEMsgHandler::threadPool.setMaxThreads(serverThreads);
FEMsgHandler::threadPool.setName("FEMsgHandler");
if (maxPct > 0)
{
// Defined in rssmonfcn.cpp
exemgr::startRssMon(maxPct, pauseSeconds);
}
setpriority(PRIO_PROCESS, 0, priority);
std::string teleServerHost(rm_->getConfig()->getConfig("QueryTele", "Host"));
if (!teleServerHost.empty())
{
int teleServerPort = toInt(rm_->getConfig()->getConfig("QueryTele", "Port"));
if (teleServerPort > 0)
{
teleServerParms_ = querytele::QueryTeleServerParms(teleServerHost, teleServerPort);
}
}
std::cout << "Starting ExeMgr: st = " << serverThreads << ", qs = " << rm_->getEmExecQueueSize()
<< ", mx = " << maxPct << ", cf = " << rm_->getConfig()->configFile() << std::endl;
{
BRM::DBRM* dbrm = new BRM::DBRM();
dbrm->setSystemQueryReady(true);
delete dbrm;
}
threadpool::ThreadPool exeMgrThreadPool(serverThreads, 0);
exeMgrThreadPool.setName("ExeMgrServer");
if (rm_->getExeMgrThreadPoolDebug() == "Y" || rm_->getExeMgrThreadPoolDebug() == "y")
{
exeMgrThreadPool.setDebug(true);
exeMgrThreadPool.invoke(threadpool::ThreadPoolMonitor(&exeMgrThreadPool));
}
// Load statistics.
try
{
statistics::StatisticsManager::instance()->loadFromFile();
}
catch (...)
{
std::cerr << "Cannot load statistics from file " << std::endl;
}
for (;;)
{
messageqcpp::IOSocket ios;
ios = mqs->accept();
exeMgrThreadPool.invoke(exemgr::SQLFrontSessionThread(ios, dec_, rm_));
}
exeMgrThreadPool.wait();
return 0;
}
} // namespace exemgr

View File

@ -0,0 +1,348 @@
/* Copyright (C) 2022 Mariadb Corporation.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#pragma once
#include <iostream>
#include <cstdint>
#include <csignal>
#include <sys/resource.h>
#undef root_name
#include <boost/filesystem.hpp>
#include "calpontselectexecutionplan.h"
#include "mcsanalyzetableexecutionplan.h"
#include "activestatementcounter.h"
#include "distributedenginecomm.h"
#include "resourcemanager.h"
#include "configcpp.h"
#include "queryteleserverparms.h"
#include "iosocket.h"
#include "joblist.h"
#include "joblistfactory.h"
#include "oamcache.h"
#include "simplecolumn.h"
#include "bytestream.h"
#include "telestats.h"
#include "messageobj.h"
#include "messagelog.h"
#include "sqllogger.h"
#include "femsghandler.h"
#include "idberrorinfo.h"
#include "MonitorProcMem.h"
#include "liboamcpp.h"
#include "crashtrace.h"
#include "service.h"
#include <mutex>
#include <thread>
#include <condition_variable>
#include "dbrm.h"
#include "mariadb_my_sys.h"
#include "statistics.h"
namespace exemgr
{
// Command-line options of the ExeMgr service.
//
//   -d  increases the debug level (may be given several times)
//   -e  sets m_e
//   -f  sets m_fg
//
// Any other option is ignored.
class Opt
{
 public:
  int m_debug = 0;    // number of -d occurrences
  bool m_e = false;   // true when -e was given
  bool m_fg = false;  // true when -f was given

  Opt() = default;

  // Parses argv with getopt(3) using the option string "edf".
  Opt(int argc, char* argv[])
  {
    int c;

    // getopt() signals the end of the options with -1 (POSIX), not EOF.
    while ((c = getopt(argc, argv, "edf")) != -1)
    {
      switch (c)
      {
        case 'd': ++m_debug; break;

        case 'e': m_e = true; break;

        case 'f': m_fg = true; break;

        case '?':
        default: break;
      }
    }
  }

  // Returns the number of -d flags seen on the command line.
  int getDebugLevel() const
  {
    return m_debug;
  }
};
class ServiceExeMgr : public Service, public Opt
{
using SessionMemMap_t = std::map<uint32_t, size_t>;
using ThreadCntPerSessionMap_t = std::map<uint32_t, uint32_t>;
protected:
void log(logging::LOG_TYPE type, const std::string& str)
{
logging::LoggingID logid(16);
logging::Message::Args args;
logging::Message message(8);
args.add(strerror(errno));
message.format(args);
logging::Logger logger(logid.fSubsysID);
logger.logMessage(type, message, logid);
}
public:
ServiceExeMgr(const Opt& opt) : Service("ExeMgr"), Opt(opt), msgLog_(logging::Logger(16))
{
bool runningWithExeMgr = true;
rm_ = joblist::ResourceManager::instance(runningWithExeMgr);
}
ServiceExeMgr(const Opt& opt, config::Config* aConfig) : Service("ExeMgr"), Opt(opt), msgLog_(logging::Logger(16))
{
bool runningWithExeMgr = true;
rm_ = joblist::ResourceManager::instance(runningWithExeMgr, aConfig);
}
void LogErrno() override
{
log(logging::LOG_TYPE_CRITICAL, std::string(strerror(errno)));
}
void ParentLogChildMessage(const std::string& str) override
{
log(logging::LOG_TYPE_INFO, str);
}
int Child() override;
int Run()
{
return m_fg ? Child() : RunForking();
}
static const constexpr unsigned logDefaultMsg = logging::M0000;
static const constexpr unsigned logDbProfStartStatement = logging::M0028;
static const constexpr unsigned logDbProfEndStatement = logging::M0029;
static const constexpr unsigned logStartSql = logging::M0041;
static const constexpr unsigned logEndSql = logging::M0042;
static const constexpr unsigned logRssTooBig = logging::M0044;
static const constexpr unsigned logDbProfQueryStats = logging::M0047;
static const constexpr unsigned logExeMgrExcpt = logging::M0055;
// If any flags other than the table mode flags are set, produce output to screeen
static const constexpr uint32_t flagsWantOutput = (0xffffffff & ~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_AUTOSWITCH &
~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF);
logging::Logger& getLogger()
{
return msgLog_;
}
void updateSessionMap(const size_t pct)
{
std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
for (auto mapIter = sessionMemMap_.begin(); mapIter != sessionMemMap_.end(); ++mapIter)
{
if (pct > mapIter->second)
{
mapIter->second = pct;
}
}
}
ThreadCntPerSessionMap_t& getThreadCntPerSessionMap()
{
return threadCntPerSessionMap_;
}
std::mutex& getThreadCntPerSessionMapMutex()
{
return threadCntPerSessionMapMutex_;
}
void initMaxMemPct(uint32_t sessionId)
{
// WIP
if (sessionId < 0x80000000)
{
std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
auto mapIter = sessionMemMap_.find(sessionId);
if (mapIter == sessionMemMap_.end())
{
sessionMemMap_[sessionId] = 0;
}
else
{
mapIter->second = 0;
}
}
}
uint64_t getMaxMemPct(const uint32_t sessionId)
{
uint64_t maxMemoryPct = 0;
// WIP
if (sessionId < 0x80000000)
{
std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
auto mapIter = sessionMemMap_.find(sessionId);
if (mapIter != sessionMemMap_.end())
{
maxMemoryPct = (uint64_t)mapIter->second;
}
}
return maxMemoryPct;
}
void deleteMaxMemPct(uint32_t sessionId)
{
if (sessionId < 0x80000000)
{
std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
auto mapIter = sessionMemMap_.find(sessionId);
if (mapIter != sessionMemMap_.end())
{
sessionMemMap_.erase(sessionId);
}
}
}
//...Increment the number of threads using the specified sessionId
void incThreadCntPerSession(uint32_t sessionId)
{
std::lock_guard<std::mutex> lk(threadCntPerSessionMapMutex_);
auto mapIter = threadCntPerSessionMap_.find(sessionId);
if (mapIter == threadCntPerSessionMap_.end())
threadCntPerSessionMap_.insert(ThreadCntPerSessionMap_t::value_type(sessionId, 1));
else
mapIter->second++;
}
//...Decrement the number of threads using the specified sessionId.
//...When the thread count for a sessionId reaches 0, the corresponding
//...CalpontSystemCatalog objects are deleted.
//...The user query and its associated catalog query have a different
//...session Id where the highest bit is flipped.
//...The object with id(sessionId | 0x80000000) cannot be removed before
//...user query session completes because the sysdata may be used for
//...debugging/stats purpose, such as result graph, etc.
void decThreadCntPerSession(uint32_t sessionId)
{
std::lock_guard<std::mutex> lk(threadCntPerSessionMapMutex_);
auto mapIter = threadCntPerSessionMap_.find(sessionId);
if (mapIter != threadCntPerSessionMap_.end())
{
if (--mapIter->second == 0)
{
threadCntPerSessionMap_.erase(mapIter);
execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
execplan::CalpontSystemCatalog::removeCalpontSystemCatalog((sessionId ^ 0x80000000));
}
}
}
ActiveStatementCounter* getStatementsRunningCount()
{
return statementsRunningCount_;
}
joblist::DistributedEngineComm* getDec()
{
return dec_;
}
int toInt(const std::string& val)
{
if (val.length() == 0)
return -1;
return static_cast<int>(config::Config::fromText(val));
}
const std::string prettyPrintMiniInfo(const std::string& in);
const std::string timeNow()
{
time_t outputTime = time(0);
struct tm ltm;
char buf[32]; // ctime(3) says at least 26
size_t len = 0;
asctime_r(localtime_r(&outputTime, &ltm), buf);
len = strlen(buf);
if (len > 0)
--len;
if (buf[len] == '\n')
buf[len] = 0;
return buf;
}
querytele::QueryTeleServerParms& getTeleServerParms()
{
return teleServerParms_;
}
joblist::ResourceManager& getRm()
{
return *rm_;
}
private:
void setupSignalHandlers();
int8_t setupCwd()
{
std::string workdir = rm_->getScWorkingDir();
int8_t rc = chdir(workdir.c_str());
if (rc < 0 || access(".", W_OK) != 0)
rc = chdir("/tmp");
return (rc < 0) ? -5 : rc;
}
int setupResources()
{
struct rlimit rlim;
if (getrlimit(RLIMIT_NOFILE, &rlim) != 0)
{
return -1;
}
rlim.rlim_cur = rlim.rlim_max = 65536;
if (setrlimit(RLIMIT_NOFILE, &rlim) != 0)
{
return -2;
}
if (getrlimit(RLIMIT_NOFILE, &rlim) != 0)
{
return -3;
}
if (rlim.rlim_cur != 65536)
{
return -4;
}
return 0;
}
logging::Logger msgLog_;
SessionMemMap_t sessionMemMap_; // track memory% usage during a query
std::mutex sessionMemMapMutex_;
//...The FrontEnd may establish more than 1 connection (which results in
// more than 1 ExeMgr thread) per session. These threads will share
// the same CalpontSystemCatalog object for that session. Here, we
// define a std::map to track how many threads are sharing each session, so
// that we know when we can safely delete a CalpontSystemCatalog object
// shared by multiple threads per session.
ThreadCntPerSessionMap_t threadCntPerSessionMap_;
std::mutex threadCntPerSessionMapMutex_;
ActiveStatementCounter* statementsRunningCount_;
joblist::DistributedEngineComm* dec_;
joblist::ResourceManager* rm_;
// Its attributes are set in Child()
querytele::QueryTeleServerParms teleServerParms_;
};
extern ServiceExeMgr* globServiceExeMgr;
}

View File

@ -0,0 +1,985 @@
/* Copyright (C) 2022 Mariadb Corporation.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include "sqlfrontsessionthread.h"
namespace exemgr
{
// Returns the memory percentage that the global ServiceExeMgr has recorded
// for the given session.
uint64_t SQLFrontSessionThread::getMaxMemPct(uint32_t sessionId)
{
  const uint64_t recordedPct = globServiceExeMgr->getMaxMemPct(sessionId);
  return recordedPct;
}
// Asks the global ServiceExeMgr to drop the memory percentage entry of the
// given session.
void SQLFrontSessionThread::deleteMaxMemPct(uint32_t sessionId)
{
  globServiceExeMgr->deleteMaxMemPct(sessionId);
}
// Registers one more thread for the given session with the global
// ServiceExeMgr.
void SQLFrontSessionThread::incThreadCntPerSession(uint32_t sessionId)
{
  globServiceExeMgr->incThreadCntPerSession(sessionId);
}
// Unregisters one thread of the given session from the global ServiceExeMgr.
void SQLFrontSessionThread::decThreadCntPerSession(uint32_t sessionId)
{
  globServiceExeMgr->decThreadCntPerSession(sessionId);
}
// Asks the global ServiceExeMgr to (re)initialize the memory percentage entry
// of the given session.
void SQLFrontSessionThread::initMaxMemPct(uint32_t sessionId)
{
  globServiceExeMgr->initMaxMemPct(sessionId);
}
//...Build the query statistics text for a joblist.
//   jl                 joblist associated with the query
//   label              header label placed in front of the statistics
//   includeNewLine     break the text into two lines instead of one
//   vtableModeOn       selects the reported mode ("Distributed" or "Standard")
//   wantExtendedStats  passed to the joblist's querySummary()
//   rowsReturned       row count stored in the statistics
const std::string SQLFrontSessionThread::formatQueryStats(joblist::SJLP& jl, const std::string& label,
                                                          bool includeNewLine, bool vtableModeOn,
                                                          bool wantExtendedStats, uint64_t rowsReturned)
{
  // The statistics are collected once per query and cached in fStats.
  if (!fStatsRetrieved)
  {
    if (wantExtendedStats)
    {
      // Give the thread writing the extended info a moment to finish.
      struct timespec pause = {0, 250000};  // 250 usec
      nanosleep(&pause, 0);
    }

    jl->querySummary(wantExtendedStats);
    fStats = jl->queryStats();
    // Memory percentage recorded for this session while the query ran.
    fStats.fMaxMemPct = getMaxMemPct(fStats.fSessionID);
    fStats.fRows = rowsReturned;
    fStatsRetrieved = true;
  }

  const std::string modeName(vtableModeOn ? "Distributed" : "Standard");

  std::ostringstream text;

  // First half of the statistics.
  text << label << ": MaxMemPct-" << fStats.fMaxMemPct;
  text << "; NumTempFiles-" << fStats.fNumFiles;
  text << "; TempFileSpace-" << roundBytes(fStats.fFileBytes);
  text << "; ApproxPhyI/O-" << fStats.fPhyIO;
  text << "; CacheI/O-" << fStats.fCacheIO;
  text << "; BlocksTouched-" << fStats.fMsgRcvCnt;

  // Either break the line or keep going on the same one.
  if (includeNewLine)
    text << '\n' << " ";
  else
    text << "; ";

  // Second half of the statistics.
  text << "PartitionBlocksEliminated-" << fStats.fCPBlocksSkipped;
  text << "; MsgBytesIn-" << roundBytes(fStats.fMsgBytesIn);
  text << "; MsgBytesOut-" << roundBytes(fStats.fMsgBytesOut);
  text << "; Mode-" << modeName;

  return text.str();
}
//... Round off to human readable format (B, KB, MB, GB or TB).
//    The value is divided by 1024 until it drops below 1024 or TB is reached,
//    and is rounded half-up based on the remainder of the last division.
//    Exactly 1024 units are reported as 1 of the next unit (1024 -> "1KB"),
//    and a round-up that reaches 1024 is carried into the next unit
//    (1048575 -> "1MB") instead of being printed as "1024KB".
const std::string SQLFrontSessionThread::roundBytes(uint64_t value) const
{
  const char* units[] = {"B", "KB", "MB", "GB", "TB"};
  const uint64_t lastUnit = 4;  // index of "TB"
  uint64_t unit = 0;
  bool roundUp = false;
  uint64_t roundedValue = value;

  while (roundedValue >= 1024 && unit < lastUnit)
  {
    // Bit 9 is set when the remainder of the division is 512 or more.
    roundUp = (roundedValue & 512) != 0;
    roundedValue /= 1024;
    ++unit;
  }

  if (roundUp)
    ++roundedValue;

  // Rounding up may have produced a full 1024; express it in the next unit.
  if (roundedValue == 1024 && unit < lastUnit)
  {
    roundedValue = 1;
    ++unit;
  }

  std::ostringstream oss;
  oss << roundedValue << units[unit];
  return oss.str();
}
//...Convert a byte count to MB (1024*1024 bytes), rounding half-up.
uint64_t SQLFrontSessionThread::roundMB(uint64_t value) const
{
  const uint64_t wholeMB = value >> 20;
  // Bit 19 is the half-MB bit of the discarded remainder.
  const bool halfOrMore = (value & 524288) != 0;
  return halfOrMore ? wholeMB + 1 : wholeMB;
}
// Applies the resource manager parameters carried by an execution plan.
// Only the PM and UM small-side memory settings are handled; every other
// parameter id is ignored.
void SQLFrontSessionThread::setRMParms(const execplan::CalpontSelectExecutionPlan::RMParmVec& parms)
{
  for (const auto& parm : parms)
  {
    if (parm.id == execplan::PMSMALLSIDEMEMORY)
    {
      globServiceExeMgr->getRm().addHJPmMaxSmallSideMap(parm.sessionId, parm.value);
    }
    else if (parm.id == execplan::UMSMALLSIDEMEMORY)
    {
      globServiceExeMgr->getRm().addHJUmMaxSmallSideMap(parm.sessionId, parm.value);
    }
  }
}
// Calls getSchemaInfo() on the system catalog for the schema of every simple
// column referenced by the plan, then does the same for each derived table
// plan, recursively.
//   csep  execution plan whose column map and derived tables are inspected
//   csc   system catalog object that receives the getSchemaInfo() calls
void SQLFrontSessionThread::buildSysCache(const execplan::CalpontSelectExecutionPlan& csep,
                                          boost::shared_ptr<execplan::CalpontSystemCatalog> csc)
{
  const execplan::CalpontSelectExecutionPlan::ColumnMap& colMap = csep.columnMap();

  for (const auto& entry : colMap)
  {
    auto* sc = dynamic_cast<execplan::SimpleColumn*>(entry.second.get());

    // Only simple columns carry a schema name.
    if (!sc)
      continue;

    std::string schemaName = sc->schemaName();

    // only the first time a schema is got will actually query
    // system catalog. System catalog keeps a schema name std::map.
    // if a schema exists, the call getSchemaInfo returns without
    // doing anything.
    if (!schemaName.empty())
      csc->getSchemaInfo(schemaName);
  }

  for (const auto& derived : csep.derivedTableList())
  {
    auto* subPlan = dynamic_cast<execplan::CalpontSelectExecutionPlan*>(derived.get());

    // A failed cast yields nullptr; skip such entries instead of
    // dereferencing a null pointer.
    if (subPlan)
      buildSysCache(*subPlan, csc);
  }
}
// Sends two messages over the session socket: first the numeric code, then
// the message text.
void SQLFrontSessionThread::writeCodeAndError(messageqcpp::ByteStream::quadbyte code, const std::string emsg)
{
  // Message 1: the code.
  messageqcpp::ByteStream codeMsg;
  codeMsg << code;
  fIos.write(codeMsg);

  // Message 2: the text.
  messageqcpp::ByteStream textMsg;
  textMsg << emsg;
  fIos.write(textMsg);
}
// Handles an ANALYZE_TABLE_EXECUTE request: reads the analyze plan from the
// socket, builds and runs its joblist, feeds the resulting row group to the
// StatisticsManager, saves and distributes the statistics and replies with
// ANALYZE_TABLE_SUCCESS.
//   bs           stream used for both the request and the reply
//   jl           receives the joblist built for the plan
//   stmtCounted  flag handed to the active statement counter's incr()/decr()
// Throws std::runtime_error when the joblist cannot be built or cannot be
// attached to the engine comm.
void SQLFrontSessionThread::analyzeTableExecute(messageqcpp::ByteStream& bs, joblist::SJLP& jl, bool& stmtCounted)
{
  auto* runningStatements = globServiceExeMgr->getStatementsRunningCount();

  // Sends the success code to the front end and gives the statement slot back.
  auto replySuccess = [&]()
  {
    bs.restart();
    messageqcpp::ByteStream::quadbyte reply = ANALYZE_TABLE_SUCCESS;
    bs << reply;
    fIos.write(bs);
    bs.reset();
    runningStatements->decr(stmtCounted);
  };

  execplan::MCSAnalyzeTableExecutionPlan plan;
  bs = fIos.read();
  plan.unserialize(bs);

  runningStatements->incr(stmtCounted);
  jl = joblist::JobListFactory::makeJobList(&plan, fRm, false, true);

  // Joblist is empty: nothing to run.
  if (jl->status() == logging::statisticsJobListEmpty)
  {
    if (plan.traceOn())
      std::cout << "JobList is empty " << std::endl;

    jl.reset();
    replySuccess();
    return;
  }

  // Reconnect when the number of connections differs from the connected PM servers.
  if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers()))
  {
    std::cout << "fEc setup " << std::endl;
    fEc->Setup();
  }

  if (jl->status() != 0)
    throw std::runtime_error("ExeMgr: could not build a JobList!");

  if (jl->putEngineComm(fEc) != 0)
    throw std::runtime_error(jl->errMsg());

  // Execute a joblist.
  jl->doQuery();

  FEMsgHandler msgHandler(jl, &fIos);
  msgHandler.start();
  auto rowCount = jl->projectTable(100, bs);
  msgHandler.stop();

  auto outRG = (static_cast<joblist::TupleJobList*>(jl.get()))->getOutputRowGroup();

  if (plan.traceOn())
    std::cout << "Row count " << rowCount << std::endl;

  // Process `RowGroup`, increase an epoch and save statistics to the file.
  auto* statsManager = statistics::StatisticsManager::instance();
  statsManager->analyzeColumnKeyTypes(outRG, plan.traceOn());
  statsManager->incEpoch();
  statsManager->saveToFile();

  // Distribute statistics across all ExeMgr clients if possible.
  statistics::StatisticsDistributor::instance()->distributeStatistics();

  // Send the signal back to front-end.
  replySuccess();
}
// Handles an ANALYZE_TABLE_REC_STATS request.
// Sequence implemented below:
//   1. read a message holding a 64-bit hash;
//   2. compare it with the hash computed from the local statistics;
//   3. if both match, reply ANALYZE_TABLE_SUCCESS and return;
//   4. otherwise reply ANALYZE_TABLE_NEED_STATS, read the next message,
//      unserialize it into the StatisticsManager, save the statistics to file
//      and reply ANALYZE_TABLE_SUCCESS.
// `bs` is used as the scratch stream for every read and write.
// NOTE(review): the trace strings suggest the peer is another ExeMgr that
// distributes its statistics - confirm against StatisticsDistributor.
void SQLFrontSessionThread::analyzeTableHandleStats(messageqcpp::ByteStream& bs)
{
messageqcpp::ByteStream::quadbyte qb;
#ifdef DEBUG_STATISTICS
std::cout << "Get distributed statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl;
#endif
// Step 1: the hash sent by the peer.
bs = fIos.read();
#ifdef DEBUG_STATISTICS
std::cout << "Read the hash from statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl;
#endif
uint64_t dataHashRec;
bs >> dataHashRec;
// Step 2: hash of the statistics held by this process.
uint64_t dataHash = statistics::StatisticsManager::instance()->computeHashFromStats();
// The stats are the same.
if (dataHash == dataHashRec)
{
#ifdef DEBUG_STATISTICS
std::cout << "The hash is the same as rec hash on ExeMgr(Client) from ExeMgr(Server) " << std::endl;
#endif
// Step 3: nothing to transfer, acknowledge and stop.
qb = ANALYZE_TABLE_SUCCESS;
bs << qb;
fIos.write(bs);
bs.reset();
return;
}
// Step 4: the hashes differ, ask the peer for its statistics.
bs.restart();
qb = ANALYZE_TABLE_NEED_STATS;
bs << qb;
fIos.write(bs);
bs.restart();
// The next message carries the serialized statistics.
bs = fIos.read();
#ifdef DEBUG_STATISTICS
std::cout << "Read statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl;
#endif
statistics::StatisticsManager::instance()->unserialize(bs);
statistics::StatisticsManager::instance()->saveToFile();
#ifdef DEBUG_STATISTICS
std::cout << "Write flag on ExeMgr(Client) to ExeMgr(Server)" << std::endl;
#endif
// Final acknowledgement after the statistics were stored.
qb = ANALYZE_TABLE_SUCCESS;
bs << qb;
fIos.write(bs);
bs.reset();
}
void SQLFrontSessionThread::operator()()
{
messageqcpp::ByteStream bs, inbs;
execplan::CalpontSelectExecutionPlan csep;
csep.sessionID(0);
joblist::SJLP jl;
bool incSQLFrontSessionThreadCnt = true;
std::mutex jlMutex;
std::condition_variable jlCleanupDone;
int destructing = 0;
int gDebug = globServiceExeMgr->getDebugLevel();
logging::Logger& msgLog = globServiceExeMgr->getLogger();
bool selfJoin = false;
bool tryTuples = false;
bool usingTuples = false;
bool stmtCounted = false;
auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount();
try
{
for (;;)
{
selfJoin = false;
tryTuples = false;
usingTuples = false;
if (jl)
{
// puts the real destruction in another thread to avoid
// making the whole session wait. It can take several seconds.
std::unique_lock<std::mutex> scoped(jlMutex);
destructing++;
std::thread bgdtor(
[jl, &jlMutex, &jlCleanupDone, &destructing]
{
std::unique_lock<std::mutex> scoped(jlMutex);
const_cast<joblist::SJLP&>(jl).reset(); // this happens second; does real destruction
if (--destructing == 0)
jlCleanupDone.notify_one();
});
jl.reset(); // this runs first
bgdtor.detach();
}
bs = fIos.read();
if (bs.length() == 0)
{
if (gDebug > 1 || (gDebug && !csep.isInternal()))
std::cout << "### Got a close(1) for session id " << csep.sessionID() << std::endl;
// connection closed by client
fIos.close();
break;
}
else if (bs.length() < 4) // Not a CalpontSelectExecutionPlan
{
if (gDebug)
std::cout << "### Got a not-a-plan for session id " << csep.sessionID() << " with length "
<< bs.length() << std::endl;
fIos.close();
break;
}
else if (bs.length() == 4) // possible tuple flag
{
messageqcpp::ByteStream::quadbyte qb;
bs >> qb;
if (qb == 4) // UM wants new tuple i/f
{
if (gDebug)
std::cout << "### UM wants tuples" << std::endl;
tryTuples = true;
// now wait for the CSEP...
bs = fIos.read();
}
else if (qb == 5) // somebody wants stats
{
bs.restart();
qb = statementsRunningCount->cur();
bs << qb;
qb = statementsRunningCount->waiting();
bs << qb;
fIos.write(bs);
fIos.close();
break;
}
else if (qb == ANALYZE_TABLE_EXECUTE)
{
analyzeTableExecute(bs, jl, stmtCounted);
continue;
}
else if (qb == ANALYZE_TABLE_REC_STATS)
{
analyzeTableHandleStats(bs);
continue;
}
else
{
if (gDebug)
std::cout << "### Got a not-a-plan value " << qb << std::endl;
fIos.close();
break;
}
}
new_plan:
try
{
csep.unserialize(bs);
}
catch (logging::IDBExcept& ex)
{
// We can get here on illegal function parameter data type, e.g.
// SELECT blob_column|1 FROM t1;
statementsRunningCount->decr(stmtCounted);
writeCodeAndError(ex.errorCode(), std::string(ex.what()));
continue;
}
querytele::QueryTeleStats qts;
if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT"))
{
qts.query_uuid = csep.uuid();
qts.msg_type = querytele::QueryTeleStats::QT_START;
qts.start_time = querytele::QueryTeleClient::timeNowms();
qts.query = csep.data();
qts.session_id = csep.sessionID();
qts.query_type = csep.queryType();
qts.system_name = fOamCachePtr->getSystemName();
qts.module_name = fOamCachePtr->getModuleName();
qts.local_query = csep.localQuery();
qts.schema_name = csep.schemaName();
fTeleClient.postQueryTele(qts);
}
if (gDebug > 1 || (gDebug && !csep.isInternal()))
std::cout << "### For session id " << csep.sessionID() << ", got a CSEP" << std::endl;
setRMParms(csep.rmParms());
// Re-establish lost PP connections.
if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers()))
{
fEc->Setup();
}
// @bug 1021. try to get schema cache for a come in query.
// skip system catalog queries.
if (!csep.isInternal())
{
boost::shared_ptr<execplan::CalpontSystemCatalog> csc =
execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(csep.sessionID());
buildSysCache(csep, csc);
}
// As soon as we have a session id for this thread, update the
// thread count per session; only do this once per thread.
// Mask 0x80000000 is for associate user query and csc query
if (incSQLFrontSessionThreadCnt)
{
// WIP
incThreadCntPerSession(csep.sessionID() | 0x80000000);
incSQLFrontSessionThreadCnt = false;
}
bool needDbProfEndStatementMsg = false;
logging::Message::Args args;
std::string sqlText = csep.data();
logging::LoggingID li(16, csep.sessionID(), csep.txnID());
// Initialize stats for this query, including
// init sessionMemMap entry for this session to 0 memory %.
// We will need this later for traceOn() or if we receive a
// table request with qb=3 (see below). This is also recorded
// as query start time.
initStats(csep.sessionID(), sqlText);
fStats.fQueryType = csep.queryType();
// Log start and end statement if tracing is enabled. Keep in
// mind the trace flag won't be set for system catalog queries.
if (csep.traceOn())
{
args.reset();
args.add((int)csep.statementID());
args.add((int)csep.verID().currentScn);
args.add(sqlText);
msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfStartStatement, args, li);
needDbProfEndStatementMsg = true;
}
// Don't log subsequent self joins after first.
if (selfJoin)
sqlText = "";
std::ostringstream oss;
oss << sqlText << "; |" << csep.schemaName() << "|";
logging::SQLLogger sqlLog(oss.str(), li);
statementsRunningCount->incr(stmtCounted);
if (tryTuples)
{
try // @bug2244: try/catch around fIos.write() calls responding to makeTupleList
{
jl = joblist::JobListFactory::makeJobList(&csep, fRm, true, true);
// assign query stats
jl->queryStats(fStats);
if ((jl->status()) == 0 && (jl->putEngineComm(fEc) == 0))
{
usingTuples = true;
// Tell the FE that we're sending tuples back, not TableBands
writeCodeAndError(0, "NOERROR");
auto tjlp = dynamic_cast<joblist::TupleJobList*>(jl.get());
assert(tjlp);
messageqcpp::ByteStream tbs;
tbs << tjlp->getOutputRowGroup();
fIos.write(tbs);
}
else
{
const std::string emsg = jl->errMsg();
statementsRunningCount->decr(stmtCounted);
writeCodeAndError(jl->status(), emsg);
std::cerr << "ExeMgr: could not build a tuple joblist: " << emsg << std::endl;
continue;
}
}
catch (std::exception& ex)
{
std::ostringstream errMsg;
errMsg << "ExeMgr: error writing makeJoblist "
"response; "
<< ex.what();
throw std::runtime_error(errMsg.str());
}
catch (...)
{
std::ostringstream errMsg;
errMsg << "ExeMgr: unknown error writing makeJoblist "
"response; ";
throw std::runtime_error(errMsg.str());
}
if (!usingTuples)
{
if (gDebug)
std::cout << "### UM wanted tuples but it didn't work out :-(" << std::endl;
}
else
{
if (gDebug)
std::cout << "### UM wanted tuples and we'll do our best;-)" << std::endl;
}
}
else
{
usingTuples = false;
jl = joblist::JobListFactory::makeJobList(&csep, fRm, false, true);
if (jl->status() == 0)
{
std::string emsg;
if (jl->putEngineComm(fEc) != 0)
throw std::runtime_error(jl->errMsg());
}
else
{
throw std::runtime_error("ExeMgr: could not build a JobList!");
}
}
jl->doQuery();
execplan::CalpontSystemCatalog::OID tableOID;
bool swallowRows = false;
joblist::DeliveredTableMap tm;
uint64_t totalBytesSent = 0;
uint64_t totalRowCount = 0;
// Project each table as the FE asks for it
for (;;)
{
bs = fIos.read();
if (bs.length() == 0)
{
if (gDebug > 1 || (gDebug && !csep.isInternal()))
std::cout << "### Got a close(2) for session id " << csep.sessionID() << std::endl;
break;
}
if (gDebug && bs.length() > 4)
std::cout << "### For session id " << csep.sessionID() << ", got too many bytes = " << bs.length()
<< std::endl;
// TODO: Holy crud! Can this be right?
//@bug 1444 Yes, if there is a self-join
if (bs.length() > 4)
{
selfJoin = true;
statementsRunningCount->decr(stmtCounted);
goto new_plan;
}
assert(bs.length() == 4);
messageqcpp::ByteStream::quadbyte qb;
try // @bug2244: try/catch around fIos.write() calls responding to qb command
{
bs >> qb;
if (gDebug > 1 || (gDebug && !csep.isInternal()))
std::cout << "### For session id " << csep.sessionID() << ", got a command = " << qb
<< std::endl;
if (qb == 0)
{
// No more tables, query is done
break;
}
else if (qb == 1)
{
// super-secret flag indicating that the UM is going to scarf down all the rows in the
// query.
swallowRows = true;
tm = jl->deliveredTables();
continue;
}
else if (qb == 2)
{
// UM just wants any table
assert(swallowRows);
auto iter = tm.begin();
if (iter == tm.end())
{
if (gDebug > 1 || (gDebug && !csep.isInternal()))
std::cout << "### For session id " << csep.sessionID() << ", returning end flag"
<< std::endl;
bs.restart();
bs << (messageqcpp::ByteStream::byte)1;
fIos.write(bs);
continue;
}
tableOID = iter->first;
}
else if (qb == 3) // special option-UM wants job stats std::string
{
std::string statsString;
// Log stats std::string to be sent back to front end
statsString = formatQueryStats(
jl, "Query Stats", false,
!(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF),
(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG), totalRowCount);
bs.restart();
bs << statsString;
if ((csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG) != 0)
{
bs << jl->extendedInfo();
bs << globServiceExeMgr->prettyPrintMiniInfo(jl->miniInfo());
}
else
{
std::string empty;
bs << empty;
bs << empty;
}
// send stats to connector for inserting to the querystats table
fStats.serialize(bs);
fIos.write(bs);
continue;
}
// for table mode handling
else if (qb == 4)
{
statementsRunningCount->decr(stmtCounted);
bs = fIos.read();
goto new_plan;
}
else // (qb > 3)
{
// Return table bands for the requested tableOID
tableOID = static_cast<execplan::CalpontSystemCatalog::OID>(qb);
}
}
catch (std::exception& ex)
{
std::ostringstream errMsg;
errMsg << "ExeMgr: error writing qb response "
"for qb cmd "
<< qb << "; " << ex.what();
throw std::runtime_error(errMsg.str());
}
catch (...)
{
std::ostringstream errMsg;
errMsg << "ExeMgr: unknown error writing qb response "
"for qb cmd "
<< qb;
throw std::runtime_error(errMsg.str());
}
if (swallowRows)
tm.erase(tableOID);
FEMsgHandler msgHandler(jl, &fIos);
if (tableOID == 100)
msgHandler.start();
//...Loop serializing table bands projected for the tableOID
for (;;)
{
uint32_t rowCount;
rowCount = jl->projectTable(tableOID, bs);
msgHandler.stop();
if (jl->status())
{
const auto errInfo = logging::IDBErrorInfo::instance();
if (jl->errMsg().length() != 0)
bs << jl->errMsg();
else
bs << errInfo->errorMsg(jl->status());
}
try // @bug2244: try/catch around fIos.write() calls projecting rows
{
if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3)
{
// Skip the write to the front end until the last empty band. Used to time queries
// through without any front end waiting.
if (tableOID < 3000 || rowCount == 0)
fIos.write(bs);
}
else
{
fIos.write(bs);
}
}
catch (std::exception& ex)
{
msgHandler.stop();
std::ostringstream errMsg;
errMsg << "ExeMgr: error projecting rows "
"for tableOID: "
<< tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount << "; "
<< ex.what();
jl->abort();
while (rowCount)
rowCount = jl->projectTable(tableOID, bs);
if (tableOID == 100 && msgHandler.aborted())
{
/* TODO: modularize the cleanup code, as well as
* the rest of this fcn */
decThreadCntPerSession(csep.sessionID() | 0x80000000);
statementsRunningCount->decr(stmtCounted);
fIos.close();
return;
}
// std::cout << "connection drop\n";
throw std::runtime_error(errMsg.str());
}
catch (...)
{
std::ostringstream errMsg;
msgHandler.stop();
errMsg << "ExeMgr: unknown error projecting rows "
"for tableOID: "
<< tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount;
jl->abort();
while (rowCount)
rowCount = jl->projectTable(tableOID, bs);
throw std::runtime_error(errMsg.str());
}
totalRowCount += rowCount;
totalBytesSent += bs.length();
if (rowCount == 0)
{
msgHandler.stop();
// No more bands, table is done
bs.reset();
// @bug 2083 decr active statement count here for table mode.
if (!usingTuples)
statementsRunningCount->decr(stmtCounted);
break;
}
else
{
bs.restart();
}
} // End of loop to project and serialize table bands for a table
} // End of loop to process tables
// @bug 828
if (csep.traceOn())
jl->graph(csep.sessionID());
if (needDbProfEndStatementMsg)
{
std::string ss;
std::ostringstream prefix;
prefix << "ses:" << csep.sessionID() << " Query Totals";
// Log stats std::string to standard out
ss = formatQueryStats(jl, prefix.str(), true,
!(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF),
(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG),
totalRowCount);
//@Bug 1306. Added timing info for real time tracking.
std::cout << ss << " at " << globServiceExeMgr->timeNow() << std::endl;
// log query stats to debug log file
args.reset();
args.add((int)csep.statementID());
args.add(fStats.fMaxMemPct);
args.add(fStats.fNumFiles);
args.add(fStats.fFileBytes); // log raw byte count instead of MB
args.add(fStats.fPhyIO);
args.add(fStats.fCacheIO);
args.add(fStats.fMsgRcvCnt);
args.add(fStats.fMsgBytesIn);
args.add(fStats.fMsgBytesOut);
args.add(fStats.fCPBlocksSkipped);
msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfQueryStats, args, li);
//@bug 1327
deleteMaxMemPct(csep.sessionID());
// Calling reset here, will cause joblist destructor to be
// called, which "joins" the threads. We need to do that
// here to make sure all syslogging from all the threads
// are complete; and that our logDbProfEndStatement will
// appear "last" in the syslog for this SQL statement.
// puts the real destruction in another thread to avoid
// making the whole session wait. It can take several seconds.
int stmtID = csep.statementID();
std::unique_lock<std::mutex> scoped(jlMutex);
destructing++;
std::thread bgdtor(
[jl, &jlMutex, &jlCleanupDone, stmtID, &li, &destructing, &msgLog]
{
std::unique_lock<std::mutex> scoped(jlMutex);
const_cast<joblist::SJLP&>(jl).reset(); // this happens second; does real destruction
logging::Message::Args args;
args.add(stmtID);
msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfEndStatement, args, li);
if (--destructing == 0)
jlCleanupDone.notify_one();
});
jl.reset(); // this happens first
bgdtor.detach();
}
else
// delete sessionMemMap entry for this session's memory % use
deleteMaxMemPct(csep.sessionID());
std::string endtime(globServiceExeMgr->timeNow());
if ((csep.traceFlags() & globServiceExeMgr->flagsWantOutput) && (csep.sessionID() < 0x80000000))
{
std::cout << "For session " << csep.sessionID() << ": " << totalBytesSent << " bytes sent back at "
<< endtime << std::endl;
// @bug 663 - Implemented caltraceon(16) to replace the
// $FIFO_SINK compiler definition in pColStep.
// This option consumes rows in the project steps.
if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS4)
{
std::cout << std::endl;
std::cout << "**** No data returned to DM. Rows consumed "
"in ProjectSteps - caltrace(16) is on (FIFO_SINK)."
" ****"
<< std::endl;
std::cout << std::endl;
}
else if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3)
{
std::cout << std::endl;
std::cout << "**** No data returned to DM - caltrace(8) is "
"on (SWALLOW_ROWS_EXEMGR). ****"
<< std::endl;
std::cout << std::endl;
}
}
statementsRunningCount->decr(stmtCounted);
if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT"))
{
qts.msg_type = querytele::QueryTeleStats::QT_SUMMARY;
qts.max_mem_pct = fStats.fMaxMemPct;
qts.num_files = fStats.fNumFiles;
qts.phy_io = fStats.fPhyIO;
qts.cache_io = fStats.fCacheIO;
qts.msg_rcv_cnt = fStats.fMsgRcvCnt;
qts.cp_blocks_skipped = fStats.fCPBlocksSkipped;
qts.msg_bytes_in = fStats.fMsgBytesIn;
qts.msg_bytes_out = fStats.fMsgBytesOut;
qts.rows = totalRowCount;
qts.end_time = querytele::QueryTeleClient::timeNowms();
qts.session_id = csep.sessionID();
qts.query_type = csep.queryType();
qts.query = csep.data();
qts.system_name = fOamCachePtr->getSystemName();
qts.module_name = fOamCachePtr->getModuleName();
qts.local_query = csep.localQuery();
fTeleClient.postQueryTele(qts);
}
}
// Release CSC object (for sessionID) that was added by makeJobList()
// Mask 0x80000000 is for associate user query and csc query.
// (actual joblist destruction happens at the top of this loop)
decThreadCntPerSession(csep.sessionID() | 0x80000000);
}
catch (std::exception& ex)
{
decThreadCntPerSession(csep.sessionID() | 0x80000000);
statementsRunningCount->decr(stmtCounted);
std::cerr << "### ExeMgr ses:" << csep.sessionID() << " caught: " << ex.what() << std::endl;
logging::Message::Args args;
logging::LoggingID li(16, csep.sessionID(), csep.txnID());
args.add(ex.what());
msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li);
fIos.close();
}
catch (...)
{
decThreadCntPerSession(csep.sessionID() | 0x80000000);
statementsRunningCount->decr(stmtCounted);
std::cerr << "### Exception caught!" << std::endl;
logging::Message::Args args;
logging::LoggingID li(16, csep.sessionID(), csep.txnID());
args.add("ExeMgr caught unknown exception");
msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li);
fIos.close();
}
// make sure we don't leave scope while joblists are being destroyed
std::unique_lock<std::mutex> scoped(jlMutex);
while (destructing > 0)
jlCleanupDone.wait(scoped);
}
}; // namespace exemgr

View File

@ -0,0 +1,131 @@
/* Copyright (C) 2022 Mariadb Corporation.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#pragma once
#include <iostream>
#include <cstdint>
#include <csignal>
#include <sys/resource.h>
#undef root_name
#include <boost/filesystem.hpp>
#include "calpontselectexecutionplan.h"
#include "mcsanalyzetableexecutionplan.h"
#include "activestatementcounter.h"
#include "distributedenginecomm.h"
#include "resourcemanager.h"
#include "configcpp.h"
#include "queryteleserverparms.h"
#include "iosocket.h"
#include "joblist.h"
#include "joblistfactory.h"
#include "oamcache.h"
#include "simplecolumn.h"
#include "bytestream.h"
#include "telestats.h"
#include "messageobj.h"
#include "messagelog.h"
#include "sqllogger.h"
#include "femsghandler.h"
#include "idberrorinfo.h"
#include "MonitorProcMem.h"
#include "liboamcpp.h"
#include "crashtrace.h"
#include "service.h"
#include <mutex>
#include <thread>
#include <condition_variable>
#include "dbrm.h"
#include "mariadb_my_sys.h"
#include "statistics.h"
#include "serviceexemgr.h"
namespace exemgr
{
/// Callable session handler (see operator()()) for one SQL front-end connection.
/// It keeps its own copy of the IOSocket it was constructed with, non-owning
/// pointers to the DistributedEngineComm and ResourceManager passed in, and the
/// statistics gathered for the query currently being processed.
class SQLFrontSessionThread
{
 public:
  /// @param ios socket to the front end; copied into this object
  /// @param ec  engine communication object; only the pointer is stored
  /// @param rm  resource manager; only the pointer is stored
  SQLFrontSessionThread(const messageqcpp::IOSocket& ios, joblist::DistributedEngineComm* ec,
                        joblist::ResourceManager* rm)
   : fIos(ios)
   , fEc(ec)
   , fRm(rm)
   , fTeleClient(globServiceExeMgr->getTeleServerParms())
   , fOamCachePtr(oam::OamCache::makeOamCache())
  {
  }

  /// Entry point of the handler; defined out of line.
  void operator()();

 private:
  messageqcpp::IOSocket fIos;
  joblist::DistributedEngineComm* fEc;
  joblist::ResourceManager* fRm;

  querystats::QueryStats fStats;
  // Tracks whether the return stats have already been retrieved for the current query.
  bool fStatsRetrieved = false;

  querytele::QueryTeleClient fTeleClient;

  oam::OamCache* fOamCachePtr;  // plain pointer, so copying it is fine

  /// Reset the per-query statistics at the start of a new query.
  /// @param sessionId session the query belongs to
  /// @param sqlText   text of the query, stored into the stats
  void initStats(uint32_t sessionId, std::string& sqlText)
  {
    initMaxMemPct(sessionId);

    fStats.reset();
    fStats.setStartTime();
    fStats.fSessionID = sessionId;
    fStats.fQuery = sqlText;

    fStatsRetrieved = false;
  }

  /// Memory usage (in percent) recorded during the latest query of sessionId.
  /// A sessionId >= 0x80000000 denotes a system catalog query and can be ignored.
  static uint64_t getMaxMemPct(uint32_t sessionId);

  /// Drop the sessionMemMap entry holding the memory % use of the given session.
  /// A sessionId >= 0x80000000 denotes a system catalog query and can be ignored.
  static void deleteMaxMemPct(uint32_t sessionId);

  /// Collect the query stats and format them for logging.
  const std::string formatQueryStats(
      joblist::SJLP& jl,         // joblist associated with query
      const std::string& label,  // header label to print in front of log output
      bool includeNewLine,       // include line breaks in query stats std::string
      bool vtableModeOn, bool wantExtendedStats, uint64_t rowsReturned);

  static void incThreadCntPerSession(uint32_t sessionId);
  static void decThreadCntPerSession(uint32_t sessionId);

  /// Set the sessionMemMap entry of the given session to 0 memory %.
  /// A sessionId >= 0x80000000 denotes a system catalog query and can be ignored.
  static void initMaxMemPct(uint32_t sessionId);

  /// Round a byte count off to a human readable form (KB, MB, or GB).
  const std::string roundBytes(uint64_t value) const;

  void setRMParms(const execplan::CalpontSelectExecutionPlan::RMParmVec& parms);

  void buildSysCache(const execplan::CalpontSelectExecutionPlan& csep,
                     boost::shared_ptr<execplan::CalpontSystemCatalog> csc);

  void writeCodeAndError(messageqcpp::ByteStream::quadbyte code, const std::string emsg);

  void analyzeTableExecute(messageqcpp::ByteStream& bs, joblist::SJLP& jl, bool& stmtCounted);

  void analyzeTableHandleStats(messageqcpp::ByteStream& bs);

  uint64_t roundMB(uint64_t value) const;
};
}

View File

@ -2,6 +2,10 @@
#include <atomic>
// To be refactored. There are two issues with this implementation.
// 1. It can be replaced with a lock despite the memory_order_acquire/release mem order. Needs
// std::atomic_flag instead.
// 2. https://www.realworldtech.com/forum/?threadid=189711&curpostid=189723
namespace utils
{
inline void getSpinlock(std::atomic<bool>& lock)
@ -23,4 +27,29 @@ inline void releaseSpinlock(std::atomic<bool>& lock)
lock.store(false, std::memory_order_release);
}
// c++20 offers a combination of wait/notify methods but
// I prefer to use a simpler version of uspace spin lock.
//
// Scoped guard over a caller-owned std::atomic_flag. The constructor busy-waits
// until test_and_set() reports that the flag was clear (i.e. this guard now holds
// it) and the destructor clears the flag again.
//
// release() may be called to give the lock up before the guard goes out of
// scope. It is idempotent: once the guard has released the flag, neither a second
// release() nor the destructor clears it again, so a lock that another thread
// acquired in the meantime is never dropped by mistake.
//
// The guard is non-copyable; a copy would end up clearing the same flag twice.
// A guard object is meant to be used by the thread that created it only.
class USpaceSpinLock
{
 public:
  // Spins until the flag is acquired. `flag` must outlive the guard.
  USpaceSpinLock(std::atomic_flag& flag) : flag_(flag)
  {
    while (flag_.test_and_set(std::memory_order_acquire))
    {
      // busy-wait: test_and_set() returned true, somebody else holds the flag
    }
    held_ = true;
  }

  USpaceSpinLock(const USpaceSpinLock&) = delete;
  USpaceSpinLock& operator=(const USpaceSpinLock&) = delete;

  ~USpaceSpinLock()
  {
    release();
  }

  // Clears the flag if this guard still holds it; otherwise does nothing.
  void release()
  {
    if (held_)
    {
      // Drop ownership first so the flag is cleared exactly once per guard.
      held_ = false;
      flag_.clear(std::memory_order_release);
    }
  }

 private:
  std::atomic_flag& flag_;
  bool held_ = false;  // true while this guard owns flag_
};
} // namespace utils

View File

@ -20,41 +20,11 @@
#include <iostream>
#include <vector>
#include <boost/shared_ptr.hpp>
#ifdef _MSC_VER
#include <unordered_map>
#else
#include <tr1/unordered_map>
#endif
#include "../common/simpleallocator.h"
#ifndef _HASHFIX_
#define _HASHFIX_
#ifndef __LP64__
#if __GNUC__ == 4 && __GNUC_MINOR__ < 2
// This is needed for /usr/include/c++/4.1.1/tr1/functional on 32-bit compiles
// tr1_hashtable_define_trivial_hash(long long unsigned int);
// Explicit specialization of std::tr1::hash for `long long unsigned int`,
// injected into namespace std::tr1.
namespace std
{
namespace tr1
{
template <>
struct hash<long long unsigned int> : public std::unary_function<long long unsigned int, std::size_t>
{
// The hash is the key itself, converted to std::size_t with a static_cast.
// NOTE(review): where std::size_t is narrower than the key the cast drops the
// high bits - acceptable for a hash, but worth knowing.
std::size_t operator()(long long unsigned int val) const
{
return static_cast<std::size_t>(val);
}
};
} // namespace tr1
} // namespace std
#endif
#endif
#endif
#define NO_DATALISTS
#include "../joblist/elementtype.h"
#undef NO_DATALISTS
namespace joiner
{