1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-13 23:02:14 +03:00

First cut of the journal-merging code + the unit test for it.

Looks like it's working.
This commit is contained in:
Patrick LeBlanc
2019-03-12 11:51:47 -05:00
parent 13ade9e724
commit 208e3c300d
3 changed files with 240 additions and 1 deletions

View File

@@ -7,8 +7,12 @@
#include <unistd.h>
#include <errno.h>
#include <boost/filesystem.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <iostream>
#define max(x, y) (x > y ? x : y)
#define min(x, y) (x < y ? x : y)
using namespace std;
namespace
@@ -22,6 +26,18 @@ namespace storagemanager
IOCoordinator::IOCoordinator()
{
config = Config::get();
logger = SMLogging::get();
objectSize = 5 * (1<<20);
try
{
objectSize = stoul(config->getValue("ObjectStorage", "object_size"));
}
catch (...)
{
cerr << "ObjectStorage/object_size must be set to a numeric value" << endl;
throw;
}
}
IOCoordinator::~IOCoordinator()
@@ -209,4 +225,138 @@ int IOCoordinator::copyFile(const char *filename1, const char *filename2)
return err;
}
// this is not generic by any means. This is assuming a version 1 journal header, and is looking
// for the end of it, which is the first \0 char. It returns with fd pointing at the
// first byte after the header.
// update: had to make it also return the header; the boost json parser does not stop at either
// a null char or the end of an object.
boost::shared_array<char> seekToEndOfHeader1(int fd)
{
::lseek(fd, 0, SEEK_SET);
boost::shared_array<char> ret(new char[100]);
int err;
err = ::read(fd, ret.get(), 100);
if (err < 0)
{
char buf[80];
throw runtime_error("seekToEndOfHeader1 got: " + string(strerror_r(errno, buf, 80)));
}
for (int i = 0; i < err; i++)
{
if (ret[i] == 0)
{
::lseek(fd, i+1, SEEK_SET);
return ret;
}
}
throw runtime_error("seekToEndOfHeader1: did not find the end of the header");
}
boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, const char *journal, off_t offset, size_t len) const
{
int objFD, journalFD;
boost::shared_array<uint8_t> ret;
objFD = ::open(object, O_RDONLY);
if (objFD < 0)
return NULL;
scoped_closer s1(objFD);
journalFD = ::open(journal, O_RDONLY);
if (journalFD < 0)
return NULL;
scoped_closer s2(journalFD);
// TODO: Right now this assumes that max object size has not been changed.
// ideally, we would have a way to look up the size of a specific object.
if (len == 0)
len = objectSize - offset;
ret.reset(new uint8_t[len]);
// read the object into memory
size_t count = 0;
::lseek(objFD, offset, SEEK_SET);
while (count < len) {
int err = ::read(objFD, &ret[count], len - count);
if (err < 0)
{
char buf[80];
logger->log(LOG_CRIT, "IOC::mergeJournal(): failed to read %s, got '%s'", object, strerror_r(errno, buf, 80));
int l_errno = errno;
ret.reset();
errno = l_errno;
return ret;
}
else if (err == 0)
{
// at the EOF of the object. The journal may contain entries that append to the data,
// so 0-fill the remaining bytes.
#ifdef DEBUG
memset(&ret[count], 0, len-count);
#endif
break;
}
count += err;
}
// grab the journal header and make sure the version is 1
boost::shared_array<char> headertxt = seekToEndOfHeader1(journalFD);
stringstream ss;
ss << headertxt.get();
boost::property_tree::ptree header;
boost::property_tree::json_parser::read_json(ss, header);
assert(header.get<string>("version") == "1");
// start processing the entries
bool eof = false;
while (1)
{
uint64_t offlen[2];
int err = ::read(journalFD, &offlen, 16);
if (err != 16) // got EOF
break;
// if this entry overlaps, read the overlapping section
uint64_t lastJournalOffset = offlen[0] + offlen[1];
uint64_t lastBufOffset = offset + len;
if (offlen[0] <= lastBufOffset && lastJournalOffset >= offset)
{
uint64_t startReadingAt = max(offlen[0], offset);
uint64_t lengthOfRead = min(lastBufOffset, lastJournalOffset) - startReadingAt;
if (startReadingAt != offlen[0])
::lseek(journalFD, startReadingAt - offlen[0], SEEK_CUR);
uint count = 0;
while (count < lengthOfRead)
{
err = ::read(journalFD, &ret[startReadingAt - offset + count], lengthOfRead - count);
if (err < 0)
{
char buf[80];
logger->log(LOG_ERR, "mergeJournal: got %s", strerror_r(errno, buf, 80));
ret.reset();
return ret;
}
else if (err == 0)
{
logger->log(LOG_ERR, "mergeJournal: got early EOF");
ret.reset();
return ret;
}
count += err;
}
if (lengthOfRead != offlen[1])
::lseek(journalFD, offlen[1] - lengthOfRead, SEEK_CUR);
}
else
// skip over this journal entry
::lseek(journalFD, offlen[1], SEEK_CUR);
}
return ret;
}
}

View File

@@ -9,6 +9,10 @@
#include <string>
#include <boost/utility.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/shared_array.hpp>
#include "Config.h"
#include "SMLogging.h"
namespace storagemanager
{
@@ -30,8 +34,16 @@ class IOCoordinator : public boost::noncopyable
int unlink(const char *path);
int copyFile(const char *filename1, const char *filename2);
// The shared logic for merging a journal file with its base file.
// The default values for offset and len mean 'process the whole file'. Otherwise,
// offset is relative to the object.
boost::shared_array<uint8_t> mergeJournal(const char *objectPath, const char *journalPath, off_t offset = 0, size_t len = 0) const;
private:
IOCoordinator();
Config *config;
SMLogging *logger;
size_t objectSize;
};
}

View File

@@ -29,6 +29,7 @@
using namespace storagemanager;
using namespace std;
namespace bf = boost::filesystem;
struct scoped_closer {
scoped_closer(int f) : fd(f) { }
@@ -470,7 +471,7 @@ bool localstorageTest1()
bool cacheTest1()
{
namespace bf = boost::filesystem;
Cache cache;
CloudStorage *cs = CloudStorage::get();
LocalStorage *ls = dynamic_cast<LocalStorage *>(cs);
@@ -519,6 +520,81 @@ bool cacheTest1()
bf::remove(storagePath / realFile);
cout << "cache test 1 OK" << endl;
}
bool mergeJournalTest()
{
/*
create a dummy object and a dummy journal
call mergeJournal to process it with various params
verify the expected values
*/
int objFD = open("test-object", O_WRONLY | O_CREAT | O_TRUNC, 0600);
assert(objFD >= 0);
scoped_closer s1(objFD);
int journalFD = open("test-journal", O_WRONLY | O_CREAT | O_TRUNC, 0600);
assert(journalFD >= 0);
scoped_closer s2(journalFD);
int i;
for (i = 0; i < 2048; i++)
assert(write(objFD, &i, 4) == 4);
char header[] = "{ \"version\" : 1 }";
write(journalFD, header, strlen(header) + 1);
uint64_t offlen[2] = { 20, 20 };
write(journalFD, offlen, 16);
for (i = 0; i < 5; i++)
assert(write(journalFD, &i, 4) == 4);
// the merged version should look like
// (ints) 0 1 2 3 4 0 1 2 3 4 10 11 12 13...
IOCoordinator *ioc = IOCoordinator::get();
boost::shared_array<uint8_t> data = ioc->mergeJournal("test-object", "test-journal");
assert(data);
int *idata = (int *) data.get();
for (i = 0; i < 5; i++)
assert(idata[i] == i);
for (; i < 10; i++)
assert(idata[i] == i-5);
for (; i < 2048; i++)
assert(idata[i] == i);
// try different range parameters
// read at the beginning of the change
data = ioc->mergeJournal("test-object", "test-journal", 20, 40);
assert(data);
idata = (int *) data.get();
for (i = 0; i < 5; i++)
assert(idata[i] == i);
for (; i < 10; i++)
assert(idata[i] == i+5);
// read s.t. beginning of the change is in the middle of the range
data = ioc->mergeJournal("test-object", "test-journal", 8, 24);
assert(data);
idata = (int *) data.get();
for (i = 0; i < 3; i++)
assert(idata[i] == i + 2);
for (; i < 6; i++)
assert(idata[i] == i - 3);
// read s.t. end of the change is in the middle of the range
data = ioc->mergeJournal("test-object", "test-journal", 28, 20);
assert(data);
idata = (int *) data.get();
for (i = 0; i < 3; i++)
assert(idata[i] == i + 2);
for (; i < 3; i++)
assert(idata[i] == i + 7);
// cleanup
bf::remove("test-object");
bf::remove("test-journal");
cout << "mergeJournalTest OK" << endl;
}
int main()
@@ -539,6 +615,7 @@ int main()
localstorageTest1();
cacheTest1();
mergeJournalTest();
return 0;
}