1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

[MCOL-5109] Make a singleton from ServicePrimProc.

This patch makes a singleton from ServicePrimProc.
This commit is contained in:
Denis Khalikov
2022-06-01 19:54:21 +03:00
parent 94ca6fdd34
commit 467fe0b401
13 changed files with 206 additions and 88 deletions

View File

@ -850,7 +850,9 @@ void CalpontSystemCatalog::getSysData_EC(CalpontSelectExecutionPlan& csep, NJLSy
ResourceManager* rm = ResourceManager::instance(true);
DistributedEngineComm* fEc = DistributedEngineComm::instance(rm);
SJLP jl = JobListFactory::makeJobList(&csep, rm, true);
PrimitiveServerThreadPools dummyPrimitiveServerThreadPools;
SJLP jl = JobListFactory::makeJobList(&csep, rm, dummyPrimitiveServerThreadPools, true);
//@bug 2221. Work around to prevent DMLProc crash.
int retryNum = 0;
@ -864,7 +866,7 @@ void CalpontSystemCatalog::getSysData_EC(CalpontSelectExecutionPlan& csep, NJLSy
#else
sleep(1);
#endif
jl = JobListFactory::makeJobList(&csep, rm, true);
jl = JobListFactory::makeJobList(&csep, rm, dummyPrimitiveServerThreadPools, true);
retryNum++;
}

View File

@ -46,6 +46,7 @@
#include "resourcemanager.h"
#include "rowgroup.h"
#include "../../primitives/primproc/primitiveserverthreadpools.h"
// forward reference
namespace execplan
@ -363,6 +364,8 @@ struct JobInfo
// ~csep() or csep.unserialize()
std::vector<execplan::ParseTree*> dynamicParseTreeVec;
PrimitiveServerThreadPools primitiveServerThreadPools;
private:
// defaults okay
// JobInfo(const JobInfo& rhs);

View File

@ -2004,8 +2004,9 @@ void handleException(std::exception_ptr e, JobList* jl, JobInfo& jobInfo, unsign
jl = nullptr;
}
SJLP makeJobList_(CalpontExecutionPlan* cplan, ResourceManager* rm, bool isExeMgr, unsigned& errCode,
string& emsg)
SJLP makeJobList_(CalpontExecutionPlan* cplan, ResourceManager* rm,
const PrimitiveServerThreadPools& primitiveServerThreadPools, bool isExeMgr,
unsigned& errCode, string& emsg)
{
// TODO: This part requires a proper refactoring, we have to move common methods from
// `CalpontSelectExecutionPlan` to the base class. I have no idea what's a point of
@ -2042,6 +2043,7 @@ SJLP makeJobList_(CalpontExecutionPlan* cplan, ResourceManager* rm, bool isExeMg
jobInfo.statementId = csep->statementID();
jobInfo.queryType = csep->queryType();
jobInfo.csc = csc;
jobInfo.primitiveServerThreadPools = primitiveServerThreadPools;
// TODO: clean up the vestiges of the bool trace
jobInfo.trace = csep->traceOn();
jobInfo.traceFlags = csep->traceFlags();
@ -2292,14 +2294,15 @@ SJLP makeJobList_(CalpontExecutionPlan* cplan, ResourceManager* rm, bool isExeMg
namespace joblist
{
/* static */
SJLP JobListFactory::makeJobList(CalpontExecutionPlan* cplan, ResourceManager* rm, bool tryTuple,
SJLP JobListFactory::makeJobList(CalpontExecutionPlan* cplan, ResourceManager* rm,
const PrimitiveServerThreadPools& primitiveServerThreadPools, bool tryTuple,
bool isExeMgr)
{
SJLP ret;
string emsg;
unsigned errCode = 0;
ret = makeJobList_(cplan, rm, isExeMgr, errCode, emsg);
ret = makeJobList_(cplan, rm, primitiveServerThreadPools, isExeMgr, errCode, emsg);
if (!ret)
{

View File

@ -27,6 +27,7 @@
#include <string>
#include "joblist.h"
#include "../../primitives/primproc/primitiveserverthreadpools.h"
#if defined(_MSC_VER) && defined(JOBLIST_DLLEXPORT)
#define EXPORT __declspec(dllexport)
@ -61,6 +62,7 @@ class JobListFactory
* @param cplan the CalpontExecutionPlan from which the JobList is constructed
*/
EXPORT static SJLP makeJobList(execplan::CalpontExecutionPlan* cplan, ResourceManager* rm,
const PrimitiveServerThreadPools& primitiveServerThreadPools,
bool tryTuple = false, bool isExeMgr = false);
private:

View File

@ -352,6 +352,7 @@ TupleAggregateStep::TupleAggregateStep(const SP_ROWAGG_UM_t& agg, const RowGroup
fExtendedInfo = "TAS: ";
fQtc.stepParms().stepType = StepTeleStats::T_TAS;
fPrimitiveServerThreadPools = jobInfo.primitiveServerThreadPools;
}
TupleAggregateStep::~TupleAggregateStep()

View File

@ -23,6 +23,7 @@
#include "jobstep.h"
#include "rowaggregation.h"
#include "threadnaming.h"
#include "../../primitives/primproc/primitiveserverthreadpools.h"
#include <boost/thread.hpp>
@ -220,6 +221,8 @@ class TupleAggregateStep : public JobStep, public TupleDeliveryStep
boost::scoped_array<uint64_t> fMemUsage;
boost::shared_ptr<int64_t> fSessionMemLimit;
PrimitiveServerThreadPools fPrimitiveServerThreadPools;
};
} // namespace joblist

View File

@ -2412,6 +2412,8 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro
// We're not using either the priority or the job-clustering features, just need a threadpool
// that can reschedule jobs, and an unlimited non-blocking queue
OOBPool.reset(new threadpool::PriorityThreadPool(1, 5, 0, 0, 1));
// Initialize a local pointer.
fOOBPool = OOBPool;
asyncCounter = 0;

View File

@ -133,6 +133,11 @@ class PrimitiveServer
return fProcessorPool;
}
inline boost::shared_ptr<threadpool::PriorityThreadPool> getOOBThreadPool() const
{
return fOOBPool;
}
// int fCacheCount;
int ReadAheadBlocks() const
{
@ -166,6 +171,7 @@ class PrimitiveServer
* primitive commands
*/
boost::shared_ptr<threadpool::PriorityThreadPool> fProcessorPool;
boost::shared_ptr<threadpool::PriorityThreadPool> fOOBPool;
int fServerThreads;
int fServerQueueSize;

View File

@ -0,0 +1,46 @@
/* Copyright (C) 2022 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#pragma once
#include <boost/shared_ptr.hpp>
#include "prioritythreadpool.h"
class PrimitiveServerThreadPools
{
public:
PrimitiveServerThreadPools() = default;
PrimitiveServerThreadPools(boost::shared_ptr<threadpool::PriorityThreadPool> primServerThreadPool,
boost::shared_ptr<threadpool::PriorityThreadPool> OOBThreadPool)
: fPrimServerThreadPool(primServerThreadPool), fOOBThreadPool(OOBThreadPool)
{
}
boost::shared_ptr<threadpool::PriorityThreadPool> getPrimitiveServerThreadPool()
{
return fPrimServerThreadPool;
}
boost::shared_ptr<threadpool::PriorityThreadPool> getOOBThreadPool()
{
return fOOBThreadPool;
}
private:
boost::shared_ptr<threadpool::PriorityThreadPool> fPrimServerThreadPool;
boost::shared_ptr<threadpool::PriorityThreadPool> fOOBThreadPool;
};

View File

@ -76,57 +76,6 @@ using namespace idbdatafile;
#include "service.h"
#include "serviceexemgr.h"
class Opt
{
public:
int m_debug;
bool m_fg;
Opt(int argc, char* argv[]) : m_debug(0), m_fg(false)
{
int c;
while ((c = getopt(argc, argv, "df")) != EOF)
{
switch (c)
{
case 'd': m_debug++; break;
case 'f': m_fg = true; break;
case '?':
default: break;
}
}
}
};
class ServicePrimProc : public Service, public Opt
{
public:
ServicePrimProc(const Opt& opt) : Service("PrimProc"), Opt(opt)
{
}
void LogErrno() override
{
cerr << strerror(errno) << endl;
}
void ParentLogChildMessage(const std::string& str) override
{
cout << str << endl;
}
int Child() override;
int Run()
{
return m_fg ? Child() : RunForking();
}
std::atomic_flag& getStartupRaceFlag()
{
return startupRaceFlag_;
}
private:
// Since C++20 flag's init value is false.
std::atomic_flag startupRaceFlag_{false};
};
namespace primitiveprocessor
{
extern uint32_t BPPCount;
@ -335,6 +284,15 @@ void* waitForSIGUSR1(void* p)
} // namespace
ServicePrimProc* ServicePrimProc::fInstance = nullptr;
ServicePrimProc* ServicePrimProc::instance()
{
if (!fInstance)
fInstance = new ServicePrimProc();
return fInstance;
}
int ServicePrimProc::Child()
{
Config* cf = Config::makeConfig();
@ -764,6 +722,9 @@ int ServicePrimProc::Child()
}
#endif
primServerThreadPool = server.getProcessorThreadPool();
OOBThreadPool = server.getOOBThreadPool();
server.start(this, startupRaceLock);
cerr << "server.start() exited!" << endl;
@ -783,5 +744,6 @@ int main(int argc, char** argv)
// Initialize the charset library
MY_INIT(argv[0]);
return ServicePrimProc(opt).Run();
ServicePrimProc::instance()->setOpt(opt);
return ServicePrimProc::instance()->Run();
}

View File

@ -34,6 +34,8 @@
#include <boost/thread.hpp>
#include <map>
#include "service.h"
#include "prioritythreadpool.h"
#include "pp_logger.h"
namespace primitiveprocessor
@ -111,4 +113,83 @@ const int MAX_BUFFER_SIZE = 32768 * 2;
// extern logging::MessageLog ml1;
// extern boost::mutex logLock;
extern Logger* mlp;
} // namespace primitiveprocessor
class Opt
{
public:
int m_debug;
bool m_fg;
Opt(int m_debug, bool m_fg) : m_debug(m_debug), m_fg(m_fg)
{
}
Opt(int argc, char* argv[]) : m_debug(0), m_fg(false)
{
int c;
while ((c = getopt(argc, argv, "df")) != EOF)
{
switch (c)
{
case 'd': m_debug++; break;
case 'f': m_fg = true; break;
case '?':
default: break;
}
}
}
};
class ServicePrimProc : public Service, public Opt
{
public:
static ServicePrimProc* instance();
void setOpt(const Opt& opt)
{
m_debug = opt.m_debug;
m_fg = opt.m_fg;
}
void LogErrno() override
{
cerr << strerror(errno) << endl;
}
void ParentLogChildMessage(const std::string& str) override
{
cout << str << endl;
}
int Child() override;
int Run()
{
return m_fg ? Child() : RunForking();
}
std::atomic_flag& getStartupRaceFlag()
{
return startupRaceFlag_;
}
boost::shared_ptr<threadpool::PriorityThreadPool> getPrimitiveServerThreadPool()
{
return primServerThreadPool;
}
boost::shared_ptr<threadpool::PriorityThreadPool> getOOBThreadPool()
{
return OOBThreadPool;
}
private:
ServicePrimProc() : Service("PrimProc"), Opt(0, false)
{
}
static ServicePrimProc* fInstance;
// Since C++20 flag's init value is false.
std::atomic_flag startupRaceFlag_{false};
boost::shared_ptr<threadpool::PriorityThreadPool> primServerThreadPool;
boost::shared_ptr<threadpool::PriorityThreadPool> OOBThreadPool;
};

View File

@ -16,6 +16,8 @@
MA 02110-1301, USA. */
#include "sqlfrontsessionthread.h"
#include "primproc.h"
#include "primitiveserverthreadpools.h"
namespace exemgr
{
@ -195,7 +197,8 @@ namespace exemgr
caep.unserialize(bs);
statementsRunningCount->incr(stmtCounted);
jl = joblist::JobListFactory::makeJobList(&caep, fRm, false, true);
PrimitiveServerThreadPools primitiveServerThreadPools;
jl = joblist::JobListFactory::makeJobList(&caep, fRm, primitiveServerThreadPools, false, true);
// Joblist is empty.
if (jl->status() == logging::statisticsJobListEmpty)
@ -516,16 +519,20 @@ namespace exemgr
statementsRunningCount->incr(stmtCounted);
PrimitiveServerThreadPools primitiveServerThreadPools(
ServicePrimProc::instance()->getPrimitiveServerThreadPool(),
ServicePrimProc::instance()->getOOBThreadPool());
if (tryTuples)
{
try // @bug2244: try/catch around fIos.write() calls responding to makeTupleList
{
jl = joblist::JobListFactory::makeJobList(&csep, fRm, true, true);
// assign query stats
jl->queryStats(fStats);
jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, true, true);
// assign query stats
jl->queryStats(fStats);
if ((jl->status()) == 0 && (jl->putEngineComm(fEc) == 0))
{
if ((jl->status()) == 0 && (jl->putEngineComm(fEc) == 0))
{
usingTuples = true;
// Tell the FE that we're sending tuples back, not TableBands
@ -535,59 +542,59 @@ namespace exemgr
messageqcpp::ByteStream tbs;
tbs << tjlp->getOutputRowGroup();
fIos.write(tbs);
}
else
{
}
else
{
const std::string emsg = jl->errMsg();
statementsRunningCount->decr(stmtCounted);
writeCodeAndError(jl->status(), emsg);
std::cerr << "ExeMgr: could not build a tuple joblist: " << emsg << std::endl;
continue;
}
}
}
catch (std::exception& ex)
{
std::ostringstream errMsg;
errMsg << "ExeMgr: error writing makeJoblist "
std::ostringstream errMsg;
errMsg << "ExeMgr: error writing makeJoblist "
"response; "
<< ex.what();
throw std::runtime_error(errMsg.str());
<< ex.what();
throw std::runtime_error(errMsg.str());
}
catch (...)
{
std::ostringstream errMsg;
errMsg << "ExeMgr: unknown error writing makeJoblist "
std::ostringstream errMsg;
errMsg << "ExeMgr: unknown error writing makeJoblist "
"response; ";
throw std::runtime_error(errMsg.str());
throw std::runtime_error(errMsg.str());
}
if (!usingTuples)
{
if (gDebug)
if (gDebug)
std::cout << "### UM wanted tuples but it didn't work out :-(" << std::endl;
}
else
{
if (gDebug)
if (gDebug)
std::cout << "### UM wanted tuples and we'll do our best;-)" << std::endl;
}
}
else
{
usingTuples = false;
jl = joblist::JobListFactory::makeJobList(&csep, fRm, false, true);
usingTuples = false;
jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, false, true);
if (jl->status() == 0)
{
if (jl->status() == 0)
{
std::string emsg;
if (jl->putEngineComm(fEc) != 0)
throw std::runtime_error(jl->errMsg());
}
else
{
throw std::runtime_error(jl->errMsg());
}
else
{
throw std::runtime_error("ExeMgr: could not build a JobList!");
}
}
}
jl->doQuery();
@ -982,4 +989,4 @@ namespace exemgr
while (destructing > 0)
jlCleanupDone.wait(scoped);
}
}; // namespace exemgr
}; // namespace exemgr

View File

@ -128,4 +128,4 @@ namespace exemgr
public:
void operator()();
};
}
}