mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-15 12:09:09 +03:00

Changed the sync trigger to include outstanding journal size.

Right now it's hardcoded at 50MB for testing.  Will parameterize later.
Author: Patrick LeBlanc
Date:   2019-07-05 11:02:55 -05:00
Parent: aa65090a61
Commit: 62c853595f

4 changed files with 36 additions and 20 deletions
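
In sketch form, the trigger this commit adds works like this: every new journal entry adds its size to a running uncommittedJournalSize counter, and once the counter passes the threshold (hardcoded to 50000000 bytes here) the counter is reset and a flush is forced instead of waiting for the next periodic sync. The standalone example below only illustrates that pattern; it uses std::mutex and a caller-supplied flush callback in place of the real Synchronizer internals, and the JournalSizeTrigger class and its names are hypothetical.

// Simplified, standalone sketch of the size-based sync trigger added in this
// commit. JournalSizeTrigger is a hypothetical stand-in for Synchronizer.
#include <cstddef>
#include <functional>
#include <iostream>
#include <mutex>
#include <string>

class JournalSizeTrigger
{
public:
    // flushFn stands in for Synchronizer::forceFlush(); the default threshold
    // matches the hardcoded 50MB limit from the commit message.
    JournalSizeTrigger(std::function<void()> flushFn, size_t threshold = 50000000)
        : flush(flushFn), threshold(threshold), uncommittedJournalSize(0) {}

    // Mirrors the shape of Synchronizer::newJournalEntry(key, size): account
    // for the new journal bytes, and force a flush once the outstanding
    // (uncommitted) journal data exceeds the threshold.
    void newJournalEntry(const std::string &key, size_t size)
    {
        (void) key;  // the real code also tracks the key in pendingOps
        std::unique_lock<std::mutex> lock(mutex);
        uncommittedJournalSize += size;
        if (uncommittedJournalSize > threshold)
        {
            uncommittedJournalSize = 0;
            lock.unlock();  // release before flushing, as the diff does
            flush();
        }
    }

private:
    std::function<void()> flush;
    size_t threshold;
    size_t uncommittedJournalSize;
    std::mutex mutex;
};

int main()
{
    JournalSizeTrigger trigger([]() { std::cout << "flush forced" << std::endl; });

    // 60 one-megabyte journal entries: the flush fires once, just past 50MB.
    for (int i = 0; i < 60; i++)
        trigger.newJournalEntry("some-object-key", 1000000);
    return 0;
}

Unlocking before flushing mirrors the Synchronizer changes below, where forceFlush() acquires the same non-recursive boost::mutex and would deadlock if called with the lock still held.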


@@ -330,7 +330,7 @@ int IOCoordinator::_write(const char *filename, const uint8_t *data, off_t offset
         cache->newJournalEntry(writeLength+JOURNAL_ENTRY_HEADER_SIZE);
-        synchronizer->newJournalEntry(i->key);
+        synchronizer->newJournalEntry(i->key, writeLength+JOURNAL_ENTRY_HEADER_SIZE);
         count += writeLength;
         dataRemaining -= writeLength;
     }
@@ -443,7 +443,7 @@ int IOCoordinator::append(const char *_filename, const uint8_t *data, size_t len
         cache->newJournalEntry(writeLength+JOURNAL_ENTRY_HEADER_SIZE);
-        synchronizer->newJournalEntry(i->key);
+        synchronizer->newJournalEntry(i->key, writeLength+JOURNAL_ENTRY_HEADER_SIZE);
         count += writeLength;
         dataRemaining -= writeLength;
     }
@@ -798,7 +798,7 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2)
         return -1;
     }
-    vector<string> newJournalEntries;
+    vector<pair<string, size_t> > newJournalEntries;
     ScopedReadLock lock(this, filename1);
     ScopedWriteLock lock2(this, filename2);
     MetadataFile meta1(metaFile1);
@@ -853,8 +853,9 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2)
         try
         {
             bf::copy_file(journalFile, newJournalFile);
-            cache->newJournalEntry(bf::file_size(newJournalFile));
-            newJournalEntries.push_back(newObj.key);
+            size_t tmp = bf::file_size(newJournalFile);
+            cache->newJournalEntry(tmp);
+            newJournalEntries.push_back(pair<string, size_t>(newObj.key, tmp));
         }
         catch (bf::filesystem_error &e)
         {
@@ -872,7 +873,7 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2)
         cs->deleteObject(newObject.key);
         for (auto &jEntry : newJournalEntries)
         {
-            bf::path fullJournalPath = journalPath/(jEntry + ".journal");
+            bf::path fullJournalPath = journalPath/(jEntry.first + ".journal");
             cache->deletedJournal(bf::file_size(fullJournalPath));
             bf::remove(fullJournalPath);
         }
@@ -884,7 +885,7 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2)
     lock2.unlock();
     for (auto &jEntry : newJournalEntries)
-        sync->newJournalEntry(jEntry);
+        sync->newJournalEntry(jEntry.first, jEntry.second);
     return 0;
 }


@@ -58,6 +58,7 @@ Synchronizer::Synchronizer() : maxUploads(0)
     cachePath = cache->getCachePath();
     threadPool.setMaxThreads(maxUploads);
     die = false;
+    uncommittedJournalSize = 0;
     syncThread = boost::thread([this] () { this->periodicSync(); });
 }
@@ -82,8 +83,9 @@ enum OpFlags
     NEW_OBJECT = 0x4,
 };

-void Synchronizer::_newJournalEntry(const string &key)
+void Synchronizer::_newJournalEntry(const string &key, size_t size)
 {
+    uncommittedJournalSize += size;
     auto it = pendingOps.find(key);
     if (it != pendingOps.end())
     {
@@ -94,17 +96,29 @@ void Synchronizer::_newJournalEntry(const string &key)
         pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(JOURNAL));
 }

-void Synchronizer::newJournalEntry(const string &key)
+void Synchronizer::newJournalEntry(const string &key, size_t size)
 {
     boost::unique_lock<boost::mutex> s(mutex);
-    _newJournalEntry(key);
+    _newJournalEntry(key, size);
+    if (uncommittedJournalSize > 50000000)
+    {
+        uncommittedJournalSize = 0;
+        s.unlock();
+        forceFlush();
+    }
 }

-void Synchronizer::newJournalEntries(const vector<string> &keys)
+void Synchronizer::newJournalEntries(const vector<pair<string, size_t> > &keys)
 {
     boost::unique_lock<boost::mutex> s(mutex);
-    for (const string &key : keys)
-        _newJournalEntry(key);
+    for (auto &keysize : keys)
+        _newJournalEntry(keysize.first, keysize.second);
+    if (uncommittedJournalSize > 50000000)
+    {
+        uncommittedJournalSize = 0;
+        s.unlock();
+        forceFlush();
+    }
 }

 void Synchronizer::newObjects(const vector<string> &keys)
@@ -228,6 +242,7 @@ void Synchronizer::periodicSync()
             // threadPool.currentQueueSize() << endl;
             for (auto &job : pendingOps)
                 makeJob(job.first);
+            uncommittedJournalSize = 0;
         }
     }
@@ -235,7 +250,6 @@ void Synchronizer::forceFlush()
 {
     boost::unique_lock<boost::mutex> lock(mutex);
     syncThread.interrupt();
-    lock.unlock();
 }

 void Synchronizer::makeJob(const string &key)


@@ -29,8 +29,8 @@ class Synchronizer : public boost::noncopyable
         // these take keys as parameters, not full path names, ex, pass in '12345' not
         // 'cache/12345'.
-        void newJournalEntry(const std::string &key);
-        void newJournalEntries(const std::vector<std::string> &keys);
+        void newJournalEntry(const std::string &key, size_t len);
+        void newJournalEntries(const std::vector<std::pair<std::string, size_t> > &keys);
         void newObjects(const std::vector<std::string> &keys);
         void deletedObjects(const std::vector<std::string> &keys);
         void flushObject(const std::string &key);
@@ -42,7 +42,7 @@ class Synchronizer : public boost::noncopyable
     private:
         Synchronizer();
-        void _newJournalEntry(const std::string &key);
+        void _newJournalEntry(const std::string &key, size_t len);
         void process(std::list<std::string>::iterator key);
         void synchronize(const std::string &sourceFile, std::list<std::string>::iterator &it);
         void synchronizeDelete(const std::string &sourceFile, std::list<std::string>::iterator &it);
@@ -85,8 +85,9 @@ class Synchronizer : public boost::noncopyable
         // this thread will start jobs for entries in pendingOps every 10 seconds
         bool die;
         boost::thread syncThread;
-        const boost::chrono::seconds syncInterval = boost::chrono::seconds(1);
+        const boost::chrono::seconds syncInterval = boost::chrono::seconds(10);
         void periodicSync();
+        size_t uncommittedJournalSize;
         SMLogging *logger;
         Cache *cache;


@@ -1041,7 +1041,7 @@ bool syncTest1()
     assert(!err);
     assert(exists);
-    sync->newJournalEntry(key);
+    sync->newJournalEntry(key, 0);
     sync->forceFlush();
     sleep(1); // let it do what it does
@@ -1080,7 +1080,7 @@ bool syncTest1()
     // make the journal again, call sync->newJournalObject()
     makeTestJournal((journalPath / (newKey + ".journal")).string().c_str());
     cache->newJournalEntry(bf::file_size(journalPath / (newKey + ".journal")));
-    sync->newJournalEntry(newKey);
+    sync->newJournalEntry(newKey, 0);
     sync->forceFlush();
     sleep(1);