1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-21 12:22:06 +03:00

* Moved causal reads/gtid wait into server state interface

* Changed undefined seqno to be defined as -1
This commit is contained in:
Teemu Ollakka
2018-06-21 10:37:55 +03:00
parent ef0fb72b73
commit 3a8861b26b
12 changed files with 167 additions and 65 deletions

View File

@ -4,10 +4,17 @@
/** @file client_state.hpp
*
*
* Return value conventions:
*
* The calls which may alter either client_state or associated
* transaction state will generally return zero on success and
* non-zero on failure. More detailed error information is stored
* into client state and persisted there until explicitly cleared.
*/
#ifndef WSREP_CLIENT_CONTEXT_HPP
#define WSREP_CLIENT_CONTEXT_HPP
#ifndef WSREP_CLIENT_STATE_HPP
#define WSREP_CLIENT_STATE_HPP
#include "server_state.hpp"
#include "provider.hpp"
@ -34,7 +41,7 @@ namespace wsrep
e_append_fragment_error
};
static inline const char* to_string(enum client_error error)
static inline const char* to_c_string(enum client_error error)
{
switch (error)
{
@ -48,6 +55,10 @@ namespace wsrep
return "unknown";
}
static inline std::string to_string(enum client_error error)
{
return to_c_string(error);
}
/**
* Client State
*/
@ -382,45 +393,18 @@ namespace wsrep
transaction_.streaming_context_ = transaction.streaming_context_;
}
//
// Causal reads
//
/**
* Perform a causal read in the cluster. After the call returns,
* all the causally preceding write sets have been committed
* or the error is returned.
*
* This operation may require communication with other processes
* in the DBMS cluster, so it may be relatively heavy operation.
* Method wait_for_gtid() should be used whenever possible.
*/
enum wsrep::provider::status causal_read() const;
/**
* Wait until all the write sets up to given GTID have been
* committed.
*
* @return Zero on success, non-zero on failure.
*/
enum wsrep::provider::status wait_for_gtid(const wsrep::gtid&) const;
//
//
//
/** @name Non-transactional operations */
/** @{*/
/**
* Enter total order isolation critical section.
*/
enum wsrep::provider::status enter_toi();
int enter_toi(const wsrep::key_array&, const wsrep::const_buffer&);
/**
* Leave total order isolation critical section.
*/
void leave_toi();
int leave_toi();
/**
* Begin non-blocking operation.
@ -430,7 +414,7 @@ namespace wsrep
/**
* End non-blocking operation
*/
void end_nbo();
int end_nbo();
/**
* Get reference to the client mutex.
@ -696,4 +680,4 @@ namespace wsrep
};
}
#endif // WSREP_CLIENT_CONTEXT_HPP
#endif // WSREP_CLIENT_STATE_HPP

View File

@ -25,6 +25,10 @@ namespace wsrep
{ }
const wsrep::id& id() const { return id_; }
wsrep::seqno seqno() const { return seqno_ ; }
bool is_undefined() const
{
return (seqno_.is_undefined() && id_.is_undefined());
}
private:
wsrep::id id_;
wsrep::seqno seqno_;

View File

@ -60,11 +60,24 @@ namespace wsrep
{
return (std::memcmp(data_, other.data_, sizeof(data_)) == 0);
}
bool operator!=(const id& other) const
{
return !(*this == other);
}
const void* data() const { return data_; }
size_t size() const { return sizeof(data_); }
bool is_undefined() const
{
return (*this == undefined());
}
static const wsrep::id& undefined()
{
static wsrep::id ret = wsrep::id();
return ret;
}
private:
unsigned char data_[16];
};

View File

@ -232,7 +232,7 @@ namespace wsrep
virtual int desync() = 0;
virtual int resync() = 0;
virtual int pause() = 0;
virtual wsrep::seqno pause() = 0;
virtual int resume() = 0;
// Applier interface
@ -277,6 +277,14 @@ namespace wsrep
virtual enum status replay(
const wsrep::ws_handle& ws_handle, void* applier_ctx) = 0;
/**
* Perform a causal read on cluster.
*
* @param timeout Timeout in seconds
*
* @return Provider status indicating the result of the call.
*/
virtual enum wsrep::provider::status causal_read(int timeout) const = 0;
virtual int sst_sent(const wsrep::gtid&, int) = 0;
virtual int sst_received(const wsrep::gtid&, int) = 0;

View File

@ -23,15 +23,15 @@ namespace wsrep
{
public:
seqno()
: seqno_()
: seqno_(-1)
{ }
explicit seqno(long long seqno)
: seqno_(seqno)
{
if (seqno_ < 0)
if (seqno_ < -1)
{
throw wsrep::runtime_error("Negative seqno given");
throw wsrep::runtime_error("Too negative seqno given");
}
}
@ -40,9 +40,9 @@ namespace wsrep
return seqno_;
}
bool nil() const
bool is_undefined() const
{
return (seqno_ == 0);
return (seqno_ == -1);
}
bool operator<(seqno other) const
@ -54,13 +54,19 @@ namespace wsrep
{
return (seqno_ > other.seqno_);
}
bool operator==(seqno other) const
{
return (seqno_ == other.seqno_);
}
seqno operator+(seqno other) const
{
return (seqno(seqno_ + other.seqno_));
}
static seqno undefined() { return seqno(0); }
seqno operator+(long long other) const
{
return (*this + seqno(other));
}
static seqno undefined() { return seqno(-1); }
private:
long long seqno_;
};

View File

@ -4,11 +4,11 @@
/** @file server_state.hpp
*
* Server Context Abstraction
* Server State Abstraction
* ==========================
*
* This file defines an interface for WSREP Server Context.
* The Server Context will encapsulate server identification,
* This file defines an interface for WSREP Server State.
* The Server State t will encapsulate server identification,
* server state and server capabilities. The class also
* defines an interface for manipulating server state, applying
* of remote transaction write sets, processing SST requests,
@ -56,16 +56,24 @@
* to be aborted or in case of fully optimistic concurrency control,
* the conflict is detected at commit.
*
*
* # Return value conventions
*
* The calls which are proxies to corresponding provider functionality
* will return wsrep::provider::status enum as a result. Otherwise
* the return value is generally zero on success, non zero on failure.
*/
#ifndef WSREP_SERVER_CONTEXT_HPP
#define WSREP_SERVER_CONTEXT_HPP
#ifndef WSREP_SERVER_STATE_HPP
#define WSREP_SERVER_STATE_HPP
#include "mutex.hpp"
#include "condition_variable.hpp"
#include "server_service.hpp"
#include "id.hpp"
#include "transaction_id.hpp"
#include "provider.hpp"
// #include "gtid.hpp"
#include <vector>
#include <string>
@ -76,10 +84,8 @@ namespace wsrep
// Forward declarations
class ws_handle;
class ws_meta;
class provider;
class client_state;
class transaction;
class gtid;
class view;
class const_buffer;
@ -275,6 +281,37 @@ namespace wsrep
*/
void wait_until_state(wsrep::server_state::state) const;
/**
* Set last committed GTID.
*/
void last_committed_gtid(const wsrep::gtid&);
/**
* Return the last committed GTID known to be committed
* on server.
*/
wsrep::gtid last_committed_gtid() const;
/**
* Wait until all the write sets up to given GTID have been
* committed.
*
* @return Zero on success, non-zero on failure.
*/
int wait_for_gtid(const wsrep::gtid&) const;
/**
* Perform a causal read in the cluster. After the call returns,
* all the causally preceding write sets have been committed
* or the error is returned.
*
* This operation may require communication with other processes
* in the DBMS cluster, so it may be relatively heavy operation.
* Method wait_for_gtid() should be used whenever possible.
*
* @param timeout Timeout in seconds
*/
enum wsrep::provider::status causal_read(int timeout) const;
/**
*
*/
@ -369,6 +406,7 @@ namespace wsrep
, address_(address)
, working_dir_(working_dir)
, rollback_mode_(rollback_mode)
, last_committed_gtid_()
, debug_log_level_(0)
{ }
@ -392,6 +430,7 @@ namespace wsrep
std::string address_;
std::string working_dir_;
enum rollback_mode rollback_mode_;
wsrep::gtid last_committed_gtid_;
int debug_log_level_;
};
@ -428,4 +467,4 @@ namespace wsrep
}
#endif // WSREP_SERVER_CONTEXT_HPP
#endif // WSREP_SERVER_STATE_HPP

View File

@ -68,7 +68,7 @@ namespace wsrep
// Return true if the last fragment was ordered by the
// provider
bool ordered() const
{ return (ws_meta_.seqno().nil() == false); }
{ return (ws_meta_.seqno().is_undefined() == false); }
/*!
* Return true if any fragments have been succesfully certified

View File

@ -77,6 +77,41 @@ void wsrep::server_state::wait_until_state(
cond_.notify_all();
}
void wsrep::server_state::last_committed_gtid(const wsrep::gtid& gtid)
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(last_committed_gtid_.is_undefined() ||
last_committed_gtid_.seqno() + 1 == gtid.seqno());
last_committed_gtid_ = gtid;
cond_.notify_all();
}
wsrep::gtid wsrep::server_state::last_committed_gtid() const
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
return last_committed_gtid_;
}
int wsrep::server_state::wait_for_gtid(const wsrep::gtid& gtid) const
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
if (gtid.id() != last_committed_gtid_.id())
{
return 1;
}
while (last_committed_gtid_.seqno() < gtid.seqno())
{
cond_.wait(lock);
}
return 0;
}
enum wsrep::provider::status
wsrep::server_state::causal_read(int timeout) const
{
return provider_->causal_read(timeout);
}
void wsrep::server_state::on_connect()
{
wsrep::log() << "Server " << name_ << " connected to cluster";

View File

@ -89,7 +89,7 @@ int wsrep::transaction::start_replaying(const wsrep::ws_meta& ws_meta)
assert(active());
assert(client_state_.mode() == wsrep::client_state::m_high_priority);
assert(state() == s_replaying);
assert(ws_meta_.seqno().nil() == false);
assert(ws_meta_.seqno().is_undefined() == false);
certified_ = true;
return 0;
}

View File

@ -69,12 +69,12 @@ namespace
static inline wsrep_seqno_t seqno_to_native(wsrep::seqno seqno)
{
return (seqno.nil() ? WSREP_SEQNO_UNDEFINED : seqno.get());
return seqno.get();
}
static inline wsrep::seqno seqno_from_native(wsrep_seqno_t seqno)
{
return wsrep::seqno(seqno == WSREP_SEQNO_UNDEFINED ? 0 : seqno);
return wsrep::seqno(seqno);
}
template <typename F, typename T>
inline uint32_t map_one(const int flags, const F from,
@ -520,9 +520,9 @@ int wsrep::wsrep_provider_v26::resync()
return (wsrep_->resync(wsrep_) != WSREP_OK);
}
int wsrep::wsrep_provider_v26::pause()
wsrep::seqno wsrep::wsrep_provider_v26::pause()
{
return (wsrep_->pause(wsrep_) != WSREP_OK);
return wsrep::seqno(wsrep_->pause(wsrep_));
}
int wsrep::wsrep_provider_v26::resume()
@ -633,6 +633,12 @@ wsrep::wsrep_provider_v26::replay(const wsrep::ws_handle& ws_handle,
wsrep_->replay_trx(wsrep_, mwsh.native(), applier_ctx));
}
enum wsrep::provider::status
wsrep::wsrep_provider_v26::causal_read(int timeout) const
{
return map_return_value(wsrep_->sync_wait(wsrep_, 0, timeout, 0));
}
int wsrep::wsrep_provider_v26::sst_sent(const wsrep::gtid& gtid, int err)
{
wsrep_gtid_t wsrep_gtid;

View File

@ -25,7 +25,7 @@ namespace wsrep
int desync();
int resync();
int pause();
wsrep::seqno pause();
int resume();
enum wsrep::provider::status run_applier(void*);
@ -49,6 +49,9 @@ namespace wsrep
const wsrep::ws_meta&);
int release(wsrep::ws_handle&);
enum wsrep::provider::status replay(const wsrep::ws_handle&, void*);
enum wsrep::provider::status causal_read(int) const;
int sst_sent(const wsrep::gtid&,int);
int sst_received(const wsrep::gtid& gtid, int);

View File

@ -46,7 +46,7 @@ namespace wsrep
int capabilities() const { return 0; }
int desync() { return 0; }
int resync() { return 0; }
int pause() { return 0; }
wsrep::seqno pause() { return wsrep::seqno(0); }
int resume() { return 0; }
enum wsrep::provider::status run_applier(void*)
{
@ -104,7 +104,7 @@ namespace wsrep
else
{
enum wsrep::provider::status ret;
if (it->second.nil())
if (it->second.is_undefined())
{
ws_meta = wsrep::ws_meta(wsrep::gtid(), wsrep::stid(),
wsrep::seqno::undefined(), 0);
@ -140,7 +140,7 @@ namespace wsrep
const wsrep::ws_meta& ws_meta)
{
BOOST_REQUIRE(ws_handle.opaque());
BOOST_REQUIRE(ws_meta.seqno().nil() == false);
BOOST_REQUIRE(ws_meta.seqno().is_undefined() == false);
return commit_order_enter_result_;
}
@ -148,7 +148,7 @@ namespace wsrep
const wsrep::ws_meta& ws_meta)
{
BOOST_REQUIRE(ws_handle.opaque());
BOOST_REQUIRE(ws_meta.seqno().nil() == false);
BOOST_REQUIRE(ws_meta.seqno().is_undefined() == false);
return commit_order_leave_result_;
}
@ -170,7 +170,7 @@ namespace wsrep
{
// If the ws_meta was not assigned yet, the certify
// returned early due to BF abort.
if (tc.ws_meta().seqno().nil())
if (tc.ws_meta().seqno().is_undefined())
{
++group_seqno_;
ws_meta = wsrep::ws_meta(
@ -198,6 +198,10 @@ namespace wsrep
return wsrep::provider::success;
}
enum wsrep::provider::status causal_read(int) const WSREP_OVERRIDE
{
return wsrep::provider::success;
}
int sst_sent(const wsrep::gtid&, int) { return 0; }
int sst_received(const wsrep::gtid&, int) { return 0; }
@ -225,7 +229,7 @@ namespace wsrep
wsrep::seqno& victim_seqno)
{
bf_abort_map_.insert(std::make_pair(trx_id, bf_seqno));
if (bf_seqno.nil() == false)
if (bf_seqno.is_undefined() == false)
{
group_seqno_ = bf_seqno.get();
}