1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-06-16 02:01:44 +03:00

SR applying implementation, unit tests.

This commit is contained in:
Teemu Ollakka
2018-06-14 11:14:40 +03:00
parent 39c9ef8a63
commit a8ab9c6dbd
9 changed files with 188 additions and 49 deletions

View File

@ -37,6 +37,7 @@ struct dbms_simulator_params
size_t n_servers; size_t n_servers;
size_t n_clients; size_t n_clients;
size_t n_transactions; size_t n_transactions;
size_t n_rows;
size_t alg_freq; size_t alg_freq;
std::string wsrep_provider; std::string wsrep_provider;
std::string wsrep_provider_options; std::string wsrep_provider_options;
@ -46,6 +47,7 @@ struct dbms_simulator_params
: n_servers(0) : n_servers(0)
, n_clients(0) , n_clients(0)
, n_transactions(0) , n_transactions(0)
, n_rows(1000)
, alg_freq(0) , alg_freq(0)
, wsrep_provider() , wsrep_provider()
, wsrep_provider_options() , wsrep_provider_options()
@ -265,11 +267,13 @@ public:
// Client context management // Client context management
wsrep::client_context* local_client_context(); wsrep::client_context* local_client_context();
wsrep::client_context& streaming_applier_client_context( wsrep::client_context* streaming_applier_client_context() override
const wsrep::id&, const wsrep::transaction_id&) override
{ {
throw wsrep::not_implemented_error(); throw wsrep::not_implemented_error();
} }
void log_dummy_write_set(wsrep::client_context&, const wsrep::ws_meta&)
override
{ }
size_t next_transaction_id() size_t next_transaction_id()
{ {
return (last_transaction_id_.fetch_add(1) + 1); return (last_transaction_id_.fetch_add(1) + 1);
@ -308,12 +312,12 @@ public:
dbms_client(dbms_server& server, dbms_client(dbms_server& server,
const wsrep::client_id& id, const wsrep::client_id& id,
enum wsrep::client_context::mode mode, enum wsrep::client_context::mode mode,
size_t n_transactions) const dbms_simulator_params& params)
: wsrep::client_context(mutex_, server, id, mode) : wsrep::client_context(mutex_, server, id, mode)
, mutex_() , mutex_()
, server_(server) , server_(server)
, se_trx_(server_.storage_engine()) , se_trx_(server_.storage_engine())
, n_transactions_(n_transactions) , params_(params)
, stats_() , stats_()
{ } { }
@ -324,7 +328,7 @@ public:
void start() void start()
{ {
for (size_t i(0); i < n_transactions_; ++i) for (size_t i(0); i < params_.n_transactions; ++i)
{ {
run_one_transaction(); run_one_transaction();
report_progress(i + 1); report_progress(i + 1);
@ -458,7 +462,7 @@ private:
// wsrep::log_debug() << "Generate write set"; // wsrep::log_debug() << "Generate write set";
assert(transaction().active()); assert(transaction().active());
assert(err == 0); assert(err == 0);
int data(std::rand() % 10000000); int data(std::rand() % params_.n_rows);
std::ostringstream os; std::ostringstream os;
os << data; os << data;
wsrep::key key(wsrep::key::exclusive); wsrep::key key(wsrep::key::exclusive);
@ -513,13 +517,13 @@ private:
{ {
wsrep::log() << "client: " << id().get() wsrep::log() << "client: " << id().get()
<< " transactions: " << i << " transactions: " << i
<< " " << 100*double(i)/n_transactions_ << "%"; << " " << 100*double(i)/params_.n_transactions << "%";
} }
} }
wsrep::default_mutex mutex_; wsrep::default_mutex mutex_;
dbms_server& server_; dbms_server& server_;
dbms_storage_engine::transaction se_trx_; dbms_storage_engine::transaction se_trx_;
const size_t n_transactions_; const dbms_simulator_params params_;
struct stats stats_; struct stats stats_;
}; };
@ -529,7 +533,7 @@ void dbms_server::applier_thread()
{ {
wsrep::client_id client_id(last_client_id_.fetch_add(1) + 1); wsrep::client_id client_id(last_client_id_.fetch_add(1) + 1);
dbms_client applier(*this, client_id, dbms_client applier(*this, client_id,
wsrep::client_context::m_applier, 0); wsrep::client_context::m_applier, simulator_.params());
enum wsrep::provider::status ret(provider().run_applier(&applier)); enum wsrep::provider::status ret(provider().run_applier(&applier));
wsrep::log() << "Applier thread exited with error code " << ret; wsrep::log() << "Applier thread exited with error code " << ret;
} }
@ -538,7 +542,9 @@ wsrep::client_context* dbms_server::local_client_context()
{ {
std::ostringstream id_os; std::ostringstream id_os;
size_t client_id(++last_client_id_); size_t client_id(++last_client_id_);
return new dbms_client(*this, client_id, wsrep::client_context::m_replicating, 0); return new dbms_client(*this, client_id,
wsrep::client_context::m_replicating,
simulator_.params());
} }
void dbms_server::start_clients() void dbms_server::start_clients()
@ -576,7 +582,7 @@ void dbms_server::start_client(size_t id)
auto client(std::make_shared<dbms_client>( auto client(std::make_shared<dbms_client>(
*this, id, *this, id,
wsrep::client_context::m_replicating, wsrep::client_context::m_replicating,
simulator_.params().n_transactions)); simulator_.params()));
clients_.push_back(client); clients_.push_back(client);
client_threads_.push_back( client_threads_.push_back(
boost::thread(&dbms_server::client_thread, this, client)); boost::thread(&dbms_server::client_thread, this, client));
@ -764,6 +770,8 @@ int main(int argc, char** argv)
"number of clients to start per server") "number of clients to start per server")
("transactions", po::value<size_t>(&params.n_transactions), ("transactions", po::value<size_t>(&params.n_transactions),
"number of transactions run by a client") "number of transactions run by a client")
("rows", po::value<size_t>(&params.n_rows),
"number of rows per table")
("alg-freq", po::value<size_t>(&params.alg_freq), ("alg-freq", po::value<size_t>(&params.alg_freq),
"ALG frequency") "ALG frequency")
("debug-log-level", po::value<int>(&params.debug_log_level), ("debug-log-level", po::value<int>(&params.debug_log_level),

View File

@ -210,12 +210,6 @@ namespace wsrep
*/ */
enum after_statement_result after_statement(); enum after_statement_result after_statement();
int start_transaction()
{
assert(state_ == s_exec);
return transaction_.start_transaction();
}
int start_transaction(const wsrep::transaction_id& id) int start_transaction(const wsrep::transaction_id& id)
{ {
assert(state_ == s_exec); assert(state_ == s_exec);

View File

@ -208,16 +208,23 @@ namespace wsrep
* \param transaction_id Transaction ID of the SR transaction on the * \param transaction_id Transaction ID of the SR transaction on the
* origin server. * origin server.
*/ */
virtual client_context& streaming_applier_client_context( virtual client_context* streaming_applier_client_context() = 0;
const wsrep::id& server_id,
const wsrep::transaction_id& transaction_id
) = 0;
void insert_streaming_applier( void start_streaming_applier(
const wsrep::id&, const wsrep::id&,
const wsrep::transaction_id&, const wsrep::transaction_id&,
wsrep::client_context* client_context); wsrep::client_context* client_context);
void stop_streaming_applier(
const wsrep::id&, const wsrep::transaction_id&);
/*!
* Return reference to streaming applier.
*/
client_context* find_streaming_applier(const wsrep::id&,
const wsrep::transaction_id&) const;
virtual void log_dummy_write_set(wsrep::client_context&,
const wsrep::ws_meta&) = 0;
/*! /*!
* Load WSRep provider. * Load WSRep provider.
* *

View File

@ -80,14 +80,6 @@ namespace wsrep
bool pa_unsafe() const { return pa_unsafe_; } bool pa_unsafe() const { return pa_unsafe_; }
void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; } void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; }
//
int start_transaction()
{
assert(active() == false);
assert(ws_meta_.transaction_id() != transaction_id::invalid());
return start_transaction(ws_meta_.transaction_id());
}
int start_transaction(const wsrep::transaction_id& id); int start_transaction(const wsrep::transaction_id& id);
int start_transaction(const wsrep::ws_handle& ws_handle, int start_transaction(const wsrep::ws_handle& ws_handle,

View File

@ -122,7 +122,6 @@ int wsrep::server_context::on_apply(
// wsrep::log_debug() << "server_context::on apply flags: " // wsrep::log_debug() << "server_context::on apply flags: "
// << flags_to_string(ws_meta.flags()); // << flags_to_string(ws_meta.flags());
assert(ws_handle.opaque()); assert(ws_handle.opaque());
assert(ws_meta.flags());
bool not_replaying(txc.state() != bool not_replaying(txc.state() !=
wsrep::transaction_context::s_replaying); wsrep::transaction_context::s_replaying);
@ -164,12 +163,91 @@ int wsrep::server_context::on_apply(
assert(ret || assert(ret ||
txc.state() == wsrep::transaction_context::s_committed); txc.state() == wsrep::transaction_context::s_committed);
} }
else if (starts_transaction(ws_meta.flags()))
{
assert(not_replaying);
assert(find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()) == 0);
wsrep::client_context* sac(streaming_applier_client_context());
start_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id(), sac);
sac->start_transaction(ws_handle, ws_meta);
{
// TODO: Client context switch will be ultimately
// implemented by the application. Therefore need to
// introduce a virtual method call which takes
// both original and sac client contexts as argument
// and a functor which does the applying.
wsrep::client_context_switch sw(client_context, *sac);
sac->before_command();
sac->before_statement();
sac->apply(data);
sac->after_statement();
sac->after_command_before_result();
sac->after_command_after_result();
}
log_dummy_write_set(client_context, ws_meta);
}
else if (ws_meta.flags() == 0)
{
wsrep::client_context* sac(
find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()));
if (sac == 0)
{
// It is possible that rapid group membership changes
// may cause streaming transaction be rolled back before
// commit fragment comes in. Although this is a valid
// situation, log a warning if a sac cannot be found as
// it may be an indication of a bug too.
wsrep::log_warning() << "Could not find applier context for "
<< ws_meta.server_id()
<< ": " << ws_meta.transaction_id();
}
else
{
wsrep::client_context_switch(client_context, *sac);
sac->before_command();
sac->before_statement();
ret = sac->apply(data);
sac->after_statement();
sac->after_command_before_result();
sac->after_command_after_result();
}
log_dummy_write_set(client_context, ws_meta);
}
else if (commits_transaction(ws_meta.flags())) else if (commits_transaction(ws_meta.flags()))
{ {
if (not_replaying) if (not_replaying)
{ {
// SR commit not implemented yet wsrep::client_context* sac(
assert(0); find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()));
assert(sac);
if (sac == 0)
{
// It is possible that rapid group membership changes
// may cause streaming transaction be rolled back before
// commit fragment comes in. Although this is a valid
// situation, log a warning if a sac cannot be found as
// it may be an indication of a bug too.
wsrep::log_warning() << "Could not find applier context for "
<< ws_meta.server_id()
<< ": " << ws_meta.transaction_id();
}
else
{
wsrep::client_context_switch(client_context, *sac);
sac->before_command();
sac->before_statement();
ret = sac->commit();
sac->after_statement();
sac->after_command_before_result();
sac->after_command_after_result();
stop_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id());
delete sac;
}
} }
else else
{ {
@ -198,7 +276,7 @@ bool wsrep::server_context::statement_allowed_for_streaming(
return false; return false;
} }
void wsrep::server_context::insert_streaming_applier( void wsrep::server_context::start_streaming_applier(
const wsrep::id& server_id, const wsrep::id& server_id,
const wsrep::transaction_id& transaction_id, const wsrep::transaction_id& transaction_id,
wsrep::client_context* client_context) wsrep::client_context* client_context)
@ -213,6 +291,33 @@ void wsrep::server_context::insert_streaming_applier(
} }
} }
void wsrep::server_context::stop_streaming_applier(
const wsrep::id& server_id,
const wsrep::transaction_id& transaction_id)
{
streaming_appliers_map::iterator i(
streaming_appliers_.find(std::make_pair(server_id, transaction_id)));
assert(i != streaming_appliers_.end());
if (i == streaming_appliers_.end())
{
wsrep::log_warning() << "Could not find streaming applier for "
<< server_id << ":" << transaction_id;
}
else
{
streaming_appliers_.erase(i);
}
}
wsrep::client_context* wsrep::server_context::find_streaming_applier(
const wsrep::id& server_id,
const wsrep::transaction_id& transaction_id) const
{
streaming_appliers_map::const_iterator i(
streaming_appliers_.find(std::make_pair(server_id, transaction_id)));
return (i == streaming_appliers_.end() ? 0 : i->second);
}
// Private // Private
void wsrep::server_context::state( void wsrep::server_context::state(

View File

@ -71,6 +71,7 @@ int wsrep::transaction_context::start_transaction(
{ {
assert(ws_meta.flags()); assert(ws_meta.flags());
assert(active() == false); assert(active() == false);
id_ = ws_meta.transaction_id();
assert(client_context_.mode() == wsrep::client_context::m_applier); assert(client_context_.mode() == wsrep::client_context::m_applier);
state_ = s_executing; state_ = s_executing;
ws_handle_ = ws_handle; ws_handle_ = ws_handle;
@ -900,10 +901,11 @@ int wsrep::transaction_context::certify_commit(
void wsrep::transaction_context::streaming_rollback() void wsrep::transaction_context::streaming_rollback()
{ {
assert(streaming_context_.rolled_back() == false); assert(streaming_context_.rolled_back() == false);
wsrep::client_context& applier_context( wsrep::client_context* sac(
client_context_.server_context().streaming_applier_client_context( client_context_.server_context().streaming_applier_client_context());
client_context_.server_context().id(), id())); client_context_.server_context().start_streaming_applier(
applier_context.adopt_transaction(*this); client_context_.server_context().id(), id(), sac);
sac->adopt_transaction(*this);
streaming_context_.cleanup(); streaming_context_.cleanup();
// Replicate rollback fragment // Replicate rollback fragment
provider_.rollback(id_.get()); provider_.rollback(id_.get());

View File

@ -108,8 +108,6 @@ namespace
wsrep::provider::flag::start_transaction | wsrep::provider::flag::start_transaction |
wsrep::provider::flag::commit); wsrep::provider::flag::commit);
BOOST_REQUIRE(cc.start_transaction(ws_handle, ws_meta) == 0); BOOST_REQUIRE(cc.start_transaction(ws_handle, ws_meta) == 0);
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(cc.start_transaction() == 0);
BOOST_REQUIRE(tc.active() == true); BOOST_REQUIRE(tc.active() == true);
BOOST_REQUIRE(tc.certified() == true); BOOST_REQUIRE(tc.certified() == true);
BOOST_REQUIRE(tc.ordered() == true); BOOST_REQUIRE(tc.ordered() == true);
@ -138,8 +136,6 @@ namespace
wsrep::provider::flag::start_transaction | wsrep::provider::flag::start_transaction |
wsrep::provider::flag::commit); wsrep::provider::flag::commit);
BOOST_REQUIRE(cc.start_transaction(ws_handle, ws_meta) == 0); BOOST_REQUIRE(cc.start_transaction(ws_handle, ws_meta) == 0);
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(cc.start_transaction() == 0);
BOOST_REQUIRE(tc.active() == true); BOOST_REQUIRE(tc.active() == true);
BOOST_REQUIRE(tc.certified() == true); BOOST_REQUIRE(tc.certified() == true);
BOOST_REQUIRE(tc.ordered() == true); BOOST_REQUIRE(tc.ordered() == true);

View File

@ -33,13 +33,15 @@ namespace wsrep
return new wsrep::fake_client_context(*this, ++last_client_id_, return new wsrep::fake_client_context(*this, ++last_client_id_,
wsrep::client_context::m_local); wsrep::client_context::m_local);
} }
wsrep::client_context& streaming_applier_client_context( wsrep::client_context* streaming_applier_client_context()
const wsrep::id& server_id,
const wsrep::transaction_id& transaction_id)
{ {
wsrep::client_context* sac(new wsrep::fake_client_context(*this, ++last_client_id_, wsrep::client_context::m_applier)); return new wsrep::fake_client_context(
insert_streaming_applier(server_id, transaction_id, sac); *this, ++last_client_id_, wsrep::client_context::m_applier);
return *sac; }
void log_dummy_write_set(wsrep::client_context&, const wsrep::ws_meta&)
WSREP_OVERRIDE
{
//
} }
void on_connect() WSREP_OVERRIDE { } void on_connect() WSREP_OVERRIDE { }
void wait_until_connected() WSREP_OVERRIDE { } void wait_until_connected() WSREP_OVERRIDE { }

View File

@ -24,7 +24,6 @@ namespace
wsrep::provider::flag::start_transaction | wsrep::provider::flag::start_transaction |
wsrep::provider::flag::commit) wsrep::provider::flag::commit)
{ {
cc.start_transaction(ws_handle, ws_meta);
} }
wsrep::fake_server_context sc; wsrep::fake_server_context sc;
wsrep::fake_client_context cc; wsrep::fake_client_context cc;
@ -84,6 +83,40 @@ BOOST_FIXTURE_TEST_CASE(server_context_applying_2pc_rollback,
BOOST_REQUIRE(txc.state() == wsrep::transaction_context::s_aborted); BOOST_REQUIRE(txc.state() == wsrep::transaction_context::s_aborted);
} }
BOOST_AUTO_TEST_CASE(server_context_streaming)
{
wsrep::fake_server_context sc("s1", "s1",
wsrep::server_context::rm_sync);
wsrep::fake_client_context cc(sc,
wsrep::client_id(1),
wsrep::client_context::m_applier,
false);
wsrep::ws_handle ws_handle(1, (void*)1);
wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(1)),
wsrep::stid(wsrep::id("1"), 1, 1),
wsrep::seqno(0),
wsrep::provider::flag::start_transaction);
BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta,
wsrep::const_buffer("1", 1)) == 0);
BOOST_REQUIRE(sc.find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()));
ws_meta = wsrep::ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(2)),
wsrep::stid(wsrep::id("1"), 1, 1),
wsrep::seqno(1),
0);
BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta,
wsrep::const_buffer("1", 1)) == 0);
ws_meta = wsrep::ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(2)),
wsrep::stid(wsrep::id("1"), 1, 1),
wsrep::seqno(1),
wsrep::provider::flag::commit);
BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta,
wsrep::const_buffer("1", 1)) == 0);
BOOST_REQUIRE(sc.find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()) == 0);
}
BOOST_AUTO_TEST_CASE(server_context_state_strings) BOOST_AUTO_TEST_CASE(server_context_state_strings)
{ {
BOOST_REQUIRE(wsrep::to_string( BOOST_REQUIRE(wsrep::to_string(