1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-30 16:24:05 +03:00

Bug #18947 CRBR: order in binlog of create table and insert (on different table) not determ

- wait for schema event also if mysqld is a single one in cluster to ensure serialization with data events
+ some current_thd removals
+ enabling kill of sql thread during shema sync wait
This commit is contained in:
tomas@poseidon.ndb.mysql.com
2006-05-31 16:16:03 +02:00
parent df507201b2
commit b6f54a737a
4 changed files with 434 additions and 403 deletions

View File

@ -47,6 +47,10 @@ master-bin.000001 # Table_map 1 # table_id: # (test.t1)
flush logs; flush logs;
create table t3 (a int)ENGINE=NDB; create table t3 (a int)ENGINE=NDB;
start slave; start slave;
let $result_pattern= '%127.0.0.1%root%master-bin.000002%slave-relay-bin.000005%Yes%Yes%0%0%None%' ;
--source include/wait_slave_status.inc
flush logs; flush logs;
stop slave; stop slave;
create table t2 (n int)ENGINE=NDB; create table t2 (n int)ENGINE=NDB;

View File

@ -29,7 +29,7 @@ rpl_ndb_commit_afterflush : BUG#19328 2006-05-04 tomas Slave timeout with COM_RE
rpl_ndb_dd_partitions : BUG#19259 2006-04-21 rpl_ndb_dd_partitions fails on s/AMD rpl_ndb_dd_partitions : BUG#19259 2006-04-21 rpl_ndb_dd_partitions fails on s/AMD
rpl_ndb_ddl : BUG#18946 result file needs update + test needs to checked rpl_ndb_ddl : BUG#18946 result file needs update + test needs to checked
rpl_ndb_innodb2ndb : Bug #19710 Cluster replication to partition table fails on DELETE FROM statement rpl_ndb_innodb2ndb : Bug #19710 Cluster replication to partition table fails on DELETE FROM statement
rpl_ndb_log : BUG#18947 2006-03-21 tomas CRBR: order in binlog of create table and insert (on different table) not determ #rpl_ndb_log : BUG#18947 2006-03-21 tomas CRBR: order in binlog of create table and insert (on different table) not determ
rpl_ndb_myisam2ndb : Bug #19710 Cluster replication to partition table fails on DELETE FROM statement rpl_ndb_myisam2ndb : Bug #19710 Cluster replication to partition table fails on DELETE FROM statement
rpl_switch_stm_row_mixed : BUG#18590 2006-03-28 brian rpl_switch_stm_row_mixed : BUG#18590 2006-03-28 brian
rpl_row_blob_innodb : BUG#18980 2006-04-10 kent Test fails randomly rpl_row_blob_innodb : BUG#18980 2006-04-10 kent Test fails randomly

View File

@ -5054,6 +5054,7 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
const char *db, const char *db,
const char *table_name) const char *table_name)
{ {
THD *thd= current_thd;
DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table"); DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table");
NDBDICT *dict= ndb->getDictionary(); NDBDICT *dict= ndb->getDictionary();
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
@ -5085,7 +5086,7 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
ndb_table_version= h->m_table->getObjectVersion(); ndb_table_version= h->m_table->getObjectVersion();
} }
#endif #endif
h->release_metadata(current_thd, ndb); h->release_metadata(thd, ndb);
} }
else else
{ {
@ -5151,8 +5152,8 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
if (!IS_TMP_PREFIX(table_name) && share) if (!IS_TMP_PREFIX(table_name) && share)
{ {
ndbcluster_log_schema_op(current_thd, share, ndbcluster_log_schema_op(thd, share,
current_thd->query, current_thd->query_length, thd->query, thd->query_length,
share->db, share->table_name, share->db, share->table_name,
ndb_table_id, ndb_table_version, ndb_table_id, ndb_table_version,
SOT_DROP_TABLE, 0, 0, 1); SOT_DROP_TABLE, 0, 0, 1);
@ -5733,6 +5734,7 @@ int ndbcluster_drop_database_impl(const char *path)
static void ndbcluster_drop_database(char *path) static void ndbcluster_drop_database(char *path)
{ {
THD *thd= current_thd;
DBUG_ENTER("ndbcluster_drop_database"); DBUG_ENTER("ndbcluster_drop_database");
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
/* /*
@ -5750,8 +5752,8 @@ static void ndbcluster_drop_database(char *path)
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
char db[FN_REFLEN]; char db[FN_REFLEN];
ha_ndbcluster::set_dbname(path, db); ha_ndbcluster::set_dbname(path, db);
ndbcluster_log_schema_op(current_thd, 0, ndbcluster_log_schema_op(thd, 0,
current_thd->query, current_thd->query_length, thd->query, thd->query_length,
db, "", 0, 0, SOT_DROP_DB, 0, 0, 0); db, "", 0, 0, SOT_DROP_DB, 0, 0, 0);
#endif #endif
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
@ -6827,6 +6829,7 @@ static void dbug_print_open_tables()
*/ */
int handle_trailing_share(NDB_SHARE *share) int handle_trailing_share(NDB_SHARE *share)
{ {
THD *thd= current_thd;
static ulong trailing_share_id= 0; static ulong trailing_share_id= 0;
DBUG_ENTER("handle_trailing_share"); DBUG_ENTER("handle_trailing_share");
@ -6837,7 +6840,7 @@ int handle_trailing_share(NDB_SHARE *share)
bzero((char*) &table_list,sizeof(table_list)); bzero((char*) &table_list,sizeof(table_list));
table_list.db= share->db; table_list.db= share->db;
table_list.alias= table_list.table_name= share->table_name; table_list.alias= table_list.table_name= share->table_name;
close_cached_tables(current_thd, 0, &table_list, TRUE); close_cached_tables(thd, 0, &table_list, TRUE);
pthread_mutex_lock(&ndbcluster_mutex); pthread_mutex_lock(&ndbcluster_mutex);
if (!--share->use_count) if (!--share->use_count)

View File

@ -972,364 +972,6 @@ static char *ndb_pack_varchar(const NDBCOL *col, char *buf,
return buf; return buf;
} }
/*
log query in schema table
*/
static void ndb_report_waiting(const char *key,
int the_time,
const char *op,
const char *obj)
{
ulonglong ndb_latest_epoch= 0;
const char *proc_info= "<no info>";
pthread_mutex_lock(&injector_mutex);
if (injector_ndb)
ndb_latest_epoch= injector_ndb->getLatestGCI();
if (injector_thd)
proc_info= injector_thd->proc_info;
pthread_mutex_unlock(&injector_mutex);
sql_print_information("NDB %s:"
" waiting max %u sec for %s %s."
" epochs: (%u,%u,%u)"
" injector proc_info: %s"
,key, the_time, op, obj
,(uint)ndb_latest_handled_binlog_epoch
,(uint)ndb_latest_received_binlog_epoch
,(uint)ndb_latest_epoch
,proc_info
);
}
int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
const char *query, int query_length,
const char *db, const char *table_name,
uint32 ndb_table_id,
uint32 ndb_table_version,
enum SCHEMA_OP_TYPE type,
const char *new_db, const char *new_table_name,
int have_lock_open)
{
DBUG_ENTER("ndbcluster_log_schema_op");
Thd_ndb *thd_ndb= get_thd_ndb(thd);
if (!thd_ndb)
{
if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
{
sql_print_error("Could not allocate Thd_ndb object");
DBUG_RETURN(1);
}
set_thd_ndb(thd, thd_ndb);
}
DBUG_PRINT("enter",
("query: %s db: %s table_name: %s thd_ndb->options: %d",
query, db, table_name, thd_ndb->options));
if (!schema_share || thd_ndb->options & TNO_NO_LOG_SCHEMA_OP)
{
DBUG_RETURN(0);
}
char tmp_buf2[FN_REFLEN];
const char *type_str;
switch (type)
{
case SOT_DROP_TABLE:
/* drop database command, do not log at drop table */
if (thd->lex->sql_command == SQLCOM_DROP_DB)
DBUG_RETURN(0);
/* redo the drop table query as is may contain several tables */
query= tmp_buf2;
query_length= (uint) (strxmov(tmp_buf2, "drop table `",
table_name, "`", NullS) - tmp_buf2);
type_str= "drop table";
break;
case SOT_RENAME_TABLE:
/* redo the rename table query as is may contain several tables */
query= tmp_buf2;
query_length= (uint) (strxmov(tmp_buf2, "rename table `",
db, ".", table_name, "` to `",
new_db, ".", new_table_name, "`", NullS) - tmp_buf2);
type_str= "rename table";
break;
case SOT_CREATE_TABLE:
type_str= "create table";
break;
case SOT_ALTER_TABLE:
type_str= "create table";
break;
case SOT_DROP_DB:
type_str= "drop db";
break;
case SOT_CREATE_DB:
type_str= "create db";
break;
case SOT_ALTER_DB:
type_str= "alter db";
break;
case SOT_TABLESPACE:
type_str= "tablespace";
break;
case SOT_LOGFILE_GROUP:
type_str= "logfile group";
break;
default:
abort(); /* should not happen, programming error */
}
NDB_SCHEMA_OBJECT *ndb_schema_object;
{
char key[FN_REFLEN];
build_table_filename(key, sizeof(key), db, table_name, "");
ndb_schema_object= ndb_get_schema_object(key, TRUE, FALSE);
}
const NdbError *ndb_error= 0;
uint32 node_id= g_ndb_cluster_connection->node_id();
Uint64 epoch= 0;
MY_BITMAP schema_subscribers;
uint32 bitbuf[sizeof(ndb_schema_object->slock)/4];
uint32 bitbuf_e[sizeof(bitbuf)];
bzero((char *)bitbuf_e, sizeof(bitbuf_e));
{
int i, updated= 0;
int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
bitmap_init(&schema_subscribers, bitbuf, sizeof(bitbuf)*8, false);
bitmap_set_all(&schema_subscribers);
(void) pthread_mutex_lock(&schema_share->mutex);
for (i= 0; i < no_storage_nodes; i++)
{
MY_BITMAP *table_subscribers= &schema_share->subscriber_bitmap[i];
if (!bitmap_is_clear_all(table_subscribers))
{
bitmap_intersect(&schema_subscribers,
table_subscribers);
updated= 1;
}
}
(void) pthread_mutex_unlock(&schema_share->mutex);
if (updated)
bitmap_clear_bit(&schema_subscribers, node_id);
else
bitmap_clear_all(&schema_subscribers);
if (ndb_schema_object)
{
(void) pthread_mutex_lock(&ndb_schema_object->mutex);
memcpy(ndb_schema_object->slock, schema_subscribers.bitmap,
sizeof(ndb_schema_object->slock));
(void) pthread_mutex_unlock(&ndb_schema_object->mutex);
}
DBUG_DUMP("schema_subscribers", (char*)schema_subscribers.bitmap,
no_bytes_in_map(&schema_subscribers));
DBUG_PRINT("info", ("bitmap_is_clear_all(&schema_subscribers): %d",
bitmap_is_clear_all(&schema_subscribers)));
}
Ndb *ndb= thd_ndb->ndb;
char save_db[FN_REFLEN];
strcpy(save_db, ndb->getDatabaseName());
char tmp_buf[FN_REFLEN];
NDBDICT *dict= ndb->getDictionary();
ndb->setDatabaseName(NDB_REP_DB);
Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
const NDBTAB *ndbtab= ndbtab_g.get_table();
NdbTransaction *trans= 0;
int retries= 100;
const NDBCOL *col[SCHEMA_SIZE];
unsigned sz[SCHEMA_SIZE];
if (ndbtab == 0)
{
if (strcmp(NDB_REP_DB, db) != 0 ||
strcmp(NDB_SCHEMA_TABLE, table_name))
{
ndb_error= &dict->getNdbError();
}
goto end;
}
{
uint i;
for (i= 0; i < SCHEMA_SIZE; i++)
{
col[i]= ndbtab->getColumn(i);
if (i != SCHEMA_QUERY_I)
{
sz[i]= col[i]->getLength();
DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
}
}
}
while (1)
{
const char *log_db= db;
const char *log_tab= table_name;
const char *log_subscribers= (char*)schema_subscribers.bitmap;
uint32 log_type= (uint32)type;
if ((trans= ndb->startTransaction()) == 0)
goto err;
while (1)
{
NdbOperation *op= 0;
int r= 0;
r|= (op= trans->getNdbOperation(ndbtab)) == 0;
DBUG_ASSERT(r == 0);
r|= op->writeTuple();
DBUG_ASSERT(r == 0);
/* db */
ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, log_db, strlen(log_db));
r|= op->equal(SCHEMA_DB_I, tmp_buf);
DBUG_ASSERT(r == 0);
/* name */
ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, log_tab,
strlen(log_tab));
r|= op->equal(SCHEMA_NAME_I, tmp_buf);
DBUG_ASSERT(r == 0);
/* slock */
DBUG_ASSERT(sz[SCHEMA_SLOCK_I] == sizeof(bitbuf));
r|= op->setValue(SCHEMA_SLOCK_I, log_subscribers);
DBUG_ASSERT(r == 0);
/* query */
{
NdbBlob *ndb_blob= op->getBlobHandle(SCHEMA_QUERY_I);
DBUG_ASSERT(ndb_blob != 0);
uint blob_len= query_length;
const char* blob_ptr= query;
r|= ndb_blob->setValue(blob_ptr, blob_len);
DBUG_ASSERT(r == 0);
}
/* node_id */
r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
DBUG_ASSERT(r == 0);
/* epoch */
r|= op->setValue(SCHEMA_EPOCH_I, epoch);
DBUG_ASSERT(r == 0);
/* id */
r|= op->setValue(SCHEMA_ID_I, ndb_table_id);
DBUG_ASSERT(r == 0);
/* version */
r|= op->setValue(SCHEMA_VERSION_I, ndb_table_version);
DBUG_ASSERT(r == 0);
/* type */
r|= op->setValue(SCHEMA_TYPE_I, log_type);
DBUG_ASSERT(r == 0);
if (log_db != new_db && new_db && new_table_name)
{
log_db= new_db;
log_tab= new_table_name;
log_subscribers= (const char *)bitbuf_e; // no ack expected on this
log_type= (uint32)SOT_RENAME_TABLE_NEW;
continue;
}
break;
}
if (trans->execute(NdbTransaction::Commit) == 0)
{
dict->forceGCPWait();
DBUG_PRINT("info", ("logged: %s", query));
break;
}
err:
const NdbError *this_error= trans ?
&trans->getNdbError() : &ndb->getNdbError();
if (this_error->status == NdbError::TemporaryError)
{
if (retries--)
{
if (trans)
ndb->closeTransaction(trans);
continue; // retry
}
}
ndb_error= this_error;
break;
}
end:
if (ndb_error)
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
ndb_error->code,
ndb_error->message,
"Could not log query '%s' on other mysqld's");
if (trans)
ndb->closeTransaction(trans);
ndb->setDatabaseName(save_db);
/*
Wait for other mysqld's to acknowledge the table operation
*/
if (ndb_error == 0 &&
!bitmap_is_clear_all(&schema_subscribers))
{
int max_timeout= opt_ndb_sync_timeout;
(void) pthread_mutex_lock(&ndb_schema_object->mutex);
if (have_lock_open)
{
safe_mutex_assert_owner(&LOCK_open);
(void) pthread_mutex_unlock(&LOCK_open);
}
while (1)
{
struct timespec abstime;
int i;
int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
set_timespec(abstime, 1);
int ret= pthread_cond_timedwait(&injector_cond,
&ndb_schema_object->mutex,
&abstime);
(void) pthread_mutex_lock(&schema_share->mutex);
for (i= 0; i < no_storage_nodes; i++)
{
/* remove any unsubscribed from schema_subscribers */
MY_BITMAP *tmp= &schema_share->subscriber_bitmap[i];
if (!bitmap_is_clear_all(tmp))
bitmap_intersect(&schema_subscribers, tmp);
}
(void) pthread_mutex_unlock(&schema_share->mutex);
/* remove any unsubscribed from ndb_schema_object->slock */
bitmap_intersect(&ndb_schema_object->slock_bitmap, &schema_subscribers);
DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
(char*)ndb_schema_object->slock_bitmap.bitmap,
no_bytes_in_map(&ndb_schema_object->slock_bitmap));
if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
break;
if (ret)
{
max_timeout--;
if (max_timeout == 0)
{
sql_print_error("NDB %s: distributing %s timed out. Ignoring...",
type_str, ndb_schema_object->key);
break;
}
if (ndb_extra_logging)
ndb_report_waiting(type_str, max_timeout,
"distributing", ndb_schema_object->key);
}
}
if (have_lock_open)
{
(void) pthread_mutex_lock(&LOCK_open);
}
(void) pthread_mutex_unlock(&ndb_schema_object->mutex);
}
if (ndb_schema_object)
ndb_free_schema_object(&ndb_schema_object, FALSE);
DBUG_RETURN(0);
}
/* /*
acknowledge handling of schema operation acknowledge handling of schema operation
*/ */
@ -1466,7 +1108,7 @@ ndbcluster_update_slock(THD *thd,
} }
end: end:
if (ndb_error) if (ndb_error)
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG), ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
ndb_error->code, ndb_error->code,
ndb_error->message, ndb_error->message,
@ -1478,6 +1120,383 @@ end:
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/*
log query in schema table
*/
static void ndb_report_waiting(const char *key,
int the_time,
const char *op,
const char *obj)
{
ulonglong ndb_latest_epoch= 0;
const char *proc_info= "<no info>";
pthread_mutex_lock(&injector_mutex);
if (injector_ndb)
ndb_latest_epoch= injector_ndb->getLatestGCI();
if (injector_thd)
proc_info= injector_thd->proc_info;
pthread_mutex_unlock(&injector_mutex);
sql_print_information("NDB %s:"
" waiting max %u sec for %s %s."
" epochs: (%u,%u,%u)"
" injector proc_info: %s"
,key, the_time, op, obj
,(uint)ndb_latest_handled_binlog_epoch
,(uint)ndb_latest_received_binlog_epoch
,(uint)ndb_latest_epoch
,proc_info
);
}
int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
const char *query, int query_length,
const char *db, const char *table_name,
uint32 ndb_table_id,
uint32 ndb_table_version,
enum SCHEMA_OP_TYPE type,
const char *new_db, const char *new_table_name,
int have_lock_open)
{
DBUG_ENTER("ndbcluster_log_schema_op");
Thd_ndb *thd_ndb= get_thd_ndb(thd);
if (!thd_ndb)
{
if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
{
sql_print_error("Could not allocate Thd_ndb object");
DBUG_RETURN(1);
}
set_thd_ndb(thd, thd_ndb);
}
DBUG_PRINT("enter",
("query: %s db: %s table_name: %s thd_ndb->options: %d",
query, db, table_name, thd_ndb->options));
if (!schema_share || thd_ndb->options & TNO_NO_LOG_SCHEMA_OP)
{
DBUG_RETURN(0);
}
char tmp_buf2[FN_REFLEN];
const char *type_str;
switch (type)
{
case SOT_DROP_TABLE:
/* drop database command, do not log at drop table */
if (thd->lex->sql_command == SQLCOM_DROP_DB)
DBUG_RETURN(0);
/* redo the drop table query as is may contain several tables */
query= tmp_buf2;
query_length= (uint) (strxmov(tmp_buf2, "drop table `",
table_name, "`", NullS) - tmp_buf2);
type_str= "drop table";
break;
case SOT_RENAME_TABLE:
/* redo the rename table query as is may contain several tables */
query= tmp_buf2;
query_length= (uint) (strxmov(tmp_buf2, "rename table `",
db, ".", table_name, "` to `",
new_db, ".", new_table_name, "`", NullS) - tmp_buf2);
type_str= "rename table";
break;
case SOT_CREATE_TABLE:
type_str= "create table";
break;
case SOT_ALTER_TABLE:
type_str= "create table";
break;
case SOT_DROP_DB:
type_str= "drop db";
break;
case SOT_CREATE_DB:
type_str= "create db";
break;
case SOT_ALTER_DB:
type_str= "alter db";
break;
case SOT_TABLESPACE:
type_str= "tablespace";
break;
case SOT_LOGFILE_GROUP:
type_str= "logfile group";
break;
default:
abort(); /* should not happen, programming error */
}
NDB_SCHEMA_OBJECT *ndb_schema_object;
{
char key[FN_REFLEN];
build_table_filename(key, sizeof(key), db, table_name, "");
ndb_schema_object= ndb_get_schema_object(key, TRUE, FALSE);
}
const NdbError *ndb_error= 0;
uint32 node_id= g_ndb_cluster_connection->node_id();
Uint64 epoch= 0;
MY_BITMAP schema_subscribers;
uint32 bitbuf[sizeof(ndb_schema_object->slock)/4];
char bitbuf_e[sizeof(bitbuf)];
bzero(bitbuf_e, sizeof(bitbuf_e));
{
int i, updated= 0;
int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
bitmap_init(&schema_subscribers, bitbuf, sizeof(bitbuf)*8, false);
bitmap_set_all(&schema_subscribers);
(void) pthread_mutex_lock(&schema_share->mutex);
for (i= 0; i < no_storage_nodes; i++)
{
MY_BITMAP *table_subscribers= &schema_share->subscriber_bitmap[i];
if (!bitmap_is_clear_all(table_subscribers))
{
bitmap_intersect(&schema_subscribers,
table_subscribers);
updated= 1;
}
}
(void) pthread_mutex_unlock(&schema_share->mutex);
if (updated)
{
bitmap_clear_bit(&schema_subscribers, node_id);
/*
if setting own acknowledge bit it is important that
no other mysqld's are registred, as subsequent code
will cause the original event to be hidden (by blob
merge event code)
*/
if (bitmap_is_clear_all(&schema_subscribers))
bitmap_set_bit(&schema_subscribers, node_id);
}
else
bitmap_clear_all(&schema_subscribers);
if (ndb_schema_object)
{
(void) pthread_mutex_lock(&ndb_schema_object->mutex);
memcpy(ndb_schema_object->slock, schema_subscribers.bitmap,
sizeof(ndb_schema_object->slock));
(void) pthread_mutex_unlock(&ndb_schema_object->mutex);
}
DBUG_DUMP("schema_subscribers", (char*)schema_subscribers.bitmap,
no_bytes_in_map(&schema_subscribers));
DBUG_PRINT("info", ("bitmap_is_clear_all(&schema_subscribers): %d",
bitmap_is_clear_all(&schema_subscribers)));
}
Ndb *ndb= thd_ndb->ndb;
char save_db[FN_REFLEN];
strcpy(save_db, ndb->getDatabaseName());
char tmp_buf[FN_REFLEN];
NDBDICT *dict= ndb->getDictionary();
ndb->setDatabaseName(NDB_REP_DB);
Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
const NDBTAB *ndbtab= ndbtab_g.get_table();
NdbTransaction *trans= 0;
int retries= 100;
const NDBCOL *col[SCHEMA_SIZE];
unsigned sz[SCHEMA_SIZE];
if (ndbtab == 0)
{
if (strcmp(NDB_REP_DB, db) != 0 ||
strcmp(NDB_SCHEMA_TABLE, table_name))
{
ndb_error= &dict->getNdbError();
}
goto end;
}
{
uint i;
for (i= 0; i < SCHEMA_SIZE; i++)
{
col[i]= ndbtab->getColumn(i);
if (i != SCHEMA_QUERY_I)
{
sz[i]= col[i]->getLength();
DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
}
}
}
while (1)
{
const char *log_db= db;
const char *log_tab= table_name;
const char *log_subscribers= (char*)schema_subscribers.bitmap;
uint32 log_type= (uint32)type;
if ((trans= ndb->startTransaction()) == 0)
goto err;
while (1)
{
NdbOperation *op= 0;
int r= 0;
r|= (op= trans->getNdbOperation(ndbtab)) == 0;
DBUG_ASSERT(r == 0);
r|= op->writeTuple();
DBUG_ASSERT(r == 0);
/* db */
ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, log_db, strlen(log_db));
r|= op->equal(SCHEMA_DB_I, tmp_buf);
DBUG_ASSERT(r == 0);
/* name */
ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, log_tab,
strlen(log_tab));
r|= op->equal(SCHEMA_NAME_I, tmp_buf);
DBUG_ASSERT(r == 0);
/* slock */
DBUG_ASSERT(sz[SCHEMA_SLOCK_I] == sizeof(bitbuf));
r|= op->setValue(SCHEMA_SLOCK_I, log_subscribers);
DBUG_ASSERT(r == 0);
/* query */
{
NdbBlob *ndb_blob= op->getBlobHandle(SCHEMA_QUERY_I);
DBUG_ASSERT(ndb_blob != 0);
uint blob_len= query_length;
const char* blob_ptr= query;
r|= ndb_blob->setValue(blob_ptr, blob_len);
DBUG_ASSERT(r == 0);
}
/* node_id */
r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
DBUG_ASSERT(r == 0);
/* epoch */
r|= op->setValue(SCHEMA_EPOCH_I, epoch);
DBUG_ASSERT(r == 0);
/* id */
r|= op->setValue(SCHEMA_ID_I, ndb_table_id);
DBUG_ASSERT(r == 0);
/* version */
r|= op->setValue(SCHEMA_VERSION_I, ndb_table_version);
DBUG_ASSERT(r == 0);
/* type */
r|= op->setValue(SCHEMA_TYPE_I, log_type);
DBUG_ASSERT(r == 0);
if (log_db != new_db && new_db && new_table_name)
{
log_db= new_db;
log_tab= new_table_name;
log_subscribers= bitbuf_e; // no ack expected on this
log_type= (uint32)SOT_RENAME_TABLE_NEW;
continue;
}
break;
}
if (trans->execute(NdbTransaction::Commit) == 0)
{
DBUG_PRINT("info", ("logged: %s", query));
break;
}
err:
const NdbError *this_error= trans ?
&trans->getNdbError() : &ndb->getNdbError();
if (this_error->status == NdbError::TemporaryError)
{
if (retries--)
{
if (trans)
ndb->closeTransaction(trans);
continue; // retry
}
}
ndb_error= this_error;
break;
}
end:
if (ndb_error)
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
ndb_error->code,
ndb_error->message,
"Could not log query '%s' on other mysqld's");
if (trans)
ndb->closeTransaction(trans);
ndb->setDatabaseName(save_db);
/*
Wait for other mysqld's to acknowledge the table operation
*/
if (ndb_error == 0 &&
!bitmap_is_clear_all(&schema_subscribers))
{
/*
if own nodeid is set we are a single mysqld registred
as an optimization we update the slock directly
*/
if (bitmap_is_set(&schema_subscribers, node_id))
ndbcluster_update_slock(thd, db, table_name);
else
dict->forceGCPWait();
int max_timeout= opt_ndb_sync_timeout;
(void) pthread_mutex_lock(&ndb_schema_object->mutex);
if (have_lock_open)
{
safe_mutex_assert_owner(&LOCK_open);
(void) pthread_mutex_unlock(&LOCK_open);
}
while (1)
{
struct timespec abstime;
int i;
int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
set_timespec(abstime, 1);
int ret= pthread_cond_timedwait(&injector_cond,
&ndb_schema_object->mutex,
&abstime);
if (thd->killed)
break;
(void) pthread_mutex_lock(&schema_share->mutex);
for (i= 0; i < no_storage_nodes; i++)
{
/* remove any unsubscribed from schema_subscribers */
MY_BITMAP *tmp= &schema_share->subscriber_bitmap[i];
if (!bitmap_is_clear_all(tmp))
bitmap_intersect(&schema_subscribers, tmp);
}
(void) pthread_mutex_unlock(&schema_share->mutex);
/* remove any unsubscribed from ndb_schema_object->slock */
bitmap_intersect(&ndb_schema_object->slock_bitmap, &schema_subscribers);
DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
(char*)ndb_schema_object->slock_bitmap.bitmap,
no_bytes_in_map(&ndb_schema_object->slock_bitmap));
if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
break;
if (ret)
{
max_timeout--;
if (max_timeout == 0)
{
sql_print_error("NDB %s: distributing %s timed out. Ignoring...",
type_str, ndb_schema_object->key);
break;
}
if (ndb_extra_logging)
ndb_report_waiting(type_str, max_timeout,
"distributing", ndb_schema_object->key);
}
}
if (have_lock_open)
{
(void) pthread_mutex_lock(&LOCK_open);
}
(void) pthread_mutex_unlock(&ndb_schema_object->mutex);
}
if (ndb_schema_object)
ndb_free_schema_object(&ndb_schema_object, FALSE);
DBUG_RETURN(0);
}
/* /*
Handle _non_ data events from the storage nodes Handle _non_ data events from the storage nodes
*/ */
@ -1701,16 +1720,36 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, false); bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, false);
uint node_id= g_ndb_cluster_connection->node_id(); uint node_id= g_ndb_cluster_connection->node_id();
ndbcluster_get_schema(tmp_share, schema); ndbcluster_get_schema(tmp_share, schema);
DBUG_PRINT("info",
("%s.%s: log query_length: %d query: '%s' type: %d",
schema->db, schema->name,
schema->query_length, schema->query,
schema->type));
char key[FN_REFLEN];
build_table_filename(key, sizeof(key), schema->db, schema->name, "");
if ((enum SCHEMA_OP_TYPE)schema->type == SOT_CLEAR_SLOCK)
{
pthread_mutex_lock(&ndbcluster_mutex);
NDB_SCHEMA_OBJECT *ndb_schema_object=
(NDB_SCHEMA_OBJECT*) hash_search(&ndb_schema_objects,
(byte*) key, strlen(key));
if (ndb_schema_object)
{
pthread_mutex_lock(&ndb_schema_object->mutex);
memcpy(ndb_schema_object->slock, schema->slock,
sizeof(ndb_schema_object->slock));
DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
(char*)ndb_schema_object->slock_bitmap.bitmap,
no_bytes_in_map(&ndb_schema_object->slock_bitmap));
pthread_mutex_unlock(&ndb_schema_object->mutex);
pthread_cond_signal(&injector_cond);
}
pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(0);
}
if (schema->node_id != node_id) if (schema->node_id != node_id)
{ {
int log_query= 0, post_epoch_unlock= 0; int log_query= 0, post_epoch_unlock= 0;
DBUG_PRINT("info",
("%s.%s: log query_length: %d query: '%s' type: %d",
schema->db, schema->name,
schema->query_length, schema->query,
schema->type));
char key[FN_REFLEN];
build_table_filename(key, sizeof(key), schema->db, schema->name, "");
switch ((enum SCHEMA_OP_TYPE)schema->type) switch ((enum SCHEMA_OP_TYPE)schema->type)
{ {
case SOT_DROP_TABLE: case SOT_DROP_TABLE:
@ -1759,30 +1798,12 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
TRUE, /* print error */ TRUE, /* print error */
FALSE); /* binlog the query */ FALSE); /* binlog the query */
break; break;
case SOT_CLEAR_SLOCK:
{
pthread_mutex_lock(&ndbcluster_mutex);
NDB_SCHEMA_OBJECT *ndb_schema_object=
(NDB_SCHEMA_OBJECT*) hash_search(&ndb_schema_objects,
(byte*) key, strlen(key));
if (ndb_schema_object)
{
pthread_mutex_lock(&ndb_schema_object->mutex);
memcpy(ndb_schema_object->slock, schema->slock,
sizeof(ndb_schema_object->slock));
DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
(char*)ndb_schema_object->slock_bitmap.bitmap,
no_bytes_in_map(&ndb_schema_object->slock_bitmap));
pthread_mutex_unlock(&ndb_schema_object->mutex);
pthread_cond_signal(&injector_cond);
}
pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(0);
}
case SOT_TABLESPACE: case SOT_TABLESPACE:
case SOT_LOGFILE_GROUP: case SOT_LOGFILE_GROUP:
log_query= 1; log_query= 1;
break; break;
case SOT_CLEAR_SLOCK:
abort();
} }
if (log_query && ndb_binlog_running) if (log_query && ndb_binlog_running)
{ {
@ -2349,6 +2370,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
const char *event_name, NDB_SHARE *share, const char *event_name, NDB_SHARE *share,
int push_warning) int push_warning)
{ {
THD *thd= current_thd;
DBUG_ENTER("ndbcluster_create_event"); DBUG_ENTER("ndbcluster_create_event");
DBUG_PRINT("info", ("table=%s version=%d event=%s share=%s", DBUG_PRINT("info", ("table=%s version=%d event=%s share=%s",
ndbtab->getName(), ndbtab->getObjectVersion(), ndbtab->getName(), ndbtab->getObjectVersion(),
@ -2378,7 +2400,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
"with BLOB attribute and no PK is not supported", "with BLOB attribute and no PK is not supported",
share->key); share->key);
if (push_warning) if (push_warning)
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_ILLEGAL_HA_CREATE_OPTION, ER_ILLEGAL_HA_CREATE_OPTION,
ER(ER_ILLEGAL_HA_CREATE_OPTION), ER(ER_ILLEGAL_HA_CREATE_OPTION),
ndbcluster_hton.name, ndbcluster_hton.name,
@ -2422,7 +2444,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
failed, print a warning failed, print a warning
*/ */
if (push_warning) if (push_warning)
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG), ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
dict->getNdbError().code, dict->getNdbError().code,
dict->getNdbError().message, "NDB"); dict->getNdbError().message, "NDB");
@ -2450,7 +2472,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
dict->dropEvent(my_event.getName())) dict->dropEvent(my_event.getName()))
{ {
if (push_warning) if (push_warning)
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG), ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
dict->getNdbError().code, dict->getNdbError().code,
dict->getNdbError().message, "NDB"); dict->getNdbError().message, "NDB");
@ -2469,7 +2491,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
if (dict->createEvent(my_event)) if (dict->createEvent(my_event))
{ {
if (push_warning) if (push_warning)
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG), ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
dict->getNdbError().code, dict->getNdbError().code,
dict->getNdbError().message, "NDB"); dict->getNdbError().message, "NDB");
@ -2482,7 +2504,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
#ifdef NDB_BINLOG_EXTRA_WARNINGS #ifdef NDB_BINLOG_EXTRA_WARNINGS
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG), ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
0, "NDB Binlog: Removed trailing event", 0, "NDB Binlog: Removed trailing event",
"NDB"); "NDB");
@ -2511,6 +2533,7 @@ int
ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
const char *event_name) const char *event_name)
{ {
THD *thd= current_thd;
/* /*
we are in either create table or rename table so table should be we are in either create table or rename table so table should be
locked, hence we can work with the share without locks locked, hence we can work with the share without locks
@ -2584,7 +2607,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
{ {
sql_print_error("NDB Binlog: Creating NdbEventOperation failed for" sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
" %s",event_name); " %s",event_name);
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG), ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
ndb->getNdbError().code, ndb->getNdbError().code,
ndb->getNdbError().message, ndb->getNdbError().message,
@ -2634,7 +2657,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
sql_print_error("NDB Binlog: Creating NdbEventOperation" sql_print_error("NDB Binlog: Creating NdbEventOperation"
" blob field %u handles failed (code=%d) for %s", " blob field %u handles failed (code=%d) for %s",
j, op->getNdbError().code, event_name); j, op->getNdbError().code, event_name);
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG), ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
op->getNdbError().code, op->getNdbError().code,
op->getNdbError().message, op->getNdbError().message,
@ -2671,7 +2694,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
retries= 0; retries= 0;
if (retries == 0) if (retries == 0)
{ {
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG), ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
op->getNdbError().code, op->getNdbError().message, op->getNdbError().code, op->getNdbError().message,
"NDB"); "NDB");
@ -2727,7 +2750,7 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
if (dict->getNdbError().code != 4710) if (dict->getNdbError().code != 4710)
{ {
/* drop event failed for some reason, issue a warning */ /* drop event failed for some reason, issue a warning */
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG), ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
dict->getNdbError().code, dict->getNdbError().code,
dict->getNdbError().message, "NDB"); dict->getNdbError().message, "NDB");
@ -2780,7 +2803,8 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
int ret= pthread_cond_timedwait(&injector_cond, int ret= pthread_cond_timedwait(&injector_cond,
&share->mutex, &share->mutex,
&abstime); &abstime);
if (share->op == 0) if (thd->killed ||
share->op == 0)
break; break;
if (ret) if (ret)
{ {