Merge bk-internal:/home/bk/mysql-5.1-new
into neptunus.(none):/home/msvensson/mysql/mysql-5.1
mysql-test/include/show_binlog_events.inc
Normal file
4
mysql-test/include/show_binlog_events.inc
Normal file
@@ -0,0 +1,4 @@
+--let $binlog_start=102
+--replace_result $binlog_start <binlog_start>
+--replace_column 2 # 4 # 5 #
+--eval show binlog events from $binlog_start
@@ -1,7 +1,7 @@
 drop database if exists mysqltest;
-drop table if exists t1,t2;
+drop table if exists t1,t2,t3;
 drop database if exists mysqltest;
-drop table if exists t1,t2;
+drop table if exists t1,t2,t3;
 reset master;
 reset master;
 create database mysqltest;
@@ -123,3 +123,68 @@ master-bin1.000001 # Table_map # # cluster.apply_status
 master-bin1.000001 # Write_rows # #
 master-bin1.000001 # Query # # COMMIT
 master-bin1.000001 # Query # # use `test`; drop table `t1`
+reset master;
+show tables;
+Tables_in_test
+reset master;
+show tables;
+Tables_in_test
+create table t1 (a int key) engine=ndb;
+create table t2 (a int key) engine=ndb;
+create table t3 (a int key) engine=ndb;
+rename table t3 to t4, t2 to t3, t1 to t2, t4 to t1;
+show binlog events from <binlog_start>;
+Log_name Pos Event_type Server_id End_log_pos Info
+master-bin1.000001 # Query # # use `test`; create table t1 (a int key) engine=ndb
+master-bin1.000001 # Query # # use `test`; create table t2 (a int key) engine=ndb
+master-bin1.000001 # Query # # use `test`; create table t3 (a int key) engine=ndb
+master-bin1.000001 # Query # # BEGIN
+master-bin1.000001 # Table_map # # cluster.apply_status
+master-bin1.000001 # Write_rows # #
+master-bin1.000001 # Query # # COMMIT
+master-bin1.000001 # Query # # use `test`; rename table `test.t3` to `test.t4`
+master-bin1.000001 # Query # # BEGIN
+master-bin1.000001 # Table_map # # cluster.apply_status
+master-bin1.000001 # Write_rows # #
+master-bin1.000001 # Query # # COMMIT
+master-bin1.000001 # Query # # use `test`; rename table `test.t2` to `test.t3`
+master-bin1.000001 # Query # # BEGIN
+master-bin1.000001 # Table_map # # cluster.apply_status
+master-bin1.000001 # Write_rows # #
+master-bin1.000001 # Query # # COMMIT
+master-bin1.000001 # Query # # use `test`; rename table `test.t1` to `test.t2`
+master-bin1.000001 # Query # # BEGIN
+master-bin1.000001 # Table_map # # cluster.apply_status
+master-bin1.000001 # Write_rows # #
+master-bin1.000001 # Query # # COMMIT
+master-bin1.000001 # Query # # use `test`; rename table `test.t4` to `test.t1`
+drop table t1;
+drop table t2;
+drop table t3;
+reset master;
+show tables;
+Tables_in_test
+reset master;
+show tables;
+Tables_in_test
+create table t1 (a int key) engine=ndb;
+insert into t1 values(1);
+rename table t1 to t2;
+insert into t2 values(2);
+show binlog events from <binlog_start>;
+Log_name Pos Event_type Server_id End_log_pos Info
+master-bin1.000001 # Query # # use `test`; create table t1 (a int key) engine=ndb
+master-bin1.000001 # Query # # BEGIN
+master-bin1.000001 # Table_map # # cluster.apply_status
+master-bin1.000001 # Write_rows # #
+master-bin1.000001 # Table_map # # test.t1
+master-bin1.000001 # Write_rows # #
+master-bin1.000001 # Query # # COMMIT
+master-bin1.000001 # Query # # use `test`; rename table `test.t1` to `test.t2`
+master-bin1.000001 # Query # # BEGIN
+master-bin1.000001 # Table_map # # cluster.apply_status
+master-bin1.000001 # Write_rows # #
+master-bin1.000001 # Table_map # # test.t2
+master-bin1.000001 # Write_rows # #
+master-bin1.000001 # Query # # COMMIT
+drop table t2;
@@ -5,18 +5,16 @@
 --disable_warnings
 connection server2;
 drop database if exists mysqltest;
-drop table if exists t1,t2;
+drop table if exists t1,t2,t3;
 connection server1;
 drop database if exists mysqltest;
-drop table if exists t1,t2;
+drop table if exists t1,t2,t3;
 --connection server1
 reset master;
 --connection server2
 reset master;
 --enable_warnings
 
---let $binlog_start=102
-
 #
 # basic test to see if ddl distribution works across
 # multiple binlogs
@@ -33,15 +31,10 @@ create table t1 (a int primary key) engine=ndb;
 
 --connection server2
 create table t2 (a int primary key) engine=ndb;
---replace_result $binlog_start <binlog_start>
---replace_column 2 # 4 # 5 #
---eval show binlog events from $binlog_start
+--source include/show_binlog_events.inc
 
 --connection server1
---replace_result $binlog_start <binlog_start>
---replace_column 2 # 4 # 5 #
---eval show binlog events from $binlog_start
-
+-- source include/show_binlog_events.inc
 
 # alter table
 --connection server1
@@ -53,9 +46,7 @@ reset master;
 alter table t2 add column (b int);
 
 --connections server1
---replace_result $binlog_start <binlog_start>
---replace_column 2 # 4 # 5 #
---eval show binlog events from $binlog_start
+--source include/show_binlog_events.inc
 
 
 # alter database
@@ -91,9 +82,7 @@ drop database mysqltest;
 create table t1 (a int primary key) engine=ndb;
 
 --connection server2
---replace_result $binlog_start <binlog_start>
---replace_column 2 # 4 # 5 #
---eval show binlog events from $binlog_start
+--source include/show_binlog_events.inc
 
 --connection server2
 drop table t2;
@@ -144,6 +133,51 @@ ENGINE =NDB;
 drop table t1;
 
 --connection server2
---replace_result $binlog_start <binlog_start>
---replace_column 2 # 4 # 5 #
---eval show binlog events from $binlog_start
+--source include/show_binlog_events.inc
+#
+# Bug #17827 cluster: rename of several tables in one statement,
+# gets multiply logged
+#
+--connection server1
+reset master;
+show tables;
+--connection server2
+reset master;
+show tables;
+
+--connection server1
+create table t1 (a int key) engine=ndb;
+create table t2 (a int key) engine=ndb;
+create table t3 (a int key) engine=ndb;
+rename table t3 to t4, t2 to t3, t1 to t2, t4 to t1;
+--connection server2
+--source include/show_binlog_events.inc
+
+drop table t1;
+drop table t2;
+drop table t3;
+
+
+#
+# Bug #17838 binlog not setup on seconday master after rename
+#
+#
+--connection server1
+reset master;
+show tables;
+--connection server2
+reset master;
+show tables;
+
+--connection server1
+create table t1 (a int key) engine=ndb;
+insert into t1 values(1);
+rename table t1 to t2;
+insert into t2 values(2);
+
+# now we should see data in table t1 _and_ t2
+# prior to bug fix, data was missing for t2
+--connection server2
+--source include/show_binlog_events.inc
+
+drop table t2;
@@ -1367,15 +1367,17 @@ int ha_ndbcluster::drop_indexes(Ndb *ndb, TABLE *tab)
 */
 NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_table(uint inx) const
 {
-  return get_index_type_from_key(inx, table_share->key_info);
+  return get_index_type_from_key(inx, table_share->key_info,
+                                 inx == table_share->primary_key);
 }
 
 NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_key(uint inx,
-                                                      KEY *key_info) const
+                                                      KEY *key_info,
+                                                      bool primary) const
 {
   bool is_hash_index= (key_info[inx].algorithm ==
                        HA_KEY_ALG_HASH);
-  if (inx == table_share->primary_key)
+  if (primary)
     return is_hash_index ? PRIMARY_KEY_INDEX : PRIMARY_KEY_ORDERED_INDEX;
 
   return ((key_info[inx].flags & HA_NOSAME) ?
@@ -4644,7 +4646,7 @@ int ha_ndbcluster::add_index(TABLE *table_arg,
     KEY *key= key_info + idx;
     KEY_PART_INFO *key_part= key->key_part;
     KEY_PART_INFO *end= key_part + key->key_parts;
-    NDB_INDEX_TYPE idx_type= get_index_type_from_key(idx, key);
+    NDB_INDEX_TYPE idx_type= get_index_type_from_key(idx, key, false);
     DBUG_PRINT("info", ("Adding index: '%s'", key_info[idx].name));
     // Add fields to key_part struct
     for (; key_part != end; key_part++)
@@ -719,7 +719,8 @@ private:
   void release_metadata();
   NDB_INDEX_TYPE get_index_type(uint idx_no) const;
   NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const;
-  NDB_INDEX_TYPE get_index_type_from_key(uint index_no, KEY *key_info) const;
+  NDB_INDEX_TYPE get_index_type_from_key(uint index_no, KEY *key_info,
+                                         bool primary) const;
   int check_index_fields_not_null(uint index_no);
 
   uint set_up_partition_info(partition_info *part_info,
@@ -230,12 +230,25 @@ static void run_query(THD *thd, char *buf, char *end,
   }
 }
 
-int
+static void
+ndbcluster_binlog_close_table(THD *thd, NDB_SHARE *share)
+{
+  DBUG_ENTER("ndbcluster_binlog_close_table");
+  if (share->table_share)
+  {
+    free_table_share(share->table_share);
+    share->table_share= 0;
+    share->table= 0;
+  }
+  DBUG_ASSERT(share->table == 0);
+  DBUG_VOID_RETURN;
+}
+
+static int
 ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
                              TABLE_SHARE *table_share, TABLE *table)
 {
   int error;
-  MEM_ROOT *mem_root= &share->mem_root;
   DBUG_ENTER("ndbcluster_binlog_open_table");
 
   init_tmp_table_share(table_share, share->db, 0, share->table_name,
@@ -274,22 +287,13 @@ ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
   table->s->table_name.str= share->table_name;
   table->s->table_name.length= strlen(share->table_name);
 
+  DBUG_ASSERT(share->table_share == 0);
   share->table_share= table_share;
+  DBUG_ASSERT(share->table == 0);
   share->table= table;
 #ifndef DBUG_OFF
   dbug_print_table("table", table);
 #endif
-  /*
-    ! do not touch the contents of the table
-    it may be in use by the injector thread
-  */
-  share->ndb_value[0]= (NdbValue*)
-    alloc_root(mem_root, sizeof(NdbValue) *
-               (table->s->fields + 2 /*extra for hidden key and part key*/));
-  share->ndb_value[1]= (NdbValue*)
-    alloc_root(mem_root, sizeof(NdbValue) *
-               (table->s->fields + 2 /*extra for hidden key and part key*/));
-
   DBUG_RETURN(0);
 }
 
@@ -351,6 +355,18 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
     TABLE *table= (TABLE*) my_malloc(sizeof(*table), MYF(MY_WME));
     if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table)))
       break;
+    /*
+      ! do not touch the contents of the table
+      it may be in use by the injector thread
+    */
+    MEM_ROOT *mem_root= &share->mem_root;
+    share->ndb_value[0]= (NdbValue*)
+      alloc_root(mem_root, sizeof(NdbValue) *
+                 (table->s->fields + 2 /*extra for hidden key and part key*/));
+    share->ndb_value[1]= (NdbValue*)
+      alloc_root(mem_root, sizeof(NdbValue) *
+                 (table->s->fields + 2 /*extra for hidden key and part key*/));
+
     if (table->s->primary_key == MAX_KEY)
       share->flags|= NSF_HIDDEN_PK;
     if (table->s->blob_fields != 0)
@@ -1156,8 +1172,11 @@ end:
     (void) pthread_mutex_unlock(&share->mutex);
   }
 
-  if (get_a_share)
+  if (get_a_share && share)
+  {
     free_share(&share);
+    share= 0;
+  }
 
   DBUG_RETURN(0);
 }
@@ -1314,22 +1333,35 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
                          NDB_SHARE *share)
 {
   DBUG_ENTER("ndb_handle_schema_change");
-  int remote_drop_table= 0, do_close_cached_tables= 0;
-  const char *dbname= share->table->s->db.str;
-  const char *tabname= share->table->s->table_name.str;
-  bool online_alter_table= (pOp->getEventType() == NDBEVENT::TE_ALTER &&
-                            pOp->tableFrmChanged());
+  bool do_close_cached_tables= FALSE;
+  bool is_online_alter_table= FALSE;
+  bool is_rename_table= FALSE;
+  bool is_remote_change=
+    (uint) pOp->getReqNodeId() != g_ndb_cluster_connection->node_id();
 
-  if (pOp->getEventType() != NDBEVENT::TE_CLUSTER_FAILURE &&
-      (uint) pOp->getReqNodeId() != g_ndb_cluster_connection->node_id())
+  if (pOp->getEventType() == NDBEVENT::TE_ALTER)
+  {
+    if (pOp->tableFrmChanged())
+    {
+      is_online_alter_table= TRUE;
+    }
+    else
+    {
+      DBUG_ASSERT(pOp->tableNameChanged());
+      is_rename_table= TRUE;
+    }
+  }
+
+  if (is_remote_change) /* includes CLUSTER_FAILURE */
   {
-    TABLE_SHARE *table_share= share->table->s;
     TABLE* table= share->table;
+    TABLE_SHARE *table_share= table->s;
+    const char *dbname= table_share->db.str;
 
     /*
       Invalidate table and all it's indexes
     */
-    ndb->setDatabaseName(share->table->s->db.str);
+    ndb->setDatabaseName(dbname);
     Thd_ndb *thd_ndb= get_thd_ndb(thd);
     DBUG_ASSERT(thd_ndb != NULL);
     Ndb* old_ndb= thd_ndb->ndb;
@@ -1341,8 +1373,9 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
     table_handler.invalidate_dictionary_cache(TRUE);
     thd_ndb->ndb= old_ndb;
 
-    if (online_alter_table)
+    if (is_online_alter_table)
     {
+      const char *tabname= table_share->table_name.str;
       char key[FN_REFLEN];
       const void *data= 0, *pack_data= 0;
       uint length, pack_length;
@@ -1375,6 +1408,7 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
         sql_print_information("NDB: Failed write frm for %s.%s, error %d",
                               dbname, tabname, error);
       }
+      ndbcluster_binlog_close_table(thd, share);
       close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE);
       if ((error= ndbcluster_binlog_open_table(thd, share,
                                                table_share, table)))
@@ -1383,11 +1417,10 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
         pthread_mutex_unlock(&LOCK_open);
       }
     }
-    remote_drop_table= 1;
   }
 
   // If only frm was changed continue replicating
-  if (online_alter_table)
+  if (is_online_alter_table)
   {
     /* Signal ha_ndbcluster::alter_table that drop is done */
     (void) pthread_cond_signal(&injector_cond);
@@ -1395,6 +1428,22 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
   }
 
   (void) pthread_mutex_lock(&share->mutex);
+  if (is_rename_table && !is_remote_change)
+  {
+    DBUG_PRINT("info", ("Detected name change of table %s.%s",
+                        share->db, share->table_name));
+    /* ToDo: remove printout */
+    if (ndb_extra_logging)
+      sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.",
+                            share_prefix, share->table->s->db.str,
+                            share->table->s->table_name.str,
+                            share->key);
+    /* do the rename of the table in the share */
+    share->table->s->db.str= share->db;
+    share->table->s->db.length= strlen(share->db);
+    share->table->s->table_name.str= share->table_name;
+    share->table->s->table_name.length= strlen(share->table_name);
+  }
   DBUG_ASSERT(share->op == pOp || share->op_old == pOp);
   if (share->op_old == pOp)
     share->op_old= 0;
@@ -1408,11 +1457,11 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
 
   pthread_mutex_lock(&ndbcluster_mutex);
   free_share(&share, TRUE);
-  if (remote_drop_table && share && share->state != NSS_DROPPED)
+  if (is_remote_change && share && share->state != NSS_DROPPED)
   {
-    DBUG_PRINT("info", ("remote drop table"));
+    DBUG_PRINT("info", ("remote change"));
     if (share->use_count != 1)
-      do_close_cached_tables= 1;
+      do_close_cached_tables= TRUE;
     share->state= NSS_DROPPED;
    free_share(&share, TRUE);
   }
@@ -1464,24 +1513,45 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
       int log_query= 0;
       DBUG_PRINT("info", ("log query_length: %d query: '%s'",
                           schema->query_length, schema->query));
+      char key[FN_REFLEN];
+      build_table_filename(key, sizeof(key), schema->db, schema->name, "");
+      NDB_SHARE *share= get_share(key, 0, false, false);
+
      switch ((enum SCHEMA_OP_TYPE)schema->type)
      {
      case SOT_DROP_TABLE:
        /* binlog dropping table after any table operations */
-        if (ndb_binlog_running)
+        if (share && share->op)
          post_epoch_log_list->push_back(schema, mem_root);
        log_query= 0;
        break;
      case SOT_RENAME_TABLE:
-        /* fall through */
-      case SOT_ALTER_TABLE:
-        if (ndb_binlog_running)
+        if (share && share->op)
        {
-          log_query= 1;
+          log_query= 0;
+          post_epoch_log_list->push_back(schema, mem_root);
          break; /* discovery will be handled by binlog */
        }
-        /* fall through */
+        goto sot_create_table;
+      case SOT_ALTER_TABLE:
+        if (share && share->op)
+        {
+          log_query= 0;
+          post_epoch_log_list->push_back(schema, mem_root);
+          break; /* discovery will be handled by binlog */
+        }
+        goto sot_create_table;
      case SOT_CREATE_TABLE:
+      sot_create_table:
+        /*
+          we need to free any share here as command below
+          may need to call handle_trailing_share
+        */
+        if (share)
+        {
+          free_share(&share);
+          share= 0;
+        }
        pthread_mutex_lock(&LOCK_open);
        if (ndb_create_table_from_engine(thd, schema->db, schema->name))
        {
@@ -1514,10 +1584,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
        break;
      case SOT_CLEAR_SLOCK:
      {
-        char key[FN_REFLEN];
-        build_table_filename(key, sizeof(key),
-                             schema->db, schema->name, "");
-        NDB_SHARE *share= get_share(key, 0, false, false);
        if (share)
        {
          pthread_mutex_lock(&share->mutex);
@@ -1528,6 +1594,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
          pthread_mutex_unlock(&share->mutex);
          pthread_cond_signal(&injector_cond);
          free_share(&share);
+          share= 0;
        }
        DBUG_RETURN(0);
      }
@@ -1536,7 +1603,11 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
        log_query= 1;
        break;
      }
+      if (share)
+      {
+        free_share(&share);
+        share= 0;
+      }
      /* signal that schema operation has been handled */
      if ((enum SCHEMA_OP_TYPE)schema->type != SOT_CLEAR_SLOCK)
      {
@@ -1571,23 +1642,12 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
    case NDBEVENT::TE_DELETE:
      // skip
      break;
-    case NDBEVENT::TE_ALTER:
-      if (pOp->tableNameChanged())
-      {
-        DBUG_PRINT("info", ("Detected name change of table %s.%s",
-                            share->db, share->table_name));
-        /* do the rename of the table in the share */
-        share->table->s->db.str= share->db;
-        share->table->s->db.length= strlen(share->db);
-        share->table->s->table_name.str= share->table_name;
-        share->table->s->table_name.length= strlen(share->table_name);
-      }
-      ndb_handle_schema_change(thd, ndb, pOp, share);
-      break;
    case NDBEVENT::TE_CLUSTER_FAILURE:
    case NDBEVENT::TE_DROP:
      free_share(&schema_share);
      schema_share= 0;
+      // fall through
+    case NDBEVENT::TE_ALTER:
      ndb_handle_schema_change(thd, ndb, pOp, share);
      break;
    case NDBEVENT::TE_NODE_FAILURE:
@@ -1659,6 +1719,72 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
   DBUG_RETURN(0);
 }
 
+/*
+  process any operations that should be done after
+  the epoch is complete
+*/
+static void
+ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
+                                                 List<Cluster_replication_schema>
+                                                 *post_epoch_log_list,
+                                                 List<Cluster_replication_schema>
+                                                 *post_epoch_unlock_list)
+{
+  DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch");
+  Cluster_replication_schema *schema;
+  while ((schema= post_epoch_log_list->pop()))
+  {
+    DBUG_PRINT("info", ("log query_length: %d query: '%s'",
+                        schema->query_length, schema->query));
+    {
+      char key[FN_REFLEN];
+      build_table_filename(key, sizeof(key), schema->db, schema->name, "");
+      NDB_SHARE *share= get_share(key, 0, false, false);
+      switch ((enum SCHEMA_OP_TYPE)schema->type)
+      {
+      case SOT_DROP_DB:
+      case SOT_DROP_TABLE:
+        break;
+      case SOT_RENAME_TABLE:
+      case SOT_ALTER_TABLE:
+        if (share && share->op)
+        {
+          break; /* discovery handled by binlog */
+        }
+        pthread_mutex_lock(&LOCK_open);
+        if (ndb_create_table_from_engine(thd, schema->db, schema->name))
+        {
+          sql_print_error("Could not discover table '%s.%s' from "
+                          "binlog schema event '%s' from node %d",
+                          schema->db, schema->name, schema->query,
+                          schema->node_id);
+        }
+        pthread_mutex_unlock(&LOCK_open);
+      default:
+        DBUG_ASSERT(false);
+      }
+      if (share)
+      {
+        free_share(&share);
+        share= 0;
+      }
+    }
+    {
+      char *thd_db_save= thd->db;
+      thd->db= schema->db;
+      thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
+                        schema->query_length, FALSE,
+                        schema->name[0] == 0);
+      thd->db= thd_db_save;
+    }
+  }
+  while ((schema= post_epoch_unlock_list->pop()))
+  {
+    ndbcluster_update_slock(thd, schema->db, schema->name);
+  }
+  DBUG_VOID_RETURN;
+}
+
 /*
   Timer class for doing performance measurements
 */
@@ -2206,6 +2332,10 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
   if (share->flags & NSF_BLOB_FLAG)
     op->mergeEvents(true); // currently not inherited from event
 
+  DBUG_PRINT("info", ("share->ndb_value[0]: 0x%x",
+                      share->ndb_value[0]));
+  DBUG_PRINT("info", ("share->ndb_value[1]: 0x%x",
+                      share->ndb_value[1]));
   int n_columns= ndbtab->getNoOfColumns();
   int n_fields= table ? table->s->fields : 0; // XXX ???
   for (int j= 0; j < n_columns; j++)
@@ -2258,6 +2388,12 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
     }
     share->ndb_value[0][j].ptr= attr0.ptr;
     share->ndb_value[1][j].ptr= attr1.ptr;
+    DBUG_PRINT("info", ("&share->ndb_value[0][%d]: 0x%x "
+                        "share->ndb_value[0][%d]: 0x%x",
+                        j, &share->ndb_value[0][j], j, attr0.ptr));
+    DBUG_PRINT("info", ("&share->ndb_value[1][%d]: 0x%x "
+                        "share->ndb_value[1][%d]: 0x%x",
+                        j, &share->ndb_value[0][j], j, attr1.ptr));
   }
   op->setCustomData((void *) share); // set before execute
   share->op= op; // assign op in NDB_SHARE
@@ -2468,24 +2604,6 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
                       "op_old: %lx",
                       share->key, share, pOp, share->op, share->op_old));
    break;
-  case NDBEVENT::TE_ALTER:
-    if (pOp->tableNameChanged())
-    {
-      DBUG_PRINT("info", ("Detected name change of table %s.%s",
-                          share->db, share->table_name));
-      /* ToDo: remove printout */
-      if (ndb_extra_logging)
-        sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.",
-                              share_prefix, share->table->s->db.str,
-                              share->table->s->table_name.str,
-                              share->key);
-      /* do the rename of the table in the share */
-      share->table->s->db.str= share->db;
-      share->table->s->db.length= strlen(share->db);
-      share->table->s->table_name.str= share->table_name;
-      share->table->s->table_name.length= strlen(share->table_name);
-    }
-    goto drop_alter_common;
  case NDBEVENT::TE_DROP:
    if (apply_status_share == share)
    {
@@ -2495,7 +2613,8 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
    /* ToDo: remove printout */
    if (ndb_extra_logging)
      sql_print_information("NDB Binlog: drop table %s.", share->key);
-drop_alter_common:
+    // fall through
+  case NDBEVENT::TE_ALTER:
    row.n_schemaops++;
    DBUG_PRINT("info", ("TABLE %s EVENT: %s received share: 0x%lx op: %lx "
                        "share op: %lx op_old: %lx",
@@ -3075,26 +3194,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
      }
    }
 
-    /*
-      process any operations that should be done after
-      the epoch is complete
-    */
-    {
-      Cluster_replication_schema *schema;
-      while ((schema= post_epoch_unlock_list.pop()))
-      {
-        ndbcluster_update_slock(thd, schema->db, schema->name);
-      }
-      while ((schema= post_epoch_log_list.pop()))
-      {
-        char *thd_db_save= thd->db;
-        thd->db= schema->db;
-        thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
-                          schema->query_length, FALSE,
-                          schema->name[0] == 0);
-        thd->db= thd_db_save;
-      }
-    }
+    ndb_binlog_thread_handle_schema_event_post_epoch(thd,
+                                                     &post_epoch_log_list,
+                                                     &post_epoch_unlock_list);
    free_root(&mem_root, MYF(0));
    *root_ptr= old_root;
    ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
@@ -3110,9 +3212,15 @@ err:
  sql_print_information("Stopping Cluster Binlog");
 
  if (apply_status_share)
+  {
    free_share(&apply_status_share);
+    apply_status_share= 0;
+  }
  if (schema_share)
+  {
    free_share(&schema_share);
+    schema_share= 0;
+  }
 
  /* remove all event operations */
  if (ndb)
@@ -292,6 +292,7 @@ private:
  };
  Buf theKeyBuf;
  Buf theAccessKeyBuf;
+  Buf thePackKeyBuf;
  Buf theHeadInlineBuf;
  Buf theHeadInlineCopyBuf; // for writeTuple
  Buf thePartBuf;
@@ -328,6 +329,9 @@ private:
  Uint32 getPartNumber(Uint64 pos);
  Uint32 getPartCount();
  Uint32 getDistKey(Uint32 part);
+  // pack / unpack
+  int packKeyValue(const NdbTableImpl* aTable, const Buf& srcBuf);
+  int unpackKeyValue(const NdbTableImpl* aTable, Buf& dstBuf);
  // getters and setters
  int getTableKeyValue(NdbOperation* anOp);
  int setTableKeyValue(NdbOperation* anOp);
@@ -881,7 +881,7 @@ protected:
  Uint32 ptr2int() { return theReceiver.getId(); };
 
  // get table or index key from prepared signals
-  int getKeyFromTCREQ(Uint32* data, unsigned size);
+  int getKeyFromTCREQ(Uint32* data, Uint32 & size);
 
 /******************************************************************************
  * These are the private variables that are defined in the operation objects.
@@ -240,7 +240,7 @@ protected:
  void receiver_completed(NdbReceiver*);
  void execCLOSE_SCAN_REP();
 
-  int getKeyFromKEYINFO20(Uint32* data, unsigned size);
+  int getKeyFromKEYINFO20(Uint32* data, Uint32 & size);
  NdbOperation* takeOverScanOp(OperationType opType, NdbTransaction*);
 
  bool m_ordered;
@@ -310,7 +310,7 @@ NdbBlob::Buf::alloc(unsigned n)
 void
 NdbBlob::Buf::copyfrom(const NdbBlob::Buf& src)
 {
-  assert(size == src.size);
+  size = src.size;
   memcpy(data, src.data, size);
 }
 
@@ -408,6 +408,75 @@ NdbBlob::getDistKey(Uint32 part)
   return (part / theStripeSize) % theStripeSize;
 }
 
+// pack/unpack table/index key  XXX support routines, shortcuts
+
+int
+NdbBlob::packKeyValue(const NdbTableImpl* aTable, const Buf& srcBuf)
+{
+  DBUG_ENTER("NdbBlob::packKeyValue");
+  const Uint32* data = (const Uint32*)srcBuf.data;
+  unsigned pos = 0;
+  Uint32* pack_data = (Uint32*)thePackKeyBuf.data;
+  unsigned pack_pos = 0;
+  for (unsigned i = 0; i < aTable->m_columns.size(); i++) {
+    NdbColumnImpl* c = aTable->m_columns[i];
+    assert(c != NULL);
+    if (c->m_pk) {
+      unsigned len = c->m_attrSize * c->m_arraySize;
+      Uint32 pack_len;
+      bool ok = c->get_var_length(&data[pos], pack_len);
+      if (! ok) {
+        setErrorCode(NdbBlobImpl::ErrCorruptPK);
+        DBUG_RETURN(-1);
+      }
+      memcpy(&pack_data[pack_pos], &data[pos], pack_len);
+      while (pack_len % 4 != 0) {
+        char* p = (char*)&pack_data[pack_pos] + pack_len++;
+        *p = 0;
+      }
+      pos += (len + 3) / 4;
+      pack_pos += pack_len / 4;
+    }
+  }
+  assert(4 * pos == srcBuf.size);
+  assert(4 * pack_pos <= thePackKeyBuf.maxsize);
+  thePackKeyBuf.size = 4 * pack_pos;
+  DBUG_RETURN(0);
+}
+
+int
+NdbBlob::unpackKeyValue(const NdbTableImpl* aTable, Buf& dstBuf)
+{
+  DBUG_ENTER("NdbBlob::unpackKeyValue");
+  Uint32* data = (Uint32*)dstBuf.data;
+  unsigned pos = 0;
+  const Uint32* pack_data = (const Uint32*)thePackKeyBuf.data;
+  unsigned pack_pos = 0;
+  for (unsigned i = 0; i < aTable->m_columns.size(); i++) {
+    NdbColumnImpl* c = aTable->m_columns[i];
+    assert(c != NULL);
+    if (c->m_pk) {
+      unsigned len = c->m_attrSize * c->m_arraySize;
+      Uint32 pack_len;
+      bool ok = c->get_var_length(&pack_data[pack_pos], pack_len);
+      if (! ok) {
+        setErrorCode(NdbBlobImpl::ErrCorruptPK);
+        DBUG_RETURN(-1);
+      }
+      memcpy(&data[pos], &pack_data[pack_pos], pack_len);
+      while (pack_len % 4 != 0) {
+        char* p = (char*)&data[pos] + pack_len++;
+        *p = 0;
+      }
+      pos += (len + 3) / 4;
+      pack_pos += pack_len / 4;
+    }
+  }
+  assert(4 * pos == dstBuf.size);
+  assert(4 * pack_pos == thePackKeyBuf.size);
+  DBUG_RETURN(0);
+}
+
 // getters and setters
 
 int
@@ -489,12 +558,10 @@
 NdbBlob::setPartKeyValue(NdbOperation* anOp, Uint32 part)
 {
   DBUG_ENTER("NdbBlob::setPartKeyValue");
-  DBUG_PRINT("info", ("dist=%u part=%u key=", getDistKey(part), part));
-  DBUG_DUMP("info", theKeyBuf.data, 4 * theTable->m_keyLenInWords);
-  //Uint32* data = (Uint32*)theKeyBuf.data;
-  //unsigned size = theTable->m_keyLenInWords;
+  DBUG_PRINT("info", ("dist=%u part=%u packkey=", getDistKey(part), part));
+  DBUG_DUMP("info", thePackKeyBuf.data, 4 * thePackKeyBuf.size);
   // TODO use attr ids after compatibility with 4.1.7 not needed
-  if (anOp->equal("PK", theKeyBuf.data) == -1 ||
+  if (anOp->equal("PK", thePackKeyBuf.data) == -1 ||
       anOp->equal("DIST", getDistKey(part)) == -1 ||
       anOp->equal("PART", part) == -1) {
     setErrorCode(anOp);
@@ -1242,21 +1309,27 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl
   if (isKeyOp()) {
     if (isTableOp()) {
       // get table key
-      Uint32* data = (Uint32*)theKeyBuf.data;
-      unsigned size = theTable->m_keyLenInWords;
+      Uint32* data = (Uint32*)thePackKeyBuf.data;
+      Uint32 size = theTable->m_keyLenInWords; // in-out
       if (theNdbOp->getKeyFromTCREQ(data, size) == -1) {
         setErrorCode(NdbBlobImpl::ErrUsage);
         DBUG_RETURN(-1);
       }
+      thePackKeyBuf.size = 4 * size;
+      if (unpackKeyValue(theTable, theKeyBuf) == -1)
+        DBUG_RETURN(-1);
     }
     if (isIndexOp()) {
       // get index key
-      Uint32* data = (Uint32*)theAccessKeyBuf.data;
-      unsigned size = theAccessTable->m_keyLenInWords;
+      Uint32* data = (Uint32*)thePackKeyBuf.data;
+      Uint32 size = theAccessTable->m_keyLenInWords; // in-out
       if (theNdbOp->getKeyFromTCREQ(data, size) == -1) {
         setErrorCode(NdbBlobImpl::ErrUsage);
         DBUG_RETURN(-1);
       }
+      thePackKeyBuf.size = 4 * size;
+      if (unpackKeyValue(theAccessTable, theAccessKeyBuf) == -1)
+        DBUG_RETURN(-1);
     }
     if (isReadOp()) {
       // add read of head+inline in this op
@@ -1303,6 +1376,7 @@ NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp,
   theEventOp = anOp;
   theBlobEventOp = aBlobOp;
   theTable = anOp->m_eventImpl->m_tableImpl;
+  theAccessTable = theTable;
   theColumn = aColumn;
   // prepare blob column and table
   if (prepareColumn() == -1)
@@ -1321,7 +1395,7 @@ NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp,
   if (theBlobEventOp != NULL) {
     if ((theBlobEventPkRecAttr =
          theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)0),
-                                  theKeyBuf.data, version)) == NULL ||
+                                  thePackKeyBuf.data, version)) == NULL ||
         (theBlobEventDistRecAttr =
          theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)1),
                                   (char*)0, version)) == NULL ||
@@ -1380,6 +1454,7 @@ NdbBlob::prepareColumn()
   }
   // these buffers are always used
   theKeyBuf.alloc(theTable->m_keyLenInWords << 2);
+  thePackKeyBuf.alloc(max(theTable->m_keyLenInWords, theAccessTable->m_keyLenInWords) << 2);
   theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize);
   theHead = (Head*)theHeadInlineBuf.data;
   theInlineData = theHeadInlineBuf.data + sizeof(Head);
@@ -1464,7 +1539,7 @@ NdbBlob::preExecute(NdbTransaction::ExecType anExecType, bool& batch)
       if (tOp == NULL ||
           tOp->readTuple() == -1 ||
           setAccessKeyValue(tOp) == -1 ||
-          tOp->getValue(pkAttrId, theKeyBuf.data) == NULL) {
+          tOp->getValue(pkAttrId, thePackKeyBuf.data) == NULL) {
         setErrorCode(tOp);
         DBUG_RETURN(-1);
       }
@@ -1553,10 +1628,12 @@ NdbBlob::postExecute(NdbTransaction::ExecType anExecType)
     assert(isKeyOp());
     if (isIndexOp()) {
       NdbBlob* tFirstBlob = theNdbOp->theBlobList;
-      if (this != tFirstBlob) {
+      if (this == tFirstBlob) {
+        packKeyValue(theTable, theKeyBuf);
+      } else {
         // copy key from first blob
-        assert(theKeyBuf.size == tFirstBlob->theKeyBuf.size);
-        memcpy(theKeyBuf.data, tFirstBlob->theKeyBuf.data, tFirstBlob->theKeyBuf.size);
+        theKeyBuf.copyfrom(tFirstBlob->theKeyBuf);
+        thePackKeyBuf.copyfrom(tFirstBlob->thePackKeyBuf);
       }
     }
     if (isReadOp()) {
@@ -1710,12 +1787,16 @@ NdbBlob::atNextResult()
     DBUG_RETURN(-1);
   assert(isScanOp());
   // get primary key
-  { Uint32* data = (Uint32*)theKeyBuf.data;
-    unsigned size = theTable->m_keyLenInWords;
-    if (((NdbScanOperation*)theNdbOp)->getKeyFromKEYINFO20(data, size) == -1) {
+  { NdbScanOperation* tScanOp = (NdbScanOperation*)theNdbOp;
+    Uint32* data = (Uint32*)thePackKeyBuf.data;
+    unsigned size = theTable->m_keyLenInWords; // in-out
+    if (tScanOp->getKeyFromKEYINFO20(data, size) == -1) {
       setErrorCode(NdbBlobImpl::ErrUsage);
       DBUG_RETURN(-1);
     }
+    thePackKeyBuf.size = 4 * size;
+    if (unpackKeyValue(theTable, theKeyBuf) == -1)
+      DBUG_RETURN(-1);
   }
   getHeadFromRecAttr();
   if (setPos(0) == -1)
@@ -34,6 +34,8 @@ public:
   STATIC_CONST( ErrAbort = 4268 );
   // "Unknown blob error"
   STATIC_CONST( ErrUnknown = 4269 );
+  // "Corrupted main table PK in blob operation"
+  STATIC_CONST( ErrCorruptPK = 4274 );
 };
 
 #endif
@@ -689,9 +689,45 @@ NdbEventOperationImpl::receive_event()
                 error.code));
     DBUG_RETURN_EVENT(1);
   }
-  if ( m_eventImpl->m_tableImpl)
-    delete m_eventImpl->m_tableImpl;
+  NdbTableImpl *tmp_table_impl= m_eventImpl->m_tableImpl;
   m_eventImpl->m_tableImpl = at;
+
+  DBUG_PRINT("info", ("switching table impl 0x%x -> 0x%x",
+                      tmp_table_impl, at));
+
+  // change the rec attrs to refer to the new table object
+  int i;
+  for (i = 0; i < 2; i++)
+  {
+    NdbRecAttr *p = theFirstPkAttrs[i];
+    while (p)
+    {
+      int no = p->getColumn()->getColumnNo();
+      NdbColumnImpl *tAttrInfo = at->getColumn(no);
+      DBUG_PRINT("info", ("rec_attr: 0x%x "
+                          "switching column impl 0x%x -> 0x%x",
+                          p, p->m_column, tAttrInfo));
+      p->m_column = tAttrInfo;
+      p = p->next();
+    }
+  }
+  for (i = 0; i < 2; i++)
+  {
+    NdbRecAttr *p = theFirstDataAttrs[i];
+    while (p)
+    {
+      int no = p->getColumn()->getColumnNo();
+      NdbColumnImpl *tAttrInfo = at->getColumn(no);
+      DBUG_PRINT("info", ("rec_attr: 0x%x "
+                          "switching column impl 0x%x -> 0x%x",
+                          p, p->m_column, tAttrInfo));
+      p->m_column = tAttrInfo;
+      p = p->next();
+    }
+  }
+
+  if (tmp_table_impl)
+    delete tmp_table_impl;
 }
 
 if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))
@@ -1979,7 +2015,7 @@ split_concatenated_pk(const NdbTableImpl* t, Uint32* ah_buffer,
     ah_buffer[n++] = ah.m_value;
     sz += ah.getDataSize();
   }
-  assert(n == t->m_noOfKeys && sz == pk_sz);
+  assert(n == t->m_noOfKeys && sz <= pk_sz);
 }
 
 int
@@ -472,7 +472,8 @@ void
 NdbOperation::reorderKEYINFO()
 {
   Uint32 data[4000];
-  getKeyFromTCREQ(data, 4000);
+  Uint32 size = 4000;
+  getKeyFromTCREQ(data, size);
   Uint32 pos = 1;
   Uint32 k;
   for (k = 0; k < m_accessTable->m_noOfKeys; k++) {
@@ -501,7 +502,7 @@ NdbOperation::reorderKEYINFO()
 }
 
 int
-NdbOperation::getKeyFromTCREQ(Uint32* data, unsigned size)
+NdbOperation::getKeyFromTCREQ(Uint32* data, Uint32 & size)
 {
   assert(size >= theTupKeyLen && theTupKeyLen > 0);
   size = theTupKeyLen;
@@ -199,7 +199,7 @@ NdbOut& operator<<(NdbOut& out, const NdbRecAttr &r)
       out << hex << "H'" << r.u_32_value() << dec;
       break;
     case NdbDictionary::Column::Unsigned:
-      out << r.u_32_value();
+      out << *((Uint32*)r.aRef() + j);
       break;
     case NdbDictionary::Column::Smallunsigned:
       out << r.u_short_value();
@@ -912,13 +912,20 @@ NdbScanOperation::doSendScan(int aProcessorId)
  * the scan process.
  ****************************************************************************/
 int
-NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, unsigned size)
+NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, Uint32 & size)
 {
   NdbRecAttr * tRecAttr = m_curr_row;
   if(tRecAttr)
   {
     const Uint32 * src = (Uint32*)tRecAttr->aRef();
-    memcpy(data, src, 4*size);
+
+    assert(tRecAttr->get_size_in_bytes() > 0);
+    assert(tRecAttr->get_size_in_bytes() < 65536);
+    const Uint32 len = (tRecAttr->get_size_in_bytes() + 3)/4-1;
+
+    assert(size >= len);
+    memcpy(data, src, 4*len);
+    size = len;
     return 0;
   }
   return -1;
@@ -599,7 +599,8 @@ ErrorBundle ErrorCodes[] = {
   { 4336, DMEC, AE, "Auto-increment value set below current value" },
   { 4271, DMEC, AE, "Invalid index object, not retrieved via getIndex()" },
   { 4272, DMEC, AE, "Table definition has undefined column" },
-  { 4273, DMEC, IE, "No blob table in dict cache" }
+  { 4273, DMEC, IE, "No blob table in dict cache" },
+  { 4274, DMEC, IE, "Corrupted main table PK in blob operation" }
 };
 
 static
@@ -223,7 +223,7 @@ dropTable()
 {
   NdbDictionary::Table tab(g_opt.m_tname);
   if (g_dic->getTable(g_opt.m_tname) != 0)
-    CHK(g_dic->dropTable(tab) == 0);
+    CHK(g_dic->dropTable(g_opt.m_tname) == 0);
   return 0;
 }
 