diff --git a/ndb/include/transporter/TransporterDefinitions.hpp b/ndb/include/transporter/TransporterDefinitions.hpp index d4763ba4c37..18d1ec76a3c 100644 --- a/ndb/include/transporter/TransporterDefinitions.hpp +++ b/ndb/include/transporter/TransporterDefinitions.hpp @@ -49,74 +49,50 @@ enum SendStatus { const Uint32 MAX_MESSAGE_SIZE = (12+4+4+(4*25)+(3*4)+4*4096); /** - * TCP Transporter Configuration + * TransporterConfiguration + * + * used for setting up a transporter. the union member specific is for + * information specific to a transporter type. */ -struct TCP_TransporterConfiguration { - Uint32 port; +struct TransporterConfiguration { + Uint32 port; const char *remoteHostName; const char *localHostName; NodeId remoteNodeId; NodeId localNodeId; - Uint32 sendBufferSize; // Size of SendBuffer of priority B - Uint32 maxReceiveSize; // Maximum no of bytes to receive + NodeId serverNodeId; bool checksum; bool signalId; -}; + bool isMgmConnection; // is a mgm connection, requires transforming -/** - * SHM Transporter Configuration - */ -struct SHM_TransporterConfiguration { - Uint32 port; - const char *remoteHostName; - const char *localHostName; - NodeId remoteNodeId; - NodeId localNodeId; - bool checksum; - bool signalId; - - Uint32 shmKey; - Uint32 shmSize; - int signum; -}; + union { // Transporter specific configuration information -/** - * OSE Transporter Configuration - */ -struct OSE_TransporterConfiguration { - const char *remoteHostName; - const char *localHostName; - NodeId remoteNodeId; - NodeId localNodeId; - bool checksum; - bool signalId; - - Uint32 prioASignalSize; - Uint32 prioBSignalSize; - Uint32 receiveBufferSize; // In number of signals -}; - -/** - * SCI Transporter Configuration - */ -struct SCI_TransporterConfiguration { - const char *remoteHostName; - const char *localHostName; - Uint32 port; - Uint32 sendLimit; // Packet size - Uint32 bufferSize; // Buffer size - - Uint32 nLocalAdapters; // 1 or 2, the number of adapters on local host - - Uint32 remoteSciNodeId0; // SCInodeId for adapter 1 - Uint32 remoteSciNodeId1; // SCInodeId for adapter 2 - - NodeId localNodeId; // Local node Id - NodeId remoteNodeId; // Remote node Id - - bool checksum; - bool signalId; + struct { + Uint32 sendBufferSize; // Size of SendBuffer of priority B + Uint32 maxReceiveSize; // Maximum no of bytes to receive + } tcp; + + struct { + Uint32 shmKey; + Uint32 shmSize; + int signum; + } shm; + + struct { + Uint32 prioASignalSize; + Uint32 prioBSignalSize; + } ose; + struct { + Uint32 sendLimit; // Packet size + Uint32 bufferSize; // Buffer size + + Uint32 nLocalAdapters; // 1 or 2, the number of adapters on local host + + Uint32 remoteSciNodeId0; // SCInodeId for adapter 1 + Uint32 remoteSciNodeId1; // SCInodeId for adapter 2 + } sci; + }; }; struct SignalHeader { diff --git a/ndb/include/transporter/TransporterRegistry.hpp b/ndb/include/transporter/TransporterRegistry.hpp index ff4e1d89a37..665a2728d29 100644 --- a/ndb/include/transporter/TransporterRegistry.hpp +++ b/ndb/include/transporter/TransporterRegistry.hpp @@ -102,6 +102,7 @@ public: unsigned sizeOfLongSignalMemory = 100); void set_mgm_handle(NdbMgmHandle h) { m_mgm_handle = h; }; + NdbMgmHandle get_mgm_handle(void) { return m_mgm_handle; }; bool init(NodeId localNodeId); @@ -179,10 +180,10 @@ public: * started, startServer is called. A transporter of the selected kind * is created and it is put in the transporter arrays. */ - bool createTransporter(struct TCP_TransporterConfiguration * config); - bool createTransporter(struct SCI_TransporterConfiguration * config); - bool createTransporter(struct SHM_TransporterConfiguration * config); - bool createTransporter(struct OSE_TransporterConfiguration * config); + bool createTCPTransporter(struct TransporterConfiguration * config); + bool createSCITransporter(struct TransporterConfiguration * config); + bool createSHMTransporter(struct TransporterConfiguration * config); + bool createOSETransporter(struct TransporterConfiguration * config); /** * prepareSend diff --git a/ndb/include/util/SocketClient.hpp b/ndb/include/util/SocketClient.hpp index 563544922dc..5847c55f397 100644 --- a/ndb/include/util/SocketClient.hpp +++ b/ndb/include/util/SocketClient.hpp @@ -36,6 +36,7 @@ public: m_servaddr.sin_port = htons(m_port); }; NDB_SOCKET_TYPE connect(); + NDB_SOCKET_TYPE connect_without_auth(); bool close(); }; diff --git a/ndb/src/common/mgmcommon/IPCConfig.cpp b/ndb/src/common/mgmcommon/IPCConfig.cpp index b24e41cfb25..af24e738e05 100644 --- a/ndb/src/common/mgmcommon/IPCConfig.cpp +++ b/ndb/src/common/mgmcommon/IPCConfig.cpp @@ -110,175 +110,6 @@ IPCConfig::addRemoteNodeId(NodeId nodeId){ return true; } -/** - * Returns no of transporters configured - */ -int -IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){ - int noOfTransportersCreated = 0; - - Uint32 noOfConnections; - if(!props->get("NoOfConnections", &noOfConnections)) return -1; - - for (Uint32 i = 0; i < noOfConnections; i++){ - const Properties * tmp; - Uint32 nodeId1, nodeId2; - const char * host1; - const char * host2; - - if(!props->get("Connection", i, &tmp)) continue; - if(!tmp->get("NodeId1", &nodeId1)) continue; - if(!tmp->get("NodeId2", &nodeId2)) continue; - if(nodeId1 != the_ownId && nodeId2 != the_ownId) continue; - - Uint32 sendSignalId; - Uint32 compression; - Uint32 checksum; - if(!tmp->get("SendSignalId", &sendSignalId)) continue; - if(!tmp->get("Checksum", &checksum)) continue; - - const char * type; - if(!tmp->get("Type", &type)) continue; - - if(strcmp("SHM", type) == 0){ - SHM_TransporterConfiguration conf; - conf.localNodeId = the_ownId; - conf.remoteNodeId = (nodeId1 != the_ownId ? nodeId1 : nodeId2); - conf.checksum = checksum; - conf.signalId = sendSignalId; - - if(!tmp->get("ShmKey", &conf.shmKey)) continue; - if(!tmp->get("ShmSize", &conf.shmSize)) continue; - - if(!theTransporterRegistry->createTransporter(&conf)){ - ndbout << "Failed to create SHM Transporter from: " - << conf.localNodeId << " to: " << conf.remoteNodeId << endl; - continue; - } else { - noOfTransportersCreated++; - continue; - } - - } else if(strcmp("SCI", type) == 0){ - SCI_TransporterConfiguration conf; - conf.localNodeId = the_ownId; - conf.remoteNodeId = (nodeId1 != the_ownId ? nodeId1 : nodeId2); - conf.checksum = checksum; - conf.signalId = sendSignalId; - - if(!tmp->get("SendLimit", &conf.sendLimit)) continue; - if(!tmp->get("SharedBufferSize", &conf.bufferSize)) continue; - - if(the_ownId == nodeId1){ - if(!tmp->get("Node1_NoOfAdapters", &conf.nLocalAdapters)) continue; - if(!tmp->get("Node2_Adapter", 0, &conf.remoteSciNodeId0)) continue; - - if(conf.nLocalAdapters > 1){ - if(!tmp->get("Node2_Adapter", 1, &conf.remoteSciNodeId1)) continue; - } - } else { - if(!tmp->get("Node2_NoOfAdapters", &conf.nLocalAdapters)) continue; - if(!tmp->get("Node1_Adapter", 0, &conf.remoteSciNodeId0)) continue; - - if(conf.nLocalAdapters > 1){ - if(!tmp->get("Node1_Adapter", 1, &conf.remoteSciNodeId1)) continue; - } - } - - if(!theTransporterRegistry->createTransporter(&conf)){ - ndbout << "Failed to create SCI Transporter from: " - << conf.localNodeId << " to: " << conf.remoteNodeId << endl; - continue; - } else { - noOfTransportersCreated++; - continue; - } - } - - if(!tmp->get("HostName1", &host1)) continue; - if(!tmp->get("HostName2", &host2)) continue; - - Uint32 ownNodeId; - Uint32 remoteNodeId; - const char * ownHostName; - const char * remoteHostName; - - if(nodeId1 == the_ownId){ - ownNodeId = nodeId1; - ownHostName = host1; - remoteNodeId = nodeId2; - remoteHostName = host2; - } else if(nodeId2 == the_ownId){ - ownNodeId = nodeId2; - ownHostName = host2; - remoteNodeId = nodeId1; - remoteHostName = host1; - } else { - continue; - } - - if(strcmp("TCP", type) == 0){ - TCP_TransporterConfiguration conf; - - if(!tmp->get("PortNumber", &conf.port)) continue; - if(!tmp->get("SendBufferSize", &conf.sendBufferSize)) continue; - if(!tmp->get("MaxReceiveSize", &conf.maxReceiveSize)) continue; - - const char * proxy; - if (tmp->get("Proxy", &proxy)) { - if (strlen(proxy) > 0 && nodeId2 == the_ownId) { - // TODO handle host:port - conf.port = atoi(proxy); - } - } - conf.sendBufferSize *= MAX_MESSAGE_SIZE; - conf.maxReceiveSize *= MAX_MESSAGE_SIZE; - - conf.remoteHostName = remoteHostName; - conf.localHostName = ownHostName; - conf.remoteNodeId = remoteNodeId; - conf.localNodeId = ownNodeId; - conf.checksum = checksum; - conf.signalId = sendSignalId; - - if(!theTransporterRegistry->createTransporter(&conf)){ - ndbout << "Failed to create TCP Transporter from: " - << ownNodeId << " to: " << remoteNodeId << endl; - } else { - noOfTransportersCreated++; - } - - } else if(strcmp("OSE", type) == 0){ - - OSE_TransporterConfiguration conf; - - if(!tmp->get("PrioASignalSize", &conf.prioASignalSize)) - continue; - if(!tmp->get("PrioBSignalSize", &conf.prioBSignalSize)) - continue; - if(!tmp->get("ReceiveArraySize", &conf.receiveBufferSize)) - continue; - - conf.remoteHostName = remoteHostName; - conf.localHostName = ownHostName; - conf.remoteNodeId = remoteNodeId; - conf.localNodeId = ownNodeId; - conf.checksum = checksum; - conf.signalId = sendSignalId; - - if(!theTransporterRegistry->createTransporter(&conf)){ - ndbout << "Failed to create OSE Transporter from: " - << ownNodeId << " to: " << remoteNodeId << endl; - } else { - noOfTransportersCreated++; - } - } else { - continue; - } - } - return noOfTransportersCreated; -} - /** * Supply a nodeId, * and get next higher node id @@ -335,6 +166,8 @@ Uint32 IPCConfig::configureTransporters(Uint32 nodeId, const class ndb_mgm_configuration & config, class TransporterRegistry & tr){ + TransporterConfiguration conf; + DBUG_ENTER("IPCConfig::configureTransporters"); Uint32 noOfTransportersCreated= 0; @@ -368,9 +201,35 @@ IPCConfig::configureTransporters(Uint32 nodeId, Uint32 server_port= 0; if(iter.get(CFG_CONNECTION_SERVER_PORT, &server_port)) break; - if (nodeId <= nodeId1 && nodeId <= nodeId2) { + + /* + We check the node type. MGM node becomes server. + */ + Uint32 node1type, node2type; + ndb_mgm_configuration_iterator node1iter(config, CFG_SECTION_NODE); + ndb_mgm_configuration_iterator node2iter(config, CFG_SECTION_NODE); + node1iter.find(CFG_NODE_ID,nodeId1); + node2iter.find(CFG_NODE_ID,nodeId2); + node1iter.get(CFG_TYPE_OF_SECTION,&node1type); + node2iter.get(CFG_TYPE_OF_SECTION,&node2type); + + conf.serverNodeId= (nodeId1 < nodeId2)? nodeId1:nodeId2; + + conf.isMgmConnection= false; + if(node2type==NODE_TYPE_MGM) + { + conf.isMgmConnection= true; + conf.serverNodeId= nodeId2; + } + else if(node1type==NODE_TYPE_MGM) + { + conf.isMgmConnection= true; + conf.serverNodeId= nodeId1; + } + else if (nodeId == conf.serverNodeId) { tr.add_transporter_interface(remoteNodeId, localHostName, server_port); } + DBUG_PRINT("info", ("Transporter between this node %d and node %d using port %d, signalId %d, checksum %d", nodeId, remoteNodeId, server_port, sendSignalId, checksum)); /* @@ -386,27 +245,24 @@ IPCConfig::configureTransporters(Uint32 nodeId, if((int)server_port<0) server_port= -server_port; + conf.localNodeId = nodeId; + conf.remoteNodeId = remoteNodeId; + conf.checksum = checksum; + conf.signalId = sendSignalId; + conf.port = server_port; + conf.localHostName = localHostName; + conf.remoteHostName = remoteHostName; + switch(type){ - case CONNECTION_TYPE_SHM:{ - SHM_TransporterConfiguration conf; - conf.localNodeId = nodeId; - conf.remoteNodeId = remoteNodeId; - conf.checksum = checksum; - conf.signalId = sendSignalId; - - if(iter.get(CFG_SHM_KEY, &conf.shmKey)) break; - if(iter.get(CFG_SHM_BUFFER_MEM, &conf.shmSize)) break; - { - Uint32 tmp; - if(iter.get(CFG_SHM_SIGNUM, &tmp)) break; - conf.signum= tmp; - } + case CONNECTION_TYPE_SHM: + if(iter.get(CFG_SHM_KEY, &conf.shm.shmKey)) break; + if(iter.get(CFG_SHM_BUFFER_MEM, &conf.shm.shmSize)) break; - conf.port= server_port; - conf.localHostName = localHostName; - conf.remoteHostName = remoteHostName; + Uint32 tmp; + if(iter.get(CFG_SHM_SIGNUM, &tmp)) break; + conf.shm.signum= tmp; - if(!tr.createTransporter(&conf)){ + if(!tr.createSHMTransporter(&conf)){ DBUG_PRINT("error", ("Failed to create SHM Transporter from %d to %d", conf.localNodeId, conf.remoteNodeId)); ndbout << "Failed to create SHM Transporter from: " @@ -414,60 +270,53 @@ IPCConfig::configureTransporters(Uint32 nodeId, } else { noOfTransportersCreated++; } - DBUG_PRINT("info", ("Created SHM Transporter using shmkey %d, buf size = %d", - conf.shmKey, conf.shmSize)); - break; - } - case CONNECTION_TYPE_SCI:{ - SCI_TransporterConfiguration conf; - conf.localNodeId = nodeId; - conf.remoteNodeId = remoteNodeId; - conf.checksum = checksum; - conf.signalId = sendSignalId; - conf.port= server_port; - - conf.localHostName = localHostName; - conf.remoteHostName = remoteHostName; + DBUG_PRINT("info", ("Created SHM Transporter using shmkey %d, " + "buf size = %d", conf.shm.shmKey, conf.shm.shmSize)); - if(iter.get(CFG_SCI_SEND_LIMIT, &conf.sendLimit)) break; - if(iter.get(CFG_SCI_BUFFER_MEM, &conf.bufferSize)) break; + break; + + case CONNECTION_TYPE_SCI: + if(iter.get(CFG_SCI_SEND_LIMIT, &conf.sci.sendLimit)) break; + if(iter.get(CFG_SCI_BUFFER_MEM, &conf.sci.bufferSize)) break; if (nodeId == nodeId1) { - if(iter.get(CFG_SCI_HOST2_ID_0, &conf.remoteSciNodeId0)) break; - if(iter.get(CFG_SCI_HOST2_ID_1, &conf.remoteSciNodeId1)) break; + if(iter.get(CFG_SCI_HOST2_ID_0, &conf.sci.remoteSciNodeId0)) break; + if(iter.get(CFG_SCI_HOST2_ID_1, &conf.sci.remoteSciNodeId1)) break; } else { - if(iter.get(CFG_SCI_HOST1_ID_0, &conf.remoteSciNodeId0)) break; - if(iter.get(CFG_SCI_HOST1_ID_1, &conf.remoteSciNodeId1)) break; + if(iter.get(CFG_SCI_HOST1_ID_0, &conf.sci.remoteSciNodeId0)) break; + if(iter.get(CFG_SCI_HOST1_ID_1, &conf.sci.remoteSciNodeId1)) break; } - if (conf.remoteSciNodeId1 == 0) { - conf.nLocalAdapters = 1; + if (conf.sci.remoteSciNodeId1 == 0) { + conf.sci.nLocalAdapters = 1; } else { - conf.nLocalAdapters = 2; + conf.sci.nLocalAdapters = 2; } - if(!tr.createTransporter(&conf)){ + if(!tr.createSCITransporter(&conf)){ DBUG_PRINT("error", ("Failed to create SCI Transporter from %d to %d", conf.localNodeId, conf.remoteNodeId)); ndbout << "Failed to create SCI Transporter from: " << conf.localNodeId << " to: " << conf.remoteNodeId << endl; } else { - DBUG_PRINT("info", ("Created SCI Transporter: Adapters = %d, remote SCI node id %d", - conf.nLocalAdapters, conf.remoteSciNodeId0)); - DBUG_PRINT("info", ("Host 1 = %s, Host 2 = %s, sendLimit = %d, buf size = %d", - conf.localHostName, conf.remoteHostName, conf.sendLimit, conf.bufferSize)); - if (conf.nLocalAdapters > 1) { - DBUG_PRINT("info", ("Fault-tolerant with 2 Remote Adapters, second remote SCI node id = %d", - conf.remoteSciNodeId1)); + DBUG_PRINT("info", ("Created SCI Transporter: Adapters = %d, " + "remote SCI node id %d", + conf.sci.nLocalAdapters, conf.sci.remoteSciNodeId0)); + DBUG_PRINT("info", ("Host 1 = %s, Host 2 = %s, sendLimit = %d, " + "buf size = %d", conf.localHostName, + conf.remoteHostName, conf.sci.sendLimit, + conf.sci.bufferSize)); + if (conf.sci.nLocalAdapters > 1) { + DBUG_PRINT("info", ("Fault-tolerant with 2 Remote Adapters, " + "second remote SCI node id = %d", + conf.sci.remoteSciNodeId1)); } noOfTransportersCreated++; continue; } - } - case CONNECTION_TYPE_TCP:{ - TCP_TransporterConfiguration conf; + break; + + case CONNECTION_TYPE_TCP: + if(iter.get(CFG_TCP_SEND_BUFFER_SIZE, &conf.tcp.sendBufferSize)) break; + if(iter.get(CFG_TCP_RECEIVE_BUFFER_SIZE, &conf.tcp.maxReceiveSize)) break; - if(iter.get(CFG_TCP_SEND_BUFFER_SIZE, &conf.sendBufferSize)) break; - if(iter.get(CFG_TCP_RECEIVE_BUFFER_SIZE, &conf.maxReceiveSize)) break; - - conf.port= server_port; const char * proxy; if (!iter.get(CFG_TCP_PROXY, &proxy)) { if (strlen(proxy) > 0 && nodeId2 == nodeId) { @@ -476,50 +325,35 @@ IPCConfig::configureTransporters(Uint32 nodeId, } } - conf.localNodeId = nodeId; - conf.remoteNodeId = remoteNodeId; - conf.localHostName = localHostName; - conf.remoteHostName = remoteHostName; - conf.checksum = checksum; - conf.signalId = sendSignalId; - - if(!tr.createTransporter(&conf)){ + if(!tr.createTCPTransporter(&conf)){ ndbout << "Failed to create TCP Transporter from: " << nodeId << " to: " << remoteNodeId << endl; } else { noOfTransportersCreated++; } - DBUG_PRINT("info", ("Created TCP Transporter: sendBufferSize = %d, maxReceiveSize = %d", - conf.sendBufferSize, conf.maxReceiveSize)); + DBUG_PRINT("info", ("Created TCP Transporter: sendBufferSize = %d, " + "maxReceiveSize = %d", conf.tcp.sendBufferSize, + conf.tcp.maxReceiveSize)); break; - case CONNECTION_TYPE_OSE:{ - OSE_TransporterConfiguration conf; - - if(iter.get(CFG_OSE_PRIO_A_SIZE, &conf.prioASignalSize)) break; - if(iter.get(CFG_OSE_PRIO_B_SIZE, &conf.prioBSignalSize)) break; - if(iter.get(CFG_OSE_RECEIVE_ARRAY_SIZE, &conf.receiveBufferSize)) break; + case CONNECTION_TYPE_OSE: + if(iter.get(CFG_OSE_PRIO_A_SIZE, &conf.ose.prioASignalSize)) break; + if(iter.get(CFG_OSE_PRIO_B_SIZE, &conf.ose.prioBSignalSize)) break; - conf.localNodeId = nodeId; - conf.remoteNodeId = remoteNodeId; - conf.localHostName = localHostName; - conf.remoteHostName = remoteHostName; - conf.checksum = checksum; - conf.signalId = sendSignalId; - - if(!tr.createTransporter(&conf)){ + if(!tr.createOSETransporter(&conf)){ ndbout << "Failed to create OSE Transporter from: " << nodeId << " to: " << remoteNodeId << endl; } else { noOfTransportersCreated++; } - } + break; + default: ndbout << "Unknown transporter type from: " << nodeId << " to: " << remoteNodeId << endl; break; - } - } - } + } // switch + } // for + DBUG_RETURN(noOfTransportersCreated); } diff --git a/ndb/src/common/transporter/OSE_Transporter.cpp b/ndb/src/common/transporter/OSE_Transporter.cpp index a52862a80e5..ad67791fc0c 100644 --- a/ndb/src/common/transporter/OSE_Transporter.cpp +++ b/ndb/src/common/transporter/OSE_Transporter.cpp @@ -32,6 +32,7 @@ OSE_Transporter::OSE_Transporter(int _prioASignalSize, NodeId localNodeId, const char * lHostName, NodeId remoteNodeId, + NodeId serverNodeId, const char * rHostName, int byteorder, bool compression, @@ -40,6 +41,7 @@ OSE_Transporter::OSE_Transporter(int _prioASignalSize, Uint32 reportFreq) : Transporter(localNodeId, remoteNodeId, + serverNodeId, byteorder, compression, checksum, diff --git a/ndb/src/common/transporter/OSE_Transporter.hpp b/ndb/src/common/transporter/OSE_Transporter.hpp index 4fd06130477..898352366ba 100644 --- a/ndb/src/common/transporter/OSE_Transporter.hpp +++ b/ndb/src/common/transporter/OSE_Transporter.hpp @@ -48,6 +48,7 @@ public: NodeId localNodeId, const char * lHostName, NodeId remoteNodeId, + NodeId serverNodeId, const char * rHostName, int byteorder, bool compression, diff --git a/ndb/src/common/transporter/SCI_Transporter.cpp b/ndb/src/common/transporter/SCI_Transporter.cpp index e7807c972b1..506140a887f 100644 --- a/ndb/src/common/transporter/SCI_Transporter.cpp +++ b/ndb/src/common/transporter/SCI_Transporter.cpp @@ -34,19 +34,21 @@ SCI_Transporter::SCI_Transporter(TransporterRegistry &t_reg, const char *lHostName, const char *rHostName, int r_port, + bool isMgmConnection, Uint32 packetSize, Uint32 bufferSize, Uint32 nAdapters, Uint16 remoteSciNodeId0, Uint16 remoteSciNodeId1, NodeId _localNodeId, - NodeId _remoteNodeId, + NodeId _remoteNodeId, + NodeId serverNodeId, bool chksm, bool signalId, Uint32 reportFreq) : Transporter(t_reg, tt_SCI_TRANSPORTER, - lHostName, rHostName, r_port, _localNodeId, - _remoteNodeId, 0, false, chksm, signalId) + lHostName, rHostName, r_port, isMgmConnection, _localNodeId, + _remoteNodeId, serverNodeID, 0, false, chksm, signalId) { DBUG_ENTER("SCI_Transporter::SCI_Transporter"); m_PacketSize = (packetSize + 3)/4 ; diff --git a/ndb/src/common/transporter/SCI_Transporter.hpp b/ndb/src/common/transporter/SCI_Transporter.hpp index e2f2dfcaf99..8d263f32a57 100644 --- a/ndb/src/common/transporter/SCI_Transporter.hpp +++ b/ndb/src/common/transporter/SCI_Transporter.hpp @@ -139,13 +139,15 @@ private: const char *local_host, const char *remote_host, int port, + bool isMgmConnection, Uint32 packetSize, Uint32 bufferSize, Uint32 nAdapters, Uint16 remoteSciNodeId0, Uint16 remoteSciNodeId1, NodeId localNodeID, - NodeId remoteNodeID, + NodeId remoteNodeID, + NodeId serverNodeId, bool checksum, bool signalId, Uint32 reportFreq = 4096); diff --git a/ndb/src/common/transporter/SHM_Transporter.cpp b/ndb/src/common/transporter/SHM_Transporter.cpp index eed3ad77be6..e2d23cf94e2 100644 --- a/ndb/src/common/transporter/SHM_Transporter.cpp +++ b/ndb/src/common/transporter/SHM_Transporter.cpp @@ -32,14 +32,17 @@ SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg, const char *lHostName, const char *rHostName, int r_port, + bool isMgmConnection, NodeId lNodeId, - NodeId rNodeId, + NodeId rNodeId, + NodeId serverNodeId, bool checksum, bool signalId, key_t _shmKey, Uint32 _shmSize) : Transporter(t_reg, tt_SHM_TRANSPORTER, - lHostName, rHostName, r_port, lNodeId, rNodeId, + lHostName, rHostName, r_port, isMgmConnection, + lNodeId, rNodeId, serverNodeId, 0, false, checksum, signalId), shmKey(_shmKey), shmSize(_shmSize) diff --git a/ndb/src/common/transporter/SHM_Transporter.hpp b/ndb/src/common/transporter/SHM_Transporter.hpp index b501f652168..677bd6efc37 100644 --- a/ndb/src/common/transporter/SHM_Transporter.hpp +++ b/ndb/src/common/transporter/SHM_Transporter.hpp @@ -36,8 +36,10 @@ public: const char *lHostName, const char *rHostName, int r_port, + bool isMgmConnection, NodeId lNodeId, - NodeId rNodeId, + NodeId rNodeId, + NodeId serverNodeId, bool checksum, bool signalId, key_t shmKey, diff --git a/ndb/src/common/transporter/TCP_Transporter.cpp b/ndb/src/common/transporter/TCP_Transporter.cpp index a629b620157..fd71cf71cd9 100644 --- a/ndb/src/common/transporter/TCP_Transporter.cpp +++ b/ndb/src/common/transporter/TCP_Transporter.cpp @@ -68,12 +68,15 @@ TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg, const char *lHostName, const char *rHostName, int r_port, + bool isMgmConnection, NodeId lNodeId, NodeId rNodeId, + NodeId serverNodeId, bool chksm, bool signalId, Uint32 _reportFreq) : Transporter(t_reg, tt_TCP_TRANSPORTER, - lHostName, rHostName, r_port, lNodeId, rNodeId, + lHostName, rHostName, r_port, isMgmConnection, + lNodeId, rNodeId, serverNodeId, 0, false, chksm, signalId), m_sendBuffer(sendBufSize) { diff --git a/ndb/src/common/transporter/TCP_Transporter.hpp b/ndb/src/common/transporter/TCP_Transporter.hpp index 48046310bf8..9cd174150c1 100644 --- a/ndb/src/common/transporter/TCP_Transporter.hpp +++ b/ndb/src/common/transporter/TCP_Transporter.hpp @@ -49,9 +49,11 @@ private: int sendBufferSize, int maxReceiveSize, const char *lHostName, const char *rHostName, - int r_port, + int r_port, + bool isMgmConnection, NodeId lHostId, NodeId rHostId, + NodeId serverNodeId, bool checksum, bool signalId, Uint32 reportFreq = 4096); diff --git a/ndb/src/common/transporter/Transporter.cpp b/ndb/src/common/transporter/Transporter.cpp index b84f8f6fb5e..ea9e469382e 100644 --- a/ndb/src/common/transporter/Transporter.cpp +++ b/ndb/src/common/transporter/Transporter.cpp @@ -32,12 +32,14 @@ Transporter::Transporter(TransporterRegistry &t_reg, const char *lHostName, const char *rHostName, int r_port, + bool _isMgmConnection, NodeId lNodeId, - NodeId rNodeId, + NodeId rNodeId, + NodeId serverNodeId, int _byteorder, bool _compression, bool _checksum, bool _signalId) : m_r_port(r_port), remoteNodeId(rNodeId), localNodeId(lNodeId), - isServer(lNodeId < rNodeId), + isServer(lNodeId==serverNodeId), isMgmConnection(_isMgmConnection), m_packer(_signalId, _checksum), m_type(_type), m_transporter_registry(t_reg) @@ -109,22 +111,46 @@ Transporter::connect_server(NDB_SOCKET_TYPE sockfd) { bool Transporter::connect_client() { + NDB_SOCKET_TYPE sockfd; + if(m_connected) return true; - NDB_SOCKET_TYPE sockfd = m_socket_client->connect(); - - if (sockfd == NDB_INVALID_SOCKET) - return false; DBUG_ENTER("Transporter::connect_client"); + DBUG_PRINT("info",("port %d isMgmConnection=%d",m_r_port,isMgmConnection)); + + if(isMgmConnection) + sockfd= m_socket_client->connect_without_auth(); + else + sockfd= m_socket_client->connect(); + + if(sockfd<0) + return false; + + SocketOutputStream s_output(sockfd); + SocketInputStream s_input(sockfd); + + if(isMgmConnection) + { + /* + We issue the magic command to the management server to + switch into transporter mode. + */ + s_output.println("transporter connect"); + s_output.println(""); + } + + if (sockfd == NDB_INVALID_SOCKET) + return false; + // send info about own id // send info about own transporter type - SocketOutputStream s_output(sockfd); + s_output.println("%d %d", localNodeId, m_type); // get remote id int nodeId, remote_transporter_type= -1; - SocketInputStream s_input(sockfd); + char buf[256]; if (s_input.gets(buf, 256) == 0) { NDB_CLOSE_SOCKET(sockfd); diff --git a/ndb/src/common/transporter/Transporter.hpp b/ndb/src/common/transporter/Transporter.hpp index 12d991de681..736481a3250 100644 --- a/ndb/src/common/transporter/Transporter.hpp +++ b/ndb/src/common/transporter/Transporter.hpp @@ -89,8 +89,10 @@ protected: const char *lHostName, const char *rHostName, int r_port, + bool isMgmConnection, NodeId lNodeId, - NodeId rNodeId, + NodeId rNodeId, + NodeId serverNodeId, int byteorder, bool compression, bool checksum, @@ -133,6 +135,12 @@ protected: private: + /** + * means that we transform an MGM connection into + * a transporter connection + */ + bool isMgmConnection; + SocketClient *m_socket_client; protected: diff --git a/ndb/src/common/transporter/TransporterRegistry.cpp b/ndb/src/common/transporter/TransporterRegistry.cpp index 007225b17d8..3ec728597a5 100644 --- a/ndb/src/common/transporter/TransporterRegistry.cpp +++ b/ndb/src/common/transporter/TransporterRegistry.cpp @@ -248,7 +248,7 @@ TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd) } bool -TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) { +TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) { #ifdef NDB_TCP_TRANSPORTER if(!nodeIdSpecified){ @@ -262,13 +262,15 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) { return false; TCP_Transporter * t = new TCP_Transporter(*this, - config->sendBufferSize, - config->maxReceiveSize, + config->tcp.sendBufferSize, + config->tcp.maxReceiveSize, config->localHostName, config->remoteHostName, config->port, + config->isMgmConnection, localNodeId, config->remoteNodeId, + config->serverNodeId, config->checksum, config->signalId); if (t == NULL) @@ -297,7 +299,7 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) { } bool -TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) { +TransporterRegistry::createOSETransporter(TransporterConfiguration *conf) { #ifdef NDB_OSE_TRANSPORTER if(!nodeIdSpecified){ @@ -316,11 +318,12 @@ TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) { localNodeId); } - OSE_Transporter * t = new OSE_Transporter(conf->prioASignalSize, - conf->prioBSignalSize, + OSE_Transporter * t = new OSE_Transporter(conf->ose.prioASignalSize, + conf->ose.prioBSignalSize, localNodeId, conf->localHostName, conf->remoteNodeId, + conf->serverNodeId, conf->remoteHostName, conf->checksum, conf->signalId); @@ -346,7 +349,7 @@ TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) { } bool -TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) { +TransporterRegistry::createSCITransporter(TransporterConfiguration *config) { #ifdef NDB_SCI_TRANSPORTER if(!SCI_Transporter::initSCI()) @@ -366,13 +369,15 @@ TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) { config->localHostName, config->remoteHostName, config->port, - config->sendLimit, - config->bufferSize, - config->nLocalAdapters, - config->remoteSciNodeId0, - config->remoteSciNodeId1, + config->isMgmConnection, + config->sci.sendLimit, + config->sci.bufferSize, + config->sci.nLocalAdapters, + config->sci.remoteSciNodeId0, + config->sci.remoteSciNodeId1, localNodeId, config->remoteNodeId, + config->serverNodeId, config->checksum, config->signalId); @@ -397,7 +402,7 @@ TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) { } bool -TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) { +TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) { DBUG_ENTER("TransporterRegistry::createTransporter SHM"); #ifdef NDB_SHM_TRANSPORTER if(!nodeIdSpecified){ @@ -408,7 +413,7 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) { return false; if (!g_ndb_shm_signum) { - g_ndb_shm_signum= config->signum; + g_ndb_shm_signum= config->shm.signum; DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum)); /** * Make sure to block g_ndb_shm_signum @@ -420,7 +425,7 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) { pthread_sigmask(SIG_BLOCK, &mask, 0); } - if(config->signum != g_ndb_shm_signum) + if(config->shm.signum != g_ndb_shm_signum) return false; if(theTransporters[config->remoteNodeId] != NULL) @@ -430,12 +435,14 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) { config->localHostName, config->remoteHostName, config->port, + config->isMgmConnection, localNodeId, config->remoteNodeId, + config->serverNodeId, config->checksum, config->signalId, - config->shmKey, - config->shmSize + config->shm.shmKey, + config->shm.shmSize ); if (t == NULL) return false; diff --git a/ndb/src/common/util/SocketClient.cpp b/ndb/src/common/util/SocketClient.cpp index 38df1417eb8..1ac105881a2 100644 --- a/ndb/src/common/util/SocketClient.cpp +++ b/ndb/src/common/util/SocketClient.cpp @@ -60,6 +60,27 @@ SocketClient::init() return true; } +/** + * SocketClient::connect_without_auth() + * + * Temporarily disables authentication and connects. + * This is useful if you're trying to change what this + * SocketClient object is for (e.g. from mgm to ndb) + */ + +NDB_SOCKET_TYPE +SocketClient::connect_without_auth() +{ + SocketAuthenticator *tmp; + NDB_SOCKET_TYPE retval; + tmp= m_auth; + m_auth= NULL; + retval= connect(); + m_auth= tmp; + + return retval; +} + NDB_SOCKET_TYPE SocketClient::connect() { diff --git a/ndb/src/common/util/SocketServer.cpp b/ndb/src/common/util/SocketServer.cpp index f50ed4eb1ed..8dd949f7421 100644 --- a/ndb/src/common/util/SocketServer.cpp +++ b/ndb/src/common/util/SocketServer.cpp @@ -333,11 +333,18 @@ sessionThread_C(void* _sc){ return 0; } - if(!si->m_stop){ - si->m_stopped = false; - si->runSession(); - } else { - NDB_CLOSE_SOCKET(si->m_socket); + /** + * may have m_stopped set if we're transforming a mgm + * connection into a transporter connection. + */ + if(!si->m_stopped) + { + if(!si->m_stop){ + si->m_stopped = false; + si->runSession(); + } else { + NDB_CLOSE_SOCKET(si->m_socket); + } } si->m_stopped = true; diff --git a/ndb/src/mgmsrv/ConfigInfo.cpp b/ndb/src/mgmsrv/ConfigInfo.cpp index bd0d0f2ffd0..e6774c5c99b 100644 --- a/ndb/src/mgmsrv/ConfigInfo.cpp +++ b/ndb/src/mgmsrv/ConfigInfo.cpp @@ -3132,8 +3132,8 @@ fixPortNumber(InitConfigFileParser::Context & ctx, const char * data){ const Properties * node; require(ctx.m_config->get("Node", id1, &node)); + BaseString hostname(hostName1); - // require(node->get("HostName", hostname)); if (hostname.c_str()[0] == 0) { ctx.reportError("Hostname required on nodeid %d since it will " @@ -3142,6 +3142,19 @@ fixPortNumber(InitConfigFileParser::Context & ctx, const char * data){ } Uint32 port= 0; + const char * type1; + const char * type2; + const Properties * node2; + + node->get("Type", &type1); + ctx.m_config->get("Node", id2, &node2); + node2->get("Type", &type2); + + if(strcmp(type1, MGM_TOKEN)==0) + node->get("PortNumber",&port); + else if(strcmp(type2, MGM_TOKEN)==0) + node2->get("PortNumber",&port); + if (!node->get("ServerPort", &port) && !ctx.m_userProperties.get("ServerPort_", id1, &port)) { ctx.m_currentSection->put("PortNumber", port); diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp index 137d2aa1647..4965eec41a9 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.cpp +++ b/ndb/src/mgmsrv/MgmtSrvr.cpp @@ -2883,6 +2883,11 @@ MgmtSrvr::getConnectionDbParameter(int node1, DBUG_RETURN(1); } +void MgmtSrvr::transporter_connect(NDB_SOCKET_TYPE sockfd) +{ + theFacade->get_registry()->connect_server(sockfd); +} + int MgmtSrvr::set_connect_string(const char *str) { return ndb_mgm_set_connectstring(m_config_retriever->get_mgmHandle(),str); diff --git a/ndb/src/mgmsrv/MgmtSrvr.hpp b/ndb/src/mgmsrv/MgmtSrvr.hpp index 2244785ee9a..95298630230 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.hpp +++ b/ndb/src/mgmsrv/MgmtSrvr.hpp @@ -515,6 +515,8 @@ public: int set_connect_string(const char *str); + void transporter_connect(NDB_SOCKET_TYPE sockfd); + ConfigRetriever *get_config_retriever() { return m_config_retriever; }; const char *get_connect_address(Uint32 node_id) { return inet_ntoa(m_connect_address[node_id]); } diff --git a/ndb/src/mgmsrv/Services.cpp b/ndb/src/mgmsrv/Services.cpp index 42798bcda39..344a7afd348 100644 --- a/ndb/src/mgmsrv/Services.cpp +++ b/ndb/src/mgmsrv/Services.cpp @@ -264,6 +264,8 @@ ParserRow commands[] = { MGM_CMD("check connection", &MgmApiSession::check_connection, ""), + MGM_CMD("transporter connect", &MgmApiSession::transporter_connect, ""), + MGM_END() }; @@ -1538,5 +1540,17 @@ MgmApiSession::check_connection(Parser_t::Context &ctx, m_output->println(""); } +void +MgmApiSession::transporter_connect(Parser_t::Context &ctx, + Properties const &args) { + NDB_SOCKET_TYPE s= m_socket; + + m_stop= true; + m_stopped= true; // force a stop (no closing socket) + m_socket= -1; // so nobody closes it + + m_mgmsrv.transporter_connect(s); +} + template class MutexVector; template class Vector const*>; diff --git a/ndb/src/mgmsrv/Services.hpp b/ndb/src/mgmsrv/Services.hpp index d7334ee1c5f..e4fddea7d04 100644 --- a/ndb/src/mgmsrv/Services.hpp +++ b/ndb/src/mgmsrv/Services.hpp @@ -97,6 +97,8 @@ public: void purge_stale_sessions(Parser_t::Context &ctx, const class Properties &args); void check_connection(Parser_t::Context &ctx, const class Properties &args); + + void transporter_connect(Parser_t::Context &ctx, Properties const &args); void repCommand(Parser_t::Context &ctx, const class Properties &args); };