mirror of
https://github.com/postgres/postgres.git
synced 2025-06-26 12:21:12 +03:00
Fix data loss in logical replication.
This commit is a backpatch of commit 4909b38af0
for 13.
Data loss can happen when the DDLs like ALTER PUBLICATION ... ADD TABLE ...
or ALTER TYPE ... that don't take a strong lock on table happens
concurrently to DMLs on the tables involved in the DDL. This happens
because logical decoding doesn't distribute invalidations to concurrent
transactions and those transactions use stale cache data to decode the
changes. The problem becomes bigger because we keep using the stale cache
even after those in-progress transactions are finished and skip the
changes required to be sent to the client.
This commit fixes the issue by distributing invalidation messages from
catalog-modifying transactions to all concurrent in-progress transactions.
This allows the necessary rebuild of the catalog cache when decoding new
changes after concurrent DDL.
The fix for 13 is different from what we did in branches 14 and above,
such that for 13, the concurrent DDL changes (from DDL types mentioned
earlier) will be visible for any newly started transactions. To make them
visible in concurrent transactions, we need to introduce a new change type
REORDER_BUFFER_CHANGE_INVALIDATION, already present in branches 14 and
greater. We decided not to take the risk of a bigger change and fix the
issue partially in 13.
Reported-by: hubert depesz lubaczewski <depesz@depesz.com>
Reported-by: Tomas Vondra <tomas.vondra@enterprisedb.com>
Author: Shlok Kyal <shlok.kyal.oss@gmail.com>
Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Zhijie Hou <houzj.fnst@fujitsu.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Tested-by: Benoit Lobréau <benoit.lobreau@dalibo.com>
Discussion: https://postgr.es/m/de52b282-1166-1180-45a2-8d8917ca74c6@enterprisedb.com
Discussion: https://postgr.es/m/CAD21AoAenVqiMjpN-PvGHL1N9DWnHSq673bfgr6phmBUzx=kLQ@mail.gmail.com
Discussion: https://postgr.es/m/CAD21AoAhU3kp8shYqP=ExiFDZ9sZxpFb5WzLa0p+vEe5j+7CWQ@mail.gmail.com
This commit is contained in:
@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
|
||||
spill slot truncate
|
||||
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
|
||||
oldest_xmin snapshot_transfer subxact_without_top catalog_change_snapshot \
|
||||
skip_snapshot_restore
|
||||
skip_snapshot_restore invalidation_distrubution
|
||||
|
||||
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
|
||||
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
|
||||
|
20
contrib/test_decoding/expected/invalidation_distrubution.out
Normal file
20
contrib/test_decoding/expected/invalidation_distrubution.out
Normal file
@ -0,0 +1,20 @@
|
||||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes
|
||||
step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
|
||||
step s1_begin: BEGIN;
|
||||
step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
|
||||
step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
|
||||
step s1_commit: COMMIT;
|
||||
step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
|
||||
step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
|
||||
count
|
||||
-----
|
||||
1
|
||||
(1 row)
|
||||
|
||||
?column?
|
||||
--------
|
||||
stop
|
||||
(1 row)
|
||||
|
32
contrib/test_decoding/specs/invalidation_distrubution.spec
Normal file
32
contrib/test_decoding/specs/invalidation_distrubution.spec
Normal file
@ -0,0 +1,32 @@
|
||||
# Test that catalog cache invalidation messages are distributed to ongoing
|
||||
# transactions, ensuring they can access the updated catalog content after
|
||||
# processing these messages.
|
||||
setup
|
||||
{
|
||||
SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
|
||||
CREATE TABLE tbl1(val1 integer, val2 integer);
|
||||
CREATE PUBLICATION pub;
|
||||
}
|
||||
|
||||
teardown
|
||||
{
|
||||
DROP TABLE tbl1;
|
||||
DROP PUBLICATION pub;
|
||||
SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
|
||||
}
|
||||
|
||||
session "s1"
|
||||
setup { SET synchronous_commit=on; }
|
||||
|
||||
step "s1_begin" { BEGIN; }
|
||||
step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); }
|
||||
step "s1_commit" { COMMIT; }
|
||||
|
||||
session "s2"
|
||||
setup { SET synchronous_commit=on; }
|
||||
|
||||
step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
|
||||
step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
|
||||
|
||||
# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
|
||||
permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes"
|
Reference in New Issue
Block a user