1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-08 11:22:35 +03:00

MDEV-34877 Port "Bug #11745929 Change lock priority so that the transaction holding S-lock gets X-lock first" fix from MySQL to MariaDB

This commit implements
mysql/mysql-server@7037a0bdc8
functionality.

If some transaction 't' requests not-gap X-lock 'Xt' on record 'r', and
locks list of the record 'r' contains not-gap granted S-lock 'St' of
transaction 't', followed by not-gap waiting locks WB={Wb1,
Wb2, ..., Wbn} conflicting with 'Xt', and 'Xt' does not conflict with any
other lock, located in the list after 'St', then grant 'Xt'. Note that
insert-intention locks are also gap locks.

If some transaction 't' holds not-gap lock 'Lt' on record 'r', and some
other transactions have not-gap continuous waiting locks sequence
L(B)={L(b1), L(b2), ..., L(bn)} following L(t) in
the list of locks for the record 'r', and transaction 't' requests not-gap,
what means also not insert intention, as ii-locks are also gap locks,
X-lock conflicting with any lock in L(B), then grant the.

MySQL's commit contains the following explanation of why insert-intention
locks must not overtake a waiting ordinary or gap locks:

"It is important that this decission rule doesn't allow
INSERT_INTENTION locks to overtake WAITING locks on gaps (`S`, `S|GAP`,
`X`, `X|GAP`), as inserting a record into a gap would split such WAITING
lock, violating the invariant that each transaction can have at most
single WAITING lock at any time."

I would add to the explanation the following. Suppose we have trx 1 which
holds ordinary X-lock on some record. And trx 2 executes "DELETE FROM t"
or "SELECT * FOR UPDATE" in RR(see lock_delete_updated.test and
MDEV-27992), i.e. it creates waiting ordinary X-lock on the same record.
And then trx 1 wants to insert some record just before the locked record.
It requests insert-intention lock, and if the lock overtakes trx 2 lock,
there will be phantom records for trx 2 in RR. lock_delete_updated.test
shows how "DELETE" allows to insert some records in already scanned gap
and misses some records to delete.

The current implementation differs from MySQL implementation. There are
two key differences:

1. Lock queue ordering. In MySQL all waiting locks precede all granted
   locks. A new waiting lock is added to the head of the queue, a new
   granted lock is added to the end of the queue, if some waiting lock
   is granted, it's moved to the end of the queue. In MariaDB any new
   lock is added to the end of the queue and waiting lock does not change
   its position in the queue where the lock is granted. The rule is that
   blocking lock must be located before blocked lock in lock queue. We
   maintain the rule with inserting bypassing lock just before bypassed
   one.

2. MySQL implementation uses some object(locksys::Trx_locks_cache) which
   can be passed to consecutive calls to rec_lock_has_to_wait() for the
   same trx and heap_no to cache the result of checking if trx has a
   granted lock which is blocking the waiting lock(see
   locksys::Trx_locks_cache::has_granted_blocker()). The current
   implementation does not use such object, because it looks for such
   granted lock on the level of lock_rec_other_has_conflicting() and
   lock_rec_has_to_wait_in_queue(). I.e. there is no need in additional
   lock queue iteration in
   locksys::Trx_locks_cache::has_granted_blocker(), as we already iterate
   it in lock_rec_other_has_conflicting() and
   lock_rec_has_to_wait_in_queue().

During the testing the following case was found. Suppose we have
delete-marked record and going to do inplace insert into
that delete-marked record. Usually we don't create explicit lock if
there are no conlicting with not gap X-lock locks(see
lock_clust_rec_modify_check_and_lock(), btr_cur_update_in_place()). The
implicit lock will be converted to explicit one by demand.

That can happen during INSERT, the not-gap S-lock can
be acquired on searching for duplicates(see
row_ins_duplicate_error_in_clust()), and, if delete-marked record is
found, inplace insert(see btr_cur_upd_rec_in_place()) modifies the
record, what is treated as implicit lock.

But there can be a case when some transaction trx1 holds not-gap S-lock,
another transaction trx2 creates waiting X-lock, and then trx2 tries to
do inplace insert. Before the fix the waiting X-lock of trx2 would be
conflicting lock, and trx1 would try to create explicit X-lock, what
would cause deadlock, and one of the transactions whould be rolled back.
But after the fix, trx2 waiting X-lock is not treated as conflicting
with trx1 X-lock anymore, as trx1 already holds S-lock. If we don't create
explicit lock, then some other transaction trx3 can create it during
implicit to explicit lock conversion and place it at the end of the
queue. So there can be the following locks order in the queue:

S1(granted) X2(waiting) X1(granted)

The above queue is not valid, because all granted trx1 locks must be
placed before waiting trx2 lock. Besides, lock_rec_release_try() can
remove S(granted, trx1) lock and grant X lock to trx 2, and there can be
two granted X-locks on the same record:

X2(granted) X1(granted)

Taking into account that lock_rec_release_try() can release cell and
lock_sys latches leaving some locks unreleased, the queue validation
function can fail in any unexpected place.

It can be fixed with two ways:

1) Place explicit X(granted, trx1) lock before X(waiting, trx2) lock
   during implicit to explicit lock conversion. This option is implemented
   in MySQL, as granted lock is always placed at the top of locks queue,
   and waiting locks are placed at the bottom of the queue. MariaDB does
   not do this, and implementing this variant would require conflicting
   locks search before converting implicit to explicit lock, what, in
   turns, would require cell and/or lock_sys latch acquiring.

2) Create and place X(granted, trx1) lock before X(waiting, trx2) during
   inplace INSERT, i.e. when lock_rec_lock() is invoked from
   lock_clust_rec_modify_check_and_lock() or
   lock_sec_rec_modify_check_and_lock(), if X(waiting, trx2) is
   bypassed. Such a way we don't need in additional conflicting locks
   search, as they are searched anyway in lock_rec_low().

This fix implements the second variant(see the changes around
c_lock_info.insert_after in lock_rec_lock). I.e. if some record was
delete-marked and we do inplace insert in such a record, and some lock for
bypass was found, create explicit lock to avoid conflicting lock search on
each implicit to explicit lock conversion. We can remove it if MDEV-35624
is implemented.

lock_rec_other_has_conflicting(), lock_rec_has_to_wait_in_queue():
search locks to bypass along with conflicting locks searching in the
same loop. The result is returned in conflicting_lock_info object.
There can be several locks to bypass, only the first one is returned to
limit lock_rec_find_similar_on_page() with the first bypassed lock to
preserve "blocking before blocked" invariant. conflicting_lock_info also
contains a pointer to the lock, after which we can insert bypassing
lock. This lock precedes bypassed one.

Bypassing lock can be next-key lock, and the following cases are
possible:

1. S1(not-gap, granted) II2(granted) X3(waiting for S1),

   When new X1(ordinary) lock is acquired, there will be the following
   locks queue:

   S1(not-gap, granted) II2(granted) X1(ordinary, granted) X3(waiting for
   S1)

   If we had inserted new X1 lock just after S1, and S1 had been released
   on transaction commit or rollback, we would have the following
   sequence in the locks queue:

   X1(ordinary, granted) II2(granted) X3(waiting for X1)
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   This is not a real issue as II lock once granted can be
   ignored but it could possibly hit some assert(taking into account
   that lock_release_try() can release lock_sys latch, and other threads
   can acquire the latch and validate lock queue) as it breaks our design
   constraint that any granted lock in the queue should not conflict
   with locks ahead in the queue. But lock_rec_queue_validate() does not
   check the above constraint. We place new bypassing lock just before
   bypassed one, but there still can be the case when lock bitmap is used
   instead of creating new lock object(see lock_rec_add_to_queue() and
   lock_rec_find_similar_on_page()), and the lock, which owns the
   bitmap, can precede II2(granted). We can either disable
   lock_rec_find_similar_on_page() space optimization for bypassing locks
   or treat "X1(ordinary, granted) II2(granted)" sequence as valid. As
   we don't currently have the function which would fail on the above
   sequence, let treat it as valid for the case, when lock_release()
   execution is in process.

2. S1(ordinary, granted) II2(waiting for S1) X3(waiting for S1)

   When new X1(ordinary) lock is acquired, there will be the following
   locks queue:

   S1(ordinary, granted) II2(waiting for S1) X1(ordinary, granted)
   X3(waiting for S1).

   After S1 releasing there will be:

   II2(granted) X1(ordinary, granted) X3(waiting for X1)
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

   The above queue is valid because ordinary lock does not conflict with
   II-lock(see lock_rec_has_to_wait()).

lock_rec_create_low(): insert new lock to the position which
lock_rec_other_has_conflicting(), lock_rec_has_to_wait_in_queue()
returned if the lock is bypassing.

lock_rec_find_similar_on_page(): add ability to limit similiar lock search
with the certain lock to preserve "blocking before blocked" invariant for
all bypassed locks.

lock_rec_add_to_queue(): don't treat bypassed locks as waiting ones to
let lock bitmap reusing for bypassing locks.

lock_rec_lock(): fix inplace insert case, explained above.

lock_rec_dequeue_from_page(), lock_rec_rebuild_waiting_queue(): move
bypassing lock to the correct place to preserve "blocking before blocked"
invariant.

Reviewed by: Debarun Banerjee, Marko Mäkelä.
This commit is contained in:
Vlad Lesin
2024-10-10 15:43:36 +03:00
parent c1559f261f
commit 3a6af458e6
10 changed files with 877 additions and 213 deletions

View File

@@ -0,0 +1,198 @@
connect stop_purge,localhost,root;
START TRANSACTION WITH CONSISTENT SNAPSHOT;
connect con1,localhost,root,,;
connect con2,localhost,root,,;
connect con3,localhost,root,,;
connection default;
CREATE TABLE t1 (id INT PRIMARY KEY) ENGINE=InnoDB STATS_PERSISTENT=0;
INSERT INTO t1 (id) VALUES (1);
# Simplest scenario:
# <con1, S, granted>,
# <con1, S, granted>, <con2, X, waiting for con1>,
# Before MDEV-34877:
# <con1, S, granted>, <con2, X, waiting for con1>, <con1, X, waiting for con1>
# After MDEV-34877:
# <con1, S, granted>, <con1, X, granted>, <con2, X, waiting for con1>
# Expected: instead of deadlocking, the con1's request should ingore con2's
connection con1;
BEGIN;
SELECT * FROM t1 LOCK IN SHARE MODE;
id
1
connection con2;
BEGIN;
SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con2_will_wait';
SELECT * FROM t1 FOR UPDATE;
connection con1;
SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait';
SELECT * FROM t1 FOR UPDATE;
id
1
COMMIT;
connection con2;
id
1
COMMIT;
# The scenario when we bypass X<-S pair:
# <con1, S, granted>,
# <con1, S, granted>, <con2, X, waiting for con1>,
# <con1, S, granted>, <con2, X, waiting for con1>, <con3, S, waiting for con2>
# <con1, S, granted>, <con1, X, granted>, <con2, X, waiting for con1>, <con3, S, waiting for con2>
connection con1;
BEGIN;
SELECT * FROM t1 LOCK IN SHARE MODE;
id
1
connection con2;
BEGIN;
SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con2_will_wait';
SELECT * FROM t1 FOR UPDATE;
connection con3;
SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait';
BEGIN;
SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait';
SELECT * FROM t1 LOCK IN SHARE MODE;;
connection con1;
SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait';
SELECT * FROM t1 FOR UPDATE;
id
1
COMMIT;
connection con2;
id
1
COMMIT;
connection con3;
id
1
COMMIT;
# A variant of the above scenario:
# <con1, X REC_NOT_GAP, granted>,
# <con1, X REC_NOT_GAP, granted>, <con2, S, waiting for con1>,
# <con1, X REC_NOT_GAP, granted>, <con2, S, waiting for con1>, <con1, INSERT INTENTION, waiting for con1>
# Expected: a deadlock, as INSERT INTENTION should not overtake locks on gap, to not slice them
connection con1;
BEGIN;
SELECT * FROM t1 WHERE id=1 FOR UPDATE;
id
1
connection con2;
BEGIN;
SET DEBUG_SYNC = 'lock_wait_start SIGNAL con2_will_wait';
SELECT * FROM t1 LOCK IN SHARE MODE;
connection con1;
SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait';
INSERT INTO t1 VALUES (0);
ROLLBACK;
connection con2;
ERROR 40001: Deadlock found when trying to get lock; try restarting transaction
COMMIT;
# More complicated scenario:
# <con1, S, granted>,
# <con1, S, granted>, <con2, S REC_NOT_GAP, granted>,
# <con1, S, granted>, <con2, S REC_NOT_GAP, granted>, <con3, X, waiting for con1>
# <con1, S, granted>, <con2, S REC_NOT_GAP, granted>, <con3, X, waiting for con1>, <con1, INSERT_INTENTION, waiting for con3>
# <con1, S, granted>, <con3, X, waiting for con1>, <con1, INSERT_INTENTION, waiting for con3>
# Expected: a deadlock, as INSERT INTENTION should not overtake locks on gap, to not slice them
connection con1;
BEGIN;
SELECT * FROM t1 LOCK IN SHARE MODE;
id
1
connection con2;
BEGIN;
SELECT * FROM t1 WHERE id=1 LOCK IN SHARE MODE;
id
1
connection con3;
SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait';
SELECT * FROM t1 FOR UPDATE;
connection con1;
SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait';
SET DEBUG_SYNC = 'lock_wait_start SIGNAL con1_will_wait';
INSERT INTO t1 VALUES (0);
connection con2;
SET DEBUG_SYNC = 'now WAIT_FOR con1_will_wait';
COMMIT;
connection con1;
ROLLBACK;
connection con3;
ERROR 40001: Deadlock found when trying to get lock; try restarting transaction
# More complicated scenario.
# <con1, S, granted>,
# <con1, S, granted>, <con2, S REC_NOT_GAP, granted>,
# <con1, S, granted>, <con2, S REC_NOT_GAP, granted>, <con3, X, waiting for con1>
# <con1, S, granted>, <con2, S REC_NOT_GAP, granted>, <con3, X, waiting for con1>, <con1, X REC_NOT_GAP, waiting for con2>
# Before MDEV-34877:
# <con1, S, granted>, <con3, X, waiting for con1>, <con1, X REC_NOT_GAP, waiting for con3>
# After MDEV-34877:
# <con1, S, granted>, <con1, X REC_NOT_GAP, granted>, <con3, X, waiting for con1>
connection con1;
BEGIN;
SELECT * FROM t1 LOCK IN SHARE MODE;
id
1
connection con2;
BEGIN;
SELECT * FROM t1 WHERE id=1 LOCK IN SHARE MODE;
id
1
connection default;
connection con3;
SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait';
SELECT * FROM t1 FOR UPDATE;
connection con1;
SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait';
SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con1_will_wait';
SELECT * FROM t1 WHERE id=1 FOR UPDATE;
connection con2;
SET DEBUG_SYNC = 'now WAIT_FOR con1_will_wait';
COMMIT;
connection con1;
id
1
COMMIT;
connection con3;
id
1
COMMIT;
# A secenario, where con1 has to bypass two transactions:
# <con1, S, granted>
# <con1, S, granted> <con2, X, waiting>
# <con1, S, granted> <con2, X, waiting> <con3, X, waiting>
# Before MDEV-34877:
# <con1, S, granted> <con2, X, waiting> <con3, X, waiting> <con1, X REC_NOT_GAP, waiting for con2>
# After MDEV-34877:
# <con1, S, granted> <con1, X REC_NOT_GAP, granted> <con2, X, waiting> <con3, X, waiting>
connection con1;
BEGIN;
SELECT * FROM t1 LOCK IN SHARE MODE;
id
1
connection con2;
SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con2_will_wait';
SELECT * FROM t1 FOR UPDATE;
connection con3;
SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait';
SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait';
SELECT * FROM t1 FOR UPDATE;
connection con1;
SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait';
SELECT * FROM t1 WHERE id=1 FOR UPDATE;
id
1
COMMIT;
connection con2;
id
1
COMMIT;
connection con3;
id
1
COMMIT;
connection default;
disconnect con1;
disconnect con2;
disconnect con3;
disconnect stop_purge;
DROP TABLE t1;

View File

@@ -0,0 +1,228 @@
--source include/have_innodb.inc
--source include/have_debug.inc
--source include/have_debug_sync.inc
--source include/count_sessions.inc
--disable_query_log
call mtr.add_suppression("InnoDB: Transaction was aborted due to ");
--enable_query_log
connect stop_purge,localhost,root;
START TRANSACTION WITH CONSISTENT SNAPSHOT;
--connect (con1,localhost,root,,)
--connect (con2,localhost,root,,)
--connect (con3,localhost,root,,)
--connection default
CREATE TABLE t1 (id INT PRIMARY KEY) ENGINE=InnoDB STATS_PERSISTENT=0;
INSERT INTO t1 (id) VALUES (1);
--echo # Simplest scenario:
--echo # <con1, S, granted>,
--echo # <con1, S, granted>, <con2, X, waiting for con1>,
--echo # Before MDEV-34877:
--echo # <con1, S, granted>, <con2, X, waiting for con1>, <con1, X, waiting for con1>
--echo # After MDEV-34877:
--echo # <con1, S, granted>, <con1, X, granted>, <con2, X, waiting for con1>
--echo # Expected: instead of deadlocking, the con1's request should ingore con2's
--connection con1
BEGIN;
SELECT * FROM t1 LOCK IN SHARE MODE;
--connection con2
BEGIN;
SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con2_will_wait';
--send SELECT * FROM t1 FOR UPDATE
--connection con1
SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait';
SELECT * FROM t1 FOR UPDATE;
COMMIT;
--connection con2
--reap
COMMIT;
--echo # The scenario when we bypass X<-S pair:
--echo # <con1, S, granted>,
--echo # <con1, S, granted>, <con2, X, waiting for con1>,
--echo # <con1, S, granted>, <con2, X, waiting for con1>, <con3, S, waiting for con2>
--echo # <con1, S, granted>, <con1, X, granted>, <con2, X, waiting for con1>, <con3, S, waiting for con2>
--connection con1
BEGIN;
SELECT * FROM t1 LOCK IN SHARE MODE;
--connection con2
BEGIN;
SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con2_will_wait';
--send SELECT * FROM t1 FOR UPDATE
--connection con3
SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait';
BEGIN;
SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait';
--send SELECT * FROM t1 LOCK IN SHARE MODE;
--connection con1
SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait';
SELECT * FROM t1 FOR UPDATE;
COMMIT;
--connection con2
--reap
COMMIT;
--connection con3
--reap
COMMIT;
#
--echo # A variant of the above scenario:
--echo # <con1, X REC_NOT_GAP, granted>,
--echo # <con1, X REC_NOT_GAP, granted>, <con2, S, waiting for con1>,
--echo # <con1, X REC_NOT_GAP, granted>, <con2, S, waiting for con1>, <con1, INSERT INTENTION, waiting for con1>
--echo # Expected: a deadlock, as INSERT INTENTION should not overtake locks on gap, to not slice them
--connection con1
BEGIN;
SELECT * FROM t1 WHERE id=1 FOR UPDATE;
--connection con2
BEGIN;
SET DEBUG_SYNC = 'lock_wait_start SIGNAL con2_will_wait';
--send SELECT * FROM t1 LOCK IN SHARE MODE
--connection con1
SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait';
INSERT INTO t1 VALUES (0);
ROLLBACK;
--connection con2
--error ER_LOCK_DEADLOCK
--reap
COMMIT;
--echo # More complicated scenario:
--echo # <con1, S, granted>,
--echo # <con1, S, granted>, <con2, S REC_NOT_GAP, granted>,
--echo # <con1, S, granted>, <con2, S REC_NOT_GAP, granted>, <con3, X, waiting for con1>
--echo # <con1, S, granted>, <con2, S REC_NOT_GAP, granted>, <con3, X, waiting for con1>, <con1, INSERT_INTENTION, waiting for con3>
--echo # <con1, S, granted>, <con3, X, waiting for con1>, <con1, INSERT_INTENTION, waiting for con3>
--echo # Expected: a deadlock, as INSERT INTENTION should not overtake locks on gap, to not slice them
--connection con1
BEGIN;
SELECT * FROM t1 LOCK IN SHARE MODE;
--connection con2
BEGIN;
SELECT * FROM t1 WHERE id=1 LOCK IN SHARE MODE;
--connection con3
SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait';
--send SELECT * FROM t1 FOR UPDATE
--connection con1
SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait';
SET DEBUG_SYNC = 'lock_wait_start SIGNAL con1_will_wait';
--send INSERT INTO t1 VALUES (0)
--connection con2
SET DEBUG_SYNC = 'now WAIT_FOR con1_will_wait';
COMMIT;
--connection con1
--reap
ROLLBACK;
--connection con3
--error ER_LOCK_DEADLOCK
--reap
--echo # More complicated scenario.
--echo # <con1, S, granted>,
--echo # <con1, S, granted>, <con2, S REC_NOT_GAP, granted>,
--echo # <con1, S, granted>, <con2, S REC_NOT_GAP, granted>, <con3, X, waiting for con1>
--echo # <con1, S, granted>, <con2, S REC_NOT_GAP, granted>, <con3, X, waiting for con1>, <con1, X REC_NOT_GAP, waiting for con2>
--echo # Before MDEV-34877:
--echo # <con1, S, granted>, <con3, X, waiting for con1>, <con1, X REC_NOT_GAP, waiting for con3>
--echo # After MDEV-34877:
--echo # <con1, S, granted>, <con1, X REC_NOT_GAP, granted>, <con3, X, waiting for con1>
--connection con1
BEGIN;
SELECT * FROM t1 LOCK IN SHARE MODE;
--connection con2
BEGIN;
SELECT * FROM t1 WHERE id=1 LOCK IN SHARE MODE;
--connection default
--connection con3
SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait';
--send SELECT * FROM t1 FOR UPDATE
--connection con1
SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait';
SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con1_will_wait';
--send SELECT * FROM t1 WHERE id=1 FOR UPDATE
--connection con2
SET DEBUG_SYNC = 'now WAIT_FOR con1_will_wait';
COMMIT;
--connection con1
--reap
COMMIT;
--connection con3
--reap
COMMIT;
--echo # A secenario, where con1 has to bypass two transactions:
--echo # <con1, S, granted>
--echo # <con1, S, granted> <con2, X, waiting>
--echo # <con1, S, granted> <con2, X, waiting> <con3, X, waiting>
--echo # Before MDEV-34877:
--echo # <con1, S, granted> <con2, X, waiting> <con3, X, waiting> <con1, X REC_NOT_GAP, waiting for con2>
--echo # After MDEV-34877:
--echo # <con1, S, granted> <con1, X REC_NOT_GAP, granted> <con2, X, waiting> <con3, X, waiting>
--connection con1
BEGIN;
SELECT * FROM t1 LOCK IN SHARE MODE;
--connection con2
SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con2_will_wait';
--send SELECT * FROM t1 FOR UPDATE
--connection con3
SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait';
SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait';
--send SELECT * FROM t1 FOR UPDATE
--connection con1
SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait';
SELECT * FROM t1 WHERE id=1 FOR UPDATE;
COMMIT;
--connection con2
--reap
COMMIT;
--connection con3
--reap
COMMIT;
--connection default
--disconnect con1
--disconnect con2
--disconnect con3
--disconnect stop_purge
DROP TABLE t1;
--source include/wait_until_count_sessions.inc

View File

@@ -107,10 +107,28 @@ public:
@param element the being-removed element
@param next the next-element pointer in T */
template<typename T>
void remove(T &element, T *T::*next) noexcept
void remove(const T &element, T *T::*next) noexcept
{
remove(search(next, [&element](const T *p){return p==&element;}), next);
}
/** Insert an element after another.
@tparam T type of the element
@param after the element after which to insert
@param insert the being-inserted element
@param next the next-element pointer in T */
template <typename T> void insert_after(T &after, T &insert, T *T::*next)
{
#ifdef UNIV_DEBUG
for (const T *c= static_cast<const T *>(node); c; c= c->*next)
if (c == &after)
goto found;
ut_error;
found:
#endif
insert.*next= after.*next;
after.*next= &insert;
}
};
/** Hash table with singly-linked overflow lists */

View File

@@ -52,6 +52,20 @@ namespace Deadlock
enum report { REPORT_OFF, REPORT_BASIC, REPORT_FULL };
}
/** Conflicting lock info */
struct conflicting_lock_info {
/** Conflicting lock */
const lock_t *conflicting;
/** If some lock was bypassed, points to the lock after which bypassing
lock must be inserted into linked list of locks for the certain cell of
record locks hash table. */
lock_t *insert_after;
/** First bypassed lock */
ut_d(const lock_t *bypassed;)
};
extern const conflicting_lock_info null_c_lock_info;
/*********************************************************************//**
Gets the heap_no of the smallest user record on a page.
@return heap_no of smallest user record, or PAGE_HEAP_NO_SUPREMUM */
@@ -1144,25 +1158,6 @@ struct TMTrxGuard
#endif
};
/*********************************************************************//**
Creates a new record lock and inserts it to the lock queue. Does NOT check
for deadlocks or lock compatibility!
@return created lock */
UNIV_INLINE
lock_t*
lock_rec_create(
/*============*/
lock_t* c_lock, /*!< conflicting lock */
unsigned type_mode,/*!< in: lock mode and wait flag */
const buf_block_t* block, /*!< in: buffer block containing
the record */
ulint heap_no,/*!< in: heap number of the record */
dict_index_t* index, /*!< in: index of record */
trx_t* trx, /*!< in,out: transaction */
bool caller_owns_trx_mutex);
/*!< in: true if caller owns
trx mutex */
/** Remove a record lock request, waiting or granted, on a discarded page
@param in_lock lock object
@param cell hash table cell containing in_lock */
@@ -1170,7 +1165,7 @@ void lock_rec_discard(lock_t *in_lock, hash_cell_t &cell) noexcept;
/** Create a new record lock and inserts it to the lock queue,
without checking for deadlocks or conflicts.
@param[in] c_lock conflicting lock, or NULL
@param c_lock_info conflicting lock info
@param[in] type_mode lock mode and wait flag
@param[in] page_id index page number
@param[in] page R-tree index page, or NULL
@@ -1180,8 +1175,8 @@ without checking for deadlocks or conflicts.
@param[in] holds_trx_mutex whether the caller holds trx->mutex
@return created lock */
lock_t*
lock_rec_create_low(
lock_t* c_lock,
lock_rec_create(
const conflicting_lock_info &c_lock_info,
unsigned type_mode,
const page_id_t page_id,
const page_t* page,
@@ -1192,7 +1187,7 @@ lock_rec_create_low(
/** Enqueue a waiting request for a lock which cannot be granted immediately.
Check for deadlocks.
@param[in] c_lock conflicting lock
@param c_lock_info conflicting lock info
@param[in] type_mode the requested lock mode (LOCK_S or LOCK_X)
possibly ORed with LOCK_GAP or
LOCK_REC_NOT_GAP, ORed with
@@ -1210,7 +1205,7 @@ Check for deadlocks.
@retval DB_DEADLOCK if this transaction was chosen as the victim */
dberr_t
lock_rec_enqueue_waiting(
lock_t* c_lock,
const conflicting_lock_info &c_lock_info,
unsigned type_mode,
const page_id_t id,
const page_t* page,

View File

@@ -51,28 +51,3 @@ lock_get_min_heap_no(
FALSE)));
}
}
/*********************************************************************//**
Creates a new record lock and inserts it to the lock queue. Does NOT check
for deadlocks or lock compatibility!
@return created lock */
UNIV_INLINE
lock_t*
lock_rec_create(
/*============*/
lock_t* c_lock, /*!< conflicting lock */
unsigned type_mode,/*!< in: lock mode and wait flag */
const buf_block_t* block, /*!< in: buffer block containing
the record */
ulint heap_no,/*!< in: heap number of the record */
dict_index_t* index, /*!< in: index of record */
trx_t* trx, /*!< in,out: transaction */
bool caller_owns_trx_mutex)
/*!< in: TRUE if caller owns
trx mutex */
{
return lock_rec_create_low(
c_lock,
type_mode, block->page.id(), block->page.frame, heap_no,
index, trx, caller_owns_trx_mutex);
}

View File

@@ -497,14 +497,11 @@ inline byte lock_rec_reset_nth_bit(lock_t* lock, ulint i)
return(bit);
}
/*********************************************************************//**
Gets the first or next record lock on a page.
/** Gets the first or next record lock on a page.
@param lock a record lock
@return next lock, NULL if none exists */
UNIV_INLINE
lock_t*
lock_rec_get_next_on_page(
/*======================*/
lock_t* lock); /*!< in: a record lock */
lock_t *lock_rec_get_next_on_page(const lock_t *lock);
/*********************************************************************//**
Gets the next explicit lock request on a record.

View File

@@ -101,14 +101,11 @@ lock_rec_set_nth_bit(
lock->trx->lock.set_nth_bit_calls++;
}
/*********************************************************************//**
Gets the first or next record lock on a page.
/** Gets the first or next record lock on a page.
@param lock a record lock
@return next lock, NULL if none exists */
UNIV_INLINE
lock_t*
lock_rec_get_next_on_page(
/*======================*/
lock_t* lock) /*!< in: a record lock */
lock_t *lock_rec_get_next_on_page(const lock_t *lock)
{
return const_cast<lock_t*>(lock_rec_get_next_on_page_const(lock));
}
@@ -167,14 +164,11 @@ lock_rec_get_nth_bit(
return(1 & *b >> (i % 8));
}
/*********************************************************************//**
Gets the first or next record lock on a page.
/** Gets the first or next record lock on a page.
@param lock a record lock
@return next lock, NULL if none exists */
UNIV_INLINE
const lock_t*
lock_rec_get_next_on_page_const(
/*============================*/
const lock_t* lock) /*!< in: a record lock */
const lock_t *lock_rec_get_next_on_page_const(const lock_t *lock)
{
ut_ad(!lock->is_table());

View File

@@ -232,11 +232,43 @@ struct ib_lock_t
return(static_cast<enum lock_mode>(type_mode & LOCK_MODE_MASK));
}
bool is_rec_granted_exclusive_not_gap() const
static bool is_rec_exclusive_not_gap(unsigned type_mode)
{
ut_ad(!(type_mode & LOCK_TABLE));
return (type_mode & (LOCK_MODE_MASK | LOCK_GAP)) == LOCK_X;
}
bool is_rec_exclusive_not_gap() const
{
return is_rec_exclusive_not_gap(type_mode);
}
bool is_waiting_not_gap() const
{
return (type_mode & (LOCK_WAIT | LOCK_GAP)) == LOCK_WAIT;
}
/** Checks if a lock can be bypassed.
@param has_s_lock_or_stronger if caller's transaction already holds
not gap and not insert intention S-lock
or stronger for the same heap_no as the
current lock
@return true if the lock can be bypassed, false otherwise */
bool can_be_bypassed(bool has_s_lock_or_stronger) const noexcept
{
ut_ad(!is_table());
/* We don't neet do check supremum bit in the lock's bitmap here,
because the function is always called after checking for
bypass_mode, which already contains check for supremum. */
ut_ad(!is_insert_intention() || is_gap());
/* We don't need to check
trx->lock.wait_trx == blocking_trx && mode() == LOCK_X
condition here because there can be the following case:
S1 X2(waits for S1) S3(waits for X2),
bypassing X1 must not conflict with S3. */
return has_s_lock_or_stronger && is_waiting_not_gap();
}
/** Print the lock object into the given output stream.
@param[in,out] out the output stream
@return the given output stream. */

View File

@@ -56,6 +56,8 @@ Created 5/7/1996 Heikki Tuuri
#include <mysql/service_wsrep.h>
#endif /* WITH_WSREP */
const conflicting_lock_info null_c_lock_info{nullptr, nullptr, ut_d(nullptr)};
/** The value of innodb_deadlock_detect */
my_bool innodb_deadlock_detect;
/** The value of innodb_deadlock_report */
@@ -1159,12 +1161,23 @@ func_exit:
lock= lock_rec_get_next(heap_no, lock);
do
{
/* TODO: Conflicting locks can be only before the waiting lock,
consider the following optimization:
if (lock == wait_lock)
break; */
/* This is similar case as above except here we have
record-locks instead of table locks. See details
from comment above.
*/
if (lock->trx->mysql_thd && wsrep_will_BF_abort(lock, trx))
{
/* There can't be bypassed locks because:
1. The transaction can't be blocked by lock to bypass because
lock_rec_other_has_conflicting() does not treat such lock as
conflicting.
2. The lock is placed before bypassed lock in
lock_rec_create_low().
TODO: add debug check here */
victims.emplace(lock->trx);
}
} while ((lock= lock_rec_get_next(heap_no, lock)));
@@ -1200,8 +1213,22 @@ func_exit:
}
#endif /* WITH_WSREP */
/*********************************************************************//**
Checks if some other transaction has a conflicting explicit lock request
static inline bool lock_rec_can_be_bypassing(const trx_t *trx,
const lock_t *lock)
{
ut_ad(!lock->is_insert_intention() || lock->is_gap());
static_assert(int{LOCK_S} == 2, "");
static_assert(int{LOCK_X} == 3, "");
/* The below is an optimization of the following:
return lock->trx == trx && !(lock->type_mode & (LOCK_WAIT | LOCK_GAP)) &&
lock_mode_stronger_or_eq(lock->mode(), LOCK_S);
The bitwise & with LOCK_MODE_MASK - 1 will map both LOCK_X and LOCK_S to
LOCK_S, which we are comparing to. */
return lock->trx == trx &&
(lock->type_mode & (LOCK_WAIT | LOCK_GAP | (LOCK_MODE_MASK - 1))) == LOCK_S;
}
/** Checks if some other transaction has a conflicting explicit lock request
in the queue, so that we have to wait.
@param[in] mode LOCK_S or LOCK_X, possibly ORed to LOCK_GAP or LOC_REC_NOT_GAP,
LOCK_INSERT_INTENTION
@@ -1209,23 +1236,48 @@ LOCK_INSERT_INTENTION
@param[in] id page identifier
@param[in] heap_no heap number of the record
@param[in] trx our transaction
@return conflicting lock and the flag which indicated if conflicting locks
which wait for the current transaction were ignored */
static lock_t *lock_rec_other_has_conflicting(unsigned mode,
const hash_cell_t &cell,
const page_id_t id,
ulint heap_no, const trx_t *trx)
@return conflicting lock, lock after which new lock should be inserted
in lock queue in the case when the conflicting lock must be bypassed and
bypassed lock */
static conflicting_lock_info
lock_rec_other_has_conflicting(unsigned mode, const hash_cell_t &cell,
const page_id_t id, ulint heap_no,
const trx_t *trx) noexcept
{
bool is_supremum = (heap_no == PAGE_HEAP_NO_SUPREMUM);
const bool is_supremum= (heap_no == PAGE_HEAP_NO_SUPREMUM);
ut_ad(!(mode & LOCK_INSERT_INTENTION) || (mode & LOCK_GAP) || is_supremum);
const bool bypass_mode=
!is_supremum && lock_t::is_rec_exclusive_not_gap(mode);
bool has_s_lock_or_stronger= false;
const lock_t *insert_after= nullptr;
ut_d(const lock_t *bypassed= nullptr;)
const lock_t *prev_lock= nullptr;
for (lock_t* lock = lock_sys_t::get_first(cell, id, heap_no);
lock; lock = lock_rec_get_next(heap_no, lock)) {
if (lock_rec_has_to_wait(trx, mode, lock, is_supremum)) {
return(lock);
for (lock_t *lock= lock_sys_t::get_first(cell, id, heap_no); lock;
lock= lock_rec_get_next(heap_no, lock))
{
if (bypass_mode && lock_rec_can_be_bypassing(trx, lock))
{
has_s_lock_or_stronger= true;
}
else if (lock_rec_has_to_wait(trx, mode, lock, is_supremum))
{
if (!bypass_mode || !lock->can_be_bypassed(has_s_lock_or_stronger))
return {lock, nullptr, ut_d(nullptr)};
/* Store the first lock to bypass to invoke
lock_rec_find_similar_on_page() only for the locks which precede all
bypassed locks. */
ut_d(if (!bypassed) bypassed= lock;)
/* There can be several locks to bypass, insert bypassing lock just
before the first bypassed lock. */
if (!insert_after)
insert_after= prev_lock;
continue;
}
prev_lock= lock;
}
return(NULL);
return {nullptr, const_cast<lock_t *>(insert_after), ut_d(bypassed)};
}
/*********************************************************************//**
@@ -1294,6 +1346,69 @@ lock_number_of_tables_locked(
/*============== RECORD LOCK CREATION AND QUEUE MANAGEMENT =============*/
#ifdef UNIV_DEBUG
/** Validates the correctness of locks bypassing in lock queue on a single
record, i.e. there must not be the following sequence:
(trx1 S) (trx2 X) (trx3 X) (trx1 X)
If bypassing works correctly, where must be the following sequence instead of
the above:
(trx1 S) (trx1 X) (trx2 X) (trx3 X)
Note the above locks are record or next-key locks.
If wrong sequence is found, the function will crash with failed assertion.
@param checked_lock the lock up to which the queue to check
@param heap_no heap_no of the queue to check */
static void lock_rec_queue_validate_bypass(const lock_t *checked_lock,
ulint heap_no)
{
/* "do_lock_reverse_page_reorganize" causes lock queue reversing during page
reorganizing, which causes validation failure. Skip the validation for such
case. */
DBUG_EXECUTE_IF("do_lock_reverse_page_reorganize", return;);
if (!checked_lock || checked_lock->is_waiting())
return;
page_id_t page_id= checked_lock->un_member.rec_lock.page_id;
hash_cell_t *cell= lock_sys.rec_hash.cell_get(page_id.fold());
auto mode= checked_lock->type_mode;
const trx_t *trx= checked_lock->trx;
const bool is_supremum= (heap_no == PAGE_HEAP_NO_SUPREMUM);
ut_ad(!(mode & LOCK_INSERT_INTENTION) || (mode & LOCK_GAP) || is_supremum);
if (is_supremum || !lock_t::is_rec_exclusive_not_gap(mode))
return;
const lock_t *has_s_lock_or_stronger= nullptr;
const lock_t *bypassed= nullptr;
for (lock_t *lock= lock_sys_t::get_first(*cell, page_id, heap_no); lock;
lock= lock_rec_get_next(heap_no, lock))
{
if (lock_rec_can_be_bypassing(trx, lock))
{
ut_ad(!bypassed || lock != checked_lock);
has_s_lock_or_stronger= lock;
continue;
}
if (lock_rec_has_to_wait(trx, mode, lock, is_supremum))
{
if (!lock->can_be_bypassed(has_s_lock_or_stronger))
return;
bypassed= lock;
}
ut_ad(lock != checked_lock || !bypassed);
if (lock == checked_lock)
return;
}
}
/** Validates the correctness of locks bypassing in lock queue for each set bit
in the lock bitmap. If wrong sequence is found, the function will crash with
failed assertion.
@param lock the lock which bitmap to be checked */
static void lock_rec_queue_validate_bypass(const lock_t *lock) {
for (ulint i= 0; i < lock_rec_get_n_bits(lock); ++i)
if (lock_rec_get_nth_bit(lock, i))
lock_rec_queue_validate_bypass(lock, i);
}
#endif
/** Reset the wait status of a lock.
@param[in,out] lock lock that was possibly being waited for */
static void lock_reset_lock_and_trx_wait(lock_t *lock)
@@ -1308,6 +1423,10 @@ static void lock_reset_lock_and_trx_wait(lock_t *lock)
trx->lock.wait_lock= nullptr;
trx->lock.wait_trx= nullptr;
lock->type_mode&= ~LOCK_WAIT;
#ifdef UNIV_DEBUG
if (!lock->is_table())
lock_rec_queue_validate_bypass(lock);
#endif
}
#ifdef UNIV_DEBUG
@@ -1325,7 +1444,7 @@ static void check_trx_state(const trx_t *trx)
/** Create a new record lock and inserts it to the lock queue,
without checking for deadlocks or conflicts.
@param[in] c_lock conflicting lock
@param[in] c_lock_info conflicting lock info
@param[in] type_mode lock mode and wait flag
@param[in] page_id index page number
@param[in] page R-tree index page, or NULL
@@ -1335,8 +1454,8 @@ without checking for deadlocks or conflicts.
@param[in] holds_trx_mutex whether the caller holds trx->mutex
@return created lock */
lock_t*
lock_rec_create_low(
lock_t* c_lock,
lock_rec_create(
const conflicting_lock_info &c_lock_info,
unsigned type_mode,
const page_id_t page_id,
const page_t* page,
@@ -1354,6 +1473,8 @@ lock_rec_create_low(
ut_ad(!(type_mode & LOCK_TABLE));
ut_ad(trx->state != TRX_STATE_NOT_STARTED);
ut_ad(!trx->is_autocommit_non_locking());
ut_ad(c_lock_info.insert_after ? !(type_mode & LOCK_WAIT) :
!c_lock_info.bypassed);
/* If rec is the supremum record, then we reset the gap and
LOCK_REC_NOT_GAP bits, as all locks on the supremum are
@@ -1431,16 +1552,23 @@ lock_rec_create_low(
ut_ad(index->table->get_ref_count() || !index->table->can_be_evicted);
const auto lock_hash = &lock_sys.hash_get(type_mode);
lock_hash->cell_get(page_id.fold())->append(*lock, &lock_t::hash);
hash_cell_t& cell = *lock_hash->cell_get(page_id.fold());
if (UNIV_LIKELY(!c_lock_info.insert_after))
cell.append(*lock, &lock_t::hash);
else
cell.insert_after(*c_lock_info.insert_after, *lock,
&lock_t::hash);
if (type_mode & LOCK_WAIT) {
if (trx->lock.wait_trx) {
ut_ad(!c_lock || trx->lock.wait_trx == c_lock->trx);
ut_ad(!c_lock_info.conflicting
|| trx->lock.wait_trx
== c_lock_info.conflicting->trx);
ut_ad(trx->lock.wait_lock);
ut_ad((*trx->lock.wait_lock).trx == trx);
} else {
ut_ad(c_lock);
trx->lock.wait_trx = c_lock->trx;
ut_ad(c_lock_info.conflicting);
trx->lock.wait_trx = c_lock_info.conflicting->trx;
ut_ad(!trx->lock.wait_lock);
}
trx->lock.wait_lock = lock;
@@ -1451,12 +1579,13 @@ lock_rec_create_low(
}
MONITOR_INC(MONITOR_RECLOCK_CREATED);
MONITOR_INC(MONITOR_NUM_RECLOCK);
ut_d(lock_rec_queue_validate_bypass(lock, heap_no));
return lock;
}
/** Enqueue a waiting request for a lock which cannot be granted immediately.
Check for deadlocks.
@param c_lock_info conflicting lock info
@param[in] type_mode the requested lock mode (LOCK_S or LOCK_X)
possibly ORed with LOCK_GAP or
LOCK_REC_NOT_GAP, ORed with
@@ -1474,7 +1603,7 @@ Check for deadlocks.
@retval DB_DEADLOCK if this transaction was chosen as the victim */
dberr_t
lock_rec_enqueue_waiting(
lock_t* c_lock,
const conflicting_lock_info &c_lock_info,
unsigned type_mode,
const page_id_t id,
const page_t* page,
@@ -1506,8 +1635,8 @@ lock_rec_enqueue_waiting(
/* Enqueue the lock request that will wait to be granted, note that
we already own the trx mutex. */
lock_t* lock = lock_rec_create_low(
c_lock,
lock_t* lock = lock_rec_create(
c_lock_info,
type_mode | LOCK_WAIT, id, page, heap_no, index, trx, true);
if (prdt && type_mode & LOCK_PREDICATE) {
@@ -1525,18 +1654,20 @@ lock_rec_enqueue_waiting(
return DB_LOCK_WAIT;
}
/*********************************************************************//**
Looks for a suitable type record lock struct by the same trx on the same page.
This can be used to save space when a new record lock should be set on a page:
no new struct is needed, if a suitable old is found.
/** Looks for a suitable type record lock struct by the same trx on the same
page. This can be used to save space when a new record lock should be set on a
page: no new struct is needed, if a suitable old is found.
@param type_mode lock type_mode field
@param heap_no heap number of the record
@param lock lock_sys.get_first()
@param last_lock the lock up to which to find
@param trx the transaction which lock we are looking for
@return lock or NULL */
static inline
lock_t*
lock_rec_find_similar_on_page(
ulint type_mode, /*!< in: lock type_mode field */
ulint heap_no, /*!< in: heap number of the record */
lock_t* lock, /*!< in: lock_sys.get_first() */
const trx_t* trx) /*!< in: transaction */
static inline lock_t *lock_rec_find_similar_on_page(ulint type_mode,
ulint heap_no,
const lock_t *lock,
const lock_t *last_lock,
const trx_t *trx)
{
lock_sys.rec_hash.assert_locked(lock->un_member.rec_lock.page_id);
DBUG_EXECUTE_IF("innodb_skip_lock_bitmap", {
@@ -1546,14 +1677,14 @@ lock_rec_find_similar_on_page(
});
for (/* No op */;
lock != NULL;
lock != last_lock;
lock = lock_rec_get_next_on_page(lock)) {
if (lock->trx == trx
&& lock->type_mode == type_mode
&& lock_rec_get_n_bits(lock) > heap_no) {
return(lock);
return const_cast<lock_t *>(lock);
}
}
@@ -1576,7 +1707,8 @@ which does NOT check for deadlocks or lock compatibility!
@param[in,out] trx transaction
@param[in] caller_owns_trx_mutex TRUE if caller owns the transaction mutex */
TRANSACTIONAL_TARGET
static void lock_rec_add_to_queue(unsigned type_mode, const hash_cell_t &cell,
static void lock_rec_add_to_queue(const conflicting_lock_info &c_lock_info,
unsigned type_mode, const hash_cell_t &cell,
const page_id_t id, const page_t *page,
ulint heap_no, dict_index_t *index,
trx_t *trx, bool caller_owns_trx_mutex)
@@ -1623,8 +1755,8 @@ static void lock_rec_add_to_queue(unsigned type_mode, const hash_cell_t &cell,
all locks on the supremum are automatically of the gap type, and we
try to avoid unnecessary memory consumption of a new record lock
struct for a gap type lock */
if (heap_no == PAGE_HEAP_NO_SUPREMUM) {
const bool is_supremum = heap_no == PAGE_HEAP_NO_SUPREMUM;
if (is_supremum) {
ut_ad(!(type_mode & LOCK_REC_NOT_GAP));
/* There should never be LOCK_REC_NOT_GAP on a supremum
@@ -1634,23 +1766,47 @@ static void lock_rec_add_to_queue(unsigned type_mode, const hash_cell_t &cell,
}
if (type_mode & LOCK_WAIT) {
goto create;
} else if (lock_t *first_lock = lock_sys_t::get_first(cell, id)) {
ut_ad(!(type_mode & LOCK_INSERT_INTENTION)
|| (type_mode & LOCK_GAP) || is_supremum);
const bool bypass_mode = !is_supremum
&& lock_t::is_rec_exclusive_not_gap(type_mode);
bool has_s_lock_or_stronger = false;
for (lock_t* lock = first_lock;;) {
if (lock->is_waiting()
&& lock_rec_get_nth_bit(lock, heap_no)) {
goto create;
if (!lock_rec_get_nth_bit(lock, heap_no))
goto cont;
ut_ad(!lock->is_insert_intention() || lock->is_gap()
|| is_supremum);
if (bypass_mode && lock_rec_can_be_bypassing(trx, lock))
{
has_s_lock_or_stronger= true;
}
/* There can be several locks suited for bypassing,
skip them all, the below condition is optimization of
lock->is_waiting()
&& (!bypass_mode || !lock->can_be_bypassed(
has_s_lock_or_stronger))
so we don't have to check lock's 'waiting' flag twice.*/
else if (lock->is_waiting()
&& (!bypass_mode || !has_s_lock_or_stronger
|| !lock->is_gap()))
goto create;
cont:
if (!(lock = lock_rec_get_next_on_page(lock))) {
break;
}
}
const lock_t *bypassed = c_lock_info.insert_after
? lock_rec_get_next(heap_no, c_lock_info.insert_after)
: nullptr;
ut_ad(bypassed == c_lock_info.bypassed);
/* Look for a similar record lock on the same page:
if one is found and there are no waiting lock requests,
we can just set the bit */
if (lock_t* lock = lock_rec_find_similar_on_page(
type_mode, heap_no, first_lock, trx)) {
type_mode, heap_no, first_lock,
bypassed, trx)) {
trx_t* lock_trx = lock->trx;
if (caller_owns_trx_mutex) {
trx->mutex_unlock();
@@ -1663,6 +1819,7 @@ static void lock_rec_add_to_queue(unsigned type_mode, const hash_cell_t &cell,
if (caller_owns_trx_mutex) {
trx->mutex_lock();
}
ut_d(lock_rec_queue_validate_bypass(lock));
return;
}
}
@@ -1672,7 +1829,7 @@ create:
because we should be moving an existing waiting lock request. */
ut_ad(!(type_mode & LOCK_WAIT) || trx->lock.wait_trx);
lock_rec_create_low(nullptr,
lock_rec_create(c_lock_info,
type_mode, id, page, heap_no, index, trx,
caller_owns_trx_mutex);
}
@@ -1713,12 +1870,13 @@ static void lock_reuse_for_next_key_lock(const lock_t *held_lock,
that GAP Locks do not conflict with anything. Therefore a GAP Lock
could be granted to us right now if we've requested: */
mode|= LOCK_GAP;
ut_ad(nullptr ==
lock_rec_other_has_conflicting(mode, cell, id, heap_no, trx));
ut_ad(nullptr == lock_rec_other_has_conflicting(mode, cell, id, heap_no, trx)
.conflicting);
/* It might be the case we already have one, so we first check that. */
if (lock_rec_has_expl(mode, cell, id, heap_no, trx) == nullptr)
lock_rec_add_to_queue(mode, cell, id, page, heap_no, index, trx, true);
lock_rec_add_to_queue(null_c_lock_info, mode, cell, id, page, heap_no,
index, trx, true);
}
@@ -1806,21 +1964,26 @@ lock_rec_lock(
/* Do nothing if the trx already has a strong enough lock on rec */
if (!held_lock)
{
if (lock_t *c_lock= lock_rec_other_has_conflicting(mode, g.cell(), id,
heap_no, trx))
conflicting_lock_info c_lock_info=
lock_rec_other_has_conflicting(mode, g.cell(), id, heap_no, trx);
if (c_lock_info.conflicting)
/*
If another transaction has a non-gap conflicting
request in the queue, as this transaction does not
have a lock strong enough already granted on the
record, we have to wait.
*/
err= lock_rec_enqueue_waiting(c_lock, mode, id, block->page.frame,
heap_no, index, thr, nullptr);
else if (!impl)
err= lock_rec_enqueue_waiting(c_lock_info, mode, id,
block->page.frame, heap_no, index, thr,
nullptr);
/* If some lock was bypassed, we need to create explicit lock to avoid
conflicting lock search on every try to convert implicit to explicit
lock. */
else if (!impl || c_lock_info.insert_after)
{
/* Set the requested lock on the record. */
lock_rec_add_to_queue(mode, g.cell(), id, block->page.frame, heap_no,
index, trx, true);
lock_rec_add_to_queue(c_lock_info, mode, g.cell(), id,
block->page.frame, heap_no, index, trx, true);
err= DB_SUCCESS_LOCKED_REC;
}
}
@@ -1853,20 +2016,22 @@ lock_rec_lock(
/* Simplified and faster path for the most common cases */
if (!impl)
lock_rec_create_low(nullptr, mode, id, block->page.frame, heap_no, index,
trx, false);
lock_rec_create(null_c_lock_info, mode, id, block->page.frame, heap_no,
index, trx, false);
return DB_SUCCESS_LOCKED_REC;
}
/*********************************************************************//**
Checks if a waiting record lock request still has to wait in a queue.
@return lock that is causing the wait */
static
const lock_t*
/** Checks if a waiting record lock request still has to wait in a queue.
@param cell record locks hash table cell for waiting lock
@param wait_lock waiting lock
@return lock that is causing the wait, lock after which new lock should be
inserted in lock queue in the case when the lock that is causing the wait must
be bypassed and bypassed lock itself */
static conflicting_lock_info
lock_rec_has_to_wait_in_queue(const hash_cell_t &cell, const lock_t *wait_lock)
{
const lock_t* lock;
const lock_t *lock;
ulint heap_no;
ulint bit_mask;
ulint bit_offset;
@@ -1874,25 +2039,51 @@ lock_rec_has_to_wait_in_queue(const hash_cell_t &cell, const lock_t *wait_lock)
ut_ad(wait_lock->is_waiting());
ut_ad(!wait_lock->is_table());
heap_no = lock_rec_find_set_bit(wait_lock);
heap_no= lock_rec_find_set_bit(wait_lock);
const bool is_supremum= (heap_no == PAGE_HEAP_NO_SUPREMUM);
ut_ad(!(wait_lock->is_insert_intention()) ||
(wait_lock->is_gap()) || is_supremum);
const bool bypass_mode=
!is_supremum && wait_lock->is_rec_exclusive_not_gap();
bool has_s_lock_or_stronger= false;
const lock_t *insert_after= nullptr;
ut_d(const lock_t *bypassed= nullptr);
bit_offset = heap_no / 8;
bit_mask = static_cast<ulint>(1) << (heap_no % 8);
bit_offset= heap_no / 8;
bit_mask= static_cast<ulint>(1) << (heap_no % 8);
for (lock = lock_sys_t::get_first(
cell, wait_lock->un_member.rec_lock.page_id);
lock != wait_lock;
lock = lock_rec_get_next_on_page_const(lock)) {
const byte* p = (const byte*) &lock[1];
if (heap_no < lock_rec_get_n_bits(lock)
&& (p[bit_offset] & bit_mask)
&& lock_has_to_wait(wait_lock, lock)) {
return(lock);
const trx_t *trx= wait_lock->trx;
const lock_t *prev_lock= nullptr;
/* We can't use lock_sys_t::get_first(cell, id, heap_no) here as in
lock_rec_other_has_conflicting() because we iterate locks only till
wait_lock */
for (lock=
lock_sys_t::get_first(cell, wait_lock->un_member.rec_lock.page_id);
lock != wait_lock; lock= lock_rec_get_next_on_page_const(lock))
{
const byte *p= (const byte *) &lock[1];
if (heap_no >= lock_rec_get_n_bits(lock) || !(p[bit_offset] & bit_mask))
continue;
if (bypass_mode && lock_rec_can_be_bypassing(trx, lock))
{
has_s_lock_or_stronger= true;
}
else if (lock_has_to_wait(wait_lock, lock))
{
if (!bypass_mode || !lock->can_be_bypassed(has_s_lock_or_stronger))
return {lock, nullptr, ut_d(nullptr)};
/* Store only the first lock to bypass. */
ut_d(if (!bypassed)
bypassed= lock;)
/* There can be several locks to bypass, insert bypassing lock just
before the first bypassed lock. */
if (!insert_after)
insert_after= prev_lock;
continue;
}
return(NULL);
prev_lock= lock;
}
return {nullptr, const_cast<lock_t *>(insert_after), ut_d(bypassed)};
}
/** Note that a record lock wait started */
@@ -2375,13 +2566,15 @@ static void lock_rec_dequeue_from_page(lock_t *in_lock, bool owns_wait_mutex)
grant locks if there are no conflicting locks ahead. Stop at
the first X lock that is waiting or has been granted. */
for (lock_t* lock = lock_sys_t::get_first(cell, page_id);
lock != NULL;
lock = lock_rec_get_next_on_page(lock)) {
if (!lock->is_waiting()) {
for (lock_t* lock = lock_sys_t::get_first(cell, page_id), *next;
lock != NULL; lock= next) {
/* Store pointer to the next element, because if some lock is
bypassed, the pointer to the next lock in the current lock
object will be changed, as the current lock will change
its position in lock queue. */
next= lock_rec_get_next_on_page(lock);
if (!lock->is_waiting())
continue;
}
if (!owns_wait_mutex) {
mysql_mutex_lock(&lock_sys.wait_mutex);
@@ -2390,10 +2583,10 @@ static void lock_rec_dequeue_from_page(lock_t *in_lock, bool owns_wait_mutex)
ut_ad(lock->trx->lock.wait_trx);
ut_ad(lock->trx->lock.wait_lock);
if (const lock_t* c = lock_rec_has_to_wait_in_queue(
cell, lock)) {
trx_t* c_trx = c->trx;
conflicting_lock_info c_lock_info=
lock_rec_has_to_wait_in_queue(cell, lock);
if (c_lock_info.conflicting) {
trx_t* c_trx = c_lock_info.conflicting->trx;
lock->trx->lock.wait_trx = c_trx;
if (c_trx->lock.wait_trx
&& innodb_deadlock_detect
@@ -2401,6 +2594,12 @@ static void lock_rec_dequeue_from_page(lock_t *in_lock, bool owns_wait_mutex)
Deadlock::to_be_checked = true;
}
} else {
if (UNIV_UNLIKELY(c_lock_info.insert_after != nullptr))
{
cell.remove(*lock, &lock_t::hash);
cell.insert_after(*c_lock_info.insert_after,
*lock, &lock_t::hash);
}
/* Grant the lock */
ut_ad(lock->trx != in_lock->trx);
lock_grant(lock);
@@ -2551,9 +2750,9 @@ lock_rec_inherit_to_gap(hash_cell_t &heir_cell, const page_id_t heir,
((!from_split || !lock->is_record_not_gap()) &&
lock->mode() != (lock_trx->duplicates ? LOCK_S : LOCK_X))))
{
lock_rec_add_to_queue(LOCK_GAP | lock->mode(), heir_cell, heir,
heir_page, heir_heap_no, lock->index, lock_trx,
false);
lock_rec_add_to_queue(null_c_lock_info, LOCK_GAP | lock->mode(),
heir_cell, heir, heir_page, heir_heap_no,
lock->index, lock_trx, false);
}
}
}
@@ -2583,7 +2782,7 @@ lock_rec_inherit_to_gap_if_gap_lock(
!lock->is_insert_intention() && (heap_no == PAGE_HEAP_NO_SUPREMUM ||
!lock->is_record_not_gap()) &&
!lock_table_has(lock->trx, lock->index->table, LOCK_X))
lock_rec_add_to_queue(LOCK_GAP | lock->mode(),
lock_rec_add_to_queue(null_c_lock_info, LOCK_GAP | lock->mode(),
g.cell(), id, block->page.frame,
heir_heap_no, lock->index, lock->trx, false);
}
@@ -2629,15 +2828,19 @@ lock_rec_move(
/* Note that we FIRST reset the bit, and then set the lock:
the function works also if donator_id == receiver_id */
lock_rec_add_to_queue(type_mode, receiver_cell,
receiver_id, receiver.page.frame,
receiver_heap_no,
lock_rec_add_to_queue(null_c_lock_info, type_mode,
receiver_cell, receiver_id,
receiver.page.frame, receiver_heap_no,
lock->index, lock_trx, true);
lock_trx->mutex_unlock();
}
ut_ad(!lock_sys_t::get_first(donator_cell, donator_id,
donator_heap_no));
ut_d(lock_rec_queue_validate_bypass(lock_sys_t::get_first(
receiver_cell,
receiver_id, receiver_heap_no),
receiver_heap_no));
}
/** Move all the granted locks to the front of the given lock list.
@@ -2796,8 +2999,9 @@ lock_move_reorganize_page(
/* NOTE that the old lock bitmap could be too
small for the new heap number! */
lock_rec_add_to_queue(lock->type_mode, cell, id, block->page.frame,
new_heap_no, lock->index, lock_trx, true);
lock_rec_add_to_queue(null_c_lock_info, lock->type_mode, cell, id,
block->page.frame, new_heap_no, lock->index,
lock_trx, true);
}
lock_trx->mutex_unlock();
@@ -2939,9 +3143,9 @@ lock_move_rec_list_end(
lock->type_mode&= ~LOCK_WAIT;
}
lock_rec_add_to_queue(type_mode, g.cell2(), new_id,
new_page,
rec2_heap_no, lock->index, lock_trx, true);
lock_rec_add_to_queue(null_c_lock_info, type_mode, g.cell2(), new_id,
new_page, rec2_heap_no, lock->index, lock_trx,
true);
}
lock_trx->mutex_unlock();
@@ -3062,7 +3266,7 @@ lock_move_rec_list_start(
lock->type_mode&= ~LOCK_WAIT;
}
lock_rec_add_to_queue(type_mode, g.cell2(), new_id,
lock_rec_add_to_queue(null_c_lock_info, type_mode, g.cell2(), new_id,
new_block->page.frame,
rec2_heap_no, lock->index, lock_trx, true);
}
@@ -3156,7 +3360,7 @@ lock_rtr_move_rec_list(
lock->type_mode&= ~LOCK_WAIT;
}
lock_rec_add_to_queue(type_mode, g.cell2(), new_id,
lock_rec_add_to_queue(null_c_lock_info, type_mode, g.cell2(), new_id,
new_block->page.frame,
rec2_heap_no, lock->index, lock_trx, true);
@@ -4253,19 +4457,30 @@ static void lock_rec_rebuild_waiting_queue(
{
lock_sys.assert_locked(cell);
for (lock_t *lock= first_lock; lock != NULL;
lock= lock_rec_get_next(heap_no, lock))
for (lock_t *lock= first_lock, *next; lock != NULL; lock= next)
{
/* Store pointer to the next element, because if some lock is
bypassed, the pointer to the next lock in the current lock
object will be changed, as the current lock will change
its position in lock queue. */
next= lock_rec_get_next(heap_no, lock);
if (!lock->is_waiting())
continue;
mysql_mutex_lock(&lock_sys.wait_mutex);
ut_ad(lock->trx->lock.wait_trx);
ut_ad(lock->trx->lock.wait_lock);
if (const lock_t *c= lock_rec_has_to_wait_in_queue(cell, lock))
lock->trx->lock.wait_trx= c->trx;
conflicting_lock_info c_lock_info=
lock_rec_has_to_wait_in_queue(cell, lock);
if (c_lock_info.conflicting)
lock->trx->lock.wait_trx= c_lock_info.conflicting->trx;
else
{
if (c_lock_info.insert_after)
{
cell.remove(*lock, &lock_t::hash);
cell.insert_after(*c_lock_info.insert_after, *lock, &lock_t::hash);
}
/* Grant the lock */
ut_ad(trx != lock->trx);
lock_grant(lock);
@@ -4696,8 +4911,10 @@ reiterate:
{
ut_ad(!lock->index->table->is_temporary());
bool supremum_bit= lock_rec_get_nth_bit(lock, PAGE_HEAP_NO_SUPREMUM);
/* if XA is being prepared, it must not own waiting locks */
ut_ad(!lock->is_waiting());
bool rec_granted_exclusive_not_gap=
lock->is_rec_granted_exclusive_not_gap();
lock->is_rec_exclusive_not_gap();
if (UNIV_UNLIKELY(lock->type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)))
continue; /* SPATIAL INDEX locking is broken. */
const auto fold = lock->un_member.rec_lock.page_id.fold();
@@ -4870,7 +5087,9 @@ reiterate:
if (!lock->is_table())
{
ut_ad(!lock->index->table->is_temporary());
if (!lock->is_rec_granted_exclusive_not_gap())
/* if XA is being prepared, it must not own waiting locks */
ut_ad(!lock->is_waiting());
if (!lock->is_rec_exclusive_not_gap())
lock_rec_dequeue_from_page(lock, false);
else if (UNIV_UNLIKELY(lock->type_mode &
(LOCK_PREDICATE | LOCK_PRDT_PAGE)))
@@ -5432,7 +5651,8 @@ lock_rec_queue_validate(
ut_ad(trx_state_eq(lock->trx,
TRX_STATE_COMMITTED_IN_MEMORY)
|| !lock->is_waiting()
|| lock_rec_has_to_wait_in_queue(cell, lock));
|| lock_rec_has_to_wait_in_queue(cell, lock).
conflicting);
lock->trx->mutex_unlock();
}
@@ -5524,7 +5744,8 @@ func_exit:
if (lock->is_waiting()) {
ut_a(lock->is_gap()
|| lock_rec_has_to_wait_in_queue(cell, lock));
|| lock_rec_has_to_wait_in_queue(cell, lock).
conflicting);
} else if (!lock->is_gap()) {
const lock_mode mode = lock->mode() == LOCK_S
? LOCK_X : LOCK_S;
@@ -5830,13 +6051,16 @@ lock_rec_insert_check_and_lock(
on the successor, which produced an unnecessary deadlock. */
const unsigned type_mode= LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION;
if (lock_t *c_lock= lock_rec_other_has_conflicting(type_mode,
g.cell(), id,
heap_no, trx))
conflicting_lock_info c_lock_info= lock_rec_other_has_conflicting(
type_mode, g.cell(), id, heap_no, trx);
/* Insert intention locks must not bypass any other lock. */
ut_ad(!c_lock_info.insert_after && !c_lock_info.bypassed);
if (c_lock_info.conflicting)
{
trx->mutex_lock();
err= lock_rec_enqueue_waiting(c_lock, type_mode, id, block->page.frame,
heap_no, index, thr, nullptr);
err= lock_rec_enqueue_waiting(c_lock_info, type_mode, id,
block->page.frame, heap_no, index, thr,
nullptr);
trx->mutex_unlock();
}
}
@@ -5905,8 +6129,9 @@ static trx_t *lock_rec_convert_impl_to_expl_for_trx(trx_t *trx,
if (!trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY) &&
!lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP, g.cell(), id, heap_no,
trx))
lock_rec_add_to_queue(LOCK_X | LOCK_REC_NOT_GAP, g.cell(), id,
page_align(rec), heap_no, index, trx, true);
lock_rec_add_to_queue(null_c_lock_info, LOCK_X | LOCK_REC_NOT_GAP,
g.cell(), id, page_align(rec), heap_no, index,
trx, true);
}
trx->release_reference();

View File

@@ -470,8 +470,9 @@ create:
because we should be moving an existing waiting lock request. */
ut_ad(!(type_mode & LOCK_WAIT) || trx->lock.wait_trx);
lock_t* lock = lock_rec_create(nullptr,
type_mode, block, PRDT_HEAPNO, index,
lock_t* lock = lock_rec_create(null_c_lock_info,
type_mode, block->page.id(),
block->page.frame, PRDT_HEAPNO, index,
trx, caller_owns_trx_mutex);
if (lock->type_mode & LOCK_PREDICATE) {
@@ -533,8 +534,9 @@ lock_prdt_insert_check_and_lock(
trx->mutex_lock();
/* Allocate MBR on the lock heap */
lock_init_prdt_from_mbr(prdt, mbr, 0, trx->lock.lock_heap);
err= lock_rec_enqueue_waiting(c_lock, mode, id, block->page.frame,
PRDT_HEAPNO, index, thr, prdt);
err= lock_rec_enqueue_waiting({c_lock, nullptr, ut_d(nullptr)}, mode, id,
block->page.frame, PRDT_HEAPNO, index,
thr, prdt);
trx->mutex_unlock();
}
}
@@ -734,10 +736,10 @@ lock_prdt_lock(
lock_t* lock = lock_sys_t::get_first(g.cell(), id);
if (lock == NULL) {
lock = lock_rec_create(
NULL,
prdt_mode, block, PRDT_HEAPNO,
index, trx, FALSE);
lock = lock_rec_create(null_c_lock_info,
prdt_mode, block->page.id(),
block->page.frame, PRDT_HEAPNO, index,
trx, FALSE);
status = LOCK_REC_SUCCESS_CREATED;
} else {
@@ -759,7 +761,8 @@ lock_prdt_lock(
prdt_mode, g.cell(), id, prdt,
trx)) {
err = lock_rec_enqueue_waiting(
wait_for, prdt_mode, id,
{wait_for, nullptr, ut_d(nullptr)},
prdt_mode, id,
block->page.frame, PRDT_HEAPNO,
index, thr, prdt);
} else {
@@ -826,8 +829,7 @@ lock_place_prdt_page_lock(
}
if (lock == NULL) {
lock = lock_rec_create_low(
NULL,
lock = lock_rec_create(null_c_lock_info,
mode, page_id, NULL, PRDT_HEAPNO,
index, trx, FALSE);