diff --git a/acinclude.m4 b/acinclude.m4
index 730ee15ed20..4f2ad8daf91 100644
--- a/acinclude.m4
+++ b/acinclude.m4
@@ -1599,11 +1599,6 @@ AC_DEFUN([MYSQL_CHECK_NDB_OPTIONS], [
;;
esac
- AC_ARG_WITH([ndb-shm],
- [
- --with-ndb-shm Include the NDB Cluster shared memory transporter],
- [ndb_shm="$withval"],
- [ndb_shm=no])
AC_ARG_WITH([ndb-test],
[
--with-ndb-test Include the NDB Cluster ndbapi test programs],
@@ -1633,19 +1628,6 @@ AC_DEFUN([MYSQL_CHECK_NDB_OPTIONS], [
AC_MSG_CHECKING([for NDB Cluster options])
AC_MSG_RESULT([])
- have_ndb_shm=no
- case "$ndb_shm" in
- yes )
- AC_MSG_RESULT([-- including shared memory transporter])
- AC_DEFINE([NDB_SHM_TRANSPORTER], [1],
- [Including Ndb Cluster DB shared memory transporter])
- have_ndb_shm="yes"
- ;;
- * )
- AC_MSG_RESULT([-- not including shared memory transporter])
- ;;
- esac
-
have_ndb_test=no
case "$ndb_test" in
yes )
diff --git a/configure.in b/configure.in
index 9be817c51da..bc78c9c8764 100644
--- a/configure.in
+++ b/configure.in
@@ -1923,7 +1923,9 @@ AC_CHECK_FUNCS(alarm bcmp bfill bmove bzero chsize cuserid fchmod fcntl \
pthread_attr_setstacksize pthread_condattr_create pthread_getsequence_np \
pthread_key_delete pthread_rwlock_rdlock pthread_setprio \
pthread_setprio_np pthread_setschedparam pthread_sigmask readlink \
- realpath rename rint rwlock_init setupterm sighold sigset sigthreadmask \
+ realpath rename rint rwlock_init setupterm \
+ shmget shmat shmdt shmctl \
+ sighold sigset sigthreadmask \
snprintf socket stpcpy strcasecmp strerror strnlen strpbrk strstr strtol \
strtoll strtoul strtoull tell tempnam thr_setconcurrency vidattr)
@@ -3078,10 +3080,19 @@ fi
AC_SUBST([ndb_port_base])
ndb_transporter_opt_objs=""
-if test X"$have_ndb_shm" = Xyes
+if test "$ac_cv_func_shmget" = "yes" &&
+ test "$ac_cv_func_shmat" = "yes" &&
+ test "$ac_cv_func_shmdt" = "yes" &&
+ test "$ac_cv_func_shmctl" = "yes"
then
- ndb_transporter_opt_objs="$ndb_transporter_opt_objs SHM_Transporter.lo SHM_Transporter.unix.lo"
+ AC_DEFINE([NDB_SHM_TRANSPORTER], [1],
+ [Including Ndb Cluster DB shared memory transporter])
+ AC_MSG_RESULT([Including ndb shared memory transporter])
+ ndb_transporter_opt_objs="$ndb_transporter_opt_objs SHM_Transporter.lo SHM_Transporter.unix.lo"
+else
+ AC_MSG_RESULT([Not including ndb shared memory transporter])
fi
+
if test X"$have_ndb_sci" = Xyes
then
ndb_transporter_opt_objs="$ndb_transporter_opt_objs SCI_Transporter.lo"
diff --git a/myisam/mi_rnext_same.c b/myisam/mi_rnext_same.c
index 1342718d6aa..a50c578e081 100644
--- a/myisam/mi_rnext_same.c
+++ b/myisam/mi_rnext_same.c
@@ -88,6 +88,10 @@ int mi_rnext_same(MI_INFO *info, byte *buf)
if (my_errno == HA_ERR_KEY_NOT_FOUND)
my_errno=HA_ERR_END_OF_FILE;
}
+ else if (!buf)
+ {
+ DBUG_RETURN(info->lastpos==HA_OFFSET_ERROR ? my_errno : 0);
+ }
else if (!(*info->read_record)(info,info->lastpos,buf))
{
info->update|= HA_STATE_AKTIV; /* Record is read */
diff --git a/myisammrg/myrg_rnext_same.c b/myisammrg/myrg_rnext_same.c
index b569459b77d..997e4100acd 100644
--- a/myisammrg/myrg_rnext_same.c
+++ b/myisammrg/myrg_rnext_same.c
@@ -16,25 +16,36 @@
#include "myrg_def.h"
+
int myrg_rnext_same(MYRG_INFO *info, byte *buf)
{
- uint err;
+ int err;
MI_INFO *mi;
if (!info->current_table)
return (HA_ERR_KEY_NOT_FOUND);
- err=mi_rnext_same(info->current_table->table,buf);
- if (err == HA_ERR_END_OF_FILE)
+ /* at first, do rnext for the table found before */
+ if ((err=mi_rnext_same(info->current_table->table,NULL)))
{
- queue_remove(&(info->by_key),0);
- if (!info->by_key.elements)
- return HA_ERR_END_OF_FILE;
-
- mi=(info->current_table=(MYRG_TABLE *)queue_top(&(info->by_key)))->table;
- mi->once_flags|= RRND_PRESERVE_LASTINX;
- return mi_rrnd(mi,buf,mi->lastpos);
+ if (err == HA_ERR_END_OF_FILE)
+ {
+ queue_remove(&(info->by_key),0);
+ if (!info->by_key.elements)
+ return HA_ERR_END_OF_FILE;
+ }
+ else
+ return err;
}
- return err;
+ else
+ {
+ /* Found here, adding to queue */
+ queue_top(&(info->by_key))=(byte *)(info->current_table);
+ queue_replaced(&(info->by_key));
+ }
+
+ /* now, mymerge's read_next is as simple as one queue_top */
+ mi=(info->current_table=(MYRG_TABLE *)queue_top(&(info->by_key)))->table;
+ return _myrg_mi_read_record(mi,buf);
}
diff --git a/mysql-test/r/func_concat.result b/mysql-test/r/func_concat.result
index ec53d6d87b0..419413e4156 100644
--- a/mysql-test/r/func_concat.result
+++ b/mysql-test/r/func_concat.result
@@ -32,3 +32,39 @@ select * from t1 where concat(A,C,B,D) = 'AAAA2003-03-011051';
a b c d
AAAA 105 2003-03-01 1
drop table t1;
+select 'a' union select concat('a', -4);
+a
+a
+a-4
+select 'a' union select concat('a', -4.5);
+a
+a
+a-4.5
+select 'a' union select concat('a', -(4 + 1));
+a
+a
+a-5
+select 'a' union select concat('a', 4 - 5);
+a
+a
+a-1
+select 'a' union select concat('a', -'3');
+a
+a
+a-3
+select 'a' union select concat('a', -concat('3',4));
+a
+a
+a-34
+select 'a' union select concat('a', -0);
+a
+a
+a0
+select 'a' union select concat('a', -0.0);
+a
+a
+a-0.0
+select 'a' union select concat('a', -0.0000);
+a
+a
+a-0.0000
diff --git a/mysql-test/r/merge.result b/mysql-test/r/merge.result
index 5755033190b..f71626221cb 100644
--- a/mysql-test/r/merge.result
+++ b/mysql-test/r/merge.result
@@ -651,3 +651,28 @@ ERROR HY000: You can't specify target table 't1' for update in FROM clause
create table t3 engine=merge union=(t1, t2) select * from t2;
ERROR HY000: You can't specify target table 't2' for update in FROM clause
drop table t1, t2;
+create table t1 (a int,b int,c int, index (a,b,c));
+create table t2 (a int,b int,c int, index (a,b,c));
+create table t3 (a int,b int,c int, index (a,b,c))
+engine=merge union=(t1 ,t2);
+insert into t1 (a,b,c) values (1,1,0),(1,2,0);
+insert into t2 (a,b,c) values (1,1,1),(1,2,1);
+explain select a,b,c from t3 force index (a) where a=1 order by a,b,c;
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE t3 ref a a 5 const 2 Using where; Using index
+select a,b,c from t3 force index (a) where a=1 order by a,b,c;
+a b c
+1 1 0
+1 1 1
+1 2 0
+1 2 1
+explain select a,b,c from t3 force index (a) where a=1 order by a desc, b desc, c desc;
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE t3 ref a a 5 const 2 Using where; Using index
+select a,b,c from t3 force index (a) where a=1 order by a desc, b desc, c desc;
+a b c
+1 2 1
+1 2 0
+1 1 1
+1 1 0
+drop table t1, t2, t3;
diff --git a/mysql-test/r/metadata.result b/mysql-test/r/metadata.result
index 2321a8998ac..3c7cf60db7a 100644
--- a/mysql-test/r/metadata.result
+++ b/mysql-test/r/metadata.result
@@ -3,7 +3,7 @@ select 1, 1.0, -1, "hello", NULL;
Catalog Database Table Table_alias Column Column_alias Name Type Length Max length Is_null Flags Decimals Charsetnr
def 1 8 1 1 N 32769 0 8
def 1.0 5 3 3 N 32769 1 8
-def -1 8 1 2 N 32769 0 8
+def -1 8 2 2 N 32769 0 8
def hello 254 5 5 N 1 31 8
def NULL 6 0 0 Y 32896 0 63
1 1.0 -1 hello NULL
diff --git a/mysql-test/t/func_concat.test b/mysql-test/t/func_concat.test
index 0cf1502b10e..78818cdda4e 100644
--- a/mysql-test/t/func_concat.test
+++ b/mysql-test/t/func_concat.test
@@ -34,3 +34,19 @@ create table t1 (a char(4), b double, c date, d tinyint(4));
insert into t1 values ('AAAA', 105, '2003-03-01', 1);
select * from t1 where concat(A,C,B,D) = 'AAAA2003-03-011051';
drop table t1;
+
+# BUG#6825
+select 'a' union select concat('a', -4);
+select 'a' union select concat('a', -4.5);
+
+select 'a' union select concat('a', -(4 + 1));
+select 'a' union select concat('a', 4 - 5);
+
+select 'a' union select concat('a', -'3');
+select 'a' union select concat('a', -concat('3',4));
+
+select 'a' union select concat('a', -0);
+select 'a' union select concat('a', -0.0);
+
+select 'a' union select concat('a', -0.0000);
+
diff --git a/mysql-test/t/merge.test b/mysql-test/t/merge.test
index 9580c1ab44c..b628cb07f7b 100644
--- a/mysql-test/t/merge.test
+++ b/mysql-test/t/merge.test
@@ -285,3 +285,21 @@ create table t3 engine=merge union=(t1, t2) select * from t1;
--error 1093
create table t3 engine=merge union=(t1, t2) select * from t2;
drop table t1, t2;
+
+# BUG#6699 : no sorting on 'ref' retrieval
+create table t1 (a int,b int,c int, index (a,b,c));
+create table t2 (a int,b int,c int, index (a,b,c));
+create table t3 (a int,b int,c int, index (a,b,c))
+ engine=merge union=(t1 ,t2);
+insert into t1 (a,b,c) values (1,1,0),(1,2,0);
+insert into t2 (a,b,c) values (1,1,1),(1,2,1);
+
+explain select a,b,c from t3 force index (a) where a=1 order by a,b,c;
+select a,b,c from t3 force index (a) where a=1 order by a,b,c;
+
+# this actually wasn't affected:
+explain select a,b,c from t3 force index (a) where a=1 order by a desc, b desc, c desc;
+select a,b,c from t3 force index (a) where a=1 order by a desc, b desc, c desc;
+
+drop table t1, t2, t3;
+
diff --git a/ndb/include/mgmapi/mgmapi_config_parameters.h b/ndb/include/mgmapi/mgmapi_config_parameters.h
index 6a0cd376355..406bdb1a110 100644
--- a/ndb/include/mgmapi/mgmapi_config_parameters.h
+++ b/ndb/include/mgmapi/mgmapi_config_parameters.h
@@ -110,6 +110,7 @@
#define CFG_CONNECTION_SERVER_PORT 406
#define CFG_CONNECTION_HOSTNAME_1 407
#define CFG_CONNECTION_HOSTNAME_2 408
+#define CFG_CONNECTION_GROUP 409
#define CFG_TCP_SERVER 452
#define CFG_TCP_SEND_BUFFER_SIZE 454
diff --git a/ndb/include/ndbapi/Ndb.hpp b/ndb/include/ndbapi/Ndb.hpp
index 1c9c2db5d6b..766409d64e2 100644
--- a/ndb/include/ndbapi/Ndb.hpp
+++ b/ndb/include/ndbapi/Ndb.hpp
@@ -901,23 +901,6 @@ typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*);
NDB_MAX_SCHEMA_NAME_SIZE + \
NDB_MAX_TAB_NAME_SIZE*2
-#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
-class NdbWaiter {
-public:
- NdbWaiter();
- ~NdbWaiter();
-
- void wait(int waitTime);
- void nodeFail(Uint32 node);
- void signal(Uint32 state);
-
- Uint32 m_node;
- Uint32 m_state;
- void * m_mutex;
- struct NdbCondition * m_condition;
-};
-#endif
-
/**
* @class Ndb
* @brief Represents the NDB kernel and is the main class of the NDB API.
@@ -1199,39 +1182,6 @@ public:
const char * keyData = 0,
Uint32 keyLen = 0);
-#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
- /**
- * This method is a modification of Ndb::startTransaction,
- * in which we use only the first two chars of keyData to
- * select transaction coordinator.
- * This is referred to as a distribution group.
- * There are two ways to use the method:
- * - In the first, the two characters are used directly as
- * the distribution key, and
- * - in the second the distribution is calculated as:
- * (10 * (char[0] - 0x30) + (char[1] - 0x30)).
- * Thus, in the second way, the two ASCII digits '78'
- * will provide the distribution key = 78.
- *
- * @note Transaction priorities are not yet supported.
- *
- * @param aPrio Priority of the transaction.
- * Priority 0 is the highest priority and is used for short transactions
- * with requirements on low delay.
- * Priority 1 is a medium priority for short transactions.
- * Priority 2 is a medium priority for long transactions.
- * Priority 3 is a low priority for long transactions.
- * @param keyData is a string of which the two first characters
- * is used to compute which fragement the data is stored in.
- * @param type is the type of distribution group.
- * 0 means direct usage of the two characters, and
- * 1 means the ASCII digit variant.
- * @return NdbConnection, or NULL if it failed.
- */
- NdbConnection* startTransactionDGroup(Uint32 aPrio,
- const char * keyData, int type);
-#endif
-
/**
* When a transactions is completed, the transaction has to be closed.
*
@@ -1586,8 +1536,6 @@ private:
/******************************************************************************
* These are the private variables in this class.
*****************************************************************************/
- Ndb_cluster_connection *m_ndb_cluster_connection;
-
NdbConnection** thePreparedTransactionsArray;
NdbConnection** theSentTransactionsArray;
NdbConnection** theCompletedTransactionsArray;
@@ -1601,8 +1549,6 @@ private:
Uint32 theNextConnectNode;
- NdbWaiter theWaiter;
-
bool fullyQualifiedNames;
// Ndb database name.
@@ -1658,35 +1604,6 @@ private:
InitConfigError
} theInitState;
- /**
- * Computes fragement id for primary key
- *
- * Note that keydata has to be "shaped" as it is being sent in KEYINFO
- */
- Uint32 computeFragmentId(const char * keyData, Uint32 keyLen);
- Uint32 getFragmentId(Uint32 hashValue);
-
- /**
- * Make a guess to which node is the primary for the fragment
- */
- Uint32 guessPrimaryNode(Uint32 fragmentId);
-
- /**
- * Structure containing values for guessing primary node
- */
- struct StartTransactionNodeSelectionData {
- StartTransactionNodeSelectionData():
- fragment2PrimaryNodeMap(0) {};
- Uint32 kValue;
- Uint32 hashValueMask;
- Uint32 hashpointerValue;
- Uint32 noOfFragments;
- Uint32 * fragment2PrimaryNodeMap;
-
- void init(Uint32 noOfNodes, Uint8 nodeIds[]);
- void release();
- } startTransactionNodeSelectionData;
-
NdbApiSignal* theCommitAckSignal;
diff --git a/ndb/include/ndbapi/ndb_cluster_connection.hpp b/ndb/include/ndbapi/ndb_cluster_connection.hpp
index db1cd0b119e..1b1c8575656 100644
--- a/ndb/include/ndbapi/ndb_cluster_connection.hpp
+++ b/ndb/include/ndbapi/ndb_cluster_connection.hpp
@@ -18,13 +18,7 @@
#ifndef CLUSTER_CONNECTION_HPP
#define CLUSTER_CONNECTION_HPP
-class TransporterFacade;
-class ConfigRetriever;
-struct NdbThread;
-
-extern "C" {
- void* run_ndb_cluster_connection_connect_thread(void*);
-}
+struct Ndb_cluster_connection_node_iter;
class Ndb_cluster_connection {
public:
@@ -32,16 +26,27 @@ public:
~Ndb_cluster_connection();
int connect(int no_retries, int retry_delay_in_seconds, int verbose);
int start_connect_thread(int (*connect_callback)(void)= 0);
+
+ // add check coupled to init state of cluster connection
+ // timeout_after_first_alive negative - ok only if all alive
+ // timeout_after_first_alive positive - ok if some alive
+ int wait_until_ready(int timeout_for_first_alive,
+ int timeout_after_first_alive);
+
const char *get_connectstring(char *buf, int buf_sz) const;
int get_connected_port() const;
const char *get_connected_host() const;
+
+ void set_optimized_node_selection(int val);
+
+ Uint32 no_db_nodes();
+
private:
- friend void* run_ndb_cluster_connection_connect_thread(void*);
- void connect_thread();
- TransporterFacade *m_facade;
- ConfigRetriever *m_config_retriever;
- NdbThread *m_connect_thread;
- int (*m_connect_callback)(void);
+ friend class Ndb;
+ friend class NdbImpl;
+ friend class Ndb_cluster_connection_impl;
+ class Ndb_cluster_connection_impl & m_impl;
+ Ndb_cluster_connection(Ndb_cluster_connection_impl&);
};
#endif
diff --git a/ndb/include/util/ndb_opts.h b/ndb/include/util/ndb_opts.h
index f7ae3b5489e..4bac36f5e5e 100644
--- a/ndb/include/util/ndb_opts.h
+++ b/ndb/include/util/ndb_opts.h
@@ -17,47 +17,62 @@
#ifndef _NDB_OPTS_H
#define _NDB_OPTS_H
+#include
#include
#include
#include
#include
+#define NDB_STD_OPTS_VARS \
+const char *opt_connect_str= 0;\
+my_bool opt_ndb_shm;\
+my_bool opt_ndb_optimized_node_selection
+
+#define NDB_STD_OPTS_OPTIONS \
+OPT_NDB_SHM= 256,\
+OPT_NDB_OPTIMIZED_NODE_SELECTION
+
+#define OPT_NDB_CONNECTSTRING 'c'
+
+#ifdef NDB_SHM_TRANSPORTER
+#define OPT_NDB_SHM_DEFAULT 1
+#else
+#define OPT_NDB_SHM_DEFAULT 0
+#endif
+
+#define NDB_STD_OPTS_COMMON \
+ { "usage", '?', "Display this help and exit.", \
+ 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
+ { "help", '?', "Display this help and exit.", \
+ 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
+ { "version", 'V', "Output version information and exit.", 0, 0, 0, \
+ GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
+ { "ndb-connectstring", OPT_NDB_CONNECTSTRING, \
+ "Set connect string for connecting to ndb_mgmd. " \
+ "Syntax: \"[nodeid=;][host=][:]\". " \
+ "Overides specifying entries in NDB_CONNECTSTRING and Ndb.cfg", \
+ (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \
+ GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },\
+ { "ndb-shm", OPT_NDB_SHM,\
+ "Allow optimizing using shared memory connections when available",\
+ (gptr*) &opt_ndb_shm, (gptr*) &opt_ndb_shm, 0,\
+ GET_BOOL, NO_ARG, OPT_NDB_SHM_DEFAULT, 0, 0, 0, 0, 0 },\
+ {"ndb-optimized-node-selection", OPT_NDB_OPTIMIZED_NODE_SELECTION,\
+ "Select nodes for transactions in a more optimal way",\
+ (gptr*) &opt_ndb_optimized_node_selection,\
+ (gptr*) &opt_ndb_optimized_node_selection, 0,\
+ GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},\
+ { "connect-string", OPT_NDB_CONNECTSTRING, "same as --ndb-connectstring",\
+ (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0,\
+ GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }
+
#ifndef DBUG_OFF
#define NDB_STD_OPTS(prog_name) \
{ "debug", '#', "Output debug log. Often this is 'd:t:o,filename'.", \
0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0 }, \
- { "usage", '?', "Display this help and exit.", \
- 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
- { "help", '?', "Display this help and exit.", \
- 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
- { "version", 'V', "Output version information and exit.", 0, 0, 0, \
- GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
- { "ndb-connectstring", 'c', \
- "Set connect string for connecting to ndb_mgmd. " \
- "Syntax: \"[nodeid=;][host=][:]\". " \
- "Overides specifying entries in NDB_CONNECTSTRING and Ndb.cfg", \
- (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \
- GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },\
- { "connect-string", 'c', "same as --ndb-connectstring",\
- (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \
- GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }
+ NDB_STD_OPTS_COMMON
#else
-#define NDB_STD_OPTS(prog_name) \
- { "usage", '?', "Display this help and exit.", \
- 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
- { "help", '?', "Display this help and exit.", \
- 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
- { "version", 'V', "Output version information and exit.", 0, 0, 0, \
- GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
- { "ndb-connectstring", 'c', \
- "Set connect string for connecting to ndb_mgmd. " \
- "Syntax: \"[nodeid=;][host=][:]\". " \
- "Overides specifying entries in NDB_CONNECTSTRING and Ndb.cfg", \
- (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \
- GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },\
- { "connect-string", 'c', "same as --ndb-connectstring",\
- (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0,\
- GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }
+#define NDB_STD_OPTS(prog_name) NDB_STD_OPTS_COMMON
#endif
#endif /*_NDB_OPTS_H */
diff --git a/ndb/src/kernel/vm/Configuration.cpp b/ndb/src/kernel/vm/Configuration.cpp
index 3de84bb0566..29255fc9837 100644
--- a/ndb/src/kernel/vm/Configuration.cpp
+++ b/ndb/src/kernel/vm/Configuration.cpp
@@ -46,7 +46,13 @@ extern "C" {
#include
extern EventLogger g_eventLogger;
-static const char* opt_connect_str= 0;
+enum ndbd_options {
+ NDB_STD_OPTS_OPTIONS,
+ OPT_INITIAL,
+ OPT_NODAEMON
+};
+
+NDB_STD_OPTS_VARS;
static int _daemon, _no_daemon, _initial, _no_start;
/**
* Arguments to NDB process
@@ -54,7 +60,7 @@ static int _daemon, _no_daemon, _initial, _no_start;
static struct my_option my_long_options[] =
{
NDB_STD_OPTS("ndbd"),
- { "initial", 256,
+ { "initial", OPT_INITIAL,
"Perform initial start of ndbd, including cleaning the file system. "
"Consult documentation before using this",
(gptr*) &_initial, (gptr*) &_initial, 0,
@@ -66,7 +72,7 @@ static struct my_option my_long_options[] =
{ "daemon", 'd', "Start ndbd as daemon (default)",
(gptr*) &_daemon, (gptr*) &_daemon, 0,
GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 },
- { "nodaemon", 257,
+ { "nodaemon", OPT_NODAEMON,
"Do not start ndbd as daemon, provided for testing purposes",
(gptr*) &_no_daemon, (gptr*) &_no_daemon, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
diff --git a/ndb/src/mgmclient/main.cpp b/ndb/src/mgmclient/main.cpp
index 84e27790705..9417c03805f 100644
--- a/ndb/src/mgmclient/main.cpp
+++ b/ndb/src/mgmclient/main.cpp
@@ -56,9 +56,13 @@ handler(int sig){
}
}
+enum ndb_mgm_options {
+ NDB_STD_OPTS_OPTIONS
+};
+NDB_STD_OPTS_VARS;
+
static const char default_prompt[]= "ndb_mgm> ";
static unsigned _try_reconnect;
-static char *opt_connect_str= 0;
static const char *prompt= default_prompt;
static char *opt_execute_str= 0;
diff --git a/ndb/src/mgmsrv/ConfigInfo.cpp b/ndb/src/mgmsrv/ConfigInfo.cpp
index ab2e34f6d3a..800ffe2e361 100644
--- a/ndb/src/mgmsrv/ConfigInfo.cpp
+++ b/ndb/src/mgmsrv/ConfigInfo.cpp
@@ -23,6 +23,8 @@
#include "InitConfigFileParser.hpp"
#include
+extern my_bool opt_ndb_shm;
+
#define MAX_LINE_LENGTH 255
#define KEY_INTERNAL 0
#define MAX_INT_RNIL 0xfffffeff
@@ -79,6 +81,7 @@ static bool transformSystem(InitConfigFileParser::Context & ctx, const char *);
static bool transformExternalSystem(InitConfigFileParser::Context & ctx, const char *);
static bool transformNode(InitConfigFileParser::Context & ctx, const char *);
static bool transformExtNode(InitConfigFileParser::Context & ctx, const char *);
+static bool checkConnectionSupport(InitConfigFileParser::Context & ctx, const char *);
static bool transformConnection(InitConfigFileParser::Context & ctx, const char *);
static bool applyDefaultValues(InitConfigFileParser::Context & ctx, const char *);
static bool checkMandatory(InitConfigFileParser::Context & ctx, const char *);
@@ -108,6 +111,11 @@ ConfigInfo::m_SectionRules[] = {
{ "REP", transformNode, 0 },
{ "EXTERNAL REP", transformExtNode, 0 },
+ { "TCP", checkConnectionSupport, 0 },
+ { "SHM", checkConnectionSupport, 0 },
+ { "SCI", checkConnectionSupport, 0 },
+ { "OSE", checkConnectionSupport, 0 },
+
{ "TCP", transformConnection, 0 },
{ "SHM", transformConnection, 0 },
{ "SCI", transformConnection, 0 },
@@ -130,6 +138,8 @@ ConfigInfo::m_SectionRules[] = {
{ "TCP", fixHostname, "HostName1" },
{ "TCP", fixHostname, "HostName2" },
+ { "SHM", fixHostname, "HostName1" },
+ { "SHM", fixHostname, "HostName2" },
{ "SCI", fixHostname, "HostName1" },
{ "SCI", fixHostname, "HostName2" },
{ "SHM", fixHostname, "HostName1" },
@@ -197,6 +207,9 @@ static bool sanity_checks(Vector§ions,
static bool add_node_connections(Vector§ions,
struct InitConfigFileParser::Context &ctx,
const char * rule_data);
+static bool set_connection_priorities(Vector§ions,
+ struct InitConfigFileParser::Context &ctx,
+ const char * rule_data);
static bool add_server_ports(Vector§ions,
struct InitConfigFileParser::Context &ctx,
const char * rule_data);
@@ -208,6 +221,7 @@ const ConfigInfo::ConfigRule
ConfigInfo::m_ConfigRules[] = {
{ sanity_checks, 0 },
{ add_node_connections, 0 },
+ { set_connection_priorities, 0 },
{ add_server_ports, 0 },
{ check_node_vs_replicas, 0 },
{ 0, 0 }
@@ -1582,6 +1596,17 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
MANDATORY,
0, 0 },
+ {
+ CFG_CONNECTION_GROUP,
+ "Group",
+ "TCP",
+ "",
+ ConfigInfo::CI_USED,
+ false,
+ ConfigInfo::CI_INT,
+ "55",
+ "0", "200" },
+
{
CFG_CONNECTION_SEND_SIGNAL_ID,
"SendSignalId",
@@ -1747,6 +1772,17 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
MANDATORY,
0, 0 },
+ {
+ CFG_CONNECTION_GROUP,
+ "Group",
+ "SHM",
+ "",
+ ConfigInfo::CI_USED,
+ false,
+ ConfigInfo::CI_INT,
+ "35",
+ "0", "200" },
+
{
CFG_CONNECTION_SEND_SIGNAL_ID,
"SendSignalId",
@@ -1780,7 +1816,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::CI_USED,
false,
ConfigInfo::CI_INT,
- MANDATORY,
+ "0",
"0",
STR_VALUE(MAX_INT_RNIL) },
@@ -1857,6 +1893,17 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
"0",
STR_VALUE(MAX_INT_RNIL) },
+ {
+ CFG_CONNECTION_GROUP,
+ "Group",
+ "SCI",
+ "",
+ ConfigInfo::CI_USED,
+ false,
+ ConfigInfo::CI_INT,
+ "15",
+ "0", "200" },
+
{
CFG_CONNECTION_HOSTNAME_1,
"HostName1",
@@ -2680,12 +2727,51 @@ transformExtNode(InitConfigFileParser::Context & ctx, const char * data){
return true;
}
+/**
+ * Connection rule: Check support of connection
+ */
+bool
+checkConnectionSupport(InitConfigFileParser::Context & ctx, const char * data)
+{
+ int error= 0;
+ if (strcasecmp("TCP",ctx.fname) == 0)
+ {
+ // always enabled
+ }
+ else if (strcasecmp("SHM",ctx.fname) == 0)
+ {
+#ifndef NDB_SHM_TRANSPORTER
+ error= 1;
+#endif
+ }
+ else if (strcasecmp("SCI",ctx.fname) == 0)
+ {
+#ifndef NDB_SCI_TRANSPORTER
+ error= 1;
+#endif
+ }
+ else if (strcasecmp("OSE",ctx.fname) == 0)
+ {
+#ifndef NDB_OSE_TRANSPORTER
+ error= 1;
+#endif
+ }
+ if (error)
+ {
+ ctx.reportError("Binary not compiled with this connection support, "
+ "[%s] starting at line: %d",
+ ctx.fname, ctx.m_sectionLineno);
+ return false;
+ }
+ return true;
+}
+
/**
* Connection rule: Update "NoOfConnections"
*/
bool
-transformConnection(InitConfigFileParser::Context & ctx, const char * data){
-
+transformConnection(InitConfigFileParser::Context & ctx, const char * data)
+{
Uint32 connections = 0;
ctx.m_userProperties.get("NoOfConnections", &connections);
BaseString::snprintf(ctx.pname, sizeof(ctx.pname), "Connection_%d", connections);
@@ -3398,11 +3484,51 @@ sanity_checks(Vector§ions,
return true;
}
+static void
+add_a_connection(Vector§ions,
+ struct InitConfigFileParser::Context &ctx,
+ Uint32 nodeId1, Uint32 nodeId2, bool use_shm)
+{
+ ConfigInfo::ConfigRuleSection s;
+ const char *hostname1= 0, *hostname2= 0;
+ const Properties *tmp;
+
+ require(ctx.m_config->get("Node", nodeId1, &tmp));
+ tmp->get("HostName", &hostname1);
+
+ require(ctx.m_config->get("Node", nodeId2, &tmp));
+ tmp->get("HostName", &hostname2);
+
+ char buf[16];
+ s.m_sectionData= new Properties(true);
+ BaseString::snprintf(buf, sizeof(buf), "%u", nodeId1);
+ s.m_sectionData->put("NodeId1", buf);
+ BaseString::snprintf(buf, sizeof(buf), "%u", nodeId2);
+ s.m_sectionData->put("NodeId2", buf);
+
+ if (use_shm &&
+ hostname1 && hostname1[0] &&
+ hostname2 && hostname2[0] &&
+ strcmp(hostname1,hostname2) == 0)
+ {
+ s.m_sectionType= BaseString("SHM");
+ DBUG_PRINT("info",("adding SHM connection %d %d",nodeId1,nodeId2));
+ }
+ else
+ {
+ s.m_sectionType= BaseString("TCP");
+ DBUG_PRINT("info",("adding TCP connection %d %d",nodeId1,nodeId2));
+ }
+
+ sections.push_back(s);
+}
+
static bool
add_node_connections(Vector§ions,
struct InitConfigFileParser::Context &ctx,
const char * rule_data)
{
+ DBUG_ENTER("add_node_connections");
Uint32 i;
Properties * props= ctx.m_config;
Properties p_connections(true);
@@ -3427,9 +3553,10 @@ add_node_connections(Vector§ions,
ctx.m_userProperties.get("NoOfNodes", &nNodes);
Properties p_db_nodes(true);
- Properties p_api_mgm_nodes(true);
+ Properties p_api_nodes(true);
+ Properties p_mgm_nodes(true);
- Uint32 i_db= 0, i_api_mgm= 0, n;
+ Uint32 i_db= 0, i_api= 0, i_mgm= 0, n;
for (i= 0, n= 0; n < nNodes; i++){
const Properties * tmp;
if(!props->get("Node", i, &tmp)) continue;
@@ -3440,9 +3567,10 @@ add_node_connections(Vector§ions,
if (strcmp(type,DB_TOKEN) == 0)
p_db_nodes.put("", i_db++, i);
- else if (strcmp(type,API_TOKEN) == 0 ||
- strcmp(type,MGM_TOKEN) == 0)
- p_api_mgm_nodes.put("", i_api_mgm++, i);
+ else if (strcmp(type,API_TOKEN) == 0)
+ p_api_nodes.put("", i_api++, i);
+ else if (strcmp(type,MGM_TOKEN) == 0)
+ p_mgm_nodes.put("", i_mgm++, i);
}
Uint32 nodeId1, nodeId2, dummy;
@@ -3451,39 +3579,39 @@ add_node_connections(Vector§ions,
for (Uint32 j= i+1;; j++){
if(!p_db_nodes.get("", j, &nodeId2)) break;
if(!p_connections2.get("", nodeId1+nodeId2<<16, &dummy)) {
- ConfigInfo::ConfigRuleSection s;
- s.m_sectionType= BaseString("TCP");
- s.m_sectionData= new Properties(true);
- char buf[16];
- BaseString::snprintf(buf, sizeof(buf), "%u", nodeId1);
- s.m_sectionData->put("NodeId1", buf);
- BaseString::snprintf(buf, sizeof(buf), "%u", nodeId2);
- s.m_sectionData->put("NodeId2", buf);
- sections.push_back(s);
+ add_a_connection(sections,ctx,nodeId1,nodeId2,opt_ndb_shm);
}
}
}
- for (i= 0; p_api_mgm_nodes.get("", i, &nodeId1); i++){
+ for (i= 0; p_api_nodes.get("", i, &nodeId1); i++){
if(!p_connections.get("", nodeId1, &dummy)) {
for (Uint32 j= 0;; j++){
if(!p_db_nodes.get("", j, &nodeId2)) break;
- ConfigInfo::ConfigRuleSection s;
- s.m_sectionType= BaseString("TCP");
- s.m_sectionData= new Properties(true);
- char buf[16];
- BaseString::snprintf(buf, sizeof(buf), "%u", nodeId1);
- s.m_sectionData->put("NodeId1", buf);
- BaseString::snprintf(buf, sizeof(buf), "%u", nodeId2);
- s.m_sectionData->put("NodeId2", buf);
- sections.push_back(s);
+ add_a_connection(sections,ctx,nodeId1,nodeId2,opt_ndb_shm);
}
}
}
- return true;
+ for (i= 0; p_mgm_nodes.get("", i, &nodeId1); i++){
+ if(!p_connections.get("", nodeId1, &dummy)) {
+ for (Uint32 j= 0;; j++){
+ if(!p_db_nodes.get("", j, &nodeId2)) break;
+ add_a_connection(sections,ctx,nodeId1,nodeId2,0);
+ }
+ }
+ }
+
+ DBUG_RETURN(true);
}
+static bool set_connection_priorities(Vector§ions,
+ struct InitConfigFileParser::Context &ctx,
+ const char * rule_data)
+{
+ DBUG_ENTER("set_connection_priorities");
+ DBUG_RETURN(true);
+}
static bool add_server_ports(Vector§ions,
struct InitConfigFileParser::Context &ctx,
diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp
index 713dff912bb..3fcde997cb0 100644
--- a/ndb/src/mgmsrv/MgmtSrvr.cpp
+++ b/ndb/src/mgmsrv/MgmtSrvr.cpp
@@ -2225,9 +2225,24 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
if (*nodeId != 0 ||
type != NDB_MGM_NODE_TYPE_MGM ||
no_mgm == 1) { // any match is ok
+
+ if (config_hostname == 0 &&
+ *nodeId == 0 &&
+ type != NDB_MGM_NODE_TYPE_MGM)
+ {
+ if (!id_found) // only set if not set earlier
+ id_found= tmp;
+ continue; /* continue looking for a nodeid with specified
+ * hostname
+ */
+ }
+ assert(id_found == 0);
id_found= tmp;
break;
}
+ assert(no_mgm > 1);
+ assert(*nodeId != 0);
+ assert(type != NDB_MGM_NODE_TYPE_MGM);
if (id_found) { // mgmt server may only have one match
error_string.appfmt("Ambiguous node id's %d and %d.\n"
"Suggest specifying node id in connectstring,\n"
diff --git a/ndb/src/mgmsrv/main.cpp b/ndb/src/mgmsrv/main.cpp
index 992e827ceaa..04c95117214 100644
--- a/ndb/src/mgmsrv/main.cpp
+++ b/ndb/src/mgmsrv/main.cpp
@@ -89,50 +89,50 @@ bool g_StopServer;
extern EventLogger g_EventLogger;
extern int global_mgmt_server_check;
-static char *opt_connect_str= 0;
+
+enum ndb_mgmd_options {
+ NDB_STD_OPTS_OPTIONS,
+ OPT_INTERACTIVE,
+ OPT_NO_NODEID_CHECKS,
+ OPT_NO_DAEMON
+};
+NDB_STD_OPTS_VARS;
+
+#if NDB_VERSION_MAJOR <= 4
+#undef OPT_NDB_CONNECTSTRING
+#define OPT_NDB_CONNECTSTRING 1023
+#else
+
+#endif
static struct my_option my_long_options[] =
{
-#ifndef DBUG_OFF
- { "debug", '#', "Output debug log. Often this is 'd:t:o,filename'.",
- 0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0 },
-#endif
- { "usage", '?', "Display this help and exit.",
- 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
- { "help", '?', "Display this help and exit.",
- 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
- { "version", 'V', "Output version information and exit.", 0, 0, 0,
- GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
- { "ndb-connectstring", 1023,
- "Set connect string for connecting to ndb_mgmd. "
- "Syntax: \"[nodeid=;][host=][:]\". "
- "Overides specifying entries in NDB_CONNECTSTRING and Ndb.cfg",
- (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0,
- GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
- { "connect-string", 1023,
- "same as --ndb-connectstring.",
- (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0,
- GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
+ NDB_STD_OPTS("ndb_mgmd"),
{ "config-file", 'f', "Specify cluster configuration file",
(gptr*) &glob.config_filename, (gptr*) &glob.config_filename, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "daemon", 'd', "Run ndb_mgmd in daemon mode (default)",
(gptr*) &glob.daemon, (gptr*) &glob.daemon, 0,
GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 },
- { "interactive", 256, "Run interactive. Not supported but provided for testing purposes",
+ { "interactive", OPT_INTERACTIVE,
+ "Run interactive. Not supported but provided for testing purposes",
(gptr*) &glob.interactive, (gptr*) &glob.interactive, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
- { "no-nodeid-checks", 257, "Do not provide any node id checks",
+ { "no-nodeid-checks", OPT_NO_NODEID_CHECKS,
+ "Do not provide any node id checks",
(gptr*) &g_no_nodeid_checks, (gptr*) &g_no_nodeid_checks, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
- { "nodaemon", 258, "Don't run as daemon, but don't read from stdin",
+ { "nodaemon", OPT_NO_DAEMON,
+ "Don't run as daemon, but don't read from stdin",
(gptr*) &glob.non_interactive, (gptr*) &glob.non_interactive, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+#if NDB_VERSION_MAJOR <= 4
{ "config-file", 'c',
"-c provided for backwards compatability, will be removed in 5.0."
" Use -f instead",
(gptr*) &glob.config_filename, (gptr*) &glob.config_filename, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
+#endif
{ 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
};
static void short_usage_sub(void)
@@ -164,6 +164,14 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
case 'c':
printf("Warning: -c will be removed in 5.0, use -f instead\n");
break;
+ case OPT_NDB_SHM:
+#ifndef NDB_SHM_TRANSPORTER
+ printf("Warning: binary not compiled with shared memory support,\n"
+ "use configure option --with-ndb-shm to enable support.\n"
+ "Tcp connections will now be used instead\n");
+ opt_ndb_shm= 0;
+#endif
+ break;
case '?':
usage();
exit(0);
diff --git a/ndb/src/ndbapi/DictCache.hpp b/ndb/src/ndbapi/DictCache.hpp
index a517acee56b..58c08a93e61 100644
--- a/ndb/src/ndbapi/DictCache.hpp
+++ b/ndb/src/ndbapi/DictCache.hpp
@@ -25,6 +25,7 @@
#include
#include
#include
+#include
#include "NdbLinHash.hpp"
class Ndb_local_table_info {
diff --git a/ndb/src/ndbapi/Ndb.cpp b/ndb/src/ndbapi/Ndb.cpp
index ca4592fb5eb..e9a125922c6 100644
--- a/ndb/src/ndbapi/Ndb.cpp
+++ b/ndb/src/ndbapi/Ndb.cpp
@@ -46,7 +46,6 @@ Connect to any node which has no connection at the moment.
NdbConnection* Ndb::doConnect(Uint32 tConNode)
{
Uint32 tNode;
- Uint32 i = 0;;
Uint32 tAnyAlive = 0;
int TretCode;
@@ -65,26 +64,51 @@ NdbConnection* Ndb::doConnect(Uint32 tConNode)
// We will connect to any node. Make sure that we have connections to all
// nodes.
//****************************************************************************
- Uint32 tNoOfDbNodes= theImpl->theNoOfDBnodes;
- Uint32 &theCurrentConnectIndex= theImpl->theCurrentConnectIndex;
- UintR Tcount = 0;
- do {
- theCurrentConnectIndex++;
- if (theCurrentConnectIndex >= tNoOfDbNodes) {
- theCurrentConnectIndex = 0;
- }//if
- Tcount++;
- tNode = theImpl->theDBnodes[theCurrentConnectIndex];
- TretCode = NDB_connect(tNode);
- if ((TretCode == 1) || (TretCode == 2)) {
+ if (theImpl->m_optimized_node_selection)
+ {
+ Ndb_cluster_connection_node_iter &node_iter=
+ theImpl->m_node_iter;
+ theImpl->m_ndb_cluster_connection.init_get_next_node(node_iter);
+ while ((tNode= theImpl->m_ndb_cluster_connection.get_next_node(node_iter)))
+ {
+ TretCode= NDB_connect(tNode);
+ if ((TretCode == 1) ||
+ (TretCode == 2))
+ {
//****************************************************************************
// We have connections now to the desired node. Return
//****************************************************************************
- return getConnectedNdbConnection(tNode);
- } else if (TretCode != 0) {
- tAnyAlive = 1;
- }//if
- } while (Tcount < tNoOfDbNodes);
+ return getConnectedNdbConnection(tNode);
+ } else if (TretCode != 0) {
+ tAnyAlive= 1;
+ }//if
+ }
+ }
+ else // just do a regular round robin
+ {
+ Uint32 tNoOfDbNodes= theImpl->theNoOfDBnodes;
+ Uint32 &theCurrentConnectIndex= theImpl->theCurrentConnectIndex;
+ UintR Tcount = 0;
+ do {
+ theCurrentConnectIndex++;
+ if (theCurrentConnectIndex >= tNoOfDbNodes)
+ theCurrentConnectIndex = 0;
+
+ Tcount++;
+ tNode= theImpl->theDBnodes[theCurrentConnectIndex];
+ TretCode= NDB_connect(tNode);
+ if ((TretCode == 1) ||
+ (TretCode == 2))
+ {
+//****************************************************************************
+// We have connections now to the desired node. Return
+//****************************************************************************
+ return getConnectedNdbConnection(tNode);
+ } else if (TretCode != 0) {
+ tAnyAlive= 1;
+ }//if
+ } while (Tcount < tNoOfDbNodes);
+ }
//****************************************************************************
// We were unable to find a free connection. If no node alive we will report
// error code for cluster failure otherwise connection failure.
@@ -149,8 +173,8 @@ Ndb::NDB_connect(Uint32 tNode)
tReturnCode = tp->sendSignal(tSignal, tNode);
releaseSignal(tSignal);
if (tReturnCode != -1) {
- theWaiter.m_node = tNode;
- theWaiter.m_state = WAIT_TC_SEIZE;
+ theImpl->theWaiter.m_node = tNode;
+ theImpl->theWaiter.m_state = WAIT_TC_SEIZE;
tReturnCode = receiveResponse();
}//if
} else {
@@ -243,50 +267,28 @@ Ndb::waitUntilReady(int timeout)
DBUG_RETURN(-1);
}
- do {
- if ((id = theNode) != 0) {
- unsigned int foundAliveNode = 0;
- TransporterFacade *tp = TransporterFacade::instance();
- tp->lock_mutex();
- for (unsigned int i = 0; i < theImpl->theNoOfDBnodes; i++) {
- const NodeId nodeId = theImpl->theDBnodes[i];
- //************************************************
- // If any node is answering, ndb is answering
- //************************************************
- if (tp->get_node_alive(nodeId) != 0) {
- foundAliveNode++;
- }//if
- }//for
-
- tp->unlock_mutex();
- if (foundAliveNode == theImpl->theNoOfDBnodes) {
- DBUG_RETURN(0);
- }//if
- if (foundAliveNode > 0) {
- noChecksSinceFirstAliveFound++;
- }//if
- if (noChecksSinceFirstAliveFound > 30) {
- DBUG_RETURN(0);
- }//if
- }//if theNode != 0
+ while (theNode == 0) {
if (secondsCounter >= timeout)
- break;
+ {
+ theError.code = 4269;
+ DBUG_RETURN(-1);
+ }
NdbSleep_MilliSleep(100);
milliCounter += 100;
if (milliCounter >= 1000) {
secondsCounter++;
milliCounter = 0;
}//if
- } while (1);
- if (id == 0) {
- theError.code = 4269;
+ }
+
+ if (theImpl->m_ndb_cluster_connection.wait_until_ready
+ (timeout-secondsCounter,30))
+ {
+ theError.code = 4009;
DBUG_RETURN(-1);
}
- if (noChecksSinceFirstAliveFound > 0) {
- DBUG_RETURN(0);
- }//if
- theError.code = 4009;
- DBUG_RETURN(-1);
+
+ DBUG_RETURN(0);
}
/*****************************************************************************
@@ -311,8 +313,8 @@ Ndb::startTransaction(Uint32 aPriority, const char * keyData, Uint32 keyLen)
*/
Uint32 nodeId;
if(keyData != 0) {
- Uint32 fragmentId = computeFragmentId(keyData, keyLen);
- nodeId = guessPrimaryNode(fragmentId);
+ nodeId = 0; // guess not supported
+ // nodeId = m_ndb_cluster_connection->guess_primary_node(keyData, keyLen);
} else {
nodeId = 0;
}//if
@@ -373,44 +375,6 @@ Ndb::hupp(NdbConnection* pBuddyTrans)
}//if
}//Ndb::hupp()
-NdbConnection*
-Ndb::startTransactionDGroup(Uint32 aPriority, const char * keyData, int type)
-{
-
- char DGroup[4];
- if ((keyData == NULL) ||
- (type > 1)) {
- theError.code = 4118;
- return NULL;
- }//if
- if (theInitState == Initialised) {
- theError.code = 0;
- checkFailedNode();
- /**
- * If the user supplied key data
- * We will make a qualified quess to which node is the primary for the
- * the fragment and contact that node
- */
- Uint32 fragmentId;
- if (type == 0) {
- DGroup[0] = keyData[0];
- DGroup[1] = keyData[1];
- DGroup[2] = 0x30;
- DGroup[3] = 0x30;
- fragmentId = computeFragmentId(&DGroup[0], 4);
- } else {
- Uint32 hashValue = ((keyData[0] - 0x30) * 10) + (keyData[1] - 0x30);
- fragmentId = getFragmentId(hashValue);
- }//if
- Uint32 nodeId = guessPrimaryNode(fragmentId);
- NdbConnection* trans= startTransactionLocal(aPriority, nodeId);
- DBUG_PRINT("exit", ("start DGroup trans: 0x%x transid: 0x%llx",
- trans, trans ? trans->getTransactionId() : 0));
- return trans;
- } else {
- return NULL;
- }//if
-}//Ndb::startTransaction()
NdbConnection*
Ndb::startTransactionLocal(Uint32 aPriority, Uint32 nodeId)
@@ -1010,118 +974,6 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
return ~0;
}
-static const Uint32 MAX_KEY_LEN_64_WORDS = 4;
-static const Uint32 MAX_KEY_LEN_32_WORDS = 8;
-static const Uint32 MAX_KEY_LEN_BYTES = 32;
-
-Uint32
-Ndb::computeFragmentId(const char * keyData, Uint32 keyLen)
-{
- Uint64 tempData[MAX_KEY_LEN_64_WORDS];
-
- const Uint32 usedKeyLen = (keyLen + 3) >> 2; // In words
- const char * usedKeyData = 0;
-
- /**
- * If key data buffer is not aligned (on 64 bit boundary)
- * or key len is not a multiple of 4
- * Use temp data
- */
- if(((((UintPtr)keyData) & 7) == 0) && ((keyLen & 3) == 0)) {
- usedKeyData = keyData;
- } else {
- memcpy(&tempData[0], keyData, keyLen);
- const int slack = keyLen & 3;
- if(slack > 0) {
- memset(&((char *)&tempData[0])[keyLen], 0, (4 - slack));
- }//if
- usedKeyData = (char *)&tempData[0];
- }//if
-
- Uint32 hashValue = md5_hash((Uint64 *)usedKeyData, usedKeyLen);
-
- hashValue >>= startTransactionNodeSelectionData.kValue;
- return getFragmentId(hashValue);
-}//Ndb::computeFragmentId()
-
-Uint32
-Ndb::getFragmentId(Uint32 hashValue)
-{
- Uint32 fragmentId = hashValue &
- startTransactionNodeSelectionData.hashValueMask;
- if(fragmentId < startTransactionNodeSelectionData.hashpointerValue) {
- fragmentId = hashValue &
- ((startTransactionNodeSelectionData.hashValueMask << 1) + 1);
- }//if
- return fragmentId;
-}
-
-Uint32
-Ndb::guessPrimaryNode(Uint32 fragmentId){
- //ASSERT(((fragmentId > 0) && fragmentId <
- // startTransactionNodeSelectionData.noOfFragments), "Invalid fragementId");
-
- return startTransactionNodeSelectionData.fragment2PrimaryNodeMap[fragmentId];
-}
-
-void
-Ndb::StartTransactionNodeSelectionData::init(Uint32 noOfNodes,
- Uint8 nodeIds[]) {
- kValue = 6;
- noOfFragments = 2 * noOfNodes;
-
- /**
- * Compute hashValueMask and hashpointerValue
- */
- {
- Uint32 topBit = (1 << 31);
- for(int i = 31; i>=0; i--){
- if((noOfFragments & topBit) != 0)
- break;
- topBit >>= 1;
- }
- hashValueMask = topBit - 1;
- hashpointerValue = noOfFragments - (hashValueMask + 1);
- }
-
- /**
- * This initialization depends on
- * the fact that:
- * primary node for fragment i = i % noOfNodes
- *
- * This algorithm should be implemented in Dbdih
- */
- {
- if (fragment2PrimaryNodeMap != 0)
- abort();
-
- fragment2PrimaryNodeMap = new Uint32[noOfFragments];
- Uint32 i;
- for(i = 0; i fragment2PrimaryNodeMap[j]){
- Uint32 tmp = fragment2PrimaryNodeMap[i];
- fragment2PrimaryNodeMap[i] = fragment2PrimaryNodeMap[j];
- fragment2PrimaryNodeMap[j] = tmp;
- }
-
- for(i = 0; i
#include
#include
-#include "NdbImpl.hpp"
+#include "NdbWaiter.hpp"
#include "DictCache.hpp"
class NdbDictObjectImpl {
diff --git a/ndb/src/ndbapi/NdbImpl.hpp b/ndb/src/ndbapi/NdbImpl.hpp
index 21a4706f890..00a8ef19f3a 100644
--- a/ndb/src/ndbapi/NdbImpl.hpp
+++ b/ndb/src/ndbapi/NdbImpl.hpp
@@ -17,7 +17,9 @@
#ifndef NDB_IMPL_HPP
#define NDB_IMPL_HPP
+#include
#include
+#include
#include
#include
#include
@@ -26,6 +28,8 @@
#include
+#include "ndb_cluster_connection_impl.hpp"
+#include "NdbDictionaryImpl.hpp"
#include "ObjectMap.hpp"
/**
@@ -33,11 +37,16 @@
*/
class NdbImpl {
public:
- NdbImpl();
+ NdbImpl(Ndb_cluster_connection *, Ndb&);
~NdbImpl();
+ Ndb_cluster_connection_impl &m_ndb_cluster_connection;
+
+ NdbDictionaryImpl m_dictionary;
+
// Ensure good distribution of connects
Uint32 theCurrentConnectIndex;
+ Ndb_cluster_connection_node_iter m_node_iter;
NdbObjectIdMap theNdbObjectIdMap;
@@ -46,6 +55,10 @@ public:
// 1 indicates to release all connections to node
Uint32 the_release_ind[MAX_NDB_NODES];
+
+ NdbWaiter theWaiter;
+
+ int m_optimized_node_selection;
};
#ifdef VM_TRACE
@@ -113,26 +126,6 @@ Ndb::checkInitState()
Uint32 convertEndian(Uint32 Data);
-enum WaitSignalType {
- NO_WAIT = 0,
- WAIT_NODE_FAILURE = 1, // Node failure during wait
- WST_WAIT_TIMEOUT = 2, // Timeout during wait
-
- WAIT_TC_SEIZE = 3,
- WAIT_TC_RELEASE = 4,
- WAIT_NDB_TAMPER = 5,
- WAIT_SCAN = 6,
-
- // DICT stuff
- WAIT_GET_TAB_INFO_REQ = 11,
- WAIT_CREATE_TAB_REQ = 12,
- WAIT_DROP_TAB_REQ = 13,
- WAIT_ALTER_TAB_REQ = 14,
- WAIT_CREATE_INDX_REQ = 15,
- WAIT_DROP_INDX_REQ = 16,
- WAIT_LIST_TABLES_CONF = 17
-};
-
enum LockMode {
Read,
Update,
@@ -140,44 +133,4 @@ enum LockMode {
Delete
};
-#include
-
-inline
-void
-NdbWaiter::wait(int waitTime)
-{
- const bool forever = (waitTime == -1);
- const NDB_TICKS maxTime = NdbTick_CurrentMillisecond() + waitTime;
- while (1) {
- if (m_state == NO_WAIT || m_state == WAIT_NODE_FAILURE)
- break;
- if (forever) {
- NdbCondition_Wait(m_condition, (NdbMutex*)m_mutex);
- } else {
- if (waitTime <= 0) {
- m_state = WST_WAIT_TIMEOUT;
- break;
- }
- NdbCondition_WaitTimeout(m_condition, (NdbMutex*)m_mutex, waitTime);
- waitTime = maxTime - NdbTick_CurrentMillisecond();
- }
- }
-}
-
-inline
-void
-NdbWaiter::nodeFail(Uint32 aNodeId){
- if (m_state != NO_WAIT && m_node == aNodeId){
- m_state = WAIT_NODE_FAILURE;
- NdbCondition_Signal(m_condition);
- }
-}
-
-inline
-void
-NdbWaiter::signal(Uint32 state){
- m_state = state;
- NdbCondition_Signal(m_condition);
-}
-
#endif
diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp
index 88208409c08..a90c9f524a2 100644
--- a/ndb/src/ndbapi/NdbScanOperation.cpp
+++ b/ndb/src/ndbapi/NdbScanOperation.cpp
@@ -528,8 +528,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend)
/**
* No completed...
*/
- theNdb->theWaiter.m_node = nodeId;
- theNdb->theWaiter.m_state = WAIT_SCAN;
+ theNdb->theImpl->theWaiter.m_node = nodeId;
+ theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue;
@@ -1358,8 +1358,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
Uint32 tmp = m_sent_receivers_count;
s_idx = m_current_api_receiver;
while(m_sent_receivers_count > 0 && !theError.code){
- theNdb->theWaiter.m_node = nodeId;
- theNdb->theWaiter.m_state = WAIT_SCAN;
+ theNdb->theImpl->theWaiter.m_node = nodeId;
+ theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue;
@@ -1506,8 +1506,8 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
*/
while(theError.code == 0 && m_sent_receivers_count)
{
- theNdb->theWaiter.m_node = nodeId;
- theNdb->theWaiter.m_state = WAIT_SCAN;
+ theNdb->theImpl->theWaiter.m_node = nodeId;
+ theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){
case 0:
@@ -1576,8 +1576,8 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
*/
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
{
- theNdb->theWaiter.m_node = nodeId;
- theNdb->theWaiter.m_state = WAIT_SCAN;
+ theNdb->theImpl->theWaiter.m_node = nodeId;
+ theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){
case 0:
diff --git a/ndb/src/ndbapi/NdbWaiter.hpp b/ndb/src/ndbapi/NdbWaiter.hpp
new file mode 100644
index 00000000000..8b7b2a75879
--- /dev/null
+++ b/ndb/src/ndbapi/NdbWaiter.hpp
@@ -0,0 +1,102 @@
+/* Copyright (C) 2003 MySQL AB
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef NDB_WAITER_HPP
+#define NDB_WAITER_HPP
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+enum WaitSignalType {
+ NO_WAIT = 0,
+ WAIT_NODE_FAILURE = 1, // Node failure during wait
+ WST_WAIT_TIMEOUT = 2, // Timeout during wait
+
+ WAIT_TC_SEIZE = 3,
+ WAIT_TC_RELEASE = 4,
+ WAIT_NDB_TAMPER = 5,
+ WAIT_SCAN = 6,
+
+ // DICT stuff
+ WAIT_GET_TAB_INFO_REQ = 11,
+ WAIT_CREATE_TAB_REQ = 12,
+ WAIT_DROP_TAB_REQ = 13,
+ WAIT_ALTER_TAB_REQ = 14,
+ WAIT_CREATE_INDX_REQ = 15,
+ WAIT_DROP_INDX_REQ = 16,
+ WAIT_LIST_TABLES_CONF = 17
+};
+
+class NdbWaiter {
+public:
+ NdbWaiter();
+ ~NdbWaiter();
+
+ void wait(int waitTime);
+ void nodeFail(Uint32 node);
+ void signal(Uint32 state);
+
+ Uint32 m_node;
+ Uint32 m_state;
+ void * m_mutex;
+ struct NdbCondition * m_condition;
+};
+
+inline
+void
+NdbWaiter::wait(int waitTime)
+{
+ const bool forever = (waitTime == -1);
+ const NDB_TICKS maxTime = NdbTick_CurrentMillisecond() + waitTime;
+ while (1) {
+ if (m_state == NO_WAIT || m_state == WAIT_NODE_FAILURE)
+ break;
+ if (forever) {
+ NdbCondition_Wait(m_condition, (NdbMutex*)m_mutex);
+ } else {
+ if (waitTime <= 0) {
+ m_state = WST_WAIT_TIMEOUT;
+ break;
+ }
+ NdbCondition_WaitTimeout(m_condition, (NdbMutex*)m_mutex, waitTime);
+ waitTime = maxTime - NdbTick_CurrentMillisecond();
+ }
+ }
+}
+
+inline
+void
+NdbWaiter::nodeFail(Uint32 aNodeId){
+ if (m_state != NO_WAIT && m_node == aNodeId){
+ m_state = WAIT_NODE_FAILURE;
+ NdbCondition_Signal(m_condition);
+ }
+}
+
+inline
+void
+NdbWaiter::signal(Uint32 state){
+ m_state = state;
+ NdbCondition_Signal(m_condition);
+}
+
+#endif
diff --git a/ndb/src/ndbapi/Ndbif.cpp b/ndb/src/ndbapi/Ndbif.cpp
index 232e55662f0..a4f233709c4 100644
--- a/ndb/src/ndbapi/Ndbif.cpp
+++ b/ndb/src/ndbapi/Ndbif.cpp
@@ -209,8 +209,6 @@ void Ndb::connected(Uint32 ref)
tmpTheNode,
theImpl->theNoOfDBnodes,
theFirstTransId));
- startTransactionNodeSelectionData.init(theImpl->theNoOfDBnodes,
- theImpl->theDBnodes);
theCommitAckSignal = new NdbApiSignal(theMyRef);
theDictionary->m_receiver.m_reference= theMyRef;
@@ -251,7 +249,7 @@ Ndb::report_node_failure(Uint32 node_id)
theImpl->the_release_ind[node_id] = 1;
// must come after
theImpl->the_release_ind[0] = 1;
- theWaiter.nodeFail(node_id);
+ theImpl->theWaiter.nodeFail(node_id);
return;
}//Ndb::report_node_failure()
@@ -330,7 +328,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
NdbConnection* tCon;
int tReturnCode = -1;
const Uint32* tDataPtr = aSignal->getDataPtr();
- const Uint32 tWaitState = theWaiter.m_state;
+ const Uint32 tWaitState = theImpl->theWaiter.m_state;
const Uint32 tSignalNumber = aSignal->readSignalNumber();
const Uint32 tFirstData = *tDataPtr;
const Uint32 tLen = aSignal->getLength();
@@ -401,7 +399,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
break;
case NdbReceiver::NDB_SCANRECEIVER:
tCon->theScanningOp->receiver_delivered(tRec);
- theWaiter.m_state = (((WaitSignalType) tWaitState) == WAIT_SCAN ?
+ theImpl->theWaiter.m_state = (((WaitSignalType) tWaitState) == WAIT_SCAN ?
(Uint32) NO_WAIT : tWaitState);
break;
default:
@@ -598,7 +596,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}//if
tReturnCode = tCon->receiveTCSEIZECONF(aSignal);
if (tReturnCode != -1) {
- theWaiter.m_state = NO_WAIT;
+ theImpl->theWaiter.m_state = NO_WAIT;
} else {
goto InvalidSignal;
}//if
@@ -618,7 +616,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}//if
tReturnCode = tCon->receiveTCSEIZEREF(aSignal);
if (tReturnCode != -1) {
- theWaiter.m_state = NO_WAIT;
+ theImpl->theWaiter.m_state = NO_WAIT;
} else {
return;
}//if
@@ -638,7 +636,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}//if
tReturnCode = tCon->receiveTCRELEASECONF(aSignal);
if (tReturnCode != -1) {
- theWaiter.m_state = NO_WAIT;
+ theImpl->theWaiter.m_state = NO_WAIT;
}//if
break;
}
@@ -656,7 +654,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}//if
tReturnCode = tCon->receiveTCRELEASEREF(aSignal);
if (tReturnCode != -1) {
- theWaiter.m_state = NO_WAIT;
+ theImpl->theWaiter.m_state = NO_WAIT;
}//if
break;
}
@@ -708,7 +706,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
return;
tReturnCode = tCon->receiveDIHNDBTAMPER(aSignal);
if (tReturnCode != -1)
- theWaiter.m_state = NO_WAIT;
+ theImpl->theWaiter.m_state = NO_WAIT;
break;
}
case GSN_SCAN_TABCONF:
@@ -730,7 +728,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
tLen - ScanTabConf::SignalLength);
}
if (tReturnCode != -1 && tWaitState == WAIT_SCAN)
- theWaiter.m_state = NO_WAIT;
+ theImpl->theWaiter.m_state = NO_WAIT;
break;
} else {
goto InvalidSignal;
@@ -749,7 +747,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
if (tCon->checkMagicNumber() == 0){
tReturnCode = tCon->receiveSCAN_TABREF(aSignal);
if (tReturnCode != -1 && tWaitState == WAIT_SCAN){
- theWaiter.m_state = NO_WAIT;
+ theImpl->theWaiter.m_state = NO_WAIT;
}
break;
}
@@ -774,7 +772,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
switch(com){
case 1:
tCon->theScanningOp->receiver_delivered(tRec);
- theWaiter.m_state = (((WaitSignalType) tWaitState) == WAIT_SCAN ?
+ theImpl->theWaiter.m_state = (((WaitSignalType) tWaitState) == WAIT_SCAN ?
(Uint32) NO_WAIT : tWaitState);
break;
case 0:
@@ -838,16 +836,16 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
goto InvalidSignal;
}//switch
- if (theWaiter.m_state == NO_WAIT) {
+ if (theImpl->theWaiter.m_state == NO_WAIT) {
// Wake up the thread waiting for response
- NdbCondition_Signal(theWaiter.m_condition);
+ NdbCondition_Signal(theImpl->theWaiter.m_condition);
}//if
return;
InvalidSignal:
#ifdef VM_TRACE
ndbout_c("Ndbif: Error Ndb::handleReceivedSignal "
- "(GSN=%d, theWaiter.m_state=%d)"
+ "(GSN=%d, theImpl->theWaiter.m_state=%d)"
" sender = (Block: %d Node: %d)",
tSignalNumber,
tWaitState,
@@ -895,7 +893,7 @@ Ndb::completedTransaction(NdbConnection* aCon)
if ((theMinNoOfEventsToWakeUp != 0) &&
(theNoOfCompletedTransactions >= theMinNoOfEventsToWakeUp)) {
theMinNoOfEventsToWakeUp = 0;
- NdbCondition_Signal(theWaiter.m_condition);
+ NdbCondition_Signal(theImpl->theWaiter.m_condition);
return;
}//if
} else {
@@ -1155,9 +1153,9 @@ void
Ndb::waitCompletedTransactions(int aMilliSecondsToWait,
int noOfEventsToWaitFor)
{
- theWaiter.m_state = NO_WAIT;
+ theImpl->theWaiter.m_state = NO_WAIT;
/**
- * theWaiter.m_state = NO_WAIT;
+ * theImpl->theWaiter.m_state = NO_WAIT;
* To ensure no messup with synchronous node fail handling
* (see ReportFailure)
*/
@@ -1166,8 +1164,8 @@ Ndb::waitCompletedTransactions(int aMilliSecondsToWait,
theMinNoOfEventsToWakeUp = noOfEventsToWaitFor;
do {
if (waitTime < 1000) waitTime = 1000;
- NdbCondition_WaitTimeout(theWaiter.m_condition,
- (NdbMutex*)theWaiter.m_mutex,
+ NdbCondition_WaitTimeout(theImpl->theWaiter.m_condition,
+ (NdbMutex*)theImpl->theWaiter.m_mutex,
waitTime);
if (theNoOfCompletedTransactions >= (Uint32)noOfEventsToWaitFor) {
break;
@@ -1273,23 +1271,23 @@ Ndb::receiveResponse(int waitTime){
int tResultCode;
TransporterFacade::instance()->checkForceSend(theNdbBlockNumber);
- theWaiter.wait(waitTime);
+ theImpl->theWaiter.wait(waitTime);
- if(theWaiter.m_state == NO_WAIT) {
+ if(theImpl->theWaiter.m_state == NO_WAIT) {
tResultCode = 0;
} else {
#ifdef VM_TRACE
- ndbout << "ERR: receiveResponse - theWaiter.m_state = ";
- ndbout << theWaiter.m_state << endl;
+ ndbout << "ERR: receiveResponse - theImpl->theWaiter.m_state = ";
+ ndbout << theImpl->theWaiter.m_state << endl;
#endif
- if (theWaiter.m_state == WAIT_NODE_FAILURE){
+ if (theImpl->theWaiter.m_state == WAIT_NODE_FAILURE){
tResultCode = -2;
} else {
tResultCode = -1;
}
- theWaiter.m_state = NO_WAIT;
+ theImpl->theWaiter.m_state = NO_WAIT;
}
return tResultCode;
}//Ndb::receiveResponse()
@@ -1321,8 +1319,8 @@ Ndb::sendRecSignal(Uint16 node_id,
if (tp->check_send_size(node_id, send_size)) {
return_code = tp->sendSignal(aSignal, node_id);
if (return_code != -1) {
- theWaiter.m_node = node_id;
- theWaiter.m_state = aWaitState;
+ theImpl->theWaiter.m_node = node_id;
+ theImpl->theWaiter.m_state = aWaitState;
return_code = receiveResponse();
} else {
return_code = -3;
diff --git a/ndb/src/ndbapi/Ndbinit.cpp b/ndb/src/ndbapi/Ndbinit.cpp
index 9754c25ab15..e1af7bd4cc5 100644
--- a/ndb/src/ndbapi/Ndbinit.cpp
+++ b/ndb/src/ndbapi/Ndbinit.cpp
@@ -50,7 +50,9 @@ Ndb(const char* aDataBase);
Parameters: aDataBase : Name of the database.
Remark: Connect to the database.
***************************************************************************/
-Ndb::Ndb( const char* aDataBase , const char* aSchema) {
+Ndb::Ndb( const char* aDataBase , const char* aSchema)
+ : theImpl(NULL)
+{
DBUG_ENTER("Ndb::Ndb()");
DBUG_PRINT("enter",("(old)Ndb::Ndb this=0x%x", this));
if (theNoOfNdbObjects < 0)
@@ -66,6 +68,7 @@ Ndb::Ndb( const char* aDataBase , const char* aSchema) {
Ndb::Ndb( Ndb_cluster_connection *ndb_cluster_connection,
const char* aDataBase , const char* aSchema)
+ : theImpl(NULL)
{
DBUG_ENTER("Ndb::Ndb()");
DBUG_PRINT("enter",("Ndb::Ndb this=0x%x", this));
@@ -82,7 +85,10 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
{
DBUG_ENTER("Ndb::setup");
- m_ndb_cluster_connection= ndb_cluster_connection;
+ assert(theImpl == NULL);
+ theImpl= new NdbImpl(ndb_cluster_connection,*this);
+ theDictionary= &(theImpl->m_dictionary);
+
thePreparedTransactionsArray= NULL;
theSentTransactionsArray= NULL;
theCompletedTransactionsArray= NULL;
@@ -93,8 +99,6 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
theMaxNoOfTransactions= 0;
theMinNoOfEventsToWakeUp= 0;
prefixEnd= NULL;
- theImpl= NULL;
- theDictionary= NULL;
theConIdleList= NULL;
theOpIdleList= NULL;
theScanOpIdleList= NULL;
@@ -153,14 +157,12 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
prefixEnd = prefixName + (len < (int) sizeof(prefixName) ? len :
sizeof(prefixName) - 1);
- theWaiter.m_mutex = TransporterFacade::instance()->theMutexPtr;
+ theImpl->theWaiter.m_mutex = TransporterFacade::instance()->theMutexPtr;
// Signal that the constructor has finished OK
if (theInitState == NotConstructed)
theInitState = NotInitialised;
- theImpl = new NdbImpl();
-
{
NdbGlobalEventBufferHandle *h=
NdbGlobalEventBuffer_init(NDB_MAX_ACTIVE_EVENTS);
@@ -171,11 +173,6 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
theGlobalEventBufferHandle = h;
}
- theDictionary = new NdbDictionaryImpl(*this);
- if (theDictionary == NULL) {
- ndbout_c("Ndb cailed to allocate dictionary");
- exit(-1);
- }
DBUG_VOID_RETURN;
}
@@ -201,8 +198,6 @@ Ndb::~Ndb()
DBUG_PRINT("enter",("Ndb::~Ndb this=0x%x",this));
doDisconnect();
- delete theDictionary;
-
NdbGlobalEventBuffer_drop(theGlobalEventBufferHandle);
if (TransporterFacade::instance() != NULL && theNdbBlockNumber > 0){
@@ -245,7 +240,6 @@ Ndb::~Ndb()
freeSignal();
releaseTransactionArrays();
- startTransactionNodeSelectionData.release();
delete []theConnectionArray;
if(theCommitAckSignal != NULL){
@@ -292,14 +286,20 @@ NdbWaiter::~NdbWaiter(){
NdbCondition_Destroy(m_condition);
}
-NdbImpl::NdbImpl() : theNdbObjectIdMap(1024,1024),
- theCurrentConnectIndex(0),
- theNoOfDBnodes(0)
+NdbImpl::NdbImpl(Ndb_cluster_connection *ndb_cluster_connection,
+ Ndb& ndb)
+ : m_ndb_cluster_connection(ndb_cluster_connection->m_impl),
+ m_dictionary(ndb),
+ theCurrentConnectIndex(0),
+ theNdbObjectIdMap(1024,1024),
+ theNoOfDBnodes(0)
{
int i;
for (i = 0; i < MAX_NDB_NODES; i++) {
the_release_ind[i] = 0;
}
+ m_optimized_node_selection=
+ m_ndb_cluster_connection.m_optimized_node_selection;
}
NdbImpl::~NdbImpl()
diff --git a/ndb/src/ndbapi/TransporterFacade.hpp b/ndb/src/ndbapi/TransporterFacade.hpp
index 5680e3a6f03..99edea846c1 100644
--- a/ndb/src/ndbapi/TransporterFacade.hpp
+++ b/ndb/src/ndbapi/TransporterFacade.hpp
@@ -127,7 +127,7 @@ private:
friend class ExtSender; ///< @todo Hack to be able to sendSignalUnCond
friend class GrepSS;
friend class Ndb;
- friend class Ndb_cluster_connection;
+ friend class Ndb_cluster_connection_impl;
int sendSignalUnCond(NdbApiSignal *, NodeId nodeId);
diff --git a/ndb/src/ndbapi/ndb_cluster_connection.cpp b/ndb/src/ndbapi/ndb_cluster_connection.cpp
index f436ee56ede..98a52786aab 100644
--- a/ndb/src/ndbapi/ndb_cluster_connection.cpp
+++ b/ndb/src/ndbapi/ndb_cluster_connection.cpp
@@ -18,7 +18,9 @@
#include
#include
-#include
+#include "ndb_cluster_connection_impl.hpp"
+#include
+#include
#include
#include
#include
@@ -26,6 +28,8 @@
#include
#include
#include
+#include
+#include
static int g_run_connect_thread= 0;
@@ -35,13 +39,226 @@ NdbMutex *ndb_global_event_buffer_mutex= NULL;
NdbMutex *ndb_print_state_mutex= NULL;
#endif
+/*
+ * Ndb_cluster_connection
+ */
+
Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
+ : m_impl(* new Ndb_cluster_connection_impl(connect_string))
+{
+}
+
+Ndb_cluster_connection::Ndb_cluster_connection
+(Ndb_cluster_connection_impl& impl) : m_impl(impl)
+{
+}
+
+Ndb_cluster_connection::~Ndb_cluster_connection()
+{
+ Ndb_cluster_connection_impl *tmp = &m_impl;
+ if (this != tmp)
+ delete tmp;
+}
+
+int Ndb_cluster_connection::get_connected_port() const
+{
+ if (m_impl.m_config_retriever)
+ return m_impl.m_config_retriever->get_mgmd_port();
+ return -1;
+}
+
+const char *Ndb_cluster_connection::get_connected_host() const
+{
+ if (m_impl.m_config_retriever)
+ return m_impl.m_config_retriever->get_mgmd_host();
+ return 0;
+}
+
+const char *Ndb_cluster_connection::get_connectstring(char *buf,
+ int buf_sz) const
+{
+ if (m_impl.m_config_retriever)
+ return m_impl.m_config_retriever->get_connectstring(buf,buf_sz);
+ return 0;
+}
+
+extern "C" pthread_handler_decl(run_ndb_cluster_connection_connect_thread, me)
+{
+ my_thread_init();
+ g_run_connect_thread= 1;
+ ((Ndb_cluster_connection_impl*) me)->connect_thread();
+ my_thread_end();
+ NdbThread_Exit(0);
+ return me;
+}
+
+int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void))
+{
+ int r;
+ DBUG_ENTER("Ndb_cluster_connection::start_connect_thread");
+ m_impl.m_connect_callback= connect_callback;
+ if ((r = connect(0,0,0)) == 1)
+ {
+ DBUG_PRINT("info",("starting thread"));
+ m_impl.m_connect_thread=
+ NdbThread_Create(run_ndb_cluster_connection_connect_thread,
+ (void**)&m_impl, 32768, "ndb_cluster_connection",
+ NDB_THREAD_PRIO_LOW);
+ }
+ else if (r < 0)
+ {
+ DBUG_RETURN(-1);
+ }
+ else if (m_impl.m_connect_callback)
+ {
+ (*m_impl.m_connect_callback)();
+ }
+ DBUG_RETURN(0);
+}
+
+void Ndb_cluster_connection::set_optimized_node_selection(int val)
+{
+ m_impl.m_optimized_node_selection= val;
+}
+
+void
+Ndb_cluster_connection_impl::init_get_next_node
+(Ndb_cluster_connection_node_iter &iter)
+{
+ if (iter.scan_state != (Uint8)~0)
+ iter.cur_pos= iter.scan_state;
+ if (iter.cur_pos >= no_db_nodes())
+ iter.cur_pos= 0;
+ iter.init_pos= iter.cur_pos;
+ iter.scan_state= 0;
+ // fprintf(stderr,"[init %d]",iter.init_pos);
+ return;
+}
+
+Uint32
+Ndb_cluster_connection_impl::get_next_node(Ndb_cluster_connection_node_iter &iter)
+{
+ Uint32 cur_pos= iter.cur_pos;
+ if (cur_pos >= no_db_nodes())
+ return 0;
+
+ Ndb_cluster_connection_impl::Node *nodes= m_impl.m_all_nodes.getBase();
+ Ndb_cluster_connection_impl::Node &node= nodes[cur_pos];
+
+ if (iter.scan_state != (Uint8)~0)
+ {
+ assert(iter.scan_state < no_db_nodes());
+ if (nodes[iter.scan_state].group == node.group)
+ iter.scan_state= ~0;
+ else
+ return nodes[iter.scan_state++].id;
+ }
+
+ // fprintf(stderr,"[%d]",node.id);
+
+ cur_pos++;
+ Uint32 init_pos= iter.init_pos;
+ if (cur_pos == node.next_group)
+ {
+ cur_pos= nodes[init_pos].this_group;
+ }
+
+ // fprintf(stderr,"[cur_pos %d]",cur_pos);
+ if (cur_pos != init_pos)
+ iter.cur_pos= cur_pos;
+ else
+ {
+ iter.cur_pos= node.next_group;
+ iter.init_pos= node.next_group;
+ }
+ return node.id;
+}
+
+Uint32
+Ndb_cluster_connection::no_db_nodes()
+{
+ return m_impl.m_all_nodes.size();
+}
+
+
+int
+Ndb_cluster_connection::wait_until_ready(int timeout,
+ int timeout_after_first_alive)
+{
+ DBUG_ENTER("Ndb_cluster_connection::wait_until_ready");
+ TransporterFacade *tp = TransporterFacade::instance();
+ if (tp == 0)
+ {
+ DBUG_RETURN(-1);
+ }
+ if (tp->ownId() == 0)
+ {
+ DBUG_RETURN(-1);
+ }
+ int secondsCounter = 0;
+ int milliCounter = 0;
+ int noChecksSinceFirstAliveFound = 0;
+ do {
+ unsigned int foundAliveNode = 0;
+ tp->lock_mutex();
+ for(unsigned i= 0; i < no_db_nodes(); i++)
+ {
+ //************************************************
+ // If any node is answering, ndb is answering
+ //************************************************
+ if (tp->get_node_alive(m_impl.m_all_nodes[i].id) != 0) {
+ foundAliveNode++;
+ }
+ }
+ tp->unlock_mutex();
+
+ if (foundAliveNode == no_db_nodes())
+ {
+ DBUG_RETURN(0);
+ }
+ else if (foundAliveNode > 0)
+ {
+ noChecksSinceFirstAliveFound++;
+ if (timeout_after_first_alive >= 0)
+ {
+ if (noChecksSinceFirstAliveFound > timeout_after_first_alive)
+ DBUG_RETURN(0);
+ }
+ else // timeout_after_first_alive < 0
+ {
+ if (noChecksSinceFirstAliveFound > -timeout_after_first_alive)
+ DBUG_RETURN(-1);
+ }
+ }
+ else if (secondsCounter >= timeout)
+ { // no alive nodes and timed out
+ DBUG_RETURN(-1);
+ }
+ NdbSleep_MilliSleep(100);
+ milliCounter += 100;
+ if (milliCounter >= 1000) {
+ secondsCounter++;
+ milliCounter = 0;
+ }//if
+ } while (1);
+}
+
+
+
+/*
+ * Ndb_cluster_connection_impl
+ */
+
+Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char *
+ connect_string)
+ : Ndb_cluster_connection(*this),
+ m_optimized_node_selection(1)
{
DBUG_ENTER("Ndb_cluster_connection");
DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%x", this));
- m_facade= TransporterFacade::theFacadeInstance= new TransporterFacade();
+ m_transporter_facade=
+ TransporterFacade::theFacadeInstance= new TransporterFacade();
- m_config_retriever= 0;
m_connect_thread= 0;
m_connect_callback= 0;
@@ -64,43 +281,230 @@ Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
delete m_config_retriever;
m_config_retriever= 0;
}
+
DBUG_VOID_RETURN;
}
-int Ndb_cluster_connection::get_connected_port() const
+Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl()
{
+ DBUG_ENTER("~Ndb_cluster_connection");
+ DBUG_PRINT("enter",("~Ndb_cluster_connection this=0x%x", this));
+ TransporterFacade::stop_instance();
+ if (m_connect_thread)
+ {
+ void *status;
+ g_run_connect_thread= 0;
+ NdbThread_WaitFor(m_connect_thread, &status);
+ NdbThread_Destroy(&m_connect_thread);
+ m_connect_thread= 0;
+ }
+ if (m_transporter_facade != 0)
+ {
+ delete m_transporter_facade;
+ if (m_transporter_facade != TransporterFacade::theFacadeInstance)
+ abort();
+ TransporterFacade::theFacadeInstance= 0;
+ }
if (m_config_retriever)
- return m_config_retriever->get_mgmd_port();
- return -1;
+ delete m_config_retriever;
+
+ // fragmentToNodeMap.release();
+
+ DBUG_VOID_RETURN;
}
-const char *Ndb_cluster_connection::get_connected_host() const
+void
+Ndb_cluster_connection_impl::init_nodes_vector(Uint32 nodeid,
+ const ndb_mgm_configuration
+ &config)
{
- if (m_config_retriever)
- return m_config_retriever->get_mgmd_host();
- return 0;
+ DBUG_ENTER("Ndb_cluster_connection_impl::init_nodes_vector");
+ ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION);
+
+ for(iter.first(); iter.valid(); iter.next())
+ {
+ Uint32 nodeid1, nodeid2, remoteNodeId, group= 5;
+ const char * remoteHostName= 0, * localHostName= 0;
+ if(iter.get(CFG_CONNECTION_NODE_1, &nodeid1)) continue;
+ if(iter.get(CFG_CONNECTION_NODE_2, &nodeid2)) continue;
+
+ if(nodeid1 != nodeid && nodeid2 != nodeid) continue;
+ remoteNodeId = (nodeid == nodeid1 ? nodeid2 : nodeid1);
+
+ iter.get(CFG_CONNECTION_GROUP, &group);
+
+ {
+ const char * host1= 0, * host2= 0;
+ iter.get(CFG_CONNECTION_HOSTNAME_1, &host1);
+ iter.get(CFG_CONNECTION_HOSTNAME_2, &host2);
+ localHostName = (nodeid == nodeid1 ? host1 : host2);
+ remoteHostName = (nodeid == nodeid1 ? host2 : host1);
+ }
+
+ Uint32 type = ~0;
+ if(iter.get(CFG_TYPE_OF_SECTION, &type)) continue;
+
+ switch(type){
+ case CONNECTION_TYPE_SHM:{
+ break;
+ }
+ case CONNECTION_TYPE_SCI:{
+ break;
+ }
+ case CONNECTION_TYPE_TCP:{
+ // connecting through localhost
+ // check if config_hostname is local
+ if (SocketServer::tryBind(0,remoteHostName))
+ group--; // upgrade group value
+ break;
+ }
+ case CONNECTION_TYPE_OSE:{
+ break;
+ }
+ }
+ m_impl.m_all_nodes.push_back(Node(group,remoteNodeId));
+ DBUG_PRINT("info",("saved %d %d", group,remoteNodeId));
+ for (int i= m_impl.m_all_nodes.size()-2;
+ i >= 0 && m_impl.m_all_nodes[i].group > m_impl.m_all_nodes[i+1].group;
+ i--)
+ {
+ Node tmp= m_impl.m_all_nodes[i];
+ m_impl.m_all_nodes[i]= m_impl.m_all_nodes[i+1];
+ m_impl.m_all_nodes[i+1]= tmp;
+ }
+ }
+
+ int i;
+ Uint32 cur_group, i_group= 0;
+ cur_group= ~0;
+ for (i= (int)m_impl.m_all_nodes.size()-1; i >= 0; i--)
+ {
+ if (m_impl.m_all_nodes[i].group != cur_group)
+ {
+ cur_group= m_impl.m_all_nodes[i].group;
+ i_group= i+1;
+ }
+ m_impl.m_all_nodes[i].next_group= i_group;
+ }
+ cur_group= ~0;
+ for (i= 0; i < (int)m_impl.m_all_nodes.size(); i++)
+ {
+ if (m_impl.m_all_nodes[i].group != cur_group)
+ {
+ cur_group= m_impl.m_all_nodes[i].group;
+ i_group= i;
+ }
+ m_impl.m_all_nodes[i].this_group= i_group;
+ }
+#if 0
+ for (i= 0; i < (int)m_impl.m_all_nodes.size(); i++)
+ {
+ fprintf(stderr, "[%d] %d %d %d %d\n",
+ i,
+ m_impl.m_all_nodes[i].id,
+ m_impl.m_all_nodes[i].group,
+ m_impl.m_all_nodes[i].this_group,
+ m_impl.m_all_nodes[i].next_group);
+ }
+
+ do_test();
+#endif
+ DBUG_VOID_RETURN;
}
-const char *Ndb_cluster_connection::get_connectstring(char *buf, int buf_sz) const
+void
+Ndb_cluster_connection_impl::do_test()
{
- if (m_config_retriever)
- return m_config_retriever->get_connectstring(buf,buf_sz);
- return 0;
+ Ndb_cluster_connection_node_iter iter;
+ int n= no_db_nodes()+5;
+ Uint32 *nodes= new Uint32[n+1];
+
+ for (int g= 0; g < n; g++)
+ {
+ for (int h= 0; h < n; h++)
+ {
+ Uint32 id;
+ Ndb_cluster_connection_node_iter iter2;
+ {
+ for (int j= 0; j < g; j++)
+ {
+ nodes[j]= get_next_node(iter2);
+ }
+ }
+
+ for (int i= 0; i < n; i++)
+ {
+ init_get_next_node(iter);
+ fprintf(stderr, "%d dead:(", g);
+ id= 0;
+ while (id == 0)
+ {
+ if ((id= get_next_node(iter)) == 0)
+ break;
+ for (int j= 0; j < g; j++)
+ {
+ if (nodes[j] == id)
+ {
+ fprintf(stderr, " %d", id);
+ id= 0;
+ break;
+ }
+ }
+ }
+ fprintf(stderr, ")");
+ if (id == 0)
+ {
+ break;
+ }
+ fprintf(stderr, " %d\n", id);
+ }
+ fprintf(stderr, "\n");
+ }
+ }
+ delete [] nodes;
}
-extern "C" pthread_handler_decl(run_ndb_cluster_connection_connect_thread, me)
+int Ndb_cluster_connection::connect(int no_retries, int retry_delay_in_seconds,
+ int verbose)
{
- my_thread_init();
- g_run_connect_thread= 1;
- ((Ndb_cluster_connection*) me)->connect_thread();
- my_thread_end();
- NdbThread_Exit(0);
- return me;
+ DBUG_ENTER("Ndb_cluster_connection::connect");
+ const char* error = 0;
+ do {
+ if (m_impl.m_config_retriever == 0)
+ DBUG_RETURN(-1);
+ if (m_impl.m_config_retriever->do_connect(no_retries,
+ retry_delay_in_seconds,
+ verbose))
+ DBUG_RETURN(1); // mgmt server not up yet
+
+ Uint32 nodeId = m_impl.m_config_retriever->allocNodeId(4/*retries*/,
+ 3/*delay*/);
+ if(nodeId == 0)
+ break;
+ ndb_mgm_configuration * props = m_impl.m_config_retriever->getConfig();
+ if(props == 0)
+ break;
+ m_impl.m_transporter_facade->start_instance(nodeId, props);
+
+ m_impl.init_nodes_vector(nodeId, *props);
+
+ ndb_mgm_destroy_configuration(props);
+ m_impl.m_transporter_facade->connected();
+ DBUG_RETURN(0);
+ } while(0);
+
+ ndbout << "Configuration error: ";
+ const char* erString = m_impl.m_config_retriever->getErrorString();
+ if (erString == 0) {
+ erString = "No error specified!";
+ }
+ ndbout << erString << endl;
+ DBUG_RETURN(-1);
}
-void Ndb_cluster_connection::connect_thread()
+void Ndb_cluster_connection_impl::connect_thread()
{
- DBUG_ENTER("Ndb_cluster_connection::connect_thread");
+ DBUG_ENTER("Ndb_cluster_connection_impl::connect_thread");
int r;
do {
NdbSleep_SecSleep(1);
@@ -120,84 +524,110 @@ void Ndb_cluster_connection::connect_thread()
DBUG_VOID_RETURN;
}
-int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void))
-{
- int r;
- DBUG_ENTER("Ndb_cluster_connection::start_connect_thread");
- m_connect_callback= connect_callback;
- if ((r = connect(0,0,0)) == 1)
- {
- DBUG_PRINT("info",("starting thread"));
- m_connect_thread=
- NdbThread_Create(run_ndb_cluster_connection_connect_thread,
- (void**)this, 32768, "ndb_cluster_connection",
- NDB_THREAD_PRIO_LOW);
- }
- else if (r < 0)
- {
- DBUG_RETURN(-1);
- }
- else if (m_connect_callback)
- {
- (*m_connect_callback)();
- }
- DBUG_RETURN(0);
-}
+/*
+ * Hint handling to select node
+ * ToDo: fix this
+ */
-int Ndb_cluster_connection::connect(int no_retries, int retry_delay_in_seconds, int verbose)
+void
+Ndb_cluster_connection_impl::FragmentToNodeMap::init(Uint32 noOfNodes,
+ Uint8 nodeIds[])
{
- DBUG_ENTER("Ndb_cluster_connection::connect");
- const char* error = 0;
- do {
- if (m_config_retriever == 0)
- DBUG_RETURN(-1);
- if (m_config_retriever->do_connect(no_retries,retry_delay_in_seconds,verbose))
- DBUG_RETURN(1); // mgmt server not up yet
+ kValue = 6;
+ noOfFragments = 2 * noOfNodes;
- Uint32 nodeId = m_config_retriever->allocNodeId(4/*retries*/,3/*delay*/);
- if(nodeId == 0)
- break;
- ndb_mgm_configuration * props = m_config_retriever->getConfig();
- if(props == 0)
- break;
- m_facade->start_instance(nodeId, props);
- ndb_mgm_destroy_configuration(props);
- m_facade->connected();
- DBUG_RETURN(0);
- } while(0);
+ /**
+ * Compute hashValueMask and hashpointerValue
+ */
+ {
+ Uint32 topBit = (1 << 31);
+ for(int i = 31; i>=0; i--){
+ if((noOfFragments & topBit) != 0)
+ break;
+ topBit >>= 1;
+ }
+ hashValueMask = topBit - 1;
+ hashpointerValue = noOfFragments - (hashValueMask + 1);
+ }
- ndbout << "Configuration error: ";
- const char* erString = m_config_retriever->getErrorString();
- if (erString == 0) {
- erString = "No error specified!";
- }
- ndbout << erString << endl;
- DBUG_RETURN(-1);
-}
-
-Ndb_cluster_connection::~Ndb_cluster_connection()
-{
- DBUG_ENTER("~Ndb_cluster_connection");
- DBUG_PRINT("enter",("~Ndb_cluster_connection this=0x%x", this));
- TransporterFacade::stop_instance();
- if (m_connect_thread)
+ /**
+ * This initialization depends on
+ * the fact that:
+ * primary node for fragment i = i % noOfNodes
+ *
+ * This algorithm should be implemented in Dbdih
+ */
{
- void *status;
- g_run_connect_thread= 0;
- NdbThread_WaitFor(m_connect_thread, &status);
- NdbThread_Destroy(&m_connect_thread);
- m_connect_thread= 0;
- }
- if (m_facade != 0)
- {
- delete m_facade;
- if (m_facade != TransporterFacade::theFacadeInstance)
+ if (fragment2PrimaryNodeMap != 0)
abort();
- TransporterFacade::theFacadeInstance= 0;
+
+ fragment2PrimaryNodeMap = new Uint32[noOfFragments];
+ Uint32 i;
+ for(i = 0; i fragment2PrimaryNodeMap[j]){
+ Uint32 tmp = fragment2PrimaryNodeMap[i];
+ fragment2PrimaryNodeMap[i] = fragment2PrimaryNodeMap[j];
+ fragment2PrimaryNodeMap[j] = tmp;
+ }
+
+ for(i = 0; i> 2; // In words
+ const char * usedKeyData = 0;
+
+ /**
+ * If key data buffer is not aligned (on 64 bit boundary)
+ * or key len is not a multiple of 4
+ * Use temp data
+ */
+ if(((((UintPtr)keyData) & 7) == 0) && ((keyLen & 3) == 0)) {
+ usedKeyData = keyData;
+ } else {
+ memcpy(&tempData[0], keyData, keyLen);
+ const int slack = keyLen & 3;
+ if(slack > 0) {
+ memset(&((char *)&tempData[0])[keyLen], 0, (4 - slack));
+ }//if
+ usedKeyData = (char *)&tempData[0];
+ }//if
+
+ Uint32 hashValue = md5_hash((Uint64 *)usedKeyData, usedKeyLen);
+
+ hashValue >>= fragmentToNodeMap.kValue;
+
+ Uint32 fragmentId = hashValue &
+ fragmentToNodeMap.hashValueMask;
+
+ if(fragmentId < fragmentToNodeMap.hashpointerValue) {
+ fragmentId = hashValue &
+ ((fragmentToNodeMap.hashValueMask << 1) + 1);
+ }//if
+ return fragmentId;
}
+template class Vector;
+
diff --git a/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp b/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp
new file mode 100644
index 00000000000..620eac296a3
--- /dev/null
+++ b/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp
@@ -0,0 +1,100 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+
+#ifndef CLUSTER_CONNECTION_IMPL_HPP
+#define CLUSTER_CONNECTION_IMPL_HPP
+
+#include
+#include
+
+class TransporterFacade;
+class ConfigRetriever;
+class NdbThread;
+class ndb_mgm_configuration;
+
+struct Ndb_cluster_connection_node_iter {
+ Ndb_cluster_connection_node_iter() : scan_state(~0),
+ init_pos(0),
+ cur_pos(0) {};
+ Uint8 scan_state;
+ Uint8 init_pos;
+ Uint8 cur_pos;
+};
+
+extern "C" {
+ void* run_ndb_cluster_connection_connect_thread(void*);
+}
+
+class Ndb_cluster_connection_impl : public Ndb_cluster_connection
+{
+ Ndb_cluster_connection_impl(const char *connectstring);
+ ~Ndb_cluster_connection_impl();
+
+ void do_test();
+
+ void init_get_next_node(Ndb_cluster_connection_node_iter &iter);
+ Uint32 get_next_node(Ndb_cluster_connection_node_iter &iter);
+
+private:
+ friend class Ndb;
+ friend class NdbImpl;
+ friend void* run_ndb_cluster_connection_connect_thread(void*);
+ friend class Ndb_cluster_connection;
+
+ /**
+ * Structure containing values for guessing primary node
+ */
+ struct FragmentToNodeMap {
+ FragmentToNodeMap():
+ fragment2PrimaryNodeMap(0) {};
+ Uint32 kValue;
+ Uint32 hashValueMask;
+ Uint32 hashpointerValue;
+ Uint32 noOfFragments;
+ Uint32 *fragment2PrimaryNodeMap;
+
+ void init(Uint32 noOfNodes, Uint8 nodeIds[]);
+ void release();
+ } fragmentToNodeMap;
+
+ struct Node
+ {
+ Node(Uint32 _g= 0, Uint32 _id= 0) : this_group(0),
+ next_group(0),
+ group(_g),
+ id(_id) {};
+ Uint32 this_group;
+ Uint32 next_group;
+ Uint32 group;
+ Uint32 id;
+ };
+
+ Vector m_all_nodes;
+ void init_nodes_vector(Uint32 nodeid, const ndb_mgm_configuration &config);
+ Uint32 guess_primary_node(const char * keyData, Uint32 keyLen);
+
+ void connect_thread();
+
+ TransporterFacade *m_transporter_facade;
+ ConfigRetriever *m_config_retriever;
+ NdbThread *m_connect_thread;
+ int (*m_connect_callback)(void);
+
+ int m_optimized_node_selection;
+};
+
+#endif
diff --git a/ndb/test/ndbapi/testNdbApi.cpp b/ndb/test/ndbapi/testNdbApi.cpp
index a1ebac609b6..69e534e6860 100644
--- a/ndb/test/ndbapi/testNdbApi.cpp
+++ b/ndb/test/ndbapi/testNdbApi.cpp
@@ -142,14 +142,22 @@ int runTestMaxTransaction(NDBT_Context* ctx, NDBT_Step* step){
4);
break;
case 2:
+ ndbout_c("startTransactionDGroup not supported");
+ abort();
+ /*
pCon = pNdb->startTransactionDGroup(1,
"TEST",
0);
+ */
break;
case 3:
+ ndbout_c("startTransactionDGroup not supported");
+ abort();
+ /*
pCon = pNdb->startTransactionDGroup(2,
"TEST",
1);
+ */
break;
default:
diff --git a/ndb/tools/delete_all.cpp b/ndb/tools/delete_all.cpp
index 046ac8005d2..cdfaf2134ff 100644
--- a/ndb/tools/delete_all.cpp
+++ b/ndb/tools/delete_all.cpp
@@ -24,7 +24,11 @@
static int clear_table(Ndb* pNdb, const NdbDictionary::Table* pTab, int parallelism=240);
-static const char* opt_connect_str= 0;
+enum ndb_delete_all {
+ NDB_STD_OPTS_OPTIONS
+};
+NDB_STD_OPTS_VARS;
+
static const char* _dbname = "TEST_DB";
static struct my_option my_long_options[] =
{
diff --git a/ndb/tools/desc.cpp b/ndb/tools/desc.cpp
index c5e9efdfa8a..4bca51ee903 100644
--- a/ndb/tools/desc.cpp
+++ b/ndb/tools/desc.cpp
@@ -19,7 +19,11 @@
#include
#include
-static const char* opt_connect_str= 0;
+enum ndb_desc_options {
+ NDB_STD_OPTS_OPTIONS
+};
+NDB_STD_OPTS_VARS;
+
static const char* _dbname = "TEST_DB";
static int _unqualified = 0;
static struct my_option my_long_options[] =
diff --git a/ndb/tools/drop_index.cpp b/ndb/tools/drop_index.cpp
index 6600811e0c4..2b7f8c1bce9 100644
--- a/ndb/tools/drop_index.cpp
+++ b/ndb/tools/drop_index.cpp
@@ -21,7 +21,11 @@
#include
#include
-static const char* opt_connect_str= 0;
+enum ndb_drop_index_options {
+ NDB_STD_OPTS_OPTIONS
+};
+NDB_STD_OPTS_VARS;
+
static const char* _dbname = "TEST_DB";
static struct my_option my_long_options[] =
{
diff --git a/ndb/tools/drop_tab.cpp b/ndb/tools/drop_tab.cpp
index 0661a8c599b..2b0b6908449 100644
--- a/ndb/tools/drop_tab.cpp
+++ b/ndb/tools/drop_tab.cpp
@@ -21,7 +21,11 @@
#include
#include
-static const char* opt_connect_str= 0;
+enum ndb_drop_table_options {
+ NDB_STD_OPTS_OPTIONS
+};
+NDB_STD_OPTS_VARS;
+
static const char* _dbname = "TEST_DB";
static struct my_option my_long_options[] =
{
diff --git a/ndb/tools/listTables.cpp b/ndb/tools/listTables.cpp
index ccb6967e2dc..710af66f4de 100644
--- a/ndb/tools/listTables.cpp
+++ b/ndb/tools/listTables.cpp
@@ -161,13 +161,17 @@ list(const char * tabname,
}
}
-static const char* opt_connect_str= 0;
+enum ndb_show_tables_options {
+ NDB_STD_OPTS_OPTIONS
+};
+NDB_STD_OPTS_VARS;
+
static const char* _dbname = "TEST_DB";
static int _loops;
static int _type;
static struct my_option my_long_options[] =
{
- NDB_STD_OPTS("ndb_desc"),
+ NDB_STD_OPTS("ndb_show_tables"),
{ "database", 'd', "Name of database table is in",
(gptr*) &_dbname, (gptr*) &_dbname, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
diff --git a/ndb/tools/restore/restore_main.cpp b/ndb/tools/restore/restore_main.cpp
index ece2b2605b4..c24ed620b71 100644
--- a/ndb/tools/restore/restore_main.cpp
+++ b/ndb/tools/restore/restore_main.cpp
@@ -36,7 +36,10 @@ static Vector g_consumers;
static const char* ga_backupPath = "." DIR_SEPARATOR;
-static const char* opt_connect_str= NULL;
+enum ndb_restore_options {
+ NDB_STD_OPTS_OPTIONS
+};
+NDB_STD_OPTS_VARS;
/**
* print and restore flags
diff --git a/ndb/tools/select_all.cpp b/ndb/tools/select_all.cpp
index 5efeed485a4..9c65750094b 100644
--- a/ndb/tools/select_all.cpp
+++ b/ndb/tools/select_all.cpp
@@ -36,7 +36,11 @@ int scanReadRecords(Ndb*,
char delim,
bool orderby);
-static const char* opt_connect_str= 0;
+enum ndb_select_all_options {
+ NDB_STD_OPTS_OPTIONS
+};
+NDB_STD_OPTS_VARS;
+
static const char* _dbname = "TEST_DB";
static const char* _delimiter = "\t";
static int _unqualified, _header, _parallelism, _useHexFormat, _lock,
diff --git a/ndb/tools/select_count.cpp b/ndb/tools/select_count.cpp
index c3491f842d8..516eebda91d 100644
--- a/ndb/tools/select_count.cpp
+++ b/ndb/tools/select_count.cpp
@@ -32,7 +32,11 @@ select_count(Ndb* pNdb, const NdbDictionary::Table* pTab,
int* count_rows,
UtilTransactions::ScanLock lock);
-static const char* opt_connect_str= 0;
+enum ndb_select_count_options {
+ NDB_STD_OPTS_OPTIONS
+};
+NDB_STD_OPTS_VARS;
+
static const char* _dbname = "TEST_DB";
static int _parallelism = 240;
static int _lock = 0;
diff --git a/ndb/tools/waiter.cpp b/ndb/tools/waiter.cpp
index 5973b046f8f..4b86de36514 100644
--- a/ndb/tools/waiter.cpp
+++ b/ndb/tools/waiter.cpp
@@ -30,7 +30,11 @@ int
waitClusterStatus(const char* _addr, ndb_mgm_node_status _status,
unsigned int _timeout);
-static const char* opt_connect_str= 0;
+enum ndb_waiter_options {
+ NDB_STD_OPTS_OPTIONS
+};
+NDB_STD_OPTS_VARS;
+
static int _no_contact = 0;
static int _timeout = 120;
static struct my_option my_long_options[] =
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 029fe31ecf7..eb201ee6ef5 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -32,6 +32,10 @@
#include
#include
+// options from from mysqld.cc
+extern my_bool opt_ndb_optimized_node_selection;
+extern const char *opt_ndbcluster_connectstring;
+
// Default value for parallelism
static const int parallelism= 240;
@@ -39,9 +43,6 @@ static const int parallelism= 240;
// createable against NDB from this handler
static const int max_transactions= 256;
-// connectstring to cluster if given by mysqld
-const char *ndbcluster_connectstring= 0;
-
static const char *ha_ndb_ext=".ndb";
#define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
@@ -4233,15 +4234,19 @@ bool ndbcluster_init()
int res;
DBUG_ENTER("ndbcluster_init");
// Set connectstring if specified
- if (ndbcluster_connectstring != 0)
- DBUG_PRINT("connectstring", ("%s", ndbcluster_connectstring));
+ if (opt_ndbcluster_connectstring != 0)
+ DBUG_PRINT("connectstring", ("%s", opt_ndbcluster_connectstring));
if ((g_ndb_cluster_connection=
- new Ndb_cluster_connection(ndbcluster_connectstring)) == 0)
+ new Ndb_cluster_connection(opt_ndbcluster_connectstring)) == 0)
{
- DBUG_PRINT("error",("Ndb_cluster_connection(%s)",ndbcluster_connectstring));
+ DBUG_PRINT("error",("Ndb_cluster_connection(%s)",
+ opt_ndbcluster_connectstring));
goto ndbcluster_init_error;
}
+ g_ndb_cluster_connection->set_optimized_node_selection
+ (opt_ndb_optimized_node_selection);
+
// Create a Ndb object to open the connection to NDB
g_ndb= new Ndb(g_ndb_cluster_connection, "sys");
g_ndb->getDictionary()->set_local_table_data_size(sizeof(Ndb_table_local_info));
@@ -4256,7 +4261,7 @@ bool ndbcluster_init()
DBUG_PRINT("info",("NDBCLUSTER storage engine at %s on port %d",
g_ndb_cluster_connection->get_connected_host(),
g_ndb_cluster_connection->get_connected_port()));
- g_ndb->waitUntilReady(10);
+ g_ndb_cluster_connection->wait_until_ready(10,0);
}
else if(res == 1)
{
diff --git a/sql/item_func.cc b/sql/item_func.cc
index 3c565d0c466..7125f4704b8 100644
--- a/sql/item_func.cc
+++ b/sql/item_func.cc
@@ -775,9 +775,25 @@ longlong Item_func_neg::val_int()
void Item_func_neg::fix_length_and_dec()
{
+ enum Item_result arg_result= args[0]->result_type();
+ enum Item::Type arg_type= args[0]->type();
decimals=args[0]->decimals;
max_length=args[0]->max_length;
hybrid_type= REAL_RESULT;
+
+ /*
+ We need to account for added '-' in the following cases:
+ A) argument is a real or integer positive constant - in this case
+ argument's max_length is set to actual number of bytes occupied, and not
+ maximum number of bytes real or integer may require. Note that all
+ constants are non negative so we don't need to account for removed '-'.
+ B) argument returns a string.
+ */
+ if (arg_result == STRING_RESULT ||
+ (arg_type == REAL_ITEM && ((Item_real*)args[0])->value >= 0) ||
+ (arg_type == INT_ITEM && ((Item_int*)args[0])->value > 0))
+ max_length++;
+
if (args[0]->result_type() == INT_RESULT)
{
/*
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index e39c902444e..5cd85113641 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -53,6 +53,11 @@
#endif
#ifdef HAVE_NDBCLUSTER_DB
#define OPT_NDBCLUSTER_DEFAULT 0
+#ifdef NDB_SHM_TRANSPORTER
+#define OPT_NDB_SHM_DEFAULT 1
+#else
+#define OPT_NDB_SHM_DEFAULT 0
+#endif
#else
#define OPT_NDBCLUSTER_DEFAULT 0
#endif
@@ -285,6 +290,10 @@ my_bool opt_safe_user_create = 0, opt_no_mix_types = 0;
my_bool opt_show_slave_auth_info, opt_sql_bin_update = 0;
my_bool opt_log_slave_updates= 0;
my_bool opt_console= 0, opt_bdb, opt_innodb, opt_isam, opt_ndbcluster;
+#ifdef HAVE_NDBCLUSTER_DB
+const char *opt_ndbcluster_connectstring= 0;
+my_bool opt_ndb_shm, opt_ndb_optimized_node_selection;
+#endif
my_bool opt_readonly, use_temp_pool, relay_log_purge;
my_bool opt_sync_bdb_logs, opt_sync_frm;
my_bool opt_secure_auth= 0;
@@ -3998,6 +4007,7 @@ enum options_mysqld
OPT_INNODB, OPT_ISAM,
OPT_NDBCLUSTER, OPT_NDB_CONNECTSTRING, OPT_NDB_USE_EXACT_COUNT,
OPT_NDB_FORCE_SEND, OPT_NDB_AUTOINCREMENT_PREFETCH_SZ,
+ OPT_NDB_SHM, OPT_NDB_OPTIMIZED_NODE_SELECTION,
OPT_SKIP_SAFEMALLOC,
OPT_TEMP_POOL, OPT_TX_ISOLATION,
OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS,
@@ -4439,24 +4449,46 @@ Disable with --skip-ndbcluster (will save memory).",
#ifdef HAVE_NDBCLUSTER_DB
{"ndb-connectstring", OPT_NDB_CONNECTSTRING,
"Connect string for ndbcluster.",
- (gptr*) &ndbcluster_connectstring, (gptr*) &ndbcluster_connectstring,
+ (gptr*) &opt_ndbcluster_connectstring,
+ (gptr*) &opt_ndbcluster_connectstring,
0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
- {"ndb_autoincrement_prefetch_sz", OPT_NDB_AUTOINCREMENT_PREFETCH_SZ,
- "Specify number of autoincrement values that are prefetched",
+ {"ndb-autoincrement-prefetch-sz", OPT_NDB_AUTOINCREMENT_PREFETCH_SZ,
+ "Specify number of autoincrement values that are prefetched.",
(gptr*) &global_system_variables.ndb_autoincrement_prefetch_sz,
(gptr*) &global_system_variables.ndb_autoincrement_prefetch_sz,
0, GET_INT, REQUIRED_ARG, 32, 1, 256, 0, 0, 0},
- {"ndb_force_send", OPT_NDB_FORCE_SEND,
- "Force send of buffers to ndb immediately without waiting for other threads",
+ {"ndb-force-send", OPT_NDB_FORCE_SEND,
+ "Force send of buffers to ndb immediately without waiting for "
+ "other threads.",
(gptr*) &global_system_variables.ndb_force_send,
(gptr*) &global_system_variables.ndb_force_send,
0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
+ {"ndb_force_send", OPT_NDB_FORCE_SEND,
+ "same as --ndb-force-send.",
+ (gptr*) &global_system_variables.ndb_force_send,
+ (gptr*) &global_system_variables.ndb_force_send,
+ 0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
+ {"ndb-use-exact-count", OPT_NDB_USE_EXACT_COUNT,
+ "Use exact records count during query planning and for fast "
+ "select count(*), disable for faster queries.",
+ (gptr*) &global_system_variables.ndb_use_exact_count,
+ (gptr*) &global_system_variables.ndb_use_exact_count,
+ 0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
{"ndb_use_exact_count", OPT_NDB_USE_EXACT_COUNT,
- "Use exact records count during query planning and for "
- "fast select count(*)",
+ "same as --ndb-use-exact-count.",
(gptr*) &global_system_variables.ndb_use_exact_count,
(gptr*) &global_system_variables.ndb_use_exact_count,
0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
+ {"ndb-shm", OPT_NDB_SHM,
+ "Use shared memory connections when available.",
+ (gptr*) &opt_ndb_shm,
+ (gptr*) &opt_ndb_shm,
+ 0, GET_BOOL, OPT_ARG, OPT_NDB_SHM_DEFAULT, 0, 0, 0, 0, 0},
+ {"ndb-optimized-node-selection", OPT_NDB_OPTIMIZED_NODE_SELECTION,
+ "Select nodes for transactions in a more optimal way.",
+ (gptr*) &opt_ndb_optimized_node_selection,
+ (gptr*) &opt_ndb_optimized_node_selection,
+ 0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
#endif
{"new", 'n', "Use very new possible 'unsafe' functions.",
(gptr*) &global_system_variables.new_mode,