1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-10-18 00:31:19 +03:00

Changed project name to wsrep-lib.

This commit is contained in:
Teemu Ollakka
2018-06-03 21:56:28 +03:00
parent 188bda1339
commit d3cb537d1e
37 changed files with 730 additions and 730 deletions

View File

@@ -5,19 +5,19 @@
//
// This file implementes a simple DBMS simulator which
// will launch one or server threads and replicates transactions
// through trrep interface.
// through wsrep interface.
//
#include "trrep/server_context.hpp"
#include "trrep/client_context.hpp"
#include "trrep/transaction_context.hpp"
#include "trrep/key.hpp"
#include "trrep/data.hpp"
#include "trrep/provider.hpp"
#include "trrep/condition_variable.hpp"
#include "trrep/view.hpp"
#include "trrep/logger.hpp"
#include "wsrep/server_context.hpp"
#include "wsrep/client_context.hpp"
#include "wsrep/transaction_context.hpp"
#include "wsrep/key.hpp"
#include "wsrep/data.hpp"
#include "wsrep/provider.hpp"
#include "wsrep/condition_variable.hpp"
#include "wsrep/view.hpp"
#include "wsrep/logger.hpp"
#include <boost/program_options.hpp>
#include <boost/filesystem.hpp>
@@ -76,9 +76,9 @@ public:
bool active() const { return cc_ != nullptr; }
void start(trrep::client_context* cc)
void start(wsrep::client_context* cc)
{
trrep::unique_lock<trrep::mutex> lock(se_.mutex_);
wsrep::unique_lock<wsrep::mutex> lock(se_.mutex_);
if (se_.transactions_.insert(cc).second == false)
{
::abort();
@@ -90,7 +90,7 @@ public:
{
if (cc_)
{
trrep::unique_lock<trrep::mutex> lock(se_.mutex_);
wsrep::unique_lock<wsrep::mutex> lock(se_.mutex_);
se_.transactions_.erase(cc_);
}
cc_ = nullptr;
@@ -101,7 +101,7 @@ public:
{
if (cc_)
{
trrep::unique_lock<trrep::mutex> lock(se_.mutex_);
wsrep::unique_lock<wsrep::mutex> lock(se_.mutex_);
se_.transactions_.erase(cc_);
}
cc_ = nullptr;
@@ -117,23 +117,23 @@ public:
private:
dbms_storage_engine& se_;
trrep::client_context* cc_;
wsrep::client_context* cc_;
};
void bf_abort_some(const trrep::transaction_context& txc)
void bf_abort_some(const wsrep::transaction_context& txc)
{
trrep::unique_lock<trrep::mutex> lock(mutex_);
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
if (alg_freq_ && (std::rand() % alg_freq_) == 0)
{
if (transactions_.empty() == false)
{
auto* victim_txc(*transactions_.begin());
trrep::unique_lock<trrep::mutex> victim_txc_lock(
wsrep::unique_lock<wsrep::mutex> victim_txc_lock(
victim_txc->mutex());
lock.unlock();
if (victim_txc->bf_abort(victim_txc_lock, txc.seqno()))
{
trrep::log() << "BF aborted " << victim_txc->id().get();
wsrep::log() << "BF aborted " << victim_txc->id().get();
++bf_aborts_;
}
}
@@ -145,8 +145,8 @@ public:
return bf_aborts_;
}
private:
trrep::default_mutex mutex_;
std::unordered_set<trrep::client_context*> transactions_;
wsrep::default_mutex mutex_;
std::unordered_set<wsrep::client_context*> transactions_;
size_t alg_freq_;
std::atomic<long long> bf_aborts_;
};
@@ -181,7 +181,7 @@ private:
}
std::string build_cluster_address() const;
trrep::default_mutex mutex_;
wsrep::default_mutex mutex_;
const dbms_simulator_params& params_;
std::map<size_t, std::unique_ptr<dbms_server>> servers_;
std::chrono::time_point<std::chrono::steady_clock> clients_start_;
@@ -202,17 +202,17 @@ public:
class dbms_client;
class dbms_server : public trrep::server_context
class dbms_server : public wsrep::server_context
{
public:
dbms_server(dbms_simulator& simulator,
const std::string& name,
const std::string& id,
const std::string& address)
: trrep::server_context(mutex_,
: wsrep::server_context(mutex_,
cond_,
name, id, address, name + "_data",
trrep::server_context::rm_async)
wsrep::server_context::rm_async)
, simulator_(simulator)
, storage_engine_(simulator_.params())
, mutex_()
@@ -230,13 +230,13 @@ public:
void start_applier()
{
trrep::unique_lock<trrep::mutex> lock(mutex_);
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
appliers_.push_back(boost::thread(&dbms_server::applier_thread, this));
}
void stop_applier()
{
trrep::unique_lock<trrep::mutex> lock(mutex_);
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
appliers_.front().join();
appliers_.erase(appliers_.begin());
}
@@ -256,7 +256,7 @@ public:
// Client context management
trrep::client_context* local_client_context();
wsrep::client_context* local_client_context();
size_t next_transaction_id()
{
@@ -265,8 +265,8 @@ public:
dbms_storage_engine& storage_engine() { return storage_engine_; }
int apply_to_storage_engine(const trrep::transaction_context& txc,
const trrep::data&)
int apply_to_storage_engine(const wsrep::transaction_context& txc,
const wsrep::data&)
{
storage_engine_.bf_abort_some(txc);
return 0;
@@ -281,8 +281,8 @@ private:
dbms_simulator& simulator_;
dbms_storage_engine storage_engine_;
trrep::default_mutex mutex_;
trrep::default_condition_variable cond_;
wsrep::default_mutex mutex_;
wsrep::default_condition_variable cond_;
std::atomic<size_t> last_client_id_;
std::atomic<size_t> last_transaction_id_;
std::vector<boost::thread> appliers_;
@@ -290,14 +290,14 @@ private:
std::vector<boost::thread> client_threads_;
};
class dbms_client : public trrep::client_context
class dbms_client : public wsrep::client_context
{
public:
dbms_client(dbms_server& server,
const trrep::client_id& id,
enum trrep::client_context::mode mode,
const wsrep::client_id& id,
enum wsrep::client_context::mode mode,
size_t n_transactions)
: trrep::client_context(mutex_, server, id, mode)
: wsrep::client_context(mutex_, server, id, mode)
, mutex_()
, server_(server)
, se_trx_(server_.storage_engine())
@@ -307,7 +307,7 @@ public:
~dbms_client()
{
trrep::unique_lock<trrep::mutex> lock(mutex_);
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
}
void start()
@@ -337,13 +337,13 @@ public:
}
private:
bool do_2pc() const override { return false; }
int apply(const trrep::data& data) override
int apply(const wsrep::data& data) override
{
return server_.apply_to_storage_engine(transaction(), data);
}
int commit() override
{
assert(mode() == trrep::client_context::m_applier);
assert(mode() == wsrep::client_context::m_applier);
int ret(0);
ret = before_commit();
se_trx_.commit();
@@ -353,31 +353,31 @@ private:
}
int rollback() override
{
trrep::log() << "rollback: " << transaction().id().get()
wsrep::log() << "rollback: " << transaction().id().get()
<< "state: "
<< trrep::to_string(transaction().state());
<< wsrep::to_string(transaction().state());
before_rollback();
se_trx_.abort();
after_rollback();
return 0;
}
void will_replay(trrep::transaction_context&) override { }
int replay(trrep::transaction_context& txc) override
void will_replay(wsrep::transaction_context&) override { }
int replay(wsrep::transaction_context& txc) override
{
trrep::log() << "replay: " << txc.id().get();
trrep::client_applier_mode applier_mode(*this);
wsrep::log() << "replay: " << txc.id().get();
wsrep::client_applier_mode applier_mode(*this);
++stats_.replays;
return provider().replay(&txc.ws_handle(), this);
}
void wait_for_replayers(trrep::unique_lock<trrep::mutex>&) const override
void wait_for_replayers(wsrep::unique_lock<wsrep::mutex>&) const override
{ }
bool killed() const override { return false; }
void abort() const override { ::abort(); }
void store_globals() override { }
void debug_sync(const char*) override { }
void debug_suicide(const char*) override { }
void on_error(enum trrep::client_error) override { }
void on_error(enum wsrep::client_error) override { }
template <class Func>
int client_command(Func f)
@@ -417,13 +417,13 @@ private:
int data(std::rand() % 10000000);
std::ostringstream os;
os << data;
trrep::key key;
wsrep::key key;
key.append_key_part("dbms", 4);
wsrep_conn_id_t client_id(id().get());
key.append_key_part(&client_id, sizeof(client_id));
key.append_key_part(&data, sizeof(data));
err = append_key(key);
err = err || append_data(trrep::data(os.str().c_str(),
err = err || append_data(wsrep::data(os.str().c_str(),
os.str().size()));
return err;
});
@@ -445,16 +445,16 @@ private:
});
assert((current_error() &&
transaction().state() == trrep::transaction_context::s_aborted) ||
transaction().state() == trrep::transaction_context::s_committed);
transaction().state() == wsrep::transaction_context::s_aborted) ||
transaction().state() == wsrep::transaction_context::s_committed);
assert(se_trx_.active() == false);
assert(transaction().active() == false);
switch (transaction().state())
{
case trrep::transaction_context::s_committed:
case wsrep::transaction_context::s_committed:
++stats_.commits;
break;
case trrep::transaction_context::s_aborted:
case wsrep::transaction_context::s_aborted:
++stats_.aborts;
break;
default:
@@ -466,12 +466,12 @@ private:
{
if ((i % 1000) == 0)
{
trrep::log() << "client: " << id().get()
wsrep::log() << "client: " << id().get()
<< " transactions: " << i
<< " " << 100*double(i)/n_transactions_ << "%";
}
}
trrep::default_mutex mutex_;
wsrep::default_mutex mutex_;
dbms_server& server_;
dbms_storage_engine::transaction se_trx_;
const size_t n_transactions_;
@@ -482,18 +482,18 @@ private:
// Server methods
void dbms_server::applier_thread()
{
trrep::client_id client_id(last_client_id_.fetch_add(1) + 1);
wsrep::client_id client_id(last_client_id_.fetch_add(1) + 1);
dbms_client applier(*this, client_id,
trrep::client_context::m_applier, 0);
wsrep::client_context::m_applier, 0);
wsrep_status_t ret(provider().run_applier(&applier));
trrep::log() << "Applier thread exited with error code " << ret;
wsrep::log() << "Applier thread exited with error code " << ret;
}
trrep::client_context* dbms_server::local_client_context()
wsrep::client_context* dbms_server::local_client_context()
{
std::ostringstream id_os;
size_t client_id(++last_client_id_);
return new dbms_client(*this, client_id, trrep::client_context::m_replicating, 0);
return new dbms_client(*this, client_id, wsrep::client_context::m_replicating, 0);
}
void dbms_server::start_clients()
@@ -529,7 +529,7 @@ void dbms_server::start_client(size_t id)
{
auto client(std::make_shared<dbms_client>(
*this, id,
trrep::client_context::m_replicating,
wsrep::client_context::m_replicating,
simulator_.params().n_transactions));
clients_.push_back(client);
client_threads_.push_back(
@@ -540,10 +540,10 @@ void dbms_server::start_client(size_t id)
void dbms_simulator::start()
{
trrep::log() << "Provider: " << params_.wsrep_provider;
wsrep::log() << "Provider: " << params_.wsrep_provider;
std::string cluster_address(build_cluster_address());
trrep::log() << "Cluster address: " << cluster_address;
wsrep::log() << "Cluster address: " << cluster_address;
for (size_t i(0); i < params_.n_servers; ++i)
{
std::ostringstream name_os;
@@ -562,7 +562,7 @@ void dbms_simulator::start()
address_os.str()))));
if (it.second == false)
{
throw trrep::runtime_error("Failed to add server");
throw wsrep::runtime_error("Failed to add server");
}
boost::filesystem::path dir(std::string("./") + id_os.str() + "_data");
boost::filesystem::create_directory(dir);
@@ -573,19 +573,19 @@ void dbms_simulator::start()
if (server.load_provider(params_.wsrep_provider, server_options))
{
throw trrep::runtime_error("Failed to load provider");
throw wsrep::runtime_error("Failed to load provider");
}
if (server.connect("sim_cluster", cluster_address, "",
i == 0))
{
throw trrep::runtime_error("Failed to connect");
throw wsrep::runtime_error("Failed to connect");
}
server.start_applier();
server.wait_until_state(trrep::server_context::s_synced);
server.wait_until_state(wsrep::server_context::s_synced);
}
// Start client threads
trrep::log() << "####################### Starting client load";
wsrep::log() << "####################### Starting client load";
clients_start_ = std::chrono::steady_clock::now();
for (auto& i : servers_)
{
@@ -602,9 +602,9 @@ void dbms_simulator::stop()
server.stop_clients();
}
clients_stop_ = std::chrono::steady_clock::now();
trrep::log() << "######## Stats ############";
trrep::log() << stats();
trrep::log() << "######## Stats ############";
wsrep::log() << "######## Stats ############";
wsrep::log() << stats();
wsrep::log() << "######## Stats ############";
if (params_.fast_exit)
{
exit(0);
@@ -612,16 +612,16 @@ void dbms_simulator::stop()
for (auto& i : servers_)
{
dbms_server& server(*i.second);
trrep::log() << "Status for server: " << server.id();
wsrep::log() << "Status for server: " << server.id();
auto status(server.provider().status());
for_each(status.begin(), status.end(),
[](const trrep::provider::status_variable& sv)
[](const wsrep::provider::status_variable& sv)
{
trrep::log() << sv.name() << " = " << sv.value();
wsrep::log() << sv.name() << " = " << sv.value();
});
server.disconnect();
server.wait_until_state(trrep::server_context::s_disconnected);
server.wait_until_state(wsrep::server_context::s_disconnected);
server.stop_applier();
}
}
@@ -663,15 +663,15 @@ void dbms_simulator::donate_sst(dbms_server& server,
size_t id;
std::istringstream is(req);
is >> id;
trrep::unique_lock<trrep::mutex> lock(mutex_);
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
auto i(servers_.find(id));
if (i == servers_.end())
{
throw trrep::runtime_error("Server " + req + " not found");
throw wsrep::runtime_error("Server " + req + " not found");
}
if (bypass == false)
{
trrep::log() << "SST " << server.id() << " -> " << id;
wsrep::log() << "SST " << server.id() << " -> " << id;
}
i->second->sst_received(gtid, 0);
server.sst_sent(gtid, 0);
@@ -742,17 +742,17 @@ int main(int argc, char** argv)
}
catch (const std::exception& e)
{
trrep::log() << "Caught exception: " << e.what();
wsrep::log() << "Caught exception: " << e.what();
}
stats = sim.stats();
}
catch (const std::exception& e)
{
trrep::log() << e.what();
wsrep::log() << e.what();
return 1;
}
trrep::log() << "Stats:\n" << stats << "\n";
wsrep::log() << "Stats:\n" << stats << "\n";
return 0;
}