mirror of
https://github.com/codership/wsrep-lib.git
synced 2025-04-19 21:02:17 +03:00
168 lines
4.8 KiB
C++
168 lines
4.8 KiB
C++
/*
|
|
* Copyright (C) 2018 Codership Oy <info@codership.com>
|
|
*
|
|
* This file is part of wsrep-lib.
|
|
*
|
|
* Wsrep-lib is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation, either version 2 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* Wsrep-lib is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#include "db_server.hpp"
|
|
#include "db_server_service.hpp"
|
|
#include "db_high_priority_service.hpp"
|
|
#include "db_client.hpp"
|
|
#include "db_simulator.hpp"
|
|
|
|
#include "wsrep/logger.hpp"
|
|
|
|
#include <ostream>
|
|
#include <cstdio>
|
|
|
|
static wsrep::default_mutex logger_mtx;
|
|
|
|
static void
|
|
logger_fn(wsrep::log::level l, const char* pfx, const char* msg)
|
|
{
|
|
wsrep::unique_lock<wsrep::mutex> lock(logger_mtx);
|
|
|
|
struct timespec time;
|
|
clock_gettime(CLOCK_REALTIME, &time);
|
|
|
|
time_t const tt(time.tv_sec);
|
|
struct tm date;
|
|
localtime_r(&tt, &date);
|
|
|
|
char date_str[85] = { '\0', };
|
|
snprintf(date_str, sizeof(date_str) - 1,
|
|
"%04d-%02d-%02d %02d:%02d:%02d.%03d",
|
|
date.tm_year + 1900, date.tm_mon + 1, date.tm_mday,
|
|
date.tm_hour, date.tm_min, date.tm_sec, (int)time.tv_nsec/1000000);
|
|
|
|
std::cerr << date_str << ' ' << pfx << wsrep::log::to_c_string(l) << ' '
|
|
<< msg << std::endl;
|
|
}
|
|
|
|
db::server::server(simulator& simulator,
|
|
const std::string& name,
|
|
const std::string& address)
|
|
: simulator_(simulator)
|
|
, storage_engine_(simulator_.params())
|
|
, mutex_()
|
|
, cond_()
|
|
, server_service_(*this)
|
|
, reporter_(mutex_, name + ".json", 4)
|
|
, server_state_(server_service_,
|
|
name, address, "dbsim_" + name + "_data")
|
|
, last_client_id_(0)
|
|
, last_transaction_id_(0)
|
|
, appliers_()
|
|
, clients_()
|
|
, client_threads_()
|
|
{
|
|
wsrep::log::logger_fn(logger_fn);
|
|
}
|
|
|
|
void db::server::applier_thread()
|
|
{
|
|
wsrep::client_id client_id(last_client_id_.fetch_add(1) + 1);
|
|
db::client applier(*this, client_id,
|
|
wsrep::client_state::m_high_priority,
|
|
simulator_.params());
|
|
wsrep::client_state* cc(static_cast<wsrep::client_state*>(
|
|
&applier.client_state()));
|
|
db::high_priority_service hps(*this, applier);
|
|
cc->open(cc->id());
|
|
cc->before_command();
|
|
enum wsrep::provider::status ret(
|
|
server_state_.provider().run_applier(&hps));
|
|
wsrep::log_info() << "Applier thread exited with error code " << ret;
|
|
cc->after_command_before_result();
|
|
cc->after_command_after_result();
|
|
cc->close();
|
|
cc->cleanup();
|
|
}
|
|
|
|
void db::server::start_applier()
|
|
{
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
appliers_.push_back(boost::thread(&server::applier_thread, this));
|
|
}
|
|
|
|
void db::server::stop_applier()
|
|
{
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
appliers_.front().join();
|
|
appliers_.erase(appliers_.begin());
|
|
}
|
|
|
|
|
|
void db::server::start_clients()
|
|
{
|
|
size_t n_clients(simulator_.params().n_clients);
|
|
for (size_t i(0); i < n_clients; ++i)
|
|
{
|
|
start_client(i + 1);
|
|
}
|
|
}
|
|
|
|
void db::server::stop_clients()
|
|
{
|
|
for (auto& i : client_threads_)
|
|
{
|
|
i.join();
|
|
}
|
|
for (const auto& i : clients_)
|
|
{
|
|
const struct db::client::stats& stats(i->stats());
|
|
simulator_.stats_.commits += stats.commits;
|
|
simulator_.stats_.rollbacks += stats.rollbacks;
|
|
simulator_.stats_.replays += stats.replays;
|
|
}
|
|
}
|
|
|
|
void db::server::client_thread(const std::shared_ptr<db::client>& client)
|
|
{
|
|
client->start();
|
|
}
|
|
|
|
void db::server::start_client(size_t id)
|
|
{
|
|
auto client(std::make_shared<db::client>(
|
|
*this, wsrep::client_id(id),
|
|
wsrep::client_state::m_local,
|
|
simulator_.params()));
|
|
clients_.push_back(client);
|
|
client_threads_.push_back(
|
|
boost::thread(&db::server::client_thread, this, client));
|
|
}
|
|
|
|
void db::server::donate_sst(const std::string& req,
|
|
const wsrep::gtid& gtid,
|
|
bool bypass)
|
|
{
|
|
simulator_.sst(*this, req, gtid, bypass);
|
|
}
|
|
|
|
|
|
wsrep::high_priority_service* db::server::streaming_applier_service()
|
|
{
|
|
throw wsrep::not_implemented_error();
|
|
}
|
|
|
|
void db::server::log_state_change(enum wsrep::server_state::state from,
|
|
enum wsrep::server_state::state to)
|
|
{
|
|
wsrep::log_info() << "State changed " << from << " -> " << to;
|
|
reporter_.report_state(to);
|
|
}
|