mirror of
https://github.com/postgres/postgres.git
synced 2025-10-15 05:46:52 +03:00
Commit 216a784829
introduced parallel apply workers, allowing multiple
processes to share a replication origin. To support this,
replorigin_session_setup() was extended to accept a pid argument
identifying the process using the origin.
This commit exposes that capability through the SQL interface function
pg_replication_origin_session_setup() by adding an optional pid parameter.
This enables multiple processes to coordinate replication using the same
origin when using SQL-level replication functions.
This change allows the non-builtin logical replication solutions to
implement parallel apply for large transactions.
Additionally, an existing internal error was made user-facing, as it can
now be triggered via the exposed SQL API.
Author: Doruk Yilmaz <doruk@mixrank.com>
Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Euler Taveira <euler@eulerto.com>
Discussion: https://postgr.es/m/CAMPB6wfe4zLjJL8jiZV5kjjpwBM2=rTRme0UCL7Ra4L8MTVdOg@mail.gmail.com
Discussion: https://postgr.es/m/CAE2gYzyTSNvHY1+iWUwykaLETSuAZsCWyryokjP6rG46ZvRgQA@mail.gmail.com
152 lines
6.9 KiB
PL/PgSQL
152 lines
6.9 KiB
PL/PgSQL
-- predictability
|
|
SET synchronous_commit = on;
|
|
|
|
-- superuser required by default
|
|
CREATE ROLE regress_origin_replication REPLICATION;
|
|
SET ROLE regress_origin_replication;
|
|
SELECT pg_replication_origin_advance('regress_test_decoding: perm', '0/1');
|
|
SELECT pg_replication_origin_create('regress_test_decoding: perm');
|
|
SELECT pg_replication_origin_drop('regress_test_decoding: perm');
|
|
SELECT pg_replication_origin_oid('regress_test_decoding: perm');
|
|
SELECT pg_replication_origin_progress('regress_test_decoding: perm', false);
|
|
SELECT pg_replication_origin_session_is_setup();
|
|
SELECT pg_replication_origin_session_progress(false);
|
|
SELECT pg_replication_origin_session_reset();
|
|
SELECT pg_replication_origin_session_setup('regress_test_decoding: perm');
|
|
SELECT pg_replication_origin_xact_reset();
|
|
SELECT pg_replication_origin_xact_setup('0/1', '2013-01-01 00:00');
|
|
SELECT pg_show_replication_origin_status();
|
|
RESET ROLE;
|
|
DROP ROLE regress_origin_replication;
|
|
|
|
CREATE TABLE origin_tbl(id serial primary key, data text);
|
|
CREATE TABLE target_tbl(id serial primary key, data text);
|
|
|
|
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
|
|
-- ensure duplicate creations fail
|
|
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
|
|
|
|
-- ensure inactive origin cannot be set as session one if pid is specified
|
|
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
|
|
|
|
--ensure deletions work (once)
|
|
SELECT pg_replication_origin_create('regress_test_decoding: temp');
|
|
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
|
|
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
|
|
|
|
-- specifying reserved origin names is not supported
|
|
SELECT pg_replication_origin_create('any');
|
|
SELECT pg_replication_origin_create('none');
|
|
SELECT pg_replication_origin_create('pg_replication_origin');
|
|
|
|
-- various failure checks for undefined slots
|
|
select pg_replication_origin_advance('regress_test_decoding: temp', '0/1');
|
|
select pg_replication_origin_session_setup('regress_test_decoding: temp');
|
|
select pg_replication_origin_progress('regress_test_decoding: temp', true);
|
|
|
|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
|
|
|
|
-- origin tx
|
|
INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again');
|
|
INSERT INTO target_tbl(data)
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
|
|
|
-- as is normal, the insert into target_tbl shows up
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
|
|
|
INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again');
|
|
|
|
-- mark session as replaying
|
|
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
|
|
|
|
-- ensure we prevent duplicate setup
|
|
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
|
|
|
|
SELECT '' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded');
|
|
|
|
BEGIN;
|
|
-- setup transaction origin
|
|
SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
|
|
INSERT INTO target_tbl(data)
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
|
|
COMMIT;
|
|
|
|
-- check replication progress for the session is correct
|
|
SELECT pg_replication_origin_session_progress(false);
|
|
SELECT pg_replication_origin_session_progress(true);
|
|
|
|
SELECT pg_replication_origin_session_reset();
|
|
|
|
SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origin_status;
|
|
|
|
-- check replication progress identified by name is correct
|
|
SELECT pg_replication_origin_progress('regress_test_decoding: regression_slot', false);
|
|
SELECT pg_replication_origin_progress('regress_test_decoding: regression_slot', true);
|
|
|
|
-- ensure reset requires previously setup state
|
|
SELECT pg_replication_origin_session_reset();
|
|
|
|
-- and magically the replayed xact will be filtered!
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
|
|
|
|
--but new original changes still show up
|
|
INSERT INTO origin_tbl(data) VALUES ('will be replicated');
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
|
|
|
|
SELECT pg_drop_replication_slot('regression_slot');
|
|
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
|
|
|
|
-- Set of transactions with no origin LSNs and commit timestamps set for
|
|
-- this session.
|
|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_no_lsn', 'test_decoding');
|
|
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot_no_lsn');
|
|
-- mark session as replaying
|
|
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot_no_lsn');
|
|
-- Simple transactions
|
|
BEGIN;
|
|
INSERT INTO origin_tbl(data) VALUES ('no_lsn, commit');
|
|
COMMIT;
|
|
BEGIN;
|
|
INSERT INTO origin_tbl(data) VALUES ('no_lsn, rollback');
|
|
ROLLBACK;
|
|
-- 2PC transactions
|
|
BEGIN;
|
|
INSERT INTO origin_tbl(data) VALUES ('no_lsn, commit prepared');
|
|
PREPARE TRANSACTION 'replorigin_prepared';
|
|
COMMIT PREPARED 'replorigin_prepared';
|
|
BEGIN;
|
|
INSERT INTO origin_tbl(data) VALUES ('no_lsn, rollback prepared');
|
|
PREPARE TRANSACTION 'replorigin_prepared';
|
|
ROLLBACK PREPARED 'replorigin_prepared';
|
|
SELECT local_id, external_id,
|
|
remote_lsn <> '0/0' AS valid_remote_lsn,
|
|
local_lsn <> '0/0' AS valid_local_lsn
|
|
FROM pg_replication_origin_status;
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot_no_lsn', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
|
|
-- Clean up
|
|
SELECT pg_replication_origin_session_reset();
|
|
SELECT pg_drop_replication_slot('regression_slot_no_lsn');
|
|
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn');
|
|
|
|
-- Test that the pgoutput correctly filters changes corresponding to the provided origin value.
|
|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pgoutput');
|
|
CREATE PUBLICATION pub FOR TABLE target_tbl;
|
|
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
|
|
|
|
-- mark session as replaying
|
|
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
|
|
|
|
INSERT INTO target_tbl(data) VALUES ('test data');
|
|
|
|
-- The replayed change will be filtered.
|
|
SELECT count(*) = 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'origin', 'none');
|
|
|
|
-- The replayed change will be output if the origin value is not specified.
|
|
SELECT count(*) != 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub');
|
|
|
|
-- Clean up
|
|
SELECT pg_replication_origin_session_reset();
|
|
SELECT pg_drop_replication_slot('regression_slot');
|
|
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
|
|
DROP PUBLICATION pub;
|