diff --git a/oam/etc/Columnstore.xml.singleserver b/oam/etc/Columnstore.xml.singleserver index 0c291579a..b5f4f8d77 100644 --- a/oam/etc/Columnstore.xml.singleserver +++ b/oam/etc/Columnstore.xml.singleserver @@ -538,4 +538,7 @@ 127.0.0.1 0 + + 30 + diff --git a/utils/CMakeLists.txt b/utils/CMakeLists.txt index c4486ddd5..405241232 100644 --- a/utils/CMakeLists.txt +++ b/utils/CMakeLists.txt @@ -26,4 +26,5 @@ add_subdirectory(querytele) add_subdirectory(clusterTester) add_subdirectory(libmysql_client) add_subdirectory(regr) +add_subdirectory(cloudio) diff --git a/utils/cloudio/CMakeLists.txt b/utils/cloudio/CMakeLists.txt new file mode 100644 index 000000000..3fc10197a --- /dev/null +++ b/utils/cloudio/CMakeLists.txt @@ -0,0 +1,10 @@ +include_directories(${ENGINE_COMMON_INCLUDES} storage-manager/include) + +set(cloudio_LIB_SRCS SMComm.cpp SMDataFile.cpp SMFileFactory.cpp SMFileSystem.cpp SocketPool.cpp) + +add_library(cloudio SHARED ${cloudio_LIB_SRCS}) + +set_target_properties(cloudio PROPERTIES VERSION 1.0.0 SOVERSION 1) + +install(TARGETS cloudio DESTINATION ${ENGINE_LIBDIR} COMPONENT libs) + diff --git a/utils/cloudio/SMComm.cpp b/utils/cloudio/SMComm.cpp index ff5512810..59314cc52 100644 --- a/utils/cloudio/SMComm.cpp +++ b/utils/cloudio/SMComm.cpp @@ -1,24 +1,36 @@ -// copy licensing stuff here +/* Copyright (C) 2019 MariaDB Corporaton + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ #include "SMComm.h" -#include "bytestream.h" -#include +#include "messageFormat.h" using namespace std; using namespace messageqcpp; namespace { -SMComm *instance = NULL; +idbdatafile::SMComm *instance = NULL; boost::mutex m; -} +}; namespace idbdatafile { -static SMComm * SMComm::get() +SMComm * SMComm::get() { if (instance) return instance; @@ -55,13 +67,13 @@ static SMComm * SMComm::get() } \ } -int SMComm::open(const string &filename, int mode, struct stat *statbuf) +int SMComm::open(const string &filename, const int mode, struct stat *statbuf) { ByteStream *command = buffers.getByteStream(); ByteStream *response = buffers.getByteStream(); int err; - command << storagemanager::OPEN << filename << mode; + *command << (uint8_t) storagemanager::OPEN << filename << mode; err = sockets.send_recv(*command, response); if (err) common_exit(command, response, err); @@ -71,13 +83,13 @@ int SMComm::open(const string &filename, int mode, struct stat *statbuf) common_exit(command, response, err); } -ssize_t SMComm::pread(const string &filename, const void *buf, size_t count, off_t offset) +ssize_t SMComm::pread(const string &filename, void *buf, const size_t count, const off_t offset) { ByteStream *command = buffers.getByteStream(); ByteStream *response = buffers.getByteStream(); int err; - command << storagemanager::READ << filename << count << offset; + *command << (uint8_t) storagemanager::READ << filename << count << offset; err = sockets.send_recv(*command, response); if (err) common_exit(command, response, err); @@ -87,14 +99,14 @@ ssize_t SMComm::pread(const string &filename, const void *buf, size_t count, off common_exit(command, response, err); } -ssize_t SMComm::pwrite(const string &filename, const void *buf, size_t count, off_t offset) +ssize_t SMComm::pwrite(const string &filename, const void *buf, const size_t count, const off_t offset) { ByteStream *command = buffers.getByteStream(); ByteStream *response = buffers.getByteStream(); int err; - command << storagemanager::WRITE << filename << count << offset; - command.needAtLeast(count); + *command << (uint8_t) storagemanager::WRITE << filename << count << offset; + command->needAtLeast(count); uint8_t *cmdBuf = command->getInputPtr(); memcpy(cmdBuf, buf, count); command->advanceInputPtr(count); @@ -105,14 +117,14 @@ ssize_t SMComm::pwrite(const string &filename, const void *buf, size_t count, of common_exit(command, response, err); } -ssize_t SMComm::append(const string &filename, const void *buf, size_t count) +ssize_t SMComm::append(const string &filename, const void *buf, const size_t count) { ByteStream *command = buffers.getByteStream(); ByteStream *response = buffers.getByteStream(); int err; - command << storagemanager::APPEND << filename << count; - command.needAtLeast(count); + *command << (uint8_t) storagemanager::APPEND << filename << count; + command->needAtLeast(count); uint8_t *cmdBuf = command->getInputPtr(); memcpy(cmdBuf, buf, count); command->advanceInputPtr(count); @@ -129,7 +141,7 @@ int SMComm::unlink(const string &filename) ByteStream *response = buffers.getByteStream(); int err; - command << storagemanager::UNLINK << filename; + *command << (uint8_t) storagemanager::UNLINK << filename; err = sockets.send_recv(*command, response); if (err) common_exit(command, response, err); @@ -143,7 +155,7 @@ int SMComm::stat(const string &filename, struct stat *statbuf) ByteStream *response = buffers.getByteStream(); int err; - command << storageManager::STAT << filename; + *command << (uint8_t) storagemanager::STAT << filename; err = sockets.send_recv(*command, response); if (err) common_exit(command, response, err); @@ -153,13 +165,13 @@ int SMComm::stat(const string &filename, struct stat *statbuf) common_exit(command, response, err); } -int SMComm::truncate(const string &filename, off64_t length) +int SMComm::truncate(const string &filename, const off64_t length) { ByteStream *command = buffers.getByteStream(); ByteStream *response = buffers.getByteStream(); int err; - command << storagemanager::TRUNCATE << filename << length; + *command << (uint8_t) storagemanager::TRUNCATE << filename << length; err = sockets.send_recv(*command, response); if (err) common_exit(command, response, err); @@ -173,7 +185,7 @@ int SMComm::listDirectory(const string &path, list *entries) ByteStream *response = buffers.getByteStream(); int err; - command << storagemanager::LIST_DIRECTORY << path; + *command << (uint8_t) storagemanager::LIST_DIRECTORY << path; err = sockets.send_recv(*command, response); if (err) common_exit(command, response, err); @@ -196,7 +208,7 @@ int SMComm::ping() ByteStream *response = buffers.getByteStream(); int err; - command << storagemanager::PING << filename << length; + *command << (uint8_t) storagemanager::PING; err = sockets.send_recv(*command, response); if (err) common_exit(command, response, err); diff --git a/utils/cloudio/SMComm.h b/utils/cloudio/SMComm.h index 35fd2c05c..2d108a7b7 100644 --- a/utils/cloudio/SMComm.h +++ b/utils/cloudio/SMComm.h @@ -1,14 +1,31 @@ -# copy some licensing stuff here +/* Copyright (C) 2019 MariaDB Corporaton + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ #ifndef SMCOMM_H_ #define SMCOMM_H_ -#include +#include #include #include "SocketPool.h" #include "bytestream.h" +#include "bytestreampool.h" -namespace idbdatafile { +namespace idbdatafile +{ class SMComm : public boost::noncopyable { @@ -18,15 +35,15 @@ class SMComm : public boost::noncopyable /* Open currently returns a stat struct so SMDataFile can set its initial position, otherwise behaves how you'd think. */ - int open(const std::string &filename, int mode, struct stat *statbuf); + int open(const std::string &filename, const int mode, struct stat *statbuf); - ssize_t pread(const std::string &filename, const void *buf, size_t count, off_t offset); + ssize_t pread(const std::string &filename, void *buf, const size_t count, const off_t offset); - ssize_t pwrite(const std::string &filename, const void *buf, size_t count, off_t offset); + ssize_t pwrite(const std::string &filename, const void *buf, const size_t count, const off_t offset); /* append exists for cases where the file is open in append mode. A normal write won't work because the file position/size may be out of date if there are multiple writers. */ - ssize_t append(const std::string &filename, const void *buf, size_t count); + ssize_t append(const std::string &filename, const void *buf, const size_t count); int unlink(const std::string &filename); @@ -34,7 +51,7 @@ class SMComm : public boost::noncopyable // added this one because it should be trivial to implement in SM, and prevents a large // operation in SMDataFile. - int truncate(const std::string &filename, off64_t length); + int truncate(const std::string &filename, const off64_t length); int listDirectory(const std::string &path, std::list *entries); @@ -48,7 +65,7 @@ class SMComm : public boost::noncopyable SMComm(); SocketPool sockets; - ByteStreamPool buffers; + messageqcpp::ByteStreamPool buffers; }; } diff --git a/utils/cloudio/SMDataFile.cpp b/utils/cloudio/SMDataFile.cpp index 1c8add2aa..7f1346eab 100644 --- a/utils/cloudio/SMDataFile.cpp +++ b/utils/cloudio/SMDataFile.cpp @@ -1,6 +1,21 @@ -// copy licensing stuff here +/* Copyright (C) 2019 MariaDB Corporaton + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include #include "SMDataFile.h" @@ -51,13 +66,15 @@ int SMDataFile::seek(off64_t offset, int whence) case SEEK_CUR: position += offset; break; - case SEEK_END: + case SEEK_END: + { struct stat _stat; int err = comm->stat(name(), &_stat); if (err) return err; position = _stat.st_size + offset; break; + } default: errno = EINVAL; return -1; @@ -101,6 +118,9 @@ time_t SMDataFile::mtime() return _stat.st_mtime; } - +int SMDataFile::close() +{ + return 0; +} } diff --git a/utils/cloudio/SMDataFile.h b/utils/cloudio/SMDataFile.h index dab286978..512c835a4 100644 --- a/utils/cloudio/SMDataFile.h +++ b/utils/cloudio/SMDataFile.h @@ -1,6 +1,20 @@ -# copy some licensing stuff here +/* Copyright (C) 2019 MariaDB Corporaton + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + #ifndef SMDATAFILE_H_ #define SMDATAFILE_H_ @@ -26,6 +40,7 @@ class SMDataFile : public IDBDataFile off64_t tell(); int flush(); time_t mtime(); + int close(); private: SMDataFile(); diff --git a/utils/cloudio/SMFileFactory.cpp b/utils/cloudio/SMFileFactory.cpp index eda1eb007..1b0b9b192 100644 --- a/utils/cloudio/SMFileFactory.cpp +++ b/utils/cloudio/SMFileFactory.cpp @@ -1,7 +1,24 @@ -# copy licensing stuff here +/* Copyright (C) 2019 MariaDB Corporaton -#include + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include +#include #include "SMFileFactory.h" +#include "SMDataFile.h" #include "SMComm.h" using namespace std; @@ -9,64 +26,57 @@ using namespace std; namespace idbdatafile { -#define _toUint64(ptr_to_eight_bytes) *((uint64_t *) ptr_to_eight_bytes) - -IDBDataFile* SMFileFactory::open(const char *filename, const char *mode, uint opts, uint colWidth) +IDBDataFile* SMFileFactory::open(const char *filename, const char *mode, unsigned opts, unsigned colWidth) { bool _read = false; bool _write = false; - bool at_eof = false; bool create = false; bool truncate = false; - bool append_only = false; - string s_filename = filename; + bool append = false; // strip 'b' chars from mode char newmode[8] = {'\0'}; // there'd better not be 7 chars in the mode string int i = 0; - for (char *c = mode; *c != '\0' && i < 8; c++) + for (const char *c = mode; *c != '\0' && i < 8; c++) if (*c != 'b') newmode[i++] = *c; - if (i == 8) + if (i == 8) { + errno = EINVAL; return NULL; - - // I hate dealing with C-lib file IO. This is ugly but fast. - switch (_toUint64(newmode)) { - case _toUint64("r\0\0\0\0\0\0\0"): - _read = true; - break; - case _toUint64("r+\0\0\0\0\0\0"): - _read = true; - _write = true; - break; - case _toUint64("w\0\0\0\0\0\0\0"): - _write = true; - truncate = true; - create = true; - break; - case _toUint64("w+\0\0\0\0\0\0"): - _read = true; - _write = true; - truncate = true; - create = true; - break; - case _toUint64("a\0\0\0\0\0\0\0"): - _write = true; - create = true; - at_eof = true; - break; - case _toUint64("a+\0\0\0\0\0\0"): - _read = true; - _write = true; - create = true; - append_only = true; - break; - default: - return NULL; } + // parse the new mode string + if (newmode[0] == 'r') + { + _read = true; + if (newmode[1] == '+') + _write = true; + } + else if (newmode[0] == 'w') + { + _write = true; + truncate = true; + create = true; + if (newmode[1] == '+') + _read = true; + } + else if (newmode[0] == 'a') + { + _write = true; + create = true; + append = true; + if (newmode[1] == '+') + _read = true; + } + else + { + errno = EINVAL; + return NULL; + } + + // turn newmode into posix flags uint posix_flags = 0; - if (_read && write) + if (_read && _write) posix_flags |= O_RDWR; else if (_read) posix_flags |= O_RDONLY; @@ -75,15 +85,15 @@ IDBDataFile* SMFileFactory::open(const char *filename, const char *mode, uint op posix_flags |= (create ? O_CREAT : 0); posix_flags |= (truncate ? O_TRUNC : 0); - posix_flage |= (at_eof ? O_APPEND : 0); + posix_flags |= (append ? O_APPEND : 0); SMComm *comm = SMComm::get(); struct stat _stat; - int err = comm->open(s_filename, posix_flags, &stat); + int err = comm->open(filename, posix_flags, &_stat); if (!err) return NULL; - SMDataFile *ret = new SMDataFile(s_filename, posix_flags, append_only, stat); + SMDataFile *ret = new SMDataFile(filename, posix_flags, _stat); return ret; } diff --git a/utils/cloudio/SMFileFactory.h b/utils/cloudio/SMFileFactory.h index 81ac23660..0660398e5 100644 --- a/utils/cloudio/SMFileFactory.h +++ b/utils/cloudio/SMFileFactory.h @@ -1,4 +1,19 @@ -# copy licensing stuff here +/* Copyright (C) 2019 MariaDB Corporaton + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ #ifndef SMFILEFACTORY_H_ #define SMFILEFACTORY_H_ @@ -12,7 +27,7 @@ namespace idbdatafile class SMFileFactory : public FileFactoryBase { public: - IDBDataFile* open(const char* fname, const char* mode, unsigned opts, unsigned colWidth); + IDBDataFile * open(const char* fname, const char* mode, unsigned opts, unsigned colWidth); }; } diff --git a/utils/cloudio/SMFileSystem.cpp b/utils/cloudio/SMFileSystem.cpp index c2e2731e3..97ae01736 100644 --- a/utils/cloudio/SMFileSystem.cpp +++ b/utils/cloudio/SMFileSystem.cpp @@ -1,4 +1,19 @@ -# copy licensing stuff here +/* Copyright (C) 2019 MariaDB Corporaton + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ #include #include "SMFileSystem.h" @@ -12,15 +27,15 @@ namespace idbdatafile SMFileSystem::SMFileSystem() : IDBFileSystem(IDBFileSystem::CLOUD) { - SMComm::getSMComm(); // get SMComm running + SMComm::get(); // get SMComm running } -int SMFileSystem::mkdir(const char *path) +int SMFileSystem::mkdir(const char *path) const { return 0; } -int SMFileSystem::size(const char *filename) const +off64_t SMFileSystem::size(const char *filename) const { struct stat _stat; @@ -71,10 +86,10 @@ bool SMFileSystem::isDir(const char *path) const SMComm *comm = SMComm::get(); struct stat _stat; - int err = comm->stat(path, &stat); + int err = comm->stat(path, &_stat); if (err != 0) return false; // reasonable to throw here? todo, look at what the other classes do. - return (stat.st_mode & S_IFDIR); + return (_stat.st_mode & S_IFDIR); } int SMFileSystem::copyFile(const char *src, const char *dest) const diff --git a/utils/cloudio/SMFileSystem.h b/utils/cloudio/SMFileSystem.h index 9555dfcda..f843e5c3b 100644 --- a/utils/cloudio/SMFileSystem.h +++ b/utils/cloudio/SMFileSystem.h @@ -1,4 +1,19 @@ -# copy licensing stuff here +/* Copyright (C) 2019 MariaDB Corporaton + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ #ifndef SMFILESYSTEM_H_ @@ -18,11 +33,11 @@ class SMFileSystem : public IDBFileSystem, boost::noncopyable SMFileSystem(); virtual ~SMFileSystem(); - int mkdir(const char* pathname); + int mkdir(const char* pathname) const; off64_t size(const char* path) const; off64_t compressedSize(const char* path) const; - int remove(const char* pathname); - int rename(const char* oldpath, const char* newpath); + int remove(const char* pathname) const; + int rename(const char* oldpath, const char* newpath) const; bool exists(const char* pathname) const; int listDirectory(const char* pathname, std::list& contents) const; bool isDir(const char* pathname) const; diff --git a/utils/cloudio/SocketPool.cpp b/utils/cloudio/SocketPool.cpp index d8d290daa..f5130a91d 100644 --- a/utils/cloudio/SocketPool.cpp +++ b/utils/cloudio/SocketPool.cpp @@ -1,11 +1,24 @@ -// copy licensing stuff here +/* Copyright (C) 2019 MariaDB Corporaton + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ #include "SocketPool.h" #include "configcpp.h" -#include "logging.h" -#include "storage-manager/include/messageFormat.h" +#include "logger.h" +#include "messageFormat.h" #include #include @@ -36,21 +49,24 @@ SocketPool::SocketPool() { stmp = config->getConfig("StorageManager", "MaxSockets"); itmp = strtol(stmp.c_str(), NULL, 10); - if (itmp > 500 || itmp < 1) { - string errmsg = "SocketPool(): Got a bad value '" + stmp + "' for StorageManager/MaxSockets."; - log(logging::CRITICAL, errmsg); + if (itmp > 500 || itmp < 1) + { + string errmsg = "SocketPool(): Got a bad value '" + stmp + "' for StorageManager/MaxSockets. Range is 1-500."; + log(logging::LOG_TYPE_CRITICAL, errmsg); throw runtime_error(errmsg); + } maxSockets = itmp; } catch (exception &e) { ostringstream os; os << "SocketPool(): Using default of " << defaultSockets << "."; - log(logging::CRITICAL, os.str()); + log(logging::LOG_TYPE_CRITICAL, os.str()); maxSockets = defaultSockets; } clientSocket = ::socket(AF_UNIX, SOCK_STREAM, 0); - if (clientSocket < 0) { + if (clientSocket < 0) + { char buf[80], *ptr; ptr = strerror_r(errno, buf, 80); throw runtime_error("SocketPool(): Failed to get clientSocket, got " + string(ptr)); @@ -61,51 +77,53 @@ SocketPool::~SocketPool() { boost::mutex::scoped_lock(mutex); - while (!allSockets.empty()) { - int next = allSockets.front(); - allSockets.pop_front(); - ::close(next); - } - ::close(clientSocket) + for (uint i = 0; i < allSockets.size(); i++) + ::close(allSockets[i]); + ::close(clientSocket); } -int SocketPool::send_recv(const messageqcpp::ByteStream &in, messageqcpp::ByteStream *out) +#define sm_check_error \ + if (err < 0) \ + { \ + returnSocket(sock); \ + return -1; \ + } + +int SocketPool::send_recv(messageqcpp::ByteStream &in, messageqcpp::ByteStream *out) { uint count = 0; uint length = in.length(); int sock = getSocket(); - uint8_t inbuf* = in.buf(); + const uint8_t *inbuf = in.buf(); int err = 0; - ::write(sock, &storagemanager::SM_MSG_START, sizeof(storagemanager::SM_MSG_START)); - ::write(sock, &length, sizeof(length)); + /* TODO: make these writes not send SIGPIPE */ + err = ::write(sock, &storagemanager::SM_MSG_START, sizeof(storagemanager::SM_MSG_START)); + sm_check_error; + err = ::write(sock, &length, sizeof(length)); + sm_check_error; while (count < length) { err = ::write(sock, &inbuf[count], length-count); - if (err < 0) - { - returnSocket(sock); - return -1; - } + sm_check_error; count += err; + in.advance(err); } out->restart(); uint8_t *outbuf; uint8_t window[8192]; - bool foundheader = false; + bool foundHeader = false; length = 0; uint remainingBytes = 0; - int i; - while (!foundheader) + uint i; + + /* TODO: consider adding timeouts on msg recv if we start using tcp sockets */ + while (!foundHeader) { // here remainingBytes means the # of bytes from the previous message err = ::read(sock, &window[remainingBytes], 8192 - remainingBytes); - if (err < 0) - { - returnSocket(sock); - return -1; - } + sm_check_error; uint endOfData = remainingBytes + err; // for clarity // scan for the 8-byte header. If it is fragmented, move the fragment to the front of the buffer @@ -129,25 +147,22 @@ int SocketPool::send_recv(const messageqcpp::ByteStream &in, messageqcpp::ByteSt } else { + // copy the payload fragment we got into the output bytestream out->needAtLeast(length); - outbuf = out->buf(); + outbuf = out->getInputPtr(); memcpy(outbuf, &window[i+8], endOfData - (i+8)); remainingBytes = length - (endOfData - (i+8)); // remainingBytes is now the # of bytes left to read } } - // read the rest of the payload + // read the rest of the payload directly into the output bytestream while (remainingBytes > 0) { err = ::read(sock, &outbuf[length - remainingBytes], remainingBytes); - if (err < 0) - { - returnSocket(sock); - return -1; - } + sm_check_error; remainingBytes -= err; + out->advanceInputPtr(err); } - returnSocket(sock); return 0; } @@ -164,9 +179,18 @@ int SocketPool::getSocket() memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX; strcpy(&addr.sun_path[0], storagemanager::socket_name); - ret = ::connect(tmp, &addr, sizeof(addr)); + ret = ::connect(clientSocket, (const struct sockaddr *) &addr, sizeof(addr)); if (ret >= 0) allSockets.push_back(ret); + else + { + int saved_errno = errno; + ostringstream os; + char buf[80]; + os << "SocketPool::getSocket() failed to connect; got '" << strerror_r(saved_errno, buf, 80); + log(logging::LOG_TYPE_CRITICAL, os.str()); + errno = saved_errno; + } return ret; } @@ -179,7 +203,7 @@ int SocketPool::getSocket() return ret; } -void SocketPool::returnSocket(int sock) +void SocketPool::returnSocket(const int sock) { boost::mutex::scoped_lock lock(mutex); freeSockets.push_back(sock); diff --git a/utils/cloudio/SocketPool.h b/utils/cloudio/SocketPool.h index df6ba7fe3..60468bee3 100644 --- a/utils/cloudio/SocketPool.h +++ b/utils/cloudio/SocketPool.h @@ -1,10 +1,26 @@ -// copy licensing stuff here +/* Copyright (C) 2019 MariaDB Corporaton + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ #ifndef _SOCKETPOOL_H_ #define _SOCKETPOOL_H_ #include #include +#include #include "bytestream.h" @@ -21,19 +37,19 @@ class SocketPool : public boost::noncopyable virtual ~SocketPool(); // 0 = success, -1 = failure. Should this throw instead? - int send_recv(const ByteStream &to_send, ByteStream *to_recv); + int send_recv(messageqcpp::ByteStream &to_send, messageqcpp::ByteStream *to_recv); private: int getSocket(); - void returnSocket(); + void returnSocket(const int sock); std::vector allSockets; std::deque freeSockets; boost::mutex mutex; - boost::condition socketAvailable; + boost::condition_variable socketAvailable; int clientSocket; - int maxSockets; - static const int defaultSockets = 20; + uint maxSockets; + static const uint defaultSockets = 20; }; } diff --git a/utils/cloudio/sm_exceptions.h b/utils/cloudio/sm_exceptions.h index daa41851f..61e7581c1 100644 --- a/utils/cloudio/sm_exceptions.h +++ b/utils/cloudio/sm_exceptions.h @@ -1,3 +1,20 @@ +/* Copyright (C) 2019 MariaDB Corporaton + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + #ifndef _SMEXECEPTIONS_H_ #define _SMEXECEPTIONS_H_ @@ -6,37 +23,15 @@ namespace idbdatafile { -class NotImplementedYet : public std::exception +class NotImplementedYet : public std::logic_error { public: NotImplementedYet(const std::string &s); }; - -class FailedToSend : public std::runtime_error -{ - public: - FailedToSend(const std::string &s); -}; - -class FailedToRecv : public std::runtime_error -{ - public: - FailedToRecv(const std::string &s); -}; NotImplementedYet::NotImplementedYet(const std::string &s) : - std::exception(s + "() isn't implemented yet.") -{ -} - -FailedToSend::FailedToSend(const std::string &s) : - std::runtime_error(s) -{ -} - -FailedToRecv::FailedToRecv(const std::string &s) : - std::runtime_error(s) + std::logic_error(s) { } diff --git a/utils/messageqcpp/CMakeLists.txt b/utils/messageqcpp/CMakeLists.txt index 5fc8a39ab..4f7c0e24c 100644 --- a/utils/messageqcpp/CMakeLists.txt +++ b/utils/messageqcpp/CMakeLists.txt @@ -10,7 +10,9 @@ set(messageqcpp_LIB_SRCS socketparms.cpp inetstreamsocket.cpp iosocket.cpp - compressed_iss.cpp) + compressed_iss.cpp + bytestreampool.cpp +) add_library(messageqcpp SHARED ${messageqcpp_LIB_SRCS}) diff --git a/utils/messageqcpp/bytestreampool.cpp b/utils/messageqcpp/bytestreampool.cpp index 765ac449a..3039ea7b0 100644 --- a/utils/messageqcpp/bytestreampool.cpp +++ b/utils/messageqcpp/bytestreampool.cpp @@ -71,7 +71,7 @@ void ByteStreamPool::returnByteStream(ByteStream *bs) else { boost::mutex::scoped_lock s(mutex); - if (freeByteStream.size() > maxFreeBuffers) + if (freeByteStreams.size() > maxFreeBuffers) delete bs; else { bs->restart(); diff --git a/utils/messageqcpp/bytestreampool.h b/utils/messageqcpp/bytestreampool.h index 075a29fe9..443964b16 100644 --- a/utils/messageqcpp/bytestreampool.h +++ b/utils/messageqcpp/bytestreampool.h @@ -26,6 +26,7 @@ Initially, 'large' is defined as 1MB. #include #include +#include "bytestream.h" namespace messageqcpp {