1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-26 01:22:12 +03:00

Replicate generated columns when 'publish_generated_columns' is set.

This patch builds on the work done in commit 745217a051 by enabling the
replication of generated columns alongside regular column changes through
a new publication parameter: publish_generated_columns.

Example usage:
CREATE PUBLICATION pub1 FOR TABLE tab_gencol WITH (publish_generated_columns = true);

The column list takes precedence. If the generated columns are specified
in the column list, they will be replicated even if
'publish_generated_columns' is set to false. Conversely, if generated
columns are not included in the column list (assuming the user specifies a
column list), they will not be replicated even if
'publish_generated_columns' is true.

Author: Vignesh C, Shubham Khanna
Reviewed-by: Peter Smith, Amit Kapila, Hayato Kuroda, Shlok Kyal, Ajin Cherian, Hou Zhijie, Masahiko Sawada
Discussion: https://postgr.es/m/B80D17B2-2C8E-4C7D-87F2-E5B4BE3C069E@gmail.com
This commit is contained in:
Amit Kapila
2024-11-07 08:58:49 +05:30
parent 70291a3c66
commit 7054186c4e
20 changed files with 926 additions and 404 deletions

View File

@ -30,10 +30,11 @@
#define TRUNCATE_RESTART_SEQS (1<<1)
static void logicalrep_write_attrs(StringInfo out, Relation rel,
Bitmapset *columns);
Bitmapset *columns, bool include_gencols);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
TupleTableSlot *slot,
bool binary, Bitmapset *columns);
bool binary, Bitmapset *columns,
bool include_gencols);
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@ -399,7 +400,8 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
*/
void
logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
TupleTableSlot *newslot, bool binary, Bitmapset *columns)
TupleTableSlot *newslot, bool binary,
Bitmapset *columns, bool include_gencols)
{
pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
@ -411,7 +413,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
pq_sendint32(out, RelationGetRelid(rel));
pq_sendbyte(out, 'N'); /* new tuple follows */
logicalrep_write_tuple(out, rel, newslot, binary, columns);
logicalrep_write_tuple(out, rel, newslot, binary, columns, include_gencols);
}
/*
@ -444,7 +446,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
void
logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
TupleTableSlot *oldslot, TupleTableSlot *newslot,
bool binary, Bitmapset *columns)
bool binary, Bitmapset *columns, bool include_gencols)
{
pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
@ -465,11 +467,12 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
logicalrep_write_tuple(out, rel, oldslot, binary, columns);
logicalrep_write_tuple(out, rel, oldslot, binary, columns,
include_gencols);
}
pq_sendbyte(out, 'N'); /* new tuple follows */
logicalrep_write_tuple(out, rel, newslot, binary, columns);
logicalrep_write_tuple(out, rel, newslot, binary, columns, include_gencols);
}
/*
@ -519,7 +522,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
void
logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
TupleTableSlot *oldslot, bool binary,
Bitmapset *columns)
Bitmapset *columns, bool include_gencols)
{
Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@ -539,7 +542,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
else
pq_sendbyte(out, 'K'); /* old key follows */
logicalrep_write_tuple(out, rel, oldslot, binary, columns);
logicalrep_write_tuple(out, rel, oldslot, binary, columns, include_gencols);
}
/*
@ -655,7 +658,7 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
*/
void
logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
Bitmapset *columns)
Bitmapset *columns, bool include_gencols)
{
char *relname;
@ -677,7 +680,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
pq_sendbyte(out, rel->rd_rel->relreplident);
/* send the attribute info */
logicalrep_write_attrs(out, rel, columns);
logicalrep_write_attrs(out, rel, columns, include_gencols);
}
/*
@ -754,7 +757,7 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
*/
static void
logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
bool binary, Bitmapset *columns)
bool binary, Bitmapset *columns, bool include_gencols)
{
TupleDesc desc;
Datum *values;
@ -768,7 +771,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
{
Form_pg_attribute att = TupleDescAttr(desc, i);
if (!logicalrep_should_publish_column(att, columns))
if (!logicalrep_should_publish_column(att, columns, include_gencols))
continue;
nliveatts++;
@ -786,7 +789,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
Form_pg_type typclass;
Form_pg_attribute att = TupleDescAttr(desc, i);
if (!logicalrep_should_publish_column(att, columns))
if (!logicalrep_should_publish_column(att, columns, include_gencols))
continue;
if (isnull[i])
@ -904,7 +907,8 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
* Write relation attribute metadata to the stream.
*/
static void
logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns,
bool include_gencols)
{
TupleDesc desc;
int i;
@ -919,7 +923,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
{
Form_pg_attribute att = TupleDescAttr(desc, i);
if (!logicalrep_should_publish_column(att, columns))
if (!logicalrep_should_publish_column(att, columns, include_gencols))
continue;
nliveatts++;
@ -937,7 +941,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
Form_pg_attribute att = TupleDescAttr(desc, i);
uint8 flags = 0;
if (!logicalrep_should_publish_column(att, columns))
if (!logicalrep_should_publish_column(att, columns, include_gencols))
continue;
/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
@ -1248,29 +1252,26 @@ logicalrep_message_type(LogicalRepMsgType action)
/*
* Check if the column 'att' of a table should be published.
*
* 'columns' represents the column list specified for that table in the
* publication.
* 'columns' represents the publication column list (if any) for that table.
*
* Note that generated columns can be present only in 'columns' list.
* 'include_gencols' flag indicates whether generated columns should be
* published when there is no column list. Typically, this will have the same
* value as the 'publish_generated_columns' publication parameter.
*
* Note that generated columns can be published only when present in a
* publication column list, or when include_gencols is true.
*/
bool
logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns)
logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns,
bool include_gencols)
{
if (att->attisdropped)
return false;
/*
* Skip publishing generated columns if they are not included in the
* column list.
*/
if (!columns && att->attgenerated)
return false;
/* If a column list is provided, publish only the cols in that list. */
if (columns)
return bms_is_member(att->attnum, columns);
/*
* Check if a column is covered by a column list.
*/
if (columns && !bms_is_member(att->attnum, columns))
return false;
return true;
/* All non-generated columns are always published. */
return att->attgenerated ? include_gencols : true;
}

View File

@ -84,9 +84,6 @@ static bool publications_valid;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
static void send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx,
Bitmapset *columns);
static void send_repl_origin(LogicalDecodingContext *ctx,
RepOriginId origin_id, XLogRecPtr origin_lsn,
bool send_origin);
@ -129,6 +126,12 @@ typedef struct RelationSyncEntry
bool replicate_valid; /* overall validity flag for entry */
bool schema_sent;
/*
* This is set if the 'publish_generated_columns' parameter is true, and
* the relation contains generated columns.
*/
bool include_gencols;
List *streamed_txns; /* streamed toplevel transactions with this
* schema */
@ -213,6 +216,9 @@ static void init_rel_sync_cache(MemoryContext cachectx);
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
Relation relation);
static void send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx,
RelationSyncEntry *relentry);
static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
uint32 hashvalue);
@ -731,11 +737,11 @@ maybe_send_schema(LogicalDecodingContext *ctx,
{
Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
send_relation_and_attrs(ancestor, xid, ctx, relentry);
RelationClose(ancestor);
}
send_relation_and_attrs(relation, xid, ctx, relentry->columns);
send_relation_and_attrs(relation, xid, ctx, relentry);
if (data->in_streaming)
set_schema_sent_in_streamed_txn(relentry, topxid);
@ -749,9 +755,11 @@ maybe_send_schema(LogicalDecodingContext *ctx,
static void
send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx,
Bitmapset *columns)
RelationSyncEntry *relentry)
{
TupleDesc desc = RelationGetDescr(relation);
Bitmapset *columns = relentry->columns;
bool include_gencols = relentry->include_gencols;
int i;
/*
@ -766,7 +774,7 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
{
Form_pg_attribute att = TupleDescAttr(desc, i);
if (!logicalrep_should_publish_column(att, columns))
if (!logicalrep_should_publish_column(att, columns, include_gencols))
continue;
if (att->atttypid < FirstGenbkiObjectId)
@ -778,7 +786,7 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
}
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_rel(ctx->out, xid, relation, columns);
logicalrep_write_rel(ctx->out, xid, relation, columns, include_gencols);
OutputPluginWrite(ctx, false);
}
@ -1004,6 +1012,66 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications,
}
}
/*
 * Decide whether generated columns are to be published for this entry.
 *
 * If the relation the entry publishes as (entry->publish_as_relid) has no
 * generated column, entry->include_gencols is set to false.  Otherwise it is
 * taken from the 'publish_generated_columns' value (pub->pubgencols) of the
 * publications that have no column list for the relation, and an error is
 * raised if those publications carry conflicting values.
 *
 * Publications that do have a column list are skipped here, because the
 * column list takes precedence over 'publish_generated_columns'; those are
 * checked later, see pgoutput_column_list_init.  If every publication has a
 * column list, entry->include_gencols is left as the caller initialized it.
 *
 * 'data' is not used by this function.
 */
static void
check_and_init_gencol(PGOutputData *data, List *publications,
					  RelationSyncEntry *entry)
{
	Relation	relation = RelationIdGetRelation(entry->publish_as_relid);
	TupleDesc	desc = RelationGetDescr(relation);
	bool		gencolpresent = false;
	bool		first = true;

	/* Check if there is any generated column present. */
	for (int i = 0; i < desc->natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(desc, i);

		if (att->attgenerated)
		{
			gencolpresent = true;
			break;
		}
	}

	/* There are no generated columns to be published. */
	if (!gencolpresent)
	{
		entry->include_gencols = false;

		/* Release the reference taken by RelationIdGetRelation above. */
		RelationClose(relation);
		return;
	}

	/*
	 * There may be a conflicting value for 'publish_generated_columns'
	 * parameter in the publications.
	 */
	foreach_ptr(Publication, pub, publications)
	{
		/*
		 * The column list takes precedence over the
		 * 'publish_generated_columns' parameter. Those will be checked later,
		 * see pgoutput_column_list_init.
		 */
		if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
			continue;

		if (first)
		{
			/* First publication without a column list sets the value. */
			entry->include_gencols = pub->pubgencols;
			first = false;
		}
		else if (entry->include_gencols != pub->pubgencols)
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
						   get_namespace_name(RelationGetNamespace(relation)),
						   RelationGetRelationName(relation)));
	}

	/*
	 * Done with the relation; release the reference on the normal exit path
	 * too, matching how the other RelationIdGetRelation callers in this file
	 * pair it with RelationClose.
	 */
	RelationClose(relation);
}
/*
* Initialize the column list.
*/
@ -1014,6 +1082,10 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
ListCell *lc;
bool first = true;
Relation relation = RelationIdGetRelation(entry->publish_as_relid);
bool found_pub_collist = false;
Bitmapset *relcols = NULL;
pgoutput_ensure_entry_cxt(data, entry);
/*
* Find if there are any column lists for this relation. If there are,
@ -1027,93 +1099,39 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
* fetch_table_list. But one can later change the publication so we still
* need to check all the given publication-table mappings and report an
* error if any publications have a different column list.
*
* FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use column list".
*/
foreach(lc, publications)
{
Publication *pub = lfirst(lc);
HeapTuple cftuple = NULL;
Datum cfdatum = 0;
Bitmapset *cols = NULL;
/* Retrieve the bitmap of columns for a column list publication. */
found_pub_collist |= check_and_fetch_column_list(pub,
entry->publish_as_relid,
entry->entry_cxt, &cols);
/*
* If the publication is FOR ALL TABLES then it is treated the same as
* if there are no column lists (even if other publications have a
* list).
* For non-column list publications — e.g. TABLE (without a column
* list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
* of the table (including generated columns when
* 'publish_generated_columns' parameter is true).
*/
if (!pub->alltables)
if (!cols)
{
bool pub_no_list = true;
/*
* Check for the presence of a column list in this publication.
*
* Note: If we find no pg_publication_rel row, it's a publication
* defined for a whole schema, so it can't have a column list,
* just like a FOR ALL TABLES publication.
* Cache the table columns for the first publication with no
* specified column list to detect publication with a different
* column list.
*/
cftuple = SearchSysCache2(PUBLICATIONRELMAP,
ObjectIdGetDatum(entry->publish_as_relid),
ObjectIdGetDatum(pub->oid));
if (HeapTupleIsValid(cftuple))
if (!relcols && (list_length(publications) > 1))
{
/* Lookup the column list attribute. */
cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
Anum_pg_publication_rel_prattrs,
&pub_no_list);
MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);
/* Build the column list bitmap in the per-entry context. */
if (!pub_no_list) /* when not null */
{
int i;
int nliveatts = 0;
TupleDesc desc = RelationGetDescr(relation);
bool att_gen_present = false;
pgoutput_ensure_entry_cxt(data, entry);
cols = pub_collist_to_bitmapset(cols, cfdatum,
entry->entry_cxt);
/* Get the number of live attributes. */
for (i = 0; i < desc->natts; i++)
{
Form_pg_attribute att = TupleDescAttr(desc, i);
if (att->attisdropped)
continue;
if (att->attgenerated)
{
/*
* Generated cols are skipped unless they are
* present in a column list.
*/
if (!bms_is_member(att->attnum, cols))
continue;
att_gen_present = true;
}
nliveatts++;
}
/*
* Generated attributes are published only when they are
* present in the column list. Otherwise, a NULL column
* list means publish all columns.
*/
if (!att_gen_present && bms_num_members(cols) == nliveatts)
{
bms_free(cols);
cols = NULL;
}
}
ReleaseSysCache(cftuple);
relcols = pub_form_cols_map(relation, entry->include_gencols);
MemoryContextSwitchTo(oldcxt);
}
cols = relcols;
}
if (first)
@ -1129,6 +1147,13 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
RelationGetRelationName(relation)));
} /* loop all subscribed publications */
/*
* If no column list publications exist, columns to be published will be
* computed later according to the 'publish_generated_columns' parameter.
*/
if (!found_pub_collist)
entry->columns = NULL;
RelationClose(relation);
}
@ -1541,15 +1566,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{
case REORDER_BUFFER_CHANGE_INSERT:
logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
data->binary, relentry->columns);
data->binary, relentry->columns,
relentry->include_gencols);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
new_slot, data->binary, relentry->columns);
new_slot, data->binary, relentry->columns,
relentry->include_gencols);
break;
case REORDER_BUFFER_CHANGE_DELETE:
logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
data->binary, relentry->columns);
data->binary, relentry->columns,
relentry->include_gencols);
break;
default:
Assert(false);
@ -2000,6 +2028,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
{
entry->replicate_valid = false;
entry->schema_sent = false;
entry->include_gencols = false;
entry->streamed_txns = NIL;
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
@ -2052,6 +2081,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
* earlier definition.
*/
entry->schema_sent = false;
entry->include_gencols = false;
list_free(entry->streamed_txns);
entry->streamed_txns = NIL;
bms_free(entry->columns);
@ -2223,6 +2253,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
/* Initialize the row filter */
pgoutput_row_filter_init(data, rel_publications, entry);
/* Check whether to publish generated columns. */
check_and_init_gencol(data, rel_publications, entry);
/* Initialize the column list */
pgoutput_column_list_init(data, rel_publications, entry);
}