1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-04-18 10:24:01 +03:00

Implemented thread service support.

Added a wsrep::thread_service interface to allow application to
inject instrumented thread, mutex and condition variable implementation
for provider.

The interface is defined in include/wsrep/thread_service.hpp.
Sample implementation is provided in dbsim/db_threads.[h|c]pp.

This patch will also clean up some remaining dependencies to
wsrep-API compilation units so that the dependency to wsrep-API
is header only. This will extending the provider support to
later wsrep-API versions.
This commit is contained in:
Teemu Ollakka 2019-02-16 17:09:18 +02:00
parent 477a71dd46
commit eb4cf86c1e
33 changed files with 1821 additions and 58 deletions

58
.clang-format Normal file
View File

@ -0,0 +1,58 @@
---
Language: Cpp
# BasedOnStyle: WebKit
AccessModifierOffset: -4
ConstructorInitializerIndentWidth: 4
AlignEscapedNewlinesLeft: false
AlignTrailingComments: false
AllowAllParametersOfDeclarationOnNextLine: true
AllowShortBlocksOnASingleLine: false
AllowShortCaseLabelsOnASingleLine: true
AllowShortIfStatementsOnASingleLine: false
AllowShortLoopsOnASingleLine: false
AllowShortFunctionsOnASingleLine: All
AlwaysBreakTemplateDeclarations: false
AlwaysBreakBeforeMultilineStrings: false
BreakBeforeBinaryOperators: true
BreakBeforeTernaryOperators: true
BreakConstructorInitializersBeforeComma: true
BinPackParameters: true
ColumnLimit: 80
ConstructorInitializerAllOnOneLineOrOnePerLine: false
DerivePointerAlignment: false
ExperimentalAutoDetectBinPacking: false
IndentCaseLabels: false
IndentWrappedFunctionNames: false
IndentFunctionDeclarationAfterType: false
MaxEmptyLinesToKeep: 1
KeepEmptyLinesAtTheStartOfBlocks: true
NamespaceIndentation: All
ObjCSpaceAfterProperty: true
ObjCSpaceBeforeProtocolList: true
PenaltyBreakBeforeFirstCallParameter: 19
PenaltyBreakComment: 300
PenaltyBreakString: 1000
PenaltyBreakFirstLessLess: 120
PenaltyExcessCharacter: 1000000
PenaltyReturnTypeOnItsOwnLine: 60
PointerAlignment: Left
SpacesBeforeTrailingComments: 1
Cpp11BracedListStyle: false
Standard: Cpp11
IndentWidth: 4
TabWidth: 8
UseTab: Never
BreakBeforeBraces: Allman
SpacesInParentheses: false
SpacesInAngles: false
SpaceInEmptyParentheses: false
SpacesInCStyleCastParentheses: false
SpacesInContainerLiterals: true
SpaceBeforeAssignmentOperators: true
ContinuationIndentWidth: 4
CommentPragmas: '^ IWYU pragma:'
ForEachMacros: [ foreach, Q_FOREACH, BOOST_FOREACH ]
SpaceBeforeParens: ControlStatements
DisableFormat: false
...

1
.gitmodules vendored
View File

@ -1,4 +1,3 @@
[submodule "wsrep-API/v26"]
path = wsrep-API/v26
url = https://github.com/codership/wsrep-API.git
branch = master

View File

@ -82,7 +82,7 @@ endif()
# Set up include directories
include_directories("${CMAKE_CURRENT_SOURCE_DIR}/include")
include_directories("${CMAKE_CURRENT_SOURCE_DIR}/wsrep-API/v26")
include_directories("${CMAKE_CURRENT_SOURCE_DIR}/wsrep-API")
if (WSREP_LIB_WITH_UNIT_TESTS)
find_package(Boost 1.54.0 REQUIRED

View File

@ -12,6 +12,7 @@ add_executable(dbsim
db_server_state.cpp
db_simulator.cpp
db_storage_engine.cpp
db_threads.cpp
dbsim.cpp
)

View File

@ -72,7 +72,16 @@ db::params db::parse_args(int argc, char** argv)
("debug-log-level", po::value<int>(&params.debug_log_level),
"debug logging level: 0 - none, 1 - verbose")
("fast-exit", po::value<int>(&params.fast_exit),
"exit from simulation without graceful shutdown");
"exit from simulation without graceful shutdown")
("ti",
po::value<int>(&params.thread_instrumentation),
"use instrumentation for threads/mutexes/condition variables"
"(0 default disabled, 1 total counts, 2 per object)")
("ti-cond-checks",
po::value<bool>(&params.cond_checks),
"Enable checks for correct condition variable use. "
" Effective only if thread-instrumentation is enabled")
;
try
{
po::variables_map vm;
@ -86,8 +95,14 @@ db::params db::parse_args(int argc, char** argv)
validate_params(params);
}
catch (const po::error& e)
{
std::cerr << "Error parsing arguments: " << e.what() << "\n";
exit(1);
}
catch (...)
{
std::cerr << "Error parsing arguments\n";
std::cerr << desc << "\n";
exit(1);
}

View File

@ -37,6 +37,8 @@ namespace db
std::string wsrep_provider_options;
int debug_log_level;
int fast_exit;
int thread_instrumentation;
bool cond_checks;
params()
: n_servers(0)
, n_clients(0)
@ -48,6 +50,8 @@ namespace db
, wsrep_provider_options()
, debug_log_level(0)
, fast_exit(0)
, thread_instrumentation()
, cond_checks()
{ }
};

View File

@ -102,7 +102,6 @@ void db::server::stop_clients()
void db::server::client_thread(const std::shared_ptr<db::client>& client)
{
client->store_globals();
client->start();
}

View File

@ -19,12 +19,15 @@
#include "db_simulator.hpp"
#include "db_client.hpp"
#include "db_threads.hpp"
#include "wsrep/logger.hpp"
#include <boost/filesystem.hpp>
#include <sstream>
static db::ti thread_instrumentation;
void db::simulator::run()
{
start();
@ -32,6 +35,7 @@ void db::simulator::run()
std::flush(std::cerr);
std::cout << "Results:\n";
std::cout << stats() << std::endl;
std::cout << db::ti::stats() << std::endl;
}
void db::simulator::sst(db::server& server,
@ -108,6 +112,8 @@ std::string db::simulator::stats() const
void db::simulator::start()
{
thread_instrumentation.level(params_.thread_instrumentation);
thread_instrumentation.cond_checks(params_.cond_checks);
wsrep::log_info() << "Provider: " << params_.wsrep_provider;
std::string cluster_address(build_cluster_address());
@ -139,13 +145,17 @@ void db::simulator::start()
server.server_state().debug_log_level(params_.debug_log_level);
std::string server_options(params_.wsrep_provider_options);
if (server.server_state().load_provider(
params_.wsrep_provider, server_options))
wsrep::provider::services services;
services.thread_service = params_.thread_instrumentation
? &thread_instrumentation
: nullptr;
if (server.server_state().load_provider(params_.wsrep_provider,
server_options, services))
{
throw wsrep::runtime_error("Failed to load provider");
}
if (server.server_state().connect("sim_cluster", cluster_address, "",
i == 0))
i == 0))
{
throw wsrep::runtime_error("Failed to connect");
}
@ -185,6 +195,7 @@ void db::simulator::stop()
clients_stop_ = std::chrono::steady_clock::now();
wsrep::log_info() << "######## Stats ############";
wsrep::log_info() << stats();
std::cout << db::ti::stats() << std::endl;
wsrep::log_info() << "######## Stats ############";
if (params_.fast_exit)
{

718
dbsim/db_threads.cpp Normal file
View File

@ -0,0 +1,718 @@
/*
* Copyright (C) 2019 Codership Oy <info@codership.com>
*
* This file is part of wsrep-lib.
*
* Wsrep-lib is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* Wsrep-lib is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
*/
#include "db_threads.hpp"
#include "wsrep/compiler.hpp"
#include "wsrep/logger.hpp"
#include <cassert>
#include <pthread.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <map>
#include <mutex>
#include <ostream>
#include <sstream>
#include <thread>
#include <unordered_map>
#include <vector>
namespace
{
struct ti_obj
{
};
enum ti_opcode
{
oc_thread_create,
oc_thread_destroy,
oc_mutex_create,
oc_mutex_destroy,
oc_mutex_lock,
oc_mutex_trylock,
oc_mutex_unlock,
oc_cond_create,
oc_cond_destroy,
oc_cond_wait,
oc_cond_timedwait,
oc_cond_signal,
oc_cond_broadcast,
oc_max // must be the last
};
static const char* ti_opstring(enum ti_opcode op)
{
switch (op)
{
case oc_thread_create: return "thread_create";
case oc_thread_destroy: return "thread_destroy";
case oc_mutex_create: return "mutex_create";
case oc_mutex_destroy: return "mutex_destroy";
case oc_mutex_lock: return "mutex_lock";
case oc_mutex_trylock: return "mutex_trylock";
case oc_mutex_unlock: return "mutex_unlock";
case oc_cond_create: return "cond_create";
case oc_cond_destroy: return "cond_destroy";
case oc_cond_wait: return "cond_wait";
case oc_cond_timedwait: return "cond_timedwait";
case oc_cond_signal: return "cond_signal";
case oc_cond_broadcast: return "cond_broadcast";
default: return "unknown";
}
}
static std::vector<std::string> key_vec;
static std::atomic<int> key_cnt;
static std::vector<std::vector<size_t>> ops_map;
static std::vector<std::mutex*> ops_map_sync;
static struct ops_map_sync_deleter
{
~ops_map_sync_deleter()
{
std::for_each(ops_map_sync.begin(), ops_map_sync.end(),
[](auto entry) { delete entry; });
}
} ops_map_sync_deleter;
static std::array<std::atomic<size_t>, oc_max> total_ops;
static std::atomic<size_t> total_allocations;
static std::atomic<size_t> mutex_contention;
static std::unordered_map<std::string, size_t> mutex_contention_counts;
static int op_level;
// Check correct condition variable usage:
// - Associated mutex must be locked when waiting for cond
// - There must be at least one waiter when signalling for condition
static bool cond_checks;
static inline void cond_check(bool condition, const char* name,
const char* message)
{
if (cond_checks && !condition)
{
wsrep::log_error() << "Condition variable check failed for '"
<< name << "': " << message;
::abort();
}
}
static inline int append_key(const char* name, const char* type)
{
key_vec.push_back(std::string(name) + "_" + type);
wsrep::log_info() << "Register key " << name << "_" << type
<< " with index " << (key_cnt + 1);
ops_map.push_back(std::vector<size_t>());
ops_map_sync.push_back(new std::mutex());
ops_map.back().resize(oc_max);
return ++key_cnt;
}
template <class Key> static inline size_t get_key_index(const Key* key)
{
size_t index(reinterpret_cast<const size_t>(key) - 1);
assert(index >= 0 && index < key_vec.size());
return index;
}
template <class Key>
static inline const char* get_key_name(const Key* key)
{
return key_vec[get_key_index(key)].c_str();
}
static inline const std::string& get_key_name_by_index(size_t index)
{
assert(index < key_vec.size());
return key_vec[index];
}
// Note: Do not refer the obj pointer in this function, it may
// have been deleted before the call.
template <class Key>
static inline void update_ops(const ti_obj* obj,
const Key* key,
enum ti_opcode op)
{
if (op_level < 1)
return;
total_ops[op] += 1;
if (op_level < 2)
return;
if (false && op == oc_mutex_destroy)
{
wsrep::log_info() << "thread: " << std::this_thread::get_id()
<< " object: " << obj
<< ": name: " << get_key_name(key)
<< " op: " << ti_opstring(op);
}
std::lock_guard<std::mutex> lock(*ops_map_sync[get_key_index(key)]);
ops_map[get_key_index(key)][op] += 1;
}
struct thread_args
{
void* this_thread;
void* (*fn)(void*);
void* args;
};
pthread_key_t this_thread_key;
struct this_thread_key_initializer
{
this_thread_key_initializer()
{
pthread_key_create(&this_thread_key, nullptr);
}
~this_thread_key_initializer()
{
pthread_key_delete(this_thread_key);
}
};
void* thread_start_fn(void* args_ptr);
class ti_thread : public ti_obj
{
public:
ti_thread(const wsrep::thread_service::thread_key* key)
: key_(key)
, th_()
, retval_()
, detached_()
{
update_ops(this, key_, oc_thread_create);
}
~ti_thread()
{
update_ops(this, key_, oc_thread_destroy);
}
ti_thread(const ti_thread&) = delete;
ti_thread& operator=(const ti_thread&) = delete;
int run(void* (*fn)(void *), void* args)
{
auto ta(new thread_args{this, fn, args});
return pthread_create(&th_, nullptr, thread_start_fn, ta);
}
int detach()
{
detached_ = true;
return pthread_detach(th_);
}
int join(void** retval)
{
return pthread_join(th_, retval);
}
bool detached() const { return detached_; }
void retval(void* retval) { retval_ = retval; }
static ti_thread* self()
{
return reinterpret_cast<ti_thread*>(
pthread_getspecific(this_thread_key));
}
int setschedparam(int policy, const struct sched_param* param)
{
return pthread_setschedparam(th_, policy, param);
}
int getschedparam(int* policy, struct sched_param* param)
{
return pthread_getschedparam(th_, policy, param);
}
int equal(ti_thread* other)
{
return pthread_equal(th_, other->th_);
}
private:
const wsrep::thread_service::thread_key* key_;
pthread_t th_;
void* retval_;
bool detached_;
};
void* thread_start_fn(void* args_ptr)
{
thread_args* ta(reinterpret_cast<thread_args*>(args_ptr));
ti_thread* thread = reinterpret_cast<ti_thread*>(ta->this_thread);
pthread_setspecific(this_thread_key, thread);
void* (*fn)(void*) = ta->fn;
void* args = ta->args;
delete ta;
void* ret((*fn)(args));
pthread_setspecific(this_thread_key, nullptr);
// If we end here the thread returned instead of calling
// pthread_exit()
if (thread->detached())
delete thread;
return ret;
}
class ti_mutex : public ti_obj
{
public:
ti_mutex(const wsrep::thread_service::mutex_key* key, bool inplace)
: mutex_(PTHREAD_MUTEX_INITIALIZER)
, key_(key)
, inplace_(inplace)
#ifndef NDEBUG
, locked_()
, owner_()
#endif // ! NDEBUG
{
update_ops(this, key_, oc_mutex_create);
if (not inplace) total_allocations++;
}
~ti_mutex() { update_ops(this, key_, oc_mutex_destroy); }
ti_mutex& operator=(const ti_mutex&) = delete;
ti_mutex(const ti_mutex&) = delete;
int lock()
{
update_ops(this, key_, oc_mutex_lock);
int ret(pthread_mutex_trylock(&mutex_));
if (ret == EBUSY)
{
mutex_contention++;
{
std::lock_guard<std::mutex> lock(*ops_map_sync[get_key_index(key_)]);
mutex_contention_counts[get_key_name(key_)] += 1;
}
ret = pthread_mutex_lock(&mutex_);
}
#ifndef NDEBUG
if (ret == 0)
{
assert(owner_ == std::thread::id());
locked_ = true;
owner_ = std::this_thread::get_id();
}
#endif // ! NDEBUG
return ret;
}
int trylock()
{
update_ops(this, key_, oc_mutex_trylock);
int ret(pthread_mutex_trylock(&mutex_));
#ifndef NDEBUG
if (ret == 0)
{
assert(owner_ == std::thread::id());
locked_ = true;
owner_ = std::this_thread::get_id();
}
#endif // ! NDEBUG
return ret;
}
int unlock()
{
assert(locked_);
#ifndef NDEBUG
assert(owner_ == std::this_thread::get_id());
owner_ = std::thread::id();
#endif // ! NDEBUG
// Use temporary object. After mutex is unlocked it may be
// destroyed before this update_ops() finishes.
auto key(key_);
int ret(pthread_mutex_unlock(&mutex_));
update_ops(this, key, oc_mutex_unlock);
return ret;
}
struct condwait_context
{
#ifndef NDEBUG
bool locked;
std::thread::id owner;
#endif // ! NDEBUG
};
condwait_context save_for_condwait()
{
#ifndef NDEBUG
return condwait_context{ locked_, owner_ };
#else
return condwait_context{};
#endif // ! NDEBUG
}
void reset()
{
#ifndef NDEBUG
locked_ = false;
owner_ = std::thread::id();
#endif // ! NDEBUG
}
void restore_from_condwait(const condwait_context& ctx WSREP_UNUSED)
{
#ifndef NDEBUG
locked_ = ctx.locked;
owner_ = ctx.owner;
#endif // ! NDEBUG
}
pthread_mutex_t* native_handle() { return &mutex_; }
const wsrep::thread_service::mutex_key* key() const { return key_; }
bool inplace() const { return inplace_; }
private:
pthread_mutex_t mutex_;
const wsrep::thread_service::mutex_key* key_;
const bool inplace_;
#ifndef NDEBUG
bool locked_;
std::atomic<std::thread::id> owner_;
#endif // ! NDEBU
};
class ti_cond : public ti_obj
{
public:
ti_cond(const wsrep::thread_service::cond_key* key, bool inplace)
: cond_(PTHREAD_COND_INITIALIZER)
, key_(key)
, inplace_(inplace)
, waiter_()
{
update_ops(this, key_, oc_cond_create);
if (not inplace) total_allocations++;
}
~ti_cond() { update_ops(this, key_, oc_cond_destroy); }
ti_cond& operator=(const ti_cond&) = delete;
ti_cond(const ti_cond&) = delete;
int wait(ti_mutex& mutex)
{
cond_check(pthread_mutex_trylock(mutex.native_handle()),
get_key_name(key_), "Mutex not locked in cond wait");
waiter_ = true;
update_ops(this, key_, oc_cond_wait);
// update_ops(&mutex, mutex.key(), oc_mutex_unlock);
auto condwait_ctx(mutex.save_for_condwait());
mutex.reset();
int ret(pthread_cond_wait(&cond_, mutex.native_handle()));
// update_ops(&mutex, mutex.key(), oc_mutex_lock);
mutex.restore_from_condwait(condwait_ctx);
waiter_ = false;
return ret;
}
int timedwait(ti_mutex& mutex, const struct timespec* ts)
{
cond_check(pthread_mutex_trylock(mutex.native_handle()),
get_key_name(key_), "Mutex not locked in cond wait");
waiter_ = true;
update_ops(this, key_, oc_cond_timedwait);
// update_ops(&mutex, mutex.key(), oc_mutex_unlock);
auto condwait_ctx(mutex.save_for_condwait());
mutex.reset();
int ret(pthread_cond_timedwait(&cond_, mutex.native_handle(), ts));
// update_ops(&mutex, mutex.key(), oc_mutex_lock);
mutex.restore_from_condwait(condwait_ctx);
waiter_ = false;
return ret;
}
int signal()
{
update_ops(this, key_, oc_cond_signal);
cond_check(waiter_, get_key_name(key_),
"Signalling condition variable without waiter");
return pthread_cond_signal(&cond_);
}
int broadcast()
{
update_ops(this, key_, oc_cond_broadcast);
return pthread_cond_broadcast(&cond_);
}
bool inplace() const { return inplace_; }
private:
pthread_cond_t cond_;
const wsrep::thread_service::cond_key* key_;
const bool inplace_;
bool waiter_;
};
}
int db::ti::before_init()
{
wsrep::log_info() << "db::ti::before_init()";
return 0;
}
int db::ti::after_init()
{
wsrep::log_info() << "db::ti::after_init()";
return 0;
}
//////////////////////////////////////////////////////////////////////////////
// Thread //
//////////////////////////////////////////////////////////////////////////////
const wsrep::thread_service::thread_key*
db::ti::create_thread_key(const char* name) WSREP_NOEXCEPT
{
assert(name);
return reinterpret_cast<const wsrep::thread_service::thread_key*>(
append_key(name, "thread"));
}
int db::ti::create_thread(const wsrep::thread_service::thread_key* key,
wsrep::thread_service::thread** thread,
void* (*fn)(void*), void* args) WSREP_NOEXCEPT
{
auto pit(new ti_thread(key));
total_allocations++;
int ret;
if ((ret = pit->run(fn, args)))
{
delete pit;
}
else
{
*thread = reinterpret_cast<wsrep::thread_service::thread*>(pit);
}
return ret;
}
int db::ti::detach(wsrep::thread_service::thread* thread) WSREP_NOEXCEPT
{
return reinterpret_cast<ti_thread*>(thread)->detach();
}
void db::ti::exit(wsrep::thread_service::thread* thread, void* retval) WSREP_NOEXCEPT
{
ti_thread* th(reinterpret_cast<ti_thread*>(thread));
th->retval(retval);
if (th->detached())
delete th;
pthread_exit(retval);
}
int db::ti::equal(wsrep::thread_service::thread* thread_1,
wsrep::thread_service::thread* thread_2) WSREP_NOEXCEPT
{
return (reinterpret_cast<ti_thread*>(thread_1)->equal(
reinterpret_cast<ti_thread*>(thread_2)));
}
int db::ti::join(wsrep::thread_service::thread* thread, void** retval) WSREP_NOEXCEPT
{
ti_thread* th(reinterpret_cast<ti_thread*>(thread));
int ret(th->join(retval));
if (not th->detached())
{
delete th;
}
return ret;
}
wsrep::thread_service::thread* db::ti::self() WSREP_NOEXCEPT
{
return reinterpret_cast<wsrep::thread_service::thread*>(ti_thread::self());
}
int db::ti::setschedparam(wsrep::thread_service::thread* thread,
int policy, const struct sched_param* param) WSREP_NOEXCEPT
{
return reinterpret_cast<ti_thread*>(thread)->setschedparam(policy, param);
}
int db::ti::getschedparam(wsrep::thread_service::thread* thread,
int* policy, struct sched_param* param) WSREP_NOEXCEPT
{
return reinterpret_cast<ti_thread*>(thread)->getschedparam(policy, param);
}
//////////////////////////////////////////////////////////////////////////////
// Mutex //
//////////////////////////////////////////////////////////////////////////////
const wsrep::thread_service::mutex_key*
db::ti::create_mutex_key(const char* name) WSREP_NOEXCEPT
{
assert(name);
return reinterpret_cast<const wsrep::thread_service::mutex_key*>(
append_key(name, "mutex"));
}
wsrep::thread_service::mutex*
db::ti::init_mutex(const wsrep::thread_service::mutex_key* key, void* memblock,
size_t memblock_size) WSREP_NOEXCEPT
{
return reinterpret_cast<wsrep::thread_service::mutex*>(
memblock_size >= sizeof(ti_mutex) ? new (memblock) ti_mutex(key, true)
: new ti_mutex(key, false));
}
int db::ti::destroy(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT
{
ti_mutex* m(reinterpret_cast<ti_mutex*>(mutex));
if (m->inplace())
{
m->~ti_mutex();
}
else
{
delete m;
}
return 0;
}
int db::ti::lock(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT
{
return reinterpret_cast<ti_mutex*>(mutex)->lock();
}
int db::ti::trylock(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT
{
return reinterpret_cast<ti_mutex*>(mutex)->trylock();
}
int db::ti::unlock(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT
{
return reinterpret_cast<ti_mutex*>(mutex)->unlock();
}
//////////////////////////////////////////////////////////////////////////////
// Cond //
//////////////////////////////////////////////////////////////////////////////
const wsrep::thread_service::cond_key* db::ti::create_cond_key(const char* name) WSREP_NOEXCEPT
{
assert(name);
return reinterpret_cast<const wsrep::thread_service::cond_key*>(
append_key(name, "cond"));
}
wsrep::thread_service::cond*
db::ti::init_cond(const wsrep::thread_service::cond_key* key, void* memblock,
size_t memblock_size) WSREP_NOEXCEPT
{
return reinterpret_cast<wsrep::thread_service::cond*>(
memblock_size >= sizeof(ti_cond) ? new (memblock) ti_cond(key, true)
: new ti_cond(key, false));
}
int db::ti::destroy(wsrep::thread_service::cond* cond) WSREP_NOEXCEPT
{
ti_cond* c(reinterpret_cast<ti_cond*>(cond));
if (c->inplace())
{
c->~ti_cond();
}
else
{
delete c;
}
return 0;
}
int db::ti::wait(wsrep::thread_service::cond* cond,
wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT
{
return reinterpret_cast<ti_cond*>(cond)->wait(
*reinterpret_cast<ti_mutex*>(mutex));
}
int db::ti::timedwait(wsrep::thread_service::cond* cond,
wsrep::thread_service::mutex* mutex,
const struct timespec* ts) WSREP_NOEXCEPT
{
return reinterpret_cast<ti_cond*>(cond)->timedwait(
*reinterpret_cast<ti_mutex*>(mutex), ts);
}
int db::ti::signal(wsrep::thread_service::cond* cond) WSREP_NOEXCEPT
{
return reinterpret_cast<ti_cond*>(cond)->signal();
}
int db::ti::broadcast(wsrep::thread_service::cond* cond) WSREP_NOEXCEPT
{
return reinterpret_cast<ti_cond*>(cond)->broadcast();
}
void db::ti::level(int level)
{
::op_level = level;
}
void db::ti::cond_checks(bool cond_checks)
{
if (cond_checks)
wsrep::log_info() << "Enabling condition variable checking";
::cond_checks = cond_checks;
}
std::string db::ti::stats()
{
std::ostringstream os;
os << "Totals:\n";
for (size_t i(0); i < total_ops.size(); ++i)
{
if (total_ops[i] > 0)
{
os << " " << ti_opstring(static_cast<enum ti_opcode>(i)) << ": "
<< total_ops[i] << "\n";
}
}
os << "Total allocations: " << total_allocations << "\n";
os << "Mutex contention: " << mutex_contention << "\n";
for (auto i : mutex_contention_counts)
{
os << " " << i.first << ": " << i.second << "\n";
}
os << "Per key:\n";
std::map<std::string, std::vector<size_t>> sorted;
for (size_t i(0); i < ops_map.size(); ++i)
{
sorted.insert(std::make_pair(get_key_name_by_index(i), ops_map[i]));
}
for (auto i : sorted)
{
for (size_t j(0); j < i.second.size(); ++j)
{
if (i.second[j])
{
os << " " << i.first << ": "
<< ti_opstring(static_cast<enum ti_opcode>(j)) << ": "
<< i.second[j] << "\n";
}
}
}
return os.str();
}

85
dbsim/db_threads.hpp Normal file
View File

@ -0,0 +1,85 @@
/*
* Copyright (C) 2019 Codership Oy <info@codership.com>
*
* This file is part of wsrep-lib.
*
* Wsrep-lib is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* Wsrep-lib is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
*/
#ifndef WSREP_DB_THREADS_HPP
#define WSREP_DB_THREADS_HPP
#include "wsrep/thread_service.hpp"
#include <string>
namespace db
{
class ti : public wsrep::thread_service
{
public:
ti() { }
int before_init() override;
int after_init() override;
/* Thread */
const wsrep::thread_service::thread_key*
create_thread_key(const char* name) WSREP_NOEXCEPT override;
int create_thread(const wsrep::thread_service::thread_key* key,
wsrep::thread_service::thread**,
void* (*fn)(void*), void*) WSREP_NOEXCEPT override;
int detach(wsrep::thread_service::thread*) WSREP_NOEXCEPT override;
int equal(wsrep::thread_service::thread*,
wsrep::thread_service::thread*) WSREP_NOEXCEPT override;
void exit(wsrep::thread_service::thread*, void*) WSREP_NOEXCEPT override;
int join(wsrep::thread_service::thread*, void**) WSREP_NOEXCEPT override;
wsrep::thread_service::thread* self() WSREP_NOEXCEPT override;
int setschedparam(wsrep::thread_service::thread*, int,
const struct sched_param*) WSREP_NOEXCEPT override;
int getschedparam(wsrep::thread_service::thread*, int*,
struct sched_param*) WSREP_NOEXCEPT override;
/* Mutex */
const wsrep::thread_service::mutex_key*
create_mutex_key(const char* name) WSREP_NOEXCEPT override;
wsrep::thread_service::mutex*
init_mutex(const wsrep::thread_service::mutex_key* key, void* memblock,
size_t memblock_size) WSREP_NOEXCEPT override;
int destroy(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT override;
int lock(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT override;
int trylock(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT override;
int unlock(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT override;
/* Cond */
const wsrep::thread_service::cond_key*
create_cond_key(const char* name) WSREP_NOEXCEPT override;
wsrep::thread_service::cond*
init_cond(const wsrep::thread_service::cond_key* key, void* memblock,
size_t memblock_size) WSREP_NOEXCEPT override;
int destroy(wsrep::thread_service::cond* cond) WSREP_NOEXCEPT override;
int wait(wsrep::thread_service::cond* cond,
wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT override;
int timedwait(wsrep::thread_service::cond* cond,
wsrep::thread_service::mutex* mutex,
const struct timespec* ts) WSREP_NOEXCEPT override;
int signal(wsrep::thread_service::cond* cond) WSREP_NOEXCEPT override;
int broadcast(wsrep::thread_service::cond* cond) WSREP_NOEXCEPT override;
static void level(int level);
static void cond_checks(bool cond_checks);
static std::string stats();
};
}
#endif // WSREP_DB_THREADS_HPP

View File

@ -307,6 +307,10 @@ namespace wsrep
/**
* Append a key into transaction write set.
*
* @param key Key to be appended
*
* @return Zero on success, non-zero on failure.
*/
int append_key(const wsrep::key& key)
{
@ -315,6 +319,27 @@ namespace wsrep
return transaction_.append_key(key);
}
/**
* Append keys in key_array into transaction write set.
*
* @param keys Array of keys to be appended
*
* @return Zero in case of success, non-zero on failure.
*/
int append_keys(const wsrep::key_array& keys)
{
assert(mode_ == m_local || mode_ == m_toi);
assert(state_ == s_exec);
for (auto i(keys.begin()); i != keys.end(); ++i)
{
if (transaction_.append_key(*i))
{
return 1;
}
}
return 0;
}
/**
* Append data into transaction write set.
*/

View File

@ -20,12 +20,30 @@
/** @file compiler.hpp
*
* Compiler specific options.
* Compiler specific macro definitions.
*
* WSREP_NOEXCEPT - Specifies that the method/function does not throw. If
* and exception is thrown inside, std::terminate is called
* without propagating the exception.
* Set to "noexcept" if the compiler supports it, otherwise
* left empty.
* WSREP_NORETURN - Indicates that the method/function does not return.
* Set to attribute "[[noreturn]]" if the compiler supports,
* it, otherwise "__attribute__((noreturn))".
* WSREP_OVERRIDE - Set to "override" if the compiler supports it, otherwise
* left empty.
* WSREP_UNUSED - Can be used to mark variables which may be present in
* debug builds but not in release builds.
*/
#define WSREP_UNUSED __attribute__((unused))
#if __cplusplus >= 201103L
#define WSREP_NOEXCEPT noexcept
#define WSREP_NORETURN [[noreturn]]
#define WSREP_OVERRIDE override
#else
#define WSREP_NOEXCEPT
#define WSREP_NORETURN __attribute__((noreturn))
#define WSREP_OVERRIDE
#endif // __cplusplus >= 201103L
#define WSREP_UNUSED __attribute__((unused))

View File

@ -22,9 +22,16 @@
#include "id.hpp"
#include "seqno.hpp"
#include "compiler.hpp"
#include <iosfwd>
/**
* Minimum number of bytes guaratneed to store GTID string representation,
* terminating '\0' not included (36 + 1 + 20).
*/
#define WSREP_LIB_GTID_C_STR_LEN 57
namespace wsrep
{
class gtid
@ -61,6 +68,18 @@ namespace wsrep
wsrep::seqno seqno_;
};
/**
* Scan a GTID from C string.
*
* @param buf Buffer containing the string
* @param len Length of buffer
* @param[out] gtid Gtid to be printed to
*
* @return Number of bytes scanned, negative value on error.
*/
ssize_t scan_from_c_str(const char* buf, size_t buf_len,
wsrep::gtid& gtid);
/**
* Print a GTID into character buffer.
* @param buf Pointer to the beginning of the buffer
@ -68,16 +87,16 @@ namespace wsrep
*
* @return Number of characters printed or negative value for error
*/
ssize_t gtid_print_to_c_str(const wsrep::gtid&, char* buf, size_t buf_len);
ssize_t print_to_c_str(const wsrep::gtid&, char* buf, size_t buf_len);
/**
* Return minimum number of bytes guaranteed to store GTID string
* representation, terminating '\0' not included (36 + 1 + 20)
* Overload for ostream operator<<.
*/
static inline size_t gtid_c_str_len()
{
return 57;
}
std::ostream& operator<<(std::ostream&, const wsrep::gtid&);
/**
* Overload for istream operator>>.
*/
std::istream& operator>>(std::istream&, wsrep::gtid&);
}

View File

@ -42,10 +42,11 @@ namespace wsrep
class id
{
public:
typedef struct native_type { unsigned char buf[16]; } native_type;
/**
* Default constructor. Constructs an empty identifier.
*/
id() : data_() { std::memset(data_, 0, sizeof(data_)); }
id() : data_() { std::memset(data_.buf, 0, sizeof(data_.buf)); }
/**
* Construct from string. The input string may contain either
@ -62,24 +63,24 @@ namespace wsrep
{
throw wsrep::runtime_error("Too long identifier");
}
std::memset(data_, 0, sizeof(data_));
std::memcpy(data_, data, size);
std::memset(data_.buf, 0, sizeof(data_.buf));
std::memcpy(data_.buf, data, size);
}
bool operator<(const id& other) const
{
return (std::memcmp(data_, other.data_, sizeof(data_)) < 0);
return (std::memcmp(data_.buf, other.data_.buf, sizeof(data_.buf)) < 0);
}
bool operator==(const id& other) const
{
return (std::memcmp(data_, other.data_, sizeof(data_)) == 0);
return (std::memcmp(data_.buf, other.data_.buf, sizeof(data_.buf)) == 0);
}
bool operator!=(const id& other) const
{
return !(*this == other);
}
const void* data() const { return data_; }
const void* data() const { return data_.buf; }
size_t size() const { return sizeof(data_); }
@ -94,7 +95,7 @@ namespace wsrep
}
private:
static const wsrep::id undefined_;
unsigned char data_[16];
native_type data_;
};
std::ostream& operator<<(std::ostream&, const wsrep::id& id);

View File

@ -25,6 +25,7 @@
#include "buffer.hpp"
#include "client_id.hpp"
#include "transaction_id.hpp"
#include "compiler.hpp"
#include <cassert>
#include <cstring>
@ -33,10 +34,17 @@
#include <vector>
#include <ostream>
/**
* Empty provider magic. If none provider is passed to make_provider(),
* a dummy provider is loaded.
*/
#define WSREP_LIB_PROVIDER_NONE "none"
namespace wsrep
{
class server_state;
class high_priority_service;
class thread_service;
class stid
{
public:
@ -186,7 +194,6 @@ namespace wsrep
class provider
{
public:
class status_variable
{
public:
@ -404,16 +411,31 @@ namespace wsrep
*/
virtual void* native() const = 0;
/**
* Services argument passed to make_provider. This struct contains
* optional services which are passed to the provider.
*/
struct services
{
wsrep::thread_service* thread_service;
services()
: thread_service()
{
}
};
/**
* Create a new provider.
*
* @param provider_spec Provider specification
* @param provider_options Initial options to provider
* @param thread_service Optional thread service implementation.
*/
static provider* make_provider(
wsrep::server_state&,
const std::string& provider_spec,
const std::string& provider_options);
static provider* make_provider(wsrep::server_state&,
const std::string& provider_spec,
const std::string& provider_options,
const wsrep::provider::services& services
= wsrep::provider::services());
protected:
wsrep::server_state& server_state_;
};

View File

@ -20,8 +20,6 @@
#ifndef WSREP_SEQNO_HPP
#define WSREP_SEQNO_HPP
#include "exception.hpp"
#include <iosfwd>
namespace wsrep
@ -33,6 +31,8 @@ namespace wsrep
class seqno
{
public:
typedef long long native_type;
seqno()
: seqno_(-1)
{ }
@ -77,8 +77,9 @@ namespace wsrep
return (*this + seqno(other));
}
static seqno undefined() { return seqno(-1); }
private:
long long seqno_;
native_type seqno_;
};
std::ostream& operator<<(std::ostream& os, wsrep::seqno seqno);

View File

@ -95,6 +95,13 @@
#include <string>
#include <map>
/**
* Magic string to tell provider to engage into trivial (empty)
* state transfer. No data will be passed, but the node shall be
* considered joined.
*/
#define WSREP_LIB_SST_TRIVIAL "trivial"
namespace wsrep
{
// Forward declarations
@ -179,7 +186,6 @@ namespace wsrep
rm_sync
};
virtual ~server_state();
wsrep::encryption_service* encryption_service()
@ -262,11 +268,15 @@ namespace wsrep
* @param provider WSRep provider library to be loaded.
* @param provider_options Provider specific options string
* to be passed for provider during initialization.
* @param services Application defined services passed to
* the provider.
*
* @return Zero on success, non-zero on error.
*/
int load_provider(const std::string& provider,
const std::string& provider_options);
const std::string& provider_options,
const wsrep::provider::services& services
= wsrep::provider::services());
void unload_provider();

View File

@ -0,0 +1,100 @@
/*
* Copyright (C) 2019 Codership Oy <info@codership.com>
*
* This file is part of wsrep-lib.
*
* Wsrep-lib is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* Wsrep-lib is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
*/
/** @file thread_service.hpp
*
* Service interface for threads and synchronization primitives.
* The purpose of this interface is to provider provider implementations
* means to integrate with application thread implementation.
*
* Interface is designed to resemble POSIX threads, mutexes and
* condition variables.
*/
#include <cstddef> // size_t
#include "compiler.hpp"
struct timespec;
struct sched_param;
namespace wsrep
{
class thread_service
{
public:
virtual ~thread_service() { }
class thread_key { };
class thread { };
class mutex_key { };
class mutex { };
class cond_key { };
class cond { };
/**
* Method will be called before library side thread
* service initialization.
*/
virtual int before_init() = 0;
/**
* Method will be called after library side thread service
* has been initialized.
*/
virtual int after_init() = 0;
/* Thread */
virtual const thread_key* create_thread_key(const char* name) WSREP_NOEXCEPT
= 0;
virtual int create_thread(const thread_key*, thread**,
void* (*fn)(void*), void*) WSREP_NOEXCEPT
= 0;
virtual int detach(thread*) WSREP_NOEXCEPT = 0;
virtual int equal(thread*, thread*) WSREP_NOEXCEPT = 0;
WSREP_NORETURN virtual void exit(thread*, void* retval) WSREP_NOEXCEPT = 0;
virtual int join(thread*, void** retval) WSREP_NOEXCEPT = 0;
virtual thread* self() WSREP_NOEXCEPT = 0;
virtual int setschedparam(thread*, int,
const struct sched_param*) WSREP_NOEXCEPT
= 0;
virtual int getschedparam(thread*, int*, struct sched_param*) WSREP_NOEXCEPT
= 0;
/* Mutex */
virtual const mutex_key* create_mutex_key(const char* name) WSREP_NOEXCEPT
= 0;
virtual mutex* init_mutex(const mutex_key*, void*, size_t) WSREP_NOEXCEPT = 0;
virtual int destroy(mutex*) WSREP_NOEXCEPT = 0;
virtual int lock(mutex*) WSREP_NOEXCEPT = 0;
virtual int trylock(mutex*) WSREP_NOEXCEPT = 0;
virtual int unlock(mutex*) WSREP_NOEXCEPT = 0;
/* Condition variable */
virtual const cond_key* create_cond_key(const char* name) WSREP_NOEXCEPT = 0;
virtual cond* init_cond(const cond_key*, void*, size_t) WSREP_NOEXCEPT = 0;
virtual int destroy(cond*) WSREP_NOEXCEPT = 0;
virtual int wait(cond*, mutex*) WSREP_NOEXCEPT = 0;
virtual int timedwait(cond*, mutex*, const struct timespec*) WSREP_NOEXCEPT
= 0;
virtual int signal(cond*) WSREP_NOEXCEPT = 0;
virtual int broadcast(cond*) WSREP_NOEXCEPT = 0;
};
} // namespace wsrep

View File

@ -14,6 +14,8 @@ add_library(wsrep-lib
view.cpp
server_state.cpp
thread.cpp
thread_service_v1.cpp
transaction.cpp
uuid.cpp
wsrep_provider_v26.cpp)
target_link_libraries(wsrep-lib wsrep_api_v26 pthread dl)

View File

@ -36,11 +36,47 @@ std::istream& wsrep::operator>>(std::istream& is, wsrep::gtid& gtid)
std::getline(is, id_str, ':');
long long seq;
is >> seq;
gtid = wsrep::gtid(wsrep::id(id_str), wsrep::seqno(seq));
if (!is)
{
is.clear(std::ios_base::failbit);
return is;
}
try
{
// wsrep::id constructor will throw if it cannot parse the
// id_str.
gtid = wsrep::gtid(wsrep::id(id_str), wsrep::seqno(seq));
}
catch (const wsrep::runtime_error& e)
{
// Formatting or extraction error. Clear the istream state and
// set failibit.
is.clear(std::ios_base::failbit);
}
return is;
}
ssize_t wsrep::gtid_print_to_c_str(
ssize_t wsrep::scan_from_c_str(
const char* buf, size_t buf_len, wsrep::gtid& gtid)
{
std::istringstream is(std::string(buf, buf_len));
is >> gtid;
// Whole string was consumed without failures
if (is && is.eof())
{
return static_cast<ssize_t>(buf_len);
}
// Some failure occurred
if (!is)
{
return -EINVAL;
}
// The string was not consumed completely, return current position
// of the istream.
return is.tellg();
}
ssize_t wsrep::print_to_c_str(
const wsrep::gtid& gtid, char* buf, size_t buf_len)
{
std::ostringstream os;

View File

@ -18,7 +18,7 @@
*/
#include "wsrep/id.hpp"
#include <wsrep_api.h>
#include "uuid.hpp"
#include <cctype>
#include <sstream>
@ -29,15 +29,17 @@ const wsrep::id wsrep::id::undefined_ = wsrep::id();
wsrep::id::id(const std::string& str)
: data_()
{
wsrep_uuid_t wsrep_uuid;
if (wsrep_uuid_scan(str.c_str(), str.size(), &wsrep_uuid) ==
WSREP_UUID_STR_LEN)
wsrep::uuid_t wsrep_uuid;
if (str.size() == WSREP_LIB_UUID_STR_LEN &&
wsrep::uuid_scan(str.c_str(), str.size(), &wsrep_uuid) ==
WSREP_LIB_UUID_STR_LEN)
{
std::memcpy(data_, wsrep_uuid.data, sizeof(data_));
std::memcpy(data_.buf, wsrep_uuid.data, sizeof(data_.buf));
}
else if (str.size() <= 16)
{
std::memcpy(data_, str.c_str(), str.size());
std::memcpy(data_.buf, str.c_str(), str.size());
}
else
{
@ -58,14 +60,14 @@ std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::id& id)
}
else
{
char uuid_str[WSREP_UUID_STR_LEN + 1];
wsrep_uuid_t uuid;
char uuid_str[WSREP_LIB_UUID_STR_LEN + 1];
wsrep::uuid_t uuid;
std::memcpy(uuid.data, ptr, sizeof(uuid.data));
if (wsrep_uuid_print(&uuid, uuid_str, sizeof(uuid_str)) < 0)
if (wsrep::uuid_print(&uuid, uuid_str, sizeof(uuid_str)) < 0)
{
throw wsrep::runtime_error("Could not print uuid");
}
uuid_str[WSREP_UUID_STR_LEN] = '\0';
uuid_str[WSREP_LIB_UUID_STR_LEN] = '\0';
return (os << uuid_str);
}
}

View File

@ -19,18 +19,69 @@
#include "wsrep/provider.hpp"
#include "wsrep/logger.hpp"
#include "wsrep/compiler.hpp"
#include "wsrep_provider_v26.hpp"
#include <dlfcn.h>
#include <memory>
struct dlh_deleter
{
void operator()(void* dlh)
{
dlclose(dlh);
}
};
std::unique_ptr<void, dlh_deleter>
load_library(const std::string& provider_spec)
{
void* ret(dlopen(provider_spec.c_str(), RTLD_NOW | RTLD_LOCAL));
if (ret == 0)
{
wsrep::log_error() << "Failed to load library " << provider_spec;
}
return std::unique_ptr<void, dlh_deleter>(ret);
}
static int get_api_version(void* dlh)
{
const char** version(reinterpret_cast<const char**>(
dlsym(dlh, "wsrep_interface_version")));
if (version == 0)
{
wsrep::log_error() << "Failed to read interface version";
return 0;
}
std::istringstream is(*version);
int ret;
is >> ret;
return ret;
}
wsrep::provider* wsrep::provider::make_provider(
wsrep::server_state& server_state,
const std::string& provider_spec,
const std::string& provider_options)
const std::string& provider_options,
const wsrep::provider::services& services)
{
auto dlh(load_library(provider_spec));
if (dlh == 0) return 0;
int api_ver(get_api_version(dlh.get()));
if (api_ver == 0) return 0;
wsrep::log_info() << "Found provider with API version " << api_ver;
try
{
return new wsrep::wsrep_provider_v26(
server_state, provider_options, provider_spec);
switch (api_ver)
{
case 26:
return new wsrep::wsrep_provider_v26(
server_state, provider_options, provider_spec, services);
default:
wsrep::log_error() << "Unimplemented wsrep-API version "
<< api_ver;
}
}
catch (const wsrep::runtime_error& e)
{

View File

@ -471,15 +471,17 @@ static int apply_toi(wsrep::provider& provider,
// Server State //
//////////////////////////////////////////////////////////////////////////////
int wsrep::server_state::load_provider(const std::string& provider_spec,
const std::string& provider_options)
int wsrep::server_state::load_provider(
const std::string& provider_spec, const std::string& provider_options,
const wsrep::provider::services& services)
{
wsrep::log_info() << "Loading provider " << provider_spec
<< " initial position: " << initial_position_;
provider_ = wsrep::provider::make_provider(*this,
provider_spec,
provider_options);
provider_options,
services);
return (provider_ ? 0 : 1);
}

284
src/thread_service_v1.cpp Normal file
View File

@ -0,0 +1,284 @@
/*
* Copyright (C) 2019 Codership Oy <info@codership.com>
*
* This file is part of wsrep-lib.
*
* Wsrep-lib is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* Wsrep-lib is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
*/
#include "thread_service_v1.hpp"
#include "wsrep/thread_service.hpp"
#include "wsrep/logger.hpp"
#include "v26/wsrep_thread_service.h"
#include <dlfcn.h>
#include <cerrno>
namespace wsrep_thread_service_v1
{
//
// Thread service callbacks
//
// Pointer to thread service implementation provided by
// the application.
static wsrep::thread_service* thread_service_impl{ 0 };
static const wsrep_thread_key_t* thread_key_create_cb(const char* name)
{
assert(thread_service_impl);
return reinterpret_cast<const wsrep_thread_key_t*>(
thread_service_impl->create_thread_key(name));
;
}
static int thread_create_cb(const wsrep_thread_key_t* key,
wsrep_thread_t** thread,
void* (*fn)(void*), void* args)
{
assert(thread_service_impl);
return thread_service_impl->create_thread(
reinterpret_cast<const wsrep::thread_service::thread_key*>(key),
reinterpret_cast<wsrep::thread_service::thread**>(thread), fn,
args);
}
int thread_detach_cb(wsrep_thread_t* thread)
{
assert(thread_service_impl);
return thread_service_impl->detach(
reinterpret_cast<wsrep::thread_service::thread*>(thread));
}
int thread_equal_cb(wsrep_thread_t* thread_1, wsrep_thread_t* thread_2)
{
assert(thread_service_impl);
return thread_service_impl->equal(
reinterpret_cast<wsrep::thread_service::thread*>(thread_1),
reinterpret_cast<wsrep::thread_service::thread*>(thread_2));
}
__attribute__((noreturn))
void thread_exit_cb(wsrep_thread_t* thread, void* retval)
{
assert(thread_service_impl);
thread_service_impl->exit(
reinterpret_cast<wsrep::thread_service::thread*>(thread), retval);
throw; // Implementation broke the contract and returned.
}
int thread_join_cb(wsrep_thread_t* thread, void** retval)
{
assert(thread_service_impl);
return thread_service_impl->join(
reinterpret_cast<wsrep::thread_service::thread*>(thread), retval);
}
wsrep_thread_t* thread_self_cb(void)
{
assert(thread_service_impl);
return reinterpret_cast<wsrep_thread_t*>(thread_service_impl->self());
}
int thread_setschedparam_cb(wsrep_thread_t* thread, int policy,
const struct sched_param* sp)
{
assert(thread_service_impl);
return thread_service_impl->setschedparam(
reinterpret_cast<wsrep::thread_service::thread*>(thread),
policy, sp);
}
int thread_getschedparam_cb(wsrep_thread_t* thread, int* policy,
struct sched_param* sp)
{
assert(thread_service_impl);
return thread_service_impl->getschedparam(
reinterpret_cast<wsrep::thread_service::thread*>(thread), policy,
sp);
}
const wsrep_mutex_key_t* mutex_key_create_cb(const char* name)
{
assert(thread_service_impl);
return reinterpret_cast<const wsrep_mutex_key_t*>(
thread_service_impl->create_mutex_key(name));
}
wsrep_mutex_t* mutex_init_cb(const wsrep_mutex_key_t* key, void* memblock,
size_t memblock_size)
{
assert(thread_service_impl);
return reinterpret_cast<wsrep_mutex_t*>(
thread_service_impl->init_mutex(
reinterpret_cast<const wsrep::thread_service::mutex_key*>(key),
memblock, memblock_size));
}
int mutex_destroy_cb(wsrep_mutex_t* mutex)
{
assert(thread_service_impl);
return thread_service_impl->destroy(
reinterpret_cast<wsrep::thread_service::mutex*>(mutex));
}
int mutex_lock_cb(wsrep_mutex_t* mutex)
{
assert(thread_service_impl);
return thread_service_impl->lock(
reinterpret_cast<wsrep::thread_service::mutex*>(mutex));
}
int mutex_trylock_cb(wsrep_mutex_t* mutex)
{
assert(thread_service_impl);
return thread_service_impl->trylock(
reinterpret_cast<wsrep::thread_service::mutex*>(mutex));
}
int mutex_unlock_cb(wsrep_mutex_t* mutex)
{
assert(thread_service_impl);
return thread_service_impl->unlock(
reinterpret_cast<wsrep::thread_service::mutex*>(mutex));
}
const wsrep_cond_key_t* cond_key_create_cb(const char* name)
{
assert(thread_service_impl);
return reinterpret_cast<const wsrep_cond_key_t*>(
thread_service_impl->create_cond_key(name));
}
wsrep_cond_t* cond_init_cb(const wsrep_cond_key_t* key, void* memblock,
size_t memblock_size)
{
assert(thread_service_impl);
return reinterpret_cast<wsrep_cond_t*>(thread_service_impl->init_cond(
reinterpret_cast<const wsrep::thread_service::cond_key*>(key),
memblock, memblock_size));
}
int cond_destroy_cb(wsrep_cond_t* cond)
{
assert(thread_service_impl);
return thread_service_impl->destroy(
reinterpret_cast<wsrep::thread_service::cond*>(cond));
}
int cond_wait_cb(wsrep_cond_t* cond, wsrep_mutex_t* mutex)
{
assert(thread_service_impl);
return thread_service_impl->wait(
reinterpret_cast<wsrep::thread_service::cond*>(cond),
reinterpret_cast<wsrep::thread_service::mutex*>(mutex));
}
int cond_timedwait_cb(wsrep_cond_t* cond, wsrep_mutex_t* mutex,
const struct timespec* ts)
{
assert(thread_service_impl);
return thread_service_impl->timedwait(
reinterpret_cast<wsrep::thread_service::cond*>(cond),
reinterpret_cast<wsrep::thread_service::mutex*>(mutex), ts);
}
int cond_signal_cb(wsrep_cond_t* cond)
{
assert(thread_service_impl);
return thread_service_impl->signal(
reinterpret_cast<wsrep::thread_service::cond*>(cond));
}
int cond_broadcast_cb(wsrep_cond_t* cond)
{
assert(thread_service_impl);
return thread_service_impl->broadcast(
reinterpret_cast<wsrep::thread_service::cond*>(cond));
}
static wsrep_thread_service_v1_t thread_service_callbacks
= { thread_key_create_cb,
thread_create_cb,
thread_detach_cb,
thread_equal_cb,
thread_exit_cb,
thread_join_cb,
thread_self_cb,
thread_setschedparam_cb,
thread_getschedparam_cb,
mutex_key_create_cb,
mutex_init_cb,
mutex_destroy_cb,
mutex_lock_cb,
mutex_trylock_cb,
mutex_unlock_cb,
cond_key_create_cb,
cond_init_cb,
cond_destroy_cb,
cond_wait_cb,
cond_timedwait_cb,
cond_signal_cb,
cond_broadcast_cb };
}
int wsrep::thread_service_v1_probe(void* dlh)
{
typedef int (*init_fn)(wsrep_thread_service_v1_t*);
union {
init_fn dlfun;
void* obj;
} alias;
// Clear previous errors
(void)dlerror();
alias.obj = dlsym(dlh, WSREP_THREAD_SERVICE_INIT_FUNC);
if (alias.obj)
{
wsrep::log_info()
<< "Found support for thread service v1 from provider";
return 0;
}
else
{
wsrep::log_info() << "Thread service v1 not found from provider: "
<< dlerror();
return ENOTSUP;
}
}
int wsrep::thread_service_v1_init(void* dlh,
wsrep::thread_service* thread_service)
{
if (not (dlh && thread_service)) return EINVAL;
typedef int (*init_fn)(wsrep_thread_service_v1_t*);
union {
init_fn dlfun;
void* obj;
} alias;
alias.obj = dlsym(dlh, WSREP_THREAD_SERVICE_INIT_FUNC);
if (alias.obj)
{
wsrep::log_info() << "Initializing process instrumentation";
wsrep_thread_service_v1::thread_service_impl = thread_service;
return (*alias.dlfun)(&wsrep_thread_service_v1::thread_service_callbacks);
}
else
{
wsrep::log_info()
<< "Provider does not support process instrumentation";
return ENOTSUP;
}
}

47
src/thread_service_v1.hpp Normal file
View File

@ -0,0 +1,47 @@
/*
* Copyright (C) 2019 Codership Oy <info@codership.com>
*
* This file is part of wsrep-lib.
*
* Wsrep-lib is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* Wsrep-lib is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
*/
#ifndef WSREP_THREAD_SERVICE_V1_HPP
#define WSREP_THREAD_SERVICE_V1_HPP
namespace wsrep
{
class thread_service;
/**
* Probe thread_service_v1 support in loaded library.
*
* @param dlh Handle returned by dlopen().
*
* @return Zero on success, non-zero system error code on failure.
*/
int thread_service_v1_probe(void *dlh);
/**
* Initialize the thread service.
*
* @param dlh Handle returned by dlopen().
* @params thread_service Pointer to wsrep::thread_service implementation.
*
* @return Zero on success, non-zero system error code on failure.
*/
int thread_service_v1_init(void* dlh,
wsrep::thread_service* thread_service);
}
#endif // WSREP_THREAD_SERVICE_V1_HPP

74
src/uuid.cpp Normal file
View File

@ -0,0 +1,74 @@
/*
* Copyright (C) 2019 Codership Oy <info@codership.com>
*
* This file is part of wsrep-lib.
*
* Wsrep-lib is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* Wsrep-lib is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
*/
#include "uuid.hpp"
#include <cstring>
#include <cerrno>
#include <cstdio>
#include <cctype>
int wsrep::uuid_scan (const char* str, size_t str_len, wsrep::uuid_t* uuid)
{
unsigned int uuid_len = 0;
unsigned int uuid_offt = 0;
while (uuid_len + 1 < str_len) {
/* We are skipping potential '-' after uuid_offt == 4, 6, 8, 10
* which means
* (uuid_offt >> 1) == 2, 3, 4, 5,
* which in turn means
* (uuid_offt >> 1) - 2 <= 3
* since it is always >= 0, because uuid_offt is unsigned */
if (((uuid_offt >> 1) - 2) <= 3 && str[uuid_len] == '-') {
// skip dashes after 4th, 6th, 8th and 10th positions
uuid_len += 1;
continue;
}
if (isxdigit(str[uuid_len]) && isxdigit(str[uuid_len + 1])) {
// got hex digit, scan another byte to uuid, increment uuid_offt
sscanf (str + uuid_len, "%2hhx", uuid->data + uuid_offt);
uuid_len += 2;
uuid_offt += 1;
if (sizeof (uuid->data) == uuid_offt)
return static_cast<int>(uuid_len);
}
else {
break;
}
}
*uuid = wsrep::uuid_initializer;
return -EINVAL;
}
int wsrep::uuid_print (const wsrep::uuid_t* uuid, char* str, size_t str_len)
{
if (str_len > WSREP_LIB_UUID_STR_LEN) {
const unsigned char* u = uuid->data;
return snprintf(str, str_len, "%02x%02x%02x%02x-%02x%02x-%02x%02x-"
"%02x%02x-%02x%02x%02x%02x%02x%02x",
u[ 0], u[ 1], u[ 2], u[ 3], u[ 4], u[ 5], u[ 6], u[ 7],
u[ 8], u[ 9], u[10], u[11], u[12], u[13], u[14], u[15]);
}
else {
return -EMSGSIZE;
}
}

79
src/uuid.hpp Normal file
View File

@ -0,0 +1,79 @@
/*
* Copyright (C) 2019 Codership Oy <info@codership.com>
*
* This file is part of wsrep-lib.
*
* Wsrep-lib is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* Wsrep-lib is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
*/
/** @file uuid.hpp
*
* Helper methods to parse and print UUIDs, intended to use
* internally in wsrep-lib.
*
* The implementation is copied from wsrep-API v26.
*/
#ifndef WSREP_UUID_HPP
#define WSREP_UUID_HPP
#include "wsrep/compiler.hpp"
#include <cstddef>
/**
* Length of UUID string representation, not including terminating '\0'.
*/
#define WSREP_LIB_UUID_STR_LEN 36
namespace wsrep
{
/**
* UUID type.
*/
typedef union uuid_
{
unsigned char data[16];
size_t alignment;
} uuid_t;
static const wsrep::uuid_t uuid_initializer = {{0, }};
/**
* Read UUID from string.
*
* @param str String to read from
* @param str_len Length of string
* @param[out] UUID to read to
*
* @return Number of bytes read or negative error code in case
* of error.
*/
int uuid_scan(const char* str, size_t str_len, wsrep::uuid_t* uuid);
/**
* Write UUID to string. The caller must allocate at least
* WSREP_LIB_UUID_STR_LEN + 1 space for the output str parameter.
*
* @param uuid UUID to print
* @param str[out] Output buffer
* @param str_len Size of output buffer
*
* @return Number of chars printerd, negative error code in case of
* error.
*/
int uuid_print(const wsrep::uuid_t* uuid, char* str, size_t str_len);
}
#endif // WSREP_UUID_HPP

View File

@ -25,9 +25,13 @@
#include "wsrep/view.hpp"
#include "wsrep/exception.hpp"
#include "wsrep/logger.hpp"
#include "wsrep/thread_service.hpp"
#include <wsrep_api.h>
#include "thread_service_v1.hpp"
#include "v26/wsrep_api.h"
#include <dlfcn.h>
#include <cassert>
#include <climits>
@ -562,12 +566,39 @@ namespace
break;
}
}
static int init_thread_service(void* dlh,
wsrep::thread_service* thread_service)
{
assert(thread_service);
if (wsrep::thread_service_v1_probe(dlh))
{
// No support in library.
return 0;
}
else
{
if (thread_service->before_init())
{
wsrep::log_error() << "Thread service before init failed";
return 1;
}
wsrep::thread_service_v1_init(dlh, thread_service);
if (thread_service->after_init())
{
wsrep::log_error() << "Thread service after init failed";
return 1;
}
}
return 0;
}
}
wsrep::wsrep_provider_v26::wsrep_provider_v26(
wsrep::server_state& server_state,
const std::string& provider_options,
const std::string& provider_spec)
const std::string& provider_spec,
const wsrep::provider::services& services)
: provider(server_state)
, wsrep_()
{
@ -603,6 +634,13 @@ wsrep::wsrep_provider_v26::wsrep_provider_v26(
{
throw wsrep::runtime_error("Failed to load wsrep library");
}
if (services.thread_service &&
init_thread_service(wsrep_->dlh, services.thread_service))
{
throw wsrep::runtime_error("Failed to initialize thread service");
}
if (wsrep_->init(wsrep_, &init_args) != WSREP_OK)
{
throw wsrep::runtime_error("Failed to initialize wsrep provider");

View File

@ -26,12 +26,14 @@ struct wsrep_st;
namespace wsrep
{
class thread_service;
class wsrep_provider_v26 : public wsrep::provider
{
public:
wsrep_provider_v26(wsrep::server_state&, const std::string&,
const std::string&);
const std::string&,
const wsrep::provider::services& services);
~wsrep_provider_v26();
enum wsrep::provider::status
connect(const std::string&, const std::string&, const std::string&,

View File

@ -7,6 +7,7 @@ add_executable(wsrep-lib_test
mock_high_priority_service.cpp
mock_storage_service.cpp
test_utils.cpp
gtid_test.cpp
id_test.cpp
server_context_test.cpp
transaction_test.cpp

62
test/gtid_test.cpp Normal file
View File

@ -0,0 +1,62 @@
/*
* Copyright (C) 2018 Codership Oy <info@codership.com>
*
* This file is part of wsrep-lib.
*
* Wsrep-lib is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* Wsrep-lib is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
*/
#include "wsrep/gtid.hpp"
#include <boost/test/unit_test.hpp>
BOOST_AUTO_TEST_CASE(gtid_test_scan_from_string_uuid)
{
std::string gtid_str("6a20d44a-6e17-11e8-b1e2-9061aec0cdad:123456");
wsrep::gtid gtid;
ssize_t ret(wsrep::scan_from_c_str(
gtid_str.c_str(),
gtid_str.size(), gtid));
BOOST_REQUIRE_MESSAGE(ret == ssize_t(gtid_str.size()),
"Expected " << gtid_str.size() << " got " << ret);
BOOST_REQUIRE(gtid.seqno().get() == 123456);
}
BOOST_AUTO_TEST_CASE(gtid_test_scan_from_string_uuid_too_long)
{
std::string gtid_str("6a20d44a-6e17-11e8-b1e2-9061aec0cdadx:123456");
wsrep::gtid gtid;
ssize_t ret(wsrep::scan_from_c_str(
gtid_str.c_str(),
gtid_str.size(), gtid));
BOOST_REQUIRE_MESSAGE(ret == -EINVAL,
"Expected " << -EINVAL << " got " << ret);
}
BOOST_AUTO_TEST_CASE(gtid_test_scan_from_string_seqno_out_of_range)
{
std::string gtid_str("6a20d44a-6e17-11e8-b1e2-9061aec0cdad:9223372036854775808");
wsrep::gtid gtid;
ssize_t ret(wsrep::scan_from_c_str(
gtid_str.c_str(),
gtid_str.size(), gtid));
BOOST_REQUIRE_MESSAGE(ret == -EINVAL,
"Expected " << -EINVAL << " got " << ret);
gtid_str = "6a20d44a-6e17-11e8-b1e2-9061aec0cdad:-9223372036854775809";
ret = wsrep::scan_from_c_str(
gtid_str.c_str(),
gtid_str.size(), gtid);
BOOST_REQUIRE_MESSAGE(ret == -EINVAL,
"Expected " << -EINVAL << " got " << ret);
}

View File

@ -1,7 +1,4 @@
add_library(wsrep_api_v26
v26/wsrep_dummy.c
v26/wsrep_gtid.c
v26/wsrep_loader.c
v26/wsrep_uuid.c
)

@ -1 +1 @@
Subproject commit c7cec7d719f8bd6f44a138b1c4e29a959d910fc9
Subproject commit 12a50c43b112648fec3b1213a1470a85aca55f2c