From dc2544fdee9e510614cc7d674726ea388284e57f Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 23 Jun 2004 00:48:07 +0000 Subject: [PATCH] WL 1748 ndb/include/kernel/signaldata/ApiRegSignalData.hpp: Added info about connected nodes to ApiRegConf ndb/include/mgmapi/mgmapi.h: New mgmapi command to get nodeid dynamically ndb/include/mgmapi/mgmapi_config_parameters.h: New config param to specify port on for transporter setup ndb/include/mgmcommon/ConfigRetriever.hpp: added notetype to getConfig ndb/include/portlib/NdbTCP.h: small detail ndb/include/transporter/TransporterRegistry.hpp: changed performstates and interface to connect/disconnect transporters added TransporterService for transporter setup changed model for setting up transporters ndb/src/common/mgmcommon/ConfigInfo.cpp: removed some config params as mandatory ndb/src/common/mgmcommon/ConfigRetriever.cpp: added dynamic alloc of nodeid ndb/src/common/mgmcommon/LocalConfig.cpp: added default localhost:2200 and dynamic id ndb/src/common/transporter/TCP_Transporter.cpp: moved TCP hostname stuff from TCP_Transporter to parent class Transporter changed TCP connection setup to use just one port for all transporters ndb/src/common/transporter/TCP_Transporter.hpp: moved TCP hostname stuff from TCP_Transporter to parent class Transporter changed TCP connection setup to use just one port for all transporters ndb/src/common/transporter/Transporter.cpp: moved TCP hostname stuff from TCP_Transporter to parent class Transporter changed TCP connection setup to use just one port for all transporters ndb/src/common/transporter/Transporter.hpp: moved TCP hostname stuff from TCP_Transporter to parent class Transporter changed TCP connection setup to use just one port for all transporters ndb/src/common/transporter/TransporterRegistry.cpp: changed performstates and interface to connect/disconnect transporters added TransporterService for transporter setup changed model for setting up transporters ndb/src/common/util/Makefile.am: New SocketAuthenticator ndb/src/common/util/SocketServer.cpp: small detail ndb/src/kernel/Makefile.am: small detail ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp: new interface to performstates + cleanup ndb/src/kernel/blocks/qmgr/QmgrMain.cpp: changed signal ApiRegConf ndb/src/kernel/vm/ThreadConfig.cpp: changed naming ndb/src/mgmapi/mgmapi.cpp: Use new Socket client New methid for allocating dynamic id ndb/src/mgmsrv/MgmtSrvr.cpp: moved port setup from main.cpp to MgmtSrvr new method getNextFreeNodeId ndb/src/mgmsrv/MgmtSrvr.hpp: .. ndb/src/mgmsrv/Services.cpp: allocate new nodeid ndb/src/mgmsrv/Services.hpp: .. ndb/src/mgmsrv/main.cpp: moved setup port to MgmtSrvr.cpp Rearranged setup order ndb/src/ndbapi/ClusterMgr.cpp: new API_REGCONF ndb/src/ndbapi/ClusterMgr.hpp: bitmask to hold connected nodes ndb/src/ndbapi/TransporterFacade.cpp: New transporter connect ndb/src/ndbapi/TransporterFacade.hpp: removed function not used ndb/src/kernel/main.cpp: new transporter setup --- .../kernel/signaldata/ApiRegSignalData.hpp | 4 +- ndb/include/mgmapi/mgmapi.h | 5 + ndb/include/mgmapi/mgmapi_config_parameters.h | 2 + ndb/include/mgmcommon/ConfigRetriever.hpp | 4 +- ndb/include/portlib/NdbTCP.h | 2 +- .../transporter/TransporterRegistry.hpp | 82 +++- ndb/include/util/SocketAuthenticator.hpp | 39 ++ ndb/include/util/SocketClient.hpp | 38 ++ ndb/src/common/mgmcommon/ConfigInfo.cpp | 43 +- ndb/src/common/mgmcommon/ConfigRetriever.cpp | 24 +- ndb/src/common/mgmcommon/LocalConfig.cpp | 6 + .../common/transporter/TCP_Transporter.cpp | 250 ++-------- .../common/transporter/TCP_Transporter.hpp | 83 +--- ndb/src/common/transporter/Transporter.cpp | 191 ++++---- ndb/src/common/transporter/Transporter.hpp | 105 ++--- .../transporter/TransporterRegistry.cpp | 439 +++++++++++------- ndb/src/common/util/Makefile.am | 3 +- ndb/src/common/util/SocketAuthenticator.cpp | 63 +++ ndb/src/common/util/SocketClient.cpp | 90 ++++ ndb/src/common/util/SocketServer.cpp | 2 +- ndb/src/kernel/Makefile.am | 2 +- ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp | 77 +-- ndb/src/kernel/blocks/qmgr/QmgrMain.cpp | 6 +- ndb/src/kernel/{Main.cpp => main.cpp} | 18 +- ndb/src/kernel/vm/ThreadConfig.cpp | 2 +- ndb/src/mgmapi/mgmapi.cpp | 83 ++-- ndb/src/mgmsrv/MgmtSrvr.cpp | 137 +++++- ndb/src/mgmsrv/MgmtSrvr.hpp | 8 + ndb/src/mgmsrv/Services.cpp | 85 +++- ndb/src/mgmsrv/Services.hpp | 1 + ndb/src/mgmsrv/main.cpp | 140 +----- ndb/src/ndbapi/ClusterMgr.cpp | 6 + ndb/src/ndbapi/ClusterMgr.hpp | 1 + ndb/src/ndbapi/TransporterFacade.cpp | 26 +- ndb/src/ndbapi/TransporterFacade.hpp | 1 - 35 files changed, 1195 insertions(+), 873 deletions(-) create mode 100644 ndb/include/util/SocketAuthenticator.hpp create mode 100644 ndb/include/util/SocketClient.hpp create mode 100644 ndb/src/common/util/SocketAuthenticator.cpp create mode 100644 ndb/src/common/util/SocketClient.cpp rename ndb/src/kernel/{Main.cpp => main.cpp} (94%) diff --git a/ndb/include/kernel/signaldata/ApiRegSignalData.hpp b/ndb/include/kernel/signaldata/ApiRegSignalData.hpp index 84dca8fb260..9ce99d3e45c 100644 --- a/ndb/include/kernel/signaldata/ApiRegSignalData.hpp +++ b/ndb/include/kernel/signaldata/ApiRegSignalData.hpp @@ -80,13 +80,15 @@ class ApiRegConf { friend class ClusterMgr; public: - STATIC_CONST( SignalLength = 3 + NodeState::DataLength ); + STATIC_CONST( SignalLength = 3 + NodeState::DataLength + + NdbNodeBitmask::Size ); private: Uint32 qmgrRef; Uint32 version; // Version of NDB node Uint32 apiHeartbeatFrequency; NodeState nodeState; + Bitmask::Data connected_nodes; }; #endif diff --git a/ndb/include/mgmapi/mgmapi.h b/ndb/include/mgmapi/mgmapi.h index 7b2f728bda8..45a421855b0 100644 --- a/ndb/include/mgmapi/mgmapi.h +++ b/ndb/include/mgmapi/mgmapi.h @@ -666,6 +666,11 @@ extern "C" { */ struct ndb_mgm_configuration * ndb_mgm_get_configuration(NdbMgmHandle handle, unsigned version); + + int ndb_mgm_alloc_nodeid(NdbMgmHandle handle, + unsigned version, + unsigned *pnodeid, + int nodetype); /** * Config iterator */ diff --git a/ndb/include/mgmapi/mgmapi_config_parameters.h b/ndb/include/mgmapi/mgmapi_config_parameters.h index d3bb44c1523..22b9f8f31dd 100644 --- a/ndb/include/mgmapi/mgmapi_config_parameters.h +++ b/ndb/include/mgmapi/mgmapi_config_parameters.h @@ -76,6 +76,8 @@ #define CFG_DB_DISCLESS 148 +#define CFG_DB_SERVER_PORT 149 + #define CFG_NODE_ARBIT_RANK 200 #define CFG_NODE_ARBIT_DELAY 201 diff --git a/ndb/include/mgmcommon/ConfigRetriever.hpp b/ndb/include/mgmcommon/ConfigRetriever.hpp index 50d333b54dd..c1de751b797 100644 --- a/ndb/include/mgmcommon/ConfigRetriever.hpp +++ b/ndb/include/mgmcommon/ConfigRetriever.hpp @@ -77,7 +77,7 @@ public: * Get config using socket */ struct ndb_mgm_configuration * getConfig(const char * mgmhost, short port, - int versionId); + int versionId, int nodetype); /** * Get config from file */ @@ -98,7 +98,7 @@ private: char * m_connectString; char * m_defaultConnectString; - + /** * Verify config */ diff --git a/ndb/include/portlib/NdbTCP.h b/ndb/include/portlib/NdbTCP.h index 42c34855c39..4dc8435eef1 100644 --- a/ndb/include/portlib/NdbTCP.h +++ b/ndb/include/portlib/NdbTCP.h @@ -64,7 +64,7 @@ typedef int socklen_t; #define NDB_NONBLOCK O_NONBLOCK #define NDB_SOCKET_TYPE int #define NDB_INVALID_SOCKET -1 -#define NDB_CLOSE_SOCKET(x) close(x) +#define NDB_CLOSE_SOCKET(x) ::close(x) #define InetErrno errno diff --git a/ndb/include/transporter/TransporterRegistry.hpp b/ndb/include/transporter/TransporterRegistry.hpp index 6c979777f18..7a750b81478 100644 --- a/ndb/include/transporter/TransporterRegistry.hpp +++ b/ndb/include/transporter/TransporterRegistry.hpp @@ -29,20 +29,10 @@ #define TransporterRegistry_H #include "TransporterDefinitions.hpp" +#include #include -// A transporter is always in a PerformState. -// PerformIO is used initially and as long as any of the events -// PerformConnect, ... -enum PerformState { - PerformNothing = 4, // Does nothing - PerformIO = 0, // Is connected - PerformConnect = 1, // Is trying to connect - PerformDisconnect = 2, // Trying to disconnect - RemoveTransporter = 3 // Will be removed -}; - // A transporter is always in an IOState. // NoHalt is used initially and as long as it is no restrictions on // sending or receiving. @@ -60,18 +50,45 @@ enum TransporterType { tt_OSE_TRANSPORTER = 4 }; +static const char *performStateString[] = + { "is connected", + "is trying to connect", + "does nothing", + "is trying to disconnect" }; + class Transporter; class TCP_Transporter; class SCI_Transporter; class SHM_Transporter; class OSE_Transporter; +class TransporterRegistry; +class SocketAuthenticator; + +class TransporterService : public SocketServer::Service { + SocketAuthenticator * m_auth; + TransporterRegistry * m_transporter_registry; +public: + TransporterService(SocketAuthenticator *auth= 0) + { + m_auth= auth; + m_transporter_registry= 0; + } + void setTransporterRegistry(TransporterRegistry *t) + { + m_transporter_registry= t; + } + SocketServer::Session * newSession(NDB_SOCKET_TYPE socket); +}; + /** * @class TransporterRegistry * @brief ... */ class TransporterRegistry { friend class OSE_Receiver; + friend class Transporter; + friend class TransporterService; public: /** * Constructor @@ -98,6 +115,12 @@ public: */ ~TransporterRegistry(); + bool start_service(SocketServer& server); + bool start_clients(); + bool stop_clients(); + void start_clients_thread(); + void update_connections(); + /** * Start/Stop receiving */ @@ -110,16 +133,26 @@ public: void startSending(); void stopSending(); + // A transporter is always in a PerformState. + // PerformIO is used initially and as long as any of the events + // PerformConnect, ... + enum PerformState { + CONNECTED = 0, + CONNECTING = 1, + DISCONNECTED = 2, + DISCONNECTING = 3 + }; + const char *getPerformStateString(NodeId nodeId) const + { return performStateString[(unsigned)performStates[nodeId]]; }; + /** * Get and set methods for PerformState */ - PerformState performState(NodeId nodeId); - void setPerformState(NodeId nodeId, PerformState state); - - /** - * Set perform state for all transporters - */ - void setPerformState(PerformState state); + void do_connect(NodeId node_id); + void do_disconnect(NodeId node_id); + bool is_connected(NodeId node_id) { return performStates[node_id] == CONNECTED; }; + void report_connect(NodeId node_id); + void report_disconnect(NodeId node_id, int errnum); /** * Get and set methods for IOState @@ -174,8 +207,6 @@ public: void performReceive(); void performSend(); - void checkConnections(); - /** * Force sending if more than or equal to sendLimit * number have asked for send. Returns 0 if not sending @@ -192,6 +223,12 @@ protected: private: void * callbackObj; + TransporterService *m_transporter_service; + unsigned short m_service_port; + char *m_interface_name; + struct NdbThread *m_start_clients_thread; + bool m_run_start_clients_thread; + int sendCounter; NodeId localNodeId; bool nodeIdSpecified; @@ -202,11 +239,6 @@ private: int nSHMTransporters; int nOSETransporters; - int m_ccCount; - int m_ccIndex; - int m_ccStep; - int m_nTransportersPerformConnect; - bool m_ccReady; /** * Arrays holding all transporters in the order they are created */ diff --git a/ndb/include/util/SocketAuthenticator.hpp b/ndb/include/util/SocketAuthenticator.hpp new file mode 100644 index 00000000000..b42c7beb70f --- /dev/null +++ b/ndb/include/util/SocketAuthenticator.hpp @@ -0,0 +1,39 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef SOCKET_AUTHENTICATOR_HPP +#define SOCKET_AUTHENTICATOR_HPP + +class SocketAuthenticator +{ +public: + virtual ~SocketAuthenticator() {}; + virtual bool client_authenticate(int sockfd) = 0; + virtual bool server_authenticate(int sockfd) = 0; +}; + +class SocketAuthSimple : public SocketAuthenticator +{ + const char *m_passwd; + char *m_buf; +public: + SocketAuthSimple(const char *passwd); + virtual ~SocketAuthSimple(); + virtual bool client_authenticate(int sockfd); + virtual bool server_authenticate(int sockfd); +}; + +#endif // SOCKET_AUTHENTICATOR_HPP diff --git a/ndb/include/util/SocketClient.hpp b/ndb/include/util/SocketClient.hpp new file mode 100644 index 00000000000..de9a081464a --- /dev/null +++ b/ndb/include/util/SocketClient.hpp @@ -0,0 +1,38 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef SOCKET_CLIENT_HPP +#define SOCKET_CLIENT_HPP + +#include +class SocketAuthenticator; + +class SocketClient +{ + NDB_SOCKET_TYPE m_sockfd; + struct sockaddr_in m_servaddr; + unsigned short m_port; + char *m_server_name; + SocketAuthenticator *m_auth; +public: + SocketClient(const char *server_name, unsigned short port, SocketAuthenticator *sa = 0); + ~SocketClient(); + bool init(); + NDB_SOCKET_TYPE connect(); + bool close(); +}; + +#endif // SOCKET_ClIENT_HPP diff --git a/ndb/src/common/mgmcommon/ConfigInfo.cpp b/ndb/src/common/mgmcommon/ConfigInfo.cpp index c2b5fdabf01..a1bd5f39d82 100644 --- a/ndb/src/common/mgmcommon/ConfigInfo.cpp +++ b/ndb/src/common/mgmcommon/ConfigInfo.cpp @@ -146,13 +146,17 @@ const int ConfigInfo::m_NoOfRules = sizeof(m_SectionRules)/sizeof(SectionRule); /**************************************************************************** * Config Rules declarations ****************************************************************************/ -bool addNodeConnections(Vector§ions, - struct InitConfigFileParser::Context &ctx, - const char * ruleData); +bool add_node_connections(Vector§ions, + struct InitConfigFileParser::Context &ctx, + const char * rule_data); +bool add_db_ports(Vector§ions, + struct InitConfigFileParser::Context &ctx, + const char * rule_data); const ConfigInfo::ConfigRule ConfigInfo::m_ConfigRules[] = { - { addNodeConnections, 0 }, + { add_node_connections, 0 }, + { add_db_ports, 0 }, { 0, 0 } }; @@ -376,6 +380,18 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { 1, (MAX_NODES - 1) }, + { + CFG_DB_SERVER_PORT, + "ServerPort", + "DB", + "Port used to setup transporter", + ConfigInfo::USED, + false, + ConfigInfo::INT, + 2202, + 0, + 0x7FFFFFFF }, + { CFG_DB_NO_REPLICAS, "NoOfReplicas", @@ -1231,7 +1247,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ConfigInfo::USED, false, ConfigInfo::STRING, - MANDATORY, + 0, 0, 0x7FFFFFFF }, @@ -1330,7 +1346,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ConfigInfo::USED, false, ConfigInfo::STRING, - MANDATORY, + 0, 0, 0x7FFFFFFF }, @@ -2510,10 +2526,14 @@ fixNodeHostname(InitConfigFileParser::Context & ctx, const char * data){ const char * compId; if(!ctx.m_currentSection->get("ExecuteOnComputer", &compId)){ + require(ctx.m_currentSection->put("HostName", "")); + return true; +#if 0 ctx.reportError("Parameter \"ExecuteOnComputer\" missing from section " "[%s] starting at line: %d", ctx.fname, ctx.m_sectionLineno); return false; +#endif } const Properties * computer; @@ -3158,9 +3178,9 @@ saveInConfigValues(InitConfigFileParser::Context & ctx, const char * data){ } bool -addNodeConnections(Vector§ions, +add_node_connections(Vector§ions, struct InitConfigFileParser::Context &ctx, - const char * ruleData) + const char * rule_data) { Properties * props= ctx.m_config; Properties p_connections; @@ -3241,3 +3261,10 @@ addNodeConnections(Vector§ions, return true; } +bool add_db_ports(Vector§ions, + struct InitConfigFileParser::Context &ctx, + const char * rule_data) +{ + return true; +} + diff --git a/ndb/src/common/mgmcommon/ConfigRetriever.cpp b/ndb/src/common/mgmcommon/ConfigRetriever.cpp index d2c622593de..c34d9bb01f9 100644 --- a/ndb/src/common/mgmcommon/ConfigRetriever.cpp +++ b/ndb/src/common/mgmcommon/ConfigRetriever.cpp @@ -114,7 +114,8 @@ ConfigRetriever::getConfig(int verId, int nodeType) { struct ndb_mgm_configuration * p = 0; switch(m->type){ case MgmId_TCP: - p = getConfig(m->data.tcp.remoteHost, m->data.tcp.port, verId); + p = getConfig(m->data.tcp.remoteHost, m->data.tcp.port, + verId, nodeType); break; case MgmId_File: p = getConfig(m->data.file.filename, verId); @@ -155,7 +156,8 @@ ConfigRetriever::getConfig(int verId, int nodeType) { ndb_mgm_configuration * ConfigRetriever::getConfig(const char * mgmhost, short port, - int versionId){ + int versionId, + int nodetype){ NdbMgmHandle h; h = ndb_mgm_create_handle(); @@ -175,6 +177,21 @@ ConfigRetriever::getConfig(const char * mgmhost, ndb_mgm_configuration * conf = ndb_mgm_get_configuration(h, versionId); if(conf == 0){ setError(CR_ERROR, ndb_mgm_get_latest_error_desc(h)); + ndb_mgm_destroy_handle(&h); + return 0; + } + + { + unsigned nodeid= getOwnNodeId(); + + int res= ndb_mgm_alloc_nodeid(h, versionId, &nodeid, nodetype); + if(res != 0) { + setError(CR_ERROR, ndb_mgm_get_latest_error_desc(h)); + ndb_mgm_destroy_handle(&h); + return 0; + } + + _ownNodeId= nodeid; } ndb_mgm_disconnect(h); @@ -329,6 +346,9 @@ ConfigRetriever::verifyConfig(const struct ndb_mgm_configuration * conf, } do { + if(strlen(hostname) == 0) + break; + if(strcasecmp(hostname, localhost) == 0) break; diff --git a/ndb/src/common/mgmcommon/LocalConfig.cpp b/ndb/src/common/mgmcommon/LocalConfig.cpp index 12e685ced34..67e92064e81 100644 --- a/ndb/src/common/mgmcommon/LocalConfig.cpp +++ b/ndb/src/common/mgmcommon/LocalConfig.cpp @@ -21,6 +21,7 @@ LocalConfig::LocalConfig(){ ids = 0; size = 0; items = 0; error_line = 0; error_msg[0] = 0; + _ownNodeId= 0; } bool @@ -95,6 +96,11 @@ LocalConfig::init(bool onlyNodeId, return false; } + //7. Check + if(readConnectString("host=localhost:2200", onlyNodeId)){ + return true; + } + setError(0, ""); return false; diff --git a/ndb/src/common/transporter/TCP_Transporter.cpp b/ndb/src/common/transporter/TCP_Transporter.cpp index 99b6a137797..8833b51e236 100644 --- a/ndb/src/common/transporter/TCP_Transporter.cpp +++ b/ndb/src/common/transporter/TCP_Transporter.cpp @@ -63,27 +63,23 @@ ndbstrerror::~ndbstrerror(void) #define ndbstrerror strerror #endif -TCP_Transporter::TCP_Transporter(int sendBufSize, int maxRecvSize, - int portNo, - const char *rHostName, +TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg, + int sendBufSize, int maxRecvSize, const char *lHostName, - NodeId rNodeId, NodeId lNodeId, + const char *rHostName, + int r_port, + NodeId lNodeId, + NodeId rNodeId, int byte_order, bool compr, bool chksm, bool signalId, Uint32 _reportFreq) : - Transporter(lNodeId, rNodeId, byte_order, compr, chksm, signalId), - m_sendBuffer(sendBufSize), - isServer(lNodeId < rNodeId), - port(portNo) + Transporter(t_reg, lHostName, rHostName, r_port, lNodeId, rNodeId, + byte_order, compr, chksm, signalId), + m_sendBuffer(sendBufSize) { maxReceiveSize = maxRecvSize; - strncpy(remoteHostName, rHostName, sizeof(remoteHostName)); - // Initialize member variables - Ndb_getInAddr(&remoteHostAddress, rHostName); - - Ndb_getInAddr(&localHostAddress, lHostName); theSocket = NDB_INVALID_SOCKET; sendCount = receiveCount = 0; @@ -108,6 +104,24 @@ TCP_Transporter::~TCP_Transporter() { receiveBuffer.destroy(); } +bool TCP_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd) +{ + return connect_common(sockfd); +} + +bool TCP_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd) +{ + return connect_common(sockfd); +} + +bool TCP_Transporter::connect_common(NDB_SOCKET_TYPE sockfd) +{ + theSocket = sockfd; + setSocketOptions(); + setSocketNonBlocking(theSocket); + return true; +} + bool TCP_Transporter::initTransporter() { @@ -316,7 +330,7 @@ TCP_Transporter::doSend() { sendCount ++; sendSize += nBytesSent; if(sendCount == reportFreq){ - reportSendLen(callbackObj,remoteNodeId, sendCount, sendSize); + reportSendLen(get_callback_obj(), remoteNodeId, sendCount, sendSize); sendCount = 0; sendSize = 0; } @@ -331,7 +345,7 @@ TCP_Transporter::doSend() { #endif if(DISCONNECT_ERRNO(InetErrno, nBytesSent)){ doDisconnect(); - reportDisconnect(callbackObj, remoteNodeId, InetErrno); + report_disconnect(InetErrno); } return false; @@ -361,14 +375,15 @@ TCP_Transporter::doReceive() { #endif ndbout_c("receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)", receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer); - reportError(callbackObj, remoteNodeId, TE_INVALID_MESSAGE_LENGTH); + report_error(TE_INVALID_MESSAGE_LENGTH); return 0; } receiveCount ++; receiveSize += nBytesRead; + if(receiveCount == reportFreq){ - reportReceiveLen(callbackObj, remoteNodeId, receiveCount, receiveSize); + reportReceiveLen(get_callback_obj(), remoteNodeId, receiveCount, receiveSize); receiveCount = 0; receiveSize = 0; } @@ -384,60 +399,17 @@ TCP_Transporter::doReceive() { if(DISCONNECT_ERRNO(InetErrno, nBytesRead)){ // The remote node has closed down doDisconnect(); - reportDisconnect(callbackObj, remoteNodeId,InetErrno); + report_disconnect(InetErrno); } } return nBytesRead; } -bool -TCP_Transporter::connectImpl(Uint32 timeOutMillis){ - struct timeval timeout = {0, 0}; - timeout.tv_sec = timeOutMillis / 1000; - timeout.tv_usec = (timeOutMillis % 1000)*1000; - - bool retVal = false; - - if(isServer){ - if(theSocket == NDB_INVALID_SOCKET){ - startTCPServer(); - } - if(theSocket == NDB_INVALID_SOCKET) - { - NdbSleep_MilliSleep(timeOutMillis); - return false; - } - retVal = acceptClient(&timeout); - } else { - // Is client - retVal = connectClient(&timeout); - } - - if(!retVal) { - NdbSleep_MilliSleep(timeOutMillis); - return false; - } - -#if defined NDB_OSE || defined NDB_SOFTOSE - if(setsockopt(theSocket, SOL_SOCKET, SO_OSEOWNER, - &theReceiverPid, sizeof(PROCESS)) != 0){ - - ndbout << "Failed to transfer ownership of socket" << endl; - NDB_CLOSE_SOCKET(theSocket); - theSocket = -1; - return false; - } -#endif - - return true; -} - - void -TCP_Transporter::disconnectImpl() { +TCP_Transporter::disconnectImpl() { if(theSocket != NDB_INVALID_SOCKET){ if(NDB_CLOSE_SOCKET(theSocket) < 0){ - reportError(callbackObj, remoteNodeId, TE_ERROR_CLOSING_SOCKET); + report_error(TE_ERROR_CLOSING_SOCKET); } } @@ -447,155 +419,3 @@ TCP_Transporter::disconnectImpl() { theSocket = NDB_INVALID_SOCKET; } - -bool -TCP_Transporter::startTCPServer() { - - int bindResult, listenResult; - - // The server variable is the remote server when we are a client - // htonl and htons returns the parameter in network byte order - // INADDR_ANY tells the OS kernel to choose the IP address - struct sockaddr_in server; - memset((void*)&server, 0, sizeof(server)); - server.sin_family = AF_INET; - server.sin_addr.s_addr = localHostAddress.s_addr; - server.sin_port = htons(port); - - if (theSocket != NDB_INVALID_SOCKET) { - return true; // Server socket is already initialized - } - - // Create the socket - theSocket = socket(AF_INET, SOCK_STREAM, 0); - if (theSocket == NDB_INVALID_SOCKET) { - reportThreadError(remoteNodeId, TE_COULD_NOT_CREATE_SOCKET); - return false; - } - - // Set the socket reuse addr to true, so we are sure we can bind the - // socket - int reuseAddr = 1; - setsockopt(theSocket, SOL_SOCKET, SO_REUSEADDR, - (char*)&reuseAddr, sizeof(reuseAddr)); - - // Set the TCP_NODELAY option so also small packets are sent - // as soon as possible - int nodelay = 1; - setsockopt(theSocket, IPPROTO_TCP, TCP_NODELAY, - (char*)&nodelay, sizeof(nodelay)); - - // Bind the socket - bindResult = bind(theSocket, (struct sockaddr *) &server, - sizeof(server)); - if (bindResult < 0) { - reportThreadError(remoteNodeId, TE_COULD_NOT_BIND_SOCKET); - NDB_CLOSE_SOCKET(theSocket); - theSocket = NDB_INVALID_SOCKET; - return false; - } - - // Perform listen. - listenResult = listen(theSocket, 1); - if (listenResult == 1) { - reportThreadError(remoteNodeId, TE_LISTEN_FAILED); - NDB_CLOSE_SOCKET(theSocket); - theSocket = NDB_INVALID_SOCKET; - return false; - } - - return true; -} - - -bool -TCP_Transporter::acceptClient (struct timeval * timeout){ - - struct sockaddr_in clientAddress; - - fd_set readset; - FD_ZERO(&readset); - FD_SET(theSocket, &readset); - const int res = select(theSocket + 1, &readset, 0, 0, timeout); - if(res == 0) - return false; - - if(res < 0){ - reportThreadError(remoteNodeId, TE_ERROR_IN_SELECT_BEFORE_ACCEPT); - return false; - } - - NDB_SOCKLEN_T clientAddressLen = sizeof(clientAddress); - const NDB_SOCKET_TYPE clientSocket = accept(theSocket, - (struct sockaddr*)&clientAddress, - &clientAddressLen); - if (clientSocket == NDB_INVALID_SOCKET) { - reportThreadError(remoteNodeId, TE_ACCEPT_RETURN_ERROR); - return false; - } - - if (clientAddress.sin_addr.s_addr != remoteHostAddress.s_addr) { - ndbout_c("Wrong client connecting!"); - ndbout_c("connecting address: %s", inet_ntoa(clientAddress.sin_addr)); - ndbout_c("expecting address: %s", inet_ntoa(remoteHostAddress)); - // The newly connected host is not the remote host - // we wanted to connect to. Disconnect it. - // XXX This is not valid. We cannot disconnect it. - NDB_CLOSE_SOCKET(clientSocket); - return false; - } else { - NDB_CLOSE_SOCKET(theSocket); - theSocket = clientSocket; - setSocketOptions(); - setSocketNonBlocking(theSocket); - return true; - } -} - -bool -TCP_Transporter::connectClient (struct timeval * timeout){ - - // Create the socket - theSocket = socket(AF_INET, SOCK_STREAM, 0); - if (theSocket == NDB_INVALID_SOCKET) { - reportThreadError(remoteNodeId, TE_COULD_NOT_CREATE_SOCKET); - return false; - } - - struct sockaddr_in server; - memset((void*)&server, 0, sizeof(server)); - server.sin_family = AF_INET; - server.sin_addr = remoteHostAddress; - server.sin_port = htons(port); - - struct sockaddr_in client; - memset((void*)&client, 0, sizeof(client)); - client.sin_family = AF_INET; - client.sin_addr = localHostAddress; - client.sin_port = 0; // Any port - - // Bind the socket - const int bindResult = bind(theSocket, (struct sockaddr *) &client, - sizeof(client)); - if (bindResult < 0) { - reportThreadError(remoteNodeId, TE_COULD_NOT_BIND_SOCKET); - NDB_CLOSE_SOCKET(theSocket); - theSocket = NDB_INVALID_SOCKET; - return false; - } - - const int connectRes = ::connect(theSocket, (struct sockaddr *) &server, - sizeof(server)); - if(connectRes == 0){ - setSocketOptions(); - setSocketNonBlocking(theSocket); - return true; - } - - NDB_CLOSE_SOCKET(theSocket); - theSocket = NDB_INVALID_SOCKET; - return false; -} - - - diff --git a/ndb/src/common/transporter/TCP_Transporter.hpp b/ndb/src/common/transporter/TCP_Transporter.hpp index 30b730a5b1c..958cfde03a1 100644 --- a/ndb/src/common/transporter/TCP_Transporter.hpp +++ b/ndb/src/common/transporter/TCP_Transporter.hpp @@ -14,24 +14,8 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -//**************************************************************************** -// -// AUTHOR -// Åsa Fransson -// -// NAME -// TCP_Transporter -// -// DESCRIPTION -// A TCP_Transporter instance is created when TCP/IP-communication -// shall be used (user specified). It handles connect, disconnect, -// send and receive. -// -// -// -//***************************************************************************/ -#ifndef TCP_Transporter_H -#define TCP_Transporter_H +#ifndef TCP_TRANSPORTER_HPP +#define TCP_TRANSPORTER_HPP #include "Transporter.hpp" #include "SendBuffer.hpp" @@ -61,11 +45,13 @@ class TCP_Transporter : public Transporter { friend class TransporterRegistry; private: // Initialize member variables - TCP_Transporter(int sendBufferSize, int maxReceiveSize, - int port, - const char *rHostName, + TCP_Transporter(TransporterRegistry&, + int sendBufferSize, int maxReceiveSize, const char *lHostName, - NodeId rHostId, NodeId lHostId, + const char *rHostName, + int r_port, + NodeId lHostId, + NodeId rHostId, int byteorder, bool compression, bool checksum, bool signalId, Uint32 reportFreq = 4096); @@ -121,12 +107,14 @@ protected: * A client connects to the remote server * A server accepts any new connections */ - bool connectImpl(Uint32 timeOutMillis); + virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd); + virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd); + bool connect_common(NDB_SOCKET_TYPE sockfd); /** * Disconnects a TCP/IP node. Empty send and receivebuffer. */ - void disconnectImpl(); + virtual void disconnectImpl(); private: /** @@ -134,21 +122,11 @@ private: */ SendBuffer m_sendBuffer; - const bool isServer; - const unsigned int port; - // Sending/Receiving socket used by both client and server NDB_SOCKET_TYPE theSocket; Uint32 maxReceiveSize; - /** - * Remote host name/and address - */ - char remoteHostName[256]; - struct in_addr remoteHostAddress; - struct in_addr localHostAddress; - /** * Socket options */ @@ -163,43 +141,6 @@ private: bool sendIsPossible(struct timeval * timeout); - /** - * startTCPServer - None blocking - * - * create a server socket - * bind - * listen - * - * Note: Does not call accept - */ - bool startTCPServer(); - - /** - * acceptClient - Blocking - * - * Accept a connection - * checks if "right" client has connected - * if so - * close server socket - * else - * close newly created socket and goto begin - */ - bool acceptClient(struct timeval * timeout); - - /** - * Creates a client socket - * - * Note does not call connect - */ - bool createClientSocket(); - - /** - * connectClient - Blocking - * - * connects to remote host - */ - bool connectClient(struct timeval * timeout); - /** * Statistics */ diff --git a/ndb/src/common/transporter/Transporter.cpp b/ndb/src/common/transporter/Transporter.cpp index 5ca523d5185..c6f93d2cbea 100644 --- a/ndb/src/common/transporter/Transporter.cpp +++ b/ndb/src/common/transporter/Transporter.cpp @@ -15,132 +15,125 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#include +#include #include "Transporter.hpp" #include "TransporterInternalDefinitions.hpp" #include +#include +#include +#include -Transporter::Transporter(NodeId lNodeId, NodeId rNodeId, +Transporter::Transporter(TransporterRegistry &t_reg, + const char *lHostName, + const char *rHostName, + int r_port, + NodeId lNodeId, + NodeId rNodeId, int _byteorder, bool _compression, bool _checksum, bool _signalId) - : localNodeId(lNodeId), remoteNodeId(rNodeId), - m_packer(_signalId, _checksum) + : m_r_port(r_port), localNodeId(lNodeId), remoteNodeId(rNodeId), + isServer(lNodeId < rNodeId), + m_packer(_signalId, _checksum), + m_transporter_registry(t_reg) { + if (rHostName && strlen(rHostName) > 0){ + strncpy(remoteHostName, rHostName, sizeof(remoteHostName)); + Ndb_getInAddr(&remoteHostAddress, rHostName); + } + else + { + if (!isServer) { + ndbout << "Unable to setup transporter. Node " << rNodeId + << " must have hostname. Update configuration." << endl; + exit(-1); + } + remoteHostName[0]= 0; + } + strncpy(localHostName, lHostName, sizeof(localHostName)); + + if (strlen(lHostName) > 0) + Ndb_getInAddr(&localHostAddress, lHostName); + byteOrder = _byteorder; compressionUsed = _compression; checksumUsed = _checksum; signalIdUsed = _signalId; - _threadError = TE_NO_ERROR; + m_connected = false; + m_timeOutMillis = 1000; - _connecting = false; - _disconnecting = false; - _connected = false; - _timeOutMillis = 1000; - theThreadPtr = NULL; - theMutexPtr = NdbMutex_Create(); + if (isServer) + m_socket_client= 0; + else + { + unsigned short tmp_port= 3307+rNodeId; + m_socket_client= new SocketClient(remoteHostName, tmp_port, + new SocketAuthSimple("ndbd passwd")); + } } Transporter::~Transporter(){ - NdbMutex_Destroy(theMutexPtr); - - if(theThreadPtr != 0){ - void * retVal; - NdbThread_WaitFor(theThreadPtr, &retVal); - NdbThread_Destroy(&theThreadPtr); - } + if (m_socket_client) + delete m_socket_client; } -extern "C" -void * -runConnect_C(void * me) -{ - runConnect(me); - NdbThread_Exit(0); - return NULL; +bool +Transporter::connect_server(NDB_SOCKET_TYPE sockfd) { + if(m_connected) + return true; // TODO assert(0); + + bool res = connect_server_impl(sockfd); + if(res){ + m_connected = true; + m_errorCount = 0; + } + + return res; } -void * -runConnect(void * me){ - Transporter * t = (Transporter *) me; +bool +Transporter::connect_client() { + if(m_connected) + return true; + + NDB_SOCKET_TYPE sockfd = m_socket_client->connect(); + + if (sockfd < 0) + return false; - DEBUG("Connect thread to " << t->remoteNodeId << " started"); + // send info about own id + SocketOutputStream s_output(sockfd); + s_output.println("%d", localNodeId); - while(true){ - NdbMutex_Lock(t->theMutexPtr); - if(t->_disconnecting){ - t->_connecting = false; - NdbMutex_Unlock(t->theMutexPtr); - DEBUG("Connect Thread " << t->remoteNodeId << " stop due to disconnect"); - return 0; - } - NdbMutex_Unlock(t->theMutexPtr); - - bool res = t->connectImpl(t->_timeOutMillis); // 1000 ms - DEBUG("Waiting for " << t->remoteNodeId << "..."); - if(res){ - t->_connected = true; - t->_connecting = false; - t->_errorCount = 0; - t->_threadError = TE_NO_ERROR; - DEBUG("Connect Thread " << t->remoteNodeId << " stop due to connect"); - return 0; - } + // get remote id + int nodeId; + SocketInputStream s_input(sockfd); + char buf[256]; + if (s_input.gets(buf, 256) == 0) { + NDB_CLOSE_SOCKET(sockfd); + return false; } + if (sscanf(buf, "%d", &nodeId) != 1) { + NDB_CLOSE_SOCKET(sockfd); + return false; + } + + bool res = connect_client_impl(sockfd); + if(res){ + m_connected = true; + m_errorCount = 0; + } + return res; } void -Transporter::doConnect() { - - NdbMutex_Lock(theMutexPtr); - if(_connecting || _disconnecting || _connected){ - NdbMutex_Unlock(theMutexPtr); - return; - } - - _connecting = true; +Transporter::doDisconnect() { - _threadError = TE_NO_ERROR; + if(!m_connected) + return; //assert(0); TODO will fail - // Start thread - - char buf[16]; - snprintf(buf, sizeof(buf), "ndb_con_%d", remoteNodeId); - - if(theThreadPtr != 0){ - void * retVal; - NdbThread_WaitFor(theThreadPtr, &retVal); - NdbThread_Destroy(&theThreadPtr); - } - - theThreadPtr = NdbThread_Create(runConnect_C, - (void**)this, - 32768, - buf, - NDB_THREAD_PRIO_LOW); - - NdbSleep_MilliSleep(100); // Let thread start - - NdbMutex_Unlock(theMutexPtr); -} - -void -Transporter::doDisconnect() { - - NdbMutex_Lock(theMutexPtr); - _disconnecting = true; - while(_connecting){ - DEBUG("Waiting for connect to finish..."); - - NdbMutex_Unlock(theMutexPtr); - NdbSleep_MilliSleep(500); - NdbMutex_Lock(theMutexPtr); - } - - _connected = false; - disconnectImpl(); - _threadError = TE_NO_ERROR; - _disconnecting = false; - - NdbMutex_Unlock(theMutexPtr); + + m_connected= false; } diff --git a/ndb/src/common/transporter/Transporter.hpp b/ndb/src/common/transporter/Transporter.hpp index 43b26d45899..9a39f8788bc 100644 --- a/ndb/src/common/transporter/Transporter.hpp +++ b/ndb/src/common/transporter/Transporter.hpp @@ -19,6 +19,9 @@ #include +#include + +#include #include #include "TransporterDefinitions.hpp" #include "Packer.hpp" @@ -40,8 +43,9 @@ public: * None blocking * Use isConnected() to check status */ - virtual void doConnect(); - + bool connect_client(); + bool connect_server(NDB_SOCKET_TYPE socket); + /** * Blocking */ @@ -60,14 +64,17 @@ public: */ NodeId getRemoteNodeId() const; - /** - * Set callback object + * Local (own) Node Id */ - void setCallbackObject(void * callback); + NodeId getLocalNodeId() const; protected: - Transporter(NodeId lNodeId, + Transporter(TransporterRegistry &, + const char *lHostName, + const char *rHostName, + int r_port, + NodeId lNodeId, NodeId rNodeId, int byteorder, bool compression, @@ -78,58 +85,59 @@ protected: * Blocking, for max timeOut milli seconds * Returns true if connect succeded */ - virtual bool connectImpl(Uint32 timeOut) = 0; + virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd) = 0; + virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd) = 0; /** * Blocking */ virtual void disconnectImpl() = 0; - const NodeId localNodeId; + /** + * Remote host name/and address + */ + char remoteHostName[256]; + char localHostName[256]; + struct in_addr remoteHostAddress; + struct in_addr localHostAddress; + + const unsigned int m_r_port; + const NodeId remoteNodeId; + const NodeId localNodeId; + const bool isServer; + unsigned createIndex; int byteOrder; bool compressionUsed; bool checksumUsed; bool signalIdUsed; - Packer m_packer; - + Packer m_packer; private: - /** - * Thread and mutex for connect - */ - NdbThread* theThreadPtr; - friend void* runConnect(void * me); + + SocketClient *m_socket_client; protected: - /** - * Error reporting from connect thread(s) - */ - void reportThreadError(NodeId nodeId, - TransporterError errorCode); Uint32 getErrorCount(); - TransporterError getThreadError(); - void resetThreadError(); - TransporterError _threadError; - Uint32 _timeOutMillis; - Uint32 _errorCount; + Uint32 m_errorCount; + Uint32 m_timeOutMillis; -protected: - NdbMutex* theMutexPtr; - bool _connected; // Are we connected - bool _connecting; // Connect thread is running - bool _disconnecting; // We are disconnecting +protected: + bool m_connected; // Are we connected - void * callbackObj; + TransporterRegistry &m_transporter_registry; + void *get_callback_obj() { return m_transporter_registry.callbackObj; }; + void report_disconnect(int err){m_transporter_registry.report_disconnect(remoteNodeId,err);}; + void report_error(enum TransporterError err){reportError(get_callback_obj(),remoteNodeId,err);}; }; inline bool Transporter::isConnected() const { - return _connected; + return m_connected; } inline @@ -138,42 +146,17 @@ Transporter::getRemoteNodeId() const { return remoteNodeId; } -inline -void -Transporter::reportThreadError(NodeId nodeId, TransporterError errorCode) -{ -#if 0 - ndbout_c("Transporter::reportThreadError (NodeId: %d, Error code: %d)", - nodeId, errorCode); -#endif - _threadError = errorCode; - _errorCount++; -} - inline -TransporterError -Transporter::getThreadError(){ - return _threadError; +NodeId +Transporter::getLocalNodeId() const { + return remoteNodeId; } inline Uint32 Transporter::getErrorCount() { - return _errorCount; -} - -inline -void -Transporter::resetThreadError() -{ - _threadError = TE_NO_ERROR; -} - -inline -void -Transporter::setCallbackObject(void * callback) { - callbackObj = callback; + return m_errorCount; } #endif // Define of Transporter_H diff --git a/ndb/src/common/transporter/TransporterRegistry.cpp b/ndb/src/common/transporter/TransporterRegistry.cpp index 3f98eeed89e..bad3b44706f 100644 --- a/ndb/src/common/transporter/TransporterRegistry.cpp +++ b/ndb/src/common/transporter/TransporterRegistry.cpp @@ -16,10 +16,11 @@ #include -#include "TransporterRegistry.hpp" +#include #include "TransporterInternalDefinitions.hpp" #include "Transporter.hpp" +#include #ifdef NDB_TCP_TRANSPORTER #include "TCP_Transporter.hpp" @@ -42,20 +43,67 @@ #include "NdbOut.hpp" #include #include -#define STEPPING 1 +#include +#include + +SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd) +{ + if (m_auth && !m_auth->server_authenticate(sockfd)){ + NDB_CLOSE_SOCKET(sockfd); + return 0; + } + + { + // read node id from client + int nodeId; + SocketInputStream s_input(sockfd); + char buf[256]; + if (s_input.gets(buf, 256) == 0) { + NDB_CLOSE_SOCKET(sockfd); + return 0; + } + if (sscanf(buf, "%d", &nodeId) != 1) { + NDB_CLOSE_SOCKET(sockfd); + return 0; + } + + //check that nodeid is valid and that there is an allocated transporter + if ( nodeId < 0 || nodeId >= m_transporter_registry->maxTransporters) { + NDB_CLOSE_SOCKET(sockfd); + return 0; + } + if (m_transporter_registry->theTransporters[nodeId] == 0) { + NDB_CLOSE_SOCKET(sockfd); + return 0; + } + + //check that the transporter should be connected + if (m_transporter_registry->performStates[nodeId] != TransporterRegistry::CONNECTING) { + NDB_CLOSE_SOCKET(sockfd); + return 0; + } + + Transporter *t= m_transporter_registry->theTransporters[nodeId]; + + // send info about own id (just as response to acnowledge connection) + SocketOutputStream s_output(sockfd); + s_output.println("%d", t->getLocalNodeId()); + + // setup transporter (transporter responsable for closing sockfd) + t->connect_server(sockfd); + } + + return 0; +} TransporterRegistry::TransporterRegistry(void * callback, unsigned _maxTransporters, unsigned sizeOfLongSignalMemory) { + m_transporter_service= 0; nodeIdSpecified = false; maxTransporters = _maxTransporters; sendCounter = 1; - m_ccCount = 0; - m_ccIndex = 0; - m_ccStep = STEPPING; - m_ccReady = false; - m_nTransportersPerformConnect=0; callbackObj=callback; @@ -82,7 +130,7 @@ TransporterRegistry::TransporterRegistry(void * callback, theSHMTransporters[i] = NULL; theOSETransporters[i] = NULL; theTransporters[i] = NULL; - performStates[i] = PerformNothing; + performStates[i] = DISCONNECTED; ioStates[i] = NoHalt; } theOSEReceiver = 0; @@ -154,13 +202,14 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) { return false; - TCP_Transporter * t = new TCP_Transporter(config->sendBufferSize, - config->maxReceiveSize, - config->port, - config->remoteHostName, + TCP_Transporter * t = new TCP_Transporter(*this, + config->sendBufferSize, + config->maxReceiveSize, config->localHostName, - config->remoteNodeId, + config->remoteHostName, + config->port, localNodeId, + config->remoteNodeId, config->byteOrder, config->compression, config->checksum, @@ -172,13 +221,11 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) { return false; } - t->setCallbackObject(callbackObj); - // Put the transporter in the transporter arrays theTCPTransporters[nTCPTransporters] = t; theTransporters[t->getRemoteNodeId()] = t; theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER; - performStates[t->getRemoteNodeId()] = PerformNothing; + performStates[t->getRemoteNodeId()] = DISCONNECTED; nTransporters++; nTCPTransporters++; @@ -228,12 +275,11 @@ TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) { delete t; return false; } - t->setCallbackObject(callbackObj); // Put the transporter in the transporter arrays theOSETransporters[nOSETransporters] = t; theTransporters[t->getRemoteNodeId()] = t; theTransporterTypes[t->getRemoteNodeId()] = tt_OSE_TRANSPORTER; - performStates[t->getRemoteNodeId()] = PerformNothing; + performStates[t->getRemoteNodeId()] = DISCONNECTED; nTransporters++; nOSETransporters++; @@ -279,12 +325,11 @@ TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) { delete t; return false; } - t->setCallbackObject(callbackObj); // Put the transporter in the transporter arrays theSCITransporters[nSCITransporters] = t; theTransporters[t->getRemoteNodeId()] = t; theTransporterTypes[t->getRemoteNodeId()] = tt_SCI_TRANSPORTER; - performStates[t->getRemoteNodeId()] = PerformNothing; + performStates[t->getRemoteNodeId()] = DISCONNECTED; nTransporters++; nSCITransporters++; @@ -321,12 +366,11 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) { delete t; return false; } - t->setCallbackObject(callbackObj); // Put the transporter in the transporter arrays theSHMTransporters[nSHMTransporters] = t; theTransporters[t->getRemoteNodeId()] = t; theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER; - performStates[t->getRemoteNodeId()] = PerformNothing; + performStates[t->getRemoteNodeId()] = DISCONNECTED; nTransporters++; nSHMTransporters++; @@ -781,7 +825,7 @@ TransporterRegistry::performReceive(){ TCP_Transporter *t = theTCPTransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); const NDB_SOCKET_TYPE socket = t->getSocket(); - if(performStates[nodeId] == PerformIO){ + if(is_connected(nodeId)){ if(t->isConnected() && FD_ISSET(socket, &tcpReadset)) { const int receiveSize = t->doReceive(); if(receiveSize > 0){ @@ -804,7 +848,7 @@ TransporterRegistry::performReceive(){ checkJobBuffer(); SCI_Transporter *t = theSCITransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); - if(performStates[nodeId] == PerformIO){ + if(is_connected(nodeId)){ if(t->isConnected() && t->checkConnected()){ Uint32 * readPtr, * eodPtr; t->getReceivePtr(&readPtr, &eodPtr); @@ -819,7 +863,7 @@ TransporterRegistry::performReceive(){ checkJobBuffer(); SHM_Transporter *t = theSHMTransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); - if(performStates[nodeId] == PerformIO){ + if(is_connected(nodeId)){ if(t->isConnected() && t->checkConnected()){ Uint32 * readPtr, * eodPtr; t->getReceivePtr(&readPtr, &eodPtr); @@ -840,7 +884,7 @@ TransporterRegistry::performSend(){ #ifdef NDB_OSE_TRANSPORTER for (int i = 0; i < nOSETransporters; i++){ OSE_Transporter *t = theOSETransporters[i]; - if((performStates[t->getRemoteNodeId()] == PerformIO) && + if((is_connected(t->getRemoteNodeId()) && (t->isConnected())) { t->doSend(); }//if @@ -887,7 +931,7 @@ TransporterRegistry::performSend(){ TCP_Transporter *t = theTCPTransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); const int socket = t->getSocket(); - if(performStates[nodeId] == PerformIO){ + if(is_connected(nodeId)){ if(t->isConnected() && FD_ISSET(socket, &writeset)) { t->doSend(); }//if @@ -901,7 +945,7 @@ TransporterRegistry::performSend(){ if (t && (t->hasDataToSend()) && (t->isConnected()) && - (performStates[t->getRemoteNodeId()] == PerformIO)) { + (is_connected(t->getRemoteNodeId()))) { t->doSend(); }//if }//for @@ -910,7 +954,7 @@ TransporterRegistry::performSend(){ if (t && (t->hasDataToSend()) && (t->isConnected()) && - (performStates[t->getRemoteNodeId()] == PerformIO)) { + (is_connected(t->getRemoteNodeId()))) { t->doSend(); }//if }//for @@ -925,7 +969,7 @@ TransporterRegistry::performSend(){ SCI_Transporter *t = theSCITransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); - if(performStates[nodeId] == PerformIO){ + if(is_connected(nodeId)){ if(t->isConnected() && t->hasDataToSend()) { t->doSend(); } //if @@ -961,56 +1005,6 @@ TransporterRegistry::printState(){ } #endif -PerformState -TransporterRegistry::performState(NodeId nodeId) { - return performStates[nodeId]; -} - -#ifdef DEBUG_TRANSPORTER -const char * -performStateString(PerformState state){ - switch(state){ - case PerformNothing: - return "PerformNothing"; - break; - case PerformIO: - return "PerformIO"; - break; - case PerformConnect: - return "PerformConnect"; - break; - case PerformDisconnect: - return "PerformDisconnect"; - break; - case RemoveTransporter: - return "RemoveTransporter"; - break; - } - return "Unknown"; -} -#endif - -void -TransporterRegistry::setPerformState(NodeId nodeId, PerformState state) { - DEBUG("TransporterRegistry::setPerformState(" - << nodeId << ", " << performStateString(state) << ")"); - - performStates[nodeId] = state; -} - -void -TransporterRegistry::setPerformState(PerformState state) { - int count = 0; - int index = 0; - while(count < nTransporters){ - if(theTransporters[index] != 0){ - setPerformState(theTransporters[index]->getRemoteNodeId(), state); - count ++; - } - index ++; - } -} - IOState TransporterRegistry::ioState(NodeId nodeId) { return ioStates[nodeId]; @@ -1023,8 +1017,198 @@ TransporterRegistry::setIOState(NodeId nodeId, IOState state) { ioStates[nodeId] = state; } +static void * +run_start_clients_C(void * me) +{ + ((TransporterRegistry*) me)->start_clients_thread(); + NdbThread_Exit(0); + return me; +} + +// Run by kernel thread void -TransporterRegistry::startReceiving(){ +TransporterRegistry::do_connect(NodeId node_id) +{ + PerformState &curr_state = performStates[node_id]; + switch(curr_state){ + case DISCONNECTED: + break; + case CONNECTED: + return; + case CONNECTING: + return; + case DISCONNECTING: + break; + } + curr_state= CONNECTING; +} +void +TransporterRegistry::do_disconnect(NodeId node_id) +{ + PerformState &curr_state = performStates[node_id]; + switch(curr_state){ + case DISCONNECTED: + return; + case CONNECTED: + break; + case CONNECTING: + break; + case DISCONNECTING: + return; + } + curr_state= DISCONNECTING; +} + +void +TransporterRegistry::report_connect(NodeId node_id) +{ + performStates[node_id] = CONNECTED; + reportConnect(callbackObj, node_id); +} + +void +TransporterRegistry::report_disconnect(NodeId node_id, int errnum) +{ + performStates[node_id] = DISCONNECTED; + reportDisconnect(callbackObj, node_id, errnum); +} + +void +TransporterRegistry::update_connections() +{ + for (int i= 0, n= 0; n < nTransporters; i++){ + Transporter * t = theTransporters[i]; + if (!t) + continue; + n++; + + const NodeId nodeId = t->getRemoteNodeId(); + switch(performStates[nodeId]){ + case CONNECTED: + case DISCONNECTED: + break; + case CONNECTING: + if(t->isConnected()) + report_connect(nodeId); + break; + case DISCONNECTING: + if(!t->isConnected()) + report_disconnect(nodeId, 0); + break; + } + } +} + +// run as own thread +void +TransporterRegistry::start_clients_thread() +{ + while (m_run_start_clients_thread) { + NdbSleep_MilliSleep(100); + for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){ + Transporter * t = theTransporters[i]; + if (!t) + continue; + n++; + + const NodeId nodeId = t->getRemoteNodeId(); + switch(performStates[nodeId]){ + case CONNECTING: + if(!t->isConnected() && !t->isServer) + t->connect_client(); + break; + case DISCONNECTING: + if(t->isConnected()) + t->doDisconnect(); + break; + default: + break; + } + } + } +} + +bool +TransporterRegistry::start_clients() +{ + m_run_start_clients_thread= true; + m_start_clients_thread= NdbThread_Create(run_start_clients_C, + (void**)this, + 32768, + "ndb_start_clients", + NDB_THREAD_PRIO_LOW); + if (m_start_clients_thread == 0) { + m_run_start_clients_thread= false; + return false; + } + return true; +} + +bool +TransporterRegistry::stop_clients() +{ + if (m_start_clients_thread) { + m_run_start_clients_thread= false; + void* status; + int r= NdbThread_WaitFor(m_start_clients_thread, &status); + NdbThread_Destroy(&m_start_clients_thread); + } + return true; +} + +bool +TransporterRegistry::start_service(SocketServer& socket_server) +{ +#if 0 + for (int i= 0, n= 0; n < nTransporters; i++){ + Transporter * t = theTransporters[i]; + if (!t) + continue; + n++; + if (t->isServer) { + t->m_service = new TransporterService(new SocketAuthSimple("ndbd passwd")); + if(!socket_server.setup(t->m_service, t->m_r_port, 0)) + { + ndbout_c("Unable to setup transporter service port: %d!\n" + "Please check if the port is already used,\n" + "(perhaps a mgmtsrvrserver is already running)", + m_service_port); + delete t->m_service; + return false; + } + } + } +#endif + + m_transporter_service = new TransporterService(new SocketAuthSimple("ndbd passwd")); + + if (nodeIdSpecified != true) { + ndbout_c("TransporterRegistry::startReceiving: localNodeId not specified"); + return false; + } + + m_service_port = 3307 + localNodeId; + //m_interface_name = "ndbd"; + m_interface_name = 0; + + if(!socket_server.setup(m_transporter_service, m_service_port, m_interface_name)) + { + ndbout_c("Unable to setup transporter service port: %d!\n" + "Please check if the port is already used,\n" + "(perhaps a mgmtsrvrserver is already running)", + m_service_port); + delete m_transporter_service; + return false; + } + + m_transporter_service->setTransporterRegistry(this); + + return true; +} + +void +TransporterRegistry::startReceiving() +{ #ifdef NDB_OSE_TRANSPORTER if(theOSEReceiver != NULL){ theOSEReceiver->createPhantom(); @@ -1081,99 +1265,6 @@ TransporterRegistry::stopSending(){ #endif } -/** - * The old implementation did not scale with a large - * number of nodes. (Watchdog killed NDB because - * it took too long time to allocated threads in - * doConnect. - * - * The new implementation only checks the connection - * for a number of transporters (STEPPING), until to - * the point where all transporters has executed - * doConnect once. After that, the behaviour is as - * in the old implemenation, i.e, checking the connection - * for all transporters. - * @todo: instead of STEPPING, maybe we should only - * allow checkConnections to execute for a certain - * time that somehow factors in heartbeat times and - * watchdog times. - * - */ - -void -TransporterRegistry::checkConnections(){ - if(m_ccStep > nTransporters) - m_ccStep = nTransporters; - - while(m_ccCount < m_ccStep){ - if(theTransporters[m_ccIndex] != 0){ - Transporter * t = theTransporters[m_ccIndex]; - const NodeId nodeId = t->getRemoteNodeId(); - if(t->getThreadError() != 0) { - reportError(callbackObj, nodeId, t->getThreadError()); - t->resetThreadError(); - } - - switch(performStates[nodeId]){ - case PerformConnect: - if(!t->isConnected()){ - t->doConnect(); - if(m_nTransportersPerformConnect!=nTransporters) - m_nTransportersPerformConnect++; - - } else { - performStates[nodeId] = PerformIO; - reportConnect(callbackObj, nodeId); - } - break; - case PerformDisconnect: - { - bool wasConnected = t->isConnected(); - t->doDisconnect(); - performStates[nodeId] = PerformNothing; - if(wasConnected){ - reportDisconnect(callbackObj, nodeId,0); - } - } - break; - case RemoveTransporter: - removeTransporter(nodeId); - break; - case PerformNothing: - case PerformIO: - break; - } - m_ccCount ++; - } - m_ccIndex ++; - } - - if(!m_ccReady) { - if(m_ccCount < nTransporters) { - if(nTransporters - m_ccStep < STEPPING) - m_ccStep += nTransporters-m_ccStep; - else - m_ccStep += STEPPING; - - // ndbout_c("count %d step %d ", m_ccCount, m_ccStep); - } - else { - m_ccCount = 0; - m_ccIndex = 0; - m_ccStep = STEPPING; - // ndbout_c("count %d step %d ", m_ccCount, m_ccStep); - } - } - if((nTransporters == m_nTransportersPerformConnect) || m_ccReady) { - m_ccReady = true; - m_ccCount = 0; - m_ccIndex = 0; - m_ccStep = nTransporters; - // ndbout_c("alla count %d step %d ", m_ccCount, m_ccStep); - } - -}//TransporterRegistry::checkConnections() - NdbOut & operator <<(NdbOut & out, SignalHeader & sh){ out << "-- Signal Header --" << endl; out << "theLength: " << sh.theLength << endl; diff --git a/ndb/src/common/util/Makefile.am b/ndb/src/common/util/Makefile.am index 59d9775b8e3..678added01e 100644 --- a/ndb/src/common/util/Makefile.am +++ b/ndb/src/common/util/Makefile.am @@ -3,7 +3,8 @@ noinst_LTLIBRARIES = libgeneral.la libgeneral_la_SOURCES = \ File.cpp md5_hash.cpp Properties.cpp socket_io.cpp \ - SimpleProperties.cpp Parser.cpp InputStream.cpp SocketServer.cpp \ + SimpleProperties.cpp Parser.cpp InputStream.cpp \ + SocketServer.cpp SocketClient.cpp SocketAuthenticator.cpp\ OutputStream.cpp NdbOut.cpp BaseString.cpp Base64.cpp \ NdbSqlUtil.cpp new.cpp \ uucode.c random.c getarg.c version.c \ diff --git a/ndb/src/common/util/SocketAuthenticator.cpp b/ndb/src/common/util/SocketAuthenticator.cpp new file mode 100644 index 00000000000..d0abf89b2b1 --- /dev/null +++ b/ndb/src/common/util/SocketAuthenticator.cpp @@ -0,0 +1,63 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + + +#include + +#include +#include +#include + +SocketAuthSimple::SocketAuthSimple(const char *passwd) { + m_passwd= strdup(passwd); + m_buf= (char*)malloc(strlen(passwd)+1); +} + +SocketAuthSimple::~SocketAuthSimple() +{ + if (m_passwd) + free((void*)m_passwd); + if (m_buf) + free(m_buf); +} + +bool SocketAuthSimple::client_authenticate(int sockfd) +{ + if (!m_passwd) + return false; + + int len = strlen(m_passwd); + int r; + r= send(sockfd, m_passwd, len, 0); + + r= recv(sockfd, m_buf, len, 0); + m_buf[r]= '\0'; + + return true; +} + +bool SocketAuthSimple::server_authenticate(int sockfd) +{ + if (!m_passwd) + return false; + + int len = strlen(m_passwd), r; + r= recv(sockfd, m_buf, len, 0); + m_buf[r]= '\0'; + r= send(sockfd, m_passwd, len, 0); + + return true; +} diff --git a/ndb/src/common/util/SocketClient.cpp b/ndb/src/common/util/SocketClient.cpp new file mode 100644 index 00000000000..b7769633875 --- /dev/null +++ b/ndb/src/common/util/SocketClient.cpp @@ -0,0 +1,90 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + + +#include +#include + +#include +#include + +SocketClient::SocketClient(const char *server_name, unsigned short port, SocketAuthenticator *sa) +{ + m_auth= sa; + m_port= port; + m_server_name= strdup(server_name); + m_sockfd= -1; +} + +SocketClient::~SocketClient() +{ + if (m_server_name) + free(m_server_name); + if (m_sockfd >= 0) + NDB_CLOSE_SOCKET(m_sockfd); + if (m_auth) + delete m_auth; +} + +bool +SocketClient::init() +{ + if (m_sockfd >= 0) + NDB_CLOSE_SOCKET(m_sockfd); + + memset(&m_servaddr, 0, sizeof(m_servaddr)); + m_servaddr.sin_family = AF_INET; + m_servaddr.sin_port = htons(m_port); + // Convert ip address presentation format to numeric format + if (Ndb_getInAddr(&m_servaddr.sin_addr, m_server_name)) + return false; + + m_sockfd= socket(AF_INET, SOCK_STREAM, 0); + if (m_sockfd == NDB_INVALID_SOCKET) { + return false; + } + + return true; +} + +NDB_SOCKET_TYPE +SocketClient::connect() +{ + if (m_sockfd < 0) + { + if (!init()) { + ndbout << "SocketClient::connect() failed " << m_server_name << " " << m_port << endl; + return -1; + } + } + + const int r = ::connect(m_sockfd, (struct sockaddr*) &m_servaddr, sizeof(m_servaddr)); + if (r == -1) + return -1; + + if (m_auth) + if (!m_auth->client_authenticate(m_sockfd)) + { + NDB_CLOSE_SOCKET(m_sockfd); + m_sockfd= -1; + return -1; + } + + NDB_SOCKET_TYPE sockfd= m_sockfd; + m_sockfd= -1; + + return sockfd; +} diff --git a/ndb/src/common/util/SocketServer.cpp b/ndb/src/common/util/SocketServer.cpp index a0ec0aaa676..67cbf8aba4a 100644 --- a/ndb/src/common/util/SocketServer.cpp +++ b/ndb/src/common/util/SocketServer.cpp @@ -17,7 +17,7 @@ #include -#include "SocketServer.hpp" +#include #include #include diff --git a/ndb/src/kernel/Makefile.am b/ndb/src/kernel/Makefile.am index b2aa5f2e074..60284f6a369 100644 --- a/ndb/src/kernel/Makefile.am +++ b/ndb/src/kernel/Makefile.am @@ -4,7 +4,7 @@ include $(top_srcdir)/ndb/config/common.mk.am ndbbin_PROGRAMS = ndbd -ndbd_SOURCES = Main.cpp SimBlockList.cpp +ndbd_SOURCES = main.cpp SimBlockList.cpp include $(top_srcdir)/ndb/config/type_kernel.mk.am diff --git a/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp b/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp index 694007c8508..fd7d129c790 100644 --- a/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp +++ b/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp @@ -360,7 +360,7 @@ void Cmvmi::execCLOSE_COMREQ(Signal* signal) sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB); globalTransporterRegistry.setIOState(i, HaltIO); - globalTransporterRegistry.setPerformState(i, PerformDisconnect); + globalTransporterRegistry.do_disconnect(i); /** * Cancel possible event subscription @@ -388,7 +388,7 @@ void Cmvmi::execOPEN_COMREQ(Signal* signal) const Uint32 len = signal->getLength(); if(len == 2){ - globalTransporterRegistry.setPerformState(tStartingNode, PerformConnect); + globalTransporterRegistry.do_connect(tStartingNode); globalTransporterRegistry.setIOState(tStartingNode, HaltIO); //----------------------------------------------------- @@ -403,7 +403,7 @@ void Cmvmi::execOPEN_COMREQ(Signal* signal) jam(); if (i != getOwnNodeId() && getNodeInfo(i).m_type == tData2){ jam(); - globalTransporterRegistry.setPerformState(i, PerformConnect); + globalTransporterRegistry.do_connect(i); globalTransporterRegistry.setIOState(i, HaltIO); signal->theData[0] = EventReport::CommunicationOpened; @@ -454,34 +454,21 @@ void Cmvmi::execDISCONNECT_REP(Signal *signal) const NodeInfo::NodeType type = getNodeInfo(hostId).getType(); ndbrequire(type != NodeInfo::INVALID); - if (globalTransporterRegistry.performState(hostId) != PerformDisconnect) { + if(type == NodeInfo::DB || globalData.theStartLevel == NodeState::SL_STARTED){ jam(); - - // ------------------------------------------------------------------- - // We do not report the disconnection when disconnection is already ongoing. - // This reporting should be looked into but this secures that we avoid - // crashes due to too quick re-reporting of disconnection. - // ------------------------------------------------------------------- - if(type == NodeInfo::DB || globalData.theStartLevel == NodeState::SL_STARTED){ - jam(); - DisconnectRep * const rep = (DisconnectRep *)&signal->theData[0]; - rep->nodeId = hostId; - rep->err = errNo; - sendSignal(QMGR_REF, GSN_DISCONNECT_REP, signal, - DisconnectRep::SignalLength, JBA); - globalTransporterRegistry.setPerformState(hostId, PerformDisconnect); - } else if(globalData.theStartLevel == NodeState::SL_CMVMI || - globalData.theStartLevel == NodeState::SL_STARTING) { - /** - * Someone disconnected during cmvmi period - */ - if(type == NodeInfo::MGM){ - jam(); - globalTransporterRegistry.setPerformState(hostId, PerformConnect); - } else { - globalTransporterRegistry.setPerformState(hostId, PerformDisconnect); - } - } + DisconnectRep * const rep = (DisconnectRep *)&signal->theData[0]; + rep->nodeId = hostId; + rep->err = errNo; + sendSignal(QMGR_REF, GSN_DISCONNECT_REP, signal, + DisconnectRep::SignalLength, JBA); + } else if((globalData.theStartLevel == NodeState::SL_CMVMI || + globalData.theStartLevel == NodeState::SL_STARTING) + && type == NodeInfo::MGM) { + /** + * Someone disconnected during cmvmi period + */ + jam(); + globalTransporterRegistry.do_connect(hostId); } signal->theData[0] = EventReport::Disconnected; @@ -520,7 +507,8 @@ void Cmvmi::execCONNECT_REP(Signal *signal){ /** * Dont allow api nodes to connect */ - globalTransporterRegistry.setPerformState(hostId, PerformDisconnect); + abort(); + globalTransporterRegistry.do_disconnect(hostId); } } @@ -754,8 +742,8 @@ Cmvmi::execSTART_ORD(Signal* signal) { */ for(unsigned int i = 1; i < MAX_NODES; i++ ){ if (getNodeInfo(i).m_type == NodeInfo::MGM){ - if(globalTransporterRegistry.performState(i) != PerformIO){ - globalTransporterRegistry.setPerformState(i, PerformConnect); + if(!globalTransporterRegistry.is_connected(i)){ + globalTransporterRegistry.do_connect(i); globalTransporterRegistry.setIOState(i, NoHalt); } } @@ -781,7 +769,7 @@ Cmvmi::execSTART_ORD(Signal* signal) { // without any connected nodes. for(unsigned int i = 1; i < MAX_NODES; i++ ){ if (i != getOwnNodeId() && getNodeInfo(i).m_type != NodeInfo::MGM){ - globalTransporterRegistry.setPerformState(i, PerformDisconnect); + globalTransporterRegistry.do_disconnect(i); globalTransporterRegistry.setIOState(i, HaltIO); } } @@ -1060,29 +1048,10 @@ Cmvmi::execDUMP_STATE_ORD(Signal* signal) if(nodeTypeStr == 0) continue; - const char* actionStr = ""; - switch (globalTransporterRegistry.performState(i)){ - case PerformNothing: - actionStr = "does nothing"; - break; - case PerformIO: - actionStr = "is connected"; - break; - case PerformConnect: - actionStr = "is trying to connect"; - break; - case PerformDisconnect: - actionStr = "is trying to disconnect"; - break; - case RemoveTransporter: - actionStr = "will be removed"; - break; - } - infoEvent("Connection to %d (%s) %s", i, nodeTypeStr, - actionStr); + globalTransporterRegistry.getPerformStateString(i)); } } diff --git a/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp b/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp index f2d2edb615d..46f1acb9761 100644 --- a/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp +++ b/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp @@ -1704,6 +1704,7 @@ void Qmgr::sendApiFailReq(Signal* signal, Uint16 failedNodeNo) sendSignal(DBTC_REF, GSN_API_FAILREQ, signal, 2, JBA); sendSignal(DBDICT_REF, GSN_API_FAILREQ, signal, 2, JBA); sendSignal(SUMA_REF, GSN_API_FAILREQ, signal, 2, JBA); + /** * GREP also need the information that an API node * (actually a REP node) has failed. @@ -1978,8 +1979,11 @@ void Qmgr::execAPI_REGREQ(Signal* signal) apiRegConf->nodeState.dynamicId = -dynamicId; } } + c_connectedNodes.copyto(NdbNodeBitmask::Size, + apiRegConf->connected_nodes.data); + sendSignal(ref, GSN_API_REGCONF, signal, ApiRegConf::SignalLength, JBB); - + if ((getNodeState().startLevel == NodeState::SL_STARTED || getNodeState().getSingleUserMode()) && apiNodePtr.p->phase == ZAPI_INACTIVE) { diff --git a/ndb/src/kernel/Main.cpp b/ndb/src/kernel/main.cpp similarity index 94% rename from ndb/src/kernel/Main.cpp rename to ndb/src/kernel/main.cpp index 7bd4e75ca18..d2137a63c4d 100644 --- a/ndb/src/kernel/Main.cpp +++ b/ndb/src/kernel/main.cpp @@ -20,7 +20,7 @@ #include "Configuration.hpp" #include -#include "SimBlockList.hpp" +#include "vm/SimBlockList.hpp" #include "ThreadConfig.hpp" #include #include @@ -171,13 +171,29 @@ NDB_MAIN(ndb_kernel){ NDB_ASSERT(0, "Illegal state globalData.theRestartFlag"); } + SocketServer socket_server; + globalTransporterRegistry.startSending(); globalTransporterRegistry.startReceiving(); + if (!globalTransporterRegistry.start_service(socket_server)) + NDB_ASSERT(0, "globalTransporterRegistry.start_service() failed"); + + if (!globalTransporterRegistry.start_clients()) + NDB_ASSERT(0, "globalTransporterRegistry.start_clients() failed"); + globalEmulatorData.theWatchDog->doStart(); + socket_server.startServer(); + globalEmulatorData.theThreadConfig->ipControlLoop(); NdbShutdown(NST_Normal); + + socket_server.stopServer(); + socket_server.stopSessions(); + + globalTransporterRegistry.stop_clients(); + return NRT_Default; } diff --git a/ndb/src/kernel/vm/ThreadConfig.cpp b/ndb/src/kernel/vm/ThreadConfig.cpp index d18b20a5bb5..4844bb9a477 100644 --- a/ndb/src/kernel/vm/ThreadConfig.cpp +++ b/ndb/src/kernel/vm/ThreadConfig.cpp @@ -147,8 +147,8 @@ void ThreadConfig::ipControlLoop() // plus checking for any received messages. //-------------------------------------------------------------------- if (i++ >= 20) { + globalTransporterRegistry.update_connections(); globalData.incrementWatchDogCounter(5); - globalTransporterRegistry.checkConnections(); i = 0; }//if diff --git a/ndb/src/mgmapi/mgmapi.cpp b/ndb/src/mgmapi/mgmapi.cpp index bb4b6be8221..21a2ab074e7 100644 --- a/ndb/src/mgmapi/mgmapi.cpp +++ b/ndb/src/mgmapi/mgmapi.cpp @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -318,8 +319,8 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow *command_reply, /** * Print some info about why the parser returns NULL */ -// ndbout << " status=" << ctx.m_status << ", curr=" -// << ctx.m_currentToken << endl; + //ndbout << " status=" << ctx.m_status << ", curr=" + //<< ctx.m_currentToken << endl; } #ifdef MGMAPI_LOG else { @@ -362,30 +363,11 @@ ndb_mgm_connect(NdbMgmHandle handle, const char * mgmsrv) /** * Do connect */ - const NDB_SOCKET_TYPE sockfd = socket(AF_INET, SOCK_STREAM, 0); - if (sockfd == NDB_INVALID_SOCKET) { - SET_ERROR(handle, NDB_MGM_ILLEGAL_SOCKET, ""); - return -1; - } - - struct sockaddr_in servaddr; - memset(&servaddr, 0, sizeof(servaddr)); - servaddr.sin_family = AF_INET; - servaddr.sin_port = htons(handle->port); - // Convert ip address presentation format to numeric format - const int res1 = Ndb_getInAddr(&servaddr.sin_addr, handle->hostname); - if (res1 != 0) { - DEBUG("Ndb_getInAddr(...) == -1"); - setError(handle, EINVAL, __LINE__, "Invalid hostname/address"); - return -1; - } - - const int res2 = connect(sockfd, (struct sockaddr*) &servaddr, - sizeof(servaddr)); - if (res2 == -1) { - NDB_CLOSE_SOCKET(sockfd); - setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__, "Unable to connect to %s", - mgmsrv); + SocketClient s(handle->hostname, handle->port); + const NDB_SOCKET_TYPE sockfd = s.connect(); + if (sockfd < 0) { + setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__, + "Unable to connect to %s", mgmsrv); return -1; } @@ -1523,6 +1505,55 @@ ndb_mgm_get_configuration(NdbMgmHandle handle, unsigned int version) { return 0; } +extern "C" +int +ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, unsigned *pnodeid, int nodetype) +{ + + CHECK_HANDLE(handle, 0); + CHECK_CONNECTED(handle, 0); + + Properties args; + args.put("version", version); + args.put("nodetype", nodetype); + args.put("nodeid", *pnodeid); + args.put("user", "mysqld"); + args.put("password", "mysqld"); + args.put("public key", "a public key"); + + const ParserRow reply[]= { + MGM_CMD("get nodeid reply", NULL, ""), + MGM_ARG("nodeid", Int, Optional, "Error message"), + MGM_ARG("result", String, Mandatory, "Error message"), + MGM_END() + }; + + const Properties *prop; + prop= ndb_mgm_call(handle, reply, "get nodeid", &args); + + if(prop == NULL) { + SET_ERROR(handle, EIO, "Unable to alloc nodeid"); + return -1; + } + + int res= -1; + do { + const char * buf; + if(!prop->get("result", &buf) || strcmp(buf, "Ok") != 0){ + ndbout_c("ERROR Message: %s\n", buf); + break; + } + if(!prop->get("nodeid", pnodeid) != 0){ + ndbout_c("ERROR Message: \n"); + break; + } + res= 0; + }while(0); + + delete prop; + return res; +} + /***************************************************************************** * Global Replication ******************************************************************************/ diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp index 713433cb8e9..77ff52dc4bb 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.cpp +++ b/ndb/src/mgmsrv/MgmtSrvr.cpp @@ -43,7 +43,7 @@ #include #include -#include "SocketServer.hpp" +#include #include "NodeLogLevel.hpp" #include @@ -390,6 +390,95 @@ MgmtSrvr::getNodeCount(enum ndb_mgm_node_type type) const return count; } +int +MgmtSrvr::getPort() const { + const Properties *mgmProps; + + ndb_mgm_configuration_iterator * iter = + ndb_mgm_create_configuration_iterator(_config->m_configValues, + CFG_SECTION_NODE); + if(iter == 0) + return 0; + + if(ndb_mgm_find(iter, CFG_NODE_ID, getOwnNodeId()) != 0){ + ndbout << "Could not retrieve configuration for Node " + << getOwnNodeId() << " in config file." << endl + << "Have you set correct NodeId for this node?" << endl; + ndb_mgm_destroy_iterator(iter); + return 0; + } + + unsigned type; + if(ndb_mgm_get_int_parameter(iter, CFG_TYPE_OF_SECTION, &type) != 0 || + type != NODE_TYPE_MGM){ + ndbout << "Local node id " << getOwnNodeId() + << " is not defined as management server" << endl + << "Have you set correct NodeId for this node?" << endl; + return 0; + } + + Uint32 port = 0; + if(ndb_mgm_get_int_parameter(iter, CFG_MGM_PORT, &port) != 0){ + ndbout << "Could not find PortNumber in the configuration file." << endl; + return 0; + } + + /***************** + * Set Stat Port * + *****************/ +#if 0 + if (!mgmProps->get("PortNumberStats", &tmp)){ + ndbout << "Could not find PortNumberStats in the configuration file." + << endl; + return false; + } + glob.port_stats = tmp; +#endif + +#if 0 + const char * host; + if(ndb_mgm_get_string_parameter(iter, mgmProps->get("ExecuteOnComputer", host)){ + ndbout << "Failed to find \"ExecuteOnComputer\" for my node" << endl; + ndbout << "Unable to verify own hostname" << endl; + return false; + } + + const char * hostname; + { + const Properties * p; + char buf[255]; + snprintf(buf, sizeof(buf), "Computer_%s", host.c_str()); + if(!glob.cluster_config->get(buf, &p)){ + ndbout << "Failed to find computer " << host << " in config" << endl; + ndbout << "Unable to verify own hostname" << endl; + return false; + } + if(!p->get("HostName", &hostname)){ + ndbout << "Failed to find \"HostName\" for computer " << host + << " in config" << endl; + ndbout << "Unable to verify own hostname" << endl; + return false; + } + if(NdbHost_GetHostName(buf) != 0){ + ndbout << "Unable to get own hostname" << endl; + ndbout << "Unable to verify own hostname" << endl; + return false; + } + } + + const char * ip_address; + if(mgmProps->get("IpAddress", &ip_address)){ + glob.use_specific_ip = true; + glob.interface_name = strdup(ip_address); + return true; + } + + glob.interface_name = strdup(hostname); +#endif + + return port; +} + int MgmtSrvr::getStatPort() const { #if 0 @@ -419,7 +508,6 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId, theWaitState(WAIT_SUBSCRIBE_CONF), theConfCount(0) { - _ownNodeId = nodeId; _config = NULL; _isStatPortActive = false; _isClusterLogStatActive = false; @@ -429,6 +517,8 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId, _logLevelThreadSleep = 500; _startedNodeId = 0; + theFacade = 0; + m_newConfig = NULL; m_configFilename = configFilename; setCallback(CmdBackupCallback); @@ -486,6 +576,15 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId, _clusterLogLevelList = new NodeLogLevelList(); _props = NULL; + + _ownNodeId= 0; + NodeId tmp= nodeId > 0 ? nodeId-1 : 0; + if (getNextFreeNodeId(&tmp, NDB_MGM_NODE_TYPE_MGM)){ + _ownNodeId= tmp; + if (nodeId != 0 && nodeId != tmp) + _ownNodeId= 0; // did not get nodeid requested + } else + NDB_ASSERT(0, "Unable to retrieve own node id"); } @@ -510,8 +609,7 @@ MgmtSrvr::start() return false; } theFacade = TransporterFacade::start_instance - (_ownNodeId, - (ndb_mgm_configuration*)_config->m_configValues); + (_ownNodeId,(ndb_mgm_configuration*)_config->m_configValues); if(theFacade == 0) { DEBUG("MgmtSrvr.cpp: theFacade is NULL."); @@ -1896,6 +1994,7 @@ MgmtSrvr::handleReceivedSignal(NdbApiSignal* signal) int returnCode; int gsn = signal->readSignalNumber(); + switch (gsn) { case GSN_API_VERSION_CONF: { if (theWaitState == WAIT_VERSION) { @@ -2187,6 +2286,36 @@ MgmtSrvr::getNodeType(NodeId nodeId) const return nodeTypes[nodeId]; } +bool +MgmtSrvr::getNextFreeNodeId(NodeId * nodeId, + enum ndb_mgm_node_type type) const +{ +#if 0 + ndbout << "MgmtSrvr::getNextFreeNodeId type=" << type + << " *nodeid=" << *nodeId << endl; +#endif + + NodeId tmp= *nodeId; + if (theFacade && theFacade->theClusterMgr) { + while(getNextNodeId(&tmp, type)){ + if (theFacade->theClusterMgr->m_connected_nodes.get(tmp)) + continue; +#if 0 + ndbout << "MgmtSrvr::getNextFreeNodeId ret=" << tmp << endl; +#endif + *nodeId= tmp; + return true; + } + } else if (getNextNodeId(&tmp, type)){ +#if 0 + ndbout << "MgmtSrvr::getNextFreeNodeId (theFacade==0) ret=" << tmp << endl; +#endif + *nodeId= tmp; + return true; + } + return false; +} + bool MgmtSrvr::getNextNodeId(NodeId * nodeId, enum ndb_mgm_node_type type) const { diff --git a/ndb/src/mgmsrv/MgmtSrvr.hpp b/ndb/src/mgmsrv/MgmtSrvr.hpp index 1d394a14857..5760a55a676 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.hpp +++ b/ndb/src/mgmsrv/MgmtSrvr.hpp @@ -150,10 +150,12 @@ public: enum LogMode {In, Out, InOut, Off}; /* Constructor */ + MgmtSrvr(NodeId nodeId, /* Local nodeid */ const BaseString &config_filename, /* Where to save config */ const BaseString &ndb_config_filename, /* Ndb.cfg filename */ Config * config); + NodeId getOwnNodeId() const {return _ownNodeId;}; /** * Read (initial) config file, create TransporterFacade, @@ -448,6 +450,7 @@ public: * @return false if none found */ bool getNextNodeId(NodeId * _nodeId, enum ndb_mgm_node_type type) const ; + bool getNextFreeNodeId(NodeId * _nodeId, enum ndb_mgm_node_type type) const ; /** * @@ -492,6 +495,11 @@ public: * @return statistic port number. */ int getStatPort() const; + /** + * Returns the port number. + * @return port number. + */ + int getPort() const; //************************************************************************** diff --git a/ndb/src/mgmsrv/Services.cpp b/ndb/src/mgmsrv/Services.cpp index 739eef90c52..2049ca54864 100644 --- a/ndb/src/mgmsrv/Services.cpp +++ b/ndb/src/mgmsrv/Services.cpp @@ -121,6 +121,14 @@ ParserRow commands[] = { MGM_ARG("version", Int, Mandatory, "Configuration version number"), MGM_ARG("node", Int, Optional, "Node ID"), + MGM_CMD("get nodeid", &MgmApiSession::get_nodeid, ""), + MGM_ARG("version", Int, Mandatory, "Configuration version number"), + MGM_ARG("nodetype", Int, Mandatory, "Node type"), + MGM_ARG("nodeid", Int, Optional, "Node ID"), + MGM_ARG("user", String, Mandatory, "Password"), + MGM_ARG("password", String, Mandatory, "Password"), + MGM_ARG("public key", String, Mandatory, "Public key"), + MGM_CMD("get version", &MgmApiSession::getVersion, ""), MGM_CMD("get status", &MgmApiSession::getStatus, ""), @@ -332,6 +340,82 @@ backward(const char * base, const Properties* reply){ return ret; } +void +MgmApiSession::get_nodeid(Parser_t::Context &, + const class Properties &args) +{ + const char *cmd= "get nodeid reply"; + Uint32 version, nodeid= 0, nodetype= 0xff; + const char * user; + const char * password; + const char * public_key; + + args.get("version", &version); + args.get("nodetype", &nodetype); + args.get("nodeid", &nodeid); + args.get("user", &user); + args.get("password", &password); + args.get("public key", &public_key); + + NodeId free_id= 0; + NodeId tmp= nodeid > 0 ? nodeid-1 : 0; + bool compatible; + switch (nodetype) { + case NODE_TYPE_MGM: + compatible = ndbCompatible_mgmt_api(NDB_VERSION, version); + if (m_mgmsrv.getNextFreeNodeId(&tmp, NDB_MGM_NODE_TYPE_MGM)) + free_id= tmp; + break; + case NODE_TYPE_API: + compatible = ndbCompatible_mgmt_api(NDB_VERSION, version); + if (m_mgmsrv.getNextFreeNodeId(&tmp, NDB_MGM_NODE_TYPE_API)) + free_id= tmp; + break; + case NODE_TYPE_DB: + compatible = ndbCompatible_mgmt_ndb(NDB_VERSION, version); + if (m_mgmsrv.getNextFreeNodeId(&tmp, NDB_MGM_NODE_TYPE_NDB)) + free_id= tmp; + break; + default: + m_output->println(cmd); + m_output->println("result: unknown nodetype %d", nodetype); + m_output->println(""); + return; + } + + if (nodeid != 0 && free_id != nodeid){ + m_output->println(cmd); + m_output->println("result: no free nodeid %d for nodetype %d", + nodeid, nodetype); + m_output->println(""); + return; + } + + if (free_id == 0){ + m_output->println(cmd); + m_output->println("result: no free nodeid for nodetype %d", nodetype); + m_output->println(""); + return; + } + +#if 0 + if (!compatible){ + m_output->println(cmd); + m_output->println("result: incompatible version mgmt 0x%x and node 0x%x", + NDB_VERSION, version); + m_output->println(""); + return; + } +#endif + + m_output->println(cmd); + m_output->println("nodeid: %u", free_id); + m_output->println("result: Ok"); + m_output->println(""); + + return; +} + void MgmApiSession::getConfig_common(Parser_t::Context &, const class Properties &args, @@ -432,7 +516,6 @@ MgmApiSession::getConfig_common(Parser_t::Context &, m_output->println("Content-Transfer-Encoding: base64"); m_output->println(""); m_output->println(str.c_str()); - m_output->println(""); return; } diff --git a/ndb/src/mgmsrv/Services.hpp b/ndb/src/mgmsrv/Services.hpp index 3690f1a5a93..545d2bf846f 100644 --- a/ndb/src/mgmsrv/Services.hpp +++ b/ndb/src/mgmsrv/Services.hpp @@ -51,6 +51,7 @@ public: void getConfig_old(Parser_t::Context &ctx); #endif /* MGM_GET_CONFIG_BACKWARDS_COMPAT */ + void get_nodeid(Parser_t::Context &ctx, const class Properties &args); void getVersion(Parser_t::Context &ctx, const class Properties &args); void getStatus(Parser_t::Context &ctx, const class Properties &args); void getInfoClusterLog(Parser_t::Context &ctx, const class Properties &args); diff --git a/ndb/src/mgmsrv/main.cpp b/ndb/src/mgmsrv/main.cpp index d9eb0001c44..db977cc492f 100644 --- a/ndb/src/mgmsrv/main.cpp +++ b/ndb/src/mgmsrv/main.cpp @@ -20,7 +20,7 @@ #include "MgmtSrvr.hpp" #include "EventLogger.hpp" -#include "Config.hpp" +#include #include "InitConfigFileParser.hpp" #include #include "Services.hpp" @@ -88,7 +88,6 @@ static MgmGlobals glob; ******************************************************************************/ static bool readLocalConfig(); static bool readGlobalConfig(); -static bool setPortNo(); /** * Global variables @@ -146,7 +145,9 @@ NDB_MAIN(mgmsrv){ exit(1); } glob.socketServer = new SocketServer(); + MgmApiService * mapi = new MgmApiService(); + MgmStatService * mstat = new MgmStatService(); /**************************** @@ -157,9 +158,26 @@ NDB_MAIN(mgmsrv){ if (!readGlobalConfig()) goto error_end; - if (!setPortNo()) + glob.mgmObject = new MgmtSrvr(glob.localNodeId, + BaseString(glob.config_filename), + BaseString(glob.local_config_filename == 0 ? + "" : glob.local_config_filename), + glob.cluster_config); + + glob.cluster_config = 0; + glob.localNodeId= glob.mgmObject->getOwnNodeId(); + + if (glob.localNodeId == 0) goto error_end; - + + glob.port= glob.mgmObject->getPort(); + + if (glob.port == 0) + goto error_end; + + glob.interface_name = 0; + glob.use_specific_ip = false; + if(!glob.use_specific_ip){ if(!glob.socketServer->tryBind(glob.port, glob.interface_name)){ ndbout_c("Unable to setup port: %s:%d!\n" @@ -190,15 +208,8 @@ NDB_MAIN(mgmsrv){ goto error_end; } - glob.mgmObject = new MgmtSrvr(glob.localNodeId, - BaseString(glob.config_filename), - BaseString(glob.local_config_filename == 0 ? "" : glob.local_config_filename), - glob.cluster_config); - - glob.cluster_config = 0; - if(!glob.mgmObject->check_start()){ - ndbout_c("Unable to start management server."); + ndbout_c("Unable to check start management server."); ndbout_c("Probably caused by illegal initial configuration file."); goto error_end; } @@ -343,108 +354,3 @@ readGlobalConfig() { } return true; } - -/** - * @fn setPortNo - * @param glob : Global variables - * @return true if success, false otherwise. - * - * Port number: - * 2. Use port number from global configuration file - * 4. Use port number for statistics from global configuration file - */ -static bool -setPortNo(){ - const Properties *mgmProps; - - ndb_mgm_configuration_iterator * iter = - ndb_mgm_create_configuration_iterator(glob.cluster_config->m_configValues, - CFG_SECTION_NODE); - if(iter == 0) - return false; - - if(ndb_mgm_find(iter, CFG_NODE_ID, glob.localNodeId) != 0){ - ndbout << "Could not retrieve configuration for Node " - << glob.localNodeId << " in config file." << endl - << "Have you set correct NodeId for this node?" << endl; - ndb_mgm_destroy_iterator(iter); - return false; - } - - unsigned type; - if(ndb_mgm_get_int_parameter(iter, CFG_TYPE_OF_SECTION, &type) != 0 || - type != NODE_TYPE_MGM){ - ndbout << "Local node id " << glob.localNodeId - << " is not defined as management server" << endl - << "Have you set correct NodeId for this node?" << endl; - return false; - } - - /************ - * Set Port * - ************/ - Uint32 tmp = 0; - if(ndb_mgm_get_int_parameter(iter, CFG_MGM_PORT, &tmp) != 0){ - ndbout << "Could not find PortNumber in the configuration file." << endl; - return false; - } - glob.port = tmp; - - /***************** - * Set Stat Port * - *****************/ -#if 0 - if (!mgmProps->get("PortNumberStats", &tmp)){ - ndbout << "Could not find PortNumberStats in the configuration file." - << endl; - return false; - } - glob.port_stats = tmp; -#endif - -#if 0 - const char * host; - if(ndb_mgm_get_string_parameter(iter, mgmProps->get("ExecuteOnComputer", host)){ - ndbout << "Failed to find \"ExecuteOnComputer\" for my node" << endl; - ndbout << "Unable to verify own hostname" << endl; - return false; - } - - const char * hostname; - { - const Properties * p; - char buf[255]; - snprintf(buf, sizeof(buf), "Computer_%s", host.c_str()); - if(!glob.cluster_config->get(buf, &p)){ - ndbout << "Failed to find computer " << host << " in config" << endl; - ndbout << "Unable to verify own hostname" << endl; - return false; - } - if(!p->get("HostName", &hostname)){ - ndbout << "Failed to find \"HostName\" for computer " << host - << " in config" << endl; - ndbout << "Unable to verify own hostname" << endl; - return false; - } - if(NdbHost_GetHostName(buf) != 0){ - ndbout << "Unable to get own hostname" << endl; - ndbout << "Unable to verify own hostname" << endl; - return false; - } - } - - const char * ip_address; - if(mgmProps->get("IpAddress", &ip_address)){ - glob.use_specific_ip = true; - glob.interface_name = strdup(ip_address); - return true; - } - - glob.interface_name = strdup(hostname); -#endif - - glob.interface_name = 0; - glob.use_specific_ip = false; - - return true; -} diff --git a/ndb/src/ndbapi/ClusterMgr.cpp b/ndb/src/ndbapi/ClusterMgr.cpp index b26d550fe31..b5428cb46b0 100644 --- a/ndb/src/ndbapi/ClusterMgr.cpp +++ b/ndb/src/ndbapi/ClusterMgr.cpp @@ -295,11 +295,14 @@ ClusterMgr::execAPI_REGREQ(const Uint32 * theData){ } int global_mgmt_server_check = 0; // set to one in mgmtsrvr main; + void ClusterMgr::execAPI_REGCONF(const Uint32 * theData){ const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0]; const NodeId nodeId = refToNode(apiRegConf->qmgrRef); + m_connected_nodes.assign(apiRegConf->connected_nodes); + #if 0 ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId); #endif @@ -309,6 +312,7 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){ Node & node = theNodes[nodeId]; assert(node.defined == true); assert(node.connected == true); + if(node.m_info.m_version != apiRegConf->version){ node.m_info.m_version = apiRegConf->version; if (global_mgmt_server_check == 1) @@ -422,6 +426,8 @@ ClusterMgr::reportDisconnected(NodeId nodeId){ void ClusterMgr::reportNodeFailed(NodeId nodeId){ + m_connected_nodes.clear(nodeId); + Node & theNode = theNodes[nodeId]; theNode.m_alive = false; diff --git a/ndb/src/ndbapi/ClusterMgr.hpp b/ndb/src/ndbapi/ClusterMgr.hpp index cc3cf66c8aa..a516df3e27f 100644 --- a/ndb/src/ndbapi/ClusterMgr.hpp +++ b/ndb/src/ndbapi/ClusterMgr.hpp @@ -78,6 +78,7 @@ public: const Node & getNodeInfo(NodeId) const; Uint32 getNoOfConnectedNodes() const; + NodeBitmask m_connected_nodes; private: Uint32 noOfConnectedNodes; diff --git a/ndb/src/ndbapi/TransporterFacade.cpp b/ndb/src/ndbapi/TransporterFacade.cpp index e725144a8f8..dea7b1e4bec 100644 --- a/ndb/src/ndbapi/TransporterFacade.cpp +++ b/ndb/src/ndbapi/TransporterFacade.cpp @@ -39,6 +39,7 @@ #endif //#define REPORT_TRANSPORTER +//#define API_TRACE; #if defined DEBUG_TRANSPORTER #define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl; @@ -440,7 +441,17 @@ runSendRequest_C(void * me) void TransporterFacade::threadMainSend(void) { + SocketServer socket_server; + theTransporterRegistry->startSending(); + if (!theTransporterRegistry->start_service(socket_server)) + NDB_ASSERT(0, "Unable to start theTransporterRegistry->start_service"); + + if (!theTransporterRegistry->start_clients()) + NDB_ASSERT(0, "Unable to start theTransporterRegistry->start_clients"); + + socket_server.startServer(); + while(!theStopReceive) { NdbSleep_MilliSleep(10); NdbMutex_Lock(theMutexPtr); @@ -451,6 +462,11 @@ void TransporterFacade::threadMainSend(void) NdbMutex_Unlock(theMutexPtr); } theTransporterRegistry->stopSending(); + + socket_server.stopServer(); + socket_server.stopSessions(); + + theTransporterRegistry->stop_clients(); } extern "C" @@ -466,7 +482,7 @@ void TransporterFacade::threadMainReceive(void) { theTransporterRegistry->startReceiving(); NdbMutex_Lock(theMutexPtr); - theTransporterRegistry->checkConnections(); + theTransporterRegistry->update_connections(); NdbMutex_Unlock(theMutexPtr); while(!theStopReceive) { for(int i = 0; i<10; i++){ @@ -478,7 +494,7 @@ void TransporterFacade::threadMainReceive(void) } } NdbMutex_Lock(theMutexPtr); - theTransporterRegistry->checkConnections(); + theTransporterRegistry->update_connections(); NdbMutex_Unlock(theMutexPtr); }//while theTransporterRegistry->stopReceiving(); @@ -875,13 +891,13 @@ TransporterFacade::sendFragmentedSignalUnCond(NdbApiSignal* aSignal, void TransporterFacade::doConnect(int aNodeId){ theTransporterRegistry->setIOState(aNodeId, NoHalt); - theTransporterRegistry->setPerformState(aNodeId, PerformConnect); + theTransporterRegistry->do_connect(aNodeId); } void TransporterFacade::doDisconnect(int aNodeId) { - theTransporterRegistry->setPerformState(aNodeId, PerformDisconnect); + theTransporterRegistry->do_disconnect(aNodeId); } void @@ -906,7 +922,7 @@ TransporterFacade::ownId() const bool TransporterFacade::isConnected(NodeId aNodeId){ - return theTransporterRegistry->performState(aNodeId) == PerformIO; + return theTransporterRegistry->is_connected(aNodeId); } NodeId diff --git a/ndb/src/ndbapi/TransporterFacade.hpp b/ndb/src/ndbapi/TransporterFacade.hpp index 4b76cbe864a..e6720f7de2e 100644 --- a/ndb/src/ndbapi/TransporterFacade.hpp +++ b/ndb/src/ndbapi/TransporterFacade.hpp @@ -110,7 +110,6 @@ public: // Close this block number int close_local(BlockNumber blockNumber); - void setState(Uint32 aNodeId, PerformState aState); private: /**