diff --git a/include/messageFormat.h b/include/messageFormat.h index 8a96bea66..89529608e 100644 --- a/include/messageFormat.h +++ b/include/messageFormat.h @@ -13,9 +13,18 @@ namespace storagemanager #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-variable" -// all msgs to and from StorageManager begin with this magic +// all msgs to and from StorageManager begin with this magic and a payload length static const uint SM_MSG_START=0xbf65a7e1; +// for read/write/append, which may break a message into chunks, messages not the +// beginning or end of the larger message will preface a chunk with this magic +static const uint SM_MSG_CONT=0xfa371bd2; + +// for read/write/append, the last chunk of a message should begin with this magic +static const uint SM_MSG_END=0x9d5bc31b; + +static const uint SM_HEADER_LEN = 8; + // the unix socket StorageManager is listening on static const char *socket_name = "\0storagemanager"; @@ -46,6 +55,8 @@ enum Opcodes { value for errno follows. On success, no errno is sent. On success, what follows is any output parameters from the call. + + TBD: Require filenames to be NULL-terminated. Currently they are not. OPEN ---- @@ -66,7 +77,7 @@ enum Opcodes { WRITE ----- command format: - 1-byte opcode|4-byte filename length|filename|size_t count|off_t offset|data + 1-byte opcode|size_t count|off_t offset|4-byte filename length|filename|data response format: diff --git a/src/OpenTask.cpp b/src/OpenTask.cpp index e69de29bb..dfef90eaa 100644 --- a/src/OpenTask.cpp +++ b/src/OpenTask.cpp @@ -0,0 +1,47 @@ + + +#include "OpenTask.h" +#include + +using namespace std; + +namespace storagemanager +{ + +OpenTask::OpenTask(int sock, uint len) : PosixTask(sock, len) +{ +} + +void OpenTask::run() +{ + /* + get the parameters + call IOManager to do the work + return the result + */ + bool success; + uint8_t *buf = alloca(max(getLength(), sizeof(struct stat) + SM_HEADER_LEN)); + + success = read(&buf, getLength()); + if (!success) + { + handleError("OpenTask read", errno); + return; + } + + uint32_t flen = *((uint32_t *) &buf[0]); + string filename((char *) &buf[4], flen); // might want to require filenames to be NULL-terminated for efficiency + int openmode = *((int *) &buf[4+flen]); + + // IOC->open(filename, openmode, &buf[SM_HEADER_LEN]) + + // stand-in dummy response + uint32_t *buf32 = (uint32_t *) &buf[0]; + buf32[0] = SM_MSG_START; + buf32[1] = sizeof(struct stat); + success = write(buf, sizeof(struct stat) + SM_HEADER_LEN); + if (!success) + handleError("OpenTask write", errno); +} + +} diff --git a/src/OpenTask.h b/src/OpenTask.h index b06c6caea..3a5dabbbb 100644 --- a/src/OpenTask.h +++ b/src/OpenTask.h @@ -10,10 +10,14 @@ namespace storagemanager class OpenTask : public PosixTask { public: - OpenTask(int _sock, uint length) : PosixTask(_sock) - WORKING HERE - -} + OpenTask(int sock, uint length); + virtual ~OpenTask(); + + void run(); + + private: + OpenTask(); +}; } diff --git a/src/PosixTask.cpp b/src/PosixTask.cpp index c642e4f82..7b7441552 100644 --- a/src/PosixTask.cpp +++ b/src/PosixTask.cpp @@ -1,56 +1,152 @@ #include "PosixTask.h" - +#include +#include namespace storagemanager { -PosixTask::PosixTask(int _sock, uint _length) : sock(_sock), remainingLength(_length) +PosixTask::PosixTask(int _sock, uint _length) : + sock(_sock), + totalLength(_length), + remainingLengthInStream(_length), + remainingLengthForCaller(_length), + bufferPos(0), + bufferLen(0), + socketReturned(false) { } PosixTask::~PosixTask() { - // return the socket + if (!socketReturned) + returnSocket(); } -void PosixTask::handleError(int errCode) +void PosixTask::handleError(const char *name, int errCode) { char buf[80]; + + // send an error response if possible + int32_t *buf32; + buf32[0] = SM_MSG_START; + buf32[1] = 8; + buf32[2] = -1; + buf32[3] = errCode; + write(buf32, 16); + // TODO: construct and log a message - cout << "PosixTask caught an error reading from a socket: " << strerror_r(errCode, buf, 80) << endl; + cout << name << " caught an error reading from a socket: " << strerror_r(errCode, buf, 80) << endl; + socketError(); } -/* Optimization. Make this read larger chunks into a buffer & supply data from that when possible. */ -int PosixTask::read(vector *buf, uint offset, uint length) +void PosixTask::returnSocket() { - if (length > remainingLength) - length = remainingLength; - - uint originalSize = buf->size(); - buf->resize(originalSize + length); + socketReturned = true; +} + +void PosixTask::socketError() +{ + socketReturned = true; +} + +uint PosixTask::getRemainingLength() +{ + return remainingLengthForCaller; +} + +uint PosixTask::getLength() +{ + return totalLength; +} + +bool PosixTask::read(uint8_t *buf, uint length) +{ + if (length > remainingLengthForCaller) + length = remainingLengthForCaller; uint count = 0; int err; + + // copy data from the local buffer first. + uint dataInBuffer = bufferLen - bufferPos; + if (length <= dataInBuffer) + { + memcpy(buf, &localBuffer[bufferPos], length); + count = length; + bufferPos += length; + remainingLengthForCaller -= length; + } + else if (dataInBuffer > 0) + { + memcpy(buf, &localBuffer[bufferPos], dataInBuffer); + count = dataInBuffer; + bufferPos += dataInBuffer; + remainingLengthForCaller -= dataInBuffer; + } + + // read the remaining requested amount from the stream. + // ideally, combine the recv here with the recv below that refills the local + // buffer. while (count < length) { - /* TODO: need a timeout here? */ - err = ::read(sock, &(*buf)[count + offset], length - count); - if (err < 0) { - buf->resize(originalSize); - handleError(errno); - return -1; - } - else if (err == 0) { - buf->resize(originalSize); - handleError(0); - return -1; - } + err = ::recv(sock, &buf[count], length - count, 0); + if (err <= 0) + return false; + count += err; - remainingLength -= err; + remainingLengthInStream -= err; + remainingLengthForCaller -= err; } - return count; + + /* The caller's request has been satisfied here. If there is remaining data in the stream + get what's available. */ + if (remainingLengthInStream > 0) + { + // Reset the buffer to allow a larger read. + if (bufferLen == bufferPos) + { + bufferLen = 0; + bufferPos = 0; + } + else if (bufferLen - bufferPos < 1024) // if < 1024 in the buffer, move data to the front + { + memmove(buffer, &buffer[bufferPos], bufferLen - bufferPos); + bufferLen -= bufferPos; + bufferPos = 0; + } + + uint toRead = min(remainingLengthInStream, bufferSize - bufferLen); + err = ::recv(sock, &localBuffer[bufferLen], toRead, MSG_NOBLOCK); + // ignoring errors here since the request has been satisfied successfully. + // errors will be caught by the next read + if (err > 0) + { + bufferLen += err; + remainingLengthInStream -= err; + } + } + return true; } +bool PosixTask::write(uint8_t *buf, uint len) +{ + int err; + uint count = 0; + + while (count < len) + { + err = ::write(sock, &buf[count], len - count); + if (err < 0) + return false; + count += err; + } + return true; +} + +bool PosixTask::write(const vector &buf) +{ + return write(&buf[0], buf.size()); +} } diff --git a/src/PosixTask.h b/src/PosixTask.h index 16ee6ab75..aa3d0459d 100644 --- a/src/PosixTask.h +++ b/src/PosixTask.h @@ -14,19 +14,27 @@ class PosixTask virtual void run() = 0; protected: - int read(std::vector *buf, uint offset, uint length); + int read(uint8_t *buf, uint length); bool write(const std::vector &buf); + bool write(void *buf, uint length); + uint getLength(); // returns the total length of the msg + uint getRemainingLength(); // returns the remaining length from the caller's perspective + void handleError(const char *name, int errCode); + void returnSocket(); + void socketError(); private: PosixTask(); - void handleError(); - int sock; - uint remainingLength; - uint8_t buffer[4096]; + int totalLength; + uint remainingLengthInStream; + uint remainingLengthForCaller; + static const uint bufferSize = 4096; + uint8_t localBuffer[bufferSize]; uint bufferPos; uint bufferLen; + bool socketReturned; }; diff --git a/src/ProcessTask.cpp b/src/ProcessTask.cpp index 43488f09a..c8d7f7891 100644 --- a/src/ProcessTask.cpp +++ b/src/ProcessTask.cpp @@ -34,6 +34,7 @@ void ProcessTask::operator()() int err; uint8_t opcode; + /* D'oh, a 1-byte read.... TODO! */ err = ::read(sock, &opcode, 1); if (err <= 0) { diff --git a/src/ReadTask.cpp b/src/ReadTask.cpp index e69de29bb..8b006de9f 100644 --- a/src/ReadTask.cpp +++ b/src/ReadTask.cpp @@ -0,0 +1,49 @@ + +#include "ReadTask.h" + +using namespace std; + +namespace storagemanager +{ + +ReadTask::ReadTask(int sock, uint len) : PosixTask(sock, len) +{ +} + +ReadTask::~ReadTask() +{ +} + +void ReadTask::run() +{ + + // get the parameters + bool success; + uint8_t *buf = alloca(getLength()); + int boff = 0; + success = read(&buf, getLength()); + uint flen = *((uint *) &buf[0]) + boff += 4; + string filename(&buf[boff], flen); + boff += flen; + size_t count = *((size_t *) &buf[boff]); + boff += sizeof(size_t); + off_t offset = *((off_t *) &buf[boff]); + + // read from IOC, write to the socket + vector outbuf; + outbuf.resize(count + SM_HEADER_LEN); + uint32_t *outbuf32 = (uint32_t *) &outbuf[0]; + outbuf32[0] = SM_MSG_START; + outbuf32[1] = length; + + // do the reading and writing in chunks + // IOC->willRead(filename, offset, length); + // IOC->read(filename, offset, length, &outbuf[SM_HEADER_LEN]); + + write(outbuf); +} + + + +} diff --git a/src/ReadTask.h b/src/ReadTask.h index e69de29bb..6a742e3a7 100644 --- a/src/ReadTask.h +++ b/src/ReadTask.h @@ -0,0 +1,24 @@ + + +#ifndef READTASK_H_ +#define READTASH_H_ + +#include "PosixTask.h" + +namespace storagemanager +{ + +class ReadTask : PosixTask +{ + public: + ReadTask(int sock, uint length); + virtual ~ReadTask(); + + void run(); + + private: + ReadTask(); +}; + +} +#endif diff --git a/src/WriteTask.cpp b/src/WriteTask.cpp index e69de29bb..ce5f82875 100644 --- a/src/WriteTask.cpp +++ b/src/WriteTask.cpp @@ -0,0 +1,50 @@ + +#include "WriteTask.h" + +using namespace std; + +namespace storagemanager +{ + +WriteTask::WriteTask(int sock, uint len) : PosixTask(sock, len) +{ +} + +WriteTask::~WriteTask() +{ +} + +#define check_error(msg) \ + if (!success) \ + { \ + handleError(msg); \ + return; \ + } + +void WriteTask::run() +{ + bool success; + uint8_t cmdbuf[1024] = {0}; + + success = read(cmdbuf, sizeof(struct cmd_overlay)); + check_error("WriteTask read"); + cmd_overlay *cmd = cmdbuf; + + success = read(&cmdbuf[sizeof(*cmd)], min(cmd->filename_len, 1024 - sizeof(*cmd) - 1)); + check_error("WriteTask read"); + + size_t count = 0; + vector databuf; + databuf.resize(cmd->count); + + // todo: do this in chunks... + while (count < cmd->count) + { + success = read(&databuf[count], cmd->count - count); + check_error("WriteTask read data"); + count += cmd->count; + // IOC->write() + } +} + +} diff --git a/src/WriteTask.h b/src/WriteTask.h index e69de29bb..90b3d0c18 100644 --- a/src/WriteTask.h +++ b/src/WriteTask.h @@ -0,0 +1,31 @@ + + +#ifndef WRITETASK_H_ +#define WRITETASK_H_ + +#include "PosixTask.h" + +namespace storagemanager +{ + +class WriteTask : PosixTask +{ + public: + WriteTask(int sock, uint length); + virtual ~WriteTask(); + + void run(); + + private: + WriteTask(); + + struct cmd_overlay { + size_t count; + off_t offset; + uint filename_len; + char filename[]; + }; +}; + +} +#endif