You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-12-18 13:54:11 +03:00
Fixed a few problems in Sync.
- flushObject() could wait on an element in objNames which doesn't exist, and so it would not get processed. - process() would not always delete its job from objNames - Added add'l debugging printouts and a fallback case for when syncWithJournal does not find a journal file. - noticed a name collision between the scoped ops in syncWithJournal (harmless)
This commit is contained in:
@@ -129,23 +129,18 @@ void Synchronizer::flushObject(const string &key)
|
|||||||
{
|
{
|
||||||
boost::unique_lock<boost::recursive_mutex> s(mutex);
|
boost::unique_lock<boost::recursive_mutex> s(mutex);
|
||||||
|
|
||||||
// if there is something to do on key, it should be in the objNames list
|
// if there is something to do on key, it should be either in pendingOps or opsInProgress
|
||||||
// and either in pendingOps or opsInProgress.
|
// if it is is pending ops, start the job now. If it is in progress, wait for it to finish.
|
||||||
// The sanity check at the end was intended for debugging / development
|
// The sanity check at the end was intended for debugging / development
|
||||||
// I'm inclined to make it permanent. An existence check on S3 is quick.
|
// I'm inclined to make it permanent. An existence check on S3 is quick.
|
||||||
|
|
||||||
bool noExistingJob = false;
|
bool noExistingJob = false;
|
||||||
auto it = pendingOps.find(key);
|
auto it = pendingOps.find(key);
|
||||||
if (it != pendingOps.end())
|
if (it != pendingOps.end())
|
||||||
// find the object name and call process() to start it right away
|
{
|
||||||
for (auto name = objNames.begin(); name != objNames.end(); ++name)
|
objNames.push_front(key);
|
||||||
{
|
process(objNames.begin());
|
||||||
if (*name == key)
|
}
|
||||||
{
|
|
||||||
process(name);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
auto op = opsInProgress.find(key);
|
auto op = opsInProgress.find(key);
|
||||||
@@ -241,11 +236,15 @@ void Synchronizer::process(list<string>::iterator name)
|
|||||||
{
|
{
|
||||||
boost::shared_ptr<PendingOps> tmp = op->second;
|
boost::shared_ptr<PendingOps> tmp = op->second;
|
||||||
tmp->wait(&mutex);
|
tmp->wait(&mutex);
|
||||||
|
objNames.erase(name);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
{
|
||||||
// it's not in pending or opsinprogress, nothing to do
|
// it's not in pending or opsinprogress, nothing to do
|
||||||
|
objNames.erase(name);
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
boost::shared_ptr<PendingOps> pending = it->second;
|
boost::shared_ptr<PendingOps> pending = it->second;
|
||||||
@@ -340,14 +339,20 @@ void Synchronizer::synchronizeDelete(const string &sourceFile, list<string>::ite
|
|||||||
{
|
{
|
||||||
ScopedWriteLock s(ioc, sourceFile);
|
ScopedWriteLock s(ioc, sourceFile);
|
||||||
cs->deleteObject(*it);
|
cs->deleteObject(*it);
|
||||||
|
|
||||||
|
// delete any pending jobs for *it.
|
||||||
|
boost::unique_lock<boost::recursive_mutex> sc(mutex);
|
||||||
|
pendingOps.erase(*it);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>::iterator &lit)
|
void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>::iterator &lit)
|
||||||
{
|
{
|
||||||
ScopedWriteLock s(ioc, sourceFile);
|
ScopedWriteLock s(ioc, sourceFile);
|
||||||
|
|
||||||
|
char buf[80];
|
||||||
string &key = *lit;
|
string &key = *lit;
|
||||||
MetadataFile md(sourceFile.c_str(), MetadataFile::no_create_t());
|
MetadataFile md(sourceFile.c_str(), MetadataFile::no_create_t());
|
||||||
|
|
||||||
if (!md.exists())
|
if (!md.exists())
|
||||||
{
|
{
|
||||||
logger->log(LOG_DEBUG, "synchronizeWithJournal(): no metadata found for %s", sourceFile.c_str());
|
logger->log(LOG_DEBUG, "synchronizeWithJournal(): no metadata found for %s", sourceFile.c_str());
|
||||||
@@ -361,15 +366,38 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
|
|||||||
if (!bf::exists(journalName))
|
if (!bf::exists(journalName))
|
||||||
{
|
{
|
||||||
logger->log(LOG_DEBUG, "synchronizeWithJournal(): no journal file found for %s", key.c_str());
|
logger->log(LOG_DEBUG, "synchronizeWithJournal(): no journal file found for %s", key.c_str());
|
||||||
// I don't think this should happen, maybe throw a logic_error here.
|
|
||||||
// Revision ^^. It can happen if the object was deleted after the op was latched but before it runs.
|
// sanity check + add'l info. Test whether the object exists in cloud storage. If so, complain,
|
||||||
|
// and run synchronize() instead.
|
||||||
|
bool existsOnCloud;
|
||||||
|
int err = cs->exists(key, &existsOnCloud);
|
||||||
|
if (err)
|
||||||
|
throw runtime_error(string("Synchronizer: cs->exists() failed: ") + strerror_r(errno, buf, 80));
|
||||||
|
if (!existsOnCloud)
|
||||||
|
{
|
||||||
|
if (bf::exists(oldCachePath))
|
||||||
|
{
|
||||||
|
logger->log(LOG_DEBUG, "synchronizeWithJournal(): %s has no journal and does not exist in the cloud, calling "
|
||||||
|
"synchronize() instead. Need to explain how this happens.", key.c_str());
|
||||||
|
s.unlock();
|
||||||
|
synchronize(sourceFile, lit);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
logger->log(LOG_DEBUG, "synchronizeWithJournal(): %s has no journal, and does not exist in the cloud or in "
|
||||||
|
" the local cache. Need to explain how this happens.", key.c_str());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
logger->log(LOG_DEBUG, "synchronizeWithJournal(): %s has no journal, but it does exist in the cloud. "
|
||||||
|
" This indicates that an overlapping syncWithJournal() call handled it properly.", key.c_str());
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int err;
|
int err;
|
||||||
boost::shared_array<uint8_t> data;
|
boost::shared_array<uint8_t> data;
|
||||||
size_t count = 0, size = mdEntry.length;
|
size_t count = 0, size = mdEntry.length;
|
||||||
char buf[80];
|
|
||||||
bool oldObjIsCached = cache->exists(key);
|
bool oldObjIsCached = cache->exists(key);
|
||||||
|
|
||||||
// get the base object if it is not already cached
|
// get the base object if it is not already cached
|
||||||
@@ -449,7 +477,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
|
|||||||
if (newFD < 0)
|
if (newFD < 0)
|
||||||
throw runtime_error(string("Synchronizer: Failed to open a new object in local storage! Got ")
|
throw runtime_error(string("Synchronizer: Failed to open a new object in local storage! Got ")
|
||||||
+ strerror_r(errno, buf, 80));
|
+ strerror_r(errno, buf, 80));
|
||||||
ScopedCloser s(newFD);
|
ScopedCloser scloser(newFD);
|
||||||
|
|
||||||
while (count < size)
|
while (count < size)
|
||||||
{
|
{
|
||||||
@@ -473,7 +501,6 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
|
|||||||
|
|
||||||
rename(key, newKey);
|
rename(key, newKey);
|
||||||
ioc->renameObject(key, newKey);
|
ioc->renameObject(key, newKey);
|
||||||
s.unlock();
|
|
||||||
|
|
||||||
// delete the old object & journal file
|
// delete the old object & journal file
|
||||||
cache->deletedJournal(bf::file_size(journalName));
|
cache->deletedJournal(bf::file_size(journalName));
|
||||||
|
|||||||
Reference in New Issue
Block a user