diff --git a/dbsim/db_high_priority_service.cpp b/dbsim/db_high_priority_service.cpp index 7ed59dc..6eef39d 100644 --- a/dbsim/db_high_priority_service.cpp +++ b/dbsim/db_high_priority_service.cpp @@ -39,7 +39,8 @@ int db::high_priority_service::apply_toi( throw wsrep::not_implemented_error(); } -int db::high_priority_service::commit() +int db::high_priority_service::commit(const wsrep::ws_handle&, + const wsrep::ws_meta&) { int ret(client_.client_state_.before_commit()); if (ret == 0) client_.se_trx_.commit(); diff --git a/dbsim/db_high_priority_service.hpp b/dbsim/db_high_priority_service.hpp index 21f368d..3e4e7ad 100644 --- a/dbsim/db_high_priority_service.hpp +++ b/dbsim/db_high_priority_service.hpp @@ -22,7 +22,7 @@ namespace db int append_fragment(const wsrep::ws_meta&, const wsrep::const_buffer&) override { return 0; } - int commit() override; + int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override; int rollback() override; int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&) override; void after_apply() override; diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index f7b3fcd..402d112 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -313,13 +313,20 @@ namespace wsrep void disable_streaming(); /** - * Prepare write set meta data for fragment storage ordering. - * This method should be called from storage service commit - * or rollback before performing the operation. + * Prepare write set meta data for ordering. + * This method should be called before ordered commit or + * rollback if the commit time meta data was not known + * at the time of the start of the transaction. + * This mostly applies to streaming replication. + * + * @param ws_handle Write set handle + * @param ws_meta Write set meta data + * @param is_commit Boolean to denote whether the operation + * is commit or rollback. */ - int prepare_for_fragment_ordering(const wsrep::ws_handle& ws_handle, - const wsrep::ws_meta& ws_meta, - bool is_commit) + int prepare_for_ordering(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + bool is_commit) { assert(state_ == s_exec); return transaction_.prepare_for_fragment_ordering( diff --git a/include/wsrep/high_priority_service.hpp b/include/wsrep/high_priority_service.hpp index 7a05a1d..1bb1640 100644 --- a/include/wsrep/high_priority_service.hpp +++ b/include/wsrep/high_priority_service.hpp @@ -59,7 +59,7 @@ namespace wsrep /** * Commit a transaction. */ - virtual int commit() = 0; + virtual int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0; /** * Roll back a transaction @@ -98,7 +98,18 @@ namespace wsrep virtual void switch_execution_context( wsrep::high_priority_service& orig_hps) = 0; - virtual int log_dummy_write_set(const ws_handle&, const ws_meta&) = 0; + /** + * Log a dummy write set which is either SR transaction fragment + * or roll back fragment. The implementation must release + * commit order inside the call. + * + * @params ws_handle Write set handle + * @params ws_meta Write set meta data + * + * @return Zero in case of success, non-zero on failure + */ + virtual int log_dummy_write_set(const ws_handle& ws_handle, + const ws_meta& ws_meta) = 0; virtual bool is_replaying() const = 0; diff --git a/src/server_state.cpp b/src/server_state.cpp index b0d75a3..86078e9 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -46,7 +46,7 @@ namespace { ret = 1; } - else if (high_priority_service.commit()) + else if (high_priority_service.commit(ws_handle, ws_meta)) { ret = 1; } @@ -103,7 +103,7 @@ namespace if (high_priority_service.is_replaying()) { ret = high_priority_service.apply_write_set(data) || - high_priority_service.commit(); + high_priority_service.commit(ws_handle, ws_meta); } else { @@ -129,7 +129,7 @@ namespace { wsrep::high_priority_switch sw( high_priority_service, *sa); - ret = sa->commit(); + ret = sa->commit(ws_handle, ws_meta); sa->after_apply(); } server_state.stop_streaming_applier( diff --git a/src/transaction.cpp b/src/transaction.cpp index 5886052..32a28af 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -134,9 +134,10 @@ int wsrep::transaction::start_transaction( const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) { + debug_log_state("start_transaction enter"); if (state() != s_replaying) { - assert(ws_meta.flags()); + // assert(ws_meta.flags()); assert(active() == false); id_ = ws_meta.transaction_id(); assert(client_state_.mode() == wsrep::client_state::m_high_priority); @@ -150,6 +151,7 @@ int wsrep::transaction::start_transaction( { start_replaying(ws_meta); } + debug_log_state("start_transaction leave"); return 0; } @@ -553,6 +555,7 @@ int wsrep::transaction::after_statement() int ret(0); wsrep::unique_lock lock(client_state_.mutex()); debug_log_state("after_statement_enter"); + assert(client_state_.mode() == wsrep::client_state::m_local); assert(state() == s_executing || state() == s_committed || state() == s_aborted || @@ -669,6 +672,23 @@ void wsrep::transaction::after_applying() assert(state_ == s_executing || state_ == s_committed || state_ == s_aborted); + + // We may enter here from either high priority applier or + // from fragment storage service. High priority applier + // should always have set up meta data for ordering, but + // fragment storage service operation may be rolled back + // before the fragment is ordered and certified. + // Therefore we need to check separately if the ordering has + // been done. + if (state_ == s_aborted && ordered()) + { + int ret(provider().commit_order_enter(ws_handle_, ws_meta_)); + if (ret == 0) + { + provider().commit_order_leave(ws_handle_, ws_meta_); + } + } + if (state_ != s_executing) { cleanup(); diff --git a/test/mock_high_priority_service.cpp b/test/mock_high_priority_service.cpp index 14bf95d..57d131a 100644 --- a/test/mock_high_priority_service.cpp +++ b/test/mock_high_priority_service.cpp @@ -34,7 +34,8 @@ int wsrep::mock_high_priority_service::apply_write_set( return (fail_next_applying_ ? 1 : 0); } -int wsrep::mock_high_priority_service::commit() +int wsrep::mock_high_priority_service::commit(const wsrep::ws_handle&, + const wsrep::ws_meta&) { int ret(0); diff --git a/test/mock_high_priority_service.hpp b/test/mock_high_priority_service.hpp index 525e7b0..b442967 100644 --- a/test/mock_high_priority_service.hpp +++ b/test/mock_high_priority_service.hpp @@ -36,7 +36,8 @@ namespace wsrep int append_fragment(const wsrep::ws_meta&, const wsrep::const_buffer&) WSREP_OVERRIDE { return 0; } - int commit() WSREP_OVERRIDE; + int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) + WSREP_OVERRIDE; int rollback() WSREP_OVERRIDE; int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&) WSREP_OVERRIDE; diff --git a/test/mock_storage_service.cpp b/test/mock_storage_service.cpp index a2325c0..746b469 100644 --- a/test/mock_storage_service.cpp +++ b/test/mock_storage_service.cpp @@ -36,7 +36,7 @@ int wsrep::mock_storage_service::start_transaction(const wsrep::ws_handle& ws_ha int wsrep::mock_storage_service::commit(const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) { - int ret(client_state_.prepare_for_fragment_ordering( + int ret(client_state_.prepare_for_ordering( ws_handle, ws_meta, true) || client_state_.before_commit() || client_state_.ordered_commit() || @@ -48,7 +48,7 @@ int wsrep::mock_storage_service::commit(const wsrep::ws_handle& ws_handle, int wsrep::mock_storage_service::rollback(const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) { - int ret(client_state_.prepare_for_fragment_ordering( + int ret(client_state_.prepare_for_ordering( ws_handle, ws_meta, false) || client_state_.before_rollback() || client_state_.after_rollback());