1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-12 11:01:17 +03:00

Wrote simplified versions of Cache::read() and Downloader::download()

to make it easier to find & fix bugs.  Then fixed a bug that
would result in a stalled read().  Later, it will be worth it
re-complicate things to get more lock granulatity.
This commit is contained in:
Patrick LeBlanc
2019-05-03 16:40:41 -05:00
parent 3983247982
commit 09eec724c8
3 changed files with 192 additions and 39 deletions

View File

@@ -91,6 +91,7 @@ Cache::Cache() : currentCacheSize(0)
//cout << "Cache got prefix " << prefix << endl; //cout << "Cache got prefix " << prefix << endl;
downloader.setDownloadPath(prefix.string()); downloader.setDownloadPath(prefix.string());
downloader.useThisLock(&lru_mutex);
stmp = conf->getValue("ObjectStorage", "journal_path"); stmp = conf->getValue("ObjectStorage", "journal_path");
if (stmp.empty()) if (stmp.empty())
@@ -155,6 +156,9 @@ void Cache::populate()
} }
} }
#if 0
/* Need to simplify this, we keep running into sneaky problems, and I just spotted a couple more.
Just going to rewrite it. We can revisit later if the simplified version needs improvement */
void Cache::read(const vector<string> &keys) void Cache::read(const vector<string> &keys)
{ {
/* /*
@@ -178,19 +182,21 @@ void Cache::read(const vector<string> &keys)
// not in the cache, put it in the list to download // not in the cache, put it in the list to download
keysToFetch.push_back(&key); keysToFetch.push_back(&key);
} }
s.unlock(); //s.unlock();
// start downloading the keys to fetch // start downloading the keys to fetch
// TODO: there should be a path for the common case, which is that there is nothing
// to download.
vector<int> dl_errnos; vector<int> dl_errnos;
vector<size_t> sizes; vector<size_t> sizes;
if (!keysToFetch.empty()) if (!keysToFetch.empty())
downloader.download(keysToFetch, &dl_errnos, &sizes); downloader.download(keysToFetch, &dl_errnos, &sizes, lru_mutex);
size_t sum_sizes = 0; size_t sum_sizes = 0;
for (size_t &size : sizes) for (size_t &size : sizes)
sum_sizes += size; sum_sizes += size;
s.lock(); //s.lock();
// do makespace() before downloading. Problem is, until the download is finished, this fcn can't tell which // do makespace() before downloading. Problem is, until the download is finished, this fcn can't tell which
// downloads it was responsible for. Need Downloader to make the call...? // downloads it was responsible for. Need Downloader to make the call...?
_makeSpace(sum_sizes); _makeSpace(sum_sizes);
@@ -209,8 +215,8 @@ void Cache::read(const vector<string> &keys)
else if (dl_errnos[i] == 0) // successful download else if (dl_errnos[i] == 0) // successful download
{ {
lru.push_back(keys[i]); lru.push_back(keys[i]);
LRU_t::iterator it = lru.end(); LRU_t::iterator lit = lru.end();
m_lru.insert(M_LRU_element_t(--it)); m_lru.insert(M_LRU_element_t(--lit));
} }
else else
{ {
@@ -223,6 +229,58 @@ void Cache::read(const vector<string> &keys)
} }
} }
} }
#endif
// new simplified version
void Cache::read(const vector<string> &keys)
{
/* Move existing keys to the back of the LRU, start downloading nonexistant keys.
Going to skip the do-not-evict set in this version, and will assume that the cache
is large enough that an entry at the back of the LRU won't be evicted before the
caller can open the corresponding file
*/
vector<const string *> keysToFetch;
vector<int> dlErrnos;
vector<size_t> dlSizes;
boost::unique_lock<boost::recursive_mutex> s(lru_mutex);
M_LRU_t::iterator mit;
for (const string &key : keys)
{
mit = m_lru.find(key);
if (mit != m_lru.end())
lru.splice(lru.end(), lru, mit->lit);
else
keysToFetch.push_back(&key);
}
if (keysToFetch.empty())
return;
assert(s.owns_lock());
downloader.download(keysToFetch, &dlErrnos, &dlSizes);
assert(s.owns_lock());
size_t sum_sizes = 0;
for (uint i = 0; i < keysToFetch.size(); ++i)
{
// downloads with size 0 didn't actually happen, either because it
// was a preexisting download (another read() call owns it), or because
// there was an error downloading it. Use size == 0 as an indication of
// what to add to the cache
if (dlSizes[i] != 0)
{
sum_sizes += dlSizes[i];
lru.push_back(*keysToFetch[i]);
LRU_t::iterator lit = lru.end();
m_lru.insert(M_LRU_element_t(--lit)); // I dislike this way of grabbing the last iterator in a list.
}
}
// fix cache size
_makeSpace(sum_sizes);
currentCacheSize += sum_sizes;
}
Cache::DNEElement::DNEElement(const LRU_t::iterator &k) : key(k), refCount(1) Cache::DNEElement::DNEElement(const LRU_t::iterator &k) : key(k), refCount(1)
{ {
@@ -235,7 +293,7 @@ void Cache::addToDNE(const LRU_t::iterator &key)
if (it != doNotEvict.end()) if (it != doNotEvict.end())
{ {
DNEElement &dnee = const_cast<DNEElement &>(*it); DNEElement &dnee = const_cast<DNEElement &>(*it);
++dnee.refCount; ++(dnee.refCount);
} }
else else
doNotEvict.insert(e); doNotEvict.insert(e);
@@ -248,7 +306,7 @@ void Cache::removeFromDNE(const LRU_t::iterator &key)
if (it == doNotEvict.end()) if (it == doNotEvict.end())
return; return;
DNEElement &dnee = const_cast<DNEElement &>(*it); DNEElement &dnee = const_cast<DNEElement &>(*it);
if (--dnee.refCount == 0) if (--(dnee.refCount) == 0)
doNotEvict.erase(it); doNotEvict.erase(it);
} }
@@ -368,7 +426,7 @@ void Cache::_makeSpace(size_t size)
assert(currentCacheSize >= (size_t) statbuf.st_size); assert(currentCacheSize >= (size_t) statbuf.st_size);
currentCacheSize -= statbuf.st_size; currentCacheSize -= statbuf.st_size;
thisMuch -= statbuf.st_size; thisMuch -= statbuf.st_size;
logger->log(LOG_WARNING, "Cache: flushing! Try to avoid this, it may deadlock!"); //logger->log(LOG_WARNING, "Cache: flushing! Try to avoid this, it may deadlock!");
Synchronizer::get()->flushObject(*it); Synchronizer::get()->flushObject(*it);
replicator->remove(cachedFile.string().c_str(), Replicator::LOCAL_ONLY); replicator->remove(cachedFile.string().c_str(), Replicator::LOCAL_ONLY);
LRU_t::iterator toRemove = it++; LRU_t::iterator toRemove = it++;

View File

@@ -25,6 +25,7 @@ Downloader::Downloader() : maxDownloads(0)
if (maxDownloads == 0) if (maxDownloads == 0)
maxDownloads = 20; maxDownloads = 20;
workers.setMaxThreads(maxDownloads); workers.setMaxThreads(maxDownloads);
workers.setName("Downloader");
logger = SMLogging::get(); logger = SMLogging::get();
} }
@@ -32,11 +33,21 @@ Downloader::~Downloader()
{ {
} }
inline boost::mutex & Downloader::getDownloadMutex() void Downloader::useThisLock(boost::recursive_mutex *mutex)
{ {
return download_mutex; lock = mutex;
} }
/*
inline boost::mutex * Downloader::getDownloadMutex()
{
return &download_mutex;
}
*/
#if 0
/* This is another fcn with a bug too subtle to identify right now. Writing up a simplified
version; we can revisit later if necessary */
void Downloader::download(const vector<const string *> &keys, vector<int> *errnos, vector<size_t> *sizes) void Downloader::download(const vector<const string *> &keys, vector<int> *errnos, vector<size_t> *sizes)
{ {
uint counter = keys.size(); uint counter = keys.size();
@@ -106,58 +117,133 @@ void Downloader::download(const vector<const string *> &keys, vector<int> *errno
logger->log(LOG_ERR, "Downloader: failed to download %s, got '%s'", keys[i]->c_str(), strerror_r(dl->dl_errno, buf, 80)); logger->log(LOG_ERR, "Downloader: failed to download %s, got '%s'", keys[i]->c_str(), strerror_r(dl->dl_errno, buf, 80));
} }
} }
#endif
void Downloader::download(const vector<const string *> &keys, vector<int> *errnos, vector<size_t> *sizes)
{
uint counter = keys.size();
boost::condition condvar;
DownloadListener listener(&counter, &condvar);
vector<boost::shared_ptr<Download> > ownedDownloads(keys.size());
// the caller is holding a mutex
/* if a key is already being downloaded, attach listener to that Download instance.
if it is not already being downloaded, make a new Download instance.
wait for the listener to tell us that it's done.
*/
for (uint i = 0; i < keys.size(); i++)
{
boost::shared_ptr<Download> newDL(new Download(*keys[i], downloadPath, lock));
auto it = downloads.find(newDL); // kinda sucks to have to search this way.
if (it == downloads.end())
{
newDL->listeners.push_back(&listener);
ownedDownloads[i] = newDL;
downloads.insert(newDL);
workers.addJob(newDL);
}
else
{
//assert((*it)->key == *keys[i]);
//cout << "Waiting for the existing download of " << *keys[i] << endl;
// a Download is technically in the map until all of the Downloads issued by its owner
// have finished. So, we have to test whether this existing download has already finished
// or not before waiting for it. We could have Downloads remove themselves on completion.
// TBD. Need to just get this working first.
if ((*it)->finished)
--counter;
else
(*it)->listeners.push_back(&listener);
}
}
// wait for the downloads to finish
while (counter > 0)
condvar.wait(*lock);
// check success, gather sizes from downloads started by this thread
sizes->resize(keys.size());
errnos->resize(keys.size());
char buf[80];
for (uint i = 0; i < keys.size(); i++)
{
if (ownedDownloads[i])
{
assert(ownedDownloads[i]->finished);
(*sizes)[i] = ownedDownloads[i]->size;
(*errnos)[i] = ownedDownloads[i]->dl_errno;
if ((*errnos)[i])
logger->log(LOG_ERR, "Downloader: failed to download %s, got '%s'", keys[i]->c_str(),
strerror_r((*errnos)[i], buf, 80));
downloads.erase(ownedDownloads[i]);
}
else
{
(*sizes)[i] = 0;
(*errnos)[i] = 0;
}
}
}
void Downloader::setDownloadPath(const string &path) void Downloader::setDownloadPath(const string &path)
{ {
downloadPath = path; downloadPath = path;
} }
inline const string & Downloader::getDownloadPath() const /* The helper fcns */
Downloader::Download::Download(const string &source, const string &_dlPath, boost::recursive_mutex *_lock) :
dlPath(_dlPath), key(source), dl_errno(0), size(0), lock(_lock), finished(false), itRan(false)
{ {
return downloadPath;
} }
/* The helper fcns */ Downloader::Download::~Download()
Downloader::Download::Download(const string *source, Downloader *dl) : dler(dl), key(source), dl_errno(0), size(0)
{ {
assert(!itRan || finished);
} }
void Downloader::Download::operator()() void Downloader::Download::operator()()
{ {
itRan = true;
CloudStorage *storage = CloudStorage::get(); CloudStorage *storage = CloudStorage::get();
int err = storage->getObject(*key, dler->getDownloadPath() + "/" + *key, &size); // TODO: we should have it download to a tmp path, then mv it to the cache dir to avoid
// Cache seeing an incomplete download after a crash
int err = storage->getObject(key, (dlPath / key).string(), &size);
if (err != 0) if (err != 0)
{ {
boost::filesystem::remove(dler->getDownloadPath() + "/" + *key);
size = 0;
dl_errno = errno; dl_errno = errno;
boost::filesystem::remove(dlPath / key);
size = 0;
} }
boost::unique_lock<boost::mutex> s(dler->getDownloadMutex()); lock->lock();
for (auto &listener : listeners) finished = true;
listener->downloadFinished(); for (uint i = 0; i < listeners.size(); i++)
listeners[i]->downloadFinished();
lock->unlock();
} }
//Downloader::DownloadListener::DownloadListener(volatile uint *counter, boost::condition *condvar, boost::mutex *m) : count(counter), cond(condvar), mutex(m) Downloader::DownloadListener::DownloadListener(uint *_counter, boost::condition *condvar) : counter(_counter), cond(condvar)
Downloader::DownloadListener::DownloadListener(uint *counter, boost::condition *condvar, boost::mutex *m) : count(counter), cond(condvar), mutex(m)
{ {
} }
void Downloader::DownloadListener::downloadFinished() void Downloader::DownloadListener::downloadFinished()
{ {
boost::unique_lock<boost::mutex> u(*mutex); (*counter)--;
if (--(*count) == 0) if ((*counter) == 0)
cond->notify_all(); cond->notify_all();
} }
inline size_t Downloader::DLHasher::operator()(const boost::shared_ptr<Download> &d) const inline size_t Downloader::DLHasher::operator()(const boost::shared_ptr<Download> &d) const
{ {
return hash<string>()(*(d->key)); // since the keys start with a uuid, we can probably get away with just returning the first 8 chars
// or as a compromise, hashing only the first X chars. For later.
return hash<string>()(d->key);
} }
inline bool Downloader::DLEquals::operator()(const boost::shared_ptr<Download> &d1, const boost::shared_ptr<Download> &d2) const inline bool Downloader::DLEquals::operator()(const boost::shared_ptr<Download> &d1, const boost::shared_ptr<Download> &d2) const
{ {
return (*(d1->key) == *(d2->key)); return (d1->key == d2->key);
} }
} }

View File

@@ -12,6 +12,7 @@
#include <boost/scoped_ptr.hpp> #include <boost/scoped_ptr.hpp>
#include <boost/thread/mutex.hpp> #include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp> #include <boost/thread/condition.hpp>
#include <boost/filesystem/path.hpp>
namespace storagemanager namespace storagemanager
{ {
@@ -26,33 +27,38 @@ class Downloader
// errors are reported through errnos // errors are reported through errnos
void download(const std::vector<const std::string *> &keys, std::vector<int> *errnos, std::vector<size_t> *sizes); void download(const std::vector<const std::string *> &keys, std::vector<int> *errnos, std::vector<size_t> *sizes);
void setDownloadPath(const std::string &path); void setDownloadPath(const std::string &path);
const std::string & getDownloadPath() const; void useThisLock(boost::recursive_mutex *);
private: private:
uint maxDownloads; uint maxDownloads;
std::string downloadPath; std::string downloadPath;
boost::recursive_mutex *lock;
class DownloadListener class DownloadListener
{ {
public: public:
DownloadListener(uint *counter, boost::condition *condvar, boost::mutex *m); DownloadListener(uint *counter, boost::condition *condvar);
//DownloadListener(volatile uint *counter, boost::condition *condvar, boost::mutex *m);
void downloadFinished(); void downloadFinished();
private: private:
uint *count; uint *counter;
//volatile uint *count;
boost::condition *cond; boost::condition *cond;
boost::mutex *mutex;
}; };
/* Possible optimization. Downloads used to use pointers to strings to avoid an extra copy.
Out of paranoid during debugging, I made it copy the strings instead for a clearer lifecycle.
However, it _should_ be safe to do.
*/
struct Download : public ThreadPool::Job struct Download : public ThreadPool::Job
{ {
Download(const std::string *source, Downloader *); Download(const std::string &source, const std::string &_dlPath, boost::recursive_mutex *_lock);
~Download();
void operator()(); void operator()();
Downloader *dler; boost::filesystem::path dlPath;
const std::string *key; const std::string key;
int dl_errno; // to propagate errors from the download job to the caller int dl_errno; // to propagate errors from the download job to the caller
size_t size; size_t size;
boost::recursive_mutex *lock;
bool finished, itRan;
std::vector<DownloadListener *> listeners; std::vector<DownloadListener *> listeners;
}; };
@@ -68,8 +74,11 @@ class Downloader
typedef std::unordered_set<boost::shared_ptr<Download>, DLHasher, DLEquals> Downloads_t; typedef std::unordered_set<boost::shared_ptr<Download>, DLHasher, DLEquals> Downloads_t;
Downloads_t downloads; Downloads_t downloads;
boost::mutex download_mutex;
boost::mutex &getDownloadMutex(); // something is not working right with this lock design, need to simplify.
// for now, download will use Cache's lock for everything.
//boost::mutex download_mutex;
//boost::mutex *getDownloadMutex();
ThreadPool workers; ThreadPool workers;
CloudStorage *storage; CloudStorage *storage;
SMLogging *logger; SMLogging *logger;