mirror of
https://github.com/postgres/postgres.git
synced 2025-04-22 23:02:54 +03:00
Decoding of logical messages (added in 3fe3511d05) failed to filter out messages emitted in other databases and messages emitted "under" a replication origin the output plugin isn't interested in. Add tests to verify that both types of filtering actually work. While touching message.sql, remove a hunk obsoleted by d25379e. Bump XLOG_PAGE_MAGIC because xl_logical_message changed and because 3fe3511d05 had omitted doing so. 3fe3511d05 additionally didn't bump catversion, but 7a542700d has done so since. Author: Petr Jelinek Reported-By: Andres Freund Discussion: 20160406142513.wotqy3ba3kanr423@alap3.anarazel.de
148 lines
5.4 KiB
Plaintext
148 lines
5.4 KiB
Plaintext
-- predictability
|
|
SET synchronous_commit = on;
|
|
CREATE TABLE origin_tbl(id serial primary key, data text);
|
|
CREATE TABLE target_tbl(id serial primary key, data text);
|
|
SELECT pg_replication_origin_create('test_decoding: regression_slot');
|
|
pg_replication_origin_create
|
|
------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
-- ensure duplicate creations fail
|
|
SELECT pg_replication_origin_create('test_decoding: regression_slot');
|
|
ERROR: duplicate key value violates unique constraint "pg_replication_origin_roname_index"
|
|
DETAIL: Key (roname)=(test_decoding: regression_slot) already exists.
|
|
--ensure deletions work (once)
|
|
SELECT pg_replication_origin_create('test_decoding: temp');
|
|
pg_replication_origin_create
|
|
------------------------------
|
|
2
|
|
(1 row)
|
|
|
|
SELECT pg_replication_origin_drop('test_decoding: temp');
|
|
pg_replication_origin_drop
|
|
----------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT pg_replication_origin_drop('test_decoding: temp');
|
|
ERROR: cache lookup failed for replication origin 'test_decoding: temp'
|
|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
|
|
?column?
|
|
----------
|
|
init
|
|
(1 row)
|
|
|
|
-- origin tx
|
|
INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again');
|
|
INSERT INTO target_tbl(data)
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
|
-- as is normal, the insert into target_tbl shows up
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
|
data
|
|
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
|
BEGIN
|
|
table public.target_tbl: INSERT: id[integer]:1 data[text]:'BEGIN'
|
|
table public.target_tbl: INSERT: id[integer]:2 data[text]:'table public.origin_tbl: INSERT: id[integer]:1 data[text]:''will be replicated and decoded and decoded again'''
|
|
table public.target_tbl: INSERT: id[integer]:3 data[text]:'COMMIT'
|
|
COMMIT
|
|
(5 rows)
|
|
|
|
INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again');
|
|
-- mark session as replaying
|
|
SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
|
|
pg_replication_origin_session_setup
|
|
-------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- ensure we prevent duplicate setup
|
|
SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
|
|
ERROR: cannot setup replication origin when one is already setup
|
|
SELECT '' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded');
|
|
?column?
|
|
----------
|
|
|
|
(1 row)
|
|
|
|
BEGIN;
|
|
-- setup transaction origin
|
|
SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
|
|
pg_replication_origin_xact_setup
|
|
----------------------------------
|
|
|
|
(1 row)
|
|
|
|
INSERT INTO target_tbl(data)
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
|
|
COMMIT;
|
|
-- check replication progress for the session is correct
|
|
SELECT pg_replication_origin_session_progress(false);
|
|
pg_replication_origin_session_progress
|
|
----------------------------------------
|
|
0/AABBCCDD
|
|
(1 row)
|
|
|
|
SELECT pg_replication_origin_session_progress(true);
|
|
pg_replication_origin_session_progress
|
|
----------------------------------------
|
|
0/AABBCCDD
|
|
(1 row)
|
|
|
|
SELECT pg_replication_origin_session_reset();
|
|
pg_replication_origin_session_reset
|
|
-------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origin_status;
|
|
local_id | external_id | remote_lsn | ?column?
|
|
----------+--------------------------------+------------+----------
|
|
1 | test_decoding: regression_slot | 0/AABBCCDD | t
|
|
(1 row)
|
|
|
|
-- check replication progress identified by name is correct
|
|
SELECT pg_replication_origin_progress('test_decoding: regression_slot', false);
|
|
pg_replication_origin_progress
|
|
--------------------------------
|
|
0/AABBCCDD
|
|
(1 row)
|
|
|
|
SELECT pg_replication_origin_progress('test_decoding: regression_slot', true);
|
|
pg_replication_origin_progress
|
|
--------------------------------
|
|
0/AABBCCDD
|
|
(1 row)
|
|
|
|
-- ensure reset requires previously setup state
|
|
SELECT pg_replication_origin_session_reset();
|
|
ERROR: no replication origin is configured
|
|
-- and magically the replayed xact will be filtered!
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
|
|
data
|
|
------
|
|
(0 rows)
|
|
|
|
--but new original changes still show up
|
|
INSERT INTO origin_tbl(data) VALUES ('will be replicated');
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
|
|
data
|
|
--------------------------------------------------------------------------------
|
|
BEGIN
|
|
table public.origin_tbl: INSERT: id[integer]:3 data[text]:'will be replicated'
|
|
COMMIT
|
|
(3 rows)
|
|
|
|
SELECT pg_drop_replication_slot('regression_slot');
|
|
pg_drop_replication_slot
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT pg_replication_origin_drop('test_decoding: regression_slot');
|
|
pg_replication_origin_drop
|
|
----------------------------
|
|
|
|
(1 row)
|
|
|