1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-10-18 00:31:19 +03:00

DBMS simulator runs client load

This commit is contained in:
Teemu Ollakka
2018-04-19 18:44:56 +03:00
parent 3510d0739c
commit 5c1d390b74
6 changed files with 254 additions and 53 deletions

View File

@@ -10,6 +10,9 @@
#include "server_context.hpp"
#include "client_context.hpp"
#include "transaction_context.hpp"
#include "key.hpp"
#include "data.hpp"
#include "provider.hpp"
#include "condition_variable.hpp"
#include "view.hpp"
@@ -25,24 +28,36 @@
class dbms_server;
struct dbms_simulator_params
{
size_t n_servers;
size_t n_clients;
size_t n_transactions;
std::string wsrep_provider;
std::string wsrep_provider_options;
dbms_simulator_params()
: n_servers(0)
, n_clients(0)
, n_transactions(0)
, wsrep_provider()
, wsrep_provider_options()
{ }
};
class dbms_simulator
{
public:
dbms_simulator(size_t n_servers,
size_t n_clients,
const std::string& wsrep_provider,
const std::string& wsrep_provider_options)
dbms_simulator(const dbms_simulator_params& params)
: mutex_()
, params_(params)
, servers_()
, n_servers_(n_servers)
, n_clients_(n_clients)
, wsrep_provider_(wsrep_provider)
, wsrep_provider_options_(wsrep_provider_options)
{ }
void start();
void stop();
void donate_sst(dbms_server&,
const std::string& req, const wsrep_gtid_t& gtid, bool);
const dbms_simulator_params& params() const
{ return params_; }
private:
std::string server_port(size_t i) const
{
@@ -53,11 +68,8 @@ private:
std::string build_cluster_address() const;
trrep::default_mutex mutex_;
const dbms_simulator_params& params_;
std::map<size_t, std::unique_ptr<dbms_server>> servers_;
size_t n_servers_;
size_t n_clients_;
std::string wsrep_provider_;
std::string wsrep_provider_options_;
};
class dbms_client;
@@ -80,8 +92,10 @@ public:
, cond_()
, state_(s_disconnected)
, last_client_id_(0)
, last_transaction_id_(0)
, appliers_()
, clients_()
, client_threads_()
{ }
// Provider management
@@ -136,6 +150,9 @@ public:
void on_sync()
{
std::cerr << "Synced with group" << "\n";
trrep::unique_lock<trrep::mutex> lock(mutex_);
state_ = s_synced;
cond_.notify_all();
}
std::string on_sst_request()
@@ -159,6 +176,14 @@ public:
provider().sst_received(gtid, 0);
}
void wait_until_state(enum state state)
{
trrep::unique_lock<trrep::mutex> lock(mutex_);
while (state_ != state)
{
cond_.wait(lock);
}
}
void wait_until_connected()
{
trrep::unique_lock<trrep::mutex> lock(mutex_);
@@ -167,6 +192,7 @@ public:
cond_.wait(lock);
}
}
void wait_until_disconnected()
{
trrep::unique_lock<trrep::mutex> lock(mutex_);
@@ -178,47 +204,123 @@ public:
// Client context management
trrep::client_context* local_client_context();
size_t next_transaction_id()
{
return (last_transaction_id_.fetch_add(1) + 1);
}
void start_clients();
void stop_clients();
void client_thread(const std::shared_ptr<dbms_client>& client);
private:
void start_client(size_t id);
dbms_simulator& simulator_;
trrep::default_mutex mutex_;
trrep::default_condition_variable cond_;
enum state state_;
std::atomic<size_t> last_client_id_;
std::atomic<size_t> last_transaction_id_;
std::vector<std::thread> appliers_;
std::map<size_t, std::unique_ptr<dbms_client>> clients_;
std::vector<std::thread> client_threads_;
};
class dbms_client : public trrep::client_context
{
public:
dbms_client(dbms_server& server, const trrep::client_id& id,
enum trrep::client_context::mode mode)
dbms_client(dbms_server& server,
const trrep::client_id& id,
enum trrep::client_context::mode mode,
size_t n_transactions)
: trrep::client_context(mutex_, server, id, mode)
, mutex_()
, server_(server)
, n_transactions_(n_transactions)
{ }
void start()
{
for (size_t i(0); i < n_transactions_; ++i)
{
run_one_transaction();
}
}
bool do_2pc() const { return false; }
int apply(trrep::transaction_context&, const trrep::data&)
{
std::cerr << "applying" << "\n";
return 0;
}
int commit(trrep::transaction_context&)
int commit(trrep::transaction_context& transaction_context)
{
int ret(0);
ret = transaction_context.before_commit();
ret = ret || transaction_context.ordered_commit();
ret = ret || transaction_context.after_commit();
std::cerr << "commit" << "\n";
return 0;
}
int rollback(trrep::transaction_context&)
int rollback(trrep::transaction_context& transaction_context)
{
// std::cerr << "rollback: " << transaction_context.id().get() << "\n";
transaction_context.before_rollback();
transaction_context.after_rollback();
return 0;
}
private:
void run_one_transaction()
{
trrep::transaction_context trx(*this);
trx.start_transaction(server_.next_transaction_id());
std::ostringstream os;
os << trx.id().get();
trrep::key key;
key.append_key_part("dbms", 4);
wsrep_conn_id_t client_id(id().get());
key.append_key_part(&client_id, sizeof(client_id));
wsrep_trx_id_t trx_id(trx.id().get());
int err(0);
key.append_key_part(&trx_id, sizeof(trx_id));
err = trx.append_key(key);
std::cout << "append_key: " << err << "\n";
err = err || trx.append_data(trrep::data(os.str().c_str(), os.str().size()));
std::cout << "append_data: " << err << "\n";
if (do_2pc())
{
err = err || trx.before_prepare();
std::cout << "before_prepare: " << err << "\n";
err = err || trx.after_prepare();
std::cout << "after_prepare: " << err << "\n";
}
err = err || trx.before_commit();
std::cout << "before_commit: " << err << "\n";
err = err || trx.ordered_commit();
std::cout << "ordered_commit: " << err << "\n";
err = err || trx.after_commit();
std::cout << "after_commit: " << err << "\n";
trx.after_statement();
}
trrep::default_mutex mutex_;
dbms_server& server_;
const size_t n_transactions_;
};
// Server methods
void dbms_server::applier_thread()
{
dbms_client applier(*this, ++last_client_id_,
trrep::client_context::m_applier);
trrep::client_id client_id(last_client_id_.fetch_add(1) + 1);
dbms_client applier(*this, client_id,
trrep::client_context::m_applier, 0);
wsrep_status_t ret(provider().run_applier(&applier));
std::cerr << "Applier thread exited with error code " << ret << "\n";
}
@@ -226,22 +328,51 @@ void dbms_server::applier_thread()
trrep::client_context* dbms_server::local_client_context()
{
std::ostringstream id_os;
size_t client_id(last_client_id_.fetch_add(1) + 1);
trrep::unique_lock<trrep::mutex> lock(mutex_);
return clients_.insert(std::make_pair(client_id,
std::make_unique<dbms_client>(
*this, client_id,
trrep::client_context::m_replicating))).first->second.get();
size_t client_id(++last_client_id_);
return new dbms_client(*this, client_id, trrep::client_context::m_replicating, 0);
}
void dbms_server::start_clients()
{
size_t n_clients(simulator_.params().n_clients);
for (size_t i(0); i < n_clients; ++i)
{
start_client(i + 1);
}
}
void dbms_server::stop_clients()
{
for (auto& i : client_threads_)
{
i.join();
}
}
void dbms_server::client_thread(const std::shared_ptr<dbms_client>& client)
{
client->start();
}
void dbms_server::start_client(size_t id)
{
auto client(std::make_shared<dbms_client>(
*this, id,
trrep::client_context::m_replicating,
simulator_.params().n_transactions));
client_threads_.push_back(
std::thread(&dbms_server::client_thread, this, client));
}
void dbms_simulator::start()
{
std::cout << "Provider: " << wsrep_provider_ << "\n";
std::cout << "Provider: " << params_.wsrep_provider << "\n";
std::string cluster_address(build_cluster_address());
std::cout << "Cluster address: " << cluster_address << "\n";
for (size_t i(0); i < n_servers_; ++i)
for (size_t i(0); i < params_.n_servers; ++i)
{
std::ostringstream name_os;
name_os << (i + 1);
@@ -258,14 +389,23 @@ void dbms_simulator::start()
boost::filesystem::create_directory(dir);
dbms_server& server(*it.first->second);
std::string server_options(wsrep_provider_options_);
std::string server_options(params_.wsrep_provider_options);
server_options += "; base_port=" + server_port(i);
server.load_provider(wsrep_provider_, server_options);
server.load_provider(params_.wsrep_provider, server_options);
server.provider().connect("sim_cluster", cluster_address, "",
i == 0);
server.start_applier();
server.wait_until_connected();
server.wait_until_state(dbms_server::s_synced);
}
// Start client threads
for (auto& i : servers_)
{
i.second->start_clients();
}
}
void dbms_simulator::stop()
@@ -273,6 +413,7 @@ void dbms_simulator::stop()
for (auto& i : servers_)
{
dbms_server& server(*i.second);
server.stop_clients();
server.provider().disconnect();
server.wait_until_disconnected();
server.stop_applier();
@@ -303,13 +444,13 @@ void dbms_simulator::donate_sst(dbms_server& server,
std::string dbms_simulator::build_cluster_address() const
{
std::string ret("gcomm://");
for (size_t i(0); i < n_servers_; ++i)
for (size_t i(0); i < params_.n_servers; ++i)
{
std::ostringstream sa_os;
sa_os << "127.0.0.1:";
sa_os << server_port(i);
ret += sa_os.str();
if (i < n_servers_ - 1) ret += ",";
if (i < params_.n_servers - 1) ret += ",";
}
return ret;
}
@@ -320,23 +461,22 @@ int main(int argc, char** argv)
{
try
{
size_t n_servers;
size_t n_clients;
std::string wsrep_provider;
std::string wsrep_provider_options;
dbms_simulator_params params;
po::options_description desc("Allowed options");
desc.add_options()
("help", "produce help message")
("wsrep-provider",
po::value<std::string>(&wsrep_provider)->required(),
po::value<std::string>(&params.wsrep_provider)->required(),
"wsrep provider to load")
("wsrep-provider-options",
po::value<std::string>(&wsrep_provider_options),
po::value<std::string>(&params.wsrep_provider_options),
"wsrep provider options")
("servers", po::value<size_t>(&n_servers)->required(),
("servers", po::value<size_t>(&params.n_servers)->required(),
"number of servers to start")
("clients", po::value<size_t>(&n_clients)->required(),
"number of clients to start per server");
("clients", po::value<size_t>(&params.n_clients)->required(),
"number of clients to start per server")
("transactions", po::value<size_t>(&params.n_transactions),
"number of transactions run by a client");
po::variables_map vm;
po::store(po::parse_command_line(argc, argv, desc), vm);
po::notify(vm);
@@ -347,7 +487,7 @@ int main(int argc, char** argv)
return 1;
}
dbms_simulator sim(n_servers, n_clients, wsrep_provider, wsrep_provider_options);
dbms_simulator sim(params);
sim.start();
std::this_thread::sleep_for(std::chrono::seconds(5));
sim.stop();