diff --git a/src/Cache.cpp b/src/Cache.cpp
index 6693a0fb1..280e89e25 100644
--- a/src/Cache.cpp
+++ b/src/Cache.cpp
@@ -132,7 +132,7 @@ void Cache::populate()
             lru.push_back(p.filename().string());
             auto last = lru.end();
             m_lru.insert(--last);
-            currentCacheSize += bf::file_size(p);
+            currentCacheSize += bf::file_size(*dir);
         }
         else
             logger->log(LOG_WARNING, "Cache: found something in the cache that does not belong '%s'", p.string().c_str());
@@ -147,7 +147,7 @@ void Cache::populate()
         if (bf::is_regular_file(p))
         {
             if (p.extension() == ".journal")
-                currentCacheSize += bf::file_size(p);
+                currentCacheSize += bf::file_size(*dir);
             else
                 logger->log(LOG_WARNING, "Cache: found a file in the journal dir that does not belong '%s'", p.string().c_str());
         }
@@ -286,8 +286,17 @@ void Cache::read(const vector<string> &keys)
         }
         else
         {
+            // There's a window where the file has been downloaded but is not yet
+            // added to the LRU structs.  However, it is in the DNE, and if it is in the DNE, then it is also
+            // in Downloader's map.  So, this thread needs to start the download if the key is not in the
+            // DNE, or if there's an existing download that hasn't finished yet.  Starting the download
+            // includes waiting for an existing download to finish, which from this class's point of view is the
+            // same thing.
+            if (doNotEvict.find(key) == doNotEvict.end() || downloader.inProgress(key))
+                keysToFetch.push_back(&key);
+            else
+                cout << "Cache: detected and stopped a racy download" << endl;
             addToDNE(key);
-            keysToFetch.push_back(&key);
         }
     }
     if (keysToFetch.empty())
@@ -301,13 +310,22 @@ void Cache::read(const vector<string> &keys)
         // downloads with size 0 didn't actually happen, either because it
         // was a preexisting download (another read() call owns it), or because
         // there was an error downloading it.  Use size == 0 as an indication of
-        // what to add to the cache
+        // what to add to the cache.  We also need to verify that the file was not deleted
+        // in the meantime; continued presence in doNotEvict indicates that it was not.
         if (dlSizes[i] != 0)
         {
-            sum_sizes += dlSizes[i];
-            lru.push_back(*keysToFetch[i]);
-            LRU_t::iterator lit = lru.end();
-            m_lru.insert(--lit);   // I dislike this way of grabbing the last iterator in a list.
+            if (doNotEvict.find(*keysToFetch[i]) != doNotEvict.end())
+            {
+                sum_sizes += dlSizes[i];
+                lru.push_back(*keysToFetch[i]);
+                LRU_t::iterator lit = lru.end();
+                m_lru.insert(--lit);   // I dislike this way of grabbing the last iterator in a list.
+            }
+            else   // it was downloaded, but a deletion happened in the meantime, so we have to toss it
+            {
+                cout << "removing a file that was deleted by another thread during download" << endl;
+                bf::remove(prefix / (*keysToFetch[i]));
+            }
         }
     }
@@ -329,9 +347,14 @@ void Cache::doneReading(const vector<string> &keys)
     boost::unique_lock<boost::mutex> s(lru_mutex);
     for (const string &key : keys)
     {
-        const auto &it = m_lru.find(key);
-        if (it != m_lru.end())
-            removeFromDNE(it->lit);
+        removeFromDNE(key);
+        // Most keys should be in the map.
+        // It's debatable whether it's faster to look up the list iterator and use it,
+        // or whether it's faster to bypass that and use strings only.
+
+        //const auto &it = m_lru.find(key);
+        //if (it != m_lru.end())
+        //    removeFromDNE(it->lit);
     }
     _makeSpace(0);
 }
@@ -588,12 +611,6 @@ int Cache::ifExistsThenDelete(const string &key)
     auto it = m_lru.find(key);
     if (it != m_lru.end())
     {
-        if (doNotEvict.find(it->lit) != doNotEvict.end())
-        {
-            cout << "almost deleted a file being read, are we really doing that somewhere?" << endl;
-            return 0;
-        }
-
         if (toBeDeleted.find(it->lit) == toBeDeleted.end())
         {
             doNotEvict.erase(it->lit);
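Note on the new branch in Cache::read(): the race it closes is that thread A finishes downloading a key and has marked it in the DNE (do-not-evict) set, but has not yet inserted it into the LRU structs, so thread B's m_lru lookup misses and B would previously have re-downloaded the file. Below is a minimal standalone sketch of just that decision; doNotEvict and downloadInProgress() are stand-ins for the members the patch actually uses (the DNE set and Downloader::inProgress()), not the real definitions:

#include <iostream>
#include <set>
#include <string>

// Stand-ins for the members Cache::read() consults in the patch.
static std::set<std::string> doNotEvict;             // keys that must not be evicted (DNE)
static bool downloadInProgress(const std::string &)  // stand-in for Downloader::inProgress()
{
    return false;  // pretend no download is currently running
}

// Decide whether this thread should initiate (or wait on) the download of key.
// If the key is in the DNE and its download has finished, another thread has
// fully downloaded it but not yet inserted it into the LRU structs; fetching
// it again would be the "racy download" the patch detects and stops.
bool shouldFetch(const std::string &key)
{
    return doNotEvict.count(key) == 0 || downloadInProgress(key);
}

int main()
{
    doNotEvict.insert("obj1");
    std::cout << "obj1: " << shouldFetch("obj1") << "\n";  // 0: already downloaded, skip
    std::cout << "obj2: " << shouldFetch("obj2") << "\n";  // 1: not in DNE, fetch it
}

A key with an unfinished download still goes to keysToFetch because Downloader::download() doubles as "wait for the existing download to finish", which is what the comment means by the two cases being the same thing from Cache's point of view.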
diff --git a/src/Downloader.cpp b/src/Downloader.cpp
index ae988f0bb..6192c9fe5 100644
--- a/src/Downloader.cpp
+++ b/src/Downloader.cpp
@@ -186,6 +186,15 @@ void Downloader::download(const vector<const string *> &keys, vector<int> *errno
     }
 }
 
+bool Downloader::inProgress(const string &key)
+{
+    boost::shared_ptr<Download> tmp(new Download(key));
+    auto it = downloads.find(tmp);
+    if (it != downloads.end())
+        return !(*it)->finished;
+    return false;
+}
+
 void Downloader::setDownloadPath(const string &path)
 {
     downloadPath = path;
@@ -197,6 +206,11 @@ Downloader::Download::Download(const string &source, const string &_dlPath, boos
 {
 }
 
+Downloader::Download::Download(const string &source) :
+    key(source), dl_errno(0), size(0), lock(NULL), finished(false), itRan(false)
+{
+}
+
 Downloader::Download::~Download()
 {
     assert(!itRan || finished);
diff --git a/src/Downloader.h b/src/Downloader.h
index 07d82a739..518658de4 100644
--- a/src/Downloader.h
+++ b/src/Downloader.h
@@ -28,6 +28,7 @@ class Downloader
         void download(const std::vector<const std::string *> &keys, std::vector<int> *errnos, std::vector<size_t> *sizes);
         void setDownloadPath(const std::string &path);
         void useThisLock(boost::mutex *);
+        bool inProgress(const std::string &);
 
     private:
         uint maxDownloads;
@@ -51,6 +52,7 @@ class Downloader
         struct Download : public ThreadPool::Job
         {
             Download(const std::string &source, const std::string &_dlPath, boost::mutex *_lock);
+            Download(const std::string &source);
             ~Download();
             void operator()();
             boost::filesystem::path dlPath;
diff --git a/src/IOCoordinator.cpp b/src/IOCoordinator.cpp
index 752947edd..78b290d83 100755
--- a/src/IOCoordinator.cpp
+++ b/src/IOCoordinator.cpp
@@ -488,7 +488,7 @@ int IOCoordinator::append(const char *_filename, const uint8_t *data, size_t len
     // had to add this hack to prevent deadlock
 out:
-    // need to release the file lock before calling the cache fcns.
+    // need to release the file lock before telling Cache that we're done writing.
     lock.unlock();
     cache->doneWriting();
 
@@ -802,12 +802,13 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2)
     ScopedReadLock lock(this, filename1);
     ScopedWriteLock lock2(this, filename2);
     MetadataFile meta1(metaFile1);
-    MetadataFile meta2(metaFile2);
+    MetadataFile meta2(metaFile2.string().c_str(), MetadataFile::no_create_t());
     vector<metadataObject> objects = meta1.metadataRead(0, meta1.getLength());
-    if (meta2.exists())
+    if (meta2.exists()) {
         cout << "copyFile: overwriting a file" << endl;
-    meta2.removeAllEntries();
+        meta2.removeAllEntries();
+    }
 
     // TODO.  I dislike large try-catch blocks, and large loops.  Maybe a little refactoring is in order.
     try
@@ -1089,19 +1090,34 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array<uint8_t> &objData, size
     {
         uint64_t offlen[2];
         int err = ::read(journalFD, &offlen, 16);
-        if (err != 16)   // got EOF
+        if (err == 0)   // got EOF
             break;
+        else if (err < 16)
+        {
+            // punting on handling a short read of the header for now
+            cout << "mergeJournalInMem: failed to read a journal entry header in one attempt.  fixme..." << endl;
+            errno = ENODATA;
+            return -1;
+        }
 
         uint64_t startReadingAt = offlen[0];
         uint64_t lengthOfRead = offlen[1];
 
         // XXXPAT: Speculative change.  Got mem errors from writing past the end of objData.  The length
         // in the metadata is shorter than this journal entry, and not because it got crazy values.
-        // I think the explanation is a truncation.  Remove the log here if we see good results.
+        // I think the explanation is a truncation.
+        if (startReadingAt > len)
+        {
+            //logger->log(LOG_CRIT, "mergeJournalInMem: skipping a theoretically irrelevant journal entry in %s. "
+            //    "jstart = %llu, max = %llu", journalPath, startReadingAt, len);
+            ::lseek(journalFD, offlen[1], SEEK_CUR);
+            continue;
+        }
+
         if (startReadingAt + lengthOfRead > len)
         {
-            logger->log(LOG_CRIT, "mergeJournalInMem: possibly bad journal entry in %s.  jStart = %lld, jEnd = %lld, max = %lld",
-                journalPath, startReadingAt, startReadingAt + lengthOfRead, len);
+            //logger->log(LOG_CRIT, "mergeJournalInMem: possibly bad journal entry in %s.  jStart = %llu, jEnd = %llu, max = %llu",
+            //    journalPath, startReadingAt, startReadingAt + lengthOfRead, len);
             lengthOfRead = len - startReadingAt;
         }
 
         uint count = 0;
@@ -1124,6 +1140,8 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array<uint8_t> &objData, size
             }
             count += err;
         }
+        if (lengthOfRead < offlen[1])
+            ::lseek(journalFD, offlen[1] - lengthOfRead, SEEK_CUR);
     }
     return 0;
 }
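To see the mergeJournalInMem() changes in one place, here is a compact sketch of the whole walk as the patched code performs it. The journal layout is inferred from how the patch reads it (a 16-byte header of two uint64s, [offset, length], followed by length payload bytes); mergeJournal(), objData, and len are local stand-ins for the real signature:

#include <unistd.h>
#include <cstdint>

// Sketch of the journal walk implemented by the patched mergeJournalInMem().
// Each journal entry: 16-byte header [offset, length], then `length` payload
// bytes destined for object offset `offset`.  objData holds `len` bytes.
int mergeJournal(int journalFD, uint8_t *objData, uint64_t len)
{
    while (true)
    {
        uint64_t offlen[2];
        ssize_t err = ::read(journalFD, offlen, 16);
        if (err == 0)        // clean EOF between entries
            break;
        if (err < 16)        // short read of a header; treat as an error
            return -1;

        uint64_t start = offlen[0], length = offlen[1];

        // Entry lies entirely past the (truncated) object: skip its payload.
        if (start > len)
        {
            ::lseek(journalFD, length, SEEK_CUR);
            continue;
        }

        // Entry overlaps the end of the object: clamp the read...
        uint64_t toRead = (start + length > len ? len - start : length);
        uint64_t count = 0;
        while (count < toRead)
        {
            err = ::read(journalFD, &objData[start + count], toRead - count);
            if (err <= 0)
                return -1;
            count += err;
        }
        // ...and skip the clamped-off remainder of the payload so the next
        // header is read from the right position (the new lseek() at the
        // bottom of the patched loop).
        if (toRead < length)
            ::lseek(journalFD, length - toRead, SEEK_CUR);
    }
    return 0;
}

The two lseek() calls are the important additions: without them, skipping or clamping an entry would leave the file position inside that entry's payload, and the next 16-byte header read would return garbage offsets.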
diff --git a/storagemanager.cnf b/storagemanager.cnf
index 6195f992f..26c77ae58 100644
--- a/storagemanager.cnf
+++ b/storagemanager.cnf
@@ -20,6 +20,14 @@ bucket = s3-cs-test2
 [LocalStorage]
 path = ${HOME}/storagemanager/fake-cloud
 
+# Introduce latency into fake-cloud operations.  Useful for debugging.
+fake_latency = n
+
+# Latencies are randomized between 1 and max_latency, in microseconds.
+# Values between 30000-50000 roughly simulate the observed latency of S3
+# access from an EC2 node.
+max_latency = 50000
+
 [Cache]
 cache_size = 2g
 path = ${HOME}/storagemanager/cache
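The two new [LocalStorage] keys only add configuration; the code that consumes them is not part of this diff. Purely as an illustrative sketch of a plausible consumer (LatencyInjector, fakeLatency, and maxLatency are hypothetical names, not the actual implementation):

#include <unistd.h>
#include <cstdlib>

// Hypothetical sketch of how the fake-cloud path might honor the new keys.
struct LatencyInjector
{
    bool fakeLatency = true;       // fake_latency = y
    unsigned maxLatency = 50000;   // max_latency, in microseconds

    // Call at the top of each fake-cloud operation.
    void maybeSleep() const
    {
        if (fakeLatency && maxLatency > 0)
            ::usleep(1 + (::rand() % maxLatency));  // randomized 1..max_latency us
    }
};

Sleeping 30000-50000 microseconds per operation approximates the S3 round-trip cited in the comment, which makes cache misses, evictions, and the download races addressed above much easier to reproduce against the fake cloud.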