From 6aa6b6f50ab2fe2ba2b56702e3ab8cee9900f00d Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Mon, 9 Jul 2018 13:02:13 +0300 Subject: [PATCH] Provider write set handle and meta data for high priority commit The write set handle and meta data are needed for SR transactions where the commit context is not known when the transaction starts. The passed handle and meta data can be set through client_state prepare_for_ordering() call before performing commit. --- dbsim/db_high_priority_service.cpp | 3 ++- dbsim/db_high_priority_service.hpp | 2 +- include/wsrep/client_state.hpp | 19 +++++++++++++------ include/wsrep/high_priority_service.hpp | 15 +++++++++++++-- src/server_state.cpp | 6 +++--- src/transaction.cpp | 22 +++++++++++++++++++++- test/mock_high_priority_service.cpp | 3 ++- test/mock_high_priority_service.hpp | 3 ++- test/mock_storage_service.cpp | 4 ++-- 9 files changed, 59 insertions(+), 18 deletions(-) diff --git a/dbsim/db_high_priority_service.cpp b/dbsim/db_high_priority_service.cpp index 7ed59dc..6eef39d 100644 --- a/dbsim/db_high_priority_service.cpp +++ b/dbsim/db_high_priority_service.cpp @@ -39,7 +39,8 @@ int db::high_priority_service::apply_toi( throw wsrep::not_implemented_error(); } -int db::high_priority_service::commit() +int db::high_priority_service::commit(const wsrep::ws_handle&, + const wsrep::ws_meta&) { int ret(client_.client_state_.before_commit()); if (ret == 0) client_.se_trx_.commit(); diff --git a/dbsim/db_high_priority_service.hpp b/dbsim/db_high_priority_service.hpp index 21f368d..3e4e7ad 100644 --- a/dbsim/db_high_priority_service.hpp +++ b/dbsim/db_high_priority_service.hpp @@ -22,7 +22,7 @@ namespace db int append_fragment(const wsrep::ws_meta&, const wsrep::const_buffer&) override { return 0; } - int commit() override; + int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override; int rollback() override; int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&) override; void after_apply() override; diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index f7b3fcd..402d112 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -313,13 +313,20 @@ namespace wsrep void disable_streaming(); /** - * Prepare write set meta data for fragment storage ordering. - * This method should be called from storage service commit - * or rollback before performing the operation. + * Prepare write set meta data for ordering. + * This method should be called before ordered commit or + * rollback if the commit time meta data was not known + * at the time of the start of the transaction. + * This mostly applies to streaming replication. + * + * @param ws_handle Write set handle + * @param ws_meta Write set meta data + * @param is_commit Boolean to denote whether the operation + * is commit or rollback. */ - int prepare_for_fragment_ordering(const wsrep::ws_handle& ws_handle, - const wsrep::ws_meta& ws_meta, - bool is_commit) + int prepare_for_ordering(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + bool is_commit) { assert(state_ == s_exec); return transaction_.prepare_for_fragment_ordering( diff --git a/include/wsrep/high_priority_service.hpp b/include/wsrep/high_priority_service.hpp index 7a05a1d..1bb1640 100644 --- a/include/wsrep/high_priority_service.hpp +++ b/include/wsrep/high_priority_service.hpp @@ -59,7 +59,7 @@ namespace wsrep /** * Commit a transaction. */ - virtual int commit() = 0; + virtual int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0; /** * Roll back a transaction @@ -98,7 +98,18 @@ namespace wsrep virtual void switch_execution_context( wsrep::high_priority_service& orig_hps) = 0; - virtual int log_dummy_write_set(const ws_handle&, const ws_meta&) = 0; + /** + * Log a dummy write set which is either SR transaction fragment + * or roll back fragment. The implementation must release + * commit order inside the call. + * + * @params ws_handle Write set handle + * @params ws_meta Write set meta data + * + * @return Zero in case of success, non-zero on failure + */ + virtual int log_dummy_write_set(const ws_handle& ws_handle, + const ws_meta& ws_meta) = 0; virtual bool is_replaying() const = 0; diff --git a/src/server_state.cpp b/src/server_state.cpp index b0d75a3..86078e9 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -46,7 +46,7 @@ namespace { ret = 1; } - else if (high_priority_service.commit()) + else if (high_priority_service.commit(ws_handle, ws_meta)) { ret = 1; } @@ -103,7 +103,7 @@ namespace if (high_priority_service.is_replaying()) { ret = high_priority_service.apply_write_set(data) || - high_priority_service.commit(); + high_priority_service.commit(ws_handle, ws_meta); } else { @@ -129,7 +129,7 @@ namespace { wsrep::high_priority_switch sw( high_priority_service, *sa); - ret = sa->commit(); + ret = sa->commit(ws_handle, ws_meta); sa->after_apply(); } server_state.stop_streaming_applier( diff --git a/src/transaction.cpp b/src/transaction.cpp index 5886052..32a28af 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -134,9 +134,10 @@ int wsrep::transaction::start_transaction( const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) { + debug_log_state("start_transaction enter"); if (state() != s_replaying) { - assert(ws_meta.flags()); + // assert(ws_meta.flags()); assert(active() == false); id_ = ws_meta.transaction_id(); assert(client_state_.mode() == wsrep::client_state::m_high_priority); @@ -150,6 +151,7 @@ int wsrep::transaction::start_transaction( { start_replaying(ws_meta); } + debug_log_state("start_transaction leave"); return 0; } @@ -553,6 +555,7 @@ int wsrep::transaction::after_statement() int ret(0); wsrep::unique_lock lock(client_state_.mutex()); debug_log_state("after_statement_enter"); + assert(client_state_.mode() == wsrep::client_state::m_local); assert(state() == s_executing || state() == s_committed || state() == s_aborted || @@ -669,6 +672,23 @@ void wsrep::transaction::after_applying() assert(state_ == s_executing || state_ == s_committed || state_ == s_aborted); + + // We may enter here from either high priority applier or + // from fragment storage service. High priority applier + // should always have set up meta data for ordering, but + // fragment storage service operation may be rolled back + // before the fragment is ordered and certified. + // Therefore we need to check separately if the ordering has + // been done. + if (state_ == s_aborted && ordered()) + { + int ret(provider().commit_order_enter(ws_handle_, ws_meta_)); + if (ret == 0) + { + provider().commit_order_leave(ws_handle_, ws_meta_); + } + } + if (state_ != s_executing) { cleanup(); diff --git a/test/mock_high_priority_service.cpp b/test/mock_high_priority_service.cpp index 14bf95d..57d131a 100644 --- a/test/mock_high_priority_service.cpp +++ b/test/mock_high_priority_service.cpp @@ -34,7 +34,8 @@ int wsrep::mock_high_priority_service::apply_write_set( return (fail_next_applying_ ? 1 : 0); } -int wsrep::mock_high_priority_service::commit() +int wsrep::mock_high_priority_service::commit(const wsrep::ws_handle&, + const wsrep::ws_meta&) { int ret(0); diff --git a/test/mock_high_priority_service.hpp b/test/mock_high_priority_service.hpp index 525e7b0..b442967 100644 --- a/test/mock_high_priority_service.hpp +++ b/test/mock_high_priority_service.hpp @@ -36,7 +36,8 @@ namespace wsrep int append_fragment(const wsrep::ws_meta&, const wsrep::const_buffer&) WSREP_OVERRIDE { return 0; } - int commit() WSREP_OVERRIDE; + int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) + WSREP_OVERRIDE; int rollback() WSREP_OVERRIDE; int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&) WSREP_OVERRIDE; diff --git a/test/mock_storage_service.cpp b/test/mock_storage_service.cpp index a2325c0..746b469 100644 --- a/test/mock_storage_service.cpp +++ b/test/mock_storage_service.cpp @@ -36,7 +36,7 @@ int wsrep::mock_storage_service::start_transaction(const wsrep::ws_handle& ws_ha int wsrep::mock_storage_service::commit(const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) { - int ret(client_state_.prepare_for_fragment_ordering( + int ret(client_state_.prepare_for_ordering( ws_handle, ws_meta, true) || client_state_.before_commit() || client_state_.ordered_commit() || @@ -48,7 +48,7 @@ int wsrep::mock_storage_service::commit(const wsrep::ws_handle& ws_handle, int wsrep::mock_storage_service::rollback(const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) { - int ret(client_state_.prepare_for_fragment_ordering( + int ret(client_state_.prepare_for_ordering( ws_handle, ws_meta, false) || client_state_.before_rollback() || client_state_.after_rollback());