You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-12-15 12:09:09 +03:00
Fixed a bug in Sync where we were using an object through a
reference to a shared pointer. A deref of the shared ptr could trigger the dtor of the object while threads are waiting. Fixed that by properly copying the shared ptr.
This commit is contained in:
@@ -150,8 +150,10 @@ void Synchronizer::flushObject(const string &key)
|
|||||||
{
|
{
|
||||||
auto op = opsInProgress.find(key);
|
auto op = opsInProgress.find(key);
|
||||||
// it's already in progress
|
// it's already in progress
|
||||||
if (op != opsInProgress.end())
|
if (op != opsInProgress.end()) {
|
||||||
op->second->wait(&mutex);
|
boost::shared_ptr<PendingOps> tmp = op->second;
|
||||||
|
tmp->wait(&mutex);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// it's not in either one, trigger existence check
|
// it's not in either one, trigger existence check
|
||||||
@@ -237,7 +239,8 @@ void Synchronizer::process(list<string>::iterator name)
|
|||||||
// it's already in progress
|
// it's already in progress
|
||||||
if (op != opsInProgress.end())
|
if (op != opsInProgress.end())
|
||||||
{
|
{
|
||||||
op->second->wait(&mutex);
|
boost::shared_ptr<PendingOps> tmp = op->second;
|
||||||
|
tmp->wait(&mutex);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@@ -246,9 +249,12 @@ void Synchronizer::process(list<string>::iterator name)
|
|||||||
}
|
}
|
||||||
|
|
||||||
boost::shared_ptr<PendingOps> pending = it->second;
|
boost::shared_ptr<PendingOps> pending = it->second;
|
||||||
opsInProgress[key] = pending;
|
bool inserted = opsInProgress.insert(*it).second;
|
||||||
pendingOps.erase(it);
|
if (!inserted)
|
||||||
|
return; // the one in pending will have to wait until the next time to avoid clobbering waiting threads
|
||||||
|
//opsInProgress[key] = it->second;
|
||||||
string sourceFile = MetadataFile::getSourceFromKey(*name);
|
string sourceFile = MetadataFile::getSourceFromKey(*name);
|
||||||
|
pendingOps.erase(it);
|
||||||
s.unlock();
|
s.unlock();
|
||||||
|
|
||||||
bool success = false;
|
bool success = false;
|
||||||
@@ -271,11 +277,12 @@ void Synchronizer::process(list<string>::iterator name)
|
|||||||
synchronize(sourceFile, name);
|
synchronize(sourceFile, name);
|
||||||
else
|
else
|
||||||
throw logic_error("Synchronizer::process(): got an unknown op flag");
|
throw logic_error("Synchronizer::process(): got an unknown op flag");
|
||||||
pending->notify(&mutex);
|
s.lock();
|
||||||
|
pending->notify();
|
||||||
success = true;
|
success = true;
|
||||||
}
|
}
|
||||||
catch(exception &e) {
|
catch(exception &e) {
|
||||||
logger->log(LOG_CRIT, "Synchronizer::process(): error sync'ing %s opFlags=%d, got '%s'. Requeueing...", key.c_str(),
|
logger->log(LOG_CRIT, "Synchronizer::process(): error sync'ing %s opFlags=%d, got '%s'. Retrying...", key.c_str(),
|
||||||
pending->opFlags, e.what());
|
pending->opFlags, e.what());
|
||||||
success = false;
|
success = false;
|
||||||
sleep(1);
|
sleep(1);
|
||||||
@@ -295,8 +302,6 @@ void Synchronizer::process(list<string>::iterator name)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s.lock();
|
|
||||||
|
|
||||||
opsInProgress.erase(key);
|
opsInProgress.erase(key);
|
||||||
objNames.erase(name);
|
objNames.erase(name);
|
||||||
@@ -503,13 +508,17 @@ bf::path Synchronizer::getCachePath()
|
|||||||
|
|
||||||
/* The helper objects & fcns */
|
/* The helper objects & fcns */
|
||||||
|
|
||||||
Synchronizer::PendingOps::PendingOps(int flags) : opFlags(flags), finished(false)
|
Synchronizer::PendingOps::PendingOps(int flags) : opFlags(flags), waiters(0), finished(false)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
void Synchronizer::PendingOps::notify(boost::recursive_mutex *m)
|
Synchronizer::PendingOps::~PendingOps()
|
||||||
|
{
|
||||||
|
assert(waiters == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
void Synchronizer::PendingOps::notify()
|
||||||
{
|
{
|
||||||
boost::unique_lock<boost::recursive_mutex> s(*m);
|
|
||||||
finished = true;
|
finished = true;
|
||||||
condvar.notify_all();
|
condvar.notify_all();
|
||||||
}
|
}
|
||||||
@@ -517,7 +526,11 @@ void Synchronizer::PendingOps::notify(boost::recursive_mutex *m)
|
|||||||
void Synchronizer::PendingOps::wait(boost::recursive_mutex *m)
|
void Synchronizer::PendingOps::wait(boost::recursive_mutex *m)
|
||||||
{
|
{
|
||||||
while (!finished)
|
while (!finished)
|
||||||
|
{
|
||||||
|
waiters++;
|
||||||
condvar.wait(*m);
|
condvar.wait(*m);
|
||||||
|
waiters--;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -51,11 +51,13 @@ class Synchronizer : public boost::noncopyable
|
|||||||
struct PendingOps
|
struct PendingOps
|
||||||
{
|
{
|
||||||
PendingOps(int flags);
|
PendingOps(int flags);
|
||||||
|
~PendingOps();
|
||||||
int opFlags;
|
int opFlags;
|
||||||
|
int waiters;
|
||||||
bool finished;
|
bool finished;
|
||||||
boost::condition condvar;
|
boost::condition condvar;
|
||||||
void wait(boost::recursive_mutex *);
|
void wait(boost::recursive_mutex *);
|
||||||
void notify(boost::recursive_mutex *);
|
void notify();
|
||||||
};
|
};
|
||||||
|
|
||||||
struct Job : public ThreadPool::Job
|
struct Job : public ThreadPool::Job
|
||||||
|
|||||||
Reference in New Issue
Block a user