diff --git a/include/messageFormat.h b/include/messageFormat.h index d3e6928fc..ee753d7f4 100644 --- a/include/messageFormat.h +++ b/include/messageFormat.h @@ -20,14 +20,21 @@ namespace storagemanager #pragma pack(push, 1) struct sm_msg_header { - uint32_t type; // SM_MSG_{START,CONT,END} - uint32_t payloadLen; + uint32_t type; // SM_MSG_{START,CONT,END} + uint32_t payloadLen; // refers to the length of what follows the header + uint8_t flags; // see below for valid values +}; + +// current values for the flags field in sm_msg_header +static const uint8_t CONT = 0x1; // this means another message will follow as part of this request or response + +struct sm_request { + sm_msg_header header; uint8_t payload[]; }; -struct sm_msg_resp { - uint32_t type; - uint32_t payloadLen; +struct sm_response { + sm_msg_header header; int32_t returnCode; // if < 0 it indicates an error, and payload contains a 4-byte errno value uint8_t payload[]; }; diff --git a/src/AppendTask.cpp b/src/AppendTask.cpp index 0e434027a..2c4d0af9e 100644 --- a/src/AppendTask.cpp +++ b/src/AppendTask.cpp @@ -71,22 +71,18 @@ bool AppendTask::run() break; } - uint8_t respbuf[sizeof(sm_msg_resp) + 4]; - sm_msg_resp *resp = (sm_msg_resp *) respbuf; - resp->type = SM_MSG_START; + uint8_t respbuf[sizeof(sm_response) + 4]; + sm_response *resp = (sm_response *) respbuf; + uint payloadLen = 0; if (cmd->count != 0 && writeCount == 0) { - resp->payloadLen = 8; + payloadLen = 4; resp->returnCode = -1; *((int *) &resp[1]) = errno; - success = write((uint8_t *) respbuf, sizeof(sm_msg_resp) + 4); } else - { - resp->payloadLen = 4; resp->returnCode = writeCount; - success = write((uint8_t *) respbuf, sizeof(sm_msg_resp)); - } + success = write(*resp, payloadLen); return success; } diff --git a/src/CopyTask.cpp b/src/CopyTask.cpp index f9f68cc01..d350a7628 100644 --- a/src/CopyTask.cpp +++ b/src/CopyTask.cpp @@ -51,11 +51,10 @@ bool CopyTask::run() return true; } - sm_msg_resp *resp = (sm_msg_resp *) buf; - resp->type = SM_MSG_START; - resp->payloadLen = 4; + sm_response *resp = (sm_response *) buf; + uint payloadLen = 0; resp->returnCode = 0; - success = write(buf, sizeof(sm_msg_resp)); + success = write(*resp, 0); return success; } diff --git a/src/ListDirectoryTask.cpp b/src/ListDirectoryTask.cpp index c269f99b9..4547ad78c 100644 --- a/src/ListDirectoryTask.cpp +++ b/src/ListDirectoryTask.cpp @@ -87,9 +87,10 @@ bool ListDirectoryTask::run() for (uint i = 0; i < listing.size(); i++) payloadLen += listing[i].size(); - sm_msg_resp *resp = (sm_msg_resp *) buf; - resp->type = SM_MSG_START; - resp->payloadLen = payloadLen + 4; // the +4 is for the length of the return code + sm_response *resp = (sm_response *) buf; + resp->header.type = SM_MSG_START; + resp->header.payloadLen = payloadLen + sizeof(sm_response) - sizeof(sm_msg_header); // the +4 is for the length of the return code + resp->header.flags = 0; resp->returnCode = 0; listdir_resp *r = (listdir_resp *) resp->payload; r->elements = listing.size(); diff --git a/src/OpenTask.cpp b/src/OpenTask.cpp index 9af449f25..ff943db03 100644 --- a/src/OpenTask.cpp +++ b/src/OpenTask.cpp @@ -47,19 +47,16 @@ bool OpenTask::run() #ifdef SM_TRACE cout << "open filename " << cmd->filename << " mode " << oct << cmd->openmode << dec << endl; #endif - - int err = ioc->open(cmd->filename, cmd->openmode, (struct stat *) &buf[sizeof(sm_msg_resp)]); + sm_response *resp = (sm_response *) buf; + int err = ioc->open(cmd->filename, cmd->openmode, (struct stat *) &resp->payload); if (err) { handleError("OpenTask open", errno); return true; } - sm_msg_resp *resp = (sm_msg_resp *) buf; - resp->type = SM_MSG_START; - resp->payloadLen = sizeof(struct stat) + 4; resp->returnCode = 0; - success = write(buf, sizeof(struct stat) + sizeof(sm_msg_resp)); + success = write(*resp, sizeof(struct stat)); if (!success) handleError("OpenTask write", errno); return success; diff --git a/src/PingTask.cpp b/src/PingTask.cpp index d7eefc4d0..1b1d4773b 100644 --- a/src/PingTask.cpp +++ b/src/PingTask.cpp @@ -34,11 +34,9 @@ bool PingTask::run() } // send generic success response - sm_msg_resp ret; - ret.type = SM_MSG_START; - ret.payloadLen = 4; + sm_response ret; ret.returnCode = 0; - success = write((uint8_t *) &ret, sizeof(ret)); + success = write(ret, 0); return success; } diff --git a/src/PosixTask.cpp b/src/PosixTask.cpp index f7629169c..6285c8be6 100644 --- a/src/PosixTask.cpp +++ b/src/PosixTask.cpp @@ -33,15 +33,13 @@ PosixTask::~PosixTask() void PosixTask::handleError(const char *name, int errCode) { - char buf[80]; + char buf[sizeof(sm_response) + 4]; // send an error response if possible - sm_msg_resp *resp = (sm_msg_resp *) buf; - resp->type = SM_MSG_START; - resp->payloadLen = 8; + sm_response *resp = (sm_response *) buf; resp->returnCode = -1; *((int *) resp->payload) = errCode; - write((uint8_t *) buf, sizeof(*resp) + 4); + write(*resp, 4); // TODO: construct and log a message cout << name << " caught an error: " << strerror_r(errCode, buf, 80) << endl; @@ -151,6 +149,27 @@ bool PosixTask::write(const uint8_t *buf, uint len) return true; } +bool PosixTask::write(sm_response &resp, uint payloadLength) +{ + int err; + uint count = 0; + uint8_t *buf = (uint8_t *) &resp; + + resp.header.type = SM_MSG_START; + resp.header.flags = 0; + resp.header.payloadLen = payloadLength + sizeof(sm_response) - sizeof(sm_msg_header); + uint toSend = payloadLength + sizeof(sm_response); + + while (count < toSend) + { + err = ::send(sock, &buf[count], toSend - count, 0); + if (err < 0) + return false; + count += err; + } + return true; +} + bool PosixTask::write(const vector &buf) { return write(&buf[0], buf.size()); diff --git a/src/PosixTask.h b/src/PosixTask.h index 808cb95b8..107ef5208 100644 --- a/src/PosixTask.h +++ b/src/PosixTask.h @@ -8,6 +8,7 @@ #include #include #include "IOCoordinator.h" +#include "messageFormat.h" namespace storagemanager { @@ -25,6 +26,7 @@ class PosixTask protected: bool read(uint8_t *buf, uint length); bool write(const std::vector &buf); + bool write(sm_response &resp, uint payloadLength); bool write(const uint8_t *buf, uint length); void consumeMsg(); // drains the remaining portion of the message uint getLength(); // returns the total length of the msg diff --git a/src/ReadTask.cpp b/src/ReadTask.cpp index 8ef104358..9336f5447 100644 --- a/src/ReadTask.cpp +++ b/src/ReadTask.cpp @@ -46,12 +46,11 @@ bool ReadTask::run() // read from IOC, write to the socket vector outbuf; - outbuf.resize(max(cmd->count, 4) + sizeof(sm_msg_resp)); - sm_msg_resp *resp = (sm_msg_resp *) &outbuf[0]; + outbuf.resize(max(cmd->count, 4) + sizeof(sm_response)); + sm_response *resp = (sm_response *) &outbuf[0]; - resp->type = SM_MSG_START; resp->returnCode = 0; - resp->payloadLen = 4; + uint payloadLen = 0; // todo: do the reading and writing in chunks // todo: need to make this use O_DIRECT on the IOC side @@ -63,8 +62,8 @@ bool ReadTask::run() cmd->count - resp->returnCode); if (err < 0) { if (resp->returnCode == 0) { - resp->payloadLen = 8; resp->returnCode = err; + payloadLen = 4; *((int32_t *) resp->payload) = errno; } break; @@ -72,10 +71,10 @@ bool ReadTask::run() if (err == 0) break; resp->returnCode += err; - resp->payloadLen += err; } - - success = write(&outbuf[0], resp->payloadLen + SM_HEADER_LEN); + if (resp->returnCode >= 0) + payloadLen = resp->returnCode; + success = write(*resp, payloadLen); return success; } diff --git a/src/SessionManager.cpp b/src/SessionManager.cpp index 90d57dfd4..bcbe3fc21 100644 --- a/src/SessionManager.cpp +++ b/src/SessionManager.cpp @@ -280,7 +280,7 @@ int SessionManager::start() //found it set msgLength and recvMsgStart offset of SM_MSG_START recvMsgLength = *((uint *) &recv_buffer[i+4]); //cout << "got length = " << recvMsgLength << endl; - recvMsgStart = i+8; + recvMsgStart = i + SM_HEADER_LEN; //printf(" recvMsgLength %d recvMsgStart %d endofData %d\n", recvMsgLength,recvMsgStart,endOfData); // if >= endOfData then the start of the message data is the beginning of next message if (recvMsgStart >= endOfData) diff --git a/src/SessionManager.h b/src/SessionManager.h index 00f38210c..81d9a5ed6 100644 --- a/src/SessionManager.h +++ b/src/SessionManager.h @@ -8,7 +8,7 @@ #include #include -#include +#include namespace storagemanager @@ -53,7 +53,7 @@ private: char remainingData[SM_HEADER_LEN]; uint remainingBytes; }; - std::unordered_map sockState; + std::tr1::unordered_map sockState; }; diff --git a/src/StatTask.cpp b/src/StatTask.cpp index 6933b5b74..6cb818d47 100644 --- a/src/StatTask.cpp +++ b/src/StatTask.cpp @@ -40,7 +40,7 @@ bool StatTask::run() success = read(buf, getLength()); check_error("StatTask read", false); stat_cmd *cmd = (stat_cmd *) buf; - sm_msg_resp *resp = (sm_msg_resp *) buf; + sm_response *resp = (sm_response *) buf; #ifdef SM_TRACE cout << "stat " << cmd->filename << endl; @@ -48,17 +48,15 @@ bool StatTask::run() int err = ioc->stat(cmd->filename, (struct stat *) resp->payload); - resp->type = SM_MSG_START; resp->returnCode = err; - if (!err) { - resp->payloadLen = sizeof(struct stat) + 4; - success = write(buf, sizeof(*resp) + sizeof(struct stat)); - } + uint payloadLen; + if (!err) + payloadLen = sizeof(struct stat); else { - resp->payloadLen = 8; + payloadLen = 4; *((int32_t *) resp->payload) = errno; - success = write(buf, SM_HEADER_LEN + resp->payloadLen); } + success = write(*resp, payloadLen); return success; } diff --git a/src/TruncateTask.cpp b/src/TruncateTask.cpp index 1ae4e6ce4..48a5a1bb0 100644 --- a/src/TruncateTask.cpp +++ b/src/TruncateTask.cpp @@ -48,11 +48,9 @@ bool TruncateTask::run() return true; } - sm_msg_resp *resp = (sm_msg_resp *) buf; - resp->type = SM_MSG_START; - resp->payloadLen = 4; + sm_response *resp = (sm_response *) buf; resp->returnCode = 0; - success = write(buf, sizeof(sm_msg_resp)); + success = write(*resp, 0); return success; } diff --git a/src/UnlinkTask.cpp b/src/UnlinkTask.cpp index 5f670927f..259cc15e4 100644 --- a/src/UnlinkTask.cpp +++ b/src/UnlinkTask.cpp @@ -49,11 +49,9 @@ bool UnlinkTask::run() return true; } - sm_msg_resp *resp = (sm_msg_resp *) buf; - resp->type = SM_MSG_START; - resp->payloadLen = 4; + sm_response *resp = (sm_response *) buf; resp->returnCode = 0; - success = write(buf, sizeof(*resp)); + success = write(*resp, 0); return success; } diff --git a/src/WriteTask.cpp b/src/WriteTask.cpp index b738ead86..dc09d8147 100644 --- a/src/WriteTask.cpp +++ b/src/WriteTask.cpp @@ -71,22 +71,19 @@ bool WriteTask::run() break; } - uint8_t respbuf[sizeof(sm_msg_resp) + 4]; - sm_msg_resp *resp = (sm_msg_resp *) respbuf; - resp->type = SM_MSG_START; + uint8_t respbuf[sizeof(sm_response) + 4]; + sm_response *resp = (sm_response *) respbuf; + uint payloadLen = 0; if (cmd->count != 0 && writeCount == 0) { - resp->payloadLen = 8; + payloadLen = 4; resp->returnCode = -1; *((int *) &resp[1]) = errno; - success = write((uint8_t *) respbuf, sizeof(sm_msg_resp) + 4); + success = write(*resp, payloadLen); } else - { - resp->payloadLen = 4; resp->returnCode = writeCount; - success = write((uint8_t *) respbuf, sizeof(sm_msg_resp)); - } + success = write(*resp, payloadLen); return success; } diff --git a/src/unit_tests.cpp b/src/unit_tests.cpp index 09899f3eb..93cf6423f 100644 --- a/src/unit_tests.cpp +++ b/src/unit_tests.cpp @@ -96,6 +96,7 @@ bool opentask() // open/create a file named 'opentest1' const char *filename = "opentest1"; hdr->type = SM_MSG_START; + hdr->flags = 0; hdr->payloadLen = sizeof(*cmd) + 9; cmd->opcode = OPEN; cmd->openmode = O_WRONLY | O_CREAT; @@ -109,10 +110,11 @@ bool opentask() // read the response int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); - sm_msg_resp *resp = (sm_msg_resp *) buf; - assert(err == sizeof(struct stat) + sizeof(sm_msg_resp)); - assert(resp->type == SM_MSG_START); - assert(resp->payloadLen == sizeof(struct stat) + 4); + sm_response *resp = (sm_response *) buf; + assert(err == sizeof(struct stat) + sizeof(sm_response)); + assert(resp->header.type == SM_MSG_START); + assert(resp->header.payloadLen == sizeof(struct stat) + 4); + assert(resp->header.flags == 0); assert(resp->returnCode == 0); struct stat *_stat = (struct stat *) resp->payload; @@ -159,10 +161,11 @@ bool writetask() // verify response int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); - sm_msg_resp *resp = (sm_msg_resp *) buf; + sm_response *resp = (sm_response *) buf; assert(err == sizeof(*resp)); - assert(resp->type == SM_MSG_START); - assert(resp->payloadLen == 4); + assert(resp->header.type == SM_MSG_START); + assert(resp->header.payloadLen == 4); + assert(resp->header.flags == 0); assert(resp->returnCode == 9); //check file contents @@ -205,10 +208,11 @@ bool appendtask() // verify response err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); - sm_msg_resp *resp = (sm_msg_resp *) buf; + sm_response *resp = (sm_response *) buf; assert(err == sizeof(*resp)); - assert(resp->type == SM_MSG_START); - assert(resp->payloadLen == 4); + assert(resp->header.type == SM_MSG_START); + assert(resp->header.payloadLen == 4); + assert(resp->header.flags == 0); assert(resp->returnCode == 9); //check file contents @@ -245,10 +249,11 @@ bool unlinktask() // verify response int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); - sm_msg_resp *resp = (sm_msg_resp *) buf; + sm_response *resp = (sm_response *) buf; assert(err == sizeof(*resp)); - assert(resp->type == SM_MSG_START); - assert(resp->payloadLen == 4); + assert(resp->header.type == SM_MSG_START); + assert(resp->header.payloadLen == 4); + assert(resp->header.flags == 0); assert(resp->returnCode == 0); // confirm it no longer exists @@ -277,10 +282,11 @@ bool stattask() // read the response int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); - sm_msg_resp *resp = (sm_msg_resp *) buf; - assert(err == sizeof(struct stat) + sizeof(sm_msg_resp)); - assert(resp->type == SM_MSG_START); - assert(resp->payloadLen == sizeof(struct stat) + 4); + sm_response *resp = (sm_response *) buf; + assert(err == sizeof(struct stat) + sizeof(sm_response)); + assert(resp->header.type == SM_MSG_START); + assert(resp->header.flags == 0); + assert(resp->header.payloadLen == sizeof(struct stat) + 4); assert(resp->returnCode == 0); struct stat *_stat = (struct stat *) resp->payload; @@ -316,10 +322,11 @@ bool truncatetask() // read the response int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); - sm_msg_resp *resp = (sm_msg_resp *) buf; - assert(err == sizeof(sm_msg_resp)); - assert(resp->type == SM_MSG_START); - assert(resp->payloadLen == 4); + sm_response *resp = (sm_response *) buf; + assert(err == sizeof(sm_response)); + assert(resp->header.type == SM_MSG_START); + assert(resp->header.flags == 0); + assert(resp->header.payloadLen == 4); assert(resp->returnCode == 0); struct stat statbuf; @@ -353,13 +360,14 @@ bool listdirtask() /* going to keep this simple. Don't run this in a big dir. */ /* maybe later I'll make a dir, put a file in it, and etc. For now run it in a small dir. */ int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); - sm_msg_resp *resp = (sm_msg_resp *) buf; + sm_response *resp = (sm_response *) buf; assert(err > 0); - assert(resp->type == SM_MSG_START); + assert(resp->header.type == SM_MSG_START); + assert(resp->header.flags == 0); assert(resp->returnCode == 0); listdir_resp *r = (listdir_resp *) resp->payload; //cout << "resp has " << r->elements << " elements" << endl; - int off = sizeof(sm_msg_resp) + sizeof(listdir_resp); + int off = sizeof(sm_response) + sizeof(listdir_resp); while (off < err) { listdir_resp_entry *e = (listdir_resp_entry *) &buf[off]; @@ -391,10 +399,11 @@ bool pingtask() // read the response int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); - sm_msg_resp *resp = (sm_msg_resp *) buf; - assert(err == sizeof(sm_msg_resp)); - assert(resp->type == SM_MSG_START); - assert(resp->payloadLen == 4); + sm_response *resp = (sm_response *) buf; + assert(err == sizeof(sm_response)); + assert(resp->header.type == SM_MSG_START); + assert(resp->header.payloadLen == 4); + assert(resp->header.flags == 0); assert(resp->returnCode == 0); cout << "pingtask OK" << endl; } @@ -431,10 +440,11 @@ bool copytask() // read the response err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); - sm_msg_resp *resp = (sm_msg_resp *) buf; - assert(err == sizeof(sm_msg_resp)); - assert(resp->type == SM_MSG_START); - assert(resp->payloadLen == 4); + sm_response *resp = (sm_response *) buf; + assert(err == sizeof(sm_response)); + assert(resp->header.type == SM_MSG_START); + assert(resp->header.payloadLen == 4); + assert(resp->header.flags == 0); assert(resp->returnCode == 0); // verify copytest2 is there