1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-26 01:22:12 +03:00

Prohibit combining publications with different column lists.

Currently, we simply combine the column lists when publishing tables on
multiple publications and that can sometimes lead to unexpected behavior.
Say, if a column is published in any row-filtered publication, then the
values for that column are sent to the subscriber even for rows that don't
match the row filter, as long as the row matches the row filter for any
other publication, even if that other publication doesn't include the
column.

The main purpose of introducing a column list is to have statically
different shapes on publisher and subscriber or hide sensitive column
data. In both cases, it doesn't seem to make sense to combine column
lists.

So, we disallow the cases where the column list is different for the same
table when combining publications. It can be later extended to combine the
column lists for selective cases where required.

Reported-by: Alvaro Herrera
Author: Hou Zhijie
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/202204251548.mudq7jbqnh7r@alvherre.pgsql
This commit is contained in:
Amit Kapila
2022-06-02 08:31:50 +05:30
parent 99f6f19799
commit fd0b9dcebd
6 changed files with 181 additions and 166 deletions

View File

@ -1754,6 +1754,11 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
/*
* Get the list of tables which belong to specified publications on the
* publisher connection.
*
* Note that we don't support the case where the column list is different for
* the same table in different publications to avoid sending unwanted column
* information for some of the rows. This can happen when both the column
* list and row filter are specified for different publications.
*/
static List *
fetch_table_list(WalReceiverConn *wrconn, List *publications)
@ -1761,17 +1766,23 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
Oid tableRow[2] = {TEXTOID, TEXTOID};
Oid tableRow[3] = {TEXTOID, TEXTOID, NAMEARRAYOID};
List *tablelist = NIL;
bool check_columnlist = (walrcv_server_version(wrconn) >= 150000);
initStringInfo(&cmd);
appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
" FROM pg_catalog.pg_publication_tables t\n"
appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n");
/* Get column lists for each relation if the publisher supports it */
if (check_columnlist)
appendStringInfoString(&cmd, ", t.attnames\n");
appendStringInfoString(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
" WHERE t.pubname IN (");
get_publications_str(publications, &cmd, true);
appendStringInfoChar(&cmd, ')');
res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow);
pfree(cmd.data);
if (res->status != WALRCV_OK_TUPLES)
@ -1795,7 +1806,14 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
Assert(!isnull);
rv = makeRangeVar(nspname, relname, -1);
tablelist = lappend(tablelist, rv);
if (check_columnlist && list_member(tablelist, rv))
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
nspname, relname));
else
tablelist = lappend(tablelist, rv);
ExecClearTuple(slot);
}

View File

@ -753,17 +753,6 @@ fetch_remote_table_info(char *nspname, char *relname,
/*
* Get column lists for each relation.
*
* For initial synchronization, column lists can be ignored in following
* cases:
*
* 1) one of the subscribed publications for the table hasn't specified
* any column list
*
* 2) one of the subscribed publications has puballtables set to true
*
* 3) one of the subscribed publications is declared as ALL TABLES IN
* SCHEMA that includes this relation
*
* We need to do this before fetching info about column names and types,
* so that we can skip columns that should not be replicated.
*/
@ -771,7 +760,7 @@ fetch_remote_table_info(char *nspname, char *relname,
{
WalRcvExecResult *pubres;
TupleTableSlot *slot;
Oid attrsRow[] = {INT2OID};
Oid attrsRow[] = {INT2VECTOROID};
StringInfoData pub_names;
bool first = true;
@ -786,19 +775,17 @@ fetch_remote_table_info(char *nspname, char *relname,
/*
* Fetch info about column lists for the relation (from all the
* publications). We unnest the int2vector values, because that makes
* it easier to combine lists by simply adding the attnums to a new
* bitmap (without having to parse the int2vector data). This
* preserves NULL values, so that if one of the publications has no
* column list, we'll know that.
* publications).
*/
resetStringInfo(&cmd);
appendStringInfo(&cmd,
"SELECT DISTINCT unnest"
"SELECT DISTINCT"
" (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
" THEN NULL ELSE gpt.attrs END)"
" FROM pg_publication p,"
" LATERAL pg_get_publication_tables(p.pubname) gpt"
" LEFT OUTER JOIN unnest(gpt.attrs) ON TRUE"
" WHERE gpt.relid = %u"
" LATERAL pg_get_publication_tables(p.pubname) gpt,"
" pg_class c"
" WHERE gpt.relid = %u AND c.oid = gpt.relid"
" AND p.pubname IN ( %s )",
lrel->remoteid,
pub_names.data);
@ -813,26 +800,43 @@ fetch_remote_table_info(char *nspname, char *relname,
nspname, relname, pubres->err)));
/*
* Merge the column lists (from different publications) by creating a
* single bitmap with all the attnums. If we find a NULL value, that
* means one of the publications has no column list for the table
* we're syncing.
* We don't support the case where the column list is different for
* the same table when combining publications. See comments atop
* fetch_table_list. So there should be only one row returned.
* Although we already checked this when creating the subscription, we
* still need to check here in case the column list was changed after
* creating the subscription and before the sync worker is started.
*/
if (tuplestore_tuple_count(pubres->tuplestore) > 1)
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
nspname, relname));
/*
* Get the column list and build a single bitmap with the attnums.
*
* If we find a NULL value, it means all the columns should be
* replicated.
*/
slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot))
if (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot))
{
Datum cfval = slot_getattr(slot, 1, &isnull);
/* NULL means empty column list, so we're done. */
if (isnull)
if (!isnull)
{
bms_free(included_cols);
included_cols = NULL;
break;
}
ArrayType *arr;
int nelems;
int16 *elems;
included_cols = bms_add_member(included_cols,
DatumGetInt16(cfval));
arr = DatumGetArrayTypeP(cfval);
nelems = ARR_DIMS(arr)[0];
elems = (int16 *) ARR_DATA_PTR(arr);
for (natt = 0; natt < nelems; natt++)
included_cols = bms_add_member(included_cols, elems[natt]);
}
ExecClearTuple(slot);
}

View File

@ -979,30 +979,31 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
RelationSyncEntry *entry)
{
ListCell *lc;
bool first = true;
Relation relation = RelationIdGetRelation(entry->publish_as_relid);
/*
* Find if there are any column lists for this relation. If there are,
* build a bitmap merging all the column lists.
*
* All the given publication-table mappings must be checked.
* build a bitmap using the column lists.
*
* Multiple publications might have multiple column lists for this
* relation.
*
* Note that we don't support the case where the column list is different
* for the same table when combining publications. See comments atop
* fetch_table_list. But one can later change the publication so we still
* need to check all the given publication-table mappings and report an
* error if any publications have a different column list.
*
* FOR ALL TABLES and FOR ALL TABLES IN SCHEMA implies "don't use column
* list" so it takes precedence.
* list".
*/
foreach(lc, publications)
{
Publication *pub = lfirst(lc);
HeapTuple cftuple = NULL;
Datum cfdatum = 0;
/*
* Assume there's no column list. Only if we find pg_publication_rel
* entry with a column list we'll switch it to false.
*/
bool pub_no_list = true;
Bitmapset *cols = NULL;
/*
* If the publication is FOR ALL TABLES then it is treated the same as
@ -1011,6 +1012,8 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
*/
if (!pub->alltables)
{
bool pub_no_list = true;
/*
* Check for the presence of a column list in this publication.
*
@ -1024,51 +1027,48 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
if (HeapTupleIsValid(cftuple))
{
/*
* Lookup the column list attribute.
*
* Note: We update the pub_no_list value directly, because if
* the value is NULL, we have no list (and vice versa).
*/
/* Lookup the column list attribute. */
cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
Anum_pg_publication_rel_prattrs,
&pub_no_list);
/*
* Build the column list bitmap in the per-entry context.
*
* We need to merge column lists from all publications, so we
* update the same bitmapset. If the column list is null, we
* interpret it as replicating all columns.
*/
/* Build the column list bitmap in the per-entry context. */
if (!pub_no_list) /* when not null */
{
pgoutput_ensure_entry_cxt(data, entry);
entry->columns = pub_collist_to_bitmapset(entry->columns,
cfdatum,
entry->entry_cxt);
cols = pub_collist_to_bitmapset(cols, cfdatum,
entry->entry_cxt);
/*
* If column list includes all the columns of the table,
* set it to NULL.
*/
if (bms_num_members(cols) == RelationGetNumberOfAttributes(relation))
{
bms_free(cols);
cols = NULL;
}
}
ReleaseSysCache(cftuple);
}
}
/*
* Found a publication with no column list, so we're done. But first
* discard column list we might have from preceding publications.
*/
if (pub_no_list)
if (first)
{
if (cftuple)
ReleaseSysCache(cftuple);
bms_free(entry->columns);
entry->columns = NULL;
break;
entry->columns = cols;
first = false;
}
ReleaseSysCache(cftuple);
else if (!bms_equal(entry->columns, cols))
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
get_namespace_name(RelationGetNamespace(relation)),
RelationGetRelationName(relation)));
} /* loop all subscribed publications */
RelationClose(relation);
}
/*