diff --git a/mysql-test/r/rpl_ndb_log.result b/mysql-test/r/rpl_ndb_log.result index 5f6f040b715..c435fb37531 100644 --- a/mysql-test/r/rpl_ndb_log.result +++ b/mysql-test/r/rpl_ndb_log.result @@ -47,6 +47,10 @@ master-bin.000001 # Table_map 1 # table_id: # (test.t1) flush logs; create table t3 (a int)ENGINE=NDB; start slave; + +let $result_pattern= '%127.0.0.1%root%master-bin.000002%slave-relay-bin.000005%Yes%Yes%0%0%None%' ; + +--source include/wait_slave_status.inc flush logs; stop slave; create table t2 (n int)ENGINE=NDB; diff --git a/mysql-test/t/disabled.def b/mysql-test/t/disabled.def index 562006c7687..b4e37825326 100644 --- a/mysql-test/t/disabled.def +++ b/mysql-test/t/disabled.def @@ -29,7 +29,7 @@ rpl_ndb_commit_afterflush : BUG#19328 2006-05-04 tomas Slave timeout with COM_RE rpl_ndb_dd_partitions : BUG#19259 2006-04-21 rpl_ndb_dd_partitions fails on s/AMD rpl_ndb_ddl : BUG#18946 result file needs update + test needs to checked rpl_ndb_innodb2ndb : Bug #19710 Cluster replication to partition table fails on DELETE FROM statement -rpl_ndb_log : BUG#18947 2006-03-21 tomas CRBR: order in binlog of create table and insert (on different table) not determ +#rpl_ndb_log : BUG#18947 2006-03-21 tomas CRBR: order in binlog of create table and insert (on different table) not determ rpl_ndb_myisam2ndb : Bug #19710 Cluster replication to partition table fails on DELETE FROM statement rpl_switch_stm_row_mixed : BUG#18590 2006-03-28 brian rpl_row_blob_innodb : BUG#18980 2006-04-10 kent Test fails randomly diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index 8b3ff56a847..787bc07f1f7 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -5054,6 +5054,7 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb, const char *db, const char *table_name) { + THD *thd= current_thd; DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table"); NDBDICT *dict= ndb->getDictionary(); #ifdef HAVE_NDB_BINLOG @@ -5085,7 +5086,7 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb, ndb_table_version= h->m_table->getObjectVersion(); } #endif - h->release_metadata(current_thd, ndb); + h->release_metadata(thd, ndb); } else { @@ -5151,8 +5152,8 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb, if (!IS_TMP_PREFIX(table_name) && share) { - ndbcluster_log_schema_op(current_thd, share, - current_thd->query, current_thd->query_length, + ndbcluster_log_schema_op(thd, share, + thd->query, thd->query_length, share->db, share->table_name, ndb_table_id, ndb_table_version, SOT_DROP_TABLE, 0, 0, 1); @@ -5733,6 +5734,7 @@ int ndbcluster_drop_database_impl(const char *path) static void ndbcluster_drop_database(char *path) { + THD *thd= current_thd; DBUG_ENTER("ndbcluster_drop_database"); #ifdef HAVE_NDB_BINLOG /* @@ -5750,8 +5752,8 @@ static void ndbcluster_drop_database(char *path) #ifdef HAVE_NDB_BINLOG char db[FN_REFLEN]; ha_ndbcluster::set_dbname(path, db); - ndbcluster_log_schema_op(current_thd, 0, - current_thd->query, current_thd->query_length, + ndbcluster_log_schema_op(thd, 0, + thd->query, thd->query_length, db, "", 0, 0, SOT_DROP_DB, 0, 0, 0); #endif DBUG_VOID_RETURN; @@ -6827,6 +6829,7 @@ static void dbug_print_open_tables() */ int handle_trailing_share(NDB_SHARE *share) { + THD *thd= current_thd; static ulong trailing_share_id= 0; DBUG_ENTER("handle_trailing_share"); @@ -6837,7 +6840,7 @@ int handle_trailing_share(NDB_SHARE *share) bzero((char*) &table_list,sizeof(table_list)); table_list.db= share->db; table_list.alias= table_list.table_name= share->table_name; - close_cached_tables(current_thd, 0, &table_list, TRUE); + close_cached_tables(thd, 0, &table_list, TRUE); pthread_mutex_lock(&ndbcluster_mutex); if (!--share->use_count) diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc index 452be961235..48273320cc0 100644 --- a/sql/ha_ndbcluster_binlog.cc +++ b/sql/ha_ndbcluster_binlog.cc @@ -972,364 +972,6 @@ static char *ndb_pack_varchar(const NDBCOL *col, char *buf, return buf; } -/* - log query in schema table -*/ -static void ndb_report_waiting(const char *key, - int the_time, - const char *op, - const char *obj) -{ - ulonglong ndb_latest_epoch= 0; - const char *proc_info= ""; - pthread_mutex_lock(&injector_mutex); - if (injector_ndb) - ndb_latest_epoch= injector_ndb->getLatestGCI(); - if (injector_thd) - proc_info= injector_thd->proc_info; - pthread_mutex_unlock(&injector_mutex); - sql_print_information("NDB %s:" - " waiting max %u sec for %s %s." - " epochs: (%u,%u,%u)" - " injector proc_info: %s" - ,key, the_time, op, obj - ,(uint)ndb_latest_handled_binlog_epoch - ,(uint)ndb_latest_received_binlog_epoch - ,(uint)ndb_latest_epoch - ,proc_info - ); -} - -int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, - const char *query, int query_length, - const char *db, const char *table_name, - uint32 ndb_table_id, - uint32 ndb_table_version, - enum SCHEMA_OP_TYPE type, - const char *new_db, const char *new_table_name, - int have_lock_open) -{ - DBUG_ENTER("ndbcluster_log_schema_op"); - Thd_ndb *thd_ndb= get_thd_ndb(thd); - if (!thd_ndb) - { - if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb())) - { - sql_print_error("Could not allocate Thd_ndb object"); - DBUG_RETURN(1); - } - set_thd_ndb(thd, thd_ndb); - } - - DBUG_PRINT("enter", - ("query: %s db: %s table_name: %s thd_ndb->options: %d", - query, db, table_name, thd_ndb->options)); - if (!schema_share || thd_ndb->options & TNO_NO_LOG_SCHEMA_OP) - { - DBUG_RETURN(0); - } - - char tmp_buf2[FN_REFLEN]; - const char *type_str; - switch (type) - { - case SOT_DROP_TABLE: - /* drop database command, do not log at drop table */ - if (thd->lex->sql_command == SQLCOM_DROP_DB) - DBUG_RETURN(0); - /* redo the drop table query as is may contain several tables */ - query= tmp_buf2; - query_length= (uint) (strxmov(tmp_buf2, "drop table `", - table_name, "`", NullS) - tmp_buf2); - type_str= "drop table"; - break; - case SOT_RENAME_TABLE: - /* redo the rename table query as is may contain several tables */ - query= tmp_buf2; - query_length= (uint) (strxmov(tmp_buf2, "rename table `", - db, ".", table_name, "` to `", - new_db, ".", new_table_name, "`", NullS) - tmp_buf2); - type_str= "rename table"; - break; - case SOT_CREATE_TABLE: - type_str= "create table"; - break; - case SOT_ALTER_TABLE: - type_str= "create table"; - break; - case SOT_DROP_DB: - type_str= "drop db"; - break; - case SOT_CREATE_DB: - type_str= "create db"; - break; - case SOT_ALTER_DB: - type_str= "alter db"; - break; - case SOT_TABLESPACE: - type_str= "tablespace"; - break; - case SOT_LOGFILE_GROUP: - type_str= "logfile group"; - break; - default: - abort(); /* should not happen, programming error */ - } - - NDB_SCHEMA_OBJECT *ndb_schema_object; - { - char key[FN_REFLEN]; - build_table_filename(key, sizeof(key), db, table_name, ""); - ndb_schema_object= ndb_get_schema_object(key, TRUE, FALSE); - } - - const NdbError *ndb_error= 0; - uint32 node_id= g_ndb_cluster_connection->node_id(); - Uint64 epoch= 0; - MY_BITMAP schema_subscribers; - uint32 bitbuf[sizeof(ndb_schema_object->slock)/4]; - uint32 bitbuf_e[sizeof(bitbuf)]; - bzero((char *)bitbuf_e, sizeof(bitbuf_e)); - { - int i, updated= 0; - int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes(); - bitmap_init(&schema_subscribers, bitbuf, sizeof(bitbuf)*8, false); - bitmap_set_all(&schema_subscribers); - (void) pthread_mutex_lock(&schema_share->mutex); - for (i= 0; i < no_storage_nodes; i++) - { - MY_BITMAP *table_subscribers= &schema_share->subscriber_bitmap[i]; - if (!bitmap_is_clear_all(table_subscribers)) - { - bitmap_intersect(&schema_subscribers, - table_subscribers); - updated= 1; - } - } - (void) pthread_mutex_unlock(&schema_share->mutex); - if (updated) - bitmap_clear_bit(&schema_subscribers, node_id); - else - bitmap_clear_all(&schema_subscribers); - - if (ndb_schema_object) - { - (void) pthread_mutex_lock(&ndb_schema_object->mutex); - memcpy(ndb_schema_object->slock, schema_subscribers.bitmap, - sizeof(ndb_schema_object->slock)); - (void) pthread_mutex_unlock(&ndb_schema_object->mutex); - } - - DBUG_DUMP("schema_subscribers", (char*)schema_subscribers.bitmap, - no_bytes_in_map(&schema_subscribers)); - DBUG_PRINT("info", ("bitmap_is_clear_all(&schema_subscribers): %d", - bitmap_is_clear_all(&schema_subscribers))); - } - - Ndb *ndb= thd_ndb->ndb; - char save_db[FN_REFLEN]; - strcpy(save_db, ndb->getDatabaseName()); - - char tmp_buf[FN_REFLEN]; - NDBDICT *dict= ndb->getDictionary(); - ndb->setDatabaseName(NDB_REP_DB); - Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE); - const NDBTAB *ndbtab= ndbtab_g.get_table(); - NdbTransaction *trans= 0; - int retries= 100; - const NDBCOL *col[SCHEMA_SIZE]; - unsigned sz[SCHEMA_SIZE]; - - if (ndbtab == 0) - { - if (strcmp(NDB_REP_DB, db) != 0 || - strcmp(NDB_SCHEMA_TABLE, table_name)) - { - ndb_error= &dict->getNdbError(); - } - goto end; - } - - { - uint i; - for (i= 0; i < SCHEMA_SIZE; i++) - { - col[i]= ndbtab->getColumn(i); - if (i != SCHEMA_QUERY_I) - { - sz[i]= col[i]->getLength(); - DBUG_ASSERT(sz[i] <= sizeof(tmp_buf)); - } - } - } - - while (1) - { - const char *log_db= db; - const char *log_tab= table_name; - const char *log_subscribers= (char*)schema_subscribers.bitmap; - uint32 log_type= (uint32)type; - if ((trans= ndb->startTransaction()) == 0) - goto err; - while (1) - { - NdbOperation *op= 0; - int r= 0; - r|= (op= trans->getNdbOperation(ndbtab)) == 0; - DBUG_ASSERT(r == 0); - r|= op->writeTuple(); - DBUG_ASSERT(r == 0); - - /* db */ - ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, log_db, strlen(log_db)); - r|= op->equal(SCHEMA_DB_I, tmp_buf); - DBUG_ASSERT(r == 0); - /* name */ - ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, log_tab, - strlen(log_tab)); - r|= op->equal(SCHEMA_NAME_I, tmp_buf); - DBUG_ASSERT(r == 0); - /* slock */ - DBUG_ASSERT(sz[SCHEMA_SLOCK_I] == sizeof(bitbuf)); - r|= op->setValue(SCHEMA_SLOCK_I, log_subscribers); - DBUG_ASSERT(r == 0); - /* query */ - { - NdbBlob *ndb_blob= op->getBlobHandle(SCHEMA_QUERY_I); - DBUG_ASSERT(ndb_blob != 0); - uint blob_len= query_length; - const char* blob_ptr= query; - r|= ndb_blob->setValue(blob_ptr, blob_len); - DBUG_ASSERT(r == 0); - } - /* node_id */ - r|= op->setValue(SCHEMA_NODE_ID_I, node_id); - DBUG_ASSERT(r == 0); - /* epoch */ - r|= op->setValue(SCHEMA_EPOCH_I, epoch); - DBUG_ASSERT(r == 0); - /* id */ - r|= op->setValue(SCHEMA_ID_I, ndb_table_id); - DBUG_ASSERT(r == 0); - /* version */ - r|= op->setValue(SCHEMA_VERSION_I, ndb_table_version); - DBUG_ASSERT(r == 0); - /* type */ - r|= op->setValue(SCHEMA_TYPE_I, log_type); - DBUG_ASSERT(r == 0); - if (log_db != new_db && new_db && new_table_name) - { - log_db= new_db; - log_tab= new_table_name; - log_subscribers= (const char *)bitbuf_e; // no ack expected on this - log_type= (uint32)SOT_RENAME_TABLE_NEW; - continue; - } - break; - } - if (trans->execute(NdbTransaction::Commit) == 0) - { - dict->forceGCPWait(); - DBUG_PRINT("info", ("logged: %s", query)); - break; - } -err: - const NdbError *this_error= trans ? - &trans->getNdbError() : &ndb->getNdbError(); - if (this_error->status == NdbError::TemporaryError) - { - if (retries--) - { - if (trans) - ndb->closeTransaction(trans); - continue; // retry - } - } - ndb_error= this_error; - break; - } -end: - if (ndb_error) - push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, - ER_GET_ERRMSG, ER(ER_GET_ERRMSG), - ndb_error->code, - ndb_error->message, - "Could not log query '%s' on other mysqld's"); - - if (trans) - ndb->closeTransaction(trans); - ndb->setDatabaseName(save_db); - - /* - Wait for other mysqld's to acknowledge the table operation - */ - if (ndb_error == 0 && - !bitmap_is_clear_all(&schema_subscribers)) - { - int max_timeout= opt_ndb_sync_timeout; - (void) pthread_mutex_lock(&ndb_schema_object->mutex); - if (have_lock_open) - { - safe_mutex_assert_owner(&LOCK_open); - (void) pthread_mutex_unlock(&LOCK_open); - } - while (1) - { - struct timespec abstime; - int i; - int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes(); - set_timespec(abstime, 1); - int ret= pthread_cond_timedwait(&injector_cond, - &ndb_schema_object->mutex, - &abstime); - - (void) pthread_mutex_lock(&schema_share->mutex); - for (i= 0; i < no_storage_nodes; i++) - { - /* remove any unsubscribed from schema_subscribers */ - MY_BITMAP *tmp= &schema_share->subscriber_bitmap[i]; - if (!bitmap_is_clear_all(tmp)) - bitmap_intersect(&schema_subscribers, tmp); - } - (void) pthread_mutex_unlock(&schema_share->mutex); - - /* remove any unsubscribed from ndb_schema_object->slock */ - bitmap_intersect(&ndb_schema_object->slock_bitmap, &schema_subscribers); - - DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap", - (char*)ndb_schema_object->slock_bitmap.bitmap, - no_bytes_in_map(&ndb_schema_object->slock_bitmap)); - - if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap)) - break; - - if (ret) - { - max_timeout--; - if (max_timeout == 0) - { - sql_print_error("NDB %s: distributing %s timed out. Ignoring...", - type_str, ndb_schema_object->key); - break; - } - if (ndb_extra_logging) - ndb_report_waiting(type_str, max_timeout, - "distributing", ndb_schema_object->key); - } - } - if (have_lock_open) - { - (void) pthread_mutex_lock(&LOCK_open); - } - (void) pthread_mutex_unlock(&ndb_schema_object->mutex); - } - - if (ndb_schema_object) - ndb_free_schema_object(&ndb_schema_object, FALSE); - - DBUG_RETURN(0); -} - /* acknowledge handling of schema operation */ @@ -1466,7 +1108,7 @@ ndbcluster_update_slock(THD *thd, } end: if (ndb_error) - push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR, ER_GET_ERRMSG, ER(ER_GET_ERRMSG), ndb_error->code, ndb_error->message, @@ -1478,6 +1120,383 @@ end: DBUG_RETURN(0); } +/* + log query in schema table +*/ +static void ndb_report_waiting(const char *key, + int the_time, + const char *op, + const char *obj) +{ + ulonglong ndb_latest_epoch= 0; + const char *proc_info= ""; + pthread_mutex_lock(&injector_mutex); + if (injector_ndb) + ndb_latest_epoch= injector_ndb->getLatestGCI(); + if (injector_thd) + proc_info= injector_thd->proc_info; + pthread_mutex_unlock(&injector_mutex); + sql_print_information("NDB %s:" + " waiting max %u sec for %s %s." + " epochs: (%u,%u,%u)" + " injector proc_info: %s" + ,key, the_time, op, obj + ,(uint)ndb_latest_handled_binlog_epoch + ,(uint)ndb_latest_received_binlog_epoch + ,(uint)ndb_latest_epoch + ,proc_info + ); +} + +int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, + const char *query, int query_length, + const char *db, const char *table_name, + uint32 ndb_table_id, + uint32 ndb_table_version, + enum SCHEMA_OP_TYPE type, + const char *new_db, const char *new_table_name, + int have_lock_open) +{ + DBUG_ENTER("ndbcluster_log_schema_op"); + Thd_ndb *thd_ndb= get_thd_ndb(thd); + if (!thd_ndb) + { + if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb())) + { + sql_print_error("Could not allocate Thd_ndb object"); + DBUG_RETURN(1); + } + set_thd_ndb(thd, thd_ndb); + } + + DBUG_PRINT("enter", + ("query: %s db: %s table_name: %s thd_ndb->options: %d", + query, db, table_name, thd_ndb->options)); + if (!schema_share || thd_ndb->options & TNO_NO_LOG_SCHEMA_OP) + { + DBUG_RETURN(0); + } + + char tmp_buf2[FN_REFLEN]; + const char *type_str; + switch (type) + { + case SOT_DROP_TABLE: + /* drop database command, do not log at drop table */ + if (thd->lex->sql_command == SQLCOM_DROP_DB) + DBUG_RETURN(0); + /* redo the drop table query as is may contain several tables */ + query= tmp_buf2; + query_length= (uint) (strxmov(tmp_buf2, "drop table `", + table_name, "`", NullS) - tmp_buf2); + type_str= "drop table"; + break; + case SOT_RENAME_TABLE: + /* redo the rename table query as is may contain several tables */ + query= tmp_buf2; + query_length= (uint) (strxmov(tmp_buf2, "rename table `", + db, ".", table_name, "` to `", + new_db, ".", new_table_name, "`", NullS) - tmp_buf2); + type_str= "rename table"; + break; + case SOT_CREATE_TABLE: + type_str= "create table"; + break; + case SOT_ALTER_TABLE: + type_str= "create table"; + break; + case SOT_DROP_DB: + type_str= "drop db"; + break; + case SOT_CREATE_DB: + type_str= "create db"; + break; + case SOT_ALTER_DB: + type_str= "alter db"; + break; + case SOT_TABLESPACE: + type_str= "tablespace"; + break; + case SOT_LOGFILE_GROUP: + type_str= "logfile group"; + break; + default: + abort(); /* should not happen, programming error */ + } + + NDB_SCHEMA_OBJECT *ndb_schema_object; + { + char key[FN_REFLEN]; + build_table_filename(key, sizeof(key), db, table_name, ""); + ndb_schema_object= ndb_get_schema_object(key, TRUE, FALSE); + } + + const NdbError *ndb_error= 0; + uint32 node_id= g_ndb_cluster_connection->node_id(); + Uint64 epoch= 0; + MY_BITMAP schema_subscribers; + uint32 bitbuf[sizeof(ndb_schema_object->slock)/4]; + char bitbuf_e[sizeof(bitbuf)]; + bzero(bitbuf_e, sizeof(bitbuf_e)); + { + int i, updated= 0; + int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes(); + bitmap_init(&schema_subscribers, bitbuf, sizeof(bitbuf)*8, false); + bitmap_set_all(&schema_subscribers); + (void) pthread_mutex_lock(&schema_share->mutex); + for (i= 0; i < no_storage_nodes; i++) + { + MY_BITMAP *table_subscribers= &schema_share->subscriber_bitmap[i]; + if (!bitmap_is_clear_all(table_subscribers)) + { + bitmap_intersect(&schema_subscribers, + table_subscribers); + updated= 1; + } + } + (void) pthread_mutex_unlock(&schema_share->mutex); + if (updated) + { + bitmap_clear_bit(&schema_subscribers, node_id); + /* + if setting own acknowledge bit it is important that + no other mysqld's are registred, as subsequent code + will cause the original event to be hidden (by blob + merge event code) + */ + if (bitmap_is_clear_all(&schema_subscribers)) + bitmap_set_bit(&schema_subscribers, node_id); + } + else + bitmap_clear_all(&schema_subscribers); + + if (ndb_schema_object) + { + (void) pthread_mutex_lock(&ndb_schema_object->mutex); + memcpy(ndb_schema_object->slock, schema_subscribers.bitmap, + sizeof(ndb_schema_object->slock)); + (void) pthread_mutex_unlock(&ndb_schema_object->mutex); + } + + DBUG_DUMP("schema_subscribers", (char*)schema_subscribers.bitmap, + no_bytes_in_map(&schema_subscribers)); + DBUG_PRINT("info", ("bitmap_is_clear_all(&schema_subscribers): %d", + bitmap_is_clear_all(&schema_subscribers))); + } + + Ndb *ndb= thd_ndb->ndb; + char save_db[FN_REFLEN]; + strcpy(save_db, ndb->getDatabaseName()); + + char tmp_buf[FN_REFLEN]; + NDBDICT *dict= ndb->getDictionary(); + ndb->setDatabaseName(NDB_REP_DB); + Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE); + const NDBTAB *ndbtab= ndbtab_g.get_table(); + NdbTransaction *trans= 0; + int retries= 100; + const NDBCOL *col[SCHEMA_SIZE]; + unsigned sz[SCHEMA_SIZE]; + + if (ndbtab == 0) + { + if (strcmp(NDB_REP_DB, db) != 0 || + strcmp(NDB_SCHEMA_TABLE, table_name)) + { + ndb_error= &dict->getNdbError(); + } + goto end; + } + + { + uint i; + for (i= 0; i < SCHEMA_SIZE; i++) + { + col[i]= ndbtab->getColumn(i); + if (i != SCHEMA_QUERY_I) + { + sz[i]= col[i]->getLength(); + DBUG_ASSERT(sz[i] <= sizeof(tmp_buf)); + } + } + } + + while (1) + { + const char *log_db= db; + const char *log_tab= table_name; + const char *log_subscribers= (char*)schema_subscribers.bitmap; + uint32 log_type= (uint32)type; + if ((trans= ndb->startTransaction()) == 0) + goto err; + while (1) + { + NdbOperation *op= 0; + int r= 0; + r|= (op= trans->getNdbOperation(ndbtab)) == 0; + DBUG_ASSERT(r == 0); + r|= op->writeTuple(); + DBUG_ASSERT(r == 0); + + /* db */ + ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, log_db, strlen(log_db)); + r|= op->equal(SCHEMA_DB_I, tmp_buf); + DBUG_ASSERT(r == 0); + /* name */ + ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, log_tab, + strlen(log_tab)); + r|= op->equal(SCHEMA_NAME_I, tmp_buf); + DBUG_ASSERT(r == 0); + /* slock */ + DBUG_ASSERT(sz[SCHEMA_SLOCK_I] == sizeof(bitbuf)); + r|= op->setValue(SCHEMA_SLOCK_I, log_subscribers); + DBUG_ASSERT(r == 0); + /* query */ + { + NdbBlob *ndb_blob= op->getBlobHandle(SCHEMA_QUERY_I); + DBUG_ASSERT(ndb_blob != 0); + uint blob_len= query_length; + const char* blob_ptr= query; + r|= ndb_blob->setValue(blob_ptr, blob_len); + DBUG_ASSERT(r == 0); + } + /* node_id */ + r|= op->setValue(SCHEMA_NODE_ID_I, node_id); + DBUG_ASSERT(r == 0); + /* epoch */ + r|= op->setValue(SCHEMA_EPOCH_I, epoch); + DBUG_ASSERT(r == 0); + /* id */ + r|= op->setValue(SCHEMA_ID_I, ndb_table_id); + DBUG_ASSERT(r == 0); + /* version */ + r|= op->setValue(SCHEMA_VERSION_I, ndb_table_version); + DBUG_ASSERT(r == 0); + /* type */ + r|= op->setValue(SCHEMA_TYPE_I, log_type); + DBUG_ASSERT(r == 0); + if (log_db != new_db && new_db && new_table_name) + { + log_db= new_db; + log_tab= new_table_name; + log_subscribers= bitbuf_e; // no ack expected on this + log_type= (uint32)SOT_RENAME_TABLE_NEW; + continue; + } + break; + } + if (trans->execute(NdbTransaction::Commit) == 0) + { + DBUG_PRINT("info", ("logged: %s", query)); + break; + } +err: + const NdbError *this_error= trans ? + &trans->getNdbError() : &ndb->getNdbError(); + if (this_error->status == NdbError::TemporaryError) + { + if (retries--) + { + if (trans) + ndb->closeTransaction(trans); + continue; // retry + } + } + ndb_error= this_error; + break; + } +end: + if (ndb_error) + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + ER_GET_ERRMSG, ER(ER_GET_ERRMSG), + ndb_error->code, + ndb_error->message, + "Could not log query '%s' on other mysqld's"); + + if (trans) + ndb->closeTransaction(trans); + ndb->setDatabaseName(save_db); + + /* + Wait for other mysqld's to acknowledge the table operation + */ + if (ndb_error == 0 && + !bitmap_is_clear_all(&schema_subscribers)) + { + /* + if own nodeid is set we are a single mysqld registred + as an optimization we update the slock directly + */ + if (bitmap_is_set(&schema_subscribers, node_id)) + ndbcluster_update_slock(thd, db, table_name); + else + dict->forceGCPWait(); + + int max_timeout= opt_ndb_sync_timeout; + (void) pthread_mutex_lock(&ndb_schema_object->mutex); + if (have_lock_open) + { + safe_mutex_assert_owner(&LOCK_open); + (void) pthread_mutex_unlock(&LOCK_open); + } + while (1) + { + struct timespec abstime; + int i; + int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes(); + set_timespec(abstime, 1); + int ret= pthread_cond_timedwait(&injector_cond, + &ndb_schema_object->mutex, + &abstime); + if (thd->killed) + break; + (void) pthread_mutex_lock(&schema_share->mutex); + for (i= 0; i < no_storage_nodes; i++) + { + /* remove any unsubscribed from schema_subscribers */ + MY_BITMAP *tmp= &schema_share->subscriber_bitmap[i]; + if (!bitmap_is_clear_all(tmp)) + bitmap_intersect(&schema_subscribers, tmp); + } + (void) pthread_mutex_unlock(&schema_share->mutex); + + /* remove any unsubscribed from ndb_schema_object->slock */ + bitmap_intersect(&ndb_schema_object->slock_bitmap, &schema_subscribers); + + DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap", + (char*)ndb_schema_object->slock_bitmap.bitmap, + no_bytes_in_map(&ndb_schema_object->slock_bitmap)); + + if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap)) + break; + + if (ret) + { + max_timeout--; + if (max_timeout == 0) + { + sql_print_error("NDB %s: distributing %s timed out. Ignoring...", + type_str, ndb_schema_object->key); + break; + } + if (ndb_extra_logging) + ndb_report_waiting(type_str, max_timeout, + "distributing", ndb_schema_object->key); + } + } + if (have_lock_open) + { + (void) pthread_mutex_lock(&LOCK_open); + } + (void) pthread_mutex_unlock(&ndb_schema_object->mutex); + } + + if (ndb_schema_object) + ndb_free_schema_object(&ndb_schema_object, FALSE); + + DBUG_RETURN(0); +} + /* Handle _non_ data events from the storage nodes */ @@ -1701,16 +1720,36 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, false); uint node_id= g_ndb_cluster_connection->node_id(); ndbcluster_get_schema(tmp_share, schema); + DBUG_PRINT("info", + ("%s.%s: log query_length: %d query: '%s' type: %d", + schema->db, schema->name, + schema->query_length, schema->query, + schema->type)); + char key[FN_REFLEN]; + build_table_filename(key, sizeof(key), schema->db, schema->name, ""); + if ((enum SCHEMA_OP_TYPE)schema->type == SOT_CLEAR_SLOCK) + { + pthread_mutex_lock(&ndbcluster_mutex); + NDB_SCHEMA_OBJECT *ndb_schema_object= + (NDB_SCHEMA_OBJECT*) hash_search(&ndb_schema_objects, + (byte*) key, strlen(key)); + if (ndb_schema_object) + { + pthread_mutex_lock(&ndb_schema_object->mutex); + memcpy(ndb_schema_object->slock, schema->slock, + sizeof(ndb_schema_object->slock)); + DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap", + (char*)ndb_schema_object->slock_bitmap.bitmap, + no_bytes_in_map(&ndb_schema_object->slock_bitmap)); + pthread_mutex_unlock(&ndb_schema_object->mutex); + pthread_cond_signal(&injector_cond); + } + pthread_mutex_unlock(&ndbcluster_mutex); + DBUG_RETURN(0); + } if (schema->node_id != node_id) { int log_query= 0, post_epoch_unlock= 0; - DBUG_PRINT("info", - ("%s.%s: log query_length: %d query: '%s' type: %d", - schema->db, schema->name, - schema->query_length, schema->query, - schema->type)); - char key[FN_REFLEN]; - build_table_filename(key, sizeof(key), schema->db, schema->name, ""); switch ((enum SCHEMA_OP_TYPE)schema->type) { case SOT_DROP_TABLE: @@ -1759,30 +1798,12 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, TRUE, /* print error */ FALSE); /* binlog the query */ break; - case SOT_CLEAR_SLOCK: - { - pthread_mutex_lock(&ndbcluster_mutex); - NDB_SCHEMA_OBJECT *ndb_schema_object= - (NDB_SCHEMA_OBJECT*) hash_search(&ndb_schema_objects, - (byte*) key, strlen(key)); - if (ndb_schema_object) - { - pthread_mutex_lock(&ndb_schema_object->mutex); - memcpy(ndb_schema_object->slock, schema->slock, - sizeof(ndb_schema_object->slock)); - DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap", - (char*)ndb_schema_object->slock_bitmap.bitmap, - no_bytes_in_map(&ndb_schema_object->slock_bitmap)); - pthread_mutex_unlock(&ndb_schema_object->mutex); - pthread_cond_signal(&injector_cond); - } - pthread_mutex_unlock(&ndbcluster_mutex); - DBUG_RETURN(0); - } case SOT_TABLESPACE: case SOT_LOGFILE_GROUP: log_query= 1; break; + case SOT_CLEAR_SLOCK: + abort(); } if (log_query && ndb_binlog_running) { @@ -2349,6 +2370,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, const char *event_name, NDB_SHARE *share, int push_warning) { + THD *thd= current_thd; DBUG_ENTER("ndbcluster_create_event"); DBUG_PRINT("info", ("table=%s version=%d event=%s share=%s", ndbtab->getName(), ndbtab->getObjectVersion(), @@ -2378,7 +2400,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, "with BLOB attribute and no PK is not supported", share->key); if (push_warning) - push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR, ER_ILLEGAL_HA_CREATE_OPTION, ER(ER_ILLEGAL_HA_CREATE_OPTION), ndbcluster_hton.name, @@ -2422,7 +2444,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, failed, print a warning */ if (push_warning) - push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR, ER_GET_ERRMSG, ER(ER_GET_ERRMSG), dict->getNdbError().code, dict->getNdbError().message, "NDB"); @@ -2450,7 +2472,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, dict->dropEvent(my_event.getName())) { if (push_warning) - push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR, ER_GET_ERRMSG, ER(ER_GET_ERRMSG), dict->getNdbError().code, dict->getNdbError().message, "NDB"); @@ -2469,7 +2491,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, if (dict->createEvent(my_event)) { if (push_warning) - push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR, ER_GET_ERRMSG, ER(ER_GET_ERRMSG), dict->getNdbError().code, dict->getNdbError().message, "NDB"); @@ -2482,7 +2504,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, DBUG_RETURN(-1); } #ifdef NDB_BINLOG_EXTRA_WARNINGS - push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR, ER_GET_ERRMSG, ER(ER_GET_ERRMSG), 0, "NDB Binlog: Removed trailing event", "NDB"); @@ -2511,6 +2533,7 @@ int ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, const char *event_name) { + THD *thd= current_thd; /* we are in either create table or rename table so table should be locked, hence we can work with the share without locks @@ -2584,7 +2607,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, { sql_print_error("NDB Binlog: Creating NdbEventOperation failed for" " %s",event_name); - push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR, ER_GET_ERRMSG, ER(ER_GET_ERRMSG), ndb->getNdbError().code, ndb->getNdbError().message, @@ -2634,7 +2657,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, sql_print_error("NDB Binlog: Creating NdbEventOperation" " blob field %u handles failed (code=%d) for %s", j, op->getNdbError().code, event_name); - push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR, ER_GET_ERRMSG, ER(ER_GET_ERRMSG), op->getNdbError().code, op->getNdbError().message, @@ -2671,7 +2694,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, retries= 0; if (retries == 0) { - push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR, ER_GET_ERRMSG, ER(ER_GET_ERRMSG), op->getNdbError().code, op->getNdbError().message, "NDB"); @@ -2727,7 +2750,7 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, if (dict->getNdbError().code != 4710) { /* drop event failed for some reason, issue a warning */ - push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR, ER_GET_ERRMSG, ER(ER_GET_ERRMSG), dict->getNdbError().code, dict->getNdbError().message, "NDB"); @@ -2780,7 +2803,8 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, int ret= pthread_cond_timedwait(&injector_cond, &share->mutex, &abstime); - if (share->op == 0) + if (thd->killed || + share->op == 0) break; if (ret) {