From ebf87c019c042a33cf9d2810c1fe360ddc7f8e93 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Fri, 2 Dec 2022 10:34:16 +0530 Subject: [PATCH] Fix incorrect output from pgoutput when using column lists. For Updates and Deletes, we were not honoring the columns list for old tuple values while sending tuple data via pgoutput. This results in pgoutput emitting more columns than expected. This is not a problem for built-in logical replication as we simply ignore additional columns based on the relation information sent previously which didn't have those columns. However, some other users of pgoutput plugin may expect the columns as per the column list. Also, sending extra columns unnecessarily consumes network bandwidth defeating the purpose of the column list feature. Reported-by: Gunnar Morling Author: Hou Zhijie Reviewed-by: Amit Kapila Backpatch-through: 15 Discussion: https://postgr.es/m/CADGJaX9kiRZ-OH0EpWF5Fkyh1ZZYofoNRCrhapBfdk02tj5EKg@mail.gmail.com --- src/backend/replication/logical/proto.c | 7 +++-- src/backend/replication/pgoutput/pgoutput.c | 6 ++-- src/include/replication/logicalproto.h | 2 +- src/test/subscription/t/031_column_list.pl | 33 +++++++++++++++++++++ 4 files changed, 42 insertions(+), 6 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index ff8513e2d29..f5f2bc24d8f 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -478,7 +478,7 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldslot, binary, NULL); + logicalrep_write_tuple(out, rel, oldslot, binary, columns); } pq_sendbyte(out, 'N'); /* new tuple follows */ @@ -531,7 +531,8 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, */ void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, - TupleTableSlot *oldslot, bool binary) + TupleTableSlot *oldslot, bool binary, + Bitmapset *columns) { Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -551,7 +552,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldslot, binary, NULL); + logicalrep_write_tuple(out, rel, oldslot, binary, columns); } /* diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 90228fdd86a..a059315c4f9 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1513,7 +1513,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, break; case REORDER_BUFFER_CHANGE_DELETE: logicalrep_write_delete(ctx->out, xid, targetrel, - old_slot, data->binary); + old_slot, data->binary, + relentry->columns); break; default: Assert(false); @@ -1559,7 +1560,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginPrepareWrite(ctx, true); logicalrep_write_delete(ctx->out, xid, targetrel, - old_slot, data->binary); + old_slot, data->binary, + relentry->columns); OutputPluginWrite(ctx, true); } else diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index a771ab8ff33..04e1cd89cf7 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -220,7 +220,7 @@ extern LogicalRepRelId logicalrep_read_update(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldtuple, - bool binary); + bool binary, Bitmapset *columns); extern LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup); extern void logicalrep_write_truncate(StringInfo out, TransactionId xid, diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl index ae022faa785..2ca120f7a47 100644 --- a/src/test/subscription/t/031_column_list.pl +++ b/src/test/subscription/t/031_column_list.pl @@ -1151,6 +1151,39 @@ is( $node_subscriber->safe_psql( 4||), 'publication containing both parent and child relation'); +# TEST: Only columns in the column list should exist in the old tuple of UPDATE +# and DELETE. + +$node_publisher->safe_psql( + 'postgres', qq( + CREATE TABLE test_oldtuple_col (a int PRIMARY KEY, b int, c int); + CREATE PUBLICATION pub_check_oldtuple FOR TABLE test_oldtuple_col (a, b); + INSERT INTO test_oldtuple_col VALUES(1, 2, 3); + SELECT * FROM pg_create_logical_replication_slot('test_slot', 'pgoutput'); + UPDATE test_oldtuple_col SET a = 2; + DELETE FROM test_oldtuple_col; +)); + + +# Check at 7th byte of binary data for the number of columns in the old tuple. +# +# 7 = 1 (count from 1) + 1 byte (message type) + 4 byte (relid) + 1 byte (flag +# for old key). +# +# The message type of UPDATE is 85('U'). +# The message type of DELETE is 68('D'). +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT substr(data, 7, 2) = int2send(2::smallint) + FROM pg_logical_slot_peek_binary_changes('test_slot', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'pub_check_oldtuple') + WHERE get_byte(data, 0) = 85 OR get_byte(data, 0) = 68 +)); + +is( $result, qq(t +t), 'check the number of columns in the old tuple'); + # TEST: With a table included in multiple publications with different column # lists, we should catch the error when creating the subscription.