diff --git a/README.md b/README.md index 47d3b7c..5b13697 100644 --- a/README.md +++ b/README.md @@ -199,3 +199,47 @@ Methods: * apply() +# Example usage + +``` +class dbms_server : public trrep::server_context +{ + public: + // Implementation + void start() + { + // Initialize server components, start accepting client connections + } +}; + +class dbms_client : public trrep::client_context +{ + // Implementation +}; + +int main() +{ + dbms_server server(name, id); + + if (server.sst_before_init()) + { + server.start(); + server.wait_until_state(trrep::server_context::s_joined); + + } + // Initialize dbms code here + if (server.sst_before_init() == false) + { + server.start(); + server.wait_until_state(trrep::server_context::s_joined); + } + + // Start accepting client connections + server.wait_until_state(trrep::server_context::s_synced); + + // Clients can read and write here + + +} + +``` \ No newline at end of file diff --git a/doc/Doxyfile b/doc/Doxyfile index 4d3808f..8243a4a 100644 --- a/doc/Doxyfile +++ b/doc/Doxyfile @@ -424,7 +424,7 @@ EXTRACT_ALL = NO # be included in the documentation. # The default value is: NO. -EXTRACT_PRIVATE = NO +EXTRACT_PRIVATE = YES # If the EXTRACT_PACKAGE tag is set to YES, all members with package or internal # scope will be included in the documentation. diff --git a/src/client_context.cpp b/src/client_context.cpp index 3c445ca..5581249 100644 --- a/src/client_context.cpp +++ b/src/client_context.cpp @@ -74,14 +74,3 @@ int trrep::client_context::after_statement() #endif // 0 return 0; } - -// TODO: This should be pure virtual method, implemented by -// DBMS integration or mock classes only. -int trrep::client_context::replay( - trrep::unique_lock& lock, - trrep::transaction_context& tc) -{ - tc.state(lock, trrep::transaction_context::s_replaying); - tc.state(lock, trrep::transaction_context::s_committed); - return 0; -} diff --git a/src/client_context.hpp b/src/client_context.hpp index 9a7124d..821ab57 100644 --- a/src/client_context.hpp +++ b/src/client_context.hpp @@ -113,67 +113,11 @@ namespace trrep s_quitting }; - + /*! + * Destructor. + */ virtual ~client_context() { } - /*! - * Get reference to the client mutex. - * - * \return Reference to the client mutex. - */ - trrep::mutex& mutex() { return mutex_; } - - /*! - * Get server context associated the the client session. - * - * \return Reference to server context. - */ - trrep::server_context& server_context() const - { return server_context_; } - - /*! - * Get reference to the Provider which is associated - * with the client context. - * - * \return Reference to the provider. - * \throw trrep::runtime_error if no providers are associated - * with the client context. - */ - trrep::provider& provider() const; - - /*! - * Get Client identifier. - * - * \return Client Identifier - */ - client_id id() const { return id_; } - - /*! - * Get Client mode. - * - * \todo Enforce mutex protection if called from other threads. - * - * \return Client mode. - */ - enum mode mode() const { return mode_; } - - /*! - * Get Client state. - * - * \todo Enforce mutex protection if called from other threads. - * - * \return Client state - */ - enum state state() const { return state_; } - - /*! - * Virtual method to return true if the client operates - * in two phase commit mode. - * - * \return True if two phase commit is required, false otherwise. - */ - virtual bool do_2pc() const = 0; - /*! * Virtual method which should be called before the client * starts processing the command received from the application. @@ -229,55 +173,50 @@ namespace trrep virtual int after_statement(); /*! - * Append SR fragment to the transaction. + * Get reference to the client mutex. + * + * \return Reference to the client mutex. */ - virtual int append_fragment(trrep::transaction_context&, - uint32_t, const trrep::data&) - { return 0; } + trrep::mutex& mutex() { return mutex_; } /*! - * Commit the transaction. + * Get server context associated the the client session. + * + * \return Reference to server context. */ - virtual int commit(trrep::transaction_context&) = 0; + trrep::server_context& server_context() const + { return server_context_; } /*! - * Rollback the transaction. + * Get reference to the Provider which is associated + * with the client context. + * + * \return Reference to the provider. + * \throw trrep::runtime_error if no providers are associated + * with the client context. */ - virtual int rollback(trrep::transaction_context&) = 0; + trrep::provider& provider() const; - virtual void will_replay(trrep::transaction_context&) { } - virtual int replay(trrep::unique_lock&, - trrep::transaction_context& tc); + /*! + * Get Client identifier. + * + * \return Client Identifier + */ + client_id id() const { return id_; } + /*! + * Get Client mode. + * + * \todo Enforce mutex protection if called from other threads. + * + * \return Client mode. + */ + enum mode mode() const { return mode_; } - virtual int apply(trrep::transaction_context&, - const trrep::data&) = 0; - - virtual void wait_for_replayers(trrep::unique_lock&) - { } - virtual int prepare_data_for_replication( - const trrep::transaction_context&, trrep::data& data) - { - static const char buf[1] = { 1 }; - data.assign(buf, 1); - return 0; - } - virtual void override_error(const trrep::client_error&) { } - virtual bool killed() const { return 0; } - virtual void abort() const { ::abort(); } - virtual void store_globals() { } - // Debug helpers - virtual void debug_sync(const std::string&) - { - - } - virtual void debug_suicide(const std::string&) - { - abort(); - } protected: /*! - * Client context constuctor + * Client context constuctor. This is protected so that it + * can be called from derived class constructors only. */ client_context(trrep::mutex& mutex, trrep::server_context& server_context, @@ -293,6 +232,105 @@ namespace trrep { } private: + /* + * Friend declarations + */ + friend int server_context::on_apply(client_context&, + trrep::transaction_context&, + const trrep::data&); + friend class client_context_switch; + friend class transaction_context; + + /*! + * Get Client state. + * + * \todo Enforce mutex protection if called from other threads. + * + * \return Client state + */ + enum state state() const { return state_; } + + /*! + * Virtual method to return true if the client operates + * in two phase commit mode. + * + * \return True if two phase commit is required, false otherwise. + */ + virtual bool do_2pc() const = 0; + + + /*! + * Append SR fragment to the transaction. + */ + virtual int append_fragment(trrep::transaction_context&, + uint32_t, const trrep::data&) + { return 0; } + + + /*! + * This method applies a write set give in data buffer. + * This must be implemented by the DBMS integration. + * + * \return Zero on success, non-zero on applying failure. + */ + virtual int apply(trrep::transaction_context& transaction, + const trrep::data& data) = 0; + + /*! + * Virtual method which will be called + * in order to commit the transaction into + * storage engine. + * + * \return Zero on success, non-zero on failure. + */ + virtual int commit(trrep::transaction_context&) = 0; + + /*! + * Rollback the transaction. + * + * This metod must be implemented by DBMS integration. + * + * \return Zero on success, no-zero on failure. + */ + virtual int rollback(trrep::transaction_context&) = 0; + + /*! + * Notify a implementation that the client is about + * to replay the transaction. + */ + virtual void will_replay(trrep::transaction_context&) = 0; + + /*! + * Replay the transaction. + */ + virtual int replay(trrep::unique_lock&, + trrep::transaction_context& tc) = 0; + + + /*! + * Wait until all of the replaying transactions have been committed. + */ + virtual void wait_for_replayers(trrep::unique_lock&) const = 0; + + virtual int prepare_data_for_replication( + const trrep::transaction_context&, trrep::data& data) + { + static const char buf[1] = { 1 }; + data.assign(buf, 1); + return 0; + } + + /*! + * + */ + virtual void override_error(const trrep::client_error&) = 0; + virtual bool killed() const = 0; + virtual void abort() const = 0; + virtual void store_globals() = 0; + // Debug helpers + virtual void debug_sync(const std::string&) = 0; + virtual void debug_suicide(const std::string&) = 0; + void state(enum state state); trrep::mutex& mutex_; diff --git a/src/condition_variable.hpp b/src/condition_variable.hpp index f5579d0..92544f9 100644 --- a/src/condition_variable.hpp +++ b/src/condition_variable.hpp @@ -7,6 +7,8 @@ #include "lock.hpp" +#include + namespace trrep { class condition_variable @@ -23,7 +25,7 @@ namespace trrep }; // Default pthreads based condition variable implementation - class default_condition_variable : condition_variable + class default_condition_variable : public condition_variable { public: default_condition_variable() diff --git a/src/dbms_simulator.cpp b/src/dbms_simulator.cpp index ea66505..3c1ead8 100644 --- a/src/dbms_simulator.cpp +++ b/src/dbms_simulator.cpp @@ -110,6 +110,7 @@ public: const std::string& id, const std::string& address) : trrep::server_context(mutex_, + cond_, name, id, address, name + "_data", trrep::server_context::rm_async) , simulator_(simulator) @@ -140,46 +141,6 @@ public: appliers_.erase(appliers_.begin()); } - void on_connect() - { - std::cerr << "dbms_server: connected" << "\n"; - trrep::unique_lock lock(mutex_); - state_ = s_connected; - cond_.notify_all(); - } - - void on_view(const trrep::view& view) - { - std::cerr << "================================================\nView:\n" - << "id: " << view.id() << "\n" - << "status: " << view.status() << "\n" - << "own_index: " << view.own_index() << "\n" - << "final: " << view.final() << "\n" - << "members: \n"; - auto members(view.members()); - for (const auto& m : members) - { - std::cerr << "id: " << m.id() << " " - << "name: " << m.name() << "\n"; - - } - std::cerr << "=================================================\n"; - trrep::unique_lock lock(mutex_); - if (view.final()) - { - state_ = s_disconnected; - cond_.notify_all(); - } - } - - void on_sync() - { - std::cerr << "Synced with group" << "\n"; - trrep::unique_lock lock(mutex_); - state_ = s_synced; - cond_.notify_all(); - } - bool sst_before_init() const override { return false; } std::string on_sst_request() { @@ -198,32 +159,6 @@ public: provider().sst_sent(gtid, 0); } - void wait_until_state(enum state state) - { - trrep::unique_lock lock(mutex_); - while (state_ != state) - { - cond_.wait(lock); - } - } - void wait_until_connected() - { - trrep::unique_lock lock(mutex_); - while (state_ != s_connected) - { - cond_.wait(lock); - } - } - - void wait_until_disconnected() - { - trrep::unique_lock lock(mutex_); - while (state_ != s_disconnected) - { - cond_.wait(lock); - } - } - // Client context management trrep::client_context* local_client_context(); @@ -277,13 +212,14 @@ public: } } - bool do_2pc() const { return false; } - int apply(trrep::transaction_context&, const trrep::data&) +private: + bool do_2pc() const override { return false; } + int apply(trrep::transaction_context&, const trrep::data&) override { // std::cerr << "applying" << "\n"; return 0; } - int commit(trrep::transaction_context& transaction_context) + int commit(trrep::transaction_context& transaction_context) override { int ret(0); ret = transaction_context.before_commit(); @@ -292,7 +228,7 @@ public: // std::cerr << "commit" << "\n"; return 0; } - int rollback(trrep::transaction_context& transaction_context) + int rollback(trrep::transaction_context& transaction_context) override { std::cerr << "rollback: " << transaction_context.id().get() << "state: " << trrep::to_string(transaction_context.state()) @@ -301,7 +237,23 @@ public: transaction_context.after_rollback(); return 0; } -private: + + void will_replay(trrep::transaction_context&) override { } + int replay(trrep::unique_lock& lock, + trrep::transaction_context& tc) override + { + tc.state(lock, trrep::transaction_context::s_replaying); + tc.state(lock, trrep::transaction_context::s_committed); + return 0; + } + void wait_for_replayers(trrep::unique_lock&) const override + { } + void override_error(const trrep::client_error&) override { } + bool killed() const override { return false; } + void abort() const override { ::abort(); } + void store_globals() override { } + void debug_sync(const std::string&) override { } + void debug_suicide(const std::string&) override { } void run_one_transaction() { @@ -448,8 +400,7 @@ void dbms_simulator::start() throw trrep::runtime_error("Failed to connect"); } server.start_applier(); - server.wait_until_connected(); - server.wait_until_state(dbms_server::s_synced); + server.wait_until_state(trrep::server_context::s_synced); } // Start client threads @@ -487,7 +438,7 @@ void dbms_simulator::stop() }); server.provider().disconnect(); - server.wait_until_disconnected(); + server.wait_until_state(trrep::server_context::s_disconnected); server.stop_applier(); } } diff --git a/src/mock_client_context.hpp b/src/mock_client_context.hpp index 8136716..9339843 100644 --- a/src/mock_client_context.hpp +++ b/src/mock_client_context.hpp @@ -7,6 +7,7 @@ #include "client_context.hpp" #include "mutex.hpp" +#include "compiler.hpp" namespace trrep { @@ -28,6 +29,25 @@ namespace trrep int commit(trrep::transaction_context&); int rollback(trrep::transaction_context&); bool do_2pc() const { return do_2pc_; } + void will_replay(trrep::transaction_context&) TRREP_OVERRIDE { } + int replay(trrep::unique_lock& lock, + trrep::transaction_context& tc) TRREP_OVERRIDE + { + tc.state(lock, trrep::transaction_context::s_replaying); + tc.state(lock, trrep::transaction_context::s_committed); + return 0; + } + void wait_for_replayers(trrep::unique_lock&) const + TRREP_OVERRIDE { } + void override_error(const trrep::client_error&) TRREP_OVERRIDE { } + bool killed() const TRREP_OVERRIDE { return false; } + void abort() const TRREP_OVERRIDE { } + void store_globals() TRREP_OVERRIDE { } + void debug_sync(const std::string&) TRREP_OVERRIDE { } + void debug_suicide(const std::string&) TRREP_OVERRIDE + { + ::abort(); + } // Mock state modifiers void fail_next_applying(bool fail_next_applying) diff --git a/src/mock_server_context.hpp b/src/mock_server_context.hpp index e03ab43..0621f61 100644 --- a/src/mock_server_context.hpp +++ b/src/mock_server_context.hpp @@ -19,8 +19,10 @@ namespace trrep mock_server_context(const std::string& name, const std::string& id, enum trrep::server_context::rollback_mode rollback_mode) - : trrep::server_context(mutex_, name, id, "", "./", rollback_mode) + : trrep::server_context(mutex_, cond_, + name, id, "", "./", rollback_mode) , mutex_() + , cond_() , provider_() , last_client_id_(0) { } @@ -47,6 +49,7 @@ namespace trrep private: trrep::default_mutex mutex_; + trrep::default_condition_variable cond_; mutable trrep::mock_provider provider_; unsigned long long last_client_id_; }; diff --git a/src/server_context.cpp b/src/server_context.cpp index a469380..361a872 100644 --- a/src/server_context.cpp +++ b/src/server_context.cpp @@ -215,6 +215,55 @@ void trrep::server_context::sst_received(const wsrep_gtid_t& gtid) provider_->sst_received(gtid, 0); } +void trrep::server_context::wait_until_state( + enum trrep::server_context::state state) const +{ + trrep::unique_lock lock(mutex_); + while (state_ != state) + { + cond_.wait(lock); + } +} + +void trrep::server_context::on_connect() +{ + std::cout << "Server " << name_ << " connected to cluster" << "\n"; + trrep::unique_lock lock(mutex_); + state_ = s_connected; + cond_.notify_all(); +} + +void trrep::server_context::on_view(const trrep::view& view) +{ + std::cout << "================================================\nView:\n" + << "id: " << view.id() << "\n" + << "status: " << view.status() << "\n" + << "own_index: " << view.own_index() << "\n" + << "final: " << view.final() << "\n" + << "members: \n"; + const std::vector& members(view.members()); + for (std::vector::const_iterator i(members.begin()); + i != members.end(); ++i) + { + std::cout << "id: " << i->id() << " " + << "name: " << i->name() << "\n"; + } + std::cout << "=================================================\n"; + trrep::unique_lock lock(mutex_); + if (view.final()) + { + state_ = s_disconnected; + cond_.notify_all(); + } +} + +void trrep::server_context::on_sync() +{ + std::cout << "Synced with group" << "\n"; + trrep::unique_lock lock(mutex_); + state_ = s_synced; + cond_.notify_all(); +} int trrep::server_context::on_apply( trrep::client_context& client_context, diff --git a/src/server_context.hpp b/src/server_context.hpp index 4fa1a5c..d48c3cf 100644 --- a/src/server_context.hpp +++ b/src/server_context.hpp @@ -67,6 +67,7 @@ #include "exception.hpp" #include "mutex.hpp" +#include "condition_variable.hpp" #include "wsrep_api.h" #include @@ -216,37 +217,43 @@ namespace trrep return *provider_; } + + /*! * Virtual method which will be called when the server * has been joined to the cluster. Must be provided by * the implementation. - */ - virtual void on_connect() = 0; - - /*! - * Wait until the server has connected to the cluster. * - * \todo This should not be pure virtual method, - * this base class should provide the server state - * machine and proper synchronization. + * \todo Document overriding. */ - virtual void wait_until_connected() = 0; + virtual void on_connect(); /*! * Virtual method which will be called when a view * notification event has been delivered by the * provider. * + * \todo Document overriding. + * * \params view trrep::view object which holds the new view * information. */ - virtual void on_view(const trrep::view& view) = 0; + virtual void on_view(const trrep::view& view); /*! * Virtual method which will be called when the server * has been synchronized with the cluster. + * + * \todo Document overriding. */ - virtual void on_sync() = 0; + virtual void on_sync(); + + /*! + * Wait until server reaches given state. + * + * \todo Waiting for transitional states may not be reliable. + */ + void wait_until_state(trrep::server_context::state) const; /*! * Virtual method to return true if the configured SST @@ -336,12 +343,15 @@ namespace trrep * \param rollback_mode Rollback mode which server operates on. */ server_context(trrep::mutex& mutex, + trrep::condition_variable& cond, const std::string& name, const std::string& id, const std::string& address, const std::string& working_dir, enum rollback_mode rollback_mode) : mutex_(mutex) + , cond_(cond) + , state_(s_disconnected) , provider_() , name_(name) , id_(id) @@ -356,6 +366,8 @@ namespace trrep server_context& operator=(const server_context&); trrep::mutex& mutex_; + trrep::condition_variable& cond_; + enum state state_; trrep::provider* provider_; std::string name_; std::string id_;