mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-24 01:29:19 +03:00 
			
		
		
		
	When implementing a replication solution ontop of logical decoding, two
related problems exist:
* How to safely keep track of replication progress
* How to change replication behavior, based on the origin of a row;
  e.g. to avoid loops in bi-directional replication setups
The solution to these problems, as implemented here, consist out of
three parts:
1) 'replication origins', which identify nodes in a replication setup.
2) 'replication progress tracking', which remembers, for each
   replication origin, how far replay has progressed in a efficient and
   crash safe manner.
3) The ability to filter out changes performed on the behest of a
   replication origin during logical decoding; this allows complex
   replication topologies. E.g. by filtering all replayed changes out.
Most of this could also be implemented in "userspace", e.g. by inserting
additional rows contain origin information, but that ends up being much
less efficient and more complicated.  We don't want to require various
replication solutions to reimplement logic for this independently. The
infrastructure is intended to be generic enough to be reusable.
This infrastructure also replaces the 'nodeid' infrastructure of commit
timestamps. It is intended to provide all the former capabilities,
except that there's only 2^16 different origins; but now they integrate
with logical decoding. Additionally more functionality is accessible via
SQL.  Since the commit timestamp infrastructure has also been introduced
in 9.5 (commit 73c986add) changing the API is not a problem.
For now the number of origins for which the replication progress can be
tracked simultaneously is determined by the max_replication_slots
GUC. That GUC is not a perfect match to configure this, but there
doesn't seem to be sufficient reason to introduce a separate new one.
Bumps both catversion and wal page magic.
Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer
Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer
Discussion: 20150216002155.GI15326@awork2.anarazel.de,
    20140923182422.GA15776@alap3.anarazel.de,
    20131114172632.GE7522@alap2.anarazel.de
		
	
		
			
				
	
	
		
			142 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
			
		
		
	
	
			142 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
| -- predictability
 | |
| SET synchronous_commit = on;
 | |
| CREATE TABLE origin_tbl(id serial primary key, data text);
 | |
| CREATE TABLE target_tbl(id serial primary key, data text);
 | |
| SELECT pg_replication_origin_create('test_decoding: regression_slot');
 | |
|  pg_replication_origin_create 
 | |
| ------------------------------
 | |
|                             1
 | |
| (1 row)
 | |
| 
 | |
| -- ensure duplicate creations fail
 | |
| SELECT pg_replication_origin_create('test_decoding: regression_slot');
 | |
| ERROR:  duplicate key value violates unique constraint "pg_replication_origin_roname_index"
 | |
| DETAIL:  Key (roname)=(test_decoding: regression_slot) already exists.
 | |
| --ensure deletions work (once)
 | |
| SELECT pg_replication_origin_create('test_decoding: temp');
 | |
|  pg_replication_origin_create 
 | |
| ------------------------------
 | |
|                             2
 | |
| (1 row)
 | |
| 
 | |
| SELECT pg_replication_origin_drop('test_decoding: temp');
 | |
|  pg_replication_origin_drop 
 | |
| ----------------------------
 | |
|  
 | |
| (1 row)
 | |
| 
 | |
| SELECT pg_replication_origin_drop('test_decoding: temp');
 | |
| ERROR:  cache lookup failed for replication origin 'test_decoding: temp'
 | |
| SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 | |
|  ?column? 
 | |
| ----------
 | |
|  init
 | |
| (1 row)
 | |
| 
 | |
| -- origin tx
 | |
| INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again');
 | |
| INSERT INTO target_tbl(data)
 | |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 | |
| -- as is normal, the insert into target_tbl shows up
 | |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 | |
|                                                                                     data                                                                                    
 | |
| ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 | |
|  BEGIN
 | |
|  table public.target_tbl: INSERT: id[integer]:1 data[text]:'BEGIN'
 | |
|  table public.target_tbl: INSERT: id[integer]:2 data[text]:'table public.origin_tbl: INSERT: id[integer]:1 data[text]:''will be replicated and decoded and decoded again'''
 | |
|  table public.target_tbl: INSERT: id[integer]:3 data[text]:'COMMIT'
 | |
|  COMMIT
 | |
| (5 rows)
 | |
| 
 | |
| INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again');
 | |
| -- mark session as replaying
 | |
| SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
 | |
|  pg_replication_origin_session_setup 
 | |
| -------------------------------------
 | |
|  
 | |
| (1 row)
 | |
| 
 | |
| -- ensure we prevent duplicate setup
 | |
| SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
 | |
| ERROR:  cannot setup replication origin when one is already setup
 | |
| BEGIN;
 | |
| -- setup transaction origin
 | |
| SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
 | |
|  pg_replication_origin_xact_setup 
 | |
| ----------------------------------
 | |
|  
 | |
| (1 row)
 | |
| 
 | |
| INSERT INTO target_tbl(data)
 | |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
 | |
| COMMIT;
 | |
| -- check replication progress for the session is correct
 | |
| SELECT pg_replication_origin_session_progress(false);
 | |
|  pg_replication_origin_session_progress 
 | |
| ----------------------------------------
 | |
|  0/AABBCCDD
 | |
| (1 row)
 | |
| 
 | |
| SELECT pg_replication_origin_session_progress(true);
 | |
|  pg_replication_origin_session_progress 
 | |
| ----------------------------------------
 | |
|  0/AABBCCDD
 | |
| (1 row)
 | |
| 
 | |
| SELECT pg_replication_origin_session_reset();
 | |
|  pg_replication_origin_session_reset 
 | |
| -------------------------------------
 | |
|  
 | |
| (1 row)
 | |
| 
 | |
| SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origin_status;
 | |
|  local_id |          external_id           | remote_lsn | ?column? 
 | |
| ----------+--------------------------------+------------+----------
 | |
|         1 | test_decoding: regression_slot | 0/AABBCCDD | t
 | |
| (1 row)
 | |
| 
 | |
| -- check replication progress identified by name is correct
 | |
| SELECT pg_replication_origin_progress('test_decoding: regression_slot', false);
 | |
|  pg_replication_origin_progress 
 | |
| --------------------------------
 | |
|  0/AABBCCDD
 | |
| (1 row)
 | |
| 
 | |
| SELECT pg_replication_origin_progress('test_decoding: regression_slot', true);
 | |
|  pg_replication_origin_progress 
 | |
| --------------------------------
 | |
|  0/AABBCCDD
 | |
| (1 row)
 | |
| 
 | |
| -- ensure reset requires previously setup state
 | |
| SELECT pg_replication_origin_session_reset();
 | |
| ERROR:  no replication origin is configured
 | |
| -- and magically the replayed xact will be filtered!
 | |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
 | |
|  data 
 | |
| ------
 | |
| (0 rows)
 | |
| 
 | |
| --but new original changes still show up
 | |
| INSERT INTO origin_tbl(data) VALUES ('will be replicated');
 | |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1',  'only-local', '1');
 | |
|                                       data                                      
 | |
| --------------------------------------------------------------------------------
 | |
|  BEGIN
 | |
|  table public.origin_tbl: INSERT: id[integer]:3 data[text]:'will be replicated'
 | |
|  COMMIT
 | |
| (3 rows)
 | |
| 
 | |
| SELECT pg_drop_replication_slot('regression_slot');
 | |
|  pg_drop_replication_slot 
 | |
| --------------------------
 | |
|  
 | |
| (1 row)
 | |
| 
 | |
| SELECT pg_replication_origin_drop('test_decoding: regression_slot');
 | |
|  pg_replication_origin_drop 
 | |
| ----------------------------
 | |
|  
 | |
| (1 row)
 | |
| 
 |