diff --git a/src/Synchronizer.cpp b/src/Synchronizer.cpp index 8f59302b7..99021821b 100644 --- a/src/Synchronizer.cpp +++ b/src/Synchronizer.cpp @@ -150,8 +150,10 @@ void Synchronizer::flushObject(const string &key) { auto op = opsInProgress.find(key); // it's already in progress - if (op != opsInProgress.end()) - op->second->wait(&mutex); + if (op != opsInProgress.end()) { + boost::shared_ptr tmp = op->second; + tmp->wait(&mutex); + } else { // it's not in either one, trigger existence check @@ -237,7 +239,8 @@ void Synchronizer::process(list::iterator name) // it's already in progress if (op != opsInProgress.end()) { - op->second->wait(&mutex); + boost::shared_ptr tmp = op->second; + tmp->wait(&mutex); return; } else @@ -246,9 +249,12 @@ void Synchronizer::process(list::iterator name) } boost::shared_ptr pending = it->second; - opsInProgress[key] = pending; - pendingOps.erase(it); + bool inserted = opsInProgress.insert(*it).second; + if (!inserted) + return; // the one in pending will have to wait until the next time to avoid clobbering waiting threads + //opsInProgress[key] = it->second; string sourceFile = MetadataFile::getSourceFromKey(*name); + pendingOps.erase(it); s.unlock(); bool success = false; @@ -271,11 +277,12 @@ void Synchronizer::process(list::iterator name) synchronize(sourceFile, name); else throw logic_error("Synchronizer::process(): got an unknown op flag"); - pending->notify(&mutex); + s.lock(); + pending->notify(); success = true; } catch(exception &e) { - logger->log(LOG_CRIT, "Synchronizer::process(): error sync'ing %s opFlags=%d, got '%s'. Requeueing...", key.c_str(), + logger->log(LOG_CRIT, "Synchronizer::process(): error sync'ing %s opFlags=%d, got '%s'. Retrying...", key.c_str(), pending->opFlags, e.what()); success = false; sleep(1); @@ -295,8 +302,6 @@ void Synchronizer::process(list::iterator name) return; } } - - s.lock(); opsInProgress.erase(key); objNames.erase(name); @@ -503,13 +508,17 @@ bf::path Synchronizer::getCachePath() /* The helper objects & fcns */ -Synchronizer::PendingOps::PendingOps(int flags) : opFlags(flags), finished(false) +Synchronizer::PendingOps::PendingOps(int flags) : opFlags(flags), waiters(0), finished(false) { } -void Synchronizer::PendingOps::notify(boost::recursive_mutex *m) +Synchronizer::PendingOps::~PendingOps() +{ + assert(waiters == 0); +} + +void Synchronizer::PendingOps::notify() { - boost::unique_lock s(*m); finished = true; condvar.notify_all(); } @@ -517,7 +526,11 @@ void Synchronizer::PendingOps::notify(boost::recursive_mutex *m) void Synchronizer::PendingOps::wait(boost::recursive_mutex *m) { while (!finished) + { + waiters++; condvar.wait(*m); + waiters--; + } } } diff --git a/src/Synchronizer.h b/src/Synchronizer.h index 036d19512..2b8d0994a 100644 --- a/src/Synchronizer.h +++ b/src/Synchronizer.h @@ -51,11 +51,13 @@ class Synchronizer : public boost::noncopyable struct PendingOps { PendingOps(int flags); + ~PendingOps(); int opFlags; + int waiters; bool finished; boost::condition condvar; void wait(boost::recursive_mutex *); - void notify(boost::recursive_mutex *); + void notify(); }; struct Job : public ThreadPool::Job