1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-06-16 02:01:44 +03:00

Ported to latest wsrep API v26

This commit is contained in:
Teemu Ollakka
2018-04-20 16:54:10 +03:00
parent 5c1d390b74
commit ea512d9c36
9 changed files with 198 additions and 52 deletions

View File

@ -51,13 +51,29 @@ public:
: mutex_() : mutex_()
, params_(params) , params_(params)
, servers_() , servers_()
, clients_start_()
, clients_stop_()
{ } { }
~dbms_simulator()
{
}
void start(); void start();
void stop(); void stop();
void donate_sst(dbms_server&, void donate_sst(dbms_server&,
const std::string& req, const wsrep_gtid_t& gtid, bool); const std::string& req, const wsrep_gtid_t& gtid, bool);
const dbms_simulator_params& params() const const dbms_simulator_params& params() const
{ return params_; } { return params_; }
std::string stats() const
{
std::ostringstream os;
os << "Number of transactions: " <<
(params_.n_servers * params_.n_clients * params_.n_transactions)
<< "\n"
<< "Seconds: "
<< std::chrono::duration<double>(clients_stop_ - clients_start_).count()
<< "\n";
return os.str();
}
private: private:
std::string server_port(size_t i) const std::string server_port(size_t i) const
{ {
@ -70,6 +86,8 @@ private:
trrep::default_mutex mutex_; trrep::default_mutex mutex_;
const dbms_simulator_params& params_; const dbms_simulator_params& params_;
std::map<size_t, std::unique_ptr<dbms_server>> servers_; std::map<size_t, std::unique_ptr<dbms_server>> servers_;
std::chrono::time_point<std::chrono::steady_clock> clients_start_;
std::chrono::time_point<std::chrono::steady_clock> clients_stop_;
}; };
class dbms_client; class dbms_client;
@ -84,8 +102,10 @@ public:
s_synced s_synced
}; };
dbms_server(dbms_simulator& simulator, dbms_server(dbms_simulator& simulator,
const std::string& name, const std::string& id) const std::string& name,
: trrep::server_context(name, id, name + "_data", const std::string& id,
const std::string& address)
: trrep::server_context(name, id, address, name + "_data",
trrep::server_context::rm_async) trrep::server_context::rm_async)
, simulator_(simulator) , simulator_(simulator)
, mutex_() , mutex_()
@ -252,7 +272,7 @@ public:
bool do_2pc() const { return false; } bool do_2pc() const { return false; }
int apply(trrep::transaction_context&, const trrep::data&) int apply(trrep::transaction_context&, const trrep::data&)
{ {
std::cerr << "applying" << "\n"; // std::cerr << "applying" << "\n";
return 0; return 0;
} }
int commit(trrep::transaction_context& transaction_context) int commit(trrep::transaction_context& transaction_context)
@ -261,12 +281,14 @@ public:
ret = transaction_context.before_commit(); ret = transaction_context.before_commit();
ret = ret || transaction_context.ordered_commit(); ret = ret || transaction_context.ordered_commit();
ret = ret || transaction_context.after_commit(); ret = ret || transaction_context.after_commit();
std::cerr << "commit" << "\n"; // std::cerr << "commit" << "\n";
return 0; return 0;
} }
int rollback(trrep::transaction_context& transaction_context) int rollback(trrep::transaction_context& transaction_context)
{ {
// std::cerr << "rollback: " << transaction_context.id().get() << "\n"; std::cerr << "rollback: " << transaction_context.id().get()
<< "state: " << trrep::to_string(transaction_context.state())
<< "\n";
transaction_context.before_rollback(); transaction_context.before_rollback();
transaction_context.after_rollback(); transaction_context.after_rollback();
return 0; return 0;
@ -288,22 +310,22 @@ private:
int err(0); int err(0);
key.append_key_part(&trx_id, sizeof(trx_id)); key.append_key_part(&trx_id, sizeof(trx_id));
err = trx.append_key(key); err = trx.append_key(key);
std::cout << "append_key: " << err << "\n"; // std::cout << "append_key: " << err << "\n";
err = err || trx.append_data(trrep::data(os.str().c_str(), os.str().size())); err = err || trx.append_data(trrep::data(os.str().c_str(), os.str().size()));
std::cout << "append_data: " << err << "\n"; // std::cout << "append_data: " << err << "\n";
if (do_2pc()) if (do_2pc())
{ {
err = err || trx.before_prepare(); err = err || trx.before_prepare();
std::cout << "before_prepare: " << err << "\n"; // std::cout << "before_prepare: " << err << "\n";
err = err || trx.after_prepare(); err = err || trx.after_prepare();
std::cout << "after_prepare: " << err << "\n"; // std::cout << "after_prepare: " << err << "\n";
} }
err = err || trx.before_commit(); err = err || trx.before_commit();
std::cout << "before_commit: " << err << "\n"; // std::cout << "before_commit: " << err << "\n";
err = err || trx.ordered_commit(); err = err || trx.ordered_commit();
std::cout << "ordered_commit: " << err << "\n"; // std::cout << "ordered_commit: " << err << "\n";
err = err || trx.after_commit(); err = err || trx.after_commit();
std::cout << "after_commit: " << err << "\n"; // std::cout << "after_commit: " << err << "\n";
trx.after_statement(); trx.after_statement();
} }
@ -378,9 +400,16 @@ void dbms_simulator::start()
name_os << (i + 1); name_os << (i + 1);
std::ostringstream id_os; std::ostringstream id_os;
id_os << (i + 1); id_os << (i + 1);
auto it(servers_.insert(std::make_pair((i + 1), std::ostringstream address_os;
address_os << "127.0.0.1:" << server_port(i);
auto it(servers_.insert(
std::make_pair(
(i + 1),
std::make_unique<dbms_server>( std::make_unique<dbms_server>(
*this, name_os.str(), id_os.str())))); *this,
name_os.str(),
id_os.str(),
address_os.str()))));
if (it.second == false) if (it.second == false)
{ {
throw trrep::runtime_error("Failed to add server"); throw trrep::runtime_error("Failed to add server");
@ -390,10 +419,16 @@ void dbms_simulator::start()
dbms_server& server(*it.first->second); dbms_server& server(*it.first->second);
std::string server_options(params_.wsrep_provider_options); std::string server_options(params_.wsrep_provider_options);
server_options += "; base_port=" + server_port(i); // server_options += "; base_port=" + server_port(i);
server.load_provider(params_.wsrep_provider, server_options); if (server.load_provider(params_.wsrep_provider, server_options))
server.provider().connect("sim_cluster", cluster_address, "", {
i == 0); throw trrep::runtime_error("Failed to load provider");
}
if (server.provider().connect("sim_cluster", cluster_address, "",
i == 0))
{
throw trrep::runtime_error("Failed to connect");
}
server.start_applier(); server.start_applier();
server.wait_until_connected(); server.wait_until_connected();
server.wait_until_state(dbms_server::s_synced); server.wait_until_state(dbms_server::s_synced);
@ -401,6 +436,7 @@ void dbms_simulator::start()
// Start client threads // Start client threads
clients_start_ = std::chrono::steady_clock::now();
for (auto& i : servers_) for (auto& i : servers_)
{ {
i.second->start_clients(); i.second->start_clients();
@ -414,9 +450,23 @@ void dbms_simulator::stop()
{ {
dbms_server& server(*i.second); dbms_server& server(*i.second);
server.stop_clients(); server.stop_clients();
}
clients_stop_ = std::chrono::steady_clock::now();
for (auto& i : servers_)
{
dbms_server& server(*i.second);
std::cout << "Status for server: " << server.id() << "\n";
auto status(server.provider().status());
for_each(status.begin(), status.end(),
[](const trrep::provider::status_variable& sv)
{
std::cout << sv.name() << " = " << sv.value() << "\n";
});
server.provider().disconnect(); server.provider().disconnect();
server.wait_until_disconnected(); server.wait_until_disconnected();
server.stop_applier(); server.stop_applier();
} }
} }
@ -443,7 +493,8 @@ void dbms_simulator::donate_sst(dbms_server& server,
} }
std::string dbms_simulator::build_cluster_address() const std::string dbms_simulator::build_cluster_address() const
{ {
std::string ret("gcomm://"); std::string ret;
// std::string ret("gcomm://");
for (size_t i(0); i < params_.n_servers; ++i) for (size_t i(0); i < params_.n_servers; ++i)
{ {
std::ostringstream sa_os; std::ostringstream sa_os;
@ -459,6 +510,7 @@ namespace po = boost::program_options;
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
std::string stats;
try try
{ {
dbms_simulator_params params; dbms_simulator_params params;
@ -489,8 +541,8 @@ int main(int argc, char** argv)
dbms_simulator sim(params); dbms_simulator sim(params);
sim.start(); sim.start();
std::this_thread::sleep_for(std::chrono::seconds(5));
sim.stop(); sim.stop();
stats = sim.stats();
} }
catch (const std::exception& e) catch (const std::exception& e)
{ {
@ -498,5 +550,7 @@ int main(int argc, char** argv)
return 1; return 1;
} }
std::cout << "Stats:\n" << stats << "\n";
return 0; return 0;
} }

View File

@ -96,9 +96,11 @@ namespace trrep
int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) { return 0; } int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) { return 0; }
int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) { return 0; } int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) { return 0; }
int rollback(const wsrep_trx_id_t) { return 0; } int rollback(const wsrep_trx_id_t) { return 0; }
wsrep_status_t commit_order_enter(wsrep_ws_handle_t*) wsrep_status_t commit_order_enter(const wsrep_ws_handle_t*,
const wsrep_trx_meta_t*)
{ return WSREP_OK; } { return WSREP_OK; }
int commit_order_leave(wsrep_ws_handle_t*) { return 0;} int commit_order_leave(const wsrep_ws_handle_t*,
const wsrep_trx_meta_t*) { return 0;}
int release(wsrep_ws_handle_t*) { return 0; } int release(wsrep_ws_handle_t*) { return 0; }
// Methods to modify mock state // Methods to modify mock state
@ -125,6 +127,11 @@ namespace trrep
int sst_sent(const wsrep_gtid_t&, int) { return 0; } int sst_sent(const wsrep_gtid_t&, int) { return 0; }
int sst_received(const wsrep_gtid_t&, int) { return 0; } int sst_received(const wsrep_gtid_t&, int) { return 0; }
std::vector<status_variable> status() const
{
return std::vector<status_variable>();
}
private: private:
wsrep_uuid_t group_id_; wsrep_uuid_t group_id_;
wsrep_uuid_t node_id_; wsrep_uuid_t node_id_;

View File

@ -18,7 +18,7 @@ namespace trrep
mock_server_context(const std::string& name, mock_server_context(const std::string& name,
const std::string& id, const std::string& id,
enum trrep::server_context::rollback_mode rollback_mode) enum trrep::server_context::rollback_mode rollback_mode)
: trrep::server_context(name, id, "./", rollback_mode) : trrep::server_context(name, id, "", "./", rollback_mode)
, provider_() , provider_()
, last_client_id_(0) , last_client_id_(0)
{ } { }

View File

@ -10,6 +10,7 @@
#include <wsrep_api.h> #include <wsrep_api.h>
#include <string> #include <string>
#include <vector>
namespace trrep namespace trrep
{ {
@ -18,6 +19,20 @@ namespace trrep
{ {
public: public:
class status_variable
{
public:
status_variable(const std::string& name,
const std::string& value)
: name_(name)
, value_(value)
{ }
const std::string& name() const { return name_; }
const std::string& value() const { return value_; }
private:
std::string name_;
std::string value_;
};
virtual ~provider() { } virtual ~provider() { }
// Provider state management // Provider state management
virtual int connect(const std::string& cluster_name, virtual int connect(const std::string& cluster_name,
@ -51,12 +66,16 @@ namespace trrep
wsrep_trx_id_t victim_trx, wsrep_trx_id_t victim_trx,
wsrep_seqno_t* victim_seqno) = 0; wsrep_seqno_t* victim_seqno) = 0;
virtual int rollback(const wsrep_trx_id_t) = 0; virtual int rollback(const wsrep_trx_id_t) = 0;
virtual wsrep_status commit_order_enter(wsrep_ws_handle_t*) = 0; virtual wsrep_status commit_order_enter(const wsrep_ws_handle_t*,
virtual int commit_order_leave(wsrep_ws_handle_t*) = 0; const wsrep_trx_meta_t*) = 0;
virtual int commit_order_leave(const wsrep_ws_handle_t*,
const wsrep_trx_meta_t*) = 0;
virtual int release(wsrep_ws_handle_t*) = 0; virtual int release(wsrep_ws_handle_t*) = 0;
virtual int sst_sent(const wsrep_gtid_t&, int) = 0; virtual int sst_sent(const wsrep_gtid_t&, int) = 0;
virtual int sst_received(const wsrep_gtid_t&, int) = 0; virtual int sst_received(const wsrep_gtid_t&, int) = 0;
virtual std::vector<status_variable> status() const = 0;
// Factory method // Factory method
static provider* make_provider(const std::string& provider); static provider* make_provider(const std::string& provider);
}; };

View File

@ -37,7 +37,7 @@ namespace
void* app_ctx, void* app_ctx,
const wsrep_view_info_t* view __attribute((unused))) const wsrep_view_info_t* view __attribute((unused)))
{ {
assert(app_ctx != 0); assert(app_ctx);
trrep::server_context& server_context( trrep::server_context& server_context(
*reinterpret_cast<trrep::server_context*>(app_ctx)); *reinterpret_cast<trrep::server_context*>(app_ctx));
// //
@ -182,7 +182,7 @@ int trrep::server_context::load_provider(const std::string& provider_spec,
memset(&init_args, 0, sizeof(init_args)); memset(&init_args, 0, sizeof(init_args));
init_args.app_ctx = this; init_args.app_ctx = this;
init_args.node_name = name_.c_str(); init_args.node_name = name_.c_str();
init_args.node_address = ""; init_args.node_address = address_.c_str();
init_args.node_incoming = ""; init_args.node_incoming = "";
init_args.data_dir = working_dir_.c_str(); init_args.data_dir = working_dir_.c_str();
init_args.options = provider_options.c_str(); init_args.options = provider_options.c_str();
@ -240,6 +240,7 @@ int trrep::server_context::on_apply(
assert(0); assert(0);
} }
transaction_context.after_statement();
if (ret) if (ret)
{ {
client_context.rollback(transaction_context); client_context.rollback(transaction_context);

View File

@ -32,11 +32,13 @@ namespace trrep
server_context(const std::string& name, server_context(const std::string& name,
const std::string& id, const std::string& id,
const std::string& address,
const std::string& working_dir, const std::string& working_dir,
enum rollback_mode rollback_mode) enum rollback_mode rollback_mode)
: provider_() : provider_()
, name_(name) , name_(name)
, id_(id) , id_(id)
, address_(address)
, working_dir_(working_dir) , working_dir_(working_dir)
, rollback_mode_(rollback_mode) , rollback_mode_(rollback_mode)
{ } { }
@ -52,6 +54,11 @@ namespace trrep
// //
const std::string& id() const { return id_; } const std::string& id() const { return id_; }
//
// Return server group communication address
//
const std::string& address() const { return address_; }
// //
// Create client context which acts only locally, i.e. does // Create client context which acts only locally, i.e. does
// not participate in replication. However, local client // not participate in replication. However, local client
@ -119,6 +126,7 @@ namespace trrep
trrep::provider* provider_; trrep::provider* provider_;
std::string name_; std::string name_;
std::string id_; std::string id_;
std::string address_;
std::string working_dir_; std::string working_dir_;
enum rollback_mode rollback_mode_; enum rollback_mode rollback_mode_;
}; };

View File

@ -52,9 +52,16 @@ trrep::transaction_context::transaction_context(
trrep::transaction_context::~transaction_context() trrep::transaction_context::~transaction_context()
{ {
if (state() != s_committed && state() != s_aborted) if (active())
{ {
client_context_.rollback(*this); try
{
(void)client_context_.rollback(*this);
}
catch (...)
{
// TODO: Log warning
}
} }
} }
@ -194,7 +201,7 @@ int trrep::transaction_context::before_commit()
case trrep::client_context::m_local: case trrep::client_context::m_local:
if (ordered()) if (ordered())
{ {
ret = provider_.commit_order_enter(&ws_handle_); ret = provider_.commit_order_enter(&ws_handle_, &trx_meta_);
} }
break; break;
case trrep::client_context::m_replicating: case trrep::client_context::m_replicating:
@ -222,7 +229,7 @@ int trrep::transaction_context::before_commit()
if (ret == 0) if (ret == 0)
{ {
lock.unlock(); lock.unlock();
switch(provider_.commit_order_enter(&ws_handle_)) switch(provider_.commit_order_enter(&ws_handle_, &trx_meta_))
{ {
case WSREP_OK: case WSREP_OK:
break; break;
@ -240,7 +247,7 @@ int trrep::transaction_context::before_commit()
break; break;
case trrep::client_context::m_applier: case trrep::client_context::m_applier:
assert(ordered()); assert(ordered());
ret = provider_.commit_order_enter(&ws_handle_); ret = provider_.commit_order_enter(&ws_handle_, &trx_meta_);
if (ret) if (ret)
{ {
state(lock, s_must_abort); state(lock, s_must_abort);
@ -268,7 +275,7 @@ int trrep::transaction_context::ordered_commit()
debug_log_state(); debug_log_state();
assert(state() == s_committing); assert(state() == s_committing);
assert(ordered()); assert(ordered());
ret = provider_.commit_order_leave(&ws_handle_); ret = provider_.commit_order_leave(&ws_handle_, &trx_meta_);
// Should always succeed // Should always succeed
assert(ret == 0); assert(ret == 0);
state(lock, s_ordered_commit); state(lock, s_ordered_commit);
@ -427,8 +434,8 @@ int trrep::transaction_context::after_statement()
{ {
if (ordered()) if (ordered())
{ {
ret = provider_.commit_order_enter(&ws_handle_); ret = provider_.commit_order_enter(&ws_handle_, &trx_meta_);
if (ret == 0) provider_.commit_order_leave(&ws_handle_); if (ret == 0) provider_.commit_order_leave(&ws_handle_, &trx_meta_);
} }
provider_.release(&ws_handle_); provider_.release(&ws_handle_);
} }
@ -466,9 +473,9 @@ void trrep::transaction_context::state(
}; };
if (allowed[state_][next_state]) if (allowed[state_][next_state])
{ {
std::cerr << "state transition: " << trrep::to_string(state_) // std::cerr << "state transition: " << trrep::to_string(state_)
<< " -> " << trrep::to_string(next_state) // << " -> " << trrep::to_string(next_state)
<< "\n"; // << "\n";
state_hist_.push_back(state_); state_hist_.push_back(state_);
state_ = next_state; state_ = next_state;
} }
@ -601,7 +608,7 @@ int trrep::transaction_context::certify_commit(
assert(state() == s_certifying || state() == s_must_abort); assert(state() == s_certifying || state() == s_must_abort);
client_context_.debug_sync("wsrep_after_replication"); client_context_.debug_sync("wsrep_after_replication");
std::cout << "seqno: " << trx_meta_.gtid.seqno << "\n"; // std::cout << "seqno: " << trx_meta_.gtid.seqno << "\n";
int ret(1); int ret(1);
switch (cert_ret) switch (cert_ret)
{ {
@ -692,9 +699,9 @@ void trrep::transaction_context::clear_fragments()
void trrep::transaction_context::cleanup() void trrep::transaction_context::cleanup()
{ {
std::cerr << "Cleanup transaction " // std::cerr << "Cleanup transaction "
<< client_context_.id().get() // << client_context_.id().get()
<< ": " << id_.get() << "\n"; // << ": " << id_.get() << "\n";
id_ = trrep::transaction_id::invalid(); id_ = trrep::transaction_id::invalid();
state_ = s_executing; state_ = s_executing;
state_hist_.clear(); state_hist_.clear();
@ -708,7 +715,7 @@ void trrep::transaction_context::cleanup()
void trrep::transaction_context::debug_log_state() const void trrep::transaction_context::debug_log_state() const
{ {
std::cout << "client: " << client_context_.id().get() // std::cout << "client: " << client_context_.id().get()
<< " trx: " << id_.get() // << " trx: " << id_.get()
<< " state: " << state_ << "\n"; // << " state: " << state_ << "\n";
} }

View File

@ -7,7 +7,10 @@
#include <wsrep_api.h> #include <wsrep_api.h>
#include <cassert>
#include <iostream> #include <iostream>
#include <sstream>
trrep::wsrep_provider_v26::wsrep_provider_v26( trrep::wsrep_provider_v26::wsrep_provider_v26(
const char* path, const char* path,
@ -91,14 +94,17 @@ wsrep_status_t trrep::wsrep_provider_v26::certify(wsrep_conn_id_t conn_id,
} }
wsrep_status_t trrep::wsrep_provider_v26::commit_order_enter( wsrep_status_t trrep::wsrep_provider_v26::commit_order_enter(
wsrep_ws_handle_t* wsh) const wsrep_ws_handle_t* wsh,
const wsrep_trx_meta_t* meta)
{ {
return wsrep_->commit_order_enter(wsrep_, wsh); return wsrep_->commit_order_enter(wsrep_, wsh, meta);
} }
int trrep::wsrep_provider_v26::commit_order_leave(wsrep_ws_handle_t* wsh) int trrep::wsrep_provider_v26::commit_order_leave(
const wsrep_ws_handle_t* wsh,
const wsrep_trx_meta_t* meta)
{ {
return (wsrep_->commit_order_leave(wsrep_, wsh, 0) != WSREP_OK); return (wsrep_->commit_order_leave(wsrep_, wsh, meta, 0) != WSREP_OK);
} }
int trrep::wsrep_provider_v26::release(wsrep_ws_handle_t* wsh) int trrep::wsrep_provider_v26::release(wsrep_ws_handle_t* wsh)
@ -123,3 +129,43 @@ int trrep::wsrep_provider_v26::sst_received(const wsrep_gtid_t& gtid, int err)
} }
return 0; return 0;
} }
std::vector<trrep::provider::status_variable>
trrep::wsrep_provider_v26::status() const
{
std::vector<status_variable> ret;
struct wsrep_stats_var* const stats(wsrep_->stats_get(wsrep_));
struct wsrep_stats_var* i(stats);
if (i)
{
while (i->name)
{
switch (i->type)
{
case WSREP_VAR_STRING:
ret.push_back(status_variable(i->name, i->value._string));
break;
case WSREP_VAR_INT64:
{
std::ostringstream os;
os << i->value._int64;
ret.push_back(status_variable(i->name, os.str()));
break;
}
case WSREP_VAR_DOUBLE:
{
std::ostringstream os;
os << i->value._double;
ret.push_back(status_variable(i->name, os.str()));
break;
}
default:
assert(0);
break;
}
++i;
}
wsrep_->stats_free(wsrep_, stats);
}
return ret;
}

View File

@ -33,11 +33,15 @@ namespace trrep
wsrep_trx_id_t, wsrep_trx_id_t,
wsrep_seqno_t*) { return WSREP_OK; } wsrep_seqno_t*) { return WSREP_OK; }
int rollback(const wsrep_trx_id_t) { return 0; } int rollback(const wsrep_trx_id_t) { return 0; }
wsrep_status commit_order_enter(wsrep_ws_handle_t*); wsrep_status commit_order_enter(const wsrep_ws_handle_t*,
int commit_order_leave(wsrep_ws_handle_t*); const wsrep_trx_meta_t*);
int commit_order_leave(const wsrep_ws_handle_t*,
const wsrep_trx_meta_t*);
int release(wsrep_ws_handle_t*); int release(wsrep_ws_handle_t*);
int sst_sent(const wsrep_gtid_t&,int); int sst_sent(const wsrep_gtid_t&,int);
int sst_received(const wsrep_gtid_t& gtid, int); int sst_received(const wsrep_gtid_t& gtid, int);
std::vector<status_variable> status() const;
private: private:
wsrep_provider_v26(const wsrep_provider_v26&); wsrep_provider_v26(const wsrep_provider_v26&);
wsrep_provider_v26& operator=(const wsrep_provider_v26); wsrep_provider_v26& operator=(const wsrep_provider_v26);