mirror of
https://github.com/MariaDB/server.git
synced 2025-08-01 03:47:19 +03:00
logging_ok:
Logging to logging@openlogging.org accepted SCI_Transporter.hpp, SCI_Transporter.cpp: Major fix-up of SCI Transporter, fixed so that it works with single card, fixed wrap around, added lots of DBUG statements, merged with new transporter handling TransporterRegistry.cpp: Some fixes for wrap around needed plus DBUG handling TCP_Transporter.hpp, TCP_Transporter.cpp: Added DBUG statements SHM_Transporter.hpp, SHM_Transporter.cpp: Fixed SHM Transporter SHM_Buffer.hpp: Fixed SHM Buffer to handle wrap around properly IPCConfig.cpp: Fixed up config of SCI SocketServer.cpp: Added DBUG support for SocketServer threads ConfigInfo.cpp: Config changes for SCI TransporterDefinitions.hpp, mgmapi_config_parameters.h: SCI fixes Makefile.am, type_ndbapitools.mk.am, type_ndbapitest.mk.am: Added SCI library path to Makefiles configure.in: Fixed small bug with shared mem and sci together in configure acinclude.m4: Added possibility of providing SCI library path in confgure acinclude.m4: Added possibility of providing SCI library path in confgure configure.in: Fixed small bug with shared mem and sci together in configure ndb/config/type_ndbapitest.mk.am: Added SCI library path to Makefiles ndb/config/type_ndbapitools.mk.am: Added SCI library path to Makefiles ndb/src/cw/cpcd/Makefile.am: Added SCI library path to Makefiles ndb/src/kernel/Makefile.am: Added SCI library path to Makefiles ndb/src/kernel/blocks/backup/restore/Makefile.am: Added SCI library path to Makefiles ndb/src/mgmsrv/Makefile.am: Added SCI library path to Makefiles sql/Makefile.am: Added SCI library path to Makefiles ndb/src/common/transporter/Makefile.am: Added SCI library path to Makefiles ndb/include/mgmapi/mgmapi_config_parameters.h: SCI fixes ndb/include/transporter/TransporterDefinitions.hpp: SCI fixes ndb/src/mgmsrv/ConfigInfo.cpp: Config changes for SCI ndb/src/common/util/SocketServer.cpp: Added DBUG support for SocketServer threads ndb/src/common/mgmcommon/IPCConfig.cpp: Fixed up config of SCI ndb/src/common/transporter/SHM_Buffer.hpp: Fixed SHM Buffer to handle wrap around properly ndb/src/common/transporter/SHM_Transporter.cpp: Fixed SHM Transporter ndb/src/common/transporter/SHM_Transporter.hpp: Fixed SHM Transporter ndb/src/common/transporter/TCP_Transporter.cpp: Added DBUG statements ndb/src/common/transporter/TCP_Transporter.hpp: Added DBUG statements ndb/src/common/transporter/TransporterRegistry.cpp: Some fixes for wrap around needed plus DBUG handling ndb/src/common/transporter/SCI_Transporter.cpp: Major fix-up of SCI Transporter, fixed so that it works with single card, fixed wrap around, added lots of DBUG statements, merged with new transporter handling ndb/src/common/transporter/SCI_Transporter.hpp: Major fix-up of SCI Transporter, fixed so that it works with single card, fixed wrap around, added lots of DBUG statements, merged with new transporter handling BitKeeper/etc/logging_ok: Logging to logging@openlogging.org accepted
This commit is contained in:
@ -100,6 +100,7 @@ miguel@hegel.txg.br
|
||||
miguel@light.
|
||||
miguel@light.local
|
||||
miguel@sartre.local
|
||||
mikael@mc04.(none)
|
||||
mikron@c-fb0ae253.1238-1-64736c10.cust.bredbandsbolaget.se
|
||||
mikron@mikael-ronstr-ms-dator.local
|
||||
mmatthew@markslaptop.
|
||||
@ -158,6 +159,7 @@ ram@ram.(none)
|
||||
ranger@regul.home.lan
|
||||
rburnett@build.mysql.com
|
||||
root@home.(none)
|
||||
root@mc04.(none)
|
||||
root@x3.internalnet
|
||||
salle@banica.(none)
|
||||
salle@geopard.(none)
|
||||
|
50
acinclude.m4
50
acinclude.m4
@ -1551,16 +1551,43 @@ dnl Sets HAVE_NDBCLUSTER_DB if --with-ndbcluster is used
|
||||
dnl ---------------------------------------------------------------------------
|
||||
|
||||
AC_DEFUN([MYSQL_CHECK_NDB_OPTIONS], [
|
||||
AC_ARG_WITH([ndb-sci],
|
||||
AC_HELP_STRING([--with-ndb-sci=DIR],
|
||||
[Provide MySQL with a custom location of
|
||||
sci library. Given DIR, sci library is
|
||||
assumed to be in $DIR/lib and header files
|
||||
in $DIR/include.]),
|
||||
[mysql_sci_dir=${withval}],
|
||||
[mysql_sci_dir=""])
|
||||
|
||||
case "$mysql_sci_dir" in
|
||||
"no" )
|
||||
have_ndb_sci=no
|
||||
AC_MSG_RESULT([-- not including sci transporter])
|
||||
;;
|
||||
* )
|
||||
if test -f "$mysql_sci_dir/lib/libsisci.a" -a \
|
||||
-f "$mysql_sci_dir/include/sisci_api.h"; then
|
||||
NDB_SCI_INCLUDES="-I$mysql_sci_dir/include"
|
||||
NDB_SCI_LIBS="-L$mysql_sci_dir/lib -lsisci"
|
||||
AC_MSG_RESULT([-- including sci transporter])
|
||||
AC_DEFINE([NDB_SCI_TRANSPORTER], [1],
|
||||
[Including Ndb Cluster DB sci transporter])
|
||||
AC_SUBST(NDB_SCI_INCLUDES)
|
||||
AC_SUBST(NDB_SCI_LIBS)
|
||||
have_ndb_sci="yes"
|
||||
AC_MSG_RESULT([found sci transporter in $mysql_sci_dir/{include, lib}])
|
||||
else
|
||||
AC_MSG_RESULT([could not find sci transporter in $mysql_sci_dir/{include, lib}])
|
||||
fi
|
||||
;;
|
||||
esac
|
||||
|
||||
AC_ARG_WITH([ndb-shm],
|
||||
[
|
||||
--with-ndb-shm Include the NDB Cluster shared memory transporter],
|
||||
[ndb_shm="$withval"],
|
||||
[ndb_shm=no])
|
||||
AC_ARG_WITH([ndb-sci],
|
||||
[
|
||||
--with-ndb-sci Include the NDB Cluster sci transporter],
|
||||
[ndb_sci="$withval"],
|
||||
[ndb_sci=no])
|
||||
AC_ARG_WITH([ndb-test],
|
||||
[
|
||||
--with-ndb-test Include the NDB Cluster ndbapi test programs],
|
||||
@ -1593,19 +1620,6 @@ AC_DEFUN([MYSQL_CHECK_NDB_OPTIONS], [
|
||||
;;
|
||||
esac
|
||||
|
||||
have_ndb_sci=no
|
||||
case "$ndb_sci" in
|
||||
yes )
|
||||
AC_MSG_RESULT([-- including sci transporter])
|
||||
AC_DEFINE([NDB_SCI_TRANSPORTER], [1],
|
||||
[Including Ndb Cluster DB sci transporter])
|
||||
have_ndb_sci="yes"
|
||||
;;
|
||||
* )
|
||||
AC_MSG_RESULT([-- not including sci transporter])
|
||||
;;
|
||||
esac
|
||||
|
||||
have_ndb_test=no
|
||||
case "$ndb_test" in
|
||||
yes )
|
||||
|
@ -3024,11 +3024,11 @@ AC_SUBST([ndb_port_base])
|
||||
ndb_transporter_opt_objs=""
|
||||
if test X"$have_ndb_shm" = Xyes
|
||||
then
|
||||
ndb_transporter_opt_objs="$(ndb_transporter_opt_objs) SHM_Transporter.lo SHM_Transporter.unix.lo"
|
||||
ndb_transporter_opt_objs="$ndb_transporter_opt_objs SHM_Transporter.lo SHM_Transporter.unix.lo"
|
||||
fi
|
||||
if test X"$have_ndb_sci" = Xyes
|
||||
then
|
||||
ndb_transporter_opt_objs="$(ndb_transporter_opt_objs) SCI_Transporter.lo"
|
||||
ndb_transporter_opt_objs="$ndb_transporter_opt_objs SCI_Transporter.lo"
|
||||
fi
|
||||
AC_SUBST([ndb_transporter_opt_objs])
|
||||
|
||||
|
@ -3,7 +3,7 @@ LDADD += $(top_builddir)/ndb/test/src/libNDBT.a \
|
||||
$(top_builddir)/ndb/src/libndbclient.la \
|
||||
$(top_builddir)/dbug/libdbug.a \
|
||||
$(top_builddir)/mysys/libmysys.a \
|
||||
$(top_builddir)/strings/libmystrings.a
|
||||
$(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@
|
||||
|
||||
INCLUDES += -I$(srcdir) -I$(top_srcdir)/include \
|
||||
-I$(top_srcdir)/ndb/include \
|
||||
|
@ -3,7 +3,7 @@ LDADD += \
|
||||
$(top_builddir)/ndb/src/libndbclient.la \
|
||||
$(top_builddir)/dbug/libdbug.a \
|
||||
$(top_builddir)/mysys/libmysys.a \
|
||||
$(top_builddir)/strings/libmystrings.a
|
||||
$(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@
|
||||
|
||||
INCLUDES += -I$(srcdir) -I$(top_srcdir)/include \
|
||||
-I$(top_srcdir)/ndb/include \
|
||||
|
@ -117,16 +117,14 @@
|
||||
#define CFG_SHM_KEY 502
|
||||
#define CFG_SHM_BUFFER_MEM 503
|
||||
|
||||
#define CFG_SCI_ID_0 550
|
||||
#define CFG_SCI_ID_1 551
|
||||
#define CFG_SCI_SEND_LIMIT 552
|
||||
#define CFG_SCI_BUFFER_MEM 553
|
||||
#define CFG_SCI_NODE1_ADAPTERS 554
|
||||
#define CFG_SCI_NODE1_ADAPTER0 555
|
||||
#define CFG_SCI_NODE1_ADAPTER1 556
|
||||
#define CFG_SCI_NODE2_ADAPTERS 554
|
||||
#define CFG_SCI_NODE2_ADAPTER0 555
|
||||
#define CFG_SCI_NODE2_ADAPTER1 556
|
||||
#define CFG_SCI_HOST1_ID_0 550
|
||||
#define CFG_SCI_HOST1_ID_1 551
|
||||
#define CFG_SCI_HOST2_ID_0 552
|
||||
#define CFG_SCI_HOST2_ID_1 553
|
||||
#define CFG_SCI_HOSTNAME_1 554
|
||||
#define CFG_SCI_HOSTNAME_2 555
|
||||
#define CFG_SCI_SEND_LIMIT 556
|
||||
#define CFG_SCI_BUFFER_MEM 557
|
||||
|
||||
#define CFG_OSE_HOSTNAME_1 600
|
||||
#define CFG_OSE_HOSTNAME_2 601
|
||||
|
@ -59,8 +59,6 @@ struct TCP_TransporterConfiguration {
|
||||
NodeId localNodeId;
|
||||
Uint32 sendBufferSize; // Size of SendBuffer of priority B
|
||||
Uint32 maxReceiveSize; // Maximum no of bytes to receive
|
||||
Uint32 byteOrder;
|
||||
bool compression;
|
||||
bool checksum;
|
||||
bool signalId;
|
||||
};
|
||||
@ -72,10 +70,8 @@ struct SHM_TransporterConfiguration {
|
||||
Uint32 port;
|
||||
NodeId remoteNodeId;
|
||||
NodeId localNodeId;
|
||||
bool compression;
|
||||
bool checksum;
|
||||
bool signalId;
|
||||
int byteOrder;
|
||||
|
||||
Uint32 shmKey;
|
||||
Uint32 shmSize;
|
||||
@ -89,10 +85,8 @@ struct OSE_TransporterConfiguration {
|
||||
const char *localHostName;
|
||||
NodeId remoteNodeId;
|
||||
NodeId localNodeId;
|
||||
bool compression;
|
||||
bool checksum;
|
||||
bool signalId;
|
||||
int byteOrder;
|
||||
|
||||
Uint32 prioASignalSize;
|
||||
Uint32 prioBSignalSize;
|
||||
@ -103,20 +97,20 @@ struct OSE_TransporterConfiguration {
|
||||
* SCI Transporter Configuration
|
||||
*/
|
||||
struct SCI_TransporterConfiguration {
|
||||
const char *remoteHostName;
|
||||
const char *localHostName;
|
||||
Uint32 port;
|
||||
Uint32 sendLimit; // Packet size
|
||||
Uint32 bufferSize; // Buffer size
|
||||
|
||||
Uint32 nLocalAdapters; // 1 or 2, the number of adapters on local host
|
||||
|
||||
Uint32 nRemoteAdapters;
|
||||
Uint32 remoteSciNodeId0; // SCInodeId for adapter 1
|
||||
Uint32 remoteSciNodeId1; // SCInodeId for adapter 2
|
||||
|
||||
NodeId localNodeId; // Local node Id
|
||||
NodeId remoteNodeId; // Remote node Id
|
||||
|
||||
Uint32 byteOrder;
|
||||
bool compression;
|
||||
bool checksum;
|
||||
bool signalId;
|
||||
|
||||
|
@ -133,7 +133,6 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){
|
||||
Uint32 compression;
|
||||
Uint32 checksum;
|
||||
if(!tmp->get("SendSignalId", &sendSignalId)) continue;
|
||||
if(!tmp->get("Compression", &compression)) continue;
|
||||
if(!tmp->get("Checksum", &checksum)) continue;
|
||||
|
||||
const char * type;
|
||||
@ -143,8 +142,6 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){
|
||||
SHM_TransporterConfiguration conf;
|
||||
conf.localNodeId = the_ownId;
|
||||
conf.remoteNodeId = (nodeId1 != the_ownId ? nodeId1 : nodeId2);
|
||||
conf.byteOrder = 0;
|
||||
conf.compression = compression;
|
||||
conf.checksum = checksum;
|
||||
conf.signalId = sendSignalId;
|
||||
|
||||
@ -164,8 +161,6 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){
|
||||
SCI_TransporterConfiguration conf;
|
||||
conf.localNodeId = the_ownId;
|
||||
conf.remoteNodeId = (nodeId1 != the_ownId ? nodeId1 : nodeId2);
|
||||
conf.byteOrder = 0;
|
||||
conf.compression = compression;
|
||||
conf.checksum = checksum;
|
||||
conf.signalId = sendSignalId;
|
||||
|
||||
@ -174,18 +169,16 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){
|
||||
|
||||
if(the_ownId == nodeId1){
|
||||
if(!tmp->get("Node1_NoOfAdapters", &conf.nLocalAdapters)) continue;
|
||||
if(!tmp->get("Node2_NoOfAdapters", &conf.nRemoteAdapters)) continue;
|
||||
if(!tmp->get("Node2_Adapter", 0, &conf.remoteSciNodeId0)) continue;
|
||||
|
||||
if(conf.nRemoteAdapters > 1){
|
||||
if(conf.nLocalAdapters > 1){
|
||||
if(!tmp->get("Node2_Adapter", 1, &conf.remoteSciNodeId1)) continue;
|
||||
}
|
||||
} else {
|
||||
if(!tmp->get("Node2_NoOfAdapters", &conf.nLocalAdapters)) continue;
|
||||
if(!tmp->get("Node1_NoOfAdapters", &conf.nRemoteAdapters)) continue;
|
||||
if(!tmp->get("Node1_Adapter", 0, &conf.remoteSciNodeId0)) continue;
|
||||
|
||||
if(conf.nRemoteAdapters > 1){
|
||||
if(conf.nLocalAdapters > 1){
|
||||
if(!tmp->get("Node1_Adapter", 1, &conf.remoteSciNodeId1)) continue;
|
||||
}
|
||||
}
|
||||
@ -243,8 +236,6 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){
|
||||
conf.localHostName = ownHostName;
|
||||
conf.remoteNodeId = remoteNodeId;
|
||||
conf.localNodeId = ownNodeId;
|
||||
conf.byteOrder = 0;
|
||||
conf.compression = compression;
|
||||
conf.checksum = checksum;
|
||||
conf.signalId = sendSignalId;
|
||||
|
||||
@ -270,8 +261,6 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){
|
||||
conf.localHostName = ownHostName;
|
||||
conf.remoteNodeId = remoteNodeId;
|
||||
conf.localNodeId = ownNodeId;
|
||||
conf.byteOrder = 0;
|
||||
conf.compression = compression;
|
||||
conf.checksum = checksum;
|
||||
conf.signalId = sendSignalId;
|
||||
|
||||
@ -344,6 +333,7 @@ Uint32
|
||||
IPCConfig::configureTransporters(Uint32 nodeId,
|
||||
const class ndb_mgm_configuration & config,
|
||||
class TransporterRegistry & tr){
|
||||
DBUG_ENTER("IPCConfig::configureTransporters");
|
||||
|
||||
Uint32 noOfTransportersCreated= 0, server_port= 0;
|
||||
ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION);
|
||||
@ -374,14 +364,13 @@ IPCConfig::configureTransporters(Uint32 nodeId,
|
||||
}
|
||||
server_port= tmp_server_port;
|
||||
}
|
||||
|
||||
DBUG_PRINT("info", ("Transporter between this node %d and node %d using port %d, signalId %d, checksum %d",
|
||||
nodeId, remoteNodeId, tmp_server_port, sendSignalId, checksum));
|
||||
switch(type){
|
||||
case CONNECTION_TYPE_SHM:{
|
||||
SHM_TransporterConfiguration conf;
|
||||
conf.localNodeId = nodeId;
|
||||
conf.remoteNodeId = remoteNodeId;
|
||||
conf.byteOrder = 0;
|
||||
conf.compression = 0;
|
||||
conf.checksum = checksum;
|
||||
conf.signalId = sendSignalId;
|
||||
|
||||
@ -391,45 +380,60 @@ IPCConfig::configureTransporters(Uint32 nodeId,
|
||||
conf.port= tmp_server_port;
|
||||
|
||||
if(!tr.createTransporter(&conf)){
|
||||
DBUG_PRINT("error", ("Failed to create SCI Transporter from %d to %d",
|
||||
conf.localNodeId, conf.remoteNodeId));
|
||||
ndbout << "Failed to create SHM Transporter from: "
|
||||
<< conf.localNodeId << " to: " << conf.remoteNodeId << endl;
|
||||
} else {
|
||||
noOfTransportersCreated++;
|
||||
}
|
||||
DBUG_PRINT("info", ("Created SHM Transporter using shmkey %d, buf size = %d",
|
||||
conf.shmKey, conf.shmSize));
|
||||
break;
|
||||
}
|
||||
case CONNECTION_TYPE_SCI:{
|
||||
SCI_TransporterConfiguration conf;
|
||||
const char * host1, * host2;
|
||||
conf.localNodeId = nodeId;
|
||||
conf.remoteNodeId = remoteNodeId;
|
||||
conf.byteOrder = 0;
|
||||
conf.compression = 0;
|
||||
conf.checksum = checksum;
|
||||
conf.signalId = sendSignalId;
|
||||
conf.port= tmp_server_port;
|
||||
|
||||
if(iter.get(CFG_SCI_HOSTNAME_1, &host1)) break;
|
||||
if(iter.get(CFG_SCI_HOSTNAME_2, &host2)) break;
|
||||
|
||||
conf.localHostName = (nodeId == nodeId1 ? host1 : host2);
|
||||
conf.remoteHostName = (nodeId == nodeId1 ? host2 : host1);
|
||||
|
||||
if(iter.get(CFG_SCI_SEND_LIMIT, &conf.sendLimit)) break;
|
||||
if(iter.get(CFG_SCI_BUFFER_MEM, &conf.bufferSize)) break;
|
||||
|
||||
if(nodeId == nodeId1){
|
||||
if(iter.get(CFG_SCI_NODE1_ADAPTERS, &conf.nLocalAdapters)) break;
|
||||
if(iter.get(CFG_SCI_NODE2_ADAPTERS, &conf.nRemoteAdapters)) break;
|
||||
if(iter.get(CFG_SCI_NODE2_ADAPTER0, &conf.remoteSciNodeId0)) break;
|
||||
if(conf.nRemoteAdapters > 1){
|
||||
if(iter.get(CFG_SCI_NODE2_ADAPTER1, &conf.remoteSciNodeId1)) break;
|
||||
}
|
||||
if (nodeId == nodeId1) {
|
||||
if(iter.get(CFG_SCI_HOST2_ID_0, &conf.remoteSciNodeId0)) break;
|
||||
if(iter.get(CFG_SCI_HOST2_ID_1, &conf.remoteSciNodeId1)) break;
|
||||
} else {
|
||||
if(iter.get(CFG_SCI_NODE2_ADAPTERS, &conf.nLocalAdapters)) break;
|
||||
if(iter.get(CFG_SCI_NODE1_ADAPTERS, &conf.nRemoteAdapters)) break;
|
||||
if(iter.get(CFG_SCI_NODE1_ADAPTER0, &conf.remoteSciNodeId0)) break;
|
||||
if(conf.nRemoteAdapters > 1){
|
||||
if(iter.get(CFG_SCI_NODE1_ADAPTER1, &conf.remoteSciNodeId1)) break;
|
||||
if(iter.get(CFG_SCI_HOST1_ID_0, &conf.remoteSciNodeId0)) break;
|
||||
if(iter.get(CFG_SCI_HOST1_ID_1, &conf.remoteSciNodeId1)) break;
|
||||
}
|
||||
if (conf.remoteSciNodeId1 == 0) {
|
||||
conf.nLocalAdapters = 1;
|
||||
} else {
|
||||
conf.nLocalAdapters = 2;
|
||||
}
|
||||
|
||||
if(!tr.createTransporter(&conf)){
|
||||
DBUG_PRINT("error", ("Failed to create SCI Transporter from %d to %d",
|
||||
conf.localNodeId, conf.remoteNodeId));
|
||||
ndbout << "Failed to create SCI Transporter from: "
|
||||
<< conf.localNodeId << " to: " << conf.remoteNodeId << endl;
|
||||
} else {
|
||||
DBUG_PRINT("info", ("Created SCI Transporter: Adapters = %d, remote SCI node id %d",
|
||||
conf.nLocalAdapters, conf.remoteSciNodeId0));
|
||||
DBUG_PRINT("info", ("Host 1 = %s, Host 2 = %s, sendLimit = %d, buf size = %d",
|
||||
conf.localHostName, conf.remoteHostName, conf.sendLimit, conf.bufferSize));
|
||||
if (conf.nLocalAdapters > 1) {
|
||||
DBUG_PRINT("info", ("Fault-tolerant with 2 Remote Adapters, second remote SCI node id = %d",
|
||||
conf.remoteSciNodeId1));
|
||||
}
|
||||
noOfTransportersCreated++;
|
||||
continue;
|
||||
}
|
||||
@ -457,8 +461,6 @@ IPCConfig::configureTransporters(Uint32 nodeId,
|
||||
conf.remoteNodeId = remoteNodeId;
|
||||
conf.localHostName = (nodeId == nodeId1 ? host1 : host2);
|
||||
conf.remoteHostName = (nodeId == nodeId1 ? host2 : host1);
|
||||
conf.byteOrder = 0;
|
||||
conf.compression = 0;
|
||||
conf.checksum = checksum;
|
||||
conf.signalId = sendSignalId;
|
||||
|
||||
@ -468,6 +470,9 @@ IPCConfig::configureTransporters(Uint32 nodeId,
|
||||
} else {
|
||||
noOfTransportersCreated++;
|
||||
}
|
||||
DBUG_PRINT("info", ("Created TCP Transporter: sendBufferSize = %d, maxReceiveSize = %d",
|
||||
conf.sendBufferSize, conf.maxReceiveSize));
|
||||
break;
|
||||
case CONNECTION_TYPE_OSE:{
|
||||
OSE_TransporterConfiguration conf;
|
||||
|
||||
@ -483,8 +488,6 @@ IPCConfig::configureTransporters(Uint32 nodeId,
|
||||
conf.remoteNodeId = remoteNodeId;
|
||||
conf.localHostName = (nodeId == nodeId1 ? host1 : host2);
|
||||
conf.remoteHostName = (nodeId == nodeId1 ? host2 : host1);
|
||||
conf.byteOrder = 0;
|
||||
conf.compression = 0;
|
||||
conf.checksum = checksum;
|
||||
conf.signalId = sendSignalId;
|
||||
|
||||
@ -505,6 +508,6 @@ IPCConfig::configureTransporters(Uint32 nodeId,
|
||||
|
||||
tr.m_service_port= server_port;
|
||||
|
||||
return noOfTransportersCreated;
|
||||
DBUG_RETURN(noOfTransportersCreated);
|
||||
}
|
||||
|
||||
|
@ -13,7 +13,7 @@ EXTRA_libtransporter_la_SOURCES = SHM_Transporter.cpp SHM_Transporter.unix.cpp S
|
||||
libtransporter_la_LIBADD = @ndb_transporter_opt_objs@
|
||||
libtransporter_la_DEPENDENCIES = @ndb_transporter_opt_objs@
|
||||
|
||||
INCLUDES_LOC = -I$(top_srcdir)/ndb/include/kernel -I$(top_srcdir)/ndb/include/transporter
|
||||
INCLUDES_LOC = -I$(top_srcdir)/ndb/include/kernel -I$(top_srcdir)/ndb/include/transporter @NDB_SCI_INCLUDES@
|
||||
|
||||
include $(top_srcdir)/ndb/config/common.mk.am
|
||||
include $(top_srcdir)/ndb/config/type_util.mk.am
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -26,7 +26,7 @@
|
||||
|
||||
#include <ndb_types.h>
|
||||
|
||||
/**
|
||||
/**
|
||||
* The SCI Transporter
|
||||
*
|
||||
* The design goal of the SCI transporter is to deliver high performance
|
||||
@ -135,15 +135,17 @@ public:
|
||||
bool getConnectionStatus();
|
||||
|
||||
private:
|
||||
SCI_Transporter(Uint32 packetSize,
|
||||
SCI_Transporter(TransporterRegistry &t_reg,
|
||||
const char *local_host,
|
||||
const char *remote_host,
|
||||
int port,
|
||||
Uint32 packetSize,
|
||||
Uint32 bufferSize,
|
||||
Uint32 nAdapters,
|
||||
Uint16 remoteSciNodeId0,
|
||||
Uint16 remoteSciNodeId1,
|
||||
NodeId localNodeID,
|
||||
NodeId remoteNodeID,
|
||||
int byteorder,
|
||||
bool compression,
|
||||
bool checksum,
|
||||
bool signalId,
|
||||
Uint32 reportFreq = 4096);
|
||||
@ -160,7 +162,8 @@ private:
|
||||
/**
|
||||
* For statistics on transfered packets
|
||||
*/
|
||||
#ifdef DEBUG_TRANSPORTER
|
||||
//#ifdef DEBUG_TRANSPORTER
|
||||
#if 1
|
||||
Uint32 i1024;
|
||||
Uint32 i2048;
|
||||
Uint32 i2049;
|
||||
@ -177,10 +180,8 @@ private:
|
||||
struct {
|
||||
Uint32 * m_buffer; // The buffer
|
||||
Uint32 m_dataSize; // No of words in buffer
|
||||
Uint32 m_bufferSize; // Buffer size
|
||||
Uint32 m_sendBufferSize; // Buffer size
|
||||
Uint32 m_forceSendLimit; // Send when buffer is this full
|
||||
|
||||
bool full() const { return (m_dataSize * 4) > m_forceSendLimit ;}
|
||||
} m_sendBuffer;
|
||||
|
||||
SHM_Reader * reader;
|
||||
@ -196,7 +197,7 @@ private:
|
||||
Uint32 m_adapters;
|
||||
Uint32 m_numberOfRemoteNodes;
|
||||
|
||||
Uint16* m_remoteNodes;
|
||||
Uint16 m_remoteNodes[2];
|
||||
|
||||
typedef struct SciAdapter {
|
||||
sci_desc_t scidesc;
|
||||
@ -297,12 +298,12 @@ private:
|
||||
bool sendIsPossible(struct timeval * timeout);
|
||||
|
||||
|
||||
void getReceivePtr(Uint32 ** ptr, Uint32 ** eod){
|
||||
reader->getReadPtr(* ptr, * eod);
|
||||
void getReceivePtr(Uint32 ** ptr, Uint32 &size){
|
||||
size = reader->getReadPtr(* ptr);
|
||||
}
|
||||
|
||||
void updateReceivePtr(Uint32 * ptr){
|
||||
reader->updateReadPtr(ptr);
|
||||
void updateReceivePtr(Uint32 size){
|
||||
reader->updateReadPtr(size);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -341,6 +342,8 @@ private:
|
||||
*/
|
||||
void failoverShmWriter();
|
||||
|
||||
bool init_local();
|
||||
bool init_remote();
|
||||
|
||||
protected:
|
||||
|
||||
@ -350,7 +353,8 @@ protected:
|
||||
* retrying.
|
||||
* @return Returns true on success, otherwize falser
|
||||
*/
|
||||
bool connectImpl(Uint32 timeOutMillis);
|
||||
bool connect_server_impl(NDB_SOCKET_TYPE sockfd);
|
||||
bool connect_client_impl(NDB_SOCKET_TYPE sockfd);
|
||||
|
||||
/**
|
||||
* We will disconnect if:
|
||||
|
@ -42,17 +42,19 @@ public:
|
||||
Uint32 _sizeOfBuffer,
|
||||
Uint32 _slack,
|
||||
Uint32 * _readIndex,
|
||||
Uint32 * _endWriteIndex,
|
||||
Uint32 * _writeIndex) :
|
||||
m_startOfBuffer(_startOfBuffer),
|
||||
m_totalBufferSize(_sizeOfBuffer),
|
||||
m_bufferSize(_sizeOfBuffer - _slack),
|
||||
m_sharedReadIndex(_readIndex),
|
||||
m_sharedEndWriteIndex(_endWriteIndex),
|
||||
m_sharedWriteIndex(_writeIndex)
|
||||
{
|
||||
}
|
||||
|
||||
void clear() {
|
||||
m_readIndex = * m_sharedReadIndex;
|
||||
m_readIndex = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -66,12 +68,12 @@ public:
|
||||
* returns ptr - where to start reading
|
||||
* sz - how much can I read
|
||||
*/
|
||||
inline void getReadPtr(Uint32 * & ptr, Uint32 * & eod);
|
||||
inline Uint32 getReadPtr(Uint32 * & ptr);
|
||||
|
||||
/**
|
||||
* Update read ptr
|
||||
*/
|
||||
inline void updateReadPtr(Uint32 * readPtr);
|
||||
inline void updateReadPtr(Uint32 size);
|
||||
|
||||
private:
|
||||
char * const m_startOfBuffer;
|
||||
@ -80,6 +82,7 @@ private:
|
||||
Uint32 m_readIndex;
|
||||
|
||||
Uint32 * m_sharedReadIndex;
|
||||
Uint32 * m_sharedEndWriteIndex;
|
||||
Uint32 * m_sharedWriteIndex;
|
||||
};
|
||||
|
||||
@ -97,19 +100,22 @@ SHM_Reader::empty() const{
|
||||
* sz - how much can I read
|
||||
*/
|
||||
inline
|
||||
void
|
||||
SHM_Reader::getReadPtr(Uint32 * & ptr, Uint32 * & eod){
|
||||
|
||||
Uint32
|
||||
SHM_Reader::getReadPtr(Uint32 * & ptr)
|
||||
{
|
||||
Uint32 *eod;
|
||||
Uint32 tReadIndex = m_readIndex;
|
||||
Uint32 tWriteIndex = * m_sharedWriteIndex;
|
||||
Uint32 tEndWriteIndex = * m_sharedEndWriteIndex;
|
||||
|
||||
ptr = (Uint32*)&m_startOfBuffer[tReadIndex];
|
||||
|
||||
if(tReadIndex <= tWriteIndex){
|
||||
eod = (Uint32*)&m_startOfBuffer[tWriteIndex];
|
||||
} else {
|
||||
eod = (Uint32*)&m_startOfBuffer[m_bufferSize];
|
||||
eod = (Uint32*)&m_startOfBuffer[tEndWriteIndex];
|
||||
}
|
||||
return (Uint32)((char*)eod - (char*)ptr);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -117,14 +123,14 @@ SHM_Reader::getReadPtr(Uint32 * & ptr, Uint32 * & eod){
|
||||
*/
|
||||
inline
|
||||
void
|
||||
SHM_Reader::updateReadPtr(Uint32 * ptr){
|
||||
|
||||
Uint32 tReadIndex = ((char *)ptr) - m_startOfBuffer;
|
||||
|
||||
SHM_Reader::updateReadPtr(Uint32 size)
|
||||
{
|
||||
Uint32 tReadIndex = m_readIndex;
|
||||
tReadIndex += size;
|
||||
assert(tReadIndex < m_totalBufferSize);
|
||||
|
||||
if(tReadIndex >= m_bufferSize){
|
||||
tReadIndex = 0; //-= m_bufferSize;
|
||||
tReadIndex = 0;
|
||||
}
|
||||
|
||||
m_readIndex = tReadIndex;
|
||||
@ -139,17 +145,19 @@ public:
|
||||
Uint32 _sizeOfBuffer,
|
||||
Uint32 _slack,
|
||||
Uint32 * _readIndex,
|
||||
Uint32 * _endWriteIndex,
|
||||
Uint32 * _writeIndex) :
|
||||
m_startOfBuffer(_startOfBuffer),
|
||||
m_totalBufferSize(_sizeOfBuffer),
|
||||
m_bufferSize(_sizeOfBuffer - _slack),
|
||||
m_sharedReadIndex(_readIndex),
|
||||
m_sharedEndWriteIndex(_endWriteIndex),
|
||||
m_sharedWriteIndex(_writeIndex)
|
||||
{
|
||||
}
|
||||
|
||||
void clear() {
|
||||
m_writeIndex = * m_sharedWriteIndex;
|
||||
m_writeIndex = 0;
|
||||
}
|
||||
|
||||
inline char * getWritePtr(Uint32 sz);
|
||||
@ -168,6 +176,7 @@ private:
|
||||
Uint32 m_writeIndex;
|
||||
|
||||
Uint32 * m_sharedReadIndex;
|
||||
Uint32 * m_sharedEndWriteIndex;
|
||||
Uint32 * m_sharedWriteIndex;
|
||||
};
|
||||
|
||||
@ -206,7 +215,8 @@ SHM_Writer::updateWritePtr(Uint32 sz){
|
||||
assert(tWriteIndex < m_totalBufferSize);
|
||||
|
||||
if(tWriteIndex >= m_bufferSize){
|
||||
tWriteIndex = 0; //-= m_bufferSize;
|
||||
* m_sharedEndWriteIndex = tWriteIndex;
|
||||
tWriteIndex = 0;
|
||||
}
|
||||
|
||||
m_writeIndex = tWriteIndex;
|
||||
|
@ -32,13 +32,12 @@ SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
|
||||
int r_port,
|
||||
NodeId lNodeId,
|
||||
NodeId rNodeId,
|
||||
bool compression,
|
||||
bool checksum,
|
||||
bool signalId,
|
||||
key_t _shmKey,
|
||||
Uint32 _shmSize) :
|
||||
Transporter(t_reg, lHostName, rHostName, r_port, lNodeId, rNodeId,
|
||||
0, compression, checksum, signalId),
|
||||
0, false, checksum, signalId),
|
||||
shmKey(_shmKey),
|
||||
shmSize(_shmSize)
|
||||
{
|
||||
@ -83,36 +82,40 @@ SHM_Transporter::setupBuffers(){
|
||||
|
||||
Uint32 * sharedReadIndex1 = base1;
|
||||
Uint32 * sharedWriteIndex1 = base1 + 1;
|
||||
Uint32 * sharedEndWriteIndex1 = base1 + 2;
|
||||
serverStatusFlag = base1 + 4;
|
||||
char * startOfBuf1 = shmBuf+sharedSize;
|
||||
|
||||
Uint32 * base2 = (Uint32*)(shmBuf + sizeOfBuffer + sharedSize);
|
||||
Uint32 * sharedReadIndex2 = base2;
|
||||
Uint32 * sharedWriteIndex2 = base2 + 1;
|
||||
Uint32 * sharedEndWriteIndex2 = base2 + 2;
|
||||
clientStatusFlag = base2 + 4;
|
||||
char * startOfBuf2 = ((char *)base2)+sharedSize;
|
||||
|
||||
* sharedReadIndex2 = * sharedWriteIndex2 = 0;
|
||||
|
||||
if(isServer){
|
||||
* serverStatusFlag = 0;
|
||||
reader = new SHM_Reader(startOfBuf1,
|
||||
sizeOfBuffer,
|
||||
slack,
|
||||
sharedReadIndex1,
|
||||
sharedEndWriteIndex1,
|
||||
sharedWriteIndex1);
|
||||
|
||||
writer = new SHM_Writer(startOfBuf2,
|
||||
sizeOfBuffer,
|
||||
slack,
|
||||
sharedReadIndex2,
|
||||
sharedEndWriteIndex2,
|
||||
sharedWriteIndex2);
|
||||
|
||||
* sharedReadIndex1 = 0;
|
||||
* sharedWriteIndex2 = 0;
|
||||
* sharedWriteIndex1 = 0;
|
||||
* sharedEndWriteIndex1 = 0;
|
||||
|
||||
* sharedReadIndex2 = 0;
|
||||
* sharedWriteIndex1 = 0;
|
||||
* sharedWriteIndex2 = 0;
|
||||
* sharedEndWriteIndex2 = 0;
|
||||
|
||||
reader->clear();
|
||||
writer->clear();
|
||||
@ -145,16 +148,19 @@ SHM_Transporter::setupBuffers(){
|
||||
sizeOfBuffer,
|
||||
slack,
|
||||
sharedReadIndex2,
|
||||
sharedEndWriteIndex2,
|
||||
sharedWriteIndex2);
|
||||
|
||||
writer = new SHM_Writer(startOfBuf1,
|
||||
sizeOfBuffer,
|
||||
slack,
|
||||
sharedReadIndex1,
|
||||
sharedEndWriteIndex1,
|
||||
sharedWriteIndex1);
|
||||
|
||||
* sharedReadIndex2 = 0;
|
||||
* sharedWriteIndex1 = 0;
|
||||
* sharedEndWriteIndex1 = 0;
|
||||
|
||||
reader->clear();
|
||||
writer->clear();
|
||||
@ -224,6 +230,7 @@ SHM_Transporter::prepareSend(const SignalHeader * const signalHeader,
|
||||
bool
|
||||
SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
|
||||
{
|
||||
DBUG_ENTER("SHM_Transporter::connect_server_impl");
|
||||
SocketOutputStream s_output(sockfd);
|
||||
SocketInputStream s_input(sockfd);
|
||||
char buf[256];
|
||||
@ -233,7 +240,7 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
|
||||
if (!ndb_shm_create()) {
|
||||
report_error(TE_SHM_UNABLE_TO_CREATE_SEGMENT);
|
||||
NDB_CLOSE_SOCKET(sockfd);
|
||||
return false;
|
||||
DBUG_RETURN(false);
|
||||
}
|
||||
_shmSegCreated = true;
|
||||
}
|
||||
@ -243,7 +250,7 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
|
||||
if (!ndb_shm_attach()) {
|
||||
report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT);
|
||||
NDB_CLOSE_SOCKET(sockfd);
|
||||
return false;
|
||||
DBUG_RETURN(false);
|
||||
}
|
||||
_attached = true;
|
||||
}
|
||||
@ -254,7 +261,7 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
|
||||
// Wait for ok from client
|
||||
if (s_input.gets(buf, 256) == 0) {
|
||||
NDB_CLOSE_SOCKET(sockfd);
|
||||
return false;
|
||||
DBUG_RETURN(false);
|
||||
}
|
||||
|
||||
int r= connect_common(sockfd);
|
||||
@ -265,17 +272,20 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
|
||||
// Wait for ok from client
|
||||
if (s_input.gets(buf, 256) == 0) {
|
||||
NDB_CLOSE_SOCKET(sockfd);
|
||||
return false;
|
||||
DBUG_RETURN(false);
|
||||
}
|
||||
DBUG_PRINT("info", ("Successfully connected server to node %d",
|
||||
remoteNodeId));
|
||||
}
|
||||
|
||||
NDB_CLOSE_SOCKET(sockfd);
|
||||
return r;
|
||||
DBUG_RETURN(r);
|
||||
}
|
||||
|
||||
bool
|
||||
SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
|
||||
{
|
||||
DBUG_ENTER("SHM_Transporter::connect_client_impl");
|
||||
SocketInputStream s_input(sockfd);
|
||||
SocketOutputStream s_output(sockfd);
|
||||
char buf[256];
|
||||
@ -283,14 +293,18 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
|
||||
// Wait for server to create and attach
|
||||
if (s_input.gets(buf, 256) == 0) {
|
||||
NDB_CLOSE_SOCKET(sockfd);
|
||||
return false;
|
||||
DBUG_PRINT("error", ("Server id %d did not attach",
|
||||
remoteNodeId));
|
||||
DBUG_RETURN(false);
|
||||
}
|
||||
|
||||
// Create
|
||||
if(!_shmSegCreated){
|
||||
if (!ndb_shm_get()) {
|
||||
NDB_CLOSE_SOCKET(sockfd);
|
||||
return false;
|
||||
DBUG_PRINT("error", ("Failed create of shm seg to node %d",
|
||||
remoteNodeId));
|
||||
DBUG_RETURN(false);
|
||||
}
|
||||
_shmSegCreated = true;
|
||||
}
|
||||
@ -300,7 +314,9 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
|
||||
if (!ndb_shm_attach()) {
|
||||
report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT);
|
||||
NDB_CLOSE_SOCKET(sockfd);
|
||||
return false;
|
||||
DBUG_PRINT("error", ("Failed attach of shm seg to node %d",
|
||||
remoteNodeId));
|
||||
DBUG_RETURN(false);
|
||||
}
|
||||
_attached = true;
|
||||
}
|
||||
@ -314,21 +330,28 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
|
||||
// Wait for ok from server
|
||||
if (s_input.gets(buf, 256) == 0) {
|
||||
NDB_CLOSE_SOCKET(sockfd);
|
||||
return false;
|
||||
DBUG_PRINT("error", ("No ok from server node %d",
|
||||
remoteNodeId));
|
||||
DBUG_RETURN(false);
|
||||
}
|
||||
// Send ok to server
|
||||
s_output.println("shm client 2 ok");
|
||||
DBUG_PRINT("info", ("Successfully connected client to node %d",
|
||||
remoteNodeId));
|
||||
}
|
||||
|
||||
NDB_CLOSE_SOCKET(sockfd);
|
||||
return r;
|
||||
DBUG_RETURN(r);
|
||||
}
|
||||
|
||||
bool
|
||||
SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
|
||||
{
|
||||
if (!checkConnected())
|
||||
if (!checkConnected()) {
|
||||
DBUG_PRINT("error", ("Already connected to node %d",
|
||||
remoteNodeId));
|
||||
return false;
|
||||
}
|
||||
|
||||
if(!setupBuffersDone) {
|
||||
setupBuffers();
|
||||
@ -341,5 +364,7 @@ SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
|
||||
return true;
|
||||
}
|
||||
|
||||
DBUG_PRINT("error", ("Failed to set up buffers to node %d",
|
||||
remoteNodeId));
|
||||
return false;
|
||||
}
|
||||
|
@ -38,7 +38,6 @@ public:
|
||||
int r_port,
|
||||
NodeId lNodeId,
|
||||
NodeId rNodeId,
|
||||
bool compression,
|
||||
bool checksum,
|
||||
bool signalId,
|
||||
key_t shmKey,
|
||||
@ -62,12 +61,12 @@ public:
|
||||
writer->updateWritePtr(lenBytes);
|
||||
}
|
||||
|
||||
void getReceivePtr(Uint32 ** ptr, Uint32 ** eod){
|
||||
reader->getReadPtr(* ptr, * eod);
|
||||
void getReceivePtr(Uint32 ** ptr, Uint32 sz){
|
||||
sz = reader->getReadPtr(* ptr);
|
||||
}
|
||||
|
||||
void updateReceivePtr(Uint32 * ptr){
|
||||
reader->updateReadPtr(ptr);
|
||||
void updateReceivePtr(Uint32 sz){
|
||||
reader->updateReadPtr(sz);
|
||||
}
|
||||
|
||||
protected:
|
||||
@ -127,6 +126,7 @@ protected:
|
||||
private:
|
||||
bool _shmSegCreated;
|
||||
bool _attached;
|
||||
bool m_connected;
|
||||
|
||||
key_t shmKey;
|
||||
volatile Uint32 * serverStatusFlag;
|
||||
|
@ -70,11 +70,10 @@ TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg,
|
||||
int r_port,
|
||||
NodeId lNodeId,
|
||||
NodeId rNodeId,
|
||||
int byte_order,
|
||||
bool compr, bool chksm, bool signalId,
|
||||
bool chksm, bool signalId,
|
||||
Uint32 _reportFreq) :
|
||||
Transporter(t_reg, lHostName, rHostName, r_port, lNodeId, rNodeId,
|
||||
byte_order, compr, chksm, signalId),
|
||||
0, false, chksm, signalId),
|
||||
m_sendBuffer(sendBufSize)
|
||||
{
|
||||
maxReceiveSize = maxRecvSize;
|
||||
@ -106,12 +105,14 @@ TCP_Transporter::~TCP_Transporter() {
|
||||
|
||||
bool TCP_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
|
||||
{
|
||||
return connect_common(sockfd);
|
||||
DBUG_ENTER("TCP_Transpporter::connect_server_impl");
|
||||
DBUG_RETURN(connect_common(sockfd));
|
||||
}
|
||||
|
||||
bool TCP_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
|
||||
{
|
||||
return connect_common(sockfd);
|
||||
DBUG_ENTER("TCP_Transpporter::connect_client_impl");
|
||||
DBUG_RETURN(connect_common(sockfd));
|
||||
}
|
||||
|
||||
bool TCP_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
|
||||
@ -119,6 +120,8 @@ bool TCP_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
|
||||
theSocket = sockfd;
|
||||
setSocketOptions();
|
||||
setSocketNonBlocking(theSocket);
|
||||
DBUG_PRINT("info", ("Successfully set-up TCP transporter to node %d",
|
||||
remoteNodeId));
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -52,8 +52,7 @@ private:
|
||||
int r_port,
|
||||
NodeId lHostId,
|
||||
NodeId rHostId,
|
||||
int byteorder,
|
||||
bool compression, bool checksum, bool signalId,
|
||||
bool checksum, bool signalId,
|
||||
Uint32 reportFreq = 4096);
|
||||
|
||||
// Disconnect, delete send buffers and receive buffer
|
||||
|
@ -15,6 +15,7 @@
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
|
||||
#include <ndb_global.h>
|
||||
#include <my_pthread.h>
|
||||
|
||||
#include <TransporterRegistry.hpp>
|
||||
#include "TransporterInternalDefinitions.hpp"
|
||||
@ -48,9 +49,10 @@
|
||||
|
||||
SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
|
||||
{
|
||||
DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
|
||||
if (m_auth && !m_auth->server_authenticate(sockfd)){
|
||||
NDB_CLOSE_SOCKET(sockfd);
|
||||
return 0;
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
{
|
||||
@ -60,27 +62,32 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
|
||||
char buf[256];
|
||||
if (s_input.gets(buf, 256) == 0) {
|
||||
NDB_CLOSE_SOCKET(sockfd);
|
||||
return 0;
|
||||
DBUG_PRINT("error", ("Could not get node id from client"));
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
if (sscanf(buf, "%d", &nodeId) != 1) {
|
||||
NDB_CLOSE_SOCKET(sockfd);
|
||||
return 0;
|
||||
DBUG_PRINT("error", ("Error in node id from client"));
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
//check that nodeid is valid and that there is an allocated transporter
|
||||
if ( nodeId < 0 || nodeId >= m_transporter_registry->maxTransporters) {
|
||||
if ( nodeId < 0 || nodeId >= (int)m_transporter_registry->maxTransporters) {
|
||||
NDB_CLOSE_SOCKET(sockfd);
|
||||
return 0;
|
||||
DBUG_PRINT("error", ("Node id out of range from client"));
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
if (m_transporter_registry->theTransporters[nodeId] == 0) {
|
||||
NDB_CLOSE_SOCKET(sockfd);
|
||||
return 0;
|
||||
DBUG_PRINT("error", ("No transporter for this node id from client"));
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
//check that the transporter should be connected
|
||||
if (m_transporter_registry->performStates[nodeId] != TransporterRegistry::CONNECTING) {
|
||||
NDB_CLOSE_SOCKET(sockfd);
|
||||
return 0;
|
||||
DBUG_PRINT("error", ("Transporter in wrong state for this node id from client"));
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
Transporter *t= m_transporter_registry->theTransporters[nodeId];
|
||||
@ -93,7 +100,7 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
|
||||
t->connect_server(sockfd);
|
||||
}
|
||||
|
||||
return 0;
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
TransporterRegistry::TransporterRegistry(void * callback,
|
||||
@ -209,8 +216,6 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) {
|
||||
config->port,
|
||||
localNodeId,
|
||||
config->remoteNodeId,
|
||||
config->byteOrder,
|
||||
config->compression,
|
||||
config->checksum,
|
||||
config->signalId);
|
||||
if (t == NULL)
|
||||
@ -264,8 +269,6 @@ TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) {
|
||||
conf->localHostName,
|
||||
conf->remoteNodeId,
|
||||
conf->remoteHostName,
|
||||
conf->byteOrder,
|
||||
conf->compression,
|
||||
conf->checksum,
|
||||
conf->signalId);
|
||||
if (t == NULL)
|
||||
@ -306,15 +309,17 @@ TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) {
|
||||
if(theTransporters[config->remoteNodeId] != NULL)
|
||||
return false;
|
||||
|
||||
SCI_Transporter * t = new SCI_Transporter(config->sendLimit,
|
||||
SCI_Transporter * t = new SCI_Transporter(*this,
|
||||
config->localHostName,
|
||||
config->remoteHostName,
|
||||
config->port,
|
||||
config->sendLimit,
|
||||
config->bufferSize,
|
||||
config->nLocalAdapters,
|
||||
config->remoteSciNodeId0,
|
||||
config->remoteSciNodeId1,
|
||||
localNodeId,
|
||||
config->remoteNodeId,
|
||||
config->byteOrder,
|
||||
config->compression,
|
||||
config->checksum,
|
||||
config->signalId);
|
||||
|
||||
@ -357,7 +362,6 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) {
|
||||
config->port,
|
||||
localNodeId,
|
||||
config->remoteNodeId,
|
||||
config->compression,
|
||||
config->checksum,
|
||||
config->signalId,
|
||||
config->shmKey,
|
||||
@ -853,10 +857,11 @@ TransporterRegistry::performReceive(){
|
||||
const NodeId nodeId = t->getRemoteNodeId();
|
||||
if(is_connected(nodeId)){
|
||||
if(t->isConnected() && t->checkConnected()){
|
||||
Uint32 * readPtr, * eodPtr;
|
||||
t->getReceivePtr(&readPtr, &eodPtr);
|
||||
readPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
|
||||
t->updateReceivePtr(readPtr);
|
||||
Uint32 * readPtr;
|
||||
Uint32 sz = 0;
|
||||
t->getReceivePtr(&readPtr, sz);
|
||||
Uint32 szUsed = unpack(readPtr, sz, nodeId, ioStates[nodeId]);
|
||||
t->updateReceivePtr(szUsed);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -868,10 +873,11 @@ TransporterRegistry::performReceive(){
|
||||
const NodeId nodeId = t->getRemoteNodeId();
|
||||
if(is_connected(nodeId)){
|
||||
if(t->isConnected() && t->checkConnected()){
|
||||
Uint32 * readPtr, * eodPtr;
|
||||
t->getReceivePtr(&readPtr, &eodPtr);
|
||||
readPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
|
||||
t->updateReceivePtr(readPtr);
|
||||
Uint32 * readPtr;
|
||||
Uint32 sz = 0;
|
||||
t->getReceivePtr(&readPtr, sz);
|
||||
Uint32 szUsed = unpack(readPtr, sz, nodeId, ioStates[nodeId]);
|
||||
t->updateReceivePtr(szUsed);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1023,7 +1029,9 @@ TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
|
||||
static void *
|
||||
run_start_clients_C(void * me)
|
||||
{
|
||||
my_thread_init();
|
||||
((TransporterRegistry*) me)->start_clients_thread();
|
||||
my_thread_end();
|
||||
NdbThread_Exit(0);
|
||||
return me;
|
||||
}
|
||||
@ -1106,6 +1114,7 @@ TransporterRegistry::update_connections()
|
||||
void
|
||||
TransporterRegistry::start_clients_thread()
|
||||
{
|
||||
DBUG_ENTER("TransporterRegistry::start_clients_thread");
|
||||
while (m_run_start_clients_thread) {
|
||||
NdbSleep_MilliSleep(100);
|
||||
for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){
|
||||
@ -1129,6 +1138,7 @@ TransporterRegistry::start_clients_thread()
|
||||
}
|
||||
}
|
||||
}
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
bool
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
|
||||
#include <ndb_global.h>
|
||||
#include <my_pthread.h>
|
||||
|
||||
#include <SocketServer.hpp>
|
||||
|
||||
@ -176,9 +177,9 @@ extern "C"
|
||||
void*
|
||||
socketServerThread_C(void* _ss){
|
||||
SocketServer * ss = (SocketServer *)_ss;
|
||||
|
||||
my_thread_init();
|
||||
ss->doRun();
|
||||
|
||||
my_thread_end();
|
||||
NdbThread_Exit(0);
|
||||
return 0;
|
||||
}
|
||||
@ -287,8 +288,10 @@ void*
|
||||
sessionThread_C(void* _sc){
|
||||
SocketServer::Session * si = (SocketServer::Session *)_sc;
|
||||
|
||||
my_thread_init();
|
||||
if(!transfer(si->m_socket)){
|
||||
si->m_stopped = true;
|
||||
my_thread_end();
|
||||
NdbThread_Exit(0);
|
||||
return 0;
|
||||
}
|
||||
@ -301,6 +304,7 @@ sessionThread_C(void* _sc){
|
||||
}
|
||||
|
||||
si->m_stopped = true;
|
||||
my_thread_end();
|
||||
NdbThread_Exit(0);
|
||||
return 0;
|
||||
}
|
||||
|
@ -7,7 +7,7 @@ LDADD_LOC = \
|
||||
$(top_builddir)/ndb/src/libndbclient.la \
|
||||
$(top_builddir)/dbug/libdbug.a \
|
||||
$(top_builddir)/mysys/libmysys.a \
|
||||
$(top_builddir)/strings/libmystrings.a
|
||||
$(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@
|
||||
|
||||
include $(top_srcdir)/ndb/config/common.mk.am
|
||||
include $(top_srcdir)/ndb/config/type_util.mk.am
|
||||
|
@ -55,7 +55,7 @@ LDADD += \
|
||||
$(top_builddir)/ndb/src/common/util/libgeneral.la \
|
||||
$(top_builddir)/dbug/libdbug.a \
|
||||
$(top_builddir)/mysys/libmysys.a \
|
||||
$(top_builddir)/strings/libmystrings.a
|
||||
$(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@
|
||||
|
||||
# Don't update the files from bitkeeper
|
||||
%::SCCS/s.%
|
||||
|
@ -7,7 +7,7 @@ LDADD_LOC = \
|
||||
$(top_builddir)/ndb/src/libndbclient.la \
|
||||
$(top_builddir)/dbug/libdbug.a \
|
||||
$(top_builddir)/mysys/libmysys.a \
|
||||
$(top_builddir)/strings/libmystrings.a
|
||||
$(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@
|
||||
|
||||
include $(top_srcdir)/ndb/config/common.mk.am
|
||||
|
||||
|
@ -125,11 +125,14 @@ ConfigInfo::m_SectionRules[] = {
|
||||
|
||||
{ "TCP", fixHostname, "HostName1" },
|
||||
{ "TCP", fixHostname, "HostName2" },
|
||||
{ "SCI", fixHostname, "HostName1" },
|
||||
{ "SCI", fixHostname, "HostName2" },
|
||||
{ "OSE", fixHostname, "HostName1" },
|
||||
{ "OSE", fixHostname, "HostName2" },
|
||||
|
||||
{ "TCP", fixPortNumber, 0 }, // has to come after fixHostName
|
||||
{ "SHM", fixPortNumber, 0 }, // has to come after fixHostName
|
||||
{ "SCI", fixPortNumber, 0 }, // has to come after fixHostName
|
||||
//{ "SHM", fixShmKey, 0 },
|
||||
|
||||
/**
|
||||
@ -159,6 +162,8 @@ ConfigInfo::m_SectionRules[] = {
|
||||
|
||||
{ "TCP", checkTCPConstraints, "HostName1" },
|
||||
{ "TCP", checkTCPConstraints, "HostName2" },
|
||||
{ "SCI", checkTCPConstraints, "HostName1" },
|
||||
{ "SCI", checkTCPConstraints, "HostName2" },
|
||||
|
||||
{ "*", checkMandatory, 0 },
|
||||
|
||||
@ -1788,7 +1793,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
|
||||
"Id of node ("DB_TOKEN", "API_TOKEN" or "MGM_TOKEN") on one side of the connection",
|
||||
ConfigInfo::USED,
|
||||
false,
|
||||
ConfigInfo::INT,
|
||||
ConfigInfo::STRING,
|
||||
MANDATORY,
|
||||
"0",
|
||||
STR_VALUE(MAX_INT_RNIL) },
|
||||
@ -1800,16 +1805,38 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
|
||||
"Id of node ("DB_TOKEN", "API_TOKEN" or "MGM_TOKEN") on one side of the connection",
|
||||
ConfigInfo::USED,
|
||||
false,
|
||||
ConfigInfo::INT,
|
||||
ConfigInfo::STRING,
|
||||
MANDATORY,
|
||||
"0",
|
||||
STR_VALUE(MAX_INT_RNIL) },
|
||||
|
||||
{
|
||||
CFG_SCI_ID_0,
|
||||
"SciId0",
|
||||
CFG_SCI_HOSTNAME_1,
|
||||
"HostName1",
|
||||
"SCI",
|
||||
"Local SCI-node id for adapter 0 (a computer can have two adapters)",
|
||||
"Name/IP of computer on one side of the connection",
|
||||
ConfigInfo::INTERNAL,
|
||||
false,
|
||||
ConfigInfo::STRING,
|
||||
UNDEFINED,
|
||||
0, 0 },
|
||||
|
||||
{
|
||||
CFG_SCI_HOSTNAME_2,
|
||||
"HostName2",
|
||||
"SCI",
|
||||
"Name/IP of computer on one side of the connection",
|
||||
ConfigInfo::INTERNAL,
|
||||
false,
|
||||
ConfigInfo::STRING,
|
||||
UNDEFINED,
|
||||
0, 0 },
|
||||
|
||||
{
|
||||
CFG_CONNECTION_SERVER_PORT,
|
||||
"PortNumber",
|
||||
"SCI",
|
||||
"Port used for this transporter",
|
||||
ConfigInfo::USED,
|
||||
false,
|
||||
ConfigInfo::INT,
|
||||
@ -1818,10 +1845,10 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
|
||||
STR_VALUE(MAX_INT_RNIL) },
|
||||
|
||||
{
|
||||
CFG_SCI_ID_1,
|
||||
"SciId1",
|
||||
CFG_SCI_HOST1_ID_0,
|
||||
"Host1SciId0",
|
||||
"SCI",
|
||||
"Local SCI-node id for adapter 1 (a computer can have two adapters)",
|
||||
"SCI-node id for adapter 0 on Host1 (a computer can have two adapters)",
|
||||
ConfigInfo::USED,
|
||||
false,
|
||||
ConfigInfo::INT,
|
||||
@ -1829,6 +1856,42 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
|
||||
"0",
|
||||
STR_VALUE(MAX_INT_RNIL) },
|
||||
|
||||
{
|
||||
CFG_SCI_HOST1_ID_1,
|
||||
"Host1SciId1",
|
||||
"SCI",
|
||||
"SCI-node id for adapter 1 on Host1 (a computer can have two adapters)",
|
||||
ConfigInfo::USED,
|
||||
false,
|
||||
ConfigInfo::INT,
|
||||
"0",
|
||||
"0",
|
||||
STR_VALUE(MAX_INT_RNIL) },
|
||||
|
||||
{
|
||||
CFG_SCI_HOST2_ID_0,
|
||||
"Host2SciId0",
|
||||
"SCI",
|
||||
"SCI-node id for adapter 0 on Host2 (a computer can have two adapters)",
|
||||
ConfigInfo::USED,
|
||||
false,
|
||||
ConfigInfo::INT,
|
||||
MANDATORY,
|
||||
"0",
|
||||
STR_VALUE(MAX_INT_RNIL) },
|
||||
|
||||
{
|
||||
CFG_SCI_HOST2_ID_1,
|
||||
"Host2SciId1",
|
||||
"SCI",
|
||||
"SCI-node id for adapter 1 on Host2 (a computer can have two adapters)",
|
||||
ConfigInfo::USED,
|
||||
false,
|
||||
ConfigInfo::INT,
|
||||
"0",
|
||||
"0",
|
||||
STR_VALUE(MAX_INT_RNIL) },
|
||||
|
||||
{
|
||||
CFG_CONNECTION_SEND_SIGNAL_ID,
|
||||
"SendSignalId",
|
||||
@ -1862,8 +1925,8 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
|
||||
false,
|
||||
ConfigInfo::INT,
|
||||
"2K",
|
||||
"512",
|
||||
STR_VALUE(MAX_INT_RNIL) },
|
||||
"128",
|
||||
"32K" },
|
||||
|
||||
{
|
||||
CFG_SCI_BUFFER_MEM,
|
||||
@ -1873,8 +1936,8 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
|
||||
ConfigInfo::USED,
|
||||
false,
|
||||
ConfigInfo::INT,
|
||||
"1M",
|
||||
"256K",
|
||||
"192K",
|
||||
"64K",
|
||||
STR_VALUE(MAX_INT_RNIL) },
|
||||
|
||||
{
|
||||
|
@ -29,7 +29,7 @@ LDADD_LOC = $(top_builddir)/ndb/src/libndbclient.la \
|
||||
$(top_builddir)/ndb/src/common/editline/libeditline.a \
|
||||
$(top_builddir)/dbug/libdbug.a \
|
||||
$(top_builddir)/mysys/libmysys.a \
|
||||
$(top_builddir)/strings/libmystrings.a
|
||||
$(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@
|
||||
@TERMCAP_LIB@
|
||||
|
||||
DEFS_LOC = -DDEFAULT_MYSQL_HOME="\"$(MYSQLBASEdir)\"" \
|
||||
|
@ -37,7 +37,7 @@ LDADD = @isam_libs@ \
|
||||
$(top_builddir)/mysys/libmysys.a \
|
||||
$(top_builddir)/dbug/libdbug.a \
|
||||
$(top_builddir)/regex/libregex.a \
|
||||
$(top_builddir)/strings/libmystrings.a @ZLIB_LIBS@
|
||||
$(top_builddir)/strings/libmystrings.a @ZLIB_LIBS@ @NDB_SCI_LIBS@
|
||||
|
||||
mysqld_LDADD = @MYSQLD_EXTRA_LDFLAGS@ \
|
||||
@bdb_libs@ @innodb_libs@ @pstack_libs@ \
|
||||
|
Reference in New Issue
Block a user