diff --git a/CMakeLists.txt b/CMakeLists.txt index 568045d5d..667b0bdb0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -25,6 +25,7 @@ set(storagemanager_SRCS src/S3Storage.cpp src/LocalStorage.cpp src/Cache.cpp + src/Downloader.cpp ) option(TRACE "Enable some tracing output" OFF) diff --git a/src/Cache.cpp b/src/Cache.cpp index 914d79576..d3e27e786 100644 --- a/src/Cache.cpp +++ b/src/Cache.cpp @@ -1,9 +1,11 @@ #include "Cache.h" #include "Config.h" +#include "Downloader.h" #include #include #include +#include using namespace std; using namespace boost::filesystem; @@ -49,6 +51,7 @@ Cache::Cache() } cout << "Cache got prefix " << prefix << endl; + downloader.setDownloadPath(prefix.string()); } Cache::~Cache() @@ -57,8 +60,81 @@ Cache::~Cache() void Cache::read(const vector &keys) { +/* + move existing keys to a do-not-evict map + fetch keys that do not exist + after fetching, move all keys from do-not-evict to the back of the LRU +*/ + boost::unique_lock s(lru_mutex); + vector keysToFetch; + + uint i; + M_LRU_t::iterator mit; + + for (const string &key : keys) + { + mit = m_lru.find(key); + if (mit != m_lru.end()) + // it's in the cache, add the entry to the do-not-evict set + addToDNE(mit->lit); + else + // not in the cache, put it in the list to download + keysToFetch.push_back(&key); + } + s.unlock(); + + // start downloading the keys to fetch + if (!keysToFetch.empty()) + downloader.download(keysToFetch); + + s.lock(); + + // move all keys to the back of the LRU + for (const string &key : keys) + { + mit = m_lru.find(key); + if (mit != m_lru.end()) + lru.splice(lru.end(), lru, mit->lit); + else + { + lru.push_back(key); + m_lru.insert(M_LRU_element_t(&(lru.back()), lru.end()--)); + } + removeFromDNE(lru.end()); + } } +Cache::DNEElement::DNEElement(const LRU_t::iterator &k) : key(k), refCount(1) +{ +} + +void Cache::addToDNE(const LRU_t::iterator &key) +{ + DNEElement e(key); + DNE_t::iterator it = doNotEvict.find(e); + if (it != doNotEvict.end()) + { + 
DNEElement &dnee = const_cast(*it); + ++dnee.refCount; + } + else + doNotEvict.insert(e); +} + +void Cache::removeFromDNE(const LRU_t::iterator &key) +{ + DNEElement e(key); + DNE_t::iterator it = doNotEvict.find(e); + if (it == doNotEvict.end()) + return; + DNEElement &dnee = const_cast(*it); + if (--dnee.refCount == 0) + doNotEvict.erase(it); +} + + + + void Cache::exists(const vector &keys, vector *out) { } @@ -79,4 +155,40 @@ void Cache::makeSpace(size_t size) { } + +/* The helper classes */ + +Cache::M_LRU_element_t::M_LRU_element_t(const string *k) : key(k) +{} + +Cache::M_LRU_element_t::M_LRU_element_t(const string &k) : key(&k) +{} + +Cache::M_LRU_element_t::M_LRU_element_t(const string *k, const LRU_t::iterator &i) : key(k), lit(i) +{} + +inline size_t Cache::KeyHasher::operator()(const M_LRU_element_t &l) const +{ + return hash()(*(l.key)); } + +inline bool Cache::KeyEquals::operator()(const M_LRU_element_t &l1, const M_LRU_element_t &l2) const +{ + return (*(l1.key) == *(l2.key)); +} + +inline size_t Cache::DNEHasher::operator()(const DNEElement &l) const +{ + return hash()(*(l.key)); +} + +inline bool Cache::DNEEquals::operator()(const DNEElement &l1, const DNEElement &l2) const +{ + return (*(l1.key) == *(l2.key)); +} + +} + + + + diff --git a/src/Cache.h b/src/Cache.h index a32cac474..11df01d71 100644 --- a/src/Cache.h +++ b/src/Cache.h @@ -2,10 +2,15 @@ #ifndef CACHE_H_ #define CACHE_H_ +#include "Downloader.h" + #include #include +#include +#include #include #include +#include namespace storagemanager { @@ -26,9 +31,62 @@ class Cache : public boost::noncopyable private: boost::filesystem::path prefix; size_t maxCacheSize; + + /* The main cache structures */ + typedef std::list LRU_t; + LRU_t lru; + + struct M_LRU_element_t + { + M_LRU_element_t(const std::string &); + M_LRU_element_t(const std::string *); + M_LRU_element_t(const std::string *, const LRU_t::iterator &); + const std::string *key; + LRU_t::iterator lit; + }; + struct KeyHasher + { + 
size_t operator()(const M_LRU_element_t &l) const; + }; + + struct KeyEquals + { + bool operator()(const M_LRU_element_t &l1, const M_LRU_element_t &l2) const; + }; + + typedef std::unordered_set M_LRU_t; + M_LRU_t m_lru; // the LRU entries as a hash table + + /* The do-not-evict list stuff */ + struct DNEElement + { + DNEElement(const LRU_t::iterator &); + LRU_t::iterator key; + uint refCount; + }; + + struct DNEHasher + { + size_t operator()(const DNEElement &d) const; + }; + + struct DNEEquals + { + bool operator()(const DNEElement &d1, const DNEElement &d2) const; + }; + + typedef std::unordered_set DNE_t; + DNE_t doNotEvict; + void addToDNE(const LRU_t::iterator &key); + void removeFromDNE(const LRU_t::iterator &key); + boost::mutex lru_mutex; // protects the main cache structures & the do-not-evict set + + Downloader downloader; }; + + } #endif diff --git a/src/CloudStorage.h b/src/CloudStorage.h index 14d8924e6..43e69af12 100644 --- a/src/CloudStorage.h +++ b/src/CloudStorage.h @@ -10,6 +10,7 @@ namespace storagemanager class CloudStorage { public: + /* These behave like syscalls. 
return code -1 means an error, and errno is set */ virtual int getObject(const std::string &sourceKey, const std::string &destFile) = 0; virtual int putObject(const std::string &sourceFile, const std::string &destKey) = 0; virtual void deleteObject(const std::string &key) = 0; diff --git a/src/Config.cpp b/src/Config.cpp index 3cac02ac7..45e7df3a3 100644 --- a/src/Config.cpp +++ b/src/Config.cpp @@ -7,6 +7,9 @@ #include #include +#include +#include +#include using namespace std; @@ -30,7 +33,7 @@ Config * Config::get() return inst; } -Config::Config() +Config::Config() : die(false) { /* This will search the current directory, then $COLUMNSTORE_INSTALL_DIR/etc, @@ -59,7 +62,51 @@ Config::Config() } if (filename.empty()) throw runtime_error("Config: Could not find the config file for StorageManager"); - + + reloadInterval = boost::posix_time::seconds(60); + last_mtime = {0, 0}; + reload(); + reloader = boost::thread([this] { this->reloadThreadFcn(); }); +} + +Config::~Config() +{ + die = true; + reloader.interrupt(); + reloader.join(); +} + +void Config::reloadThreadFcn() +{ + while (!die) + { + try + { + reload(); + boost::this_thread::sleep(reloadInterval); + } + catch (boost::property_tree::ini_parser_error &e) + { + // log the error. how to log things now? 
+ } + catch (boost::thread_interrupted) + {} + } +} + +void Config::reload() +{ + struct stat statbuf; + int err = stat(filename.c_str(), &statbuf); + if (err) + // log something + return; + if ((statbuf.st_mtim.tv_sec == last_mtime.tv_sec) && (statbuf.st_mtim.tv_nsec == last_mtime.tv_nsec)) + return; + last_mtime = statbuf.st_mtim; + + boost::unique_lock s(mutex); + contents.clear(); boost::property_tree::ini_parser::read_ini(filename, contents); } @@ -86,7 +133,10 @@ string expand_numbers(const boost::smatch &match) string Config::getValue(const string &section, const string &key) const { // if we care, move this envvar substition stuff to where the file is loaded + boost::unique_lock s(mutex); string ret = contents.get(section + "." + key); + s.unlock(); + boost::regex re("\\$\\{(.+)\\}"); ret = boost::regex_replace(ret, re, use_envvar); diff --git a/src/Config.h b/src/Config.h index 56e185f38..8f103c53b 100644 --- a/src/Config.h +++ b/src/Config.h @@ -3,6 +3,10 @@ #define CONFIG_H_ #include +//#include +#include +#include + #include @@ -12,14 +16,23 @@ class Config : public boost::noncopyable { public: static Config *get(); + virtual ~Config(); std::string getValue(const std::string &section, const std::string &key) const; private: Config(); + void reload(); + void reloadThreadFcn(); + struct ::timespec last_mtime; + mutable boost::mutex mutex; + boost::thread reloader; + boost::posix_time::time_duration reloadInterval; + std::string filename; boost::property_tree::ptree contents; + bool die; }; } diff --git a/src/Downloader.cpp b/src/Downloader.cpp new file mode 100644 index 000000000..65819e193 --- /dev/null +++ b/src/Downloader.cpp @@ -0,0 +1,119 @@ +#include "Downloader.h" +#include "Config.h" +#include +#include + +using namespace std; +namespace storagemanager +{ + +Downloader::Downloader() +{ + storage = CloudStorage::get(); + string sMaxDownloads = Config::get()->getValue("ObjectStorage", "max_concurrent_downloads"); + maxDownloads = stoi(sMaxDownloads); + if 
(maxDownloads == 0) + maxDownloads = 20; +} + +Downloader::~Downloader() +{ +} + +void Downloader::download(const vector &keys) +{ + uint counter = keys.size(); + boost::condition condvar; + boost::mutex m; + DownloadListener listener(&counter, &condvar, &m); + + boost::shared_ptr dls[keys.size()]; + bool inserted[keys.size()]; + Downloads_t::iterator iterators[keys.size()]; + + uint i; + + for (const string *key : keys) + dls[i].reset(new Download(key, this)); + + boost::unique_lock s(download_mutex); + for (i = 0; i < keys.size(); i++) + { + auto &dl = dls[i]; + pair ret = downloads.insert(dl); + iterators[i] = ret.first; + inserted[i] = ret.second; + + if (inserted[i]) + { + dl->listeners.push_back(&listener); + downloaders->addJob(dl); + } + else + (*iterators[i])->listeners.push_back(&listener); + } + s.unlock(); + + // wait for the downloads to finish + boost::unique_lock dl_lock(m); + while (counter > 0) + condvar.wait(dl_lock); + dl_lock.unlock(); + + // remove the entries inserted by this call + s.lock(); + for (i = 0; i < keys.size(); i++) + if (inserted[i]) + downloads.erase(iterators[i]); + + // TODO: check for errors & propagate +} + +void Downloader::setDownloadPath(const string &path) +{ + downloadPath = path; +} + +inline const string & Downloader::getDownloadPath() const +{ + return downloadPath; +} + +/* The helper fcns */ +Downloader::Download::Download(const string *source, Downloader *dl) : key(source), dler(dl), dl_errno(0) +{ +} + +void Downloader::Download::operator()() +{ + CloudStorage *storage = CloudStorage::get(); + int err = storage->getObject(*key, dler->getDownloadPath() + *key); + if (err != 0) + dl_errno = errno; + + for (auto &listener : listeners) + listener->downloadFinished(); +} + +Downloader::DownloadListener::DownloadListener(uint *counter, boost::condition *condvar, boost::mutex *m) : count(counter), cond(condvar), mutex(m) +{ +} + +void Downloader::DownloadListener::downloadFinished() +{ + boost::unique_lock u(*mutex); + 
if (--(*count) == 0) + cond->notify_all(); +} + +inline size_t Downloader::DLHasher::operator()(const boost::shared_ptr &d) const +{ + return hash()(*(d->key)); +} + +inline bool Downloader::DLEquals::operator()(const boost::shared_ptr &d1, const boost::shared_ptr &d2) const +{ + return (*(d1->key) == *(d2->key)); +} + +} diff --git a/src/Downloader.h b/src/Downloader.h new file mode 100644 index 000000000..d52b3073d --- /dev/null +++ b/src/Downloader.h @@ -0,0 +1,73 @@ +#ifndef DOWNLOADER_H_ +#define DOWNLOADER_H_ + +#include "ThreadPool.h" +#include "CloudStorage.h" +#include +#include +#include + +#include +#include +#include +#include + +namespace storagemanager +{ + +class Downloader +{ + public: + Downloader(); + virtual ~Downloader(); + + void download(const std::vector &keys); + void setDownloadPath(const std::string &path); + const std::string & getDownloadPath() const; + + private: + uint maxDownloads; + std::string downloadPath; + + class DownloadListener + { + public: + DownloadListener(uint *counter, boost::condition *condvar, boost::mutex *m); + void downloadFinished(); + private: + uint *count; + boost::condition *cond; + boost::mutex *mutex; + }; + + struct Download : public ThreadPool::Job + { + Download(const std::string *source, Downloader *); + void operator()(); + Downloader *dler; + const std::string *key; + int dl_errno; // to propagate errors from the download job to the caller + std::vector listeners; + }; + + struct DLHasher + { + size_t operator()(const boost::shared_ptr &d) const; + }; + + struct DLEquals + { + bool operator()(const boost::shared_ptr &d1, const boost::shared_ptr &d2) const; + }; + + typedef std::unordered_set, DLHasher, DLEquals> Downloads_t; + Downloads_t downloads; + boost::mutex download_mutex; + + boost::scoped_ptr downloaders; + CloudStorage *storage; +}; + +} + +#endif diff --git a/src/ThreadPool.cpp b/src/ThreadPool.cpp index 9cf5d8abf..7b7f7278b 100644 --- a/src/ThreadPool.cpp +++ b/src/ThreadPool.cpp @@ -1,8 
+1,5 @@ - - #include "ThreadPool.h" -#include using namespace std; @@ -18,36 +15,75 @@ ThreadPool::ThreadPool() : maxThreads(1000), die(false), threadsWaiting(0) ThreadPool::ThreadPool(uint num_threads) : maxThreads(num_threads), die(false), threadsWaiting(0) { + pruner = boost::thread([this] { this->pruner_fcn(); } ); } ThreadPool::~ThreadPool() { - boost::mutex::scoped_lock s(m); + boost::unique_lock s(m); die = true; jobs.clear(); jobAvailable.notify_all(); s.unlock(); - for (uint i = 0; i < threads.size(); i++) - threads[i]->join(); + threads.join_all(); + pruner.interrupt(); + pruner.join(); } void ThreadPool::addJob(const boost::shared_ptr &j) { - boost::mutex::scoped_lock s(m); + boost::unique_lock s(m); jobs.push_back(j); // Start another thread if necessary if (threadsWaiting == 0 && threads.size() < maxThreads) { - boost::shared_ptr thread(new boost::thread(Runner(this))); - threads.push_back(thread); + boost::thread *thread = threads.create_thread([this] { this->processingLoop(); }); + s_threads.insert(thread); } else jobAvailable.notify_one(); } +void ThreadPool::pruner_fcn() +{ + while (!die) + { + prune(); + try { + boost::this_thread::sleep(idleThreadTimeout); + } + catch(boost::thread_interrupted) + {} + } +} + +void ThreadPool::prune() +{ + boost::unique_lock s(m); + set::iterator it, to_remove; + it = s_threads.begin(); + while (it != s_threads.end()) + { + if ((*it)->joinable()) + { + (*it)->join(); + threads.remove_thread(*it); + to_remove = it++; + s_threads.erase(to_remove); + } + else + ++it; + } +} + +void ThreadPool::setMaxThreads(uint newMax) +{ + maxThreads = newMax; +} + void ThreadPool::processingLoop() { - boost::mutex::scoped_lock s(m, boost::defer_lock); + boost::unique_lock s(m, boost::defer_lock); while (!die) { @@ -55,8 +91,10 @@ void ThreadPool::processingLoop() while (jobs.empty() && !die) { threadsWaiting++; - jobAvailable.wait(s); + bool timedout = !jobAvailable.timed_wait<>(s, idleThreadTimeout); threadsWaiting--; + if 
(timedout) + return; } if (die) return; diff --git a/src/ThreadPool.h b/src/ThreadPool.h index 13b69f234..8669e7c1f 100644 --- a/src/ThreadPool.h +++ b/src/ThreadPool.h @@ -2,8 +2,8 @@ #ifndef SM_THREADPOOL_H_ #define SM_THREADPOOL_H_ -#include #include +#include #include #include @@ -25,6 +25,7 @@ class ThreadPool : public boost::noncopyable }; void addJob(const boost::shared_ptr &j); + void setMaxThreads(uint newMax); private: struct Runner { @@ -38,10 +39,16 @@ class ThreadPool : public boost::noncopyable uint maxThreads; bool die; int threadsWaiting; - std::vector > threads; + boost::thread_group threads; + std::set s_threads; boost::condition jobAvailable; std::deque > jobs; boost::mutex m; + + const boost::posix_time::time_duration idleThreadTimeout = boost::posix_time::seconds(60); + boost::thread pruner; + void pruner_fcn(); + void prune(); }; diff --git a/storagemanager.cnf b/storagemanager.cnf index 212a6ae3d..d09f205c6 100644 --- a/storagemanager.cnf +++ b/storagemanager.cnf @@ -2,7 +2,7 @@ service = S3 object_size = 5M metadata_path = ${HOME}/storagemanager/metadata - +max_concurrent_downloads = 20 [S3] region = us-west-1