1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-24 14:20:59 +03:00

EOD checkpoint. Finished first cut of the harder posix tasks.

This commit is contained in:
Patrick LeBlanc
2019-01-28 17:26:09 -06:00
parent 0d74c32d08
commit b38c92738c
13 changed files with 259 additions and 19 deletions

View File

@@ -61,7 +61,7 @@ enum Opcodes {
OPEN
----
command format:
1-byte opcode|4-byte filename length|filename|4-byte openmode
1-byte opcode|4-byte openmode|4-byte filename length|filename
response format:
struct stat
@@ -69,7 +69,7 @@ enum Opcodes {
READ
----
command format:
1-byte opcode|4-byte filename length|filename|size_t count|off_t offset
1-byte opcode|size_t count|off_t offset|4-byte filename length|filename
response format:
data (size is stored in the return code)

View File

@@ -0,0 +1,50 @@
#include "AppendTask.h"
using namespace std;
namespace storagemanager
{
AppendTask::AppendTask(int sock, uint len) : PosixTask(sock, len)
{
}
AppendTask::~AppendTask()
{
}
#define check_error(msg) \
if (!success) \
{ \
handleError(msg); \
return; \
}
void AppendTask::run()
{
bool success;
uint8_t cmdbuf[1024] = {0};
success = read(cmdbuf, sizeof(struct cmd_overlay));
check_error("AppendTask read");
cmd_overlay *cmd = cmdbuf;
success = read(&cmdbuf[sizeof(*cmd)], min(cmd->filename_len, 1024 - sizeof(*cmd) - 1));
check_error("AppendTask read");
size_t count = 0;
vector<uint8_t> databuf;
databuf.resize(cmd->count);
// todo: do this in chunks...
while (count < cmd->count)
{
success = read(&databuf[count], cmd->count - count);
check_error("AppendTask read data");
count += cmd->count;
// IOC->append()
}
}
}

View File

@@ -0,0 +1,30 @@
#ifndef APPENDTASK_H_
#define APPENDTASK_H_
#include "PosixTask.h"
namespace storagemanager
{
class AppendTask : PosixTask
{
public:
AppendTask(int sock, uint length);
virtual ~AppendTask();
void run();
private:
AppendTask();
struct cmd_overlay {
size_t count;
uint filename_len;
char filename[];
};
};
}
#endif

View File

@@ -0,0 +1,105 @@
#include "ListDirectoryTask.h"
using namespace std;
namespace storagemanager
{
ListDirectoryTask::ListDirectoryTask(int sock, uint len) : PosixTask(sock, len)
{
}
ListDirectoryTask::~ListDirectoryTask()
{
}
#define check_error(msg) \
if (!success) \
{ \
handleError(msg, errno); \
return; \
}
#define check_error_2(msg) \
if (!success) \
{ \
handleError(msg, errno); \
return false; \
}
bool ListDirectoryTask::writeString(uint8_t buf, int *offset, int size, const string &str)
{
bool success;
if (size - *offset < 4) // eh, let's not frag 4 bytes.
{
success = write(buf, *offset);
check_error_2("ListDirectoryTask::writeString()");
*offset = 0;
}
uint count = 0, len = str.length();
*((uint32_t *) &buf[*offset]) = len;
*offset += 4;
while (count < len)
{
int toWrite = min(size - *offset, len);
memcpy(&buf[*offset], &str.data()[count], toWrite);
count += toWrite;
*offset += toWrite;
if (*offset == size)
{
success = write(buf, *offset);
check_error_2("ListDirectoryTask::writeString()");
*offset = 0;
}
}
return true;
}
void ListDirectoryTask::run()
{
bool success;
uint8_t buf[1024];
if (getLength() > 1024) {
handleError("ListDirectoryTask read", ENAMETOOLONG);
return;
}
success = read(buf, getLength());
check_error("ListDirectoryTask read");
cmd_overlay *cmd = (cmd_overlay *) buf;
vector<string> listing;
// IOC->listDirectory(path, &listing)
// bogus response
listing.push_back("dummy1");
listing.push_back("dummy2");
uint payloadLen = 4 * listing.size();
for (uint i = 0; i < listing.size(); i++)
payloadLen += listing.size();
uint32_t *buf32 = (uint32_t *) buf;
buf32[0] = SM_MSG_START;
buf32[1] = payloadLen;
int offset = 8;
for (uint i = 0; i < listing.size(); i++)
{
success = writeString(buf, &offset, 1024, listing);
check_error("ListDirectoryTask write");
}
if (offset != 0)
{
success = write(buf, offset);
check_error("ListDirectoryTask write");
}
}
}

View File

@@ -0,0 +1,30 @@
#ifndef LIST_DIRECTORYTASK_H_
#define LIST_DIRECTORYTASK_H_
#include "PosixTask.h"
namespace storagemanager
{
class ListDirectoryTask : public PosixTask
{
public:
ListDirectoryTask(int sock, uint length);
virtual ~ListDirectoryTask();
void run();
private:
ListDirectoryTask();
void writeString(uint8_t buf, int *offset, int size, const std::string &str)
struct cmd_overlay {
uint flen;
char path[];
};
};
}
#endif

View File

@@ -20,8 +20,15 @@ void OpenTask::run()
return the result
*/
bool success;
uint8_t *buf = alloca(max(getLength(), sizeof(struct stat) + SM_HEADER_LEN));
uint8_t *buf;
if (getLength() > 1024)
{
handleError("OpenTask read", ENAMETOOLONG);
return;
}
buf = alloca(getLength());
success = read(&buf, getLength());
if (!success)
{
@@ -29,14 +36,12 @@ void OpenTask::run()
return;
}
uint32_t flen = *((uint32_t *) &buf[0]);
string filename((char *) &buf[4], flen); // might want to require filenames to be NULL-terminated for efficiency
int openmode = *((int *) &buf[4+flen]);
cmd_overlay *cmd = (cmd_overlay *) buf;
// IOC->open(filename, openmode, &buf[SM_HEADER_LEN])
// stand-in dummy response
uint32_t *buf32 = (uint32_t *) &buf[0];
uint32_t *buf32 = (uint32_t *) buf;
buf32[0] = SM_MSG_START;
buf32[1] = sizeof(struct stat);
success = write(buf, sizeof(struct stat) + SM_HEADER_LEN);

View File

@@ -1,6 +1,6 @@
#ifndef OPENTASK_H_
#define OPENTASH_H_
#define OPENTASK_H_
#include "PosixTask.h"
@@ -17,6 +17,12 @@ class OpenTask : public PosixTask
private:
OpenTask();
struct cmd_overlay {
int openmode;
uint flen;
char filename[];
};
};

View File

@@ -101,6 +101,12 @@ bool PosixTask::read(uint8_t *buf, uint length)
/* The caller's request has been satisfied here. If there is remaining data in the stream
get what's available. */
primeBuffer();
return true;
}
void PosixTask::primeBuffer()
{
if (remainingLengthInStream > 0)
{
// Reset the buffer to allow a larger read.
@@ -111,6 +117,8 @@ bool PosixTask::read(uint8_t *buf, uint length)
}
else if (bufferLen - bufferPos < 1024) // if < 1024 in the buffer, move data to the front
{
// debating whether it is more efficient to use a circular buffer + more
// recv's, or to move data to reduce the # of recv's. WAG: moving data.
memmove(buffer, &buffer[bufferPos], bufferLen - bufferPos);
bufferLen -= bufferPos;
bufferPos = 0;
@@ -118,7 +126,7 @@ bool PosixTask::read(uint8_t *buf, uint length)
uint toRead = min(remainingLengthInStream, bufferSize - bufferLen);
err = ::recv(sock, &localBuffer[bufferLen], toRead, MSG_NOBLOCK);
// ignoring errors here since the request has been satisfied successfully.
// ignoring errors here since this is supposed to be silent.
// errors will be caught by the next read
if (err > 0)
{
@@ -126,7 +134,6 @@ bool PosixTask::read(uint8_t *buf, uint length)
remainingLengthInStream -= err;
}
}
return true;
}
bool PosixTask::write(uint8_t *buf, uint len)

View File

@@ -12,6 +12,7 @@ class PosixTask
virtual ~PosixTask();
virtual void run() = 0;
void primeBuffer();
protected:
int read(uint8_t *buf, uint length);

View File

@@ -75,6 +75,7 @@ void ProcessTask::operator()()
default:
throw runtime_error("ProcessTask: got an unknown opcode");
}
task->primeBuffer();
task->run();
}

View File

@@ -18,17 +18,15 @@ void ReadTask::run()
{
// get the parameters
if (getLength() > 1024) {
handleError("ReadTask read", EFAULT);
return;
}
bool success;
uint8_t *buf = alloca(getLength());
int boff = 0;
success = read(&buf, getLength());
uint flen = *((uint *) &buf[0])
boff += 4;
string filename(&buf[boff], flen);
boff += flen;
size_t count = *((size_t *) &buf[boff]);
boff += sizeof(size_t);
off_t offset = *((off_t *) &buf[boff]);
cmd_overlay *cmd = (cmd_overlay *) buf;
// read from IOC, write to the socket
vector<uint8_t> outbuf;

View File

@@ -18,6 +18,13 @@ class ReadTask : PosixTask
private:
ReadTask();
struct cmd_overlay {
size_t count;
off_t offset;
uint flen;
char filename[];
};
};
}

View File

@@ -17,7 +17,7 @@ WriteTask::~WriteTask()
#define check_error(msg) \
if (!success) \
{ \
handleError(msg); \
handleError(msg, errno); \
return; \
}