mirror of
				https://github.com/codership/wsrep-lib.git
				synced 2025-10-31 06:50:26 +03:00 
			
		
		
		
	Changed server_state public methods sst_received() and wait_until_state() to report errors as return value instead of throwing exceptions. This was done to gradually get rid of public methods which report errors via exceptions. This change was part of MDEV-30419.
		
			
				
	
	
		
			268 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			268 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /*
 | |
|  * Copyright (C) 2018 Codership Oy <info@codership.com>
 | |
|  *
 | |
|  * This file is part of wsrep-lib.
 | |
|  *
 | |
|  * Wsrep-lib is free software: you can redistribute it and/or modify
 | |
|  * it under the terms of the GNU General Public License as published by
 | |
|  * the Free Software Foundation, either version 2 of the License, or
 | |
|  * (at your option) any later version.
 | |
|  *
 | |
|  * Wsrep-lib is distributed in the hope that it will be useful,
 | |
|  * but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|  * GNU General Public License for more details.
 | |
|  *
 | |
|  * You should have received a copy of the GNU General Public License
 | |
|  * along with wsrep-lib.  If not, see <https://www.gnu.org/licenses/>.
 | |
|  */
 | |
| 
 | |
| #include "db_simulator.hpp"
 | |
| #include "db_client.hpp"
 | |
| #include "db_threads.hpp"
 | |
| #include "db_tls.hpp"
 | |
| 
 | |
| #include "wsrep/logger.hpp"
 | |
| 
 | |
| #include <boost/filesystem.hpp>
 | |
| #include <sstream>
 | |
| 
 | |
| static db::ti thread_instrumentation;
 | |
| static db::tls tls_service;
 | |
| 
 | |
| void db::simulator::run()
 | |
| {
 | |
|     start();
 | |
|     stop();
 | |
|     std::flush(std::cerr);
 | |
|     std::cout << "Results:\n";
 | |
|     std::cout << stats() << std::endl;
 | |
|     std::cout << db::ti::stats() << std::endl;
 | |
|     std::cout << db::tls::stats() << std::endl;
 | |
| }
 | |
| 
 | |
| void db::simulator::sst(db::server& server,
 | |
|                         const std::string& request,
 | |
|                         const wsrep::gtid& gtid,
 | |
|                         bool bypass)
 | |
| {
 | |
|     // The request may contain extra trailing '\0' after it goes
 | |
|     // through the provider, strip it first.
 | |
|     std::string name(request);
 | |
|     name.erase(std::find(name.begin(), name.end(), '\0'), name.end());
 | |
| 
 | |
|     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
 | |
|     auto i(servers_.find(name));
 | |
|     wsrep::log_info() << "SST request '" << name << "'";
 | |
|     if (i == servers_.end())
 | |
|     {
 | |
|         wsrep::log_error() << "Server " << request << " not found";
 | |
|         wsrep::log_info() << "servers:";
 | |
|         for (const auto& s : servers_)
 | |
|         {
 | |
|             wsrep::log_info() << "server: " << s.first;
 | |
|         }
 | |
|         throw wsrep::runtime_error("Server " + request + " not found");
 | |
|     }
 | |
|     if (bypass == false)
 | |
|     {
 | |
|         wsrep::log_info() << "SST "
 | |
|                           << server.server_state().name()
 | |
|                           << " -> " << request;
 | |
|         i->second->storage_engine().store_position(gtid);
 | |
|         i->second->storage_engine().store_view(
 | |
|             server.storage_engine().get_view());
 | |
|     }
 | |
| 
 | |
|     db::client dummy(*(i->second), wsrep::client_id(-1),
 | |
|                      wsrep::client_state::m_local, params());
 | |
| 
 | |
|     if (i->second->server_state().sst_received(dummy.client_service(), 0))
 | |
|     {
 | |
|         throw wsrep::runtime_error("Call to SST received failed");
 | |
|     }
 | |
|     server.server_state().sst_sent(gtid, 0);
 | |
| }
 | |
| 
 | |
| std::string db::simulator::stats() const
 | |
| {
 | |
|     auto duration(std::chrono::duration<double>(
 | |
|                       clients_stop_ - clients_start_).count());
 | |
|     long long transactions(stats_.commits + stats_.rollbacks);
 | |
|     long long bf_aborts(0);
 | |
|     for (const auto& s : servers_)
 | |
|     {
 | |
|         bf_aborts += s.second->storage_engine().bf_aborts();
 | |
|     }
 | |
|     std::ostringstream os;
 | |
|     os << "Number of transactions: " << transactions
 | |
|        << "\n"
 | |
|        << "Seconds: " << duration
 | |
|        << " \n"
 | |
|        << "Transactions per second: " << double(transactions)/double(duration)
 | |
|        << "\n"
 | |
|        << "BF aborts: "
 | |
|        << bf_aborts
 | |
|        << "\n"
 | |
|        << "Client commits: " << stats_.commits
 | |
|        << "\n"
 | |
|        << "Client rollbacks: " << stats_.rollbacks
 | |
|        << "\n"
 | |
|        << "Client replays: " << stats_.replays;
 | |
|     return os.str();
 | |
| }
 | |
| 
 | |
| ////////////////////////////////////////////////////////////////////////////////
 | |
| //                              Private                                       //
 | |
| ////////////////////////////////////////////////////////////////////////////////
 | |
| 
 | |
| void db::simulator::start()
 | |
| {
 | |
|     thread_instrumentation.level(params_.thread_instrumentation);
 | |
|     thread_instrumentation.cond_checks(params_.cond_checks);
 | |
|     tls_service.init(params_.tls_service);
 | |
|     wsrep::log_info() << "Provider: " << params_.wsrep_provider;
 | |
| 
 | |
|     std::string cluster_address(build_cluster_address());
 | |
|     wsrep::log_info() << "Cluster address: " << cluster_address;
 | |
|     for (size_t i(0); i < params_.n_servers; ++i)
 | |
|     {
 | |
|         std::ostringstream name_os;
 | |
|         name_os << (i + 1);
 | |
|         std::ostringstream id_os;
 | |
|         id_os << (i + 1);
 | |
|         std::ostringstream address_os;
 | |
|         address_os << "127.0.0.1:" << server_port(i);
 | |
|         wsrep::id server_id(id_os.str());
 | |
|         auto it(servers_.insert(
 | |
|                     std::make_pair(
 | |
|                         name_os.str(),
 | |
|                         std::make_unique<db::server>(
 | |
|                             *this,
 | |
|                             name_os.str(),
 | |
|                             address_os.str()))));
 | |
|         if (it.second == false)
 | |
|         {
 | |
|             throw wsrep::runtime_error("Failed to add server");
 | |
|         }
 | |
|         boost::filesystem::path dir("dbsim_" + id_os.str() + "_data");
 | |
|         boost::filesystem::create_directory(dir);
 | |
| 
 | |
|         db::server& server(*it.first->second);
 | |
|         server.server_state().debug_log_level(params_.debug_log_level);
 | |
|         std::string server_options(params_.wsrep_provider_options);
 | |
| 
 | |
|         wsrep::provider::services services;
 | |
|         services.thread_service = params_.thread_instrumentation
 | |
|                                       ? &thread_instrumentation
 | |
|                                       : nullptr;
 | |
|         services.tls_service = params_.tls_service
 | |
|             ? &tls_service
 | |
|             : nullptr;
 | |
|         if (server.server_state().load_provider(params_.wsrep_provider,
 | |
|                                                 server_options, services))
 | |
|         {
 | |
|             throw wsrep::runtime_error("Failed to load provider");
 | |
|         }
 | |
|         if (server.server_state().connect("sim_cluster", cluster_address, "",
 | |
|                                           i == 0))
 | |
|         {
 | |
|             throw wsrep::runtime_error("Failed to connect");
 | |
|         }
 | |
|         wsrep::log_debug() << "main: Starting applier";
 | |
|         server.start_applier();
 | |
|         wsrep::log_debug() << "main: Waiting initializing state";
 | |
|         if (server.server_state().wait_until_state(
 | |
|                 wsrep::server_state::s_initializing))
 | |
|         {
 | |
|             throw wsrep::runtime_error("Failed to reach initializing state");
 | |
|         }
 | |
|         wsrep::log_debug() << "main: Calling initialized";
 | |
|         server.server_state().initialized();
 | |
|         wsrep::log_debug() << "main: Waiting for synced state";
 | |
|         if (server.server_state().wait_until_state(
 | |
|                 wsrep::server_state::s_synced))
 | |
|         {
 | |
|             throw wsrep::runtime_error("Failed to reach synced state");
 | |
|         }
 | |
|         wsrep::log_debug() << "main: Server synced";
 | |
|     }
 | |
| 
 | |
|     // Start client threads
 | |
|     wsrep::log_info() << "####################### Starting client load";
 | |
|     clients_start_ = std::chrono::steady_clock::now();
 | |
|     size_t index(0);
 | |
|     for (auto& i : servers_)
 | |
|     {
 | |
|         if (params_.topology.size() == 0 || params_.topology[index]  == 'm')
 | |
|         {
 | |
|             i.second->start_clients();
 | |
|         }
 | |
|         ++index;
 | |
|     }
 | |
| }
 | |
| 
 | |
| void db::simulator::stop()
 | |
| {
 | |
|     for (auto& i : servers_)
 | |
|     {
 | |
|         db::server& server(*i.second);
 | |
|         server.stop_clients();
 | |
|     }
 | |
|     clients_stop_ = std::chrono::steady_clock::now();
 | |
|     wsrep::log_info() << "######## Stats ############";
 | |
|     wsrep::log_info()  << stats();
 | |
|     std::cout << db::ti::stats() << std::endl;
 | |
|     wsrep::log_info() << "######## Stats ############";
 | |
|     if (params_.fast_exit)
 | |
|     {
 | |
|         exit(0);
 | |
|     }
 | |
|     for (auto& i : servers_)
 | |
|     {
 | |
|         db::server& server(*i.second);
 | |
|         wsrep::log_info() << "Status for server: "
 | |
|                           << server.server_state().id();
 | |
|         auto status(server.server_state().provider().status());
 | |
|         for_each(status.begin(), status.end(),
 | |
|                  [](const wsrep::provider::status_variable& sv)
 | |
|                  {
 | |
|                      wsrep::log_info() << sv.name() << " = " << sv.value();
 | |
|                  });
 | |
|         server.server_state().disconnect();
 | |
|         if (server.server_state().wait_until_state(
 | |
|                 wsrep::server_state::s_disconnected))
 | |
|         {
 | |
|             throw wsrep::runtime_error("Failed to reach disconnected state");
 | |
|         }
 | |
|         server.stop_applier();
 | |
|         server.server_state().unload_provider();
 | |
|     }
 | |
| }
 | |
| 
 | |
| std::string db::simulator::server_port(size_t i) const
 | |
| {
 | |
|     std::ostringstream os;
 | |
|     os << (10000 + (i + 1)*10);
 | |
|     return os.str();
 | |
| }
 | |
| 
 | |
| std::string db::simulator::build_cluster_address() const
 | |
| {
 | |
|     std::string ret;
 | |
|     if (params_.wsrep_provider.find("galera_smm") != std::string::npos)
 | |
|     {
 | |
|         ret += "gcomm://";
 | |
|     }
 | |
| 
 | |
|     for (size_t i(0); i < params_.n_servers; ++i)
 | |
|     {
 | |
|         std::ostringstream sa_os;
 | |
|         sa_os << "127.0.0.1:";
 | |
|         sa_os << server_port(i);
 | |
|         ret += sa_os.str();
 | |
|         if (i < params_.n_servers - 1) ret += ",";
 | |
|     }
 | |
|     return ret;
 | |
| }
 |