diff --git a/src/Synchronizer.cpp b/src/Synchronizer.cpp index 7f721f6b1..76e9400dd 100644 --- a/src/Synchronizer.cpp +++ b/src/Synchronizer.cpp @@ -1,5 +1,6 @@ #include "Synchronizer.h" +#include "Metadatafile.h" #include using namespace std; @@ -61,8 +62,6 @@ enum OpFlags JOURNAL = 0x1, DELETE = 0x2, NEW_OBJECT = 0x4, - IN_PROGRESS = 0x8, - IS_FLUSH = 0x10 }; void Synchronizer::newJournalEntry(const string &key) @@ -115,29 +114,41 @@ void Synchronizer::flushObject(const string &key) void Synchronizer::makeJob(const string &key) { - boost::shared_ptr j(new Job(this, key)); + boost::shared_ptr s(new string(key)); + names.push_front(s); + + boost::shared_ptr j(new Job(this, names.begin())); threadPool.addJob(j); } -void Synchronizer::process(const string &key) +void Synchronizer::process(list::iterator &name) { + /* + check if there is a pendingOp for *it + if yes, start processing it + if no, + check if there is an ongoing op and block on it + if not, return + */ + boost::unique_lock s(mutex); - auto op = opsInProgress.find(key); - // it's already in progress - if (op != opsInProgress.end()) - { - op->second->wait(&mutex); - return; - } auto it = pendingOps.find(key); - // no work to be done on this key if (it == pendingOps.end()) - return; + { + auto op = opsInProgress.find(key); + // it's already in progress + if (op != opsInProgress.end()) + { + op->second->wait(&mutex); + return; + } + } boost::shared_ptr pending = it->second; opsInProgress[key] = *it; pendingOps.erase(it); + string sourceFile = Metadata::getSourceFilenameFromKey(*name); s.unlock(); bool success = false; @@ -145,11 +156,11 @@ void Synchronizer::process(const string &key) { try { if (pending->opFlags & DELETE) - synchronizeDelete(key); + synchronizeDelete(sourceFile, name); else if (pending->opFlags & JOURNAL) - synchronizerWithJournal(key); + synchronizerWithJournal(sourceFile, name); else if (pending->opFlags & NEW_OBJECT) - synchronize(key); + synchronize(sourceFile, name); else throw logic_error("Synchronizer::process(): got an unknown op flag"); pending->notify(&mutex); @@ -159,8 +170,14 @@ void Synchronizer::process(const string &key) logger->log(LOG_CRIT, "Synchronizer::process(): error sync'ing %s opFlags=%d, got '%s'. Requeueing it.", key.c_str(), pending->opFlags, e.what()); sleep(1); + // TODO: requeue the job instead of looping infinitely } } + + s.lock(); + opsInProgress.erase(key); + names.erase(name); + // TBD: On a network outage or S3 outage, it might not be a bad idea to keep retrying // until the end of time. This will (?) naturally make the system unusable until the blockage // is cleared, which is what we want, right? Is there a way to nicely tell the user what @@ -207,40 +224,58 @@ struct ScopedWriteLock bool locked; }; -void Synchronizer::synchronize(const string &key) +void Synchronizer::synchronize(const string &sourceFile, list::iterator &it) { - ScopedReadLock s(ioc, key); + ScopedReadLock s(ioc, sourceFile); + string &key = *it; char buf[80]; bool exists = false; int err; + err = cs->exists(key, &exists); if (err) - throw runtime_error(strerror_r(errno, buf, 80)); + throw runtime_error(string("synchronize(): checking existence of ") + key + ", got " + + strerror_r(errno, buf, 80)); + if (exists) + return; + + // can this run after a delete op? + exists = bf::exists(cache->getCachePath() / key); if (!exists) { - err = cs->putObject(cache->getCachePath() / key, key); - if (err) - throw runtime_error(strerror_r(errno, buf, 80)); - replicator->delete(key, Replicator::NO_LOCAL); + logger->log(LOG_WARNING, "synchronize(): was told to upload %s but it does not exist locally", key.c_str()); + return; } + + err = cs->putObject(cache->getCachePath() / key, key); + if (err) + throw runtime_error(string("synchronize(): uploading ") + key + ", got " + strerror_r(errno, buf, 80)); + replicator->delete(key, Replicator::NO_LOCAL); } -void Synchronizer::synchronizeDelete(const string &key) +void Synchronizer::synchronizeDelete(const string &sourceFile, list::iterator &it) { - /* Right now I think this is being told to delete key from cloud storage, - and that it has already been deleted everywhere locally. If that's not right, - we'll have to add some sync around this. */ - cs->delete(key); + ScopedWriteLock s(ioc, sourceFile); + cs->delete(*it); } -void Synchronizer::synchronizeWithJournal(const string &key) +void Synchronizer::synchronizeWithJournal(const string &sourceFile, list::iterator &lit) { // interface to Metadata TBD - //string sourceFilename = Metadata::getSourceFromKey(key); - ScopedWriteLock s(ioc, sourceFilename); + ScopedWriteLock s(ioc, sourceFile); + + string &key = *lit; bf::path oldCachePath = cache->getCachePath() / key; string journalName = oldCachePath.string() + ".journal"; + + if (!bf::exists(journalName)) + { + logger->(LOG_WARNING, "synchronizeWithJournal(): no journal file found for %s", key.c_str()); + // I don't think this should happen, maybe throw a logic_error here + return; + } + int err; boost::shared_array data; size_t count = 0, size = 0; @@ -262,7 +297,7 @@ void Synchronizer::synchronizeWithJournal(const string &key) assert(data); // get a new key for the resolved version & upload it - string newKey = ios->newKeyFromOldKey(key); + string newKey = ioc->newKeyFromOldKey(key); err = cs->putObject(data, size, newKey); if (err) throw runtime_error(string("Synchronizer: putObject() failed: ") + strerror_r(errno, buf, 80)); @@ -292,13 +327,7 @@ void Synchronizer::synchronizeWithJournal(const string &key) count += err; } - // might be wise to add things like getting a file size to a utility class. - // TBD how often we need to do it. - struct stat statbuf; - err = stat(oldCachePath.string().c_str(), &statbuf); - assert(!err); - cache->rename(key, newKey, size - statbuf.st_size); - rename(key, newKey); + cache->rename(key, newKey, size - bf::file_size(oldCachePath)); replicator->delete(key); } @@ -309,17 +338,13 @@ void Synchronizer::synchronizeWithJournal(const string &key) md.rename(key, newKey); replicator->updateMetadata(sourceFilename, md); */ - + rename(key, newKey); ioc->renameObject(oldkey, newkey); s.unlock(); - struct stat statbuf; - err = stat(journalName.string().c_str(), &statbuf); - assert(!err); - // delete the old object & journal file + cache->deletedJournal(bf::file_size(journalName); replicator->delete(journalName); - cache->deletedJournal(statbuf.st_size); cs->delete(key); } @@ -330,8 +355,12 @@ void Synchronizer::rename(const string &oldKey, const string &newKey) auto it = pendingOps.find(oldKey); if (it == pendingOps.end()) return; - pendingOps.erase(it); pendingOps[newKey] = it->second; + pendingOps.erase(it); + + for (auto &name: objNames) + if (*name == oldKey) + *name = newKey; } /* The helper objects & fcns */ diff --git a/src/Synchronizer.h b/src/Synchronizer.h index 4ec15f938..65d020170 100644 --- a/src/Synchronizer.h +++ b/src/Synchronizer.h @@ -51,15 +51,22 @@ class Synchronizer : public boost::noncopyable struct Job : public ThreadPool::Job { - Job(Synchronizer *s, const std::string &k) : sync(s), key(k) { } - void operator()() { sync->process(key); } + Job(Synchronizer *s, std::list::iterator &i) : sync(s), it(i) { } + void operator()() { sync->process(it); } Synchronizer *sync; - std::string key; + std::list::iterator it; }; ThreadPool threadPool; std::map > pendingOps; std::map > opsInProgress; + + // this is a bit of a kludge to handle objects being renamed. Jobs can be issued + // against name1, but when the job starts running, the target may be name2. + // some consolidation should be possible between this and the two maps above, tbd. + // in general the code got kludgier b/c of renaming, needs a cleanup pass. + std::list objNames; + SMLogging *logger; Cache *cache; Replicator *replicator;