From 62c853595f5d6023e897adc68034042a7d97e7ff Mon Sep 17 00:00:00 2001 From: Patrick LeBlanc Date: Fri, 5 Jul 2019 11:02:55 -0500 Subject: [PATCH] Changed the sync trigger to include outstanding journal size. Right now it's hardcoded at 50MB for testing. Will parameterize later. --- src/IOCoordinator.cpp | 15 ++++++++------- src/Synchronizer.cpp | 28 +++++++++++++++++++++------- src/Synchronizer.h | 9 +++++---- src/unit_tests.cpp | 4 ++-- 4 files changed, 36 insertions(+), 20 deletions(-) diff --git a/src/IOCoordinator.cpp b/src/IOCoordinator.cpp index 78b290d83..62cbcb50d 100755 --- a/src/IOCoordinator.cpp +++ b/src/IOCoordinator.cpp @@ -330,7 +330,7 @@ int IOCoordinator::_write(const char *filename, const uint8_t *data, off_t offse cache->newJournalEntry(writeLength+JOURNAL_ENTRY_HEADER_SIZE); - synchronizer->newJournalEntry(i->key); + synchronizer->newJournalEntry(i->key, writeLength+JOURNAL_ENTRY_HEADER_SIZE); count += writeLength; dataRemaining -= writeLength; } @@ -443,7 +443,7 @@ int IOCoordinator::append(const char *_filename, const uint8_t *data, size_t len cache->newJournalEntry(writeLength+JOURNAL_ENTRY_HEADER_SIZE); - synchronizer->newJournalEntry(i->key); + synchronizer->newJournalEntry(i->key, writeLength+JOURNAL_ENTRY_HEADER_SIZE); count += writeLength; dataRemaining -= writeLength; } @@ -798,7 +798,7 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2) return -1; } - vector newJournalEntries; + vector > newJournalEntries; ScopedReadLock lock(this, filename1); ScopedWriteLock lock2(this, filename2); MetadataFile meta1(metaFile1); @@ -853,8 +853,9 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2) try { bf::copy_file(journalFile, newJournalFile); - cache->newJournalEntry(bf::file_size(newJournalFile)); - newJournalEntries.push_back(newObj.key); + size_t tmp = bf::file_size(newJournalFile); + cache->newJournalEntry(tmp); + newJournalEntries.push_back(pair(newObj.key, tmp)); } catch (bf::filesystem_error &e) { @@ -872,7 +873,7 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2) cs->deleteObject(newObject.key); for (auto &jEntry : newJournalEntries) { - bf::path fullJournalPath = journalPath/(jEntry + ".journal"); + bf::path fullJournalPath = journalPath/(jEntry.first + ".journal"); cache->deletedJournal(bf::file_size(fullJournalPath)); bf::remove(fullJournalPath); } @@ -884,7 +885,7 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2) lock2.unlock(); for (auto &jEntry : newJournalEntries) - sync->newJournalEntry(jEntry); + sync->newJournalEntry(jEntry.first, jEntry.second); return 0; } diff --git a/src/Synchronizer.cpp b/src/Synchronizer.cpp index 72472d30d..2d4ac8dec 100755 --- a/src/Synchronizer.cpp +++ b/src/Synchronizer.cpp @@ -58,6 +58,7 @@ Synchronizer::Synchronizer() : maxUploads(0) cachePath = cache->getCachePath(); threadPool.setMaxThreads(maxUploads); die = false; + uncommittedJournalSize = 0; syncThread = boost::thread([this] () { this->periodicSync(); }); } @@ -82,8 +83,9 @@ enum OpFlags NEW_OBJECT = 0x4, }; -void Synchronizer::_newJournalEntry(const string &key) +void Synchronizer::_newJournalEntry(const string &key, size_t size) { + uncommittedJournalSize += size; auto it = pendingOps.find(key); if (it != pendingOps.end()) { @@ -94,17 +96,29 @@ void Synchronizer::_newJournalEntry(const string &key) pendingOps[key] = boost::shared_ptr(new PendingOps(JOURNAL)); } -void Synchronizer::newJournalEntry(const string &key) +void Synchronizer::newJournalEntry(const string &key, size_t size) { boost::unique_lock s(mutex); - _newJournalEntry(key); + _newJournalEntry(key, size); + if (uncommittedJournalSize > 50000000) + { + uncommittedJournalSize = 0; + s.unlock(); + forceFlush(); + } } -void Synchronizer::newJournalEntries(const vector &keys) +void Synchronizer::newJournalEntries(const vector > &keys) { boost::unique_lock s(mutex); - for (const string &key : keys) - _newJournalEntry(key); + for (auto &keysize : keys) + _newJournalEntry(keysize.first, keysize.second); + if (uncommittedJournalSize > 50000000) + { + uncommittedJournalSize = 0; + s.unlock(); + forceFlush(); + } } void Synchronizer::newObjects(const vector &keys) @@ -228,6 +242,7 @@ void Synchronizer::periodicSync() // threadPool.currentQueueSize() << endl; for (auto &job : pendingOps) makeJob(job.first); + uncommittedJournalSize = 0; } } @@ -235,7 +250,6 @@ void Synchronizer::forceFlush() { boost::unique_lock lock(mutex); syncThread.interrupt(); - lock.unlock(); } void Synchronizer::makeJob(const string &key) diff --git a/src/Synchronizer.h b/src/Synchronizer.h index 484fdfeba..7d43a7653 100755 --- a/src/Synchronizer.h +++ b/src/Synchronizer.h @@ -29,8 +29,8 @@ class Synchronizer : public boost::noncopyable // these take keys as parameters, not full path names, ex, pass in '12345' not // 'cache/12345'. - void newJournalEntry(const std::string &key); - void newJournalEntries(const std::vector &keys); + void newJournalEntry(const std::string &key, size_t len); + void newJournalEntries(const std::vector > &keys); void newObjects(const std::vector &keys); void deletedObjects(const std::vector &keys); void flushObject(const std::string &key); @@ -42,7 +42,7 @@ class Synchronizer : public boost::noncopyable private: Synchronizer(); - void _newJournalEntry(const std::string &key); + void _newJournalEntry(const std::string &key, size_t len); void process(std::list::iterator key); void synchronize(const std::string &sourceFile, std::list::iterator &it); void synchronizeDelete(const std::string &sourceFile, std::list::iterator &it); @@ -85,8 +85,9 @@ class Synchronizer : public boost::noncopyable // this thread will start jobs for entries in pendingOps every 10 seconds bool die; boost::thread syncThread; - const boost::chrono::seconds syncInterval = boost::chrono::seconds(1); + const boost::chrono::seconds syncInterval = boost::chrono::seconds(10); void periodicSync(); + size_t uncommittedJournalSize; SMLogging *logger; Cache *cache; diff --git a/src/unit_tests.cpp b/src/unit_tests.cpp index 3ceac0d40..b342dbb49 100755 --- a/src/unit_tests.cpp +++ b/src/unit_tests.cpp @@ -1041,7 +1041,7 @@ bool syncTest1() assert(!err); assert(exists); - sync->newJournalEntry(key); + sync->newJournalEntry(key, 0); sync->forceFlush(); sleep(1); // let it do what it does @@ -1080,7 +1080,7 @@ bool syncTest1() // make the journal again, call sync->newJournalObject() makeTestJournal((journalPath / (newKey + ".journal")).string().c_str()); cache->newJournalEntry(bf::file_size(journalPath / (newKey + ".journal"))); - sync->newJournalEntry(newKey); + sync->newJournalEntry(newKey, 0); sync->forceFlush(); sleep(1);