1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Check local sequential consistency in dbsim

- Release commit time critical section in callback
- Check the consistency inside commit order critical section

Other: Add 2pc switch to dbsim
This commit is contained in:
Teemu Ollakka
2024-11-28 12:33:21 +02:00
parent 3de594b662
commit 7994288534
8 changed files with 116 additions and 43 deletions

View File

@ -91,6 +91,15 @@ int db::client::client_command(F f)
return err; return err;
} }
static void release_commit_critical_section(void* ptr)
{
auto* crit = static_cast<db::server::commit_critical_section*>(ptr);
if (crit->lock.owns_lock())
{
crit->lock.unlock();
}
}
void db::client::run_one_transaction() void db::client::run_one_transaction()
{ {
if (params_.sync_wait) if (params_.sync_wait)
@ -145,15 +154,35 @@ void db::client::run_one_transaction()
err = err || client_command( err = err || client_command(
[&]() [&]()
{ {
// wsrep::log_debug() << "Commit"; auto commit_crit = server_.get_commit_critical_section();
if (not params_.check_sequential_consistency) {
commit_crit.lock.unlock();
}
client_state_.append_data({&commit_crit.commit_seqno,
sizeof(commit_crit.commit_seqno)});
wsrep::provider::seq_cb seq_cb {
&commit_crit,
release_commit_critical_section
};
assert(err == 0); assert(err == 0);
if (do_2pc()) if (params_.do_2pc)
{ {
err = err || client_state_.before_prepare(); err = err || client_state_.before_prepare(&seq_cb);
err = err || client_state_.after_prepare(); err = err || client_state_.after_prepare();
} }
err = err || client_state_.before_commit(); err = err || client_state_.before_commit(&seq_cb);
if (err == 0) se_trx_.commit(transaction.ws_meta().gtid()); if (err == 0)
{
se_trx_.commit(transaction.ws_meta().gtid());
if (params_.check_sequential_consistency)
{
server_.check_sequential_consistency(
client_state_.id(), commit_crit.commit_seqno);
}
}
err = err || client_state_.ordered_commit(); err = err || client_state_.ordered_commit();
err = err || client_state_.after_commit(); err = err || client_state_.after_commit();
if (err) if (err)

View File

@ -62,7 +62,6 @@ namespace db
void start(); void start();
wsrep::client_state& client_state() { return client_state_; } wsrep::client_state& client_state() { return client_state_; }
wsrep::client_service& client_service() { return client_service_; } wsrep::client_service& client_service() { return client_service_; }
bool do_2pc() const { return false; }
private: private:
friend class db::server_state; friend class db::server_state;
friend class db::client_service; friend class db::client_service;

View File

@ -26,6 +26,7 @@ db::high_priority_service::high_priority_service(
: wsrep::high_priority_service(server.server_state()) : wsrep::high_priority_service(server.server_state())
, server_(server) , server_(server)
, client_(client) , client_(client)
, commit_seqno_()
{ } { }
int db::high_priority_service::start_transaction( int db::high_priority_service::start_transaction(
@ -52,11 +53,14 @@ int db::high_priority_service::adopt_transaction(const wsrep::transaction&)
int db::high_priority_service::apply_write_set( int db::high_priority_service::apply_write_set(
const wsrep::ws_meta&, const wsrep::ws_meta&,
const wsrep::const_buffer&, const wsrep::const_buffer& buf,
wsrep::mutable_buffer&) wsrep::mutable_buffer&)
{ {
client_.se_trx_.start(&client_); client_.se_trx_.start(&client_);
client_.se_trx_.apply(client_.client_state().transaction()); client_.se_trx_.apply(client_.client_state().transaction());
assert(buf.size() > sizeof(uint64_t));
::memcpy(&commit_seqno_, buf.data() + buf.size() - sizeof(uint64_t),
sizeof(uint64_t));
return 0; return 0;
} }
@ -82,6 +86,14 @@ int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle,
client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, true); client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, true);
int ret(client_.client_state_.before_commit()); int ret(client_.client_state_.before_commit());
if (ret == 0) client_.se_trx_.commit(ws_meta.gtid()); if (ret == 0) client_.se_trx_.commit(ws_meta.gtid());
/* Local client session replaying. */
if (ws_meta.server_id() == server_.server_state().id()
&& client_.params_.check_sequential_consistency)
{
server_.check_sequential_consistency(ws_meta.client_id(),
commit_seqno_);
}
ret = ret || client_.client_state_.ordered_commit(); ret = ret || client_.client_state_.ordered_commit();
ret = ret || client_.client_state_.after_commit(); ret = ret || client_.client_state_.after_commit();
return ret; return ret;

View File

@ -69,6 +69,7 @@ namespace db
high_priority_service& operator=(const high_priority_service&); high_priority_service& operator=(const high_priority_service&);
db::server& server_; db::server& server_;
db::client& client_; db::client& client_;
uint64_t commit_seqno_;
}; };
class replayer_service : public db::high_priority_service class replayer_service : public db::high_priority_service

View File

@ -95,6 +95,12 @@ db::params db::parse_args(int argc, char** argv)
"Configure TLS service stubs.\n0 default disabled\n1 enabled\n" "Configure TLS service stubs.\n0 default disabled\n1 enabled\n"
"2 enabled with short read/write and renegotiation simulation\n" "2 enabled with short read/write and renegotiation simulation\n"
"3 enabled with error simulation.") "3 enabled with error simulation.")
("check-sequential-consistency",
po::value<bool>(&params.check_sequential_consistency),
"Check if the provider provides sequential consistency")
("do-2pc",
po::value<bool>(&params.do_2pc),
"Run commits in 2pc")
; ;
try try
{ {

View File

@ -27,42 +27,27 @@ namespace db
{ {
struct params struct params
{ {
size_t n_servers; size_t n_servers{0};
size_t n_clients; size_t n_clients{0};
size_t n_transactions; size_t n_transactions{0};
size_t n_rows; size_t n_rows{1000};
size_t max_data_size; // Maximum size of write set data payload. size_t max_data_size{8}; // Maximum size of write set data payload.
bool random_data_size; // If true, randomize data payload size. bool random_data_size{false}; // If true, randomize data payload size.
size_t alg_freq; /* Asymmetric lock granularity frequency. */
bool sync_wait; size_t alg_freq{0};
std::string topology; /* Whether to sync wait before start of transaction. */
std::string wsrep_provider; bool sync_wait{false};
std::string wsrep_provider_options; std::string topology{};
std::string status_file; std::string wsrep_provider{};
int debug_log_level; std::string wsrep_provider_options{};
int fast_exit; std::string status_file{"status.json"};
int thread_instrumentation; int debug_log_level{0};
bool cond_checks; int fast_exit{0};
int tls_service; int thread_instrumentation{0};
params() bool cond_checks{false};
: n_servers(0) int tls_service{0};
, n_clients(0) bool check_sequential_consistency{false};
, n_transactions(0) bool do_2pc{false};
, n_rows(1000)
, max_data_size(8)
, random_data_size(false)
, alg_freq(0)
, sync_wait(false)
, topology()
, wsrep_provider()
, wsrep_provider_options()
, status_file("status.json")
, debug_log_level(0)
, fast_exit(0)
, thread_instrumentation()
, cond_checks()
, tls_service()
{ }
}; };
params parse_args(int argc, char** argv); params parse_args(int argc, char** argv);

View File

@ -68,6 +68,9 @@ db::server::server(simulator& simulator,
, appliers_() , appliers_()
, clients_() , clients_()
, client_threads_() , client_threads_()
, commit_mutex_()
, next_commit_seqno_()
, committed_seqno_()
{ {
wsrep::log::logger_fn(logger_fn); wsrep::log::logger_fn(logger_fn);
} }
@ -165,3 +168,16 @@ void db::server::log_state_change(enum wsrep::server_state::state from,
wsrep::log_info() << "State changed " << from << " -> " << to; wsrep::log_info() << "State changed " << from << " -> " << to;
reporter_.report_state(to); reporter_.report_state(to);
} }
void db::server::check_sequential_consistency(wsrep::client_id client_id,
uint64_t commit_seqno)
{
if (committed_seqno_ >= commit_seqno)
{
wsrep::log_error() << "Sequentiality violation for " << client_id
<< " commit seqno " << commit_seqno << " previous "
<< committed_seqno_;
::abort();
}
committed_seqno_ = commit_seqno;
}

View File

@ -61,6 +61,27 @@ namespace db
wsrep::high_priority_service* streaming_applier_service(); wsrep::high_priority_service* streaming_applier_service();
void log_state_change(enum wsrep::server_state::state, void log_state_change(enum wsrep::server_state::state,
enum wsrep::server_state::state); enum wsrep::server_state::state);
/* Sequential consistency checks */
struct commit_critical_section
{
wsrep::unique_lock<wsrep::default_mutex> lock;
uint64_t commit_seqno;
commit_critical_section(wsrep::default_mutex& mutex,
uint64_t& next_commit_seqno)
: lock{ mutex }
, commit_seqno{ ++next_commit_seqno }
{
}
commit_critical_section(commit_critical_section&&) = default;
};
commit_critical_section get_commit_critical_section() {
return { commit_mutex_, next_commit_seqno_ };
}
/* Check that commits remain sequential according commit_seqno.
* This method must be called inside commit order critical section. */
void check_sequential_consistency(wsrep::client_id client_id,
uint64_t commit_seqno);
private: private:
void start_client(size_t id); void start_client(size_t id);
@ -76,6 +97,10 @@ namespace db
std::vector<boost::thread> appliers_; std::vector<boost::thread> appliers_;
std::vector<std::shared_ptr<db::client>> clients_; std::vector<std::shared_ptr<db::client>> clients_;
std::vector<boost::thread> client_threads_; std::vector<boost::thread> client_threads_;
wsrep::default_mutex commit_mutex_;
uint64_t next_commit_seqno_;
uint64_t committed_seqno_;
}; };
} }