mirror of
https://github.com/MariaDB/server.git
synced 2025-07-30 16:24:05 +03:00
MDEV-32444 Data from orphaned XA transaction is lost after online alter
XA support for online alter was totally missing. Tying on binlog_hton made this hardly visible: simply having binlog_commit called from xa_commit made an impression that it will automagically work for online alter, which turns out wrong: all binlog does is writes "XA END" into trx cache and flushes it to a real binlog. In comparison, online alter can't do the same, since online replication happens in a single transaction. Solution: make a dedicated XA support. * Extend struct xid_t with a pointer to Online_alter_cache_list * On prepare: move online alter cache from THD::ha_data to XID passed * On XA commit/rollback: use the online alter cache stored in this XID. This makes us pass xid_cache_element->xid to xa_commit/xa_rollback instead of lex->xid * Use manual memory management for online alter cache list, instead of mem_root allocation, since we don't have mem_root connected to the XA transaction.
This commit is contained in:
@ -1563,6 +1563,116 @@ connection default;
|
||||
drop table t1, t2;
|
||||
set @@binlog_format=default;
|
||||
set debug_sync= reset;
|
||||
# MDEV-32444 Data from orphaned XA transaction is lost after online alter
|
||||
create table t (a int primary key) engine=innodb;
|
||||
insert into t values (1);
|
||||
# XA commit
|
||||
set debug_sync= 'alter_table_online_downgraded signal downgraded wait_for go';
|
||||
alter table t force, algorithm=copy, lock=none;
|
||||
connection con1;
|
||||
set debug_sync= 'now wait_for downgraded';
|
||||
xa begin 'x1';
|
||||
update t set a = 2 where a = 1;
|
||||
xa end 'x1';
|
||||
xa prepare 'x1';
|
||||
set debug_sync= 'THD_cleanup_after_trans_cleanup signal xa_detach';
|
||||
disconnect con1;
|
||||
connection con2;
|
||||
set debug_sync= 'now wait_for xa_detach';
|
||||
xa commit 'x1';
|
||||
set debug_sync= 'now signal go';
|
||||
connection default;
|
||||
select * from t;
|
||||
a
|
||||
2
|
||||
# XA rollback
|
||||
set debug_sync= 'alter_table_online_downgraded signal downgraded wait_for go';
|
||||
alter table t force, algorithm=copy, lock=none;
|
||||
connect con1, localhost, root,,;
|
||||
set debug_sync= 'now wait_for downgraded';
|
||||
xa begin 'x2';
|
||||
insert into t values (53);
|
||||
xa end 'x2';
|
||||
xa prepare 'x2';
|
||||
set debug_sync= 'THD_cleanup_after_trans_cleanup signal xa_detach';
|
||||
disconnect con1;
|
||||
connection con2;
|
||||
set debug_sync= 'now wait_for xa_detach';
|
||||
xa rollback 'x2';
|
||||
set debug_sync= 'now signal go';
|
||||
connection default;
|
||||
select * from t;
|
||||
a
|
||||
2
|
||||
# XA transaction is left uncommitted
|
||||
# end then is rollbacked after alter fails
|
||||
set debug_sync= 'alter_table_online_downgraded signal downgraded wait_for go';
|
||||
set statement innodb_lock_wait_timeout=0, lock_wait_timeout= 0
|
||||
for alter table t force, algorithm=copy, lock=none;
|
||||
connect con1, localhost, root,,;
|
||||
set debug_sync= 'now wait_for downgraded';
|
||||
xa begin 'xuncommitted';
|
||||
insert into t values (3);
|
||||
xa end 'xuncommitted';
|
||||
xa prepare 'xuncommitted';
|
||||
set debug_sync= 'now signal go';
|
||||
set debug_sync= 'THD_cleanup_after_trans_cleanup signal xa_detach';
|
||||
disconnect con1;
|
||||
connection default;
|
||||
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
|
||||
set debug_sync= 'now wait_for xa_detach';
|
||||
xa rollback 'xuncommitted';
|
||||
select * from t;
|
||||
a
|
||||
2
|
||||
# Same, but commit
|
||||
set debug_sync= 'alter_table_online_downgraded signal downgraded wait_for go';
|
||||
set statement innodb_lock_wait_timeout=0, lock_wait_timeout= 0
|
||||
for alter table t force, algorithm=copy, lock=none;
|
||||
connect con1, localhost, root,,;
|
||||
set debug_sync= 'now wait_for downgraded';
|
||||
xa begin 'committed_later';
|
||||
insert into t values (3);
|
||||
xa end 'committed_later';
|
||||
xa prepare 'committed_later';
|
||||
set debug_sync= 'now signal go';
|
||||
set debug_sync= 'THD_cleanup_after_trans_cleanup signal xa_detach';
|
||||
disconnect con1;
|
||||
connection default;
|
||||
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
|
||||
set debug_sync= 'now wait_for xa_detach';
|
||||
xa commit 'committed_later';
|
||||
select * from t;
|
||||
a
|
||||
2
|
||||
3
|
||||
# Commit, but error in statement, and there is some stmt data to rollback
|
||||
set debug_sync= 'alter_table_online_downgraded signal downgraded wait_for go';
|
||||
alter table t force, algorithm=copy, lock=none;
|
||||
connect con1, localhost, root,,;
|
||||
set debug_sync= 'now wait_for downgraded';
|
||||
xa begin 'x1';
|
||||
insert into t values (4), (3);
|
||||
ERROR 23000: Duplicate entry '3' for key 'PRIMARY'
|
||||
insert into t values (5);
|
||||
xa end 'x1';
|
||||
xa prepare 'x1';
|
||||
set debug_sync= 'THD_cleanup_after_trans_cleanup signal xa_detach';
|
||||
disconnect con1;
|
||||
connection con2;
|
||||
set debug_sync= 'now wait_for xa_detach';
|
||||
xa commit 'x1';
|
||||
set debug_sync= 'now signal go';
|
||||
connection default;
|
||||
select * from t;
|
||||
a
|
||||
2
|
||||
3
|
||||
5
|
||||
connect con1, localhost, root,,;
|
||||
connection default;
|
||||
drop table t;
|
||||
set debug_sync= reset;
|
||||
disconnect con1;
|
||||
disconnect con2;
|
||||
#
|
||||
|
@ -1791,6 +1791,135 @@ set @@binlog_format=default;
|
||||
set debug_sync= reset;
|
||||
|
||||
|
||||
--echo # MDEV-32444 Data from orphaned XA transaction is lost after online alter
|
||||
|
||||
create table t (a int primary key) engine=innodb;
|
||||
insert into t values (1);
|
||||
|
||||
--echo # XA commit
|
||||
|
||||
set debug_sync= 'alter_table_online_downgraded signal downgraded wait_for go';
|
||||
send alter table t force, algorithm=copy, lock=none;
|
||||
|
||||
--connection con1
|
||||
set debug_sync= 'now wait_for downgraded';
|
||||
xa begin 'x1';
|
||||
update t set a = 2 where a = 1;
|
||||
xa end 'x1';
|
||||
xa prepare 'x1';
|
||||
set debug_sync= 'THD_cleanup_after_trans_cleanup signal xa_detach';
|
||||
--disconnect con1
|
||||
|
||||
--connection con2
|
||||
set debug_sync= 'now wait_for xa_detach';
|
||||
xa commit 'x1';
|
||||
set debug_sync= 'now signal go';
|
||||
--connection default
|
||||
--reap # alter table
|
||||
|
||||
select * from t;
|
||||
|
||||
--echo # XA rollback
|
||||
|
||||
set debug_sync= 'alter_table_online_downgraded signal downgraded wait_for go';
|
||||
send alter table t force, algorithm=copy, lock=none;
|
||||
--connect(con1, localhost, root,,)
|
||||
set debug_sync= 'now wait_for downgraded';
|
||||
xa begin 'x2';
|
||||
insert into t values (53);
|
||||
xa end 'x2';
|
||||
xa prepare 'x2';
|
||||
set debug_sync= 'THD_cleanup_after_trans_cleanup signal xa_detach';
|
||||
--disconnect con1
|
||||
|
||||
--connection con2
|
||||
set debug_sync= 'now wait_for xa_detach';
|
||||
xa rollback 'x2';
|
||||
set debug_sync= 'now signal go';
|
||||
--connection default
|
||||
--reap # alter table
|
||||
|
||||
select * from t;
|
||||
|
||||
--echo # XA transaction is left uncommitted
|
||||
--echo # end then is rollbacked after alter fails
|
||||
|
||||
set debug_sync= 'alter_table_online_downgraded signal downgraded wait_for go';
|
||||
send set statement innodb_lock_wait_timeout=0, lock_wait_timeout= 0
|
||||
for alter table t force, algorithm=copy, lock=none;
|
||||
|
||||
--connect(con1, localhost, root,,)
|
||||
set debug_sync= 'now wait_for downgraded';
|
||||
xa begin 'xuncommitted';
|
||||
insert into t values (3);
|
||||
xa end 'xuncommitted';
|
||||
xa prepare 'xuncommitted';
|
||||
set debug_sync= 'now signal go';
|
||||
set debug_sync= 'THD_cleanup_after_trans_cleanup signal xa_detach';
|
||||
--disconnect con1
|
||||
|
||||
--connection default
|
||||
--error ER_LOCK_WAIT_TIMEOUT
|
||||
--reap # alter table
|
||||
set debug_sync= 'now wait_for xa_detach';
|
||||
xa rollback 'xuncommitted';
|
||||
|
||||
select * from t;
|
||||
|
||||
--echo # Same, but commit
|
||||
|
||||
set debug_sync= 'alter_table_online_downgraded signal downgraded wait_for go';
|
||||
send set statement innodb_lock_wait_timeout=0, lock_wait_timeout= 0
|
||||
for alter table t force, algorithm=copy, lock=none;
|
||||
|
||||
--connect(con1, localhost, root,,)
|
||||
set debug_sync= 'now wait_for downgraded';
|
||||
xa begin 'committed_later';
|
||||
insert into t values (3);
|
||||
xa end 'committed_later';
|
||||
xa prepare 'committed_later';
|
||||
set debug_sync= 'now signal go';
|
||||
set debug_sync= 'THD_cleanup_after_trans_cleanup signal xa_detach';
|
||||
--disconnect con1
|
||||
|
||||
--connection default
|
||||
--error ER_LOCK_WAIT_TIMEOUT
|
||||
--reap # alter table
|
||||
set debug_sync= 'now wait_for xa_detach';
|
||||
xa commit 'committed_later';
|
||||
|
||||
|
||||
select * from t;
|
||||
|
||||
--echo # Commit, but error in statement, and there is some stmt data to rollback
|
||||
|
||||
set debug_sync= 'alter_table_online_downgraded signal downgraded wait_for go';
|
||||
send alter table t force, algorithm=copy, lock=none;
|
||||
--connect(con1, localhost, root,,)
|
||||
set debug_sync= 'now wait_for downgraded';
|
||||
xa begin 'x1';
|
||||
--error ER_DUP_ENTRY
|
||||
insert into t values (4), (3);
|
||||
insert into t values (5);
|
||||
xa end 'x1';
|
||||
xa prepare 'x1';
|
||||
set debug_sync= 'THD_cleanup_after_trans_cleanup signal xa_detach';
|
||||
--disconnect con1
|
||||
|
||||
--connection con2
|
||||
set debug_sync= 'now wait_for xa_detach';
|
||||
xa commit 'x1';
|
||||
set debug_sync= 'now signal go';
|
||||
--connection default
|
||||
--reap # alter table
|
||||
|
||||
select * from t;
|
||||
|
||||
--connect(con1, localhost, root,,)
|
||||
--connection default
|
||||
drop table t;
|
||||
set debug_sync= reset;
|
||||
|
||||
--disconnect con1
|
||||
--disconnect con2
|
||||
--echo #
|
||||
|
@ -2596,7 +2596,7 @@ static bool xarecover_decide_to_commit(xid_recovery_member* member,
|
||||
static void xarecover_do_commit_or_rollback(handlerton *hton,
|
||||
xarecover_complete_arg *arg)
|
||||
{
|
||||
xid_t x;
|
||||
XA_data x;
|
||||
my_bool rc;
|
||||
xid_recovery_member *member= arg->member;
|
||||
Binlog_offset *ptr_commit_max= arg->binlog_coord;
|
||||
@ -2605,7 +2605,7 @@ static void xarecover_do_commit_or_rollback(handlerton *hton,
|
||||
// Populate xid using the server_id from original transaction
|
||||
x.set(member->xid, member->server_id);
|
||||
else
|
||||
x= *member->full_xid;
|
||||
(XID)x= *member->full_xid;
|
||||
|
||||
rc= xarecover_decide_to_commit(member, ptr_commit_max) ?
|
||||
hton->commit_by_xid(hton, &x) : hton->rollback_by_xid(hton, &x);
|
||||
|
@ -890,6 +890,7 @@ typedef ulonglong my_xid; // this line is the same as in log_event.h
|
||||
#define COMPATIBLE_DATA_YES 0
|
||||
#define COMPATIBLE_DATA_NO 1
|
||||
|
||||
|
||||
/**
|
||||
struct xid_t is binary compatible with the XID structure as
|
||||
in the X/Open CAE Specification, Distributed Transaction Processing:
|
||||
@ -973,6 +974,12 @@ struct xid_t {
|
||||
};
|
||||
typedef struct xid_t XID;
|
||||
|
||||
struct Online_alter_cache_list;
|
||||
struct XA_data: XID
|
||||
{
|
||||
Online_alter_cache_list *online_alter_cache= NULL;
|
||||
};
|
||||
|
||||
/*
|
||||
Enumerates a sequence in the order of
|
||||
their creation that is in the top-down order of the index file.
|
||||
|
@ -38,7 +38,7 @@ struct table_savepoint: ilist_node<>
|
||||
table_savepoint(sv_id_t id, my_off_t pos): id(id), log_prev_pos(pos){}
|
||||
};
|
||||
|
||||
class online_alter_cache_data: public Sql_alloc, public ilist_node<>,
|
||||
class online_alter_cache_data: public ilist_node<>,
|
||||
public binlog_cache_data
|
||||
{
|
||||
public:
|
||||
@ -83,7 +83,7 @@ online_alter_cache_data *setup_cache_data(MEM_ROOT *root, TABLE_SHARE *share)
|
||||
{
|
||||
static ulong online_alter_cache_use= 0, online_alter_cache_disk_use= 0;
|
||||
|
||||
auto cache= new (root) online_alter_cache_data();
|
||||
auto cache= new online_alter_cache_data();
|
||||
if (!cache || open_cached_file(&cache->cache_log, mysql_tmpdir, LOG_PREFIX,
|
||||
(size_t)binlog_cache_size, MYF(MY_WME)))
|
||||
{
|
||||
@ -202,16 +202,15 @@ cleanup_cache_list(ilist<online_alter_cache_data> &list, bool ending_trans)
|
||||
|
||||
|
||||
static
|
||||
int online_alter_end_trans(handlerton *hton, THD *thd, bool all, bool commit)
|
||||
int online_alter_end_trans(Online_alter_cache_list &cache_list, THD *thd,
|
||||
bool is_ending_transaction, bool commit)
|
||||
{
|
||||
DBUG_ENTER("online_alter_end_trans");
|
||||
int error= 0;
|
||||
auto &cache_list= get_cache_list(hton, thd);
|
||||
|
||||
if (cache_list.empty())
|
||||
DBUG_RETURN(0);
|
||||
|
||||
bool is_ending_transaction= ending_trans(thd, all);
|
||||
|
||||
for (auto &cache: cache_list)
|
||||
{
|
||||
auto *binlog= cache.sink_log;
|
||||
@ -263,12 +262,16 @@ int online_alter_end_trans(handlerton *hton, THD *thd, bool all, bool commit)
|
||||
|
||||
cleanup_cache_list(cache_list, is_ending_transaction);
|
||||
|
||||
for (TABLE *table= thd->open_tables; table; table= table->next)
|
||||
table->online_alter_cache= NULL;
|
||||
DBUG_RETURN(error);
|
||||
}
|
||||
|
||||
void cleanup_tables(THD *thd)
|
||||
{
|
||||
for (TABLE *table= thd->open_tables; table; table= table->next)
|
||||
table->online_alter_cache= NULL;
|
||||
}
|
||||
|
||||
static
|
||||
int online_alter_savepoint_set(handlerton *hton, THD *thd, sv_id_t sv_id)
|
||||
{
|
||||
DBUG_ENTER("binlog_online_alter_savepoint");
|
||||
@ -289,7 +292,7 @@ int online_alter_savepoint_set(handlerton *hton, THD *thd, sv_id_t sv_id)
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
static
|
||||
int online_alter_savepoint_rollback(handlerton *hton, THD *thd, sv_id_t sv_id)
|
||||
{
|
||||
DBUG_ENTER("online_alter_savepoint_rollback");
|
||||
@ -310,6 +313,63 @@ int online_alter_savepoint_rollback(handlerton *hton, THD *thd, sv_id_t sv_id)
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
static int online_alter_commit(handlerton *hton, THD *thd, bool all)
|
||||
{
|
||||
int res= online_alter_end_trans(get_cache_list(hton, thd), thd,
|
||||
ending_trans(thd, all), true);
|
||||
cleanup_tables(thd);
|
||||
return res;
|
||||
};
|
||||
|
||||
static int online_alter_rollback(handlerton *hton, THD *thd, bool all)
|
||||
{
|
||||
int res= online_alter_end_trans(get_cache_list(hton, thd), thd,
|
||||
ending_trans(thd, all), false);
|
||||
cleanup_tables(thd);
|
||||
return res;
|
||||
};
|
||||
|
||||
static int online_alter_prepare(handlerton *hton, THD *thd, bool all)
|
||||
{
|
||||
auto &cache_list= get_cache_list(hton, thd);
|
||||
int res= 0;
|
||||
if (ending_trans(thd, all))
|
||||
{
|
||||
thd->transaction->xid_state.set_online_alter_cache(&cache_list);
|
||||
thd_set_ha_data(thd, hton, NULL);
|
||||
}
|
||||
else
|
||||
{
|
||||
res= online_alter_end_trans(cache_list, thd, false, true);
|
||||
}
|
||||
|
||||
cleanup_tables(thd);
|
||||
return res;
|
||||
};
|
||||
|
||||
static int online_alter_commit_by_xid(handlerton *hton, XID *x)
|
||||
{
|
||||
auto *xid= static_cast<XA_data*>(x);
|
||||
if (likely(xid->online_alter_cache == NULL))
|
||||
return 1;
|
||||
int res= online_alter_end_trans(*xid->online_alter_cache, current_thd,
|
||||
true, true);
|
||||
delete xid->online_alter_cache;
|
||||
xid->online_alter_cache= NULL;
|
||||
return res;
|
||||
};
|
||||
|
||||
static int online_alter_rollback_by_xid(handlerton *hton, XID *x)
|
||||
{
|
||||
auto *xid= static_cast<XA_data*>(x);
|
||||
if (likely(xid->online_alter_cache == NULL))
|
||||
return 1;
|
||||
int res= online_alter_end_trans(*xid->online_alter_cache, current_thd,
|
||||
true, false);
|
||||
delete xid->online_alter_cache;
|
||||
xid->online_alter_cache= NULL;
|
||||
return res;
|
||||
};
|
||||
|
||||
static int online_alter_close_connection(handlerton *hton, THD *thd)
|
||||
{
|
||||
@ -334,14 +394,14 @@ static int online_alter_log_init(void *p)
|
||||
online_alter_hton->savepoint_rollback_can_release_mdl=
|
||||
[](handlerton *hton, THD *thd){ return true; };
|
||||
|
||||
online_alter_hton->commit= [](handlerton *hton, THD *thd, bool all)
|
||||
{ return online_alter_end_trans(hton, thd, all, true); };
|
||||
online_alter_hton->rollback= [](handlerton *hton, THD *thd, bool all)
|
||||
{ return online_alter_end_trans(hton, thd, all, false); };
|
||||
online_alter_hton->commit_by_xid= [](handlerton *hton, XID *xid)
|
||||
{ return online_alter_end_trans(hton, current_thd, true, true); };
|
||||
online_alter_hton->rollback_by_xid= [](handlerton *hton, XID *xid)
|
||||
{ return online_alter_end_trans(hton, current_thd, true, false); };
|
||||
online_alter_hton->commit= online_alter_commit;
|
||||
online_alter_hton->rollback= online_alter_rollback;
|
||||
|
||||
|
||||
online_alter_hton->recover= [](handlerton*, XID*, uint){ return 0; };
|
||||
online_alter_hton->prepare= online_alter_prepare;
|
||||
online_alter_hton->commit_by_xid= online_alter_commit_by_xid;
|
||||
online_alter_hton->rollback_by_xid= online_alter_rollback_by_xid;
|
||||
|
||||
online_alter_hton->drop_table= [](handlerton *, const char*) { return -1; };
|
||||
online_alter_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN
|
||||
|
@ -1539,6 +1539,8 @@ void THD::cleanup(void)
|
||||
else
|
||||
trans_rollback(this);
|
||||
|
||||
DEBUG_SYNC(this, "THD_cleanup_after_trans_cleanup");
|
||||
|
||||
DBUG_ASSERT(open_tables == NULL);
|
||||
DBUG_ASSERT(m_transaction_psi == NULL);
|
||||
|
||||
|
12
sql/xa.cc
12
sql/xa.cc
@ -78,7 +78,7 @@ public:
|
||||
/* Error reported by the Resource Manager (RM) to the Transaction Manager. */
|
||||
uint rm_error;
|
||||
enum xa_states xa_state;
|
||||
XID xid;
|
||||
XA_data xid;
|
||||
bool is_set(int32_t flag)
|
||||
{ return m_state.load(std::memory_order_relaxed) & flag; }
|
||||
void set(int32_t flag)
|
||||
@ -140,6 +140,7 @@ public:
|
||||
{
|
||||
XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD);
|
||||
element->m_state= 0;
|
||||
new(&element->xid) XA_data();
|
||||
}
|
||||
static void lf_alloc_destructor(uchar *ptr)
|
||||
{
|
||||
@ -179,6 +180,11 @@ void XID_STATE::set_error(uint error)
|
||||
xid_cache_element->rm_error= error;
|
||||
}
|
||||
|
||||
void XID_STATE::set_online_alter_cache(Online_alter_cache_list *cache)
|
||||
{
|
||||
if (is_explicit_XA())
|
||||
xid_cache_element->xid.online_alter_cache= cache;
|
||||
}
|
||||
|
||||
void XID_STATE::er_xaer_rmfail() const
|
||||
{
|
||||
@ -646,7 +652,7 @@ bool trans_xa_commit(THD *thd)
|
||||
DBUG_ASSERT(!xid_state.xid_cache_element);
|
||||
|
||||
xid_state.xid_cache_element= xs;
|
||||
ha_commit_or_rollback_by_xid(thd->lex->xid, !res);
|
||||
ha_commit_or_rollback_by_xid(&xs->xid, !res);
|
||||
if (!res && thd->is_error())
|
||||
{
|
||||
// hton completion error retains xs/xid in the cache,
|
||||
@ -826,7 +832,7 @@ bool trans_xa_rollback(THD *thd)
|
||||
DBUG_ASSERT(!xid_state.xid_cache_element);
|
||||
|
||||
xid_state.xid_cache_element= xs;
|
||||
ha_commit_or_rollback_by_xid(thd->lex->xid, 0);
|
||||
ha_commit_or_rollback_by_xid(&xs->xid, 0);
|
||||
if (!res && thd->is_error())
|
||||
{
|
||||
goto _end_external_xid;
|
||||
|
1
sql/xa.h
1
sql/xa.h
@ -34,6 +34,7 @@ struct XID_STATE {
|
||||
bool check_has_uncommitted_xa() const;
|
||||
bool is_explicit_XA() const { return xid_cache_element != 0; }
|
||||
void set_error(uint error);
|
||||
void set_online_alter_cache(Online_alter_cache_list *);
|
||||
void er_xaer_rmfail() const;
|
||||
XID *get_xid() const;
|
||||
enum xa_states get_state_code() const;
|
||||
|
Reference in New Issue
Block a user