mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-27 00:12:01 +03:00 
			
		
		
		
	In test_decoding module, when skip_empty_xacts option was specified, add
stream_start/stop for streaming transactional messages. This makes the
handling of transactional messages stream consistent irrespective of
whether skip_empty_xacts option was specified.
Commit 26dd0284b9 made a similar change for non-streaming messages but
forgot to update the streaming cases.
Author: Peter Smith
Reviewed-by: Amit Kapila
Discussion: http://postgr.es/m/OS0PR01MB5716AEBD2988F8F5E9D5985794DFA@OS0PR01MB5716.jpnprd01.prod.outlook.com
		
	
		
			
				
	
	
		
			132 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
			
		
		
	
	
			132 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
| -- Test streaming of two-phase commits
 | |
| SET synchronous_commit = on;
 | |
| SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
 | |
|  ?column? 
 | |
| ----------
 | |
|  init
 | |
| (1 row)
 | |
| 
 | |
| CREATE TABLE stream_test(data text);
 | |
| -- consume DDL
 | |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 | |
|  data 
 | |
| ------
 | |
| (0 rows)
 | |
| 
 | |
| -- streaming test with sub-transaction and PREPARE/COMMIT PREPARED
 | |
| BEGIN;
 | |
| SAVEPOINT s1;
 | |
| SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
 | |
|  ?column? 
 | |
| ----------
 | |
|  msg5
 | |
| (1 row)
 | |
| 
 | |
| INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
 | |
| TRUNCATE table stream_test;
 | |
| ROLLBACK TO s1;
 | |
| INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
 | |
| PREPARE TRANSACTION 'test1';
 | |
| -- should show the inserts after a ROLLBACK
 | |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 | |
|                            data                           
 | |
| ----------------------------------------------------------
 | |
|  opening a streamed block for transaction
 | |
|  streaming message: transactional: 1 prefix: test, sz: 50
 | |
|  closing a streamed block for transaction
 | |
|  aborting streamed (sub)transaction
 | |
|  opening a streamed block for transaction
 | |
|  streaming change for transaction
 | |
|  streaming change for transaction
 | |
|  streaming change for transaction
 | |
|  streaming change for transaction
 | |
|  streaming change for transaction
 | |
|  streaming change for transaction
 | |
|  streaming change for transaction
 | |
|  streaming change for transaction
 | |
|  streaming change for transaction
 | |
|  streaming change for transaction
 | |
|  streaming change for transaction
 | |
|  streaming change for transaction
 | |
|  streaming change for transaction
 | |
|  streaming change for transaction
 | |
|  streaming change for transaction
 | |
|  streaming change for transaction
 | |
|  streaming change for transaction
 | |
|  streaming change for transaction
 | |
|  streaming change for transaction
 | |
|  streaming change for transaction
 | |
|  closing a streamed block for transaction
 | |
|  preparing streamed transaction 'test1'
 | |
| (27 rows)
 | |
| 
 | |
| COMMIT PREPARED 'test1';
 | |
| --should show the COMMIT PREPARED and the other changes in the transaction
 | |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 | |
|           data           
 | |
| -------------------------
 | |
|  COMMIT PREPARED 'test1'
 | |
| (1 row)
 | |
| 
 | |
| -- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
 | |
| -- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
 | |
| BEGIN;
 | |
| SAVEPOINT s1;
 | |
| SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
 | |
|  ?column? 
 | |
| ----------
 | |
|  msg5
 | |
| (1 row)
 | |
| 
 | |
| INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
 | |
| TRUNCATE table stream_test;
 | |
| ROLLBACK to s1;
 | |
| INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
 | |
| PREPARE TRANSACTION 'test1_nodecode';
 | |
| -- should NOT show inserts after a ROLLBACK
 | |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 | |
|                            data                           
 | |
| ----------------------------------------------------------
 | |
|  opening a streamed block for transaction
 | |
|  streaming message: transactional: 1 prefix: test, sz: 50
 | |
|  closing a streamed block for transaction
 | |
|  aborting streamed (sub)transaction
 | |
| (4 rows)
 | |
| 
 | |
| COMMIT PREPARED 'test1_nodecode';
 | |
| -- should show the inserts but not show a COMMIT PREPARED but a COMMIT
 | |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 | |
|                             data                             
 | |
| -------------------------------------------------------------
 | |
|  BEGIN
 | |
|  table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
 | |
|  table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
 | |
|  table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
 | |
|  table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
 | |
|  table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
 | |
|  table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
 | |
|  table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
 | |
|  table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
 | |
|  table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
 | |
|  table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
 | |
|  table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
 | |
|  table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
 | |
|  table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
 | |
|  table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
 | |
|  table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
 | |
|  table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
 | |
|  table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
 | |
|  table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
 | |
|  table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
 | |
|  table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
 | |
|  COMMIT
 | |
| (22 rows)
 | |
| 
 | |
| DROP TABLE stream_test;
 | |
| SELECT pg_drop_replication_slot('regression_slot');
 | |
|  pg_drop_replication_slot 
 | |
| --------------------------
 | |
|  
 | |
| (1 row)
 | |
| 
 |