/* Copyright (C) 2014 InfiniDB, Inc. Copyright (C) 2016-2022 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /*********************************************************************** * $Id: primproc.cpp 2147 2013-08-14 20:44:44Z bwilkinson $ * * ***********************************************************************/ #include #include #include #ifdef QSIZE_DEBUG #include #include #endif #include #include #include #include #include #include #include #include #include #include //#define NDEBUG #include using namespace std; #include using namespace boost; #include "configcpp.h" using namespace config; #include "messageids.h" using namespace logging; #include "primproc.h" #include "primitiveserver.h" #include "MonitorProcMem.h" #include "pp_logger.h" #include "umsocketselector.h" using namespace primitiveprocessor; #include "archcheck.h" using namespace archcheck; #include "liboamcpp.h" using namespace oam; #include "IDBPolicy.h" using namespace idbdatafile; #include "cgroupconfigurator.h" #include "crashtrace.h" #include "installdir.h" #include "mariadb_my_sys.h" #include "spinlock.h" #include "service.h" #include "serviceexemgr.h" namespace primitiveprocessor { extern uint32_t BPPCount; extern uint32_t blocksReadAhead; extern uint32_t defaultBufferSize; extern uint32_t connectionsPerUM; extern uint32_t highPriorityThreads; extern uint32_t medPriorityThreads; extern uint32_t lowPriorityThreads; extern int directIOFlag; extern int noVB; DebugLevel gDebugLevel; Logger* mlp; bool isDebug(const DebugLevel level) { return level <= gDebugLevel; } } // namespace primitiveprocessor namespace { int toInt(const string& val) { if (val.length() == 0) return -1; return static_cast(Config::fromText(val)); } void setupSignalHandlers() { struct sigaction ign; memset(&ign, 0, sizeof(ign)); ign.sa_handler = SIG_IGN; sigaction(SIGUSR2, &ign, 0); sigset_t sigset; sigemptyset(&sigset); sigaddset(&sigset, SIGPIPE); sigaddset(&sigset, SIGUSR1); sigaddset(&sigset, SIGUSR2); sigprocmask(SIG_BLOCK, &sigset, 0); } int8_t setupCwd(Config* cf) { string workdir = startup::StartUp::tmpDir(); if (workdir.length() == 0) workdir = "."; int8_t rc = chdir(workdir.c_str()); if (rc < 0 || access(".", W_OK) != 0) rc = chdir("/tmp"); return rc; } int setupResources() { struct rlimit rlim; if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) { return -1; } rlim.rlim_cur = rlim.rlim_max = 65536; if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) { return -2; } if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) { return -3; } if (rlim.rlim_cur != 65536) { return -4; } return 0; } #ifdef QSIZE_DEBUG class QszMonThd { public: QszMonThd(PrimitiveServer* psp, ofstream* qszlog) : fPsp(psp), fQszLog(qszlog) { } ~QszMonThd() { } void operator()() { utils::setThreadName("QszMonThd"); for (;;) { uint32_t qd = fPsp->getProcessorThreadPool()->getWaiting(); if (fQszLog) { // Get a timestamp for output. struct tm tm; struct timeval tv; gettimeofday(&tv, 0); localtime_r(&tv.tv_sec, &tm); ostringstream oss; oss << setfill('0') << setw(2) << tm.tm_hour << ':' << setw(2) << tm.tm_min << ':' << setw(2) << tm.tm_sec << '.' << setw(4) << tv.tv_usec / 100; *fQszLog << oss.str() << ' ' << qd << endl; } struct timespec req = {0, 1000 * 100}; // 100 usec nanosleep(&req, 0); } } private: // defaults okay // QszMonThd(const QszMonThd& rhs); // QszMonThd& operator=(const QszMonThd& rhs); const PrimitiveServer* fPsp; ofstream* fQszLog; }; #endif #define DUMP_CACHE_CONTENTS #ifdef DUMP_CACHE_CONTENTS void* waitForSIGUSR1(void* p) { utils::setThreadName("waitForSIGUSR1"); #if defined(__LP64__) || defined(_MSC_VER) ptrdiff_t tmp = reinterpret_cast(p); int cacheCount = static_cast(tmp); #else int cacheCount = reinterpret_cast(p); #endif sigset_t oset; int rec_sig; int32_t rpt_state = 0; sigfillset(&oset); sigdelset(&oset, SIGUSR1); sigdelset(&oset, SIGUSR2); pthread_sigmask(SIG_SETMASK, &oset, 0); sigemptyset(&oset); sigaddset(&oset, SIGUSR1); sigaddset(&oset, SIGUSR2); for (;;) { sigwait(&oset, &rec_sig); if (rec_sig == SIGUSR1) { ostringstream out; for (int i = 0; i < cacheCount; i++) { BRPp[i]->formatLRUList(out); cout << out.str() << "###" << endl; } } else if (rec_sig == SIGUSR2) { // is reporting currently on? rpt_state = BRPp[0]->ReportingFrequency(); if (rpt_state > 0) rpt_state = 0; // turn reporting off else rpt_state = 1; // fbm will set to the value from config file for (int i = 0; i < cacheCount; i++) BRPp[i]->setReportingFrequency(rpt_state); cout << "@@@" << endl; } } return 0; } #endif } // namespace ServicePrimProc* ServicePrimProc::fInstance = nullptr; ServicePrimProc* ServicePrimProc::instance() { if (!fInstance) fInstance = new ServicePrimProc(); return fInstance; } int ServicePrimProc::Child() { Config* cf = Config::makeConfig(); setupSignalHandlers(); int err = 0; err = setupCwd(cf); // Initialize the charset library MY_INIT("PrimProc"); mlp = new primitiveprocessor::Logger(); if (!m_debug) err = setupResources(); string errMsg; switch (err) { case -1: case -3: errMsg = "Error getting file limits, please see non-root install documentation"; break; case -2: errMsg = "Error setting file limits, please see non-root install documentation"; break; case -4: errMsg = "Could not install file limits to required value, please see non-root install documentation"; break; case -5: errMsg = "Could not change into working directory"; break; default: break; } if (err < 0) { Oam oam; mlp->logMessage(errMsg); cerr << errMsg << endl; NotifyServiceInitializationFailed(); // Free up resources allocated by MY_INIT() above. my_end(0); return 2; } utils::USpaceSpinLock startupRaceLock(getStartupRaceFlag()); std::thread exeMgrThread( [this, cf]() { exemgr::Opt opt; exemgr::globServiceExeMgr = new exemgr::ServiceExeMgr(opt, cf); // primitive delay to avoid 'not connected to PM' log error messages // from EM. PrimitiveServer::start() releases SpinLock after sockets // are available. utils::USpaceSpinLock startupRaceLock(this->getStartupRaceFlag()); exemgr::globServiceExeMgr->Child(); }); int serverThreads = 1; int serverQueueSize = 10; int processorWeight = 8 * 1024; int processorQueueSize = 10 * 1024; int64_t BRPBlocksPct = 70; uint32_t BRPBlocks = 1887437; int BRPThreads = 16; int cacheCount = 1; int maxBlocksPerRead = 256; // 1MB bool rotatingDestination = false; uint32_t deleteBlocks = 128; bool PTTrace = false; string strTemp; int priority = -1; const string primitiveServers("PrimitiveServers"); const string jobListStr("JobList"); const string dbbc("DBBC"); const string ExtentMapStr("ExtentMap"); uint64_t extentRows = 8 * 1024 * 1024; uint64_t MaxExtentSize = 0; double prefetchThreshold; uint64_t PMSmallSide = 67108864; BPPCount = 16; int numCores = -1; int configNumCores = -1; uint32_t highPriorityPercentage, medPriorityPercentage, lowPriorityPercentage; utils::CGroupConfigurator cg; gDebugLevel = primitiveprocessor::NONE; int temp = toInt(cf->getConfig(primitiveServers, "ServerThreads")); if (temp > 0) serverThreads = temp; temp = toInt(cf->getConfig(primitiveServers, "ServerQueueSize")); if (temp > 0) serverQueueSize = temp; temp = toInt(cf->getConfig(primitiveServers, "ProcessorThreshold")); if (temp > 0) processorWeight = temp; temp = toInt(cf->getConfig(primitiveServers, "ProcessorQueueSize")); if (temp > 0) processorQueueSize = temp; temp = toInt(cf->getConfig(primitiveServers, "DebugLevel")); if (temp > 0) gDebugLevel = (DebugLevel)temp; highPriorityPercentage = 0; temp = toInt(cf->getConfig(primitiveServers, "HighPriorityPercentage")); if (temp >= 0) highPriorityPercentage = temp; medPriorityPercentage = 0; temp = toInt(cf->getConfig(primitiveServers, "MediumPriorityPercentage")); if (temp >= 0) medPriorityPercentage = temp; lowPriorityPercentage = 0; temp = toInt(cf->getConfig(primitiveServers, "LowPriorityPercentage")); if (temp >= 0) lowPriorityPercentage = temp; temp = toInt(cf->getConfig(ExtentMapStr, "ExtentRows")); if (temp > 0) extentRows = temp; temp = toInt(cf->getConfig(primitiveServers, "ConnectionsPerPrimProc")); if (temp > 0) connectionsPerUM = temp; else connectionsPerUM = 1; // set to smallest extent size // do not allow to read beyond the end of an extent const int MaxReadAheadSz = (extentRows) / BLOCK_SIZE; // Max size for a primitive message column buffer. Must be big enough // to accomodate 8192 values of the widest non-dict column + 8192 RIDs + ohead. defaultBufferSize = 150 * 1024; // This parm controls whether we rotate through the output sockets // when deciding where to send response messages, or whether to simply // send the response to the socket of origin. Should normally be set // to 'y', for install types 1 and 3. string strVal = cf->getConfig(primitiveServers, "RotatingDestination"); // XXX: Permanently disable for now... strVal = "N"; if ((strVal == "y") || (strVal == "Y")) { rotatingDestination = true; // Disable destination rotation if UM and PM are running on same // server, because we could accidentally end up sending DMLProc // responses to ExeMgr and vice versa, if we rotated socket dest. temp = toInt(cf->getConfig("Installation", "ServerTypeInstall")); if ((temp == oam::INSTALL_COMBINE_DM_UM_PM) || (temp == oam::INSTALL_COMBINE_PM_UM)) rotatingDestination = false; } string strBlockPct = cf->getConfig(dbbc, "NumBlocksPct"); temp = atoi(strBlockPct.c_str()); bool absCache = false; if (temp > 0) { BRPBlocksPct = temp; /* MCOL-1847. Did the user specify this as an absolute? */ int len = strBlockPct.length(); if ((strBlockPct[len - 1] >= 'a' && strBlockPct[len - 1] <= 'z') || (strBlockPct[len - 1] >= 'A' && strBlockPct[len - 1] <= 'Z')) { absCache = true; BRPBlocksPct = Config::fromText(strBlockPct); } } if (absCache) BRPBlocks = BRPBlocksPct / 8192; else BRPBlocks = ((BRPBlocksPct / 100.0) * (double)cg.getTotalMemory()) / 8192; #if 0 temp = toInt(cf->getConfig(dbbc, "NumThreads")); if (temp > 0) BRPThreads = temp; #endif temp = toInt(cf->getConfig(dbbc, "NumCaches")); if (temp > 0) cacheCount = temp; temp = toInt(cf->getConfig(dbbc, "NumDeleteBlocks")); if (temp > 0) deleteBlocks = temp; if ((uint32_t)(.01 * BRPBlocks) < deleteBlocks) deleteBlocks = (uint32_t)(.01 * BRPBlocks); temp = toInt(cf->getConfig(primitiveServers, "ColScanBufferSizeBlocks")); if (temp > (int)MaxReadAheadSz || temp < 1) maxBlocksPerRead = MaxReadAheadSz; else if (temp > 0) maxBlocksPerRead = temp; temp = toInt(cf->getConfig(primitiveServers, "ColScanReadAheadBlocks")); if (temp > (int)MaxReadAheadSz || temp < 0) blocksReadAhead = MaxReadAheadSz; else if (temp > 0) { // make sure we've got an integral factor of extent size for (; (MaxExtentSize % temp) != 0; ++temp) ; blocksReadAhead = temp; } temp = toInt(cf->getConfig(primitiveServers, "PTTrace")); if (temp > 0) PTTrace = true; temp = toInt(cf->getConfig(primitiveServers, "PrefetchThreshold")); if (temp < 0 || temp > 100) prefetchThreshold = 0; else prefetchThreshold = temp / 100.0; int maxPct = 0; // disable by default temp = toInt(cf->getConfig(primitiveServers, "MaxPct")); if (temp >= 0) maxPct = temp; // We could use this same mechanism for other growing buffers. int aggPct = 95; temp = toInt(cf->getConfig("SystemConfig", "MemoryCheckPercent")); if (temp >= 0) aggPct = temp; //...Start the thread to monitor our memory usage new boost::thread(utils::MonitorProcMem(maxPct, aggPct, 28)); // config file priority is 40..1 (highest..lowest) string sPriority = cf->getConfig(primitiveServers, "Priority"); if (sPriority.length() > 0) temp = toInt(sPriority); else temp = 21; // convert config file value to setpriority(2) value (-20..19, -1 is the default) if (temp > 0) priority = 20 - temp; else if (temp < 0) priority = 19; if (priority < -20) priority = -20; setpriority(PRIO_PROCESS, 0, priority); //..Instantiate UmSocketSelector singleton. Disable rotating destination //..selection if no UM IP addresses are in the Calpo67108864LLnt.xml file. UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance(); if (rotatingDestination) { if (pUmSocketSelector->ipAddressCount() < 1) rotatingDestination = false; } // See if we want to override the calculated #cores temp = toInt(cf->getConfig(primitiveServers, "NumCores")); if (temp > 0) configNumCores = temp; if (configNumCores <= 0) { // count the actual #cores numCores = cg.getNumCores(); if (numCores == 0) numCores = 8; } else numCores = configNumCores; // based on the #cores, calculate some thread parms if (numCores > 0) { BRPThreads = 2 * numCores; // there doesn't seem much benefit to having more than this, and sometimes it causes problems. // DBBC.NumThreads can override this cap BRPThreads = std::min(BRPThreads, 32); } // the default is ~10% low, 30% medium, 60% high, (where 2*cores = 100%) if (highPriorityPercentage == 0 && medPriorityPercentage == 0 && lowPriorityPercentage == 0) { lowPriorityThreads = max(1, (2 * numCores) / 10); medPriorityThreads = max(1, (2 * numCores) / 3); highPriorityThreads = (2 * numCores) - lowPriorityThreads - medPriorityThreads; } else { uint32_t totalThreads = (uint32_t)((lowPriorityPercentage + medPriorityPercentage + highPriorityPercentage) / 100.0 * (2 * numCores)); if (totalThreads == 0) totalThreads = 1; lowPriorityThreads = (uint32_t)(lowPriorityPercentage / 100.0 * (2 * numCores)); medPriorityThreads = (uint32_t)(medPriorityPercentage / 100.0 * (2 * numCores)); highPriorityThreads = totalThreads - lowPriorityThreads - medPriorityThreads; } BPPCount = highPriorityThreads + medPriorityThreads + lowPriorityThreads; // let the user override if they want temp = toInt(cf->getConfig(primitiveServers, "BPPCount")); if (temp > 0 && temp < (int)BPPCount) BPPCount = temp; temp = toInt(cf->getConfig(dbbc, "NumThreads")); if (temp > 0) BRPThreads = temp; // @bug4598, switch for O_DIRECT to support gluster fs. // directIOFlag == O_DIRECT, by default strVal = cf->getConfig(primitiveServers, "DirectIO"); if ((strVal == "n") || (strVal == "N")) directIOFlag = 0; IDBPolicy::configIDBPolicy(); // no versionbuffer if using HDFS for performance reason if (IDBPolicy::useHdfs()) noVB = 1; cout << "Starting PrimitiveServer: st = " << serverThreads << ", sq = " << serverQueueSize << ", pw = " << processorWeight << ", pq = " << processorQueueSize << ", nb = " << BRPBlocks << ", nt = " << BRPThreads << ", nc = " << cacheCount << ", ra = " << blocksReadAhead << ", db = " << deleteBlocks << ", mb = " << maxBlocksPerRead << ", rd = " << rotatingDestination << ", tr = " << PTTrace << ", ss = " << PMSmallSide << ", bp = " << BPPCount << endl; PrimitiveServer server(serverThreads, serverQueueSize, processorWeight, processorQueueSize, rotatingDestination, BRPBlocks, BRPThreads, cacheCount, maxBlocksPerRead, blocksReadAhead, deleteBlocks, PTTrace, prefetchThreshold, PMSmallSide); #ifdef QSIZE_DEBUG thread* qszMonThd; if (gDebugLevel >= STATS) { ofstream* qszLog = new ofstream("/var/log/mariadb/columnstore/trace/ppqsz.dat"); if (!qszLog->good()) { qszLog->close(); delete qszLog; qszLog = 0; } qszMonThd = new thread(QszMonThd(&server, qszLog)); } #endif #ifdef DUMP_CACHE_CONTENTS { // Need to use pthreads API here... pthread_t thd1; pthread_attr_t attr1; pthread_attr_init(&attr1); pthread_attr_setdetachstate(&attr1, PTHREAD_CREATE_DETACHED); pthread_create(&thd1, &attr1, waitForSIGUSR1, reinterpret_cast(cacheCount)); } #endif primServerThreadPool = server.getProcessorThreadPool(); server.start(this, startupRaceLock); cerr << "server.start() exited!" << endl; // Free up resources allocated by MY_INIT() above. my_end(0); return 1; } int main(int argc, char** argv) { if (checkArchitecture() != arcitecture::SSE4_2 && checkArchitecture() != arcitecture::ASIMD) { std::cerr << "Unsupported CPU architecture. ARM Advanced SIMD or x86_64 SSE4.2 required; aborting. \n"; return 1; } Opt opt(argc, argv); // Set locale language setlocale(LC_ALL, ""); setlocale(LC_NUMERIC, "C"); // This is unset due to the way we start it program_invocation_short_name = const_cast("PrimProc"); ServicePrimProc::instance()->setOpt(opt); return ServicePrimProc::instance()->Run(); }