mirror of
				https://github.com/postgres/postgres.git
				synced 2025-11-03 09:13:20 +03:00 
			
		
		
		
	This reverts a sequence of commits implementing features related to logical decoding and replication of sequences: 0da92dc530, 80901b3291, b779d7d8fd, d5ed9da41d, a180c2b34d, 75b1521dae, 2d2232933b, 002c9dd97a, 05843b1aa4. The implementation has issues, mostly due to combining transactional and non-transactional behavior of sequences. It's not clear how this could be fixed, but it'll require reworking a significant part of the patch. Discussion: https://postgr.es/m/95345a19-d508-63d1-860a-f5c2f41e8d40@enterprisedb.com
		
			
				
	
	
		
			260 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
			
		
		
	
	
			260 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
-- predictability
 | 
						|
SET synchronous_commit = on;
 | 
						|
-- superuser required by default
 | 
						|
CREATE ROLE regress_origin_replication REPLICATION;
 | 
						|
SET ROLE regress_origin_replication;
 | 
						|
SELECT pg_replication_origin_advance('regress_test_decoding: perm', '0/1');
 | 
						|
ERROR:  permission denied for function pg_replication_origin_advance
 | 
						|
SELECT pg_replication_origin_create('regress_test_decoding: perm');
 | 
						|
ERROR:  permission denied for function pg_replication_origin_create
 | 
						|
SELECT pg_replication_origin_drop('regress_test_decoding: perm');
 | 
						|
ERROR:  permission denied for function pg_replication_origin_drop
 | 
						|
SELECT pg_replication_origin_oid('regress_test_decoding: perm');
 | 
						|
ERROR:  permission denied for function pg_replication_origin_oid
 | 
						|
SELECT pg_replication_origin_progress('regress_test_decoding: perm', false);
 | 
						|
ERROR:  permission denied for function pg_replication_origin_progress
 | 
						|
SELECT pg_replication_origin_session_is_setup();
 | 
						|
ERROR:  permission denied for function pg_replication_origin_session_is_setup
 | 
						|
SELECT pg_replication_origin_session_progress(false);
 | 
						|
ERROR:  permission denied for function pg_replication_origin_session_progress
 | 
						|
SELECT pg_replication_origin_session_reset();
 | 
						|
ERROR:  permission denied for function pg_replication_origin_session_reset
 | 
						|
SELECT pg_replication_origin_session_setup('regress_test_decoding: perm');
 | 
						|
ERROR:  permission denied for function pg_replication_origin_session_setup
 | 
						|
SELECT pg_replication_origin_xact_reset();
 | 
						|
ERROR:  permission denied for function pg_replication_origin_xact_reset
 | 
						|
SELECT pg_replication_origin_xact_setup('0/1', '2013-01-01 00:00');
 | 
						|
ERROR:  permission denied for function pg_replication_origin_xact_setup
 | 
						|
SELECT pg_show_replication_origin_status();
 | 
						|
ERROR:  permission denied for function pg_show_replication_origin_status
 | 
						|
RESET ROLE;
 | 
						|
DROP ROLE regress_origin_replication;
 | 
						|
CREATE TABLE origin_tbl(id serial primary key, data text);
 | 
						|
CREATE TABLE target_tbl(id serial primary key, data text);
 | 
						|
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 | 
						|
 pg_replication_origin_create 
 | 
						|
------------------------------
 | 
						|
                            1
 | 
						|
(1 row)
 | 
						|
 | 
						|
-- ensure duplicate creations fail
 | 
						|
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 | 
						|
ERROR:  duplicate key value violates unique constraint "pg_replication_origin_roname_index"
 | 
						|
DETAIL:  Key (roname)=(regress_test_decoding: regression_slot) already exists.
 | 
						|
--ensure deletions work (once)
 | 
						|
SELECT pg_replication_origin_create('regress_test_decoding: temp');
 | 
						|
 pg_replication_origin_create 
 | 
						|
------------------------------
 | 
						|
                            2
 | 
						|
(1 row)
 | 
						|
 | 
						|
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
 | 
						|
 pg_replication_origin_drop 
 | 
						|
----------------------------
 | 
						|
 
 | 
						|
(1 row)
 | 
						|
 | 
						|
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
 | 
						|
ERROR:  replication origin "regress_test_decoding: temp" does not exist
 | 
						|
-- various failure checks for undefined slots
 | 
						|
select pg_replication_origin_advance('regress_test_decoding: temp', '0/1');
 | 
						|
ERROR:  replication origin "regress_test_decoding: temp" does not exist
 | 
						|
select pg_replication_origin_session_setup('regress_test_decoding: temp');
 | 
						|
ERROR:  replication origin "regress_test_decoding: temp" does not exist
 | 
						|
select pg_replication_origin_progress('regress_test_decoding: temp', true);
 | 
						|
ERROR:  replication origin "regress_test_decoding: temp" does not exist
 | 
						|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 | 
						|
 ?column? 
 | 
						|
----------
 | 
						|
 init
 | 
						|
(1 row)
 | 
						|
 | 
						|
-- origin tx
 | 
						|
INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again');
 | 
						|
INSERT INTO target_tbl(data)
 | 
						|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 | 
						|
-- as is normal, the insert into target_tbl shows up
 | 
						|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 | 
						|
                                                                                    data                                                                                    
 | 
						|
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 | 
						|
 BEGIN
 | 
						|
 table public.target_tbl: INSERT: id[integer]:1 data[text]:'BEGIN'
 | 
						|
 table public.target_tbl: INSERT: id[integer]:2 data[text]:'table public.origin_tbl: INSERT: id[integer]:1 data[text]:''will be replicated and decoded and decoded again'''
 | 
						|
 table public.target_tbl: INSERT: id[integer]:3 data[text]:'COMMIT'
 | 
						|
 COMMIT
 | 
						|
(5 rows)
 | 
						|
 | 
						|
INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again');
 | 
						|
-- mark session as replaying
 | 
						|
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
 | 
						|
 pg_replication_origin_session_setup 
 | 
						|
-------------------------------------
 | 
						|
 
 | 
						|
(1 row)
 | 
						|
 | 
						|
-- ensure we prevent duplicate setup
 | 
						|
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
 | 
						|
ERROR:  cannot setup replication origin when one is already setup
 | 
						|
SELECT '' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded');
 | 
						|
 ?column? 
 | 
						|
----------
 | 
						|
 
 | 
						|
(1 row)
 | 
						|
 | 
						|
BEGIN;
 | 
						|
-- setup transaction origin
 | 
						|
SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
 | 
						|
 pg_replication_origin_xact_setup 
 | 
						|
----------------------------------
 | 
						|
 
 | 
						|
(1 row)
 | 
						|
 | 
						|
INSERT INTO target_tbl(data)
 | 
						|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
 | 
						|
COMMIT;
 | 
						|
-- check replication progress for the session is correct
 | 
						|
SELECT pg_replication_origin_session_progress(false);
 | 
						|
 pg_replication_origin_session_progress 
 | 
						|
----------------------------------------
 | 
						|
 0/AABBCCDD
 | 
						|
(1 row)
 | 
						|
 | 
						|
SELECT pg_replication_origin_session_progress(true);
 | 
						|
 pg_replication_origin_session_progress 
 | 
						|
----------------------------------------
 | 
						|
 0/AABBCCDD
 | 
						|
(1 row)
 | 
						|
 | 
						|
SELECT pg_replication_origin_session_reset();
 | 
						|
 pg_replication_origin_session_reset 
 | 
						|
-------------------------------------
 | 
						|
 
 | 
						|
(1 row)
 | 
						|
 | 
						|
SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origin_status;
 | 
						|
 local_id |              external_id               | remote_lsn | ?column? 
 | 
						|
----------+----------------------------------------+------------+----------
 | 
						|
        1 | regress_test_decoding: regression_slot | 0/AABBCCDD | t
 | 
						|
(1 row)
 | 
						|
 | 
						|
-- check replication progress identified by name is correct
 | 
						|
SELECT pg_replication_origin_progress('regress_test_decoding: regression_slot', false);
 | 
						|
 pg_replication_origin_progress 
 | 
						|
--------------------------------
 | 
						|
 0/AABBCCDD
 | 
						|
(1 row)
 | 
						|
 | 
						|
SELECT pg_replication_origin_progress('regress_test_decoding: regression_slot', true);
 | 
						|
 pg_replication_origin_progress 
 | 
						|
--------------------------------
 | 
						|
 0/AABBCCDD
 | 
						|
(1 row)
 | 
						|
 | 
						|
-- ensure reset requires previously setup state
 | 
						|
SELECT pg_replication_origin_session_reset();
 | 
						|
ERROR:  no replication origin is configured
 | 
						|
-- and magically the replayed xact will be filtered!
 | 
						|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
 | 
						|
 data 
 | 
						|
------
 | 
						|
(0 rows)
 | 
						|
 | 
						|
--but new original changes still show up
 | 
						|
INSERT INTO origin_tbl(data) VALUES ('will be replicated');
 | 
						|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1',  'only-local', '1');
 | 
						|
                                      data                                      
 | 
						|
--------------------------------------------------------------------------------
 | 
						|
 BEGIN
 | 
						|
 table public.origin_tbl: INSERT: id[integer]:3 data[text]:'will be replicated'
 | 
						|
 COMMIT
 | 
						|
(3 rows)
 | 
						|
 | 
						|
SELECT pg_drop_replication_slot('regression_slot');
 | 
						|
 pg_drop_replication_slot 
 | 
						|
--------------------------
 | 
						|
 
 | 
						|
(1 row)
 | 
						|
 | 
						|
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
 | 
						|
 pg_replication_origin_drop 
 | 
						|
----------------------------
 | 
						|
 
 | 
						|
(1 row)
 | 
						|
 | 
						|
-- Set of transactions with no origin LSNs and commit timestamps set for
 | 
						|
-- this session.
 | 
						|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_no_lsn', 'test_decoding');
 | 
						|
 ?column? 
 | 
						|
----------
 | 
						|
 init
 | 
						|
(1 row)
 | 
						|
 | 
						|
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot_no_lsn');
 | 
						|
 pg_replication_origin_create 
 | 
						|
------------------------------
 | 
						|
                            1
 | 
						|
(1 row)
 | 
						|
 | 
						|
-- mark session as replaying
 | 
						|
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot_no_lsn');
 | 
						|
 pg_replication_origin_session_setup 
 | 
						|
-------------------------------------
 | 
						|
 
 | 
						|
(1 row)
 | 
						|
 | 
						|
-- Simple transactions
 | 
						|
BEGIN;
 | 
						|
INSERT INTO origin_tbl(data) VALUES ('no_lsn, commit');
 | 
						|
COMMIT;
 | 
						|
BEGIN;
 | 
						|
INSERT INTO origin_tbl(data) VALUES ('no_lsn, rollback');
 | 
						|
ROLLBACK;
 | 
						|
-- 2PC transactions
 | 
						|
BEGIN;
 | 
						|
INSERT INTO origin_tbl(data) VALUES ('no_lsn, commit prepared');
 | 
						|
PREPARE TRANSACTION 'replorigin_prepared';
 | 
						|
COMMIT PREPARED 'replorigin_prepared';
 | 
						|
BEGIN;
 | 
						|
INSERT INTO origin_tbl(data) VALUES ('no_lsn, rollback prepared');
 | 
						|
PREPARE TRANSACTION 'replorigin_prepared';
 | 
						|
ROLLBACK PREPARED 'replorigin_prepared';
 | 
						|
SELECT local_id, external_id,
 | 
						|
       remote_lsn <> '0/0' AS valid_remote_lsn,
 | 
						|
       local_lsn <> '0/0' AS valid_local_lsn
 | 
						|
       FROM pg_replication_origin_status;
 | 
						|
 local_id |                  external_id                  | valid_remote_lsn | valid_local_lsn 
 | 
						|
----------+-----------------------------------------------+------------------+-----------------
 | 
						|
        1 | regress_test_decoding: regression_slot_no_lsn | f                | t
 | 
						|
(1 row)
 | 
						|
 | 
						|
SELECT data FROM pg_logical_slot_get_changes('regression_slot_no_lsn', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
 | 
						|
                                        data                                         
 | 
						|
-------------------------------------------------------------------------------------
 | 
						|
 BEGIN
 | 
						|
 table public.origin_tbl: INSERT: id[integer]:4 data[text]:'no_lsn, commit'
 | 
						|
 COMMIT
 | 
						|
 BEGIN
 | 
						|
 table public.origin_tbl: INSERT: id[integer]:6 data[text]:'no_lsn, commit prepared'
 | 
						|
 COMMIT
 | 
						|
(6 rows)
 | 
						|
 | 
						|
-- Clean up
 | 
						|
SELECT pg_replication_origin_session_reset();
 | 
						|
 pg_replication_origin_session_reset 
 | 
						|
-------------------------------------
 | 
						|
 
 | 
						|
(1 row)
 | 
						|
 | 
						|
SELECT pg_drop_replication_slot('regression_slot_no_lsn');
 | 
						|
 pg_drop_replication_slot 
 | 
						|
--------------------------
 | 
						|
 
 | 
						|
(1 row)
 | 
						|
 | 
						|
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn');
 | 
						|
 pg_replication_origin_drop 
 | 
						|
----------------------------
 | 
						|
 
 | 
						|
(1 row)
 | 
						|
 |