diff --git a/src/Cache.cpp b/src/Cache.cpp index 31506a9e9..5d844616f 100644 --- a/src/Cache.cpp +++ b/src/Cache.cpp @@ -166,7 +166,7 @@ void Cache::read(const vector &keys) fetch keys that do not exist after fetching, move all keys from do-not-evict to the back of the LRU */ - boost::unique_lock s(lru_mutex); + boost::unique_lock s(lru_mutex); vector keysToFetch; uint i; @@ -243,14 +243,23 @@ void Cache::read(const vector &keys) vector dlErrnos; vector dlSizes; - boost::unique_lock s(lru_mutex); + boost::unique_lock s(lru_mutex); M_LRU_t::iterator mit; for (const string &key : keys) { mit = m_lru.find(key); if (mit != m_lru.end()) + { lru.splice(lru.end(), lru, mit->lit); + // if it's about to be deleted, stop that + TBD_t::iterator tbd_it = toBeDeleted.find(mit->lit); + if (tbd_it != toBeDeleted.end()) + { + cout << "Saved one from being deleted" << endl; + toBeDeleted.erase(tbd_it); + } + } else keysToFetch.push_back(&key); } @@ -323,20 +332,20 @@ const bf::path & Cache::getJournalPath() void Cache::exists(const vector &keys, vector *out) const { out->resize(keys.size()); - boost::unique_lock s(lru_mutex); + boost::unique_lock s(lru_mutex); for (uint i = 0; i < keys.size(); i++) (*out)[i] = (m_lru.find(keys[i]) != m_lru.end()); } bool Cache::exists(const string &key) const { - boost::unique_lock s(lru_mutex); + boost::unique_lock s(lru_mutex); return m_lru.find(key) != m_lru.end(); } void Cache::newObject(const string &key, size_t size) { - boost::unique_lock s(lru_mutex); + boost::unique_lock s(lru_mutex); assert(m_lru.find(key) == m_lru.end()); _makeSpace(size); lru.push_back(key); @@ -347,21 +356,21 @@ void Cache::newObject(const string &key, size_t size) void Cache::newJournalEntry(size_t size) { - boost::unique_lock s(lru_mutex); + boost::unique_lock s(lru_mutex); _makeSpace(size); currentCacheSize += size; } void Cache::deletedJournal(size_t size) { - boost::unique_lock s(lru_mutex); + boost::unique_lock s(lru_mutex); assert(currentCacheSize >= size); currentCacheSize -= size; } void Cache::deletedObject(const string &key, size_t size) { - boost::unique_lock s(lru_mutex); + boost::unique_lock s(lru_mutex); assert(currentCacheSize >= size); M_LRU_t::iterator mit = m_lru.find(key); assert(mit != m_lru.end()); // TODO: 5/16/19 - got this assertion using S3 by running test000, then test000 again. @@ -373,7 +382,7 @@ void Cache::deletedObject(const string &key, size_t size) void Cache::setMaxCacheSize(size_t size) { - boost::unique_lock s(lru_mutex); + boost::unique_lock s(lru_mutex); if (size < maxCacheSize) _makeSpace(maxCacheSize - size); maxCacheSize = size; @@ -381,7 +390,7 @@ void Cache::setMaxCacheSize(size_t size) void Cache::makeSpace(size_t size) { - boost::unique_lock s(lru_mutex); + boost::unique_lock s(lru_mutex); _makeSpace(size); } @@ -397,55 +406,69 @@ void Cache::_makeSpace(size_t size) if (thisMuch <= 0) return; - struct stat statbuf; - LRU_t::iterator it = lru.begin(); - while (it != lru.end() && thisMuch > 0) + LRU_t::iterator it; + while (thisMuch > 0 && !lru.empty()) { - if (doNotEvict.find(it) != doNotEvict.end()) + it = lru.begin(); + // find the first element not being either read() right now or being processed by another + // makeSpace() call. + while (it != lru.end()) { + if ((doNotEvict.find(it) == doNotEvict.end()) && (toBeDeleted.find(it) == toBeDeleted.end())) + break; ++it; - continue; // it's in the do-not-evict list } bf::path cachedFile = prefix / *it; - int err = stat(cachedFile.string().c_str(), &statbuf); - if (err) - { - logger->log(LOG_WARNING, "Cache::makeSpace(): There seems to be a cached file that couldn't be stat'ed: %s", - cachedFile.string().c_str()); - ++it; - continue; - } - + assert(bf::exists(cachedFile)); /* tell Synchronizer that this key will be evicted delete the file remove it from our structs update current size */ - assert(currentCacheSize >= (size_t) statbuf.st_size); - currentCacheSize -= statbuf.st_size; - thisMuch -= statbuf.st_size; + //logger->log(LOG_WARNING, "Cache: flushing!"); - Synchronizer::get()->flushObject(*it); - cachedFile = prefix / *it; // Sync may have renamed it - replicator->remove(cachedFile, Replicator::LOCAL_ONLY); - LRU_t::iterator toRemove = it++; - m_lru.erase(*toRemove); - lru.erase(toRemove); + toBeDeleted.insert(it); + + lru_mutex.unlock(); + try + { + Synchronizer::get()->flushObject(*it); + } + catch (...) + { + // it gets logged by Sync + lru_mutex.lock(); + toBeDeleted.erase(it); + continue; + } + lru_mutex.lock(); + + TBD_t::iterator tbd_it = toBeDeleted.find(it); + if (tbd_it != toBeDeleted.end()) + { + // if it's still in toBeDeleted then it is safe to delete. + // if read() happened to access it while it was flushing, it will not + // be in that set. + cachedFile = prefix / *it; + toBeDeleted.erase(tbd_it); + m_lru.erase(*it); + lru.erase(it); + size_t newSize = bf::file_size(cachedFile); + replicator->remove(cachedFile, Replicator::LOCAL_ONLY); + currentCacheSize -= newSize; + thisMuch -= newSize; + } } } void Cache::rename(const string &oldKey, const string &newKey, ssize_t sizediff) { - boost::unique_lock s(lru_mutex); + boost::unique_lock s(lru_mutex); auto it = m_lru.find(oldKey); - //assert(it != m_lru.end()); if (it == m_lru.end()) - { - logger->log(LOG_WARNING, "Cache: was told to rename %s, but it is not in the cache", oldKey.c_str()); return; - } auto lit = it->lit; m_lru.erase(it); @@ -459,7 +482,7 @@ int Cache::ifExistsThenDelete(const string &key) bf::path cachedPath = prefix / key; bf::path journalPath = journalPrefix / (key + ".journal"); - boost::unique_lock s(lru_mutex); + boost::unique_lock s(lru_mutex); bool objectExists = false; bool journalExists = bf::exists(journalPath); @@ -489,14 +512,14 @@ size_t Cache::getCurrentCacheSize() const size_t Cache::getCurrentCacheElementCount() const { - boost::unique_lock s(lru_mutex); + boost::unique_lock s(lru_mutex); assert(m_lru.size() == lru.size()); return m_lru.size(); } void Cache::reset() { - boost::unique_lock s(lru_mutex); + boost::unique_lock s(lru_mutex); m_lru.clear(); lru.clear(); @@ -541,6 +564,11 @@ inline bool Cache::DNEEquals::operator()(const DNEElement &l1, const DNEElement return (*(l1.key) == *(l2.key)); } +inline bool Cache::TBDLess::operator()(const LRU_t::iterator &i1, const LRU_t::iterator &i2) const +{ + return *i1 < *i2; +} + } diff --git a/src/Cache.h b/src/Cache.h index 96791b643..dab5696bd 100644 --- a/src/Cache.h +++ b/src/Cache.h @@ -93,7 +93,7 @@ class Cache : public boost::noncopyable typedef std::unordered_set M_LRU_t; M_LRU_t m_lru; // the LRU entries as a hash table - /* The do-not-evict list stuff */ + /* The do-not-evict list stuff. Unused at the moment for simplicity. */ struct DNEElement { DNEElement(const LRU_t::iterator &); @@ -115,7 +115,21 @@ class Cache : public boost::noncopyable DNE_t doNotEvict; void addToDNE(const LRU_t::iterator &key); void removeFromDNE(const LRU_t::iterator &key); - mutable boost::recursive_mutex lru_mutex; // protects the main cache structures & the do-not-evict set + + // the to-be-deleted set. Elements removed from the LRU but not yet deleted will be here. + // Elements are inserted and removed by makeSpace(). If read() references a file that is in this, + // it will remove it, signalling to makeSpace that it should not be deleted + struct TBDLess + { + bool operator()(const LRU_t::iterator &, const LRU_t::iterator &) const; + }; + + typedef std::set TBD_t; + TBD_t toBeDeleted; + + + + mutable boost::mutex lru_mutex; // protects the main cache structures & the do-not-evict set }; diff --git a/src/Downloader.cpp b/src/Downloader.cpp index ee4639628..0af44174b 100644 --- a/src/Downloader.cpp +++ b/src/Downloader.cpp @@ -33,7 +33,7 @@ Downloader::~Downloader() { } -void Downloader::useThisLock(boost::recursive_mutex *mutex) +void Downloader::useThisLock(boost::mutex *mutex) { lock = mutex; } @@ -192,7 +192,7 @@ void Downloader::setDownloadPath(const string &path) } /* The helper fcns */ -Downloader::Download::Download(const string &source, const string &_dlPath, boost::recursive_mutex *_lock) : +Downloader::Download::Download(const string &source, const string &_dlPath, boost::mutex *_lock) : dlPath(_dlPath), key(source), dl_errno(0), size(0), lock(_lock), finished(false), itRan(false) { } diff --git a/src/Downloader.h b/src/Downloader.h index e4876a802..07d82a739 100644 --- a/src/Downloader.h +++ b/src/Downloader.h @@ -27,12 +27,12 @@ class Downloader // errors are reported through errnos void download(const std::vector &keys, std::vector *errnos, std::vector *sizes); void setDownloadPath(const std::string &path); - void useThisLock(boost::recursive_mutex *); + void useThisLock(boost::mutex *); private: uint maxDownloads; std::string downloadPath; - boost::recursive_mutex *lock; + boost::mutex *lock; class DownloadListener { @@ -50,14 +50,14 @@ class Downloader */ struct Download : public ThreadPool::Job { - Download(const std::string &source, const std::string &_dlPath, boost::recursive_mutex *_lock); + Download(const std::string &source, const std::string &_dlPath, boost::mutex *_lock); ~Download(); void operator()(); boost::filesystem::path dlPath; const std::string key; int dl_errno; // to propagate errors from the download job to the caller size_t size; - boost::recursive_mutex *lock; + boost::mutex *lock; bool finished, itRan; std::vector listeners; }; diff --git a/src/IOCoordinator.cpp b/src/IOCoordinator.cpp index eae3cb9a2..53b7406a0 100755 --- a/src/IOCoordinator.cpp +++ b/src/IOCoordinator.cpp @@ -565,7 +565,7 @@ int IOCoordinator::truncate(const char *path, size_t newSize) else { meta.updateEntryLength(objects[0].offset, newSize - objects[0].offset); - assert(objects[0].offset >= 0 && objects[0].offset < (newSize - objects[0].offset)); + assert(objects[0].offset >= 0 && objects[0].length > (newSize - objects[0].offset)); } for (uint i = 1; i < objects.size(); i++) meta.removeEntry(objects[i].offset); diff --git a/src/Synchronizer.cpp b/src/Synchronizer.cpp index ce1d3a3e2..57e8606af 100644 --- a/src/Synchronizer.cpp +++ b/src/Synchronizer.cpp @@ -67,7 +67,7 @@ Synchronizer::~Synchronizer() or save the list it's working on..... For milestone 2, this will do the safe thing and finish working first. Later we can get fancy. */ - boost::unique_lock lock(mutex); + boost::unique_lock lock(mutex); die = true; syncThread.interrupt(); lock.unlock(); @@ -84,7 +84,7 @@ enum OpFlags void Synchronizer::newJournalEntry(const string &key) { - boost::unique_lock s(mutex); + boost::unique_lock s(mutex); auto it = pendingOps.find(key); if (it != pendingOps.end()) @@ -98,7 +98,7 @@ void Synchronizer::newJournalEntry(const string &key) void Synchronizer::newObjects(const vector &keys) { - boost::unique_lock s(mutex); + boost::unique_lock s(mutex); for (const string &key : keys) { @@ -110,7 +110,7 @@ void Synchronizer::newObjects(const vector &keys) void Synchronizer::deletedObjects(const vector &keys) { - boost::unique_lock s(mutex); + boost::unique_lock s(mutex); for (const string &key : keys) { @@ -127,7 +127,7 @@ void Synchronizer::deletedObjects(const vector &keys) void Synchronizer::flushObject(const string &key) { - boost::unique_lock s(mutex); + boost::unique_lock s(mutex); // if there is something to do on key, it should be either in pendingOps or opsInProgress // if it is is pending ops, start the job now. If it is in progress, wait for it to finish. @@ -139,7 +139,10 @@ void Synchronizer::flushObject(const string &key) if (it != pendingOps.end()) { objNames.push_front(key); - process(objNames.begin()); + auto nameIt = objNames.begin(); + s.unlock(); + process(nameIt); + s.lock(); } else { @@ -193,7 +196,7 @@ void Synchronizer::flushObject(const string &key) void Synchronizer::periodicSync() { - boost::unique_lock lock(mutex); + boost::unique_lock lock(mutex); while (!die) { lock.unlock(); @@ -224,7 +227,7 @@ void Synchronizer::process(list::iterator name) if not, return */ - boost::unique_lock s(mutex); + boost::unique_lock s(mutex); string &key = *name; auto it = pendingOps.find(key); @@ -258,7 +261,8 @@ void Synchronizer::process(list::iterator name) bool success = false; while (!success) - { + { + assert(!s.owns_lock()); try { // Exceptions should only happen b/c of cloud service errors that can't be retried. // This code is intentionally racy to avoid having to grab big locks. @@ -341,7 +345,7 @@ void Synchronizer::synchronizeDelete(const string &sourceFile, list::ite cs->deleteObject(*it); // delete any pending jobs for *it. There shouldn't be any, this is out of pure paranoia. - boost::unique_lock sc(mutex); + boost::unique_lock sc(mutex); pendingOps.erase(*it); } @@ -518,7 +522,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list void Synchronizer::rename(const string &oldKey, const string &newKey) { - boost::unique_lock s(mutex); + boost::unique_lock s(mutex); auto it = pendingOps.find(oldKey); if (it == pendingOps.end()) @@ -558,7 +562,7 @@ void Synchronizer::PendingOps::notify() condvar.notify_all(); } -void Synchronizer::PendingOps::wait(boost::recursive_mutex *m) +void Synchronizer::PendingOps::wait(boost::mutex *m) { while (!finished) { diff --git a/src/Synchronizer.h b/src/Synchronizer.h index 2b8d0994a..2c914f9a0 100644 --- a/src/Synchronizer.h +++ b/src/Synchronizer.h @@ -56,7 +56,7 @@ class Synchronizer : public boost::noncopyable int waiters; bool finished; boost::condition condvar; - void wait(boost::recursive_mutex *); + void wait(boost::mutex *); void notify(); }; @@ -94,7 +94,7 @@ class Synchronizer : public boost::noncopyable boost::filesystem::path cachePath; boost::filesystem::path journalPath; - boost::recursive_mutex mutex; + boost::mutex mutex; }; }