You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-12-18 13:54:11 +03:00
Changed Sync s.t. it starts processing jobs every 10 seconds instead
of every time there's an event. Big improvement in efficiency.
This commit is contained in:
@@ -57,6 +57,8 @@ Synchronizer::Synchronizer() : maxUploads(0)
|
||||
journalPath = cache->getJournalPath();
|
||||
cachePath = cache->getCachePath();
|
||||
threadPool.setMaxThreads(maxUploads);
|
||||
die = false;
|
||||
syncThread = boost::thread([this] () { this->periodicSync(); });
|
||||
}
|
||||
|
||||
Synchronizer::~Synchronizer()
|
||||
@@ -65,6 +67,11 @@ Synchronizer::~Synchronizer()
|
||||
or save the list it's working on.....
|
||||
For milestone 2, this will do the safe thing and finish working first.
|
||||
Later we can get fancy. */
|
||||
boost::unique_lock<boost::recursive_mutex> lock(mutex);
|
||||
die = true;
|
||||
syncThread.interrupt();
|
||||
lock.unlock();
|
||||
syncThread.join();
|
||||
}
|
||||
|
||||
enum OpFlags
|
||||
@@ -85,7 +92,7 @@ void Synchronizer::newJournalEntry(const string &key)
|
||||
it->second->opFlags |= JOURNAL;
|
||||
return;
|
||||
}
|
||||
makeJob(key);
|
||||
//makeJob(key);
|
||||
pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(JOURNAL));
|
||||
}
|
||||
|
||||
@@ -96,7 +103,7 @@ void Synchronizer::newObjects(const vector<string> &keys)
|
||||
for (const string &key : keys)
|
||||
{
|
||||
assert(pendingOps.find(key) == pendingOps.end());
|
||||
makeJob(key);
|
||||
//makeJob(key);
|
||||
pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(NEW_OBJECT));
|
||||
}
|
||||
}
|
||||
@@ -113,7 +120,7 @@ void Synchronizer::deletedObjects(const vector<string> &keys)
|
||||
it->second->opFlags |= DELETE;
|
||||
return;
|
||||
}
|
||||
makeJob(key);
|
||||
//makeJob(key);
|
||||
pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(DELETE));
|
||||
}
|
||||
}
|
||||
@@ -187,6 +194,21 @@ void Synchronizer::flushObject(const string &key)
|
||||
}
|
||||
}
|
||||
|
||||
void Synchronizer::periodicSync()
|
||||
{
|
||||
boost::unique_lock<boost::recursive_mutex> lock(mutex);
|
||||
while (!die)
|
||||
{
|
||||
lock.unlock();
|
||||
boost::this_thread::sleep_for(syncInterval);
|
||||
lock.lock();
|
||||
//cout << "Sync'ing " << pendingOps.size() << " objects" << " queue size is " <<
|
||||
// threadPool.currentQueueSize() << endl;
|
||||
for (auto &job : pendingOps)
|
||||
makeJob(job.first);
|
||||
}
|
||||
}
|
||||
|
||||
void Synchronizer::makeJob(const string &key)
|
||||
{
|
||||
objNames.push_front(key);
|
||||
@@ -280,8 +302,6 @@ void Synchronizer::process(list<string>::iterator name)
|
||||
objNames.erase(name);
|
||||
}
|
||||
|
||||
|
||||
|
||||
void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator &it)
|
||||
{
|
||||
ScopedReadLock s(ioc, sourceFile);
|
||||
@@ -313,9 +333,6 @@ void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator
|
||||
|
||||
void Synchronizer::synchronizeDelete(const string &sourceFile, list<string>::iterator &it)
|
||||
{
|
||||
/* Don't think it's necessary to lock here. Sync is being told to delete a file on cloud storage.
|
||||
Presumably the caller has removed it from metadata & the cache, so it is no longer referencable.
|
||||
*/
|
||||
ScopedWriteLock s(ioc, sourceFile);
|
||||
cs->deleteObject(*it);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user