1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-06-07 07:22:02 +03:00

Removing assert() calls from public headers

Removed calls to assert() from public headers to have
full control when assertions are enabled in wsrep-lib
code regardless of parent project build configuration.
Moved methods containing assertions and non-trivial
code from headers into compilation units.
This commit is contained in:
Teemu Ollakka 2023-02-26 09:12:24 +02:00
parent 940ba9bd0e
commit 53638a8384
17 changed files with 446 additions and 272 deletions

View File

@ -21,6 +21,8 @@
#include "db_high_priority_service.hpp" #include "db_high_priority_service.hpp"
#include "db_client.hpp" #include "db_client.hpp"
#include <cassert>
db::client_service::client_service(db::client& client) db::client_service::client_service(db::client& client)
: wsrep::client_service() : wsrep::client_service()
, client_(client) , client_(client)

View File

@ -20,6 +20,8 @@
#include "db_storage_engine.hpp" #include "db_storage_engine.hpp"
#include "db_client.hpp" #include "db_client.hpp"
#include <cassert>
void db::storage_engine::transaction::start(db::client* cc) void db::storage_engine::transaction::start(db::client* cc)
{ {
wsrep::unique_lock<wsrep::mutex> lock(se_.mutex_); wsrep::unique_lock<wsrep::mutex> lock(se_.mutex_);

View File

@ -163,10 +163,7 @@ namespace wsrep
/** /**
* Destructor. * Destructor.
*/ */
virtual ~client_state() virtual ~client_state();
{
assert(transaction_.active() == false);
}
/** @name Client session handling */ /** @name Client session handling */
/** @{ */ /** @{ */
@ -298,11 +295,7 @@ namespace wsrep
* *
* @param err Applying error (empty for no error) * @param err Applying error (empty for no error)
*/ */
void after_applying() void after_applying();
{
assert(mode_ == m_high_priority);
transaction_.after_applying();
}
/** @name Replication interface */ /** @name Replication interface */
/** @{ */ /** @{ */
@ -313,12 +306,7 @@ namespace wsrep
* - Register the transaction on server level for bookkeeping * - Register the transaction on server level for bookkeeping
* - Isolation levels? Or part of the transaction? * - Isolation levels? Or part of the transaction?
*/ */
int start_transaction(const wsrep::transaction_id& id) int start_transaction(const wsrep::transaction_id& id);
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(state_ == s_exec);
return transaction_.start_transaction(id);
}
/** /**
* Establish read view ID of the transaction. * Establish read view ID of the transaction.
@ -332,12 +320,7 @@ namespace wsrep
* *
* @param gtid optional explicit GTID of the transaction read view. * @param gtid optional explicit GTID of the transaction read view.
*/ */
int assign_read_view(const wsrep::gtid* const gtid = NULL) int assign_read_view(const wsrep::gtid* const gtid = NULL);
{
assert(mode_ == m_local);
assert(state_ == s_exec);
return transaction_.assign_read_view(gtid);
}
/** /**
* Append a key into transaction write set. * Append a key into transaction write set.
@ -346,12 +329,7 @@ namespace wsrep
* *
* @return Zero on success, non-zero on failure. * @return Zero on success, non-zero on failure.
*/ */
int append_key(const wsrep::key& key) int append_key(const wsrep::key& key);
{
assert(mode_ == m_local);
assert(state_ == s_exec);
return transaction_.append_key(key);
}
/** /**
* Append keys in key_array into transaction write set. * Append keys in key_array into transaction write set.
@ -360,29 +338,12 @@ namespace wsrep
* *
* @return Zero in case of success, non-zero on failure. * @return Zero in case of success, non-zero on failure.
*/ */
int append_keys(const wsrep::key_array& keys) int append_keys(const wsrep::key_array& keys);
{
assert(mode_ == m_local || mode_ == m_toi);
assert(state_ == s_exec);
for (auto i(keys.begin()); i != keys.end(); ++i)
{
if (transaction_.append_key(*i))
{
return 1;
}
}
return 0;
}
/** /**
* Append data into transaction write set. * Append data into transaction write set.
*/ */
int append_data(const wsrep::const_buffer& data) int append_data(const wsrep::const_buffer& data);
{
assert(mode_ == m_local);
assert(state_ == s_exec);
return transaction_.append_data(data);
}
/** @} */ /** @} */
@ -391,13 +352,7 @@ namespace wsrep
/** /**
* This method should be called after every row operation. * This method should be called after every row operation.
*/ */
int after_row() int after_row();
{
assert(mode_ == m_local);
assert(state_ == s_exec);
return (transaction_.streaming_context().fragment_size() ?
transaction_.after_row() : 0);
}
/** /**
* Set streaming parameters. * Set streaming parameters.
@ -431,12 +386,7 @@ namespace wsrep
*/ */
void disable_streaming(); void disable_streaming();
void fragment_applied(wsrep::seqno seqno) void fragment_applied(wsrep::seqno seqno);
{
assert(mode_ == m_high_priority);
transaction_.fragment_applied(seqno);
}
/** /**
* Prepare write set meta data for ordering. * Prepare write set meta data for ordering.
* This method should be called before ordered commit or * This method should be called before ordered commit or
@ -451,90 +401,31 @@ namespace wsrep
*/ */
int prepare_for_ordering(const wsrep::ws_handle& ws_handle, int prepare_for_ordering(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta, const wsrep::ws_meta& ws_meta,
bool is_commit) bool is_commit);
{
assert(state_ == s_exec);
return transaction_.prepare_for_ordering(
ws_handle, ws_meta, is_commit);
}
/** @} */ /** @} */
/** @name Applying interface */ /** @name Applying interface */
/** @{ */ /** @{ */
int start_transaction(const wsrep::ws_handle& wsh, int start_transaction(const wsrep::ws_handle& wsh,
const wsrep::ws_meta& meta) const wsrep::ws_meta& meta);
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(owning_thread_id_ == wsrep::this_thread::get_id());
assert(mode_ == m_high_priority);
return transaction_.start_transaction(wsh, meta);
}
int next_fragment(const wsrep::ws_meta& meta) int next_fragment(const wsrep::ws_meta& meta);
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(mode_ == m_high_priority);
return transaction_.next_fragment(meta);
}
/** @name Commit ordering interface */ /** @name Commit ordering interface */
/** @{ */ /** @{ */
int before_prepare() int before_prepare();
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(owning_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_exec);
return transaction_.before_prepare(lock);
}
int after_prepare() int after_prepare();
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(owning_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_exec);
return transaction_.after_prepare(lock);
}
int before_commit() int before_commit();
{
assert(owning_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_exec || mode_ == m_local);
return transaction_.before_commit();
}
int ordered_commit() int ordered_commit();
{
assert(owning_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_exec || mode_ == m_local);
return transaction_.ordered_commit();
}
int after_commit() int after_commit();
{
assert(owning_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_exec || mode_ == m_local);
return transaction_.after_commit();
}
/** @} */ /** @} */
int before_rollback() int before_rollback();
{
assert(owning_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_idle ||
state_ == s_exec ||
state_ == s_result ||
state_ == s_quitting);
return transaction_.before_rollback();
}
int after_rollback() int after_rollback();
{
assert(owning_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_idle ||
state_ == s_exec ||
state_ == s_result ||
state_ == s_quitting);
return transaction_.after_rollback();
}
/** /**
* This method should be called by the background rollbacker * This method should be called by the background rollbacker
@ -629,12 +520,7 @@ namespace wsrep
* After this call, a different client may later attempt to terminate * After this call, a different client may later attempt to terminate
* the transaction by calling method commit_by_xid() or rollback_by_xid(). * the transaction by calling method commit_by_xid() or rollback_by_xid().
*/ */
void xa_detach() void xa_detach();
{
assert(mode_ == m_local);
assert(state_ == s_none || state_ == s_exec || state_ == s_quitting);
transaction_.xa_detach();
}
/** /**
* Replay a XA transaction * Replay a XA transaction
@ -645,13 +531,7 @@ namespace wsrep
* Since the victim is idle, this method can be called * Since the victim is idle, this method can be called
* by the BF aborter or the backround rollbacker. * by the BF aborter or the backround rollbacker.
*/ */
void xa_replay() void xa_replay();
{
assert(mode_ == m_local);
assert(state_ == s_idle);
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
transaction_.xa_replay(lock);
}
// //
// BF aborting // BF aborting
@ -661,23 +541,14 @@ namespace wsrep
* called by a transaction which needs to BF abort a conflicting * called by a transaction which needs to BF abort a conflicting
* locally processing transaction. * locally processing transaction.
*/ */
int bf_abort(wsrep::seqno bf_seqno) int bf_abort(wsrep::seqno bf_seqno);
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(mode_ == m_local || transaction_.is_streaming());
return transaction_.bf_abort(lock, bf_seqno);
}
/** /**
* Brute force abort a transaction in total order. This method * Brute force abort a transaction in total order. This method
* should be called by the TOI operation which needs to * should be called by the TOI operation which needs to
* BF abort a transaction. * BF abort a transaction.
*/ */
int total_order_bf_abort(wsrep::seqno bf_seqno) int total_order_bf_abort(wsrep::seqno bf_seqno);
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(mode_ == m_local || transaction_.is_streaming());
return transaction_.total_order_bf_abort(lock, bf_seqno);
}
/** /**
* Adopt a streaming transaction state. This is must be * Adopt a streaming transaction state. This is must be
@ -686,20 +557,12 @@ namespace wsrep
* set up enough context for handling the rollback * set up enough context for handling the rollback
* fragment. * fragment.
*/ */
void adopt_transaction(const wsrep::transaction& transaction) void adopt_transaction(const wsrep::transaction& transaction);
{
assert(mode_ == m_high_priority);
transaction_.adopt(transaction);
}
/** /**
* Adopt (store) transaction applying error for further processing. * Adopt (store) transaction applying error for further processing.
*/ */
void adopt_apply_error(wsrep::mutable_buffer& err) void adopt_apply_error(wsrep::mutable_buffer& err);
{
assert(mode_ == m_high_priority);
transaction_.adopt_apply_error(err);
}
/** /**
* Clone enough state from another transaction so that replaing will * Clone enough state from another transaction so that replaing will
@ -710,7 +573,6 @@ namespace wsrep
*/ */
void clone_transaction_for_replay(const wsrep::transaction& transaction) void clone_transaction_for_replay(const wsrep::transaction& transaction)
{ {
// assert(mode_ == m_high_priority);
transaction_.clone_for_replay(transaction); transaction_.clone_for_replay(transaction);
} }
@ -1186,19 +1048,8 @@ namespace wsrep
class high_priority_context class high_priority_context
{ {
public: public:
high_priority_context(wsrep::client_state& client) high_priority_context(wsrep::client_state& client);
: client_(client) virtual ~high_priority_context();
, orig_mode_(client.mode_)
{
wsrep::unique_lock<wsrep::mutex> lock(client.mutex_);
client.mode(lock, wsrep::client_state::m_high_priority);
}
virtual ~high_priority_context()
{
wsrep::unique_lock<wsrep::mutex> lock(client_.mutex_);
assert(client_.mode() == wsrep::client_state::m_high_priority);
client_.mode(lock, orig_mode_);
}
private: private:
wsrep::client_state& client_; wsrep::client_state& client_;
enum wsrep::client_state::mode orig_mode_; enum wsrep::client_state::mode orig_mode_;

View File

@ -27,7 +27,6 @@
#include "transaction_id.hpp" #include "transaction_id.hpp"
#include "compiler.hpp" #include "compiler.hpp"
#include <cassert>
#include <cstring> #include <cstring>
#include <string> #include <string>

View File

@ -554,11 +554,7 @@ namespace wsrep
} }
enum state state(wsrep::unique_lock<wsrep::mutex>& enum state state(wsrep::unique_lock<wsrep::mutex>&
lock WSREP_UNUSED) const lock WSREP_UNUSED) const;
{
assert(lock.owns_lock());
return state_;
}
/** /**
* Get provider status variables. * Get provider status variables.

View File

@ -22,9 +22,11 @@
#include <set> #include <set>
#include <map> #include <map>
#include <string>
namespace wsrep namespace wsrep
{ {
class key;
class sr_key_set class sr_key_set
{ {
public: public:
@ -33,24 +35,9 @@ namespace wsrep
sr_key_set() sr_key_set()
: root_() : root_()
{ } { }
void insert(const wsrep::key& key);
void insert(const wsrep::key& key)
{
assert(key.size() >= 2);
if (key.size() < 2)
{
throw wsrep::runtime_error("Invalid key size");
}
root_[std::string(
static_cast<const char*>(key.key_parts()[0].data()),
key.key_parts()[0].size())].insert(
std::string(static_cast<const char*>(key.key_parts()[1].data()),
key.key_parts()[1].size()));
}
const branch_type& root() const { return root_; } const branch_type& root() const { return root_; }
void clear() { root_.clear(); } void clear();
bool empty() const { return root_.empty(); } bool empty() const { return root_.empty(); }
private: private:
branch_type root_; branch_type root_;

View File

@ -29,6 +29,7 @@
namespace wsrep namespace wsrep
{ {
/* Helper class to store streaming transaction context. */
class streaming_context class streaming_context
{ {
public: public:
@ -58,144 +59,113 @@ namespace wsrep
* @param fragment_unit Desired fragment unit. * @param fragment_unit Desired fragment unit.
* @param fragment_size Desired fragment size. * @param fragment_size Desired fragment size.
*/ */
void params(enum fragment_unit fragment_unit, size_t fragment_size) void params(enum fragment_unit fragment_unit, size_t fragment_size);
{
if (fragment_size)
{
WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
wsrep::log::debug_level_streaming,
"Enabling streaming: "
<< fragment_unit << " " << fragment_size);
}
else
{
WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
wsrep::log::debug_level_streaming,
"Disabling streaming");
}
fragment_unit_ = fragment_unit;
fragment_size_ = fragment_size;
reset_unit_counter();
}
void enable(enum fragment_unit fragment_unit, size_t fragment_size) /**
{ * Enable streaming replication.
WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), *
wsrep::log::debug_level_streaming, * @param fragment_unit Desired fragment unit.
"Enabling streaming: " * @param fragment_size Desired fragment size.
<< fragment_unit << " " << fragment_size); */
assert(fragment_size > 0); void enable(enum fragment_unit fragment_unit, size_t fragment_size);
fragment_unit_ = fragment_unit;
fragment_size_ = fragment_size;
}
/** Return current fragment unit. */
enum fragment_unit fragment_unit() const { return fragment_unit_; } enum fragment_unit fragment_unit() const { return fragment_unit_; }
/** Return current fragment size. */
size_t fragment_size() const { return fragment_size_; } size_t fragment_size() const { return fragment_size_; }
void disable() /** Disable streaming replication. */
{ void disable();
WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
wsrep::log::debug_level_streaming,
"Disabling streaming");
fragment_size_ = 0;
}
/** Increment counter for certified fragments. */
void certified() void certified()
{ {
++fragments_certified_; ++fragments_certified_;
} }
/** Return number of certified fragments. */
size_t fragments_certified() const size_t fragments_certified() const
{ {
return fragments_certified_; return fragments_certified_;
} }
void stored(wsrep::seqno seqno) /** Mark fragment with seqno as stored in fragment store. */
{ void stored(wsrep::seqno seqno);
check_fragment_seqno(seqno);
fragments_.push_back(seqno);
}
/** Return number of stored fragments. */
size_t fragments_stored() const size_t fragments_stored() const
{ {
return fragments_.size(); return fragments_.size();
} }
void applied(wsrep::seqno seqno) /** Mark fragment with seqno as applied. */
{ void applied(wsrep::seqno seqno);
check_fragment_seqno(seqno);
++fragments_certified_;
fragments_.push_back(seqno);
}
void rolled_back(wsrep::transaction_id id) /** Mark streaming transaction as rolled back. */
{ void rolled_back(wsrep::transaction_id id);
assert(rollback_replicated_for_ == wsrep::transaction_id::undefined());
rollback_replicated_for_ = id;
}
/** Return true if streaming transaction has been marked
* as rolled back. */
bool rolled_back() const bool rolled_back() const
{ {
return (rollback_replicated_for_ != return (rollback_replicated_for_ !=
wsrep::transaction_id::undefined()); wsrep::transaction_id::undefined());
} }
/** Return current value of unit counter. */
size_t unit_counter() const size_t unit_counter() const
{ {
return unit_counter_; return unit_counter_;
} }
/** Set value for unit counter. */
void set_unit_counter(size_t count) void set_unit_counter(size_t count)
{ {
unit_counter_ = count; unit_counter_ = count;
} }
/** Increment unit counter by inc. */
void increment_unit_counter(size_t inc) void increment_unit_counter(size_t inc)
{ {
unit_counter_ += inc; unit_counter_ += inc;
} }
/** Reset unit counter to zero. */
void reset_unit_counter() void reset_unit_counter()
{ {
unit_counter_ = 0; unit_counter_ = 0;
} }
/** Return current log position. */
size_t log_position() const size_t log_position() const
{ {
return log_position_; return log_position_;
} }
/** Set log position. */
void set_log_position(size_t position) void set_log_position(size_t position)
{ {
log_position_ = position; log_position_ = position;
} }
/** Return vector of stored fragments. */
const std::vector<wsrep::seqno>& fragments() const const std::vector<wsrep::seqno>& fragments() const
{ {
return fragments_; return fragments_;
} }
/** Return true if the fragment size was exceeded. */
bool fragment_size_exceeded() const bool fragment_size_exceeded() const
{ {
return unit_counter_ >= fragment_size_; return unit_counter_ >= fragment_size_;
} }
void cleanup() /** Clean up the streaming transaction state. */
{ void cleanup();
fragments_certified_ = 0;
fragments_.clear();
rollback_replicated_for_ = wsrep::transaction_id::undefined();
unit_counter_ = 0;
log_position_ = 0;
}
private: private:
void check_fragment_seqno(wsrep::seqno seqno WSREP_UNUSED) void check_fragment_seqno(wsrep::seqno seqno WSREP_UNUSED);
{
assert(seqno.is_undefined() == false);
assert(fragments_.empty() || fragments_.back() < seqno);
}
size_t fragments_certified_; size_t fragments_certified_;
std::vector<wsrep::seqno> fragments_; std::vector<wsrep::seqno> fragments_;

View File

@ -29,7 +29,6 @@
#include "buffer.hpp" #include "buffer.hpp"
#include "xid.hpp" #include "xid.hpp"
#include <cassert>
#include <vector> #include <vector>
namespace wsrep namespace wsrep

View File

@ -3,26 +3,29 @@
# #
add_library(wsrep-lib add_library(wsrep-lib
allowlist_service_v1.cpp
client_state.cpp client_state.cpp
config_service_v1.cpp
event_service_v1.cpp
exception.cpp exception.cpp
gtid.cpp gtid.cpp
id.cpp id.cpp
xid.cpp
key.cpp key.cpp
logger.cpp logger.cpp
provider.cpp provider.cpp
provider_options.cpp provider_options.cpp
reporter.cpp
seqno.cpp seqno.cpp
view.cpp
server_state.cpp server_state.cpp
sr_key_set.cpp
streaming_context.cpp
thread.cpp thread.cpp
thread_service_v1.cpp thread_service_v1.cpp
tls_service_v1.cpp tls_service_v1.cpp
event_service_v1.cpp
transaction.cpp transaction.cpp
uuid.cpp uuid.cpp
reporter.cpp view.cpp
allowlist_service_v1.cpp
wsrep_provider_v26.cpp wsrep_provider_v26.cpp
config_service_v1.cpp) xid.cpp
)
target_link_libraries(wsrep-lib wsrep_api_v26 pthread ${WSREP_LIB_LIBDL}) target_link_libraries(wsrep-lib wsrep_api_v26 pthread ${WSREP_LIB_LIBDL})

View File

@ -25,9 +25,15 @@
#include "wsrep/client_service.hpp" #include "wsrep/client_service.hpp"
#include <unistd.h> // usleep() #include <unistd.h> // usleep()
#include <cassert>
#include <sstream> #include <sstream>
#include <iostream> #include <iostream>
wsrep::client_state::~client_state()
{
assert(transaction_.active() == false);
}
wsrep::provider& wsrep::client_state::provider() const wsrep::provider& wsrep::client_state::provider() const
{ {
return server_state_.provider(); return server_state_.provider();
@ -296,6 +302,146 @@ int wsrep::client_state::after_statement()
return 0; return 0;
} }
void wsrep::client_state::after_applying()
{
assert(mode_ == m_high_priority);
transaction_.after_applying();
}
int wsrep::client_state::start_transaction(const wsrep::transaction_id& id)
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(state_ == s_exec);
return transaction_.start_transaction(id);
}
int wsrep::client_state::assign_read_view(const wsrep::gtid* const gtid)
{
assert(mode_ == m_local);
assert(state_ == s_exec);
return transaction_.assign_read_view(gtid);
}
int wsrep::client_state::append_key(const wsrep::key& key)
{
assert(mode_ == m_local);
assert(state_ == s_exec);
return transaction_.append_key(key);
}
int wsrep::client_state::append_keys(const wsrep::key_array& keys)
{
assert(mode_ == m_local || mode_ == m_toi);
assert(state_ == s_exec);
for (auto i(keys.begin()); i != keys.end(); ++i)
{
if (transaction_.append_key(*i))
{
return 1;
}
}
return 0;
}
int wsrep::client_state::append_data(const wsrep::const_buffer& data)
{
assert(mode_ == m_local);
assert(state_ == s_exec);
return transaction_.append_data(data);
}
int wsrep::client_state::after_row()
{
assert(mode_ == m_local);
assert(state_ == s_exec);
return (transaction_.streaming_context().fragment_size()
? transaction_.after_row()
: 0);
}
void wsrep::client_state::fragment_applied(wsrep::seqno seqno)
{
assert(mode_ == m_high_priority);
transaction_.fragment_applied(seqno);
}
int wsrep::client_state::prepare_for_ordering(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
bool is_commit)
{
assert(state_ == s_exec);
return transaction_.prepare_for_ordering(ws_handle, ws_meta, is_commit);
}
int wsrep::client_state::start_transaction(const wsrep::ws_handle& wsh,
const wsrep::ws_meta& meta)
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(owning_thread_id_ == wsrep::this_thread::get_id());
assert(mode_ == m_high_priority);
return transaction_.start_transaction(wsh, meta);
}
int wsrep::client_state::next_fragment(const wsrep::ws_meta& meta)
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(mode_ == m_high_priority);
return transaction_.next_fragment(meta);
}
int wsrep::client_state::before_prepare()
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(owning_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_exec);
return transaction_.before_prepare(lock);
}
int wsrep::client_state::after_prepare()
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(owning_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_exec);
return transaction_.after_prepare(lock);
}
int wsrep::client_state::before_commit()
{
assert(owning_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_exec || mode_ == m_local);
return transaction_.before_commit();
}
int wsrep::client_state::ordered_commit()
{
assert(owning_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_exec || mode_ == m_local);
return transaction_.ordered_commit();
}
int wsrep::client_state::after_commit()
{
assert(owning_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_exec || mode_ == m_local);
return transaction_.after_commit();
}
int wsrep::client_state::before_rollback()
{
assert(owning_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_idle || state_ == s_exec || state_ == s_result
|| state_ == s_quitting);
return transaction_.before_rollback();
}
int wsrep::client_state::after_rollback()
{
assert(owning_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_idle || state_ == s_exec || state_ == s_result
|| state_ == s_quitting);
return transaction_.after_rollback();
}
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// Rollbacker synchronization // // Rollbacker synchronization //
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -361,6 +507,56 @@ void wsrep::client_state::disable_streaming()
transaction_.streaming_context().disable(); transaction_.streaming_context().disable();
} }
//////////////////////////////////////////////////////////////////////////////
// XA //
//////////////////////////////////////////////////////////////////////////////
void wsrep::client_state::xa_detach()
{
assert(mode_ == m_local);
assert(state_ == s_none || state_ == s_exec || state_ == s_quitting);
transaction_.xa_detach();
}
void wsrep::client_state::xa_replay()
{
assert(mode_ == m_local);
assert(state_ == s_idle);
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
transaction_.xa_replay(lock);
}
//////////////////////////////////////////////////////////////////////////////
// BF //
//////////////////////////////////////////////////////////////////////////////
int wsrep::client_state::bf_abort(wsrep::seqno bf_seqno)
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(mode_ == m_local || transaction_.is_streaming());
return transaction_.bf_abort(lock, bf_seqno);
}
int wsrep::client_state::total_order_bf_abort(wsrep::seqno bf_seqno)
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(mode_ == m_local || transaction_.is_streaming());
return transaction_.total_order_bf_abort(lock, bf_seqno);
}
void wsrep::client_state::adopt_transaction(
const wsrep::transaction& transaction)
{
assert(mode_ == m_high_priority);
transaction_.adopt(transaction);
}
void wsrep::client_state::adopt_apply_error(wsrep::mutable_buffer& err)
{
assert(mode_ == m_high_priority);
transaction_.adopt_apply_error(err);
}
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// TOI // // TOI //
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -884,3 +1080,22 @@ void wsrep::client_state::mode(
} }
mode_ = mode; mode_ = mode;
} }
///////////////////////////////////////////////////////////////////////////////
// High Priority Context //
///////////////////////////////////////////////////////////////////////////////
wsrep::high_priority_context::high_priority_context(wsrep::client_state& client)
: client_(client)
, orig_mode_(client.mode_)
{
wsrep::unique_lock<wsrep::mutex> lock(client.mutex_);
client.mode(lock, wsrep::client_state::m_high_priority);
}
wsrep::high_priority_context::~high_priority_context()
{
wsrep::unique_lock<wsrep::mutex> lock(client_.mutex_);
assert(client_.mode() == wsrep::client_state::m_high_priority);
client_.mode(lock, orig_mode_);
}

View File

@ -23,6 +23,8 @@
#include "wsrep/logger.hpp" #include "wsrep/logger.hpp"
#include "wsrep/provider_options.hpp" #include "wsrep/provider_options.hpp"
#include <cassert>
namespace wsrep_config_service_v1 namespace wsrep_config_service_v1
{ {
wsrep_config_service_v1_t service{ 0 }; wsrep_config_service_v1_t service{ 0 };

View File

@ -23,6 +23,7 @@
#include "wsrep_provider_v26.hpp" #include "wsrep_provider_v26.hpp"
#include <dlfcn.h> #include <dlfcn.h>
#include <cassert>
#include <memory> #include <memory>
wsrep::provider* wsrep::provider::make_provider( wsrep::provider* wsrep::provider::make_provider(

View File

@ -23,6 +23,7 @@
#include <sstream> #include <sstream>
#include <iomanip> #include <iomanip>
#include <cassert>
#include <cstring> // strerror() #include <cstring> // strerror()
#include <cstdlib> // mkstemp() #include <cstdlib> // mkstemp()
#include <cerrno> // errno #include <cerrno> // errno

View File

@ -1132,6 +1132,13 @@ int wsrep::server_state::on_apply(
} }
} }
enum wsrep::server_state::state wsrep::server_state::state(
wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED) const
{
assert(lock.owns_lock());
return state_;
}
void wsrep::server_state::start_streaming_client( void wsrep::server_state::start_streaming_client(
wsrep::client_state* client_state) wsrep::client_state* client_state)
{ {

43
src/sr_key_set.cpp Normal file
View File

@ -0,0 +1,43 @@
/*
* Copyright (C) 2023 Codership Oy <info@codership.com>
*
* This file is part of wsrep-lib.
*
* Wsrep-lib is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* Wsrep-lib is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
*/
#include "wsrep/sr_key_set.hpp"
#include "wsrep/key.hpp"
#include <cassert>
void wsrep::sr_key_set::insert(const wsrep::key& key)
{
assert(key.size() >= 2);
if (key.size() < 2)
{
throw wsrep::runtime_error("Invalid key size");
}
root_[std::string(static_cast<const char*>(key.key_parts()[0].data()),
key.key_parts()[0].size())]
.insert(std::string(static_cast<const char*>(key.key_parts()[1].data()),
key.key_parts()[1].size()));
}
void wsrep::sr_key_set::clear()
{
root_.clear();
}

95
src/streaming_context.cpp Normal file
View File

@ -0,0 +1,95 @@
/*
* Copyright (C) 2023 Codership Oy <info@codership.com>
*
* This file is part of wsrep-lib.
*
* Wsrep-lib is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* Wsrep-lib is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
*/
#include "wsrep/streaming_context.hpp"
#include <cassert>
void wsrep::streaming_context::params(enum fragment_unit fragment_unit,
size_t fragment_size)
{
if (fragment_size)
{
WSREP_LOG_DEBUG(
wsrep::log::debug_log_level(), wsrep::log::debug_level_streaming,
"Enabling streaming: " << fragment_unit << " " << fragment_size);
}
else
{
WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
wsrep::log::debug_level_streaming,
"Disabling streaming");
}
fragment_unit_ = fragment_unit;
fragment_size_ = fragment_size;
reset_unit_counter();
}
void wsrep::streaming_context::enable(enum fragment_unit fragment_unit,
size_t fragment_size)
{
WSREP_LOG_DEBUG(
wsrep::log::debug_log_level(), wsrep::log::debug_level_streaming,
"Enabling streaming: " << fragment_unit << " " << fragment_size);
assert(fragment_size > 0);
fragment_unit_ = fragment_unit;
fragment_size_ = fragment_size;
}
void wsrep::streaming_context::disable()
{
WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
wsrep::log::debug_level_streaming, "Disabling streaming");
fragment_size_ = 0;
}
void wsrep::streaming_context::stored(wsrep::seqno seqno)
{
check_fragment_seqno(seqno);
fragments_.push_back(seqno);
}
void wsrep::streaming_context::applied(wsrep::seqno seqno)
{
check_fragment_seqno(seqno);
++fragments_certified_;
fragments_.push_back(seqno);
}
void wsrep::streaming_context::rolled_back(wsrep::transaction_id id)
{
assert(rollback_replicated_for_ == wsrep::transaction_id::undefined());
rollback_replicated_for_ = id;
}
void wsrep::streaming_context::cleanup()
{
fragments_certified_ = 0;
fragments_.clear();
rollback_replicated_for_ = wsrep::transaction_id::undefined();
unit_counter_ = 0;
log_position_ = 0;
}
void wsrep::streaming_context::check_fragment_seqno(
wsrep::seqno seqno WSREP_UNUSED)
{
assert(seqno.is_undefined() == false);
assert(fragments_.empty() || fragments_.back() < seqno);
}

View File

@ -28,6 +28,7 @@
#include "wsrep/server_service.hpp" #include "wsrep/server_service.hpp"
#include "wsrep/client_service.hpp" #include "wsrep/client_service.hpp"
#include <cassert>
#include <sstream> #include <sstream>
#include <memory> #include <memory>