From 88733282fb15c80f0bd722df0041d06ad90c26b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20M=C3=A4kel=C3=A4?= Date: Wed, 25 Oct 2023 10:08:20 +0300 Subject: [PATCH] MDEV-32050: Look up tables in the purge coordinator The InnoDB table lookup in purge worker threads is a bottleneck that can degrade a slow shutdown to utilize less than 2 threads. Let us fix that bottleneck by constructing a local lookup table that does not require any synchronization while the undo log records of the current batch are being processed. TRX_PURGE_TABLE_BUCKETS: The initial number of std::unordered_map hash buckets used during a purge batch. This could avoid some resizing and rehashing in trx_purge_attach_undo_recs(). purge_node_t::tables: A lookup table from table ID to an already looked up and locked table. Replaces many fields. trx_purge_attach_undo_recs(): Look up each table in the purge batch only once. trx_purge(): Close all tables and release MDL at the end of the batch. trx_purge_table_open(), trx_purge_table_acquire(): Open a table in purge and acquire a metadata lock on it. This replaces dict_table_open_on_id() and dict_acquire_mdl_shared(). purge_sys_t::close_and_reopen(): In case of an MDL conflict, close and reopen all tables that are covered by the current purge batch. It may be that some of the tables have been dropped meanwhile and can be ignored. This replaces wait_SYS() and wait_FTS(). row_purge_parse_undo_rec(): Make purge_coordinator_task issue a MDL warrant to any purge_worker_task which might need it when innodb_purge_threads>1. purge_node_t::end(): Clear the MDL warrant. Reviewed by: Vladislav Lesin and Vladislav Vaintroub --- .../suite/innodb/r/innodb-index-debug.result | 5 + .../suite/innodb/r/innodb-index-online.result | 1 + .../suite/innodb/r/innodb-table-online.result | 1 + .../innodb/r/row_format_redundant.result | 4 +- .../suite/innodb/t/innodb-index-debug.test | 5 + .../suite/innodb/t/innodb-index-online.test | 2 + .../suite/innodb/t/innodb-table-online.test | 1 + .../suite/innodb/t/row_format_redundant.test | 2 +- .../parts/r/partition_special_innodb.result | 2 + .../parts/t/partition_special_innodb.test | 4 + storage/innobase/dict/dict0dict.cc | 56 +---- storage/innobase/fts/fts0fts.cc | 3 +- storage/innobase/include/dict0dict.h | 3 +- storage/innobase/include/row0purge.h | 187 +++++---------- storage/innobase/include/trx0purge.h | 27 ++- storage/innobase/include/trx0types.h | 5 + storage/innobase/row/row0purge.cc | 90 ++------ storage/innobase/srv/srv0srv.cc | 39 ++-- storage/innobase/trx/trx0purge.cc | 215 +++++++++++++++++- 19 files changed, 344 insertions(+), 308 deletions(-) diff --git a/mysql-test/suite/innodb/r/innodb-index-debug.result b/mysql-test/suite/innodb/r/innodb-index-debug.result index 8c200109aa4..0995b526821 100644 --- a/mysql-test/suite/innodb/r/innodb-index-debug.result +++ b/mysql-test/suite/innodb/r/innodb-index-debug.result @@ -1,3 +1,7 @@ +SET GLOBAL innodb_max_purge_lag_wait=0; +connect stop_purge,localhost,root; +START TRANSACTION WITH CONSISTENT SNAPSHOT; +connection default; CREATE TABLE t1(c1 INT NOT NULL, c2 INT, PRIMARY KEY(c1)) Engine=InnoDB; SHOW CREATE TABLE t1; Table Create Table @@ -143,6 +147,7 @@ ERROR 23000: Duplicate entry '0' for key 'i' SET DEBUG_SYNC='now SIGNAL log2'; connection con1; disconnect con1; +disconnect stop_purge; connection default; SET DEBUG_SYNC='RESET'; DROP TABLE t1; diff --git a/mysql-test/suite/innodb/r/innodb-index-online.result b/mysql-test/suite/innodb/r/innodb-index-online.result index 7138c5d7a18..e6b69f8f5cc 100644 --- a/mysql-test/suite/innodb/r/innodb-index-online.result +++ b/mysql-test/suite/innodb/r/innodb-index-online.result @@ -437,6 +437,7 @@ COUNT(c22f) CHECK TABLE t1; Table Op Msg_type Msg_text test.t1 check status OK +SET GLOBAL innodb_max_purge_lag_wait=0; ALTER TABLE t1 ADD UNIQUE INDEX c3p5(c3(5)); ERROR 23000: Duplicate entry 'NULL' for key 'c3p5' UPDATE t1 SET c3 = NULL WHERE c3 = ''; diff --git a/mysql-test/suite/innodb/r/innodb-table-online.result b/mysql-test/suite/innodb/r/innodb-table-online.result index caefcce47f1..9296fded0fd 100644 --- a/mysql-test/suite/innodb/r/innodb-table-online.result +++ b/mysql-test/suite/innodb/r/innodb-table-online.result @@ -274,6 +274,7 @@ WHERE variable_name = 'innodb_encryption_n_rowlog_blocks_encrypted'); SET @rowlog_decrypt_1= (SELECT variable_value FROM information_schema.global_status WHERE variable_name = 'innodb_encryption_n_rowlog_blocks_decrypted'); +SET GLOBAL innodb_max_purge_lag_wait=0; SET DEBUG_SYNC = 'row_log_table_apply1_before SIGNAL rebuilt3 WAIT_FOR dml3_done'; ALTER TABLE t1 ADD PRIMARY KEY(c22f), CHANGE c2 c22f INT; ERROR 42000: Multiple primary key defined diff --git a/mysql-test/suite/innodb/r/row_format_redundant.result b/mysql-test/suite/innodb/r/row_format_redundant.result index b798832e96f..d95c37f18cc 100644 --- a/mysql-test/suite/innodb/r/row_format_redundant.result +++ b/mysql-test/suite/innodb/r/row_format_redundant.result @@ -42,14 +42,14 @@ TRUNCATE TABLE t2; ERROR HY000: Table 't2' is read only TRUNCATE TABLE t3; ERROR HY000: Table 't3' is read only -# restart: --innodb-data-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-log-group-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-data-file-path=ibdata1:1M:autoextend --innodb-undo-tablespaces=0 --innodb-stats-persistent=0 +# restart: --innodb-data-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-log-group-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-data-file-path=ibdata1:1M:autoextend --innodb-undo-tablespaces=0 --innodb-stats-persistent=0 --skip-innodb-fast-shutdown TRUNCATE TABLE t1; TRUNCATE TABLE t2; TRUNCATE TABLE t3; corrupted SYS_TABLES.MIX_LEN for test/t1 corrupted SYS_TABLES.MIX_LEN for test/t2 corrupted SYS_TABLES.MIX_LEN for test/t3 -# restart: --innodb-data-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-log-group-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-data-file-path=ibdata1:1M:autoextend --innodb-undo-tablespaces=0 --innodb-stats-persistent=0 +# restart: --innodb-data-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-log-group-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-data-file-path=ibdata1:1M:autoextend --innodb-undo-tablespaces=0 --innodb-stats-persistent=0 --skip-innodb-fast-shutdown TRUNCATE TABLE t1; ERROR 42S02: Table 'test.t1' doesn't exist in engine TRUNCATE TABLE t2; diff --git a/mysql-test/suite/innodb/t/innodb-index-debug.test b/mysql-test/suite/innodb/t/innodb-index-debug.test index f03ef061769..736e129584c 100644 --- a/mysql-test/suite/innodb/t/innodb-index-debug.test +++ b/mysql-test/suite/innodb/t/innodb-index-debug.test @@ -6,6 +6,10 @@ --source include/big_test.inc let $MYSQLD_DATADIR= `select @@datadir`; +SET GLOBAL innodb_max_purge_lag_wait=0; +connect (stop_purge,localhost,root); +START TRANSACTION WITH CONSISTENT SNAPSHOT; +connection default; # # Test for BUG# 12739098, check whether trx->error_status is reset on error. @@ -155,6 +159,7 @@ SET DEBUG_SYNC='now SIGNAL log2'; --connection con1 reap; --disconnect con1 +--disconnect stop_purge --connection default SET DEBUG_SYNC='RESET'; DROP TABLE t1; diff --git a/mysql-test/suite/innodb/t/innodb-index-online.test b/mysql-test/suite/innodb/t/innodb-index-online.test index ab4f5a965da..f8eb8957911 100644 --- a/mysql-test/suite/innodb/t/innodb-index-online.test +++ b/mysql-test/suite/innodb/t/innodb-index-online.test @@ -369,6 +369,8 @@ SELECT connection con1; SELECT COUNT(c22f) FROM t1; CHECK TABLE t1; +# Avoid a strange DEBUG_SYNC timeout on c3p5_created. +SET GLOBAL innodb_max_purge_lag_wait=0; # Create a column prefix index. --error ER_DUP_ENTRY diff --git a/mysql-test/suite/innodb/t/innodb-table-online.test b/mysql-test/suite/innodb/t/innodb-table-online.test index 90c8dc2095d..551b71c4883 100644 --- a/mysql-test/suite/innodb/t/innodb-table-online.test +++ b/mysql-test/suite/innodb/t/innodb-table-online.test @@ -255,6 +255,7 @@ SET @rowlog_decrypt_1= (SELECT variable_value FROM information_schema.global_status WHERE variable_name = 'innodb_encryption_n_rowlog_blocks_decrypted'); +SET GLOBAL innodb_max_purge_lag_wait=0; # Accumulate and apply some modification log. SET DEBUG_SYNC = 'row_log_table_apply1_before SIGNAL rebuilt3 WAIT_FOR dml3_done'; --error ER_MULTIPLE_PRI_KEY diff --git a/mysql-test/suite/innodb/t/row_format_redundant.test b/mysql-test/suite/innodb/t/row_format_redundant.test index 6de7597e983..fa3a104af13 100644 --- a/mysql-test/suite/innodb/t/row_format_redundant.test +++ b/mysql-test/suite/innodb/t/row_format_redundant.test @@ -78,7 +78,7 @@ TRUNCATE TABLE t2; --error ER_OPEN_AS_READONLY TRUNCATE TABLE t3; ---let $restart_parameters= $d +--let $restart_parameters= $d --skip-innodb-fast-shutdown --source include/restart_mysqld.inc TRUNCATE TABLE t1; diff --git a/mysql-test/suite/parts/r/partition_special_innodb.result b/mysql-test/suite/parts/r/partition_special_innodb.result index 27f8f0a9d5c..1d0c075d661 100644 --- a/mysql-test/suite/parts/r/partition_special_innodb.result +++ b/mysql-test/suite/parts/r/partition_special_innodb.result @@ -269,6 +269,7 @@ Table Op Msg_type Msg_text test.t1 check status OK connection default; UNLOCK TABLES; +SET GLOBAL innodb_max_purge_lag_wait=0; connection con2; DROP TABLE t1; connection default; @@ -348,6 +349,7 @@ SELECT * FROM t1; ERROR HY000: Lock wait timeout exceeded; try restarting transaction connection default; UNLOCK TABLES; +SET GLOBAL innodb_max_purge_lag_wait=0; connection con2; DROP TABLE t1; disconnect con2; diff --git a/mysql-test/suite/parts/t/partition_special_innodb.test b/mysql-test/suite/parts/t/partition_special_innodb.test index 6273cc5cb2a..836b7048ccf 100644 --- a/mysql-test/suite/parts/t/partition_special_innodb.test +++ b/mysql-test/suite/parts/t/partition_special_innodb.test @@ -114,6 +114,8 @@ CHECK TABLE t1; --connection default UNLOCK TABLES; +# Avoid a MDL conflict between purge and the DROP TABLE below +SET GLOBAL innodb_max_purge_lag_wait=0; --connection con2 DROP TABLE t1; @@ -222,6 +224,8 @@ SELECT * FROM t1; --connection default UNLOCK TABLES; +# Avoid a MDL conflict between purge and the DROP TABLE below +SET GLOBAL innodb_max_purge_lag_wait=0; --connection con2 DROP TABLE t1; diff --git a/storage/innobase/dict/dict0dict.cc b/storage/innobase/dict/dict0dict.cc index d1c719d2090..0680e60d81c 100644 --- a/storage/innobase/dict/dict0dict.cc +++ b/storage/innobase/dict/dict0dict.cc @@ -662,7 +662,7 @@ dict_table_t::parse_name<>(char(&)[NAME_LEN + 1], char(&)[NAME_LEN + 1], @param[in] table_op operation to perform when opening @return table object after locking MDL shared @retval nullptr if the table is not readable, or if trylock && MDL blocked */ -template +template dict_table_t* dict_acquire_mdl_shared(dict_table_t *table, THD *thd, @@ -678,7 +678,6 @@ dict_acquire_mdl_shared(dict_table_t *table, if (trylock) { - static_assert(!trylock || !purge_thd, "usage"); dict_sys.freeze(SRW_LOCK_CALL); db_len= dict_get_db_name_len(table->name.m_name); dict_sys.unfreeze(); @@ -749,13 +748,7 @@ retry: } } -retry_table_open: dict_sys.freeze(SRW_LOCK_CALL); - if (purge_thd && purge_sys.must_wait_FTS()) - { - not_found= reinterpret_cast(-1); - goto return_without_mdl; - } table= dict_sys.find_table(table_id); if (table) table->acquire(); @@ -763,11 +756,6 @@ retry_table_open: { dict_sys.unfreeze(); dict_sys.lock(SRW_LOCK_CALL); - if (purge_thd && purge_sys.must_wait_FTS()) - { - dict_sys.unlock(); - goto retry_table_open; - } table= dict_load_table_on_id(table_id, table_op == DICT_TABLE_OP_LOAD_TABLESPACE ? DICT_ERR_IGNORE_RECOVER_LOCK @@ -826,24 +814,21 @@ return_without_mdl: goto retry; } -template dict_table_t* dict_acquire_mdl_shared +template dict_table_t* dict_acquire_mdl_shared (dict_table_t*,THD*,MDL_ticket**,dict_table_op_t); -template dict_table_t* dict_acquire_mdl_shared +template dict_table_t* dict_acquire_mdl_shared (dict_table_t*,THD*,MDL_ticket**,dict_table_op_t); /** Look up a table by numeric identifier. -@tparam purge_thd Whether the function is called by purge thread @param[in] table_id table identifier @param[in] dict_locked data dictionary locked @param[in] table_op operation to perform when opening @param[in,out] thd background thread, or NULL to not acquire MDL @param[out] mdl mdl ticket, or NULL @return table, NULL if does not exist */ -template -dict_table_t* -dict_table_open_on_id(table_id_t table_id, bool dict_locked, - dict_table_op_t table_op, THD *thd, - MDL_ticket **mdl) +dict_table_t *dict_table_open_on_id(table_id_t table_id, bool dict_locked, + dict_table_op_t table_op, THD *thd, + MDL_ticket **mdl) { if (!dict_locked) dict_sys.freeze(SRW_LOCK_CALL); @@ -852,16 +837,9 @@ dict_table_open_on_id(table_id_t table_id, bool dict_locked, if (table) { - if (purge_thd && purge_sys.must_wait_FTS()) - { - table= reinterpret_cast(-1); - goto func_exit; - } - table->acquire(); if (thd && !dict_locked) - table= dict_acquire_mdl_shared( - table, thd, mdl, table_op); + table= dict_acquire_mdl_shared(table, thd, mdl, table_op); } else if (table_op != DICT_TABLE_OP_OPEN_ONLY_IF_CACHED) { @@ -875,44 +853,26 @@ dict_table_open_on_id(table_id_t table_id, bool dict_locked, ? DICT_ERR_IGNORE_RECOVER_LOCK : DICT_ERR_IGNORE_FK_NOKEY); if (table) - { - if (purge_thd && purge_sys.must_wait_FTS()) - { - dict_sys.unlock(); - return reinterpret_cast(-1); - } table->acquire(); - } if (!dict_locked) { dict_sys.unlock(); if (table && thd) { dict_sys.freeze(SRW_LOCK_CALL); - table= dict_acquire_mdl_shared( - table, thd, mdl, table_op); + table= dict_acquire_mdl_shared(table, thd, mdl, table_op); dict_sys.unfreeze(); } return table; } } -func_exit: if (!dict_locked) dict_sys.unfreeze(); return table; } -template dict_table_t* dict_table_open_on_id -(table_id_t table_id, bool dict_locked, - dict_table_op_t table_op, THD *thd, - MDL_ticket **mdl); -template dict_table_t* dict_table_open_on_id -(table_id_t table_id, bool dict_locked, - dict_table_op_t table_op, THD *thd, - MDL_ticket **mdl); - /********************************************************************//** Looks for column n position in the clustered index. @return position in internal representation of the clustered index */ diff --git a/storage/innobase/fts/fts0fts.cc b/storage/innobase/fts/fts0fts.cc index f94aed58d21..769ce416325 100644 --- a/storage/innobase/fts/fts0fts.cc +++ b/storage/innobase/fts/fts0fts.cc @@ -1609,10 +1609,11 @@ and common table associated with the fts table. already stopped*/ void purge_sys_t::stop_FTS(const dict_table_t &table, bool already_stopped) { - dict_sys.lock(SRW_LOCK_CALL); if (!already_stopped) purge_sys.stop_FTS(); + dict_sys.lock(SRW_LOCK_CALL); + fts_table_t fts_table; char table_name[MAX_FULL_NAME_LEN]; diff --git a/storage/innobase/include/dict0dict.h b/storage/innobase/include/dict0dict.h index e54a138cc02..895743be84b 100644 --- a/storage/innobase/include/dict0dict.h +++ b/storage/innobase/include/dict0dict.h @@ -132,7 +132,7 @@ enum dict_table_op_t { @param[in] table_op operation to perform when opening @return table object after locking MDL shared @retval NULL if the table is not readable, or if trylock && MDL blocked */ -template +template dict_table_t* dict_acquire_mdl_shared(dict_table_t *table, THD *thd, @@ -146,7 +146,6 @@ dict_acquire_mdl_shared(dict_table_t *table, @param[in,out] thd background thread, or NULL to not acquire MDL @param[out] mdl mdl ticket, or NULL @return table, NULL if does not exist */ -template dict_table_t* dict_table_open_on_id(table_id_t table_id, bool dict_locked, dict_table_op_t table_op, THD *thd= nullptr, diff --git a/storage/innobase/include/row0purge.h b/storage/innobase/include/row0purge.h index 389a061cba0..99d2f484241 100644 --- a/storage/innobase/include/row0purge.h +++ b/storage/innobase/include/row0purge.h @@ -24,8 +24,7 @@ Purge obsolete records Created 3/14/1997 Heikki Tuuri *******************************************************/ -#ifndef row0purge_h -#define row0purge_h +#pragma once #include "que0types.h" #include "btr0types.h" @@ -35,6 +34,7 @@ Created 3/14/1997 Heikki Tuuri #include "row0mysql.h" #include "mysqld.h" #include +#include class MDL_ticket; /** Determines if it is possible to remove a secondary index entry. @@ -89,155 +89,70 @@ struct trx_purge_rec_t roll_ptr_t roll_ptr; }; -/* Purge node structure */ +/** Purge worker context */ +struct purge_node_t +{ + /** node type: QUE_NODE_PURGE */ + que_common_t common; -struct purge_node_t{ - que_common_t common; /*!< node type: QUE_NODE_PURGE */ - /*----------------------*/ - /* Local storage for this graph node */ - roll_ptr_t roll_ptr;/* roll pointer to undo log record */ + /** DB_TRX_ID of the undo log record */ + trx_id_t trx_id; + /** DB_ROLL_PTR pointing to undo log record */ + roll_ptr_t roll_ptr; - undo_no_t undo_no;/*!< undo number of the record */ + /** undo number of the record */ + undo_no_t undo_no; - byte rec_type;/*!< undo log record type: TRX_UNDO_INSERT_REC, - ... */ - byte cmpl_info;/* compiler analysis info of an update */ -private: - /** latest unavailable table ID (do not bother looking up again) */ - table_id_t unavailable_table_id; - /** the latest modification of the table definition identified by - unavailable_table_id, or TRX_ID_MAX */ - trx_id_t def_trx_id; -public: - dict_table_t* table; /*!< table where purge is done */ - - upd_t* update; /*!< update vector for a clustered index - record */ - const dtuple_t* ref; /*!< NULL, or row reference to the next row to - handle */ - dtuple_t* row; /*!< NULL, or a copy (also fields copied to - heap) of the indexed fields of the row to - handle */ - dict_index_t* index; /*!< NULL, or the next index whose record should - be handled */ - mem_heap_t* heap; /*!< memory heap used as auxiliary storage for - row; this must be emptied after a successful - purge of a row */ - ibool found_clust;/*!< whether the clustered index record - determined by ref was found in the clustered - index, and we were able to position pcur on - it */ - btr_pcur_t pcur; /*!< persistent cursor used in searching the - clustered index record */ + /** record type: TRX_UNDO_INSERT_REC, ... */ + byte rec_type; + /** compiler analysis info of an update */ + byte cmpl_info; + /** whether the clustered index record determined by ref was found + in the clustered index of the table, and we were able to position + pcur on it */ + bool found_clust; #ifdef UNIV_DEBUG - /** whether the operation is in progress */ - bool in_progress; + /** whether the operation is in progress */ + bool in_progress= false; #endif - trx_id_t trx_id; /*!< trx id for this purging record */ + /** table where purge is done */ + dict_table_t *table= nullptr; + /** update vector for a clustered index record */ + upd_t *update; + /** row reference to the next row to handle, or nullptr */ + const dtuple_t *ref; + /** nullptr, or a deep copy of the indexed fields of the row to handle */ + dtuple_t *row; + /** nullptr, or the next index of table whose record should be handled */ + dict_index_t *index; + /** memory heap used as auxiliary storage; must be emptied between rows */ + mem_heap_t *heap; + /** persistent cursor to the clustered index record */ + btr_pcur_t pcur; - /** meta-data lock for the table name */ - MDL_ticket* mdl_ticket; + /** Undo recs to purge */ + std::queue undo_recs; - /** table id of the previous undo log record */ - table_id_t last_table_id; + /** map of table identifiers to table handles and meta-data locks */ + std::unordered_map> tables; - /** purge thread */ - THD* purge_thd; - - /** metadata lock holds for this number of undo log recs */ - int mdl_hold_recs; - - /** Undo recs to purge */ - std::queue undo_recs; - - /** Constructor */ - explicit purge_node_t(que_thr_t* parent) : - common(QUE_NODE_PURGE, parent), - unavailable_table_id(0), - table(NULL), - heap(mem_heap_create(256)), -#ifdef UNIV_DEBUG - in_progress(false), -#endif - mdl_ticket(NULL), - last_table_id(0), - purge_thd(NULL), - mdl_hold_recs(0) - { - } + /** Constructor */ + explicit purge_node_t(que_thr_t *parent) : + common(QUE_NODE_PURGE, parent), heap(mem_heap_create(256)), + tables(TRX_PURGE_TABLE_BUCKETS) {} #ifdef UNIV_DEBUG - /***********************************************************//** - Validate the persisent cursor. The purge node has two references - to the clustered index record - one via the ref member, and the - other via the persistent cursor. These two references must match - each other if the found_clust flag is set. - @return true if the persistent cursor is consistent with - the ref member.*/ - bool validate_pcur(); + /** Validate the persistent cursor. The purge node has two references + to the clustered index record: ref and pcur, which must match + each other if found_clust. + @return whether pcur is consistent with ref */ + bool validate_pcur(); #endif - /** Determine if a table should be skipped in purge. - @param[in] table_id table identifier - @return whether to skip the table lookup and processing */ - bool is_skipped(table_id_t id) const - { - return id == unavailable_table_id && trx_id <= def_trx_id; - } - - /** Remember that a table should be skipped in purge. - @param[in] id table identifier - @param[in] limit last transaction for which to skip */ - void skip(table_id_t id, trx_id_t limit) - { - DBUG_ASSERT(limit >= trx_id); - unavailable_table_id = id; - def_trx_id = limit; - } - /** Start processing an undo log record. */ inline void start(); - - /** Close the existing table and release the MDL for it. */ - void close_table() - { - last_table_id= 0; - if (!table) - { - ut_ad(!mdl_ticket); - return; - } - - innobase_reset_background_thd(purge_thd); - dict_table_close(table, false, purge_thd, mdl_ticket); - table= nullptr; - mdl_ticket= nullptr; - } - - - /** Retail mdl for the table id. - @param[in] table_id table id to be processed - @return true if retain mdl */ - bool retain_mdl(table_id_t table_id) - { - ut_ad(table_id); - if (last_table_id == table_id && mdl_hold_recs < 100) - { - ut_ad(table); - mdl_hold_recs++; - return true; - } - - mdl_hold_recs= 0; - close_table(); - return false; - } - - /** Reset the state at end @return the query graph parent */ - inline que_node_t *end(); + inline que_node_t *end(THD *); }; - -#endif diff --git a/storage/innobase/include/trx0purge.h b/storage/innobase/include/trx0purge.h index 277b5c0a04d..bcac02ccce1 100644 --- a/storage/innobase/include/trx0purge.h +++ b/storage/innobase/include/trx0purge.h @@ -135,7 +135,11 @@ private: that are older than view will be removed. */ ReadViewBase view; /** whether purge is enabled; protected by latch and std::atomic */ - std::atomic m_enabled; + std::atomic m_enabled{false}; +public: + /** whether purge is active (may hold table handles) */ + std::atomic m_active{false}; +private: /** number of pending stop() calls without resume() */ Atomic_counter m_paused; /** number of stop_SYS() calls without resume_SYS() */ @@ -252,33 +256,30 @@ public: } /** @return whether the purge tasks are active */ - bool running() const; + static bool running(); + /** Stop purge during FLUSH TABLES FOR EXPORT. */ void stop(); /** Resume purge at UNLOCK TABLES after FLUSH TABLES FOR EXPORT */ void resume(); + /** Close and reopen all tables in case of a MDL conflict with DDL */ + dict_table_t *close_and_reopen(table_id_t id, THD *thd, MDL_ticket **mdl); private: - void wait_SYS(); - void wait_FTS(); + /** Suspend purge during a DDL operation on FULLTEXT INDEX tables */ + void wait_FTS(bool also_sys); public: /** Suspend purge in data dictionary tables */ - void stop_SYS(); + void stop_SYS() { m_SYS_paused++; } /** Resume purge in data dictionary tables */ static void resume_SYS(void *); - /** @return whether stop_SYS() is in effect */ - bool must_wait_SYS() const { return m_SYS_paused; } - /** check stop_SYS() */ - void check_stop_SYS() { if (must_wait_SYS()) wait_SYS(); } /** Pause purge during a DDL operation that could drop FTS_ tables. */ - void stop_FTS() { m_FTS_paused++; } + void stop_FTS(); /** Resume purge after stop_FTS(). */ void resume_FTS() { ut_d(const auto p=) m_FTS_paused--; ut_ad(p); } /** @return whether stop_SYS() is in effect */ bool must_wait_FTS() const { return m_FTS_paused; } - /** check stop_SYS() */ - void check_stop_FTS() { if (must_wait_FTS()) wait_FTS(); } /** Determine if the history of a transaction is purgeable. @param trx_id transaction identifier @@ -313,6 +314,8 @@ public: template void clone_oldest_view() { + if (!also_end_view) + wait_FTS(true); latch.wr_lock(SRW_LOCK_CALL); trx_sys.clone_oldest_view(&view); if (also_end_view) diff --git a/storage/innobase/include/trx0types.h b/storage/innobase/include/trx0types.h index 07c1c6a756b..071ea385cf7 100644 --- a/storage/innobase/include/trx0types.h +++ b/storage/innobase/include/trx0types.h @@ -109,6 +109,11 @@ typedef byte trx_undo_rec_t; typedef std::vector > trx_ids_t; +/** Number of std::unordered_map hash buckets expected to be needed +for table IDs in a purge batch. GNU libstdc++ would default to 1 and +enlarge and rehash on demand. */ +static constexpr size_t TRX_PURGE_TABLE_BUCKETS= 128; + /** The number of rollback segments; rollback segment id must fit in the 7 bits reserved for it in DB_ROLL_PTR. */ static constexpr unsigned TRX_SYS_N_RSEGS= 128; diff --git a/storage/innobase/row/row0purge.cc b/storage/innobase/row/row0purge.cc index 2329877c355..c380c4901b9 100644 --- a/storage/innobase/row/row0purge.cc +++ b/storage/innobase/row/row0purge.cc @@ -48,6 +48,7 @@ Created 3/14/1997 Heikki Tuuri #include "ha_innodb.h" #include "fil0fil.h" #include "debug_sync.h" +#include /************************************************************************* IMPORTANT NOTE: Any operation that generates redo MUST check that there @@ -114,7 +115,6 @@ row_purge_remove_clust_if_poss_low( if (table_id) { retry: - purge_sys.check_stop_FTS(); dict_sys.lock(SRW_LOCK_CALL); table = dict_sys.find_table(table_id); if (!table) { @@ -189,7 +189,6 @@ close_and_exit: ibuf_delete_for_discarded_space(space_id); } - purge_sys.check_stop_SYS(); mtr.start(); index->set_modified(mtr); @@ -640,25 +639,12 @@ row_purge_del_mark( return result; } -void purge_sys_t::wait_SYS() -{ - while (must_wait_SYS()) - std::this_thread::sleep_for(std::chrono::seconds(1)); -} - -void purge_sys_t::wait_FTS() -{ - while (must_wait_FTS()) - std::this_thread::sleep_for(std::chrono::seconds(1)); -} - /** Reset DB_TRX_ID, DB_ROLL_PTR of a clustered index record whose old history can no longer be observed. @param[in,out] node purge node @param[in,out] mtr mini-transaction (will be started and committed) */ static void row_purge_reset_trx_id(purge_node_t* node, mtr_t* mtr) { -retry: /* Reset DB_TRX_ID, DB_ROLL_PTR for old records. */ mtr->start(); @@ -694,17 +680,6 @@ retry: ut_ad(!rec_get_deleted_flag( rec, rec_offs_comp(offsets)) || rec_is_alter_metadata(rec, *index)); - switch (node->table->id) { - case DICT_TABLES_ID: - case DICT_COLUMNS_ID: - case DICT_INDEXES_ID: - if (purge_sys.must_wait_SYS()) { - mtr->commit(); - purge_sys.check_stop_SYS(); - goto retry; - } - } - DBUG_LOG("purge", "reset DB_TRX_ID=" << ib::hex(row_get_rec_trx_id( rec, index, offsets))); @@ -1045,10 +1020,7 @@ row_purge_parse_undo_rec( case TRX_UNDO_EMPTY: case TRX_UNDO_INSERT_METADATA: case TRX_UNDO_INSERT_REC: - /* These records do not store any transaction identifier. - - FIXME: Update SYS_TABLES.ID on both DISCARD TABLESPACE - and IMPORT TABLESPACE to get rid of the repeated lookups! */ + /* These records do not store any transaction identifier. */ node->trx_id = TRX_ID_MAX; break; default: @@ -1064,58 +1036,30 @@ row_purge_parse_undo_rec( break; } - if (node->is_skipped(table_id)) { + auto &tables_entry= node->tables[table_id]; + node->table = tables_entry.first; + if (!node->table) { return false; } - trx_id_t trx_id = TRX_ID_MAX; - - if (node->retain_mdl(table_id)) { - ut_ad(node->table != NULL); - goto already_locked; +#ifndef DBUG_OFF + if (MDL_ticket* mdl = tables_entry.second) { + static_cast(thd_mdl_context(current_thd)) + ->lock_warrant = mdl->get_ctx(); } - -try_again: - purge_sys.check_stop_FTS(); - - node->table = dict_table_open_on_id( - table_id, false, DICT_TABLE_OP_NORMAL, node->purge_thd, - &node->mdl_ticket); - - if (node->table == reinterpret_cast(-1)) { - /* purge stop signal */ - goto try_again; - } - - if (!node->table) { - /* The table has been dropped: no need to do purge and - release mdl happened as a part of open process itself */ - goto err_exit; - } - -already_locked: +#endif ut_ad(!node->table->is_temporary()); clust_index = dict_table_get_first_index(node->table); - if (!clust_index || clust_index->is_corrupted()) { + if (clust_index->is_corrupted()) { /* The table was corrupt in the data dictionary. dict_set_corrupted() works on an index, and we do not have an index to call it with. */ DBUG_ASSERT(table_id == node->table->id); - trx_id = node->table->def_trx_id; - if (!trx_id) { - trx_id = TRX_ID_MAX; - } - -err_exit: - node->close_table(); - node->skip(table_id, trx_id); return false; } - node->last_table_id = table_id; - switch (type) { case TRX_UNDO_INSERT_METADATA: node->ref = &trx_undo_metadata; @@ -1265,19 +1209,19 @@ inline void purge_node_t::start() found_clust= false; rec_type= 0; cmpl_info= 0; - if (!purge_thd) - purge_thd= current_thd; } /** Reset the state at end @return the query graph parent */ -inline que_node_t *purge_node_t::end() +inline que_node_t *purge_node_t::end(THD *thd) { DBUG_ASSERT(common.type == QUE_NODE_PURGE); - close_table(); ut_ad(undo_recs.empty()); ut_d(in_progress= false); - purge_thd= nullptr; + innobase_reset_background_thd(thd); +#ifndef DBUG_OFF + static_cast(thd_mdl_context(thd))->lock_warrant= nullptr; +#endif mem_heap_empty(heap); return common.parent; } @@ -1305,7 +1249,7 @@ row_purge_step( row_purge(node, purge_rec.undo_rec, thr); } - thr->run_node = node->end(); + thr->run_node = node->end(current_thd); return(thr); } diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc index dffc412c560..dfb97bf6134 100644 --- a/storage/innobase/srv/srv0srv.cc +++ b/storage/innobase/srv/srv0srv.cc @@ -1294,43 +1294,35 @@ void purge_sys_t::wake_if_not_active() } /** @return whether the purge tasks are active */ -bool purge_sys_t::running() const +bool purge_sys_t::running() { return purge_coordinator_task.is_running(); } -/** Suspend purge in data dictionary tables */ -void purge_sys_t::stop_SYS() +void purge_sys_t::stop_FTS() { latch.rd_lock(SRW_LOCK_CALL); - ++m_SYS_paused; + m_FTS_paused++; latch.rd_unlock(); + while (m_active) + std::this_thread::sleep_for(std::chrono::seconds(1)); } /** Stop purge during FLUSH TABLES FOR EXPORT */ void purge_sys_t::stop() { - for (;;) + latch.wr_lock(SRW_LOCK_CALL); + + if (!enabled()) { - latch.wr_lock(SRW_LOCK_CALL); - - if (!enabled()) - { - /* Shutdown must have been initiated during FLUSH TABLES FOR EXPORT. */ - ut_ad(!srv_undo_sources); - latch.wr_unlock(); - return; - } - - ut_ad(srv_n_purge_threads > 0); - - if (!must_wait_SYS()) - break; - + /* Shutdown must have been initiated during FLUSH TABLES FOR EXPORT. */ + ut_ad(!srv_undo_sources); latch.wr_unlock(); - std::this_thread::sleep_for(std::chrono::seconds(1)); + return; } + ut_ad(srv_n_purge_threads > 0); + const auto paused= m_paused++; latch.wr_unlock(); @@ -1346,9 +1338,8 @@ void purge_sys_t::stop() /** Resume purge in data dictionary tables */ void purge_sys_t::resume_SYS(void *) { - ut_d(const auto s=) - purge_sys.m_SYS_paused--; - ut_ad(s); + ut_d(auto paused=) purge_sys.m_SYS_paused--; + ut_ad(paused); } /** Resume purge at UNLOCK TABLES after FLUSH TABLES FOR EXPORT */ diff --git a/storage/innobase/trx/trx0purge.cc b/storage/innobase/trx/trx0purge.cc index 4011a3dfe7c..a62b0635e49 100644 --- a/storage/innobase/trx/trx0purge.cc +++ b/storage/innobase/trx/trx0purge.cc @@ -38,10 +38,10 @@ Created 3/26/1996 Heikki Tuuri #include "trx0roll.h" #include "trx0rseg.h" #include "trx0trx.h" +#include "dict0load.h" +#include #include -#include - /** Maximum allowable purge history length. <=0 means 'infinite'. */ ulong srv_max_purge_lag = 0; @@ -169,7 +169,6 @@ void purge_sys_t::create() ut_ad(!heap); ut_ad(!enabled()); m_paused= 0; - m_SYS_paused= 0; query= purge_graph_build(); next_stored= false; rseg= NULL; @@ -1100,11 +1099,177 @@ trx_purge_fetch_next_rec( return trx_purge_get_next_rec(n_pages_handled, heap); } +/** Close all tables that were opened in a purge batch for a worker. +@param node purge task context +@param thd purge coordinator thread handle */ +static void trx_purge_close_tables(purge_node_t *node, THD *thd) +{ + for (auto &t : node->tables) + { + if (!t.second.first); + else if (t.second.first == reinterpret_cast(-1)); + else + { + dict_table_close(t.second.first, false, thd, t.second.second); + t.second.first= reinterpret_cast(-1); + } + } +} + +void purge_sys_t::wait_FTS(bool also_sys) +{ + bool paused; + do + { + latch.wr_lock(SRW_LOCK_CALL); + paused= m_FTS_paused || (also_sys && m_SYS_paused); + latch.wr_unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + while (paused); +} + +__attribute__((nonnull)) +/** Aqcuire a metadata lock on a table. +@param table table handle +@param mdl_context metadata lock acquisition context +@param mdl metadata lcok +@return table handle +@retval nullptr if the table is not found or accessible +@retval -1 if the purge of history must be suspended due to DDL */ +static dict_table_t *trx_purge_table_acquire(dict_table_t *table, + MDL_context *mdl_context, + MDL_ticket **mdl) +{ + ut_ad(dict_sys.frozen_not_locked()); + *mdl= nullptr; + + if (!table->is_readable() || table->corrupted) + { + table->release(); + return nullptr; + } + + size_t db_len= dict_get_db_name_len(table->name.m_name); + if (db_len == 0) + return table; /* InnoDB system tables are not covered by MDL */ + + if (purge_sys.must_wait_FTS()) + { + must_wait: + table->release(); + return reinterpret_cast(-1); + } + + char db_buf[NAME_LEN + 1]; + char tbl_buf[NAME_LEN + 1]; + size_t tbl_len; + + if (!table->parse_name(db_buf, tbl_buf, &db_len, &tbl_len)) + /* The name of an intermediate table starts with #sql */ + return table; + + { + MDL_request request; + MDL_REQUEST_INIT(&request,MDL_key::TABLE, db_buf, tbl_buf, MDL_SHARED, + MDL_EXPLICIT); + if (mdl_context->try_acquire_lock(&request)) + goto must_wait; + *mdl= request.ticket; + if (!*mdl) + goto must_wait; + } + + return table; +} + +/** Open a table handle for the purge of committed transaction history +@param table_id InnoDB table identifier +@param mdl_context metadata lock acquisition context +@param mdl metadata lcok +@return table handle +@retval nullptr if the table is not found or accessible +@retval -1 if the purge of history must be suspended due to DDL */ +static dict_table_t *trx_purge_table_open(table_id_t table_id, + MDL_context *mdl_context, + MDL_ticket **mdl) +{ + dict_sys.freeze(SRW_LOCK_CALL); + + dict_table_t *table= dict_sys.find_table(table_id); + + if (table) + table->acquire(); + else + { + dict_sys.unfreeze(); + dict_sys.lock(SRW_LOCK_CALL); + table= dict_load_table_on_id(table_id, DICT_ERR_IGNORE_FK_NOKEY); + if (table) + table->acquire(); + dict_sys.unlock(); + if (!table) + return nullptr; + dict_sys.freeze(SRW_LOCK_CALL); + } + + table= trx_purge_table_acquire(table, mdl_context, mdl); + dict_sys.unfreeze(); + return table; +} + +ATTRIBUTE_COLD +dict_table_t *purge_sys_t::close_and_reopen(table_id_t id, THD *thd, + MDL_ticket **mdl) +{ + MDL_context *mdl_context= static_cast(thd_mdl_context(thd)); + ut_ad(mdl_context); + retry: + ut_ad(m_active); + + for (que_thr_t *thr= UT_LIST_GET_FIRST(purge_sys.query->thrs); thr; + thr= UT_LIST_GET_NEXT(thrs, thr)) + { + purge_node_t *node= static_cast(thr->child); + trx_purge_close_tables(node, thd); + } + + m_active= false; + wait_FTS(false); + m_active= true; + + dict_table_t *table= trx_purge_table_open(id, mdl_context, mdl); + if (table == reinterpret_cast(-1)) + goto retry; + + for (que_thr_t *thr= UT_LIST_GET_FIRST(purge_sys.query->thrs); thr; + thr= UT_LIST_GET_NEXT(thrs, thr)) + { + purge_node_t *node= static_cast(thr->child); + for (auto &t : node->tables) + { + if (t.second.first) + { + t.second.first= trx_purge_table_open(t.first, mdl_context, + &t.second.second); + if (t.second.first == reinterpret_cast(-1)) + { + if (table) + dict_table_close(table, false, thd, *mdl); + goto retry; + } + } + } + } + + return table; +} + /** Run a purge batch. @param n_purge_threads number of purge threads @return new purge_sys.head and the number of undo log pages handled */ static std::pair -trx_purge_attach_undo_recs(ulint n_purge_threads) +trx_purge_attach_undo_recs(ulint n_purge_threads, THD *thd) { que_thr_t* thr; ulint i; @@ -1146,8 +1311,14 @@ trx_purge_attach_undo_recs(ulint n_purge_threads) i = 0; - std::unordered_map table_id_map; + std::unordered_map + table_id_map(TRX_PURGE_TABLE_BUCKETS); mem_heap_empty(purge_sys.heap); + purge_sys.m_active = true; + + MDL_context* const mdl_context + = static_cast(thd_mdl_context(thd)); + ut_ad(mdl_context); while (UNIV_LIKELY(srv_undo_sources) || !srv_fast_shutdown) { trx_purge_rec_t purge_rec; @@ -1174,9 +1345,17 @@ trx_purge_attach_undo_recs(ulint n_purge_threads) table_id_t table_id = trx_undo_rec_get_table_id( purge_rec.undo_rec); - purge_node_t *& table_node = table_id_map[table_id]; + purge_node_t*& table_node = table_id_map[table_id]; if (!table_node) { + std::pair p; + p.first = trx_purge_table_open(table_id, mdl_context, + &p.second); + if (p.first == reinterpret_cast(-1)) { + p.first = purge_sys.close_and_reopen( + table_id, thd, &p.second); + } + thr = UT_LIST_GET_NEXT(thrs, thr); if (!(++i % n_purge_threads)) { @@ -1186,15 +1365,24 @@ trx_purge_attach_undo_recs(ulint n_purge_threads) table_node = static_cast(thr->child); ut_a(que_node_get_type(table_node) == QUE_NODE_PURGE); + ut_d(auto i=) + table_node->tables.emplace(table_id, p); + ut_ad(i.second); + if (p.first) { + goto enqueue; + } + } else if (table_node->tables[table_id].first) { +enqueue: + table_node->undo_recs.push(purge_rec); } - table_node->undo_recs.push(purge_rec); - if (n_pages_handled >= srv_purge_batch_size) { break; } } + purge_sys.m_active = false; + ut_ad(head <= purge_sys.tail); return std::make_pair(head, n_pages_handled); @@ -1260,8 +1448,10 @@ TRANSACTIONAL_TARGET ulint trx_purge(ulint n_tasks, ulint history_size) } #endif /* UNIV_DEBUG */ + THD* const thd = current_thd; + /* Fetch the UNDO recs that need to be purged. */ - const auto n = trx_purge_attach_undo_recs(n_tasks); + const auto n = trx_purge_attach_undo_recs(n_tasks, thd); { ulint delay = n.second ? srv_max_purge_lag : 0; @@ -1297,6 +1487,13 @@ TRANSACTIONAL_TARGET ulint trx_purge(ulint n_tasks, ulint history_size) trx_purge_wait_for_workers_to_complete(); + for (thr = UT_LIST_GET_FIRST(purge_sys.query->thrs); thr; + thr = UT_LIST_GET_NEXT(thrs, thr)) { + purge_node_t* node = static_cast(thr->child); + trx_purge_close_tables(node, thd); + node->tables.clear(); + } + purge_sys.clone_end_view(n.first); MONITOR_INC_VALUE(MONITOR_PURGE_INVOKED, 1);