diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index d8449d7..6d247a7 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -354,6 +354,14 @@ namespace wsrep */ int after_row(); + /** + * Force a streaming step + * + * This method can be used to replicate a fragment in the + * current context. + */ + int stream(); + /** * Set streaming parameters. * diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 3328c09..0cea1bc 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -175,6 +175,8 @@ namespace wsrep int after_row(); + int stream(); + int before_prepare(wsrep::unique_lock&); int after_prepare(wsrep::unique_lock&); diff --git a/src/client_state.cpp b/src/client_state.cpp index 99c4222..68e23e9 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -336,6 +336,13 @@ int wsrep::client_state::after_row() : 0); } +int wsrep::client_state::stream() +{ + assert(mode_ == m_local); + assert(state_ == s_exec); + return transaction_.stream(); +} + void wsrep::client_state::fragment_applied(wsrep::seqno seqno) { assert(mode_ == m_high_priority); diff --git a/src/transaction.cpp b/src/transaction.cpp index 2e1a984..a555de9 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -273,6 +273,15 @@ int wsrep::transaction::after_row() return ret; } +int wsrep::transaction::stream() +{ + wsrep::unique_lock lock(client_state_.mutex()); + debug_log_state("stream_enter"); + int ret(streaming_step(lock, true)); + debug_log_state("stream_leave"); + return ret; +} + int wsrep::transaction::before_prepare( wsrep::unique_lock& lock) {