diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 2b1356ee249..04b4f494bb9 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -548,6 +548,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) /* Allocate space for per-column values; zero out unused StringInfoDatas */ tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData)); tuple->colstatus = (char *) palloc(natts * sizeof(char)); + tuple->ncols = natts; /* Read the data */ for (i = 0; i < natts; i++) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 407eee3c0bc..2fcf2e61bc3 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -354,6 +354,8 @@ slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, { StringInfo colvalue = &tupleData->colvalues[remoteattnum]; + Assert(remoteattnum < tupleData->ncols); + errarg.local_attnum = i; errarg.remote_attnum = remoteattnum; @@ -477,6 +479,8 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, if (remoteattnum < 0) continue; + Assert(remoteattnum < tupleData->ncols); + if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED) { StringInfo colvalue = &tupleData->colvalues[remoteattnum]; @@ -831,9 +835,17 @@ apply_handle_update(StringInfo s) target_rte = list_nth(estate->es_range_table, 0); for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++) { - if (newtup.colstatus[i] != LOGICALREP_COLUMN_UNCHANGED) - target_rte->updatedCols = bms_add_member(target_rte->updatedCols, - i + 1 - FirstLowInvalidHeapAttributeNumber); + Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i); + int remoteattnum = rel->attrmap->attnums[i]; + + if (!att->attisdropped && remoteattnum >= 0) + { + Assert(remoteattnum < newtup.ncols); + if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED) + target_rte->updatedCols = + bms_add_member(target_rte->updatedCols, + i + 1 - FirstLowInvalidHeapAttributeNumber); + } } fill_extraUpdatedCols(target_rte, RelationGetDescr(rel->localrel)); diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 287288ab415..60a76bc85cf 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -27,13 +27,18 @@ #define LOGICALREP_PROTO_MIN_VERSION_NUM 1 #define LOGICALREP_PROTO_VERSION_NUM 1 -/* Tuple coming via logical replication. */ +/* + * This struct stores a tuple received via logical replication. + * Keep in mind that the columns correspond to the *remote* table. + */ typedef struct LogicalRepTupleData { /* Array of StringInfos, one per column; some may be unused */ StringInfoData *colvalues; /* Array of markers for null/unchanged/text/binary, one per column */ char *colstatus; + /* Length of above arrays */ + int ncols; } LogicalRepTupleData; /* Possible values for LogicalRepTupleData.colstatus[colnum] */ diff --git a/src/test/subscription/t/003_constraints.pl b/src/test/subscription/t/003_constraints.pl index 3a590f871a5..9f140b552b4 100644 --- a/src/test/subscription/t/003_constraints.pl +++ b/src/test/subscription/t/003_constraints.pl @@ -19,14 +19,14 @@ $node_subscriber->start; $node_publisher->safe_psql('postgres', "CREATE TABLE tab_fk (bid int PRIMARY KEY);"); $node_publisher->safe_psql('postgres', - "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid));" + "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, junk text, bid int REFERENCES tab_fk (bid));" ); -# Setup structure on subscriber +# Setup structure on subscriber; column order intentionally different $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_fk (bid int PRIMARY KEY);"); $node_subscriber->safe_psql('postgres', - "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid));" + "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid), junk text);" ); # Setup logical replication @@ -42,8 +42,10 @@ $node_publisher->wait_for_catchup('tap_sub'); $node_publisher->safe_psql('postgres', "INSERT INTO tab_fk (bid) VALUES (1);"); +# "junk" value is meant to be large enough to force out-of-line storage $node_publisher->safe_psql('postgres', - "INSERT INTO tab_fk_ref (id, bid) VALUES (1, 1);"); + "INSERT INTO tab_fk_ref (id, bid, junk) VALUES (1, 1, repeat(pi()::text,20000));" +); $node_publisher->wait_for_catchup('tap_sub'); @@ -128,7 +130,7 @@ $node_publisher->wait_for_catchup('tap_sub'); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(id), max(id) FROM tab_fk_ref;"); is($result, qq(2|1|2), - 'check column trigger applied on even for other column'); + 'check column trigger applied even on update for other column'); $node_subscriber->stop('fast'); $node_publisher->stop('fast');