diff --git a/contrib/test_decoding/expected/parallel_session_origin.out b/contrib/test_decoding/expected/parallel_session_origin.out index e515b39f7ce..8e41831fcbc 100644 --- a/contrib/test_decoding/expected/parallel_session_origin.out +++ b/contrib/test_decoding/expected/parallel_session_origin.out @@ -1,6 +1,6 @@ Parsed test spec with 2 sessions -starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset +starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s1_reset s0_reset step s0_setup: SELECT pg_replication_origin_session_setup('origin'); pg_replication_origin_session_setup ----------------------------------- @@ -65,15 +65,59 @@ step s0_compare: t (1 row) -step s0_reset: SELECT pg_replication_origin_session_reset(); -pg_replication_origin_session_reset ------------------------------------ - -(1 row) - step s1_reset: SELECT pg_replication_origin_session_reset(); pg_replication_origin_session_reset ----------------------------------- (1 row) +step s0_reset: SELECT pg_replication_origin_session_reset(); +pg_replication_origin_session_reset +----------------------------------- + +(1 row) + + +starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_reset s1_reset s0_reset +step s0_setup: SELECT pg_replication_origin_session_setup('origin'); +pg_replication_origin_session_setup +----------------------------------- + +(1 row) + +step s0_is_setup: SELECT pg_replication_origin_session_is_setup(); +pg_replication_origin_session_is_setup +-------------------------------------- +t +(1 row) + +step s1_setup: + SELECT pg_replication_origin_session_setup('origin', pid) + FROM pg_stat_activity + WHERE application_name = 'isolation/parallel_session_origin/s0'; + +pg_replication_origin_session_setup +----------------------------------- + +(1 row) + +step s1_is_setup: SELECT pg_replication_origin_session_is_setup(); +pg_replication_origin_session_is_setup +-------------------------------------- +t +(1 row) + +step s0_reset: SELECT pg_replication_origin_session_reset(); +ERROR: cannot reset replication origin with ID 1 because it is still in use by other processes +step s1_reset: SELECT pg_replication_origin_session_reset(); +pg_replication_origin_session_reset +----------------------------------- + +(1 row) + +step s0_reset: SELECT pg_replication_origin_session_reset(); +pg_replication_origin_session_reset +----------------------------------- + +(1 row) + diff --git a/contrib/test_decoding/specs/parallel_session_origin.spec b/contrib/test_decoding/specs/parallel_session_origin.spec index c0e5fda0723..2253a7a14eb 100644 --- a/contrib/test_decoding/specs/parallel_session_origin.spec +++ b/contrib/test_decoding/specs/parallel_session_origin.spec @@ -53,4 +53,8 @@ step "s1_reset" { SELECT pg_replication_origin_session_reset(); } # Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions # commits a transaction and store the local_lsn of the replication origin. # Compare LSNs and expect latter transaction (done by s1) has larger local_lsn. -permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset" +permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s1_reset" "s0_reset" + +# Test that the origin cannot be released if another session is actively using +# it. +permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_reset" "s1_reset" "s0_reset" diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 04bc704a332..13808a4674b 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -130,6 +130,9 @@ typedef struct ReplicationState */ int acquired_by; + /* Count of processes that are currently using this origin. */ + int refcount; + /* * Condition variable that's signaled when acquired_by changes. */ @@ -383,16 +386,19 @@ restart: if (state->roident == roident) { /* found our slot, is it busy? */ - if (state->acquired_by != 0) + if (state->refcount > 0) { ConditionVariable *cv; if (nowait) ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), - errmsg("could not drop replication origin with ID %d, in use by PID %d", - state->roident, - state->acquired_by))); + (state->acquired_by != 0) + ? errmsg("could not drop replication origin with ID %d, in use by PID %d", + state->roident, + state->acquired_by) + : errmsg("could not drop replication origin with ID %d, in use by another process", + state->roident))); /* * We must wait and then retry. Since we don't know which CV @@ -959,13 +965,16 @@ replorigin_advance(RepOriginId node, LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE); /* Make sure it's not used by somebody else */ - if (replication_state->acquired_by != 0) + if (replication_state->refcount > 0) { ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), - errmsg("replication origin with ID %d is already active for PID %d", - replication_state->roident, - replication_state->acquired_by))); + (replication_state->acquired_by != 0) + ? errmsg("replication origin with ID %d is already active for PID %d", + replication_state->roident, + replication_state->acquired_by) + : errmsg("replication origin with ID %d is already active in another process", + replication_state->roident))); } break; @@ -1069,6 +1078,37 @@ replorigin_get_progress(RepOriginId node, bool flush) return remote_lsn; } +/* Helper function to reset the session replication origin */ +static void +replorigin_session_reset_internal(void) +{ + ConditionVariable *cv; + + Assert(session_replication_state != NULL); + + LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); + + /* The origin must be held by at least one process at this point. */ + Assert(session_replication_state->refcount > 0); + + /* + * Reset the PID only if the current session is the first to set up this + * origin. This avoids clearing the first process's PID when any other + * session releases the origin. + */ + if (session_replication_state->acquired_by == MyProcPid) + session_replication_state->acquired_by = 0; + + session_replication_state->refcount--; + + cv = &session_replication_state->origin_cv; + session_replication_state = NULL; + + LWLockRelease(ReplicationOriginLock); + + ConditionVariableBroadcast(cv); +} + /* * Tear down a (possibly) configured session replication origin during process * exit. @@ -1076,25 +1116,10 @@ replorigin_get_progress(RepOriginId node, bool flush) static void ReplicationOriginExitCleanup(int code, Datum arg) { - ConditionVariable *cv = NULL; - if (session_replication_state == NULL) return; - LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); - - if (session_replication_state->acquired_by == MyProcPid) - { - cv = &session_replication_state->origin_cv; - - session_replication_state->acquired_by = 0; - session_replication_state = NULL; - } - - LWLockRelease(ReplicationOriginLock); - - if (cv) - ConditionVariableBroadcast(cv); + replorigin_session_reset_internal(); } /* @@ -1174,6 +1199,18 @@ replorigin_session_setup(RepOriginId node, int acquired_by) node, acquired_by))); } + /* + * The origin is in use, but PID is not recorded. This can happen if + * the process that originally acquired the origin exited without + * releasing it. To ensure correctness, other processes cannot acquire + * the origin until all processes currently using it have released it. + */ + else if (curstate->acquired_by == 0 && curstate->refcount > 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("replication origin with ID %d is already active in another process", + curstate->roident))); + /* ok, found slot */ session_replication_state = curstate; break; @@ -1205,9 +1242,21 @@ replorigin_session_setup(RepOriginId node, int acquired_by) Assert(session_replication_state->roident != InvalidRepOriginId); if (acquired_by == 0) + { session_replication_state->acquired_by = MyProcPid; + Assert(session_replication_state->refcount == 0); + } else + { + /* + * Sanity check: the origin must already be acquired by the process + * passed as input, and at least one process must be using it. + */ Assert(session_replication_state->acquired_by == acquired_by); + Assert(session_replication_state->refcount > 0); + } + + session_replication_state->refcount++; LWLockRelease(ReplicationOriginLock); @@ -1224,8 +1273,6 @@ replorigin_session_setup(RepOriginId node, int acquired_by) void replorigin_session_reset(void) { - ConditionVariable *cv; - Assert(max_active_replication_origins != 0); if (session_replication_state == NULL) @@ -1233,15 +1280,22 @@ replorigin_session_reset(void) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("no replication origin is configured"))); - LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); + /* + * Restrict explicit resetting of the replication origin if it was first + * acquired by this process and others are still using it. While the + * system handles this safely (as happens if the first session exits + * without calling reset), it is best to avoid doing so. + */ + if (session_replication_state->acquired_by == MyProcPid && + session_replication_state->refcount > 1) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot reset replication origin with ID %d because it is still in use by other processes", + session_replication_state->roident), + errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."), + errhint("Reset the replication origin in all other processes before retrying."))); - session_replication_state->acquired_by = 0; - cv = &session_replication_state->origin_cv; - session_replication_state = NULL; - - LWLockRelease(ReplicationOriginLock); - - ConditionVariableBroadcast(cv); + replorigin_session_reset_internal(); } /*