diff --git a/storage-manager/src/IOCoordinator.cpp b/storage-manager/src/IOCoordinator.cpp index 89682af1b..aa0b58c12 100644 --- a/storage-manager/src/IOCoordinator.cpp +++ b/storage-manager/src/IOCoordinator.cpp @@ -1083,19 +1083,6 @@ boost::shared_array IOCoordinator::mergeJournal(const char *object, con if (objFD < 0) return ret; ScopedCloser s1(objFD); - journalFD = ::open(journal, O_RDONLY); - if (journalFD < 0) - return ret; - ScopedCloser s2(journalFD); - - // grab the journal header, make sure the version is 1, and get the max offset - - boost::shared_array headertxt = seekToEndOfHeader1(journalFD, &l_bytesRead); - stringstream ss; - ss << headertxt.get(); - boost::property_tree::ptree header; - boost::property_tree::json_parser::read_json(ss, header); - assert(header.get("version") == 1); ret.reset(new uint8_t[len]); @@ -1117,14 +1104,39 @@ boost::shared_array IOCoordinator::mergeJournal(const char *object, con else if (err == 0) { // at the EOF of the object. The journal may contain entries that append to the data, - // so 0-fill the remaining bytes. - memset(&ret[count], 0, len-count); break; } count += err; } l_bytesRead += count; + // mergeJournalInMem has lower a IOPS requirement than the fully general code in this fcn. Use + // that if the caller requested the whole object to be merged + if (offset == 0 && (ssize_t) len >= ::lseek(objFD, 0, SEEK_END)) + { + size_t mjimBytesRead = 0; + int mjimerr = mergeJournalInMem(ret, len, journal, &mjimBytesRead); + if (mjimerr) + { + ret.reset(); + return ret; + } + l_bytesRead += mjimBytesRead; + return ret; + } + + journalFD = ::open(journal, O_RDONLY); + if (journalFD < 0) + return ret; + ScopedCloser s2(journalFD); + + boost::shared_array headertxt = seekToEndOfHeader1(journalFD, &l_bytesRead); + stringstream ss; + ss << headertxt.get(); + boost::property_tree::ptree header; + boost::property_tree::json_parser::read_json(ss, header); + assert(header.get("version") == 1); + // start processing the entries while (1) { @@ -1134,8 +1146,7 @@ boost::shared_array IOCoordinator::mergeJournal(const char *object, con break; assert(err == 16); l_bytesRead += 16; - - //cout << "MJ: got offset " << offlen[0] << " length " << offlen[1] << endl; + // if this entry overlaps, read the overlapping section uint64_t lastJournalOffset = offlen[0] + offlen[1]; uint64_t lastBufOffset = offset + len; @@ -1144,8 +1155,6 @@ boost::shared_array IOCoordinator::mergeJournal(const char *object, con uint64_t startReadingAt = max(offlen[0], (uint64_t) offset); uint64_t lengthOfRead = min(lastBufOffset, lastJournalOffset) - startReadingAt; - //cout << "MJ: startReadingAt = " << startReadingAt << " offlen[0] = " << offlen[0] << endl; - // seek to the portion of the entry to start reading at if (startReadingAt != offlen[0]) ::lseek(journalFD, startReadingAt - offlen[0], SEEK_CUR); @@ -1184,9 +1193,95 @@ boost::shared_array IOCoordinator::mergeJournal(const char *object, con return ret; } -// MergeJournalInMem is a specialized version of mergeJournal(). TODO: refactor if possible. +// MergeJournalInMem is a specialized version of mergeJournal(). This is currently only used by Synchronizer +// and mergeJournal(), and only for merging the whole object with the whole journal. int IOCoordinator::mergeJournalInMem(boost::shared_array &objData, size_t len, const char *journalPath, size_t *_bytesReadOut) const +{ + // if the journal is over some size threshold (100MB for now why not), + // use the original low-mem-usage version + if (len > (100 << 20)) + return mergeJournalInMem_bigJ(objData, len, journalPath, _bytesReadOut); + + size_t l_bytesRead = 0; + int journalFD = ::open(journalPath, O_RDONLY); + if (journalFD < 0) + return -1; + ScopedCloser s(journalFD); + + // grab the journal header and make sure the version is 1 + boost::shared_array headertxt = seekToEndOfHeader1(journalFD, &l_bytesRead); + stringstream ss; + ss << headertxt.get(); + boost::property_tree::ptree header; + boost::property_tree::json_parser::read_json(ss, header); + assert(header.get("version") == 1); + + // read the journal file into memory + size_t journalBytes = ::lseek(journalFD, 0, SEEK_END) - l_bytesRead; + ::lseek(journalFD, l_bytesRead, SEEK_SET); + boost::scoped_array journalData(new uint8_t[journalBytes]); + size_t readCount = 0; + while (readCount < journalBytes) + { + ssize_t err = ::read(journalFD, &journalData[readCount], journalBytes - readCount); + if (err < 0) + { + char buf[80]; + int l_errno = errno; + logger->log(LOG_ERR, "mergeJournalInMem: got %s", strerror_r(errno, buf, 80)); + errno = l_errno; + return -1; + } + else if (err == 0) + { + logger->log(LOG_ERR, "mergeJournalInMem: got early EOF"); + errno = ENODATA; // is there a better errno for early EOF? + return -1; + } + readCount += err; + l_bytesRead += err; + } + + // start processing the entries + size_t offset = 0; + while (offset < journalBytes) + { + if (offset + 16 >= journalBytes) + { + logger->log(LOG_ERR, "mergeJournalInMem: got early EOF"); + errno = ENODATA; // is there a better errno for early EOF? + return -1; + } + uint64_t *offlen = (uint64_t *) &journalData[offset]; + offset += 16; + + uint64_t startReadingAt = offlen[0]; + uint64_t lengthOfRead = offlen[1]; + + if (startReadingAt > len) + { + offset += offlen[1]; + continue; + } + + if (startReadingAt + lengthOfRead > len) + lengthOfRead = len - startReadingAt; + if (offset + lengthOfRead > journalBytes) + { + logger->log(LOG_ERR, "mergeJournalInMem: got early EOF"); + errno = ENODATA; // is there a better errno for early EOF? + return -1; + } + memcpy(&objData[startReadingAt], &journalData[offset], lengthOfRead); + offset += offlen[1]; + } + *_bytesReadOut = l_bytesRead; + return 0; +} + +int IOCoordinator::mergeJournalInMem_bigJ(boost::shared_array &objData, size_t len, const char *journalPath, + size_t *_bytesReadOut) const { size_t l_bytesRead = 0; int journalFD = ::open(journalPath, O_RDONLY); @@ -1201,7 +1296,6 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array &objData, size boost::property_tree::ptree header; boost::property_tree::json_parser::read_json(ss, header); assert(header.get("version") == 1); - //size_t maxJournalOffset = header.get("max_offset"); // start processing the entries while (1) @@ -1222,23 +1316,15 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array &objData, size uint64_t startReadingAt = offlen[0]; uint64_t lengthOfRead = offlen[1]; - // XXXPAT: Speculative change. Got mem errors from writing past the end of objData. The length - // in the metadata is shorter than this journal entry, and not because it got crazy values. - // I think the explanation is a truncation. if (startReadingAt > len) { - //logger->log(LOG_CRIT, "mergeJournalInMem: skipping a theoretically irrelevant journal entry in %s. " - // "jstart = %llu, max = %llu", journalPath, startReadingAt, len); ::lseek(journalFD, offlen[1], SEEK_CUR); continue; } if (startReadingAt + lengthOfRead > len) - { - //logger->log(LOG_CRIT, "mergeJournalInMem: possibly bad journal entry in %s. jStart = %llu, jEnd = %llu, max = %llu", - // journalPath, startReadingAt, startReadingAt + lengthOfRead, len); lengthOfRead = len - startReadingAt; - } + uint count = 0; while (count < lengthOfRead) { @@ -1267,8 +1353,6 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array &objData, size return 0; } - - void IOCoordinator::readLock(const string &filename) { boost::unique_lock s(lockMutex); diff --git a/storage-manager/src/IOCoordinator.h b/storage-manager/src/IOCoordinator.h index d84bedb21..1783f6a76 100644 --- a/storage-manager/src/IOCoordinator.h +++ b/storage-manager/src/IOCoordinator.h @@ -63,10 +63,15 @@ class IOCoordinator : public boost::noncopyable boost::shared_array mergeJournal(const char *objectPath, const char *journalPath, off_t offset, size_t len, size_t *sizeRead) const; - // this version modifies object data in memory, given the journal filename + // this version modifies object data in memory, given the journal filename. Processes the whole object + // and whole journal file. int mergeJournalInMem(boost::shared_array &objData, size_t len, const char *journalPath, size_t *sizeRead) const; + // this version of MJIM has a higher IOPS requirement and lower mem usage. + int mergeJournalInMem_bigJ(boost::shared_array &objData, size_t len, const char *journalPath, + size_t *sizeRead) const; + // this version takes already-open file descriptors, and an already-allocated buffer as input. // file descriptor are positioned, eh, best not to assume anything about their positions // on return. diff --git a/storage-manager/src/Synchronizer.cpp b/storage-manager/src/Synchronizer.cpp index fb3c123b0..5fae02c31 100644 --- a/storage-manager/src/Synchronizer.cpp +++ b/storage-manager/src/Synchronizer.cpp @@ -669,7 +669,8 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list count += err; } numBytesWritten += size; - cache->rename(prefix, cloudKey, newCloudKey, size - bf::file_size(oldCachePath)); + assert(bf::file_size(oldCachePath) == MetadataFile:getLengthFromKey(cloudKey)); + cache->rename(prefix, cloudKey, newCloudKey, size - MetadataFile::getLengthFromKey(cloudKey)); replicator->remove(oldCachePath); } diff --git a/storage-manager/src/smls.cpp b/storage-manager/src/smls.cpp index 14ba636e9..2afdc2538 100644 --- a/storage-manager/src/smls.cpp +++ b/storage-manager/src/smls.cpp @@ -157,12 +157,12 @@ int main(int argc, char **argv) } char prefix[8192]; - makePathPrefix(prefix, 8192); + int prefixlen = makePathPrefix(prefix, 8192); if (SMOnline()) - lsOnline(strncat(prefix, argv[1], 8192)); + lsOnline(strncat(prefix, argv[1], 8192 - prefixlen)); else - lsOffline(strncat(prefix, argv[1], 8192)); + lsOffline(strncat(prefix, argv[1], 8192 - prefixlen)); return 0; } diff --git a/storage-manager/src/smput.cpp b/storage-manager/src/smput.cpp index 6451765b5..9cd24d040 100644 --- a/storage-manager/src/smput.cpp +++ b/storage-manager/src/smput.cpp @@ -172,9 +172,9 @@ int main(int argc, char **argv) int prefixlen = makePathPrefix(prefix, 8192); if (SMOnline()) - putOnline(strncat(prefix, argv[1], 8192), prefixlen); + putOnline(strncat(prefix, argv[1], 8192 - prefixlen), prefixlen); else - putOffline(strncat(prefix, argv[1], 8192), prefixlen); + putOffline(strncat(prefix, argv[1], 8192 - prefixlen), prefixlen); return 0; }