diff --git a/src/Cache.cpp b/src/Cache.cpp index 84f248dba..018fc5947 100644 --- a/src/Cache.cpp +++ b/src/Cache.cpp @@ -192,6 +192,12 @@ void Cache::exists(const vector &keys, vector *out) (*out)[i] = (m_lru.find(keys[i]) != m_lru.end()); } +bool Cache::exists(const string &key) +{ + boost::unique_lock s(lru_mutex); + return m_lru.find(keys[i]) != m_lru.end(); +} + void Cache::newObject(const string &key, size_t size) { boost::unique_lock s(lru_mutex); @@ -278,6 +284,19 @@ void Cache::makeSpace(size_t size) } } +void Cache::rename(const string &oldKey, const string &newKey, ssize_t sizediff) +{ + boost::unique_lock s(lru_mutex); + auto it = m_lru(oldKey); + assert(it != m_lru.end()); + + auto lit = it->lit; + m_lru.erase(it); + *lit = newKey; + m_lru.insert(lit); + currentCacheSize += sizediff; +} + size_t Cache::getCurrentCacheSize() const { return currentCacheSize; diff --git a/src/Cache.h b/src/Cache.h index 596c7021b..df0a31dc3 100644 --- a/src/Cache.h +++ b/src/Cache.h @@ -24,11 +24,16 @@ class Cache : public boost::noncopyable virtual ~Cache(); void read(const std::vector &keys); + bool exists(const std::string &key); void exists(const std::vector &keys, std::vector *out); void newObject(const std::string &key, size_t size); void newJournalEntry(size_t size); void deletedObject(const std::string &key, size_t size); void deletedJournal(size_t size); + + // rename is used when an old obj gets merged with its journal file + // the size will change in that process; sizediff is by how much + void rename(const std::string &oldKey, const std::string &newKey, ssize_t sizediff); void setMaxCacheSize(size_t size); size_t getCurrentCacheSize() const; diff --git a/src/CloudStorage.h b/src/CloudStorage.h index 070b38a37..20194db50 100644 --- a/src/CloudStorage.h +++ b/src/CloudStorage.h @@ -3,6 +3,7 @@ #define CLOUDSTORAGE_H_ #include +#include namespace storagemanager { @@ -13,7 +14,9 @@ class CloudStorage /* These behave like syscalls. return code -1 means an error, and errno is set */ virtual int getObject(const std::string &sourceKey, const std::string &destFile, size_t *size = NULL) = 0; + virtual int getObject(const std::string &sourceKey, boost::shared_array &data, size_t *size = NULL) = 0; virtual int putObject(const std::string &sourceFile, const std::string &destKey) = 0; + virtual int putObject(const boost::shared_array data, size_t len, const std::string &destKey) = 0; virtual void deleteObject(const std::string &key) = 0; virtual int copyObject(const std::string &sourceKey, const std::string &destKey) = 0; virtual int exists(const std::string &key, bool *out) = 0; diff --git a/src/IOCoordinator.cpp b/src/IOCoordinator.cpp index 12036cbe8..7d325374f 100644 --- a/src/IOCoordinator.cpp +++ b/src/IOCoordinator.cpp @@ -254,7 +254,8 @@ boost::shared_array seekToEndOfHeader1(int fd) } -boost::shared_array IOCoordinator::mergeJournal(const char *object, const char *journal, off_t offset, size_t len) const +boost::shared_array IOCoordinator::mergeJournal(const char *object, const char *journal, off_t offset, + size_t *len) const { int objFD, journalFD; boost::shared_array ret; @@ -268,17 +269,32 @@ boost::shared_array IOCoordinator::mergeJournal(const char *object, con return NULL; scoped_closer s2(journalFD); - // TODO: Right now this assumes that max object size has not been changed. - // ideally, we would have a way to look up the size of a specific object. - if (len == 0) - len = objectSize - offset; - ret.reset(new uint8_t[len]); + // grab the journal header, make sure the version is 1, and get the max offset + boost::shared_array headertxt = seekToEndOfHeader1(journalFD); + stringstream ss; + ss << headertxt.get(); + boost::property_tree::ptree header; + boost::property_tree::json_parser::read_json(ss, header); + assert(header.get("version") == "1"); + string stmp = header.get("max_offset"); + size_t maxJournalOffset = strtoul(stmp); + + struct stat objStat; + fstat(objFD, &objStat); + + if (*len == 0) + // read to the end of the file + *len = max(maxJournalOffset, objStat.st_size) - offset; + else + // make sure len is within the bounds of the data + *len = min(*len, (max(maxJournalOffset, objStat.st_size) - offset)); + ret.reset(new uint8_t[*len]); // read the object into memory size_t count = 0; ::lseek(objFD, offset, SEEK_SET); - while (count < len) { - int err = ::read(objFD, &ret[count], len - count); + while (count < *len) { + int err = ::read(objFD, &ret[count], *len - count); if (err < 0) { char buf[80]; @@ -300,14 +316,6 @@ boost::shared_array IOCoordinator::mergeJournal(const char *object, con count += err; } - // grab the journal header and make sure the version is 1 - boost::shared_array headertxt = seekToEndOfHeader1(journalFD); - stringstream ss; - ss << headertxt.get(); - boost::property_tree::ptree header; - boost::property_tree::json_parser::read_json(ss, header); - assert(header.get("version") == "1"); - // start processing the entries while (1) { @@ -359,7 +367,7 @@ boost::shared_array IOCoordinator::mergeJournal(const char *object, con } // MergeJournalInMem is a specialized version of mergeJournal(). TODO: refactor if possible. -int IOCoordinator::mergeJournalInMem(uint8_t *objData, const char *journalPath) +int IOCoordinator::mergeJournalInMem(boost::shared_array &objData, size_t *len, const char *journalPath) { int journalFD = ::open(journalPath, O_RDONLY); if (journalFD < 0) @@ -373,6 +381,14 @@ int IOCoordinator::mergeJournalInMem(uint8_t *objData, const char *journalPath) boost::property_tree::ptree header; boost::property_tree::json_parser::read_json(ss, header); assert(header.get("version") == "1"); + string stmp = header.get("max_offset"); + size_t maxJournalOffset = strtoul(stmp); + + if (maxJournalOffset > *len) + { + objData.reset(new uint8_t[maxJournalOffset]); + *len = maxJournalOffset; + } // start processing the entries while (1) diff --git a/src/IOCoordinator.h b/src/IOCoordinator.h index 8dd7cecd1..efdd6ca4a 100644 --- a/src/IOCoordinator.h +++ b/src/IOCoordinator.h @@ -38,10 +38,10 @@ class IOCoordinator : public boost::noncopyable void getNewKeyFromSourceName(const std::string &sourceName, std::string *newKey); // The shared logic for merging a journal file with its base file. - // The default values for offset and len mean 'process the whole file'. Otherwise, - // offset is relative to the object. - boost::shared_array mergeJournal(const char *objectPath, const char *journalPath, off_t offset = 0, size_t len = 0) const; - int mergeJournalInMem(uint8_t *objData, const char *journalPath); + // *len should be set to the length of the data requested (0 means read the whole file), + // on return *len will be the actual length returned. + boost::shared_array mergeJournal(const char *objectPath, const char *journalPath, off_t offset, size_t *len) const; + int mergeJournalInMem(boost::shared_array &objData, size_t *len, const char *journalPath); private: IOCoordinator(); diff --git a/src/LocalStorage.cpp b/src/LocalStorage.cpp index b6a4a9910..a9129b016 100644 --- a/src/LocalStorage.cpp +++ b/src/LocalStorage.cpp @@ -7,7 +7,7 @@ #include "Config.h" using namespace std; -using namespace boost::filesystem; +namespace bf = boost::filesystem; namespace storagemanager { @@ -16,11 +16,11 @@ LocalStorage::LocalStorage() { prefix = Config::get()->getValue("LocalStorage", "path"); //cout << "LS: got prefix " << prefix << endl; - if (!is_directory(prefix)) + if (!bf::is_directory(prefix)) { try { - create_directories(prefix); + bf::create_directories(prefix); } catch (exception &e) { @@ -28,13 +28,14 @@ LocalStorage::LocalStorage() throw e; } } + logger = SMLogging::get(); } LocalStorage::~LocalStorage() { } -const boost::filesystem::path & LocalStorage::getPrefix() const +const bf::path & LocalStorage::getPrefix() const { return prefix; } @@ -42,7 +43,7 @@ const boost::filesystem::path & LocalStorage::getPrefix() const int LocalStorage::copy(const path &source, const path &dest) { boost::system::error_code err; - copy_file(source, dest, copy_option::fail_if_exists, err); + bf::copy_file(source, dest, copy_option::fail_if_exists, err); if (err) { errno = err.value(); @@ -51,9 +52,9 @@ int LocalStorage::copy(const path &source, const path &dest) return 0; } -path operator+(const path &p1, const path &p2) +bf::path operator+(const bf::path &p1, const bf::path &p2) { - path ret(p1); + bf::path ret(p1); ret /= p2; return ret; } @@ -64,15 +65,75 @@ int LocalStorage::getObject(const string &source, const string &dest, size_t *si if (ret) return ret; if (size) - *size = boost::filesystem::file_size(dest); + *size = bf::file_size(dest); return ret; } +int LocalStorage::getObject(const std::string &sourceKey, boost::shared_array &data, size_t *size) +{ + int ret; + bf::path source = prefix / sourceKey; + const char *c_source = source.string().c_str(); + size_t l_size = bf::file_size(source); + data.reset(new uint8_t[l_size]); + char buf[80]; + + int fd = open(c_source, O_RDONLY); + if (fd < 0) + { + logger->log(LOG_CRIT, "LocalStorage::getObject() failed to open %s, got '%s'", c_source, strerror_r(errno, buf, 80)); + return fd; + } + scoped_closer s(fd); + size_t count = 0; + while (count < l_size) + { + int err = ::read(fd, &data[count], l_size - count); + if (err < 0) + { + logger->log(LOG_CRIT, "LocalStorage::getObject() failed to read %s, got '%s'", c_source, strerror_r(errno, buf, 80)); + return err; + } + count += err; + } + if (size) + *size = l_size; + return 0; +} + int LocalStorage::putObject(const string &source, const string &dest) { return copy(source, prefix / dest); } +int LocalStorage::putObject(boost::shared_array data, size_t len, const string &dest) +{ + bf::path destPath = prefix / dest; + const char *c_dest = destPath.string().c_str(); + char buf[80]; + + int fd = ::open(c_dest, O_WRONLY | O_CREAT | O_TRUNC, 0600); + if (fd < 0) + { + logger->log("LocalStorage::putObject(): Failed to open %s, got '%s'", c_dest, strerror_r(errno, buf, 80)); + return fd; + } + scoped_closer s(fd); + size_t count = 0; + int err; + while (count < len) + { + err = ::write(fd, &data[count], len - count); + if (err < 0) + { + logger->log("LocalStorage::putObject(): Failed to write to %s, got '%s'", c_dest, strerror_r(errno, buf, 80)); + return err; + } + count += err; + } + return 0; +} + int LocalStorage::copyObject(const string &source, const string &dest) { return copy(prefix / source, prefix / dest); @@ -82,12 +143,12 @@ void LocalStorage::deleteObject(const string &key) { boost::system::error_code err; - boost::filesystem::remove(prefix / key, err); + bf::remove(prefix / key, err); } int LocalStorage::exists(const std::string &key, bool *out) { - *out = boost::filesystem::exists(prefix / key); + *out = bf::exists(prefix / key); return 0; } diff --git a/src/LocalStorage.h b/src/LocalStorage.h index 816cf15db..99209b764 100644 --- a/src/LocalStorage.h +++ b/src/LocalStorage.h @@ -15,7 +15,9 @@ class LocalStorage : public CloudStorage virtual ~LocalStorage(); int getObject(const std::string &sourceKey, const std::string &destFile, size_t *size = NULL); + int getObject(const std::string &sourceKey, boost::shared_array &data, size_t *size = NULL); int putObject(const std::string &sourceFile, const std::string &destKey); + int putObject(const boost::shared_array data, size_t len, const std::string &destKey); void deleteObject(const std::string &key); int copyObject(const std::string &sourceKey, const std::string &destKey); int exists(const std::string &key, bool *out); @@ -24,6 +26,7 @@ class LocalStorage : public CloudStorage private: boost::filesystem::path prefix; + SMLogging *logger; int copy(const boost::filesystem::path &sourceKey, const boost::filesystem::path &destKey); }; diff --git a/src/S3Storage.cpp b/src/S3Storage.cpp index 2e83e9908..3a6eedb16 100644 --- a/src/S3Storage.cpp +++ b/src/S3Storage.cpp @@ -19,11 +19,21 @@ int S3Storage::getObject(const string &sourceKey, const string &destFile, size_t return 0; } +int S3Storage::getObject(const string &sourceKey, boost::shared_array &data, size_t *size = NULL) +{ + return 0; +} + int S3Storage::putObject(const string &sourceFile, const string &destKey) { return 0; } +int S3Storage::putObject(boost::shared_array data, uint len, const string &destKey) +{ + return 0; +} + void S3Storage::deleteObject(const string &key) { } diff --git a/src/S3Storage.h b/src/S3Storage.h index e3dc1c28a..fe9988372 100644 --- a/src/S3Storage.h +++ b/src/S3Storage.h @@ -15,7 +15,9 @@ class S3Storage : public CloudStorage virtual ~S3Storage(); int getObject(const std::string &sourceKey, const std::string &destFile, size_t *size = NULL); + int getObject(const std::string &sourceKey, boost::shared_array &data, size_t *size = NULL); int putObject(const std::string &sourceFile, const std::string &destKey); + int putObject(boost::shared_array data, uint len, const std::string &destKey); void deleteObject(const std::string &key); int copyObject(const std::string &sourceKey, const std::string &destKey); int exists(const std::string &key, bool *out); diff --git a/src/Synchronizer.cpp b/src/Synchronizer.cpp index f8d283746..944d46de1 100644 --- a/src/Synchronizer.cpp +++ b/src/Synchronizer.cpp @@ -10,6 +10,7 @@ namespace boost::mutex inst_mutex; } +namespace bf = boost::filesystem; namespace storagemanager { @@ -138,16 +139,24 @@ void Synchronizer::process(const string &key) pendingOps.erase(it); s.unlock(); - if (pending->opFlags & DELETE) - synchronizeDelete(key); - else if (pending->opFlags & JOURNAL) - synchronizerWithJournal(key, pending->opFlags & IS_FLUSH); - else if (pending->opFlags & NEW_OBJECT) - synchronize(key, pending->opFlags & IS_FLUSH); - else - // complain - ; - + try { + if (pending->opFlags & DELETE) + synchronizeDelete(key); + else if (pending->opFlags & JOURNAL) + synchronizerWithJournal(key, pending->opFlags & IS_FLUSH); + else if (pending->opFlags & NEW_OBJECT) + synchronize(key, pending->opFlags & IS_FLUSH); + else + throw logic_error("Synchronizer::process(): got an unknown op flag"); + } + catch(exception &e) { + logger->log(LOG_CRIT, "Synchronizer::process(): error sync'ing %s opFlags=%d, got '%s'. Requeueing it.", key.c_str(), + pending->opFlags, e.what()); + s.lock(); + workQueue.push_back(key); + pendingOps[key] = pending; + return; + } if (pending->opFlags & IS_FLUSH) pending->notify(); } @@ -157,11 +166,11 @@ struct ScopedReadLock ScopedReadLock(IOCoordinator *i, string &key) { ioc = i; - ioc->getReadLock(key); + ioc->readLock(key.c_str()); } ~ScopedReadLock() - { - ioc->releaseReadLock(key); + + ioc->readUnlock(key.c_str()); } IOCoordinator *ioc; }; @@ -171,35 +180,145 @@ struct ScopedWriteLock ScopedWriteLock(IOCoordinator *i, string &key) { ioc = i; - ioc->getWriteLock(key); + ioc->writeLock(key.c_str()); + locked = true; } ~ScopedReadLock() { - ioc->releaseWriteLock(key); + if (locked) + ioc->writeUnlock(key.c_str()); + } + + void unlock() + { + if (locked) + { + ioc->writeUnlock(key.c_str()); + locked = false; + } } IOCoordinator *ioc; + bool locked; }; -void Synchronizer::synchronize(const std::string &key, bool isFlush) +void Synchronizer::synchronize(const string &key, bool isFlush) { ScopedReadLock s(ioc, key); + char buf[80]; bool exists = false; - cs->exists(key, &exists); + int err; + err = cs->exists(key, &exists); + if (err) + throw runtime_error(strerror_r(errno, buf, 80)); if (!exists) { - cs->putObject(cache->getCachePath() / key, key); + err = cs->putObject(cache->getCachePath() / key, key); + if (err) + throw runtime_error(strerror_r(errno, buf, 80)); replicator->delete(key, Replicator::NO_LOCAL); } if (isFlush) replicator->delete(key, Replicator::LOCAL_ONLY); } +void Synchronizer::synchronizeDelete(const string &key) +{ + /* Right now I think this is being told to delete key from cloud storage, + and that it has already been deleted everywhere locally. */ + cs->delete(key); +} + void Synchronizer::synchronizeWithJournal(const string &key, bool isFlush) { + // interface to Metadata TBD + //string sourceFilename = Metadata::getSourceFromKey(key); + ScopedWriteLock s(ioc, sourceFilename); + bf::path oldCachePath = cache->getCachePath() / key; + string journalName = oldCachePath.string() + ".journal"; + int err; + boost::shared_array data; + size_t count = 0, size = 0; + char buf[80]; + bool oldObjIsCached = cache->exists(key); - + // get the base object if it is not already cached + // merge it with its journal file + if (!oldObjIsCached) + { + err = cs->getObject(key, data, &size); + if (err) + throw runtime_error(string("Synchronizer: getObject() failed: ") + strerror_r(errno, buf, 80)); + err = ios->mergeJournalInMem(data, journalName, &size); + assert(!err); + } + else + data = ios->mergeJournal(oldCachePath.string(), journalName, 0, &size); + assert(data); + // get a new key for the resolved version & upload it + string newKey = ios->newKeyFromOldKey(key); + err = cs->putObject(data, size, newKey); + if (err) + throw runtime_error(string("Synchronizer: putObject() failed: ") + strerror_r(errno, buf, 80)); + + // if this isn't a flush operation.. + // write the new data to disk, + // tell the cache about the rename + // rename the file in any pending ops in Synchronizer + + if (!isFlush && oldObjIsCached) + { + // Is this the only thing outside of Replicator that writes files? + // If so move this write loop to Replicator. + bf::path newCachePath = cache->getCachePath() / newKey; + int newFD = ::open(newCachePath.string().c_str(), O_WRONLY, 0600); + if (newFD < 0) + throw runtime_error(string("Synchronizer: Failed to open a new object in local storage! Got ") + + strerror_r(errno, buf, 80)); + ScopedCloser s(newFD); + + while (count < size) + { + err = ::write(newFD, data.get(), size - count); + if (err < 0) + throw runtime_error(string("Synchronizer: Failed to write to a new object in local storage! Got ") + + strerror_r(errno, buf, 80)); + count += err; + } + + // the total size difference is the new obj size - (old obj size + journal size) + // might be wise to add things like getting a file size to a utility class. + // TBD how often we need to do it. + struct stat statbuf; + err = stat(oldCachePath.string().c_str(), &statbuf); + assert(!err); + size_t oldObjSize = statbuf.st_size; + err = stat((oldCachePath.string() + ".journal").c_str(), &statbuf); + assert(!err); + size_t journalSize = statbuf.st_size; + + cache->rename(key, newKey, size - oldObjSize - journalSize); + rename(key, newKey); + } + + // update the metadata for the source file + // waiting for stubs to see what these calls look like + /* + Metadata md(sourceFilename); + md.rename(key, newKey); + replicator->updateMetadata(sourceFilename, md); + */ + + s.unlock(); + + // delete the old object & journal file + vector files; + files.push_back(key); + files.push_back(journalName); + replicator->delete(files); + cs->delete(key); +} /* The helper objects & fcns */ diff --git a/src/Synchronizer.h b/src/Synchronizer.h index 15e27a2a4..d9182b26b 100644 --- a/src/Synchronizer.h +++ b/src/Synchronizer.h @@ -16,6 +16,7 @@ namespace storagemanager { +/* TODO: Need to think about how errors are handled / propagated */ class Synchronizer : public boost::noncopyable { public: @@ -30,6 +31,11 @@ class Synchronizer : public boost::noncopyable private: Synchronizer(); + void process(const std::string &key); + void synchronize(const std::string &key, bool isFlush); + void synchronizeDelete(const std::string &key); + void synchronizeWithJournal(const std::string &key, bool isFlush); + struct FlushListener { FlushListener(boost::mutex *m, boost::condvar *c); diff --git a/src/ThreadPool.cpp b/src/ThreadPool.cpp index 8e4d37c72..574898d4f 100644 --- a/src/ThreadPool.cpp +++ b/src/ThreadPool.cpp @@ -21,6 +21,7 @@ ThreadPool::ThreadPool(uint num_threads) : maxThreads(num_threads), die(false), pruner = boost::thread([this] { this->prune(); } ); } +// TBD: Should the default behavior be to finish the job queue or not? ThreadPool::~ThreadPool() { boost::unique_lock s(mutex);