diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 832b1cf7642..18f86c73bd3 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4404,6 +4404,17 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) pfree(syncslotname); } +/* + * Reset the origin state. + */ +static void +replorigin_reset(int code, Datum arg) +{ + replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; +} + /* * Run the apply loop with error handling. Disable the subscription, * if necessary. @@ -4553,6 +4564,19 @@ ApplyWorkerMain(Datum main_arg) InitializeApplyWorker(); + /* + * Register a callback to reset the origin state before aborting any + * pending transaction during shutdown (see ShutdownPostgres()). This will + * avoid origin advancement for an in-complete transaction which could + * otherwise lead to its loss as such a transaction won't be sent by the + * server again. + * + * Note that even a LOG or DEBUG statement placed after setting the origin + * state may process a shutdown signal before committing the current apply + * operation. So, it is important to register such a callback here. + */ + before_shmem_exit(replorigin_reset, (Datum) 0); + InitializingApplyWorker = false; /* Connect to the origin and start the replication. */ @@ -4916,12 +4940,23 @@ void apply_error_callback(void *arg) { ApplyErrorCallbackArg *errarg = &apply_error_callback_arg; + int elevel; if (apply_error_callback_arg.command == 0) return; Assert(errarg->origin_name); + elevel = geterrlevel(); + + /* + * Reset the origin state to prevent the advancement of origin progress if + * we fail to apply. Otherwise, this will result in transaction loss as + * that transaction won't be sent again by the server. + */ + if (elevel >= ERROR) + replorigin_reset(0, (Datum) 0); + if (errarg->rel == NULL) { if (!TransactionIdIsValid(errarg->remote_xid)) diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c index 7112fb00069..3347f24d4a5 100644 --- a/src/backend/utils/error/elog.c +++ b/src/backend/utils/error/elog.c @@ -1571,6 +1571,23 @@ geterrcode(void) return edata->sqlerrcode; } +/* + * geterrlevel --- return the currently set SQLSTATE error level + * + * This is only intended for use in error callback subroutines, since there + * is no other place outside elog.c where the concept is meaningful. + */ +int +geterrlevel(void) +{ + ErrorData *edata = &errordata[errordata_stack_depth]; + + /* we don't bother incrementing recursion_depth */ + CHECK_STACK_DEPTH(); + + return edata->elevel; +} + /* * geterrposition --- return the currently set error position (0 if none) * diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h index 0292e88b4f2..8bb55e5b3cc 100644 --- a/src/include/utils/elog.h +++ b/src/include/utils/elog.h @@ -226,6 +226,7 @@ extern int internalerrquery(const char *query); extern int err_generic_string(int field, const char *str); extern int geterrcode(void); +extern int geterrlevel(void); extern int geterrposition(void); extern int getinternalerrposition(void); diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl index 8ce4cfc983c..822932c1dbc 100644 --- a/src/test/subscription/t/021_twophase.pl +++ b/src/test/subscription/t/021_twophase.pl @@ -23,7 +23,7 @@ $node_publisher->start; my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init(allows_streaming => 'logical'); $node_subscriber->append_conf('postgresql.conf', - qq(max_prepared_transactions = 10)); + qq(max_prepared_transactions = 0)); $node_subscriber->start; # Create some pre-existing content on publisher @@ -67,12 +67,24 @@ $node_subscriber->poll_query_until('postgres', $twophase_query) # then COMMIT PREPARED ############################### +# Save the log location, to see the failure of the application +my $log_location = -s $node_subscriber->logfile; + $node_publisher->safe_psql( 'postgres', " BEGIN; INSERT INTO tab_full VALUES (11); PREPARE TRANSACTION 'test_prepared_tab_full';"); +# Confirm the ERROR is reported becasue max_prepared_transactions is zero +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/); + +# Set max_prepared_transactions to correct value to resume the replication +$node_subscriber->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_subscriber->restart; + $node_publisher->wait_for_catchup($appname); # check that transaction is in prepared state on subscriber