1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Provider abstraction code compiles.

This commit is contained in:
Teemu Ollakka
2018-06-10 12:46:42 +03:00
parent e74b214c9c
commit ca615fcbd8
11 changed files with 324 additions and 83 deletions

View File

@ -168,7 +168,7 @@ public:
void start();
void stop();
void donate_sst(dbms_server&,
const std::string& req, const wsrep_gtid_t& gtid, bool);
const std::string& req, const wsrep::gtid& gtid, bool);
const dbms_simulator_params& params() const
{ return params_; }
std::string stats() const;
@ -248,7 +248,7 @@ public:
}
void on_sst_request(const std::string& req,
const wsrep_gtid_t& gtid,
const wsrep::gtid& gtid,
bool bypass)
{
simulator_.donate_sst(*this, req, gtid, bypass);
@ -369,7 +369,7 @@ private:
wsrep::log() << "replay: " << txc.id().get();
wsrep::client_applier_mode applier_mode(*this);
++stats_.replays;
return provider().replay(&txc.ws_handle(), this);
return provider().replay(txc.ws_handle(), this);
}
void wait_for_replayers(wsrep::unique_lock<wsrep::mutex>&) const override
{ }
@ -418,10 +418,10 @@ private:
int data(std::rand() % 10000000);
std::ostringstream os;
os << data;
wsrep::key key;
wsrep::key key(wsrep::key::exclusive);
key.append_key_part("dbms", 4);
wsrep_conn_id_t client_id(id().get());
key.append_key_part(&client_id, sizeof(client_id));
unsigned long long client_key(id().get());
key.append_key_part(&client_key, sizeof(client_key));
key.append_key_part(&data, sizeof(data));
err = append_key(key);
err = err || append_data(wsrep::data(os.str().c_str(),
@ -486,7 +486,7 @@ void dbms_server::applier_thread()
wsrep::client_id client_id(last_client_id_.fetch_add(1) + 1);
dbms_client applier(*this, client_id,
wsrep::client_context::m_applier, 0);
wsrep_status_t ret(provider().run_applier(&applier));
enum wsrep::provider::status ret(provider().run_applier(&applier));
wsrep::log() << "Applier thread exited with error code " << ret;
}
@ -658,7 +658,7 @@ std::string dbms_simulator::stats() const
void dbms_simulator::donate_sst(dbms_server& server,
const std::string& req,
const wsrep_gtid_t& gtid,
const wsrep::gtid& gtid,
bool bypass)
{
size_t id;

View File

@ -41,7 +41,7 @@ namespace wsrep
bool sst_before_init() const WSREP_OVERRIDE { return false; }
std::string on_sst_required() WSREP_OVERRIDE { return ""; }
void on_sst_request(const std::string&,
const wsrep_gtid_t&,
const wsrep::gtid&,
bool) WSREP_OVERRIDE { }
// void sst_received(const wsrep_gtid_t&, int) WSREP_OVERRIDE { }
// void on_apply(wsrep::transaction_context&) { }

View File

@ -18,26 +18,24 @@ void wsrep_mock::bf_abort_unordered(wsrep::client_context& cc)
// BF abort method to abort transactions via provider
void wsrep_mock::bf_abort_provider(wsrep::mock_server_context& sc,
const wsrep::transaction_context& tc,
wsrep_seqno_t bf_seqno)
wsrep::seqno bf_seqno)
{
wsrep_seqno_t victim_seqno;
sc.provider().bf_abort(bf_seqno, tc.id().get(), &victim_seqno);
wsrep::seqno victim_seqno;
sc.provider().bf_abort(bf_seqno, tc.id(), victim_seqno);
(void)victim_seqno;
}
void wsrep_mock::start_applying_transaction(
wsrep::client_context& cc,
const wsrep::transaction_id& id,
wsrep_seqno_t seqno,
uint32_t flags)
wsrep::seqno seqno,
int flags)
{
wsrep_ws_handle_t ws_handle = { id.get(), 0 };
wsrep_trx_meta_t meta = {
{ {1 }, seqno }, /* gtid */
{ { static_cast<uint8_t>(cc.id().get()) }, id.get(), cc.id().get() }, /* stid */
seqno - 1
};
int ret(cc.start_transaction(ws_handle, meta, flags));
wsrep::ws_handle ws_handle(id, 0);
wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("1"), seqno),
wsrep::stid(wsrep::id("1"), id, cc.id()),
flags, seqno.get() - 1);
int ret(cc.start_transaction(ws_handle, ws_meta));
if (ret != 0)
{
throw wsrep::runtime_error("failed to start applying transaction");

View File

@ -2,8 +2,6 @@
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include <wsrep_api.h> // Wsrep typedefs
// Forward declarations
namespace wsrep
{
@ -12,6 +10,7 @@ namespace wsrep
}
#include "wsrep/transaction_context.hpp"
#include "wsrep/provider.hpp"
//
// Utility functions
@ -25,11 +24,11 @@ namespace wsrep_mock
// BF abort method to abort transactions via provider
void bf_abort_provider(wsrep::mock_server_context& sc,
const wsrep::transaction_context& tc,
wsrep_seqno_t bf_seqno);
wsrep::seqno bf_seqno);
void start_applying_transaction(
wsrep::client_context& cc,
const wsrep::transaction_id& id,
wsrep_seqno_t seqno,
uint32_t flags);
wsrep::seqno seqno,
int flags);
}

View File

@ -27,11 +27,13 @@ wsrep::id::id(const std::string& str)
type_ = string;
std::memcpy(data_, str.c_str(), str.size());
}
std::ostringstream os;
os << "String '" << str << "' does not contain UUID";
throw wsrep::runtime_error(os.str());
else
{
std::ostringstream os;
os << "String '" << str << "' does not contain UUID";
throw wsrep::runtime_error(os.str());
}
}
wsrep::provider* wsrep::provider::make_provider(
const std::string&)
{

View File

@ -18,7 +18,7 @@ BOOST_AUTO_TEST_CASE(server_context_applying_1pc)
false);
wsrep_mock::start_applying_transaction(
cc, 1, 1,
WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END);
wsrep::provider::flag::start_transaction | wsrep::provider::flag::commit);
char buf[1] = { 1 };
BOOST_REQUIRE(sc.on_apply(cc, wsrep::data(buf, 1)) == 0);
const wsrep::transaction_context& txc(cc.transaction());
@ -36,7 +36,7 @@ BOOST_AUTO_TEST_CASE(server_context_applying_2pc)
true);
wsrep_mock::start_applying_transaction(
cc, 1, 1,
WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END);
wsrep::provider::flag::start_transaction | wsrep::provider::flag::commit);
char buf[1] = { 1 };
BOOST_REQUIRE(sc.on_apply(cc, wsrep::data(buf, 1)) == 0);
const wsrep::transaction_context& txc(cc.transaction());
@ -56,7 +56,7 @@ BOOST_AUTO_TEST_CASE(server_context_applying_1pc_rollback)
cc.fail_next_applying(true);
wsrep_mock::start_applying_transaction(
cc, 1, 1,
WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END);
wsrep::provider::flag::start_transaction | wsrep::provider::flag::commit);
char buf[1] = { 1 };
BOOST_REQUIRE(sc.on_apply(cc, wsrep::data(buf, 1)) == 1);
@ -77,7 +77,7 @@ BOOST_AUTO_TEST_CASE(server_context_applying_2pc_rollback)
cc.fail_next_applying(true);
wsrep_mock::start_applying_transaction(
cc, 1, 1,
WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END);
wsrep::provider::flag::start_transaction | wsrep::provider::flag::commit);
char buf[1] = { 1 };
BOOST_REQUIRE(sc.on_apply(cc, wsrep::data(buf, 1)) == 1);
const wsrep::transaction_context& txc(cc.transaction());

View File

@ -47,7 +47,8 @@ namespace
BOOST_REQUIRE(cc.before_statement() == 0);
wsrep_mock::start_applying_transaction(
cc, 1, 1,
WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END);
wsrep::provider::flag::start_transaction |
wsrep::provider::flag::commit);
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(cc.start_transaction() == 0);
BOOST_REQUIRE(tc.active() == true);
@ -280,7 +281,7 @@ BOOST_FIXTURE_TEST_CASE(
BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1));
BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_executing);
wsrep_mock::bf_abort_provider(sc, tc, WSREP_SEQNO_UNDEFINED);
wsrep_mock::bf_abort_provider(sc, tc, wsrep::seqno::undefined());
// Run before commit
BOOST_REQUIRE(cc.before_commit());
@ -388,7 +389,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_2pc_applying)
wsrep_mock::start_applying_transaction(
cc, 1, 1,
WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END);
wsrep::provider::flag::start_transaction | wsrep::provider::flag::commit);
const wsrep::transaction_context& tc(cc.transaction());
BOOST_REQUIRE(tc.active() == false);

View File

@ -18,20 +18,178 @@ namespace
{
switch (status)
{
case WSREP_OK: return wsrep::provider::success;
case WSREP_WARNING: return wsrep::provider::error_warning;
case WSREP_TRX_MISSING: return wsrep::provider::error_transaction_missing;
case WSREP_TRX_FAIL: return wsrep::provider::error_certification_failed;
case WSREP_BF_ABORT: return wsrep::provider::error_bf_abort;
case WSREP_SIZE_EXCEEDED: return wsrep::provider::error_size_exceeded;
case WSREP_CONN_FAIL: return wsrep::provider::error_connection_failed;
case WSREP_NODE_FAIL: return wsrep::provider::error_provider_failed;
case WSREP_FATAL: return wsrep::provider::error_fatal;
case WSREP_NOT_IMPLEMENTED: return wsrep::provider::error_not_implemented;
case WSREP_NOT_ALLOWED: return wsrep::provider::error_not_allowed;
case WSREP_OK:
return wsrep::provider::success;
case WSREP_WARNING:
return wsrep::provider::error_warning;
case WSREP_TRX_MISSING:
return wsrep::provider::error_transaction_missing;
case WSREP_TRX_FAIL:
return wsrep::provider::error_certification_failed;
case WSREP_BF_ABORT:
return wsrep::provider::error_bf_abort;
case WSREP_SIZE_EXCEEDED:
return wsrep::provider::error_size_exceeded;
case WSREP_CONN_FAIL:
return wsrep::provider::error_connection_failed;
case WSREP_NODE_FAIL:
return wsrep::provider::error_provider_failed;
case WSREP_FATAL:
return wsrep::provider::error_fatal;
case WSREP_NOT_IMPLEMENTED:
return wsrep::provider::error_not_implemented;
case WSREP_NOT_ALLOWED:
return wsrep::provider::error_not_allowed;
}
return wsrep::provider::error_unknown;
}
wsrep_key_type_t map_key_type(enum wsrep::key::type type)
{
switch (type)
{
case wsrep::key::shared: return WSREP_KEY_SHARED;
case wsrep::key::semi_shared: return WSREP_KEY_SEMI;
case wsrep::key::semi_exclusive:
/*! \todo Implement semi exclusive in API */
assert(0);
return WSREP_KEY_EXCLUSIVE;
case wsrep::key::exclusive: return WSREP_KEY_EXCLUSIVE;
}
throw wsrep::runtime_error("Invalid key type");
}
inline uint32_t map_one(const int flags, const int from,
const uint32_t to)
{
return ((flags & from) ? to : 0);
}
uint32_t map_flags_to_native(int flags)
{
using wsrep::provider;
return (map_one(flags, provider::flag::start_transaction,
WSREP_FLAG_TRX_START) |
map_one(flags, provider::flag::commit, WSREP_FLAG_TRX_END) |
map_one(flags, provider::flag::rollback, WSREP_FLAG_ROLLBACK) |
map_one(flags, provider::flag::isolation, WSREP_FLAG_ISOLATION) |
map_one(flags, provider::flag::pa_unsafe, WSREP_FLAG_PA_UNSAFE) |
// map_one(flags, provider::flag::commutative, WSREP_FLAG_COMMUTATIVE) |
// map_one(flags, provider::flag::native, WSREP_FLAG_NATIVE) |
map_one(flags, provider::flag::snapshot, WSREP_FLAG_SNAPSHOT));
}
class mutable_ws_handle
{
public:
mutable_ws_handle(wsrep::ws_handle& ws_handle)
: ws_handle_(ws_handle)
, native_((wsrep_ws_handle_t)
{
ws_handle_.transaction_id().get(),
ws_handle_.opaque()
})
{ }
~mutable_ws_handle()
{
ws_handle_ = wsrep::ws_handle(native_.trx_id, native_.opaque);
}
wsrep_ws_handle_t* native()
{
return &native_;
}
private:
wsrep::ws_handle& ws_handle_;
wsrep_ws_handle_t native_;
};
class const_ws_handle
{
public:
const_ws_handle(const wsrep::ws_handle& ws_handle)
: ws_handle_(ws_handle)
, native_((wsrep_ws_handle_t)
{
ws_handle_.transaction_id().get(),
ws_handle_.opaque()
})
{ }
~const_ws_handle()
{
assert(ws_handle.transaction_id().get() == native_.trx_id);
assert(ws_handle.opaque() == native_.opaque);
}
const wsrep_ws_handle_t* native() const
{
return &native_;
}
private:
const wsrep::ws_handle& ws_handle_;
const wsrep_ws_handle_t native_;
};
class mutable_ws_meta
{
public:
mutable_ws_meta(wsrep::ws_meta& ws_meta, int flags)
: ws_meta_(ws_meta)
, trx_meta_()
, flags_(map_flags_to_native(flags))
{ }
~mutable_ws_meta()
{
ws_meta_ = wsrep::ws_meta(
wsrep::gtid(
wsrep::id(trx_meta_.gtid.uuid.data,
sizeof(trx_meta_.gtid.uuid.data)),
trx_meta_.gtid.seqno),
wsrep::stid(wsrep::id(trx_meta_.stid.node.data,
sizeof(trx_meta_.stid.node.data)),
trx_meta_.stid.trx,
trx_meta_.stid.conn),
trx_meta_.depends_on, flags_);
}
wsrep_trx_meta* native() { return &trx_meta_; }
uint32_t native_flags() const { return map_flags_to_native(flags_); }
private:
wsrep::ws_meta& ws_meta_;
wsrep_trx_meta_t trx_meta_;
int flags_;
};
class const_ws_meta
{
public:
const_ws_meta(const wsrep::ws_meta& ws_meta)
: ws_meta_(ws_meta)
, trx_meta_()
{
std::memcpy(trx_meta_.gtid.uuid.data, ws_meta.group_id().data(),
sizeof(trx_meta_.gtid.uuid.data));
trx_meta_.gtid.seqno = ws_meta.seqno().get();
std::memcpy(trx_meta_.stid.node.data, ws_meta.server_id().data(),
sizeof(trx_meta_.stid.node.data));
trx_meta_.stid.conn = ws_meta.client_id().get();
trx_meta_.stid.trx = ws_meta.transaction_id().get();
trx_meta_.depends_on = ws_meta.depends_on().get();
}
~const_ws_meta()
{
}
const wsrep_trx_meta* native() const { return &trx_meta_; }
private:
const wsrep::ws_meta& ws_meta_;
wsrep_trx_meta_t trx_meta_;
};
}
wsrep::wsrep_provider_v26::wsrep_provider_v26(
@ -94,74 +252,119 @@ wsrep::wsrep_provider_v26::run_applier(void *applier_ctx)
return map_return_value(wsrep_->recv(wsrep_, applier_ctx));
}
int wsrep::wsrep_provider_v26::append_key(wsrep_ws_handle_t* wsh,
const wsrep_key_t* key)
int wsrep::wsrep_provider_v26::append_key(wsrep::ws_handle& ws_handle,
const wsrep::key& key)
{
return (wsrep_->append_key(wsrep_, wsh, key, 1, WSREP_KEY_EXCLUSIVE, true)
if (key.size() > 3)
{
assert(0);
return 1;
}
wsrep_buf_t key_parts[3];
for (size_t i(0); i < key.size(); ++i)
{
key_parts[i].ptr = key.key_parts()[i].ptr();
key_parts[i].len = key.key_parts()[i].size();
}
wsrep_key_t wsrep_key = {key_parts, key.size()};
mutable_ws_handle mwsh(ws_handle);
return (wsrep_->append_key(
wsrep_, mwsh.native(),
&wsrep_key, 1, map_key_type(key.type()), true)
!= WSREP_OK);
}
int wsrep::wsrep_provider_v26::append_data(wsrep_ws_handle_t* wsh,
const wsrep_buf_t* data)
int wsrep::wsrep_provider_v26::append_data(wsrep::ws_handle& ws_handle,
const wsrep::data& data)
{
return (wsrep_->append_data(wsrep_, wsh, data, 1, WSREP_DATA_ORDERED, true)
const wsrep_buf_t wsrep_buf = {data.get().ptr(), data.get().size()};
mutable_ws_handle mwsh(ws_handle);
return (wsrep_->append_data(wsrep_, mwsh.native(), &wsrep_buf,
1, WSREP_DATA_ORDERED, true)
!= WSREP_OK);
}
wsrep_status_t wsrep::wsrep_provider_v26::certify(wsrep_conn_id_t conn_id,
wsrep_ws_handle_t* wsh,
uint32_t flags,
wsrep_trx_meta_t* meta)
enum wsrep::provider::status
wsrep::wsrep_provider_v26::certify(wsrep::client_id client_id,
wsrep::ws_handle& ws_handle,
int flags,
wsrep::ws_meta& ws_meta)
{
return wsrep_->certify(wsrep_, conn_id, wsh, flags, meta);
mutable_ws_handle mwsh(ws_handle);
mutable_ws_meta mmeta(ws_meta, flags);
return map_return_value(
wsrep_->certify(wsrep_, client_id.get(), mwsh.native(),
mmeta.native_flags(),
mmeta.native()));
}
wsrep_status_t wsrep::wsrep_provider_v26::bf_abort(
wsrep_seqno_t bf_seqno,
wsrep_trx_id_t victim_id,
wsrep_seqno_t *victim_seqno)
enum wsrep::provider::status
wsrep::wsrep_provider_v26::bf_abort(
wsrep::seqno bf_seqno,
wsrep::transaction_id victim_id,
wsrep::seqno& victim_seqno)
{
return wsrep_->abort_certification(
wsrep_, bf_seqno, victim_id, victim_seqno);
wsrep_seqno_t wsrep_victim_seqno;
wsrep_status_t ret(
wsrep_->abort_certification(
wsrep_, bf_seqno.get(), victim_id.get(), &wsrep_victim_seqno));
victim_seqno = wsrep_victim_seqno;
return map_return_value(ret);
}
wsrep_status_t wsrep::wsrep_provider_v26::commit_order_enter(
const wsrep_ws_handle_t* wsh,
const wsrep_trx_meta_t* meta)
enum wsrep::provider::status
wsrep::wsrep_provider_v26::commit_order_enter(
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta)
{
return wsrep_->commit_order_enter(wsrep_, wsh, meta);
const_ws_handle cwsh(ws_handle);
const_ws_meta cwsm(ws_meta);
return map_return_value(
wsrep_->commit_order_enter(wsrep_, cwsh.native(), cwsm.native()));
}
int wsrep::wsrep_provider_v26::commit_order_leave(
const wsrep_ws_handle_t* wsh,
const wsrep_trx_meta_t* meta)
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta)
{
return (wsrep_->commit_order_leave(wsrep_, wsh, meta, 0) != WSREP_OK);
const_ws_handle cwsh(ws_handle);
const_ws_meta cwsm(ws_meta);
return (wsrep_->commit_order_leave(wsrep_, cwsh.native(), cwsm.native(), 0) != WSREP_OK);
}
int wsrep::wsrep_provider_v26::release(wsrep_ws_handle_t* wsh)
int wsrep::wsrep_provider_v26::release(wsrep::ws_handle& ws_handle)
{
return (wsrep_->release(wsrep_, wsh) != WSREP_OK);
mutable_ws_handle mwsh(ws_handle);
return (wsrep_->release(wsrep_, mwsh.native()) != WSREP_OK);
}
int wsrep::wsrep_provider_v26::replay(wsrep_ws_handle_t* wsh,
int wsrep::wsrep_provider_v26::replay(wsrep::ws_handle& ws_handle,
void* applier_ctx)
{
return (wsrep_->replay_trx(wsrep_, wsh, applier_ctx) != WSREP_OK);
mutable_ws_handle mwsh(ws_handle);
return (wsrep_->replay_trx(wsrep_, mwsh.native(), applier_ctx) != WSREP_OK);
}
int wsrep::wsrep_provider_v26::sst_sent(const wsrep_gtid_t& gtid, int err)
int wsrep::wsrep_provider_v26::sst_sent(const wsrep::gtid& gtid, int err)
{
if (wsrep_->sst_sent(wsrep_, &gtid, err) != WSREP_OK)
wsrep_gtid_t wsrep_gtid;
std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(),
sizeof(wsrep_gtid.uuid.data));
wsrep_gtid.seqno = gtid.seqno().get();
if (wsrep_->sst_sent(wsrep_, &wsrep_gtid, err) != WSREP_OK)
{
return 1;
}
return 0;
}
int wsrep::wsrep_provider_v26::sst_received(const wsrep_gtid_t& gtid, int err)
int wsrep::wsrep_provider_v26::sst_received(const wsrep::gtid& gtid, int err)
{
if (wsrep_->sst_received(wsrep_, &gtid, 0, err) != WSREP_OK)
wsrep_gtid_t wsrep_gtid;
std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(),
sizeof(wsrep_gtid.uuid.data));
wsrep_gtid.seqno = gtid.seqno().get();
if (wsrep_->sst_received(wsrep_, &wsrep_gtid, 0, err) != WSREP_OK)
{
return 1;
}