From 07b4bdd19c1fba48bd677cc9111be0f88d0f7f1b Mon Sep 17 00:00:00 2001 From: Patrick LeBlanc Date: Thu, 21 Mar 2019 12:42:10 -0500 Subject: [PATCH] Got the synchronizer stuff to build. --- src/Cache.cpp | 59 +++++++--- src/Cache.h | 11 +- src/CloudStorage.cpp | 6 + src/CloudStorage.h | 7 ++ src/IOCoordinator.cpp | 15 ++- src/IOCoordinator.h | 1 + src/LocalStorage.cpp | 25 +++-- src/LocalStorage.h | 2 +- src/Replicator.cpp | 7 +- src/Replicator.h | 13 ++- src/S3Storage.cpp | 4 +- src/S3Storage.h | 2 +- src/Synchronizer.cpp | 251 ++++++++++++++++++++++++++---------------- src/Synchronizer.h | 15 ++- src/unit_tests.cpp | 102 +++++++++-------- 15 files changed, 338 insertions(+), 182 deletions(-) diff --git a/src/Cache.cpp b/src/Cache.cpp index 41ff14493..0296ba39d 100644 --- a/src/Cache.cpp +++ b/src/Cache.cpp @@ -13,14 +13,32 @@ using namespace std; namespace bf = boost::filesystem; +namespace +{ + boost::mutex m; + storagemanager::Cache *inst = NULL; +} + namespace storagemanager { +Cache * Cache::get() +{ + if (inst) + return inst; + boost::unique_lock s(m); + if (inst) + return inst; + inst = new Cache(); + return inst; +} + Cache::Cache() : currentCacheSize(0) { Config *conf = Config::get(); logger = SMLogging::get(); sync = Synchronizer::get(); + replicator = Replicator::get(); string stmp = conf->getValue("Cache", "cache_size"); if (stmp.empty()) @@ -84,26 +102,28 @@ Cache::~Cache() void Cache::populate() { bf::directory_iterator dir(prefix); - bf::diretory_iterator dend; + bf::directory_iterator dend; while (dir != dend) { // put everything that doesn't end with '.journal' in lru & m_lru - if (bf::is_regular_file(*dir)) + const bf::path &p = dir->path(); + if (bf::is_regular_file(p)) { size_t size = bf::file_size(*dir); - if (dir->extension() == "obj") + if (p.extension() == "") // need to decide whether objects should have an extension { - lru.push_back(dir->string()); - m_lru.insert(lru.end() - 1); + lru.push_back(p.string()); + auto last = lru.end(); + m_lru.insert(--last); currentCacheSize += size; } - else if (dir->extension() == "journal") + else if (p.extension() == "journal") currentCacheSize += size; else - logger->log(LOG_WARN, "Cache: found a file in the cache that does not belong '%s'", dir->string().c_str()); + logger->log(LOG_WARNING, "Cache: found a file in the cache that does not belong '%s'", p.string().c_str()); } else - logger->log(LOG_WARN, "Cache: found something in the cache that does not belong '%s'", dir->string().c_str()); + logger->log(LOG_WARNING, "Cache: found something in the cache that does not belong '%s'", p.string().c_str()); ++dir; } } @@ -147,7 +167,7 @@ void Cache::read(const vector &keys) s.lock(); // do makespace() before downloading. Problem is, until the download is finished, this fcn can't tell which // downloads it was responsible for. Need Downloader to make the call...? - makeSpace(sum_sizes); + _makeSpace(sum_sizes); currentCacheSize += sum_sizes; // move all keys to the back of the LRU @@ -222,14 +242,14 @@ void Cache::exists(const vector &keys, vector *out) bool Cache::exists(const string &key) { boost::unique_lock s(lru_mutex); - return m_lru.find(keys[i]) != m_lru.end(); + return m_lru.find(key) != m_lru.end(); } void Cache::newObject(const string &key, size_t size) { boost::unique_lock s(lru_mutex); assert(m_lru.find(key) == m_lru.end()); - makeSpace(size); + _makeSpace(size); lru.push_back(key); LRU_t::iterator back = lru.end(); m_lru.insert(--back); @@ -239,7 +259,7 @@ void Cache::newObject(const string &key, size_t size) void Cache::newJournalEntry(size_t size) { boost::unique_lock s(lru_mutex); - makeSpace(size); + _makeSpace(size); currentCacheSize += size; } @@ -262,13 +282,20 @@ void Cache::deletedObject(const string &key, size_t size) void Cache::setMaxCacheSize(size_t size) { + boost::unique_lock s(lru_mutex); if (size < maxCacheSize) - makeSpace(maxCacheSize - size); + _makeSpace(maxCacheSize - size); maxCacheSize = size; } -// call this holding lru_mutex void Cache::makeSpace(size_t size) +{ + boost::unique_lock s(lru_mutex); + _makeSpace(size); +} + +// call this holding lru_mutex +void Cache::_makeSpace(size_t size) { ssize_t thisMuch = currentCacheSize + size - maxCacheSize; if (thisMuch <= 0) @@ -303,7 +330,7 @@ void Cache::makeSpace(size_t size) currentCacheSize -= statbuf.st_size; thisMuch -= statbuf.st_size; sync->flushObject(*it); - replicator->delete(cachedFile, Replicator::LOCAL_ONLY); + replicator->remove(cachedFile.string().c_str(), Replicator::LOCAL_ONLY); LRU_t::iterator toRemove = it++; lru.erase(toRemove); m_lru.erase(*toRemove); @@ -313,7 +340,7 @@ void Cache::makeSpace(size_t size) void Cache::rename(const string &oldKey, const string &newKey, ssize_t sizediff) { boost::unique_lock s(lru_mutex); - auto it = m_lru(oldKey); + auto it = m_lru.find(oldKey); assert(it != m_lru.end()); auto lit = it->lit; diff --git a/src/Cache.h b/src/Cache.h index b1b62e38f..28ed27d16 100644 --- a/src/Cache.h +++ b/src/Cache.h @@ -5,6 +5,7 @@ #include "Downloader.h" #include "SMLogging.h" #include "Synchronizer.h" +#include "Replicator.h" #include #include @@ -17,10 +18,12 @@ namespace storagemanager { +class Synchronizer; + class Cache : public boost::noncopyable { public: - Cache(); + static Cache *get(); virtual ~Cache(); void read(const std::vector &keys); @@ -35,21 +38,25 @@ class Cache : public boost::noncopyable // the size will change in that process; sizediff is by how much void rename(const std::string &oldKey, const std::string &newKey, ssize_t sizediff); void setMaxCacheSize(size_t size); + void makeSpace(size_t size); size_t getCurrentCacheSize() const; // test helpers const boost::filesystem::path &getCachePath(); private: + Cache(); + boost::filesystem::path prefix; size_t maxCacheSize; size_t objectSize; size_t currentCacheSize; Downloader downloader; + Replicator *replicator; Synchronizer *sync; SMLogging *logger; void populate(); - void makeSpace(size_t size); + void _makeSpace(size_t size); /* The main cache structures */ // lru owns the string memory for the filenames it manages. m_lru and DNE point to those strings. diff --git a/src/CloudStorage.cpp b/src/CloudStorage.cpp index 17e9acb93..f953918ae 100644 --- a/src/CloudStorage.cpp +++ b/src/CloudStorage.cpp @@ -27,6 +27,7 @@ string tolower(const string &s) namespace storagemanager { + CloudStorage * CloudStorage::get() { if (inst) @@ -50,4 +51,9 @@ CloudStorage * CloudStorage::get() return inst; } +CloudStorage::CloudStorage() +{ + logger = SMLogging::get(); +} + } diff --git a/src/CloudStorage.h b/src/CloudStorage.h index 20194db50..45d4ab875 100644 --- a/src/CloudStorage.h +++ b/src/CloudStorage.h @@ -4,6 +4,7 @@ #include #include +#include "SMLogging.h" namespace storagemanager { @@ -23,7 +24,13 @@ class CloudStorage // this will return a CloudStorage instance of the type specified in StorageManager.cnf static CloudStorage *get(); + + protected: + SMLogging *logger; + CloudStorage(); + private: + }; } diff --git a/src/IOCoordinator.cpp b/src/IOCoordinator.cpp index 0a0eed2ac..921ad96b4 100755 --- a/src/IOCoordinator.cpp +++ b/src/IOCoordinator.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -336,7 +337,7 @@ boost::shared_array IOCoordinator::mergeJournal(const char *object, con boost::property_tree::json_parser::read_json(ss, header); assert(header.get("version") == "1"); string stmp = header.get("max_offset"); - size_t maxJournalOffset = strtoul(stmp); + size_t maxJournalOffset = strtoul(stmp.c_str(), NULL, 0); struct stat objStat; fstat(objFD, &objStat); @@ -368,7 +369,7 @@ boost::shared_array IOCoordinator::mergeJournal(const char *object, con // at the EOF of the object. The journal may contain entries that append to the data, // so 0-fill the remaining bytes. #ifdef DEBUG - memset(&ret[count], 0, len-count); + memset(&ret[count], 0, *len-count); #endif break; } @@ -385,7 +386,7 @@ boost::shared_array IOCoordinator::mergeJournal(const char *object, con // if this entry overlaps, read the overlapping section uint64_t lastJournalOffset = offlen[0] + offlen[1]; - uint64_t lastBufOffset = offset + len; + uint64_t lastBufOffset = offset + *len; if (offlen[0] <= lastBufOffset && lastJournalOffset >= offset) { uint64_t startReadingAt = max(offlen[0], offset); @@ -441,7 +442,7 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array &objData, size boost::property_tree::json_parser::read_json(ss, header); assert(header.get("version") == "1"); string stmp = header.get("max_offset"); - size_t maxJournalOffset = strtoul(stmp); + size_t maxJournalOffset = strtoul(stmp.c_str(), NULL, 0); if (maxJournalOffset > *len) { @@ -487,6 +488,12 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array &objData, size return 0; } +void IOCoordinator::renameObject(const string &oldKey, const string &newKey) +{ + // does anything need to be done here? +} + + bool IOCoordinator::readLock(const string &filename) { boost::unique_lock s(lockMutex); diff --git a/src/IOCoordinator.h b/src/IOCoordinator.h index 221bc0df1..898ffe6c2 100644 --- a/src/IOCoordinator.h +++ b/src/IOCoordinator.h @@ -45,6 +45,7 @@ class IOCoordinator : public boost::noncopyable int mergeJournalInMem(boost::shared_array &objData, size_t *len, const char *journalPath); /* Lock manipulation fcns. They can lock on any param given to them. */ + void renameObject(const std::string &oldKey, const std::string &newKey); bool readLock(const std::string &filename); bool writeLock(const std::string &filename); void readUnlock(const std::string &filename); diff --git a/src/LocalStorage.cpp b/src/LocalStorage.cpp index a9129b016..d33d5dfc4 100644 --- a/src/LocalStorage.cpp +++ b/src/LocalStorage.cpp @@ -2,7 +2,9 @@ #include #include -#include +#include +#include +#include #include "LocalStorage.h" #include "Config.h" @@ -24,11 +26,10 @@ LocalStorage::LocalStorage() } catch (exception &e) { - syslog(LOG_CRIT, "Failed to create %s, got: %s", prefix.string().c_str(), e.what()); + logger->log(LOG_CRIT, "Failed to create %s, got: %s", prefix.string().c_str(), e.what()); throw e; } } - logger = SMLogging::get(); } LocalStorage::~LocalStorage() @@ -40,10 +41,10 @@ const bf::path & LocalStorage::getPrefix() const return prefix; } -int LocalStorage::copy(const path &source, const path &dest) +int LocalStorage::copy(const bf::path &source, const bf::path &dest) { boost::system::error_code err; - bf::copy_file(source, dest, copy_option::fail_if_exists, err); + bf::copy_file(source, dest, bf::copy_option::fail_if_exists, err); if (err) { errno = err.value(); @@ -78,13 +79,13 @@ int LocalStorage::getObject(const std::string &sourceKey, boost::shared_arraylog(LOG_CRIT, "LocalStorage::getObject() failed to open %s, got '%s'", c_source, strerror_r(errno, buf, 80)); return fd; } - scoped_closer s(fd); + size_t count = 0; while (count < l_size) { @@ -92,12 +93,14 @@ int LocalStorage::getObject(const std::string &sourceKey, boost::shared_arraylog(LOG_CRIT, "LocalStorage::getObject() failed to read %s, got '%s'", c_source, strerror_r(errno, buf, 80)); + close(fd); return err; } count += err; } if (size) *size = l_size; + close(fd); return 0; } @@ -115,10 +118,10 @@ int LocalStorage::putObject(boost::shared_array data, size_t len, const int fd = ::open(c_dest, O_WRONLY | O_CREAT | O_TRUNC, 0600); if (fd < 0) { - logger->log("LocalStorage::putObject(): Failed to open %s, got '%s'", c_dest, strerror_r(errno, buf, 80)); + logger->log(LOG_CRIT, "LocalStorage::putObject(): Failed to open %s, got '%s'", c_dest, strerror_r(errno, buf, 80)); return fd; } - scoped_closer s(fd); + size_t count = 0; int err; while (count < len) @@ -126,11 +129,13 @@ int LocalStorage::putObject(boost::shared_array data, size_t len, const err = ::write(fd, &data[count], len - count); if (err < 0) { - logger->log("LocalStorage::putObject(): Failed to write to %s, got '%s'", c_dest, strerror_r(errno, buf, 80)); + logger->log(LOG_CRIT, "LocalStorage::putObject(): Failed to write to %s, got '%s'", c_dest, strerror_r(errno, buf, 80)); + close(fd); return err; } count += err; } + close(fd); return 0; } diff --git a/src/LocalStorage.h b/src/LocalStorage.h index 99209b764..d572df0c8 100644 --- a/src/LocalStorage.h +++ b/src/LocalStorage.h @@ -3,6 +3,7 @@ #include #include "CloudStorage.h" +#include "SMLogging.h" #include namespace storagemanager @@ -26,7 +27,6 @@ class LocalStorage : public CloudStorage private: boost::filesystem::path prefix; - SMLogging *logger; int copy(const boost::filesystem::path &sourceKey, const boost::filesystem::path &destKey); }; diff --git a/src/Replicator.cpp b/src/Replicator.cpp index a84b702ed..40fb79fee 100755 --- a/src/Replicator.cpp +++ b/src/Replicator.cpp @@ -134,7 +134,7 @@ int Replicator::addJournalEntry(const char *filename, const uint8_t *data, off_t return count; } -int Replicator::remove(const char *filename, uint8_t flags) +int Replicator::remove(const char *filename, Flags flags) { int ret = 0; boost::filesystem::path p(filename); @@ -151,4 +151,9 @@ int Replicator::remove(const char *filename, uint8_t flags) return ret; } +int Replicator::updateMetadata(const char *filename, const MetadataFile &meta) +{ + return 0; +} + } diff --git a/src/Replicator.h b/src/Replicator.h index 63b7785d0..e78603b7e 100755 --- a/src/Replicator.h +++ b/src/Replicator.h @@ -2,6 +2,7 @@ #define REPLICATOR_H_ //#include "ThreadPool.h" +#include "MetadataFile.h" #include #include @@ -18,10 +19,18 @@ class Replicator static Replicator *get(); virtual ~Replicator(); + enum Flags + { + NONE = 0, + LOCAL_ONLY = 0x1, + NO_LOCAL = 0x2 + }; + int addJournalEntry(const char *filename, const uint8_t *data, off_t offset, size_t length); int newObject(const char *filename, const uint8_t *data, size_t length); - int remove(const char *key ,uint8_t flags); - + int remove(const char *key, Flags flags = NONE); + + int updateMetadata(const char *filename, const MetadataFile &meta); private: Replicator(); diff --git a/src/S3Storage.cpp b/src/S3Storage.cpp index 3a6eedb16..8984be323 100644 --- a/src/S3Storage.cpp +++ b/src/S3Storage.cpp @@ -19,7 +19,7 @@ int S3Storage::getObject(const string &sourceKey, const string &destFile, size_t return 0; } -int S3Storage::getObject(const string &sourceKey, boost::shared_array &data, size_t *size = NULL) +int S3Storage::getObject(const string &sourceKey, boost::shared_array &data, size_t *size) { return 0; } @@ -29,7 +29,7 @@ int S3Storage::putObject(const string &sourceFile, const string &destKey) return 0; } -int S3Storage::putObject(boost::shared_array data, uint len, const string &destKey) +int S3Storage::putObject(const boost::shared_array data, size_t len, const string &destKey) { return 0; } diff --git a/src/S3Storage.h b/src/S3Storage.h index fe9988372..34abaa9df 100644 --- a/src/S3Storage.h +++ b/src/S3Storage.h @@ -17,7 +17,7 @@ class S3Storage : public CloudStorage int getObject(const std::string &sourceKey, const std::string &destFile, size_t *size = NULL); int getObject(const std::string &sourceKey, boost::shared_array &data, size_t *size = NULL); int putObject(const std::string &sourceFile, const std::string &destKey); - int putObject(boost::shared_array data, uint len, const std::string &destKey); + int putObject(const boost::shared_array data, size_t len, const std::string &destKey); void deleteObject(const std::string &key); int copyObject(const std::string &sourceKey, const std::string &destKey); int exists(const std::string &key, bool *out); diff --git a/src/Synchronizer.cpp b/src/Synchronizer.cpp index 6bbc7456f..e00ddd177 100644 --- a/src/Synchronizer.cpp +++ b/src/Synchronizer.cpp @@ -1,14 +1,69 @@ #include "Synchronizer.h" -#include "Metadatafile.h" +#include "MetadataFile.h" #include +#include +#include +#include + using namespace std; namespace { - storagemanager::Synchronizer *instance = NULL; - boost::mutex inst_mutex; +storagemanager::Synchronizer *instance = NULL; +boost::mutex inst_mutex; + +// a few utility classes. Maybe move these to a utilities header. +struct ScopedReadLock +{ + ScopedReadLock(storagemanager::IOCoordinator *i, const string &k) : ioc(i), key(k) + { + ioc->readLock(key); + } + ~ScopedReadLock() + { + ioc->readUnlock(key); + } + storagemanager::IOCoordinator *ioc; + const string key; +}; + +struct ScopedWriteLock +{ + ScopedWriteLock(storagemanager::IOCoordinator *i, const string &k) : ioc(i), key(k) + { + ioc->writeLock(key); + locked = true; + } + ~ScopedWriteLock() + { + unlock(); + } + + void unlock() + { + if (locked) + { + ioc->writeUnlock(key); + locked = false; + } + } + storagemanager::IOCoordinator *ioc; + bool locked; + const string key; +}; + +struct ScopedCloser { + ScopedCloser(int f) : fd(f) { } + ~ScopedCloser() { + int s_errno = errno; + ::close(fd); + errno = s_errno; + } + int fd; +}; + } namespace bf = boost::filesystem; @@ -33,8 +88,9 @@ Synchronizer::Synchronizer() : maxUploads(0) cache = Cache::get(); replicator = Replicator::get(); ioc = IOCoordinator::get(); + cs = CloudStorage::get(); - string stmp = config->getValue("ObjectStorage", "max_concurrent_uploads") + string stmp = config->getValue("ObjectStorage", "max_concurrent_uploads"); try { maxUploads = stoul(stmp); @@ -47,21 +103,21 @@ Synchronizer::Synchronizer() : maxUploads(0) maxUploads = 20; stmp = config->getValue("ObjectStorage", "journal_path"); - if (prefix.empty()) + if (stmp.empty()) { logger->log(LOG_CRIT, "ObjectStorage/journal_path is not set"); throw runtime_error("Please set ObjectStorage/journal_path in the storagemanager.cnf file"); } try { - bf::create_directories(stmp); + journalPath = stmp; + bf::create_directories(journalPath); } catch (exception &e) { - syslog(LOG_CRIT, "Failed to create %s, got: %s", stmp.string().c_str(), e.what()); + logger->log(LOG_CRIT, "Failed to create %s, got: %s", stmp.c_str(), e.what()); throw e; } - journalPath = stmp; cachePath = cache->getCachePath(); threadPool.setMaxThreads(maxUploads); } @@ -100,7 +156,7 @@ void Synchronizer::newObjects(const vector &keys) { boost::unique_lock s(mutex); - for (string &key : keys) + for (const string &key : keys) { assert(pendingOps.find(key) == pendingOps.end()); makeJob(key); @@ -112,7 +168,7 @@ void Synchronizer::deletedObjects(const vector &keys) { boost::unique_lock s(mutex); - for (string &key : keys) + for (const string &key : keys) { auto it = pendingOps.find(key); if (it != pendingOps.end()) @@ -127,30 +183,85 @@ void Synchronizer::deletedObjects(const vector &keys) void Synchronizer::flushObject(const string &key) { - process(key); + boost::unique_lock s(mutex); + + // if there is something to do on key, it should be in the objNames list + // and either in pendingOps or opsInProgress. + // in testing though, going to check whether there is something to do + + bool noExistingJob = false; + auto it = pendingOps.find(key); + if (it != pendingOps.end()) + // find the object name and call process() + for (auto name = objNames.begin(); name != objNames.end(); ++it) + if (*name == key) + { + process(name, false); + break; + } + else + { + auto op = opsInProgress.find(key); + // it's already in progress + if (op != opsInProgress.end()) + op->second->wait(&mutex); + else + { + // it's not in either one, check if there is anything to be done as + // a sanity check. + noExistingJob = true; + } + } + + if (!noExistingJob) + return; + + // check whether this key is in cloud storage + bool exists; + int err; + do { + err = cs->exists(key.c_str(), &exists); + if (err) + { + char buf[80]; + logger->log(LOG_CRIT, "Sync::flushObject(): cloud existence check failed, got '%s'", strerror_r(errno, buf, 80)); + sleep(5); + } + } while (err); + if (!exists) + { + logger->log(LOG_DEBUG, "Sync::flushObject(): broken assumption! %s does not exist in cloud storage, but there is no job for it. Uploading it now."); + pendingOps[key] = boost::shared_ptr(new PendingOps(NEW_OBJECT)); + objNames.push_front(key); + process(objNames.begin(), false); + } } void Synchronizer::makeJob(const string &key) { - boost::shared_ptr s(new string(key)); - names.push_front(s); + objNames.push_front(key); - boost::shared_ptr j(new Job(this, names.begin())); + boost::shared_ptr j(new Job(this, objNames.begin())); threadPool.addJob(j); } -void Synchronizer::process(list::iterator &name) +void Synchronizer::process(list::iterator name, bool use_lock) { /* - check if there is a pendingOp for *it + check if there is a pendingOp for name if yes, start processing it if no, check if there is an ongoing op and block on it if not, return */ - boost::unique_lock s(mutex); + // had to use this 'use_lock' kludge to let flush() start processing a job immediately + boost::unique_lock s(mutex, boost::defer_lock); + if (use_lock) + s.lock(); + + string &key = *name; auto it = pendingOps.find(key); if (it == pendingOps.end()) { @@ -161,22 +272,27 @@ void Synchronizer::process(list::iterator &name) op->second->wait(&mutex); return; } + else + // it's not in pending or opsinprogress, nothing to do + return; } boost::shared_ptr pending = it->second; - opsInProgress[key] = *it; + opsInProgress[key] = pending; pendingOps.erase(it); - string sourceFile = Metadata::getSourceFilenameFromKey(*name); + string sourceFile = MetadataFile::getSourceFromKey(*name); s.unlock(); bool success = false; while (!success) { try { + /* Exceptions should only happen b/c of cloud service errors. Rather than retry here endlessly, + probably a better idea to have cloudstorage classes do the retrying */ if (pending->opFlags & DELETE) synchronizeDelete(sourceFile, name); else if (pending->opFlags & JOURNAL) - synchronizerWithJournal(sourceFile, name); + synchronizeWithJournal(sourceFile, name); else if (pending->opFlags & NEW_OBJECT) synchronize(sourceFile, name); else @@ -194,53 +310,10 @@ void Synchronizer::process(list::iterator &name) s.lock(); opsInProgress.erase(key); - names.erase(name); - - // TBD: On a network outage or S3 outage, it might not be a bad idea to keep retrying - // until the end of time. This will (?) naturally make the system unusable until the blockage - // is cleared, which is what we want, right? Is there a way to nicely tell the user what - // is happening, or are the logs good enough? + objNames.erase(name); } -struct ScopedReadLock -{ - ScopedReadLock(IOCoordinator *i, string &key) - { - ioc = i; - ioc->readLock(key.c_str()); - } - ~ScopedReadLock() - - ioc->readUnlock(key.c_str()); - } - IOCoordinator *ioc; -}; -struct ScopedWriteLock -{ - ScopedWriteLock(IOCoordinator *i, string &key) - { - ioc = i; - ioc->writeLock(key.c_str()); - locked = true; - } - ~ScopedReadLock() - { - if (locked) - ioc->writeUnlock(key.c_str()); - } - - void unlock() - { - if (locked) - { - ioc->writeUnlock(key.c_str()); - locked = false; - } - } - IOCoordinator *ioc; - bool locked; -}; void Synchronizer::synchronize(const string &sourceFile, list::iterator &it) { @@ -266,21 +339,20 @@ void Synchronizer::synchronize(const string &sourceFile, list::iterator return; } - err = cs->putObject(cachePath / key, key); + err = cs->putObject((cachePath / key).string(), key); if (err) throw runtime_error(string("synchronize(): uploading ") + key + ", got " + strerror_r(errno, buf, 80)); - replicator->delete(key, Replicator::NO_LOCAL); + replicator->remove(key.c_str(), Replicator::NO_LOCAL); } void Synchronizer::synchronizeDelete(const string &sourceFile, list::iterator &it) { ScopedWriteLock s(ioc, sourceFile); - cs->delete(*it); + cs->deleteObject(*it); } void Synchronizer::synchronizeWithJournal(const string &sourceFile, list::iterator &lit) { - // interface to Metadata TBD ScopedWriteLock s(ioc, sourceFile); string &key = *lit; @@ -289,7 +361,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list if (!bf::exists(journalName)) { - logger->(LOG_WARNING, "synchronizeWithJournal(): no journal file found for %s", key.c_str()); + logger->log(LOG_WARNING, "synchronizeWithJournal(): no journal file found for %s", key.c_str()); // I don't think this should happen, maybe throw a logic_error here return; } @@ -307,15 +379,15 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list err = cs->getObject(key, data, &size); if (err) throw runtime_error(string("Synchronizer: getObject() failed: ") + strerror_r(errno, buf, 80)); - err = ios->mergeJournalInMem(data, journalName, &size); + err = ioc->mergeJournalInMem(data, &size, journalName.c_str()); assert(!err); } else - data = ios->mergeJournal(oldCachePath.string(), journalName, 0, &size); + data = ioc->mergeJournal(oldCachePath.string().c_str(), journalName.c_str(), 0, &size); assert(data); // get a new key for the resolved version & upload it - string newKey = ioc->newKeyFromOldKey(key); + string newKey = MetadataFile::getNewKeyFromOldKey(key, size); err = cs->putObject(data, size, newKey); if (err) throw runtime_error(string("Synchronizer: putObject() failed: ") + strerror_r(errno, buf, 80)); @@ -346,24 +418,23 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list } cache->rename(key, newKey, size - bf::file_size(oldCachePath)); - replicator->delete(key); + replicator->remove(key.c_str()); } // update the metadata for the source file - // waiting for stubs to see what these calls look like - /* - Metadata md(sourceFilename); - md.rename(key, newKey); - replicator->updateMetadata(sourceFilename, md); - */ + + MetadataFile md(sourceFile.c_str()); + md.updateEntry(MetadataFile::getOffsetFromKey(key), newKey, size); + replicator->updateMetadata(sourceFile.c_str(), md); + rename(key, newKey); - ioc->renameObject(oldkey, newkey); + ioc->renameObject(key, newKey); s.unlock(); // delete the old object & journal file - cache->deletedJournal(bf::file_size(journalName); - replicator->delete(journalName); - cs->delete(key); + cache->deletedJournal(bf::file_size(journalName)); + replicator->remove(journalName.c_str()); + cs->deleteObject(key); } void Synchronizer::rename(const string &oldKey, const string &newKey) @@ -377,8 +448,8 @@ void Synchronizer::rename(const string &oldKey, const string &newKey) pendingOps.erase(it); for (auto &name: objNames) - if (*name == oldKey) - *name = newKey; + if (name == oldKey) + name = newKey; } bf::path Synchronizer::getJournalPath() @@ -393,22 +464,18 @@ bf::path Synchronizer::getCachePath() /* The helper objects & fcns */ -Synchronizer::PendingOps(int flags) : opFlags(flags), finished(false) +Synchronizer::PendingOps::PendingOps(int flags) : opFlags(flags), finished(false) { } -Synchronizer::~PendingOps() -{ -} - -Synchronizer::PendingOps::notify(boost::mutex *m) +void Synchronizer::PendingOps::notify(boost::mutex *m) { boost::unique_lock s(*m); finished = true; condvar.notify_all(); } -Synchronizer::PendingOps::wait(boost::mutex *m) +void Synchronizer::PendingOps::wait(boost::mutex *m) { while (!finished) condvar.wait(*m); diff --git a/src/Synchronizer.h b/src/Synchronizer.h index c9e2525c2..ad5b7d288 100644 --- a/src/Synchronizer.h +++ b/src/Synchronizer.h @@ -6,6 +6,7 @@ #include #include #include +#include #include "SMLogging.h" #include "Cache.h" @@ -16,6 +17,8 @@ namespace storagemanager { +class Cache; + /* TODO: Need to think about how errors are handled / propagated */ class Synchronizer : public boost::noncopyable { @@ -36,10 +39,10 @@ class Synchronizer : public boost::noncopyable private: Synchronizer(); - void process(const std::string &key); - void synchronize(const std::string &key, bool isFlush); - void synchronizeDelete(const std::string &key); - void synchronizeWithJournal(const std::string &key, bool isFlush); + void process(std::list::iterator key, bool use_lock=true); + void synchronize(const std::string &sourceFile, std::list::iterator &it); + void synchronizeDelete(const std::string &sourceFile, std::list::iterator &it); + void synchronizeWithJournal(const std::string &sourceFile, std::list::iterator &it); void rename(const std::string &oldkey, const std::string &newkey); void makeJob(const std::string &key); @@ -56,12 +59,13 @@ class Synchronizer : public boost::noncopyable struct Job : public ThreadPool::Job { - Job(Synchronizer *s, std::list::iterator &i) : sync(s), it(i) { } + Job(Synchronizer *s, std::list::iterator i) : sync(s), it(i) { } void operator()() { sync->process(it); } Synchronizer *sync; std::list::iterator it; }; + uint maxUploads; ThreadPool threadPool; std::map > pendingOps; std::map > opsInProgress; @@ -76,6 +80,7 @@ class Synchronizer : public boost::noncopyable Cache *cache; Replicator *replicator; IOCoordinator *ioc; + CloudStorage *cs; boost::filesystem::path cachePath; boost::filesystem::path journalPath; diff --git a/src/unit_tests.cpp b/src/unit_tests.cpp index 2d667e258..3ad4ac887 100755 --- a/src/unit_tests.cpp +++ b/src/unit_tests.cpp @@ -177,8 +177,8 @@ bool replicatorTest() cout << "replicator addJournalEntry OK" << endl; ::close(fd); - repli->remove(newobject,0); - repli->remove(newobjectJournal,0); + repli->remove(newobject); + repli->remove(newobjectJournal); assert(!boost::filesystem::exists(newobject)); cout << "replicator remove OK" << endl; return true; @@ -535,7 +535,7 @@ bool localstorageTest1() bool cacheTest1() { - Cache cache; + Cache *cache = Cache::get(); CloudStorage *cs = CloudStorage::get(); LocalStorage *ls = dynamic_cast(cs); if (ls == NULL) { @@ -544,15 +544,15 @@ bool cacheTest1() } bf::path storagePath = ls->getPrefix(); - bf::path cachePath = cache.getCachePath(); + bf::path cachePath = cache->getCachePath(); vector v_bogus; vector exists; // make sure nothing shows up in the cache path for files that don't exist v_bogus.push_back("does-not-exist"); - cache.read(v_bogus); + cache->read(v_bogus); assert(!bf::exists(cachePath / "does-not-exist")); - cache.exists(v_bogus, &exists); + cache->exists(v_bogus, &exists); assert(exists.size() == 1); assert(!exists[0]); @@ -560,21 +560,21 @@ bool cacheTest1() string realFile("storagemanager.cnf"); bf::copy_file(realFile, storagePath / realFile, bf::copy_option::overwrite_if_exists); v_bogus[0] = realFile; - cache.read(v_bogus); + cache->read(v_bogus); assert(bf::exists(cachePath / realFile)); exists.clear(); - cache.exists(v_bogus, &exists); + cache->exists(v_bogus, &exists); assert(exists.size() == 1); assert(exists[0]); - ssize_t currentSize = cache.getCurrentCacheSize(); + ssize_t currentSize = cache->getCurrentCacheSize(); assert(currentSize == bf::file_size(cachePath / realFile)); // lie about the file being deleted and then replaced - cache.deletedObject(realFile, currentSize); - assert(cache.getCurrentCacheSize() == 0); - cache.newObject(realFile, currentSize); - assert(cache.getCurrentCacheSize() == currentSize); - cache.exists(v_bogus, &exists); + cache->deletedObject(realFile, currentSize); + assert(cache->getCurrentCacheSize() == 0); + cache->newObject(realFile, currentSize); + assert(cache->getCurrentCacheSize() == currentSize); + cache->exists(v_bogus, &exists); assert(exists.size() == 1); assert(exists[0]); @@ -584,6 +584,33 @@ bool cacheTest1() cout << "cache test 1 OK" << endl; } +void makeTestObject() +{ + int objFD = open("test-object", O_WRONLY | O_CREAT | O_TRUNC, 0600); + assert(objFD >= 0); + scoped_closer s1(objFD); + + for (int i = 0; i < 2048; i++) + assert(write(objFD, &i, 4) == 4); +} + +// the merged version should look like +// (ints) 0 1 2 3 4 0 1 2 3 4 10 11 12 13... +void makeTestJournal() +{ + int journalFD = open("test-journal", O_WRONLY | O_CREAT | O_TRUNC, 0600); + assert(journalFD >= 0); + scoped_closer s2(journalFD); + + char header[] = "{ \"version\" : 1, \"max_offset\": 39 }"; + write(journalFD, header, strlen(header) + 1); + + uint64_t offlen[2] = { 20, 20 }; + write(journalFD, offlen, 16); + for (int i = 0; i < 5; i++) + assert(write(journalFD, &i, 4) == 4); +} + bool mergeJournalTest() { /* @@ -592,30 +619,13 @@ bool mergeJournalTest() verify the expected values */ - int objFD = open("test-object", O_WRONLY | O_CREAT | O_TRUNC, 0600); - assert(objFD >= 0); - scoped_closer s1(objFD); - int journalFD = open("test-journal", O_WRONLY | O_CREAT | O_TRUNC, 0600); - assert(journalFD >= 0); - scoped_closer s2(journalFD); + makeTestObject(); + makeTestJournal(); int i; - for (i = 0; i < 2048; i++) - assert(write(objFD, &i, 4) == 4); - - char header[] = "{ \"version\" : 1 }"; - write(journalFD, header, strlen(header) + 1); - - uint64_t offlen[2] = { 20, 20 }; - write(journalFD, offlen, 16); - for (i = 0; i < 5; i++) - assert(write(journalFD, &i, 4) == 4); - - // the merged version should look like - // (ints) 0 1 2 3 4 0 1 2 3 4 10 11 12 13... - IOCoordinator *ioc = IOCoordinator::get(); - boost::shared_array data = ioc->mergeJournal("test-object", "test-journal"); + size_t len = 0; + boost::shared_array data = ioc->mergeJournal("test-object", "test-journal", 0, &len); assert(data); int *idata = (int *) data.get(); for (i = 0; i < 5; i++) @@ -627,7 +637,8 @@ bool mergeJournalTest() // try different range parameters // read at the beginning of the change - data = ioc->mergeJournal("test-object", "test-journal", 20, 40); + len = 40; + data = ioc->mergeJournal("test-object", "test-journal", 20, &len); assert(data); idata = (int *) data.get(); for (i = 0; i < 5; i++) @@ -636,7 +647,8 @@ bool mergeJournalTest() assert(idata[i] == i+5); // read s.t. beginning of the change is in the middle of the range - data = ioc->mergeJournal("test-object", "test-journal", 8, 24); + len = 24; + data = ioc->mergeJournal("test-object", "test-journal", 8, &len); assert(data); idata = (int *) data.get(); for (i = 0; i < 3; i++) @@ -645,7 +657,8 @@ bool mergeJournalTest() assert(idata[i] == i - 3); // read s.t. end of the change is in the middle of the range - data = ioc->mergeJournal("test-object", "test-journal", 28, 20); + len = 20; + data = ioc->mergeJournal("test-object", "test-journal", 28, &len); assert(data); idata = (int *) data.get(); for (i = 0; i < 3; i++) @@ -662,7 +675,7 @@ bool syncTest1() { Synchronizer *sync = Synchronizer::get(); Cache *cache = Cache::get(); - CloudStorage *cs = CloudStorage.get(); + CloudStorage *cs = CloudStorage::get(); bf::path cachePath = sync->getCachePath(); bf::path journalPath = sync->getJournalPath(); @@ -689,16 +702,13 @@ bool syncTest1() sleep(1); // let it do what it does // check that the original objects no longer exist - assert(!cache->exists("test-object.obj"); - assert(!bf::exists(cachePath + assert(!cache->exists("test-object.obj")); + //working here assert(!bf::exists(cachePath // cleanup - bf::remove(cachePath / "test-object"); - bf::remove(cachePath / "test-journal"); - - - + bf::remove(cachePath / "test-object.obj"); + bf::remove(journalPath / "test-object.journal"); } int main()