1
0
mirror of https://github.com/MariaDB/server.git synced 2025-11-03 14:33:32 +03:00
Files
mariadb/ndb/src/common/transporter/Transporter.cpp

199 lines
5.2 KiB
C++

/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <TransporterRegistry.hpp>
#include <TransporterCallback.hpp>
#include "Transporter.hpp"
#include "TransporterInternalDefinitions.hpp"
#include <NdbSleep.h>
#include <SocketAuthenticator.hpp>
#include <InputStream.hpp>
#include <OutputStream.hpp>
#include <EventLogger.hpp>
extern EventLogger g_eventLogger;
Transporter::Transporter(TransporterRegistry &t_reg,
TransporterType _type,
const char *lHostName,
const char *rHostName,
int r_port,
NodeId lNodeId,
NodeId rNodeId,
int _byteorder,
bool _compression, bool _checksum, bool _signalId)
: m_r_port(r_port), remoteNodeId(rNodeId), localNodeId(lNodeId),
isServer(lNodeId < rNodeId),
m_packer(_signalId, _checksum),
m_type(_type),
m_transporter_registry(t_reg)
{
DBUG_ENTER("Transporter::Transporter");
if (rHostName && strlen(rHostName) > 0){
strncpy(remoteHostName, rHostName, sizeof(remoteHostName));
Ndb_getInAddr(&remoteHostAddress, rHostName);
}
else
{
if (!isServer) {
ndbout << "Unable to setup transporter. Node " << rNodeId
<< " must have hostname. Update configuration." << endl;
exit(-1);
}
remoteHostName[0]= 0;
}
strncpy(localHostName, lHostName, sizeof(localHostName));
if (strlen(lHostName) > 0)
Ndb_getInAddr(&localHostAddress, lHostName);
DBUG_PRINT("info",("rId=%d lId=%d isServer=%d rHost=%s lHost=%s r_port=%d",
remoteNodeId, localNodeId, isServer,
remoteHostName, localHostName,
r_port));
byteOrder = _byteorder;
compressionUsed = _compression;
checksumUsed = _checksum;
signalIdUsed = _signalId;
m_connected = false;
m_timeOutMillis = 1000;
m_connect_address.s_addr= 0;
if (isServer)
m_socket_client= 0;
else
m_socket_client= new SocketClient(remoteHostName, r_port,
new SocketAuthSimple("ndbd",
"ndbd passwd"));
DBUG_VOID_RETURN;
}
Transporter::~Transporter(){
if (m_socket_client)
delete m_socket_client;
}
bool
Transporter::connect_server(NDB_SOCKET_TYPE sockfd) {
// all initial negotiation is done in TransporterRegistry::connect_server
DBUG_ENTER("Transporter::connect_server");
if(m_connected)
{
DBUG_RETURN(true); // TODO assert(0);
}
{
struct sockaddr addr;
SOCKET_SIZE_TYPE addrlen= sizeof(addr);
int r= getpeername(sockfd, &addr, &addrlen);
m_connect_address= ((struct sockaddr_in *)&addr)->sin_addr;
}
bool res = connect_server_impl(sockfd);
if(res){
m_connected = true;
m_errorCount = 0;
}
DBUG_RETURN(res);
}
bool
Transporter::connect_client() {
if(m_connected)
return true;
NDB_SOCKET_TYPE sockfd = m_socket_client->connect();
if (sockfd == NDB_INVALID_SOCKET)
return false;
DBUG_ENTER("Transporter::connect_client");
// send info about own id
// send info about own transporter type
SocketOutputStream s_output(sockfd);
s_output.println("%d %d", localNodeId, m_type);
// get remote id
int nodeId, remote_transporter_type= -1;
SocketInputStream s_input(sockfd);
char buf[256];
if (s_input.gets(buf, 256) == 0) {
NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false);
}
int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
switch (r) {
case 2:
break;
case 1:
// we're running version prior to 4.1.9
// ok, but with no checks on transporter configuration compatability
break;
default:
NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false);
}
DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
nodeId, remote_transporter_type));
if (remote_transporter_type != -1)
{
if (remote_transporter_type != m_type)
{
DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d",
m_type, remote_transporter_type));
NDB_CLOSE_SOCKET(sockfd);
g_eventLogger.error("Incompatible configuration: transporter type "
"mismatch with node %d", nodeId);
DBUG_RETURN(false);
}
}
else if (m_type == tt_SHM_TRANSPORTER)
{
g_eventLogger.warning("Unable to verify transporter compatability with node %d", nodeId);
}
{
struct sockaddr addr;
SOCKET_SIZE_TYPE addrlen= sizeof(addr);
int r= getpeername(sockfd, &addr, &addrlen);
m_connect_address= ((struct sockaddr_in *)&addr)->sin_addr;
}
bool res = connect_client_impl(sockfd);
if(res){
m_connected = true;
m_errorCount = 0;
}
DBUG_RETURN(res);
}
void
Transporter::doDisconnect() {
if(!m_connected)
return; //assert(0); TODO will fail
m_connected= false;
disconnectImpl();
}