Review revision of the Cache / Synchronizer thread-pool patch.

Fixes folded into this revision:
  * Cache::makeSpace: dereference toRemove before lru.erase() invalidates it.
  * newObjects / deletedObjects: iterate the const key vector by const reference.
  * deletedObjects: 'continue' instead of 'return' when a key is already pending,
    so the remaining keys in the batch are still handled.
  * process: store the shared_ptr (not the map pair) in opsInProgress, erase the
    entry when the op finishes, let waiters hold their own reference, and call
    synchronizeWithJournal (was misspelled synchronizerWithJournal).
  * synchronizeWithJournal: renameObject() now uses key / newKey, the names the
    rest of the function uses.
  * rename: copy the pending entry to the new key before erasing the old one.
  * PendingOps: fully qualified ctor/dtor definitions, 'void' on notify()/wait(),
    destructor declared in the header.
  * Synchronizer.h: synchronize()/synchronizeWithJournal() declarations now match
    the single-argument definitions.

Review notes (not changed here):
  * Line breaks, indentation and blank lines were reconstructed from a flattened
    copy of the patch; verify context against the tree before applying.
  * Template argument lists (vector, shared_ptr, unique_lock, map, list) are
    missing from that copy and are left exactly as found.
  * 'delete' is a C++ keyword and cannot name a member function
    (replicator->delete, cs->delete); the classes that declare it are outside
    this patch, so the calls are left as written.
  * process() retries forever, including on the "unknown op flag" logic_error,
    which can never succeed on retry.

diff --git a/src/Cache.cpp b/src/Cache.cpp
index c782ad5c9..41ff14493 100644
--- a/src/Cache.cpp
+++ b/src/Cache.cpp
@@ -303,8 +303,7 @@ void Cache::makeSpace(size_t size)
         currentCacheSize -= statbuf.st_size;
         thisMuch -= statbuf.st_size;
         sync->flushObject(*it);
-        // Deleting the files will be done through Synchronizer->Replicator
-        //bf::remove(cachedFile);
+        replicator->delete(cachedFile, Replicator::LOCAL_ONLY);
         LRU_t::iterator toRemove = it++;
-        lru.erase(toRemove);
-        m_lru.erase(*toRemove);
+        m_lru.erase(*toRemove);   // dereference toRemove before erase() invalidates it
+        lru.erase(toRemove);
diff --git a/src/Synchronizer.cpp b/src/Synchronizer.cpp
index 944d46de1..7f721f6b1 100644
--- a/src/Synchronizer.cpp
+++ b/src/Synchronizer.cpp
@@ -75,8 +75,8 @@ void Synchronizer::newJournalEntry(const string &key)
         it->second->opFlags |= JOURNAL;
         return;
     }
-    workQueue.push_back(key);
-    pendingOps[key] = boost::shared_ptr(new PendingOps(JOURNAL, workQueue.end() - 1));
+    makeJob(key);
+    pendingOps[key] = boost::shared_ptr(new PendingOps(JOURNAL));
 }

 void Synchronizer::newObjects(const vector &keys)
@@ -86,8 +86,8 @@ void Synchronizer::newObjects(const vector &keys)
-    for (string &key : keys)
+    for (const string &key : keys)
     {
         assert(pendingOps.find(key) == pendingOps.end());
-        workQueue.push_back(key);
-        pendingOps[key] = boost::shared_ptr(new PendingOps(NEW_OBJECT, workQueue.end() - 1));
+        makeJob(key);
+        pendingOps[key] = boost::shared_ptr(new PendingOps(NEW_OBJECT));
     }
 }

@@ -95,70 +95,82 @@ void Synchronizer::deletedObjects(const vector &keys)
 {
     boost::unique_lock s(mutex);

-    auto it = pendingOps.find(key);
-    if (it != pendingOps.end())
+    for (const string &key : keys)
     {
-        it->second->opFlags |= DELETE;
-        return;
+        auto it = pendingOps.find(key);
+        if (it != pendingOps.end())
+        {
+            it->second->opFlags |= DELETE;
+            continue;   // already queued; the remaining keys still need handling
+        }
+        makeJob(key);
+        pendingOps[key] = boost::shared_ptr(new PendingOps(DELETE));
     }
-    workQueue.push_back(key);
-    pendingOps[key] = boost::shared_ptr(new PendingOps(DELETE, workQueue.end() - 1));
-
 }

 void Synchronizer::flushObject(const string &key)
 {
-    /* move the work queue entry for key to the front of the queue / create if not exists
-       mark the pending ops as a flush
-       wait for the op to finish
-    */
-    boost::unique_lock s(mutex);
-
-    auto &it = pendingOps.find(key);
-    if (it != pendingOps.end())
-    {
-        workQueue.splice(workQueue.begin(), workQueue, it->second.queueEntry);
-        it->second->opFlags |= IS_FLUSH;
-        it->second->wait();
-    }
-    else
-    {
-        workQueue.push_front(key);
-        pendingOps[key] = boost::shared_ptr(new PendingOps(IS_FLUSH, workQueue.begin()));
-        pendingOps[key]->wait();
-    }
+    process(key);
+}
+
+void Synchronizer::makeJob(const string &key)
+{
+    boost::shared_ptr j(new Job(this, key));
+    threadPool.addJob(j);
 }

 void Synchronizer::process(const string &key)
 {
     boost::unique_lock s(mutex);

+    auto op = opsInProgress.find(key);
+    // it's already in progress
+    if (op != opsInProgress.end())
+    {
+        // hold our own reference: the map entry is erased once the op finishes
+        auto inProgress = op->second;
+        inProgress->wait(&mutex);
+        return;
+    }
     auto it = pendingOps.find(key);
-    assert(it != pendingOps.end());
+    // no work to be done on this key
+    if (it == pendingOps.end())
+        return;
+
     boost::shared_ptr pending = it->second;
+    opsInProgress[key] = pending;
     pendingOps.erase(it);
     s.unlock();

-    try {
-        if (pending->opFlags & DELETE)
-            synchronizeDelete(key);
-        else if (pending->opFlags & JOURNAL)
-            synchronizerWithJournal(key, pending->opFlags & IS_FLUSH);
-        else if (pending->opFlags & NEW_OBJECT)
-            synchronize(key, pending->opFlags & IS_FLUSH);
-        else
-            throw logic_error("Synchronizer::process(): got an unknown op flag");
+    bool success = false;
+    while (!success)
+    {
+        try {
+            if (pending->opFlags & DELETE)
+                synchronizeDelete(key);
+            else if (pending->opFlags & JOURNAL)
+                synchronizeWithJournal(key);
+            else if (pending->opFlags & NEW_OBJECT)
+                synchronize(key);
+            else
+                throw logic_error("Synchronizer::process(): got an unknown op flag");
+            pending->notify(&mutex);
+            success = true;
+        }
+        catch(exception &e) {
+            logger->log(LOG_CRIT, "Synchronizer::process(): error sync'ing %s opFlags=%d, got '%s'. Requeueing it.", key.c_str(),
+                pending->opFlags, e.what());
+            sleep(1);
+        }
     }
-    catch(exception &e) {
-        logger->log(LOG_CRIT, "Synchronizer::process(): error sync'ing %s opFlags=%d, got '%s'. Requeueing it.", key.c_str(),
-            pending->opFlags, e.what());
-        s.lock();
-        workQueue.push_back(key);
-        pendingOps[key] = pending;
-        return;
-    }
-    if (pending->opFlags & IS_FLUSH)
-        pending->notify();
+    // TBD: On a network outage or S3 outage, it might not be a bad idea to keep retrying
+    // until the end of time. This will (?) naturally make the system unusable until the blockage
+    // is cleared, which is what we want, right? Is there a way to nicely tell the user what
+    // is happening, or are the logs good enough?
+
+    // the op is finished; drop the in-progress entry so later ops on this key are not skipped
+    s.lock();
+    opsInProgress.erase(key);
 }

 struct ScopedReadLock
@@ -201,7 +213,7 @@ struct ScopedWriteLock
     bool locked;
 };

-void Synchronizer::synchronize(const string &key, bool isFlush)
+void Synchronizer::synchronize(const string &key)
 {
     ScopedReadLock s(ioc, key);

@@ -218,18 +230,17 @@ void Synchronizer::synchronize(const string &key, bool isFlush)
             throw runtime_error(strerror_r(errno, buf, 80));
         replicator->delete(key, Replicator::NO_LOCAL);
     }
-    if (isFlush)
-        replicator->delete(key, Replicator::LOCAL_ONLY);
 }

 void Synchronizer::synchronizeDelete(const string &key)
 {
     /* Right now I think this is being told to delete key from cloud storage,
-       and that it has already been deleted everywhere locally. */
+       and that it has already been deleted everywhere locally. If that's not right,
+       we'll have to add some sync around this. */
     cs->delete(key);
 }

-void Synchronizer::synchronizeWithJournal(const string &key, bool isFlush)
+void Synchronizer::synchronizeWithJournal(const string &key)
 {
     // interface to Metadata TBD
     //string sourceFilename = Metadata::getSourceFromKey(key);
@@ -262,12 +273,12 @@ void Synchronizer::synchronizeWithJournal(const string &key, bool isFlush)
     if (err)
         throw runtime_error(string("Synchronizer: putObject() failed: ") + strerror_r(errno, buf, 80));

-    // if this isn't a flush operation..
+    // if the object was cached...
     // write the new data to disk,
     // tell the cache about the rename
     // rename the file in any pending ops in Synchronizer

-    if (!isFlush && oldObjIsCached)
+    if (oldObjIsCached)
     {
         // Is this the only thing outside of Replicator that writes files?
         // If so move this write loop to Replicator.
@@ -287,19 +298,14 @@ void Synchronizer::synchronizeWithJournal(const string &key, bool isFlush)
             count += err;
         }

-        // the total size difference is the new obj size - (old obj size + journal size)
         // might be wise to add things like getting a file size to a utility class.
         // TBD how often we need to do it.
         struct stat statbuf;
         err = stat(oldCachePath.string().c_str(), &statbuf);
         assert(!err);
-        size_t oldObjSize = statbuf.st_size;
-        err = stat((oldCachePath.string() + ".journal").c_str(), &statbuf);
-        assert(!err);
-        size_t journalSize = statbuf.st_size;
-
-        cache->rename(key, newKey, size - oldObjSize - journalSize);
+        cache->rename(key, newKey, size - statbuf.st_size);
         rename(key, newKey);
+        replicator->delete(key);
     }

     // update the metadata for the source file
@@ -310,34 +316,51 @@ void Synchronizer::synchronizeWithJournal(const string &key, bool isFlush)
     replicator->updateMetadata(sourceFilename, md);
     */

+    ioc->renameObject(key, newKey);
     s.unlock();

+    struct stat statbuf;
+    err = stat(journalName.string().c_str(), &statbuf);
+    assert(!err);
+
     // delete the old object & journal file
-    vector files;
-    files.push_back(key);
-    files.push_back(journalName);
-    replicator->delete(files);
+    replicator->delete(journalName);
+    cache->deletedJournal(statbuf.st_size);
     cs->delete(key);
 }
+void Synchronizer::rename(const string &oldKey, const string &newKey)
+{
+    boost::unique_lock s(mutex);
+
+    auto it = pendingOps.find(oldKey);
+    if (it == pendingOps.end())
+        return;
+    pendingOps[newKey] = it->second;   // copy the entry before erase() invalidates 'it'
+    pendingOps.erase(it);
+}


 /* The helper objects & fcns */

-Synchronizer::PendingOps(int flags, list::iterator pos) : opFlags(flags), finished(false), queueEntry(pos)
+Synchronizer::PendingOps::PendingOps(int flags) : opFlags(flags), finished(false)
 {
 }

-Synchronizer::PendingOps::notify()
+Synchronizer::PendingOps::~PendingOps()
 {
-    boost::unique_lock s(mutex);
+}
+
+void Synchronizer::PendingOps::notify(boost::mutex *m)
+{
+    boost::unique_lock s(*m);
     finished = true;
     condvar.notify_all();
 }

-Synchronizer::PendingOps::wait()
+void Synchronizer::PendingOps::wait(boost::mutex *m)
 {
     while (!finished)
-        condvar.wait(mutex);
+        condvar.wait(*m);
 }

 }
diff --git a/src/Synchronizer.h b/src/Synchronizer.h
index d9182b26b..4ec15f938 100644
--- a/src/Synchronizer.h
+++ b/src/Synchronizer.h
@@ -35,29 +35,32 @@ class Synchronizer : public boost::noncopyable
-        void synchronize(const std::string &key, bool isFlush);
+        void synchronize(const std::string &key);
         void synchronizeDelete(const std::string &key);
-        void synchronizeWithJournal(const std::string &key, bool isFlush);
+        void synchronizeWithJournal(const std::string &key);
+        void rename(const std::string &oldkey, const std::string &newkey);
+        void makeJob(const std::string &key);

-        struct FlushListener
-        {
-            FlushListener(boost::mutex *m, boost::condvar *c);
-            boost::mutex *mutex;
-            boost::condition *condvar;
-            void flushed();
-        }
-
+        // this struct kind of got sloppy. Need to clean it at some point.
         struct PendingOps
         {
-            PendingOps(int flags, std::list::iterator pos);
+            PendingOps(int flags);
+            ~PendingOps();
             int opFlags;
             bool finished;
-            std::list::iterator queueEntry;
             boost::condition condvar;
-            void wait();
-            void notify();
+            void wait(boost::mutex *);
+            void notify(boost::mutex *);
+        };
+
+        struct Job : public ThreadPool::Job
+        {
+            Job(Synchronizer *s, const std::string &k) : sync(s), key(k) { }
+            void operator()() { sync->process(key); }
+            Synchronizer *sync;
+            std::string key;
         };

         ThreadPool threadPool;
         std::map > pendingOps;
-        std::list workQueue;
+        std::map > opsInProgress;
         SMLogging *logger;
         Cache *cache;
         Replicator *replicator;