From a8ab9c6dbddb685da5e15ceb530ceb1c9147f6a5 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Thu, 14 Jun 2018 11:14:40 +0300 Subject: [PATCH] SR applying implementation, unit tests. --- dbsim/dbms_simulator.cpp | 30 ++++--- include/wsrep/client_context.hpp | 6 -- include/wsrep/server_context.hpp | 17 ++-- include/wsrep/transaction_context.hpp | 8 -- src/server_context.cpp | 113 +++++++++++++++++++++++++- src/transaction_context.cpp | 10 ++- test/client_context_fixture.hpp | 4 - test/fake_server_context.hpp | 14 ++-- test/server_context_test.cpp | 35 +++++++- 9 files changed, 188 insertions(+), 49 deletions(-) diff --git a/dbsim/dbms_simulator.cpp b/dbsim/dbms_simulator.cpp index 4d1e9b9..a1481f8 100644 --- a/dbsim/dbms_simulator.cpp +++ b/dbsim/dbms_simulator.cpp @@ -37,6 +37,7 @@ struct dbms_simulator_params size_t n_servers; size_t n_clients; size_t n_transactions; + size_t n_rows; size_t alg_freq; std::string wsrep_provider; std::string wsrep_provider_options; @@ -46,6 +47,7 @@ struct dbms_simulator_params : n_servers(0) , n_clients(0) , n_transactions(0) + , n_rows(1000) , alg_freq(0) , wsrep_provider() , wsrep_provider_options() @@ -265,11 +267,13 @@ public: // Client context management wsrep::client_context* local_client_context(); - wsrep::client_context& streaming_applier_client_context( - const wsrep::id&, const wsrep::transaction_id&) override + wsrep::client_context* streaming_applier_client_context() override { throw wsrep::not_implemented_error(); } + void log_dummy_write_set(wsrep::client_context&, const wsrep::ws_meta&) + override + { } size_t next_transaction_id() { return (last_transaction_id_.fetch_add(1) + 1); @@ -308,12 +312,12 @@ public: dbms_client(dbms_server& server, const wsrep::client_id& id, enum wsrep::client_context::mode mode, - size_t n_transactions) + const dbms_simulator_params& params) : wsrep::client_context(mutex_, server, id, mode) , mutex_() , server_(server) , se_trx_(server_.storage_engine()) - , n_transactions_(n_transactions) + , params_(params) , stats_() { } @@ -324,7 +328,7 @@ public: void start() { - for (size_t i(0); i < n_transactions_; ++i) + for (size_t i(0); i < params_.n_transactions; ++i) { run_one_transaction(); report_progress(i + 1); @@ -458,7 +462,7 @@ private: // wsrep::log_debug() << "Generate write set"; assert(transaction().active()); assert(err == 0); - int data(std::rand() % 10000000); + int data(std::rand() % params_.n_rows); std::ostringstream os; os << data; wsrep::key key(wsrep::key::exclusive); @@ -513,13 +517,13 @@ private: { wsrep::log() << "client: " << id().get() << " transactions: " << i - << " " << 100*double(i)/n_transactions_ << "%"; + << " " << 100*double(i)/params_.n_transactions << "%"; } } wsrep::default_mutex mutex_; dbms_server& server_; dbms_storage_engine::transaction se_trx_; - const size_t n_transactions_; + const dbms_simulator_params params_; struct stats stats_; }; @@ -529,7 +533,7 @@ void dbms_server::applier_thread() { wsrep::client_id client_id(last_client_id_.fetch_add(1) + 1); dbms_client applier(*this, client_id, - wsrep::client_context::m_applier, 0); + wsrep::client_context::m_applier, simulator_.params()); enum wsrep::provider::status ret(provider().run_applier(&applier)); wsrep::log() << "Applier thread exited with error code " << ret; } @@ -538,7 +542,9 @@ wsrep::client_context* dbms_server::local_client_context() { std::ostringstream id_os; size_t client_id(++last_client_id_); - return new dbms_client(*this, client_id, wsrep::client_context::m_replicating, 0); + return new dbms_client(*this, client_id, + wsrep::client_context::m_replicating, + simulator_.params()); } void dbms_server::start_clients() @@ -576,7 +582,7 @@ void dbms_server::start_client(size_t id) auto client(std::make_shared( *this, id, wsrep::client_context::m_replicating, - simulator_.params().n_transactions)); + simulator_.params())); clients_.push_back(client); client_threads_.push_back( boost::thread(&dbms_server::client_thread, this, client)); @@ -764,6 +770,8 @@ int main(int argc, char** argv) "number of clients to start per server") ("transactions", po::value(¶ms.n_transactions), "number of transactions run by a client") + ("rows", po::value(¶ms.n_rows), + "number of rows per table") ("alg-freq", po::value(¶ms.alg_freq), "ALG frequency") ("debug-log-level", po::value(¶ms.debug_log_level), diff --git a/include/wsrep/client_context.hpp b/include/wsrep/client_context.hpp index 9933fe4..f2a1cef 100644 --- a/include/wsrep/client_context.hpp +++ b/include/wsrep/client_context.hpp @@ -210,12 +210,6 @@ namespace wsrep */ enum after_statement_result after_statement(); - int start_transaction() - { - assert(state_ == s_exec); - return transaction_.start_transaction(); - } - int start_transaction(const wsrep::transaction_id& id) { assert(state_ == s_exec); diff --git a/include/wsrep/server_context.hpp b/include/wsrep/server_context.hpp index 5f49d64..18d0061 100644 --- a/include/wsrep/server_context.hpp +++ b/include/wsrep/server_context.hpp @@ -208,16 +208,23 @@ namespace wsrep * \param transaction_id Transaction ID of the SR transaction on the * origin server. */ - virtual client_context& streaming_applier_client_context( - const wsrep::id& server_id, - const wsrep::transaction_id& transaction_id - ) = 0; + virtual client_context* streaming_applier_client_context() = 0; - void insert_streaming_applier( + void start_streaming_applier( const wsrep::id&, const wsrep::transaction_id&, wsrep::client_context* client_context); + void stop_streaming_applier( + const wsrep::id&, const wsrep::transaction_id&); + /*! + * Return reference to streaming applier. + */ + client_context* find_streaming_applier(const wsrep::id&, + const wsrep::transaction_id&) const; + + virtual void log_dummy_write_set(wsrep::client_context&, + const wsrep::ws_meta&) = 0; /*! * Load WSRep provider. * diff --git a/include/wsrep/transaction_context.hpp b/include/wsrep/transaction_context.hpp index 595ec0f..4953b48 100644 --- a/include/wsrep/transaction_context.hpp +++ b/include/wsrep/transaction_context.hpp @@ -80,14 +80,6 @@ namespace wsrep bool pa_unsafe() const { return pa_unsafe_; } void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; } - // - int start_transaction() - { - assert(active() == false); - assert(ws_meta_.transaction_id() != transaction_id::invalid()); - return start_transaction(ws_meta_.transaction_id()); - } - int start_transaction(const wsrep::transaction_id& id); int start_transaction(const wsrep::ws_handle& ws_handle, diff --git a/src/server_context.cpp b/src/server_context.cpp index 1b28210..d793410 100644 --- a/src/server_context.cpp +++ b/src/server_context.cpp @@ -122,7 +122,6 @@ int wsrep::server_context::on_apply( // wsrep::log_debug() << "server_context::on apply flags: " // << flags_to_string(ws_meta.flags()); assert(ws_handle.opaque()); - assert(ws_meta.flags()); bool not_replaying(txc.state() != wsrep::transaction_context::s_replaying); @@ -164,12 +163,91 @@ int wsrep::server_context::on_apply( assert(ret || txc.state() == wsrep::transaction_context::s_committed); } + else if (starts_transaction(ws_meta.flags())) + { + assert(not_replaying); + assert(find_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id()) == 0); + wsrep::client_context* sac(streaming_applier_client_context()); + start_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id(), sac); + sac->start_transaction(ws_handle, ws_meta); + { + // TODO: Client context switch will be ultimately + // implemented by the application. Therefore need to + // introduce a virtual method call which takes + // both original and sac client contexts as argument + // and a functor which does the applying. + wsrep::client_context_switch sw(client_context, *sac); + sac->before_command(); + sac->before_statement(); + sac->apply(data); + sac->after_statement(); + sac->after_command_before_result(); + sac->after_command_after_result(); + } + log_dummy_write_set(client_context, ws_meta); + } + else if (ws_meta.flags() == 0) + { + wsrep::client_context* sac( + find_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id())); + if (sac == 0) + { + // It is possible that rapid group membership changes + // may cause streaming transaction be rolled back before + // commit fragment comes in. Although this is a valid + // situation, log a warning if a sac cannot be found as + // it may be an indication of a bug too. + wsrep::log_warning() << "Could not find applier context for " + << ws_meta.server_id() + << ": " << ws_meta.transaction_id(); + } + else + { + wsrep::client_context_switch(client_context, *sac); + sac->before_command(); + sac->before_statement(); + ret = sac->apply(data); + sac->after_statement(); + sac->after_command_before_result(); + sac->after_command_after_result(); + } + log_dummy_write_set(client_context, ws_meta); + } else if (commits_transaction(ws_meta.flags())) { if (not_replaying) { - // SR commit not implemented yet - assert(0); + wsrep::client_context* sac( + find_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id())); + assert(sac); + if (sac == 0) + { + // It is possible that rapid group membership changes + // may cause streaming transaction be rolled back before + // commit fragment comes in. Although this is a valid + // situation, log a warning if a sac cannot be found as + // it may be an indication of a bug too. + wsrep::log_warning() << "Could not find applier context for " + << ws_meta.server_id() + << ": " << ws_meta.transaction_id(); + } + else + { + wsrep::client_context_switch(client_context, *sac); + sac->before_command(); + sac->before_statement(); + ret = sac->commit(); + sac->after_statement(); + sac->after_command_before_result(); + sac->after_command_after_result(); + stop_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id()); + delete sac; + } } else { @@ -198,7 +276,7 @@ bool wsrep::server_context::statement_allowed_for_streaming( return false; } -void wsrep::server_context::insert_streaming_applier( +void wsrep::server_context::start_streaming_applier( const wsrep::id& server_id, const wsrep::transaction_id& transaction_id, wsrep::client_context* client_context) @@ -213,6 +291,33 @@ void wsrep::server_context::insert_streaming_applier( } } +void wsrep::server_context::stop_streaming_applier( + const wsrep::id& server_id, + const wsrep::transaction_id& transaction_id) +{ + streaming_appliers_map::iterator i( + streaming_appliers_.find(std::make_pair(server_id, transaction_id))); + assert(i != streaming_appliers_.end()); + if (i == streaming_appliers_.end()) + { + wsrep::log_warning() << "Could not find streaming applier for " + << server_id << ":" << transaction_id; + } + else + { + streaming_appliers_.erase(i); + } +} + +wsrep::client_context* wsrep::server_context::find_streaming_applier( + const wsrep::id& server_id, + const wsrep::transaction_id& transaction_id) const +{ + streaming_appliers_map::const_iterator i( + streaming_appliers_.find(std::make_pair(server_id, transaction_id))); + return (i == streaming_appliers_.end() ? 0 : i->second); +} + // Private void wsrep::server_context::state( diff --git a/src/transaction_context.cpp b/src/transaction_context.cpp index 09b8cdf..cb209be 100644 --- a/src/transaction_context.cpp +++ b/src/transaction_context.cpp @@ -71,6 +71,7 @@ int wsrep::transaction_context::start_transaction( { assert(ws_meta.flags()); assert(active() == false); + id_ = ws_meta.transaction_id(); assert(client_context_.mode() == wsrep::client_context::m_applier); state_ = s_executing; ws_handle_ = ws_handle; @@ -900,10 +901,11 @@ int wsrep::transaction_context::certify_commit( void wsrep::transaction_context::streaming_rollback() { assert(streaming_context_.rolled_back() == false); - wsrep::client_context& applier_context( - client_context_.server_context().streaming_applier_client_context( - client_context_.server_context().id(), id())); - applier_context.adopt_transaction(*this); + wsrep::client_context* sac( + client_context_.server_context().streaming_applier_client_context()); + client_context_.server_context().start_streaming_applier( + client_context_.server_context().id(), id(), sac); + sac->adopt_transaction(*this); streaming_context_.cleanup(); // Replicate rollback fragment provider_.rollback(id_.get()); diff --git a/test/client_context_fixture.hpp b/test/client_context_fixture.hpp index 31871a9..d78b70a 100644 --- a/test/client_context_fixture.hpp +++ b/test/client_context_fixture.hpp @@ -108,8 +108,6 @@ namespace wsrep::provider::flag::start_transaction | wsrep::provider::flag::commit); BOOST_REQUIRE(cc.start_transaction(ws_handle, ws_meta) == 0); - BOOST_REQUIRE(tc.active() == false); - BOOST_REQUIRE(cc.start_transaction() == 0); BOOST_REQUIRE(tc.active() == true); BOOST_REQUIRE(tc.certified() == true); BOOST_REQUIRE(tc.ordered() == true); @@ -138,8 +136,6 @@ namespace wsrep::provider::flag::start_transaction | wsrep::provider::flag::commit); BOOST_REQUIRE(cc.start_transaction(ws_handle, ws_meta) == 0); - BOOST_REQUIRE(tc.active() == false); - BOOST_REQUIRE(cc.start_transaction() == 0); BOOST_REQUIRE(tc.active() == true); BOOST_REQUIRE(tc.certified() == true); BOOST_REQUIRE(tc.ordered() == true); diff --git a/test/fake_server_context.hpp b/test/fake_server_context.hpp index 395da8c..bfcdb09 100644 --- a/test/fake_server_context.hpp +++ b/test/fake_server_context.hpp @@ -33,13 +33,15 @@ namespace wsrep return new wsrep::fake_client_context(*this, ++last_client_id_, wsrep::client_context::m_local); } - wsrep::client_context& streaming_applier_client_context( - const wsrep::id& server_id, - const wsrep::transaction_id& transaction_id) + wsrep::client_context* streaming_applier_client_context() { - wsrep::client_context* sac(new wsrep::fake_client_context(*this, ++last_client_id_, wsrep::client_context::m_applier)); - insert_streaming_applier(server_id, transaction_id, sac); - return *sac; + return new wsrep::fake_client_context( + *this, ++last_client_id_, wsrep::client_context::m_applier); + } + void log_dummy_write_set(wsrep::client_context&, const wsrep::ws_meta&) + WSREP_OVERRIDE + { + // } void on_connect() WSREP_OVERRIDE { } void wait_until_connected() WSREP_OVERRIDE { } diff --git a/test/server_context_test.cpp b/test/server_context_test.cpp index 04c9d97..ed9f4a5 100644 --- a/test/server_context_test.cpp +++ b/test/server_context_test.cpp @@ -24,7 +24,6 @@ namespace wsrep::provider::flag::start_transaction | wsrep::provider::flag::commit) { - cc.start_transaction(ws_handle, ws_meta); } wsrep::fake_server_context sc; wsrep::fake_client_context cc; @@ -84,6 +83,40 @@ BOOST_FIXTURE_TEST_CASE(server_context_applying_2pc_rollback, BOOST_REQUIRE(txc.state() == wsrep::transaction_context::s_aborted); } +BOOST_AUTO_TEST_CASE(server_context_streaming) +{ + wsrep::fake_server_context sc("s1", "s1", + wsrep::server_context::rm_sync); + wsrep::fake_client_context cc(sc, + wsrep::client_id(1), + wsrep::client_context::m_applier, + false); + wsrep::ws_handle ws_handle(1, (void*)1); + wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(1)), + wsrep::stid(wsrep::id("1"), 1, 1), + wsrep::seqno(0), + wsrep::provider::flag::start_transaction); + BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta, + wsrep::const_buffer("1", 1)) == 0); + BOOST_REQUIRE(sc.find_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id())); + ws_meta = wsrep::ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(2)), + wsrep::stid(wsrep::id("1"), 1, 1), + wsrep::seqno(1), + 0); + BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta, + wsrep::const_buffer("1", 1)) == 0); + ws_meta = wsrep::ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(2)), + wsrep::stid(wsrep::id("1"), 1, 1), + wsrep::seqno(1), + wsrep::provider::flag::commit); + BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta, + wsrep::const_buffer("1", 1)) == 0); + BOOST_REQUIRE(sc.find_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id()) == 0); +} + + BOOST_AUTO_TEST_CASE(server_context_state_strings) { BOOST_REQUIRE(wsrep::to_string(