diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index e31551340c9..a570900a429 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -118,7 +118,7 @@ static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); static void store_flush_position(XLogRecPtr remote_lsn); -static void reread_subscription(void); +static void maybe_reread_subscription(void); /* Flags set by signal handlers */ static volatile sig_atomic_t got_SIGHUP = false; @@ -165,8 +165,7 @@ ensure_transaction(void) StartTransactionCommand(); - if (!MySubscriptionValid) - reread_subscription(); + maybe_reread_subscription(); MemoryContextSwitchTo(ApplyMessageContext); return true; @@ -463,6 +462,12 @@ apply_handle_commit(StringInfo s) store_flush_position(commit_data.end_lsn); } + else + { + /* Process any invalidation messages that might have accumulated. */ + AcceptInvalidationMessages(); + maybe_reread_subscription(); + } in_remote_transaction = false; @@ -1119,8 +1124,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) * now. */ AcceptInvalidationMessages(); - if (!MySubscriptionValid) - reread_subscription(); + maybe_reread_subscription(); /* Process any table synchronization changes. */ process_syncing_tables(last_received); @@ -1302,17 +1306,20 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) last_flushpos = flushpos; } - /* - * Reread subscription info and exit on change. + * Reread subscription info if needed. Most changes will be exit. */ static void -reread_subscription(void) +maybe_reread_subscription(void) { MemoryContext oldctx; Subscription *newsub; bool started_tx = false; + /* When cache state is valid there is nothing to do here. */ + if (MySubscriptionValid) + return; + /* This function might be called inside or outside of transaction. */ if (!IsTransactionState()) {