1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Client context documentation, refactoring.

This commit is contained in:
Teemu Ollakka
2018-04-27 17:03:09 +03:00
parent 28026ea4fc
commit c1f8f2c37d
10 changed files with 303 additions and 195 deletions

View File

@ -74,14 +74,3 @@ int trrep::client_context::after_statement()
#endif // 0
return 0;
}
// TODO: This should be pure virtual method, implemented by
// DBMS integration or mock classes only.
int trrep::client_context::replay(
trrep::unique_lock<trrep::mutex>& lock,
trrep::transaction_context& tc)
{
tc.state(lock, trrep::transaction_context::s_replaying);
tc.state(lock, trrep::transaction_context::s_committed);
return 0;
}

View File

@ -113,67 +113,11 @@ namespace trrep
s_quitting
};
/*!
* Destructor.
*/
virtual ~client_context() { }
/*!
* Get reference to the client mutex.
*
* \return Reference to the client mutex.
*/
trrep::mutex& mutex() { return mutex_; }
/*!
* Get server context associated the the client session.
*
* \return Reference to server context.
*/
trrep::server_context& server_context() const
{ return server_context_; }
/*!
* Get reference to the Provider which is associated
* with the client context.
*
* \return Reference to the provider.
* \throw trrep::runtime_error if no providers are associated
* with the client context.
*/
trrep::provider& provider() const;
/*!
* Get Client identifier.
*
* \return Client Identifier
*/
client_id id() const { return id_; }
/*!
* Get Client mode.
*
* \todo Enforce mutex protection if called from other threads.
*
* \return Client mode.
*/
enum mode mode() const { return mode_; }
/*!
* Get Client state.
*
* \todo Enforce mutex protection if called from other threads.
*
* \return Client state
*/
enum state state() const { return state_; }
/*!
* Virtual method to return true if the client operates
* in two phase commit mode.
*
* \return True if two phase commit is required, false otherwise.
*/
virtual bool do_2pc() const = 0;
/*!
* Virtual method which should be called before the client
* starts processing the command received from the application.
@ -229,55 +173,50 @@ namespace trrep
virtual int after_statement();
/*!
* Append SR fragment to the transaction.
* Get reference to the client mutex.
*
* \return Reference to the client mutex.
*/
virtual int append_fragment(trrep::transaction_context&,
uint32_t, const trrep::data&)
{ return 0; }
trrep::mutex& mutex() { return mutex_; }
/*!
* Commit the transaction.
* Get server context associated the the client session.
*
* \return Reference to server context.
*/
virtual int commit(trrep::transaction_context&) = 0;
trrep::server_context& server_context() const
{ return server_context_; }
/*!
* Rollback the transaction.
* Get reference to the Provider which is associated
* with the client context.
*
* \return Reference to the provider.
* \throw trrep::runtime_error if no providers are associated
* with the client context.
*/
virtual int rollback(trrep::transaction_context&) = 0;
trrep::provider& provider() const;
virtual void will_replay(trrep::transaction_context&) { }
virtual int replay(trrep::unique_lock<trrep::mutex>&,
trrep::transaction_context& tc);
/*!
* Get Client identifier.
*
* \return Client Identifier
*/
client_id id() const { return id_; }
/*!
* Get Client mode.
*
* \todo Enforce mutex protection if called from other threads.
*
* \return Client mode.
*/
enum mode mode() const { return mode_; }
virtual int apply(trrep::transaction_context&,
const trrep::data&) = 0;
virtual void wait_for_replayers(trrep::unique_lock<trrep::mutex>&)
{ }
virtual int prepare_data_for_replication(
const trrep::transaction_context&, trrep::data& data)
{
static const char buf[1] = { 1 };
data.assign(buf, 1);
return 0;
}
virtual void override_error(const trrep::client_error&) { }
virtual bool killed() const { return 0; }
virtual void abort() const { ::abort(); }
virtual void store_globals() { }
// Debug helpers
virtual void debug_sync(const std::string&)
{
}
virtual void debug_suicide(const std::string&)
{
abort();
}
protected:
/*!
* Client context constuctor
* Client context constuctor. This is protected so that it
* can be called from derived class constructors only.
*/
client_context(trrep::mutex& mutex,
trrep::server_context& server_context,
@ -293,6 +232,105 @@ namespace trrep
{ }
private:
/*
* Friend declarations
*/
friend int server_context::on_apply(client_context&,
trrep::transaction_context&,
const trrep::data&);
friend class client_context_switch;
friend class transaction_context;
/*!
* Get Client state.
*
* \todo Enforce mutex protection if called from other threads.
*
* \return Client state
*/
enum state state() const { return state_; }
/*!
* Virtual method to return true if the client operates
* in two phase commit mode.
*
* \return True if two phase commit is required, false otherwise.
*/
virtual bool do_2pc() const = 0;
/*!
* Append SR fragment to the transaction.
*/
virtual int append_fragment(trrep::transaction_context&,
uint32_t, const trrep::data&)
{ return 0; }
/*!
* This method applies a write set give in data buffer.
* This must be implemented by the DBMS integration.
*
* \return Zero on success, non-zero on applying failure.
*/
virtual int apply(trrep::transaction_context& transaction,
const trrep::data& data) = 0;
/*!
* Virtual method which will be called
* in order to commit the transaction into
* storage engine.
*
* \return Zero on success, non-zero on failure.
*/
virtual int commit(trrep::transaction_context&) = 0;
/*!
* Rollback the transaction.
*
* This metod must be implemented by DBMS integration.
*
* \return Zero on success, no-zero on failure.
*/
virtual int rollback(trrep::transaction_context&) = 0;
/*!
* Notify a implementation that the client is about
* to replay the transaction.
*/
virtual void will_replay(trrep::transaction_context&) = 0;
/*!
* Replay the transaction.
*/
virtual int replay(trrep::unique_lock<trrep::mutex>&,
trrep::transaction_context& tc) = 0;
/*!
* Wait until all of the replaying transactions have been committed.
*/
virtual void wait_for_replayers(trrep::unique_lock<trrep::mutex>&) const = 0;
virtual int prepare_data_for_replication(
const trrep::transaction_context&, trrep::data& data)
{
static const char buf[1] = { 1 };
data.assign(buf, 1);
return 0;
}
/*!
*
*/
virtual void override_error(const trrep::client_error&) = 0;
virtual bool killed() const = 0;
virtual void abort() const = 0;
virtual void store_globals() = 0;
// Debug helpers
virtual void debug_sync(const std::string&) = 0;
virtual void debug_suicide(const std::string&) = 0;
void state(enum state state);
trrep::mutex& mutex_;

View File

@ -7,6 +7,8 @@
#include "lock.hpp"
#include <cstdlib>
namespace trrep
{
class condition_variable
@ -23,7 +25,7 @@ namespace trrep
};
// Default pthreads based condition variable implementation
class default_condition_variable : condition_variable
class default_condition_variable : public condition_variable
{
public:
default_condition_variable()

View File

@ -110,6 +110,7 @@ public:
const std::string& id,
const std::string& address)
: trrep::server_context(mutex_,
cond_,
name, id, address, name + "_data",
trrep::server_context::rm_async)
, simulator_(simulator)
@ -140,46 +141,6 @@ public:
appliers_.erase(appliers_.begin());
}
void on_connect()
{
std::cerr << "dbms_server: connected" << "\n";
trrep::unique_lock<trrep::mutex> lock(mutex_);
state_ = s_connected;
cond_.notify_all();
}
void on_view(const trrep::view& view)
{
std::cerr << "================================================\nView:\n"
<< "id: " << view.id() << "\n"
<< "status: " << view.status() << "\n"
<< "own_index: " << view.own_index() << "\n"
<< "final: " << view.final() << "\n"
<< "members: \n";
auto members(view.members());
for (const auto& m : members)
{
std::cerr << "id: " << m.id() << " "
<< "name: " << m.name() << "\n";
}
std::cerr << "=================================================\n";
trrep::unique_lock<trrep::mutex> lock(mutex_);
if (view.final())
{
state_ = s_disconnected;
cond_.notify_all();
}
}
void on_sync()
{
std::cerr << "Synced with group" << "\n";
trrep::unique_lock<trrep::mutex> lock(mutex_);
state_ = s_synced;
cond_.notify_all();
}
bool sst_before_init() const override { return false; }
std::string on_sst_request()
{
@ -198,32 +159,6 @@ public:
provider().sst_sent(gtid, 0);
}
void wait_until_state(enum state state)
{
trrep::unique_lock<trrep::mutex> lock(mutex_);
while (state_ != state)
{
cond_.wait(lock);
}
}
void wait_until_connected()
{
trrep::unique_lock<trrep::mutex> lock(mutex_);
while (state_ != s_connected)
{
cond_.wait(lock);
}
}
void wait_until_disconnected()
{
trrep::unique_lock<trrep::mutex> lock(mutex_);
while (state_ != s_disconnected)
{
cond_.wait(lock);
}
}
// Client context management
trrep::client_context* local_client_context();
@ -277,13 +212,14 @@ public:
}
}
bool do_2pc() const { return false; }
int apply(trrep::transaction_context&, const trrep::data&)
private:
bool do_2pc() const override { return false; }
int apply(trrep::transaction_context&, const trrep::data&) override
{
// std::cerr << "applying" << "\n";
return 0;
}
int commit(trrep::transaction_context& transaction_context)
int commit(trrep::transaction_context& transaction_context) override
{
int ret(0);
ret = transaction_context.before_commit();
@ -292,7 +228,7 @@ public:
// std::cerr << "commit" << "\n";
return 0;
}
int rollback(trrep::transaction_context& transaction_context)
int rollback(trrep::transaction_context& transaction_context) override
{
std::cerr << "rollback: " << transaction_context.id().get()
<< "state: " << trrep::to_string(transaction_context.state())
@ -301,7 +237,23 @@ public:
transaction_context.after_rollback();
return 0;
}
private:
void will_replay(trrep::transaction_context&) override { }
int replay(trrep::unique_lock<trrep::mutex>& lock,
trrep::transaction_context& tc) override
{
tc.state(lock, trrep::transaction_context::s_replaying);
tc.state(lock, trrep::transaction_context::s_committed);
return 0;
}
void wait_for_replayers(trrep::unique_lock<trrep::mutex>&) const override
{ }
void override_error(const trrep::client_error&) override { }
bool killed() const override { return false; }
void abort() const override { ::abort(); }
void store_globals() override { }
void debug_sync(const std::string&) override { }
void debug_suicide(const std::string&) override { }
void run_one_transaction()
{
@ -448,8 +400,7 @@ void dbms_simulator::start()
throw trrep::runtime_error("Failed to connect");
}
server.start_applier();
server.wait_until_connected();
server.wait_until_state(dbms_server::s_synced);
server.wait_until_state(trrep::server_context::s_synced);
}
// Start client threads
@ -487,7 +438,7 @@ void dbms_simulator::stop()
});
server.provider().disconnect();
server.wait_until_disconnected();
server.wait_until_state(trrep::server_context::s_disconnected);
server.stop_applier();
}
}

View File

@ -7,6 +7,7 @@
#include "client_context.hpp"
#include "mutex.hpp"
#include "compiler.hpp"
namespace trrep
{
@ -28,6 +29,25 @@ namespace trrep
int commit(trrep::transaction_context&);
int rollback(trrep::transaction_context&);
bool do_2pc() const { return do_2pc_; }
void will_replay(trrep::transaction_context&) TRREP_OVERRIDE { }
int replay(trrep::unique_lock<trrep::mutex>& lock,
trrep::transaction_context& tc) TRREP_OVERRIDE
{
tc.state(lock, trrep::transaction_context::s_replaying);
tc.state(lock, trrep::transaction_context::s_committed);
return 0;
}
void wait_for_replayers(trrep::unique_lock<trrep::mutex>&) const
TRREP_OVERRIDE { }
void override_error(const trrep::client_error&) TRREP_OVERRIDE { }
bool killed() const TRREP_OVERRIDE { return false; }
void abort() const TRREP_OVERRIDE { }
void store_globals() TRREP_OVERRIDE { }
void debug_sync(const std::string&) TRREP_OVERRIDE { }
void debug_suicide(const std::string&) TRREP_OVERRIDE
{
::abort();
}
// Mock state modifiers
void fail_next_applying(bool fail_next_applying)

View File

@ -19,8 +19,10 @@ namespace trrep
mock_server_context(const std::string& name,
const std::string& id,
enum trrep::server_context::rollback_mode rollback_mode)
: trrep::server_context(mutex_, name, id, "", "./", rollback_mode)
: trrep::server_context(mutex_, cond_,
name, id, "", "./", rollback_mode)
, mutex_()
, cond_()
, provider_()
, last_client_id_(0)
{ }
@ -47,6 +49,7 @@ namespace trrep
private:
trrep::default_mutex mutex_;
trrep::default_condition_variable cond_;
mutable trrep::mock_provider provider_;
unsigned long long last_client_id_;
};

View File

@ -215,6 +215,55 @@ void trrep::server_context::sst_received(const wsrep_gtid_t& gtid)
provider_->sst_received(gtid, 0);
}
void trrep::server_context::wait_until_state(
enum trrep::server_context::state state) const
{
trrep::unique_lock<trrep::mutex> lock(mutex_);
while (state_ != state)
{
cond_.wait(lock);
}
}
void trrep::server_context::on_connect()
{
std::cout << "Server " << name_ << " connected to cluster" << "\n";
trrep::unique_lock<trrep::mutex> lock(mutex_);
state_ = s_connected;
cond_.notify_all();
}
void trrep::server_context::on_view(const trrep::view& view)
{
std::cout << "================================================\nView:\n"
<< "id: " << view.id() << "\n"
<< "status: " << view.status() << "\n"
<< "own_index: " << view.own_index() << "\n"
<< "final: " << view.final() << "\n"
<< "members: \n";
const std::vector<trrep::view::member>& members(view.members());
for (std::vector<trrep::view::member>::const_iterator i(members.begin());
i != members.end(); ++i)
{
std::cout << "id: " << i->id() << " "
<< "name: " << i->name() << "\n";
}
std::cout << "=================================================\n";
trrep::unique_lock<trrep::mutex> lock(mutex_);
if (view.final())
{
state_ = s_disconnected;
cond_.notify_all();
}
}
void trrep::server_context::on_sync()
{
std::cout << "Synced with group" << "\n";
trrep::unique_lock<trrep::mutex> lock(mutex_);
state_ = s_synced;
cond_.notify_all();
}
int trrep::server_context::on_apply(
trrep::client_context& client_context,

View File

@ -67,6 +67,7 @@
#include "exception.hpp"
#include "mutex.hpp"
#include "condition_variable.hpp"
#include "wsrep_api.h"
#include <string>
@ -216,37 +217,43 @@ namespace trrep
return *provider_;
}
/*!
* Virtual method which will be called when the server
* has been joined to the cluster. Must be provided by
* the implementation.
*/
virtual void on_connect() = 0;
/*!
* Wait until the server has connected to the cluster.
*
* \todo This should not be pure virtual method,
* this base class should provide the server state
* machine and proper synchronization.
* \todo Document overriding.
*/
virtual void wait_until_connected() = 0;
virtual void on_connect();
/*!
* Virtual method which will be called when a view
* notification event has been delivered by the
* provider.
*
* \todo Document overriding.
*
* \params view trrep::view object which holds the new view
* information.
*/
virtual void on_view(const trrep::view& view) = 0;
virtual void on_view(const trrep::view& view);
/*!
* Virtual method which will be called when the server
* has been synchronized with the cluster.
*
* \todo Document overriding.
*/
virtual void on_sync() = 0;
virtual void on_sync();
/*!
* Wait until server reaches given state.
*
* \todo Waiting for transitional states may not be reliable.
*/
void wait_until_state(trrep::server_context::state) const;
/*!
* Virtual method to return true if the configured SST
@ -336,12 +343,15 @@ namespace trrep
* \param rollback_mode Rollback mode which server operates on.
*/
server_context(trrep::mutex& mutex,
trrep::condition_variable& cond,
const std::string& name,
const std::string& id,
const std::string& address,
const std::string& working_dir,
enum rollback_mode rollback_mode)
: mutex_(mutex)
, cond_(cond)
, state_(s_disconnected)
, provider_()
, name_(name)
, id_(id)
@ -356,6 +366,8 @@ namespace trrep
server_context& operator=(const server_context&);
trrep::mutex& mutex_;
trrep::condition_variable& cond_;
enum state state_;
trrep::provider* provider_;
std::string name_;
std::string id_;