diff --git a/mysql-test/suite/innodb/r/innodb-index-debug.result b/mysql-test/suite/innodb/r/innodb-index-debug.result index 8c200109aa4..0995b526821 100644 --- a/mysql-test/suite/innodb/r/innodb-index-debug.result +++ b/mysql-test/suite/innodb/r/innodb-index-debug.result @@ -1,3 +1,7 @@ +SET GLOBAL innodb_max_purge_lag_wait=0; +connect stop_purge,localhost,root; +START TRANSACTION WITH CONSISTENT SNAPSHOT; +connection default; CREATE TABLE t1(c1 INT NOT NULL, c2 INT, PRIMARY KEY(c1)) Engine=InnoDB; SHOW CREATE TABLE t1; Table Create Table @@ -143,6 +147,7 @@ ERROR 23000: Duplicate entry '0' for key 'i' SET DEBUG_SYNC='now SIGNAL log2'; connection con1; disconnect con1; +disconnect stop_purge; connection default; SET DEBUG_SYNC='RESET'; DROP TABLE t1; diff --git a/mysql-test/suite/innodb/r/innodb-index-online.result b/mysql-test/suite/innodb/r/innodb-index-online.result index 7138c5d7a18..e6b69f8f5cc 100644 --- a/mysql-test/suite/innodb/r/innodb-index-online.result +++ b/mysql-test/suite/innodb/r/innodb-index-online.result @@ -437,6 +437,7 @@ COUNT(c22f) CHECK TABLE t1; Table Op Msg_type Msg_text test.t1 check status OK +SET GLOBAL innodb_max_purge_lag_wait=0; ALTER TABLE t1 ADD UNIQUE INDEX c3p5(c3(5)); ERROR 23000: Duplicate entry 'NULL' for key 'c3p5' UPDATE t1 SET c3 = NULL WHERE c3 = ''; diff --git a/mysql-test/suite/innodb/r/innodb-table-online.result b/mysql-test/suite/innodb/r/innodb-table-online.result index caefcce47f1..9296fded0fd 100644 --- a/mysql-test/suite/innodb/r/innodb-table-online.result +++ b/mysql-test/suite/innodb/r/innodb-table-online.result @@ -274,6 +274,7 @@ WHERE variable_name = 'innodb_encryption_n_rowlog_blocks_encrypted'); SET @rowlog_decrypt_1= (SELECT variable_value FROM information_schema.global_status WHERE variable_name = 'innodb_encryption_n_rowlog_blocks_decrypted'); +SET GLOBAL innodb_max_purge_lag_wait=0; SET DEBUG_SYNC = 'row_log_table_apply1_before SIGNAL rebuilt3 WAIT_FOR dml3_done'; ALTER TABLE t1 ADD PRIMARY KEY(c22f), CHANGE c2 c22f INT; ERROR 42000: Multiple primary key defined diff --git a/mysql-test/suite/innodb/r/row_format_redundant.result b/mysql-test/suite/innodb/r/row_format_redundant.result index b798832e96f..d95c37f18cc 100644 --- a/mysql-test/suite/innodb/r/row_format_redundant.result +++ b/mysql-test/suite/innodb/r/row_format_redundant.result @@ -42,14 +42,14 @@ TRUNCATE TABLE t2; ERROR HY000: Table 't2' is read only TRUNCATE TABLE t3; ERROR HY000: Table 't3' is read only -# restart: --innodb-data-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-log-group-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-data-file-path=ibdata1:1M:autoextend --innodb-undo-tablespaces=0 --innodb-stats-persistent=0 +# restart: --innodb-data-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-log-group-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-data-file-path=ibdata1:1M:autoextend --innodb-undo-tablespaces=0 --innodb-stats-persistent=0 --skip-innodb-fast-shutdown TRUNCATE TABLE t1; TRUNCATE TABLE t2; TRUNCATE TABLE t3; corrupted SYS_TABLES.MIX_LEN for test/t1 corrupted SYS_TABLES.MIX_LEN for test/t2 corrupted SYS_TABLES.MIX_LEN for test/t3 -# restart: --innodb-data-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-log-group-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-data-file-path=ibdata1:1M:autoextend --innodb-undo-tablespaces=0 --innodb-stats-persistent=0 +# restart: --innodb-data-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-log-group-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-data-file-path=ibdata1:1M:autoextend --innodb-undo-tablespaces=0 --innodb-stats-persistent=0 --skip-innodb-fast-shutdown TRUNCATE TABLE t1; ERROR 42S02: Table 'test.t1' doesn't exist in engine TRUNCATE TABLE t2; diff --git a/mysql-test/suite/innodb/t/innodb-index-debug.test b/mysql-test/suite/innodb/t/innodb-index-debug.test index f03ef061769..736e129584c 100644 --- a/mysql-test/suite/innodb/t/innodb-index-debug.test +++ b/mysql-test/suite/innodb/t/innodb-index-debug.test @@ -6,6 +6,10 @@ --source include/big_test.inc let $MYSQLD_DATADIR= `select @@datadir`; +SET GLOBAL innodb_max_purge_lag_wait=0; +connect (stop_purge,localhost,root); +START TRANSACTION WITH CONSISTENT SNAPSHOT; +connection default; # # Test for BUG# 12739098, check whether trx->error_status is reset on error. @@ -155,6 +159,7 @@ SET DEBUG_SYNC='now SIGNAL log2'; --connection con1 reap; --disconnect con1 +--disconnect stop_purge --connection default SET DEBUG_SYNC='RESET'; DROP TABLE t1; diff --git a/mysql-test/suite/innodb/t/innodb-index-online.test b/mysql-test/suite/innodb/t/innodb-index-online.test index ab4f5a965da..f8eb8957911 100644 --- a/mysql-test/suite/innodb/t/innodb-index-online.test +++ b/mysql-test/suite/innodb/t/innodb-index-online.test @@ -369,6 +369,8 @@ SELECT connection con1; SELECT COUNT(c22f) FROM t1; CHECK TABLE t1; +# Avoid a strange DEBUG_SYNC timeout on c3p5_created. +SET GLOBAL innodb_max_purge_lag_wait=0; # Create a column prefix index. --error ER_DUP_ENTRY diff --git a/mysql-test/suite/innodb/t/innodb-table-online.test b/mysql-test/suite/innodb/t/innodb-table-online.test index 90c8dc2095d..551b71c4883 100644 --- a/mysql-test/suite/innodb/t/innodb-table-online.test +++ b/mysql-test/suite/innodb/t/innodb-table-online.test @@ -255,6 +255,7 @@ SET @rowlog_decrypt_1= (SELECT variable_value FROM information_schema.global_status WHERE variable_name = 'innodb_encryption_n_rowlog_blocks_decrypted'); +SET GLOBAL innodb_max_purge_lag_wait=0; # Accumulate and apply some modification log. SET DEBUG_SYNC = 'row_log_table_apply1_before SIGNAL rebuilt3 WAIT_FOR dml3_done'; --error ER_MULTIPLE_PRI_KEY diff --git a/mysql-test/suite/innodb/t/row_format_redundant.test b/mysql-test/suite/innodb/t/row_format_redundant.test index 6de7597e983..fa3a104af13 100644 --- a/mysql-test/suite/innodb/t/row_format_redundant.test +++ b/mysql-test/suite/innodb/t/row_format_redundant.test @@ -78,7 +78,7 @@ TRUNCATE TABLE t2; --error ER_OPEN_AS_READONLY TRUNCATE TABLE t3; ---let $restart_parameters= $d +--let $restart_parameters= $d --skip-innodb-fast-shutdown --source include/restart_mysqld.inc TRUNCATE TABLE t1; diff --git a/mysql-test/suite/parts/r/partition_special_innodb.result b/mysql-test/suite/parts/r/partition_special_innodb.result index 27f8f0a9d5c..1d0c075d661 100644 --- a/mysql-test/suite/parts/r/partition_special_innodb.result +++ b/mysql-test/suite/parts/r/partition_special_innodb.result @@ -269,6 +269,7 @@ Table Op Msg_type Msg_text test.t1 check status OK connection default; UNLOCK TABLES; +SET GLOBAL innodb_max_purge_lag_wait=0; connection con2; DROP TABLE t1; connection default; @@ -348,6 +349,7 @@ SELECT * FROM t1; ERROR HY000: Lock wait timeout exceeded; try restarting transaction connection default; UNLOCK TABLES; +SET GLOBAL innodb_max_purge_lag_wait=0; connection con2; DROP TABLE t1; disconnect con2; diff --git a/mysql-test/suite/parts/t/partition_special_innodb.test b/mysql-test/suite/parts/t/partition_special_innodb.test index 6273cc5cb2a..836b7048ccf 100644 --- a/mysql-test/suite/parts/t/partition_special_innodb.test +++ b/mysql-test/suite/parts/t/partition_special_innodb.test @@ -114,6 +114,8 @@ CHECK TABLE t1; --connection default UNLOCK TABLES; +# Avoid a MDL conflict between purge and the DROP TABLE below +SET GLOBAL innodb_max_purge_lag_wait=0; --connection con2 DROP TABLE t1; @@ -222,6 +224,8 @@ SELECT * FROM t1; --connection default UNLOCK TABLES; +# Avoid a MDL conflict between purge and the DROP TABLE below +SET GLOBAL innodb_max_purge_lag_wait=0; --connection con2 DROP TABLE t1; diff --git a/storage/innobase/dict/dict0dict.cc b/storage/innobase/dict/dict0dict.cc index d1c719d2090..0680e60d81c 100644 --- a/storage/innobase/dict/dict0dict.cc +++ b/storage/innobase/dict/dict0dict.cc @@ -662,7 +662,7 @@ dict_table_t::parse_name<>(char(&)[NAME_LEN + 1], char(&)[NAME_LEN + 1], @param[in] table_op operation to perform when opening @return table object after locking MDL shared @retval nullptr if the table is not readable, or if trylock && MDL blocked */ -template +template dict_table_t* dict_acquire_mdl_shared(dict_table_t *table, THD *thd, @@ -678,7 +678,6 @@ dict_acquire_mdl_shared(dict_table_t *table, if (trylock) { - static_assert(!trylock || !purge_thd, "usage"); dict_sys.freeze(SRW_LOCK_CALL); db_len= dict_get_db_name_len(table->name.m_name); dict_sys.unfreeze(); @@ -749,13 +748,7 @@ retry: } } -retry_table_open: dict_sys.freeze(SRW_LOCK_CALL); - if (purge_thd && purge_sys.must_wait_FTS()) - { - not_found= reinterpret_cast(-1); - goto return_without_mdl; - } table= dict_sys.find_table(table_id); if (table) table->acquire(); @@ -763,11 +756,6 @@ retry_table_open: { dict_sys.unfreeze(); dict_sys.lock(SRW_LOCK_CALL); - if (purge_thd && purge_sys.must_wait_FTS()) - { - dict_sys.unlock(); - goto retry_table_open; - } table= dict_load_table_on_id(table_id, table_op == DICT_TABLE_OP_LOAD_TABLESPACE ? DICT_ERR_IGNORE_RECOVER_LOCK @@ -826,24 +814,21 @@ return_without_mdl: goto retry; } -template dict_table_t* dict_acquire_mdl_shared +template dict_table_t* dict_acquire_mdl_shared (dict_table_t*,THD*,MDL_ticket**,dict_table_op_t); -template dict_table_t* dict_acquire_mdl_shared +template dict_table_t* dict_acquire_mdl_shared (dict_table_t*,THD*,MDL_ticket**,dict_table_op_t); /** Look up a table by numeric identifier. -@tparam purge_thd Whether the function is called by purge thread @param[in] table_id table identifier @param[in] dict_locked data dictionary locked @param[in] table_op operation to perform when opening @param[in,out] thd background thread, or NULL to not acquire MDL @param[out] mdl mdl ticket, or NULL @return table, NULL if does not exist */ -template -dict_table_t* -dict_table_open_on_id(table_id_t table_id, bool dict_locked, - dict_table_op_t table_op, THD *thd, - MDL_ticket **mdl) +dict_table_t *dict_table_open_on_id(table_id_t table_id, bool dict_locked, + dict_table_op_t table_op, THD *thd, + MDL_ticket **mdl) { if (!dict_locked) dict_sys.freeze(SRW_LOCK_CALL); @@ -852,16 +837,9 @@ dict_table_open_on_id(table_id_t table_id, bool dict_locked, if (table) { - if (purge_thd && purge_sys.must_wait_FTS()) - { - table= reinterpret_cast(-1); - goto func_exit; - } - table->acquire(); if (thd && !dict_locked) - table= dict_acquire_mdl_shared( - table, thd, mdl, table_op); + table= dict_acquire_mdl_shared(table, thd, mdl, table_op); } else if (table_op != DICT_TABLE_OP_OPEN_ONLY_IF_CACHED) { @@ -875,44 +853,26 @@ dict_table_open_on_id(table_id_t table_id, bool dict_locked, ? DICT_ERR_IGNORE_RECOVER_LOCK : DICT_ERR_IGNORE_FK_NOKEY); if (table) - { - if (purge_thd && purge_sys.must_wait_FTS()) - { - dict_sys.unlock(); - return reinterpret_cast(-1); - } table->acquire(); - } if (!dict_locked) { dict_sys.unlock(); if (table && thd) { dict_sys.freeze(SRW_LOCK_CALL); - table= dict_acquire_mdl_shared( - table, thd, mdl, table_op); + table= dict_acquire_mdl_shared(table, thd, mdl, table_op); dict_sys.unfreeze(); } return table; } } -func_exit: if (!dict_locked) dict_sys.unfreeze(); return table; } -template dict_table_t* dict_table_open_on_id -(table_id_t table_id, bool dict_locked, - dict_table_op_t table_op, THD *thd, - MDL_ticket **mdl); -template dict_table_t* dict_table_open_on_id -(table_id_t table_id, bool dict_locked, - dict_table_op_t table_op, THD *thd, - MDL_ticket **mdl); - /********************************************************************//** Looks for column n position in the clustered index. @return position in internal representation of the clustered index */ diff --git a/storage/innobase/fts/fts0fts.cc b/storage/innobase/fts/fts0fts.cc index f94aed58d21..769ce416325 100644 --- a/storage/innobase/fts/fts0fts.cc +++ b/storage/innobase/fts/fts0fts.cc @@ -1609,10 +1609,11 @@ and common table associated with the fts table. already stopped*/ void purge_sys_t::stop_FTS(const dict_table_t &table, bool already_stopped) { - dict_sys.lock(SRW_LOCK_CALL); if (!already_stopped) purge_sys.stop_FTS(); + dict_sys.lock(SRW_LOCK_CALL); + fts_table_t fts_table; char table_name[MAX_FULL_NAME_LEN]; diff --git a/storage/innobase/include/dict0dict.h b/storage/innobase/include/dict0dict.h index e54a138cc02..895743be84b 100644 --- a/storage/innobase/include/dict0dict.h +++ b/storage/innobase/include/dict0dict.h @@ -132,7 +132,7 @@ enum dict_table_op_t { @param[in] table_op operation to perform when opening @return table object after locking MDL shared @retval NULL if the table is not readable, or if trylock && MDL blocked */ -template +template dict_table_t* dict_acquire_mdl_shared(dict_table_t *table, THD *thd, @@ -146,7 +146,6 @@ dict_acquire_mdl_shared(dict_table_t *table, @param[in,out] thd background thread, or NULL to not acquire MDL @param[out] mdl mdl ticket, or NULL @return table, NULL if does not exist */ -template dict_table_t* dict_table_open_on_id(table_id_t table_id, bool dict_locked, dict_table_op_t table_op, THD *thd= nullptr, diff --git a/storage/innobase/include/row0purge.h b/storage/innobase/include/row0purge.h index 389a061cba0..99d2f484241 100644 --- a/storage/innobase/include/row0purge.h +++ b/storage/innobase/include/row0purge.h @@ -24,8 +24,7 @@ Purge obsolete records Created 3/14/1997 Heikki Tuuri *******************************************************/ -#ifndef row0purge_h -#define row0purge_h +#pragma once #include "que0types.h" #include "btr0types.h" @@ -35,6 +34,7 @@ Created 3/14/1997 Heikki Tuuri #include "row0mysql.h" #include "mysqld.h" #include +#include class MDL_ticket; /** Determines if it is possible to remove a secondary index entry. @@ -89,155 +89,70 @@ struct trx_purge_rec_t roll_ptr_t roll_ptr; }; -/* Purge node structure */ +/** Purge worker context */ +struct purge_node_t +{ + /** node type: QUE_NODE_PURGE */ + que_common_t common; -struct purge_node_t{ - que_common_t common; /*!< node type: QUE_NODE_PURGE */ - /*----------------------*/ - /* Local storage for this graph node */ - roll_ptr_t roll_ptr;/* roll pointer to undo log record */ + /** DB_TRX_ID of the undo log record */ + trx_id_t trx_id; + /** DB_ROLL_PTR pointing to undo log record */ + roll_ptr_t roll_ptr; - undo_no_t undo_no;/*!< undo number of the record */ + /** undo number of the record */ + undo_no_t undo_no; - byte rec_type;/*!< undo log record type: TRX_UNDO_INSERT_REC, - ... */ - byte cmpl_info;/* compiler analysis info of an update */ -private: - /** latest unavailable table ID (do not bother looking up again) */ - table_id_t unavailable_table_id; - /** the latest modification of the table definition identified by - unavailable_table_id, or TRX_ID_MAX */ - trx_id_t def_trx_id; -public: - dict_table_t* table; /*!< table where purge is done */ - - upd_t* update; /*!< update vector for a clustered index - record */ - const dtuple_t* ref; /*!< NULL, or row reference to the next row to - handle */ - dtuple_t* row; /*!< NULL, or a copy (also fields copied to - heap) of the indexed fields of the row to - handle */ - dict_index_t* index; /*!< NULL, or the next index whose record should - be handled */ - mem_heap_t* heap; /*!< memory heap used as auxiliary storage for - row; this must be emptied after a successful - purge of a row */ - ibool found_clust;/*!< whether the clustered index record - determined by ref was found in the clustered - index, and we were able to position pcur on - it */ - btr_pcur_t pcur; /*!< persistent cursor used in searching the - clustered index record */ + /** record type: TRX_UNDO_INSERT_REC, ... */ + byte rec_type; + /** compiler analysis info of an update */ + byte cmpl_info; + /** whether the clustered index record determined by ref was found + in the clustered index of the table, and we were able to position + pcur on it */ + bool found_clust; #ifdef UNIV_DEBUG - /** whether the operation is in progress */ - bool in_progress; + /** whether the operation is in progress */ + bool in_progress= false; #endif - trx_id_t trx_id; /*!< trx id for this purging record */ + /** table where purge is done */ + dict_table_t *table= nullptr; + /** update vector for a clustered index record */ + upd_t *update; + /** row reference to the next row to handle, or nullptr */ + const dtuple_t *ref; + /** nullptr, or a deep copy of the indexed fields of the row to handle */ + dtuple_t *row; + /** nullptr, or the next index of table whose record should be handled */ + dict_index_t *index; + /** memory heap used as auxiliary storage; must be emptied between rows */ + mem_heap_t *heap; + /** persistent cursor to the clustered index record */ + btr_pcur_t pcur; - /** meta-data lock for the table name */ - MDL_ticket* mdl_ticket; + /** Undo recs to purge */ + std::queue undo_recs; - /** table id of the previous undo log record */ - table_id_t last_table_id; + /** map of table identifiers to table handles and meta-data locks */ + std::unordered_map> tables; - /** purge thread */ - THD* purge_thd; - - /** metadata lock holds for this number of undo log recs */ - int mdl_hold_recs; - - /** Undo recs to purge */ - std::queue undo_recs; - - /** Constructor */ - explicit purge_node_t(que_thr_t* parent) : - common(QUE_NODE_PURGE, parent), - unavailable_table_id(0), - table(NULL), - heap(mem_heap_create(256)), -#ifdef UNIV_DEBUG - in_progress(false), -#endif - mdl_ticket(NULL), - last_table_id(0), - purge_thd(NULL), - mdl_hold_recs(0) - { - } + /** Constructor */ + explicit purge_node_t(que_thr_t *parent) : + common(QUE_NODE_PURGE, parent), heap(mem_heap_create(256)), + tables(TRX_PURGE_TABLE_BUCKETS) {} #ifdef UNIV_DEBUG - /***********************************************************//** - Validate the persisent cursor. The purge node has two references - to the clustered index record - one via the ref member, and the - other via the persistent cursor. These two references must match - each other if the found_clust flag is set. - @return true if the persistent cursor is consistent with - the ref member.*/ - bool validate_pcur(); + /** Validate the persistent cursor. The purge node has two references + to the clustered index record: ref and pcur, which must match + each other if found_clust. + @return whether pcur is consistent with ref */ + bool validate_pcur(); #endif - /** Determine if a table should be skipped in purge. - @param[in] table_id table identifier - @return whether to skip the table lookup and processing */ - bool is_skipped(table_id_t id) const - { - return id == unavailable_table_id && trx_id <= def_trx_id; - } - - /** Remember that a table should be skipped in purge. - @param[in] id table identifier - @param[in] limit last transaction for which to skip */ - void skip(table_id_t id, trx_id_t limit) - { - DBUG_ASSERT(limit >= trx_id); - unavailable_table_id = id; - def_trx_id = limit; - } - /** Start processing an undo log record. */ inline void start(); - - /** Close the existing table and release the MDL for it. */ - void close_table() - { - last_table_id= 0; - if (!table) - { - ut_ad(!mdl_ticket); - return; - } - - innobase_reset_background_thd(purge_thd); - dict_table_close(table, false, purge_thd, mdl_ticket); - table= nullptr; - mdl_ticket= nullptr; - } - - - /** Retail mdl for the table id. - @param[in] table_id table id to be processed - @return true if retain mdl */ - bool retain_mdl(table_id_t table_id) - { - ut_ad(table_id); - if (last_table_id == table_id && mdl_hold_recs < 100) - { - ut_ad(table); - mdl_hold_recs++; - return true; - } - - mdl_hold_recs= 0; - close_table(); - return false; - } - - /** Reset the state at end @return the query graph parent */ - inline que_node_t *end(); + inline que_node_t *end(THD *); }; - -#endif diff --git a/storage/innobase/include/trx0purge.h b/storage/innobase/include/trx0purge.h index 277b5c0a04d..bcac02ccce1 100644 --- a/storage/innobase/include/trx0purge.h +++ b/storage/innobase/include/trx0purge.h @@ -135,7 +135,11 @@ private: that are older than view will be removed. */ ReadViewBase view; /** whether purge is enabled; protected by latch and std::atomic */ - std::atomic m_enabled; + std::atomic m_enabled{false}; +public: + /** whether purge is active (may hold table handles) */ + std::atomic m_active{false}; +private: /** number of pending stop() calls without resume() */ Atomic_counter m_paused; /** number of stop_SYS() calls without resume_SYS() */ @@ -252,33 +256,30 @@ public: } /** @return whether the purge tasks are active */ - bool running() const; + static bool running(); + /** Stop purge during FLUSH TABLES FOR EXPORT. */ void stop(); /** Resume purge at UNLOCK TABLES after FLUSH TABLES FOR EXPORT */ void resume(); + /** Close and reopen all tables in case of a MDL conflict with DDL */ + dict_table_t *close_and_reopen(table_id_t id, THD *thd, MDL_ticket **mdl); private: - void wait_SYS(); - void wait_FTS(); + /** Suspend purge during a DDL operation on FULLTEXT INDEX tables */ + void wait_FTS(bool also_sys); public: /** Suspend purge in data dictionary tables */ - void stop_SYS(); + void stop_SYS() { m_SYS_paused++; } /** Resume purge in data dictionary tables */ static void resume_SYS(void *); - /** @return whether stop_SYS() is in effect */ - bool must_wait_SYS() const { return m_SYS_paused; } - /** check stop_SYS() */ - void check_stop_SYS() { if (must_wait_SYS()) wait_SYS(); } /** Pause purge during a DDL operation that could drop FTS_ tables. */ - void stop_FTS() { m_FTS_paused++; } + void stop_FTS(); /** Resume purge after stop_FTS(). */ void resume_FTS() { ut_d(const auto p=) m_FTS_paused--; ut_ad(p); } /** @return whether stop_SYS() is in effect */ bool must_wait_FTS() const { return m_FTS_paused; } - /** check stop_SYS() */ - void check_stop_FTS() { if (must_wait_FTS()) wait_FTS(); } /** Determine if the history of a transaction is purgeable. @param trx_id transaction identifier @@ -313,6 +314,8 @@ public: template void clone_oldest_view() { + if (!also_end_view) + wait_FTS(true); latch.wr_lock(SRW_LOCK_CALL); trx_sys.clone_oldest_view(&view); if (also_end_view) diff --git a/storage/innobase/include/trx0types.h b/storage/innobase/include/trx0types.h index 07c1c6a756b..071ea385cf7 100644 --- a/storage/innobase/include/trx0types.h +++ b/storage/innobase/include/trx0types.h @@ -109,6 +109,11 @@ typedef byte trx_undo_rec_t; typedef std::vector > trx_ids_t; +/** Number of std::unordered_map hash buckets expected to be needed +for table IDs in a purge batch. GNU libstdc++ would default to 1 and +enlarge and rehash on demand. */ +static constexpr size_t TRX_PURGE_TABLE_BUCKETS= 128; + /** The number of rollback segments; rollback segment id must fit in the 7 bits reserved for it in DB_ROLL_PTR. */ static constexpr unsigned TRX_SYS_N_RSEGS= 128; diff --git a/storage/innobase/row/row0purge.cc b/storage/innobase/row/row0purge.cc index 2329877c355..c380c4901b9 100644 --- a/storage/innobase/row/row0purge.cc +++ b/storage/innobase/row/row0purge.cc @@ -48,6 +48,7 @@ Created 3/14/1997 Heikki Tuuri #include "ha_innodb.h" #include "fil0fil.h" #include "debug_sync.h" +#include /************************************************************************* IMPORTANT NOTE: Any operation that generates redo MUST check that there @@ -114,7 +115,6 @@ row_purge_remove_clust_if_poss_low( if (table_id) { retry: - purge_sys.check_stop_FTS(); dict_sys.lock(SRW_LOCK_CALL); table = dict_sys.find_table(table_id); if (!table) { @@ -189,7 +189,6 @@ close_and_exit: ibuf_delete_for_discarded_space(space_id); } - purge_sys.check_stop_SYS(); mtr.start(); index->set_modified(mtr); @@ -640,25 +639,12 @@ row_purge_del_mark( return result; } -void purge_sys_t::wait_SYS() -{ - while (must_wait_SYS()) - std::this_thread::sleep_for(std::chrono::seconds(1)); -} - -void purge_sys_t::wait_FTS() -{ - while (must_wait_FTS()) - std::this_thread::sleep_for(std::chrono::seconds(1)); -} - /** Reset DB_TRX_ID, DB_ROLL_PTR of a clustered index record whose old history can no longer be observed. @param[in,out] node purge node @param[in,out] mtr mini-transaction (will be started and committed) */ static void row_purge_reset_trx_id(purge_node_t* node, mtr_t* mtr) { -retry: /* Reset DB_TRX_ID, DB_ROLL_PTR for old records. */ mtr->start(); @@ -694,17 +680,6 @@ retry: ut_ad(!rec_get_deleted_flag( rec, rec_offs_comp(offsets)) || rec_is_alter_metadata(rec, *index)); - switch (node->table->id) { - case DICT_TABLES_ID: - case DICT_COLUMNS_ID: - case DICT_INDEXES_ID: - if (purge_sys.must_wait_SYS()) { - mtr->commit(); - purge_sys.check_stop_SYS(); - goto retry; - } - } - DBUG_LOG("purge", "reset DB_TRX_ID=" << ib::hex(row_get_rec_trx_id( rec, index, offsets))); @@ -1045,10 +1020,7 @@ row_purge_parse_undo_rec( case TRX_UNDO_EMPTY: case TRX_UNDO_INSERT_METADATA: case TRX_UNDO_INSERT_REC: - /* These records do not store any transaction identifier. - - FIXME: Update SYS_TABLES.ID on both DISCARD TABLESPACE - and IMPORT TABLESPACE to get rid of the repeated lookups! */ + /* These records do not store any transaction identifier. */ node->trx_id = TRX_ID_MAX; break; default: @@ -1064,58 +1036,30 @@ row_purge_parse_undo_rec( break; } - if (node->is_skipped(table_id)) { + auto &tables_entry= node->tables[table_id]; + node->table = tables_entry.first; + if (!node->table) { return false; } - trx_id_t trx_id = TRX_ID_MAX; - - if (node->retain_mdl(table_id)) { - ut_ad(node->table != NULL); - goto already_locked; +#ifndef DBUG_OFF + if (MDL_ticket* mdl = tables_entry.second) { + static_cast(thd_mdl_context(current_thd)) + ->lock_warrant = mdl->get_ctx(); } - -try_again: - purge_sys.check_stop_FTS(); - - node->table = dict_table_open_on_id( - table_id, false, DICT_TABLE_OP_NORMAL, node->purge_thd, - &node->mdl_ticket); - - if (node->table == reinterpret_cast(-1)) { - /* purge stop signal */ - goto try_again; - } - - if (!node->table) { - /* The table has been dropped: no need to do purge and - release mdl happened as a part of open process itself */ - goto err_exit; - } - -already_locked: +#endif ut_ad(!node->table->is_temporary()); clust_index = dict_table_get_first_index(node->table); - if (!clust_index || clust_index->is_corrupted()) { + if (clust_index->is_corrupted()) { /* The table was corrupt in the data dictionary. dict_set_corrupted() works on an index, and we do not have an index to call it with. */ DBUG_ASSERT(table_id == node->table->id); - trx_id = node->table->def_trx_id; - if (!trx_id) { - trx_id = TRX_ID_MAX; - } - -err_exit: - node->close_table(); - node->skip(table_id, trx_id); return false; } - node->last_table_id = table_id; - switch (type) { case TRX_UNDO_INSERT_METADATA: node->ref = &trx_undo_metadata; @@ -1265,19 +1209,19 @@ inline void purge_node_t::start() found_clust= false; rec_type= 0; cmpl_info= 0; - if (!purge_thd) - purge_thd= current_thd; } /** Reset the state at end @return the query graph parent */ -inline que_node_t *purge_node_t::end() +inline que_node_t *purge_node_t::end(THD *thd) { DBUG_ASSERT(common.type == QUE_NODE_PURGE); - close_table(); ut_ad(undo_recs.empty()); ut_d(in_progress= false); - purge_thd= nullptr; + innobase_reset_background_thd(thd); +#ifndef DBUG_OFF + static_cast(thd_mdl_context(thd))->lock_warrant= nullptr; +#endif mem_heap_empty(heap); return common.parent; } @@ -1305,7 +1249,7 @@ row_purge_step( row_purge(node, purge_rec.undo_rec, thr); } - thr->run_node = node->end(); + thr->run_node = node->end(current_thd); return(thr); } diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc index dffc412c560..dfb97bf6134 100644 --- a/storage/innobase/srv/srv0srv.cc +++ b/storage/innobase/srv/srv0srv.cc @@ -1294,43 +1294,35 @@ void purge_sys_t::wake_if_not_active() } /** @return whether the purge tasks are active */ -bool purge_sys_t::running() const +bool purge_sys_t::running() { return purge_coordinator_task.is_running(); } -/** Suspend purge in data dictionary tables */ -void purge_sys_t::stop_SYS() +void purge_sys_t::stop_FTS() { latch.rd_lock(SRW_LOCK_CALL); - ++m_SYS_paused; + m_FTS_paused++; latch.rd_unlock(); + while (m_active) + std::this_thread::sleep_for(std::chrono::seconds(1)); } /** Stop purge during FLUSH TABLES FOR EXPORT */ void purge_sys_t::stop() { - for (;;) + latch.wr_lock(SRW_LOCK_CALL); + + if (!enabled()) { - latch.wr_lock(SRW_LOCK_CALL); - - if (!enabled()) - { - /* Shutdown must have been initiated during FLUSH TABLES FOR EXPORT. */ - ut_ad(!srv_undo_sources); - latch.wr_unlock(); - return; - } - - ut_ad(srv_n_purge_threads > 0); - - if (!must_wait_SYS()) - break; - + /* Shutdown must have been initiated during FLUSH TABLES FOR EXPORT. */ + ut_ad(!srv_undo_sources); latch.wr_unlock(); - std::this_thread::sleep_for(std::chrono::seconds(1)); + return; } + ut_ad(srv_n_purge_threads > 0); + const auto paused= m_paused++; latch.wr_unlock(); @@ -1346,9 +1338,8 @@ void purge_sys_t::stop() /** Resume purge in data dictionary tables */ void purge_sys_t::resume_SYS(void *) { - ut_d(const auto s=) - purge_sys.m_SYS_paused--; - ut_ad(s); + ut_d(auto paused=) purge_sys.m_SYS_paused--; + ut_ad(paused); } /** Resume purge at UNLOCK TABLES after FLUSH TABLES FOR EXPORT */ diff --git a/storage/innobase/trx/trx0purge.cc b/storage/innobase/trx/trx0purge.cc index 4011a3dfe7c..a62b0635e49 100644 --- a/storage/innobase/trx/trx0purge.cc +++ b/storage/innobase/trx/trx0purge.cc @@ -38,10 +38,10 @@ Created 3/26/1996 Heikki Tuuri #include "trx0roll.h" #include "trx0rseg.h" #include "trx0trx.h" +#include "dict0load.h" +#include #include -#include - /** Maximum allowable purge history length. <=0 means 'infinite'. */ ulong srv_max_purge_lag = 0; @@ -169,7 +169,6 @@ void purge_sys_t::create() ut_ad(!heap); ut_ad(!enabled()); m_paused= 0; - m_SYS_paused= 0; query= purge_graph_build(); next_stored= false; rseg= NULL; @@ -1100,11 +1099,177 @@ trx_purge_fetch_next_rec( return trx_purge_get_next_rec(n_pages_handled, heap); } +/** Close all tables that were opened in a purge batch for a worker. +@param node purge task context +@param thd purge coordinator thread handle */ +static void trx_purge_close_tables(purge_node_t *node, THD *thd) +{ + for (auto &t : node->tables) + { + if (!t.second.first); + else if (t.second.first == reinterpret_cast(-1)); + else + { + dict_table_close(t.second.first, false, thd, t.second.second); + t.second.first= reinterpret_cast(-1); + } + } +} + +void purge_sys_t::wait_FTS(bool also_sys) +{ + bool paused; + do + { + latch.wr_lock(SRW_LOCK_CALL); + paused= m_FTS_paused || (also_sys && m_SYS_paused); + latch.wr_unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + while (paused); +} + +__attribute__((nonnull)) +/** Aqcuire a metadata lock on a table. +@param table table handle +@param mdl_context metadata lock acquisition context +@param mdl metadata lcok +@return table handle +@retval nullptr if the table is not found or accessible +@retval -1 if the purge of history must be suspended due to DDL */ +static dict_table_t *trx_purge_table_acquire(dict_table_t *table, + MDL_context *mdl_context, + MDL_ticket **mdl) +{ + ut_ad(dict_sys.frozen_not_locked()); + *mdl= nullptr; + + if (!table->is_readable() || table->corrupted) + { + table->release(); + return nullptr; + } + + size_t db_len= dict_get_db_name_len(table->name.m_name); + if (db_len == 0) + return table; /* InnoDB system tables are not covered by MDL */ + + if (purge_sys.must_wait_FTS()) + { + must_wait: + table->release(); + return reinterpret_cast(-1); + } + + char db_buf[NAME_LEN + 1]; + char tbl_buf[NAME_LEN + 1]; + size_t tbl_len; + + if (!table->parse_name(db_buf, tbl_buf, &db_len, &tbl_len)) + /* The name of an intermediate table starts with #sql */ + return table; + + { + MDL_request request; + MDL_REQUEST_INIT(&request,MDL_key::TABLE, db_buf, tbl_buf, MDL_SHARED, + MDL_EXPLICIT); + if (mdl_context->try_acquire_lock(&request)) + goto must_wait; + *mdl= request.ticket; + if (!*mdl) + goto must_wait; + } + + return table; +} + +/** Open a table handle for the purge of committed transaction history +@param table_id InnoDB table identifier +@param mdl_context metadata lock acquisition context +@param mdl metadata lcok +@return table handle +@retval nullptr if the table is not found or accessible +@retval -1 if the purge of history must be suspended due to DDL */ +static dict_table_t *trx_purge_table_open(table_id_t table_id, + MDL_context *mdl_context, + MDL_ticket **mdl) +{ + dict_sys.freeze(SRW_LOCK_CALL); + + dict_table_t *table= dict_sys.find_table(table_id); + + if (table) + table->acquire(); + else + { + dict_sys.unfreeze(); + dict_sys.lock(SRW_LOCK_CALL); + table= dict_load_table_on_id(table_id, DICT_ERR_IGNORE_FK_NOKEY); + if (table) + table->acquire(); + dict_sys.unlock(); + if (!table) + return nullptr; + dict_sys.freeze(SRW_LOCK_CALL); + } + + table= trx_purge_table_acquire(table, mdl_context, mdl); + dict_sys.unfreeze(); + return table; +} + +ATTRIBUTE_COLD +dict_table_t *purge_sys_t::close_and_reopen(table_id_t id, THD *thd, + MDL_ticket **mdl) +{ + MDL_context *mdl_context= static_cast(thd_mdl_context(thd)); + ut_ad(mdl_context); + retry: + ut_ad(m_active); + + for (que_thr_t *thr= UT_LIST_GET_FIRST(purge_sys.query->thrs); thr; + thr= UT_LIST_GET_NEXT(thrs, thr)) + { + purge_node_t *node= static_cast(thr->child); + trx_purge_close_tables(node, thd); + } + + m_active= false; + wait_FTS(false); + m_active= true; + + dict_table_t *table= trx_purge_table_open(id, mdl_context, mdl); + if (table == reinterpret_cast(-1)) + goto retry; + + for (que_thr_t *thr= UT_LIST_GET_FIRST(purge_sys.query->thrs); thr; + thr= UT_LIST_GET_NEXT(thrs, thr)) + { + purge_node_t *node= static_cast(thr->child); + for (auto &t : node->tables) + { + if (t.second.first) + { + t.second.first= trx_purge_table_open(t.first, mdl_context, + &t.second.second); + if (t.second.first == reinterpret_cast(-1)) + { + if (table) + dict_table_close(table, false, thd, *mdl); + goto retry; + } + } + } + } + + return table; +} + /** Run a purge batch. @param n_purge_threads number of purge threads @return new purge_sys.head and the number of undo log pages handled */ static std::pair -trx_purge_attach_undo_recs(ulint n_purge_threads) +trx_purge_attach_undo_recs(ulint n_purge_threads, THD *thd) { que_thr_t* thr; ulint i; @@ -1146,8 +1311,14 @@ trx_purge_attach_undo_recs(ulint n_purge_threads) i = 0; - std::unordered_map table_id_map; + std::unordered_map + table_id_map(TRX_PURGE_TABLE_BUCKETS); mem_heap_empty(purge_sys.heap); + purge_sys.m_active = true; + + MDL_context* const mdl_context + = static_cast(thd_mdl_context(thd)); + ut_ad(mdl_context); while (UNIV_LIKELY(srv_undo_sources) || !srv_fast_shutdown) { trx_purge_rec_t purge_rec; @@ -1174,9 +1345,17 @@ trx_purge_attach_undo_recs(ulint n_purge_threads) table_id_t table_id = trx_undo_rec_get_table_id( purge_rec.undo_rec); - purge_node_t *& table_node = table_id_map[table_id]; + purge_node_t*& table_node = table_id_map[table_id]; if (!table_node) { + std::pair p; + p.first = trx_purge_table_open(table_id, mdl_context, + &p.second); + if (p.first == reinterpret_cast(-1)) { + p.first = purge_sys.close_and_reopen( + table_id, thd, &p.second); + } + thr = UT_LIST_GET_NEXT(thrs, thr); if (!(++i % n_purge_threads)) { @@ -1186,15 +1365,24 @@ trx_purge_attach_undo_recs(ulint n_purge_threads) table_node = static_cast(thr->child); ut_a(que_node_get_type(table_node) == QUE_NODE_PURGE); + ut_d(auto i=) + table_node->tables.emplace(table_id, p); + ut_ad(i.second); + if (p.first) { + goto enqueue; + } + } else if (table_node->tables[table_id].first) { +enqueue: + table_node->undo_recs.push(purge_rec); } - table_node->undo_recs.push(purge_rec); - if (n_pages_handled >= srv_purge_batch_size) { break; } } + purge_sys.m_active = false; + ut_ad(head <= purge_sys.tail); return std::make_pair(head, n_pages_handled); @@ -1260,8 +1448,10 @@ TRANSACTIONAL_TARGET ulint trx_purge(ulint n_tasks, ulint history_size) } #endif /* UNIV_DEBUG */ + THD* const thd = current_thd; + /* Fetch the UNDO recs that need to be purged. */ - const auto n = trx_purge_attach_undo_recs(n_tasks); + const auto n = trx_purge_attach_undo_recs(n_tasks, thd); { ulint delay = n.second ? srv_max_purge_lag : 0; @@ -1297,6 +1487,13 @@ TRANSACTIONAL_TARGET ulint trx_purge(ulint n_tasks, ulint history_size) trx_purge_wait_for_workers_to_complete(); + for (thr = UT_LIST_GET_FIRST(purge_sys.query->thrs); thr; + thr = UT_LIST_GET_NEXT(thrs, thr)) { + purge_node_t* node = static_cast(thr->child); + trx_purge_close_tables(node, thd); + node->tables.clear(); + } + purge_sys.clone_end_view(n.first); MONITOR_INC_VALUE(MONITOR_PURGE_INVOKED, 1);