1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-15 12:09:09 +03:00

Got load_brm to work, or at least, not crash.

This commit is contained in:
Patrick LeBlanc
2019-02-05 11:42:12 -06:00
parent fcea38b48c
commit 27c60421fc
3 changed files with 37 additions and 33 deletions

View File

@@ -23,6 +23,8 @@ ReadTask::~ReadTask()
return ret; \
}
#define min(x, y) (x < y ? x : y)
bool ReadTask::run()
{
uint8_t buf[1024] = {0};
@@ -36,38 +38,38 @@ bool ReadTask::run()
bool success;
success = read(buf, getLength());
check_error("ReadTask read cmd", false);
cmd_overlay *cmd = (cmd_overlay *) buf;
read_cmd *cmd = (read_cmd *) buf;
// read from IOC, write to the socket
vector<uint8_t> outbuf;
outbuf.resize(cmd->count + SM_HEADER_LEN);
uint32_t *outbuf32 = (uint32_t *) &outbuf[0];
outbuf32[0] = SM_MSG_START;
outbuf32[1] = cmd->count;
outbuf.resize(min(cmd->count, 4) + sizeof(sm_msg_resp));
sm_msg_resp *resp = (sm_msg_resp *) &outbuf[0];
resp->type = SM_MSG_START;
resp->returnCode = 0;
resp->payloadLen = 4;
// todo: do the reading and writing in chunks
// todo: need to make this use O_DIRECT
// todo: need to make this use O_DIRECT on the IOC side
ioc->willRead(cmd->filename, cmd->offset, cmd->count);
int count = 0, err;
while (count < cmd->count)
int err;
while (resp->returnCode < cmd->count)
{
err = ioc->read(cmd->filename, &outbuf[SM_HEADER_LEN + count], cmd->offset + count, cmd->count - count);
if (err <= 0) {
if (count > 0)
outbuf32[1] = count;
else {
int l_errno = errno;
outbuf.resize(16);
outbuf32[1] = 8;
outbuf32[2] = err;
outbuf32[3] = l_errno;
err = ioc->read(cmd->filename, &resp->payload[resp->returnCode], cmd->offset + resp->returnCode,
cmd->count - resp->returnCode);
if (err < 0) {
if (resp->returnCode == 0) {
resp->payloadLen = 8;
resp->returnCode = err;
*((int32_t *) resp->payload) = errno;
}
break;
}
count += err;
resp->returnCode += err;
resp->payloadLen += err;
}
success = write(outbuf);
success = write(&outbuf[0], resp->payloadLen + SM_HEADER_LEN);
return success;
}

View File

@@ -18,13 +18,6 @@ class ReadTask : public PosixTask
private:
ReadTask();
struct cmd_overlay {
size_t count;
off_t offset;
uint flen;
char filename[];
};
};
}

View File

@@ -11,6 +11,7 @@
#include <errno.h>
#include <string>
#include <assert.h>
#include <iostream>
using namespace std;
#include <exception>
@@ -223,13 +224,13 @@ int SessionManager::start()
}
break;
}
cout << "recv got " << peakLength << " bytes" << endl;
endOfData = remainingBytes + peakLength;
if (endOfData < 8)
{
//read this snippet and keep going
len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], peakLength);
remainingBytes = endOfData;
len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], peakLength);
continue;
}
@@ -238,10 +239,12 @@ int SessionManager::start()
{
if (*((uint *) &recv_buffer[i]) == storagemanager::SM_MSG_START)
{
//printf("Received SM_MSG_START\n");
printf("Received SM_MSG_START\n");
//found it set msgLength and recvMsgStart offset of SM_MSG_START
recvMsgLength = *((uint *) &recv_buffer[i+4]);
cout << "got length = " << recvMsgLength << endl;
recvMsgStart = i+8;
remainingBytes = 0;
//printf(" recvMsgLength %d recvMsgStart %d endofData %d\n", recvMsgLength,recvMsgStart,endOfData);
// if >= endOfData then the start of the message data is the beginning of next message
if (recvMsgStart >= endOfData)
@@ -253,9 +256,13 @@ int SessionManager::start()
// didn't find SM_MSG_START in this message consume the data and loop back through on next message
if (recvMsgLength == 0)
{
//printf("No SM_MSG_START\n");
printf("No SM_MSG_START\n");
len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], peakLength);
remainingBytes = endOfData;
// we know the msg header isn't in position [0, endOfData - 7], so throw that out
// and copy [endOfData - 7, endOfData) to the front of the buffer to be
// checked by the next iteration.
memmove(recv_buffer, &recv_buffer[endOfData - 7], 7);
remainingBytes = 7;
}
else
{
@@ -264,12 +271,14 @@ int SessionManager::start()
if (recvMsgStart > 0)
{
//printf("SM_MSG_START data is here\n");
len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], peakLength - recvMsgStart);
len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], recvMsgStart);
remainingBytes = 0;
}
else
{
//printf("SM_MSG_START data is next message\n");
len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], peakLength);
remainingBytes = 0;
}
//Disable polling on this socket
fds[socketIncr].events = 0;