From 715d041a858b4f3410defb62cec0b8f7157d7d08 Mon Sep 17 00:00:00 2001 From: Patrick LeBlanc Date: Fri, 28 Jun 2019 13:20:31 -0500 Subject: [PATCH] Fixed several bugs exposed by setting the cache size to an unreasonably small value. Had to make a compromise to avoid a deadlock, though: read/write/append/truncate can now temporarily exceed the cache size limit, and the cache is reconciled at the end of the respective operation. For example, given a cache of 100MB and a read() of 500MB, all 500MB of data being read will stay in the cache until it has been read; then 400MB of it will be evicted. The same applies on the write side. --- src/Cache.cpp | 52 +++++++++++++++++++++++++++---------------- src/Cache.h | 11 ++++++--- src/IOCoordinator.cpp | 37 ++++++++++++++++++------------ src/IOCoordinator.h | 2 ++ src/Replicator.cpp | 3 --- src/SMLogging.cpp | 2 +- src/Synchronizer.cpp | 17 +++++++++++--- src/Synchronizer.h | 3 ++- src/unit_tests.cpp | 1 + 9 files changed, 84 insertions(+), 44 deletions(-) diff --git a/src/Cache.cpp b/src/Cache.cpp index 26850cf0a..f937c81b9 100644 --- a/src/Cache.cpp +++ b/src/Cache.cpp @@ -2,6 +2,7 @@ #include "Cache.h" #include "Config.h" #include "Downloader.h" +#include "Synchronizer.h" #include #include #include @@ -120,7 +121,6 @@ Cache::~Cache() void Cache::populate() { - Synchronizer *sync = Synchronizer::get(); bf::directory_iterator dir(prefix); bf::directory_iterator dend; while (dir != dend) @@ -147,9 +147,7 @@ void Cache::populate() if (bf::is_regular_file(p)) { if (p.extension() == ".journal") - { currentCacheSize += bf::file_size(*dir); - sync->newJournalEntry(p.stem()); else logger->log(LOG_WARNING, "Cache: found a file in the journal dir that does not belong '%s'", p.string().c_str()); } @@ -257,7 +255,7 @@ void Cache::read(const vector &keys) TBD_t::iterator tbd_it = toBeDeleted.find(mit->lit); if (tbd_it != toBeDeleted.end()) { - cout << "Saved one from being deleted" << endl; + //cout << "Saved one from being deleted" << endl; 
toBeDeleted.erase(tbd_it); } } @@ -297,7 +295,7 @@ void Cache::read(const vector &keys) } // fix cache size - _makeSpace(sum_sizes); + //_makeSpace(sum_sizes); currentCacheSize += sum_sizes; } @@ -310,7 +308,12 @@ void Cache::doneReading(const vector &keys) if (it != m_lru.end()) removeFromDNE(it->lit); } - readyToDelete.notify_all(); + _makeSpace(0); +} + +void Cache::doneWriting() +{ + makeSpace(0); } Cache::DNEElement::DNEElement(const LRU_t::iterator &k) : key(k), refCount(1) @@ -369,7 +372,7 @@ void Cache::newObject(const string &key, size_t size) { boost::unique_lock s(lru_mutex); assert(m_lru.find(key) == m_lru.end()); - _makeSpace(size); + //_makeSpace(size); lru.push_back(key); LRU_t::iterator back = lru.end(); m_lru.insert(--back); @@ -379,7 +382,7 @@ void Cache::newObject(const string &key, size_t size) void Cache::newJournalEntry(size_t size) { boost::unique_lock s(lru_mutex); - _makeSpace(size); + //_makeSpace(size); currentCacheSize += size; } @@ -432,7 +435,9 @@ void Cache::_makeSpace(size_t size) ssize_t thisMuch = currentCacheSize + size - maxCacheSize; if (thisMuch <= 0) return; - + if (thisMuch > (ssize_t) currentCacheSize) + thisMuch = currentCacheSize; + LRU_t::iterator it; while (thisMuch > 0 && !lru.empty()) { @@ -447,10 +452,8 @@ void Cache::_makeSpace(size_t size) } if (it == lru.end()) { - // nothing can be deleted, wait for something to change - cout << "nothing can be deleted, waiting" << endl; - readyToDelete.wait(lru_mutex); - continue; + // nothing can be deleted right now + return; } @@ -492,8 +495,17 @@ void Cache::_makeSpace(size_t size) lru.erase(it); size_t newSize = bf::file_size(cachedFile); replicator->remove(cachedFile, Replicator::LOCAL_ONLY); - currentCacheSize -= newSize; - thisMuch -= newSize; + if (newSize < currentCacheSize) + { + currentCacheSize -= newSize; + thisMuch -= newSize; + } + else + { + logger->log(LOG_WARNING, "Cache::makeSpace(): accounting error. 
Almost wrapped currentCacheSize on flush."); + currentCacheSize = 0; + thisMuch = 0; + } } } } @@ -511,7 +523,7 @@ void Cache::rename(const string &oldKey, const string &newKey, ssize_t sizediff) auto lit = it->lit; m_lru.erase(it); int refCount = 0; - auto dne_it = doNotEvict.find(it->lit); + auto dne_it = doNotEvict.find(lit); if (dne_it != doNotEvict.end()) { refCount = dne_it->refCount; @@ -544,12 +556,11 @@ int Cache::ifExistsThenDelete(const string &key) boost::unique_lock s(lru_mutex); bool objectExists = false; - bool journalExists = bf::exists(journalPath); + auto it = m_lru.find(key); if (it != m_lru.end()) { - // let makeSpace() delete it if it's already being flushed if (toBeDeleted.find(it->lit) == toBeDeleted.end()) { doNotEvict.erase(it->lit); @@ -557,8 +568,11 @@ int Cache::ifExistsThenDelete(const string &key) m_lru.erase(it); objectExists = true; } + else // let makeSpace() delete it if it's already in progress + return 0; } - //assert(objectExists == bf::exists(cachedPath)); + bool journalExists = bf::exists(journalPath); + assert(objectExists == bf::exists(cachedPath)); size_t objectSize = (objectExists ? bf::file_size(cachedPath) : 0); //size_t objectSize = (objectExists ? MetadataFile::getLengthFromKey(key) : 0); diff --git a/src/Cache.h b/src/Cache.h index b85361319..321cac486 100644 --- a/src/Cache.h +++ b/src/Cache.h @@ -17,20 +17,26 @@ namespace storagemanager { -class Synchronizer; - class Cache : public boost::noncopyable { public: static Cache *get(); virtual ~Cache(); + //reading fcns + // read() marks objects to be read s.t. they do not get flushed. + // after reading them, unlock the 'logical file', and call doneReading(). void read(const std::vector &keys); void doneReading(const std::vector &keys); bool exists(const std::string &key) const; void exists(const std::vector &keys, std::vector *out) const; + + // writing fcns + // new*() fcns tell the cache data was added. 
After writing a set of objects, + // unlock the 'logical file', and call doneWriting(). void newObject(const std::string &key, size_t size); void newJournalEntry(size_t size); + void doneWriting(); void deletedObject(const std::string &key, size_t size); void deletedJournal(size_t size); @@ -116,7 +122,6 @@ class Cache : public boost::noncopyable DNE_t doNotEvict; void addToDNE(const LRU_t::iterator &key); void removeFromDNE(const LRU_t::iterator &key); - boost::condition readyToDelete; // the to-be-deleted set. Elements removed from the LRU but not yet deleted will be here. // Elements are inserted and removed by makeSpace(). If read() references a file that is in this, diff --git a/src/IOCoordinator.cpp b/src/IOCoordinator.cpp index bbad42b42..0e2839049 100755 --- a/src/IOCoordinator.cpp +++ b/src/IOCoordinator.cpp @@ -1,7 +1,7 @@ #include "IOCoordinator.h" #include "MetadataFile.h" -#include "Utilities.h" +#include "Synchronizer.h" #include #include #include @@ -178,6 +178,7 @@ int IOCoordinator::read(const char *_filename, uint8_t *data, off_t offset, size } else if (errno != ENOENT) { + fileLock.unlock(); cache->doneReading(keys); int l_errno = errno; logger->log(LOG_CRIT, "IOCoordinator::read(): Got an unexpected error opening %s, error was '%s'", @@ -191,6 +192,7 @@ int IOCoordinator::read(const char *_filename, uint8_t *data, off_t offset, size fd = ::open(filename.c_str(), O_RDONLY); if (fd < 0) { + fileLock.unlock(); cache->doneReading(keys); int l_errno = errno; logger->log(LOG_CRIT, "IOCoordinator::read(): Got an unexpected error opening %s, error was '%s'", @@ -221,7 +223,7 @@ int IOCoordinator::read(const char *_filename, uint8_t *data, off_t offset, size off_t thisOffset = (count == 0 ? 
offset - object.offset : 0); // This checks and returns if the read is starting past EOF if (thisOffset >= (off_t) object.length) - return count; + goto out; // if this is the last object, the length of the read is length - count, // otherwise it is the length of the object - starting offset @@ -233,6 +235,7 @@ int IOCoordinator::read(const char *_filename, uint8_t *data, off_t offset, size &data[count], thisOffset, thisLength); if (err) { + fileLock.unlock(); cache->doneReading(keys); if (count == 0) return -1; @@ -243,6 +246,8 @@ int IOCoordinator::read(const char *_filename, uint8_t *data, off_t offset, size count += thisLength; } +out: + fileLock.unlock(); cache->doneReading(keys); // all done return count; @@ -254,7 +259,10 @@ int IOCoordinator::write(const char *_filename, const uint8_t *data, off_t offse const char *filename = p.string().c_str(); ScopedWriteLock lock(this, filename); - return _write(filename, data, offset, length); + int ret = _write(filename, data, offset, length); + lock.unlock(); + cache->doneWriting(); + return ret; } int IOCoordinator::_write(const char *filename, const uint8_t *data, off_t offset, size_t length) @@ -298,7 +306,7 @@ int IOCoordinator::_write(const char *filename, const uint8_t *data, off_t offse writeLength = min(objectSize,dataRemaining); objectOffset = 0; } - cache->makeSpace(writeLength+JOURNAL_ENTRY_HEADER_SIZE); + //cache->makeSpace(writeLength+JOURNAL_ENTRY_HEADER_SIZE); err = replicator->addJournalEntry(i->key.c_str(),&data[count],objectOffset,writeLength); assert((uint) err == writeLength); @@ -317,7 +325,6 @@ int IOCoordinator::_write(const char *filename, const uint8_t *data, off_t offse if ((writeLength + objectOffset) > i->length) metadata.updateEntryLength(i->offset, (writeLength + objectOffset)); - cache->newJournalEntry(writeLength+JOURNAL_ENTRY_HEADER_SIZE); synchronizer->newJournalEntry(i->key); @@ -349,7 +356,7 @@ int IOCoordinator::_write(const char *filename, const uint8_t *data, off_t offse // 
writeLength is the data length passed to write() // objectOffset is 0 unless the write starts beyond the end of data // in that case need to add the null data to cachespace - cache->makeSpace(writeLength + objectOffset); + //cache->makeSpace(writeLength + objectOffset); metadataObject newObject = metadata.addMetadataObject(filename,(writeLength + objectOffset)); @@ -373,7 +380,6 @@ int IOCoordinator::_write(const char *filename, const uint8_t *data, off_t offse count += writeLength; dataRemaining -= writeLength; } - synchronizer->newObjects(newObjectKeys); metadata.writeMetadata(filename); @@ -419,7 +425,7 @@ int IOCoordinator::append(const char *_filename, const uint8_t *data, size_t len // figure out how much data to write to this object writeLength = min((objectSize - i->length),dataRemaining); - cache->makeSpace(writeLength+JOURNAL_ENTRY_HEADER_SIZE); + //cache->makeSpace(writeLength+JOURNAL_ENTRY_HEADER_SIZE); err = replicator->addJournalEntry(i->key.c_str(),&data[count],i->length,writeLength); assert((uint) err == writeLength); @@ -428,14 +434,13 @@ int IOCoordinator::append(const char *_filename, const uint8_t *data, size_t len metadata.updateEntryLength(i->offset, (count + i->length)); metadata.writeMetadata(filename); logger->log(LOG_ERR,"IOCoordinator::append(): journal failed to complete write, %u of %u bytes written.",count,length); - return count; + goto out; } metadata.updateEntryLength(i->offset, (writeLength + i->length)); cache->newJournalEntry(writeLength+JOURNAL_ENTRY_HEADER_SIZE); synchronizer->newJournalEntry(i->key); - count += writeLength; dataRemaining -= writeLength; } @@ -453,7 +458,7 @@ int IOCoordinator::append(const char *_filename, const uint8_t *data, size_t len //add a new metaDataObject writeLength = min(objectSize,dataRemaining); - cache->makeSpace(writeLength); + //cache->makeSpace(writeLength); // add a new metadata object, this will get a new objectKey NOTE: probably needs offset too metadataObject newObject = 
metadata.addMetadataObject(filename,writeLength); @@ -466,7 +471,7 @@ int IOCoordinator::append(const char *_filename, const uint8_t *data, size_t len metadata.updateEntryLength(newObject.offset, (count)); metadata.writeMetadata(filename); logger->log(LOG_ERR,"IOCoordinator::append(): newObject failed to complete write, %u of %u bytes written.",count,length); - return count; + goto out; //log error and abort } cache->newObject(newObject.key,writeLength); @@ -475,12 +480,14 @@ int IOCoordinator::append(const char *_filename, const uint8_t *data, size_t len count += writeLength; dataRemaining -= writeLength; } - synchronizer->newObjects(newObjectKeys); - metadata.writeMetadata(filename); + // had to add this hack to prevent deadlock +out: + // need to release the file lock before calling the cache fcns. lock.unlock(); + cache->doneWriting(); return count; } @@ -581,6 +588,8 @@ int IOCoordinator::truncate(const char *_path, size_t newSize) { uint8_t zero = 0; err = _write(path, &zero, newSize - 1, 1); + lock.unlock(); + cache->doneWriting(); if (err < 0) return -1; return 0; diff --git a/src/IOCoordinator.h b/src/IOCoordinator.h index 3dc663625..af6421d2f 100644 --- a/src/IOCoordinator.h +++ b/src/IOCoordinator.h @@ -17,6 +17,7 @@ #include "SMLogging.h" #include "RWLock.h" #include "Replicator.h" +#include "Utilities.h" namespace storagemanager { @@ -85,6 +86,7 @@ class IOCoordinator : public boost::noncopyable void remove(const boost::filesystem::path &path); void deleteMetaFile(const boost::filesystem::path &file); + int _write(const char *filename, const uint8_t *data, off_t offset, size_t length); int loadObjectAndJournal(const char *objFilename, const char *journalFilename, diff --git a/src/Replicator.cpp b/src/Replicator.cpp index 68c7e534e..a671bba25 100755 --- a/src/Replicator.cpp +++ b/src/Replicator.cpp @@ -2,7 +2,6 @@ #include "Replicator.h" #include "IOCoordinator.h" #include "SMLogging.h" -#include "Cache.h" #include "Utilities.h" #include #include @@ 
-122,7 +121,6 @@ int Replicator::addJournalEntry(const char *filename, const uint8_t *data, off_t int version = 1; string journalFilename = msJournalPath + "/" + string(filename) + ".journal"; uint64_t thisEntryMaxOffset = (offset + length - 1); - Cache *cache = Cache::get(); // need to init sync here to break circular dependency... bool exists = boost::filesystem::exists(journalFilename); OPEN(journalFilename.c_str(), (exists ? O_RDWR : O_WRONLY | O_CREAT)) @@ -132,7 +130,6 @@ int Replicator::addJournalEntry(const char *filename, const uint8_t *data, off_t // create new journal file with header //OPEN(journalFilename.c_str(), O_WRONLY | O_CREAT); string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str(); - cache->makeSpace(header.size()); err = ::write(fd, header.c_str(), header.length() + 1); assert((uint) err == header.length() + 1); if (err <= 0) diff --git a/src/SMLogging.cpp b/src/SMLogging.cpp index a4d399e48..c3cd0be69 100644 --- a/src/SMLogging.cpp +++ b/src/SMLogging.cpp @@ -46,7 +46,7 @@ void SMLogging::log(int priority,const char *format, ...) 
#ifdef DEBUG va_list args2; va_copy(args2, args); - vprintf(format, args2); + vfprintf(stderr, format, args2); printf("\n"); #endif vsyslog(priority, format, args); diff --git a/src/Synchronizer.cpp b/src/Synchronizer.cpp index 6ca719069..abd6b1453 100755 --- a/src/Synchronizer.cpp +++ b/src/Synchronizer.cpp @@ -82,10 +82,8 @@ enum OpFlags NEW_OBJECT = 0x4, }; -void Synchronizer::newJournalEntry(const string &key) +void Synchronizer::_newJournalEntry(const string &key) { - boost::unique_lock s(mutex); - auto it = pendingOps.find(key); if (it != pendingOps.end()) { @@ -95,6 +93,19 @@ void Synchronizer::newJournalEntry(const string &key) //makeJob(key); pendingOps[key] = boost::shared_ptr(new PendingOps(JOURNAL)); } + +void Synchronizer::newJournalEntry(const string &key) +{ + boost::unique_lock s(mutex); + _newJournalEntry(key); +} + +void Synchronizer::newJournalEntries(const vector &keys) +{ + boost::unique_lock s(mutex); + for (const string &key : keys) + _newJournalEntry(key); +} void Synchronizer::newObjects(const vector &keys) { diff --git a/src/Synchronizer.h b/src/Synchronizer.h index fec9f2fd0..7652b2fc2 100755 --- a/src/Synchronizer.h +++ b/src/Synchronizer.h @@ -30,6 +30,7 @@ class Synchronizer : public boost::noncopyable // these take keys as parameters, not full path names, ex, pass in '12345' not // 'cache/12345'. 
void newJournalEntry(const std::string &key); + void newJournalEntries(const std::vector &keys); void newObjects(const std::vector &keys); void deletedObjects(const std::vector &keys); void flushObject(const std::string &key); @@ -41,6 +42,7 @@ class Synchronizer : public boost::noncopyable private: Synchronizer(); + void _newJournalEntry(const std::string &key); void process(std::list::iterator key); void synchronize(const std::string &sourceFile, std::list::iterator &it); void synchronizeDelete(const std::string &sourceFile, std::list::iterator &it); @@ -86,7 +88,6 @@ class Synchronizer : public boost::noncopyable const boost::chrono::seconds syncInterval = boost::chrono::seconds(10); void periodicSync(); - SMLogging *logger; Cache *cache; Replicator *replicator; diff --git a/src/unit_tests.cpp b/src/unit_tests.cpp index a51cf7f7f..3ceac0d40 100755 --- a/src/unit_tests.cpp +++ b/src/unit_tests.cpp @@ -15,6 +15,7 @@ #include "Replicator.h" #include "S3Storage.h" #include "Utilities.h" +#include "Synchronizer.h" #include #include