1
0
mirror of https://github.com/MariaDB/server.git synced 2025-12-04 17:23:46 +03:00

Initial revision of NDB Cluster files

BitKeeper/etc/logging_ok:
  Logging to logging@openlogging.org accepted
This commit is contained in:
unknown
2004-04-14 10:53:21 +02:00
parent 0ba6cb48d8
commit 6386c55cee
1835 changed files with 500032 additions and 0 deletions

View File

@@ -0,0 +1,307 @@
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "SocketServer.hpp"
#include <NdbTCP.h>
#include <string.h>
#include <NdbOut.hpp>
#include <NdbThread.h>
#include <NdbSleep.h>
#include <stdio.h>
#include <assert.h>
#define DEBUG(x) ndbout << x << endl;
SocketServer::SocketServer(int maxSessions) :
m_sessions(10),
m_services(5)
{
m_thread = 0;
m_stopThread = false;
m_maxSessions = maxSessions;
}
SocketServer::~SocketServer() {
for(unsigned i = 0; i<m_sessions.size(); i++){
delete m_sessions[i].m_session;
}
for(unsigned i = 0; i<m_services.size(); i++){
delete m_services[i].m_service;
}
}
bool
SocketServer::tryBind(unsigned short port, const char * intface) const {
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(port);
if(intface != 0){
if(Ndb_getInAddr(&servaddr.sin_addr, intface))
return false;
}
const NDB_SOCKET_TYPE sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock == NDB_INVALID_SOCKET) {
return false;
}
const int on = 1;
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
(const char*)&on, sizeof(on)) == -1) {
NDB_CLOSE_SOCKET(sock);
return false;
}
if (bind(sock, (struct sockaddr*) &servaddr, sizeof(servaddr)) == -1) {
NDB_CLOSE_SOCKET(sock);
return false;
}
NDB_CLOSE_SOCKET(sock);
return true;
}
bool
SocketServer::setup(SocketServer::Service * service,
unsigned short port,
const char * intface){
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(port);
if(intface != 0){
if(Ndb_getInAddr(&servaddr.sin_addr, intface))
return false;
}
const NDB_SOCKET_TYPE sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock == NDB_INVALID_SOCKET) {
return false;
}
const int on = 1;
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
(const char*)&on, sizeof(on)) == -1) {
NDB_CLOSE_SOCKET(sock);
return false;
}
if (bind(sock, (struct sockaddr*) &servaddr, sizeof(servaddr)) == -1) {
NDB_CLOSE_SOCKET(sock);
return false;
}
if (listen(sock, m_maxSessions) == -1){
NDB_CLOSE_SOCKET(sock);
return false;
}
ServiceInstance i;
i.m_socket = sock;
i.m_service = service;
m_services.push_back(i);
return true;
}
void
SocketServer::doAccept(){
fd_set readSet, exceptionSet;
FD_ZERO(&readSet);
FD_ZERO(&exceptionSet);
m_services.lock();
int maxSock = 0;
for (unsigned i = 0; i < m_services.size(); i++){
const NDB_SOCKET_TYPE s = m_services[i].m_socket;
FD_SET(s, &readSet);
FD_SET(s, &exceptionSet);
maxSock = (maxSock > s ? maxSock : s);
}
struct timeval timeout;
timeout.tv_sec = 1;
timeout.tv_usec = 0;
if(select(maxSock + 1, &readSet, 0, &exceptionSet, &timeout) > 0){
for (unsigned i = 0; i < m_services.size(); i++){
ServiceInstance & si = m_services[i];
if(FD_ISSET(si.m_socket, &readSet)){
NDB_SOCKET_TYPE childSock = accept(si.m_socket, 0, 0);
if(childSock == NDB_INVALID_SOCKET){
continue;
}
SessionInstance s;
s.m_service = si.m_service;
s.m_session = si.m_service->newSession(childSock);
if(s.m_session != 0){
m_sessions.push_back(s);
startSession(m_sessions.back());
}
continue;
}
if(FD_ISSET(si.m_socket, &exceptionSet)){
DEBUG("socket in the exceptionSet");
continue;
}
}
}
m_services.unlock();
}
extern "C"
void*
socketServerThread_C(void* _ss){
SocketServer * ss = (SocketServer *)_ss;
ss->doRun();
NdbThread_Exit(0);
return 0;
}
void
SocketServer::startServer(){
m_threadLock.lock();
if(m_thread == 0 && m_stopThread == false){
m_thread = NdbThread_Create(socketServerThread_C,
(void**)this,
32768,
"NdbSockServ",
NDB_THREAD_PRIO_LOW);
}
m_threadLock.unlock();
}
void
SocketServer::stopServer(){
m_threadLock.lock();
if(m_thread != 0){
m_stopThread = true;
void * res;
NdbThread_WaitFor(m_thread, &res);
NdbThread_Destroy(&m_thread);
m_thread = 0;
}
m_threadLock.unlock();
}
void
SocketServer::doRun(){
while(!m_stopThread){
checkSessions();
if(m_sessions.size() < m_maxSessions){
doAccept();
} else {
NdbSleep_MilliSleep(200);
}
}
}
void
SocketServer::startSession(SessionInstance & si){
si.m_thread = NdbThread_Create(sessionThread_C,
(void**)si.m_session,
32768,
"NdbSock_Session",
NDB_THREAD_PRIO_LOW);
}
static
bool
transfer(NDB_SOCKET_TYPE sock){
#if defined NDB_OSE || defined NDB_SOFTOSE
const PROCESS p = current_process();
const size_t ps = sizeof(PROCESS);
int res = setsockopt(sock, SOL_SOCKET, SO_OSEOWNER, &p, ps);
if(res != 0){
ndbout << "Failed to transfer ownership of socket" << endl;
return false;
}
#endif
return true;
}
void
SocketServer::checkSessions(){
for(int i = m_sessions.size() - 1; i >= 0; i--){
if(m_sessions[i].m_session->m_stopped){
if(m_sessions[i].m_thread != 0){
void* ret;
NdbThread_WaitFor(m_sessions[i].m_thread, &ret);
NdbThread_Destroy(&m_sessions[i].m_thread);
}
m_sessions[i].m_session->stopSession();
delete m_sessions[i].m_session;
m_sessions.erase(i);
}
}
}
void
SocketServer::stopSessions(bool wait){
for(int i = m_sessions.size() - 1; i>=0; i--)
m_sessions[i].m_session->m_stop = true;
for(int i = m_services.size() - 1; i>=0; i--)
m_services[i].m_service->stopSessions();
if(wait){
while(m_sessions.size() > 0){
checkSessions();
NdbSleep_MilliSleep(100);
}
}
}
/***** Session code ******/
extern "C"
void*
sessionThread_C(void* _sc){
SocketServer::Session * si = (SocketServer::Session *)_sc;
if(!transfer(si->m_socket)){
si->m_stopped = true;
NdbThread_Exit(0);
return 0;
}
if(!si->m_stop){
si->m_stopped = false;
si->runSession();
} else {
NDB_CLOSE_SOCKET(si->m_socket);
}
si->m_stopped = true;
NdbThread_Exit(0);
return 0;
}