diff --git a/mysql-test/r/ndb_autodiscover3.result b/mysql-test/r/ndb_autodiscover3.result index 6e3e5fcc335..86495ebb3eb 100644 --- a/mysql-test/r/ndb_autodiscover3.result +++ b/mysql-test/r/ndb_autodiscover3.result @@ -18,6 +18,7 @@ select * from t2; ERROR 42S02: Table 'test.t2' doesn't exist show tables like 't2'; Tables_in_test (t2) +reset master; create table t2 (a int key) engine=ndbcluster; insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10); select * from t2 order by a limit 3; @@ -30,10 +31,12 @@ a 1 2 3 +reset master; select * from t2; ERROR 42S02: Table 'test.t2' doesn't exist show tables like 't2'; Tables_in_test (t2) +reset master; create table t2 (a int key) engine=ndbcluster; insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10); select * from t2 order by a limit 3; @@ -46,4 +49,5 @@ a 1 2 3 +reset master; drop table t2; diff --git a/mysql-test/t/disabled.def b/mysql-test/t/disabled.def index 746a117ccd6..562006c7687 100644 --- a/mysql-test/t/disabled.def +++ b/mysql-test/t/disabled.def @@ -16,7 +16,7 @@ events_scheduling : BUG#19170 2006-04-26 andrey Test case of 19170 fails events_logs_tests : BUG#17619 2006-05-16 andrey Test case problems ndb_autodiscover : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t binlog ndb_autodiscover2 : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t binlog -ndb_binlog_discover : BUG#19395 2006-04-28 tomas/knielsen mysqld does not always detect cluster shutdown +#ndb_binlog_discover : BUG#19395 2006-04-28 tomas/knielsen mysqld does not always detect cluster shutdown #ndb_cache2 : BUG#18597 2006-03-28 brian simultaneous drop table and ndb statistics update triggers node failure #ndb_cache_multi2 : BUG#18597 2006-04-10 kent simultaneous drop table and ndb statistics update triggers node failure ndb_load : BUG#17233 2006-05-04 tomas failed load data from infile causes mysqld dbug_assert, binlog not flushed diff --git a/mysql-test/t/ndb_autodiscover3.test b/mysql-test/t/ndb_autodiscover3.test index afbebc4dd03..ed75c89cdd1 100644 --- a/mysql-test/t/ndb_autodiscover3.test +++ b/mysql-test/t/ndb_autodiscover3.test @@ -43,6 +43,7 @@ select * from t2 order by a limit 3; --error ER_NO_SUCH_TABLE select * from t2; show tables like 't2'; +reset master; create table t2 (a int key) engine=ndbcluster; insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10); select * from t2 order by a limit 3; @@ -50,6 +51,7 @@ select * from t2 order by a limit 3; # server 1 should have a stale cache, and in this case wrong frm, transaction must be retried --connection server1 select * from t2 order by a limit 3; +reset master; --exec $NDB_MGM --no-defaults -e "all restart -i" >> $NDB_TOOLS_OUTPUT --exec $NDB_TOOLS_DIR/ndb_waiter --no-defaults >> $NDB_TOOLS_OUTPUT @@ -60,6 +62,7 @@ select * from t2 order by a limit 3; --error ER_NO_SUCH_TABLE select * from t2; show tables like 't2'; +reset master; create table t2 (a int key) engine=ndbcluster; insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10); select * from t2 order by a limit 3; @@ -67,6 +70,7 @@ select * from t2 order by a limit 3; # server 2 should have a stale cache, but with right frm, transaction need not be retried --connection server2 select * from t2 order by a limit 3; +reset master; drop table t2; # End of 4.1 tests diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc index bae2a3cbd9f..9cac897d965 100644 --- a/sql/ha_ndbcluster_binlog.cc +++ b/sql/ha_ndbcluster_binlog.cc @@ -3321,6 +3321,9 @@ restart: DBUG_PRINT("info", ("schema_res: %d schema_gci: %d", schema_res, schema_gci)); if (schema_res > 0) { + i_ndb->pollEvents(0); + i_ndb->flushIncompleteEvents(schema_gci); + s_ndb->flushIncompleteEvents(schema_gci); if (schema_gci < ndb_latest_handled_binlog_epoch) { sql_print_error("NDB Binlog: cluster has been restarted --initial or with older filesystem. " diff --git a/storage/ndb/include/ndbapi/Ndb.hpp b/storage/ndb/include/ndbapi/Ndb.hpp index 010b85b03a9..f6f313e9224 100644 --- a/storage/ndb/include/ndbapi/Ndb.hpp +++ b/storage/ndb/include/ndbapi/Ndb.hpp @@ -1262,6 +1262,7 @@ public: #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL + int flushIncompleteEvents(Uint64 gci); NdbEventOperation *getEventOperation(NdbEventOperation* eventOp= 0); Uint64 getLatestGCI(); void forceGCP(); diff --git a/storage/ndb/include/ndbapi/NdbDictionary.hpp b/storage/ndb/include/ndbapi/NdbDictionary.hpp index b31e2551e89..865fb506f05 100644 --- a/storage/ndb/include/ndbapi/NdbDictionary.hpp +++ b/storage/ndb/include/ndbapi/NdbDictionary.hpp @@ -1132,7 +1132,8 @@ public: _TE_NODE_FAILURE=10, _TE_SUBSCRIBE=11, _TE_UNSUBSCRIBE=12, - _TE_NUL=13 // internal (e.g. INS o DEL within same GCI) + _TE_NUL=13, // internal (e.g. INS o DEL within same GCI) + _TE_ACTIVE=14 // internal (node becomes active) }; #endif /** diff --git a/storage/ndb/src/kernel/blocks/suma/Suma.cpp b/storage/ndb/src/kernel/blocks/suma/Suma.cpp index 867b13e1e40..91f0fab06f8 100644 --- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp +++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp @@ -2649,6 +2649,22 @@ Suma::reportAllSubscribers(Signal *signal, SubscriptionPtr subPtr, SubscriberPtr subbPtr) { + SubTableData * data = (SubTableData*)signal->getDataPtrSend(); + + if (table_event == NdbDictionary::Event::_TE_SUBSCRIBE) + { + data->gci = m_last_complete_gci + 1; + data->tableId = subPtr.p->m_tableId; + data->operation = NdbDictionary::Event::_TE_ACTIVE; + data->ndbd_nodeid = refToNode(reference()); + data->changeMask = 0; + data->totalLen = 0; + data->req_nodeid = refToNode(subbPtr.p->m_senderRef); + data->senderData = subbPtr.p->m_senderData; + sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal, + SubTableData::SignalLength, JBB); + } + if (!(subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE)) { return; @@ -2663,7 +2679,6 @@ Suma::reportAllSubscribers(Signal *signal, ndbout_c("reportAllSubscribers subPtr.i: %d subPtr.p->n_subscribers: %d", subPtr.i, subPtr.p->n_subscribers); //#endif - SubTableData * data = (SubTableData*)signal->getDataPtrSend(); data->gci = m_last_complete_gci + 1; data->tableId = subPtr.p->m_tableId; data->operation = table_event; diff --git a/storage/ndb/src/ndbapi/Ndb.cpp b/storage/ndb/src/ndbapi/Ndb.cpp index 5cc4875d78f..0b0749c835e 100644 --- a/storage/ndb/src/ndbapi/Ndb.cpp +++ b/storage/ndb/src/ndbapi/Ndb.cpp @@ -1324,6 +1324,12 @@ Ndb::pollEvents(int aMillisecondNumber, Uint64 *latestGCI) return theEventBuffer->pollEvents(aMillisecondNumber, latestGCI); } +int +Ndb::flushIncompleteEvents(Uint64 gci) +{ + return theEventBuffer->flushIncompleteEvents(gci); +} + NdbEventOperation *Ndb::nextEvent() { return theEventBuffer->nextEvent(); diff --git a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp index 628ad5d925f..08c9af33872 100644 --- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp +++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp @@ -153,11 +153,14 @@ NdbEventOperationImpl::init(NdbEventImpl& evnt) m_state= EO_CREATED; + m_node_bit_mask.clear(); #ifdef ndb_event_stores_merge_events_flag m_mergeEvents = m_eventImpl->m_mergeEvents; #else - m_mergeEvents = false; + m_mergeEvents = false; #endif + m_ref_count = 0; + DBUG_PRINT("info", ("m_ref_count = 0 for op: %p", this)); m_has_error= 0; @@ -530,7 +533,11 @@ NdbEventOperationImpl::execute_nolock() } } if (r == 0) + { + m_ref_count++; + DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this)); DBUG_RETURN(0); + } } //Error m_state= EO_ERROR; @@ -657,80 +664,79 @@ NdbEventOperationImpl::execSUB_TABLE_DATA(NdbApiSignal * signal, int NdbEventOperationImpl::receive_event() { - DBUG_ENTER_EVENT("NdbEventOperationImpl::receive_event"); - Uint32 operation= (Uint32)m_data_item->sdata->operation; - DBUG_PRINT_EVENT("info",("sdata->operation %u",operation)); - - if (operation == NdbDictionary::Event::_TE_ALTER) - { - // Parse the new table definition and - // create a table object - NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); - NdbDictionaryImpl *dict = & NdbDictionaryImpl::getImpl(*myDict); - NdbError error; - NdbDictInterface dif(error); - NdbTableImpl *at; - m_change_mask = m_data_item->sdata->changeMask; - error.code = dif.parseTableInfo(&at, - (Uint32*)m_buffer.get_data(), - m_buffer.length() / 4, - true); - m_buffer.clear(); - if (at) - at->buildColumnHash(); - else - { - DBUG_PRINT_EVENT("info", ("Failed to parse DictTabInfo error %u", - error.code)); - DBUG_RETURN_EVENT(1); - } - - NdbTableImpl *tmp_table_impl= m_eventImpl->m_tableImpl; - m_eventImpl->m_tableImpl = at; - - DBUG_PRINT("info", ("switching table impl 0x%x -> 0x%x", - tmp_table_impl, at)); - - // change the rec attrs to refer to the new table object - int i; - for (i = 0; i < 2; i++) - { - NdbRecAttr *p = theFirstPkAttrs[i]; - while (p) - { - int no = p->getColumn()->getColumnNo(); - NdbColumnImpl *tAttrInfo = at->getColumn(no); - DBUG_PRINT("info", ("rec_attr: 0x%x " - "switching column impl 0x%x -> 0x%x", - p, p->m_column, tAttrInfo)); - p->m_column = tAttrInfo; - p = p->next(); - } - } - for (i = 0; i < 2; i++) - { - NdbRecAttr *p = theFirstDataAttrs[i]; - while (p) - { - int no = p->getColumn()->getColumnNo(); - NdbColumnImpl *tAttrInfo = at->getColumn(no); - DBUG_PRINT("info", ("rec_attr: 0x%x " - "switching column impl 0x%x -> 0x%x", - p, p->m_column, tAttrInfo)); - p->m_column = tAttrInfo; - p = p->next(); - } - } - if (tmp_table_impl) - delete tmp_table_impl; - } - if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT)) { - DBUG_RETURN_EVENT(1); + DBUG_ENTER("NdbEventOperationImpl::receive_event"); + DBUG_PRINT("info",("sdata->operation %u this: %p", operation, this)); + if (operation == NdbDictionary::Event::_TE_ALTER) + { + // Parse the new table definition and + // create a table object + NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); + NdbDictionaryImpl *dict = & NdbDictionaryImpl::getImpl(*myDict); + NdbError error; + NdbDictInterface dif(error); + NdbTableImpl *at; + m_change_mask = m_data_item->sdata->changeMask; + error.code = dif.parseTableInfo(&at, + (Uint32*)m_buffer.get_data(), + m_buffer.length() / 4, + true); + m_buffer.clear(); + if (unlikely(!at)) + { + DBUG_PRINT("info", ("Failed to parse DictTabInfo error %u", + error.code)); + ndbout_c("Failed to parse DictTabInfo error %u", error.code); + DBUG_RETURN(1); + } + at->buildColumnHash(); + + NdbTableImpl *tmp_table_impl= m_eventImpl->m_tableImpl; + m_eventImpl->m_tableImpl = at; + + DBUG_PRINT("info", ("switching table impl 0x%x -> 0x%x", + tmp_table_impl, at)); + + // change the rec attrs to refer to the new table object + int i; + for (i = 0; i < 2; i++) + { + NdbRecAttr *p = theFirstPkAttrs[i]; + while (p) + { + int no = p->getColumn()->getColumnNo(); + NdbColumnImpl *tAttrInfo = at->getColumn(no); + DBUG_PRINT("info", ("rec_attr: 0x%x " + "switching column impl 0x%x -> 0x%x", + p, p->m_column, tAttrInfo)); + p->m_column = tAttrInfo; + p = p->next(); + } + } + for (i = 0; i < 2; i++) + { + NdbRecAttr *p = theFirstDataAttrs[i]; + while (p) + { + int no = p->getColumn()->getColumnNo(); + NdbColumnImpl *tAttrInfo = at->getColumn(no); + DBUG_PRINT("info", ("rec_attr: 0x%x " + "switching column impl 0x%x -> 0x%x", + p, p->m_column, tAttrInfo)); + p->m_column = tAttrInfo; + p = p->next(); + } + } + if (tmp_table_impl) + delete tmp_table_impl; + } + DBUG_RETURN(1); } + DBUG_ENTER_EVENT("NdbEventOperationImpl::receive_event"); + DBUG_PRINT_EVENT("info",("sdata->operation %u this: %p", operation, this)); // now move the data into the RecAttrs int is_update= operation == NdbDictionary::Event::_TE_UPDATE; @@ -1089,6 +1095,32 @@ NdbEventBuffer::pollEvents(int aMillisecondNumber, Uint64 *latestGCI) return ret; } +int +NdbEventBuffer::flushIncompleteEvents(Uint64 gci) +{ + /** + * Find min complete gci + */ + Uint32 i; + Uint32 sz= m_active_gci.size(); + Gci_container* array = (Gci_container*)m_active_gci.getBase(); + for(i = 0; i < sz; i++) + { + Gci_container* tmp = array + i; + if (tmp->m_gci < gci) + { + // we have found an old not-completed gci, remove it + if(!tmp->m_data.is_empty()) + { + free_list(tmp->m_data); + } + tmp->~Gci_container(); + bzero(tmp, sizeof(Gci_container)); + } + } + return 0; +} + NdbEventOperation * NdbEventBuffer::nextEvent() { @@ -1157,7 +1189,10 @@ NdbEventBuffer::nextEvent() } EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops(); while (gci_ops && op->getGCI() > gci_ops->m_gci) + { + deleteUsedEventOperations(); gci_ops = m_available_data.next_gci_ops(); + } assert(gci_ops && (op->getGCI() == gci_ops->m_gci)); DBUG_RETURN_EVENT(op->m_facade); } @@ -1177,7 +1212,10 @@ NdbEventBuffer::nextEvent() // free all "per gci unique" collected operations EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops(); while (gci_ops) + { + deleteUsedEventOperations(); gci_ops = m_available_data.next_gci_ops(); + } DBUG_RETURN_EVENT(0); } @@ -1191,31 +1229,38 @@ NdbEventBuffer::getGCIEventOperations(Uint32* iter, Uint32* event_types) EventBufData_list::Gci_op g = gci_ops->m_gci_op_list[(*iter)++]; if (event_types != NULL) *event_types = g.event_types; - DBUG_PRINT("info", ("gci: %d", (unsigned)gci_ops->m_gci)); + DBUG_PRINT("info", ("gci: %d g.op: %x g.event_types: %x", + (unsigned)gci_ops->m_gci, g.op, g.event_types)); DBUG_RETURN(g.op); } DBUG_RETURN(NULL); } void -NdbEventBuffer::lock() +NdbEventBuffer::deleteUsedEventOperations() { - NdbMutex_Lock(m_mutex); -} -void -NdbEventBuffer::unlock() -{ - NdbMutex_Unlock(m_mutex); -} -void -NdbEventBuffer::add_drop_lock() -{ - NdbMutex_Lock(p_add_drop_mutex); -} -void -NdbEventBuffer::add_drop_unlock() -{ - NdbMutex_Unlock(p_add_drop_mutex); + Uint32 iter= 0; + const NdbEventOperation *op_f; + while ((op_f= getGCIEventOperations(&iter, NULL)) != NULL) + { + NdbEventOperationImpl *op = &op_f->m_impl; + DBUG_ASSERT(op->m_ref_count > 0); + op->m_ref_count--; + DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op)); + if (op->m_ref_count == 0) + { + DBUG_PRINT("info", ("deleting op: %p", op)); + DBUG_ASSERT(op->m_node_bit_mask.isclear()); + if (op->m_next) + op->m_next->m_prev = op->m_prev; + if (op->m_prev) + op->m_prev->m_next = op->m_next; + else + m_dropped_ev_op = op->m_next; + ndbout_c("deleting NdbEventOperation %p", op->m_facade); + delete op->m_facade; + } + } } static @@ -1469,6 +1514,10 @@ NdbEventBuffer::complete_outof_order_gcis() void NdbEventBuffer::report_node_failure(Uint32 node_id) { + NdbEventOperation* op= m_ndb->getEventOperation(0); + if (op == 0) + return; + DBUG_ENTER("NdbEventBuffer::report_node_failure"); SubTableData data; LinearSectionPtr ptr[3]; @@ -1484,12 +1533,20 @@ NdbEventBuffer::report_node_failure(Uint32 node_id) /** * Insert this event for each operation */ - NdbEventOperation* op= 0; - while((op = m_ndb->getEventOperation(op))) { - NdbEventOperationImpl* impl= &op->m_impl; - data.senderData = impl->m_oid; - insertDataL(impl, &data, ptr); + // no need to lock()/unlock(), receive thread calls this + NdbEventOperationImpl* impl = &op->m_impl; + do if (!impl->m_node_bit_mask.isclear()) + { + data.senderData = impl->m_oid; + insertDataL(impl, &data, ptr); + } while((impl = impl->m_next)); + for (impl = m_dropped_ev_op; impl; impl = impl->m_next) + if (!impl->m_node_bit_mask.isclear()) + { + data.senderData = impl->m_oid; + insertDataL(impl, &data, ptr); + } } DBUG_VOID_RETURN; } @@ -1515,12 +1572,21 @@ NdbEventBuffer::completeClusterFailed() /** * Insert this event for each operation */ - do { - NdbEventOperationImpl* impl= &op->m_impl; - data.senderData = impl->m_oid; - insertDataL(impl, &data, ptr); - } while((op = m_ndb->getEventOperation(op))); + // no need to lock()/unlock(), receive thread calls this + NdbEventOperationImpl* impl = &op->m_impl; + do if (!impl->m_node_bit_mask.isclear()) + { + data.senderData = impl->m_oid; + insertDataL(impl, &data, ptr); + } while((impl = impl->m_next)); + for (impl = m_dropped_ev_op; impl; impl = impl->m_next) + if (!impl->m_node_bit_mask.isclear()) + { + data.senderData = impl->m_oid; + insertDataL(impl, &data, ptr); + } + } /** * Release all GCI's with m_gci > gci @@ -1565,7 +1631,11 @@ NdbEventBuffer::completeClusterFailed() } } - assert(bucket != 0); + if (bucket == 0) + { + // no bucket to complete + DBUG_VOID_RETURN; + } const Uint32 cnt= bucket->m_gcp_complete_rep_count = 1; bucket->m_gci = gci; @@ -1595,6 +1665,40 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, { DBUG_ENTER_EVENT("NdbEventBuffer::insertDataL"); Uint64 gci= sdata->gci; + const bool is_data_event = + sdata->operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT; + + if (!is_data_event) + { + switch (sdata->operation) + { + case NdbDictionary::Event::_TE_NODE_FAILURE: + op->m_node_bit_mask.clear(sdata->ndbd_nodeid); + break; + case NdbDictionary::Event::_TE_ACTIVE: + op->m_node_bit_mask.set(sdata->ndbd_nodeid); + // internal event, do not relay to user + DBUG_RETURN_EVENT(0); + break; + case NdbDictionary::Event::_TE_CLUSTER_FAILURE: + op->m_node_bit_mask.clear(); + DBUG_ASSERT(op->m_ref_count > 0); + op->m_ref_count--; + DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op)); + break; + case NdbDictionary::Event::_TE_STOP: + op->m_node_bit_mask.clear(sdata->ndbd_nodeid); + if (op->m_node_bit_mask.isclear()) + { + DBUG_ASSERT(op->m_ref_count > 0); + op->m_ref_count--; + DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op)); + } + break; + default: + break; + } + } if ( likely((Uint32)op->mi_type & (1 << (Uint32)sdata->operation)) ) { @@ -1615,8 +1719,6 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, } const bool is_blob_event = (op->theMainOp != NULL); - const bool is_data_event = - sdata->operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT; const bool use_hash = op->m_mergeEvents && is_data_event; if (! is_data_event && is_blob_event) @@ -2244,6 +2346,8 @@ void EventBufData_list::append_list(EventBufData_list *list, Uint64 gci) void EventBufData_list::add_gci_op(Gci_op g, bool del) { + DBUG_ENTER_EVENT("EventBufData_list::add_gci_op"); + DBUG_PRINT_EVENT("info", ("p.op: %p g.event_types: %x", g.op, g.event_types)); assert(g.op != NULL); Uint32 i; for (i = 0; i < m_gci_op_count; i++) { @@ -2273,8 +2377,15 @@ EventBufData_list::add_gci_op(Gci_op g, bool del) } assert(m_gci_op_count < m_gci_op_alloc); assert(! del); +#ifndef DBUG_OFF + i = m_gci_op_count; +#endif + g.op->m_ref_count++; + DBUG_PRINT("info", ("m_ref_count: %u for op: %p", g.op->m_ref_count, g.op)); m_gci_op_list[m_gci_op_count++] = g; } + DBUG_PRINT_EVENT("exit", ("m_gci_op_list[%u].event_types: %x", i, m_gci_op_list[i].event_types)); + DBUG_VOID_RETURN_EVENT; } void @@ -2337,6 +2448,9 @@ NdbEventBuffer::createEventOperation(const char* eventName, delete tOp; DBUG_RETURN(NULL); } + getEventOperationImpl(tOp)->m_ref_count = 1; + DBUG_PRINT("info", ("m_ref_count: %u for op: %p", + getEventOperationImpl(tOp)->m_ref_count, getEventOperationImpl(tOp))); DBUG_RETURN(tOp); } @@ -2362,16 +2476,10 @@ NdbEventBuffer::createEventOperation(NdbEventImpl& evnt, void NdbEventBuffer::dropEventOperation(NdbEventOperation* tOp) { + DBUG_ENTER("NdbEventBuffer::dropEventOperation"); NdbEventOperationImpl* op= getEventOperationImpl(tOp); op->stop(); - - op->m_next= m_dropped_ev_op; - op->m_prev= 0; - if (m_dropped_ev_op) - m_dropped_ev_op->m_prev= op; - m_dropped_ev_op= op; - // stop blob event ops if (op->theMainOp == NULL) { @@ -2391,11 +2499,25 @@ NdbEventBuffer::dropEventOperation(NdbEventOperation* tOp) } } - // ToDo, take care of these to be deleted at the - // appropriate time, after we are sure that there - // are _no_ more events coming - - // delete tOp; + DBUG_ASSERT(op->m_ref_count > 0); + op->m_ref_count--; + DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op)); + if (op->m_ref_count == 0) + { + DBUG_PRINT("info", ("deleting op: %p", op)); + DBUG_ASSERT(op->m_node_bit_mask.isclear()); + ndbout_c("deleting NdbEventOperation %p", op->m_facade); + delete op->m_facade; + } + else + { + op->m_next= m_dropped_ev_op; + op->m_prev= 0; + if (m_dropped_ev_op) + m_dropped_ev_op->m_prev= op; + m_dropped_ev_op= op; + } + DBUG_VOID_RETURN; } void diff --git a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp index 70b3ce6b8de..9ba14d9d259 100644 --- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp +++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp @@ -25,7 +25,7 @@ #include #define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4 -//#define EVENT_DEBUG +#define EVENT_DEBUG #ifdef EVENT_DEBUG #define DBUG_ENTER_EVENT(A) DBUG_ENTER(A) #define DBUG_RETURN_EVENT(A) DBUG_RETURN(A) @@ -367,6 +367,8 @@ public: Uint32 m_eventId; Uint32 m_oid; + Bitmask<(unsigned int)_NDB_NODE_BITMASK_SIZE> m_node_bit_mask; + int m_ref_count; bool m_mergeEvents; EventBufData *m_data_item; @@ -406,10 +408,10 @@ public: void dropEventOperation(NdbEventOperation *); static NdbEventOperationImpl* getEventOperationImpl(NdbEventOperation* tOp); - void add_drop_lock(); - void add_drop_unlock(); - void lock(); - void unlock(); + void add_drop_lock() { NdbMutex_Lock(p_add_drop_mutex); } + void add_drop_unlock() { NdbMutex_Unlock(p_add_drop_mutex); } + void lock() { NdbMutex_Lock(m_mutex); } + void unlock() { NdbMutex_Unlock(m_mutex); } void add_op(); void remove_op(); @@ -430,9 +432,11 @@ public: Uint32 getEventId(int bufferId); int pollEvents(int aMillisecondNumber, Uint64 *latestGCI= 0); + int flushIncompleteEvents(Uint64 gci); NdbEventOperation *nextEvent(); NdbEventOperationImpl* getGCIEventOperations(Uint32* iter, Uint32* event_types); + void NdbEventBuffer::deleteUsedEventOperations(); NdbEventOperationImpl *move_data();