diff --git a/dbsim/db_high_priority_service.cpp b/dbsim/db_high_priority_service.cpp index 6eef39d..f260f52 100644 --- a/dbsim/db_high_priority_service.cpp +++ b/dbsim/db_high_priority_service.cpp @@ -25,7 +25,9 @@ void db::high_priority_service::adopt_transaction(const wsrep::transaction&) throw wsrep::not_implemented_error(); } -int db::high_priority_service::apply_write_set(const wsrep::const_buffer&) +int db::high_priority_service::apply_write_set( + const wsrep::ws_meta&, + const wsrep::const_buffer&) { client_.se_trx_.start(&client_); client_.se_trx_.apply(client_.client_state().transaction()); diff --git a/dbsim/db_high_priority_service.hpp b/dbsim/db_high_priority_service.hpp index 3e4e7ad..f0479ad 100644 --- a/dbsim/db_high_priority_service.hpp +++ b/dbsim/db_high_priority_service.hpp @@ -18,10 +18,15 @@ namespace db int start_transaction(const wsrep::ws_handle&, const wsrep::ws_meta&) override; void adopt_transaction(const wsrep::transaction&) override; - int apply_write_set(const wsrep::const_buffer&) override; - int append_fragment(const wsrep::ws_meta&, const wsrep::const_buffer&) + int apply_write_set(const wsrep::ws_meta&, + const wsrep::const_buffer&) override; + int append_fragment_and_commit( + const wsrep::ws_handle&, + const wsrep::ws_meta&, const wsrep::const_buffer&) override { return 0; } + int remove_fragments(const wsrep::ws_meta&) override + { return 0; } int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override; int rollback() override; int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&) override; diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 8119cc9..dcac518 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -312,6 +312,12 @@ namespace wsrep */ void disable_streaming(); + void fragment_applied(wsrep::seqno seqno) + { + assert(mode_ == m_high_priority); + transaction_.fragment_applied(seqno); + } + /** * Prepare write set meta data for ordering. * This method should be called before ordered commit or diff --git a/include/wsrep/high_priority_service.hpp b/include/wsrep/high_priority_service.hpp index 1bb1640..e8d04c8 100644 --- a/include/wsrep/high_priority_service.hpp +++ b/include/wsrep/high_priority_service.hpp @@ -48,19 +48,43 @@ namespace wsrep * as a part of the transaction. The caller must start a * new transaction before applying a write set and must * either commit to make changes persistent or roll back. - */ - virtual int apply_write_set(const wsrep::const_buffer&) = 0; - - /** * */ - virtual int append_fragment(const wsrep::ws_meta&, - const wsrep::const_buffer& data) = 0; + virtual int apply_write_set(const wsrep::ws_meta&, + const wsrep::const_buffer&) = 0; + + /** + * Append a fragment into fragment storage. This will be + * called after a non-committing fragment belonging to + * streaming transaction has been applied. The call will + * not be done within an open transaction, the implementation + * must start a new transaction and commit. + * + * Note that the call is not done from streaming transaction + * context, but from applier context. + */ + virtual int append_fragment_and_commit( + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) = 0; + + /** + * Remove fragments belonging to streaming transaction. + * This method will be called within the streaming transaction + * before commit. The implementation must not commit the + * whole transaction. The call will be done from streaming + * transaction context. + * + * @param ws_meta Write set meta data for commit fragment. + * + * @return Zero on success, non-zero on failure. + */ + virtual int remove_fragments(const wsrep::ws_meta& ws_meta) = 0; + /** * Commit a transaction. */ virtual int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0; - /** * Roll back a transaction */ diff --git a/include/wsrep/streaming_context.hpp b/include/wsrep/streaming_context.hpp index ef51a28..c04fe26 100644 --- a/include/wsrep/streaming_context.hpp +++ b/include/wsrep/streaming_context.hpp @@ -52,6 +52,11 @@ namespace wsrep bytes_certified_ += bytes; } + void applied(wsrep::seqno seqno) + { + fragments_.push_back(seqno); + } + size_t fragments_certified() const { return fragments_.size(); diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 5ce5579..2af1d3c 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -87,6 +87,8 @@ namespace wsrep int start_transaction(const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta); + void fragment_applied(wsrep::seqno seqno); + int prepare_for_ordering(const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta, bool is_commit); diff --git a/src/server_state.cpp b/src/server_state.cpp index 86078e9..fd01627 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -42,7 +42,7 @@ namespace { ret = 1; } - else if (high_priority_service.apply_write_set(data)) + else if (high_priority_service.apply_write_set(ws_meta, data)) { ret = 1; } @@ -68,10 +68,11 @@ namespace sa->start_transaction(ws_handle, ws_meta); { wsrep::high_priority_switch sw(high_priority_service, *sa); - sa->apply_write_set(data); + sa->apply_write_set(ws_meta, data); sa->after_apply(); } - high_priority_service.log_dummy_write_set(ws_handle, ws_meta); + high_priority_service.append_fragment_and_commit(ws_handle, ws_meta, data); + high_priority_service.after_apply(); } else if (ws_meta.flags() == 0) { @@ -92,17 +93,22 @@ namespace else { wsrep::high_priority_switch sw(high_priority_service, *sa); - ret = sa->apply_write_set(data); + ret = sa->apply_write_set(ws_meta, data); sa->after_apply(); } - high_priority_service.log_dummy_write_set( - ws_handle, ws_meta); + ret = ret || high_priority_service.append_fragment_and_commit( + ws_handle, ws_meta, data); + if (ret) + { + high_priority_service.rollback(); + } + high_priority_service.after_apply(); } else if (wsrep::commits_transaction(ws_meta.flags())) { if (high_priority_service.is_replaying()) { - ret = high_priority_service.apply_write_set(data) || + ret = high_priority_service.apply_write_set(ws_meta, data) || high_priority_service.commit(ws_handle, ws_meta); } else @@ -129,6 +135,7 @@ namespace { wsrep::high_priority_switch sw( high_priority_service, *sa); + sa->remove_fragments(ws_meta); ret = sa->commit(ws_handle, ws_meta); sa->after_apply(); } diff --git a/src/transaction.cpp b/src/transaction.cpp index d08ca4a..f460579 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -155,6 +155,12 @@ int wsrep::transaction::start_transaction( return 0; } +void wsrep::transaction::fragment_applied(wsrep::seqno seqno) +{ + assert(active()); + streaming_context_.applied(seqno); +} + int wsrep::transaction::prepare_for_ordering( const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta, @@ -271,10 +277,8 @@ int wsrep::transaction::before_prepare( } break; case wsrep::client_state::m_high_priority: - if (is_streaming()) - { - client_service_.remove_fragments(); - } + // Note: fragment removal is done from applying + // context for high priority mode. break; default: assert(0); @@ -909,8 +913,6 @@ int wsrep::transaction::certify_fragment( ret = 1; break; } - wsrep::log_info() << "Committing " - << sr_ws_meta.transaction_id().get(); if (storage_service.commit(ws_handle_, sr_ws_meta)) { ret = 1; diff --git a/test/mock_high_priority_service.cpp b/test/mock_high_priority_service.cpp index 57d131a..ffcce0d 100644 --- a/test/mock_high_priority_service.cpp +++ b/test/mock_high_priority_service.cpp @@ -26,6 +26,7 @@ void wsrep::mock_high_priority_service::adopt_transaction( } int wsrep::mock_high_priority_service::apply_write_set( + const wsrep::ws_meta&, const wsrep::const_buffer&) { assert(client_state_->toi_meta().seqno().is_undefined()); diff --git a/test/mock_high_priority_service.hpp b/test/mock_high_priority_service.hpp index b442967..e0d2887 100644 --- a/test/mock_high_priority_service.hpp +++ b/test/mock_high_priority_service.hpp @@ -32,9 +32,14 @@ namespace wsrep WSREP_OVERRIDE; void adopt_transaction(const wsrep::transaction&) WSREP_OVERRIDE; - int apply_write_set(const wsrep::const_buffer&) WSREP_OVERRIDE; - int append_fragment(const wsrep::ws_meta&, - const wsrep::const_buffer&) WSREP_OVERRIDE + int apply_write_set(const wsrep::ws_meta&, + const wsrep::const_buffer&) WSREP_OVERRIDE; + int append_fragment_and_commit( + const wsrep::ws_handle&, + const wsrep::ws_meta&, + const wsrep::const_buffer&) WSREP_OVERRIDE + { return 0; } + int remove_fragments(const wsrep::ws_meta&) WSREP_OVERRIDE { return 0; } int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) WSREP_OVERRIDE;