diff --git a/utils/cloudio/SMComm.cpp b/utils/cloudio/SMComm.cpp index e69de29bb..40f95010c 100644 --- a/utils/cloudio/SMComm.cpp +++ b/utils/cloudio/SMComm.cpp @@ -0,0 +1,7 @@ +// copy licensing stuff here + + + +#include "SMComm.h" + + diff --git a/utils/cloudio/SMComm.h b/utils/cloudio/SMComm.h index 17b178ea3..357771c3d 100644 --- a/utils/cloudio/SMComm.h +++ b/utils/cloudio/SMComm.h @@ -8,7 +8,8 @@ namespace idbdatafile { class SMComm : public boost::noncopyable { public: - SMComm *get(); + // This is a singleton. Get it with get() + static SMComm *get(); /* Open currently returns a stat struct so SMDataFile can set its initial position, otherwise behaves how you'd think. */ @@ -42,13 +43,8 @@ class SMComm : public boost::noncopyable SMComm(); SocketPool sockets; - +}; } - -} - - - #endif diff --git a/utils/cloudio/SocketPool.cpp b/utils/cloudio/SocketPool.cpp new file mode 100644 index 000000000..d9ce374d8 --- /dev/null +++ b/utils/cloudio/SocketPool.cpp @@ -0,0 +1,185 @@ +// copy licensing stuff here + + + +#include "SocketPool.h" +#include "configcpp.h" +#include "logging.h" +#include "storage-manager/include/messageFormat.h" + +#include +#include + + +namespace +{ + +void log(logging::LOG_TYPE whichLogFile, const string& msg) +{ + logging::Logger logger(12); //12 = configcpp + logger.logMessage(whichLogFile, msg, logging::LoggingID(12)); +} + +} + +namespace idbdatafile +{ + +SocketPool::SocketPool() +{ + config::Config *config = config::Config::makeConfig(); // the most 'config' ever put into a single line of code? + string stmp; + int64_t itmp = 0; + + try + { + stmp = config->getConfig("StorageManager", "MaxSockets"); + itmp = strtol(stmp.c_str(), NULL, 10); + if (itmp > 500 || itmp < 1) { + string errmsg = "SocketPool(): Got a bad value '" + stmp + "' for StorageManager/MaxSockets."; + log(logging::CRITICAL, errmsg); + throw runtime_error(errmsg); + maxSockets = itmp; + } + catch (exception &e) + { + ostringstream os; + os << "SocketPool(): Using default of " << defaultSockets << "."; + log(logging::CRITICAL, os.str()); + maxSockets = defaultSockets; + } + clientSocket = ::socket(AF_UNIX, SOCK_STREAM, 0); + if (clientSocket < 0) { + char buf[80], *ptr; + ptr = strerror_r(errno, buf, 80); + throw runtime_error("SocketPool(): Failed to get clientSocket, got " + string(ptr)); + } +} + +SocketPool::~SocketPool() +{ + boost::mutex::scoped_lock(mutex); + + for (int i = 0; i < allSockets.size(); i++) + ::close(allSockets[i]); + ::close(clientSocket) +} + +int SocketPool::send_recv(const messageqcpp::ByteStream &in, messageqcpp::ByteStream *out) +{ + uint count = 0; + uint length = in.length(); + int sock = getSocket(); + uint8_t inbuf* = in.buf(); + int err = 0; + + ::write(sock, &storagemanager::SM_MSG_START, sizeof(storagemanager::SM_MSG_START)); + ::write(sock, &length, sizeof(length)); + while (count < length) + { + err = ::write(sock, &inbuf[count], length-count); + if (err < 0) + { + returnSocket(sock); + return -1; + } + count += err; + } + + out->restart(); + uint8_t *outbuf; + uint8_t window[8192]; + bool foundheader = false; + length = 0; + uint remainingBytes = 0; + int i; + while (!foundheader) + { + // here remainingBytes means the # of bytes from the previous message + err = ::read(sock, &window[remainingBytes], 8192 - remainingBytes); + if (err < 0) + { + returnSocket(sock); + return -1; + } + uint endOfData = remainingBytes + err; // for clarity + + // scan for the 8-byte header. If it is fragmented, move the fragment to the front of the buffer + // for the next iteration to handle. + for (i = 0; i <= endOfData - 8 && !foundHeader; i++) + { + if (*((uint *) &window[i]) == storagemanager::SM_MSG_START) + { + length = *((uint *) &window[i+4]); + foundHeader = true; + } + } + + if (!foundHeader) + { + remainingBytes = endOfData - i; + if (i != 0) + memmove(window, &window[i], remainingBytes); + else + continue; // if i == 0, then the read was too short to see the whole header, do another read(). + } + else + { + out->needAtLeast(length); + outbuf = out->buf(); + memcpy(outbuf, &window[i+8], endOfData - (i+8)); + remainingBytes = length - (endOfData - (i+8)); // remainingBytes is now the # of bytes left to read + } + } + + // read the rest of the payload + while (remainingBytes > 0) + { + err = ::read(sock, &outbuf[length - remainingBytes], remainingBytes); + if (err < 0) + { + returnSocket(sock); + return -1; + } + remainingBytes -= err; + } + + returnSocket(sock); + return; +} + +int SocketPool::getSocket() +{ + boost::mutex::scoped_lock lock(mutex); + int ret; + + if (freeSockets.size() == 0 && allSockets.size() < maxSockets) + { + // make a new connection + struct sockaddr_un addr; + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strcpy(&addr.sun_path[1], "storagemanager"); // abstract socket; this is intentional + ret = ::connect(tmp, &addr, sizeof(addr)); + if (ret >= 0) + allSockets.push_back(ret); + return ret; + } + + // wait for a socket to become free + while (freeSockets.size() == 0) + socketAvailable.wait(lock); + + ret = freeSockets.front(); + freeSockets.pop_front(); + return ret; +} + +void SocketPool::returnSocket(int sock) +{ + boost::mutex::scoped_lock lock(mutex); + freeSockets.push_back(sock); + socketAvailable.notify_one(); +} + +} diff --git a/utils/cloudio/SocketPool.h b/utils/cloudio/SocketPool.h new file mode 100644 index 000000000..a584fa1b4 --- /dev/null +++ b/utils/cloudio/SocketPool.h @@ -0,0 +1,42 @@ +// copy licensing stuff here + +#ifndef _SOCKETPOOL_H_ +#define _SOCKETPOOL_H_ + +#include +#include + +#include "bytestream.h" + +namespace idbdatafile +{ + +class SocketPool : public boost::noncopyable +{ + public: + SocketPool(); + + // the dtor will immediately close all sockets + virtual ~SocketPool(); + + // 0 = success, -1 = failure. Should this throw instead? + int send_recv(const ByteStream &to_send, ByteStream *to_recv); + + private: + int getSocket(); + void returnSocket(); + + std::vector allSockets; + std::deque freeSockets; + boost::mutex mutex; + boost::condition socketAvailable; + int clientSocket; + int maxSockets; + static const int defaultSockets = 20; +}; + +} + + + +#endif diff --git a/utils/cloudio/sm_exceptions.h b/utils/cloudio/sm_exceptions.h index 15231eafe..daa41851f 100644 --- a/utils/cloudio/sm_exceptions.h +++ b/utils/cloudio/sm_exceptions.h @@ -3,15 +3,42 @@ #include +namespace idbdatafile +{ + class NotImplementedYet : public std::exception { public: NotImplementedYet(const std::string &s); }; -NotImplementedYes::NotImplementedYet(const std::string &s) : +class FailedToSend : public std::runtime_error +{ + public: + FailedToSend(const std::string &s); +}; + +class FailedToRecv : public std::runtime_error +{ + public: + FailedToRecv(const std::string &s); +}; + + +NotImplementedYet::NotImplementedYet(const std::string &s) : std::exception(s + "() isn't implemented yet.") { } +FailedToSend::FailedToSend(const std::string &s) : + std::runtime_error(s) +{ +} + +FailedToRecv::FailedToRecv(const std::string &s) : + std::runtime_error(s) +{ +} + +} #endif diff --git a/utils/cloudio/storage-manager b/utils/cloudio/storage-manager index 20b0d3b88..50daf50b8 160000 --- a/utils/cloudio/storage-manager +++ b/utils/cloudio/storage-manager @@ -1 +1 @@ -Subproject commit 20b0d3b88b23dc018ccd4f4b64ddddc9df447798 +Subproject commit 50daf50b82a58edc3643448fe2b7f2067f0477cd