1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-17 01:02:23 +03:00

Made Sync take into account that object names may change

beteween when a job is created and when it is run.
This commit is contained in:
Patrick LeBlanc
2019-03-19 17:24:32 -05:00
parent 1b16f05d35
commit 97d2844994
2 changed files with 85 additions and 49 deletions

View File

@@ -1,5 +1,6 @@
#include "Synchronizer.h"
#include "Metadatafile.h"
#include <boost/thread/mutex.hpp>
using namespace std;
@@ -61,8 +62,6 @@ enum OpFlags
JOURNAL = 0x1,
DELETE = 0x2,
NEW_OBJECT = 0x4,
IN_PROGRESS = 0x8,
IS_FLUSH = 0x10
};
void Synchronizer::newJournalEntry(const string &key)
@@ -115,29 +114,41 @@ void Synchronizer::flushObject(const string &key)
void Synchronizer::makeJob(const string &key)
{
boost::shared_ptr<Job> j(new Job(this, key));
boost::shared_ptr<string> s(new string(key));
names.push_front(s);
boost::shared_ptr<Job> j(new Job(this, names.begin()));
threadPool.addJob(j);
}
void Synchronizer::process(const string &key)
void Synchronizer::process(list<string>::iterator &name)
{
/*
check if there is a pendingOp for *it
if yes, start processing it
if no,
check if there is an ongoing op and block on it
if not, return
*/
boost::unique_lock<boost::mutex> s(mutex);
auto op = opsInProgress.find(key);
// it's already in progress
if (op != opsInProgress.end())
{
op->second->wait(&mutex);
return;
}
auto it = pendingOps.find(key);
// no work to be done on this key
if (it == pendingOps.end())
return;
{
auto op = opsInProgress.find(key);
// it's already in progress
if (op != opsInProgress.end())
{
op->second->wait(&mutex);
return;
}
}
boost::shared_ptr<PendingOps> pending = it->second;
opsInProgress[key] = *it;
pendingOps.erase(it);
string sourceFile = Metadata::getSourceFilenameFromKey(*name);
s.unlock();
bool success = false;
@@ -145,11 +156,11 @@ void Synchronizer::process(const string &key)
{
try {
if (pending->opFlags & DELETE)
synchronizeDelete(key);
synchronizeDelete(sourceFile, name);
else if (pending->opFlags & JOURNAL)
synchronizerWithJournal(key);
synchronizerWithJournal(sourceFile, name);
else if (pending->opFlags & NEW_OBJECT)
synchronize(key);
synchronize(sourceFile, name);
else
throw logic_error("Synchronizer::process(): got an unknown op flag");
pending->notify(&mutex);
@@ -159,8 +170,14 @@ void Synchronizer::process(const string &key)
logger->log(LOG_CRIT, "Synchronizer::process(): error sync'ing %s opFlags=%d, got '%s'. Requeueing it.", key.c_str(),
pending->opFlags, e.what());
sleep(1);
// TODO: requeue the job instead of looping infinitely
}
}
s.lock();
opsInProgress.erase(key);
names.erase(name);
// TBD: On a network outage or S3 outage, it might not be a bad idea to keep retrying
// until the end of time. This will (?) naturally make the system unusable until the blockage
// is cleared, which is what we want, right? Is there a way to nicely tell the user what
@@ -207,40 +224,58 @@ struct ScopedWriteLock
bool locked;
};
void Synchronizer::synchronize(const string &key)
void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator &it)
{
ScopedReadLock s(ioc, key);
ScopedReadLock s(ioc, sourceFile);
string &key = *it;
char buf[80];
bool exists = false;
int err;
err = cs->exists(key, &exists);
if (err)
throw runtime_error(strerror_r(errno, buf, 80));
throw runtime_error(string("synchronize(): checking existence of ") + key + ", got " +
strerror_r(errno, buf, 80));
if (exists)
return;
// can this run after a delete op?
exists = bf::exists(cache->getCachePath() / key);
if (!exists)
{
err = cs->putObject(cache->getCachePath() / key, key);
if (err)
throw runtime_error(strerror_r(errno, buf, 80));
replicator->delete(key, Replicator::NO_LOCAL);
logger->log(LOG_WARNING, "synchronize(): was told to upload %s but it does not exist locally", key.c_str());
return;
}
err = cs->putObject(cache->getCachePath() / key, key);
if (err)
throw runtime_error(string("synchronize(): uploading ") + key + ", got " + strerror_r(errno, buf, 80));
replicator->delete(key, Replicator::NO_LOCAL);
}
void Synchronizer::synchronizeDelete(const string &key)
void Synchronizer::synchronizeDelete(const string &sourceFile, list<string>::iterator &it)
{
/* Right now I think this is being told to delete key from cloud storage,
and that it has already been deleted everywhere locally. If that's not right,
we'll have to add some sync around this. */
cs->delete(key);
ScopedWriteLock s(ioc, sourceFile);
cs->delete(*it);
}
void Synchronizer::synchronizeWithJournal(const string &key)
void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>::iterator &lit)
{
// interface to Metadata TBD
//string sourceFilename = Metadata::getSourceFromKey(key);
ScopedWriteLock s(ioc, sourceFilename);
ScopedWriteLock s(ioc, sourceFile);
string &key = *lit;
bf::path oldCachePath = cache->getCachePath() / key;
string journalName = oldCachePath.string() + ".journal";
if (!bf::exists(journalName))
{
logger->(LOG_WARNING, "synchronizeWithJournal(): no journal file found for %s", key.c_str());
// I don't think this should happen, maybe throw a logic_error here
return;
}
int err;
boost::shared_array<uint8_t> data;
size_t count = 0, size = 0;
@@ -262,7 +297,7 @@ void Synchronizer::synchronizeWithJournal(const string &key)
assert(data);
// get a new key for the resolved version & upload it
string newKey = ios->newKeyFromOldKey(key);
string newKey = ioc->newKeyFromOldKey(key);
err = cs->putObject(data, size, newKey);
if (err)
throw runtime_error(string("Synchronizer: putObject() failed: ") + strerror_r(errno, buf, 80));
@@ -292,13 +327,7 @@ void Synchronizer::synchronizeWithJournal(const string &key)
count += err;
}
// might be wise to add things like getting a file size to a utility class.
// TBD how often we need to do it.
struct stat statbuf;
err = stat(oldCachePath.string().c_str(), &statbuf);
assert(!err);
cache->rename(key, newKey, size - statbuf.st_size);
rename(key, newKey);
cache->rename(key, newKey, size - bf::file_size(oldCachePath));
replicator->delete(key);
}
@@ -309,17 +338,13 @@ void Synchronizer::synchronizeWithJournal(const string &key)
md.rename(key, newKey);
replicator->updateMetadata(sourceFilename, md);
*/
rename(key, newKey);
ioc->renameObject(oldkey, newkey);
s.unlock();
struct stat statbuf;
err = stat(journalName.string().c_str(), &statbuf);
assert(!err);
// delete the old object & journal file
cache->deletedJournal(bf::file_size(journalName);
replicator->delete(journalName);
cache->deletedJournal(statbuf.st_size);
cs->delete(key);
}
@@ -330,8 +355,12 @@ void Synchronizer::rename(const string &oldKey, const string &newKey)
auto it = pendingOps.find(oldKey);
if (it == pendingOps.end())
return;
pendingOps.erase(it);
pendingOps[newKey] = it->second;
pendingOps.erase(it);
for (auto &name: objNames)
if (*name == oldKey)
*name = newKey;
}
/* The helper objects & fcns */