From 4ff769ab2430a2da92540c2be9ff648711cf7cdf Mon Sep 17 00:00:00 2001 From: Patrick LeBlanc Date: Fri, 22 Mar 2019 12:04:36 -0500 Subject: [PATCH] Got a pretty decent unit test for Sync working. --- src/Cache.cpp | 76 +++++++++++++++++++++++++++++++++------ src/Cache.h | 11 ++++-- src/CloudStorage.h | 2 +- src/IOCoordinator.cpp | 9 ++--- src/LocalStorage.cpp | 10 +++--- src/LocalStorage.h | 2 +- src/RWLock.cpp | 2 ++ src/S3Storage.cpp | 2 +- src/S3Storage.h | 2 +- src/SMLogging.cpp | 4 ++- src/Synchronizer.cpp | 55 ++++++++++++++-------------- src/Synchronizer.h | 2 +- src/unit_tests.cpp | 84 +++++++++++++++++++++++++++++++++---------- 13 files changed, 184 insertions(+), 77 deletions(-) diff --git a/src/Cache.cpp b/src/Cache.cpp index 894fd8ff1..01b62e3a6 100644 --- a/src/Cache.cpp +++ b/src/Cache.cpp @@ -91,6 +91,25 @@ Cache::Cache() : currentCacheSize(0) //cout << "Cache got prefix " << prefix << endl; downloader.setDownloadPath(prefix.string()); + + stmp = conf->getValue("ObjectStorage", "journal_path"); + if (stmp.empty()) + { + logger->log(LOG_CRIT, "ObjectStorage/journal_path is not set"); + throw runtime_error("Please set ObjectStorage/journal_path in the storagemanager.cnf file"); + } + journalPrefix = stmp; + bf::create_directories(journalPrefix); + try + { + bf::create_directories(journalPrefix); + } + catch (exception &e) + { + syslog(LOG_CRIT, "Failed to create %s, got: %s", journalPrefix.string().c_str(), e.what()); + throw e; + } + populate(); } @@ -104,20 +123,17 @@ void Cache::populate() bf::directory_iterator dend; while (dir != dend) { - // put everything that doesn't end with '.journal' in lru & m_lru + // put everything in lru & m_lru const bf::path &p = dir->path(); if (bf::is_regular_file(p)) { - size_t size = bf::file_size(*dir); if (p.extension() == "") // need to decide whether objects should have an extension { - lru.push_back(p.string()); + lru.push_back(p.filename().string()); auto last = lru.end(); m_lru.insert(--last); - currentCacheSize += size; + currentCacheSize += bf::file_size(*dir); } - else if (p.extension() == "journal") - currentCacheSize += size; else logger->log(LOG_WARNING, "Cache: found a file in the cache that does not belong '%s'", p.string().c_str()); } @@ -125,6 +141,23 @@ void Cache::populate() logger->log(LOG_WARNING, "Cache: found something in the cache that does not belong '%s'", p.string().c_str()); ++dir; } + + // account for what's in the journal dir + dir = bf::directory_iterator(journalPrefix); + while (dir != dend) + { + const bf::path &p = dir->path(); + if (bf::is_regular_file(p)) + { + if (p.extension() == ".journal") // need to decide whether objects should have an extension + currentCacheSize += bf::file_size(*dir); + else + logger->log(LOG_WARNING, "Cache: found a file in the journal dir that does not belong '%s'", p.string().c_str()); + } + else + logger->log(LOG_WARNING, "Cache: found something in the journal dir that does not belong '%s'", p.string().c_str()); + ++dir; + } } void Cache::read(const vector &keys) @@ -229,8 +262,12 @@ const bf::path & Cache::getCachePath() return prefix; } +const bf::path & Cache::getJournalPath() +{ + return journalPrefix; +} -void Cache::exists(const vector &keys, vector *out) +void Cache::exists(const vector &keys, vector *out) const { out->resize(keys.size()); boost::unique_lock s(lru_mutex); @@ -238,7 +275,7 @@ void Cache::exists(const vector &keys, vector *out) (*out)[i] = (m_lru.find(keys[i]) != m_lru.end()); } -bool Cache::exists(const string &key) +bool Cache::exists(const string &key) const { boost::unique_lock s(lru_mutex); return m_lru.find(key) != m_lru.end(); @@ -293,6 +330,11 @@ void Cache::makeSpace(size_t size) _makeSpace(size); } +size_t Cache::getMaxCacheSize() const +{ + return maxCacheSize; +} + // call this holding lru_mutex void Cache::_makeSpace(size_t size) { @@ -314,7 +356,7 @@ void Cache::_makeSpace(size_t size) int err = stat(cachedFile.string().c_str(), &statbuf); if (err) { - logger->log(LOG_WARNING, "Downloader: There seems to be a cached file that couldn't be stat'ed: %s", cachedFile.string().c_str()); + logger->log(LOG_WARNING, "Cache::makeSpace(): There seems to be a cached file that couldn't be stat'ed: %s", cachedFile.string().c_str()); ++it; continue; } @@ -331,8 +373,8 @@ void Cache::_makeSpace(size_t size) Synchronizer::get()->flushObject(*it); replicator->remove(cachedFile.string().c_str(), Replicator::LOCAL_ONLY); LRU_t::iterator toRemove = it++; - lru.erase(toRemove); m_lru.erase(*toRemove); + lru.erase(toRemove); } } @@ -354,6 +396,20 @@ size_t Cache::getCurrentCacheSize() const return currentCacheSize; } +void Cache::reset() +{ + m_lru.clear(); + lru.clear(); + + bf::directory_iterator dir; + bf::directory_iterator dend; + for (dir = bf::directory_iterator(prefix); dir != dend; ++dir) + bf::remove_all(dir->path()); + + for (dir = bf::directory_iterator(journalPrefix); dir != dend; ++dir) + bf::remove_all(dir->path()); +} + /* The helper classes */ Cache::M_LRU_element_t::M_LRU_element_t(const string *k) : key(k) diff --git a/src/Cache.h b/src/Cache.h index 1d0c01ee6..b6c61ea83 100644 --- a/src/Cache.h +++ b/src/Cache.h @@ -27,8 +27,8 @@ class Cache : public boost::noncopyable virtual ~Cache(); void read(const std::vector &keys); - bool exists(const std::string &key); - void exists(const std::vector &keys, std::vector *out); + bool exists(const std::string &key) const; + void exists(const std::vector &keys, std::vector *out) const; void newObject(const std::string &key, size_t size); void newJournalEntry(size_t size); void deletedObject(const std::string &key, size_t size); @@ -40,13 +40,18 @@ class Cache : public boost::noncopyable void setMaxCacheSize(size_t size); void makeSpace(size_t size); size_t getCurrentCacheSize() const; + size_t getMaxCacheSize() const; // test helpers const boost::filesystem::path &getCachePath(); + const boost::filesystem::path &getJournalPath(); + // this will delete everything in the cache and journal paths, and empty all Cache structures. + void reset(); private: Cache(); boost::filesystem::path prefix; + boost::filesystem::path journalPrefix; size_t maxCacheSize; size_t objectSize; size_t currentCacheSize; @@ -105,7 +110,7 @@ class Cache : public boost::noncopyable DNE_t doNotEvict; void addToDNE(const LRU_t::iterator &key); void removeFromDNE(const LRU_t::iterator &key); - boost::mutex lru_mutex; // protects the main cache structures & the do-not-evict set + mutable boost::mutex lru_mutex; // protects the main cache structures & the do-not-evict set }; diff --git a/src/CloudStorage.h b/src/CloudStorage.h index 45d4ab875..1baf6c780 100644 --- a/src/CloudStorage.h +++ b/src/CloudStorage.h @@ -15,7 +15,7 @@ class CloudStorage /* These behave like syscalls. return code -1 means an error, and errno is set */ virtual int getObject(const std::string &sourceKey, const std::string &destFile, size_t *size = NULL) = 0; - virtual int getObject(const std::string &sourceKey, boost::shared_array &data, size_t *size = NULL) = 0; + virtual int getObject(const std::string &sourceKey, boost::shared_array *data, size_t *size = NULL) = 0; virtual int putObject(const std::string &sourceFile, const std::string &destKey) = 0; virtual int putObject(const boost::shared_array data, size_t len, const std::string &destKey) = 0; virtual void deleteObject(const std::string &key) = 0; diff --git a/src/IOCoordinator.cpp b/src/IOCoordinator.cpp index 921ad96b4..33a37e9ff 100755 --- a/src/IOCoordinator.cpp +++ b/src/IOCoordinator.cpp @@ -460,8 +460,6 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array &objData, size uint64_t startReadingAt = offlen[0]; uint64_t lengthOfRead = offlen[1]; - - ::lseek(journalFD, startReadingAt, SEEK_CUR); uint count = 0; while (count < lengthOfRead) @@ -471,19 +469,18 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array &objData, size { char buf[80]; int l_errno = errno; - logger->log(LOG_ERR, "mergeJournal: got %s", strerror_r(errno, buf, 80)); + logger->log(LOG_ERR, "mergeJournalInMem: got %s", strerror_r(errno, buf, 80)); errno = l_errno; return -1; } else if (err == 0) { - logger->log(LOG_ERR, "mergeJournal: got early EOF"); + logger->log(LOG_ERR, "mergeJournalInMem: got early EOF"); errno = ENODATA; // is there a better errno for early EOF? return -1; } count += err; } - ::lseek(journalFD, offlen[1] - lengthOfRead, SEEK_CUR); } return 0; } @@ -532,7 +529,7 @@ void IOCoordinator::writeUnlock(const string &filename) boost::unique_lock s(lockMutex); auto it = locks.find(filename); - it->second->readUnlock(); + it->second->writeUnlock(); if (!it->second->inUse()) { delete it->second; diff --git a/src/LocalStorage.cpp b/src/LocalStorage.cpp index d33d5dfc4..a0d4e1d71 100644 --- a/src/LocalStorage.cpp +++ b/src/LocalStorage.cpp @@ -70,13 +70,11 @@ int LocalStorage::getObject(const string &source, const string &dest, size_t *si return ret; } -int LocalStorage::getObject(const std::string &sourceKey, boost::shared_array &data, size_t *size) +int LocalStorage::getObject(const std::string &sourceKey, boost::shared_array *data, size_t *size) { int ret; bf::path source = prefix / sourceKey; const char *c_source = source.string().c_str(); - size_t l_size = bf::file_size(source); - data.reset(new uint8_t[l_size]); char buf[80]; int fd = ::open(c_source, O_RDONLY); @@ -85,11 +83,13 @@ int LocalStorage::getObject(const std::string &sourceKey, boost::shared_arraylog(LOG_CRIT, "LocalStorage::getObject() failed to open %s, got '%s'", c_source, strerror_r(errno, buf, 80)); return fd; } - + + size_t l_size = bf::file_size(source); + data->reset(new uint8_t[l_size]); size_t count = 0; while (count < l_size) { - int err = ::read(fd, &data[count], l_size - count); + int err = ::read(fd, &(*data)[count], l_size - count); if (err < 0) { logger->log(LOG_CRIT, "LocalStorage::getObject() failed to read %s, got '%s'", c_source, strerror_r(errno, buf, 80)); diff --git a/src/LocalStorage.h b/src/LocalStorage.h index d572df0c8..9272b1c75 100644 --- a/src/LocalStorage.h +++ b/src/LocalStorage.h @@ -16,7 +16,7 @@ class LocalStorage : public CloudStorage virtual ~LocalStorage(); int getObject(const std::string &sourceKey, const std::string &destFile, size_t *size = NULL); - int getObject(const std::string &sourceKey, boost::shared_array &data, size_t *size = NULL); + int getObject(const std::string &sourceKey, boost::shared_array *data, size_t *size = NULL); int putObject(const std::string &sourceFile, const std::string &destKey); int putObject(const boost::shared_array data, size_t len, const std::string &destKey); void deleteObject(const std::string &key); diff --git a/src/RWLock.cpp b/src/RWLock.cpp index c6af32190..297003294 100644 --- a/src/RWLock.cpp +++ b/src/RWLock.cpp @@ -49,6 +49,7 @@ void RWLock::readUnlock() { boost::unique_lock s(m); + assert(readersRunning > 0); --readersRunning; if (readersRunning == 0 && writersWaiting != 0) okToWrite.notify_one(); @@ -83,6 +84,7 @@ void RWLock::writeUnlock() { boost::unique_lock s(m); + assert(writersRunning > 0); --writersRunning; if (writersWaiting != 0) okToWrite.notify_one(); diff --git a/src/S3Storage.cpp b/src/S3Storage.cpp index 8984be323..76d549178 100644 --- a/src/S3Storage.cpp +++ b/src/S3Storage.cpp @@ -19,7 +19,7 @@ int S3Storage::getObject(const string &sourceKey, const string &destFile, size_t return 0; } -int S3Storage::getObject(const string &sourceKey, boost::shared_array &data, size_t *size) +int S3Storage::getObject(const string &sourceKey, boost::shared_array *data, size_t *size) { return 0; } diff --git a/src/S3Storage.h b/src/S3Storage.h index 34abaa9df..1fb3798de 100644 --- a/src/S3Storage.h +++ b/src/S3Storage.h @@ -15,7 +15,7 @@ class S3Storage : public CloudStorage virtual ~S3Storage(); int getObject(const std::string &sourceKey, const std::string &destFile, size_t *size = NULL); - int getObject(const std::string &sourceKey, boost::shared_array &data, size_t *size = NULL); + int getObject(const std::string &sourceKey, boost::shared_array *data, size_t *size = NULL); int putObject(const std::string &sourceFile, const std::string &destKey); int putObject(const boost::shared_array data, size_t len, const std::string &destKey); void deleteObject(const std::string &key); diff --git a/src/SMLogging.cpp b/src/SMLogging.cpp index 666601782..a4d399e48 100644 --- a/src/SMLogging.cpp +++ b/src/SMLogging.cpp @@ -44,7 +44,9 @@ void SMLogging::log(int priority,const char *format, ...) va_start(args, format); #ifdef DEBUG - vprintf(format, args); + va_list args2; + va_copy(args2, args); + vprintf(format, args2); printf("\n"); #endif vsyslog(priority, format, args); diff --git a/src/Synchronizer.cpp b/src/Synchronizer.cpp index 609842132..f07c0a6b0 100644 --- a/src/Synchronizer.cpp +++ b/src/Synchronizer.cpp @@ -102,22 +102,7 @@ Synchronizer::Synchronizer() : maxUploads(0) if (maxUploads == 0) maxUploads = 20; - stmp = config->getValue("ObjectStorage", "journal_path"); - if (stmp.empty()) - { - logger->log(LOG_CRIT, "ObjectStorage/journal_path is not set"); - throw runtime_error("Please set ObjectStorage/journal_path in the storagemanager.cnf file"); - } - try - { - journalPath = stmp; - bf::create_directories(journalPath); - } - catch (exception &e) - { - logger->log(LOG_CRIT, "Failed to create %s, got: %s", stmp.c_str(), e.what()); - throw e; - } + journalPath = cache->getJournalPath(); cachePath = cache->getCachePath(); threadPool.setMaxThreads(maxUploads); } @@ -187,18 +172,21 @@ void Synchronizer::flushObject(const string &key) // if there is something to do on key, it should be in the objNames list // and either in pendingOps or opsInProgress. - // in testing though, going to check whether there is something to do + // The sanity check at the end was intended for debugging / development + // I'm inclined to make it permanent. An existence check on S3 is quick. bool noExistingJob = false; auto it = pendingOps.find(key); if (it != pendingOps.end()) - // find the object name and call process() + // find the object name and call process() to start it right away for (auto name = objNames.begin(); name != objNames.end(); ++it) + { if (*name == key) { process(name, false); break; } + } else { auto op = opsInProgress.find(key); @@ -207,8 +195,7 @@ void Synchronizer::flushObject(const string &key) op->second->wait(&mutex); else { - // it's not in either one, check if there is anything to be done as - // a sanity check. + // it's not in either one, trigger existence check noExistingJob = true; } } @@ -230,10 +217,11 @@ void Synchronizer::flushObject(const string &key) } while (err); if (!exists) { - logger->log(LOG_DEBUG, "Sync::flushObject(): broken assumption! %s does not exist in cloud storage, but there is no job for it. Uploading it now."); + logger->log(LOG_DEBUG, "Sync::flushObject(): %s does not exist in cloud storage, " + "and there is no job for it. Uploading it now.", key.c_str()); pendingOps[key] = boost::shared_ptr(new PendingOps(NEW_OBJECT)); objNames.push_front(key); - process(objNames.begin(), false); + process(objNames.begin(), true); } } @@ -245,7 +233,7 @@ void Synchronizer::makeJob(const string &key) threadPool.addJob(j); } -void Synchronizer::process(list::iterator name, bool use_lock) +void Synchronizer::process(list::iterator name, bool callerHoldsLock) { /* check if there is a pendingOp for name @@ -255,10 +243,11 @@ void Synchronizer::process(list::iterator name, bool use_lock) if not, return */ - // had to use this 'use_lock' kludge to let flush() start processing a job immediately + // had to use this 'callerHoldsLock' kludge to let flush() start processing a job w/o unlocking first + // and introducing a race. It means this fcn should not use its own scoped lock. It sucks, need to rework it. boost::unique_lock s(mutex, boost::defer_lock); - if (use_lock) + if (!callerHoldsLock) s.lock(); string &key = *name; @@ -281,7 +270,11 @@ void Synchronizer::process(list::iterator name, bool use_lock) opsInProgress[key] = pending; pendingOps.erase(it); string sourceFile = MetadataFile::getSourceFromKey(*name); - s.unlock(); + + if (!callerHoldsLock) + s.unlock(); + else + mutex.unlock(); bool success = false; while (!success) @@ -308,7 +301,11 @@ void Synchronizer::process(list::iterator name, bool use_lock) } } - s.lock(); + if (!callerHoldsLock) + s.lock(); + else + mutex.lock(); + opsInProgress.erase(key); objNames.erase(name); } @@ -376,7 +373,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list // merge it with its journal file if (!oldObjIsCached) { - err = cs->getObject(key, data, &size); + err = cs->getObject(key, &data, &size); if (err) throw runtime_error(string("Synchronizer: getObject() failed: ") + strerror_r(errno, buf, 80)); err = ioc->mergeJournalInMem(data, &size, journalName.c_str()); @@ -418,7 +415,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list } cache->rename(key, newKey, size - bf::file_size(oldCachePath)); - replicator->remove(key.c_str()); + replicator->remove(key.c_str()); // should this be the file to remove, or the key? } // update the metadata for the source file diff --git a/src/Synchronizer.h b/src/Synchronizer.h index 67068df4f..6193e7ff9 100644 --- a/src/Synchronizer.h +++ b/src/Synchronizer.h @@ -39,7 +39,7 @@ class Synchronizer : public boost::noncopyable private: Synchronizer(); - void process(std::list::iterator key, bool use_lock=true); + void process(std::list::iterator key, bool callerHoldsLock=false); void synchronize(const std::string &sourceFile, std::list::iterator &it); void synchronizeDelete(const std::string &sourceFile, std::list::iterator &it); void synchronizeWithJournal(const std::string &sourceFile, std::list::iterator &it); diff --git a/src/unit_tests.cpp b/src/unit_tests.cpp index 592bef7ff..67ae619dd 100755 --- a/src/unit_tests.cpp +++ b/src/unit_tests.cpp @@ -701,6 +701,20 @@ bool syncTest1() Synchronizer *sync = Synchronizer::get(); Cache *cache = Cache::get(); CloudStorage *cs = CloudStorage::get(); + LocalStorage *ls = dynamic_cast(cs); + if (!ls) + { + cout << "syncTest1() requires using local storage at the moment." << endl; + return true; + } + + cache->reset(); + + // delete everything in the fake cloud to make it easier to list later + bf::path fakeCloudPath = ls->getPrefix(); + for (bf::directory_iterator dir(fakeCloudPath); dir != bf::directory_iterator(); ++dir) + bf::remove(dir->path()); + bf::path cachePath = sync->getCachePath(); bf::path journalPath = sync->getJournalPath(); @@ -710,15 +724,13 @@ bool syncTest1() // nothing creates the dir yet bf::create_directories(metaPath); - // make the test obj, journal, and metadata and put them in their dirs + // make the test obj, journal, and metadata string key = "12345_0_8192_test-object"; string journalName = key + ".journal"; makeTestObject((cachePath/key).string().c_str()); makeTestJournal((journalPath/journalName).string().c_str()); makeTestMetadata((metaPath/"test-file.meta").string().c_str()); - - cout << "made test objects" << endl; cache->newObject(key, bf::file_size(cachePath/key)); cache->newJournalEntry(bf::file_size(journalPath/journalName)); @@ -726,8 +738,6 @@ bool syncTest1() vector vObj; vObj.push_back(key); - cout << "calling sync.newObjects()" << endl; - sync->newObjects(vObj); sleep(1); // wait for the job to run @@ -737,8 +747,6 @@ bool syncTest1() assert(!err); assert(exists); - cout << "calling sync.newJournalEntry()" << endl; - sync->newJournalEntry(key); sleep(1); // let it do what it does @@ -746,19 +754,59 @@ bool syncTest1() assert(!cache->exists(key)); assert(!bf::exists(journalPath / journalName)); -/* - void newJournalEntry(const std::string &key); - void newObjects(const std::vector &keys); - void deletedObjects(const std::vector &keys); - void flushObject(const std::string &key); -*/ - // TODO test the rest of the fcns, esp syncwithjournal where the obj is not in the cache + // Replicator doesn't implement all of its functionality yet, need to delete key from the cache manually for now + bf::remove(cachePath/key); + + // check that a new version of object exists in cloud storage + // D'oh, this would have to list the objects to find it, not going to implement + // that everywhere just now. For now, making this test require LocalStorage. + bool foundIt = false; + string newKey; + for (bf::directory_iterator dir(fakeCloudPath); dir != bf::directory_iterator() && !foundIt; ++dir) + { + newKey = dir->path().filename().string(); + foundIt = (MetadataFile::getSourceFromKey(newKey) == "test-object"); + if (foundIt) + { + size_t fsize = bf::file_size(dir->path()); + assert(cache->exists(newKey)); + cs->deleteObject(newKey); + break; + } + } + + assert(foundIt); + cache->makeSpace(cache->getMaxCacheSize()); // clear the cache & make it call sync->flushObject() + + // the key should now be back in cloud storage and deleted from the cache + assert(!cache->exists(newKey)); + err = cs->exists(newKey, &exists); + assert(!err && exists); + + // make the journal again, call sync->newJournalObject() + makeTestJournal((journalPath / (newKey + ".journal")).string().c_str()); + sync->newJournalEntry(newKey); + sleep(1); + + // verify that newkey is no longer in cloud storage, and that another permutation is + err = cs->exists(newKey, &exists); + assert(!err && !exists); + foundIt = false; + for (bf::directory_iterator dir(fakeCloudPath); dir != bf::directory_iterator() && !foundIt; ++dir) + { + key = dir->path().filename().string(); + foundIt = (MetadataFile::getSourceFromKey(key) == "test-object"); + } + assert(foundIt); + // TODO test error paths, pass in some junk - - // cleanup - bf::remove(cachePath / key); - bf::remove(journalPath / journalName); + // cleanup, just blow away everything for now + cache->reset(); + vector keys; + for (bf::directory_iterator dir(fakeCloudPath); dir != bf::directory_iterator(); ++dir) + keys.push_back(dir->path().filename().string()); + sync->deletedObjects(keys); cout << "Sync test 1 OK" << endl; }