mirror of
https://github.com/codership/wsrep-lib.git
synced 2025-06-13 04:01:32 +03:00
Initial RSU implementation.
This commit is contained in:
@ -75,6 +75,11 @@ void db::server_service::log_view(const wsrep::view&)
|
||||
wsrep::log_info() << "View";
|
||||
}
|
||||
|
||||
int db::server_service::wait_committing_transactions(int)
|
||||
{
|
||||
throw wsrep::not_implemented_error();
|
||||
}
|
||||
|
||||
void db::server_service::debug_sync(const char*)
|
||||
{
|
||||
|
||||
|
@ -29,6 +29,7 @@ namespace db
|
||||
void log_dummy_write_set(wsrep::client_state&, const wsrep::ws_meta&)
|
||||
override;
|
||||
void log_view(const wsrep::view&) override;
|
||||
int wait_committing_transactions(int) override;
|
||||
void debug_sync(const char*) override;
|
||||
private:
|
||||
db::server& server_;
|
||||
|
@ -81,10 +81,12 @@ namespace wsrep
|
||||
/** High priority mode */
|
||||
m_high_priority,
|
||||
/** Client is in total order isolation mode */
|
||||
m_toi
|
||||
m_toi,
|
||||
/** Client is executing rolling schema upgrade */
|
||||
m_rsu
|
||||
};
|
||||
|
||||
static const int mode_max_ = m_toi + 1;
|
||||
static const int n_modes_ = m_rsu + 1;
|
||||
/**
|
||||
* Client state enumeration.
|
||||
*
|
||||
@ -443,6 +445,19 @@ namespace wsrep
|
||||
*/
|
||||
int leave_toi();
|
||||
|
||||
/**
|
||||
* Begin rolling schema upgrade operation.
|
||||
*
|
||||
* @param timeout Timeout in seconds to wait for committing
|
||||
* connections to finish.
|
||||
*/
|
||||
int begin_rsu(int timeout);
|
||||
|
||||
/**
|
||||
* End rolling schema upgrade operation.
|
||||
*/
|
||||
int end_rsu();
|
||||
|
||||
/**
|
||||
* Begin non-blocking operation.
|
||||
*/
|
||||
@ -691,6 +706,7 @@ namespace wsrep
|
||||
case wsrep::client_state::m_replicating: return "replicating";
|
||||
case wsrep::client_state::m_high_priority: return "high priority";
|
||||
case wsrep::client_state::m_toi: return "toi";
|
||||
case wsrep::client_state::m_rsu: return "rsu";
|
||||
}
|
||||
return "unknown";
|
||||
}
|
||||
|
@ -125,6 +125,15 @@ namespace wsrep
|
||||
const wsrep::gtid& gtid,
|
||||
bool bypass) = 0;
|
||||
|
||||
|
||||
/**
|
||||
* Wait until committing transactions have completed.
|
||||
* Prior calling this method the server should have been
|
||||
* desynced from the group to disallow further transactions
|
||||
* to start committing.
|
||||
*/
|
||||
virtual int wait_committing_transactions(int timeout) = 0;
|
||||
|
||||
/**
|
||||
* Provide a server level debug sync point for a caller.
|
||||
*/
|
||||
|
@ -300,6 +300,51 @@ int wsrep::client_state::leave_toi()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int wsrep::client_state::begin_rsu(int timeout)
|
||||
{
|
||||
if (server_state_.desync())
|
||||
{
|
||||
wsrep::log_warning() << "Failed to desync server";
|
||||
return 1;
|
||||
}
|
||||
if (server_state_.server_service().wait_committing_transactions(timeout))
|
||||
{
|
||||
wsrep::log_warning() << "RSU failed due to pending transactions";
|
||||
server_state_.resync();
|
||||
return 1;
|
||||
}
|
||||
wsrep::seqno pause_seqno(server_state_.pause());
|
||||
if (pause_seqno.is_undefined())
|
||||
{
|
||||
wsrep::log_warning() << "Failed to pause provider";
|
||||
server_state_.resync();
|
||||
return 1;
|
||||
}
|
||||
wsrep::log_info() << "Provider paused at: " << pause_seqno;
|
||||
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
||||
toi_mode_ = mode_;
|
||||
mode(lock, m_rsu);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int wsrep::client_state::end_rsu()
|
||||
{
|
||||
int ret(0);
|
||||
try
|
||||
{
|
||||
server_state_.resume();
|
||||
server_state_.resync();
|
||||
}
|
||||
catch (const wsrep::runtime_error& e)
|
||||
{
|
||||
wsrep::log_warning() << "End RSU failed: " << e.what();
|
||||
ret = 1;
|
||||
}
|
||||
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
||||
mode(lock, toi_mode_);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int wsrep::client_state::sync_wait(int timeout)
|
||||
{
|
||||
std::pair<wsrep::gtid, enum wsrep::provider::status> result(
|
||||
@ -382,12 +427,13 @@ void wsrep::client_state::mode(
|
||||
enum mode mode)
|
||||
{
|
||||
assert(lock.owns_lock());
|
||||
static const char allowed[mode_max_][mode_max_] =
|
||||
{ /* l r h t */
|
||||
{ 0, 0, 0, 0 }, /* local */
|
||||
{ 0, 0, 1, 1 }, /* repl */
|
||||
{ 0, 1, 0, 1 }, /* high prio */
|
||||
{ 0, 1, 1, 0 } /* toi */
|
||||
static const char allowed[n_modes_][n_modes_] =
|
||||
{ /* l r h t r */
|
||||
{ 0, 0, 0, 0, 0 }, /* local */
|
||||
{ 0, 0, 1, 1, 1 }, /* repl */
|
||||
{ 0, 1, 0, 1, 0 }, /* high prio */
|
||||
{ 0, 1, 1, 0, 0 }, /* toi */
|
||||
{ 0, 1, 0, 0, 0 } /* rsu */
|
||||
};
|
||||
if (allowed[mode_][mode])
|
||||
{
|
||||
|
@ -85,10 +85,6 @@ namespace wsrep
|
||||
}
|
||||
void log_view(const wsrep::view&) { }
|
||||
|
||||
// void on_connect(const wsrep::gtid& ) WSREP_OVERRIDE { }
|
||||
// void wait_until_connected() WSREP_OVERRIDE { }
|
||||
// void on_view(const wsrep::view&) WSREP_OVERRIDE { }
|
||||
// void on_sync() WSREP_OVERRIDE { }
|
||||
bool sst_before_init() const WSREP_OVERRIDE
|
||||
{ return sst_before_init_; }
|
||||
std::string sst_request() WSREP_OVERRIDE { return ""; }
|
||||
@ -102,6 +98,8 @@ namespace wsrep
|
||||
client_state.after_rollback();
|
||||
}
|
||||
|
||||
int wait_committing_transactions(int) WSREP_OVERRIDE { return 0; }
|
||||
|
||||
void debug_sync(const char* sync_point) WSREP_OVERRIDE
|
||||
{
|
||||
if (sync_point_enabled_ == sync_point)
|
||||
|
Reference in New Issue
Block a user