From 4b9c1d9169f1991db847087c3e59b07c3a72ffce Mon Sep 17 00:00:00 2001
From: Patrick LeBlanc
Date: Thu, 21 May 2020 16:28:32 -0400
Subject: [PATCH] Cleaned up a little code of previous commit, added retry
 loops and a little better error handling to the code that writes journal
 entries.

---
 storage-manager/src/Replicator.cpp   | 100 +++++++++++++++++++++++----
 storage-manager/src/Replicator.h     |   5 ++
 storage-manager/src/Synchronizer.cpp |  12 ++--
 3 files changed, 100 insertions(+), 17 deletions(-)

diff --git a/storage-manager/src/Replicator.cpp b/storage-manager/src/Replicator.cpp
index 82492736e..b14f1107d 100644
--- a/storage-manager/src/Replicator.cpp
+++ b/storage-manager/src/Replicator.cpp
@@ -154,6 +154,67 @@ int Replicator::newNullObject(const boost::filesystem::path &filename,size_t len
     return err;
 }
 
+// Writes `length` bytes from `data` to `fd` at `offset`, looping over short writes and
+// retrying on EINTR.  Returns the number of bytes written, or the ::pwrite() result (<= 0)
+// if nothing could be written; errno is whatever the last ::pwrite() call left behind.
+ssize_t Replicator::_pwrite(int fd, const void *data, size_t length, off_t offset)
+{
+    const uint8_t *bData = static_cast<const uint8_t *>(data);
+    size_t count = 0;
+
+    while (count < length)
+    {
+        ssize_t err = ::pwrite(fd, &bData[count], length - count, offset + count);
+        if (err < 0 && errno == EINTR)
+            continue;
+        // Hard error or no forward progress: report what was written so far, if anything.
+        if (err <= 0)
+            return (count > 0 ? static_cast<ssize_t>(count) : err);
+        count += err;
+    }
+
+    return count;
+}
+
+// Writes `length` bytes from `data` at fd's current file position, looping over short
+// writes and retrying on EINTR.  Returns the number of bytes written, or the ::write()
+// result (<= 0) if nothing could be written; errno is whatever the last ::write() left.
+ssize_t Replicator::_write(int fd, const void *data, size_t length)
+{
+    const uint8_t *bData = static_cast<const uint8_t *>(data);
+    size_t count = 0;
+
+    while (count < length)
+    {
+        ssize_t err = ::write(fd, &bData[count], length - count);
+        if (err < 0 && errno == EINTR)
+            continue;
+        // Hard error or no forward progress: report what was written so far, if anything.
+        if (err <= 0)
+            return (count > 0 ? static_cast<ssize_t>(count) : err);
+        count += err;
+    }
+
+    return count;
+}
+
+/* XXXPAT: I think we'll have to rewrite this function some; we'll have to at least clearly define
+   what happens in the various error scenarios.
+
+   To be more resilient in the face of hard errors, we may also want to redefine what a journal file is.
+ If/when we cannot fix the journal file in the face of an error, there are scenarios that the read code + will not be able to cope with. Ex, a journal entry that says it's 200 bytes long, but there are only + really 100 bytes. The read code has no way to tell the difference if there is an entry that follows + the bad entry, and that will cause an unrecoverable error. + + Initial thought on a sol'n. Make each journal entry its own file in a tmp dir, ordered by a sequence + number in the filename. Then, one entry cannot affect the others, and the end of the file is unambiguously + the end of the data. On successful write, move the file to where it should be. This would also prevent + the readers from ever seeing bad data, and possibly reduce the size of some critical sections. + + Benefits would be data integrity, and possibly add'l parallelism. The downside is of course, a higher + number of IO ops for the same operation. +*/ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const uint8_t *data, off_t offset, size_t length) { int fd, err; @@ -177,7 +238,7 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u bHeaderChanged = true; // create new journal file with header string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str(); - err = ::write(fd, header.c_str(), header.length() + 1); + err = _write(fd, header.c_str(), header.length() + 1); l_errno = errno; repHeaderDataWritten += (header.length() + 1); if ((uint)err != (header.length() + 1)) @@ -238,26 +299,32 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u { bHeaderChanged = true; string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str(); - err = ::pwrite(fd, header.c_str(), header.length() + 1,0); + err = _pwrite(fd, header.c_str(), header.length() + 1,0); + l_errno = errno; 
repHeaderDataWritten += (header.length() + 1); if ((uint)err != (header.length() + 1)) { // only the header was possibly changed rollback attempt mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Updating journal header failed. " "Attempting to rollback and continue."); - int rollbackErr = ::pwrite(fd, headerRollback.c_str(), headerRollback.length() + 1,0); + int rollbackErr = _pwrite(fd, headerRollback.c_str(), headerRollback.length() + 1,0); if ((uint)rollbackErr == (headerRollback.length() + 1)) mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Rollback of journal header success."); else mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Rollback of journal header failed!"); - return err; + errno = l_errno; + if (err < 0) + return err; + else + return 0; } } } off_t entryHeaderOffset = ::lseek(fd, 0, SEEK_END); - err = ::write(fd, offlen, JOURNAL_ENTRY_HEADER_SIZE); + err = _write(fd, offlen, JOURNAL_ENTRY_HEADER_SIZE); + l_errno = errno; repHeaderDataWritten += JOURNAL_ENTRY_HEADER_SIZE; if (err != JOURNAL_ENTRY_HEADER_SIZE) { @@ -266,12 +333,16 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u { mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: write journal entry header failed. Attempting to rollback and continue."); //attempt to rollback top level header - int rollbackErr = ::pwrite(fd, headerRollback.c_str(), headerRollback.length() + 1,0); + int rollbackErr = _pwrite(fd, headerRollback.c_str(), headerRollback.length() + 1,0); if ((uint)rollbackErr != (headerRollback.length() + 1)) { mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Rollback of journal header failed! 
(%s)", strerror_r(errno, errbuf, 80)); - return err; + errno = l_errno; + if (err < 0) + return err; + else + return 0; } } int rollbackErr = ::ftruncate(fd,entryHeaderOffset); @@ -279,13 +350,16 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u { mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Truncate to previous EOF failed! (%s)", strerror_r(errno, errbuf, 80)); - return err; + if (err < 0) + return err; + else + return 0; } + l_errno = errno; return err; - } while (count < length) { - err = ::write(fd, &data[count], length - count); + err = _write(fd, &data[count], length - count); if (err < 0 ) { l_errno = errno; @@ -301,7 +375,7 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u if (thisEntryMaxOffset > currentMaxOffset) { string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str(); - int rollbackErr = ::pwrite(fd, header.c_str(), header.length() + 1,0); + int rollbackErr = _pwrite(fd, header.c_str(), header.length() + 1,0); if ((uint)rollbackErr != (header.length() + 1)) { mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Update of journal header failed! (%s)", @@ -312,7 +386,7 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u } // Update the journal entry header offlen[1] = count; - int rollbackErr = ::pwrite(fd, offlen, JOURNAL_ENTRY_HEADER_SIZE,entryHeaderOffset); + int rollbackErr = _pwrite(fd, offlen, JOURNAL_ENTRY_HEADER_SIZE,entryHeaderOffset); if ((uint)rollbackErr != JOURNAL_ENTRY_HEADER_SIZE) { mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Update of journal entry header failed! 
(%s)", @@ -337,7 +411,7 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u "Attempting to rollback and continue.", strerror_r(l_errno, errbuf, 80)); //attempt to rollback top level header string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % 0).str(); - int rollbackErr = ::pwrite(fd, header.c_str(), header.length() + 1,0); + int rollbackErr = _pwrite(fd, header.c_str(), header.length() + 1,0); if ((uint)rollbackErr != (header.length() + 1)) { mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Rollback of journal header failed (%s)!", diff --git a/storage-manager/src/Replicator.h b/storage-manager/src/Replicator.h index d4a6942f2..4cf96ac92 100644 --- a/storage-manager/src/Replicator.h +++ b/storage-manager/src/Replicator.h @@ -58,6 +58,11 @@ class Replicator private: Replicator(); + + // a couple helpers + ssize_t _write(int fd, const void *data, size_t len); + ssize_t _pwrite(int fd, const void *data, size_t len, off_t offset); + Config *mpConfig; SMLogging *mpLogger; std::string msJournalPath; diff --git a/storage-manager/src/Synchronizer.cpp b/storage-manager/src/Synchronizer.cpp index 7276227cf..b8e07ae39 100644 --- a/storage-manager/src/Synchronizer.cpp +++ b/storage-manager/src/Synchronizer.cpp @@ -694,9 +694,14 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list } numBytesWritten += size; - //assert(bf::file_size(oldCachePath) == MetadataFile::getLengthFromKey(cloudKey)); - cache->rename(prefix, cloudKey, newCloudKey, size - MetadataFile::getLengthFromKey(cloudKey)); - if (bf::file_size(oldCachePath) != MetadataFile::getLengthFromKey(cloudKey)) + size_t oldSize = bf::file_size(oldCachePath); + + cache->rename(prefix, cloudKey, newCloudKey, size - oldSize); + replicator->remove(oldCachePath); + + // This condition is probably irrelevant for correct functioning now, + // but it should be very rare so what the hell. 
+ if (oldSize != MetadataFile::getLengthFromKey(cloudKey)) { ostringstream oss; oss << "Synchronizer::synchronizeWithJournal(): detected a mismatch between file size and " << @@ -706,7 +711,6 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list logger->log(LOG_WARNING, oss.str().c_str()); cache->repopulate(prefix); } - replicator->remove(oldCachePath); } mergeDiff += size - originalSize;