1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-15 12:09:09 +03:00

Add MetadataFile / Replicator class and initial IOC-write logic.

This commit is contained in:
Ben Thompson
2019-03-20 11:44:43 -05:00
parent a26e939d75
commit 0b4cbad829
8 changed files with 627 additions and 78 deletions

View File

@@ -29,6 +29,8 @@ set(storagemanager_SRCS
src/Downloader.cpp
src/Synchronizer.cpp
src/RWLock.cpp
src/MetadataFile.cpp
src/Replicator.cpp
)
option(TRACE "Enable some tracing output" OFF)

108
src/IOCoordinator.cpp Normal file → Executable file
View File

@@ -1,5 +1,7 @@
#include "IOCoordinator.h"
#include "Cache.h"
#include "MetadataFile.h"
#include "SMLogging.h"
#include <sys/types.h>
#include <sys/stat.h>
@@ -8,8 +10,6 @@
#include <errno.h>
#include <boost/filesystem.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <iostream>
#define max(x, y) (x > y ? x : y)
@@ -30,6 +30,7 @@ IOCoordinator::IOCoordinator()
{
config = Config::get();
logger = SMLogging::get();
replicator = Replicator::get();
objectSize = 5 * (1<<20);
try
{
@@ -101,21 +102,77 @@ int IOCoordinator::read(const char *filename, uint8_t *data, off_t offset, size_
int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset, size_t length)
{
int fd, err;
int err = 0;
uint64_t count = 0;
uint64_t writelength = 0;
uint64_t dataRemaining = length;
uint64_t journalOffset = 0;
bool updateMeta = false;
vector<metadataObject> objects;
OPEN(filename, O_WRONLY);
size_t count = 0;
::lseek(fd, offset, SEEK_SET);
while (count < length) {
err = ::write(fd, &data[count], length - count);
if (err <= 0)
if (count > 0) // return what was successfully written
return count;
//writeLock(filename);
MetadataFile metadata = MetadataFile(filename);
//read metadata determine how many objects overlap
objects = metadata.metadataRead(offset,length);
// if there are objects append the journalfile in replicator
if(!objects.empty())
{
for (std::vector<metadataObject>::const_iterator i = objects.begin(); i != objects.end(); ++i)
{
// figure out how much data to write to this object
if (count == 0 && offset > i->offset)
{
// first object in the list so start at offset and
// write to end of oject or all the data
journalOffset = offset - i->offset;
writelength = min((objectSize - journalOffset),dataRemaining);
}
else
return err;
count += err;
{
// starting at beginning of next object write the rest of data
// or until object length is reached
writelength = min(objectSize,dataRemaining);
journalOffset = 0;
}
err = replicator->addJournalEntry(i->name.c_str(),&data[count],journalOffset,writelength);
if (err <= 0)
{
//log error and abort
}
count += err;
dataRemaining -= err;
}
//cache.makeSpace(journal_data_size)
//Synchronizer::newJournalData(journal_file);
}
// there is no overlapping data, or data goes beyond end of last object
while (dataRemaining > 0 && err >= 0)
{
//add a new metaDataObject
writelength = min(objectSize,dataRemaining);
//cache.makeSpace(size)
// add a new metadata object, this will get a new objectKey
metadataObject newObject = metadata.addMetadataObject(filename,writelength);
// write the new object
err = replicator->newObject(newObject.name.c_str(),data,writelength);
if (err <= 0)
{
//log error and abort
}
// sync
//Synchronizer::newObject(newname)
count += err;
dataRemaining -= err;
}
metadata.updateMetadata(filename);
//writeUnlock(filename);
return count;
}
@@ -263,11 +320,11 @@ boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, con
objFD = ::open(object, O_RDONLY);
if (objFD < 0)
return NULL;
return ret;
scoped_closer s1(objFD);
journalFD = ::open(journal, O_RDONLY);
if (journalFD < 0)
return NULL;
return ret;
scoped_closer s2(journalFD);
// TODO: Right now this assumes that max object size has not been changed.
@@ -414,27 +471,6 @@ int IOCoordinator::mergeJournalInMem(uint8_t *objData, const char *journalPath)
return 0;
}
string IOCoordinator::getNewKeyFromOldKey(const string &oldKey)
{
boost::uuids::uuid u;
string ret(oldKey);
strcpy(&ret[0], boost::uuids::to_string(u).c_str());
return ret;
}
string IOCoordinator::getNewKey(string sourceName, size_t offset, size_t length)
{
boost::uuids::uuid u;
stringstream ss;
for (int i = 0; i < sourceName.length(); i++)
if (sourceName[i] == '/')
sourceName[i] = '-';
ss << u << "_" << offset << "_" << length << "_" << sourceName;
return ss.str();
}
bool IOCoordinator::readLock(const string &filename)
{
boost::unique_lock<boost::mutex> s(lockMutex);

View File

@@ -14,10 +14,13 @@
#include "Config.h"
#include "SMLogging.h"
#include "RWLock.h"
#include "Replicator.h"
namespace storagemanager
{
boost::shared_array<char> seekToEndOfHeader1(int fd);
class IOCoordinator : public boost::noncopyable
{
public:
@@ -35,10 +38,6 @@ class IOCoordinator : public boost::noncopyable
int unlink(const char *path);
int copyFile(const char *filename1, const char *filename2);
// TBD: this may have to go; there may be no use case where only the uuid needs to change.
std::string getNewKeyFromOldKey(const std::string &oldKey);
std::string getNewKey(std::string sourceName, size_t offset, size_t length);
// The shared logic for merging a journal file with its base file.
// The default values for offset and len mean 'process the whole file'. Otherwise,
// offset is relative to the object.
@@ -55,6 +54,7 @@ class IOCoordinator : public boost::noncopyable
IOCoordinator();
Config *config;
SMLogging *logger;
Replicator *replicator;
size_t objectSize;
std::map<std::string, RWLock *> locks;

203
src/MetadataFile.cpp Executable file
View File

@@ -0,0 +1,203 @@
/*
* MetadataFile.cpp
*/
#include "MetadataFile.h"
#include <boost/filesystem.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/foreach.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>
#define max(x, y) (x > y ? x : y)
#define min(x, y) (x < y ? x : y)
namespace storagemanager
{
MetadataFile::MetadataFile()
{
mpConfig = Config::get();
mpLogger = SMLogging::get();
mObjectSize = 5 * (1<<20);
try
{
mObjectSize = stoul(mpConfig->getValue("ObjectStorage", "object_size"));
}
catch (...)
{
cerr << "ObjectStorage/object_size must be set to a numeric value" << endl;
throw;
}
mVersion=1;
mRevision=1;
}
MetadataFile::MetadataFile(const char* filename)
{
mpConfig = Config::get();
mpLogger = SMLogging::get();
mObjectSize = 5 * (1<<20);
try
{
mObjectSize = stoul(mpConfig->getValue("ObjectStorage", "object_size"));
}
catch (...)
{
cerr << "ObjectStorage/object_size must be set to a numeric value" << endl;
throw;
}
string metadataFilename = string(filename) + ".meta";
if (boost::filesystem::exists(metadataFilename))
{
boost::property_tree::ptree jsontree;
boost::property_tree::read_json(metadataFilename, jsontree);
metadataObject newObject;
mVersion = jsontree.get<int>("version");
mRevision = jsontree.get<int>("revision");
BOOST_FOREACH(const boost::property_tree::ptree::value_type &v, jsontree.get_child("objects"))
{
metadataObject newObject;
newObject.offset = v.second.get<uint64_t>("offset");
newObject.length = v.second.get<uint64_t>("length");
newObject.name = v.second.get<string>("name");
mObjects.push_back(newObject);
}
}
else
{
mVersion = 1;
mRevision = 1;
updateMetadata(filename);
}
}
MetadataFile::~MetadataFile()
{
}
vector<metadataObject> MetadataFile::metadataRead(off_t offset, size_t length)
{
vector<metadataObject> returnObjs;
uint64_t startData = offset;
uint64_t endData = offset + length;
uint64_t dataRemaining = length;
bool foundStart = false;
for (std::vector<metadataObject>::iterator i = mObjects.begin(); i != mObjects.end(); ++i)
{
uint64_t startObject = i->offset;
uint64_t endObject = i->offset + i->length;
uint64_t maxEndObject = i->offset + mObjectSize;
// This logic assumes objects are in ascending order of offsets
if (startData >= startObject && (startData < endObject || startData < maxEndObject))
{
returnObjs.push_back(*i);
if (startData >= endObject)
{
// data starts and the end of current object and can atleast partially fit here update length
i->length += min((maxEndObject-startData),dataRemaining);
}
foundStart = true;
}
else if (endData >= startObject && (endData < endObject || endData < maxEndObject))
{
// data ends in this object
returnObjs.push_back(*i);
if (endData >= endObject)
{
// data end is beyond old length
i->length += (endData - endObject);
}
}
else if (endData >= startObject && foundStart)
{
// data overlaps this object
returnObjs.push_back(*i);
}
}
return returnObjs;
}
metadataObject MetadataFile::addMetadataObject(const char *filename, size_t length)
{
metadataObject addObject,lastObject;
if (!mObjects.empty())
{
metadataObject lastObject = mObjects.back();
addObject.offset = lastObject.offset + lastObject.length;
}
else
{
addObject.offset = 0;
}
addObject.length = length;
string newObjectKey = getNewKey(filename, addObject.offset, addObject.length);
addObject.name = string(newObjectKey);
mObjects.push_back(addObject);
return addObject;
}
int MetadataFile::updateMetadata(const char *filename)
{
string metadataFilename = string(filename) + ".meta";
boost::property_tree::ptree jsontree;
boost::property_tree::ptree objs;
jsontree.put("version",mVersion);
jsontree.put("revision",mRevision);
for (std::vector<metadataObject>::const_iterator i = mObjects.begin(); i != mObjects.end(); ++i)
{
boost::property_tree::ptree object;
object.put("offset",i->offset);
object.put("length",i->length);
object.put("name",i->name);
objs.push_back(std::make_pair("", object));
}
jsontree.add_child("objects", objs);
write_json(metadataFilename, jsontree);
}
string MetadataFile::getNewKeyFromOldKey(const string &oldKey)
{
boost::uuids::uuid u;
string ret(oldKey);
strcpy(&ret[0], boost::uuids::to_string(u).c_str());
return ret;
}
string MetadataFile::getNewKey(string sourceName, size_t offset, size_t length)
{
boost::uuids::uuid u;
stringstream ss;
for (int i = 0; i < sourceName.length(); i++)
{
if (sourceName[i] == '/')
{
sourceName[i] = '-';
}
}
ss << u << "_" << offset << "_" << length << "_" << sourceName;
return ss.str();
}
void MetadataFile::printObjects()
{
printf("Version: %i Revision: %i\n",mVersion,mRevision);
for (std::vector<metadataObject>::const_iterator i = mObjects.begin(); i != mObjects.end(); ++i)
{
printf("Name: %s Length: %lu Offset: %lu\n",i->name.c_str(),i->length,i->offset);
}
}
}

58
src/MetadataFile.h Executable file
View File

@@ -0,0 +1,58 @@
/*
* MetadataFile.h
*/
#ifndef METADATAFILE_H_
#define METADATAFILE_H_
#include "Config.h"
#include "SMLogging.h"
#include <string>
#include <sys/types.h>
#include <stdint.h>
#include <vector>
#include <iostream>
#include <set>
using namespace std;
namespace storagemanager
{
struct metadataObject {
uint64_t offset;
uint64_t length;
string name;
bool operator < (const metadataObject &b) const { return offset < b.offset; }
};
class MetadataFile
{
public:
MetadataFile();
MetadataFile(const char* filename);
~MetadataFile();
void printObjects();
// returns the objects needed to update
vector<metadataObject> metadataRead(off_t offset, size_t length);
// updates the metadatafile with new object
int updateMetadata(const char *filename);
metadataObject addMetadataObject(const char *filename, size_t length);
// TBD: this may have to go; there may be no use case where only the uuid needs to change.
std::string getNewKeyFromOldKey(const std::string &oldKey);
std::string getNewKey(std::string sourceName, size_t offset, size_t length);
private:
Config *mpConfig;
SMLogging *mpLogger;
int mVersion;
int mRevision;
size_t mObjectSize;
//set<metadataObject> mObjects;
vector<metadataObject> mObjects;
};
}
#endif /* METADATAFILE_H_ */

154
src/Replicator.cpp Executable file
View File

@@ -0,0 +1,154 @@
#include "Replicator.h"
#include "IOCoordinator.h"
#include "SMLogging.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <sys/sendfile.h>
#include <boost/filesystem.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/shared_array.hpp>
#include <boost/format.hpp>
#include <iostream>
using namespace std;
namespace
{
storagemanager::Replicator *rep = NULL;
boost::mutex m;
}
namespace storagemanager
{
Replicator::Replicator()
{
}
Replicator::~Replicator()
{
}
Replicator * Replicator::get()
{
if (rep)
return rep;
boost::mutex::scoped_lock s(m);
if (rep)
return rep;
rep = new Replicator();
return rep;
}
struct scoped_closer {
scoped_closer(int f) : fd(f) { }
~scoped_closer() {
int s_errno = errno;
::close(fd);
errno = s_errno;
}
int fd;
};
#define OPEN(name, mode) \
fd = ::open(name, mode, 0600); \
if (fd < 0) \
return fd; \
scoped_closer sc(fd);
int Replicator::newObject(const char *filename, const uint8_t *data, size_t length )
{
int fd, err;
OPEN(filename, O_WRONLY | O_CREAT);
size_t count = 0;
while (count < length) {
err = ::write(fd, &data[count], length - count);
if (err <= 0)
if (count > 0) // return what was successfully written
return count;
else
return err;
count += err;
}
return count;
}
int Replicator::addJournalEntry(const char *filename, const uint8_t *data, off_t offset, size_t length)
{
int fd, err;
uint64_t offlen[] = {offset,length};
size_t count = 0;
int version = 1;
string journalFilename = string(filename) + ".journal";
uint64_t thisEntryMaxOffset = (offset + length - 1);
if (!boost::filesystem::exists(journalFilename))
{
// create new journal file with header
OPEN(journalFilename.c_str(), O_WRONLY | O_CREAT);
string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str();
err = ::write(fd, header.c_str(), header.length() + 1);
if (err <= 0)
return err;
}
else
{
// read the existing header and check if max_offset needs to be updated
OPEN(journalFilename.c_str(), O_RDWR);
boost::shared_array<char> headertxt = seekToEndOfHeader1(fd);
stringstream ss;
ss << headertxt.get();
boost::property_tree::ptree header;
boost::property_tree::json_parser::read_json(ss, header);
assert(header.get<int>("version") == 1);
uint64_t currentMaxOffset = header.get<uint64_t>("max_offset");
if (thisEntryMaxOffset > currentMaxOffset)
{
string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str();
err = ::pwrite(fd, header.c_str(), header.length() + 1,0);
if (err <= 0)
return err;
}
}
OPEN(journalFilename.c_str(), O_WRONLY | O_APPEND);
err = ::write(fd, offlen, 16);
if (err <= 0)
return err;
while (count < length) {
err = ::write(fd, &data[count], length - count);
if (err <= 0)
if (count > 0) // return what was successfully written
return count;
else
return err;
count += err;
}
return count;
}
int Replicator::remove(const char *filename, uint8_t flags)
{
int ret = 0;
boost::filesystem::path p(filename);
try
{
boost::filesystem::remove_all(filename);
}
catch(boost::filesystem::filesystem_error &e)
{
errno = e.code().value();
ret = -1;
}
return ret;
}
}

33
src/Replicator.h Executable file
View File

@@ -0,0 +1,33 @@
#ifndef REPLICATOR_H_
#define REPLICATOR_H_
//#include "ThreadPool.h"
#include <sys/types.h>
#include <stdint.h>
namespace storagemanager
{
// 64-bit offset
// 64-bit length
// <length-bytes of data to write at specified offset>
class Replicator
{
public:
static Replicator *get();
virtual ~Replicator();
int addJournalEntry(const char *filename, const uint8_t *data, off_t offset, size_t length);
int newObject(const char *filename, const uint8_t *data, size_t length);
int remove(const char *key ,uint8_t flags);
private:
Replicator();
//ThreadPool threadPool;
};
}
#endif

125
src/unit_tests.cpp Normal file → Executable file
View File

@@ -11,6 +11,8 @@
#include "Config.h"
#include "Cache.h"
#include "LocalStorage.h"
#include "MetadataFile.h"
#include "Replicator.h"
#include <iostream>
#include <stdlib.h>
@@ -22,6 +24,9 @@
#include <fcntl.h>
#include <boost/filesystem.hpp>
#include <boost/thread.hpp>
#include <boost/format.hpp>
#include <algorithm>
#include <random>
#undef NDEBUG
@@ -135,6 +140,59 @@ bool opentask()
return true;
}
bool replicatorTest()
{
Replicator *repli = Replicator::get();
int err,fd;
const char *newobject = "newobjectTest";
const char *newobjectJournal = "newobjectTest.journal";
uint8_t buf[1024];
uint8_t data[1024];
int version = 1;
uint64_t max_offset = 0;
memcpy(data,"1234567890",10);
string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % max_offset).str();
::pwrite(fd, header.c_str(), header.length() + 1,0);
// test newObject
repli->newObject(newobject,data,10);
//check file contents
fd = ::open(newobject, O_RDONLY);
err = ::read(fd, buf, 1024);
assert(err == 10);
buf[10] = 0;
assert(!strcmp("1234567890", (const char *) buf));
cout << "replicator newObject OK" << endl;
::close(fd);
// test addJournalEntry
repli->addJournalEntry(newobject,data,0,10);
fd = ::open(newobjectJournal, O_RDONLY);
err = ::read(fd, buf, 1024);
assert(err == (header.length() + 1 + 16 + 10));
buf[err] = 0;
assert(!strcmp("1234567890", (const char *) buf + header.length() + 1 + 16));
cout << "replicator addJournalEntry OK" << endl;
::close(fd);
repli->remove(newobject,0);
repli->remove(newobjectJournal,0);
assert(!boost::filesystem::exists(newobject));
cout << "replicator remove OK" << endl;
return true;
}
::vector<uint64_t> GenerateData(std::size_t bytes)
{
assert(bytes % sizeof(uint64_t) == 0);
::vector<uint64_t> data(bytes / sizeof(uint64_t));
::iota(data.begin(), data.end(), 0);
::shuffle(data.begin(), data.end(), std::mt19937{ std::random_device{}() });
return data;
}
bool writetask()
{
// make an empty file to write to
@@ -144,41 +202,46 @@ bool writetask()
assert(fd > 0);
scoped_closer f(fd);
uint8_t buf[1024];
sm_msg_header *hdr = (sm_msg_header *) buf;
write_cmd *cmd = (write_cmd *) &hdr[1];
uint8_t *data;
std::size_t writeSize = (10 * 1024);
std::vector<uint64_t> writeData = GenerateData(writeSize);
off_t nextOffset = 0;
cmd->opcode = WRITE;
cmd->offset = 0;
cmd->count = 9;
cmd->flen = 10;
memcpy(&cmd->filename, filename, cmd->flen);
data = (uint8_t *) &cmd->filename[cmd->flen];
memcpy(data, "123456789", cmd->count);
for (std::size_t size = writeSize; size <= (5 * writeSize); size += writeSize)
{
uint8_t buf[(1024 + writeSize)];
uint8_t *data;
sm_msg_header *hdr = (sm_msg_header *) buf;
write_cmd *cmd = (write_cmd *) &hdr[1];
cmd->opcode = WRITE;
cmd->offset = nextOffset;
cmd->count = writeSize;
cmd->flen = 10;
memcpy(&cmd->filename, filename, cmd->flen);
hdr->type = SM_MSG_START;
hdr->payloadLen = sizeof(*cmd) + cmd->flen + cmd->count;
data = (uint8_t *) &cmd->filename[cmd->flen];
memcpy(data, &writeData, writeSize);
WriteTask w(clientSock, hdr->payloadLen);
::write(sessionSock, cmd, hdr->payloadLen);
w.run();
hdr->type = SM_MSG_START;
hdr->payloadLen = sizeof(*cmd) + cmd->flen + cmd->count;
// verify response
int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_response *resp = (sm_response *) buf;
assert(err == sizeof(*resp));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == 4);
assert(resp->header.flags == 0);
assert(resp->returnCode == 9);
WriteTask w(clientSock, hdr->payloadLen);
::write(sessionSock, cmd, hdr->payloadLen);
w.run();
//check file contents
err = ::read(fd, buf, 1024);
assert(err == 9);
buf[9] = 0;
assert(!strcmp("123456789", (const char *) buf));
::unlink(filename);
// verify response
int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_response *resp = (sm_response *) buf;
assert(err == sizeof(*resp));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == 4);
assert(resp->header.flags == 0);
assert(resp->returnCode == writeSize);
nextOffset += (writeSize);
}
// This leaves behind object journal and metadata files currently
MetadataFile mdf("./writetest1");
mdf.printObjects();
cout << "write task OK" << endl;
return true;
}
@@ -612,10 +675,10 @@ int main()
listdirtask();
pingtask();
copytask();
localstorageTest1();
cacheTest1();
mergeJournalTest();
replicatorTest();
return 0;
}