diff --git a/CMakeLists.txt b/CMakeLists.txt index fad1c5723..88ad2d5dd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,9 +1,28 @@ -include_directories(storage-manager/include) +cmake_minimum_required(VERSION 2.8.12) -set(storagemanager_SRCS main.cpp SessionManager.cpp) +include_directories(include) + +set(storagemanager_SRCS + src/AppendTask.cpp + src/ClientRequestProcessor.cpp + src/ListDirectoryTask.cpp + src/OpenTask.cpp + src/PingTask.cpp + src/PosixTask.cpp + src/ProcessTask.cpp + src/ReadTask.cpp + src/StatTask.cpp + src/ThreadPool.cpp + src/TruncateTask.cpp + src/UnlinkTask.cpp + src/WriteTask.cpp + main.cpp +# SessionManager.cpp +) add_executable(StorageManager ${storagemanager_SRCS}) +target_link_libraries(StorageManager boost_system boost_thread) -//install(TARGETS StorageManager DESTINATION ${ENGINE_BINDIR} COMPONENT platform) +#install(TARGETS StorageManager DESTINATION ${ENGINE_BINDIR} COMPONENT platform) diff --git a/src/AppendTask.cpp b/src/AppendTask.cpp index 4db964b1f..df10b6509 100644 --- a/src/AppendTask.cpp +++ b/src/AppendTask.cpp @@ -1,5 +1,6 @@ #include "AppendTask.h" +#include using namespace std; @@ -17,9 +18,11 @@ AppendTask::~AppendTask() #define check_error(msg) \ if (!success) \ { \ - handleError(msg); \ + handleError(msg, errno); \ return; \ } + +#define min(x, y) (x < y ? x : y) void AppendTask::run() { @@ -28,7 +31,7 @@ void AppendTask::run() success = read(cmdbuf, sizeof(struct cmd_overlay)); check_error("AppendTask read"); - cmd_overlay *cmd = cmdbuf; + cmd_overlay *cmd = (cmd_overlay *) cmdbuf; success = read(&cmdbuf[sizeof(*cmd)], min(cmd->filename_len, 1024 - sizeof(*cmd) - 1)); check_error("AppendTask read"); diff --git a/src/AppendTask.h b/src/AppendTask.h index ff9502ea2..cfcdae25f 100644 --- a/src/AppendTask.h +++ b/src/AppendTask.h @@ -8,7 +8,7 @@ namespace storagemanager { -class AppendTask : PosixTask +class AppendTask : public PosixTask { public: AppendTask(int sock, uint length); diff --git a/src/ClientRequestProcessor.cpp b/src/ClientRequestProcessor.cpp index 9138e341e..44b33e1c0 100644 --- a/src/ClientRequestProcessor.cpp +++ b/src/ClientRequestProcessor.cpp @@ -1,6 +1,8 @@ #include "ClientRequestProcessor.h" +#include "ProcessTask.h" +#include namespace storagemanager { @@ -13,9 +15,9 @@ ClientRequestProcessor::~ClientRequestProcessor() { } -ClientRequestProcessor::processRequest(int sock) +void ClientRequestProcessor::processRequest(int sock, uint len) { - ProcessTask t(sock); + ProcessTask t(sock, len); threadPool.addJob(t); } diff --git a/src/ClientRequestProcessor.h b/src/ClientRequestProcessor.h index 30fcf86eb..68fd38475 100644 --- a/src/ClientRequestProcessor.h +++ b/src/ClientRequestProcessor.h @@ -1,5 +1,9 @@ +#ifndef CLIENTREQUESTPROCESSOR_H_ +#define CLIENTREQUESTPROCESSOR_H_ + #include "ThreadPool.h" +#include namespace storagemanager @@ -11,12 +15,12 @@ class ClientRequestProcessor : public boost::noncopyable ClientRequestProcessor(); virtual ~ClientRequestProcessor(); - void processRequest(int sock); + void processRequest(int sock, uint len); private: ThreadPool threadPool; - - }; } + +#endif diff --git a/src/ListDirectoryTask.cpp b/src/ListDirectoryTask.cpp index e79cc2c49..8c1684b6c 100644 --- a/src/ListDirectoryTask.cpp +++ b/src/ListDirectoryTask.cpp @@ -1,5 +1,8 @@ #include "ListDirectoryTask.h" +#include "messageFormat.h" +#include +#include using namespace std; @@ -27,8 +30,10 @@ ListDirectoryTask::~ListDirectoryTask() handleError(msg, errno); \ return false; \ } + +#define min(x, y) (x < y ? x : y) -bool ListDirectoryTask::writeString(uint8_t buf, int *offset, int size, const string &str) +bool ListDirectoryTask::writeString(uint8_t *buf, int *offset, int size, const string &str) { bool success; if (size - *offset < 4) // eh, let's not frag 4 bytes. @@ -85,10 +90,10 @@ void ListDirectoryTask::run() buf32[0] = SM_MSG_START; buf32[1] = payloadLen; - int offset = 8; + int offset = SM_HEADER_LEN; for (uint i = 0; i < listing.size(); i++) { - success = writeString(buf, &offset, 1024, listing); + success = writeString(buf, &offset, 1024, listing[i]); check_error("ListDirectoryTask write"); } diff --git a/src/ListDirectoryTask.h b/src/ListDirectoryTask.h index 19e6e2eb5..8a99ad755 100644 --- a/src/ListDirectoryTask.h +++ b/src/ListDirectoryTask.h @@ -3,6 +3,7 @@ #define LIST_DIRECTORYTASK_H_ #include "PosixTask.h" +#include namespace storagemanager { @@ -18,7 +19,7 @@ class ListDirectoryTask : public PosixTask private: ListDirectoryTask(); - void writeString(uint8_t buf, int *offset, int size, const std::string &str); + bool writeString(uint8_t *buf, int *offset, int size, const std::string &str); struct cmd_overlay { uint plen; char path[]; diff --git a/src/OpenTask.cpp b/src/OpenTask.cpp index b3002a0fb..077ba5b62 100644 --- a/src/OpenTask.cpp +++ b/src/OpenTask.cpp @@ -1,7 +1,10 @@ #include "OpenTask.h" +#include "messageFormat.h" #include +#include +#include using namespace std; @@ -12,6 +15,10 @@ OpenTask::OpenTask(int sock, uint len) : PosixTask(sock, len) { } +OpenTask::~OpenTask() +{ +} + void OpenTask::run() { /* @@ -20,16 +27,15 @@ void OpenTask::run() return the result */ bool success; - uint8_t *buf; + uint8_t buf[1024] = {0}; - if (getLength() > 1024) + if (getLength() > 1023) { handleError("OpenTask read", ENAMETOOLONG); return; } - buf = alloca(getLength()); - success = read(&buf, getLength()); + success = read(buf, getLength()); if (!success) { handleError("OpenTask read", errno); @@ -38,12 +44,13 @@ void OpenTask::run() cmd_overlay *cmd = (cmd_overlay *) buf; - // IOC->open(filename, openmode, &buf[SM_HEADER_LEN]) + // IOC->open(cmd->filename, cmd->openmode, &buf[SM_HEADER_LEN]) // stand-in dummy response uint32_t *buf32 = (uint32_t *) buf; buf32[0] = SM_MSG_START; buf32[1] = sizeof(struct stat); + memset(&buf[SM_HEADER_LEN], 0, sizeof(struct stat)); success = write(buf, sizeof(struct stat) + SM_HEADER_LEN); if (!success) handleError("OpenTask write", errno); diff --git a/src/PingTask.cpp b/src/PingTask.cpp index 4be407096..a2cb6b4f7 100644 --- a/src/PingTask.cpp +++ b/src/PingTask.cpp @@ -1,5 +1,6 @@ #include "PingTask.h" +#include "messageFormat.h" namespace storagemanager { @@ -18,3 +19,5 @@ void PingTask::run() uint32_t buf[3] = { SM_MSG_START, 4, 0 }; // generic success response write((uint8_t *) buf, 12); } + +} diff --git a/src/PingTask.h b/src/PingTask.h index 52aed4c2a..24f836ad4 100644 --- a/src/PingTask.h +++ b/src/PingTask.h @@ -8,7 +8,7 @@ namespace storagemanager { -class PingTask : PosixTask +class PingTask : public PosixTask { public: PingTask(int sock, uint length); diff --git a/src/PosixTask.cpp b/src/PosixTask.cpp index a82dda7d8..813acb289 100644 --- a/src/PosixTask.cpp +++ b/src/PosixTask.cpp @@ -1,7 +1,14 @@ #include "PosixTask.h" +#include "messageFormat.h" +#include #include #include +#include + +#define min(x, y) (x < y ? x : y) + +using namespace std; namespace storagemanager { @@ -19,7 +26,7 @@ PosixTask::PosixTask(int _sock, uint _length) : PosixTask::~PosixTask() { - comsumeMsg(); + consumeMsg(); if (!socketReturned) returnSocket(); } @@ -29,12 +36,12 @@ void PosixTask::handleError(const char *name, int errCode) char buf[80]; // send an error response if possible - int32_t *buf32; + int32_t *buf32 = (int32_t *) buf; buf32[0] = SM_MSG_START; buf32[1] = 8; buf32[2] = -1; buf32[3] = errCode; - write(buf32, 16); + write((uint8_t *) buf, 16); // TODO: construct and log a message cout << name << " caught an error reading from a socket: " << strerror_r(errCode, buf, 80) << endl; @@ -66,7 +73,6 @@ bool PosixTask::read(uint8_t *buf, uint length) { if (length > remainingLengthForCaller) length = remainingLengthForCaller; - if (length == 0) return false; @@ -124,13 +130,13 @@ void PosixTask::primeBuffer() { // debating whether it is more efficient to use a circular buffer + more // recv's, or to move data to reduce the # of recv's. WAG: moving data. - memmove(buffer, &buffer[bufferPos], bufferLen - bufferPos); + memmove(localBuffer, &localBuffer[bufferPos], bufferLen - bufferPos); bufferLen -= bufferPos; bufferPos = 0; } uint toRead = min(remainingLengthInStream, bufferSize - bufferLen); - err = ::recv(sock, &localBuffer[bufferLen], toRead, MSG_NOBLOCK); + int err = ::recv(sock, &localBuffer[bufferLen], toRead, MSG_DONTWAIT); // ignoring errors here since this is supposed to be silent. // errors will be caught by the next read if (err > 0) @@ -141,14 +147,14 @@ void PosixTask::primeBuffer() } } -bool PosixTask::write(uint8_t *buf, uint len) +bool PosixTask::write(const uint8_t *buf, uint len) { int err; uint count = 0; while (count < len) { - err = ::write(sock, &buf[count], len - count); + err = ::send(sock, &buf[count], len - count, 0); if (err < 0) return false; count += err; @@ -172,7 +178,7 @@ void PosixTask::consumeMsg() while (remainingLengthInStream > 0) { - err = ::read(sock, buf, min(remainingLengthInStream, 1024)); + err = ::recv(sock, buf, min(remainingLengthInStream, 1024), 0); if (err <= 0) { remainingLengthInStream = 0; break; diff --git a/src/PosixTask.h b/src/PosixTask.h index 0f0b61e65..53af31904 100644 --- a/src/PosixTask.h +++ b/src/PosixTask.h @@ -4,6 +4,11 @@ #define POSIX_TASK_H_ #include +#include +#include + +namespace storagemanager +{ class PosixTask { @@ -15,9 +20,9 @@ class PosixTask void primeBuffer(); protected: - int read(uint8_t *buf, uint length); + bool read(uint8_t *buf, uint length); bool write(const std::vector &buf); - bool write(void *buf, uint length); + bool write(const uint8_t *buf, uint length); void consumeMsg(); // drains the remaining portion of the message uint getLength(); // returns the total length of the msg uint getRemainingLength(); // returns the remaining length from the caller's perspective @@ -39,9 +44,5 @@ class PosixTask bool socketReturned; }; - - - - - +} #endif diff --git a/src/ProcessTask.cpp b/src/ProcessTask.cpp index ac8488694..94239e373 100644 --- a/src/ProcessTask.cpp +++ b/src/ProcessTask.cpp @@ -1,24 +1,40 @@ #include "ProcessTask.h" #include +#include +#include "messageFormat.h" + +#include "AppendTask.h" +#include "ListDirectoryTask.h" +#include "OpenTask.h" +#include "PingTask.h" +#include "ReadTask.h" +#include "StatTask.h" +#include "TruncateTask.h" +#include "UnlinkTask.h" +#include "WriteTask.h" using namespace std; namespace storagemanager { -ProcessTask::ProcessTask(int _sock, uint _length) : sock(_sock), length(_length) +ProcessTask::ProcessTask(int _sock, uint _length) : sock(_sock), length(_length), returnedSock(false) { assert(length > 0); } ProcessTask::~ProcessTask() { + if (!returnedSock) + ; // SM->returnSocket(sock); } void ProcessTask::handleError(int saved_errno) { // return sock to SessionManager + // SM->socketError(sock); + returnedSock = true; char buf[80]; cout << "ProcessTask: got an error during a socket read: " << strerror_r(saved_errno, buf, 80) << endl; } diff --git a/src/ProcessTask.h b/src/ProcessTask.h index f398ea041..3fb5dc6a9 100644 --- a/src/ProcessTask.h +++ b/src/ProcessTask.h @@ -3,10 +3,12 @@ #ifndef PROCESS_TASK_H_ #define PROCESS_TASK_H_ +#include "ThreadPool.h" + namespace storagemanager { -class ProcessTask +class ProcessTask : public ThreadPool::Job { public: ProcessTask(int sock, uint length); // _sock is the socket to read from @@ -17,9 +19,10 @@ class ProcessTask private: ProcessTask(); - void handleError(); + void handleError(int errCode); int sock; uint length; + bool returnedSock; }; diff --git a/src/ReadTask.cpp b/src/ReadTask.cpp index 687200f78..1fc9582de 100644 --- a/src/ReadTask.cpp +++ b/src/ReadTask.cpp @@ -1,5 +1,7 @@ #include "ReadTask.h" +#include "messageFormat.h" +#include using namespace std; @@ -16,24 +18,24 @@ ReadTask::~ReadTask() void ReadTask::run() { + uint8_t buf[1024] = {0}; // get the parameters - if (getLength() > 1024) { + if (getLength() > 1023) { handleError("ReadTask read", EFAULT); return; } bool success; - uint8_t *buf = alloca(getLength()); - success = read(&buf, getLength()); + success = read(buf, getLength()); cmd_overlay *cmd = (cmd_overlay *) buf; // read from IOC, write to the socket vector outbuf; - outbuf.resize(count + SM_HEADER_LEN); + outbuf.resize(cmd->count + SM_HEADER_LEN); uint32_t *outbuf32 = (uint32_t *) &outbuf[0]; outbuf32[0] = SM_MSG_START; - outbuf32[1] = length; + outbuf32[1] = cmd->count; // do the reading and writing in chunks // IOC->willRead(filename, offset, length); diff --git a/src/ReadTask.h b/src/ReadTask.h index 93b4e0dee..d388c42cc 100644 --- a/src/ReadTask.h +++ b/src/ReadTask.h @@ -8,7 +8,7 @@ namespace storagemanager { -class ReadTask : PosixTask +class ReadTask : public PosixTask { public: ReadTask(int sock, uint length); diff --git a/src/StatTask.cpp b/src/StatTask.cpp index 552f73d39..cbe7d2675 100644 --- a/src/StatTask.cpp +++ b/src/StatTask.cpp @@ -1,5 +1,11 @@ #include "StatTask.h" +#include "messageFormat.h" +#include +#include +#include +#include +#include using namespace std; @@ -14,6 +20,13 @@ StatTask::~StatTask() { } +#define check_error(msg) \ + if (!success) \ + { \ + handleError(msg, errno); \ + return; \ + } + void StatTask::run() { bool success; @@ -39,3 +52,6 @@ void StatTask::run() memcpy(&buf[SM_HEADER_LEN], &_stat, sizeof(_stat)); write(buf, SM_HEADER_LEN + sizeof(_stat)); } + +} + diff --git a/src/ThreadPool.cpp b/src/ThreadPool.cpp index 4bc7c0b33..c87817839 100644 --- a/src/ThreadPool.cpp +++ b/src/ThreadPool.cpp @@ -2,7 +2,7 @@ #include "ThreadPool.h" -#include +#include using namespace std; @@ -22,7 +22,7 @@ ThreadPool::ThreadPool(uint num_threads) : maxThreads(num_threads), die(false), ThreadPool::~ThreadPool() { - boost::mutex::unique_lock s(m); + boost::mutex::scoped_lock s(m); die = true; jobs.clear(); jobAvailable.notify_all(); @@ -34,12 +34,11 @@ ThreadPool::~ThreadPool() void ThreadPool::addJob(Job &j) { - - boost::mutex::unique_lock s(m); + boost::mutex::scoped_lock s(m); jobs.push_back(j); // Start another thread if necessary if (threadsWaiting == 0 && threads.size() < maxThreads) { - boost::scoped_ptr thread(new boost::thread(processingLoop)); + boost::shared_ptr thread(new boost::thread(Runner(this))); threads.push_back(thread); } else @@ -48,7 +47,7 @@ void ThreadPool::addJob(Job &j) void ThreadPool::processingLoop() { - boost::mutex::scoped_lock s(m, boost::defer_lock_t); + boost::mutex::scoped_lock s(m, boost::defer_lock); while (!die) { diff --git a/src/ThreadPool.h b/src/ThreadPool.h index 9dc9487e5..c386a9484 100644 --- a/src/ThreadPool.h +++ b/src/ThreadPool.h @@ -5,31 +5,36 @@ #include #include #include -//#include +#include #include namespace storagemanager { -typedef boost::function0 Job; - class ThreadPool : public boost::noncopyable { public: ThreadPool(); // this ctor uses an 'unbounded' # of threads; relies on the context for a natural max. ThreadPool(uint num_threads); // this ctor lets caller specify a limit. - virtual ~Threadpool(); + virtual ~ThreadPool(); - // doesn't block + // addJob doesn't block + typedef boost::function0 Job; void addJob(Job &j); private: + struct Runner { + Runner(ThreadPool *t) : tp(t) { } + void operator()() { tp->processingLoop(); } + ThreadPool *tp; + }; + void processingLoop(); // the fcn run by each thread uint maxThreads; bool die; int threadsWaiting; - std::vector > threads; + std::vector > threads; boost::condition jobAvailable; std::deque jobs; boost::mutex m; diff --git a/src/TruncateTask.cpp b/src/TruncateTask.cpp index 6edfa0658..0a504d4c7 100644 --- a/src/TruncateTask.cpp +++ b/src/TruncateTask.cpp @@ -1,5 +1,7 @@ #include "TruncateTask.h" +#include +#include "messageFormat.h" using namespace std; @@ -14,6 +16,13 @@ TruncateTask::~TruncateTask() { } +#define check_error(msg) \ + if (!success) \ + { \ + handleError(msg, errno); \ + return; \ + } + void TruncateTask::run() { bool success; @@ -31,9 +40,11 @@ void TruncateTask::run() // IOC->truncate(cmd->filename, cmd->newSize); // generic success msg - uint32_t *buf32 = buf; + uint32_t *buf32 = (uint32_t *) buf; buf32[0] = SM_MSG_START; buf32[1] = 4; buf32[2] = 0; write(buf, 12); } + +} diff --git a/src/UnlinkTask.cpp b/src/UnlinkTask.cpp index c88f6dad9..71ca5ade8 100644 --- a/src/UnlinkTask.cpp +++ b/src/UnlinkTask.cpp @@ -1,5 +1,7 @@ #include "UnlinkTask.h" +#include +#include "messageFormat.h" using namespace std; @@ -14,6 +16,14 @@ UnlinkTask::~UnlinkTask() { } +#define check_error(msg) \ + if (!success) \ + { \ + handleError(msg, errno); \ + return; \ + } + + void UnlinkTask::run() { bool success; @@ -31,9 +41,11 @@ void UnlinkTask::run() // IOC->unlink(cmd->filename); // generic success msg - uint32_t *buf32 = buf; + uint32_t *buf32 = (uint32_t *) buf; buf32[0] = SM_MSG_START; buf32[1] = 4; buf32[2] = 0; write(buf, 12); } + +} diff --git a/src/WriteTask.cpp b/src/WriteTask.cpp index 5820a6de4..5c66ed51e 100644 --- a/src/WriteTask.cpp +++ b/src/WriteTask.cpp @@ -1,5 +1,6 @@ #include "WriteTask.h" +#include using namespace std; @@ -21,6 +22,8 @@ WriteTask::~WriteTask() return; \ } +#define min(x, y) (x < y ? x : y) + void WriteTask::run() { bool success; @@ -28,7 +31,7 @@ void WriteTask::run() success = read(cmdbuf, sizeof(struct cmd_overlay)); check_error("WriteTask read"); - cmd_overlay *cmd = cmdbuf; + cmd_overlay *cmd = (cmd_overlay *) cmdbuf; success = read(&cmdbuf[sizeof(*cmd)], min(cmd->filename_len, 1024 - sizeof(*cmd) - 1)); check_error("WriteTask read"); diff --git a/src/WriteTask.h b/src/WriteTask.h index 90b3d0c18..e00243b5b 100644 --- a/src/WriteTask.h +++ b/src/WriteTask.h @@ -8,7 +8,7 @@ namespace storagemanager { -class WriteTask : PosixTask +class WriteTask : public PosixTask { public: WriteTask(int sock, uint length);