You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-12-17 01:02:23 +03:00
231 lines
6.4 KiB
C++
231 lines
6.4 KiB
C++
|
|
#include "Replicator.h"
|
|
#include "IOCoordinator.h"
|
|
#include "SMLogging.h"
|
|
#include "Utilities.h"
|
|
#include "Cache.h"
|
|
#include <sys/types.h>
|
|
#include <sys/stat.h>
|
|
#include <fcntl.h>
|
|
#include <unistd.h>
|
|
#include <errno.h>
|
|
#include <sys/sendfile.h>
|
|
#include <boost/filesystem.hpp>
|
|
#define BOOST_SPIRIT_THREADSAFE
|
|
#include <boost/property_tree/json_parser.hpp>
|
|
#include <boost/shared_array.hpp>
|
|
#include <boost/format.hpp>
|
|
#include <iostream>
|
|
|
|
using namespace std;
|
|
|
|
namespace
|
|
{
|
|
storagemanager::Replicator *rep = NULL;
|
|
boost::mutex m;
|
|
}
|
|
namespace storagemanager
|
|
{
|
|
|
|
Replicator::Replicator()
|
|
{
|
|
mpConfig = Config::get();
|
|
mpLogger = SMLogging::get();
|
|
try
|
|
{
|
|
msJournalPath = mpConfig->getValue("ObjectStorage", "journal_path");
|
|
if (msJournalPath.empty())
|
|
{
|
|
mpLogger->log(LOG_CRIT, "ObjectStorage/journal_path is not set");
|
|
throw runtime_error("Please set ObjectStorage/journal_path in the storagemanager.cnf file");
|
|
}
|
|
}
|
|
catch (...)
|
|
{
|
|
mpLogger->log(LOG_CRIT, "Could not load metadata_path from storagemanger.cnf file.");
|
|
throw runtime_error("Please set ObjectStorage/metadata_path in the storagemanager.cnf file");
|
|
}
|
|
try
|
|
{
|
|
boost::filesystem::create_directories(msJournalPath);
|
|
}
|
|
catch (exception &e)
|
|
{
|
|
syslog(LOG_CRIT, "Failed to create %s, got: %s", msJournalPath.c_str(), e.what());
|
|
throw e;
|
|
}
|
|
msCachePath = mpConfig->getValue("Cache", "path");
|
|
if (msCachePath.empty())
|
|
{
|
|
mpLogger->log(LOG_CRIT, "Cache/path is not set");
|
|
throw runtime_error("Please set Cache/path in the storagemanager.cnf file");
|
|
}
|
|
try
|
|
{
|
|
boost::filesystem::create_directories(msCachePath);
|
|
}
|
|
catch (exception &e)
|
|
{
|
|
mpLogger->log(LOG_CRIT, "Failed to create %s, got: %s", msCachePath.c_str(), e.what());
|
|
throw e;
|
|
}
|
|
}
|
|
|
|
Replicator::~Replicator()
|
|
{
|
|
}
|
|
|
|
Replicator * Replicator::get()
|
|
{
|
|
if (rep)
|
|
return rep;
|
|
boost::mutex::scoped_lock s(m);
|
|
if (rep)
|
|
return rep;
|
|
rep = new Replicator();
|
|
return rep;
|
|
}
|
|
|
|
#define OPEN(name, mode) \
|
|
fd = ::open(name, mode, 0600); \
|
|
if (fd < 0) \
|
|
return fd; \
|
|
ScopedCloser sc(fd);
|
|
|
|
int Replicator::newObject(const char *filename, const uint8_t *data, off_t offset, size_t length )
|
|
{
|
|
int fd, err;
|
|
string objectFilename = msCachePath + "/" + string(filename);
|
|
|
|
OPEN(objectFilename.c_str(), O_WRONLY | O_CREAT);
|
|
size_t count = 0;
|
|
while (count < length) {
|
|
err = ::pwrite(fd, &data[count], length - count, offset);
|
|
if (err <= 0)
|
|
{
|
|
if (count > 0) // return what was successfully written
|
|
return count;
|
|
else
|
|
return err;
|
|
}
|
|
count += err;
|
|
}
|
|
|
|
return count;
|
|
}
|
|
|
|
int Replicator::addJournalEntry(const char *filename, const uint8_t *data, off_t offset, size_t length)
|
|
{
|
|
int fd, err;
|
|
uint64_t offlen[] = {(uint64_t) offset,length};
|
|
size_t count = 0;
|
|
int version = 1;
|
|
string journalFilename = msJournalPath + "/" + string(filename) + ".journal";
|
|
uint64_t thisEntryMaxOffset = (offset + length - 1);
|
|
|
|
bool exists = boost::filesystem::exists(journalFilename);
|
|
OPEN(journalFilename.c_str(), (exists ? O_RDWR : O_WRONLY | O_CREAT))
|
|
|
|
if (!exists)
|
|
{
|
|
// create new journal file with header
|
|
//OPEN(journalFilename.c_str(), O_WRONLY | O_CREAT);
|
|
string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str();
|
|
err = ::write(fd, header.c_str(), header.length() + 1);
|
|
assert((uint) err == header.length() + 1);
|
|
if (err <= 0)
|
|
return err;
|
|
Cache::get()->newJournalEntry(header.length() + 1);
|
|
}
|
|
else
|
|
{
|
|
// read the existing header and check if max_offset needs to be updated
|
|
//OPEN(journalFilename.c_str(), O_RDWR);
|
|
boost::shared_array<char> headertxt = seekToEndOfHeader1(fd);
|
|
stringstream ss;
|
|
ss << headertxt.get();
|
|
boost::property_tree::ptree header;
|
|
boost::property_tree::json_parser::read_json(ss, header);
|
|
assert(header.get<int>("version") == 1);
|
|
uint64_t currentMaxOffset = header.get<uint64_t>("max_offset");
|
|
if (thisEntryMaxOffset > currentMaxOffset)
|
|
{
|
|
string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str();
|
|
err = ::pwrite(fd, header.c_str(), header.length() + 1,0);
|
|
assert((uint) err == header.length() + 1);
|
|
if (err <= 0)
|
|
return err;
|
|
}
|
|
}
|
|
|
|
// XXXPAT: avoid closing and right away opening the file again when it's easy not to.
|
|
// Just reposition the file pointer.
|
|
//OPEN(journalFilename.c_str(), O_WRONLY | O_APPEND);
|
|
::lseek(fd, 0, SEEK_END);
|
|
|
|
err = ::write(fd, offlen, JOURNAL_ENTRY_HEADER_SIZE);
|
|
assert(err == JOURNAL_ENTRY_HEADER_SIZE);
|
|
if (err <= 0)
|
|
return err;
|
|
|
|
while (count < length) {
|
|
err = ::write(fd, &data[count], length - count);
|
|
if (err < 0)
|
|
{
|
|
/* XXXPAT: We can't return anything but success here, unless we also update the entry's
|
|
header */
|
|
if (count > 0) // return what was successfully written
|
|
return count;
|
|
else
|
|
return err;
|
|
}
|
|
count += err;
|
|
}
|
|
|
|
return count;
|
|
}
|
|
|
|
int Replicator::remove(const boost::filesystem::path &filename, Flags flags)
|
|
{
|
|
int ret = 0;
|
|
|
|
if (flags & NO_LOCAL)
|
|
return 0; // not implemented yet
|
|
|
|
try
|
|
{
|
|
//#ifndef NDEBUG
|
|
// assert(boost::filesystem::remove_all(filename) > 0);
|
|
//#else
|
|
boost::filesystem::remove_all(filename);
|
|
//#endif
|
|
}
|
|
catch(boost::filesystem::filesystem_error &e)
|
|
{
|
|
#ifndef NDEBUG
|
|
cout << "Replicator::remove(): caught an execption: " << e.what() << endl;
|
|
assert(0);
|
|
#endif
|
|
errno = e.code().value();
|
|
ret = -1;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
|
|
int Replicator::remove(const char *filename, Flags flags)
|
|
{
|
|
if (flags & NO_LOCAL)
|
|
return 0; // not implemented yet
|
|
|
|
boost::filesystem::path p(filename);
|
|
return remove(p);
|
|
}
|
|
|
|
int Replicator::updateMetadata(const char *filename, MetadataFile &meta)
|
|
{
|
|
return meta.writeMetadata(filename);
|
|
}
|
|
|
|
}
|