1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-15 12:09:09 +03:00

Phase 1 of trying to clean up some messaging code.

This commit is contained in:
Patrick LeBlanc
2019-02-11 18:08:18 -06:00
parent d792f78c25
commit b78cac755b
16 changed files with 123 additions and 104 deletions

View File

@@ -20,14 +20,21 @@ namespace storagemanager
#pragma pack(push, 1)
struct sm_msg_header {
uint32_t type; // SM_MSG_{START,CONT,END}
uint32_t payloadLen;
uint32_t type; // SM_MSG_{START,CONT,END}
uint32_t payloadLen; // refers to the length of what follows the header
uint8_t flags; // see below for valid values
};
// current values for the flags field in sm_msg_header
static const uint8_t CONT = 0x1; // this means another message will follow as part of this request or response
struct sm_request {
sm_msg_header header;
uint8_t payload[];
};
struct sm_msg_resp {
uint32_t type;
uint32_t payloadLen;
struct sm_response {
sm_msg_header header;
int32_t returnCode; // if < 0 it indicates an error, and payload contains a 4-byte errno value
uint8_t payload[];
};

View File

@@ -71,22 +71,18 @@ bool AppendTask::run()
break;
}
uint8_t respbuf[sizeof(sm_msg_resp) + 4];
sm_msg_resp *resp = (sm_msg_resp *) respbuf;
resp->type = SM_MSG_START;
uint8_t respbuf[sizeof(sm_response) + 4];
sm_response *resp = (sm_response *) respbuf;
uint payloadLen = 0;
if (cmd->count != 0 && writeCount == 0)
{
resp->payloadLen = 8;
payloadLen = 4;
resp->returnCode = -1;
*((int *) &resp[1]) = errno;
success = write((uint8_t *) respbuf, sizeof(sm_msg_resp) + 4);
}
else
{
resp->payloadLen = 4;
resp->returnCode = writeCount;
success = write((uint8_t *) respbuf, sizeof(sm_msg_resp));
}
success = write(*resp, payloadLen);
return success;
}

View File

@@ -51,11 +51,10 @@ bool CopyTask::run()
return true;
}
sm_msg_resp *resp = (sm_msg_resp *) buf;
resp->type = SM_MSG_START;
resp->payloadLen = 4;
sm_response *resp = (sm_response *) buf;
uint payloadLen = 0;
resp->returnCode = 0;
success = write(buf, sizeof(sm_msg_resp));
success = write(*resp, 0);
return success;
}

View File

@@ -87,9 +87,10 @@ bool ListDirectoryTask::run()
for (uint i = 0; i < listing.size(); i++)
payloadLen += listing[i].size();
sm_msg_resp *resp = (sm_msg_resp *) buf;
resp->type = SM_MSG_START;
resp->payloadLen = payloadLen + 4; // the +4 is for the length of the return code
sm_response *resp = (sm_response *) buf;
resp->header.type = SM_MSG_START;
resp->header.payloadLen = payloadLen + sizeof(sm_response) - sizeof(sm_msg_header); // the +4 is for the length of the return code
resp->header.flags = 0;
resp->returnCode = 0;
listdir_resp *r = (listdir_resp *) resp->payload;
r->elements = listing.size();

View File

@@ -47,19 +47,16 @@ bool OpenTask::run()
#ifdef SM_TRACE
cout << "open filename " << cmd->filename << " mode " << oct << cmd->openmode << dec << endl;
#endif
int err = ioc->open(cmd->filename, cmd->openmode, (struct stat *) &buf[sizeof(sm_msg_resp)]);
sm_response *resp = (sm_response *) buf;
int err = ioc->open(cmd->filename, cmd->openmode, (struct stat *) &resp->payload);
if (err)
{
handleError("OpenTask open", errno);
return true;
}
sm_msg_resp *resp = (sm_msg_resp *) buf;
resp->type = SM_MSG_START;
resp->payloadLen = sizeof(struct stat) + 4;
resp->returnCode = 0;
success = write(buf, sizeof(struct stat) + sizeof(sm_msg_resp));
success = write(*resp, sizeof(struct stat));
if (!success)
handleError("OpenTask write", errno);
return success;

View File

@@ -34,11 +34,9 @@ bool PingTask::run()
}
// send generic success response
sm_msg_resp ret;
ret.type = SM_MSG_START;
ret.payloadLen = 4;
sm_response ret;
ret.returnCode = 0;
success = write((uint8_t *) &ret, sizeof(ret));
success = write(ret, 0);
return success;
}

View File

@@ -33,15 +33,13 @@ PosixTask::~PosixTask()
void PosixTask::handleError(const char *name, int errCode)
{
char buf[80];
char buf[sizeof(sm_response) + 4];
// send an error response if possible
sm_msg_resp *resp = (sm_msg_resp *) buf;
resp->type = SM_MSG_START;
resp->payloadLen = 8;
sm_response *resp = (sm_response *) buf;
resp->returnCode = -1;
*((int *) resp->payload) = errCode;
write((uint8_t *) buf, sizeof(*resp) + 4);
write(*resp, 4);
// TODO: construct and log a message
cout << name << " caught an error: " << strerror_r(errCode, buf, 80) << endl;
@@ -151,6 +149,27 @@ bool PosixTask::write(const uint8_t *buf, uint len)
return true;
}
bool PosixTask::write(sm_response &resp, uint payloadLength)
{
int err;
uint count = 0;
uint8_t *buf = (uint8_t *) &resp;
resp.header.type = SM_MSG_START;
resp.header.flags = 0;
resp.header.payloadLen = payloadLength + sizeof(sm_response) - sizeof(sm_msg_header);
uint toSend = payloadLength + sizeof(sm_response);
while (count < toSend)
{
err = ::send(sock, &buf[count], toSend - count, 0);
if (err < 0)
return false;
count += err;
}
return true;
}
bool PosixTask::write(const vector<uint8_t> &buf)
{
return write(&buf[0], buf.size());

View File

@@ -8,6 +8,7 @@
#include <stdint.h>
#include <iostream>
#include "IOCoordinator.h"
#include "messageFormat.h"
namespace storagemanager
{
@@ -25,6 +26,7 @@ class PosixTask
protected:
bool read(uint8_t *buf, uint length);
bool write(const std::vector<uint8_t> &buf);
bool write(sm_response &resp, uint payloadLength);
bool write(const uint8_t *buf, uint length);
void consumeMsg(); // drains the remaining portion of the message
uint getLength(); // returns the total length of the msg

View File

@@ -46,12 +46,11 @@ bool ReadTask::run()
// read from IOC, write to the socket
vector<uint8_t> outbuf;
outbuf.resize(max(cmd->count, 4) + sizeof(sm_msg_resp));
sm_msg_resp *resp = (sm_msg_resp *) &outbuf[0];
outbuf.resize(max(cmd->count, 4) + sizeof(sm_response));
sm_response *resp = (sm_response *) &outbuf[0];
resp->type = SM_MSG_START;
resp->returnCode = 0;
resp->payloadLen = 4;
uint payloadLen = 0;
// todo: do the reading and writing in chunks
// todo: need to make this use O_DIRECT on the IOC side
@@ -63,8 +62,8 @@ bool ReadTask::run()
cmd->count - resp->returnCode);
if (err < 0) {
if (resp->returnCode == 0) {
resp->payloadLen = 8;
resp->returnCode = err;
payloadLen = 4;
*((int32_t *) resp->payload) = errno;
}
break;
@@ -72,10 +71,10 @@ bool ReadTask::run()
if (err == 0)
break;
resp->returnCode += err;
resp->payloadLen += err;
}
success = write(&outbuf[0], resp->payloadLen + SM_HEADER_LEN);
if (resp->returnCode >= 0)
payloadLen = resp->returnCode;
success = write(*resp, payloadLen);
return success;
}

View File

@@ -280,7 +280,7 @@ int SessionManager::start()
//found it set msgLength and recvMsgStart offset of SM_MSG_START
recvMsgLength = *((uint *) &recv_buffer[i+4]);
//cout << "got length = " << recvMsgLength << endl;
recvMsgStart = i+8;
recvMsgStart = i + SM_HEADER_LEN;
//printf(" recvMsgLength %d recvMsgStart %d endofData %d\n", recvMsgLength,recvMsgStart,endOfData);
// if >= endOfData then the start of the message data is the beginning of next message
if (recvMsgStart >= endOfData)

View File

@@ -8,7 +8,7 @@
#include <boost/thread/mutex.hpp>
#include <sys/poll.h>
#include <unordered_map>
#include <tr1/unordered_map>
namespace storagemanager
@@ -53,7 +53,7 @@ private:
char remainingData[SM_HEADER_LEN];
uint remainingBytes;
};
std::unordered_map<int, SockState> sockState;
std::tr1::unordered_map<int, SockState> sockState;
};

View File

@@ -40,7 +40,7 @@ bool StatTask::run()
success = read(buf, getLength());
check_error("StatTask read", false);
stat_cmd *cmd = (stat_cmd *) buf;
sm_msg_resp *resp = (sm_msg_resp *) buf;
sm_response *resp = (sm_response *) buf;
#ifdef SM_TRACE
cout << "stat " << cmd->filename << endl;
@@ -48,17 +48,15 @@ bool StatTask::run()
int err = ioc->stat(cmd->filename, (struct stat *) resp->payload);
resp->type = SM_MSG_START;
resp->returnCode = err;
if (!err) {
resp->payloadLen = sizeof(struct stat) + 4;
success = write(buf, sizeof(*resp) + sizeof(struct stat));
}
uint payloadLen;
if (!err)
payloadLen = sizeof(struct stat);
else {
resp->payloadLen = 8;
payloadLen = 4;
*((int32_t *) resp->payload) = errno;
success = write(buf, SM_HEADER_LEN + resp->payloadLen);
}
success = write(*resp, payloadLen);
return success;
}

View File

@@ -48,11 +48,9 @@ bool TruncateTask::run()
return true;
}
sm_msg_resp *resp = (sm_msg_resp *) buf;
resp->type = SM_MSG_START;
resp->payloadLen = 4;
sm_response *resp = (sm_response *) buf;
resp->returnCode = 0;
success = write(buf, sizeof(sm_msg_resp));
success = write(*resp, 0);
return success;
}

View File

@@ -49,11 +49,9 @@ bool UnlinkTask::run()
return true;
}
sm_msg_resp *resp = (sm_msg_resp *) buf;
resp->type = SM_MSG_START;
resp->payloadLen = 4;
sm_response *resp = (sm_response *) buf;
resp->returnCode = 0;
success = write(buf, sizeof(*resp));
success = write(*resp, 0);
return success;
}

View File

@@ -71,22 +71,19 @@ bool WriteTask::run()
break;
}
uint8_t respbuf[sizeof(sm_msg_resp) + 4];
sm_msg_resp *resp = (sm_msg_resp *) respbuf;
resp->type = SM_MSG_START;
uint8_t respbuf[sizeof(sm_response) + 4];
sm_response *resp = (sm_response *) respbuf;
uint payloadLen = 0;
if (cmd->count != 0 && writeCount == 0)
{
resp->payloadLen = 8;
payloadLen = 4;
resp->returnCode = -1;
*((int *) &resp[1]) = errno;
success = write((uint8_t *) respbuf, sizeof(sm_msg_resp) + 4);
success = write(*resp, payloadLen);
}
else
{
resp->payloadLen = 4;
resp->returnCode = writeCount;
success = write((uint8_t *) respbuf, sizeof(sm_msg_resp));
}
success = write(*resp, payloadLen);
return success;
}

View File

@@ -96,6 +96,7 @@ bool opentask()
// open/create a file named 'opentest1'
const char *filename = "opentest1";
hdr->type = SM_MSG_START;
hdr->flags = 0;
hdr->payloadLen = sizeof(*cmd) + 9;
cmd->opcode = OPEN;
cmd->openmode = O_WRONLY | O_CREAT;
@@ -109,10 +110,11 @@ bool opentask()
// read the response
int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_msg_resp *resp = (sm_msg_resp *) buf;
assert(err == sizeof(struct stat) + sizeof(sm_msg_resp));
assert(resp->type == SM_MSG_START);
assert(resp->payloadLen == sizeof(struct stat) + 4);
sm_response *resp = (sm_response *) buf;
assert(err == sizeof(struct stat) + sizeof(sm_response));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == sizeof(struct stat) + 4);
assert(resp->header.flags == 0);
assert(resp->returnCode == 0);
struct stat *_stat = (struct stat *) resp->payload;
@@ -159,10 +161,11 @@ bool writetask()
// verify response
int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_msg_resp *resp = (sm_msg_resp *) buf;
sm_response *resp = (sm_response *) buf;
assert(err == sizeof(*resp));
assert(resp->type == SM_MSG_START);
assert(resp->payloadLen == 4);
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == 4);
assert(resp->header.flags == 0);
assert(resp->returnCode == 9);
//check file contents
@@ -205,10 +208,11 @@ bool appendtask()
// verify response
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_msg_resp *resp = (sm_msg_resp *) buf;
sm_response *resp = (sm_response *) buf;
assert(err == sizeof(*resp));
assert(resp->type == SM_MSG_START);
assert(resp->payloadLen == 4);
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == 4);
assert(resp->header.flags == 0);
assert(resp->returnCode == 9);
//check file contents
@@ -245,10 +249,11 @@ bool unlinktask()
// verify response
int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_msg_resp *resp = (sm_msg_resp *) buf;
sm_response *resp = (sm_response *) buf;
assert(err == sizeof(*resp));
assert(resp->type == SM_MSG_START);
assert(resp->payloadLen == 4);
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == 4);
assert(resp->header.flags == 0);
assert(resp->returnCode == 0);
// confirm it no longer exists
@@ -277,10 +282,11 @@ bool stattask()
// read the response
int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_msg_resp *resp = (sm_msg_resp *) buf;
assert(err == sizeof(struct stat) + sizeof(sm_msg_resp));
assert(resp->type == SM_MSG_START);
assert(resp->payloadLen == sizeof(struct stat) + 4);
sm_response *resp = (sm_response *) buf;
assert(err == sizeof(struct stat) + sizeof(sm_response));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.flags == 0);
assert(resp->header.payloadLen == sizeof(struct stat) + 4);
assert(resp->returnCode == 0);
struct stat *_stat = (struct stat *) resp->payload;
@@ -316,10 +322,11 @@ bool truncatetask()
// read the response
int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_msg_resp *resp = (sm_msg_resp *) buf;
assert(err == sizeof(sm_msg_resp));
assert(resp->type == SM_MSG_START);
assert(resp->payloadLen == 4);
sm_response *resp = (sm_response *) buf;
assert(err == sizeof(sm_response));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.flags == 0);
assert(resp->header.payloadLen == 4);
assert(resp->returnCode == 0);
struct stat statbuf;
@@ -353,13 +360,14 @@ bool listdirtask()
/* going to keep this simple. Don't run this in a big dir. */
/* maybe later I'll make a dir, put a file in it, and etc. For now run it in a small dir. */
int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_msg_resp *resp = (sm_msg_resp *) buf;
sm_response *resp = (sm_response *) buf;
assert(err > 0);
assert(resp->type == SM_MSG_START);
assert(resp->header.type == SM_MSG_START);
assert(resp->header.flags == 0);
assert(resp->returnCode == 0);
listdir_resp *r = (listdir_resp *) resp->payload;
//cout << "resp has " << r->elements << " elements" << endl;
int off = sizeof(sm_msg_resp) + sizeof(listdir_resp);
int off = sizeof(sm_response) + sizeof(listdir_resp);
while (off < err)
{
listdir_resp_entry *e = (listdir_resp_entry *) &buf[off];
@@ -391,10 +399,11 @@ bool pingtask()
// read the response
int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_msg_resp *resp = (sm_msg_resp *) buf;
assert(err == sizeof(sm_msg_resp));
assert(resp->type == SM_MSG_START);
assert(resp->payloadLen == 4);
sm_response *resp = (sm_response *) buf;
assert(err == sizeof(sm_response));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == 4);
assert(resp->header.flags == 0);
assert(resp->returnCode == 0);
cout << "pingtask OK" << endl;
}
@@ -431,10 +440,11 @@ bool copytask()
// read the response
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_msg_resp *resp = (sm_msg_resp *) buf;
assert(err == sizeof(sm_msg_resp));
assert(resp->type == SM_MSG_START);
assert(resp->payloadLen == 4);
sm_response *resp = (sm_response *) buf;
assert(err == sizeof(sm_response));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == 4);
assert(resp->header.flags == 0);
assert(resp->returnCode == 0);
// verify copytest2 is there