1
0
mirror of https://github.com/postgres/postgres.git synced 2025-05-06 19:59:18 +03:00

Drop replication origin slots before tablesync worker exits.

Currently, the replication origin tracking of the tablesync worker is
dropped by the apply worker. So, there is a small lag between the exit
of the tablesync worker and the removal of its origin tracking. In the
meantime, new tablesync workers can be launched and will try to set up
new origin tracking. This can cause the system to reach the configured
limit (max_replication_slots) even if the user has chosen that limit
based on the number of tablesync workers required in the system.

We decided not to back-patch as this can occur only in very narrow
circumstances and users have the option to raise the configured limit by
increasing max_replication_slots.

Reported-by: Hubert Depesz Lubaczewski
Author: Ajin Cherian
Reviewed-by: Masahiko Sawada, Peter Smith, Hou Zhijie, Amit Kapila
Discussion: https://postgr.es/m/20220714115155.GA5439@depesz.com
This commit is contained in:
Amit Kapila 2022-08-30 08:51:41 +05:30
parent 865424627d
commit f6c5edb8ab
2 changed files with 41 additions and 34 deletions

View File

@ -919,10 +919,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
logicalrep_worker_stop(sub->oid, relid); logicalrep_worker_stop(sub->oid, relid);
/* /*
* For READY state, we would have already dropped the * For READY state and SYNCDONE state, we would have already
* tablesync origin. * dropped the tablesync origin.
*/ */
if (state != SUBREL_STATE_READY) if (state != SUBREL_STATE_READY && state != SUBREL_STATE_SYNCDONE)
{ {
char originname[NAMEDATALEN]; char originname[NAMEDATALEN];
@ -930,11 +930,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
* Drop the tablesync's origin tracking if exists. * Drop the tablesync's origin tracking if exists.
* *
* It is possible that the origin is not yet created for * It is possible that the origin is not yet created for
* tablesync worker, this can happen for the states before * tablesync worker so passing missing_ok = true. This can
* SUBREL_STATE_FINISHEDCOPY. The apply worker can also * happen for the states before SUBREL_STATE_FINISHEDCOPY.
* concurrently try to drop the origin and by this time
* the origin might be already removed. For these reasons,
* passing missing_ok = true.
*/ */
ReplicationOriginNameForTablesync(sub->oid, relid, originname, ReplicationOriginNameForTablesync(sub->oid, relid, originname,
sizeof(originname)); sizeof(originname));
@ -1507,13 +1504,19 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
/* /*
* Drop the tablesync's origin tracking if exists. * Drop the tablesync's origin tracking if exists.
* *
* For SYNCDONE/READY states, the tablesync origin tracking is known
* to have already been dropped by the tablesync worker.
*
* It is possible that the origin is not yet created for tablesync * It is possible that the origin is not yet created for tablesync
* worker so passing missing_ok = true. This can happen for the states * worker so passing missing_ok = true. This can happen for the states
* before SUBREL_STATE_FINISHEDCOPY. * before SUBREL_STATE_FINISHEDCOPY.
*/ */
ReplicationOriginNameForTablesync(subid, relid, originname, if (rstate->state != SUBREL_STATE_SYNCDONE)
sizeof(originname)); {
replorigin_drop_by_name(originname, true, false); ReplicationOriginNameForTablesync(subid, relid, originname,
sizeof(originname));
replorigin_drop_by_name(originname, true, false);
}
} }
/* Clean up dependencies */ /* Clean up dependencies */

View File

@ -291,6 +291,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
{ {
TimeLineID tli; TimeLineID tli;
char syncslotname[NAMEDATALEN] = {0}; char syncslotname[NAMEDATALEN] = {0};
char originname[NAMEDATALEN] = {0};
MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
MyLogicalRepWorker->relstate_lsn = current_lsn; MyLogicalRepWorker->relstate_lsn = current_lsn;
@ -309,6 +310,30 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn); MyLogicalRepWorker->relstate_lsn);
/*
* Cleanup the tablesync origin tracking.
*
* Resetting the origin session removes the ownership of the slot.
* This is needed to allow the origin to be dropped.
*/
ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
originname,
sizeof(originname));
replorigin_session_reset();
replorigin_session_origin = InvalidRepOriginId;
replorigin_session_origin_lsn = InvalidXLogRecPtr;
replorigin_session_origin_timestamp = 0;
/*
* We expect that the origin must be present. Concurrent operations
* that remove the origin, such as a refresh of the subscription, take
* an access exclusive lock on pg_subscription, which prevents the
* preceding update of the rel state to SUBREL_STATE_SYNCDONE from
* succeeding.
*/
replorigin_drop_by_name(originname, false, false);
/* /*
* End streaming so that LogRepWorkerWalRcvConn can be used to drop * End streaming so that LogRepWorkerWalRcvConn can be used to drop
* the slot. * the slot.
@ -318,7 +343,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
/* /*
* Cleanup the tablesync slot. * Cleanup the tablesync slot.
* *
* This has to be done after updating the state because otherwise if * This has to be done after the data changes because otherwise if
* there is an error while doing the database operations we won't be * there is an error while doing the database operations we won't be
* able to rollback dropped slot. * able to rollback dropped slot.
*/ */
@ -441,8 +466,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
*/ */
if (current_lsn >= rstate->lsn) if (current_lsn >= rstate->lsn)
{ {
char originname[NAMEDATALEN];
rstate->state = SUBREL_STATE_READY; rstate->state = SUBREL_STATE_READY;
rstate->lsn = current_lsn; rstate->lsn = current_lsn;
if (!started_tx) if (!started_tx)
@ -452,26 +475,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
} }
/* /*
* Remove the tablesync origin tracking if exists. * Update the state to READY.
*
* The normal case origin drop is done here instead of in the
* process_syncing_tables_for_sync function because we don't
* allow to drop the origin till the process owning the origin
* is alive.
*
* There is a chance that the user is concurrently performing
* refresh for the subscription where we remove the table
* state and its origin and by this time the origin might be
* already removed. So passing missing_ok = true.
*/
ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
rstate->relid,
originname,
sizeof(originname));
replorigin_drop_by_name(originname, true, false);
/*
* Update the state to READY only after the origin cleanup.
*/ */
UpdateSubscriptionRelState(MyLogicalRepWorker->subid, UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
rstate->relid, rstate->state, rstate->relid, rstate->state,