diff --git a/src/AppendTask.cpp b/src/AppendTask.cpp index db899958f..14698cf97 100644 --- a/src/AppendTask.cpp +++ b/src/AppendTask.cpp @@ -40,29 +40,32 @@ void AppendTask::run() success = read(&cmdbuf[sizeof(*cmd)], min(cmd->filename_len, 1024 - sizeof(*cmd) - 1)); check_error("AppendTask read"); - size_t count = 0; + size_t readCount = 0, writeCount = 0; vector databuf; - databuf.resize(cmd->count); + uint bufsize = 1 << 20; // 1 MB + databuf.resize(bufsize); // 1 MB - // todo: do this in chunks... - while (count < cmd->count) + while (readCount < cmd->count) { - success = read(&databuf[count], cmd->count - count); + uint toRead = min(cmd->count - readCount, bufsize); // 1 MB + success = read(&databuf[0], toRead); check_error("AppendTask read data"); - - uint count2 = 0; - while (count2 < cmd->count - count) + readCount += toRead; + while (writeCount < readCount) { - err = ioc->append(cmd->filename, &databuf[count], cmd->count - count); + int err = ioc->append(cmd->filename, &databuf[writeCount], readCount - writeCount); if (err <= 0) - { - handleError("AppendTask write data", errno); - return; - } - count2 += err; + break; + writeCount += err; } - count += cmd->count; + if (readCount != writeCount) + break; } + uint32_t *buf32 = (uint32_t *) cmdbuf; + buf32[0] = SM_MSG_START; + buf32[1] = 4; + buf32[2] = writeCount; + write(cmdbuf, 12); } } diff --git a/src/ReadTask.cpp b/src/ReadTask.cpp index 7384768b8..0c4021e04 100644 --- a/src/ReadTask.cpp +++ b/src/ReadTask.cpp @@ -48,21 +48,22 @@ void ReadTask::run() outbuf32[0] = SM_MSG_START; outbuf32[1] = cmd->count; - // do the reading and writing in chunks + // todo: do the reading and writing in chunks ioc->willRead(cmd->filename, cmd->offset, cmd->count); int count = 0, err; while (count < cmd->count) { err = ioc->read(cmd->filename, &outbuf[SM_HEADER_LEN + count], cmd->offset + count, cmd->count - count); - if (err < 0) - { - handleError("ReadTask read data", errno); - return; - } - else if (err == 0) - { - handleError("ReadTask EOF", errno); - return; + if (err <= 0) { + if (count > 0) + outbuf32[1] = count; + else { + outbuf.resize(16); + outbuf32[1] = 8; + outbuf32[2] = err; + outbuf32[3] = errno; + } + break; } count += err; } diff --git a/src/WriteTask.cpp b/src/WriteTask.cpp index 6d5565c18..57a9ffdea 100644 --- a/src/WriteTask.cpp +++ b/src/WriteTask.cpp @@ -39,28 +39,32 @@ void WriteTask::run() success = read(&cmdbuf[sizeof(*cmd)], min(cmd->filename_len, 1024 - sizeof(*cmd) - 1)); check_error("WriteTask read"); - size_t count = 0; + size_t readCount = 0, writeCount = 0; vector databuf; - databuf.resize(cmd->count); + uint bufsize = 1 << 20; // 1 MB + databuf.resize(bufsize); // 1 MB - // todo: do this in chunks... - while (count < cmd->count) + while (readCount < cmd->count) { - success = read(&databuf[count], cmd->count - count); + uint toRead = min(cmd->count - readCount, bufsize); // 1 MB + success = read(&databuf[0], toRead); check_error("WriteTask read data"); - count += cmd->count; - int count2 = 0; - while (count2 < count) + readCount += toRead; + while (writeCount < readCount) { - int err = ioc->write(cmd->filename, &databuf[count + count2], cmd->offset + count2, cmd->count - count2); + int err = ioc->write(cmd->filename, &databuf[writeCount], cmd->offset + writeCount, readCount - writeCount); if (err <= 0) - { - handleError("WriteTask write", errno); - return; - } - count2 += err; + break; + writeCount += err; } + if (writeCount != readCount) + break; } + uint32_t *buf32 = (uint32_t *) cmdbuf; + buf32[0] = SM_MSG_START; + buf32[1] = 4; + buf32[2] = writeCount; + write(cmdbuf, 12); } } diff --git a/src/main.cpp b/src/main.cpp index e0cdad84a..fd1fe8de6 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -10,12 +10,17 @@ using namespace std; #include "SessionManager.h" +#include "IOCoordinator.h" + using namespace storagemanager; +IOCoordinator *ioc; + int main(int argc, char** argv) { int ret = 0; SessionManager sessionManager = SessionManager(); + ioc = new IOCoordinator(); ret = sessionManager.start();