1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Return error code from high_priority_service::adopt_transaction()

Adopt transaction may need to start a new transaction on DBMS side,
allow returning an error if the transaction start fails.
This commit is contained in:
Teemu Ollakka
2019-02-25 09:27:17 +02:00
parent c61811ed70
commit badf53a28d
6 changed files with 54 additions and 16 deletions

View File

@ -40,7 +40,7 @@ const wsrep::transaction& db::high_priority_service::transaction() const
return client_.client_state().transaction();
}
void db::high_priority_service::adopt_transaction(const wsrep::transaction&)
int db::high_priority_service::adopt_transaction(const wsrep::transaction&)
{
throw wsrep::not_implemented_error();
}

View File

@ -33,7 +33,7 @@ namespace db
int start_transaction(const wsrep::ws_handle&,
const wsrep::ws_meta&) override;
const wsrep::transaction& transaction() const override;
void adopt_transaction(const wsrep::transaction&) override;
int adopt_transaction(const wsrep::transaction&) override;
int apply_write_set(const wsrep::ws_meta&,
const wsrep::const_buffer&) override;
int append_fragment_and_commit(

View File

@ -60,7 +60,7 @@ namespace wsrep
/**
* Adopt a transaction.
*/
virtual void adopt_transaction(const wsrep::transaction&) = 0;
virtual int adopt_transaction(const wsrep::transaction&) = 0;
/**
* Apply a write set.

View File

@ -49,6 +49,17 @@ static bool is_bootstrap(const std::string& cluster_address, bool bootstrap)
return (bootstrap || cluster_address == "gcomm://");
}
// Helper method to provide detailed error message if transaction
// adopt for fragment removal fails.
static void log_adopt_error(const wsrep::transaction& transaction)
{
wsrep::log_warning() << "Adopting a transaction ("
<< transaction.server_id() << "," << transaction.id()
<< ") for rollback failed, "
<< "this may leave stale entries to streaming log "
<< "which may need to be removed manually.";
}
static int apply_fragment(wsrep::high_priority_service& high_priority_service,
wsrep::high_priority_service& streaming_applier,
const wsrep::ws_handle& ws_handle,
@ -114,11 +125,18 @@ static int rollback_fragment(wsrep::server_state& server_state,
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta)
{
int ret= 0;
int ret(0);
// Adopts transaction state and starts a transaction for
// high priority service
high_priority_service.adopt_transaction(
streaming_applier->transaction());
int adopt_error;
if ((adopt_error = high_priority_service.adopt_transaction(
streaming_applier->transaction())))
{
log_adopt_error(streaming_applier->transaction());
}
// Even if the adopt above fails we roll back the streaming transaction.
// Adopt failure will leave stale entries in streaming log which can
// be removed manually.
{
wsrep::high_priority_switch ws(
high_priority_service, *streaming_applier);
@ -132,8 +150,11 @@ static int rollback_fragment(wsrep::server_state& server_state,
server_state.server_service().release_high_priority_service(
streaming_applier);
if (adopt_error == 0)
{
high_priority_service.remove_fragments(ws_meta);
high_priority_service.commit(ws_handle, ws_meta);
}
high_priority_service.after_apply();
return ret;
}
@ -1014,7 +1035,13 @@ void wsrep::server_state::convert_streaming_client_to_applier(
wsrep::high_priority_service* streaming_applier(
server_service_.streaming_applier_service(
client_state->client_service()));
streaming_applier->adopt_transaction(client_state->transaction());
if (streaming_applier->adopt_transaction(client_state->transaction()))
{
log_adopt_error(client_state->transaction());
streaming_applier->after_apply();
server_service_.release_high_priority_service(streaming_applier);
return;
}
if (streaming_appliers_.insert(
std::make_pair(
std::make_pair(client_state->transaction().server_id(),
@ -1275,8 +1302,15 @@ void wsrep::server_state::close_orphaned_sr_transactions(
wsrep::id server_id(i->first.first);
wsrep::transaction_id transaction_id(i->first.second);
wsrep::high_priority_service* streaming_applier(i->second);
high_priority_service.adopt_transaction(
streaming_applier->transaction());
int adopt_error;
if ((adopt_error = high_priority_service.adopt_transaction(
streaming_applier->transaction())))
{
log_adopt_error(streaming_applier->transaction());
}
// Even if the transaction adopt above fails, we roll back
// the transaction. Adopt error will leave stale entries
// in the streaming log which can be removed manually.
{
wsrep::high_priority_switch sw(high_priority_service,
*streaming_applier);
@ -1292,9 +1326,12 @@ void wsrep::server_state::close_orphaned_sr_transactions(
wsrep::stid(server_id, transaction_id, wsrep::client_id()),
wsrep::seqno::undefined(), 0);
lock.unlock();
if (adopt_error == 0)
{
high_priority_service.remove_fragments(ws_meta);
high_priority_service.commit(wsrep::ws_handle(transaction_id, 0),
ws_meta);
}
high_priority_service.after_apply();
lock.lock();
}

View File

@ -26,10 +26,11 @@ int wsrep::mock_high_priority_service::start_transaction(
return client_state_->start_transaction(ws_handle, ws_meta);
}
void wsrep::mock_high_priority_service::adopt_transaction(
int wsrep::mock_high_priority_service::adopt_transaction(
const wsrep::transaction& transaction)
{
client_state_->adopt_transaction(transaction);
return 0;
}
int wsrep::mock_high_priority_service::apply_write_set(

View File

@ -45,7 +45,7 @@ namespace wsrep
const wsrep::transaction& transaction() const WSREP_OVERRIDE
{ return client_state_->transaction(); }
void adopt_transaction(const wsrep::transaction&) WSREP_OVERRIDE;
int adopt_transaction(const wsrep::transaction&) WSREP_OVERRIDE;
int apply_write_set(const wsrep::ws_meta&,
const wsrep::const_buffer&) WSREP_OVERRIDE;
int append_fragment_and_commit(