1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-06-16 02:01:44 +03:00

DBMS simulator is now able to start and stop one server

Other:
* Added trrep::condition_variable
* Added trrep::view
This commit is contained in:
Teemu Ollakka
2018-04-19 13:09:36 +03:00
parent 557d43657c
commit 3c6eff581c
13 changed files with 463 additions and 11 deletions

View File

@ -10,7 +10,7 @@ add_library(trrep
server_context.cpp server_context.cpp
transaction_context.cpp transaction_context.cpp
wsrep_provider_v26.cpp) wsrep_provider_v26.cpp)
target_link_libraries(trrep wsrep dl) target_link_libraries(trrep wsrep pthread dl)
add_executable(trrep_test add_executable(trrep_test
mock_client_context.cpp mock_client_context.cpp

View File

@ -0,0 +1,71 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_CONDITION_VARIABLE_HPP
#define TRREP_CONDITION_VARIABLE_HPP
#include "lock.hpp"
namespace trrep
{
class condition_variable
{
public:
condition_variable() { }
virtual ~condition_variable() { }
virtual void notify_one() = 0;
virtual void notify_all() = 0;
virtual void wait(trrep::unique_lock<trrep::mutex>& lock) = 0;
private:
condition_variable(const condition_variable&);
condition_variable& operator=(const condition_variable&);
};
// Default pthreads based condition variable implementation
class default_condition_variable : condition_variable
{
public:
default_condition_variable()
: cond_()
{
if (pthread_cond_init(&cond_, 0))
{
throw trrep::runtime_error("Failed to initialized condvar");
}
}
~default_condition_variable()
{
if (pthread_cond_destroy(&cond_))
{
::abort();
}
}
void notify_one()
{
(void)pthread_cond_signal(&cond_);
}
void notify_all()
{
(void)pthread_cond_broadcast(&cond_);
}
void wait(trrep::unique_lock<trrep::mutex>& lock)
{
if (pthread_cond_wait(
&cond_,
reinterpret_cast<pthread_mutex_t*>(lock.mutex().native())))
{
throw trrep::runtime_error("Cond wait failed");
}
}
private:
pthread_cond_t cond_;
};
}
#endif // TRREP_CONDITION_VARIABLE_HPP

View File

@ -10,6 +10,9 @@
#include "server_context.hpp" #include "server_context.hpp"
#include "client_context.hpp" #include "client_context.hpp"
#include "provider.hpp"
#include "condition_variable.hpp"
#include "view.hpp"
#include <boost/program_options.hpp> #include <boost/program_options.hpp>
@ -17,6 +20,7 @@
#include <memory> #include <memory>
#include <map> #include <map>
#include <atomic> #include <atomic>
#include <thread>
class dbms_server; class dbms_server;
@ -36,6 +40,7 @@ public:
void start(); void start();
void stop(); void stop();
private: private:
std::string build_cluster_address() const;
std::map<size_t, std::unique_ptr<dbms_server>> servers_; std::map<size_t, std::unique_ptr<dbms_server>> servers_;
size_t n_servers_; size_t n_servers_;
size_t n_clients_; size_t n_clients_;
@ -48,20 +53,101 @@ class dbms_client;
class dbms_server : public trrep::server_context class dbms_server : public trrep::server_context
{ {
public: public:
enum state
{
s_disconnected,
s_connected,
s_synced
};
dbms_server(const std::string& name, const std::string& id) dbms_server(const std::string& name, const std::string& id)
: trrep::server_context(name, id, trrep::server_context::rm_async) : trrep::server_context(name, id, trrep::server_context::rm_async)
, mutex_() , mutex_()
, cond_()
, state_(s_disconnected)
, last_client_id_(0) , last_client_id_(0)
, appliers_()
, clients_() , clients_()
{ } { }
void on_connect() { } // Provider management
void on_view() { }
void on_sync() { } void applier_thread();
void start_applier()
{
trrep::unique_lock<trrep::mutex> lock(mutex_);
appliers_.push_back(std::thread(&dbms_server::applier_thread, this));
}
void stop_applier()
{
trrep::unique_lock<trrep::mutex> lock(mutex_);
appliers_.front().join();
appliers_.erase(appliers_.begin());
}
void on_connect()
{
std::cerr << "dbms_server: connected" << "\n";
trrep::unique_lock<trrep::mutex> lock(mutex_);
state_ = s_connected;
cond_.notify_all();
}
void on_view(const trrep::view& view)
{
std::cerr << "================================================\nView:\n"
<< "id: " << view.id() << "\n"
<< "status: " << view.status() << "\n"
<< "own_index: " << view.own_index() << "\n"
<< "final: " << view.final() << "\n"
<< "members: \n";
auto members(view.members());
for (const auto& m : members)
{
std::cerr << "id: " << m.id() << " "
<< "name: " << m.name() << "\n";
}
std::cerr << "=================================================\n";
trrep::unique_lock<trrep::mutex> lock(mutex_);
if (view.final())
{
state_ = s_disconnected;
cond_.notify_all();
}
}
void on_sync()
{
std::cerr << "Synced with group" << "\n";
}
void wait_until_connected()
{
trrep::unique_lock<trrep::mutex> lock(mutex_);
while (state_ != s_connected)
{
cond_.wait(lock);
}
}
void wait_until_disconnected()
{
trrep::unique_lock<trrep::mutex> lock(mutex_);
while (state_ != s_disconnected)
{
cond_.wait(lock);
}
}
// Client context management
trrep::client_context* local_client_context(); trrep::client_context* local_client_context();
private: private:
trrep::default_mutex mutex_; trrep::default_mutex mutex_;
trrep::default_condition_variable cond_;
enum state state_;
std::atomic<size_t> last_client_id_; std::atomic<size_t> last_client_id_;
std::vector<std::thread> appliers_;
std::map<size_t, std::unique_ptr<dbms_client>> clients_; std::map<size_t, std::unique_ptr<dbms_client>> clients_;
}; };
@ -90,6 +176,16 @@ private:
trrep::default_mutex mutex_; trrep::default_mutex mutex_;
}; };
// Server methods
void dbms_server::applier_thread()
{
dbms_client applier(*this, ++last_client_id_,
trrep::client_context::m_applier);
wsrep_status_t ret(provider().run_applier(&applier));
std::cerr << "Applier thread exited with error code " << ret << "\n";
}
trrep::client_context* dbms_server::local_client_context() trrep::client_context* dbms_server::local_client_context()
{ {
std::ostringstream id_os; std::ostringstream id_os;
@ -105,6 +201,9 @@ trrep::client_context* dbms_server::local_client_context()
void dbms_simulator::start() void dbms_simulator::start()
{ {
std::cout << "Provider: " << wsrep_provider_ << "\n"; std::cout << "Provider: " << wsrep_provider_ << "\n";
std::string cluster_address(build_cluster_address());
std::cout << "Cluster address: " << cluster_address << "\n";
for (size_t i(0); i < n_servers_; ++i) for (size_t i(0); i < n_servers_; ++i)
{ {
std::ostringstream name_os; std::ostringstream name_os;
@ -114,10 +213,44 @@ void dbms_simulator::start()
auto it(servers_.insert(std::make_pair((i + 1), auto it(servers_.insert(std::make_pair((i + 1),
std::make_unique<dbms_server>( std::make_unique<dbms_server>(
name_os.str(), id_os.str())))); name_os.str(), id_os.str()))));
it.first->second->load_provider(wsrep_provider_, wsrep_provider_options_); if (it.second == false)
{
throw trrep::runtime_error("Failed to add server");
}
dbms_server& server(*it.first->second);
server.load_provider(wsrep_provider_, wsrep_provider_options_);
server.provider().connect("sim_cluster", cluster_address, "",
i == 0);
server.start_applier();
server.wait_until_connected();
} }
} }
void dbms_simulator::stop()
{
for (auto& i : servers_)
{
dbms_server& server(*i.second);
server.provider().disconnect();
server.wait_until_disconnected();
server.stop_applier();
}
}
std::string dbms_simulator::build_cluster_address() const
{
std::string ret("gcomm://");
for (size_t i(0); i < n_servers_; ++i)
{
std::ostringstream sa_os;
sa_os << "127.0.0.1:";
sa_os << (10000 + (i + 1)*10);
ret += sa_os.str();
if (i < n_servers_ - 1) ret += ",";
}
return ret;
}
namespace po = boost::program_options; namespace po = boost::program_options;
int main(int argc, char** argv) int main(int argc, char** argv)
@ -153,6 +286,8 @@ int main(int argc, char** argv)
dbms_simulator sim(n_servers, n_clients, wsrep_provider, wsrep_provider_options); dbms_simulator sim(n_servers, n_clients, wsrep_provider, wsrep_provider_options);
sim.start(); sim.start();
std::this_thread::sleep_for(std::chrono::seconds(5));
sim.stop();
} }
catch (const std::exception& e) catch (const std::exception& e)
{ {

View File

@ -46,6 +46,8 @@ namespace trrep
{ {
return locked_; return locked_;
} }
M& mutex() { return mutex_; }
private: private:
unique_lock(const unique_lock&); unique_lock(const unique_lock&);
unique_lock& operator=(const unique_lock&); unique_lock& operator=(const unique_lock&);

View File

@ -30,6 +30,12 @@ namespace trrep
memset(&node_id_, 0, sizeof(node_id_)); memset(&node_id_, 0, sizeof(node_id_));
node_id_.data[0] = 1; node_id_.data[0] = 1;
} }
int connect(const std::string&, const std::string&, const std::string&,
bool)
{ return 0; }
int disconnect() { return 0; }
wsrep_status_t run_applier(void*) { return WSREP_OK; }
// Provider implemenatation interface // Provider implemenatation interface
int start_transaction(wsrep_ws_handle_t*) { return 0; } int start_transaction(wsrep_ws_handle_t*) { return 0; }
wsrep_status wsrep_status

View File

@ -31,7 +31,8 @@ namespace trrep
} }
void on_connect() { } void on_connect() { }
void on_view() { } void wait_until_connected() { }
void on_view(const trrep::view&) { }
void on_sync() { } void on_sync() { }
// void on_apply(trrep::transaction_context&) { } // void on_apply(trrep::transaction_context&) { }
// void on_commit(trrep::transaction_context&) { } // void on_commit(trrep::transaction_context&) { }

View File

@ -21,6 +21,8 @@ namespace trrep
virtual ~mutex() { } virtual ~mutex() { }
virtual void lock() = 0; virtual void lock() = 0;
virtual void unlock() = 0; virtual void unlock() = 0;
/* Return native handle */
virtual void* native() = 0;
private: private:
mutex(const mutex& other); mutex(const mutex& other);
mutex& operator=(const mutex& other); mutex& operator=(const mutex& other);
@ -62,6 +64,11 @@ namespace trrep
throw trrep::runtime_error("mutex unlock failed"); throw trrep::runtime_error("mutex unlock failed");
} }
} }
void* native()
{
return &mutex_;
}
private: private:
pthread_mutex_t mutex_; pthread_mutex_t mutex_;
}; };

View File

@ -17,6 +17,20 @@ namespace trrep
class provider class provider
{ {
public: public:
virtual ~provider() { }
// Provider state management
virtual int connect(const std::string& cluster_name,
const std::string& cluster_url,
const std::string& state_donor,
bool bootstrap) = 0;
virtual int disconnect() = 0;
// Applier interface
virtual wsrep_status_t run_applier(void* applier_ctx) = 0;
// Write set replication
// TODO: Rename to assing_read_view()
virtual int start_transaction(wsrep_ws_handle_t*) = 0; virtual int start_transaction(wsrep_ws_handle_t*) = 0;
virtual int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) = 0; virtual int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) = 0;
virtual int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) = 0; virtual int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) = 0;
@ -40,8 +54,9 @@ namespace trrep
virtual wsrep_status commit_order_enter(wsrep_ws_handle_t*) = 0; virtual wsrep_status commit_order_enter(wsrep_ws_handle_t*) = 0;
virtual int commit_order_leave(wsrep_ws_handle_t*) = 0; virtual int commit_order_leave(wsrep_ws_handle_t*) = 0;
virtual int release(wsrep_ws_handle_t*) = 0; virtual int release(wsrep_ws_handle_t*) = 0;
static provider* make_provider(const std::string& provider);
// Factory method
static provider* make_provider(const std::string& provider);
}; };
} }

View File

@ -5,6 +5,7 @@
#include "server_context.hpp" #include "server_context.hpp"
#include "client_context.hpp" #include "client_context.hpp"
#include "transaction_context.hpp" #include "transaction_context.hpp"
#include "view.hpp"
// Todo: refactor into provider factory // Todo: refactor into provider factory
#include "mock_provider.hpp" #include "mock_provider.hpp"
@ -32,6 +33,51 @@ namespace
return (flags & WSREP_FLAG_ROLLBACK); return (flags & WSREP_FLAG_ROLLBACK);
} }
static wsrep_cb_status_t connected_cb(
void* app_ctx,
const wsrep_view_info_t* view __attribute((unused)))
{
assert(app_ctx != 0);
trrep::server_context& server_context(
*reinterpret_cast<trrep::server_context*>(app_ctx));
//
// TODO: Fetch server id and group id from view infor
//
try
{
server_context.on_connect();
return WSREP_CB_SUCCESS;
}
catch (const trrep::runtime_error& e)
{
std::cerr << "Exception: " << e.what();
return WSREP_CB_FAILURE;
}
}
static wsrep_cb_status_t view_cb(void* app_ctx,
void* recv_ctx __attribute__((unused)),
const wsrep_view_info_t* view_info,
const char*,
size_t)
{
assert(app_ctx);
assert(view_info);
trrep::server_context& server_context(
*reinterpret_cast<trrep::server_context*>(app_ctx));
try
{
trrep::view view(*view_info);
server_context.on_view(view);
return WSREP_CB_SUCCESS;
}
catch (const trrep::runtime_error& e)
{
std::cerr << "Exception: " << e.what();
return WSREP_CB_FAILURE;
}
}
static wsrep_cb_status_t apply_cb(void* ctx, static wsrep_cb_status_t apply_cb(void* ctx,
const wsrep_ws_handle_t* wsh, const wsrep_ws_handle_t* wsh,
uint32_t flags, uint32_t flags,
@ -58,6 +104,23 @@ namespace
} }
return ret; return ret;
} }
wsrep_cb_status_t synced_cb(void* app_ctx)
{
assert(app_ctx);
trrep::server_context& server_context(
*reinterpret_cast<trrep::server_context*>(app_ctx));
try
{
server_context.on_sync();
return WSREP_CB_SUCCESS;
}
catch (const trrep::runtime_error& e)
{
std::cerr << "On sync failed: " << e.what() << "\n";
return WSREP_CB_FAILURE;
}
}
} }
int trrep::server_context::load_provider(const std::string& provider_spec, int trrep::server_context::load_provider(const std::string& provider_spec,
@ -83,13 +146,13 @@ int trrep::server_context::load_provider(const std::string& provider_spec,
init_args.state_id = 0; init_args.state_id = 0;
init_args.state = 0; init_args.state = 0;
init_args.logger_cb = 0; init_args.logger_cb = 0;
init_args.connected_cb = 0; init_args.connected_cb = &connected_cb;
init_args.view_cb = 0; init_args.view_cb = &view_cb;
init_args.sst_request_cb = 0; init_args.sst_request_cb = 0;
init_args.apply_cb = &apply_cb; init_args.apply_cb = &apply_cb;
init_args.unordered_cb = 0; init_args.unordered_cb = 0;
init_args.sst_donate_cb = 0; init_args.sst_donate_cb = 0;
init_args.synced_cb = 0; init_args.synced_cb = &synced_cb;
std::cerr << init_args.options << "\n"; std::cerr << init_args.options << "\n";
provider_ = new trrep::wsrep_provider_v26(provider_spec.c_str(), provider_ = new trrep::wsrep_provider_v26(provider_spec.c_str(),
@ -98,6 +161,12 @@ int trrep::server_context::load_provider(const std::string& provider_spec,
return 0; return 0;
} }
trrep::server_context::~server_context()
{
delete provider_;
}
int trrep::server_context::on_apply( int trrep::server_context::on_apply(
trrep::client_context& client_context, trrep::client_context& client_context,
trrep::transaction_context& transaction_context, trrep::transaction_context& transaction_context,

View File

@ -17,6 +17,7 @@ namespace trrep
class provider; class provider;
class client_context; class client_context;
class transaction_context; class transaction_context;
class view;
class data; class data;
class server_context class server_context
@ -38,6 +39,7 @@ namespace trrep
, rollback_mode_(rollback_mode) , rollback_mode_(rollback_mode)
{ } { }
virtual ~server_context();
// //
// Return server name // Return server name
// //
@ -83,7 +85,8 @@ namespace trrep
// //
// //
virtual void on_connect() = 0; virtual void on_connect() = 0;
virtual void on_view() = 0; virtual void wait_until_connected() = 0;
virtual void on_view(const trrep::view&) = 0;
virtual void on_sync() = 0; virtual void on_sync() = 0;
// //

90
src/view.hpp Normal file
View File

@ -0,0 +1,90 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_VIEW_HPP
#define TRREP_VIEW_HPP
#include <wsrep_api.h>
#include <vector>
namespace trrep
{
class view
{
public:
class member
{
public:
member(const wsrep_member_info_t& member_info)
: id_()
, name_(member_info.name, WSREP_MEMBER_NAME_LEN)
, incoming_(member_info.incoming, WSREP_INCOMING_LEN)
{
char uuid_str[WSREP_UUID_STR_LEN + 1];
wsrep_uuid_print(&member_info.id, uuid_str, sizeof(uuid_str));
id_ = uuid_str;
}
const std::string& id() const { return id_; }
const std::string& name() const { return name_; }
const std::string& incoming() const { return incoming_; }
private:
std::string id_;
std::string name_;
std::string incoming_;
};
view(const wsrep_view_info_t& view_info)
: state_id_(view_info.state_id)
, view_(view_info.view)
, status_(view_info.status)
, capabilities_(view_info.capabilities)
, my_idx_(view_info.my_idx)
, proto_ver_(view_info.proto_ver)
, members_()
{
for (int i(0); i < view_info.memb_num; ++i)
{
members_.push_back(view_info.members[i]);
}
}
wsrep_seqno_t id() const
{ return view_; }
wsrep_view_status_t status() const
{ return status_; }
int own_index() const
{ return my_idx_; }
std::vector<member> members() const
{
std::vector<member> ret;
for (std::vector<wsrep_member_info_t>::const_iterator i(members_.begin());
i != members_.end(); ++i)
{
ret.push_back(member(*i));
}
return ret;
}
//
// Return true if the view is final
//
bool final() const
{
return (my_idx_ == -1);
}
private:
wsrep_gtid_t state_id_;
wsrep_seqno_t view_;
wsrep_view_status_t status_;
wsrep_cap_t capabilities_;
int my_idx_;
int proto_ver_;
std::vector<wsrep_member_info_t> members_;
};
}
#endif // TRREP_VIEW

View File

@ -7,6 +7,8 @@
#include <wsrep_api.h> #include <wsrep_api.h>
#include <iostream>
trrep::wsrep_provider_v26::wsrep_provider_v26( trrep::wsrep_provider_v26::wsrep_provider_v26(
const char* path, const char* path,
struct wsrep_init_args* args) struct wsrep_init_args* args)
@ -21,3 +23,47 @@ trrep::wsrep_provider_v26::wsrep_provider_v26(
throw trrep::runtime_error("Failed to initialize wsrep provider"); throw trrep::runtime_error("Failed to initialize wsrep provider");
} }
} }
trrep::wsrep_provider_v26::~wsrep_provider_v26()
{
wsrep_unload(wsrep_);
}
int trrep::wsrep_provider_v26::connect(
const std::string& cluster_name,
const std::string& cluster_url,
const std::string& state_donor,
bool bootstrap)
{
int ret(0);
wsrep_status_t wret;
if ((wret = wsrep_->connect(wsrep_,
cluster_name.c_str(),
cluster_url.c_str(),
state_donor.c_str(),
bootstrap)) != WSREP_OK)
{
std::cerr << "Failed to connect cluster: "
<< wret << "\n";
ret = 1;
}
return ret;
}
int trrep::wsrep_provider_v26::disconnect()
{
int ret(0);
wsrep_status_t wret;
if ((wret = wsrep_->disconnect(wsrep_)) != WSREP_OK)
{
std::cerr << "Failed to disconnect from cluster: "
<< wret << "\n";
ret = 1;
}
return ret;
}
wsrep_status_t trrep::wsrep_provider_v26::run_applier(void *applier_ctx)
{
return wsrep_->recv(wsrep_, applier_ctx);
}

View File

@ -16,7 +16,12 @@ namespace trrep
public: public:
wsrep_provider_v26(const char*, struct wsrep_init_args*); wsrep_provider_v26(const char*, struct wsrep_init_args*);
~wsrep_provider_v26();
int connect(const std::string&, const std::string&, const std::string&,
bool);
int disconnect();
wsrep_status_t run_applier(void*);
int start_transaction(wsrep_ws_handle_t*) { return 0; } int start_transaction(wsrep_ws_handle_t*) { return 0; }
int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) { return 0; } int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) { return 0; }
int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) { return 0; } int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) { return 0; }
@ -32,6 +37,8 @@ namespace trrep
int commit_order_leave(wsrep_ws_handle_t*) { return 0; } int commit_order_leave(wsrep_ws_handle_t*) { return 0; }
int release(wsrep_ws_handle_t*) { return 0; } int release(wsrep_ws_handle_t*) { return 0; }
private: private:
wsrep_provider_v26(const wsrep_provider_v26&);
wsrep_provider_v26& operator=(const wsrep_provider_v26);
struct wsrep* wsrep_; struct wsrep* wsrep_;
}; };
} }