diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index d1ee0261c64..718408bb599 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -5849,6 +5849,23 @@ InitializeLogRepWorker(void) MySubscription->name)); CommitTransactionCommand(); + + /* + * Register a callback to reset the origin state before aborting any + * pending transaction during shutdown (see ShutdownPostgres()). This will + * avoid origin advancement for an incomplete transaction which could + * otherwise lead to its loss as such a transaction won't be sent by the + * server again. + * + * Note that even a LOG or DEBUG statement placed after setting the origin + * state may process a shutdown signal before committing the current apply + * operation. So, it is important to register such a callback here. + * + * Register this callback here to ensure that all types of logical + * replication workers that set up origins and apply remote transactions + * are protected. + */ + before_shmem_exit(replorigin_reset, (Datum) 0); } /* @@ -5892,19 +5909,6 @@ SetupApplyOrSyncWorker(int worker_slot) InitializeLogRepWorker(); - /* - * Register a callback to reset the origin state before aborting any - * pending transaction during shutdown (see ShutdownPostgres()). This will - * avoid origin advancement for an in-complete transaction which could - * otherwise lead to its loss as such a transaction won't be sent by the - * server again. - * - * Note that even a LOG or DEBUG statement placed after setting the origin - * state may process a shutdown signal before committing the current apply - * operation. So, it is important to register such a callback here. - */ - before_shmem_exit(replorigin_reset, (Datum) 0); - /* Connect to the origin and start the replication. */ elog(DEBUG1, "connecting to publisher using connection string \"%s\"", MySubscription->conninfo); diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl index e01347ca699..dc629425daa 100644 --- a/src/test/subscription/t/023_twophase_stream.pl +++ b/src/test/subscription/t/023_twophase_stream.pl @@ -429,6 +429,51 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); is($result, qq(1), 'transaction is committed on subscriber'); +# Test the ability to re-apply a transaction when a parallel apply worker fails +# to prepare the transaction due to insufficient max_prepared_transactions +# setting. +$node_subscriber->append_conf( + 'postgresql.conf', qq( +max_prepared_transactions = 0 +debug_logical_replication_streaming = buffered +)); +$node_subscriber->restart; + +$node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab_2 values(2); + PREPARE TRANSACTION 'xact'; + COMMIT PREPARED 'xact'; + }); + +$offset = -s $node_subscriber->logfile; + +# Confirm the ERROR is reported because max_prepared_transactions is zero +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/, + $offset); + +# Confirm that the parallel apply worker has encountered an error. The check +# focuses on the worker type as a keyword, since the error message content may +# differ based on whether the leader initially detected the parallel apply +# worker's failure or received a signal from it. +$node_subscriber->wait_for_log( + qr/ERROR: .*logical replication parallel apply worker.*/, + $offset); + +# Set max_prepared_transactions to correct value to resume the replication +$node_subscriber->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_subscriber->restart; + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is committed on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(2), 'transaction is committed on subscriber after retrying'); + ############################### # check all the cleanup ###############################