You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-12-13 23:02:14 +03:00
Merge branch 'develop' into synchronizer
This commit is contained in:
@@ -29,6 +29,8 @@ set(storagemanager_SRCS
|
||||
src/Downloader.cpp
|
||||
src/Synchronizer.cpp
|
||||
src/RWLock.cpp
|
||||
src/MetadataFile.cpp
|
||||
src/Replicator.cpp
|
||||
)
|
||||
|
||||
option(TRACE "Enable some tracing output" OFF)
|
||||
|
||||
110
src/IOCoordinator.cpp
Normal file → Executable file
110
src/IOCoordinator.cpp
Normal file → Executable file
@@ -1,5 +1,7 @@
|
||||
|
||||
#include "IOCoordinator.h"
|
||||
#include "Cache.h"
|
||||
#include "MetadataFile.h"
|
||||
#include "SMLogging.h"
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
@@ -8,8 +10,6 @@
|
||||
#include <errno.h>
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <boost/property_tree/json_parser.hpp>
|
||||
#include <boost/uuid/uuid.hpp>
|
||||
#include <boost/uuid/uuid_io.hpp>
|
||||
#include <iostream>
|
||||
|
||||
#define max(x, y) (x > y ? x : y)
|
||||
@@ -30,6 +30,7 @@ IOCoordinator::IOCoordinator()
|
||||
{
|
||||
config = Config::get();
|
||||
logger = SMLogging::get();
|
||||
replicator = Replicator::get();
|
||||
objectSize = 5 * (1<<20);
|
||||
try
|
||||
{
|
||||
@@ -101,21 +102,77 @@ int IOCoordinator::read(const char *filename, uint8_t *data, off_t offset, size_
|
||||
|
||||
int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset, size_t length)
|
||||
{
|
||||
int fd, err;
|
||||
|
||||
OPEN(filename, O_WRONLY);
|
||||
size_t count = 0;
|
||||
::lseek(fd, offset, SEEK_SET);
|
||||
while (count < length) {
|
||||
err = ::write(fd, &data[count], length - count);
|
||||
if (err <= 0)
|
||||
if (count > 0) // return what was successfully written
|
||||
return count;
|
||||
int err = 0;
|
||||
uint64_t count = 0;
|
||||
uint64_t writelength = 0;
|
||||
uint64_t dataRemaining = length;
|
||||
uint64_t journalOffset = 0;
|
||||
bool updateMeta = false;
|
||||
vector<metadataObject> objects;
|
||||
|
||||
//writeLock(filename);
|
||||
|
||||
MetadataFile metadata = MetadataFile(filename);
|
||||
|
||||
//read metadata determine how many objects overlap
|
||||
objects = metadata.metadataRead(offset,length);
|
||||
|
||||
// if there are objects append the journalfile in replicator
|
||||
if(!objects.empty())
|
||||
{
|
||||
for (std::vector<metadataObject>::const_iterator i = objects.begin(); i != objects.end(); ++i)
|
||||
{
|
||||
// figure out how much data to write to this object
|
||||
if (count == 0 && offset > i->offset)
|
||||
{
|
||||
// first object in the list so start at offset and
|
||||
// write to end of oject or all the data
|
||||
journalOffset = offset - i->offset;
|
||||
writelength = min((objectSize - journalOffset),dataRemaining);
|
||||
}
|
||||
else
|
||||
return err;
|
||||
count += err;
|
||||
{
|
||||
// starting at beginning of next object write the rest of data
|
||||
// or until object length is reached
|
||||
writelength = min(objectSize,dataRemaining);
|
||||
journalOffset = 0;
|
||||
}
|
||||
err = replicator->addJournalEntry(i->name.c_str(),&data[count],journalOffset,writelength);
|
||||
if (err <= 0)
|
||||
{
|
||||
//log error and abort
|
||||
}
|
||||
count += err;
|
||||
dataRemaining -= err;
|
||||
}
|
||||
//cache.makeSpace(journal_data_size)
|
||||
//Synchronizer::newJournalData(journal_file);
|
||||
}
|
||||
|
||||
// there is no overlapping data, or data goes beyond end of last object
|
||||
while (dataRemaining > 0 && err >= 0)
|
||||
{
|
||||
//add a new metaDataObject
|
||||
writelength = min(objectSize,dataRemaining);
|
||||
//cache.makeSpace(size)
|
||||
// add a new metadata object, this will get a new objectKey
|
||||
metadataObject newObject = metadata.addMetadataObject(filename,writelength);
|
||||
// write the new object
|
||||
err = replicator->newObject(newObject.name.c_str(),data,writelength);
|
||||
if (err <= 0)
|
||||
{
|
||||
//log error and abort
|
||||
}
|
||||
// sync
|
||||
//Synchronizer::newObject(newname)
|
||||
count += err;
|
||||
dataRemaining -= err;
|
||||
}
|
||||
|
||||
metadata.updateMetadata(filename);
|
||||
|
||||
//writeUnlock(filename);
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
@@ -264,11 +321,11 @@ boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, con
|
||||
|
||||
objFD = ::open(object, O_RDONLY);
|
||||
if (objFD < 0)
|
||||
return NULL;
|
||||
return ret;
|
||||
scoped_closer s1(objFD);
|
||||
journalFD = ::open(journal, O_RDONLY);
|
||||
if (journalFD < 0)
|
||||
return NULL;
|
||||
return ret;
|
||||
scoped_closer s2(journalFD);
|
||||
|
||||
// grab the journal header, make sure the version is 1, and get the max offset
|
||||
@@ -430,27 +487,6 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array<uint8_t> &objData, size
|
||||
return 0;
|
||||
}
|
||||
|
||||
string IOCoordinator::getNewKeyFromOldKey(const string &oldKey)
|
||||
{
|
||||
boost::uuids::uuid u;
|
||||
string ret(oldKey);
|
||||
strcpy(&ret[0], boost::uuids::to_string(u).c_str());
|
||||
return ret;
|
||||
}
|
||||
|
||||
string IOCoordinator::getNewKey(string sourceName, size_t offset, size_t length)
|
||||
{
|
||||
boost::uuids::uuid u;
|
||||
stringstream ss;
|
||||
|
||||
for (int i = 0; i < sourceName.length(); i++)
|
||||
if (sourceName[i] == '/')
|
||||
sourceName[i] = '-';
|
||||
|
||||
ss << u << "_" << offset << "_" << length << "_" << sourceName;
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
bool IOCoordinator::readLock(const string &filename)
|
||||
{
|
||||
boost::unique_lock<boost::mutex> s(lockMutex);
|
||||
|
||||
@@ -14,10 +14,13 @@
|
||||
#include "Config.h"
|
||||
#include "SMLogging.h"
|
||||
#include "RWLock.h"
|
||||
#include "Replicator.h"
|
||||
|
||||
namespace storagemanager
|
||||
{
|
||||
|
||||
boost::shared_array<char> seekToEndOfHeader1(int fd);
|
||||
|
||||
class IOCoordinator : public boost::noncopyable
|
||||
{
|
||||
public:
|
||||
@@ -35,10 +38,6 @@ class IOCoordinator : public boost::noncopyable
|
||||
int unlink(const char *path);
|
||||
int copyFile(const char *filename1, const char *filename2);
|
||||
|
||||
// TBD: this may have to go; there may be no use case where only the uuid needs to change.
|
||||
std::string getNewKeyFromOldKey(const std::string &oldKey);
|
||||
std::string getNewKey(std::string sourceName, size_t offset, size_t length);
|
||||
|
||||
// The shared logic for merging a journal file with its base file.
|
||||
// *len should be set to the length of the data requested (0 means read the whole file),
|
||||
// on return *len will be the actual length returned.
|
||||
@@ -50,11 +49,12 @@ class IOCoordinator : public boost::noncopyable
|
||||
bool writeLock(const std::string &filename);
|
||||
void readUnlock(const std::string &filename);
|
||||
void writeUnlock(const std::string &filename);
|
||||
|
||||
|
||||
private:
|
||||
IOCoordinator();
|
||||
Config *config;
|
||||
SMLogging *logger;
|
||||
Replicator *replicator;
|
||||
size_t objectSize;
|
||||
|
||||
std::map<std::string, RWLock *> locks;
|
||||
|
||||
203
src/MetadataFile.cpp
Executable file
203
src/MetadataFile.cpp
Executable file
@@ -0,0 +1,203 @@
|
||||
/*
|
||||
* MetadataFile.cpp
|
||||
*/
|
||||
#include "MetadataFile.h"
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <boost/property_tree/ptree.hpp>
|
||||
#include <boost/property_tree/json_parser.hpp>
|
||||
#include <boost/foreach.hpp>
|
||||
#include <boost/uuid/uuid.hpp>
|
||||
#include <boost/uuid/uuid_io.hpp>
|
||||
|
||||
#define max(x, y) (x > y ? x : y)
|
||||
#define min(x, y) (x < y ? x : y)
|
||||
|
||||
namespace storagemanager
|
||||
{
|
||||
|
||||
MetadataFile::MetadataFile()
|
||||
{
|
||||
mpConfig = Config::get();
|
||||
mpLogger = SMLogging::get();
|
||||
mObjectSize = 5 * (1<<20);
|
||||
try
|
||||
{
|
||||
mObjectSize = stoul(mpConfig->getValue("ObjectStorage", "object_size"));
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
cerr << "ObjectStorage/object_size must be set to a numeric value" << endl;
|
||||
throw;
|
||||
}
|
||||
mVersion=1;
|
||||
mRevision=1;
|
||||
}
|
||||
|
||||
|
||||
MetadataFile::MetadataFile(const char* filename)
|
||||
{
|
||||
mpConfig = Config::get();
|
||||
mpLogger = SMLogging::get();
|
||||
mObjectSize = 5 * (1<<20);
|
||||
try
|
||||
{
|
||||
mObjectSize = stoul(mpConfig->getValue("ObjectStorage", "object_size"));
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
cerr << "ObjectStorage/object_size must be set to a numeric value" << endl;
|
||||
throw;
|
||||
}
|
||||
string metadataFilename = string(filename) + ".meta";
|
||||
if (boost::filesystem::exists(metadataFilename))
|
||||
{
|
||||
boost::property_tree::ptree jsontree;
|
||||
boost::property_tree::read_json(metadataFilename, jsontree);
|
||||
metadataObject newObject;
|
||||
mVersion = jsontree.get<int>("version");
|
||||
mRevision = jsontree.get<int>("revision");
|
||||
|
||||
BOOST_FOREACH(const boost::property_tree::ptree::value_type &v, jsontree.get_child("objects"))
|
||||
{
|
||||
metadataObject newObject;
|
||||
newObject.offset = v.second.get<uint64_t>("offset");
|
||||
newObject.length = v.second.get<uint64_t>("length");
|
||||
newObject.name = v.second.get<string>("name");
|
||||
mObjects.push_back(newObject);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
mVersion = 1;
|
||||
mRevision = 1;
|
||||
updateMetadata(filename);
|
||||
}
|
||||
}
|
||||
|
||||
MetadataFile::~MetadataFile()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
vector<metadataObject> MetadataFile::metadataRead(off_t offset, size_t length)
|
||||
{
|
||||
vector<metadataObject> returnObjs;
|
||||
uint64_t startData = offset;
|
||||
uint64_t endData = offset + length;
|
||||
uint64_t dataRemaining = length;
|
||||
bool foundStart = false;
|
||||
for (std::vector<metadataObject>::iterator i = mObjects.begin(); i != mObjects.end(); ++i)
|
||||
{
|
||||
uint64_t startObject = i->offset;
|
||||
uint64_t endObject = i->offset + i->length;
|
||||
uint64_t maxEndObject = i->offset + mObjectSize;
|
||||
// This logic assumes objects are in ascending order of offsets
|
||||
if (startData >= startObject && (startData < endObject || startData < maxEndObject))
|
||||
{
|
||||
returnObjs.push_back(*i);
|
||||
if (startData >= endObject)
|
||||
{
|
||||
// data starts and the end of current object and can atleast partially fit here update length
|
||||
i->length += min((maxEndObject-startData),dataRemaining);
|
||||
}
|
||||
foundStart = true;
|
||||
}
|
||||
else if (endData >= startObject && (endData < endObject || endData < maxEndObject))
|
||||
{
|
||||
// data ends in this object
|
||||
returnObjs.push_back(*i);
|
||||
if (endData >= endObject)
|
||||
{
|
||||
// data end is beyond old length
|
||||
i->length += (endData - endObject);
|
||||
}
|
||||
}
|
||||
else if (endData >= startObject && foundStart)
|
||||
{
|
||||
// data overlaps this object
|
||||
returnObjs.push_back(*i);
|
||||
}
|
||||
}
|
||||
|
||||
return returnObjs;
|
||||
}
|
||||
|
||||
metadataObject MetadataFile::addMetadataObject(const char *filename, size_t length)
|
||||
{
|
||||
metadataObject addObject,lastObject;
|
||||
if (!mObjects.empty())
|
||||
{
|
||||
metadataObject lastObject = mObjects.back();
|
||||
addObject.offset = lastObject.offset + lastObject.length;
|
||||
}
|
||||
else
|
||||
{
|
||||
addObject.offset = 0;
|
||||
}
|
||||
addObject.length = length;
|
||||
string newObjectKey = getNewKey(filename, addObject.offset, addObject.length);
|
||||
addObject.name = string(newObjectKey);
|
||||
mObjects.push_back(addObject);
|
||||
|
||||
return addObject;
|
||||
}
|
||||
|
||||
|
||||
int MetadataFile::updateMetadata(const char *filename)
|
||||
{
|
||||
string metadataFilename = string(filename) + ".meta";
|
||||
boost::property_tree::ptree jsontree;
|
||||
boost::property_tree::ptree objs;
|
||||
jsontree.put("version",mVersion);
|
||||
jsontree.put("revision",mRevision);
|
||||
for (std::vector<metadataObject>::const_iterator i = mObjects.begin(); i != mObjects.end(); ++i)
|
||||
{
|
||||
boost::property_tree::ptree object;
|
||||
object.put("offset",i->offset);
|
||||
object.put("length",i->length);
|
||||
object.put("name",i->name);
|
||||
objs.push_back(std::make_pair("", object));
|
||||
}
|
||||
jsontree.add_child("objects", objs);
|
||||
write_json(metadataFilename, jsontree);
|
||||
}
|
||||
|
||||
string MetadataFile::getNewKeyFromOldKey(const string &oldKey)
|
||||
{
|
||||
boost::uuids::uuid u;
|
||||
string ret(oldKey);
|
||||
strcpy(&ret[0], boost::uuids::to_string(u).c_str());
|
||||
return ret;
|
||||
}
|
||||
|
||||
string MetadataFile::getNewKey(string sourceName, size_t offset, size_t length)
|
||||
{
|
||||
boost::uuids::uuid u;
|
||||
stringstream ss;
|
||||
|
||||
for (int i = 0; i < sourceName.length(); i++)
|
||||
{
|
||||
if (sourceName[i] == '/')
|
||||
{
|
||||
sourceName[i] = '-';
|
||||
}
|
||||
}
|
||||
|
||||
ss << u << "_" << offset << "_" << length << "_" << sourceName;
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
void MetadataFile::printObjects()
|
||||
{
|
||||
printf("Version: %i Revision: %i\n",mVersion,mRevision);
|
||||
for (std::vector<metadataObject>::const_iterator i = mObjects.begin(); i != mObjects.end(); ++i)
|
||||
{
|
||||
printf("Name: %s Length: %lu Offset: %lu\n",i->name.c_str(),i->length,i->offset);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
58
src/MetadataFile.h
Executable file
58
src/MetadataFile.h
Executable file
@@ -0,0 +1,58 @@
|
||||
/*
|
||||
* MetadataFile.h
|
||||
*/
|
||||
#ifndef METADATAFILE_H_
|
||||
#define METADATAFILE_H_
|
||||
|
||||
#include "Config.h"
|
||||
#include "SMLogging.h"
|
||||
#include <string>
|
||||
#include <sys/types.h>
|
||||
#include <stdint.h>
|
||||
#include <vector>
|
||||
#include <iostream>
|
||||
#include <set>
|
||||
|
||||
using namespace std;
|
||||
|
||||
namespace storagemanager
|
||||
{
|
||||
|
||||
struct metadataObject {
|
||||
uint64_t offset;
|
||||
uint64_t length;
|
||||
string name;
|
||||
bool operator < (const metadataObject &b) const { return offset < b.offset; }
|
||||
};
|
||||
|
||||
class MetadataFile
|
||||
{
|
||||
public:
|
||||
MetadataFile();
|
||||
MetadataFile(const char* filename);
|
||||
~MetadataFile();
|
||||
|
||||
void printObjects();
|
||||
// returns the objects needed to update
|
||||
vector<metadataObject> metadataRead(off_t offset, size_t length);
|
||||
// updates the metadatafile with new object
|
||||
int updateMetadata(const char *filename);
|
||||
metadataObject addMetadataObject(const char *filename, size_t length);
|
||||
|
||||
// TBD: this may have to go; there may be no use case where only the uuid needs to change.
|
||||
std::string getNewKeyFromOldKey(const std::string &oldKey);
|
||||
std::string getNewKey(std::string sourceName, size_t offset, size_t length);
|
||||
|
||||
private:
|
||||
Config *mpConfig;
|
||||
SMLogging *mpLogger;
|
||||
int mVersion;
|
||||
int mRevision;
|
||||
size_t mObjectSize;
|
||||
//set<metadataObject> mObjects;
|
||||
vector<metadataObject> mObjects;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif /* METADATAFILE_H_ */
|
||||
154
src/Replicator.cpp
Executable file
154
src/Replicator.cpp
Executable file
@@ -0,0 +1,154 @@
|
||||
|
||||
#include "Replicator.h"
|
||||
#include "IOCoordinator.h"
|
||||
#include "SMLogging.h"
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <fcntl.h>
|
||||
#include <unistd.h>
|
||||
#include <errno.h>
|
||||
#include <sys/sendfile.h>
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <boost/property_tree/json_parser.hpp>
|
||||
#include <boost/shared_array.hpp>
|
||||
#include <boost/format.hpp>
|
||||
#include <iostream>
|
||||
|
||||
using namespace std;
|
||||
|
||||
namespace
|
||||
{
|
||||
storagemanager::Replicator *rep = NULL;
|
||||
boost::mutex m;
|
||||
}
|
||||
namespace storagemanager
|
||||
{
|
||||
|
||||
Replicator::Replicator()
|
||||
{
|
||||
}
|
||||
|
||||
Replicator::~Replicator()
|
||||
{
|
||||
}
|
||||
|
||||
Replicator * Replicator::get()
|
||||
{
|
||||
if (rep)
|
||||
return rep;
|
||||
boost::mutex::scoped_lock s(m);
|
||||
if (rep)
|
||||
return rep;
|
||||
rep = new Replicator();
|
||||
return rep;
|
||||
}
|
||||
|
||||
struct scoped_closer {
|
||||
scoped_closer(int f) : fd(f) { }
|
||||
~scoped_closer() {
|
||||
int s_errno = errno;
|
||||
::close(fd);
|
||||
errno = s_errno;
|
||||
}
|
||||
int fd;
|
||||
};
|
||||
|
||||
#define OPEN(name, mode) \
|
||||
fd = ::open(name, mode, 0600); \
|
||||
if (fd < 0) \
|
||||
return fd; \
|
||||
scoped_closer sc(fd);
|
||||
|
||||
int Replicator::newObject(const char *filename, const uint8_t *data, size_t length )
|
||||
{
|
||||
int fd, err;
|
||||
|
||||
OPEN(filename, O_WRONLY | O_CREAT);
|
||||
size_t count = 0;
|
||||
while (count < length) {
|
||||
err = ::write(fd, &data[count], length - count);
|
||||
if (err <= 0)
|
||||
if (count > 0) // return what was successfully written
|
||||
return count;
|
||||
else
|
||||
return err;
|
||||
count += err;
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
int Replicator::addJournalEntry(const char *filename, const uint8_t *data, off_t offset, size_t length)
|
||||
{
|
||||
int fd, err;
|
||||
uint64_t offlen[] = {offset,length};
|
||||
size_t count = 0;
|
||||
int version = 1;
|
||||
string journalFilename = string(filename) + ".journal";
|
||||
uint64_t thisEntryMaxOffset = (offset + length - 1);
|
||||
if (!boost::filesystem::exists(journalFilename))
|
||||
{
|
||||
// create new journal file with header
|
||||
OPEN(journalFilename.c_str(), O_WRONLY | O_CREAT);
|
||||
string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str();
|
||||
err = ::write(fd, header.c_str(), header.length() + 1);
|
||||
if (err <= 0)
|
||||
return err;
|
||||
}
|
||||
else
|
||||
{
|
||||
// read the existing header and check if max_offset needs to be updated
|
||||
OPEN(journalFilename.c_str(), O_RDWR);
|
||||
boost::shared_array<char> headertxt = seekToEndOfHeader1(fd);
|
||||
stringstream ss;
|
||||
ss << headertxt.get();
|
||||
boost::property_tree::ptree header;
|
||||
boost::property_tree::json_parser::read_json(ss, header);
|
||||
assert(header.get<int>("version") == 1);
|
||||
uint64_t currentMaxOffset = header.get<uint64_t>("max_offset");
|
||||
if (thisEntryMaxOffset > currentMaxOffset)
|
||||
{
|
||||
string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str();
|
||||
err = ::pwrite(fd, header.c_str(), header.length() + 1,0);
|
||||
if (err <= 0)
|
||||
return err;
|
||||
}
|
||||
}
|
||||
|
||||
OPEN(journalFilename.c_str(), O_WRONLY | O_APPEND);
|
||||
|
||||
err = ::write(fd, offlen, 16);
|
||||
if (err <= 0)
|
||||
return err;
|
||||
|
||||
while (count < length) {
|
||||
err = ::write(fd, &data[count], length - count);
|
||||
if (err <= 0)
|
||||
if (count > 0) // return what was successfully written
|
||||
return count;
|
||||
else
|
||||
return err;
|
||||
count += err;
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
int Replicator::remove(const char *filename, uint8_t flags)
|
||||
{
|
||||
int ret = 0;
|
||||
boost::filesystem::path p(filename);
|
||||
|
||||
try
|
||||
{
|
||||
boost::filesystem::remove_all(filename);
|
||||
}
|
||||
catch(boost::filesystem::filesystem_error &e)
|
||||
{
|
||||
errno = e.code().value();
|
||||
ret = -1;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
}
|
||||
33
src/Replicator.h
Executable file
33
src/Replicator.h
Executable file
@@ -0,0 +1,33 @@
|
||||
#ifndef REPLICATOR_H_
|
||||
#define REPLICATOR_H_
|
||||
|
||||
//#include "ThreadPool.h"
|
||||
#include <sys/types.h>
|
||||
#include <stdint.h>
|
||||
|
||||
namespace storagemanager
|
||||
{
|
||||
|
||||
// 64-bit offset
|
||||
// 64-bit length
|
||||
// <length-bytes of data to write at specified offset>
|
||||
|
||||
class Replicator
|
||||
{
|
||||
public:
|
||||
static Replicator *get();
|
||||
virtual ~Replicator();
|
||||
|
||||
int addJournalEntry(const char *filename, const uint8_t *data, off_t offset, size_t length);
|
||||
int newObject(const char *filename, const uint8_t *data, size_t length);
|
||||
int remove(const char *key ,uint8_t flags);
|
||||
|
||||
|
||||
private:
|
||||
Replicator();
|
||||
//ThreadPool threadPool;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
184
src/unit_tests.cpp
Normal file → Executable file
184
src/unit_tests.cpp
Normal file → Executable file
@@ -11,6 +11,8 @@
|
||||
#include "Config.h"
|
||||
#include "Cache.h"
|
||||
#include "LocalStorage.h"
|
||||
#include "MetadataFile.h"
|
||||
#include "Replicator.h"
|
||||
|
||||
#include <iostream>
|
||||
#include <stdlib.h>
|
||||
@@ -22,6 +24,9 @@
|
||||
#include <fcntl.h>
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <boost/thread.hpp>
|
||||
#include <boost/format.hpp>
|
||||
#include <algorithm>
|
||||
#include <random>
|
||||
|
||||
|
||||
#undef NDEBUG
|
||||
@@ -135,6 +140,59 @@ bool opentask()
|
||||
return true;
|
||||
}
|
||||
|
||||
bool replicatorTest()
|
||||
{
|
||||
Replicator *repli = Replicator::get();
|
||||
int err,fd;
|
||||
const char *newobject = "newobjectTest";
|
||||
const char *newobjectJournal = "newobjectTest.journal";
|
||||
uint8_t buf[1024];
|
||||
uint8_t data[1024];
|
||||
int version = 1;
|
||||
uint64_t max_offset = 0;
|
||||
memcpy(data,"1234567890",10);
|
||||
string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % max_offset).str();
|
||||
::pwrite(fd, header.c_str(), header.length() + 1,0);
|
||||
|
||||
// test newObject
|
||||
repli->newObject(newobject,data,10);
|
||||
|
||||
//check file contents
|
||||
fd = ::open(newobject, O_RDONLY);
|
||||
err = ::read(fd, buf, 1024);
|
||||
assert(err == 10);
|
||||
buf[10] = 0;
|
||||
assert(!strcmp("1234567890", (const char *) buf));
|
||||
cout << "replicator newObject OK" << endl;
|
||||
::close(fd);
|
||||
|
||||
// test addJournalEntry
|
||||
repli->addJournalEntry(newobject,data,0,10);
|
||||
|
||||
fd = ::open(newobjectJournal, O_RDONLY);
|
||||
err = ::read(fd, buf, 1024);
|
||||
assert(err == (header.length() + 1 + 16 + 10));
|
||||
buf[err] = 0;
|
||||
assert(!strcmp("1234567890", (const char *) buf + header.length() + 1 + 16));
|
||||
cout << "replicator addJournalEntry OK" << endl;
|
||||
::close(fd);
|
||||
|
||||
repli->remove(newobject,0);
|
||||
repli->remove(newobjectJournal,0);
|
||||
assert(!boost::filesystem::exists(newobject));
|
||||
cout << "replicator remove OK" << endl;
|
||||
return true;
|
||||
}
|
||||
|
||||
::vector<uint64_t> GenerateData(std::size_t bytes)
|
||||
{
|
||||
assert(bytes % sizeof(uint64_t) == 0);
|
||||
::vector<uint64_t> data(bytes / sizeof(uint64_t));
|
||||
::iota(data.begin(), data.end(), 0);
|
||||
::shuffle(data.begin(), data.end(), std::mt19937{ std::random_device{}() });
|
||||
return data;
|
||||
}
|
||||
|
||||
bool writetask()
|
||||
{
|
||||
// make an empty file to write to
|
||||
@@ -144,41 +202,46 @@ bool writetask()
|
||||
assert(fd > 0);
|
||||
scoped_closer f(fd);
|
||||
|
||||
uint8_t buf[1024];
|
||||
sm_msg_header *hdr = (sm_msg_header *) buf;
|
||||
write_cmd *cmd = (write_cmd *) &hdr[1];
|
||||
uint8_t *data;
|
||||
std::size_t writeSize = (10 * 1024);
|
||||
std::vector<uint64_t> writeData = GenerateData(writeSize);
|
||||
off_t nextOffset = 0;
|
||||
|
||||
for (std::size_t size = writeSize; size <= (5 * writeSize); size += writeSize)
|
||||
{
|
||||
uint8_t buf[(1024 + writeSize)];
|
||||
uint8_t *data;
|
||||
sm_msg_header *hdr = (sm_msg_header *) buf;
|
||||
write_cmd *cmd = (write_cmd *) &hdr[1];
|
||||
cmd->opcode = WRITE;
|
||||
cmd->offset = nextOffset;
|
||||
cmd->count = writeSize;
|
||||
cmd->flen = 10;
|
||||
memcpy(&cmd->filename, filename, cmd->flen);
|
||||
|
||||
data = (uint8_t *) &cmd->filename[cmd->flen];
|
||||
memcpy(data, &writeData, writeSize);
|
||||
|
||||
cmd->opcode = WRITE;
|
||||
cmd->offset = 0;
|
||||
cmd->count = 9;
|
||||
cmd->flen = 10;
|
||||
memcpy(&cmd->filename, filename, cmd->flen);
|
||||
data = (uint8_t *) &cmd->filename[cmd->flen];
|
||||
memcpy(data, "123456789", cmd->count);
|
||||
hdr->type = SM_MSG_START;
|
||||
hdr->payloadLen = sizeof(*cmd) + cmd->flen + cmd->count;
|
||||
|
||||
WriteTask w(clientSock, hdr->payloadLen);
|
||||
::write(sessionSock, cmd, hdr->payloadLen);
|
||||
w.run();
|
||||
|
||||
// verify response
|
||||
int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
|
||||
sm_response *resp = (sm_response *) buf;
|
||||
assert(err == sizeof(*resp));
|
||||
assert(resp->header.type == SM_MSG_START);
|
||||
assert(resp->header.payloadLen == 4);
|
||||
assert(resp->header.flags == 0);
|
||||
assert(resp->returnCode == writeSize);
|
||||
nextOffset += (writeSize);
|
||||
}
|
||||
// This leaves behind object journal and metadata files currently
|
||||
|
||||
hdr->type = SM_MSG_START;
|
||||
hdr->payloadLen = sizeof(*cmd) + cmd->flen + cmd->count;
|
||||
|
||||
WriteTask w(clientSock, hdr->payloadLen);
|
||||
::write(sessionSock, cmd, hdr->payloadLen);
|
||||
w.run();
|
||||
|
||||
// verify response
|
||||
int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
|
||||
sm_response *resp = (sm_response *) buf;
|
||||
assert(err == sizeof(*resp));
|
||||
assert(resp->header.type == SM_MSG_START);
|
||||
assert(resp->header.payloadLen == 4);
|
||||
assert(resp->header.flags == 0);
|
||||
assert(resp->returnCode == 9);
|
||||
|
||||
//check file contents
|
||||
err = ::read(fd, buf, 1024);
|
||||
assert(err == 9);
|
||||
buf[9] = 0;
|
||||
assert(!strcmp("123456789", (const char *) buf));
|
||||
::unlink(filename);
|
||||
MetadataFile mdf("./writetest1");
|
||||
mdf.printObjects();
|
||||
cout << "write task OK" << endl;
|
||||
return true;
|
||||
}
|
||||
@@ -521,34 +584,6 @@ bool cacheTest1()
|
||||
cout << "cache test 1 OK" << endl;
|
||||
}
|
||||
|
||||
// the merged version should look like
|
||||
// (ints) 0 1 2 3 4 0 1 2 3 4 10 11 12 13...
|
||||
void makeTestObject()
|
||||
{
|
||||
int objFD = open("test-object", O_WRONLY | O_CREAT | O_TRUNC, 0600);
|
||||
assert(objFD >= 0);
|
||||
scoped_closer s1(objFD);
|
||||
|
||||
int i;
|
||||
for (i = 0; i < 2048; i++)
|
||||
assert(write(objFD, &i, 4) == 4);
|
||||
}
|
||||
|
||||
void makeTestJournal()
|
||||
{
|
||||
int journalFD = open("test-journal", O_WRONLY | O_CREAT | O_TRUNC, 0600);
|
||||
assert(journalFD >= 0);
|
||||
scoped_closer s2(journalFD);
|
||||
|
||||
char header[] = "{ \"version\" : 1, \"max_offset\" : 39 }";
|
||||
write(journalFD, header, strlen(header) + 1);
|
||||
|
||||
uint64_t offlen[2] = { 20, 20 };
|
||||
write(journalFD, offlen, 16);
|
||||
for (i = 0; i < 5; i++)
|
||||
assert(write(journalFD, &i, 4) == 4);
|
||||
}
|
||||
|
||||
bool mergeJournalTest()
|
||||
{
|
||||
/*
|
||||
@@ -557,8 +592,27 @@ bool mergeJournalTest()
|
||||
verify the expected values
|
||||
*/
|
||||
|
||||
makeTestJournal();
|
||||
makeTestObject();
|
||||
int objFD = open("test-object", O_WRONLY | O_CREAT | O_TRUNC, 0600);
|
||||
assert(objFD >= 0);
|
||||
scoped_closer s1(objFD);
|
||||
int journalFD = open("test-journal", O_WRONLY | O_CREAT | O_TRUNC, 0600);
|
||||
assert(journalFD >= 0);
|
||||
scoped_closer s2(journalFD);
|
||||
|
||||
int i;
|
||||
for (i = 0; i < 2048; i++)
|
||||
assert(write(objFD, &i, 4) == 4);
|
||||
|
||||
char header[] = "{ \"version\" : 1 }";
|
||||
write(journalFD, header, strlen(header) + 1);
|
||||
|
||||
uint64_t offlen[2] = { 20, 20 };
|
||||
write(journalFD, offlen, 16);
|
||||
for (i = 0; i < 5; i++)
|
||||
assert(write(journalFD, &i, 4) == 4);
|
||||
|
||||
// the merged version should look like
|
||||
// (ints) 0 1 2 3 4 0 1 2 3 4 10 11 12 13...
|
||||
|
||||
IOCoordinator *ioc = IOCoordinator::get();
|
||||
boost::shared_array<uint8_t> data = ioc->mergeJournal("test-object", "test-journal");
|
||||
@@ -604,13 +658,11 @@ bool mergeJournalTest()
|
||||
bf::remove("test-journal");
|
||||
cout << "mergeJournalTest OK" << endl;
|
||||
}
|
||||
|
||||
bool syncTest1()
|
||||
{
|
||||
Synchronizer *sync = Synchronizer::get();
|
||||
Cache *cache = Cache::get();
|
||||
CloudStorage *cs = CloudStorage.get();
|
||||
|
||||
bf::path cachePath = sync->getCachePath();
|
||||
bf::path journalPath = sync->getJournalPath();
|
||||
|
||||
@@ -668,6 +720,6 @@ int main()
|
||||
localstorageTest1();
|
||||
cacheTest1();
|
||||
mergeJournalTest();
|
||||
|
||||
replicatorTest();
|
||||
return 0;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user