mirror of
https://github.com/codership/wsrep-lib.git
synced 2025-07-28 20:02:00 +03:00
Initial implementation of client_state TOI mode.
This commit is contained in:
@ -80,6 +80,7 @@ namespace wsrep
|
||||
m_toi
|
||||
};
|
||||
|
||||
static const int mode_max_ = m_toi + 1;
|
||||
/**
|
||||
* Client state enumeration.
|
||||
*
|
||||
@ -110,7 +111,7 @@ namespace wsrep
|
||||
s_quitting
|
||||
};
|
||||
|
||||
const static int state_max_ = s_quitting + 1;
|
||||
static const int state_max_ = s_quitting + 1;
|
||||
|
||||
/**
|
||||
* Store variables related to global execution context.
|
||||
@ -398,8 +399,16 @@ namespace wsrep
|
||||
|
||||
/**
|
||||
* Enter total order isolation critical section.
|
||||
* @param key_array Array of keys
|
||||
* @param buffer Buffer containing the action to execute inside
|
||||
* total order isolation section
|
||||
* @param flags Provider flags for TOI operation
|
||||
*
|
||||
* @return Zero on success, non-zero otherwise.
|
||||
*/
|
||||
int enter_toi(const wsrep::key_array&, const wsrep::const_buffer&);
|
||||
int enter_toi(const wsrep::key_array& key_array,
|
||||
const wsrep::const_buffer& buffer,
|
||||
int flags);
|
||||
|
||||
/**
|
||||
* Leave total order isolation critical section.
|
||||
@ -545,6 +554,7 @@ namespace wsrep
|
||||
, mode_(mode)
|
||||
, state_(s_none)
|
||||
, transaction_(*this)
|
||||
, toi_meta_()
|
||||
, allow_dirty_reads_()
|
||||
, debug_log_level_(0)
|
||||
, current_error_(wsrep::e_success)
|
||||
@ -561,6 +571,7 @@ namespace wsrep
|
||||
|
||||
void debug_log_state(const char*) const;
|
||||
void state(wsrep::unique_lock<wsrep::mutex>& lock, enum state state);
|
||||
void mode(wsrep::unique_lock<wsrep::mutex>& lock, enum mode mode);
|
||||
void override_error(enum wsrep::client_error error);
|
||||
|
||||
wsrep::thread::id owning_thread_id_;
|
||||
@ -572,6 +583,7 @@ namespace wsrep
|
||||
enum mode mode_;
|
||||
enum state state_;
|
||||
wsrep::transaction transaction_;
|
||||
wsrep::ws_meta toi_meta_;
|
||||
bool allow_dirty_reads_;
|
||||
int debug_log_level_;
|
||||
wsrep::client_error current_error_;
|
||||
@ -621,11 +633,14 @@ namespace wsrep
|
||||
: client_(client)
|
||||
, orig_mode_(client.mode_)
|
||||
{
|
||||
client_.mode_ = wsrep::client_state::m_high_priority;
|
||||
wsrep::unique_lock<wsrep::mutex> lock(client.mutex_);
|
||||
client.mode(lock, wsrep::client_state::m_high_priority);
|
||||
}
|
||||
virtual ~high_priority_context()
|
||||
{
|
||||
client_.mode_ = orig_mode_;
|
||||
wsrep::unique_lock<wsrep::mutex> lock(client_.mutex_);
|
||||
assert(client_.mode() == wsrep::client_state::m_high_priority);
|
||||
client_.mode(lock, orig_mode_);
|
||||
}
|
||||
private:
|
||||
wsrep::client_state& client_;
|
||||
@ -642,12 +657,14 @@ namespace wsrep
|
||||
: client_(client)
|
||||
, orig_mode_(client.mode_)
|
||||
{
|
||||
client_.mode_ = wsrep::client_state::m_toi;
|
||||
wsrep::unique_lock<wsrep::mutex> lock(client.mutex_);
|
||||
client.mode(lock, wsrep::client_state::m_toi);
|
||||
}
|
||||
~client_toi_mode()
|
||||
{
|
||||
wsrep::unique_lock<wsrep::mutex> lock(client_.mutex_);
|
||||
assert(client_.mode() == wsrep::client_state::m_toi);
|
||||
client_.mode_ = orig_mode_;
|
||||
client_.mode(lock, orig_mode_);
|
||||
}
|
||||
private:
|
||||
wsrep::client_state& client_;
|
||||
|
@ -277,6 +277,19 @@ namespace wsrep
|
||||
virtual enum status replay(
|
||||
const wsrep::ws_handle& ws_handle, void* applier_ctx) = 0;
|
||||
|
||||
/**
|
||||
* Enter total order isolation critical section
|
||||
*/
|
||||
virtual enum status enter_toi(wsrep::client_id,
|
||||
const wsrep::key_array& keys,
|
||||
const wsrep::const_buffer& buffer,
|
||||
wsrep::ws_meta& ws_meta,
|
||||
int flags) = 0;
|
||||
/**
|
||||
* Leave total order isolation critical section
|
||||
*/
|
||||
virtual enum status leave_toi(wsrep::client_id) = 0;
|
||||
|
||||
/**
|
||||
* Perform a causal read on cluster.
|
||||
*
|
||||
@ -284,7 +297,7 @@ namespace wsrep
|
||||
*
|
||||
* @return Provider status indicating the result of the call.
|
||||
*/
|
||||
virtual enum wsrep::provider::status causal_read(int timeout) const = 0;
|
||||
virtual enum status causal_read(int timeout) const = 0;
|
||||
virtual int sst_sent(const wsrep::gtid&, int) = 0;
|
||||
virtual int sst_received(const wsrep::gtid&, int) = 0;
|
||||
|
||||
|
@ -231,6 +231,47 @@ int wsrep::client_state::enable_streaming(
|
||||
return 0;
|
||||
}
|
||||
|
||||
int wsrep::client_state::enter_toi(const wsrep::key_array& keys,
|
||||
const wsrep::const_buffer& buffer,
|
||||
int flags)
|
||||
{
|
||||
assert(state_ == s_exec);
|
||||
int ret;
|
||||
switch (provider().enter_toi(id_, keys, buffer, toi_meta_, flags))
|
||||
{
|
||||
case wsrep::provider::success:
|
||||
{
|
||||
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
||||
mode(lock, m_toi);
|
||||
ret = 0;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
override_error(wsrep::e_error_during_commit);
|
||||
ret = 1;
|
||||
break;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int wsrep::client_state::leave_toi()
|
||||
{
|
||||
int ret;
|
||||
switch (provider().leave_toi(id_))
|
||||
{
|
||||
case wsrep::provider::success:
|
||||
ret = 0;
|
||||
break;
|
||||
default:
|
||||
assert(0);
|
||||
override_error(wsrep::e_error_during_commit);
|
||||
ret = 1;
|
||||
break;
|
||||
}
|
||||
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
||||
mode(lock, m_replicating);
|
||||
return ret;
|
||||
}
|
||||
// Private
|
||||
|
||||
void wsrep::client_state::debug_log_state(const char* context) const
|
||||
@ -271,3 +312,29 @@ void wsrep::client_state::state(
|
||||
throw wsrep::runtime_error(os.str());
|
||||
}
|
||||
}
|
||||
|
||||
void wsrep::client_state::mode(
|
||||
wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED,
|
||||
enum mode mode)
|
||||
{
|
||||
assert(lock.owns_lock());
|
||||
static const char allowed[mode_max_][mode_max_] =
|
||||
{
|
||||
{ 0, 0, 0, 0}, /* local */
|
||||
{ 0, 0, 1, 1}, /* repl */
|
||||
{ 0, 1, 0, 0}, /* high prio */
|
||||
{ 0, 1, 0, 0} /* toi */
|
||||
};
|
||||
if (allowed[mode_][mode])
|
||||
{
|
||||
mode_ = mode;
|
||||
}
|
||||
else
|
||||
{
|
||||
std::ostringstream os;
|
||||
os << "client_state: Unallowed mode transition: "
|
||||
<< mode_ << " -> " << mode;
|
||||
throw wsrep::runtime_error(os.str());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -633,6 +633,50 @@ wsrep::wsrep_provider_v26::replay(const wsrep::ws_handle& ws_handle,
|
||||
wsrep_->replay_trx(wsrep_, mwsh.native(), applier_ctx));
|
||||
}
|
||||
|
||||
enum wsrep::provider::status
|
||||
wsrep::wsrep_provider_v26::enter_toi(
|
||||
wsrep::client_id client_id,
|
||||
const wsrep::key_array& keys,
|
||||
const wsrep::const_buffer& buffer,
|
||||
wsrep::ws_meta& ws_meta,
|
||||
int flags)
|
||||
{
|
||||
mutable_ws_meta mmeta(ws_meta, flags);
|
||||
std::vector<wsrep_buf_t> key_parts;
|
||||
size_t key_parts_offset(0);
|
||||
std::vector<wsrep_key_t> wsrep_keys;
|
||||
wsrep_buf_t wsrep_buf = {buffer.data(), buffer.size()};
|
||||
for (wsrep::key_array::const_iterator i(keys.begin());
|
||||
i != keys.end(); ++i)
|
||||
{
|
||||
for (size_t kp(0); kp < i->size(); ++kp)
|
||||
{
|
||||
wsrep_buf_t buf = {i->key_parts()[kp].data(),
|
||||
i->key_parts()[kp].size()};
|
||||
key_parts.push_back(buf);
|
||||
}
|
||||
wsrep_key_t key = {&key_parts[0] + key_parts_offset, i->size()};
|
||||
wsrep_keys.push_back(key);
|
||||
key_parts_offset += i->size();
|
||||
}
|
||||
return map_return_value(wsrep_->to_execute_start(
|
||||
wsrep_,
|
||||
client_id.get(),
|
||||
&wsrep_keys[0],
|
||||
wsrep_keys.size(),
|
||||
&wsrep_buf,
|
||||
1,
|
||||
mmeta.native_flags(),
|
||||
mmeta.native()));
|
||||
}
|
||||
|
||||
enum wsrep::provider::status
|
||||
wsrep::wsrep_provider_v26::leave_toi(wsrep::client_id client_id)
|
||||
{
|
||||
return map_return_value(wsrep_->to_execute_end(
|
||||
wsrep_, client_id.get(), 0));
|
||||
}
|
||||
|
||||
enum wsrep::provider::status
|
||||
wsrep::wsrep_provider_v26::causal_read(int timeout) const
|
||||
{
|
||||
|
@ -49,7 +49,12 @@ namespace wsrep
|
||||
const wsrep::ws_meta&);
|
||||
int release(wsrep::ws_handle&);
|
||||
enum wsrep::provider::status replay(const wsrep::ws_handle&, void*);
|
||||
|
||||
enum wsrep::provider::status enter_toi(wsrep::client_id,
|
||||
const wsrep::key_array&,
|
||||
const wsrep::const_buffer&,
|
||||
wsrep::ws_meta&,
|
||||
int);
|
||||
enum wsrep::provider::status leave_toi(wsrep::client_id);
|
||||
enum wsrep::provider::status causal_read(int) const;
|
||||
|
||||
int sst_sent(const wsrep::gtid&,int);
|
||||
|
@ -198,6 +198,15 @@ namespace wsrep
|
||||
return wsrep::provider::success;
|
||||
}
|
||||
|
||||
enum wsrep::provider::status enter_toi(wsrep::client_id,
|
||||
const wsrep::key_array&,
|
||||
const wsrep::const_buffer&,
|
||||
wsrep::ws_meta&,
|
||||
int)
|
||||
{ return wsrep::provider::success; }
|
||||
enum wsrep::provider::status leave_toi(wsrep::client_id)
|
||||
{ return wsrep::provider::success; }
|
||||
|
||||
enum wsrep::provider::status causal_read(int) const WSREP_OVERRIDE
|
||||
{
|
||||
return wsrep::provider::success;
|
||||
|
Reference in New Issue
Block a user