From f6c5edb8abcac04eb3eac6da356e59d399b2bcef Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Tue, 30 Aug 2022 08:51:41 +0530 Subject: [PATCH] Drop replication origin slots before tablesync worker exits. Currently, the replication origin tracking of the tablesync worker is dropped by the apply worker. So, there will be a small lag between the tablesync worker's exit and the removal of its origin tracking. In the meantime, new tablesync workers can be launched and will try to set up a new origin tracking. This can lead the system to reach the maximum configured limit (max_replication_slots) even if the user has configured the max limit considering the number of tablesync workers required in the system. We decided not to back-patch as this can occur in very narrow circumstances and users have the option to increase the configured limit by increasing max_replication_slots. Reported-by: Hubert Depesz Lubaczewski Author: Ajin Cherian Reviewed-by: Masahiko Sawada, Peter Smith, Hou Zhijie, Amit Kapila Discussion: https://postgr.es/m/20220714115155.GA5439@depesz.com --- src/backend/commands/subscriptioncmds.c | 25 ++++++----- src/backend/replication/logical/tablesync.c | 50 +++++++++++---------- 2 files changed, 41 insertions(+), 34 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 670b219c8d4..f87796e5afe 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -919,10 +919,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, logicalrep_worker_stop(sub->oid, relid); /* - * For READY state, we would have already dropped the - * tablesync origin. + * For READY state and SYNCDONE state, we would have already + * dropped the tablesync origin. 
*/ - if (state != SUBREL_STATE_READY) + if (state != SUBREL_STATE_READY && state != SUBREL_STATE_SYNCDONE) { char originname[NAMEDATALEN]; @@ -930,11 +930,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * Drop the tablesync's origin tracking if exists. * * It is possible that the origin is not yet created for - * tablesync worker, this can happen for the states before - * SUBREL_STATE_FINISHEDCOPY. The apply worker can also - * concurrently try to drop the origin and by this time - * the origin might be already removed. For these reasons, - * passing missing_ok = true. + * tablesync worker so passing missing_ok = true. This can + * happen for the states before SUBREL_STATE_FINISHEDCOPY. */ ReplicationOriginNameForTablesync(sub->oid, relid, originname, sizeof(originname)); @@ -1507,13 +1504,19 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) /* * Drop the tablesync's origin tracking if exists. * + * For SYNCDONE/READY states, the tablesync origin tracking is known + * to have already been dropped by the tablesync worker. + * * It is possible that the origin is not yet created for tablesync * worker so passing missing_ok = true. This can happen for the states * before SUBREL_STATE_FINISHEDCOPY. 
*/ - ReplicationOriginNameForTablesync(subid, relid, originname, - sizeof(originname)); - replorigin_drop_by_name(originname, true, false); + if (rstate->state != SUBREL_STATE_SYNCDONE) + { + ReplicationOriginNameForTablesync(subid, relid, originname, + sizeof(originname)); + replorigin_drop_by_name(originname, true, false); + } } /* Clean up dependencies */ diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index d37d8a0d74a..91ba49a14bd 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -291,6 +291,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) { TimeLineID tli; char syncslotname[NAMEDATALEN] = {0}; + char originname[NAMEDATALEN] = {0}; MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; MyLogicalRepWorker->relstate_lsn = current_lsn; @@ -309,6 +310,30 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn); + /* + * Cleanup the tablesync origin tracking. + * + * Resetting the origin session removes the ownership of the slot. + * This is needed to allow the origin to be dropped. + */ + ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); + replorigin_session_reset(); + replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; + + /* + * We expect that origin must be present. The concurrent operations + * that remove origin like a refresh for the subscription take an + * access exclusive lock on pg_subscription which prevent the previous + * operation to update the rel state to SUBREL_STATE_SYNCDONE to + * succeed. + */ + replorigin_drop_by_name(originname, false, false); + /* * End streaming so that LogRepWorkerWalRcvConn can be used to drop * the slot. 
@@ -318,7 +343,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) /* * Cleanup the tablesync slot. * - * This has to be done after updating the state because otherwise if + * This has to be done after the data changes because otherwise if * there is an error while doing the database operations we won't be * able to rollback dropped slot. */ @@ -441,8 +466,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ if (current_lsn >= rstate->lsn) { - char originname[NAMEDATALEN]; - rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; if (!started_tx) @@ -452,26 +475,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } /* - * Remove the tablesync origin tracking if exists. - * - * The normal case origin drop is done here instead of in the - * process_syncing_tables_for_sync function because we don't - * allow to drop the origin till the process owning the origin - * is alive. - * - * There is a chance that the user is concurrently performing - * refresh for the subscription where we remove the table - * state and its origin and by this time the origin might be - * already removed. So passing missing_ok = true. - */ - ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, - rstate->relid, - originname, - sizeof(originname)); - replorigin_drop_by_name(originname, true, false); - - /* - * Update the state to READY only after the origin cleanup. + * Update the state to READY. */ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state,