diff --git a/include/messageFormat.h b/include/messageFormat.h index 637fabf1d..3c1fe4d09 100644 --- a/include/messageFormat.h +++ b/include/messageFormat.h @@ -7,30 +7,48 @@ #ifndef MESSAGEFORMAT_H_ #define MESSAGEFORMAT_H_ +#include +#include +#include + + namespace storagemanager { #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-variable" +#pragma pack(push, 1) +struct sm_msg_header { + uint32_t type; // SM_MSG_{START,CONT,END} + uint32_t payloadLen; + uint8_t payload[]; +}; + +struct sm_msg_resp { + uint32_t type; + uint32_t payloadLen; + int32_t returnCode; // if < 0 it indicates an error, and payload contains a 4-byte errno value + uint8_t payload[]; +}; + // all msgs to and from StorageManager begin with this magic and a payload length -static const uint SM_MSG_START=0xbf65a7e1; +static const uint32_t SM_MSG_START=0xbf65a7e1; // for read/write/append, which may break a message into chunks, messages not the // beginning or end of the larger message will preface a chunk with this magic -static const uint SM_MSG_CONT=0xfa371bd2; +static const uint32_t SM_MSG_CONT=0xfa371bd2; // for read/write/append, the last chunk of a message should begin with this magic -static const uint SM_MSG_END=0x9d5bc31b; +static const uint32_t SM_MSG_END=0x9d5bc31b; -static const uint SM_HEADER_LEN = 8; +static const uint32_t SM_HEADER_LEN = sizeof(sm_msg_header); // the unix socket StorageManager is listening on static const char *socket_name = "\0storagemanager"; #pragma GCC diagnostic pop - // opcodes understood by StorageManager. Cast these to // a uint8_t to use them. enum Opcodes { @@ -57,7 +75,9 @@ enum Opcodes { On success, what follows is any output parameters from the call. TBD: Require filenames to be NULL-terminated. Currently they are not. +*/ +/* OPEN ---- command format: @@ -65,7 +85,19 @@ enum Opcodes { response format: struct stat +*/ +struct open_cmd { + uint8_t opcode; // == OPEN + int32_t openmode; + uint32_t flen; + char filename[]; +}; +struct open_resp { + struct stat statbuf; +}; + +/* READ ---- command format: @@ -73,28 +105,66 @@ enum Opcodes { response format: data (size is stored in the return code) +*/ +struct read_cmd { + uint8_t opcode; // == READ + size_t count; + off_t offset; + uint32_t flen; + char filename[]; +}; +typedef uint8_t read_resp; + +/* WRITE ----- command format: 1-byte opcode|size_t count|off_t offset|4-byte filename length|filename|data response format: +*/ +struct write_cmd { + uint8_t opcode; // == WRITE + size_t count; + off_t offset; + uint32_t flen; + char filename[]; + // after this is the data to be written, ie at &filename[flen] +}; +/* APPEND ------ command format: - 1-byte opcode|4-byte filename length|filename|size_t count|data + 1-byte opcode|size_t count|4-byte filename length|filename|data response format: +*/ +struct append_cmd { + uint8_t opcode; // == APPEND + size_t count; + uint32_t flen; + char filename[]; + // after this is the data to be written, ie at &filename[flen] +}; +/* UNLINK ------ command format: 1-byte opcode|4-byte filename length|filename response format: - +*/ +struct unlink_cmd { + uint8_t opcode; // == UNLINK + uint32_t flen; + char filename[]; +}; + +/* + STAT ---- command format: @@ -102,7 +172,19 @@ enum Opcodes { response format: struct stat +*/ +struct stat_cmd { + uint8_t opcode; // == STAT + uint32_t flen; + char filename[]; +}; +struct stat_resp { + struct stat statbuf; +} + +/* + TRUNCATE -------- command format: @@ -128,6 +210,8 @@ enum Opcodes { */ +#pragma pack(pop) + } #endif diff --git a/src/AppendTask.cpp b/src/AppendTask.cpp index 8561bb384..947c70524 100644 --- a/src/AppendTask.cpp +++ b/src/AppendTask.cpp @@ -34,16 +34,21 @@ void AppendTask::run() uint8_t cmdbuf[1024] = {0}; int err; - success = read(cmdbuf, sizeof(struct cmd_overlay)); + success = read(cmdbuf, sizeof(append_cmd)); check_error("AppendTask read"); - cmd_overlay *cmd = (cmd_overlay *) cmdbuf; + append_cmd *cmd = (append_cmd *) cmdbuf; - success = read(&cmdbuf[sizeof(*cmd)], min(cmd->filename_len, 1024 - sizeof(*cmd) - 1)); + if (cmd->flen > 1023 - sizeof(*cmd)) + { + handleError("AppendTask", ENAMETOOLONG); + return; + } + success = read(&cmdbuf[sizeof(*cmd)], cmd->flen); check_error("AppendTask read"); size_t readCount = 0, writeCount = 0; vector databuf; - uint bufsize = 1 << 20; // 1 MB + uint bufsize = min(1 << 20, cmd->count); // 1 MB databuf.resize(bufsize); while (readCount < cmd->count) @@ -52,31 +57,34 @@ void AppendTask::run() success = read(&databuf[0], toRead); check_error("AppendTask read data"); readCount += toRead; + uint writePos = 0; while (writeCount < readCount) { - int err = ioc->append(cmd->filename, &databuf[writeCount], readCount - writeCount); + int err = ioc->append(cmd->filename, &databuf[writePos], toRead - writePos); if (err <= 0) break; writeCount += err; + writePos += err; } if (readCount != writeCount) break; } - uint32_t response[4]; - response[0] = SM_MSG_START; + uint8_t respbuf[sizeof(sm_msg_resp) + 4]; + sm_msg_resp *resp = (sm_msg_resp *) respbuf; + resp->type = SM_MSG_START; if (cmd->count != 0 && writeCount == 0) { - response[1] = 8; - response[2] = -1; - response[3] = errno; - write((uint8_t *) response, 16); + resp->payloadLen = 8; + resp->returnCode = -1; + *((int *) &resp[1]) = errno; + write((uint8_t *) respbuf, sizeof(sm_msg_resp) + 4); } else { - response[1] = 4; - response[2] = writeCount; - write((uint8_t *) response, 12); + resp->payloadLen = 4; + resp->returnCode = writeCount; + write((uint8_t *) respbuf, sizeof(sm_msg_resp)); } } diff --git a/src/AppendTask.h b/src/AppendTask.h index cfcdae25f..6183801b0 100644 --- a/src/AppendTask.h +++ b/src/AppendTask.h @@ -18,12 +18,6 @@ class AppendTask : public PosixTask private: AppendTask(); - - struct cmd_overlay { - size_t count; - uint filename_len; - char filename[]; - }; }; } diff --git a/src/IOCoordinator.cpp b/src/IOCoordinator.cpp index 95eead2f0..5982010e7 100644 --- a/src/IOCoordinator.cpp +++ b/src/IOCoordinator.cpp @@ -37,7 +37,7 @@ struct scoped_closer { }; #define OPEN(name, mode) \ - fd = ::open(filename, mode); \ + fd = ::open(filename, mode, 0666); \ if (fd < 0) \ return fd; \ scoped_closer sc(fd); diff --git a/src/OpenTask.cpp b/src/OpenTask.cpp index 1aef69573..cf2f9e773 100644 --- a/src/OpenTask.cpp +++ b/src/OpenTask.cpp @@ -45,19 +45,20 @@ void OpenTask::run() return; } - cmd_overlay *cmd = (cmd_overlay *) buf; - - int err = ioc->open(cmd->filename, cmd->openmode, (struct stat *) &buf[SM_HEADER_LEN]); + open_cmd *cmd = (open_cmd *) buf; + + int err = ioc->open(cmd->filename, cmd->openmode, (struct stat *) &buf[sizeof(sm_msg_resp)]); if (err) { handleError("OpenTask open", errno); return; } - uint32_t *buf32 = (uint32_t *) buf; - buf32[0] = SM_MSG_START; - buf32[1] = sizeof(struct stat); - success = write(buf, sizeof(struct stat) + SM_HEADER_LEN); + sm_msg_resp *resp = (sm_msg_resp *) buf; + resp->type = SM_MSG_START; + resp->payloadLen = sizeof(struct stat) + 4; + resp->returnCode = 0; + success = write(buf, sizeof(struct stat) + sizeof(sm_msg_resp)); if (!success) handleError("OpenTask write", errno); } diff --git a/src/OpenTask.h b/src/OpenTask.h index f29b84052..0bba356a6 100644 --- a/src/OpenTask.h +++ b/src/OpenTask.h @@ -17,12 +17,6 @@ class OpenTask : public PosixTask private: OpenTask(); - - struct cmd_overlay { - int openmode; - uint flen; - char filename[]; - }; }; diff --git a/src/PosixTask.cpp b/src/PosixTask.cpp index 813acb289..6a95d6000 100644 --- a/src/PosixTask.cpp +++ b/src/PosixTask.cpp @@ -36,15 +36,15 @@ void PosixTask::handleError(const char *name, int errCode) char buf[80]; // send an error response if possible - int32_t *buf32 = (int32_t *) buf; - buf32[0] = SM_MSG_START; - buf32[1] = 8; - buf32[2] = -1; - buf32[3] = errCode; - write((uint8_t *) buf, 16); + sm_msg_resp *resp = (sm_msg_resp *) buf; + resp->type = SM_MSG_START; + resp->payloadLen = 8; + resp->returnCode = -1; + *((int *) resp->payload) = errCode; + write((uint8_t *) buf, sizeof(*resp) + 4); // TODO: construct and log a message - cout << name << " caught an error reading from a socket: " << strerror_r(errCode, buf, 80) << endl; + cout << name << " caught an error: " << strerror_r(errCode, buf, 80) << endl; socketError(); } diff --git a/src/ProcessTask.cpp b/src/ProcessTask.cpp index 94239e373..bf47e2576 100644 --- a/src/ProcessTask.cpp +++ b/src/ProcessTask.cpp @@ -14,6 +14,8 @@ #include "UnlinkTask.h" #include "WriteTask.h" +#include + using namespace std; namespace storagemanager @@ -50,8 +52,7 @@ void ProcessTask::operator()() int err; uint8_t opcode; - /* D'oh, a 1-byte read.... TODO! */ - err = ::read(sock, &opcode, 1); + err = ::recv(sock, &opcode, 1, MSG_PEEK); if (err <= 0) { handleError(errno); diff --git a/src/ReadTask.cpp b/src/ReadTask.cpp index 7d9fe044c..cb364ccd7 100644 --- a/src/ReadTask.cpp +++ b/src/ReadTask.cpp @@ -49,6 +49,7 @@ void ReadTask::run() outbuf32[1] = cmd->count; // todo: do the reading and writing in chunks + // todo: need to make this use O_DIRECT ioc->willRead(cmd->filename, cmd->offset, cmd->count); int count = 0, err; while (count < cmd->count) diff --git a/src/UnlinkTask.cpp b/src/UnlinkTask.cpp index aa28dccd1..b21712bff 100644 --- a/src/UnlinkTask.cpp +++ b/src/UnlinkTask.cpp @@ -39,7 +39,7 @@ void UnlinkTask::run() success = read(buf, getLength()); check_error("UnlinkTask read"); - cmd_overlay *cmd = (cmd_overlay *) buf; + unlink_cmd *cmd = (unlink_cmd *) buf; int err = ioc->unlink(cmd->filename); if (err) @@ -48,11 +48,11 @@ void UnlinkTask::run() return; } - uint32_t *buf32 = (uint32_t *) buf; - buf32[0] = SM_MSG_START; - buf32[1] = 4; - buf32[2] = 0; - write(buf, 12); + sm_msg_resp *resp = (sm_msg_resp *) buf; + resp->type = SM_MSG_START; + resp->payloadLen = 4; + resp->returnCode = 0; + write(buf, sizeof(*resp)); } } diff --git a/src/UnlinkTask.h b/src/UnlinkTask.h index 61e515a9a..a92521763 100644 --- a/src/UnlinkTask.h +++ b/src/UnlinkTask.h @@ -17,11 +17,6 @@ class UnlinkTask : public PosixTask private: UnlinkTask(); - - struct cmd_overlay { - uint flen; - char filename[]; - }; }; diff --git a/src/WriteTask.cpp b/src/WriteTask.cpp index f850582f2..d0ba240a3 100644 --- a/src/WriteTask.cpp +++ b/src/WriteTask.cpp @@ -32,17 +32,22 @@ void WriteTask::run() { bool success; uint8_t cmdbuf[1024] = {0}; - - success = read(cmdbuf, sizeof(struct cmd_overlay)); - check_error("WriteTask read"); - cmd_overlay *cmd = (cmd_overlay *) cmdbuf; - success = read(&cmdbuf[sizeof(*cmd)], min(cmd->filename_len, 1024 - sizeof(*cmd) - 1)); + success = read(cmdbuf, sizeof(write_cmd)); + check_error("WriteTask read"); + write_cmd *cmd = (write_cmd *) cmdbuf; + + if (cmd->flen > 1023 - sizeof(*cmd)) + { + handleError("WriteTask", ENAMETOOLONG); + return; + } + success = read(&cmdbuf[sizeof(*cmd)], cmd->flen); check_error("WriteTask read"); size_t readCount = 0, writeCount = 0; vector databuf; - uint bufsize = 1 << 20; // 1 MB + uint bufsize = min(1 << 20, cmd->count); // 1 MB databuf.resize(bufsize); while (readCount < cmd->count) @@ -51,31 +56,34 @@ void WriteTask::run() success = read(&databuf[0], toRead); check_error("WriteTask read data"); readCount += toRead; + uint writePos = 0; while (writeCount < readCount) { - int err = ioc->write(cmd->filename, &databuf[writeCount], cmd->offset + writeCount, readCount - writeCount); + int err = ioc->append(cmd->filename, &databuf[writePos], toRead - writePos); if (err <= 0) break; writeCount += err; + writePos += err; } if (writeCount != readCount) break; } - uint32_t response[4]; - response[0] = SM_MSG_START; + uint8_t respbuf[sizeof(sm_msg_resp) + 4]; + sm_msg_resp *resp = (sm_msg_resp *) respbuf; + resp->type = SM_MSG_START; if (cmd->count != 0 && writeCount == 0) { - response[1] = 8; - response[2] = -1; - response[3] = errno; - write((uint8_t *) response, 16); + resp->payloadLen = 8; + resp->returnCode = -1; + *((int *) &resp[1]) = errno; + write((uint8_t *) respbuf, sizeof(sm_msg_resp) + 4); } else { - response[1] = 4; - response[2] = writeCount; - write((uint8_t *) response, 12); + resp->payloadLen = 4; + resp->returnCode = writeCount; + write((uint8_t *) respbuf, sizeof(sm_msg_resp)); } } diff --git a/src/WriteTask.h b/src/WriteTask.h index e00243b5b..a0194f451 100644 --- a/src/WriteTask.h +++ b/src/WriteTask.h @@ -18,13 +18,6 @@ class WriteTask : public PosixTask private: WriteTask(); - - struct cmd_overlay { - size_t count; - off_t offset; - uint filename_len; - char filename[]; - }; }; } diff --git a/src/unit_tests.cpp b/src/unit_tests.cpp index a9a4a7519..737dfe5b3 100644 --- a/src/unit_tests.cpp +++ b/src/unit_tests.cpp @@ -1,5 +1,9 @@ #include "OpenTask.h" +#include "WriteTask.h" +#include "AppendTask.h" +#include "UnlinkTask.h" #include "IOCoordinator.h" +#include "messageFormat.h" #include #include #include @@ -24,7 +28,6 @@ struct scoped_closer { scoped_closer(int f) : fd(f) { } ~scoped_closer() { int s_errno = errno; - cout << "closing " << fd << endl; ::close(fd); errno = s_errno; } @@ -38,8 +41,9 @@ int getSocket() return sock; } -int sessionSock = -1; -int serverSock = -1; +int sessionSock = -1; // tester uses this end of the connection +int serverSock = -1; +int clientSock = -1; // have the Tasks use this end of the connection void acceptConnection() { @@ -61,8 +65,9 @@ void acceptConnection() sessionSock = ::accept(serverSock, NULL, NULL); assert(sessionSock > 0); } - -bool opentask() + +// connects sessionSock to clientSock +void makeConnection() { boost::thread t(acceptConnection); struct sockaddr_un sa; @@ -70,43 +75,195 @@ bool opentask() sa.sun_family = AF_UNIX; memcpy(&sa.sun_path[1], "testing", 7); - int clientSock = ::socket(AF_UNIX, SOCK_STREAM, 0); + clientSock = ::socket(AF_UNIX, SOCK_STREAM, 0); assert(clientSock > 0); - scoped_closer s2(clientSock); sleep(1); // let server thread get to accept() int err = ::connect(clientSock, (struct sockaddr *) &sa, sizeof(sa)); assert(err == 0); t.join(); - scoped_closer s3(sessionSock); +} + +bool opentask() +{ // going to rely on msgs being smaller than the buffer here uint8_t buf[1024]; - uint32_t *buf32 = (uint32_t *) buf; + sm_msg_header *hdr = (sm_msg_header *) buf; + open_cmd *cmd = (open_cmd *) &hdr[1]; // open/create a file named 'opentest1' const char *filename = "opentest1"; - buf32[0] = O_WRONLY | O_CREAT; - buf32[1] = 9; - strcpy((char *) &buf32[2], "opentest1"); - uint msg_len = 17; + hdr->type = SM_MSG_START; + hdr->payloadLen = sizeof(*cmd) + 9; + cmd->opcode = OPEN; + cmd->openmode = O_WRONLY | O_CREAT; + cmd->flen = 9; + strcpy((char *) cmd->filename, filename); ::unlink(filename); - ::write(sessionSock, buf, msg_len); - OpenTask o(clientSock, msg_len); + ::write(sessionSock, cmd, hdr->payloadLen); + OpenTask o(clientSock, hdr->payloadLen); o.run(); // read the response + int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); + sm_msg_resp *resp = (sm_msg_resp *) buf; + assert(err == sizeof(struct stat) + sizeof(sm_msg_resp)); + assert(resp->type == SM_MSG_START); + assert(resp->payloadLen == sizeof(struct stat) + 4); + assert(resp->returnCode == 0); + struct stat *_stat = (struct stat *) resp->payload; + // what can we verify about the stat... + assert(_stat->st_uid == getuid()); + assert(_stat->st_gid == getgid()); + assert(_stat->st_size == 0); /* verify the file is there */ assert(boost::filesystem::exists(filename)); ::unlink(filename); + cout << "opentask OK" << endl; + return true; } +bool writetask() +{ + // make an empty file to write to + const char *filename = "writetest1"; + ::unlink(filename); + int fd = ::open(filename, O_CREAT | O_RDWR, 0666); + assert(fd > 0); + scoped_closer f(fd); + + uint8_t buf[1024]; + sm_msg_header *hdr = (sm_msg_header *) buf; + write_cmd *cmd = (write_cmd *) &hdr[1]; + uint8_t *data; + + cmd->opcode = WRITE; + cmd->offset = 0; + cmd->count = 9; + cmd->flen = 10; + memcpy(&cmd->filename, filename, cmd->flen); + data = (uint8_t *) &cmd->filename[cmd->flen]; + memcpy(data, "123456789", cmd->count); + + hdr->type = SM_MSG_START; + hdr->payloadLen = sizeof(*cmd) + cmd->flen + cmd->count; + + WriteTask w(clientSock, hdr->payloadLen); + ::write(sessionSock, cmd, hdr->payloadLen); + w.run(); + + // verify response + int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); + sm_msg_resp *resp = (sm_msg_resp *) buf; + assert(err == sizeof(*resp)); + assert(resp->type == SM_MSG_START); + assert(resp->payloadLen == 4); + assert(resp->returnCode == 9); + + //check file contents + err = ::read(fd, buf, 1024); + assert(err == 9); + buf[9] = 0; + assert(!strcmp("123456789", (const char *) buf)); + ::unlink(filename); + cout << "write task OK" << endl; + return true; +} + +bool appendtask() +{ + // make a file and put some stuff in it + const char *filename = "appendtest1"; + ::unlink(filename); + int fd = ::open(filename, O_CREAT | O_RDWR, 0666); + assert(fd > 0); + scoped_closer f(fd); + int err = ::write(fd, "testjunk", 8); + assert(err == 8); + + uint8_t buf[1024]; + append_cmd *cmd = (append_cmd *) buf; + uint8_t *data; + + cmd->opcode = APPEND; + cmd->count = 9; + cmd->flen = 11; + memcpy(&cmd->filename, filename, cmd->flen); + data = (uint8_t *) &cmd->filename[cmd->flen]; + memcpy(data, "123456789", cmd->count); + + int payloadLen = sizeof(*cmd) + cmd->flen + cmd->count; + + AppendTask a(clientSock, payloadLen); + ::write(sessionSock, cmd, payloadLen); + a.run(); + + // verify response + err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); + sm_msg_resp *resp = (sm_msg_resp *) buf; + assert(err == sizeof(*resp)); + assert(resp->type == SM_MSG_START); + assert(resp->payloadLen == 4); + assert(resp->returnCode == 9); + + //check file contents + ::lseek(fd, 0, SEEK_SET); + err = ::read(fd, buf, 1024); + assert(err == 17); + buf[17] = 0; + assert(!strcmp("testjunk123456789", (const char *) buf)); + ::unlink(filename); + cout << "append task OK" << endl; + return true; +} + +bool unlinktask() +{ + // make a file and delete it + const char *filename = "unlinktest1"; + ::unlink(filename); + int fd = ::open(filename, O_CREAT | O_RDWR, 0666); + assert(fd > 0); + scoped_closer f(fd); + + uint8_t buf[1024]; + unlink_cmd *cmd = (unlink_cmd *) buf; + uint8_t *data; + + cmd->opcode = UNLINK; + cmd->flen = strlen(filename); + memcpy(&cmd->filename, filename, cmd->flen); + + UnlinkTask u(clientSock, sizeof(unlink_cmd) + cmd->flen); + ::write(sessionSock, cmd, sizeof(unlink_cmd) + cmd->flen); + u.run(); + + // verify response + int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); + sm_msg_resp *resp = (sm_msg_resp *) buf; + assert(err == sizeof(*resp)); + assert(resp->type == SM_MSG_START); + assert(resp->payloadLen == 4); + assert(resp->returnCode == 0); + + // confirm it no longer exists + assert(!boost::filesystem::exists(filename)); + cout << "unlink task OK" << endl; +} int main() { ioc = new IOCoordinator(); + cout << "connecting" << endl; + makeConnection(); + cout << "connected" << endl; + scoped_closer sc1(serverSock), sc2(sessionSock), sc3(clientSock); opentask(); + writetask(); + appendtask(); + unlinktask(); return 0; }