1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-17 01:02:23 +03:00

Checkpointing work on CRP & posix tasks.

This commit is contained in:
Patrick LeBlanc
2019-01-28 08:05:22 -06:00
parent b23d507eaa
commit 5d894b9a77
26 changed files with 385 additions and 0 deletions

0
src/AppendTask.cpp Normal file
View File

0
src/AppendTask.h Normal file
View File

View File

@@ -0,0 +1,24 @@
#include "ClientRequestProcessor.h"
namespace storagemanager
{
ClientRequestProcessor::ClientRequestProcessor()
{
}
ClientRequestProcessor::~ClientRequestProcessor()
{
}
ClientRequestProcessor::processRequest(int sock)
{
ProcessTask t(sock);
threadPool.addJob(t);
}
}

View File

@@ -0,0 +1,22 @@
#include "ThreadPool.h"
namespace storagemanager
{
class ClientRequestProcessor : public boost::noncopyable
{
public:
ClientRequestProcessor();
virtual ~ClientRequestProcessor();
void processRequest(int sock);
private:
ThreadPool threadPool;
};
}

View File

0
src/ListDirectoryTask.h Normal file
View File

0
src/OpenTask.cpp Normal file
View File

20
src/OpenTask.h Normal file
View File

@@ -0,0 +1,20 @@
#ifndef OPENTASK_H_
#define OPENTASH_H_
#include "PosixTask.h"
namespace storagemanager
{
class OpenTask : public PosixTask
{
public:
OpenTask(int _sock, uint length) : PosixTask(_sock)
WORKING HERE
}
}
#endif

0
src/PingTask.cpp Normal file
View File

0
src/PingTask.h Normal file
View File

56
src/PosixTask.cpp Normal file
View File

@@ -0,0 +1,56 @@
#include "PosixTask.h"
namespace storagemanager
{
PosixTask::PosixTask(int _sock, uint _length) : sock(_sock), remainingLength(_length)
{
}
PosixTask::~PosixTask()
{
// return the socket
}
void PosixTask::handleError(int errCode)
{
char buf[80];
// TODO: construct and log a message
cout << "PosixTask caught an error reading from a socket: " << strerror_r(errCode, buf, 80) << endl;
}
/* Optimization. Make this read larger chunks into a buffer & supply data from that when possible. */
int PosixTask::read(vector<uint8_t> *buf, uint offset, uint length)
{
if (length > remainingLength)
length = remainingLength;
uint originalSize = buf->size();
buf->resize(originalSize + length);
uint count = 0;
int err;
while (count < length)
{
/* TODO: need a timeout here? */
err = ::read(sock, &(*buf)[count + offset], length - count);
if (err < 0) {
buf->resize(originalSize);
handleError(errno);
return -1;
}
else if (err == 0) {
buf->resize(originalSize);
handleError(0);
return -1;
}
count += err;
remainingLength -= err;
}
return count;
}
}

37
src/PosixTask.h Normal file
View File

@@ -0,0 +1,37 @@
#ifndef POSIX_TASK_H_
#define POSIX_TASK_H_
#include <vector>
class PosixTask
{
public:
PosixTask(int sock, uint length);
virtual ~PosixTask();
virtual void run() = 0;
protected:
int read(std::vector<uint8_t> *buf, uint offset, uint length);
bool write(const std::vector<uint8_t> &buf);
private:
PosixTask();
void handleError();
int sock;
uint remainingLength;
uint8_t buffer[4096];
uint bufferPos;
uint bufferLen;
};
#endif

81
src/ProcessTask.cpp Normal file
View File

@@ -0,0 +1,81 @@
#include "ProcessTask.h"
#include <vector>
using namespace std;
namespace storagemanager
{
ProcessTask::ProcessTask(int _sock, uint _length) : sock(_sock), length(_length)
{
assert(length > 0);
}
ProcessTask::~ProcessTask()
{
}
void ProcessTask::handleError(int saved_errno)
{
// return sock to SessionManager
char buf[80];
cout << "ProcessTask: got an error during a socket read: " << strerror_r(saved_errno, buf, 80) << endl;
}
void ProcessTask::operator()()
{
/*
Read the command from the socket
Create the appropriate PosixTask
Run it
*/
vector<uint8_t> msg;
int err;
uint8_t opcode;
err = ::read(sock, &opcode, 1);
if (err <= 0)
{
handleError(errno);
return;
}
PosixTask *task;
switch(opcode)
{
case OPEN:
task = new OpenTask(sock, length);
break;
case READ:
task = new ReadTask(sock, length);
break;
case WRITE:
task = new WriteTask(sock, length);
break;
case STAT:
task = new StatTask(sock, length);
break;
case UNLINK:
task = new UnlinkTask(sock, length);
break;
case APPEND:
task = new AppendTask(sock, length);
break;
case TRUNCATE:
task = new TruncateTask(sock, length);
break;
case LIST_DIRECTORY:
task = new ListDirectoryTask(sock, length);
break;
case PING:
task = new PingTask(sock, length);
break;
default:
throw runtime_error("ProcessTask: got an unknown opcode");
}
task->run();
}
}

32
src/ProcessTask.h Normal file
View File

@@ -0,0 +1,32 @@
#ifndef PROCESS_TASK_H_
#define PROCESS_TASK_H_
namespace storagemanager
{
class ProcessTask
{
public:
ProcessTask(int sock, uint length); // _sock is the socket to read from
virtual ~ProcessTask();
void operator()();
private:
ProcessTask();
void handleError();
int sock;
uint length;
};
}
#endif

0
src/ReadTask.cpp Normal file
View File

0
src/ReadTask.h Normal file
View File

0
src/StatTask.cpp Normal file
View File

0
src/StatTask.h Normal file
View File

72
src/ThreadPool.cpp Normal file
View File

@@ -0,0 +1,72 @@
#include "ThreadPool.h"
#include <boost/thread/locks.hpp>
using namespace std;
namespace storagemanager
{
ThreadPool::ThreadPool() : maxThreads(1000), die(false), threadsWaiting(0)
{
// Using this ctor effectively limits the # of threads here to the natural limit of the
// context it's used in. In the CRP class for example, the # of active threads would be
// naturally limited by the # of concurrent operations.
}
ThreadPool::ThreadPool(uint num_threads) : maxThreads(num_threads), die(false), threadsWaiting(0)
{
}
ThreadPool::~ThreadPool()
{
boost::mutex::unique_lock s(m);
die = true;
jobs.clear();
jobAvailable.notify_all();
s.unlock();
for (uint i = 0; i < threads.size(); i++)
threads[i]->join();
}
void ThreadPool::addJob(Job &j)
{
boost::mutex::unique_lock s(m);
jobs.push_back(j);
// Start another thread if necessary
if (threadsWaiting == 0 && threads.size() < maxThreads) {
boost::scoped_ptr<boost::thread> thread(new boost::thread(processingLoop));
threads.push_back(thread);
}
else
jobAvailable.notify_one();
}
void ThreadPool::processingLoop()
{
boost::mutex::scoped_lock s(m, boost::defer_lock_t);
while (!die)
{
s.lock();
while (jobs.empty() && !die)
{
threadsWaiting++;
jobAvailable.wait(s);
threadsWaiting--;
}
if (die)
return;
Job job = jobs.front();
jobs.pop_front();
s.unlock();
job();
}
}
}

41
src/ThreadPool.h Normal file
View File

@@ -0,0 +1,41 @@
#ifndef SM_THREADPOOL_H_
#define SM_THREADPOOL_H_
#include <list>
#include <deque>
#include <boost/thread.hpp>
//#include <boost/thread/mutex.hpp>
#include <boost/utility.hpp>
namespace storagemanager
{
typedef boost::function0<void> Job;
class ThreadPool : public boost::noncopyable
{
public:
ThreadPool(); // this ctor uses an 'unbounded' # of threads; relies on the context for a natural max.
ThreadPool(uint num_threads); // this ctor lets caller specify a limit.
virtual ~Threadpool();
// doesn't block
void addJob(Job &j);
private:
void processingLoop(); // the fcn run by each thread
uint maxThreads;
bool die;
int threadsWaiting;
std::vector<boost::scoped_ptr<boost:thread> > threads;
boost::condition jobAvailable;
std::deque<Job> jobs;
boost::mutex m;
};
}
#endif

0
src/TruncateTask.cpp Normal file
View File

0
src/TruncateTask.h Normal file
View File

0
src/UnlinkTask.cpp Normal file
View File

0
src/UnlinkTask.h Normal file
View File

0
src/WriteTask.cpp Normal file
View File

0
src/WriteTask.h Normal file
View File