From 208e3c300d9f26744669654b69123d41e1b45274 Mon Sep 17 00:00:00 2001
From: Patrick LeBlanc
Date: Tue, 12 Mar 2019 11:51:47 -0500
Subject: [PATCH] First cut of the journal-merging code + the unit test for it. Looks like it's working.

---
 src/IOCoordinator.cpp | 150 ++++++++++++++++++++++++++++++++++++++++++
 src/IOCoordinator.h   |  12 ++++
 src/unit_tests.cpp    |  79 +++++++++++++++++++++-
 3 files changed, 240 insertions(+), 1 deletion(-)

diff --git a/src/IOCoordinator.cpp b/src/IOCoordinator.cpp
index d11384d72..83ce780d8 100644
--- a/src/IOCoordinator.cpp
+++ b/src/IOCoordinator.cpp
@@ -7,8 +7,12 @@
 #include
 #include
 #include
+#include <boost/property_tree/json_parser.hpp>
 #include
 
+#define max(x, y) ((x) > (y) ? (x) : (y))   // args are parenthesized but still evaluated twice
+#define min(x, y) ((x) < (y) ? (x) : (y))   // so don't pass expressions with side effects
+
 using namespace std;
 
 namespace
@@ -22,6 +26,18 @@ namespace storagemanager
 
 IOCoordinator::IOCoordinator()
 {
+    config = Config::get();
+    logger = SMLogging::get();
+    objectSize = 5 * (1<<20);   // 5 MiB; overwritten by the configured value below
+    try
+    {
+        objectSize = stoul(config->getValue("ObjectStorage", "object_size"));
+    }
+    catch (...)
+    {
+        cerr << "ObjectStorage/object_size must be set to a numeric value" << endl;
+        throw;
+    }
 }
 
 IOCoordinator::~IOCoordinator()
@@ -209,4 +225,138 @@ int IOCoordinator::copyFile(const char *filename1, const char *filename2)
     return err;
 }
 
+// this is not generic by any means.  This is assuming a version 1 journal header, and is looking
+// for the end of it, which is the first \0 char.  It returns with fd pointing at the
+// first byte after the header.
+// update: had to make it also return the header; the boost json parser does not stop at either
+// a null char or the end of an object.
+boost::shared_array<char> seekToEndOfHeader1(int fd)
+{
+    // Only the first 100 bytes are examined, so the header and its terminating \0
+    // have to fit in that window.  Throws runtime_error if the read fails or no \0
+    // is found.
+    ::lseek(fd, 0, SEEK_SET);
+    boost::shared_array<char> ret(new char[100]);
+    const int err = ::read(fd, ret.get(), 100);
+    if (err < 0)
+    {
+        char buf[80];
+        throw runtime_error("seekToEndOfHeader1 got: " + string(strerror_r(errno, buf, 80)));
+    }
+    for (int i = 0; i < err; i++)
+    {
+        if (ret[i] == 0)
+        {
+            ::lseek(fd, i+1, SEEK_SET);
+            return ret;
+        }
+    }
+    throw runtime_error("seekToEndOfHeader1: did not find the end of the header");
+}
+
+// Returns bytes [offset, offset + len) of the object with every overlapping journal
+// entry applied on top; len == 0 means 'up to the max object size'.  An empty
+// shared_array is returned when either file can't be opened or a read fails.
+boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, const char *journal, off_t offset, size_t len) const
+{
+    boost::shared_array<uint8_t> ret;
+
+    const int objFD = ::open(object, O_RDONLY);
+    if (objFD < 0)
+        return ret;
+    scoped_closer s1(objFD);
+    const int journalFD = ::open(journal, O_RDONLY);
+    if (journalFD < 0)
+        return ret;
+    scoped_closer s2(journalFD);
+
+    // TODO: Right now this assumes that max object size has not been changed.
+    // ideally, we would have a way to look up the size of a specific object.
+    if (len == 0)
+        len = objectSize - offset;
+    ret.reset(new uint8_t[len]);
+
+    // read the object into memory
+    size_t count = 0;
+    ::lseek(objFD, offset, SEEK_SET);
+    while (count < len) {
+        int err = ::read(objFD, &ret[count], len - count);
+        if (err < 0)
+        {
+            int l_errno = errno;    // grab it before the logger gets a chance to change it
+            char buf[80];
+            logger->log(LOG_CRIT, "IOC::mergeJournal(): failed to read %s, got '%s'", object, strerror_r(l_errno, buf, 80));
+            ret.reset();
+            errno = l_errno;
+            return ret;
+        }
+        else if (err == 0)
+        {
+            // at the EOF of the object.  The journal may contain entries that append to the data,
+            // so 0-fill the remaining bytes.
+            memset(&ret[count], 0, len-count);
+            break;
+        }
+        count += err;
+    }
+
+    // grab the journal header and make sure the version is 1
+    boost::shared_array<char> headertxt = seekToEndOfHeader1(journalFD);
+    stringstream ss;
+    ss << headertxt.get();
+    boost::property_tree::ptree header;
+    boost::property_tree::json_parser::read_json(ss, header);
+    assert(header.get<string>("version") == "1");
+
+    // start processing the entries
+    while (1)
+    {
+        uint64_t offlen[2];
+        int err = ::read(journalFD, &offlen, 16);
+        if (err != 16)   // got EOF (a failed or short read ends the scan as well)
+            break;
+
+        // if this entry overlaps, read the overlapping section
+        uint64_t lastJournalOffset = offlen[0] + offlen[1];
+        uint64_t lastBufOffset = offset + len;
+        if (offlen[0] < lastBufOffset && lastJournalOffset > static_cast<uint64_t>(offset))
+        {
+            uint64_t startReadingAt = max(offlen[0], static_cast<uint64_t>(offset));
+            uint64_t lengthOfRead = min(lastBufOffset, lastJournalOffset) - startReadingAt;
+            // bytes at the front of this entry that come before the requested range
+            uint64_t skipped = startReadingAt - offlen[0];
+            if (skipped != 0)
+                ::lseek(journalFD, skipped, SEEK_CUR);
+            uint64_t entryCount = 0;
+            while (entryCount < lengthOfRead)
+            {
+                err = ::read(journalFD, &ret[startReadingAt - offset + entryCount], lengthOfRead - entryCount);
+                if (err < 0)
+                {
+                    char buf[80];
+                    logger->log(LOG_ERR, "mergeJournal: got %s", strerror_r(errno, buf, 80));
+                    ret.reset();
+                    return ret;
+                }
+                else if (err == 0)
+                {
+                    logger->log(LOG_ERR, "mergeJournal: got early EOF");
+                    ret.reset();
+                    return ret;
+                }
+                entryCount += err;
+            }
+
+            // move to the next entry header; the bytes skipped up front are already behind us
+            if (skipped + lengthOfRead != offlen[1])
+                ::lseek(journalFD, offlen[1] - skipped - lengthOfRead, SEEK_CUR);
+        }
+        else
+            // skip over this journal entry
+            ::lseek(journalFD, offlen[1], SEEK_CUR);
+    }
+
+    return ret;
+}
+
 }
diff --git a/src/IOCoordinator.h b/src/IOCoordinator.h
index a91716157..3ed3731f5 100644
--- a/src/IOCoordinator.h
+++ b/src/IOCoordinator.h
@@ -9,6 +9,10 @@
 #include
 #include
 #include
+#include <boost/shared_array.hpp>
+
+#include "Config.h"
+#include "SMLogging.h"
 
 namespace storagemanager
 {
@@ -30,8 +34,16 @@ class IOCoordinator : public boost::noncopyable
int unlink(const char *path); int copyFile(const char *filename1, const char *filename2); + // The shared logic for merging a journal file with its base file. + // The default values for offset and len mean 'process the whole file'. Otherwise, + // offset is relative to the object. + boost::shared_array mergeJournal(const char *objectPath, const char *journalPath, off_t offset = 0, size_t len = 0) const; + private: IOCoordinator(); + Config *config; + SMLogging *logger; + size_t objectSize; }; } diff --git a/src/unit_tests.cpp b/src/unit_tests.cpp index 0f8735eb5..be79eb941 100644 --- a/src/unit_tests.cpp +++ b/src/unit_tests.cpp @@ -29,6 +29,7 @@ using namespace storagemanager; using namespace std; +namespace bf = boost::filesystem; struct scoped_closer { scoped_closer(int f) : fd(f) { } @@ -470,7 +471,7 @@ bool localstorageTest1() bool cacheTest1() { - namespace bf = boost::filesystem; + Cache cache; CloudStorage *cs = CloudStorage::get(); LocalStorage *ls = dynamic_cast(cs); @@ -519,6 +520,81 @@ bool cacheTest1() bf::remove(storagePath / realFile); cout << "cache test 1 OK" << endl; } + +bool mergeJournalTest() +{ + /* + create a dummy object and a dummy journal + call mergeJournal to process it with various params + verify the expected values + */ + + int objFD = open("test-object", O_WRONLY | O_CREAT | O_TRUNC, 0600); + assert(objFD >= 0); + scoped_closer s1(objFD); + int journalFD = open("test-journal", O_WRONLY | O_CREAT | O_TRUNC, 0600); + assert(journalFD >= 0); + scoped_closer s2(journalFD); + + int i; + for (i = 0; i < 2048; i++) + assert(write(objFD, &i, 4) == 4); + + char header[] = "{ \"version\" : 1 }"; + write(journalFD, header, strlen(header) + 1); + + uint64_t offlen[2] = { 20, 20 }; + write(journalFD, offlen, 16); + for (i = 0; i < 5; i++) + assert(write(journalFD, &i, 4) == 4); + + // the merged version should look like + // (ints) 0 1 2 3 4 0 1 2 3 4 10 11 12 13... 
+ + IOCoordinator *ioc = IOCoordinator::get(); + boost::shared_array data = ioc->mergeJournal("test-object", "test-journal"); + assert(data); + int *idata = (int *) data.get(); + for (i = 0; i < 5; i++) + assert(idata[i] == i); + for (; i < 10; i++) + assert(idata[i] == i-5); + for (; i < 2048; i++) + assert(idata[i] == i); + + // try different range parameters + // read at the beginning of the change + data = ioc->mergeJournal("test-object", "test-journal", 20, 40); + assert(data); + idata = (int *) data.get(); + for (i = 0; i < 5; i++) + assert(idata[i] == i); + for (; i < 10; i++) + assert(idata[i] == i+5); + + // read s.t. beginning of the change is in the middle of the range + data = ioc->mergeJournal("test-object", "test-journal", 8, 24); + assert(data); + idata = (int *) data.get(); + for (i = 0; i < 3; i++) + assert(idata[i] == i + 2); + for (; i < 6; i++) + assert(idata[i] == i - 3); + + // read s.t. end of the change is in the middle of the range + data = ioc->mergeJournal("test-object", "test-journal", 28, 20); + assert(data); + idata = (int *) data.get(); + for (i = 0; i < 3; i++) + assert(idata[i] == i + 2); + for (; i < 3; i++) + assert(idata[i] == i + 7); + + // cleanup + bf::remove("test-object"); + bf::remove("test-journal"); + cout << "mergeJournalTest OK" << endl; +} int main() @@ -539,6 +615,7 @@ int main() localstorageTest1(); cacheTest1(); + mergeJournalTest(); return 0; }