diff --git a/include/wsrep/buffer.hpp b/include/wsrep/buffer.hpp index 014b113..8d306a4 100644 --- a/include/wsrep/buffer.hpp +++ b/include/wsrep/buffer.hpp @@ -5,27 +5,52 @@ #ifndef WSREP_BUFFER_HPP #define WSREP_BUFFER_HPP +#include + namespace wsrep { - class buffer + class const_buffer { public: - buffer() + const_buffer() : ptr_() , size_() { } - buffer(const void* ptr, size_t size) + + const_buffer(const void* ptr, size_t size) : ptr_(ptr) , size_(size) { } const void* ptr() const { return ptr_; } + const void* data() const { return ptr_; } size_t size() const { return size_; } private: + // const_buffer(const const_buffer&); + // const_buffer& operator=(const const_buffer&); const void* ptr_; size_t size_; }; + + + class mutable_buffer + { + public: + mutable_buffer() + : buffer_() + { } + + void push_back(const char* begin, const char* end) + { + buffer_.insert(buffer_.end(), begin, end); + } + const char* data() const { return &buffer_[0]; } + size_t size() const { return buffer_.size(); } + private: + std::vector buffer_; + }; + } #endif // WSREP_BUFFER_HPP diff --git a/include/wsrep/client_context.hpp b/include/wsrep/client_context.hpp index 6354bcb..0997a46 100644 --- a/include/wsrep/client_context.hpp +++ b/include/wsrep/client_context.hpp @@ -44,7 +44,7 @@ #include "client_id.hpp" #include "mutex.hpp" #include "lock.hpp" -#include "data.hpp" +#include "buffer.hpp" #include "thread.hpp" @@ -242,7 +242,7 @@ namespace wsrep return transaction_.append_key(key); } - int append_data(const wsrep::data& data) + int append_data(const wsrep::const_buffer& data) { assert(state_ == s_exec); return transaction_.append_data(data); @@ -400,7 +400,7 @@ namespace wsrep friend int server_context::on_apply(client_context&, const wsrep::ws_handle&, const wsrep::ws_meta&, - const wsrep::data&); + const wsrep::const_buffer&); friend class client_context_switch; friend class client_applier_mode; friend class client_toi_mode; @@ -425,18 +425,17 @@ namespace wsrep /*! * Append SR fragment to the transaction. */ - virtual int append_fragment(wsrep::transaction_context&, - int, const wsrep::data&) - { return 0; } - + virtual int append_fragment(const wsrep::transaction_context&, + int, const wsrep::const_buffer&) = 0; + virtual void remove_fragments(const wsrep::transaction_context&) = 0; /*! * This method applies a write set give in data buffer. * This must be implemented by the DBMS integration. * * \return Zero on success, non-zero on applying failure. */ - virtual int apply(const wsrep::data& data) = 0; + virtual int apply(const wsrep::const_buffer& data) = 0; /*! * Virtual method which will be called @@ -474,8 +473,10 @@ namespace wsrep virtual void wait_for_replayers(wsrep::unique_lock&) = 0; virtual int prepare_data_for_replication( - const wsrep::transaction_context&, wsrep::data& data) = 0; + const wsrep::transaction_context&) = 0; + virtual int prepare_fragment_for_replication( + const wsrep::transaction_context&, wsrep::mutable_buffer&) = 0; /*! * Return true if the current client operation was killed. */ diff --git a/include/wsrep/data.hpp b/include/wsrep/data.hpp deleted file mode 100644 index 3718b28..0000000 --- a/include/wsrep/data.hpp +++ /dev/null @@ -1,27 +0,0 @@ -// -// Copyright (C) 2018 Codership Oy -// - -#ifndef WSREP_DATA_HPP -#define WSREP_DATA_HPP - -namespace wsrep -{ - class data - { - public: - data() - : buf_() - { - } - data(const void* ptr, size_t len) - : buf_(ptr, len) - { - } - const wsrep::buffer& get() const { return buf_; } - private: - wsrep::buffer buf_; - }; -} - -#endif // WSREP_DATA_HPP diff --git a/include/wsrep/key.hpp b/include/wsrep/key.hpp index 04673ed..c230e74 100644 --- a/include/wsrep/key.hpp +++ b/include/wsrep/key.hpp @@ -33,7 +33,7 @@ namespace wsrep { throw wsrep::runtime_error("key parts exceed maximum of 3"); } - key_parts_[key_parts_len_] = wsrep::buffer(ptr, len); + key_parts_[key_parts_len_] = wsrep::const_buffer(ptr, len); ++key_parts_len_; } @@ -47,14 +47,14 @@ namespace wsrep return key_parts_len_; } - const wsrep::buffer* key_parts() const + const wsrep::const_buffer* key_parts() const { return key_parts_; } private: enum type type_; - wsrep::buffer key_parts_[3]; + wsrep::const_buffer key_parts_[3]; size_t key_parts_len_; }; } diff --git a/include/wsrep/provider.hpp b/include/wsrep/provider.hpp index 3e736cc..89a253b 100644 --- a/include/wsrep/provider.hpp +++ b/include/wsrep/provider.hpp @@ -7,7 +7,7 @@ #include "gtid.hpp" #include "key.hpp" -#include "data.hpp" +#include "buffer.hpp" #include "client_id.hpp" #include "transaction_id.hpp" @@ -213,7 +213,7 @@ namespace wsrep // TODO: Rename to assing_read_view() virtual int start_transaction(wsrep::ws_handle&) = 0; virtual int append_key(wsrep::ws_handle&, const wsrep::key&) = 0; - virtual int append_data(wsrep::ws_handle&, const wsrep::data&) = 0; + virtual int append_data(wsrep::ws_handle&, const wsrep::const_buffer&) = 0; virtual enum status certify(wsrep::client_id, wsrep::ws_handle&, int, diff --git a/include/wsrep/server_context.hpp b/include/wsrep/server_context.hpp index 0438b4d..ffc0a4c 100644 --- a/include/wsrep/server_context.hpp +++ b/include/wsrep/server_context.hpp @@ -77,7 +77,7 @@ namespace wsrep class transaction_context; class gtid; class view; - class data; + class const_buffer; /*! \class Server Context * @@ -342,7 +342,7 @@ namespace wsrep int on_apply(wsrep::client_context& client_context, const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta, - const wsrep::data& data); + const wsrep::const_buffer& data); /*! * This virtual method should be implemented by the DBMS diff --git a/include/wsrep/transaction_context.hpp b/include/wsrep/transaction_context.hpp index 38f3f67..6edba09 100644 --- a/include/wsrep/transaction_context.hpp +++ b/include/wsrep/transaction_context.hpp @@ -17,7 +17,7 @@ namespace wsrep { class client_context; class key; - class data; + class const_buffer; class transaction_context @@ -67,10 +67,13 @@ namespace wsrep bool ordered() const { return (ws_meta_.seqno().nil() == false); } + /*! + * Return true if any fragments have been succesfully certified + * for the transaction. + */ bool is_streaming() const { - // Streaming support not yet implemented - return false; + return (streaming_context_.fragments_certified() > 0); } bool pa_unsafe() const { return pa_unsafe_; } @@ -91,7 +94,7 @@ namespace wsrep int append_key(const wsrep::key&); - int append_data(const wsrep::data&); + int append_data(const wsrep::const_buffer&); int after_row(); @@ -132,7 +135,6 @@ namespace wsrep void flags(int flags) { flags_ = flags; } int certify_fragment(wsrep::unique_lock&); int certify_commit(wsrep::unique_lock&); - void remove_fragments(); void clear_fragments(); void cleanup(); void debug_log_state(const char*) const; @@ -215,6 +217,14 @@ namespace wsrep size_t unit_counter() const { return unit_counter_; } void increment_unit_counter() { ++unit_counter_; } + + void cleanup() + { + fragments_.clear(); + rollback_replicated_for_ = wsrep::transaction_id::invalid(); + bytes_certified_ = 0; + unit_counter_ = 0; + } private: std::vector fragments_; wsrep::transaction_id rollback_replicated_for_; diff --git a/src/dbms_simulator.cpp b/src/dbms_simulator.cpp index dd6d814..a1f0d74 100644 --- a/src/dbms_simulator.cpp +++ b/src/dbms_simulator.cpp @@ -13,7 +13,6 @@ #include "wsrep/client_context.hpp" #include "wsrep/transaction_context.hpp" #include "wsrep/key.hpp" -#include "wsrep/data.hpp" #include "wsrep/provider.hpp" #include "wsrep/condition_variable.hpp" #include "wsrep/view.hpp" @@ -274,7 +273,7 @@ public: dbms_storage_engine& storage_engine() { return storage_engine_; } int apply_to_storage_engine(const wsrep::transaction_context& txc, - const wsrep::data&) + const wsrep::const_buffer&) { storage_engine_.bf_abort_some(txc); return 0; @@ -346,7 +345,15 @@ public: private: bool is_autocommit() const override { return false; } bool do_2pc() const override { return false; } - int apply(const wsrep::data& data) override + int append_fragment(const wsrep::transaction_context&, + int, const wsrep::const_buffer&) override + { + return 0; + } + void remove_fragments(const wsrep::transaction_context&) override + { + } + int apply(const wsrep::const_buffer& data) override { return server_.apply_to_storage_engine(transaction(), data); } @@ -381,12 +388,12 @@ private: } void wait_for_replayers(wsrep::unique_lock&) override { } - int prepare_data_for_replication(const wsrep::transaction_context&, - wsrep::data&) + int prepare_data_for_replication(const wsrep::transaction_context&) override - { - return 0; - } + { return 0; } + int prepare_fragment_for_replication(const wsrep::transaction_context&, + wsrep::mutable_buffer&) override + { return 0; } bool killed() const override { return false; } void abort() override { ::abort(); } public: @@ -454,8 +461,9 @@ private: key.append_key_part(&client_key, sizeof(client_key)); key.append_key_part(&data, sizeof(data)); err = append_key(key); - err = err || append_data(wsrep::data(os.str().c_str(), - os.str().size())); + err = err || append_data( + wsrep::const_buffer(os.str().c_str(), + os.str().size())); return err; }); err = err || client_command( diff --git a/src/mock_client_context.cpp b/src/mock_client_context.cpp index 861a8c5..4c5a01e 100644 --- a/src/mock_client_context.cpp +++ b/src/mock_client_context.cpp @@ -7,7 +7,7 @@ int wsrep::mock_client_context::apply( - const wsrep::data& data __attribute__((unused))) + const wsrep::const_buffer& data __attribute__((unused))) { assert(transaction_.state() == wsrep::transaction_context::s_executing); diff --git a/src/mock_client_context.hpp b/src/mock_client_context.hpp index 39c5462..f9a009e 100644 --- a/src/mock_client_context.hpp +++ b/src/mock_client_context.hpp @@ -42,11 +42,16 @@ namespace wsrep (void)rollback(); } } - int apply(const wsrep::data&); + int apply(const wsrep::const_buffer&); int commit(); int rollback(); bool is_autocommit() const { return is_autocommit_; } bool do_2pc() const { return do_2pc_; } + int append_fragment(const wsrep::transaction_context&, + int, const wsrep::const_buffer&) WSREP_OVERRIDE + { return 0; } + void remove_fragments(const wsrep::transaction_context& ) + WSREP_OVERRIDE { } void will_replay(wsrep::transaction_context&) WSREP_OVERRIDE { } int replay(wsrep::transaction_context& tc) WSREP_OVERRIDE { @@ -68,17 +73,31 @@ namespace wsrep lock.lock(); } int prepare_data_for_replication( - const wsrep::transaction_context&, wsrep::data& data) WSREP_OVERRIDE + const wsrep::transaction_context&) WSREP_OVERRIDE { if (error_during_prepare_data_) { return 1; } static const char buf[1] = { 1 }; - data = wsrep::data(buf, 1); - return 0; - + wsrep::const_buffer data = wsrep::const_buffer(buf, 1); + return transaction_.append_data(data); } + + int prepare_fragment_for_replication(const wsrep::transaction_context&, + wsrep::mutable_buffer& buffer) + WSREP_OVERRIDE + { + if (error_during_prepare_data_) + { + return 1; + } + static const char buf[1] = { 1 }; + buffer.push_back(&buf[0], &buf[1]); + wsrep::const_buffer data(buffer.data(), buffer.size()); + return transaction_.append_data(data); + } + bool killed() const WSREP_OVERRIDE { return killed_before_certify_; } void abort() WSREP_OVERRIDE { ++aborts_; } void store_globals() WSREP_OVERRIDE { } @@ -94,7 +113,7 @@ namespace wsrep } void debug_suicide(const char*) WSREP_OVERRIDE { - ::abort(); + // Not going to do this while unit testing } void on_error(enum wsrep::client_error) { } diff --git a/src/mock_provider.hpp b/src/mock_provider.hpp index b32bb20..10656bc 100644 --- a/src/mock_provider.hpp +++ b/src/mock_provider.hpp @@ -7,6 +7,7 @@ #include "wsrep/provider.hpp" #include "wsrep/logger.hpp" +#include "wsrep/buffer.hpp" #include #include @@ -50,7 +51,10 @@ namespace wsrep { wsrep::log_info() << "provider certify: " << "client: " << client_id.get() - << " flags: " << std::hex << flags; + << " flags: " << std::hex << flags + << std::dec + << "next_error: " << next_error_; + if (next_error_) { return next_error_; @@ -108,9 +112,9 @@ namespace wsrep } int append_key(wsrep::ws_handle&, const wsrep::key&) - { return next_error_; } - int append_data(wsrep::ws_handle&, const wsrep::data&) - { return next_error_; } + { return 0; } + int append_data(wsrep::ws_handle&, const wsrep::const_buffer&) + { return 0; } int rollback(const wsrep::transaction_id) { return next_error_; } enum wsrep::provider::status diff --git a/src/server_context.cpp b/src/server_context.cpp index 58bb699..7cbe0aa 100644 --- a/src/server_context.cpp +++ b/src/server_context.cpp @@ -114,7 +114,7 @@ int wsrep::server_context::on_apply( wsrep::client_context& client_context, const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta, - const wsrep::data& data) + const wsrep::const_buffer& data) { int ret(0); const wsrep::transaction_context& txc(client_context.transaction()); diff --git a/src/server_context_test.cpp b/src/server_context_test.cpp index 7cf2bdf..2d2cdd4 100644 --- a/src/server_context_test.cpp +++ b/src/server_context_test.cpp @@ -39,7 +39,7 @@ BOOST_FIXTURE_TEST_CASE(server_context_applying_1pc, { char buf[1] = { 1 }; BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta, - wsrep::data(buf, 1)) == 0); + wsrep::const_buffer(buf, 1)) == 0); const wsrep::transaction_context& txc(cc.transaction()); // ::abort(); BOOST_REQUIRE_MESSAGE( @@ -53,7 +53,7 @@ BOOST_FIXTURE_TEST_CASE(server_context_applying_2pc, { char buf[1] = { 1 }; BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta, - wsrep::data(buf, 1)) == 0); + wsrep::const_buffer(buf, 1)) == 0); const wsrep::transaction_context& txc(cc.transaction()); BOOST_REQUIRE(txc.state() == wsrep::transaction_context::s_committed); } @@ -66,7 +66,7 @@ BOOST_FIXTURE_TEST_CASE(server_context_applying_1pc_rollback, cc.fail_next_applying_ = true; char buf[1] = { 1 }; BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta, - wsrep::data(buf, 1)) == 1); + wsrep::const_buffer(buf, 1)) == 1); const wsrep::transaction_context& txc(cc.transaction()); BOOST_REQUIRE(txc.state() == wsrep::transaction_context::s_aborted); } @@ -79,7 +79,7 @@ BOOST_FIXTURE_TEST_CASE(server_context_applying_2pc_rollback, cc.fail_next_applying_ = true; char buf[1] = { 1 }; BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta, - wsrep::data(buf, 1)) == 1); + wsrep::const_buffer(buf, 1)) == 1); const wsrep::transaction_context& txc(cc.transaction()); BOOST_REQUIRE(txc.state() == wsrep::transaction_context::s_aborted); } diff --git a/src/transaction_context.cpp b/src/transaction_context.cpp index 3ea780c..3579a9a 100644 --- a/src/transaction_context.cpp +++ b/src/transaction_context.cpp @@ -6,7 +6,6 @@ #include "wsrep/client_context.hpp" #include "wsrep/server_context.hpp" #include "wsrep/key.hpp" -#include "wsrep/data.hpp" #include "wsrep/logger.hpp" #include "wsrep/compiler.hpp" @@ -87,7 +86,7 @@ int wsrep::transaction_context::append_key(const wsrep::key& key) return provider_.append_key(ws_handle_, key); } -int wsrep::transaction_context::append_data(const wsrep::data& data) +int wsrep::transaction_context::append_data(const wsrep::const_buffer& data) { return provider_.append_data(ws_handle_, data); @@ -149,7 +148,7 @@ int wsrep::transaction_context::before_prepare() } else { - remove_fragments(); + client_context_.remove_fragments(*this); } lock.lock(); client_context_.debug_suicide( @@ -658,8 +657,8 @@ int wsrep::transaction_context::certify_fragment( flags_ |= wsrep::provider::flag::start_transaction; } - wsrep::data data; - if (client_context_.prepare_data_for_replication(*this, data)) + wsrep::mutable_buffer data; + if (client_context_.prepare_fragment_for_replication(*this, data)) { lock.lock(); state(lock, s_must_abort); @@ -681,7 +680,8 @@ int wsrep::transaction_context::certify_fragment( sr_transaction_context.state(sr_lock, s_certifying); sr_lock.unlock(); if (sr_client_context->append_fragment( - sr_transaction_context, flags_, data)) + sr_transaction_context, flags_, + wsrep::const_buffer(data.data(), data.size()))) { lock.lock(); state(lock, s_must_abort); @@ -750,8 +750,7 @@ int wsrep::transaction_context::certify_commit( flags(flags() | wsrep::provider::flag::commit); lock.unlock(); - wsrep::data data; - if (client_context_.prepare_data_for_replication(*this, data)) + if (client_context_.prepare_data_for_replication(*this)) { // Note: Error must be set by prepare_data_for_replication() lock.lock(); @@ -870,14 +869,9 @@ int wsrep::transaction_context::certify_commit( return ret; } -void wsrep::transaction_context::remove_fragments() -{ - throw wsrep::not_implemented_error(); -} - void wsrep::transaction_context::clear_fragments() { - throw wsrep::not_implemented_error(); + streaming_context_.cleanup(); } void wsrep::transaction_context::cleanup() diff --git a/src/transaction_context_test.cpp b/src/transaction_context_test.cpp index 97b1215..33b37e4 100644 --- a/src/transaction_context_test.cpp +++ b/src/transaction_context_test.cpp @@ -140,7 +140,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_append_key_data, key.append_key_part(&vals[i], sizeof(vals[i])); } BOOST_REQUIRE(cc.append_key(key) == 0); - wsrep::data data(&vals[2], sizeof(vals[2])); + wsrep::const_buffer data(&vals[2], sizeof(vals[2])); BOOST_REQUIRE(cc.append_data(data) == 0); BOOST_REQUIRE(cc.before_commit() == 0); BOOST_REQUIRE(cc.ordered_commit() == 0); @@ -1204,3 +1204,21 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_1pc_commit, BOOST_REQUIRE(sc.provider().commit_fragments() == 1); } + +BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_2pc_commit, + streaming_client_fixture_row) +{ + BOOST_REQUIRE(cc.start_transaction(1) == 0); + BOOST_REQUIRE(cc.after_row() == 0); + BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(cc.before_prepare() == 0); + BOOST_REQUIRE(cc.after_prepare() == 0); + BOOST_REQUIRE(cc.before_commit() == 0); + BOOST_REQUIRE(cc.ordered_commit() == 0); + BOOST_REQUIRE(cc.after_commit() == 0); + BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success); + BOOST_REQUIRE(sc.provider().fragments() == 2); + BOOST_REQUIRE(sc.provider().start_fragments() == 1); + BOOST_REQUIRE(sc.provider().commit_fragments() == 1); + +} diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index 52b2dc7..0b76c57 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -314,7 +314,7 @@ namespace assert(client_context); assert(client_context->mode() == wsrep::client_context::m_applier); - wsrep::data data(buf->ptr, buf->len); + wsrep::const_buffer data(buf->ptr, buf->len); wsrep::ws_handle ws_handle(wsh->trx_id, wsh->opaque); wsrep::ws_meta ws_meta( wsrep::gtid(wsrep::id(meta->gtid.uuid.data, @@ -484,9 +484,9 @@ int wsrep::wsrep_provider_v26::append_key(wsrep::ws_handle& ws_handle, } int wsrep::wsrep_provider_v26::append_data(wsrep::ws_handle& ws_handle, - const wsrep::data& data) + const wsrep::const_buffer& data) { - const wsrep_buf_t wsrep_buf = {data.get().ptr(), data.get().size()}; + const wsrep_buf_t wsrep_buf = {data.data(), data.size()}; mutable_ws_handle mwsh(ws_handle); return (wsrep_->append_data(wsrep_, mwsh.native(), &wsrep_buf, 1, WSREP_DATA_ORDERED, true) diff --git a/src/wsrep_provider_v26.hpp b/src/wsrep_provider_v26.hpp index b89298f..db650db 100644 --- a/src/wsrep_provider_v26.hpp +++ b/src/wsrep_provider_v26.hpp @@ -25,7 +25,7 @@ namespace wsrep enum wsrep::provider::status run_applier(void*); int start_transaction(wsrep::ws_handle&) { return 0; } int append_key(wsrep::ws_handle&, const wsrep::key&); - int append_data(wsrep::ws_handle&, const wsrep::data&); + int append_data(wsrep::ws_handle&, const wsrep::const_buffer&); enum wsrep::provider::status certify(wsrep::client_id, wsrep::ws_handle&, int,