1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-29 05:21:33 +03:00
Avoiding ha_kill_query for aborts initiated by replicator
This commit is contained in:
Seppo Jaakola
2013-02-17 00:22:40 +02:00
parent 2a6aa0a312
commit 7144f7f2d1
8 changed files with 26 additions and 16 deletions

View File

@ -893,10 +893,11 @@ extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id)
{ {
thd->wsrep_last_query_id= id; thd->wsrep_last_query_id= id;
} }
extern "C" void wsrep_thd_awake(THD *thd, my_bool signal) extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal)
{ {
if (signal) if (signal)
{ {
thd->wsrep_bf_thd = bf_thd;
mysql_mutex_lock(&thd->LOCK_thd_data); mysql_mutex_lock(&thd->LOCK_thd_data);
thd->awake(KILL_QUERY); thd->awake(KILL_QUERY);
mysql_mutex_unlock(&thd->LOCK_thd_data); mysql_mutex_unlock(&thd->LOCK_thd_data);
@ -1114,7 +1115,7 @@ THD::THD()
wsrep_consistency_check = NO_CONSISTENCY_CHECK; wsrep_consistency_check = NO_CONSISTENCY_CHECK;
wsrep_status_vars = 0; wsrep_status_vars = 0;
wsrep_mysql_replicated = 0; wsrep_mysql_replicated = 0;
wsrep_bf_thd = NULL;
#endif #endif
/* Call to init() below requires fully initialized Open_tables_state. */ /* Call to init() below requires fully initialized Open_tables_state. */
reset_open_tables_state(this); reset_open_tables_state(this);
@ -1469,6 +1470,7 @@ void THD::init(void)
wsrep_seqno_changed= false; wsrep_seqno_changed= false;
wsrep_consistency_check = NO_CONSISTENCY_CHECK; wsrep_consistency_check = NO_CONSISTENCY_CHECK;
wsrep_mysql_replicated = 0; wsrep_mysql_replicated = 0;
wsrep_bf_thd = NULL;
#endif #endif
if (variables.sql_log_bin) if (variables.sql_log_bin)
variables.option_bits|= OPTION_BIN_LOG; variables.option_bits|= OPTION_BIN_LOG;
@ -1836,8 +1838,11 @@ void THD::awake(killed_state state_to_set)
/* Interrupt target waiting inside a storage engine. */ /* Interrupt target waiting inside a storage engine. */
if (state_to_set != NOT_KILLED) if (state_to_set != NOT_KILLED)
#ifdef WITH_WSREP
if (!wsrep_bf_thd || wsrep_bf_thd->wsrep_exec_mode == LOCAL_STATE)
#else
ha_kill_query(this, thd_kill_level(this)); ha_kill_query(this, thd_kill_level(this));
#endif /* WITH_WSREP */
/* Broadcast a condition to kick the target if it is waiting on it. */ /* Broadcast a condition to kick the target if it is waiting on it. */
if (mysys_var) if (mysys_var)
{ {

View File

@ -2376,6 +2376,7 @@ public:
wsrep_consistency_check; wsrep_consistency_check;
wsrep_stats_var* wsrep_status_vars; wsrep_stats_var* wsrep_status_vars;
int wsrep_mysql_replicated; int wsrep_mysql_replicated;
THD* wsrep_bf_thd;
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
/** /**
Internal parser state. Internal parser state.

View File

@ -786,6 +786,7 @@ bool do_command(THD *thd)
else if (thd->wsrep_conflict_state == ABORTED) else if (thd->wsrep_conflict_state == ABORTED)
{ {
thd->store_globals(); thd->store_globals();
thd->wsrep_bf_thd = NULL;
} }
thd->wsrep_query_state= QUERY_EXEC; thd->wsrep_query_state= QUERY_EXEC;
@ -1059,6 +1060,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
thd->mysys_var->abort = 0; thd->mysys_var->abort = 0;
thd->wsrep_conflict_state = NO_CONFLICT; thd->wsrep_conflict_state = NO_CONFLICT;
thd->wsrep_retry_counter = 0; thd->wsrep_retry_counter = 0;
thd->wsrep_bf_thd = NULL;
/* /*
Increment threads running to compensate dec_thread_running() called Increment threads running to compensate dec_thread_running() called
after dispatch_end label. after dispatch_end label.
@ -6131,6 +6133,7 @@ static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length,
my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction");
} }
thd->wsrep_conflict_state= ABORTED; thd->wsrep_conflict_state= ABORTED;
thd->wsrep_bf_thd = NULL;
wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle); wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle);
break; break;
default: default:
@ -7992,6 +7995,7 @@ static void wsrep_client_rollback(THD *thd)
} }
mysql_mutex_lock(&thd->LOCK_wsrep_thd); mysql_mutex_lock(&thd->LOCK_wsrep_thd);
thd->wsrep_conflict_state= ABORTED; thd->wsrep_conflict_state= ABORTED;
thd->wsrep_bf_thd = NULL;
} }
static enum wsrep_status wsrep_apply_sql( static enum wsrep_status wsrep_apply_sql(

View File

@ -205,7 +205,7 @@ extern "C" query_id_t wsrep_thd_query_id(THD *thd);
extern "C" char * wsrep_thd_query(THD *thd); extern "C" char * wsrep_thd_query(THD *thd);
extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd); extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd);
extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id); extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id);
extern "C" void wsrep_thd_awake(THD *thd, my_bool signal); extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal);

View File

@ -12376,7 +12376,7 @@ wsrep_innobase_kill_one_trx(void *bf_thd_ptr, trx_t *bf_trx, trx_t *victim_trx,
WSREP_DEBUG("victim %llu in MUST ABORT state", WSREP_DEBUG("victim %llu in MUST ABORT state",
victim_trx->id); victim_trx->id);
wsrep_thd_UNLOCK(thd); wsrep_thd_UNLOCK(thd);
wsrep_thd_awake(thd, signal); wsrep_thd_awake(bf_thd, thd, signal);
DBUG_RETURN(0); DBUG_RETURN(0);
break; break;
case ABORTED: case ABORTED:
@ -12395,7 +12395,7 @@ wsrep_innobase_kill_one_trx(void *bf_thd_ptr, trx_t *bf_trx, trx_t *victim_trx,
WSREP_DEBUG("kill query for: %ld", WSREP_DEBUG("kill query for: %ld",
wsrep_thd_thread_id(thd)); wsrep_thd_thread_id(thd));
wsrep_thd_awake(thd, signal); wsrep_thd_awake(bf_thd, thd, signal);
WSREP_DEBUG("kill trx QUERY_COMMITTING for %llu", WSREP_DEBUG("kill trx QUERY_COMMITTING for %llu",
victim_trx->id); victim_trx->id);
@ -12448,14 +12448,14 @@ wsrep_innobase_kill_one_trx(void *bf_thd_ptr, trx_t *bf_trx, trx_t *victim_trx,
lock_cancel_waiting_and_release(wait_lock); lock_cancel_waiting_and_release(wait_lock);
} }
wsrep_thd_awake(thd, signal); wsrep_thd_awake(bf_thd, thd, signal);
} else { } else {
/* abort currently executing query */ /* abort currently executing query */
DBUG_PRINT("wsrep",("sending KILL_QUERY to: %ld", DBUG_PRINT("wsrep",("sending KILL_QUERY to: %ld",
wsrep_thd_thread_id(thd))); wsrep_thd_thread_id(thd)));
WSREP_DEBUG("kill query for: %ld", WSREP_DEBUG("kill query for: %ld",
wsrep_thd_thread_id(thd)); wsrep_thd_thread_id(thd));
wsrep_thd_awake(thd, signal); wsrep_thd_awake(bf_thd, thd, signal);
/* for BF thd, we need to prevent him from committing */ /* for BF thd, we need to prevent him from committing */
if (wsrep_thd_exec_mode(thd) == REPL_RECV) { if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
@ -12546,7 +12546,7 @@ wsrep_abort_transaction(handlerton* hton, THD *bf_thd, THD *victim_thd,
wsrep_thd_LOCK(victim_thd); wsrep_thd_LOCK(victim_thd);
wsrep_thd_set_conflict_state(victim_thd, MUST_ABORT); wsrep_thd_set_conflict_state(victim_thd, MUST_ABORT);
wsrep_thd_UNLOCK(victim_thd); wsrep_thd_UNLOCK(victim_thd);
wsrep_thd_awake(victim_thd, signal); wsrep_thd_awake(bf_thd, victim_thd, signal);
} }
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }

View File

@ -328,7 +328,7 @@ extern "C" query_id_t wsrep_thd_query_id(THD *thd);
extern "C" char * wsrep_thd_query(THD *thd); extern "C" char * wsrep_thd_query(THD *thd);
extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd); extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd);
extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id); extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id);
extern "C" void wsrep_thd_awake(THD *thd, my_bool signal); extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal);
#endif #endif
typedef struct trx_struct trx_t; typedef struct trx_struct trx_t;
/********************************************************************//** /********************************************************************//**

View File

@ -13502,7 +13502,7 @@ wsrep_innobase_kill_one_trx(trx_t *bf_trx, trx_t *victim_trx, ibool signal)
WSREP_DEBUG("victim %llu in MUST ABORT state", WSREP_DEBUG("victim %llu in MUST ABORT state",
victim_trx->id); victim_trx->id);
wsrep_thd_UNLOCK(thd); wsrep_thd_UNLOCK(thd);
wsrep_thd_awake(thd, signal); wsrep_thd_awake(bf_thd, thd, signal);
DBUG_RETURN(0); DBUG_RETURN(0);
break; break;
case ABORTED: case ABORTED:
@ -13521,7 +13521,7 @@ wsrep_innobase_kill_one_trx(trx_t *bf_trx, trx_t *victim_trx, ibool signal)
WSREP_DEBUG("kill query for: %ld", WSREP_DEBUG("kill query for: %ld",
wsrep_thd_thread_id(thd)); wsrep_thd_thread_id(thd));
wsrep_thd_awake(thd, signal); wsrep_thd_awake(bf_thd, thd, signal);
WSREP_DEBUG("kill trx QUERY_COMMITTING for %llu", WSREP_DEBUG("kill trx QUERY_COMMITTING for %llu",
victim_trx->id); victim_trx->id);
@ -13574,14 +13574,14 @@ wsrep_innobase_kill_one_trx(trx_t *bf_trx, trx_t *victim_trx, ibool signal)
lock_cancel_waiting_and_release(wait_lock); lock_cancel_waiting_and_release(wait_lock);
} }
wsrep_thd_awake(thd, signal); wsrep_thd_awake(bf_thd, thd, signal);
} else { } else {
/* abort currently executing query */ /* abort currently executing query */
DBUG_PRINT("wsrep",("sending KILL_QUERY to: %ld", DBUG_PRINT("wsrep",("sending KILL_QUERY to: %ld",
wsrep_thd_thread_id(thd))); wsrep_thd_thread_id(thd)));
WSREP_DEBUG("kill query for: %ld", WSREP_DEBUG("kill query for: %ld",
wsrep_thd_thread_id(thd)); wsrep_thd_thread_id(thd));
wsrep_thd_awake(thd, signal); wsrep_thd_awake(bf_thd, thd, signal);
/* for BF thd, we need to prevent him from committing */ /* for BF thd, we need to prevent him from committing */
if (wsrep_thd_exec_mode(thd) == REPL_RECV) { if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
@ -13672,7 +13672,7 @@ wsrep_abort_transaction(handlerton* hton, THD *bf_thd, THD *victim_thd,
wsrep_thd_LOCK(victim_thd); wsrep_thd_LOCK(victim_thd);
wsrep_thd_set_conflict_state(victim_thd, MUST_ABORT); wsrep_thd_set_conflict_state(victim_thd, MUST_ABORT);
wsrep_thd_UNLOCK(victim_thd); wsrep_thd_UNLOCK(victim_thd);
wsrep_thd_awake(victim_thd, signal); wsrep_thd_awake(bf_thd, victim_thd, signal);
} }
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }

View File

@ -404,7 +404,7 @@ extern "C" query_id_t wsrep_thd_query_id(THD *thd);
extern "C" char * wsrep_thd_query(THD *thd); extern "C" char * wsrep_thd_query(THD *thd);
extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd); extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd);
extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id); extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id);
extern "C" void wsrep_thd_awake(THD *thd, my_bool signal); extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal);
#endif #endif
typedef struct trx_struct trx_t; typedef struct trx_struct trx_t;
/********************************************************************//** /********************************************************************//**