mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-28 11:55:03 +03:00 
			
		
		
		
	Commit 216a784829 introduced parallel apply workers, allowing multiple
processes to share a replication origin. To support this,
replorigin_session_setup() was extended to accept a pid argument
identifying the process using the origin.
This commit exposes that capability through the SQL interface function
pg_replication_origin_session_setup() by adding an optional pid parameter.
This enables multiple processes to coordinate replication using the same
origin when using SQL-level replication functions.
This change allows the non-builtin logical replication solutions to
implement parallel apply for large transactions.
Additionally, an existing internal error was made user-facing, as it can
now be triggered via the exposed SQL API.
Author: Doruk Yilmaz <doruk@mixrank.com>
Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Euler Taveira <euler@eulerto.com>
Discussion: https://postgr.es/m/CAMPB6wfe4zLjJL8jiZV5kjjpwBM2=rTRme0UCL7Ra4L8MTVdOg@mail.gmail.com
Discussion: https://postgr.es/m/CAE2gYzyTSNvHY1+iWUwykaLETSuAZsCWyryokjP6rG46ZvRgQA@mail.gmail.com
		
	
		
			
				
	
	
		
			57 lines
		
	
	
		
			1.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			57 lines
		
	
	
		
			1.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Test parallel replication origin manipulations; ensure local_lsn can be
 | |
| # updated by all attached sessions.
 | |
| 
 | |
| setup
 | |
| {
 | |
|     SELECT pg_replication_origin_create('origin');
 | |
|     CREATE UNLOGGED TABLE local_lsn_store (session int, lsn pg_lsn);
 | |
| }
 | |
| 
 | |
| teardown
 | |
| {
 | |
|     SELECT pg_replication_origin_drop('origin');
 | |
|     DROP TABLE local_lsn_store;
 | |
| }
 | |
| 
 | |
| session "s0"
 | |
| setup { SET synchronous_commit = on; }
 | |
| step "s0_setup" { SELECT pg_replication_origin_session_setup('origin'); }
 | |
| step "s0_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
 | |
| step "s0_add_message" {
 | |
|     SELECT 1
 | |
|     FROM pg_logical_emit_message(true, 'prefix', 'message on s0');
 | |
| }
 | |
| step "s0_store_lsn" {
 | |
|     INSERT INTO local_lsn_store
 | |
|     SELECT 0, local_lsn FROM pg_replication_origin_status;
 | |
| }
 | |
| step "s0_compare" {
 | |
|     SELECT s0.lsn < s1.lsn
 | |
|     FROM local_lsn_store as s0, local_lsn_store as s1
 | |
|     WHERE s0.session = 0 AND s1.session = 1;
 | |
| }
 | |
| step "s0_reset" { SELECT pg_replication_origin_session_reset(); }
 | |
| 
 | |
| session "s1"
 | |
| setup { SET synchronous_commit = on; }
 | |
| step "s1_setup" {
 | |
|     SELECT pg_replication_origin_session_setup('origin', pid)
 | |
|     FROM pg_stat_activity
 | |
|     WHERE application_name = 'isolation/parallel_session_origin/s0';
 | |
| }
 | |
| step "s1_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
 | |
| step "s1_add_message" {
 | |
|     SELECT 1
 | |
|     FROM pg_logical_emit_message(true, 'prefix', 'message on s1');
 | |
| }
 | |
| step "s1_store_lsn" {
 | |
|     INSERT INTO local_lsn_store
 | |
|     SELECT 1, local_lsn FROM pg_replication_origin_status;
 | |
| }
 | |
| step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
 | |
| 
 | |
| # Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions
 | |
| # commits a transaction and store the local_lsn of the replication origin.
 | |
| # Compare LSNs and expect latter transaction (done by s1) has larger local_lsn.
 | |
| permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"
 |