mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
430 lines
8.6 KiB
C++
430 lines
8.6 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/*
|
|
Protocol definition:
|
|
*/
|
|
|
|
#include <poll.h>
|
|
#include <sys/socket.h>
|
|
#include <netinet/in.h>
|
|
#include <arpa/inet.h>
|
|
#include <iostream>
|
|
#include <string>
|
|
#include <stdexcept>
|
|
#include <sstream>
|
|
#include <iomanip>
|
|
#include <sys/types.h>
|
|
#include <sys/stat.h>
|
|
#include <fcntl.h>
|
|
#include <unistd.h>
|
|
#include <signal.h>
|
|
#include <cerrno>
|
|
using namespace std;
|
|
|
|
#include <boost/thread.hpp>
|
|
#include <boost/scoped_ptr.hpp>
|
|
#include <boost/scoped_array.hpp>
|
|
#include <boost/tokenizer.hpp>
|
|
#include <boost/algorithm/string.hpp>
|
|
using namespace boost;
|
|
|
|
#include "mcsconfig.h"
|
|
|
|
#include "socktype.h"
|
|
#include "parsequery.h"
|
|
#include "sendcsep.h"
|
|
#include "returnedrows.h"
|
|
#include "socketio.h"
|
|
#include "ddlstmts.h"
|
|
|
|
#include "exceptclasses.h" //brings in idbassert_s macro
|
|
|
|
#include "messagequeue.h"
|
|
using namespace messageqcpp;
|
|
|
|
#include "atomicops.h"
|
|
|
|
namespace execplan
|
|
{
|
|
class CalpontSelectExecutionPlan;
|
|
}
|
|
|
|
#define SINGLE_THREADED
|
|
|
|
namespace qfe
|
|
{
|
|
string DefaultSchema;
|
|
}
|
|
|
|
namespace
|
|
{
|
|
using namespace qfe;
|
|
|
|
enum StmtType
|
|
{
|
|
UNKNOWN,
|
|
QUERY,
|
|
CREATE,
|
|
DROP,
|
|
SHOW,
|
|
};
|
|
|
|
volatile uint32_t SystemSID;
|
|
|
|
void log(const string& s)
|
|
{
|
|
cerr << s << endl;
|
|
}
|
|
|
|
struct QueryMessage
|
|
{
|
|
QueryMessage() : isValid(false)
|
|
{
|
|
}
|
|
~QueryMessage()
|
|
{
|
|
}
|
|
|
|
string toString() const;
|
|
|
|
bool isValid;
|
|
string queryText;
|
|
string defaultSchema;
|
|
};
|
|
|
|
string QueryMessage::toString() const
|
|
{
|
|
ostringstream oss;
|
|
oss << "valid: " << boolalpha << isValid << ", "
|
|
<< "queryText: " << queryText << ", "
|
|
<< "defaultSchema: " << defaultSchema;
|
|
return oss.str();
|
|
}
|
|
|
|
ostream& operator<<(ostream& os, const QueryMessage& rhs)
|
|
{
|
|
os << rhs.toString();
|
|
return os;
|
|
}
|
|
|
|
class ThreadFunc
|
|
{
|
|
public:
|
|
ThreadFunc(SockType fd) : fFd(fd)
|
|
{
|
|
}
|
|
~ThreadFunc()
|
|
{
|
|
}
|
|
|
|
void run();
|
|
void operator()()
|
|
{
|
|
run();
|
|
}
|
|
|
|
private:
|
|
ThreadFunc(const ThreadFunc& rhs);
|
|
ThreadFunc& operator=(const ThreadFunc& rhs);
|
|
|
|
SockType fFd;
|
|
};
|
|
|
|
bool serverInit()
|
|
{
|
|
|
|
setsid();
|
|
|
|
// Handle certain signals (we want these to return EINTR so we can throw)
|
|
// SIGPIPE
|
|
// I don't think we'll get any of these from init (except possibly HUP, but that's an indication
|
|
// of bad things anyway)
|
|
// SIGHUP?
|
|
// SIGUSR1?
|
|
// SIGUSR2?
|
|
// SIGPOLL?
|
|
struct sigaction sa;
|
|
memset(&sa, 0, sizeof(struct sigaction));
|
|
sa.sa_handler = SIG_IGN;
|
|
sigaction(SIGPIPE, &sa, 0);
|
|
sigaction(SIGHUP, &sa, 0);
|
|
sigaction(SIGUSR1, &sa, 0);
|
|
sigaction(SIGUSR2, &sa, 0);
|
|
#ifndef __FreeBSD__
|
|
sigaction(SIGPOLL, &sa, 0);
|
|
#endif
|
|
#if 0
|
|
int fd;
|
|
close(2);
|
|
fd = open("/tmp/qfe.err", O_CREAT | O_TRUNC | O_WRONLY, 0644);
|
|
|
|
if (fd >= 0 && fd != 2)
|
|
{
|
|
dup2(fd, 2);
|
|
close(fd);
|
|
}
|
|
|
|
#endif
|
|
return true;
|
|
}
|
|
|
|
SockType initListenSock(short portNo)
|
|
{
|
|
SockType listenSock = -1;
|
|
listenSock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
|
|
idbassert_s(listenSock >= 0, string("socket create error: ") + strerror(errno));
|
|
// if (listenSock < 0) throw runtime_error(string("socket create error: ") + strerror(errno));
|
|
int optval = 1;
|
|
setsockopt(listenSock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<const char*>(&optval), sizeof(optval));
|
|
int rc = 0;
|
|
struct sockaddr_in serv_addr;
|
|
memset(&serv_addr, 0, sizeof(serv_addr));
|
|
serv_addr.sin_family = AF_INET;
|
|
serv_addr.sin_addr.s_addr = INADDR_ANY;
|
|
serv_addr.sin_port = htons(portNo);
|
|
const int MaxTries = 5 * 60 / 10;
|
|
int tries = 0;
|
|
again:
|
|
rc = ::bind(listenSock, (sockaddr*)&serv_addr, sizeof(serv_addr));
|
|
|
|
if (rc < 0)
|
|
{
|
|
if (errno == EADDRINUSE)
|
|
{
|
|
// cerr << "Addr in use..." << endl;
|
|
if (++tries >= MaxTries)
|
|
{
|
|
log("Waited too long for socket to bind...giving up");
|
|
// cerr << "Waited too long for socket to bind...giving up" << endl;
|
|
exit(1);
|
|
}
|
|
|
|
sleep(10);
|
|
goto again;
|
|
}
|
|
|
|
idbassert_s(0, string("socket bind error: ") + strerror(errno));
|
|
// throw runtime_error(string("socket bind error: ") + strerror(errno));
|
|
}
|
|
|
|
rc = listen(listenSock, 16);
|
|
idbassert_s(rc >= 0, string("socket listen error") + strerror(errno));
|
|
// if (rc < 0) throw runtime_error(string("socket listen error") + strerror(errno));
|
|
|
|
return listenSock;
|
|
}
|
|
|
|
QueryMessage getNextMsg(SockType fd)
|
|
{
|
|
QueryMessage msg;
|
|
|
|
try
|
|
{
|
|
msg.defaultSchema = socketio::readString(fd);
|
|
msg.queryText = socketio::readString(fd);
|
|
msg.isValid = true;
|
|
}
|
|
catch (runtime_error& rex)
|
|
{
|
|
cerr << "re reading ctl msg: " << rex.what() << endl;
|
|
msg.queryText = "";
|
|
}
|
|
catch (...)
|
|
{
|
|
cerr << "ex reading ctl msg" << endl;
|
|
msg.queryText = "";
|
|
}
|
|
|
|
return msg;
|
|
}
|
|
|
|
StmtType guessStatementType(const string& stmt)
|
|
{
|
|
typedef boost::tokenizer<boost::char_separator<char> > tokenizer;
|
|
char_separator<char> sep;
|
|
tokenizer tokens(stmt, sep);
|
|
tokenizer::iterator tok_iter = tokens.begin();
|
|
string first_word;
|
|
first_word = *tok_iter;
|
|
algorithm::to_lower(first_word);
|
|
|
|
if (first_word == "select")
|
|
return QUERY;
|
|
|
|
if (first_word == "create")
|
|
return CREATE;
|
|
|
|
if (first_word == "drop")
|
|
return DROP;
|
|
|
|
if (first_word == "show")
|
|
return SHOW;
|
|
|
|
return UNKNOWN;
|
|
}
|
|
|
|
struct ScopedCleaner
|
|
{
|
|
ScopedCleaner(SockType fd = -1) : fFd(fd)
|
|
{
|
|
}
|
|
~ScopedCleaner()
|
|
{
|
|
if (fFd >= 0)
|
|
shutdown(fFd, SHUT_RDWR);
|
|
|
|
close(fFd);
|
|
}
|
|
|
|
SockType fFd;
|
|
};
|
|
|
|
void ThreadFunc::run()
|
|
{
|
|
QueryMessage m;
|
|
execplan::CalpontSelectExecutionPlan* csep = 0;
|
|
MessageQueueClient* msgqcl;
|
|
|
|
ScopedCleaner cleaner(fFd);
|
|
|
|
uint32_t sid = 1;
|
|
sid = atomicops::atomicInc(&SystemSID);
|
|
|
|
try
|
|
{
|
|
m = getNextMsg(fFd);
|
|
|
|
if (m.isValid)
|
|
{
|
|
DefaultSchema = m.defaultSchema;
|
|
StmtType st = guessStatementType(m.queryText);
|
|
|
|
switch (st)
|
|
{
|
|
case QUERY:
|
|
csep = parseQuery(m.queryText, sid);
|
|
// sendCSEP takes ownership of the ptr from parseQuery
|
|
msgqcl = sendCSEP(csep);
|
|
// processReturnedRows takes ownership of the ptr from sendCSEP
|
|
processReturnedRows(msgqcl, fFd);
|
|
break;
|
|
|
|
case CREATE: processCreateStmt(m.queryText, sid); break;
|
|
|
|
case DROP: processDropStmt(m.queryText, sid); break;
|
|
|
|
case SHOW:
|
|
{
|
|
ostringstream oss;
|
|
oss << "select calpontsys.systable.tablename from calpontsys.systable where "
|
|
"calpontsys.systable.schema='"
|
|
<< m.defaultSchema << "';";
|
|
csep = parseQuery(oss.str(), sid);
|
|
msgqcl = sendCSEP(csep);
|
|
processReturnedRows(msgqcl, fFd);
|
|
break;
|
|
}
|
|
|
|
default: throw runtime_error("couldn't guess the statement type"); break;
|
|
}
|
|
}
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
socketio::writeString(fFd, ex.what());
|
|
throw; // in a multi-threaded server this will simply cause this thread to exit
|
|
}
|
|
catch (...)
|
|
{
|
|
socketio::writeString(fFd, "internal query processing error");
|
|
throw;
|
|
}
|
|
}
|
|
|
|
} // namespace
|
|
|
|
int main(int argc, char** argv)
|
|
{
|
|
int c;
|
|
SockType listenSock;
|
|
short portNo;
|
|
|
|
portNo = 0;
|
|
char* p = getenv("IDB_QFE_PORT");
|
|
|
|
if (p && *p)
|
|
portNo = atoi(p);
|
|
|
|
if (portNo <= 0)
|
|
portNo = 9198;
|
|
|
|
listenSock = -1;
|
|
opterr = 0;
|
|
|
|
while ((c = getopt(argc, argv, "p:")) != -1)
|
|
switch (c)
|
|
{
|
|
case 'p': portNo = atoi(optarg); break;
|
|
|
|
case '?':
|
|
default: break;
|
|
}
|
|
|
|
if (!serverInit())
|
|
{
|
|
log("Could not initialize the QFE Server!");
|
|
cerr << "Could not initialize the QFE Server!" << endl;
|
|
return 1;
|
|
}
|
|
|
|
listenSock = initListenSock(portNo);
|
|
|
|
SystemSID = 0;
|
|
|
|
for (;;)
|
|
{
|
|
int querySock = -1;
|
|
querySock = accept(listenSock, 0, 0);
|
|
idbassert_s(querySock >= 0, string("socket accept error: ") + strerror(errno));
|
|
|
|
// ThreadFunc now owns querySock and is responsible for cleaning it up
|
|
ThreadFunc tf(querySock);
|
|
|
|
#ifdef SINGLE_THREADED
|
|
|
|
try
|
|
{
|
|
tf.run();
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
cerr << "ThreadFunc run threw an exception: " << ex.what() << endl;
|
|
}
|
|
catch (...)
|
|
{
|
|
cerr << "ThreadFunc run threw an exception" << endl;
|
|
}
|
|
|
|
#else
|
|
thread t(tf);
|
|
#endif
|
|
}
|
|
|
|
return 0;
|
|
}
|