diff --git a/CMakeLists.txt b/CMakeLists.txt index f8139d16e..bbd16f973 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -86,6 +86,8 @@ set(storagemanager_SRCS src/MetadataFile.cpp src/Replicator.cpp src/Utilities.cpp + src/Ownership.cpp + src/PrefixCache.cpp ) option(TRACE "Enable some tracing output" OFF) diff --git a/src/Cache.cpp b/src/Cache.cpp index 4a73f793f..4de14bd7e 100644 --- a/src/Cache.cpp +++ b/src/Cache.cpp @@ -34,11 +34,10 @@ Cache * Cache::get() return inst; } -Cache::Cache() : currentCacheSize(0) +Cache::Cache() { Config *conf = Config::get(); logger = SMLogging::get(); - replicator = Replicator::get(); string stmp = conf->getValue("Cache", "cache_size"); if (stmp.empty()) @@ -73,8 +72,8 @@ Cache::Cache() : currentCacheSize(0) throw runtime_error("Please set ObjectStorage/object_size to a number"); } - prefix = conf->getValue("Cache", "path"); - if (prefix.empty()) + cachePrefix = conf->getValue("Cache", "path"); + if (cachePrefix.empty()) { logger->log(LOG_CRIT, "Cache/path is not set"); throw runtime_error("Please set Cache/path in the storagemanager.cnf file"); @@ -82,17 +81,16 @@ Cache::Cache() : currentCacheSize(0) try { - bf::create_directories(prefix); + bf::create_directories(cachePrefix); } catch (exception &e) { - logger->log(LOG_CRIT, "Failed to create %s, got: %s", prefix.string().c_str(), e.what()); + logger->log(LOG_CRIT, "Failed to create %s, got: %s", cachePrefix.string().c_str(), e.what()); throw e; } - //cout << "Cache got prefix " << prefix << endl; + //cout << "Cache got cachePrefix " << cachePrefix << endl; downloader.reset(new Downloader()); - downloader->setDownloadPath(prefix.string()); - downloader->useThisLock(&lru_mutex); + downloader->setDownloadPath(cachePrefix); stmp = conf->getValue("ObjectStorage", "journal_path"); if (stmp.empty()) @@ -111,64 +109,12 @@ Cache::Cache() : currentCacheSize(0) logger->log(LOG_CRIT, "Failed to create %s, got: %s", journalPrefix.string().c_str(), e.what()); throw e; } - - lru_mutex.lock(); // unlocked by populate() when it's done - boost::thread t([this] { this->populate(); }); - t.detach(); } Cache::~Cache() { -} - -void Cache::populate() -{ - Synchronizer *sync = Synchronizer::get(); - bf::directory_iterator dir(prefix); - bf::directory_iterator dend; - vector newObjects; - while (dir != dend) - { - // put everything in lru & m_lru - const bf::path &p = dir->path(); - if (bf::is_regular_file(p)) - { - lru.push_back(p.filename().string()); - auto last = lru.end(); - m_lru.insert(--last); - currentCacheSize += bf::file_size(*dir); - newObjects.push_back(p.filename().string()); - } - else - logger->log(LOG_WARNING, "Cache: found something in the cache that does not belong '%s'", p.string().c_str()); - ++dir; - } - sync->newObjects(newObjects); - newObjects.clear(); - - // account for what's in the journal dir - vector > newJournals; - dir = bf::directory_iterator(journalPrefix); - while (dir != dend) - { - const bf::path &p = dir->path(); - if (bf::is_regular_file(p)) - { - if (p.extension() == ".journal") - { - size_t s = bf::file_size(*dir); - currentCacheSize += s; - newJournals.push_back(pair(p.stem().string(), s)); - } - else - logger->log(LOG_WARNING, "Cache: found a file in the journal dir that does not belong '%s'", p.string().c_str()); - } - else - logger->log(LOG_WARNING, "Cache: found something in the journal dir that does not belong '%s'", p.string().c_str()); - ++dir; - } - lru_mutex.unlock(); - sync->newJournalEntries(newJournals); + for (auto it = prefixCaches.begin(); it != prefixCaches.end(); 
++it) + delete it->second; } // be careful using this! SM should be idle. No ongoing reads or writes. @@ -176,308 +122,88 @@ void Cache::validateCacheSize() { boost::unique_lock s(lru_mutex); - if (!doNotEvict.empty() || !toBeDeleted.empty()) - { - cout << "Not safe to use validateCacheSize() at the moment." << endl; - return; - } - - size_t oldSize = currentCacheSize; - currentCacheSize = 0; - m_lru.clear(); - lru.clear(); - populate(); - - if (oldSize != currentCacheSize) - logger->log(LOG_DEBUG, "Cache::validateCacheSize(): found a discrepancy. Actual size is %lld, had %lld.", - currentCacheSize, oldSize); - else - logger->log(LOG_DEBUG, "Cache::validateCacheSize(): Cache size accounting agrees with reality for now."); + for (auto it = prefixCaches.begin(); it != prefixCaches.end(); ++it) + it->second->validateCacheSize(); } - - -#if 0 -/* Need to simplify this, we keep running into sneaky problems, and I just spotted a couple more. - Just going to rewrite it. We can revisit later if the simplified version needs improvement */ -void Cache::read(const vector &keys) -{ -/* - move existing keys to a do-not-evict map - fetch keys that do not exist - after fetching, move all keys from do-not-evict to the back of the LRU -*/ - boost::unique_lock s(lru_mutex); - vector keysToFetch; - - uint i; - M_LRU_t::iterator mit; - - for (const string &key : keys) - { - mit = m_lru.find(key); - if (mit != m_lru.end()) - // it's in the cache, add the entry to the do-not-evict set - addToDNE(mit->lit); - else - // not in the cache, put it in the list to download - keysToFetch.push_back(&key); - } - //s.unlock(); - - // start downloading the keys to fetch - // TODO: there should be a path for the common case, which is that there is nothing - // to download. - vector dl_errnos; - vector sizes; - if (!keysToFetch.empty()) - downloader->download(keysToFetch, &dl_errnos, &sizes, lru_mutex); - - size_t sum_sizes = 0; - for (size_t &size : sizes) - sum_sizes += size; - - //s.lock(); - // do makespace() before downloading. Problem is, until the download is finished, this fcn can't tell which - // downloads it was responsible for. Need Downloader to make the call...? - _makeSpace(sum_sizes); - currentCacheSize += sum_sizes; - - // move all keys to the back of the LRU - for (i = 0; i < keys.size(); i++) - { - mit = m_lru.find(keys[i]); - if (mit != m_lru.end()) - { - lru.splice(lru.end(), lru, mit->lit); - LRU_t::iterator lit = lru.end(); - removeFromDNE(--lit); - } - else if (dl_errnos[i] == 0) // successful download - { - lru.push_back(keys[i]); - LRU_t::iterator lit = lru.end(); - m_lru.insert(M_LRU_element_t(--lit)); - } - else - { - // Downloader already logged it, anything to do here? - /* brainstorming options for handling it. - 1) Should it be handled? The caller will log a file-not-found error, and there will be a download - failure in the log already. - 2) Can't really DO anything can it? - */ - } - } -} -#endif // new simplified version -void Cache::read(const vector &keys) +void Cache::read(const bf::path &prefix, const vector &keys) { - /* Move existing keys to the back of the LRU, start downloading nonexistant keys. 
- */ - vector keysToFetch; - vector dlErrnos; - vector dlSizes; - - boost::unique_lock s(lru_mutex); - - M_LRU_t::iterator mit; - for (const string &key : keys) - { - mit = m_lru.find(key); - if (mit != m_lru.end()) - { - addToDNE(mit->lit); - lru.splice(lru.end(), lru, mit->lit); // move them to the back so they are last to pick for eviction - } - else - { - // There's window where the file has been downloaded but is not yet - // added to the lru structs. However it is in the DNE. If it is in the DNE, then it is also - // in Downloader's map. So, this thread needs to start the download if it's not in the - // DNE or if there's an existing download that hasn't finished yet. Starting the download - // includes waiting for an existing download to finish, which from this class's pov is the - // same thing. - if (doNotEvict.find(key) == doNotEvict.end() || downloader->inProgress(key)) - keysToFetch.push_back(&key); - else - cout << "Cache: detected and stopped a racey download" << endl; - addToDNE(key); - } - } - if (keysToFetch.empty()) - return; - - downloader->download(keysToFetch, &dlErrnos, &dlSizes); - - size_t sum_sizes = 0; - for (uint i = 0; i < keysToFetch.size(); ++i) - { - // downloads with size 0 didn't actually happen, either because it - // was a preexisting download (another read() call owns it), or because - // there was an error downloading it. Use size == 0 as an indication of - // what to add to the cache. Also needs to verify that the file was not deleted, - // indicated by existence in doNotEvict. - if (dlSizes[i] != 0) - { - if (doNotEvict.find(*keysToFetch[i]) != doNotEvict.end()) - { - sum_sizes += dlSizes[i]; - lru.push_back(*keysToFetch[i]); - LRU_t::iterator lit = lru.end(); - m_lru.insert(--lit); // I dislike this way of grabbing the last iterator in a list. - } - else // it was downloaded, but a deletion happened so we have to toss it - { - cout << "removing a file that was deleted by another thread during download" << endl; - bf::remove(prefix / (*keysToFetch[i])); - } - } - } - - // move everything in keys to the back of the lru (yes, again) - for (const string &key : keys) - { - mit = m_lru.find(key); - if (mit != m_lru.end()) // all of the files exist, just not all of them are 'owned by' this thread. - lru.splice(lru.end(), lru, mit->lit); - } - - // fix cache size - //_makeSpace(sum_sizes); - currentCacheSize += sum_sizes; + getPCache(prefix).read(keys); } -void Cache::doneReading(const vector &keys) +void Cache::doneReading(const bf::path &prefix, const vector &keys) { - boost::unique_lock s(lru_mutex); - for (const string &key : keys) - { - removeFromDNE(key); - // most should be in the map. - // debateable whether it's faster to look up the list iterator and use it - // or whether it's faster to bypass that and use strings only. 
- - //const auto &it = m_lru.find(key); - //if (it != m_lru.end()) - // removeFromDNE(it->lit); - } - _makeSpace(0); + getPCache(prefix).doneReading(keys); } -void Cache::doneWriting() +void Cache::doneWriting(const bf::path &prefix) { - makeSpace(0); + getPCache(prefix).makeSpace(0); } -Cache::DNEElement::DNEElement(const LRU_t::iterator &k) : key(k), refCount(1) -{ -} -Cache::DNEElement::DNEElement(const string &k) : sKey(k), refCount(1) +const bf::path & Cache::getCachePath() const { + return cachePrefix; } -void Cache::addToDNE(const DNEElement &key) -{ - DNE_t::iterator it = doNotEvict.find(key); - if (it != doNotEvict.end()) - { - DNEElement &dnee = const_cast(*it); - ++(dnee.refCount); - } - else - doNotEvict.insert(key); -} - -void Cache::removeFromDNE(const DNEElement &key) -{ - DNE_t::iterator it = doNotEvict.find(key); - if (it == doNotEvict.end()) - return; - DNEElement &dnee = const_cast(*it); - if (--(dnee.refCount) == 0) - doNotEvict.erase(it); -} - -const bf::path & Cache::getCachePath() -{ - return prefix; -} - -const bf::path & Cache::getJournalPath() +const bf::path & Cache::getJournalPath() const { return journalPrefix; } + +const bf::path Cache::getCachePath(const bf::path &prefix) const +{ + return cachePrefix/prefix; +} + +const bf::path Cache::getJournalPath(const bf::path &prefix) const +{ + return journalPrefix/prefix; +} -void Cache::exists(const vector &keys, vector *out) const +void Cache::exists(const bf::path &prefix, const vector &keys, vector *out) { - out->resize(keys.size()); - boost::unique_lock s(lru_mutex); - for (uint i = 0; i < keys.size(); i++) - (*out)[i] = (m_lru.find(keys[i]) != m_lru.end()); + getPCache(prefix).exists(keys, out); } -bool Cache::exists(const string &key) const +bool Cache::exists(const bf::path &prefix, const string &key) { - boost::unique_lock s(lru_mutex); - return m_lru.find(key) != m_lru.end(); + return getPCache(prefix).exists(key); } -void Cache::newObject(const string &key, size_t size) +void Cache::newObject(const bf::path &prefix, const string &key, size_t size) { - boost::unique_lock s(lru_mutex); - assert(m_lru.find(key) == m_lru.end()); - //_makeSpace(size); - lru.push_back(key); - LRU_t::iterator back = lru.end(); - m_lru.insert(--back); - currentCacheSize += size; + getPCache(prefix).newObject(key, size); } -void Cache::newJournalEntry(size_t size) +void Cache::newJournalEntry(const bf::path &prefix, size_t size) { - boost::unique_lock s(lru_mutex); - //_makeSpace(size); - currentCacheSize += size; + getPCache(prefix).newJournalEntry(size); } -void Cache::deletedJournal(size_t size) +void Cache::deletedJournal(const bf::path &prefix, size_t size) { - boost::unique_lock s(lru_mutex); - assert(currentCacheSize >= size); - currentCacheSize -= size; + getPCache(prefix).deletedJournal(size); } -void Cache::deletedObject(const string &key, size_t size) +void Cache::deletedObject(const bf::path &prefix, const string &key, size_t size) { - boost::unique_lock s(lru_mutex); - assert(currentCacheSize >= size); - M_LRU_t::iterator mit = m_lru.find(key); - assert(mit != m_lru.end()); - - // if it's being flushed, let makeSpace() do the deleting - if (toBeDeleted.find(mit->lit) == toBeDeleted.end()) - { - doNotEvict.erase(mit->lit); - lru.erase(mit->lit); - m_lru.erase(mit); - currentCacheSize -= size; - } + getPCache(prefix).deletedObject(key, size); } void Cache::setMaxCacheSize(size_t size) { boost::unique_lock s(lru_mutex); - if (size < maxCacheSize) - _makeSpace(maxCacheSize - size); + maxCacheSize = size; + for (auto it = 
prefixCaches.begin(); it != prefixCaches.end(); ++it) + it->second->setMaxCacheSize(size); } -void Cache::makeSpace(size_t size) +void Cache::makeSpace(const bf::path &prefix, size_t size) { - boost::unique_lock s(lru_mutex); - _makeSpace(size); + getPCache(prefix).makeSpace(size); } size_t Cache::getMaxCacheSize() const @@ -485,188 +211,101 @@ size_t Cache::getMaxCacheSize() const return maxCacheSize; } -// call this holding lru_mutex -void Cache::_makeSpace(size_t size) +void Cache::rename(const bf::path &prefix, const string &oldKey, const string &newKey, ssize_t sizediff) { - ssize_t thisMuch = currentCacheSize + size - maxCacheSize; - if (thisMuch <= 0) - return; - - LRU_t::iterator it; - while (thisMuch > 0 && !lru.empty()) - { - it = lru.begin(); - // find the first element not being either read() right now or being processed by another - // makeSpace() call. - while (it != lru.end()) - { - // make sure it's not currently being read or being flushed by another _makeSpace() call - if ((doNotEvict.find(it) == doNotEvict.end()) && (toBeDeleted.find(it) == toBeDeleted.end())) - break; - ++it; - } - if (it == lru.end()) - { - // nothing can be deleted right now - return; - } - - if (!bf::exists(prefix / *it)) - cout << prefix / *it << " doesn't exist, WTF?" << endl; // ran into this a couple times, still happens as of commit 948ee1aa5 - assert(bf::exists(prefix / *it)); - /* - tell Synchronizer that this key will be evicted - delete the file - remove it from our structs - update current size - */ - - //logger->log(LOG_WARNING, "Cache: flushing!"); - toBeDeleted.insert(it); - - string key = *it; // need to make a copy; it could get changed after unlocking. - - lru_mutex.unlock(); - try - { - Synchronizer::get()->flushObject(key); - } - catch (...) - { - // it gets logged by Sync - lru_mutex.lock(); - toBeDeleted.erase(it); - continue; - } - lru_mutex.lock(); - - // check doNotEvict again in case this object is now being read - if (doNotEvict.find(it) == doNotEvict.end()) - { - bf::path cachedFile = prefix / *it; - m_lru.erase(*it); - toBeDeleted.erase(it); - lru.erase(it); - size_t newSize = bf::file_size(cachedFile); - replicator->remove(cachedFile, Replicator::LOCAL_ONLY); - if (newSize < currentCacheSize) - { - currentCacheSize -= newSize; - thisMuch -= newSize; - } - else - { - logger->log(LOG_WARNING, "Cache::makeSpace(): accounting error. 
Almost wrapped currentCacheSize on flush."); - currentCacheSize = 0; - thisMuch = 0; - } - } - else - toBeDeleted.erase(it); - } + getPCache(prefix).rename(oldKey, newKey, sizediff); } -void Cache::rename(const string &oldKey, const string &newKey, ssize_t sizediff) +int Cache::ifExistsThenDelete(const bf::path &prefix, const string &key) { - // rename it in the LRU - // erase/insert to rehash it everywhere else + return getPCache(prefix).ifExistsThenDelete(key); +} +size_t Cache::getCurrentCacheSize() +{ + size_t totalSize = 0; boost::unique_lock s(lru_mutex); - auto it = m_lru.find(oldKey); - if (it == m_lru.end()) - return; - auto lit = it->lit; - m_lru.erase(it); - int refCount = 0; - auto dne_it = doNotEvict.find(lit); - if (dne_it != doNotEvict.end()) - { - refCount = dne_it->refCount; - doNotEvict.erase(dne_it); - } - - auto tbd_it = toBeDeleted.find(lit); - bool hasTBDEntry = (tbd_it != toBeDeleted.end()); - if (hasTBDEntry) - toBeDeleted.erase(tbd_it); - - *lit = newKey; - - if (hasTBDEntry) - toBeDeleted.insert(lit); - if (refCount != 0) - { - pair dne_tmp = doNotEvict.insert(lit); - const_cast(*(dne_tmp.first)).refCount = refCount; - } - - m_lru.insert(lit); - currentCacheSize += sizediff; + for (auto it = prefixCaches.begin(); it != prefixCaches.end(); ++it) + totalSize += it->second->getCurrentCacheSize(); + return totalSize; } -int Cache::ifExistsThenDelete(const string &key) +size_t Cache::getCurrentCacheElementCount() { - bf::path cachedPath = prefix / key; - bf::path journalPath = journalPrefix / (key + ".journal"); + size_t totalCount = 0; + + for (auto it = prefixCaches.begin(); it != prefixCaches.end(); ++it) + totalCount += it->second->getCurrentCacheElementCount(); + return totalCount; - boost::unique_lock s(lru_mutex); - bool objectExists = false; - - auto it = m_lru.find(key); - if (it != m_lru.end()) - { - if (toBeDeleted.find(it->lit) == toBeDeleted.end()) - { - doNotEvict.erase(it->lit); - lru.erase(it->lit); - m_lru.erase(it); - objectExists = true; - } - else // let makeSpace() delete it if it's already in progress - return 0; - } - bool journalExists = bf::exists(journalPath); - //assert(objectExists == bf::exists(cachedPath)); - - size_t objectSize = (objectExists ? bf::file_size(cachedPath) : 0); - //size_t objectSize = (objectExists ? MetadataFile::getLengthFromKey(key) : 0); - size_t journalSize = (journalExists ? bf::file_size(journalPath) : 0); - currentCacheSize -= (objectSize + journalSize); - - //assert(!objectExists || objectSize == bf::file_size(cachedPath)); - - return (objectExists ? 1 : 0) | (journalExists ? 
2 : 0); } -size_t Cache::getCurrentCacheSize() const +size_t Cache::getCurrentCacheSize(const bf::path &prefix) { - return currentCacheSize; + return getPCache(prefix).getCurrentCacheSize(); } -size_t Cache::getCurrentCacheElementCount() const +size_t Cache::getCurrentCacheElementCount(const bf::path &prefix) { - boost::unique_lock s(lru_mutex); - assert(m_lru.size() == lru.size()); - return m_lru.size(); + return getPCache(prefix).getCurrentCacheElementCount(); } void Cache::reset() { boost::unique_lock s(lru_mutex); - m_lru.clear(); - lru.clear(); - toBeDeleted.clear(); - doNotEvict.clear(); - bf::directory_iterator dir; - bf::directory_iterator dend; - for (dir = bf::directory_iterator(prefix); dir != dend; ++dir) - bf::remove_all(dir->path()); - - for (dir = bf::directory_iterator(journalPrefix); dir != dend; ++dir) - bf::remove_all(dir->path()); - currentCacheSize = 0; + for (auto it = prefixCaches.begin(); it != prefixCaches.end(); ++it) + it->second->reset(); +} + +void Cache::newPrefix(const bf::path &prefix) +{ + boost::unique_lock s(lru_mutex); + + //cerr << "Cache: making new prefix " << prefix.string() << endl; + assert(prefixCaches.find(prefix) == prefixCaches.end()); + prefixCaches[prefix] = NULL; + s.unlock(); + PrefixCache *pCache = new PrefixCache(prefix); + s.lock(); + prefixCaches[prefix] = pCache; +} + +void Cache::dropPrefix(const bf::path &prefix) +{ + boost::unique_lock s(lru_mutex); + + auto *pCache = prefixCaches[prefix]; + prefixCaches.erase(prefix); + s.unlock(); + delete pCache; +} + +inline PrefixCache & Cache::getPCache(const bf::path &prefix) +{ + boost::unique_lock s(lru_mutex); + + //cerr << "Getting pcache for " << prefix.string() << endl; + PrefixCache *ret; + auto it = prefixCaches.find(prefix); + assert(it != prefixCaches.end()); + + ret = it->second; + while (ret == NULL) // wait for the new PC to init + { + s.unlock(); + sleep(1); + s.lock(); + ret = prefixCaches[prefix]; + } + + return *ret; +} + +Downloader * Cache::getDownloader() const +{ + return downloader.get(); } void Cache::shutdown() @@ -675,48 +314,4 @@ void Cache::shutdown() downloader.reset(); } -/* The helper classes */ - -Cache::M_LRU_element_t::M_LRU_element_t(const string *k) : key(k) -{} - -Cache::M_LRU_element_t::M_LRU_element_t(const string &k) : key(&k) -{} - -Cache::M_LRU_element_t::M_LRU_element_t(const LRU_t::iterator &i) : key(&(*i)), lit(i) -{} - -inline size_t Cache::KeyHasher::operator()(const M_LRU_element_t &l) const -{ - return hash()(*(l.key)); } - -inline bool Cache::KeyEquals::operator()(const M_LRU_element_t &l1, const M_LRU_element_t &l2) const -{ - return (*(l1.key) == *(l2.key)); -} - -inline size_t Cache::DNEHasher::operator()(const DNEElement &l) const -{ - return (l.sKey.empty() ? hash()(*(l.key)) : hash()(l.sKey)); -} - -inline bool Cache::DNEEquals::operator()(const DNEElement &l1, const DNEElement &l2) const -{ - const string *s1, *s2; - s1 = l1.sKey.empty() ? &(*(l1.key)) : &(l1.sKey); - s2 = l2.sKey.empty() ? &(*(l2.key)) : &(l2.sKey); - - return (*s1 == *s2); -} - -inline bool Cache::TBDLess::operator()(const LRU_t::iterator &i1, const LRU_t::iterator &i2) const -{ - return *i1 < *i2; -} - -} - - - - diff --git a/src/Cache.h b/src/Cache.h index 7735c064a..069d5f402 100644 --- a/src/Cache.h +++ b/src/Cache.h @@ -1,15 +1,19 @@ - #ifndef CACHE_H_ #define CACHE_H_ +/* PrefixCache manages the cache for one prefix managed by SM. 
+   Cache is a map of prefix -> PrefixCache, and holds the items
+   that should be centralized like the Downloader
+*/
+
+
 #include "Downloader.h"
 #include "SMLogging.h"
-#include "Replicator.h"
+#include "PrefixCache.h"
 #include
 #include
-#include
-#include
+#include
 #include
 #include
 #include
@@ -26,120 +30,66 @@ class Cache : public boost::noncopyable
        //reading fcns
        // read() marks objects to be read s.t. they do not get flushed.
        // after reading them, unlock the 'logical file', and call doneReading().
-        void read(const std::vector<std::string> &keys);
-        void doneReading(const std::vector<std::string> &keys);
-        bool exists(const std::string &key) const;
-        void exists(const std::vector<std::string> &keys, std::vector<bool> *out) const;
+        void read(const boost::filesystem::path &prefix, const std::vector<std::string> &keys);
+        void doneReading(const boost::filesystem::path &prefix, const std::vector<std::string> &keys);
+        bool exists(const boost::filesystem::path &prefix, const std::string &key);
+        void exists(const boost::filesystem::path &prefix, const std::vector<std::string> &keys,
+            std::vector<bool> *out);

        // writing fcns
-        // new*() fcns tell the cache data was added. After writing a set of objects,
+        // new*() fcns tell the Cache data was added. After writing a set of objects,
        // unlock the 'logical file', and call doneWriting().
-        void newObject(const std::string &key, size_t size);
-        void newJournalEntry(size_t size);
-        void doneWriting();
-        void deletedObject(const std::string &key, size_t size);
-        void deletedJournal(size_t size);
+        void newObject(const boost::filesystem::path &prefix, const std::string &key, size_t size);
+        void newJournalEntry(const boost::filesystem::path &prefix, size_t size);
+        void doneWriting(const boost::filesystem::path &prefix);
+        void deletedObject(const boost::filesystem::path &prefix, const std::string &key, size_t size);
+        void deletedJournal(const boost::filesystem::path &prefix, size_t size);

        // an 'atomic' existence check & delete. Covers the object and journal. Does not delete the files.
        // returns 0 if it didn't exist, 1 if the object exists, 2 if the journal exists, and 3 (1 | 2) if both exist
        // This should be called while holding the file lock for key because it touches the journal file.
-        int ifExistsThenDelete(const std::string &key);
+        int ifExistsThenDelete(const boost::filesystem::path &prefix, const std::string &key);

        // rename is used when an old obj gets merged with its journal file
        // the size will change in that process; sizediff is by how much
-        void rename(const std::string &oldKey, const std::string &newKey, ssize_t sizediff);
+        void rename(const boost::filesystem::path &prefix, const std::string &oldKey,
+            const std::string &newKey, ssize_t sizediff);

        void setMaxCacheSize(size_t size);
-        void makeSpace(size_t size);
-        size_t getCurrentCacheSize() const;
-        size_t getCurrentCacheElementCount() const;
+        void makeSpace(const boost::filesystem::path &prefix, size_t size);
+        size_t getCurrentCacheSize();
+        size_t getCurrentCacheElementCount();
+        size_t getCurrentCacheSize(const boost::filesystem::path &prefix);
+        size_t getCurrentCacheElementCount(const boost::filesystem::path &prefix);
        size_t getMaxCacheSize() const;
+
+        Downloader * getDownloader() const;
+        void newPrefix(const boost::filesystem::path &prefix);
+        void dropPrefix(const boost::filesystem::path &prefix);
        void shutdown();

        // test helpers
-        const boost::filesystem::path &getCachePath();
-        const boost::filesystem::path &getJournalPath();
-        // this will delete everything in the cache and journal paths, and empty all Cache structures.
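+        // Note: the prefix-taking path helpers below compose the configured root with the
+        // given prefix, so they return by value rather than by reference.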
+ const boost::filesystem::path &getCachePath() const; + const boost::filesystem::path &getJournalPath() const; + const boost::filesystem::path getCachePath(const boost::filesystem::path &prefix) const; + const boost::filesystem::path getJournalPath(const boost::filesystem::path &prefix) const; + // this will delete everything in the Cache and journal paths, and empty all Cache structures. void reset(); void validateCacheSize(); private: Cache(); - boost::filesystem::path prefix; + SMLogging *logger; + boost::filesystem::path cachePrefix; boost::filesystem::path journalPrefix; size_t maxCacheSize; size_t objectSize; - size_t currentCacheSize; boost::scoped_ptr downloader; - Replicator *replicator; - SMLogging *logger; - void populate(); - void _makeSpace(size_t size); + PrefixCache & getPCache(const boost::filesystem::path &prefix); - /* The main cache structures */ - // lru owns the string memory for the filenames it manages. m_lru and DNE point to those strings. - typedef std::list LRU_t; - LRU_t lru; - - struct M_LRU_element_t - { - M_LRU_element_t(const std::string &); - M_LRU_element_t(const std::string *); - M_LRU_element_t(const LRU_t::iterator &); - const std::string *key; - LRU_t::iterator lit; - }; - struct KeyHasher - { - size_t operator()(const M_LRU_element_t &l) const; - }; - - struct KeyEquals - { - bool operator()(const M_LRU_element_t &l1, const M_LRU_element_t &l2) const; - }; - - typedef std::unordered_set M_LRU_t; - M_LRU_t m_lru; // the LRU entries as a hash table - - /* The do-not-evict list stuff. */ - struct DNEElement - { - DNEElement(const LRU_t::iterator &); - DNEElement(const std::string &); - LRU_t::iterator key; - std::string sKey; - uint refCount; - }; - - struct DNEHasher - { - size_t operator()(const DNEElement &d) const; - }; - - struct DNEEquals - { - bool operator()(const DNEElement &d1, const DNEElement &d2) const; - }; - - typedef std::unordered_set DNE_t; - DNE_t doNotEvict; - void addToDNE(const DNEElement &); - void removeFromDNE(const DNEElement &); - - // the to-be-deleted set. Elements removed from the LRU but not yet deleted will be here. - // Elements are inserted and removed by makeSpace(). If read() references a file that is in this, - // it will remove it, signalling to makeSpace that it should not be deleted - struct TBDLess - { - bool operator()(const LRU_t::iterator &, const LRU_t::iterator &) const; - }; - - typedef std::set TBD_t; - TBD_t toBeDeleted; - - mutable boost::mutex lru_mutex; // protects the main cache structures & the do-not-evict set + std::map prefixCaches; + mutable boost::mutex lru_mutex; // protects the prefixCaches }; diff --git a/src/Downloader.cpp b/src/Downloader.cpp index 6192c9fe5..e2ef4f1f7 100644 --- a/src/Downloader.cpp +++ b/src/Downloader.cpp @@ -7,6 +7,7 @@ #include using namespace std; +namespace bf = boost::filesystem; namespace storagemanager { @@ -33,107 +34,24 @@ Downloader::~Downloader() { } -void Downloader::useThisLock(boost::mutex *mutex) -{ - lock = mutex; -} - -/* -inline boost::mutex * Downloader::getDownloadMutex() -{ - return &download_mutex; -} -*/ - -#if 0 -/* This is another fcn with a bug too subtle to identify right now. 
Writing up a simplified - version; we can revisit later if necessary */ -void Downloader::download(const vector &keys, vector *errnos, vector *sizes) -{ - uint counter = keys.size(); - boost::condition condvar; - boost::mutex m; - DownloadListener listener(&counter, &condvar, &m); - - boost::shared_ptr dls[keys.size()]; - bool inserted[keys.size()]; - Downloads_t::iterator iterators[keys.size()]; - - uint i; - - for (i = 0; i < keys.size(); i++) - dls[i].reset(new Download(keys[i], this)); - - boost::unique_lock s(download_mutex); - for (i = 0; i < keys.size(); i++) - { - auto &dl = dls[i]; - pair ret = downloads.insert(dl); - iterators[i] = ret.first; - inserted[i] = ret.second; - - if (inserted[i]) - { - dl->listeners.push_back(&listener); - workers.addJob(dl); - } - else - { - dl = *(ret.first); // point to the existing download. Redundant with the iterators array. Don't care yet. - (*iterators[i])->listeners.push_back(&listener); - } - } - - // wait for the downloads to finish - boost::unique_lock dl_lock(m); - s.unlock(); - while (counter > 0) - condvar.wait(dl_lock); - dl_lock.unlock(); - - // remove the entries inserted by this call - sizes->resize(keys.size()); - s.lock(); - for (i = 0; i < keys.size(); i++) - { - if (inserted[i]) - { - (*sizes)[i] = (*iterators[i])->size; - downloads.erase(iterators[i]); - } - else - (*sizes)[i] = 0; - } - s.unlock(); - - // check for errors & propagate - errnos->resize(keys.size()); - char buf[80]; - for (i = 0; i < keys.size(); i++) - { - auto &dl = dls[i]; - (*errnos)[i] = dl->dl_errno; - if (dl->dl_errno != 0) - logger->log(LOG_ERR, "Downloader: failed to download %s, got '%s'", keys[i]->c_str(), strerror_r(dl->dl_errno, buf, 80)); - } -} -#endif - -void Downloader::download(const vector &keys, vector *errnos, vector *sizes) +void Downloader::download(const vector &keys, vector *errnos, vector *sizes, + const bf::path &prefix, boost::mutex *cache_lock) { uint counter = keys.size(); boost::condition condvar; DownloadListener listener(&counter, &condvar); vector > ownedDownloads(keys.size()); - // the caller is holding a mutex + // the caller is holding cache_lock /* if a key is already being downloaded, attach listener to that Download instance. if it is not already being downloaded, make a new Download instance. wait for the listener to tell us that it's done. */ + boost::unique_lock s(lock); for (uint i = 0; i < keys.size(); i++) { - boost::shared_ptr newDL(new Download(*keys[i], downloadPath, lock)); + boost::shared_ptr newDL(new Download(*keys[i], prefix, cache_lock)); + auto it = downloads.find(newDL); // kinda sucks to have to search this way. 
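+        // The 'downloads' set is keyed by the object key, so the lookup needs a fully
+        // constructed Download as a probe; keying a map on the string itself (or using a
+        // transparent comparator) would avoid building this throwaway instance.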
if (it == downloads.end()) { @@ -157,15 +75,17 @@ void Downloader::download(const vector &keys, vector *errno (*it)->listeners.push_back(&listener); } } - + s.unlock(); + // wait for the downloads to finish while (counter > 0) - condvar.wait(*lock); + condvar.wait(*cache_lock); // check success, gather sizes from downloads started by this thread sizes->resize(keys.size()); errnos->resize(keys.size()); - char buf[80]; + char buf[80]; + s.lock(); for (uint i = 0; i < keys.size(); i++) { if (ownedDownloads[i]) @@ -189,19 +109,21 @@ void Downloader::download(const vector &keys, vector *errno bool Downloader::inProgress(const string &key) { boost::shared_ptr tmp(new Download(key)); + boost::unique_lock s(lock); + auto it = downloads.find(tmp); if (it != downloads.end()) return !(*it)->finished; return false; } - -void Downloader::setDownloadPath(const string &path) -{ - downloadPath = path; -} +void Downloader::setDownloadPath(const bf::path &path) +{ + //downloadPath = path; +} + /* The helper fcns */ -Downloader::Download::Download(const string &source, const string &_dlPath, boost::mutex *_lock) : +Downloader::Download::Download(const string &source, const bf::path &_dlPath, boost::mutex *_lock) : dlPath(_dlPath), key(source), dl_errno(0), size(0), lock(_lock), finished(false), itRan(false) { } @@ -222,11 +144,12 @@ void Downloader::Download::operator()() CloudStorage *storage = CloudStorage::get(); // TODO: we should have it download to a tmp path, then mv it to the cache dir to avoid // Cache seeing an incomplete download after a crash + int err = storage->getObject(key, (dlPath / key).string(), &size); if (err != 0) { dl_errno = errno; - boost::filesystem::remove(dlPath / key); + bf::remove(dlPath / key); size = 0; } diff --git a/src/Downloader.h b/src/Downloader.h index 518658de4..b7bf5def7 100644 --- a/src/Downloader.h +++ b/src/Downloader.h @@ -25,15 +25,16 @@ class Downloader // caller owns the memory for the strings. // errors are reported through errnos - void download(const std::vector &keys, std::vector *errnos, std::vector *sizes); - void setDownloadPath(const std::string &path); - void useThisLock(boost::mutex *); - bool inProgress(const std::string &); + void download(const std::vector &keys, + std::vector *errnos, std::vector *sizes, const boost::filesystem::path &prefix, + boost::mutex *lock); + void setDownloadPath(const boost::filesystem::path &path); + bool inProgress(const std::string &); // call this holding the cache's lock private: uint maxDownloads; - std::string downloadPath; - boost::mutex *lock; + //boost::filesystem::path downloadPath; + boost::mutex lock; class DownloadListener { @@ -51,7 +52,7 @@ class Downloader */ struct Download : public ThreadPool::Job { - Download(const std::string &source, const std::string &_dlPath, boost::mutex *_lock); + Download(const std::string &source, const boost::filesystem::path &_dlPath, boost::mutex *); Download(const std::string &source); ~Download(); void operator()(); diff --git a/src/IOCoordinator.cpp b/src/IOCoordinator.cpp index ca6b8a80d..7373a543a 100644 --- a/src/IOCoordinator.cpp +++ b/src/IOCoordinator.cpp @@ -80,11 +80,6 @@ IOCoordinator * IOCoordinator::get() return ioc; } -void IOCoordinator::willRead(const char *, off_t, size_t) -{ - // not sure we will implement this. 
-} - int IOCoordinator::loadObject(int fd, uint8_t *data, off_t offset, size_t length) const { size_t count = 0; @@ -135,7 +130,8 @@ ssize_t IOCoordinator::read(const char *_filename, uint8_t *data, off_t offset, release read lock put together the response in data */ - bf::path p(_filename); + bf::path p = ownership.get(_filename); + const bf::path firstDir = *(p.begin()); const char *filename = p.string().c_str(); ScopedReadLock fileLock(this, filename); @@ -158,7 +154,7 @@ ssize_t IOCoordinator::read(const char *_filename, uint8_t *data, off_t offset, vector keys; for (const auto &object : relevants) keys.push_back(object.key); - cache->read(keys); + cache->read(firstDir, keys); // open the journal files and objects that exist to prevent them from being // deleted mid-operation @@ -169,11 +165,11 @@ ssize_t IOCoordinator::read(const char *_filename, uint8_t *data, off_t offset, // later. not thinking about it for now. // open all of the journal files that exist - string filename = (journalPath/(key + ".journal")).string(); - int fd = ::open(filename.c_str(), O_RDONLY); + string jFilename = (journalPath/firstDir/(key + ".journal")).string(); + int fd = ::open(jFilename.c_str(), O_RDONLY); if (fd >= 0) { - keyToJournalName[key] = filename; + keyToJournalName[key] = jFilename; journalFDs[key] = fd; fdMinders[mindersIndex++].fd = fd; //fdMinders.push_back(SharedCloser(fd)); @@ -182,27 +178,27 @@ ssize_t IOCoordinator::read(const char *_filename, uint8_t *data, off_t offset, { int l_errno = errno; fileLock.unlock(); - cache->doneReading(keys); + cache->doneReading(firstDir, keys); logger->log(LOG_CRIT, "IOCoordinator::read(): Got an unexpected error opening %s, error was '%s'", - filename.c_str(), strerror_r(l_errno, buf, 80)); + jFilename.c_str(), strerror_r(l_errno, buf, 80)); errno = l_errno; return -1; } // open all of the objects - filename = (cachePath/key).string(); - fd = ::open(filename.c_str(), O_RDONLY); + string oFilename = (cachePath/firstDir/key).string(); + fd = ::open(oFilename.c_str(), O_RDONLY); if (fd < 0) { int l_errno = errno; fileLock.unlock(); - cache->doneReading(keys); + cache->doneReading(firstDir, keys); logger->log(LOG_CRIT, "IOCoordinator::read(): Got an unexpected error opening %s, error was '%s'", - filename.c_str(), strerror_r(l_errno, buf, 80)); + oFilename.c_str(), strerror_r(l_errno, buf, 80)); errno = l_errno; return -1; } - keyToObjectName[key] = filename; + keyToObjectName[key] = oFilename; objectFDs[key] = fd; fdMinders[mindersIndex++].fd = fd; //fdMinders.push_back(SharedCloser(fd)); @@ -239,7 +235,7 @@ ssize_t IOCoordinator::read(const char *_filename, uint8_t *data, off_t offset, if (err) { fileLock.unlock(); - cache->doneReading(keys); + cache->doneReading(firstDir, keys); if (count == 0) return -1; else @@ -251,24 +247,26 @@ ssize_t IOCoordinator::read(const char *_filename, uint8_t *data, off_t offset, out: fileLock.unlock(); - cache->doneReading(keys); + cache->doneReading(firstDir, keys); // all done return count; } ssize_t IOCoordinator::write(const char *_filename, const uint8_t *data, off_t offset, size_t length) { - bf::path p(_filename); + bf::path p = ownership.get(_filename); + const bf::path firstDir = *(p.begin()); const char *filename = p.string().c_str(); ScopedWriteLock lock(this, filename); - int ret = _write(filename, data, offset, length); + int ret = _write(filename, data, offset, length, firstDir); lock.unlock(); - cache->doneWriting(); + cache->doneWriting(firstDir); return ret; } -ssize_t IOCoordinator::_write(const char 
*filename, const uint8_t *data, off_t offset, size_t length) +ssize_t IOCoordinator::_write(const char *filename, const uint8_t *data, off_t offset, size_t length, + const bf::path &firstDir) { int err = 0; ssize_t count = 0; @@ -311,7 +309,7 @@ ssize_t IOCoordinator::_write(const char *filename, const uint8_t *data, off_t o } //cache->makeSpace(writeLength+JOURNAL_ENTRY_HEADER_SIZE); - err = replicator->addJournalEntry(i->key.c_str(),&data[count],objectOffset,writeLength); + err = replicator->addJournalEntry((firstDir/i->key).string().c_str(),&data[count],objectOffset,writeLength); assert((uint) err == writeLength); if (err <= 0) @@ -328,9 +326,9 @@ ssize_t IOCoordinator::_write(const char *filename, const uint8_t *data, off_t o if ((writeLength + objectOffset) > i->length) metadata.updateEntryLength(i->offset, (writeLength + objectOffset)); - cache->newJournalEntry(writeLength+JOURNAL_ENTRY_HEADER_SIZE); + cache->newJournalEntry(firstDir, writeLength+JOURNAL_ENTRY_HEADER_SIZE); - synchronizer->newJournalEntry(i->key, writeLength+JOURNAL_ENTRY_HEADER_SIZE); + synchronizer->newJournalEntry(firstDir, i->key, writeLength+JOURNAL_ENTRY_HEADER_SIZE); count += writeLength; dataRemaining -= writeLength; } @@ -364,7 +362,7 @@ ssize_t IOCoordinator::_write(const char *filename, const uint8_t *data, off_t o metadataObject newObject = metadata.addMetadataObject(filename,(writeLength + objectOffset)); // send to replicator - err = replicator->newObject(newObject.key.c_str(),&data[count],objectOffset,writeLength); + err = replicator->newObject((firstDir/newObject.key).string().c_str(),&data[count],objectOffset,writeLength); assert((uint) err == writeLength); if (err <= 0) { @@ -377,13 +375,13 @@ ssize_t IOCoordinator::_write(const char *filename, const uint8_t *data, off_t o //log error and abort } - cache->newObject(newObject.key,writeLength + objectOffset); + cache->newObject(firstDir, newObject.key,writeLength + objectOffset); newObjectKeys.push_back(newObject.key); count += writeLength; dataRemaining -= writeLength; } - synchronizer->newObjects(newObjectKeys); + synchronizer->newObjects(firstDir, newObjectKeys); replicator->updateMetadata(filename, metadata); @@ -392,7 +390,8 @@ ssize_t IOCoordinator::_write(const char *filename, const uint8_t *data, off_t o ssize_t IOCoordinator::append(const char *_filename, const uint8_t *data, size_t length) { - bf::path p(_filename); + bf::path p = ownership.get(_filename); + const bf::path firstDir = *(p.begin()); const char *filename = p.string().c_str(); int err; @@ -430,7 +429,7 @@ ssize_t IOCoordinator::append(const char *_filename, const uint8_t *data, size_t //cache->makeSpace(writeLength+JOURNAL_ENTRY_HEADER_SIZE); - err = replicator->addJournalEntry(i->key.c_str(),&data[count],i->length,writeLength); + err = replicator->addJournalEntry((firstDir/i->key).string().c_str(),&data[count],i->length,writeLength); assert((uint) err == writeLength); if (err <= 0) { @@ -441,9 +440,9 @@ ssize_t IOCoordinator::append(const char *_filename, const uint8_t *data, size_t } metadata.updateEntryLength(i->offset, (writeLength + i->length)); - cache->newJournalEntry(writeLength+JOURNAL_ENTRY_HEADER_SIZE); + cache->newJournalEntry(firstDir, writeLength+JOURNAL_ENTRY_HEADER_SIZE); - synchronizer->newJournalEntry(i->key, writeLength+JOURNAL_ENTRY_HEADER_SIZE); + synchronizer->newJournalEntry(firstDir, i->key, writeLength+JOURNAL_ENTRY_HEADER_SIZE); count += writeLength; dataRemaining -= writeLength; } @@ -466,7 +465,7 @@ ssize_t IOCoordinator::append(const char *_filename, 
const uint8_t *data, size_t
            metadataObject newObject = metadata.addMetadataObject(filename,writeLength);
            // write the new object
-            err = replicator->newObject(newObject.key.c_str(),&data[count],0,writeLength);
+            err = replicator->newObject((firstDir/newObject.key).string().c_str(),&data[count],0,writeLength);
            assert((uint) err == writeLength);
            if (err <= 0)
            {
@@ -477,20 +476,20 @@ ssize_t IOCoordinator::append(const char *_filename, const uint8_t *data, size_t
                goto out;
                //log error and abort
            }
-            cache->newObject(newObject.key,writeLength);
+            cache->newObject(firstDir, newObject.key,writeLength);
            newObjectKeys.push_back(newObject.key);
            count += writeLength;
            dataRemaining -= writeLength;
        }
-    synchronizer->newObjects(newObjectKeys);
+    synchronizer->newObjects(firstDir, newObjectKeys);
    replicator->updateMetadata(filename, metadata);

    // had to add this hack to prevent deadlock
out:
    // need to release the file lock before telling Cache that we're done writing.
    lock.unlock();
-    cache->doneWriting();
+    cache->doneWriting(firstDir);

    return count;
}
@@ -498,26 +497,28 @@
// TODO: might need to support more open flags, ex: O_EXCL
int IOCoordinator::open(const char *_filename, int openmode, struct stat *out)
{
-    bf::path p = _filename;   // normalize it
+    bf::path p = ownership.get(_filename);
    const char *filename = p.string().c_str();

-    ScopedReadLock s(this, filename);
+    boost::scoped_ptr<ScopedFileLock> s;
+
+    if ((openmode & O_CREAT) || (openmode & O_TRUNC))
+        s.reset(new ScopedWriteLock(this, filename));
+    else
+        s.reset(new ScopedReadLock(this, filename));

    MetadataFile meta(filename, MetadataFile::no_create_t());

    if ((openmode & O_CREAT) && !meta.exists())
        replicator->updateMetadata(filename, meta);   // this will end up creating filename

    if ((openmode & O_TRUNC) && meta.exists())
-    {
-        s.unlock();
-        truncate(filename, 0);
-        s.lock();
-    }
+        _truncate(p, 0, s.get());
+
    return meta.stat(out);
}

int IOCoordinator::listDirectory(const char *dirname, vector<string> *listing)
{
-    bf::path p(metaPath / dirname);
+    bf::path p(metaPath / ownership.get(dirname));

    listing->clear();
    if (!bf::exists(p))
@@ -544,7 +545,7 @@ int IOCoordinator::listDirectory(const char *dirname, vector<string> *listing)

int IOCoordinator::stat(const char *_path, struct stat *out)
{
-    bf::path p(_path);
+    bf::path p = ownership.get(_path);
    const char *path = p.string().c_str();

    if (bf::is_directory(metaPath/p))
@@ -556,6 +557,16 @@
}

int IOCoordinator::truncate(const char *_path, size_t newSize)
+{
+    bf::path p = ownership.get(_path);
+    const char *path = p.string().c_str();
+
+    ScopedWriteLock lock(this, path);
+    return _truncate(p, newSize, &lock);
+}
+
+
+int IOCoordinator::_truncate(const bf::path &bfpath, size_t newSize, ScopedFileLock *lock)
{
    /*
        grab the write lock.
@@ -568,13 +579,12 @@ int IOCoordinator::truncate(const char *_path, size_t newSize)
        tell cache they were deleted
        tell synchronizer they were deleted
    */
-    bf::path p(_path);
-    const char *path = p.string().c_str();
+    const bf::path firstDir = *(bfpath.begin());
+    const char *path = bfpath.string().c_str();
    Synchronizer *synchronizer = Synchronizer::get();   // needs to init sync here to break circular dependency...
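+    // _truncate() runs under the write lock taken by its caller (truncate() or open());
+    // the lock is passed in so the grow path can release it before Cache::doneWriting(),
+    // matching the unlock-then-doneWriting ordering that write() uses.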
int err; - ScopedWriteLock lock(this, path); MetadataFile meta(path, MetadataFile::no_create_t()); if (!meta.exists()) { @@ -590,9 +600,9 @@ int IOCoordinator::truncate(const char *_path, size_t newSize) if (filesize < newSize) { uint8_t zero = 0; - err = _write(path, &zero, newSize - 1, 1); - lock.unlock(); - cache->doneWriting(); + err = _write(path, &zero, newSize - 1, 1, firstDir); + lock->unlock(); + cache->doneWriting(firstDir); if (err < 0) return -1; return 0; @@ -620,15 +630,15 @@ int IOCoordinator::truncate(const char *_path, size_t newSize) vector deletedObjects; for (; i < objects.size(); ++i) { - int result = cache->ifExistsThenDelete(objects[i].key); + int result = cache->ifExistsThenDelete(firstDir, objects[i].key); if (result & 0x1) - replicator->remove(cachePath / objects[i].key); + replicator->remove(cachePath/firstDir/objects[i].key); if (result & 0x2) - replicator->remove(journalPath / (objects[i].key + ".journal")); + replicator->remove(journalPath/firstDir/(objects[i].key + ".journal")); deletedObjects.push_back(objects[i].key); } if (!deletedObjects.empty()) - synchronizer->deletedObjects(deletedObjects); + synchronizer->deletedObjects(firstDir, deletedObjects); return 0; } @@ -645,10 +655,11 @@ void IOCoordinator::deleteMetaFile(const bf::path &file) //cout << "deleteMetaFile called on " << file << endl; Synchronizer *synchronizer = Synchronizer::get(); - + // this is kind of ugly. We need to lock on 'file' relative to metaPath, and without the .meta extension - string pita = file.string().substr(metaPath.string().length()); // get rid of metapath + string pita = file.string().substr(metaPath.string().length() + 1); // get rid of metapath pita = pita.substr(0, pita.length() - 5); // get rid of the extension + const bf::path firstDir = *(bf::path(pita).begin()); ScopedWriteLock lock(this, pita); //cout << "file is " << file.string() << " locked on " << pita << endl; @@ -661,14 +672,14 @@ void IOCoordinator::deleteMetaFile(const bf::path &file) for (auto &object : objects) { //cout << "deleting " << object.key << endl; - int result = cache->ifExistsThenDelete(object.key); + int result = cache->ifExistsThenDelete(firstDir, object.key); if (result & 0x1) - replicator->remove(cachePath/object.key); + replicator->remove(cachePath/firstDir/object.key); if (result & 0x2) - replicator->remove(journalPath/(object.key + ".journal")); + replicator->remove(journalPath/firstDir/(object.key + ".journal")); deletedObjects.push_back(object.key); } - synchronizer->deletedObjects(deletedObjects); + synchronizer->deletedObjects(firstDir, deletedObjects); } void IOCoordinator::remove(const bf::path &p) @@ -718,7 +729,7 @@ int IOCoordinator::unlink(const char *path) /* TODO! We need to make sure the input params to IOC fcns don't go up to parent dirs, ex, if path = '../../../blahblah'. 
*/ - bf::path p(metaPath/path); + bf::path p(metaPath/ownership.get(path)); try { @@ -764,8 +775,10 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2) write the new metadata object */ - const bf::path p1(_filename1); - const bf::path p2(_filename2); + const bf::path p1 = ownership.get(_filename1); + const bf::path p2 = ownership.get(_filename2); + const bf::path firstDir1 = *(p1.begin()); + const bf::path firstDir2 = *(p2.begin()); const char *filename1 = p1.string().c_str(); const char *filename2 = p2.string().c_str(); @@ -815,7 +828,7 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2) { for (const auto &object : objects) { - bf::path journalFile = journalPath/(object.key + ".journal"); + bf::path journalFile = journalPath/firstDir1/(object.key + ".journal"); metadataObject newObj = meta2.addMetadataObject(filename2, object.length); assert(newObj.offset == object.offset); // TODO: failure here makes a lot of noise in the log file @@ -826,7 +839,7 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2) if (errno == ENOENT) { // it's not in cloudstorage, see if it's in the cache - bf::path cachedObjPath = cachePath/object.key; + bf::path cachedObjPath = cachePath/firstDir1/object.key; bool objExists = bf::exists(cachedObjPath); if (!objExists) throw CFException(ENOENT, string("IOCoordinator::copyFile(): source = ") + filename1 + @@ -849,12 +862,12 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2) // if there's a journal file for this object, make a copy if (bf::exists(journalFile)) { - bf::path newJournalFile = journalPath/(newObj.key + ".journal"); + bf::path newJournalFile = journalPath/firstDir2/(newObj.key + ".journal"); try { bf::copy_file(journalFile, newJournalFile); size_t tmp = bf::file_size(newJournalFile); - cache->newJournalEntry(tmp); + cache->newJournalEntry(firstDir2, tmp); newJournalEntries.push_back(pair(newObj.key, tmp)); } catch (bf::filesystem_error &e) @@ -873,8 +886,8 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2) cs->deleteObject(newObject.key); for (auto &jEntry : newJournalEntries) { - bf::path fullJournalPath = journalPath/(jEntry.first + ".journal"); - cache->deletedJournal(bf::file_size(fullJournalPath)); + bf::path fullJournalPath = journalPath/firstDir2/(jEntry.first + ".journal"); + cache->deletedJournal(firstDir2, bf::file_size(fullJournalPath)); bf::remove(fullJournalPath); } errno = e.l_errno; @@ -885,7 +898,7 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2) lock2.unlock(); for (auto &jEntry : newJournalEntries) - sync->newJournalEntry(jEntry.first, jEntry.second); + sync->newJournalEntry(firstDir2, jEntry.first, jEntry.second); return 0; } @@ -1147,18 +1160,12 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array &objData, size return 0; } -void IOCoordinator::renameObject(const string &oldKey, const string &newKey) -{ - // does anything need to be done here? 
-} - - void IOCoordinator::readLock(const string &filename) { boost::unique_lock s(lockMutex); //cout << "read-locking " << filename << endl; - //assert(filename[0] == '/'); + assert(filename[0] != '/'); auto ins = locks.insert(pair(filename, NULL)); if (ins.second) ins.first->second = new RWLock(); @@ -1183,7 +1190,7 @@ void IOCoordinator::writeLock(const string &filename) boost::unique_lock s(lockMutex); //cout << "write-locking " << filename << endl; - //assert(filename[0] == '/'); + assert(filename[0] != '/'); auto ins = locks.insert(pair(filename, NULL)); if (ins.second) ins.first->second = new RWLock(); diff --git a/src/IOCoordinator.h b/src/IOCoordinator.h index c74376a5a..b6db5dca4 100644 --- a/src/IOCoordinator.h +++ b/src/IOCoordinator.h @@ -18,6 +18,7 @@ #include "RWLock.h" #include "Replicator.h" #include "Utilities.h" +#include "Ownership.h" namespace storagemanager { @@ -30,7 +31,6 @@ class IOCoordinator : public boost::noncopyable static IOCoordinator *get(); virtual ~IOCoordinator(); - void willRead(const char *filename, off_t offset, size_t length); /* TODO: make read, write, append return a ssize_t */ ssize_t read(const char *filename, uint8_t *data, off_t offset, size_t length); ssize_t write(const char *filename, const uint8_t *data, off_t offset, size_t length); @@ -58,7 +58,6 @@ class IOCoordinator : public boost::noncopyable /* Lock manipulation fcns. They can lock on any param given to them. For convention's sake, the parameter should mostly be the abs filename being accessed. */ - void renameObject(const std::string &oldKey, const std::string &newKey); void readLock(const std::string &filename); void writeLock(const std::string &filename); void readUnlock(const std::string &filename); @@ -75,6 +74,7 @@ class IOCoordinator : public boost::noncopyable Cache *cache; SMLogging *logger; Replicator *replicator; + Ownership ownership; // ACK! Need a new name for this! size_t objectSize; boost::filesystem::path journalPath; @@ -87,7 +87,9 @@ class IOCoordinator : public boost::noncopyable void remove(const boost::filesystem::path &path); void deleteMetaFile(const boost::filesystem::path &file); - ssize_t _write(const char *filename, const uint8_t *data, off_t offset, size_t length); + int _truncate(const boost::filesystem::path &path, size_t newsize, ScopedFileLock *lock); + ssize_t _write(const char *filename, const uint8_t *data, off_t offset, size_t length, + const boost::filesystem::path &firstDir); int loadObjectAndJournal(const char *objFilename, const char *journalFilename, uint8_t *data, off_t offset, size_t length) const; diff --git a/src/MetadataFile.cpp b/src/MetadataFile.cpp index 502a94e0e..a0f8977af 100644 --- a/src/MetadataFile.cpp +++ b/src/MetadataFile.cpp @@ -49,7 +49,7 @@ MetadataFile::MetadataConfig::MetadataConfig() catch (...) 
    {
        logger->log(LOG_CRIT, "ObjectStorage/object_size must be set to a numeric value");
-        throw runtime_error("Please set ObjectStorage/journal_path in the storagemanager.cnf file");
+        throw runtime_error("Please set ObjectStorage/object_size in the storagemanager.cnf file");
    }

    try
@@ -57,8 +57,8 @@ MetadataFile::MetadataConfig::MetadataConfig()
        msMetadataPath = config->getValue("ObjectStorage", "metadata_path");
        if (msMetadataPath.empty())
        {
-            logger->log(LOG_CRIT, "ObjectStorage/journal_path is not set");
-            throw runtime_error("Please set ObjectStorage/journal_path in the storagemanager.cnf file");
+            logger->log(LOG_CRIT, "ObjectStorage/metadata_path is not set");
+            throw runtime_error("Please set ObjectStorage/metadata_path in the storagemanager.cnf file");
        }
    }
    catch (...)
diff --git a/src/Ownership.cpp b/src/Ownership.cpp
new file mode 100644
index 000000000..dfa3a8e3f
--- /dev/null
+++ b/src/Ownership.cpp
@@ -0,0 +1,303 @@
+#include "Ownership.h"
+#include "Config.h"
+#include "Cache.h"
+#include "Synchronizer.h"
+#include
+#include
+#include
+#include
+#include
+
+using namespace std;
+namespace bf=boost::filesystem;
+
+namespace storagemanager
+{
+
+Ownership::Ownership()
+{
+    Config *config = Config::get();
+    logger = SMLogging::get();
+
+    string sPrefixDepth = config->getValue("ObjectStorage", "common_prefix_depth");
+    if (sPrefixDepth.empty())
+    {
+        const char *msg = "Ownership: Need to specify ObjectStorage/common_prefix_depth in the storagemanager.cnf file";
+        logger->log(LOG_CRIT, msg);
+        throw runtime_error(msg);
+    }
+    try
+    {
+        prefixDepth = stoul(sPrefixDepth, NULL, 0);
+    }
+    catch (invalid_argument &e)
+    {
+        const char *msg = "Ownership: Invalid value in ObjectStorage/common_prefix_depth";
+        logger->log(LOG_CRIT, msg);
+        throw runtime_error(msg);
+    }
+
+    metadataPrefix = config->getValue("ObjectStorage", "metadata_path");
+    if (metadataPrefix.empty())
+    {
+        const char *msg = "Ownership: Need to specify ObjectStorage/metadata_path in the storagemanager.cnf file";
+        logger->log(LOG_CRIT, msg);
+        throw runtime_error(msg);
+    }
+    monitor = new Monitor(this);
+}
+
+Ownership::~Ownership()
+{
+    delete monitor;
+    for (auto &it : ownedPrefixes)
+        releaseOwnership(it.first, true);
+}
+
+bf::path Ownership::get(const bf::path &p)
+{
+    bf::path ret, prefix;
+    bf::path::const_iterator pit;
+    uint i;
+
+    //cerr << "Ownership::get() param = " << p.string() << endl;
+    if (prefixDepth > 0)
+    {
+        for (i = 0, pit = p.begin(); i <= prefixDepth && pit != p.end(); ++i, ++pit)
+            ;
+        if (pit != p.end())
+            prefix = *pit;
+        //cerr << "prefix is " << prefix.string() << endl;
+        for (; pit != p.end(); ++pit)
+            ret /= *pit;
+        if (ret.empty())
+        {
+            //cerr << "returning ''" << endl;
+            return ret;
+        }
+    }
+    else
+    {
+        ret = p;
+        prefix = *(p.begin());
+    }
+
+    mutex.lock();
+    if (ownedPrefixes.find(prefix) == ownedPrefixes.end())
+    {
+        mutex.unlock();
+        takeOwnership(prefix);
+    }
+    else
+    {
+        // todo... replace this polling, and the similar polling in Cache, with proper condition vars.
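+        // A hypothetical condition-variable replacement for this wait (prefixReady is
+        // illustrative only, not part of this patch):
+        //     boost::unique_lock<boost::mutex> l(mutex);
+        //     while (!ownedPrefixes[prefix])
+        //         prefixReady.wait(l);
+        // with a prefixReady.notify_all() at the end of _takeOwnership().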
+ while (ownedPrefixes[prefix] == false) + { + mutex.unlock(); + sleep(1); + mutex.lock(); + } + mutex.unlock(); + } + //cerr << "returning " << ret.string() << endl; + return ret; +} + +// minor timesaver +#define TOUCH(p, f) { \ + int fd = ::open((metadataPrefix/p/f).string().c_str(), O_TRUNC | O_CREAT | O_WRONLY, 0660); \ + if (fd >= 0) \ + ::close(fd); \ + else \ + { \ + char buf[80]; int saved_errno = errno; \ + cerr << "failed to touch " << metadataPrefix/p/f << " got " << strerror_r(saved_errno, buf, 80) << endl; \ + } \ +} + +#define DELETE(p, f) ::unlink((metadataPrefix/p/f).string().c_str()); + +void Ownership::touchFlushing(const bf::path &prefix, volatile bool *doneFlushing) const +{ + while (!*doneFlushing) + { + TOUCH(prefix, "FLUSHING"); + try + { + boost::this_thread::sleep_for(boost::chrono::seconds(1)); + } + catch (boost::thread_interrupted &) + { } + } +} + +void Ownership::releaseOwnership(const bf::path &p, bool isDtor) +{ + logger->log(LOG_DEBUG, "Ownership: releasing ownership of %s", p.string().c_str()); + boost::unique_lock s(mutex); + + auto it = ownedPrefixes.find(p); + if (it == ownedPrefixes.end()) + { + logger->log(LOG_DEBUG, "Ownership::releaseOwnership(): told to disown %s, but do not own it", p.string().c_str()); + return; + } + ownedPrefixes.erase(it); + + if (isDtor) + { + // This is a quick release. If this is being destroyed, then it is through the graceful + // shutdown mechanism, which will flush data separately. + DELETE(p, "OWNED"); + DELETE(p, "FLUSHING"); + return; + } + + s.unlock(); + + volatile bool done = false; + + // start flushing + boost::thread xfer([this, &p, &done] { this->touchFlushing(p, &done); }); + Cache::get()->dropPrefix(p); + Synchronizer::get()->dropPrefix(p); + done = true; + xfer.interrupt(); + xfer.join(); + + // update state + DELETE(p, "OWNED"); + DELETE(p, "FLUSHING"); +} + +void Ownership::_takeOwnership(const bf::path &p) +{ + logger->log(LOG_DEBUG, "Ownership: taking ownership of %s", p.string().c_str()); + bf::create_directories(metadataPrefix/p); + DELETE(p, "FLUSHING"); + DELETE(p, "REQUEST_TRANSFER"); + // TODO: need to consider errors taking ownership + TOUCH(p, "OWNED"); + mutex.lock(); + ownedPrefixes[p] = true; + mutex.unlock(); + Synchronizer::get()->newPrefix(p); + Cache::get()->newPrefix(p); +} + +void Ownership::takeOwnership(const bf::path &p) +{ + boost::unique_lock s(mutex); + + auto it = ownedPrefixes.find(p); + if (it != ownedPrefixes.end()) + return; + ownedPrefixes[p] = NULL; + s.unlock(); + + bool okToTransfer = false; + struct stat statbuf; + int err; + char buf[80]; + bf::path ownedPath = metadataPrefix/p/"OWNED"; + bf::path flushingPath = metadataPrefix/p/"FLUSHING"; + + // if it's not already owned, then we can take possession + err = ::stat(ownedPath.string().c_str(), &statbuf); + if (err && errno == ENOENT) + { + _takeOwnership(p); + return; + } + + TOUCH(p, "REQUEST_TRANSFER"); + time_t lastFlushTime = time(NULL); + while (!okToTransfer && time(NULL) < lastFlushTime + 10) + { + // if the OWNED file is deleted or if the flushing file isn't touched after 10 secs + // it is ok to take possession. 
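+        // Handshake recap: REQUEST_TRANSFER was touched above; the current owner's Monitor
+        // sees it and begins flushing, touching FLUSHING once a second while it works. When
+        // the owner deletes OWNED we take over; if FLUSHING goes stale for 10 seconds we
+        // assume the owner died and take over anyway.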
+        err = ::stat(ownedPath.string().c_str(), &statbuf);
+        if (err)
+        {
+            if (errno == ENOENT)
+                okToTransfer = true;
+            else
+                logger->log(LOG_CRIT, "Ownership::takeOwnership(): got '%s' doing stat of %s", strerror_r(errno, buf, 80),
+                    ownedPath.string().c_str());
+        }
+        err = ::stat(flushingPath.string().c_str(), &statbuf);
+        if (err && errno != ENOENT)
+            logger->log(LOG_CRIT, "Ownership::takeOwnership(): got '%s' doing stat of %s", strerror_r(errno, buf, 80),
+                flushingPath.string().c_str());
+        else
+        {
+            logger->log(LOG_DEBUG, "Ownership: waiting to get %s", p.string().c_str());
+            if (!err)
+                lastFlushTime = statbuf.st_mtime;
+        }
+        if (!okToTransfer)
+            sleep(1);
+    }
+    _takeOwnership(p);
+}
+
+Ownership::Monitor::Monitor(Ownership *_owner) : owner(_owner), stop(false)
+{
+    thread = boost::thread([this] { this->watchForInterlopers(); });
+}
+
+Ownership::Monitor::~Monitor()
+{
+    stop = true;
+    thread.interrupt();
+    thread.join();
+}
+
+void Ownership::Monitor::watchForInterlopers()
+{
+    // look for requests to transfer ownership
+    struct stat statbuf;
+    int err;
+    char buf[80];
+    vector<bf::path> releaseList;
+
+    while (!stop)
+    {
+        releaseList.clear();
+        boost::unique_lock<boost::mutex> s(owner->mutex);
+
+        for (auto &prefix : owner->ownedPrefixes)
+        {
+            if (stop)
+                break;
+            if (prefix.second == false)
+                continue;
+            bf::path p(owner->metadataPrefix/(prefix.first)/"REQUEST_TRANSFER");
+            string ps = p.string();     // keep the string alive while cp points into it
+            const char *cp = ps.c_str();
+
+            err = ::stat(cp, &statbuf);
+            // release it if there's a release request only.  Log it if there's an error other than
+            // that the file isn't there.
+            if (err == 0)
+                releaseList.push_back(prefix.first);
+            if (err < 0 && errno != ENOENT)
+                owner->logger->log(LOG_ERR, "Ownership::Monitor::watchForInterlopers(): failed to stat %s, got %s", cp,
+                    strerror_r(errno, buf, 80));
+        }
+        s.unlock();
+
+        for (auto &prefix : releaseList)
+            owner->releaseOwnership(prefix);
+        if (stop)
+            break;
+        try
+        {
+            boost::this_thread::sleep_for(boost::chrono::seconds(1));
+        }
+        catch (boost::thread_interrupted &)
+        { }
+    }
+}
+
+}
+
diff --git a/src/Ownership.h b/src/Ownership.h
new file mode 100644
index 000000000..e4c389506
--- /dev/null
+++ b/src/Ownership.h
@@ -0,0 +1,60 @@
+#ifndef OWNERSHIP_H_
+#define OWNERSHIP_H_
+
+#include <map>
+#include <boost/filesystem.hpp>
+#include <boost/thread.hpp>
+#include "SMLogging.h"
+
+/* This class tracks the ownership of each prefix and manages ownership transfer.
+   Could we come up with a better name btw? */
+
+namespace storagemanager
+{
+
+class Ownership : public boost::noncopyable
+{
+    public:
+        Ownership();
+        ~Ownership();
+
+        bool sharedFS();
+        // returns the path "right shifted" by prefixDepth, and with ownership of that path.
+        // on error it returns an empty path.
+        boost::filesystem::path get(const boost::filesystem::path &);
+
+    private:
+        int prefixDepth;   // signed: -1 = no filesystem shared between SM instances
+        boost::filesystem::path metadataPrefix;
+        SMLogging *logger;
+
+        void touchFlushing(const boost::filesystem::path &, volatile bool *) const;
+        void takeOwnership(const boost::filesystem::path &);
+        void releaseOwnership(const boost::filesystem::path &, bool isDtor = false);
+        void _takeOwnership(const boost::filesystem::path &);
+
+        struct Monitor
+        {
+            Monitor(Ownership *);
+            ~Monitor();
+            boost::thread thread;
+            Ownership *owner;
+            volatile bool stop;
+            void watchForInterlopers();
+        };
+
+        // maps a prefix to a state.  ownedPrefixes[p] == false means it's being init'd, == true means
+        // it's ready for use.
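To make the "right shifted" description of get() above concrete: with common_prefix_depth = 4 and paths shaped like the examples in storagemanager.cnf (the file name below is hypothetical), the leading "/" plus the four common elements are skipped, the next element becomes the owned prefix, and the returned path starts at that element:

    Ownership own;
    // skips "/", "usr", "local", "mariadb", "columnstore" (i = 0..4),
    // takes ownership of the prefix "data1", and returns the remainder
    // starting at that element:
    boost::filesystem::path rel = own.get("/usr/local/mariadb/columnstore/data1/000.dir/FILE000");
    // rel == "data1/000.dir/FILE000"
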
+        std::map<boost::filesystem::path, bool> ownedPrefixes;
+        Monitor *monitor;
+        boost::mutex mutex;
+};
+
+inline bool Ownership::sharedFS()
+{
+    return prefixDepth >= 0;
+}
+
+}
+
+#endif
diff --git a/src/PrefixCache.cpp b/src/PrefixCache.cpp
new file mode 100644
index 000000000..88ba1178a
--- /dev/null
+++ b/src/PrefixCache.cpp
@@ -0,0 +1,631 @@
+
+#include "PrefixCache.h"
+#include "Cache.h"
+#include "Config.h"
+#include "Downloader.h"
+#include "Synchronizer.h"
+#include <errno.h>
+#include <string.h>
+#include <unistd.h>
+#include <cassert>
+#include <iostream>
+#include <boost/thread.hpp>
+#include <boost/filesystem.hpp>
+
+using namespace std;
+namespace bf = boost::filesystem;
+
+namespace storagemanager
+{
+
+PrefixCache::PrefixCache(const bf::path &prefix) : firstDir(prefix), currentCacheSize(0)
+{
+    Config *conf = Config::get();
+    logger = SMLogging::get();
+    replicator = Replicator::get();
+    downloader = Cache::get()->getDownloader();
+
+    string stmp = conf->getValue("Cache", "cache_size");
+    if (stmp.empty())
+    {
+        logger->log(LOG_CRIT, "Cache/cache_size is not set");
+        throw runtime_error("Please set Cache/cache_size in the storagemanager.cnf file");
+    }
+    try
+    {
+        maxCacheSize = stoul(stmp);
+    }
+    catch (invalid_argument &)
+    {
+        logger->log(LOG_CRIT, "Cache/cache_size is not a number");
+        throw runtime_error("Please set Cache/cache_size to a number");
+    }
+    //cout << "Cache got cache size " << maxCacheSize << endl;
+
+    stmp = conf->getValue("ObjectStorage", "object_size");
+    if (stmp.empty())
+    {
+        logger->log(LOG_CRIT, "ObjectStorage/object_size is not set");
+        throw runtime_error("Please set ObjectStorage/object_size in the storagemanager.cnf file");
+    }
+    try
+    {
+        objectSize = stoul(stmp);
+    }
+    catch (invalid_argument &)
+    {
+        logger->log(LOG_CRIT, "ObjectStorage/object_size is not a number");
+        throw runtime_error("Please set ObjectStorage/object_size to a number");
+    }
+
+    cachePrefix = conf->getValue("Cache", "path");
+    if (cachePrefix.empty())
+    {
+        logger->log(LOG_CRIT, "Cache/path is not set");
+        throw runtime_error("Please set Cache/path in the storagemanager.cnf file");
+    }
+    cachePrefix /= firstDir;
+
+    try
+    {
+        bf::create_directories(cachePrefix);
+    }
+    catch (exception &e)
+    {
+        logger->log(LOG_CRIT, "Failed to create %s, got: %s", cachePrefix.string().c_str(), e.what());
+        throw e;
+    }
+
+    stmp = conf->getValue("ObjectStorage", "journal_path");
+    if (stmp.empty())
+    {
+        logger->log(LOG_CRIT, "ObjectStorage/journal_path is not set");
+        throw runtime_error("Please set ObjectStorage/journal_path in the storagemanager.cnf file");
+    }
+    journalPrefix = stmp;
+    journalPrefix /= firstDir;
+
+    try
+    {
+        bf::create_directories(journalPrefix);
+    }
+    catch (exception &e)
+    {
+        logger->log(LOG_CRIT, "Failed to create %s, got: %s", journalPrefix.string().c_str(), e.what());
+        throw e;
+    }
+
+    lru_mutex.lock();   // unlocked by populate() when it's done
+    boost::thread t([this] { this->populate(); });
+    t.detach();
+}
+
+PrefixCache::~PrefixCache()
+{
+    /* This and shutdown() need to do whatever is necessary to leave cache contents in a safe
+       state on disk.  Does anything need to be done toward that?
+    */
+}
+
+void PrefixCache::populate()
+{
+    Synchronizer *sync = Synchronizer::get();
+    bf::directory_iterator dir(cachePrefix);
+    bf::directory_iterator dend;
+    vector<string> newObjects;
+    while (dir != dend)
+    {
+        // put everything in lru & m_lru
+        const bf::path &p = dir->path();
+        if (bf::is_regular_file(p))
+        {
+            lru.push_back(p.filename().string());
+            auto last = lru.end();
+            m_lru.insert(--last);
+            currentCacheSize += bf::file_size(*dir);
+            newObjects.push_back(p.filename().string());
+        }
+        else
+            logger->log(LOG_WARNING, "Cache: found something in the cache that does not belong '%s'", p.string().c_str());
+        ++dir;
+    }
+    sync->newObjects(firstDir, newObjects);
+    newObjects.clear();
+
+    // account for what's in the journal dir
+    vector<pair<string, size_t>> newJournals;
+    dir = bf::directory_iterator(journalPrefix);
+    while (dir != dend)
+    {
+        const bf::path &p = dir->path();
+        if (bf::is_regular_file(p))
+        {
+            if (p.extension() == ".journal")
+            {
+                size_t s = bf::file_size(*dir);
+                currentCacheSize += s;
+                newJournals.push_back(pair<string, size_t>(p.stem().string(), s));
+            }
+            else
+                logger->log(LOG_WARNING, "Cache: found a file in the journal dir that does not belong '%s'", p.string().c_str());
+        }
+        else
+            logger->log(LOG_WARNING, "Cache: found something in the journal dir that does not belong '%s'", p.string().c_str());
+        ++dir;
+    }
+    lru_mutex.unlock();
+    sync->newJournalEntries(firstDir, newJournals);
+}
+
+// be careful using this!  SM should be idle.  No ongoing reads or writes.
+void PrefixCache::validateCacheSize()
+{
+    boost::unique_lock<boost::mutex> s(lru_mutex);
+
+    if (!doNotEvict.empty() || !toBeDeleted.empty())
+    {
+        cout << "Not safe to use validateCacheSize() at the moment." << endl;
+        return;
+    }
+
+    size_t oldSize = currentCacheSize;
+    currentCacheSize = 0;
+    m_lru.clear();
+    lru.clear();
+    populate();
+    s.release();   // populate() unlocked lru_mutex already; don't unlock it a second time here
+
+    if (oldSize != currentCacheSize)
+        logger->log(LOG_DEBUG, "PrefixCache::validateCacheSize(): found a discrepancy.  Actual size is %zu, had %zu.",
+            currentCacheSize, oldSize);
+    else
+        logger->log(LOG_DEBUG, "PrefixCache::validateCacheSize(): Cache size accounting agrees with reality for now.");
+}
+
+void PrefixCache::read(const vector<string> &keys)
+{
+    /* Move existing keys to the back of the LRU, start downloading nonexistent keys.
+    */
+    vector<const string *> keysToFetch;
+    vector<int> dlErrnos;
+    vector<size_t> dlSizes;
+
+    boost::unique_lock<boost::mutex> s(lru_mutex);
+
+    M_LRU_t::iterator mit;
+    for (const string &key : keys)
+    {
+        mit = m_lru.find(key);
+        if (mit != m_lru.end())
+        {
+            addToDNE(mit->lit);
+            lru.splice(lru.end(), lru, mit->lit);   // move them to the back so they are last to pick for eviction
+        }
+        else
+        {
+            // There's a window where the file has been downloaded but is not yet
+            // added to the lru structs.  However it is in the DNE.  If it is in the DNE, then it is also
+            // in Downloader's map.  So, this thread needs to start the download if it's not in the
+            // DNE or if there's an existing download that hasn't finished yet.  Starting the download
+            // includes waiting for an existing download to finish, which from this class's pov is the
+            // same thing.
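Stepping back from the race detail above, the caller-side contract for this path is pin, read, unpin; a minimal sketch, assuming a PrefixCache instance pc, hypothetical object keys, and that the caller already holds the relevant 'logical file' lock:

    std::vector<std::string> keys = {"12345", "12346"};   // hypothetical keys
    pc.read(keys);          // downloads whatever is missing, pins every key in doNotEvict
    // ... read the cached files under pc.getCachePath() ...
    pc.doneReading(keys);   // drops the DNE refcounts; eviction may consider them again
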
+            if (doNotEvict.find(key) == doNotEvict.end() || downloader->inProgress(key))
+                keysToFetch.push_back(&key);
+            else
+                cout << "Cache: detected and stopped a racy download" << endl;
+            addToDNE(key);
+        }
+    }
+    if (keysToFetch.empty())
+        return;
+
+    downloader->download(keysToFetch, &dlErrnos, &dlSizes, cachePrefix, &lru_mutex);
+
+    size_t sum_sizes = 0;
+    for (uint i = 0; i < keysToFetch.size(); ++i)
+    {
+        // downloads with size 0 didn't actually happen, either because it
+        // was a preexisting download (another read() call owns it), or because
+        // there was an error downloading it.  Use size == 0 as an indication of
+        // what to add to the cache.  Also needs to verify that the file was not deleted,
+        // indicated by existence in doNotEvict.
+        if (dlSizes[i] != 0)
+        {
+            if (doNotEvict.find(*keysToFetch[i]) != doNotEvict.end())
+            {
+                sum_sizes += dlSizes[i];
+                lru.push_back(*keysToFetch[i]);
+                LRU_t::iterator lit = lru.end();
+                m_lru.insert(--lit);   // I dislike this way of grabbing the last iterator in a list.
+            }
+            else   // it was downloaded, but a deletion happened so we have to toss it
+            {
+                cout << "removing a file that was deleted by another thread during download" << endl;
+                bf::remove(cachePrefix / (*keysToFetch[i]));
+            }
+        }
+    }
+
+    // move everything in keys to the back of the lru (yes, again)
+    for (const string &key : keys)
+    {
+        mit = m_lru.find(key);
+        if (mit != m_lru.end())   // all of the files exist, just not all of them are 'owned by' this thread.
+            lru.splice(lru.end(), lru, mit->lit);
+    }
+
+    // fix cache size
+    //_makeSpace(sum_sizes);
+    currentCacheSize += sum_sizes;
+}
+
+void PrefixCache::doneReading(const vector<string> &keys)
+{
+    boost::unique_lock<boost::mutex> s(lru_mutex);
+    for (const string &key : keys)
+    {
+        removeFromDNE(key);
+        // most should be in the map.
+        // debatable whether it's faster to look up the list iterator and use it
+        // or whether it's faster to bypass that and use strings only.
+
+        //const auto &it = m_lru.find(key);
+        //if (it != m_lru.end())
+        //    removeFromDNE(it->lit);
+    }
+    _makeSpace(0);
+}
+
+void PrefixCache::doneWriting()
+{
+    makeSpace(0);
+}
+
+PrefixCache::DNEElement::DNEElement(const LRU_t::iterator &k) : key(k), refCount(1)
+{
+}
+PrefixCache::DNEElement::DNEElement(const string &k) : sKey(k), refCount(1)
+{
+}
+
+void PrefixCache::addToDNE(const DNEElement &key)
+{
+    DNE_t::iterator it = doNotEvict.find(key);
+    if (it != doNotEvict.end())
+    {
+        DNEElement &dnee = const_cast<DNEElement &>(*it);
+        ++(dnee.refCount);
+    }
+    else
+        doNotEvict.insert(key);
+}
+
+void PrefixCache::removeFromDNE(const DNEElement &key)
+{
+    DNE_t::iterator it = doNotEvict.find(key);
+    if (it == doNotEvict.end())
+        return;
+    DNEElement &dnee = const_cast<DNEElement &>(*it);
+    if (--(dnee.refCount) == 0)
+        doNotEvict.erase(it);
+}
+
+const bf::path & PrefixCache::getCachePath()
+{
+    return cachePrefix;
+}
+
+const bf::path & PrefixCache::getJournalPath()
+{
+    return journalPrefix;
+}
+
+void PrefixCache::exists(const vector<string> &keys, vector<bool> *out) const
+{
+    out->resize(keys.size());
+    boost::unique_lock<boost::mutex> s(lru_mutex);
+    for (uint i = 0; i < keys.size(); i++)
+        (*out)[i] = (m_lru.find(keys[i]) != m_lru.end());
+}
+
+bool PrefixCache::exists(const string &key) const
+{
+    boost::unique_lock<boost::mutex> s(lru_mutex);
+    return m_lru.find(key) != m_lru.end();
+}
+
+void PrefixCache::newObject(const string &key, size_t size)
+{
+    boost::unique_lock<boost::mutex> s(lru_mutex);
+    assert(m_lru.find(key) == m_lru.end());
+    //_makeSpace(size);
+    lru.push_back(key);
+    LRU_t::iterator back = lru.end();
+    m_lru.insert(--back);
+    currentCacheSize += size;
+}
+
+void PrefixCache::newJournalEntry(size_t size)
+{
+    boost::unique_lock<boost::mutex> s(lru_mutex);
+    //_makeSpace(size);
+    currentCacheSize += size;
+}
+
+void PrefixCache::deletedJournal(size_t size)
+{
+    boost::unique_lock<boost::mutex> s(lru_mutex);
+    assert(currentCacheSize >= size);
+    currentCacheSize -= size;
+}
+
+void PrefixCache::deletedObject(const string &key, size_t size)
+{
+    boost::unique_lock<boost::mutex> s(lru_mutex);
+    assert(currentCacheSize >= size);
+    M_LRU_t::iterator mit = m_lru.find(key);
+    assert(mit != m_lru.end());
+
+    // if it's being flushed, let makeSpace() do the deleting
+    if (toBeDeleted.find(mit->lit) == toBeDeleted.end())
+    {
+        doNotEvict.erase(mit->lit);
+        lru.erase(mit->lit);
+        m_lru.erase(mit);
+        currentCacheSize -= size;
+    }
+}
+
+void PrefixCache::setMaxCacheSize(size_t size)
+{
+    boost::unique_lock<boost::mutex> s(lru_mutex);
+    if (size < maxCacheSize)
+        _makeSpace(maxCacheSize - size);
+    maxCacheSize = size;
+}
+
+void PrefixCache::makeSpace(size_t size)
+{
+    boost::unique_lock<boost::mutex> s(lru_mutex);
+    _makeSpace(size);
+}
+
+size_t PrefixCache::getMaxCacheSize() const
+{
+    return maxCacheSize;
+}
+
+// call this holding lru_mutex
+void PrefixCache::_makeSpace(size_t size)
+{
+    ssize_t thisMuch = currentCacheSize + size - maxCacheSize;
+    if (thisMuch <= 0)
+        return;
+
+    LRU_t::iterator it;
+    while (thisMuch > 0 && !lru.empty())
+    {
+        it = lru.begin();
+        // find the first element not being either read() right now or being processed by another
+        // makeSpace() call.
+        while (it != lru.end())
+        {
+            // make sure it's not currently being read or being flushed by another _makeSpace() call
+            if ((doNotEvict.find(it) == doNotEvict.end()) && (toBeDeleted.find(it) == toBeDeleted.end()))
+                break;
+            ++it;
+        }
+        if (it == lru.end())
+        {
+            // nothing can be deleted right now
+            return;
+        }
+
+        if (!bf::exists(cachePrefix / *it))
+            cout << cachePrefix / *it << " doesn't exist, WTF?" << endl;   // ran into this a couple times, still happens as of commit 948ee1aa5
+        assert(bf::exists(cachePrefix / *it));
+        /*
+            tell Synchronizer that this key will be evicted
+            delete the file
+            remove it from our structs
+            update current size
+        */
+
+        //logger->log(LOG_WARNING, "Cache: flushing!");
+        toBeDeleted.insert(it);
+
+        string key = *it;   // need to make a copy; it could get changed after unlocking.
+
+        lru_mutex.unlock();
+        try
+        {
+            Synchronizer::get()->flushObject(firstDir, key);
+        }
+        catch (...)
+        {
+            // it gets logged by Sync
+            lru_mutex.lock();
+            toBeDeleted.erase(it);
+            continue;
+        }
+        lru_mutex.lock();
+
+        // check doNotEvict again in case this object is now being read
+        if (doNotEvict.find(it) == doNotEvict.end())
+        {
+            bf::path cachedFile = cachePrefix / *it;
+            m_lru.erase(*it);
+            toBeDeleted.erase(it);
+            lru.erase(it);
+            size_t newSize = bf::file_size(cachedFile);
+            replicator->remove(cachedFile, Replicator::LOCAL_ONLY);
+            if (newSize < currentCacheSize)
+            {
+                currentCacheSize -= newSize;
+                thisMuch -= newSize;
+            }
+            else
+            {
+                logger->log(LOG_WARNING, "PrefixCache::makeSpace(): accounting error.  Almost wrapped currentCacheSize on flush.");
+                currentCacheSize = 0;
+                thisMuch = 0;
+            }
+        }
+        else
+            toBeDeleted.erase(it);
+    }
+}
+
+void PrefixCache::rename(const string &oldKey, const string &newKey, ssize_t sizediff)
+{
+    // rename it in the LRU
+    // erase/insert to rehash it everywhere else
+
+    boost::unique_lock<boost::mutex> s(lru_mutex);
+    auto it = m_lru.find(oldKey);
+    if (it == m_lru.end())
+        return;
+
+    auto lit = it->lit;
+    m_lru.erase(it);
+    int refCount = 0;
+    auto dne_it = doNotEvict.find(lit);
+    if (dne_it != doNotEvict.end())
+    {
+        refCount = dne_it->refCount;
+        doNotEvict.erase(dne_it);
+    }
+
+    auto tbd_it = toBeDeleted.find(lit);
+    bool hasTBDEntry = (tbd_it != toBeDeleted.end());
+    if (hasTBDEntry)
+        toBeDeleted.erase(tbd_it);
+
+    *lit = newKey;
+
+    if (hasTBDEntry)
+        toBeDeleted.insert(lit);
+    if (refCount != 0)
+    {
+        pair<DNE_t::iterator, bool> dne_tmp = doNotEvict.insert(lit);
+        const_cast<DNEElement &>(*(dne_tmp.first)).refCount = refCount;
+    }
+
+    m_lru.insert(lit);
+    currentCacheSize += sizediff;
+}
+
+int PrefixCache::ifExistsThenDelete(const string &key)
+{
+    bf::path cachedPath = cachePrefix / key;
+    bf::path journalPath = journalPrefix / (key + ".journal");
+
+    boost::unique_lock<boost::mutex> s(lru_mutex);
+    bool objectExists = false;
+
+    auto it = m_lru.find(key);
+    if (it != m_lru.end())
+    {
+        if (toBeDeleted.find(it->lit) == toBeDeleted.end())
+        {
+            doNotEvict.erase(it->lit);
+            lru.erase(it->lit);
+            m_lru.erase(it);
+            objectExists = true;
+        }
+        else   // let makeSpace() delete it if it's already in progress
+            return 0;
+    }
+    bool journalExists = bf::exists(journalPath);
+    //assert(objectExists == bf::exists(cachedPath));
+
+    size_t objectSize = (objectExists ? bf::file_size(cachedPath) : 0);
+    //size_t objectSize = (objectExists ? MetadataFile::getLengthFromKey(key) : 0);
+    size_t journalSize = (journalExists ? bf::file_size(journalPath) : 0);
+    currentCacheSize -= (objectSize + journalSize);
+
+    //assert(!objectExists || objectSize == bf::file_size(cachedPath));
+
+    return (objectExists ? 1 : 0) | (journalExists ? 2 : 0);
+}
+
+size_t PrefixCache::getCurrentCacheSize() const
+{
+    return currentCacheSize;
+}
+
+size_t PrefixCache::getCurrentCacheElementCount() const
+{
+    boost::unique_lock<boost::mutex> s(lru_mutex);
+    assert(m_lru.size() == lru.size());
+    return m_lru.size();
+}
+
+void PrefixCache::reset()
+{
+    boost::unique_lock<boost::mutex> s(lru_mutex);
+    m_lru.clear();
+    lru.clear();
+    toBeDeleted.clear();
+    doNotEvict.clear();
+
+    bf::directory_iterator dir;
+    bf::directory_iterator dend;
+    for (dir = bf::directory_iterator(cachePrefix); dir != dend; ++dir)
+        bf::remove_all(dir->path());
+
+    for (dir = bf::directory_iterator(journalPrefix); dir != dend; ++dir)
+        bf::remove_all(dir->path());
+    currentCacheSize = 0;
+}
+
+void PrefixCache::shutdown()
+{
+    /* Does this need to do something anymore? */
+}
+
+/* The helper classes */
+
+PrefixCache::M_LRU_element_t::M_LRU_element_t(const string *k) : key(k)
+{}
+
+PrefixCache::M_LRU_element_t::M_LRU_element_t(const string &k) : key(&k)
+{}
+
+PrefixCache::M_LRU_element_t::M_LRU_element_t(const LRU_t::iterator &i) : key(&(*i)), lit(i)
+{}
+
+inline size_t PrefixCache::KeyHasher::operator()(const M_LRU_element_t &l) const
+{
+    return hash<string>()(*(l.key));
+}
+
+inline bool PrefixCache::KeyEquals::operator()(const M_LRU_element_t &l1, const M_LRU_element_t &l2) const
+{
+    return (*(l1.key) == *(l2.key));
+}
+
+inline size_t PrefixCache::DNEHasher::operator()(const DNEElement &l) const
+{
+    return (l.sKey.empty() ? hash<string>()(*(l.key)) : hash<string>()(l.sKey));
+}
+
+inline bool PrefixCache::DNEEquals::operator()(const DNEElement &l1, const DNEElement &l2) const
+{
+    const string *s1, *s2;
+    s1 = l1.sKey.empty() ? &(*(l1.key)) : &(l1.sKey);
+    s2 = l2.sKey.empty() ? &(*(l2.key)) : &(l2.sKey);
+
+    return (*s1 == *s2);
+}
+
+inline bool PrefixCache::TBDLess::operator()(const LRU_t::iterator &i1, const LRU_t::iterator &i2) const
+{
+    return *i1 < *i2;
+}
+
+}
+
diff --git a/src/PrefixCache.h b/src/PrefixCache.h
new file mode 100644
index 000000000..ba9d3f089
--- /dev/null
+++ b/src/PrefixCache.h
@@ -0,0 +1,153 @@
+#ifndef PREFIXCACHE_H_
+#define PREFIXCACHE_H_
+
+/* PrefixCache manages the cache for one prefix managed by SM.
+   Cache is a map of prefix -> PrefixCache, and holds the items
+   that should be centralized like the Downloader
+*/
+
+#include "Downloader.h"
+#include "SMLogging.h"
+#include "Replicator.h"
+
+#include <list>
+#include <string>
+#include <vector>
+#include <set>
+#include <unordered_set>
+#include <boost/filesystem.hpp>
+#include <boost/thread.hpp>
+
+namespace storagemanager
+{
+
+class PrefixCache : public boost::noncopyable
+{
+    public:
+        PrefixCache(const boost::filesystem::path &prefix);
+        virtual ~PrefixCache();
+
+        //reading fcns
+        // read() marks objects to be read s.t. they do not get flushed.
+        // after reading them, unlock the 'logical file', and call doneReading().
+        void read(const std::vector<std::string> &keys);
+        void doneReading(const std::vector<std::string> &keys);
+        bool exists(const std::string &key) const;
+        void exists(const std::vector<std::string> &keys, std::vector<bool> *out) const;
+
+        // writing fcns
+        // new*() fcns tell the PrefixCache data was added.  After writing a set of objects,
+        // unlock the 'logical file', and call doneWriting().
+        void newObject(const std::string &key, size_t size);
+        void newJournalEntry(size_t size);
+        void doneWriting();
+        void deletedObject(const std::string &key, size_t size);
+        void deletedJournal(size_t size);
+
+        // an 'atomic' existence check & delete.  Covers the object and journal.  Does not delete the files.
+        // returns 0 if it didn't exist, 1 if the object exists, 2 if the journal exists, and 3 (1 | 2) if both exist
+        // This should be called while holding the file lock for key because it touches the journal file.
+        int ifExistsThenDelete(const std::string &key);
+
+        // rename is used when an old obj gets merged with its journal file
+        // the size will change in that process; sizediff is by how much
+        void rename(const std::string &oldKey, const std::string &newKey, ssize_t sizediff);
+        void setMaxCacheSize(size_t size);
+        void makeSpace(size_t size);
+        size_t getCurrentCacheSize() const;
+        size_t getCurrentCacheElementCount() const;
+        size_t getMaxCacheSize() const;
+        void shutdown();
+
+        // test helpers
+        const boost::filesystem::path &getCachePath();
+        const boost::filesystem::path &getJournalPath();
+        // this will delete everything in the PrefixCache and journal paths, and empty all PrefixCache structures.
+        void reset();
+        void validateCacheSize();
+
+    private:
+        PrefixCache();
+
+        boost::filesystem::path cachePrefix;
+        boost::filesystem::path journalPrefix;
+        boost::filesystem::path firstDir;
+        size_t maxCacheSize;
+        size_t objectSize;
+        size_t currentCacheSize;
+        Replicator *replicator;
+        SMLogging *logger;
+        Downloader *downloader;
+
+        void populate();
+        void _makeSpace(size_t size);
+
+        /* The main PrefixCache structures */
+        // lru owns the string memory for the filenames it manages.  m_lru and DNE point to those strings.
+        typedef std::list<std::string> LRU_t;
+        LRU_t lru;
+
+        struct M_LRU_element_t
+        {
+            M_LRU_element_t(const std::string &);
+            M_LRU_element_t(const std::string *);
+            M_LRU_element_t(const LRU_t::iterator &);
+            const std::string *key;
+            LRU_t::iterator lit;
+        };
+        struct KeyHasher
+        {
+            size_t operator()(const M_LRU_element_t &l) const;
+        };
+
+        struct KeyEquals
+        {
+            bool operator()(const M_LRU_element_t &l1, const M_LRU_element_t &l2) const;
+        };
+
+        typedef std::unordered_set<M_LRU_element_t, KeyHasher, KeyEquals> M_LRU_t;
+        M_LRU_t m_lru;   // the LRU entries as a hash table
+
+        /* The do-not-evict list stuff. */
+        struct DNEElement
+        {
+            DNEElement(const LRU_t::iterator &);
+            DNEElement(const std::string &);
+            LRU_t::iterator key;
+            std::string sKey;
+            uint refCount;
+        };
+
+        struct DNEHasher
+        {
+            size_t operator()(const DNEElement &d) const;
+        };
+
+        struct DNEEquals
+        {
+            bool operator()(const DNEElement &d1, const DNEElement &d2) const;
+        };
+
+        typedef std::unordered_set<DNEElement, DNEHasher, DNEEquals> DNE_t;
+        DNE_t doNotEvict;
+        void addToDNE(const DNEElement &);
+        void removeFromDNE(const DNEElement &);
+
+        // the to-be-deleted set.  Elements removed from the LRU but not yet deleted will be here.
+        // Elements are inserted and removed by makeSpace().  If read() references a file that is in this,
+        // it will remove it, signalling to makeSpace that it should not be deleted
+        struct TBDLess
+        {
+            bool operator()(const LRU_t::iterator &, const LRU_t::iterator &) const;
+        };
+
+        typedef std::set<LRU_t::iterator, TBDLess> TBD_t;
+        TBD_t toBeDeleted;
+
+        mutable boost::mutex lru_mutex;   // protects the main PrefixCache structures & the do-not-evict set
+};
+
+}
+
+#endif
diff --git a/src/ReadTask.cpp b/src/ReadTask.cpp
index cce09ab58..93bfdd525 100644
--- a/src/ReadTask.cpp
+++ b/src/ReadTask.cpp
@@ -56,7 +56,6 @@ bool ReadTask::run()
 
     // todo: do the reading and writing in chunks
     // todo: need to make this use O_DIRECT on the IOC side
-    ioc->willRead(cmd->filename, cmd->offset, cmd->count);
     ssize_t err;
     while ((uint) resp->returnCode < cmd->count)
     {
diff --git a/src/Replicator.cpp b/src/Replicator.cpp
index 3b0a6cef4..87b9dbd52 100644
--- a/src/Replicator.cpp
+++ b/src/Replicator.cpp
@@ -121,6 +121,7 @@ int Replicator::addJournalEntry(const char *filename, const uint8_t *data, off_t
     size_t count = 0;
     int version = 1;
     string journalFilename = msJournalPath + "/" + string(filename) + ".journal";
+    boost::filesystem::path firstDir = *(boost::filesystem::path(filename).begin());
     uint64_t thisEntryMaxOffset = (offset + length - 1);
 
     bool exists = boost::filesystem::exists(journalFilename);
@@ -135,7 +136,7 @@ int Replicator::addJournalEntry(const char *filename, const uint8_t *data, off_t
         assert((uint) err == header.length() + 1);
         if (err <= 0)
             return err;
-        Cache::get()->newJournalEntry(header.length() + 1);
+        Cache::get()->newJournalEntry(firstDir, header.length() + 1);
     }
     else
     {
diff --git a/src/Synchronizer.cpp b/src/Synchronizer.cpp
index fbe686cf9..7de83ee69 100644
--- a/src/Synchronizer.cpp
+++ b/src/Synchronizer.cpp
@@ -59,8 +59,8 @@ Synchronizer::Synchronizer() : maxUploads(0)
     threadPool.reset(new ThreadPool());
     threadPool->setMaxThreads(maxUploads);
     die = false;
-    uncommittedJournalSize = 0;
     journalSizeThreshold = cache->getMaxCacheSize() / 2;
+    blockNewJobs = false;
     syncThread = boost::thread([this] () { this->periodicSync(); });
 }
 
@@ -84,9 +84,28 @@ enum OpFlags
     NEW_OBJECT = 0x4,
 };
 
-void Synchronizer::_newJournalEntry(const string &key, size_t size)
+/* XXXPAT.  Need to revisit this later.  To make multiple prefix functionality as minimal as possible,
+I limited the changes to key string manipulation where possible.  The keys it manages have the prefix they
+belong to prepended.  So key 12345 in prefix p1 becomes p1/12345.  This is not the most elegant or performant
+option, just the least invasive.
+*/ + +void Synchronizer::newPrefix(const bf::path &p) { - uncommittedJournalSize += size; + uncommittedJournalSize[p] = 0; +} + +void Synchronizer::dropPrefix(const bf::path &p) +{ + syncNow(p); + boost::unique_lock s(mutex); + uncommittedJournalSize.erase(p); +} + +void Synchronizer::_newJournalEntry(const bf::path &prefix, const string &_key, size_t size) +{ + string key = (prefix/_key).string(); + uncommittedJournalSize[prefix] += size; auto it = pendingOps.find(key); if (it != pendingOps.end()) { @@ -97,61 +116,64 @@ void Synchronizer::_newJournalEntry(const string &key, size_t size) pendingOps[key] = boost::shared_ptr(new PendingOps(JOURNAL)); } -void Synchronizer::newJournalEntry(const string &key, size_t size) +void Synchronizer::newJournalEntry(const bf::path &prefix, const string &_key, size_t size) { boost::unique_lock s(mutex); - _newJournalEntry(key, size); - if (uncommittedJournalSize > journalSizeThreshold) + _newJournalEntry(prefix, _key, size); + if (uncommittedJournalSize[prefix] > journalSizeThreshold) { - uncommittedJournalSize = 0; + uncommittedJournalSize[prefix] = 0; s.unlock(); forceFlush(); } } -void Synchronizer::newJournalEntries(const vector > &keys) +void Synchronizer::newJournalEntries(const bf::path &prefix, const vector > &keys) { boost::unique_lock s(mutex); for (auto &keysize : keys) - _newJournalEntry(keysize.first, keysize.second); - if (uncommittedJournalSize > journalSizeThreshold) + _newJournalEntry(prefix, keysize.first, keysize.second); + if (uncommittedJournalSize[prefix] > journalSizeThreshold) { - uncommittedJournalSize = 0; + uncommittedJournalSize[prefix] = 0; s.unlock(); forceFlush(); } } -void Synchronizer::newObjects(const vector &keys) +void Synchronizer::newObjects(const bf::path &prefix, const vector &keys) { boost::unique_lock s(mutex); - for (const string &key : keys) + for (const string &_key : keys) { - assert(pendingOps.find(key) == pendingOps.end()); + bf::path key(prefix/_key); + assert(pendingOps.find(key.string()) == pendingOps.end()); //makeJob(key); - pendingOps[key] = boost::shared_ptr(new PendingOps(NEW_OBJECT)); + pendingOps[key.string()] = boost::shared_ptr(new PendingOps(NEW_OBJECT)); } } -void Synchronizer::deletedObjects(const vector &keys) +void Synchronizer::deletedObjects(const bf::path &prefix, const vector &keys) { boost::unique_lock s(mutex); - for (const string &key : keys) + for (const string &_key : keys) { - auto it = pendingOps.find(key); + bf::path key(prefix/_key); + auto it = pendingOps.find(key.string()); if (it != pendingOps.end()) it->second->opFlags |= DELETE; else - pendingOps[key] = boost::shared_ptr(new PendingOps(DELETE)); + pendingOps[key.string()] = boost::shared_ptr(new PendingOps(DELETE)); } // would be good to signal to the things in opsInProgress that these were deleted. That would // quiet down the logging somewhat. How to do that efficiently, and w/o gaps or deadlock... 
 }
 
-void Synchronizer::flushObject(const string &key)
+void Synchronizer::flushObject(const bf::path &prefix, const string &_key)
 {
+    string key = (prefix/_key).string();
     boost::unique_lock<boost::mutex> s(mutex);
 
     // if there is something to do on key, it should be either in pendingOps or opsInProgress
@@ -192,7 +214,7 @@ void Synchronizer::flushObject(const string &key)
     bool keyExists, journalExists;
     int err;
     do {
-        err = cs->exists(key.c_str(), &keyExists);
+        err = cs->exists(_key.c_str(), &keyExists);
         if (err)
         {
             char buf[80];
@@ -239,14 +261,37 @@ void Synchronizer::periodicSync()
             //logger->log(LOG_DEBUG,"Synchronizer Force Flush.");
         }
         lock.lock();
+        if (blockNewJobs)
+            continue;
         //cout << "Sync'ing " << pendingOps.size() << " objects" << " queue size is " <<
         //    threadPool.currentQueueSize() << endl;
         for (auto &job : pendingOps)
             makeJob(job.first);
-        uncommittedJournalSize = 0;
+        for (auto it = uncommittedJournalSize.begin(); it != uncommittedJournalSize.end(); ++it)
+            it->second = 0;
     }
 }
 
+void Synchronizer::syncNow(const bf::path &prefix)
+{
+    boost::unique_lock<boost::mutex> lock(mutex);
+
+    // this is pretty hacky.  when time permits, implement something better.
+    //
+    // Issue all of the pendingOps for the given prefix
+    // recreate the threadpool (dtor returns once all jobs have finished)
+    // resume normal operation
+
+    blockNewJobs = true;
+    for (auto &job : pendingOps)
+        if (job.first.find(prefix.string()) == 0)
+            makeJob(job.first);
+    uncommittedJournalSize[prefix] = 0;
+    threadPool.reset(new ThreadPool());
+    threadPool->setMaxThreads(maxUploads);
+    blockNewJobs = false;
+}
+
 void Synchronizer::forceFlush()
 {
     boost::unique_lock<boost::mutex> lock(mutex);
@@ -361,6 +406,9 @@ void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator &it)
     ScopedReadLock s(ioc, sourceFile);
 
     string &key = *it;
+    size_t pos = key.find_first_of('/');
+    bf::path prefix = key.substr(0, pos);
+    string cloudKey = key.substr(pos + 1);
     char buf[80];
     bool exists = false;
     int err;
@@ -374,7 +422,7 @@ void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator &it)
     const metadataObject *mdEntry;
     bool entryExists = md.getEntry(MetadataFile::getOffsetFromKey(key), &mdEntry);
-    if (!entryExists || key != mdEntry->key)
+    if (!entryExists || cloudKey != mdEntry->key)
     {
         logger->log(LOG_DEBUG, "synchronize(): %s does not exist in metadata for %s.  This suggests truncation.", key.c_str(), sourceFile.c_str());
         return;
     }
 
     //assert(key == mdEntry->key);  <-- This could fail b/c of truncation + a write/append before this job runs.
-    err = cs->exists(key, &exists);
+    err = cs->exists(cloudKey, &exists);
     if (err)
         throw runtime_error(string("synchronize(): checking existence of ") + key + ", got " +
             strerror_r(errno, buf, 80));
@@ -390,14 +438,14 @@ void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator &it)
         return;
 
     // TODO: should be safe to check with Cache instead of a file existence check
-    exists = cache->exists(key);
+    exists = cache->exists(prefix, cloudKey);
     if (!exists)
     {
         logger->log(LOG_DEBUG, "synchronize(): was told to upload %s but it does not exist locally", key.c_str());
         return;
     }
 
-    err = cs->putObject((cachePath / key).string(), key);
+    err = cs->putObject((cachePath / key).string(), cloudKey);
     if (err)
         throw runtime_error(string("synchronize(): uploading ") + key + ", got " +
             strerror_r(errno, buf, 80));
 
     replicator->remove((cachePath/key).string().c_str(), Replicator::NO_LOCAL);
@@ -406,7 +454,8 @@ void Synchronizer::synchronizeDelete(const string &sourceFile, list<string>::iterator &it)
 {
     ScopedWriteLock s(ioc, sourceFile);
-    cs->deleteObject(*it);
+    string cloudKey = it->substr(it->find('/') + 1);
+    cs->deleteObject(cloudKey);
 }
 
 void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>::iterator &lit)
@@ -415,6 +464,10 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>::iterator &lit)
     char buf[80];
 
     string key = *lit;
+    size_t pos = key.find_first_of('/');
+    bf::path prefix = key.substr(0, pos);
+    string cloudKey = key.substr(pos + 1);
+
     MetadataFile md(sourceFile.c_str(), MetadataFile::no_create_t());
 
     if (!md.exists())
@@ -425,7 +478,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>::iterator &lit)
     const metadataObject *mdEntry;
     bool metaExists = md.getEntry(MetadataFile::getOffsetFromKey(key), &mdEntry);
-    if (!metaExists || key != mdEntry->key)
+    if (!metaExists || cloudKey != mdEntry->key)
     {
         logger->log(LOG_DEBUG, "synchronizeWithJournal(): %s does not exist in metadata for %s.  This suggests truncation.", key.c_str(), sourceFile.c_str());
         return;
     }
@@ -442,12 +495,12 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>::iterator &lit)
     // sanity check + add'l info.  Test whether the object exists in cloud storage.  If so, complain,
     // and run synchronize() instead.
     bool existsOnCloud;
-    int err = cs->exists(key, &existsOnCloud);
+    int err = cs->exists(cloudKey, &existsOnCloud);
     if (err)
         throw runtime_error(string("Synchronizer: cs->exists() failed: ") + strerror_r(errno, buf, 80));
     if (!existsOnCloud)
     {
-        if (cache->exists(key))
+        if (cache->exists(prefix, cloudKey))
         {
             logger->log(LOG_DEBUG, "synchronizeWithJournal(): %s has no journal and does not exist in the cloud, calling "
                 "synchronize() instead.  Need to explain how this happens.", key.c_str());
@@ -470,13 +523,13 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>::iterator &lit)
     boost::shared_array<uint8_t> data;
     size_t count = 0, size = mdEntry->length;
 
-    bool oldObjIsCached = cache->exists(key);
+    bool oldObjIsCached = cache->exists(prefix, cloudKey);
 
     // get the base object if it is not already cached
     // merge it with its journal file
     if (!oldObjIsCached)
     {
-        err = cs->getObject(key, &data, &size);
+        err = cs->getObject(cloudKey, &data, &size);
         if (err)
         {
             if (errno == ENOENT)
@@ -524,14 +577,15 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>::iterator &lit)
     }
 
     // get a new key for the resolved version & upload it
-    string newKey = MetadataFile::getNewKeyFromOldKey(key, size);
-    err = cs->putObject(data, size, newKey);
+    string newCloudKey = MetadataFile::getNewKeyFromOldKey(key, size);
+    string newKey = (prefix/newCloudKey).string();
+    err = cs->putObject(data, size, newCloudKey);
     if (err)
     {
         // try to delete it in cloud storage...  unlikely it is there in the first place, and if it is
         // this probably won't work
         int l_errno = errno;
-        cs->deleteObject(newKey);
+        cs->deleteObject(newCloudKey);
         throw runtime_error(string("Synchronizer: putObject() failed: ") + strerror_r(l_errno, buf, 80));
     }
 
@@ -562,22 +616,21 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>::iterator &lit)
         }
         count += err;
     }
-        cache->rename(key, newKey, size - bf::file_size(oldCachePath));
+        cache->rename(prefix, cloudKey, newCloudKey, size - bf::file_size(oldCachePath));
         replicator->remove(oldCachePath);
     }
 
     // update the metadata for the source file
-    md.updateEntry(MetadataFile::getOffsetFromKey(key), newKey, size);
+    md.updateEntry(MetadataFile::getOffsetFromKey(key), newCloudKey, size);
     replicator->updateMetadata(sourceFile.c_str(), md);
 
     rename(key, newKey);
-    ioc->renameObject(key, newKey);
 
     // delete the old object & journal file
-    cache->deletedJournal(bf::file_size(journalName));
+    cache->deletedJournal(prefix, bf::file_size(journalName));
     replicator->remove(journalName);
-    cs->deleteObject(key);
+    cs->deleteObject(cloudKey);
 }
 
 void Synchronizer::rename(const string &oldKey, const string &newKey)
@@ -639,4 +692,12 @@ void Synchronizer::PendingOps::wait(boost::mutex *m)
     }
 }
 
+Synchronizer::Job::Job(Synchronizer *s, std::list<std::string>::iterator i) : sync(s), it(i)
+{ }
+
+void Synchronizer::Job::operator()()
+{
+    sync->process(it);
+}
+
 }
diff --git a/src/Synchronizer.h b/src/Synchronizer.h
index dc72ce343..a206e6de0 100644
--- a/src/Synchronizer.h
+++ b/src/Synchronizer.h
@@ -21,7 +21,6 @@ namespace storagemanager
 class Cache;   // break circular dependency in header files
 class IOCoordinator;
 
-/* TODO: Need to think about how errors are handled / propagated */
 class Synchronizer : public boost::noncopyable
 {
     public:
@@ -30,12 +29,16 @@ class Synchronizer : public boost::noncopyable
 
     // these take keys as parameters, not full path names, ex, pass in '12345' not
     // 'cache/12345'.
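In other words, callers now pass the prefix ("firstDir") alongside the bare key; a sketch of the call shape after this change, assuming a Synchronizer *sync and an object under a hypothetical prefix data1:

    // key "12345" for a file under prefix "data1":
    sync->newJournalEntry("data1", "12345", entrySize);
    // internally this is tracked under the combined key "data1/12345"
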
-    void newJournalEntry(const std::string &key, size_t len);
-    void newJournalEntries(const std::vector<std::pair<std::string, size_t>> &keys);
-    void newObjects(const std::vector<std::string> &keys);
-    void deletedObjects(const std::vector<std::string> &keys);
-    void flushObject(const std::string &key);
-    void forceFlush();
+    void newJournalEntry(const boost::filesystem::path &firstDir, const std::string &key, size_t len);
+    void newJournalEntries(const boost::filesystem::path &firstDir, const std::vector<std::pair<std::string, size_t>> &keys);
+    void newObjects(const boost::filesystem::path &firstDir, const std::vector<std::string> &keys);
+    void deletedObjects(const boost::filesystem::path &firstDir, const std::vector<std::string> &keys);
+    void flushObject(const boost::filesystem::path &firstDir, const std::string &key);
+    void forceFlush();   // ideally, make a version of this that takes a firstDir parameter
+
+
+    void newPrefix(const boost::filesystem::path &p);
+    void dropPrefix(const boost::filesystem::path &p);
 
     // for testing primarily
     boost::filesystem::path getJournalPath();
@@ -43,7 +46,7 @@ class Synchronizer : public boost::noncopyable
     private:
         Synchronizer();
-        void _newJournalEntry(const std::string &key, size_t len);
+        void _newJournalEntry(const boost::filesystem::path &firstDir, const std::string &key, size_t len);
         void process(std::list<std::string>::iterator key);
         void synchronize(const std::string &sourceFile, std::list<std::string>::iterator &it);
         void synchronizeDelete(const std::string &sourceFile, std::list<std::string>::iterator &it);
@@ -66,8 +69,8 @@ class Synchronizer : public boost::noncopyable
     struct Job : public ThreadPool::Job
     {
-        Job(Synchronizer *s, std::list<std::string>::iterator i) : sync(s), it(i) { }
-        void operator()() { sync->process(it); }
+        Job(Synchronizer *s, std::list<std::string>::iterator i);
+        void operator()();
         Synchronizer *sync;
         std::list<std::string>::iterator it;
     };
@@ -88,7 +91,11 @@ class Synchronizer : public boost::noncopyable
     boost::thread syncThread;
     const boost::chrono::seconds syncInterval = boost::chrono::seconds(10);
     void periodicSync();
-    size_t uncommittedJournalSize, journalSizeThreshold;
+    std::map<boost::filesystem::path, size_t> uncommittedJournalSize;
+    size_t journalSizeThreshold;
+    bool blockNewJobs;
+
+    void syncNow(const boost::filesystem::path &prefix);   // a synchronous version of forceFlush()
 
     SMLogging *logger;
     Cache *cache;
diff --git a/src/Utilities.cpp b/src/Utilities.cpp
index 88af9dcd9..cb2fdd177 100644
--- a/src/Utilities.cpp
+++ b/src/Utilities.cpp
@@ -4,10 +4,19 @@
 namespace storagemanager
 {
 
-ScopedReadLock::ScopedReadLock(IOCoordinator *i, const std::string &k) : ioc(i), locked(false), key(k)
+ScopedFileLock::ScopedFileLock(IOCoordinator *i, const std::string &k) : ioc(i), locked(false), key(k)
+{
+}
+
+ScopedFileLock::~ScopedFileLock()
+{
+}
+
+ScopedReadLock::ScopedReadLock(IOCoordinator *i, const std::string &k) : ScopedFileLock(i, k)
 {
     lock();
 }
+
 ScopedReadLock::~ScopedReadLock()
 {
     unlock();
@@ -29,10 +38,11 @@ void ScopedReadLock::unlock()
     }
 }
 
-ScopedWriteLock::ScopedWriteLock(IOCoordinator *i, const std::string &k) : ioc(i), locked(false), key(k)
+ScopedWriteLock::ScopedWriteLock(IOCoordinator *i, const std::string &k) : ScopedFileLock(i, k)
 {
     lock();
 }
+
ScopedWriteLock::~ScopedWriteLock()
 {
     unlock();
diff --git a/src/Utilities.h b/src/Utilities.h
index 1df8766a4..75e1a8284 100644
--- a/src/Utilities.h
+++ b/src/Utilities.h
@@ -11,28 +11,34 @@ class IOCoordinator;
 
 // a few utility classes we've coded here and there, now de-duped and centralized.
 // modify as necessary.
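The point of the refactor below is that the shared members move into a ScopedFileLock base with virtual lock()/unlock(), so callers can pick the lock flavor at runtime and release it through one handle; a minimal sketch, assuming an IOCoordinator *ioc, a key string, and a hypothetical readOnly flag:

    // choose read vs. write locking at runtime; either way the derived
    // destructor (reached via the virtual base destructor) unlocks
    std::unique_ptr<ScopedFileLock> flock;
    if (readOnly)
        flock.reset(new ScopedReadLock(ioc, key));
    else
        flock.reset(new ScopedWriteLock(ioc, key));
    // ... do the I/O; the lock is dropped when flock goes out of scope ...
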
-struct ScopedReadLock
+
+struct ScopedFileLock
 {
-    ScopedReadLock(IOCoordinator *i, const std::string &k);
-    ~ScopedReadLock();
-    void lock();
-    void unlock();
+    ScopedFileLock(IOCoordinator *i, const std::string &k);
+    virtual ~ScopedFileLock();
+
+    virtual void lock() = 0;
+    virtual void unlock() = 0;
 
     IOCoordinator *ioc;
     bool locked;
     const std::string key;
 };
 
-struct ScopedWriteLock
+struct ScopedReadLock : public ScopedFileLock
 {
-    ScopedWriteLock(IOCoordinator *i, const std::string &k);
-    ~ScopedWriteLock();
+    ScopedReadLock(IOCoordinator *i, const std::string &k);
+    virtual ~ScopedReadLock();
+    void lock();
+    void unlock();
+};
+
+struct ScopedWriteLock : public ScopedFileLock
+{
+    ScopedWriteLock(IOCoordinator *i, const std::string &k);
+    virtual ~ScopedWriteLock();
     void lock();
     void unlock();
-
-    IOCoordinator *ioc;
-    bool locked;
-    const std::string key;
 };
 
 struct ScopedCloser
diff --git a/src/main.cpp b/src/main.cpp
index a02639478..8182ad91c 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -68,11 +68,10 @@ int main(int argc, char** argv)
 
     SessionManager* sm = SessionManager::get();
 
-
-
     ret = sm->start();
 
     cache->shutdown();
+    delete sync;
     delete cache;
     delete ioc;
diff --git a/src/unit_tests.cpp b/src/unit_tests.cpp
index b342dbb49..31ee386bf 100644
--- a/src/unit_tests.cpp
+++ b/src/unit_tests.cpp
@@ -891,9 +891,9 @@ bool cacheTest1()
 
     // make sure nothing shows up in the cache path for files that don't exist
     v_bogus.push_back("does-not-exist");
-    cache->read(v_bogus);
+    cache->read("", v_bogus);
     assert(!bf::exists(cachePath / "does-not-exist"));
-    cache->exists(v_bogus, &exists);
+    cache->exists("", v_bogus, &exists);
     assert(exists.size() == 1);
     assert(!exists[0]);
 
@@ -901,21 +901,21 @@ bool cacheTest1()
     string realFile("storagemanager.cnf");
     bf::copy_file(realFile, storagePath / realFile, bf::copy_option::overwrite_if_exists);
     v_bogus[0] = realFile;
-    cache->read(v_bogus);
+    cache->read("", v_bogus);
     assert(bf::exists(cachePath / realFile));
     exists.clear();
-    cache->exists(v_bogus, &exists);
+    cache->exists("", v_bogus, &exists);
     assert(exists.size() == 1);
     assert(exists[0]);
     size_t currentSize = cache->getCurrentCacheSize();
     assert(currentSize == bf::file_size(cachePath / realFile));
 
     // lie about the file being deleted and then replaced
-    cache->deletedObject(realFile, currentSize);
+    cache->deletedObject("", realFile, currentSize);
     assert(cache->getCurrentCacheSize() == 0);
-    cache->newObject(realFile, currentSize);
+    cache->newObject("", realFile, currentSize);
     assert(cache->getCurrentCacheSize() == currentSize);
-    cache->exists(v_bogus, &exists);
+    cache->exists("", v_bogus, &exists);
     assert(exists.size() == 1);
     assert(exists[0]);
 
@@ -1025,13 +1025,13 @@ bool syncTest1()
     makeTestJournal((journalPath/journalName).string().c_str());
     makeTestMetadata((metaPath/"test-file.meta").string().c_str());
 
-    cache->newObject(key, bf::file_size(cachePath/key));
-    cache->newJournalEntry(bf::file_size(journalPath/journalName));
+    cache->newObject("", key, bf::file_size(cachePath/key));
+    cache->newJournalEntry("", bf::file_size(journalPath/journalName));
 
     vector<string> vObj;
     vObj.push_back(key);
-    sync->newObjects(vObj);
+    sync->newObjects("", vObj);
     sync->forceFlush();
     sleep(1);   // wait for the job to run
 
@@ -1041,12 +1041,12 @@ bool syncTest1()
     assert(!err);
     assert(exists);
 
-    sync->newJournalEntry(key, 0);
+    sync->newJournalEntry("", key, 0);
     sync->forceFlush();
     sleep(1);   // let it do what it does
 
     // check that the original objects no longer exist
-    assert(!cache->exists(key));
assert(!cache->exists("", key)); assert(!bf::exists(journalPath/journalName)); // Replicator doesn't implement all of its functionality yet, need to delete key from the cache manually for now @@ -1063,24 +1063,24 @@ bool syncTest1() foundIt = (MetadataFile::getSourceFromKey(newKey) == "test-file"); if (foundIt) { - assert(cache->exists(newKey)); + assert(cache->exists("", newKey)); cs->deleteObject(newKey); break; } } assert(foundIt); - cache->makeSpace(cache->getMaxCacheSize()); // clear the cache & make it call sync->flushObject() + cache->makeSpace("", cache->getMaxCacheSize()); // clear the cache & make it call sync->flushObject() // the key should now be back in cloud storage and deleted from the cache - assert(!cache->exists(newKey)); + assert(!cache->exists("", newKey)); err = cs->exists(newKey, &exists); assert(!err && exists); // make the journal again, call sync->newJournalObject() makeTestJournal((journalPath / (newKey + ".journal")).string().c_str()); - cache->newJournalEntry(bf::file_size(journalPath / (newKey + ".journal"))); - sync->newJournalEntry(newKey, 0); + cache->newJournalEntry("", bf::file_size(journalPath / (newKey + ".journal"))); + sync->newJournalEntry("", newKey, 0); sync->forceFlush(); sleep(1); @@ -1102,7 +1102,7 @@ bool syncTest1() vector keys; for (bf::directory_iterator dir(fakeCloudPath); dir != bf::directory_iterator(); ++dir) keys.push_back(dir->path().filename().string()); - sync->deletedObjects(keys); + sync->deletedObjects("", keys); sync->forceFlush(); sleep(1); ::unlink((metaPath/"test-file.meta").string().c_str()); @@ -1318,11 +1318,11 @@ void IOCUnlink() makeTestObject(cachedObjPath.string().c_str()); makeTestJournal(cachedJournalPath.string().c_str()); - cache->newObject(cachedObjPath.filename().string(), bf::file_size(cachedObjPath)); - cache->newJournalEntry(bf::file_size(cachedJournalPath)); + cache->newObject("", cachedObjPath.filename().string(), bf::file_size(cachedObjPath)); + cache->newJournalEntry("", bf::file_size(cachedJournalPath)); vector keys; keys.push_back(cachedObjPath.filename().string()); - sync->newObjects(keys); + sync->newObjects("", keys); //sync->newJournalEntry(keys[0]); don't want to end up renaming it sync->forceFlush(); sleep(1); @@ -1385,7 +1385,7 @@ void IOCCopyFile1() makeTestMetadata(sourcePath.string().c_str()); makeTestObject((csPath/testObjKey).string().c_str()); makeTestJournal((journalPath/(string(testObjKey) + ".journal")).string().c_str()); - cache->newJournalEntry(bf::file_size(journalPath/(string(testObjKey) + ".journal"))); + cache->newJournalEntry("", bf::file_size(journalPath/(string(testObjKey) + ".journal"))); int err = ioc->copyFile("copyfile1/source", "copyfile2/dest"); assert(!err); @@ -1445,8 +1445,8 @@ void IOCCopyFile3() makeTestMetadata(sourcePath.string().c_str()); makeTestObject((cachePath/testObjKey).string().c_str()); makeTestJournal((journalPath/(string(testObjKey) + ".journal")).string().c_str()); - cache->newObject(testObjKey, bf::file_size(cachePath/testObjKey)); - cache->newJournalEntry(bf::file_size(journalPath/(string(testObjKey) + ".journal"))); + cache->newObject("", testObjKey, bf::file_size(cachePath/testObjKey)); + cache->newJournalEntry("", bf::file_size(journalPath/(string(testObjKey) + ".journal"))); int err = ioc->copyFile("copyfile3/source", "copyfile4/dest"); assert(!err); diff --git a/storagemanager.cnf b/storagemanager.cnf index 61f9b9459..c87a59b42 100644 --- a/storagemanager.cnf +++ b/storagemanager.cnf @@ -1,11 +1,41 @@ [ObjectStorage] service = LocalStorage + +# This 
is a tuneable value, but also implies a maximum capacity. +# Each file managed by StorageManager is broken into chunks of +# object_size bytes. Each chunk is stored in the cache as a file, +# so the filesystem the cache is put on needs to have enough inodes to +# support at least cache_size/object_size files. +# +# Regarding tuning, object stores do not support modifying stored data; +# entire objects must be replaced on modification, and entire +# objects are fetched on read. An access pattern that includes +# frequently accessing small amounts of data will benefit from +# a smaller object_size. An access pattern where data +# is accessed in large chunks will benefit from a larger object_size. +# +# Another limitation to consider is the get/put rate imposed by the +# cloud provider. If that is the limitation, increasing object_size +# will result in higher transfer rates. object_size = 5M + metadata_path = ${HOME}/storagemanager/metadata journal_path = ${HOME}/storagemanager/journal max_concurrent_downloads = 20 max_concurrent_uploads = 20 +# This is the depth of the common prefix that all files managed by SM have +# Ex: /usr/local/mariadb/columnstore/data1, and +# /usr/local/mariadb/columnstore/data2 differ at the 5th directory element, +# so they have a common prefix depth of 4. +# +# This value is used to manage the ownership of prefixes between +# StorageManager instances that sharing a filesystem. +# +# -1 is a special value indicating that there is no filesystem shared +# between SM instances. +common_prefix_depth = 4 + [S3] region = us-east-2 bucket = s3-cs-test2
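
A worked example of the inode bound described in the object_size comment, assuming a hypothetical cache_size of 2g alongside the default object_size of 5M:

    2 GiB / 5 MiB = 2048 MiB / 5 MiB ≈ 410 files

so the cache filesystem would need roughly 410 inodes for cached objects at full occupancy, plus whatever the journal directory holds.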