1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-13 23:02:14 +03:00

Checkpointing work on Cache::read() functionality.

It builds but 0% chance it works here.
This commit is contained in:
Patrick LeBlanc
2019-03-06 12:53:04 -06:00
parent 2619aa8983
commit 3c84e8d749
11 changed files with 488 additions and 16 deletions

View File

@@ -25,6 +25,7 @@ set(storagemanager_SRCS
src/S3Storage.cpp src/S3Storage.cpp
src/LocalStorage.cpp src/LocalStorage.cpp
src/Cache.cpp src/Cache.cpp
src/Downloader.cpp
) )
option(TRACE "Enable some tracing output" OFF) option(TRACE "Enable some tracing output" OFF)

View File

@@ -1,9 +1,11 @@
#include "Cache.h" #include "Cache.h"
#include "Config.h" #include "Config.h"
#include "Downloader.h"
#include <iostream> #include <iostream>
#include <syslog.h> #include <syslog.h>
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include <boost/thread.hpp>
using namespace std; using namespace std;
using namespace boost::filesystem; using namespace boost::filesystem;
@@ -49,6 +51,7 @@ Cache::Cache()
} }
cout << "Cache got prefix " << prefix << endl; cout << "Cache got prefix " << prefix << endl;
downloader.setDownloadPath(prefix.string());
} }
Cache::~Cache() Cache::~Cache()
@@ -57,8 +60,81 @@ Cache::~Cache()
void Cache::read(const vector<string> &keys) void Cache::read(const vector<string> &keys)
{ {
/*
move existing keys to a do-not-evict map
fetch keys that do not exist
after fetching, move all keys from do-not-evict to the back of the LRU
*/
boost::unique_lock<boost::mutex> s(lru_mutex);
vector<const string *> keysToFetch;
uint i;
M_LRU_t::iterator mit;
for (const string &key : keys)
{
mit = m_lru.find(key);
if (mit != m_lru.end())
// it's in the cache, add the entry to the do-not-evict set
addToDNE(mit->lit);
else
// not in the cache, put it in the list to download
keysToFetch.push_back(&key);
}
s.unlock();
// start downloading the keys to fetch
if (!keysToFetch.empty())
downloader.download(keysToFetch);
s.lock();
// move all keys to the back of the LRU
for (const string &key : keys)
{
mit = m_lru.find(key);
if (mit != m_lru.end())
lru.splice(lru.end(), lru, mit->lit);
else
{
lru.push_back(key);
m_lru.insert(M_LRU_element_t(&(lru.back()), lru.end()--));
}
removeFromDNE(lru.end());
}
} }
// Builds a do-not-evict entry for the LRU position 'k'.  A new entry starts
// with a reference count of 1.
Cache::DNEElement::DNEElement(const LRU_t::iterator &k) : key(k), refCount(1)
{
}
// Takes one do-not-evict reference on the LRU entry 'key'.  The first
// reference creates the entry (refCount == 1); later ones bump its count.
void Cache::addToDNE(const LRU_t::iterator &key)
{
    // insert() leaves the set untouched when an equal entry already exists and
    // hands back an iterator to that existing entry.
    pair<DNE_t::iterator, bool> res = doNotEvict.insert(DNEElement(key));
    if (res.second)
        return;

    // refCount takes no part in hashing or equality, so changing it in place
    // does not disturb the set.
    DNEElement &existing = const_cast<DNEElement &>(*res.first);
    ++existing.refCount;
}
// Drops one do-not-evict reference on the LRU entry 'key'.  The entry is
// removed from the set when its count reaches zero; unknown keys are ignored.
void Cache::removeFromDNE(const LRU_t::iterator &key)
{
    const DNE_t::iterator found = doNotEvict.find(DNEElement(key));
    if (found != doNotEvict.end())
    {
        // refCount is not part of the hash/equality, so it can be modified in place
        uint &refs = const_cast<DNEElement &>(*found).refCount;
        --refs;
        if (refs == 0)
            doNotEvict.erase(found);
    }
}
void Cache::exists(const vector<string> &keys, vector<bool> *out) void Cache::exists(const vector<string> &keys, vector<bool> *out)
{ {
} }
@@ -79,4 +155,40 @@ void Cache::makeSpace(size_t size)
{ {
} }
/* The helper classes */
// Lookup-only element: stores the key pointer, 'lit' is left default-constructed.
Cache::M_LRU_element_t::M_LRU_element_t(const string *k) : key(k)
{}
// Lookup-only element built from a reference; stores the address of 'k', so
// 'k' has to outlive the element.  'lit' is left default-constructed.
Cache::M_LRU_element_t::M_LRU_element_t(const string &k) : key(&k)
{}
// Full element: key pointer plus the position of that key in the LRU list.
Cache::M_LRU_element_t::M_LRU_element_t(const string *k, const LRU_t::iterator &i) : key(k), lit(i)
{}
// Hashes an m_lru element by the string its key pointer refers to (not by the
// pointer value), so l.key must point at a live string.
inline size_t Cache::KeyHasher::operator()(const M_LRU_element_t &l) const
{
return hash<string>()(*(l.key));
} }
// Two m_lru elements are equal when the strings their key pointers refer to
// compare equal; the stored list iterators are not compared.
inline bool Cache::KeyEquals::operator()(const M_LRU_element_t &l1, const M_LRU_element_t &l2) const
{
    const string &lhs = *l1.key;
    const string &rhs = *l2.key;
    return lhs == rhs;
}
// Hashes a do-not-evict element by the string its LRU iterator points at.
// The iterator is dereferenced, so it must refer to a real list element.
inline size_t Cache::DNEHasher::operator()(const DNEElement &l) const
{
    const string &name = *l.key;
    hash<string> hasher;
    return hasher(name);
}
// Two do-not-evict elements are equal when the strings their LRU iterators
// point at compare equal; refCount is not compared.
inline bool Cache::DNEEquals::operator()(const DNEElement &l1, const DNEElement &l2) const
{
    const string &lhs = *l1.key;
    const string &rhs = *l2.key;
    return lhs == rhs;
}
}

View File

@@ -2,10 +2,15 @@
#ifndef CACHE_H_ #ifndef CACHE_H_
#define CACHE_H_ #define CACHE_H_
#include "Downloader.h"
#include <string> #include <string>
#include <vector> #include <vector>
#include <list>
#include <unordered_set>
#include <boost/utility.hpp> #include <boost/utility.hpp>
#include <boost/filesystem/path.hpp> #include <boost/filesystem/path.hpp>
#include <boost/thread/mutex.hpp>
namespace storagemanager namespace storagemanager
{ {
@@ -26,9 +31,62 @@ class Cache : public boost::noncopyable
private: private:
boost::filesystem::path prefix; boost::filesystem::path prefix;
size_t maxCacheSize; size_t maxCacheSize;
/* The main cache structures */
typedef std::list<std::string> LRU_t;
LRU_t lru;
// Element type of the m_lru hash table: pairs a key with its position in the
// 'lru' list.  The key is held by pointer; hashing and equality (KeyHasher /
// KeyEquals) dereference it, so the pointed-to string must stay alive.
struct M_LRU_element_t
{
// The single-argument constructors are not explicit; Cache.cpp relies on the
// implicit conversion to call m_lru.find() with a plain std::string.
M_LRU_element_t(const std::string &);
M_LRU_element_t(const std::string *);
M_LRU_element_t(const std::string *, const LRU_t::iterator &);
const std::string *key;
// position of the key in 'lru'; only set by the two-argument constructor
LRU_t::iterator lit;
};
// Hash functor for M_LRU_t; hashes the string an element's key points at.
struct KeyHasher
{
size_t operator()(const M_LRU_element_t &l) const;
};
// Equality functor for M_LRU_t; compares the strings the two keys point at.
struct KeyEquals
{
bool operator()(const M_LRU_element_t &l1, const M_LRU_element_t &l2) const;
};
typedef std::unordered_set<M_LRU_element_t, KeyHasher, KeyEquals> M_LRU_t;
M_LRU_t m_lru; // the LRU entries as a hash table
/* The do-not-evict list stuff */
// Entry of the do-not-evict set: an LRU position plus the number of
// outstanding references taken through addToDNE().
struct DNEElement
{
// starts the entry with refCount == 1
DNEElement(const LRU_t::iterator &);
// identifies the entry; DNEHasher / DNEEquals dereference this iterator
LRU_t::iterator key;
// incremented by addToDNE(), decremented by removeFromDNE()
uint refCount;
};
// Hash functor for DNE_t; hashes the string the element's LRU iterator points at.
struct DNEHasher
{
size_t operator()(const DNEElement &d) const;
};
// Equality functor for DNE_t; compares the strings the two LRU iterators point at.
struct DNEEquals
{
bool operator()(const DNEElement &d1, const DNEElement &d2) const;
};
typedef std::unordered_set<DNEElement, DNEHasher, DNEEquals> DNE_t;
DNE_t doNotEvict;
void addToDNE(const LRU_t::iterator &key);
void removeFromDNE(const LRU_t::iterator &key);
boost::mutex lru_mutex; // protects the main cache structures & the do-not-evict set
Downloader downloader;
}; };
} }
#endif #endif

View File

@@ -10,6 +10,7 @@ namespace storagemanager
class CloudStorage class CloudStorage
{ {
public: public:
/* These behave like syscalls. return code -1 means an error, and errno is set */
virtual int getObject(const std::string &sourceKey, const std::string &destFile) = 0; virtual int getObject(const std::string &sourceKey, const std::string &destFile) = 0;
virtual int putObject(const std::string &sourceFile, const std::string &destKey) = 0; virtual int putObject(const std::string &sourceFile, const std::string &destKey) = 0;
virtual void deleteObject(const std::string &key) = 0; virtual void deleteObject(const std::string &key) = 0;

View File

@@ -7,6 +7,9 @@
#include <stdlib.h> #include <stdlib.h>
#include <vector> #include <vector>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
using namespace std; using namespace std;
@@ -30,7 +33,7 @@ Config * Config::get()
return inst; return inst;
} }
Config::Config() Config::Config() : die(false)
{ {
/* This will search the current directory, /* This will search the current directory,
then $COLUMNSTORE_INSTALL_DIR/etc, then $COLUMNSTORE_INSTALL_DIR/etc,
@@ -59,7 +62,51 @@ Config::Config()
} }
if (filename.empty()) if (filename.empty())
throw runtime_error("Config: Could not find the config file for StorageManager"); throw runtime_error("Config: Could not find the config file for StorageManager");
reloadInterval = boost::posix_time::seconds(60);
last_mtime = {0, 0};
reload();
reloader = boost::thread([this] { this->reloadThreadFcn(); });
}
// Stops the background reload thread and waits for it to exit.
Config::~Config()
{
// tell the reload loop to stop at its next check
die = true;
// wake the thread if it is sleeping between reloads
reloader.interrupt();
reloader.join();
}
void Config::reloadThreadFcn()
{
while (!die)
{
try
{
reload();
boost::this_thread::sleep(reloadInterval);
}
catch (boost::property_tree::ini_parser_error &e)
{
// log the error. how to log things now?
}
catch (boost::thread_interrupted)
{}
}
}
void Config::reload()
{
struct stat statbuf;
int err = stat(filename.c_str(), &statbuf);
if (err)
// log something
return;
if ((statbuf.st_mtim.tv_sec == last_mtime.tv_sec) && (statbuf.st_mtim.tv_nsec == last_mtime.tv_nsec))
return;
last_mtime = statbuf.st_mtim;
boost::unique_lock<boost::mutex> s(mutex);
contents.clear();
boost::property_tree::ini_parser::read_ini(filename, contents); boost::property_tree::ini_parser::read_ini(filename, contents);
} }
@@ -86,7 +133,10 @@ string expand_numbers(const boost::smatch &match)
string Config::getValue(const string &section, const string &key) const string Config::getValue(const string &section, const string &key) const
{ {
// if we care, move this envvar substition stuff to where the file is loaded // if we care, move this envvar substition stuff to where the file is loaded
boost::unique_lock<boost::mutex> s(mutex);
string ret = contents.get<string>(section + "." + key); string ret = contents.get<string>(section + "." + key);
s.unlock();
boost::regex re("\\$\\{(.+)\\}"); boost::regex re("\\$\\{(.+)\\}");
ret = boost::regex_replace(ret, re, use_envvar); ret = boost::regex_replace(ret, re, use_envvar);

View File

@@ -3,6 +3,10 @@
#define CONFIG_H_ #define CONFIG_H_
#include <boost/property_tree/ptree.hpp> #include <boost/property_tree/ptree.hpp>
//#include <boost/thread/mutex.hpp>
#include <boost/thread.hpp>
#include <sys/types.h>
#include <string> #include <string>
@@ -12,14 +16,23 @@ class Config : public boost::noncopyable
{ {
public: public:
static Config *get(); static Config *get();
virtual ~Config();
std::string getValue(const std::string &section, const std::string &key) const; std::string getValue(const std::string &section, const std::string &key) const;
private: private:
Config(); Config();
void reload();
void reloadThreadFcn();
struct ::timespec last_mtime;
mutable boost::mutex mutex;
boost::thread reloader;
boost::posix_time::time_duration reloadInterval;
std::string filename; std::string filename;
boost::property_tree::ptree contents; boost::property_tree::ptree contents;
bool die;
}; };
} }

119
src/Downloader.cpp Normal file
View File

@@ -0,0 +1,119 @@
#include "Downloader.h"
#include "Config.h"
#include <string>
#include <errno.h>
using namespace std;
namespace storagemanager
{
// Reads ObjectStorage/max_concurrent_downloads from the config and creates the
// thread pool that runs the download jobs.
//
// A configured value of 0 or less falls back to 20.  stoi() throws if the
// value is not a number; that exception is not handled here.
Downloader::Downloader()
{
    storage = CloudStorage::get();
    string sMaxDownloads = Config::get()->getValue("ObjectStorage", "max_concurrent_downloads");

    // Keep the parsed value signed until it has been range-checked; assigning a
    // negative int straight to the unsigned member would wrap to a huge limit.
    int configured = stoi(sMaxDownloads);
    maxDownloads = (configured > 0 ? configured : 20);

    // download() dereferences 'downloaders', so the pool has to exist before
    // the first call.
    downloaders.reset(new ThreadPool(maxDownloads));
}
// Nothing to release explicitly; the members clean up after themselves.
Downloader::~Downloader()
{
}
void Downloader::download(const vector<const string *> &keys)
{
uint counter = keys.size();
boost::condition condvar;
boost::mutex m;
DownloadListener listener(&counter, &condvar, &m);
boost::shared_ptr<Download> dls[keys.size()];
bool inserted[keys.size()];
Downloads_t::iterator iterators[keys.size()];
uint i;
for (const string *key : keys)
dls[i].reset(new Download(key, this));
boost::unique_lock<boost::mutex> s(download_mutex);
for (i = 0; i < keys.size(); i++)
{
auto &dl = dls[i];
pair<Downloads_t::iterator, bool> ret = downloads.insert(dl);
iterators[i] = ret.first;
inserted[i] = ret.second;
if (inserted[i])
{
dl->listeners.push_back(&listener);
downloaders->addJob(dl);
}
else
(*iterators[i])->listeners.push_back(&listener);
}
s.unlock();
// wait for the downloads to finish
boost::unique_lock<boost::mutex> dl_lock(m);
while (counter > 0)
condvar.wait(dl_lock);
dl_lock.unlock();
// remove the entries inserted by this call
s.lock();
for (i = 0; i < keys.size(); i++)
if (inserted[i])
downloads.erase(iterators[i]);
// TODO: check for errors & propagate
}
// Sets the path that download jobs prepend to the key to form the destination
// file name (plain string concatenation, see Download::operator()).
void Downloader::setDownloadPath(const string &path)
{
downloadPath = path;
}
// Returns the path set through setDownloadPath().
//
// Defined without 'inline': this is a public member declared in the header,
// and an inline definition that lives only in this .cpp file would not be
// available to callers in other translation units.
const string & Downloader::getDownloadPath() const
{
    return downloadPath;
}
/* The helper fcns */
// Creates a job that downloads '*source' on behalf of 'dl'.  Only the pointer
// is stored, so the string has to outlive the job.
// Initializers are listed in member declaration order (dler, key, dl_errno),
// which is the order they run in regardless of how they are written.
Downloader::Download::Download(const string *source, Downloader *dl) : dler(dl), key(source), dl_errno(0)
{
}
// Runs the download: fetches the object named by 'key' into
// <download path> + <key>, records errno on failure, then tells every
// registered listener that this job is done (successful or not).
void Downloader::Download::operator()()
{
    CloudStorage *cloud = CloudStorage::get();
    const string destination = dler->getDownloadPath() + *key;

    // getObject behaves like a syscall: nonzero return means errno is set
    if (cloud->getObject(*key, destination) != 0)
        dl_errno = errno;

    for (vector<DownloadListener *>::iterator it = listeners.begin(); it != listeners.end(); ++it)
        (*it)->downloadFinished();
}
// Stores pointers to the caller's outstanding-download counter, the condition
// variable to signal, and the mutex guarding the counter.  Nothing is copied,
// so all three must outlive the listener's last downloadFinished() call.
Downloader::DownloadListener::DownloadListener(uint *counter, boost::condition *condvar, boost::mutex *m) : count(counter), cond(condvar), mutex(m)
{
}
void Downloader::DownloadListener::downloadFinished()
{
boost::unique_lock<boost::mutex> u(*mutex);
if (--(*count) == 0)
cond->notify_all();
}
// Hashes a download job by the key string it points at, so two jobs for the
// same key land in the same bucket.
inline size_t Downloader::DLHasher::operator()(const boost::shared_ptr<Download> &d) const
{
    const string &name = *d->key;
    hash<string> hasher;
    return hasher(name);
}
// Two download jobs are equal when their key strings compare equal; the
// shared_ptr values themselves are not compared.
inline bool Downloader::DLEquals::operator()(const boost::shared_ptr<Download> &d1, const boost::shared_ptr<Download> &d2) const
{
    const string &lhs = *d1->key;
    const string &rhs = *d2->key;
    return lhs == rhs;
}
}

73
src/Downloader.h Normal file
View File

@@ -0,0 +1,73 @@
#ifndef DOWNLOADER_H_
#define DOWNLOADER_H_
#include "ThreadPool.h"
#include "CloudStorage.h"
#include <unordered_set>
#include <vector>
#include <string>
#include <boost/shared_ptr.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
namespace storagemanager
{
// Fetches objects from cloud storage into a local download path, running the
// individual downloads as jobs on a ThreadPool.
class Downloader
{
public:
Downloader();
virtual ~Downloader();
// Downloads all keys and blocks until every one has finished.  The pointed-to
// strings must stay valid for the duration of the call.
void download(const std::vector<const std::string *> &keys);
// The download path is prepended to a key to form the destination file name.
void setDownloadPath(const std::string &path);
const std::string & getDownloadPath() const;
private:
// limit read from the ObjectStorage/max_concurrent_downloads config value
uint maxDownloads;
std::string downloadPath;
// Completion callback shared by all jobs one download() call waits on.
// Holds pointers to the caller's counter, condition variable and mutex.
class DownloadListener
{
public:
DownloadListener(uint *counter, boost::condition *condvar, boost::mutex *m);
// decrements the counter under the mutex; notifies when it reaches 0
void downloadFinished();
private:
uint *count;
boost::condition *cond;
boost::mutex *mutex;
};
// One download job for a single key.
struct Download : public ThreadPool::Job
{
Download(const std::string *source, Downloader *);
// performs the download, then notifies every listener
void operator()();
Downloader *dler;
// not owned; points at the caller's key string
const std::string *key;
int dl_errno; // to propagate errors from the download job to the caller
// everyone waiting on this job; may include listeners from several download() calls
std::vector<DownloadListener *> listeners;
};
// Hash / equality on the key string a Download points at, so the set below
// holds at most one job per key.
struct DLHasher
{
size_t operator()(const boost::shared_ptr<Download> &d) const;
};
struct DLEquals
{
bool operator()(const boost::shared_ptr<Download> &d1, const boost::shared_ptr<Download> &d2) const;
};
typedef std::unordered_set<boost::shared_ptr<Download>, DLHasher, DLEquals> Downloads_t;
// the jobs currently registered by download() calls
Downloads_t downloads;
// download() holds this while it inserts into and erases from 'downloads'
boost::mutex download_mutex;
// thread pool that runs the Download jobs
boost::scoped_ptr<ThreadPool> downloaders;
CloudStorage *storage;
};
}
#endif

View File

@@ -1,8 +1,5 @@
#include "ThreadPool.h" #include "ThreadPool.h"
#include <boost/thread/mutex.hpp>
using namespace std; using namespace std;
@@ -18,36 +15,75 @@ ThreadPool::ThreadPool() : maxThreads(1000), die(false), threadsWaiting(0)
ThreadPool::ThreadPool(uint num_threads) : maxThreads(num_threads), die(false), threadsWaiting(0) ThreadPool::ThreadPool(uint num_threads) : maxThreads(num_threads), die(false), threadsWaiting(0)
{ {
pruner = boost::thread([this] { this->pruner_fcn(); } );
} }
ThreadPool::~ThreadPool() ThreadPool::~ThreadPool()
{ {
boost::mutex::scoped_lock s(m); boost::unique_lock<boost::mutex> s(m);
die = true; die = true;
jobs.clear(); jobs.clear();
jobAvailable.notify_all(); jobAvailable.notify_all();
s.unlock(); s.unlock();
for (uint i = 0; i < threads.size(); i++) threads.join_all();
threads[i]->join(); pruner.interrupt();
pruner.join();
} }
void ThreadPool::addJob(const boost::shared_ptr<Job> &j) void ThreadPool::addJob(const boost::shared_ptr<Job> &j)
{ {
boost::mutex::scoped_lock s(m); boost::unique_lock<boost::mutex> s(m);
jobs.push_back(j); jobs.push_back(j);
// Start another thread if necessary // Start another thread if necessary
if (threadsWaiting == 0 && threads.size() < maxThreads) { if (threadsWaiting == 0 && threads.size() < maxThreads) {
boost::shared_ptr<boost::thread> thread(new boost::thread(Runner(this))); boost::thread *thread = threads.create_thread([this] { this->processingLoop(); });
threads.push_back(thread); s_threads.insert(thread);
} }
else else
jobAvailable.notify_one(); jobAvailable.notify_one();
} }
// Body of the pruner thread: calls prune(), sleeps for idleThreadTimeout, and
// repeats until 'die' is set.  An interruption only cuts the sleep short; the
// 'die' check at the top of the loop decides whether to stop.
void ThreadPool::pruner_fcn()
{
    for (;;)
    {
        if (die)
            return;

        prune();
        try
        {
            boost::this_thread::sleep(idleThreadTimeout);
        }
        catch (boost::thread_interrupted)
        {
        }
    }
}
void ThreadPool::prune()
{
boost::unique_lock<boost::mutex> s(m);
set<boost::thread *>::iterator it, to_remove;
it = s_threads.begin();
while (it != s_threads.end())
{
if ((*it)->joinable())
{
(*it)->join();
threads.remove_thread(*it);
to_remove = it++;
s_threads.erase(to_remove);
}
else
++it;
}
}
// Changes the upper bound on worker threads.  Only affects whether addJob()
// starts new threads; threads that already exist are not stopped.
void ThreadPool::setMaxThreads(uint newMax)
{
    // addJob() reads maxThreads while holding 'm', so write it under the same lock
    boost::unique_lock<boost::mutex> s(m);
    maxThreads = newMax;
}
void ThreadPool::processingLoop() void ThreadPool::processingLoop()
{ {
boost::mutex::scoped_lock s(m, boost::defer_lock); boost::unique_lock<boost::mutex> s(m, boost::defer_lock);
while (!die) while (!die)
{ {
@@ -55,8 +91,10 @@ void ThreadPool::processingLoop()
while (jobs.empty() && !die) while (jobs.empty() && !die)
{ {
threadsWaiting++; threadsWaiting++;
jobAvailable.wait(s); bool timedout = !jobAvailable.timed_wait<>(s, idleThreadTimeout);
threadsWaiting--; threadsWaiting--;
if (timedout)
return;
} }
if (die) if (die)
return; return;

View File

@@ -2,8 +2,8 @@
#ifndef SM_THREADPOOL_H_ #ifndef SM_THREADPOOL_H_
#define SM_THREADPOOL_H_ #define SM_THREADPOOL_H_
#include <list>
#include <deque> #include <deque>
#include <set>
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <boost/thread/condition.hpp> #include <boost/thread/condition.hpp>
@@ -25,6 +25,7 @@ class ThreadPool : public boost::noncopyable
}; };
void addJob(const boost::shared_ptr<Job> &j); void addJob(const boost::shared_ptr<Job> &j);
void setMaxThreads(uint newMax);
private: private:
struct Runner { struct Runner {
@@ -38,10 +39,16 @@ class ThreadPool : public boost::noncopyable
uint maxThreads; uint maxThreads;
bool die; bool die;
int threadsWaiting; int threadsWaiting;
std::vector<boost::shared_ptr<boost::thread> > threads; boost::thread_group threads;
std::set<boost::thread *> s_threads;
boost::condition jobAvailable; boost::condition jobAvailable;
std::deque<boost::shared_ptr<Job> > jobs; std::deque<boost::shared_ptr<Job> > jobs;
boost::mutex m; boost::mutex m;
const boost::posix_time::time_duration idleThreadTimeout = boost::posix_time::seconds(60);
boost::thread pruner;
void pruner_fcn();
void prune();
}; };

View File

@@ -2,7 +2,7 @@
service = S3 service = S3
object_size = 5M object_size = 5M
metadata_path = ${HOME}/storagemanager/metadata metadata_path = ${HOME}/storagemanager/metadata
max_concurrent_downloads = 20
[S3] [S3]
region = us-west-1 region = us-west-1