1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-24 14:22:24 +03:00

Change publication's publish_generated_columns option type to enum.

The current boolean publish_generated_columns option only supports a
binary choice, which is insufficient for future enhancements where
generated columns can be of different types (e.g., stored or virtual). The
supported values for the publish_generated_columns option are 'none' and
'stored'.

Author: Vignesh C <vignesh21@gmail.com>
Reviewed-by: Peter Smith <smithpb2250@gmail.com>
Reviewed-by: Peter Eisentraut <peter@eisentraut.org>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/d718d219-dd47-4a33-bb97-56e8fc4da994@eisentraut.org
Discussion: https://postgr.es/m/B80D17B2-2C8E-4C7D-87F2-E5B4BE3C069E@gmail.com
This commit is contained in:
Amit Kapila
2025-01-23 15:28:37 +05:30
parent eef4a33f62
commit e65dbc9927
19 changed files with 394 additions and 230 deletions

View File

@ -30,11 +30,12 @@
#define TRUNCATE_RESTART_SEQS (1<<1)
static void logicalrep_write_attrs(StringInfo out, Relation rel,
Bitmapset *columns, bool include_gencols);
Bitmapset *columns,
PublishGencolsType include_gencols_type);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
TupleTableSlot *slot,
bool binary, Bitmapset *columns,
bool include_gencols);
PublishGencolsType include_gencols_type);
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@ -401,7 +402,8 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
void
logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
TupleTableSlot *newslot, bool binary,
Bitmapset *columns, bool include_gencols)
Bitmapset *columns,
PublishGencolsType include_gencols_type)
{
pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
@ -413,7 +415,8 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
pq_sendint32(out, RelationGetRelid(rel));
pq_sendbyte(out, 'N'); /* new tuple follows */
logicalrep_write_tuple(out, rel, newslot, binary, columns, include_gencols);
logicalrep_write_tuple(out, rel, newslot, binary, columns,
include_gencols_type);
}
/*
@ -446,7 +449,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
void
logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
TupleTableSlot *oldslot, TupleTableSlot *newslot,
bool binary, Bitmapset *columns, bool include_gencols)
bool binary, Bitmapset *columns,
PublishGencolsType include_gencols_type)
{
pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
@ -468,11 +472,12 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
else
pq_sendbyte(out, 'K'); /* old key follows */
logicalrep_write_tuple(out, rel, oldslot, binary, columns,
include_gencols);
include_gencols_type);
}
pq_sendbyte(out, 'N'); /* new tuple follows */
logicalrep_write_tuple(out, rel, newslot, binary, columns, include_gencols);
logicalrep_write_tuple(out, rel, newslot, binary, columns,
include_gencols_type);
}
/*
@ -522,7 +527,8 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
void
logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
TupleTableSlot *oldslot, bool binary,
Bitmapset *columns, bool include_gencols)
Bitmapset *columns,
PublishGencolsType include_gencols_type)
{
Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@ -542,7 +548,8 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
else
pq_sendbyte(out, 'K'); /* old key follows */
logicalrep_write_tuple(out, rel, oldslot, binary, columns, include_gencols);
logicalrep_write_tuple(out, rel, oldslot, binary, columns,
include_gencols_type);
}
/*
@ -658,7 +665,8 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
*/
void
logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
Bitmapset *columns, bool include_gencols)
Bitmapset *columns,
PublishGencolsType include_gencols_type)
{
char *relname;
@ -680,7 +688,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
pq_sendbyte(out, rel->rd_rel->relreplident);
/* send the attribute info */
logicalrep_write_attrs(out, rel, columns, include_gencols);
logicalrep_write_attrs(out, rel, columns, include_gencols_type);
}
/*
@ -757,7 +765,8 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
*/
static void
logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
bool binary, Bitmapset *columns, bool include_gencols)
bool binary, Bitmapset *columns,
PublishGencolsType include_gencols_type)
{
TupleDesc desc;
Datum *values;
@ -771,7 +780,8 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
{
Form_pg_attribute att = TupleDescAttr(desc, i);
if (!logicalrep_should_publish_column(att, columns, include_gencols))
if (!logicalrep_should_publish_column(att, columns,
include_gencols_type))
continue;
nliveatts++;
@ -789,7 +799,8 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
Form_pg_type typclass;
Form_pg_attribute att = TupleDescAttr(desc, i);
if (!logicalrep_should_publish_column(att, columns, include_gencols))
if (!logicalrep_should_publish_column(att, columns,
include_gencols_type))
continue;
if (isnull[i])
@ -908,7 +919,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
*/
static void
logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns,
bool include_gencols)
PublishGencolsType include_gencols_type)
{
TupleDesc desc;
int i;
@ -923,7 +934,8 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns,
{
Form_pg_attribute att = TupleDescAttr(desc, i);
if (!logicalrep_should_publish_column(att, columns, include_gencols))
if (!logicalrep_should_publish_column(att, columns,
include_gencols_type))
continue;
nliveatts++;
@ -941,7 +953,8 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns,
Form_pg_attribute att = TupleDescAttr(desc, i);
uint8 flags = 0;
if (!logicalrep_should_publish_column(att, columns, include_gencols))
if (!logicalrep_should_publish_column(att, columns,
include_gencols_type))
continue;
/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
@ -1254,16 +1267,17 @@ logicalrep_message_type(LogicalRepMsgType action)
*
* 'columns' represents the publication column list (if any) for that table.
*
* 'include_gencols' flag indicates whether generated columns should be
* 'include_gencols_type' value indicates whether generated columns should be
* published when there is no column list. Typically, this will have the same
* value as the 'publish_generated_columns' publication parameter.
*
* Note that generated columns can be published only when present in a
* publication column list, or when include_gencols is true.
* publication column list, or when include_gencols_type is
* PUBLISH_GENCOLS_STORED.
*/
bool
logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns,
bool include_gencols)
PublishGencolsType include_gencols_type)
{
if (att->attisdropped)
return false;
@ -1273,5 +1287,15 @@ logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns,
return bms_is_member(att->attnum, columns);
/* All non-generated columns are always published. */
return att->attgenerated ? include_gencols : true;
if (!att->attgenerated)
return true;
/*
* Stored generated columns are only published when the user sets
* publish_generated_columns as stored.
*/
if (att->attgenerated == ATTRIBUTE_GENERATED_STORED)
return include_gencols_type == PUBLISH_GENCOLS_STORED;
return false;
}

View File

@ -128,10 +128,13 @@ typedef struct RelationSyncEntry
bool schema_sent;
/*
* This is set if the 'publish_generated_columns' parameter is true, and
* the relation contains generated columns.
* This will be PUBLISH_GENCOLS_STORED if the relation contains generated
* columns and the 'publish_generated_columns' parameter is set to
* PUBLISH_GENCOLS_STORED. Otherwise, it will be PUBLISH_GENCOLS_NONE,
* indicating that no generated columns should be published, unless
* explicitly specified in the column list.
*/
bool include_gencols;
PublishGencolsType include_gencols_type;
List *streamed_txns; /* streamed toplevel transactions with this
* schema */
@ -763,7 +766,7 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
{
TupleDesc desc = RelationGetDescr(relation);
Bitmapset *columns = relentry->columns;
bool include_gencols = relentry->include_gencols;
PublishGencolsType include_gencols_type = relentry->include_gencols_type;
int i;
/*
@ -778,7 +781,8 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
{
Form_pg_attribute att = TupleDescAttr(desc, i);
if (!logicalrep_should_publish_column(att, columns, include_gencols))
if (!logicalrep_should_publish_column(att, columns,
include_gencols_type))
continue;
if (att->atttypid < FirstGenbkiObjectId)
@ -790,7 +794,8 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
}
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_rel(ctx->out, xid, relation, columns, include_gencols);
logicalrep_write_rel(ctx->out, xid, relation, columns,
include_gencols_type);
OutputPluginWrite(ctx, false);
}
@ -1044,7 +1049,7 @@ check_and_init_gencol(PGOutputData *data, List *publications,
/* There are no generated columns to be published. */
if (!gencolpresent)
{
entry->include_gencols = false;
entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
return;
}
@ -1064,10 +1069,10 @@ check_and_init_gencol(PGOutputData *data, List *publications,
if (first)
{
entry->include_gencols = pub->pubgencols;
entry->include_gencols_type = pub->pubgencols_type;
first = false;
}
else if (entry->include_gencols != pub->pubgencols)
else if (entry->include_gencols_type != pub->pubgencols_type)
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
@ -1131,7 +1136,8 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
{
MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);
relcols = pub_form_cols_map(relation, entry->include_gencols);
relcols = pub_form_cols_map(relation,
entry->include_gencols_type);
MemoryContextSwitchTo(oldcxt);
}
@ -1571,17 +1577,17 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_INSERT:
logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
data->binary, relentry->columns,
relentry->include_gencols);
relentry->include_gencols_type);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
new_slot, data->binary, relentry->columns,
relentry->include_gencols);
relentry->include_gencols_type);
break;
case REORDER_BUFFER_CHANGE_DELETE:
logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
data->binary, relentry->columns,
relentry->include_gencols);
relentry->include_gencols_type);
break;
default:
Assert(false);
@ -2032,7 +2038,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
{
entry->replicate_valid = false;
entry->schema_sent = false;
entry->include_gencols = false;
entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
entry->streamed_txns = NIL;
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
@ -2082,7 +2088,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
* earlier definition.
*/
entry->schema_sent = false;
entry->include_gencols = false;
entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
list_free(entry->streamed_txns);
entry->streamed_txns = NIL;
bms_free(entry->columns);