1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-18 13:54:11 +03:00

Made the write/append tasks work in 1MB chunks.

Made the read task send a proper error response.
This commit is contained in:
Patrick LeBlanc
2019-01-30 13:00:48 -06:00
parent a24a28fa34
commit 9a3eae4a27
4 changed files with 52 additions and 39 deletions

View File

@@ -40,29 +40,32 @@ void AppendTask::run()
success = read(&cmdbuf[sizeof(*cmd)], min(cmd->filename_len, 1024 - sizeof(*cmd) - 1)); success = read(&cmdbuf[sizeof(*cmd)], min(cmd->filename_len, 1024 - sizeof(*cmd) - 1));
check_error("AppendTask read"); check_error("AppendTask read");
size_t count = 0; size_t readCount = 0, writeCount = 0;
vector<uint8_t> databuf; vector<uint8_t> databuf;
databuf.resize(cmd->count); uint bufsize = 1 << 20; // 1 MB
databuf.resize(bufsize); // 1 MB
// todo: do this in chunks... while (readCount < cmd->count)
while (count < cmd->count)
{ {
success = read(&databuf[count], cmd->count - count); uint toRead = min(cmd->count - readCount, bufsize); // 1 MB
success = read(&databuf[0], toRead);
check_error("AppendTask read data"); check_error("AppendTask read data");
readCount += toRead;
uint count2 = 0; while (writeCount < readCount)
while (count2 < cmd->count - count)
{ {
err = ioc->append(cmd->filename, &databuf[count], cmd->count - count); int err = ioc->append(cmd->filename, &databuf[writeCount], readCount - writeCount);
if (err <= 0) if (err <= 0)
{ break;
handleError("AppendTask write data", errno); writeCount += err;
return;
} }
count2 += err; if (readCount != writeCount)
} break;
count += cmd->count;
} }
uint32_t *buf32 = (uint32_t *) cmdbuf;
buf32[0] = SM_MSG_START;
buf32[1] = 4;
buf32[2] = writeCount;
write(cmdbuf, 12);
} }
} }

View File

@@ -48,21 +48,22 @@ void ReadTask::run()
outbuf32[0] = SM_MSG_START; outbuf32[0] = SM_MSG_START;
outbuf32[1] = cmd->count; outbuf32[1] = cmd->count;
// do the reading and writing in chunks // todo: do the reading and writing in chunks
ioc->willRead(cmd->filename, cmd->offset, cmd->count); ioc->willRead(cmd->filename, cmd->offset, cmd->count);
int count = 0, err; int count = 0, err;
while (count < cmd->count) while (count < cmd->count)
{ {
err = ioc->read(cmd->filename, &outbuf[SM_HEADER_LEN + count], cmd->offset + count, cmd->count - count); err = ioc->read(cmd->filename, &outbuf[SM_HEADER_LEN + count], cmd->offset + count, cmd->count - count);
if (err < 0) if (err <= 0) {
{ if (count > 0)
handleError("ReadTask read data", errno); outbuf32[1] = count;
return; else {
outbuf.resize(16);
outbuf32[1] = 8;
outbuf32[2] = err;
outbuf32[3] = errno;
} }
else if (err == 0) break;
{
handleError("ReadTask EOF", errno);
return;
} }
count += err; count += err;
} }

View File

@@ -39,28 +39,32 @@ void WriteTask::run()
success = read(&cmdbuf[sizeof(*cmd)], min(cmd->filename_len, 1024 - sizeof(*cmd) - 1)); success = read(&cmdbuf[sizeof(*cmd)], min(cmd->filename_len, 1024 - sizeof(*cmd) - 1));
check_error("WriteTask read"); check_error("WriteTask read");
size_t count = 0; size_t readCount = 0, writeCount = 0;
vector<uint8_t> databuf; vector<uint8_t> databuf;
databuf.resize(cmd->count); uint bufsize = 1 << 20; // 1 MB
databuf.resize(bufsize); // 1 MB
// todo: do this in chunks... while (readCount < cmd->count)
while (count < cmd->count)
{ {
success = read(&databuf[count], cmd->count - count); uint toRead = min(cmd->count - readCount, bufsize); // 1 MB
success = read(&databuf[0], toRead);
check_error("WriteTask read data"); check_error("WriteTask read data");
count += cmd->count; readCount += toRead;
int count2 = 0; while (writeCount < readCount)
while (count2 < count)
{ {
int err = ioc->write(cmd->filename, &databuf[count + count2], cmd->offset + count2, cmd->count - count2); int err = ioc->write(cmd->filename, &databuf[writeCount], cmd->offset + writeCount, readCount - writeCount);
if (err <= 0) if (err <= 0)
{ break;
handleError("WriteTask write", errno); writeCount += err;
return;
}
count2 += err;
} }
if (writeCount != readCount)
break;
} }
uint32_t *buf32 = (uint32_t *) cmdbuf;
buf32[0] = SM_MSG_START;
buf32[1] = 4;
buf32[2] = writeCount;
write(cmdbuf, 12);
} }
} }

View File

@@ -10,12 +10,17 @@
using namespace std; using namespace std;
#include "SessionManager.h" #include "SessionManager.h"
#include "IOCoordinator.h"
using namespace storagemanager; using namespace storagemanager;
IOCoordinator *ioc;
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
int ret = 0; int ret = 0;
SessionManager sessionManager = SessionManager(); SessionManager sessionManager = SessionManager();
ioc = new IOCoordinator();
ret = sessionManager.start(); ret = sessionManager.start();