From 3de594b662752a741391c8461c623baf14160172 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Thu, 21 Nov 2024 14:29:11 +0200 Subject: [PATCH] Add application defined sequential consistency for certification Client state methods before_prepare() and before_commit() accept a callback which is called by the provider after it can guarantee sequential consistency. This allows application threads which wish to maintain sequential consistency to enter before_prepare() and before_commit() calls concurrently without waiting the prior call to finish. --- include/wsrep/client_state.hpp | 42 ++++++++++++++++++++++++++++++++-- include/wsrep/provider.hpp | 33 +++++++++++++++++++++++--- include/wsrep/transaction.hpp | 8 ++++--- src/client_state.cpp | 8 +++---- src/transaction.cpp | 22 +++++++++--------- src/wsrep_provider_v26.cpp | 24 +++++++++++++++---- src/wsrep_provider_v26.hpp | 2 +- test/mock_provider.hpp | 3 ++- wsrep-API/v26 | 2 +- 9 files changed, 113 insertions(+), 31 deletions(-) diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index d8449d7..b81ee9a 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -413,11 +413,49 @@ namespace wsrep /** @name Commit ordering interface */ /** @{ */ - int before_prepare(); + + /** + * This method should be called before the transaction + * is prepared. This call certifies the transaction and + * assigns write set meta data. + * + * @param seq_cb Callback which is passed to underlying + * certify() call. See wsrep::provider::certify(). + * + * @return Zero on success, non-zero on failure. + */ + int before_prepare(const wsrep::provider::seq_cb_t* seq_cb); + + /** Same as before_prepare() above, but nullptr is passed + * to seq_cb. */ + int before_prepare() + { + return before_prepare(nullptr); + } int after_prepare(); - int before_commit(); + /** + * This method should be called before transaction is committed. + * This call makes the transaction to enter commit time + * critical section. The critical section is left by calling + * ordered_commit(). + * + * If before_prepare() is not called before this call, the + * before_prepare() is called internally. + * + * @param seq_cb Callback which is passed to underlying + * before_prepare() call. + * + * @return Zero on success, non-zero on failure. + */ + int before_commit(const wsrep::provider::seq_cb_t* seq_cb); + + /** Same as before_commit(), but nullptr is passed to seq_cb. */ + int before_commit() + { + return before_commit(nullptr); + } int ordered_commit(); diff --git a/include/wsrep/provider.hpp b/include/wsrep/provider.hpp index 5e82ecd..9b62079 100644 --- a/include/wsrep/provider.hpp +++ b/include/wsrep/provider.hpp @@ -332,10 +332,37 @@ namespace wsrep virtual int append_key(wsrep::ws_handle&, const wsrep::key&) = 0; virtual enum status append_data( wsrep::ws_handle&, const wsrep::const_buffer&) = 0; + + /** + * Callback for application defined sequential consistency. + * The provider will call + * the callback once it can guarantee sequential consistency. */ + typedef struct seq_cb { + /** Opaque caller context */ + void *ctx; + /** Function to be called by the provider when sequential + * consistency is guaranteed. */ + void (*fn)(void *ctx); + } seq_cb_t; + + /** + * Certify the write set. + * + * @param client_id[in] Id of the client session. + * @param ws_handle[in,out] Write set handle associated to the current + * transaction. + * @param flags[in] Flags associated to the write set (see struct flag). + * @param ws_meta[out] Write set meta data associated to the + * replicated write set. + * @param seq_cb[in] Optional callback for application defined + * sequential consistency. + * + * @return Status code defined in struct status. + */ virtual enum status - certify(wsrep::client_id, wsrep::ws_handle&, - int, - wsrep::ws_meta&) = 0; + certify(wsrep::client_id client_id, wsrep::ws_handle& ws_handle, + int flags, wsrep::ws_meta& ws_meta, const seq_cb_t* seq_cb) + = 0; /** * BF abort a transaction inside provider. * diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 3328c09..3e4ce7e 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -175,11 +175,12 @@ namespace wsrep int after_row(); - int before_prepare(wsrep::unique_lock&); + int before_prepare(wsrep::unique_lock&, + const wsrep::provider::seq_cb_t*); int after_prepare(wsrep::unique_lock&); - int before_commit(); + int before_commit(const wsrep::provider::seq_cb_t*); int ordered_commit(); @@ -248,7 +249,8 @@ namespace wsrep bool abort_or_interrupt(wsrep::unique_lock&); int streaming_step(wsrep::unique_lock&, bool force = false); int certify_fragment(wsrep::unique_lock&); - int certify_commit(wsrep::unique_lock&); + int certify_commit(wsrep::unique_lock&, + const wsrep::provider::seq_cb_t*); int append_sr_keys_for_commit(); int release_commit_order(wsrep::unique_lock&); void remove_fragments_in_storage_service_scope( diff --git a/src/client_state.cpp b/src/client_state.cpp index c6708f1..047e67a 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -365,12 +365,12 @@ int wsrep::client_state::next_fragment(const wsrep::ws_meta& meta) return transaction_.next_fragment(meta); } -int wsrep::client_state::before_prepare() +int wsrep::client_state::before_prepare(const wsrep::provider::seq_cb_t* seq_cb) { wsrep::unique_lock lock(mutex_); assert(owning_thread_id_ == wsrep::this_thread::get_id()); assert(state_ == s_exec); - return transaction_.before_prepare(lock); + return transaction_.before_prepare(lock, seq_cb); } int wsrep::client_state::after_prepare() @@ -381,11 +381,11 @@ int wsrep::client_state::after_prepare() return transaction_.after_prepare(lock); } -int wsrep::client_state::before_commit() +int wsrep::client_state::before_commit(const wsrep::provider::seq_cb_t* seq_cb) { assert(owning_thread_id_ == wsrep::this_thread::get_id()); assert(state_ == s_exec || mode_ == m_local); - return transaction_.before_commit(); + return transaction_.before_commit(seq_cb); } int wsrep::client_state::ordered_commit() diff --git a/src/transaction.cpp b/src/transaction.cpp index c5e5bf3..3688ad7 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -273,8 +273,8 @@ int wsrep::transaction::after_row() return ret; } -int wsrep::transaction::before_prepare( - wsrep::unique_lock& lock) +int wsrep::transaction::before_prepare(wsrep::unique_lock& lock, + const wsrep::provider::seq_cb_t* seq_cb) { assert(lock.owns_lock()); int ret(0); @@ -349,7 +349,7 @@ int wsrep::transaction::before_prepare( } else { - ret = certify_commit(lock); + ret = certify_commit(lock, seq_cb); } assert((ret == 0 && state() == s_preparing) || @@ -445,7 +445,7 @@ int wsrep::transaction::after_prepare( return ret; } -int wsrep::transaction::before_commit() +int wsrep::transaction::before_commit(const wsrep::provider::seq_cb* seq_cb) { int ret(1); @@ -465,7 +465,7 @@ int wsrep::transaction::before_commit() case wsrep::client_state::m_local: if (state() == s_executing) { - ret = before_prepare(lock) || after_prepare(lock); + ret = before_prepare(lock, seq_cb) || after_prepare(lock); assert((ret == 0 && (state() == s_committing || state() == s_prepared)) || @@ -495,7 +495,7 @@ int wsrep::transaction::before_commit() if (ret == 0 && state() == s_prepared) { - ret = certify_commit(lock); + ret = certify_commit(lock, nullptr); assert((ret == 0 && state() == s_committing) || (state() == s_must_abort || state() == s_must_replay || @@ -543,7 +543,7 @@ int wsrep::transaction::before_commit() } else if (state() == s_executing || state() == s_replaying) { - ret = before_prepare(lock) || after_prepare(lock); + ret = before_prepare(lock, nullptr) || after_prepare(lock); } else { @@ -1195,7 +1195,7 @@ int wsrep::transaction::commit_or_rollback_by_xid(const wsrep::xid& xid, provider().certify(client_state_.id(), ws_handle_, flags(), - meta)); + meta, nullptr)); int ret; if (cert_ret == wsrep::provider::success) @@ -1622,7 +1622,7 @@ int wsrep::transaction::certify_fragment( cert_ret = provider().certify(client_state_.id(), ws_handle_, flags(), - sr_ws_meta); + sr_ws_meta, nullptr); client_service_.debug_crash( "crash_replicate_fragment_after_certify"); @@ -1744,7 +1744,7 @@ int wsrep::transaction::certify_fragment( } int wsrep::transaction::certify_commit( - wsrep::unique_lock& lock) + wsrep::unique_lock& lock, const provider::seq_cb_t* seq_cb) { assert(lock.owns_lock()); assert(active()); @@ -1828,7 +1828,7 @@ int wsrep::transaction::certify_commit( cert_ret(provider().certify(client_state_.id(), ws_handle_, flags(), - ws_meta_)); + ws_meta_, seq_cb)); client_service_.debug_sync("wsrep_after_certification"); lock.lock(); diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index 8fa9fee..80240fe 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -674,6 +674,7 @@ namespace } wsrep_node_isolation_mode_set_fn_v1 node_isolation_mode_set; + wsrep_certify_fn_v1 certify_v1; } @@ -721,6 +722,9 @@ void wsrep::wsrep_provider_v26::init_services( node_isolation_mode_set = wsrep_impl::resolve_function( wsrep_->dlh, WSREP_NODE_ISOLATION_MODE_SET_V1); + + certify_v1 = wsrep_impl::resolve_function( + wsrep_->dlh, WSREP_CERTIFY_V1); } void wsrep::wsrep_provider_v26::deinit_services() @@ -922,14 +926,24 @@ enum wsrep::provider::status wsrep::wsrep_provider_v26::certify(wsrep::client_id client_id, wsrep::ws_handle& ws_handle, int flags, - wsrep::ws_meta& ws_meta) + wsrep::ws_meta& ws_meta, + const seq_cb_t* seq_cb) { mutable_ws_handle mwsh(ws_handle); mutable_ws_meta mmeta(ws_meta, flags); - return map_return_value( - wsrep_->certify(wsrep_, client_id.get(), mwsh.native(), - mmeta.native_flags(), - mmeta.native())); + if (seq_cb && certify_v1) + { + wsrep_seq_cb_t wseq_cb{seq_cb->ctx, seq_cb->fn}; + return map_return_value(certify_v1(wsrep_, client_id.get(), + mwsh.native(), mmeta.native_flags(), + mmeta.native(), &wseq_cb)); + } + else + { + return map_return_value( + wsrep_->certify(wsrep_, client_id.get(), mwsh.native(), + mmeta.native_flags(), mmeta.native())); + } } enum wsrep::provider::status diff --git a/src/wsrep_provider_v26.hpp b/src/wsrep_provider_v26.hpp index 2e03c55..0fc8259 100644 --- a/src/wsrep_provider_v26.hpp +++ b/src/wsrep_provider_v26.hpp @@ -59,7 +59,7 @@ namespace wsrep enum wsrep::provider::status certify(wsrep::client_id, wsrep::ws_handle&, int, - wsrep::ws_meta&) WSREP_OVERRIDE; + wsrep::ws_meta&, const seq_cb_t*) WSREP_OVERRIDE; enum wsrep::provider::status bf_abort(wsrep::seqno, wsrep::transaction_id, diff --git a/test/mock_provider.hpp b/test/mock_provider.hpp index b99ddeb..75233a7 100644 --- a/test/mock_provider.hpp +++ b/test/mock_provider.hpp @@ -79,7 +79,8 @@ namespace wsrep certify(wsrep::client_id client_id, wsrep::ws_handle& ws_handle, int flags, - wsrep::ws_meta& ws_meta) + wsrep::ws_meta& ws_meta, + const seq_cb* /* Ignored in unit tests. */) WSREP_OVERRIDE { ws_handle = wsrep::ws_handle(ws_handle.transaction_id(), (void*)1); diff --git a/wsrep-API/v26 b/wsrep-API/v26 index 427c73c..12c02f5 160000 --- a/wsrep-API/v26 +++ b/wsrep-API/v26 @@ -1 +1 @@ -Subproject commit 427c73c5c8c443765ec16bfc70d94a65a7fca64c +Subproject commit 12c02f5fdafec70e55d52536b4068e7728675f01