#include "Cache.h"
#include "Config.h"
#include "Downloader.h"
#include "Synchronizer.h"
// NOTE(review): the names of the seven system/library headers were lost in this
// copy of the file (only bare `#include` tokens remained).  The list below is
// reconstructed from what the code uses -- confirm against the build.
#include <iostream>
#include <syslog.h>
#include <boost/filesystem.hpp>
#include <boost/thread.hpp>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>

using namespace std;
namespace bf = boost::filesystem;

namespace
{
    boost::mutex m;                          // serializes creation of the singleton
    storagemanager::Cache *inst = nullptr;   // the one Cache instance, created lazily by Cache::get()
}

namespace storagemanager
{

/* Returns the process-wide Cache, constructing it on first use.
   `inst` is tested once without the lock and once more while holding `m`.
   NOTE(review): the first, unlocked test of `inst` is not synchronized with the
   write below; consider std::call_once or an atomic pointer. */
Cache * Cache::get()
{
    if (inst)
        return inst;
    boost::unique_lock<boost::mutex> s(m);
    if (inst)
        return inst;
    inst = new Cache();
    return inst;
}

/* Reads the cache configuration, creates the cache and journal directories,
   sets up the Downloader and starts a detached thread that runs populate().

   Config values read: Cache/cache_size, ObjectStorage/object_size, Cache/path,
   ObjectStorage/journal_path.

   Throws runtime_error when a value is missing or a size is not a number.
   A failure to create either directory is logged and the original exception
   is rethrown unchanged. */
Cache::Cache() : currentCacheSize(0)
{
    Config *conf = Config::get();
    logger = SMLogging::get();
    replicator = Replicator::get();

    string stmp = conf->getValue("Cache", "cache_size");
    if (stmp.empty())
    {
        logger->log(LOG_CRIT, "Cache/cache_size is not set");
        throw runtime_error("Please set Cache/cache_size in the storagemanager.cnf file");
    }
    try
    {
        maxCacheSize = stoul(stmp);
    }
    catch (invalid_argument &)
    {
        logger->log(LOG_CRIT, "Cache/cache_size is not a number");
        throw runtime_error("Please set Cache/cache_size to a number");
    }
    //cout << "Cache got cache size " << maxCacheSize << endl;

    stmp = conf->getValue("ObjectStorage", "object_size");
    if (stmp.empty())
    {
        logger->log(LOG_CRIT, "ObjectStorage/object_size is not set");
        throw runtime_error("Please set ObjectStorage/object_size in the storagemanager.cnf file");
    }
    try
    {
        objectSize = stoul(stmp);
    }
    catch (invalid_argument &)
    {
        logger->log(LOG_CRIT, "ObjectStorage/object_size is not a number");
        throw runtime_error("Please set ObjectStorage/object_size to a number");
    }

    prefix = conf->getValue("Cache", "path");
    if (prefix.empty())
    {
        logger->log(LOG_CRIT, "Cache/path is not set");
        throw runtime_error("Please set Cache/path in the storagemanager.cnf file");
    }
    try
    {
        bf::create_directories(prefix);
    }
    catch (exception &e)
    {
        logger->log(LOG_CRIT, "Failed to create %s, got: %s", prefix.string().c_str(), e.what());
        // rethrow the original object; `throw e;` would throw a sliced std::exception copy
        throw;
    }
    //cout << "Cache got prefix " << prefix << endl;

    downloader.reset(new Downloader());
    downloader->setDownloadPath(prefix.string());
    downloader->useThisLock(&lru_mutex);

    stmp = conf->getValue("ObjectStorage", "journal_path");
    if (stmp.empty())
    {
        logger->log(LOG_CRIT, "ObjectStorage/journal_path is not set");
        throw runtime_error("Please set ObjectStorage/journal_path in the storagemanager.cnf file");
    }
    journalPrefix = stmp;
    // Create the journal dir only inside the try block, so that a failure is
    // always logged before it propagates.
    try
    {
        bf::create_directories(journalPrefix);
    }
    catch (exception &e)
    {
        logger->log(LOG_CRIT, "Failed to create %s, got: %s", journalPrefix.string().c_str(), e.what());
        throw;
    }

    lru_mutex.lock();   // unlocked by populate() when it's done
    boost::thread t([this] { this->populate(); });
    t.detach();
}

Cache::~Cache()
{
}

/* Rebuilds the in-memory accounting from what is on disk.

   Every regular file in `prefix` is appended to lru / m_lru and its size is
   added to currentCacheSize; every regular "*.journal" file in `journalPrefix`
   adds its size to currentCacheSize.  Anything else found in either directory
   is logged as a warning.  The Synchronizer is told about the objects and the
   journals that were found.

   Locking contract: the caller must already hold lru_mutex.  This function
   releases it (once the journal dir has been scanned) and does NOT take it
   again before returning. */
void Cache::populate()
{
    Synchronizer *sync = Synchronizer::get();
    bf::directory_iterator dir(prefix);
    bf::directory_iterator dend;
    vector<string> newObjects;

    while (dir != dend)
    {
        // put everything in lru & m_lru
        const bf::path &p = dir->path();
        if (bf::is_regular_file(p))
        {
            const string objectName = p.filename().string();
            lru.push_back(objectName);
            auto last = lru.end();
            m_lru.insert(--last);
            currentCacheSize += bf::file_size(*dir);
            newObjects.push_back(objectName);
        }
        else
            logger->log(LOG_WARNING, "Cache: found something in the cache that does not belong '%s'", p.string().c_str());
        ++dir;
    }
    sync->newObjects(newObjects);
    newObjects.clear();

    // account for what's in the journal dir
    vector<pair<string, size_t> > newJournals;
    dir = bf::directory_iterator(journalPrefix);
    while (dir != dend)
    {
        const bf::path &p = dir->path();
        if (bf::is_regular_file(p))
        {
            if (p.extension() == ".journal")
            {
                size_t s = bf::file_size(*dir);
                currentCacheSize += s;
                newJournals.push_back(pair<string, size_t>(p.stem().string(), s));
            }
            else
                logger->log(LOG_WARNING, "Cache: found a file in the journal dir that does not belong '%s'", p.string().c_str());
        }
        else
            logger->log(LOG_WARNING, "Cache: found something in the journal dir that does not belong '%s'", p.string().c_str());
        ++dir;
    }
    lru_mutex.unlock();
    sync->newJournalEntries(newJournals);
}

// be careful using this! SM should be idle. No ongoing reads or writes.
void Cache::validateCacheSize() { boost::unique_lock s(lru_mutex); if (!doNotEvict.empty() || !toBeDeleted.empty()) { cout << "Not safe to use validateCacheSize() at the moment." << endl; return; } size_t oldSize = currentCacheSize; currentCacheSize = 0; m_lru.clear(); lru.clear(); populate(); if (oldSize != currentCacheSize) logger->log(LOG_DEBUG, "Cache::validateCacheSize(): found a discrepancy. Actual size is %lld, had %lld.", currentCacheSize, oldSize); else logger->log(LOG_DEBUG, "Cache::validateCacheSize(): Cache size accounting agrees with reality for now."); } #if 0 /* Need to simplify this, we keep running into sneaky problems, and I just spotted a couple more. Just going to rewrite it. We can revisit later if the simplified version needs improvement */ void Cache::read(const vector &keys) { /* move existing keys to a do-not-evict map fetch keys that do not exist after fetching, move all keys from do-not-evict to the back of the LRU */ boost::unique_lock s(lru_mutex); vector keysToFetch; uint i; M_LRU_t::iterator mit; for (const string &key : keys) { mit = m_lru.find(key); if (mit != m_lru.end()) // it's in the cache, add the entry to the do-not-evict set addToDNE(mit->lit); else // not in the cache, put it in the list to download keysToFetch.push_back(&key); } //s.unlock(); // start downloading the keys to fetch // TODO: there should be a path for the common case, which is that there is nothing // to download. vector dl_errnos; vector sizes; if (!keysToFetch.empty()) downloader->download(keysToFetch, &dl_errnos, &sizes, lru_mutex); size_t sum_sizes = 0; for (size_t &size : sizes) sum_sizes += size; //s.lock(); // do makespace() before downloading. Problem is, until the download is finished, this fcn can't tell which // downloads it was responsible for. Need Downloader to make the call...? 
_makeSpace(sum_sizes); currentCacheSize += sum_sizes; // move all keys to the back of the LRU for (i = 0; i < keys.size(); i++) { mit = m_lru.find(keys[i]); if (mit != m_lru.end()) { lru.splice(lru.end(), lru, mit->lit); LRU_t::iterator lit = lru.end(); removeFromDNE(--lit); } else if (dl_errnos[i] == 0) // successful download { lru.push_back(keys[i]); LRU_t::iterator lit = lru.end(); m_lru.insert(M_LRU_element_t(--lit)); } else { // Downloader already logged it, anything to do here? /* brainstorming options for handling it. 1) Should it be handled? The caller will log a file-not-found error, and there will be a download failure in the log already. 2) Can't really DO anything can it? */ } } } #endif // new simplified version void Cache::read(const vector &keys) { /* Move existing keys to the back of the LRU, start downloading nonexistant keys. */ vector keysToFetch; vector dlErrnos; vector dlSizes; boost::unique_lock s(lru_mutex); M_LRU_t::iterator mit; for (const string &key : keys) { mit = m_lru.find(key); if (mit != m_lru.end()) { addToDNE(mit->lit); lru.splice(lru.end(), lru, mit->lit); // move them to the back so they are last to pick for eviction } else { // There's window where the file has been downloaded but is not yet // added to the lru structs. However it is in the DNE. If it is in the DNE, then it is also // in Downloader's map. So, this thread needs to start the download if it's not in the // DNE or if there's an existing download that hasn't finished yet. Starting the download // includes waiting for an existing download to finish, which from this class's pov is the // same thing. 
if (doNotEvict.find(key) == doNotEvict.end() || downloader->inProgress(key)) keysToFetch.push_back(&key); else cout << "Cache: detected and stopped a racey download" << endl; addToDNE(key); } } if (keysToFetch.empty()) return; downloader->download(keysToFetch, &dlErrnos, &dlSizes); size_t sum_sizes = 0; for (uint i = 0; i < keysToFetch.size(); ++i) { // downloads with size 0 didn't actually happen, either because it // was a preexisting download (another read() call owns it), or because // there was an error downloading it. Use size == 0 as an indication of // what to add to the cache. Also needs to verify that the file was not deleted, // indicated by existence in doNotEvict. if (dlSizes[i] != 0) { if (doNotEvict.find(*keysToFetch[i]) != doNotEvict.end()) { sum_sizes += dlSizes[i]; lru.push_back(*keysToFetch[i]); LRU_t::iterator lit = lru.end(); m_lru.insert(--lit); // I dislike this way of grabbing the last iterator in a list. } else // it was downloaded, but a deletion happened so we have to toss it { cout << "removing a file that was deleted by another thread during download" << endl; bf::remove(prefix / (*keysToFetch[i])); } } } // move everything in keys to the back of the lru (yes, again) for (const string &key : keys) { mit = m_lru.find(key); if (mit != m_lru.end()) // all of the files exist, just not all of them are 'owned by' this thread. lru.splice(lru.end(), lru, mit->lit); } // fix cache size //_makeSpace(sum_sizes); currentCacheSize += sum_sizes; } void Cache::doneReading(const vector &keys) { boost::unique_lock s(lru_mutex); for (const string &key : keys) { removeFromDNE(key); // most should be in the map. // debateable whether it's faster to look up the list iterator and use it // or whether it's faster to bypass that and use strings only. 
//const auto &it = m_lru.find(key); //if (it != m_lru.end()) // removeFromDNE(it->lit); } _makeSpace(0); } void Cache::doneWriting() { makeSpace(0); } Cache::DNEElement::DNEElement(const LRU_t::iterator &k) : key(k), refCount(1) { } Cache::DNEElement::DNEElement(const string &k) : sKey(k), refCount(1) { } void Cache::addToDNE(const DNEElement &key) { DNE_t::iterator it = doNotEvict.find(key); if (it != doNotEvict.end()) { DNEElement &dnee = const_cast(*it); ++(dnee.refCount); } else doNotEvict.insert(key); } void Cache::removeFromDNE(const DNEElement &key) { DNE_t::iterator it = doNotEvict.find(key); if (it == doNotEvict.end()) return; DNEElement &dnee = const_cast(*it); if (--(dnee.refCount) == 0) doNotEvict.erase(it); } const bf::path & Cache::getCachePath() { return prefix; } const bf::path & Cache::getJournalPath() { return journalPrefix; } void Cache::exists(const vector &keys, vector *out) const { out->resize(keys.size()); boost::unique_lock s(lru_mutex); for (uint i = 0; i < keys.size(); i++) (*out)[i] = (m_lru.find(keys[i]) != m_lru.end()); } bool Cache::exists(const string &key) const { boost::unique_lock s(lru_mutex); return m_lru.find(key) != m_lru.end(); } void Cache::newObject(const string &key, size_t size) { boost::unique_lock s(lru_mutex); assert(m_lru.find(key) == m_lru.end()); //_makeSpace(size); lru.push_back(key); LRU_t::iterator back = lru.end(); m_lru.insert(--back); currentCacheSize += size; } void Cache::newJournalEntry(size_t size) { boost::unique_lock s(lru_mutex); //_makeSpace(size); currentCacheSize += size; } void Cache::deletedJournal(size_t size) { boost::unique_lock s(lru_mutex); assert(currentCacheSize >= size); currentCacheSize -= size; } void Cache::deletedObject(const string &key, size_t size) { boost::unique_lock s(lru_mutex); assert(currentCacheSize >= size); M_LRU_t::iterator mit = m_lru.find(key); assert(mit != m_lru.end()); // if it's being flushed, let makeSpace() do the deleting if (toBeDeleted.find(mit->lit) == 
toBeDeleted.end()) { doNotEvict.erase(mit->lit); lru.erase(mit->lit); m_lru.erase(mit); currentCacheSize -= size; } } void Cache::setMaxCacheSize(size_t size) { boost::unique_lock s(lru_mutex); if (size < maxCacheSize) _makeSpace(maxCacheSize - size); maxCacheSize = size; } void Cache::makeSpace(size_t size) { boost::unique_lock s(lru_mutex); _makeSpace(size); } size_t Cache::getMaxCacheSize() const { return maxCacheSize; } // call this holding lru_mutex void Cache::_makeSpace(size_t size) { ssize_t thisMuch = currentCacheSize + size - maxCacheSize; if (thisMuch <= 0) return; LRU_t::iterator it; while (thisMuch > 0 && !lru.empty()) { it = lru.begin(); // find the first element not being either read() right now or being processed by another // makeSpace() call. while (it != lru.end()) { // make sure it's not currently being read or being flushed by another _makeSpace() call if ((doNotEvict.find(it) == doNotEvict.end()) && (toBeDeleted.find(it) == toBeDeleted.end())) break; ++it; } if (it == lru.end()) { // nothing can be deleted right now return; } if (!bf::exists(prefix / *it)) cout << prefix / *it << " doesn't exist, WTF?" << endl; // ran into this a couple times, still happens as of commit 948ee1aa5 assert(bf::exists(prefix / *it)); /* tell Synchronizer that this key will be evicted delete the file remove it from our structs update current size */ //logger->log(LOG_WARNING, "Cache: flushing!"); toBeDeleted.insert(it); string key = *it; // need to make a copy; it could get changed after unlocking. lru_mutex.unlock(); try { Synchronizer::get()->flushObject(key); } catch (...) 
{ // it gets logged by Sync lru_mutex.lock(); toBeDeleted.erase(it); continue; } lru_mutex.lock(); // check doNotEvict again in case this object is now being read if (doNotEvict.find(it) == doNotEvict.end()) { bf::path cachedFile = prefix / *it; m_lru.erase(*it); toBeDeleted.erase(it); lru.erase(it); size_t newSize = bf::file_size(cachedFile); replicator->remove(cachedFile, Replicator::LOCAL_ONLY); if (newSize < currentCacheSize) { currentCacheSize -= newSize; thisMuch -= newSize; } else { logger->log(LOG_WARNING, "Cache::makeSpace(): accounting error. Almost wrapped currentCacheSize on flush."); currentCacheSize = 0; thisMuch = 0; } } else toBeDeleted.erase(it); } } void Cache::rename(const string &oldKey, const string &newKey, ssize_t sizediff) { // rename it in the LRU // erase/insert to rehash it everywhere else boost::unique_lock s(lru_mutex); auto it = m_lru.find(oldKey); if (it == m_lru.end()) return; auto lit = it->lit; m_lru.erase(it); int refCount = 0; auto dne_it = doNotEvict.find(lit); if (dne_it != doNotEvict.end()) { refCount = dne_it->refCount; doNotEvict.erase(dne_it); } auto tbd_it = toBeDeleted.find(lit); bool hasTBDEntry = (tbd_it != toBeDeleted.end()); if (hasTBDEntry) toBeDeleted.erase(tbd_it); *lit = newKey; if (hasTBDEntry) toBeDeleted.insert(lit); if (refCount != 0) { pair dne_tmp = doNotEvict.insert(lit); const_cast(*(dne_tmp.first)).refCount = refCount; } m_lru.insert(lit); currentCacheSize += sizediff; } int Cache::ifExistsThenDelete(const string &key) { bf::path cachedPath = prefix / key; bf::path journalPath = journalPrefix / (key + ".journal"); boost::unique_lock s(lru_mutex); bool objectExists = false; auto it = m_lru.find(key); if (it != m_lru.end()) { if (toBeDeleted.find(it->lit) == toBeDeleted.end()) { doNotEvict.erase(it->lit); lru.erase(it->lit); m_lru.erase(it); objectExists = true; } else // let makeSpace() delete it if it's already in progress return 0; } bool journalExists = bf::exists(journalPath); //assert(objectExists == 
bf::exists(cachedPath)); size_t objectSize = (objectExists ? bf::file_size(cachedPath) : 0); //size_t objectSize = (objectExists ? MetadataFile::getLengthFromKey(key) : 0); size_t journalSize = (journalExists ? bf::file_size(journalPath) : 0); currentCacheSize -= (objectSize + journalSize); //assert(!objectExists || objectSize == bf::file_size(cachedPath)); return (objectExists ? 1 : 0) | (journalExists ? 2 : 0); } size_t Cache::getCurrentCacheSize() const { return currentCacheSize; } size_t Cache::getCurrentCacheElementCount() const { boost::unique_lock s(lru_mutex); assert(m_lru.size() == lru.size()); return m_lru.size(); } void Cache::reset() { boost::unique_lock s(lru_mutex); m_lru.clear(); lru.clear(); toBeDeleted.clear(); doNotEvict.clear(); bf::directory_iterator dir; bf::directory_iterator dend; for (dir = bf::directory_iterator(prefix); dir != dend; ++dir) bf::remove_all(dir->path()); for (dir = bf::directory_iterator(journalPrefix); dir != dend; ++dir) bf::remove_all(dir->path()); currentCacheSize = 0; } void Cache::shutdown() { boost::unique_lock s(lru_mutex); downloader.reset(); } /* The helper classes */ Cache::M_LRU_element_t::M_LRU_element_t(const string *k) : key(k) {} Cache::M_LRU_element_t::M_LRU_element_t(const string &k) : key(&k) {} Cache::M_LRU_element_t::M_LRU_element_t(const LRU_t::iterator &i) : key(&(*i)), lit(i) {} inline size_t Cache::KeyHasher::operator()(const M_LRU_element_t &l) const { return hash()(*(l.key)); } inline bool Cache::KeyEquals::operator()(const M_LRU_element_t &l1, const M_LRU_element_t &l2) const { return (*(l1.key) == *(l2.key)); } inline size_t Cache::DNEHasher::operator()(const DNEElement &l) const { return (l.sKey.empty() ? hash()(*(l.key)) : hash()(l.sKey)); } inline bool Cache::DNEEquals::operator()(const DNEElement &l1, const DNEElement &l2) const { const string *s1, *s2; s1 = l1.sKey.empty() ? &(*(l1.key)) : &(l1.sKey); s2 = l2.sKey.empty() ? 
&(*(l2.key)) : &(l2.sKey); return (*s1 == *s2); } inline bool Cache::TBDLess::operator()(const LRU_t::iterator &i1, const LRU_t::iterator &i2) const { return *i1 < *i2; } }