diff --git a/mysql-test/r/ndb_autodiscover3.result b/mysql-test/r/ndb_autodiscover3.result
index 6e3e5fcc335..86495ebb3eb 100644
--- a/mysql-test/r/ndb_autodiscover3.result
+++ b/mysql-test/r/ndb_autodiscover3.result
@@ -18,6 +18,7 @@ select * from t2;
 ERROR 42S02: Table 'test.t2' doesn't exist
 show tables like 't2';
 Tables_in_test (t2)
+reset master;
 create table t2 (a int key) engine=ndbcluster;
 insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
 select * from t2 order by a limit 3;
@@ -30,10 +31,12 @@ a
 1
 2
 3
+reset master;
 select * from t2;
 ERROR 42S02: Table 'test.t2' doesn't exist
 show tables like 't2';
 Tables_in_test (t2)
+reset master;
 create table t2 (a int key) engine=ndbcluster;
 insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
 select * from t2 order by a limit 3;
@@ -46,4 +49,5 @@ a
 1
 2
 3
+reset master;
 drop table t2;
diff --git a/mysql-test/t/disabled.def b/mysql-test/t/disabled.def
index 746a117ccd6..562006c7687 100644
--- a/mysql-test/t/disabled.def
+++ b/mysql-test/t/disabled.def
@@ -16,7 +16,7 @@ events_scheduling : BUG#19170 2006-04-26 andrey Test case of 19170 fails
 events_logs_tests : BUG#17619 2006-05-16 andrey Test case problems
 ndb_autodiscover : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t binlog
 ndb_autodiscover2 : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t binlog
-ndb_binlog_discover : BUG#19395 2006-04-28 tomas/knielsen mysqld does not always detect cluster shutdown
+#ndb_binlog_discover : BUG#19395 2006-04-28 tomas/knielsen mysqld does not always detect cluster shutdown
 #ndb_cache2 : BUG#18597 2006-03-28 brian simultaneous drop table and ndb statistics update triggers node failure
 #ndb_cache_multi2 : BUG#18597 2006-04-10 kent simultaneous drop table and ndb statistics update triggers node failure
 ndb_load : BUG#17233 2006-05-04 tomas failed load data from infile causes mysqld dbug_assert, binlog not flushed
diff --git a/mysql-test/t/ndb_autodiscover3.test b/mysql-test/t/ndb_autodiscover3.test
index afbebc4dd03..ed75c89cdd1 100644
--- a/mysql-test/t/ndb_autodiscover3.test
+++ b/mysql-test/t/ndb_autodiscover3.test
@@ -43,6 +43,7 @@ select * from t2 order by a limit 3;
 --error ER_NO_SUCH_TABLE
 select * from t2;
 show tables like 't2';
+reset master;
 create table t2 (a int key) engine=ndbcluster;
 insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
 select * from t2 order by a limit 3;
@@ -50,6 +51,7 @@ select * from t2 order by a limit 3;
 # server 1 should have a stale cache, and in this case wrong frm, transaction must be retried
 --connection server1
 select * from t2 order by a limit 3;
+reset master;
 
 --exec $NDB_MGM --no-defaults -e "all restart -i" >> $NDB_TOOLS_OUTPUT
 --exec $NDB_TOOLS_DIR/ndb_waiter --no-defaults >> $NDB_TOOLS_OUTPUT
@@ -60,6 +62,7 @@ select * from t2 order by a limit 3;
 --error ER_NO_SUCH_TABLE
 select * from t2;
 show tables like 't2';
+reset master;
 create table t2 (a int key) engine=ndbcluster;
 insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
 select * from t2 order by a limit 3;
@@ -67,6 +70,7 @@ select * from t2 order by a limit 3;
 # server 2 should have a stale cache, but with right frm, transaction need not be retried
 --connection server2
 select * from t2 order by a limit 3;
+reset master;
 drop table t2;
 
 # End of 4.1 tests
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 5a1d4f48c9b..f46a5eccabf 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -182,6 +182,8 @@ static const char * ndb_connected_host= 0;
 static long ndb_connected_port= 0;
 static long ndb_number_of_replicas= 0;
 long ndb_number_of_storage_nodes= 0;
+long ndb_number_of_ready_storage_nodes= 0;
+long ndb_connect_count= 0;
 
 static int update_status_variables(Ndb_cluster_connection *c)
 {
@@ -190,6 +192,8 @@ static int update_status_variables(Ndb_cluster_connection *c)
   ndb_connected_host= c->get_connected_host();
   ndb_number_of_replicas= 0;
   ndb_number_of_storage_nodes= c->no_db_nodes();
+  ndb_number_of_ready_storage_nodes= c->get_no_ready();
+  ndb_connect_count= c->get_connect_count();
   return 0;
 }
 
@@ -7128,10 +7132,6 @@ void ndbcluster_real_free_share(NDB_SHARE **share)
 #ifndef DBUG_OFF
   bzero((gptr)(*share)->table_share, sizeof(*(*share)->table_share));
   bzero((gptr)(*share)->table, sizeof(*(*share)->table));
-#endif
-  my_free((gptr) (*share)->table_share, MYF(0));
-  my_free((gptr) (*share)->table, MYF(0));
-#ifndef DBUG_OFF
   (*share)->table_share= 0;
   (*share)->table= 0;
 #endif
@@ -9361,11 +9361,15 @@ ndbcluster_show_status(THD* thd, stat_print_fn *stat_print,
                      "cluster_node_id=%u, "
                      "connected_host=%s, "
                      "connected_port=%u, "
-                     "number_of_storage_nodes=%u",
+                     "number_of_storage_nodes=%u, "
+                     "number_of_ready_storage_nodes=%u, "
+                     "connect_count=%u",
                      ndb_cluster_node_id,
                      ndb_connected_host,
                      ndb_connected_port,
-                     ndb_number_of_storage_nodes);
+                     ndb_number_of_storage_nodes,
+                     ndb_number_of_ready_storage_nodes,
+                     ndb_connect_count);
     if (stat_print(thd, ndbcluster_hton.name, strlen(ndbcluster_hton.name),
                    "connection", strlen("connection"),
                    buf, buflen))
diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h
index f407cb0090f..78c9ec765bb 100644
--- a/sql/ha_ndbcluster.h
+++ b/sql/ha_ndbcluster.h
@@ -113,6 +113,7 @@ typedef struct st_ndbcluster_share {
   char *old_names; // for rename table
   TABLE_SHARE *table_share;
   TABLE *table;
+  byte *record[2]; // pointer to allocated records for receiving data
   NdbValue *ndb_value[2];
   MY_BITMAP *subscriber_bitmap;
 #endif
diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc
index bae2a3cbd9f..144c073d565 100644
--- a/sql/ha_ndbcluster_binlog.cc
+++ b/sql/ha_ndbcluster_binlog.cc
@@ -25,6 +25,7 @@
 #include "slave.h"
 #include "ha_ndbcluster_binlog.h"
 #include "NdbDictionary.hpp"
+#include
 
 #ifdef ndb_dynamite
 #undef assert
@@ -265,7 +266,8 @@ ndbcluster_binlog_close_table(THD *thd, NDB_SHARE *share)
 
 static int
 ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
-                             TABLE_SHARE *table_share, TABLE *table)
+                             TABLE_SHARE *table_share, TABLE *table,
+                             int reopen)
 {
   int error;
   DBUG_ENTER("ndbcluster_binlog_open_table");
@@ -278,27 +280,34 @@ ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
                     share->key, error);
     DBUG_PRINT("error", ("open_table_def failed %d", error));
     free_table_share(table_share);
-    my_free((gptr) table_share, MYF(0));
-    my_free((gptr) table, MYF(0));
     DBUG_RETURN(error);
   }
-  if ((error= open_table_from_share(thd, table_share, "", 0,
+  if ((error= open_table_from_share(thd, table_share, "", 0 /* fon't allocate buffers */,
                                     (uint) READ_ALL, 0, table, FALSE)))
   {
     sql_print_error("Unable to open table for %s, error=%d(%d)",
                     share->key, error, my_errno);
    DBUG_PRINT("error", ("open_table_from_share failed %d", error));
    free_table_share(table_share);
-    my_free((gptr) table_share, MYF(0));
-    my_free((gptr) table, MYF(0));
    DBUG_RETURN(error);
  }
  assign_new_table_id(table_share);
-  if (!table->record[1] || table->record[1] == table->record[0])
+
+  if (!reopen)
  {
-    table->record[1]= alloc_root(&table->mem_root,
-                                 table->s->rec_buff_length);
+    // allocate memory on ndb share so it can be reused after online alter table
+    share->record[0]= (byte*) alloc_root(&share->mem_root, table->s->rec_buff_length);
+    share->record[1]= (byte*) alloc_root(&share->mem_root, table->s->rec_buff_length);
   }
+  {
+    my_ptrdiff_t row_offset= share->record[0] - table->record[0];
+    Field **p_field;
+    for (p_field= table->field; *p_field; p_field++)
+      (*p_field)->move_field_offset(row_offset);
+    table->record[0]= share->record[0];
+    table->record[1]= share->record[1];
+  }
+
   table->in_use= injector_thd;
 
   table->s->db.str= share->db;
@@ -366,10 +375,9 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
   while (1)
   {
     int error;
-    TABLE_SHARE *table_share=
-      (TABLE_SHARE *) my_malloc(sizeof(*table_share), MYF(MY_WME));
-    TABLE *table= (TABLE*) my_malloc(sizeof(*table), MYF(MY_WME));
-    if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table)))
+    TABLE_SHARE *table_share= (TABLE_SHARE *) alloc_root(mem_root, sizeof(*table_share));
+    TABLE *table= (TABLE*) alloc_root(mem_root, sizeof(*table));
+    if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table, 0)))
       break;
     /*
       ! do not touch the contents of the table
@@ -1535,6 +1543,10 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
       sql_print_information("NDB: Failed write frm for %s.%s, error %d",
                             dbname, tabname, error);
     }
+
+    // copy names as memory will be freed
+    NdbAutoPtr<char> a1((char *)(dbname= strdup(dbname)));
+    NdbAutoPtr<char> a2((char *)(tabname= strdup(tabname)));
     ndbcluster_binlog_close_table(thd, share);
 
     TABLE_LIST table_list;
@@ -1543,10 +1555,16 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
     table_list.alias= table_list.table_name= (char *)tabname;
     close_cached_tables(thd, 0, &table_list, TRUE);
 
-    if ((error= ndbcluster_binlog_open_table(thd, share,
-                                             table_share, table)))
+    if ((error= ndbcluster_binlog_open_table(thd, share,
+                                             table_share, table, 1)))
       sql_print_information("NDB: Failed to re-open table %s.%s",
                             dbname, tabname);
+
+    table= share->table;
+    table_share= share->table_share;
+    dbname= table_share->db.str;
+    tabname= table_share->table_name.str;
+
     pthread_mutex_unlock(&LOCK_open);
   }
   my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
@@ -1776,7 +1794,8 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
       break;
     case NDBEVENT::TE_CLUSTER_FAILURE:
       if (ndb_extra_logging)
-        sql_print_information("NDB Binlog: cluster failure for %s.", schema_share->key);
+        sql_print_information("NDB Binlog: cluster failure for %s at epoch %u.",
+                              schema_share->key, (unsigned) pOp->getGCI());
      // fall through
    case NDBEVENT::TE_DROP:
      if (ndb_extra_logging &&
@@ -1785,7 +1804,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
                              "read only on reconnect.");
      free_share(&schema_share);
      schema_share= 0;
-      ndb_binlog_tables_inited= FALSE;
      close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, FALSE);
      // fall through
    case NDBEVENT::TE_ALTER:
@@ -2829,7 +2847,8 @@ ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
  {
  case NDBEVENT::TE_CLUSTER_FAILURE:
    if (ndb_extra_logging)
-      sql_print_information("NDB Binlog: cluster failure for %s.", share->key);
+      sql_print_information("NDB Binlog: cluster failure for %s at epoch %u.",
+                            share->key, (unsigned) pOp->getGCI());
    if (apply_status_share == share)
    {
      if (ndb_extra_logging &&
@@ -2838,7 +2857,6 @@ ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
                            "read only on reconnect.");
      free_share(&apply_status_share);
      apply_status_share= 0;
-      ndb_binlog_tables_inited= FALSE;
    }
    DBUG_PRINT("info", ("CLUSTER FAILURE EVENT: "
                        "%s received share: 0x%lx  op: %lx  share op: %lx  "
@@ -2854,7 +2872,6 @@ ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
                             "read only on reconnect.");
     free_share(&apply_status_share);
     apply_status_share= 0;
-    ndb_binlog_tables_inited= FALSE;
   }
   /* ToDo: remove printout */
   if (ndb_extra_logging)
@@ -3267,46 +3284,43 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
   pthread_mutex_unlock(&injector_mutex);
   pthread_cond_signal(&injector_cond);
 
-  thd->proc_info= "Waiting for ndbcluster to start";
-
-  pthread_mutex_lock(&injector_mutex);
-  while (!schema_share ||
-         (ndb_binlog_running && !apply_status_share))
-  {
-    /* ndb not connected yet */
-    struct timespec abstime;
-    set_timespec(abstime, 1);
-    pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
-    if (abort_loop)
-    {
-      pthread_mutex_unlock(&injector_mutex);
-      goto err;
-    }
-  }
-  pthread_mutex_unlock(&injector_mutex);
-
+restart:
   /*
     Main NDB Injector loop
   */
+  {
+    thd->proc_info= "Waiting for ndbcluster to start";
 
-  DBUG_ASSERT(ndbcluster_hton.slot != ~(uint)0);
-  if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
-  {
-    sql_print_error("Could not allocate Thd_ndb object");
-    goto err;
-  }
-  set_thd_ndb(thd, thd_ndb);
-  thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
-  thd->query_id= 0; // to keep valgrind quiet
-  {
-    static char db[]= "";
-    thd->db= db;
-    if (ndb_binlog_running)
-      open_binlog_index(thd, &binlog_tables, &binlog_index);
-    thd->db= db;
+    pthread_mutex_lock(&injector_mutex);
+    while (!schema_share ||
+           (ndb_binlog_running && !apply_status_share))
+    {
+      /* ndb not connected yet */
+      struct timespec abstime;
+      set_timespec(abstime, 1);
+      pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
+      if (abort_loop)
+      {
+        pthread_mutex_unlock(&injector_mutex);
+        goto err;
+      }
+    }
+    pthread_mutex_unlock(&injector_mutex);
+
+    if (thd_ndb == NULL)
+    {
+      DBUG_ASSERT(ndbcluster_hton.slot != ~(uint)0);
+      if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
+      {
+        sql_print_error("Could not allocate Thd_ndb object");
+        goto err;
+      }
+      set_thd_ndb(thd, thd_ndb);
+      thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
+      thd->query_id= 0; // to keep valgrind quiet
+    }
   }
-restart:
   {
     // wait for the first event
     thd->proc_info= "Waiting for first event from ndbcluster";
@@ -3321,6 +3335,9 @@ restart:
     DBUG_PRINT("info", ("schema_res: %d schema_gci: %d", schema_res, schema_gci));
     if (schema_res > 0)
     {
+      i_ndb->pollEvents(0);
+      i_ndb->flushIncompleteEvents(schema_gci);
+      s_ndb->flushIncompleteEvents(schema_gci);
       if (schema_gci < ndb_latest_handled_binlog_epoch)
       {
         sql_print_error("NDB Binlog: cluster has been restarted --initial or with older filesystem. "
@@ -3334,7 +3351,13 @@ restart:
       }
     }
   }
-
+  {
+    static char db[]= "";
+    thd->db= db;
+    if (ndb_binlog_running)
+      open_binlog_index(thd, &binlog_tables, &binlog_index);
+    thd->db= db;
+  }
   do_ndbcluster_binlog_close_connection= BCCC_running;
   for ( ; !((abort_loop || do_ndbcluster_binlog_close_connection) &&
             ndb_latest_handled_binlog_epoch >= g_latest_trans_gci) &&
@@ -3683,7 +3706,12 @@ restart:
     ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
   }
   if (do_ndbcluster_binlog_close_connection == BCCC_restart)
+  {
+    ndb_binlog_tables_inited= FALSE;
+    close_thread_tables(thd);
+    binlog_index= 0;
     goto restart;
+  }
 err:
   DBUG_PRINT("info",("Shutting down cluster binlog thread"));
   thd->proc_info= "Shutting down";
diff --git a/storage/ndb/include/ndbapi/Ndb.hpp b/storage/ndb/include/ndbapi/Ndb.hpp
index 010b85b03a9..f6f313e9224 100644
--- a/storage/ndb/include/ndbapi/Ndb.hpp
+++ b/storage/ndb/include/ndbapi/Ndb.hpp
@@ -1262,6 +1262,7 @@ public:
 
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
 
+  int flushIncompleteEvents(Uint64 gci);
   NdbEventOperation *getEventOperation(NdbEventOperation* eventOp= 0);
   Uint64 getLatestGCI();
   void forceGCP();
diff --git a/storage/ndb/include/ndbapi/NdbDictionary.hpp b/storage/ndb/include/ndbapi/NdbDictionary.hpp
index b31e2551e89..865fb506f05 100644
--- a/storage/ndb/include/ndbapi/NdbDictionary.hpp
+++ b/storage/ndb/include/ndbapi/NdbDictionary.hpp
@@ -1132,7 +1132,8 @@ public:
       _TE_NODE_FAILURE=10,
       _TE_SUBSCRIBE=11,
       _TE_UNSUBSCRIBE=12,
-      _TE_NUL=13 // internal (e.g. INS o DEL within same GCI)
+      _TE_NUL=13, // internal (e.g. INS o DEL within same GCI)
+      _TE_ACTIVE=14 // internal (node becomes active)
     };
 #endif
     /**
diff --git a/storage/ndb/include/util/SocketServer.hpp b/storage/ndb/include/util/SocketServer.hpp
index e766a0b99c4..c4f7e8c0ade 100644
--- a/storage/ndb/include/util/SocketServer.hpp
+++ b/storage/ndb/include/util/SocketServer.hpp
@@ -106,7 +106,8 @@ public:
   void stopSessions(bool wait = false);
 
   void foreachSession(void (*f)(Session*, void*), void *data);
-
+  void checkSessions();
+
 private:
   struct SessionInstance {
     Service * m_service;
@@ -117,12 +118,13 @@ private:
     Service * m_service;
     NDB_SOCKET_TYPE m_socket;
   };
-  MutexVector<SessionInstance> m_sessions;
+  NdbLockable m_session_mutex;
+  Vector<SessionInstance> m_sessions;
   MutexVector<ServiceInstance> m_services;
   unsigned m_maxSessions;
 
   void doAccept();
-  void checkSessions();
+  void checkSessionsImpl();
   void startSession(SessionInstance &);
 
   /**
diff --git a/storage/ndb/src/common/util/SocketServer.cpp b/storage/ndb/src/common/util/SocketServer.cpp
index f0af925cf6d..f9d2c7463be 100644
--- a/storage/ndb/src/common/util/SocketServer.cpp
+++ b/storage/ndb/src/common/util/SocketServer.cpp
@@ -184,9 +184,12 @@ SocketServer::doAccept(){
       SessionInstance s;
       s.m_service = si.m_service;
       s.m_session = si.m_service->newSession(childSock);
-      if(s.m_session != 0){
+      if(s.m_session != 0)
+      {
+        m_session_mutex.lock();
         m_sessions.push_back(s);
         startSession(m_sessions.back());
+        m_session_mutex.unlock();
       }
 
       continue;
@@ -240,10 +243,13 @@ void
 SocketServer::doRun(){
 
   while(!m_stopThread){
-    checkSessions();
+    m_session_mutex.lock();
+    checkSessionsImpl();
     if(m_sessions.size() < m_maxSessions){
+      m_session_mutex.unlock();
       doAccept();
     } else {
+      m_session_mutex.unlock();
       NdbSleep_MilliSleep(200);
     }
   }
@@ -276,17 +282,30 @@ transfer(NDB_SOCKET_TYPE sock){
 void
 SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *), void *data)
 {
+  m_session_mutex.lock();
   for(int i = m_sessions.size() - 1; i >= 0; i--){
     (*func)(m_sessions[i].m_session, data);
   }
-  checkSessions();
+  m_session_mutex.unlock();
 }
 
 void
-SocketServer::checkSessions(){
-  for(int i = m_sessions.size() - 1; i >= 0; i--){
-    if(m_sessions[i].m_session->m_stopped){
-      if(m_sessions[i].m_thread != 0){
+SocketServer::checkSessions()
+{
+  m_session_mutex.lock();
+  checkSessionsImpl();
+  m_session_mutex.unlock();
+}
+
+void
+SocketServer::checkSessionsImpl()
+{
+  for(int i = m_sessions.size() - 1; i >= 0; i--)
+  {
+    if(m_sessions[i].m_session->m_stopped)
+    {
+      if(m_sessions[i].m_thread != 0)
+      {
        void* ret;
        NdbThread_WaitFor(m_sessions[i].m_thread, &ret);
        NdbThread_Destroy(&m_sessions[i].m_thread);
@@ -301,19 +320,26 @@ SocketServer::checkSessions(){
 
 void
 SocketServer::stopSessions(bool wait){
   int i;
+  m_session_mutex.lock();
   for(i = m_sessions.size() - 1; i>=0; i--)
   {
     m_sessions[i].m_session->stopSession();
     m_sessions[i].m_session->m_stop = true; // to make sure
   }
+  m_session_mutex.unlock();
+
   for(i = m_services.size() - 1; i>=0; i--)
     m_services[i].m_service->stopSessions();
 
   if(wait){
+    m_session_mutex.lock();
     while(m_sessions.size() > 0){
-      checkSessions();
+      checkSessionsImpl();
+      m_session_mutex.unlock();
       NdbSleep_MilliSleep(100);
+      m_session_mutex.lock();
     }
+    m_session_mutex.unlock();
   }
 }
@@ -348,4 +374,4 @@ sessionThread_C(void* _sc){
 }
 
 template class MutexVector<SocketServer::ServiceInstance>;
-template class MutexVector<SocketServer::SessionInstance>;
+template class Vector<SocketServer::SessionInstance>;
diff --git a/storage/ndb/src/kernel/blocks/suma/Suma.cpp b/storage/ndb/src/kernel/blocks/suma/Suma.cpp
index 867b13e1e40..91f0fab06f8 100644
--- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp
@@ -2649,6 +2649,22 @@ Suma::reportAllSubscribers(Signal *signal,
                            SubscriptionPtr subPtr,
                            SubscriberPtr subbPtr)
 {
+  SubTableData * data = (SubTableData*)signal->getDataPtrSend();
+
+  if (table_event == NdbDictionary::Event::_TE_SUBSCRIBE)
+  {
+    data->gci = m_last_complete_gci + 1;
+    data->tableId = subPtr.p->m_tableId;
+    data->operation = NdbDictionary::Event::_TE_ACTIVE;
+    data->ndbd_nodeid = refToNode(reference());
+    data->changeMask = 0;
+    data->totalLen = 0;
+    data->req_nodeid = refToNode(subbPtr.p->m_senderRef);
+    data->senderData = subbPtr.p->m_senderData;
+    sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
+               SubTableData::SignalLength, JBB);
+  }
+
   if (!(subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE))
   {
     return;
@@ -2663,7 +2679,6 @@ Suma::reportAllSubscribers(Signal *signal,
   ndbout_c("reportAllSubscribers subPtr.i: %d subPtr.p->n_subscribers: %d",
            subPtr.i, subPtr.p->n_subscribers);
 //#endif
-  SubTableData * data = (SubTableData*)signal->getDataPtrSend();
   data->gci = m_last_complete_gci + 1;
   data->tableId = subPtr.p->m_tableId;
   data->operation = table_event;
diff --git a/storage/ndb/src/mgmsrv/Services.cpp b/storage/ndb/src/mgmsrv/Services.cpp
index be15484688b..d7f58d124aa 100644
--- a/storage/ndb/src/mgmsrv/Services.cpp
+++ b/storage/ndb/src/mgmsrv/Services.cpp
@@ -502,6 +502,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &,
       ps.tick= tick;
       m_mgmsrv.get_socket_server()->
         foreachSession(stop_session_if_timed_out,&ps);
+      m_mgmsrv.get_socket_server()->checkSessions();
       error_string = "";
       continue;
     }
@@ -1559,6 +1560,7 @@ MgmApiSession::purge_stale_sessions(Parser_t::Context &ctx,
   ps.free_nodes.bitXORC(NodeBitmask()); // invert connected_nodes to get free nodes
 
   m_mgmsrv.get_socket_server()->foreachSession(stop_session_if_not_connected,&ps);
+  m_mgmsrv.get_socket_server()->checkSessions();
 
   m_output->println("purge stale sessions reply");
   if (str.length() > 0)
diff --git a/storage/ndb/src/ndbapi/Ndb.cpp b/storage/ndb/src/ndbapi/Ndb.cpp
index 5cc4875d78f..0b0749c835e 100644
--- a/storage/ndb/src/ndbapi/Ndb.cpp
+++ b/storage/ndb/src/ndbapi/Ndb.cpp
@@ -1324,6 +1324,12 @@ Ndb::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
   return theEventBuffer->pollEvents(aMillisecondNumber, latestGCI);
 }
 
+int
+Ndb::flushIncompleteEvents(Uint64 gci)
+{
+  return theEventBuffer->flushIncompleteEvents(gci);
+}
+
 NdbEventOperation *Ndb::nextEvent()
 {
   return theEventBuffer->nextEvent();
diff --git a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
index 5a842d1ecc5..d2d8c43a064 100644
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
@@ -3002,63 +3002,6 @@ NdbDictionaryImpl::removeCachedObject(NdbTableImpl & impl)
   DBUG_RETURN(0);
 }
 
-/*****************************************************************
- * Get index info
- */
-NdbIndexImpl*
-NdbDictionaryImpl::getIndexImpl(const char * externalName,
-                                const BaseString& internalName)
-{
-  ASSERT_NOT_MYSQLD;
-  Ndb_local_table_info * info = get_local_table_info(internalName);
-  if(info == 0){
-    m_error.code = 4243;
-    return 0;
-  }
-  NdbTableImpl * tab = info->m_table_impl;
-
-  if(tab->m_indexType == NdbDictionary::Object::TypeUndefined)
-  {
-    // Not an index
-    m_error.code = 4243;
-    return 0;
-  }
-
-  NdbTableImpl* prim = getTable(tab->m_primaryTable.c_str());
-  if(prim == 0){
-    m_error.code = 4243;
-    return 0;
-  }
-
-  return getIndexImpl(externalName, internalName, *tab, *prim);
-}
-
-NdbIndexImpl*
-NdbDictionaryImpl::getIndexImpl(const char * externalName,
-                                const BaseString& internalName,
-                                NdbTableImpl &tab,
-                                NdbTableImpl &prim)
-{
-  DBUG_ENTER("NdbDictionaryImpl::getIndexImpl");
-  DBUG_ASSERT(tab.m_indexType != NdbDictionary::Object::TypeUndefined);
-  /**
-   * Create index impl
-   */
-  NdbIndexImpl* idx;
-  if(NdbDictInterface::create_index_obj_from_table(&idx, &tab, &prim) == 0){
-    idx->m_table = &tab;
-    idx->m_externalName.assign(externalName);
-    idx->m_internalName.assign(internalName);
-    idx->m_table_id = prim.getObjectId();
-    idx->m_table_version = prim.getObjectVersion();
-    // TODO Assign idx to tab->m_index
-    // Don't do it right now since assign can't asign a table with index
-    // tab->m_index = idx;
-    DBUG_RETURN(idx);
-  }
-  DBUG_RETURN(0);
-}
-
 int
 NdbDictInterface::create_index_obj_from_table(NdbIndexImpl** dst,
                                               NdbTableImpl* tab,
@@ -3116,6 +3059,9 @@ NdbDictInterface::create_index_obj_from_table(NdbIndexImpl** dst,
     tab->m_columns[i]->m_distributionKey = 0;
   }
 
+  idx->m_table_id = prim->getObjectId();
+  idx->m_table_version = prim->getObjectVersion();
+
   * dst = idx;
   DBUG_PRINT("exit", ("m_id: %d m_version: %d", idx->m_id, idx->m_version));
   DBUG_RETURN(0);
diff --git a/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp b/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
index 5a7a1ebb0ab..cf30abc6c3f 100644
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
@@ -617,6 +617,7 @@ public:
     get_local_table_info(const BaseString& internalTableName);
   NdbIndexImpl * getIndex(const char * indexName,
                           const char * tableName);
+  NdbIndexImpl * getIndex(const char * indexName, const NdbTableImpl& prim);
   NdbEventImpl * getEvent(const char * eventName, NdbTableImpl* = NULL);
   NdbEventImpl * getBlobEvent(const NdbEventImpl& ev, uint col_no);
   NdbEventImpl * getEventImpl(const char * internalName);
@@ -958,51 +959,36 @@ NdbDictionaryImpl::get_local_table_info(const BaseString& internalTableName)
   DBUG_RETURN(info); // autoincrement already initialized
 }
 
-class InitIndexGlobal : public GlobalCacheInitObject
-{
-public:
-  const char *m_index_name;
-  NdbTableImpl &m_prim;
-
-  InitIndexGlobal(NdbDictionaryImpl *dict,
-                  const BaseString &internal_indexname,
-                  const char *index_name,
-                  NdbTableImpl &prim) :
-    GlobalCacheInitObject(dict, internal_indexname),
-    m_index_name(index_name),
-    m_prim(prim)
-  {}
-  int init(NdbTableImpl &tab) const
-  {
-    tab.m_index= m_dict->getIndexImpl(m_index_name, m_name, tab, m_prim);
-    if (tab.m_index == 0)
-      return 1;
-    tab.m_index->m_table= &tab;
-    return 0;
-  }
-};
-
 class InitIndex : public GlobalCacheInitObject
 {
 public:
   const char *m_index_name;
+  const NdbTableImpl &m_prim;
 
-  InitIndex(NdbDictionaryImpl *dict,
-            const BaseString &internal_indexname,
-            const char *index_name) :
-    GlobalCacheInitObject(dict, internal_indexname),
-    m_index_name(index_name)
-  {}
-  int init(NdbTableImpl &tab) const
-  {
-    DBUG_ASSERT(tab.m_index == 0);
-    tab.m_index= m_dict->getIndexImpl(m_index_name, m_name);
-    if (tab.m_index)
+  InitIndex(const BaseString &internal_indexname,
+            const char *index_name,
+            const NdbTableImpl &prim) :
+    GlobalCacheInitObject(0, internal_indexname),
+    m_index_name(index_name),
+    m_prim(prim)
+  {}
+
+  int init(NdbTableImpl &tab) const {
+    DBUG_ENTER("InitIndex::init");
+    DBUG_ASSERT(tab.m_indexType != NdbDictionary::Object::TypeUndefined);
+    /**
+     * Create index impl
+     */
+    NdbIndexImpl* idx;
+    if(NdbDictInterface::create_index_obj_from_table(&idx, &tab, &m_prim) == 0)
     {
-      tab.m_index->m_table= &tab;
-      return 0;
+      idx->m_table = &tab;
+      idx->m_externalName.assign(m_index_name);
+      idx->m_internalName.assign(m_name);
+      tab.m_index = idx;
+      DBUG_RETURN(0);
     }
-    return 1;
+    DBUG_RETURN(1);
   }
 };
 
@@ -1019,14 +1005,14 @@ NdbDictionaryImpl::getIndexGlobal(const char * index_name,
   while (retry)
   {
     NdbTableImpl *tab=
-      fetchGlobalTableImplRef(InitIndexGlobal(this, internal_indexname,
-                                              index_name, ndbtab));
+      fetchGlobalTableImplRef(InitIndex(internal_indexname,
+                                        index_name, ndbtab));
     if (tab)
     {
       // tab->m_index sould be set. otherwise tab == 0
       NdbIndexImpl *idx= tab->m_index;
-      if (idx->m_table_id != ndbtab.getObjectId() ||
-          idx->m_table_version != ndbtab.getObjectVersion())
+      if (idx->m_table_id != (unsigned)ndbtab.getObjectId() ||
+          idx->m_table_version != (unsigned)ndbtab.getObjectVersion())
       {
         releaseIndexGlobal(*idx, 1);
         retry--;
@@ -1067,41 +1053,54 @@ NdbIndexImpl *
 NdbDictionaryImpl::getIndex(const char * index_name,
                             const char * table_name)
 {
-  while (table_name || m_ndb.usingFullyQualifiedNames())
+  if (table_name == 0)
   {
-    const BaseString internal_indexname(
-      (table_name)
-      ?
-      m_ndb.internalize_index_name(getTable(table_name), index_name)
-      :
-      m_ndb.internalize_table_name(index_name)); // Index is also a table
-
-    if (internal_indexname.length())
-    {
-      Ndb_local_table_info *info= m_localHash.get(internal_indexname.c_str());
-      NdbTableImpl *tab;
-      if (info == 0)
-      {
-        tab= fetchGlobalTableImplRef(InitIndex(this, internal_indexname,
-                                               index_name));
-        if (tab)
-        {
-          info= Ndb_local_table_info::create(tab, 0);
-          if (info)
-            m_localHash.put(internal_indexname.c_str(), info);
-          else
-            break;
-        }
-        else
-          break;
-      }
-      else
-        tab= info->m_table_impl;
-      return tab->m_index;
-    }
-    break;
+    assert(0);
+    m_error.code= 4243;
+    return 0;
+  }
+
+
+  NdbTableImpl* prim = getTable(table_name);
+  if (prim == 0)
+  {
+    m_error.code= 4243;
+    return 0;
   }
+  return getIndex(index_name, *prim);
+}
+
+inline
+NdbIndexImpl *
+NdbDictionaryImpl::getIndex(const char* index_name,
+                            const NdbTableImpl& prim)
+{
+
+  const BaseString
+    internal_indexname(m_ndb.internalize_index_name(&prim, index_name));
+
+  Ndb_local_table_info *info= m_localHash.get(internal_indexname.c_str());
+  NdbTableImpl *tab;
+  if (info == 0)
+  {
+    tab= fetchGlobalTableImplRef(InitIndex(internal_indexname,
+                                           index_name,
+                                           prim));
+    if (!tab)
+      goto err;
+
+    info= Ndb_local_table_info::create(tab, 0);
+    if (!info)
+      goto err;
+    m_localHash.put(internal_indexname.c_str(), info);
+  }
+  else
+    tab= info->m_table_impl;
+
+  return tab->m_index;
+
+err:
   m_error.code= 4243;
   return 0;
 }
diff --git a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
index 628ad5d925f..a5fbd84e5b0 100644
--- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
+++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
@@ -153,11 +153,14 @@ NdbEventOperationImpl::init(NdbEventImpl& evnt)
 
   m_state= EO_CREATED;
 
+  m_node_bit_mask.clear();
 #ifdef ndb_event_stores_merge_events_flag
   m_mergeEvents = m_eventImpl->m_mergeEvents;
 #else
-  m_mergeEvents = false;
+  m_mergeEvents = false;
 #endif
+  m_ref_count = 0;
+  DBUG_PRINT("info", ("m_ref_count = 0 for op: %p", this));
 
   m_has_error= 0;
 
@@ -530,7 +533,11 @@ NdbEventOperationImpl::execute_nolock()
     }
   }
   if (r == 0)
+  {
+    m_ref_count++;
+    DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
     DBUG_RETURN(0);
+  }
 }
 //Error
 m_state= EO_ERROR;
@@ -657,80 +664,79 @@ NdbEventOperationImpl::execSUB_TABLE_DATA(NdbApiSignal * signal,
 int
 NdbEventOperationImpl::receive_event()
 {
-  DBUG_ENTER_EVENT("NdbEventOperationImpl::receive_event");
   Uint32 operation= (Uint32)m_data_item->sdata->operation;
-  DBUG_PRINT_EVENT("info",("sdata->operation %u",operation));
-
-  if (operation == NdbDictionary::Event::_TE_ALTER)
-  {
-    // Parse the new table definition and
-    // create a table object
-    NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
-    NdbDictionaryImpl *dict = & NdbDictionaryImpl::getImpl(*myDict);
-    NdbError error;
-    NdbDictInterface dif(error);
-    NdbTableImpl *at;
-    m_change_mask = m_data_item->sdata->changeMask;
-    error.code = dif.parseTableInfo(&at,
-                                    (Uint32*)m_buffer.get_data(),
-                                    m_buffer.length() / 4,
-                                    true);
-    m_buffer.clear();
-    if (at)
-      at->buildColumnHash();
-    else
-    {
-      DBUG_PRINT_EVENT("info", ("Failed to parse DictTabInfo error %u",
-                                error.code));
-      DBUG_RETURN_EVENT(1);
-    }
-
-    NdbTableImpl *tmp_table_impl= m_eventImpl->m_tableImpl;
-    m_eventImpl->m_tableImpl = at;
-
-    DBUG_PRINT("info", ("switching table impl 0x%x -> 0x%x",
-                        tmp_table_impl, at));
-
-    // change the rec attrs to refer to the new table object
-    int i;
-    for (i = 0; i < 2; i++)
-    {
-      NdbRecAttr *p = theFirstPkAttrs[i];
-      while (p)
-      {
-        int no = p->getColumn()->getColumnNo();
-        NdbColumnImpl *tAttrInfo = at->getColumn(no);
-        DBUG_PRINT("info", ("rec_attr: 0x%x "
-                            "switching column impl 0x%x -> 0x%x",
-                            p, p->m_column, tAttrInfo));
-        p->m_column = tAttrInfo;
-        p = p->next();
-      }
-    }
-    for (i = 0; i < 2; i++)
-    {
-      NdbRecAttr *p = theFirstDataAttrs[i];
-      while (p)
-      {
-        int no = p->getColumn()->getColumnNo();
-        NdbColumnImpl *tAttrInfo = at->getColumn(no);
-        DBUG_PRINT("info", ("rec_attr: 0x%x "
-                            "switching column impl 0x%x -> 0x%x",
-                            p, p->m_column, tAttrInfo));
-        p->m_column = tAttrInfo;
-        p = p->next();
-      }
-    }
-    if (tmp_table_impl)
-      delete tmp_table_impl;
-  }
-
   if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))
   {
-    DBUG_RETURN_EVENT(1);
+    DBUG_ENTER("NdbEventOperationImpl::receive_event");
+    DBUG_PRINT("info",("sdata->operation %u this: %p", operation, this));
+    if (operation == NdbDictionary::Event::_TE_ALTER)
+    {
+      // Parse the new table definition and
+      // create a table object
+      NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
+      NdbDictionaryImpl *dict = & NdbDictionaryImpl::getImpl(*myDict);
+      NdbError error;
+      NdbDictInterface dif(error);
+      NdbTableImpl *at;
+      m_change_mask = m_data_item->sdata->changeMask;
+      error.code = dif.parseTableInfo(&at,
+                                      (Uint32*)m_buffer.get_data(),
+                                      m_buffer.length() / 4,
+                                      true);
+      m_buffer.clear();
+      if (unlikely(!at))
+      {
+        DBUG_PRINT("info", ("Failed to parse DictTabInfo error %u",
+                            error.code));
+        ndbout_c("Failed to parse DictTabInfo error %u", error.code);
+        DBUG_RETURN(1);
+      }
+      at->buildColumnHash();
+
+      NdbTableImpl *tmp_table_impl= m_eventImpl->m_tableImpl;
+      m_eventImpl->m_tableImpl = at;
+
+      DBUG_PRINT("info", ("switching table impl 0x%x -> 0x%x",
+                          tmp_table_impl, at));
+
+      // change the rec attrs to refer to the new table object
+      int i;
+      for (i = 0; i < 2; i++)
+      {
+        NdbRecAttr *p = theFirstPkAttrs[i];
+        while (p)
+        {
+          int no = p->getColumn()->getColumnNo();
+          NdbColumnImpl *tAttrInfo = at->getColumn(no);
+          DBUG_PRINT("info", ("rec_attr: 0x%x "
+                              "switching column impl 0x%x -> 0x%x",
+                              p, p->m_column, tAttrInfo));
+          p->m_column = tAttrInfo;
+          p = p->next();
+        }
+      }
+      for (i = 0; i < 2; i++)
+      {
+        NdbRecAttr *p = theFirstDataAttrs[i];
+        while (p)
+        {
+          int no = p->getColumn()->getColumnNo();
+          NdbColumnImpl *tAttrInfo = at->getColumn(no);
+          DBUG_PRINT("info", ("rec_attr: 0x%x "
+                              "switching column impl 0x%x -> 0x%x",
+                              p, p->m_column, tAttrInfo));
+          p->m_column = tAttrInfo;
+          p = p->next();
+        }
+      }
+      if (tmp_table_impl)
+        delete tmp_table_impl;
+    }
+    DBUG_RETURN(1);
   }
+
+  DBUG_ENTER_EVENT("NdbEventOperationImpl::receive_event");
+  DBUG_PRINT_EVENT("info",("sdata->operation %u this: %p", operation, this));
 
   // now move the data into the RecAttrs
   int is_update= operation == NdbDictionary::Event::_TE_UPDATE;
@@ -1089,6 +1095,33 @@ NdbEventBuffer::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
   return ret;
 }
 
+int
+NdbEventBuffer::flushIncompleteEvents(Uint64 gci)
+{
+  /**
+   *  Find min complete gci
+   */
+  Uint32 i;
+  Uint32 sz= m_active_gci.size();
+  Gci_container* array = (Gci_container*)m_active_gci.getBase();
+  for(i = 0; i < sz; i++)
+  {
+    Gci_container* tmp = array + i;
+    if (tmp->m_gci && tmp->m_gci < gci)
+    {
+      // we have found an old not-completed gci, remove it
+      ndbout_c("ndb: flushing incomplete epoch %lld (<%lld)", tmp->m_gci, gci);
+      if(!tmp->m_data.is_empty())
+      {
+        free_list(tmp->m_data);
+      }
+      tmp->~Gci_container();
+      bzero(tmp, sizeof(Gci_container));
+    }
+  }
+  return 0;
+}
+
 NdbEventOperation *
 NdbEventBuffer::nextEvent()
 {
@@ -1157,7 +1190,10 @@ NdbEventBuffer::nextEvent()
     }
     EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
     while (gci_ops && op->getGCI() > gci_ops->m_gci)
+    {
+      deleteUsedEventOperations();
       gci_ops = m_available_data.next_gci_ops();
+    }
     assert(gci_ops && (op->getGCI() == gci_ops->m_gci));
     DBUG_RETURN_EVENT(op->m_facade);
   }
@@ -1177,7 +1213,10 @@ NdbEventBuffer::nextEvent()
   // free all "per gci unique" collected operations
   EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
   while (gci_ops)
+  {
+    deleteUsedEventOperations();
     gci_ops = m_available_data.next_gci_ops();
+  }
   DBUG_RETURN_EVENT(0);
 }
 
@@ -1191,31 +1230,37 @@ NdbEventBuffer::getGCIEventOperations(Uint32* iter, Uint32* event_types)
     EventBufData_list::Gci_op g = gci_ops->m_gci_op_list[(*iter)++];
     if (event_types != NULL)
       *event_types = g.event_types;
-    DBUG_PRINT("info", ("gci: %d", (unsigned)gci_ops->m_gci));
+    DBUG_PRINT("info", ("gci: %d g.op: %x g.event_types: %x",
+                        (unsigned)gci_ops->m_gci, g.op, g.event_types));
     DBUG_RETURN(g.op);
   }
   DBUG_RETURN(NULL);
 }
 
 void
-NdbEventBuffer::lock()
+NdbEventBuffer::deleteUsedEventOperations()
 {
-  NdbMutex_Lock(m_mutex);
-}
-void
-NdbEventBuffer::unlock()
-{
-  NdbMutex_Unlock(m_mutex);
-}
-void
-NdbEventBuffer::add_drop_lock()
-{
-  NdbMutex_Lock(p_add_drop_mutex);
-}
-void
-NdbEventBuffer::add_drop_unlock()
-{
-  NdbMutex_Unlock(p_add_drop_mutex);
+  Uint32 iter= 0;
+  const NdbEventOperation *op_f;
+  while ((op_f= getGCIEventOperations(&iter, NULL)) != NULL)
+  {
+    NdbEventOperationImpl *op = &op_f->m_impl;
+    DBUG_ASSERT(op->m_ref_count > 0);
+    op->m_ref_count--;
+    DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
+    if (op->m_ref_count == 0)
+    {
+      DBUG_PRINT("info", ("deleting op: %p", op));
+      DBUG_ASSERT(op->m_node_bit_mask.isclear());
+      if (op->m_next)
+        op->m_next->m_prev = op->m_prev;
+      if (op->m_prev)
+        op->m_prev->m_next = op->m_next;
+      else
+        m_dropped_ev_op = op->m_next;
+      delete op->m_facade;
+    }
+  }
 }
 
 static
@@ -1469,6 +1514,10 @@ NdbEventBuffer::complete_outof_order_gcis()
 void
 NdbEventBuffer::report_node_failure(Uint32 node_id)
 {
+  NdbEventOperation* op= m_ndb->getEventOperation(0);
+  if (op == 0)
+    return;
+
   DBUG_ENTER("NdbEventBuffer::report_node_failure");
   SubTableData data;
   LinearSectionPtr ptr[3];
@@ -1484,12 +1533,20 @@ NdbEventBuffer::report_node_failure(Uint32 node_id)
   /**
    * Insert this event for each operation
    */
-  NdbEventOperation* op= 0;
-  while((op = m_ndb->getEventOperation(op)))
   {
-    NdbEventOperationImpl* impl= &op->m_impl;
-    data.senderData = impl->m_oid;
-    insertDataL(impl, &data, ptr);
+    // no need to lock()/unlock(), receive thread calls this
+    NdbEventOperationImpl* impl = &op->m_impl;
+    do if (!impl->m_node_bit_mask.isclear())
+    {
+      data.senderData = impl->m_oid;
+      insertDataL(impl, &data, ptr);
+    } while((impl = impl->m_next));
+    for (impl = m_dropped_ev_op; impl; impl = impl->m_next)
+      if (!impl->m_node_bit_mask.isclear())
+      {
+        data.senderData = impl->m_oid;
+        insertDataL(impl, &data, ptr);
+      }
   }
   DBUG_VOID_RETURN;
 }
@@ -1515,12 +1572,21 @@ NdbEventBuffer::completeClusterFailed()
   /**
    * Insert this event for each operation
    */
-  do
   {
-    NdbEventOperationImpl* impl= &op->m_impl;
-    data.senderData = impl->m_oid;
-    insertDataL(impl, &data, ptr);
-  } while((op = m_ndb->getEventOperation(op)));
+    // no need to lock()/unlock(), receive thread calls this
+    NdbEventOperationImpl* impl = &op->m_impl;
+    do if (!impl->m_node_bit_mask.isclear())
+    {
+      data.senderData = impl->m_oid;
+      insertDataL(impl, &data, ptr);
+    } while((impl = impl->m_next));
+    for (impl = m_dropped_ev_op; impl; impl = impl->m_next)
+      if (!impl->m_node_bit_mask.isclear())
+      {
+        data.senderData = impl->m_oid;
+        insertDataL(impl, &data, ptr);
+      }
+  }
 
   /**
    *  Release all GCI's with m_gci > gci
@@ -1565,7 +1631,11 @@ NdbEventBuffer::completeClusterFailed()
     }
   }
 
-  assert(bucket != 0);
+  if (bucket == 0)
+  {
+    // no bucket to complete
+    DBUG_VOID_RETURN;
+  }
   const Uint32 cnt= bucket->m_gcp_complete_rep_count = 1;
   bucket->m_gci = gci;
 
@@ -1595,6 +1665,40 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
 {
   DBUG_ENTER_EVENT("NdbEventBuffer::insertDataL");
   Uint64 gci= sdata->gci;
+  const bool is_data_event =
+    sdata->operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;
+
+  if (!is_data_event)
+  {
+    switch (sdata->operation)
+    {
+    case NdbDictionary::Event::_TE_NODE_FAILURE:
+      op->m_node_bit_mask.clear(sdata->ndbd_nodeid);
+      break;
+    case NdbDictionary::Event::_TE_ACTIVE:
+      op->m_node_bit_mask.set(sdata->ndbd_nodeid);
+      // internal event, do not relay to user
+      DBUG_RETURN_EVENT(0);
+      break;
+    case NdbDictionary::Event::_TE_CLUSTER_FAILURE:
+      op->m_node_bit_mask.clear();
+      DBUG_ASSERT(op->m_ref_count > 0);
+      op->m_ref_count--;
+      DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
+      break;
+    case NdbDictionary::Event::_TE_STOP:
+      op->m_node_bit_mask.clear(sdata->ndbd_nodeid);
+      if (op->m_node_bit_mask.isclear())
+      {
+        DBUG_ASSERT(op->m_ref_count > 0);
+        op->m_ref_count--;
+        DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
+      }
+      break;
+    default:
+      break;
+    }
+  }
 
   if ( likely((Uint32)op->mi_type & (1 << (Uint32)sdata->operation)) )
   {
@@ -1615,8 +1719,6 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
   }
 
   const bool is_blob_event = (op->theMainOp != NULL);
-  const bool is_data_event =
-    sdata->operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;
   const bool use_hash = op->m_mergeEvents && is_data_event;
 
   if (! is_data_event && is_blob_event)
@@ -2244,6 +2346,8 @@ void EventBufData_list::append_list(EventBufData_list *list, Uint64 gci)
 void
 EventBufData_list::add_gci_op(Gci_op g, bool del)
 {
+  DBUG_ENTER_EVENT("EventBufData_list::add_gci_op");
+  DBUG_PRINT_EVENT("info", ("p.op: %p g.event_types: %x", g.op, g.event_types));
   assert(g.op != NULL);
   Uint32 i;
   for (i = 0; i < m_gci_op_count; i++) {
@@ -2273,8 +2377,15 @@ EventBufData_list::add_gci_op(Gci_op g, bool del)
     }
     assert(m_gci_op_count < m_gci_op_alloc);
     assert(! del);
+#ifndef DBUG_OFF
+    i = m_gci_op_count;
+#endif
+    g.op->m_ref_count++;
+    DBUG_PRINT("info", ("m_ref_count: %u for op: %p", g.op->m_ref_count, g.op));
     m_gci_op_list[m_gci_op_count++] = g;
   }
+  DBUG_PRINT_EVENT("exit", ("m_gci_op_list[%u].event_types: %x", i, m_gci_op_list[i].event_types));
+  DBUG_VOID_RETURN_EVENT;
 }
 
 void
@@ -2337,6 +2448,9 @@ NdbEventBuffer::createEventOperation(const char* eventName,
     delete tOp;
     DBUG_RETURN(NULL);
   }
+  getEventOperationImpl(tOp)->m_ref_count = 1;
+  DBUG_PRINT("info", ("m_ref_count: %u for op: %p",
+                      getEventOperationImpl(tOp)->m_ref_count, getEventOperationImpl(tOp)));
   DBUG_RETURN(tOp);
 }
 
@@ -2362,16 +2476,10 @@ NdbEventBuffer::createEventOperation(NdbEventImpl& evnt,
 void
 NdbEventBuffer::dropEventOperation(NdbEventOperation* tOp)
 {
+  DBUG_ENTER("NdbEventBuffer::dropEventOperation");
   NdbEventOperationImpl* op= getEventOperationImpl(tOp);
 
   op->stop();
-
-  op->m_next= m_dropped_ev_op;
-  op->m_prev= 0;
-  if (m_dropped_ev_op)
-    m_dropped_ev_op->m_prev= op;
-  m_dropped_ev_op= op;
-
   // stop blob event ops
   if (op->theMainOp == NULL)
   {
@@ -2391,11 +2499,24 @@ NdbEventBuffer::dropEventOperation(NdbEventOperation* tOp)
     }
   }
 
-  // ToDo, take care of these to be deleted at the
-  // appropriate time, after we are sure that there
-  // are _no_ more events coming
-
-  // delete tOp;
+  DBUG_ASSERT(op->m_ref_count > 0);
+  op->m_ref_count--;
+  DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
+  if (op->m_ref_count == 0)
+  {
+    DBUG_PRINT("info", ("deleting op: %p", op));
+    DBUG_ASSERT(op->m_node_bit_mask.isclear());
+    delete op->m_facade;
+  }
+  else
+  {
+    op->m_next= m_dropped_ev_op;
+    op->m_prev= 0;
+    if (m_dropped_ev_op)
+      m_dropped_ev_op->m_prev= op;
+    m_dropped_ev_op= op;
+  }
+  DBUG_VOID_RETURN;
 }
 
 void
diff --git a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
index 70b3ce6b8de..bcae650bf44 100644
--- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
+++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
@@ -367,6 +367,8 @@ public:
 
   Uint32 m_eventId;
   Uint32 m_oid;
+  Bitmask<(unsigned int)_NDB_NODE_BITMASK_SIZE> m_node_bit_mask;
+  int m_ref_count;
   bool m_mergeEvents;
 
   EventBufData *m_data_item;
@@ -406,10 +408,10 @@ public:
   void dropEventOperation(NdbEventOperation *);
   static NdbEventOperationImpl* getEventOperationImpl(NdbEventOperation* tOp);
 
-  void add_drop_lock();
-  void add_drop_unlock();
-  void lock();
-  void unlock();
+  void add_drop_lock() { NdbMutex_Lock(p_add_drop_mutex); }
+  void add_drop_unlock() { NdbMutex_Unlock(p_add_drop_mutex); }
+  void lock() { NdbMutex_Lock(m_mutex); }
+  void unlock() { NdbMutex_Unlock(m_mutex); }
 
   void add_op();
   void remove_op();
@@ -430,9 +432,11 @@ public:
   Uint32 getEventId(int bufferId);
 
   int pollEvents(int aMillisecondNumber, Uint64 *latestGCI= 0);
+  int flushIncompleteEvents(Uint64 gci);
   NdbEventOperation *nextEvent();
   NdbEventOperationImpl* getGCIEventOperations(Uint32* iter,
                                                Uint32* event_types);
+  void deleteUsedEventOperations();
   NdbEventOperationImpl *move_data();