diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index b885890de37..e06587b0265 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -506,13 +506,13 @@ RemoveSubscriptionRel(Oid subid, Oid relid) } /* - * Does the subscription have any relations? + * Does the subscription have any tables? * * Use this function only to know true/false, and when you have no need for the * List returned by GetSubscriptionRelations. */ bool -HasSubscriptionRelations(Oid subid) +HasSubscriptionTables(Oid subid) { Relation rel; ScanKeyData skey[1]; diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 1e08bbbd4eb..c62c8c67521 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -28,6 +28,7 @@ OBJS = \ reorderbuffer.o \ slotsync.o \ snapbuild.o \ + syncutils.o \ tablesync.o \ worker.o diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 33b7ec7f029..14325581afc 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -970,7 +970,7 @@ ParallelApplyWorkerMain(Datum main_arg) * the subscription relation state. */ CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, - invalidate_syncing_table_states, + InvalidateSyncingRelStates, (Datum) 0); set_apply_error_context_origin(originname); diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 6f19614c79d..9283e996ef4 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -14,6 +14,7 @@ backend_sources += files( 'reorderbuffer.c', 'slotsync.c', 'snapbuild.c', + 'syncutils.c', 'tablesync.c', 'worker.c', ) diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c new file mode 100644 index 00000000000..1bb3ca01db0 --- /dev/null +++ b/src/backend/replication/logical/syncutils.c @@ -0,0 +1,187 @@ +/*------------------------------------------------------------------------- + * syncutils.c + * PostgreSQL logical replication: common synchronization code + * + * Copyright (c) 2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/syncutils.c + * + * NOTES + * This file contains code common for synchronization workers. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "catalog/pg_subscription_rel.h" +#include "pgstat.h" +#include "replication/worker_internal.h" +#include "storage/ipc.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" + +/* + * Enum for phases of the subscription relations state. + * + * SYNC_RELATIONS_STATE_NEEDS_REBUILD indicates that the subscription relations + * state is no longer valid, and the subscription relations should be rebuilt. + * + * SYNC_RELATIONS_STATE_REBUILD_STARTED indicates that the subscription + * relations state is being rebuilt. + * + * SYNC_RELATIONS_STATE_VALID indicates that the subscription relation state is + * up-to-date and valid. + */ +typedef enum +{ + SYNC_RELATIONS_STATE_NEEDS_REBUILD, + SYNC_RELATIONS_STATE_REBUILD_STARTED, + SYNC_RELATIONS_STATE_VALID, +} SyncingRelationsState; + +static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD; + +/* + * Exit routine for synchronization worker. + */ +pg_noreturn void +FinishSyncWorker(void) +{ + /* + * Commit any outstanding transaction. This is the usual case, unless + * there was nothing to do for the table. + */ + if (IsTransactionState()) + { + CommitTransactionCommand(); + pgstat_report_stat(true); + } + + /* And flush all writes. */ + XLogFlush(GetXLogWriteRecPtr()); + + StartTransactionCommand(); + ereport(LOG, + (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid)))); + CommitTransactionCommand(); + + /* Find the leader apply worker and signal it. */ + logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); + + /* Stop gracefully */ + proc_exit(0); +} + +/* + * Callback from syscache invalidation. + */ +void +InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue) +{ + relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD; +} + +/* + * Process possible state change(s) of relations that are being synchronized. + */ +void +ProcessSyncingRelations(XLogRecPtr current_lsn) +{ + switch (MyLogicalRepWorker->type) + { + case WORKERTYPE_PARALLEL_APPLY: + + /* + * Skip for parallel apply workers because they only operate on + * tables that are in a READY state. See pa_can_start() and + * should_apply_changes_for_rel(). + */ + break; + + case WORKERTYPE_TABLESYNC: + ProcessSyncingTablesForSync(current_lsn); + break; + + case WORKERTYPE_APPLY: + ProcessSyncingTablesForApply(current_lsn); + break; + + case WORKERTYPE_UNKNOWN: + /* Should never happen. */ + elog(ERROR, "Unknown worker type"); + } +} + +/* + * Common code to fetch the up-to-date sync state info into the static lists. + * + * Returns true if subscription has 1 or more tables, else false. + * + * Note: If this function started the transaction (indicated by the parameter) + * then it is the caller's responsibility to commit it. + */ +bool +FetchRelationStates(bool *started_tx) +{ + static bool has_subtables = false; + + *started_tx = false; + + if (relation_states_validity != SYNC_RELATIONS_STATE_VALID) + { + MemoryContext oldctx; + List *rstates; + ListCell *lc; + SubscriptionRelState *rstate; + + relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED; + + /* Clean the old lists. */ + list_free_deep(table_states_not_ready); + table_states_not_ready = NIL; + + if (!IsTransactionState()) + { + StartTransactionCommand(); + *started_tx = true; + } + + /* Fetch tables and sequences that are in non-ready state. */ + rstates = GetSubscriptionRelations(MySubscription->oid, true); + + /* Allocate the tracking info in a permanent memory context. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + foreach(lc, rstates) + { + rstate = palloc(sizeof(SubscriptionRelState)); + memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); + table_states_not_ready = lappend(table_states_not_ready, rstate); + } + MemoryContextSwitchTo(oldctx); + + /* + * Does the subscription have tables? + * + * If there were not-READY tables found then we know it does. But if + * table_states_not_ready was empty we still need to check again to + * see if there are 0 tables. + */ + has_subtables = (table_states_not_ready != NIL) || + HasSubscriptionTables(MySubscription->oid); + + /* + * If the subscription relation cache has been invalidated since we + * entered this routine, we still use and return the relations we just + * finished constructing, to avoid infinite loops, but we leave the + * table states marked as stale so that we'll rebuild it again on next + * access. Otherwise, we mark the table states as valid. + */ + if (relation_states_validity == SYNC_RELATIONS_STATE_REBUILD_STARTED) + relation_states_validity = SYNC_RELATIONS_STATE_VALID; + } + + return has_subtables; +} diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e6da4028d39..2ba12517e93 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -117,58 +117,15 @@ #include "utils/array.h" #include "utils/builtins.h" #include "utils/lsyscache.h" -#include "utils/memutils.h" #include "utils/rls.h" #include "utils/snapmgr.h" #include "utils/syscache.h" #include "utils/usercontext.h" -typedef enum -{ - SYNC_TABLE_STATE_NEEDS_REBUILD, - SYNC_TABLE_STATE_REBUILD_STARTED, - SYNC_TABLE_STATE_VALID, -} SyncingTablesState; - -static SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD; -static List *table_states_not_ready = NIL; -static bool FetchTableStates(bool *started_tx); +List *table_states_not_ready = NIL; static StringInfo copybuf = NULL; -/* - * Exit routine for synchronization worker. - */ -pg_noreturn static void -finish_sync_worker(void) -{ - /* - * Commit any outstanding transaction. This is the usual case, unless - * there was nothing to do for the table. - */ - if (IsTransactionState()) - { - CommitTransactionCommand(); - pgstat_report_stat(true); - } - - /* And flush all writes. */ - XLogFlush(GetXLogWriteRecPtr()); - - StartTransactionCommand(); - ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); - CommitTransactionCommand(); - - /* Find the leader apply worker and signal it. */ - logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); - - /* Stop gracefully */ - proc_exit(0); -} - /* * Wait until the relation sync state is set in the catalog to the expected * one; return true when it happens. @@ -180,7 +137,7 @@ finish_sync_worker(void) * CATCHUP state to SYNCDONE. */ static bool -wait_for_relation_state_change(Oid relid, char expected_state) +wait_for_table_state_change(Oid relid, char expected_state) { char state; @@ -273,15 +230,6 @@ wait_for_worker_state_change(char expected_state) return false; } -/* - * Callback from syscache invalidation. - */ -void -invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) -{ - table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD; -} - /* * Handle table synchronization cooperation from the synchronization * worker. @@ -290,8 +238,8 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) * predetermined synchronization point in the WAL stream, mark the table as * SYNCDONE and finish. */ -static void -process_syncing_tables_for_sync(XLogRecPtr current_lsn) +void +ProcessSyncingTablesForSync(XLogRecPtr current_lsn) { SpinLockAcquire(&MyLogicalRepWorker->relmutex); @@ -349,9 +297,9 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) /* * Start a new transaction to clean up the tablesync origin tracking. - * This transaction will be ended within the finish_sync_worker(). - * Now, even, if we fail to remove this here, the apply worker will - * ensure to clean it up afterward. + * This transaction will be ended within the FinishSyncWorker(). Now, + * even, if we fail to remove this here, the apply worker will ensure + * to clean it up afterward. * * We need to do this after the table state is set to SYNCDONE. * Otherwise, if an error occurs while performing the database @@ -387,7 +335,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) */ replorigin_drop_by_name(originname, true, false); - finish_sync_worker(); + FinishSyncWorker(); } else SpinLockRelease(&MyLogicalRepWorker->relmutex); @@ -414,8 +362,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) * If the synchronization position is reached (SYNCDONE), then the table can * be marked as READY and is no longer tracked. */ -static void -process_syncing_tables_for_apply(XLogRecPtr current_lsn) +void +ProcessSyncingTablesForApply(XLogRecPtr current_lsn) { struct tablesync_start_time_mapping { @@ -431,7 +379,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) Assert(!IsTransactionState()); /* We need up-to-date sync state info for subscription tables here. */ - FetchTableStates(&started_tx); + FetchRelationStates(&started_tx); /* * Prepare a hash table for tracking last start times of workers, to avoid @@ -586,8 +534,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) StartTransactionCommand(); started_tx = true; - wait_for_relation_state_change(rstate->relid, - SUBREL_STATE_SYNCDONE); + wait_for_table_state_change(rstate->relid, + SUBREL_STATE_SYNCDONE); } else LWLockRelease(LogicalRepWorkerLock); @@ -689,37 +637,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } } -/* - * Process possible state change(s) of tables that are being synchronized. - */ -void -process_syncing_tables(XLogRecPtr current_lsn) -{ - switch (MyLogicalRepWorker->type) - { - case WORKERTYPE_PARALLEL_APPLY: - - /* - * Skip for parallel apply workers because they only operate on - * tables that are in a READY state. See pa_can_start() and - * should_apply_changes_for_rel(). - */ - break; - - case WORKERTYPE_TABLESYNC: - process_syncing_tables_for_sync(current_lsn); - break; - - case WORKERTYPE_APPLY: - process_syncing_tables_for_apply(current_lsn); - break; - - case WORKERTYPE_UNKNOWN: - /* Should never happen. */ - elog(ERROR, "Unknown worker type"); - } -} - /* * Create list of columns for COPY based on logical relation mapping. */ @@ -1356,7 +1273,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) case SUBREL_STATE_SYNCDONE: case SUBREL_STATE_READY: case SUBREL_STATE_UNKNOWN: - finish_sync_worker(); /* doesn't return */ + FinishSyncWorker(); /* doesn't return */ } /* Calculate the name of the tablesync slot. */ @@ -1599,77 +1516,6 @@ copy_table_done: return slotname; } -/* - * Common code to fetch the up-to-date sync state info into the static lists. - * - * Returns true if subscription has 1 or more tables, else false. - * - * Note: If this function started the transaction (indicated by the parameter) - * then it is the caller's responsibility to commit it. - */ -static bool -FetchTableStates(bool *started_tx) -{ - static bool has_subrels = false; - - *started_tx = false; - - if (table_states_validity != SYNC_TABLE_STATE_VALID) - { - MemoryContext oldctx; - List *rstates; - ListCell *lc; - SubscriptionRelState *rstate; - - table_states_validity = SYNC_TABLE_STATE_REBUILD_STARTED; - - /* Clean the old lists. */ - list_free_deep(table_states_not_ready); - table_states_not_ready = NIL; - - if (!IsTransactionState()) - { - StartTransactionCommand(); - *started_tx = true; - } - - /* Fetch all non-ready tables. */ - rstates = GetSubscriptionRelations(MySubscription->oid, true); - - /* Allocate the tracking info in a permanent memory context. */ - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - foreach(lc, rstates) - { - rstate = palloc(sizeof(SubscriptionRelState)); - memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); - table_states_not_ready = lappend(table_states_not_ready, rstate); - } - MemoryContextSwitchTo(oldctx); - - /* - * Does the subscription have tables? - * - * If there were not-READY relations found then we know it does. But - * if table_states_not_ready was empty we still need to check again to - * see if there are 0 tables. - */ - has_subrels = (table_states_not_ready != NIL) || - HasSubscriptionRelations(MySubscription->oid); - - /* - * If the subscription relation cache has been invalidated since we - * entered this routine, we still use and return the relations we just - * finished constructing, to avoid infinite loops, but we leave the - * table states marked as stale so that we'll rebuild it again on next - * access. Otherwise, we mark the table states as valid. - */ - if (table_states_validity == SYNC_TABLE_STATE_REBUILD_STARTED) - table_states_validity = SYNC_TABLE_STATE_VALID; - } - - return has_subrels; -} - /* * Execute the initial sync with error handling. Disable the subscription, * if it's required. @@ -1755,7 +1601,7 @@ TablesyncWorkerMain(Datum main_arg) run_tablesync_worker(); - finish_sync_worker(); + FinishSyncWorker(); } /* @@ -1773,7 +1619,7 @@ AllTablesyncsReady(void) bool has_subrels = false; /* We need up-to-date sync state info for subscription tables here. */ - has_subrels = FetchTableStates(&started_tx); + has_subrels = FetchRelationStates(&started_tx); if (started_tx) { @@ -1789,21 +1635,21 @@ AllTablesyncsReady(void) } /* - * Return whether the subscription currently has any relations. + * Return whether the subscription currently has any tables. * - * Note: Unlike HasSubscriptionRelations(), this function relies on cached - * information for subscription relations. Additionally, it should not be + * Note: Unlike HasSubscriptionTables(), this function relies on cached + * information for subscription tables. Additionally, it should not be * invoked outside of apply or tablesync workers, as MySubscription must be * initialized first. */ bool -HasSubscriptionRelationsCached(void) +HasSubscriptionTablesCached(void) { bool started_tx; bool has_subrels; /* We need up-to-date subscription tables info here */ - has_subrels = FetchTableStates(&started_tx); + has_subrels = FetchRelationStates(&started_tx); if (started_tx) { diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 419e478b4c6..3c58ad88476 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -91,7 +91,7 @@ * behave as if two_phase = off. When the apply worker detects that all * tablesyncs have become READY (while the tri-state was PENDING) it will * restart the apply worker process. This happens in - * process_syncing_tables_for_apply. + * ProcessSyncingTablesForApply. * * When the (re-started) apply worker finds that all tablesyncs are READY for a * two_phase tri-state of PENDING it start streaming messages with the @@ -1243,7 +1243,7 @@ apply_handle_commit(StringInfo s) apply_handle_commit_internal(&commit_data); /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(commit_data.end_lsn); + ProcessSyncingRelations(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); @@ -1365,7 +1365,7 @@ apply_handle_prepare(StringInfo s) in_remote_transaction = false; /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(prepare_data.end_lsn); + ProcessSyncingRelations(prepare_data.end_lsn); /* * Since we have already prepared the transaction, in a case where the @@ -1421,7 +1421,7 @@ apply_handle_commit_prepared(StringInfo s) in_remote_transaction = false; /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(prepare_data.end_lsn); + ProcessSyncingRelations(prepare_data.end_lsn); clear_subscription_skip_lsn(prepare_data.end_lsn); @@ -1487,7 +1487,7 @@ apply_handle_rollback_prepared(StringInfo s) in_remote_transaction = false; /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(rollback_data.rollback_end_lsn); + ProcessSyncingRelations(rollback_data.rollback_end_lsn); pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); @@ -1622,7 +1622,7 @@ apply_handle_stream_prepare(StringInfo s) pgstat_report_stat(false); /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(prepare_data.end_lsn); + ProcessSyncingRelations(prepare_data.end_lsn); /* * Similar to prepare case, the subskiplsn could be left in a case of @@ -2464,7 +2464,7 @@ apply_handle_stream_commit(StringInfo s) } /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(commit_data.end_lsn); + ProcessSyncingRelations(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -4133,7 +4133,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) maybe_reread_subscription(); /* Process any table synchronization changes. */ - process_syncing_tables(last_received); + ProcessSyncingRelations(last_received); } /* Cleanup the memory. */ @@ -4623,7 +4623,7 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users * concurrently add tables to the subscription, the apply worker may not * process invalidations in time. Consequently, - * HasSubscriptionRelationsCached() might miss the new tables, leading to + * HasSubscriptionTablesCached() might miss the new tables, leading to * premature advancement of oldest_nonremovable_xid. * * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as @@ -4637,7 +4637,7 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) * subscription tables at this stage to prevent unnecessary tuple * retention. */ - if (HasSubscriptionRelationsCached() && !AllTablesyncsReady()) + if (HasSubscriptionTablesCached() && !AllTablesyncsReady()) { TimestampTz now; @@ -5876,7 +5876,7 @@ SetupApplyOrSyncWorker(int worker_slot) * the subscription relation state. */ CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, - invalidate_syncing_table_states, + InvalidateSyncingRelStates, (Datum) 0); } diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c index a1976fae607..4e7303ea631 100644 --- a/src/bin/pg_dump/common.c +++ b/src/bin/pg_dump/common.c @@ -244,8 +244,8 @@ getSchemaData(Archive *fout, int *numTablesPtr) pg_log_info("reading subscriptions"); getSubscriptions(fout); - pg_log_info("reading subscription membership of tables"); - getSubscriptionTables(fout); + pg_log_info("reading subscription membership of relations"); + getSubscriptionRelations(fout); free(inhinfo); /* not needed any longer */ diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 641bece12c7..890db7b08c2 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -5305,12 +5305,12 @@ getSubscriptions(Archive *fout) } /* - * getSubscriptionTables - * Get information about subscription membership for dumpable tables. This + * getSubscriptionRelations + * Get information about subscription membership for dumpable relations. This * will be used only in binary-upgrade mode for PG17 or later versions. */ void -getSubscriptionTables(Archive *fout) +getSubscriptionRelations(Archive *fout) { DumpOptions *dopt = fout->dopt; SubscriptionInfo *subinfo = NULL; @@ -5364,7 +5364,7 @@ getSubscriptionTables(Archive *fout) tblinfo = findTableByOid(relid); if (tblinfo == NULL) - pg_fatal("failed sanity check, table with OID %u not found", + pg_fatal("failed sanity check, relation with OID %u not found", relid); /* OK, make a DumpableObject for this relationship */ diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index fa6d1a510f7..72a00e1bc20 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -829,6 +829,6 @@ extern void getPublicationNamespaces(Archive *fout); extern void getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables); extern void getSubscriptions(Archive *fout); -extern void getSubscriptionTables(Archive *fout); +extern void getSubscriptionRelations(Archive *fout); #endif /* PG_DUMP_H */ diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 02f97a547dd..61b63c6bb7a 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -89,7 +89,7 @@ extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); extern void RemoveSubscriptionRel(Oid subid, Oid relid); -extern bool HasSubscriptionRelations(Oid subid); +extern bool HasSubscriptionTables(Oid subid); extern List *GetSubscriptionRelations(Oid subid, bool not_ready); extern void UpdateDeadTupleRetentionStatus(Oid subid, bool active); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index de003802612..ae352f6e691 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -251,6 +251,8 @@ extern PGDLLIMPORT bool in_remote_transaction; extern PGDLLIMPORT bool InitializingApplyWorker; +extern PGDLLIMPORT List *table_states_not_ready; + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); @@ -272,12 +274,16 @@ extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname); extern bool AllTablesyncsReady(void); -extern bool HasSubscriptionRelationsCached(void); +extern bool HasSubscriptionTablesCached(void); extern void UpdateTwoPhaseState(Oid suboid, char new_state); -extern void process_syncing_tables(XLogRecPtr current_lsn); -extern void invalidate_syncing_table_states(Datum arg, int cacheid, - uint32 hashvalue); +extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn); +extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn); + +pg_noreturn extern void FinishSyncWorker(void); +extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue); +extern void ProcessSyncingRelations(XLogRecPtr current_lsn); +extern bool FetchRelationStates(bool *started_tx); extern void stream_start_internal(TransactionId xid, bool first_segment); extern void stream_stop_internal(TransactionId xid); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 5290b91e83e..ee1cab6190f 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2922,7 +2922,7 @@ SyncRepStandbyData SyncRequestHandler SyncRequestType SyncStandbySlotsConfigData -SyncingTablesState +SyncingRelationsState SysFKRelationship SysScanDesc SyscacheCallbackFunction