diff --git a/storage-manager/src/Cache.cpp b/storage-manager/src/Cache.cpp index 05a70adf3..de7b22c16 100644 --- a/storage-manager/src/Cache.cpp +++ b/storage-manager/src/Cache.cpp @@ -355,6 +355,20 @@ void Cache::configListener() logger->log(LOG_CRIT, "Cache/cache_size is not a number. Using current value = %zi",maxCacheSize); } } + +void Cache::repopulate() +{ + boost::unique_lock sl(lru_mutex); + + for (auto &pcache : prefixCaches) + pcache.second->repopulate(); +} + +void Cache::repopulate(const boost::filesystem::path &p) +{ + getPCache(p).repopulate(); +} + } diff --git a/storage-manager/src/Cache.h b/storage-manager/src/Cache.h index 12396550a..3ae57a342 100644 --- a/storage-manager/src/Cache.h +++ b/storage-manager/src/Cache.h @@ -92,6 +92,11 @@ class Cache : public boost::noncopyable , public ConfigListener void shutdown(); void printKPIs() const; + // Used to update accounting variables in the PrefixCaches when a potential error + // is detected. + void repopulate(); + void repopulate(const boost::filesystem::path &prefix); + // test helpers const boost::filesystem::path &getCachePath() const; const boost::filesystem::path &getJournalPath() const; diff --git a/storage-manager/src/IOCoordinator.cpp b/storage-manager/src/IOCoordinator.cpp index 0b79d864a..e86ab2c83 100644 --- a/storage-manager/src/IOCoordinator.cpp +++ b/storage-manager/src/IOCoordinator.cpp @@ -505,6 +505,8 @@ ssize_t IOCoordinator::_write(const boost::filesystem::path &filename, const uin dataRemaining -= err; count += err; iocBytesWritten += err; + // get a new name for the object + newObject.key = metadata.getNewKeyFromOldKey(newObject.key, err + objectOffset); metadata.updateEntryLength(newObject.offset, (err + objectOffset)); cache->newObject(firstDir, newObject.key,err + objectOffset); newObjectKeys.push_back(newObject.key); @@ -634,14 +636,17 @@ ssize_t IOCoordinator::append(const char *_filename, const uint8_t *data, size_t count += err; dataRemaining -= err; iocBytesWritten += err; + if (err < (int64_t) writeLength) + { + newObject.key = metadata.getNewKeyFromOldKey(newObject.key, err + newObject.offset); + metadata.updateEntry(newObject.offset, newObject.key, err + newObject.offset); + } cache->newObject(firstDir, newObject.key,err); newObjectKeys.push_back(newObject.key); if (err < (int64_t) writeLength) { //logger->log(LOG_ERR,"IOCoordinator::append(): newObject failed to complete write, %u of %u bytes written.",count,length); - // make the object reflect length actually written - metadata.updateEntryLength(newObject.offset, err); goto out; } } @@ -1135,7 +1140,10 @@ boost::shared_array IOCoordinator::mergeJournal(const char *object, con objFD = ::open(object, O_RDONLY); if (objFD < 0) + { + *_bytesReadOut = 0; return ret; + } ScopedCloser s1(objFD); ret.reset(new uint8_t[len]); @@ -1148,11 +1156,12 @@ boost::shared_array IOCoordinator::mergeJournal(const char *object, con int err = ::read(objFD, &ret[count], len - count); if (err < 0) { - char buf[80]; - logger->log(LOG_CRIT, "IOC::mergeJournal(): failed to read %s, got '%s'", object, strerror_r(errno, buf, 80)); int l_errno = errno; + char buf[80]; + logger->log(LOG_CRIT, "IOC::mergeJournal(): failed to read %s, got '%s'", object, strerror_r(l_errno, buf, 80)); ret.reset(); errno = l_errno; + *_bytesReadOut = count; return ret; } else if (err == 0) @@ -1171,17 +1180,18 @@ boost::shared_array IOCoordinator::mergeJournal(const char *object, con size_t mjimBytesRead = 0; int mjimerr = mergeJournalInMem(ret, len, journal, &mjimBytesRead); if (mjimerr) - { ret.reset(); - return ret; - } l_bytesRead += mjimBytesRead; + *_bytesReadOut = l_bytesRead; return ret; } journalFD = ::open(journal, O_RDONLY); if (journalFD < 0) + { + *_bytesReadOut = l_bytesRead; return ret; + } ScopedCloser s2(journalFD); boost::shared_array headertxt = seekToEndOfHeader1(journalFD, &l_bytesRead); @@ -1219,17 +1229,21 @@ boost::shared_array IOCoordinator::mergeJournal(const char *object, con err = ::read(journalFD, &ret[startReadingAt - offset + count], lengthOfRead - count); if (err < 0) { + int l_errno = errno; char buf[80]; - logger->log(LOG_ERR, "mergeJournal: got %s", strerror_r(errno, buf, 80)); + logger->log(LOG_ERR, "mergeJournal: got %s", strerror_r(l_errno, buf, 80)); ret.reset(); - return ret; + errno = l_errno; + l_bytesRead += count; + goto out; } else if (err == 0) { logger->log(LOG_ERR, "mergeJournal: got early EOF. offset=%ld, len=%ld, jOffset=%ld, jLen=%ld," " startReadingAt=%ld, lengthOfRead=%ld", offset, len, offlen[0], offlen[1], startReadingAt, lengthOfRead); ret.reset(); - return ret; + l_bytesRead += count; + goto out; } count += err; } @@ -1243,6 +1257,7 @@ boost::shared_array IOCoordinator::mergeJournal(const char *object, con // skip over this journal entry ::lseek(journalFD, offlen[1], SEEK_CUR); } +out: *_bytesReadOut = l_bytesRead; return ret; } diff --git a/storage-manager/src/PrefixCache.cpp b/storage-manager/src/PrefixCache.cpp index e29845b03..99a65dbba 100644 --- a/storage-manager/src/PrefixCache.cpp +++ b/storage-manager/src/PrefixCache.cpp @@ -127,12 +127,20 @@ PrefixCache::~PrefixCache() */ } -void PrefixCache::populate() +void PrefixCache::repopulate() +{ + lru_mutex.lock(); + populate(false); +} + +void PrefixCache::populate(bool useSync) { Synchronizer *sync = Synchronizer::get(); bf::directory_iterator dir(cachePrefix); bf::directory_iterator dend; vector newObjects; + lru.clear(); + m_lru.clear(); while (dir != dend) { // put everything in lru & m_lru @@ -143,13 +151,15 @@ void PrefixCache::populate() auto last = lru.end(); m_lru.insert(--last); currentCacheSize += bf::file_size(*dir); - newObjects.push_back(p.filename().string()); + if (useSync) + newObjects.push_back(p.filename().string()); } else if (p != cachePrefix/downloader->getTmpPath()) logger->log(LOG_WARNING, "Cache: found something in the cache that does not belong '%s'", p.string().c_str()); ++dir; } - sync->newObjects(firstDir, newObjects); + if (useSync) + sync->newObjects(firstDir, newObjects); newObjects.clear(); // account for what's in the journal dir @@ -164,7 +174,8 @@ void PrefixCache::populate() { size_t s = bf::file_size(*dir); currentCacheSize += s; - newJournals.push_back(pair(p.stem().string(), s)); + if (useSync) + newJournals.push_back(pair(p.stem().string(), s)); } else logger->log(LOG_WARNING, "Cache: found a file in the journal dir that does not belong '%s'", p.string().c_str()); @@ -174,7 +185,8 @@ void PrefixCache::populate() ++dir; } lru_mutex.unlock(); - sync->newJournalEntries(firstDir, newJournals); + if (useSync) + sync->newJournalEntries(firstDir, newJournals); } // be careful using this! SM should be idle. No ongoing reads or writes. @@ -380,14 +392,25 @@ void PrefixCache::newJournalEntry(size_t size) void PrefixCache::deletedJournal(size_t size) { boost::unique_lock s(lru_mutex); - assert(currentCacheSize >= size); - currentCacheSize -= size; + + //assert(currentCacheSize >= size); + if (currentCacheSize >= size) + currentCacheSize -= size; + else + { + ostringstream oss; + oss << "PrefixCache::deletedJournal(): Detected an accounting error." << + " Reloading cache metadata, this will pause IO activity briefly."; + logger->log(LOG_WARNING, oss.str().c_str()); + populate(false); + } } void PrefixCache::deletedObject(const string &key, size_t size) { boost::unique_lock s(lru_mutex); - assert(currentCacheSize >= size); + + //assert(currentCacheSize >= size); M_LRU_t::iterator mit = m_lru.find(key); assert(mit != m_lru.end()); @@ -397,7 +420,16 @@ void PrefixCache::deletedObject(const string &key, size_t size) doNotEvict.erase(mit->lit); lru.erase(mit->lit); m_lru.erase(mit); - currentCacheSize -= size; + if (currentCacheSize >= size) + currentCacheSize -= size; + else + { + ostringstream oss; + oss << "PrefixCache::deletedObject(): Detected an accounting error." << + " Reloading cache metadata, this will pause IO activity briefly."; + logger->log(LOG_WARNING, oss.str().c_str()); + populate(false); + } } } diff --git a/storage-manager/src/PrefixCache.h b/storage-manager/src/PrefixCache.h index 1121275c7..da6b7fb66 100644 --- a/storage-manager/src/PrefixCache.h +++ b/storage-manager/src/PrefixCache.h @@ -77,6 +77,11 @@ class PrefixCache : public boost::noncopyable size_t getMaxCacheSize() const; void shutdown(); + // clears out cache structures and reloads them from cache/journal dir contents + // needed to potentially repair the cache's accounting error after detecting + // an error. + void repopulate(); + // test helpers const boost::filesystem::path &getCachePath(); const boost::filesystem::path &getJournalPath(); @@ -97,7 +102,9 @@ class PrefixCache : public boost::noncopyable SMLogging *logger; Downloader *downloader; - void populate(); + // useSync makes populate() tell Synchronizer about what it finds. + // set it to false when the system is already fully up. + void populate(bool useSync = true); void _makeSpace(size_t size); /* The main PrefixCache structures */ diff --git a/storage-manager/src/Replicator.cpp b/storage-manager/src/Replicator.cpp index 6e387726c..82492736e 100644 --- a/storage-manager/src/Replicator.cpp +++ b/storage-manager/src/Replicator.cpp @@ -128,7 +128,7 @@ int Replicator::newObject(const boost::filesystem::path &filename, const uint8_t OPEN(objectFilename.c_str(), O_WRONLY | O_CREAT); size_t count = 0; while (count < length) { - err = ::pwrite(fd, &data[count], length - count, offset); + err = ::pwrite(fd, &data[count], length - count, offset + count); if (err <= 0) { if (count > 0) // return what was successfully written diff --git a/storage-manager/src/Synchronizer.cpp b/storage-manager/src/Synchronizer.cpp index ee47b9dfa..7276227cf 100644 --- a/storage-manager/src/Synchronizer.cpp +++ b/storage-manager/src/Synchronizer.cpp @@ -683,7 +683,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list while (count < size) { - err = ::write(newFD, data.get(), size - count); + err = ::write(newFD, &data[count], size - count); if (err < 0) { ::unlink(newCachePath.string().c_str()); @@ -693,8 +693,19 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list count += err; } numBytesWritten += size; - assert(bf::file_size(oldCachePath) == MetadataFile::getLengthFromKey(cloudKey)); + + //assert(bf::file_size(oldCachePath) == MetadataFile::getLengthFromKey(cloudKey)); cache->rename(prefix, cloudKey, newCloudKey, size - MetadataFile::getLengthFromKey(cloudKey)); + if (bf::file_size(oldCachePath) != MetadataFile::getLengthFromKey(cloudKey)) + { + ostringstream oss; + oss << "Synchronizer::synchronizeWithJournal(): detected a mismatch between file size and " << + "length stored in the object name. object name = " << cloudKey << " length-in-name = " << + MetadataFile::getLengthFromKey(cloudKey) << " real-length = " << bf::file_size(oldCachePath) + << ". Reloading cache metadata, this will pause IO activity briefly."; + logger->log(LOG_WARNING, oss.str().c_str()); + cache->repopulate(prefix); + } replicator->remove(oldCachePath); }