diff --git a/dbsim/db_storage_service.hpp b/dbsim/db_storage_service.hpp index b979af0..00279ab 100644 --- a/dbsim/db_storage_service.hpp +++ b/dbsim/db_storage_service.hpp @@ -23,6 +23,8 @@ namespace db { throw wsrep::not_implemented_error(); } int update_fragment_meta(const wsrep::ws_meta&) override { throw wsrep::not_implemented_error(); } + int remove_fragments() override + { throw wsrep::not_implemented_error(); } int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override { throw wsrep::not_implemented_error(); } int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) diff --git a/include/wsrep/storage_service.hpp b/include/wsrep/storage_service.hpp index f4e68a1..46dece0 100644 --- a/include/wsrep/storage_service.hpp +++ b/include/wsrep/storage_service.hpp @@ -55,6 +55,12 @@ namespace wsrep */ virtual int update_fragment_meta(const wsrep::ws_meta&) = 0; + /** + * Remove fragments from storage. The storage service must have + * adopted a transaction prior this call. + */ + virtual int remove_fragments() = 0; + /** * Commit the transaction. */ diff --git a/src/transaction.cpp b/src/transaction.cpp index d708fe3..318101f 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -608,6 +608,24 @@ int wsrep::transaction::before_rollback() assert(0); break; } + + if (is_streaming() && bf_aborted_in_total_order_) + { + lock.unlock(); + scoped_storage_service + sr_scope( + client_service_, + server_service_.storage_service(client_service_), + storage_service_deleter(server_service_)); + wsrep::storage_service& storage_service( + sr_scope.storage_service()); + storage_service.adopt_transaction(*this); + storage_service.remove_fragments(); + storage_service.commit(wsrep::ws_handle(), wsrep::ws_meta()); + lock.lock(); + streaming_context_.cleanup(); + } + debug_log_state("before_rollback_leave"); return 0; } @@ -1156,6 +1174,7 @@ int wsrep::transaction::certify_fragment( streaming_rollback(); } client_state_.override_error(wsrep::e_deadlock_error, cert_ret); + ret = 1; } else { @@ -1366,6 +1385,7 @@ void wsrep::transaction::streaming_rollback() if (bf_aborted_in_total_order_) { client_state_.server_state_.stop_streaming_client(&client_state_); + streaming_context_.rolled_back(id_); } else { @@ -1378,8 +1398,6 @@ void wsrep::transaction::streaming_rollback() &client_state_); streaming_context_.cleanup(); - // Send a rollback fragment only if it was not sent before - // for this transaction. streaming_context_.rolled_back(id_); enum wsrep::provider::status ret; if ((ret = provider().rollback(id_))) diff --git a/test/mock_storage_service.hpp b/test/mock_storage_service.hpp index f00518f..f798770 100644 --- a/test/mock_storage_service.hpp +++ b/test/mock_storage_service.hpp @@ -31,7 +31,7 @@ class mock_server_state; int update_fragment_meta(const wsrep::ws_meta&) WSREP_OVERRIDE { return 0; } - + int remove_fragments() WSREP_OVERRIDE { return 0; } int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) WSREP_OVERRIDE;