diff --git a/src/Synchronizer.cpp b/src/Synchronizer.cpp index d40205d74..d4b038589 100644 --- a/src/Synchronizer.cpp +++ b/src/Synchronizer.cpp @@ -77,7 +77,7 @@ enum OpFlags void Synchronizer::newJournalEntry(const string &key) { - boost::unique_lock s(mutex); + boost::unique_lock s(mutex); auto it = pendingOps.find(key); if (it != pendingOps.end()) @@ -91,7 +91,7 @@ void Synchronizer::newJournalEntry(const string &key) void Synchronizer::newObjects(const vector &keys) { - boost::unique_lock s(mutex); + boost::unique_lock s(mutex); for (const string &key : keys) { @@ -103,7 +103,7 @@ void Synchronizer::newObjects(const vector &keys) void Synchronizer::deletedObjects(const vector &keys) { - boost::unique_lock s(mutex); + boost::unique_lock s(mutex); for (const string &key : keys) { @@ -120,7 +120,7 @@ void Synchronizer::deletedObjects(const vector &keys) void Synchronizer::flushObject(const string &key) { - boost::unique_lock s(mutex); + boost::unique_lock s(mutex); // if there is something to do on key, it should be in the objNames list // and either in pendingOps or opsInProgress. @@ -135,7 +135,7 @@ void Synchronizer::flushObject(const string &key) { if (*name == key) { - process(name, false); + process(name); break; } } @@ -156,10 +156,10 @@ void Synchronizer::flushObject(const string &key) return; // check whether this key is in cloud storage - bool exists; + bool keyExists, journalExists; int err; do { - err = cs->exists(key.c_str(), &exists); + err = cs->exists(key.c_str(), &keyExists); if (err) { char buf[80]; @@ -167,13 +167,23 @@ void Synchronizer::flushObject(const string &key) sleep(5); } } while (err); - if (!exists) + journalExists = bf::exists(journalPath/(key + ".journal")); + + if (journalExists) + { + logger->log(LOG_DEBUG, "Sync::flushObject(): %s has a journal, " + "and there is no job for it. Merging & uploading now.", key.c_str()); + pendingOps[key] = boost::shared_ptr(new PendingOps(JOURNAL)); + objNames.push_front(key); + process(objNames.begin()); + } + else if (!keyExists) { logger->log(LOG_DEBUG, "Sync::flushObject(): %s does not exist in cloud storage, " "and there is no job for it. Uploading it now.", key.c_str()); pendingOps[key] = boost::shared_ptr(new PendingOps(NEW_OBJECT)); objNames.push_front(key); - process(objNames.begin(), true); + process(objNames.begin()); } } @@ -185,7 +195,7 @@ void Synchronizer::makeJob(const string &key) threadPool.addJob(j); } -void Synchronizer::process(list::iterator name, bool callerHoldsLock) +void Synchronizer::process(list::iterator name) { /* check if there is a pendingOp for name @@ -195,12 +205,7 @@ void Synchronizer::process(list::iterator name, bool callerHoldsLock) if not, return */ - // had to use this 'callerHoldsLock' kludge to let flush() start processing a job w/o unlocking first - // and introducing a race. It means this fcn should not use its own scoped lock. It sucks, need to rework it. - boost::unique_lock s(mutex, boost::defer_lock); - - if (!callerHoldsLock) - s.lock(); + boost::unique_lock s(mutex); string &key = *name; auto it = pendingOps.find(key); @@ -222,11 +227,7 @@ void Synchronizer::process(list::iterator name, bool callerHoldsLock) opsInProgress[key] = pending; pendingOps.erase(it); string sourceFile = MetadataFile::getSourceFromKey(*name); - - if (!callerHoldsLock) - s.unlock(); - else - mutex.unlock(); + s.unlock(); bool success = false; while (!success) @@ -237,7 +238,7 @@ void Synchronizer::process(list::iterator name, bool callerHoldsLock) // In particular, it's possible that by the time synchronize() runs, // the file to sync has already been deleted. When one of these functions // encounters a state that doesn't make sense, such as being told to upload a file - // that doesn't exist, it will return silently under the assumption that + // that doesn't exist, it will return successfully under the assumption that // things are working as they should upstream, and a syncDelete() call will be coming // shortly. if (pending->opFlags & DELETE) @@ -252,18 +253,20 @@ void Synchronizer::process(list::iterator name, bool callerHoldsLock) success = true; } catch(exception &e) { - logger->log(LOG_CRIT, "Synchronizer::process(): error sync'ing %s opFlags=%d, got '%s'. Retrying...", key.c_str(), + logger->log(LOG_CRIT, "Synchronizer::process(): error sync'ing %s opFlags=%d, got '%s'. Requeueing...", key.c_str(), pending->opFlags, e.what()); success = false; sleep(1); - // TODO: requeue the job instead of looping infinitely + s.lock(); + pendingOps[key] = pending; + opsInProgress.erase(key); + makeJob(key); + objNames.erase(name); + return; } } - if (!callerHoldsLock) - s.lock(); - else - mutex.lock(); + s.lock(); opsInProgress.erase(key); objNames.erase(name); @@ -429,7 +432,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list void Synchronizer::rename(const string &oldKey, const string &newKey) { - boost::unique_lock s(mutex); + boost::unique_lock s(mutex); auto it = pendingOps.find(oldKey); if (it == pendingOps.end()) @@ -458,14 +461,14 @@ Synchronizer::PendingOps::PendingOps(int flags) : opFlags(flags), finished(false { } -void Synchronizer::PendingOps::notify(boost::mutex *m) +void Synchronizer::PendingOps::notify(boost::recursive_mutex *m) { - boost::unique_lock s(*m); + boost::unique_lock s(*m); finished = true; condvar.notify_all(); } -void Synchronizer::PendingOps::wait(boost::mutex *m) +void Synchronizer::PendingOps::wait(boost::recursive_mutex *m) { while (!finished) condvar.wait(*m); diff --git a/src/Synchronizer.h b/src/Synchronizer.h index b297ae427..923840e03 100644 --- a/src/Synchronizer.h +++ b/src/Synchronizer.h @@ -39,7 +39,7 @@ class Synchronizer : public boost::noncopyable private: Synchronizer(); - void process(std::list::iterator key, bool callerHoldsLock=false); + void process(std::list::iterator key); void synchronize(const std::string &sourceFile, std::list::iterator &it); void synchronizeDelete(const std::string &sourceFile, std::list::iterator &it); void synchronizeWithJournal(const std::string &sourceFile, std::list::iterator &it); @@ -53,8 +53,8 @@ class Synchronizer : public boost::noncopyable int opFlags; bool finished; boost::condition condvar; - void wait(boost::mutex *); - void notify(boost::mutex *); + void wait(boost::recursive_mutex *); + void notify(boost::recursive_mutex *); }; struct Job : public ThreadPool::Job @@ -84,7 +84,7 @@ class Synchronizer : public boost::noncopyable boost::filesystem::path cachePath; boost::filesystem::path journalPath; - boost::mutex mutex; + boost::recursive_mutex mutex; }; }