mirror of
https://github.com/codership/wsrep-lib.git
synced 2025-04-19 21:02:17 +03:00
Changed the server_state public methods sst_received() and wait_until_state() to report errors via return values instead of throwing exceptions. This was done to gradually get rid of public methods which report errors via exceptions. This change was part of MDEV-30419.
268 lines
8.8 KiB
C++
268 lines
8.8 KiB
C++
/*
|
|
* Copyright (C) 2018 Codership Oy <info@codership.com>
|
|
*
|
|
* This file is part of wsrep-lib.
|
|
*
|
|
* Wsrep-lib is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation, either version 2 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* Wsrep-lib is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#include "db_simulator.hpp"
|
|
#include "db_client.hpp"
|
|
#include "db_threads.hpp"
|
|
#include "db_tls.hpp"
|
|
|
|
#include "wsrep/logger.hpp"
|
|
|
|
#include <boost/filesystem.hpp>
|
|
#include <sstream>
|
|
|
|
// Service objects local to this translation unit. start() configures
// them from the simulator parameters and may hand their addresses to
// the provider.
namespace
{
    db::ti thread_instrumentation;
    db::tls tls_service;
}
|
|
|
|
void db::simulator::run()
|
|
{
|
|
start();
|
|
stop();
|
|
std::flush(std::cerr);
|
|
std::cout << "Results:\n";
|
|
std::cout << stats() << std::endl;
|
|
std::cout << db::ti::stats() << std::endl;
|
|
std::cout << db::tls::stats() << std::endl;
|
|
}
|
|
|
|
/**
 * Simulate a state snapshot transfer from the sending server to the
 * server named in the request.
 *
 * @param server  Sending server. Its view is copied to the receiver
 *                (unless bypassed) and sst_sent() is called on it.
 * @param request Name of the receiving server. Everything from the
 *                first '\0' onwards is ignored.
 * @param gtid    Position stored in the receiver's storage engine
 *                (unless bypassed) and passed to sst_sent().
 * @param bypass  If true, position and view are not copied to the
 *                receiver.
 *
 * @throws wsrep::runtime_error if the receiving server is not known
 *         or if sst_received() reports an error.
 */
void db::simulator::sst(db::server& server,
                        const std::string& request,
                        const wsrep::gtid& gtid,
                        bool bypass)
{
    // The request may contain extra trailing '\0' after it goes
    // through the provider, strip it first.
    std::string name(request);
    name.erase(std::find(name.begin(), name.end(), '\0'), name.end());

    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    auto i(servers_.find(name));
    wsrep::log_info() << "SST request '" << name << "'";
    if (i == servers_.end())
    {
        // Report the stripped name, not the raw request: an embedded
        // '\0' would end up in the log and would cut the exception
        // text short at the NUL when it is read through what()
        // (assuming wsrep::runtime_error exposes a C-string what()
        // like std::runtime_error does).
        wsrep::log_error() << "Server " << name << " not found";
        wsrep::log_info() << "servers:";
        for (const auto& s : servers_)
        {
            wsrep::log_info() << "server: " << s.first;
        }
        throw wsrep::runtime_error("Server " + name + " not found");
    }

    if (bypass == false)
    {
        wsrep::log_info() << "SST "
                          << server.server_state().name()
                          << " -> " << name;
        i->second->storage_engine().store_position(gtid);
        i->second->storage_engine().store_view(
            server.storage_engine().get_view());
    }

    // sst_received() needs a client service, so a temporary client is
    // created on the receiving server just for this call.
    db::client dummy(*(i->second), wsrep::client_id(-1),
                     wsrep::client_state::m_local, params());

    if (i->second->server_state().sst_received(dummy.client_service(), 0))
    {
        throw wsrep::runtime_error("Call to SST received failed");
    }
    server.server_state().sst_sent(gtid, 0);
}
|
|
|
|
std::string db::simulator::stats() const
|
|
{
|
|
auto duration(std::chrono::duration<double>(
|
|
clients_stop_ - clients_start_).count());
|
|
long long transactions(stats_.commits + stats_.rollbacks);
|
|
long long bf_aborts(0);
|
|
for (const auto& s : servers_)
|
|
{
|
|
bf_aborts += s.second->storage_engine().bf_aborts();
|
|
}
|
|
std::ostringstream os;
|
|
os << "Number of transactions: " << transactions
|
|
<< "\n"
|
|
<< "Seconds: " << duration
|
|
<< " \n"
|
|
<< "Transactions per second: " << double(transactions)/double(duration)
|
|
<< "\n"
|
|
<< "BF aborts: "
|
|
<< bf_aborts
|
|
<< "\n"
|
|
<< "Client commits: " << stats_.commits
|
|
<< "\n"
|
|
<< "Client rollbacks: " << stats_.rollbacks
|
|
<< "\n"
|
|
<< "Client replays: " << stats_.replays;
|
|
return os.str();
|
|
}
|
|
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
// Private //
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
void db::simulator::start()
|
|
{
|
|
thread_instrumentation.level(params_.thread_instrumentation);
|
|
thread_instrumentation.cond_checks(params_.cond_checks);
|
|
tls_service.init(params_.tls_service);
|
|
wsrep::log_info() << "Provider: " << params_.wsrep_provider;
|
|
|
|
std::string cluster_address(build_cluster_address());
|
|
wsrep::log_info() << "Cluster address: " << cluster_address;
|
|
for (size_t i(0); i < params_.n_servers; ++i)
|
|
{
|
|
std::ostringstream name_os;
|
|
name_os << (i + 1);
|
|
std::ostringstream id_os;
|
|
id_os << (i + 1);
|
|
std::ostringstream address_os;
|
|
address_os << "127.0.0.1:" << server_port(i);
|
|
wsrep::id server_id(id_os.str());
|
|
auto it(servers_.insert(
|
|
std::make_pair(
|
|
name_os.str(),
|
|
std::make_unique<db::server>(
|
|
*this,
|
|
name_os.str(),
|
|
address_os.str()))));
|
|
if (it.second == false)
|
|
{
|
|
throw wsrep::runtime_error("Failed to add server");
|
|
}
|
|
boost::filesystem::path dir("dbsim_" + id_os.str() + "_data");
|
|
boost::filesystem::create_directory(dir);
|
|
|
|
db::server& server(*it.first->second);
|
|
server.server_state().debug_log_level(params_.debug_log_level);
|
|
std::string server_options(params_.wsrep_provider_options);
|
|
|
|
wsrep::provider::services services;
|
|
services.thread_service = params_.thread_instrumentation
|
|
? &thread_instrumentation
|
|
: nullptr;
|
|
services.tls_service = params_.tls_service
|
|
? &tls_service
|
|
: nullptr;
|
|
if (server.server_state().load_provider(params_.wsrep_provider,
|
|
server_options, services))
|
|
{
|
|
throw wsrep::runtime_error("Failed to load provider");
|
|
}
|
|
if (server.server_state().connect("sim_cluster", cluster_address, "",
|
|
i == 0))
|
|
{
|
|
throw wsrep::runtime_error("Failed to connect");
|
|
}
|
|
wsrep::log_debug() << "main: Starting applier";
|
|
server.start_applier();
|
|
wsrep::log_debug() << "main: Waiting initializing state";
|
|
if (server.server_state().wait_until_state(
|
|
wsrep::server_state::s_initializing))
|
|
{
|
|
throw wsrep::runtime_error("Failed to reach initializing state");
|
|
}
|
|
wsrep::log_debug() << "main: Calling initialized";
|
|
server.server_state().initialized();
|
|
wsrep::log_debug() << "main: Waiting for synced state";
|
|
if (server.server_state().wait_until_state(
|
|
wsrep::server_state::s_synced))
|
|
{
|
|
throw wsrep::runtime_error("Failed to reach synced state");
|
|
}
|
|
wsrep::log_debug() << "main: Server synced";
|
|
}
|
|
|
|
// Start client threads
|
|
wsrep::log_info() << "####################### Starting client load";
|
|
clients_start_ = std::chrono::steady_clock::now();
|
|
size_t index(0);
|
|
for (auto& i : servers_)
|
|
{
|
|
if (params_.topology.size() == 0 || params_.topology[index] == 'm')
|
|
{
|
|
i.second->start_clients();
|
|
}
|
|
++index;
|
|
}
|
|
}
|
|
|
|
void db::simulator::stop()
|
|
{
|
|
for (auto& i : servers_)
|
|
{
|
|
db::server& server(*i.second);
|
|
server.stop_clients();
|
|
}
|
|
clients_stop_ = std::chrono::steady_clock::now();
|
|
wsrep::log_info() << "######## Stats ############";
|
|
wsrep::log_info() << stats();
|
|
std::cout << db::ti::stats() << std::endl;
|
|
wsrep::log_info() << "######## Stats ############";
|
|
if (params_.fast_exit)
|
|
{
|
|
exit(0);
|
|
}
|
|
for (auto& i : servers_)
|
|
{
|
|
db::server& server(*i.second);
|
|
wsrep::log_info() << "Status for server: "
|
|
<< server.server_state().id();
|
|
auto status(server.server_state().provider().status());
|
|
for_each(status.begin(), status.end(),
|
|
[](const wsrep::provider::status_variable& sv)
|
|
{
|
|
wsrep::log_info() << sv.name() << " = " << sv.value();
|
|
});
|
|
server.server_state().disconnect();
|
|
if (server.server_state().wait_until_state(
|
|
wsrep::server_state::s_disconnected))
|
|
{
|
|
throw wsrep::runtime_error("Failed to reach disconnected state");
|
|
}
|
|
server.stop_applier();
|
|
server.server_state().unload_provider();
|
|
}
|
|
}
|
|
|
|
std::string db::simulator::server_port(size_t i) const
|
|
{
|
|
std::ostringstream os;
|
|
os << (10000 + (i + 1)*10);
|
|
return os.str();
|
|
}
|
|
|
|
std::string db::simulator::build_cluster_address() const
|
|
{
|
|
std::string ret;
|
|
if (params_.wsrep_provider.find("galera_smm") != std::string::npos)
|
|
{
|
|
ret += "gcomm://";
|
|
}
|
|
|
|
for (size_t i(0); i < params_.n_servers; ++i)
|
|
{
|
|
std::ostringstream sa_os;
|
|
sa_os << "127.0.0.1:";
|
|
sa_os << server_port(i);
|
|
ret += sa_os.str();
|
|
if (i < params_.n_servers - 1) ret += ",";
|
|
}
|
|
return ret;
|
|
}
|