mirror of
https://github.com/postgres/postgres.git
synced 2025-06-13 07:41:39 +03:00
Add STREAM_START/STREAM_STOP for transactional messages during decoding.
In test_decoding module, when skip_empty_xacts option was specified, add
stream_start/stop for streaming transactional messages. This makes the
handling of transactional messages stream consistent irrespective of
whether skip_empty_xacts option was specified.
Commit 26dd0284b9
made a similar change for non-streaming messages but
forgot to update the streaming cases.
Author: Peter Smith
Reviewed-by: Amit Kapila
Discussion: http://postgr.es/m/OS0PR01MB5716AEBD2988F8F5E9D5985794DFA@OS0PR01MB5716.jpnprd01.prod.outlook.com
This commit is contained in:
@ -29,7 +29,10 @@ COMMIT;
|
|||||||
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
|
||||||
data
|
data
|
||||||
----------------------------------------------------------
|
----------------------------------------------------------
|
||||||
|
opening a streamed block for transaction
|
||||||
streaming message: transactional: 1 prefix: test, sz: 50
|
streaming message: transactional: 1 prefix: test, sz: 50
|
||||||
|
closing a streamed block for transaction
|
||||||
|
aborting streamed (sub)transaction
|
||||||
opening a streamed block for transaction
|
opening a streamed block for transaction
|
||||||
streaming change for transaction
|
streaming change for transaction
|
||||||
streaming change for transaction
|
streaming change for transaction
|
||||||
@ -53,7 +56,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
|
|||||||
streaming change for transaction
|
streaming change for transaction
|
||||||
closing a streamed block for transaction
|
closing a streamed block for transaction
|
||||||
committing streamed transaction
|
committing streamed transaction
|
||||||
(24 rows)
|
(27 rows)
|
||||||
|
|
||||||
-- streaming test for toast changes
|
-- streaming test for toast changes
|
||||||
ALTER TABLE stream_test ALTER COLUMN data set storage external;
|
ALTER TABLE stream_test ALTER COLUMN data set storage external;
|
||||||
|
@ -31,7 +31,10 @@ PREPARE TRANSACTION 'test1';
|
|||||||
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
|
||||||
data
|
data
|
||||||
----------------------------------------------------------
|
----------------------------------------------------------
|
||||||
|
opening a streamed block for transaction
|
||||||
streaming message: transactional: 1 prefix: test, sz: 50
|
streaming message: transactional: 1 prefix: test, sz: 50
|
||||||
|
closing a streamed block for transaction
|
||||||
|
aborting streamed (sub)transaction
|
||||||
opening a streamed block for transaction
|
opening a streamed block for transaction
|
||||||
streaming change for transaction
|
streaming change for transaction
|
||||||
streaming change for transaction
|
streaming change for transaction
|
||||||
@ -55,7 +58,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
|
|||||||
streaming change for transaction
|
streaming change for transaction
|
||||||
closing a streamed block for transaction
|
closing a streamed block for transaction
|
||||||
preparing streamed transaction 'test1'
|
preparing streamed transaction 'test1'
|
||||||
(24 rows)
|
(27 rows)
|
||||||
|
|
||||||
COMMIT PREPARED 'test1';
|
COMMIT PREPARED 'test1';
|
||||||
--should show the COMMIT PREPARED and the other changes in the transaction
|
--should show the COMMIT PREPARED and the other changes in the transaction
|
||||||
@ -84,8 +87,11 @@ PREPARE TRANSACTION 'test1_nodecode';
|
|||||||
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
|
||||||
data
|
data
|
||||||
----------------------------------------------------------
|
----------------------------------------------------------
|
||||||
|
opening a streamed block for transaction
|
||||||
streaming message: transactional: 1 prefix: test, sz: 50
|
streaming message: transactional: 1 prefix: test, sz: 50
|
||||||
(1 row)
|
closing a streamed block for transaction
|
||||||
|
aborting streamed (sub)transaction
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
COMMIT PREPARED 'test1_nodecode';
|
COMMIT PREPARED 'test1_nodecode';
|
||||||
-- should show the inserts but not show a COMMIT PREPARED but a COMMIT
|
-- should show the inserts but not show a COMMIT PREPARED but a COMMIT
|
||||||
|
@ -944,6 +944,19 @@ pg_decode_stream_message(LogicalDecodingContext *ctx,
|
|||||||
ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
|
ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
|
||||||
const char *prefix, Size sz, const char *message)
|
const char *prefix, Size sz, const char *message)
|
||||||
{
|
{
|
||||||
|
/* Output stream start if we haven't yet for transactional messages. */
|
||||||
|
if (transactional)
|
||||||
|
{
|
||||||
|
TestDecodingData *data = ctx->output_plugin_private;
|
||||||
|
TestDecodingTxnData *txndata = txn->output_plugin_private;
|
||||||
|
|
||||||
|
if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
|
||||||
|
{
|
||||||
|
pg_output_stream_start(ctx, data, txn, false);
|
||||||
|
}
|
||||||
|
txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
|
||||||
|
}
|
||||||
|
|
||||||
OutputPluginPrepareWrite(ctx, true);
|
OutputPluginPrepareWrite(ctx, true);
|
||||||
|
|
||||||
if (transactional)
|
if (transactional)
|
||||||
|
Reference in New Issue
Block a user