mirror of
https://github.com/postgres/postgres.git
synced 2025-04-21 12:05:57 +03:00
Fix skip-empty-xacts with sequences in test_decoding
Regression tests need to use skip-empty-xacts = false, because there might be accidental concurrent activity (like autovacuum), particularly on slow machines. The tests added by 80901b3291 failed to do that in a couple places, triggering occasional failures on buildfarm. Fixing the tests however uncovered a bug in the code, because sequence callbacks did not handle skip-empty-xacts properly. For trasactional increments we need to check/update the xact_wrote_changes flag, and emit the BEGIN if it's the first change in the transaction. Reported-by: Andres Freund Discussion: https://postgr.es/m/20220212220413.b25amklo7t4xb7ni%40alap3.anarazel.de
This commit is contained in:
parent
faa189c932
commit
b779d7d8fd
@ -87,7 +87,7 @@ SELECT nextval('test_sequence');
|
|||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- show results and drop sequence
|
-- show results and drop sequence
|
||||||
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
||||||
data
|
data
|
||||||
----------------------------------------------------------------------------------------
|
----------------------------------------------------------------------------------------
|
||||||
BEGIN
|
BEGIN
|
||||||
@ -236,7 +236,7 @@ SELECT nextval('test_table_a_seq');
|
|||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
DROP TABLE test_table;
|
DROP TABLE test_table;
|
||||||
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
||||||
data
|
data
|
||||||
-------------------------------------------------------------------------------------------
|
-------------------------------------------------------------------------------------------
|
||||||
BEGIN
|
BEGIN
|
||||||
@ -245,9 +245,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
|
|||||||
sequence public.test_table_a_seq: transactional:0 last_value: 33 log_cnt: 0 is_called:1
|
sequence public.test_table_a_seq: transactional:0 last_value: 33 log_cnt: 0 is_called:1
|
||||||
sequence public.test_table_a_seq: transactional:0 last_value: 3000 log_cnt: 0 is_called:1
|
sequence public.test_table_a_seq: transactional:0 last_value: 3000 log_cnt: 0 is_called:1
|
||||||
sequence public.test_table_a_seq: transactional:0 last_value: 3033 log_cnt: 0 is_called:1
|
sequence public.test_table_a_seq: transactional:0 last_value: 3033 log_cnt: 0 is_called:1
|
||||||
BEGIN
|
(6 rows)
|
||||||
COMMIT
|
|
||||||
(8 rows)
|
|
||||||
|
|
||||||
-- savepoint test on table with serial column
|
-- savepoint test on table with serial column
|
||||||
BEGIN;
|
BEGIN;
|
||||||
@ -259,7 +257,7 @@ INSERT INTO test_table (b) VALUES (300);
|
|||||||
ROLLBACK TO SAVEPOINT a;
|
ROLLBACK TO SAVEPOINT a;
|
||||||
DROP TABLE test_table;
|
DROP TABLE test_table;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
||||||
data
|
data
|
||||||
-----------------------------------------------------------------------------------------
|
-----------------------------------------------------------------------------------------
|
||||||
BEGIN
|
BEGIN
|
||||||
@ -308,7 +306,7 @@ SELECT * FROM test_sequence;
|
|||||||
|
|
||||||
DROP SEQUENCE test_sequence;
|
DROP SEQUENCE test_sequence;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
||||||
data
|
data
|
||||||
----------------------------------------------------------------------------------------
|
----------------------------------------------------------------------------------------
|
||||||
BEGIN
|
BEGIN
|
||||||
|
@ -28,7 +28,7 @@ SELECT setval('test_sequence', 3500, false);
|
|||||||
SELECT nextval('test_sequence');
|
SELECT nextval('test_sequence');
|
||||||
|
|
||||||
-- show results and drop sequence
|
-- show results and drop sequence
|
||||||
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
||||||
DROP SEQUENCE test_sequence;
|
DROP SEQUENCE test_sequence;
|
||||||
|
|
||||||
-- rollback on sequence creation and update
|
-- rollback on sequence creation and update
|
||||||
@ -87,7 +87,7 @@ SELECT * from test_table_a_seq;
|
|||||||
SELECT nextval('test_table_a_seq');
|
SELECT nextval('test_table_a_seq');
|
||||||
|
|
||||||
DROP TABLE test_table;
|
DROP TABLE test_table;
|
||||||
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
||||||
|
|
||||||
-- savepoint test on table with serial column
|
-- savepoint test on table with serial column
|
||||||
BEGIN;
|
BEGIN;
|
||||||
@ -99,7 +99,7 @@ INSERT INTO test_table (b) VALUES (300);
|
|||||||
ROLLBACK TO SAVEPOINT a;
|
ROLLBACK TO SAVEPOINT a;
|
||||||
DROP TABLE test_table;
|
DROP TABLE test_table;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
||||||
|
|
||||||
-- savepoint test on table with serial column
|
-- savepoint test on table with serial column
|
||||||
BEGIN;
|
BEGIN;
|
||||||
@ -114,6 +114,6 @@ ROLLBACK TO SAVEPOINT a;
|
|||||||
SELECT * FROM test_sequence;
|
SELECT * FROM test_sequence;
|
||||||
DROP SEQUENCE test_sequence;
|
DROP SEQUENCE test_sequence;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
||||||
|
|
||||||
SELECT pg_drop_replication_slot('regression_slot');
|
SELECT pg_drop_replication_slot('regression_slot');
|
||||||
|
@ -774,10 +774,21 @@ pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||||||
int64 last_value, int64 log_cnt, bool is_called)
|
int64 last_value, int64 log_cnt, bool is_called)
|
||||||
{
|
{
|
||||||
TestDecodingData *data = ctx->output_plugin_private;
|
TestDecodingData *data = ctx->output_plugin_private;
|
||||||
|
TestDecodingTxnData *txndata = txn->output_plugin_private;
|
||||||
|
|
||||||
if (!data->include_sequences)
|
if (!data->include_sequences)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
/* output BEGIN if we haven't yet, but only for the transactional case */
|
||||||
|
if (transactional)
|
||||||
|
{
|
||||||
|
if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
|
||||||
|
{
|
||||||
|
pg_output_begin(ctx, data, txn, false);
|
||||||
|
}
|
||||||
|
txndata->xact_wrote_changes = true;
|
||||||
|
}
|
||||||
|
|
||||||
OutputPluginPrepareWrite(ctx, true);
|
OutputPluginPrepareWrite(ctx, true);
|
||||||
appendStringInfoString(ctx->out, "sequence ");
|
appendStringInfoString(ctx->out, "sequence ");
|
||||||
appendStringInfoString(ctx->out,
|
appendStringInfoString(ctx->out,
|
||||||
@ -994,10 +1005,21 @@ pg_decode_stream_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||||||
int64 last_value, int64 log_cnt, bool is_called)
|
int64 last_value, int64 log_cnt, bool is_called)
|
||||||
{
|
{
|
||||||
TestDecodingData *data = ctx->output_plugin_private;
|
TestDecodingData *data = ctx->output_plugin_private;
|
||||||
|
TestDecodingTxnData *txndata = txn->output_plugin_private;
|
||||||
|
|
||||||
if (!data->include_sequences)
|
if (!data->include_sequences)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
/* output BEGIN if we haven't yet, but only for the transactional case */
|
||||||
|
if (transactional)
|
||||||
|
{
|
||||||
|
if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
|
||||||
|
{
|
||||||
|
pg_output_begin(ctx, data, txn, false);
|
||||||
|
}
|
||||||
|
txndata->xact_wrote_changes = true;
|
||||||
|
}
|
||||||
|
|
||||||
OutputPluginPrepareWrite(ctx, true);
|
OutputPluginPrepareWrite(ctx, true);
|
||||||
appendStringInfoString(ctx->out, "streaming sequence ");
|
appendStringInfoString(ctx->out, "streaming sequence ");
|
||||||
appendStringInfoString(ctx->out,
|
appendStringInfoString(ctx->out,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user