mirror of
https://github.com/MariaDB/server.git
synced 2025-08-07 00:04:31 +03:00
Bug #19395 mysqld does not always detect cluster shutdown
Bug #17610 ndbapi: dropEventOperation leaves object behind, memleak mysql-test/r/ndb_autodiscover3.result: Bug #19395 mysqld does not always detect cluster shutdown mysql-test/t/disabled.def: Bug #19395 mysqld does not always detect cluster shutdown mysql-test/t/ndb_autodiscover3.test: Bug #19395 mysqld does not always detect cluster shutdown sql/ha_ndbcluster_binlog.cc: Bug #19395 mysqld does not always detect cluster shutdown storage/ndb/include/ndbapi/Ndb.hpp: Bug #19395 mysqld does not always detect cluster shutdown storage/ndb/include/ndbapi/NdbDictionary.hpp: Bug #17610 ndbapi: dropEventOperation leaves object behind, memleak storage/ndb/src/kernel/blocks/suma/Suma.cpp: Bug #17610 ndbapi: dropEventOperation leaves object behind, memleak storage/ndb/src/ndbapi/Ndb.cpp: Bug #19395 mysqld does not always detect cluster shutdown storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp: Bug #17610 ndbapi: dropEventOperation leaves object behind, memleak storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp: Bug #17610 ndbapi: dropEventOperation leaves object behind, memleak
This commit is contained in:
@@ -18,6 +18,7 @@ select * from t2;
|
|||||||
ERROR 42S02: Table 'test.t2' doesn't exist
|
ERROR 42S02: Table 'test.t2' doesn't exist
|
||||||
show tables like 't2';
|
show tables like 't2';
|
||||||
Tables_in_test (t2)
|
Tables_in_test (t2)
|
||||||
|
reset master;
|
||||||
create table t2 (a int key) engine=ndbcluster;
|
create table t2 (a int key) engine=ndbcluster;
|
||||||
insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
|
insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
|
||||||
select * from t2 order by a limit 3;
|
select * from t2 order by a limit 3;
|
||||||
@@ -30,10 +31,12 @@ a
|
|||||||
1
|
1
|
||||||
2
|
2
|
||||||
3
|
3
|
||||||
|
reset master;
|
||||||
select * from t2;
|
select * from t2;
|
||||||
ERROR 42S02: Table 'test.t2' doesn't exist
|
ERROR 42S02: Table 'test.t2' doesn't exist
|
||||||
show tables like 't2';
|
show tables like 't2';
|
||||||
Tables_in_test (t2)
|
Tables_in_test (t2)
|
||||||
|
reset master;
|
||||||
create table t2 (a int key) engine=ndbcluster;
|
create table t2 (a int key) engine=ndbcluster;
|
||||||
insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
|
insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
|
||||||
select * from t2 order by a limit 3;
|
select * from t2 order by a limit 3;
|
||||||
@@ -46,4 +49,5 @@ a
|
|||||||
1
|
1
|
||||||
2
|
2
|
||||||
3
|
3
|
||||||
|
reset master;
|
||||||
drop table t2;
|
drop table t2;
|
||||||
|
@@ -16,7 +16,7 @@ events_scheduling : BUG#19170 2006-04-26 andrey Test case of 19170 fails
|
|||||||
events_logs_tests : BUG#17619 2006-05-16 andrey Test case problems
|
events_logs_tests : BUG#17619 2006-05-16 andrey Test case problems
|
||||||
ndb_autodiscover : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t binlog
|
ndb_autodiscover : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t binlog
|
||||||
ndb_autodiscover2 : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t binlog
|
ndb_autodiscover2 : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t binlog
|
||||||
ndb_binlog_discover : BUG#19395 2006-04-28 tomas/knielsen mysqld does not always detect cluster shutdown
|
#ndb_binlog_discover : BUG#19395 2006-04-28 tomas/knielsen mysqld does not always detect cluster shutdown
|
||||||
#ndb_cache2 : BUG#18597 2006-03-28 brian simultaneous drop table and ndb statistics update triggers node failure
|
#ndb_cache2 : BUG#18597 2006-03-28 brian simultaneous drop table and ndb statistics update triggers node failure
|
||||||
#ndb_cache_multi2 : BUG#18597 2006-04-10 kent simultaneous drop table and ndb statistics update triggers node failure
|
#ndb_cache_multi2 : BUG#18597 2006-04-10 kent simultaneous drop table and ndb statistics update triggers node failure
|
||||||
ndb_load : BUG#17233 2006-05-04 tomas failed load data from infile causes mysqld dbug_assert, binlog not flushed
|
ndb_load : BUG#17233 2006-05-04 tomas failed load data from infile causes mysqld dbug_assert, binlog not flushed
|
||||||
|
@@ -43,6 +43,7 @@ select * from t2 order by a limit 3;
|
|||||||
--error ER_NO_SUCH_TABLE
|
--error ER_NO_SUCH_TABLE
|
||||||
select * from t2;
|
select * from t2;
|
||||||
show tables like 't2';
|
show tables like 't2';
|
||||||
|
reset master;
|
||||||
create table t2 (a int key) engine=ndbcluster;
|
create table t2 (a int key) engine=ndbcluster;
|
||||||
insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
|
insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
|
||||||
select * from t2 order by a limit 3;
|
select * from t2 order by a limit 3;
|
||||||
@@ -50,6 +51,7 @@ select * from t2 order by a limit 3;
|
|||||||
# server 1 should have a stale cache, and in this case wrong frm, transaction must be retried
|
# server 1 should have a stale cache, and in this case wrong frm, transaction must be retried
|
||||||
--connection server1
|
--connection server1
|
||||||
select * from t2 order by a limit 3;
|
select * from t2 order by a limit 3;
|
||||||
|
reset master;
|
||||||
|
|
||||||
--exec $NDB_MGM --no-defaults -e "all restart -i" >> $NDB_TOOLS_OUTPUT
|
--exec $NDB_MGM --no-defaults -e "all restart -i" >> $NDB_TOOLS_OUTPUT
|
||||||
--exec $NDB_TOOLS_DIR/ndb_waiter --no-defaults >> $NDB_TOOLS_OUTPUT
|
--exec $NDB_TOOLS_DIR/ndb_waiter --no-defaults >> $NDB_TOOLS_OUTPUT
|
||||||
@@ -60,6 +62,7 @@ select * from t2 order by a limit 3;
|
|||||||
--error ER_NO_SUCH_TABLE
|
--error ER_NO_SUCH_TABLE
|
||||||
select * from t2;
|
select * from t2;
|
||||||
show tables like 't2';
|
show tables like 't2';
|
||||||
|
reset master;
|
||||||
create table t2 (a int key) engine=ndbcluster;
|
create table t2 (a int key) engine=ndbcluster;
|
||||||
insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
|
insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
|
||||||
select * from t2 order by a limit 3;
|
select * from t2 order by a limit 3;
|
||||||
@@ -67,6 +70,7 @@ select * from t2 order by a limit 3;
|
|||||||
# server 2 should have a stale cache, but with right frm, transaction need not be retried
|
# server 2 should have a stale cache, but with right frm, transaction need not be retried
|
||||||
--connection server2
|
--connection server2
|
||||||
select * from t2 order by a limit 3;
|
select * from t2 order by a limit 3;
|
||||||
|
reset master;
|
||||||
|
|
||||||
drop table t2;
|
drop table t2;
|
||||||
# End of 4.1 tests
|
# End of 4.1 tests
|
||||||
|
@@ -3321,6 +3321,9 @@ restart:
|
|||||||
DBUG_PRINT("info", ("schema_res: %d schema_gci: %d", schema_res, schema_gci));
|
DBUG_PRINT("info", ("schema_res: %d schema_gci: %d", schema_res, schema_gci));
|
||||||
if (schema_res > 0)
|
if (schema_res > 0)
|
||||||
{
|
{
|
||||||
|
i_ndb->pollEvents(0);
|
||||||
|
i_ndb->flushIncompleteEvents(schema_gci);
|
||||||
|
s_ndb->flushIncompleteEvents(schema_gci);
|
||||||
if (schema_gci < ndb_latest_handled_binlog_epoch)
|
if (schema_gci < ndb_latest_handled_binlog_epoch)
|
||||||
{
|
{
|
||||||
sql_print_error("NDB Binlog: cluster has been restarted --initial or with older filesystem. "
|
sql_print_error("NDB Binlog: cluster has been restarted --initial or with older filesystem. "
|
||||||
|
@@ -1262,6 +1262,7 @@ public:
|
|||||||
|
|
||||||
|
|
||||||
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
|
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
|
||||||
|
int flushIncompleteEvents(Uint64 gci);
|
||||||
NdbEventOperation *getEventOperation(NdbEventOperation* eventOp= 0);
|
NdbEventOperation *getEventOperation(NdbEventOperation* eventOp= 0);
|
||||||
Uint64 getLatestGCI();
|
Uint64 getLatestGCI();
|
||||||
void forceGCP();
|
void forceGCP();
|
||||||
|
@@ -1132,7 +1132,8 @@ public:
|
|||||||
_TE_NODE_FAILURE=10,
|
_TE_NODE_FAILURE=10,
|
||||||
_TE_SUBSCRIBE=11,
|
_TE_SUBSCRIBE=11,
|
||||||
_TE_UNSUBSCRIBE=12,
|
_TE_UNSUBSCRIBE=12,
|
||||||
_TE_NUL=13 // internal (e.g. INS o DEL within same GCI)
|
_TE_NUL=13, // internal (e.g. INS o DEL within same GCI)
|
||||||
|
_TE_ACTIVE=14 // internal (node becomes active)
|
||||||
};
|
};
|
||||||
#endif
|
#endif
|
||||||
/**
|
/**
|
||||||
|
@@ -2649,6 +2649,22 @@ Suma::reportAllSubscribers(Signal *signal,
|
|||||||
SubscriptionPtr subPtr,
|
SubscriptionPtr subPtr,
|
||||||
SubscriberPtr subbPtr)
|
SubscriberPtr subbPtr)
|
||||||
{
|
{
|
||||||
|
SubTableData * data = (SubTableData*)signal->getDataPtrSend();
|
||||||
|
|
||||||
|
if (table_event == NdbDictionary::Event::_TE_SUBSCRIBE)
|
||||||
|
{
|
||||||
|
data->gci = m_last_complete_gci + 1;
|
||||||
|
data->tableId = subPtr.p->m_tableId;
|
||||||
|
data->operation = NdbDictionary::Event::_TE_ACTIVE;
|
||||||
|
data->ndbd_nodeid = refToNode(reference());
|
||||||
|
data->changeMask = 0;
|
||||||
|
data->totalLen = 0;
|
||||||
|
data->req_nodeid = refToNode(subbPtr.p->m_senderRef);
|
||||||
|
data->senderData = subbPtr.p->m_senderData;
|
||||||
|
sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
|
||||||
|
SubTableData::SignalLength, JBB);
|
||||||
|
}
|
||||||
|
|
||||||
if (!(subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE))
|
if (!(subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE))
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
@@ -2663,7 +2679,6 @@ Suma::reportAllSubscribers(Signal *signal,
|
|||||||
ndbout_c("reportAllSubscribers subPtr.i: %d subPtr.p->n_subscribers: %d",
|
ndbout_c("reportAllSubscribers subPtr.i: %d subPtr.p->n_subscribers: %d",
|
||||||
subPtr.i, subPtr.p->n_subscribers);
|
subPtr.i, subPtr.p->n_subscribers);
|
||||||
//#endif
|
//#endif
|
||||||
SubTableData * data = (SubTableData*)signal->getDataPtrSend();
|
|
||||||
data->gci = m_last_complete_gci + 1;
|
data->gci = m_last_complete_gci + 1;
|
||||||
data->tableId = subPtr.p->m_tableId;
|
data->tableId = subPtr.p->m_tableId;
|
||||||
data->operation = table_event;
|
data->operation = table_event;
|
||||||
|
@@ -1324,6 +1324,12 @@ Ndb::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
|
|||||||
return theEventBuffer->pollEvents(aMillisecondNumber, latestGCI);
|
return theEventBuffer->pollEvents(aMillisecondNumber, latestGCI);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int
|
||||||
|
Ndb::flushIncompleteEvents(Uint64 gci)
|
||||||
|
{
|
||||||
|
return theEventBuffer->flushIncompleteEvents(gci);
|
||||||
|
}
|
||||||
|
|
||||||
NdbEventOperation *Ndb::nextEvent()
|
NdbEventOperation *Ndb::nextEvent()
|
||||||
{
|
{
|
||||||
return theEventBuffer->nextEvent();
|
return theEventBuffer->nextEvent();
|
||||||
|
@@ -153,11 +153,14 @@ NdbEventOperationImpl::init(NdbEventImpl& evnt)
|
|||||||
|
|
||||||
m_state= EO_CREATED;
|
m_state= EO_CREATED;
|
||||||
|
|
||||||
|
m_node_bit_mask.clear();
|
||||||
#ifdef ndb_event_stores_merge_events_flag
|
#ifdef ndb_event_stores_merge_events_flag
|
||||||
m_mergeEvents = m_eventImpl->m_mergeEvents;
|
m_mergeEvents = m_eventImpl->m_mergeEvents;
|
||||||
#else
|
#else
|
||||||
m_mergeEvents = false;
|
m_mergeEvents = false;
|
||||||
#endif
|
#endif
|
||||||
|
m_ref_count = 0;
|
||||||
|
DBUG_PRINT("info", ("m_ref_count = 0 for op: %p", this));
|
||||||
|
|
||||||
m_has_error= 0;
|
m_has_error= 0;
|
||||||
|
|
||||||
@@ -530,7 +533,11 @@ NdbEventOperationImpl::execute_nolock()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (r == 0)
|
if (r == 0)
|
||||||
|
{
|
||||||
|
m_ref_count++;
|
||||||
|
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
|
||||||
DBUG_RETURN(0);
|
DBUG_RETURN(0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
//Error
|
//Error
|
||||||
m_state= EO_ERROR;
|
m_state= EO_ERROR;
|
||||||
@@ -657,80 +664,79 @@ NdbEventOperationImpl::execSUB_TABLE_DATA(NdbApiSignal * signal,
|
|||||||
int
|
int
|
||||||
NdbEventOperationImpl::receive_event()
|
NdbEventOperationImpl::receive_event()
|
||||||
{
|
{
|
||||||
DBUG_ENTER_EVENT("NdbEventOperationImpl::receive_event");
|
|
||||||
|
|
||||||
Uint32 operation= (Uint32)m_data_item->sdata->operation;
|
Uint32 operation= (Uint32)m_data_item->sdata->operation;
|
||||||
DBUG_PRINT_EVENT("info",("sdata->operation %u",operation));
|
|
||||||
|
|
||||||
if (operation == NdbDictionary::Event::_TE_ALTER)
|
|
||||||
{
|
|
||||||
// Parse the new table definition and
|
|
||||||
// create a table object
|
|
||||||
NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
|
|
||||||
NdbDictionaryImpl *dict = & NdbDictionaryImpl::getImpl(*myDict);
|
|
||||||
NdbError error;
|
|
||||||
NdbDictInterface dif(error);
|
|
||||||
NdbTableImpl *at;
|
|
||||||
m_change_mask = m_data_item->sdata->changeMask;
|
|
||||||
error.code = dif.parseTableInfo(&at,
|
|
||||||
(Uint32*)m_buffer.get_data(),
|
|
||||||
m_buffer.length() / 4,
|
|
||||||
true);
|
|
||||||
m_buffer.clear();
|
|
||||||
if (at)
|
|
||||||
at->buildColumnHash();
|
|
||||||
else
|
|
||||||
{
|
|
||||||
DBUG_PRINT_EVENT("info", ("Failed to parse DictTabInfo error %u",
|
|
||||||
error.code));
|
|
||||||
DBUG_RETURN_EVENT(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
NdbTableImpl *tmp_table_impl= m_eventImpl->m_tableImpl;
|
|
||||||
m_eventImpl->m_tableImpl = at;
|
|
||||||
|
|
||||||
DBUG_PRINT("info", ("switching table impl 0x%x -> 0x%x",
|
|
||||||
tmp_table_impl, at));
|
|
||||||
|
|
||||||
// change the rec attrs to refer to the new table object
|
|
||||||
int i;
|
|
||||||
for (i = 0; i < 2; i++)
|
|
||||||
{
|
|
||||||
NdbRecAttr *p = theFirstPkAttrs[i];
|
|
||||||
while (p)
|
|
||||||
{
|
|
||||||
int no = p->getColumn()->getColumnNo();
|
|
||||||
NdbColumnImpl *tAttrInfo = at->getColumn(no);
|
|
||||||
DBUG_PRINT("info", ("rec_attr: 0x%x "
|
|
||||||
"switching column impl 0x%x -> 0x%x",
|
|
||||||
p, p->m_column, tAttrInfo));
|
|
||||||
p->m_column = tAttrInfo;
|
|
||||||
p = p->next();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for (i = 0; i < 2; i++)
|
|
||||||
{
|
|
||||||
NdbRecAttr *p = theFirstDataAttrs[i];
|
|
||||||
while (p)
|
|
||||||
{
|
|
||||||
int no = p->getColumn()->getColumnNo();
|
|
||||||
NdbColumnImpl *tAttrInfo = at->getColumn(no);
|
|
||||||
DBUG_PRINT("info", ("rec_attr: 0x%x "
|
|
||||||
"switching column impl 0x%x -> 0x%x",
|
|
||||||
p, p->m_column, tAttrInfo));
|
|
||||||
p->m_column = tAttrInfo;
|
|
||||||
p = p->next();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (tmp_table_impl)
|
|
||||||
delete tmp_table_impl;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))
|
if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))
|
||||||
{
|
{
|
||||||
DBUG_RETURN_EVENT(1);
|
DBUG_ENTER("NdbEventOperationImpl::receive_event");
|
||||||
|
DBUG_PRINT("info",("sdata->operation %u this: %p", operation, this));
|
||||||
|
if (operation == NdbDictionary::Event::_TE_ALTER)
|
||||||
|
{
|
||||||
|
// Parse the new table definition and
|
||||||
|
// create a table object
|
||||||
|
NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
|
||||||
|
NdbDictionaryImpl *dict = & NdbDictionaryImpl::getImpl(*myDict);
|
||||||
|
NdbError error;
|
||||||
|
NdbDictInterface dif(error);
|
||||||
|
NdbTableImpl *at;
|
||||||
|
m_change_mask = m_data_item->sdata->changeMask;
|
||||||
|
error.code = dif.parseTableInfo(&at,
|
||||||
|
(Uint32*)m_buffer.get_data(),
|
||||||
|
m_buffer.length() / 4,
|
||||||
|
true);
|
||||||
|
m_buffer.clear();
|
||||||
|
if (unlikely(!at))
|
||||||
|
{
|
||||||
|
DBUG_PRINT("info", ("Failed to parse DictTabInfo error %u",
|
||||||
|
error.code));
|
||||||
|
ndbout_c("Failed to parse DictTabInfo error %u", error.code);
|
||||||
|
DBUG_RETURN(1);
|
||||||
|
}
|
||||||
|
at->buildColumnHash();
|
||||||
|
|
||||||
|
NdbTableImpl *tmp_table_impl= m_eventImpl->m_tableImpl;
|
||||||
|
m_eventImpl->m_tableImpl = at;
|
||||||
|
|
||||||
|
DBUG_PRINT("info", ("switching table impl 0x%x -> 0x%x",
|
||||||
|
tmp_table_impl, at));
|
||||||
|
|
||||||
|
// change the rec attrs to refer to the new table object
|
||||||
|
int i;
|
||||||
|
for (i = 0; i < 2; i++)
|
||||||
|
{
|
||||||
|
NdbRecAttr *p = theFirstPkAttrs[i];
|
||||||
|
while (p)
|
||||||
|
{
|
||||||
|
int no = p->getColumn()->getColumnNo();
|
||||||
|
NdbColumnImpl *tAttrInfo = at->getColumn(no);
|
||||||
|
DBUG_PRINT("info", ("rec_attr: 0x%x "
|
||||||
|
"switching column impl 0x%x -> 0x%x",
|
||||||
|
p, p->m_column, tAttrInfo));
|
||||||
|
p->m_column = tAttrInfo;
|
||||||
|
p = p->next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (i = 0; i < 2; i++)
|
||||||
|
{
|
||||||
|
NdbRecAttr *p = theFirstDataAttrs[i];
|
||||||
|
while (p)
|
||||||
|
{
|
||||||
|
int no = p->getColumn()->getColumnNo();
|
||||||
|
NdbColumnImpl *tAttrInfo = at->getColumn(no);
|
||||||
|
DBUG_PRINT("info", ("rec_attr: 0x%x "
|
||||||
|
"switching column impl 0x%x -> 0x%x",
|
||||||
|
p, p->m_column, tAttrInfo));
|
||||||
|
p->m_column = tAttrInfo;
|
||||||
|
p = p->next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (tmp_table_impl)
|
||||||
|
delete tmp_table_impl;
|
||||||
|
}
|
||||||
|
DBUG_RETURN(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DBUG_ENTER_EVENT("NdbEventOperationImpl::receive_event");
|
||||||
|
DBUG_PRINT_EVENT("info",("sdata->operation %u this: %p", operation, this));
|
||||||
// now move the data into the RecAttrs
|
// now move the data into the RecAttrs
|
||||||
|
|
||||||
int is_update= operation == NdbDictionary::Event::_TE_UPDATE;
|
int is_update= operation == NdbDictionary::Event::_TE_UPDATE;
|
||||||
@@ -1089,6 +1095,32 @@ NdbEventBuffer::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int
|
||||||
|
NdbEventBuffer::flushIncompleteEvents(Uint64 gci)
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Find min complete gci
|
||||||
|
*/
|
||||||
|
Uint32 i;
|
||||||
|
Uint32 sz= m_active_gci.size();
|
||||||
|
Gci_container* array = (Gci_container*)m_active_gci.getBase();
|
||||||
|
for(i = 0; i < sz; i++)
|
||||||
|
{
|
||||||
|
Gci_container* tmp = array + i;
|
||||||
|
if (tmp->m_gci < gci)
|
||||||
|
{
|
||||||
|
// we have found an old not-completed gci, remove it
|
||||||
|
if(!tmp->m_data.is_empty())
|
||||||
|
{
|
||||||
|
free_list(tmp->m_data);
|
||||||
|
}
|
||||||
|
tmp->~Gci_container();
|
||||||
|
bzero(tmp, sizeof(Gci_container));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
NdbEventOperation *
|
NdbEventOperation *
|
||||||
NdbEventBuffer::nextEvent()
|
NdbEventBuffer::nextEvent()
|
||||||
{
|
{
|
||||||
@@ -1157,7 +1189,10 @@ NdbEventBuffer::nextEvent()
|
|||||||
}
|
}
|
||||||
EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
|
EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
|
||||||
while (gci_ops && op->getGCI() > gci_ops->m_gci)
|
while (gci_ops && op->getGCI() > gci_ops->m_gci)
|
||||||
|
{
|
||||||
|
deleteUsedEventOperations();
|
||||||
gci_ops = m_available_data.next_gci_ops();
|
gci_ops = m_available_data.next_gci_ops();
|
||||||
|
}
|
||||||
assert(gci_ops && (op->getGCI() == gci_ops->m_gci));
|
assert(gci_ops && (op->getGCI() == gci_ops->m_gci));
|
||||||
DBUG_RETURN_EVENT(op->m_facade);
|
DBUG_RETURN_EVENT(op->m_facade);
|
||||||
}
|
}
|
||||||
@@ -1177,7 +1212,10 @@ NdbEventBuffer::nextEvent()
|
|||||||
// free all "per gci unique" collected operations
|
// free all "per gci unique" collected operations
|
||||||
EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
|
EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
|
||||||
while (gci_ops)
|
while (gci_ops)
|
||||||
|
{
|
||||||
|
deleteUsedEventOperations();
|
||||||
gci_ops = m_available_data.next_gci_ops();
|
gci_ops = m_available_data.next_gci_ops();
|
||||||
|
}
|
||||||
DBUG_RETURN_EVENT(0);
|
DBUG_RETURN_EVENT(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1191,31 +1229,38 @@ NdbEventBuffer::getGCIEventOperations(Uint32* iter, Uint32* event_types)
|
|||||||
EventBufData_list::Gci_op g = gci_ops->m_gci_op_list[(*iter)++];
|
EventBufData_list::Gci_op g = gci_ops->m_gci_op_list[(*iter)++];
|
||||||
if (event_types != NULL)
|
if (event_types != NULL)
|
||||||
*event_types = g.event_types;
|
*event_types = g.event_types;
|
||||||
DBUG_PRINT("info", ("gci: %d", (unsigned)gci_ops->m_gci));
|
DBUG_PRINT("info", ("gci: %d g.op: %x g.event_types: %x",
|
||||||
|
(unsigned)gci_ops->m_gci, g.op, g.event_types));
|
||||||
DBUG_RETURN(g.op);
|
DBUG_RETURN(g.op);
|
||||||
}
|
}
|
||||||
DBUG_RETURN(NULL);
|
DBUG_RETURN(NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
NdbEventBuffer::lock()
|
NdbEventBuffer::deleteUsedEventOperations()
|
||||||
{
|
{
|
||||||
NdbMutex_Lock(m_mutex);
|
Uint32 iter= 0;
|
||||||
}
|
const NdbEventOperation *op_f;
|
||||||
void
|
while ((op_f= getGCIEventOperations(&iter, NULL)) != NULL)
|
||||||
NdbEventBuffer::unlock()
|
{
|
||||||
{
|
NdbEventOperationImpl *op = &op_f->m_impl;
|
||||||
NdbMutex_Unlock(m_mutex);
|
DBUG_ASSERT(op->m_ref_count > 0);
|
||||||
}
|
op->m_ref_count--;
|
||||||
void
|
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
|
||||||
NdbEventBuffer::add_drop_lock()
|
if (op->m_ref_count == 0)
|
||||||
{
|
{
|
||||||
NdbMutex_Lock(p_add_drop_mutex);
|
DBUG_PRINT("info", ("deleting op: %p", op));
|
||||||
}
|
DBUG_ASSERT(op->m_node_bit_mask.isclear());
|
||||||
void
|
if (op->m_next)
|
||||||
NdbEventBuffer::add_drop_unlock()
|
op->m_next->m_prev = op->m_prev;
|
||||||
{
|
if (op->m_prev)
|
||||||
NdbMutex_Unlock(p_add_drop_mutex);
|
op->m_prev->m_next = op->m_next;
|
||||||
|
else
|
||||||
|
m_dropped_ev_op = op->m_next;
|
||||||
|
ndbout_c("deleting NdbEventOperation %p", op->m_facade);
|
||||||
|
delete op->m_facade;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static
|
static
|
||||||
@@ -1469,6 +1514,10 @@ NdbEventBuffer::complete_outof_order_gcis()
|
|||||||
void
|
void
|
||||||
NdbEventBuffer::report_node_failure(Uint32 node_id)
|
NdbEventBuffer::report_node_failure(Uint32 node_id)
|
||||||
{
|
{
|
||||||
|
NdbEventOperation* op= m_ndb->getEventOperation(0);
|
||||||
|
if (op == 0)
|
||||||
|
return;
|
||||||
|
|
||||||
DBUG_ENTER("NdbEventBuffer::report_node_failure");
|
DBUG_ENTER("NdbEventBuffer::report_node_failure");
|
||||||
SubTableData data;
|
SubTableData data;
|
||||||
LinearSectionPtr ptr[3];
|
LinearSectionPtr ptr[3];
|
||||||
@@ -1484,12 +1533,20 @@ NdbEventBuffer::report_node_failure(Uint32 node_id)
|
|||||||
/**
|
/**
|
||||||
* Insert this event for each operation
|
* Insert this event for each operation
|
||||||
*/
|
*/
|
||||||
NdbEventOperation* op= 0;
|
|
||||||
while((op = m_ndb->getEventOperation(op)))
|
|
||||||
{
|
{
|
||||||
NdbEventOperationImpl* impl= &op->m_impl;
|
// no need to lock()/unlock(), receive thread calls this
|
||||||
data.senderData = impl->m_oid;
|
NdbEventOperationImpl* impl = &op->m_impl;
|
||||||
insertDataL(impl, &data, ptr);
|
do if (!impl->m_node_bit_mask.isclear())
|
||||||
|
{
|
||||||
|
data.senderData = impl->m_oid;
|
||||||
|
insertDataL(impl, &data, ptr);
|
||||||
|
} while((impl = impl->m_next));
|
||||||
|
for (impl = m_dropped_ev_op; impl; impl = impl->m_next)
|
||||||
|
if (!impl->m_node_bit_mask.isclear())
|
||||||
|
{
|
||||||
|
data.senderData = impl->m_oid;
|
||||||
|
insertDataL(impl, &data, ptr);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
DBUG_VOID_RETURN;
|
DBUG_VOID_RETURN;
|
||||||
}
|
}
|
||||||
@@ -1515,12 +1572,21 @@ NdbEventBuffer::completeClusterFailed()
|
|||||||
/**
|
/**
|
||||||
* Insert this event for each operation
|
* Insert this event for each operation
|
||||||
*/
|
*/
|
||||||
do
|
|
||||||
{
|
{
|
||||||
NdbEventOperationImpl* impl= &op->m_impl;
|
// no need to lock()/unlock(), receive thread calls this
|
||||||
data.senderData = impl->m_oid;
|
NdbEventOperationImpl* impl = &op->m_impl;
|
||||||
insertDataL(impl, &data, ptr);
|
do if (!impl->m_node_bit_mask.isclear())
|
||||||
} while((op = m_ndb->getEventOperation(op)));
|
{
|
||||||
|
data.senderData = impl->m_oid;
|
||||||
|
insertDataL(impl, &data, ptr);
|
||||||
|
} while((impl = impl->m_next));
|
||||||
|
for (impl = m_dropped_ev_op; impl; impl = impl->m_next)
|
||||||
|
if (!impl->m_node_bit_mask.isclear())
|
||||||
|
{
|
||||||
|
data.senderData = impl->m_oid;
|
||||||
|
insertDataL(impl, &data, ptr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Release all GCI's with m_gci > gci
|
* Release all GCI's with m_gci > gci
|
||||||
@@ -1565,7 +1631,11 @@ NdbEventBuffer::completeClusterFailed()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(bucket != 0);
|
if (bucket == 0)
|
||||||
|
{
|
||||||
|
// no bucket to complete
|
||||||
|
DBUG_VOID_RETURN;
|
||||||
|
}
|
||||||
|
|
||||||
const Uint32 cnt= bucket->m_gcp_complete_rep_count = 1;
|
const Uint32 cnt= bucket->m_gcp_complete_rep_count = 1;
|
||||||
bucket->m_gci = gci;
|
bucket->m_gci = gci;
|
||||||
@@ -1595,6 +1665,40 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
|
|||||||
{
|
{
|
||||||
DBUG_ENTER_EVENT("NdbEventBuffer::insertDataL");
|
DBUG_ENTER_EVENT("NdbEventBuffer::insertDataL");
|
||||||
Uint64 gci= sdata->gci;
|
Uint64 gci= sdata->gci;
|
||||||
|
const bool is_data_event =
|
||||||
|
sdata->operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;
|
||||||
|
|
||||||
|
if (!is_data_event)
|
||||||
|
{
|
||||||
|
switch (sdata->operation)
|
||||||
|
{
|
||||||
|
case NdbDictionary::Event::_TE_NODE_FAILURE:
|
||||||
|
op->m_node_bit_mask.clear(sdata->ndbd_nodeid);
|
||||||
|
break;
|
||||||
|
case NdbDictionary::Event::_TE_ACTIVE:
|
||||||
|
op->m_node_bit_mask.set(sdata->ndbd_nodeid);
|
||||||
|
// internal event, do not relay to user
|
||||||
|
DBUG_RETURN_EVENT(0);
|
||||||
|
break;
|
||||||
|
case NdbDictionary::Event::_TE_CLUSTER_FAILURE:
|
||||||
|
op->m_node_bit_mask.clear();
|
||||||
|
DBUG_ASSERT(op->m_ref_count > 0);
|
||||||
|
op->m_ref_count--;
|
||||||
|
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
|
||||||
|
break;
|
||||||
|
case NdbDictionary::Event::_TE_STOP:
|
||||||
|
op->m_node_bit_mask.clear(sdata->ndbd_nodeid);
|
||||||
|
if (op->m_node_bit_mask.isclear())
|
||||||
|
{
|
||||||
|
DBUG_ASSERT(op->m_ref_count > 0);
|
||||||
|
op->m_ref_count--;
|
||||||
|
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if ( likely((Uint32)op->mi_type & (1 << (Uint32)sdata->operation)) )
|
if ( likely((Uint32)op->mi_type & (1 << (Uint32)sdata->operation)) )
|
||||||
{
|
{
|
||||||
@@ -1615,8 +1719,6 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
|
|||||||
}
|
}
|
||||||
|
|
||||||
const bool is_blob_event = (op->theMainOp != NULL);
|
const bool is_blob_event = (op->theMainOp != NULL);
|
||||||
const bool is_data_event =
|
|
||||||
sdata->operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;
|
|
||||||
const bool use_hash = op->m_mergeEvents && is_data_event;
|
const bool use_hash = op->m_mergeEvents && is_data_event;
|
||||||
|
|
||||||
if (! is_data_event && is_blob_event)
|
if (! is_data_event && is_blob_event)
|
||||||
@@ -2244,6 +2346,8 @@ void EventBufData_list::append_list(EventBufData_list *list, Uint64 gci)
|
|||||||
void
|
void
|
||||||
EventBufData_list::add_gci_op(Gci_op g, bool del)
|
EventBufData_list::add_gci_op(Gci_op g, bool del)
|
||||||
{
|
{
|
||||||
|
DBUG_ENTER_EVENT("EventBufData_list::add_gci_op");
|
||||||
|
DBUG_PRINT_EVENT("info", ("p.op: %p g.event_types: %x", g.op, g.event_types));
|
||||||
assert(g.op != NULL);
|
assert(g.op != NULL);
|
||||||
Uint32 i;
|
Uint32 i;
|
||||||
for (i = 0; i < m_gci_op_count; i++) {
|
for (i = 0; i < m_gci_op_count; i++) {
|
||||||
@@ -2273,8 +2377,15 @@ EventBufData_list::add_gci_op(Gci_op g, bool del)
|
|||||||
}
|
}
|
||||||
assert(m_gci_op_count < m_gci_op_alloc);
|
assert(m_gci_op_count < m_gci_op_alloc);
|
||||||
assert(! del);
|
assert(! del);
|
||||||
|
#ifndef DBUG_OFF
|
||||||
|
i = m_gci_op_count;
|
||||||
|
#endif
|
||||||
|
g.op->m_ref_count++;
|
||||||
|
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", g.op->m_ref_count, g.op));
|
||||||
m_gci_op_list[m_gci_op_count++] = g;
|
m_gci_op_list[m_gci_op_count++] = g;
|
||||||
}
|
}
|
||||||
|
DBUG_PRINT_EVENT("exit", ("m_gci_op_list[%u].event_types: %x", i, m_gci_op_list[i].event_types));
|
||||||
|
DBUG_VOID_RETURN_EVENT;
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
@@ -2337,6 +2448,9 @@ NdbEventBuffer::createEventOperation(const char* eventName,
|
|||||||
delete tOp;
|
delete tOp;
|
||||||
DBUG_RETURN(NULL);
|
DBUG_RETURN(NULL);
|
||||||
}
|
}
|
||||||
|
getEventOperationImpl(tOp)->m_ref_count = 1;
|
||||||
|
DBUG_PRINT("info", ("m_ref_count: %u for op: %p",
|
||||||
|
getEventOperationImpl(tOp)->m_ref_count, getEventOperationImpl(tOp)));
|
||||||
DBUG_RETURN(tOp);
|
DBUG_RETURN(tOp);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2362,16 +2476,10 @@ NdbEventBuffer::createEventOperation(NdbEventImpl& evnt,
|
|||||||
void
|
void
|
||||||
NdbEventBuffer::dropEventOperation(NdbEventOperation* tOp)
|
NdbEventBuffer::dropEventOperation(NdbEventOperation* tOp)
|
||||||
{
|
{
|
||||||
|
DBUG_ENTER("NdbEventBuffer::dropEventOperation");
|
||||||
NdbEventOperationImpl* op= getEventOperationImpl(tOp);
|
NdbEventOperationImpl* op= getEventOperationImpl(tOp);
|
||||||
|
|
||||||
op->stop();
|
op->stop();
|
||||||
|
|
||||||
op->m_next= m_dropped_ev_op;
|
|
||||||
op->m_prev= 0;
|
|
||||||
if (m_dropped_ev_op)
|
|
||||||
m_dropped_ev_op->m_prev= op;
|
|
||||||
m_dropped_ev_op= op;
|
|
||||||
|
|
||||||
// stop blob event ops
|
// stop blob event ops
|
||||||
if (op->theMainOp == NULL)
|
if (op->theMainOp == NULL)
|
||||||
{
|
{
|
||||||
@@ -2391,11 +2499,25 @@ NdbEventBuffer::dropEventOperation(NdbEventOperation* tOp)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ToDo, take care of these to be deleted at the
|
DBUG_ASSERT(op->m_ref_count > 0);
|
||||||
// appropriate time, after we are sure that there
|
op->m_ref_count--;
|
||||||
// are _no_ more events coming
|
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
|
||||||
|
if (op->m_ref_count == 0)
|
||||||
// delete tOp;
|
{
|
||||||
|
DBUG_PRINT("info", ("deleting op: %p", op));
|
||||||
|
DBUG_ASSERT(op->m_node_bit_mask.isclear());
|
||||||
|
ndbout_c("deleting NdbEventOperation %p", op->m_facade);
|
||||||
|
delete op->m_facade;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
op->m_next= m_dropped_ev_op;
|
||||||
|
op->m_prev= 0;
|
||||||
|
if (m_dropped_ev_op)
|
||||||
|
m_dropped_ev_op->m_prev= op;
|
||||||
|
m_dropped_ev_op= op;
|
||||||
|
}
|
||||||
|
DBUG_VOID_RETURN;
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
|
@@ -25,7 +25,7 @@
|
|||||||
#include <UtilBuffer.hpp>
|
#include <UtilBuffer.hpp>
|
||||||
|
|
||||||
#define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
|
#define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
|
||||||
//#define EVENT_DEBUG
|
#define EVENT_DEBUG
|
||||||
#ifdef EVENT_DEBUG
|
#ifdef EVENT_DEBUG
|
||||||
#define DBUG_ENTER_EVENT(A) DBUG_ENTER(A)
|
#define DBUG_ENTER_EVENT(A) DBUG_ENTER(A)
|
||||||
#define DBUG_RETURN_EVENT(A) DBUG_RETURN(A)
|
#define DBUG_RETURN_EVENT(A) DBUG_RETURN(A)
|
||||||
@@ -367,6 +367,8 @@ public:
|
|||||||
Uint32 m_eventId;
|
Uint32 m_eventId;
|
||||||
Uint32 m_oid;
|
Uint32 m_oid;
|
||||||
|
|
||||||
|
Bitmask<(unsigned int)_NDB_NODE_BITMASK_SIZE> m_node_bit_mask;
|
||||||
|
int m_ref_count;
|
||||||
bool m_mergeEvents;
|
bool m_mergeEvents;
|
||||||
|
|
||||||
EventBufData *m_data_item;
|
EventBufData *m_data_item;
|
||||||
@@ -406,10 +408,10 @@ public:
|
|||||||
void dropEventOperation(NdbEventOperation *);
|
void dropEventOperation(NdbEventOperation *);
|
||||||
static NdbEventOperationImpl* getEventOperationImpl(NdbEventOperation* tOp);
|
static NdbEventOperationImpl* getEventOperationImpl(NdbEventOperation* tOp);
|
||||||
|
|
||||||
void add_drop_lock();
|
void add_drop_lock() { NdbMutex_Lock(p_add_drop_mutex); }
|
||||||
void add_drop_unlock();
|
void add_drop_unlock() { NdbMutex_Unlock(p_add_drop_mutex); }
|
||||||
void lock();
|
void lock() { NdbMutex_Lock(m_mutex); }
|
||||||
void unlock();
|
void unlock() { NdbMutex_Unlock(m_mutex); }
|
||||||
|
|
||||||
void add_op();
|
void add_op();
|
||||||
void remove_op();
|
void remove_op();
|
||||||
@@ -430,9 +432,11 @@ public:
|
|||||||
Uint32 getEventId(int bufferId);
|
Uint32 getEventId(int bufferId);
|
||||||
|
|
||||||
int pollEvents(int aMillisecondNumber, Uint64 *latestGCI= 0);
|
int pollEvents(int aMillisecondNumber, Uint64 *latestGCI= 0);
|
||||||
|
int flushIncompleteEvents(Uint64 gci);
|
||||||
NdbEventOperation *nextEvent();
|
NdbEventOperation *nextEvent();
|
||||||
NdbEventOperationImpl* getGCIEventOperations(Uint32* iter,
|
NdbEventOperationImpl* getGCIEventOperations(Uint32* iter,
|
||||||
Uint32* event_types);
|
Uint32* event_types);
|
||||||
|
void NdbEventBuffer::deleteUsedEventOperations();
|
||||||
|
|
||||||
NdbEventOperationImpl *move_data();
|
NdbEventOperationImpl *move_data();
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user