diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 89074ac9ad0..117f23afb99 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -6052,6 +6052,89 @@ void mysql_init_multi_delete(LEX *lex) } #ifdef WITH_WSREP +void wsrep_replay_transaction(THD *thd) +{ + /* checking if BF trx must be replayed */ + if (thd->wsrep_conflict_state== MUST_REPLAY) + { + if (thd->wsrep_exec_mode!= REPL_RECV) + { + if (thd->stmt_da->is_sent) + { + WSREP_ERROR("replay issue, thd has reported status already"); + } + thd->stmt_da->reset_diagnostics_area(); + + thd->wsrep_conflict_state= REPLAYING; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + mysql_reset_thd_for_next_command(thd, opt_userstat_running); + thd->killed= NOT_KILLED; + close_thread_tables(thd); + if (thd->locked_tables_mode && thd->lock) + { + WSREP_DEBUG("releasing table lock for replaying (%ld)", + thd->thread_id); + thd->locked_tables_list.unlock_locked_tables(thd); + thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); + } + thd->mdl_context.release_transactional_locks(); + + thd_proc_info(thd, "wsrep replaying trx"); + WSREP_DEBUG("replay trx: %s %lld", + thd->query() ? thd->query() : "void", + (long long)thd->wsrep_trx_seqno); + struct wsrep_thd_shadow shadow; + wsrep_prepare_bf_thd(thd, &shadow); + int rcode = wsrep->replay_trx(wsrep, + &thd->wsrep_trx_handle, + (void *)thd); + + wsrep_return_from_bf_mode(thd, &shadow); + if (thd->wsrep_conflict_state!= REPLAYING) + WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state ); + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + + switch (rcode) + { + case WSREP_OK: + thd->wsrep_conflict_state= NO_CONFLICT; + wsrep->post_commit(wsrep, &thd->wsrep_trx_handle); + WSREP_DEBUG("trx_replay successful for: %ld %llu", + thd->thread_id, (long long)thd->real_id); + break; + case WSREP_TRX_FAIL: + if (thd->stmt_da->is_sent) + { + WSREP_ERROR("replay failed, thd has reported status"); + } + else + { + WSREP_DEBUG("replay failed, rolling back"); + my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); + } + thd->wsrep_conflict_state= ABORTED; + thd->wsrep_bf_thd = NULL; + wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle); + break; + default: + WSREP_ERROR("trx_replay failed for: %d, query: %s", + rcode, thd->query() ? thd->query() : "void"); + /* we're now in inconsistent state, must abort */ + unireg_abort(1); + break; + } + mysql_mutex_lock(&LOCK_wsrep_replaying); + wsrep_replaying--; + WSREP_DEBUG("replaying decreased: %d, thd: %lu", + wsrep_replaying, thd->thread_id); + mysql_cond_broadcast(&COND_wsrep_replaying); + mysql_mutex_unlock(&LOCK_wsrep_replaying); + } + } +} + static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, Parser_state *parser_state) { @@ -6078,79 +6161,9 @@ static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, WSREP_DEBUG("abort in exec query state, avoiding autocommit"); } - /* checking if BF trx must be replayed */ - if (thd->wsrep_conflict_state== MUST_REPLAY) { - if (thd->wsrep_exec_mode!= REPL_RECV) { - if (thd->stmt_da->is_sent) { - WSREP_ERROR("replay issue, thd has reported status already"); - } - thd->stmt_da->reset_diagnostics_area(); - - thd->wsrep_conflict_state= REPLAYING; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - - mysql_reset_thd_for_next_command(thd, opt_userstat_running); - thd->killed= NOT_KILLED; - close_thread_tables(thd); - if (thd->locked_tables_mode && thd->lock) - { - WSREP_DEBUG("releasing table lock for replaying (%ld)", - thd->thread_id); - thd->locked_tables_list.unlock_locked_tables(thd); - thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); - } - thd->mdl_context.release_transactional_locks(); - - thd_proc_info(thd, "wsrep replaying trx"); - WSREP_DEBUG("replay trx: %s %lld", - thd->query() ? thd->query() : "void", - (long long)thd->wsrep_trx_seqno); - struct wsrep_thd_shadow shadow; - wsrep_prepare_bf_thd(thd, &shadow); - int rcode = wsrep->replay_trx(wsrep, - &thd->wsrep_trx_handle, - (void *)thd); - - wsrep_return_from_bf_mode(thd, &shadow); - if (thd->wsrep_conflict_state!= REPLAYING) - WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state ); - - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - - switch (rcode) { - case WSREP_OK: - thd->wsrep_conflict_state= NO_CONFLICT; - wsrep->post_commit(wsrep, &thd->wsrep_trx_handle); - WSREP_DEBUG("trx_replay successful for: %ld %llu", - thd->thread_id, (long long)thd->real_id); - break; - case WSREP_TRX_FAIL: - if (thd->stmt_da->is_sent) { - WSREP_ERROR("replay failed, thd has reported status"); - } - else - { - WSREP_DEBUG("replay failed, rolling back"); - my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); - } - thd->wsrep_conflict_state= ABORTED; - thd->wsrep_bf_thd = NULL; - wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle); - break; - default: - WSREP_ERROR("trx_replay failed for: %d, query: %s", - rcode, thd->query() ? thd->query() : "void"); - /* we're now in inconsistent state, must abort */ - unireg_abort(1); - break; - } - mysql_mutex_lock(&LOCK_wsrep_replaying); - wsrep_replaying--; - WSREP_DEBUG("replaying decreased: %d, thd: %lu", - wsrep_replaying, thd->thread_id); - mysql_cond_broadcast(&COND_wsrep_replaying); - mysql_mutex_unlock(&LOCK_wsrep_replaying); - } + if (thd->wsrep_conflict_state== MUST_REPLAY) + { + wsrep_replay_transaction(thd); } /* setting error code for BF aborted trxs */ diff --git a/sql/sql_prepare.cc b/sql/sql_prepare.cc index c1f07581106..10aa1ccec26 100644 --- a/sql/sql_prepare.cc +++ b/sql/sql_prepare.cc @@ -3483,7 +3483,9 @@ Prepared_statement::set_parameters(String *expanded_query, return res; } - +#ifdef WITH_WSREP +void wsrep_replay_transaction(THD *thd); +#endif /* WITH_WSREP */ /** Execute a prepared statement. Re-prepare it a limited number of times if necessary. @@ -3563,6 +3565,22 @@ reexecute: error= execute(expanded_query, open_cursor) || thd->is_error(); thd->m_reprepare_observer= NULL; +#ifdef WITH_WSREP + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + switch (thd->wsrep_conflict_state) + { + case CERT_FAILURE: + WSREP_DEBUG("PS execute fail for CERT_FAILURE: thd: %ld err: %d", + thd->thread_id, thd->stmt_da->sql_errno() ); + thd->wsrep_conflict_state = NO_CONFLICT; + break; + + case MUST_REPLAY: + (void)wsrep_replay_transaction(thd); + default: break; + } + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); +#endif /* WITH_WSREP */ if (error && !thd->is_fatal_error && !thd->killed && reprepare_observer.is_invalidated() &&