diff --git a/include/wsrep/client_context.hpp b/include/wsrep/client_context.hpp index fb7dc64..6354bcb 100644 --- a/include/wsrep/client_context.hpp +++ b/include/wsrep/client_context.hpp @@ -229,6 +229,13 @@ namespace wsrep return transaction_.start_transaction(wsh, meta); } + void enable_streaming( + enum wsrep::transaction_context::streaming_context::fragment_unit fragment_unit, + size_t fragment_size) + { + transaction_.streaming_context_.enable( + fragment_unit, fragment_size); + } int append_key(const wsrep::key& key) { assert(state_ == s_exec); @@ -240,6 +247,13 @@ namespace wsrep assert(state_ == s_exec); return transaction_.append_data(data); } + + + int after_row() + { + assert(state_ == s_exec); + return transaction_.after_row(); + } int before_prepare() { assert(state_ == s_exec); diff --git a/include/wsrep/provider.hpp b/include/wsrep/provider.hpp index 15ce517..3e736cc 100644 --- a/include/wsrep/provider.hpp +++ b/include/wsrep/provider.hpp @@ -129,6 +129,7 @@ namespace wsrep int flags_; }; + // Abstract interface for provider implementations class provider { @@ -284,6 +285,22 @@ namespace wsrep return ret; } + static inline bool starts_transaction(int flags) + { + return (flags & wsrep::provider::flag::start_transaction); + } + + static inline bool commits_transaction(int flags) + { + return (flags & wsrep::provider::flag::commit); + } + + static inline bool rolls_back_transaction(int flags) + { + return (flags & wsrep::provider::flag::rollback); + } + + } #endif // WSREP_PROVIDER_HPP diff --git a/include/wsrep/transaction_context.hpp b/include/wsrep/transaction_context.hpp index b911841..38f3f67 100644 --- a/include/wsrep/transaction_context.hpp +++ b/include/wsrep/transaction_context.hpp @@ -150,8 +150,79 @@ namespace wsrep bool pa_unsafe_; bool certified_; - std::vector fragments_; - wsrep::transaction_id rollback_replicated_for_; + public: + class streaming_context + { + public: + enum fragment_unit + { + row, + bytes, + statement + }; + + streaming_context() + : fragments_() + , rollback_replicated_for_() + , fragment_unit_() + , fragment_size_() + , bytes_certified_() + , unit_counter_() + { } + + void enable(enum fragment_unit fragment_unit, size_t fragment_size) + { + assert(fragment_size > 0); + fragment_unit_ = fragment_unit; + fragment_size_ = fragment_size; + } + + enum fragment_unit fragment_unit() const { return fragment_unit_; } + + size_t fragment_size() const { return fragment_size_; } + + void disable() + { + fragment_size_ = 0; + } + + void certified(wsrep::seqno seqno) + { + fragments_.push_back(seqno); + } + + size_t fragments_certified() const + { + return fragments_.size(); + } + + size_t bytes_certified() const + { + return bytes_certified_; + } + + void rolled_back(wsrep::transaction_id id) + { + assert(rollback_replicated_for_ == wsrep::transaction_id::invalid()); + rollback_replicated_for_ = id; + } + + bool rolled_back() const + { + return (rollback_replicated_for_ != + wsrep::transaction_id::invalid()); + } + + size_t unit_counter() const { return unit_counter_; } + void increment_unit_counter() { ++unit_counter_; } + private: + std::vector fragments_; + wsrep::transaction_id rollback_replicated_for_; + enum fragment_unit fragment_unit_; + size_t fragment_size_; + size_t bytes_certified_; + size_t unit_counter_; + } streaming_context_; }; static inline std::string to_string(enum wsrep::transaction_context::state state) diff --git a/src/mock_provider.hpp b/src/mock_provider.hpp index 7817cd6..b32bb20 100644 --- a/src/mock_provider.hpp +++ b/src/mock_provider.hpp @@ -6,6 +6,7 @@ #define WSREP_MOCK_PROVIDER_HPP #include "wsrep/provider.hpp" +#include "wsrep/logger.hpp" #include #include @@ -25,6 +26,10 @@ namespace wsrep , group_seqno_(0) , bf_abort_map_() , next_error_(wsrep::provider::success) + , start_fragments_() + , fragments_() + , commit_fragments_() + , rollback_fragments_() { } int connect(const std::string&, const std::string&, const std::string&, @@ -43,15 +48,28 @@ namespace wsrep int flags, wsrep::ws_meta& ws_meta) { - assert(flags & wsrep::provider::flag::start_transaction); + wsrep::log_info() << "provider certify: " + << "client: " << client_id.get() + << " flags: " << std::hex << flags; if (next_error_) { return next_error_; } - if ((flags & wsrep::provider::flag::commit) == 0) + + ++fragments_; + if (starts_transaction(flags)) { - return wsrep::provider::error_provider_failed; + ++start_fragments_; } + if (commits_transaction(flags)) + { + ++commit_fragments_; + } + if (rolls_back_transaction(flags)) + { + ++rollback_fragments_; + } + wsrep::stid stid(server_id_, ws_handle.transaction_id(), client_id); @@ -145,12 +163,21 @@ namespace wsrep { next_error_ = error; } + size_t start_fragments() const { return start_fragments_; } + size_t fragments() const { return fragments_; } + size_t commit_fragments() const { return commit_fragments_; } + size_t rollback_fragments() const { return commit_fragments_; } + private: wsrep::id group_id_; wsrep::id server_id_; long long group_seqno_; bf_abort_map bf_abort_map_; enum wsrep::provider::status next_error_; + size_t start_fragments_; + size_t fragments_; + size_t commit_fragments_; + size_t rollback_fragments_; }; } diff --git a/src/server_context.cpp b/src/server_context.cpp index 3b35124..58bb699 100644 --- a/src/server_context.cpp +++ b/src/server_context.cpp @@ -14,20 +14,6 @@ namespace { - inline bool starts_transaction(const wsrep::ws_meta& ws_meta) - { - return (ws_meta.flags() & wsrep::provider::flag::start_transaction); - } - - inline bool commits_transaction(const wsrep::ws_meta& ws_meta) - { - return (ws_meta.flags() & wsrep::provider::flag::commit); - } - - inline bool rolls_back_transaction(const wsrep::ws_meta& ws_meta) - { - return (ws_meta.flags() & wsrep::provider::flag::rollback); - } } @@ -139,7 +125,8 @@ int wsrep::server_context::on_apply( bool not_replaying(txc.state() != wsrep::transaction_context::s_replaying); - if (starts_transaction(ws_meta) && commits_transaction(ws_meta)) + if (starts_transaction(ws_meta.flags()) && + commits_transaction(ws_meta.flags())) { if (not_replaying) { diff --git a/src/transaction_context.cpp b/src/transaction_context.cpp index fde4a80..3ea780c 100644 --- a/src/transaction_context.cpp +++ b/src/transaction_context.cpp @@ -36,8 +36,7 @@ wsrep::transaction_context::transaction_context( , flags_() , pa_unsafe_(false) , certified_(false) - , fragments_() - , rollback_replicated_for_(false) + , streaming_context_() { } @@ -94,6 +93,29 @@ int wsrep::transaction_context::append_data(const wsrep::data& data) return provider_.append_data(ws_handle_, data); } +int wsrep::transaction_context::after_row() +{ + wsrep::unique_lock lock(client_context_.mutex()); + if (streaming_context_.fragment_size() > 0) + { + switch (streaming_context_.fragment_unit()) + { + case streaming_context::row: + streaming_context_.increment_unit_counter(); + if (streaming_context_.fragments_certified() + + streaming_context_.unit_counter() >= + streaming_context_.fragment_size()) + { + return certify_fragment(lock); + } + break; + default: + assert(0); + } + } + return 0; +} + int wsrep::transaction_context::before_prepare() { int ret(0); @@ -585,7 +607,7 @@ void wsrep::transaction_context::state( { /* ex pr ce co oc ct cf ma ab ad mr re */ { 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */ { 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0}, /* pr */ - { 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */ + { 1, 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */ { 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0}, /* co */ { 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* oc */ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ct */ @@ -612,16 +634,13 @@ void wsrep::transaction_context::state( } } -#if 0 int wsrep::transaction_context::certify_fragment( wsrep::unique_lock& lock) { - // This method is not fully implemented and tested yet. - throw wsrep::not_implemented_error(); assert(lock.owns_lock()); assert(client_context_.mode() == wsrep::client_context::m_replicating); - assert(rollback_replicated_for_ != id_); + assert(streaming_context_.rolled_back() == false); client_context_.wait_for_replayers(lock); if (state() == s_must_abort) @@ -634,10 +653,9 @@ int wsrep::transaction_context::certify_fragment( lock.unlock(); - uint32_t flags(0); - if (fragments_.empty()) + if (streaming_context_.fragments_certified()) { - flags |= wsrep::provider::flag::start_transaction; + flags_ |= wsrep::provider::flag::start_transaction; } wsrep::data data; @@ -656,8 +674,14 @@ int wsrep::transaction_context::certify_fragment( wsrep::client_context_switch client_context_switch( client_context_, *sr_client_context); - wsrep::transaction_context sr_transaction_context(*sr_client_context); - if (sr_client_context->append_fragment(sr_transaction_context, flags, data)) + + wsrep::unique_lock sr_lock(sr_client_context->mutex()); + wsrep::transaction_context& sr_transaction_context( + sr_client_context->transaction_); + sr_transaction_context.state(sr_lock, s_certifying); + sr_lock.unlock(); + if (sr_client_context->append_fragment( + sr_transaction_context, flags_, data)) { lock.lock(); state(lock, s_must_abort); @@ -667,24 +691,43 @@ int wsrep::transaction_context::certify_fragment( enum wsrep::provider::status cert_ret(provider_.certify(client_context_.id().get(), - &sr_transaction_context.ws_handle_, - flags, - &sr_transaction_context.trx_meta_)); + sr_transaction_context.ws_handle_, + flags_, + sr_transaction_context.ws_meta_)); + int ret(0); switch (cert_ret) { case wsrep::provider::success: - sr_client_context->commit(sr_transaction_context); + streaming_context_.certified(sr_transaction_context.ws_meta().seqno()); + sr_lock.lock(); + sr_transaction_context.certified_ = true; + sr_transaction_context.state(sr_lock, s_committing); + sr_lock.unlock(); + if (sr_client_context->commit()) + { + ret = 1; + } break; default: - sr_client_context->rollback(sr_transaction_context); + sr_client_context->rollback(); ret = 1; break; } - + lock.lock(); + if (ret) + { + client_context_.provider().rollback(id_); + streaming_context_.rolled_back(id_); + state(lock, s_must_abort); + } + else + { + state(lock, s_executing); + flags_ &= ~wsrep::provider::flag::start_transaction; + } return ret; } -#endif int wsrep::transaction_context::certify_commit( wsrep::unique_lock& lock) diff --git a/src/transaction_context_test.cpp b/src/transaction_context_test.cpp index de8abfc..97b1215 100644 --- a/src/transaction_context_test.cpp +++ b/src/transaction_context_test.cpp @@ -102,6 +102,26 @@ namespace const wsrep::transaction_context& tc; }; + struct streaming_client_fixture_row + { + streaming_client_fixture_row() + : sc("s1", "s1", wsrep::server_context::rm_sync) + , cc(sc, wsrep::client_id(1), + wsrep::client_context::m_replicating) + , tc(cc.transaction()) + { + BOOST_REQUIRE(cc.before_command() == 0); + BOOST_REQUIRE(cc.before_statement() == 0); + // Verify initial state + BOOST_REQUIRE(tc.active() == false); + BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_executing); + cc.enable_streaming(wsrep::transaction_context::streaming_context::row, 1); + } + wsrep::mock_server_context sc; + wsrep::mock_client_context cc; + const wsrep::transaction_context& tc; + }; + typedef boost::mpl::vector @@ -519,14 +539,15 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE( // // Test a 1PC transaction which gets BF aborted during before_commit via -// provider after the write set was ordered and certified. +// provider after the write set was ordered and certified. This must +// result replaying of transaction. // BOOST_FIXTURE_TEST_CASE_TEMPLATE( transaction_context_1pc_bf_during_before_commit_certified, T, replicating_fixtures, T) { wsrep::mock_server_context& sc(T::sc); - wsrep::client_context& cc(T::cc); + wsrep::mock_client_context& cc(T::cc); const wsrep::transaction_context& tc(T::tc); // Start a new transaction with ID 1 @@ -555,6 +576,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE( BOOST_REQUIRE(tc.ordered() == false); BOOST_REQUIRE(tc.certified() == false); BOOST_REQUIRE(cc.current_error() == wsrep::e_success); + BOOST_REQUIRE(cc.replays() == 1); } // @@ -1161,3 +1183,24 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_applying_rollback, BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(cc.current_error() == wsrep::e_success); } + + +/////////////////////////////////////////////////////////////////////////////// +// STREAMING REPLICATION // +/////////////////////////////////////////////////////////////////////////////// + +BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_1pc_commit, + streaming_client_fixture_row) +{ + BOOST_REQUIRE(cc.start_transaction(1) == 0); + BOOST_REQUIRE(cc.after_row() == 0); + BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(cc.before_commit() == 0); + BOOST_REQUIRE(cc.ordered_commit() == 0); + BOOST_REQUIRE(cc.after_commit() == 0); + BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success); + BOOST_REQUIRE(sc.provider().fragments() == 2); + BOOST_REQUIRE(sc.provider().start_fragments() == 1); + BOOST_REQUIRE(sc.provider().commit_fragments() == 1); + +}