diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc index 06f946b013c..bcbfafccdd5 100644 --- a/sql/ha_ndbcluster_binlog.cc +++ b/sql/ha_ndbcluster_binlog.cc @@ -2829,10 +2829,41 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) assert(pOp->getGCI() <= ndb_latest_received_binlog_epoch); bzero((char*) &row, sizeof(row)); injector::transaction trans= inj->new_trans(thd); + { // pass table map before epoch + Uint32 iter=0; + const NdbEventOperation* gci_op; + Uint32 event_types; + while ((gci_op=ndb->getGCIEventOperations(&iter, &event_types)) + != NULL) + { + NDB_SHARE* share=(NDB_SHARE*)gci_op->getCustomData(); + DBUG_PRINT("info", ("per gci op %p share %p event types 0x%x", + gci_op, share, event_types)); + // this should not happen + if (share == NULL || share->table == NULL) + { + DBUG_PRINT("info", ("no share or table !")); + continue; + } + TABLE* table=share->table; + const LEX_STRING& name=table->s->table_name; + DBUG_PRINT("info", ("use_table: %.*s", name.length, name.str)); + injector::transaction::table tbl(table, true); + // TODO enable when mats patch pushed + //trans.use_table(::server_id, tbl); + } + } gci= pOp->getGCI(); if (apply_status_share) { TABLE *table= apply_status_share->table; + + const LEX_STRING& name=table->s->table_name; + DBUG_PRINT("info", ("use_table: %.*s", name.length, name.str)); + injector::transaction::table tbl(table, true); + // TODO enable when mats patch pushed + //trans.use_table(::server_id, tbl); + MY_BITMAP b; uint32 bitbuf; DBUG_ASSERT(table->s->fields <= sizeof(bitbuf) * 8); diff --git a/storage/ndb/include/ndbapi/Ndb.hpp b/storage/ndb/include/ndbapi/Ndb.hpp index f8337d4b281..ea7e47ea622 100644 --- a/storage/ndb/include/ndbapi/Ndb.hpp +++ b/storage/ndb/include/ndbapi/Ndb.hpp @@ -1240,6 +1240,18 @@ public: */ NdbEventOperation *nextEvent(); + /** + * Iterate over distinct event operations which are part of current + * GCI. Valid after nextEvent. Used to get summary information for + * the epoch (e.g. list of all tables) before processing event data. + * + * Set *iter=0 to start. Returns NULL when no more. If event_types + * is not NULL, it returns bitmask of received event types. + */ + const NdbEventOperation* + getGCIEventOperations(Uint32* iter, Uint32* event_types); + + #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL NdbEventOperation *getEventOperation(NdbEventOperation* eventOp= 0); Uint64 getLatestGCI(); diff --git a/storage/ndb/src/ndbapi/Ndb.cpp b/storage/ndb/src/ndbapi/Ndb.cpp index 994d5da4afc..a1e973b6e4b 100644 --- a/storage/ndb/src/ndbapi/Ndb.cpp +++ b/storage/ndb/src/ndbapi/Ndb.cpp @@ -1293,6 +1293,16 @@ NdbEventOperation *Ndb::nextEvent() return theEventBuffer->nextEvent(); } +const NdbEventOperation* +Ndb::getGCIEventOperations(Uint32* iter, Uint32* event_types) +{ + NdbEventOperationImpl* op = + theEventBuffer->getGCIEventOperations(iter, event_types); + if (op != NULL) + return op->m_facade; + return NULL; +} + Uint64 Ndb::getLatestGCI() { return theEventBuffer->getLatestGCI(); diff --git a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp index 82bfb8e2228..411c151410d 100644 --- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp +++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp @@ -1081,6 +1081,19 @@ NdbEventBuffer::nextEvent() DBUG_RETURN_EVENT(0); } +NdbEventOperationImpl* +NdbEventBuffer::getGCIEventOperations(Uint32* iter, Uint32* event_types) +{ + if (*iter < m_available_data.m_gci_op_count) + { + EventBufData_list::Gci_op g = m_available_data.m_gci_op_list[(*iter)++]; + if (event_types != NULL) + *event_types = g.event_types; + return g.op; + } + return NULL; +} + void NdbEventBuffer::lock() { @@ -2061,7 +2074,36 @@ NdbEventBuffer::free_list(EventBufData_list &list) } // list returned to m_free_data - new (&list) EventBufData_list; + list.m_head = list.m_tail = NULL; + list.m_count = list.m_sz = 0; +} + +void +EventBufData_list::add_gci_op(Gci_op g) +{ + assert(g.op != NULL); + Uint32 i; + for (i = 0; i < m_gci_op_count; i++) { + if (m_gci_op_list[i].op == g.op) + break; + } + if (i < m_gci_op_count) { + m_gci_op_list[i].event_types |= g.event_types; + } else { + if (m_gci_op_count == m_gci_op_alloc) { + Uint32 n = 1 + 2 * m_gci_op_alloc; + Gci_op* old_list = m_gci_op_list; + m_gci_op_list = new Gci_op [n]; + if (m_gci_op_alloc != 0) { + Uint32 bytes = m_gci_op_alloc * sizeof(Gci_op); + memcpy(m_gci_op_list, old_list, bytes); + delete [] old_list; + } + m_gci_op_alloc = n; + } + assert(m_gci_op_count < m_gci_op_alloc); + m_gci_op_list[m_gci_op_count++] = g; + } } NdbEventOperation* diff --git a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp index 8ec135cd0dc..61d91d71332 100644 --- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp +++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp @@ -76,19 +76,31 @@ public: EventBufData *m_head, *m_tail; unsigned m_count; unsigned m_sz; + + // distinct ops per gci (assume no hash needed) + struct Gci_op { NdbEventOperationImpl* op; Uint32 event_types; }; + Gci_op* m_gci_op_list; + Uint32 m_gci_op_count; + Uint32 m_gci_op_alloc; +private: + void add_gci_op(Gci_op g); }; inline EventBufData_list::EventBufData_list() : m_head(0), m_tail(0), m_count(0), - m_sz(0) + m_sz(0), + m_gci_op_list(NULL), + m_gci_op_count(0), + m_gci_op_alloc(0) { } inline EventBufData_list::~EventBufData_list() { + delete [] m_gci_op_list; } inline @@ -110,6 +122,9 @@ void EventBufData_list::remove_first() inline void EventBufData_list::append(EventBufData *data) { + Gci_op g = { data->m_event_op, 1 << (Uint32)data->sdata->operation }; + add_gci_op(g); + data->m_next= 0; if (m_tail) m_tail->m_next= data; @@ -130,6 +145,10 @@ void EventBufData_list::append(EventBufData *data) inline void EventBufData_list::append(const EventBufData_list &list) { + Uint32 i; + for (i = 0; i < list.m_gci_op_count; i++) + add_gci_op(list.m_gci_op_list[i]); + if (m_tail) m_tail->m_next= list.m_head; else @@ -265,7 +284,6 @@ private: void receive_data(NdbRecAttr *r, const Uint32 *data, Uint32 sz); }; - class NdbEventBuffer { public: NdbEventBuffer(Ndb*); @@ -303,6 +321,8 @@ public: int pollEvents(int aMillisecondNumber, Uint64 *latestGCI= 0); NdbEventOperation *nextEvent(); + NdbEventOperationImpl* getGCIEventOperations(Uint32* iter, + Uint32* event_types); NdbEventOperationImpl *move_data();