1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-30 07:23:07 +03:00

Refactoring continued.

This commit is contained in:
Teemu Ollakka
2018-06-15 15:13:22 +03:00
parent 4fbf1d0cf8
commit b3f60b7be1
13 changed files with 197 additions and 89 deletions

View File

@ -3,9 +3,11 @@
#
add_executable(dbms_simulator
db_storage_engine.cpp
db_client.cpp
# db_client_service.cpp
db_server.cpp
db_storage_engine.cpp
# db_client_service.cpp
# dbms_simulator.cpp
)
target_link_libraries(dbms_simulator wsrep-lib ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_THREAD_LIBRARY})

View File

@ -3,7 +3,20 @@
//
#include "db_client.hpp"
#include "db_server.hpp"
db::client::client(db::server& server,
wsrep::client_id client_id,
enum wsrep::client_context::mode mode,
const db::params& params)
: mutex_()
, params_(params)
, server_context_(server.server_context())
, client_context_(mutex_, server_context_, client_service_, client_id, mode)
, client_service_(server_context_.provider(), client_context_)
, se_trx_(server.storage_engine())
, stats_()
{ }
template <class F>
int db::client::client_command(F f)

View File

@ -7,10 +7,12 @@
#include "db_client_context.hpp"
#include "db_server_context.hpp"
#include "db_client_service.hpp"
#include "db_storage_engine.hpp"
namespace db
{
class server;
class client
{
public:
@ -26,11 +28,20 @@ namespace db
{ }
};
client();
client(db::server&,
wsrep::client_id,
enum wsrep::client_context::mode,
const db::params&);
bool bf_abort(wsrep::seqno);
private:
const struct stats stats() const { return stats_; }
void store_globals() { }
void reset_globals() { }
void start();
private:
friend class db::server_context;
template <class F> int client_command(F f);
void run_one_transaction();
void reset_error();
@ -38,8 +49,9 @@ namespace db
const db::params& params_;
db::server_context& server_context_;
db::client_context client_context_;
db::client_service client_service_;
db::storage_engine::transaction se_trx_;
stats stats_;
struct stats stats_;
};
}

View File

@ -86,6 +86,8 @@ namespace db
void will_replay(const wsrep::transaction_context&) override
{ }
void wait_for_replayers(wsrep::client_context&,
wsrep::unique_lock<wsrep::mutex>&) override { }
enum wsrep::provider::status replay(wsrep::client_context&,
wsrep::transaction_context&)
override
@ -97,6 +99,7 @@ namespace db
return 0;
}
void emergency_shutdown() { ::abort(); }
void debug_sync(wsrep::client_context&, const char*) override { }
void debug_crash(const char*) override { }
private:

View File

@ -3,25 +3,67 @@
//
#include "db_server.hpp"
#include "db_client.hpp"
#include "db_simulator.hpp"
db::server::server(simulator& simulator,
const std::string& name,
const std::string& server_id,
const std::string& address)
: simulator_(simulator)
, storage_engine_(simulator_.params())
, mutex_()
, cond_()
, server_context_(name, server_id, address, "dbsim_" + name + "_data")
, last_client_id_(0)
, last_transaction_id_(0)
, appliers_()
, clients_()
, client_threads_()
{ }
void db::server::applier_thread()
{
wsrep::client_id client_id(last_client_id_.fetch_add(1) + 1);
db::client applier(*this, client_id,
wsrep::client_context::m_applier, simulator_.params());
enum wsrep::provider::status ret(provider().run_applier(&applier));
enum wsrep::provider::status ret(server_context_.provider().run_applier(&applier));
wsrep::log() << "Applier thread exited with error code " << ret;
}
void db::server::start_applier()
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
appliers_.push_back(boost::thread(&server::applier_thread, this));
}
void db::server::stop_applier()
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
appliers_.front().join();
appliers_.erase(appliers_.begin());
}
#if 0
void db::server::on_sst_request(const std::string& req,
const wsrep::gtid& gtid,
bool bypass)
{
simulator_.donate_sst(*this, req, gtid, bypass);
}
wsrep::client_context* db::server::local_client_context()
{
std::ostringstream id_os;
size_t client_id(++last_client_id_);
return new db::client(*this, client_id,
wsrep::client_context::m_replicating,
simulator_.params());
db::client* client(new db::client(*this, client_id,
wsrep::client_context::m_replicating,
simulator_.params()));
return &client->client_context_;
}
#endif
void db::server::start_clients()
{
size_t n_clients(simulator_.params().n_clients);
@ -39,9 +81,9 @@ void db::server::stop_clients()
}
for (const auto& i : clients_)
{
struct db::client::stats stats(i->stats());
const struct db::client::stats& stats(i->stats());
simulator_.stats_.commits += stats.commits;
simulator_.stats_.aborts += stats.aborts;
simulator_.stats_.aborts += stats.rollbacks;
simulator_.stats_.replays += stats.replays;
}
}

View File

@ -5,84 +5,40 @@
#ifndef WSREP_DB_SERVER_HPP
#define WSREP_DB_SERVER_HPP
#include "wsrep/gtid.hpp"
#include "wsrep/client_context.hpp"
#include "db_storage_engine.hpp"
#include "db_server_context.hpp"
#include <boost/thread.hpp>
#include <string>
#include <memory>
namespace db
{
class simulator;
class client;
class server
{
public:
public:
server(simulator& simulator,
const std::string& name,
const std::string& id,
const std::string& address)
: wsrep::server_context(mutex_,
cond_,
name, id, address, name + "_data",
wsrep::server_context::rm_async)
, simulator_(simulator)
, storage_engine_(simulator_.params())
, mutex_()
, cond_()
, last_client_id_(0)
, last_transaction_id_(0)
, appliers_()
, clients_()
, client_threads_()
{ }
const std::string& address);
// Provider management
void applier_thread();
void start_applier()
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
appliers_.push_back(boost::thread(&dbms_server::applier_thread, this));
}
void start_applier();
void stop_applier()
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
appliers_.front().join();
appliers_.erase(appliers_.begin());
}
void stop_applier();
bool sst_before_init() const override { return false; }
std::string on_sst_required()
{
return id();
}
void on_sst_request(const std::string& req,
const wsrep::gtid& gtid,
bool bypass)
{
simulator_.donate_sst(*this, req, gtid, bypass);
}
void background_rollback(wsrep::client_context& cs) override
{
assert(0);
cs.before_rollback();
cs.after_rollback();
}
// Client context management
wsrep::client_context* local_client_context();
wsrep::client_context* streaming_applier_client_context() override
{
throw wsrep::not_implemented_error();
}
void log_dummy_write_set(wsrep::client_context&, const wsrep::ws_meta&)
override
{ }
size_t next_transaction_id()
{
return (last_transaction_id_.fetch_add(1) + 1);
}
storage_engine& storage_engine() { return storage_engine_; }
db::storage_engine& storage_engine() { return storage_engine_; }
int apply_to_storage_engine(const wsrep::transaction_context& txc,
const wsrep::const_buffer&)
@ -93,19 +49,21 @@ public:
void start_clients();
void stop_clients();
void client_thread(const std::shared_ptr<dbms_client>& client);
void client_thread(const std::shared_ptr<db::client>& client);
db::server_context& server_context() { return server_context_; }
private:
void start_client(size_t id);
simulator& simulator_;
storage_engine storage_engine_;
db::simulator& simulator_;
db::storage_engine storage_engine_;
wsrep::default_mutex mutex_;
wsrep::default_condition_variable cond_;
db::server_context server_context_;
std::atomic<size_t> last_client_id_;
std::atomic<size_t> last_transaction_id_;
std::vector<boost::thread> appliers_;
std::vector<std::shared_ptr<dbms_client>> clients_;
std::vector<std::shared_ptr<db::client>> clients_;
std::vector<boost::thread> client_threads_;
};
};

View File

@ -6,6 +6,7 @@
#define WSREP_DB_SERVER_CONTEXT_HPP
#include "wsrep/server_context.hpp"
#include "wsrep/client_context.hpp"
#include <atomic>
@ -14,11 +15,38 @@ namespace db
class server_context : public wsrep::server_context
{
public:
size_t next_transaction_id()
server_context(const std::string& name,
const std::string& server_id,
const std::string& address,
const std::string& working_dir)
: wsrep::server_context(
mutex_,
cond_,
name,
server_id,
address,
working_dir,
wsrep::server_context::rm_async)
, mutex_()
, cond_()
, last_transaction_id_()
{ }
wsrep::client_context* local_client_context() override;
wsrep::client_context* streaming_applier_client_context() override;
void release_client_context(wsrep::client_context*) override;
bool sst_before_init() const override;
void on_sst_request(const std::string&, const wsrep::gtid&, bool) override;
std::string on_sst_required() override;
void background_rollback(wsrep::client_context&) override;
void log_dummy_write_set(wsrep::client_context&, const wsrep::ws_meta&)
override;
size_t next_transaction_id()
{
return (last_transaction_id_.fetch_add(1) + 1);
}
private:
wsrep::default_mutex mutex_;
wsrep::default_condition_variable cond_;
std::atomic<size_t> last_transaction_id_;
};
}

View File

@ -31,7 +31,7 @@ namespace db
void stop();
void donate_sst(server&,
const std::string& req, const wsrep::gtid& gtid, bool);
const simulator_params& params() const
const db::params& params() const
{ return params_; }
std::string stats() const;
private:
@ -44,7 +44,7 @@ namespace db
std::string build_cluster_address() const;
wsrep::default_mutex mutex_;
const simulator_params& params_;
const db::params& params_;
std::map<size_t, std::unique_ptr<server>> servers_;
std::chrono::time_point<std::chrono::steady_clock> clients_start_;
std::chrono::time_point<std::chrono::steady_clock> clients_stop_;

View File

@ -649,6 +649,30 @@ namespace wsrep
enum wsrep::client_context::mode orig_mode_;
};
template <class D>
class scoped_client_context
{
public:
scoped_client_context(wsrep::client_context* client_context, D deleter)
: client_context_(client_context)
, deleter_(deleter)
{
if (client_context_ == 0)
{
throw wsrep::runtime_error("Null client_context provided");
}
}
wsrep::client_context& client_context() { return *client_context_; }
~scoped_client_context()
{
deleter_(client_context_);
}
private:
scoped_client_context(const scoped_client_context&);
scoped_client_context& operator=(const scoped_client_context&);
wsrep::client_context* client_context_;
D deleter_;
};
}

View File

@ -210,6 +210,8 @@ namespace wsrep
*/
virtual client_context* streaming_applier_client_context() = 0;
virtual void release_client_context(wsrep::client_context*) = 0;
void start_streaming_applier(
const wsrep::id&,
const wsrep::transaction_id&,
@ -442,6 +444,20 @@ namespace wsrep
int debug_log_level_;
};
class client_deleter
{
public:
client_deleter(wsrep::server_context& server_context)
: server_context_(server_context)
{ }
void operator()(wsrep::client_context* client_context)
{
server_context_.release_client_context(client_context);
}
private:
wsrep::server_context& server_context_;
};
static inline std::string to_string(enum wsrep::server_context::state state)
{
switch (state)

View File

@ -246,7 +246,7 @@ int wsrep::server_context::on_apply(
sac->after_command_after_result();
stop_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id());
delete sac;
release_client_context(sac);
}
}
else

View File

@ -719,18 +719,23 @@ int wsrep::transaction_context::certify_fragment(
// Client context to store fragment in separate transaction
// Switch temporarily to sr_transaction_context, switch back
// to original when this goes out of scope
std::auto_ptr<wsrep::client_context> sr_client_context(
client_context_.server_context().local_client_context());
// std::auto_ptr<wsrep::client_context> sr_client_context(
// client_context_.server_context().local_client_context());
wsrep::scoped_client_context<wsrep::client_deleter> sr_client_context_scope(
client_context_.server_context().local_client_context(),
wsrep::client_deleter(client_context_.server_context()));
wsrep::client_context& sr_client_context(
sr_client_context_scope.client_context());
wsrep::client_context_switch client_context_switch(
client_context_,
*sr_client_context);
sr_client_context);
wsrep::unique_lock<wsrep::mutex> sr_lock(sr_client_context->mutex());
wsrep::unique_lock<wsrep::mutex> sr_lock(sr_client_context.mutex());
wsrep::transaction_context& sr_transaction_context(
sr_client_context->transaction_);
sr_client_context.transaction_);
sr_transaction_context.state(sr_lock, s_certifying);
sr_lock.unlock();
if (sr_client_context->append_fragment(
if (sr_client_context.append_fragment(
sr_transaction_context, flags_,
wsrep::const_buffer(data.data(), data.size())))
{
@ -755,7 +760,7 @@ int wsrep::transaction_context::certify_fragment(
sr_transaction_context.certified_ = true;
sr_transaction_context.state(sr_lock, s_committing);
sr_lock.unlock();
if (sr_client_context->commit())
if (sr_client_context.commit())
{
ret = 1;
}
@ -764,7 +769,7 @@ int wsrep::transaction_context::certify_fragment(
sr_lock.lock();
sr_transaction_context.state(sr_lock, s_must_abort);
sr_lock.unlock();
sr_client_context->rollback();
sr_client_context.rollback();
ret = 1;
break;
}

View File

@ -41,6 +41,11 @@ namespace wsrep
*this, client_service_, ++last_client_id_,
wsrep::client_context::m_applier);
}
void release_client_context(wsrep::client_context* client_context)
{
delete client_context;
}
void log_dummy_write_set(wsrep::client_context&,
const wsrep::ws_meta&)
WSREP_OVERRIDE