1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-25 13:17:41 +03:00
Files
postgres/contrib/test_decoding/expected/twophase_stream.out
Amit Kapila a271a1b50e Allow decoding at prepare time in ReorderBuffer.
This patch allows PREPARE-time decoding of two-phase transactions (if the
output plugin supports this capability), in which case the transactions
are replayed at PREPARE and then committed later when COMMIT PREPARED
arrives.

Now that we decode the changes before the commit, the concurrent aborts
may cause failures when the output plugin consults catalogs (both system
and user-defined).

We detect such failures with a special sqlerrcode
ERRCODE_TRANSACTION_ROLLBACK introduced by commit 7259736a6e and stop
decoding the remaining changes. Then we rollback the changes when rollback
prepared is encountered.

Author: Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich
Reviewed-by: Amit Kapila, Peter Smith, Sawada Masahiko, Arseny Sher, and Dilip Kumar
Tested-by: Takamichi Osumi
Discussion:
https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru
https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com
2021-01-04 08:34:50 +05:30

148 lines
6.3 KiB
Plaintext

-- Test streaming of two-phase commits
SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
?column?
----------
init
(1 row)
CREATE TABLE stream_test(data text);
-- consume DDL
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
------
(0 rows)
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED
BEGIN;
SAVEPOINT s1;
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
?column?
----------
msg5
(1 row)
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
TRUNCATE table stream_test;
ROLLBACK TO s1;
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
PREPARE TRANSACTION 'test1';
-- should show the inserts after a ROLLBACK
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
----------------------------------------------------------
streaming message: transactional: 1 prefix: test, sz: 50
opening a streamed block for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
closing a streamed block for transaction
preparing streamed transaction 'test1'
(24 rows)
COMMIT PREPARED 'test1';
--should show the COMMIT PREPARED and the other changes in the transaction
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
-------------------------------------------------------------
BEGIN
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
PREPARE TRANSACTION 'test1'
COMMIT PREPARED 'test1'
(23 rows)
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
-- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
BEGIN;
SAVEPOINT s1;
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
?column?
----------
msg5
(1 row)
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
TRUNCATE table stream_test;
ROLLBACK to s1;
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
PREPARE TRANSACTION 'test1_nodecode';
-- should NOT show inserts after a ROLLBACK
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
----------------------------------------------------------
streaming message: transactional: 1 prefix: test, sz: 50
(1 row)
COMMIT PREPARED 'test1_nodecode';
-- should show the inserts but not show a COMMIT PREPARED but a COMMIT
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
-------------------------------------------------------------
BEGIN
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
COMMIT
(22 rows)
DROP TABLE stream_test;
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
--------------------------
(1 row)