1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-12 11:01:17 +03:00

Checkpointing additions to Cache

This commit is contained in:
Patrick LeBlanc
2019-03-07 13:18:38 -06:00
parent 5f6694cecc
commit df6675db01
9 changed files with 169 additions and 20 deletions

View File

@@ -27,6 +27,7 @@ set(storagemanager_SRCS
src/Cache.cpp src/Cache.cpp
src/SMLogging.cpp src/SMLogging.cpp
src/Downloader.cpp src/Downloader.cpp
src/Synchronizer.cpp
) )
option(TRACE "Enable some tracing output" OFF) option(TRACE "Enable some tracing output" OFF)

View File

@@ -6,6 +6,9 @@
#include <syslog.h> #include <syslog.h>
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
using namespace std; using namespace std;
using namespace boost::filesystem; using namespace boost::filesystem;
@@ -13,33 +16,52 @@ using namespace boost::filesystem;
namespace storagemanager namespace storagemanager
{ {
Cache::Cache() Cache::Cache() : currentCacheSize(0)
{ {
Config *conf = Config::get(); Config *conf = Config::get();
logger = SMLogging::get();
string ssize = conf->getValue("Cache", "cache_size"); sync = Synchronizer::get();
if (ssize.empty())
string stmp = conf->getValue("Cache", "cache_size");
if (stmp.empty())
{ {
syslog(LOG_CRIT, "Cache/cache_size is not set"); logger->log(LOG_CRIT, "Cache/cache_size is not set");
throw runtime_error("Please set Cache/cache_size in the storagemanager.cnf file"); throw runtime_error("Please set Cache/cache_size in the storagemanager.cnf file");
} }
try try
{ {
maxCacheSize = stol(ssize); maxCacheSize = stoul(stmp);
} }
catch (invalid_argument &) catch (invalid_argument &)
{ {
syslog(LOG_CRIT, "Cache/cache_size is not a number"); logger->log(LOG_CRIT, "Cache/cache_size is not a number");
throw runtime_error("Please set Cache/cache_size to a number"); throw runtime_error("Please set Cache/cache_size to a number");
} }
//cout << "Cache got cache size " << maxCacheSize << endl; //cout << "Cache got cache size " << maxCacheSize << endl;
stmp = conf->getValue("ObjectStorage", "object_size");
if (stmp.empty())
{
logger->log(LOG_CRIT, "ObjectStorage/object_size is not set");
throw runtime_error("Please set ObjectStorage/object_size in the storagemanager.cnf file");
}
try
{
objectSize = stoul(stmp);
}
catch (invalid_argument &)
{
logger->log(LOG_CRIT, "ObjectStorage/object_size is not a number");
throw runtime_error("Please set ObjectStorage/object_size to a number");
}
prefix = conf->getValue("Cache", "path"); prefix = conf->getValue("Cache", "path");
if (prefix.empty()) if (prefix.empty())
{ {
syslog(LOG_CRIT, "Cache/path is not set"); logger->log(LOG_CRIT, "Cache/path is not set");
throw runtime_error("Please set Cache/path in the storagemanager.cnf file"); throw runtime_error("Please set Cache/path in the storagemanager.cnf file");
} }
try try
{ {
boost::filesystem::create_directories(prefix); boost::filesystem::create_directories(prefix);
@@ -82,6 +104,11 @@ void Cache::read(const vector<string> &keys)
// not in the cache, put it in the list to download // not in the cache, put it in the list to download
keysToFetch.push_back(&key); keysToFetch.push_back(&key);
} }
// TODO: get the sizes of the objects to download and make space
// For now using an estimate
makeSpace(keys.size() * objectSize);
s.unlock(); s.unlock();
// start downloading the keys to fetch // start downloading the keys to fetch
@@ -154,6 +181,10 @@ const boost::filesystem::path & Cache::getCachePath()
void Cache::exists(const vector<string> &keys, vector<bool> *out) void Cache::exists(const vector<string> &keys, vector<bool> *out)
{ {
out->resize(keys.size());
boost::unique_lock<boost::mutex> s(lru_mutex);
for (int i = 0; i < keys.size(); i++)
(*out)[i] = (m_lru.find(keys[i]) == m_lru.end());
} }
void Cache::newObject(const string &key, size_t size) void Cache::newObject(const string &key, size_t size)
@@ -164,12 +195,51 @@ void Cache::deletedObject(const string &key, size_t size)
{ {
} }
void Cache::setCacheSize(size_t size) void Cache::setMaxCacheSize(size_t size)
{ {
} }
// call this holding lru_mutex
void Cache::makeSpace(size_t size) void Cache::makeSpace(size_t size)
{ {
ssize_t thisMuch = currentCacheSize + size - maxCacheSize;
if (thisMuch <= 0)
return;
struct stat statbuf;
LRU_t::iterator it = lru.begin();
while (it != lru.end() && thisMuch > 0)
{
if (doNotEvict.find(it) != doNotEvict.end())
{
++it;
continue; // it's in the do-not-evict list
}
boost::filesystem::path cachedFile = prefix / *it;
int err = stat(cachedFile.string().c_str(), &statbuf);
if (err)
{
logger->log(LOG_WARNING, "Downloader: There seems to be a cached file that couldn't be stat'ed: %s", cachedFile.string().c_str());
++it;
continue;
}
/*
TODO: tell Synchronizer that this key will be evicted
delete the file
remove it from our structs
update current size
*/
assert(currentCacheSize >= statbuf.st_size);
currentCacheSize -= statbuf.st_size;
thisMuch -= statbuf.st_size;
sync->flushObject(*it);
boost::filesystem::remove(cachedFile);
LRU_t::iterator toRemove = it++;
lru.erase(toRemove);
m_lru.erase(*toRemove);
}
} }

View File

@@ -3,6 +3,8 @@
#define CACHE_H_ #define CACHE_H_
#include "Downloader.h" #include "Downloader.h"
#include "SMLogging.h"
#include "Synchronizer.h"
#include <string> #include <string>
#include <vector> #include <vector>
@@ -25,14 +27,21 @@ class Cache : public boost::noncopyable
void exists(const std::vector<std::string> &keys, std::vector<bool> *out); void exists(const std::vector<std::string> &keys, std::vector<bool> *out);
void newObject(const std::string &key, size_t size); void newObject(const std::string &key, size_t size);
void deletedObject(const std::string &key, size_t size); void deletedObject(const std::string &key, size_t size);
void setCacheSize(size_t size); void setMaxCacheSize(size_t size);
void makeSpace(size_t size); size_t getCurrentCacheSize();
// test helpers // test helpers
const boost::filesystem::path &getCachePath(); const boost::filesystem::path &getCachePath();
private: private:
boost::filesystem::path prefix; boost::filesystem::path prefix;
size_t maxCacheSize; size_t maxCacheSize;
size_t objectSize;
size_t currentCacheSize;
Downloader downloader;
Synchronizer *sync;
SMLogging *logger;
void makeSpace(size_t size);
/* The main cache structures */ /* The main cache structures */
// lru owns the string memory for the filenames it manages. m_lru and DNE point to those strings. // lru owns the string memory for the filenames it manages. m_lru and DNE point to those strings.
@@ -83,9 +92,6 @@ class Cache : public boost::noncopyable
void addToDNE(const LRU_t::iterator &key); void addToDNE(const LRU_t::iterator &key);
void removeFromDNE(const LRU_t::iterator &key); void removeFromDNE(const LRU_t::iterator &key);
boost::mutex lru_mutex; // protects the main cache structures & the do-not-evict set boost::mutex lru_mutex; // protects the main cache structures & the do-not-evict set
Downloader downloader;
}; };

View File

@@ -13,7 +13,7 @@ using namespace std;
namespace namespace
{ {
boost::mutex m; boost::mutex m;
storagemanager::CloudStorage *inst; storagemanager::CloudStorage *inst = NULL;
string tolower(const string &s) string tolower(const string &s)
{ {
@@ -29,10 +29,10 @@ namespace storagemanager
{ {
CloudStorage * CloudStorage::get() CloudStorage * CloudStorage::get()
{ {
SMLogging* logger = SMLogging::get();
if (inst) if (inst)
return inst; return inst;
SMLogging* logger = SMLogging::get();
Config *conf = Config::get(); Config *conf = Config::get();
string type = tolower(conf->getValue("ObjectStorage", "service")); string type = tolower(conf->getValue("ObjectStorage", "service"));
boost::mutex::scoped_lock s(m); boost::mutex::scoped_lock s(m);

View File

@@ -3,12 +3,12 @@
#define CONFIG_H_ #define CONFIG_H_
#include <boost/property_tree/ptree.hpp> #include <boost/property_tree/ptree.hpp>
//#include <boost/thread/mutex.hpp>
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <sys/types.h> #include <sys/types.h>
#include <string> #include <string>
/* TODO. Need a config change listener impl. */
namespace storagemanager namespace storagemanager
{ {

View File

@@ -1,5 +1,6 @@
#include "Downloader.h" #include "Downloader.h"
#include "Config.h" #include "Config.h"
#include "SMLogging.h"
#include <string> #include <string>
#include <errno.h> #include <errno.h>
#include <iostream> #include <iostream>
@@ -18,11 +19,12 @@ Downloader::Downloader() : maxDownloads(0)
} }
catch(invalid_argument) catch(invalid_argument)
{ {
// log something logger->log(LOG_WARNING, "Downloader: Invalid arg for ObjectStorage/max_concurrent_downloads, using default of 20");
} }
if (maxDownloads == 0) if (maxDownloads == 0)
maxDownloads = 20; maxDownloads = 20;
workers.reset(new ThreadPool(maxDownloads)); workers.reset(new ThreadPool(maxDownloads));
logger = SMLogging::get();
} }
Downloader::~Downloader() Downloader::~Downloader()
@@ -92,7 +94,11 @@ int Downloader::download(const vector<const string *> &keys, vector<int> *errnos
auto &dl = dls[i]; auto &dl = dls[i];
(*errnos)[i] = dl->dl_errno; (*errnos)[i] = dl->dl_errno;
if (dl->dl_errno != 0) if (dl->dl_errno != 0)
{
char buf[80];
logger->log(LOG_ERR, "Downloader: failed to download %s, got %s", keys[i]->c_str(), strerror_r(dl->dl_errno, buf, 80));
ret = -1; ret = -1;
}
} }
} }

View File

@@ -3,6 +3,7 @@
#include "ThreadPool.h" #include "ThreadPool.h"
#include "CloudStorage.h" #include "CloudStorage.h"
#include "SMLogging.h"
#include <unordered_set> #include <unordered_set>
#include <vector> #include <vector>
#include <string> #include <string>
@@ -68,6 +69,7 @@ class Downloader
boost::mutex &getDownloadMutex(); boost::mutex &getDownloadMutex();
boost::scoped_ptr<ThreadPool> workers; boost::scoped_ptr<ThreadPool> workers;
CloudStorage *storage; CloudStorage *storage;
SMLogging *logger;
}; };
} }

40
src/Synchronizer.cpp Normal file
View File

@@ -0,0 +1,40 @@
#include "Synchronizer.h"
#include <boost/thread/mutex.hpp>
using namespace std;
namespace
{
storagemanager::Synchronizer *instance = NULL;
boost::mutex inst_mutex;
}
namespace storagemanager
{
Synchronizer * Synchronizer::get()
{
if (instance)
return instance;
boost::unique_lock<boost::mutex> lock(inst_mutex);
if (instance)
return instance;
instance = new Synchronizer();
return instance;
}
Synchronizer::Synchronizer()
{
}
Synchronizer::~Synchronizer()
{
}
void Synchronizer::flushObject(const string &key)
{
}
}

24
src/Synchronizer.h Normal file
View File

@@ -0,0 +1,24 @@
#ifndef SYNCHRONIZER_H_
#define SYNCHRONIZER_H_
#include <string>
#include <boost/utility.hpp>
namespace storagemanager
{
class Synchronizer : public boost::noncopyable
{
public:
static Synchronizer *get();
virtual ~Synchronizer();
void flushObject(const std::string &key);
private:
Synchronizer();
};
}
#endif