1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-18 13:54:11 +03:00

Cleaned up a kludge, and made the retry behavior requeue a job

rather than wait forever to it to work in the same thread.
This commit is contained in:
Patrick LeBlanc
2019-04-08 10:40:28 -05:00
parent 64bbf44227
commit 0be099769b
2 changed files with 39 additions and 36 deletions

View File

@@ -77,7 +77,7 @@ enum OpFlags
void Synchronizer::newJournalEntry(const string &key) void Synchronizer::newJournalEntry(const string &key)
{ {
boost::unique_lock<boost::mutex> s(mutex); boost::unique_lock<boost::recursive_mutex> s(mutex);
auto it = pendingOps.find(key); auto it = pendingOps.find(key);
if (it != pendingOps.end()) if (it != pendingOps.end())
@@ -91,7 +91,7 @@ void Synchronizer::newJournalEntry(const string &key)
void Synchronizer::newObjects(const vector<string> &keys) void Synchronizer::newObjects(const vector<string> &keys)
{ {
boost::unique_lock<boost::mutex> s(mutex); boost::unique_lock<boost::recursive_mutex> s(mutex);
for (const string &key : keys) for (const string &key : keys)
{ {
@@ -103,7 +103,7 @@ void Synchronizer::newObjects(const vector<string> &keys)
void Synchronizer::deletedObjects(const vector<string> &keys) void Synchronizer::deletedObjects(const vector<string> &keys)
{ {
boost::unique_lock<boost::mutex> s(mutex); boost::unique_lock<boost::recursive_mutex> s(mutex);
for (const string &key : keys) for (const string &key : keys)
{ {
@@ -120,7 +120,7 @@ void Synchronizer::deletedObjects(const vector<string> &keys)
void Synchronizer::flushObject(const string &key) void Synchronizer::flushObject(const string &key)
{ {
boost::unique_lock<boost::mutex> s(mutex); boost::unique_lock<boost::recursive_mutex> s(mutex);
// if there is something to do on key, it should be in the objNames list // if there is something to do on key, it should be in the objNames list
// and either in pendingOps or opsInProgress. // and either in pendingOps or opsInProgress.
@@ -135,7 +135,7 @@ void Synchronizer::flushObject(const string &key)
{ {
if (*name == key) if (*name == key)
{ {
process(name, false); process(name);
break; break;
} }
} }
@@ -156,10 +156,10 @@ void Synchronizer::flushObject(const string &key)
return; return;
// check whether this key is in cloud storage // check whether this key is in cloud storage
bool exists; bool keyExists, journalExists;
int err; int err;
do { do {
err = cs->exists(key.c_str(), &exists); err = cs->exists(key.c_str(), &keyExists);
if (err) if (err)
{ {
char buf[80]; char buf[80];
@@ -167,13 +167,23 @@ void Synchronizer::flushObject(const string &key)
sleep(5); sleep(5);
} }
} while (err); } while (err);
if (!exists) journalExists = bf::exists(journalPath/(key + ".journal"));
if (journalExists)
{
logger->log(LOG_DEBUG, "Sync::flushObject(): %s has a journal, "
"and there is no job for it. Merging & uploading now.", key.c_str());
pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(JOURNAL));
objNames.push_front(key);
process(objNames.begin());
}
else if (!keyExists)
{ {
logger->log(LOG_DEBUG, "Sync::flushObject(): %s does not exist in cloud storage, " logger->log(LOG_DEBUG, "Sync::flushObject(): %s does not exist in cloud storage, "
"and there is no job for it. Uploading it now.", key.c_str()); "and there is no job for it. Uploading it now.", key.c_str());
pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(NEW_OBJECT)); pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(NEW_OBJECT));
objNames.push_front(key); objNames.push_front(key);
process(objNames.begin(), true); process(objNames.begin());
} }
} }
@@ -185,7 +195,7 @@ void Synchronizer::makeJob(const string &key)
threadPool.addJob(j); threadPool.addJob(j);
} }
void Synchronizer::process(list<string>::iterator name, bool callerHoldsLock) void Synchronizer::process(list<string>::iterator name)
{ {
/* /*
check if there is a pendingOp for name check if there is a pendingOp for name
@@ -195,12 +205,7 @@ void Synchronizer::process(list<string>::iterator name, bool callerHoldsLock)
if not, return if not, return
*/ */
// had to use this 'callerHoldsLock' kludge to let flush() start processing a job w/o unlocking first boost::unique_lock<boost::recursive_mutex> s(mutex);
// and introducing a race. It means this fcn should not use its own scoped lock. It sucks, need to rework it.
boost::unique_lock<boost::mutex> s(mutex, boost::defer_lock);
if (!callerHoldsLock)
s.lock();
string &key = *name; string &key = *name;
auto it = pendingOps.find(key); auto it = pendingOps.find(key);
@@ -222,11 +227,7 @@ void Synchronizer::process(list<string>::iterator name, bool callerHoldsLock)
opsInProgress[key] = pending; opsInProgress[key] = pending;
pendingOps.erase(it); pendingOps.erase(it);
string sourceFile = MetadataFile::getSourceFromKey(*name); string sourceFile = MetadataFile::getSourceFromKey(*name);
s.unlock();
if (!callerHoldsLock)
s.unlock();
else
mutex.unlock();
bool success = false; bool success = false;
while (!success) while (!success)
@@ -237,7 +238,7 @@ void Synchronizer::process(list<string>::iterator name, bool callerHoldsLock)
// In particular, it's possible that by the time synchronize() runs, // In particular, it's possible that by the time synchronize() runs,
// the file to sync has already been deleted. When one of these functions // the file to sync has already been deleted. When one of these functions
// encounters a state that doesn't make sense, such as being told to upload a file // encounters a state that doesn't make sense, such as being told to upload a file
// that doesn't exist, it will return silently under the assumption that // that doesn't exist, it will return successfully under the assumption that
// things are working as they should upstream, and a syncDelete() call will be coming // things are working as they should upstream, and a syncDelete() call will be coming
// shortly. // shortly.
if (pending->opFlags & DELETE) if (pending->opFlags & DELETE)
@@ -252,18 +253,20 @@ void Synchronizer::process(list<string>::iterator name, bool callerHoldsLock)
success = true; success = true;
} }
catch(exception &e) { catch(exception &e) {
logger->log(LOG_CRIT, "Synchronizer::process(): error sync'ing %s opFlags=%d, got '%s'. Retrying...", key.c_str(), logger->log(LOG_CRIT, "Synchronizer::process(): error sync'ing %s opFlags=%d, got '%s'. Requeueing...", key.c_str(),
pending->opFlags, e.what()); pending->opFlags, e.what());
success = false; success = false;
sleep(1); sleep(1);
// TODO: requeue the job instead of looping infinitely s.lock();
pendingOps[key] = pending;
opsInProgress.erase(key);
makeJob(key);
objNames.erase(name);
return;
} }
} }
if (!callerHoldsLock) s.lock();
s.lock();
else
mutex.lock();
opsInProgress.erase(key); opsInProgress.erase(key);
objNames.erase(name); objNames.erase(name);
@@ -429,7 +432,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
void Synchronizer::rename(const string &oldKey, const string &newKey) void Synchronizer::rename(const string &oldKey, const string &newKey)
{ {
boost::unique_lock<boost::mutex> s(mutex); boost::unique_lock<boost::recursive_mutex> s(mutex);
auto it = pendingOps.find(oldKey); auto it = pendingOps.find(oldKey);
if (it == pendingOps.end()) if (it == pendingOps.end())
@@ -458,14 +461,14 @@ Synchronizer::PendingOps::PendingOps(int flags) : opFlags(flags), finished(false
{ {
} }
void Synchronizer::PendingOps::notify(boost::mutex *m) void Synchronizer::PendingOps::notify(boost::recursive_mutex *m)
{ {
boost::unique_lock<boost::mutex> s(*m); boost::unique_lock<boost::recursive_mutex> s(*m);
finished = true; finished = true;
condvar.notify_all(); condvar.notify_all();
} }
void Synchronizer::PendingOps::wait(boost::mutex *m) void Synchronizer::PendingOps::wait(boost::recursive_mutex *m)
{ {
while (!finished) while (!finished)
condvar.wait(*m); condvar.wait(*m);

View File

@@ -39,7 +39,7 @@ class Synchronizer : public boost::noncopyable
private: private:
Synchronizer(); Synchronizer();
void process(std::list<std::string>::iterator key, bool callerHoldsLock=false); void process(std::list<std::string>::iterator key);
void synchronize(const std::string &sourceFile, std::list<std::string>::iterator &it); void synchronize(const std::string &sourceFile, std::list<std::string>::iterator &it);
void synchronizeDelete(const std::string &sourceFile, std::list<std::string>::iterator &it); void synchronizeDelete(const std::string &sourceFile, std::list<std::string>::iterator &it);
void synchronizeWithJournal(const std::string &sourceFile, std::list<std::string>::iterator &it); void synchronizeWithJournal(const std::string &sourceFile, std::list<std::string>::iterator &it);
@@ -53,8 +53,8 @@ class Synchronizer : public boost::noncopyable
int opFlags; int opFlags;
bool finished; bool finished;
boost::condition condvar; boost::condition condvar;
void wait(boost::mutex *); void wait(boost::recursive_mutex *);
void notify(boost::mutex *); void notify(boost::recursive_mutex *);
}; };
struct Job : public ThreadPool::Job struct Job : public ThreadPool::Job
@@ -84,7 +84,7 @@ class Synchronizer : public boost::noncopyable
boost::filesystem::path cachePath; boost::filesystem::path cachePath;
boost::filesystem::path journalPath; boost::filesystem::path journalPath;
boost::mutex mutex; boost::recursive_mutex mutex;
}; };
} }