mirror of
https://github.com/postgres/postgres.git
synced 2025-11-15 03:41:20 +03:00
Implement SKIP LOCKED for row-level locks
This clause changes the behavior of SELECT locking clauses in the presence of locked rows: instead of causing a process to block waiting for the locks held by other processes (or raise an error, with NOWAIT), SKIP LOCKED makes the new reader skip over such rows. While this is not appropriate behavior for general purposes, there are some cases in which it is useful, such as queue-like tables. Catalog version bumped because this patch changes the representation of stored rules. Reviewed by Craig Ringer (based on a previous attempt at an implementation by Simon Riggs, who also provided input on the syntax used in the current patch), David Rowley, and Álvaro Herrera. Author: Thomas Munro
This commit is contained in:
@@ -4090,7 +4090,7 @@ get_mxact_status_for_lock(LockTupleMode mode, bool is_update)
|
||||
* cid: current command ID (used for visibility test, and stored into
|
||||
* tuple's cmax if lock is successful)
|
||||
* mode: indicates if shared or exclusive tuple lock is desired
|
||||
* nowait: if true, ereport rather than blocking if lock not available
|
||||
* wait_policy: what to do if tuple lock is not available
|
||||
* follow_updates: if true, follow the update chain to also lock descendant
|
||||
* tuples.
|
||||
*
|
||||
@@ -4103,6 +4103,7 @@ get_mxact_status_for_lock(LockTupleMode mode, bool is_update)
|
||||
* HeapTupleMayBeUpdated: lock was successfully acquired
|
||||
* HeapTupleSelfUpdated: lock failed because tuple updated by self
|
||||
* HeapTupleUpdated: lock failed because tuple updated by other xact
|
||||
* HeapTupleWouldBlock: lock couldn't be acquired and wait_policy is skip
|
||||
*
|
||||
* In the failure cases, the routine fills *hufd with the tuple's t_ctid,
|
||||
* t_xmax (resolving a possible MultiXact, if necessary), and t_cmax
|
||||
@@ -4114,7 +4115,7 @@ get_mxact_status_for_lock(LockTupleMode mode, bool is_update)
|
||||
*/
|
||||
HTSU_Result
|
||||
heap_lock_tuple(Relation relation, HeapTuple tuple,
|
||||
CommandId cid, LockTupleMode mode, bool nowait,
|
||||
CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy,
|
||||
bool follow_updates,
|
||||
Buffer *buffer, HeapUpdateFailureData *hufd)
|
||||
{
|
||||
@@ -4220,16 +4221,28 @@ l3:
|
||||
*/
|
||||
if (!have_tuple_lock)
|
||||
{
|
||||
if (nowait)
|
||||
switch (wait_policy)
|
||||
{
|
||||
if (!ConditionalLockTupleTuplock(relation, tid, mode))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
|
||||
errmsg("could not obtain lock on row in relation \"%s\"",
|
||||
RelationGetRelationName(relation))));
|
||||
case LockWaitBlock:
|
||||
LockTupleTuplock(relation, tid, mode);
|
||||
break;
|
||||
case LockWaitSkip:
|
||||
if (!ConditionalLockTupleTuplock(relation, tid, mode))
|
||||
{
|
||||
result = HeapTupleWouldBlock;
|
||||
/* recovery code expects to have buffer lock held */
|
||||
LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
|
||||
goto failed;
|
||||
}
|
||||
break;
|
||||
case LockWaitError:
|
||||
if (!ConditionalLockTupleTuplock(relation, tid, mode))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
|
||||
errmsg("could not obtain lock on row in relation \"%s\"",
|
||||
RelationGetRelationName(relation))));
|
||||
break;
|
||||
}
|
||||
else
|
||||
LockTupleTuplock(relation, tid, mode);
|
||||
have_tuple_lock = true;
|
||||
}
|
||||
|
||||
@@ -4432,21 +4445,35 @@ l3:
|
||||
if (status >= MultiXactStatusNoKeyUpdate)
|
||||
elog(ERROR, "invalid lock mode in heap_lock_tuple");
|
||||
|
||||
/* wait for multixact to end */
|
||||
if (nowait)
|
||||
/* wait for multixact to end, or die trying */
|
||||
switch (wait_policy)
|
||||
{
|
||||
if (!ConditionalMultiXactIdWait((MultiXactId) xwait,
|
||||
status, infomask, relation,
|
||||
NULL))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
|
||||
errmsg("could not obtain lock on row in relation \"%s\"",
|
||||
RelationGetRelationName(relation))));
|
||||
case LockWaitBlock:
|
||||
MultiXactIdWait((MultiXactId) xwait, status, infomask,
|
||||
relation, &tuple->t_data->t_ctid, XLTW_Lock, NULL);
|
||||
break;
|
||||
case LockWaitSkip:
|
||||
if (!ConditionalMultiXactIdWait((MultiXactId) xwait,
|
||||
status, infomask, relation,
|
||||
NULL))
|
||||
{
|
||||
result = HeapTupleWouldBlock;
|
||||
/* recovery code expects to have buffer lock held */
|
||||
LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
|
||||
goto failed;
|
||||
}
|
||||
break;
|
||||
case LockWaitError:
|
||||
if (!ConditionalMultiXactIdWait((MultiXactId) xwait,
|
||||
status, infomask, relation,
|
||||
NULL))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
|
||||
errmsg("could not obtain lock on row in relation \"%s\"",
|
||||
RelationGetRelationName(relation))));
|
||||
|
||||
break;
|
||||
}
|
||||
else
|
||||
MultiXactIdWait((MultiXactId) xwait, status, infomask,
|
||||
relation, &tuple->t_data->t_ctid,
|
||||
XLTW_Lock, NULL);
|
||||
|
||||
/* if there are updates, follow the update chain */
|
||||
if (follow_updates &&
|
||||
@@ -4491,18 +4518,30 @@ l3:
|
||||
}
|
||||
else
|
||||
{
|
||||
/* wait for regular transaction to end */
|
||||
if (nowait)
|
||||
/* wait for regular transaction to end, or die trying */
|
||||
switch (wait_policy)
|
||||
{
|
||||
if (!ConditionalXactLockTableWait(xwait))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
|
||||
errmsg("could not obtain lock on row in relation \"%s\"",
|
||||
RelationGetRelationName(relation))));
|
||||
case LockWaitBlock:
|
||||
XactLockTableWait(xwait, relation, &tuple->t_data->t_ctid,
|
||||
XLTW_Lock);
|
||||
break;
|
||||
case LockWaitSkip:
|
||||
if (!ConditionalXactLockTableWait(xwait))
|
||||
{
|
||||
result = HeapTupleWouldBlock;
|
||||
/* recovery code expects to have buffer lock held */
|
||||
LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
|
||||
goto failed;
|
||||
}
|
||||
break;
|
||||
case LockWaitError:
|
||||
if (!ConditionalXactLockTableWait(xwait))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
|
||||
errmsg("could not obtain lock on row in relation \"%s\"",
|
||||
RelationGetRelationName(relation))));
|
||||
break;
|
||||
}
|
||||
else
|
||||
XactLockTableWait(xwait, relation, &tuple->t_data->t_ctid,
|
||||
XLTW_Lock);
|
||||
|
||||
/* if there are updates, follow the update chain */
|
||||
if (follow_updates &&
|
||||
@@ -4564,7 +4603,8 @@ l3:
|
||||
failed:
|
||||
if (result != HeapTupleMayBeUpdated)
|
||||
{
|
||||
Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
|
||||
Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated ||
|
||||
result == HeapTupleWouldBlock);
|
||||
Assert(!(tuple->t_data->t_infomask & HEAP_XMAX_INVALID));
|
||||
hufd->ctid = tuple->t_data->t_ctid;
|
||||
hufd->xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
|
||||
|
||||
@@ -2706,7 +2706,7 @@ ltrmark:;
|
||||
tuple.t_self = *tid;
|
||||
test = heap_lock_tuple(relation, &tuple,
|
||||
estate->es_output_cid,
|
||||
lockmode, false /* wait */ ,
|
||||
lockmode, LockWaitBlock,
|
||||
false, &buffer, &hufd);
|
||||
switch (test)
|
||||
{
|
||||
|
||||
@@ -836,7 +836,7 @@ InitPlan(QueryDesc *queryDesc, int eflags)
|
||||
erm->prti = rc->prti;
|
||||
erm->rowmarkId = rc->rowmarkId;
|
||||
erm->markType = rc->markType;
|
||||
erm->noWait = rc->noWait;
|
||||
erm->waitPolicy = rc->waitPolicy;
|
||||
ItemPointerSetInvalid(&(erm->curCtid));
|
||||
estate->es_rowMarks = lappend(estate->es_rowMarks, erm);
|
||||
}
|
||||
@@ -1871,7 +1871,7 @@ EvalPlanQual(EState *estate, EPQState *epqstate,
|
||||
/*
|
||||
* Get and lock the updated version of the row; if fail, return NULL.
|
||||
*/
|
||||
copyTuple = EvalPlanQualFetch(estate, relation, lockmode, false /* wait */,
|
||||
copyTuple = EvalPlanQualFetch(estate, relation, lockmode, LockWaitBlock,
|
||||
tid, priorXmax);
|
||||
|
||||
if (copyTuple == NULL)
|
||||
@@ -1930,12 +1930,15 @@ EvalPlanQual(EState *estate, EPQState *epqstate,
|
||||
* estate - executor state data
|
||||
* relation - table containing tuple
|
||||
* lockmode - requested tuple lock mode
|
||||
* noWait - wait mode to pass to heap_lock_tuple
|
||||
* wait_policy - requested lock wait policy
|
||||
* *tid - t_ctid from the outdated tuple (ie, next updated version)
|
||||
* priorXmax - t_xmax from the outdated tuple
|
||||
*
|
||||
* Returns a palloc'd copy of the newest tuple version, or NULL if we find
|
||||
* that there is no newest version (ie, the row was deleted not updated).
|
||||
* We also return NULL if the tuple is locked and the wait policy is to skip
|
||||
* such tuples.
|
||||
*
|
||||
* If successful, we have locked the newest tuple version, so caller does not
|
||||
* need to worry about it changing anymore.
|
||||
*
|
||||
@@ -1943,7 +1946,8 @@ EvalPlanQual(EState *estate, EPQState *epqstate,
|
||||
* but we use "int" to avoid having to include heapam.h in executor.h.
|
||||
*/
|
||||
HeapTuple
|
||||
EvalPlanQualFetch(EState *estate, Relation relation, int lockmode, bool noWait,
|
||||
EvalPlanQualFetch(EState *estate, Relation relation, int lockmode,
|
||||
LockWaitPolicy wait_policy,
|
||||
ItemPointer tid, TransactionId priorXmax)
|
||||
{
|
||||
HeapTuple copyTuple = NULL;
|
||||
@@ -1992,18 +1996,25 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode, bool noWait,
|
||||
if (TransactionIdIsValid(SnapshotDirty.xmax))
|
||||
{
|
||||
ReleaseBuffer(buffer);
|
||||
if (noWait)
|
||||
switch (wait_policy)
|
||||
{
|
||||
if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
|
||||
errmsg("could not obtain lock on row in relation \"%s\"",
|
||||
RelationGetRelationName(relation))));
|
||||
case LockWaitBlock:
|
||||
XactLockTableWait(SnapshotDirty.xmax,
|
||||
relation, &tuple.t_data->t_ctid,
|
||||
XLTW_FetchUpdated);
|
||||
break;
|
||||
case LockWaitSkip:
|
||||
if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
|
||||
return NULL; /* skip instead of waiting */
|
||||
break;
|
||||
case LockWaitError:
|
||||
if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
|
||||
errmsg("could not obtain lock on row in relation \"%s\"",
|
||||
RelationGetRelationName(relation))));
|
||||
break;
|
||||
}
|
||||
else
|
||||
XactLockTableWait(SnapshotDirty.xmax,
|
||||
relation, &tuple.t_data->t_ctid,
|
||||
XLTW_FetchUpdated);
|
||||
continue; /* loop back to repeat heap_fetch */
|
||||
}
|
||||
|
||||
@@ -2030,7 +2041,7 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode, bool noWait,
|
||||
*/
|
||||
test = heap_lock_tuple(relation, &tuple,
|
||||
estate->es_output_cid,
|
||||
lockmode, noWait,
|
||||
lockmode, wait_policy,
|
||||
false, &buffer, &hufd);
|
||||
/* We now have two pins on the buffer, get rid of one */
|
||||
ReleaseBuffer(buffer);
|
||||
@@ -2076,6 +2087,10 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode, bool noWait,
|
||||
/* tuple was deleted, so give up */
|
||||
return NULL;
|
||||
|
||||
case HeapTupleWouldBlock:
|
||||
ReleaseBuffer(buffer);
|
||||
return NULL;
|
||||
|
||||
default:
|
||||
ReleaseBuffer(buffer);
|
||||
elog(ERROR, "unrecognized heap_lock_tuple status: %u",
|
||||
|
||||
@@ -133,11 +133,15 @@ lnext:
|
||||
|
||||
test = heap_lock_tuple(erm->relation, &tuple,
|
||||
estate->es_output_cid,
|
||||
lockmode, erm->noWait, true,
|
||||
lockmode, erm->waitPolicy, true,
|
||||
&buffer, &hufd);
|
||||
ReleaseBuffer(buffer);
|
||||
switch (test)
|
||||
{
|
||||
case HeapTupleWouldBlock:
|
||||
/* couldn't lock tuple in SKIP LOCKED mode */
|
||||
goto lnext;
|
||||
|
||||
case HeapTupleSelfUpdated:
|
||||
|
||||
/*
|
||||
@@ -170,12 +174,15 @@ lnext:
|
||||
}
|
||||
|
||||
/* updated, so fetch and lock the updated version */
|
||||
copyTuple = EvalPlanQualFetch(estate, erm->relation, lockmode, erm->noWait,
|
||||
&hufd.ctid, hufd.xmax);
|
||||
copyTuple = EvalPlanQualFetch(estate, erm->relation, lockmode,
|
||||
erm->waitPolicy, &hufd.ctid, hufd.xmax);
|
||||
|
||||
if (copyTuple == NULL)
|
||||
{
|
||||
/* Tuple was deleted, so don't return it */
|
||||
/*
|
||||
* Tuple was deleted; or it's locked and we're under SKIP
|
||||
* LOCKED policy, so don't return it
|
||||
*/
|
||||
goto lnext;
|
||||
}
|
||||
/* remember the actually locked tuple's TID */
|
||||
|
||||
@@ -959,7 +959,7 @@ _copyPlanRowMark(const PlanRowMark *from)
|
||||
COPY_SCALAR_FIELD(prti);
|
||||
COPY_SCALAR_FIELD(rowmarkId);
|
||||
COPY_SCALAR_FIELD(markType);
|
||||
COPY_SCALAR_FIELD(noWait);
|
||||
COPY_SCALAR_FIELD(waitPolicy);
|
||||
COPY_SCALAR_FIELD(isParent);
|
||||
|
||||
return newnode;
|
||||
@@ -2071,7 +2071,7 @@ _copyRowMarkClause(const RowMarkClause *from)
|
||||
|
||||
COPY_SCALAR_FIELD(rti);
|
||||
COPY_SCALAR_FIELD(strength);
|
||||
COPY_SCALAR_FIELD(noWait);
|
||||
COPY_SCALAR_FIELD(waitPolicy);
|
||||
COPY_SCALAR_FIELD(pushedDown);
|
||||
|
||||
return newnode;
|
||||
@@ -2452,7 +2452,7 @@ _copyLockingClause(const LockingClause *from)
|
||||
|
||||
COPY_NODE_FIELD(lockedRels);
|
||||
COPY_SCALAR_FIELD(strength);
|
||||
COPY_SCALAR_FIELD(noWait);
|
||||
COPY_SCALAR_FIELD(waitPolicy);
|
||||
|
||||
return newnode;
|
||||
}
|
||||
|
||||
@@ -2312,7 +2312,7 @@ _equalLockingClause(const LockingClause *a, const LockingClause *b)
|
||||
{
|
||||
COMPARE_NODE_FIELD(lockedRels);
|
||||
COMPARE_SCALAR_FIELD(strength);
|
||||
COMPARE_SCALAR_FIELD(noWait);
|
||||
COMPARE_SCALAR_FIELD(waitPolicy);
|
||||
|
||||
return true;
|
||||
}
|
||||
@@ -2408,7 +2408,7 @@ _equalRowMarkClause(const RowMarkClause *a, const RowMarkClause *b)
|
||||
{
|
||||
COMPARE_SCALAR_FIELD(rti);
|
||||
COMPARE_SCALAR_FIELD(strength);
|
||||
COMPARE_SCALAR_FIELD(noWait);
|
||||
COMPARE_SCALAR_FIELD(waitPolicy);
|
||||
COMPARE_SCALAR_FIELD(pushedDown);
|
||||
|
||||
return true;
|
||||
|
||||
@@ -836,7 +836,7 @@ _outPlanRowMark(StringInfo str, const PlanRowMark *node)
|
||||
WRITE_UINT_FIELD(prti);
|
||||
WRITE_UINT_FIELD(rowmarkId);
|
||||
WRITE_ENUM_FIELD(markType, RowMarkType);
|
||||
WRITE_BOOL_FIELD(noWait);
|
||||
WRITE_BOOL_FIELD(waitPolicy);
|
||||
WRITE_BOOL_FIELD(isParent);
|
||||
}
|
||||
|
||||
@@ -2136,7 +2136,7 @@ _outLockingClause(StringInfo str, const LockingClause *node)
|
||||
|
||||
WRITE_NODE_FIELD(lockedRels);
|
||||
WRITE_ENUM_FIELD(strength, LockClauseStrength);
|
||||
WRITE_BOOL_FIELD(noWait);
|
||||
WRITE_ENUM_FIELD(waitPolicy, LockWaitPolicy);
|
||||
}
|
||||
|
||||
static void
|
||||
@@ -2327,7 +2327,7 @@ _outRowMarkClause(StringInfo str, const RowMarkClause *node)
|
||||
|
||||
WRITE_UINT_FIELD(rti);
|
||||
WRITE_ENUM_FIELD(strength, LockClauseStrength);
|
||||
WRITE_BOOL_FIELD(noWait);
|
||||
WRITE_ENUM_FIELD(waitPolicy, LockWaitPolicy);
|
||||
WRITE_BOOL_FIELD(pushedDown);
|
||||
}
|
||||
|
||||
|
||||
@@ -321,7 +321,7 @@ _readRowMarkClause(void)
|
||||
|
||||
READ_UINT_FIELD(rti);
|
||||
READ_ENUM_FIELD(strength, LockClauseStrength);
|
||||
READ_BOOL_FIELD(noWait);
|
||||
READ_ENUM_FIELD(waitPolicy, LockWaitPolicy);
|
||||
READ_BOOL_FIELD(pushedDown);
|
||||
|
||||
READ_DONE();
|
||||
|
||||
@@ -2232,7 +2232,7 @@ preprocess_rowmarks(PlannerInfo *root)
|
||||
newrc->markType = ROW_MARK_KEYSHARE;
|
||||
break;
|
||||
}
|
||||
newrc->noWait = rc->noWait;
|
||||
newrc->waitPolicy = rc->waitPolicy;
|
||||
newrc->isParent = false;
|
||||
|
||||
prowmarks = lappend(prowmarks, newrc);
|
||||
@@ -2260,7 +2260,7 @@ preprocess_rowmarks(PlannerInfo *root)
|
||||
newrc->markType = ROW_MARK_REFERENCE;
|
||||
else
|
||||
newrc->markType = ROW_MARK_COPY;
|
||||
newrc->noWait = false; /* doesn't matter */
|
||||
newrc->waitPolicy = LockWaitBlock; /* doesn't matter */
|
||||
newrc->isParent = false;
|
||||
|
||||
prowmarks = lappend(prowmarks, newrc);
|
||||
|
||||
@@ -233,19 +233,19 @@ expand_security_qual(PlannerInfo *root, List *tlist, int rt_index,
|
||||
{
|
||||
case ROW_MARK_EXCLUSIVE:
|
||||
applyLockingClause(subquery, 1, LCS_FORUPDATE,
|
||||
rc->noWait, false);
|
||||
rc->waitPolicy, false);
|
||||
break;
|
||||
case ROW_MARK_NOKEYEXCLUSIVE:
|
||||
applyLockingClause(subquery, 1, LCS_FORNOKEYUPDATE,
|
||||
rc->noWait, false);
|
||||
rc->waitPolicy, false);
|
||||
break;
|
||||
case ROW_MARK_SHARE:
|
||||
applyLockingClause(subquery, 1, LCS_FORSHARE,
|
||||
rc->noWait, false);
|
||||
rc->waitPolicy, false);
|
||||
break;
|
||||
case ROW_MARK_KEYSHARE:
|
||||
applyLockingClause(subquery, 1, LCS_FORKEYSHARE,
|
||||
rc->noWait, false);
|
||||
rc->waitPolicy, false);
|
||||
break;
|
||||
case ROW_MARK_REFERENCE:
|
||||
case ROW_MARK_COPY:
|
||||
|
||||
@@ -1389,7 +1389,7 @@ expand_inherited_rtentry(PlannerInfo *root, RangeTblEntry *rte, Index rti)
|
||||
newrc->prti = rti;
|
||||
newrc->rowmarkId = oldrc->rowmarkId;
|
||||
newrc->markType = oldrc->markType;
|
||||
newrc->noWait = oldrc->noWait;
|
||||
newrc->waitPolicy = oldrc->waitPolicy;
|
||||
newrc->isParent = false;
|
||||
|
||||
root->rowMarks = lappend(root->rowMarks, newrc);
|
||||
|
||||
@@ -2358,7 +2358,7 @@ transformLockingClause(ParseState *pstate, Query *qry, LockingClause *lc,
|
||||
allrels = makeNode(LockingClause);
|
||||
allrels->lockedRels = NIL; /* indicates all rels */
|
||||
allrels->strength = lc->strength;
|
||||
allrels->noWait = lc->noWait;
|
||||
allrels->waitPolicy = lc->waitPolicy;
|
||||
|
||||
if (lockedRels == NIL)
|
||||
{
|
||||
@@ -2372,13 +2372,13 @@ transformLockingClause(ParseState *pstate, Query *qry, LockingClause *lc,
|
||||
switch (rte->rtekind)
|
||||
{
|
||||
case RTE_RELATION:
|
||||
applyLockingClause(qry, i,
|
||||
lc->strength, lc->noWait, pushedDown);
|
||||
applyLockingClause(qry, i, lc->strength, lc->waitPolicy,
|
||||
pushedDown);
|
||||
rte->requiredPerms |= ACL_SELECT_FOR_UPDATE;
|
||||
break;
|
||||
case RTE_SUBQUERY:
|
||||
applyLockingClause(qry, i,
|
||||
lc->strength, lc->noWait, pushedDown);
|
||||
applyLockingClause(qry, i, lc->strength, lc->waitPolicy,
|
||||
pushedDown);
|
||||
|
||||
/*
|
||||
* FOR UPDATE/SHARE of subquery is propagated to all of
|
||||
@@ -2424,15 +2424,13 @@ transformLockingClause(ParseState *pstate, Query *qry, LockingClause *lc,
|
||||
switch (rte->rtekind)
|
||||
{
|
||||
case RTE_RELATION:
|
||||
applyLockingClause(qry, i,
|
||||
lc->strength, lc->noWait,
|
||||
pushedDown);
|
||||
applyLockingClause(qry, i, lc->strength,
|
||||
lc->waitPolicy, pushedDown);
|
||||
rte->requiredPerms |= ACL_SELECT_FOR_UPDATE;
|
||||
break;
|
||||
case RTE_SUBQUERY:
|
||||
applyLockingClause(qry, i,
|
||||
lc->strength, lc->noWait,
|
||||
pushedDown);
|
||||
applyLockingClause(qry, i, lc->strength,
|
||||
lc->waitPolicy, pushedDown);
|
||||
/* see comment above */
|
||||
transformLockingClause(pstate, rte->subquery,
|
||||
allrels, true);
|
||||
@@ -2499,7 +2497,8 @@ transformLockingClause(ParseState *pstate, Query *qry, LockingClause *lc,
|
||||
*/
|
||||
void
|
||||
applyLockingClause(Query *qry, Index rtindex,
|
||||
LockClauseStrength strength, bool noWait, bool pushedDown)
|
||||
LockClauseStrength strength, LockWaitPolicy waitPolicy,
|
||||
bool pushedDown)
|
||||
{
|
||||
RowMarkClause *rc;
|
||||
|
||||
@@ -2516,15 +2515,20 @@ applyLockingClause(Query *qry, Index rtindex,
|
||||
* a shared and exclusive lock at the same time; it'll end up being
|
||||
* exclusive anyway.)
|
||||
*
|
||||
* We also consider that NOWAIT wins if it's specified both ways. This
|
||||
* is a bit more debatable but raising an error doesn't seem helpful.
|
||||
* (Consider for instance SELECT FOR UPDATE NOWAIT from a view that
|
||||
* internally contains a plain FOR UPDATE spec.)
|
||||
* Similarly, if the same RTE is specified with more than one lock wait
|
||||
* policy, consider that NOWAIT wins over SKIP LOCKED, which in turn
|
||||
* wins over waiting for the lock (the default). This is a bit more
|
||||
* debatable but raising an error doesn't seem helpful. (Consider for
|
||||
* instance SELECT FOR UPDATE NOWAIT from a view that internally
|
||||
* contains a plain FOR UPDATE spec.) Having NOWAIT win over SKIP
|
||||
* LOCKED is reasonable since the former throws an error in case of
|
||||
* coming across a locked tuple, which may be undesirable in some cases
|
||||
* but it seems better than silently returning inconsistent results.
|
||||
*
|
||||
* And of course pushedDown becomes false if any clause is explicit.
|
||||
*/
|
||||
rc->strength = Max(rc->strength, strength);
|
||||
rc->noWait |= noWait;
|
||||
rc->waitPolicy = Max(rc->waitPolicy, waitPolicy);
|
||||
rc->pushedDown &= pushedDown;
|
||||
return;
|
||||
}
|
||||
@@ -2533,7 +2537,7 @@ applyLockingClause(Query *qry, Index rtindex,
|
||||
rc = makeNode(RowMarkClause);
|
||||
rc->rti = rtindex;
|
||||
rc->strength = strength;
|
||||
rc->noWait = noWait;
|
||||
rc->waitPolicy = waitPolicy;
|
||||
rc->pushedDown = pushedDown;
|
||||
qry->rowMarks = lappend(qry->rowMarks, rc);
|
||||
}
|
||||
|
||||
@@ -284,6 +284,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
|
||||
%type <boolean> opt_force opt_or_replace
|
||||
opt_grant_grant_option opt_grant_admin_option
|
||||
opt_nowait opt_if_exists opt_with_data
|
||||
%type <ival> opt_nowait_or_skip
|
||||
|
||||
%type <list> OptRoleList AlterOptRoleList
|
||||
%type <defelt> CreateOptRoleElem AlterOptRoleElem
|
||||
@@ -582,7 +583,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
|
||||
|
||||
LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
|
||||
LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
|
||||
LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOGGED
|
||||
LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
|
||||
|
||||
MAPPING MATCH MATERIALIZED MAXVALUE MINUTE_P MINVALUE MODE MONTH_P MOVE
|
||||
|
||||
@@ -606,7 +607,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
|
||||
|
||||
SAVEPOINT SCHEMA SCROLL SEARCH SECOND_P SECURITY SELECT SEQUENCE SEQUENCES
|
||||
SERIALIZABLE SERVER SESSION SESSION_USER SET SETOF SHARE
|
||||
SHOW SIMILAR SIMPLE SMALLINT SNAPSHOT SOME STABLE STANDALONE_P START
|
||||
SHOW SIMILAR SIMPLE SKIP SMALLINT SNAPSHOT SOME STABLE STANDALONE_P START
|
||||
STATEMENT STATISTICS STDIN STDOUT STORAGE STRICT_P STRIP_P SUBSTRING
|
||||
SYMMETRIC SYSID SYSTEM_P
|
||||
|
||||
@@ -9370,6 +9371,12 @@ opt_nowait: NOWAIT { $$ = TRUE; }
|
||||
| /*EMPTY*/ { $$ = FALSE; }
|
||||
;
|
||||
|
||||
opt_nowait_or_skip:
|
||||
NOWAIT { $$ = LockWaitError; }
|
||||
| SKIP LOCKED { $$ = LockWaitSkip; }
|
||||
| /*EMPTY*/ { $$ = LockWaitBlock; }
|
||||
;
|
||||
|
||||
|
||||
/*****************************************************************************
|
||||
*
|
||||
@@ -10011,12 +10018,12 @@ for_locking_items:
|
||||
;
|
||||
|
||||
for_locking_item:
|
||||
for_locking_strength locked_rels_list opt_nowait
|
||||
for_locking_strength locked_rels_list opt_nowait_or_skip
|
||||
{
|
||||
LockingClause *n = makeNode(LockingClause);
|
||||
n->lockedRels = $2;
|
||||
n->strength = $1;
|
||||
n->noWait = $3;
|
||||
n->waitPolicy = $3;
|
||||
$$ = (Node *) n;
|
||||
}
|
||||
;
|
||||
@@ -13145,6 +13152,7 @@ unreserved_keyword:
|
||||
| LOCAL
|
||||
| LOCATION
|
||||
| LOCK_P
|
||||
| LOCKED
|
||||
| LOGGED
|
||||
| MAPPING
|
||||
| MATCH
|
||||
@@ -13229,6 +13237,7 @@ unreserved_keyword:
|
||||
| SHARE
|
||||
| SHOW
|
||||
| SIMPLE
|
||||
| SKIP
|
||||
| SNAPSHOT
|
||||
| STABLE
|
||||
| STANDALONE_P
|
||||
|
||||
@@ -63,7 +63,8 @@ static void rewriteValuesRTE(RangeTblEntry *rte, Relation target_relation,
|
||||
static void rewriteTargetListUD(Query *parsetree, RangeTblEntry *target_rte,
|
||||
Relation target_relation);
|
||||
static void markQueryForLocking(Query *qry, Node *jtnode,
|
||||
LockClauseStrength strength, bool noWait, bool pushedDown);
|
||||
LockClauseStrength strength, LockWaitPolicy waitPolicy,
|
||||
bool pushedDown);
|
||||
static List *matchLocks(CmdType event, RuleLock *rulelocks,
|
||||
int varno, Query *parsetree);
|
||||
static Query *fireRIRrules(Query *parsetree, List *activeRIRs,
|
||||
@@ -1482,7 +1483,7 @@ ApplyRetrieveRule(Query *parsetree,
|
||||
*/
|
||||
if (rc != NULL)
|
||||
markQueryForLocking(rule_action, (Node *) rule_action->jointree,
|
||||
rc->strength, rc->noWait, true);
|
||||
rc->strength, rc->waitPolicy, true);
|
||||
|
||||
return parsetree;
|
||||
}
|
||||
@@ -1500,7 +1501,8 @@ ApplyRetrieveRule(Query *parsetree,
|
||||
*/
|
||||
static void
|
||||
markQueryForLocking(Query *qry, Node *jtnode,
|
||||
LockClauseStrength strength, bool noWait, bool pushedDown)
|
||||
LockClauseStrength strength, LockWaitPolicy waitPolicy,
|
||||
bool pushedDown)
|
||||
{
|
||||
if (jtnode == NULL)
|
||||
return;
|
||||
@@ -1511,15 +1513,15 @@ markQueryForLocking(Query *qry, Node *jtnode,
|
||||
|
||||
if (rte->rtekind == RTE_RELATION)
|
||||
{
|
||||
applyLockingClause(qry, rti, strength, noWait, pushedDown);
|
||||
applyLockingClause(qry, rti, strength, waitPolicy, pushedDown);
|
||||
rte->requiredPerms |= ACL_SELECT_FOR_UPDATE;
|
||||
}
|
||||
else if (rte->rtekind == RTE_SUBQUERY)
|
||||
{
|
||||
applyLockingClause(qry, rti, strength, noWait, pushedDown);
|
||||
applyLockingClause(qry, rti, strength, waitPolicy, pushedDown);
|
||||
/* FOR UPDATE/SHARE of subquery is propagated to subquery's rels */
|
||||
markQueryForLocking(rte->subquery, (Node *) rte->subquery->jointree,
|
||||
strength, noWait, true);
|
||||
strength, waitPolicy, true);
|
||||
}
|
||||
/* other RTE types are unaffected by FOR UPDATE */
|
||||
}
|
||||
@@ -1529,14 +1531,14 @@ markQueryForLocking(Query *qry, Node *jtnode,
|
||||
ListCell *l;
|
||||
|
||||
foreach(l, f->fromlist)
|
||||
markQueryForLocking(qry, lfirst(l), strength, noWait, pushedDown);
|
||||
markQueryForLocking(qry, lfirst(l), strength, waitPolicy, pushedDown);
|
||||
}
|
||||
else if (IsA(jtnode, JoinExpr))
|
||||
{
|
||||
JoinExpr *j = (JoinExpr *) jtnode;
|
||||
|
||||
markQueryForLocking(qry, j->larg, strength, noWait, pushedDown);
|
||||
markQueryForLocking(qry, j->rarg, strength, noWait, pushedDown);
|
||||
markQueryForLocking(qry, j->larg, strength, waitPolicy, pushedDown);
|
||||
markQueryForLocking(qry, j->rarg, strength, waitPolicy, pushedDown);
|
||||
}
|
||||
else
|
||||
elog(ERROR, "unrecognized node type: %d",
|
||||
|
||||
@@ -4446,8 +4446,10 @@ get_select_query_def(Query *query, deparse_context *context,
|
||||
appendStringInfo(buf, " OF %s",
|
||||
quote_identifier(get_rtable_name(rc->rti,
|
||||
context)));
|
||||
if (rc->noWait)
|
||||
if (rc->waitPolicy == LockWaitError)
|
||||
appendStringInfoString(buf, " NOWAIT");
|
||||
else if (rc->waitPolicy == LockWaitSkip)
|
||||
appendStringInfoString(buf, " SKIP LOCKED");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user