1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

Finished the first cut. I assume it doesn't build yet.

This commit is contained in:
Patrick LeBlanc
2019-01-22 16:12:49 -06:00
parent 3d58581945
commit 88273bfc14
5 changed files with 219 additions and 5 deletions

View File

@ -547,5 +547,8 @@
<Host>127.0.0.1</Host> <Host>127.0.0.1</Host>
<Port>0</Port> <Port>0</Port>
</QueryTele> </QueryTele>
<StorageManager>
<MaxSockets>30</MaxSockets>
</StorageManager>
</Columnstore> </Columnstore>

View File

@ -3,5 +3,205 @@
#include "SMComm.h" #include "SMComm.h"
#include "bytestream.h"
#include <boost/thread/mutex.hpp>
using namespace std;
using namespace messageqcpp;
namespace
{
SMComm *instance = NULL;
boost::mutex m;
}
namespace idbdatafile
{
static SMComm * SMComm::get()
{
if (instance)
return instance;
boost::mutex::scoped_lock sl(m);
if (instance)
return instance;
instance = new SMComm();
return instance;
}
// timesavers
#define common_exit(bs1, bs2, retCode) \
{ \
buffers.returnByteStream(bs1); \
buffers.returnByteStream(bs2); \
return retCode; \
}
// bs1 is the bytestream ptr with the command to SMComm.
// bs2 is the bytestream pointer with the response from SMComm.
// retCode is the var to store the return code in from the msg.
// returns with the output pointer at the fcn-specific data
#define check_for_error(bs1, bs2, retCode) \
{ \
int l_errno; \
*bs2 >> retCode; \
if (retCode < 0) \
{ \
*bs2 >> l_errno; \
errno = l_errno; \
common_exit(bs1, bs2, retCode); \
} \
}
int SMComm::open(const string &filename, int mode, struct stat *statbuf)
{
ByteStream *command = buffers.getByteStream();
ByteStream *response = buffers.getByteStream();
int err;
command << storagemanager::OPEN << filename << mode;
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
memcpy(statbuf, response->buf(), sizeof(*statbuf));
common_exit(command, response, err);
}
ssize_t SMComm::pread(const string &filename, const void *buf, size_t count, off_t offset)
{
ByteStream *command = buffers.getByteStream();
ByteStream *response = buffers.getByteStream();
int err;
command << storagemanager::READ << filename << count << offset;
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
memcpy(buf, response->buf(), err);
common_exit(command, response, err);
}
ssize_t SMComm::pwrite(const string &filename, const void *buf, size_t count, off_t offset)
{
ByteStream *command = buffers.getByteStream();
ByteStream *response = buffers.getByteStream();
int err;
command << storagemanager::WRITE << filename << count << offset;
command.needAtLeast(count);
uint8_t *cmdBuf = command->getInputPtr();
memcpy(cmdBuf, buf, count);
command->advanceInputPtr(count);
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
common_exit(command, response, err);
}
ssize_t SMComm::append(const string &filename, const void *buf, size_t count)
{
ByteStream *command = buffers.getByteStream();
ByteStream *response = buffers.getByteStream();
int err;
command << storagemanager::APPEND << filename << count;
command.needAtLeast(count);
uint8_t *cmdBuf = command->getInputPtr();
memcpy(cmdBuf, buf, count);
command->advanceInputPtr(count);
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
common_exit(command, response, err);
}
int SMComm::unlink(const string &filename)
{
ByteStream *command = buffers.getByteStream();
ByteStream *response = buffers.getByteStream();
int err;
command << storagemanager::UNLINK << filename;
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
common_exit(command, response, err);
}
int SMComm::stat(const string &filename, struct stat *statbuf)
{
ByteStream *command = buffers.getByteStream();
ByteStream *response = buffers.getByteStream();
int err;
command << storageManager::STAT << filename;
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
memcpy(statbuf, response->buf(), sizeof(*statbuf));
common_exit(command, response, err);
}
int SMComm::truncate(const string &filename, off64_t length)
{
ByteStream *command = buffers.getByteStream();
ByteStream *response = buffers.getByteStream();
int err;
command << storagemanager::TRUNCATE << filename << length;
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
common_exit(command, response, err);
}
int SMComm::listDirectory(const string &path, list<string> *entries)
{
ByteStream *command = buffers.getByteStream();
ByteStream *response = buffers.getByteStream();
int err;
command << storagemanager::LIST_DIRECTORY << path;
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
uint32_t numElements;
string stmp;
entries->clear();
*response >> numElements;
while (numElements > 0) {
*response >> stmp;
entries->push_back(stmp);
}
common_exit(command, response, err);
}
int SMComm::ping()
{
ByteStream *command = buffers.getByteStream();
ByteStream *response = buffers.getByteStream();
int err;
command << storagemanager::PING << filename << length;
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
common_exit(command, response, err);
}
}

View File

@ -3,6 +3,11 @@
#ifndef SMCOMM_H_ #ifndef SMCOMM_H_
#define SMCOMM_H_ #define SMCOMM_H_
#include <sys/types.h>
#include <string>
#include "SocketPool.h"
#include "bytestream.h"
namespace idbdatafile { namespace idbdatafile {
class SMComm : public boost::noncopyable class SMComm : public boost::noncopyable
@ -20,7 +25,7 @@ class SMComm : public boost::noncopyable
ssize_t pwrite(const std::string &filename, const void *buf, size_t count, off_t offset); ssize_t pwrite(const std::string &filename, const void *buf, size_t count, off_t offset);
/* append exists for cases where the file is open in append mode. A normal write won't work /* append exists for cases where the file is open in append mode. A normal write won't work
because the file position may be out of date if there are multiple writers. */ because the file position/size may be out of date if there are multiple writers. */
ssize_t append(const std::string &filename, const void *buf, size_t count); ssize_t append(const std::string &filename, const void *buf, size_t count);
int unlink(const std::string &filename); int unlink(const std::string &filename);
@ -43,6 +48,7 @@ class SMComm : public boost::noncopyable
SMComm(); SMComm();
SocketPool sockets; SocketPool sockets;
ByteStreamPool buffers;
}; };
} }

View File

@ -10,6 +10,7 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/un.h> #include <sys/un.h>
using namespace std;
namespace namespace
{ {
@ -60,8 +61,11 @@ SocketPool::~SocketPool()
{ {
boost::mutex::scoped_lock(mutex); boost::mutex::scoped_lock(mutex);
for (int i = 0; i < allSockets.size(); i++) while (!allSockets.empty()) {
::close(allSockets[i]); int next = allSockets.front();
allSockets.pop_front();
::close(next);
}
::close(clientSocket) ::close(clientSocket)
} }
@ -145,7 +149,7 @@ int SocketPool::send_recv(const messageqcpp::ByteStream &in, messageqcpp::ByteSt
} }
returnSocket(sock); returnSocket(sock);
return; return 0;
} }
int SocketPool::getSocket() int SocketPool::getSocket()
@ -159,7 +163,7 @@ int SocketPool::getSocket()
struct sockaddr_un addr; struct sockaddr_un addr;
memset(&addr, 0, sizeof(addr)); memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX; addr.sun_family = AF_UNIX;
strcpy(&addr.sun_path[1], "storagemanager"); // abstract socket; this is intentional strcpy(&addr.sun_path[0], storagemanager::socket_name);
ret = ::connect(tmp, &addr, sizeof(addr)); ret = ::connect(tmp, &addr, sizeof(addr));
if (ret >= 0) if (ret >= 0)
allSockets.push_back(ret); allSockets.push_back(ret);

View File

@ -11,6 +11,7 @@
namespace idbdatafile namespace idbdatafile
{ {
/* This should be renamed; it's more than a pool, it also does the low-level communication. TBD. */
class SocketPool : public boost::noncopyable class SocketPool : public boost::noncopyable
{ {
public: public: