diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index cdea6295d8a..38c28953078 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4713,6 +4713,17 @@ InitializeLogRepWorker(void) CommitTransactionCommand(); } +/* + * Reset the origin state. + */ +static void +replorigin_reset(int code, Datum arg) +{ + replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; +} + /* Common function to setup the leader apply or tablesync worker. */ void SetupApplyOrSyncWorker(int worker_slot) @@ -4741,6 +4752,19 @@ SetupApplyOrSyncWorker(int worker_slot) InitializeLogRepWorker(); + /* + * Register a callback to reset the origin state before aborting any + * pending transaction during shutdown (see ShutdownPostgres()). This will + * avoid origin advancement for an in-complete transaction which could + * otherwise lead to its loss as such a transaction won't be sent by the + * server again. + * + * Note that even a LOG or DEBUG statement placed after setting the origin + * state may process a shutdown signal before committing the current apply + * operation. So, it is important to register such a callback here. + */ + before_shmem_exit(replorigin_reset, (Datum) 0); + /* Connect to the origin and start the replication. */ elog(DEBUG1, "connecting to publisher using connection string \"%s\"", MySubscription->conninfo); @@ -4967,12 +4991,23 @@ void apply_error_callback(void *arg) { ApplyErrorCallbackArg *errarg = &apply_error_callback_arg; + int elevel; if (apply_error_callback_arg.command == 0) return; Assert(errarg->origin_name); + elevel = geterrlevel(); + + /* + * Reset the origin state to prevent the advancement of origin progress if + * we fail to apply. Otherwise, this will result in transaction loss as + * that transaction won't be sent again by the server. + */ + if (elevel >= ERROR) + replorigin_reset(0, (Datum) 0); + if (errarg->rel == NULL) { if (!TransactionIdIsValid(errarg->remote_xid)) diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c index 943d8588f3d..5cbb5b54168 100644 --- a/src/backend/utils/error/elog.c +++ b/src/backend/utils/error/elog.c @@ -1568,6 +1568,23 @@ geterrcode(void) return edata->sqlerrcode; } +/* + * geterrlevel --- return the currently set error level + * + * This is only intended for use in error callback subroutines, since there + * is no other place outside elog.c where the concept is meaningful. + */ +int +geterrlevel(void) +{ + ErrorData *edata = &errordata[errordata_stack_depth]; + + /* we don't bother incrementing recursion_depth */ + CHECK_STACK_DEPTH(); + + return edata->elevel; +} + /* * geterrposition --- return the currently set error position (0 if none) * diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h index 054dd2bf62f..e54eca5b489 100644 --- a/src/include/utils/elog.h +++ b/src/include/utils/elog.h @@ -226,6 +226,7 @@ extern int internalerrquery(const char *query); extern int err_generic_string(int field, const char *str); extern int geterrcode(void); +extern int geterrlevel(void); extern int geterrposition(void); extern int getinternalerrposition(void); diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl index 5e50f1af338..19147f31e21 100644 --- a/src/test/subscription/t/021_twophase.pl +++ b/src/test/subscription/t/021_twophase.pl @@ -23,7 +23,7 @@ $node_publisher->start; my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init; $node_subscriber->append_conf('postgresql.conf', - qq(max_prepared_transactions = 10)); + qq(max_prepared_transactions = 0)); $node_subscriber->start; # Create some pre-existing content on publisher @@ -67,12 +67,24 @@ $node_subscriber->poll_query_until('postgres', $twophase_query) # then COMMIT PREPARED ############################### +# Save the log location, to see the failure of the application +my $log_location = -s $node_subscriber->logfile; + $node_publisher->safe_psql( 'postgres', " BEGIN; INSERT INTO tab_full VALUES (11); PREPARE TRANSACTION 'test_prepared_tab_full';"); +# Confirm the ERROR is reported becasue max_prepared_transactions is zero +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/); + +# Set max_prepared_transactions to correct value to resume the replication +$node_subscriber->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_subscriber->restart; + $node_publisher->wait_for_catchup($appname); # check that transaction is in prepared state on subscriber