1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-18 04:29:09 +03:00

Refactor logical worker synchronization code into a separate file.

To support the upcoming addition of a sequence synchronization worker,
this patch extracts common synchronization logic shared by table sync
workers and the new sequence sync worker into a dedicated file. This
modularization improves code reuse, maintainability, and clarity in the
logical workers framework.

Author: vignesh C <vignesh21@gmail.com>
Author: Hou Zhijie <houzj.fnst@fujitsu.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Peter Smith <smithpb2250@gmail.com>
Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com
This commit is contained in:
Amit Kapila
2025-10-16 05:10:50 +00:00
parent 905e932f09
commit 41c674d2e3
13 changed files with 243 additions and 202 deletions

View File

@@ -506,13 +506,13 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
} }
/* /*
* Does the subscription have any relations? * Does the subscription have any tables?
* *
* Use this function only to know true/false, and when you have no need for the * Use this function only to know true/false, and when you have no need for the
* List returned by GetSubscriptionRelations. * List returned by GetSubscriptionRelations.
*/ */
bool bool
HasSubscriptionRelations(Oid subid) HasSubscriptionTables(Oid subid)
{ {
Relation rel; Relation rel;
ScanKeyData skey[1]; ScanKeyData skey[1];

View File

@@ -28,6 +28,7 @@ OBJS = \
reorderbuffer.o \ reorderbuffer.o \
slotsync.o \ slotsync.o \
snapbuild.o \ snapbuild.o \
syncutils.o \
tablesync.o \ tablesync.o \
worker.o worker.o

View File

@@ -970,7 +970,7 @@ ParallelApplyWorkerMain(Datum main_arg)
* the subscription relation state. * the subscription relation state.
*/ */
CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
invalidate_syncing_table_states, InvalidateSyncingRelStates,
(Datum) 0); (Datum) 0);
set_apply_error_context_origin(originname); set_apply_error_context_origin(originname);

View File

@@ -14,6 +14,7 @@ backend_sources += files(
'reorderbuffer.c', 'reorderbuffer.c',
'slotsync.c', 'slotsync.c',
'snapbuild.c', 'snapbuild.c',
'syncutils.c',
'tablesync.c', 'tablesync.c',
'worker.c', 'worker.c',
) )

View File

@@ -0,0 +1,187 @@
/*-------------------------------------------------------------------------
* syncutils.c
* PostgreSQL logical replication: common synchronization code
*
* Copyright (c) 2025, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/logical/syncutils.c
*
* NOTES
* This file contains code common for synchronization workers.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/pg_subscription_rel.h"
#include "pgstat.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
/*
* Enum for phases of the subscription relations state.
*
* SYNC_RELATIONS_STATE_NEEDS_REBUILD indicates that the subscription relations
* state is no longer valid, and the subscription relations should be rebuilt.
*
* SYNC_RELATIONS_STATE_REBUILD_STARTED indicates that the subscription
* relations state is being rebuilt.
*
* SYNC_RELATIONS_STATE_VALID indicates that the subscription relation state is
* up-to-date and valid.
*/
typedef enum
{
SYNC_RELATIONS_STATE_NEEDS_REBUILD,
SYNC_RELATIONS_STATE_REBUILD_STARTED,
SYNC_RELATIONS_STATE_VALID,
} SyncingRelationsState;
static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
/*
* Exit routine for synchronization worker.
*/
pg_noreturn void
FinishSyncWorker(void)
{
/*
* Commit any outstanding transaction. This is the usual case, unless
* there was nothing to do for the table.
*/
if (IsTransactionState())
{
CommitTransactionCommand();
pgstat_report_stat(true);
}
/* And flush all writes. */
XLogFlush(GetXLogWriteRecPtr());
StartTransactionCommand();
ereport(LOG,
(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
MySubscription->name,
get_rel_name(MyLogicalRepWorker->relid))));
CommitTransactionCommand();
/* Find the leader apply worker and signal it. */
logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
/* Stop gracefully */
proc_exit(0);
}
/*
* Callback from syscache invalidation.
*/
void
InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
{
relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
}
/*
* Process possible state change(s) of relations that are being synchronized.
*/
void
ProcessSyncingRelations(XLogRecPtr current_lsn)
{
switch (MyLogicalRepWorker->type)
{
case WORKERTYPE_PARALLEL_APPLY:
/*
* Skip for parallel apply workers because they only operate on
* tables that are in a READY state. See pa_can_start() and
* should_apply_changes_for_rel().
*/
break;
case WORKERTYPE_TABLESYNC:
ProcessSyncingTablesForSync(current_lsn);
break;
case WORKERTYPE_APPLY:
ProcessSyncingTablesForApply(current_lsn);
break;
case WORKERTYPE_UNKNOWN:
/* Should never happen. */
elog(ERROR, "Unknown worker type");
}
}
/*
* Common code to fetch the up-to-date sync state info into the static lists.
*
* Returns true if subscription has 1 or more tables, else false.
*
* Note: If this function started the transaction (indicated by the parameter)
* then it is the caller's responsibility to commit it.
*/
bool
FetchRelationStates(bool *started_tx)
{
static bool has_subtables = false;
*started_tx = false;
if (relation_states_validity != SYNC_RELATIONS_STATE_VALID)
{
MemoryContext oldctx;
List *rstates;
ListCell *lc;
SubscriptionRelState *rstate;
relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED;
/* Clean the old lists. */
list_free_deep(table_states_not_ready);
table_states_not_ready = NIL;
if (!IsTransactionState())
{
StartTransactionCommand();
*started_tx = true;
}
/* Fetch tables and sequences that are in non-ready state. */
rstates = GetSubscriptionRelations(MySubscription->oid, true);
/* Allocate the tracking info in a permanent memory context. */
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
foreach(lc, rstates)
{
rstate = palloc(sizeof(SubscriptionRelState));
memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
table_states_not_ready = lappend(table_states_not_ready, rstate);
}
MemoryContextSwitchTo(oldctx);
/*
* Does the subscription have tables?
*
* If there were not-READY tables found then we know it does. But if
* table_states_not_ready was empty we still need to check again to
* see if there are 0 tables.
*/
has_subtables = (table_states_not_ready != NIL) ||
HasSubscriptionTables(MySubscription->oid);
/*
* If the subscription relation cache has been invalidated since we
* entered this routine, we still use and return the relations we just
* finished constructing, to avoid infinite loops, but we leave the
* table states marked as stale so that we'll rebuild it again on next
* access. Otherwise, we mark the table states as valid.
*/
if (relation_states_validity == SYNC_RELATIONS_STATE_REBUILD_STARTED)
relation_states_validity = SYNC_RELATIONS_STATE_VALID;
}
return has_subtables;
}

View File

@@ -117,58 +117,15 @@
#include "utils/array.h" #include "utils/array.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rls.h" #include "utils/rls.h"
#include "utils/snapmgr.h" #include "utils/snapmgr.h"
#include "utils/syscache.h" #include "utils/syscache.h"
#include "utils/usercontext.h" #include "utils/usercontext.h"
typedef enum List *table_states_not_ready = NIL;
{
SYNC_TABLE_STATE_NEEDS_REBUILD,
SYNC_TABLE_STATE_REBUILD_STARTED,
SYNC_TABLE_STATE_VALID,
} SyncingTablesState;
static SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
static List *table_states_not_ready = NIL;
static bool FetchTableStates(bool *started_tx);
static StringInfo copybuf = NULL; static StringInfo copybuf = NULL;
/*
* Exit routine for synchronization worker.
*/
pg_noreturn static void
finish_sync_worker(void)
{
/*
* Commit any outstanding transaction. This is the usual case, unless
* there was nothing to do for the table.
*/
if (IsTransactionState())
{
CommitTransactionCommand();
pgstat_report_stat(true);
}
/* And flush all writes. */
XLogFlush(GetXLogWriteRecPtr());
StartTransactionCommand();
ereport(LOG,
(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
MySubscription->name,
get_rel_name(MyLogicalRepWorker->relid))));
CommitTransactionCommand();
/* Find the leader apply worker and signal it. */
logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
/* Stop gracefully */
proc_exit(0);
}
/* /*
* Wait until the relation sync state is set in the catalog to the expected * Wait until the relation sync state is set in the catalog to the expected
* one; return true when it happens. * one; return true when it happens.
@@ -180,7 +137,7 @@ finish_sync_worker(void)
* CATCHUP state to SYNCDONE. * CATCHUP state to SYNCDONE.
*/ */
static bool static bool
wait_for_relation_state_change(Oid relid, char expected_state) wait_for_table_state_change(Oid relid, char expected_state)
{ {
char state; char state;
@@ -273,15 +230,6 @@ wait_for_worker_state_change(char expected_state)
return false; return false;
} }
/*
* Callback from syscache invalidation.
*/
void
invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
{
table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
}
/* /*
* Handle table synchronization cooperation from the synchronization * Handle table synchronization cooperation from the synchronization
* worker. * worker.
@@ -290,8 +238,8 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
* predetermined synchronization point in the WAL stream, mark the table as * predetermined synchronization point in the WAL stream, mark the table as
* SYNCDONE and finish. * SYNCDONE and finish.
*/ */
static void void
process_syncing_tables_for_sync(XLogRecPtr current_lsn) ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
{ {
SpinLockAcquire(&MyLogicalRepWorker->relmutex); SpinLockAcquire(&MyLogicalRepWorker->relmutex);
@@ -349,9 +297,9 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
/* /*
* Start a new transaction to clean up the tablesync origin tracking. * Start a new transaction to clean up the tablesync origin tracking.
* This transaction will be ended within the finish_sync_worker(). * This transaction will be ended within the FinishSyncWorker(). Now,
* Now, even, if we fail to remove this here, the apply worker will * even, if we fail to remove this here, the apply worker will ensure
* ensure to clean it up afterward. * to clean it up afterward.
* *
* We need to do this after the table state is set to SYNCDONE. * We need to do this after the table state is set to SYNCDONE.
* Otherwise, if an error occurs while performing the database * Otherwise, if an error occurs while performing the database
@@ -387,7 +335,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
*/ */
replorigin_drop_by_name(originname, true, false); replorigin_drop_by_name(originname, true, false);
finish_sync_worker(); FinishSyncWorker();
} }
else else
SpinLockRelease(&MyLogicalRepWorker->relmutex); SpinLockRelease(&MyLogicalRepWorker->relmutex);
@@ -414,8 +362,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
* If the synchronization position is reached (SYNCDONE), then the table can * If the synchronization position is reached (SYNCDONE), then the table can
* be marked as READY and is no longer tracked. * be marked as READY and is no longer tracked.
*/ */
static void void
process_syncing_tables_for_apply(XLogRecPtr current_lsn) ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
{ {
struct tablesync_start_time_mapping struct tablesync_start_time_mapping
{ {
@@ -431,7 +379,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Assert(!IsTransactionState()); Assert(!IsTransactionState());
/* We need up-to-date sync state info for subscription tables here. */ /* We need up-to-date sync state info for subscription tables here. */
FetchTableStates(&started_tx); FetchRelationStates(&started_tx);
/* /*
* Prepare a hash table for tracking last start times of workers, to avoid * Prepare a hash table for tracking last start times of workers, to avoid
@@ -586,7 +534,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
StartTransactionCommand(); StartTransactionCommand();
started_tx = true; started_tx = true;
wait_for_relation_state_change(rstate->relid, wait_for_table_state_change(rstate->relid,
SUBREL_STATE_SYNCDONE); SUBREL_STATE_SYNCDONE);
} }
else else
@@ -689,37 +637,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
} }
} }
/*
* Process possible state change(s) of tables that are being synchronized.
*/
void
process_syncing_tables(XLogRecPtr current_lsn)
{
switch (MyLogicalRepWorker->type)
{
case WORKERTYPE_PARALLEL_APPLY:
/*
* Skip for parallel apply workers because they only operate on
* tables that are in a READY state. See pa_can_start() and
* should_apply_changes_for_rel().
*/
break;
case WORKERTYPE_TABLESYNC:
process_syncing_tables_for_sync(current_lsn);
break;
case WORKERTYPE_APPLY:
process_syncing_tables_for_apply(current_lsn);
break;
case WORKERTYPE_UNKNOWN:
/* Should never happen. */
elog(ERROR, "Unknown worker type");
}
}
/* /*
* Create list of columns for COPY based on logical relation mapping. * Create list of columns for COPY based on logical relation mapping.
*/ */
@@ -1356,7 +1273,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
case SUBREL_STATE_SYNCDONE: case SUBREL_STATE_SYNCDONE:
case SUBREL_STATE_READY: case SUBREL_STATE_READY:
case SUBREL_STATE_UNKNOWN: case SUBREL_STATE_UNKNOWN:
finish_sync_worker(); /* doesn't return */ FinishSyncWorker(); /* doesn't return */
} }
/* Calculate the name of the tablesync slot. */ /* Calculate the name of the tablesync slot. */
@@ -1599,77 +1516,6 @@ copy_table_done:
return slotname; return slotname;
} }
/*
* Common code to fetch the up-to-date sync state info into the static lists.
*
* Returns true if subscription has 1 or more tables, else false.
*
* Note: If this function started the transaction (indicated by the parameter)
* then it is the caller's responsibility to commit it.
*/
static bool
FetchTableStates(bool *started_tx)
{
static bool has_subrels = false;
*started_tx = false;
if (table_states_validity != SYNC_TABLE_STATE_VALID)
{
MemoryContext oldctx;
List *rstates;
ListCell *lc;
SubscriptionRelState *rstate;
table_states_validity = SYNC_TABLE_STATE_REBUILD_STARTED;
/* Clean the old lists. */
list_free_deep(table_states_not_ready);
table_states_not_ready = NIL;
if (!IsTransactionState())
{
StartTransactionCommand();
*started_tx = true;
}
/* Fetch all non-ready tables. */
rstates = GetSubscriptionRelations(MySubscription->oid, true);
/* Allocate the tracking info in a permanent memory context. */
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
foreach(lc, rstates)
{
rstate = palloc(sizeof(SubscriptionRelState));
memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
table_states_not_ready = lappend(table_states_not_ready, rstate);
}
MemoryContextSwitchTo(oldctx);
/*
* Does the subscription have tables?
*
* If there were not-READY relations found then we know it does. But
* if table_states_not_ready was empty we still need to check again to
* see if there are 0 tables.
*/
has_subrels = (table_states_not_ready != NIL) ||
HasSubscriptionRelations(MySubscription->oid);
/*
* If the subscription relation cache has been invalidated since we
* entered this routine, we still use and return the relations we just
* finished constructing, to avoid infinite loops, but we leave the
* table states marked as stale so that we'll rebuild it again on next
* access. Otherwise, we mark the table states as valid.
*/
if (table_states_validity == SYNC_TABLE_STATE_REBUILD_STARTED)
table_states_validity = SYNC_TABLE_STATE_VALID;
}
return has_subrels;
}
/* /*
* Execute the initial sync with error handling. Disable the subscription, * Execute the initial sync with error handling. Disable the subscription,
* if it's required. * if it's required.
@@ -1755,7 +1601,7 @@ TablesyncWorkerMain(Datum main_arg)
run_tablesync_worker(); run_tablesync_worker();
finish_sync_worker(); FinishSyncWorker();
} }
/* /*
@@ -1773,7 +1619,7 @@ AllTablesyncsReady(void)
bool has_subrels = false; bool has_subrels = false;
/* We need up-to-date sync state info for subscription tables here. */ /* We need up-to-date sync state info for subscription tables here. */
has_subrels = FetchTableStates(&started_tx); has_subrels = FetchRelationStates(&started_tx);
if (started_tx) if (started_tx)
{ {
@@ -1789,21 +1635,21 @@ AllTablesyncsReady(void)
} }
/* /*
* Return whether the subscription currently has any relations. * Return whether the subscription currently has any tables.
* *
* Note: Unlike HasSubscriptionRelations(), this function relies on cached * Note: Unlike HasSubscriptionTables(), this function relies on cached
* information for subscription relations. Additionally, it should not be * information for subscription tables. Additionally, it should not be
* invoked outside of apply or tablesync workers, as MySubscription must be * invoked outside of apply or tablesync workers, as MySubscription must be
* initialized first. * initialized first.
*/ */
bool bool
HasSubscriptionRelationsCached(void) HasSubscriptionTablesCached(void)
{ {
bool started_tx; bool started_tx;
bool has_subrels; bool has_subrels;
/* We need up-to-date subscription tables info here */ /* We need up-to-date subscription tables info here */
has_subrels = FetchTableStates(&started_tx); has_subrels = FetchRelationStates(&started_tx);
if (started_tx) if (started_tx)
{ {

View File

@@ -91,7 +91,7 @@
* behave as if two_phase = off. When the apply worker detects that all * behave as if two_phase = off. When the apply worker detects that all
* tablesyncs have become READY (while the tri-state was PENDING) it will * tablesyncs have become READY (while the tri-state was PENDING) it will
* restart the apply worker process. This happens in * restart the apply worker process. This happens in
* process_syncing_tables_for_apply. * ProcessSyncingTablesForApply.
* *
* When the (re-started) apply worker finds that all tablesyncs are READY for a * When the (re-started) apply worker finds that all tablesyncs are READY for a
* two_phase tri-state of PENDING it start streaming messages with the * two_phase tri-state of PENDING it start streaming messages with the
@@ -1243,7 +1243,7 @@ apply_handle_commit(StringInfo s)
apply_handle_commit_internal(&commit_data); apply_handle_commit_internal(&commit_data);
/* Process any tables that are being synchronized in parallel. */ /* Process any tables that are being synchronized in parallel. */
process_syncing_tables(commit_data.end_lsn); ProcessSyncingRelations(commit_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL); pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info(); reset_apply_error_context_info();
@@ -1365,7 +1365,7 @@ apply_handle_prepare(StringInfo s)
in_remote_transaction = false; in_remote_transaction = false;
/* Process any tables that are being synchronized in parallel. */ /* Process any tables that are being synchronized in parallel. */
process_syncing_tables(prepare_data.end_lsn); ProcessSyncingRelations(prepare_data.end_lsn);
/* /*
* Since we have already prepared the transaction, in a case where the * Since we have already prepared the transaction, in a case where the
@@ -1421,7 +1421,7 @@ apply_handle_commit_prepared(StringInfo s)
in_remote_transaction = false; in_remote_transaction = false;
/* Process any tables that are being synchronized in parallel. */ /* Process any tables that are being synchronized in parallel. */
process_syncing_tables(prepare_data.end_lsn); ProcessSyncingRelations(prepare_data.end_lsn);
clear_subscription_skip_lsn(prepare_data.end_lsn); clear_subscription_skip_lsn(prepare_data.end_lsn);
@@ -1487,7 +1487,7 @@ apply_handle_rollback_prepared(StringInfo s)
in_remote_transaction = false; in_remote_transaction = false;
/* Process any tables that are being synchronized in parallel. */ /* Process any tables that are being synchronized in parallel. */
process_syncing_tables(rollback_data.rollback_end_lsn); ProcessSyncingRelations(rollback_data.rollback_end_lsn);
pgstat_report_activity(STATE_IDLE, NULL); pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info(); reset_apply_error_context_info();
@@ -1622,7 +1622,7 @@ apply_handle_stream_prepare(StringInfo s)
pgstat_report_stat(false); pgstat_report_stat(false);
/* Process any tables that are being synchronized in parallel. */ /* Process any tables that are being synchronized in parallel. */
process_syncing_tables(prepare_data.end_lsn); ProcessSyncingRelations(prepare_data.end_lsn);
/* /*
* Similar to prepare case, the subskiplsn could be left in a case of * Similar to prepare case, the subskiplsn could be left in a case of
@@ -2464,7 +2464,7 @@ apply_handle_stream_commit(StringInfo s)
} }
/* Process any tables that are being synchronized in parallel. */ /* Process any tables that are being synchronized in parallel. */
process_syncing_tables(commit_data.end_lsn); ProcessSyncingRelations(commit_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL); pgstat_report_activity(STATE_IDLE, NULL);
@@ -4133,7 +4133,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
maybe_reread_subscription(); maybe_reread_subscription();
/* Process any table synchronization changes. */ /* Process any table synchronization changes. */
process_syncing_tables(last_received); ProcessSyncingRelations(last_received);
} }
/* Cleanup the memory. */ /* Cleanup the memory. */
@@ -4623,7 +4623,7 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
* RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
* concurrently add tables to the subscription, the apply worker may not * concurrently add tables to the subscription, the apply worker may not
* process invalidations in time. Consequently, * process invalidations in time. Consequently,
* HasSubscriptionRelationsCached() might miss the new tables, leading to * HasSubscriptionTablesCached() might miss the new tables, leading to
* premature advancement of oldest_nonremovable_xid. * premature advancement of oldest_nonremovable_xid.
* *
* Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
@@ -4637,7 +4637,7 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
* subscription tables at this stage to prevent unnecessary tuple * subscription tables at this stage to prevent unnecessary tuple
* retention. * retention.
*/ */
if (HasSubscriptionRelationsCached() && !AllTablesyncsReady()) if (HasSubscriptionTablesCached() && !AllTablesyncsReady())
{ {
TimestampTz now; TimestampTz now;
@@ -5876,7 +5876,7 @@ SetupApplyOrSyncWorker(int worker_slot)
* the subscription relation state. * the subscription relation state.
*/ */
CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
invalidate_syncing_table_states, InvalidateSyncingRelStates,
(Datum) 0); (Datum) 0);
} }

View File

@@ -244,8 +244,8 @@ getSchemaData(Archive *fout, int *numTablesPtr)
pg_log_info("reading subscriptions"); pg_log_info("reading subscriptions");
getSubscriptions(fout); getSubscriptions(fout);
pg_log_info("reading subscription membership of tables"); pg_log_info("reading subscription membership of relations");
getSubscriptionTables(fout); getSubscriptionRelations(fout);
free(inhinfo); /* not needed any longer */ free(inhinfo); /* not needed any longer */

View File

@@ -5305,12 +5305,12 @@ getSubscriptions(Archive *fout)
} }
/* /*
* getSubscriptionTables * getSubscriptionRelations
* Get information about subscription membership for dumpable tables. This * Get information about subscription membership for dumpable relations. This
* will be used only in binary-upgrade mode for PG17 or later versions. * will be used only in binary-upgrade mode for PG17 or later versions.
*/ */
void void
getSubscriptionTables(Archive *fout) getSubscriptionRelations(Archive *fout)
{ {
DumpOptions *dopt = fout->dopt; DumpOptions *dopt = fout->dopt;
SubscriptionInfo *subinfo = NULL; SubscriptionInfo *subinfo = NULL;
@@ -5364,7 +5364,7 @@ getSubscriptionTables(Archive *fout)
tblinfo = findTableByOid(relid); tblinfo = findTableByOid(relid);
if (tblinfo == NULL) if (tblinfo == NULL)
pg_fatal("failed sanity check, table with OID %u not found", pg_fatal("failed sanity check, relation with OID %u not found",
relid); relid);
/* OK, make a DumpableObject for this relationship */ /* OK, make a DumpableObject for this relationship */

View File

@@ -829,6 +829,6 @@ extern void getPublicationNamespaces(Archive *fout);
extern void getPublicationTables(Archive *fout, TableInfo tblinfo[], extern void getPublicationTables(Archive *fout, TableInfo tblinfo[],
int numTables); int numTables);
extern void getSubscriptions(Archive *fout); extern void getSubscriptions(Archive *fout);
extern void getSubscriptionTables(Archive *fout); extern void getSubscriptionRelations(Archive *fout);
#endif /* PG_DUMP_H */ #endif /* PG_DUMP_H */

View File

@@ -89,7 +89,7 @@ extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
extern void RemoveSubscriptionRel(Oid subid, Oid relid); extern void RemoveSubscriptionRel(Oid subid, Oid relid);
extern bool HasSubscriptionRelations(Oid subid); extern bool HasSubscriptionTables(Oid subid);
extern List *GetSubscriptionRelations(Oid subid, bool not_ready); extern List *GetSubscriptionRelations(Oid subid, bool not_ready);
extern void UpdateDeadTupleRetentionStatus(Oid subid, bool active); extern void UpdateDeadTupleRetentionStatus(Oid subid, bool active);

View File

@@ -251,6 +251,8 @@ extern PGDLLIMPORT bool in_remote_transaction;
extern PGDLLIMPORT bool InitializingApplyWorker; extern PGDLLIMPORT bool InitializingApplyWorker;
extern PGDLLIMPORT List *table_states_not_ready;
extern void logicalrep_worker_attach(int slot); extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running); bool only_running);
@@ -272,12 +274,16 @@ extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
char *originname, Size szoriginname); char *originname, Size szoriginname);
extern bool AllTablesyncsReady(void); extern bool AllTablesyncsReady(void);
extern bool HasSubscriptionRelationsCached(void); extern bool HasSubscriptionTablesCached(void);
extern void UpdateTwoPhaseState(Oid suboid, char new_state); extern void UpdateTwoPhaseState(Oid suboid, char new_state);
extern void process_syncing_tables(XLogRecPtr current_lsn); extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn);
extern void invalidate_syncing_table_states(Datum arg, int cacheid, extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn);
uint32 hashvalue);
pg_noreturn extern void FinishSyncWorker(void);
extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue);
extern void ProcessSyncingRelations(XLogRecPtr current_lsn);
extern bool FetchRelationStates(bool *started_tx);
extern void stream_start_internal(TransactionId xid, bool first_segment); extern void stream_start_internal(TransactionId xid, bool first_segment);
extern void stream_stop_internal(TransactionId xid); extern void stream_stop_internal(TransactionId xid);

View File

@@ -2922,7 +2922,7 @@ SyncRepStandbyData
SyncRequestHandler SyncRequestHandler
SyncRequestType SyncRequestType
SyncStandbySlotsConfigData SyncStandbySlotsConfigData
SyncingTablesState SyncingRelationsState
SysFKRelationship SysFKRelationship
SysScanDesc SysScanDesc
SyscacheCallbackFunction SyscacheCallbackFunction