1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-24 10:42:31 +03:00

Finished dbsim integration with refactored client API.

This commit is contained in:
Teemu Ollakka
2018-06-16 10:25:14 +03:00
parent ae68122d59
commit 6dcac8ce4d
15 changed files with 203 additions and 97 deletions

View File

@ -34,12 +34,17 @@ namespace db
const db::params&);
bool bf_abort(wsrep::seqno);
const struct stats stats() const { return stats_; }
void store_globals() { }
void reset_globals() { }
void store_globals()
{
client_context_.store_globals();
}
void reset_globals()
{ }
void start();
wsrep::client_context& client_context() { return client_context_; }
private:
friend class db::server_context;
friend class db::client_service;
template <class F> int client_command(F f);
void run_one_transaction();
void reset_error();

View File

@ -31,7 +31,7 @@ namespace db
{ }
db::client* client() { return client_; }
void reset_globals() { }
void store_globals() { }
void store_globals() { wsrep::client_context::store_globals(); }
bool is_autocommit() const { return is_autocommit_; }
bool do_2pc() const { return do_2pc_; }

View File

@ -3,3 +3,49 @@
//
#include "db_client_service.hpp"
#include "db_client.hpp"
int db::client_service::apply(wsrep::client_context&,
const wsrep::const_buffer&)
{
db::client* client(client_context_.client());
client->se_trx_.start(client);
client->se_trx_.apply(client_context_.transaction());
return 0;
}
int db::client_service::commit(wsrep::client_context&,
const wsrep::ws_handle&,
const wsrep::ws_meta&)
{
db::client* client(client_context_.client());
int ret(client_context_.before_commit());
if (ret == 0) client->se_trx_.commit();
ret = ret || client_context_.ordered_commit();
ret = ret || client_context_.after_commit();
return ret;
}
int db::client_service::rollback(wsrep::client_context&)
{
db::client* client(client_context_.client());
int ret(client_context_.before_rollback());
assert(ret == 0);
client->se_trx_.rollback();
ret = client_context_.after_rollback();
assert(ret == 0);
return ret;
}
enum wsrep::provider::status
db::client_service::replay(wsrep::client_context&,
wsrep::transaction_context& transaction_context)
{
wsrep::client_applier_mode applier_mode(client_context_);
auto ret(provider_.replay(transaction_context.ws_handle(),
&client_context_));
if (ret == wsrep::provider::success)
{
++client_context_.client()->stats_.replays;
}
return ret;
}

View File

@ -2,11 +2,10 @@
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef WSREP_DB_CLIENT_SREVICE_HPP
#ifndef WSREP_DB_CLIENT_SERVICE_HPP
#define WSREP_DB_CLIENT_SERVICE_HPP
#include "wsrep/client_service.hpp"
#include "wsrep/client_context.hpp"
#include "wsrep/transaction_context.hpp"
#include "db_client_context.hpp"
@ -67,21 +66,12 @@ namespace db
void remove_fragments(const wsrep::transaction_context&) override
{ }
int apply(wsrep::client_context&, const wsrep::const_buffer&) override
{
return 0;
}
int apply(wsrep::client_context&, const wsrep::const_buffer&) override;
int commit(wsrep::client_context&,
const wsrep::ws_handle&, const wsrep::ws_meta&) override
{
return 0;
}
const wsrep::ws_handle&, const wsrep::ws_meta&) override;
int rollback(wsrep::client_context&) override
{
return 0;
}
int rollback(wsrep::client_context&) override;
void will_replay(const wsrep::transaction_context&) override
{ }
@ -90,8 +80,7 @@ namespace db
wsrep::unique_lock<wsrep::mutex>&) override { }
enum wsrep::provider::status replay(wsrep::client_context&,
wsrep::transaction_context&)
override
{ return wsrep::provider::success; }
override;
int append_fragment(const wsrep::transaction_context&, int,
const wsrep::const_buffer&) override

View File

@ -27,7 +27,10 @@ void db::server::applier_thread()
wsrep::client_id client_id(last_client_id_.fetch_add(1) + 1);
db::client applier(*this, client_id,
wsrep::client_context::m_applier, simulator_.params());
enum wsrep::provider::status ret(server_context_.provider().run_applier(&applier));
wsrep::client_context* cc(static_cast<wsrep::client_context*>(
&applier.client_context()));
enum wsrep::provider::status ret(
server_context_.provider().run_applier(cc));
wsrep::log() << "Applier thread exited with error code " << ret;
}
@ -64,7 +67,7 @@ void db::server::stop_clients()
{
const struct db::client::stats& stats(i->stats());
simulator_.stats_.commits += stats.commits;
simulator_.stats_.aborts += stats.rollbacks;
simulator_.stats_.rollbacks += stats.rollbacks;
simulator_.stats_.replays += stats.replays;
}
}

View File

@ -6,6 +6,70 @@
#include <boost/filesystem.hpp>
#include <sstream>
void db::simulator::run()
{
start();
stop();
std::flush(std::cerr);
std::cout << "Results:\n";
std::cout << stats() << std::endl;
}
void db::simulator::sst(db::server& server,
const std::string& request,
const wsrep::gtid& gtid,
bool bypass)
{
size_t id;
std::istringstream is(request);
is >> id;
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
auto i(servers_.find(id));
if (i == servers_.end())
{
throw wsrep::runtime_error("Server " + request + " not found");
}
if (bypass == false)
{
wsrep::log_info() << "SST " << server.server_context().id() << " -> " << id;
}
i->second->server_context().sst_received(gtid, 0);
server.server_context().sst_sent(gtid, 0);
}
std::string db::simulator::stats() const
{
size_t transactions(params_.n_servers * params_.n_clients
* params_.n_transactions);
auto duration(std::chrono::duration<double>(
clients_stop_ - clients_start_).count());
long long bf_aborts(0);
for (const auto& s : servers_)
{
bf_aborts += s.second->storage_engine().bf_aborts();
}
std::ostringstream os;
os << "Number of transactions: " << transactions
<< "\n"
<< "Seconds: " << duration
<< " \n"
<< "Transactions per second: " << transactions/duration
<< "\n"
<< "BF aborts: "
<< bf_aborts
<< "\n"
<< "Client commits: " << stats_.commits
<< "\n"
<< "Client rollbacks: " << stats_.rollbacks
<< "\n"
<< "Client replays: " << stats_.replays;
return os.str();
}
////////////////////////////////////////////////////////////////////////////////
// Private //
////////////////////////////////////////////////////////////////////////////////
void db::simulator::start()
{
wsrep::log() << "Provider: " << params_.wsrep_provider;
@ -65,64 +129,38 @@ void db::simulator::start()
void db::simulator::stop()
{
}
void db::simulator::sst(db::server& server,
const std::string& request,
const wsrep::gtid& gtid,
bool bypass)
{
size_t id;
std::istringstream is(request);
is >> id;
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
auto i(servers_.find(id));
if (i == servers_.end())
for (auto& i : servers_)
{
throw wsrep::runtime_error("Server " + request + " not found");
db::server& server(*i.second);
server.stop_clients();
}
if (bypass == false)
clients_stop_ = std::chrono::steady_clock::now();
wsrep::log() << "######## Stats ############";
wsrep::log() << stats();
wsrep::log() << "######## Stats ############";
if (params_.fast_exit)
{
wsrep::log_info() << "SST " << server.server_context().id() << " -> " << id;
exit(0);
}
i->second->server_context().sst_received(gtid, 0);
server.server_context().sst_sent(gtid, 0);
}
std::string db::simulator::stats() const
{
size_t transactions(params_.n_servers * params_.n_clients
* params_.n_transactions);
auto duration(std::chrono::duration<double>(
clients_stop_ - clients_start_).count());
long long bf_aborts(0);
for (const auto& s : servers_)
for (auto& i : servers_)
{
bf_aborts += s.second->storage_engine().bf_aborts();
db::server& server(*i.second);
wsrep::log_info() << "Status for server: "
<< server.server_context().id();
auto status(server.server_context().provider().status());
for_each(status.begin(), status.end(),
[](const wsrep::provider::status_variable& sv)
{
wsrep::log() << sv.name() << " = " << sv.value();
});
server.server_context().disconnect();
server.server_context().wait_until_state(
wsrep::server_context::s_disconnected);
server.stop_applier();
server.server_context().unload_provider();
}
std::ostringstream os;
os << "Number of transactions: " << transactions
<< "\n"
<< "Seconds: " << duration
<< " \n"
<< "Transactions per second: " << transactions/duration
<< "\n"
<< "BF aborts: "
<< bf_aborts
<< "\n"
<< "Client commits: " << stats_.commits
<< "\n"
<< "Client aborts: " << stats_.aborts
<< "\n"
<< "Client replays: " << stats_.replays;
return os.str();
}
////////////////////////////////////////////////////////////////////////////////
// Private //
////////////////////////////////////////////////////////////////////////////////
std::string db::simulator::server_port(size_t i) const
{
std::ostringstream os;

View File

@ -32,14 +32,15 @@ namespace db
, stats_()
{ }
void start();
void stop();
void run();
void sst(db::server&,
const std::string&, const wsrep::gtid&, bool);
const db::params& params() const
{ return params_; }
std::string stats() const;
private:
void start();
void stop();
std::string server_port(size_t i) const;
std::string build_cluster_address() const;
@ -52,11 +53,11 @@ namespace db
struct stats
{
long long commits;
long long aborts;
long long rollbacks;
long long replays;
stats()
: commits(0)
, aborts(0)
, rollbacks(0)
, replays(0)
{ }
} stats_;

View File

@ -5,7 +5,7 @@
#include "db_storage_engine.hpp"
#include "db_client.hpp"
void db::storage_engine::transaction::start(client* cc)
void db::storage_engine::transaction::start(db::client* cc)
{
wsrep::unique_lock<wsrep::mutex> lock(se_.mutex_);
if (se_.transactions_.insert(cc).second == false)
@ -15,6 +15,13 @@ void db::storage_engine::transaction::start(client* cc)
cc_ = cc;
}
void db::storage_engine::transaction::apply(
const wsrep::transaction_context& transaction_context)
{
assert(cc_);
se_.bf_abort_some(transaction_context);
}
void db::storage_engine::transaction::commit()
{
if (cc_)
@ -43,10 +50,19 @@ void db::storage_engine::bf_abort_some(const wsrep::transaction_context& txc)
{
if (transactions_.empty() == false)
{
auto* victim_txc(*transactions_.begin());
if (victim_txc->bf_abort(txc.seqno()))
for (auto victim : transactions_)
{
++bf_aborts_;
wsrep::client_context& cc(victim->client_context());
wsrep::unique_lock<wsrep::mutex> lock(cc.mutex());
if (cc.mode() == wsrep::client_context::m_replicating)
{
lock.unlock();
if (victim->bf_abort(txc.seqno()))
{
++bf_aborts_;
break;
}
}
}
}
}

View File

@ -39,13 +39,15 @@ namespace db
}
bool active() const { return cc_ != nullptr; }
void start(client* cc);
void apply(const wsrep::transaction_context&);
void commit();
void rollback();
db::client* client() { return cc_; }
transaction(const transaction&) = delete;
transaction& operator=(const transaction&) = delete;
private:
storage_engine& se_;
client* cc_;
db::storage_engine& se_;
db::client* cc_;
};
void bf_abort_some(const wsrep::transaction_context& tc);
long long bf_aborts() const { return bf_aborts_; }

View File

@ -7,10 +7,6 @@
int main(int argc, char** argv)
{
db::params params(db::parse_args(argc, argv));
db::simulator simulator(params);
simulator.start();
simulator.stop();
std::cout << simulator.stats() << std::endl;
db::simulator(db::parse_args(argc, argv)).run();
return 0;
}

View File

@ -126,6 +126,13 @@ namespace wsrep
};
const static int state_max_ = s_quitting + 1;
void store_globals()
{
thread_id_ = wsrep::this_thread::get_id();
}
/*!
* Destructor.
*/
@ -134,7 +141,6 @@ namespace wsrep
assert(transaction_.active() == false);
}
/*!
*
*/
@ -328,7 +334,8 @@ namespace wsrep
int commit()
{
assert(mode_ == m_applier || mode_ == m_local);
return client_service_.commit(*this,
return client_service_.commit(
*this,
transaction_.ws_handle(), transaction_.ws_meta());
}
//
@ -553,13 +560,6 @@ namespace wsrep
client_context(const client_context&);
client_context& operator=(client_context&);
/*
* Friend declarations
*/
//friend int server_context::on_apply(client_context&,
// const wsrep::ws_handle&,
// const wsrep::ws_meta&,
// const wsrep::const_buffer&);
friend class client_context_switch;
friend class client_applier_mode;
friend class client_toi_mode;

View File

@ -239,6 +239,8 @@ namespace wsrep
int load_provider(const std::string& provider,
const std::string& provider_options);
void unload_provider();
/*!
* Return reference to provider.
*

View File

@ -26,6 +26,12 @@ int wsrep::server_context::load_provider(const std::string& provider_spec,
return (provider_ ? 0 : 1);
}
void wsrep::server_context::unload_provider()
{
delete provider_;
provider_ = 0;
}
int wsrep::server_context::connect(const std::string& cluster_name,
const std::string& cluster_address,
const std::string& state_donor,

View File

@ -74,6 +74,7 @@ int wsrep::transaction_context::start_transaction(
id_ = ws_meta.transaction_id();
assert(client_context_.mode() == wsrep::client_context::m_applier);
state_ = s_executing;
state_hist_.clear();
ws_handle_ = ws_handle;
ws_meta_ = ws_meta;
certified_ = true;
@ -946,6 +947,7 @@ void wsrep::transaction_context::clear_fragments()
void wsrep::transaction_context::cleanup()
{
assert(is_streaming() == false);
assert(state() == s_committed || state() == s_aborted);
debug_log_state("cleanup_enter");
id_ = wsrep::transaction_id::invalid();
ws_handle_ = wsrep::ws_handle();

View File

@ -7,8 +7,8 @@
int wsrep::mock_client_service::apply(
wsrep::client_context& client_context,
const wsrep::const_buffer& data __attribute__((unused)))
wsrep::client_context& client_context WSREP_UNUSED,
const wsrep::const_buffer&)
{
assert(client_context.transaction().state() == wsrep::transaction_context::s_executing ||