mirror of
https://github.com/MariaDB/server.git
synced 2025-11-15 09:02:33 +03:00
Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.1-new
into poseidon.ndb.mysql.com:/home/tomas/mysql-5.1-new-ndb
This commit is contained in:
@@ -18,6 +18,7 @@ select * from t2;
|
||||
ERROR 42S02: Table 'test.t2' doesn't exist
|
||||
show tables like 't2';
|
||||
Tables_in_test (t2)
|
||||
reset master;
|
||||
create table t2 (a int key) engine=ndbcluster;
|
||||
insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
|
||||
select * from t2 order by a limit 3;
|
||||
@@ -30,10 +31,12 @@ a
|
||||
1
|
||||
2
|
||||
3
|
||||
reset master;
|
||||
select * from t2;
|
||||
ERROR 42S02: Table 'test.t2' doesn't exist
|
||||
show tables like 't2';
|
||||
Tables_in_test (t2)
|
||||
reset master;
|
||||
create table t2 (a int key) engine=ndbcluster;
|
||||
insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
|
||||
select * from t2 order by a limit 3;
|
||||
@@ -46,4 +49,5 @@ a
|
||||
1
|
||||
2
|
||||
3
|
||||
reset master;
|
||||
drop table t2;
|
||||
|
||||
@@ -16,7 +16,7 @@ events_scheduling : BUG#19170 2006-04-26 andrey Test case of 19170 fails
|
||||
events_logs_tests : BUG#17619 2006-05-16 andrey Test case problems
|
||||
ndb_autodiscover : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t binlog
|
||||
ndb_autodiscover2 : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t binlog
|
||||
ndb_binlog_discover : BUG#19395 2006-04-28 tomas/knielsen mysqld does not always detect cluster shutdown
|
||||
#ndb_binlog_discover : BUG#19395 2006-04-28 tomas/knielsen mysqld does not always detect cluster shutdown
|
||||
#ndb_cache2 : BUG#18597 2006-03-28 brian simultaneous drop table and ndb statistics update triggers node failure
|
||||
#ndb_cache_multi2 : BUG#18597 2006-04-10 kent simultaneous drop table and ndb statistics update triggers node failure
|
||||
ndb_load : BUG#17233 2006-05-04 tomas failed load data from infile causes mysqld dbug_assert, binlog not flushed
|
||||
|
||||
@@ -43,6 +43,7 @@ select * from t2 order by a limit 3;
|
||||
--error ER_NO_SUCH_TABLE
|
||||
select * from t2;
|
||||
show tables like 't2';
|
||||
reset master;
|
||||
create table t2 (a int key) engine=ndbcluster;
|
||||
insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
|
||||
select * from t2 order by a limit 3;
|
||||
@@ -50,6 +51,7 @@ select * from t2 order by a limit 3;
|
||||
# server 1 should have a stale cache, and in this case wrong frm, transaction must be retried
|
||||
--connection server1
|
||||
select * from t2 order by a limit 3;
|
||||
reset master;
|
||||
|
||||
--exec $NDB_MGM --no-defaults -e "all restart -i" >> $NDB_TOOLS_OUTPUT
|
||||
--exec $NDB_TOOLS_DIR/ndb_waiter --no-defaults >> $NDB_TOOLS_OUTPUT
|
||||
@@ -60,6 +62,7 @@ select * from t2 order by a limit 3;
|
||||
--error ER_NO_SUCH_TABLE
|
||||
select * from t2;
|
||||
show tables like 't2';
|
||||
reset master;
|
||||
create table t2 (a int key) engine=ndbcluster;
|
||||
insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
|
||||
select * from t2 order by a limit 3;
|
||||
@@ -67,6 +70,7 @@ select * from t2 order by a limit 3;
|
||||
# server 2 should have a stale cache, but with right frm, transaction need not be retried
|
||||
--connection server2
|
||||
select * from t2 order by a limit 3;
|
||||
reset master;
|
||||
|
||||
drop table t2;
|
||||
# End of 4.1 tests
|
||||
|
||||
@@ -182,6 +182,8 @@ static const char * ndb_connected_host= 0;
|
||||
static long ndb_connected_port= 0;
|
||||
static long ndb_number_of_replicas= 0;
|
||||
long ndb_number_of_storage_nodes= 0;
|
||||
long ndb_number_of_ready_storage_nodes= 0;
|
||||
long ndb_connect_count= 0;
|
||||
|
||||
static int update_status_variables(Ndb_cluster_connection *c)
|
||||
{
|
||||
@@ -190,6 +192,8 @@ static int update_status_variables(Ndb_cluster_connection *c)
|
||||
ndb_connected_host= c->get_connected_host();
|
||||
ndb_number_of_replicas= 0;
|
||||
ndb_number_of_storage_nodes= c->no_db_nodes();
|
||||
ndb_number_of_ready_storage_nodes= c->get_no_ready();
|
||||
ndb_connect_count= c->get_connect_count();
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -7128,10 +7132,6 @@ void ndbcluster_real_free_share(NDB_SHARE **share)
|
||||
#ifndef DBUG_OFF
|
||||
bzero((gptr)(*share)->table_share, sizeof(*(*share)->table_share));
|
||||
bzero((gptr)(*share)->table, sizeof(*(*share)->table));
|
||||
#endif
|
||||
my_free((gptr) (*share)->table_share, MYF(0));
|
||||
my_free((gptr) (*share)->table, MYF(0));
|
||||
#ifndef DBUG_OFF
|
||||
(*share)->table_share= 0;
|
||||
(*share)->table= 0;
|
||||
#endif
|
||||
@@ -9361,11 +9361,15 @@ ndbcluster_show_status(THD* thd, stat_print_fn *stat_print,
|
||||
"cluster_node_id=%u, "
|
||||
"connected_host=%s, "
|
||||
"connected_port=%u, "
|
||||
"number_of_storage_nodes=%u",
|
||||
"number_of_storage_nodes=%u, "
|
||||
"number_of_ready_storage_nodes=%u, "
|
||||
"connect_count=%u",
|
||||
ndb_cluster_node_id,
|
||||
ndb_connected_host,
|
||||
ndb_connected_port,
|
||||
ndb_number_of_storage_nodes);
|
||||
ndb_number_of_storage_nodes,
|
||||
ndb_number_of_ready_storage_nodes,
|
||||
ndb_connect_count);
|
||||
if (stat_print(thd, ndbcluster_hton.name, strlen(ndbcluster_hton.name),
|
||||
"connection", strlen("connection"),
|
||||
buf, buflen))
|
||||
|
||||
@@ -113,6 +113,7 @@ typedef struct st_ndbcluster_share {
|
||||
char *old_names; // for rename table
|
||||
TABLE_SHARE *table_share;
|
||||
TABLE *table;
|
||||
byte *record[2]; // pointer to allocated records for receiving data
|
||||
NdbValue *ndb_value[2];
|
||||
MY_BITMAP *subscriber_bitmap;
|
||||
#endif
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
#include "slave.h"
|
||||
#include "ha_ndbcluster_binlog.h"
|
||||
#include "NdbDictionary.hpp"
|
||||
#include <util/NdbAutoPtr.hpp>
|
||||
|
||||
#ifdef ndb_dynamite
|
||||
#undef assert
|
||||
@@ -265,7 +266,8 @@ ndbcluster_binlog_close_table(THD *thd, NDB_SHARE *share)
|
||||
|
||||
static int
|
||||
ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
|
||||
TABLE_SHARE *table_share, TABLE *table)
|
||||
TABLE_SHARE *table_share, TABLE *table,
|
||||
int reopen)
|
||||
{
|
||||
int error;
|
||||
DBUG_ENTER("ndbcluster_binlog_open_table");
|
||||
@@ -278,27 +280,34 @@ ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
|
||||
share->key, error);
|
||||
DBUG_PRINT("error", ("open_table_def failed %d", error));
|
||||
free_table_share(table_share);
|
||||
my_free((gptr) table_share, MYF(0));
|
||||
my_free((gptr) table, MYF(0));
|
||||
DBUG_RETURN(error);
|
||||
}
|
||||
if ((error= open_table_from_share(thd, table_share, "", 0,
|
||||
if ((error= open_table_from_share(thd, table_share, "", 0 /* fon't allocate buffers */,
|
||||
(uint) READ_ALL, 0, table, FALSE)))
|
||||
{
|
||||
sql_print_error("Unable to open table for %s, error=%d(%d)",
|
||||
share->key, error, my_errno);
|
||||
DBUG_PRINT("error", ("open_table_from_share failed %d", error));
|
||||
free_table_share(table_share);
|
||||
my_free((gptr) table_share, MYF(0));
|
||||
my_free((gptr) table, MYF(0));
|
||||
DBUG_RETURN(error);
|
||||
}
|
||||
assign_new_table_id(table_share);
|
||||
if (!table->record[1] || table->record[1] == table->record[0])
|
||||
|
||||
if (!reopen)
|
||||
{
|
||||
table->record[1]= alloc_root(&table->mem_root,
|
||||
table->s->rec_buff_length);
|
||||
// allocate memory on ndb share so it can be reused after online alter table
|
||||
share->record[0]= (byte*) alloc_root(&share->mem_root, table->s->rec_buff_length);
|
||||
share->record[1]= (byte*) alloc_root(&share->mem_root, table->s->rec_buff_length);
|
||||
}
|
||||
{
|
||||
my_ptrdiff_t row_offset= share->record[0] - table->record[0];
|
||||
Field **p_field;
|
||||
for (p_field= table->field; *p_field; p_field++)
|
||||
(*p_field)->move_field_offset(row_offset);
|
||||
table->record[0]= share->record[0];
|
||||
table->record[1]= share->record[1];
|
||||
}
|
||||
|
||||
table->in_use= injector_thd;
|
||||
|
||||
table->s->db.str= share->db;
|
||||
@@ -366,10 +375,9 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
|
||||
while (1)
|
||||
{
|
||||
int error;
|
||||
TABLE_SHARE *table_share=
|
||||
(TABLE_SHARE *) my_malloc(sizeof(*table_share), MYF(MY_WME));
|
||||
TABLE *table= (TABLE*) my_malloc(sizeof(*table), MYF(MY_WME));
|
||||
if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table)))
|
||||
TABLE_SHARE *table_share= (TABLE_SHARE *) alloc_root(mem_root, sizeof(*table_share));
|
||||
TABLE *table= (TABLE*) alloc_root(mem_root, sizeof(*table));
|
||||
if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table, 0)))
|
||||
break;
|
||||
/*
|
||||
! do not touch the contents of the table
|
||||
@@ -1535,6 +1543,10 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
|
||||
sql_print_information("NDB: Failed write frm for %s.%s, error %d",
|
||||
dbname, tabname, error);
|
||||
}
|
||||
|
||||
// copy names as memory will be freed
|
||||
NdbAutoPtr<char> a1((char *)(dbname= strdup(dbname)));
|
||||
NdbAutoPtr<char> a2((char *)(tabname= strdup(tabname)));
|
||||
ndbcluster_binlog_close_table(thd, share);
|
||||
|
||||
TABLE_LIST table_list;
|
||||
@@ -1544,9 +1556,15 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
|
||||
close_cached_tables(thd, 0, &table_list, TRUE);
|
||||
|
||||
if ((error= ndbcluster_binlog_open_table(thd, share,
|
||||
table_share, table)))
|
||||
table_share, table, 1)))
|
||||
sql_print_information("NDB: Failed to re-open table %s.%s",
|
||||
dbname, tabname);
|
||||
|
||||
table= share->table;
|
||||
table_share= share->table_share;
|
||||
dbname= table_share->db.str;
|
||||
tabname= table_share->table_name.str;
|
||||
|
||||
pthread_mutex_unlock(&LOCK_open);
|
||||
}
|
||||
my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
|
||||
@@ -1776,7 +1794,8 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
|
||||
break;
|
||||
case NDBEVENT::TE_CLUSTER_FAILURE:
|
||||
if (ndb_extra_logging)
|
||||
sql_print_information("NDB Binlog: cluster failure for %s.", schema_share->key);
|
||||
sql_print_information("NDB Binlog: cluster failure for %s at epoch %u.",
|
||||
schema_share->key, (unsigned) pOp->getGCI());
|
||||
// fall through
|
||||
case NDBEVENT::TE_DROP:
|
||||
if (ndb_extra_logging &&
|
||||
@@ -1785,7 +1804,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
|
||||
"read only on reconnect.");
|
||||
free_share(&schema_share);
|
||||
schema_share= 0;
|
||||
ndb_binlog_tables_inited= FALSE;
|
||||
close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, FALSE);
|
||||
// fall through
|
||||
case NDBEVENT::TE_ALTER:
|
||||
@@ -2829,7 +2847,8 @@ ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
|
||||
{
|
||||
case NDBEVENT::TE_CLUSTER_FAILURE:
|
||||
if (ndb_extra_logging)
|
||||
sql_print_information("NDB Binlog: cluster failure for %s.", share->key);
|
||||
sql_print_information("NDB Binlog: cluster failure for %s at epoch %u.",
|
||||
share->key, (unsigned) pOp->getGCI());
|
||||
if (apply_status_share == share)
|
||||
{
|
||||
if (ndb_extra_logging &&
|
||||
@@ -2838,7 +2857,6 @@ ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
|
||||
"read only on reconnect.");
|
||||
free_share(&apply_status_share);
|
||||
apply_status_share= 0;
|
||||
ndb_binlog_tables_inited= FALSE;
|
||||
}
|
||||
DBUG_PRINT("info", ("CLUSTER FAILURE EVENT: "
|
||||
"%s received share: 0x%lx op: %lx share op: %lx "
|
||||
@@ -2854,7 +2872,6 @@ ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
|
||||
"read only on reconnect.");
|
||||
free_share(&apply_status_share);
|
||||
apply_status_share= 0;
|
||||
ndb_binlog_tables_inited= FALSE;
|
||||
}
|
||||
/* ToDo: remove printout */
|
||||
if (ndb_extra_logging)
|
||||
@@ -3267,46 +3284,43 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
|
||||
pthread_mutex_unlock(&injector_mutex);
|
||||
pthread_cond_signal(&injector_cond);
|
||||
|
||||
thd->proc_info= "Waiting for ndbcluster to start";
|
||||
|
||||
pthread_mutex_lock(&injector_mutex);
|
||||
while (!schema_share ||
|
||||
(ndb_binlog_running && !apply_status_share))
|
||||
{
|
||||
/* ndb not connected yet */
|
||||
struct timespec abstime;
|
||||
set_timespec(abstime, 1);
|
||||
pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
|
||||
if (abort_loop)
|
||||
{
|
||||
pthread_mutex_unlock(&injector_mutex);
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&injector_mutex);
|
||||
|
||||
restart:
|
||||
/*
|
||||
Main NDB Injector loop
|
||||
*/
|
||||
{
|
||||
thd->proc_info= "Waiting for ndbcluster to start";
|
||||
|
||||
DBUG_ASSERT(ndbcluster_hton.slot != ~(uint)0);
|
||||
if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
|
||||
{
|
||||
sql_print_error("Could not allocate Thd_ndb object");
|
||||
goto err;
|
||||
}
|
||||
set_thd_ndb(thd, thd_ndb);
|
||||
thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
|
||||
thd->query_id= 0; // to keep valgrind quiet
|
||||
{
|
||||
static char db[]= "";
|
||||
thd->db= db;
|
||||
if (ndb_binlog_running)
|
||||
open_binlog_index(thd, &binlog_tables, &binlog_index);
|
||||
thd->db= db;
|
||||
pthread_mutex_lock(&injector_mutex);
|
||||
while (!schema_share ||
|
||||
(ndb_binlog_running && !apply_status_share))
|
||||
{
|
||||
/* ndb not connected yet */
|
||||
struct timespec abstime;
|
||||
set_timespec(abstime, 1);
|
||||
pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
|
||||
if (abort_loop)
|
||||
{
|
||||
pthread_mutex_unlock(&injector_mutex);
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&injector_mutex);
|
||||
|
||||
if (thd_ndb == NULL)
|
||||
{
|
||||
DBUG_ASSERT(ndbcluster_hton.slot != ~(uint)0);
|
||||
if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
|
||||
{
|
||||
sql_print_error("Could not allocate Thd_ndb object");
|
||||
goto err;
|
||||
}
|
||||
set_thd_ndb(thd, thd_ndb);
|
||||
thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
|
||||
thd->query_id= 0; // to keep valgrind quiet
|
||||
}
|
||||
}
|
||||
|
||||
restart:
|
||||
{
|
||||
// wait for the first event
|
||||
thd->proc_info= "Waiting for first event from ndbcluster";
|
||||
@@ -3321,6 +3335,9 @@ restart:
|
||||
DBUG_PRINT("info", ("schema_res: %d schema_gci: %d", schema_res, schema_gci));
|
||||
if (schema_res > 0)
|
||||
{
|
||||
i_ndb->pollEvents(0);
|
||||
i_ndb->flushIncompleteEvents(schema_gci);
|
||||
s_ndb->flushIncompleteEvents(schema_gci);
|
||||
if (schema_gci < ndb_latest_handled_binlog_epoch)
|
||||
{
|
||||
sql_print_error("NDB Binlog: cluster has been restarted --initial or with older filesystem. "
|
||||
@@ -3334,7 +3351,13 @@ restart:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
static char db[]= "";
|
||||
thd->db= db;
|
||||
if (ndb_binlog_running)
|
||||
open_binlog_index(thd, &binlog_tables, &binlog_index);
|
||||
thd->db= db;
|
||||
}
|
||||
do_ndbcluster_binlog_close_connection= BCCC_running;
|
||||
for ( ; !((abort_loop || do_ndbcluster_binlog_close_connection) &&
|
||||
ndb_latest_handled_binlog_epoch >= g_latest_trans_gci) &&
|
||||
@@ -3683,7 +3706,12 @@ restart:
|
||||
ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
|
||||
}
|
||||
if (do_ndbcluster_binlog_close_connection == BCCC_restart)
|
||||
{
|
||||
ndb_binlog_tables_inited= FALSE;
|
||||
close_thread_tables(thd);
|
||||
binlog_index= 0;
|
||||
goto restart;
|
||||
}
|
||||
err:
|
||||
DBUG_PRINT("info",("Shutting down cluster binlog thread"));
|
||||
thd->proc_info= "Shutting down";
|
||||
|
||||
@@ -1262,6 +1262,7 @@ public:
|
||||
|
||||
|
||||
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
|
||||
int flushIncompleteEvents(Uint64 gci);
|
||||
NdbEventOperation *getEventOperation(NdbEventOperation* eventOp= 0);
|
||||
Uint64 getLatestGCI();
|
||||
void forceGCP();
|
||||
|
||||
@@ -1132,7 +1132,8 @@ public:
|
||||
_TE_NODE_FAILURE=10,
|
||||
_TE_SUBSCRIBE=11,
|
||||
_TE_UNSUBSCRIBE=12,
|
||||
_TE_NUL=13 // internal (e.g. INS o DEL within same GCI)
|
||||
_TE_NUL=13, // internal (e.g. INS o DEL within same GCI)
|
||||
_TE_ACTIVE=14 // internal (node becomes active)
|
||||
};
|
||||
#endif
|
||||
/**
|
||||
|
||||
@@ -106,6 +106,7 @@ public:
|
||||
void stopSessions(bool wait = false);
|
||||
|
||||
void foreachSession(void (*f)(Session*, void*), void *data);
|
||||
void checkSessions();
|
||||
|
||||
private:
|
||||
struct SessionInstance {
|
||||
@@ -117,12 +118,13 @@ private:
|
||||
Service * m_service;
|
||||
NDB_SOCKET_TYPE m_socket;
|
||||
};
|
||||
MutexVector<SessionInstance> m_sessions;
|
||||
NdbLockable m_session_mutex;
|
||||
Vector<SessionInstance> m_sessions;
|
||||
MutexVector<ServiceInstance> m_services;
|
||||
unsigned m_maxSessions;
|
||||
|
||||
void doAccept();
|
||||
void checkSessions();
|
||||
void checkSessionsImpl();
|
||||
void startSession(SessionInstance &);
|
||||
|
||||
/**
|
||||
|
||||
@@ -184,9 +184,12 @@ SocketServer::doAccept(){
|
||||
SessionInstance s;
|
||||
s.m_service = si.m_service;
|
||||
s.m_session = si.m_service->newSession(childSock);
|
||||
if(s.m_session != 0){
|
||||
if(s.m_session != 0)
|
||||
{
|
||||
m_session_mutex.lock();
|
||||
m_sessions.push_back(s);
|
||||
startSession(m_sessions.back());
|
||||
m_session_mutex.unlock();
|
||||
}
|
||||
|
||||
continue;
|
||||
@@ -240,10 +243,13 @@ void
|
||||
SocketServer::doRun(){
|
||||
|
||||
while(!m_stopThread){
|
||||
checkSessions();
|
||||
m_session_mutex.lock();
|
||||
checkSessionsImpl();
|
||||
if(m_sessions.size() < m_maxSessions){
|
||||
m_session_mutex.unlock();
|
||||
doAccept();
|
||||
} else {
|
||||
m_session_mutex.unlock();
|
||||
NdbSleep_MilliSleep(200);
|
||||
}
|
||||
}
|
||||
@@ -276,17 +282,30 @@ transfer(NDB_SOCKET_TYPE sock){
|
||||
void
|
||||
SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *), void *data)
|
||||
{
|
||||
m_session_mutex.lock();
|
||||
for(int i = m_sessions.size() - 1; i >= 0; i--){
|
||||
(*func)(m_sessions[i].m_session, data);
|
||||
}
|
||||
checkSessions();
|
||||
m_session_mutex.unlock();
|
||||
}
|
||||
|
||||
void
|
||||
SocketServer::checkSessions(){
|
||||
for(int i = m_sessions.size() - 1; i >= 0; i--){
|
||||
if(m_sessions[i].m_session->m_stopped){
|
||||
if(m_sessions[i].m_thread != 0){
|
||||
SocketServer::checkSessions()
|
||||
{
|
||||
m_session_mutex.lock();
|
||||
checkSessionsImpl();
|
||||
m_session_mutex.unlock();
|
||||
}
|
||||
|
||||
void
|
||||
SocketServer::checkSessionsImpl()
|
||||
{
|
||||
for(int i = m_sessions.size() - 1; i >= 0; i--)
|
||||
{
|
||||
if(m_sessions[i].m_session->m_stopped)
|
||||
{
|
||||
if(m_sessions[i].m_thread != 0)
|
||||
{
|
||||
void* ret;
|
||||
NdbThread_WaitFor(m_sessions[i].m_thread, &ret);
|
||||
NdbThread_Destroy(&m_sessions[i].m_thread);
|
||||
@@ -301,19 +320,26 @@ SocketServer::checkSessions(){
|
||||
void
|
||||
SocketServer::stopSessions(bool wait){
|
||||
int i;
|
||||
m_session_mutex.lock();
|
||||
for(i = m_sessions.size() - 1; i>=0; i--)
|
||||
{
|
||||
m_sessions[i].m_session->stopSession();
|
||||
m_sessions[i].m_session->m_stop = true; // to make sure
|
||||
}
|
||||
m_session_mutex.unlock();
|
||||
|
||||
for(i = m_services.size() - 1; i>=0; i--)
|
||||
m_services[i].m_service->stopSessions();
|
||||
|
||||
if(wait){
|
||||
m_session_mutex.lock();
|
||||
while(m_sessions.size() > 0){
|
||||
checkSessions();
|
||||
checkSessionsImpl();
|
||||
m_session_mutex.unlock();
|
||||
NdbSleep_MilliSleep(100);
|
||||
m_session_mutex.lock();
|
||||
}
|
||||
m_session_mutex.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -348,4 +374,4 @@ sessionThread_C(void* _sc){
|
||||
}
|
||||
|
||||
template class MutexVector<SocketServer::ServiceInstance>;
|
||||
template class MutexVector<SocketServer::SessionInstance>;
|
||||
template class Vector<SocketServer::SessionInstance>;
|
||||
|
||||
@@ -2649,6 +2649,22 @@ Suma::reportAllSubscribers(Signal *signal,
|
||||
SubscriptionPtr subPtr,
|
||||
SubscriberPtr subbPtr)
|
||||
{
|
||||
SubTableData * data = (SubTableData*)signal->getDataPtrSend();
|
||||
|
||||
if (table_event == NdbDictionary::Event::_TE_SUBSCRIBE)
|
||||
{
|
||||
data->gci = m_last_complete_gci + 1;
|
||||
data->tableId = subPtr.p->m_tableId;
|
||||
data->operation = NdbDictionary::Event::_TE_ACTIVE;
|
||||
data->ndbd_nodeid = refToNode(reference());
|
||||
data->changeMask = 0;
|
||||
data->totalLen = 0;
|
||||
data->req_nodeid = refToNode(subbPtr.p->m_senderRef);
|
||||
data->senderData = subbPtr.p->m_senderData;
|
||||
sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
|
||||
SubTableData::SignalLength, JBB);
|
||||
}
|
||||
|
||||
if (!(subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE))
|
||||
{
|
||||
return;
|
||||
@@ -2663,7 +2679,6 @@ Suma::reportAllSubscribers(Signal *signal,
|
||||
ndbout_c("reportAllSubscribers subPtr.i: %d subPtr.p->n_subscribers: %d",
|
||||
subPtr.i, subPtr.p->n_subscribers);
|
||||
//#endif
|
||||
SubTableData * data = (SubTableData*)signal->getDataPtrSend();
|
||||
data->gci = m_last_complete_gci + 1;
|
||||
data->tableId = subPtr.p->m_tableId;
|
||||
data->operation = table_event;
|
||||
|
||||
@@ -502,6 +502,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &,
|
||||
ps.tick= tick;
|
||||
m_mgmsrv.get_socket_server()->
|
||||
foreachSession(stop_session_if_timed_out,&ps);
|
||||
m_mgmsrv.get_socket_server()->checkSessions();
|
||||
error_string = "";
|
||||
continue;
|
||||
}
|
||||
@@ -1559,6 +1560,7 @@ MgmApiSession::purge_stale_sessions(Parser_t::Context &ctx,
|
||||
ps.free_nodes.bitXORC(NodeBitmask()); // invert connected_nodes to get free nodes
|
||||
|
||||
m_mgmsrv.get_socket_server()->foreachSession(stop_session_if_not_connected,&ps);
|
||||
m_mgmsrv.get_socket_server()->checkSessions();
|
||||
|
||||
m_output->println("purge stale sessions reply");
|
||||
if (str.length() > 0)
|
||||
|
||||
@@ -1324,6 +1324,12 @@ Ndb::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
|
||||
return theEventBuffer->pollEvents(aMillisecondNumber, latestGCI);
|
||||
}
|
||||
|
||||
int
|
||||
Ndb::flushIncompleteEvents(Uint64 gci)
|
||||
{
|
||||
return theEventBuffer->flushIncompleteEvents(gci);
|
||||
}
|
||||
|
||||
NdbEventOperation *Ndb::nextEvent()
|
||||
{
|
||||
return theEventBuffer->nextEvent();
|
||||
|
||||
@@ -3002,63 +3002,6 @@ NdbDictionaryImpl::removeCachedObject(NdbTableImpl & impl)
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
/*****************************************************************
|
||||
* Get index info
|
||||
*/
|
||||
NdbIndexImpl*
|
||||
NdbDictionaryImpl::getIndexImpl(const char * externalName,
|
||||
const BaseString& internalName)
|
||||
{
|
||||
ASSERT_NOT_MYSQLD;
|
||||
Ndb_local_table_info * info = get_local_table_info(internalName);
|
||||
if(info == 0){
|
||||
m_error.code = 4243;
|
||||
return 0;
|
||||
}
|
||||
NdbTableImpl * tab = info->m_table_impl;
|
||||
|
||||
if(tab->m_indexType == NdbDictionary::Object::TypeUndefined)
|
||||
{
|
||||
// Not an index
|
||||
m_error.code = 4243;
|
||||
return 0;
|
||||
}
|
||||
|
||||
NdbTableImpl* prim = getTable(tab->m_primaryTable.c_str());
|
||||
if(prim == 0){
|
||||
m_error.code = 4243;
|
||||
return 0;
|
||||
}
|
||||
|
||||
return getIndexImpl(externalName, internalName, *tab, *prim);
|
||||
}
|
||||
|
||||
NdbIndexImpl*
|
||||
NdbDictionaryImpl::getIndexImpl(const char * externalName,
|
||||
const BaseString& internalName,
|
||||
NdbTableImpl &tab,
|
||||
NdbTableImpl &prim)
|
||||
{
|
||||
DBUG_ENTER("NdbDictionaryImpl::getIndexImpl");
|
||||
DBUG_ASSERT(tab.m_indexType != NdbDictionary::Object::TypeUndefined);
|
||||
/**
|
||||
* Create index impl
|
||||
*/
|
||||
NdbIndexImpl* idx;
|
||||
if(NdbDictInterface::create_index_obj_from_table(&idx, &tab, &prim) == 0){
|
||||
idx->m_table = &tab;
|
||||
idx->m_externalName.assign(externalName);
|
||||
idx->m_internalName.assign(internalName);
|
||||
idx->m_table_id = prim.getObjectId();
|
||||
idx->m_table_version = prim.getObjectVersion();
|
||||
// TODO Assign idx to tab->m_index
|
||||
// Don't do it right now since assign can't asign a table with index
|
||||
// tab->m_index = idx;
|
||||
DBUG_RETURN(idx);
|
||||
}
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
int
|
||||
NdbDictInterface::create_index_obj_from_table(NdbIndexImpl** dst,
|
||||
NdbTableImpl* tab,
|
||||
@@ -3116,6 +3059,9 @@ NdbDictInterface::create_index_obj_from_table(NdbIndexImpl** dst,
|
||||
tab->m_columns[i]->m_distributionKey = 0;
|
||||
}
|
||||
|
||||
idx->m_table_id = prim->getObjectId();
|
||||
idx->m_table_version = prim->getObjectVersion();
|
||||
|
||||
* dst = idx;
|
||||
DBUG_PRINT("exit", ("m_id: %d m_version: %d", idx->m_id, idx->m_version));
|
||||
DBUG_RETURN(0);
|
||||
|
||||
@@ -617,6 +617,7 @@ public:
|
||||
get_local_table_info(const BaseString& internalTableName);
|
||||
NdbIndexImpl * getIndex(const char * indexName,
|
||||
const char * tableName);
|
||||
NdbIndexImpl * getIndex(const char * indexName, const NdbTableImpl& prim);
|
||||
NdbEventImpl * getEvent(const char * eventName, NdbTableImpl* = NULL);
|
||||
NdbEventImpl * getBlobEvent(const NdbEventImpl& ev, uint col_no);
|
||||
NdbEventImpl * getEventImpl(const char * internalName);
|
||||
@@ -958,51 +959,36 @@ NdbDictionaryImpl::get_local_table_info(const BaseString& internalTableName)
|
||||
DBUG_RETURN(info); // autoincrement already initialized
|
||||
}
|
||||
|
||||
class InitIndexGlobal : public GlobalCacheInitObject
|
||||
{
|
||||
public:
|
||||
const char *m_index_name;
|
||||
NdbTableImpl &m_prim;
|
||||
|
||||
InitIndexGlobal(NdbDictionaryImpl *dict,
|
||||
const BaseString &internal_indexname,
|
||||
const char *index_name,
|
||||
NdbTableImpl &prim) :
|
||||
GlobalCacheInitObject(dict, internal_indexname),
|
||||
m_index_name(index_name),
|
||||
m_prim(prim)
|
||||
{}
|
||||
int init(NdbTableImpl &tab) const
|
||||
{
|
||||
tab.m_index= m_dict->getIndexImpl(m_index_name, m_name, tab, m_prim);
|
||||
if (tab.m_index == 0)
|
||||
return 1;
|
||||
tab.m_index->m_table= &tab;
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
class InitIndex : public GlobalCacheInitObject
|
||||
{
|
||||
public:
|
||||
const char *m_index_name;
|
||||
const NdbTableImpl &m_prim;
|
||||
|
||||
InitIndex(NdbDictionaryImpl *dict,
|
||||
const BaseString &internal_indexname,
|
||||
const char *index_name) :
|
||||
GlobalCacheInitObject(dict, internal_indexname),
|
||||
m_index_name(index_name)
|
||||
{}
|
||||
int init(NdbTableImpl &tab) const
|
||||
{
|
||||
DBUG_ASSERT(tab.m_index == 0);
|
||||
tab.m_index= m_dict->getIndexImpl(m_index_name, m_name);
|
||||
if (tab.m_index)
|
||||
InitIndex(const BaseString &internal_indexname,
|
||||
const char *index_name,
|
||||
const NdbTableImpl &prim) :
|
||||
GlobalCacheInitObject(0, internal_indexname),
|
||||
m_index_name(index_name),
|
||||
m_prim(prim)
|
||||
{}
|
||||
|
||||
int init(NdbTableImpl &tab) const {
|
||||
DBUG_ENTER("InitIndex::init");
|
||||
DBUG_ASSERT(tab.m_indexType != NdbDictionary::Object::TypeUndefined);
|
||||
/**
|
||||
* Create index impl
|
||||
*/
|
||||
NdbIndexImpl* idx;
|
||||
if(NdbDictInterface::create_index_obj_from_table(&idx, &tab, &m_prim) == 0)
|
||||
{
|
||||
tab.m_index->m_table= &tab;
|
||||
return 0;
|
||||
idx->m_table = &tab;
|
||||
idx->m_externalName.assign(m_index_name);
|
||||
idx->m_internalName.assign(m_name);
|
||||
tab.m_index = idx;
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
return 1;
|
||||
DBUG_RETURN(1);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1019,14 +1005,14 @@ NdbDictionaryImpl::getIndexGlobal(const char * index_name,
|
||||
while (retry)
|
||||
{
|
||||
NdbTableImpl *tab=
|
||||
fetchGlobalTableImplRef(InitIndexGlobal(this, internal_indexname,
|
||||
index_name, ndbtab));
|
||||
fetchGlobalTableImplRef(InitIndex(internal_indexname,
|
||||
index_name, ndbtab));
|
||||
if (tab)
|
||||
{
|
||||
// tab->m_index sould be set. otherwise tab == 0
|
||||
NdbIndexImpl *idx= tab->m_index;
|
||||
if (idx->m_table_id != ndbtab.getObjectId() ||
|
||||
idx->m_table_version != ndbtab.getObjectVersion())
|
||||
if (idx->m_table_id != (unsigned)ndbtab.getObjectId() ||
|
||||
idx->m_table_version != (unsigned)ndbtab.getObjectVersion())
|
||||
{
|
||||
releaseIndexGlobal(*idx, 1);
|
||||
retry--;
|
||||
@@ -1067,41 +1053,54 @@ NdbIndexImpl *
|
||||
NdbDictionaryImpl::getIndex(const char * index_name,
|
||||
const char * table_name)
|
||||
{
|
||||
while (table_name || m_ndb.usingFullyQualifiedNames())
|
||||
if (table_name == 0)
|
||||
{
|
||||
const BaseString internal_indexname(
|
||||
(table_name)
|
||||
?
|
||||
m_ndb.internalize_index_name(getTable(table_name), index_name)
|
||||
:
|
||||
m_ndb.internalize_table_name(index_name)); // Index is also a table
|
||||
|
||||
if (internal_indexname.length())
|
||||
{
|
||||
Ndb_local_table_info *info= m_localHash.get(internal_indexname.c_str());
|
||||
NdbTableImpl *tab;
|
||||
if (info == 0)
|
||||
{
|
||||
tab= fetchGlobalTableImplRef(InitIndex(this, internal_indexname,
|
||||
index_name));
|
||||
if (tab)
|
||||
{
|
||||
info= Ndb_local_table_info::create(tab, 0);
|
||||
if (info)
|
||||
m_localHash.put(internal_indexname.c_str(), info);
|
||||
else
|
||||
break;
|
||||
}
|
||||
else
|
||||
break;
|
||||
}
|
||||
else
|
||||
tab= info->m_table_impl;
|
||||
return tab->m_index;
|
||||
}
|
||||
break;
|
||||
assert(0);
|
||||
m_error.code= 4243;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
NdbTableImpl* prim = getTable(table_name);
|
||||
if (prim == 0)
|
||||
{
|
||||
m_error.code= 4243;
|
||||
return 0;
|
||||
}
|
||||
|
||||
return getIndex(index_name, *prim);
|
||||
}
|
||||
|
||||
inline
|
||||
NdbIndexImpl *
|
||||
NdbDictionaryImpl::getIndex(const char* index_name,
|
||||
const NdbTableImpl& prim)
|
||||
{
|
||||
|
||||
const BaseString
|
||||
internal_indexname(m_ndb.internalize_index_name(&prim, index_name));
|
||||
|
||||
Ndb_local_table_info *info= m_localHash.get(internal_indexname.c_str());
|
||||
NdbTableImpl *tab;
|
||||
if (info == 0)
|
||||
{
|
||||
tab= fetchGlobalTableImplRef(InitIndex(internal_indexname,
|
||||
index_name,
|
||||
prim));
|
||||
if (!tab)
|
||||
goto err;
|
||||
|
||||
info= Ndb_local_table_info::create(tab, 0);
|
||||
if (!info)
|
||||
goto err;
|
||||
m_localHash.put(internal_indexname.c_str(), info);
|
||||
}
|
||||
else
|
||||
tab= info->m_table_impl;
|
||||
|
||||
return tab->m_index;
|
||||
|
||||
err:
|
||||
m_error.code= 4243;
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -153,11 +153,14 @@ NdbEventOperationImpl::init(NdbEventImpl& evnt)
|
||||
|
||||
m_state= EO_CREATED;
|
||||
|
||||
m_node_bit_mask.clear();
|
||||
#ifdef ndb_event_stores_merge_events_flag
|
||||
m_mergeEvents = m_eventImpl->m_mergeEvents;
|
||||
#else
|
||||
m_mergeEvents = false;
|
||||
m_mergeEvents = false;
|
||||
#endif
|
||||
m_ref_count = 0;
|
||||
DBUG_PRINT("info", ("m_ref_count = 0 for op: %p", this));
|
||||
|
||||
m_has_error= 0;
|
||||
|
||||
@@ -530,7 +533,11 @@ NdbEventOperationImpl::execute_nolock()
|
||||
}
|
||||
}
|
||||
if (r == 0)
|
||||
{
|
||||
m_ref_count++;
|
||||
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
}
|
||||
//Error
|
||||
m_state= EO_ERROR;
|
||||
@@ -657,80 +664,79 @@ NdbEventOperationImpl::execSUB_TABLE_DATA(NdbApiSignal * signal,
|
||||
int
|
||||
NdbEventOperationImpl::receive_event()
|
||||
{
|
||||
DBUG_ENTER_EVENT("NdbEventOperationImpl::receive_event");
|
||||
|
||||
Uint32 operation= (Uint32)m_data_item->sdata->operation;
|
||||
DBUG_PRINT_EVENT("info",("sdata->operation %u",operation));
|
||||
|
||||
if (operation == NdbDictionary::Event::_TE_ALTER)
|
||||
{
|
||||
// Parse the new table definition and
|
||||
// create a table object
|
||||
NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
|
||||
NdbDictionaryImpl *dict = & NdbDictionaryImpl::getImpl(*myDict);
|
||||
NdbError error;
|
||||
NdbDictInterface dif(error);
|
||||
NdbTableImpl *at;
|
||||
m_change_mask = m_data_item->sdata->changeMask;
|
||||
error.code = dif.parseTableInfo(&at,
|
||||
(Uint32*)m_buffer.get_data(),
|
||||
m_buffer.length() / 4,
|
||||
true);
|
||||
m_buffer.clear();
|
||||
if (at)
|
||||
at->buildColumnHash();
|
||||
else
|
||||
{
|
||||
DBUG_PRINT_EVENT("info", ("Failed to parse DictTabInfo error %u",
|
||||
error.code));
|
||||
DBUG_RETURN_EVENT(1);
|
||||
}
|
||||
|
||||
NdbTableImpl *tmp_table_impl= m_eventImpl->m_tableImpl;
|
||||
m_eventImpl->m_tableImpl = at;
|
||||
|
||||
DBUG_PRINT("info", ("switching table impl 0x%x -> 0x%x",
|
||||
tmp_table_impl, at));
|
||||
|
||||
// change the rec attrs to refer to the new table object
|
||||
int i;
|
||||
for (i = 0; i < 2; i++)
|
||||
{
|
||||
NdbRecAttr *p = theFirstPkAttrs[i];
|
||||
while (p)
|
||||
{
|
||||
int no = p->getColumn()->getColumnNo();
|
||||
NdbColumnImpl *tAttrInfo = at->getColumn(no);
|
||||
DBUG_PRINT("info", ("rec_attr: 0x%x "
|
||||
"switching column impl 0x%x -> 0x%x",
|
||||
p, p->m_column, tAttrInfo));
|
||||
p->m_column = tAttrInfo;
|
||||
p = p->next();
|
||||
}
|
||||
}
|
||||
for (i = 0; i < 2; i++)
|
||||
{
|
||||
NdbRecAttr *p = theFirstDataAttrs[i];
|
||||
while (p)
|
||||
{
|
||||
int no = p->getColumn()->getColumnNo();
|
||||
NdbColumnImpl *tAttrInfo = at->getColumn(no);
|
||||
DBUG_PRINT("info", ("rec_attr: 0x%x "
|
||||
"switching column impl 0x%x -> 0x%x",
|
||||
p, p->m_column, tAttrInfo));
|
||||
p->m_column = tAttrInfo;
|
||||
p = p->next();
|
||||
}
|
||||
}
|
||||
if (tmp_table_impl)
|
||||
delete tmp_table_impl;
|
||||
}
|
||||
|
||||
if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))
|
||||
{
|
||||
DBUG_RETURN_EVENT(1);
|
||||
DBUG_ENTER("NdbEventOperationImpl::receive_event");
|
||||
DBUG_PRINT("info",("sdata->operation %u this: %p", operation, this));
|
||||
if (operation == NdbDictionary::Event::_TE_ALTER)
|
||||
{
|
||||
// Parse the new table definition and
|
||||
// create a table object
|
||||
NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
|
||||
NdbDictionaryImpl *dict = & NdbDictionaryImpl::getImpl(*myDict);
|
||||
NdbError error;
|
||||
NdbDictInterface dif(error);
|
||||
NdbTableImpl *at;
|
||||
m_change_mask = m_data_item->sdata->changeMask;
|
||||
error.code = dif.parseTableInfo(&at,
|
||||
(Uint32*)m_buffer.get_data(),
|
||||
m_buffer.length() / 4,
|
||||
true);
|
||||
m_buffer.clear();
|
||||
if (unlikely(!at))
|
||||
{
|
||||
DBUG_PRINT("info", ("Failed to parse DictTabInfo error %u",
|
||||
error.code));
|
||||
ndbout_c("Failed to parse DictTabInfo error %u", error.code);
|
||||
DBUG_RETURN(1);
|
||||
}
|
||||
at->buildColumnHash();
|
||||
|
||||
NdbTableImpl *tmp_table_impl= m_eventImpl->m_tableImpl;
|
||||
m_eventImpl->m_tableImpl = at;
|
||||
|
||||
DBUG_PRINT("info", ("switching table impl 0x%x -> 0x%x",
|
||||
tmp_table_impl, at));
|
||||
|
||||
// change the rec attrs to refer to the new table object
|
||||
int i;
|
||||
for (i = 0; i < 2; i++)
|
||||
{
|
||||
NdbRecAttr *p = theFirstPkAttrs[i];
|
||||
while (p)
|
||||
{
|
||||
int no = p->getColumn()->getColumnNo();
|
||||
NdbColumnImpl *tAttrInfo = at->getColumn(no);
|
||||
DBUG_PRINT("info", ("rec_attr: 0x%x "
|
||||
"switching column impl 0x%x -> 0x%x",
|
||||
p, p->m_column, tAttrInfo));
|
||||
p->m_column = tAttrInfo;
|
||||
p = p->next();
|
||||
}
|
||||
}
|
||||
for (i = 0; i < 2; i++)
|
||||
{
|
||||
NdbRecAttr *p = theFirstDataAttrs[i];
|
||||
while (p)
|
||||
{
|
||||
int no = p->getColumn()->getColumnNo();
|
||||
NdbColumnImpl *tAttrInfo = at->getColumn(no);
|
||||
DBUG_PRINT("info", ("rec_attr: 0x%x "
|
||||
"switching column impl 0x%x -> 0x%x",
|
||||
p, p->m_column, tAttrInfo));
|
||||
p->m_column = tAttrInfo;
|
||||
p = p->next();
|
||||
}
|
||||
}
|
||||
if (tmp_table_impl)
|
||||
delete tmp_table_impl;
|
||||
}
|
||||
DBUG_RETURN(1);
|
||||
}
|
||||
|
||||
DBUG_ENTER_EVENT("NdbEventOperationImpl::receive_event");
|
||||
DBUG_PRINT_EVENT("info",("sdata->operation %u this: %p", operation, this));
|
||||
// now move the data into the RecAttrs
|
||||
|
||||
int is_update= operation == NdbDictionary::Event::_TE_UPDATE;
|
||||
@@ -1089,6 +1095,33 @@ NdbEventBuffer::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int
|
||||
NdbEventBuffer::flushIncompleteEvents(Uint64 gci)
|
||||
{
|
||||
/**
|
||||
* Find min complete gci
|
||||
*/
|
||||
Uint32 i;
|
||||
Uint32 sz= m_active_gci.size();
|
||||
Gci_container* array = (Gci_container*)m_active_gci.getBase();
|
||||
for(i = 0; i < sz; i++)
|
||||
{
|
||||
Gci_container* tmp = array + i;
|
||||
if (tmp->m_gci && tmp->m_gci < gci)
|
||||
{
|
||||
// we have found an old not-completed gci, remove it
|
||||
ndbout_c("ndb: flushing incomplete epoch %lld (<%lld)", tmp->m_gci, gci);
|
||||
if(!tmp->m_data.is_empty())
|
||||
{
|
||||
free_list(tmp->m_data);
|
||||
}
|
||||
tmp->~Gci_container();
|
||||
bzero(tmp, sizeof(Gci_container));
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
NdbEventOperation *
|
||||
NdbEventBuffer::nextEvent()
|
||||
{
|
||||
@@ -1157,7 +1190,10 @@ NdbEventBuffer::nextEvent()
|
||||
}
|
||||
EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
|
||||
while (gci_ops && op->getGCI() > gci_ops->m_gci)
|
||||
{
|
||||
deleteUsedEventOperations();
|
||||
gci_ops = m_available_data.next_gci_ops();
|
||||
}
|
||||
assert(gci_ops && (op->getGCI() == gci_ops->m_gci));
|
||||
DBUG_RETURN_EVENT(op->m_facade);
|
||||
}
|
||||
@@ -1177,7 +1213,10 @@ NdbEventBuffer::nextEvent()
|
||||
// free all "per gci unique" collected operations
|
||||
EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
|
||||
while (gci_ops)
|
||||
{
|
||||
deleteUsedEventOperations();
|
||||
gci_ops = m_available_data.next_gci_ops();
|
||||
}
|
||||
DBUG_RETURN_EVENT(0);
|
||||
}
|
||||
|
||||
@@ -1191,31 +1230,37 @@ NdbEventBuffer::getGCIEventOperations(Uint32* iter, Uint32* event_types)
|
||||
EventBufData_list::Gci_op g = gci_ops->m_gci_op_list[(*iter)++];
|
||||
if (event_types != NULL)
|
||||
*event_types = g.event_types;
|
||||
DBUG_PRINT("info", ("gci: %d", (unsigned)gci_ops->m_gci));
|
||||
DBUG_PRINT("info", ("gci: %d g.op: %x g.event_types: %x",
|
||||
(unsigned)gci_ops->m_gci, g.op, g.event_types));
|
||||
DBUG_RETURN(g.op);
|
||||
}
|
||||
DBUG_RETURN(NULL);
|
||||
}
|
||||
|
||||
void
|
||||
NdbEventBuffer::lock()
|
||||
NdbEventBuffer::deleteUsedEventOperations()
|
||||
{
|
||||
NdbMutex_Lock(m_mutex);
|
||||
}
|
||||
void
|
||||
NdbEventBuffer::unlock()
|
||||
{
|
||||
NdbMutex_Unlock(m_mutex);
|
||||
}
|
||||
void
|
||||
NdbEventBuffer::add_drop_lock()
|
||||
{
|
||||
NdbMutex_Lock(p_add_drop_mutex);
|
||||
}
|
||||
void
|
||||
NdbEventBuffer::add_drop_unlock()
|
||||
{
|
||||
NdbMutex_Unlock(p_add_drop_mutex);
|
||||
Uint32 iter= 0;
|
||||
const NdbEventOperation *op_f;
|
||||
while ((op_f= getGCIEventOperations(&iter, NULL)) != NULL)
|
||||
{
|
||||
NdbEventOperationImpl *op = &op_f->m_impl;
|
||||
DBUG_ASSERT(op->m_ref_count > 0);
|
||||
op->m_ref_count--;
|
||||
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
|
||||
if (op->m_ref_count == 0)
|
||||
{
|
||||
DBUG_PRINT("info", ("deleting op: %p", op));
|
||||
DBUG_ASSERT(op->m_node_bit_mask.isclear());
|
||||
if (op->m_next)
|
||||
op->m_next->m_prev = op->m_prev;
|
||||
if (op->m_prev)
|
||||
op->m_prev->m_next = op->m_next;
|
||||
else
|
||||
m_dropped_ev_op = op->m_next;
|
||||
delete op->m_facade;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static
|
||||
@@ -1469,6 +1514,10 @@ NdbEventBuffer::complete_outof_order_gcis()
|
||||
void
|
||||
NdbEventBuffer::report_node_failure(Uint32 node_id)
|
||||
{
|
||||
NdbEventOperation* op= m_ndb->getEventOperation(0);
|
||||
if (op == 0)
|
||||
return;
|
||||
|
||||
DBUG_ENTER("NdbEventBuffer::report_node_failure");
|
||||
SubTableData data;
|
||||
LinearSectionPtr ptr[3];
|
||||
@@ -1484,12 +1533,20 @@ NdbEventBuffer::report_node_failure(Uint32 node_id)
|
||||
/**
|
||||
* Insert this event for each operation
|
||||
*/
|
||||
NdbEventOperation* op= 0;
|
||||
while((op = m_ndb->getEventOperation(op)))
|
||||
{
|
||||
NdbEventOperationImpl* impl= &op->m_impl;
|
||||
data.senderData = impl->m_oid;
|
||||
insertDataL(impl, &data, ptr);
|
||||
// no need to lock()/unlock(), receive thread calls this
|
||||
NdbEventOperationImpl* impl = &op->m_impl;
|
||||
do if (!impl->m_node_bit_mask.isclear())
|
||||
{
|
||||
data.senderData = impl->m_oid;
|
||||
insertDataL(impl, &data, ptr);
|
||||
} while((impl = impl->m_next));
|
||||
for (impl = m_dropped_ev_op; impl; impl = impl->m_next)
|
||||
if (!impl->m_node_bit_mask.isclear())
|
||||
{
|
||||
data.senderData = impl->m_oid;
|
||||
insertDataL(impl, &data, ptr);
|
||||
}
|
||||
}
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
@@ -1515,12 +1572,21 @@ NdbEventBuffer::completeClusterFailed()
|
||||
/**
|
||||
* Insert this event for each operation
|
||||
*/
|
||||
do
|
||||
{
|
||||
NdbEventOperationImpl* impl= &op->m_impl;
|
||||
data.senderData = impl->m_oid;
|
||||
insertDataL(impl, &data, ptr);
|
||||
} while((op = m_ndb->getEventOperation(op)));
|
||||
// no need to lock()/unlock(), receive thread calls this
|
||||
NdbEventOperationImpl* impl = &op->m_impl;
|
||||
do if (!impl->m_node_bit_mask.isclear())
|
||||
{
|
||||
data.senderData = impl->m_oid;
|
||||
insertDataL(impl, &data, ptr);
|
||||
} while((impl = impl->m_next));
|
||||
for (impl = m_dropped_ev_op; impl; impl = impl->m_next)
|
||||
if (!impl->m_node_bit_mask.isclear())
|
||||
{
|
||||
data.senderData = impl->m_oid;
|
||||
insertDataL(impl, &data, ptr);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Release all GCI's with m_gci > gci
|
||||
@@ -1565,7 +1631,11 @@ NdbEventBuffer::completeClusterFailed()
|
||||
}
|
||||
}
|
||||
|
||||
assert(bucket != 0);
|
||||
if (bucket == 0)
|
||||
{
|
||||
// no bucket to complete
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
const Uint32 cnt= bucket->m_gcp_complete_rep_count = 1;
|
||||
bucket->m_gci = gci;
|
||||
@@ -1595,6 +1665,40 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
|
||||
{
|
||||
DBUG_ENTER_EVENT("NdbEventBuffer::insertDataL");
|
||||
Uint64 gci= sdata->gci;
|
||||
const bool is_data_event =
|
||||
sdata->operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;
|
||||
|
||||
if (!is_data_event)
|
||||
{
|
||||
switch (sdata->operation)
|
||||
{
|
||||
case NdbDictionary::Event::_TE_NODE_FAILURE:
|
||||
op->m_node_bit_mask.clear(sdata->ndbd_nodeid);
|
||||
break;
|
||||
case NdbDictionary::Event::_TE_ACTIVE:
|
||||
op->m_node_bit_mask.set(sdata->ndbd_nodeid);
|
||||
// internal event, do not relay to user
|
||||
DBUG_RETURN_EVENT(0);
|
||||
break;
|
||||
case NdbDictionary::Event::_TE_CLUSTER_FAILURE:
|
||||
op->m_node_bit_mask.clear();
|
||||
DBUG_ASSERT(op->m_ref_count > 0);
|
||||
op->m_ref_count--;
|
||||
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
|
||||
break;
|
||||
case NdbDictionary::Event::_TE_STOP:
|
||||
op->m_node_bit_mask.clear(sdata->ndbd_nodeid);
|
||||
if (op->m_node_bit_mask.isclear())
|
||||
{
|
||||
DBUG_ASSERT(op->m_ref_count > 0);
|
||||
op->m_ref_count--;
|
||||
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if ( likely((Uint32)op->mi_type & (1 << (Uint32)sdata->operation)) )
|
||||
{
|
||||
@@ -1615,8 +1719,6 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
|
||||
}
|
||||
|
||||
const bool is_blob_event = (op->theMainOp != NULL);
|
||||
const bool is_data_event =
|
||||
sdata->operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;
|
||||
const bool use_hash = op->m_mergeEvents && is_data_event;
|
||||
|
||||
if (! is_data_event && is_blob_event)
|
||||
@@ -2244,6 +2346,8 @@ void EventBufData_list::append_list(EventBufData_list *list, Uint64 gci)
|
||||
void
|
||||
EventBufData_list::add_gci_op(Gci_op g, bool del)
|
||||
{
|
||||
DBUG_ENTER_EVENT("EventBufData_list::add_gci_op");
|
||||
DBUG_PRINT_EVENT("info", ("p.op: %p g.event_types: %x", g.op, g.event_types));
|
||||
assert(g.op != NULL);
|
||||
Uint32 i;
|
||||
for (i = 0; i < m_gci_op_count; i++) {
|
||||
@@ -2273,8 +2377,15 @@ EventBufData_list::add_gci_op(Gci_op g, bool del)
|
||||
}
|
||||
assert(m_gci_op_count < m_gci_op_alloc);
|
||||
assert(! del);
|
||||
#ifndef DBUG_OFF
|
||||
i = m_gci_op_count;
|
||||
#endif
|
||||
g.op->m_ref_count++;
|
||||
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", g.op->m_ref_count, g.op));
|
||||
m_gci_op_list[m_gci_op_count++] = g;
|
||||
}
|
||||
DBUG_PRINT_EVENT("exit", ("m_gci_op_list[%u].event_types: %x", i, m_gci_op_list[i].event_types));
|
||||
DBUG_VOID_RETURN_EVENT;
|
||||
}
|
||||
|
||||
void
|
||||
@@ -2337,6 +2448,9 @@ NdbEventBuffer::createEventOperation(const char* eventName,
|
||||
delete tOp;
|
||||
DBUG_RETURN(NULL);
|
||||
}
|
||||
getEventOperationImpl(tOp)->m_ref_count = 1;
|
||||
DBUG_PRINT("info", ("m_ref_count: %u for op: %p",
|
||||
getEventOperationImpl(tOp)->m_ref_count, getEventOperationImpl(tOp)));
|
||||
DBUG_RETURN(tOp);
|
||||
}
|
||||
|
||||
@@ -2362,16 +2476,10 @@ NdbEventBuffer::createEventOperation(NdbEventImpl& evnt,
|
||||
void
|
||||
NdbEventBuffer::dropEventOperation(NdbEventOperation* tOp)
|
||||
{
|
||||
DBUG_ENTER("NdbEventBuffer::dropEventOperation");
|
||||
NdbEventOperationImpl* op= getEventOperationImpl(tOp);
|
||||
|
||||
op->stop();
|
||||
|
||||
op->m_next= m_dropped_ev_op;
|
||||
op->m_prev= 0;
|
||||
if (m_dropped_ev_op)
|
||||
m_dropped_ev_op->m_prev= op;
|
||||
m_dropped_ev_op= op;
|
||||
|
||||
// stop blob event ops
|
||||
if (op->theMainOp == NULL)
|
||||
{
|
||||
@@ -2391,11 +2499,24 @@ NdbEventBuffer::dropEventOperation(NdbEventOperation* tOp)
|
||||
}
|
||||
}
|
||||
|
||||
// ToDo, take care of these to be deleted at the
|
||||
// appropriate time, after we are sure that there
|
||||
// are _no_ more events coming
|
||||
|
||||
// delete tOp;
|
||||
DBUG_ASSERT(op->m_ref_count > 0);
|
||||
op->m_ref_count--;
|
||||
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
|
||||
if (op->m_ref_count == 0)
|
||||
{
|
||||
DBUG_PRINT("info", ("deleting op: %p", op));
|
||||
DBUG_ASSERT(op->m_node_bit_mask.isclear());
|
||||
delete op->m_facade;
|
||||
}
|
||||
else
|
||||
{
|
||||
op->m_next= m_dropped_ev_op;
|
||||
op->m_prev= 0;
|
||||
if (m_dropped_ev_op)
|
||||
m_dropped_ev_op->m_prev= op;
|
||||
m_dropped_ev_op= op;
|
||||
}
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@@ -367,6 +367,8 @@ public:
|
||||
Uint32 m_eventId;
|
||||
Uint32 m_oid;
|
||||
|
||||
Bitmask<(unsigned int)_NDB_NODE_BITMASK_SIZE> m_node_bit_mask;
|
||||
int m_ref_count;
|
||||
bool m_mergeEvents;
|
||||
|
||||
EventBufData *m_data_item;
|
||||
@@ -406,10 +408,10 @@ public:
|
||||
void dropEventOperation(NdbEventOperation *);
|
||||
static NdbEventOperationImpl* getEventOperationImpl(NdbEventOperation* tOp);
|
||||
|
||||
void add_drop_lock();
|
||||
void add_drop_unlock();
|
||||
void lock();
|
||||
void unlock();
|
||||
void add_drop_lock() { NdbMutex_Lock(p_add_drop_mutex); }
|
||||
void add_drop_unlock() { NdbMutex_Unlock(p_add_drop_mutex); }
|
||||
void lock() { NdbMutex_Lock(m_mutex); }
|
||||
void unlock() { NdbMutex_Unlock(m_mutex); }
|
||||
|
||||
void add_op();
|
||||
void remove_op();
|
||||
@@ -430,9 +432,11 @@ public:
|
||||
Uint32 getEventId(int bufferId);
|
||||
|
||||
int pollEvents(int aMillisecondNumber, Uint64 *latestGCI= 0);
|
||||
int flushIncompleteEvents(Uint64 gci);
|
||||
NdbEventOperation *nextEvent();
|
||||
NdbEventOperationImpl* getGCIEventOperations(Uint32* iter,
|
||||
Uint32* event_types);
|
||||
void deleteUsedEventOperations();
|
||||
|
||||
NdbEventOperationImpl *move_data();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user