1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-08-06 15:02:41 +03:00

Initial documentation for server context.

This commit is contained in:
Teemu Ollakka
2018-04-27 10:33:53 +03:00
parent 8f45d88190
commit f3afb6306d
6 changed files with 284 additions and 77 deletions

View File

@@ -9,13 +9,10 @@ include(CTest)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Werror -Weffc++ -Woverloaded-virtual -Wno-non-virtual-dtor -g") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Werror -Weffc++ -Woverloaded-virtual -Wno-non-virtual-dtor -g")
if (NOT WITH_WSREP_API) check_include_file("${CMAKE_CURRENT_SOURCE_DIR}/wsrep/wsrep_api.h" HAVE_WSREP_API_HPP)
message(ERROR "Wsrep API location must be provided") include_directories("${CMAKE_CURRENT_SOURCE_DIR}/wsrep")
else() link_directories("${CMAKE_CURRENT_SOURCE_DIR}/wsrep")
check_include_file("${WITH_WSREP_API}/wsrep_api.h" HAVE_WSREP_API_HPP)
include_directories("${WITH_WSREP_API}")
link_directories("${WITH_WSREP_API}")
endif()
find_package(Boost 1.54.0 REQUIRED find_package(Boost 1.54.0 REQUIRED
unit_test_framework unit_test_framework
@@ -52,5 +49,4 @@ if (WITH_DOCUMENTATION)
VERBATIM) VERBATIM)
endif() endif()
add_subdirectory(src) add_subdirectory(src)

View File

@@ -28,6 +28,19 @@ The rest of the document dscribes the proposed API in more detail
and should be replaced with automatically generated documentation and should be replaced with automatically generated documentation
later on (doxygen). later on (doxygen).
Doxygen documentation style:
* Use Qt style for documentation comment blocks
```
/*!
* A documentation comment block
*/
```
* For regular code comments C++ style is preferred
```
// This is a code comment
```
# High Level Design # High Level Design
## Provider ## Provider

View File

@@ -195,10 +195,6 @@ public:
{ {
provider().sst_sent(gtid, 0); provider().sst_sent(gtid, 0);
} }
void sst_received(const wsrep_gtid_t& gtid)
{
provider().sst_received(gtid, 0);
}
void wait_until_state(enum state state) void wait_until_state(enum state state)
{ {
@@ -263,13 +259,19 @@ public:
, mutex_() , mutex_()
, server_(server) , server_(server)
, n_transactions_(n_transactions) , n_transactions_(n_transactions)
, result_()
{ } { }
~dbms_client()
{
std::cout << "Result: " << result_;
}
void start() void start()
{ {
for (size_t i(0); i < n_transactions_; ++i) for (size_t i(0); i < n_transactions_; ++i)
{ {
run_one_transaction(); run_one_transaction();
report_progress(i + 1);
} }
} }
@@ -304,18 +306,19 @@ private:
trrep::transaction_context trx(*this); trrep::transaction_context trx(*this);
trx.start_transaction(server_.next_transaction_id()); trx.start_transaction(server_.next_transaction_id());
std::ostringstream os; std::ostringstream os;
os << trx.id().get(); int err(0);
for (int i(0); i < 1 && err == 0; ++i)
{
int data(std::rand() % 10000000);
os << data;
trrep::key key; trrep::key key;
key.append_key_part("dbms", 4); key.append_key_part("dbms", 4);
wsrep_conn_id_t client_id(id().get()); wsrep_conn_id_t client_id(id().get());
key.append_key_part(&client_id, sizeof(client_id)); key.append_key_part(&client_id, sizeof(client_id));
wsrep_trx_id_t trx_id(trx.id().get()); key.append_key_part(&data, sizeof(data));
int err(0);
key.append_key_part(&trx_id, sizeof(trx_id));
err = trx.append_key(key); err = trx.append_key(key);
// std::cout << "append_key: " << err << "\n";
err = err || trx.append_data(trrep::data(os.str().c_str(), os.str().size())); err = err || trx.append_data(trrep::data(os.str().c_str(), os.str().size()));
}
// std::cout << "append_data: " << err << "\n"; // std::cout << "append_data: " << err << "\n";
if (do_2pc()) if (do_2pc())
{ {
@@ -331,13 +334,22 @@ private:
err = err || trx.after_commit(); err = err || trx.after_commit();
// std::cout << "after_commit: " << err << "\n"; // std::cout << "after_commit: " << err << "\n";
trx.after_statement(); trx.after_statement();
} }
void report_progress(size_t i) const
{
if ((i % 100) == 0)
{
std::cout << "client: " << id().get()
<< " transactions: " << i
<< " " << 100*double(i)/n_transactions_ << "%"
<< std::endl;
}
}
trrep::default_mutex mutex_; trrep::default_mutex mutex_;
dbms_server& server_; dbms_server& server_;
const size_t n_transactions_; const size_t n_transactions_;
size_t result_;
}; };
@@ -439,7 +451,7 @@ void dbms_simulator::start()
} }
// Start client threads // Start client threads
std::cout << "####################### Starting client load" << "\n";
clients_start_ = std::chrono::steady_clock::now(); clients_start_ = std::chrono::steady_clock::now();
for (auto& i : servers_) for (auto& i : servers_)
{ {
@@ -456,6 +468,11 @@ void dbms_simulator::stop()
server.stop_clients(); server.stop_clients();
} }
clients_stop_ = std::chrono::steady_clock::now(); clients_stop_ = std::chrono::steady_clock::now();
std::cout << "######## Stats ############\n";
std::cout << stats();
std::cout << "######## Stats ############\n";
// REMOVEME: Temporary shortcut
exit(0);
for (auto& i : servers_) for (auto& i : servers_)
{ {
dbms_server& server(*i.second); dbms_server& server(*i.second);
@@ -497,7 +514,11 @@ void dbms_simulator::donate_sst(dbms_server& server,
std::string dbms_simulator::build_cluster_address() const std::string dbms_simulator::build_cluster_address() const
{ {
std::string ret; std::string ret;
// std::string ret("gcomm://"); if (params_.wsrep_provider.find("galera_smm") != std::string::npos)
{
ret += "gcomm://";
}
for (size_t i(0); i < params_.n_servers; ++i) for (size_t i(0); i < params_.n_servers; ++i)
{ {
std::ostringstream sa_os; std::ostringstream sa_os;
@@ -543,8 +564,15 @@ int main(int argc, char** argv)
} }
dbms_simulator sim(params); dbms_simulator sim(params);
try
{
sim.start(); sim.start();
sim.stop(); sim.stop();
}
catch (const std::exception& e)
{
std::cerr << "Caught exception: " << e.what();
}
stats = sim.stats(); stats = sim.stats();
} }
catch (const std::exception& e) catch (const std::exception& e)

View File

@@ -210,6 +210,11 @@ trrep::server_context::~server_context()
delete provider_; delete provider_;
} }
void trrep::server_context::sst_received(const wsrep_gtid_t& gtid)
{
provider_->sst_received(gtid, 0);
}
int trrep::server_context::on_apply( int trrep::server_context::on_apply(
trrep::client_context& client_context, trrep::client_context& client_context,
@@ -247,3 +252,11 @@ int trrep::server_context::on_apply(
} }
return ret; return ret;
} }
bool trrep::server_context::statement_allowed_for_streaming(
const trrep::client_context&,
const trrep::transaction_context&) const
{
/* Streaming not implemented yet. */
return false;
}

View File

@@ -2,6 +2,57 @@
// Copyright (C) 2018 Codership Oy <info@codership.com> // Copyright (C) 2018 Codership Oy <info@codership.com>
// //
/*! \file server_context.hpp
*
* Server Context Abstraction
* ==========================
*
* This file defines an interface for TRRep Server Context.
* The Server Context will encapsulate server identification,
* server state and server capabilities. The class also
* defines an interface for manipulating server state, applying
* of remote transaction write sets, processing SST requests,
* creating local client connections for local storage access
* operations.
*
* Concepts
* ========
*
* State Snapshot Transfer
* -----------------------
*
* TODO
*
* Rollback Mode
* -------------
*
* When High Prioity Transaction (HTP) write set is applied, it
* may be required that the HTP Brute Force Aborts (BFA) locally
* executing transaction. As HTP must be able to apply all its
* write sets without interruption, the locally executing transaction
* must yield immediately, otherwise a transaction processing
* may stop or even deadlock. Depending on DBMS implementation,
* the local transaction may need to be rolled back immediately
* (synchronous mode) or the rollback may happen later on
* (asynchronous mode). The Server Context implementation
* which derives from Server Context base class must provide
* the base class the rollback mode which server operates on.
*
* ### Synchronous
*
* If the DBMS server implementation does not allow asynchronous rollback,
* the victim transaction must be rolled back immediately in order to
* allow transaction processing to proceed. Depending on DBMS process model,
* there may be either background thread which processes the rollback
* or the rollback can be done by the HTP applier.
*
* ### Asynchronous
*
* In asynchronous mode the BFA victim transaction is just marked
* to be aborted or in case of fully optimistic concurrency control,
* the conflict is detected at commit.
*/
#ifndef TRREP_SERVER_CONTEXT_HPP #ifndef TRREP_SERVER_CONTEXT_HPP
#define TRREP_SERVER_CONTEXT_HPP #define TRREP_SERVER_CONTEXT_HPP
@@ -20,16 +71,36 @@ namespace trrep
class view; class view;
class data; class data;
/*! \class Server Context
*
*
*/
class server_context class server_context
{ {
public: public:
/*! Rollback Mode enumeration
*
*/
enum rollback_mode enum rollback_mode
{ {
/*! Asynchronous rollback mode */
rm_async, rm_async,
/*! Synchronous rollback mode */
rm_sync rm_sync
}; };
/*! Server Context constructor
*
* \param name Human Readable Server Name.
* \param id Server Identifier String, UUID or some unique
* identifier.
* \param address Server address in form of IPv4 address, IPv6 address
* or hostname.
* \param working_dir Working directory for replication specific
* data files.
* \param rollback_mode Rollback mode which server operates on.
*/
server_context(const std::string& name, server_context(const std::string& name,
const std::string& id, const std::string& id,
const std::string& address, const std::string& address,
@@ -44,43 +115,57 @@ namespace trrep
{ } { }
virtual ~server_context(); virtual ~server_context();
// /*!
// Return server name * Return human readable server name.
// *
* \return Human readable server name string.
*/
const std::string& name() const { return name_; } const std::string& name() const { return name_; }
// /*!
// Return server identifier * Return Server identifier string.
// *
* \return Server indetifier string.
*/
const std::string& id() const { return id_; } const std::string& id() const { return id_; }
// /*!
// Return server group communication address * Return server group communication address.
// *
* \return Return server group communication address.
*/
const std::string& address() const { return address_; } const std::string& address() const { return address_; }
// /*!
// Create client context which acts only locally, i.e. does * Create client context which acts only locally, i.e. does
// not participate in replication. However, local client * not participate in replication. However, local client
// connection may execute transactions which require ordering, * connection may execute transactions which require ordering,
// as when modifying local SR fragment storage requires * as when modifying local SR fragment storage requires
// strict commit ordering. * strict commit ordering.
// *
* \return Pointer to Client Context.
*/
virtual client_context* local_client_context() = 0; virtual client_context* local_client_context() = 0;
// /*!
// Load provider * Load WSRep provider.
// *
// @return Zero on success, non-zero on error * \param provider WSRep provider library to be loaded.
// * \param provider_options Provider specific options string
int load_provider(const std::string&, const std::string& = ""); * to be passed for provider during initialization.
*
* \return Zero on success, non-zero on error.
*/
int load_provider(const std::string& provider,
const std::string& provider_options);
// /*!
// Return reference to provider * Return reference to provider.
// *
// @return Reference to provider * \return Reference to provider
// @throw trrep::runtime_error if provider has not been loaded *
// * \throw trrep::runtime_error if provider has not been loaded
*/
virtual trrep::provider& provider() const virtual trrep::provider& provider() const
{ {
if (provider_ == 0) if (provider_ == 0)
@@ -90,34 +175,106 @@ namespace trrep
return *provider_; return *provider_;
} }
// /*!
// * Virtual method which will be called when the server
// * has been joined to the cluster. Must be provided by
* the implementation.
*/
virtual void on_connect() = 0; virtual void on_connect() = 0;
/*!
* Wait until the server has connected to the cluster.
*
* \todo This should not be pure virtual method,
* this base class should provide the server state
* machine and proper synchronization.
*/
virtual void wait_until_connected() = 0; virtual void wait_until_connected() = 0;
virtual void on_view(const trrep::view&) = 0;
/*!
* Virtual method which will be called when a view
* notification event has been delivered by the
* provider.
*
* \params view trrep::view object which holds the new view
* information.
*/
virtual void on_view(const trrep::view& view) = 0;
/*!
* Virtual method which will be called when the server
* has been synchronized with the cluster.
*/
virtual void on_sync() = 0; virtual void on_sync() = 0;
/*!
* Virtual method which will be called on *joiner* when the provider
* requests the SST request information. This method should
* provide a string containing an information which the donor
* server can use to donate SST.
*/
virtual std::string on_sst_request() = 0; virtual std::string on_sst_request() = 0;
virtual void on_sst_donate_request(const std::string&,
const wsrep_gtid_t&, /*!
bool) = 0; * Virtual method which will be called on *donor* when the
virtual void sst_received(const wsrep_gtid_t&) = 0; * SST request has been delivered by the provider.
// * This method should initiate SST transfer or throw
// This method will be called by the applier thread when * a trrep::runtime_error
// a remote write set is being applied. It is the responsibility * if the SST transfer cannot be initiated. If the SST request
// of the caller to set up transaction context and data properly. * initiation is succesful, the server remains in s_donor
// * state until the SST is over or fails. The \param bypass
* should be passed to SST implementation. If the flag is true,
* no actual SST should happen, but the joiner server should
* be notified that the donor has seen the request. The notification
* should included \param gtid provided. This must be passed
* to sst_received() call on the joiner.
*
* \todo Figure out better exception for error codition.
*
* \param sst_request SST request string provided by the joiner.
* \param gtid GTID denoting the current replication position.
* \param bypass Boolean bypass flag.
*/
virtual void on_sst_donate_request(const std::string& sst_request,
const wsrep_gtid_t& gtid,
bool bypass) = 0;
/*!
* This method must be called by the joiner after the SST
* transfer has been received.
*
* \param gtid GTID provided by the SST transfer
*/
void sst_received(const wsrep_gtid_t& gtid);
/*!
* This method will be called by the provider hen
* a remote write set is being applied. It is the responsibility
* of the caller to set up transaction context and data properly.
*
* \todo Make this private, allow calls for provider implementations
* only.
* \param client_context Applier client context.
* \param transaction_context Transaction context.
* \param data Write set data
*
* \return Zero on success, non-zero on failure.
*/
int on_apply(trrep::client_context& client_context, int on_apply(trrep::client_context& client_context,
trrep::transaction_context& transaction_context, trrep::transaction_context& transaction_context,
const trrep::data& data); const trrep::data& data);
/*!
* This virtual method should be implemented by the DBMS
* to provide information if the current statement in processing
* is allowd for streaming replication.
*
* \return True if the statement is allowed for streaming
* replication, false otherwise.
*/
virtual bool statement_allowed_for_streaming( virtual bool statement_allowed_for_streaming(
const trrep::client_context&, const trrep::client_context& client_context,
const trrep::transaction_context&) const const trrep::transaction_context& transaction_context) const;
{
// Streaming not implemented yet
return false;
}
private: private:
server_context(const server_context&); server_context(const server_context&);

2
wsrep

Submodule wsrep updated: 7f29c99334...955da1c8c2