You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-27 21:01:50 +03:00
MCOL-5001 This patch removes ExeMgr traces
This commit is contained in:
@ -343,7 +343,6 @@ ADD_SUBDIRECTORY(dbcon/ddlpackage)
|
|||||||
ADD_SUBDIRECTORY(dbcon/ddlpackageproc)
|
ADD_SUBDIRECTORY(dbcon/ddlpackageproc)
|
||||||
ADD_SUBDIRECTORY(dbcon/dmlpackage)
|
ADD_SUBDIRECTORY(dbcon/dmlpackage)
|
||||||
ADD_SUBDIRECTORY(dbcon/dmlpackageproc)
|
ADD_SUBDIRECTORY(dbcon/dmlpackageproc)
|
||||||
ADD_SUBDIRECTORY(exemgr)
|
|
||||||
ADD_SUBDIRECTORY(ddlproc)
|
ADD_SUBDIRECTORY(ddlproc)
|
||||||
ADD_SUBDIRECTORY(dmlproc)
|
ADD_SUBDIRECTORY(dmlproc)
|
||||||
ADD_SUBDIRECTORY(oamapps)
|
ADD_SUBDIRECTORY(oamapps)
|
||||||
|
1
debian/mariadb-plugin-columnstore.install
vendored
1
debian/mariadb-plugin-columnstore.install
vendored
@ -105,7 +105,6 @@ usr/share/columnstore/mariadb-columnstore.service
|
|||||||
usr/share/columnstore/mcs-controllernode.service
|
usr/share/columnstore/mcs-controllernode.service
|
||||||
usr/share/columnstore/mcs-ddlproc.service
|
usr/share/columnstore/mcs-ddlproc.service
|
||||||
usr/share/columnstore/mcs-dmlproc.service
|
usr/share/columnstore/mcs-dmlproc.service
|
||||||
usr/share/columnstore/mcs-exemgr.service
|
|
||||||
usr/share/columnstore/mcs-loadbrm.service
|
usr/share/columnstore/mcs-loadbrm.service
|
||||||
usr/share/columnstore/mcs-primproc.service
|
usr/share/columnstore/mcs-primproc.service
|
||||||
usr/share/columnstore/mcs-storagemanager.service
|
usr/share/columnstore/mcs-storagemanager.service
|
||||||
|
@ -1,20 +0,0 @@
|
|||||||
|
|
||||||
# include_directories( ${ENGINE_COMMON_INCLUDES} )
|
|
||||||
|
|
||||||
|
|
||||||
########### next target ###############
|
|
||||||
|
|
||||||
# set(ExeMgr_SRCS serviceexemgr.cpp sqlfrontsessionthread.cpp rssmonfcn.cpp exemgr.cpp activestatementcounter.cpp femsghandler.cpp ../utils/common/crashtrace.cpp)
|
|
||||||
|
|
||||||
# add_executable(ExeMgr ${ExeMgr_SRCS})
|
|
||||||
|
|
||||||
# target_link_libraries(ExeMgr ${ENGINE_LDFLAGS} ${ENGINE_EXEC_LIBS} ${NETSNMP_LIBRARIES} ${MARIADB_CLIENT_LIBS} cacheutils threadpool)
|
|
||||||
|
|
||||||
# target_include_directories(ExeMgr PRIVATE ${Boost_INCLUDE_DIRS})
|
|
||||||
|
|
||||||
# install(TARGETS ExeMgr DESTINATION ${ENGINE_BINDIR} COMPONENT columnstore-engine)
|
|
||||||
|
|
||||||
|
|
||||||
########### install files ###############
|
|
||||||
|
|
||||||
|
|
@ -1,59 +0,0 @@
|
|||||||
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
||||||
|
|
||||||
This program is free software; you can redistribute it and/or
|
|
||||||
modify it under the terms of the GNU General Public License
|
|
||||||
as published by the Free Software Foundation; version 2 of
|
|
||||||
the License.
|
|
||||||
|
|
||||||
This program is distributed in the hope that it will be useful,
|
|
||||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
GNU General Public License for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU General Public License
|
|
||||||
along with this program; if not, write to the Free Software
|
|
||||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
||||||
MA 02110-1301, USA. */
|
|
||||||
|
|
||||||
// $Id: activestatementcounter.cpp 940 2013-01-21 14:11:31Z rdempsey $
|
|
||||||
//
|
|
||||||
|
|
||||||
#include <unistd.h>
|
|
||||||
#include <boost/thread/mutex.hpp>
|
|
||||||
using namespace boost;
|
|
||||||
|
|
||||||
#include "activestatementcounter.h"
|
|
||||||
|
|
||||||
void ActiveStatementCounter::incr(bool& counted)
|
|
||||||
{
|
|
||||||
if (counted)
|
|
||||||
return;
|
|
||||||
|
|
||||||
counted = true;
|
|
||||||
boost::mutex::scoped_lock lk(fMutex);
|
|
||||||
|
|
||||||
if (upperLimit > 0)
|
|
||||||
while (fStatementCount >= upperLimit)
|
|
||||||
{
|
|
||||||
fStatementsWaiting++;
|
|
||||||
condvar.wait(lk);
|
|
||||||
--fStatementsWaiting;
|
|
||||||
}
|
|
||||||
|
|
||||||
fStatementCount++;
|
|
||||||
}
|
|
||||||
|
|
||||||
void ActiveStatementCounter::decr(bool& counted)
|
|
||||||
{
|
|
||||||
if (!counted)
|
|
||||||
return;
|
|
||||||
|
|
||||||
counted = false;
|
|
||||||
boost::mutex::scoped_lock lk(fMutex);
|
|
||||||
|
|
||||||
if (fStatementCount == 0)
|
|
||||||
return;
|
|
||||||
|
|
||||||
--fStatementCount;
|
|
||||||
condvar.notify_one();
|
|
||||||
}
|
|
@ -1,64 +0,0 @@
|
|||||||
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
||||||
|
|
||||||
This program is free software; you can redistribute it and/or
|
|
||||||
modify it under the terms of the GNU General Public License
|
|
||||||
as published by the Free Software Foundation; version 2 of
|
|
||||||
the License.
|
|
||||||
|
|
||||||
This program is distributed in the hope that it will be useful,
|
|
||||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
GNU General Public License for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU General Public License
|
|
||||||
along with this program; if not, write to the Free Software
|
|
||||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
||||||
MA 02110-1301, USA. */
|
|
||||||
|
|
||||||
// $Id: activestatementcounter.h 940 2013-01-21 14:11:31Z rdempsey $
|
|
||||||
//
|
|
||||||
/** @file */
|
|
||||||
|
|
||||||
#pragma once
|
|
||||||
|
|
||||||
#include <stdint.h>
|
|
||||||
|
|
||||||
#include <boost/thread/mutex.hpp>
|
|
||||||
#include <boost/thread/condition.hpp>
|
|
||||||
|
|
||||||
#include "vss.h"
|
|
||||||
|
|
||||||
class ActiveStatementCounter
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
ActiveStatementCounter(uint32_t limit) : fStatementCount(0), upperLimit(limit), fStatementsWaiting(0)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
virtual ~ActiveStatementCounter()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void incr(bool& counted);
|
|
||||||
void decr(bool& counted);
|
|
||||||
uint32_t cur() const
|
|
||||||
{
|
|
||||||
return fStatementCount;
|
|
||||||
}
|
|
||||||
uint32_t waiting() const
|
|
||||||
{
|
|
||||||
return fStatementsWaiting;
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
ActiveStatementCounter(const ActiveStatementCounter& rhs);
|
|
||||||
ActiveStatementCounter& operator=(const ActiveStatementCounter& rhs);
|
|
||||||
|
|
||||||
uint32_t fStatementCount;
|
|
||||||
uint32_t upperLimit;
|
|
||||||
uint32_t fStatementsWaiting;
|
|
||||||
boost::mutex fMutex;
|
|
||||||
boost::condition condvar;
|
|
||||||
BRM::VSS fVss;
|
|
||||||
};
|
|
||||||
|
|
@ -1,102 +0,0 @@
|
|||||||
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
||||||
Copyright (C) 2019 MariaDB Corporation
|
|
||||||
|
|
||||||
This program is free software; you can redistribute it and/or
|
|
||||||
modify it under the terms of the GNU General Public License
|
|
||||||
as published by the Free Software Foundation; version 2 of
|
|
||||||
the License.
|
|
||||||
|
|
||||||
This program is distributed in the hope that it will be useful,
|
|
||||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
GNU General Public License for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU General Public License
|
|
||||||
along with this program; if not, write to the Free Software
|
|
||||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
||||||
MA 02110-1301, USA. */
|
|
||||||
|
|
||||||
/**********************************************************************
|
|
||||||
* $Id: main.cpp 1000 2013-07-24 21:05:51Z pleblanc $
|
|
||||||
*
|
|
||||||
*
|
|
||||||
***********************************************************************/
|
|
||||||
/**
|
|
||||||
* @brief execution plan manager main program
|
|
||||||
*
|
|
||||||
* This is the ?main? program for dealing with execution plans and
|
|
||||||
* result sets. It sits in a loop waiting for CalpontExecutionPlan
|
|
||||||
* from the FEP. It then passes the CalpontExecutionPlan to the
|
|
||||||
* JobListFactory from which a JobList is obtained. This is passed
|
|
||||||
* to to the Query Manager running in the real-time portion of the
|
|
||||||
* EC. The ExecutionPlanManager waits until the Query Manager
|
|
||||||
* returns a result set for the job list. These results are passed
|
|
||||||
* into the CalpontResultFactory, which outputs a CalpontResultSet.
|
|
||||||
* The ExecutionPlanManager passes the CalpontResultSet into the
|
|
||||||
* VendorResultFactory which produces a result set tailored to the
|
|
||||||
* specific DBMS front end in use. The ExecutionPlanManager then
|
|
||||||
* sends the VendorResultSet back to the Calpont Database Connector
|
|
||||||
* on the Front-End Processor where it is returned to the DBMS
|
|
||||||
* front-end.
|
|
||||||
*/
|
|
||||||
#include <iostream>
|
|
||||||
#include <cstdint>
|
|
||||||
#include <csignal>
|
|
||||||
#include <sys/resource.h>
|
|
||||||
|
|
||||||
#undef root_name
|
|
||||||
#include <boost/filesystem.hpp>
|
|
||||||
|
|
||||||
#include "calpontselectexecutionplan.h"
|
|
||||||
#include "mcsanalyzetableexecutionplan.h"
|
|
||||||
#include "activestatementcounter.h"
|
|
||||||
#include "distributedenginecomm.h"
|
|
||||||
#include "resourcemanager.h"
|
|
||||||
#include "configcpp.h"
|
|
||||||
#include "queryteleserverparms.h"
|
|
||||||
#include "iosocket.h"
|
|
||||||
#include "joblist.h"
|
|
||||||
#include "joblistfactory.h"
|
|
||||||
#include "oamcache.h"
|
|
||||||
#include "simplecolumn.h"
|
|
||||||
#include "bytestream.h"
|
|
||||||
#include "telestats.h"
|
|
||||||
#include "messageobj.h"
|
|
||||||
#include "messagelog.h"
|
|
||||||
#include "sqllogger.h"
|
|
||||||
#include "femsghandler.h"
|
|
||||||
#include "idberrorinfo.h"
|
|
||||||
#include "MonitorProcMem.h"
|
|
||||||
#include "liboamcpp.h"
|
|
||||||
#include "crashtrace.h"
|
|
||||||
#include "service.h"
|
|
||||||
|
|
||||||
#include <mutex>
|
|
||||||
#include <thread>
|
|
||||||
#include <condition_variable>
|
|
||||||
|
|
||||||
#include "dbrm.h"
|
|
||||||
|
|
||||||
#include "mariadb_my_sys.h"
|
|
||||||
#include "statistics.h"
|
|
||||||
#include "serviceexemgr.h"
|
|
||||||
|
|
||||||
int main(int argc, char* argv[])
|
|
||||||
{
|
|
||||||
opterr = 0;
|
|
||||||
exemgr::Opt opt(argc, argv);
|
|
||||||
|
|
||||||
// Set locale language
|
|
||||||
setlocale(LC_ALL, "");
|
|
||||||
setlocale(LC_NUMERIC, "C");
|
|
||||||
|
|
||||||
// This is unset due to the way we start it
|
|
||||||
// program_invocation_short_name = const_cast<char*>("ExeMgr");
|
|
||||||
|
|
||||||
// Initialize the charset library
|
|
||||||
MY_INIT(argv[0]);
|
|
||||||
// global ptr defined in serviceexemgr.cpp
|
|
||||||
exemgr::globServiceExeMgr = new exemgr::ServiceExeMgr(opt);
|
|
||||||
return exemgr::globServiceExeMgr->Run();
|
|
||||||
}
|
|
||||||
|
|
@ -1,143 +0,0 @@
|
|||||||
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
||||||
|
|
||||||
This program is free software; you can redistribute it and/or
|
|
||||||
modify it under the terms of the GNU General Public License
|
|
||||||
as published by the Free Software Foundation; version 2 of
|
|
||||||
the License.
|
|
||||||
|
|
||||||
This program is distributed in the hope that it will be useful,
|
|
||||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
GNU General Public License for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU General Public License
|
|
||||||
along with this program; if not, write to the Free Software
|
|
||||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
||||||
MA 02110-1301, USA. */
|
|
||||||
|
|
||||||
#include "messagequeue.h"
|
|
||||||
#include "iosocket.h"
|
|
||||||
|
|
||||||
#include "femsghandler.h"
|
|
||||||
|
|
||||||
using namespace std;
|
|
||||||
using namespace joblist;
|
|
||||||
using namespace messageqcpp;
|
|
||||||
|
|
||||||
threadpool::ThreadPool FEMsgHandler::threadPool;
|
|
||||||
|
|
||||||
namespace
|
|
||||||
{
|
|
||||||
class Runner
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
Runner(FEMsgHandler* f) : target(f)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
void operator()()
|
|
||||||
{
|
|
||||||
target->threadFcn();
|
|
||||||
}
|
|
||||||
FEMsgHandler* target;
|
|
||||||
};
|
|
||||||
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
FEMsgHandler::FEMsgHandler() : die(false), running(false), sawData(false), sock(NULL)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
FEMsgHandler::FEMsgHandler(boost::shared_ptr<JobList> j, IOSocket* s)
|
|
||||||
: die(false), running(false), sawData(false), jl(j)
|
|
||||||
{
|
|
||||||
sock = s;
|
|
||||||
assert(sock);
|
|
||||||
}
|
|
||||||
|
|
||||||
FEMsgHandler::~FEMsgHandler()
|
|
||||||
{
|
|
||||||
stop();
|
|
||||||
threadPool.join(thr);
|
|
||||||
}
|
|
||||||
|
|
||||||
void FEMsgHandler::start()
|
|
||||||
{
|
|
||||||
if (!running)
|
|
||||||
{
|
|
||||||
running = true;
|
|
||||||
thr = threadPool.invoke(Runner(this));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void FEMsgHandler::stop()
|
|
||||||
{
|
|
||||||
die = true;
|
|
||||||
jl.reset();
|
|
||||||
}
|
|
||||||
|
|
||||||
void FEMsgHandler::setJobList(boost::shared_ptr<JobList> j)
|
|
||||||
{
|
|
||||||
jl = j;
|
|
||||||
}
|
|
||||||
|
|
||||||
void FEMsgHandler::setSocket(IOSocket* i)
|
|
||||||
{
|
|
||||||
sock = i;
|
|
||||||
assert(sock);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Note, the next two fcns strongly depend on ExeMgr's current implementation. There's a
|
|
||||||
* good chance that if ExeMgr's table send loop is changed, these will need to be
|
|
||||||
* updated to match.
|
|
||||||
*/
|
|
||||||
|
|
||||||
/* This is currently only called if InetStreamSocket::write() throws, implying
|
|
||||||
* a connection error. It might not make sense in other contexts.
|
|
||||||
*/
|
|
||||||
bool FEMsgHandler::aborted()
|
|
||||||
{
|
|
||||||
if (sawData)
|
|
||||||
return true;
|
|
||||||
|
|
||||||
boost::mutex::scoped_lock sl(mutex);
|
|
||||||
int err;
|
|
||||||
int connectionNum = sock->getConnectionNum();
|
|
||||||
|
|
||||||
err = InetStreamSocket::pollConnection(connectionNum, 1000);
|
|
||||||
|
|
||||||
if (err == 1)
|
|
||||||
{
|
|
||||||
sawData = true;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
void FEMsgHandler::threadFcn()
|
|
||||||
{
|
|
||||||
int err = 0;
|
|
||||||
int connectionNum = sock->getConnectionNum();
|
|
||||||
|
|
||||||
/* This waits for the next readable event on sock. An abort is signaled
|
|
||||||
* by sending something (anything at the moment), then dropping the connection.
|
|
||||||
* This fcn exits on all other events.
|
|
||||||
*/
|
|
||||||
while (!die && err == 0)
|
|
||||||
{
|
|
||||||
boost::mutex::scoped_lock sl(mutex);
|
|
||||||
err = InetStreamSocket::pollConnection(connectionNum, 1000);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (err == 1)
|
|
||||||
sawData = true; // there's data to read, must be the abort signal
|
|
||||||
|
|
||||||
if (!die && (err == 2 || err == 1))
|
|
||||||
{
|
|
||||||
die = true;
|
|
||||||
jl->abort();
|
|
||||||
jl.reset();
|
|
||||||
}
|
|
||||||
|
|
||||||
running = false;
|
|
||||||
}
|
|
@ -1,47 +0,0 @@
|
|||||||
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
||||||
|
|
||||||
This program is free software; you can redistribute it and/or
|
|
||||||
modify it under the terms of the GNU General Public License
|
|
||||||
as published by the Free Software Foundation; version 2 of
|
|
||||||
the License.
|
|
||||||
|
|
||||||
This program is distributed in the hope that it will be useful,
|
|
||||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
GNU General Public License for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU General Public License
|
|
||||||
along with this program; if not, write to the Free Software
|
|
||||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
||||||
MA 02110-1301, USA. */
|
|
||||||
|
|
||||||
#pragma once
|
|
||||||
|
|
||||||
#include "joblist.h"
|
|
||||||
#include "inetstreamsocket.h"
|
|
||||||
#include "threadpool.h"
|
|
||||||
|
|
||||||
class FEMsgHandler
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
FEMsgHandler();
|
|
||||||
FEMsgHandler(boost::shared_ptr<joblist::JobList>, messageqcpp::IOSocket*);
|
|
||||||
virtual ~FEMsgHandler();
|
|
||||||
|
|
||||||
void start();
|
|
||||||
void stop();
|
|
||||||
void setJobList(boost::shared_ptr<joblist::JobList>);
|
|
||||||
void setSocket(messageqcpp::IOSocket*);
|
|
||||||
bool aborted();
|
|
||||||
|
|
||||||
void threadFcn();
|
|
||||||
|
|
||||||
static threadpool::ThreadPool threadPool;
|
|
||||||
|
|
||||||
private:
|
|
||||||
bool die, running, sawData;
|
|
||||||
messageqcpp::IOSocket* sock;
|
|
||||||
boost::shared_ptr<joblist::JobList> jl;
|
|
||||||
boost::mutex mutex;
|
|
||||||
uint64_t thr;
|
|
||||||
};
|
|
@ -1,69 +0,0 @@
|
|||||||
/* Copyright (C) 2022 MariaDB Corporation
|
|
||||||
|
|
||||||
This program is free software; you can redistribute it and/or
|
|
||||||
modify it under the terms of the GNU General Public License
|
|
||||||
as published by the Free Software Foundation; version 2 of
|
|
||||||
the License.
|
|
||||||
|
|
||||||
This program is distributed in the hope that it will be useful,
|
|
||||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
GNU General Public License for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU General Public License
|
|
||||||
along with this program; if not, write to the Free Software
|
|
||||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
||||||
MA 02110-1301, USA. */
|
|
||||||
|
|
||||||
#include <thread>
|
|
||||||
|
|
||||||
#include "rssmonfcn.h"
|
|
||||||
#include "serviceexemgr.h"
|
|
||||||
|
|
||||||
namespace exemgr
|
|
||||||
{
|
|
||||||
void RssMonFcn::operator()() const
|
|
||||||
{
|
|
||||||
logging::Logger& msgLog = globServiceExeMgr->getLogger();
|
|
||||||
auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount();
|
|
||||||
for (;;)
|
|
||||||
{
|
|
||||||
size_t rssMb = rss();
|
|
||||||
size_t pct = rssMb * 100 / fMemTotal;
|
|
||||||
|
|
||||||
if (pct > fMaxPct)
|
|
||||||
{
|
|
||||||
if (fMaxPct >= 95)
|
|
||||||
{
|
|
||||||
std::cerr << "Too much memory allocated!" << std::endl;
|
|
||||||
logging::Message::Args args;
|
|
||||||
args.add((int)pct);
|
|
||||||
args.add((int)fMaxPct);
|
|
||||||
msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logRssTooBig, args, logging::LoggingID(16));
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (statementsRunningCount->cur() == 0)
|
|
||||||
{
|
|
||||||
std::cerr << "Too much memory allocated!" << std::endl;
|
|
||||||
logging::Message::Args args;
|
|
||||||
args.add((int)pct);
|
|
||||||
args.add((int)fMaxPct);
|
|
||||||
msgLog.logMessage(logging::LOG_TYPE_WARNING, ServiceExeMgr::logRssTooBig, args, logging::LoggingID(16));
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::cerr << "Too much memory allocated, but stmts running" << std::endl;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update sessionMemMap entries lower than current mem % use
|
|
||||||
globServiceExeMgr->updateSessionMap(pct);
|
|
||||||
|
|
||||||
pause_();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
void startRssMon(size_t maxPct, int pauseSeconds)
|
|
||||||
{
|
|
||||||
new std::thread(RssMonFcn(maxPct, pauseSeconds));
|
|
||||||
}
|
|
||||||
} // namespace
|
|
@ -1,36 +0,0 @@
|
|||||||
/* Copyright (C) 2022 MariaDB Corporation
|
|
||||||
|
|
||||||
This program is free software; you can redistribute it and/or
|
|
||||||
modify it under the terms of the GNU General Public License
|
|
||||||
as published by the Free Software Foundation; version 2 of
|
|
||||||
the License.
|
|
||||||
|
|
||||||
This program is distributed in the hope that it will be useful,
|
|
||||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
GNU General Public License for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU General Public License
|
|
||||||
along with this program; if not, write to the Free Software
|
|
||||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
||||||
MA 02110-1301, USA. */
|
|
||||||
|
|
||||||
#pragma once
|
|
||||||
|
|
||||||
#include <iostream>
|
|
||||||
|
|
||||||
#include "MonitorProcMem.h"
|
|
||||||
|
|
||||||
namespace exemgr
|
|
||||||
{
|
|
||||||
class RssMonFcn : public utils::MonitorProcMem
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
RssMonFcn(size_t maxPct, int pauseSeconds) : MonitorProcMem(maxPct, 0, 21, pauseSeconds)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void operator()() const;
|
|
||||||
};
|
|
||||||
|
|
||||||
} // namespace
|
|
@ -1,475 +0,0 @@
|
|||||||
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
||||||
Copyright (C) 2019-22 MariaDB Corporation
|
|
||||||
|
|
||||||
This program is free software; you can redistribute it and/or
|
|
||||||
modify it under the terms of the GNU General Public License
|
|
||||||
as published by the Free Software Foundation; version 2 of
|
|
||||||
the License.
|
|
||||||
|
|
||||||
This program is distributed in the hope that it will be useful,
|
|
||||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
GNU General Public License for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU General Public License
|
|
||||||
along with this program; if not, write to the Free Software
|
|
||||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
||||||
MA 02110-1301, USA. */
|
|
||||||
|
|
||||||
/**********************************************************************
|
|
||||||
* $Id: main.cpp 1000 2013-07-24 21:05:51Z pleblanc $
|
|
||||||
*
|
|
||||||
*
|
|
||||||
***********************************************************************/
|
|
||||||
/**
|
|
||||||
* @brief execution plan manager main program
|
|
||||||
*
|
|
||||||
* This is the ?main? program for dealing with execution plans and
|
|
||||||
* result sets. It sits in a loop waiting for CalpontExecutionPlan
|
|
||||||
* from the FEP. It then passes the CalpontExecutionPlan to the
|
|
||||||
* JobListFactory from which a JobList is obtained. This is passed
|
|
||||||
* to to the Query Manager running in the real-time portion of the
|
|
||||||
* EC. The ExecutionPlanManager waits until the Query Manager
|
|
||||||
* returns a result set for the job list. These results are passed
|
|
||||||
* into the CalpontResultFactory, which outputs a CalpontResultSet.
|
|
||||||
* The ExecutionPlanManager passes the CalpontResultSet into the
|
|
||||||
* VendorResultFactory which produces a result set tailored to the
|
|
||||||
* specific DBMS front end in use. The ExecutionPlanManager then
|
|
||||||
* sends the VendorResultSet back to the Calpont Database Connector
|
|
||||||
* on the Front-End Processor where it is returned to the DBMS
|
|
||||||
* front-end.
|
|
||||||
*/
|
|
||||||
#include <iostream>
|
|
||||||
#include <cstdint>
|
|
||||||
#include <csignal>
|
|
||||||
#include <sys/resource.h>
|
|
||||||
|
|
||||||
#undef root_name
|
|
||||||
#include <boost/filesystem.hpp>
|
|
||||||
|
|
||||||
#include "calpontselectexecutionplan.h"
|
|
||||||
#include "mcsanalyzetableexecutionplan.h"
|
|
||||||
#include "activestatementcounter.h"
|
|
||||||
#include "distributedenginecomm.h"
|
|
||||||
#include "resourcemanager.h"
|
|
||||||
#include "configcpp.h"
|
|
||||||
#include "queryteleserverparms.h"
|
|
||||||
#include "iosocket.h"
|
|
||||||
#include "joblist.h"
|
|
||||||
#include "joblistfactory.h"
|
|
||||||
#include "oamcache.h"
|
|
||||||
#include "simplecolumn.h"
|
|
||||||
#include "bytestream.h"
|
|
||||||
#include "telestats.h"
|
|
||||||
#include "messageobj.h"
|
|
||||||
#include "messagelog.h"
|
|
||||||
#include "sqllogger.h"
|
|
||||||
#include "femsghandler.h"
|
|
||||||
#include "idberrorinfo.h"
|
|
||||||
#include "MonitorProcMem.h"
|
|
||||||
#include "liboamcpp.h"
|
|
||||||
#include "crashtrace.h"
|
|
||||||
#include "service.h"
|
|
||||||
|
|
||||||
#include <mutex>
|
|
||||||
#include <thread>
|
|
||||||
#include <condition_variable>
|
|
||||||
|
|
||||||
#include "dbrm.h"
|
|
||||||
|
|
||||||
#include "mariadb_my_sys.h"
|
|
||||||
#include "statistics.h"
|
|
||||||
#include "serviceexemgr.h"
|
|
||||||
#include "sqlfrontsessionthread.h"
|
|
||||||
|
|
||||||
|
|
||||||
namespace exemgr
|
|
||||||
{
|
|
||||||
ServiceExeMgr* globServiceExeMgr = nullptr;
|
|
||||||
void startRssMon(size_t maxPct, int pauseSeconds);
|
|
||||||
|
|
||||||
void added_a_pm(int)
|
|
||||||
{
|
|
||||||
logging::LoggingID logid(21, 0, 0);
|
|
||||||
logging::Message::Args args1;
|
|
||||||
logging::Message msg(1);
|
|
||||||
args1.add("exeMgr caught SIGHUP. Resetting connections");
|
|
||||||
msg.format(args1);
|
|
||||||
std::cout << msg.msg().c_str() << std::endl;
|
|
||||||
logging::Logger logger(logid.fSubsysID);
|
|
||||||
logger.logMessage(logging::LOG_TYPE_DEBUG, msg, logid);
|
|
||||||
|
|
||||||
auto* dec = exemgr::globServiceExeMgr->getDec();
|
|
||||||
if (dec)
|
|
||||||
{
|
|
||||||
oam::OamCache* oamCache = oam::OamCache::makeOamCache();
|
|
||||||
oamCache->forceReload();
|
|
||||||
dec->Setup();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void printTotalUmMemory(int sig)
|
|
||||||
{
|
|
||||||
int64_t num = globServiceExeMgr->getRm().availableMemory();
|
|
||||||
std::cout << "Total UM memory available: " << num << std::endl;
|
|
||||||
}
|
|
||||||
|
|
||||||
void ServiceExeMgr::setupSignalHandlers()
|
|
||||||
{
|
|
||||||
struct sigaction ign;
|
|
||||||
|
|
||||||
memset(&ign, 0, sizeof(ign));
|
|
||||||
ign.sa_handler = SIG_IGN;
|
|
||||||
|
|
||||||
sigaction(SIGPIPE, &ign, 0);
|
|
||||||
|
|
||||||
memset(&ign, 0, sizeof(ign));
|
|
||||||
ign.sa_handler = exemgr::added_a_pm;
|
|
||||||
sigaction(SIGHUP, &ign, 0);
|
|
||||||
ign.sa_handler = exemgr::printTotalUmMemory;
|
|
||||||
sigaction(SIGUSR1, &ign, 0);
|
|
||||||
memset(&ign, 0, sizeof(ign));
|
|
||||||
ign.sa_handler = fatalHandler;
|
|
||||||
sigaction(SIGSEGV, &ign, 0);
|
|
||||||
sigaction(SIGABRT, &ign, 0);
|
|
||||||
sigaction(SIGFPE, &ign, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
void cleanTempDir()
|
|
||||||
{
|
|
||||||
using TempDirPurpose = config::Config::TempDirPurpose;
|
|
||||||
struct Dirs
|
|
||||||
{
|
|
||||||
std::string section;
|
|
||||||
std::string allowed;
|
|
||||||
TempDirPurpose purpose;
|
|
||||||
};
|
|
||||||
std::vector<Dirs> dirs{{"HashJoin", "AllowDiskBasedJoin", TempDirPurpose::Joins},
|
|
||||||
{"RowAggregation", "AllowDiskBasedAggregation", TempDirPurpose::Aggregates}};
|
|
||||||
const auto config = config::Config::makeConfig();
|
|
||||||
|
|
||||||
for (const auto& dir : dirs)
|
|
||||||
{
|
|
||||||
std::string allowStr = config->getConfig(dir.section, dir.allowed);
|
|
||||||
bool allow = (allowStr == "Y" || allowStr == "y");
|
|
||||||
|
|
||||||
std::string tmpPrefix = config->getTempFileDir(dir.purpose);
|
|
||||||
|
|
||||||
if (allow && tmpPrefix.empty())
|
|
||||||
{
|
|
||||||
std::cerr << "Empty tmp directory name for " << dir.section << std::endl;
|
|
||||||
logging::LoggingID logid(16, 0, 0);
|
|
||||||
logging::Message::Args args;
|
|
||||||
logging::Message message(8);
|
|
||||||
args.add("Empty tmp directory name for:");
|
|
||||||
args.add(dir.section);
|
|
||||||
message.format(args);
|
|
||||||
logging::Logger logger(logid.fSubsysID);
|
|
||||||
logger.logMessage(logging::LOG_TYPE_CRITICAL, message, logid);
|
|
||||||
}
|
|
||||||
|
|
||||||
tmpPrefix += "/";
|
|
||||||
|
|
||||||
idbassert(tmpPrefix != "/");
|
|
||||||
|
|
||||||
/* This is quite scary as ExeMgr usually runs as root */
|
|
||||||
try
|
|
||||||
{
|
|
||||||
if (allow)
|
|
||||||
{
|
|
||||||
boost::filesystem::remove_all(tmpPrefix);
|
|
||||||
}
|
|
||||||
boost::filesystem::create_directories(tmpPrefix);
|
|
||||||
}
|
|
||||||
catch (const std::exception& ex)
|
|
||||||
{
|
|
||||||
std::cerr << ex.what() << std::endl;
|
|
||||||
logging::LoggingID logid(16, 0, 0);
|
|
||||||
logging::Message::Args args;
|
|
||||||
logging::Message message(8);
|
|
||||||
args.add("Exception whilst cleaning tmpdir: ");
|
|
||||||
args.add(ex.what());
|
|
||||||
message.format(args);
|
|
||||||
logging::Logger logger(logid.fSubsysID);
|
|
||||||
logger.logMessage(logging::LOG_TYPE_WARNING, message, logid);
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
std::cerr << "Caught unknown exception during tmpdir cleanup" << std::endl;
|
|
||||||
logging::LoggingID logid(16, 0, 0);
|
|
||||||
logging::Message::Args args;
|
|
||||||
logging::Message message(8);
|
|
||||||
args.add("Unknown exception whilst cleaning tmpdir");
|
|
||||||
message.format(args);
|
|
||||||
logging::Logger logger(logid.fSubsysID);
|
|
||||||
logger.logMessage(logging::LOG_TYPE_WARNING, message, logid);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const std::string ServiceExeMgr::prettyPrintMiniInfo(const std::string& in)
|
|
||||||
{
|
|
||||||
// 1. take the std::string and tok it by '\n'
|
|
||||||
// 2. for each part in each line calc the longest part
|
|
||||||
// 3. padding to each longest value, output a header and the lines
|
|
||||||
typedef boost::tokenizer<boost::char_separator<char> > my_tokenizer;
|
|
||||||
boost::char_separator<char> sep1("\n");
|
|
||||||
my_tokenizer tok1(in, sep1);
|
|
||||||
std::vector<std::string> lines;
|
|
||||||
std::string header = "Desc Mode Table TableOID ReferencedColumns PIO LIO PBE Elapsed Rows";
|
|
||||||
const int header_parts = 10;
|
|
||||||
lines.push_back(header);
|
|
||||||
|
|
||||||
for (my_tokenizer::const_iterator iter1 = tok1.begin(); iter1 != tok1.end(); ++iter1)
|
|
||||||
{
|
|
||||||
if (!iter1->empty())
|
|
||||||
lines.push_back(*iter1);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::vector<unsigned> lens;
|
|
||||||
|
|
||||||
for (int i = 0; i < header_parts; i++)
|
|
||||||
lens.push_back(0);
|
|
||||||
|
|
||||||
std::vector<std::vector<std::string> > lineparts;
|
|
||||||
std::vector<std::string>::iterator iter2;
|
|
||||||
int j;
|
|
||||||
|
|
||||||
for (iter2 = lines.begin(), j = 0; iter2 != lines.end(); ++iter2, j++)
|
|
||||||
{
|
|
||||||
boost::char_separator<char> sep2(" ");
|
|
||||||
my_tokenizer tok2(*iter2, sep2);
|
|
||||||
int i;
|
|
||||||
std::vector<std::string> parts;
|
|
||||||
my_tokenizer::iterator iter3;
|
|
||||||
|
|
||||||
for (iter3 = tok2.begin(), i = 0; iter3 != tok2.end(); ++iter3, i++)
|
|
||||||
{
|
|
||||||
if (i >= header_parts)
|
|
||||||
break;
|
|
||||||
|
|
||||||
std::string part(*iter3);
|
|
||||||
|
|
||||||
if (j != 0 && i == 8)
|
|
||||||
part.resize(part.size() - 3);
|
|
||||||
|
|
||||||
assert(i < header_parts);
|
|
||||||
|
|
||||||
if (part.size() > lens[i])
|
|
||||||
lens[i] = part.size();
|
|
||||||
|
|
||||||
parts.push_back(part);
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(i == header_parts);
|
|
||||||
lineparts.push_back(parts);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::ostringstream oss;
|
|
||||||
|
|
||||||
std::vector<std::vector<std::string> >::iterator iter1 = lineparts.begin();
|
|
||||||
std::vector<std::vector<std::string> >::iterator end1 = lineparts.end();
|
|
||||||
|
|
||||||
oss << "\n";
|
|
||||||
|
|
||||||
while (iter1 != end1)
|
|
||||||
{
|
|
||||||
std::vector<std::string>::iterator iter2 = iter1->begin();
|
|
||||||
std::vector<std::string>::iterator end2 = iter1->end();
|
|
||||||
assert(distance(iter2, end2) == header_parts);
|
|
||||||
int i = 0;
|
|
||||||
|
|
||||||
while (iter2 != end2)
|
|
||||||
{
|
|
||||||
assert(i < header_parts);
|
|
||||||
oss << std::setw(lens[i]) << std::left << *iter2 << " ";
|
|
||||||
++iter2;
|
|
||||||
i++;
|
|
||||||
}
|
|
||||||
|
|
||||||
oss << "\n";
|
|
||||||
++iter1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return oss.str();
|
|
||||||
}
|
|
||||||
|
|
||||||
int ServiceExeMgr::Child()
|
|
||||||
{
|
|
||||||
// Make sure CSC thinks it's on a UM or else bucket reuse stuff below will stall
|
|
||||||
if (!m_e)
|
|
||||||
setenv("CALPONT_CSC_IDENT", "um", 1);
|
|
||||||
|
|
||||||
setupSignalHandlers();
|
|
||||||
int err = 0;
|
|
||||||
if (!m_debug)
|
|
||||||
err = setupResources();
|
|
||||||
std::string errMsg;
|
|
||||||
|
|
||||||
switch (err)
|
|
||||||
{
|
|
||||||
case -1:
|
|
||||||
case -3: errMsg = "Error getting file limits, please see non-root install documentation"; break;
|
|
||||||
|
|
||||||
case -2: errMsg = "Error setting file limits, please see non-root install documentation"; break;
|
|
||||||
|
|
||||||
case -4:
|
|
||||||
errMsg = "Could not install file limits to required value, please see non-root install documentation";
|
|
||||||
break;
|
|
||||||
|
|
||||||
default: errMsg = "Couldn't change working directory or unknown error"; break;
|
|
||||||
}
|
|
||||||
|
|
||||||
err = setupCwd();
|
|
||||||
|
|
||||||
if (err < 0)
|
|
||||||
{
|
|
||||||
oam::Oam oam;
|
|
||||||
logging::Message::Args args;
|
|
||||||
logging::Message message;
|
|
||||||
args.add(errMsg);
|
|
||||||
message.format(args);
|
|
||||||
logging::LoggingID lid(16);
|
|
||||||
logging::MessageLog ml(lid);
|
|
||||||
ml.logCriticalMessage(message);
|
|
||||||
std::cerr << errMsg << std::endl;
|
|
||||||
|
|
||||||
NotifyServiceInitializationFailed();
|
|
||||||
return 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
cleanTempDir();
|
|
||||||
|
|
||||||
logging::MsgMap msgMap;
|
|
||||||
msgMap[logDefaultMsg] = logging::Message(logDefaultMsg);
|
|
||||||
msgMap[logDbProfStartStatement] = logging::Message(logDbProfStartStatement);
|
|
||||||
msgMap[logDbProfEndStatement] = logging::Message(logDbProfEndStatement);
|
|
||||||
msgMap[logStartSql] = logging::Message(logStartSql);
|
|
||||||
msgMap[logEndSql] = logging::Message(logEndSql);
|
|
||||||
msgMap[logRssTooBig] = logging::Message(logRssTooBig);
|
|
||||||
msgMap[logDbProfQueryStats] = logging::Message(logDbProfQueryStats);
|
|
||||||
msgMap[logExeMgrExcpt] = logging::Message(logExeMgrExcpt);
|
|
||||||
msgLog_.msgMap(msgMap);
|
|
||||||
|
|
||||||
dec_ = joblist::DistributedEngineComm::instance(rm_, true);
|
|
||||||
dec_->Open();
|
|
||||||
|
|
||||||
bool tellUser = true;
|
|
||||||
|
|
||||||
messageqcpp::MessageQueueServer* mqs;
|
|
||||||
|
|
||||||
statementsRunningCount_ = new ActiveStatementCounter(rm_->getEmExecQueueSize());
|
|
||||||
const std::string ExeMgr = "ExeMgr1";
|
|
||||||
for (;;)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
mqs = new messageqcpp::MessageQueueServer(ExeMgr, rm_->getConfig(), messageqcpp::ByteStream::BlockSize, 64);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
catch (std::runtime_error& re)
|
|
||||||
{
|
|
||||||
std::string what = re.what();
|
|
||||||
|
|
||||||
if (what.find("Address already in use") != std::string::npos)
|
|
||||||
{
|
|
||||||
if (tellUser)
|
|
||||||
{
|
|
||||||
std::cerr << "Address already in use, retrying..." << std::endl;
|
|
||||||
tellUser = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
sleep(5);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// class jobstepThreadPool is used by other processes. We can't call
|
|
||||||
// resourcemanaager (rm) functions during the static creation of threadpool
|
|
||||||
// because rm has a "isExeMgr" flag that is set upon creation (rm is a singleton).
|
|
||||||
// From the pools perspective, it has no idea if it is ExeMgr doing the
|
|
||||||
// creation, so it has no idea which way to set the flag. So we set the max here.
|
|
||||||
joblist::JobStep::jobstepThreadPool.setMaxThreads(rm_->getJLThreadPoolSize());
|
|
||||||
joblist::JobStep::jobstepThreadPool.setName("ExeMgrJobList");
|
|
||||||
|
|
||||||
if (rm_->getJlThreadPoolDebug() == "Y" || rm_->getJlThreadPoolDebug() == "y")
|
|
||||||
{
|
|
||||||
joblist::JobStep::jobstepThreadPool.setDebug(true);
|
|
||||||
joblist::JobStep::jobstepThreadPool.invoke(
|
|
||||||
threadpool::ThreadPoolMonitor(&joblist::JobStep::jobstepThreadPool));
|
|
||||||
}
|
|
||||||
|
|
||||||
int serverThreads = rm_->getEmServerThreads();
|
|
||||||
int maxPct = rm_->getEmMaxPct();
|
|
||||||
int pauseSeconds = rm_->getEmSecondsBetweenMemChecks();
|
|
||||||
int priority = rm_->getEmPriority();
|
|
||||||
|
|
||||||
FEMsgHandler::threadPool.setMaxThreads(serverThreads);
|
|
||||||
FEMsgHandler::threadPool.setName("FEMsgHandler");
|
|
||||||
|
|
||||||
if (maxPct > 0)
|
|
||||||
{
|
|
||||||
// Defined in rssmonfcn.cpp
|
|
||||||
exemgr::startRssMon(maxPct, pauseSeconds);
|
|
||||||
}
|
|
||||||
|
|
||||||
setpriority(PRIO_PROCESS, 0, priority);
|
|
||||||
|
|
||||||
std::string teleServerHost(rm_->getConfig()->getConfig("QueryTele", "Host"));
|
|
||||||
|
|
||||||
if (!teleServerHost.empty())
|
|
||||||
{
|
|
||||||
int teleServerPort = toInt(rm_->getConfig()->getConfig("QueryTele", "Port"));
|
|
||||||
|
|
||||||
if (teleServerPort > 0)
|
|
||||||
{
|
|
||||||
teleServerParms_ = querytele::QueryTeleServerParms(teleServerHost, teleServerPort);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
NotifyServiceStarted();
|
|
||||||
|
|
||||||
std::cout << "Starting ExeMgr: st = " << serverThreads << ", qs = " << rm_->getEmExecQueueSize()
|
|
||||||
<< ", mx = " << maxPct << ", cf = " << rm_->getConfig()->configFile() << std::endl;
|
|
||||||
|
|
||||||
{
|
|
||||||
BRM::DBRM* dbrm = new BRM::DBRM();
|
|
||||||
dbrm->setSystemQueryReady(true);
|
|
||||||
delete dbrm;
|
|
||||||
}
|
|
||||||
|
|
||||||
threadpool::ThreadPool exeMgrThreadPool(serverThreads, 0);
|
|
||||||
exeMgrThreadPool.setName("ExeMgrServer");
|
|
||||||
|
|
||||||
if (rm_->getExeMgrThreadPoolDebug() == "Y" || rm_->getExeMgrThreadPoolDebug() == "y")
|
|
||||||
{
|
|
||||||
exeMgrThreadPool.setDebug(true);
|
|
||||||
exeMgrThreadPool.invoke(threadpool::ThreadPoolMonitor(&exeMgrThreadPool));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Load statistics.
|
|
||||||
try
|
|
||||||
{
|
|
||||||
statistics::StatisticsManager::instance()->loadFromFile();
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
std::cerr << "Cannot load statistics from file " << std::endl;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (;;)
|
|
||||||
{
|
|
||||||
messageqcpp::IOSocket ios;
|
|
||||||
ios = mqs->accept();
|
|
||||||
exeMgrThreadPool.invoke(exemgr::SQLFrontSessionThread(ios, dec_, rm_));
|
|
||||||
}
|
|
||||||
|
|
||||||
exeMgrThreadPool.wait();
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
} // namespace
|
|
@ -1,343 +0,0 @@
|
|||||||
/* Copyright (C) 2022 Mariadb Corporation.
|
|
||||||
|
|
||||||
This program is free software; you can redistribute it and/or
|
|
||||||
modify it under the terms of the GNU General Public License
|
|
||||||
as published by the Free Software Foundation; version 2 of
|
|
||||||
the License.
|
|
||||||
|
|
||||||
This program is distributed in the hope that it will be useful,
|
|
||||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
GNU General Public License for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU General Public License
|
|
||||||
along with this program; if not, write to the Free Software
|
|
||||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
||||||
MA 02110-1301, USA. */
|
|
||||||
|
|
||||||
#pragma once
|
|
||||||
|
|
||||||
#include <iostream>
|
|
||||||
#include <cstdint>
|
|
||||||
#include <csignal>
|
|
||||||
#include <sys/resource.h>
|
|
||||||
|
|
||||||
#undef root_name
|
|
||||||
#include <boost/filesystem.hpp>
|
|
||||||
|
|
||||||
#include "calpontselectexecutionplan.h"
|
|
||||||
#include "mcsanalyzetableexecutionplan.h"
|
|
||||||
#include "activestatementcounter.h"
|
|
||||||
#include "distributedenginecomm.h"
|
|
||||||
#include "resourcemanager.h"
|
|
||||||
#include "configcpp.h"
|
|
||||||
#include "queryteleserverparms.h"
|
|
||||||
#include "iosocket.h"
|
|
||||||
#include "joblist.h"
|
|
||||||
#include "joblistfactory.h"
|
|
||||||
#include "oamcache.h"
|
|
||||||
#include "simplecolumn.h"
|
|
||||||
#include "bytestream.h"
|
|
||||||
#include "telestats.h"
|
|
||||||
#include "messageobj.h"
|
|
||||||
#include "messagelog.h"
|
|
||||||
#include "sqllogger.h"
|
|
||||||
#include "femsghandler.h"
|
|
||||||
#include "idberrorinfo.h"
|
|
||||||
#include "MonitorProcMem.h"
|
|
||||||
#include "liboamcpp.h"
|
|
||||||
#include "crashtrace.h"
|
|
||||||
#include "service.h"
|
|
||||||
|
|
||||||
#include <mutex>
|
|
||||||
#include <thread>
|
|
||||||
#include <condition_variable>
|
|
||||||
|
|
||||||
#include "dbrm.h"
|
|
||||||
|
|
||||||
#include "mariadb_my_sys.h"
|
|
||||||
#include "statistics.h"
|
|
||||||
|
|
||||||
namespace exemgr
|
|
||||||
{
|
|
||||||
class Opt
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
int m_debug;
|
|
||||||
bool m_e;
|
|
||||||
bool m_fg;
|
|
||||||
Opt(int argc, char* argv[]) : m_debug(0), m_e(false), m_fg(false)
|
|
||||||
{
|
|
||||||
int c;
|
|
||||||
while ((c = getopt(argc, argv, "edf")) != EOF)
|
|
||||||
{
|
|
||||||
switch (c)
|
|
||||||
{
|
|
||||||
case 'd': m_debug++; break;
|
|
||||||
|
|
||||||
case 'e': m_e = true; break;
|
|
||||||
|
|
||||||
case 'f': m_fg = true; break;
|
|
||||||
|
|
||||||
case '?':
|
|
||||||
default: break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Opt(): m_debug(0), m_e(false), m_fg(false) {};
|
|
||||||
int getDebugLevel() const
|
|
||||||
{
|
|
||||||
return m_debug;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
class ServiceExeMgr : public Service, public Opt
|
|
||||||
{
|
|
||||||
using SessionMemMap_t = std::map<uint32_t, size_t>;
|
|
||||||
using ThreadCntPerSessionMap_t = std::map<uint32_t, uint32_t>;
|
|
||||||
protected:
|
|
||||||
void log(logging::LOG_TYPE type, const std::string& str)
|
|
||||||
{
|
|
||||||
logging::LoggingID logid(16);
|
|
||||||
logging::Message::Args args;
|
|
||||||
logging::Message message(8);
|
|
||||||
args.add(strerror(errno));
|
|
||||||
message.format(args);
|
|
||||||
logging::Logger logger(logid.fSubsysID);
|
|
||||||
logger.logMessage(type, message, logid);
|
|
||||||
}
|
|
||||||
|
|
||||||
public:
|
|
||||||
ServiceExeMgr(const Opt& opt) : Service("ExeMgr"), Opt(opt), msgLog_(logging::Logger(16))
|
|
||||||
{
|
|
||||||
bool runningWithExeMgr = true;
|
|
||||||
rm_ = joblist::ResourceManager::instance(runningWithExeMgr);
|
|
||||||
}
|
|
||||||
void LogErrno() override
|
|
||||||
{
|
|
||||||
log(logging::LOG_TYPE_CRITICAL, std::string(strerror(errno)));
|
|
||||||
}
|
|
||||||
void ParentLogChildMessage(const std::string& str) override
|
|
||||||
{
|
|
||||||
log(logging::LOG_TYPE_INFO, str);
|
|
||||||
}
|
|
||||||
int Child() override;
|
|
||||||
int Run()
|
|
||||||
{
|
|
||||||
return m_fg ? Child() : RunForking();
|
|
||||||
}
|
|
||||||
static const constexpr unsigned logDefaultMsg = logging::M0000;
|
|
||||||
static const constexpr unsigned logDbProfStartStatement = logging::M0028;
|
|
||||||
static const constexpr unsigned logDbProfEndStatement = logging::M0029;
|
|
||||||
static const constexpr unsigned logStartSql = logging::M0041;
|
|
||||||
static const constexpr unsigned logEndSql = logging::M0042;
|
|
||||||
static const constexpr unsigned logRssTooBig = logging::M0044;
|
|
||||||
static const constexpr unsigned logDbProfQueryStats = logging::M0047;
|
|
||||||
static const constexpr unsigned logExeMgrExcpt = logging::M0055;
|
|
||||||
// If any flags other than the table mode flags are set, produce output to screeen
|
|
||||||
static const constexpr uint32_t flagsWantOutput = (0xffffffff & ~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_AUTOSWITCH &
|
|
||||||
~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF);
|
|
||||||
logging::Logger& getLogger()
|
|
||||||
{
|
|
||||||
return msgLog_;
|
|
||||||
}
|
|
||||||
void updateSessionMap(const size_t pct)
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
|
|
||||||
|
|
||||||
for (auto mapIter = sessionMemMap_.begin(); mapIter != sessionMemMap_.end(); ++mapIter)
|
|
||||||
{
|
|
||||||
if (pct > mapIter->second)
|
|
||||||
{
|
|
||||||
mapIter->second = pct;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ThreadCntPerSessionMap_t& getThreadCntPerSessionMap()
|
|
||||||
{
|
|
||||||
return threadCntPerSessionMap_;
|
|
||||||
}
|
|
||||||
std::mutex& getThreadCntPerSessionMapMutex()
|
|
||||||
{
|
|
||||||
return threadCntPerSessionMapMutex_;
|
|
||||||
}
|
|
||||||
void initMaxMemPct(uint32_t sessionId)
|
|
||||||
{
|
|
||||||
// WIP
|
|
||||||
if (sessionId < 0x80000000)
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
|
|
||||||
auto mapIter = sessionMemMap_.find(sessionId);
|
|
||||||
|
|
||||||
if (mapIter == sessionMemMap_.end())
|
|
||||||
{
|
|
||||||
sessionMemMap_[sessionId] = 0;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
mapIter->second = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
uint64_t getMaxMemPct(const uint32_t sessionId)
|
|
||||||
{
|
|
||||||
uint64_t maxMemoryPct = 0;
|
|
||||||
// WIP
|
|
||||||
if (sessionId < 0x80000000)
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
|
|
||||||
auto mapIter = sessionMemMap_.find(sessionId);
|
|
||||||
|
|
||||||
if (mapIter != sessionMemMap_.end())
|
|
||||||
{
|
|
||||||
maxMemoryPct = (uint64_t)mapIter->second;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return maxMemoryPct;
|
|
||||||
}
|
|
||||||
void deleteMaxMemPct(uint32_t sessionId)
|
|
||||||
{
|
|
||||||
if (sessionId < 0x80000000)
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
|
|
||||||
auto mapIter = sessionMemMap_.find(sessionId);
|
|
||||||
|
|
||||||
if (mapIter != sessionMemMap_.end())
|
|
||||||
{
|
|
||||||
sessionMemMap_.erase(sessionId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//...Increment the number of threads using the specified sessionId
|
|
||||||
void incThreadCntPerSession(uint32_t sessionId)
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lk(threadCntPerSessionMapMutex_);
|
|
||||||
auto mapIter = threadCntPerSessionMap_.find(sessionId);
|
|
||||||
|
|
||||||
if (mapIter == threadCntPerSessionMap_.end())
|
|
||||||
threadCntPerSessionMap_.insert(ThreadCntPerSessionMap_t::value_type(sessionId, 1));
|
|
||||||
else
|
|
||||||
mapIter->second++;
|
|
||||||
}
|
|
||||||
//...Decrement the number of threads using the specified sessionId.
|
|
||||||
//...When the thread count for a sessionId reaches 0, the corresponding
|
|
||||||
//...CalpontSystemCatalog objects are deleted.
|
|
||||||
//...The user query and its associated catalog query have a different
|
|
||||||
//...session Id where the highest bit is flipped.
|
|
||||||
//...The object with id(sessionId | 0x80000000) cannot be removed before
|
|
||||||
//...user query session completes because the sysdata may be used for
|
|
||||||
//...debugging/stats purpose, such as result graph, etc.
|
|
||||||
void decThreadCntPerSession(uint32_t sessionId)
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lk(threadCntPerSessionMapMutex_);
|
|
||||||
auto mapIter = threadCntPerSessionMap_.find(sessionId);
|
|
||||||
|
|
||||||
if (mapIter != threadCntPerSessionMap_.end())
|
|
||||||
{
|
|
||||||
if (--mapIter->second == 0)
|
|
||||||
{
|
|
||||||
threadCntPerSessionMap_.erase(mapIter);
|
|
||||||
execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
|
|
||||||
execplan::CalpontSystemCatalog::removeCalpontSystemCatalog((sessionId ^ 0x80000000));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ActiveStatementCounter* getStatementsRunningCount()
|
|
||||||
{
|
|
||||||
return statementsRunningCount_;
|
|
||||||
}
|
|
||||||
joblist::DistributedEngineComm* getDec()
|
|
||||||
{
|
|
||||||
return dec_;
|
|
||||||
}
|
|
||||||
int toInt(const std::string& val)
|
|
||||||
{
|
|
||||||
if (val.length() == 0)
|
|
||||||
return -1;
|
|
||||||
|
|
||||||
return static_cast<int>(config::Config::fromText(val));
|
|
||||||
}
|
|
||||||
const std::string prettyPrintMiniInfo(const std::string& in);
|
|
||||||
|
|
||||||
const std::string timeNow()
|
|
||||||
{
|
|
||||||
time_t outputTime = time(0);
|
|
||||||
struct tm ltm;
|
|
||||||
char buf[32]; // ctime(3) says at least 26
|
|
||||||
size_t len = 0;
|
|
||||||
asctime_r(localtime_r(&outputTime, <m), buf);
|
|
||||||
len = strlen(buf);
|
|
||||||
|
|
||||||
if (len > 0)
|
|
||||||
--len;
|
|
||||||
|
|
||||||
if (buf[len] == '\n')
|
|
||||||
buf[len] = 0;
|
|
||||||
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
querytele::QueryTeleServerParms& getTeleServerParms()
|
|
||||||
{
|
|
||||||
return teleServerParms_;
|
|
||||||
}
|
|
||||||
joblist::ResourceManager& getRm()
|
|
||||||
{
|
|
||||||
return *rm_;
|
|
||||||
}
|
|
||||||
private:
|
|
||||||
void setupSignalHandlers();
|
|
||||||
int8_t setupCwd()
|
|
||||||
{
|
|
||||||
std::string workdir = rm_->getScWorkingDir();
|
|
||||||
int8_t rc = chdir(workdir.c_str());
|
|
||||||
|
|
||||||
if (rc < 0 || access(".", W_OK) != 0)
|
|
||||||
rc = chdir("/tmp");
|
|
||||||
|
|
||||||
return (rc < 0) ? -5 : rc;
|
|
||||||
}
|
|
||||||
int setupResources()
|
|
||||||
{
|
|
||||||
struct rlimit rlim;
|
|
||||||
|
|
||||||
if (getrlimit(RLIMIT_NOFILE, &rlim) != 0)
|
|
||||||
{
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
rlim.rlim_cur = rlim.rlim_max = 65536;
|
|
||||||
|
|
||||||
if (setrlimit(RLIMIT_NOFILE, &rlim) != 0)
|
|
||||||
{
|
|
||||||
return -2;
|
|
||||||
}
|
|
||||||
if (getrlimit(RLIMIT_NOFILE, &rlim) != 0)
|
|
||||||
{
|
|
||||||
return -3;
|
|
||||||
}
|
|
||||||
if (rlim.rlim_cur != 65536)
|
|
||||||
{
|
|
||||||
return -4;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
logging::Logger msgLog_;
|
|
||||||
SessionMemMap_t sessionMemMap_; // track memory% usage during a query
|
|
||||||
std::mutex sessionMemMapMutex_;
|
|
||||||
//...The FrontEnd may establish more than 1 connection (which results in
|
|
||||||
// more than 1 ExeMgr thread) per session. These threads will share
|
|
||||||
// the same CalpontSystemCatalog object for that session. Here, we
|
|
||||||
// define a std::map to track how many threads are sharing each session, so
|
|
||||||
// that we know when we can safely delete a CalpontSystemCatalog object
|
|
||||||
// shared by multiple threads per session.
|
|
||||||
ThreadCntPerSessionMap_t threadCntPerSessionMap_;
|
|
||||||
std::mutex threadCntPerSessionMapMutex_;
|
|
||||||
ActiveStatementCounter* statementsRunningCount_;
|
|
||||||
joblist::DistributedEngineComm* dec_;
|
|
||||||
joblist::ResourceManager* rm_;
|
|
||||||
// Its attributes are set in Child()
|
|
||||||
querytele::QueryTeleServerParms teleServerParms_;
|
|
||||||
};
|
|
||||||
extern ServiceExeMgr* globServiceExeMgr;
|
|
||||||
}
|
|
@ -1,985 +0,0 @@
|
|||||||
/* Copyright (C) 2022 Mariadb Corporation.
|
|
||||||
|
|
||||||
This program is free software; you can redistribute it and/or
|
|
||||||
modify it under the terms of the GNU General Public License
|
|
||||||
as published by the Free Software Foundation; version 2 of
|
|
||||||
the License.
|
|
||||||
|
|
||||||
This program is distributed in the hope that it will be useful,
|
|
||||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
GNU General Public License for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU General Public License
|
|
||||||
along with this program; if not, write to the Free Software
|
|
||||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
||||||
MA 02110-1301, USA. */
|
|
||||||
|
|
||||||
#include "sqlfrontsessionthread.h"
|
|
||||||
|
|
||||||
namespace exemgr
|
|
||||||
{
|
|
||||||
uint64_t SQLFrontSessionThread::getMaxMemPct(uint32_t sessionId)
|
|
||||||
{
|
|
||||||
return globServiceExeMgr->getMaxMemPct(sessionId);
|
|
||||||
}
|
|
||||||
void SQLFrontSessionThread::deleteMaxMemPct(uint32_t sessionId)
|
|
||||||
{
|
|
||||||
return globServiceExeMgr->deleteMaxMemPct(sessionId);
|
|
||||||
}
|
|
||||||
void SQLFrontSessionThread::incThreadCntPerSession(uint32_t sessionId)
|
|
||||||
{
|
|
||||||
return globServiceExeMgr->incThreadCntPerSession(sessionId);
|
|
||||||
}
|
|
||||||
void SQLFrontSessionThread::decThreadCntPerSession(uint32_t sessionId)
|
|
||||||
{
|
|
||||||
return globServiceExeMgr->decThreadCntPerSession(sessionId);
|
|
||||||
}
|
|
||||||
|
|
||||||
void SQLFrontSessionThread::initMaxMemPct(uint32_t sessionId)
|
|
||||||
{
|
|
||||||
return globServiceExeMgr->initMaxMemPct(sessionId);
|
|
||||||
}
|
|
||||||
//...Get and log query stats to specified output stream
|
|
||||||
const std::string SQLFrontSessionThread::formatQueryStats(
|
|
||||||
joblist::SJLP& jl, // joblist associated with query
|
|
||||||
const std::string& label, // header label to print in front of log output
|
|
||||||
bool includeNewLine, // include line breaks in query stats std::string
|
|
||||||
bool vtableModeOn, bool wantExtendedStats, uint64_t rowsReturned)
|
|
||||||
{
|
|
||||||
std::ostringstream os;
|
|
||||||
|
|
||||||
// Get stats if not already acquired for current query
|
|
||||||
if (!fStatsRetrieved)
|
|
||||||
{
|
|
||||||
if (wantExtendedStats)
|
|
||||||
{
|
|
||||||
// wait for the ei data to be written by another thread (brain-dead)
|
|
||||||
struct timespec req = {0, 250000}; // 250 usec
|
|
||||||
nanosleep(&req, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get % memory usage during current query for sessionId
|
|
||||||
jl->querySummary(wantExtendedStats);
|
|
||||||
fStats = jl->queryStats();
|
|
||||||
fStats.fMaxMemPct = getMaxMemPct(fStats.fSessionID);
|
|
||||||
fStats.fRows = rowsReturned;
|
|
||||||
fStatsRetrieved = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::string queryMode;
|
|
||||||
queryMode = (vtableModeOn ? "Distributed" : "Standard");
|
|
||||||
|
|
||||||
// Log stats to specified output stream
|
|
||||||
os << label << ": MaxMemPct-" << fStats.fMaxMemPct << "; NumTempFiles-" << fStats.fNumFiles
|
|
||||||
<< "; TempFileSpace-" << roundBytes(fStats.fFileBytes) << "; ApproxPhyI/O-" << fStats.fPhyIO
|
|
||||||
<< "; CacheI/O-" << fStats.fCacheIO << "; BlocksTouched-" << fStats.fMsgRcvCnt;
|
|
||||||
|
|
||||||
if (includeNewLine)
|
|
||||||
os << std::endl << " "; // insert line break
|
|
||||||
else
|
|
||||||
os << "; "; // continue without line break
|
|
||||||
|
|
||||||
os << "PartitionBlocksEliminated-" << fStats.fCPBlocksSkipped << "; MsgBytesIn-"
|
|
||||||
<< roundBytes(fStats.fMsgBytesIn) << "; MsgBytesOut-" << roundBytes(fStats.fMsgBytesOut) << "; Mode-"
|
|
||||||
<< queryMode;
|
|
||||||
|
|
||||||
return os.str();
|
|
||||||
}
|
|
||||||
|
|
||||||
//... Round off to human readable format (KB, MB, or GB).
|
|
||||||
const std::string SQLFrontSessionThread::roundBytes(uint64_t value) const
|
|
||||||
{
|
|
||||||
const char* units[] = {"B", "KB", "MB", "GB", "TB"};
|
|
||||||
uint64_t i = 0, up = 0;
|
|
||||||
uint64_t roundedValue = value;
|
|
||||||
|
|
||||||
while (roundedValue > 1024 && i < 4)
|
|
||||||
{
|
|
||||||
up = (roundedValue & 512);
|
|
||||||
roundedValue /= 1024;
|
|
||||||
i++;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (up)
|
|
||||||
roundedValue++;
|
|
||||||
|
|
||||||
std::ostringstream oss;
|
|
||||||
oss << roundedValue << units[i];
|
|
||||||
return oss.str();
|
|
||||||
}
|
|
||||||
|
|
||||||
//...Round off to nearest (1024*1024) MB
|
|
||||||
uint64_t SQLFrontSessionThread::roundMB(uint64_t value) const
|
|
||||||
{
|
|
||||||
uint64_t roundedValue = value >> 20;
|
|
||||||
|
|
||||||
if (value & 524288)
|
|
||||||
roundedValue++;
|
|
||||||
|
|
||||||
return roundedValue;
|
|
||||||
}
|
|
||||||
|
|
||||||
void SQLFrontSessionThread::setRMParms(const execplan::CalpontSelectExecutionPlan::RMParmVec& parms)
|
|
||||||
{
|
|
||||||
for (execplan::CalpontSelectExecutionPlan::RMParmVec::const_iterator it = parms.begin();
|
|
||||||
it != parms.end(); ++it)
|
|
||||||
{
|
|
||||||
switch (it->id)
|
|
||||||
{
|
|
||||||
case execplan::PMSMALLSIDEMEMORY:
|
|
||||||
{
|
|
||||||
globServiceExeMgr->getRm().addHJPmMaxSmallSideMap(it->sessionId, it->value);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case execplan::UMSMALLSIDEMEMORY:
|
|
||||||
{
|
|
||||||
globServiceExeMgr->getRm().addHJUmMaxSmallSideMap(it->sessionId, it->value);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
default:;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void SQLFrontSessionThread::buildSysCache(const execplan::CalpontSelectExecutionPlan& csep,
|
|
||||||
boost::shared_ptr<execplan::CalpontSystemCatalog> csc)
|
|
||||||
{
|
|
||||||
const execplan::CalpontSelectExecutionPlan::ColumnMap& colMap = csep.columnMap();
|
|
||||||
std::string schemaName;
|
|
||||||
|
|
||||||
for (auto it = colMap.begin(); it != colMap.end(); ++it)
|
|
||||||
{
|
|
||||||
const auto sc = dynamic_cast<execplan::SimpleColumn*>((it->second).get());
|
|
||||||
|
|
||||||
if (sc)
|
|
||||||
{
|
|
||||||
schemaName = sc->schemaName();
|
|
||||||
|
|
||||||
// only the first time a schema is got will actually query
|
|
||||||
// system catalog. System catalog keeps a schema name std::map.
|
|
||||||
// if a schema exists, the call getSchemaInfo returns without
|
|
||||||
// doing anything.
|
|
||||||
if (!schemaName.empty())
|
|
||||||
csc->getSchemaInfo(schemaName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
execplan::CalpontSelectExecutionPlan::SelectList::const_iterator subIt;
|
|
||||||
|
|
||||||
for (subIt = csep.derivedTableList().begin(); subIt != csep.derivedTableList().end(); ++subIt)
|
|
||||||
{
|
|
||||||
buildSysCache(*(dynamic_cast<execplan::CalpontSelectExecutionPlan*>(subIt->get())), csc);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void SQLFrontSessionThread::writeCodeAndError(messageqcpp::ByteStream::quadbyte code, const std::string emsg)
|
|
||||||
{
|
|
||||||
messageqcpp::ByteStream emsgBs;
|
|
||||||
messageqcpp::ByteStream tbs;
|
|
||||||
tbs << code;
|
|
||||||
fIos.write(tbs);
|
|
||||||
emsgBs << emsg;
|
|
||||||
fIos.write(emsgBs);
|
|
||||||
}
|
|
||||||
|
|
||||||
void SQLFrontSessionThread::analyzeTableExecute(messageqcpp::ByteStream& bs, joblist::SJLP& jl, bool& stmtCounted)
|
|
||||||
{
|
|
||||||
auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount();
|
|
||||||
messageqcpp::ByteStream::quadbyte qb;
|
|
||||||
execplan::MCSAnalyzeTableExecutionPlan caep;
|
|
||||||
|
|
||||||
bs = fIos.read();
|
|
||||||
caep.unserialize(bs);
|
|
||||||
|
|
||||||
statementsRunningCount->incr(stmtCounted);
|
|
||||||
jl = joblist::JobListFactory::makeJobList(&caep, fRm, false, true);
|
|
||||||
|
|
||||||
// Joblist is empty.
|
|
||||||
if (jl->status() == logging::statisticsJobListEmpty)
|
|
||||||
{
|
|
||||||
if (caep.traceOn())
|
|
||||||
std::cout << "JobList is empty " << std::endl;
|
|
||||||
|
|
||||||
jl.reset();
|
|
||||||
bs.restart();
|
|
||||||
qb = ANALYZE_TABLE_SUCCESS;
|
|
||||||
bs << qb;
|
|
||||||
fIos.write(bs);
|
|
||||||
bs.reset();
|
|
||||||
statementsRunningCount->decr(stmtCounted);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers()))
|
|
||||||
{
|
|
||||||
std::cout << "fEc setup " << std::endl;
|
|
||||||
fEc->Setup();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (jl->status() == 0)
|
|
||||||
{
|
|
||||||
std::string emsg;
|
|
||||||
|
|
||||||
if (jl->putEngineComm(fEc) != 0)
|
|
||||||
throw std::runtime_error(jl->errMsg());
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
throw std::runtime_error("ExeMgr: could not build a JobList!");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Execute a joblist.
|
|
||||||
jl->doQuery();
|
|
||||||
|
|
||||||
FEMsgHandler msgHandler(jl, &fIos);
|
|
||||||
|
|
||||||
msgHandler.start();
|
|
||||||
auto rowCount = jl->projectTable(100, bs);
|
|
||||||
msgHandler.stop();
|
|
||||||
|
|
||||||
auto outRG = (static_cast<joblist::TupleJobList*>(jl.get()))->getOutputRowGroup();
|
|
||||||
|
|
||||||
if (caep.traceOn())
|
|
||||||
std::cout << "Row count " << rowCount << std::endl;
|
|
||||||
|
|
||||||
// Process `RowGroup`, increase an epoch and save statistics to the file.
|
|
||||||
auto* statisticsManager = statistics::StatisticsManager::instance();
|
|
||||||
statisticsManager->analyzeColumnKeyTypes(outRG, caep.traceOn());
|
|
||||||
statisticsManager->incEpoch();
|
|
||||||
statisticsManager->saveToFile();
|
|
||||||
|
|
||||||
// Distribute statistics across all ExeMgr clients if possible.
|
|
||||||
statistics::StatisticsDistributor::instance()->distributeStatistics();
|
|
||||||
|
|
||||||
// Send the signal back to front-end.
|
|
||||||
bs.restart();
|
|
||||||
qb = ANALYZE_TABLE_SUCCESS;
|
|
||||||
bs << qb;
|
|
||||||
fIos.write(bs);
|
|
||||||
bs.reset();
|
|
||||||
statementsRunningCount->decr(stmtCounted);
|
|
||||||
}
|
|
||||||
|
|
||||||
void SQLFrontSessionThread::analyzeTableHandleStats(messageqcpp::ByteStream& bs)
|
|
||||||
{
|
|
||||||
messageqcpp::ByteStream::quadbyte qb;
|
|
||||||
#ifdef DEBUG_STATISTICS
|
|
||||||
std::cout << "Get distributed statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl;
|
|
||||||
#endif
|
|
||||||
bs = fIos.read();
|
|
||||||
#ifdef DEBUG_STATISTICS
|
|
||||||
std::cout << "Read the hash from statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl;
|
|
||||||
#endif
|
|
||||||
uint64_t dataHashRec;
|
|
||||||
bs >> dataHashRec;
|
|
||||||
|
|
||||||
uint64_t dataHash = statistics::StatisticsManager::instance()->computeHashFromStats();
|
|
||||||
// The stats are the same.
|
|
||||||
if (dataHash == dataHashRec)
|
|
||||||
{
|
|
||||||
#ifdef DEBUG_STATISTICS
|
|
||||||
std::cout << "The hash is the same as rec hash on ExeMgr(Client) from ExeMgr(Server) " << std::endl;
|
|
||||||
#endif
|
|
||||||
qb = ANALYZE_TABLE_SUCCESS;
|
|
||||||
bs << qb;
|
|
||||||
fIos.write(bs);
|
|
||||||
bs.reset();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
bs.restart();
|
|
||||||
qb = ANALYZE_TABLE_NEED_STATS;
|
|
||||||
bs << qb;
|
|
||||||
fIos.write(bs);
|
|
||||||
|
|
||||||
bs.restart();
|
|
||||||
bs = fIos.read();
|
|
||||||
#ifdef DEBUG_STATISTICS
|
|
||||||
std::cout << "Read statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl;
|
|
||||||
#endif
|
|
||||||
statistics::StatisticsManager::instance()->unserialize(bs);
|
|
||||||
statistics::StatisticsManager::instance()->saveToFile();
|
|
||||||
|
|
||||||
#ifdef DEBUG_STATISTICS
|
|
||||||
std::cout << "Write flag on ExeMgr(Client) to ExeMgr(Server)" << std::endl;
|
|
||||||
#endif
|
|
||||||
qb = ANALYZE_TABLE_SUCCESS;
|
|
||||||
bs << qb;
|
|
||||||
fIos.write(bs);
|
|
||||||
bs.reset();
|
|
||||||
}
|
|
||||||
|
|
||||||
void SQLFrontSessionThread::operator()()
|
|
||||||
{
|
|
||||||
messageqcpp::ByteStream bs, inbs;
|
|
||||||
execplan::CalpontSelectExecutionPlan csep;
|
|
||||||
csep.sessionID(0);
|
|
||||||
joblist::SJLP jl;
|
|
||||||
bool incSQLFrontSessionThreadCnt = true;
|
|
||||||
std::mutex jlMutex;
|
|
||||||
std::condition_variable jlCleanupDone;
|
|
||||||
int destructing = 0;
|
|
||||||
int gDebug = globServiceExeMgr->getDebugLevel();
|
|
||||||
logging::Logger& msgLog = globServiceExeMgr->getLogger();
|
|
||||||
|
|
||||||
bool selfJoin = false;
|
|
||||||
bool tryTuples = false;
|
|
||||||
bool usingTuples = false;
|
|
||||||
bool stmtCounted = false;
|
|
||||||
auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount();
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
for (;;)
|
|
||||||
{
|
|
||||||
selfJoin = false;
|
|
||||||
tryTuples = false;
|
|
||||||
usingTuples = false;
|
|
||||||
|
|
||||||
if (jl)
|
|
||||||
{
|
|
||||||
// puts the real destruction in another thread to avoid
|
|
||||||
// making the whole session wait. It can take several seconds.
|
|
||||||
std::unique_lock<std::mutex> scoped(jlMutex);
|
|
||||||
destructing++;
|
|
||||||
std::thread bgdtor(
|
|
||||||
[jl, &jlMutex, &jlCleanupDone, &destructing]
|
|
||||||
{
|
|
||||||
std::unique_lock<std::mutex> scoped(jlMutex);
|
|
||||||
const_cast<joblist::SJLP&>(jl).reset(); // this happens second; does real destruction
|
|
||||||
if (--destructing == 0)
|
|
||||||
jlCleanupDone.notify_one();
|
|
||||||
});
|
|
||||||
jl.reset(); // this runs first
|
|
||||||
bgdtor.detach();
|
|
||||||
}
|
|
||||||
|
|
||||||
bs = fIos.read();
|
|
||||||
|
|
||||||
if (bs.length() == 0)
|
|
||||||
{
|
|
||||||
if (gDebug > 1 || (gDebug && !csep.isInternal()))
|
|
||||||
std::cout << "### Got a close(1) for session id " << csep.sessionID() << std::endl;
|
|
||||||
|
|
||||||
// connection closed by client
|
|
||||||
fIos.close();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
else if (bs.length() < 4) // Not a CalpontSelectExecutionPlan
|
|
||||||
{
|
|
||||||
if (gDebug)
|
|
||||||
std::cout << "### Got a not-a-plan for session id " << csep.sessionID() << " with length "
|
|
||||||
<< bs.length() << std::endl;
|
|
||||||
|
|
||||||
fIos.close();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
else if (bs.length() == 4) // possible tuple flag
|
|
||||||
{
|
|
||||||
messageqcpp::ByteStream::quadbyte qb;
|
|
||||||
bs >> qb;
|
|
||||||
|
|
||||||
if (qb == 4) // UM wants new tuple i/f
|
|
||||||
{
|
|
||||||
if (gDebug)
|
|
||||||
std::cout << "### UM wants tuples" << std::endl;
|
|
||||||
|
|
||||||
tryTuples = true;
|
|
||||||
// now wait for the CSEP...
|
|
||||||
bs = fIos.read();
|
|
||||||
}
|
|
||||||
else if (qb == 5) // somebody wants stats
|
|
||||||
{
|
|
||||||
bs.restart();
|
|
||||||
qb = statementsRunningCount->cur();
|
|
||||||
bs << qb;
|
|
||||||
qb = statementsRunningCount->waiting();
|
|
||||||
bs << qb;
|
|
||||||
fIos.write(bs);
|
|
||||||
fIos.close();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
else if (qb == ANALYZE_TABLE_EXECUTE)
|
|
||||||
{
|
|
||||||
analyzeTableExecute(bs, jl, stmtCounted);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
else if (qb == ANALYZE_TABLE_REC_STATS)
|
|
||||||
{
|
|
||||||
analyzeTableHandleStats(bs);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
if (gDebug)
|
|
||||||
std::cout << "### Got a not-a-plan value " << qb << std::endl;
|
|
||||||
|
|
||||||
fIos.close();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
new_plan:
|
|
||||||
try
|
|
||||||
{
|
|
||||||
csep.unserialize(bs);
|
|
||||||
}
|
|
||||||
catch (logging::IDBExcept& ex)
|
|
||||||
{
|
|
||||||
// We can get here on illegal function parameter data type, e.g.
|
|
||||||
// SELECT blob_column|1 FROM t1;
|
|
||||||
statementsRunningCount->decr(stmtCounted);
|
|
||||||
writeCodeAndError(ex.errorCode(), std::string(ex.what()));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
querytele::QueryTeleStats qts;
|
|
||||||
|
|
||||||
if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT"))
|
|
||||||
{
|
|
||||||
qts.query_uuid = csep.uuid();
|
|
||||||
qts.msg_type = querytele::QueryTeleStats::QT_START;
|
|
||||||
qts.start_time = querytele::QueryTeleClient::timeNowms();
|
|
||||||
qts.query = csep.data();
|
|
||||||
qts.session_id = csep.sessionID();
|
|
||||||
qts.query_type = csep.queryType();
|
|
||||||
qts.system_name = fOamCachePtr->getSystemName();
|
|
||||||
qts.module_name = fOamCachePtr->getModuleName();
|
|
||||||
qts.local_query = csep.localQuery();
|
|
||||||
qts.schema_name = csep.schemaName();
|
|
||||||
fTeleClient.postQueryTele(qts);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (gDebug > 1 || (gDebug && !csep.isInternal()))
|
|
||||||
std::cout << "### For session id " << csep.sessionID() << ", got a CSEP" << std::endl;
|
|
||||||
|
|
||||||
setRMParms(csep.rmParms());
|
|
||||||
// Re-establish lost PP connections.
|
|
||||||
if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers()))
|
|
||||||
{
|
|
||||||
fEc->Setup();
|
|
||||||
}
|
|
||||||
// @bug 1021. try to get schema cache for a come in query.
|
|
||||||
// skip system catalog queries.
|
|
||||||
if (!csep.isInternal())
|
|
||||||
{
|
|
||||||
boost::shared_ptr<execplan::CalpontSystemCatalog> csc =
|
|
||||||
execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(csep.sessionID());
|
|
||||||
buildSysCache(csep, csc);
|
|
||||||
}
|
|
||||||
|
|
||||||
// As soon as we have a session id for this thread, update the
|
|
||||||
// thread count per session; only do this once per thread.
|
|
||||||
// Mask 0x80000000 is for associate user query and csc query
|
|
||||||
if (incSQLFrontSessionThreadCnt)
|
|
||||||
{
|
|
||||||
// WIP
|
|
||||||
incThreadCntPerSession(csep.sessionID() | 0x80000000);
|
|
||||||
incSQLFrontSessionThreadCnt = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool needDbProfEndStatementMsg = false;
|
|
||||||
logging::Message::Args args;
|
|
||||||
std::string sqlText = csep.data();
|
|
||||||
logging::LoggingID li(16, csep.sessionID(), csep.txnID());
|
|
||||||
|
|
||||||
// Initialize stats for this query, including
|
|
||||||
// init sessionMemMap entry for this session to 0 memory %.
|
|
||||||
// We will need this later for traceOn() or if we receive a
|
|
||||||
// table request with qb=3 (see below). This is also recorded
|
|
||||||
// as query start time.
|
|
||||||
initStats(csep.sessionID(), sqlText);
|
|
||||||
fStats.fQueryType = csep.queryType();
|
|
||||||
|
|
||||||
// Log start and end statement if tracing is enabled. Keep in
|
|
||||||
// mind the trace flag won't be set for system catalog queries.
|
|
||||||
if (csep.traceOn())
|
|
||||||
{
|
|
||||||
args.reset();
|
|
||||||
args.add((int)csep.statementID());
|
|
||||||
args.add((int)csep.verID().currentScn);
|
|
||||||
args.add(sqlText);
|
|
||||||
msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfStartStatement, args, li);
|
|
||||||
needDbProfEndStatementMsg = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Don't log subsequent self joins after first.
|
|
||||||
if (selfJoin)
|
|
||||||
sqlText = "";
|
|
||||||
|
|
||||||
std::ostringstream oss;
|
|
||||||
oss << sqlText << "; |" << csep.schemaName() << "|";
|
|
||||||
logging::SQLLogger sqlLog(oss.str(), li);
|
|
||||||
|
|
||||||
statementsRunningCount->incr(stmtCounted);
|
|
||||||
|
|
||||||
if (tryTuples)
|
|
||||||
{
|
|
||||||
try // @bug2244: try/catch around fIos.write() calls responding to makeTupleList
|
|
||||||
{
|
|
||||||
jl = joblist::JobListFactory::makeJobList(&csep, fRm, true, true);
|
|
||||||
// assign query stats
|
|
||||||
jl->queryStats(fStats);
|
|
||||||
|
|
||||||
if ((jl->status()) == 0 && (jl->putEngineComm(fEc) == 0))
|
|
||||||
{
|
|
||||||
usingTuples = true;
|
|
||||||
|
|
||||||
// Tell the FE that we're sending tuples back, not TableBands
|
|
||||||
writeCodeAndError(0, "NOERROR");
|
|
||||||
auto tjlp = dynamic_cast<joblist::TupleJobList*>(jl.get());
|
|
||||||
assert(tjlp);
|
|
||||||
messageqcpp::ByteStream tbs;
|
|
||||||
tbs << tjlp->getOutputRowGroup();
|
|
||||||
fIos.write(tbs);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
const std::string emsg = jl->errMsg();
|
|
||||||
statementsRunningCount->decr(stmtCounted);
|
|
||||||
writeCodeAndError(jl->status(), emsg);
|
|
||||||
std::cerr << "ExeMgr: could not build a tuple joblist: " << emsg << std::endl;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (std::exception& ex)
|
|
||||||
{
|
|
||||||
std::ostringstream errMsg;
|
|
||||||
errMsg << "ExeMgr: error writing makeJoblist "
|
|
||||||
"response; "
|
|
||||||
<< ex.what();
|
|
||||||
throw std::runtime_error(errMsg.str());
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
std::ostringstream errMsg;
|
|
||||||
errMsg << "ExeMgr: unknown error writing makeJoblist "
|
|
||||||
"response; ";
|
|
||||||
throw std::runtime_error(errMsg.str());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!usingTuples)
|
|
||||||
{
|
|
||||||
if (gDebug)
|
|
||||||
std::cout << "### UM wanted tuples but it didn't work out :-(" << std::endl;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
if (gDebug)
|
|
||||||
std::cout << "### UM wanted tuples and we'll do our best;-)" << std::endl;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
usingTuples = false;
|
|
||||||
jl = joblist::JobListFactory::makeJobList(&csep, fRm, false, true);
|
|
||||||
|
|
||||||
if (jl->status() == 0)
|
|
||||||
{
|
|
||||||
std::string emsg;
|
|
||||||
|
|
||||||
if (jl->putEngineComm(fEc) != 0)
|
|
||||||
throw std::runtime_error(jl->errMsg());
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
throw std::runtime_error("ExeMgr: could not build a JobList!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
jl->doQuery();
|
|
||||||
|
|
||||||
execplan::CalpontSystemCatalog::OID tableOID;
|
|
||||||
bool swallowRows = false;
|
|
||||||
joblist::DeliveredTableMap tm;
|
|
||||||
uint64_t totalBytesSent = 0;
|
|
||||||
uint64_t totalRowCount = 0;
|
|
||||||
|
|
||||||
// Project each table as the FE asks for it
|
|
||||||
for (;;)
|
|
||||||
{
|
|
||||||
bs = fIos.read();
|
|
||||||
|
|
||||||
if (bs.length() == 0)
|
|
||||||
{
|
|
||||||
if (gDebug > 1 || (gDebug && !csep.isInternal()))
|
|
||||||
std::cout << "### Got a close(2) for session id " << csep.sessionID() << std::endl;
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (gDebug && bs.length() > 4)
|
|
||||||
std::cout << "### For session id " << csep.sessionID() << ", got too many bytes = " << bs.length()
|
|
||||||
<< std::endl;
|
|
||||||
|
|
||||||
// TODO: Holy crud! Can this be right?
|
|
||||||
//@bug 1444 Yes, if there is a self-join
|
|
||||||
if (bs.length() > 4)
|
|
||||||
{
|
|
||||||
selfJoin = true;
|
|
||||||
statementsRunningCount->decr(stmtCounted);
|
|
||||||
goto new_plan;
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(bs.length() == 4);
|
|
||||||
|
|
||||||
messageqcpp::ByteStream::quadbyte qb;
|
|
||||||
|
|
||||||
try // @bug2244: try/catch around fIos.write() calls responding to qb command
|
|
||||||
{
|
|
||||||
bs >> qb;
|
|
||||||
|
|
||||||
if (gDebug > 1 || (gDebug && !csep.isInternal()))
|
|
||||||
std::cout << "### For session id " << csep.sessionID() << ", got a command = " << qb
|
|
||||||
<< std::endl;
|
|
||||||
|
|
||||||
if (qb == 0)
|
|
||||||
{
|
|
||||||
// No more tables, query is done
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
else if (qb == 1)
|
|
||||||
{
|
|
||||||
// super-secret flag indicating that the UM is going to scarf down all the rows in the
|
|
||||||
// query.
|
|
||||||
swallowRows = true;
|
|
||||||
tm = jl->deliveredTables();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
else if (qb == 2)
|
|
||||||
{
|
|
||||||
// UM just wants any table
|
|
||||||
assert(swallowRows);
|
|
||||||
auto iter = tm.begin();
|
|
||||||
|
|
||||||
if (iter == tm.end())
|
|
||||||
{
|
|
||||||
if (gDebug > 1 || (gDebug && !csep.isInternal()))
|
|
||||||
std::cout << "### For session id " << csep.sessionID() << ", returning end flag"
|
|
||||||
<< std::endl;
|
|
||||||
|
|
||||||
bs.restart();
|
|
||||||
bs << (messageqcpp::ByteStream::byte)1;
|
|
||||||
fIos.write(bs);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
tableOID = iter->first;
|
|
||||||
}
|
|
||||||
else if (qb == 3) // special option-UM wants job stats std::string
|
|
||||||
{
|
|
||||||
std::string statsString;
|
|
||||||
|
|
||||||
// Log stats std::string to be sent back to front end
|
|
||||||
statsString = formatQueryStats(
|
|
||||||
jl, "Query Stats", false,
|
|
||||||
!(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF),
|
|
||||||
(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG), totalRowCount);
|
|
||||||
|
|
||||||
bs.restart();
|
|
||||||
bs << statsString;
|
|
||||||
|
|
||||||
if ((csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG) != 0)
|
|
||||||
{
|
|
||||||
bs << jl->extendedInfo();
|
|
||||||
bs << globServiceExeMgr->prettyPrintMiniInfo(jl->miniInfo());
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
std::string empty;
|
|
||||||
bs << empty;
|
|
||||||
bs << empty;
|
|
||||||
}
|
|
||||||
|
|
||||||
// send stats to connector for inserting to the querystats table
|
|
||||||
fStats.serialize(bs);
|
|
||||||
fIos.write(bs);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// for table mode handling
|
|
||||||
else if (qb == 4)
|
|
||||||
{
|
|
||||||
statementsRunningCount->decr(stmtCounted);
|
|
||||||
bs = fIos.read();
|
|
||||||
goto new_plan;
|
|
||||||
}
|
|
||||||
else // (qb > 3)
|
|
||||||
{
|
|
||||||
// Return table bands for the requested tableOID
|
|
||||||
tableOID = static_cast<execplan::CalpontSystemCatalog::OID>(qb);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (std::exception& ex)
|
|
||||||
{
|
|
||||||
std::ostringstream errMsg;
|
|
||||||
errMsg << "ExeMgr: error writing qb response "
|
|
||||||
"for qb cmd "
|
|
||||||
<< qb << "; " << ex.what();
|
|
||||||
throw std::runtime_error(errMsg.str());
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
std::ostringstream errMsg;
|
|
||||||
errMsg << "ExeMgr: unknown error writing qb response "
|
|
||||||
"for qb cmd "
|
|
||||||
<< qb;
|
|
||||||
throw std::runtime_error(errMsg.str());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (swallowRows)
|
|
||||||
tm.erase(tableOID);
|
|
||||||
|
|
||||||
FEMsgHandler msgHandler(jl, &fIos);
|
|
||||||
|
|
||||||
if (tableOID == 100)
|
|
||||||
msgHandler.start();
|
|
||||||
|
|
||||||
//...Loop serializing table bands projected for the tableOID
|
|
||||||
for (;;)
|
|
||||||
{
|
|
||||||
uint32_t rowCount;
|
|
||||||
|
|
||||||
rowCount = jl->projectTable(tableOID, bs);
|
|
||||||
|
|
||||||
msgHandler.stop();
|
|
||||||
|
|
||||||
if (jl->status())
|
|
||||||
{
|
|
||||||
const auto errInfo = logging::IDBErrorInfo::instance();
|
|
||||||
|
|
||||||
if (jl->errMsg().length() != 0)
|
|
||||||
bs << jl->errMsg();
|
|
||||||
else
|
|
||||||
bs << errInfo->errorMsg(jl->status());
|
|
||||||
}
|
|
||||||
|
|
||||||
try // @bug2244: try/catch around fIos.write() calls projecting rows
|
|
||||||
{
|
|
||||||
if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3)
|
|
||||||
{
|
|
||||||
// Skip the write to the front end until the last empty band. Used to time queries
|
|
||||||
// through without any front end waiting.
|
|
||||||
if (tableOID < 3000 || rowCount == 0)
|
|
||||||
fIos.write(bs);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
fIos.write(bs);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (std::exception& ex)
|
|
||||||
{
|
|
||||||
msgHandler.stop();
|
|
||||||
std::ostringstream errMsg;
|
|
||||||
errMsg << "ExeMgr: error projecting rows "
|
|
||||||
"for tableOID: "
|
|
||||||
<< tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount << "; "
|
|
||||||
<< ex.what();
|
|
||||||
jl->abort();
|
|
||||||
|
|
||||||
while (rowCount)
|
|
||||||
rowCount = jl->projectTable(tableOID, bs);
|
|
||||||
|
|
||||||
if (tableOID == 100 && msgHandler.aborted())
|
|
||||||
{
|
|
||||||
/* TODO: modularize the cleanup code, as well as
|
|
||||||
* the rest of this fcn */
|
|
||||||
|
|
||||||
decThreadCntPerSession(csep.sessionID() | 0x80000000);
|
|
||||||
statementsRunningCount->decr(stmtCounted);
|
|
||||||
fIos.close();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// std::cout << "connection drop\n";
|
|
||||||
throw std::runtime_error(errMsg.str());
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
std::ostringstream errMsg;
|
|
||||||
msgHandler.stop();
|
|
||||||
errMsg << "ExeMgr: unknown error projecting rows "
|
|
||||||
"for tableOID: "
|
|
||||||
<< tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount;
|
|
||||||
jl->abort();
|
|
||||||
|
|
||||||
while (rowCount)
|
|
||||||
rowCount = jl->projectTable(tableOID, bs);
|
|
||||||
|
|
||||||
throw std::runtime_error(errMsg.str());
|
|
||||||
}
|
|
||||||
|
|
||||||
totalRowCount += rowCount;
|
|
||||||
totalBytesSent += bs.length();
|
|
||||||
|
|
||||||
if (rowCount == 0)
|
|
||||||
{
|
|
||||||
msgHandler.stop();
|
|
||||||
// No more bands, table is done
|
|
||||||
bs.reset();
|
|
||||||
|
|
||||||
// @bug 2083 decr active statement count here for table mode.
|
|
||||||
if (!usingTuples)
|
|
||||||
statementsRunningCount->decr(stmtCounted);
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
bs.restart();
|
|
||||||
}
|
|
||||||
} // End of loop to project and serialize table bands for a table
|
|
||||||
} // End of loop to process tables
|
|
||||||
|
|
||||||
// @bug 828
|
|
||||||
if (csep.traceOn())
|
|
||||||
jl->graph(csep.sessionID());
|
|
||||||
|
|
||||||
if (needDbProfEndStatementMsg)
|
|
||||||
{
|
|
||||||
std::string ss;
|
|
||||||
std::ostringstream prefix;
|
|
||||||
prefix << "ses:" << csep.sessionID() << " Query Totals";
|
|
||||||
|
|
||||||
// Log stats std::string to standard out
|
|
||||||
ss = formatQueryStats(jl, prefix.str(), true,
|
|
||||||
!(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF),
|
|
||||||
(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG),
|
|
||||||
totalRowCount);
|
|
||||||
//@Bug 1306. Added timing info for real time tracking.
|
|
||||||
std::cout << ss << " at " << globServiceExeMgr->timeNow() << std::endl;
|
|
||||||
|
|
||||||
// log query stats to debug log file
|
|
||||||
args.reset();
|
|
||||||
args.add((int)csep.statementID());
|
|
||||||
args.add(fStats.fMaxMemPct);
|
|
||||||
args.add(fStats.fNumFiles);
|
|
||||||
args.add(fStats.fFileBytes); // log raw byte count instead of MB
|
|
||||||
args.add(fStats.fPhyIO);
|
|
||||||
args.add(fStats.fCacheIO);
|
|
||||||
args.add(fStats.fMsgRcvCnt);
|
|
||||||
args.add(fStats.fMsgBytesIn);
|
|
||||||
args.add(fStats.fMsgBytesOut);
|
|
||||||
args.add(fStats.fCPBlocksSkipped);
|
|
||||||
msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfQueryStats, args, li);
|
|
||||||
//@bug 1327
|
|
||||||
deleteMaxMemPct(csep.sessionID());
|
|
||||||
// Calling reset here, will cause joblist destructor to be
|
|
||||||
// called, which "joins" the threads. We need to do that
|
|
||||||
// here to make sure all syslogging from all the threads
|
|
||||||
// are complete; and that our logDbProfEndStatement will
|
|
||||||
// appear "last" in the syslog for this SQL statement.
|
|
||||||
// puts the real destruction in another thread to avoid
|
|
||||||
// making the whole session wait. It can take several seconds.
|
|
||||||
int stmtID = csep.statementID();
|
|
||||||
std::unique_lock<std::mutex> scoped(jlMutex);
|
|
||||||
destructing++;
|
|
||||||
std::thread bgdtor(
|
|
||||||
[jl, &jlMutex, &jlCleanupDone, stmtID, &li, &destructing, &msgLog]
|
|
||||||
{
|
|
||||||
std::unique_lock<std::mutex> scoped(jlMutex);
|
|
||||||
const_cast<joblist::SJLP&>(jl).reset(); // this happens second; does real destruction
|
|
||||||
logging::Message::Args args;
|
|
||||||
args.add(stmtID);
|
|
||||||
msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfEndStatement, args, li);
|
|
||||||
if (--destructing == 0)
|
|
||||||
jlCleanupDone.notify_one();
|
|
||||||
});
|
|
||||||
jl.reset(); // this happens first
|
|
||||||
bgdtor.detach();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
// delete sessionMemMap entry for this session's memory % use
|
|
||||||
deleteMaxMemPct(csep.sessionID());
|
|
||||||
|
|
||||||
std::string endtime(globServiceExeMgr->timeNow());
|
|
||||||
|
|
||||||
if ((csep.traceFlags() & globServiceExeMgr->flagsWantOutput) && (csep.sessionID() < 0x80000000))
|
|
||||||
{
|
|
||||||
std::cout << "For session " << csep.sessionID() << ": " << totalBytesSent << " bytes sent back at "
|
|
||||||
<< endtime << std::endl;
|
|
||||||
|
|
||||||
// @bug 663 - Implemented caltraceon(16) to replace the
|
|
||||||
// $FIFO_SINK compiler definition in pColStep.
|
|
||||||
// This option consumes rows in the project steps.
|
|
||||||
if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS4)
|
|
||||||
{
|
|
||||||
std::cout << std::endl;
|
|
||||||
std::cout << "**** No data returned to DM. Rows consumed "
|
|
||||||
"in ProjectSteps - caltrace(16) is on (FIFO_SINK)."
|
|
||||||
" ****"
|
|
||||||
<< std::endl;
|
|
||||||
std::cout << std::endl;
|
|
||||||
}
|
|
||||||
else if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3)
|
|
||||||
{
|
|
||||||
std::cout << std::endl;
|
|
||||||
std::cout << "**** No data returned to DM - caltrace(8) is "
|
|
||||||
"on (SWALLOW_ROWS_EXEMGR). ****"
|
|
||||||
<< std::endl;
|
|
||||||
std::cout << std::endl;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
statementsRunningCount->decr(stmtCounted);
|
|
||||||
|
|
||||||
if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT"))
|
|
||||||
{
|
|
||||||
qts.msg_type = querytele::QueryTeleStats::QT_SUMMARY;
|
|
||||||
qts.max_mem_pct = fStats.fMaxMemPct;
|
|
||||||
qts.num_files = fStats.fNumFiles;
|
|
||||||
qts.phy_io = fStats.fPhyIO;
|
|
||||||
qts.cache_io = fStats.fCacheIO;
|
|
||||||
qts.msg_rcv_cnt = fStats.fMsgRcvCnt;
|
|
||||||
qts.cp_blocks_skipped = fStats.fCPBlocksSkipped;
|
|
||||||
qts.msg_bytes_in = fStats.fMsgBytesIn;
|
|
||||||
qts.msg_bytes_out = fStats.fMsgBytesOut;
|
|
||||||
qts.rows = totalRowCount;
|
|
||||||
qts.end_time = querytele::QueryTeleClient::timeNowms();
|
|
||||||
qts.session_id = csep.sessionID();
|
|
||||||
qts.query_type = csep.queryType();
|
|
||||||
qts.query = csep.data();
|
|
||||||
qts.system_name = fOamCachePtr->getSystemName();
|
|
||||||
qts.module_name = fOamCachePtr->getModuleName();
|
|
||||||
qts.local_query = csep.localQuery();
|
|
||||||
fTeleClient.postQueryTele(qts);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Release CSC object (for sessionID) that was added by makeJobList()
|
|
||||||
// Mask 0x80000000 is for associate user query and csc query.
|
|
||||||
// (actual joblist destruction happens at the top of this loop)
|
|
||||||
decThreadCntPerSession(csep.sessionID() | 0x80000000);
|
|
||||||
}
|
|
||||||
catch (std::exception& ex)
|
|
||||||
{
|
|
||||||
decThreadCntPerSession(csep.sessionID() | 0x80000000);
|
|
||||||
statementsRunningCount->decr(stmtCounted);
|
|
||||||
std::cerr << "### ExeMgr ses:" << csep.sessionID() << " caught: " << ex.what() << std::endl;
|
|
||||||
logging::Message::Args args;
|
|
||||||
logging::LoggingID li(16, csep.sessionID(), csep.txnID());
|
|
||||||
args.add(ex.what());
|
|
||||||
msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li);
|
|
||||||
fIos.close();
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
decThreadCntPerSession(csep.sessionID() | 0x80000000);
|
|
||||||
statementsRunningCount->decr(stmtCounted);
|
|
||||||
std::cerr << "### Exception caught!" << std::endl;
|
|
||||||
logging::Message::Args args;
|
|
||||||
logging::LoggingID li(16, csep.sessionID(), csep.txnID());
|
|
||||||
args.add("ExeMgr caught unknown exception");
|
|
||||||
msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li);
|
|
||||||
fIos.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
// make sure we don't leave scope while joblists are being destroyed
|
|
||||||
std::unique_lock<std::mutex> scoped(jlMutex);
|
|
||||||
while (destructing > 0)
|
|
||||||
jlCleanupDone.wait(scoped);
|
|
||||||
}
|
|
||||||
}; // namespace exemgr
|
|
@ -1,131 +0,0 @@
|
|||||||
/* Copyright (C) 2022 Mariadb Corporation.
|
|
||||||
|
|
||||||
This program is free software; you can redistribute it and/or
|
|
||||||
modify it under the terms of the GNU General Public License
|
|
||||||
as published by the Free Software Foundation; version 2 of
|
|
||||||
the License.
|
|
||||||
|
|
||||||
This program is distributed in the hope that it will be useful,
|
|
||||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
GNU General Public License for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU General Public License
|
|
||||||
along with this program; if not, write to the Free Software
|
|
||||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
||||||
MA 02110-1301, USA. */
|
|
||||||
|
|
||||||
#pragma once
|
|
||||||
|
|
||||||
#include <iostream>
|
|
||||||
#include <cstdint>
|
|
||||||
#include <csignal>
|
|
||||||
#include <sys/resource.h>
|
|
||||||
|
|
||||||
#undef root_name
|
|
||||||
#include <boost/filesystem.hpp>
|
|
||||||
|
|
||||||
#include "calpontselectexecutionplan.h"
|
|
||||||
#include "mcsanalyzetableexecutionplan.h"
|
|
||||||
#include "activestatementcounter.h"
|
|
||||||
#include "distributedenginecomm.h"
|
|
||||||
#include "resourcemanager.h"
|
|
||||||
#include "configcpp.h"
|
|
||||||
#include "queryteleserverparms.h"
|
|
||||||
#include "iosocket.h"
|
|
||||||
#include "joblist.h"
|
|
||||||
#include "joblistfactory.h"
|
|
||||||
#include "oamcache.h"
|
|
||||||
#include "simplecolumn.h"
|
|
||||||
#include "bytestream.h"
|
|
||||||
#include "telestats.h"
|
|
||||||
#include "messageobj.h"
|
|
||||||
#include "messagelog.h"
|
|
||||||
#include "sqllogger.h"
|
|
||||||
#include "femsghandler.h"
|
|
||||||
#include "idberrorinfo.h"
|
|
||||||
#include "MonitorProcMem.h"
|
|
||||||
#include "liboamcpp.h"
|
|
||||||
#include "crashtrace.h"
|
|
||||||
#include "service.h"
|
|
||||||
|
|
||||||
#include <mutex>
|
|
||||||
#include <thread>
|
|
||||||
#include <condition_variable>
|
|
||||||
|
|
||||||
#include "dbrm.h"
|
|
||||||
|
|
||||||
#include "mariadb_my_sys.h"
|
|
||||||
#include "statistics.h"
|
|
||||||
#include "serviceexemgr.h"
|
|
||||||
|
|
||||||
namespace exemgr
|
|
||||||
{
|
|
||||||
class SQLFrontSessionThread
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
SQLFrontSessionThread(const messageqcpp::IOSocket& ios, joblist::DistributedEngineComm* ec,
|
|
||||||
joblist::ResourceManager* rm)
|
|
||||||
: fIos(ios)
|
|
||||||
, fEc(ec)
|
|
||||||
, fRm(rm)
|
|
||||||
, fStatsRetrieved(false)
|
|
||||||
, fTeleClient(globServiceExeMgr->getTeleServerParms())
|
|
||||||
, fOamCachePtr(oam::OamCache::makeOamCache())
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
messageqcpp::IOSocket fIos;
|
|
||||||
joblist::DistributedEngineComm* fEc;
|
|
||||||
joblist::ResourceManager* fRm;
|
|
||||||
querystats::QueryStats fStats;
|
|
||||||
|
|
||||||
// Variables used to store return stats
|
|
||||||
bool fStatsRetrieved;
|
|
||||||
|
|
||||||
querytele::QueryTeleClient fTeleClient;
|
|
||||||
|
|
||||||
oam::OamCache* fOamCachePtr; // this ptr is copyable...
|
|
||||||
|
|
||||||
//...Reinitialize stats for start of a new query
|
|
||||||
void initStats(uint32_t sessionId, std::string& sqlText)
|
|
||||||
{
|
|
||||||
initMaxMemPct(sessionId);
|
|
||||||
|
|
||||||
fStats.reset();
|
|
||||||
fStats.setStartTime();
|
|
||||||
fStats.fSessionID = sessionId;
|
|
||||||
fStats.fQuery = sqlText;
|
|
||||||
fStatsRetrieved = false;
|
|
||||||
}
|
|
||||||
//...Get % memory usage during latest query for sesssionId.
|
|
||||||
//...SessionId >= 0x80000000 is system catalog query we can ignore.
|
|
||||||
static uint64_t getMaxMemPct(uint32_t sessionId);
|
|
||||||
//...Delete sessionMemMap entry for the specified session's memory % use.
|
|
||||||
//...SessionId >= 0x80000000 is system catalog query we can ignore.
|
|
||||||
static void deleteMaxMemPct(uint32_t sessionId);
|
|
||||||
//...Get and log query stats to specified output stream
|
|
||||||
const std::string formatQueryStats(
|
|
||||||
joblist::SJLP& jl, // joblist associated with query
|
|
||||||
const std::string& label, // header label to print in front of log output
|
|
||||||
bool includeNewLine, // include line breaks in query stats std::string
|
|
||||||
bool vtableModeOn, bool wantExtendedStats, uint64_t rowsReturned);
|
|
||||||
static void incThreadCntPerSession(uint32_t sessionId);
|
|
||||||
static void decThreadCntPerSession(uint32_t sessionId);
|
|
||||||
//...Init sessionMemMap entry for specified session to 0 memory %.
|
|
||||||
//...SessionId >= 0x80000000 is system catalog query we can ignore.
|
|
||||||
static void initMaxMemPct(uint32_t sessionId);
|
|
||||||
//... Round off to human readable format (KB, MB, or GB).
|
|
||||||
const std::string roundBytes(uint64_t value) const;
|
|
||||||
void setRMParms(const execplan::CalpontSelectExecutionPlan::RMParmVec& parms);
|
|
||||||
void buildSysCache(const execplan::CalpontSelectExecutionPlan& csep,
|
|
||||||
boost::shared_ptr<execplan::CalpontSystemCatalog> csc);
|
|
||||||
void writeCodeAndError(messageqcpp::ByteStream::quadbyte code, const std::string emsg);
|
|
||||||
void analyzeTableExecute(messageqcpp::ByteStream& bs, joblist::SJLP& jl, bool& stmtCounted);
|
|
||||||
void analyzeTableHandleStats(messageqcpp::ByteStream& bs);
|
|
||||||
uint64_t roundMB(uint64_t value) const;
|
|
||||||
public:
|
|
||||||
void operator()();
|
|
||||||
};
|
|
||||||
}
|
|
@ -10,7 +10,6 @@ configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mariadb-columnstore-start.sh.in" "${
|
|||||||
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-workernode.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-workernode.service" @ONLY)
|
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-workernode.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-workernode.service" @ONLY)
|
||||||
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-controllernode.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-controllernode.service" @ONLY)
|
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-controllernode.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-controllernode.service" @ONLY)
|
||||||
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-primproc.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-primproc.service" @ONLY)
|
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-primproc.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-primproc.service" @ONLY)
|
||||||
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-exemgr.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-exemgr.service" @ONLY)
|
|
||||||
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-writeengineserver.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-writeengineserver.service" @ONLY)
|
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-writeengineserver.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-writeengineserver.service" @ONLY)
|
||||||
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-dmlproc.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-dmlproc.service" @ONLY)
|
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-dmlproc.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-dmlproc.service" @ONLY)
|
||||||
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-ddlproc.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-ddlproc.service" @ONLY)
|
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-ddlproc.service.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-ddlproc.service" @ONLY)
|
||||||
@ -21,12 +20,12 @@ configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-loadbrm.py.in" "${CMAKE_CURRENT_
|
|||||||
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-savebrm.py.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-savebrm.py" @ONLY)
|
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/mcs-savebrm.py.in" "${CMAKE_CURRENT_SOURCE_DIR}/mcs-savebrm.py" @ONLY)
|
||||||
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/columnstoreSyslog.in" "${CMAKE_CURRENT_SOURCE_DIR}/columnstoreSyslog" @ONLY)
|
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/columnstoreSyslog.in" "${CMAKE_CURRENT_SOURCE_DIR}/columnstoreSyslog" @ONLY)
|
||||||
|
|
||||||
install(PROGRAMS columnstore-post-install
|
install(PROGRAMS columnstore-post-install
|
||||||
columnstore-pre-uninstall
|
columnstore-pre-uninstall
|
||||||
columnstore_run.sh
|
columnstore_run.sh
|
||||||
post-mysql-install
|
post-mysql-install
|
||||||
post-mysqld-install
|
post-mysqld-install
|
||||||
columnstoreSyslogSetup.sh
|
columnstoreSyslogSetup.sh
|
||||||
mcs-stop-controllernode.sh
|
mcs-stop-controllernode.sh
|
||||||
mcs-loadbrm.py
|
mcs-loadbrm.py
|
||||||
mcs-savebrm.py
|
mcs-savebrm.py
|
||||||
@ -45,7 +44,6 @@ install(FILES mariadb-columnstore.service
|
|||||||
mcs-workernode.service
|
mcs-workernode.service
|
||||||
mcs-controllernode.service
|
mcs-controllernode.service
|
||||||
mcs-primproc.service
|
mcs-primproc.service
|
||||||
mcs-exemgr.service
|
|
||||||
mcs-writeengineserver.service
|
mcs-writeengineserver.service
|
||||||
mcs-dmlproc.service
|
mcs-dmlproc.service
|
||||||
mcs-ddlproc.service
|
mcs-ddlproc.service
|
||||||
|
@ -70,7 +70,7 @@ quiet=0
|
|||||||
stop_mysqld=0
|
stop_mysqld=0
|
||||||
if [ -z "$(pgrep -x mariadbd)" ];then
|
if [ -z "$(pgrep -x mariadbd)" ];then
|
||||||
|
|
||||||
# Startup mysqld
|
# Startup mysqld
|
||||||
systemctl cat mariadb.service > /dev/null 2>&1
|
systemctl cat mariadb.service > /dev/null 2>&1
|
||||||
if [ $? -eq 0 ] && [ $(running_systemd) -eq 0 ]; then
|
if [ $? -eq 0 ] && [ $(running_systemd) -eq 0 ]; then
|
||||||
systemctl start mariadb.service
|
systemctl start mariadb.service
|
||||||
@ -165,8 +165,6 @@ if [ $user = "root" ]; then
|
|||||||
cp @ENGINE_SUPPORTDIR@/mcs-ddlproc.service /lib/systemd/system/. >/dev/null 2>&1
|
cp @ENGINE_SUPPORTDIR@/mcs-ddlproc.service /lib/systemd/system/. >/dev/null 2>&1
|
||||||
cp @ENGINE_SUPPORTDIR@/mcs-dmlproc.service /usr/lib/systemd/system/. >/dev/null 2>&1
|
cp @ENGINE_SUPPORTDIR@/mcs-dmlproc.service /usr/lib/systemd/system/. >/dev/null 2>&1
|
||||||
cp @ENGINE_SUPPORTDIR@/mcs-dmlproc.service /lib/systemd/system/. >/dev/null 2>&1
|
cp @ENGINE_SUPPORTDIR@/mcs-dmlproc.service /lib/systemd/system/. >/dev/null 2>&1
|
||||||
cp @ENGINE_SUPPORTDIR@/mcs-exemgr.service /usr/lib/systemd/system/. >/dev/null 2>&1
|
|
||||||
cp @ENGINE_SUPPORTDIR@/mcs-exemgr.service /lib/systemd/system/. >/dev/null 2>&1
|
|
||||||
cp @ENGINE_SUPPORTDIR@/mcs-primproc.service /usr/lib/systemd/system/. >/dev/null 2>&1
|
cp @ENGINE_SUPPORTDIR@/mcs-primproc.service /usr/lib/systemd/system/. >/dev/null 2>&1
|
||||||
cp @ENGINE_SUPPORTDIR@/mcs-primproc.service /lib/systemd/system/. >/dev/null 2>&1
|
cp @ENGINE_SUPPORTDIR@/mcs-primproc.service /lib/systemd/system/. >/dev/null 2>&1
|
||||||
cp @ENGINE_SUPPORTDIR@/mcs-workernode.service /usr/lib/systemd/system/mcs-workernode@.service >/dev/null 2>&1
|
cp @ENGINE_SUPPORTDIR@/mcs-workernode.service /usr/lib/systemd/system/mcs-workernode@.service >/dev/null 2>&1
|
||||||
@ -203,7 +201,7 @@ if [ $user = "root" ]; then
|
|||||||
update-rc.d columnstore defaults 99 > /dev/null 2>&1
|
update-rc.d columnstore defaults 99 > /dev/null 2>&1
|
||||||
else
|
else
|
||||||
echo ""
|
echo ""
|
||||||
echo "Package 'systemctl', 'chkconfig' or 'update-rc.d' not installed, contact your sysadmin if you want to setup to autostart for columnstore"
|
echo "Package 'systemctl', 'chkconfig' or 'update-rc.d' not installed, contact your sysadmin if you want to setup to autostart for columnstore"
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
@ -225,7 +223,7 @@ if [ $user = "root" ]; then
|
|||||||
fi
|
fi
|
||||||
else
|
else
|
||||||
chown $user:$user @ENGINE_SYSCONFDIR@/columnstore/Columnstore.xml
|
chown $user:$user @ENGINE_SYSCONFDIR@/columnstore/Columnstore.xml
|
||||||
|
|
||||||
cat <<EOD
|
cat <<EOD
|
||||||
|
|
||||||
NOTE: For non-root install, you will need to run the following commands as root user to
|
NOTE: For non-root install, you will need to run the following commands as root user to
|
||||||
@ -261,13 +259,13 @@ else
|
|||||||
sleep 1
|
sleep 1
|
||||||
done
|
done
|
||||||
/usr/bin/mysqld_safe &
|
/usr/bin/mysqld_safe &
|
||||||
sleep 2
|
sleep 2
|
||||||
fi
|
fi
|
||||||
|
|
||||||
checkForError
|
checkForError
|
||||||
if [ $? -ne 0 ]; then
|
if [ $? -ne 0 ]; then
|
||||||
echo "There was an error installing MariaDB ColumnStore engine plugin. \
|
echo "There was an error installing MariaDB ColumnStore engine plugin. \
|
||||||
Continue to install the engine though. \
|
Continue to install the engine though. \
|
||||||
Please resolve the issues and run necessary scripts manually."
|
Please resolve the issues and run necessary scripts manually."
|
||||||
fi
|
fi
|
||||||
|
|
||||||
@ -290,18 +288,18 @@ fi
|
|||||||
if [ ! -z "$MCS_USE_S3_STORAGE" ] && [ $MCS_USE_S3_STORAGE -eq 1 ]; then
|
if [ ! -z "$MCS_USE_S3_STORAGE" ] && [ $MCS_USE_S3_STORAGE -eq 1 ]; then
|
||||||
if [ -z "$MCS_S3_BUCKET" ]; then
|
if [ -z "$MCS_S3_BUCKET" ]; then
|
||||||
echo "Environment variable \$MCS_USE_S3_STORAGE is set but there is no \$MCS_S3_BUCKET."
|
echo "Environment variable \$MCS_USE_S3_STORAGE is set but there is no \$MCS_S3_BUCKET."
|
||||||
fi
|
fi
|
||||||
if [ -z "$MCS_S3_ACCESS_KEY_ID" ] && [ -z "$MCS_S3_ROLE_NAME" ]; then
|
if [ -z "$MCS_S3_ACCESS_KEY_ID" ] && [ -z "$MCS_S3_ROLE_NAME" ]; then
|
||||||
echo "Environment variable \$MCS_USE_S3_STORAGE is set but there is no \$MCS_S3_ACCESS_KEY_ID."
|
echo "Environment variable \$MCS_USE_S3_STORAGE is set but there is no \$MCS_S3_ACCESS_KEY_ID."
|
||||||
fi
|
fi
|
||||||
if [ -z "$MCS_S3_SECRET_ACCESS_KEY" ] && [ -z "$MCS_S3_ROLE_NAME" ]; then
|
if [ -z "$MCS_S3_SECRET_ACCESS_KEY" ] && [ -z "$MCS_S3_ROLE_NAME" ]; then
|
||||||
echo "Environment variable \$MCS_USE_S3_STORAGE is set but there is no \$MCS_S3_SECRET_ACCESS_KEY."
|
echo "Environment variable \$MCS_USE_S3_STORAGE is set but there is no \$MCS_S3_SECRET_ACCESS_KEY."
|
||||||
fi
|
fi
|
||||||
if [ -z "$MCS_S3_BUCKET" ] || [[ -z "$MCS_S3_ACCESS_KEY_ID" && -z "$MCS_S3_ROLE_NAME" ]] || [[ -z "$MCS_S3_SECRET_ACCESS_KEY" && -z "$MCS_S3_ROLE_NAME" ]]; then
|
if [ -z "$MCS_S3_BUCKET" ] || [[ -z "$MCS_S3_ACCESS_KEY_ID" && -z "$MCS_S3_ROLE_NAME" ]] || [[ -z "$MCS_S3_SECRET_ACCESS_KEY" && -z "$MCS_S3_ROLE_NAME" ]]; then
|
||||||
echo "Using local storage."
|
echo "Using local storage."
|
||||||
else
|
else
|
||||||
@ENGINE_BINDIR@/mcsSetConfig -d Installation DBRootStorageType "storagemanager"
|
@ENGINE_BINDIR@/mcsSetConfig -d Installation DBRootStorageType "storagemanager"
|
||||||
@ENGINE_BINDIR@/mcsSetConfig -d StorageManager Enabled "Y"
|
@ENGINE_BINDIR@/mcsSetConfig -d StorageManager Enabled "Y"
|
||||||
@ENGINE_BINDIR@/mcsSetConfig -d SystemConfig DataFilePlugin "libcloudio.so"
|
@ENGINE_BINDIR@/mcsSetConfig -d SystemConfig DataFilePlugin "libcloudio.so"
|
||||||
sed -i "s|^service =.*|service = S3|" /etc/columnstore/storagemanager.cnf
|
sed -i "s|^service =.*|service = S3|" /etc/columnstore/storagemanager.cnf
|
||||||
if [ ! -z "$MCS_S3_REGION" ]; then
|
if [ ! -z "$MCS_S3_REGION" ]; then
|
||||||
@ -342,7 +340,7 @@ if [ ! -z "$MCS_USE_S3_STORAGE" ] && [ $MCS_USE_S3_STORAGE -eq 1 ]; then
|
|||||||
echo "After environment variables are fixed, run command: columnstore-post-install"
|
echo "After environment variables are fixed, run command: columnstore-post-install"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
|
|
||||||
#change ownership/permissions to be able to run columnstore as non-root
|
#change ownership/permissions to be able to run columnstore as non-root
|
||||||
@ -377,7 +375,7 @@ if [ $? -eq 0 ] && [ $(running_systemd) -eq 0 ]; then
|
|||||||
|
|
||||||
flock -u "$fd_lock"
|
flock -u "$fd_lock"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if [ $stop_mysqld -eq 1 ];then
|
if [ $stop_mysqld -eq 1 ];then
|
||||||
# Make sure we stop mariadb since it wasn't running prior to columnstore installation
|
# Make sure we stop mariadb since it wasn't running prior to columnstore installation
|
||||||
systemctl cat mariadb.service > /dev/null 2>&1
|
systemctl cat mariadb.service > /dev/null 2>&1
|
||||||
|
@ -35,7 +35,7 @@ systemctl cat mariadb-columnstore.service > /dev/null 2>&1
|
|||||||
if [ $? -eq 0 ] && [ $(running_systemd) -eq 0 ]; then
|
if [ $? -eq 0 ] && [ $(running_systemd) -eq 0 ]; then
|
||||||
systemctl stop mariadb-columnstore >/dev/null 2>&1
|
systemctl stop mariadb-columnstore >/dev/null 2>&1
|
||||||
else
|
else
|
||||||
PROGS='PrimProc ExeMgr DMLProc DDLProc WriteEngineServer StorageManager controllernode workernode'
|
PROGS='PrimProc ExeMgr DMLProc DDLProc WriteEngineServer StorageManager controllernode workernode'
|
||||||
kill $(pidof $PROGS) > /dev/null
|
kill $(pidof $PROGS) > /dev/null
|
||||||
sleep 3
|
sleep 3
|
||||||
kill -9 $(pidof $PROGS) > /dev/null
|
kill -9 $(pidof $PROGS) > /dev/null
|
||||||
@ -43,7 +43,7 @@ else
|
|||||||
fi
|
fi
|
||||||
|
|
||||||
if [ -n "$(pgrep -x ProcMon)" ] || [ -n "$(pgrep -x ProcMgr)" ];then
|
if [ -n "$(pgrep -x ProcMon)" ] || [ -n "$(pgrep -x ProcMgr)" ];then
|
||||||
# Old system must be running, kill ProcMon/ProcMgr
|
# Old system must be running, kill ProcMon/ProcMgr
|
||||||
pkill ProcMon
|
pkill ProcMon
|
||||||
pkill ProcMgr
|
pkill ProcMgr
|
||||||
fi
|
fi
|
||||||
@ -94,8 +94,12 @@ if [ -n "$systemctl" ] && [ $(running_systemd) -eq 0 ]; then
|
|||||||
rm -f /lib/systemd/system/mcs-ddlproc.service
|
rm -f /lib/systemd/system/mcs-ddlproc.service
|
||||||
rm -f /usr/lib/systemd/system/mcs-dmlproc.service
|
rm -f /usr/lib/systemd/system/mcs-dmlproc.service
|
||||||
rm -f /lib/systemd/system/mcs-dmlproc.service
|
rm -f /lib/systemd/system/mcs-dmlproc.service
|
||||||
rm -f /usr/lib/systemd/system/mcs-exemgr.service
|
if [[ -f /usr/lib/systemd/system/mcs-exemgr.service ]]
|
||||||
rm -f /lib/systemd/system/mcs-exemgr.service
|
rm -f /usr/lib/systemd/system/mcs-exemgr.service
|
||||||
|
fi
|
||||||
|
if [[ -f /lib/systemd/system/mcs-exemgr.service ]]
|
||||||
|
rm -f /lib/systemd/system/mcs-exemgr.service
|
||||||
|
fi
|
||||||
rm -f /usr/lib/systemd/system/mcs-primproc.service
|
rm -f /usr/lib/systemd/system/mcs-primproc.service
|
||||||
rm -f /lib/systemd/system/mcs-primproc.service
|
rm -f /lib/systemd/system/mcs-primproc.service
|
||||||
rm -f /usr/lib/systemd/system/mcs-workernode@.service
|
rm -f /usr/lib/systemd/system/mcs-workernode@.service
|
||||||
@ -123,7 +127,7 @@ else
|
|||||||
updaterc=`which update-rc.d 2>/dev/null`
|
updaterc=`which update-rc.d 2>/dev/null`
|
||||||
if [ -n "$updaterc" ]; then
|
if [ -n "$updaterc" ]; then
|
||||||
update-rc.d -f columnstore remove > /dev/null 2>&1
|
update-rc.d -f columnstore remove > /dev/null 2>&1
|
||||||
rm -f /etc/init.d/columnstore > /dev/null 2>&1
|
rm -f /etc/init.d/columnstore > /dev/null 2>&1
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
|
@ -12,7 +12,6 @@ flock -n "$fd_lock" || exit 0
|
|||||||
/bin/systemctl start mcs-controllernode
|
/bin/systemctl start mcs-controllernode
|
||||||
/bin/systemctl start mcs-primproc
|
/bin/systemctl start mcs-primproc
|
||||||
/bin/systemctl start mcs-writeengineserver
|
/bin/systemctl start mcs-writeengineserver
|
||||||
/bin/systemctl start mcs-exemgr
|
|
||||||
/bin/systemctl start mcs-dmlproc
|
/bin/systemctl start mcs-dmlproc
|
||||||
/bin/systemctl start mcs-ddlproc
|
/bin/systemctl start mcs-ddlproc
|
||||||
su -s /bin/sh -c '@ENGINE_BINDIR@/dbbuilder 7' @DEFAULT_USER@ 1> @ENGINE_LOGDIR@/install/dbbuilder.log
|
su -s /bin/sh -c '@ENGINE_BINDIR@/dbbuilder 7' @DEFAULT_USER@ 1> @ENGINE_LOGDIR@/install/dbbuilder.log
|
||||||
|
@ -4,7 +4,6 @@
|
|||||||
|
|
||||||
/bin/systemctl stop mcs-dmlproc
|
/bin/systemctl stop mcs-dmlproc
|
||||||
/bin/systemctl stop mcs-ddlproc
|
/bin/systemctl stop mcs-ddlproc
|
||||||
/bin/systemctl stop mcs-exemgr
|
|
||||||
/bin/systemctl stop mcs-writeengineserver
|
/bin/systemctl stop mcs-writeengineserver
|
||||||
/bin/systemctl stop mcs-primproc
|
/bin/systemctl stop mcs-primproc
|
||||||
/bin/systemctl stop mcs-controllernode
|
/bin/systemctl stop mcs-controllernode
|
||||||
|
@ -1,23 +0,0 @@
|
|||||||
[Unit]
|
|
||||||
Description=mcs-exemgr
|
|
||||||
|
|
||||||
# restart/start mcs-exemgr on restart/start of mcs-primproc
|
|
||||||
PartOf=mcs-primproc.service
|
|
||||||
After=network.target mcs-primproc.service
|
|
||||||
|
|
||||||
[Service]
|
|
||||||
Type=oneshot
|
|
||||||
|
|
||||||
User=@DEFAULT_USER@
|
|
||||||
Group=@DEFAULT_GROUP@
|
|
||||||
LimitNOFILE=65536
|
|
||||||
LimitNPROC=65536
|
|
||||||
|
|
||||||
#ExecStartPre=/usr/bin/env bash -c "ldconfig -p | grep -m1 libjemalloc > /dev/null || echo 'Please install jemalloc to avoid ColumnStore performance degradation and unexpected service interruptions.'"
|
|
||||||
#ExecStart=/usr/bin/env bash -c "LD_PRELOAD=$(ldconfig -p | grep -m1 libjemalloc | awk '{print $1}') exec @ENGINE_BINDIR@/ExeMgr"
|
|
||||||
ExecStart=/bin/echo 'EM dummy start'
|
|
||||||
|
|
||||||
RemainAfterExit=yes
|
|
||||||
|
|
||||||
Restart=on-failure
|
|
||||||
TimeoutStopSec=2
|
|
@ -1,9 +1,9 @@
|
|||||||
[Unit]
|
[Unit]
|
||||||
Description=WriteEngineServer
|
Description=WriteEngineServer
|
||||||
|
|
||||||
# restart/stop mcs-writeengineserver on restart/stop of mcs-exemgr
|
# restart/stop mcs-writeengineserver on restart/stop of mcs-primproc
|
||||||
PartOf=mcs-exemgr.service
|
PartOf=mcs-primproc.service
|
||||||
After=network.target mcs-exemgr.service
|
After=network.target mcs-primproc.service
|
||||||
|
|
||||||
[Service]
|
[Service]
|
||||||
Type=forking
|
Type=forking
|
||||||
|
Reference in New Issue
Block a user