diff --git a/dbsim/db_client_service.cpp b/dbsim/db_client_service.cpp index edf7df8..bb40017 100644 --- a/dbsim/db_client_service.cpp +++ b/dbsim/db_client_service.cpp @@ -21,6 +21,8 @@ #include "db_high_priority_service.hpp" #include "db_client.hpp" +#include + db::client_service::client_service(db::client& client) : wsrep::client_service() , client_(client) diff --git a/dbsim/db_storage_engine.cpp b/dbsim/db_storage_engine.cpp index c102783..9452e76 100644 --- a/dbsim/db_storage_engine.cpp +++ b/dbsim/db_storage_engine.cpp @@ -20,6 +20,8 @@ #include "db_storage_engine.hpp" #include "db_client.hpp" +#include + void db::storage_engine::transaction::start(db::client* cc) { wsrep::unique_lock lock(se_.mutex_); diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index e740370..cb46cef 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -163,10 +163,7 @@ namespace wsrep /** * Destructor. */ - virtual ~client_state() - { - assert(transaction_.active() == false); - } + virtual ~client_state(); /** @name Client session handling */ /** @{ */ @@ -298,11 +295,7 @@ namespace wsrep * * @param err Applying error (empty for no error) */ - void after_applying() - { - assert(mode_ == m_high_priority); - transaction_.after_applying(); - } + void after_applying(); /** @name Replication interface */ /** @{ */ @@ -313,12 +306,7 @@ namespace wsrep * - Register the transaction on server level for bookkeeping * - Isolation levels? Or part of the transaction? */ - int start_transaction(const wsrep::transaction_id& id) - { - wsrep::unique_lock lock(mutex_); - assert(state_ == s_exec); - return transaction_.start_transaction(id); - } + int start_transaction(const wsrep::transaction_id& id); /** * Establish read view ID of the transaction. @@ -332,12 +320,7 @@ namespace wsrep * * @param gtid optional explicit GTID of the transaction read view. */ - int assign_read_view(const wsrep::gtid* const gtid = NULL) - { - assert(mode_ == m_local); - assert(state_ == s_exec); - return transaction_.assign_read_view(gtid); - } + int assign_read_view(const wsrep::gtid* const gtid = NULL); /** * Append a key into transaction write set. @@ -346,12 +329,7 @@ namespace wsrep * * @return Zero on success, non-zero on failure. */ - int append_key(const wsrep::key& key) - { - assert(mode_ == m_local); - assert(state_ == s_exec); - return transaction_.append_key(key); - } + int append_key(const wsrep::key& key); /** * Append keys in key_array into transaction write set. @@ -360,29 +338,12 @@ namespace wsrep * * @return Zero in case of success, non-zero on failure. */ - int append_keys(const wsrep::key_array& keys) - { - assert(mode_ == m_local || mode_ == m_toi); - assert(state_ == s_exec); - for (auto i(keys.begin()); i != keys.end(); ++i) - { - if (transaction_.append_key(*i)) - { - return 1; - } - } - return 0; - } + int append_keys(const wsrep::key_array& keys); /** * Append data into transaction write set. */ - int append_data(const wsrep::const_buffer& data) - { - assert(mode_ == m_local); - assert(state_ == s_exec); - return transaction_.append_data(data); - } + int append_data(const wsrep::const_buffer& data); /** @} */ @@ -391,13 +352,7 @@ namespace wsrep /** * This method should be called after every row operation. */ - int after_row() - { - assert(mode_ == m_local); - assert(state_ == s_exec); - return (transaction_.streaming_context().fragment_size() ? - transaction_.after_row() : 0); - } + int after_row(); /** * Set streaming parameters. @@ -431,12 +386,7 @@ namespace wsrep */ void disable_streaming(); - void fragment_applied(wsrep::seqno seqno) - { - assert(mode_ == m_high_priority); - transaction_.fragment_applied(seqno); - } - + void fragment_applied(wsrep::seqno seqno); /** * Prepare write set meta data for ordering. * This method should be called before ordered commit or @@ -451,90 +401,31 @@ namespace wsrep */ int prepare_for_ordering(const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta, - bool is_commit) - { - assert(state_ == s_exec); - return transaction_.prepare_for_ordering( - ws_handle, ws_meta, is_commit); - } + bool is_commit); /** @} */ /** @name Applying interface */ /** @{ */ int start_transaction(const wsrep::ws_handle& wsh, - const wsrep::ws_meta& meta) - { - wsrep::unique_lock lock(mutex_); - assert(owning_thread_id_ == wsrep::this_thread::get_id()); - assert(mode_ == m_high_priority); - return transaction_.start_transaction(wsh, meta); - } + const wsrep::ws_meta& meta); - int next_fragment(const wsrep::ws_meta& meta) - { - wsrep::unique_lock lock(mutex_); - assert(mode_ == m_high_priority); - return transaction_.next_fragment(meta); - } + int next_fragment(const wsrep::ws_meta& meta); /** @name Commit ordering interface */ /** @{ */ - int before_prepare() - { - wsrep::unique_lock lock(mutex_); - assert(owning_thread_id_ == wsrep::this_thread::get_id()); - assert(state_ == s_exec); - return transaction_.before_prepare(lock); - } + int before_prepare(); - int after_prepare() - { - wsrep::unique_lock lock(mutex_); - assert(owning_thread_id_ == wsrep::this_thread::get_id()); - assert(state_ == s_exec); - return transaction_.after_prepare(lock); - } + int after_prepare(); - int before_commit() - { - assert(owning_thread_id_ == wsrep::this_thread::get_id()); - assert(state_ == s_exec || mode_ == m_local); - return transaction_.before_commit(); - } + int before_commit(); - int ordered_commit() - { - assert(owning_thread_id_ == wsrep::this_thread::get_id()); - assert(state_ == s_exec || mode_ == m_local); - return transaction_.ordered_commit(); - } + int ordered_commit(); - int after_commit() - { - assert(owning_thread_id_ == wsrep::this_thread::get_id()); - assert(state_ == s_exec || mode_ == m_local); - return transaction_.after_commit(); - } + int after_commit(); /** @} */ - int before_rollback() - { - assert(owning_thread_id_ == wsrep::this_thread::get_id()); - assert(state_ == s_idle || - state_ == s_exec || - state_ == s_result || - state_ == s_quitting); - return transaction_.before_rollback(); - } + int before_rollback(); - int after_rollback() - { - assert(owning_thread_id_ == wsrep::this_thread::get_id()); - assert(state_ == s_idle || - state_ == s_exec || - state_ == s_result || - state_ == s_quitting); - return transaction_.after_rollback(); - } + int after_rollback(); /** * This method should be called by the background rollbacker @@ -629,12 +520,7 @@ namespace wsrep * After this call, a different client may later attempt to terminate * the transaction by calling method commit_by_xid() or rollback_by_xid(). */ - void xa_detach() - { - assert(mode_ == m_local); - assert(state_ == s_none || state_ == s_exec || state_ == s_quitting); - transaction_.xa_detach(); - } + void xa_detach(); /** * Replay a XA transaction @@ -645,13 +531,7 @@ namespace wsrep * Since the victim is idle, this method can be called * by the BF aborter or the backround rollbacker. */ - void xa_replay() - { - assert(mode_ == m_local); - assert(state_ == s_idle); - wsrep::unique_lock lock(mutex_); - transaction_.xa_replay(lock); - } + void xa_replay(); // // BF aborting @@ -661,23 +541,14 @@ namespace wsrep * called by a transaction which needs to BF abort a conflicting * locally processing transaction. */ - int bf_abort(wsrep::seqno bf_seqno) - { - wsrep::unique_lock lock(mutex_); - assert(mode_ == m_local || transaction_.is_streaming()); - return transaction_.bf_abort(lock, bf_seqno); - } + int bf_abort(wsrep::seqno bf_seqno); + /** * Brute force abort a transaction in total order. This method * should be called by the TOI operation which needs to * BF abort a transaction. */ - int total_order_bf_abort(wsrep::seqno bf_seqno) - { - wsrep::unique_lock lock(mutex_); - assert(mode_ == m_local || transaction_.is_streaming()); - return transaction_.total_order_bf_abort(lock, bf_seqno); - } + int total_order_bf_abort(wsrep::seqno bf_seqno); /** * Adopt a streaming transaction state. This is must be @@ -686,20 +557,12 @@ namespace wsrep * set up enough context for handling the rollback * fragment. */ - void adopt_transaction(const wsrep::transaction& transaction) - { - assert(mode_ == m_high_priority); - transaction_.adopt(transaction); - } + void adopt_transaction(const wsrep::transaction& transaction); /** * Adopt (store) transaction applying error for further processing. */ - void adopt_apply_error(wsrep::mutable_buffer& err) - { - assert(mode_ == m_high_priority); - transaction_.adopt_apply_error(err); - } + void adopt_apply_error(wsrep::mutable_buffer& err); /** * Clone enough state from another transaction so that replaing will @@ -710,7 +573,6 @@ namespace wsrep */ void clone_transaction_for_replay(const wsrep::transaction& transaction) { - // assert(mode_ == m_high_priority); transaction_.clone_for_replay(transaction); } @@ -1186,19 +1048,8 @@ namespace wsrep class high_priority_context { public: - high_priority_context(wsrep::client_state& client) - : client_(client) - , orig_mode_(client.mode_) - { - wsrep::unique_lock lock(client.mutex_); - client.mode(lock, wsrep::client_state::m_high_priority); - } - virtual ~high_priority_context() - { - wsrep::unique_lock lock(client_.mutex_); - assert(client_.mode() == wsrep::client_state::m_high_priority); - client_.mode(lock, orig_mode_); - } + high_priority_context(wsrep::client_state& client); + virtual ~high_priority_context(); private: wsrep::client_state& client_; enum wsrep::client_state::mode orig_mode_; diff --git a/include/wsrep/provider.hpp b/include/wsrep/provider.hpp index ea7e97f..c00e9ed 100644 --- a/include/wsrep/provider.hpp +++ b/include/wsrep/provider.hpp @@ -27,7 +27,6 @@ #include "transaction_id.hpp" #include "compiler.hpp" -#include #include #include diff --git a/include/wsrep/server_state.hpp b/include/wsrep/server_state.hpp index 26ca073..e0f9d8c 100644 --- a/include/wsrep/server_state.hpp +++ b/include/wsrep/server_state.hpp @@ -554,11 +554,7 @@ namespace wsrep } enum state state(wsrep::unique_lock& - lock WSREP_UNUSED) const - { - assert(lock.owns_lock()); - return state_; - } + lock WSREP_UNUSED) const; /** * Get provider status variables. diff --git a/include/wsrep/sr_key_set.hpp b/include/wsrep/sr_key_set.hpp index abdb1e0..69227f5 100644 --- a/include/wsrep/sr_key_set.hpp +++ b/include/wsrep/sr_key_set.hpp @@ -22,9 +22,11 @@ #include #include +#include namespace wsrep { + class key; class sr_key_set { public: @@ -33,24 +35,9 @@ namespace wsrep sr_key_set() : root_() { } - - void insert(const wsrep::key& key) - { - assert(key.size() >= 2); - if (key.size() < 2) - { - throw wsrep::runtime_error("Invalid key size"); - } - - root_[std::string( - static_cast(key.key_parts()[0].data()), - key.key_parts()[0].size())].insert( - std::string(static_cast(key.key_parts()[1].data()), - key.key_parts()[1].size())); - } - + void insert(const wsrep::key& key); const branch_type& root() const { return root_; } - void clear() { root_.clear(); } + void clear(); bool empty() const { return root_.empty(); } private: branch_type root_; diff --git a/include/wsrep/streaming_context.hpp b/include/wsrep/streaming_context.hpp index 9b205c5..45b19c9 100644 --- a/include/wsrep/streaming_context.hpp +++ b/include/wsrep/streaming_context.hpp @@ -29,6 +29,7 @@ namespace wsrep { + /* Helper class to store streaming transaction context. */ class streaming_context { public: @@ -58,144 +59,113 @@ namespace wsrep * @param fragment_unit Desired fragment unit. * @param fragment_size Desired fragment size. */ - void params(enum fragment_unit fragment_unit, size_t fragment_size) - { - if (fragment_size) - { - WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), - wsrep::log::debug_level_streaming, - "Enabling streaming: " - << fragment_unit << " " << fragment_size); - } - else - { - WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), - wsrep::log::debug_level_streaming, - "Disabling streaming"); - } - fragment_unit_ = fragment_unit; - fragment_size_ = fragment_size; - reset_unit_counter(); - } + void params(enum fragment_unit fragment_unit, size_t fragment_size); - void enable(enum fragment_unit fragment_unit, size_t fragment_size) - { - WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), - wsrep::log::debug_level_streaming, - "Enabling streaming: " - << fragment_unit << " " << fragment_size); - assert(fragment_size > 0); - fragment_unit_ = fragment_unit; - fragment_size_ = fragment_size; - } + /** + * Enable streaming replication. + * + * @param fragment_unit Desired fragment unit. + * @param fragment_size Desired fragment size. + */ + void enable(enum fragment_unit fragment_unit, size_t fragment_size); + /** Return current fragment unit. */ enum fragment_unit fragment_unit() const { return fragment_unit_; } + /** Return current fragment size. */ size_t fragment_size() const { return fragment_size_; } - void disable() - { - WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), - wsrep::log::debug_level_streaming, - "Disabling streaming"); - fragment_size_ = 0; - } + /** Disable streaming replication. */ + void disable(); + /** Increment counter for certified fragments. */ void certified() { ++fragments_certified_; } + /** Return number of certified fragments. */ size_t fragments_certified() const { return fragments_certified_; } - void stored(wsrep::seqno seqno) - { - check_fragment_seqno(seqno); - fragments_.push_back(seqno); - } + /** Mark fragment with seqno as stored in fragment store. */ + void stored(wsrep::seqno seqno); + /** Return number of stored fragments. */ size_t fragments_stored() const { return fragments_.size(); } - void applied(wsrep::seqno seqno) - { - check_fragment_seqno(seqno); - ++fragments_certified_; - fragments_.push_back(seqno); - } + /** Mark fragment with seqno as applied. */ + void applied(wsrep::seqno seqno); - void rolled_back(wsrep::transaction_id id) - { - assert(rollback_replicated_for_ == wsrep::transaction_id::undefined()); - rollback_replicated_for_ = id; - } + /** Mark streaming transaction as rolled back. */ + void rolled_back(wsrep::transaction_id id); + /** Return true if streaming transaction has been marked + * as rolled back. */ bool rolled_back() const { return (rollback_replicated_for_ != wsrep::transaction_id::undefined()); } + /** Return current value of unit counter. */ size_t unit_counter() const { return unit_counter_; } + /** Set value for unit counter. */ void set_unit_counter(size_t count) { unit_counter_ = count; } + /** Increment unit counter by inc. */ void increment_unit_counter(size_t inc) { unit_counter_ += inc; } + /** Reset unit counter to zero. */ void reset_unit_counter() { unit_counter_ = 0; } + /** Return current log position. */ size_t log_position() const { return log_position_; } + /** Set log position. */ void set_log_position(size_t position) { log_position_ = position; } + /** Return vector of stored fragments. */ const std::vector& fragments() const { return fragments_; } + /** Return true if the fragment size was exceeded. */ bool fragment_size_exceeded() const { return unit_counter_ >= fragment_size_; } - void cleanup() - { - fragments_certified_ = 0; - fragments_.clear(); - rollback_replicated_for_ = wsrep::transaction_id::undefined(); - unit_counter_ = 0; - log_position_ = 0; - } + /** Clean up the streaming transaction state. */ + void cleanup(); private: - void check_fragment_seqno(wsrep::seqno seqno WSREP_UNUSED) - { - assert(seqno.is_undefined() == false); - assert(fragments_.empty() || fragments_.back() < seqno); - } + void check_fragment_seqno(wsrep::seqno seqno WSREP_UNUSED); size_t fragments_certified_; std::vector fragments_; diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index fb69b35..c744066 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -29,7 +29,6 @@ #include "buffer.hpp" #include "xid.hpp" -#include #include namespace wsrep diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 7a3ee2b..85524ce 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -3,26 +3,29 @@ # add_library(wsrep-lib + allowlist_service_v1.cpp client_state.cpp + config_service_v1.cpp + event_service_v1.cpp exception.cpp gtid.cpp id.cpp - xid.cpp key.cpp logger.cpp provider.cpp provider_options.cpp + reporter.cpp seqno.cpp - view.cpp server_state.cpp + sr_key_set.cpp + streaming_context.cpp thread.cpp thread_service_v1.cpp tls_service_v1.cpp - event_service_v1.cpp transaction.cpp uuid.cpp - reporter.cpp - allowlist_service_v1.cpp + view.cpp wsrep_provider_v26.cpp - config_service_v1.cpp) + xid.cpp + ) target_link_libraries(wsrep-lib wsrep_api_v26 pthread ${WSREP_LIB_LIBDL}) diff --git a/src/client_state.cpp b/src/client_state.cpp index c547373..7d58dfd 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -25,9 +25,15 @@ #include "wsrep/client_service.hpp" #include // usleep() +#include #include #include +wsrep::client_state::~client_state() +{ + assert(transaction_.active() == false); +} + wsrep::provider& wsrep::client_state::provider() const { return server_state_.provider(); @@ -296,6 +302,146 @@ int wsrep::client_state::after_statement() return 0; } +void wsrep::client_state::after_applying() +{ + assert(mode_ == m_high_priority); + transaction_.after_applying(); +} + +int wsrep::client_state::start_transaction(const wsrep::transaction_id& id) +{ + wsrep::unique_lock lock(mutex_); + assert(state_ == s_exec); + return transaction_.start_transaction(id); +} + +int wsrep::client_state::assign_read_view(const wsrep::gtid* const gtid) +{ + assert(mode_ == m_local); + assert(state_ == s_exec); + return transaction_.assign_read_view(gtid); +} + +int wsrep::client_state::append_key(const wsrep::key& key) +{ + assert(mode_ == m_local); + assert(state_ == s_exec); + return transaction_.append_key(key); +} + +int wsrep::client_state::append_keys(const wsrep::key_array& keys) +{ + assert(mode_ == m_local || mode_ == m_toi); + assert(state_ == s_exec); + for (auto i(keys.begin()); i != keys.end(); ++i) + { + if (transaction_.append_key(*i)) + { + return 1; + } + } + return 0; +} + +int wsrep::client_state::append_data(const wsrep::const_buffer& data) +{ + assert(mode_ == m_local); + assert(state_ == s_exec); + return transaction_.append_data(data); +} + +int wsrep::client_state::after_row() +{ + assert(mode_ == m_local); + assert(state_ == s_exec); + return (transaction_.streaming_context().fragment_size() + ? transaction_.after_row() + : 0); +} + +void wsrep::client_state::fragment_applied(wsrep::seqno seqno) +{ + assert(mode_ == m_high_priority); + transaction_.fragment_applied(seqno); +} + +int wsrep::client_state::prepare_for_ordering(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + bool is_commit) +{ + assert(state_ == s_exec); + return transaction_.prepare_for_ordering(ws_handle, ws_meta, is_commit); +} + +int wsrep::client_state::start_transaction(const wsrep::ws_handle& wsh, + const wsrep::ws_meta& meta) +{ + wsrep::unique_lock lock(mutex_); + assert(owning_thread_id_ == wsrep::this_thread::get_id()); + assert(mode_ == m_high_priority); + return transaction_.start_transaction(wsh, meta); +} + +int wsrep::client_state::next_fragment(const wsrep::ws_meta& meta) +{ + wsrep::unique_lock lock(mutex_); + assert(mode_ == m_high_priority); + return transaction_.next_fragment(meta); +} + +int wsrep::client_state::before_prepare() +{ + wsrep::unique_lock lock(mutex_); + assert(owning_thread_id_ == wsrep::this_thread::get_id()); + assert(state_ == s_exec); + return transaction_.before_prepare(lock); +} + +int wsrep::client_state::after_prepare() +{ + wsrep::unique_lock lock(mutex_); + assert(owning_thread_id_ == wsrep::this_thread::get_id()); + assert(state_ == s_exec); + return transaction_.after_prepare(lock); +} + +int wsrep::client_state::before_commit() +{ + assert(owning_thread_id_ == wsrep::this_thread::get_id()); + assert(state_ == s_exec || mode_ == m_local); + return transaction_.before_commit(); +} + +int wsrep::client_state::ordered_commit() +{ + assert(owning_thread_id_ == wsrep::this_thread::get_id()); + assert(state_ == s_exec || mode_ == m_local); + return transaction_.ordered_commit(); +} + +int wsrep::client_state::after_commit() +{ + assert(owning_thread_id_ == wsrep::this_thread::get_id()); + assert(state_ == s_exec || mode_ == m_local); + return transaction_.after_commit(); +} + +int wsrep::client_state::before_rollback() +{ + assert(owning_thread_id_ == wsrep::this_thread::get_id()); + assert(state_ == s_idle || state_ == s_exec || state_ == s_result + || state_ == s_quitting); + return transaction_.before_rollback(); +} + +int wsrep::client_state::after_rollback() +{ + assert(owning_thread_id_ == wsrep::this_thread::get_id()); + assert(state_ == s_idle || state_ == s_exec || state_ == s_result + || state_ == s_quitting); + return transaction_.after_rollback(); +} + ////////////////////////////////////////////////////////////////////////////// // Rollbacker synchronization // ////////////////////////////////////////////////////////////////////////////// @@ -361,6 +507,56 @@ void wsrep::client_state::disable_streaming() transaction_.streaming_context().disable(); } +////////////////////////////////////////////////////////////////////////////// +// XA // +////////////////////////////////////////////////////////////////////////////// + +void wsrep::client_state::xa_detach() +{ + assert(mode_ == m_local); + assert(state_ == s_none || state_ == s_exec || state_ == s_quitting); + transaction_.xa_detach(); +} + +void wsrep::client_state::xa_replay() +{ + assert(mode_ == m_local); + assert(state_ == s_idle); + wsrep::unique_lock lock(mutex_); + transaction_.xa_replay(lock); +} + +////////////////////////////////////////////////////////////////////////////// +// BF // +////////////////////////////////////////////////////////////////////////////// + +int wsrep::client_state::bf_abort(wsrep::seqno bf_seqno) +{ + wsrep::unique_lock lock(mutex_); + assert(mode_ == m_local || transaction_.is_streaming()); + return transaction_.bf_abort(lock, bf_seqno); +} + +int wsrep::client_state::total_order_bf_abort(wsrep::seqno bf_seqno) +{ + wsrep::unique_lock lock(mutex_); + assert(mode_ == m_local || transaction_.is_streaming()); + return transaction_.total_order_bf_abort(lock, bf_seqno); +} + +void wsrep::client_state::adopt_transaction( + const wsrep::transaction& transaction) +{ + assert(mode_ == m_high_priority); + transaction_.adopt(transaction); +} + +void wsrep::client_state::adopt_apply_error(wsrep::mutable_buffer& err) +{ + assert(mode_ == m_high_priority); + transaction_.adopt_apply_error(err); +} + ////////////////////////////////////////////////////////////////////////////// // TOI // ////////////////////////////////////////////////////////////////////////////// @@ -884,3 +1080,22 @@ void wsrep::client_state::mode( } mode_ = mode; } + +/////////////////////////////////////////////////////////////////////////////// +// High Priority Context // +/////////////////////////////////////////////////////////////////////////////// + +wsrep::high_priority_context::high_priority_context(wsrep::client_state& client) + : client_(client) + , orig_mode_(client.mode_) +{ + wsrep::unique_lock lock(client.mutex_); + client.mode(lock, wsrep::client_state::m_high_priority); +} + +wsrep::high_priority_context::~high_priority_context() +{ + wsrep::unique_lock lock(client_.mutex_); + assert(client_.mode() == wsrep::client_state::m_high_priority); + client_.mode(lock, orig_mode_); +} diff --git a/src/config_service_v1.cpp b/src/config_service_v1.cpp index 6c4484d..ace6142 100644 --- a/src/config_service_v1.cpp +++ b/src/config_service_v1.cpp @@ -23,6 +23,8 @@ #include "wsrep/logger.hpp" #include "wsrep/provider_options.hpp" +#include + namespace wsrep_config_service_v1 { wsrep_config_service_v1_t service{ 0 }; diff --git a/src/provider.cpp b/src/provider.cpp index 43ded86..9e99f7f 100644 --- a/src/provider.cpp +++ b/src/provider.cpp @@ -23,6 +23,7 @@ #include "wsrep_provider_v26.hpp" #include +#include #include wsrep::provider* wsrep::provider::make_provider( diff --git a/src/reporter.cpp b/src/reporter.cpp index 9a8b24b..ad415d9 100644 --- a/src/reporter.cpp +++ b/src/reporter.cpp @@ -23,6 +23,7 @@ #include #include +#include #include // strerror() #include // mkstemp() #include // errno diff --git a/src/server_state.cpp b/src/server_state.cpp index 795127c..c7d63e2 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -1132,6 +1132,13 @@ int wsrep::server_state::on_apply( } } +enum wsrep::server_state::state wsrep::server_state::state( + wsrep::unique_lock& lock WSREP_UNUSED) const +{ + assert(lock.owns_lock()); + return state_; +} + void wsrep::server_state::start_streaming_client( wsrep::client_state* client_state) { diff --git a/src/sr_key_set.cpp b/src/sr_key_set.cpp new file mode 100644 index 0000000..31b6e69 --- /dev/null +++ b/src/sr_key_set.cpp @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2023 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "wsrep/sr_key_set.hpp" + +#include "wsrep/key.hpp" + +#include + +void wsrep::sr_key_set::insert(const wsrep::key& key) +{ + assert(key.size() >= 2); + if (key.size() < 2) + { + throw wsrep::runtime_error("Invalid key size"); + } + + root_[std::string(static_cast(key.key_parts()[0].data()), + key.key_parts()[0].size())] + .insert(std::string(static_cast(key.key_parts()[1].data()), + key.key_parts()[1].size())); +} + +void wsrep::sr_key_set::clear() +{ + root_.clear(); +} diff --git a/src/streaming_context.cpp b/src/streaming_context.cpp new file mode 100644 index 0000000..c542307 --- /dev/null +++ b/src/streaming_context.cpp @@ -0,0 +1,95 @@ +/* + * Copyright (C) 2023 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "wsrep/streaming_context.hpp" + +#include + +void wsrep::streaming_context::params(enum fragment_unit fragment_unit, + size_t fragment_size) +{ + if (fragment_size) + { + WSREP_LOG_DEBUG( + wsrep::log::debug_log_level(), wsrep::log::debug_level_streaming, + "Enabling streaming: " << fragment_unit << " " << fragment_size); + } + else + { + WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), + wsrep::log::debug_level_streaming, + "Disabling streaming"); + } + fragment_unit_ = fragment_unit; + fragment_size_ = fragment_size; + reset_unit_counter(); +} + +void wsrep::streaming_context::enable(enum fragment_unit fragment_unit, + size_t fragment_size) +{ + WSREP_LOG_DEBUG( + wsrep::log::debug_log_level(), wsrep::log::debug_level_streaming, + "Enabling streaming: " << fragment_unit << " " << fragment_size); + assert(fragment_size > 0); + fragment_unit_ = fragment_unit; + fragment_size_ = fragment_size; +} + +void wsrep::streaming_context::disable() +{ + WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), + wsrep::log::debug_level_streaming, "Disabling streaming"); + fragment_size_ = 0; +} + +void wsrep::streaming_context::stored(wsrep::seqno seqno) +{ + check_fragment_seqno(seqno); + fragments_.push_back(seqno); +} + +void wsrep::streaming_context::applied(wsrep::seqno seqno) +{ + check_fragment_seqno(seqno); + ++fragments_certified_; + fragments_.push_back(seqno); +} + +void wsrep::streaming_context::rolled_back(wsrep::transaction_id id) +{ + assert(rollback_replicated_for_ == wsrep::transaction_id::undefined()); + rollback_replicated_for_ = id; +} + +void wsrep::streaming_context::cleanup() +{ + fragments_certified_ = 0; + fragments_.clear(); + rollback_replicated_for_ = wsrep::transaction_id::undefined(); + unit_counter_ = 0; + log_position_ = 0; +} + +void wsrep::streaming_context::check_fragment_seqno( + wsrep::seqno seqno WSREP_UNUSED) +{ + assert(seqno.is_undefined() == false); + assert(fragments_.empty() || fragments_.back() < seqno); +} diff --git a/src/transaction.cpp b/src/transaction.cpp index 879cc23..f91c72c 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -28,6 +28,7 @@ #include "wsrep/server_service.hpp" #include "wsrep/client_service.hpp" +#include #include #include