1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-31 18:24:25 +03:00

* Unit test for SR 2PC

* Removed redundant data class
* Introduced const_buffer and mutable_buffer
* Transaction context and client context interface refactoring
This commit is contained in:
Teemu Ollakka
2018-06-12 18:04:32 +03:00
parent 0186342092
commit 9e8e6d47ba
17 changed files with 148 additions and 96 deletions

View File

@ -5,27 +5,52 @@
#ifndef WSREP_BUFFER_HPP #ifndef WSREP_BUFFER_HPP
#define WSREP_BUFFER_HPP #define WSREP_BUFFER_HPP
#include <vector>
namespace wsrep namespace wsrep
{ {
class buffer class const_buffer
{ {
public: public:
buffer() const_buffer()
: ptr_() : ptr_()
, size_() , size_()
{ } { }
buffer(const void* ptr, size_t size)
const_buffer(const void* ptr, size_t size)
: ptr_(ptr) : ptr_(ptr)
, size_(size) , size_(size)
{ } { }
const void* ptr() const { return ptr_; } const void* ptr() const { return ptr_; }
const void* data() const { return ptr_; }
size_t size() const { return size_; } size_t size() const { return size_; }
private: private:
// const_buffer(const const_buffer&);
// const_buffer& operator=(const const_buffer&);
const void* ptr_; const void* ptr_;
size_t size_; size_t size_;
}; };
class mutable_buffer
{
public:
mutable_buffer()
: buffer_()
{ }
void push_back(const char* begin, const char* end)
{
buffer_.insert(buffer_.end(), begin, end);
}
const char* data() const { return &buffer_[0]; }
size_t size() const { return buffer_.size(); }
private:
std::vector<char> buffer_;
};
} }
#endif // WSREP_BUFFER_HPP #endif // WSREP_BUFFER_HPP

View File

@ -44,7 +44,7 @@
#include "client_id.hpp" #include "client_id.hpp"
#include "mutex.hpp" #include "mutex.hpp"
#include "lock.hpp" #include "lock.hpp"
#include "data.hpp" #include "buffer.hpp"
#include "thread.hpp" #include "thread.hpp"
@ -242,7 +242,7 @@ namespace wsrep
return transaction_.append_key(key); return transaction_.append_key(key);
} }
int append_data(const wsrep::data& data) int append_data(const wsrep::const_buffer& data)
{ {
assert(state_ == s_exec); assert(state_ == s_exec);
return transaction_.append_data(data); return transaction_.append_data(data);
@ -400,7 +400,7 @@ namespace wsrep
friend int server_context::on_apply(client_context&, friend int server_context::on_apply(client_context&,
const wsrep::ws_handle&, const wsrep::ws_handle&,
const wsrep::ws_meta&, const wsrep::ws_meta&,
const wsrep::data&); const wsrep::const_buffer&);
friend class client_context_switch; friend class client_context_switch;
friend class client_applier_mode; friend class client_applier_mode;
friend class client_toi_mode; friend class client_toi_mode;
@ -425,18 +425,17 @@ namespace wsrep
/*! /*!
* Append SR fragment to the transaction. * Append SR fragment to the transaction.
*/ */
virtual int append_fragment(wsrep::transaction_context&, virtual int append_fragment(const wsrep::transaction_context&,
int, const wsrep::data&) int, const wsrep::const_buffer&) = 0;
{ return 0; }
virtual void remove_fragments(const wsrep::transaction_context&) = 0;
/*! /*!
* This method applies a write set give in data buffer. * This method applies a write set give in data buffer.
* This must be implemented by the DBMS integration. * This must be implemented by the DBMS integration.
* *
* \return Zero on success, non-zero on applying failure. * \return Zero on success, non-zero on applying failure.
*/ */
virtual int apply(const wsrep::data& data) = 0; virtual int apply(const wsrep::const_buffer& data) = 0;
/*! /*!
* Virtual method which will be called * Virtual method which will be called
@ -474,8 +473,10 @@ namespace wsrep
virtual void wait_for_replayers(wsrep::unique_lock<wsrep::mutex>&) = 0; virtual void wait_for_replayers(wsrep::unique_lock<wsrep::mutex>&) = 0;
virtual int prepare_data_for_replication( virtual int prepare_data_for_replication(
const wsrep::transaction_context&, wsrep::data& data) = 0; const wsrep::transaction_context&) = 0;
virtual int prepare_fragment_for_replication(
const wsrep::transaction_context&, wsrep::mutable_buffer&) = 0;
/*! /*!
* Return true if the current client operation was killed. * Return true if the current client operation was killed.
*/ */

View File

@ -1,27 +0,0 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef WSREP_DATA_HPP
#define WSREP_DATA_HPP
namespace wsrep
{
class data
{
public:
data()
: buf_()
{
}
data(const void* ptr, size_t len)
: buf_(ptr, len)
{
}
const wsrep::buffer& get() const { return buf_; }
private:
wsrep::buffer buf_;
};
}
#endif // WSREP_DATA_HPP

View File

@ -33,7 +33,7 @@ namespace wsrep
{ {
throw wsrep::runtime_error("key parts exceed maximum of 3"); throw wsrep::runtime_error("key parts exceed maximum of 3");
} }
key_parts_[key_parts_len_] = wsrep::buffer(ptr, len); key_parts_[key_parts_len_] = wsrep::const_buffer(ptr, len);
++key_parts_len_; ++key_parts_len_;
} }
@ -47,14 +47,14 @@ namespace wsrep
return key_parts_len_; return key_parts_len_;
} }
const wsrep::buffer* key_parts() const const wsrep::const_buffer* key_parts() const
{ {
return key_parts_; return key_parts_;
} }
private: private:
enum type type_; enum type type_;
wsrep::buffer key_parts_[3]; wsrep::const_buffer key_parts_[3];
size_t key_parts_len_; size_t key_parts_len_;
}; };
} }

View File

@ -7,7 +7,7 @@
#include "gtid.hpp" #include "gtid.hpp"
#include "key.hpp" #include "key.hpp"
#include "data.hpp" #include "buffer.hpp"
#include "client_id.hpp" #include "client_id.hpp"
#include "transaction_id.hpp" #include "transaction_id.hpp"
@ -213,7 +213,7 @@ namespace wsrep
// TODO: Rename to assing_read_view() // TODO: Rename to assing_read_view()
virtual int start_transaction(wsrep::ws_handle&) = 0; virtual int start_transaction(wsrep::ws_handle&) = 0;
virtual int append_key(wsrep::ws_handle&, const wsrep::key&) = 0; virtual int append_key(wsrep::ws_handle&, const wsrep::key&) = 0;
virtual int append_data(wsrep::ws_handle&, const wsrep::data&) = 0; virtual int append_data(wsrep::ws_handle&, const wsrep::const_buffer&) = 0;
virtual enum status virtual enum status
certify(wsrep::client_id, wsrep::ws_handle&, certify(wsrep::client_id, wsrep::ws_handle&,
int, int,

View File

@ -77,7 +77,7 @@ namespace wsrep
class transaction_context; class transaction_context;
class gtid; class gtid;
class view; class view;
class data; class const_buffer;
/*! \class Server Context /*! \class Server Context
* *
@ -342,7 +342,7 @@ namespace wsrep
int on_apply(wsrep::client_context& client_context, int on_apply(wsrep::client_context& client_context,
const wsrep::ws_handle& ws_handle, const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta, const wsrep::ws_meta& ws_meta,
const wsrep::data& data); const wsrep::const_buffer& data);
/*! /*!
* This virtual method should be implemented by the DBMS * This virtual method should be implemented by the DBMS

View File

@ -17,7 +17,7 @@ namespace wsrep
{ {
class client_context; class client_context;
class key; class key;
class data; class const_buffer;
class transaction_context class transaction_context
@ -67,10 +67,13 @@ namespace wsrep
bool ordered() const bool ordered() const
{ return (ws_meta_.seqno().nil() == false); } { return (ws_meta_.seqno().nil() == false); }
/*!
* Return true if any fragments have been succesfully certified
* for the transaction.
*/
bool is_streaming() const bool is_streaming() const
{ {
// Streaming support not yet implemented return (streaming_context_.fragments_certified() > 0);
return false;
} }
bool pa_unsafe() const { return pa_unsafe_; } bool pa_unsafe() const { return pa_unsafe_; }
@ -91,7 +94,7 @@ namespace wsrep
int append_key(const wsrep::key&); int append_key(const wsrep::key&);
int append_data(const wsrep::data&); int append_data(const wsrep::const_buffer&);
int after_row(); int after_row();
@ -132,7 +135,6 @@ namespace wsrep
void flags(int flags) { flags_ = flags; } void flags(int flags) { flags_ = flags; }
int certify_fragment(wsrep::unique_lock<wsrep::mutex>&); int certify_fragment(wsrep::unique_lock<wsrep::mutex>&);
int certify_commit(wsrep::unique_lock<wsrep::mutex>&); int certify_commit(wsrep::unique_lock<wsrep::mutex>&);
void remove_fragments();
void clear_fragments(); void clear_fragments();
void cleanup(); void cleanup();
void debug_log_state(const char*) const; void debug_log_state(const char*) const;
@ -215,6 +217,14 @@ namespace wsrep
size_t unit_counter() const { return unit_counter_; } size_t unit_counter() const { return unit_counter_; }
void increment_unit_counter() { ++unit_counter_; } void increment_unit_counter() { ++unit_counter_; }
void cleanup()
{
fragments_.clear();
rollback_replicated_for_ = wsrep::transaction_id::invalid();
bytes_certified_ = 0;
unit_counter_ = 0;
}
private: private:
std::vector<wsrep::seqno> fragments_; std::vector<wsrep::seqno> fragments_;
wsrep::transaction_id rollback_replicated_for_; wsrep::transaction_id rollback_replicated_for_;

View File

@ -13,7 +13,6 @@
#include "wsrep/client_context.hpp" #include "wsrep/client_context.hpp"
#include "wsrep/transaction_context.hpp" #include "wsrep/transaction_context.hpp"
#include "wsrep/key.hpp" #include "wsrep/key.hpp"
#include "wsrep/data.hpp"
#include "wsrep/provider.hpp" #include "wsrep/provider.hpp"
#include "wsrep/condition_variable.hpp" #include "wsrep/condition_variable.hpp"
#include "wsrep/view.hpp" #include "wsrep/view.hpp"
@ -274,7 +273,7 @@ public:
dbms_storage_engine& storage_engine() { return storage_engine_; } dbms_storage_engine& storage_engine() { return storage_engine_; }
int apply_to_storage_engine(const wsrep::transaction_context& txc, int apply_to_storage_engine(const wsrep::transaction_context& txc,
const wsrep::data&) const wsrep::const_buffer&)
{ {
storage_engine_.bf_abort_some(txc); storage_engine_.bf_abort_some(txc);
return 0; return 0;
@ -346,7 +345,15 @@ public:
private: private:
bool is_autocommit() const override { return false; } bool is_autocommit() const override { return false; }
bool do_2pc() const override { return false; } bool do_2pc() const override { return false; }
int apply(const wsrep::data& data) override int append_fragment(const wsrep::transaction_context&,
int, const wsrep::const_buffer&) override
{
return 0;
}
void remove_fragments(const wsrep::transaction_context&) override
{
}
int apply(const wsrep::const_buffer& data) override
{ {
return server_.apply_to_storage_engine(transaction(), data); return server_.apply_to_storage_engine(transaction(), data);
} }
@ -381,12 +388,12 @@ private:
} }
void wait_for_replayers(wsrep::unique_lock<wsrep::mutex>&) override void wait_for_replayers(wsrep::unique_lock<wsrep::mutex>&) override
{ } { }
int prepare_data_for_replication(const wsrep::transaction_context&, int prepare_data_for_replication(const wsrep::transaction_context&)
wsrep::data&)
override override
{ { return 0; }
return 0; int prepare_fragment_for_replication(const wsrep::transaction_context&,
} wsrep::mutable_buffer&) override
{ return 0; }
bool killed() const override { return false; } bool killed() const override { return false; }
void abort() override { ::abort(); } void abort() override { ::abort(); }
public: public:
@ -454,8 +461,9 @@ private:
key.append_key_part(&client_key, sizeof(client_key)); key.append_key_part(&client_key, sizeof(client_key));
key.append_key_part(&data, sizeof(data)); key.append_key_part(&data, sizeof(data));
err = append_key(key); err = append_key(key);
err = err || append_data(wsrep::data(os.str().c_str(), err = err || append_data(
os.str().size())); wsrep::const_buffer(os.str().c_str(),
os.str().size()));
return err; return err;
}); });
err = err || client_command( err = err || client_command(

View File

@ -7,7 +7,7 @@
int wsrep::mock_client_context::apply( int wsrep::mock_client_context::apply(
const wsrep::data& data __attribute__((unused))) const wsrep::const_buffer& data __attribute__((unused)))
{ {
assert(transaction_.state() == wsrep::transaction_context::s_executing); assert(transaction_.state() == wsrep::transaction_context::s_executing);

View File

@ -42,11 +42,16 @@ namespace wsrep
(void)rollback(); (void)rollback();
} }
} }
int apply(const wsrep::data&); int apply(const wsrep::const_buffer&);
int commit(); int commit();
int rollback(); int rollback();
bool is_autocommit() const { return is_autocommit_; } bool is_autocommit() const { return is_autocommit_; }
bool do_2pc() const { return do_2pc_; } bool do_2pc() const { return do_2pc_; }
int append_fragment(const wsrep::transaction_context&,
int, const wsrep::const_buffer&) WSREP_OVERRIDE
{ return 0; }
void remove_fragments(const wsrep::transaction_context& )
WSREP_OVERRIDE { }
void will_replay(wsrep::transaction_context&) WSREP_OVERRIDE { } void will_replay(wsrep::transaction_context&) WSREP_OVERRIDE { }
int replay(wsrep::transaction_context& tc) WSREP_OVERRIDE int replay(wsrep::transaction_context& tc) WSREP_OVERRIDE
{ {
@ -68,17 +73,31 @@ namespace wsrep
lock.lock(); lock.lock();
} }
int prepare_data_for_replication( int prepare_data_for_replication(
const wsrep::transaction_context&, wsrep::data& data) WSREP_OVERRIDE const wsrep::transaction_context&) WSREP_OVERRIDE
{ {
if (error_during_prepare_data_) if (error_during_prepare_data_)
{ {
return 1; return 1;
} }
static const char buf[1] = { 1 }; static const char buf[1] = { 1 };
data = wsrep::data(buf, 1); wsrep::const_buffer data = wsrep::const_buffer(buf, 1);
return 0; return transaction_.append_data(data);
} }
int prepare_fragment_for_replication(const wsrep::transaction_context&,
wsrep::mutable_buffer& buffer)
WSREP_OVERRIDE
{
if (error_during_prepare_data_)
{
return 1;
}
static const char buf[1] = { 1 };
buffer.push_back(&buf[0], &buf[1]);
wsrep::const_buffer data(buffer.data(), buffer.size());
return transaction_.append_data(data);
}
bool killed() const WSREP_OVERRIDE { return killed_before_certify_; } bool killed() const WSREP_OVERRIDE { return killed_before_certify_; }
void abort() WSREP_OVERRIDE { ++aborts_; } void abort() WSREP_OVERRIDE { ++aborts_; }
void store_globals() WSREP_OVERRIDE { } void store_globals() WSREP_OVERRIDE { }
@ -94,7 +113,7 @@ namespace wsrep
} }
void debug_suicide(const char*) WSREP_OVERRIDE void debug_suicide(const char*) WSREP_OVERRIDE
{ {
::abort(); // Not going to do this while unit testing
} }
void on_error(enum wsrep::client_error) { } void on_error(enum wsrep::client_error) { }

View File

@ -7,6 +7,7 @@
#include "wsrep/provider.hpp" #include "wsrep/provider.hpp"
#include "wsrep/logger.hpp" #include "wsrep/logger.hpp"
#include "wsrep/buffer.hpp"
#include <cstring> #include <cstring>
#include <map> #include <map>
@ -50,7 +51,10 @@ namespace wsrep
{ {
wsrep::log_info() << "provider certify: " wsrep::log_info() << "provider certify: "
<< "client: " << client_id.get() << "client: " << client_id.get()
<< " flags: " << std::hex << flags; << " flags: " << std::hex << flags
<< std::dec
<< "next_error: " << next_error_;
if (next_error_) if (next_error_)
{ {
return next_error_; return next_error_;
@ -108,9 +112,9 @@ namespace wsrep
} }
int append_key(wsrep::ws_handle&, const wsrep::key&) int append_key(wsrep::ws_handle&, const wsrep::key&)
{ return next_error_; } { return 0; }
int append_data(wsrep::ws_handle&, const wsrep::data&) int append_data(wsrep::ws_handle&, const wsrep::const_buffer&)
{ return next_error_; } { return 0; }
int rollback(const wsrep::transaction_id) int rollback(const wsrep::transaction_id)
{ return next_error_; } { return next_error_; }
enum wsrep::provider::status enum wsrep::provider::status

View File

@ -114,7 +114,7 @@ int wsrep::server_context::on_apply(
wsrep::client_context& client_context, wsrep::client_context& client_context,
const wsrep::ws_handle& ws_handle, const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta, const wsrep::ws_meta& ws_meta,
const wsrep::data& data) const wsrep::const_buffer& data)
{ {
int ret(0); int ret(0);
const wsrep::transaction_context& txc(client_context.transaction()); const wsrep::transaction_context& txc(client_context.transaction());

View File

@ -39,7 +39,7 @@ BOOST_FIXTURE_TEST_CASE(server_context_applying_1pc,
{ {
char buf[1] = { 1 }; char buf[1] = { 1 };
BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta, BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta,
wsrep::data(buf, 1)) == 0); wsrep::const_buffer(buf, 1)) == 0);
const wsrep::transaction_context& txc(cc.transaction()); const wsrep::transaction_context& txc(cc.transaction());
// ::abort(); // ::abort();
BOOST_REQUIRE_MESSAGE( BOOST_REQUIRE_MESSAGE(
@ -53,7 +53,7 @@ BOOST_FIXTURE_TEST_CASE(server_context_applying_2pc,
{ {
char buf[1] = { 1 }; char buf[1] = { 1 };
BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta, BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta,
wsrep::data(buf, 1)) == 0); wsrep::const_buffer(buf, 1)) == 0);
const wsrep::transaction_context& txc(cc.transaction()); const wsrep::transaction_context& txc(cc.transaction());
BOOST_REQUIRE(txc.state() == wsrep::transaction_context::s_committed); BOOST_REQUIRE(txc.state() == wsrep::transaction_context::s_committed);
} }
@ -66,7 +66,7 @@ BOOST_FIXTURE_TEST_CASE(server_context_applying_1pc_rollback,
cc.fail_next_applying_ = true; cc.fail_next_applying_ = true;
char buf[1] = { 1 }; char buf[1] = { 1 };
BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta, BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta,
wsrep::data(buf, 1)) == 1); wsrep::const_buffer(buf, 1)) == 1);
const wsrep::transaction_context& txc(cc.transaction()); const wsrep::transaction_context& txc(cc.transaction());
BOOST_REQUIRE(txc.state() == wsrep::transaction_context::s_aborted); BOOST_REQUIRE(txc.state() == wsrep::transaction_context::s_aborted);
} }
@ -79,7 +79,7 @@ BOOST_FIXTURE_TEST_CASE(server_context_applying_2pc_rollback,
cc.fail_next_applying_ = true; cc.fail_next_applying_ = true;
char buf[1] = { 1 }; char buf[1] = { 1 };
BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta, BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta,
wsrep::data(buf, 1)) == 1); wsrep::const_buffer(buf, 1)) == 1);
const wsrep::transaction_context& txc(cc.transaction()); const wsrep::transaction_context& txc(cc.transaction());
BOOST_REQUIRE(txc.state() == wsrep::transaction_context::s_aborted); BOOST_REQUIRE(txc.state() == wsrep::transaction_context::s_aborted);
} }

View File

@ -6,7 +6,6 @@
#include "wsrep/client_context.hpp" #include "wsrep/client_context.hpp"
#include "wsrep/server_context.hpp" #include "wsrep/server_context.hpp"
#include "wsrep/key.hpp" #include "wsrep/key.hpp"
#include "wsrep/data.hpp"
#include "wsrep/logger.hpp" #include "wsrep/logger.hpp"
#include "wsrep/compiler.hpp" #include "wsrep/compiler.hpp"
@ -87,7 +86,7 @@ int wsrep::transaction_context::append_key(const wsrep::key& key)
return provider_.append_key(ws_handle_, key); return provider_.append_key(ws_handle_, key);
} }
int wsrep::transaction_context::append_data(const wsrep::data& data) int wsrep::transaction_context::append_data(const wsrep::const_buffer& data)
{ {
return provider_.append_data(ws_handle_, data); return provider_.append_data(ws_handle_, data);
@ -149,7 +148,7 @@ int wsrep::transaction_context::before_prepare()
} }
else else
{ {
remove_fragments(); client_context_.remove_fragments(*this);
} }
lock.lock(); lock.lock();
client_context_.debug_suicide( client_context_.debug_suicide(
@ -658,8 +657,8 @@ int wsrep::transaction_context::certify_fragment(
flags_ |= wsrep::provider::flag::start_transaction; flags_ |= wsrep::provider::flag::start_transaction;
} }
wsrep::data data; wsrep::mutable_buffer data;
if (client_context_.prepare_data_for_replication(*this, data)) if (client_context_.prepare_fragment_for_replication(*this, data))
{ {
lock.lock(); lock.lock();
state(lock, s_must_abort); state(lock, s_must_abort);
@ -681,7 +680,8 @@ int wsrep::transaction_context::certify_fragment(
sr_transaction_context.state(sr_lock, s_certifying); sr_transaction_context.state(sr_lock, s_certifying);
sr_lock.unlock(); sr_lock.unlock();
if (sr_client_context->append_fragment( if (sr_client_context->append_fragment(
sr_transaction_context, flags_, data)) sr_transaction_context, flags_,
wsrep::const_buffer(data.data(), data.size())))
{ {
lock.lock(); lock.lock();
state(lock, s_must_abort); state(lock, s_must_abort);
@ -750,8 +750,7 @@ int wsrep::transaction_context::certify_commit(
flags(flags() | wsrep::provider::flag::commit); flags(flags() | wsrep::provider::flag::commit);
lock.unlock(); lock.unlock();
wsrep::data data; if (client_context_.prepare_data_for_replication(*this))
if (client_context_.prepare_data_for_replication(*this, data))
{ {
// Note: Error must be set by prepare_data_for_replication() // Note: Error must be set by prepare_data_for_replication()
lock.lock(); lock.lock();
@ -870,14 +869,9 @@ int wsrep::transaction_context::certify_commit(
return ret; return ret;
} }
void wsrep::transaction_context::remove_fragments()
{
throw wsrep::not_implemented_error();
}
void wsrep::transaction_context::clear_fragments() void wsrep::transaction_context::clear_fragments()
{ {
throw wsrep::not_implemented_error(); streaming_context_.cleanup();
} }
void wsrep::transaction_context::cleanup() void wsrep::transaction_context::cleanup()

View File

@ -140,7 +140,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_append_key_data,
key.append_key_part(&vals[i], sizeof(vals[i])); key.append_key_part(&vals[i], sizeof(vals[i]));
} }
BOOST_REQUIRE(cc.append_key(key) == 0); BOOST_REQUIRE(cc.append_key(key) == 0);
wsrep::data data(&vals[2], sizeof(vals[2])); wsrep::const_buffer data(&vals[2], sizeof(vals[2]));
BOOST_REQUIRE(cc.append_data(data) == 0); BOOST_REQUIRE(cc.append_data(data) == 0);
BOOST_REQUIRE(cc.before_commit() == 0); BOOST_REQUIRE(cc.before_commit() == 0);
BOOST_REQUIRE(cc.ordered_commit() == 0); BOOST_REQUIRE(cc.ordered_commit() == 0);
@ -1204,3 +1204,21 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_1pc_commit,
BOOST_REQUIRE(sc.provider().commit_fragments() == 1); BOOST_REQUIRE(sc.provider().commit_fragments() == 1);
} }
BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_2pc_commit,
streaming_client_fixture_row)
{
BOOST_REQUIRE(cc.start_transaction(1) == 0);
BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1);
BOOST_REQUIRE(cc.before_prepare() == 0);
BOOST_REQUIRE(cc.after_prepare() == 0);
BOOST_REQUIRE(cc.before_commit() == 0);
BOOST_REQUIRE(cc.ordered_commit() == 0);
BOOST_REQUIRE(cc.after_commit() == 0);
BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success);
BOOST_REQUIRE(sc.provider().fragments() == 2);
BOOST_REQUIRE(sc.provider().start_fragments() == 1);
BOOST_REQUIRE(sc.provider().commit_fragments() == 1);
}

View File

@ -314,7 +314,7 @@ namespace
assert(client_context); assert(client_context);
assert(client_context->mode() == wsrep::client_context::m_applier); assert(client_context->mode() == wsrep::client_context::m_applier);
wsrep::data data(buf->ptr, buf->len); wsrep::const_buffer data(buf->ptr, buf->len);
wsrep::ws_handle ws_handle(wsh->trx_id, wsh->opaque); wsrep::ws_handle ws_handle(wsh->trx_id, wsh->opaque);
wsrep::ws_meta ws_meta( wsrep::ws_meta ws_meta(
wsrep::gtid(wsrep::id(meta->gtid.uuid.data, wsrep::gtid(wsrep::id(meta->gtid.uuid.data,
@ -484,9 +484,9 @@ int wsrep::wsrep_provider_v26::append_key(wsrep::ws_handle& ws_handle,
} }
int wsrep::wsrep_provider_v26::append_data(wsrep::ws_handle& ws_handle, int wsrep::wsrep_provider_v26::append_data(wsrep::ws_handle& ws_handle,
const wsrep::data& data) const wsrep::const_buffer& data)
{ {
const wsrep_buf_t wsrep_buf = {data.get().ptr(), data.get().size()}; const wsrep_buf_t wsrep_buf = {data.data(), data.size()};
mutable_ws_handle mwsh(ws_handle); mutable_ws_handle mwsh(ws_handle);
return (wsrep_->append_data(wsrep_, mwsh.native(), &wsrep_buf, return (wsrep_->append_data(wsrep_, mwsh.native(), &wsrep_buf,
1, WSREP_DATA_ORDERED, true) 1, WSREP_DATA_ORDERED, true)

View File

@ -25,7 +25,7 @@ namespace wsrep
enum wsrep::provider::status run_applier(void*); enum wsrep::provider::status run_applier(void*);
int start_transaction(wsrep::ws_handle&) { return 0; } int start_transaction(wsrep::ws_handle&) { return 0; }
int append_key(wsrep::ws_handle&, const wsrep::key&); int append_key(wsrep::ws_handle&, const wsrep::key&);
int append_data(wsrep::ws_handle&, const wsrep::data&); int append_data(wsrep::ws_handle&, const wsrep::const_buffer&);
enum wsrep::provider::status enum wsrep::provider::status
certify(wsrep::client_id, wsrep::ws_handle&, certify(wsrep::client_id, wsrep::ws_handle&,
int, int,