1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-24 10:42:31 +03:00

Collect and append SR commit keys.

This commit is contained in:
Teemu Ollakka
2018-07-09 20:16:31 +03:00
parent 6f68c70d37
commit 8c4a786f79
3 changed files with 87 additions and 3 deletions

View File

@ -0,0 +1,44 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef WSREP_SR_KEY_SET_HPP
#define WSREP_SR_KEY_SET_HPP
#include <set>
#include <map>
namespace wsrep
{
class sr_key_set
{
public:
typedef std::set<std::string> leaf_type;
typedef std::map<std::string, leaf_type > branch_type;
sr_key_set()
: root_()
{ }
void insert(const wsrep::key& key)
{
assert(key.size() == 3);
if (key.size() < 3)
{
throw wsrep::runtime_error("Invalid key size");
}
root_[std::string(
static_cast<const char*>(key.key_parts()[0].data()),
key.key_parts()[0].size())].insert(
std::string(static_cast<const char*>(key.key_parts()[1].data()),
key.key_parts()[1].size()));
}
const branch_type& root() const { return root_; }
void clear() { root_.clear(); }
private:
branch_type root_;
};
}
#endif // WSREP_KEY_SET_HPP

View File

@ -11,6 +11,7 @@
#include "transaction_id.hpp"
#include "streaming_context.hpp"
#include "lock.hpp"
#include "sr_key_set.hpp"
#include <cassert>
#include <vector>
@ -149,6 +150,7 @@ namespace wsrep
void flags(int flags) { flags_ = flags; }
int certify_fragment(wsrep::unique_lock<wsrep::mutex>&);
int certify_commit(wsrep::unique_lock<wsrep::mutex>&);
int append_sr_keys_for_commit();
void streaming_rollback();
void clear_fragments();
void cleanup();
@ -169,6 +171,7 @@ namespace wsrep
bool pa_unsafe_;
bool certified_;
wsrep::streaming_context streaming_context_;
wsrep::sr_key_set sr_keys_;
};
static inline const char* to_c_string(enum wsrep::transaction::state state)

View File

@ -98,6 +98,7 @@ wsrep::transaction::transaction(
, pa_unsafe_(false)
, certified_(false)
, streaming_context_()
, sr_keys_()
{ }
@ -192,7 +193,16 @@ int wsrep::transaction::start_replaying(const wsrep::ws_meta& ws_meta)
int wsrep::transaction::append_key(const wsrep::key& key)
{
/** @todo Collect table level keys for SR commit */
return provider().append_key(ws_handle_, key);
try
{
sr_keys_.insert(key);
return provider().append_key(ws_handle_, key);
}
catch (...)
{
wsrep::log_error() << "Failed to append key";
return 1;
}
}
int wsrep::transaction::append_data(const wsrep::const_buffer& data)
@ -954,11 +964,17 @@ int wsrep::transaction::certify_commit(
}
state(lock, s_certifying);
lock.unlock();
if (is_streaming())
{
append_sr_keys_for_commit();
flags(flags() | wsrep::provider::flag::pa_unsafe);
}
sr_keys_.clear();
flags(flags() | wsrep::provider::flag::commit);
lock.unlock();
if (client_service_.prepare_data_for_replication())
{
lock.lock();
@ -1100,6 +1116,27 @@ int wsrep::transaction::certify_commit(
return ret;
}
int wsrep::transaction::append_sr_keys_for_commit()
{
int ret(0);
assert(client_state_.mode() == wsrep::client_state::m_local);
for (wsrep::sr_key_set::branch_type::const_iterator
i(sr_keys_.root().begin());
ret == 0 && i != sr_keys_.root().end(); ++i)
{
for (wsrep::sr_key_set::leaf_type::const_iterator
j(i->second.begin());
ret == 0 && j != i->second.end(); ++j)
{
wsrep::key key(wsrep::key::shared);
key.append_key_part(i->first.data(), i->first.size());
key.append_key_part(j->data(), j->size());
ret = provider().append_key(ws_handle_, key);
}
}
return ret;
}
void wsrep::transaction::streaming_rollback()
{
assert(streaming_context_.rolled_back() == false);