1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00
Files
mariadb-columnstore-engine/dbcon/joblist/resourcemanager.cpp

392 lines
13 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2022 Mariadb Corporation.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/******************************************************************************************
* $Id: resourcemanager.cpp 9655 2013-06-25 23:08:13Z xlou $
*
******************************************************************************************/
#include <unistd.h>
#include <string>
#include <stdexcept>
#include <iostream>
#include <sstream>
#include <fstream>
#include <sys/time.h>
using namespace std;
#include "resourcemanager.h"
#include "jl_logger.h"
#include "cgroupconfigurator.h"
#include "liboamcpp.h"
#include "secrets.h"
using namespace config;
namespace joblist
{
ResourceManager* ResourceManager::fInstance = NULL;
boost::mutex mx;
ResourceManager* ResourceManager::instance(bool runningInExeMgr, config::Config* aConfig)
{
boost::mutex::scoped_lock lk(mx);
if (!fInstance)
fInstance = new ResourceManager(runningInExeMgr, aConfig);
return fInstance;
}
ResourceManager::ResourceManager(bool runningInExeMgr, config::Config* aConfig)
: fExeMgrStr("ExeMgr1")
, fSystemConfigStr("SystemConfig")
, fDMLProcStr("DMLProc")
, fBatchInsertStr("BatchInsert")
, fConfig(aConfig == nullptr ? Config::makeConfig() : aConfig)
, fNumCores(8)
, fHjNumThreads(defaultNumThreads)
, fJlProcessorThreadsPerScan(defaultProcessorThreadsPerScan)
, fJlNumScanReceiveThreads(defaultScanReceiveThreads)
, fJlMaxOutstandingRequests(defaultMaxOutstandingRequests)
, fHJUmMaxMemorySmallSideDistributor(
fHashJoinStr, "UmMaxMemorySmallSide",
getUintVal(fHashJoinStr, "TotalUmMaxMemorySmallSide", defaultTotalUmMemory),
getUintVal(fHashJoinStr, "UmMaxMemorySmallSide", defaultHJUmMaxMemorySmallSide), 0)
, fHJPmMaxMemorySmallSideSessionMap(
getUintVal(fHashJoinStr, "PmMaxMemorySmallSide", defaultHJPmMaxMemorySmallSide))
, isExeMgr(runningInExeMgr)
{
int temp;
int configNumCores = -1;
fTraceFlags = 0;
// See if we want to override the calculated #cores
temp = getIntVal(fJobListStr, "NumCores", -1);
if (temp > 0)
configNumCores = temp;
if (configNumCores <= 0)
{
// count the actual #cores
utils::CGroupConfigurator cg;
fNumCores = cg.getNumCores();
if (fNumCores <= 0)
fNumCores = 8;
}
else
fNumCores = configNumCores;
// based on the #cores, calculate some thread parms
if (fNumCores > 0)
{
fHjNumThreads = fNumCores;
fJlNumScanReceiveThreads = fNumCores;
}
// possibly override any calculated values
temp = getIntVal(fHashJoinStr, "NumThreads", -1);
if (temp > 0)
fHjNumThreads = temp;
temp = getIntVal(fJobListStr, "ProcessorThreadsPerScan", -1);
if (temp > 0)
fJlProcessorThreadsPerScan = temp;
temp = getIntVal(fJobListStr, "MaxOutstandingRequests", -1);
if (temp > 0)
fJlMaxOutstandingRequests = temp;
else
{
oam::Oam oam;
oam::ModuleTypeConfig moduletypeconfig;
oam.getSystemConfig("pm", moduletypeconfig);
const uint temp = moduletypeconfig.ModuleCount * fNumCores * 4 / fJlProcessorThreadsPerScan;
const uint minMaxOutstandingRequests =
std::max(static_cast<uint>(moduletypeconfig.ModuleCount * 2), defaultMaxOutstandingRequests);
fJlMaxOutstandingRequests = temp > minMaxOutstandingRequests ? temp : minMaxOutstandingRequests;
}
temp = getIntVal(fJobListStr, "NumScanReceiveThreads", -1);
if (temp > 0)
fJlNumScanReceiveThreads = temp;
fDECConnectionsPerQuery = getUintVal(fJobListStr, "DECConnectionsPerQuery", 0);
fDECConnectionsPerQuery =
(fDECConnectionsPerQuery) ? fDECConnectionsPerQuery : getPsConnectionsPerPrimProc();
pmJoinMemLimit = getUintVal(fHashJoinStr, "PmMaxMemorySmallSide", defaultHJPmMaxMemorySmallSide);
// Need to use different limits if this instance isn't running on the UM,
// or if it's an ExeMgr running on a PM node
if (!isExeMgr)
totalUmMemLimit = pmJoinMemLimit;
else
{
// Installation.PMwithUM = y by default so RM prefers HashJoin.TotalPmUmMemory
// if it exists.
string whichLimit = "TotalUmMemory";
string pmWithUM = fConfig->getConfig("Installation", "PMwithUM");
if (pmWithUM == "y" || pmWithUM == "Y")
{
oam::Oam OAM;
oam::oamModuleInfo_t moduleInfo = OAM.getModuleInfo();
string& moduleType = boost::get<1>(moduleInfo);
if (moduleType == "pm" || moduleType == "PM")
{
string doesItExist = fConfig->getConfig(fHashJoinStr, "TotalPmUmMemory");
if (!doesItExist.empty())
whichLimit = "TotalPmUmMemory";
}
}
string umtxt = fConfig->getConfig(fHashJoinStr, whichLimit);
if (umtxt.empty())
totalUmMemLimit = defaultTotalUmMemory;
else
{
// is it an absolute or a percentage?
if (umtxt.find('%') != string::npos)
{
utils::CGroupConfigurator cg;
uint64_t totalMem = cg.getTotalMemory();
totalUmMemLimit = atoll(umtxt.c_str()) / 100.0 * (double)totalMem;
if (totalUmMemLimit == 0 || totalUmMemLimit == LLONG_MIN ||
totalUmMemLimit == LLONG_MAX) // some garbage in the xml entry
totalUmMemLimit = defaultTotalUmMemory;
}
else // an absolute; use the existing converter
{
totalUmMemLimit = getIntVal(fHashJoinStr, whichLimit, defaultTotalUmMemory);
}
}
}
configuredUmMemLimit = totalUmMemLimit;
// cout << "RM: total UM memory = " << totalUmMemLimit << endl;
// multi-thread aggregate
string nt, nb, nr;
nt = fConfig->getConfig("RowAggregation", "RowAggrThreads");
if (nt.empty())
{
if (numCores() > 0)
fAggNumThreads = numCores();
else
fAggNumThreads = 1;
}
else
fAggNumThreads = fConfig->uFromText(nt);
nb = fConfig->getConfig("RowAggregation", "RowAggrBuckets");
if (nb.empty())
fAggNumBuckets = fAggNumThreads * 4;
else
fAggNumBuckets = fConfig->uFromText(nb);
nr = fConfig->getConfig("RowAggregation", "RowAggrRowGroupsPerThread");
if (nr.empty())
fAggNumRowGroups = 20;
else
fAggNumRowGroups = fConfig->uFromText(nr);
// window function
string wt = fConfig->getConfig("WindowFunction", "WorkThreads");
if (wt.empty())
fWindowFunctionThreads = numCores();
else
fWindowFunctionThreads = fConfig->uFromText(wt);
// hdfs info
string hdfs = fConfig->getConfig("SystemConfig", "DataFilePlugin");
if (hdfs.find("hdfs") != string::npos)
fUseHdfs = true;
else
fUseHdfs = false;
fAllowedDiskAggregation =
getBoolVal(fRowAggregationStr, "AllowDiskBasedAggregation", defaultAllowDiskAggregation);
if (!load_encryption_keys())
{
Logger log;
log.logMessage(logging::LOG_TYPE_ERROR, "Error loading CEJ password encryption keys");
}
}
int ResourceManager::getEmPriority() const
{
int temp = getIntVal(fExeMgrStr, "Priority", defaultEMPriority);
// config file priority is 40..1 (highest..lowest)
// convert to -20..19 (highest..lowest, defaults to -1)
int val;
// @Bug3385 - the ExeMgr priority was being set backwards with 1 being the highest instead of the lowest.
if (temp < 1)
val = 19;
else if (temp > 40)
val = -20;
else
val = 20 - temp;
return val;
}
void ResourceManager::addHJPmMaxSmallSideMap(uint32_t sessionID, uint64_t mem)
{
if (fHJPmMaxMemorySmallSideSessionMap.addSession(sessionID, mem,
fHJUmMaxMemorySmallSideDistributor.getTotalResource()))
logResourceChangeMessage(logging::LOG_TYPE_INFO, sessionID, mem, defaultHJPmMaxMemorySmallSide,
"PmMaxMemorySmallSide", LogRMResourceChange);
else
{
logResourceChangeMessage(logging::LOG_TYPE_WARNING, sessionID, mem,
fHJUmMaxMemorySmallSideDistributor.getTotalResource(), "PmMaxMemorySmallSide",
LogRMResourceChangeError);
logResourceChangeMessage(logging::LOG_TYPE_INFO, sessionID, mem,
fHJUmMaxMemorySmallSideDistributor.getTotalResource(), "PmMaxMemorySmallSide",
LogRMResourceChangeError);
}
}
void ResourceManager::addHJUmMaxSmallSideMap(uint32_t sessionID, uint64_t mem)
{
if (fHJUmMaxMemorySmallSideDistributor.addSession(sessionID, mem))
logResourceChangeMessage(logging::LOG_TYPE_INFO, sessionID, mem, defaultHJUmMaxMemorySmallSide,
"UmMaxMemorySmallSide", LogRMResourceChange);
else
{
logResourceChangeMessage(logging::LOG_TYPE_WARNING, sessionID, mem,
fHJUmMaxMemorySmallSideDistributor.getTotalResource(), "UmMaxMemorySmallSide",
LogRMResourceChangeError);
logResourceChangeMessage(logging::LOG_TYPE_INFO, sessionID, mem,
fHJUmMaxMemorySmallSideDistributor.getTotalResource(), "UmMaxMemorySmallSide",
LogRMResourceChangeError);
}
}
void ResourceManager::logResourceChangeMessage(logging::LOG_TYPE logType, uint32_t sessionID,
uint64_t newvalue, uint64_t value, const string& source,
logging::Message::MessageID mid)
{
logging::Message::Args args;
args.add(source);
args.add(newvalue);
args.add(value);
Logger log;
log.logMessage(logType, mid, args, logging::LoggingID(5, sessionID));
}
bool ResourceManager::getMysqldInfo(std::string& h, std::string& u, std::string& w, unsigned int& p) const
{
static const std::string hostUserUnassignedValue("unassigned");
// MCS will read username and pass from disk if the config changed.
bool reReadConfig = true;
u = getStringVal("CrossEngineSupport", "User", hostUserUnassignedValue, reReadConfig);
std::string encryptedPW = getStringVal("CrossEngineSupport", "Password", "", reReadConfig);
// This will return back the plaintext password if there is no MCSDATADIR/.secrets file present
w = decrypt_password(encryptedPW);
// MCS will not read username and pass from disk if the config changed.
h = getStringVal("CrossEngineSupport", "Host", hostUserUnassignedValue);
p = getUintVal("CrossEngineSupport", "Port", 0);
return h != hostUserUnassignedValue && u != hostUserUnassignedValue && p;
}
bool ResourceManager::queryStatsEnabled() const
{
std::string val(getStringVal("QueryStats", "Enabled", "N"));
boost::to_upper(val);
return "Y" == val;
}
bool ResourceManager::userPriorityEnabled() const
{
std::string val(getStringVal("UserPriority", "Enabled", "N"));
boost::to_upper(val);
return "Y" == val;
}
// Counts memory. This funtion doesn't actually malloc, just counts against two limits
// totalUmMemLimit for overall UM counting and (optional) sessionLimit for a single session.
// If both have space, return true.
bool ResourceManager::getMemory(int64_t amount, boost::shared_ptr<int64_t>& sessionLimit, bool patience)
{
bool ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0);
bool ret2 = sessionLimit ? (atomicops::atomicSub(sessionLimit.get(), amount) >= 0) : ret1;
uint32_t retryCounter = 0, maxRetries = 20; // 10s delay
while (patience && !(ret1 && ret2) && retryCounter++ < maxRetries)
{
atomicops::atomicAdd(&totalUmMemLimit, amount);
sessionLimit ? atomicops::atomicAdd(sessionLimit.get(), amount) : 0;
usleep(500000);
ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0);
ret2 = sessionLimit ? (atomicops::atomicSub(sessionLimit.get(), amount) >= 0) : ret1;
}
if (!(ret1 && ret2))
{
// If we didn't get any memory, restore the counters.
atomicops::atomicAdd(&totalUmMemLimit, amount);
sessionLimit ? atomicops::atomicAdd(sessionLimit.get(), amount) : 0;
}
return (ret1 && ret2);
}
// Don't care about session memory
bool ResourceManager::getMemory(int64_t amount, bool patience)
{
bool ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0);
uint32_t retryCounter = 0, maxRetries = 20; // 10s delay
while (patience && !ret1 && retryCounter++ < maxRetries)
{
atomicops::atomicAdd(&totalUmMemLimit, amount);
usleep(500000);
ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0);
}
if (!ret1)
{
// If we didn't get any memory, restore the counters.
atomicops::atomicAdd(&totalUmMemLimit, amount);
}
return ret1;
}
} // namespace joblist