diff --git a/src/ReadTask.cpp b/src/ReadTask.cpp index aa62b4b87..3abac9aae 100644 --- a/src/ReadTask.cpp +++ b/src/ReadTask.cpp @@ -23,6 +23,8 @@ ReadTask::~ReadTask() return ret; \ } +#define min(x, y) (x < y ? x : y) + bool ReadTask::run() { uint8_t buf[1024] = {0}; @@ -36,38 +38,38 @@ bool ReadTask::run() bool success; success = read(buf, getLength()); check_error("ReadTask read cmd", false); - cmd_overlay *cmd = (cmd_overlay *) buf; + read_cmd *cmd = (read_cmd *) buf; // read from IOC, write to the socket vector outbuf; - outbuf.resize(cmd->count + SM_HEADER_LEN); - uint32_t *outbuf32 = (uint32_t *) &outbuf[0]; - outbuf32[0] = SM_MSG_START; - outbuf32[1] = cmd->count; + outbuf.resize(min(cmd->count, 4) + sizeof(sm_msg_resp)); + sm_msg_resp *resp = (sm_msg_resp *) &outbuf[0]; + + resp->type = SM_MSG_START; + resp->returnCode = 0; + resp->payloadLen = 4; // todo: do the reading and writing in chunks - // todo: need to make this use O_DIRECT + // todo: need to make this use O_DIRECT on the IOC side ioc->willRead(cmd->filename, cmd->offset, cmd->count); - int count = 0, err; - while (count < cmd->count) + int err; + while (resp->returnCode < cmd->count) { - err = ioc->read(cmd->filename, &outbuf[SM_HEADER_LEN + count], cmd->offset + count, cmd->count - count); - if (err <= 0) { - if (count > 0) - outbuf32[1] = count; - else { - int l_errno = errno; - outbuf.resize(16); - outbuf32[1] = 8; - outbuf32[2] = err; - outbuf32[3] = l_errno; + err = ioc->read(cmd->filename, &resp->payload[resp->returnCode], cmd->offset + resp->returnCode, + cmd->count - resp->returnCode); + if (err < 0) { + if (resp->returnCode == 0) { + resp->payloadLen = 8; + resp->returnCode = err; + *((int32_t *) resp->payload) = errno; } break; } - count += err; + resp->returnCode += err; + resp->payloadLen += err; } - success = write(outbuf); + success = write(&outbuf[0], resp->payloadLen + SM_HEADER_LEN); return success; } diff --git a/src/ReadTask.h b/src/ReadTask.h index 1f659f12c..490b2911b 100644 --- a/src/ReadTask.h +++ b/src/ReadTask.h @@ -18,13 +18,6 @@ class ReadTask : public PosixTask private: ReadTask(); - - struct cmd_overlay { - size_t count; - off_t offset; - uint flen; - char filename[]; - }; }; } diff --git a/src/SessionManager.cpp b/src/SessionManager.cpp index a0b1e9f79..d4427f1e5 100644 --- a/src/SessionManager.cpp +++ b/src/SessionManager.cpp @@ -11,6 +11,7 @@ #include #include #include +#include using namespace std; #include @@ -223,13 +224,13 @@ int SessionManager::start() } break; } - + cout << "recv got " << peakLength << " bytes" << endl; endOfData = remainingBytes + peakLength; if (endOfData < 8) { //read this snippet and keep going - len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], peakLength); remainingBytes = endOfData; + len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], peakLength); continue; } @@ -238,10 +239,12 @@ int SessionManager::start() { if (*((uint *) &recv_buffer[i]) == storagemanager::SM_MSG_START) { - //printf("Received SM_MSG_START\n"); + printf("Received SM_MSG_START\n"); //found it set msgLength and recvMsgStart offset of SM_MSG_START recvMsgLength = *((uint *) &recv_buffer[i+4]); + cout << "got length = " << recvMsgLength << endl; recvMsgStart = i+8; + remainingBytes = 0; //printf(" recvMsgLength %d recvMsgStart %d endofData %d\n", recvMsgLength,recvMsgStart,endOfData); // if >= endOfData then the start of the message data is the beginning of next message if (recvMsgStart >= endOfData) @@ -253,9 +256,13 @@ int SessionManager::start() // didn't find SM_MSG_START in this message consume the data and loop back through on next message if (recvMsgLength == 0) { - //printf("No SM_MSG_START\n"); + printf("No SM_MSG_START\n"); len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], peakLength); - remainingBytes = endOfData; + // we know the msg header isn't in position [0, endOfData - 7], so throw that out + // and copy [endOfData - 7, endOfData) to the front of the buffer to be + // checked by the next iteration. + memmove(recv_buffer, &recv_buffer[endOfData - 7], 7); + remainingBytes = 7; } else { @@ -264,12 +271,14 @@ int SessionManager::start() if (recvMsgStart > 0) { //printf("SM_MSG_START data is here\n"); - len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], peakLength - recvMsgStart); + len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], recvMsgStart); + remainingBytes = 0; } else { //printf("SM_MSG_START data is next message\n"); len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], peakLength); + remainingBytes = 0; } //Disable polling on this socket fds[socketIncr].events = 0;