diff --git a/src/IOCoordinator.cpp b/src/IOCoordinator.cpp index 0de93b936..018ef56f9 100755 --- a/src/IOCoordinator.cpp +++ b/src/IOCoordinator.cpp @@ -110,7 +110,7 @@ int IOCoordinator::loadObjectAndJournal(const char *objFilename, const char *jou { boost::shared_array argh; - argh = mergeJournal(objFilename, journalFilename, offset, &length); + argh = mergeJournal(objFilename, journalFilename, offset, length); if (!argh) return -1; else @@ -874,7 +874,7 @@ int IOCoordinator::mergeJournal(int objFD, int journalFD, uint8_t *buf, off_t of } boost::shared_array IOCoordinator::mergeJournal(const char *object, const char *journal, off_t offset, - size_t *len) const + size_t len) const { int objFD, journalFD; boost::shared_array ret; @@ -896,24 +896,28 @@ boost::shared_array IOCoordinator::mergeJournal(const char *object, con boost::property_tree::ptree header; boost::property_tree::json_parser::read_json(ss, header); assert(header.get("version") == 1); - size_t maxJournalOffset = header.get("max_offset"); + //size_t maxJournalOffset = header.get("max_offset"); + #if 0 struct stat objStat; fstat(objFD, &objStat); - if (*len == 0) + if (len == 0) // read to the end of the file - *len = max(maxJournalOffset + 1, (size_t) objStat.st_size) - offset; + len = max(maxJournalOffset + 1, (size_t) objStat.st_size) - offset; else // make sure len is within the bounds of the data - *len = min(*len, (max(maxJournalOffset + 1, (size_t) objStat.st_size) - offset)); + len = min(*len, (max(maxJournalOffset + 1, (size_t) objStat.st_size) - offset)); ret.reset(new uint8_t[*len]); + #endif + + ret.reset(new uint8_t[len]); // read the object into memory size_t count = 0; ::lseek(objFD, offset, SEEK_SET); - while (count < *len) { - int err = ::read(objFD, &ret[count], *len - count); + while (count < len) { + int err = ::read(objFD, &ret[count], len - count); if (err < 0) { char buf[80]; @@ -927,7 +931,7 @@ boost::shared_array IOCoordinator::mergeJournal(const char *object, con { // at the EOF of the object. The journal may contain entries that append to the data, // so 0-fill the remaining bytes. - memset(&ret[count], 0, *len-count); + memset(&ret[count], 0, len-count); break; } count += err; @@ -943,7 +947,7 @@ boost::shared_array IOCoordinator::mergeJournal(const char *object, con // if this entry overlaps, read the overlapping section uint64_t lastJournalOffset = offlen[0] + offlen[1]; - uint64_t lastBufOffset = offset + *len; + uint64_t lastBufOffset = offset + len; if (offlen[0] <= lastBufOffset && lastJournalOffset >= (uint64_t) offset) { uint64_t startReadingAt = max(offlen[0], (uint64_t) offset); @@ -984,7 +988,7 @@ boost::shared_array IOCoordinator::mergeJournal(const char *object, con } // MergeJournalInMem is a specialized version of mergeJournal(). TODO: refactor if possible. -int IOCoordinator::mergeJournalInMem(boost::shared_array &objData, size_t *len, const char *journalPath) const +int IOCoordinator::mergeJournalInMem(boost::shared_array &objData, size_t len, const char *journalPath) const { int journalFD = ::open(journalPath, O_RDONLY); if (journalFD < 0) @@ -998,8 +1002,9 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array &objData, size boost::property_tree::ptree header; boost::property_tree::json_parser::read_json(ss, header); assert(header.get("version") == 1); - size_t maxJournalOffset = header.get("max_offset"); + //size_t maxJournalOffset = header.get("max_offset"); + #if 0 if (maxJournalOffset > *len) { uint8_t *newbuf = new uint8_t[maxJournalOffset + 1]; @@ -1008,6 +1013,7 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array &objData, size objData.reset(newbuf); *len = maxJournalOffset + 1; } + #endif // start processing the entries while (1) @@ -1018,8 +1024,12 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array &objData, size break; uint64_t startReadingAt = offlen[0]; - uint64_t lengthOfRead = offlen[1]; - + uint64_t lengthOfRead; + if (offlen[0] + offlen[1] > len) + lengthOfRead = len - offlen[0]; + else + lengthOfRead = offlen[1]; + uint count = 0; while (count < lengthOfRead) { diff --git a/src/IOCoordinator.h b/src/IOCoordinator.h index b83ee915d..3dc663625 100644 --- a/src/IOCoordinator.h +++ b/src/IOCoordinator.h @@ -42,12 +42,11 @@ class IOCoordinator : public boost::noncopyable int copyFile(const char *filename1, const char *filename2); // The shared logic for merging a journal file with its base file. - // *len should be set to the length of the data requested (0 means read the whole file), - // on return *len will be the actual length returned. - boost::shared_array mergeJournal(const char *objectPath, const char *journalPath, off_t offset, size_t *len) const; + // len should be set to the length of the data requested + boost::shared_array mergeJournal(const char *objectPath, const char *journalPath, off_t offset, size_t len) const; // this version modifies object data in memory, given the journal filename - int mergeJournalInMem(boost::shared_array &objData, size_t *len, const char *journalPath) const; + int mergeJournalInMem(boost::shared_array &objData, size_t len, const char *journalPath) const; // this version takes already-open file descriptors, and an already-allocated buffer as input. // file descriptor are positioned, eh, best not to assume anything about their positions diff --git a/src/MetadataFile.cpp b/src/MetadataFile.cpp index c67c9c3ba..7c240ade7 100755 --- a/src/MetadataFile.cpp +++ b/src/MetadataFile.cpp @@ -301,6 +301,11 @@ int MetadataFile::writeMetadata(const char *filename) return error; } +const metadataObject & MetadataFile::getEntry(off_t offset) +{ + return *(mObjects.find(offset)); +} + void MetadataFile::removeEntry(off_t offset) { mObjects.erase(offset); diff --git a/src/MetadataFile.h b/src/MetadataFile.h index 98669b65c..ec279a0f6 100755 --- a/src/MetadataFile.h +++ b/src/MetadataFile.h @@ -54,6 +54,7 @@ class MetadataFile void updateEntry(off_t offset, const std::string &newName, size_t newLength); void updateEntryLength(off_t offset, size_t newLength); metadataObject addMetadataObject(const char *filename, size_t length); + const metadataObject &getEntry(off_t offset); void removeEntry(off_t offset); // TBD: this may have to go; there may be no use case where only the uuid needs to change. diff --git a/src/Synchronizer.cpp b/src/Synchronizer.cpp index e2eba059b..ff0903a3a 100644 --- a/src/Synchronizer.cpp +++ b/src/Synchronizer.cpp @@ -325,6 +325,14 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list ScopedWriteLock s(ioc, sourceFile); string &key = *lit; + MetadataFile md(sourceFile.c_str(), MetadataFile::no_create_t()); + if (!md.exists()) + { + logger->log(LOG_DEBUG, "synchronizeWithJournal(): no metadata found for %s", sourceFile.c_str()); + return; + } + const metadataObject &mdEntry = md.getEntry(MetadataFile::getOffsetFromKey(key)); + bf::path oldCachePath = cachePath / key; string journalName = (journalPath/ (key + ".journal")).string(); @@ -335,10 +343,10 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list // Revision ^^. It can happen if the object was deleted after the op was latched but before it runs. return; } - + int err; boost::shared_array data; - size_t count = 0, size = 0; + size_t count = 0, size = mdEntry.length; char buf[80]; bool oldObjIsCached = cache->exists(key); @@ -356,7 +364,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list } throw runtime_error(string("Synchronizer: getObject() failed: ") + strerror_r(errno, buf, 80)); } - err = ioc->mergeJournalInMem(data, &size, journalName.c_str()); + err = ioc->mergeJournalInMem(data, size, journalName.c_str()); if (err) { if (!bf::exists(journalName)) @@ -369,7 +377,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list } else { - data = ioc->mergeJournal(oldCachePath.string().c_str(), journalName.c_str(), 0, &size); + data = ioc->mergeJournal(oldCachePath.string().c_str(), journalName.c_str(), 0, size); if (!data) { if (!bf::exists(journalName)) @@ -424,7 +432,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list } // update the metadata for the source file - MetadataFile md(sourceFile.c_str()); + md.updateEntry(MetadataFile::getOffsetFromKey(key), newKey, size); replicator->updateMetadata(sourceFile.c_str(), md); diff --git a/src/unit_tests.cpp b/src/unit_tests.cpp index 8910f62ae..d0809d394 100755 --- a/src/unit_tests.cpp +++ b/src/unit_tests.cpp @@ -932,8 +932,8 @@ bool mergeJournalTest() int i; IOCoordinator *ioc = IOCoordinator::get(); - size_t len = 0; - boost::shared_array data = ioc->mergeJournal("test-object", "test-journal", 0, &len); + size_t len = 8192; + boost::shared_array data = ioc->mergeJournal("test-object", "test-journal", 0, len); assert(data); int *idata = (int *) data.get(); for (i = 0; i < 5; i++) @@ -946,7 +946,7 @@ bool mergeJournalTest() // try different range parameters // read at the beginning of the change len = 40; - data = ioc->mergeJournal("test-object", "test-journal", 20, &len); + data = ioc->mergeJournal("test-object", "test-journal", 20, len); assert(data); idata = (int *) data.get(); for (i = 0; i < 5; i++) @@ -956,7 +956,7 @@ bool mergeJournalTest() // read s.t. beginning of the change is in the middle of the range len = 24; - data = ioc->mergeJournal("test-object", "test-journal", 8, &len); + data = ioc->mergeJournal("test-object", "test-journal", 8, len); assert(data); idata = (int *) data.get(); for (i = 0; i < 3; i++) @@ -966,7 +966,7 @@ bool mergeJournalTest() // read s.t. end of the change is in the middle of the range len = 20; - data = ioc->mergeJournal("test-object", "test-journal", 28, &len); + data = ioc->mergeJournal("test-object", "test-journal", 28, len); assert(data); idata = (int *) data.get(); for (i = 0; i < 3; i++)