diff --git a/src/Cache.cpp b/src/Cache.cpp index c6044974a..967cf677a 100644 --- a/src/Cache.cpp +++ b/src/Cache.cpp @@ -91,6 +91,7 @@ Cache::Cache() : currentCacheSize(0) //cout << "Cache got prefix " << prefix << endl; downloader.setDownloadPath(prefix.string()); + downloader.useThisLock(&lru_mutex); stmp = conf->getValue("ObjectStorage", "journal_path"); if (stmp.empty()) @@ -155,6 +156,9 @@ void Cache::populate() } } +#if 0 +/* Need to simplify this, we keep running into sneaky problems, and I just spotted a couple more. + Just going to rewrite it. We can revisit later if the simplified version needs improvement */ void Cache::read(const vector &keys) { /* @@ -178,19 +182,21 @@ void Cache::read(const vector &keys) // not in the cache, put it in the list to download keysToFetch.push_back(&key); } - s.unlock(); + //s.unlock(); // start downloading the keys to fetch + // TODO: there should be a path for the common case, which is that there is nothing + // to download. vector dl_errnos; vector sizes; if (!keysToFetch.empty()) - downloader.download(keysToFetch, &dl_errnos, &sizes); + downloader.download(keysToFetch, &dl_errnos, &sizes, lru_mutex); size_t sum_sizes = 0; for (size_t &size : sizes) sum_sizes += size; - s.lock(); + //s.lock(); // do makespace() before downloading. Problem is, until the download is finished, this fcn can't tell which // downloads it was responsible for. Need Downloader to make the call...? _makeSpace(sum_sizes); @@ -209,8 +215,8 @@ void Cache::read(const vector &keys) else if (dl_errnos[i] == 0) // successful download { lru.push_back(keys[i]); - LRU_t::iterator it = lru.end(); - m_lru.insert(M_LRU_element_t(--it)); + LRU_t::iterator lit = lru.end(); + m_lru.insert(M_LRU_element_t(--lit)); } else { @@ -223,6 +229,58 @@ void Cache::read(const vector &keys) } } } +#endif + +// new simplified version +void Cache::read(const vector &keys) +{ + /* Move existing keys to the back of the LRU, start downloading nonexistent keys. 
+ Going to skip the do-not-evict set in this version, and will assume that the cache + is large enough that an entry at the back of the LRU won't be evicted before the + caller can open the corresponding file + */ + vector keysToFetch; + vector dlErrnos; + vector dlSizes; + + boost::unique_lock s(lru_mutex); + + M_LRU_t::iterator mit; + for (const string &key : keys) + { + mit = m_lru.find(key); + if (mit != m_lru.end()) + lru.splice(lru.end(), lru, mit->lit); + else + keysToFetch.push_back(&key); + } + if (keysToFetch.empty()) + return; + + assert(s.owns_lock()); + downloader.download(keysToFetch, &dlErrnos, &dlSizes); + assert(s.owns_lock()); + + size_t sum_sizes = 0; + for (uint i = 0; i < keysToFetch.size(); ++i) + { + // downloads with size 0 didn't actually happen, either because it + // was a preexisting download (another read() call owns it), or because + // there was an error downloading it. Use size == 0 as an indication of + // what to add to the cache + if (dlSizes[i] != 0) + { + sum_sizes += dlSizes[i]; + lru.push_back(*keysToFetch[i]); + LRU_t::iterator lit = lru.end(); + m_lru.insert(M_LRU_element_t(--lit)); // I dislike this way of grabbing the last iterator in a list. 
+ } + } + + // fix cache size + _makeSpace(sum_sizes); + currentCacheSize += sum_sizes; +} Cache::DNEElement::DNEElement(const LRU_t::iterator &k) : key(k), refCount(1) { @@ -235,7 +293,7 @@ void Cache::addToDNE(const LRU_t::iterator &key) if (it != doNotEvict.end()) { DNEElement &dnee = const_cast(*it); - ++dnee.refCount; + ++(dnee.refCount); } else doNotEvict.insert(e); @@ -248,7 +306,7 @@ void Cache::removeFromDNE(const LRU_t::iterator &key) if (it == doNotEvict.end()) return; DNEElement &dnee = const_cast(*it); - if (--dnee.refCount == 0) + if (--(dnee.refCount) == 0) doNotEvict.erase(it); } @@ -368,7 +426,7 @@ void Cache::_makeSpace(size_t size) assert(currentCacheSize >= (size_t) statbuf.st_size); currentCacheSize -= statbuf.st_size; thisMuch -= statbuf.st_size; - logger->log(LOG_WARNING, "Cache: flushing! Try to avoid this, it may deadlock!"); + //logger->log(LOG_WARNING, "Cache: flushing! Try to avoid this, it may deadlock!"); Synchronizer::get()->flushObject(*it); replicator->remove(cachedFile.string().c_str(), Replicator::LOCAL_ONLY); LRU_t::iterator toRemove = it++; diff --git a/src/Downloader.cpp b/src/Downloader.cpp index 2bdeb3277..6a15f5e50 100644 --- a/src/Downloader.cpp +++ b/src/Downloader.cpp @@ -25,6 +25,7 @@ Downloader::Downloader() : maxDownloads(0) if (maxDownloads == 0) maxDownloads = 20; workers.setMaxThreads(maxDownloads); + workers.setName("Downloader"); logger = SMLogging::get(); } @@ -32,11 +33,21 @@ Downloader::~Downloader() { } -inline boost::mutex & Downloader::getDownloadMutex() +void Downloader::useThisLock(boost::recursive_mutex *mutex) { - return download_mutex; + lock = mutex; } +/* +inline boost::mutex * Downloader::getDownloadMutex() +{ + return &download_mutex; +} +*/ + +#if 0 +/* This is another fcn with a bug too subtle to identify right now. 
Writing up a simplified + version; we can revisit later if necessary */ void Downloader::download(const vector &keys, vector *errnos, vector *sizes) { uint counter = keys.size(); @@ -106,58 +117,133 @@ void Downloader::download(const vector &keys, vector *errno logger->log(LOG_ERR, "Downloader: failed to download %s, got '%s'", keys[i]->c_str(), strerror_r(dl->dl_errno, buf, 80)); } } +#endif + +void Downloader::download(const vector &keys, vector *errnos, vector *sizes) +{ + uint counter = keys.size(); + boost::condition condvar; + DownloadListener listener(&counter, &condvar); + vector > ownedDownloads(keys.size()); + + // the caller is holding a mutex + /* if a key is already being downloaded, attach listener to that Download instance. + if it is not already being downloaded, make a new Download instance. + wait for the listener to tell us that it's done. + */ + for (uint i = 0; i < keys.size(); i++) + { + boost::shared_ptr newDL(new Download(*keys[i], downloadPath, lock)); + auto it = downloads.find(newDL); // kinda sucks to have to search this way. + if (it == downloads.end()) + { + newDL->listeners.push_back(&listener); + ownedDownloads[i] = newDL; + downloads.insert(newDL); + workers.addJob(newDL); + } + else + { + //assert((*it)->key == *keys[i]); + //cout << "Waiting for the existing download of " << *keys[i] << endl; + + // a Download is technically in the map until all of the Downloads issued by its owner + // have finished. So, we have to test whether this existing download has already finished + // or not before waiting for it. We could have Downloads remove themselves on completion. + // TBD. Need to just get this working first. 
+ if ((*it)->finished) + --counter; + else + (*it)->listeners.push_back(&listener); + } + } + + // wait for the downloads to finish + while (counter > 0) + condvar.wait(*lock); + + // check success, gather sizes from downloads started by this thread + sizes->resize(keys.size()); + errnos->resize(keys.size()); + char buf[80]; + for (uint i = 0; i < keys.size(); i++) + { + if (ownedDownloads[i]) + { + assert(ownedDownloads[i]->finished); + (*sizes)[i] = ownedDownloads[i]->size; + (*errnos)[i] = ownedDownloads[i]->dl_errno; + if ((*errnos)[i]) + logger->log(LOG_ERR, "Downloader: failed to download %s, got '%s'", keys[i]->c_str(), + strerror_r((*errnos)[i], buf, 80)); + downloads.erase(ownedDownloads[i]); + } + else + { + (*sizes)[i] = 0; + (*errnos)[i] = 0; + } + } +} void Downloader::setDownloadPath(const string &path) { downloadPath = path; } - -inline const string & Downloader::getDownloadPath() const -{ - return downloadPath; -} /* The helper fcns */ -Downloader::Download::Download(const string *source, Downloader *dl) : dler(dl), key(source), dl_errno(0), size(0) +Downloader::Download::Download(const string &source, const string &_dlPath, boost::recursive_mutex *_lock) : + dlPath(_dlPath), key(source), dl_errno(0), size(0), lock(_lock), finished(false), itRan(false) { } +Downloader::Download::~Download() +{ + assert(!itRan || finished); +} + void Downloader::Download::operator()() { + itRan = true; CloudStorage *storage = CloudStorage::get(); - int err = storage->getObject(*key, dler->getDownloadPath() + "/" + *key, &size); + // TODO: we should have it download to a tmp path, then mv it to the cache dir to avoid + // Cache seeing an incomplete download after a crash + int err = storage->getObject(key, (dlPath / key).string(), &size); if (err != 0) { - boost::filesystem::remove(dler->getDownloadPath() + "/" + *key); - size = 0; dl_errno = errno; + boost::filesystem::remove(dlPath / key); + size = 0; } - boost::unique_lock s(dler->getDownloadMutex()); - for (auto 
&listener : listeners) - listener->downloadFinished(); + lock->lock(); + finished = true; + for (uint i = 0; i < listeners.size(); i++) + listeners[i]->downloadFinished(); + lock->unlock(); } -//Downloader::DownloadListener::DownloadListener(volatile uint *counter, boost::condition *condvar, boost::mutex *m) : count(counter), cond(condvar), mutex(m) -Downloader::DownloadListener::DownloadListener(uint *counter, boost::condition *condvar, boost::mutex *m) : count(counter), cond(condvar), mutex(m) +Downloader::DownloadListener::DownloadListener(uint *_counter, boost::condition *condvar) : counter(_counter), cond(condvar) { } void Downloader::DownloadListener::downloadFinished() { - boost::unique_lock u(*mutex); - if (--(*count) == 0) + (*counter)--; + if ((*counter) == 0) cond->notify_all(); } inline size_t Downloader::DLHasher::operator()(const boost::shared_ptr &d) const { - return hash()(*(d->key)); + // since the keys start with a uuid, we can probably get away with just returning the first 8 chars + // or as a compromise, hashing only the first X chars. For later. 
+ return hash()(d->key); } inline bool Downloader::DLEquals::operator()(const boost::shared_ptr &d1, const boost::shared_ptr &d2) const { - return (*(d1->key) == *(d2->key)); + return (d1->key == d2->key); } } diff --git a/src/Downloader.h b/src/Downloader.h index fdf326f0c..e4876a802 100644 --- a/src/Downloader.h +++ b/src/Downloader.h @@ -12,6 +12,7 @@ #include #include #include +#include namespace storagemanager { @@ -26,33 +27,38 @@ class Downloader // errors are reported through errnos void download(const std::vector &keys, std::vector *errnos, std::vector *sizes); void setDownloadPath(const std::string &path); - const std::string & getDownloadPath() const; + void useThisLock(boost::recursive_mutex *); private: uint maxDownloads; std::string downloadPath; + boost::recursive_mutex *lock; class DownloadListener { public: - DownloadListener(uint *counter, boost::condition *condvar, boost::mutex *m); - //DownloadListener(volatile uint *counter, boost::condition *condvar, boost::mutex *m); + DownloadListener(uint *counter, boost::condition *condvar); void downloadFinished(); private: - uint *count; - //volatile uint *count; + uint *counter; boost::condition *cond; - boost::mutex *mutex; }; + /* Possible optimization. Downloads used to use pointers to strings to avoid an extra copy. + Out of paranoia during debugging, I made it copy the strings instead for a clearer lifecycle. + However, it _should_ be safe to do. 
+ */ struct Download : public ThreadPool::Job { - Download(const std::string *source, Downloader *); + Download(const std::string &source, const std::string &_dlPath, boost::recursive_mutex *_lock); + ~Download(); void operator()(); - Downloader *dler; - const std::string *key; + boost::filesystem::path dlPath; + const std::string key; int dl_errno; // to propagate errors from the download job to the caller size_t size; + boost::recursive_mutex *lock; + bool finished, itRan; std::vector listeners; }; @@ -68,8 +74,11 @@ class Downloader typedef std::unordered_set, DLHasher, DLEquals> Downloads_t; Downloads_t downloads; - boost::mutex download_mutex; - boost::mutex &getDownloadMutex(); + + // something is not working right with this lock design, need to simplify. + // for now, download will use Cache's lock for everything. + //boost::mutex download_mutex; + //boost::mutex *getDownloadMutex(); ThreadPool workers; CloudStorage *storage; SMLogging *logger;