From 66fd0adc73a8d3b0c43423c7d263cba37edaa36b Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Mon, 6 Jan 2020 08:21:14 +0100 Subject: [PATCH] Have logical replication subscriber fire column triggers The logical replication apply worker did not fire per-column update triggers because the updatedCols bitmap in the RTE was not populated. This fixes that. Reviewed-by: Euler Taveira Discussion: https://www.postgresql.org/message-id/flat/21673e2d-597c-6afe-637e-e8b10425b240%402ndquadrant.com --- src/backend/replication/logical/worker.c | 18 ++++++++++++ src/test/subscription/t/003_constraints.pl | 34 +++++++++++++++++++--- 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 67126088385..7e3f3f0b0c0 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -27,6 +27,7 @@ #include "pgstat.h" #include "funcapi.h" +#include "access/sysattr.h" #include "access/xact.h" #include "access/xlog_internal.h" @@ -703,6 +704,8 @@ apply_handle_update(StringInfo s) bool has_oldtup; TupleTableSlot *localslot; TupleTableSlot *remoteslot; + RangeTblEntry *target_rte; + int i; bool found; MemoryContext oldctx; @@ -732,6 +735,21 @@ apply_handle_update(StringInfo s) ExecSetSlotDescriptor(localslot, RelationGetDescr(rel->localrel)); EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); + /* + * Populate updatedCols so that per-column triggers can fire. This could + * include more columns than were actually changed on the publisher + * because the logical replication protocol doesn't contain that + * information. But it would for example exclude columns that only exist + * on the subscriber, since we are not touching those. + */ + target_rte = list_nth(estate->es_range_table, 0); + for (i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++) + { + if (newtup.changed[i]) + target_rte->updatedCols = bms_add_member(target_rte->updatedCols, + i + 1 - FirstLowInvalidHeapAttributeNumber); + } + PushActiveSnapshot(GetTransactionSnapshot()); ExecOpenIndices(estate->es_result_relation_info, false); diff --git a/src/test/subscription/t/003_constraints.pl b/src/test/subscription/t/003_constraints.pl index 06863aef84a..65528edf537 100644 --- a/src/test/subscription/t/003_constraints.pl +++ b/src/test/subscription/t/003_constraints.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 4; +use Test::More tests => 6; # Initialize publisher node my $node_publisher = get_new_node('publisher'); @@ -88,6 +88,8 @@ BEGIN ELSE RETURN NULL; END IF; + ELSIF (TG_OP = 'UPDATE') THEN + RETURN NULL; ELSE RAISE WARNING 'Unknown action'; RETURN NULL; @@ -95,7 +97,7 @@ BEGIN END; \$\$ LANGUAGE plpgsql; CREATE TRIGGER filter_basic_dml_trg - BEFORE INSERT ON tab_fk_ref + BEFORE INSERT OR UPDATE OF bid ON tab_fk_ref FOR EACH ROW EXECUTE PROCEDURE filter_basic_dml_fn(); ALTER TABLE tab_fk_ref ENABLE REPLICA TRIGGER filter_basic_dml_trg; }); @@ -107,10 +109,34 @@ $node_publisher->safe_psql('postgres', $node_publisher->poll_query_until('postgres', $caughtup_query) or die "Timed out while waiting for subscriber to catch up"; -# The row should be skipped on subscriber +# The trigger should cause the insert to be skipped on subscriber $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;"); -is($result, qq(2|1|2), 'check replica trigger applied on subscriber'); +is($result, qq(2|1|2), 'check replica insert trigger applied on subscriber'); + +# Update data +$node_publisher->safe_psql('postgres', + "UPDATE tab_fk_ref SET bid = 2 WHERE bid = 1;"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# The trigger should cause the update to be skipped on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;"); +is($result, qq(2|1|2), 'check replica update column trigger applied on subscriber'); + +# Update on a column not specified in the trigger, but it will trigger +# anyway because logical replication ships all columns in an update. +$node_publisher->safe_psql('postgres', + "UPDATE tab_fk_ref SET id = 6 WHERE id = 1;"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(id), max(id) FROM tab_fk_ref;"); +is($result, qq(2|1|2), 'check column trigger applied on even for other column'); $node_subscriber->stop('fast'); $node_publisher->stop('fast');