1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-06-16 02:01:44 +03:00

Refactored provider specific code out of server_context.cpp

This commit is contained in:
Teemu Ollakka
2018-06-12 10:52:56 +03:00
parent 97d9f93648
commit d6f185c278
7 changed files with 270 additions and 224 deletions

View File

@ -9,44 +9,11 @@
#include "wsrep/logger.hpp"
#include "wsrep/compiler.hpp"
// Todo: refactor into provider factory
#include "mock_provider.hpp"
#include "wsrep_provider_v26.hpp"
#include <cassert>
#include <sstream>
namespace
{
inline uint32_t map_one(const int flags, const uint32_t from,
const int to)
{
return ((flags & from) ? to : 0);
}
int map_flags_from_native(uint32_t flags)
{
using wsrep::provider;
return (map_one(flags,
WSREP_FLAG_TRX_START,
provider::flag::start_transaction) |
map_one(flags,
WSREP_FLAG_TRX_END,
provider::flag::commit) |
map_one(flags,
WSREP_FLAG_ROLLBACK,
provider::flag::rollback) |
map_one(flags,
WSREP_FLAG_ISOLATION,
provider::flag::isolation) |
map_one(flags,
WSREP_FLAG_PA_UNSAFE,
provider::flag::pa_unsafe) |
// map_one(flags, provider::flag::commutative, WSREP_FLAG_COMMUTATIVE) |
// map_one(flags, provider::flag::native, WSREP_FLAG_NATIVE) |
map_one(flags, WSREP_FLAG_SNAPSHOT, provider::flag::snapshot));
}
inline bool starts_transaction(const wsrep::ws_meta& ws_meta)
{
return (ws_meta.flags() & wsrep::provider::flag::start_transaction);
@ -62,187 +29,14 @@ namespace
return (ws_meta.flags() & wsrep::provider::flag::rollback);
}
wsrep_cb_status_t connected_cb(
void* app_ctx,
const wsrep_view_info_t* view __attribute((unused)))
{
assert(app_ctx);
wsrep::server_context& server_context(
*reinterpret_cast<wsrep::server_context*>(app_ctx));
//
// TODO: Fetch server id and group id from view infor
//
try
{
server_context.on_connect();
return WSREP_CB_SUCCESS;
}
catch (const wsrep::runtime_error& e)
{
std::cerr << "Exception: " << e.what();
return WSREP_CB_FAILURE;
}
}
wsrep_cb_status_t view_cb(void* app_ctx,
void* recv_ctx __attribute__((unused)),
const wsrep_view_info_t* view_info,
const char*,
size_t)
{
assert(app_ctx);
assert(view_info);
wsrep::server_context& server_context(
*reinterpret_cast<wsrep::server_context*>(app_ctx));
try
{
wsrep::view view(*view_info);
server_context.on_view(view);
return WSREP_CB_SUCCESS;
}
catch (const wsrep::runtime_error& e)
{
std::cerr << "Exception: " << e.what();
return WSREP_CB_FAILURE;
}
}
wsrep_cb_status_t sst_request_cb(void* app_ctx,
void **sst_req, size_t* sst_req_len)
{
assert(app_ctx);
wsrep::server_context& server_context(
*reinterpret_cast<wsrep::server_context*>(app_ctx));
try
{
std::string req(server_context.on_sst_required());
*sst_req = ::strdup(req.c_str());
*sst_req_len = strlen(req.c_str());
return WSREP_CB_SUCCESS;
}
catch (const wsrep::runtime_error& e)
{
return WSREP_CB_FAILURE;
}
}
wsrep_cb_status_t apply_cb(void* ctx,
const wsrep_ws_handle_t* wsh,
uint32_t flags,
const wsrep_buf_t* buf,
const wsrep_trx_meta_t* meta,
wsrep_bool_t* exit_loop __attribute__((unused)))
{
wsrep_cb_status_t ret(WSREP_CB_SUCCESS);
wsrep::client_context* client_context(
reinterpret_cast<wsrep::client_context*>(ctx));
assert(client_context);
assert(client_context->mode() == wsrep::client_context::m_applier);
wsrep::data data(buf->ptr, buf->len);
wsrep::ws_handle ws_handle(wsh->trx_id, wsh->opaque);
wsrep::ws_meta ws_meta(
wsrep::gtid(wsrep::id(meta->gtid.uuid.data,
sizeof(meta->gtid.uuid.data)),
wsrep::seqno(meta->gtid.seqno)),
wsrep::stid(wsrep::id(meta->stid.node.data,
sizeof(meta->stid.node.data)),
meta->stid.trx,
meta->stid.conn), wsrep::seqno(meta->depends_on),
map_flags_from_native(flags));
if (ret == WSREP_CB_SUCCESS &&
client_context->server_context().on_apply(
*client_context, ws_handle, ws_meta, data))
{
ret = WSREP_CB_FAILURE;
}
return ret;
}
wsrep_cb_status_t synced_cb(void* app_ctx)
{
assert(app_ctx);
wsrep::server_context& server_context(
*reinterpret_cast<wsrep::server_context*>(app_ctx));
try
{
server_context.on_sync();
return WSREP_CB_SUCCESS;
}
catch (const wsrep::runtime_error& e)
{
std::cerr << "On sync failed: " << e.what() << "\n";
return WSREP_CB_FAILURE;
}
}
wsrep_cb_status_t sst_donate_cb(void* app_ctx,
void* ,
const wsrep_buf_t* req_buf,
const wsrep_gtid_t* req_gtid,
const wsrep_buf_t*,
bool bypass)
{
assert(app_ctx);
wsrep::server_context& server_context(
*reinterpret_cast<wsrep::server_context*>(app_ctx));
try
{
std::string req(reinterpret_cast<const char*>(req_buf->ptr),
req_buf->len);
wsrep::gtid gtid(wsrep::id(req_gtid->uuid.data,
sizeof(req_gtid->uuid.data)),
wsrep::seqno(req_gtid->seqno));
server_context.on_sst_request(req, gtid, bypass);
return WSREP_CB_SUCCESS;
}
catch (const wsrep::runtime_error& e)
{
return WSREP_CB_FAILURE;
}
}
}
int wsrep::server_context::load_provider(const std::string& provider_spec,
const std::string& provider_options)
{
wsrep::log() << "Loading provider " << provider_spec;
if (provider_spec == "mock")
{
provider_ = new wsrep::mock_provider(*this);
}
else
{
std::cerr << "Provider options" << provider_options << "\n";
struct wsrep_init_args init_args;
memset(&init_args, 0, sizeof(init_args));
init_args.app_ctx = this;
init_args.node_name = name_.c_str();
init_args.node_address = address_.c_str();
init_args.node_incoming = "";
init_args.data_dir = working_dir_.c_str();
init_args.options = provider_options.c_str();
init_args.proto_ver = 1;
init_args.state_id = 0;
init_args.state = 0;
init_args.logger_cb = 0;
init_args.connected_cb = &connected_cb;
init_args.view_cb = &view_cb;
init_args.sst_request_cb = &sst_request_cb;
init_args.apply_cb = &apply_cb;
init_args.unordered_cb = 0;
init_args.sst_donate_cb = &sst_donate_cb;
init_args.synced_cb = &synced_cb;
std::cerr << init_args.options << "\n";
provider_ = new wsrep::wsrep_provider_v26(*this,
provider_spec.c_str(),
&init_args);
}
return 0;
wsrep::log_info() << "Loading provider " << provider_spec;
provider_ = wsrep::provider::make_provider(*this, provider_spec, provider_options);
return (provider_ ? 0 : 1);
}
int wsrep::server_context::connect(const std::string& cluster_name,
@ -434,7 +228,7 @@ void wsrep::server_context::state(
std::ostringstream os;
os << "server: " << name_ << " unallowed state transition: "
<< wsrep::to_string(state_) << " -> " << wsrep::to_string(state);
std::cerr << os.str() << "\n";
wsrep::log_error() << os.str() << "\n";
::abort();
// throw wsrep::runtime_error(os.str());
}