diff --git a/dbcon/execplan/calpontsystemcatalog.cpp b/dbcon/execplan/calpontsystemcatalog.cpp index 500f78119..2be3c6c7b 100644 --- a/dbcon/execplan/calpontsystemcatalog.cpp +++ b/dbcon/execplan/calpontsystemcatalog.cpp @@ -850,7 +850,9 @@ void CalpontSystemCatalog::getSysData_EC(CalpontSelectExecutionPlan& csep, NJLSy ResourceManager* rm = ResourceManager::instance(true); DistributedEngineComm* fEc = DistributedEngineComm::instance(rm); - SJLP jl = JobListFactory::makeJobList(&csep, rm, true); + PrimitiveServerThreadPools dummyPrimitiveServerThreadPools; + + SJLP jl = JobListFactory::makeJobList(&csep, rm, dummyPrimitiveServerThreadPools, true); //@bug 2221. Work around to prevent DMLProc crash. int retryNum = 0; @@ -864,7 +866,7 @@ void CalpontSystemCatalog::getSysData_EC(CalpontSelectExecutionPlan& csep, NJLSy #else sleep(1); #endif - jl = JobListFactory::makeJobList(&csep, rm, true); + jl = JobListFactory::makeJobList(&csep, rm, dummyPrimitiveServerThreadPools, true); retryNum++; } diff --git a/dbcon/joblist/jlf_common.h b/dbcon/joblist/jlf_common.h index 61006e9e8..1f6811f58 100644 --- a/dbcon/joblist/jlf_common.h +++ b/dbcon/joblist/jlf_common.h @@ -46,6 +46,7 @@ #include "resourcemanager.h" #include "rowgroup.h" +#include "../../primitives/primproc/primitiveserverthreadpools.h" // forward reference namespace execplan @@ -363,6 +364,8 @@ struct JobInfo // ~csep() or csep.unserialize() std::vector dynamicParseTreeVec; + PrimitiveServerThreadPools primitiveServerThreadPools; + private: // defaults okay // JobInfo(const JobInfo& rhs); diff --git a/dbcon/joblist/joblistfactory.cpp b/dbcon/joblist/joblistfactory.cpp index 26809d394..01c6fa29c 100644 --- a/dbcon/joblist/joblistfactory.cpp +++ b/dbcon/joblist/joblistfactory.cpp @@ -2004,8 +2004,9 @@ void handleException(std::exception_ptr e, JobList* jl, JobInfo& jobInfo, unsign jl = nullptr; } -SJLP makeJobList_(CalpontExecutionPlan* cplan, ResourceManager* rm, bool isExeMgr, unsigned& errCode, - string& emsg) +SJLP makeJobList_(CalpontExecutionPlan* cplan, ResourceManager* rm, + const PrimitiveServerThreadPools& primitiveServerThreadPools, bool isExeMgr, + unsigned& errCode, string& emsg) { // TODO: This part requires a proper refactoring, we have to move common methods from // `CalpontSelectExecutionPlan` to the base class. I have no idea what's a point of @@ -2042,6 +2043,7 @@ SJLP makeJobList_(CalpontExecutionPlan* cplan, ResourceManager* rm, bool isExeMg jobInfo.statementId = csep->statementID(); jobInfo.queryType = csep->queryType(); jobInfo.csc = csc; + jobInfo.primitiveServerThreadPools = primitiveServerThreadPools; // TODO: clean up the vestiges of the bool trace jobInfo.trace = csep->traceOn(); jobInfo.traceFlags = csep->traceFlags(); @@ -2292,14 +2294,15 @@ SJLP makeJobList_(CalpontExecutionPlan* cplan, ResourceManager* rm, bool isExeMg namespace joblist { /* static */ -SJLP JobListFactory::makeJobList(CalpontExecutionPlan* cplan, ResourceManager* rm, bool tryTuple, +SJLP JobListFactory::makeJobList(CalpontExecutionPlan* cplan, ResourceManager* rm, + const PrimitiveServerThreadPools& primitiveServerThreadPools, bool tryTuple, bool isExeMgr) { SJLP ret; string emsg; unsigned errCode = 0; - ret = makeJobList_(cplan, rm, isExeMgr, errCode, emsg); + ret = makeJobList_(cplan, rm, primitiveServerThreadPools, isExeMgr, errCode, emsg); if (!ret) { diff --git a/dbcon/joblist/joblistfactory.h b/dbcon/joblist/joblistfactory.h index 4f1e5a89e..fe3c14beb 100644 --- a/dbcon/joblist/joblistfactory.h +++ b/dbcon/joblist/joblistfactory.h @@ -27,6 +27,7 @@ #include #include "joblist.h" +#include "../../primitives/primproc/primitiveserverthreadpools.h" #if defined(_MSC_VER) && defined(JOBLIST_DLLEXPORT) #define EXPORT __declspec(dllexport) @@ -61,6 +62,7 @@ class JobListFactory * @param cplan the CalpontExecutionPlan from which the JobList is constructed */ EXPORT static SJLP makeJobList(execplan::CalpontExecutionPlan* cplan, ResourceManager* rm, + const PrimitiveServerThreadPools& primitiveServerThreadPools, bool tryTuple = false, bool isExeMgr = false); private: diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index e7a7791a8..90b5b42b4 100644 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -352,6 +352,7 @@ TupleAggregateStep::TupleAggregateStep(const SP_ROWAGG_UM_t& agg, const RowGroup fExtendedInfo = "TAS: "; fQtc.stepParms().stepType = StepTeleStats::T_TAS; + fPrimitiveServerThreadPools = jobInfo.primitiveServerThreadPools; } TupleAggregateStep::~TupleAggregateStep() diff --git a/dbcon/joblist/tupleaggregatestep.h b/dbcon/joblist/tupleaggregatestep.h index b81c8497d..80d8e9da3 100644 --- a/dbcon/joblist/tupleaggregatestep.h +++ b/dbcon/joblist/tupleaggregatestep.h @@ -23,6 +23,7 @@ #include "jobstep.h" #include "rowaggregation.h" #include "threadnaming.h" +#include "../../primitives/primproc/primitiveserverthreadpools.h" #include @@ -220,6 +221,8 @@ class TupleAggregateStep : public JobStep, public TupleDeliveryStep boost::scoped_array fMemUsage; boost::shared_ptr fSessionMemLimit; + + PrimitiveServerThreadPools fPrimitiveServerThreadPools; }; } // namespace joblist diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 89c6b70f5..df3446d74 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -2412,6 +2412,8 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro // We're not using either the priority or the job-clustering features, just need a threadpool // that can reschedule jobs, and an unlimited non-blocking queue OOBPool.reset(new threadpool::PriorityThreadPool(1, 5, 0, 0, 1)); + // Initialize a local pointer. + fOOBPool = OOBPool; asyncCounter = 0; diff --git a/primitives/primproc/primitiveserver.h b/primitives/primproc/primitiveserver.h index c79c0f807..6421db6cf 100644 --- a/primitives/primproc/primitiveserver.h +++ b/primitives/primproc/primitiveserver.h @@ -133,6 +133,11 @@ class PrimitiveServer return fProcessorPool; } + inline boost::shared_ptr getOOBThreadPool() const + { + return fOOBPool; + } + // int fCacheCount; int ReadAheadBlocks() const { @@ -166,6 +171,7 @@ class PrimitiveServer * primitive commands */ boost::shared_ptr fProcessorPool; + boost::shared_ptr fOOBPool; int fServerThreads; int fServerQueueSize; diff --git a/primitives/primproc/primitiveserverthreadpools.h b/primitives/primproc/primitiveserverthreadpools.h new file mode 100644 index 000000000..5297adfe8 --- /dev/null +++ b/primitives/primproc/primitiveserverthreadpools.h @@ -0,0 +1,46 @@ +/* Copyright (C) 2022 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#pragma once + +#include +#include "prioritythreadpool.h" + +class PrimitiveServerThreadPools +{ + public: + PrimitiveServerThreadPools() = default; + PrimitiveServerThreadPools(boost::shared_ptr primServerThreadPool, + boost::shared_ptr OOBThreadPool) + : fPrimServerThreadPool(primServerThreadPool), fOOBThreadPool(OOBThreadPool) + { + } + + boost::shared_ptr getPrimitiveServerThreadPool() + { + return fPrimServerThreadPool; + } + + boost::shared_ptr getOOBThreadPool() + { + return fOOBThreadPool; + } + + private: + boost::shared_ptr fPrimServerThreadPool; + boost::shared_ptr fOOBThreadPool; +}; diff --git a/primitives/primproc/primproc.cpp b/primitives/primproc/primproc.cpp index 5f9bfc463..4d474a390 100644 --- a/primitives/primproc/primproc.cpp +++ b/primitives/primproc/primproc.cpp @@ -76,57 +76,6 @@ using namespace idbdatafile; #include "service.h" #include "serviceexemgr.h" -class Opt -{ - public: - int m_debug; - bool m_fg; - Opt(int argc, char* argv[]) : m_debug(0), m_fg(false) - { - int c; - - while ((c = getopt(argc, argv, "df")) != EOF) - { - switch (c) - { - case 'd': m_debug++; break; - case 'f': m_fg = true; break; - case '?': - default: break; - } - } - } -}; - -class ServicePrimProc : public Service, public Opt -{ - public: - ServicePrimProc(const Opt& opt) : Service("PrimProc"), Opt(opt) - { - } - void LogErrno() override - { - cerr << strerror(errno) << endl; - } - void ParentLogChildMessage(const std::string& str) override - { - cout << str << endl; - } - int Child() override; - int Run() - { - return m_fg ? Child() : RunForking(); - } - std::atomic_flag& getStartupRaceFlag() - { - return startupRaceFlag_; - } - - private: - // Since C++20 flag's init value is false. - std::atomic_flag startupRaceFlag_{false}; -}; - namespace primitiveprocessor { extern uint32_t BPPCount; @@ -335,6 +284,15 @@ void* waitForSIGUSR1(void* p) } // namespace +ServicePrimProc* ServicePrimProc::fInstance = nullptr; +ServicePrimProc* ServicePrimProc::instance() +{ + if (!fInstance) + fInstance = new ServicePrimProc(); + + return fInstance; +} + int ServicePrimProc::Child() { Config* cf = Config::makeConfig(); @@ -764,6 +722,9 @@ int ServicePrimProc::Child() } #endif + primServerThreadPool = server.getProcessorThreadPool(); + OOBThreadPool = server.getOOBThreadPool(); + server.start(this, startupRaceLock); cerr << "server.start() exited!" << endl; @@ -783,5 +744,6 @@ int main(int argc, char** argv) // Initialize the charset library MY_INIT(argv[0]); - return ServicePrimProc(opt).Run(); + ServicePrimProc::instance()->setOpt(opt); + return ServicePrimProc::instance()->Run(); } diff --git a/primitives/primproc/primproc.h b/primitives/primproc/primproc.h index ed98a80d3..328f004c0 100644 --- a/primitives/primproc/primproc.h +++ b/primitives/primproc/primproc.h @@ -34,6 +34,8 @@ #include #include +#include "service.h" +#include "prioritythreadpool.h" #include "pp_logger.h" namespace primitiveprocessor @@ -111,4 +113,83 @@ const int MAX_BUFFER_SIZE = 32768 * 2; // extern logging::MessageLog ml1; // extern boost::mutex logLock; extern Logger* mlp; + } // namespace primitiveprocessor + +class Opt +{ + public: + int m_debug; + bool m_fg; + Opt(int m_debug, bool m_fg) : m_debug(m_debug), m_fg(m_fg) + { + } + + Opt(int argc, char* argv[]) : m_debug(0), m_fg(false) + { + int c; + + while ((c = getopt(argc, argv, "df")) != EOF) + { + switch (c) + { + case 'd': m_debug++; break; + case 'f': m_fg = true; break; + case '?': + default: break; + } + } + } +}; + +class ServicePrimProc : public Service, public Opt +{ + public: + static ServicePrimProc* instance(); + + void setOpt(const Opt& opt) + { + m_debug = opt.m_debug; + m_fg = opt.m_fg; + } + + void LogErrno() override + { + cerr << strerror(errno) << endl; + } + + void ParentLogChildMessage(const std::string& str) override + { + cout << str << endl; + } + int Child() override; + int Run() + { + return m_fg ? Child() : RunForking(); + } + std::atomic_flag& getStartupRaceFlag() + { + return startupRaceFlag_; + } + + boost::shared_ptr getPrimitiveServerThreadPool() + { + return primServerThreadPool; + } + + boost::shared_ptr getOOBThreadPool() + { + return OOBThreadPool; + } + + private: + ServicePrimProc() : Service("PrimProc"), Opt(0, false) + { + } + + static ServicePrimProc* fInstance; + // Since C++20 flag's init value is false. + std::atomic_flag startupRaceFlag_{false}; + boost::shared_ptr primServerThreadPool; + boost::shared_ptr OOBThreadPool; +}; diff --git a/primitives/primproc/sqlfrontsessionthread.cpp b/primitives/primproc/sqlfrontsessionthread.cpp index 3b5094e7a..f3c17035b 100644 --- a/primitives/primproc/sqlfrontsessionthread.cpp +++ b/primitives/primproc/sqlfrontsessionthread.cpp @@ -16,6 +16,8 @@ MA 02110-1301, USA. */ #include "sqlfrontsessionthread.h" +#include "primproc.h" +#include "primitiveserverthreadpools.h" namespace exemgr { @@ -195,7 +197,8 @@ namespace exemgr caep.unserialize(bs); statementsRunningCount->incr(stmtCounted); - jl = joblist::JobListFactory::makeJobList(&caep, fRm, false, true); + PrimitiveServerThreadPools primitiveServerThreadPools; + jl = joblist::JobListFactory::makeJobList(&caep, fRm, primitiveServerThreadPools, false, true); // Joblist is empty. if (jl->status() == logging::statisticsJobListEmpty) @@ -516,16 +519,20 @@ namespace exemgr statementsRunningCount->incr(stmtCounted); + PrimitiveServerThreadPools primitiveServerThreadPools( + ServicePrimProc::instance()->getPrimitiveServerThreadPool(), + ServicePrimProc::instance()->getOOBThreadPool()); + if (tryTuples) { try // @bug2244: try/catch around fIos.write() calls responding to makeTupleList { - jl = joblist::JobListFactory::makeJobList(&csep, fRm, true, true); - // assign query stats - jl->queryStats(fStats); + jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, true, true); + // assign query stats + jl->queryStats(fStats); - if ((jl->status()) == 0 && (jl->putEngineComm(fEc) == 0)) - { + if ((jl->status()) == 0 && (jl->putEngineComm(fEc) == 0)) + { usingTuples = true; // Tell the FE that we're sending tuples back, not TableBands @@ -535,59 +542,59 @@ namespace exemgr messageqcpp::ByteStream tbs; tbs << tjlp->getOutputRowGroup(); fIos.write(tbs); - } - else - { + } + else + { const std::string emsg = jl->errMsg(); statementsRunningCount->decr(stmtCounted); writeCodeAndError(jl->status(), emsg); std::cerr << "ExeMgr: could not build a tuple joblist: " << emsg << std::endl; continue; - } + } } catch (std::exception& ex) { - std::ostringstream errMsg; - errMsg << "ExeMgr: error writing makeJoblist " + std::ostringstream errMsg; + errMsg << "ExeMgr: error writing makeJoblist " "response; " - << ex.what(); - throw std::runtime_error(errMsg.str()); + << ex.what(); + throw std::runtime_error(errMsg.str()); } catch (...) { - std::ostringstream errMsg; - errMsg << "ExeMgr: unknown error writing makeJoblist " + std::ostringstream errMsg; + errMsg << "ExeMgr: unknown error writing makeJoblist " "response; "; - throw std::runtime_error(errMsg.str()); + throw std::runtime_error(errMsg.str()); } if (!usingTuples) { - if (gDebug) + if (gDebug) std::cout << "### UM wanted tuples but it didn't work out :-(" << std::endl; } else { - if (gDebug) + if (gDebug) std::cout << "### UM wanted tuples and we'll do our best;-)" << std::endl; } } else { - usingTuples = false; - jl = joblist::JobListFactory::makeJobList(&csep, fRm, false, true); + usingTuples = false; + jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, false, true); - if (jl->status() == 0) - { + if (jl->status() == 0) + { std::string emsg; if (jl->putEngineComm(fEc) != 0) - throw std::runtime_error(jl->errMsg()); - } - else - { + throw std::runtime_error(jl->errMsg()); + } + else + { throw std::runtime_error("ExeMgr: could not build a JobList!"); - } + } } jl->doQuery(); @@ -982,4 +989,4 @@ namespace exemgr while (destructing > 0) jlCleanupDone.wait(scoped); } -}; // namespace exemgr \ No newline at end of file +}; // namespace exemgr diff --git a/primitives/primproc/sqlfrontsessionthread.h b/primitives/primproc/sqlfrontsessionthread.h index ca605ea79..92464a733 100644 --- a/primitives/primproc/sqlfrontsessionthread.h +++ b/primitives/primproc/sqlfrontsessionthread.h @@ -128,4 +128,4 @@ namespace exemgr public: void operator()(); }; -} \ No newline at end of file +}