mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git

Untangled cache flushing behavior such that it should no longer deadlock.

Now, something that grabs the sync or cache locks should never block
while holding the lock.  Need to review the code to make sure of that.

Also made the recursive mutexes non-recursive again.
Patrick LeBlanc
2019-05-24 13:02:09 -05:00
parent b7b9ce9f93
commit 0cad8308f1
7 changed files with 110 additions and 64 deletions
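
The pattern this commit adopts everywhere a blocking call used to sit under lru_mutex: publish intent while locked, release the lock across the blocking call, then reacquire and revalidate before acting. A minimal standalone sketch of that pattern (illustrative names, not the code in this diff; slowFlush() is a hypothetical stand-in for the blocking work):

#include <boost/thread/mutex.hpp>
#include <set>
#include <string>

boost::mutex lru_mutex;             // plain mutex, no longer recursive
std::set<std::string> toBeDeleted;  // shared state guarded by lru_mutex

// hypothetical stand-in for work that can block, e.g. a cloud flush
void slowFlush(const std::string &key) { (void) key; /* network I/O */ }

void evict(const std::string &key)
{
    boost::unique_lock<boost::mutex> lock(lru_mutex);
    toBeDeleted.insert(key);        // publish intent while locked
    lock.unlock();                  // never block while holding the lock
    slowFlush(key);                 // may take arbitrarily long
    lock.lock();
    // revalidate: a concurrent reader may have rescued the key meanwhile
    if (toBeDeleted.erase(key) > 0)
    {
        // still ours: safe to delete the cached file here
    }
}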

Cache.cpp

@@ -166,7 +166,7 @@ void Cache::read(const vector<string> &keys)
fetch keys that do not exist
after fetching, move all keys from do-not-evict to the back of the LRU
*/
-boost::unique_lock<boost::recursive_mutex> s(lru_mutex);
+boost::unique_lock<boost::mutex> s(lru_mutex);
vector<const string *> keysToFetch;
uint i;
@@ -243,14 +243,23 @@ void Cache::read(const vector<string> &keys)
vector<int> dlErrnos;
vector<size_t> dlSizes;
-boost::unique_lock<boost::recursive_mutex> s(lru_mutex);
+boost::unique_lock<boost::mutex> s(lru_mutex);
M_LRU_t::iterator mit;
for (const string &key : keys)
{
mit = m_lru.find(key);
if (mit != m_lru.end())
{
lru.splice(lru.end(), lru, mit->lit);
+// if it's about to be deleted, stop that
+TBD_t::iterator tbd_it = toBeDeleted.find(mit->lit);
+if (tbd_it != toBeDeleted.end())
+{
+cout << "Saved one from being deleted" << endl;
+toBeDeleted.erase(tbd_it);
+}
}
else
keysToFetch.push_back(&key);
}
@@ -323,20 +332,20 @@ const bf::path & Cache::getJournalPath()
void Cache::exists(const vector<string> &keys, vector<bool> *out) const
{
out->resize(keys.size());
-boost::unique_lock<boost::recursive_mutex> s(lru_mutex);
+boost::unique_lock<boost::mutex> s(lru_mutex);
for (uint i = 0; i < keys.size(); i++)
(*out)[i] = (m_lru.find(keys[i]) != m_lru.end());
}
bool Cache::exists(const string &key) const
{
-boost::unique_lock<boost::recursive_mutex> s(lru_mutex);
+boost::unique_lock<boost::mutex> s(lru_mutex);
return m_lru.find(key) != m_lru.end();
}
void Cache::newObject(const string &key, size_t size)
{
-boost::unique_lock<boost::recursive_mutex> s(lru_mutex);
+boost::unique_lock<boost::mutex> s(lru_mutex);
assert(m_lru.find(key) == m_lru.end());
_makeSpace(size);
lru.push_back(key);
@@ -347,21 +356,21 @@ void Cache::newObject(const string &key, size_t size)
void Cache::newJournalEntry(size_t size)
{
-boost::unique_lock<boost::recursive_mutex> s(lru_mutex);
+boost::unique_lock<boost::mutex> s(lru_mutex);
_makeSpace(size);
currentCacheSize += size;
}
void Cache::deletedJournal(size_t size)
{
-boost::unique_lock<boost::recursive_mutex> s(lru_mutex);
+boost::unique_lock<boost::mutex> s(lru_mutex);
assert(currentCacheSize >= size);
currentCacheSize -= size;
}
void Cache::deletedObject(const string &key, size_t size)
{
-boost::unique_lock<boost::recursive_mutex> s(lru_mutex);
+boost::unique_lock<boost::mutex> s(lru_mutex);
assert(currentCacheSize >= size);
M_LRU_t::iterator mit = m_lru.find(key);
assert(mit != m_lru.end()); // TODO: 5/16/19 - got this assertion using S3 by running test000, then test000 again.
@@ -373,7 +382,7 @@ void Cache::deletedObject(const string &key, size_t size)
void Cache::setMaxCacheSize(size_t size)
{
-boost::unique_lock<boost::recursive_mutex> s(lru_mutex);
+boost::unique_lock<boost::mutex> s(lru_mutex);
if (size < maxCacheSize)
_makeSpace(maxCacheSize - size);
maxCacheSize = size;
@@ -381,7 +390,7 @@ void Cache::setMaxCacheSize(size_t size)
void Cache::makeSpace(size_t size)
{
-boost::unique_lock<boost::recursive_mutex> s(lru_mutex);
+boost::unique_lock<boost::mutex> s(lru_mutex);
_makeSpace(size);
}
@@ -397,55 +406,69 @@ void Cache::_makeSpace(size_t size)
if (thisMuch <= 0)
return;
struct stat statbuf;
-LRU_t::iterator it = lru.begin();
-while (it != lru.end() && thisMuch > 0)
+LRU_t::iterator it;
+while (thisMuch > 0 && !lru.empty())
{
-if (doNotEvict.find(it) != doNotEvict.end())
+it = lru.begin();
+// find the first element not being either read() right now or being processed by another
+// makeSpace() call.
+while (it != lru.end())
{
+if ((doNotEvict.find(it) == doNotEvict.end()) && (toBeDeleted.find(it) == toBeDeleted.end()))
+break;
++it;
-continue; // it's in the do-not-evict list
}
bf::path cachedFile = prefix / *it;
int err = stat(cachedFile.string().c_str(), &statbuf);
if (err)
{
logger->log(LOG_WARNING, "Cache::makeSpace(): There seems to be a cached file that couldn't be stat'ed: %s",
cachedFile.string().c_str());
++it;
continue;
}
assert(bf::exists(cachedFile));
/*
tell Synchronizer that this key will be evicted
delete the file
remove it from our structs
update current size
*/
-assert(currentCacheSize >= (size_t) statbuf.st_size);
-currentCacheSize -= statbuf.st_size;
-thisMuch -= statbuf.st_size;
//logger->log(LOG_WARNING, "Cache: flushing!");
+toBeDeleted.insert(it);
+lru_mutex.unlock();
try
{
Synchronizer::get()->flushObject(*it);
+cachedFile = prefix / *it; // Sync may have renamed it
}
catch (...)
{
// it gets logged by Sync
+lru_mutex.lock();
+toBeDeleted.erase(it);
+continue;
}
+lru_mutex.lock();
+TBD_t::iterator tbd_it = toBeDeleted.find(it);
+if (tbd_it != toBeDeleted.end())
+{
+// if it's still in toBeDeleted then it is safe to delete.
+// if read() happened to access it while it was flushing, it will not
+// be in that set.
+cachedFile = prefix / *it;
+toBeDeleted.erase(tbd_it);
+m_lru.erase(*it);
+lru.erase(it);
+size_t newSize = bf::file_size(cachedFile);
replicator->remove(cachedFile, Replicator::LOCAL_ONLY);
-LRU_t::iterator toRemove = it++;
-m_lru.erase(*toRemove);
-lru.erase(toRemove);
+currentCacheSize -= newSize;
+thisMuch -= newSize;
+}
}
}
void Cache::rename(const string &oldKey, const string &newKey, ssize_t sizediff)
{
-boost::unique_lock<boost::recursive_mutex> s(lru_mutex);
+boost::unique_lock<boost::mutex> s(lru_mutex);
auto it = m_lru.find(oldKey);
//assert(it != m_lru.end());
if (it == m_lru.end())
{
logger->log(LOG_WARNING, "Cache: was told to rename %s, but it is not in the cache", oldKey.c_str());
return;
}
auto lit = it->lit;
m_lru.erase(it);
@@ -459,7 +482,7 @@ int Cache::ifExistsThenDelete(const string &key)
bf::path cachedPath = prefix / key;
bf::path journalPath = journalPrefix / (key + ".journal");
-boost::unique_lock<boost::recursive_mutex> s(lru_mutex);
+boost::unique_lock<boost::mutex> s(lru_mutex);
bool objectExists = false;
bool journalExists = bf::exists(journalPath);
@@ -489,14 +512,14 @@ size_t Cache::getCurrentCacheSize() const
size_t Cache::getCurrentCacheElementCount() const
{
-boost::unique_lock<boost::recursive_mutex> s(lru_mutex);
+boost::unique_lock<boost::mutex> s(lru_mutex);
assert(m_lru.size() == lru.size());
return m_lru.size();
}
void Cache::reset()
{
-boost::unique_lock<boost::recursive_mutex> s(lru_mutex);
+boost::unique_lock<boost::mutex> s(lru_mutex);
m_lru.clear();
lru.clear();
@@ -541,6 +564,11 @@ inline bool Cache::DNEEquals::operator()(const DNEElement &l1, const DNEElement
return (*(l1.key) == *(l2.key));
}
+inline bool Cache::TBDLess::operator()(const LRU_t::iterator &i1, const LRU_t::iterator &i2) const
+{
+return *i1 < *i2;
+}
}
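
A note on the TBDLess comparator just added above: toBeDeleted is a std::set of std::list iterators, and list iterators are bidirectional with no operator<, so the set needs an explicit ordering. Ordering by the pointed-to key works because LRU keys are unique and list iterators stay valid (still referring to the same element) across splice() and erasure of other elements. A self-contained sketch of the same idea:

#include <list>
#include <set>
#include <string>

typedef std::list<std::string> LRU_t;

struct TBDLess
{
    // order iterators by the keys they refer to; keys are unique in the
    // LRU, so this is a strict weak ordering over live iterators
    bool operator()(const LRU_t::iterator &a, const LRU_t::iterator &b) const
    {
        return *a < *b;
    }
};

typedef std::set<LRU_t::iterator, TBDLess> TBD_t;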

Cache.h

@@ -93,7 +93,7 @@ class Cache : public boost::noncopyable
typedef std::unordered_set<M_LRU_element_t, KeyHasher, KeyEquals> M_LRU_t;
M_LRU_t m_lru; // the LRU entries as a hash table
-/* The do-not-evict list stuff */
+/* The do-not-evict list stuff. Unused at the moment for simplicity. */
struct DNEElement
{
DNEElement(const LRU_t::iterator &);
@@ -115,7 +115,21 @@ class Cache : public boost::noncopyable
DNE_t doNotEvict;
void addToDNE(const LRU_t::iterator &key);
void removeFromDNE(const LRU_t::iterator &key);
-mutable boost::recursive_mutex lru_mutex; // protects the main cache structures & the do-not-evict set
+// the to-be-deleted set. Elements removed from the LRU but not yet deleted will be here.
+// Elements are inserted and removed by makeSpace(). If read() references a file that is in this,
+// it will remove it, signalling to makeSpace that it should not be deleted
+struct TBDLess
+{
+bool operator()(const LRU_t::iterator &, const LRU_t::iterator &) const;
+};
+typedef std::set<LRU_t::iterator, TBDLess> TBD_t;
+TBD_t toBeDeleted;
+mutable boost::mutex lru_mutex; // protects the main cache structures & the do-not-evict set
};
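
The comment on toBeDeleted above describes a small handshake: makeSpace() inserts an entry into the set and drops lru_mutex while flushing; if read() touches that entry during the window, it erases it from the set, and makeSpace() only deletes the file if the entry is still present after relocking. A sketch of the read()-side half, using the LRU_t/TBD_t shapes above (an illustrative helper, not the committed code; the caller already holds the lock):

// called with lru_mutex held, for a key that was found in the cache
void touchForRead(LRU_t &lru, TBD_t &toBeDeleted, LRU_t::iterator lit)
{
    lru.splice(lru.end(), lru, lit);  // move to the most-recently-used end
    TBD_t::iterator tbd = toBeDeleted.find(lit);
    if (tbd != toBeDeleted.end())
        toBeDeleted.erase(tbd);       // rescue: makeSpace() will skip the delete
}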

Downloader.cpp

@@ -33,7 +33,7 @@ Downloader::~Downloader()
{
}
-void Downloader::useThisLock(boost::recursive_mutex *mutex)
+void Downloader::useThisLock(boost::mutex *mutex)
{
lock = mutex;
}
@@ -192,7 +192,7 @@ void Downloader::setDownloadPath(const string &path)
}
/* The helper fcns */
-Downloader::Download::Download(const string &source, const string &_dlPath, boost::recursive_mutex *_lock) :
+Downloader::Download::Download(const string &source, const string &_dlPath, boost::mutex *_lock) :
dlPath(_dlPath), key(source), dl_errno(0), size(0), lock(_lock), finished(false), itRan(false)
{
}

Downloader.h

@@ -27,12 +27,12 @@ class Downloader
// errors are reported through errnos
void download(const std::vector<const std::string *> &keys, std::vector<int> *errnos, std::vector<size_t> *sizes);
void setDownloadPath(const std::string &path);
-void useThisLock(boost::recursive_mutex *);
+void useThisLock(boost::mutex *);
private:
uint maxDownloads;
std::string downloadPath;
-boost::recursive_mutex *lock;
+boost::mutex *lock;
class DownloadListener
{
@@ -50,14 +50,14 @@ class Downloader
*/
struct Download : public ThreadPool::Job
{
-Download(const std::string &source, const std::string &_dlPath, boost::recursive_mutex *_lock);
+Download(const std::string &source, const std::string &_dlPath, boost::mutex *_lock);
~Download();
void operator()();
boost::filesystem::path dlPath;
const std::string key;
int dl_errno; // to propagate errors from the download job to the caller
size_t size;
-boost::recursive_mutex *lock;
+boost::mutex *lock;
bool finished, itRan;
std::vector<DownloadListener *> listeners;
};

IOCoordinator.cpp

@@ -565,7 +565,7 @@ int IOCoordinator::truncate(const char *path, size_t newSize)
else
{
meta.updateEntryLength(objects[0].offset, newSize - objects[0].offset);
-assert(objects[0].offset >= 0 && objects[0].offset < (newSize - objects[0].offset));
+assert(objects[0].offset >= 0 && objects[0].length > (newSize - objects[0].offset));
}
for (uint i = 1; i < objects.size(); i++)
meta.removeEntry(objects[i].offset);

Synchronizer.cpp

@@ -67,7 +67,7 @@ Synchronizer::~Synchronizer()
or save the list it's working on.....
For milestone 2, this will do the safe thing and finish working first.
Later we can get fancy. */
-boost::unique_lock<boost::recursive_mutex> lock(mutex);
+boost::unique_lock<boost::mutex> lock(mutex);
die = true;
syncThread.interrupt();
lock.unlock();
@@ -84,7 +84,7 @@ enum OpFlags
void Synchronizer::newJournalEntry(const string &key)
{
-boost::unique_lock<boost::recursive_mutex> s(mutex);
+boost::unique_lock<boost::mutex> s(mutex);
auto it = pendingOps.find(key);
if (it != pendingOps.end())
@@ -98,7 +98,7 @@ void Synchronizer::newJournalEntry(const string &key)
void Synchronizer::newObjects(const vector<string> &keys)
{
-boost::unique_lock<boost::recursive_mutex> s(mutex);
+boost::unique_lock<boost::mutex> s(mutex);
for (const string &key : keys)
{
@@ -110,7 +110,7 @@ void Synchronizer::newObjects(const vector<string> &keys)
void Synchronizer::deletedObjects(const vector<string> &keys)
{
-boost::unique_lock<boost::recursive_mutex> s(mutex);
+boost::unique_lock<boost::mutex> s(mutex);
for (const string &key : keys)
{
@@ -127,7 +127,7 @@ void Synchronizer::deletedObjects(const vector<string> &keys)
void Synchronizer::flushObject(const string &key)
{
-boost::unique_lock<boost::recursive_mutex> s(mutex);
+boost::unique_lock<boost::mutex> s(mutex);
// if there is something to do on key, it should be either in pendingOps or opsInProgress
// if it is in pendingOps, start the job now. If it is in progress, wait for it to finish.
@@ -139,7 +139,10 @@ void Synchronizer::flushObject(const string &key)
if (it != pendingOps.end())
{
objNames.push_front(key);
-process(objNames.begin());
+auto nameIt = objNames.begin();
+s.unlock();
+process(nameIt);
+s.lock();
}
else
{
@@ -193,7 +196,7 @@ void Synchronizer::flushObject(const string &key)
void Synchronizer::periodicSync()
{
-boost::unique_lock<boost::recursive_mutex> lock(mutex);
+boost::unique_lock<boost::mutex> lock(mutex);
while (!die)
{
lock.unlock();
@@ -224,7 +227,7 @@ void Synchronizer::process(list<string>::iterator name)
if not, return
*/
-boost::unique_lock<boost::recursive_mutex> s(mutex);
+boost::unique_lock<boost::mutex> s(mutex);
string &key = *name;
auto it = pendingOps.find(key);
@@ -259,6 +262,7 @@ void Synchronizer::process(list<string>::iterator name)
bool success = false;
while (!success)
{
+assert(!s.owns_lock());
try {
// Exceptions should only happen b/c of cloud service errors that can't be retried.
// This code is intentionally racy to avoid having to grab big locks.
@@ -341,7 +345,7 @@ void Synchronizer::synchronizeDelete(const string &sourceFile, list<string>::ite
cs->deleteObject(*it);
// delete any pending jobs for *it. There shouldn't be any, this is out of pure paranoia.
-boost::unique_lock<boost::recursive_mutex> sc(mutex);
+boost::unique_lock<boost::mutex> sc(mutex);
pendingOps.erase(*it);
}
@@ -518,7 +522,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
void Synchronizer::rename(const string &oldKey, const string &newKey)
{
-boost::unique_lock<boost::recursive_mutex> s(mutex);
+boost::unique_lock<boost::mutex> s(mutex);
auto it = pendingOps.find(oldKey);
if (it == pendingOps.end())
@@ -558,7 +562,7 @@ void Synchronizer::PendingOps::notify()
condvar.notify_all();
}
-void Synchronizer::PendingOps::wait(boost::recursive_mutex *m)
+void Synchronizer::PendingOps::wait(boost::mutex *m)
{
while (!finished)
{

Synchronizer.h

@@ -56,7 +56,7 @@ class Synchronizer : public boost::noncopyable
int waiters;
bool finished;
boost::condition condvar;
-void wait(boost::recursive_mutex *);
+void wait(boost::mutex *);
void notify();
};
@@ -94,7 +94,7 @@ class Synchronizer : public boost::noncopyable
boost::filesystem::path cachePath;
boost::filesystem::path journalPath;
-boost::recursive_mutex mutex;
+boost::mutex mutex;
};
}
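
The wait() signature change in PendingOps also removes a classic hazard: boost::condition (condition_variable_any) releases its lockable exactly one level, so a waiter holding a recursive_mutex more than once would sleep with the lock still held, stalling every other thread. A sketch of a wait/notify pair over a plain boost::mutex (the full wait() body is not shown in this diff, so this is a plausible shape, not the committed code):

#include <boost/thread/condition.hpp>
#include <boost/thread/mutex.hpp>

struct PendingOps
{
    int waiters = 0;
    bool finished = false;
    boost::condition condvar;

    // caller holds *m; condition_variable_any can wait directly on a
    // boost::mutex, unlocking it while asleep and relocking on wakeup
    void wait(boost::mutex *m)
    {
        while (!finished)
        {
            waiters++;
            condvar.wait(*m);
            waiters--;
        }
    }

    void notify()
    {
        finished = true;
        condvar.notify_all();
    }
};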