From 1ee3c760b1129d048629a050f22032d851b5c516 Mon Sep 17 00:00:00 2001 From: Patrick LeBlanc Date: Thu, 25 Apr 2019 12:07:30 -0500 Subject: [PATCH] Changed Sync s.t. it starts processing jobs every 10 seconds instead of every time there's an event. Big improvement in efficiency. --- src/Synchronizer.cpp | 33 +++++++++++++++++++++++++-------- src/Synchronizer.h | 8 ++++++++ 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/src/Synchronizer.cpp b/src/Synchronizer.cpp index 0c69c9033..41615a12b 100644 --- a/src/Synchronizer.cpp +++ b/src/Synchronizer.cpp @@ -57,6 +57,8 @@ Synchronizer::Synchronizer() : maxUploads(0) journalPath = cache->getJournalPath(); cachePath = cache->getCachePath(); threadPool.setMaxThreads(maxUploads); + die = false; + syncThread = boost::thread([this] () { this->periodicSync(); }); } Synchronizer::~Synchronizer() @@ -65,6 +67,11 @@ Synchronizer::~Synchronizer() or save the list it's working on..... For milestone 2, this will do the safe thing and finish working first. Later we can get fancy. */ + boost::unique_lock lock(mutex); + die = true; + syncThread.interrupt(); + lock.unlock(); + syncThread.join(); } enum OpFlags @@ -85,7 +92,7 @@ void Synchronizer::newJournalEntry(const string &key) it->second->opFlags |= JOURNAL; return; } - makeJob(key); + //makeJob(key); pendingOps[key] = boost::shared_ptr(new PendingOps(JOURNAL)); } @@ -96,7 +103,7 @@ void Synchronizer::newObjects(const vector &keys) for (const string &key : keys) { assert(pendingOps.find(key) == pendingOps.end()); - makeJob(key); + //makeJob(key); pendingOps[key] = boost::shared_ptr(new PendingOps(NEW_OBJECT)); } } @@ -113,7 +120,7 @@ void Synchronizer::deletedObjects(const vector &keys) it->second->opFlags |= DELETE; return; } - makeJob(key); + //makeJob(key); pendingOps[key] = boost::shared_ptr(new PendingOps(DELETE)); } } @@ -187,6 +194,21 @@ void Synchronizer::flushObject(const string &key) } } +void Synchronizer::periodicSync() +{ + boost::unique_lock lock(mutex); + while (!die) + { + lock.unlock(); + boost::this_thread::sleep_for(syncInterval); + lock.lock(); + //cout << "Sync'ing " << pendingOps.size() << " objects" << " queue size is " << + // threadPool.currentQueueSize() << endl; + for (auto &job : pendingOps) + makeJob(job.first); + } +} + void Synchronizer::makeJob(const string &key) { objNames.push_front(key); @@ -280,8 +302,6 @@ void Synchronizer::process(list::iterator name) objNames.erase(name); } - - void Synchronizer::synchronize(const string &sourceFile, list::iterator &it) { ScopedReadLock s(ioc, sourceFile); @@ -313,9 +333,6 @@ void Synchronizer::synchronize(const string &sourceFile, list::iterator void Synchronizer::synchronizeDelete(const string &sourceFile, list::iterator &it) { - /* Don't think it's necessary to lock here. Sync is being told to delete a file on cloud storage. - Presumably the caller has removed it from metadata & the cache, so it is no longer referencable. - */ ScopedWriteLock s(ioc, sourceFile); cs->deleteObject(*it); } diff --git a/src/Synchronizer.h b/src/Synchronizer.h index 923840e03..036d19512 100644 --- a/src/Synchronizer.h +++ b/src/Synchronizer.h @@ -7,6 +7,7 @@ #include #include #include +#include #include "SMLogging.h" #include "Replicator.h" @@ -76,6 +77,13 @@ class Synchronizer : public boost::noncopyable // in general the code got kludgier b/c of renaming, needs a cleanup pass. std::list objNames; + // this thread will start jobs for entries in pendingOps every 10 seconds + bool die; + boost::thread syncThread; + const boost::chrono::seconds syncInterval = boost::chrono::seconds(10); + void periodicSync(); + + SMLogging *logger; Cache *cache; Replicator *replicator;