diff --git a/dbsim/db_storage_service.hpp b/dbsim/db_storage_service.hpp index 8974f9e..28eb78e 100644 --- a/dbsim/db_storage_service.hpp +++ b/dbsim/db_storage_service.hpp @@ -14,7 +14,8 @@ namespace db { int start_transaction() override { throw wsrep::not_implemented_error(); } - int append_fragment(const wsrep::id&, wsrep::client_id, + int append_fragment(const wsrep::id&, + wsrep::transaction_id, int, const wsrep::const_buffer&) override { throw wsrep::not_implemented_error(); } diff --git a/include/wsrep/storage_service.hpp b/include/wsrep/storage_service.hpp index 43d20df..e8f5f24 100644 --- a/include/wsrep/storage_service.hpp +++ b/include/wsrep/storage_service.hpp @@ -14,7 +14,7 @@ #ifndef WSREP_STORAGE_SERVICE_HPP #define WSREP_STORAGE_SERVICE_HPP -#include "client_id.hpp" +#include "transaction_id.hpp" #include "id.hpp" #include "buffer.hpp" @@ -46,7 +46,7 @@ namespace wsrep * Append fragment into stable storage. */ virtual int append_fragment(const wsrep::id& server_id, - wsrep::client_id client_id, + wsrep::transaction_id client_id, int flags, const wsrep::const_buffer& data) = 0; diff --git a/include/wsrep/streaming_context.hpp b/include/wsrep/streaming_context.hpp index 2bb4a87..6be91c2 100644 --- a/include/wsrep/streaming_context.hpp +++ b/include/wsrep/streaming_context.hpp @@ -5,93 +5,97 @@ #ifndef WSREP_STREAMING_CONTEXT_HPP #define WSREP_STREAMING_CONTEXT_HPP +#include "logger.hpp" + namespace wsrep { - class streaming_context + class streaming_context + { + public: + enum fragment_unit { - public: - enum fragment_unit - { - row, - bytes, - statement - }; - - streaming_context() - : fragments_() - , rollback_replicated_for_() - , fragment_unit_() - , fragment_size_() - , bytes_certified_() - , unit_counter_() - { } - - void enable(enum fragment_unit fragment_unit, size_t fragment_size) - { - assert(fragment_size > 0); - fragment_unit_ = fragment_unit; - fragment_size_ = fragment_size; - } - - enum fragment_unit fragment_unit() const { return fragment_unit_; } - - size_t fragment_size() const { return fragment_size_; } - - void disable() - { - fragment_size_ = 0; - } - - void certified(wsrep::seqno seqno) - { - fragments_.push_back(seqno); - } - - size_t fragments_certified() const - { - return fragments_.size(); - } - - size_t bytes_certified() const - { - return bytes_certified_; - } - - void rolled_back(wsrep::transaction_id id) - { - assert(rollback_replicated_for_ == wsrep::transaction_id::undefined()); - rollback_replicated_for_ = id; - } - - bool rolled_back() const - { - return (rollback_replicated_for_ != - wsrep::transaction_id::undefined()); - } - - size_t unit_counter() const { return unit_counter_; } - void increment_unit_counter(size_t inc) - { unit_counter_ += inc; } - void reset_unit_counter() { unit_counter_ = 0; } - const std::vector& fragments() const - { - return fragments_; - } - void cleanup() - { - fragments_.clear(); - rollback_replicated_for_ = wsrep::transaction_id::undefined(); - bytes_certified_ = 0; - unit_counter_ = 0; - } - private: - std::vector fragments_; - wsrep::transaction_id rollback_replicated_for_; - enum fragment_unit fragment_unit_; - size_t fragment_size_; - size_t bytes_certified_; - size_t unit_counter_; + row, + bytes, + statement }; + + streaming_context() + : fragments_() + , rollback_replicated_for_() + , fragment_unit_() + , fragment_size_() + , bytes_certified_() + , unit_counter_() + { } + + void enable(enum fragment_unit fragment_unit, size_t fragment_size) + { + wsrep::log_info() << "Enabling streaming: " + << fragment_unit << " " << fragment_size; + assert(fragment_size > 0); + fragment_unit_ = fragment_unit; + fragment_size_ = fragment_size; + } + + enum fragment_unit fragment_unit() const { return fragment_unit_; } + + size_t fragment_size() const { return fragment_size_; } + + void disable() + { + fragment_size_ = 0; + } + + void certified(wsrep::seqno seqno) + { + fragments_.push_back(seqno); + } + + size_t fragments_certified() const + { + return fragments_.size(); + } + + size_t bytes_certified() const + { + return bytes_certified_; + } + + void rolled_back(wsrep::transaction_id id) + { + assert(rollback_replicated_for_ == wsrep::transaction_id::undefined()); + rollback_replicated_for_ = id; + } + + bool rolled_back() const + { + return (rollback_replicated_for_ != + wsrep::transaction_id::undefined()); + } + + size_t unit_counter() const { return unit_counter_; } + void increment_unit_counter(size_t inc) + { unit_counter_ += inc; } + void reset_unit_counter() { unit_counter_ = 0; } + const std::vector& fragments() const + { + return fragments_; + } + void cleanup() + { + fragments_.clear(); + rollback_replicated_for_ = wsrep::transaction_id::undefined(); + bytes_certified_ = 0; + unit_counter_ = 0; + } + private: + std::vector fragments_; + wsrep::transaction_id rollback_replicated_for_; + enum fragment_unit fragment_unit_; + size_t fragment_size_; + size_t bytes_certified_; + size_t unit_counter_; + }; } #endif // WSREP_STREAMING_CONTEXT_HPP diff --git a/src/client_state.cpp b/src/client_state.cpp index ebde98c..15ea948 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -194,6 +194,7 @@ int wsrep::client_state::after_statement() // wsrep::unique_lock lock(mutex_); debug_log_state("after_statement: enter"); assert(state() == s_exec); + assert(mode() == m_local); #if 0 /** * @todo Check for replay state, do rollback if requested. diff --git a/src/transaction.cpp b/src/transaction.cpp index 6ae493d..464face 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -195,6 +195,7 @@ int wsrep::transaction::append_data(const wsrep::const_buffer& data) int wsrep::transaction::after_row() { wsrep::unique_lock lock(client_state_.mutex()); + debug_log_state("after_row_enter"); if (streaming_context_.fragment_size() > 0) { switch (streaming_context_.fragment_unit()) @@ -843,7 +844,7 @@ int wsrep::transaction::certify_fragment( if (storage_service.append_fragment( client_state_.server_state().id(), - client_state_.id(), + id(), flags_, wsrep::const_buffer(data.data(), data.size()))) { diff --git a/test/mock_storage_service.hpp b/test/mock_storage_service.hpp index 6aaa1be..2270db4 100644 --- a/test/mock_storage_service.hpp +++ b/test/mock_storage_service.hpp @@ -21,7 +21,7 @@ class mock_server_state; int start_transaction() WSREP_OVERRIDE; int append_fragment(const wsrep::id&, - wsrep::client_id, + wsrep::transaction_id, int, const wsrep::const_buffer&) WSREP_OVERRIDE