1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-30 16:24:05 +03:00

Bug#53674: InnoDB: Error: unlock row could not find a 4 mode lock on the record

In semi-consistent read, only unlock freshly locked non-matching records.

Define DB_SUCCESS_LOCKED_REC for indicating a successful operation
where a record lock was created.

lock_rec_lock_fast(): Return LOCK_REC_SUCCESS,
LOCK_REC_SUCCESS_CREATED, or LOCK_REC_FAIL instead of TRUE/FALSE.

lock_sec_rec_read_check_and_lock(),
lock_clust_rec_read_check_and_lock(), lock_rec_enqueue_waiting(),
lock_rec_lock_slow(), lock_rec_lock(), row_ins_set_shared_rec_lock(),
row_ins_set_exclusive_rec_lock(), sel_set_rec_lock(),
row_sel_get_clust_rec_for_mysql(): Return DB_SUCCESS_LOCKED_REC if a
new record lock was created. Adjust callers.

row_unlock_for_mysql(): Correct the function documentation.

row_prebuilt_t::new_rec_locks: Correct the documentation.
This commit is contained in:
Marko Mäkelä
2010-06-02 13:26:37 +03:00
parent b55ec58701
commit 306e1338a5
10 changed files with 295 additions and 210 deletions

View File

@ -0,0 +1,11 @@
create table bug53674(a int)engine=innodb;
insert into bug53674 values (1),(2);
start transaction;
select * from bug53674 for update;
a
1
2
select * from bug53674 where a=(select a from bug53674 where a > 1);
a
2
drop table bug53674;

View File

@ -0,0 +1 @@
--log-bin --innodb-locks-unsafe-for-binlog --binlog-format=mixed

View File

@ -0,0 +1,8 @@
-- source include/have_innodb.inc
create table bug53674(a int)engine=innodb;
insert into bug53674 values (1),(2);
start transaction;
select * from bug53674 for update;
select * from bug53674 where a=(select a from bug53674 where a > 1);
drop table bug53674;

View File

@ -10,6 +10,8 @@ Created 5/24/1996 Heikki Tuuri
#define db0err_h #define db0err_h
#define DB_SUCCESS_LOCKED_REC 9 /* like DB_SUCCESS, but a new
explicit record lock was created */
#define DB_SUCCESS 10 #define DB_SUCCESS 10
/* The following are error codes */ /* The following are error codes */

View File

@ -292,14 +292,15 @@ lock_sec_rec_modify_check_and_lock(
dict_index_t* index, /* in: secondary index */ dict_index_t* index, /* in: secondary index */
que_thr_t* thr); /* in: query thread */ que_thr_t* thr); /* in: query thread */
/************************************************************************* /*************************************************************************
Like the counterpart for a clustered index below, but now we read a Like lock_clust_rec_read_check_and_lock(), but reads a
secondary index record. */ secondary index record. */
ulint ulint
lock_sec_rec_read_check_and_lock( lock_sec_rec_read_check_and_lock(
/*=============================*/ /*=============================*/
/* out: DB_SUCCESS, DB_LOCK_WAIT, /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */ DB_LOCK_WAIT, DB_DEADLOCK,
or DB_QUE_THR_SUSPENDED */
ulint flags, /* in: if BTR_NO_LOCKING_FLAG bit is set, ulint flags, /* in: if BTR_NO_LOCKING_FLAG bit is set,
does nothing */ does nothing */
rec_t* rec, /* in: user record or page supremum record rec_t* rec, /* in: user record or page supremum record
@ -324,8 +325,9 @@ lock on the record. */
ulint ulint
lock_clust_rec_read_check_and_lock( lock_clust_rec_read_check_and_lock(
/*===============================*/ /*===============================*/
/* out: DB_SUCCESS, DB_LOCK_WAIT, /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */ DB_LOCK_WAIT, DB_DEADLOCK,
or DB_QUE_THR_SUSPENDED */
ulint flags, /* in: if BTR_NO_LOCKING_FLAG bit is set, ulint flags, /* in: if BTR_NO_LOCKING_FLAG bit is set,
does nothing */ does nothing */
rec_t* rec, /* in: user record or page supremum record rec_t* rec, /* in: user record or page supremum record

View File

@ -246,22 +246,20 @@ row_update_for_mysql(
row_prebuilt_t* prebuilt); /* in: prebuilt struct in MySQL row_prebuilt_t* prebuilt); /* in: prebuilt struct in MySQL
handle */ handle */
/************************************************************************* /*************************************************************************
This can only be used when srv_locks_unsafe_for_binlog is TRUE or This can only be used when srv_locks_unsafe_for_binlog is TRUE or this
session is using a READ COMMITTED isolation level. Before session is using a READ COMMITTED or READ UNCOMMITTED isolation level.
calling this function we must use trx_reset_new_rec_lock_info() and Before calling this function row_search_for_mysql() must have
trx_register_new_rec_lock() to store the information which new record locks initialized prebuilt->new_rec_locks to store the information which new
really were set. This function removes a newly set lock under prebuilt->pcur, record locks really were set. This function removes a newly set
and also under prebuilt->clust_pcur. Currently, this is only used and tested clustered index record lock under prebuilt->pcur or
in the case of an UPDATE or a DELETE statement, where the row lock is of the prebuilt->clust_pcur. Thus, this implements a 'mini-rollback' that
LOCK_X type. releases the latest clustered index record lock we set. */
Thus, this implements a 'mini-rollback' that releases the latest record
locks we set. */
int int
row_unlock_for_mysql( row_unlock_for_mysql(
/*=================*/ /*=================*/
/* out: error code or DB_SUCCESS */ /* out: error code or DB_SUCCESS */
row_prebuilt_t* prebuilt, /* in: prebuilt struct in MySQL row_prebuilt_t* prebuilt, /* in/out: prebuilt struct in MySQL
handle */ handle */
ibool has_latches_on_recs);/* TRUE if called so that we have ibool has_latches_on_recs);/* TRUE if called so that we have
the latches on the records under pcur the latches on the records under pcur
@ -660,18 +658,17 @@ struct row_prebuilt_struct {
ulint new_rec_locks; /* normally 0; if ulint new_rec_locks; /* normally 0; if
srv_locks_unsafe_for_binlog is srv_locks_unsafe_for_binlog is
TRUE or session is using READ TRUE or session is using READ
COMMITTED isolation level, in a COMMITTED or READ UNCOMMITTED
cursor search, if we set a new isolation level, set in
record lock on an index, this is row_search_for_mysql() if we set a new
incremented; this is used in record lock on the secondary
releasing the locks under the or clustered index; this is
cursors if we are performing an used in row_unlock_for_mysql()
UPDATE and we determine after when releasing the lock under
retrieving the row that it does the cursor if we determine
not need to be locked; thus, after retrieving the row that
these can be used to implement a it does not need to be locked
'mini-rollback' that releases ('mini-rollback') */
the latest record locks */
ulint mysql_prefix_len;/* byte offset of the end of ulint mysql_prefix_len;/* byte offset of the end of
the last requested column */ the last requested column */
ulint mysql_row_len; /* length in bytes of a row in the ulint mysql_row_len; /* length in bytes of a row in the

View File

@ -1739,11 +1739,12 @@ ulint
lock_rec_enqueue_waiting( lock_rec_enqueue_waiting(
/*=====================*/ /*=====================*/
/* out: DB_LOCK_WAIT, DB_DEADLOCK, or /* out: DB_LOCK_WAIT, DB_DEADLOCK, or
DB_QUE_THR_SUSPENDED, or DB_SUCCESS; DB_QUE_THR_SUSPENDED, or DB_SUCCESS_LOCKED_REC;
DB_SUCCESS means that there was a deadlock, DB_SUCCESS_LOCKED_REC means that there
but another transaction was chosen as a was a deadlock, but another
victim, and we got the lock immediately: transaction was chosen as a victim,
no need to wait then */ and we got the lock immediately: no
need to wait then */
ulint type_mode,/* in: lock mode this transaction is ulint type_mode,/* in: lock mode this transaction is
requesting: LOCK_S or LOCK_X, possibly ORed requesting: LOCK_S or LOCK_X, possibly ORed
with LOCK_GAP or LOCK_REC_NOT_GAP, ORed with LOCK_GAP or LOCK_REC_NOT_GAP, ORed
@ -1804,7 +1805,7 @@ lock_rec_enqueue_waiting(
if (trx->wait_lock == NULL) { if (trx->wait_lock == NULL) {
return(DB_SUCCESS); return(DB_SUCCESS_LOCKED_REC);
} }
trx->que_state = TRX_QUE_LOCK_WAIT; trx->que_state = TRX_QUE_LOCK_WAIT;
@ -1903,6 +1904,16 @@ lock_rec_add_to_queue(
return(lock_rec_create(type_mode, rec, index, trx)); return(lock_rec_create(type_mode, rec, index, trx));
} }
/** Record locking request status */
enum lock_rec_req_status {
/** Failed to acquire a lock */
LOCK_REC_FAIL,
/** Succeeded in acquiring a lock (implicit or already acquired) */
LOCK_REC_SUCCESS,
/** Explicitly created a new lock */
LOCK_REC_SUCCESS_CREATED
};
/************************************************************************* /*************************************************************************
This is a fast routine for locking a record in the most common cases: This is a fast routine for locking a record in the most common cases:
there are no explicit locks on the page, or there is just one lock, owned there are no explicit locks on the page, or there is just one lock, owned
@ -1911,10 +1922,10 @@ which does NOT look at implicit locks! Checks lock compatibility within
explicit locks. This function sets a normal next-key lock, or in the case of explicit locks. This function sets a normal next-key lock, or in the case of
a page supremum record, a gap type lock. */ a page supremum record, a gap type lock. */
UNIV_INLINE UNIV_INLINE
ibool enum lock_rec_req_status
lock_rec_lock_fast( lock_rec_lock_fast(
/*===============*/ /*===============*/
/* out: TRUE if locking succeeded */ /* out: whether the locking succeeded */
ibool impl, /* in: if TRUE, no lock is set if no wait ibool impl, /* in: if TRUE, no lock is set if no wait
is necessary: we assume that the caller will is necessary: we assume that the caller will
set an implicit lock */ set an implicit lock */
@ -1950,19 +1961,19 @@ lock_rec_lock_fast(
lock_rec_create(mode, rec, index, trx); lock_rec_create(mode, rec, index, trx);
} }
return(TRUE); return(LOCK_REC_SUCCESS_CREATED);
} }
if (lock_rec_get_next_on_page(lock)) { if (lock_rec_get_next_on_page(lock)) {
return(FALSE); return(LOCK_REC_FAIL);
} }
if (lock->trx != trx if (lock->trx != trx
|| lock->type_mode != (mode | LOCK_REC) || lock->type_mode != (mode | LOCK_REC)
|| lock_rec_get_n_bits(lock) <= heap_no) { || lock_rec_get_n_bits(lock) <= heap_no) {
return(FALSE); return(LOCK_REC_FAIL);
} }
if (!impl) { if (!impl) {
@ -1971,10 +1982,11 @@ lock_rec_lock_fast(
if (!lock_rec_get_nth_bit(lock, heap_no)) { if (!lock_rec_get_nth_bit(lock, heap_no)) {
lock_rec_set_nth_bit(lock, heap_no); lock_rec_set_nth_bit(lock, heap_no);
return(LOCK_REC_SUCCESS_CREATED);
} }
} }
return(TRUE); return(LOCK_REC_SUCCESS);
} }
/************************************************************************* /*************************************************************************
@ -1986,8 +1998,9 @@ static
ulint ulint
lock_rec_lock_slow( lock_rec_lock_slow(
/*===============*/ /*===============*/
/* out: DB_SUCCESS, DB_LOCK_WAIT, or error /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
code */ DB_LOCK_WAIT, DB_DEADLOCK,
or DB_QUE_THR_SUSPENDED */
ibool impl, /* in: if TRUE, no lock is set if no wait is ibool impl, /* in: if TRUE, no lock is set if no wait is
necessary: we assume that the caller will set necessary: we assume that the caller will set
an implicit lock */ an implicit lock */
@ -1998,7 +2011,6 @@ lock_rec_lock_slow(
que_thr_t* thr) /* in: query thread */ que_thr_t* thr) /* in: query thread */
{ {
trx_t* trx; trx_t* trx;
ulint err;
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
ut_ad((LOCK_MODE_MASK & mode) != LOCK_S ut_ad((LOCK_MODE_MASK & mode) != LOCK_S
@ -2017,26 +2029,21 @@ lock_rec_lock_slow(
/* The trx already has a strong enough lock on rec: do /* The trx already has a strong enough lock on rec: do
nothing */ nothing */
err = DB_SUCCESS;
} else if (lock_rec_other_has_conflicting(mode, rec, trx)) { } else if (lock_rec_other_has_conflicting(mode, rec, trx)) {
/* If another transaction has a non-gap conflicting request in /* If another transaction has a non-gap conflicting request in
the queue, as this transaction does not have a lock strong the queue, as this transaction does not have a lock strong
enough already granted on the record, we have to wait. */ enough already granted on the record, we have to wait. */
err = lock_rec_enqueue_waiting(mode, rec, index, thr); return(lock_rec_enqueue_waiting(mode, rec, index, thr));
} else { } else if (!impl) {
if (!impl) { /* Set the requested lock on the record */
/* Set the requested lock on the record */
lock_rec_add_to_queue(LOCK_REC | mode, rec, index, lock_rec_add_to_queue(LOCK_REC | mode, rec, index, trx);
trx); return(DB_SUCCESS_LOCKED_REC);
}
err = DB_SUCCESS;
} }
return(err); return(DB_SUCCESS);
} }
/************************************************************************* /*************************************************************************
@ -2049,8 +2056,9 @@ static
ulint ulint
lock_rec_lock( lock_rec_lock(
/*==========*/ /*==========*/
/* out: DB_SUCCESS, DB_LOCK_WAIT, or error /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
code */ DB_LOCK_WAIT, DB_DEADLOCK, or
DB_QUE_THR_SUSPENDED */
ibool impl, /* in: if TRUE, no lock is set if no wait is ibool impl, /* in: if TRUE, no lock is set if no wait is
necessary: we assume that the caller will set necessary: we assume that the caller will set
an implicit lock */ an implicit lock */
@ -2060,8 +2068,6 @@ lock_rec_lock(
dict_index_t* index, /* in: index of record */ dict_index_t* index, /* in: index of record */
que_thr_t* thr) /* in: query thread */ que_thr_t* thr) /* in: query thread */
{ {
ulint err;
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
ut_ad((LOCK_MODE_MASK & mode) != LOCK_S ut_ad((LOCK_MODE_MASK & mode) != LOCK_S
|| lock_table_has(thr_get_trx(thr), index->table, LOCK_IS)); || lock_table_has(thr_get_trx(thr), index->table, LOCK_IS));
@ -2073,17 +2079,19 @@ lock_rec_lock(
|| mode - (LOCK_MODE_MASK & mode) == LOCK_REC_NOT_GAP || mode - (LOCK_MODE_MASK & mode) == LOCK_REC_NOT_GAP
|| mode - (LOCK_MODE_MASK & mode) == 0); || mode - (LOCK_MODE_MASK & mode) == 0);
if (lock_rec_lock_fast(impl, mode, rec, index, thr)) { /* We try a simplified and faster subroutine for the most
common cases */
/* We try a simplified and faster subroutine for the most switch (lock_rec_lock_fast(impl, mode, rec, index, thr)) {
common cases */ case LOCK_REC_SUCCESS:
return(DB_SUCCESS);
err = DB_SUCCESS; case LOCK_REC_SUCCESS_CREATED:
} else { return(DB_SUCCESS_LOCKED_REC);
err = lock_rec_lock_slow(impl, mode, rec, index, thr); case LOCK_REC_FAIL:
return(lock_rec_lock_slow(impl, mode, rec, index, thr));
} }
return(err); ut_error;
return(DB_ERROR);
} }
/************************************************************************* /*************************************************************************
@ -4832,7 +4840,7 @@ lock_rec_insert_check_and_lock(
lock = lock_rec_get_first(next_rec); lock = lock_rec_get_first(next_rec);
if (lock == NULL) { if (UNIV_LIKELY(lock == NULL)) {
/* We optimize CPU time usage in the simplest case */ /* We optimize CPU time usage in the simplest case */
lock_mutex_exit_kernel(); lock_mutex_exit_kernel();
@ -4840,8 +4848,7 @@ lock_rec_insert_check_and_lock(
if (!(index->type & DICT_CLUSTERED)) { if (!(index->type & DICT_CLUSTERED)) {
/* Update the page max trx id field */ /* Update the page max trx id field */
page_update_max_trx_id(buf_frame_align(rec), page_update_max_trx_id(buf_frame_align(rec), trx->id);
thr_get_trx(thr)->id);
} }
return(DB_SUCCESS); return(DB_SUCCESS);
@ -4873,11 +4880,16 @@ lock_rec_insert_check_and_lock(
lock_mutex_exit_kernel(); lock_mutex_exit_kernel();
if (!(index->type & DICT_CLUSTERED) && (err == DB_SUCCESS)) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
err = DB_SUCCESS;
/* fall through */
case DB_SUCCESS:
if (index->type & DICT_CLUSTERED) {
break;
}
/* Update the page max trx id field */ /* Update the page max trx id field */
page_update_max_trx_id(buf_frame_align(rec), page_update_max_trx_id(buf_frame_align(rec), trx->id);
thr_get_trx(thr)->id);
} }
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
@ -4984,6 +4996,10 @@ lock_clust_rec_modify_check_and_lock(
ut_ad(lock_rec_queue_validate(rec, index, offsets)); ut_ad(lock_rec_queue_validate(rec, index, offsets));
if (UNIV_UNLIKELY(err == DB_SUCCESS_LOCKED_REC)) {
err = DB_SUCCESS;
}
return(err); return(err);
} }
@ -5043,25 +5059,29 @@ lock_sec_rec_modify_check_and_lock(
} }
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
if (err == DB_SUCCESS) { if (err == DB_SUCCESS || err == DB_SUCCESS_LOCKED_REC) {
/* Update the page max trx id field */ /* Update the page max trx id field */
/* It might not be necessary to do this if
err == DB_SUCCESS (no new lock created),
but it should not cost too much performance. */
page_update_max_trx_id(buf_frame_align(rec), page_update_max_trx_id(buf_frame_align(rec),
thr_get_trx(thr)->id); thr_get_trx(thr)->id);
err = DB_SUCCESS;
} }
return(err); return(err);
} }
/************************************************************************* /*************************************************************************
Like the counterpart for a clustered index below, but now we read a Like lock_clust_rec_read_check_and_lock(), but reads a
secondary index record. */ secondary index record. */
ulint ulint
lock_sec_rec_read_check_and_lock( lock_sec_rec_read_check_and_lock(
/*=============================*/ /*=============================*/
/* out: DB_SUCCESS, DB_LOCK_WAIT, /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */ DB_LOCK_WAIT, DB_DEADLOCK,
or DB_QUE_THR_SUSPENDED */
ulint flags, /* in: if BTR_NO_LOCKING_FLAG bit is set, ulint flags, /* in: if BTR_NO_LOCKING_FLAG bit is set,
does nothing */ does nothing */
rec_t* rec, /* in: user record or page supremum record rec_t* rec, /* in: user record or page supremum record
@ -5126,8 +5146,9 @@ lock on the record. */
ulint ulint
lock_clust_rec_read_check_and_lock( lock_clust_rec_read_check_and_lock(
/*===============================*/ /*===============================*/
/* out: DB_SUCCESS, DB_LOCK_WAIT, /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */ DB_LOCK_WAIT, DB_DEADLOCK,
or DB_QUE_THR_SUSPENDED */
ulint flags, /* in: if BTR_NO_LOCKING_FLAG bit is set, ulint flags, /* in: if BTR_NO_LOCKING_FLAG bit is set,
does nothing */ does nothing */
rec_t* rec, /* in: user record or page supremum record rec_t* rec, /* in: user record or page supremum record
@ -5206,16 +5227,21 @@ lock_clust_rec_read_check_and_lock_alt(
mem_heap_t* tmp_heap = NULL; mem_heap_t* tmp_heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE]; ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets = offsets_; ulint* offsets = offsets_;
ulint ret; ulint err;
*offsets_ = (sizeof offsets_) / sizeof *offsets_; *offsets_ = (sizeof offsets_) / sizeof *offsets_;
offsets = rec_get_offsets(rec, index, offsets, offsets = rec_get_offsets(rec, index, offsets,
ULINT_UNDEFINED, &tmp_heap); ULINT_UNDEFINED, &tmp_heap);
ret = lock_clust_rec_read_check_and_lock(flags, rec, index, err = lock_clust_rec_read_check_and_lock(flags, rec, index,
offsets, mode, gap_mode, thr); offsets, mode, gap_mode, thr);
if (tmp_heap) { if (tmp_heap) {
mem_heap_free(tmp_heap); mem_heap_free(tmp_heap);
} }
return(ret);
if (UNIV_UNLIKELY(err == DB_SUCCESS_LOCKED_REC)) {
err = DB_SUCCESS;
}
return(err);
} }

View File

@ -1114,7 +1114,8 @@ static
ulint ulint
row_ins_set_shared_rec_lock( row_ins_set_shared_rec_lock(
/*========================*/ /*========================*/
/* out: DB_SUCCESS or error code */ /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
or error code */
ulint type, /* in: LOCK_ORDINARY, LOCK_GAP, or ulint type, /* in: LOCK_ORDINARY, LOCK_GAP, or
LOCK_REC_NOT_GAP type lock */ LOCK_REC_NOT_GAP type lock */
rec_t* rec, /* in: record */ rec_t* rec, /* in: record */
@ -1145,7 +1146,8 @@ static
ulint ulint
row_ins_set_exclusive_rec_lock( row_ins_set_exclusive_rec_lock(
/*===========================*/ /*===========================*/
/* out: DB_SUCCESS or error code */ /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
or error code */
ulint type, /* in: LOCK_ORDINARY, LOCK_GAP, or ulint type, /* in: LOCK_ORDINARY, LOCK_GAP, or
LOCK_REC_NOT_GAP type lock */ LOCK_REC_NOT_GAP type lock */
rec_t* rec, /* in: record */ rec_t* rec, /* in: record */
@ -1195,9 +1197,7 @@ row_ins_check_foreign_constraint(
dict_table_t* check_table; dict_table_t* check_table;
dict_index_t* check_index; dict_index_t* check_index;
ulint n_fields_cmp; ulint n_fields_cmp;
rec_t* rec;
btr_pcur_t pcur; btr_pcur_t pcur;
ibool moved;
int cmp; int cmp;
ulint err; ulint err;
ulint i; ulint i;
@ -1328,12 +1328,12 @@ run_again:
/* Scan index records and check if there is a matching record */ /* Scan index records and check if there is a matching record */
for (;;) { do {
rec = btr_pcur_get_rec(&pcur); rec_t* rec = btr_pcur_get_rec(&pcur);
if (page_rec_is_infimum(rec)) { if (page_rec_is_infimum(rec)) {
goto next_rec; continue;
} }
offsets = rec_get_offsets(rec, check_index, offsets = rec_get_offsets(rec, check_index,
@ -1343,12 +1343,13 @@ run_again:
err = row_ins_set_shared_rec_lock( err = row_ins_set_shared_rec_lock(
LOCK_ORDINARY, rec, check_index, offsets, thr); LOCK_ORDINARY, rec, check_index, offsets, thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
break; case DB_SUCCESS:
continue;
default:
goto end_scan;
} }
goto next_rec;
} }
cmp = cmp_dtuple_rec(entry, rec, offsets); cmp = cmp_dtuple_rec(entry, rec, offsets);
@ -1359,9 +1360,12 @@ run_again:
err = row_ins_set_shared_rec_lock( err = row_ins_set_shared_rec_lock(
LOCK_ORDINARY, rec, check_index, LOCK_ORDINARY, rec, check_index,
offsets, thr); offsets, thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
case DB_SUCCESS:
break; break;
default:
goto end_scan;
} }
} else { } else {
/* Found a matching record. Lock only /* Found a matching record. Lock only
@ -1372,15 +1376,18 @@ run_again:
LOCK_REC_NOT_GAP, rec, check_index, LOCK_REC_NOT_GAP, rec, check_index,
offsets, thr); offsets, thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
case DB_SUCCESS:
break; break;
default:
goto end_scan;
} }
if (check_ref) { if (check_ref) {
err = DB_SUCCESS; err = DB_SUCCESS;
break; goto end_scan;
} else if (foreign->type != 0) { } else if (foreign->type != 0) {
/* There is an ON UPDATE or ON DELETE /* There is an ON UPDATE or ON DELETE
condition: check them in a separate condition: check them in a separate
@ -1406,7 +1413,7 @@ run_again:
err = DB_FOREIGN_DUPLICATE_KEY; err = DB_FOREIGN_DUPLICATE_KEY;
} }
break; goto end_scan;
} }
} else { } else {
row_ins_foreign_report_err( row_ins_foreign_report_err(
@ -1414,48 +1421,39 @@ run_again:
thr, foreign, rec, entry); thr, foreign, rec, entry);
err = DB_ROW_IS_REFERENCED; err = DB_ROW_IS_REFERENCED;
break; goto end_scan;
} }
} }
} } else {
ut_a(cmp < 0);
if (cmp < 0) {
err = row_ins_set_shared_rec_lock( err = row_ins_set_shared_rec_lock(
LOCK_GAP, rec, check_index, offsets, thr); LOCK_GAP, rec, check_index, offsets, thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
break; case DB_SUCCESS:
if (check_ref) {
err = DB_NO_REFERENCED_ROW;
row_ins_foreign_report_add_err(
trx, foreign, rec, entry);
} else {
err = DB_SUCCESS;
}
} }
if (check_ref) { goto end_scan;
err = DB_NO_REFERENCED_ROW;
row_ins_foreign_report_add_err(
trx, foreign, rec, entry);
} else {
err = DB_SUCCESS;
}
break;
} }
} while (btr_pcur_move_to_next(&pcur, &mtr));
ut_a(cmp == 0); if (check_ref) {
next_rec: row_ins_foreign_report_add_err(
moved = btr_pcur_move_to_next(&pcur, &mtr); trx, foreign, btr_pcur_get_rec(&pcur), entry);
err = DB_NO_REFERENCED_ROW;
if (!moved) { } else {
if (check_ref) { err = DB_SUCCESS;
rec = btr_pcur_get_rec(&pcur);
row_ins_foreign_report_add_err(
trx, foreign, rec, entry);
err = DB_NO_REFERENCED_ROW;
} else {
err = DB_SUCCESS;
}
break;
}
} }
end_scan:
btr_pcur_close(&pcur); btr_pcur_close(&pcur);
mtr_commit(&mtr); mtr_commit(&mtr);
@ -1641,10 +1639,8 @@ row_ins_scan_sec_index_for_duplicate(
ulint i; ulint i;
int cmp; int cmp;
ulint n_fields_cmp; ulint n_fields_cmp;
rec_t* rec;
btr_pcur_t pcur; btr_pcur_t pcur;
ulint err = DB_SUCCESS; ulint err = DB_SUCCESS;
ibool moved;
unsigned allow_duplicates; unsigned allow_duplicates;
mtr_t mtr; mtr_t mtr;
mem_heap_t* heap = NULL; mem_heap_t* heap = NULL;
@ -1680,12 +1676,12 @@ row_ins_scan_sec_index_for_duplicate(
/* Scan index records and check if there is a duplicate */ /* Scan index records and check if there is a duplicate */
for (;;) { do {
rec = btr_pcur_get_rec(&pcur); rec_t* rec = btr_pcur_get_rec(&pcur);
if (page_rec_is_infimum(rec)) { if (page_rec_is_infimum(rec)) {
goto next_rec; continue;
} }
offsets = rec_get_offsets(rec, index, offsets, offsets = rec_get_offsets(rec, index, offsets,
@ -1706,14 +1702,18 @@ row_ins_scan_sec_index_for_duplicate(
LOCK_ORDINARY, rec, index, offsets, thr); LOCK_ORDINARY, rec, index, offsets, thr);
} }
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
err = DB_SUCCESS;
case DB_SUCCESS:
break; break;
default:
goto end_scan;
} }
if (page_rec_is_supremum(rec)) { if (page_rec_is_supremum(rec)) {
goto next_rec; continue;
} }
cmp = cmp_dtuple_rec(entry, rec, offsets); cmp = cmp_dtuple_rec(entry, rec, offsets);
@ -1725,23 +1725,15 @@ row_ins_scan_sec_index_for_duplicate(
thr_get_trx(thr)->error_info = index; thr_get_trx(thr)->error_info = index;
break; goto end_scan;
} }
} else {
ut_a(cmp < 0);
goto end_scan;
} }
} while (btr_pcur_move_to_next(&pcur, &mtr));
if (cmp < 0) { end_scan:
break;
}
ut_a(cmp == 0);
next_rec:
moved = btr_pcur_move_to_next(&pcur, &mtr);
if (!moved) {
break;
}
}
if (UNIV_LIKELY_NULL(heap)) { if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap); mem_heap_free(heap);
} }
@ -1837,7 +1829,11 @@ row_ins_duplicate_error_in_clust(
cursor->index, offsets, thr); cursor->index, offsets, thr);
} }
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
case DB_SUCCESS:
break;
default:
goto func_exit; goto func_exit;
} }
@ -1875,7 +1871,11 @@ row_ins_duplicate_error_in_clust(
cursor->index, offsets, thr); cursor->index, offsets, thr);
} }
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
case DB_SUCCESS:
break;
default:
goto func_exit; goto func_exit;
} }

View File

@ -1455,22 +1455,20 @@ run_again:
} }
/************************************************************************* /*************************************************************************
This can only be used when srv_locks_unsafe_for_binlog is TRUE or This can only be used when srv_locks_unsafe_for_binlog is TRUE or this
this session is using a READ COMMITTED isolation level. Before session is using a READ COMMITTED or READ UNCOMMITTED isolation level.
calling this function we must use trx_reset_new_rec_lock_info() and Before calling this function row_search_for_mysql() must have
trx_register_new_rec_lock() to store the information which new record locks initialized prebuilt->new_rec_locks to store the information which new
really were set. This function removes a newly set lock under prebuilt->pcur, record locks really were set. This function removes a newly set
and also under prebuilt->clust_pcur. Currently, this is only used and tested clustered index record lock under prebuilt->pcur or
in the case of an UPDATE or a DELETE statement, where the row lock is of the prebuilt->clust_pcur. Thus, this implements a 'mini-rollback' that
LOCK_X type. releases the latest clustered index record lock we set. */
Thus, this implements a 'mini-rollback' that releases the latest record
locks we set. */
int int
row_unlock_for_mysql( row_unlock_for_mysql(
/*=================*/ /*=================*/
/* out: error code or DB_SUCCESS */ /* out: error code or DB_SUCCESS */
row_prebuilt_t* prebuilt, /* in: prebuilt struct in MySQL row_prebuilt_t* prebuilt, /* in/out: prebuilt struct in MySQL
handle */ handle */
ibool has_latches_on_recs)/* TRUE if called so that we have ibool has_latches_on_recs)/* TRUE if called so that we have
the latches on the records under pcur the latches on the records under pcur

View File

@ -754,8 +754,14 @@ row_sel_get_clust_rec(
0, clust_rec, index, offsets, 0, clust_rec, index, offsets,
node->row_lock_mode, lock_type, thr); node->row_lock_mode, lock_type, thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS:
case DB_SUCCESS_LOCKED_REC:
/* Declare the variable uninitialized in Valgrind.
It should be set to DB_SUCCESS at func_exit. */
UNIV_MEM_INVALID(&err, sizeof err);
break;
default:
goto err_exit; goto err_exit;
} }
} else { } else {
@ -826,7 +832,8 @@ UNIV_INLINE
ulint ulint
sel_set_rec_lock( sel_set_rec_lock(
/*=============*/ /*=============*/
/* out: DB_SUCCESS or error code */ /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
or error code */
rec_t* rec, /* in: record */ rec_t* rec, /* in: record */
dict_index_t* index, /* in: index */ dict_index_t* index, /* in: index */
const ulint* offsets,/* in: rec_get_offsets(rec, index) */ const ulint* offsets,/* in: rec_get_offsets(rec, index) */
@ -1374,11 +1381,15 @@ rec_loop:
node->row_lock_mode, node->row_lock_mode,
lock_type, thr); lock_type, thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
err = DB_SUCCESS;
case DB_SUCCESS:
break;
default:
/* Note that in this case we will store in pcur /* Note that in this case we will store in pcur
the PREDECESSOR of the record we are waiting the PREDECESSOR of the record we are waiting
the lock for */ the lock for */
goto lock_wait_or_error; goto lock_wait_or_error;
} }
} }
@ -1429,8 +1440,12 @@ skip_lock:
err = sel_set_rec_lock(rec, index, offsets, err = sel_set_rec_lock(rec, index, offsets,
node->row_lock_mode, lock_type, thr); node->row_lock_mode, lock_type, thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
err = DB_SUCCESS;
case DB_SUCCESS:
break;
default:
goto lock_wait_or_error; goto lock_wait_or_error;
} }
} }
@ -2745,7 +2760,8 @@ static
ulint ulint
row_sel_get_clust_rec_for_mysql( row_sel_get_clust_rec_for_mysql(
/*============================*/ /*============================*/
/* out: DB_SUCCESS or error code */ /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
or error code */
row_prebuilt_t* prebuilt,/* in: prebuilt struct in the handle */ row_prebuilt_t* prebuilt,/* in: prebuilt struct in the handle */
dict_index_t* sec_index,/* in: secondary index where rec resides */ dict_index_t* sec_index,/* in: secondary index where rec resides */
rec_t* rec, /* in: record in a non-clustered index; if rec_t* rec, /* in: record in a non-clustered index; if
@ -2826,6 +2842,7 @@ row_sel_get_clust_rec_for_mysql(
clust_rec = NULL; clust_rec = NULL;
err = DB_SUCCESS;
goto func_exit; goto func_exit;
} }
@ -2840,8 +2857,11 @@ row_sel_get_clust_rec_for_mysql(
err = lock_clust_rec_read_check_and_lock( err = lock_clust_rec_read_check_and_lock(
0, clust_rec, clust_index, *offsets, 0, clust_rec, clust_index, *offsets,
prebuilt->select_lock_type, LOCK_REC_NOT_GAP, thr); prebuilt->select_lock_type, LOCK_REC_NOT_GAP, thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS:
case DB_SUCCESS_LOCKED_REC:
break;
default:
goto err_exit; goto err_exit;
} }
} else { } else {
@ -2900,6 +2920,8 @@ row_sel_get_clust_rec_for_mysql(
rec, sec_index, clust_rec, clust_index)); rec, sec_index, clust_rec, clust_index));
#endif #endif
} }
err = DB_SUCCESS;
} }
func_exit: func_exit:
@ -2912,7 +2934,6 @@ func_exit:
btr_pcur_store_position(prebuilt->clust_pcur, mtr); btr_pcur_store_position(prebuilt->clust_pcur, mtr);
} }
err = DB_SUCCESS;
err_exit: err_exit:
return(err); return(err);
} }
@ -3626,8 +3647,12 @@ shortcut_fails_too_big_rec:
prebuilt->select_lock_type, prebuilt->select_lock_type,
LOCK_GAP, thr); LOCK_GAP, thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
err = DB_SUCCESS;
case DB_SUCCESS:
break;
default:
goto lock_wait_or_error; goto lock_wait_or_error;
} }
} }
@ -3724,8 +3749,12 @@ rec_loop:
prebuilt->select_lock_type, prebuilt->select_lock_type,
LOCK_ORDINARY, thr); LOCK_ORDINARY, thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
err = DB_SUCCESS;
case DB_SUCCESS:
break;
default:
goto lock_wait_or_error; goto lock_wait_or_error;
} }
} }
@ -3856,8 +3885,11 @@ wrong_offs:
prebuilt->select_lock_type, LOCK_GAP, prebuilt->select_lock_type, LOCK_GAP,
thr); thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
case DB_SUCCESS:
break;
default:
goto lock_wait_or_error; goto lock_wait_or_error;
} }
} }
@ -3891,8 +3923,11 @@ wrong_offs:
prebuilt->select_lock_type, LOCK_GAP, prebuilt->select_lock_type, LOCK_GAP,
thr); thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
case DB_SUCCESS:
break;
default:
goto lock_wait_or_error; goto lock_wait_or_error;
} }
} }
@ -3961,15 +3996,21 @@ no_gap_lock:
switch (err) { switch (err) {
rec_t* old_vers; rec_t* old_vers;
case DB_SUCCESS: case DB_SUCCESS_LOCKED_REC:
if (srv_locks_unsafe_for_binlog if (srv_locks_unsafe_for_binlog
|| trx->isolation_level <= TRX_ISO_READ_COMMITTED) { || trx->isolation_level
<= TRX_ISO_READ_COMMITTED) {
/* Note that a record of /* Note that a record of
prebuilt->index was locked. */ prebuilt->index was locked. */
prebuilt->new_rec_locks = 1; prebuilt->new_rec_locks = 1;
} }
err = DB_SUCCESS;
case DB_SUCCESS:
break; break;
case DB_LOCK_WAIT: case DB_LOCK_WAIT:
/* Never unlock rows that were part of a conflict. */
prebuilt->new_rec_locks = 0;
if (UNIV_LIKELY(prebuilt->row_read_type if (UNIV_LIKELY(prebuilt->row_read_type
!= ROW_READ_TRY_SEMI_CONSISTENT) != ROW_READ_TRY_SEMI_CONSISTENT)
|| unique_search || unique_search
@ -3999,7 +4040,6 @@ no_gap_lock:
if (UNIV_LIKELY(trx->wait_lock != NULL)) { if (UNIV_LIKELY(trx->wait_lock != NULL)) {
lock_cancel_waiting_and_release( lock_cancel_waiting_and_release(
trx->wait_lock); trx->wait_lock);
prebuilt->new_rec_locks = 0;
} else { } else {
mutex_exit(&kernel_mutex); mutex_exit(&kernel_mutex);
@ -4011,9 +4051,6 @@ no_gap_lock:
ULINT_UNDEFINED, ULINT_UNDEFINED,
&heap); &heap);
err = DB_SUCCESS; err = DB_SUCCESS;
/* Note that a record of
prebuilt->index was locked. */
prebuilt->new_rec_locks = 1;
break; break;
} }
mutex_exit(&kernel_mutex); mutex_exit(&kernel_mutex);
@ -4151,27 +4188,30 @@ requires_clust_rec:
err = row_sel_get_clust_rec_for_mysql(prebuilt, index, rec, err = row_sel_get_clust_rec_for_mysql(prebuilt, index, rec,
thr, &clust_rec, thr, &clust_rec,
&offsets, &heap, &mtr); &offsets, &heap, &mtr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS:
if (clust_rec == NULL) {
/* The record did not exist in the read view */
ut_ad(prebuilt->select_lock_type == LOCK_NONE);
goto next_rec;
}
break;
case DB_SUCCESS_LOCKED_REC:
ut_a(clust_rec != NULL);
if (srv_locks_unsafe_for_binlog
|| trx->isolation_level
<= TRX_ISO_READ_COMMITTED) {
/* Note that the clustered index record
was locked. */
prebuilt->new_rec_locks = 2;
}
err = DB_SUCCESS;
break;
default:
goto lock_wait_or_error; goto lock_wait_or_error;
} }
if (clust_rec == NULL) {
/* The record did not exist in the read view */
ut_ad(prebuilt->select_lock_type == LOCK_NONE);
goto next_rec;
}
if ((srv_locks_unsafe_for_binlog
|| trx->isolation_level <= TRX_ISO_READ_COMMITTED)
&& prebuilt->select_lock_type != LOCK_NONE) {
/* Note that both the secondary index record
and the clustered index record were locked. */
ut_ad(prebuilt->new_rec_locks == 1);
prebuilt->new_rec_locks = 2;
}
if (UNIV_UNLIKELY(rec_get_deleted_flag(clust_rec, comp))) { if (UNIV_UNLIKELY(rec_get_deleted_flag(clust_rec, comp))) {
/* The record is delete marked: we can skip it */ /* The record is delete marked: we can skip it */