/* Copyright (C) 2014 InfiniDB, Inc. Copyright (C) 2022 Mariadb Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /****************************************************************************************** * $Id: resourcemanager.cpp 9655 2013-06-25 23:08:13Z xlou $ * ******************************************************************************************/ #include #include #include #include #include #include #include using namespace std; #include "resourcemanager.h" #include "jl_logger.h" #include "cgroupconfigurator.h" #include "liboamcpp.h" #include "secrets.h" using namespace config; namespace joblist { ResourceManager* ResourceManager::fInstance = NULL; boost::mutex mx; ResourceManager* ResourceManager::instance(bool runningInExeMgr, config::Config* aConfig) { boost::mutex::scoped_lock lk(mx); if (!fInstance) fInstance = new ResourceManager(runningInExeMgr, aConfig); return fInstance; } ResourceManager::ResourceManager(bool runningInExeMgr, config::Config* aConfig) : fExeMgrStr("ExeMgr1") , fSystemConfigStr("SystemConfig") , fDMLProcStr("DMLProc") , fBatchInsertStr("BatchInsert") , fConfig(aConfig == nullptr ? Config::makeConfig() : aConfig) , fNumCores(8) , fHjNumThreads(defaultNumThreads) , fJlProcessorThreadsPerScan(defaultProcessorThreadsPerScan) , fJlNumScanReceiveThreads(defaultScanReceiveThreads) , fJlMaxOutstandingRequests(defaultMaxOutstandingRequests) , fHJUmMaxMemorySmallSideDistributor( fHashJoinStr, "UmMaxMemorySmallSide", getUintVal(fHashJoinStr, "TotalUmMaxMemorySmallSide", defaultTotalUmMemory), getUintVal(fHashJoinStr, "UmMaxMemorySmallSide", defaultHJUmMaxMemorySmallSide), 0) , fHJPmMaxMemorySmallSideSessionMap( getUintVal(fHashJoinStr, "PmMaxMemorySmallSide", defaultHJPmMaxMemorySmallSide)) , isExeMgr(runningInExeMgr) { int temp; int configNumCores = -1; fTraceFlags = 0; // See if we want to override the calculated #cores temp = getIntVal(fJobListStr, "NumCores", -1); if (temp > 0) configNumCores = temp; if (configNumCores <= 0) { // count the actual #cores utils::CGroupConfigurator cg; fNumCores = cg.getNumCores(); if (fNumCores <= 0) fNumCores = 8; } else fNumCores = configNumCores; // based on the #cores, calculate some thread parms if (fNumCores > 0) { fHjNumThreads = fNumCores; fJlNumScanReceiveThreads = fNumCores; } // possibly override any calculated values temp = getIntVal(fHashJoinStr, "NumThreads", -1); if (temp > 0) fHjNumThreads = temp; temp = getIntVal(fJobListStr, "ProcessorThreadsPerScan", -1); if (temp > 0) fJlProcessorThreadsPerScan = temp; temp = getIntVal(fJobListStr, "MaxOutstandingRequests", -1); if (temp > 0) fJlMaxOutstandingRequests = temp; else { oam::Oam oam; oam::ModuleTypeConfig moduletypeconfig; oam.getSystemConfig("pm", moduletypeconfig); const uint temp = moduletypeconfig.ModuleCount * fNumCores * 4 / fJlProcessorThreadsPerScan; const uint minMaxOutstandingRequests = std::max(static_cast(moduletypeconfig.ModuleCount * 2), defaultMaxOutstandingRequests); fJlMaxOutstandingRequests = temp > minMaxOutstandingRequests ? temp : minMaxOutstandingRequests; } temp = getIntVal(fJobListStr, "NumScanReceiveThreads", -1); if (temp > 0) fJlNumScanReceiveThreads = temp; fDECConnectionsPerQuery = getUintVal(fJobListStr, "DECConnectionsPerQuery", 0); fDECConnectionsPerQuery = (fDECConnectionsPerQuery) ? fDECConnectionsPerQuery : getPsConnectionsPerPrimProc(); pmJoinMemLimit = getUintVal(fHashJoinStr, "PmMaxMemorySmallSide", defaultHJPmMaxMemorySmallSide); // Need to use different limits if this instance isn't running on the UM, // or if it's an ExeMgr running on a PM node if (!isExeMgr) totalUmMemLimit = pmJoinMemLimit; else { // Installation.PMwithUM = y by default so RM prefers HashJoin.TotalPmUmMemory // if it exists. string whichLimit = "TotalUmMemory"; string pmWithUM = fConfig->getConfig("Installation", "PMwithUM"); if (pmWithUM == "y" || pmWithUM == "Y") { oam::Oam OAM; oam::oamModuleInfo_t moduleInfo = OAM.getModuleInfo(); string& moduleType = boost::get<1>(moduleInfo); if (moduleType == "pm" || moduleType == "PM") { string doesItExist = fConfig->getConfig(fHashJoinStr, "TotalPmUmMemory"); if (!doesItExist.empty()) whichLimit = "TotalPmUmMemory"; } } string umtxt = fConfig->getConfig(fHashJoinStr, whichLimit); if (umtxt.empty()) totalUmMemLimit = defaultTotalUmMemory; else { // is it an absolute or a percentage? if (umtxt.find('%') != string::npos) { utils::CGroupConfigurator cg; uint64_t totalMem = cg.getTotalMemory(); totalUmMemLimit = atoll(umtxt.c_str()) / 100.0 * (double)totalMem; if (totalUmMemLimit == 0 || totalUmMemLimit == LLONG_MIN || totalUmMemLimit == LLONG_MAX) // some garbage in the xml entry totalUmMemLimit = defaultTotalUmMemory; } else // an absolute; use the existing converter { totalUmMemLimit = getIntVal(fHashJoinStr, whichLimit, defaultTotalUmMemory); } } } configuredUmMemLimit = totalUmMemLimit; // cout << "RM: total UM memory = " << totalUmMemLimit << endl; // multi-thread aggregate string nt, nb, nr; nt = fConfig->getConfig("RowAggregation", "RowAggrThreads"); if (nt.empty()) { if (numCores() > 0) fAggNumThreads = numCores(); else fAggNumThreads = 1; } else fAggNumThreads = fConfig->uFromText(nt); nb = fConfig->getConfig("RowAggregation", "RowAggrBuckets"); if (nb.empty()) fAggNumBuckets = fAggNumThreads * 4; else fAggNumBuckets = fConfig->uFromText(nb); nr = fConfig->getConfig("RowAggregation", "RowAggrRowGroupsPerThread"); if (nr.empty()) fAggNumRowGroups = 20; else fAggNumRowGroups = fConfig->uFromText(nr); // window function string wt = fConfig->getConfig("WindowFunction", "WorkThreads"); if (wt.empty()) fWindowFunctionThreads = numCores(); else fWindowFunctionThreads = fConfig->uFromText(wt); // hdfs info string hdfs = fConfig->getConfig("SystemConfig", "DataFilePlugin"); if (hdfs.find("hdfs") != string::npos) fUseHdfs = true; else fUseHdfs = false; fAllowedDiskAggregation = getBoolVal(fRowAggregationStr, "AllowDiskBasedAggregation", defaultAllowDiskAggregation); if (!load_encryption_keys()) { Logger log; log.logMessage(logging::LOG_TYPE_ERROR, "Error loading CEJ password encryption keys"); } } int ResourceManager::getEmPriority() const { int temp = getIntVal(fExeMgrStr, "Priority", defaultEMPriority); // config file priority is 40..1 (highest..lowest) // convert to -20..19 (highest..lowest, defaults to -1) int val; // @Bug3385 - the ExeMgr priority was being set backwards with 1 being the highest instead of the lowest. if (temp < 1) val = 19; else if (temp > 40) val = -20; else val = 20 - temp; return val; } void ResourceManager::addHJPmMaxSmallSideMap(uint32_t sessionID, uint64_t mem) { if (fHJPmMaxMemorySmallSideSessionMap.addSession(sessionID, mem, fHJUmMaxMemorySmallSideDistributor.getTotalResource())) logResourceChangeMessage(logging::LOG_TYPE_INFO, sessionID, mem, defaultHJPmMaxMemorySmallSide, "PmMaxMemorySmallSide", LogRMResourceChange); else { logResourceChangeMessage(logging::LOG_TYPE_WARNING, sessionID, mem, fHJUmMaxMemorySmallSideDistributor.getTotalResource(), "PmMaxMemorySmallSide", LogRMResourceChangeError); logResourceChangeMessage(logging::LOG_TYPE_INFO, sessionID, mem, fHJUmMaxMemorySmallSideDistributor.getTotalResource(), "PmMaxMemorySmallSide", LogRMResourceChangeError); } } void ResourceManager::addHJUmMaxSmallSideMap(uint32_t sessionID, uint64_t mem) { if (fHJUmMaxMemorySmallSideDistributor.addSession(sessionID, mem)) logResourceChangeMessage(logging::LOG_TYPE_INFO, sessionID, mem, defaultHJUmMaxMemorySmallSide, "UmMaxMemorySmallSide", LogRMResourceChange); else { logResourceChangeMessage(logging::LOG_TYPE_WARNING, sessionID, mem, fHJUmMaxMemorySmallSideDistributor.getTotalResource(), "UmMaxMemorySmallSide", LogRMResourceChangeError); logResourceChangeMessage(logging::LOG_TYPE_INFO, sessionID, mem, fHJUmMaxMemorySmallSideDistributor.getTotalResource(), "UmMaxMemorySmallSide", LogRMResourceChangeError); } } void ResourceManager::logResourceChangeMessage(logging::LOG_TYPE logType, uint32_t sessionID, uint64_t newvalue, uint64_t value, const string& source, logging::Message::MessageID mid) { logging::Message::Args args; args.add(source); args.add(newvalue); args.add(value); Logger log; log.logMessage(logType, mid, args, logging::LoggingID(5, sessionID)); } bool ResourceManager::getMysqldInfo(std::string& h, std::string& u, std::string& w, unsigned int& p) const { static const std::string hostUserUnassignedValue("unassigned"); // MCS will read username and pass from disk if the config changed. bool reReadConfig = true; u = getStringVal("CrossEngineSupport", "User", hostUserUnassignedValue, reReadConfig); std::string encryptedPW = getStringVal("CrossEngineSupport", "Password", "", reReadConfig); // This will return back the plaintext password if there is no MCSDATADIR/.secrets file present w = decrypt_password(encryptedPW); // MCS will not read username and pass from disk if the config changed. h = getStringVal("CrossEngineSupport", "Host", hostUserUnassignedValue); p = getUintVal("CrossEngineSupport", "Port", 0); return h != hostUserUnassignedValue && u != hostUserUnassignedValue && p; } bool ResourceManager::queryStatsEnabled() const { std::string val(getStringVal("QueryStats", "Enabled", "N")); boost::to_upper(val); return "Y" == val; } bool ResourceManager::userPriorityEnabled() const { std::string val(getStringVal("UserPriority", "Enabled", "N")); boost::to_upper(val); return "Y" == val; } // Counts memory. This funtion doesn't actually malloc, just counts against two limits // totalUmMemLimit for overall UM counting and (optional) sessionLimit for a single session. // If both have space, return true. bool ResourceManager::getMemory(int64_t amount, boost::shared_ptr& sessionLimit, bool patience) { bool ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0); bool ret2 = sessionLimit ? (atomicops::atomicSub(sessionLimit.get(), amount) >= 0) : ret1; uint32_t retryCounter = 0, maxRetries = 20; // 10s delay while (patience && !(ret1 && ret2) && retryCounter++ < maxRetries) { atomicops::atomicAdd(&totalUmMemLimit, amount); sessionLimit ? atomicops::atomicAdd(sessionLimit.get(), amount) : 0; usleep(500000); ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0); ret2 = sessionLimit ? (atomicops::atomicSub(sessionLimit.get(), amount) >= 0) : ret1; } if (!(ret1 && ret2)) { // If we didn't get any memory, restore the counters. atomicops::atomicAdd(&totalUmMemLimit, amount); sessionLimit ? atomicops::atomicAdd(sessionLimit.get(), amount) : 0; } return (ret1 && ret2); } // Don't care about session memory bool ResourceManager::getMemory(int64_t amount, bool patience) { bool ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0); uint32_t retryCounter = 0, maxRetries = 20; // 10s delay while (patience && !ret1 && retryCounter++ < maxRetries) { atomicops::atomicAdd(&totalUmMemLimit, amount); usleep(500000); ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0); } if (!ret1) { // If we didn't get any memory, restore the counters. atomicops::atomicAdd(&totalUmMemLimit, amount); } return ret1; } } // namespace joblist