diff --git a/oam/etc/Columnstore.xml b/oam/etc/Columnstore.xml index b5610bf11..5a24c8630 100644 --- a/oam/etc/Columnstore.xml +++ b/oam/etc/Columnstore.xml @@ -547,5 +547,8 @@ 127.0.0.1 0 + + 30 + diff --git a/utils/cloudio/SMComm.cpp b/utils/cloudio/SMComm.cpp index 40f95010c..ff5512810 100644 --- a/utils/cloudio/SMComm.cpp +++ b/utils/cloudio/SMComm.cpp @@ -3,5 +3,205 @@ #include "SMComm.h" +#include "bytestream.h" +#include +using namespace std; +using namespace messageqcpp; +namespace +{ +SMComm *instance = NULL; +boost::mutex m; +} + +namespace idbdatafile +{ + +static SMComm * SMComm::get() +{ + if (instance) + return instance; + + boost::mutex::scoped_lock sl(m); + + if (instance) + return instance; + instance = new SMComm(); + return instance; +} + +// timesavers +#define common_exit(bs1, bs2, retCode) \ + { \ + buffers.returnByteStream(bs1); \ + buffers.returnByteStream(bs2); \ + return retCode; \ + } + +// bs1 is the bytestream ptr with the command to SMComm. +// bs2 is the bytestream pointer with the response from SMComm. +// retCode is the var to store the return code in from the msg. +// returns with the output pointer at the fcn-specific data +#define check_for_error(bs1, bs2, retCode) \ + { \ + int l_errno; \ + *bs2 >> retCode; \ + if (retCode < 0) \ + { \ + *bs2 >> l_errno; \ + errno = l_errno; \ + common_exit(bs1, bs2, retCode); \ + } \ + } + +int SMComm::open(const string &filename, int mode, struct stat *statbuf) +{ + ByteStream *command = buffers.getByteStream(); + ByteStream *response = buffers.getByteStream(); + int err; + + command << storagemanager::OPEN << filename << mode; + err = sockets.send_recv(*command, response); + if (err) + common_exit(command, response, err); + + check_for_error(command, response, err); + memcpy(statbuf, response->buf(), sizeof(*statbuf)); + common_exit(command, response, err); +} + +ssize_t SMComm::pread(const string &filename, const void *buf, size_t count, off_t offset) +{ + ByteStream *command = buffers.getByteStream(); + ByteStream *response = buffers.getByteStream(); + int err; + + command << storagemanager::READ << filename << count << offset; + err = sockets.send_recv(*command, response); + if (err) + common_exit(command, response, err); + check_for_error(command, response, err); + + memcpy(buf, response->buf(), err); + common_exit(command, response, err); +} + +ssize_t SMComm::pwrite(const string &filename, const void *buf, size_t count, off_t offset) +{ + ByteStream *command = buffers.getByteStream(); + ByteStream *response = buffers.getByteStream(); + int err; + + command << storagemanager::WRITE << filename << count << offset; + command.needAtLeast(count); + uint8_t *cmdBuf = command->getInputPtr(); + memcpy(cmdBuf, buf, count); + command->advanceInputPtr(count); + err = sockets.send_recv(*command, response); + if (err) + common_exit(command, response, err); + check_for_error(command, response, err); + common_exit(command, response, err); +} + +ssize_t SMComm::append(const string &filename, const void *buf, size_t count) +{ + ByteStream *command = buffers.getByteStream(); + ByteStream *response = buffers.getByteStream(); + int err; + + command << storagemanager::APPEND << filename << count; + command.needAtLeast(count); + uint8_t *cmdBuf = command->getInputPtr(); + memcpy(cmdBuf, buf, count); + command->advanceInputPtr(count); + err = sockets.send_recv(*command, response); + if (err) + common_exit(command, response, err); + check_for_error(command, response, err); + common_exit(command, response, err); +} + +int SMComm::unlink(const string &filename) +{ + ByteStream *command = buffers.getByteStream(); + ByteStream *response = buffers.getByteStream(); + int err; + + command << storagemanager::UNLINK << filename; + err = sockets.send_recv(*command, response); + if (err) + common_exit(command, response, err); + check_for_error(command, response, err); + common_exit(command, response, err); +} + +int SMComm::stat(const string &filename, struct stat *statbuf) +{ + ByteStream *command = buffers.getByteStream(); + ByteStream *response = buffers.getByteStream(); + int err; + + command << storageManager::STAT << filename; + err = sockets.send_recv(*command, response); + if (err) + common_exit(command, response, err); + check_for_error(command, response, err); + + memcpy(statbuf, response->buf(), sizeof(*statbuf)); + common_exit(command, response, err); +} + +int SMComm::truncate(const string &filename, off64_t length) +{ + ByteStream *command = buffers.getByteStream(); + ByteStream *response = buffers.getByteStream(); + int err; + + command << storagemanager::TRUNCATE << filename << length; + err = sockets.send_recv(*command, response); + if (err) + common_exit(command, response, err); + check_for_error(command, response, err); + common_exit(command, response, err); +} + +int SMComm::listDirectory(const string &path, list *entries) +{ + ByteStream *command = buffers.getByteStream(); + ByteStream *response = buffers.getByteStream(); + int err; + + command << storagemanager::LIST_DIRECTORY << path; + err = sockets.send_recv(*command, response); + if (err) + common_exit(command, response, err); + check_for_error(command, response, err); + + uint32_t numElements; + string stmp; + entries->clear(); + *response >> numElements; + while (numElements > 0) { + *response >> stmp; + entries->push_back(stmp); + } + common_exit(command, response, err); +} + +int SMComm::ping() +{ + ByteStream *command = buffers.getByteStream(); + ByteStream *response = buffers.getByteStream(); + int err; + + command << storagemanager::PING << filename << length; + err = sockets.send_recv(*command, response); + if (err) + common_exit(command, response, err); + check_for_error(command, response, err); + common_exit(command, response, err); +} + +} diff --git a/utils/cloudio/SMComm.h b/utils/cloudio/SMComm.h index 357771c3d..35fd2c05c 100644 --- a/utils/cloudio/SMComm.h +++ b/utils/cloudio/SMComm.h @@ -3,6 +3,11 @@ #ifndef SMCOMM_H_ #define SMCOMM_H_ +#include +#include +#include "SocketPool.h" +#include "bytestream.h" + namespace idbdatafile { class SMComm : public boost::noncopyable @@ -20,7 +25,7 @@ class SMComm : public boost::noncopyable ssize_t pwrite(const std::string &filename, const void *buf, size_t count, off_t offset); /* append exists for cases where the file is open in append mode. A normal write won't work - because the file position may be out of date if there are multiple writers. */ + because the file position/size may be out of date if there are multiple writers. */ ssize_t append(const std::string &filename, const void *buf, size_t count); int unlink(const std::string &filename); @@ -43,6 +48,7 @@ class SMComm : public boost::noncopyable SMComm(); SocketPool sockets; + ByteStreamPool buffers; }; } diff --git a/utils/cloudio/SocketPool.cpp b/utils/cloudio/SocketPool.cpp index d9ce374d8..d8d290daa 100644 --- a/utils/cloudio/SocketPool.cpp +++ b/utils/cloudio/SocketPool.cpp @@ -10,6 +10,7 @@ #include #include +using namespace std; namespace { @@ -60,8 +61,11 @@ SocketPool::~SocketPool() { boost::mutex::scoped_lock(mutex); - for (int i = 0; i < allSockets.size(); i++) - ::close(allSockets[i]); + while (!allSockets.empty()) { + int next = allSockets.front(); + allSockets.pop_front(); + ::close(next); + } ::close(clientSocket) } @@ -145,7 +149,7 @@ int SocketPool::send_recv(const messageqcpp::ByteStream &in, messageqcpp::ByteSt } returnSocket(sock); - return; + return 0; } int SocketPool::getSocket() @@ -159,7 +163,7 @@ int SocketPool::getSocket() struct sockaddr_un addr; memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX; - strcpy(&addr.sun_path[1], "storagemanager"); // abstract socket; this is intentional + strcpy(&addr.sun_path[0], storagemanager::socket_name); ret = ::connect(tmp, &addr, sizeof(addr)); if (ret >= 0) allSockets.push_back(ret); diff --git a/utils/cloudio/SocketPool.h b/utils/cloudio/SocketPool.h index a584fa1b4..df6ba7fe3 100644 --- a/utils/cloudio/SocketPool.h +++ b/utils/cloudio/SocketPool.h @@ -11,6 +11,7 @@ namespace idbdatafile { +/* This should be renamed; it's more than a pool, it also does the low-level communication. TBD. */ class SocketPool : public boost::noncopyable { public: