1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-17 01:02:23 +03:00

Got a pretty decent unit test for Sync working.

This commit is contained in:
Patrick LeBlanc
2019-03-22 12:04:36 -05:00
parent 323fd18953
commit 4ff769ab24
13 changed files with 184 additions and 77 deletions

View File

@@ -102,22 +102,7 @@ Synchronizer::Synchronizer() : maxUploads(0)
if (maxUploads == 0)
maxUploads = 20;
stmp = config->getValue("ObjectStorage", "journal_path");
if (stmp.empty())
{
logger->log(LOG_CRIT, "ObjectStorage/journal_path is not set");
throw runtime_error("Please set ObjectStorage/journal_path in the storagemanager.cnf file");
}
try
{
journalPath = stmp;
bf::create_directories(journalPath);
}
catch (exception &e)
{
logger->log(LOG_CRIT, "Failed to create %s, got: %s", stmp.c_str(), e.what());
throw e;
}
journalPath = cache->getJournalPath();
cachePath = cache->getCachePath();
threadPool.setMaxThreads(maxUploads);
}
@@ -187,18 +172,21 @@ void Synchronizer::flushObject(const string &key)
// if there is something to do on key, it should be in the objNames list
// and either in pendingOps or opsInProgress.
// in testing though, going to check whether there is something to do
// The sanity check at the end was intended for debugging / development
// I'm inclined to make it permanent. An existence check on S3 is quick.
bool noExistingJob = false;
auto it = pendingOps.find(key);
if (it != pendingOps.end())
// find the object name and call process()
// find the object name and call process() to start it right away
for (auto name = objNames.begin(); name != objNames.end(); ++it)
{
if (*name == key)
{
process(name, false);
break;
}
}
else
{
auto op = opsInProgress.find(key);
@@ -207,8 +195,7 @@ void Synchronizer::flushObject(const string &key)
op->second->wait(&mutex);
else
{
// it's not in either one, check if there is anything to be done as
// a sanity check.
// it's not in either one, trigger existence check
noExistingJob = true;
}
}
@@ -230,10 +217,11 @@ void Synchronizer::flushObject(const string &key)
} while (err);
if (!exists)
{
logger->log(LOG_DEBUG, "Sync::flushObject(): broken assumption! %s does not exist in cloud storage, but there is no job for it. Uploading it now.");
logger->log(LOG_DEBUG, "Sync::flushObject(): %s does not exist in cloud storage, "
"and there is no job for it. Uploading it now.", key.c_str());
pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(NEW_OBJECT));
objNames.push_front(key);
process(objNames.begin(), false);
process(objNames.begin(), true);
}
}
@@ -245,7 +233,7 @@ void Synchronizer::makeJob(const string &key)
threadPool.addJob(j);
}
void Synchronizer::process(list<string>::iterator name, bool use_lock)
void Synchronizer::process(list<string>::iterator name, bool callerHoldsLock)
{
/*
check if there is a pendingOp for name
@@ -255,10 +243,11 @@ void Synchronizer::process(list<string>::iterator name, bool use_lock)
if not, return
*/
// had to use this 'use_lock' kludge to let flush() start processing a job immediately
// had to use this 'callerHoldsLock' kludge to let flush() start processing a job w/o unlocking first
// and introducing a race. It means this fcn should not use its own scoped lock. It sucks, need to rework it.
boost::unique_lock<boost::mutex> s(mutex, boost::defer_lock);
if (use_lock)
if (!callerHoldsLock)
s.lock();
string &key = *name;
@@ -281,7 +270,11 @@ void Synchronizer::process(list<string>::iterator name, bool use_lock)
opsInProgress[key] = pending;
pendingOps.erase(it);
string sourceFile = MetadataFile::getSourceFromKey(*name);
s.unlock();
if (!callerHoldsLock)
s.unlock();
else
mutex.unlock();
bool success = false;
while (!success)
@@ -308,7 +301,11 @@ void Synchronizer::process(list<string>::iterator name, bool use_lock)
}
}
s.lock();
if (!callerHoldsLock)
s.lock();
else
mutex.lock();
opsInProgress.erase(key);
objNames.erase(name);
}
@@ -376,7 +373,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
// merge it with its journal file
if (!oldObjIsCached)
{
err = cs->getObject(key, data, &size);
err = cs->getObject(key, &data, &size);
if (err)
throw runtime_error(string("Synchronizer: getObject() failed: ") + strerror_r(errno, buf, 80));
err = ioc->mergeJournalInMem(data, &size, journalName.c_str());
@@ -418,7 +415,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
}
cache->rename(key, newKey, size - bf::file_size(oldCachePath));
replicator->remove(key.c_str());
replicator->remove(key.c_str()); // should this be the file to remove, or the key?
}
// update the metadata for the source file