1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-13 23:02:14 +03:00

Working on defining message structures and unit tests for

the PosixTask subclasses.
This commit is contained in:
Patrick LeBlanc
2019-01-31 14:43:32 -06:00
parent 0ef3caca9e
commit 13af644425
14 changed files with 335 additions and 99 deletions

View File

@@ -7,30 +7,48 @@
#ifndef MESSAGEFORMAT_H_
#define MESSAGEFORMAT_H_
#include <sys/types.h>
#include <sys/stat.h>
#include <stdint.h>
namespace storagemanager
{
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma pack(push, 1)
struct sm_msg_header {
uint32_t type; // SM_MSG_{START,CONT,END}
uint32_t payloadLen;
uint8_t payload[];
};
struct sm_msg_resp {
uint32_t type;
uint32_t payloadLen;
int32_t returnCode; // if < 0 it indicates an error, and payload contains a 4-byte errno value
uint8_t payload[];
};
// all msgs to and from StorageManager begin with this magic and a payload length
static const uint SM_MSG_START=0xbf65a7e1;
static const uint32_t SM_MSG_START=0xbf65a7e1;
// for read/write/append, which may break a message into chunks, messages not the
// beginning or end of the larger message will preface a chunk with this magic
static const uint SM_MSG_CONT=0xfa371bd2;
static const uint32_t SM_MSG_CONT=0xfa371bd2;
// for read/write/append, the last chunk of a message should begin with this magic
static const uint SM_MSG_END=0x9d5bc31b;
static const uint32_t SM_MSG_END=0x9d5bc31b;
static const uint SM_HEADER_LEN = 8;
static const uint32_t SM_HEADER_LEN = sizeof(sm_msg_header);
// the unix socket StorageManager is listening on
static const char *socket_name = "\0storagemanager";
#pragma GCC diagnostic pop
// opcodes understood by StorageManager. Cast these to
// a uint8_t to use them.
enum Opcodes {
@@ -57,7 +75,9 @@ enum Opcodes {
On success, what follows is any output parameters from the call.
TBD: Require filenames to be NULL-terminated. Currently they are not.
*/
/*
OPEN
----
command format:
@@ -65,7 +85,19 @@ enum Opcodes {
response format:
struct stat
*/
struct open_cmd {
uint8_t opcode; // == OPEN
int32_t openmode;
uint32_t flen;
char filename[];
};
struct open_resp {
struct stat statbuf;
};
/*
READ
----
command format:
@@ -73,28 +105,66 @@ enum Opcodes {
response format:
data (size is stored in the return code)
*/
struct read_cmd {
uint8_t opcode; // == READ
size_t count;
off_t offset;
uint32_t flen;
char filename[];
};
typedef uint8_t read_resp;
/*
WRITE
-----
command format:
1-byte opcode|size_t count|off_t offset|4-byte filename length|filename|data
response format:
*/
struct write_cmd {
uint8_t opcode; // == WRITE
size_t count;
off_t offset;
uint32_t flen;
char filename[];
// after this is the data to be written, ie at &filename[flen]
};
/*
APPEND
------
command format:
1-byte opcode|4-byte filename length|filename|size_t count|data
1-byte opcode|size_t count|4-byte filename length|filename|data
response format:
*/
struct append_cmd {
uint8_t opcode; // == APPEND
size_t count;
uint32_t flen;
char filename[];
// after this is the data to be written, ie at &filename[flen]
};
/*
UNLINK
------
command format:
1-byte opcode|4-byte filename length|filename
response format:
*/
struct unlink_cmd {
uint8_t opcode; // == UNLINK
uint32_t flen;
char filename[];
};
/*
STAT
----
command format:
@@ -102,7 +172,19 @@ enum Opcodes {
response format:
struct stat
*/
struct stat_cmd {
uint8_t opcode; // == STAT
uint32_t flen;
char filename[];
};
struct stat_resp {
struct stat statbuf;
}
/*
TRUNCATE
--------
command format:
@@ -128,6 +210,8 @@ enum Opcodes {
*/
#pragma pack(pop)
}
#endif

View File

@@ -34,16 +34,21 @@ void AppendTask::run()
uint8_t cmdbuf[1024] = {0};
int err;
success = read(cmdbuf, sizeof(struct cmd_overlay));
success = read(cmdbuf, sizeof(append_cmd));
check_error("AppendTask read");
cmd_overlay *cmd = (cmd_overlay *) cmdbuf;
append_cmd *cmd = (append_cmd *) cmdbuf;
success = read(&cmdbuf[sizeof(*cmd)], min(cmd->filename_len, 1024 - sizeof(*cmd) - 1));
if (cmd->flen > 1023 - sizeof(*cmd))
{
handleError("AppendTask", ENAMETOOLONG);
return;
}
success = read(&cmdbuf[sizeof(*cmd)], cmd->flen);
check_error("AppendTask read");
size_t readCount = 0, writeCount = 0;
vector<uint8_t> databuf;
uint bufsize = 1 << 20; // 1 MB
uint bufsize = min(1 << 20, cmd->count); // 1 MB
databuf.resize(bufsize);
while (readCount < cmd->count)
@@ -52,31 +57,34 @@ void AppendTask::run()
success = read(&databuf[0], toRead);
check_error("AppendTask read data");
readCount += toRead;
uint writePos = 0;
while (writeCount < readCount)
{
int err = ioc->append(cmd->filename, &databuf[writeCount], readCount - writeCount);
int err = ioc->append(cmd->filename, &databuf[writePos], toRead - writePos);
if (err <= 0)
break;
writeCount += err;
writePos += err;
}
if (readCount != writeCount)
break;
}
uint32_t response[4];
response[0] = SM_MSG_START;
uint8_t respbuf[sizeof(sm_msg_resp) + 4];
sm_msg_resp *resp = (sm_msg_resp *) respbuf;
resp->type = SM_MSG_START;
if (cmd->count != 0 && writeCount == 0)
{
response[1] = 8;
response[2] = -1;
response[3] = errno;
write((uint8_t *) response, 16);
resp->payloadLen = 8;
resp->returnCode = -1;
*((int *) &resp[1]) = errno;
write((uint8_t *) respbuf, sizeof(sm_msg_resp) + 4);
}
else
{
response[1] = 4;
response[2] = writeCount;
write((uint8_t *) response, 12);
resp->payloadLen = 4;
resp->returnCode = writeCount;
write((uint8_t *) respbuf, sizeof(sm_msg_resp));
}
}

View File

@@ -18,12 +18,6 @@ class AppendTask : public PosixTask
private:
AppendTask();
struct cmd_overlay {
size_t count;
uint filename_len;
char filename[];
};
};
}

View File

@@ -37,7 +37,7 @@ struct scoped_closer {
};
#define OPEN(name, mode) \
fd = ::open(filename, mode); \
fd = ::open(filename, mode, 0666); \
if (fd < 0) \
return fd; \
scoped_closer sc(fd);

View File

@@ -45,19 +45,20 @@ void OpenTask::run()
return;
}
cmd_overlay *cmd = (cmd_overlay *) buf;
int err = ioc->open(cmd->filename, cmd->openmode, (struct stat *) &buf[SM_HEADER_LEN]);
open_cmd *cmd = (open_cmd *) buf;
int err = ioc->open(cmd->filename, cmd->openmode, (struct stat *) &buf[sizeof(sm_msg_resp)]);
if (err)
{
handleError("OpenTask open", errno);
return;
}
uint32_t *buf32 = (uint32_t *) buf;
buf32[0] = SM_MSG_START;
buf32[1] = sizeof(struct stat);
success = write(buf, sizeof(struct stat) + SM_HEADER_LEN);
sm_msg_resp *resp = (sm_msg_resp *) buf;
resp->type = SM_MSG_START;
resp->payloadLen = sizeof(struct stat) + 4;
resp->returnCode = 0;
success = write(buf, sizeof(struct stat) + sizeof(sm_msg_resp));
if (!success)
handleError("OpenTask write", errno);
}

View File

@@ -17,12 +17,6 @@ class OpenTask : public PosixTask
private:
OpenTask();
struct cmd_overlay {
int openmode;
uint flen;
char filename[];
};
};

View File

@@ -36,15 +36,15 @@ void PosixTask::handleError(const char *name, int errCode)
char buf[80];
// send an error response if possible
int32_t *buf32 = (int32_t *) buf;
buf32[0] = SM_MSG_START;
buf32[1] = 8;
buf32[2] = -1;
buf32[3] = errCode;
write((uint8_t *) buf, 16);
sm_msg_resp *resp = (sm_msg_resp *) buf;
resp->type = SM_MSG_START;
resp->payloadLen = 8;
resp->returnCode = -1;
*((int *) resp->payload) = errCode;
write((uint8_t *) buf, sizeof(*resp) + 4);
// TODO: construct and log a message
cout << name << " caught an error reading from a socket: " << strerror_r(errCode, buf, 80) << endl;
cout << name << " caught an error: " << strerror_r(errCode, buf, 80) << endl;
socketError();
}

View File

@@ -14,6 +14,8 @@
#include "UnlinkTask.h"
#include "WriteTask.h"
#include <sys/socket.h>
using namespace std;
namespace storagemanager
@@ -50,8 +52,7 @@ void ProcessTask::operator()()
int err;
uint8_t opcode;
/* D'oh, a 1-byte read.... TODO! */
err = ::read(sock, &opcode, 1);
err = ::recv(sock, &opcode, 1, MSG_PEEK);
if (err <= 0)
{
handleError(errno);

View File

@@ -49,6 +49,7 @@ void ReadTask::run()
outbuf32[1] = cmd->count;
// todo: do the reading and writing in chunks
// todo: need to make this use O_DIRECT
ioc->willRead(cmd->filename, cmd->offset, cmd->count);
int count = 0, err;
while (count < cmd->count)

View File

@@ -39,7 +39,7 @@ void UnlinkTask::run()
success = read(buf, getLength());
check_error("UnlinkTask read");
cmd_overlay *cmd = (cmd_overlay *) buf;
unlink_cmd *cmd = (unlink_cmd *) buf;
int err = ioc->unlink(cmd->filename);
if (err)
@@ -48,11 +48,11 @@ void UnlinkTask::run()
return;
}
uint32_t *buf32 = (uint32_t *) buf;
buf32[0] = SM_MSG_START;
buf32[1] = 4;
buf32[2] = 0;
write(buf, 12);
sm_msg_resp *resp = (sm_msg_resp *) buf;
resp->type = SM_MSG_START;
resp->payloadLen = 4;
resp->returnCode = 0;
write(buf, sizeof(*resp));
}
}

View File

@@ -17,11 +17,6 @@ class UnlinkTask : public PosixTask
private:
UnlinkTask();
struct cmd_overlay {
uint flen;
char filename[];
};
};

View File

@@ -32,17 +32,22 @@ void WriteTask::run()
{
bool success;
uint8_t cmdbuf[1024] = {0};
success = read(cmdbuf, sizeof(struct cmd_overlay));
check_error("WriteTask read");
cmd_overlay *cmd = (cmd_overlay *) cmdbuf;
success = read(&cmdbuf[sizeof(*cmd)], min(cmd->filename_len, 1024 - sizeof(*cmd) - 1));
success = read(cmdbuf, sizeof(write_cmd));
check_error("WriteTask read");
write_cmd *cmd = (write_cmd *) cmdbuf;
if (cmd->flen > 1023 - sizeof(*cmd))
{
handleError("WriteTask", ENAMETOOLONG);
return;
}
success = read(&cmdbuf[sizeof(*cmd)], cmd->flen);
check_error("WriteTask read");
size_t readCount = 0, writeCount = 0;
vector<uint8_t> databuf;
uint bufsize = 1 << 20; // 1 MB
uint bufsize = min(1 << 20, cmd->count); // 1 MB
databuf.resize(bufsize);
while (readCount < cmd->count)
@@ -51,31 +56,34 @@ void WriteTask::run()
success = read(&databuf[0], toRead);
check_error("WriteTask read data");
readCount += toRead;
uint writePos = 0;
while (writeCount < readCount)
{
int err = ioc->write(cmd->filename, &databuf[writeCount], cmd->offset + writeCount, readCount - writeCount);
int err = ioc->append(cmd->filename, &databuf[writePos], toRead - writePos);
if (err <= 0)
break;
writeCount += err;
writePos += err;
}
if (writeCount != readCount)
break;
}
uint32_t response[4];
response[0] = SM_MSG_START;
uint8_t respbuf[sizeof(sm_msg_resp) + 4];
sm_msg_resp *resp = (sm_msg_resp *) respbuf;
resp->type = SM_MSG_START;
if (cmd->count != 0 && writeCount == 0)
{
response[1] = 8;
response[2] = -1;
response[3] = errno;
write((uint8_t *) response, 16);
resp->payloadLen = 8;
resp->returnCode = -1;
*((int *) &resp[1]) = errno;
write((uint8_t *) respbuf, sizeof(sm_msg_resp) + 4);
}
else
{
response[1] = 4;
response[2] = writeCount;
write((uint8_t *) response, 12);
resp->payloadLen = 4;
resp->returnCode = writeCount;
write((uint8_t *) respbuf, sizeof(sm_msg_resp));
}
}

View File

@@ -18,13 +18,6 @@ class WriteTask : public PosixTask
private:
WriteTask();
struct cmd_overlay {
size_t count;
off_t offset;
uint filename_len;
char filename[];
};
};
}

View File

@@ -1,5 +1,9 @@
#include "OpenTask.h"
#include "WriteTask.h"
#include "AppendTask.h"
#include "UnlinkTask.h"
#include "IOCoordinator.h"
#include "messageFormat.h"
#include <iostream>
#include <stdlib.h>
#include <unistd.h>
@@ -24,7 +28,6 @@ struct scoped_closer {
scoped_closer(int f) : fd(f) { }
~scoped_closer() {
int s_errno = errno;
cout << "closing " << fd << endl;
::close(fd);
errno = s_errno;
}
@@ -38,8 +41,9 @@ int getSocket()
return sock;
}
int sessionSock = -1;
int serverSock = -1;
int sessionSock = -1; // tester uses this end of the connection
int serverSock = -1;
int clientSock = -1; // have the Tasks use this end of the connection
void acceptConnection()
{
@@ -61,8 +65,9 @@ void acceptConnection()
sessionSock = ::accept(serverSock, NULL, NULL);
assert(sessionSock > 0);
}
bool opentask()
// connects sessionSock to clientSock
void makeConnection()
{
boost::thread t(acceptConnection);
struct sockaddr_un sa;
@@ -70,43 +75,195 @@ bool opentask()
sa.sun_family = AF_UNIX;
memcpy(&sa.sun_path[1], "testing", 7);
int clientSock = ::socket(AF_UNIX, SOCK_STREAM, 0);
clientSock = ::socket(AF_UNIX, SOCK_STREAM, 0);
assert(clientSock > 0);
scoped_closer s2(clientSock);
sleep(1); // let server thread get to accept()
int err = ::connect(clientSock, (struct sockaddr *) &sa, sizeof(sa));
assert(err == 0);
t.join();
scoped_closer s3(sessionSock);
}
bool opentask()
{
// going to rely on msgs being smaller than the buffer here
uint8_t buf[1024];
uint32_t *buf32 = (uint32_t *) buf;
sm_msg_header *hdr = (sm_msg_header *) buf;
open_cmd *cmd = (open_cmd *) &hdr[1];
// open/create a file named 'opentest1'
const char *filename = "opentest1";
buf32[0] = O_WRONLY | O_CREAT;
buf32[1] = 9;
strcpy((char *) &buf32[2], "opentest1");
uint msg_len = 17;
hdr->type = SM_MSG_START;
hdr->payloadLen = sizeof(*cmd) + 9;
cmd->opcode = OPEN;
cmd->openmode = O_WRONLY | O_CREAT;
cmd->flen = 9;
strcpy((char *) cmd->filename, filename);
::unlink(filename);
::write(sessionSock, buf, msg_len);
OpenTask o(clientSock, msg_len);
::write(sessionSock, cmd, hdr->payloadLen);
OpenTask o(clientSock, hdr->payloadLen);
o.run();
// read the response
int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_msg_resp *resp = (sm_msg_resp *) buf;
assert(err == sizeof(struct stat) + sizeof(sm_msg_resp));
assert(resp->type == SM_MSG_START);
assert(resp->payloadLen == sizeof(struct stat) + 4);
assert(resp->returnCode == 0);
struct stat *_stat = (struct stat *) resp->payload;
// what can we verify about the stat...
assert(_stat->st_uid == getuid());
assert(_stat->st_gid == getgid());
assert(_stat->st_size == 0);
/* verify the file is there */
assert(boost::filesystem::exists(filename));
::unlink(filename);
cout << "opentask OK" << endl;
return true;
}
bool writetask()
{
// make an empty file to write to
const char *filename = "writetest1";
::unlink(filename);
int fd = ::open(filename, O_CREAT | O_RDWR, 0666);
assert(fd > 0);
scoped_closer f(fd);
uint8_t buf[1024];
sm_msg_header *hdr = (sm_msg_header *) buf;
write_cmd *cmd = (write_cmd *) &hdr[1];
uint8_t *data;
cmd->opcode = WRITE;
cmd->offset = 0;
cmd->count = 9;
cmd->flen = 10;
memcpy(&cmd->filename, filename, cmd->flen);
data = (uint8_t *) &cmd->filename[cmd->flen];
memcpy(data, "123456789", cmd->count);
hdr->type = SM_MSG_START;
hdr->payloadLen = sizeof(*cmd) + cmd->flen + cmd->count;
WriteTask w(clientSock, hdr->payloadLen);
::write(sessionSock, cmd, hdr->payloadLen);
w.run();
// verify response
int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_msg_resp *resp = (sm_msg_resp *) buf;
assert(err == sizeof(*resp));
assert(resp->type == SM_MSG_START);
assert(resp->payloadLen == 4);
assert(resp->returnCode == 9);
//check file contents
err = ::read(fd, buf, 1024);
assert(err == 9);
buf[9] = 0;
assert(!strcmp("123456789", (const char *) buf));
::unlink(filename);
cout << "write task OK" << endl;
return true;
}
bool appendtask()
{
// make a file and put some stuff in it
const char *filename = "appendtest1";
::unlink(filename);
int fd = ::open(filename, O_CREAT | O_RDWR, 0666);
assert(fd > 0);
scoped_closer f(fd);
int err = ::write(fd, "testjunk", 8);
assert(err == 8);
uint8_t buf[1024];
append_cmd *cmd = (append_cmd *) buf;
uint8_t *data;
cmd->opcode = APPEND;
cmd->count = 9;
cmd->flen = 11;
memcpy(&cmd->filename, filename, cmd->flen);
data = (uint8_t *) &cmd->filename[cmd->flen];
memcpy(data, "123456789", cmd->count);
int payloadLen = sizeof(*cmd) + cmd->flen + cmd->count;
AppendTask a(clientSock, payloadLen);
::write(sessionSock, cmd, payloadLen);
a.run();
// verify response
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_msg_resp *resp = (sm_msg_resp *) buf;
assert(err == sizeof(*resp));
assert(resp->type == SM_MSG_START);
assert(resp->payloadLen == 4);
assert(resp->returnCode == 9);
//check file contents
::lseek(fd, 0, SEEK_SET);
err = ::read(fd, buf, 1024);
assert(err == 17);
buf[17] = 0;
assert(!strcmp("testjunk123456789", (const char *) buf));
::unlink(filename);
cout << "append task OK" << endl;
return true;
}
bool unlinktask()
{
// make a file and delete it
const char *filename = "unlinktest1";
::unlink(filename);
int fd = ::open(filename, O_CREAT | O_RDWR, 0666);
assert(fd > 0);
scoped_closer f(fd);
uint8_t buf[1024];
unlink_cmd *cmd = (unlink_cmd *) buf;
uint8_t *data;
cmd->opcode = UNLINK;
cmd->flen = strlen(filename);
memcpy(&cmd->filename, filename, cmd->flen);
UnlinkTask u(clientSock, sizeof(unlink_cmd) + cmd->flen);
::write(sessionSock, cmd, sizeof(unlink_cmd) + cmd->flen);
u.run();
// verify response
int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_msg_resp *resp = (sm_msg_resp *) buf;
assert(err == sizeof(*resp));
assert(resp->type == SM_MSG_START);
assert(resp->payloadLen == 4);
assert(resp->returnCode == 0);
// confirm it no longer exists
assert(!boost::filesystem::exists(filename));
cout << "unlink task OK" << endl;
}
int main()
{
ioc = new IOCoordinator();
cout << "connecting" << endl;
makeConnection();
cout << "connected" << endl;
scoped_closer sc1(serverSock), sc2(sessionSock), sc3(clientSock);
opentask();
writetask();
appendtask();
unlinktask();
return 0;
}