diff --git a/dbsim/db_client_service.hpp b/dbsim/db_client_service.hpp index 8b4cb53..6cf3729 100644 --- a/dbsim/db_client_service.hpp +++ b/dbsim/db_client_service.hpp @@ -48,6 +48,10 @@ namespace db { return false; } + bool is_xa_prepare () const override + { + return false; + } size_t bytes_generated() const override { return 0; diff --git a/dbsim/db_high_priority_service.cpp b/dbsim/db_high_priority_service.cpp index faa1571..18e3475 100644 --- a/dbsim/db_high_priority_service.cpp +++ b/dbsim/db_high_priority_service.cpp @@ -35,6 +35,11 @@ int db::high_priority_service::start_transaction( return client_.client_state().start_transaction(ws_handle, ws_meta); } +int db::high_priority_service::next_fragment(const wsrep::ws_meta& ws_meta) +{ + return client_.client_state().next_fragment(ws_meta); +} + const wsrep::transaction& db::high_priority_service::transaction() const { return client_.client_state().transaction(); diff --git a/dbsim/db_high_priority_service.hpp b/dbsim/db_high_priority_service.hpp index 8f16234..20999b6 100644 --- a/dbsim/db_high_priority_service.hpp +++ b/dbsim/db_high_priority_service.hpp @@ -32,6 +32,7 @@ namespace db high_priority_service(db::server& server, db::client& client); int start_transaction(const wsrep::ws_handle&, const wsrep::ws_meta&) override; + int next_fragment(const wsrep::ws_meta&) override; const wsrep::transaction& transaction() const override; int adopt_transaction(const wsrep::transaction&) override; int apply_write_set(const wsrep::ws_meta&, diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index a4637e1..f7f1c7c 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -78,6 +78,11 @@ namespace wsrep */ virtual bool is_xa() const = 0; + /** + * Return true if the current statement is XA PREPARE + */ + virtual bool is_xa_prepare() const = 0; + // // Streaming // diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index c293a77..6fad7eb 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -436,6 +436,14 @@ namespace wsrep return transaction_.start_transaction(wsh, meta); } + int next_fragment(const wsrep::ws_meta& meta) + { + wsrep::unique_lock lock(mutex_); + assert(current_thread_id_ == wsrep::this_thread::get_id()); + assert(mode_ == m_high_priority); + return transaction_.next_fragment(meta); + } + /** @name Commit ordering interface */ /** @{ */ int before_prepare() diff --git a/include/wsrep/high_priority_service.hpp b/include/wsrep/high_priority_service.hpp index f48f87f..6ef306b 100644 --- a/include/wsrep/high_priority_service.hpp +++ b/include/wsrep/high_priority_service.hpp @@ -51,6 +51,11 @@ namespace wsrep virtual int start_transaction(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0; + /** + * Start the next fragment of current transaction + */ + virtual int next_fragment(const wsrep::ws_meta&) = 0; + /** * Return transaction object associated to high priority * service state. diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 0a7aff8..e04f48d 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -126,6 +126,11 @@ namespace wsrep return client_service_.is_xa(); } + bool is_xa_prepare() const + { + return client_service_.is_xa_prepare(); + } + bool pa_unsafe() const { return pa_unsafe_; } void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; } @@ -137,6 +142,8 @@ namespace wsrep int start_transaction(const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta); + int next_fragment(const wsrep::ws_meta& ws_meta); + void adopt(const transaction& transaction); void fragment_applied(wsrep::seqno seqno); diff --git a/src/server_state.cpp b/src/server_state.cpp index 4b94cfa..9f4b29e 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -359,7 +359,7 @@ static int apply_write_set(wsrep::server_state& server_state, ws_meta, data); } - else if (ws_meta.flags() == 0) + else if (ws_meta.flags() == 0 || wsrep::prepares_transaction(ws_meta.flags())) { wsrep::high_priority_service* sa( server_state.find_streaming_applier( @@ -380,8 +380,8 @@ static int apply_write_set(wsrep::server_state& server_state, } else { - ret = apply_fragment(server_state, - high_priority_service, + sa->next_fragment(ws_meta); + ret = apply_fragment(high_priority_service, sa, ws_handle, ws_meta, @@ -421,6 +421,7 @@ static int apply_write_set(wsrep::server_state& server_state, else { // Commit fragment consumes sa + sa->next_fragment(ws_meta); ret = commit_fragment(server_state, high_priority_service, sa, diff --git a/src/transaction.cpp b/src/transaction.cpp index 520274e..25a4d82 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -176,6 +176,16 @@ int wsrep::transaction::start_transaction( return 0; } +int wsrep::transaction::next_fragment( + const wsrep::ws_meta& ws_meta) +{ + debug_log_state("next_fragment enter"); + ws_meta_ = ws_meta; + debug_log_state("next_fragment leave"); + return 0; +} + + void wsrep::transaction::adopt(const wsrep::transaction& transaction) { debug_log_state("adopt enter"); @@ -1151,12 +1161,12 @@ int wsrep::transaction::streaming_step(wsrep::unique_lock& lock) break; } - if (streaming_context_.fragment_size_exceeded()) + if (streaming_context_.fragment_size_exceeded() || client_service_.is_xa_prepare()) { // Some statements have no effect. Do not atttempt to // replicate a fragment if no data has been generated - // since last fragment replication. - if (bytes_to_replicate <= 0) + // since last fragment replication, and statement is not XA PREPARE. + if (bytes_to_replicate <= 0 && !client_service_.is_xa_prepare()) { assert(bytes_to_replicate == 0); return ret; @@ -1414,6 +1424,7 @@ int wsrep::transaction::certify_commit( } flags(flags() | wsrep::provider::flag::commit); + flags(flags() & ~wsrep::provider::flag::prepare); if (client_service_.prepare_data_for_replication()) { diff --git a/test/mock_client_state.hpp b/test/mock_client_state.hpp index ff070e1..7e8308e 100644 --- a/test/mock_client_state.hpp +++ b/test/mock_client_state.hpp @@ -134,6 +134,11 @@ namespace wsrep return false; } + bool is_xa_prepare() const WSREP_OVERRIDE + { + return false; + } + size_t bytes_generated() const WSREP_OVERRIDE { return bytes_generated_; diff --git a/test/mock_high_priority_service.cpp b/test/mock_high_priority_service.cpp index ee6f82c..1e33fba 100644 --- a/test/mock_high_priority_service.cpp +++ b/test/mock_high_priority_service.cpp @@ -27,6 +27,12 @@ int wsrep::mock_high_priority_service::start_transaction( return client_state_->start_transaction(ws_handle, ws_meta); } +int wsrep::mock_high_priority_service::next_fragment( + const wsrep::ws_meta& ws_meta) +{ + return client_state_->next_fragment(ws_meta); +} + int wsrep::mock_high_priority_service::adopt_transaction( const wsrep::transaction& transaction) { diff --git a/test/mock_high_priority_service.hpp b/test/mock_high_priority_service.hpp index c93a6a6..3fd9c34 100644 --- a/test/mock_high_priority_service.hpp +++ b/test/mock_high_priority_service.hpp @@ -43,6 +43,8 @@ namespace wsrep int start_transaction(const wsrep::ws_handle&, const wsrep::ws_meta&) WSREP_OVERRIDE; + int next_fragment(const wsrep::ws_meta&) WSREP_OVERRIDE; + const wsrep::transaction& transaction() const WSREP_OVERRIDE { return client_state_->transaction(); } int adopt_transaction(const wsrep::transaction&) WSREP_OVERRIDE;