diff --git a/storage/rocksdb/CMakeLists.txt b/storage/rocksdb/CMakeLists.txt
index 2a4ea4dfc0f..2689dc7151c 100644
--- a/storage/rocksdb/CMakeLists.txt
+++ b/storage/rocksdb/CMakeLists.txt
@@ -155,6 +155,9 @@ if (UNIX AND NOT APPLE)
 endif()
 TARGET_LINK_LIBRARIES(rocksdb rocksdb_aux_lib)
+FIND_LIBRARY(LZ4_LIBRARY
+  NAMES liblz4${PIC_EXT}.a lz4
+  HINTS ${WITH_LZ4}/lib)
 
 IF(CMAKE_CXX_COMPILER_ID MATCHES "GNU" OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
@@ -182,8 +185,11 @@ IF(HAVE_SCHED_GETCPU)
 ENDIF()
 
 IF (WITH_TBB)
+  FIND_LIBRARY(TBB_LIBRARY
+    NAMES libtbb${PIC_EXT}.a tbb
+    HINTS ${WITH_TBB}/lib)
   SET(rocksdb_static_libs ${rocksdb_static_libs}
-      ${WITH_TBB}/lib/libtbb${PIC_EXT}.a)
+      ${TBB_LIBRARY})
   ADD_DEFINITIONS(-DTBB)
 ENDIF()
diff --git a/storage/rocksdb/build_rocksdb.cmake b/storage/rocksdb/build_rocksdb.cmake
index 895d473c2d4..f09024ef800 100644
--- a/storage/rocksdb/build_rocksdb.cmake
+++ b/storage/rocksdb/build_rocksdb.cmake
@@ -178,6 +178,7 @@ set(ROCKSDB_SOURCES
   db/db_info_dumper.cc
   db/db_iter.cc
   db/dbformat.cc
+  db/error_handler.cc
   db/event_helpers.cc
   db/experimental.cc
   db/external_sst_file_ingestion_job.cc
@@ -188,6 +189,7 @@ set(ROCKSDB_SOURCES
   db/internal_stats.cc
   db/log_reader.cc
   db/log_writer.cc
+  db/logs_with_prep_tracker.cc
   db/malloc_stats.cc
   db/managed_iterator.cc
   db/memtable.cc
@@ -284,6 +286,7 @@ set(ROCKSDB_SOURCES
   util/coding.cc
   util/compaction_job_stats_impl.cc
   util/comparator.cc
+  util/compression_context_cache.cc
   util/concurrent_arena.cc
   util/crc32c.cc
   util/delete_scheduler.cc
@@ -304,6 +307,7 @@ set(ROCKSDB_SOURCES
   util/status_message.cc
   util/string_util.cc
   util/sync_point.cc
+  util/sync_point_impl.cc
   util/testutil.cc
   util/thread_local.cc
   util/threadpool_imp.cc
@@ -352,6 +356,8 @@ set(ROCKSDB_SOURCES
   utilities/transactions/transaction_util.cc
   utilities/transactions/write_prepared_txn.cc
   utilities/transactions/write_prepared_txn_db.cc
+  utilities/transactions/write_unprepared_txn.cc
+  utilities/transactions/write_unprepared_txn_db.cc
   utilities/ttl/db_ttl_impl.cc
   utilities/write_batch_with_index/write_batch_with_index.cc
   utilities/write_batch_with_index/write_batch_with_index_internal.cc
diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc
index adef71e4e4a..61561a67fcf 100644
--- a/storage/rocksdb/ha_rocksdb.cc
+++ b/storage/rocksdb/ha_rocksdb.cc
@@ -32,6 +32,7 @@
 /* C++ standard header files */
 #include <algorithm>
+#include <inttypes.h>
 #include <limits>
 #include <map>
 #include <queue>
@@ -127,6 +128,60 @@ const std::string DEFAULT_CF_NAME("default");
 const std::string DEFAULT_SYSTEM_CF_NAME("__system__");
 const std::string PER_INDEX_CF_NAME("$per_index_cf");
 
+class Rdb_explicit_snapshot;
+
+std::mutex explicit_snapshot_mutex;
+ulonglong explicit_snapshot_counter = 0;
+std::unordered_map<ulonglong, std::weak_ptr<Rdb_explicit_snapshot>>
+    explicit_snapshots;
+static std::vector<GL_INDEX_ID> rdb_indexes_to_recalc;
+
+#ifdef MARIADB_NOT_YET
+class Rdb_explicit_snapshot : public explicit_snapshot {
+  std::unique_ptr<rocksdb::ManagedSnapshot> snapshot;
+
+ public:
+  static std::shared_ptr<Rdb_explicit_snapshot>
+  create(snapshot_info_st *ss_info, rocksdb::DB *db,
+         const rocksdb::Snapshot *snapshot) {
+    std::lock_guard<std::mutex> lock(explicit_snapshot_mutex);
+    auto s = std::unique_ptr<rocksdb::ManagedSnapshot>(
+        new rocksdb::ManagedSnapshot(db, snapshot));
+    if (!s) {
+      return nullptr;
+    }
+    ss_info->snapshot_id = ++explicit_snapshot_counter;
+    auto ret = std::make_shared<Rdb_explicit_snapshot>(*ss_info, std::move(s));
+    if (!ret) {
+      return nullptr;
+    }
+    explicit_snapshots[ss_info->snapshot_id] = ret;
+    return ret;
+  }
+
+  static std::shared_ptr<Rdb_explicit_snapshot>
+  get(const ulonglong snapshot_id) {
+    std::lock_guard<std::mutex> lock(explicit_snapshot_mutex);
+    auto elem = explicit_snapshots.find(snapshot_id);
+    if (elem == explicit_snapshots.end()) {
+      return nullptr;
+    }
+    return elem->second.lock();
+  }
+
+  rocksdb::ManagedSnapshot *get_snapshot() { return snapshot.get(); }
+
+  Rdb_explicit_snapshot(snapshot_info_st ss_info,
+                        std::unique_ptr<rocksdb::ManagedSnapshot> snapshot)
+      : explicit_snapshot(ss_info), snapshot(std::move(snapshot)) {}
+
+  virtual ~Rdb_explicit_snapshot() {
+    std::lock_guard<std::mutex> lock(explicit_snapshot_mutex);
+    explicit_snapshots.erase(ss_info.snapshot_id);
+  }
+};
+#endif
+
 /**
   Updates row counters based on the table type and operation type.
 */
@@ -144,11 +199,15 @@ static handler *rocksdb_create_handler(my_core::handlerton *hton,
                                        my_core::TABLE_SHARE *table_arg,
                                        my_core::MEM_ROOT *mem_root);
 
-static rocksdb::CompactRangeOptions getCompactRangeOptions() {
+static rocksdb::CompactRangeOptions
+getCompactRangeOptions(int concurrency = 0) {
   rocksdb::CompactRangeOptions compact_range_options;
   compact_range_options.bottommost_level_compaction =
       rocksdb::BottommostLevelCompaction::kForce;
   compact_range_options.exclusive_manual_compaction = false;
+  if (concurrency > 0) {
+    compact_range_options.max_subcompactions = concurrency;
+  }
   return compact_range_options;
 }
 
@@ -187,6 +246,8 @@ Rdb_io_watchdog *io_watchdog = nullptr;
 
 static Rdb_background_thread rdb_bg_thread;
 
+static Rdb_manual_compaction_thread rdb_mc_thread;
+
 // List of table names (using regex) that are exceptions to the strict
 // collation check requirement.
 Regex_list_handler *rdb_collation_exceptions;
@@ -200,30 +261,6 @@ static void rocksdb_flush_all_memtables() {
   }
 }
 
-static void rocksdb_compact_column_family_stub(
-    THD *const thd, struct st_mysql_sys_var *const var, void *const var_ptr,
-    const void *const save) {}
-
-static int rocksdb_compact_column_family(THD *const thd,
-                                         struct st_mysql_sys_var *const var,
-                                         void *const var_ptr,
-                                         struct st_mysql_value *const value) {
-  char buff[STRING_BUFFER_USUAL_SIZE];
-  int len = sizeof(buff);
-
-  DBUG_ASSERT(value != nullptr);
-
-  if (const char *const cf = value->val_str(value, buff, &len)) {
-    auto cfh = cf_manager.get_cf(cf);
-    if (cfh != nullptr && rdb != nullptr) {
-      sql_print_verbose_info("RocksDB: Manual compaction of column family: %s\n",
-                             cf);
-      rdb->CompactRange(getCompactRangeOptions(), cfh, nullptr, nullptr);
-    }
-  }
-  return HA_EXIT_SUCCESS;
-}
-
 ///////////////////////////////////////////////////////////
 // Hash map: table name => open table handler
 ///////////////////////////////////////////////////////////
@@ -249,6 +286,8 @@ struct Rdb_open_tables_map {
   Rdb_open_tables_map() : m_hash(get_hash_key, system_charset_info) {
   }
 
+  void free_hash(void) { m_hash.~Rdb_table_set(); }
+
   std::vector<std::string> get_table_names(void) const;
 };
 
@@ -368,6 +407,7 @@ static void rocksdb_drop_index_wakeup_thread(
 
 static my_bool rocksdb_pause_background_work = 0;
 static mysql_mutex_t rdb_sysvars_mutex;
+static mysql_mutex_t rdb_block_cache_resize_mutex;
 
 static void rocksdb_set_pause_background_work(
     my_core::THD *const thd MY_ATTRIBUTE((__unused__)),
@@ -450,6 +490,9 @@ static void rocksdb_set_wal_bytes_per_sync(THD *thd,
                                            struct st_mysql_sys_var *const var,
                                            void *const var_ptr,
                                            const void *const save);
+static int rocksdb_validate_set_block_cache_size(
+    THD *thd, struct st_mysql_sys_var *const var, void *var_ptr,
+    struct st_mysql_value *value);
 
 //////////////////////////////////////////////////////////////////////////////
 // Options definitions
 //////////////////////////////////////////////////////////////////////////////
@@ -510,11 +553,19 @@ static char*
rocksdb_git_hash; char *compression_types_val= const_cast(get_rocksdb_supported_compression_types()); +static uint64_t rocksdb_write_policy = + rocksdb::TxnDBWritePolicy::WRITE_COMMITTED; +static my_bool rocksdb_error_on_suboptimal_collation = 1; +static uint32_t rocksdb_stats_recalc_rate = 0; +static uint32_t rocksdb_debug_manual_compaction_delay = 0; +static uint32_t rocksdb_max_manual_compactions = 0; std::atomic rocksdb_row_lock_deadlocks(0); std::atomic rocksdb_row_lock_wait_timeouts(0); std::atomic rocksdb_snapshot_conflict_errors(0); std::atomic rocksdb_wal_group_syncs(0); +std::atomic rocksdb_manual_compactions_processed(0); +std::atomic rocksdb_manual_compactions_running(0); @@ -600,6 +651,14 @@ static std::unique_ptr rocksdb_db_options = static std::shared_ptr rocksdb_rate_limiter; +/* This enum needs to be kept up to date with rocksdb::TxnDBWritePolicy */ +static const char *write_policy_names[] = {"write_committed", "write_prepared", + "write_unprepared", NullS}; + +static TYPELIB write_policy_typelib = {array_elements(write_policy_names) - 1, + "write_policy_typelib", + write_policy_names, nullptr}; + /* This enum needs to be kept up to date with rocksdb::InfoLogLevel */ static const char *info_log_level_names[] = {"debug_level", "info_level", "warn_level", "error_level", @@ -694,6 +753,14 @@ static int rocksdb_validate_flush_log_at_trx_commit( *static_cast(var_ptr) = static_cast(new_value); return HA_EXIT_SUCCESS; } +static void rocksdb_compact_column_family_stub( + THD *const thd, struct st_mysql_sys_var *const var, void *const var_ptr, + const void *const save) {} + +static int rocksdb_compact_column_family(THD *const thd, + struct st_mysql_sys_var *const var, + void *const var_ptr, + struct st_mysql_value *const value); static const char *index_type_names[] = {"kBinarySearch", "kHashSearch", NullS}; @@ -702,7 +769,8 @@ static TYPELIB index_type_typelib = {array_elements(index_type_names) - 1, nullptr}; const ulong RDB_MAX_LOCK_WAIT_SECONDS = 1024 * 1024 * 1024; -const ulong RDB_MAX_ROW_LOCKS = 1024 * 1024; +const ulong RDB_DEFAULT_MAX_ROW_LOCKS = 1024 * 1024; +const ulong RDB_MAX_ROW_LOCKS = 1024 * 1024 * 1024; const ulong RDB_DEFAULT_BULK_LOAD_SIZE = 1000; const ulong RDB_MAX_BULK_LOAD_SIZE = 1024 * 1024 * 1024; const size_t RDB_DEFAULT_MERGE_BUF_SIZE = 64 * 1024 * 1024; @@ -733,6 +801,11 @@ static MYSQL_THDVAR_ULONG(deadlock_detect_depth, PLUGIN_VAR_RQCMDARG, /*min*/ 2, /*max*/ ULONG_MAX, 0); +static MYSQL_THDVAR_BOOL( + commit_time_batch_for_recovery, PLUGIN_VAR_RQCMDARG, + "TransactionOptions::commit_time_batch_for_recovery for RocksDB", nullptr, + nullptr, FALSE); + static MYSQL_THDVAR_BOOL( trace_sst_api, PLUGIN_VAR_RQCMDARG, "Generate trace output in the log for each call to the SstFileWriter", @@ -744,6 +817,13 @@ static MYSQL_THDVAR_BOOL( "unique_checks and enables rocksdb_commit_in_the_middle.", rocksdb_check_bulk_load, nullptr, FALSE); +static MYSQL_THDVAR_BOOL(bulk_load_allow_sk, PLUGIN_VAR_RQCMDARG, + "Allow bulk loading of sk keys during bulk-load. " + "Can be changed only when bulk load is disabled.", + /* Intentionally reuse unsorted's check function */ + rocksdb_check_bulk_load_allow_unsorted, nullptr, + FALSE); + static MYSQL_THDVAR_BOOL(bulk_load_allow_unsorted, PLUGIN_VAR_RQCMDARG, "Allow unsorted input during bulk-load. 
" "Can be changed only when bulk load is disabled.", @@ -794,7 +874,7 @@ static MYSQL_THDVAR_BOOL(skip_bloom_filter_on_read, PLUGIN_VAR_RQCMDARG, static MYSQL_THDVAR_ULONG(max_row_locks, PLUGIN_VAR_RQCMDARG, "Maximum number of locks a transaction can have", nullptr, nullptr, - /*default*/ RDB_MAX_ROW_LOCKS, + /*default*/ RDB_DEFAULT_MAX_ROW_LOCKS, /*min*/ 1, /*max*/ RDB_MAX_ROW_LOCKS, 0); @@ -846,6 +926,12 @@ static MYSQL_THDVAR_ULONGLONG( /* min (0ms) */ RDB_MIN_MERGE_TMP_FILE_REMOVAL_DELAY, /* max */ SIZE_T_MAX, 1); +static MYSQL_THDVAR_INT( + manual_compaction_threads, PLUGIN_VAR_RQCMDARG, + "How many rocksdb threads to run for manual compactions", nullptr, nullptr, + /* default rocksdb.dboption max_subcompactions */ 0, + /* min */ 0, /* max */ 128, 0); + static MYSQL_SYSVAR_BOOL( create_if_missing, *reinterpret_cast(&rocksdb_db_options->create_if_missing), @@ -867,6 +953,12 @@ static MYSQL_SYSVAR_BOOL( "DBOptions::manual_wal_flush for RocksDB", nullptr, nullptr, rocksdb_db_options->manual_wal_flush); +static MYSQL_SYSVAR_ENUM(write_policy, rocksdb_write_policy, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, + "DBOptions::write_policy for RocksDB", nullptr, + nullptr, rocksdb::TxnDBWritePolicy::WRITE_COMMITTED, + &write_policy_typelib); + static MYSQL_SYSVAR_BOOL( create_missing_column_families, *reinterpret_cast( @@ -1077,7 +1169,9 @@ static MYSQL_SYSVAR_INT(table_cache_numshardbits, "DBOptions::table_cache_numshardbits for RocksDB", nullptr, nullptr, rocksdb_db_options->table_cache_numshardbits, - /* min */ 0, /* max */ INT_MAX, 0); + // LRUCache limits this to 19 bits, anything greater + // fails to create a cache and returns a nullptr + /* min */ 0, /* max */ 19, 0); static MYSQL_SYSVAR_UINT64_T(wal_ttl_seconds, rocksdb_db_options->WAL_ttl_seconds, PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, @@ -1187,8 +1281,9 @@ static MYSQL_SYSVAR_BOOL( "DBOptions::enable_thread_tracking for RocksDB", nullptr, nullptr, true); static MYSQL_SYSVAR_LONGLONG(block_cache_size, rocksdb_block_cache_size, - PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, - "block_cache size for RocksDB", nullptr, nullptr, + PLUGIN_VAR_RQCMDARG, + "block_cache size for RocksDB", + rocksdb_validate_set_block_cache_size, nullptr, /* default */ RDB_DEFAULT_BLOCK_CACHE_SIZE, /* min */ RDB_MIN_BLOCK_CACHE_SIZE, /* max */ LONGLONG_MAX, @@ -1435,6 +1530,18 @@ static MYSQL_SYSVAR_BOOL( "on PK TTL data. This variable is a no-op in non-debug builds.", nullptr, nullptr, FALSE); +static MYSQL_SYSVAR_UINT( + max_manual_compactions, rocksdb_max_manual_compactions, PLUGIN_VAR_RQCMDARG, + "Maximum number of pending + ongoing number of manual compactions.", + nullptr, nullptr, /* default */ 10, /* min */ 0, /* max */ UINT_MAX, 0); + +static MYSQL_SYSVAR_UINT( + debug_manual_compaction_delay, rocksdb_debug_manual_compaction_delay, + PLUGIN_VAR_RQCMDARG, + "For debugging purposes only. Sleeping specified seconds " + "for simulating long running compactions.", + nullptr, nullptr, 0, /* min */ 0, /* max */ UINT_MAX, 0); + static MYSQL_SYSVAR_BOOL( reset_stats, rocksdb_reset_stats, PLUGIN_VAR_RQCMDARG, "Reset the RocksDB internal statistics without restarting the DB.", nullptr, @@ -1597,6 +1704,13 @@ static MYSQL_SYSVAR_UINT( RDB_DEFAULT_TBL_STATS_SAMPLE_PCT, /* everything */ 0, /* max */ RDB_TBL_STATS_SAMPLE_PCT_MAX, 0); +static MYSQL_SYSVAR_UINT( + stats_recalc_rate, rocksdb_stats_recalc_rate, PLUGIN_VAR_RQCMDARG, + "The number of indexes per second to recalculate statistics for. 
0 to " + "disable background recalculation.", + nullptr, nullptr, 0 /* default value */, 0 /* min value */, + UINT_MAX /* max value */, 0); + static MYSQL_SYSVAR_BOOL( large_prefix, rocksdb_large_prefix, PLUGIN_VAR_RQCMDARG, "Support large index prefix length of 3072 bytes. If off, the maximum " @@ -1610,16 +1724,25 @@ static MYSQL_SYSVAR_BOOL( "detected.", nullptr, nullptr, FALSE); +static MYSQL_SYSVAR_BOOL(error_on_suboptimal_collation, + rocksdb_error_on_suboptimal_collation, + PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_READONLY, + "Raise an error instead of warning if a sub-optimal " + "collation is used", + nullptr, nullptr, TRUE); + static const int ROCKSDB_ASSUMED_KEY_VALUE_DISK_SIZE = 100; static struct st_mysql_sys_var *rocksdb_system_variables[] = { MYSQL_SYSVAR(lock_wait_timeout), MYSQL_SYSVAR(deadlock_detect), MYSQL_SYSVAR(deadlock_detect_depth), + MYSQL_SYSVAR(commit_time_batch_for_recovery), MYSQL_SYSVAR(max_row_locks), MYSQL_SYSVAR(write_batch_max_bytes), MYSQL_SYSVAR(lock_scanned_rows), MYSQL_SYSVAR(bulk_load), + MYSQL_SYSVAR(bulk_load_allow_sk), MYSQL_SYSVAR(bulk_load_allow_unsorted), MYSQL_SYSVAR(skip_unique_check_tables), MYSQL_SYSVAR(trace_sst_api), @@ -1637,6 +1760,7 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = { MYSQL_SYSVAR(create_if_missing), MYSQL_SYSVAR(two_write_queues), MYSQL_SYSVAR(manual_wal_flush), + MYSQL_SYSVAR(write_policy), MYSQL_SYSVAR(create_missing_column_families), MYSQL_SYSVAR(error_if_exists), MYSQL_SYSVAR(paranoid_checks), @@ -1754,6 +1878,11 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = { MYSQL_SYSVAR(large_prefix), MYSQL_SYSVAR(allow_to_start_after_corruption), MYSQL_SYSVAR(git_hash), + MYSQL_SYSVAR(error_on_suboptimal_collation), + MYSQL_SYSVAR(stats_recalc_rate), + MYSQL_SYSVAR(debug_manual_compaction_delay), + MYSQL_SYSVAR(max_manual_compactions), + MYSQL_SYSVAR(manual_compaction_threads), nullptr}; static rocksdb::WriteOptions @@ -1768,6 +1897,50 @@ rdb_get_rocksdb_write_options(my_core::THD *const thd) { return opt; } +static int rocksdb_compact_column_family(THD *const thd, + struct st_mysql_sys_var *const var, + void *const var_ptr, + struct st_mysql_value *const value) { + char buff[STRING_BUFFER_USUAL_SIZE]; + int len = sizeof(buff); + + DBUG_ASSERT(value != nullptr); + + if (const char *const cf = value->val_str(value, buff, &len)) { + auto cfh = cf_manager.get_cf(cf); + if (cfh != nullptr && rdb != nullptr) { + int mc_id = rdb_mc_thread.request_manual_compaction( + cfh, nullptr, nullptr, THDVAR(thd, manual_compaction_threads)); + if (mc_id == -1) { + my_error(ER_INTERNAL_ERROR, MYF(0), + "Can't schedule more manual compactions. " + "Increase rocksdb_max_manual_compactions or stop issuing " + "more manual compactions."); + return HA_EXIT_FAILURE; + } else if (mc_id < 0) { + return HA_EXIT_FAILURE; + } + // NO_LINT_DEBUG + sql_print_information("RocksDB: Manual compaction of column family: %s\n", + cf); + // Checking thd state every short cycle (100ms). This is for allowing to + // exiting this function without waiting for CompactRange to finish. + do { + my_sleep(100000); + } while (!thd->killed && + !rdb_mc_thread.is_manual_compaction_finished(mc_id)); + + if (thd->killed) { + // This cancels if requested compaction state is INITED. + // TODO(yoshinorim): Cancel running compaction as well once + // it is supported in RocksDB. 
+ rdb_mc_thread.clear_manual_compaction_request(mc_id, true); + } + } + } + return HA_EXIT_SUCCESS; +} + /////////////////////////////////////////////////////////////////////////////////////////// /** @@ -1887,8 +2060,6 @@ protected: bool m_is_two_phase = false; private: - /* Number of RockDB savepoints taken */ - int m_n_savepoints; /* Number of write operations this transaction had when we took the last savepoint (the idea is not to take another savepoint if we haven't made @@ -1897,9 +2068,9 @@ private: ulonglong m_writes_at_last_savepoint; protected: - THD *m_thd = nullptr; - rocksdb::ReadOptions m_read_opts; +protected: + THD *m_thd = nullptr; static std::multiset s_tx_list; static mysql_mutex_t s_tx_list_mutex; @@ -1957,6 +2128,7 @@ protected: } public: + rocksdb::ReadOptions m_read_opts; const char *m_mysql_log_file_name; my_off_t m_mysql_log_offset; #ifdef MARIAROCKS_NOT_YET @@ -1967,6 +2139,7 @@ protected: String m_detailed_error; int64_t m_snapshot_timestamp = 0; bool m_ddl_transaction; + std::shared_ptr m_explicit_snapshot; /* Tracks the number of tables in use through external_lock. @@ -2032,7 +2205,7 @@ protected: if (s.IsDeadlock()) { my_core::thd_mark_transaction_to_rollback(thd, - false /* just statement */); + true /* whole transaction */); m_detailed_error = String(); table_handler->m_deadlock_counter.inc(); rocksdb_row_lock_deadlocks++; @@ -2049,7 +2222,7 @@ protected: } m_detailed_error = String(" (snapshot conflict)", system_charset_info); table_handler->m_deadlock_counter.inc(); - return HA_ERR_LOCK_DEADLOCK; + return HA_ERR_ROCKSDB_STATUS_BUSY; } if (s.IsIOError() || s.IsCorruption()) { @@ -2462,7 +2635,6 @@ public: entire transaction. */ do_set_savepoint(); - m_n_savepoints= 1; m_writes_at_last_savepoint= m_write_count; } @@ -2479,7 +2651,6 @@ public: { do_set_savepoint(); m_writes_at_last_savepoint= m_write_count; - m_n_savepoints++; } } @@ -2490,10 +2661,14 @@ public: void rollback_to_stmt_savepoint() { if (m_writes_at_last_savepoint != m_write_count) { do_rollback_to_savepoint(); - if (!--m_n_savepoints) { - do_set_savepoint(); - m_n_savepoints= 1; - } + /* + RollbackToSavePoint "removes the most recent SetSavePoint()", so + we need to set it again so that next statement can roll back to this + stage. + It's ok to do it here at statement end (instead of doing it at next + statement start) because setting a savepoint is cheap. 
+ */ + do_set_savepoint(); m_writes_at_last_savepoint= m_write_count; } } @@ -2666,6 +2841,17 @@ public: void acquire_snapshot(bool acquire_now) override { if (m_read_opts.snapshot == nullptr) { +#ifdef MARIAROCKS_NOT_YET + const auto thd_ss = std::static_pointer_cast( + m_thd->get_explicit_snapshot()); + if (thd_ss) { + m_explicit_snapshot = thd_ss; + } + if (m_explicit_snapshot) { + auto snapshot = m_explicit_snapshot->get_snapshot()->snapshot(); + snapshot_created(snapshot); + } else +#endif if (is_tx_read_only()) { snapshot_created(rdb->GetSnapshot()); } else if (acquire_now) { @@ -2683,6 +2869,12 @@ public: if (m_read_opts.snapshot != nullptr) { m_snapshot_timestamp = 0; +#ifdef MARIAROCKS_NOT_YET + if (m_explicit_snapshot) { + m_explicit_snapshot.reset(); + need_clear = false; + } else +#endif if (is_tx_read_only()) { rdb->ReleaseSnapshot(m_read_opts.snapshot); need_clear = false; @@ -2792,6 +2984,10 @@ public: tx_opts.lock_timeout = rdb_convert_sec_to_ms(m_timeout_sec); tx_opts.deadlock_detect = THDVAR(m_thd, deadlock_detect); tx_opts.deadlock_detect_depth = THDVAR(m_thd, deadlock_detect_depth); + // If this variable is set, this will write commit time write batch + // information on recovery or memtable flush. + tx_opts.use_only_the_last_commit_time_batch_for_recovery = + THDVAR(m_thd, commit_time_batch_for_recovery); tx_opts.max_write_batch_size = THDVAR(m_thd, write_batch_max_bytes); write_opts.sync = (rocksdb_flush_log_at_trx_commit == FLUSH_LOG_SYNC); @@ -2886,7 +3082,7 @@ public: /* This is a rocksdb write batch. This class doesn't hold or wait on any transaction locks (skips rocksdb transaction API) thus giving better - performance. The commit is done through rdb->GetBaseDB()->Commit(). + performance. Currently this is only used for replication threads which are guaranteed to be non-conflicting. Any further usage of this class should completely @@ -2908,6 +3104,8 @@ private: bool commit_no_binlog() override { bool res = false; rocksdb::Status s; + rocksdb::TransactionDBWriteOptimizations optimize; + optimize.skip_concurrency_control = true; s = merge_auto_incr_map(m_batch->GetWriteBatch()); if (!s.ok()) { @@ -2918,7 +3116,7 @@ private: release_snapshot(); - s = rdb->GetBaseDB()->Write(write_opts, m_batch->GetWriteBatch()); + s = rdb->Write(write_opts, optimize, m_batch->GetWriteBatch()); if (!s.ok()) { rdb_handle_io_error(s, RDB_IO_ERROR_TX_COMMIT); res = true; @@ -2936,7 +3134,6 @@ error: return res; } -protected: /* Implementations of do_*savepoint based on rocksdB::WriteBatch savepoints */ void do_set_savepoint() override { m_batch->SetSavePoint(); @@ -2946,6 +3143,7 @@ protected: m_batch->RollbackToSavePoint(); } + public: bool is_writebatch_trx() const override { return true; } @@ -3033,6 +3231,13 @@ public: get_for_update(rocksdb::ColumnFamilyHandle *const column_family, const rocksdb::Slice &key, rocksdb::PinnableSlice *const value, bool exclusive) override { + if (value == nullptr) { + rocksdb::PinnableSlice pin_val; + rocksdb::Status s = get(column_family, key, &pin_val); + pin_val.Reset(); + return s; + } + return get(column_family, key, value); } @@ -3564,6 +3769,7 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx) We get here when committing a statement within a transaction. 
*/ tx->make_stmt_savepoint_permanent(); + tx->make_stmt_savepoint_permanent(); } if (my_core::thd_tx_isolation(thd) <= ISO_READ_COMMITTED) { @@ -3752,6 +3958,7 @@ private: if (!path_entry.path.empty() && !path_entry.limit_exceeded) { auto deadlocking_txn = *(path_entry.path.end() - 1); deadlock_info.victim_trx_id = deadlocking_txn.m_txn_id; + deadlock_info.deadlock_time = path_entry.deadlock_time; } return deadlock_info; } @@ -3799,16 +4006,18 @@ private: path_data += "\n*** DEADLOCK PATH\n" "=========================================\n"; const auto dl_info = get_dl_path_trx_info(path_entry); + const auto deadlock_time = dl_info.deadlock_time; for (auto it = dl_info.path.begin(); it != dl_info.path.end(); it++) { const auto trx_info = *it; path_data += format_string( + "TIMESTAMP: %" PRId64 "\n" "TRANSACTION ID: %u\n" "COLUMN FAMILY NAME: %s\n" "WAITING KEY: %s\n" "LOCK TYPE: %s\n" "INDEX NAME: %s\n" "TABLE NAME: %s\n", - trx_info.trx_id, trx_info.cf_name.c_str(), + deadlock_time, trx_info.trx_id, trx_info.cf_name.c_str(), trx_info.waiting_key.c_str(), trx_info.exclusive_lock ? "EXCLUSIVE" : "SHARED", trx_info.index_name.c_str(), trx_info.table_name.c_str()); @@ -4082,7 +4291,7 @@ static bool rocksdb_show_status(handlerton *const hton, THD *const thd, (ulonglong)internal_cache_count * kDefaultInternalCacheSize); str.append(buf); res |= print_stats(thd, "MEMORY_STATS", "rocksdb", str, stat_print); -#ifdef MARIAROCKS_NOT_YET + /* Show the background thread status */ std::vector thread_list; rocksdb::Status s = rdb->GetEnv()->GetThreadList(&thread_list); @@ -4119,8 +4328,27 @@ static bool rocksdb_show_status(handlerton *const hton, THD *const thd, str, stat_print); } } + +#ifdef MARIAROCKS_NOT_YET + /* Explicit snapshot information */ + str.clear(); + { + std::lock_guard lock(explicit_snapshot_mutex); + for (const auto &elem : explicit_snapshots) { + const auto &ss = elem.second.lock(); + DBUG_ASSERT(ss != nullptr); + const auto &info = ss->ss_info; + str += "\nSnapshot ID: " + std::to_string(info.snapshot_id) + + "\nBinlog File: " + info.binlog_file + + "\nBinlog Pos: " + std::to_string(info.binlog_pos) + + "\nGtid Executed: " + info.gtid_executed + "\n"; + } + } #endif + if (!str.empty()) { + res |= print_stats(thd, "EXPLICIT_SNAPSHOTS", "rocksdb", str, stat_print); + } #ifdef MARIAROCKS_NOT_YET } else if (stat_type == HA_ENGINE_TRX) { /* Handle the SHOW ENGINE ROCKSDB TRANSACTION STATUS command */ @@ -4143,6 +4371,50 @@ static inline void rocksdb_register_tx(handlerton *const hton, THD *const thd, static const char *ha_rocksdb_exts[] = {NullS}; +#ifdef MARIAROCKS_NOT_YET +static bool rocksdb_explicit_snapshot( + handlerton *const /* hton */, /*!< in: RocksDB handlerton */ + THD *const thd, /*!< in: MySQL thread handle */ + snapshot_info_st *ss_info) /*!< out: Snapshot information */ +{ + switch (ss_info->op) { + case snapshot_operation::SNAPSHOT_CREATE: { + if (mysql_bin_log_is_open()) { + mysql_bin_log_lock_commits(ss_info); + } + auto s = Rdb_explicit_snapshot::create(ss_info, rdb, rdb->GetSnapshot()); + if (mysql_bin_log_is_open()) { + mysql_bin_log_unlock_commits(ss_info); + } + + thd->set_explicit_snapshot(s); + return s == nullptr; + } + case snapshot_operation::SNAPSHOT_ATTACH: { + auto s = Rdb_explicit_snapshot::get(ss_info->snapshot_id); + if (!s) { + return true; + } + *ss_info = s->ss_info; + thd->set_explicit_snapshot(s); + return false; + } + case snapshot_operation::SNAPSHOT_RELEASE: { + if (!thd->get_explicit_snapshot()) { + return true; + } + *ss_info = 
thd->get_explicit_snapshot()->ss_info; + thd->set_explicit_snapshot(nullptr); + return false; + } + default: + DBUG_ASSERT(false); + return true; + } + return true; +} +#endif + /* Supporting START TRANSACTION WITH CONSISTENT [ROCKSDB] SNAPSHOT @@ -4165,10 +4437,15 @@ static const char *ha_rocksdb_exts[] = {NullS}; InnoDB and RocksDB transactions. */ static int rocksdb_start_tx_and_assign_read_view( - handlerton *const hton, /*!< in: RocksDB handlerton */ - THD* thd) /*!< in: MySQL thread handle of the - user for whom the transaction should - be committed */ + handlerton *const hton, /*!< in: RocksDB handlerton */ + THD *const thd /*!< in: MySQL thread handle of the + user for whom the transaction should + be committed */ +) +#ifdef MARIAROCKS_NOT_YET + snapshot_info_st *ss_info) /*!< in/out: Snapshot info like binlog file, pos, + gtid executed and snapshot ID */ +#endif { ulong const tx_isolation = my_core::thd_tx_isolation(thd); @@ -4176,14 +4453,25 @@ static int rocksdb_start_tx_and_assign_read_view( my_error(ER_ISOLATION_LEVEL_WITH_CONSISTENT_SNAPSHOT, MYF(0)); return HA_EXIT_FAILURE; } + +#ifdef MARIADB_NOT_YET + if (ss_info) { + if (mysql_bin_log_is_open()) { + mysql_bin_log_lock_commits(ss_info); + } else { + return HA_EXIT_FAILURE; + } +#endif + /* MariaDB: there is no need to call mysql_bin_log_lock_commits and then unlock back. SQL layer calls start_consistent_snapshot() for all engines, including the binlog under LOCK_commit_ordered mutex. + The mutex prevents binlog commits from happening (right?) while the storage engine(s) allocate read snapshots. That way, each storage engine is - synchronized with current binlog position. + synchronized with current binlog position. */ mysql_mutex_assert_owner(&LOCK_commit_ordered); @@ -4195,9 +4483,106 @@ static int rocksdb_start_tx_and_assign_read_view( rocksdb_register_tx(hton, thd, tx); tx->acquire_snapshot(true); +#ifdef MARIADB_NOT_YET + if (ss_info) { + mysql_bin_log_unlock_commits(ss_info); + } +#endif return HA_EXIT_SUCCESS; } +#ifdef MARIADB_NOT_YET +static int rocksdb_start_tx_with_shared_read_view( + handlerton *const hton, /*!< in: RocksDB handlerton */ + THD *const thd) /*!< in: MySQL thread handle of the + user for whom the transaction should + be committed */ +#ifdef MARIADB_NOT_YET + snapshot_info_st *ss_info) /*!< out: Snapshot info like binlog file, pos, + gtid executed and snapshot ID */ +#endif +{ + DBUG_ASSERT(thd != nullptr); + + int error = HA_EXIT_SUCCESS; + + ulong const tx_isolation = my_core::thd_tx_isolation(thd); + if (tx_isolation != ISO_REPEATABLE_READ) { + my_error(ER_ISOLATION_LEVEL_WITH_CONSISTENT_SNAPSHOT, MYF(0)); + return HA_EXIT_FAILURE; + } + + Rdb_transaction *tx = nullptr; +#ifdef MARIADB_NOT_YET + std::shared_ptr explicit_snapshot; + const auto op = ss_info->op; + + DBUG_ASSERT(op == snapshot_operation::SNAPSHOT_CREATE || + op == snapshot_operation::SNAPSHOT_ATTACH); + + // case: if binlogs are available get binlog file/pos and gtid info + if (op == snapshot_operation::SNAPSHOT_CREATE && mysql_bin_log_is_open()) { + mysql_bin_log_lock_commits(ss_info); + } + + if (op == snapshot_operation::SNAPSHOT_ATTACH) { + explicit_snapshot = Rdb_explicit_snapshot::get(ss_info->snapshot_id); + if (!explicit_snapshot) { + my_printf_error(ER_UNKNOWN_ERROR, "Snapshot %llu does not exist", MYF(0), + ss_info->snapshot_id); + error = HA_EXIT_FAILURE; + } + } +#endif + + // case: all good till now + if (error == HA_EXIT_SUCCESS) { + tx = get_or_create_tx(thd); + Rdb_perf_context_guard guard(tx, 
rocksdb_perf_context_level(thd)); + +#ifdef MARIADB_NOT_YET + if (explicit_snapshot) { + tx->m_explicit_snapshot = explicit_snapshot; + } +#endif + + DBUG_ASSERT(!tx->has_snapshot()); + tx->set_tx_read_only(true); + rocksdb_register_tx(hton, thd, tx); + tx->acquire_snapshot(true); + +#ifdef MARIADB_NOT_YET + // case: an explicit snapshot was not assigned to this transaction + if (!tx->m_explicit_snapshot) { + tx->m_explicit_snapshot = + Rdb_explicit_snapshot::create(ss_info, rdb, tx->m_read_opts.snapshot); + if (!tx->m_explicit_snapshot) { + my_printf_error(ER_UNKNOWN_ERROR, "Could not create snapshot", MYF(0)); + error = HA_EXIT_FAILURE; + } + } +#endif + } + +#ifdef MARIADB_NOT_YET + // case: unlock the binlog + if (op == snapshot_operation::SNAPSHOT_CREATE && mysql_bin_log_is_open()) { + mysql_bin_log_unlock_commits(ss_info); + } + + DBUG_ASSERT(error == HA_EXIT_FAILURE || tx->m_explicit_snapshot); + + // copy over the snapshot details to pass to the upper layers + if (tx->m_explicit_snapshot) { + *ss_info = tx->m_explicit_snapshot->ss_info; + ss_info->op = op; + } +#endif + + return error; +} +#endif + /* Dummy SAVEPOINT support. This is needed for long running transactions * like mysqldump (https://bugs.mysql.com/bug.php?id=71017). * Current SAVEPOINT does not correctly handle ROLLBACK and does not return @@ -4422,9 +4807,11 @@ static int rocksdb_init_func(void *const p) { rdb_bg_thread.init(rdb_signal_bg_psi_mutex_key, rdb_signal_bg_psi_cond_key); rdb_drop_idx_thread.init(rdb_signal_drop_idx_psi_mutex_key, rdb_signal_drop_idx_psi_cond_key); + rdb_mc_thread.init(rdb_signal_mc_psi_mutex_key, rdb_signal_mc_psi_cond_key); #else rdb_bg_thread.init(); rdb_drop_idx_thread.init(); + rdb_mc_thread.init(); #endif mysql_mutex_init(rdb_collation_data_mutex_key, &rdb_collation_data_mutex, MY_MUTEX_INIT_FAST); @@ -4445,6 +4832,8 @@ static int rocksdb_init_func(void *const p) { mysql_mutex_init(rdb_sysvars_psi_mutex_key, &rdb_sysvars_mutex, MY_MUTEX_INIT_FAST); + mysql_mutex_init(rdb_block_cache_resize_mutex_key, + &rdb_block_cache_resize_mutex, MY_MUTEX_INIT_FAST); Rdb_transaction::init_mutex(); rocksdb_hton->state = SHOW_OPTION_YES; @@ -4465,8 +4854,14 @@ static int rocksdb_init_func(void *const p) { rocksdb_hton->rollback = rocksdb_rollback; rocksdb_hton->show_status = rocksdb_show_status; +#ifdef MARIADB_NOT_YET + rocksdb_hton->explicit_snapshot = rocksdb_explicit_snapshot; +#endif rocksdb_hton->start_consistent_snapshot = rocksdb_start_tx_and_assign_read_view; +#ifdef MARIADB_NOT_YET + rocksdb_hton->start_shared_snapshot = rocksdb_start_tx_with_shared_read_view; +#endif rocksdb_hton->savepoint_set = rocksdb_savepoint; rocksdb_hton->savepoint_rollback = rocksdb_rollback_to_savepoint; rocksdb_hton->savepoint_rollback_can_release_mdl = @@ -4535,6 +4930,35 @@ static int rocksdb_init_func(void *const p) { DBUG_RETURN(HA_EXIT_FAILURE); } + // Check whether the filesystem backing rocksdb_datadir allows O_DIRECT + if (rocksdb_db_options->use_direct_reads) { + rocksdb::EnvOptions soptions; + rocksdb::Status check_status; + rocksdb::Env *const env = rocksdb_db_options->env; + + std::string fname = format_string("%s/DIRECT_CHECK", rocksdb_datadir); + if (env->FileExists(fname).ok()) { + std::unique_ptr file; + soptions.use_direct_reads = true; + check_status = env->NewSequentialFile(fname, &file, soptions); + } else { + std::unique_ptr file; + soptions.use_direct_writes = true; + check_status = env->ReopenWritableFile(fname, &file, soptions); + if (file != nullptr) { + file->Close(); + } + 
env->DeleteFile(fname); + } + + if (!check_status.ok()) { + sql_print_error("RocksDB: Unable to use direct io in rocksdb-datadir:" + "(%s)", check_status.getState()); + rdb_open_tables.free_hash(); + DBUG_RETURN(HA_EXIT_FAILURE); + } + } + if (rocksdb_db_options->allow_mmap_writes && rocksdb_db_options->use_direct_io_for_flush_and_compaction) { // See above comment for allow_mmap_reads. (NO_LINT_DEBUG) @@ -4687,8 +5111,10 @@ static int rocksdb_init_func(void *const p) { cf_options_map->get_defaults()); rocksdb::TransactionDBOptions tx_db_options; - tx_db_options.transaction_lock_timeout = 2; // 2 seconds + tx_db_options.transaction_lock_timeout = 2000; // 2 seconds tx_db_options.custom_mutex_factory = std::make_shared(); + tx_db_options.write_policy = + static_cast(rocksdb_write_policy); status = check_rocksdb_options_compatibility(rocksdb_datadir, main_opts, cf_descr); @@ -4710,7 +5136,7 @@ static int rocksdb_init_func(void *const p) { } cf_manager.init(std::move(cf_options_map), &cf_handles); - if (dict_manager.init(rdb->GetBaseDB(), &cf_manager)) { + if (dict_manager.init(rdb, &cf_manager)) { // NO_LINT_DEBUG sql_print_error("RocksDB: Failed to initialize data dictionary."); DBUG_RETURN(HA_EXIT_FAILURE); @@ -4771,6 +5197,21 @@ static int rocksdb_init_func(void *const p) { DBUG_RETURN(HA_EXIT_FAILURE); } + err = rdb_mc_thread.create_thread(MANUAL_COMPACTION_THREAD_NAME +#ifdef HAVE_PSI_INTERFACE + , + rdb_mc_psi_thread_key +#endif + ); + if (err != 0) { + // NO_LINT_DEBUG + sql_print_error( + "RocksDB: Couldn't start the manual compaction thread: (errno=%d)", + err); + rdb_open_tables.free_hash(); + DBUG_RETURN(HA_EXIT_FAILURE); + } + rdb_set_collation_exception_list(rocksdb_strict_collation_exceptions); if (rocksdb_pause_background_work) { @@ -4877,6 +5318,16 @@ static int rocksdb_done_func(void *const p) { sql_print_error("RocksDB: Couldn't stop the index thread: (errno=%d)", err); } + // signal the manual compaction thread to stop + rdb_mc_thread.signal(true); + // Wait for the manual compaction thread to finish. + err = rdb_mc_thread.join(); + if (err != 0) { + // NO_LINT_DEBUG + sql_print_error( + "RocksDB: Couldn't stop the manual compaction thread: (errno=%d)", err); + } + if (rdb_open_tables.m_hash.size()) { // Looks like we are getting unloaded and yet we have some open tables // left behind. 
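
The DIRECT_CHECK probe added to rocksdb_init_func() above boils down to
"open a file with O_DIRECT and see whether the filesystem refuses". A
minimal standalone sketch of the same idea, using plain POSIX calls rather
than the patch's rocksdb::Env wrappers (the helper name and the EINVAL
expectation are illustrative assumptions, not part of the patch):

#include <fcntl.h>
#include <unistd.h>
#include <cstdio>

// Sketch only: returns true if 'dir' accepts O_DIRECT opens. On Linux,
// glibc exposes O_DIRECT in C++ builds, where _GNU_SOURCE is predefined.
static bool dir_supports_o_direct(const char *dir) {
  char path[FILENAME_MAX];
  std::snprintf(path, sizeof(path), "%s/DIRECT_CHECK", dir);
  // Filesystems without O_DIRECT support (tmpfs is the usual example)
  // typically fail the open itself rather than failing later reads or
  // writes, which is what an init-time check like this relies on.
  const int fd = open(path, O_CREAT | O_WRONLY | O_DIRECT, 0600);
  if (fd < 0) return false;
  close(fd);
  unlink(path);
  return true;
}

Failing at plugin startup with a clear error message beats having RocksDB
die on its first background flush, which is why the patch runs the check
before the database is opened.
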
@@ -4933,6 +5384,7 @@ static int rocksdb_done_func(void *const p) { rdb_open_tables.m_hash.~Rdb_table_set(); mysql_mutex_destroy(&rdb_open_tables.m_mutex); mysql_mutex_destroy(&rdb_sysvars_mutex); + mysql_mutex_destroy(&rdb_block_cache_resize_mutex); delete rdb_collation_exceptions; @@ -5348,7 +5800,6 @@ longlong ha_rocksdb::update_hidden_pk_val() { /* Get the id of the hidden pk id from m_last_rowkey */ int ha_rocksdb::read_hidden_pk_id_from_rowkey(longlong *const hidden_pk_id) { - DBUG_ASSERT(hidden_pk_id != nullptr); DBUG_ASSERT(table != nullptr); DBUG_ASSERT(has_hidden_pk(table)); @@ -5404,14 +5855,14 @@ ha_rocksdb::ha_rocksdb(my_core::handlerton *const hton, my_core::TABLE_SHARE *const table_arg) : handler(hton, table_arg), m_table_handler(nullptr), m_scan_it(nullptr), m_scan_it_skips_bloom(false), m_scan_it_snapshot(nullptr), + m_scan_it_lower_bound(nullptr), m_scan_it_upper_bound(nullptr), m_tbl_def(nullptr), m_pk_descr(nullptr), m_key_descr_arr(nullptr), m_pk_can_be_decoded(false), m_maybe_unpack_info(false), m_pk_tuple(nullptr), m_pk_packed_tuple(nullptr), m_sk_packed_tuple(nullptr), m_end_key_packed_tuple(nullptr), m_sk_match_prefix(nullptr), m_sk_match_prefix_buf(nullptr), m_sk_packed_tuple_old(nullptr), m_dup_sk_packed_tuple(nullptr), - m_dup_sk_packed_tuple_old(nullptr), m_eq_cond_lower_bound(nullptr), - m_eq_cond_upper_bound(nullptr), m_pack_buffer(nullptr), + m_dup_sk_packed_tuple_old(nullptr), m_pack_buffer(nullptr), m_lock_rows(RDB_LOCK_NONE), m_keyread_only(FALSE), m_encoder_arr(nullptr), m_row_checksums_checked(0), m_in_rpl_delete_rows(false), m_in_rpl_update_rows(false), m_force_skip_unique_check(false) {} @@ -5950,9 +6401,6 @@ int ha_rocksdb::convert_field_from_storage_format( int ha_rocksdb::convert_record_from_storage_format( const rocksdb::Slice *const key, const rocksdb::Slice *const value, uchar *const buf) { - DBUG_ASSERT(key != nullptr); - DBUG_ASSERT(buf != nullptr); - Rdb_string_reader reader(value); /* @@ -6196,7 +6644,6 @@ int ha_rocksdb::alloc_key_buffers(const TABLE *const table_arg, DBUG_ENTER_FUNC(); DBUG_ASSERT(m_pk_tuple == nullptr); - DBUG_ASSERT(tbl_def_arg != nullptr); std::shared_ptr *const kd_arr = tbl_def_arg->m_key_descr_arr; @@ -6248,9 +6695,9 @@ int ha_rocksdb::alloc_key_buffers(const TABLE *const table_arg, m_pack_buffer = reinterpret_cast(my_malloc(max_packed_sk_len, MYF(0))); - m_eq_cond_upper_bound = + m_scan_it_lower_bound = reinterpret_cast(my_malloc(max_packed_sk_len, MYF(0))); - m_eq_cond_lower_bound = + m_scan_it_upper_bound = reinterpret_cast(my_malloc(max_packed_sk_len, MYF(0))); /* @@ -6267,7 +6714,7 @@ int ha_rocksdb::alloc_key_buffers(const TABLE *const table_arg, if (m_pk_tuple == nullptr || m_pk_packed_tuple == nullptr || m_sk_packed_tuple == nullptr || m_sk_packed_tuple_old == nullptr || m_end_key_packed_tuple == nullptr || m_pack_buffer == nullptr || - m_eq_cond_upper_bound == nullptr || m_eq_cond_lower_bound == nullptr || + m_scan_it_upper_bound == nullptr || m_scan_it_lower_bound == nullptr || (alloc_alter_buffers && (m_dup_sk_packed_tuple == nullptr || m_dup_sk_packed_tuple_old == nullptr))) { // One or more of the above allocations failed. 
Clean up and exit @@ -6307,11 +6754,11 @@ void ha_rocksdb::free_key_buffers() { my_free(m_dup_sk_packed_tuple_old); m_dup_sk_packed_tuple_old = nullptr; - my_free(m_eq_cond_upper_bound); - m_eq_cond_upper_bound = nullptr; + my_free(m_scan_it_lower_bound); + m_scan_it_lower_bound = nullptr; - my_free(m_eq_cond_lower_bound); - m_eq_cond_lower_bound = nullptr; + my_free(m_scan_it_upper_bound); + m_scan_it_upper_bound = nullptr; } #ifdef MARIAROCKS_NOT_YET @@ -6569,9 +7016,8 @@ bool ha_rocksdb::get_error_message(const int error, String *const buf) { static_assert(HA_ERR_ROCKSDB_LAST > HA_ERR_LAST, "HA_ERR_ROCKSDB_LAST > HA_ERR_LAST"); - DBUG_ASSERT(buf != nullptr); - - if (error == HA_ERR_LOCK_WAIT_TIMEOUT || error == HA_ERR_LOCK_DEADLOCK) { + if (error == HA_ERR_LOCK_WAIT_TIMEOUT || error == HA_ERR_LOCK_DEADLOCK || + error == HA_ERR_ROCKSDB_STATUS_BUSY) { Rdb_transaction *const tx = get_tx_from_thd(ha_thd()); DBUG_ASSERT(tx != nullptr); buf->append(tx->m_detailed_error); @@ -6653,10 +7099,10 @@ int ha_rocksdb::rdb_error_to_mysql(const rocksdb::Status &s, } if (opt_msg) { - my_error(ER_RDB_STATUS_MSG, MYF(0), opt_msg, s.code(), - s.ToString().c_str()); + std::string concatenated_error = s.ToString() + " (" + std::string(opt_msg) + ")"; + my_error(ER_GET_ERRMSG, MYF(0), s.code(), concatenated_error.c_str(), rocksdb_hton_name); } else { - my_error(ER_RDB_STATUS_GENERAL, MYF(0), s.code(), s.ToString().c_str()); + my_error(ER_GET_ERRMSG, MYF(0), s.code(), s.ToString().c_str(), rocksdb_hton_name); } return err; @@ -6725,7 +7171,6 @@ int ha_rocksdb::create_key_defs( /* = nullptr */) const { DBUG_ENTER_FUNC(); - DBUG_ASSERT(table_arg != nullptr); DBUG_ASSERT(table_arg->s != nullptr); uint i; @@ -6796,9 +7241,7 @@ int ha_rocksdb::create_cfs( std::array *const cfs) const { DBUG_ENTER_FUNC(); - DBUG_ASSERT(table_arg != nullptr); DBUG_ASSERT(table_arg->s != nullptr); - DBUG_ASSERT(tbl_def_arg != nullptr); char tablename_sys[NAME_LEN + 1]; bool tsys_set= false; @@ -6904,10 +7347,6 @@ int ha_rocksdb::create_inplace_key_defs( const std::array &cfs) const { DBUG_ENTER_FUNC(); - DBUG_ASSERT(table_arg != nullptr); - DBUG_ASSERT(tbl_def_arg != nullptr); - DBUG_ASSERT(old_tbl_def_arg != nullptr); - std::shared_ptr *const old_key_descr = old_tbl_def_arg->m_key_descr_arr; std::shared_ptr *const new_key_descr = @@ -6975,11 +7414,6 @@ std::unordered_map ha_rocksdb::get_old_key_positions( const Rdb_tbl_def *const old_tbl_def_arg) const { DBUG_ENTER_FUNC(); - DBUG_ASSERT(table_arg != nullptr); - DBUG_ASSERT(old_table_arg != nullptr); - DBUG_ASSERT(tbl_def_arg != nullptr); - DBUG_ASSERT(old_tbl_def_arg != nullptr); - std::shared_ptr *const old_key_descr = old_tbl_def_arg->m_key_descr_arr; std::unordered_map old_key_pos; @@ -7045,9 +7479,6 @@ int ha_rocksdb::compare_keys(const KEY *const old_key, const KEY *const new_key) const { DBUG_ENTER_FUNC(); - DBUG_ASSERT(old_key != nullptr); - DBUG_ASSERT(new_key != nullptr); - /* Check index name. 
*/ if (strcmp(old_key->name, new_key->name) != 0) { DBUG_RETURN(HA_EXIT_FAILURE); @@ -7078,9 +7509,6 @@ int ha_rocksdb::compare_key_parts(const KEY *const old_key, const KEY *const new_key) const { DBUG_ENTER_FUNC(); - DBUG_ASSERT(old_key != nullptr); - DBUG_ASSERT(new_key != nullptr); - /* Skip if key parts do not match, as it is a different key */ if (new_key->user_defined_key_parts != old_key->user_defined_key_parts) { DBUG_RETURN(HA_EXIT_FAILURE); @@ -7125,7 +7553,6 @@ int ha_rocksdb::create_key_def(const TABLE *const table_arg, const uint &i, const struct key_def_cf_info &cf_info) const { DBUG_ENTER_FUNC(); - DBUG_ASSERT(new_key_def != nullptr); DBUG_ASSERT(*new_key_def == nullptr); uint64 ttl_duration = 0; @@ -7212,8 +7639,6 @@ int ha_rocksdb::create_key_def(const TABLE *const table_arg, const uint &i, int rdb_normalize_tablename(const std::string &tablename, std::string *const strbuf) { - DBUG_ASSERT(strbuf != nullptr); - if (tablename.size() < 2 || tablename[0] != '.' || (tablename[1] != FN_LIBCHAR && tablename[1] != FN_LIBCHAR2)) { DBUG_ASSERT(0); // We were not passed table name? @@ -7537,8 +7962,6 @@ int ha_rocksdb::read_key_exact(const Rdb_key_def &kd, const bool &full_key_match, const rocksdb::Slice &key_slice, const int64_t ttl_filter_ts) { - DBUG_ASSERT(iter != nullptr); - /* We are looking for the first record such that index_tuple= lookup_tuple. @@ -7746,8 +8169,6 @@ int ha_rocksdb::calc_eq_cond_len(const Rdb_key_def &kd, } int ha_rocksdb::read_row_from_primary_key(uchar *const buf) { - DBUG_ASSERT(buf != nullptr); - int rc; const rocksdb::Slice &rkey = m_scan_it->key(); const uint pk_size = rkey.size(); @@ -7771,8 +8192,6 @@ int ha_rocksdb::read_row_from_primary_key(uchar *const buf) { int ha_rocksdb::read_row_from_secondary_key(uchar *const buf, const Rdb_key_def &kd, bool move_forward) { - DBUG_ASSERT(buf != nullptr); - int rc = 0; uint pk_size; @@ -7875,7 +8294,6 @@ ulong ha_rocksdb::index_flags(uint inx, uint part, bool all_parts) const { pair for. 
*/ int ha_rocksdb::secondary_index_read(const int keyno, uchar *const buf) { - DBUG_ASSERT(buf != nullptr); DBUG_ASSERT(table != nullptr); #ifdef MARIAROCKS_NOT_YET stats.rows_requested++; @@ -8192,7 +8610,7 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key, else rc = read_row_from_secondary_key(buf, kd, move_forward); - if (rc != HA_ERR_LOCK_DEADLOCK || !is_new_snapshot) + if (rc != HA_ERR_ROCKSDB_STATUS_BUSY || !is_new_snapshot) break; /* Exit the loop */ // release the snapshot and iterator so they will be regenerated @@ -8237,8 +8655,6 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key, int ha_rocksdb::find_icp_matching_index_rec(const bool &move_forward, uchar *const buf) { - DBUG_ASSERT(buf != nullptr); - if (pushed_idx_cond && pushed_idx_cond_keyno == active_index) { const Rdb_key_def &kd = *m_key_descr_arr[active_index]; @@ -8552,8 +8968,6 @@ int ha_rocksdb::get_row_by_rowid(uchar *const buf, const char *const rowid, const bool skip_ttl_check) { DBUG_ENTER_FUNC(); - DBUG_ASSERT(buf != nullptr); - DBUG_ASSERT(rowid != nullptr); DBUG_ASSERT(table != nullptr); int rc; @@ -8766,8 +9180,6 @@ int ha_rocksdb::index_last(uchar *const buf) { int ha_rocksdb::index_first_intern(uchar *const buf) { DBUG_ENTER_FUNC(); - DBUG_ASSERT(buf != nullptr); - uchar *key; uint key_size; int rc; @@ -8797,7 +9209,7 @@ int ha_rocksdb::index_first_intern(uchar *const buf) { m_skip_scan_it_next_call = true; rc = index_next_with_direction(buf, true); - if (rc != HA_ERR_LOCK_DEADLOCK || !is_new_snapshot) + if (rc != HA_ERR_ROCKSDB_STATUS_BUSY || !is_new_snapshot) break; // exit the loop // release the snapshot and iterator so they will be regenerated @@ -8859,8 +9271,6 @@ int ha_rocksdb::index_first_intern(uchar *const buf) { int ha_rocksdb::index_last_intern(uchar *const buf) { DBUG_ENTER_FUNC(); - DBUG_ASSERT(buf != nullptr); - uchar *key; uint key_size; int rc; @@ -8902,7 +9312,7 @@ int ha_rocksdb::index_last_intern(uchar *const buf) { rc = secondary_index_read(active_index, buf); } - if (rc != HA_ERR_LOCK_DEADLOCK || !is_new_snapshot) + if (rc != HA_ERR_ROCKSDB_STATUS_BUSY || !is_new_snapshot) break; /* exit the loop */ // release the snapshot and iterator so they will be regenerated @@ -8987,7 +9397,6 @@ bool ha_rocksdb::commit_in_the_middle() { @retval false if bulk commit was skipped or succeeded */ bool ha_rocksdb::do_bulk_commit(Rdb_transaction *const tx) { - DBUG_ASSERT(tx != nullptr); return commit_in_the_middle() && tx->get_write_count() >= THDVAR(table->in_use, bulk_load_size) && tx->flush_batch(); @@ -9000,7 +9409,6 @@ bool ha_rocksdb::do_bulk_commit(Rdb_transaction *const tx) { 'auto-incremented' pk.) 
*/ bool ha_rocksdb::has_hidden_pk(const TABLE *const table) const { - DBUG_ASSERT(table != nullptr); return Rdb_key_def::table_has_hidden_pk(table); } @@ -9010,9 +9418,7 @@ bool ha_rocksdb::has_hidden_pk(const TABLE *const table) const { */ bool ha_rocksdb::is_hidden_pk(const uint index, const TABLE *const table_arg, const Rdb_tbl_def *const tbl_def_arg) { - DBUG_ASSERT(table_arg != nullptr); DBUG_ASSERT(table_arg->s != nullptr); - DBUG_ASSERT(tbl_def_arg != nullptr); return (table_arg->s->primary_key == MAX_INDEXES && index == tbl_def_arg->m_key_count - 1); @@ -9021,9 +9427,7 @@ bool ha_rocksdb::is_hidden_pk(const uint index, const TABLE *const table_arg, /* Returns index of primary key */ uint ha_rocksdb::pk_index(const TABLE *const table_arg, const Rdb_tbl_def *const tbl_def_arg) { - DBUG_ASSERT(table_arg != nullptr); DBUG_ASSERT(table_arg->s != nullptr); - DBUG_ASSERT(tbl_def_arg != nullptr); return table_arg->s->primary_key == MAX_INDEXES ? tbl_def_arg->m_key_count - 1 : table_arg->s->primary_key; @@ -9032,9 +9436,7 @@ uint ha_rocksdb::pk_index(const TABLE *const table_arg, /* Returns true if given index number is a primary key */ bool ha_rocksdb::is_pk(const uint index, const TABLE *const table_arg, const Rdb_tbl_def *const tbl_def_arg) { - DBUG_ASSERT(table_arg != nullptr); DBUG_ASSERT(table_arg->s != nullptr); - DBUG_ASSERT(tbl_def_arg != nullptr); return index == table_arg->s->primary_key || is_hidden_pk(index, table_arg, tbl_def_arg); @@ -9049,9 +9451,6 @@ uint ha_rocksdb::max_supported_key_part_length() const { const char *ha_rocksdb::get_key_name(const uint index, const TABLE *const table_arg, const Rdb_tbl_def *const tbl_def_arg) { - DBUG_ASSERT(table_arg != nullptr); - DBUG_ASSERT(tbl_def_arg != nullptr); - if (is_hidden_pk(index, table_arg, tbl_def_arg)) { return HIDDEN_PK_NAME; } @@ -9065,9 +9464,6 @@ const char *ha_rocksdb::get_key_name(const uint index, const char *ha_rocksdb::get_key_comment(const uint index, const TABLE *const table_arg, const Rdb_tbl_def *const tbl_def_arg) { - DBUG_ASSERT(table_arg != nullptr); - DBUG_ASSERT(tbl_def_arg != nullptr); - if (is_hidden_pk(index, table_arg, tbl_def_arg)) { return nullptr; } @@ -9119,7 +9515,6 @@ const std::string ha_rocksdb::generate_cf_name(const uint index, } const std::string ha_rocksdb::get_table_comment(const TABLE *const table_arg) { - DBUG_ASSERT(table_arg != nullptr); DBUG_ASSERT(table_arg->s != nullptr); return table_arg->s->comment.str; @@ -9236,8 +9631,7 @@ int ha_rocksdb::check_and_lock_unique_pk(const uint &key_id, /* If the keys are the same, then no lock is needed */ - if (!Rdb_pk_comparator::bytewise_compare(row_info.new_pk_slice, - row_info.old_pk_slice)) { + if (!row_info.new_pk_slice.compare(row_info.old_pk_slice)) { *found = false; return HA_EXIT_SUCCESS; } @@ -9326,8 +9720,7 @@ int ha_rocksdb::check_and_lock_sk(const uint &key_id, rocksdb::Slice((const char *)m_sk_packed_tuple, size); /* - For UPDATEs, if the key has changed, we need to obtain a lock. INSERTs - always require locking. 
+ Acquire lock on the old key in case of UPDATE */ if (row_info.old_data != nullptr) { size = kd.pack_record(table, m_pack_buffer, row_info.old_data, @@ -9336,14 +9729,18 @@ int ha_rocksdb::check_and_lock_sk(const uint &key_id, const rocksdb::Slice old_slice = rocksdb::Slice((const char *)m_sk_packed_tuple_old, size); - /* - For updates, if the keys are the same, then no lock is needed + const rocksdb::Status s = + get_for_update(row_info.tx, kd.get_cf(), old_slice, nullptr); + if (!s.ok()) { + return row_info.tx->set_status_error(table->in_use, s, kd, m_tbl_def, + m_table_handler); + } - Also check to see if the key has any fields set to NULL. If it does, then - this key is unique since NULL is not equal to each other, so no lock is - needed. + /* + If the old and new keys are the same we're done since we've already taken + the lock on the old key */ - if (!Rdb_pk_comparator::bytewise_compare(new_slice, old_slice)) { + if (!new_slice.compare(old_slice)) { return HA_EXIT_SUCCESS; } } @@ -9369,16 +9766,14 @@ int ha_rocksdb::check_and_lock_sk(const uint &key_id, The bloom filter may need to be disabled for this lookup. */ - uchar min_bound_buf[MAX_KEY_LENGTH]; - uchar max_bound_buf[MAX_KEY_LENGTH]; - rocksdb::Slice min_bound_slice; - rocksdb::Slice max_bound_slice; + uchar lower_bound_buf[Rdb_key_def::INDEX_NUMBER_SIZE]; + uchar upper_bound_buf[Rdb_key_def::INDEX_NUMBER_SIZE]; + rocksdb::Slice lower_bound_slice; + rocksdb::Slice upper_bound_slice; + const bool total_order_seek = !check_bloom_and_set_bounds( - ha_thd(), kd, new_slice, all_parts_used, - min_bound_buf, - max_bound_buf, - &min_bound_slice, - &max_bound_slice); + ha_thd(), kd, new_slice, all_parts_used, Rdb_key_def::INDEX_NUMBER_SIZE, + lower_bound_buf, upper_bound_buf, &lower_bound_slice, &upper_bound_slice); const bool fill_cache = !THDVAR(ha_thd(), skip_fill_cache); const rocksdb::Status s = @@ -9389,9 +9784,8 @@ int ha_rocksdb::check_and_lock_sk(const uint &key_id, } rocksdb::Iterator *const iter = row_info.tx->get_iterator( - kd.get_cf(), total_order_seek, fill_cache, - min_bound_slice, max_bound_slice, - true /* read current data */, + kd.get_cf(), total_order_seek, fill_cache, lower_bound_slice, + upper_bound_slice, true /* read current data */, false /* acquire snapshot */); /* Need to scan the transaction to see if there is a duplicate key. 
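
The lower/upper bound slices built by check_bloom_and_set_bounds() above are
ultimately handed to RocksDB as ReadOptions::iterate_lower_bound and
iterate_upper_bound. A minimal sketch of that underlying RocksDB pattern,
outside the handler code (the db handle, bound strings, and function name
are illustrative only):

#include <rocksdb/db.h>
#include <memory>
#include <string>

// Sketch only: visit keys in [lower, upper). With bounds set, the iterator
// goes !Valid() at the upper bound on its own, and RocksDB can skip SST
// files that lie entirely outside the range.
void scan_with_bounds(rocksdb::DB *db, const std::string &lower,
                      const std::string &upper) {
  const rocksdb::Slice lb(lower), ub(upper);  // must outlive the iterator
  rocksdb::ReadOptions opts;
  opts.iterate_lower_bound = &lb;
  opts.iterate_upper_bound = &ub;
  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(opts));
  for (it->Seek(lb); it->Valid(); it->Next()) {
    // every it->key() seen here is >= lb and < ub
  }
}

This is also why the patch can shrink the bound buffers here to
Rdb_key_def::INDEX_NUMBER_SIZE: for a whole-index scan, the 4-byte index
number with successor/predecessor applied already brackets every key in the
index.
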
@@ -9601,9 +9995,11 @@ int ha_rocksdb::update_pk(const Rdb_key_def &kd, } int ha_rocksdb::update_sk(const TABLE *const table_arg, const Rdb_key_def &kd, - const struct update_row_info &row_info) { + const struct update_row_info &row_info, + const bool bulk_load_sk) { int new_packed_size; int old_packed_size; + int rc = HA_EXIT_SUCCESS; rocksdb::Slice new_key_slice; rocksdb::Slice new_value_slice; @@ -9681,18 +10077,23 @@ int ha_rocksdb::update_sk(const TABLE *const table_arg, const Rdb_key_def &kd, rocksdb::Slice(reinterpret_cast(m_sk_tails.ptr()), m_sk_tails.get_current_pos()); - row_info.tx->get_indexed_write_batch()->Put(kd.get_cf(), new_key_slice, - new_value_slice); + if (bulk_load_sk && row_info.old_data == nullptr) { + rc = bulk_load_key(row_info.tx, kd, new_key_slice, new_value_slice, true); + } else { + row_info.tx->get_indexed_write_batch()->Put(kd.get_cf(), new_key_slice, + new_value_slice); + } row_info.tx->update_bytes_written(bytes_written + new_key_slice.size() + new_value_slice.size()); - return HA_EXIT_SUCCESS; + return rc; } int ha_rocksdb::update_indexes(const struct update_row_info &row_info, const bool &pk_changed) { int rc; + bool bulk_load_sk; // The PK must be updated first to pull out the TTL value. rc = update_pk(*m_pk_descr, row_info, pk_changed); @@ -9700,13 +10101,17 @@ int ha_rocksdb::update_indexes(const struct update_row_info &row_info, return rc; } - // Update the remaining indexes. + // Update the remaining indexes. Allow bulk loading only if + // allow_sk is enabled + bulk_load_sk = rocksdb_enable_bulk_load_api && + THDVAR(table->in_use, bulk_load) && + THDVAR(table->in_use, bulk_load_allow_sk); for (uint key_id = 0; key_id < m_tbl_def->m_key_count; key_id++) { if (is_pk(key_id, table, m_tbl_def)) { continue; } - rc = update_sk(table, *m_key_descr_arr[key_id], row_info); + rc = update_sk(table, *m_key_descr_arr[key_id], row_info, bulk_load_sk); if (rc != HA_EXIT_SUCCESS) { return rc; } @@ -9804,28 +10209,22 @@ int ha_rocksdb::update_write_row(const uchar *const old_data, @param outer_u */ -void ha_rocksdb::setup_iterator_bounds(const Rdb_key_def &kd, - const rocksdb::Slice &eq_cond, - uchar *lower_bound_buf, - uchar *upper_bound_buf, - rocksdb::Slice *out_lower_bound, - rocksdb::Slice *out_upper_bound) { - uint eq_cond_len = eq_cond.size(); - memcpy(upper_bound_buf, eq_cond.data(), eq_cond_len); - kd.successor(upper_bound_buf, eq_cond_len); - memcpy(lower_bound_buf, eq_cond.data(), eq_cond_len); - kd.predecessor(lower_bound_buf, eq_cond_len); +void ha_rocksdb::setup_iterator_bounds( + const Rdb_key_def &kd, const rocksdb::Slice &eq_cond, size_t bound_len, + uchar *const lower_bound, uchar *const upper_bound, + rocksdb::Slice *lower_bound_slice, rocksdb::Slice *upper_bound_slice) { + uint min_len = std::min(eq_cond.size(), bound_len); + memcpy(upper_bound, eq_cond.data(), min_len); + kd.successor(upper_bound, min_len); + memcpy(lower_bound, eq_cond.data(), min_len); + kd.predecessor(lower_bound, min_len); if (kd.m_is_reverse_cf) { - *out_upper_bound = - rocksdb::Slice((const char *)lower_bound_buf, eq_cond_len); - *out_lower_bound = - rocksdb::Slice((const char *)upper_bound_buf, eq_cond_len); + *upper_bound_slice = rocksdb::Slice((const char *)lower_bound, min_len); + *lower_bound_slice = rocksdb::Slice((const char *)upper_bound, min_len); } else { - *out_upper_bound = - rocksdb::Slice((const char *)upper_bound_buf, eq_cond_len); - *out_lower_bound = - rocksdb::Slice((const char *)lower_bound_buf, eq_cond_len); + *upper_bound_slice = rocksdb::Slice((const 
char *)upper_bound, min_len); + *lower_bound_slice = rocksdb::Slice((const char *)lower_bound, min_len); } } @@ -9837,7 +10236,6 @@ void ha_rocksdb::setup_scan_iterator(const Rdb_key_def &kd, rocksdb::Slice *const slice, const bool use_all_keys, const uint eq_cond_len) { - DBUG_ASSERT(slice != nullptr); DBUG_ASSERT(slice->size() >= eq_cond_len); Rdb_transaction *const tx = get_or_create_tx(table->in_use); @@ -9845,11 +10243,10 @@ void ha_rocksdb::setup_scan_iterator(const Rdb_key_def &kd, bool skip_bloom = true; const rocksdb::Slice eq_cond(slice->data(), eq_cond_len); - if (check_bloom_and_set_bounds(ha_thd(), kd, eq_cond, use_all_keys, - m_eq_cond_lower_bound, - m_eq_cond_upper_bound, - &m_eq_cond_lower_bound_slice, - &m_eq_cond_upper_bound_slice)) { + if (check_bloom_and_set_bounds( + ha_thd(), kd, eq_cond, use_all_keys, eq_cond_len, + m_scan_it_lower_bound, m_scan_it_upper_bound, + &m_scan_it_lower_bound_slice, &m_scan_it_upper_bound_slice)) { skip_bloom = false; } @@ -9890,8 +10287,8 @@ void ha_rocksdb::setup_scan_iterator(const Rdb_key_def &kd, m_scan_it = rdb->NewIterator(read_opts, kd.get_cf()); } else { m_scan_it = tx->get_iterator(kd.get_cf(), skip_bloom, fill_cache, - m_eq_cond_lower_bound_slice, - m_eq_cond_upper_bound_slice); + m_scan_it_lower_bound_slice, + m_scan_it_upper_bound_slice); } m_scan_it_skips_bloom = skip_bloom; } @@ -9957,7 +10354,7 @@ int ha_rocksdb::rnd_next(uchar *const buf) { int rc; for (;;) { rc = rnd_next_with_direction(buf, true); - if (rc != HA_ERR_LOCK_DEADLOCK || !m_rnd_scan_is_new_snapshot) + if (rc != HA_ERR_ROCKSDB_STATUS_BUSY || !m_rnd_scan_is_new_snapshot) break; /* exit the loop */ // release the snapshot and iterator and then regenerate them @@ -10286,7 +10683,7 @@ int ha_rocksdb::info(uint flag) { if (static_cast(stats.data_file_length) < 0 || static_cast(stats.index_file_length) < 0 || static_cast(stats.records) < 0) { - if (analyze(nullptr, nullptr)) { + if (calculate_stats_for_table()) { DBUG_RETURN(HA_EXIT_FAILURE); } @@ -10364,10 +10761,21 @@ int ha_rocksdb::info(uint flag) { KEY *const k = &table->key_info[i]; for (uint j = 0; j < k->ext_key_parts; j++) { const Rdb_index_stats &k_stats = m_key_descr_arr[i]->m_stats; - uint x = k_stats.m_distinct_keys_per_prefix.size() > j && - k_stats.m_distinct_keys_per_prefix[j] > 0 - ? k_stats.m_rows / k_stats.m_distinct_keys_per_prefix[j] - : 0; + uint x; + + if (k_stats.m_distinct_keys_per_prefix.size() > j && + k_stats.m_distinct_keys_per_prefix[j] > 0) { + x = k_stats.m_rows / k_stats.m_distinct_keys_per_prefix[j]; + /* + If the number of rows is less than the number of prefixes (due to + sampling), the average number of rows with the same prefix is 1. + */ + if (x == 0) { + x = 1; + } + } else { + x = 0; + } if (x > stats.records) x = stats.records; if ((x == 0 && rocksdb_debug_optimizer_no_zero_cardinality) || @@ -10571,7 +10979,7 @@ THR_LOCK_DATA **ha_rocksdb::store_lock(THD *const thd, THR_LOCK_DATA **to, m_lock_rows = RDB_LOCK_WRITE; } else if (lock_type == TL_READ_WITH_SHARED_LOCKS) { m_lock_rows = RDB_LOCK_READ; - } else { + } else if (lock_type != TL_IGNORE) { m_lock_rows = RDB_LOCK_NONE; if (THDVAR(thd, lock_scanned_rows)) { /* @@ -10736,6 +11144,13 @@ int ha_rocksdb::external_lock(THD *const thd, int lock_type) { DBUG_RETURN(HA_ERR_UNSUPPORTED); } +#ifdef MARIADB_NOT_YET + if (thd->get_explicit_snapshot()) { + my_error(ER_UPDATES_WITH_EXPLICIT_SNAPSHOT, MYF(0)); + DBUG_RETURN(HA_ERR_UNSUPPORTED); + } +#endif + /* SQL layer signals us to take a write lock. 
It does so when starting a DML statement. We should put locks on the rows we're reading. @@ -10957,8 +11372,6 @@ void Rdb_drop_index_thread::run() { } Rdb_tbl_def *ha_rocksdb::get_table_if_exists(const char *const tablename) { - DBUG_ASSERT(tablename != nullptr); - std::string str; if (rdb_normalize_tablename(tablename, &str) != HA_EXIT_SUCCESS) { // We were not passed a table name? @@ -11021,6 +11434,12 @@ int ha_rocksdb::remove_rows(Rdb_tbl_def *const tbl) { char key_buf[MAX_KEY_LENGTH]; uint key_len; ulonglong bytes_written = 0; + + uchar lower_bound_buf[Rdb_key_def::INDEX_NUMBER_SIZE]; + uchar upper_bound_buf[Rdb_key_def::INDEX_NUMBER_SIZE]; + rocksdb::Slice lower_bound_slice; + rocksdb::Slice upper_bound_slice; + /* Remove all records in each index. (This is not crash-safe, but it doesn't matter, because bulk row @@ -11031,13 +11450,12 @@ int ha_rocksdb::remove_rows(Rdb_tbl_def *const tbl) { kd.get_infimum_key(reinterpret_cast(key_buf), &key_len); rocksdb::ColumnFamilyHandle *cf = kd.get_cf(); const rocksdb::Slice table_key(key_buf, key_len); - setup_iterator_bounds(kd, table_key, - m_eq_cond_lower_bound, - m_eq_cond_upper_bound, - &m_eq_cond_lower_bound_slice, - &m_eq_cond_upper_bound_slice); - opts.iterate_lower_bound = &m_eq_cond_lower_bound_slice; - opts.iterate_upper_bound = &m_eq_cond_upper_bound_slice; + setup_iterator_bounds(kd, table_key, Rdb_key_def::INDEX_NUMBER_SIZE, + lower_bound_buf, upper_bound_buf, &lower_bound_slice, + &upper_bound_slice); + DBUG_ASSERT(key_len == Rdb_key_def::INDEX_NUMBER_SIZE); + opts.iterate_lower_bound = &lower_bound_slice; + opts.iterate_upper_bound = &upper_bound_slice; std::unique_ptr it(rdb->NewIterator(opts, cf)); it->Seek(table_key); @@ -11076,9 +11494,6 @@ int ha_rocksdb::remove_rows(Rdb_tbl_def *const tbl) { int ha_rocksdb::rename_table(const char *const from, const char *const to) { DBUG_ENTER_FUNC(); - DBUG_ASSERT(from != nullptr); - DBUG_ASSERT(to != nullptr); - std::string from_str; std::string to_str; std::string from_db; @@ -11329,29 +11744,28 @@ int ha_rocksdb::optimize(THD *const thd, HA_CHECK_OPT *const check_opt) { DBUG_RETURN(HA_EXIT_SUCCESS); } -int ha_rocksdb::calculate_stats(const TABLE *const table_arg, THD *const thd, - HA_CHECK_OPT *const check_opt) { +static int calculate_stats( + const std::unordered_map> + &to_recalc, + bool include_memtables) { DBUG_ENTER_FUNC(); // find per column family key ranges which need to be queried std::unordered_map> ranges; - std::unordered_set ids_to_check; - std::vector buf(table_arg->s->keys * 2 * - Rdb_key_def::INDEX_NUMBER_SIZE); std::unordered_map stats; - for (uint i = 0; i < table_arg->s->keys; i++) { - const auto bufp = &buf[i * 2 * Rdb_key_def::INDEX_NUMBER_SIZE]; - const Rdb_key_def &kd = *m_key_descr_arr[i]; - const GL_INDEX_ID index_id = kd.get_gl_index_id(); - ranges[kd.get_cf()].push_back(get_range(i, bufp)); + std::vector buf(to_recalc.size() * 2 * Rdb_key_def::INDEX_NUMBER_SIZE); + + uchar *bufp = buf.data(); + for (const auto &it : to_recalc) { + const GL_INDEX_ID index_id = it.first; + auto &kd = it.second; + ranges[kd->get_cf()].push_back(myrocks::get_range(*kd, bufp)); + bufp += 2 * Rdb_key_def::INDEX_NUMBER_SIZE; - ids_to_check.insert(index_id); - // Initialize the stats to 0. If there are no files that contain - // this gl_index_id, then 0 should be stored for the cached stats.
stats[index_id] = Rdb_index_stats(index_id); - DBUG_ASSERT(kd.get_key_parts() > 0); - stats[index_id].m_distinct_keys_per_prefix.resize(kd.get_key_parts()); + DBUG_ASSERT(kd->get_key_parts() > 0); + stats[index_id].m_distinct_keys_per_prefix.resize(kd->get_key_parts()); } // get RocksDB table properties for these ranges @@ -11362,8 +11776,8 @@ int ha_rocksdb::calculate_stats(const TABLE *const table_arg, THD *const thd, it.first, &it.second[0], it.second.size(), &props); DBUG_ASSERT(props.size() >= old_size); if (!status.ok()) { - DBUG_RETURN( - rdb_error_to_mysql(status, "Could not access RocksDB properties")); + DBUG_RETURN(ha_rocksdb::rdb_error_to_mysql( + status, "Could not access RocksDB properties")); } } @@ -11384,61 +11798,62 @@ int ha_rocksdb::calculate_stats(const TABLE *const table_arg, THD *const thd, other SQL tables, it can be that we're only seeing a small fraction of table's entries (and so we can't update statistics based on that). */ - if (ids_to_check.find(it1.m_gl_index_id) == ids_to_check.end()) + if (stats.find(it1.m_gl_index_id) == stats.end()) { continue; + } - auto kd = ddl_manager.safe_find(it1.m_gl_index_id); - DBUG_ASSERT(kd != nullptr); - stats[it1.m_gl_index_id].merge(it1, true, kd->max_storage_fmt_length()); + auto it_index = to_recalc.find(it1.m_gl_index_id); + DBUG_ASSERT(it_index != to_recalc.end()); + if (it_index == to_recalc.end()) { + continue; + } + stats[it1.m_gl_index_id].merge( + it1, true, it_index->second->max_storage_fmt_length()); } num_sst++; } - // calculate memtable cardinality - Rdb_tbl_card_coll cardinality_collector(rocksdb_table_stats_sampling_pct); - auto read_opts = rocksdb::ReadOptions(); - read_opts.read_tier = rocksdb::ReadTier::kMemtableTier; - for (uint i = 0; i < table_arg->s->keys; i++) { - const Rdb_key_def &kd = *m_key_descr_arr[i]; - Rdb_index_stats &stat = stats[kd.get_gl_index_id()]; + if (include_memtables) { + // calculate memtable cardinality + Rdb_tbl_card_coll cardinality_collector(rocksdb_table_stats_sampling_pct); + auto read_opts = rocksdb::ReadOptions(); + read_opts.read_tier = rocksdb::ReadTier::kMemtableTier; + for (const auto &it_kd : to_recalc) { + const std::shared_ptr &kd = it_kd.second; + Rdb_index_stats &stat = stats[kd->get_gl_index_id()]; - uchar r_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2]; - auto r = get_range(i, r_buf); - uint64_t memtableCount; - uint64_t memtableSize; - rdb->GetApproximateMemTableStats(kd.get_cf(), r, &memtableCount, - &memtableSize); - if (memtableCount < (uint64_t)stat.m_rows / 10) { - // skip tables that already have enough stats from SST files to reduce - // overhead and avoid degradation of big tables stats by sampling from - // relatively tiny (less than 10% of full data set) memtable dataset - continue; - } - - std::unique_ptr it = std::unique_ptr( - rdb->NewIterator(read_opts, kd.get_cf())); - - uchar *first_key; - uint key_size; - if (is_pk(i, table, m_tbl_def)) { - first_key = m_pk_packed_tuple; - } else { - first_key = m_sk_packed_tuple; - } - kd.get_first_key(first_key, &key_size); - rocksdb::Slice first_index_key((const char *)first_key, key_size); - - cardinality_collector.Reset(); - for (it->Seek(first_index_key); is_valid(it.get()); it->Next()) { - const rocksdb::Slice key = it->key(); - if (!kd.covers_key(key)) { - break; // end of this index + uchar r_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2]; + auto r = myrocks::get_range(*kd, r_buf); + uint64_t memtableCount; + uint64_t memtableSize; + rdb->GetApproximateMemTableStats(kd->get_cf(), r, &memtableCount, + &memtableSize); 
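+      // memtableCount / memtableSize are RocksDB's rough estimates of how many entries and bytes of this index's key range are still memtable-resident, i.e. not yet reflected in the SST-file statistics merged above.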
+ if (memtableCount < (uint64_t)stat.m_rows / 10) { + // skip tables that already have enough stats from SST files to reduce + // overhead and avoid degradation of big tables stats by sampling from + // relatively tiny (less than 10% of full data set) memtable dataset + continue; } - stat.m_rows++; - cardinality_collector.ProcessKey(key, &kd, &stat); + std::unique_ptr it = + std::unique_ptr( + rdb->NewIterator(read_opts, kd->get_cf())); + + rocksdb::Slice first_index_key((const char *)r_buf, + Rdb_key_def::INDEX_NUMBER_SIZE); + + cardinality_collector.Reset(); + for (it->Seek(first_index_key); is_valid(it.get()); it->Next()) { + const rocksdb::Slice key = it->key(); + if (!kd->covers_key(key)) { + break; // end of this index + } + stat.m_rows++; + + cardinality_collector.ProcessKey(key, kd.get(), &stat); + } + cardinality_collector.AdjustStats(&stat); } - cardinality_collector.AdjustStats(&stat); } // set and persist new stats @@ -11448,6 +11863,19 @@ int ha_rocksdb::calculate_stats(const TABLE *const table_arg, THD *const thd, DBUG_RETURN(HA_EXIT_SUCCESS); } +int ha_rocksdb::calculate_stats_for_table() { + DBUG_ENTER_FUNC(); + + std::unordered_map> + ids_to_check; + for (uint i = 0; i < table->s->keys; i++) { + ids_to_check.insert(std::make_pair(m_key_descr_arr[i]->get_gl_index_id(), + m_key_descr_arr[i])); + } + + DBUG_RETURN(calculate_stats(ids_to_check, true)); +} + /* @return HA_ADMIN_OK OK @@ -11456,7 +11884,16 @@ int ha_rocksdb::calculate_stats(const TABLE *const table_arg, THD *const thd, int ha_rocksdb::analyze(THD *const thd, HA_CHECK_OPT *const check_opt) { DBUG_ENTER_FUNC(); - if (table && calculate_stats(table, thd, check_opt) != HA_EXIT_SUCCESS) { + if (table) { + if (calculate_stats_for_table() != HA_EXIT_SUCCESS) { + DBUG_RETURN(HA_ADMIN_FAILED); + } + } + + // A call to ::info is needed to repopulate some SQL level structs. This is + // necessary for online analyze because we cannot rely on another ::open + // call to call info for us. + if (info(HA_STATUS_CONST | HA_STATUS_VARIABLE) != HA_EXIT_SUCCESS) { DBUG_RETURN(HA_ADMIN_FAILED); } @@ -12286,18 +12723,6 @@ bool ha_rocksdb::commit_inplace_alter_table( dict_manager.finish_indexes_operation( create_index_ids, Rdb_key_def::DDL_CREATE_INDEX_ONGOING); - /* - We need to recalculate the index stats here manually. The reason is that - the secondary index does not exist inside - m_index_num_to_keydef until it is committed to the data dictionary, which - prevents us from updating the stats normally as the ddl_manager cannot - find the proper gl_index_ids yet during adjust_stats calls. 
- */ - if (calculate_stats(altered_table, nullptr, nullptr)) { - /* Failed to update index statistics, should never happen */ - DBUG_ASSERT(0); - } - rdb_drop_idx_thread.signal(); } @@ -12369,6 +12794,8 @@ struct rocksdb_status_counters_t { uint64_t block_cache_data_hit; uint64_t block_cache_data_add; uint64_t bloom_filter_useful; + uint64_t bloom_filter_full_positive; + uint64_t bloom_filter_full_true_positive; uint64_t memtable_hit; uint64_t memtable_miss; uint64_t get_hit_l0; @@ -12443,6 +12870,8 @@ DEF_SHOW_FUNC(block_cache_data_miss, BLOCK_CACHE_DATA_MISS) DEF_SHOW_FUNC(block_cache_data_hit, BLOCK_CACHE_DATA_HIT) DEF_SHOW_FUNC(block_cache_data_add, BLOCK_CACHE_DATA_ADD) DEF_SHOW_FUNC(bloom_filter_useful, BLOOM_FILTER_USEFUL) +DEF_SHOW_FUNC(bloom_filter_full_positive, BLOOM_FILTER_FULL_POSITIVE) +DEF_SHOW_FUNC(bloom_filter_full_true_positive, BLOOM_FILTER_FULL_TRUE_POSITIVE) DEF_SHOW_FUNC(memtable_hit, MEMTABLE_HIT) DEF_SHOW_FUNC(memtable_miss, MEMTABLE_MISS) DEF_SHOW_FUNC(get_hit_l0, GET_HIT_L0) @@ -12677,6 +13106,8 @@ static SHOW_VAR rocksdb_status_vars[] = { DEF_STATUS_VAR(block_cache_data_hit), DEF_STATUS_VAR(block_cache_data_add), DEF_STATUS_VAR(bloom_filter_useful), + DEF_STATUS_VAR(bloom_filter_full_positive), + DEF_STATUS_VAR(bloom_filter_full_true_positive), DEF_STATUS_VAR(memtable_hit), DEF_STATUS_VAR(memtable_miss), DEF_STATUS_VAR(get_hit_l0), @@ -12734,6 +13165,10 @@ static SHOW_VAR rocksdb_status_vars[] = { &rocksdb_snapshot_conflict_errors, SHOW_LONGLONG), DEF_STATUS_VAR_PTR("wal_group_syncs", &rocksdb_wal_group_syncs, SHOW_LONGLONG), + DEF_STATUS_VAR_PTR("manual_compactions_processed", + &rocksdb_manual_compactions_processed, SHOW_LONGLONG), + DEF_STATUS_VAR_PTR("manual_compactions_running", + &rocksdb_manual_compactions_running, SHOW_LONGLONG), DEF_STATUS_VAR_PTR("number_sst_entry_put", &rocksdb_num_sst_entry_put, SHOW_LONGLONG), DEF_STATUS_VAR_PTR("number_sst_entry_delete", &rocksdb_num_sst_entry_delete, @@ -12807,24 +13242,194 @@ void Rdb_background_thread::run() { rdb_handle_io_error(s, RDB_IO_ERROR_BG_THREAD); } } + // Recalculate statistics for indexes. + if (rocksdb_stats_recalc_rate) { + std::unordered_map> + to_recalc; + + if (rdb_indexes_to_recalc.empty()) { + struct Rdb_index_collector : public Rdb_tables_scanner { + int add_table(Rdb_tbl_def *tdef) override { + for (uint i = 0; i < tdef->m_key_count; i++) { + rdb_indexes_to_recalc.push_back( + tdef->m_key_descr_arr[i]->get_gl_index_id()); + } + return HA_EXIT_SUCCESS; + } + } collector; + ddl_manager.scan_for_tables(&collector); + } + + while (to_recalc.size() < rocksdb_stats_recalc_rate && + !rdb_indexes_to_recalc.empty()) { + const auto index_id = rdb_indexes_to_recalc.back(); + rdb_indexes_to_recalc.pop_back(); + + std::shared_ptr keydef = + ddl_manager.safe_find(index_id); + + if (keydef) { + to_recalc.insert(std::make_pair(keydef->get_gl_index_id(), keydef)); + } + } + + if (!to_recalc.empty()) { + calculate_stats(to_recalc, false); + } + } + } // save remaining stats which might've been left unsaved ddl_manager.persist_stats(); } -bool ha_rocksdb::check_bloom_and_set_bounds(THD *thd, const Rdb_key_def &kd, - const rocksdb::Slice &eq_cond, - const bool use_all_keys, - uchar *lower_bound_buf, - uchar *upper_bound_buf, - rocksdb::Slice *out_lower_bound, - rocksdb::Slice *out_upper_bound) { +/* + A background thread to handle manual compactions, + except for dropping indexes/tables. Every second, it checks for + pending manual compactions, and calls CompactRange if one is queued.
+*/ +void Rdb_manual_compaction_thread::run() { + mysql_mutex_init(0, &m_mc_mutex, MY_MUTEX_INIT_FAST); + RDB_MUTEX_LOCK_CHECK(m_signal_mutex); + for (;;) { + if (m_stop) { + break; + } + timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += 1; + + const auto ret MY_ATTRIBUTE((__unused__)) = + mysql_cond_timedwait(&m_signal_cond, &m_signal_mutex, &ts); + if (m_stop) { + break; + } + // make sure no program error was returned + DBUG_ASSERT(ret == 0 || ret == ETIMEDOUT); + RDB_MUTEX_UNLOCK_CHECK(m_signal_mutex); + + RDB_MUTEX_LOCK_CHECK(m_mc_mutex); + // Grab the first request and proceed, if the queue is not empty. + if (m_requests.empty()) { + RDB_MUTEX_UNLOCK_CHECK(m_mc_mutex); + RDB_MUTEX_LOCK_CHECK(m_signal_mutex); + continue; + } + Manual_compaction_request &mcr = m_requests.begin()->second; + DBUG_ASSERT(mcr.cf != nullptr); + DBUG_ASSERT(mcr.state == Manual_compaction_request::INITED); + mcr.state = Manual_compaction_request::RUNNING; + RDB_MUTEX_UNLOCK_CHECK(m_mc_mutex); + + DBUG_ASSERT(mcr.state == Manual_compaction_request::RUNNING); + // NO_LINT_DEBUG + sql_print_information("Manual Compaction id %d cf %s started.", mcr.mc_id, + mcr.cf->GetName().c_str()); + rocksdb_manual_compactions_running++; + if (rocksdb_debug_manual_compaction_delay > 0) { + my_sleep(rocksdb_debug_manual_compaction_delay * 1000000); + } + // CompactRange may take a very long time. On clean shutdown, + // it is cancelled by CancelAllBackgroundWork, and the status is + // then set to ShutdownInProgress. + const rocksdb::Status s = rdb->CompactRange( + getCompactRangeOptions(mcr.concurrency), mcr.cf, mcr.start, mcr.limit); + rocksdb_manual_compactions_running--; + if (s.ok()) { + // NO_LINT_DEBUG + sql_print_information("Manual Compaction id %d cf %s ended.", mcr.mc_id, + mcr.cf->GetName().c_str()); + } else { + // NO_LINT_DEBUG + sql_print_information("Manual Compaction id %d cf %s aborted. %s", + mcr.mc_id, mcr.cf->GetName().c_str(), s.getState()); + if (!s.IsShutdownInProgress()) { + rdb_handle_io_error(s, RDB_IO_ERROR_BG_THREAD); + } else { + DBUG_ASSERT(m_requests.size() == 1); + } + } + rocksdb_manual_compactions_processed++; + clear_manual_compaction_request(mcr.mc_id, false); + RDB_MUTEX_LOCK_CHECK(m_signal_mutex); + } + clear_all_manual_compaction_requests(); + DBUG_ASSERT(m_requests.empty()); + RDB_MUTEX_UNLOCK_CHECK(m_signal_mutex); + mysql_mutex_destroy(&m_mc_mutex); +} + +void Rdb_manual_compaction_thread::clear_all_manual_compaction_requests() { + RDB_MUTEX_LOCK_CHECK(m_mc_mutex); + m_requests.clear(); + RDB_MUTEX_UNLOCK_CHECK(m_mc_mutex); +} + +void Rdb_manual_compaction_thread::clear_manual_compaction_request( + int mc_id, bool init_only) { + bool erase = true; + RDB_MUTEX_LOCK_CHECK(m_mc_mutex); + auto it = m_requests.find(mc_id); + if (it != m_requests.end()) { + if (init_only) { + Manual_compaction_request mcr = it->second; + if (mcr.state != Manual_compaction_request::INITED) { + erase = false; + } + } + if (erase) { + m_requests.erase(it); + } + } else { + // Current code path guarantees that erasing by the same mc_id happens + // at most once. INITED state may be erased by a thread that requested + // the compaction. RUNNING state is erased by mc thread only.
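+    // Reaching this branch means an mc_id was never registered or was already erased, which the contract above rules out; hence the debug-build assert below.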
+ DBUG_ASSERT(0); + } + RDB_MUTEX_UNLOCK_CHECK(m_mc_mutex); +} + +int Rdb_manual_compaction_thread::request_manual_compaction( + rocksdb::ColumnFamilyHandle *cf, rocksdb::Slice *start, + rocksdb::Slice *limit, int concurrency) { + int mc_id = -1; + RDB_MUTEX_LOCK_CHECK(m_mc_mutex); + if (m_requests.size() >= rocksdb_max_manual_compactions) { + RDB_MUTEX_UNLOCK_CHECK(m_mc_mutex); + return mc_id; + } + Manual_compaction_request mcr; + mc_id = mcr.mc_id = ++m_latest_mc_id; + mcr.state = Manual_compaction_request::INITED; + mcr.cf = cf; + mcr.start = start; + mcr.limit = limit; + mcr.concurrency = concurrency; + m_requests.insert(std::make_pair(mcr.mc_id, mcr)); + RDB_MUTEX_UNLOCK_CHECK(m_mc_mutex); + return mc_id; +} + +bool Rdb_manual_compaction_thread::is_manual_compaction_finished(int mc_id) { + bool finished = false; + RDB_MUTEX_LOCK_CHECK(m_mc_mutex); + if (m_requests.count(mc_id) == 0) { + finished = true; + } + RDB_MUTEX_UNLOCK_CHECK(m_mc_mutex); + return finished; +} + +bool ha_rocksdb::check_bloom_and_set_bounds( + THD *thd, const Rdb_key_def &kd, const rocksdb::Slice &eq_cond, + const bool use_all_keys, size_t bound_len, uchar *const lower_bound, + uchar *const upper_bound, rocksdb::Slice *lower_bound_slice, + rocksdb::Slice *upper_bound_slice) { bool can_use_bloom = can_use_bloom_filter(thd, kd, eq_cond, use_all_keys); if (!can_use_bloom) { - setup_iterator_bounds(kd, eq_cond, - lower_bound_buf, upper_bound_buf, - out_lower_bound, out_upper_bound); + setup_iterator_bounds(kd, eq_cond, bound_len, lower_bound, upper_bound, + lower_bound_slice, upper_bound_slice); } return can_use_bloom; } @@ -12934,7 +13539,6 @@ void rdb_update_global_stats(const operation_type &type, uint count, int rdb_get_table_perf_counters(const char *const tablename, Rdb_perf_counters *const counters) { - DBUG_ASSERT(counters != nullptr); DBUG_ASSERT(tablename != nullptr); Rdb_table_handler *table_handler; @@ -12974,10 +13578,7 @@ const char *get_rdb_io_error_string(const RDB_IO_ERROR_TYPE err_type) { // so that we can capture as much data as possible to debug the root cause // more efficiently. #ifdef __GNUC__ -#pragma GCC push_options -#pragma GCC optimize("O0") #endif - void rdb_handle_io_error(const rocksdb::Status status, const RDB_IO_ERROR_TYPE err_type) { if (status.IsIOError()) { @@ -12992,6 +13593,9 @@ void rdb_handle_io_error(const rocksdb::Status status, } case RDB_IO_ERROR_BG_THREAD: { rdb_log_status_error(status, "BG thread failed to write to RocksDB"); + /* NO_LINT_DEBUG */ + sql_print_error("MyRocks: aborting on BG write error."); + abort(); break; } case RDB_IO_ERROR_GENERAL: { @@ -13027,9 +13631,7 @@ void rdb_handle_io_error(const rocksdb::Status status, } } #ifdef __GNUC__ -#pragma GCC pop_options #endif - Rdb_dict_manager *rdb_get_dict_manager(void) { return &dict_manager; } Rdb_ddl_manager *rdb_get_ddl_manager(void) { return &ddl_manager; } @@ -13330,6 +13932,42 @@ static void rocksdb_set_wal_bytes_per_sync( RDB_MUTEX_UNLOCK_CHECK(rdb_sysvars_mutex); } +/* + Validate and update the block cache size via the sys_var::check path. + SetCapacity may take seconds when reducing the block cache, and + sys_var::update holds the LOCK_global_system_variables mutex, so + the block cache size is updated in the check path instead.
+*/ +static int rocksdb_validate_set_block_cache_size( + THD *thd MY_ATTRIBUTE((__unused__)), + struct st_mysql_sys_var *const var MY_ATTRIBUTE((__unused__)), + void *var_ptr, struct st_mysql_value *value) { + DBUG_ASSERT(value != nullptr); + + long long new_value; + + /* value is NULL */ + if (value->val_int(value, &new_value)) { + return HA_EXIT_FAILURE; + } + + if (new_value < RDB_MIN_BLOCK_CACHE_SIZE || + (uint64_t)new_value > (uint64_t)LONGLONG_MAX) { + return HA_EXIT_FAILURE; + } + + RDB_MUTEX_LOCK_CHECK(rdb_block_cache_resize_mutex); + const rocksdb::BlockBasedTableOptions &table_options = + rdb_get_table_options(); + + if (rocksdb_block_cache_size != new_value && table_options.block_cache) { + table_options.block_cache->SetCapacity(new_value); + } + *static_cast(var_ptr) = static_cast(new_value); + RDB_MUTEX_UNLOCK_CHECK(rdb_block_cache_resize_mutex); + return HA_EXIT_SUCCESS; +} + static int rocksdb_validate_update_cf_options(THD * /* unused */, struct st_mysql_sys_var * /*unused*/, @@ -13511,6 +14149,13 @@ double ha_rocksdb::read_time(uint index, uint ranges, ha_rows rows) { DBUG_RETURN((rows / 20.0) + 1); } +void ha_rocksdb::print_error(int error, myf errflag) { + if (error == HA_ERR_ROCKSDB_STATUS_BUSY) { + error = HA_ERR_LOCK_DEADLOCK; + } + handler::print_error(error, errflag); +} + std::string rdb_corruption_marker_file_name() { std::string ret(rocksdb_datadir); ret.append("/ROCKSDB_CORRUPTED"); @@ -13573,7 +14218,7 @@ maria_declare_plugin(rocksdb_se){ myrocks::rdb_i_s_perf_context, myrocks::rdb_i_s_perf_context_global, myrocks::rdb_i_s_cfoptions, myrocks::rdb_i_s_compact_stats, myrocks::rdb_i_s_global_info, myrocks::rdb_i_s_ddl, - myrocks::rdb_i_s_index_file_map, myrocks::rdb_i_s_lock_info, - myrocks::rdb_i_s_trx_info, + myrocks::rdb_i_s_sst_props, myrocks::rdb_i_s_index_file_map, + myrocks::rdb_i_s_lock_info, myrocks::rdb_i_s_trx_info, myrocks::rdb_i_s_deadlock_info maria_declare_plugin_end; diff --git a/storage/rocksdb/ha_rocksdb.h b/storage/rocksdb/ha_rocksdb.h index 739a2c8cdd6..c4447109b0d 100644 --- a/storage/rocksdb/ha_rocksdb.h +++ b/storage/rocksdb/ha_rocksdb.h @@ -107,6 +107,7 @@ struct Rdb_deadlock_info { std::string table_name; }; std::vector path; + int64_t deadlock_time; ulonglong victim_trx_id; }; @@ -147,6 +148,11 @@ const char *const BG_THREAD_NAME = "myrocks-bg"; */ const char *const INDEX_THREAD_NAME = "myrocks-index"; +/* + Name for the manual compaction thread. +*/ +const char *const MANUAL_COMPACTION_THREAD_NAME = "myrocks-mc"; + /* Separator between partition name and the qualifier. Sample usage: @@ -503,6 +509,12 @@ class ha_rocksdb : public my_core::handler { const rocksdb::Snapshot *m_scan_it_snapshot; + /* Buffers used for upper/lower bounds for m_scan_it. */ + uchar *m_scan_it_lower_bound; + uchar *m_scan_it_upper_bound; + rocksdb::Slice m_scan_it_lower_bound_slice; + rocksdb::Slice m_scan_it_upper_bound_slice; + Rdb_tbl_def *m_tbl_def; /* Primary Key encoder from KeyTupleFormat to StorageFormat */ @@ -566,12 +578,6 @@ class ha_rocksdb : public my_core::handler { uchar *m_dup_sk_packed_tuple; uchar *m_dup_sk_packed_tuple_old; - /* Buffers used for passing upper/bound eq conditions. */ - uchar *m_eq_cond_lower_bound; - uchar *m_eq_cond_upper_bound; - rocksdb::Slice m_eq_cond_lower_bound_slice; - rocksdb::Slice m_eq_cond_upper_bound_slice; - /* Temporary space for packing VARCHARs (we provide it to pack_record()/pack_index_tuple() calls). 
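Taken together, the manual-compaction pieces above form a small request/poll API: request_manual_compaction() queues an INITED request and returns its id, with -1 meaning that rocksdb_max_manual_compactions requests are already pending; the worker thread wakes about once a second, promotes one request to RUNNING, and drives CompactRange; callers poll is_manual_compaction_finished(). A minimal caller-side sketch (the helper name compact_cf_and_wait and the thread-instance pointer are hypothetical, not part of the patch):

// Hypothetical helper sketching how a caller drives the manual compaction
// API added above; assumes mc_thread points at the engine's
// Rdb_manual_compaction_thread instance and cf is a valid handle.
static int compact_cf_and_wait(Rdb_manual_compaction_thread *mc_thread,
                               rocksdb::ColumnFamilyHandle *cf) {
  // nullptr start/limit slices request compaction of the whole column family.
  const int mc_id = mc_thread->request_manual_compaction(
      cf, nullptr, nullptr, /* concurrency, i.e. max_subcompactions */ 1);
  if (mc_id == -1) {
    return HA_EXIT_FAILURE;  // rocksdb_max_manual_compactions already queued
  }
  while (!mc_thread->is_manual_compaction_finished(mc_id)) {
    my_sleep(100000);  // poll every 100ms; the worker wakes ~once per second
  }
  return HA_EXIT_SUCCESS;
}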
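The header hunk below carries the matching declarations for the bound-setup rework shown earlier: setup_iterator_bounds() now copies at most bound_len bytes of the equal-condition prefix into caller-supplied buffers (Rdb_key_def::INDEX_NUMBER_SIZE in remove_rows()) and derives the iterator bounds as the prefix's successor and predecessor, swapping the two slices for reverse column families. A minimal sketch of that byte arithmetic, assuming kd.successor()/kd.predecessor() perform plain big-endian carry/borrow over the memcmp-ordered key bytes (an illustrative assumption, not taken from the patch):

#include <cstddef>
#include <cstdint>

// Successor: treat buf[0..len) as a big-endian integer and add one with
// carry; every key starting with the original prefix then compares strictly
// below buf (ignoring the all-0xFF edge case), so it serves as an exclusive
// upper bound.
static void successor(uint8_t *buf, size_t len) {
  for (size_t i = len; i-- > 0;) {
    if (++buf[i] != 0x00) break;  // stop once a byte did not wrap around
  }
}

// Predecessor: the mirror image, subtracting one with borrow, yielding a
// buffer that compares strictly below every key carrying the prefix.
static void predecessor(uint8_t *buf, size_t len) {
  for (size_t i = len; i-- > 0;) {
    if (--buf[i] != 0xFF) break;  // stop once a byte did not wrap around
  }
}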
@@ -653,21 +659,20 @@ class ha_rocksdb : public my_core::handler { enum ha_rkey_function find_flag) const MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); void setup_iterator_bounds(const Rdb_key_def &kd, - const rocksdb::Slice &eq_cond, - uchar *lower_bound_buf, - uchar *upper_bound_buf, - rocksdb::Slice *out_lower_bound, - rocksdb::Slice *out_upper_bound); + const rocksdb::Slice &eq_cond, size_t bound_len, + uchar *const lower_bound, uchar *const upper_bound, + rocksdb::Slice *lower_bound_slice, + rocksdb::Slice *upper_bound_slice); bool can_use_bloom_filter(THD *thd, const Rdb_key_def &kd, const rocksdb::Slice &eq_cond, const bool use_all_keys); bool check_bloom_and_set_bounds(THD *thd, const Rdb_key_def &kd, const rocksdb::Slice &eq_cond, - const bool use_all_keys, - uchar *lower_bound_buf, - uchar *upper_bound_buf, - rocksdb::Slice *out_lower_bound, - rocksdb::Slice *out_upper_bound); + const bool use_all_keys, size_t bound_len, + uchar *const lower_bound, + uchar *const upper_bound, + rocksdb::Slice *lower_bound_slice, + rocksdb::Slice *upper_bound_slice); void setup_scan_iterator(const Rdb_key_def &kd, rocksdb::Slice *slice, const bool use_all_keys, const uint eq_cond_len) MY_ATTRIBUTE((__nonnull__)); @@ -1053,6 +1058,7 @@ public: } virtual double read_time(uint, uint, ha_rows rows) override; + virtual void print_error(int error, myf errflag) override; int open(const char *const name, int mode, uint test_if_locked) override MY_ATTRIBUTE((__warn_unused_result__)); @@ -1167,8 +1173,8 @@ private: MY_ATTRIBUTE((__nonnull__)); int compare_key_parts(const KEY *const old_key, - const KEY *const new_key) const; - MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); + const KEY *const new_key) const + MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); int compare_keys(const KEY *const old_key, const KEY *const new_key) const MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); @@ -1223,7 +1229,7 @@ private: int update_pk(const Rdb_key_def &kd, const struct update_row_info &row_info, const bool &pk_changed) MY_ATTRIBUTE((__warn_unused_result__)); int update_sk(const TABLE *const table_arg, const Rdb_key_def &kd, - const struct update_row_info &row_info) + const struct update_row_info &row_info, const bool bulk_load_sk) MY_ATTRIBUTE((__warn_unused_result__)); int update_indexes(const struct update_row_info &row_info, const bool &pk_changed) @@ -1277,7 +1283,9 @@ private: int finalize_bulk_load(bool print_client_error = true) MY_ATTRIBUTE((__warn_unused_result__)); -public: + int calculate_stats_for_table() MY_ATTRIBUTE((__warn_unused_result__)); + + public: int index_init(uint idx, bool sorted) override MY_ATTRIBUTE((__warn_unused_result__)); int index_end() override MY_ATTRIBUTE((__warn_unused_result__)); @@ -1370,9 +1378,6 @@ public: MY_ATTRIBUTE((__warn_unused_result__)); int analyze(THD *const thd, HA_CHECK_OPT *const check_opt) override MY_ATTRIBUTE((__warn_unused_result__)); - int calculate_stats(const TABLE *const table_arg, THD *const thd, - HA_CHECK_OPT *const check_opt) - MY_ATTRIBUTE((__warn_unused_result__)); enum_alter_inplace_result check_if_supported_inplace_alter( TABLE *altered_table, @@ -1402,7 +1407,7 @@ public: virtual void rpl_after_delete_rows() override; virtual void rpl_before_update_rows() override; virtual void rpl_after_update_rows() override; - virtual bool use_read_free_rpl(); + virtual bool use_read_free_rpl() override; #endif // MARIAROCKS_NOT_YET private: diff --git a/storage/rocksdb/ha_rocksdb_proto.h b/storage/rocksdb/ha_rocksdb_proto.h index 
85c3968cc99..deb65edddd3 100644 --- a/storage/rocksdb/ha_rocksdb_proto.h +++ b/storage/rocksdb/ha_rocksdb_proto.h @@ -39,7 +39,12 @@ enum RDB_IO_ERROR_TYPE { const char *get_rdb_io_error_string(const RDB_IO_ERROR_TYPE err_type); void rdb_handle_io_error(const rocksdb::Status status, - const RDB_IO_ERROR_TYPE err_type); + const RDB_IO_ERROR_TYPE err_type) +#if defined(__clang__) + MY_ATTRIBUTE((optnone)); +#else + MY_ATTRIBUTE((optimize("O0"))); +#endif int rdb_normalize_tablename(const std::string &tablename, std::string *str) MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); diff --git a/storage/rocksdb/mysql-test/rocksdb/combinations b/storage/rocksdb/mysql-test/rocksdb/combinations new file mode 100644 index 00000000000..c3f6b9d0396 --- /dev/null +++ b/storage/rocksdb/mysql-test/rocksdb/combinations @@ -0,0 +1,6 @@ +[write_committed] +rocksdb_write_policy=write_committed + +[write_prepared] +rocksdb_write_policy=write_prepared +rocksdb_commit_time_batch_for_recovery=on diff --git a/storage/rocksdb/mysql-test/rocksdb/include/have_write_committed.inc b/storage/rocksdb/mysql-test/rocksdb/include/have_write_committed.inc new file mode 100644 index 00000000000..681b966f680 --- /dev/null +++ b/storage/rocksdb/mysql-test/rocksdb/include/have_write_committed.inc @@ -0,0 +1,3 @@ +if (`select count(*) = 0 from information_schema.session_variables where variable_name = 'rocksdb_write_policy' and variable_value = 'write_committed';`) { + --skip Test requires write_committed policy +} diff --git a/storage/rocksdb/mysql-test/rocksdb/r/add_index_inplace.result b/storage/rocksdb/mysql-test/rocksdb/r/add_index_inplace.result index 6325dc97cf5..32c0537c780 100644 --- a/storage/rocksdb/mysql-test/rocksdb/r/add_index_inplace.result +++ b/storage/rocksdb/mysql-test/rocksdb/r/add_index_inplace.result @@ -299,11 +299,13 @@ connection con1; show global variables like 'rocksdb_bulk_load%'; Variable_name Value rocksdb_bulk_load ON +rocksdb_bulk_load_allow_sk OFF rocksdb_bulk_load_allow_unsorted OFF rocksdb_bulk_load_size 1000 show session variables like 'rocksdb_bulk_load%'; Variable_name Value rocksdb_bulk_load ON +rocksdb_bulk_load_allow_sk OFF rocksdb_bulk_load_allow_unsorted OFF rocksdb_bulk_load_size 1000 CREATE TABLE t1 (i INT, j INT, PRIMARY KEY (i)) ENGINE = ROCKSDB; @@ -356,6 +358,7 @@ SET session rocksdb_merge_buf_size = 340; show variables like 'rocksdb_bulk_load%'; Variable_name Value rocksdb_bulk_load OFF +rocksdb_bulk_load_allow_sk OFF rocksdb_bulk_load_allow_unsorted OFF rocksdb_bulk_load_size 1000 CREATE TABLE t1 (a VARCHAR(80)) ENGINE=RocksDB; @@ -463,3 +466,24 @@ t1 CREATE TABLE `t1` ( KEY `kb` (`b`(8)) ) ENGINE=ROCKSDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin DROP TABLE t1; +SET @prior_rocksdb_table_stats_sampling_pct = @@rocksdb_table_stats_sampling_pct; +set global rocksdb_table_stats_sampling_pct = 100; +CREATE TABLE t1 (a INT, b INT, PRIMARY KEY ka(a)) ENGINE=RocksDB; +INSERT INTO t1 (a, b) VALUES (1, 10); +INSERT INTO t1 (a, b) VALUES (2, 10); +INSERT INTO t1 (a, b) VALUES (3, 20); +INSERT INTO t1 (a, b) VALUES (4, 20); +set global rocksdb_force_flush_memtable_now=1; +analyze table t1; +Table Op Msg_type Msg_text +test.t1 analyze status OK +SHOW INDEX in t1; +Table Non_unique Key_name Seq_in_index Column_name Collation Cardinality Sub_part Packed Null Index_type Comment Index_comment +t1 0 PRIMARY 1 a A 4 NULL NULL LSMTREE +ALTER TABLE t1 ADD INDEX kb(b), ALGORITHM=INPLACE; +SHOW INDEX in t1; +Table Non_unique Key_name Seq_in_index Column_name Collation Cardinality Sub_part Packed Null 
Index_type Comment Index_comment +t1 0 PRIMARY 1 a A 4 NULL NULL LSMTREE +t1 1 kb 1 b A 2 NULL NULL YES LSMTREE +DROP TABLE t1; +SET global rocksdb_table_stats_sampling_pct = @prior_rocksdb_table_stats_sampling_pct; diff --git a/storage/rocksdb/mysql-test/rocksdb/r/add_index_inplace_sstfilewriter.result b/storage/rocksdb/mysql-test/rocksdb/r/add_index_inplace_sstfilewriter.result index 08f2329f688..0617232f1e3 100644 --- a/storage/rocksdb/mysql-test/rocksdb/r/add_index_inplace_sstfilewriter.result +++ b/storage/rocksdb/mysql-test/rocksdb/r/add_index_inplace_sstfilewriter.result @@ -17,7 +17,7 @@ ALTER TABLE t1 ADD INDEX kb(b), ALGORITHM=INPLACE; set @tmp= @@rocksdb_max_row_locks; set session rocksdb_max_row_locks=1000; ALTER TABLE t1 ADD INDEX kb_copy(b), ALGORITHM=COPY; -ERROR HY000: Status error 10 received from RocksDB: Operation aborted: Failed to acquire lock due to max_num_locks limit +ERROR HY000: Got error 10 'Operation aborted: Failed to acquire lock due to max_num_locks limit' from ROCKSDB set session rocksdb_bulk_load=1; ALTER TABLE t1 ADD INDEX kb_copy(b), ALGORITHM=COPY; set session rocksdb_bulk_load=0; diff --git a/storage/rocksdb/mysql-test/rocksdb/r/allow_no_primary_key.result b/storage/rocksdb/mysql-test/rocksdb/r/allow_no_primary_key.result index a8d5c07072c..de8a1fb4329 100644 --- a/storage/rocksdb/mysql-test/rocksdb/r/allow_no_primary_key.result +++ b/storage/rocksdb/mysql-test/rocksdb/r/allow_no_primary_key.result @@ -276,10 +276,10 @@ a 10 20 30 -connect con1,localhost,root,,; -connection con1; alter table t1 force; -connection default; +select * from t1; +a +insert into t1 values (100); select * from t1; a connection con1; diff --git a/storage/rocksdb/mysql-test/rocksdb/r/autoinc_debug.result b/storage/rocksdb/mysql-test/rocksdb/r/autoinc_debug.result index fe08cd7c361..604e5572eab 100644 --- a/storage/rocksdb/mysql-test/rocksdb/r/autoinc_debug.result +++ b/storage/rocksdb/mysql-test/rocksdb/r/autoinc_debug.result @@ -59,12 +59,10 @@ insert into t values (); set debug_dbug="+d,crash_commit_before"; commit; ERROR HY000: Lost connection to MySQL server during query -select table_schema, table_name, auto_increment from information_schema.tables where table_name = 't'; -table_schema table_name auto_increment -test t 4 -select max(i) from t; -max(i) -3 +select max(i) into @row_max from t; +select table_schema, table_name, auto_increment > @row_max from information_schema.tables where table_name = 't'; +table_schema table_name auto_increment > @row_max +test t 1 # After engine prepare begin; insert into t values (); @@ -72,12 +70,10 @@ insert into t values (); set debug_dbug="+d,crash_commit_after_prepare"; commit; ERROR HY000: Lost connection to MySQL server during query -select table_schema, table_name, auto_increment from information_schema.tables where table_name = 't'; -table_schema table_name auto_increment -test t 4 -select max(i) from t; -max(i) -3 +select max(i) into @row_max from t; +select table_schema, table_name, auto_increment > @row_max from information_schema.tables where table_name = 't'; +table_schema table_name auto_increment > @row_max +test t 1 # After binlog begin; insert into t values (); @@ -85,12 +81,10 @@ insert into t values (); set debug_dbug="+d,crash_commit_after_log"; commit; ERROR HY000: Lost connection to MySQL server during query -select table_schema, table_name, auto_increment from information_schema.tables where table_name = 't'; -table_schema table_name auto_increment -test t 6 -select max(i) from t; -max(i) -5 +select max(i) into 
@row_max from t; +select table_schema, table_name, auto_increment > @row_max from information_schema.tables where table_name = 't'; +table_schema table_name auto_increment > @row_max +test t 1 # After everything begin; insert into t values (); @@ -98,10 +92,8 @@ insert into t values (); set debug_dbug="+d,crash_commit_after"; commit; ERROR HY000: Lost connection to MySQL server during query -select table_schema, table_name, auto_increment from information_schema.tables where table_name = 't'; -table_schema table_name auto_increment -test t 8 -select max(i) from t; -max(i) -7 +select max(i) into @row_max from t; +select table_schema, table_name, auto_increment > @row_max from information_schema.tables where table_name = 't'; +table_schema table_name auto_increment > @row_max +test t 1 drop table t; diff --git a/storage/rocksdb/mysql-test/rocksdb/r/autoinc_vars.result b/storage/rocksdb/mysql-test/rocksdb/r/autoinc_vars.result index 0c496227006..f59b841a595 100644 --- a/storage/rocksdb/mysql-test/rocksdb/r/autoinc_vars.result +++ b/storage/rocksdb/mysql-test/rocksdb/r/autoinc_vars.result @@ -158,3 +158,21 @@ INSERT INTO t1 (a) VALUES (1); UPDATE t1 SET pk = 3; ALTER TABLE t1 AUTO_INCREMENT 2; DROP TABLE t1; +#---------------------------------- +# Issue #792 Crash in autoincrement +#---------------------------------- +CREATE TABLE t1(C1 DOUBLE AUTO_INCREMENT KEY,C2 CHAR) ENGINE=ROCKSDB; +INSERT INTO t1 VALUES(2177,0); +DROP TABLE t1; +CREATE TABLE t0(c0 BLOB) ENGINE=ROCKSDB; +INSERT INTO t0 VALUES(0); +ALTER TABLE t0 AUTO_INCREMENT=0; +DROP TABLE t0; +#---------------------------------- +# Issue #869 Crash in autoincrement +#---------------------------------- +CREATE TABLE t1 (pk INT AUTO_INCREMENT, a INT, PRIMARY KEY(pk)) ENGINE=RocksDB; +INSERT INTO t1 (a) VALUES (1); +UPDATE t1 SET pk = 3; +ALTER TABLE t1 AUTO_INCREMENT 2; +DROP TABLE t1; diff --git a/storage/rocksdb/mysql-test/rocksdb/r/bloomfilter5.result b/storage/rocksdb/mysql-test/rocksdb/r/bloomfilter5.result index 4f6702b85a7..058d3608c75 100644 --- a/storage/rocksdb/mysql-test/rocksdb/r/bloomfilter5.result +++ b/storage/rocksdb/mysql-test/rocksdb/r/bloomfilter5.result @@ -27,7 +27,7 @@ set global rocksdb_force_flush_memtable_now=1; explain select * from t1 limit 10; id select_type table type possible_keys key key_len ref rows Extra -1 SIMPLE t1 ALL NULL NULL NULL NULL 10000 +1 SIMPLE t1 ALL NULL NULL NULL NULL 10000 NULL select * from t1 limit 10; id1 id2 id3 id4 id5 value value2 1000 2000 2000 10000 10000 1000 aaabbbccc @@ -44,7 +44,7 @@ id1 id2 id3 id4 id5 value value2 explain select * from t1 order by id1 desc,id2 desc, id3 desc, id4 desc limit 1; id select_type table type possible_keys key key_len ref rows Extra -1 SIMPLE t1 index NULL PRIMARY 122 NULL 1 +1 SIMPLE t1 index NULL PRIMARY 122 NULL 1 NULL select * from t1 order by id1 desc,id2 desc, id3 desc, id4 desc limit 1; id1 id2 id3 id4 id5 value value2 1000 2000 2000 10000 10000 1000 aaabbbccc diff --git a/storage/rocksdb/mysql-test/rocksdb/r/bloomfilter_bulk_load.result b/storage/rocksdb/mysql-test/rocksdb/r/bloomfilter_bulk_load.result new file mode 100644 index 00000000000..4b02d1103cf --- /dev/null +++ b/storage/rocksdb/mysql-test/rocksdb/r/bloomfilter_bulk_load.result @@ -0,0 +1,15 @@ +create table r1 (id bigint primary key, value bigint) engine=rocksdb; +create table r2 (id bigint, value bigint, primary key (id) comment 'cf2') engine=rocksdb; +set session rocksdb_bulk_load=1; +set session rocksdb_bulk_load=0; +select variable_value into @h from 
information_schema.global_status where variable_name='rocksdb_block_cache_filter_hit'; +insert into r1 values (100, 100); +select variable_value-@h from information_schema.global_status where variable_name='rocksdb_block_cache_filter_hit'; +variable_value-@h +1 +select variable_value into @h from information_schema.global_status where variable_name='rocksdb_block_cache_filter_hit'; +insert into r2 values (100, 100); +select variable_value-@h from information_schema.global_status where variable_name='rocksdb_block_cache_filter_hit'; +variable_value-@h +0 +DROP TABLE r1, r2; diff --git a/storage/rocksdb/mysql-test/rocksdb/r/bulk_load_sk.result b/storage/rocksdb/mysql-test/rocksdb/r/bulk_load_sk.result new file mode 100644 index 00000000000..42f820a2a42 --- /dev/null +++ b/storage/rocksdb/mysql-test/rocksdb/r/bulk_load_sk.result @@ -0,0 +1,229 @@ +SET rocksdb_bulk_load_size=15; +CREATE TABLE t4 (a INT, b INT, c INT, +PRIMARY KEY (a), +KEY (b), +KEY (c) COMMENT "rev:cf") ENGINE=ROCKSDB; +CREATE TABLE t3 (a INT, b INT, c INT, +PRIMARY KEY (a), +KEY (b), +KEY (c) COMMENT "rev:cf") ENGINE=ROCKSDB; +CREATE TABLE t2 (a INT, b INT, c INT, +PRIMARY KEY (a), +KEY (b), +KEY (c) COMMENT "rev:cf") ENGINE=ROCKSDB; +CREATE TABLE t1 (a INT, b INT, c INT, +PRIMARY KEY (a), +KEY (b), +KEY (c) COMMENT "rev:cf") ENGINE=ROCKSDB; +SET rocksdb_bulk_load=1; +INSERT INTO t1 SELECT * FROM t3 FORCE INDEX (PRIMARY) ORDER BY a; +SELECT count(*) FROM t1 FORCE INDEX (PRIMARY); +count(*) +0 +SELECT count(*) FROM t1 FORCE INDEX (b); +count(*) +10 +SELECT count(*) FROM t1 FORCE INDEX (c); +count(*) +10 +SET rocksdb_bulk_load=0; +SELECT * FROM t1 FORCE INDEX (PRIMARY); +a b c +-9 11 11 +-7 9 9 +-5 7 7 +-3 5 5 +-1 3 3 +2 0 0 +4 -2 -2 +6 -4 -4 +8 -6 -6 +10 -8 -8 +SELECT b FROM t1 FORCE INDEX (b); +b +-8 +-6 +-4 +-2 +0 +3 +5 +7 +9 +11 +SELECT c FROM t1 FORCE INDEX (c); +c +-8 +-6 +-4 +-2 +0 +3 +5 +7 +9 +11 +Checksums should match +CHECKSUM TABLE t3; +Table Checksum +test.t3 3862424802 +CHECKSUM TABLE t1; +Table Checksum +test.t1 3862424802 +SET rocksdb_bulk_load_allow_sk=1; +SET rocksdb_bulk_load=1; +INSERT INTO t4 SELECT * FROM t3 FORCE INDEX (PRIMARY) ORDER BY a; +SELECT count(*) FROM t4 FORCE INDEX (PRIMARY); +count(*) +0 +SELECT count(*) FROM t4 FORCE INDEX (b); +count(*) +0 +SELECT count(*) FROM t4 FORCE INDEX (c); +count(*) +0 +SET rocksdb_bulk_load=0; +SELECT * FROM t4 FORCE INDEX (PRIMARY); +a b c +-9 11 11 +-7 9 9 +-5 7 7 +-3 5 5 +-1 3 3 +2 0 0 +4 -2 -2 +6 -4 -4 +8 -6 -6 +10 -8 -8 +SELECT b FROM t4 FORCE INDEX (b); +b +-8 +-6 +-4 +-2 +0 +3 +5 +7 +9 +11 +SELECT c FROM t4 FORCE INDEX (c); +c +-8 +-6 +-4 +-2 +0 +3 +5 +7 +9 +11 +Checksums should match +CHECKSUM TABLE t3; +Table Checksum +test.t3 3862424802 +CHECKSUM TABLE t4; +Table Checksum +test.t4 3862424802 +SET rocksdb_bulk_load_allow_unsorted=1; +SET rocksdb_bulk_load_allow_sk=1; +SET rocksdb_bulk_load=1; +INSERT INTO t2 SELECT * FROM t3 WHERE b >= 0 ORDER BY b; +INSERT INTO t2 SELECT * FROM t3 WHERE b < 0 ORDER BY b; +SELECT count(*) FROM t2 FORCE INDEX (PRIMARY); +count(*) +0 +SELECT count(*) FROM t2 FORCE INDEX (b); +count(*) +0 +SELECT count(*) FROM t2 FORCE INDEX (c); +count(*) +0 +SELECT count(*) FROM t2 FORCE INDEX (PRIMARY); +count(*) +0 +SELECT count(*) FROM t2 FORCE INDEX (b); +count(*) +0 +SELECT count(*) FROM t2 FORCE INDEX (c); +count(*) +0 +SET rocksdb_bulk_load=0; +SELECT * FROM t2 FORCE INDEX (PRIMARY); +a b c +-19 21 21 +-17 19 19 +-15 17 17 +-13 15 15 +-11 13 13 +-9 11 11 +-7 9 9 +-5 7 7 +-3 5 5 +-1 3 3 +2 0 0 +4 -2 -2 +6 -4 -4 +8 -6 -6 +10 -8 -8 +12 
-10 -10 +14 -12 -12 +16 -14 -14 +18 -16 -16 +20 -18 -18 +SELECT b FROM t2 FORCE INDEX (b); +b +-18 +-16 +-14 +-12 +-10 +-8 +-6 +-4 +-2 +0 +3 +5 +7 +9 +11 +13 +15 +17 +19 +21 +SELECT c FROM t2 FORCE INDEX (c); +c +-18 +-16 +-14 +-12 +-10 +-8 +-6 +-4 +-2 +0 +3 +5 +7 +9 +11 +13 +15 +17 +19 +21 +Checksums should match +CHECKSUM TABLE t3; +Table Checksum +test.t3 1495594118 +CHECKSUM TABLE t2; +Table Checksum +test.t2 1495594118 +DROP TABLE t1; +DROP TABLE t2; +DROP TABLE t3; +DROP TABLE t4; diff --git a/storage/rocksdb/mysql-test/rocksdb/r/cardinality.result b/storage/rocksdb/mysql-test/rocksdb/r/cardinality.result index 4b201d523d9..d037c636a16 100644 --- a/storage/rocksdb/mysql-test/rocksdb/r/cardinality.result +++ b/storage/rocksdb/mysql-test/rocksdb/r/cardinality.result @@ -82,4 +82,19 @@ t1 1 t1_5 2 c1 A 100000 NULL NULL YES LSMTREE SELECT table_name, table_rows FROM information_schema.tables WHERE table_schema = DATABASE(); table_name table_rows t1 100000 -drop table t1; +CREATE TABLE t2 (a INT, b INT, c INT, d INT, e INT, f INT, g INT, +PRIMARY KEY (a), KEY (c, b, a, d, e, f, g)) +ENGINE=ROCKSDB; +SET GLOBAL rocksdb_force_flush_memtable_now = 1; +ANALYZE TABLE t2; +Table Op Msg_type Msg_text +test.t2 analyze status OK +cardinality of the columns after 'a' must be equal to the cardinality of column 'a' +SELECT CARDINALITY INTO @c FROM information_schema.statistics WHERE TABLE_NAME='t2' AND INDEX_NAME='c' AND COLUMN_NAME='a'; +SELECT COLUMN_NAME, CARDINALITY = @c FROM information_schema.statistics WHERE TABLE_NAME='t2' AND INDEX_NAME='c' AND SEQ_IN_INDEX > 3; +COLUMN_NAME CARDINALITY = @c +d 1 +e 1 +f 1 +g 1 +drop table t1, t2; diff --git a/storage/rocksdb/mysql-test/rocksdb/r/collation.result b/storage/rocksdb/mysql-test/rocksdb/r/collation.result index e372cbe2109..10e0d9b0002 100644 --- a/storage/rocksdb/mysql-test/rocksdb/r/collation.result +++ b/storage/rocksdb/mysql-test/rocksdb/r/collation.result @@ -1,6 +1,7 @@ -SET @start_global_value = @@global.ROCKSDB_STRICT_COLLATION_EXCEPTIONS; -DROP TABLE IF EXISTS t1; +call mtr.add_suppression("Invalid pattern"); CREATE TABLE t1 (id INT primary key, value varchar(50), value2 varbinary(50), value3 text) engine=rocksdb charset utf8; +ALTER TABLE t1 ADD INDEX (value); +ERROR HY000: Unsupported collation on string indexed column test.t1.value Use binary collation (binary, latin1_bin, utf8_bin). DROP TABLE t1; CREATE TABLE t1 (id INT primary key, value varchar(50), value2 varbinary(50), value3 text, index(value)) engine=rocksdb charset utf8; ERROR HY000: Unsupported collation on string indexed column test.t1.value Use binary collation (latin1_bin, binary, utf8_bin). @@ -13,6 +14,7 @@ SET GLOBAL rocksdb_strict_collation_check=1; CREATE TABLE t1 (id INT primary key, value varchar(50), value2 varbinary(50), value3 text, index(value2)) engine=rocksdb charset utf8; DROP TABLE t1; CREATE TABLE t1 (id varchar(20), value varchar(50), value2 varchar(50), value3 text, primary key (id), index(value, value2)) engine=rocksdb charset latin1 collate latin1_bin; +ALTER TABLE t1 collate=latin1_general_ci; DROP TABLE t1; CREATE TABLE t1 (id varchar(20), value varchar(50), value2 varchar(50), value3 text, primary key (id), index(value, value2)) engine=rocksdb charset utf8 collate utf8_bin; DROP TABLE t1; @@ -127,4 +129,16 @@ CREATE TABLE abcd (id INT PRIMARY KEY, value varchar(50), index(value)) engine=r ERROR HY000: Unsupported collation on string indexed column test.abcd.value Use binary collation (latin1_bin, binary, utf8_bin). 
DROP TABLE abc; SET GLOBAL rocksdb_strict_collation_exceptions=null; -SET GLOBAL rocksdb_strict_collation_exceptions=@start_global_value; +SET GLOBAL rocksdb_strict_collation_check=1; +CREATE TABLE t1 (id INT primary key, value varchar(50), value2 varbinary(50), value3 text, index(value)) engine=rocksdb charset utf8; +Warnings: +Warning 1210 Unsupported collation on string indexed column test.t1.value Use binary collation (binary, latin1_bin, utf8_bin). +DROP TABLE t1; +CREATE TABLE t1 (id INT primary key, value varchar(50), value2 varbinary(50), value3 text) engine=rocksdb charset utf8; +ALTER TABLE t1 ADD INDEX (value); +Warnings: +Warning 1210 Unsupported collation on string indexed column test.t1.value Use binary collation (binary, latin1_bin, utf8_bin). +DROP TABLE t1; +CREATE TABLE t1 (id varchar(20), value varchar(50), value2 varchar(50), value3 text, primary key (id), index(value, value2)) engine=rocksdb charset latin1 collate latin1_bin; +ALTER TABLE t1 collate=latin1_general_ci; +DROP TABLE t1; diff --git a/storage/rocksdb/mysql-test/rocksdb/r/com_rpc_tx.result b/storage/rocksdb/mysql-test/rocksdb/r/com_rpc_tx.result new file mode 100644 index 00000000000..789ce12e900 --- /dev/null +++ b/storage/rocksdb/mysql-test/rocksdb/r/com_rpc_tx.result @@ -0,0 +1,21 @@ +CREATE DATABASE db_rpc; +USE db_rpc; +CREATE TABLE t1(pk INT PRIMARY KEY) ENGINE=rocksdb; +SET GLOBAL rocksdb_enable_2pc=1; +SET autocommit = 0; +SET autocommit = 0; +BEGIN; +BEGIN; +SELECT * from t1; +pk +SELECT * from t1; +pk +INSERT INTO t1 VALUES(1); +INSERT INTO t1 VALUES(2); +COMMIT; +COMMIT; +SELECT * from db_rpc.t1; +pk +1 +2 +DROP DATABASE db_rpc; diff --git a/storage/rocksdb/mysql-test/rocksdb/r/create_no_primary_key_table.result b/storage/rocksdb/mysql-test/rocksdb/r/create_no_primary_key_table.result new file mode 100644 index 00000000000..e5aeb57ebdf --- /dev/null +++ b/storage/rocksdb/mysql-test/rocksdb/r/create_no_primary_key_table.result @@ -0,0 +1,38 @@ +USE mysql; +CREATE TABLE mysql_table (a INT) ENGINE=ROCKSDB; +CREATE TABLE test.mysql_table (a INT) ENGINE=ROCKSDB; +ERROR HY000: Table without primary key cannot be created outside mysql schema. +USE test; +CREATE TABLE mysql_table (a INT) ENGINE=ROCKSDB; +ERROR HY000: Table without primary key cannot be created outside mysql schema. +CREATE TABLE IF NOT EXISTS mysql_table_2 (a INT) ENGINE=ROCKSDB; +ERROR HY000: Table without primary key cannot be created outside mysql schema. +CREATE TABLE mysql_table_no_cols ENGINE=ROCKSDB; +ERROR HY000: Table without primary key cannot be created outside mysql schema. +CREATE TABLE mysql.mysql_table_2 (a INT) ENGINE=ROCKSDB; +CREATE TABLE mysql_primkey (a INT PRIMARY KEY, b INT, c INT, d INT, INDEX (c)) ENGINE=ROCKSDB; +ALTER TABLE mysql_primkey DROP b, DROP a, ADD (f INT PRIMARY KEY); +ALTER TABLE mysql_primkey DROP PRIMARY KEY; +ERROR HY000: Table without primary key cannot be created outside mysql schema. +CREATE TABLE mysql_primkey2 (a INT PRIMARY KEY, b INT, c INT) ENGINE=ROCKSDB; +ALTER TABLE mysql_primkey2 DROP b; +ALTER TABLE mysql_primkey2 ADD (b INT); +ALTER TABLE mysql_primkey2 DROP c, DROP A; +ERROR HY000: Table without primary key cannot be created outside mysql schema. 
+CREATE TABLE mysql_primkey3 (a INT PRIMARY KEY, b INT, c INT, INDEX indexonb (b), INDEX indexonc (c)) ENGINE=ROCKSDB; +ALTER TABLE mysql_primkey3 DROP INDEX indexonb; +ALTER TABLE mysql_primkey3 DROP c; +ALTER TABLE mysql_primkey3 DROP PRIMARY KEY, ADD PRIMARY KEY(b); +CREATE TABLE mysql_primkey4(a INT, b INT, PRIMARY KEY(a), INDEX si (a, b)) ENGINE=ROCKSDB; +DROP INDEX si ON mysql_primkey4; +DROP INDEX `PRIMARY` ON mysql_primkey4; +ERROR HY000: Table without primary key cannot be created outside mysql schema. +ALTER TABLE mysql.mysql_table ADD PRIMARY KEY (a); +ALTER TABLE mysql.mysql_table DROP PRIMARY KEY; +DROP TABLE mysql_primkey; +DROP TABLE mysql_primkey2; +DROP TABLE mysql_primkey3; +DROP TABLE mysql_primkey4; +USE mysql; +DROP TABLE mysql_table; +DROP TABLE mysql_table_2; diff --git a/storage/rocksdb/mysql-test/rocksdb/r/ddl_high_priority.result b/storage/rocksdb/mysql-test/rocksdb/r/ddl_high_priority.result index 39130475349..50733f81598 100644 --- a/storage/rocksdb/mysql-test/rocksdb/r/ddl_high_priority.result +++ b/storage/rocksdb/mysql-test/rocksdb/r/ddl_high_priority.result @@ -45,7 +45,7 @@ set high_priority_ddl = 1; select @@high_priority_ddl; @@high_priority_ddl 1 -lock tables t1 write; +rename table t1 to t2; ERROR HY000: Lock wait timeout exceeded; try restarting transaction: Timeout on table metadata: test.t1 alter table t1 modify i bigint;; set high_priority_ddl = 0; @@ -98,7 +98,7 @@ set high_priority_ddl = 1; select @@high_priority_ddl; @@high_priority_ddl 1 -lock tables t1 write; +rename table t1 to t2; ERROR HY000: Lock wait timeout exceeded; try restarting transaction: Timeout on table metadata: test.t1 alter table t1 rename t1_new;; set high_priority_ddl = 0; @@ -152,7 +152,7 @@ set high_priority_ddl = 1; select @@high_priority_ddl; @@high_priority_ddl 1 -lock tables t1 write; +rename table t1 to t2; ERROR HY000: Lock wait timeout exceeded; try restarting transaction: Timeout on table metadata: test.t1 drop table t1;; ERROR HY000: Lock wait timeout exceeded; try restarting transaction: Timeout on table metadata: test.t1 @@ -202,7 +202,7 @@ set high_priority_ddl = 1; select @@high_priority_ddl; @@high_priority_ddl 1 -lock tables t1 write; +rename table t1 to t2; ERROR HY000: Lock wait timeout exceeded; try restarting transaction: Timeout on table metadata: test.t1 drop table t1;; set high_priority_ddl = 0; @@ -251,7 +251,7 @@ set high_priority_ddl = 1; select @@high_priority_ddl; @@high_priority_ddl 1 -lock tables t1 write; +rename table t1 to t2; ERROR HY000: Lock wait timeout exceeded; try restarting transaction: Timeout on table metadata: test.t1 alter table t1 modify i bigint;; ERROR HY000: Lock wait timeout exceeded; try restarting transaction: Timeout on table metadata: test.t1 @@ -302,7 +302,7 @@ set high_priority_ddl = 1; select @@high_priority_ddl; @@high_priority_ddl 1 -lock tables t1 write; +rename table t1 to t2; ERROR HY000: Lock wait timeout exceeded; try restarting transaction: Timeout on table metadata: test.t1 create index idx1 on t1 (i);; set high_priority_ddl = 0; @@ -342,7 +342,7 @@ set high_priority_ddl = 1; select @@high_priority_ddl; @@high_priority_ddl 1 -lock tables t1 write; +rename table t1 to t2; ERROR HY000: Lock wait timeout exceeded; try restarting transaction: Timeout on table metadata: test.t1 drop index idx1 on t1;; set high_priority_ddl = 0; @@ -390,7 +390,7 @@ set high_priority_ddl = 1; select @@high_priority_ddl; @@high_priority_ddl 1 -lock tables t1 write; +rename table t1 to t2; ERROR HY000: Lock wait timeout exceeded; try 
restarting transaction: Timeout on table metadata: test.t1 truncate t1;; set high_priority_ddl = 0; @@ -438,7 +438,7 @@ set high_priority_ddl = 1; select @@high_priority_ddl; @@high_priority_ddl 1 -lock tables t1 write; +rename table t1 to t2; ERROR HY000: Lock wait timeout exceeded; try restarting transaction: Timeout on table metadata: test.t1 create trigger ins_sum before insert on t1 for each row set @sum = @sum + new.i;; set high_priority_ddl = 0; @@ -478,7 +478,7 @@ set high_priority_ddl = 1; select @@high_priority_ddl; @@high_priority_ddl 1 -lock tables t1 write; +rename table t1 to t2; ERROR HY000: Lock wait timeout exceeded; try restarting transaction: Timeout on table metadata: test.t1 drop trigger ins_sum;; set high_priority_ddl = 0; @@ -528,7 +528,7 @@ set high_priority_ddl = 1; select @@high_priority_ddl; @@high_priority_ddl 1 -lock tables t1 write; +rename table t1 to t2; ERROR HY000: Lock wait timeout exceeded; try restarting transaction: Timeout on table metadata: test.t1 optimize table t1;; Table Op Msg_type Msg_text @@ -538,6 +538,55 @@ connection: default (for show processlist) show processlist; Id User Host db Command Time State Info Rows examined Rows sent Tid Srv_Id root test