You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
MCOL-3557: add config change listeners.
This commit is contained in:
@ -56,24 +56,11 @@ Cache::Cache()
|
||||
Config *conf = Config::get();
|
||||
logger = SMLogging::get();
|
||||
|
||||
string stmp = conf->getValue("Cache", "cache_size");
|
||||
if (stmp.empty())
|
||||
{
|
||||
logger->log(LOG_CRIT, "Cache/cache_size is not set");
|
||||
throw runtime_error("Please set Cache/cache_size in the storagemanager.cnf file");
|
||||
}
|
||||
try
|
||||
{
|
||||
maxCacheSize = stoul(stmp);
|
||||
}
|
||||
catch (invalid_argument &)
|
||||
{
|
||||
logger->log(LOG_CRIT, "Cache/cache_size is not a number");
|
||||
throw runtime_error("Please set Cache/cache_size to a number");
|
||||
}
|
||||
configListener();
|
||||
conf->addConfigListener(this);
|
||||
//cout << "Cache got cache size " << maxCacheSize << endl;
|
||||
|
||||
stmp = conf->getValue("ObjectStorage", "object_size");
|
||||
string stmp = conf->getValue("ObjectStorage", "object_size");
|
||||
if (stmp.empty())
|
||||
{
|
||||
logger->log(LOG_CRIT, "ObjectStorage/object_size is not set");
|
||||
@ -129,6 +116,7 @@ Cache::Cache()
|
||||
|
||||
Cache::~Cache()
|
||||
{
|
||||
Config::get()->removeConfigListener(this);
|
||||
for (auto it = prefixCaches.begin(); it != prefixCaches.end(); ++it)
|
||||
delete it->second;
|
||||
}
|
||||
@ -334,6 +322,39 @@ void Cache::shutdown()
|
||||
downloader.reset();
|
||||
}
|
||||
|
||||
void Cache::configListener()
|
||||
{
|
||||
Config *conf = Config::get();
|
||||
SMLogging* logger = SMLogging::get();
|
||||
|
||||
if (maxCacheSize == 0)
|
||||
maxCacheSize = 2147483648;
|
||||
string stmp = conf->getValue("Cache", "cache_size");
|
||||
if (stmp.empty())
|
||||
{
|
||||
logger->log(LOG_CRIT, "Cache/cache_size is not set. Using current value = %zi",maxCacheSize);
|
||||
}
|
||||
try
|
||||
{
|
||||
size_t newMaxCacheSize = stoull(stmp);
|
||||
if (newMaxCacheSize != maxCacheSize)
|
||||
{
|
||||
if (newMaxCacheSize >= MIN_CACHE_SIZE)
|
||||
{
|
||||
setMaxCacheSize(newMaxCacheSize);
|
||||
logger->log(LOG_INFO, "Cache/cache_size = %zi",maxCacheSize);
|
||||
}
|
||||
else
|
||||
{
|
||||
logger->log(LOG_CRIT, "Cache/cache_size is below %u. Check value and suffix are correct in configuration file. Using current value = %zi",MIN_CACHE_SIZE,maxCacheSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (invalid_argument &)
|
||||
{
|
||||
logger->log(LOG_CRIT, "Cache/cache_size is not a number. Using current value = %zi",maxCacheSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -27,6 +27,7 @@
|
||||
#include "Downloader.h"
|
||||
#include "SMLogging.h"
|
||||
#include "PrefixCache.h"
|
||||
#include "Config.h"
|
||||
|
||||
#include <string>
|
||||
#include <vector>
|
||||
@ -35,10 +36,16 @@
|
||||
#include <boost/filesystem/path.hpp>
|
||||
#include <boost/thread/mutex.hpp>
|
||||
|
||||
// Setting to min possible based on
|
||||
// using 1k in config file. If wrong
|
||||
// letter is used will not value be
|
||||
// set to less than 1k
|
||||
#define MIN_CACHE_SIZE 1024
|
||||
|
||||
namespace storagemanager
|
||||
{
|
||||
|
||||
class Cache : public boost::noncopyable
|
||||
class Cache : public boost::noncopyable , public ConfigListener
|
||||
{
|
||||
public:
|
||||
static Cache *get();
|
||||
@ -94,6 +101,7 @@ class Cache : public boost::noncopyable
|
||||
void reset();
|
||||
void validateCacheSize();
|
||||
|
||||
virtual void configListener() override;
|
||||
private:
|
||||
Cache();
|
||||
|
||||
|
@ -131,7 +131,8 @@ void Config::reloadThreadFcn()
|
||||
try
|
||||
{
|
||||
reload();
|
||||
// TODO: add a listener interface to inform upstream of config changes
|
||||
for (auto& listener : configListeners)
|
||||
listener->configListener();
|
||||
boost::this_thread::sleep(reloadInterval);
|
||||
}
|
||||
catch (boost::property_tree::ini_parser_error &e)
|
||||
@ -207,4 +208,17 @@ string Config::getValue(const string §ion, const string &key) const
|
||||
return ret;
|
||||
}
|
||||
|
||||
void Config::addConfigListener(ConfigListener *listener)
|
||||
{
|
||||
configListeners.push_back(listener);
|
||||
}
|
||||
|
||||
void Config::removeConfigListener(ConfigListener *listener)
|
||||
{
|
||||
auto iterator = std::find(configListeners.begin(), configListeners.end(), listener);
|
||||
|
||||
if (iterator != configListeners.end())
|
||||
configListeners.erase(iterator);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -22,13 +22,20 @@
|
||||
#include <boost/property_tree/ptree.hpp>
|
||||
#include <boost/thread.hpp>
|
||||
#include <sys/types.h>
|
||||
|
||||
#include <functional>
|
||||
#include <vector>
|
||||
#include <string>
|
||||
|
||||
/* TODO. Need a config change listener impl. */
|
||||
|
||||
namespace storagemanager
|
||||
{
|
||||
class ConfigListener
|
||||
{
|
||||
public:
|
||||
virtual void configListener() = 0;
|
||||
};
|
||||
|
||||
class Config : public boost::noncopyable
|
||||
{
|
||||
public:
|
||||
@ -40,12 +47,16 @@ class Config : public boost::noncopyable
|
||||
// for testing, lets caller specify a config file to use
|
||||
static Config *get(const std::string &);
|
||||
|
||||
void addConfigListener(ConfigListener *listener);
|
||||
void removeConfigListener(ConfigListener *listener);
|
||||
|
||||
private:
|
||||
Config();
|
||||
Config(const std::string &);
|
||||
|
||||
void reload();
|
||||
void reloadThreadFcn();
|
||||
std::vector<ConfigListener *> configListeners;
|
||||
struct ::timespec last_mtime;
|
||||
mutable boost::mutex mutex;
|
||||
boost::thread reloader;
|
||||
@ -56,6 +67,9 @@ class Config : public boost::noncopyable
|
||||
bool die;
|
||||
};
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -31,17 +31,8 @@ namespace storagemanager
|
||||
Downloader::Downloader() : maxDownloads(0)
|
||||
{
|
||||
storage = CloudStorage::get();
|
||||
string sMaxDownloads = Config::get()->getValue("ObjectStorage", "max_concurrent_downloads");
|
||||
try
|
||||
{
|
||||
maxDownloads = stoul(sMaxDownloads);
|
||||
}
|
||||
catch(invalid_argument)
|
||||
{
|
||||
logger->log(LOG_WARNING, "Downloader: Invalid arg for ObjectStorage/max_concurrent_downloads, using default of 20");
|
||||
}
|
||||
if (maxDownloads == 0)
|
||||
maxDownloads = 20;
|
||||
configListener();
|
||||
Config::get()->addConfigListener(this);
|
||||
workers.setMaxThreads(maxDownloads);
|
||||
workers.setName("Downloader");
|
||||
logger = SMLogging::get();
|
||||
@ -51,6 +42,7 @@ Downloader::Downloader() : maxDownloads(0)
|
||||
|
||||
Downloader::~Downloader()
|
||||
{
|
||||
Config::get()->removeConfigListener(this);
|
||||
}
|
||||
|
||||
void Downloader::download(const vector<const string *> &keys, vector<int> *errnos, vector<size_t> *sizes,
|
||||
@ -216,4 +208,29 @@ inline bool Downloader::DLEquals::operator()(const boost::shared_ptr<Download> &
|
||||
return (d1->key == d2->key);
|
||||
}
|
||||
|
||||
void Downloader::configListener()
|
||||
{
|
||||
// Downloader threads
|
||||
string stmp = Config::get()->getValue("ObjectStorage", "max_concurrent_downloads");
|
||||
if (maxDownloads == 0)
|
||||
maxDownloads = 20;
|
||||
if (stmp.empty())
|
||||
{
|
||||
logger->log(LOG_CRIT, "max_concurrent_downloads is not set. Using current value = %u",maxDownloads);
|
||||
}
|
||||
try
|
||||
{
|
||||
uint newValue = stoul(stmp);
|
||||
if (newValue != maxDownloads)
|
||||
{
|
||||
maxDownloads = newValue;
|
||||
workers.setMaxThreads(maxDownloads);
|
||||
logger->log(LOG_INFO, "max_concurrent_downloads = %u",maxDownloads);
|
||||
}
|
||||
}
|
||||
catch (invalid_argument &)
|
||||
{
|
||||
logger->log(LOG_CRIT, "max_concurrent_downloads is not a number. Using current value = %u",maxDownloads);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,8 @@
|
||||
#include "ThreadPool.h"
|
||||
#include "CloudStorage.h"
|
||||
#include "SMLogging.h"
|
||||
#include "Config.h"
|
||||
|
||||
#include <unordered_set>
|
||||
#include <vector>
|
||||
#include <string>
|
||||
@ -34,7 +36,7 @@
|
||||
namespace storagemanager
|
||||
{
|
||||
|
||||
class Downloader
|
||||
class Downloader : public ConfigListener
|
||||
{
|
||||
public:
|
||||
Downloader();
|
||||
@ -50,6 +52,8 @@ class Downloader
|
||||
|
||||
void printKPIs() const;
|
||||
|
||||
virtual void configListener() override;
|
||||
|
||||
private:
|
||||
uint maxDownloads;
|
||||
//boost::filesystem::path downloadPath;
|
||||
|
@ -63,17 +63,8 @@ Synchronizer::Synchronizer() : maxUploads(0)
|
||||
flushesTriggeredBySize = flushesTriggeredByTimer = journalsMerged =
|
||||
objectsSyncedWithNoJournal = bytesReadBySync = bytesReadBySyncWithJournal = 0;
|
||||
|
||||
string stmp = config->getValue("ObjectStorage", "max_concurrent_uploads");
|
||||
try
|
||||
{
|
||||
maxUploads = stoul(stmp);
|
||||
}
|
||||
catch(invalid_argument)
|
||||
{
|
||||
logger->log(LOG_WARNING, "Downloader: Invalid arg for ObjectStorage/max_concurrent_uploads, using default of 20");
|
||||
}
|
||||
if (maxUploads == 0)
|
||||
maxUploads = 20;
|
||||
configListener();
|
||||
config->addConfigListener(this);
|
||||
|
||||
journalPath = cache->getJournalPath();
|
||||
cachePath = cache->getCachePath();
|
||||
@ -91,6 +82,7 @@ Synchronizer::~Synchronizer()
|
||||
or save the list it's working on.....
|
||||
For milestone 2, this will do the safe thing and finish working first.
|
||||
Later we can get fancy. */
|
||||
Config::get()->removeConfigListener(this);
|
||||
forceFlush();
|
||||
die = true;
|
||||
syncThread.join();
|
||||
@ -772,4 +764,29 @@ void Synchronizer::Job::operator()()
|
||||
sync->process(it);
|
||||
}
|
||||
|
||||
void Synchronizer::configListener()
|
||||
{
|
||||
// Uploader threads
|
||||
string stmp = Config::get()->getValue("ObjectStorage", "max_concurrent_uploads");
|
||||
if (maxUploads == 0)
|
||||
maxUploads = 20;
|
||||
if (stmp.empty())
|
||||
{
|
||||
logger->log(LOG_CRIT, "max_concurrent_uploads is not set. Using current value = %u",maxUploads);
|
||||
}
|
||||
try
|
||||
{
|
||||
uint newValue = stoul(stmp);
|
||||
if (newValue != maxUploads)
|
||||
{
|
||||
maxUploads = newValue;
|
||||
threadPool->setMaxThreads(maxUploads);
|
||||
logger->log(LOG_INFO, "max_concurrent_uploads = %u",maxUploads);
|
||||
}
|
||||
}
|
||||
catch (invalid_argument &)
|
||||
{
|
||||
logger->log(LOG_CRIT, "max_concurrent_uploads is not a number. Using current value = %u",maxUploads);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -31,6 +31,7 @@
|
||||
#include "Replicator.h"
|
||||
#include "ThreadPool.h"
|
||||
#include "CloudStorage.h"
|
||||
#include "Config.h"
|
||||
|
||||
namespace storagemanager
|
||||
{
|
||||
@ -38,7 +39,7 @@ namespace storagemanager
|
||||
class Cache; // break circular dependency in header files
|
||||
class IOCoordinator;
|
||||
|
||||
class Synchronizer : public boost::noncopyable
|
||||
class Synchronizer : public boost::noncopyable , public ConfigListener
|
||||
{
|
||||
public:
|
||||
static Synchronizer *get();
|
||||
@ -61,6 +62,9 @@ class Synchronizer : public boost::noncopyable
|
||||
boost::filesystem::path getJournalPath();
|
||||
boost::filesystem::path getCachePath();
|
||||
void printKPIs() const;
|
||||
|
||||
virtual void configListener() override;
|
||||
|
||||
private:
|
||||
Synchronizer();
|
||||
|
||||
|
Reference in New Issue
Block a user