mirror of
https://github.com/postgres/postgres.git
synced 2025-07-28 23:42:10 +03:00
Fix race condition during replication origin drop.
replorigin_drop() misunderstood the API for condition variables: it
had ConditionVariablePrepareToSleep and ConditionVariableCancelSleep
inside its test-and-sleep loop, rather than outside the loop as
intended. The net effect is a narrow race-condition window wherein,
if the process using a replication slot releases it immediately after
replorigin_drop() releases the ReplicationOriginLock, replorigin_drop()
would get into the condition variable's wait list too late and then
wait indefinitely for a signal that won't come.
Because there's a different CV for each replication slot, we can't
just move the ConditionVariablePrepareToSleep call to above the
test-and-sleep loop. What we can do, in the wake of commit 13db3b936
,
is drop the ConditionVariablePrepareToSleep call entirely. This fix
depends on that commit because (at least in principle) the slot matching
the target replication origin might move around, so that once in a blue
moon successive loop iterations might involve different CVs. We can now
cope with such a scenario, at the cost of an extra trip through the
retry loop.
(There are ways we could fix this bug without depending on that commit,
but they're all a lot more complicated than this way.)
While at it, upgrade the rather skimpy comments in this function.
Back-patch to v10 where this code came in.
Discussion: https://postgr.es/m/19947.1515455433@sss.pgh.pa.us
This commit is contained in:
@ -339,20 +339,26 @@ replorigin_drop(RepOriginId roident, bool nowait)
|
|||||||
|
|
||||||
Assert(IsTransactionState());
|
Assert(IsTransactionState());
|
||||||
|
|
||||||
|
/*
|
||||||
|
* To interlock against concurrent drops, we hold ExclusiveLock on
|
||||||
|
* pg_replication_origin throughout this funcion.
|
||||||
|
*/
|
||||||
rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
|
rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* First, clean up the slot state info, if there is any matching slot.
|
||||||
|
*/
|
||||||
restart:
|
restart:
|
||||||
tuple = NULL;
|
tuple = NULL;
|
||||||
/* cleanup the slot state info */
|
|
||||||
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
|
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
for (i = 0; i < max_replication_slots; i++)
|
for (i = 0; i < max_replication_slots; i++)
|
||||||
{
|
{
|
||||||
ReplicationState *state = &replication_states[i];
|
ReplicationState *state = &replication_states[i];
|
||||||
|
|
||||||
/* found our slot */
|
|
||||||
if (state->roident == roident)
|
if (state->roident == roident)
|
||||||
{
|
{
|
||||||
|
/* found our slot, is it busy? */
|
||||||
if (state->acquired_by != 0)
|
if (state->acquired_by != 0)
|
||||||
{
|
{
|
||||||
ConditionVariable *cv;
|
ConditionVariable *cv;
|
||||||
@ -363,16 +369,23 @@ restart:
|
|||||||
errmsg("could not drop replication origin with OID %d, in use by PID %d",
|
errmsg("could not drop replication origin with OID %d, in use by PID %d",
|
||||||
state->roident,
|
state->roident,
|
||||||
state->acquired_by)));
|
state->acquired_by)));
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We must wait and then retry. Since we don't know which CV
|
||||||
|
* to wait on until here, we can't readily use
|
||||||
|
* ConditionVariablePrepareToSleep (calling it here would be
|
||||||
|
* wrong, since we could miss the signal if we did so); just
|
||||||
|
* use ConditionVariableSleep directly.
|
||||||
|
*/
|
||||||
cv = &state->origin_cv;
|
cv = &state->origin_cv;
|
||||||
|
|
||||||
LWLockRelease(ReplicationOriginLock);
|
LWLockRelease(ReplicationOriginLock);
|
||||||
ConditionVariablePrepareToSleep(cv);
|
|
||||||
ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
|
ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
|
||||||
ConditionVariableCancelSleep();
|
|
||||||
goto restart;
|
goto restart;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* first WAL log */
|
/* first make a WAL log entry */
|
||||||
{
|
{
|
||||||
xl_replorigin_drop xlrec;
|
xl_replorigin_drop xlrec;
|
||||||
|
|
||||||
@ -382,7 +395,7 @@ restart:
|
|||||||
XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
|
XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* then reset the in-memory entry */
|
/* then clear the in-memory slot */
|
||||||
state->roident = InvalidRepOriginId;
|
state->roident = InvalidRepOriginId;
|
||||||
state->remote_lsn = InvalidXLogRecPtr;
|
state->remote_lsn = InvalidXLogRecPtr;
|
||||||
state->local_lsn = InvalidXLogRecPtr;
|
state->local_lsn = InvalidXLogRecPtr;
|
||||||
@ -390,7 +403,11 @@ restart:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
LWLockRelease(ReplicationOriginLock);
|
LWLockRelease(ReplicationOriginLock);
|
||||||
|
ConditionVariableCancelSleep();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Now, we can delete the catalog entry.
|
||||||
|
*/
|
||||||
tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
|
tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
|
||||||
if (!HeapTupleIsValid(tuple))
|
if (!HeapTupleIsValid(tuple))
|
||||||
elog(ERROR, "cache lookup failed for replication origin with oid %u",
|
elog(ERROR, "cache lookup failed for replication origin with oid %u",
|
||||||
|
Reference in New Issue
Block a user