mirror of
https://github.com/postgres/postgres.git
synced 2025-11-06 07:49:08 +03:00
During logical decoding of in-progress transactions, we perform the toast table scan while fetching the default toast value for an attribute. We forgot to initialize the flag during this scan to indicate that the system table scan is in progress. We need this flag to ensure that during logical decoding we never directly access the tableam or heap APIs because we check for concurrent aborts only in systable_* APIs. Reported-by: Alexander Lakhin Author: Takeshi Ideriha, Hou Zhijie Reviewed-by: Amit Kapila, Hou Zhijie Backpatch-through: 14 Discussion: https://postgr.es/m/18641-6687273b7f15269d@postgresql.org
160 lines
5.5 KiB
Plaintext
160 lines
5.5 KiB
Plaintext
SET synchronous_commit = on;
|
|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
|
|
?column?
|
|
----------
|
|
init
|
|
(1 row)
|
|
|
|
CREATE TABLE stream_test(data text);
|
|
-- consume DDL
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
|
data
|
|
------
|
|
(0 rows)
|
|
|
|
-- streaming test with sub-transaction
|
|
BEGIN;
|
|
savepoint s1;
|
|
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
|
|
?column?
|
|
----------
|
|
msg5
|
|
(1 row)
|
|
|
|
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
|
|
TRUNCATE table stream_test;
|
|
rollback to s1;
|
|
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
|
|
COMMIT;
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
|
|
data
|
|
----------------------------------------------------------
|
|
opening a streamed block for transaction
|
|
streaming message: transactional: 1 prefix: test, sz: 50
|
|
closing a streamed block for transaction
|
|
aborting streamed (sub)transaction
|
|
opening a streamed block for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
closing a streamed block for transaction
|
|
committing streamed transaction
|
|
(27 rows)
|
|
|
|
-- streaming test for toast changes
|
|
ALTER TABLE stream_test ALTER COLUMN data set storage external;
|
|
-- consume DDL
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
|
data
|
|
------
|
|
(0 rows)
|
|
|
|
INSERT INTO stream_test SELECT repeat('a', 6000) || g.i FROM generate_series(1, 10) g(i);
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
|
|
data
|
|
------------------------------------------
|
|
opening a streamed block for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
closing a streamed block for transaction
|
|
committing streamed transaction
|
|
(13 rows)
|
|
|
|
-- streaming test for toast with multi-insert
|
|
\COPY stream_test FROM STDIN
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
|
|
data
|
|
------------------------------------------
|
|
opening a streamed block for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
closing a streamed block for transaction
|
|
opening a streamed block for transaction
|
|
streaming change for transaction
|
|
closing a streamed block for transaction
|
|
committing streamed transaction
|
|
(17 rows)
|
|
|
|
/*
|
|
* Test concurrent abort with toast data. When streaming the second insertion, we
|
|
* detect that the subtransaction was aborted, and reset the transaction while having
|
|
* the TOAST changes in memory, resulting in deallocating both decoded changes and
|
|
* TOAST reconstruction data. Memory usage counters must be updated correctly.
|
|
*/
|
|
BEGIN;
|
|
INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
|
|
ALTER TABLE stream_test ADD COLUMN i INT;
|
|
SAVEPOINT s1;
|
|
INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50), 1 FROM generate_series(1, 500) g(i);
|
|
ROLLBACK TO s1;
|
|
COMMIT;
|
|
SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
|
|
count
|
|
-------
|
|
5
|
|
(1 row)
|
|
|
|
-- Test that accessing a TOAST table in streaming mode is allowed.
|
|
-- Create a table with a column that uses a TOASTed default value.
|
|
-- (temporarily hide query, to avoid the long CREATE TABLE stmt)
|
|
\set ECHO none
|
|
SET debug_logical_replication_streaming = immediate;
|
|
BEGIN;
|
|
INSERT INTO test_tab VALUES(1);
|
|
-- Force WAL flush, so that the above changes will be streamed.
|
|
SELECT 'force flush' FROM pg_switch_wal();
|
|
?column?
|
|
-------------
|
|
force flush
|
|
(1 row)
|
|
|
|
SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
|
|
count
|
|
-------
|
|
3
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
RESET debug_logical_replication_streaming;
|
|
DROP TABLE stream_test;
|
|
SELECT pg_drop_replication_slot('regression_slot');
|
|
pg_drop_replication_slot
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|