From 0b4cbad829b3b1948633b87c5ce37b40832cc8c5 Mon Sep 17 00:00:00 2001 From: Ben Thompson Date: Wed, 20 Mar 2019 11:44:43 -0500 Subject: [PATCH] Add MetadataFile / Replicator class and initial IOC-write logic. --- CMakeLists.txt | 2 + src/IOCoordinator.cpp | 110 +++++++++++++++-------- src/IOCoordinator.h | 10 +-- src/MetadataFile.cpp | 203 ++++++++++++++++++++++++++++++++++++++++++ src/MetadataFile.h | 58 ++++++++++++ src/Replicator.cpp | 154 ++++++++++++++++++++++++++++++++ src/Replicator.h | 33 +++++++ src/unit_tests.cpp | 135 ++++++++++++++++++++-------- 8 files changed, 627 insertions(+), 78 deletions(-) mode change 100644 => 100755 src/IOCoordinator.cpp create mode 100755 src/MetadataFile.cpp create mode 100755 src/MetadataFile.h create mode 100755 src/Replicator.cpp create mode 100755 src/Replicator.h mode change 100644 => 100755 src/unit_tests.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 2657a0dbd..da3669962 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -29,6 +29,8 @@ set(storagemanager_SRCS src/Downloader.cpp src/Synchronizer.cpp src/RWLock.cpp + src/MetadataFile.cpp + src/Replicator.cpp ) option(TRACE "Enable some tracing output" OFF) diff --git a/src/IOCoordinator.cpp b/src/IOCoordinator.cpp old mode 100644 new mode 100755 index e1a07cebb..97bc4ebb8 --- a/src/IOCoordinator.cpp +++ b/src/IOCoordinator.cpp @@ -1,5 +1,7 @@ #include "IOCoordinator.h" +#include "Cache.h" +#include "MetadataFile.h" #include "SMLogging.h" #include #include @@ -8,8 +10,6 @@ #include #include #include -#include -#include #include #define max(x, y) (x > y ? x : y) @@ -30,6 +30,7 @@ IOCoordinator::IOCoordinator() { config = Config::get(); logger = SMLogging::get(); + replicator = Replicator::get(); objectSize = 5 * (1<<20); try { @@ -101,21 +102,77 @@ int IOCoordinator::read(const char *filename, uint8_t *data, off_t offset, size_ int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset, size_t length) { - int fd, err; - - OPEN(filename, O_WRONLY); - size_t count = 0; - ::lseek(fd, offset, SEEK_SET); - while (count < length) { - err = ::write(fd, &data[count], length - count); - if (err <= 0) - if (count > 0) // return what was successfully written - return count; + int err = 0; + uint64_t count = 0; + uint64_t writelength = 0; + uint64_t dataRemaining = length; + uint64_t journalOffset = 0; + bool updateMeta = false; + vector objects; + + //writeLock(filename); + + MetadataFile metadata = MetadataFile(filename); + + //read metadata determine how many objects overlap + objects = metadata.metadataRead(offset,length); + + // if there are objects append the journalfile in replicator + if(!objects.empty()) + { + for (std::vector::const_iterator i = objects.begin(); i != objects.end(); ++i) + { + // figure out how much data to write to this object + if (count == 0 && offset > i->offset) + { + // first object in the list so start at offset and + // write to end of oject or all the data + journalOffset = offset - i->offset; + writelength = min((objectSize - journalOffset),dataRemaining); + } else - return err; - count += err; + { + // starting at beginning of next object write the rest of data + // or until object length is reached + writelength = min(objectSize,dataRemaining); + journalOffset = 0; + } + err = replicator->addJournalEntry(i->name.c_str(),&data[count],journalOffset,writelength); + if (err <= 0) + { + //log error and abort + } + count += err; + dataRemaining -= err; + } + //cache.makeSpace(journal_data_size) + //Synchronizer::newJournalData(journal_file); } + + // there is no overlapping data, or data goes beyond end of last object + while (dataRemaining > 0 && err >= 0) + { + //add a new metaDataObject + writelength = min(objectSize,dataRemaining); + //cache.makeSpace(size) + // add a new metadata object, this will get a new objectKey + metadataObject newObject = metadata.addMetadataObject(filename,writelength); + // write the new object + err = replicator->newObject(newObject.name.c_str(),data,writelength); + if (err <= 0) + { + //log error and abort + } + // sync + //Synchronizer::newObject(newname) + count += err; + dataRemaining -= err; + } + + metadata.updateMetadata(filename); + //writeUnlock(filename); + return count; } @@ -263,11 +320,11 @@ boost::shared_array IOCoordinator::mergeJournal(const char *object, con objFD = ::open(object, O_RDONLY); if (objFD < 0) - return NULL; + return ret; scoped_closer s1(objFD); journalFD = ::open(journal, O_RDONLY); if (journalFD < 0) - return NULL; + return ret; scoped_closer s2(journalFD); // TODO: Right now this assumes that max object size has not been changed. @@ -414,27 +471,6 @@ int IOCoordinator::mergeJournalInMem(uint8_t *objData, const char *journalPath) return 0; } -string IOCoordinator::getNewKeyFromOldKey(const string &oldKey) -{ - boost::uuids::uuid u; - string ret(oldKey); - strcpy(&ret[0], boost::uuids::to_string(u).c_str()); - return ret; -} - -string IOCoordinator::getNewKey(string sourceName, size_t offset, size_t length) -{ - boost::uuids::uuid u; - stringstream ss; - - for (int i = 0; i < sourceName.length(); i++) - if (sourceName[i] == '/') - sourceName[i] = '-'; - - ss << u << "_" << offset << "_" << length << "_" << sourceName; - return ss.str(); -} - bool IOCoordinator::readLock(const string &filename) { boost::unique_lock s(lockMutex); diff --git a/src/IOCoordinator.h b/src/IOCoordinator.h index a91347a37..ecbe3281b 100644 --- a/src/IOCoordinator.h +++ b/src/IOCoordinator.h @@ -14,10 +14,13 @@ #include "Config.h" #include "SMLogging.h" #include "RWLock.h" +#include "Replicator.h" namespace storagemanager { +boost::shared_array seekToEndOfHeader1(int fd); + class IOCoordinator : public boost::noncopyable { public: @@ -35,10 +38,6 @@ class IOCoordinator : public boost::noncopyable int unlink(const char *path); int copyFile(const char *filename1, const char *filename2); - // TBD: this may have to go; there may be no use case where only the uuid needs to change. - std::string getNewKeyFromOldKey(const std::string &oldKey); - std::string getNewKey(std::string sourceName, size_t offset, size_t length); - // The shared logic for merging a journal file with its base file. // The default values for offset and len mean 'process the whole file'. Otherwise, // offset is relative to the object. @@ -50,11 +49,12 @@ class IOCoordinator : public boost::noncopyable bool writeLock(const std::string &filename); void readUnlock(const std::string &filename); void writeUnlock(const std::string &filename); - + private: IOCoordinator(); Config *config; SMLogging *logger; + Replicator *replicator; size_t objectSize; std::map locks; diff --git a/src/MetadataFile.cpp b/src/MetadataFile.cpp new file mode 100755 index 000000000..5409116ea --- /dev/null +++ b/src/MetadataFile.cpp @@ -0,0 +1,203 @@ +/* + * MetadataFile.cpp + */ +#include "MetadataFile.h" +#include +#include +#include +#include +#include +#include + +#define max(x, y) (x > y ? x : y) +#define min(x, y) (x < y ? x : y) + +namespace storagemanager +{ + +MetadataFile::MetadataFile() +{ + mpConfig = Config::get(); + mpLogger = SMLogging::get(); + mObjectSize = 5 * (1<<20); + try + { + mObjectSize = stoul(mpConfig->getValue("ObjectStorage", "object_size")); + } + catch (...) + { + cerr << "ObjectStorage/object_size must be set to a numeric value" << endl; + throw; + } + mVersion=1; + mRevision=1; +} + + +MetadataFile::MetadataFile(const char* filename) +{ + mpConfig = Config::get(); + mpLogger = SMLogging::get(); + mObjectSize = 5 * (1<<20); + try + { + mObjectSize = stoul(mpConfig->getValue("ObjectStorage", "object_size")); + } + catch (...) + { + cerr << "ObjectStorage/object_size must be set to a numeric value" << endl; + throw; + } + string metadataFilename = string(filename) + ".meta"; + if (boost::filesystem::exists(metadataFilename)) + { + boost::property_tree::ptree jsontree; + boost::property_tree::read_json(metadataFilename, jsontree); + metadataObject newObject; + mVersion = jsontree.get("version"); + mRevision = jsontree.get("revision"); + + BOOST_FOREACH(const boost::property_tree::ptree::value_type &v, jsontree.get_child("objects")) + { + metadataObject newObject; + newObject.offset = v.second.get("offset"); + newObject.length = v.second.get("length"); + newObject.name = v.second.get("name"); + mObjects.push_back(newObject); + } + } + else + { + mVersion = 1; + mRevision = 1; + updateMetadata(filename); + } +} + +MetadataFile::~MetadataFile() +{ + +} + +vector MetadataFile::metadataRead(off_t offset, size_t length) +{ + vector returnObjs; + uint64_t startData = offset; + uint64_t endData = offset + length; + uint64_t dataRemaining = length; + bool foundStart = false; + for (std::vector::iterator i = mObjects.begin(); i != mObjects.end(); ++i) + { + uint64_t startObject = i->offset; + uint64_t endObject = i->offset + i->length; + uint64_t maxEndObject = i->offset + mObjectSize; + // This logic assumes objects are in ascending order of offsets + if (startData >= startObject && (startData < endObject || startData < maxEndObject)) + { + returnObjs.push_back(*i); + if (startData >= endObject) + { + // data starts and the end of current object and can atleast partially fit here update length + i->length += min((maxEndObject-startData),dataRemaining); + } + foundStart = true; + } + else if (endData >= startObject && (endData < endObject || endData < maxEndObject)) + { + // data ends in this object + returnObjs.push_back(*i); + if (endData >= endObject) + { + // data end is beyond old length + i->length += (endData - endObject); + } + } + else if (endData >= startObject && foundStart) + { + // data overlaps this object + returnObjs.push_back(*i); + } + } + + return returnObjs; +} + +metadataObject MetadataFile::addMetadataObject(const char *filename, size_t length) +{ + metadataObject addObject,lastObject; + if (!mObjects.empty()) + { + metadataObject lastObject = mObjects.back(); + addObject.offset = lastObject.offset + lastObject.length; + } + else + { + addObject.offset = 0; + } + addObject.length = length; + string newObjectKey = getNewKey(filename, addObject.offset, addObject.length); + addObject.name = string(newObjectKey); + mObjects.push_back(addObject); + + return addObject; +} + + +int MetadataFile::updateMetadata(const char *filename) +{ + string metadataFilename = string(filename) + ".meta"; + boost::property_tree::ptree jsontree; + boost::property_tree::ptree objs; + jsontree.put("version",mVersion); + jsontree.put("revision",mRevision); + for (std::vector::const_iterator i = mObjects.begin(); i != mObjects.end(); ++i) + { + boost::property_tree::ptree object; + object.put("offset",i->offset); + object.put("length",i->length); + object.put("name",i->name); + objs.push_back(std::make_pair("", object)); + } + jsontree.add_child("objects", objs); + write_json(metadataFilename, jsontree); +} + +string MetadataFile::getNewKeyFromOldKey(const string &oldKey) +{ + boost::uuids::uuid u; + string ret(oldKey); + strcpy(&ret[0], boost::uuids::to_string(u).c_str()); + return ret; +} + +string MetadataFile::getNewKey(string sourceName, size_t offset, size_t length) +{ + boost::uuids::uuid u; + stringstream ss; + + for (int i = 0; i < sourceName.length(); i++) + { + if (sourceName[i] == '/') + { + sourceName[i] = '-'; + } + } + + ss << u << "_" << offset << "_" << length << "_" << sourceName; + return ss.str(); +} + +void MetadataFile::printObjects() +{ + printf("Version: %i Revision: %i\n",mVersion,mRevision); + for (std::vector::const_iterator i = mObjects.begin(); i != mObjects.end(); ++i) + { + printf("Name: %s Length: %lu Offset: %lu\n",i->name.c_str(),i->length,i->offset); + } +} + + +} + + + diff --git a/src/MetadataFile.h b/src/MetadataFile.h new file mode 100755 index 000000000..23cdef589 --- /dev/null +++ b/src/MetadataFile.h @@ -0,0 +1,58 @@ +/* + * MetadataFile.h + */ +#ifndef METADATAFILE_H_ +#define METADATAFILE_H_ + +#include "Config.h" +#include "SMLogging.h" +#include +#include +#include +#include +#include +#include + +using namespace std; + +namespace storagemanager +{ + +struct metadataObject { + uint64_t offset; + uint64_t length; + string name; + bool operator < (const metadataObject &b) const { return offset < b.offset; } +}; + +class MetadataFile +{ + public: + MetadataFile(); + MetadataFile(const char* filename); + ~MetadataFile(); + + void printObjects(); + // returns the objects needed to update + vector metadataRead(off_t offset, size_t length); + // updates the metadatafile with new object + int updateMetadata(const char *filename); + metadataObject addMetadataObject(const char *filename, size_t length); + + // TBD: this may have to go; there may be no use case where only the uuid needs to change. + std::string getNewKeyFromOldKey(const std::string &oldKey); + std::string getNewKey(std::string sourceName, size_t offset, size_t length); + + private: + Config *mpConfig; + SMLogging *mpLogger; + int mVersion; + int mRevision; + size_t mObjectSize; + //set mObjects; + vector mObjects; +}; + +} + +#endif /* METADATAFILE_H_ */ diff --git a/src/Replicator.cpp b/src/Replicator.cpp new file mode 100755 index 000000000..a84b702ed --- /dev/null +++ b/src/Replicator.cpp @@ -0,0 +1,154 @@ + +#include "Replicator.h" +#include "IOCoordinator.h" +#include "SMLogging.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace std; + +namespace +{ + storagemanager::Replicator *rep = NULL; + boost::mutex m; +} +namespace storagemanager +{ + +Replicator::Replicator() +{ +} + +Replicator::~Replicator() +{ +} + +Replicator * Replicator::get() +{ + if (rep) + return rep; + boost::mutex::scoped_lock s(m); + if (rep) + return rep; + rep = new Replicator(); + return rep; +} + +struct scoped_closer { + scoped_closer(int f) : fd(f) { } + ~scoped_closer() { + int s_errno = errno; + ::close(fd); + errno = s_errno; + } + int fd; +}; + +#define OPEN(name, mode) \ + fd = ::open(name, mode, 0600); \ + if (fd < 0) \ + return fd; \ + scoped_closer sc(fd); + +int Replicator::newObject(const char *filename, const uint8_t *data, size_t length ) +{ + int fd, err; + + OPEN(filename, O_WRONLY | O_CREAT); + size_t count = 0; + while (count < length) { + err = ::write(fd, &data[count], length - count); + if (err <= 0) + if (count > 0) // return what was successfully written + return count; + else + return err; + count += err; + } + + return count; +} + +int Replicator::addJournalEntry(const char *filename, const uint8_t *data, off_t offset, size_t length) +{ + int fd, err; + uint64_t offlen[] = {offset,length}; + size_t count = 0; + int version = 1; + string journalFilename = string(filename) + ".journal"; + uint64_t thisEntryMaxOffset = (offset + length - 1); + if (!boost::filesystem::exists(journalFilename)) + { + // create new journal file with header + OPEN(journalFilename.c_str(), O_WRONLY | O_CREAT); + string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str(); + err = ::write(fd, header.c_str(), header.length() + 1); + if (err <= 0) + return err; + } + else + { + // read the existing header and check if max_offset needs to be updated + OPEN(journalFilename.c_str(), O_RDWR); + boost::shared_array headertxt = seekToEndOfHeader1(fd); + stringstream ss; + ss << headertxt.get(); + boost::property_tree::ptree header; + boost::property_tree::json_parser::read_json(ss, header); + assert(header.get("version") == 1); + uint64_t currentMaxOffset = header.get("max_offset"); + if (thisEntryMaxOffset > currentMaxOffset) + { + string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str(); + err = ::pwrite(fd, header.c_str(), header.length() + 1,0); + if (err <= 0) + return err; + } + } + + OPEN(journalFilename.c_str(), O_WRONLY | O_APPEND); + + err = ::write(fd, offlen, 16); + if (err <= 0) + return err; + + while (count < length) { + err = ::write(fd, &data[count], length - count); + if (err <= 0) + if (count > 0) // return what was successfully written + return count; + else + return err; + count += err; + } + + return count; +} + +int Replicator::remove(const char *filename, uint8_t flags) +{ + int ret = 0; + boost::filesystem::path p(filename); + + try + { + boost::filesystem::remove_all(filename); + } + catch(boost::filesystem::filesystem_error &e) + { + errno = e.code().value(); + ret = -1; + } + return ret; +} + +} diff --git a/src/Replicator.h b/src/Replicator.h new file mode 100755 index 000000000..63b7785d0 --- /dev/null +++ b/src/Replicator.h @@ -0,0 +1,33 @@ +#ifndef REPLICATOR_H_ +#define REPLICATOR_H_ + +//#include "ThreadPool.h" +#include +#include + +namespace storagemanager +{ + +// 64-bit offset +// 64-bit length +// + +class Replicator +{ + public: + static Replicator *get(); + virtual ~Replicator(); + + int addJournalEntry(const char *filename, const uint8_t *data, off_t offset, size_t length); + int newObject(const char *filename, const uint8_t *data, size_t length); + int remove(const char *key ,uint8_t flags); + + + private: + Replicator(); + //ThreadPool threadPool; +}; + +} + +#endif diff --git a/src/unit_tests.cpp b/src/unit_tests.cpp old mode 100644 new mode 100755 index be79eb941..888954f26 --- a/src/unit_tests.cpp +++ b/src/unit_tests.cpp @@ -11,6 +11,8 @@ #include "Config.h" #include "Cache.h" #include "LocalStorage.h" +#include "MetadataFile.h" +#include "Replicator.h" #include #include @@ -22,6 +24,9 @@ #include #include #include +#include +#include +#include #undef NDEBUG @@ -135,6 +140,59 @@ bool opentask() return true; } +bool replicatorTest() +{ + Replicator *repli = Replicator::get(); + int err,fd; + const char *newobject = "newobjectTest"; + const char *newobjectJournal = "newobjectTest.journal"; + uint8_t buf[1024]; + uint8_t data[1024]; + int version = 1; + uint64_t max_offset = 0; + memcpy(data,"1234567890",10); + string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % max_offset).str(); + ::pwrite(fd, header.c_str(), header.length() + 1,0); + + // test newObject + repli->newObject(newobject,data,10); + + //check file contents + fd = ::open(newobject, O_RDONLY); + err = ::read(fd, buf, 1024); + assert(err == 10); + buf[10] = 0; + assert(!strcmp("1234567890", (const char *) buf)); + cout << "replicator newObject OK" << endl; + ::close(fd); + + // test addJournalEntry + repli->addJournalEntry(newobject,data,0,10); + + fd = ::open(newobjectJournal, O_RDONLY); + err = ::read(fd, buf, 1024); + assert(err == (header.length() + 1 + 16 + 10)); + buf[err] = 0; + assert(!strcmp("1234567890", (const char *) buf + header.length() + 1 + 16)); + cout << "replicator addJournalEntry OK" << endl; + ::close(fd); + + repli->remove(newobject,0); + repli->remove(newobjectJournal,0); + assert(!boost::filesystem::exists(newobject)); + cout << "replicator remove OK" << endl; + return true; +} + +::vector GenerateData(std::size_t bytes) +{ + assert(bytes % sizeof(uint64_t) == 0); + ::vector data(bytes / sizeof(uint64_t)); + ::iota(data.begin(), data.end(), 0); + ::shuffle(data.begin(), data.end(), std::mt19937{ std::random_device{}() }); + return data; +} + bool writetask() { // make an empty file to write to @@ -144,41 +202,46 @@ bool writetask() assert(fd > 0); scoped_closer f(fd); - uint8_t buf[1024]; - sm_msg_header *hdr = (sm_msg_header *) buf; - write_cmd *cmd = (write_cmd *) &hdr[1]; - uint8_t *data; + std::size_t writeSize = (10 * 1024); + std::vector writeData = GenerateData(writeSize); + off_t nextOffset = 0; + + for (std::size_t size = writeSize; size <= (5 * writeSize); size += writeSize) + { + uint8_t buf[(1024 + writeSize)]; + uint8_t *data; + sm_msg_header *hdr = (sm_msg_header *) buf; + write_cmd *cmd = (write_cmd *) &hdr[1]; + cmd->opcode = WRITE; + cmd->offset = nextOffset; + cmd->count = writeSize; + cmd->flen = 10; + memcpy(&cmd->filename, filename, cmd->flen); + + data = (uint8_t *) &cmd->filename[cmd->flen]; + memcpy(data, &writeData, writeSize); - cmd->opcode = WRITE; - cmd->offset = 0; - cmd->count = 9; - cmd->flen = 10; - memcpy(&cmd->filename, filename, cmd->flen); - data = (uint8_t *) &cmd->filename[cmd->flen]; - memcpy(data, "123456789", cmd->count); + hdr->type = SM_MSG_START; + hdr->payloadLen = sizeof(*cmd) + cmd->flen + cmd->count; + + WriteTask w(clientSock, hdr->payloadLen); + ::write(sessionSock, cmd, hdr->payloadLen); + w.run(); + + // verify response + int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); + sm_response *resp = (sm_response *) buf; + assert(err == sizeof(*resp)); + assert(resp->header.type == SM_MSG_START); + assert(resp->header.payloadLen == 4); + assert(resp->header.flags == 0); + assert(resp->returnCode == writeSize); + nextOffset += (writeSize); + } + // This leaves behind object journal and metadata files currently - hdr->type = SM_MSG_START; - hdr->payloadLen = sizeof(*cmd) + cmd->flen + cmd->count; - - WriteTask w(clientSock, hdr->payloadLen); - ::write(sessionSock, cmd, hdr->payloadLen); - w.run(); - - // verify response - int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); - sm_response *resp = (sm_response *) buf; - assert(err == sizeof(*resp)); - assert(resp->header.type == SM_MSG_START); - assert(resp->header.payloadLen == 4); - assert(resp->header.flags == 0); - assert(resp->returnCode == 9); - - //check file contents - err = ::read(fd, buf, 1024); - assert(err == 9); - buf[9] = 0; - assert(!strcmp("123456789", (const char *) buf)); - ::unlink(filename); + MetadataFile mdf("./writetest1"); + mdf.printObjects(); cout << "write task OK" << endl; return true; } @@ -595,7 +658,7 @@ bool mergeJournalTest() bf::remove("test-journal"); cout << "mergeJournalTest OK" << endl; } - + int main() { @@ -612,10 +675,10 @@ int main() listdirtask(); pingtask(); copytask(); - localstorageTest1(); cacheTest1(); mergeJournalTest(); - + replicatorTest(); + return 0; }