1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-17 01:02:23 +03:00

Got it to build.

This commit is contained in:
Patrick LeBlanc
2019-01-29 13:14:48 -06:00
parent 2624dc048d
commit 9bad261489
23 changed files with 180 additions and 62 deletions

View File

@@ -1,9 +1,28 @@
include_directories(storage-manager/include)
cmake_minimum_required(VERSION 2.8.12)
set(storagemanager_SRCS main.cpp SessionManager.cpp)
include_directories(include)
set(storagemanager_SRCS
src/AppendTask.cpp
src/ClientRequestProcessor.cpp
src/ListDirectoryTask.cpp
src/OpenTask.cpp
src/PingTask.cpp
src/PosixTask.cpp
src/ProcessTask.cpp
src/ReadTask.cpp
src/StatTask.cpp
src/ThreadPool.cpp
src/TruncateTask.cpp
src/UnlinkTask.cpp
src/WriteTask.cpp
main.cpp
# SessionManager.cpp
)
add_executable(StorageManager ${storagemanager_SRCS})
target_link_libraries(StorageManager boost_system boost_thread)
//install(TARGETS StorageManager DESTINATION ${ENGINE_BINDIR} COMPONENT platform)
#install(TARGETS StorageManager DESTINATION ${ENGINE_BINDIR} COMPONENT platform)

View File

@@ -1,5 +1,6 @@
#include "AppendTask.h"
#include <errno.h>
using namespace std;
@@ -17,10 +18,12 @@ AppendTask::~AppendTask()
#define check_error(msg) \
if (!success) \
{ \
handleError(msg); \
handleError(msg, errno); \
return; \
}
#define min(x, y) (x < y ? x : y)
void AppendTask::run()
{
bool success;
@@ -28,7 +31,7 @@ void AppendTask::run()
success = read(cmdbuf, sizeof(struct cmd_overlay));
check_error("AppendTask read");
cmd_overlay *cmd = cmdbuf;
cmd_overlay *cmd = (cmd_overlay *) cmdbuf;
success = read(&cmdbuf[sizeof(*cmd)], min(cmd->filename_len, 1024 - sizeof(*cmd) - 1));
check_error("AppendTask read");

View File

@@ -8,7 +8,7 @@
namespace storagemanager
{
class AppendTask : PosixTask
class AppendTask : public PosixTask
{
public:
AppendTask(int sock, uint length);

View File

@@ -1,6 +1,8 @@
#include "ClientRequestProcessor.h"
#include "ProcessTask.h"
#include <sys/types.h>
namespace storagemanager
{
@@ -13,9 +15,9 @@ ClientRequestProcessor::~ClientRequestProcessor()
{
}
ClientRequestProcessor::processRequest(int sock)
void ClientRequestProcessor::processRequest(int sock, uint len)
{
ProcessTask t(sock);
ProcessTask t(sock, len);
threadPool.addJob(t);
}

View File

@@ -1,5 +1,9 @@
#ifndef CLIENTREQUESTPROCESSOR_H_
#define CLIENTREQUESTPROCESSOR_H_
#include "ThreadPool.h"
#include <sys/types.h>
namespace storagemanager
@@ -11,12 +15,12 @@ class ClientRequestProcessor : public boost::noncopyable
ClientRequestProcessor();
virtual ~ClientRequestProcessor();
void processRequest(int sock);
void processRequest(int sock, uint len);
private:
ThreadPool threadPool;
};
}
#endif

View File

@@ -1,5 +1,8 @@
#include "ListDirectoryTask.h"
#include "messageFormat.h"
#include <errno.h>
#include <string.h>
using namespace std;
@@ -28,7 +31,9 @@ ListDirectoryTask::~ListDirectoryTask()
return false; \
}
bool ListDirectoryTask::writeString(uint8_t buf, int *offset, int size, const string &str)
#define min(x, y) (x < y ? x : y)
bool ListDirectoryTask::writeString(uint8_t *buf, int *offset, int size, const string &str)
{
bool success;
if (size - *offset < 4) // eh, let's not frag 4 bytes.
@@ -85,10 +90,10 @@ void ListDirectoryTask::run()
buf32[0] = SM_MSG_START;
buf32[1] = payloadLen;
int offset = 8;
int offset = SM_HEADER_LEN;
for (uint i = 0; i < listing.size(); i++)
{
success = writeString(buf, &offset, 1024, listing);
success = writeString(buf, &offset, 1024, listing[i]);
check_error("ListDirectoryTask write");
}

View File

@@ -3,6 +3,7 @@
#define LIST_DIRECTORYTASK_H_
#include "PosixTask.h"
#include <string>
namespace storagemanager
{
@@ -18,7 +19,7 @@ class ListDirectoryTask : public PosixTask
private:
ListDirectoryTask();
void writeString(uint8_t buf, int *offset, int size, const std::string &str);
bool writeString(uint8_t *buf, int *offset, int size, const std::string &str);
struct cmd_overlay {
uint plen;
char path[];

View File

@@ -1,7 +1,10 @@
#include "OpenTask.h"
#include "messageFormat.h"
#include <sys/stat.h>
#include <errno.h>
#include <string.h>
using namespace std;
@@ -12,6 +15,10 @@ OpenTask::OpenTask(int sock, uint len) : PosixTask(sock, len)
{
}
OpenTask::~OpenTask()
{
}
void OpenTask::run()
{
/*
@@ -20,16 +27,15 @@ void OpenTask::run()
return the result
*/
bool success;
uint8_t *buf;
uint8_t buf[1024] = {0};
if (getLength() > 1024)
if (getLength() > 1023)
{
handleError("OpenTask read", ENAMETOOLONG);
return;
}
buf = alloca(getLength());
success = read(&buf, getLength());
success = read(buf, getLength());
if (!success)
{
handleError("OpenTask read", errno);
@@ -38,12 +44,13 @@ void OpenTask::run()
cmd_overlay *cmd = (cmd_overlay *) buf;
// IOC->open(filename, openmode, &buf[SM_HEADER_LEN])
// IOC->open(cmd->filename, cmd->openmode, &buf[SM_HEADER_LEN])
// stand-in dummy response
uint32_t *buf32 = (uint32_t *) buf;
buf32[0] = SM_MSG_START;
buf32[1] = sizeof(struct stat);
memset(&buf[SM_HEADER_LEN], 0, sizeof(struct stat));
success = write(buf, sizeof(struct stat) + SM_HEADER_LEN);
if (!success)
handleError("OpenTask write", errno);

View File

@@ -1,5 +1,6 @@
#include "PingTask.h"
#include "messageFormat.h"
namespace storagemanager
{
@@ -18,3 +19,5 @@ void PingTask::run()
uint32_t buf[3] = { SM_MSG_START, 4, 0 }; // generic success response
write((uint8_t *) buf, 12);
}
}

View File

@@ -8,7 +8,7 @@
namespace storagemanager
{
class PingTask : PosixTask
class PingTask : public PosixTask
{
public:
PingTask(int sock, uint length);

View File

@@ -1,7 +1,14 @@
#include "PosixTask.h"
#include "messageFormat.h"
#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#define min(x, y) (x < y ? x : y)
using namespace std;
namespace storagemanager
{
@@ -19,7 +26,7 @@ PosixTask::PosixTask(int _sock, uint _length) :
PosixTask::~PosixTask()
{
comsumeMsg();
consumeMsg();
if (!socketReturned)
returnSocket();
}
@@ -29,12 +36,12 @@ void PosixTask::handleError(const char *name, int errCode)
char buf[80];
// send an error response if possible
int32_t *buf32;
int32_t *buf32 = (int32_t *) buf;
buf32[0] = SM_MSG_START;
buf32[1] = 8;
buf32[2] = -1;
buf32[3] = errCode;
write(buf32, 16);
write((uint8_t *) buf, 16);
// TODO: construct and log a message
cout << name << " caught an error reading from a socket: " << strerror_r(errCode, buf, 80) << endl;
@@ -66,7 +73,6 @@ bool PosixTask::read(uint8_t *buf, uint length)
{
if (length > remainingLengthForCaller)
length = remainingLengthForCaller;
if (length == 0)
return false;
@@ -124,13 +130,13 @@ void PosixTask::primeBuffer()
{
// debating whether it is more efficient to use a circular buffer + more
// recv's, or to move data to reduce the # of recv's. WAG: moving data.
memmove(buffer, &buffer[bufferPos], bufferLen - bufferPos);
memmove(localBuffer, &localBuffer[bufferPos], bufferLen - bufferPos);
bufferLen -= bufferPos;
bufferPos = 0;
}
uint toRead = min(remainingLengthInStream, bufferSize - bufferLen);
err = ::recv(sock, &localBuffer[bufferLen], toRead, MSG_NOBLOCK);
int err = ::recv(sock, &localBuffer[bufferLen], toRead, MSG_DONTWAIT);
// ignoring errors here since this is supposed to be silent.
// errors will be caught by the next read
if (err > 0)
@@ -141,14 +147,14 @@ void PosixTask::primeBuffer()
}
}
bool PosixTask::write(uint8_t *buf, uint len)
bool PosixTask::write(const uint8_t *buf, uint len)
{
int err;
uint count = 0;
while (count < len)
{
err = ::write(sock, &buf[count], len - count);
err = ::send(sock, &buf[count], len - count, 0);
if (err < 0)
return false;
count += err;
@@ -172,7 +178,7 @@ void PosixTask::consumeMsg()
while (remainingLengthInStream > 0)
{
err = ::read(sock, buf, min(remainingLengthInStream, 1024));
err = ::recv(sock, buf, min(remainingLengthInStream, 1024), 0);
if (err <= 0) {
remainingLengthInStream = 0;
break;

View File

@@ -4,6 +4,11 @@
#define POSIX_TASK_H_
#include <vector>
#include <sys/types.h>
#include <stdint.h>
namespace storagemanager
{
class PosixTask
{
@@ -15,9 +20,9 @@ class PosixTask
void primeBuffer();
protected:
int read(uint8_t *buf, uint length);
bool read(uint8_t *buf, uint length);
bool write(const std::vector<uint8_t> &buf);
bool write(void *buf, uint length);
bool write(const uint8_t *buf, uint length);
void consumeMsg(); // drains the remaining portion of the message
uint getLength(); // returns the total length of the msg
uint getRemainingLength(); // returns the remaining length from the caller's perspective
@@ -39,9 +44,5 @@ class PosixTask
bool socketReturned;
};
}
#endif

View File

@@ -1,24 +1,40 @@
#include "ProcessTask.h"
#include <vector>
#include <iostream>
#include "messageFormat.h"
#include "AppendTask.h"
#include "ListDirectoryTask.h"
#include "OpenTask.h"
#include "PingTask.h"
#include "ReadTask.h"
#include "StatTask.h"
#include "TruncateTask.h"
#include "UnlinkTask.h"
#include "WriteTask.h"
using namespace std;
namespace storagemanager
{
ProcessTask::ProcessTask(int _sock, uint _length) : sock(_sock), length(_length)
ProcessTask::ProcessTask(int _sock, uint _length) : sock(_sock), length(_length), returnedSock(false)
{
assert(length > 0);
}
ProcessTask::~ProcessTask()
{
if (!returnedSock)
; // SM->returnSocket(sock);
}
void ProcessTask::handleError(int saved_errno)
{
// return sock to SessionManager
// SM->socketError(sock);
returnedSock = true;
char buf[80];
cout << "ProcessTask: got an error during a socket read: " << strerror_r(saved_errno, buf, 80) << endl;
}

View File

@@ -3,10 +3,12 @@
#ifndef PROCESS_TASK_H_
#define PROCESS_TASK_H_
#include "ThreadPool.h"
namespace storagemanager
{
class ProcessTask
class ProcessTask : public ThreadPool::Job
{
public:
ProcessTask(int sock, uint length); // _sock is the socket to read from
@@ -17,9 +19,10 @@ class ProcessTask
private:
ProcessTask();
void handleError();
void handleError(int errCode);
int sock;
uint length;
bool returnedSock;
};

View File

@@ -1,5 +1,7 @@
#include "ReadTask.h"
#include "messageFormat.h"
#include <errno.h>
using namespace std;
@@ -16,24 +18,24 @@ ReadTask::~ReadTask()
void ReadTask::run()
{
uint8_t buf[1024] = {0};
// get the parameters
if (getLength() > 1024) {
if (getLength() > 1023) {
handleError("ReadTask read", EFAULT);
return;
}
bool success;
uint8_t *buf = alloca(getLength());
success = read(&buf, getLength());
success = read(buf, getLength());
cmd_overlay *cmd = (cmd_overlay *) buf;
// read from IOC, write to the socket
vector<uint8_t> outbuf;
outbuf.resize(count + SM_HEADER_LEN);
outbuf.resize(cmd->count + SM_HEADER_LEN);
uint32_t *outbuf32 = (uint32_t *) &outbuf[0];
outbuf32[0] = SM_MSG_START;
outbuf32[1] = length;
outbuf32[1] = cmd->count;
// do the reading and writing in chunks
// IOC->willRead(filename, offset, length);

View File

@@ -8,7 +8,7 @@
namespace storagemanager
{
class ReadTask : PosixTask
class ReadTask : public PosixTask
{
public:
ReadTask(int sock, uint length);

View File

@@ -1,5 +1,11 @@
#include "StatTask.h"
#include "messageFormat.h"
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <stdint.h>
#include <string.h>
using namespace std;
@@ -14,6 +20,13 @@ StatTask::~StatTask()
{
}
#define check_error(msg) \
if (!success) \
{ \
handleError(msg, errno); \
return; \
}
void StatTask::run()
{
bool success;
@@ -39,3 +52,6 @@ void StatTask::run()
memcpy(&buf[SM_HEADER_LEN], &_stat, sizeof(_stat));
write(buf, SM_HEADER_LEN + sizeof(_stat));
}
}

View File

@@ -2,7 +2,7 @@
#include "ThreadPool.h"
#include <boost/thread/locks.hpp>
#include <boost/thread/mutex.hpp>
using namespace std;
@@ -22,7 +22,7 @@ ThreadPool::ThreadPool(uint num_threads) : maxThreads(num_threads), die(false),
ThreadPool::~ThreadPool()
{
boost::mutex::unique_lock s(m);
boost::mutex::scoped_lock s(m);
die = true;
jobs.clear();
jobAvailable.notify_all();
@@ -34,12 +34,11 @@ ThreadPool::~ThreadPool()
void ThreadPool::addJob(Job &j)
{
boost::mutex::unique_lock s(m);
boost::mutex::scoped_lock s(m);
jobs.push_back(j);
// Start another thread if necessary
if (threadsWaiting == 0 && threads.size() < maxThreads) {
boost::scoped_ptr<boost::thread> thread(new boost::thread(processingLoop));
boost::shared_ptr<boost::thread> thread(new boost::thread(Runner(this)));
threads.push_back(thread);
}
else
@@ -48,7 +47,7 @@ void ThreadPool::addJob(Job &j)
void ThreadPool::processingLoop()
{
boost::mutex::scoped_lock s(m, boost::defer_lock_t);
boost::mutex::scoped_lock s(m, boost::defer_lock);
while (!die)
{

View File

@@ -5,31 +5,36 @@
#include <list>
#include <deque>
#include <boost/thread.hpp>
//#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/utility.hpp>
namespace storagemanager
{
typedef boost::function0<void> Job;
class ThreadPool : public boost::noncopyable
{
public:
ThreadPool(); // this ctor uses an 'unbounded' # of threads; relies on the context for a natural max.
ThreadPool(uint num_threads); // this ctor lets caller specify a limit.
virtual ~Threadpool();
virtual ~ThreadPool();
// doesn't block
// addJob doesn't block
typedef boost::function0<void> Job;
void addJob(Job &j);
private:
struct Runner {
Runner(ThreadPool *t) : tp(t) { }
void operator()() { tp->processingLoop(); }
ThreadPool *tp;
};
void processingLoop(); // the fcn run by each thread
uint maxThreads;
bool die;
int threadsWaiting;
std::vector<boost::scoped_ptr<boost:thread> > threads;
std::vector<boost::shared_ptr<boost::thread> > threads;
boost::condition jobAvailable;
std::deque<Job> jobs;
boost::mutex m;

View File

@@ -1,5 +1,7 @@
#include "TruncateTask.h"
#include <errno.h>
#include "messageFormat.h"
using namespace std;
@@ -14,6 +16,13 @@ TruncateTask::~TruncateTask()
{
}
#define check_error(msg) \
if (!success) \
{ \
handleError(msg, errno); \
return; \
}
void TruncateTask::run()
{
bool success;
@@ -31,9 +40,11 @@ void TruncateTask::run()
// IOC->truncate(cmd->filename, cmd->newSize);
// generic success msg
uint32_t *buf32 = buf;
uint32_t *buf32 = (uint32_t *) buf;
buf32[0] = SM_MSG_START;
buf32[1] = 4;
buf32[2] = 0;
write(buf, 12);
}
}

View File

@@ -1,5 +1,7 @@
#include "UnlinkTask.h"
#include <errno.h>
#include "messageFormat.h"
using namespace std;
@@ -14,6 +16,14 @@ UnlinkTask::~UnlinkTask()
{
}
#define check_error(msg) \
if (!success) \
{ \
handleError(msg, errno); \
return; \
}
void UnlinkTask::run()
{
bool success;
@@ -31,9 +41,11 @@ void UnlinkTask::run()
// IOC->unlink(cmd->filename);
// generic success msg
uint32_t *buf32 = buf;
uint32_t *buf32 = (uint32_t *) buf;
buf32[0] = SM_MSG_START;
buf32[1] = 4;
buf32[2] = 0;
write(buf, 12);
}
}

View File

@@ -1,5 +1,6 @@
#include "WriteTask.h"
#include <errno.h>
using namespace std;
@@ -21,6 +22,8 @@ WriteTask::~WriteTask()
return; \
}
#define min(x, y) (x < y ? x : y)
void WriteTask::run()
{
bool success;
@@ -28,7 +31,7 @@ void WriteTask::run()
success = read(cmdbuf, sizeof(struct cmd_overlay));
check_error("WriteTask read");
cmd_overlay *cmd = cmdbuf;
cmd_overlay *cmd = (cmd_overlay *) cmdbuf;
success = read(&cmdbuf[sizeof(*cmd)], min(cmd->filename_len, 1024 - sizeof(*cmd) - 1));
check_error("WriteTask read");

View File

@@ -8,7 +8,7 @@
namespace storagemanager
{
class WriteTask : PosixTask
class WriteTask : public PosixTask
{
public:
WriteTask(int sock, uint length);