From d5daae47db5e8a61ce6ed7afaa3e3a99af108c06 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Mon, 20 Jul 2020 13:40:16 -0400 Subject: [PATCH] Fix construction of updated-columns bitmap in logical replication. Commit b9c130a1f failed to apply the publisher-to-subscriber column mapping while checking which columns were updated. Perhaps less significantly, it didn't exclude dropped columns either. This could result in an incorrect updated-columns bitmap and thus wrong decisions about whether to fire column-specific triggers on the subscriber while applying updates. In HEAD (since commit 9de77b545), it could also result in accesses off the end of the colstatus array, as detected by buildfarm member skink. Fix the logic, and adjust 003_constraints.pl so that the problem is exposed in unpatched code. In HEAD, also add some assertions to check that we don't access off the ends of these newly variable-sized arrays. Back-patch to v10, as b9c130a1f was. Discussion: https://postgr.es/m/CAH2-Wz=79hKQ4++c5A060RYbjTHgiYTHz=fw6mptCtgghH2gJA@mail.gmail.com --- src/backend/replication/logical/proto.c | 1 + src/backend/replication/logical/worker.c | 18 +++++++++++++++--- src/include/replication/logicalproto.h | 7 ++++++- src/test/subscription/t/003_constraints.pl | 12 +++++++----- 4 files changed, 29 insertions(+), 9 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 2b1356ee249..04b4f494bb9 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -548,6 +548,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) /* Allocate space for per-column values; zero out unused StringInfoDatas */ tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData)); tuple->colstatus = (char *) palloc(natts * sizeof(char)); + tuple->ncols = natts; /* Read the data */ for (i = 0; i < natts; i++) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 407eee3c0bc..2fcf2e61bc3 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -354,6 +354,8 @@ slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, { StringInfo colvalue = &tupleData->colvalues[remoteattnum]; + Assert(remoteattnum < tupleData->ncols); + errarg.local_attnum = i; errarg.remote_attnum = remoteattnum; @@ -477,6 +479,8 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, if (remoteattnum < 0) continue; + Assert(remoteattnum < tupleData->ncols); + if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED) { StringInfo colvalue = &tupleData->colvalues[remoteattnum]; @@ -831,9 +835,17 @@ apply_handle_update(StringInfo s) target_rte = list_nth(estate->es_range_table, 0); for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++) { - if (newtup.colstatus[i] != LOGICALREP_COLUMN_UNCHANGED) - target_rte->updatedCols = bms_add_member(target_rte->updatedCols, - i + 1 - FirstLowInvalidHeapAttributeNumber); + Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i); + int remoteattnum = rel->attrmap->attnums[i]; + + if (!att->attisdropped && remoteattnum >= 0) + { + Assert(remoteattnum < newtup.ncols); + if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED) + target_rte->updatedCols = + bms_add_member(target_rte->updatedCols, + i + 1 - FirstLowInvalidHeapAttributeNumber); + } } fill_extraUpdatedCols(target_rte, RelationGetDescr(rel->localrel)); diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 287288ab415..60a76bc85cf 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -27,13 +27,18 @@ #define LOGICALREP_PROTO_MIN_VERSION_NUM 1 #define LOGICALREP_PROTO_VERSION_NUM 1 -/* Tuple coming via logical replication. */ +/* + * This struct stores a tuple received via logical replication. + * Keep in mind that the columns correspond to the *remote* table. + */ typedef struct LogicalRepTupleData { /* Array of StringInfos, one per column; some may be unused */ StringInfoData *colvalues; /* Array of markers for null/unchanged/text/binary, one per column */ char *colstatus; + /* Length of above arrays */ + int ncols; } LogicalRepTupleData; /* Possible values for LogicalRepTupleData.colstatus[colnum] */ diff --git a/src/test/subscription/t/003_constraints.pl b/src/test/subscription/t/003_constraints.pl index 3a590f871a5..9f140b552b4 100644 --- a/src/test/subscription/t/003_constraints.pl +++ b/src/test/subscription/t/003_constraints.pl @@ -19,14 +19,14 @@ $node_subscriber->start; $node_publisher->safe_psql('postgres', "CREATE TABLE tab_fk (bid int PRIMARY KEY);"); $node_publisher->safe_psql('postgres', - "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid));" + "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, junk text, bid int REFERENCES tab_fk (bid));" ); -# Setup structure on subscriber +# Setup structure on subscriber; column order intentionally different $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_fk (bid int PRIMARY KEY);"); $node_subscriber->safe_psql('postgres', - "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid));" + "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid), junk text);" ); # Setup logical replication @@ -42,8 +42,10 @@ $node_publisher->wait_for_catchup('tap_sub'); $node_publisher->safe_psql('postgres', "INSERT INTO tab_fk (bid) VALUES (1);"); +# "junk" value is meant to be large enough to force out-of-line storage $node_publisher->safe_psql('postgres', - "INSERT INTO tab_fk_ref (id, bid) VALUES (1, 1);"); + "INSERT INTO tab_fk_ref (id, bid, junk) VALUES (1, 1, repeat(pi()::text,20000));" +); $node_publisher->wait_for_catchup('tap_sub'); @@ -128,7 +130,7 @@ $node_publisher->wait_for_catchup('tap_sub'); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(id), max(id) FROM tab_fk_ref;"); is($result, qq(2|1|2), - 'check column trigger applied on even for other column'); + 'check column trigger applied even on update for other column'); $node_subscriber->stop('fast'); $node_publisher->stop('fast');