1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

Checkpointing, not ready to build yet.

This commit is contained in:
Patrick LeBlanc
2019-01-21 16:41:04 -06:00
parent d53471fc75
commit d84dcb9ccc
6 changed files with 266 additions and 9 deletions

View File

@ -0,0 +1,7 @@
// copy licensing stuff here
#include "SMComm.h"

View File

@ -8,7 +8,8 @@ namespace idbdatafile {
class SMComm : public boost::noncopyable
{
public:
SMComm *get();
// This is a singleton. Get it with get()
static SMComm *get();
/* Open currently returns a stat struct so SMDataFile can set its initial position, otherwise
behaves how you'd think. */
@ -42,13 +43,8 @@ class SMComm : public boost::noncopyable
SMComm();
SocketPool sockets;
};
}
}
#endif

View File

@ -0,0 +1,185 @@
// copy licensing stuff here
#include "SocketPool.h"
#include "configcpp.h"
#include "logging.h"
#include "storage-manager/include/messageFormat.h"
#include <sys/socket.h>
#include <sys/un.h>
namespace
{
void log(logging::LOG_TYPE whichLogFile, const string& msg)
{
logging::Logger logger(12); //12 = configcpp
logger.logMessage(whichLogFile, msg, logging::LoggingID(12));
}
}
namespace idbdatafile
{
SocketPool::SocketPool()
{
config::Config *config = config::Config::makeConfig(); // the most 'config' ever put into a single line of code?
string stmp;
int64_t itmp = 0;
try
{
stmp = config->getConfig("StorageManager", "MaxSockets");
itmp = strtol(stmp.c_str(), NULL, 10);
if (itmp > 500 || itmp < 1) {
string errmsg = "SocketPool(): Got a bad value '" + stmp + "' for StorageManager/MaxSockets.";
log(logging::CRITICAL, errmsg);
throw runtime_error(errmsg);
maxSockets = itmp;
}
catch (exception &e)
{
ostringstream os;
os << "SocketPool(): Using default of " << defaultSockets << ".";
log(logging::CRITICAL, os.str());
maxSockets = defaultSockets;
}
clientSocket = ::socket(AF_UNIX, SOCK_STREAM, 0);
if (clientSocket < 0) {
char buf[80], *ptr;
ptr = strerror_r(errno, buf, 80);
throw runtime_error("SocketPool(): Failed to get clientSocket, got " + string(ptr));
}
}
SocketPool::~SocketPool()
{
boost::mutex::scoped_lock(mutex);
for (int i = 0; i < allSockets.size(); i++)
::close(allSockets[i]);
::close(clientSocket)
}
int SocketPool::send_recv(const messageqcpp::ByteStream &in, messageqcpp::ByteStream *out)
{
uint count = 0;
uint length = in.length();
int sock = getSocket();
uint8_t inbuf* = in.buf();
int err = 0;
::write(sock, &storagemanager::SM_MSG_START, sizeof(storagemanager::SM_MSG_START));
::write(sock, &length, sizeof(length));
while (count < length)
{
err = ::write(sock, &inbuf[count], length-count);
if (err < 0)
{
returnSocket(sock);
return -1;
}
count += err;
}
out->restart();
uint8_t *outbuf;
uint8_t window[8192];
bool foundheader = false;
length = 0;
uint remainingBytes = 0;
int i;
while (!foundheader)
{
// here remainingBytes means the # of bytes from the previous message
err = ::read(sock, &window[remainingBytes], 8192 - remainingBytes);
if (err < 0)
{
returnSocket(sock);
return -1;
}
uint endOfData = remainingBytes + err; // for clarity
// scan for the 8-byte header. If it is fragmented, move the fragment to the front of the buffer
// for the next iteration to handle.
for (i = 0; i <= endOfData - 8 && !foundHeader; i++)
{
if (*((uint *) &window[i]) == storagemanager::SM_MSG_START)
{
length = *((uint *) &window[i+4]);
foundHeader = true;
}
}
if (!foundHeader)
{
remainingBytes = endOfData - i;
if (i != 0)
memmove(window, &window[i], remainingBytes);
else
continue; // if i == 0, then the read was too short to see the whole header, do another read().
}
else
{
out->needAtLeast(length);
outbuf = out->buf();
memcpy(outbuf, &window[i+8], endOfData - (i+8));
remainingBytes = length - (endOfData - (i+8)); // remainingBytes is now the # of bytes left to read
}
}
// read the rest of the payload
while (remainingBytes > 0)
{
err = ::read(sock, &outbuf[length - remainingBytes], remainingBytes);
if (err < 0)
{
returnSocket(sock);
return -1;
}
remainingBytes -= err;
}
returnSocket(sock);
return;
}
int SocketPool::getSocket()
{
boost::mutex::scoped_lock lock(mutex);
int ret;
if (freeSockets.size() == 0 && allSockets.size() < maxSockets)
{
// make a new connection
struct sockaddr_un addr;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strcpy(&addr.sun_path[1], "storagemanager"); // abstract socket; this is intentional
ret = ::connect(tmp, &addr, sizeof(addr));
if (ret >= 0)
allSockets.push_back(ret);
return ret;
}
// wait for a socket to become free
while (freeSockets.size() == 0)
socketAvailable.wait(lock);
ret = freeSockets.front();
freeSockets.pop_front();
return ret;
}
void SocketPool::returnSocket(int sock)
{
boost::mutex::scoped_lock lock(mutex);
freeSockets.push_back(sock);
socketAvailable.notify_one();
}
}

View File

@ -0,0 +1,42 @@
// copy licensing stuff here
#ifndef _SOCKETPOOL_H_
#define _SOCKETPOOL_H_
#include <boost/utility.hpp>
#include <boost/thread/mutex.hpp>
#include "bytestream.h"
namespace idbdatafile
{
class SocketPool : public boost::noncopyable
{
public:
SocketPool();
// the dtor will immediately close all sockets
virtual ~SocketPool();
// 0 = success, -1 = failure. Should this throw instead?
int send_recv(const ByteStream &to_send, ByteStream *to_recv);
private:
int getSocket();
void returnSocket();
std::vector<int> allSockets;
std::deque<int> freeSockets;
boost::mutex mutex;
boost::condition socketAvailable;
int clientSocket;
int maxSockets;
static const int defaultSockets = 20;
};
}
#endif

View File

@ -3,15 +3,42 @@
#include <exception>
namespace idbdatafile
{
class NotImplementedYet : public std::exception
{
public:
NotImplementedYet(const std::string &s);
};
NotImplementedYes::NotImplementedYet(const std::string &s) :
class FailedToSend : public std::runtime_error
{
public:
FailedToSend(const std::string &s);
};
class FailedToRecv : public std::runtime_error
{
public:
FailedToRecv(const std::string &s);
};
NotImplementedYet::NotImplementedYet(const std::string &s) :
std::exception(s + "() isn't implemented yet.")
{
}
FailedToSend::FailedToSend(const std::string &s) :
std::runtime_error(s)
{
}
FailedToRecv::FailedToRecv(const std::string &s) :
std::runtime_error(s)
{
}
}
#endif