1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Added SST methods, able to start several nodes in simulator

This commit is contained in:
Teemu Ollakka
2018-04-19 15:03:38 +03:00
parent 3c6eff581c
commit 3510d0739c
10 changed files with 165 additions and 20 deletions

View File

@ -20,6 +20,7 @@ endif()
find_package(Boost 1.54.0 REQUIRED
unit_test_framework
program_options
filesystem
)
# Coverage

View File

@ -27,5 +27,5 @@ add_test(NAME trrep_test
add_executable(dbms_simulator
dbms_simulator.cpp)
target_link_libraries(dbms_simulator trrep ${Boost_PROGRAM_OPTIONS_LIBRARY})
target_link_libraries(dbms_simulator trrep ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_FILESYSTEM_LIBRARY})
set_property(TARGET dbms_simulator PROPERTY CXX_STANDARD 14)

View File

@ -15,6 +15,7 @@
#include "view.hpp"
#include <boost/program_options.hpp>
#include <boost/filesystem.hpp>
#include <iostream>
#include <memory>
@ -31,7 +32,8 @@ public:
size_t n_clients,
const std::string& wsrep_provider,
const std::string& wsrep_provider_options)
: servers_()
: mutex_()
, servers_()
, n_servers_(n_servers)
, n_clients_(n_clients)
, wsrep_provider_(wsrep_provider)
@ -39,8 +41,18 @@ public:
{ }
void start();
void stop();
void donate_sst(dbms_server&,
const std::string& req, const wsrep_gtid_t& gtid, bool);
private:
std::string server_port(size_t i) const
{
std::ostringstream os;
os << (10000 + (i + 1)*10);
return os.str();
}
std::string build_cluster_address() const;
trrep::default_mutex mutex_;
std::map<size_t, std::unique_ptr<dbms_server>> servers_;
size_t n_servers_;
size_t n_clients_;
@ -59,8 +71,11 @@ public:
s_connected,
s_synced
};
dbms_server(const std::string& name, const std::string& id)
: trrep::server_context(name, id, trrep::server_context::rm_async)
dbms_server(dbms_simulator& simulator,
const std::string& name, const std::string& id)
: trrep::server_context(name, id, name + "_data",
trrep::server_context::rm_async)
, simulator_(simulator)
, mutex_()
, cond_()
, state_(s_disconnected)
@ -123,6 +138,27 @@ public:
std::cerr << "Synced with group" << "\n";
}
std::string on_sst_request()
{
return id();
}
void on_sst_donate_request(const std::string& req,
const wsrep_gtid_t& gtid,
bool bypass)
{
simulator_.donate_sst(*this, req, gtid, bypass);
}
void sst_sent(const wsrep_gtid_t& gtid)
{
provider().sst_sent(gtid, 0);
}
void sst_received(const wsrep_gtid_t& gtid)
{
provider().sst_received(gtid, 0);
}
void wait_until_connected()
{
trrep::unique_lock<trrep::mutex> lock(mutex_);
@ -143,6 +179,7 @@ public:
// Client context management
trrep::client_context* local_client_context();
private:
dbms_simulator& simulator_;
trrep::default_mutex mutex_;
trrep::default_condition_variable cond_;
enum state state_;
@ -212,13 +249,18 @@ void dbms_simulator::start()
id_os << (i + 1);
auto it(servers_.insert(std::make_pair((i + 1),
std::make_unique<dbms_server>(
name_os.str(), id_os.str()))));
*this, name_os.str(), id_os.str()))));
if (it.second == false)
{
throw trrep::runtime_error("Failed to add server");
}
boost::filesystem::path dir(std::string("./") + id_os.str() + "_data");
boost::filesystem::create_directory(dir);
dbms_server& server(*it.first->second);
server.load_provider(wsrep_provider_, wsrep_provider_options_);
std::string server_options(wsrep_provider_options_);
server_options += "; base_port=" + server_port(i);
server.load_provider(wsrep_provider_, server_options);
server.provider().connect("sim_cluster", cluster_address, "",
i == 0);
server.start_applier();
@ -237,6 +279,27 @@ void dbms_simulator::stop()
}
}
void dbms_simulator::donate_sst(dbms_server& server,
const std::string& req,
const wsrep_gtid_t& gtid,
bool bypass)
{
size_t id;
std::istringstream is(req);
is >> id;
trrep::unique_lock<trrep::mutex> lock(mutex_);
auto i(servers_.find(id));
if (i == servers_.end())
{
throw trrep::runtime_error("Server " + req + " not found");
}
if (bypass == false)
{
std::cout << "SST " << server.id() << " -> " << id << "\n";
}
i->second->sst_received(gtid);
server.sst_sent(gtid);
}
std::string dbms_simulator::build_cluster_address() const
{
std::string ret("gcomm://");
@ -244,7 +307,7 @@ std::string dbms_simulator::build_cluster_address() const
{
std::ostringstream sa_os;
sa_os << "127.0.0.1:";
sa_os << (10000 + (i + 1)*10);
sa_os << server_port(i);
ret += sa_os.str();
if (i < n_servers_ - 1) ret += ",";
}

View File

@ -122,6 +122,9 @@ namespace trrep
*victim_seqno = WSREP_SEQNO_UNDEFINED;
return WSREP_OK;
}
int sst_sent(const wsrep_gtid_t&, int) { return 0; }
int sst_received(const wsrep_gtid_t&, int) { return 0; }
private:
wsrep_uuid_t group_id_;
wsrep_uuid_t node_id_;

View File

@ -18,7 +18,7 @@ namespace trrep
mock_server_context(const std::string& name,
const std::string& id,
enum trrep::server_context::rollback_mode rollback_mode)
: trrep::server_context(name, id, rollback_mode)
: trrep::server_context(name, id, "./", rollback_mode)
, provider_()
, last_client_id_(0)
{ }
@ -34,6 +34,11 @@ namespace trrep
void wait_until_connected() { }
void on_view(const trrep::view&) { }
void on_sync() { }
std::string on_sst_request() { return ""; }
void on_sst_donate_request(const std::string&,
const wsrep_gtid_t&,
bool) { }
void sst_received(const wsrep_gtid_t&) { }
// void on_apply(trrep::transaction_context&) { }
// void on_commit(trrep::transaction_context&) { }

View File

@ -55,6 +55,8 @@ namespace trrep
virtual int commit_order_leave(wsrep_ws_handle_t*) = 0;
virtual int release(wsrep_ws_handle_t*) = 0;
virtual int sst_sent(const wsrep_gtid_t&, int) = 0;
virtual int sst_received(const wsrep_gtid_t&, int) = 0;
// Factory method
static provider* make_provider(const std::string& provider);
};

View File

@ -33,7 +33,7 @@ namespace
return (flags & WSREP_FLAG_ROLLBACK);
}
static wsrep_cb_status_t connected_cb(
wsrep_cb_status_t connected_cb(
void* app_ctx,
const wsrep_view_info_t* view __attribute((unused)))
{
@ -55,7 +55,7 @@ namespace
}
}
static wsrep_cb_status_t view_cb(void* app_ctx,
wsrep_cb_status_t view_cb(void* app_ctx,
void* recv_ctx __attribute__((unused)),
const wsrep_view_info_t* view_info,
const char*,
@ -78,12 +78,32 @@ namespace
}
}
static wsrep_cb_status_t apply_cb(void* ctx,
const wsrep_ws_handle_t* wsh,
uint32_t flags,
const wsrep_buf_t* buf,
const wsrep_trx_meta_t* meta,
wsrep_bool_t* exit_loop __attribute__((unused)))
wsrep_cb_status_t sst_request_cb(void* app_ctx,
void **sst_req, size_t* sst_req_len)
{
assert(app_ctx);
trrep::server_context& server_context(
*reinterpret_cast<trrep::server_context*>(app_ctx));
try
{
std::string req(server_context.on_sst_request());
*sst_req = ::strdup(req.c_str());
*sst_req_len = strlen(req.c_str());
return WSREP_CB_SUCCESS;
}
catch (const trrep::runtime_error& e)
{
return WSREP_CB_FAILURE;
}
}
wsrep_cb_status_t apply_cb(void* ctx,
const wsrep_ws_handle_t* wsh,
uint32_t flags,
const wsrep_buf_t* buf,
const wsrep_trx_meta_t* meta,
wsrep_bool_t* exit_loop __attribute__((unused)))
{
wsrep_cb_status_t ret(WSREP_CB_SUCCESS);
@ -121,6 +141,30 @@ namespace
return WSREP_CB_FAILURE;
}
}
wsrep_cb_status_t sst_donate_cb(void* app_ctx,
void* ,
const wsrep_buf_t* req_buf,
const wsrep_gtid_t* gtid,
const wsrep_buf_t*,
bool bypass)
{
assert(app_ctx);
trrep::server_context& server_context(
*reinterpret_cast<trrep::server_context*>(app_ctx));
try
{
std::string req(reinterpret_cast<const char*>(req_buf->ptr),
req_buf->len);
server_context.on_sst_donate_request(req, *gtid, bypass);
return WSREP_CB_SUCCESS;
}
catch (const trrep::runtime_error& e)
{
return WSREP_CB_FAILURE;
}
}
}
int trrep::server_context::load_provider(const std::string& provider_spec,
@ -140,7 +184,7 @@ int trrep::server_context::load_provider(const std::string& provider_spec,
init_args.node_name = name_.c_str();
init_args.node_address = "";
init_args.node_incoming = "";
init_args.data_dir = "./";
init_args.data_dir = working_dir_.c_str();
init_args.options = provider_options.c_str();
init_args.proto_ver = 1;
init_args.state_id = 0;
@ -148,10 +192,10 @@ int trrep::server_context::load_provider(const std::string& provider_spec,
init_args.logger_cb = 0;
init_args.connected_cb = &connected_cb;
init_args.view_cb = &view_cb;
init_args.sst_request_cb = 0;
init_args.sst_request_cb = &sst_request_cb;
init_args.apply_cb = &apply_cb;
init_args.unordered_cb = 0;
init_args.sst_donate_cb = 0;
init_args.sst_donate_cb = &sst_donate_cb;
init_args.synced_cb = &synced_cb;
std::cerr << init_args.options << "\n";

View File

@ -32,10 +32,12 @@ namespace trrep
server_context(const std::string& name,
const std::string& id,
const std::string& working_dir,
enum rollback_mode rollback_mode)
: provider_()
, name_(name)
, id_(id)
, working_dir_(working_dir)
, rollback_mode_(rollback_mode)
{ }
@ -88,7 +90,11 @@ namespace trrep
virtual void wait_until_connected() = 0;
virtual void on_view(const trrep::view&) = 0;
virtual void on_sync() = 0;
virtual std::string on_sst_request() = 0;
virtual void on_sst_donate_request(const std::string&,
const wsrep_gtid_t&,
bool) = 0;
virtual void sst_received(const wsrep_gtid_t&) = 0;
//
// This method will be called by the applier thread when
// a remote write set is being applied. It is the responsibility
@ -113,6 +119,7 @@ namespace trrep
trrep::provider* provider_;
std::string name_;
std::string id_;
std::string working_dir_;
enum rollback_mode rollback_mode_;
};
}

View File

@ -67,3 +67,21 @@ wsrep_status_t trrep::wsrep_provider_v26::run_applier(void *applier_ctx)
{
return wsrep_->recv(wsrep_, applier_ctx);
}
int trrep::wsrep_provider_v26::sst_sent(const wsrep_gtid_t& gtid, int err)
{
if (wsrep_->sst_sent(wsrep_, &gtid, err) != WSREP_OK)
{
return 1;
}
return 0;
}
int trrep::wsrep_provider_v26::sst_received(const wsrep_gtid_t& gtid, int err)
{
if (wsrep_->sst_received(wsrep_, &gtid, 0, err) != WSREP_OK)
{
return 1;
}
return 0;
}

View File

@ -36,6 +36,8 @@ namespace trrep
wsrep_status commit_order_enter(wsrep_ws_handle_t*) { return WSREP_OK; }
int commit_order_leave(wsrep_ws_handle_t*) { return 0; }
int release(wsrep_ws_handle_t*) { return 0; }
int sst_sent(const wsrep_gtid_t&,int);
int sst_received(const wsrep_gtid_t& gtid, int);
private:
wsrep_provider_v26(const wsrep_provider_v26&);
wsrep_provider_v26& operator=(const wsrep_provider_v26);