diff --git a/src/Cache.cpp b/src/Cache.cpp index 122f4d087..84f248dba 100644 --- a/src/Cache.cpp +++ b/src/Cache.cpp @@ -203,6 +203,19 @@ void Cache::newObject(const string &key, size_t size) currentCacheSize += size; } +void Cache::newJournalEntry(size_t size) +{ + boost::unique_lock s(lru_mutex); + makeSpace(size); + currentCacheSize += size; +} + +void Cache::deletedJournal(size_t size) +{ + boost::unique_lock s(lru_mutex); + currentCacheSize -= size; +} + void Cache::deletedObject(const string &key, size_t size) { boost::unique_lock s(lru_mutex); @@ -257,7 +270,8 @@ void Cache::makeSpace(size_t size) currentCacheSize -= statbuf.st_size; thisMuch -= statbuf.st_size; sync->flushObject(*it); - boost::filesystem::remove(cachedFile); + // Deleting the files will be done through Synchronizer->Replicator + //boost::filesystem::remove(cachedFile); LRU_t::iterator toRemove = it++; lru.erase(toRemove); m_lru.erase(*toRemove); diff --git a/src/Cache.h b/src/Cache.h index 44efea1d2..596c7021b 100644 --- a/src/Cache.h +++ b/src/Cache.h @@ -26,7 +26,9 @@ class Cache : public boost::noncopyable void read(const std::vector &keys); void exists(const std::vector &keys, std::vector *out); void newObject(const std::string &key, size_t size); + void newJournalEntry(size_t size); void deletedObject(const std::string &key, size_t size); + void deletedJournal(size_t size); void setMaxCacheSize(size_t size); size_t getCurrentCacheSize() const; diff --git a/src/CloudStorage.h b/src/CloudStorage.h index 3bd9296de..070b38a37 100644 --- a/src/CloudStorage.h +++ b/src/CloudStorage.h @@ -10,11 +10,13 @@ namespace storagemanager class CloudStorage { public: + /* These behave like syscalls. return code -1 means an error, and errno is set */ virtual int getObject(const std::string &sourceKey, const std::string &destFile, size_t *size = NULL) = 0; virtual int putObject(const std::string &sourceFile, const std::string &destKey) = 0; virtual void deleteObject(const std::string &key) = 0; virtual int copyObject(const std::string &sourceKey, const std::string &destKey) = 0; + virtual int exists(const std::string &key, bool *out) = 0; // this will return a CloudStorage instance of the type specified in StorageManager.cnf static CloudStorage *get(); diff --git a/src/Downloader.cpp b/src/Downloader.cpp index 32158fa46..94ce1ff48 100644 --- a/src/Downloader.cpp +++ b/src/Downloader.cpp @@ -23,7 +23,7 @@ Downloader::Downloader() : maxDownloads(0) } if (maxDownloads == 0) maxDownloads = 20; - workers.reset(new ThreadPool(maxDownloads)); + workers.setMaxThreads(maxDownloads); logger = SMLogging::get(); } @@ -64,7 +64,7 @@ int Downloader::download(const vector &keys, vector *errnos if (inserted[i]) { dl->listeners.push_back(&listener); - workers->addJob(dl); + workers.addJob(dl); } else { diff --git a/src/Downloader.h b/src/Downloader.h index 411ecab8f..0a1a790a0 100644 --- a/src/Downloader.h +++ b/src/Downloader.h @@ -70,7 +70,7 @@ class Downloader Downloads_t downloads; boost::mutex download_mutex; boost::mutex &getDownloadMutex(); - boost::scoped_ptr workers; + ThreadPool workers; CloudStorage *storage; SMLogging *logger; }; diff --git a/src/LocalStorage.cpp b/src/LocalStorage.cpp index 1b8377cd6..b6a4a9910 100644 --- a/src/LocalStorage.cpp +++ b/src/LocalStorage.cpp @@ -60,7 +60,7 @@ path operator+(const path &p1, const path &p2) int LocalStorage::getObject(const string &source, const string &dest, size_t *size) { - int ret = copy(prefix + source, dest); + int ret = copy(prefix / source, dest); if (ret) return ret; if (size) @@ -70,19 +70,25 @@ int LocalStorage::getObject(const string &source, const string &dest, size_t *si int LocalStorage::putObject(const string &source, const string &dest) { - return copy(source, prefix + dest); + return copy(source, prefix / dest); } int LocalStorage::copyObject(const string &source, const string &dest) { - return copy(prefix + source, prefix + dest); + return copy(prefix / source, prefix / dest); } void LocalStorage::deleteObject(const string &key) { boost::system::error_code err; - boost::filesystem::remove(prefix + key, err); + boost::filesystem::remove(prefix / key, err); +} + +int LocalStorage::exists(const std::string &key, bool *out) +{ + *out = boost::filesystem::exists(prefix / key); + return 0; } } diff --git a/src/LocalStorage.h b/src/LocalStorage.h index b129a06df..816cf15db 100644 --- a/src/LocalStorage.h +++ b/src/LocalStorage.h @@ -18,6 +18,7 @@ class LocalStorage : public CloudStorage int putObject(const std::string &sourceFile, const std::string &destKey); void deleteObject(const std::string &key); int copyObject(const std::string &sourceKey, const std::string &destKey); + int exists(const std::string &key, bool *out); const boost::filesystem::path & getPrefix() const; diff --git a/src/S3Storage.cpp b/src/S3Storage.cpp index 86fd2370a..2e83e9908 100644 --- a/src/S3Storage.cpp +++ b/src/S3Storage.cpp @@ -33,4 +33,9 @@ int S3Storage::copyObject(const string &sourceKey, const string &destKey) return 0; } +int S3Storage::exists(const string &key, bool *out) +{ + return 0; +} + } diff --git a/src/S3Storage.h b/src/S3Storage.h index e91b649cb..e3dc1c28a 100644 --- a/src/S3Storage.h +++ b/src/S3Storage.h @@ -18,6 +18,7 @@ class S3Storage : public CloudStorage int putObject(const std::string &sourceFile, const std::string &destKey); void deleteObject(const std::string &key); int copyObject(const std::string &sourceKey, const std::string &destKey); + int exists(const std::string &key, bool *out); private: diff --git a/src/Synchronizer.cpp b/src/Synchronizer.cpp index aa850cbd6..f8d283746 100644 --- a/src/Synchronizer.cpp +++ b/src/Synchronizer.cpp @@ -10,7 +10,6 @@ namespace boost::mutex inst_mutex; } - namespace storagemanager { @@ -25,16 +24,201 @@ Synchronizer * Synchronizer::get() return instance; } -Synchronizer::Synchronizer() +Synchronizer::Synchronizer() : maxUploads(0) { + Config *config = Config::get(); + logger = SMLogging::get(); + cache = Cache::get(); + replicator = Replicator::get(); + ioc = IOCoordinator::get(); + + string stmp = config->getValue("ObjectStorage", "max_concurrent_uploads") + try + { + maxUploads = stoul(stmp); + } + catch(invalid_argument) + { + logger->log(LOG_WARNING, "Downloader: Invalid arg for ObjectStorage/max_concurrent_uploads, using default of 20"); + } + if (maxUploads == 0) + maxUploads = 20; + threadPool.setMaxThreads(maxUploads); } Synchronizer::~Synchronizer() { + /* should this wait until all pending work is done, + or save the list it's working on..... + For milestone 2, this will do the safe thing and finish working first. + Later we can get fancy. */ +} + +enum OpFlags +{ + NOOP = 0, + JOURNAL = 0x1, + DELETE = 0x2, + NEW_OBJECT = 0x4, + IN_PROGRESS = 0x8, + IS_FLUSH = 0x10 +}; + +void Synchronizer::newJournalEntry(const string &key) +{ + boost::unique_lock s(mutex); + + auto it = pendingOps.find(key); + if (it != pendingOps.end()) + { + it->second->opFlags |= JOURNAL; + return; + } + workQueue.push_back(key); + pendingOps[key] = boost::shared_ptr(new PendingOps(JOURNAL, workQueue.end() - 1)); +} + +void Synchronizer::newObjects(const vector &keys) +{ + boost::unique_lock s(mutex); + + for (string &key : keys) + { + assert(pendingOps.find(key) == pendingOps.end()); + workQueue.push_back(key); + pendingOps[key] = boost::shared_ptr(new PendingOps(NEW_OBJECT, workQueue.end() - 1)); + } +} + +void Synchronizer::deletedObjects(const vector &keys) +{ + boost::unique_lock s(mutex); + + auto it = pendingOps.find(key); + if (it != pendingOps.end()) + { + it->second->opFlags |= DELETE; + return; + } + workQueue.push_back(key); + pendingOps[key] = boost::shared_ptr(new PendingOps(DELETE, workQueue.end() - 1)); + } void Synchronizer::flushObject(const string &key) { + /* move the work queue entry for key to the front of the queue / create if not exists + mark the pending ops as a flush + wait for the op to finish + */ + boost::unique_lock s(mutex); + + auto &it = pendingOps.find(key); + if (it != pendingOps.end()) + { + workQueue.splice(workQueue.begin(), workQueue, it->second.queueEntry); + it->second->opFlags |= IS_FLUSH; + it->second->wait(); + } + else + { + workQueue.push_front(key); + pendingOps[key] = boost::shared_ptr(new PendingOps(IS_FLUSH, workQueue.begin())); + pendingOps[key]->wait(); + } +} + +void Synchronizer::process(const string &key) +{ + boost::unique_lock s(mutex); + + auto it = pendingOps.find(key); + assert(it != pendingOps.end()); + boost::shared_ptr pending = it->second; + pendingOps.erase(it); + s.unlock(); + + if (pending->opFlags & DELETE) + synchronizeDelete(key); + else if (pending->opFlags & JOURNAL) + synchronizerWithJournal(key, pending->opFlags & IS_FLUSH); + else if (pending->opFlags & NEW_OBJECT) + synchronize(key, pending->opFlags & IS_FLUSH); + else + // complain + ; + + if (pending->opFlags & IS_FLUSH) + pending->notify(); +} + +struct ScopedReadLock +{ + ScopedReadLock(IOCoordinator *i, string &key) + { + ioc = i; + ioc->getReadLock(key); + } + ~ScopedReadLock() + { + ioc->releaseReadLock(key); + } + IOCoordinator *ioc; +}; + +struct ScopedWriteLock +{ + ScopedWriteLock(IOCoordinator *i, string &key) + { + ioc = i; + ioc->getWriteLock(key); + } + ~ScopedReadLock() + { + ioc->releaseWriteLock(key); + } + IOCoordinator *ioc; +}; + +void Synchronizer::synchronize(const std::string &key, bool isFlush) +{ + ScopedReadLock s(ioc, key); + + bool exists = false; + cs->exists(key, &exists); + if (!exists) + { + cs->putObject(cache->getCachePath() / key, key); + replicator->delete(key, Replicator::NO_LOCAL); + } + if (isFlush) + replicator->delete(key, Replicator::LOCAL_ONLY); +} + +void Synchronizer::synchronizeWithJournal(const string &key, bool isFlush) +{ + + + + + +/* The helper objects & fcns */ + +Synchronizer::PendingOps(int flags, list::iterator pos) : opFlags(flags), finished(false), queueEntry(pos) +{ +} + +Synchronizer::PendingOps::notify() +{ + boost::unique_lock s(mutex); + finished = true; + condvar.notify_all(); +} + +Synchronizer::PendingOps::wait() +{ + while (!finished) + condvar.wait(mutex); } } diff --git a/src/Synchronizer.h b/src/Synchronizer.h index bd354da49..15e27a2a4 100644 --- a/src/Synchronizer.h +++ b/src/Synchronizer.h @@ -3,8 +3,16 @@ #define SYNCHRONIZER_H_ #include +#include +#include #include +#include "SMLogging.h" +#include "Cache.h" +#include "Replicator.h" +#include "IOCoordinator.h" +#include "ThreadPool.h" + namespace storagemanager { @@ -14,10 +22,41 @@ class Synchronizer : public boost::noncopyable static Synchronizer *get(); virtual ~Synchronizer(); + void newJournalEntry(const std::string &key); + void newObjects(const std::vector &keys); + void deletedObjects(const std::vector &keys); void flushObject(const std::string &key); - + private: Synchronizer(); + + struct FlushListener + { + FlushListener(boost::mutex *m, boost::condvar *c); + boost::mutex *mutex; + boost::condition *condvar; + void flushed(); + } + + struct PendingOps + { + PendingOps(int flags, std::list::iterator pos); + int opFlags; + bool finished; + std::list::iterator queueEntry; + boost::condition condvar; + void wait(); + void notify(); + }; + + ThreadPool threadPool; + std::map > pendingOps; + std::list workQueue; + SMLogging *logger; + Cache *cache; + Replicator *replicator; + IOCoordinator *ioc; + boost::mutex mutex; }; } diff --git a/storagemanager.cnf b/storagemanager.cnf index d09f205c6..c77d3848b 100644 --- a/storagemanager.cnf +++ b/storagemanager.cnf @@ -3,6 +3,8 @@ service = S3 object_size = 5M metadata_path = ${HOME}/storagemanager/metadata max_concurrent_downloads = 20 +max_concurrent_uploads = 20 + [S3] region = us-west-1