You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-12-17 01:02:23 +03:00
Got the synchronizer stuff to build.
This commit is contained in:
@@ -1,14 +1,69 @@
|
||||
|
||||
#include "Synchronizer.h"
|
||||
#include "Metadatafile.h"
|
||||
#include "MetadataFile.h"
|
||||
#include <boost/thread/mutex.hpp>
|
||||
|
||||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
using namespace std;
|
||||
|
||||
namespace
|
||||
{
|
||||
storagemanager::Synchronizer *instance = NULL;
|
||||
boost::mutex inst_mutex;
|
||||
storagemanager::Synchronizer *instance = NULL;
|
||||
boost::mutex inst_mutex;
|
||||
|
||||
// a few utility classes. Maybe move these to a utilities header.
|
||||
struct ScopedReadLock
|
||||
{
|
||||
ScopedReadLock(storagemanager::IOCoordinator *i, const string &k) : ioc(i), key(k)
|
||||
{
|
||||
ioc->readLock(key);
|
||||
}
|
||||
~ScopedReadLock()
|
||||
{
|
||||
ioc->readUnlock(key);
|
||||
}
|
||||
storagemanager::IOCoordinator *ioc;
|
||||
const string key;
|
||||
};
|
||||
|
||||
struct ScopedWriteLock
|
||||
{
|
||||
ScopedWriteLock(storagemanager::IOCoordinator *i, const string &k) : ioc(i), key(k)
|
||||
{
|
||||
ioc->writeLock(key);
|
||||
locked = true;
|
||||
}
|
||||
~ScopedWriteLock()
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
|
||||
void unlock()
|
||||
{
|
||||
if (locked)
|
||||
{
|
||||
ioc->writeUnlock(key);
|
||||
locked = false;
|
||||
}
|
||||
}
|
||||
storagemanager::IOCoordinator *ioc;
|
||||
bool locked;
|
||||
const string key;
|
||||
};
|
||||
|
||||
struct ScopedCloser {
|
||||
ScopedCloser(int f) : fd(f) { }
|
||||
~ScopedCloser() {
|
||||
int s_errno = errno;
|
||||
::close(fd);
|
||||
errno = s_errno;
|
||||
}
|
||||
int fd;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
namespace bf = boost::filesystem;
|
||||
@@ -33,8 +88,9 @@ Synchronizer::Synchronizer() : maxUploads(0)
|
||||
cache = Cache::get();
|
||||
replicator = Replicator::get();
|
||||
ioc = IOCoordinator::get();
|
||||
cs = CloudStorage::get();
|
||||
|
||||
string stmp = config->getValue("ObjectStorage", "max_concurrent_uploads")
|
||||
string stmp = config->getValue("ObjectStorage", "max_concurrent_uploads");
|
||||
try
|
||||
{
|
||||
maxUploads = stoul(stmp);
|
||||
@@ -47,21 +103,21 @@ Synchronizer::Synchronizer() : maxUploads(0)
|
||||
maxUploads = 20;
|
||||
|
||||
stmp = config->getValue("ObjectStorage", "journal_path");
|
||||
if (prefix.empty())
|
||||
if (stmp.empty())
|
||||
{
|
||||
logger->log(LOG_CRIT, "ObjectStorage/journal_path is not set");
|
||||
throw runtime_error("Please set ObjectStorage/journal_path in the storagemanager.cnf file");
|
||||
}
|
||||
try
|
||||
{
|
||||
bf::create_directories(stmp);
|
||||
journalPath = stmp;
|
||||
bf::create_directories(journalPath);
|
||||
}
|
||||
catch (exception &e)
|
||||
{
|
||||
syslog(LOG_CRIT, "Failed to create %s, got: %s", stmp.string().c_str(), e.what());
|
||||
logger->log(LOG_CRIT, "Failed to create %s, got: %s", stmp.c_str(), e.what());
|
||||
throw e;
|
||||
}
|
||||
journalPath = stmp;
|
||||
cachePath = cache->getCachePath();
|
||||
threadPool.setMaxThreads(maxUploads);
|
||||
}
|
||||
@@ -100,7 +156,7 @@ void Synchronizer::newObjects(const vector<string> &keys)
|
||||
{
|
||||
boost::unique_lock<boost::mutex> s(mutex);
|
||||
|
||||
for (string &key : keys)
|
||||
for (const string &key : keys)
|
||||
{
|
||||
assert(pendingOps.find(key) == pendingOps.end());
|
||||
makeJob(key);
|
||||
@@ -112,7 +168,7 @@ void Synchronizer::deletedObjects(const vector<string> &keys)
|
||||
{
|
||||
boost::unique_lock<boost::mutex> s(mutex);
|
||||
|
||||
for (string &key : keys)
|
||||
for (const string &key : keys)
|
||||
{
|
||||
auto it = pendingOps.find(key);
|
||||
if (it != pendingOps.end())
|
||||
@@ -127,30 +183,85 @@ void Synchronizer::deletedObjects(const vector<string> &keys)
|
||||
|
||||
void Synchronizer::flushObject(const string &key)
|
||||
{
|
||||
process(key);
|
||||
boost::unique_lock<boost::mutex> s(mutex);
|
||||
|
||||
// if there is something to do on key, it should be in the objNames list
|
||||
// and either in pendingOps or opsInProgress.
|
||||
// in testing though, going to check whether there is something to do
|
||||
|
||||
bool noExistingJob = false;
|
||||
auto it = pendingOps.find(key);
|
||||
if (it != pendingOps.end())
|
||||
// find the object name and call process()
|
||||
for (auto name = objNames.begin(); name != objNames.end(); ++it)
|
||||
if (*name == key)
|
||||
{
|
||||
process(name, false);
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
auto op = opsInProgress.find(key);
|
||||
// it's already in progress
|
||||
if (op != opsInProgress.end())
|
||||
op->second->wait(&mutex);
|
||||
else
|
||||
{
|
||||
// it's not in either one, check if there is anything to be done as
|
||||
// a sanity check.
|
||||
noExistingJob = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!noExistingJob)
|
||||
return;
|
||||
|
||||
// check whether this key is in cloud storage
|
||||
bool exists;
|
||||
int err;
|
||||
do {
|
||||
err = cs->exists(key.c_str(), &exists);
|
||||
if (err)
|
||||
{
|
||||
char buf[80];
|
||||
logger->log(LOG_CRIT, "Sync::flushObject(): cloud existence check failed, got '%s'", strerror_r(errno, buf, 80));
|
||||
sleep(5);
|
||||
}
|
||||
} while (err);
|
||||
if (!exists)
|
||||
{
|
||||
logger->log(LOG_DEBUG, "Sync::flushObject(): broken assumption! %s does not exist in cloud storage, but there is no job for it. Uploading it now.");
|
||||
pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(NEW_OBJECT));
|
||||
objNames.push_front(key);
|
||||
process(objNames.begin(), false);
|
||||
}
|
||||
}
|
||||
|
||||
void Synchronizer::makeJob(const string &key)
|
||||
{
|
||||
boost::shared_ptr<string> s(new string(key));
|
||||
names.push_front(s);
|
||||
objNames.push_front(key);
|
||||
|
||||
boost::shared_ptr<Job> j(new Job(this, names.begin()));
|
||||
boost::shared_ptr<Job> j(new Job(this, objNames.begin()));
|
||||
threadPool.addJob(j);
|
||||
}
|
||||
|
||||
void Synchronizer::process(list<string>::iterator &name)
|
||||
void Synchronizer::process(list<string>::iterator name, bool use_lock)
|
||||
{
|
||||
/*
|
||||
check if there is a pendingOp for *it
|
||||
check if there is a pendingOp for name
|
||||
if yes, start processing it
|
||||
if no,
|
||||
check if there is an ongoing op and block on it
|
||||
if not, return
|
||||
*/
|
||||
|
||||
boost::unique_lock<boost::mutex> s(mutex);
|
||||
// had to use this 'use_lock' kludge to let flush() start processing a job immediately
|
||||
boost::unique_lock<boost::mutex> s(mutex, boost::defer_lock);
|
||||
|
||||
if (use_lock)
|
||||
s.lock();
|
||||
|
||||
string &key = *name;
|
||||
auto it = pendingOps.find(key);
|
||||
if (it == pendingOps.end())
|
||||
{
|
||||
@@ -161,22 +272,27 @@ void Synchronizer::process(list<string>::iterator &name)
|
||||
op->second->wait(&mutex);
|
||||
return;
|
||||
}
|
||||
else
|
||||
// it's not in pending or opsinprogress, nothing to do
|
||||
return;
|
||||
}
|
||||
|
||||
boost::shared_ptr<PendingOps> pending = it->second;
|
||||
opsInProgress[key] = *it;
|
||||
opsInProgress[key] = pending;
|
||||
pendingOps.erase(it);
|
||||
string sourceFile = Metadata::getSourceFilenameFromKey(*name);
|
||||
string sourceFile = MetadataFile::getSourceFromKey(*name);
|
||||
s.unlock();
|
||||
|
||||
bool success = false;
|
||||
while (!success)
|
||||
{
|
||||
try {
|
||||
/* Exceptions should only happen b/c of cloud service errors. Rather than retry here endlessly,
|
||||
probably a better idea to have cloudstorage classes do the retrying */
|
||||
if (pending->opFlags & DELETE)
|
||||
synchronizeDelete(sourceFile, name);
|
||||
else if (pending->opFlags & JOURNAL)
|
||||
synchronizerWithJournal(sourceFile, name);
|
||||
synchronizeWithJournal(sourceFile, name);
|
||||
else if (pending->opFlags & NEW_OBJECT)
|
||||
synchronize(sourceFile, name);
|
||||
else
|
||||
@@ -194,53 +310,10 @@ void Synchronizer::process(list<string>::iterator &name)
|
||||
|
||||
s.lock();
|
||||
opsInProgress.erase(key);
|
||||
names.erase(name);
|
||||
|
||||
// TBD: On a network outage or S3 outage, it might not be a bad idea to keep retrying
|
||||
// until the end of time. This will (?) naturally make the system unusable until the blockage
|
||||
// is cleared, which is what we want, right? Is there a way to nicely tell the user what
|
||||
// is happening, or are the logs good enough?
|
||||
objNames.erase(name);
|
||||
}
|
||||
|
||||
struct ScopedReadLock
|
||||
{
|
||||
ScopedReadLock(IOCoordinator *i, string &key)
|
||||
{
|
||||
ioc = i;
|
||||
ioc->readLock(key.c_str());
|
||||
}
|
||||
~ScopedReadLock()
|
||||
|
||||
ioc->readUnlock(key.c_str());
|
||||
}
|
||||
IOCoordinator *ioc;
|
||||
};
|
||||
|
||||
struct ScopedWriteLock
|
||||
{
|
||||
ScopedWriteLock(IOCoordinator *i, string &key)
|
||||
{
|
||||
ioc = i;
|
||||
ioc->writeLock(key.c_str());
|
||||
locked = true;
|
||||
}
|
||||
~ScopedReadLock()
|
||||
{
|
||||
if (locked)
|
||||
ioc->writeUnlock(key.c_str());
|
||||
}
|
||||
|
||||
void unlock()
|
||||
{
|
||||
if (locked)
|
||||
{
|
||||
ioc->writeUnlock(key.c_str());
|
||||
locked = false;
|
||||
}
|
||||
}
|
||||
IOCoordinator *ioc;
|
||||
bool locked;
|
||||
};
|
||||
|
||||
void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator &it)
|
||||
{
|
||||
@@ -266,21 +339,20 @@ void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator
|
||||
return;
|
||||
}
|
||||
|
||||
err = cs->putObject(cachePath / key, key);
|
||||
err = cs->putObject((cachePath / key).string(), key);
|
||||
if (err)
|
||||
throw runtime_error(string("synchronize(): uploading ") + key + ", got " + strerror_r(errno, buf, 80));
|
||||
replicator->delete(key, Replicator::NO_LOCAL);
|
||||
replicator->remove(key.c_str(), Replicator::NO_LOCAL);
|
||||
}
|
||||
|
||||
void Synchronizer::synchronizeDelete(const string &sourceFile, list<string>::iterator &it)
|
||||
{
|
||||
ScopedWriteLock s(ioc, sourceFile);
|
||||
cs->delete(*it);
|
||||
cs->deleteObject(*it);
|
||||
}
|
||||
|
||||
void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>::iterator &lit)
|
||||
{
|
||||
// interface to Metadata TBD
|
||||
ScopedWriteLock s(ioc, sourceFile);
|
||||
|
||||
string &key = *lit;
|
||||
@@ -289,7 +361,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
|
||||
|
||||
if (!bf::exists(journalName))
|
||||
{
|
||||
logger->(LOG_WARNING, "synchronizeWithJournal(): no journal file found for %s", key.c_str());
|
||||
logger->log(LOG_WARNING, "synchronizeWithJournal(): no journal file found for %s", key.c_str());
|
||||
// I don't think this should happen, maybe throw a logic_error here
|
||||
return;
|
||||
}
|
||||
@@ -307,15 +379,15 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
|
||||
err = cs->getObject(key, data, &size);
|
||||
if (err)
|
||||
throw runtime_error(string("Synchronizer: getObject() failed: ") + strerror_r(errno, buf, 80));
|
||||
err = ios->mergeJournalInMem(data, journalName, &size);
|
||||
err = ioc->mergeJournalInMem(data, &size, journalName.c_str());
|
||||
assert(!err);
|
||||
}
|
||||
else
|
||||
data = ios->mergeJournal(oldCachePath.string(), journalName, 0, &size);
|
||||
data = ioc->mergeJournal(oldCachePath.string().c_str(), journalName.c_str(), 0, &size);
|
||||
assert(data);
|
||||
|
||||
// get a new key for the resolved version & upload it
|
||||
string newKey = ioc->newKeyFromOldKey(key);
|
||||
string newKey = MetadataFile::getNewKeyFromOldKey(key, size);
|
||||
err = cs->putObject(data, size, newKey);
|
||||
if (err)
|
||||
throw runtime_error(string("Synchronizer: putObject() failed: ") + strerror_r(errno, buf, 80));
|
||||
@@ -346,24 +418,23 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
|
||||
}
|
||||
|
||||
cache->rename(key, newKey, size - bf::file_size(oldCachePath));
|
||||
replicator->delete(key);
|
||||
replicator->remove(key.c_str());
|
||||
}
|
||||
|
||||
// update the metadata for the source file
|
||||
// waiting for stubs to see what these calls look like
|
||||
/*
|
||||
Metadata md(sourceFilename);
|
||||
md.rename(key, newKey);
|
||||
replicator->updateMetadata(sourceFilename, md);
|
||||
*/
|
||||
|
||||
MetadataFile md(sourceFile.c_str());
|
||||
md.updateEntry(MetadataFile::getOffsetFromKey(key), newKey, size);
|
||||
replicator->updateMetadata(sourceFile.c_str(), md);
|
||||
|
||||
rename(key, newKey);
|
||||
ioc->renameObject(oldkey, newkey);
|
||||
ioc->renameObject(key, newKey);
|
||||
s.unlock();
|
||||
|
||||
// delete the old object & journal file
|
||||
cache->deletedJournal(bf::file_size(journalName);
|
||||
replicator->delete(journalName);
|
||||
cs->delete(key);
|
||||
cache->deletedJournal(bf::file_size(journalName));
|
||||
replicator->remove(journalName.c_str());
|
||||
cs->deleteObject(key);
|
||||
}
|
||||
|
||||
void Synchronizer::rename(const string &oldKey, const string &newKey)
|
||||
@@ -377,8 +448,8 @@ void Synchronizer::rename(const string &oldKey, const string &newKey)
|
||||
pendingOps.erase(it);
|
||||
|
||||
for (auto &name: objNames)
|
||||
if (*name == oldKey)
|
||||
*name = newKey;
|
||||
if (name == oldKey)
|
||||
name = newKey;
|
||||
}
|
||||
|
||||
bf::path Synchronizer::getJournalPath()
|
||||
@@ -393,22 +464,18 @@ bf::path Synchronizer::getCachePath()
|
||||
|
||||
/* The helper objects & fcns */
|
||||
|
||||
Synchronizer::PendingOps(int flags) : opFlags(flags), finished(false)
|
||||
Synchronizer::PendingOps::PendingOps(int flags) : opFlags(flags), finished(false)
|
||||
{
|
||||
}
|
||||
|
||||
Synchronizer::~PendingOps()
|
||||
{
|
||||
}
|
||||
|
||||
Synchronizer::PendingOps::notify(boost::mutex *m)
|
||||
void Synchronizer::PendingOps::notify(boost::mutex *m)
|
||||
{
|
||||
boost::unique_lock<boost::mutex> s(*m);
|
||||
finished = true;
|
||||
condvar.notify_all();
|
||||
}
|
||||
|
||||
Synchronizer::PendingOps::wait(boost::mutex *m)
|
||||
void Synchronizer::PendingOps::wait(boost::mutex *m)
|
||||
{
|
||||
while (!finished)
|
||||
condvar.wait(*m);
|
||||
|
||||
Reference in New Issue
Block a user