1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-13 23:02:14 +03:00

Revamped/simplified the Sync impl. Won't build yet.

This commit is contained in:
Patrick LeBlanc
2019-03-18 14:49:25 -05:00
parent 6f294b8c07
commit 6366a54bbc
3 changed files with 106 additions and 88 deletions

View File

@@ -303,8 +303,7 @@ void Cache::makeSpace(size_t size)
currentCacheSize -= statbuf.st_size;
thisMuch -= statbuf.st_size;
sync->flushObject(*it);
// Deleting the files will be done through Synchronizer->Replicator
//bf::remove(cachedFile);
replicator->delete(cachedFile, Replicator::LOCAL_ONLY);
LRU_t::iterator toRemove = it++;
lru.erase(toRemove);
m_lru.erase(*toRemove);

View File

@@ -75,8 +75,8 @@ void Synchronizer::newJournalEntry(const string &key)
it->second->opFlags |= JOURNAL;
return;
}
workQueue.push_back(key);
pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(JOURNAL, workQueue.end() - 1));
makeJob(key);
pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(JOURNAL));
}
void Synchronizer::newObjects(const vector<string> &keys)
@@ -86,8 +86,8 @@ void Synchronizer::newObjects(const vector<string> &keys)
for (string &key : keys)
{
assert(pendingOps.find(key) == pendingOps.end());
workQueue.push_back(key);
pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(NEW_OBJECT, workQueue.end() - 1));
makeJob(key);
pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(NEW_OBJECT));
}
}
@@ -95,70 +95,76 @@ void Synchronizer::deletedObjects(const vector<string> &keys)
{
boost::unique_lock<boost::mutex> s(mutex);
auto it = pendingOps.find(key);
if (it != pendingOps.end())
for (string &key : keys)
{
it->second->opFlags |= DELETE;
return;
auto it = pendingOps.find(key);
if (it != pendingOps.end())
{
it->second->opFlags |= DELETE;
return;
}
makeJob(key);
pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(DELETE));
}
workQueue.push_back(key);
pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(DELETE, workQueue.end() - 1));
}
void Synchronizer::flushObject(const string &key)
{
/* move the work queue entry for key to the front of the queue / create if not exists
mark the pending ops as a flush
wait for the op to finish
*/
boost::unique_lock<boost::mutex> s(mutex);
process(key);
}
auto &it = pendingOps.find(key);
if (it != pendingOps.end())
{
workQueue.splice(workQueue.begin(), workQueue, it->second.queueEntry);
it->second->opFlags |= IS_FLUSH;
it->second->wait();
}
else
{
workQueue.push_front(key);
pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(IS_FLUSH, workQueue.begin()));
pendingOps[key]->wait();
}
void Synchronizer::makeJob(const string &key)
{
boost::shared_ptr<Job> j(new Job(this, key));
threadPool.addJob(j);
}
void Synchronizer::process(const string &key)
{
boost::unique_lock<boost::mutex> s(mutex);
auto op = opsInProgress.find(key);
// it's already in progress
if (op != opsInProgress.end())
{
op->second->wait(&mutex);
return;
}
auto it = pendingOps.find(key);
assert(it != pendingOps.end());
// no work to be done on this key
if (it == pendingOps.end())
return;
boost::shared_ptr<PendingOps> pending = it->second;
opsInProgress[key] = *it;
pendingOps.erase(it);
s.unlock();
try {
if (pending->opFlags & DELETE)
synchronizeDelete(key);
else if (pending->opFlags & JOURNAL)
synchronizerWithJournal(key, pending->opFlags & IS_FLUSH);
else if (pending->opFlags & NEW_OBJECT)
synchronize(key, pending->opFlags & IS_FLUSH);
else
throw logic_error("Synchronizer::process(): got an unknown op flag");
bool success = false;
while (!success)
{
try {
if (pending->opFlags & DELETE)
synchronizeDelete(key);
else if (pending->opFlags & JOURNAL)
synchronizerWithJournal(key);
else if (pending->opFlags & NEW_OBJECT)
synchronize(key);
else
throw logic_error("Synchronizer::process(): got an unknown op flag");
pending->notify(&mutex);
success = true;
}
catch(exception &e) {
logger->log(LOG_CRIT, "Synchronizer::process(): error sync'ing %s opFlags=%d, got '%s'. Requeueing it.", key.c_str(),
pending->opFlags, e.what());
sleep(1);
}
}
catch(exception &e) {
logger->log(LOG_CRIT, "Synchronizer::process(): error sync'ing %s opFlags=%d, got '%s'. Requeueing it.", key.c_str(),
pending->opFlags, e.what());
s.lock();
workQueue.push_back(key);
pendingOps[key] = pending;
return;
}
if (pending->opFlags & IS_FLUSH)
pending->notify();
// TBD: On a network outage or S3 outage, it might not be a bad idea to keep retrying
// until the end of time. This will (?) naturally make the system unusable until the blockage
// is cleared, which is what we want, right? Is there a way to nicely tell the user what
// is happening, or are the logs good enough?
}
struct ScopedReadLock
@@ -201,7 +207,7 @@ struct ScopedWriteLock
bool locked;
};
void Synchronizer::synchronize(const string &key, bool isFlush)
void Synchronizer::synchronize(const string &key)
{
ScopedReadLock s(ioc, key);
@@ -218,18 +224,17 @@ void Synchronizer::synchronize(const string &key, bool isFlush)
throw runtime_error(strerror_r(errno, buf, 80));
replicator->delete(key, Replicator::NO_LOCAL);
}
if (isFlush)
replicator->delete(key, Replicator::LOCAL_ONLY);
}
void Synchronizer::synchronizeDelete(const string &key)
{
/* Right now I think this is being told to delete key from cloud storage,
and that it has already been deleted everywhere locally. */
and that it has already been deleted everywhere locally. If that's not right,
we'll have to add some sync around this. */
cs->delete(key);
}
void Synchronizer::synchronizeWithJournal(const string &key, bool isFlush)
void Synchronizer::synchronizeWithJournal(const string &key)
{
// interface to Metadata TBD
//string sourceFilename = Metadata::getSourceFromKey(key);
@@ -262,12 +267,12 @@ void Synchronizer::synchronizeWithJournal(const string &key, bool isFlush)
if (err)
throw runtime_error(string("Synchronizer: putObject() failed: ") + strerror_r(errno, buf, 80));
// if this isn't a flush operation..
// if the object was cached...
// write the new data to disk,
// tell the cache about the rename
// rename the file in any pending ops in Synchronizer
if (!isFlush && oldObjIsCached)
if (oldObjIsCached)
{
// Is this the only thing outside of Replicator that writes files?
// If so move this write loop to Replicator.
@@ -287,19 +292,14 @@ void Synchronizer::synchronizeWithJournal(const string &key, bool isFlush)
count += err;
}
// the total size difference is the new obj size - (old obj size + journal size)
// might be wise to add things like getting a file size to a utility class.
// TBD how often we need to do it.
struct stat statbuf;
err = stat(oldCachePath.string().c_str(), &statbuf);
assert(!err);
size_t oldObjSize = statbuf.st_size;
err = stat((oldCachePath.string() + ".journal").c_str(), &statbuf);
assert(!err);
size_t journalSize = statbuf.st_size;
cache->rename(key, newKey, size - oldObjSize - journalSize);
cache->rename(key, newKey, size - statbuf.st_size);
rename(key, newKey);
replicator->delete(key);
}
// update the metadata for the source file
@@ -310,34 +310,51 @@ void Synchronizer::synchronizeWithJournal(const string &key, bool isFlush)
replicator->updateMetadata(sourceFilename, md);
*/
ioc->renameObject(oldkey, newkey);
s.unlock();
struct stat statbuf;
err = stat(journalName.string().c_str(), &statbuf);
assert(!err);
// delete the old object & journal file
vector<string> files;
files.push_back(key);
files.push_back(journalName);
replicator->delete(files);
replicator->delete(journalName);
cache->deletedJournal(statbuf.st_size);
cs->delete(key);
}
void Synchronizer::rename(const string &oldKey, const string &newKey)
{
boost::unique_lock<boost::mutex> s(mutex);
auto it = pendingOps.find(oldKey);
if (it == pendingOps.end())
return;
pendingOps.erase(it);
pendingOps[newKey] = it->second;
}
/* The helper objects & fcns */
Synchronizer::PendingOps(int flags, list<string>::iterator pos) : opFlags(flags), finished(false), queueEntry(pos)
Synchronizer::PendingOps(int flags) : opFlags(flags), finished(false)
{
}
Synchronizer::PendingOps::notify()
Synchronizer::~PendingOps()
{
boost::unique_lock<boost::mutex> s(mutex);
}
Synchronizer::PendingOps::notify(boost::mutex *m)
{
boost::unique_lock<boost::mutex> s(*m);
finished = true;
condvar.notify_all();
}
Synchronizer::PendingOps::wait()
Synchronizer::PendingOps::wait(boost::mutex *m)
{
while (!finished)
condvar.wait(mutex);
condvar.wait(*m);
}
}

View File

@@ -35,29 +35,31 @@ class Synchronizer : public boost::noncopyable
void synchronize(const std::string &key, bool isFlush);
void synchronizeDelete(const std::string &key);
void synchronizeWithJournal(const std::string &key, bool isFlush);
void rename(const std::string &oldkey, const std::string &newkey);
void makeJob(const std::string &key);
struct FlushListener
{
FlushListener(boost::mutex *m, boost::condvar *c);
boost::mutex *mutex;
boost::condition *condvar;
void flushed();
}
// this struct kind of got sloppy. Need to clean it at some point.
struct PendingOps
{
PendingOps(int flags, std::list<std::string>::iterator pos);
PendingOps(int flags);
int opFlags;
bool finished;
std::list<std::string>::iterator queueEntry;
boost::condition condvar;
void wait();
void notify();
void wait(boost::mutex *);
void notify(boost::mutex *);
};
struct Job : public ThreadPool::Job
{
Job(Synchronizer *s, const std::string &k) : sync(s), key(k) { }
void operator()() { sync->process(key); }
Synchronizer *sync;
std::string key;
};
ThreadPool threadPool;
std::map<std::string, boost::shared_ptr<PendingOps> > pendingOps;
std::list<std::string> workQueue;
std::map<std::string, boost::shared_ptr<PendingOps> > opsInProgress;
SMLogging *logger;
Cache *cache;
Replicator *replicator;