1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-13 23:02:14 +03:00

Check pointing some changes I made all over the place

working on Synchronizer.  Won't build yet.
This commit is contained in:
Patrick LeBlanc
2019-03-11 16:09:07 -05:00
parent 13ade9e724
commit 9906ff9efb
12 changed files with 267 additions and 11 deletions

View File

@@ -203,6 +203,19 @@ void Cache::newObject(const string &key, size_t size)
currentCacheSize += size;
}
void Cache::newJournalEntry(size_t size)
{
boost::unique_lock<boost::mutex> s(lru_mutex);
makeSpace(size);
currentCacheSize += size;
}
void Cache::deletedJournal(size_t size)
{
boost::unique_lock<boost::mutex> s(lru_mutex);
currentCacheSize -= size;
}
void Cache::deletedObject(const string &key, size_t size)
{
boost::unique_lock<boost::mutex> s(lru_mutex);
@@ -257,7 +270,8 @@ void Cache::makeSpace(size_t size)
currentCacheSize -= statbuf.st_size;
thisMuch -= statbuf.st_size;
sync->flushObject(*it);
boost::filesystem::remove(cachedFile);
// Deleting the files will be done through Synchronizer->Replicator
//boost::filesystem::remove(cachedFile);
LRU_t::iterator toRemove = it++;
lru.erase(toRemove);
m_lru.erase(*toRemove);

View File

@@ -26,7 +26,9 @@ class Cache : public boost::noncopyable
void read(const std::vector<std::string> &keys);
void exists(const std::vector<std::string> &keys, std::vector<bool> *out);
void newObject(const std::string &key, size_t size);
void newJournalEntry(size_t size);
void deletedObject(const std::string &key, size_t size);
void deletedJournal(size_t size);
void setMaxCacheSize(size_t size);
size_t getCurrentCacheSize() const;

View File

@@ -10,11 +10,13 @@ namespace storagemanager
class CloudStorage
{
public:
/* These behave like syscalls. return code -1 means an error, and errno is set */
virtual int getObject(const std::string &sourceKey, const std::string &destFile, size_t *size = NULL) = 0;
virtual int putObject(const std::string &sourceFile, const std::string &destKey) = 0;
virtual void deleteObject(const std::string &key) = 0;
virtual int copyObject(const std::string &sourceKey, const std::string &destKey) = 0;
virtual int exists(const std::string &key, bool *out) = 0;
// this will return a CloudStorage instance of the type specified in StorageManager.cnf
static CloudStorage *get();

View File

@@ -23,7 +23,7 @@ Downloader::Downloader() : maxDownloads(0)
}
if (maxDownloads == 0)
maxDownloads = 20;
workers.reset(new ThreadPool(maxDownloads));
workers.setMaxThreads(maxDownloads);
logger = SMLogging::get();
}
@@ -64,7 +64,7 @@ int Downloader::download(const vector<const string *> &keys, vector<int> *errnos
if (inserted[i])
{
dl->listeners.push_back(&listener);
workers->addJob(dl);
workers.addJob(dl);
}
else
{

View File

@@ -70,7 +70,7 @@ class Downloader
Downloads_t downloads;
boost::mutex download_mutex;
boost::mutex &getDownloadMutex();
boost::scoped_ptr<ThreadPool> workers;
ThreadPool workers;
CloudStorage *storage;
SMLogging *logger;
};

View File

@@ -60,7 +60,7 @@ path operator+(const path &p1, const path &p2)
int LocalStorage::getObject(const string &source, const string &dest, size_t *size)
{
int ret = copy(prefix + source, dest);
int ret = copy(prefix / source, dest);
if (ret)
return ret;
if (size)
@@ -70,19 +70,25 @@ int LocalStorage::getObject(const string &source, const string &dest, size_t *si
int LocalStorage::putObject(const string &source, const string &dest)
{
return copy(source, prefix + dest);
return copy(source, prefix / dest);
}
int LocalStorage::copyObject(const string &source, const string &dest)
{
return copy(prefix + source, prefix + dest);
return copy(prefix / source, prefix / dest);
}
void LocalStorage::deleteObject(const string &key)
{
boost::system::error_code err;
boost::filesystem::remove(prefix + key, err);
boost::filesystem::remove(prefix / key, err);
}
int LocalStorage::exists(const std::string &key, bool *out)
{
*out = boost::filesystem::exists(prefix / key);
return 0;
}
}

View File

@@ -18,6 +18,7 @@ class LocalStorage : public CloudStorage
int putObject(const std::string &sourceFile, const std::string &destKey);
void deleteObject(const std::string &key);
int copyObject(const std::string &sourceKey, const std::string &destKey);
int exists(const std::string &key, bool *out);
const boost::filesystem::path & getPrefix() const;

View File

@@ -33,4 +33,9 @@ int S3Storage::copyObject(const string &sourceKey, const string &destKey)
return 0;
}
int S3Storage::exists(const string &key, bool *out)
{
return 0;
}
}

View File

@@ -18,6 +18,7 @@ class S3Storage : public CloudStorage
int putObject(const std::string &sourceFile, const std::string &destKey);
void deleteObject(const std::string &key);
int copyObject(const std::string &sourceKey, const std::string &destKey);
int exists(const std::string &key, bool *out);
private:

View File

@@ -10,7 +10,6 @@ namespace
boost::mutex inst_mutex;
}
namespace storagemanager
{
@@ -25,16 +24,201 @@ Synchronizer * Synchronizer::get()
return instance;
}
Synchronizer::Synchronizer()
Synchronizer::Synchronizer() : maxUploads(0)
{
Config *config = Config::get();
logger = SMLogging::get();
cache = Cache::get();
replicator = Replicator::get();
ioc = IOCoordinator::get();
string stmp = config->getValue("ObjectStorage", "max_concurrent_uploads")
try
{
maxUploads = stoul(stmp);
}
catch(invalid_argument)
{
logger->log(LOG_WARNING, "Downloader: Invalid arg for ObjectStorage/max_concurrent_uploads, using default of 20");
}
if (maxUploads == 0)
maxUploads = 20;
threadPool.setMaxThreads(maxUploads);
}
Synchronizer::~Synchronizer()
{
/* should this wait until all pending work is done,
or save the list it's working on.....
For milestone 2, this will do the safe thing and finish working first.
Later we can get fancy. */
}
enum OpFlags
{
NOOP = 0,
JOURNAL = 0x1,
DELETE = 0x2,
NEW_OBJECT = 0x4,
IN_PROGRESS = 0x8,
IS_FLUSH = 0x10
};
void Synchronizer::newJournalEntry(const string &key)
{
boost::unique_lock<boost::mutex> s(mutex);
auto it = pendingOps.find(key);
if (it != pendingOps.end())
{
it->second->opFlags |= JOURNAL;
return;
}
workQueue.push_back(key);
pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(JOURNAL, workQueue.end() - 1));
}
void Synchronizer::newObjects(const vector<string> &keys)
{
boost::unique_lock<boost::mutex> s(mutex);
for (string &key : keys)
{
assert(pendingOps.find(key) == pendingOps.end());
workQueue.push_back(key);
pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(NEW_OBJECT, workQueue.end() - 1));
}
}
void Synchronizer::deletedObjects(const vector<string> &keys)
{
boost::unique_lock<boost::mutex> s(mutex);
auto it = pendingOps.find(key);
if (it != pendingOps.end())
{
it->second->opFlags |= DELETE;
return;
}
workQueue.push_back(key);
pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(DELETE, workQueue.end() - 1));
}
void Synchronizer::flushObject(const string &key)
{
/* move the work queue entry for key to the front of the queue / create if not exists
mark the pending ops as a flush
wait for the op to finish
*/
boost::unique_lock<boost::mutex> s(mutex);
auto &it = pendingOps.find(key);
if (it != pendingOps.end())
{
workQueue.splice(workQueue.begin(), workQueue, it->second.queueEntry);
it->second->opFlags |= IS_FLUSH;
it->second->wait();
}
else
{
workQueue.push_front(key);
pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(IS_FLUSH, workQueue.begin()));
pendingOps[key]->wait();
}
}
void Synchronizer::process(const string &key)
{
boost::unique_lock<boost::mutex> s(mutex);
auto it = pendingOps.find(key);
assert(it != pendingOps.end());
boost::shared_ptr<PendingOps> pending = it->second;
pendingOps.erase(it);
s.unlock();
if (pending->opFlags & DELETE)
synchronizeDelete(key);
else if (pending->opFlags & JOURNAL)
synchronizerWithJournal(key, pending->opFlags & IS_FLUSH);
else if (pending->opFlags & NEW_OBJECT)
synchronize(key, pending->opFlags & IS_FLUSH);
else
// complain
;
if (pending->opFlags & IS_FLUSH)
pending->notify();
}
struct ScopedReadLock
{
ScopedReadLock(IOCoordinator *i, string &key)
{
ioc = i;
ioc->getReadLock(key);
}
~ScopedReadLock()
{
ioc->releaseReadLock(key);
}
IOCoordinator *ioc;
};
struct ScopedWriteLock
{
ScopedWriteLock(IOCoordinator *i, string &key)
{
ioc = i;
ioc->getWriteLock(key);
}
~ScopedReadLock()
{
ioc->releaseWriteLock(key);
}
IOCoordinator *ioc;
};
void Synchronizer::synchronize(const std::string &key, bool isFlush)
{
ScopedReadLock s(ioc, key);
bool exists = false;
cs->exists(key, &exists);
if (!exists)
{
cs->putObject(cache->getCachePath() / key, key);
replicator->delete(key, Replicator::NO_LOCAL);
}
if (isFlush)
replicator->delete(key, Replicator::LOCAL_ONLY);
}
void Synchronizer::synchronizeWithJournal(const string &key, bool isFlush)
{
/* The helper objects & fcns */
Synchronizer::PendingOps(int flags, list<string>::iterator pos) : opFlags(flags), finished(false), queueEntry(pos)
{
}
Synchronizer::PendingOps::notify()
{
boost::unique_lock<boost::mutex> s(mutex);
finished = true;
condvar.notify_all();
}
Synchronizer::PendingOps::wait()
{
while (!finished)
condvar.wait(mutex);
}
}

View File

@@ -3,8 +3,16 @@
#define SYNCHRONIZER_H_
#include <string>
#include <map>
#include <deque>
#include <boost/utility.hpp>
#include "SMLogging.h"
#include "Cache.h"
#include "Replicator.h"
#include "IOCoordinator.h"
#include "ThreadPool.h"
namespace storagemanager
{
@@ -14,10 +22,41 @@ class Synchronizer : public boost::noncopyable
static Synchronizer *get();
virtual ~Synchronizer();
void newJournalEntry(const std::string &key);
void newObjects(const std::vector<std::string> &keys);
void deletedObjects(const std::vector<std::string> &keys);
void flushObject(const std::string &key);
private:
Synchronizer();
struct FlushListener
{
FlushListener(boost::mutex *m, boost::condvar *c);
boost::mutex *mutex;
boost::condition *condvar;
void flushed();
}
struct PendingOps
{
PendingOps(int flags, std::list<std::string>::iterator pos);
int opFlags;
bool finished;
std::list<std::string>::iterator queueEntry;
boost::condition condvar;
void wait();
void notify();
};
ThreadPool threadPool;
std::map<std::string, boost::shared_ptr<PendingOps> > pendingOps;
std::list<std::string> workQueue;
SMLogging *logger;
Cache *cache;
Replicator *replicator;
IOCoordinator *ioc;
boost::mutex mutex;
};
}

View File

@@ -3,6 +3,8 @@ service = S3
object_size = 5M
metadata_path = ${HOME}/storagemanager/metadata
max_concurrent_downloads = 20
max_concurrent_uploads = 20
[S3]
region = us-west-1