1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Refactoring dbms simulator. Intermediate commit.

This commit is contained in:
Teemu Ollakka
2018-06-15 12:58:36 +03:00
parent cb3b2fbf9e
commit 4fbf1d0cf8
17 changed files with 771 additions and 68 deletions

View File

@ -3,6 +3,10 @@
# #
add_executable(dbms_simulator add_executable(dbms_simulator
dbms_simulator.cpp) db_storage_engine.cpp
db_client.cpp
# db_client_service.cpp
# dbms_simulator.cpp
)
target_link_libraries(dbms_simulator wsrep-lib ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_THREAD_LIBRARY}) target_link_libraries(dbms_simulator wsrep-lib ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_THREAD_LIBRARY})
set_property(TARGET dbms_simulator PROPERTY CXX_STANDARD 14) set_property(TARGET dbms_simulator PROPERTY CXX_STANDARD 14)

118
dbsim/db_client.cpp Normal file
View File

@ -0,0 +1,118 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include "db_client.hpp"
template <class F>
int db::client::client_command(F f)
{
int err(client_context_.before_command());
// wsrep::log_debug() << "before_command: " << err;
// If err != 0, transaction was BF aborted while client idle
if (err == 0)
{
err = client_context_.before_statement();
if (err == 0)
{
err = f();
}
client_context_.after_statement();
}
client_context_.after_command_before_result();
if (client_context_.current_error())
{
// wsrep::log_info() << "Current error";
assert(client_context_.transaction().state() ==
wsrep::transaction_context::s_aborted);
err = 1;
}
client_context_.after_command_after_result();
// wsrep::log_info() << "client_command(): " << err;
return err;
}
void db::client::run_one_transaction()
{
client_context_.reset_error();
int err = client_command(
[&]()
{
// wsrep::log_debug() << "Start transaction";
err = client_context_.start_transaction(
server_context_.next_transaction_id());
assert(err == 0);
se_trx_.start(this);
return err;
});
const wsrep::transaction_context& transaction(
client_context_.transaction());
err = err || client_command(
[&]()
{
// wsrep::log_debug() << "Generate write set";
assert(transaction.active());
assert(err == 0);
int data(std::rand() % params_.n_rows);
std::ostringstream os;
os << data;
wsrep::key key(wsrep::key::exclusive);
key.append_key_part("dbms", 4);
unsigned long long client_key(client_context_.id().get());
key.append_key_part(&client_key, sizeof(client_key));
key.append_key_part(&data, sizeof(data));
err = client_context_.append_key(key);
err = err || client_context_.append_data(
wsrep::const_buffer(os.str().c_str(),
os.str().size()));
return err;
});
err = err || client_command(
[&]()
{
// wsrep::log_debug() << "Commit";
assert(err == 0);
if (client_context_.do_2pc())
{
err = err || client_context_.before_prepare();
err = err || client_context_.after_prepare();
}
err = err || client_context_.before_commit();
if (err == 0) se_trx_.commit();
err = err || client_context_.ordered_commit();
err = err || client_context_.after_commit();
if (err)
{
client_context_.rollback();
}
return err;
});
assert(err ||
transaction.state() == wsrep::transaction_context::s_aborted ||
transaction.state() == wsrep::transaction_context::s_committed);
assert(se_trx_.active() == false);
assert(transaction.active() == false);
switch (transaction.state())
{
case wsrep::transaction_context::s_committed:
++stats_.commits;
break;
case wsrep::transaction_context::s_aborted:
++stats_.rollbacks;
break;
default:
assert(0);
}
}
bool db::client::bf_abort(wsrep::seqno seqno)
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
return client_context_.bf_abort(lock, seqno);
}

46
dbsim/db_client.hpp Normal file
View File

@ -0,0 +1,46 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef WSREP_DB_CLIENT_HPP
#define WSREP_DB_CLIENT_HPP
#include "db_client_context.hpp"
#include "db_server_context.hpp"
#include "db_storage_engine.hpp"
namespace db
{
class client
{
public:
struct stats
{
long long commits;
long long rollbacks;
long long replays;
stats()
: commits(0)
, rollbacks(0)
, replays(0)
{ }
};
client();
bool bf_abort(wsrep::seqno);
private:
template <class F> int client_command(F f);
void run_one_transaction();
void reset_error();
wsrep::default_mutex mutex_;
const db::params& params_;
db::server_context& server_context_;
db::client_context client_context_;
db::storage_engine::transaction se_trx_;
stats stats_;
};
}
#endif // WSREP_DB_CLIENT_HPP

View File

@ -0,0 +1,39 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef WSREP_DB_CLIENT_CONTEXT_HPP
#define WSREP_DB_CLIENT_CONTEXT_HPP
#include "wsrep/client_context.hpp"
#include "db_server_context.hpp"
namespace db
{
class client_context : public wsrep::client_context
{
public:
client_context(wsrep::mutex& mutex,
db::server_context& server_context,
wsrep::client_service& client_service,
const wsrep::client_id& client_id,
enum wsrep::client_context::mode mode)
: wsrep::client_context(mutex,
server_context,
client_service,
client_id,
mode)
, is_autocommit_(false)
, do_2pc_(false)
{ }
void reset_globals() { }
void store_globals() { }
bool is_autocommit() const { return is_autocommit_; }
bool do_2pc() const { return do_2pc_; }
private:
bool is_autocommit_;
bool do_2pc_;
};
}
#endif // WSREP_DB_CLIENT_CONTEXT

View File

@ -0,0 +1,5 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include "db_client_service.hpp"

107
dbsim/db_client_service.hpp Normal file
View File

@ -0,0 +1,107 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef WSREP_DB_CLIENT_SREVICE_HPP
#define WSREP_DB_CLIENT_SERVICE_HPP
#include "wsrep/client_service.hpp"
#include "wsrep/client_context.hpp"
#include "wsrep/transaction_context.hpp"
#include "db_client_context.hpp"
namespace db
{
class client_service : public wsrep::client_service
{
public:
client_service(wsrep::provider& provider,
db::client_context& client_context)
: wsrep::client_service(provider)
, client_context_(client_context)
{ }
bool is_autocommit() const override
{
return client_context_.is_autocommit();
}
bool do_2pc() const override
{
return client_context_.do_2pc();
}
bool interrupted() const override
{
return false;
}
void reset_globals() override
{
client_context_.reset_globals();
}
void store_globals() override
{
client_context_.store_globals();
}
int prepare_data_for_replication(wsrep::client_context&, const wsrep::transaction_context&) override
{
return 0;
}
size_t bytes_generated() const override
{
return 0;
}
int prepare_fragment_for_replication(wsrep::client_context&,
const wsrep::transaction_context&,
wsrep::mutable_buffer&) override
{
return 0;
}
void remove_fragments(const wsrep::transaction_context&) override
{ }
int apply(wsrep::client_context&, const wsrep::const_buffer&) override
{
return 0;
}
int commit(wsrep::client_context&,
const wsrep::ws_handle&, const wsrep::ws_meta&) override
{
return 0;
}
int rollback(wsrep::client_context&) override
{
return 0;
}
void will_replay(const wsrep::transaction_context&) override
{ }
enum wsrep::provider::status replay(wsrep::client_context&,
wsrep::transaction_context&)
override
{ return wsrep::provider::success; }
int append_fragment(const wsrep::transaction_context&, int,
const wsrep::const_buffer&) override
{
return 0;
}
void debug_sync(wsrep::client_context&, const char*) override { }
void debug_crash(const char*) override { }
private:
db::client_context& client_context_;
};
}
#endif // WSREP_DB_CLIENT_SERVICE_HPP

38
dbsim/db_params.hpp Normal file
View File

@ -0,0 +1,38 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef WSREP_DB_PARAMS_HPP
#define WSREP_DB_PARAMS_HPP
#include <cstddef>
#include <string>
namespace db
{
struct params
{
size_t n_servers;
size_t n_clients;
size_t n_transactions;
size_t n_rows;
size_t alg_freq;
std::string wsrep_provider;
std::string wsrep_provider_options;
int debug_log_level;
int fast_exit;
params()
: n_servers(0)
, n_clients(0)
, n_transactions(0)
, n_rows(1000)
, alg_freq(0)
, wsrep_provider()
, wsrep_provider_options()
, debug_log_level(0)
, fast_exit(0)
{ }
};
}
#endif // WSREP_DB_PARAMS_HPP

65
dbsim/db_server.cpp Normal file
View File

@ -0,0 +1,65 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include "db_server.hpp"
void db::server::applier_thread()
{
wsrep::client_id client_id(last_client_id_.fetch_add(1) + 1);
db::client applier(*this, client_id,
wsrep::client_context::m_applier, simulator_.params());
enum wsrep::provider::status ret(provider().run_applier(&applier));
wsrep::log() << "Applier thread exited with error code " << ret;
}
wsrep::client_context* db::server::local_client_context()
{
std::ostringstream id_os;
size_t client_id(++last_client_id_);
return new db::client(*this, client_id,
wsrep::client_context::m_replicating,
simulator_.params());
}
void db::server::start_clients()
{
size_t n_clients(simulator_.params().n_clients);
for (size_t i(0); i < n_clients; ++i)
{
start_client(i + 1);
}
}
void db::server::stop_clients()
{
for (auto& i : client_threads_)
{
i.join();
}
for (const auto& i : clients_)
{
struct db::client::stats stats(i->stats());
simulator_.stats_.commits += stats.commits;
simulator_.stats_.aborts += stats.aborts;
simulator_.stats_.replays += stats.replays;
}
}
void db::server::client_thread(const std::shared_ptr<db::client>& client)
{
client->store_globals();
client->start();
}
void db::server::start_client(size_t id)
{
auto client(std::make_shared<db::client>(
*this, id,
wsrep::client_context::m_replicating,
simulator_.params()));
clients_.push_back(client);
client_threads_.push_back(
boost::thread(&db::server::client_thread, this, client));
}

113
dbsim/db_server.hpp Normal file
View File

@ -0,0 +1,113 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef WSREP_DB_SERVER_HPP
#define WSREP_DB_SERVER_HPP
namespace db
{
class server
{
public:
server(simulator& simulator,
const std::string& name,
const std::string& id,
const std::string& address)
: wsrep::server_context(mutex_,
cond_,
name, id, address, name + "_data",
wsrep::server_context::rm_async)
, simulator_(simulator)
, storage_engine_(simulator_.params())
, mutex_()
, cond_()
, last_client_id_(0)
, last_transaction_id_(0)
, appliers_()
, clients_()
, client_threads_()
{ }
// Provider management
void applier_thread();
void start_applier()
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
appliers_.push_back(boost::thread(&dbms_server::applier_thread, this));
}
void stop_applier()
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
appliers_.front().join();
appliers_.erase(appliers_.begin());
}
bool sst_before_init() const override { return false; }
std::string on_sst_required()
{
return id();
}
void on_sst_request(const std::string& req,
const wsrep::gtid& gtid,
bool bypass)
{
simulator_.donate_sst(*this, req, gtid, bypass);
}
void background_rollback(wsrep::client_context& cs) override
{
assert(0);
cs.before_rollback();
cs.after_rollback();
}
// Client context management
wsrep::client_context* local_client_context();
wsrep::client_context* streaming_applier_client_context() override
{
throw wsrep::not_implemented_error();
}
void log_dummy_write_set(wsrep::client_context&, const wsrep::ws_meta&)
override
{ }
size_t next_transaction_id()
{
return (last_transaction_id_.fetch_add(1) + 1);
}
storage_engine& storage_engine() { return storage_engine_; }
int apply_to_storage_engine(const wsrep::transaction_context& txc,
const wsrep::const_buffer&)
{
storage_engine_.bf_abort_some(txc);
return 0;
}
void start_clients();
void stop_clients();
void client_thread(const std::shared_ptr<dbms_client>& client);
private:
void start_client(size_t id);
simulator& simulator_;
storage_engine storage_engine_;
wsrep::default_mutex mutex_;
wsrep::default_condition_variable cond_;
std::atomic<size_t> last_client_id_;
std::atomic<size_t> last_transaction_id_;
std::vector<boost::thread> appliers_;
std::vector<std::shared_ptr<dbms_client>> clients_;
std::vector<boost::thread> client_threads_;
};
};
#endif // WSREP_DB_SERVER_HPP

View File

@ -0,0 +1,26 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef WSREP_DB_SERVER_CONTEXT_HPP
#define WSREP_DB_SERVER_CONTEXT_HPP
#include "wsrep/server_context.hpp"
#include <atomic>
namespace db
{
class server_context : public wsrep::server_context
{
public:
size_t next_transaction_id()
{
return (last_transaction_id_.fetch_add(1) + 1);
}
private:
std::atomic<size_t> last_transaction_id_;
};
}
#endif // WSREP_DB_SERVER_CONTEXT_HPP

65
dbsim/db_simulator.hpp Normal file
View File

@ -0,0 +1,65 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef WSREP_DB_SIMULATOR_HPP
#define WSREP_DB_SIMULATOR_HPP
#include <memory>
#include <chrono>
#include <unordered_set>
#include "db_params.hpp"
namespace db
{
class server;
class simulator
{
public:
simulator(const params& params)
: mutex_()
, params_(params)
, servers_()
, clients_start_()
, clients_stop_()
, stats_()
{ }
void start();
void stop();
void donate_sst(server&,
const std::string& req, const wsrep::gtid& gtid, bool);
const simulator_params& params() const
{ return params_; }
std::string stats() const;
private:
std::string server_port(size_t i) const
{
std::ostringstream os;
os << (10000 + (i + 1)*10);
return os.str();
}
std::string build_cluster_address() const;
wsrep::default_mutex mutex_;
const simulator_params& params_;
std::map<size_t, std::unique_ptr<server>> servers_;
std::chrono::time_point<std::chrono::steady_clock> clients_start_;
std::chrono::time_point<std::chrono::steady_clock> clients_stop_;
public:
struct stats
{
long long commits;
long long aborts;
long long replays;
stats()
: commits(0)
, aborts(0)
, replays(0)
{ }
} stats_;
};
}
#endif // WSRE_DB_SIMULATOR_HPP

View File

@ -0,0 +1,53 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include "db_storage_engine.hpp"
#include "db_client.hpp"
void db::storage_engine::transaction::start(client* cc)
{
wsrep::unique_lock<wsrep::mutex> lock(se_.mutex_);
if (se_.transactions_.insert(cc).second == false)
{
::abort();
}
cc_ = cc;
}
void db::storage_engine::transaction::commit()
{
if (cc_)
{
wsrep::unique_lock<wsrep::mutex> lock(se_.mutex_);
se_.transactions_.erase(cc_);
}
cc_ = nullptr;
}
void db::storage_engine::transaction::rollback()
{
if (cc_)
{
wsrep::unique_lock<wsrep::mutex> lock(se_.mutex_);
se_.transactions_.erase(cc_);
}
cc_ = nullptr;
}
void db::storage_engine::bf_abort_some(const wsrep::transaction_context& txc)
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
if (alg_freq_ && (std::rand() % alg_freq_) == 0)
{
if (transactions_.empty() == false)
{
auto* victim_txc(*transactions_.begin());
if (victim_txc->bf_abort(txc.seqno()))
{
++bf_aborts_;
}
}
}
}

View File

@ -0,0 +1,60 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef WSREP_DB_STORAGE_ENGINE_HPP
#define WSREP_DB_STORAGE_ENGINE_HPP
#include "db_params.hpp"
#include "wsrep/mutex.hpp"
#include "wsrep/client_context.hpp"
#include <atomic>
#include <unordered_set>
namespace db
{
class client;
class storage_engine
{
public:
storage_engine(const params& params)
: mutex_()
, transactions_()
, alg_freq_(params.alg_freq)
, bf_aborts_()
{ }
class transaction
{
public:
transaction(storage_engine& se)
: se_(se)
, cc_()
{ }
~transaction()
{
rollback();
}
bool active() const { return cc_ != nullptr; }
void start(client* cc);
void commit();
void rollback();
transaction(const transaction&) = delete;
transaction& operator=(const transaction&) = delete;
private:
storage_engine& se_;
client* cc_;
};
void bf_abort_some(const wsrep::transaction_context& tc);
long long bf_aborts() const { return bf_aborts_; }
private:
wsrep::default_mutex mutex_;
std::unordered_set<db::client*> transactions_;
size_t alg_freq_;
std::atomic<long long> bf_aborts_;
};
}
#endif // WSREP_DB_STORAGE_ENGINE_HPP

View File

@ -32,6 +32,7 @@
class dbms_server; class dbms_server;
#if 0
struct dbms_simulator_params struct dbms_simulator_params
{ {
size_t n_servers; size_t n_servers;
@ -55,7 +56,8 @@ struct dbms_simulator_params
, fast_exit(0) , fast_exit(0)
{ } { }
}; };
#endif
#if 0
class dbms_storage_engine class dbms_storage_engine
{ {
public: public:
@ -154,7 +156,8 @@ private:
size_t alg_freq_; size_t alg_freq_;
std::atomic<long long> bf_aborts_; std::atomic<long long> bf_aborts_;
}; };
#endif
#if 0
class dbms_simulator class dbms_simulator
{ {
public: public:
@ -203,9 +206,11 @@ public:
{ } { }
} stats_; } stats_;
}; };
#endif
class dbms_client; class dbms_client;
#if 0
class dbms_server : public wsrep::server_context class dbms_server : public wsrep::server_context
{ {
public: public:
@ -306,6 +311,7 @@ private:
std::vector<boost::thread> client_threads_; std::vector<boost::thread> client_threads_;
}; };
#endif
class dbms_client : public wsrep::client_context class dbms_client : public wsrep::client_context
{ {
public: public:
@ -534,65 +540,6 @@ private:
}; };
// Server methods
void dbms_server::applier_thread()
{
wsrep::client_id client_id(last_client_id_.fetch_add(1) + 1);
dbms_client applier(*this, client_id,
wsrep::client_context::m_applier, simulator_.params());
enum wsrep::provider::status ret(provider().run_applier(&applier));
wsrep::log() << "Applier thread exited with error code " << ret;
}
wsrep::client_context* dbms_server::local_client_context()
{
std::ostringstream id_os;
size_t client_id(++last_client_id_);
return new dbms_client(*this, client_id,
wsrep::client_context::m_replicating,
simulator_.params());
}
void dbms_server::start_clients()
{
size_t n_clients(simulator_.params().n_clients);
for (size_t i(0); i < n_clients; ++i)
{
start_client(i + 1);
}
}
void dbms_server::stop_clients()
{
for (auto& i : client_threads_)
{
i.join();
}
for (const auto& i : clients_)
{
struct dbms_client::stats stats(i->stats());
simulator_.stats_.commits += stats.commits;
simulator_.stats_.aborts += stats.aborts;
simulator_.stats_.replays += stats.replays;
}
}
void dbms_server::client_thread(const std::shared_ptr<dbms_client>& client)
{
client->store_globals();
client->start();
}
void dbms_server::start_client(size_t id)
{
auto client(std::make_shared<dbms_client>(
*this, id,
wsrep::client_context::m_replicating,
simulator_.params()));
clients_.push_back(client);
client_threads_.push_back(
boost::thread(&dbms_server::client_thread, this, client));
}

View File

@ -5,6 +5,7 @@
#ifndef WSREP_BUFFER_HPP #ifndef WSREP_BUFFER_HPP
#define WSREP_BUFFER_HPP #define WSREP_BUFFER_HPP
#include <cstddef>
#include <vector> #include <vector>
namespace wsrep namespace wsrep

View File

@ -556,10 +556,10 @@ namespace wsrep
/* /*
* Friend declarations * Friend declarations
*/ */
friend int server_context::on_apply(client_context&, //friend int server_context::on_apply(client_context&,
const wsrep::ws_handle&, // const wsrep::ws_handle&,
const wsrep::ws_meta&, // const wsrep::ws_meta&,
const wsrep::const_buffer&); // const wsrep::const_buffer&);
friend class client_context_switch; friend class client_context_switch;
friend class client_applier_mode; friend class client_applier_mode;
friend class client_toi_mode; friend class client_toi_mode;

View File

@ -11,10 +11,18 @@
* wsrep-lib operation or applier client needs to apply a write set. * wsrep-lib operation or applier client needs to apply a write set.
*/ */
#ifndef WSREP_CLIENT_SERVICE_HPP
#define WSREP_CLIENT_SERVICE_HPP
#include "buffer.hpp" #include "buffer.hpp"
#include "provider.hpp"
#include "mutex.hpp"
#include "lock.hpp"
namespace wsrep namespace wsrep
{ {
class transaction_context;
class client_context;
class client_service class client_service
{ {
public: public:
@ -62,6 +70,10 @@ namespace wsrep
wsrep::client_context&, const wsrep::transaction_context&, wsrep::mutable_buffer&) = 0; wsrep::client_context&, const wsrep::transaction_context&, wsrep::mutable_buffer&) = 0;
virtual void remove_fragments(const wsrep::transaction_context&) = 0; virtual void remove_fragments(const wsrep::transaction_context&) = 0;
//
// Applying interface
//
/*! /*!
* Apply a write set. * Apply a write set.
*/ */
@ -77,6 +89,10 @@ namespace wsrep
*/ */
virtual int rollback(wsrep::client_context&) = 0; virtual int rollback(wsrep::client_context&) = 0;
//
// Interface to global server state
//
/*! /*!
* Forcefully shut down the DBMS process or replication system. * Forcefully shut down the DBMS process or replication system.
* This may be called in situations where * This may be called in situations where
@ -150,6 +166,6 @@ namespace wsrep
{ {
public: public:
}; };
} }
#endif // WSREP_CLIENT_SERVICE_HPP