You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-12-15 12:09:09 +03:00
Checkpointing mods for IOC::read(). Doesn't build yet.
This commit is contained in:
@@ -31,6 +31,7 @@ set(storagemanager_SRCS
|
||||
src/RWLock.cpp
|
||||
src/MetadataFile.cpp
|
||||
src/Replicator.cpp
|
||||
src/Utilities.cpp
|
||||
)
|
||||
|
||||
option(TRACE "Enable some tracing output" OFF)
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
|
||||
#include "IOCoordinator.h"
|
||||
#include "Cache.h"
|
||||
#include "MetadataFile.h"
|
||||
#include "SMLogging.h"
|
||||
#include "Utilities.h"
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <fcntl.h>
|
||||
@@ -24,12 +23,15 @@ namespace
|
||||
boost::mutex m;
|
||||
}
|
||||
|
||||
namespace bf = boost::filesystem;
|
||||
|
||||
namespace storagemanager
|
||||
{
|
||||
|
||||
IOCoordinator::IOCoordinator()
|
||||
{
|
||||
config = Config::get();
|
||||
cache = Cache::get();
|
||||
logger = SMLogging::get();
|
||||
replicator = Replicator::get();
|
||||
objectSize = 5 * (1<<20);
|
||||
@@ -42,6 +44,8 @@ IOCoordinator::IOCoordinator()
|
||||
cerr << "ObjectStorage/object_size must be set to a numeric value" << endl;
|
||||
throw;
|
||||
}
|
||||
cachePath = cache->getCachePath();
|
||||
journalPath = cache->getJournalPath();
|
||||
}
|
||||
|
||||
IOCoordinator::~IOCoordinator()
|
||||
@@ -62,26 +66,149 @@ IOCoordinator * IOCoordinator::get()
|
||||
void IOCoordinator::willRead(const char *, off_t, size_t)
|
||||
{
|
||||
// no cache yet
|
||||
// not sure we will implement this.
|
||||
}
|
||||
|
||||
struct scoped_closer {
|
||||
scoped_closer(int f) : fd(f) { }
|
||||
~scoped_closer() {
|
||||
int s_errno = errno;
|
||||
::close(fd);
|
||||
errno = s_errno;
|
||||
}
|
||||
int fd;
|
||||
};
|
||||
|
||||
#define OPEN(name, mode) \
|
||||
fd = ::open(filename, mode, 0660); \
|
||||
if (fd < 0) \
|
||||
return fd; \
|
||||
scoped_closer sc(fd);
|
||||
ScopedCloser sc(fd);
|
||||
|
||||
int IOCoordinator::loadObject(int fd, uint8_t *data, off_t offset, size_t length)
|
||||
{
|
||||
size_t count = 0;
|
||||
int err;
|
||||
|
||||
::lseek(fd, offset, SEEK_SET);
|
||||
while (count < length)
|
||||
{
|
||||
err = ::read(fd, &data[count], length - count);
|
||||
if (err < 0)
|
||||
return err;
|
||||
else if (err == 0)
|
||||
{
|
||||
errno = ENODATA; // better errno for early EOF?
|
||||
return -1;
|
||||
}
|
||||
count += err;
|
||||
}
|
||||
}
|
||||
|
||||
int IOCoordinator::loadObjectWithJournal(const char *objFilename, const char *journalFilename,
|
||||
uint8_t *data, off_t offset, size_t length)
|
||||
{
|
||||
boost::shared_array<uint8_t> argh;
|
||||
|
||||
argh = mergeJournal(objFilename, journalFilename, offset, &length);
|
||||
if (!argh)
|
||||
return -1;
|
||||
else
|
||||
memcpy(data, argh.get(), length);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int IOCoordinator::read(const char *filename, uint8_t *data, off_t offset, size_t length)
|
||||
{
|
||||
/*
|
||||
This is a bit complex and verbose, so for the first cut, it won't bother returning
|
||||
a partial result. If an error happens, it will just fail the whole operation.
|
||||
*/
|
||||
|
||||
/*
|
||||
Get the read lock on filename
|
||||
Figure out which objects are relevant to the request
|
||||
call Cache::read(objects)
|
||||
Open any journal files that exist to prevent deletion
|
||||
open all object files to prevent deletion
|
||||
release read lock
|
||||
put together the response in data
|
||||
*/
|
||||
|
||||
ScopedReadLock fileLock(filename);
|
||||
Metadatafile meta(filename);
|
||||
vector<metadataObject> relevants = meta.metadataRead(offset, length);
|
||||
map<string, int> journalFDs, objectFDs;
|
||||
map<string, string> keyToJournalName, keyToObjectName;
|
||||
vector<SharedCloser> fdMinders;
|
||||
char buf[80];
|
||||
|
||||
// load them into the cache
|
||||
vector<string> keys;
|
||||
for (const auto &it : relevants)
|
||||
keys.push_back(it->key);
|
||||
cache->read(keys);
|
||||
|
||||
// open the journal files and objects that exist to prevent them from being
|
||||
// deleted mid-operation
|
||||
for (const auto &key : keys)
|
||||
{
|
||||
// trying not to think about how expensive all these type conversions are.
|
||||
// need to standardize on type. Or be smart and build filenames in a char [].
|
||||
// later. not thinking about it for now.
|
||||
|
||||
// open all of the journal files that exist
|
||||
string filename = (journalPath/(*key + ".journal").string();
|
||||
int fd = ::open(filename.c_str(), O_RDONLY);
|
||||
if (fd >= 0)
|
||||
{
|
||||
keyToJournalName[*key] = filename;
|
||||
journalFDs[filename] = fd;
|
||||
fdMinders.push_back(SharedCloser(fd));
|
||||
}
|
||||
else if (errno != EEXIST)
|
||||
{
|
||||
int l_errno = errno;
|
||||
logger->log(LOG_CRIT, "IOCoordinator::read(): Got an unexpected error opening %s, error was '%s'",
|
||||
filename.c_str(), strerror_r(l_errno, buf, 80));
|
||||
errno = l_errno;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// open all of the objects
|
||||
filename = (cachePath)/(*key)).string();
|
||||
fd = ::open(filename.c_str(), O_RDONLY);
|
||||
if (fd < 0)
|
||||
{
|
||||
int l_errno = errno;
|
||||
logger->log(LOG_CRIT, "IOCoordinator::read(): Got an unexpected error opening %s, error was '%s'",
|
||||
filename.c_str(), strerror_r(l_errno, buf, 80));
|
||||
errno = l_errno;
|
||||
return -1;
|
||||
}
|
||||
keyToObjectName[*key] = filename;
|
||||
objectFDs[*key] = fd;
|
||||
fdMinders.push_back(SharedCloser(fd));
|
||||
}
|
||||
fileLock.unlock();
|
||||
|
||||
// copy data from each object + journal into the returned data
|
||||
size_t count = 0;
|
||||
boost::shared_array<uint8_t> mergedData;
|
||||
for (auto &object : relevants)
|
||||
{
|
||||
auto &jit = journalFDs.find(object->key);
|
||||
|
||||
// if this is the first object, the offset to start reading at is offset - object->offset
|
||||
off_t thisOffset = (object == relevants.begin() ? offset - object->offset : 0);
|
||||
|
||||
// if this is the last object, the length of the read is length - count,
|
||||
// otherwise it is the length of the object
|
||||
size_t thisLength = min(object->length, count - length);
|
||||
if (jit == journalFDs.end())
|
||||
err = loadObject(objectFDs.find(object->key), &data[count], thisOffset, thisLength);
|
||||
else
|
||||
err = loadObjectAndJournal(keyToObjectName, keyToJournalName, &data[count], thisOffset, thisLength);
|
||||
if (err)
|
||||
return -1;
|
||||
|
||||
count += thisLength;
|
||||
}
|
||||
|
||||
// all done
|
||||
return length;
|
||||
|
||||
/*
|
||||
int fd, err;
|
||||
|
||||
OPEN(filename, O_RDONLY);
|
||||
@@ -99,6 +226,7 @@ int IOCoordinator::read(const char *filename, uint8_t *data, off_t offset, size_
|
||||
}
|
||||
|
||||
return count;
|
||||
*/
|
||||
}
|
||||
|
||||
int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset, size_t length)
|
||||
@@ -212,9 +340,9 @@ int IOCoordinator::open(const char *filename, int openmode, struct stat *out)
|
||||
|
||||
/* create all subdirs if necessary. We don't care if directories actually get created. */
|
||||
if (openmode & O_CREAT) {
|
||||
boost::filesystem::path p(filename);
|
||||
bf::path p(filename);
|
||||
boost::system::error_code ec;
|
||||
boost::filesystem::create_directories(p.parent_path(), ec);
|
||||
bf::create_directories(p.parent_path(), ec);
|
||||
}
|
||||
OPEN(filename, openmode);
|
||||
return fstat(fd, out);
|
||||
@@ -222,22 +350,22 @@ int IOCoordinator::open(const char *filename, int openmode, struct stat *out)
|
||||
|
||||
int IOCoordinator::listDirectory(const char *filename, vector<string> *listing)
|
||||
{
|
||||
boost::filesystem::path p(filename);
|
||||
bf::path p(filename);
|
||||
|
||||
listing->clear();
|
||||
if (!boost::filesystem::exists(p))
|
||||
if (!bf::exists(p))
|
||||
{
|
||||
errno = ENOENT;
|
||||
return -1;
|
||||
}
|
||||
if (!boost::filesystem::is_directory(p))
|
||||
if (!bf::is_directory(p))
|
||||
{
|
||||
errno = ENOTDIR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
boost::filesystem::directory_iterator it(p), end;
|
||||
for (boost::filesystem::directory_iterator it(p); it != end; it++)
|
||||
bf::directory_iterator it(p), end;
|
||||
for (bf::directory_iterator it(p); it != end; it++)
|
||||
listing->push_back(it->path().filename().string());
|
||||
return 0;
|
||||
}
|
||||
@@ -258,13 +386,13 @@ will no longer make it like the 'unlink' syscall. */
|
||||
int IOCoordinator::unlink(const char *path)
|
||||
{
|
||||
int ret = 0;
|
||||
boost::filesystem::path p(path);
|
||||
bf::path p(path);
|
||||
|
||||
try
|
||||
{
|
||||
boost::filesystem::remove_all(path);
|
||||
bf::remove_all(path);
|
||||
}
|
||||
catch(boost::filesystem::filesystem_error &e)
|
||||
catch(bf::filesystem_error &e)
|
||||
{
|
||||
errno = e.code().value();
|
||||
ret = -1;
|
||||
@@ -279,9 +407,9 @@ int IOCoordinator::copyFile(const char *filename1, const char *filename2)
|
||||
SMLogging* logger = SMLogging::get();
|
||||
int err = 0, l_errno;
|
||||
try {
|
||||
boost::filesystem::copy_file(filename1, filename2);
|
||||
bf::copy_file(filename1, filename2);
|
||||
}
|
||||
catch (boost::filesystem::filesystem_error &e) {
|
||||
catch (bf::filesystem_error &e) {
|
||||
err = -1;
|
||||
l_errno = e.code().value(); // why not.
|
||||
// eh, not going to translate all of boost's errors into our errors for this.
|
||||
@@ -323,6 +451,10 @@ boost::shared_array<char> seekToEndOfHeader1(int fd)
|
||||
throw runtime_error("seekToEndOfHeader1: did not find the end of the header");
|
||||
}
|
||||
|
||||
void IOCoordinator::mergeJournal(int objFD, int journalFD, uint8_t *buf, off_t offset, size_t *len)
|
||||
{
|
||||
throw runtime_error("IOCoordinator::mergeJournal(int, int, etc) is not implemented yet.");
|
||||
}
|
||||
|
||||
boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, const char *journal, off_t offset,
|
||||
size_t *len) const
|
||||
@@ -456,7 +588,9 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array<uint8_t> &objData, size
|
||||
|
||||
if (maxJournalOffset > *len)
|
||||
{
|
||||
objData.reset(new uint8_t[maxJournalOffset]);
|
||||
uint8_t *newbuf = new uint8_t[maxJournalOffset];
|
||||
memcpy(newbuf, objData.get(), *len);
|
||||
objData.reset(newbuf);
|
||||
*len = maxJournalOffset;
|
||||
}
|
||||
|
||||
|
||||
@@ -10,8 +10,10 @@
|
||||
#include <boost/utility.hpp>
|
||||
#include <boost/thread/mutex.hpp>
|
||||
#include <boost/shared_array.hpp>
|
||||
#include <boost/filesystem.hpp>
|
||||
|
||||
#include "Config.h"
|
||||
#include "Cache.h"
|
||||
#include "SMLogging.h"
|
||||
#include "RWLock.h"
|
||||
#include "Replicator.h"
|
||||
@@ -42,7 +44,16 @@ class IOCoordinator : public boost::noncopyable
|
||||
// *len should be set to the length of the data requested (0 means read the whole file),
|
||||
// on return *len will be the actual length returned.
|
||||
boost::shared_array<uint8_t> mergeJournal(const char *objectPath, const char *journalPath, off_t offset, size_t *len) const;
|
||||
int mergeJournalInMem(boost::shared_array<uint8_t> &objData, size_t *len, const char *journalPath);
|
||||
|
||||
// this version modifies object data in memory, given the journal filename
|
||||
int mergeJournalInMem(boost::shared_array<uint8_t> &objData, size_t *len, const char *journalPath) const;
|
||||
|
||||
// this version takes already-open file descriptors, and an already-allocated buffer as input.
|
||||
// file descriptor are positioned, eh, best not to assume anything about their positions
|
||||
// on return.
|
||||
// Not implemented yet. At the point of this writing, we will use the existing versions, even though
|
||||
// it's wasteful, and will leave this as a likely future optimization.
|
||||
int mergeJournal(int objFD, int journalFD, uint8_t *buf, off_t offset, size_t *len) const;
|
||||
|
||||
/* Lock manipulation fcns. They can lock on any param given to them. */
|
||||
void renameObject(const std::string &oldKey, const std::string &newKey);
|
||||
@@ -54,12 +65,19 @@ class IOCoordinator : public boost::noncopyable
|
||||
private:
|
||||
IOCoordinator();
|
||||
Config *config;
|
||||
Cache *cache;
|
||||
SMLogging *logger;
|
||||
Replicator *replicator;
|
||||
size_t objectSize;
|
||||
boost::filesystem::path journalPath;
|
||||
boost::filesystem::path cachePath;
|
||||
|
||||
std::map<std::string, RWLock *> locks;
|
||||
boost::mutex lockMutex; // lol
|
||||
|
||||
int loadObjectWithJournal(const char *objFilename, const char *journalFilename,
|
||||
uint8_t *data, off_t offset, size_t length);
|
||||
int loadObject(int fd, uint8_t *data, off_t offset, size_t length);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -129,6 +129,27 @@ MetadataFile::~MetadataFile()
|
||||
|
||||
vector<metadataObject> MetadataFile::metadataRead(off_t offset, size_t length)
|
||||
{
|
||||
// this version assumes mObjects is sorted by offset, and there are no gaps between objects
|
||||
vector<metadataObject> ret;
|
||||
size_t foundLen = 0;
|
||||
|
||||
auto i = mObjects.begin();
|
||||
// find the first object in range
|
||||
while (i != mObjects.end())
|
||||
if (offset >= i->offset)
|
||||
break;
|
||||
|
||||
// append objects until foundLen >= length or EOF
|
||||
while (i != mObjects.end() && foundLen < length)
|
||||
{
|
||||
ret.push_back(*i);
|
||||
foundLen += i->length;
|
||||
++i;
|
||||
}
|
||||
return ret;
|
||||
|
||||
#if 0
|
||||
// this version assumed mObjects was unsorted
|
||||
vector<metadataObject> returnObjs;
|
||||
uint64_t startData = offset;
|
||||
uint64_t endData = offset + length;
|
||||
@@ -157,6 +178,7 @@ vector<metadataObject> MetadataFile::metadataRead(off_t offset, size_t length)
|
||||
}
|
||||
|
||||
return returnObjs;
|
||||
#endif
|
||||
}
|
||||
|
||||
metadataObject MetadataFile::addMetadataObject(const char *filename, size_t length)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
|
||||
#include "Synchronizer.h"
|
||||
#include "MetadataFile.h"
|
||||
#include "Utilities.h"
|
||||
#include <boost/thread/mutex.hpp>
|
||||
|
||||
#include <sys/stat.h>
|
||||
@@ -13,57 +14,6 @@ namespace
|
||||
{
|
||||
storagemanager::Synchronizer *instance = NULL;
|
||||
boost::mutex inst_mutex;
|
||||
|
||||
// a few utility classes. Maybe move these to a utilities header.
|
||||
struct ScopedReadLock
|
||||
{
|
||||
ScopedReadLock(storagemanager::IOCoordinator *i, const string &k) : ioc(i), key(k)
|
||||
{
|
||||
ioc->readLock(key);
|
||||
}
|
||||
~ScopedReadLock()
|
||||
{
|
||||
ioc->readUnlock(key);
|
||||
}
|
||||
storagemanager::IOCoordinator *ioc;
|
||||
const string key;
|
||||
};
|
||||
|
||||
struct ScopedWriteLock
|
||||
{
|
||||
ScopedWriteLock(storagemanager::IOCoordinator *i, const string &k) : ioc(i), key(k)
|
||||
{
|
||||
ioc->writeLock(key);
|
||||
locked = true;
|
||||
}
|
||||
~ScopedWriteLock()
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
|
||||
void unlock()
|
||||
{
|
||||
if (locked)
|
||||
{
|
||||
ioc->writeUnlock(key);
|
||||
locked = false;
|
||||
}
|
||||
}
|
||||
storagemanager::IOCoordinator *ioc;
|
||||
bool locked;
|
||||
const string key;
|
||||
};
|
||||
|
||||
struct ScopedCloser {
|
||||
ScopedCloser(int f) : fd(f) { }
|
||||
~ScopedCloser() {
|
||||
int s_errno = errno;
|
||||
::close(fd);
|
||||
errno = s_errno;
|
||||
}
|
||||
int fd;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
namespace bf = boost::filesystem;
|
||||
|
||||
106
src/Utilities.cpp
Normal file
106
src/Utilities.cpp
Normal file
@@ -0,0 +1,106 @@
|
||||
#include "Utilities.h"
|
||||
#include "IOCoordinator.h"
|
||||
|
||||
namespace storagemanager
|
||||
{
|
||||
|
||||
|
||||
ScopedReadLock::ScopedReadLock(IOCoordinator *i, const std::string &k) : ioc(i), key(k)
|
||||
{
|
||||
lock();
|
||||
}
|
||||
ScopedReadLock::~ScopedReadLock()
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
|
||||
void ScopedReadLock::lock()
|
||||
{
|
||||
assert(!locked);
|
||||
ioc->readLock(key);
|
||||
locked = true;
|
||||
}
|
||||
|
||||
void ScopedReadLock::unlock()
|
||||
{
|
||||
if (locked)
|
||||
{
|
||||
ioc->readUnlock(key);
|
||||
locked = false;
|
||||
}
|
||||
}
|
||||
|
||||
ScopedWriteLock::ScopedWriteLock(IOCoordinator *i, const std::string &k) : ioc(i), key(k)
|
||||
{
|
||||
lock();
|
||||
}
|
||||
ScopedWriteLock::~ScopedWriteLock()
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
|
||||
void ScopedWriteLock::lock()
|
||||
{
|
||||
assert(!locked);
|
||||
ioc->writeLock(key);
|
||||
locked = true;
|
||||
}
|
||||
|
||||
void ScopedWriteLock::unlock()
|
||||
{
|
||||
if (locked)
|
||||
{
|
||||
ioc->writeUnlock(key);
|
||||
locked = false;
|
||||
}
|
||||
}
|
||||
|
||||
ScopedCloser::ScopedCloser(int f) : fd(f) { }
|
||||
ScopedCloser::~ScopedCloser() {
|
||||
int s_errno = errno;
|
||||
::close(fd);
|
||||
errno = s_errno;
|
||||
}
|
||||
|
||||
|
||||
|
||||
struct SharedCloser
|
||||
{
|
||||
public:
|
||||
SharedCloser(int f);
|
||||
SharedCloser(const SharedCloser &);
|
||||
~SharedCloser();
|
||||
|
||||
private:
|
||||
struct CtrlBlock
|
||||
{
|
||||
int fd;
|
||||
uint refCount;
|
||||
};
|
||||
|
||||
CtrlBlock *block;
|
||||
};
|
||||
|
||||
SharedCloser::SharedCloser(int f)
|
||||
{
|
||||
block = new CtrlBlock();
|
||||
block->fd = f;
|
||||
block->refCount = 1;
|
||||
}
|
||||
|
||||
SharedCloser(const SharedCloser &s) : block(s.block)
|
||||
{
|
||||
block->refCount++;
|
||||
}
|
||||
|
||||
SharedCloser::~SharedCloser()
|
||||
{
|
||||
block->refCount--;
|
||||
if (block->refCount == 0)
|
||||
{
|
||||
::close(block->fd);
|
||||
delete block;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
52
src/Utilities.h
Normal file
52
src/Utilities.h
Normal file
@@ -0,0 +1,52 @@
|
||||
|
||||
#include <std::string>
|
||||
|
||||
|
||||
namespace storagemanager
|
||||
{
|
||||
|
||||
class IOCoordinator;
|
||||
|
||||
// a few utility classes we've coded here and there, now de-duped and centralized
|
||||
struct ScopedReadLock
|
||||
{
|
||||
ScopedReadLock(IOCoordinator *i, const std::string &k);
|
||||
~ScopedReadLock();
|
||||
void lock();
|
||||
void unlock();
|
||||
|
||||
IOCoordinator *ioc;
|
||||
bool locked;
|
||||
const std::string key;
|
||||
};
|
||||
|
||||
struct ScopedWriteLock
|
||||
{
|
||||
ScopedWriteLock(IOCoordinator *i, const std::string &k);
|
||||
~ScopedWriteLock();
|
||||
void lock();
|
||||
void unlock();
|
||||
|
||||
IOCoordinator *ioc;
|
||||
bool locked;
|
||||
const std::string key;
|
||||
};
|
||||
|
||||
class ScopedCloser
|
||||
{
|
||||
public:
|
||||
SharedCloser(int f);
|
||||
SharedCloser(const SharedCloser &);
|
||||
~SharedCloser();
|
||||
|
||||
private:
|
||||
struct CtrlBlock
|
||||
{
|
||||
int fd;
|
||||
uint refCount;
|
||||
};
|
||||
|
||||
CtrlBlock *block;
|
||||
};
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user