1
0
mirror of https://github.com/MariaDB/server.git synced 2025-09-02 09:41:40 +03:00

Backport of:

------------------------------------------------------------
revno: 2630.22.3
committer: Davi Arnaut <Davi.Arnaut@Sun.COM>
branch nick: 4284-6.0
timestamp: Thu 2008-08-07 22:33:43 -0300
message:
WL#4284: Transactional DDL locking

Make transaction management more modular through a new interface.

The overall objective of this change is to provide groundwork
for the design of transactional DDL locking by cleaning up the
transaction high level API to better distinguish operations implicit
and explicit, and single statement transaction from operations on
the normal transaction.

Having a a high-level interface for transaction management provides
a better base for implementing transactional concepts that are not
always tied to storage engines and also makes it easier to interect
with other higher level modules of the server.
This commit is contained in:
Konstantin Osipov
2009-12-03 21:37:38 +03:00
parent 3a8a750919
commit c43f894c51
24 changed files with 819 additions and 581 deletions

View File

@@ -28,6 +28,7 @@
#include "sp_cache.h"
#include "events.h"
#include "sql_trigger.h"
#include "transaction.h"
#include "sql_prepare.h"
#include "probes_mysql.h"
@@ -88,113 +89,6 @@ const char *xa_state_names[]={
"NON-EXISTING", "ACTIVE", "IDLE", "PREPARED", "ROLLBACK ONLY"
};
/**
Mark a XA transaction as rollback-only if the RM unilaterally
rolled back the transaction branch.
@note If a rollback was requested by the RM, this function sets
the appropriate rollback error code and transits the state
to XA_ROLLBACK_ONLY.
@return TRUE if transaction was rolled back or if the transaction
state is XA_ROLLBACK_ONLY. FALSE otherwise.
*/
static bool xa_trans_rolled_back(XID_STATE *xid_state)
{
if (xid_state->rm_error)
{
switch (xid_state->rm_error) {
case ER_LOCK_WAIT_TIMEOUT:
my_error(ER_XA_RBTIMEOUT, MYF(0));
break;
case ER_LOCK_DEADLOCK:
my_error(ER_XA_RBDEADLOCK, MYF(0));
break;
default:
my_error(ER_XA_RBROLLBACK, MYF(0));
}
xid_state->xa_state= XA_ROLLBACK_ONLY;
}
return (xid_state->xa_state == XA_ROLLBACK_ONLY);
}
/**
Rollback work done on behalf of at ransaction branch.
*/
static bool xa_trans_rollback(THD *thd)
{
/*
Resource Manager error is meaningless at this point, as we perform
explicit rollback request by user. We must reset rm_error before
calling ha_rollback(), so thd->transaction.xid structure gets reset
by ha_rollback()/THD::transaction::cleanup().
*/
thd->transaction.xid_state.rm_error= 0;
bool status= test(ha_rollback(thd));
thd->options&= ~(ulong) OPTION_BEGIN;
thd->transaction.all.modified_non_trans_table= FALSE;
thd->server_status&= ~SERVER_STATUS_IN_TRANS;
xid_cache_delete(&thd->transaction.xid_state);
thd->transaction.xid_state.xa_state= XA_NOTR;
return status;
}
bool end_active_trans(THD *thd)
{
int error=0;
DBUG_ENTER("end_active_trans");
if (unlikely(thd->in_sub_stmt))
{
my_error(ER_COMMIT_NOT_ALLOWED_IN_SF_OR_TRG, MYF(0));
DBUG_RETURN(1);
}
if (thd->transaction.xid_state.xa_state != XA_NOTR)
{
my_error(ER_XAER_RMFAIL, MYF(0),
xa_state_names[thd->transaction.xid_state.xa_state]);
DBUG_RETURN(1);
}
if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN |
OPTION_TABLE_LOCK))
{
DBUG_PRINT("info",("options: 0x%llx", thd->options));
/* Safety if one did "drop table" on locked tables */
if (!thd->locked_tables_mode)
thd->options&= ~OPTION_TABLE_LOCK;
thd->server_status&= ~SERVER_STATUS_IN_TRANS;
if (ha_commit(thd))
error=1;
}
thd->options&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
thd->transaction.all.modified_non_trans_table= FALSE;
DBUG_RETURN(error);
}
bool begin_trans(THD *thd)
{
int error=0;
if (unlikely(thd->in_sub_stmt))
{
my_error(ER_COMMIT_NOT_ALLOWED_IN_SF_OR_TRG, MYF(0));
return 1;
}
thd->locked_tables_list.unlock_locked_tables(thd);
if (end_active_trans(thd))
error= -1;
else
{
thd->options|= OPTION_BEGIN;
thd->server_status|= SERVER_STATUS_IN_TRANS;
}
return error;
}
#ifdef HAVE_REPLICATION
/**
@@ -256,9 +150,9 @@ static bool opt_implicit_commit(THD *thd, uint mask)
if (!skip)
{
/* Commit or rollback the statement transaction. */
ha_autocommit_or_rollback(thd, thd->is_error());
thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
/* Commit the normal transaction if one is active. */
res= end_active_trans(thd);
res= trans_commit_implicit(thd);
}
DBUG_RETURN(res);
@@ -691,80 +585,6 @@ void cleanup_items(Item *item)
DBUG_VOID_RETURN;
}
/**
Ends the current transaction and (maybe) begin the next.
@param thd Current thread
@param completion Completion type
@retval
0 OK
*/
int end_trans(THD *thd, enum enum_mysql_completiontype completion)
{
bool do_release= 0;
int res= 0;
DBUG_ENTER("end_trans");
if (unlikely(thd->in_sub_stmt))
{
my_error(ER_COMMIT_NOT_ALLOWED_IN_SF_OR_TRG, MYF(0));
DBUG_RETURN(1);
}
if (thd->transaction.xid_state.xa_state != XA_NOTR)
{
my_error(ER_XAER_RMFAIL, MYF(0),
xa_state_names[thd->transaction.xid_state.xa_state]);
DBUG_RETURN(1);
}
switch (completion) {
case COMMIT:
/*
We don't use end_active_trans() here to ensure that this works
even if there is a problem with the OPTION_AUTO_COMMIT flag
(Which of course should never happen...)
*/
thd->server_status&= ~SERVER_STATUS_IN_TRANS;
res= ha_commit(thd);
thd->options&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
thd->transaction.all.modified_non_trans_table= FALSE;
break;
case COMMIT_RELEASE:
do_release= 1; /* fall through */
case COMMIT_AND_CHAIN:
res= end_active_trans(thd);
if (!res && completion == COMMIT_AND_CHAIN)
res= begin_trans(thd);
break;
case ROLLBACK_RELEASE:
do_release= 1; /* fall through */
case ROLLBACK:
case ROLLBACK_AND_CHAIN:
{
thd->server_status&= ~SERVER_STATUS_IN_TRANS;
if (ha_rollback(thd))
res= -1;
thd->options&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
thd->transaction.all.modified_non_trans_table= FALSE;
if (!res && (completion == ROLLBACK_AND_CHAIN))
res= begin_trans(thd);
break;
}
default:
res= -1;
my_error(ER_UNKNOWN_COM_ERROR, MYF(0));
DBUG_RETURN(-1);
}
if (res < 0)
my_error(thd->killed_errno(), MYF(0));
else if ((res == 0) && do_release)
thd->killed= THD::KILL_CONNECTION;
DBUG_RETURN(res);
}
#ifndef EMBEDDED_LIBRARY
/**
@@ -1346,7 +1166,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
bool not_used;
status_var_increment(thd->status_var.com_stat[SQLCOM_FLUSH]);
ulong options= (ulong) (uchar) packet[0];
if (end_active_trans(thd))
if (trans_commit_implicit(thd))
break;
if (check_global_access(thd,RELOAD_ACL))
break;
@@ -1374,7 +1194,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
#endif
if (reload_acl_and_cache(thd, options, (TABLE_LIST*) 0, &not_used))
break;
if (end_active_trans(thd))
if (trans_commit_implicit(thd))
break;
my_ok(thd);
break;
@@ -1532,7 +1352,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
/* If commit fails, we should be able to reset the OK status. */
thd->stmt_da->can_overwrite_status= TRUE;
ha_autocommit_or_rollback(thd, thd->is_error());
thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
thd->stmt_da->can_overwrite_status= FALSE;
thd->transaction.stmt.reset();
@@ -3485,7 +3305,7 @@ end_with_restore_list:
thd->locked_tables_list.unlock_locked_tables(thd);
if (thd->options & OPTION_TABLE_LOCK)
{
end_active_trans(thd);
trans_commit_implicit(thd);
thd->options&= ~(OPTION_TABLE_LOCK);
}
if (thd->global_read_lock)
@@ -3495,7 +3315,7 @@ end_with_restore_list:
case SQLCOM_LOCK_TABLES:
thd->locked_tables_list.unlock_locked_tables(thd);
/* we must end the trasaction first, regardless of anything */
if (end_active_trans(thd))
if (trans_commit_implicit(thd))
goto error;
if (check_table_access(thd, LOCK_TABLES_ACL | SELECT_ACL, all_tables,
FALSE, UINT_MAX, FALSE))
@@ -3524,8 +3344,8 @@ end_with_restore_list:
can free its locks if LOCK TABLES locked some tables before finding
that it can't lock a table in its list
*/
ha_autocommit_or_rollback(thd, 1);
end_active_trans(thd);
trans_rollback_stmt(thd);
trans_commit_implicit(thd);
thd->options&= ~(OPTION_TABLE_LOCK);
}
else
@@ -4007,132 +3827,50 @@ end_with_restore_list:
break;
case SQLCOM_BEGIN:
if (thd->transaction.xid_state.xa_state != XA_NOTR)
{
my_error(ER_XAER_RMFAIL, MYF(0),
xa_state_names[thd->transaction.xid_state.xa_state]);
break;
}
if (begin_trans(thd))
if (trans_begin(thd, lex->start_transaction_opt))
goto error;
if (lex->start_transaction_opt & MYSQL_START_TRANS_OPT_WITH_CONS_SNAPSHOT)
{
if (ha_start_consistent_snapshot(thd))
goto error;
}
my_ok(thd);
break;
case SQLCOM_COMMIT:
DBUG_ASSERT(thd->lock == NULL ||
thd->locked_tables_mode == LTM_LOCK_TABLES);
if (end_trans(thd, lex->tx_release ? COMMIT_RELEASE :
lex->tx_chain ? COMMIT_AND_CHAIN : COMMIT))
if (trans_commit(thd))
goto error;
/* Begin transaction with the same isolation level. */
if (lex->tx_chain && trans_begin(thd))
goto error;
/* Disconnect the current client connection. */
if (lex->tx_release)
thd->killed= THD::KILL_CONNECTION;
my_ok(thd);
break;
case SQLCOM_ROLLBACK:
DBUG_ASSERT(thd->lock == NULL ||
thd->locked_tables_mode == LTM_LOCK_TABLES);
if (end_trans(thd, lex->tx_release ? ROLLBACK_RELEASE :
lex->tx_chain ? ROLLBACK_AND_CHAIN : ROLLBACK))
if (trans_rollback(thd))
goto error;
/* Begin transaction with the same isolation level. */
if (lex->tx_chain && trans_begin(thd))
goto error;
/* Disconnect the current client connection. */
if (lex->tx_release)
thd->killed= THD::KILL_CONNECTION;
my_ok(thd);
break;
case SQLCOM_RELEASE_SAVEPOINT:
{
SAVEPOINT *sv;
for (sv=thd->transaction.savepoints; sv; sv=sv->prev)
{
if (my_strnncoll(system_charset_info,
(uchar *)lex->ident.str, lex->ident.length,
(uchar *)sv->name, sv->length) == 0)
break;
}
if (sv)
{
if (ha_release_savepoint(thd, sv))
res= TRUE; // cannot happen
else
my_ok(thd);
thd->transaction.savepoints=sv->prev;
}
else
my_error(ER_SP_DOES_NOT_EXIST, MYF(0), "SAVEPOINT", lex->ident.str);
if (trans_release_savepoint(thd, lex->ident))
goto error;
my_ok(thd);
break;
}
case SQLCOM_ROLLBACK_TO_SAVEPOINT:
{
SAVEPOINT *sv;
for (sv=thd->transaction.savepoints; sv; sv=sv->prev)
{
if (my_strnncoll(system_charset_info,
(uchar *)lex->ident.str, lex->ident.length,
(uchar *)sv->name, sv->length) == 0)
break;
}
if (sv)
{
if (ha_rollback_to_savepoint(thd, sv))
res= TRUE; // cannot happen
else
{
if (((thd->options & OPTION_KEEP_LOG) ||
thd->transaction.all.modified_non_trans_table) &&
!thd->slave_thread)
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
ER_WARNING_NOT_COMPLETE_ROLLBACK,
ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
my_ok(thd);
}
thd->transaction.savepoints=sv;
}
else
my_error(ER_SP_DOES_NOT_EXIST, MYF(0), "SAVEPOINT", lex->ident.str);
if (trans_rollback_to_savepoint(thd, lex->ident))
goto error;
my_ok(thd);
break;
}
case SQLCOM_SAVEPOINT:
if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN) ||
thd->in_sub_stmt) || !opt_using_transactions)
my_ok(thd);
else
{
SAVEPOINT **sv, *newsv;
for (sv=&thd->transaction.savepoints; *sv; sv=&(*sv)->prev)
{
if (my_strnncoll(system_charset_info,
(uchar *)lex->ident.str, lex->ident.length,
(uchar *)(*sv)->name, (*sv)->length) == 0)
break;
}
if (*sv) /* old savepoint of the same name exists */
{
newsv=*sv;
ha_release_savepoint(thd, *sv); // it cannot fail
*sv=(*sv)->prev;
}
else if ((newsv=(SAVEPOINT *) alloc_root(&thd->transaction.mem_root,
savepoint_alloc_size)) == 0)
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
break;
}
newsv->name=strmake_root(&thd->transaction.mem_root,
lex->ident.str, lex->ident.length);
newsv->length=lex->ident.length;
/*
if we'll get an error here, don't add new savepoint to the list.
we'll lose a little bit of memory in transaction mem_root, but it'll
be free'd when transaction ends anyway
*/
if (ha_savepoint(thd, newsv))
res= TRUE;
else
{
newsv->prev=thd->transaction.savepoints;
thd->transaction.savepoints=newsv;
my_ok(thd);
}
}
if (trans_savepoint(thd, lex->ident))
goto error;
my_ok(thd);
break;
case SQLCOM_CREATE_PROCEDURE:
case SQLCOM_CREATE_SPFUNCTION:
@@ -4456,7 +4194,7 @@ create_sp_error:
lex->sql_command == SQLCOM_DROP_PROCEDURE, 0))
goto error;
if (end_active_trans(thd))
if (trans_commit_implicit(thd))
goto error;
#ifndef NO_EMBEDDED_ACCESS_CHECKS
if (sp_automatic_privileges && !opt_noacl &&
@@ -4620,185 +4358,29 @@ create_sp_error:
break;
}
case SQLCOM_XA_START:
if (thd->transaction.xid_state.xa_state == XA_IDLE &&
thd->lex->xa_opt == XA_RESUME)
{
if (! thd->transaction.xid_state.xid.eq(thd->lex->xid))
{
my_error(ER_XAER_NOTA, MYF(0));
break;
}
thd->transaction.xid_state.xa_state=XA_ACTIVE;
my_ok(thd);
break;
}
if (thd->lex->xa_opt != XA_NONE)
{ // JOIN is not supported yet. TODO
my_error(ER_XAER_INVAL, MYF(0));
break;
}
if (thd->transaction.xid_state.xa_state != XA_NOTR)
{
my_error(ER_XAER_RMFAIL, MYF(0),
xa_state_names[thd->transaction.xid_state.xa_state]);
break;
}
if (thd->locked_tables_mode || thd->active_transaction())
{
my_error(ER_XAER_OUTSIDE, MYF(0));
break;
}
if (xid_cache_search(thd->lex->xid))
{
my_error(ER_XAER_DUPID, MYF(0));
break;
}
DBUG_ASSERT(thd->transaction.xid_state.xid.is_null());
thd->transaction.xid_state.xa_state=XA_ACTIVE;
thd->transaction.xid_state.rm_error= 0;
thd->transaction.xid_state.xid.set(thd->lex->xid);
xid_cache_insert(&thd->transaction.xid_state);
thd->transaction.all.modified_non_trans_table= FALSE;
thd->options= ((thd->options & ~(OPTION_KEEP_LOG)) | OPTION_BEGIN);
thd->server_status|= SERVER_STATUS_IN_TRANS;
if (trans_xa_start(thd))
goto error;
my_ok(thd);
break;
case SQLCOM_XA_END:
/* fake it */
if (thd->lex->xa_opt != XA_NONE)
{ // SUSPEND and FOR MIGRATE are not supported yet. TODO
my_error(ER_XAER_INVAL, MYF(0));
break;
}
if (thd->transaction.xid_state.xa_state != XA_ACTIVE)
{
my_error(ER_XAER_RMFAIL, MYF(0),
xa_state_names[thd->transaction.xid_state.xa_state]);
break;
}
if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
{
my_error(ER_XAER_NOTA, MYF(0));
break;
}
if (xa_trans_rolled_back(&thd->transaction.xid_state))
break;
thd->transaction.xid_state.xa_state=XA_IDLE;
if (trans_xa_end(thd))
goto error;
my_ok(thd);
break;
case SQLCOM_XA_PREPARE:
if (thd->transaction.xid_state.xa_state != XA_IDLE)
{
my_error(ER_XAER_RMFAIL, MYF(0),
xa_state_names[thd->transaction.xid_state.xa_state]);
break;
}
if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
{
my_error(ER_XAER_NOTA, MYF(0));
break;
}
if (ha_prepare(thd))
{
my_error(ER_XA_RBROLLBACK, MYF(0));
xid_cache_delete(&thd->transaction.xid_state);
thd->transaction.xid_state.xa_state=XA_NOTR;
break;
}
thd->transaction.xid_state.xa_state=XA_PREPARED;
if (trans_xa_prepare(thd))
goto error;
my_ok(thd);
break;
case SQLCOM_XA_COMMIT:
if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
{
XID_STATE *xs=xid_cache_search(thd->lex->xid);
if (!xs || xs->in_thd)
my_error(ER_XAER_NOTA, MYF(0));
else if (xa_trans_rolled_back(xs))
{
ha_commit_or_rollback_by_xid(thd->lex->xid, 0);
xid_cache_delete(xs);
break;
}
else
{
ha_commit_or_rollback_by_xid(thd->lex->xid, 1);
xid_cache_delete(xs);
my_ok(thd);
}
break;
}
if (xa_trans_rolled_back(&thd->transaction.xid_state))
{
xa_trans_rollback(thd);
break;
}
if (thd->transaction.xid_state.xa_state == XA_IDLE &&
thd->lex->xa_opt == XA_ONE_PHASE)
{
int r;
if ((r= ha_commit(thd)))
my_error(r == 1 ? ER_XA_RBROLLBACK : ER_XAER_RMERR, MYF(0));
else
my_ok(thd);
}
else if (thd->transaction.xid_state.xa_state == XA_PREPARED &&
thd->lex->xa_opt == XA_NONE)
{
if (wait_if_global_read_lock(thd, 0, 0))
{
ha_rollback(thd);
my_error(ER_XAER_RMERR, MYF(0));
}
else
{
if (ha_commit_one_phase(thd, 1))
my_error(ER_XAER_RMERR, MYF(0));
else
my_ok(thd);
start_waiting_global_read_lock(thd);
}
}
else
{
my_error(ER_XAER_RMFAIL, MYF(0),
xa_state_names[thd->transaction.xid_state.xa_state]);
break;
}
thd->options&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
thd->transaction.all.modified_non_trans_table= FALSE;
thd->server_status&= ~SERVER_STATUS_IN_TRANS;
xid_cache_delete(&thd->transaction.xid_state);
thd->transaction.xid_state.xa_state=XA_NOTR;
if (trans_xa_commit(thd))
goto error;
my_ok(thd);
break;
case SQLCOM_XA_ROLLBACK:
if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
{
XID_STATE *xs=xid_cache_search(thd->lex->xid);
if (!xs || xs->in_thd)
my_error(ER_XAER_NOTA, MYF(0));
else
{
bool ok= !xa_trans_rolled_back(xs);
ha_commit_or_rollback_by_xid(thd->lex->xid, 0);
xid_cache_delete(xs);
if (ok)
my_ok(thd);
}
break;
}
if (thd->transaction.xid_state.xa_state != XA_IDLE &&
thd->transaction.xid_state.xa_state != XA_PREPARED &&
thd->transaction.xid_state.xa_state != XA_ROLLBACK_ONLY)
{
my_error(ER_XAER_RMFAIL, MYF(0),
xa_state_names[thd->transaction.xid_state.xa_state]);
break;
}
if (xa_trans_rollback(thd))
my_error(ER_XAER_RMERR, MYF(0));
else
my_ok(thd);
if (trans_xa_rollback(thd))
goto error;
my_ok(thd);
break;
case SQLCOM_XA_RECOVER:
res= mysql_xa_recover(thd);