1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

Added licensing preamble. Got it to build.

This commit is contained in:
Patrick LeBlanc
2019-01-23 13:02:53 -06:00
parent 88273bfc14
commit 51bb9f3050
17 changed files with 339 additions and 168 deletions

View File

@ -538,4 +538,7 @@
<Host>127.0.0.1</Host> <Host>127.0.0.1</Host>
<Port>0</Port> <Port>0</Port>
</QueryTele> </QueryTele>
<StorageManager>
<MaxSockets>30</MaxSockets>
</StorageManager>
</Columnstore> </Columnstore>

View File

@ -26,4 +26,5 @@ add_subdirectory(querytele)
add_subdirectory(clusterTester) add_subdirectory(clusterTester)
add_subdirectory(libmysql_client) add_subdirectory(libmysql_client)
add_subdirectory(regr) add_subdirectory(regr)
add_subdirectory(cloudio)

View File

@ -0,0 +1,10 @@
include_directories(${ENGINE_COMMON_INCLUDES} storage-manager/include)
set(cloudio_LIB_SRCS SMComm.cpp SMDataFile.cpp SMFileFactory.cpp SMFileSystem.cpp SocketPool.cpp)
add_library(cloudio SHARED ${cloudio_LIB_SRCS})
set_target_properties(cloudio PROPERTIES VERSION 1.0.0 SOVERSION 1)
install(TARGETS cloudio DESTINATION ${ENGINE_LIBDIR} COMPONENT libs)

View File

@ -1,24 +1,36 @@
// copy licensing stuff here /* Copyright (C) 2019 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include "SMComm.h" #include "SMComm.h"
#include "bytestream.h" #include "messageFormat.h"
#include <boost/thread/mutex.hpp>
using namespace std; using namespace std;
using namespace messageqcpp; using namespace messageqcpp;
namespace namespace
{ {
SMComm *instance = NULL; idbdatafile::SMComm *instance = NULL;
boost::mutex m; boost::mutex m;
} };
namespace idbdatafile namespace idbdatafile
{ {
static SMComm * SMComm::get() SMComm * SMComm::get()
{ {
if (instance) if (instance)
return instance; return instance;
@ -55,13 +67,13 @@ static SMComm * SMComm::get()
} \ } \
} }
int SMComm::open(const string &filename, int mode, struct stat *statbuf) int SMComm::open(const string &filename, const int mode, struct stat *statbuf)
{ {
ByteStream *command = buffers.getByteStream(); ByteStream *command = buffers.getByteStream();
ByteStream *response = buffers.getByteStream(); ByteStream *response = buffers.getByteStream();
int err; int err;
command << storagemanager::OPEN << filename << mode; *command << (uint8_t) storagemanager::OPEN << filename << mode;
err = sockets.send_recv(*command, response); err = sockets.send_recv(*command, response);
if (err) if (err)
common_exit(command, response, err); common_exit(command, response, err);
@ -71,13 +83,13 @@ int SMComm::open(const string &filename, int mode, struct stat *statbuf)
common_exit(command, response, err); common_exit(command, response, err);
} }
ssize_t SMComm::pread(const string &filename, const void *buf, size_t count, off_t offset) ssize_t SMComm::pread(const string &filename, void *buf, const size_t count, const off_t offset)
{ {
ByteStream *command = buffers.getByteStream(); ByteStream *command = buffers.getByteStream();
ByteStream *response = buffers.getByteStream(); ByteStream *response = buffers.getByteStream();
int err; int err;
command << storagemanager::READ << filename << count << offset; *command << (uint8_t) storagemanager::READ << filename << count << offset;
err = sockets.send_recv(*command, response); err = sockets.send_recv(*command, response);
if (err) if (err)
common_exit(command, response, err); common_exit(command, response, err);
@ -87,14 +99,14 @@ ssize_t SMComm::pread(const string &filename, const void *buf, size_t count, off
common_exit(command, response, err); common_exit(command, response, err);
} }
ssize_t SMComm::pwrite(const string &filename, const void *buf, size_t count, off_t offset) ssize_t SMComm::pwrite(const string &filename, const void *buf, const size_t count, const off_t offset)
{ {
ByteStream *command = buffers.getByteStream(); ByteStream *command = buffers.getByteStream();
ByteStream *response = buffers.getByteStream(); ByteStream *response = buffers.getByteStream();
int err; int err;
command << storagemanager::WRITE << filename << count << offset; *command << (uint8_t) storagemanager::WRITE << filename << count << offset;
command.needAtLeast(count); command->needAtLeast(count);
uint8_t *cmdBuf = command->getInputPtr(); uint8_t *cmdBuf = command->getInputPtr();
memcpy(cmdBuf, buf, count); memcpy(cmdBuf, buf, count);
command->advanceInputPtr(count); command->advanceInputPtr(count);
@ -105,14 +117,14 @@ ssize_t SMComm::pwrite(const string &filename, const void *buf, size_t count, of
common_exit(command, response, err); common_exit(command, response, err);
} }
ssize_t SMComm::append(const string &filename, const void *buf, size_t count) ssize_t SMComm::append(const string &filename, const void *buf, const size_t count)
{ {
ByteStream *command = buffers.getByteStream(); ByteStream *command = buffers.getByteStream();
ByteStream *response = buffers.getByteStream(); ByteStream *response = buffers.getByteStream();
int err; int err;
command << storagemanager::APPEND << filename << count; *command << (uint8_t) storagemanager::APPEND << filename << count;
command.needAtLeast(count); command->needAtLeast(count);
uint8_t *cmdBuf = command->getInputPtr(); uint8_t *cmdBuf = command->getInputPtr();
memcpy(cmdBuf, buf, count); memcpy(cmdBuf, buf, count);
command->advanceInputPtr(count); command->advanceInputPtr(count);
@ -129,7 +141,7 @@ int SMComm::unlink(const string &filename)
ByteStream *response = buffers.getByteStream(); ByteStream *response = buffers.getByteStream();
int err; int err;
command << storagemanager::UNLINK << filename; *command << (uint8_t) storagemanager::UNLINK << filename;
err = sockets.send_recv(*command, response); err = sockets.send_recv(*command, response);
if (err) if (err)
common_exit(command, response, err); common_exit(command, response, err);
@ -143,7 +155,7 @@ int SMComm::stat(const string &filename, struct stat *statbuf)
ByteStream *response = buffers.getByteStream(); ByteStream *response = buffers.getByteStream();
int err; int err;
command << storageManager::STAT << filename; *command << (uint8_t) storagemanager::STAT << filename;
err = sockets.send_recv(*command, response); err = sockets.send_recv(*command, response);
if (err) if (err)
common_exit(command, response, err); common_exit(command, response, err);
@ -153,13 +165,13 @@ int SMComm::stat(const string &filename, struct stat *statbuf)
common_exit(command, response, err); common_exit(command, response, err);
} }
int SMComm::truncate(const string &filename, off64_t length) int SMComm::truncate(const string &filename, const off64_t length)
{ {
ByteStream *command = buffers.getByteStream(); ByteStream *command = buffers.getByteStream();
ByteStream *response = buffers.getByteStream(); ByteStream *response = buffers.getByteStream();
int err; int err;
command << storagemanager::TRUNCATE << filename << length; *command << (uint8_t) storagemanager::TRUNCATE << filename << length;
err = sockets.send_recv(*command, response); err = sockets.send_recv(*command, response);
if (err) if (err)
common_exit(command, response, err); common_exit(command, response, err);
@ -173,7 +185,7 @@ int SMComm::listDirectory(const string &path, list<string> *entries)
ByteStream *response = buffers.getByteStream(); ByteStream *response = buffers.getByteStream();
int err; int err;
command << storagemanager::LIST_DIRECTORY << path; *command << (uint8_t) storagemanager::LIST_DIRECTORY << path;
err = sockets.send_recv(*command, response); err = sockets.send_recv(*command, response);
if (err) if (err)
common_exit(command, response, err); common_exit(command, response, err);
@ -196,7 +208,7 @@ int SMComm::ping()
ByteStream *response = buffers.getByteStream(); ByteStream *response = buffers.getByteStream();
int err; int err;
command << storagemanager::PING << filename << length; *command << (uint8_t) storagemanager::PING;
err = sockets.send_recv(*command, response); err = sockets.send_recv(*command, response);
if (err) if (err)
common_exit(command, response, err); common_exit(command, response, err);

View File

@ -1,14 +1,31 @@
# copy some licensing stuff here /* Copyright (C) 2019 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#ifndef SMCOMM_H_ #ifndef SMCOMM_H_
#define SMCOMM_H_ #define SMCOMM_H_
#include <sys/types.h> #include <sys/stat.h>
#include <string> #include <string>
#include "SocketPool.h" #include "SocketPool.h"
#include "bytestream.h" #include "bytestream.h"
#include "bytestreampool.h"
namespace idbdatafile { namespace idbdatafile
{
class SMComm : public boost::noncopyable class SMComm : public boost::noncopyable
{ {
@ -18,15 +35,15 @@ class SMComm : public boost::noncopyable
/* Open currently returns a stat struct so SMDataFile can set its initial position, otherwise /* Open currently returns a stat struct so SMDataFile can set its initial position, otherwise
behaves how you'd think. */ behaves how you'd think. */
int open(const std::string &filename, int mode, struct stat *statbuf); int open(const std::string &filename, const int mode, struct stat *statbuf);
ssize_t pread(const std::string &filename, const void *buf, size_t count, off_t offset); ssize_t pread(const std::string &filename, void *buf, const size_t count, const off_t offset);
ssize_t pwrite(const std::string &filename, const void *buf, size_t count, off_t offset); ssize_t pwrite(const std::string &filename, const void *buf, const size_t count, const off_t offset);
/* append exists for cases where the file is open in append mode. A normal write won't work /* append exists for cases where the file is open in append mode. A normal write won't work
because the file position/size may be out of date if there are multiple writers. */ because the file position/size may be out of date if there are multiple writers. */
ssize_t append(const std::string &filename, const void *buf, size_t count); ssize_t append(const std::string &filename, const void *buf, const size_t count);
int unlink(const std::string &filename); int unlink(const std::string &filename);
@ -34,7 +51,7 @@ class SMComm : public boost::noncopyable
// added this one because it should be trivial to implement in SM, and prevents a large // added this one because it should be trivial to implement in SM, and prevents a large
// operation in SMDataFile. // operation in SMDataFile.
int truncate(const std::string &filename, off64_t length); int truncate(const std::string &filename, const off64_t length);
int listDirectory(const std::string &path, std::list<std::string> *entries); int listDirectory(const std::string &path, std::list<std::string> *entries);
@ -48,7 +65,7 @@ class SMComm : public boost::noncopyable
SMComm(); SMComm();
SocketPool sockets; SocketPool sockets;
ByteStreamPool buffers; messageqcpp::ByteStreamPool buffers;
}; };
} }

View File

@ -1,6 +1,21 @@
// copy licensing stuff here /* Copyright (C) 2019 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <fcntl.h>
#include "SMDataFile.h" #include "SMDataFile.h"
@ -51,13 +66,15 @@ int SMDataFile::seek(off64_t offset, int whence)
case SEEK_CUR: case SEEK_CUR:
position += offset; position += offset;
break; break;
case SEEK_END: case SEEK_END:
{
struct stat _stat; struct stat _stat;
int err = comm->stat(name(), &_stat); int err = comm->stat(name(), &_stat);
if (err) if (err)
return err; return err;
position = _stat.st_size + offset; position = _stat.st_size + offset;
break; break;
}
default: default:
errno = EINVAL; errno = EINVAL;
return -1; return -1;
@ -101,6 +118,9 @@ time_t SMDataFile::mtime()
return _stat.st_mtime; return _stat.st_mtime;
} }
int SMDataFile::close()
{
return 0;
}
} }

View File

@ -1,6 +1,20 @@
# copy some licensing stuff here /* Copyright (C) 2019 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#ifndef SMDATAFILE_H_ #ifndef SMDATAFILE_H_
#define SMDATAFILE_H_ #define SMDATAFILE_H_
@ -26,6 +40,7 @@ class SMDataFile : public IDBDataFile
off64_t tell(); off64_t tell();
int flush(); int flush();
time_t mtime(); time_t mtime();
int close();
private: private:
SMDataFile(); SMDataFile();

View File

@ -1,7 +1,24 @@
# copy licensing stuff here /* Copyright (C) 2019 MariaDB Corporaton
#include <sys/types.h> This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <sys/stat.h>
#include <fcntl.h>
#include "SMFileFactory.h" #include "SMFileFactory.h"
#include "SMDataFile.h"
#include "SMComm.h" #include "SMComm.h"
using namespace std; using namespace std;
@ -9,64 +26,57 @@ using namespace std;
namespace idbdatafile { namespace idbdatafile {
#define _toUint64(ptr_to_eight_bytes) *((uint64_t *) ptr_to_eight_bytes) IDBDataFile* SMFileFactory::open(const char *filename, const char *mode, unsigned opts, unsigned colWidth)
IDBDataFile* SMFileFactory::open(const char *filename, const char *mode, uint opts, uint colWidth)
{ {
bool _read = false; bool _read = false;
bool _write = false; bool _write = false;
bool at_eof = false;
bool create = false; bool create = false;
bool truncate = false; bool truncate = false;
bool append_only = false; bool append = false;
string s_filename = filename;
// strip 'b' chars from mode // strip 'b' chars from mode
char newmode[8] = {'\0'}; // there'd better not be 7 chars in the mode string char newmode[8] = {'\0'}; // there'd better not be 7 chars in the mode string
int i = 0; int i = 0;
for (char *c = mode; *c != '\0' && i < 8; c++) for (const char *c = mode; *c != '\0' && i < 8; c++)
if (*c != 'b') if (*c != 'b')
newmode[i++] = *c; newmode[i++] = *c;
if (i == 8) if (i == 8) {
errno = EINVAL;
return NULL; return NULL;
// I hate dealing with C-lib file IO. This is ugly but fast.
switch (_toUint64(newmode)) {
case _toUint64("r\0\0\0\0\0\0\0"):
_read = true;
break;
case _toUint64("r+\0\0\0\0\0\0"):
_read = true;
_write = true;
break;
case _toUint64("w\0\0\0\0\0\0\0"):
_write = true;
truncate = true;
create = true;
break;
case _toUint64("w+\0\0\0\0\0\0"):
_read = true;
_write = true;
truncate = true;
create = true;
break;
case _toUint64("a\0\0\0\0\0\0\0"):
_write = true;
create = true;
at_eof = true;
break;
case _toUint64("a+\0\0\0\0\0\0"):
_read = true;
_write = true;
create = true;
append_only = true;
break;
default:
return NULL;
} }
// parse the new mode string
if (newmode[0] == 'r')
{
_read = true;
if (newmode[1] == '+')
_write = true;
}
else if (newmode[0] == 'w')
{
_write = true;
truncate = true;
create = true;
if (newmode[1] == '+')
_read = true;
}
else if (newmode[0] == 'a')
{
_write = true;
create = true;
append = true;
if (newmode[1] == '+')
_read = true;
}
else
{
errno = EINVAL;
return NULL;
}
// turn newmode into posix flags
uint posix_flags = 0; uint posix_flags = 0;
if (_read && write) if (_read && _write)
posix_flags |= O_RDWR; posix_flags |= O_RDWR;
else if (_read) else if (_read)
posix_flags |= O_RDONLY; posix_flags |= O_RDONLY;
@ -75,15 +85,15 @@ IDBDataFile* SMFileFactory::open(const char *filename, const char *mode, uint op
posix_flags |= (create ? O_CREAT : 0); posix_flags |= (create ? O_CREAT : 0);
posix_flags |= (truncate ? O_TRUNC : 0); posix_flags |= (truncate ? O_TRUNC : 0);
posix_flage |= (at_eof ? O_APPEND : 0); posix_flags |= (append ? O_APPEND : 0);
SMComm *comm = SMComm::get(); SMComm *comm = SMComm::get();
struct stat _stat; struct stat _stat;
int err = comm->open(s_filename, posix_flags, &stat); int err = comm->open(filename, posix_flags, &_stat);
if (!err) if (!err)
return NULL; return NULL;
SMDataFile *ret = new SMDataFile(s_filename, posix_flags, append_only, stat); SMDataFile *ret = new SMDataFile(filename, posix_flags, _stat);
return ret; return ret;
} }

View File

@ -1,4 +1,19 @@
# copy licensing stuff here /* Copyright (C) 2019 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#ifndef SMFILEFACTORY_H_ #ifndef SMFILEFACTORY_H_
#define SMFILEFACTORY_H_ #define SMFILEFACTORY_H_
@ -12,7 +27,7 @@ namespace idbdatafile
class SMFileFactory : public FileFactoryBase class SMFileFactory : public FileFactoryBase
{ {
public: public:
IDBDataFile* open(const char* fname, const char* mode, unsigned opts, unsigned colWidth); IDBDataFile * open(const char* fname, const char* mode, unsigned opts, unsigned colWidth);
}; };
} }

View File

@ -1,4 +1,19 @@
# copy licensing stuff here /* Copyright (C) 2019 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <sys/types.h> #include <sys/types.h>
#include "SMFileSystem.h" #include "SMFileSystem.h"
@ -12,15 +27,15 @@ namespace idbdatafile
SMFileSystem::SMFileSystem() : IDBFileSystem(IDBFileSystem::CLOUD) SMFileSystem::SMFileSystem() : IDBFileSystem(IDBFileSystem::CLOUD)
{ {
SMComm::getSMComm(); // get SMComm running SMComm::get(); // get SMComm running
} }
int SMFileSystem::mkdir(const char *path) int SMFileSystem::mkdir(const char *path) const
{ {
return 0; return 0;
} }
int SMFileSystem::size(const char *filename) const off64_t SMFileSystem::size(const char *filename) const
{ {
struct stat _stat; struct stat _stat;
@ -71,10 +86,10 @@ bool SMFileSystem::isDir(const char *path) const
SMComm *comm = SMComm::get(); SMComm *comm = SMComm::get();
struct stat _stat; struct stat _stat;
int err = comm->stat(path, &stat); int err = comm->stat(path, &_stat);
if (err != 0) if (err != 0)
return false; // reasonable to throw here? todo, look at what the other classes do. return false; // reasonable to throw here? todo, look at what the other classes do.
return (stat.st_mode & S_IFDIR); return (_stat.st_mode & S_IFDIR);
} }
int SMFileSystem::copyFile(const char *src, const char *dest) const int SMFileSystem::copyFile(const char *src, const char *dest) const

View File

@ -1,4 +1,19 @@
# copy licensing stuff here /* Copyright (C) 2019 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#ifndef SMFILESYSTEM_H_ #ifndef SMFILESYSTEM_H_
@ -18,11 +33,11 @@ class SMFileSystem : public IDBFileSystem, boost::noncopyable
SMFileSystem(); SMFileSystem();
virtual ~SMFileSystem(); virtual ~SMFileSystem();
int mkdir(const char* pathname); int mkdir(const char* pathname) const;
off64_t size(const char* path) const; off64_t size(const char* path) const;
off64_t compressedSize(const char* path) const; off64_t compressedSize(const char* path) const;
int remove(const char* pathname); int remove(const char* pathname) const;
int rename(const char* oldpath, const char* newpath); int rename(const char* oldpath, const char* newpath) const;
bool exists(const char* pathname) const; bool exists(const char* pathname) const;
int listDirectory(const char* pathname, std::list<std::string>& contents) const; int listDirectory(const char* pathname, std::list<std::string>& contents) const;
bool isDir(const char* pathname) const; bool isDir(const char* pathname) const;

View File

@ -1,11 +1,24 @@
// copy licensing stuff here /* Copyright (C) 2019 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include "SocketPool.h" #include "SocketPool.h"
#include "configcpp.h" #include "configcpp.h"
#include "logging.h" #include "logger.h"
#include "storage-manager/include/messageFormat.h" #include "messageFormat.h"
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/un.h> #include <sys/un.h>
@ -36,21 +49,24 @@ SocketPool::SocketPool()
{ {
stmp = config->getConfig("StorageManager", "MaxSockets"); stmp = config->getConfig("StorageManager", "MaxSockets");
itmp = strtol(stmp.c_str(), NULL, 10); itmp = strtol(stmp.c_str(), NULL, 10);
if (itmp > 500 || itmp < 1) { if (itmp > 500 || itmp < 1)
string errmsg = "SocketPool(): Got a bad value '" + stmp + "' for StorageManager/MaxSockets."; {
log(logging::CRITICAL, errmsg); string errmsg = "SocketPool(): Got a bad value '" + stmp + "' for StorageManager/MaxSockets. Range is 1-500.";
log(logging::LOG_TYPE_CRITICAL, errmsg);
throw runtime_error(errmsg); throw runtime_error(errmsg);
}
maxSockets = itmp; maxSockets = itmp;
} }
catch (exception &e) catch (exception &e)
{ {
ostringstream os; ostringstream os;
os << "SocketPool(): Using default of " << defaultSockets << "."; os << "SocketPool(): Using default of " << defaultSockets << ".";
log(logging::CRITICAL, os.str()); log(logging::LOG_TYPE_CRITICAL, os.str());
maxSockets = defaultSockets; maxSockets = defaultSockets;
} }
clientSocket = ::socket(AF_UNIX, SOCK_STREAM, 0); clientSocket = ::socket(AF_UNIX, SOCK_STREAM, 0);
if (clientSocket < 0) { if (clientSocket < 0)
{
char buf[80], *ptr; char buf[80], *ptr;
ptr = strerror_r(errno, buf, 80); ptr = strerror_r(errno, buf, 80);
throw runtime_error("SocketPool(): Failed to get clientSocket, got " + string(ptr)); throw runtime_error("SocketPool(): Failed to get clientSocket, got " + string(ptr));
@ -61,51 +77,53 @@ SocketPool::~SocketPool()
{ {
boost::mutex::scoped_lock(mutex); boost::mutex::scoped_lock(mutex);
while (!allSockets.empty()) { for (uint i = 0; i < allSockets.size(); i++)
int next = allSockets.front(); ::close(allSockets[i]);
allSockets.pop_front(); ::close(clientSocket);
::close(next);
}
::close(clientSocket)
} }
int SocketPool::send_recv(const messageqcpp::ByteStream &in, messageqcpp::ByteStream *out) #define sm_check_error \
if (err < 0) \
{ \
returnSocket(sock); \
return -1; \
}
int SocketPool::send_recv(messageqcpp::ByteStream &in, messageqcpp::ByteStream *out)
{ {
uint count = 0; uint count = 0;
uint length = in.length(); uint length = in.length();
int sock = getSocket(); int sock = getSocket();
uint8_t inbuf* = in.buf(); const uint8_t *inbuf = in.buf();
int err = 0; int err = 0;
::write(sock, &storagemanager::SM_MSG_START, sizeof(storagemanager::SM_MSG_START)); /* TODO: make these writes not send SIGPIPE */
::write(sock, &length, sizeof(length)); err = ::write(sock, &storagemanager::SM_MSG_START, sizeof(storagemanager::SM_MSG_START));
sm_check_error;
err = ::write(sock, &length, sizeof(length));
sm_check_error;
while (count < length) while (count < length)
{ {
err = ::write(sock, &inbuf[count], length-count); err = ::write(sock, &inbuf[count], length-count);
if (err < 0) sm_check_error;
{
returnSocket(sock);
return -1;
}
count += err; count += err;
in.advance(err);
} }
out->restart(); out->restart();
uint8_t *outbuf; uint8_t *outbuf;
uint8_t window[8192]; uint8_t window[8192];
bool foundheader = false; bool foundHeader = false;
length = 0; length = 0;
uint remainingBytes = 0; uint remainingBytes = 0;
int i; uint i;
while (!foundheader)
/* TODO: consider adding timeouts on msg recv if we start using tcp sockets */
while (!foundHeader)
{ {
// here remainingBytes means the # of bytes from the previous message // here remainingBytes means the # of bytes from the previous message
err = ::read(sock, &window[remainingBytes], 8192 - remainingBytes); err = ::read(sock, &window[remainingBytes], 8192 - remainingBytes);
if (err < 0) sm_check_error;
{
returnSocket(sock);
return -1;
}
uint endOfData = remainingBytes + err; // for clarity uint endOfData = remainingBytes + err; // for clarity
// scan for the 8-byte header. If it is fragmented, move the fragment to the front of the buffer // scan for the 8-byte header. If it is fragmented, move the fragment to the front of the buffer
@ -129,25 +147,22 @@ int SocketPool::send_recv(const messageqcpp::ByteStream &in, messageqcpp::ByteSt
} }
else else
{ {
// copy the payload fragment we got into the output bytestream
out->needAtLeast(length); out->needAtLeast(length);
outbuf = out->buf(); outbuf = out->getInputPtr();
memcpy(outbuf, &window[i+8], endOfData - (i+8)); memcpy(outbuf, &window[i+8], endOfData - (i+8));
remainingBytes = length - (endOfData - (i+8)); // remainingBytes is now the # of bytes left to read remainingBytes = length - (endOfData - (i+8)); // remainingBytes is now the # of bytes left to read
} }
} }
// read the rest of the payload // read the rest of the payload directly into the output bytestream
while (remainingBytes > 0) while (remainingBytes > 0)
{ {
err = ::read(sock, &outbuf[length - remainingBytes], remainingBytes); err = ::read(sock, &outbuf[length - remainingBytes], remainingBytes);
if (err < 0) sm_check_error;
{
returnSocket(sock);
return -1;
}
remainingBytes -= err; remainingBytes -= err;
out->advanceInputPtr(err);
} }
returnSocket(sock); returnSocket(sock);
return 0; return 0;
} }
@ -164,9 +179,18 @@ int SocketPool::getSocket()
memset(&addr, 0, sizeof(addr)); memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX; addr.sun_family = AF_UNIX;
strcpy(&addr.sun_path[0], storagemanager::socket_name); strcpy(&addr.sun_path[0], storagemanager::socket_name);
ret = ::connect(tmp, &addr, sizeof(addr)); ret = ::connect(clientSocket, (const struct sockaddr *) &addr, sizeof(addr));
if (ret >= 0) if (ret >= 0)
allSockets.push_back(ret); allSockets.push_back(ret);
else
{
int saved_errno = errno;
ostringstream os;
char buf[80];
os << "SocketPool::getSocket() failed to connect; got '" << strerror_r(saved_errno, buf, 80);
log(logging::LOG_TYPE_CRITICAL, os.str());
errno = saved_errno;
}
return ret; return ret;
} }
@ -179,7 +203,7 @@ int SocketPool::getSocket()
return ret; return ret;
} }
void SocketPool::returnSocket(int sock) void SocketPool::returnSocket(const int sock)
{ {
boost::mutex::scoped_lock lock(mutex); boost::mutex::scoped_lock lock(mutex);
freeSockets.push_back(sock); freeSockets.push_back(sock);

View File

@ -1,10 +1,26 @@
// copy licensing stuff here /* Copyright (C) 2019 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#ifndef _SOCKETPOOL_H_ #ifndef _SOCKETPOOL_H_
#define _SOCKETPOOL_H_ #define _SOCKETPOOL_H_
#include <boost/utility.hpp> #include <boost/utility.hpp>
#include <boost/thread/mutex.hpp> #include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include "bytestream.h" #include "bytestream.h"
@ -21,19 +37,19 @@ class SocketPool : public boost::noncopyable
virtual ~SocketPool(); virtual ~SocketPool();
// 0 = success, -1 = failure. Should this throw instead? // 0 = success, -1 = failure. Should this throw instead?
int send_recv(const ByteStream &to_send, ByteStream *to_recv); int send_recv(messageqcpp::ByteStream &to_send, messageqcpp::ByteStream *to_recv);
private: private:
int getSocket(); int getSocket();
void returnSocket(); void returnSocket(const int sock);
std::vector<int> allSockets; std::vector<int> allSockets;
std::deque<int> freeSockets; std::deque<int> freeSockets;
boost::mutex mutex; boost::mutex mutex;
boost::condition socketAvailable; boost::condition_variable socketAvailable;
int clientSocket; int clientSocket;
int maxSockets; uint maxSockets;
static const int defaultSockets = 20; static const uint defaultSockets = 20;
}; };
} }

View File

@ -1,3 +1,20 @@
/* Copyright (C) 2019 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#ifndef _SMEXECEPTIONS_H_ #ifndef _SMEXECEPTIONS_H_
#define _SMEXECEPTIONS_H_ #define _SMEXECEPTIONS_H_
@ -6,37 +23,15 @@
namespace idbdatafile namespace idbdatafile
{ {
class NotImplementedYet : public std::exception class NotImplementedYet : public std::logic_error
{ {
public: public:
NotImplementedYet(const std::string &s); NotImplementedYet(const std::string &s);
}; };
class FailedToSend : public std::runtime_error
{
public:
FailedToSend(const std::string &s);
};
class FailedToRecv : public std::runtime_error
{
public:
FailedToRecv(const std::string &s);
};
NotImplementedYet::NotImplementedYet(const std::string &s) : NotImplementedYet::NotImplementedYet(const std::string &s) :
std::exception(s + "() isn't implemented yet.") std::logic_error(s)
{
}
FailedToSend::FailedToSend(const std::string &s) :
std::runtime_error(s)
{
}
FailedToRecv::FailedToRecv(const std::string &s) :
std::runtime_error(s)
{ {
} }

View File

@ -10,7 +10,9 @@ set(messageqcpp_LIB_SRCS
socketparms.cpp socketparms.cpp
inetstreamsocket.cpp inetstreamsocket.cpp
iosocket.cpp iosocket.cpp
compressed_iss.cpp) compressed_iss.cpp
bytestreampool.cpp
)
add_library(messageqcpp SHARED ${messageqcpp_LIB_SRCS}) add_library(messageqcpp SHARED ${messageqcpp_LIB_SRCS})

View File

@ -71,7 +71,7 @@ void ByteStreamPool::returnByteStream(ByteStream *bs)
else else
{ {
boost::mutex::scoped_lock s(mutex); boost::mutex::scoped_lock s(mutex);
if (freeByteStream.size() > maxFreeBuffers) if (freeByteStreams.size() > maxFreeBuffers)
delete bs; delete bs;
else { else {
bs->restart(); bs->restart();

View File

@ -26,6 +26,7 @@ Initially, 'large' is defined as 1MB.
#include <deque> #include <deque>
#include <boost/thread/mutex.hpp> #include <boost/thread/mutex.hpp>
#include "bytestream.h"
namespace messageqcpp namespace messageqcpp
{ {