diff --git a/dbsim/db_client.cpp b/dbsim/db_client.cpp index 2a66100..944fd16 100644 --- a/dbsim/db_client.cpp +++ b/dbsim/db_client.cpp @@ -5,6 +5,8 @@ #include "db_client.hpp" #include "db_server.hpp" +#include "wsrep/logger.hpp" + db::client::client(db::server& server, wsrep::client_id client_id, enum wsrep::client_state::mode mode, diff --git a/dbsim/db_server.cpp b/dbsim/db_server.cpp index 3901580..430ea72 100644 --- a/dbsim/db_server.cpp +++ b/dbsim/db_server.cpp @@ -6,6 +6,8 @@ #include "db_client.hpp" #include "db_simulator.hpp" +#include "wsrep/logger.hpp" + db::server::server(simulator& simulator, const std::string& name, const std::string& server_id, @@ -32,7 +34,7 @@ void db::server::applier_thread() &applier.client_state())); enum wsrep::provider::status ret( server_state_.provider().run_applier(cc)); - wsrep::log() << "Applier thread exited with error code " << ret; + wsrep::log_info() << "Applier thread exited with error code " << ret; } void db::server::start_applier() diff --git a/dbsim/db_server_state.cpp b/dbsim/db_server_state.cpp index 74ea3ba..79a8fd2 100644 --- a/dbsim/db_server_state.cpp +++ b/dbsim/db_server_state.cpp @@ -5,6 +5,8 @@ #include "db_server_state.hpp" #include "db_server.hpp" +#include "wsrep/logger.hpp" + wsrep::client_state* db::server_state::local_client_state() { return server_.local_client_state(); diff --git a/dbsim/db_simulator.cpp b/dbsim/db_simulator.cpp index 48a1302..31bb094 100644 --- a/dbsim/db_simulator.cpp +++ b/dbsim/db_simulator.cpp @@ -3,6 +3,9 @@ // #include "db_simulator.hpp" + +#include "wsrep/logger.hpp" + #include #include diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index a689240..a059607 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -18,7 +18,6 @@ #include "lock.hpp" #include "buffer.hpp" #include "thread.hpp" -#include "logger.hpp" namespace wsrep { @@ -117,12 +116,16 @@ namespace wsrep /** * Return true if the transaction commit requires * two-phase commit. + * + * @todo deprecate */ bool do_2pc() const { return client_service_.do_2pc(); } + /** @name Client command handling */ + /** @{ */ /** * This mehod should be called before the processing of command * received from DBMS client starts. @@ -165,7 +168,10 @@ namespace wsrep * idle. */ void after_command_after_result(); + /** @} */ + /** @name Statement level operations */ + /** @{ */ /** * Before statement execution operations. * @@ -199,10 +205,10 @@ namespace wsrep * * Do rollback if requested */ enum after_statement_result after_statement(); + /** @} */ - // - // Replicating interface - // + /** @name Replication interface */ + /** @{ */ /** * Start a new transaction with a transaction id. * @@ -240,10 +246,10 @@ namespace wsrep { return client_service_.prepare_data_for_replication(*this, tc); } + /** @} */ - // - // Streaming interface - // + /** @name Streaming replication interface */ + /** @{ */ /** * This method should be called after every row operation. */ @@ -270,22 +276,7 @@ namespace wsrep int enable_streaming( enum wsrep::streaming_context::fragment_unit fragment_unit, - size_t fragment_size) - { - assert(mode_ == m_replicating); - if (transaction_.active() && - transaction_.streaming_context_.fragment_unit() != - fragment_unit) - { - wsrep::log_error() - << "Changing fragment unit for active transaction " - << "not allowed"; - return 1; - } - transaction_.streaming_context_.enable( - fragment_unit, fragment_size); - return 0; - } + size_t fragment_size); /** @todo deprecate */ size_t bytes_generated() const @@ -321,11 +312,10 @@ namespace wsrep { client_service_.remove_fragments(transaction_); } + /** @} */ - // - // Applying interface - // - + /** @name Applying interface */ + /** @{ */ int start_transaction(const wsrep::ws_handle& wsh, const wsrep::ws_meta& meta) { @@ -346,11 +336,10 @@ namespace wsrep *this, transaction_.ws_handle(), transaction_.ws_meta()); } + /** @} */ - // - // Commit ordering - // - + /** @name Commit ordering interface */ + /** @{ */ int before_prepare() { wsrep::unique_lock lock(mutex_); @@ -382,10 +371,10 @@ namespace wsrep assert(state_ == s_exec || mode_ == m_local); return transaction_.after_commit(); } + /** @} */ - // - // Rollback - // + /** @name Rollback interface */ + /** @{ */ int rollback() { return client_service_.rollback(*this); @@ -402,6 +391,7 @@ namespace wsrep assert(state_ == s_idle || state_ == s_exec || state_ == s_result); return transaction_.after_rollback(); } + /** @} */ // // BF aborting @@ -481,6 +471,33 @@ namespace wsrep */ int wait_for_gtid(const wsrep::gtid&) const; + // + // + // + + /** @name Non-transactional operations */ + /** @{*/ + + /** + * Enter total order isolation critical section. + */ + int enter_toi(); + + /** + * Leave total order isolation critical section. + */ + void leave_toi(); + + /** + * Begin non-blocking operation. + */ + int begin_nbo(); + + /** + * End non-blocking operation + */ + void end_nbo(); + /** @} */ // // Debug interface // diff --git a/include/wsrep/provider.hpp b/include/wsrep/provider.hpp index 090a602..050976a 100644 --- a/include/wsrep/provider.hpp +++ b/include/wsrep/provider.hpp @@ -16,8 +16,6 @@ #include #include -#include -#include namespace wsrep { @@ -267,26 +265,7 @@ namespace wsrep wsrep::server_state& server_state_; }; - static inline std::string flags_to_string(int flags) - { - std::ostringstream oss; - if (flags & provider::flag::start_transaction) - oss << "start_transaction | "; - if (flags & provider::flag::commit) - oss << "commit | "; - if (flags & provider::flag::rollback) - oss << "rollback | "; - if (flags & provider::flag::isolation) - oss << "isolation | "; - if (flags & provider::flag::pa_unsafe) - oss << "pa_unsafe | "; - if (flags & provider::flag::snapshot) - oss << "snapshot | "; - - std::string ret(oss.str()); - if (ret.size() > 3) ret.erase(ret.size() - 3); - return ret; - } + std::string flags_to_string(int flags); static inline bool starts_transaction(int flags) { diff --git a/include/wsrep/seqno.hpp b/include/wsrep/seqno.hpp index 878133e..24bdd28 100644 --- a/include/wsrep/seqno.hpp +++ b/include/wsrep/seqno.hpp @@ -7,7 +7,7 @@ #include "exception.hpp" -#include +#include namespace wsrep { @@ -65,10 +65,7 @@ namespace wsrep long long seqno_; }; - static inline std::ostream& operator<<(std::ostream& os, wsrep::seqno seqno) - { - return (os << seqno.get()); - } + std::ostream& operator<<(std::ostream& os, wsrep::seqno seqno); } #endif // WSREP_SEQNO_HPP diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index bfc00db..756b147 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -2,6 +2,7 @@ // Copyright (C) 2018 Codership Oy // +/** @file transaction.hpp */ #ifndef WSREP_TRANSACTION_CONTEXT_HPP #define WSREP_TRANSACTION_CONTEXT_HPP diff --git a/include/wsrep/transaction_id.hpp b/include/wsrep/transaction_id.hpp index 1ef4488..9bbf19d 100644 --- a/include/wsrep/transaction_id.hpp +++ b/include/wsrep/transaction_id.hpp @@ -5,7 +5,7 @@ #ifndef WSREP_TRANSACTION_ID_HPP #define WSREP_TRANSACTION_ID_HPP -#include +#include namespace wsrep { diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 9b1cd10..e42ad46 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -7,6 +7,7 @@ add_library(wsrep-lib id.cpp logger.cpp provider.cpp + seqno.cpp server_state.cpp transaction.cpp wsrep_provider_v26.cpp) diff --git a/src/client_state.cpp b/src/client_state.cpp index 7fad9e4..31e1d58 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -185,6 +185,26 @@ wsrep::client_state::after_statement() return asr_success; } +int wsrep::client_state::enable_streaming( + enum wsrep::streaming_context::fragment_unit + fragment_unit, + size_t fragment_size) +{ + assert(mode_ == m_replicating); + if (transaction_.active() && + transaction_.streaming_context_.fragment_unit() != + fragment_unit) + { + wsrep::log_error() + << "Changing fragment unit for active transaction " + << "not allowed"; + return 1; + } + transaction_.streaming_context_.enable( + fragment_unit, fragment_size); + return 0; +} + // Private void wsrep::client_state::debug_log_state(const char* context) const diff --git a/src/provider.cpp b/src/provider.cpp index ea951fd..0cf3eeb 100644 --- a/src/provider.cpp +++ b/src/provider.cpp @@ -33,3 +33,24 @@ wsrep::provider* wsrep::provider::make_provider( } return 0; } + +std::string wsrep::flags_to_string(int flags) +{ + std::ostringstream oss; + if (flags & provider::flag::start_transaction) + oss << "start_transaction | "; + if (flags & provider::flag::commit) + oss << "commit | "; + if (flags & provider::flag::rollback) + oss << "rollback | "; + if (flags & provider::flag::isolation) + oss << "isolation | "; + if (flags & provider::flag::pa_unsafe) + oss << "pa_unsafe | "; + if (flags & provider::flag::snapshot) + oss << "snapshot | "; + + std::string ret(oss.str()); + if (ret.size() > 3) ret.erase(ret.size() - 3); + return ret; +} diff --git a/src/seqno.cpp b/src/seqno.cpp new file mode 100644 index 0000000..a3d1b2d --- /dev/null +++ b/src/seqno.cpp @@ -0,0 +1,11 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#include "wsrep/seqno.hpp" +#include + +std::ostream& wsrep::operator<<(std::ostream& os, wsrep::seqno seqno) +{ + return (os << seqno.get()); +}