You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
166 lines
4.6 KiB
C++
166 lines
4.6 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/******************************************************************************************
|
|
* $Id: $
|
|
*
|
|
******************************************************************************************/
|
|
#include <string>
|
|
#include <stdexcept>
|
|
#include <iostream>
|
|
#include <sstream>
|
|
#include <sys/time.h>
|
|
using namespace std;
|
|
|
|
#include "jl_logger.h"
|
|
#include "resourcedistributor.h"
|
|
|
|
namespace joblist
|
|
{
|
|
const unsigned maxSessionsDefault = 100;
|
|
|
|
uint64_t ResourceDistributor::requestResource(uint32_t sessionID)
|
|
{
|
|
uint64_t resource = getSessionResource(sessionID);
|
|
|
|
return requestResource(sessionID, resource);
|
|
}
|
|
|
|
uint64_t ResourceDistributor::requestResource(uint32_t sessionID, uint64_t resource)
|
|
{
|
|
if (fTraceOn)
|
|
logMessage(logging::LOG_TYPE_DEBUG, LogRDRequest, resource, sessionID);
|
|
|
|
boost::mutex::scoped_lock lk(fResourceLock);
|
|
|
|
while (fTotalResource < resource)
|
|
{
|
|
if (fTraceOn)
|
|
logMessage(logging::LOG_TYPE_DEBUG, LogRDRequestWait, resource, sessionID);
|
|
|
|
fResourceAvailable.wait(lk);
|
|
|
|
if (fTraceOn)
|
|
logMessage(logging::LOG_TYPE_DEBUG, LogRDRequest, resource, sessionID);
|
|
}
|
|
|
|
fTotalResource -= resource;
|
|
|
|
return resource;
|
|
}
|
|
|
|
void ResourceDistributor::returnResource(uint64_t resource)
|
|
{
|
|
if (fTraceOn)
|
|
logMessage(logging::LOG_TYPE_DEBUG, LogRDReturn, resource);
|
|
|
|
boost::mutex::scoped_lock lk(fResourceLock);
|
|
fTotalResource += resource;
|
|
|
|
fResourceAvailable.notify_all();
|
|
}
|
|
|
|
void ResourceDistributor::logMessage(logging::LOG_TYPE logLevel, logging::Message::MessageID mid,
|
|
uint64_t value, uint32_t sessionID)
|
|
{
|
|
logging::Message::Args args;
|
|
args.add(fJob);
|
|
args.add(fIdentity);
|
|
args.add(fTotalResource);
|
|
|
|
if (value)
|
|
args.add(value);
|
|
|
|
Logger log;
|
|
log.logMessage(logLevel, mid, args, logging::LoggingID(5, sessionID));
|
|
}
|
|
|
|
void LockedSessionMap::updateAging(uint32_t sessionID)
|
|
{
|
|
boost::mutex::scoped_lock lock(fSessionLock);
|
|
SessionList::iterator pos = find(fSessionAgingList.begin(), fSessionAgingList.end(), sessionID);
|
|
|
|
if (fSessionAgingList.end() != pos)
|
|
fSessionAgingList.splice(fSessionAgingList.end(), fSessionAgingList,
|
|
find(fSessionAgingList.begin(), fSessionAgingList.end(), sessionID));
|
|
else
|
|
fSessionAgingList.push_back(sessionID);
|
|
}
|
|
|
|
uint64_t LockedSessionMap::getSessionResource(uint32_t sessionID)
|
|
{
|
|
SessionMap::const_iterator it = fSessionMap.find(sessionID);
|
|
|
|
if (fSessionMap.end() != it)
|
|
{
|
|
updateAging(sessionID);
|
|
return it->second;
|
|
}
|
|
|
|
return fResourceBlock;
|
|
}
|
|
|
|
bool LockedSessionMap::addSession(uint32_t sessionID, uint64_t resource, uint64_t limit)
|
|
{
|
|
bool ret = true;
|
|
|
|
if (resource > limit)
|
|
{
|
|
resource = limit;
|
|
ret = false;
|
|
}
|
|
|
|
boost::mutex::scoped_lock maplock(fMapLock);
|
|
fSessionMap[sessionID] = resource;
|
|
updateAging(sessionID);
|
|
|
|
if (fMaxSessions < fSessionMap.size())
|
|
{
|
|
boost::mutex::scoped_lock lock(fSessionLock);
|
|
uint32_t oldsession = fSessionAgingList.front();
|
|
fSessionMap.erase(oldsession);
|
|
fSessionAgingList.erase(fSessionAgingList.begin());
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
void LockedSessionMap::removeSession(uint32_t sessionID)
|
|
{
|
|
boost::mutex::scoped_lock maplock(fMapLock);
|
|
fSessionMap.erase(sessionID);
|
|
boost::mutex::scoped_lock listlock(fSessionLock);
|
|
fSessionAgingList.erase(find(fSessionAgingList.begin(), fSessionAgingList.end(), sessionID));
|
|
}
|
|
|
|
ostream& operator<<(ostream& os, const LockedSessionMap& lsm)
|
|
{
|
|
os << "Default Resource Block: " << lsm.fResourceBlock
|
|
<< "\tMax Number of saved sessions: " << lsm.fMaxSessions << endl;
|
|
os << "Session Map:\tsessionID\tvalue\n";
|
|
LockedSessionMap::SessionMap::const_iterator smIt = lsm.fSessionMap.begin(), smEnd = lsm.fSessionMap.end();
|
|
|
|
for (; smIt != smEnd; ++smIt)
|
|
os << "\t\t" << smIt->first << "\t\t" << smIt->second << endl;
|
|
|
|
os << "\nAging List:\tsessionID\n\t\t";
|
|
copy(lsm.fSessionAgingList.begin(), lsm.fSessionAgingList.end(), ostream_iterator<uint32_t>(os, "\n\t\t"));
|
|
os << endl;
|
|
return os;
|
|
}
|
|
} // namespace joblist
|