mirror of
https://github.com/MariaDB/server.git
synced 2025-08-08 11:22:35 +03:00
Copy of
commit f8f364b47f2784f16b401f27658f1c16eaf348ec Author: Jay Edgar <jkedgar@fb.com> Date: Tue Oct 17 15:19:31 2017 -0700 Add a hashed, hierarchical, wheel timer implementation Summary: In order to implement idle timeouts on detached sessions we need something inside MySQL that is lightweight and can handle calling events in the future with very little cost for cancelling or resetting the event. A hashed, hi By default the timers are grouped into 10ms buckets (the 'hashed' part), though the size of the buckets is configurable at the creation of the timer. Each wheel (the 'wheel' part) maintains 256 buckets and cascades to the whe Reviewed By: djwatson Differential Revision: D6199806 fbshipit-source-id: 5e1590f
This commit is contained in:
@@ -27,6 +27,7 @@
|
||||
#include <limits>
|
||||
#include <map>
|
||||
#include <set>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
@@ -805,6 +806,25 @@ int Rdb_key_def::successor(uchar *const packed_tuple, const uint &len) {
|
||||
return changed;
|
||||
}
|
||||
|
||||
/*
|
||||
@return Number of bytes that were changed
|
||||
*/
|
||||
int Rdb_key_def::predecessor(uchar *const packed_tuple, const uint &len) {
|
||||
DBUG_ASSERT(packed_tuple != nullptr);
|
||||
|
||||
int changed = 0;
|
||||
uchar *p = packed_tuple + len - 1;
|
||||
for (; p > packed_tuple; p--) {
|
||||
changed++;
|
||||
if (*p != uchar(0x00)) {
|
||||
*p = *p - 1;
|
||||
break;
|
||||
}
|
||||
*p = 0xFF;
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
|
||||
static const std::map<char, size_t> UNPACK_HEADER_SIZES = {
|
||||
{RDB_UNPACK_DATA_TAG, RDB_UNPACK_HEADER_SIZE},
|
||||
{RDB_UNPACK_COVERED_DATA_TAG, RDB_UNPACK_COVERED_HEADER_SIZE}};
|
||||
@@ -1403,11 +1423,11 @@ int Rdb_key_def::unpack_record(TABLE *const table, uchar *const buf,
|
||||
MY_BITMAP covered_bitmap;
|
||||
my_bitmap_map covered_bits;
|
||||
uint curr_bitmap_pos = 0;
|
||||
bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
|
||||
|
||||
const bool has_covered_bitmap =
|
||||
has_unpack_info && (unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG);
|
||||
if (has_covered_bitmap) {
|
||||
bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
|
||||
covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header +
|
||||
sizeof(RDB_UNPACK_COVERED_DATA_TAG) +
|
||||
RDB_UNPACK_COVERED_DATA_LEN_SIZE);
|
||||
@@ -1481,6 +1501,18 @@ int Rdb_key_def::unpack_record(TABLE *const table, uchar *const buf,
|
||||
}
|
||||
if ((this->*fpi->m_skip_func)(fpi, field, &reader))
|
||||
return HA_ERR_ROCKSDB_CORRUPT_DATA;
|
||||
|
||||
// If this is a space padded varchar, we need to skip the indicator
|
||||
// bytes for trailing bytes. They're useless since we can't restore the
|
||||
// field anyway.
|
||||
//
|
||||
// There is a special case for prefixed varchars where we do not
|
||||
// generate unpack info, because we know prefixed varchars cannot be
|
||||
// unpacked. In this case, it is not necessary to skip.
|
||||
if (fpi->m_skip_func == &Rdb_key_def::skip_variable_space_pad &&
|
||||
!fpi->m_unpack_info_stores_value) {
|
||||
unp_reader.read(fpi->m_unpack_info_uses_two_bytes ? 2 : 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3459,6 +3491,20 @@ void Rdb_tbl_def::set_name(const std::string &name) {
|
||||
check_if_is_mysql_system_table();
|
||||
}
|
||||
|
||||
GL_INDEX_ID Rdb_tbl_def::get_autoincr_gl_index_id() {
|
||||
for (uint i = 0; i < m_key_count; i++) {
|
||||
auto &k = m_key_descr_arr[i];
|
||||
if (k->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
|
||||
k->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY) {
|
||||
return k->get_gl_index_id();
|
||||
}
|
||||
}
|
||||
|
||||
// Every table must have a primary key, even if it's hidden.
|
||||
abort();
|
||||
return GL_INDEX_ID();
|
||||
}
|
||||
|
||||
/*
|
||||
Static function of type my_hash_get_key that gets invoked by
|
||||
the m_ddl_hash object of type my_core::HASH.
|
||||
@@ -3683,6 +3729,68 @@ bool Rdb_validate_tbls::compare_to_actual_tables(const std::string &datadir,
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
Validate that all auto increment values in the data dictionary are on a
|
||||
supported version.
|
||||
*/
|
||||
bool Rdb_ddl_manager::validate_auto_incr() {
|
||||
std::unique_ptr<rocksdb::Iterator> it(m_dict->new_iterator());
|
||||
|
||||
uchar auto_incr_entry[Rdb_key_def::INDEX_NUMBER_SIZE];
|
||||
rdb_netbuf_store_index(auto_incr_entry, Rdb_key_def::AUTO_INC);
|
||||
const rocksdb::Slice auto_incr_entry_slice(
|
||||
reinterpret_cast<char *>(auto_incr_entry),
|
||||
Rdb_key_def::INDEX_NUMBER_SIZE);
|
||||
for (it->Seek(auto_incr_entry_slice); it->Valid(); it->Next()) {
|
||||
const rocksdb::Slice key = it->key();
|
||||
const rocksdb::Slice val = it->value();
|
||||
GL_INDEX_ID gl_index_id;
|
||||
|
||||
if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE &&
|
||||
memcmp(key.data(), auto_incr_entry, Rdb_key_def::INDEX_NUMBER_SIZE))
|
||||
break;
|
||||
|
||||
if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (val.size() <= Rdb_key_def::VERSION_SIZE) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check if we have orphaned entries for whatever reason by cross
|
||||
// referencing ddl entries.
|
||||
auto ptr = reinterpret_cast<const uchar *>(key.data());
|
||||
ptr += Rdb_key_def::INDEX_NUMBER_SIZE;
|
||||
rdb_netbuf_read_gl_index(&ptr, &gl_index_id);
|
||||
if (!m_dict->get_index_info(gl_index_id, nullptr)) {
|
||||
// NO_LINT_DEBUG
|
||||
sql_print_warning("RocksDB: AUTOINC mismatch - "
|
||||
"Index number (%u, %u) found in AUTOINC "
|
||||
"but does not exist as a DDL entry",
|
||||
gl_index_id.cf_id, gl_index_id.index_id);
|
||||
return false;
|
||||
}
|
||||
|
||||
ptr = reinterpret_cast<const uchar *>(val.data());
|
||||
const int version = rdb_netbuf_read_uint16(&ptr);
|
||||
if (version > Rdb_key_def::AUTO_INCREMENT_VERSION) {
|
||||
// NO_LINT_DEBUG
|
||||
sql_print_warning("RocksDB: AUTOINC mismatch - "
|
||||
"Index number (%u, %u) found in AUTOINC "
|
||||
"is on unsupported version %d",
|
||||
gl_index_id.cf_id, gl_index_id.index_id, version);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (!it->status().ok()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
Validate that all the tables in the RocksDB database dictionary match the .frm
|
||||
files in the datadir
|
||||
@@ -3847,10 +3955,18 @@ bool Rdb_ddl_manager::init(Rdb_dict_manager *const dict_arg,
|
||||
If validate_tables is greater than 0 run the validation. Only fail the
|
||||
initialzation if the setting is 1. If the setting is 2 we continue.
|
||||
*/
|
||||
if (validate_tables > 0 && !validate_schemas()) {
|
||||
if (validate_tables == 1) {
|
||||
sql_print_error("RocksDB: Problems validating data dictionary "
|
||||
"against .frm files, exiting");
|
||||
if (validate_tables > 0) {
|
||||
std::string msg;
|
||||
if (!validate_schemas()) {
|
||||
msg = "RocksDB: Problems validating data dictionary "
|
||||
"against .frm files, exiting";
|
||||
} else if (!validate_auto_incr()) {
|
||||
msg = "RocksDB: Problems validating auto increment values in "
|
||||
"data dictionary, exiting";
|
||||
}
|
||||
if (validate_tables == 1 && !msg.empty()) {
|
||||
// NO_LINT_DEBUG
|
||||
sql_print_error("%s", msg.c_str());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -4124,6 +4240,7 @@ bool Rdb_ddl_manager::rename(const std::string &from, const std::string &to,
|
||||
new_rec->m_auto_incr_val =
|
||||
rec->m_auto_incr_val.load(std::memory_order_relaxed);
|
||||
new_rec->m_key_descr_arr = rec->m_key_descr_arr;
|
||||
|
||||
// so that it's not free'd when deleting the old rec
|
||||
rec->m_key_descr_arr = nullptr;
|
||||
|
||||
@@ -4555,13 +4672,16 @@ void Rdb_dict_manager::delete_index_info(rocksdb::WriteBatch *batch,
|
||||
const GL_INDEX_ID &gl_index_id) const {
|
||||
delete_with_prefix(batch, Rdb_key_def::INDEX_INFO, gl_index_id);
|
||||
delete_with_prefix(batch, Rdb_key_def::INDEX_STATISTICS, gl_index_id);
|
||||
delete_with_prefix(batch, Rdb_key_def::AUTO_INC, gl_index_id);
|
||||
}
|
||||
|
||||
bool Rdb_dict_manager::get_index_info(
|
||||
const GL_INDEX_ID &gl_index_id,
|
||||
struct Rdb_index_info *const index_info) const {
|
||||
|
||||
index_info->m_gl_index_id = gl_index_id;
|
||||
if (index_info) {
|
||||
index_info->m_gl_index_id = gl_index_id;
|
||||
}
|
||||
|
||||
bool found = false;
|
||||
bool error = false;
|
||||
@@ -4572,6 +4692,10 @@ bool Rdb_dict_manager::get_index_info(
|
||||
|
||||
const rocksdb::Status &status = get_value(key, &value);
|
||||
if (status.ok()) {
|
||||
if (!index_info) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const uchar *const val = (const uchar *)value.c_str();
|
||||
const uchar *ptr = val;
|
||||
index_info->m_index_dict_version = rdb_netbuf_to_uint16(val);
|
||||
@@ -4610,6 +4734,11 @@ bool Rdb_dict_manager::get_index_info(
|
||||
index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
|
||||
ptr += RDB_SIZEOF_KV_VERSION;
|
||||
index_info->m_ttl_duration = rdb_netbuf_to_uint64(ptr);
|
||||
if ((index_info->m_kv_version ==
|
||||
Rdb_key_def::PRIMARY_FORMAT_VERSION_TTL) &&
|
||||
index_info->m_ttl_duration > 0) {
|
||||
index_info->m_index_flags = Rdb_key_def::TTL_FLAG;
|
||||
}
|
||||
found = true;
|
||||
break;
|
||||
|
||||
@@ -4651,7 +4780,7 @@ bool Rdb_dict_manager::get_index_info(
|
||||
"and it may be a bug.",
|
||||
index_info->m_index_dict_version, index_info->m_index_type,
|
||||
index_info->m_kv_version, index_info->m_ttl_duration);
|
||||
abort_with_stack_traces();
|
||||
abort();
|
||||
}
|
||||
|
||||
return found;
|
||||
@@ -4914,7 +5043,7 @@ void Rdb_dict_manager::resume_drop_indexes() const {
|
||||
"bug.",
|
||||
max_index_id_in_dict, gl_index_id.cf_id,
|
||||
gl_index_id.index_id);
|
||||
abort_with_stack_traces();
|
||||
abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4963,7 +5092,7 @@ void Rdb_dict_manager::log_start_drop_index(GL_INDEX_ID gl_index_id,
|
||||
"from index id (%u,%u). MyRocks data dictionary may "
|
||||
"get corrupted.",
|
||||
gl_index_id.cf_id, gl_index_id.index_id);
|
||||
abort_with_stack_traces();
|
||||
abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5021,7 +5150,7 @@ void Rdb_dict_manager::add_stats(
|
||||
// IndexStats::materialize takes complete care of serialization including
|
||||
// storing the version
|
||||
const auto value =
|
||||
Rdb_index_stats::materialize(std::vector<Rdb_index_stats>{it}, 1.);
|
||||
Rdb_index_stats::materialize(std::vector<Rdb_index_stats>{it});
|
||||
|
||||
batch->Put(m_system_cfh, rocksdb::Slice((char *)key_buf, sizeof(key_buf)),
|
||||
value);
|
||||
@@ -5047,6 +5176,53 @@ Rdb_index_stats Rdb_dict_manager::get_stats(GL_INDEX_ID gl_index_id) const {
|
||||
return Rdb_index_stats();
|
||||
}
|
||||
|
||||
rocksdb::Status
|
||||
Rdb_dict_manager::put_auto_incr_val(rocksdb::WriteBatchBase *batch,
|
||||
const GL_INDEX_ID &gl_index_id,
|
||||
ulonglong val, bool overwrite) const {
|
||||
uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
|
||||
dump_index_id(key_buf, Rdb_key_def::AUTO_INC, gl_index_id);
|
||||
const rocksdb::Slice key =
|
||||
rocksdb::Slice(reinterpret_cast<char *>(key_buf), sizeof(key_buf));
|
||||
|
||||
// Value is constructed by storing the version and the value.
|
||||
uchar value_buf[RDB_SIZEOF_AUTO_INCREMENT_VERSION +
|
||||
ROCKSDB_SIZEOF_AUTOINC_VALUE] = {0};
|
||||
uchar *ptr = value_buf;
|
||||
rdb_netbuf_store_uint16(ptr, Rdb_key_def::AUTO_INCREMENT_VERSION);
|
||||
ptr += RDB_SIZEOF_AUTO_INCREMENT_VERSION;
|
||||
rdb_netbuf_store_uint64(ptr, val);
|
||||
ptr += ROCKSDB_SIZEOF_AUTOINC_VALUE;
|
||||
const rocksdb::Slice value =
|
||||
rocksdb::Slice(reinterpret_cast<char *>(value_buf), ptr - value_buf);
|
||||
|
||||
if (overwrite) {
|
||||
return batch->Put(m_system_cfh, key, value);
|
||||
}
|
||||
return batch->Merge(m_system_cfh, key, value);
|
||||
}
|
||||
|
||||
bool Rdb_dict_manager::get_auto_incr_val(const GL_INDEX_ID &gl_index_id,
|
||||
ulonglong *new_val) const {
|
||||
uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
|
||||
dump_index_id(key_buf, Rdb_key_def::AUTO_INC, gl_index_id);
|
||||
|
||||
std::string value;
|
||||
const rocksdb::Status status = get_value(
|
||||
rocksdb::Slice(reinterpret_cast<char *>(key_buf), sizeof(key_buf)),
|
||||
&value);
|
||||
|
||||
if (status.ok()) {
|
||||
const uchar *const val = reinterpret_cast<const uchar *>(value.data());
|
||||
|
||||
if (rdb_netbuf_to_uint16(val) <= Rdb_key_def::AUTO_INCREMENT_VERSION) {
|
||||
*new_val = rdb_netbuf_to_uint64(val + RDB_SIZEOF_AUTO_INCREMENT_VERSION);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
uint Rdb_seq_generator::get_and_update_next_number(
|
||||
Rdb_dict_manager *const dict) {
|
||||
DBUG_ASSERT(dict != nullptr);
|
||||
|
Reference in New Issue
Block a user