mirror of
https://github.com/codership/wsrep-lib.git
synced 2025-07-28 20:02:00 +03:00
This is the first version of the SR speedup feature for MariaDB 10.6.
Now all galera_sr tests succeed when SR speedup is disabled and most galera_sr MTR tests succeed when SR speedup up is enabled. The failing tests are listed in "skipped_tests" file. The following MTR test suite run succeeds: (cd mysql-test; ./mysql-test-run.pl --mysqld="--wsrep_SR_store=undolog" --suite=galera_sr --skip-test-list=../skipped_tests --force)
This commit is contained in:
@ -50,6 +50,19 @@ option(WSREP_LIB_WITH_COVERAGE "Compile with coverage instrumentation" OFF)
|
|||||||
option(WSREP_LIB_STRICT_BUILD_FLAGS "Compile with strict build flags" OFF)
|
option(WSREP_LIB_STRICT_BUILD_FLAGS "Compile with strict build flags" OFF)
|
||||||
option(WSREP_LIB_MAINTAINER_MODE "Fail compilation on any warnings" OFF)
|
option(WSREP_LIB_MAINTAINER_MODE "Fail compilation on any warnings" OFF)
|
||||||
|
|
||||||
|
#
|
||||||
|
# Option for WSREP's Streaming Replication speedup
|
||||||
|
#
|
||||||
|
option(WITH_WSREP_SR_SPEEDUP "WSREP's Streaming Replication speedup" ${WITH_WSREP})
|
||||||
|
#option(WITH_WSREP_SR_SPEEDUP "WSREP's Streaming Replication speedup" OFF)
|
||||||
|
## add_definitions(-DWITH_WSREP_SR_SPEEDUP_CHECK)
|
||||||
|
#option(WITH_WSREP_SR_SPEEDUP_TIME "Timing for Streaming Replication speedup" ON)
|
||||||
|
if (WITH_WSREP_SR_SPEEDUP)
|
||||||
|
ADD_DEFINITIONS(-DWITH_WSREP_SR_SPEEDUP)
|
||||||
|
ADD_DEFINITIONS(-DDEBUG_SR_SPEEDUP)
|
||||||
|
endif()
|
||||||
|
|
||||||
|
|
||||||
# Compiler options
|
# Compiler options
|
||||||
|
|
||||||
# Set std to C++0x/C++11 if superproject has not set standard yet
|
# Set std to C++0x/C++11 if superproject has not set standard yet
|
||||||
|
@ -217,6 +217,14 @@ namespace wsrep
|
|||||||
* been enabled.
|
* been enabled.
|
||||||
*/
|
*/
|
||||||
virtual void debug_crash(const char* crash_point) = 0;
|
virtual void debug_crash(const char* crash_point) = 0;
|
||||||
|
#ifdef WITH_WSREP_SR_SPEEDUP
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the binlog cache for the currently execution
|
||||||
|
* transaction or a NULL pointer if no such cache exists.
|
||||||
|
*/
|
||||||
|
virtual void *get_binlog_cache() = 0;
|
||||||
|
#endif /* WITH_WSREP_SR_SPEEDUP */
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -926,7 +926,16 @@ namespace wsrep
|
|||||||
{
|
{
|
||||||
return transaction_;
|
return transaction_;
|
||||||
}
|
}
|
||||||
|
#ifdef WITH_WSREP_SR_SPEEDUP
|
||||||
|
/**
|
||||||
|
* Return a reference to the transaction associated
|
||||||
|
* with the client state.
|
||||||
|
*/
|
||||||
|
wsrep::transaction& transaction()
|
||||||
|
{
|
||||||
|
return transaction_;
|
||||||
|
}
|
||||||
|
#endif /* WITH_WSREP_SR_SPEEDUP */
|
||||||
const wsrep::ws_meta& toi_meta() const
|
const wsrep::ws_meta& toi_meta() const
|
||||||
{
|
{
|
||||||
return toi_meta_;
|
return toi_meta_;
|
||||||
|
@ -98,6 +98,9 @@ namespace wsrep
|
|||||||
const wsrep::ws_handle& ws_handle,
|
const wsrep::ws_handle& ws_handle,
|
||||||
const wsrep::ws_meta& ws_meta,
|
const wsrep::ws_meta& ws_meta,
|
||||||
const wsrep::const_buffer& data,
|
const wsrep::const_buffer& data,
|
||||||
|
#ifdef WITH_WSREP_SR_SPEEDUP
|
||||||
|
int sr_store,
|
||||||
|
#endif /* WITH_WSREP_SR_SPEEDUP */
|
||||||
const wsrep::xid& xid) = 0;
|
const wsrep::xid& xid) = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -144,8 +147,14 @@ namespace wsrep
|
|||||||
*
|
*
|
||||||
* @return Zero in case of success, non-zero in case of failure
|
* @return Zero in case of success, non-zero in case of failure
|
||||||
*/
|
*/
|
||||||
|
#ifdef WITH_WSREP_SR_SPEEDUP
|
||||||
|
virtual int rollback(const wsrep::ws_handle& ws_handle,
|
||||||
|
const wsrep::ws_meta& ws_meta,
|
||||||
|
bool skip_rollback = false) = 0;
|
||||||
|
#else
|
||||||
virtual int rollback(const wsrep::ws_handle& ws_handle,
|
virtual int rollback(const wsrep::ws_handle& ws_handle,
|
||||||
const wsrep::ws_meta& ws_meta) = 0;
|
const wsrep::ws_meta& ws_meta) = 0;
|
||||||
|
#endif /* WITH_WSREP_SR_SPEEDUP */
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Apply a TOI operation.
|
* Apply a TOI operation.
|
||||||
|
@ -65,8 +65,14 @@ namespace wsrep
|
|||||||
wsrep::transaction_id client_id,
|
wsrep::transaction_id client_id,
|
||||||
int flags,
|
int flags,
|
||||||
const wsrep::const_buffer& data,
|
const wsrep::const_buffer& data,
|
||||||
|
#ifdef WITH_WSREP_SR_SPEEDUP
|
||||||
|
int sr_store,
|
||||||
|
size_t offset,
|
||||||
|
const wsrep::xid& xid,
|
||||||
|
void *binlog_cache) = 0;
|
||||||
|
#else
|
||||||
const wsrep::xid& xid) = 0;
|
const wsrep::xid& xid) = 0;
|
||||||
|
#endif /* WITH_WSREP_SR_SPEEDUP */
|
||||||
/**
|
/**
|
||||||
* Update fragment meta data after certification process.
|
* Update fragment meta data after certification process.
|
||||||
*/
|
*/
|
||||||
@ -77,6 +83,10 @@ namespace wsrep
|
|||||||
* adopted a transaction prior this call.
|
* adopted a transaction prior this call.
|
||||||
*/
|
*/
|
||||||
virtual int remove_fragments() = 0;
|
virtual int remove_fragments() = 0;
|
||||||
|
#ifdef WITH_WSREP_SR_SPEEDUP
|
||||||
|
|
||||||
|
int set_fragments_from_table() {return 0;};
|
||||||
|
#endif /* WITH_WSREP_SR_SPEEDUP */
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Commit the transaction.
|
* Commit the transaction.
|
||||||
|
@ -189,6 +189,17 @@ namespace wsrep
|
|||||||
unit_counter_ = 0;
|
unit_counter_ = 0;
|
||||||
log_position_ = 0;
|
log_position_ = 0;
|
||||||
}
|
}
|
||||||
|
#ifdef WITH_WSREP_SR_SPEEDUP
|
||||||
|
void set_sr_store(int store_type)
|
||||||
|
{
|
||||||
|
sr_store_ = store_type;
|
||||||
|
}
|
||||||
|
|
||||||
|
int get_sr_store() const
|
||||||
|
{
|
||||||
|
return (sr_store_);
|
||||||
|
}
|
||||||
|
#endif /* WITH_WSREP_SR_SPEEDUP */
|
||||||
private:
|
private:
|
||||||
|
|
||||||
void check_fragment_seqno(wsrep::seqno seqno WSREP_UNUSED)
|
void check_fragment_seqno(wsrep::seqno seqno WSREP_UNUSED)
|
||||||
@ -204,6 +215,9 @@ namespace wsrep
|
|||||||
size_t fragment_size_;
|
size_t fragment_size_;
|
||||||
size_t unit_counter_;
|
size_t unit_counter_;
|
||||||
size_t log_position_;
|
size_t log_position_;
|
||||||
|
#ifdef WITH_WSREP_SR_SPEEDUP
|
||||||
|
int sr_store_;
|
||||||
|
#endif /* WITH_WSREP_SR_SPEEDUP */
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -142,6 +142,10 @@ namespace wsrep
|
|||||||
void xa_detach();
|
void xa_detach();
|
||||||
|
|
||||||
int xa_replay(wsrep::unique_lock<wsrep::mutex>&);
|
int xa_replay(wsrep::unique_lock<wsrep::mutex>&);
|
||||||
|
#ifdef WITH_WSREP_SR_SPEEDUP
|
||||||
|
int set_fragments_from_table();
|
||||||
|
void *get_binlog_cache();
|
||||||
|
#endif /* WITH_WSREP_SR_SPEEDUP */
|
||||||
|
|
||||||
bool pa_unsafe() const { return pa_unsafe_; }
|
bool pa_unsafe() const { return pa_unsafe_; }
|
||||||
void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; }
|
void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; }
|
||||||
|
@ -87,15 +87,29 @@ static int apply_fragment(wsrep::server_state& server_state,
|
|||||||
wsrep::high_priority_service* streaming_applier,
|
wsrep::high_priority_service* streaming_applier,
|
||||||
const wsrep::ws_handle& ws_handle,
|
const wsrep::ws_handle& ws_handle,
|
||||||
const wsrep::ws_meta& ws_meta,
|
const wsrep::ws_meta& ws_meta,
|
||||||
const wsrep::const_buffer& data)
|
const wsrep::const_buffer& data, int line)
|
||||||
{
|
{
|
||||||
int ret(0);
|
int ret(0);
|
||||||
int apply_err;
|
int apply_err;
|
||||||
wsrep::mutable_buffer err;
|
wsrep::mutable_buffer err;
|
||||||
|
#ifdef WITH_WSREP_SR_SPEEDUP
|
||||||
|
int sr_store = streaming_applier->transaction().streaming_context().
|
||||||
|
get_sr_store();
|
||||||
|
#endif /* WITH_WSREP_SR_SPEEDUP */
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "server_state: apply_fragment, line = " << line;
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
{
|
{
|
||||||
wsrep::high_priority_switch sw(high_priority_service,
|
wsrep::high_priority_switch sw(high_priority_service,
|
||||||
*streaming_applier);
|
*streaming_applier);
|
||||||
apply_err = streaming_applier->apply_write_set(ws_meta, data, err);
|
apply_err = streaming_applier->apply_write_set(ws_meta, data, err);
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "server_state: apply_fragment, line = "
|
||||||
|
<< __LINE__ << ", apply_err = " << apply_err
|
||||||
|
<< ", sr_store = "
|
||||||
|
<< streaming_applier->transaction(
|
||||||
|
).streaming_context().get_sr_store();
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
if (!apply_err)
|
if (!apply_err)
|
||||||
{
|
{
|
||||||
assert(err.size() == 0);
|
assert(err.size() == 0);
|
||||||
@ -124,7 +138,10 @@ static int apply_fragment(wsrep::server_state& server_state,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "server_state: apply_fragment, line = "
|
||||||
|
<< __LINE__ << ", ret = " << ret;
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
if (!ret)
|
if (!ret)
|
||||||
{
|
{
|
||||||
if (!apply_err)
|
if (!apply_err)
|
||||||
@ -132,7 +149,11 @@ static int apply_fragment(wsrep::server_state& server_state,
|
|||||||
high_priority_service.debug_crash("crash_apply_cb_before_append_frag");
|
high_priority_service.debug_crash("crash_apply_cb_before_append_frag");
|
||||||
const wsrep::xid xid(streaming_applier->transaction().xid());
|
const wsrep::xid xid(streaming_applier->transaction().xid());
|
||||||
ret = high_priority_service.append_fragment_and_commit(
|
ret = high_priority_service.append_fragment_and_commit(
|
||||||
ws_handle, ws_meta, data, xid);
|
#ifdef WITH_WSREP_SR_SPEEDUP
|
||||||
|
ws_handle, ws_meta, data, sr_store, xid);
|
||||||
|
#else
|
||||||
|
ws_handle, ws_meta, data, xid);
|
||||||
|
#endif /* WITH_WSREP_SR_SPEEDUP */
|
||||||
high_priority_service.debug_crash("crash_apply_cb_after_append_frag");
|
high_priority_service.debug_crash("crash_apply_cb_after_append_frag");
|
||||||
ret = ret || (high_priority_service.after_apply(), 0);
|
ret = ret || (high_priority_service.after_apply(), 0);
|
||||||
}
|
}
|
||||||
@ -357,7 +378,7 @@ static int apply_write_set(wsrep::server_state& server_state,
|
|||||||
sa,
|
sa,
|
||||||
ws_handle,
|
ws_handle,
|
||||||
ws_meta,
|
ws_meta,
|
||||||
data);
|
data, __LINE__);
|
||||||
}
|
}
|
||||||
else if (ws_meta.flags() == 0 || wsrep::prepares_transaction(ws_meta.flags()))
|
else if (ws_meta.flags() == 0 || wsrep::prepares_transaction(ws_meta.flags()))
|
||||||
{
|
{
|
||||||
@ -386,7 +407,7 @@ static int apply_write_set(wsrep::server_state& server_state,
|
|||||||
sa,
|
sa,
|
||||||
ws_handle,
|
ws_handle,
|
||||||
ws_meta,
|
ws_meta,
|
||||||
data);
|
data, __LINE__);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (wsrep::commits_transaction(ws_meta.flags()))
|
else if (wsrep::commits_transaction(ws_meta.flags()))
|
||||||
@ -1145,6 +1166,10 @@ void wsrep::server_state::start_streaming_client(
|
|||||||
WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
|
WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
|
||||||
wsrep::log::debug_level_server_state,
|
wsrep::log::debug_level_server_state,
|
||||||
"Start streaming client: " << client_state->id());
|
"Start streaming client: " << client_state->id());
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "server_state: start_streaming_client";
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
|
|
||||||
if (streaming_clients_.insert(
|
if (streaming_clients_.insert(
|
||||||
std::make_pair(client_state->id(), client_state)).second == false)
|
std::make_pair(client_state->id(), client_state)).second == false)
|
||||||
{
|
{
|
||||||
@ -1246,6 +1271,10 @@ void wsrep::server_state::start_streaming_applier(
|
|||||||
wsrep::high_priority_service* sa)
|
wsrep::high_priority_service* sa)
|
||||||
{
|
{
|
||||||
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "start_streaming_applier: wsrep_trx_id = "
|
||||||
|
<< transaction_id;
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
if (streaming_appliers_.insert(
|
if (streaming_appliers_.insert(
|
||||||
std::make_pair(std::make_pair(server_id, transaction_id),
|
std::make_pair(std::make_pair(server_id, transaction_id),
|
||||||
sa)).second == false)
|
sa)).second == false)
|
||||||
@ -1559,6 +1588,9 @@ void wsrep::server_state::close_orphaned_sr_transactions(
|
|||||||
void wsrep::server_state::close_transactions_at_disconnect(
|
void wsrep::server_state::close_transactions_at_disconnect(
|
||||||
wsrep::high_priority_service& high_priority_service)
|
wsrep::high_priority_service& high_priority_service)
|
||||||
{
|
{
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << __FUNCTION__ << "(" << __LINE__ << ")";
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
// Close streaming applier without removing fragments
|
// Close streaming applier without removing fragments
|
||||||
// from fragment storage. When the server is started again,
|
// from fragment storage. When the server is started again,
|
||||||
// it must be able to recover ongoing streaming transactions.
|
// it must be able to recover ongoing streaming transactions.
|
||||||
@ -1569,9 +1601,19 @@ void wsrep::server_state::close_transactions_at_disconnect(
|
|||||||
{
|
{
|
||||||
wsrep::high_priority_switch sw(high_priority_service,
|
wsrep::high_priority_switch sw(high_priority_service,
|
||||||
*streaming_applier);
|
*streaming_applier);
|
||||||
streaming_applier->rollback(
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
wsrep::ws_handle(), wsrep::ws_meta());
|
wsrep::log_info() << __FUNCTION__ << "(" << __LINE__
|
||||||
streaming_applier->after_apply();
|
<< ") skipped rollback for wsrep::trx = "
|
||||||
|
<< streaming_applier->transaction().id();
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
|
#ifdef WITH_WSREP_SR_SPEEDUP
|
||||||
|
streaming_applier->rollback(
|
||||||
|
wsrep::ws_handle(), wsrep::ws_meta(), true);
|
||||||
|
#else
|
||||||
|
streaming_applier->rollback(
|
||||||
|
wsrep::ws_handle(), wsrep::ws_meta());
|
||||||
|
#endif /* WITH_WSREP_SR_SPEEDUP */
|
||||||
|
streaming_applier->after_apply();
|
||||||
}
|
}
|
||||||
streaming_appliers_.erase(i++);
|
streaming_appliers_.erase(i++);
|
||||||
server_service_.release_high_priority_service(streaming_applier);
|
server_service_.release_high_priority_service(streaming_applier);
|
||||||
|
@ -29,6 +29,12 @@
|
|||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
|
||||||
|
#ifdef WITH_WSREP_SR_SPEEDUP
|
||||||
|
extern void *wsrep_get_current_thd();
|
||||||
|
extern int fragment_cache_remove_transaction(
|
||||||
|
const wsrep::id&, wsrep::transaction_id);
|
||||||
|
#endif /* WITH_WSREP_SR_SPEEDUP */
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
class storage_service_deleter
|
class storage_service_deleter
|
||||||
@ -129,6 +135,10 @@ int wsrep::transaction::start_transaction(
|
|||||||
state_hist_.clear();
|
state_hist_.clear();
|
||||||
ws_handle_ = wsrep::ws_handle(id);
|
ws_handle_ = wsrep::ws_handle(id);
|
||||||
flags(wsrep::provider::flag::start_transaction);
|
flags(wsrep::provider::flag::start_transaction);
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "wsrep::transaction::" << __FUNCTION__
|
||||||
|
<< ": id = " << id.get();
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
switch (client_state_.mode())
|
switch (client_state_.mode())
|
||||||
{
|
{
|
||||||
case wsrep::client_state::m_high_priority:
|
case wsrep::client_state::m_high_priority:
|
||||||
@ -175,6 +185,10 @@ int wsrep::transaction::start_transaction(
|
|||||||
certified_ = true;
|
certified_ = true;
|
||||||
}
|
}
|
||||||
debug_log_state("start_transaction leave");
|
debug_log_state("start_transaction leave");
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "wsrep::transaction::" << __FUNCTION__
|
||||||
|
<< ": id = " << id().get();
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -202,6 +216,12 @@ void wsrep::transaction::fragment_applied(wsrep::seqno seqno)
|
|||||||
{
|
{
|
||||||
assert(active());
|
assert(active());
|
||||||
streaming_context_.applied(seqno);
|
streaming_context_.applied(seqno);
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "wsrep::transaction::fragment_applied: "
|
||||||
|
<< "SR context = " << (void *)&streaming_context_
|
||||||
|
<< ", size = " << streaming_context_.fragments_stored()
|
||||||
|
<< ", seqno = " << seqno.get();
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
}
|
}
|
||||||
|
|
||||||
int wsrep::transaction::prepare_for_ordering(
|
int wsrep::transaction::prepare_for_ordering(
|
||||||
@ -250,6 +270,9 @@ int wsrep::transaction::append_key(const wsrep::key& key)
|
|||||||
|
|
||||||
int wsrep::transaction::append_data(const wsrep::const_buffer& data)
|
int wsrep::transaction::append_data(const wsrep::const_buffer& data)
|
||||||
{
|
{
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << __FUNCTION__ << ": size = " << data.size();
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
|
|
||||||
return provider().append_data(ws_handle_, data);
|
return provider().append_data(ws_handle_, data);
|
||||||
}
|
}
|
||||||
@ -262,6 +285,9 @@ int wsrep::transaction::after_row()
|
|||||||
if (streaming_context_.fragment_size() &&
|
if (streaming_context_.fragment_size() &&
|
||||||
streaming_context_.fragment_unit() != streaming_context::statement)
|
streaming_context_.fragment_unit() != streaming_context::statement)
|
||||||
{
|
{
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << __FUNCTION__;
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
ret = streaming_step(lock);
|
ret = streaming_step(lock);
|
||||||
}
|
}
|
||||||
debug_log_state("after_row_leave");
|
debug_log_state("after_row_leave");
|
||||||
@ -304,6 +330,10 @@ int wsrep::transaction::before_prepare(
|
|||||||
// Note: we can't remove fragments here for XA,
|
// Note: we can't remove fragments here for XA,
|
||||||
// the transaction has already issued XA END and
|
// the transaction has already issued XA END and
|
||||||
// is in IDLE state, no more changes allowed!
|
// is in IDLE state, no more changes allowed!
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "before remove_fragments, line = "
|
||||||
|
<< __LINE__;
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
ret = client_service_.remove_fragments();
|
ret = client_service_.remove_fragments();
|
||||||
if (ret)
|
if (ret)
|
||||||
{
|
{
|
||||||
@ -329,6 +359,9 @@ int wsrep::transaction::before_prepare(
|
|||||||
flags(flags() | wsrep::provider::flag::pa_unsafe);
|
flags(flags() | wsrep::provider::flag::pa_unsafe);
|
||||||
append_sr_keys_for_commit();
|
append_sr_keys_for_commit();
|
||||||
const bool force_streaming_step = true;
|
const bool force_streaming_step = true;
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << __FUNCTION__ << ", line = " << __LINE__;
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
ret = streaming_step(lock, force_streaming_step);
|
ret = streaming_step(lock, force_streaming_step);
|
||||||
if (ret == 0)
|
if (ret == 0)
|
||||||
{
|
{
|
||||||
@ -339,6 +372,10 @@ int wsrep::transaction::before_prepare(
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
ret = certify_commit(lock);
|
ret = certify_commit(lock);
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "after certify_commit(): line = "
|
||||||
|
<< __LINE__ << ", ret = " << ret;
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
}
|
}
|
||||||
|
|
||||||
assert((ret == 0 && state() == s_preparing) ||
|
assert((ret == 0 && state() == s_preparing) ||
|
||||||
@ -485,6 +522,10 @@ int wsrep::transaction::before_commit()
|
|||||||
if (ret == 0 && state() == s_prepared)
|
if (ret == 0 && state() == s_prepared)
|
||||||
{
|
{
|
||||||
ret = certify_commit(lock);
|
ret = certify_commit(lock);
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "after certify_commit(): line = "
|
||||||
|
<< __LINE__;
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
assert((ret == 0 && state() == s_committing) ||
|
assert((ret == 0 && state() == s_committing) ||
|
||||||
(state() == s_must_abort ||
|
(state() == s_must_abort ||
|
||||||
state() == s_must_replay ||
|
state() == s_must_replay ||
|
||||||
@ -589,6 +630,10 @@ int wsrep::transaction::ordered_commit()
|
|||||||
{
|
{
|
||||||
state(lock, s_ordered_commit);
|
state(lock, s_ordered_commit);
|
||||||
}
|
}
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "wsrep::transaction::" << __FUNCTION__
|
||||||
|
<< ": id = " << id().get() << ", --> " << ret;
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
debug_log_state("ordered_commit_leave");
|
debug_log_state("ordered_commit_leave");
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -619,7 +664,11 @@ int wsrep::transaction::after_commit()
|
|||||||
wsrep::storage_service& storage_service(
|
wsrep::storage_service& storage_service(
|
||||||
sr_scope.storage_service());
|
sr_scope.storage_service());
|
||||||
storage_service.adopt_transaction(*this);
|
storage_service.adopt_transaction(*this);
|
||||||
storage_service.remove_fragments();
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "before remove_fragments, line = "
|
||||||
|
<< __LINE__;
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
|
storage_service.remove_fragments();
|
||||||
storage_service.commit(wsrep::ws_handle(), wsrep::ws_meta());
|
storage_service.commit(wsrep::ws_handle(), wsrep::ws_meta());
|
||||||
lock.lock();
|
lock.lock();
|
||||||
}
|
}
|
||||||
@ -654,6 +703,10 @@ int wsrep::transaction::after_commit()
|
|||||||
|
|
||||||
int wsrep::transaction::before_rollback()
|
int wsrep::transaction::before_rollback()
|
||||||
{
|
{
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "BEGIN: wsrep::transaction::before_rollback: id = "
|
||||||
|
<< id().get();
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
|
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
|
||||||
debug_log_state("before_rollback_enter");
|
debug_log_state("before_rollback_enter");
|
||||||
assert(state() == s_executing ||
|
assert(state() == s_executing ||
|
||||||
@ -736,11 +789,18 @@ int wsrep::transaction::before_rollback()
|
|||||||
}
|
}
|
||||||
|
|
||||||
debug_log_state("before_rollback_leave");
|
debug_log_state("before_rollback_leave");
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "END: wsrep::transaction::before_rollback";
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int wsrep::transaction::after_rollback()
|
int wsrep::transaction::after_rollback()
|
||||||
{
|
{
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "BEGIN: wsrep::transaction::after_rollback: id = "
|
||||||
|
<< id().get();
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
|
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
|
||||||
debug_log_state("after_rollback_enter");
|
debug_log_state("after_rollback_enter");
|
||||||
assert(state() == s_aborting ||
|
assert(state() == s_aborting ||
|
||||||
@ -759,7 +819,11 @@ int wsrep::transaction::after_rollback()
|
|||||||
wsrep::storage_service& storage_service(
|
wsrep::storage_service& storage_service(
|
||||||
sr_scope.storage_service());
|
sr_scope.storage_service());
|
||||||
storage_service.adopt_transaction(*this);
|
storage_service.adopt_transaction(*this);
|
||||||
storage_service.remove_fragments();
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "before remove_fragments, line = "
|
||||||
|
<< __LINE__;
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
|
storage_service.remove_fragments();
|
||||||
storage_service.commit(wsrep::ws_handle(), wsrep::ws_meta());
|
storage_service.commit(wsrep::ws_handle(), wsrep::ws_meta());
|
||||||
}
|
}
|
||||||
lock.lock();
|
lock.lock();
|
||||||
@ -783,6 +847,9 @@ int wsrep::transaction::after_rollback()
|
|||||||
// releasing the commit ordering critical section should be
|
// releasing the commit ordering critical section should be
|
||||||
// also postponed until all resources have been released.
|
// also postponed until all resources have been released.
|
||||||
debug_log_state("after_rollback_leave");
|
debug_log_state("after_rollback_leave");
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "END: wsrep::transaction::after_rollback";
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -819,6 +886,9 @@ int wsrep::transaction::after_statement()
|
|||||||
streaming_context_.fragment_size() &&
|
streaming_context_.fragment_size() &&
|
||||||
streaming_context_.fragment_unit() == streaming_context::statement)
|
streaming_context_.fragment_unit() == streaming_context::statement)
|
||||||
{
|
{
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << __FUNCTION__ << ", line = " << __LINE__;
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
ret = streaming_step(lock);
|
ret = streaming_step(lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1237,6 +1307,34 @@ int wsrep::transaction::xa_replay(wsrep::unique_lock<wsrep::mutex>& lock)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef WITH_WSREP_SR_SPEEDUP
|
||||||
|
int wsrep::transaction::set_fragments_from_table()
|
||||||
|
{
|
||||||
|
scoped_storage_service<storage_service_deleter>
|
||||||
|
sr_scope(
|
||||||
|
client_service_,
|
||||||
|
server_service_.storage_service(client_service_),
|
||||||
|
storage_service_deleter(server_service_));
|
||||||
|
wsrep::storage_service& storage_service(
|
||||||
|
sr_scope.storage_service());
|
||||||
|
|
||||||
|
int rcode = storage_service.set_fragments_from_table();
|
||||||
|
|
||||||
|
return (rcode);
|
||||||
|
}
|
||||||
|
|
||||||
|
void *wsrep::transaction::get_binlog_cache()
|
||||||
|
{
|
||||||
|
void *cache = client_service_.get_binlog_cache();
|
||||||
|
|
||||||
|
assert(cache);
|
||||||
|
|
||||||
|
return (cache);
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif /* WITH_WSREP_SR_SPEEDUP */
|
||||||
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
// Private //
|
// Private //
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
@ -1345,6 +1443,14 @@ int wsrep::transaction::streaming_step(wsrep::unique_lock<wsrep::mutex>& lock,
|
|||||||
assert(lock.owns_lock());
|
assert(lock.owns_lock());
|
||||||
assert(streaming_context_.fragment_size() || is_xa());
|
assert(streaming_context_.fragment_size() || is_xa());
|
||||||
|
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << __FUNCTION__
|
||||||
|
<< ": bytes_generated = "
|
||||||
|
<< client_service_.bytes_generated()
|
||||||
|
<< ", log_position = "
|
||||||
|
<< streaming_context_.log_position();
|
||||||
|
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
if (client_service_.bytes_generated() <
|
if (client_service_.bytes_generated() <
|
||||||
streaming_context_.log_position())
|
streaming_context_.log_position())
|
||||||
{
|
{
|
||||||
@ -1388,6 +1494,10 @@ int wsrep::transaction::streaming_step(wsrep::unique_lock<wsrep::mutex>& lock,
|
|||||||
if (streaming_context_.fragment_size_exceeded() || force)
|
if (streaming_context_.fragment_size_exceeded() || force)
|
||||||
{
|
{
|
||||||
streaming_context_.reset_unit_counter();
|
streaming_context_.reset_unit_counter();
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << __FUNCTION__ << ": bytes_to_replicate = "
|
||||||
|
<< bytes_to_replicate;
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
ret = certify_fragment(lock);
|
ret = certify_fragment(lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1397,6 +1507,9 @@ int wsrep::transaction::streaming_step(wsrep::unique_lock<wsrep::mutex>& lock,
|
|||||||
int wsrep::transaction::certify_fragment(
|
int wsrep::transaction::certify_fragment(
|
||||||
wsrep::unique_lock<wsrep::mutex>& lock)
|
wsrep::unique_lock<wsrep::mutex>& lock)
|
||||||
{
|
{
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "BEGIN certify_fragment";
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
assert(lock.owns_lock());
|
assert(lock.owns_lock());
|
||||||
|
|
||||||
assert(client_state_.mode() == wsrep::client_state::m_local);
|
assert(client_state_.mode() == wsrep::client_state::m_local);
|
||||||
@ -1424,6 +1537,15 @@ int wsrep::transaction::certify_fragment(
|
|||||||
}
|
}
|
||||||
streaming_context_.set_log_position(log_position);
|
streaming_context_.set_log_position(log_position);
|
||||||
|
|
||||||
|
#ifdef WITH_WSREP_SR_SPEEDUP
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "certify_fragment, line = " << __LINE__
|
||||||
|
<< ", log_position = " << log_position
|
||||||
|
<< ", data size = " << data.size()
|
||||||
|
<< ", wsrep_get_current_thd() = "
|
||||||
|
<< wsrep_get_current_thd();
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
|
#endif /* WITH_WSREP_SR_SPEEDUP */
|
||||||
if (data.size() == 0)
|
if (data.size() == 0)
|
||||||
{
|
{
|
||||||
wsrep::log_warning() << "Attempt to replicate empty data buffer";
|
wsrep::log_warning() << "Attempt to replicate empty data buffer";
|
||||||
@ -1432,6 +1554,9 @@ int wsrep::transaction::certify_fragment(
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "certify_fragment, line = " << __LINE__;
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
if (provider().append_data(ws_handle_,
|
if (provider().append_data(ws_handle_,
|
||||||
wsrep::const_buffer(data.data(), data.size())))
|
wsrep::const_buffer(data.data(), data.size())))
|
||||||
{
|
{
|
||||||
@ -1440,6 +1565,9 @@ int wsrep::transaction::certify_fragment(
|
|||||||
client_state_.override_error(wsrep::e_error_during_commit);
|
client_state_.override_error(wsrep::e_error_during_commit);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "certify_fragment, line = " << __LINE__;
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
|
|
||||||
if (is_xa())
|
if (is_xa())
|
||||||
{
|
{
|
||||||
@ -1490,12 +1618,20 @@ int wsrep::transaction::certify_fragment(
|
|||||||
id(),
|
id(),
|
||||||
flags(),
|
flags(),
|
||||||
wsrep::const_buffer(data.data(), data.size()),
|
wsrep::const_buffer(data.data(), data.size()),
|
||||||
|
#ifdef WITH_WSREP_SR_SPEEDUP
|
||||||
|
streaming_context_.get_sr_store(),
|
||||||
|
log_position - data.size(),
|
||||||
|
xid(), get_binlog_cache()))
|
||||||
|
#else
|
||||||
xid()))
|
xid()))
|
||||||
|
#endif /* WITH_WSREP_SR_SPEEDUP */
|
||||||
{
|
{
|
||||||
ret = 1;
|
ret = 1;
|
||||||
error = wsrep::e_append_fragment_error;
|
error = wsrep::e_append_fragment_error;
|
||||||
}
|
}
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "certify_fragment, line = " << __LINE__;
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
if (ret == 0)
|
if (ret == 0)
|
||||||
{
|
{
|
||||||
client_service_.debug_crash(
|
client_service_.debug_crash(
|
||||||
@ -1620,6 +1756,9 @@ int wsrep::transaction::certify_fragment(
|
|||||||
state(lock, s_executing);
|
state(lock, s_executing);
|
||||||
flags(flags() & ~wsrep::provider::flag::start_transaction);
|
flags(flags() & ~wsrep::provider::flag::start_transaction);
|
||||||
}
|
}
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "END certify_fragment";
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1630,6 +1769,9 @@ int wsrep::transaction::certify_commit(
|
|||||||
assert(active());
|
assert(active());
|
||||||
client_service_.wait_for_replayers(lock);
|
client_service_.wait_for_replayers(lock);
|
||||||
|
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "BEGIN certify_commit";
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
assert(lock.owns_lock());
|
assert(lock.owns_lock());
|
||||||
|
|
||||||
if (abort_or_interrupt(lock))
|
if (abort_or_interrupt(lock))
|
||||||
@ -1805,6 +1947,11 @@ int wsrep::transaction::certify_commit(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "END certify_commit: trx_id = "
|
||||||
|
<< id().get()
|
||||||
|
<< ", cert_ret = " << cert_ret;
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1831,6 +1978,9 @@ int wsrep::transaction::append_sr_keys_for_commit()
|
|||||||
|
|
||||||
void wsrep::transaction::streaming_rollback(wsrep::unique_lock<wsrep::mutex>& lock)
|
void wsrep::transaction::streaming_rollback(wsrep::unique_lock<wsrep::mutex>& lock)
|
||||||
{
|
{
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "BEGIN: wsrep::transaction::streaming_rollback";
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
debug_log_state("streaming_rollback enter");
|
debug_log_state("streaming_rollback enter");
|
||||||
assert(state_ != s_must_replay);
|
assert(state_ != s_must_replay);
|
||||||
assert(is_streaming());
|
assert(is_streaming());
|
||||||
@ -1874,6 +2024,9 @@ void wsrep::transaction::streaming_rollback(wsrep::unique_lock<wsrep::mutex>& lo
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
debug_log_state("streaming_rollback leave");
|
debug_log_state("streaming_rollback leave");
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "END: wsrep::transaction::streaming_rollback";
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
}
|
}
|
||||||
|
|
||||||
int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock)
|
int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock)
|
||||||
@ -1904,6 +2057,10 @@ int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock)
|
|||||||
}
|
}
|
||||||
if (is_streaming())
|
if (is_streaming())
|
||||||
{
|
{
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "before clear_fragments, line = "
|
||||||
|
<< __LINE__;
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
clear_fragments();
|
clear_fragments();
|
||||||
}
|
}
|
||||||
provider().release(ws_handle_);
|
provider().release(ws_handle_);
|
||||||
@ -1913,6 +2070,10 @@ int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock)
|
|||||||
wsrep::e_deadlock_error);
|
wsrep::e_deadlock_error);
|
||||||
if (is_streaming())
|
if (is_streaming())
|
||||||
{
|
{
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "before remove_fragments, line = "
|
||||||
|
<< __LINE__;
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
client_service_.remove_fragments();
|
client_service_.remove_fragments();
|
||||||
clear_fragments();
|
clear_fragments();
|
||||||
}
|
}
|
||||||
@ -1932,7 +2093,14 @@ int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock)
|
|||||||
|
|
||||||
void wsrep::transaction::clear_fragments()
|
void wsrep::transaction::clear_fragments()
|
||||||
{
|
{
|
||||||
|
#ifdef DEBUG_SR_SPEEDUP
|
||||||
|
wsrep::log_info() << "END: wsrep::transaction::clear_fragments: id = "
|
||||||
|
<< id_ << ", server = |" << server_id_ << "|";
|
||||||
|
#endif /* DEBUG_SR_SPEEDUP */
|
||||||
streaming_context_.cleanup();
|
streaming_context_.cleanup();
|
||||||
|
#ifdef WITH_WSREP_SR_SPEEDUP
|
||||||
|
fragment_cache_remove_transaction(server_id_, id_);
|
||||||
|
#endif /* WITH_WSREP_SR_SPEEDUP */
|
||||||
}
|
}
|
||||||
|
|
||||||
void wsrep::transaction::cleanup()
|
void wsrep::transaction::cleanup()
|
||||||
|
Reference in New Issue
Block a user