1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-31 10:30:33 +03:00

Introduce "REFRESH SEQUENCES" for subscriptions.

This patch adds support for a new SQL command:
ALTER SUBSCRIPTION ... REFRESH SEQUENCES
This command updates the sequence entries present in the
pg_subscription_rel catalog table with the INIT state to trigger
resynchronization.

In addition to the new command, the following subscription commands have
been enhanced to automatically refresh sequence mappings:
ALTER SUBSCRIPTION ... REFRESH PUBLICATION
ALTER SUBSCRIPTION ... ADD PUBLICATION
ALTER SUBSCRIPTION ... DROP PUBLICATION
ALTER SUBSCRIPTION ... SET PUBLICATION

These commands will perform the following actions:
Add newly published sequences that are not yet part of the subscription.
Remove sequences that are no longer included in the publication.

This ensures that sequence replication remains aligned with the current
state of the publication on the publisher side.

Note that the actual synchronization of sequence data/values will be
handled in a subsequent patch that introduces a dedicated sequence sync
worker.

Author: Vignesh C <vignesh21@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Peter Smith <smithpb2250@gmail.com>
Reviewed-by: Nisha Moond <nisha.moond412@gmail.com>
Reviewed-by: Shlok Kyal <shlok.kyal.oss@gmail.com>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Reviewed-by: Hou Zhijie <houzj.fnst@fujitsu.com>
Discussion: https://postgr.es/m/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com
This commit is contained in:
Amit Kapila
2025-10-23 08:30:27 +00:00
parent 6ae08d9583
commit f0b3573c3a
19 changed files with 609 additions and 141 deletions

View File

@@ -106,12 +106,29 @@ typedef struct SubOpts
XLogRecPtr lsn;
} SubOpts;
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
static void check_publications_origin(WalReceiverConn *wrconn,
List *publications, bool copydata,
bool retain_dead_tuples, char *origin,
Oid *subrel_local_oids, int subrel_count,
char *subname);
/*
* PublicationRelKind represents a relation included in a publication.
* It stores the schema-qualified relation name (rv) and its kind (relkind).
*/
typedef struct PublicationRelKind
{
RangeVar *rv;
char relkind;
} PublicationRelKind;
static List *fetch_relation_list(WalReceiverConn *wrconn, List *publications);
static void check_publications_origin_tables(WalReceiverConn *wrconn,
List *publications, bool copydata,
bool retain_dead_tuples,
char *origin,
Oid *subrel_local_oids,
int subrel_count, char *subname);
static void check_publications_origin_sequences(WalReceiverConn *wrconn,
List *publications,
bool copydata, char *origin,
Oid *subrel_local_oids,
int subrel_count,
char *subname);
static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);
static void check_duplicates_in_publist(List *publist, Datum *datums);
static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
@@ -736,20 +753,27 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
/*
* A replication origin is currently created for all subscriptions,
* including those that only contain sequences or are otherwise empty.
*
* XXX: While this is technically unnecessary, optimizing it would require
* additional logic to skip origin creation during DDL operations and
* apply workers initialization, and to handle origin creation dynamically
* when tables are added to the subscription. It is not clear whether
* preventing creation of origins is worth additional complexity.
*/
ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
replorigin_create(originname);
/*
* Connect to remote side to execute requested commands and fetch table
* info.
* and sequence info.
*/
if (opts.connect)
{
char *err;
WalReceiverConn *wrconn;
List *tables;
ListCell *lc;
char table_state;
bool must_use_password;
/* Try to connect to the publisher. */
@@ -764,10 +788,18 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
PG_TRY();
{
bool has_tables = false;
List *pubrels;
char relation_state;
check_publications(wrconn, publications);
check_publications_origin(wrconn, publications, opts.copy_data,
opts.retaindeadtuples, opts.origin,
NULL, 0, stmt->subname);
check_publications_origin_tables(wrconn, publications,
opts.copy_data,
opts.retaindeadtuples, opts.origin,
NULL, 0, stmt->subname);
check_publications_origin_sequences(wrconn, publications,
opts.copy_data, opts.origin,
NULL, 0, stmt->subname);
if (opts.retaindeadtuples)
check_pub_dead_tuple_retention(wrconn);
@@ -776,25 +808,28 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
* Set sync state based on if we were asked to do data copy or
* not.
*/
table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
relation_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
/*
* Get the table list from publisher and build local table status
* info.
* Build local relation status info. Relations are for both tables
* and sequences from the publisher.
*/
tables = fetch_table_list(wrconn, publications);
foreach(lc, tables)
pubrels = fetch_relation_list(wrconn, publications);
foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
{
RangeVar *rv = (RangeVar *) lfirst(lc);
Oid relid;
char relkind;
RangeVar *rv = pubrelinfo->rv;
relid = RangeVarGetRelid(rv, AccessShareLock, false);
relkind = get_rel_relkind(relid);
/* Check for supported relkind. */
CheckSubscriptionRelkind(get_rel_relkind(relid),
CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
rv->schemaname, rv->relname);
AddSubscriptionRelState(subid, relid, table_state,
has_tables |= (relkind != RELKIND_SEQUENCE);
AddSubscriptionRelState(subid, relid, relation_state,
InvalidXLogRecPtr, true);
}
@@ -802,6 +837,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
* If requested, create permanent slot for the subscription. We
* won't use the initial snapshot for anything, so no need to
* export it.
*
* XXX: Similar to origins, it is not clear whether preventing the
* slot creation for empty and sequence-only subscriptions is
* worth additional complexity.
*/
if (opts.create_slot)
{
@@ -825,7 +864,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
* PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
* PUBLICATION to work.
*/
if (opts.twophase && !opts.copy_data && tables != NIL)
if (opts.twophase && !opts.copy_data && has_tables)
twophase_enabled = true;
walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
@@ -879,21 +918,24 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
List *validate_publications)
{
char *err;
List *pubrel_names;
List *subrel_states;
Oid *subrel_local_oids;
List *pubrels = NIL;
Oid *pubrel_local_oids;
List *subrel_states;
List *sub_remove_rels = NIL;
Oid *subrel_local_oids;
Oid *subseq_local_oids;
int subrel_count;
ListCell *lc;
int off;
int remove_rel_len;
int subrel_count;
int tbl_count = 0;
int seq_count = 0;
Relation rel = NULL;
typedef struct SubRemoveRels
{
Oid relid;
char state;
} SubRemoveRels;
SubRemoveRels *sub_remove_rels;
WalReceiverConn *wrconn;
bool must_use_password;
@@ -915,71 +957,84 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
if (validate_publications)
check_publications(wrconn, validate_publications);
/* Get the table list from publisher. */
pubrel_names = fetch_table_list(wrconn, sub->publications);
/* Get the relation list from publisher. */
pubrels = fetch_relation_list(wrconn, sub->publications);
/* Get local table list. */
subrel_states = GetSubscriptionRelations(sub->oid, false);
/* Get local relation list. */
subrel_states = GetSubscriptionRelations(sub->oid, true, true, false);
subrel_count = list_length(subrel_states);
/*
* Build qsorted array of local table oids for faster lookup. This can
* potentially contain all tables in the database so speed of lookup
* is important.
* Build qsorted arrays of local table oids and sequence oids for
* faster lookup. This can potentially contain all tables and
* sequences in the database so speed of lookup is important.
*
* We do not yet know the exact count of tables and sequences, so we
* allocate separate arrays for table OIDs and sequence OIDs based on
* the total number of relations (subrel_count).
*/
subrel_local_oids = palloc(subrel_count * sizeof(Oid));
off = 0;
subseq_local_oids = palloc(subrel_count * sizeof(Oid));
foreach(lc, subrel_states)
{
SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
subrel_local_oids[off++] = relstate->relid;
if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE)
subseq_local_oids[seq_count++] = relstate->relid;
else
subrel_local_oids[tbl_count++] = relstate->relid;
}
qsort(subrel_local_oids, subrel_count,
sizeof(Oid), oid_cmp);
check_publications_origin(wrconn, sub->publications, copy_data,
sub->retaindeadtuples, sub->origin,
subrel_local_oids, subrel_count, sub->name);
qsort(subrel_local_oids, tbl_count, sizeof(Oid), oid_cmp);
check_publications_origin_tables(wrconn, sub->publications, copy_data,
sub->retaindeadtuples, sub->origin,
subrel_local_oids, tbl_count,
sub->name);
qsort(subseq_local_oids, seq_count, sizeof(Oid), oid_cmp);
check_publications_origin_sequences(wrconn, sub->publications,
copy_data, sub->origin,
subseq_local_oids, seq_count,
sub->name);
/*
* Rels that we want to remove from subscription and drop any slots
* and origins corresponding to them.
*/
sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels));
/*
* Walk over the remote tables and try to match them to locally known
* tables. If the table is not known locally create a new state for
* it.
* Walk over the remote relations and try to match them to locally
* known relations. If the relation is not known locally create a new
* state for it.
*
* Also builds array of local oids of remote tables for the next step.
* Also builds array of local oids of remote relations for the next
* step.
*/
off = 0;
pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
pubrel_local_oids = palloc(list_length(pubrels) * sizeof(Oid));
foreach(lc, pubrel_names)
foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
{
RangeVar *rv = (RangeVar *) lfirst(lc);
RangeVar *rv = pubrelinfo->rv;
Oid relid;
char relkind;
relid = RangeVarGetRelid(rv, AccessShareLock, false);
relkind = get_rel_relkind(relid);
/* Check for supported relkind. */
CheckSubscriptionRelkind(get_rel_relkind(relid),
CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
rv->schemaname, rv->relname);
pubrel_local_oids[off++] = relid;
if (!bsearch(&relid, subrel_local_oids,
subrel_count, sizeof(Oid), oid_cmp))
tbl_count, sizeof(Oid), oid_cmp) &&
!bsearch(&relid, subseq_local_oids,
seq_count, sizeof(Oid), oid_cmp))
{
AddSubscriptionRelState(sub->oid, relid,
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
InvalidXLogRecPtr, true);
ereport(DEBUG1,
(errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
rv->schemaname, rv->relname, sub->name)));
errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
relkind == RELKIND_SEQUENCE ? "sequence" : "table",
rv->schemaname, rv->relname, sub->name));
}
}
@@ -987,19 +1042,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
* Next remove state for tables we should not care about anymore using
* the data we collected above
*/
qsort(pubrel_local_oids, list_length(pubrel_names),
sizeof(Oid), oid_cmp);
qsort(pubrel_local_oids, list_length(pubrels), sizeof(Oid), oid_cmp);
remove_rel_len = 0;
for (off = 0; off < subrel_count; off++)
for (off = 0; off < tbl_count; off++)
{
Oid relid = subrel_local_oids[off];
if (!bsearch(&relid, pubrel_local_oids,
list_length(pubrel_names), sizeof(Oid), oid_cmp))
list_length(pubrels), sizeof(Oid), oid_cmp))
{
char state;
XLogRecPtr statelsn;
SubRemoveRels *remove_rel = palloc(sizeof(SubRemoveRels));
/*
* Lock pg_subscription_rel with AccessExclusiveLock to
@@ -1021,11 +1075,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
/* Last known rel state. */
state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
sub_remove_rels[remove_rel_len].relid = relid;
sub_remove_rels[remove_rel_len++].state = state;
RemoveSubscriptionRel(sub->oid, relid);
remove_rel->relid = relid;
remove_rel->state = state;
sub_remove_rels = lappend(sub_remove_rels, remove_rel);
logicalrep_worker_stop(sub->oid, relid);
/*
@@ -1064,10 +1120,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
* to be at the end because otherwise if there is an error while doing
* the database operations we won't be able to rollback dropped slots.
*/
for (off = 0; off < remove_rel_len; off++)
foreach_ptr(SubRemoveRels, rel, sub_remove_rels)
{
if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
if (rel->state != SUBREL_STATE_READY &&
rel->state != SUBREL_STATE_SYNCDONE)
{
char syncslotname[NAMEDATALEN] = {0};
@@ -1081,11 +1137,39 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
* dropped slots and fail. For these reasons, we allow
* missing_ok = true for the drop.
*/
ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
ReplicationSlotNameForTablesync(sub->oid, rel->relid,
syncslotname, sizeof(syncslotname));
ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
}
}
/*
* Next remove state for sequences we should not care about anymore
* using the data we collected above
*/
for (off = 0; off < seq_count; off++)
{
Oid relid = subseq_local_oids[off];
if (!bsearch(&relid, pubrel_local_oids,
list_length(pubrels), sizeof(Oid), oid_cmp))
{
/*
* This locking ensures that the state of rels won't change
* till we are done with this refresh operation.
*/
if (!rel)
rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
RemoveSubscriptionRel(sub->oid, relid);
ereport(DEBUG1,
errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"",
get_namespace_name(get_rel_namespace(relid)),
get_rel_name(relid),
sub->name));
}
}
}
PG_FINALLY();
{
@@ -1097,6 +1181,58 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
table_close(rel, NoLock);
}
/*
* Marks all sequences with INIT state.
*/
static void
AlterSubscription_refresh_seq(Subscription *sub)
{
char *err = NULL;
WalReceiverConn *wrconn;
bool must_use_password;
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
/* Try to connect to the publisher. */
must_use_password = sub->passwordrequired && !sub->ownersuperuser;
wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
sub->name, &err);
if (!wrconn)
ereport(ERROR,
errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("subscription \"%s\" could not connect to the publisher: %s",
sub->name, err));
PG_TRY();
{
List *subrel_states;
check_publications_origin_sequences(wrconn, sub->publications, true,
sub->origin, NULL, 0, sub->name);
/* Get local sequence list. */
subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
foreach_ptr(SubscriptionRelState, subrel, subrel_states)
{
Oid relid = subrel->relid;
UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
InvalidXLogRecPtr, false);
ereport(DEBUG1,
errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
get_namespace_name(get_rel_namespace(relid)),
get_rel_name(relid),
sub->name));
}
}
PG_FINALLY();
{
walrcv_disconnect(wrconn);
}
PG_END_TRY();
}
/*
* Common checks for altering failover, two_phase, and retain_dead_tuples
* options.
@@ -1733,6 +1869,19 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
break;
}
case ALTER_SUBSCRIPTION_REFRESH_SEQUENCES:
{
if (!sub->enabled)
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("%s is not allowed for disabled subscriptions",
"ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
AlterSubscription_refresh_seq(sub);
break;
}
case ALTER_SUBSCRIPTION_SKIP:
{
parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
@@ -1824,9 +1973,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
if (retain_dead_tuples)
check_pub_dead_tuple_retention(wrconn);
check_publications_origin(wrconn, sub->publications, false,
retain_dead_tuples, origin, NULL, 0,
sub->name);
check_publications_origin_tables(wrconn, sub->publications, false,
retain_dead_tuples, origin, NULL, 0,
sub->name);
if (update_failover || update_two_phase)
walrcv_alter_slot(wrconn, sub->slotname,
@@ -2008,7 +2157,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* the apply and tablesync workers and they can't restart because of
* exclusive lock on the subscription.
*/
rstates = GetSubscriptionRelations(subid, true);
rstates = GetSubscriptionRelations(subid, true, false, true);
foreach(lc, rstates)
{
SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
@@ -2341,10 +2490,10 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
* - See comments atop worker.c for more details.
*/
static void
check_publications_origin(WalReceiverConn *wrconn, List *publications,
bool copydata, bool retain_dead_tuples,
char *origin, Oid *subrel_local_oids,
int subrel_count, char *subname)
check_publications_origin_tables(WalReceiverConn *wrconn, List *publications,
bool copydata, bool retain_dead_tuples,
char *origin, Oid *subrel_local_oids,
int subrel_count, char *subname)
{
WalRcvExecResult *res;
StringInfoData cmd;
@@ -2421,7 +2570,7 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
errmsg("could not receive list of replicated tables from the publisher: %s",
res->err)));
/* Process tables. */
/* Process publications. */
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
{
@@ -2482,6 +2631,114 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
walrcv_clear_result(res);
}
/*
* This function is similar to check_publications_origin_tables and serves
* same purpose for sequences.
*/
static void
check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications,
bool copydata, char *origin,
Oid *subrel_local_oids, int subrel_count,
char *subname)
{
WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
Oid tableRow[1] = {TEXTOID};
List *publist = NIL;
/*
* Enable sequence synchronization checks only when origin is 'none' , to
* ensure that sequence data from other origins is not inadvertently
* copied.
*/
if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0)
return;
initStringInfo(&cmd);
appendStringInfoString(&cmd,
"SELECT DISTINCT P.pubname AS pubname\n"
"FROM pg_publication P,\n"
" LATERAL pg_get_publication_sequences(P.pubname) GPS\n"
" JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n"
" pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
"WHERE C.oid = GPS.relid AND P.pubname IN (");
GetPublicationsStr(publications, &cmd, true);
appendStringInfoString(&cmd, ")\n");
/*
* In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
* subrel_local_oids contains the list of relations that are already
* present on the subscriber. This check should be skipped as these will
* not be re-synced.
*/
for (int i = 0; i < subrel_count; i++)
{
Oid relid = subrel_local_oids[i];
char *schemaname = get_namespace_name(get_rel_namespace(relid));
char *seqname = get_rel_name(relid);
appendStringInfo(&cmd,
"AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
schemaname, seqname);
}
res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
pfree(cmd.data);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not receive list of replicated sequences from the publisher: %s",
res->err)));
/* Process publications. */
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
{
char *pubname;
bool isnull;
pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
Assert(!isnull);
ExecClearTuple(slot);
publist = list_append_unique(publist, makeString(pubname));
}
/*
* Log a warning if the publisher has subscribed to the same sequence from
* some other publisher. We cannot know the origin of sequences data
* during the initial sync.
*/
if (publist)
{
StringInfo pubnames = makeStringInfo();
StringInfo err_msg = makeStringInfo();
StringInfo err_hint = makeStringInfo();
/* Prepare the list of publication(s) for warning message. */
GetPublicationsStr(publist, pubnames, false);
appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"),
subname);
appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher sequences did not come from other origins."));
ereport(WARNING,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg_internal("%s", err_msg->data),
errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.",
"The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.",
list_length(publist), pubnames->data),
errhint_internal("%s", err_hint->data));
}
ExecDropSingleTupleTableSlot(slot);
walrcv_clear_result(res);
}
/*
* Determine whether the retain_dead_tuples can be enabled based on the
* publisher's status.
@@ -2594,8 +2851,23 @@ CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
}
/*
* Get the list of tables which belong to specified publications on the
* publisher connection.
* Return true iff 'rv' is a member of the list.
*/
static bool
list_member_rangevar(const List *list, RangeVar *rv)
{
foreach_ptr(PublicationRelKind, relinfo, list)
{
if (equal(relinfo->rv, rv))
return true;
}
return false;
}
/*
* Get the list of tables and sequences which belong to specified publications
* on the publisher connection.
*
* Note that we don't support the case where the column list is different for
* the same table in different publications to avoid sending unwanted column
@@ -2603,15 +2875,16 @@ CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
* list and row filter are specified for different publications.
*/
static List *
fetch_table_list(WalReceiverConn *wrconn, List *publications)
fetch_relation_list(WalReceiverConn *wrconn, List *publications)
{
WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
Oid tableRow[3] = {TEXTOID, TEXTOID, InvalidOid};
List *tablelist = NIL;
Oid tableRow[4] = {TEXTOID, TEXTOID, CHAROID, InvalidOid};
List *relationlist = NIL;
int server_version = walrcv_server_version(wrconn);
bool check_columnlist = (server_version >= 150000);
int column_count = check_columnlist ? 4 : 3;
StringInfo pub_names = makeStringInfo();
initStringInfo(&cmd);
@@ -2619,10 +2892,10 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
/* Build the pub_names comma-separated string. */
GetPublicationsStr(publications, pub_names, true);
/* Get the list of tables from the publisher. */
/* Get the list of relations from the publisher */
if (server_version >= 160000)
{
tableRow[2] = INT2VECTOROID;
tableRow[3] = INT2VECTOROID;
/*
* From version 16, we allowed passing multiple publications to the
@@ -2637,19 +2910,28 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
* to worry if different publications have specified them in a
* different order. See pub_collist_validate.
*/
appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n"
" FROM pg_class c\n"
appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n"
" FROM pg_class c\n"
" JOIN pg_namespace n ON n.oid = c.relnamespace\n"
" JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
" FROM pg_publication\n"
" WHERE pubname IN ( %s )) AS gpt\n"
" ON gpt.relid = c.oid\n",
pub_names->data);
/* From version 19, inclusion of sequences in the target is supported */
if (server_version >= 190000)
appendStringInfo(&cmd,
"UNION ALL\n"
" SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n"
" FROM pg_catalog.pg_publication_sequences s\n"
" WHERE s.pubname IN ( %s )",
pub_names->data);
}
else
{
tableRow[2] = NAMEARRAYOID;
appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n");
tableRow[3] = NAMEARRAYOID;
appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n");
/* Get column lists for each relation if the publisher supports it */
if (check_columnlist)
@@ -2662,7 +2944,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
destroyStringInfo(pub_names);
res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow);
res = walrcv_exec(wrconn, cmd.data, column_count, tableRow);
pfree(cmd.data);
if (res->status != WALRCV_OK_TUPLES)
@@ -2678,22 +2960,28 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
char *nspname;
char *relname;
bool isnull;
RangeVar *rv;
char relkind;
PublicationRelKind *relinfo = palloc_object(PublicationRelKind);
nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
Assert(!isnull);
relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
Assert(!isnull);
relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
Assert(!isnull);
rv = makeRangeVar(nspname, relname, -1);
relinfo->rv = makeRangeVar(nspname, relname, -1);
relinfo->relkind = relkind;
if (check_columnlist && list_member(tablelist, rv))
if (relkind != RELKIND_SEQUENCE &&
check_columnlist &&
list_member_rangevar(relationlist, relinfo->rv))
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
nspname, relname));
else
tablelist = lappend(tablelist, rv);
relationlist = lappend(relationlist, relinfo);
ExecClearTuple(slot);
}
@@ -2701,7 +2989,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
walrcv_clear_result(res);
return tablelist;
return relationlist;
}
/*