mirror of
				https://github.com/codership/wsrep-lib.git
				synced 2025-11-03 05:33:11 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			168 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			168 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/*
 | 
						|
 * Copyright (C) 2018 Codership Oy <info@codership.com>
 | 
						|
 *
 | 
						|
 * This file is part of wsrep-lib.
 | 
						|
 *
 | 
						|
 * Wsrep-lib is free software: you can redistribute it and/or modify
 | 
						|
 * it under the terms of the GNU General Public License as published by
 | 
						|
 * the Free Software Foundation, either version 2 of the License, or
 | 
						|
 * (at your option) any later version.
 | 
						|
 *
 | 
						|
 * Wsrep-lib is distributed in the hope that it will be useful,
 | 
						|
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
 * GNU General Public License for more details.
 | 
						|
 *
 | 
						|
 * You should have received a copy of the GNU General Public License
 | 
						|
 * along with wsrep-lib.  If not, see <https://www.gnu.org/licenses/>.
 | 
						|
 */
 | 
						|
 | 
						|
#include "db_server.hpp"
 | 
						|
#include "db_server_service.hpp"
 | 
						|
#include "db_high_priority_service.hpp"
 | 
						|
#include "db_client.hpp"
 | 
						|
#include "db_simulator.hpp"
 | 
						|
 | 
						|
#include "wsrep/logger.hpp"
 | 
						|
 | 
						|
#include <ostream>
 | 
						|
#include <cstdio>
 | 
						|
 | 
						|
static wsrep::default_mutex logger_mtx;
 | 
						|
 | 
						|
static void
 | 
						|
logger_fn(wsrep::log::level l, const char* pfx, const char* msg)
 | 
						|
{
 | 
						|
    wsrep::unique_lock<wsrep::mutex> lock(logger_mtx);
 | 
						|
 | 
						|
    struct timespec time;
 | 
						|
    clock_gettime(CLOCK_REALTIME, &time);
 | 
						|
 | 
						|
    time_t const tt(time.tv_sec);
 | 
						|
    struct tm    date;
 | 
						|
    localtime_r(&tt, &date);
 | 
						|
 | 
						|
    char date_str[85] = { '\0', };
 | 
						|
    snprintf(date_str, sizeof(date_str) - 1,
 | 
						|
             "%04d-%02d-%02d %02d:%02d:%02d.%03d",
 | 
						|
             date.tm_year + 1900, date.tm_mon + 1, date.tm_mday,
 | 
						|
             date.tm_hour, date.tm_min, date.tm_sec, (int)time.tv_nsec/1000000);
 | 
						|
 | 
						|
    std::cerr << date_str << ' ' << pfx << wsrep::log::to_c_string(l) << ' '
 | 
						|
              << msg << std::endl;
 | 
						|
}
 | 
						|
 | 
						|
db::server::server(simulator& simulator,
 | 
						|
                   const std::string& name,
 | 
						|
                   const std::string& address)
 | 
						|
    : simulator_(simulator)
 | 
						|
    , storage_engine_(simulator_.params())
 | 
						|
    , mutex_()
 | 
						|
    , cond_()
 | 
						|
    , server_service_(*this)
 | 
						|
    , reporter_(mutex_, name + ".json", 4)
 | 
						|
    , server_state_(server_service_,
 | 
						|
                    name, address, "dbsim_" + name + "_data")
 | 
						|
    , last_client_id_(0)
 | 
						|
    , last_transaction_id_(0)
 | 
						|
    , appliers_()
 | 
						|
    , clients_()
 | 
						|
    , client_threads_()
 | 
						|
{
 | 
						|
    wsrep::log::logger_fn(logger_fn);
 | 
						|
}
 | 
						|
 | 
						|
void db::server::applier_thread()
 | 
						|
{
 | 
						|
    wsrep::client_id client_id(last_client_id_.fetch_add(1) + 1);
 | 
						|
    db::client applier(*this, client_id,
 | 
						|
                       wsrep::client_state::m_high_priority,
 | 
						|
                       simulator_.params());
 | 
						|
    wsrep::client_state* cc(static_cast<wsrep::client_state*>(
 | 
						|
                                &applier.client_state()));
 | 
						|
    db::high_priority_service hps(*this, applier);
 | 
						|
    cc->open(cc->id());
 | 
						|
    cc->before_command();
 | 
						|
    enum wsrep::provider::status ret(
 | 
						|
        server_state_.provider().run_applier(&hps));
 | 
						|
    wsrep::log_info() << "Applier thread exited with error code " << ret;
 | 
						|
    cc->after_command_before_result();
 | 
						|
    cc->after_command_after_result();
 | 
						|
    cc->close();
 | 
						|
    cc->cleanup();
 | 
						|
}
 | 
						|
 | 
						|
void db::server::start_applier()
 | 
						|
{
 | 
						|
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
 | 
						|
    appliers_.push_back(boost::thread(&server::applier_thread, this));
 | 
						|
}
 | 
						|
 | 
						|
void db::server::stop_applier()
 | 
						|
{
 | 
						|
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
 | 
						|
    appliers_.front().join();
 | 
						|
    appliers_.erase(appliers_.begin());
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
void db::server::start_clients()
 | 
						|
{
 | 
						|
    size_t n_clients(simulator_.params().n_clients);
 | 
						|
    for (size_t i(0); i < n_clients; ++i)
 | 
						|
    {
 | 
						|
        start_client(i + 1);
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
void db::server::stop_clients()
 | 
						|
{
 | 
						|
    for (auto& i : client_threads_)
 | 
						|
    {
 | 
						|
        i.join();
 | 
						|
    }
 | 
						|
    for (const auto& i : clients_)
 | 
						|
    {
 | 
						|
        const struct db::client::stats& stats(i->stats());
 | 
						|
        simulator_.stats_.commits += stats.commits;
 | 
						|
        simulator_.stats_.rollbacks  += stats.rollbacks;
 | 
						|
        simulator_.stats_.replays += stats.replays;
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
void db::server::client_thread(const std::shared_ptr<db::client>& client)
 | 
						|
{
 | 
						|
    client->start();
 | 
						|
}
 | 
						|
 | 
						|
void db::server::start_client(size_t id)
 | 
						|
{
 | 
						|
    auto client(std::make_shared<db::client>(
 | 
						|
                    *this, wsrep::client_id(id),
 | 
						|
                    wsrep::client_state::m_local,
 | 
						|
                    simulator_.params()));
 | 
						|
    clients_.push_back(client);
 | 
						|
    client_threads_.push_back(
 | 
						|
        boost::thread(&db::server::client_thread, this, client));
 | 
						|
}
 | 
						|
 | 
						|
void db::server::donate_sst(const std::string& req,
 | 
						|
                            const wsrep::gtid& gtid,
 | 
						|
                            bool bypass)
 | 
						|
{
 | 
						|
    simulator_.sst(*this, req, gtid, bypass);
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
wsrep::high_priority_service* db::server::streaming_applier_service()
 | 
						|
{
 | 
						|
    throw wsrep::not_implemented_error();
 | 
						|
}
 | 
						|
 | 
						|
void db::server::log_state_change(enum wsrep::server_state::state from,
 | 
						|
                                  enum wsrep::server_state::state to)
 | 
						|
{
 | 
						|
    wsrep::log_info() << "State changed " << from << " -> " << to;
 | 
						|
    reporter_.report_state(to);
 | 
						|
}
 |