diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 1fc8f2dee9b..88fa35a9625 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -339,20 +339,26 @@ replorigin_drop(RepOriginId roident, bool nowait) Assert(IsTransactionState()); + /* + * To interlock against concurrent drops, we hold ExclusiveLock on + * pg_replication_origin throughout this function. + */ rel = heap_open(ReplicationOriginRelationId, ExclusiveLock); + /* + * First, clean up the slot state info, if there is any matching slot. + */ restart: tuple = NULL; - /* cleanup the slot state info */ LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); for (i = 0; i < max_replication_slots; i++) { ReplicationState *state = &replication_states[i]; - /* found our slot */ if (state->roident == roident) { + /* found our slot, is it busy? */ if (state->acquired_by != 0) { ConditionVariable *cv; @@ -363,16 +369,23 @@ restart: errmsg("could not drop replication origin with OID %d, in use by PID %d", state->roident, state->acquired_by))); + + /* + * We must wait and then retry. Since we don't know which CV + * to wait on until here, we can't readily use + * ConditionVariablePrepareToSleep (calling it here would be + * wrong, since we could miss the signal if we did so); just + * use ConditionVariableSleep directly. 
+ */ cv = &state->origin_cv; LWLockRelease(ReplicationOriginLock); - ConditionVariablePrepareToSleep(cv); + ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP); - ConditionVariableCancelSleep(); goto restart; } - /* first WAL log */ + /* first make a WAL log entry */ { xl_replorigin_drop xlrec; @@ -382,7 +395,7 @@ restart: XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP); } - /* then reset the in-memory entry */ + /* then clear the in-memory slot */ state->roident = InvalidRepOriginId; state->remote_lsn = InvalidXLogRecPtr; state->local_lsn = InvalidXLogRecPtr; @@ -390,7 +403,11 @@ restart: } } LWLockRelease(ReplicationOriginLock); + ConditionVariableCancelSleep(); + /* + * Now, we can delete the catalog entry. + */ tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident)); if (!HeapTupleIsValid(tuple)) elog(ERROR, "cache lookup failed for replication origin with oid %u",