diff --git a/mysql-test/suite/innodb/r/avoid_deadlock_with_blocked.result b/mysql-test/suite/innodb/r/avoid_deadlock_with_blocked.result
new file mode 100644
index 00000000000..4e0cca42898
--- /dev/null
+++ b/mysql-test/suite/innodb/r/avoid_deadlock_with_blocked.result
@@ -0,0 +1,198 @@
+connect stop_purge,localhost,root;
+START TRANSACTION WITH CONSISTENT SNAPSHOT;
+connect con1,localhost,root,,;
+connect con2,localhost,root,,;
+connect con3,localhost,root,,;
+connection default;
+CREATE TABLE t1 (id INT PRIMARY KEY) ENGINE=InnoDB STATS_PERSISTENT=0;
+INSERT INTO t1 (id) VALUES (1);
+# Simplest scenario:
+# ,
+# , ,
+# Before MDEV-34877:
+# , ,
+# After MDEV-34877:
+# , ,
+# Expected: instead of deadlocking, con1's request should ignore con2's
+connection con1;
+BEGIN;
+SELECT * FROM t1 LOCK IN SHARE MODE;
+id
+1
+connection con2;
+BEGIN;
+SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con2_will_wait';
+SELECT * FROM t1 FOR UPDATE;
+connection con1;
+SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait';
+SELECT * FROM t1 FOR UPDATE;
+id
+1
+COMMIT;
+connection con2;
+id
+1
+COMMIT;
+# The scenario where we bypass an X<-S pair:
+# ,
+# , ,
+# , ,
+# , , ,
+connection con1;
+BEGIN;
+SELECT * FROM t1 LOCK IN SHARE MODE;
+id
+1
+connection con2;
+BEGIN;
+SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con2_will_wait';
+SELECT * FROM t1 FOR UPDATE;
+connection con3;
+SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait';
+BEGIN;
+SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait';
+SELECT * FROM t1 LOCK IN SHARE MODE;
+connection con1;
+SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait';
+SELECT * FROM t1 FOR UPDATE;
+id
+1
+COMMIT;
+connection con2;
+id
+1
+COMMIT;
+connection con3;
+id
+1
+COMMIT;
+# A variant of the above scenario:
+# ,
+# , ,
+# , ,
+# Expected: a deadlock, as INSERT INTENTION must not overtake gap locks, to avoid slicing them
+connection con1;
+BEGIN;
+SELECT * FROM t1 WHERE id=1 FOR UPDATE;
+id
+1
+connection con2;
+BEGIN;
+SET DEBUG_SYNC = 'lock_wait_start SIGNAL con2_will_wait';
+SELECT * FROM t1 LOCK IN SHARE MODE;
+connection con1;
+SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait';
+INSERT INTO t1 VALUES (0);
+ROLLBACK;
+connection con2;
+ERROR 40001: Deadlock found when trying to get lock; try restarting transaction
+COMMIT;
+# More complicated scenario:
+# ,
+# , ,
+# , ,
+# , , ,
+# , ,
+# Expected: a deadlock, as INSERT INTENTION must not overtake gap locks, to avoid slicing them
+connection con1;
+BEGIN;
+SELECT * FROM t1 LOCK IN SHARE MODE;
+id
+1
+connection con2;
+BEGIN;
+SELECT * FROM t1 WHERE id=1 LOCK IN SHARE MODE;
+id
+1
+connection con3;
+SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait';
+SELECT * FROM t1 FOR UPDATE;
+connection con1;
+SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait';
+SET DEBUG_SYNC = 'lock_wait_start SIGNAL con1_will_wait';
+INSERT INTO t1 VALUES (0);
+connection con2;
+SET DEBUG_SYNC = 'now WAIT_FOR con1_will_wait';
+COMMIT;
+connection con1;
+ROLLBACK;
+connection con3;
+ERROR 40001: Deadlock found when trying to get lock; try restarting transaction
+# More complicated scenario.
+# ,
+# , ,
+# , ,
+# , , ,
+# Before MDEV-34877:
+# , ,
+# After MDEV-34877:
+# , ,
+connection con1;
+BEGIN;
+SELECT * FROM t1 LOCK IN SHARE MODE;
+id
+1
+connection con2;
+BEGIN;
+SELECT * FROM t1 WHERE id=1 LOCK IN SHARE MODE;
+id
+1
+connection default;
+connection con3;
+SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait';
+SELECT * FROM t1 FOR UPDATE;
+connection con1;
+SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait';
+SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con1_will_wait';
+SELECT * FROM t1 WHERE id=1 FOR UPDATE;
+connection con2;
+SET DEBUG_SYNC = 'now WAIT_FOR con1_will_wait';
+COMMIT;
+connection con1;
+id
+1
+COMMIT;
+connection con3;
+id
+1
+COMMIT;
+# A scenario where con1 has to bypass two transactions:
+#
+#
+#
+# Before MDEV-34877:
+#
+# After MDEV-34877:
+#
+connection con1;
+BEGIN;
+SELECT * FROM t1 LOCK IN SHARE MODE;
+id
+1
+connection con2;
+SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con2_will_wait';
+SELECT * FROM t1 FOR UPDATE;
+connection con3;
+SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait';
+SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait';
+SELECT * FROM t1 FOR UPDATE;
+connection con1;
+SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait';
+SELECT * FROM t1 WHERE id=1 FOR UPDATE;
+id
+1
+COMMIT;
+connection con2;
+id
+1
+COMMIT;
+connection con3;
+id
+1
+COMMIT;
+connection default;
+disconnect con1;
+disconnect con2;
+disconnect con3;
+disconnect stop_purge;
+DROP TABLE t1;
diff --git a/mysql-test/suite/innodb/t/avoid_deadlock_with_blocked.test b/mysql-test/suite/innodb/t/avoid_deadlock_with_blocked.test
new file mode 100644
index 00000000000..aa55b1ba008
--- /dev/null
+++ b/mysql-test/suite/innodb/t/avoid_deadlock_with_blocked.test
@@ -0,0 +1,228 @@
+--source include/have_innodb.inc
+--source include/have_debug.inc
+--source include/have_debug_sync.inc
+--source include/count_sessions.inc
+
+--disable_query_log
+call mtr.add_suppression("InnoDB: Transaction was aborted due to ");
+--enable_query_log
+
+connect stop_purge,localhost,root;
+START TRANSACTION WITH CONSISTENT SNAPSHOT;
+
+--connect (con1,localhost,root,,)
+--connect (con2,localhost,root,,)
+--connect (con3,localhost,root,,)
+
+--connection default
+CREATE TABLE t1 (id INT PRIMARY KEY) ENGINE=InnoDB STATS_PERSISTENT=0;
+INSERT INTO t1 (id) VALUES (1);
+
+--echo # Simplest scenario:
+--echo # ,
+--echo # , ,
+--echo # Before MDEV-34877:
+--echo # , ,
+--echo # After MDEV-34877:
+--echo # , ,
+--echo # Expected: instead of deadlocking, con1's request should ignore con2's
+
+--connection con1
+  BEGIN;
+  SELECT * FROM t1 LOCK IN SHARE MODE;
+
+--connection con2
+  BEGIN;
+  SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con2_will_wait';
+  --send SELECT * FROM t1 FOR UPDATE
+
+--connection con1
+  SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait';
+  SELECT * FROM t1 FOR UPDATE;
+  COMMIT;
+
+--connection con2
+  --reap
+  COMMIT;
+
+--echo # The scenario where we bypass an X<-S pair:
+--echo # ,
+--echo # , ,
+--echo # , ,
+--echo # , , ,
+
+--connection con1
+  BEGIN;
+  SELECT * FROM t1 LOCK IN SHARE MODE;
+
+--connection con2
+  BEGIN;
+  SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con2_will_wait';
+  --send SELECT * FROM t1 FOR UPDATE
+
+--connection con3
+  SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait';
+  BEGIN;
+  SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait';
+  --send SELECT * FROM t1 LOCK IN SHARE MODE
+
+--connection con1
+  SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait';
+  SELECT * FROM t1 FOR UPDATE;
+  COMMIT;
+
+--connection con2
+  --reap
+  COMMIT;
+
+--connection con3
+  --reap
+  COMMIT;
+
+#
+--echo # A variant of the above scenario:
+--echo # ,
+--echo # , ,
+--echo # , ,
+--echo # Expected: a deadlock, as INSERT INTENTION must not overtake gap locks, to avoid slicing them
+--connection con1
+  BEGIN;
+  SELECT * FROM t1 WHERE id=1 FOR UPDATE;
+
+--connection con2
+  BEGIN;
+  SET DEBUG_SYNC = 'lock_wait_start SIGNAL con2_will_wait';
+  --send SELECT * FROM t1 LOCK IN SHARE MODE
+
+--connection con1
+  SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait';
+  INSERT INTO t1 VALUES (0);
+  ROLLBACK;
+
+--connection con2
+  --error ER_LOCK_DEADLOCK
+  --reap
+  COMMIT;
+
+--echo # More complicated scenario:
+--echo # ,
+--echo # , ,
+--echo # , ,
+--echo # , , ,
+--echo # , ,
+--echo # Expected: a deadlock, as INSERT INTENTION must not overtake gap locks, to avoid slicing them
+
+--connection con1
+  BEGIN;
+  SELECT * FROM t1 LOCK IN SHARE MODE;
+
+--connection con2
+  BEGIN;
+  SELECT * FROM t1 WHERE id=1 LOCK IN SHARE MODE;
+
+--connection con3
+  SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait';
+  --send SELECT * FROM t1 FOR UPDATE
+
+--connection con1
+  SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait';
+  SET DEBUG_SYNC = 'lock_wait_start SIGNAL con1_will_wait';
+  --send INSERT INTO t1 VALUES (0)
+
+--connection con2
+  SET DEBUG_SYNC = 'now WAIT_FOR con1_will_wait';
+  COMMIT;
+
+--connection con1
+  --reap
+  ROLLBACK;
+
+--connection con3
+  --error ER_LOCK_DEADLOCK
+  --reap
+
+--echo # More complicated scenario.
+--echo # ,
+--echo # , ,
+--echo # , ,
+--echo # , , ,
+--echo # Before MDEV-34877:
+--echo # , ,
+--echo # After MDEV-34877:
+--echo # , ,
+
+
+--connection con1
+  BEGIN;
+  SELECT * FROM t1 LOCK IN SHARE MODE;
+
+--connection con2
+  BEGIN;
+  SELECT * FROM t1 WHERE id=1 LOCK IN SHARE MODE;
+
+--connection default
+
+--connection con3
+  SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait';
+  --send SELECT * FROM t1 FOR UPDATE
+
+--connection con1
+  SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait';
+  SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con1_will_wait';
+  --send SELECT * FROM t1 WHERE id=1 FOR UPDATE
+
+--connection con2
+  SET DEBUG_SYNC = 'now WAIT_FOR con1_will_wait';
+  COMMIT;
+
+--connection con1
+  --reap
+  COMMIT;
+
+--connection con3
+  --reap
+  COMMIT;
+
+--echo # A scenario where con1 has to bypass two transactions:
+--echo #
+--echo #
+--echo #
+--echo # Before MDEV-34877:
+--echo #
+--echo # After MDEV-34877:
+--echo #
+--connection con1
+  BEGIN;
+  SELECT * FROM t1 LOCK IN SHARE MODE;
+
+--connection con2
+  SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con2_will_wait';
+  --send SELECT * FROM t1 FOR UPDATE
+
+--connection con3
+  SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait';
+  SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait';
+  --send SELECT * FROM t1 FOR UPDATE
+
+--connection con1
+  SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait';
+  SELECT * FROM t1 WHERE id=1 FOR UPDATE;
+  COMMIT;
+
+--connection con2
+  --reap
+  COMMIT;
+
+--connection con3
+  --reap
+  COMMIT;
+
+--connection default
+--disconnect con1
+--disconnect con2
+--disconnect con3
+--disconnect stop_purge
+
+DROP TABLE t1;
+
+--source include/wait_until_count_sessions.inc
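Reviewer's aside: the hash0hash.h hunk below adds an `insert_after()` member next to `remove()`. A minimal standalone sketch of the intrusive singly-linked-list semantics being relied on may help; the `Node`/`Cell` names are hypothetical stand-ins, not the real InnoDB types, which keep the list head in `hash_cell_t::node` and thread the chain through a member pointer such as `&lock_t::hash`:

```cpp
// Minimal model of a hash cell's intrusive singly-linked overflow list.
// "Cell::node" mirrors hash_cell_t::node; "next" plays the role of the
// T::*next member pointer (e.g. &lock_t::hash). Illustrative only.
#include <cassert>

struct Node { Node *next; int id; };

struct Cell
{
  Node *node= nullptr;            // head of the overflow list

  void append(Node &n)            // walk to the tail and link there
  {
    Node **tail= &node;
    while (*tail) tail= &(*tail)->next;
    n.next= nullptr;
    *tail= &n;
  }

  void insert_after(Node &after, Node &insert)
  {
    // Same two assignments as the patch: splice "insert" right behind
    // "after", preserving the remainder of the chain.
    insert.next= after.next;
    after.next= &insert;
  }
};

int main()
{
  Cell c;
  Node a{nullptr, 1}, b{nullptr, 2}, x{nullptr, 3};
  c.append(a);
  c.append(b);
  c.insert_after(a, x);           // list becomes a -> x -> b
  assert(c.node == &a && a.next == &x && x.next == &b && !b.next);
}
```

The `UNIV_DEBUG`-only walk in the real patch additionally asserts that `after` is already a member of the cell's list before linking, which this sketch leaves out.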
diff --git a/storage/innobase/include/hash0hash.h b/storage/innobase/include/hash0hash.h
index 4d45c0bf772..4fc21a90f1f 100644
--- a/storage/innobase/include/hash0hash.h
+++ b/storage/innobase/include/hash0hash.h
@@ -107,10 +107,28 @@ public:
  @param element the being-removed element
  @param next the next-element pointer in T */
  template <typename T>
-  void remove(T &element, T *T::*next) noexcept
+  void remove(const T &element, T *T::*next) noexcept
  {
    remove(search(next, [&element](const T *p){return p==&element;}), next);
  }
+
+  /** Insert an element after another.
+  @tparam T type of the element
+  @param after the element after which to insert
+  @param insert the being-inserted element
+  @param next the next-element pointer in T */
+  template <typename T> void insert_after(T &after, T &insert, T *T::*next)
+  {
+#ifdef UNIV_DEBUG
+    for (const T *c= static_cast<const T*>(node); c; c= c->*next)
+      if (c == &after)
+        goto found;
+    ut_error;
+  found:
+#endif
+    insert.*next= after.*next;
+    after.*next= &insert;
+  }
};

/** Hash table with singly-linked overflow lists */
diff --git a/storage/innobase/include/lock0lock.h b/storage/innobase/include/lock0lock.h
index 9da6f1680cd..fb79a82a808 100644
--- a/storage/innobase/include/lock0lock.h
+++ b/storage/innobase/include/lock0lock.h
@@ -52,6 +52,20 @@ namespace Deadlock
  enum report { REPORT_OFF, REPORT_BASIC, REPORT_FULL };
}

+/** Conflicting lock info */
+struct conflicting_lock_info {
+  /** Conflicting lock */
+  const lock_t *conflicting;
+  /** If some lock was bypassed, points to the lock after which the
+  bypassing lock must be inserted into the linked list of locks of the
+  corresponding record locks hash table cell. */
+  lock_t *insert_after;
+  /** First bypassed lock */
+  ut_d(const lock_t *bypassed;)
+};
+
+extern const conflicting_lock_info null_c_lock_info;
+
/*********************************************************************//**
Gets the heap_no of the smallest user record on a page.
@return heap_no of smallest user record, or PAGE_HEAP_NO_SUPREMUM */
@@ -1144,25 +1158,6 @@ struct TMTrxGuard
#endif
};

-/*********************************************************************//**
-Creates a new record lock and inserts it to the lock queue. Does NOT check
-for deadlocks or lock compatibility!
-@return created lock */
-UNIV_INLINE
-lock_t*
-lock_rec_create(
-/*============*/
-	lock_t*	c_lock,	/*!< conflicting lock */
-	unsigned	type_mode,/*!< in: lock mode and wait flag */
-	const buf_block_t*	block,	/*!< in: buffer block containing
-					the record */
-	ulint	heap_no,/*!< in: heap number of the record */
-	dict_index_t*	index,	/*!< in: index of record */
-	trx_t*	trx,	/*!< in,out: transaction */
-	bool	caller_owns_trx_mutex);
-					/*!< in: true if caller owns
-					trx mutex */
-
 /** Remove a record lock request, waiting or granted, on a discarded page
 @param in_lock lock object
 @param cell hash table cell containing in_lock */
 void lock_rec_discard(lock_t *in_lock, hash_cell_t &cell) noexcept;

 /** Create a new record lock and inserts it to the lock queue,
 without checking for deadlocks or conflicts.
-@param[in]	c_lock		conflicting lock, or NULL
+@param	c_lock_info	conflicting lock info
 @param[in]	type_mode	lock mode and wait flag
 @param[in]	page_id		index page number
 @param[in]	page		R-tree index page, or NULL
 @param[in]	heap_no		heap number of the record
 @param[in]	index		index of record
 @param[in,out]	trx		transaction
 @param[in]	holds_trx_mutex	whether the caller holds trx->mutex
 @return created lock */
 lock_t*
-lock_rec_create_low(
-	lock_t*		c_lock,
+lock_rec_create(
+	const conflicting_lock_info &c_lock_info,
 	unsigned	type_mode,
 	const page_id_t	page_id,
 	const page_t*	page,

 /** Enqueue a waiting request for a lock which cannot be granted
 immediately. Check for deadlocks.
-@param[in]	c_lock		conflicting lock
+@param	c_lock_info	conflicting lock info
 @param[in]	type_mode	the requested lock mode (LOCK_S or LOCK_X)
 				possibly ORed with LOCK_GAP or
 				LOCK_REC_NOT_GAP, ORed with
@@ -1210,7 +1205,7 @@ Check for deadlocks.
 @retval	DB_DEADLOCK	if this transaction was chosen as the victim */
 dberr_t
 lock_rec_enqueue_waiting(
-	lock_t*	c_lock,
+	const conflicting_lock_info &c_lock_info,
 	unsigned	type_mode,
 	const page_id_t	id,
 	const page_t*	page,
diff --git a/storage/innobase/include/lock0lock.inl b/storage/innobase/include/lock0lock.inl
index 1b9255ffb3e..37db4062e8c 100644
--- a/storage/innobase/include/lock0lock.inl
+++ b/storage/innobase/include/lock0lock.inl
@@ -51,28 +51,3 @@ lock_get_min_heap_no(
 			FALSE)));
 	}
 }
-
-/*********************************************************************//**
-Creates a new record lock and inserts it to the lock queue. Does NOT check
-for deadlocks or lock compatibility!
-@return created lock */
-UNIV_INLINE
-lock_t*
-lock_rec_create(
-/*============*/
-	lock_t*		c_lock,	/*!< conflicting lock */
-	unsigned	type_mode,/*!< in: lock mode and wait flag */
-	const buf_block_t*	block,	/*!< in: buffer block containing
-					the record */
-	ulint		heap_no,/*!< in: heap number of the record */
-	dict_index_t*	index,	/*!< in: index of record */
-	trx_t*		trx,	/*!< in,out: transaction */
-	bool		caller_owns_trx_mutex)
-					/*!< in: TRUE if caller owns
-					trx mutex */
-{
-	return lock_rec_create_low(
-		c_lock,
-		type_mode, block->page.id(), block->page.frame, heap_no,
-		index, trx, caller_owns_trx_mutex);
-}
diff --git a/storage/innobase/include/lock0priv.h b/storage/innobase/include/lock0priv.h
index e8a4cdd5240..14f0a6e0903 100644
--- a/storage/innobase/include/lock0priv.h
+++ b/storage/innobase/include/lock0priv.h
@@ -497,14 +497,11 @@ inline byte lock_rec_reset_nth_bit(lock_t* lock, ulint i)
 	return(bit);
 }

-/*********************************************************************//**
-Gets the first or next record lock on a page.
+/** Gets the first or next record lock on a page.
+@param lock a record lock
 @return next lock, NULL if none exists */
 UNIV_INLINE
-lock_t*
-lock_rec_get_next_on_page(
-/*======================*/
-	lock_t*	lock);	/*!< in: a record lock */
+lock_t *lock_rec_get_next_on_page(const lock_t *lock);

 /*********************************************************************//**
 Gets the next explicit lock request on a record.
diff --git a/storage/innobase/include/lock0priv.inl b/storage/innobase/include/lock0priv.inl
index 3c8ec01367b..27f12bc552d 100644
--- a/storage/innobase/include/lock0priv.inl
+++ b/storage/innobase/include/lock0priv.inl
@@ -101,14 +101,11 @@ lock_rec_set_nth_bit(
 	lock->trx->lock.set_nth_bit_calls++;
 }

-/*********************************************************************//**
-Gets the first or next record lock on a page.
+/** Gets the first or next record lock on a page.
+@param lock a record lock
 @return next lock, NULL if none exists */
 UNIV_INLINE
-lock_t*
-lock_rec_get_next_on_page(
-/*======================*/
-	lock_t*	lock)	/*!< in: a record lock */
+lock_t *lock_rec_get_next_on_page(const lock_t *lock)
 {
 	return const_cast<lock_t*>(lock_rec_get_next_on_page_const(lock));
 }
@@ -167,14 +164,11 @@ lock_rec_get_nth_bit(
 	return(1 & *b >> (i % 8));
 }

-/*********************************************************************//**
-Gets the first or next record lock on a page.
+/** Gets the first or next record lock on a page.
+@param lock a record lock
 @return next lock, NULL if none exists */
 UNIV_INLINE
-const lock_t*
-lock_rec_get_next_on_page_const(
-/*============================*/
-	const lock_t*	lock)	/*!< in: a record lock */
+const lock_t *lock_rec_get_next_on_page_const(const lock_t *lock)
 {
 	ut_ad(!lock->is_table());
diff --git a/storage/innobase/include/lock0types.h b/storage/innobase/include/lock0types.h
index 0d00b4b360d..da235fb06a0 100644
--- a/storage/innobase/include/lock0types.h
+++ b/storage/innobase/include/lock0types.h
@@ -232,11 +232,43 @@ struct ib_lock_t
 		return(static_cast<lock_mode>(type_mode & LOCK_MODE_MASK));
 	}

-	bool is_rec_granted_exclusive_not_gap() const
+	static bool is_rec_exclusive_not_gap(unsigned type_mode)
 	{
+		ut_ad(!(type_mode & LOCK_TABLE));
 		return (type_mode & (LOCK_MODE_MASK | LOCK_GAP)) == LOCK_X;
 	}

+	bool is_rec_exclusive_not_gap() const
+	{
+		return is_rec_exclusive_not_gap(type_mode);
+	}
+
+	bool is_waiting_not_gap() const
+	{
+		return (type_mode & (LOCK_WAIT | LOCK_GAP)) == LOCK_WAIT;
+	}
+
+	/** Checks if a lock can be bypassed.
+	@param has_s_lock_or_stronger whether the caller's transaction
+	                              already holds a non-gap,
+	                              non-insert-intention S lock or stronger
+	                              for the same heap_no as the current lock
+	@return true if the lock can be bypassed, false otherwise */
+	bool can_be_bypassed(bool has_s_lock_or_stronger) const noexcept
+	{
+		ut_ad(!is_table());
+		/* We don't need to check the supremum bit in the lock's
+		bitmap here, because the function is always called after
+		checking bypass_mode, which already includes the supremum
+		check. */
+		ut_ad(!is_insert_intention() || is_gap());
+		/* We don't need to check the
+		trx->lock.wait_trx == blocking_trx && mode() == LOCK_X
+		condition here, because there can be the following case:
+		S1 X2(waits for S1) S3(waits for X2);
+		the bypassing X1 must not conflict with S3. */
+		return has_s_lock_or_stronger && is_waiting_not_gap();
+	}
+
 	/** Print the lock object into the given output stream.
 	@param[in,out]	out	the output stream
 	@return the given output stream. */
diff --git a/storage/innobase/lock/lock0lock.cc b/storage/innobase/lock/lock0lock.cc
index ed9d6d2db02..6545d38817a 100644
--- a/storage/innobase/lock/lock0lock.cc
+++ b/storage/innobase/lock/lock0lock.cc
@@ -56,6 +56,8 @@ Created 5/7/1996 Heikki Tuuri
 #include <mysql/service_wsrep.h>
 #endif /* WITH_WSREP */

+const conflicting_lock_info null_c_lock_info{nullptr, nullptr, ut_d(nullptr)};
+
 /** The value of innodb_deadlock_detect */
 my_bool	innodb_deadlock_detect;
 /** The value of innodb_deadlock_report */
@@ -1159,12 +1161,23 @@ func_exit:
       lock= lock_rec_get_next(heap_no, lock);
     do
     {
+      /* TODO: Conflicting locks can be only before the waiting lock,
+      consider the following optimization:
+      if (lock == wait_lock)
+        break; */
      /* This is similar case as above except here we have
      record-locks instead of table locks. See details from comment
      above. */
      if (lock->trx->mysql_thd && wsrep_will_BF_abort(lock, trx))
      {
+        /* There can't be bypassed locks here because:
+        1. The transaction can't be blocked by a lock it would bypass,
+        because lock_rec_other_has_conflicting() does not treat such a
+        lock as conflicting.
+        2. The lock is placed before the bypassed lock in
+        lock_rec_create().
+        TODO: add debug check here */
        victims.emplace(lock->trx);
      }
    }
    while ((lock= lock_rec_get_next(heap_no, lock)));
@@ -1200,8 +1213,22 @@ func_exit:
}
#endif /* WITH_WSREP */

-/*********************************************************************//**
-Checks if some other transaction has a conflicting explicit lock request
+static inline bool lock_rec_can_be_bypassing(const trx_t *trx,
+                                             const lock_t *lock)
+{
+  ut_ad(!lock->is_insert_intention() || lock->is_gap());
+  static_assert(int{LOCK_S} == 2, "");
+  static_assert(int{LOCK_X} == 3, "");
+  /* The below is an optimization of the following:
+  return lock->trx == trx && !(lock->type_mode & (LOCK_WAIT | LOCK_GAP)) &&
+    lock_mode_stronger_or_eq(lock->mode(), LOCK_S);
+  The bitwise & with LOCK_MODE_MASK - 1 will map both LOCK_X and LOCK_S to
+  LOCK_S, which we are comparing to. */
+  return lock->trx == trx &&
+    (lock->type_mode & (LOCK_WAIT | LOCK_GAP | (LOCK_MODE_MASK - 1))) ==
+    LOCK_S;
+}
+
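Since `lock_rec_can_be_bypassing()` leans on a slightly subtle bit trick, a throwaway check may help reviewers convince themselves of the mapping. The flag values below are mirrored from `lock0types.h` as I understand them — treat them as an assumption of this sketch, not a redefinition — and the sketch deliberately drops the `lock->trx == trx` part of the real predicate:

```cpp
// Standalone check of the bypass-candidate mode test used in
// lock_rec_can_be_bypassing(). Flag values assumed from lock0types.h.
#include <cassert>

constexpr unsigned LOCK_S= 2, LOCK_X= 3;
constexpr unsigned LOCK_MODE_MASK= 0xF;
constexpr unsigned LOCK_WAIT= 256, LOCK_GAP= 512, LOCK_REC_NOT_GAP= 1024;

// LOCK_MODE_MASK - 1 clears the low mode bit, folding LOCK_X (3) onto
// LOCK_S (2), so one comparison accepts exactly granted, non-gap S/X.
bool bypass_candidate(unsigned type_mode)
{
  return (type_mode & (LOCK_WAIT | LOCK_GAP | (LOCK_MODE_MASK - 1)))
         == LOCK_S;
}

int main()
{
  assert(bypass_candidate(LOCK_S));                    // granted S
  assert(bypass_candidate(LOCK_X));                    // granted X
  assert(bypass_candidate(LOCK_S | LOCK_REC_NOT_GAP)); // plain record S
  assert(!bypass_candidate(LOCK_S | LOCK_GAP));        // gap locks excluded
  assert(!bypass_candidate(LOCK_X | LOCK_WAIT));       // waiting locks excluded
}
```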
+/** Checks if some other transaction has a conflicting explicit lock request
 in the queue, so that we have to wait.
 @param[in] mode LOCK_S or LOCK_X, possibly ORed to LOCK_GAP or LOC_REC_NOT_GAP,
 LOCK_INSERT_INTENTION
 @param[in] id page identifier
 @param[in] heap_no heap number of the record
 @param[in] trx our transaction
-@return conflicting lock and the flag which indicated if conflicting locks
-which wait for the current transaction were ignored */
-static lock_t *lock_rec_other_has_conflicting(unsigned mode,
-                                              const hash_cell_t &cell,
-                                              const page_id_t id,
-                                              ulint heap_no, const trx_t *trx)
+@return the conflicting lock; the lock after which the new lock should be
+inserted into the lock queue when the conflicting lock is bypassed; and the
+first bypassed lock */
+static conflicting_lock_info
+lock_rec_other_has_conflicting(unsigned mode, const hash_cell_t &cell,
+                               const page_id_t id, ulint heap_no,
+                               const trx_t *trx) noexcept
 {
-	bool	is_supremum = (heap_no == PAGE_HEAP_NO_SUPREMUM);
+  const bool is_supremum= (heap_no == PAGE_HEAP_NO_SUPREMUM);
+  ut_ad(!(mode & LOCK_INSERT_INTENTION) || (mode & LOCK_GAP) || is_supremum);
+  const bool bypass_mode=
+    !is_supremum && lock_t::is_rec_exclusive_not_gap(mode);
+  bool has_s_lock_or_stronger= false;
+  const lock_t *insert_after= nullptr;
+  ut_d(const lock_t *bypassed= nullptr;)
+  const lock_t *prev_lock= nullptr;

-	for (lock_t* lock = lock_sys_t::get_first(cell, id, heap_no);
-	     lock; lock = lock_rec_get_next(heap_no, lock)) {
-		if (lock_rec_has_to_wait(trx, mode, lock, is_supremum)) {
-			return(lock);
-		}
-	}
+  for (lock_t *lock= lock_sys_t::get_first(cell, id, heap_no); lock;
+       lock= lock_rec_get_next(heap_no, lock))
+  {
+    if (bypass_mode && lock_rec_can_be_bypassing(trx, lock))
+    {
+      has_s_lock_or_stronger= true;
+    }
+    else if (lock_rec_has_to_wait(trx, mode, lock, is_supremum))
+    {
+      if (!bypass_mode || !lock->can_be_bypassed(has_s_lock_or_stronger))
+        return {lock, nullptr, ut_d(nullptr)};
+      /* Store the first bypassed lock so that
+      lock_rec_find_similar_on_page() is invoked only for the locks which
+      precede all bypassed locks. */
+      ut_d(if (!bypassed) bypassed= lock;)
+      /* There can be several locks to bypass; insert the bypassing lock
+      just before the first bypassed lock. */
+      if (!insert_after)
+        insert_after= prev_lock;
+      continue;
+    }
+    prev_lock= lock;
+  }

-	return(NULL);
+  return {nullptr, const_cast<lock_t*>(insert_after), ut_d(bypassed)};
}

/*********************************************************************//**
@@ -1294,6 +1346,69 @@ lock_number_of_tables_locked(

/*============== RECORD LOCK CREATION AND QUEUE MANAGEMENT =============*/

+#ifdef UNIV_DEBUG
+/** Validates the correctness of lock bypassing in the lock queue of a single
+record, i.e. there must not be the following sequence:
+  (trx1 S) (trx2 X) (trx3 X) (trx1 X)
+If bypassing works correctly, there must be the following sequence instead:
+  (trx1 S) (trx1 X) (trx2 X) (trx3 X)
+Note the above locks are record or next-key locks.
+If a wrong sequence is found, the function crashes with a failed assertion.
+@param checked_lock the lock up to which to check the queue
+@param heap_no heap_no of the queue to check */
+static void lock_rec_queue_validate_bypass(const lock_t *checked_lock,
+                                           ulint heap_no)
+{
+  /* "do_lock_reverse_page_reorganize" causes lock queue reversing during
+  page reorganizing, which causes validation failure. Skip the validation
+  for that case. */
+  DBUG_EXECUTE_IF("do_lock_reverse_page_reorganize", return;);
+  if (!checked_lock || checked_lock->is_waiting())
+    return;
+  page_id_t page_id= checked_lock->un_member.rec_lock.page_id;
+  hash_cell_t *cell= lock_sys.rec_hash.cell_get(page_id.fold());
+  auto mode= checked_lock->type_mode;
+  const trx_t *trx= checked_lock->trx;
+  const bool is_supremum= (heap_no == PAGE_HEAP_NO_SUPREMUM);
+  ut_ad(!(mode & LOCK_INSERT_INTENTION) || (mode & LOCK_GAP) || is_supremum);
+  if (is_supremum || !lock_t::is_rec_exclusive_not_gap(mode))
+    return;
+  const lock_t *has_s_lock_or_stronger= nullptr;
+  const lock_t *bypassed= nullptr;
+
+  for (lock_t *lock= lock_sys_t::get_first(*cell, page_id, heap_no); lock;
+       lock= lock_rec_get_next(heap_no, lock))
+  {
+    if (lock_rec_can_be_bypassing(trx, lock))
+    {
+      ut_ad(!bypassed || lock != checked_lock);
+      has_s_lock_or_stronger= lock;
+      continue;
+    }
+    if (lock_rec_has_to_wait(trx, mode, lock, is_supremum))
+    {
+      if (!lock->can_be_bypassed(has_s_lock_or_stronger))
+        return;
+      bypassed= lock;
+    }
+    ut_ad(lock != checked_lock || !bypassed);
+    if (lock == checked_lock)
+      return;
+  }
+}
+
+/** Validates the correctness of lock bypassing in the lock queue for each
+bit set in the lock bitmap. If a wrong sequence is found, the function
+crashes with a failed assertion.
+@param lock the lock whose bitmap is to be checked */
+static void lock_rec_queue_validate_bypass(const lock_t *lock) {
+  for (ulint i= 0; i < lock_rec_get_n_bits(lock); ++i)
+    if (lock_rec_get_nth_bit(lock, i))
+      lock_rec_queue_validate_bypass(lock, i);
+}
+#endif
+
 /** Reset the wait status of a lock.
 @param[in,out]	lock	lock that was possibly being waited for */
 static void lock_reset_lock_and_trx_wait(lock_t *lock)
@@ -1308,6 +1423,10 @@ static void lock_reset_lock_and_trx_wait(lock_t *lock)
  trx->lock.wait_lock= nullptr;
  trx->lock.wait_trx= nullptr;
  lock->type_mode&= ~LOCK_WAIT;
+#ifdef UNIV_DEBUG
+  if (!lock->is_table())
+    lock_rec_queue_validate_bypass(lock);
+#endif
}

#ifdef UNIV_DEBUG
@@ -1325,7 +1444,7 @@ static void check_trx_state(const trx_t *trx)

/** Create a new record lock and inserts it to the lock queue,
without checking for deadlocks or conflicts.
-@param[in] c_lock conflicting lock +@param[in] c_lock_info conflicting lock info @param[in] type_mode lock mode and wait flag @param[in] page_id index page number @param[in] page R-tree index page, or NULL @@ -1335,8 +1454,8 @@ without checking for deadlocks or conflicts. @param[in] holds_trx_mutex whether the caller holds trx->mutex @return created lock */ lock_t* -lock_rec_create_low( - lock_t* c_lock, +lock_rec_create( + const conflicting_lock_info &c_lock_info, unsigned type_mode, const page_id_t page_id, const page_t* page, @@ -1354,6 +1473,8 @@ lock_rec_create_low( ut_ad(!(type_mode & LOCK_TABLE)); ut_ad(trx->state != TRX_STATE_NOT_STARTED); ut_ad(!trx->is_autocommit_non_locking()); + ut_ad(c_lock_info.insert_after ? !(type_mode & LOCK_WAIT) : + !c_lock_info.bypassed); /* If rec is the supremum record, then we reset the gap and LOCK_REC_NOT_GAP bits, as all locks on the supremum are @@ -1424,23 +1545,30 @@ lock_rec_create_low( } else { /* Predicate lock always on INFIMUM (0) */ lock->un_member.rec_lock.n_bits = 8; - } + } lock_rec_bitmap_reset(lock); lock_rec_set_nth_bit(lock, heap_no); index->table->n_rec_locks++; ut_ad(index->table->get_ref_count() || !index->table->can_be_evicted); const auto lock_hash = &lock_sys.hash_get(type_mode); - lock_hash->cell_get(page_id.fold())->append(*lock, &lock_t::hash); + hash_cell_t& cell = *lock_hash->cell_get(page_id.fold()); + if (UNIV_LIKELY(!c_lock_info.insert_after)) + cell.append(*lock, &lock_t::hash); + else + cell.insert_after(*c_lock_info.insert_after, *lock, + &lock_t::hash); if (type_mode & LOCK_WAIT) { if (trx->lock.wait_trx) { - ut_ad(!c_lock || trx->lock.wait_trx == c_lock->trx); + ut_ad(!c_lock_info.conflicting + || trx->lock.wait_trx + == c_lock_info.conflicting->trx); ut_ad(trx->lock.wait_lock); ut_ad((*trx->lock.wait_lock).trx == trx); } else { - ut_ad(c_lock); - trx->lock.wait_trx = c_lock->trx; + ut_ad(c_lock_info.conflicting); + trx->lock.wait_trx = c_lock_info.conflicting->trx; ut_ad(!trx->lock.wait_lock); } trx->lock.wait_lock = lock; @@ -1451,12 +1579,13 @@ lock_rec_create_low( } MONITOR_INC(MONITOR_RECLOCK_CREATED); MONITOR_INC(MONITOR_NUM_RECLOCK); - + ut_d(lock_rec_queue_validate_bypass(lock, heap_no)); return lock; } /** Enqueue a waiting request for a lock which cannot be granted immediately. Check for deadlocks. +@param c_lock_info conflicting lock info @param[in] type_mode the requested lock mode (LOCK_S or LOCK_X) possibly ORed with LOCK_GAP or LOCK_REC_NOT_GAP, ORed with @@ -1474,7 +1603,7 @@ Check for deadlocks. @retval DB_DEADLOCK if this transaction was chosen as the victim */ dberr_t lock_rec_enqueue_waiting( - lock_t* c_lock, + const conflicting_lock_info &c_lock_info, unsigned type_mode, const page_id_t id, const page_t* page, @@ -1506,8 +1635,8 @@ lock_rec_enqueue_waiting( /* Enqueue the lock request that will wait to be granted, note that we already own the trx mutex. */ - lock_t* lock = lock_rec_create_low( - c_lock, + lock_t* lock = lock_rec_create( + c_lock_info, type_mode | LOCK_WAIT, id, page, heap_no, index, trx, true); if (prdt && type_mode & LOCK_PREDICATE) { @@ -1525,18 +1654,20 @@ lock_rec_enqueue_waiting( return DB_LOCK_WAIT; } -/*********************************************************************//** -Looks for a suitable type record lock struct by the same trx on the same page. -This can be used to save space when a new record lock should be set on a page: -no new struct is needed, if a suitable old is found. 
+/** Looks for a suitable type record lock struct by the same trx on the same
+page. This can be used to save space when a new record lock should be set on
+a page: no new struct is needed if a suitable old one is found.
+@param type_mode lock type_mode field
+@param heap_no heap number of the record
+@param lock the first lock on the page, as returned by lock_sys.get_first()
+@param last_lock the lock before which to stop the search
+@param trx the transaction whose lock we are looking for
 @return lock or NULL */
-static inline
-lock_t*
-lock_rec_find_similar_on_page(
-	ulint           type_mode,      /*!< in: lock type_mode field */
-	ulint           heap_no,        /*!< in: heap number of the record */
-	lock_t*         lock,           /*!< in: lock_sys.get_first() */
-	const trx_t*    trx)            /*!< in: transaction */
+static inline lock_t *lock_rec_find_similar_on_page(ulint type_mode,
+                                                    ulint heap_no,
+                                                    const lock_t *lock,
+                                                    const lock_t *last_lock,
+                                                    const trx_t *trx)
{
	lock_sys.rec_hash.assert_locked(lock->un_member.rec_lock.page_id);
	DBUG_EXECUTE_IF("innodb_skip_lock_bitmap", {
@@ -1546,14 +1677,14 @@ lock_rec_find_similar_on_page(
	});

	for (/* No op */;
-	     lock != NULL;
+	     lock != last_lock;
	     lock = lock_rec_get_next_on_page(lock)) {

		if (lock->trx == trx
		    && lock->type_mode == type_mode
		    && lock_rec_get_n_bits(lock) > heap_no) {

-			return(lock);
+			return const_cast<lock_t*>(lock);
		}
	}

	return(NULL);
}
@@ -1576,7 +1707,8 @@ which does NOT check for deadlocks or lock compatibility!
@param[in,out]	trx	transaction
@param[in]	caller_owns_trx_mutex	TRUE if caller owns the transaction mutex */
TRANSACTIONAL_TARGET
-static void lock_rec_add_to_queue(unsigned type_mode, const hash_cell_t &cell,
+static void lock_rec_add_to_queue(const conflicting_lock_info &c_lock_info,
+                                  unsigned type_mode, const hash_cell_t &cell,
                                  const page_id_t id, const page_t *page,
                                  ulint heap_no, dict_index_t *index,
                                  trx_t *trx, bool caller_owns_trx_mutex)
@@ -1623,8 +1755,8 @@ static void lock_rec_add_to_queue(unsigned type_mode, const hash_cell_t &cell,
	all locks on the supremum are automatically of the gap type, and we
	try to avoid unnecessary memory consumption of a new record lock
	struct for a gap type lock */
-
-	if (heap_no == PAGE_HEAP_NO_SUPREMUM) {
+	const bool is_supremum = heap_no == PAGE_HEAP_NO_SUPREMUM;
+	if (is_supremum) {
		ut_ad(!(type_mode & LOCK_REC_NOT_GAP));

		/* There should never be LOCK_REC_NOT_GAP on a supremum
@@ -1634,23 +1766,47 @@ static void lock_rec_add_to_queue(unsigned type_mode, const hash_cell_t &cell,
	}

	if (type_mode & LOCK_WAIT) {
-
		goto create;
	} else if (lock_t *first_lock = lock_sys_t::get_first(cell, id)) {
+		ut_ad(!(type_mode & LOCK_INSERT_INTENTION)
+		      || (type_mode & LOCK_GAP) || is_supremum);
+		const bool bypass_mode = !is_supremum
+			&& lock_t::is_rec_exclusive_not_gap(type_mode);
+		bool has_s_lock_or_stronger = false;
		for (lock_t* lock = first_lock;;) {
-			if (lock->is_waiting()
-			    && lock_rec_get_nth_bit(lock, heap_no)) {
-				goto create;
+			if (!lock_rec_get_nth_bit(lock, heap_no))
+				goto cont;
+			ut_ad(!lock->is_insert_intention() || lock->is_gap()
+			      || is_supremum);
+			if (bypass_mode && lock_rec_can_be_bypassing(trx, lock))
+			{
+				has_s_lock_or_stronger= true;
			}
+			/* There can be several locks suited for bypassing;
+			skip them all. The condition below is an optimization
+			of
+			lock->is_waiting()
+			  && (!bypass_mode || !lock->can_be_bypassed(
+			    has_s_lock_or_stronger))
+			so that we don't check the lock's 'waiting' flag
+			twice. */
+			else if (lock->is_waiting()
+				 && (!bypass_mode || !has_s_lock_or_stronger
+				     || lock->is_gap()))
+				goto create;
+cont:
			if (!(lock = lock_rec_get_next_on_page(lock))) {
				break;
			}
		}
+		const lock_t *bypassed
+			= c_lock_info.insert_after
+			? lock_rec_get_next(heap_no, c_lock_info.insert_after)
+			: nullptr;
+		ut_ad(bypassed == c_lock_info.bypassed);

		/* Look for a similar record lock on the same page:
		if one is found and there are no waiting lock requests,
		we can just set the bit */

		if (lock_t* lock = lock_rec_find_similar_on_page(
-			    type_mode, heap_no, first_lock, trx)) {
+			    type_mode, heap_no, first_lock,
+			    bypassed, trx)) {

			trx_t* lock_trx = lock->trx;
			if (caller_owns_trx_mutex) {
				trx->mutex_unlock();
@@ -1663,6 +1819,7 @@ static void lock_rec_add_to_queue(unsigned type_mode, const hash_cell_t &cell,
			if (caller_owns_trx_mutex) {
				trx->mutex_lock();
			}
+			ut_d(lock_rec_queue_validate_bypass(lock));
			return;
		}
	}

@@ -1672,9 +1829,9 @@ create:
	because we should be moving an existing waiting lock request. */
	ut_ad(!(type_mode & LOCK_WAIT) || trx->lock.wait_trx);

-	lock_rec_create_low(nullptr,
-			    type_mode, id, page, heap_no, index, trx,
-			    caller_owns_trx_mutex);
+	lock_rec_create(c_lock_info,
+			type_mode, id, page, heap_no, index, trx,
+			caller_owns_trx_mutex);
}

/** A helper function for lock_rec_lock_slow(), which grants a Next Key Lock
@@ -1713,12 +1870,13 @@ static void lock_reuse_for_next_key_lock(const lock_t *held_lock,
	that GAP Locks do not conflict with anything. Therefore a GAP Lock
	could be granted to us right now if we've requested: */
	mode|= LOCK_GAP;
-	ut_ad(nullptr ==
-	      lock_rec_other_has_conflicting(mode, cell, id, heap_no, trx));
+	ut_ad(nullptr == lock_rec_other_has_conflicting(mode, cell, id, heap_no, trx)
+			 .conflicting);

	/* It might be the case we already have one, so we first check that. */
	if (lock_rec_has_expl(mode, cell, id, heap_no, trx) == nullptr)
-		lock_rec_add_to_queue(mode, cell, id, page, heap_no, index, trx, true);
+		lock_rec_add_to_queue(null_c_lock_info, mode, cell, id, page, heap_no,
+				      index, trx, true);
}

@@ -1806,21 +1964,26 @@ lock_rec_lock(
  /* Do nothing if the trx already has a strong enough lock on rec */
  if (!held_lock)
  {
-    if (lock_t *c_lock= lock_rec_other_has_conflicting(mode, g.cell(), id,
-                                                       heap_no, trx))
+    conflicting_lock_info c_lock_info=
+      lock_rec_other_has_conflicting(mode, g.cell(), id, heap_no, trx);
+    if (c_lock_info.conflicting)
      /* If another transaction has a non-gap conflicting request in the
      queue, as this transaction does not have a lock strong enough already
      granted on the record, we have to wait. */
-      err= lock_rec_enqueue_waiting(c_lock, mode, id, block->page.frame,
-                                    heap_no, index, thr, nullptr);
-    else if (!impl)
+      err= lock_rec_enqueue_waiting(c_lock_info, mode, id,
+                                    block->page.frame, heap_no, index, thr,
+                                    nullptr);
+    /* If some lock was bypassed, we need to create an explicit lock to
+    avoid searching for conflicting locks on every attempt to convert the
+    implicit lock to an explicit one. */
+    else if (!impl || c_lock_info.insert_after)
    {
      /* Set the requested lock on the record.
 */
-      lock_rec_add_to_queue(mode, g.cell(), id, block->page.frame, heap_no,
-                            index, trx, true);
+      lock_rec_add_to_queue(c_lock_info, mode, g.cell(), id,
+                            block->page.frame, heap_no, index, trx, true);
      err= DB_SUCCESS_LOCKED_REC;
    }
  }
@@ -1853,46 +2016,74 @@ lock_rec_lock(
  /* Simplified and faster path for the most common cases */
  if (!impl)
-    lock_rec_create_low(nullptr, mode, id, block->page.frame, heap_no, index,
-                        trx, false);
+    lock_rec_create(null_c_lock_info, mode, id, block->page.frame, heap_no,
+                    index, trx, false);

  return DB_SUCCESS_LOCKED_REC;
}

-/*********************************************************************//**
-Checks if a waiting record lock request still has to wait in a queue.
-@return lock that is causing the wait */
-static
-const lock_t*
+/** Checks if a waiting record lock request still has to wait in a queue.
+@param cell record locks hash table cell for waiting lock
+@param wait_lock waiting lock
+@return the lock that is causing the wait; the lock after which the waiting
+lock should be inserted into the lock queue when the lock causing the wait
+is bypassed; and the bypassed lock itself */
+static conflicting_lock_info
lock_rec_has_to_wait_in_queue(const hash_cell_t &cell,
                              const lock_t *wait_lock)
{
-	const lock_t*	lock;
-	ulint		heap_no;
-	ulint		bit_mask;
-	ulint		bit_offset;
+  const lock_t *lock;
+  ulint heap_no;
+  ulint bit_mask;
+  ulint bit_offset;

-	ut_ad(wait_lock->is_waiting());
-	ut_ad(!wait_lock->is_table());
+  ut_ad(wait_lock->is_waiting());
+  ut_ad(!wait_lock->is_table());

-	heap_no = lock_rec_find_set_bit(wait_lock);
+  heap_no= lock_rec_find_set_bit(wait_lock);
+  const bool is_supremum= (heap_no == PAGE_HEAP_NO_SUPREMUM);
+  ut_ad(!(wait_lock->is_insert_intention()) ||
+        (wait_lock->is_gap()) || is_supremum);
+  const bool bypass_mode=
+    !is_supremum && wait_lock->is_rec_exclusive_not_gap();
+  bool has_s_lock_or_stronger= false;
+  const lock_t *insert_after= nullptr;
+  ut_d(const lock_t *bypassed= nullptr);

-	bit_offset = heap_no / 8;
-	bit_mask = static_cast<ulint>(1) << (heap_no % 8);
+  bit_offset= heap_no / 8;
+  bit_mask= static_cast<ulint>(1) << (heap_no % 8);

-	for (lock = lock_sys_t::get_first(
-		     cell, wait_lock->un_member.rec_lock.page_id);
-	     lock != wait_lock;
-	     lock = lock_rec_get_next_on_page_const(lock)) {
-		const byte*	p = (const byte*) &lock[1];
-
-		if (heap_no < lock_rec_get_n_bits(lock)
-		    && (p[bit_offset] & bit_mask)
-		    && lock_has_to_wait(wait_lock, lock)) {
-			return(lock);
-		}
-	}
-
-	return(NULL);
+  const trx_t *trx= wait_lock->trx;
+  const lock_t *prev_lock= nullptr;
+  /* We can't use lock_sys_t::get_first(cell, id, heap_no) here as in
+  lock_rec_other_has_conflicting() because we iterate locks only up to
+  wait_lock */
+  for (lock=
+         lock_sys_t::get_first(cell, wait_lock->un_member.rec_lock.page_id);
+       lock != wait_lock; lock= lock_rec_get_next_on_page_const(lock))
+  {
+    const byte *p= (const byte *) &lock[1];
+    if (heap_no >= lock_rec_get_n_bits(lock) || !(p[bit_offset] & bit_mask))
+      continue;
+    if (bypass_mode && lock_rec_can_be_bypassing(trx, lock))
+    {
+      has_s_lock_or_stronger= true;
+    }
+    else if (lock_has_to_wait(wait_lock, lock))
+    {
+      if (!bypass_mode || !lock->can_be_bypassed(has_s_lock_or_stronger))
+        return {lock, nullptr, ut_d(nullptr)};
+      /* Store only the first lock to bypass. */
+      ut_d(if (!bypassed)
+             bypassed= lock;)
+      /* There can be several locks to bypass; insert the bypassing lock
+      just before the first bypassed lock.
 */
+      if (!insert_after)
+        insert_after= prev_lock;
+      continue;
+    }
+    prev_lock= lock;
+  }
+  return {nullptr, const_cast<lock_t*>(insert_after), ut_d(bypassed)};
}

/** Note that a record lock wait started */
@@ -2375,13 +2566,15 @@ static void lock_rec_dequeue_from_page(lock_t *in_lock, bool owns_wait_mutex)
	grant locks if there are no conflicting locks ahead. Stop at the first
	X lock that is waiting or has been granted. */

-	for (lock_t* lock = lock_sys_t::get_first(cell, page_id);
-	     lock != NULL;
-	     lock = lock_rec_get_next_on_page(lock)) {
-
-		if (!lock->is_waiting()) {
+	for (lock_t* lock = lock_sys_t::get_first(cell, page_id), *next;
+	     lock != NULL; lock= next) {
+		/* Store a pointer to the next element, because if some lock
+		is bypassed, the next-lock pointer in the current lock object
+		will be changed, as the current lock will change its position
+		in the lock queue. */
+		next= lock_rec_get_next_on_page(lock);
+		if (!lock->is_waiting())
			continue;
-		}

		if (!owns_wait_mutex) {
			mysql_mutex_lock(&lock_sys.wait_mutex);
@@ -2390,10 +2583,10 @@ static void lock_rec_dequeue_from_page(lock_t *in_lock, bool owns_wait_mutex)

		ut_ad(lock->trx->lock.wait_trx);
		ut_ad(lock->trx->lock.wait_lock);
-
-		if (const lock_t* c = lock_rec_has_to_wait_in_queue(
-			    cell, lock)) {
-			trx_t* c_trx = c->trx;
+		conflicting_lock_info c_lock_info=
+			lock_rec_has_to_wait_in_queue(cell, lock);
+		if (c_lock_info.conflicting) {
+			trx_t* c_trx = c_lock_info.conflicting->trx;
			lock->trx->lock.wait_trx = c_trx;
			if (c_trx->lock.wait_trx
			    && innodb_deadlock_detect
@@ -2401,6 +2594,12 @@ static void lock_rec_dequeue_from_page(lock_t *in_lock, bool owns_wait_mutex)
				Deadlock::to_be_checked = true;
			}
		} else {
+			if (UNIV_UNLIKELY(c_lock_info.insert_after != nullptr))
+			{
+				cell.remove(*lock, &lock_t::hash);
+				cell.insert_after(*c_lock_info.insert_after,
+						  *lock, &lock_t::hash);
+			}
			/* Grant the lock */
			ut_ad(lock->trx != in_lock->trx);
			lock_grant(lock);
@@ -2551,9 +2750,9 @@ lock_rec_inherit_to_gap(hash_cell_t &heir_cell, const page_id_t heir,
        ((!from_split || !lock->is_record_not_gap()) &&
         lock->mode() !=
         (lock_trx->duplicates ? LOCK_S : LOCK_X))))
    {
-      lock_rec_add_to_queue(LOCK_GAP | lock->mode(), heir_cell, heir,
-                            heir_page, heir_heap_no, lock->index, lock_trx,
-                            false);
+      lock_rec_add_to_queue(null_c_lock_info, LOCK_GAP | lock->mode(),
+                            heir_cell, heir, heir_page, heir_heap_no,
+                            lock->index, lock_trx, false);
    }
  }
}
@@ -2583,7 +2782,7 @@ lock_rec_inherit_to_gap_if_gap_lock(
      !lock->is_insert_intention() && (heap_no == PAGE_HEAP_NO_SUPREMUM ||
                                       !lock->is_record_not_gap()) &&
      !lock_table_has(lock->trx, lock->index->table, LOCK_X))
-    lock_rec_add_to_queue(LOCK_GAP | lock->mode(),
+    lock_rec_add_to_queue(null_c_lock_info, LOCK_GAP | lock->mode(),
                          g.cell(), id, block->page.frame,
                          heir_heap_no, lock->index, lock->trx, false);
}
@@ -2629,15 +2828,19 @@ lock_rec_move(
		/* Note that we FIRST reset the bit, and then set the lock:
		the function works also if donator_id == receiver_id */

-		lock_rec_add_to_queue(type_mode, receiver_cell,
-				      receiver_id, receiver.page.frame,
-				      receiver_heap_no,
+		lock_rec_add_to_queue(null_c_lock_info, type_mode,
+				      receiver_cell, receiver_id,
+				      receiver.page.frame, receiver_heap_no,
				      lock->index, lock_trx, true);
		lock_trx->mutex_unlock();
	}

	ut_ad(!lock_sys_t::get_first(donator_cell, donator_id,
				     donator_heap_no));
+	ut_d(lock_rec_queue_validate_bypass(lock_sys_t::get_first(
						receiver_cell,
						receiver_id, receiver_heap_no),
					    receiver_heap_no));
}

/** Move all the granted locks to the front of the given lock list.
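The saved-`next` pattern adopted in `lock_rec_dequeue_from_page()` above is easy to get wrong, so a tiny model may be worth keeping next to the review. The list types below are hypothetical stand-ins (not the real `lock_t`); the point is only that re-linking a granted bypassing lock via `remove()` + `insert_after()` rewrites its next pointer, so continuing a scan through it would skip or repeat locks:

```cpp
// Why the loop saves "next" before possibly re-linking the current lock.
#include <cassert>
#include <vector>

struct L { L *next= nullptr; int id; };

static void remove(L *&head, L &n)
{
  for (L **p= &head; *p; p= &(*p)->next)
    if (*p == &n) { *p= n.next; n.next= nullptr; return; }
}

static void insert_after(L &after, L &n)
{
  n.next= after.next;
  after.next= &n;
}

int main()
{
  L a{nullptr, 1}, b{nullptr, 2}, c{nullptr, 3};
  L *head= &a; a.next= &b; b.next= &c;

  std::vector<int> seen;
  for (L *l= head, *next; l; l= next)
  {
    next= l->next;          // save first, as the patch does
    if (l->id == 3)         // pretend c is a granted bypassing lock:
    {                       // move it right after a, its insert point
      remove(head, *l);
      insert_after(a, *l);
    }
    seen.push_back(l->id);
  }
  // Every lock is still visited exactly once despite the re-linking.
  assert((seen == std::vector<int>{1, 2, 3}));
}
```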
@@ -2796,8 +2999,9 @@ lock_move_reorganize_page(
        /* NOTE that the old lock bitmap could be too
        small for the new heap number! */

-        lock_rec_add_to_queue(lock->type_mode, cell, id, block->page.frame,
-                              new_heap_no, lock->index, lock_trx, true);
+        lock_rec_add_to_queue(null_c_lock_info, lock->type_mode, cell, id,
+                              block->page.frame, new_heap_no, lock->index,
+                              lock_trx, true);
      }

      lock_trx->mutex_unlock();
@@ -2939,9 +3143,9 @@ lock_move_rec_list_end(
          lock->type_mode&= ~LOCK_WAIT;
        }

-        lock_rec_add_to_queue(type_mode, g.cell2(), new_id,
-                              new_page,
-                              rec2_heap_no, lock->index, lock_trx, true);
+        lock_rec_add_to_queue(null_c_lock_info, type_mode, g.cell2(), new_id,
+                              new_page, rec2_heap_no, lock->index, lock_trx,
+                              true);
      }

      lock_trx->mutex_unlock();
@@ -3062,7 +3266,7 @@ lock_move_rec_list_start(
          lock->type_mode&= ~LOCK_WAIT;
        }

-        lock_rec_add_to_queue(type_mode, g.cell2(), new_id,
+        lock_rec_add_to_queue(null_c_lock_info, type_mode, g.cell2(), new_id,
                              new_block->page.frame, rec2_heap_no,
                              lock->index, lock_trx, true);
      }
@@ -3156,7 +3360,7 @@ lock_rtr_move_rec_list(
          lock->type_mode&= ~LOCK_WAIT;
        }

-        lock_rec_add_to_queue(type_mode, g.cell2(), new_id,
+        lock_rec_add_to_queue(null_c_lock_info, type_mode, g.cell2(), new_id,
                              new_block->page.frame, rec2_heap_no,
                              lock->index, lock_trx, true);

@@ -4253,19 +4457,30 @@ static void lock_rec_rebuild_waiting_queue(
{
  lock_sys.assert_locked(cell);

-  for (lock_t *lock= first_lock; lock != NULL;
-       lock= lock_rec_get_next(heap_no, lock))
+  for (lock_t *lock= first_lock, *next; lock != NULL; lock= next)
  {
+    /* Store a pointer to the next element, because if some lock is
+    bypassed, the next-lock pointer in the current lock object will be
+    changed, as the current lock will change its position in the lock
+    queue. */
+    next= lock_rec_get_next(heap_no, lock);
    if (!lock->is_waiting())
      continue;
    mysql_mutex_lock(&lock_sys.wait_mutex);
    ut_ad(lock->trx->lock.wait_trx);
    ut_ad(lock->trx->lock.wait_lock);

-    if (const lock_t *c= lock_rec_has_to_wait_in_queue(cell, lock))
-      lock->trx->lock.wait_trx= c->trx;
+    conflicting_lock_info c_lock_info=
+      lock_rec_has_to_wait_in_queue(cell, lock);
+    if (c_lock_info.conflicting)
+      lock->trx->lock.wait_trx= c_lock_info.conflicting->trx;
    else
    {
+      if (c_lock_info.insert_after)
+      {
+        cell.remove(*lock, &lock_t::hash);
+        cell.insert_after(*c_lock_info.insert_after, *lock, &lock_t::hash);
+      }
      /* Grant the lock */
      ut_ad(trx != lock->trx);
      lock_grant(lock);
@@ -4696,8 +4911,10 @@ reiterate:
    {
      ut_ad(!lock->index->table->is_temporary());
      bool supremum_bit= lock_rec_get_nth_bit(lock, PAGE_HEAP_NO_SUPREMUM);
+      /* if XA is being prepared, it must not own waiting locks */
+      ut_ad(!lock->is_waiting());
      bool rec_granted_exclusive_not_gap=
-        lock->is_rec_granted_exclusive_not_gap();
+        lock->is_rec_exclusive_not_gap();
      if (UNIV_UNLIKELY(lock->type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)))
        continue; /* SPATIAL INDEX locking is broken.
*/ const auto fold = lock->un_member.rec_lock.page_id.fold(); @@ -4870,7 +5087,9 @@ reiterate: if (!lock->is_table()) { ut_ad(!lock->index->table->is_temporary()); - if (!lock->is_rec_granted_exclusive_not_gap()) + /* if XA is being prepared, it must not own waiting locks */ + ut_ad(!lock->is_waiting()); + if (!lock->is_rec_exclusive_not_gap()) lock_rec_dequeue_from_page(lock, false); else if (UNIV_UNLIKELY(lock->type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE))) @@ -5432,7 +5651,8 @@ lock_rec_queue_validate( ut_ad(trx_state_eq(lock->trx, TRX_STATE_COMMITTED_IN_MEMORY) || !lock->is_waiting() - || lock_rec_has_to_wait_in_queue(cell, lock)); + || lock_rec_has_to_wait_in_queue(cell, lock). + conflicting); lock->trx->mutex_unlock(); } @@ -5524,7 +5744,8 @@ func_exit: if (lock->is_waiting()) { ut_a(lock->is_gap() - || lock_rec_has_to_wait_in_queue(cell, lock)); + || lock_rec_has_to_wait_in_queue(cell, lock). + conflicting); } else if (!lock->is_gap()) { const lock_mode mode = lock->mode() == LOCK_S ? LOCK_X : LOCK_S; @@ -5830,13 +6051,16 @@ lock_rec_insert_check_and_lock( on the successor, which produced an unnecessary deadlock. */ const unsigned type_mode= LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION; - if (lock_t *c_lock= lock_rec_other_has_conflicting(type_mode, - g.cell(), id, - heap_no, trx)) + conflicting_lock_info c_lock_info= lock_rec_other_has_conflicting( + type_mode, g.cell(), id, heap_no, trx); + /* Insert intention locks must not bypass any other lock. */ + ut_ad(!c_lock_info.insert_after && !c_lock_info.bypassed); + if (c_lock_info.conflicting) { trx->mutex_lock(); - err= lock_rec_enqueue_waiting(c_lock, type_mode, id, block->page.frame, - heap_no, index, thr, nullptr); + err= lock_rec_enqueue_waiting(c_lock_info, type_mode, id, + block->page.frame, heap_no, index, thr, + nullptr); trx->mutex_unlock(); } } @@ -5905,8 +6129,9 @@ static trx_t *lock_rec_convert_impl_to_expl_for_trx(trx_t *trx, if (!trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY) && !lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP, g.cell(), id, heap_no, trx)) - lock_rec_add_to_queue(LOCK_X | LOCK_REC_NOT_GAP, g.cell(), id, - page_align(rec), heap_no, index, trx, true); + lock_rec_add_to_queue(null_c_lock_info, LOCK_X | LOCK_REC_NOT_GAP, + g.cell(), id, page_align(rec), heap_no, index, + trx, true); } trx->release_reference(); diff --git a/storage/innobase/lock/lock0prdt.cc b/storage/innobase/lock/lock0prdt.cc index 3ea05ddb741..12b2a990f8c 100644 --- a/storage/innobase/lock/lock0prdt.cc +++ b/storage/innobase/lock/lock0prdt.cc @@ -470,8 +470,9 @@ create: because we should be moving an existing waiting lock request. 
*/ ut_ad(!(type_mode & LOCK_WAIT) || trx->lock.wait_trx); - lock_t* lock = lock_rec_create(nullptr, - type_mode, block, PRDT_HEAPNO, index, + lock_t* lock = lock_rec_create(null_c_lock_info, + type_mode, block->page.id(), + block->page.frame, PRDT_HEAPNO, index, trx, caller_owns_trx_mutex); if (lock->type_mode & LOCK_PREDICATE) { @@ -533,8 +534,9 @@ lock_prdt_insert_check_and_lock( trx->mutex_lock(); /* Allocate MBR on the lock heap */ lock_init_prdt_from_mbr(prdt, mbr, 0, trx->lock.lock_heap); - err= lock_rec_enqueue_waiting(c_lock, mode, id, block->page.frame, - PRDT_HEAPNO, index, thr, prdt); + err= lock_rec_enqueue_waiting({c_lock, nullptr, ut_d(nullptr)}, mode, id, + block->page.frame, PRDT_HEAPNO, index, + thr, prdt); trx->mutex_unlock(); } } @@ -734,10 +736,10 @@ lock_prdt_lock( lock_t* lock = lock_sys_t::get_first(g.cell(), id); if (lock == NULL) { - lock = lock_rec_create( - NULL, - prdt_mode, block, PRDT_HEAPNO, - index, trx, FALSE); + lock = lock_rec_create(null_c_lock_info, + prdt_mode, block->page.id(), + block->page.frame, PRDT_HEAPNO, index, + trx, FALSE); status = LOCK_REC_SUCCESS_CREATED; } else { @@ -759,7 +761,8 @@ lock_prdt_lock( prdt_mode, g.cell(), id, prdt, trx)) { err = lock_rec_enqueue_waiting( - wait_for, prdt_mode, id, + {wait_for, nullptr, ut_d(nullptr)}, + prdt_mode, id, block->page.frame, PRDT_HEAPNO, index, thr, prdt); } else { @@ -826,10 +829,9 @@ lock_place_prdt_page_lock( } if (lock == NULL) { - lock = lock_rec_create_low( - NULL, - mode, page_id, NULL, PRDT_HEAPNO, - index, trx, FALSE); + lock = lock_rec_create(null_c_lock_info, + mode, page_id, NULL, PRDT_HEAPNO, + index, trx, FALSE); #ifdef PRDT_DIAG printf("GIS_DIAGNOSTIC: page lock %d\n", (int) page_no);