From dbed2e36625de9d4074243f60f48e04b5ed67810 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 26 Aug 2024 11:00:04 -0700 Subject: [PATCH] Fix memory counter update in ReorderBuffer. Commit 5bec1d6bc5e changed the memory usage updates of the ReorderBufferTXN to zero all at once by subtracting txn->size, rather than updating it for each change. However, if TOAST reconstruction data remained in the transaction when freeing it, there were cases where it further subtracted the memory counter from zero, resulting in an assertion failure. This change calculates the memory size for each change and updates the memory usage to precisely the amount that has been freed. Backpatch to v17, where this was introducd. Reviewed-by: Amit Kapila, Shlok Kyal Discussion: https://postgr.es/m/CAD21AoAqkNUvicgKPT_dXzNoOwpPkVTg0QPPxEcWmzT0moCJ1g%40mail.gmail.com Backpatch-through: 17 --- contrib/test_decoding/expected/stream.out | 19 +++++++++++ contrib/test_decoding/sql/stream.sql | 15 +++++++++ .../replication/logical/reorderbuffer.c | 32 ++++++++++++++++--- 3 files changed, 62 insertions(+), 4 deletions(-) diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out index 4ab2d47bf8d..a76f77601e2 100644 --- a/contrib/test_decoding/expected/stream.out +++ b/contrib/test_decoding/expected/stream.out @@ -109,6 +109,25 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl committing streamed transaction (17 rows) +/* + * Test concurrent abort with toast data. When streaming the second insertion, we + * detect that the subtransaction was aborted, and reset the transaction while having + * the TOAST changes in memory, resulting in deallocating both decoded changes and + * TOAST reconstruction data. Memory usage counters must be updated correctly. + */ +BEGIN; +INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i); +ALTER TABLE stream_test ADD COLUMN i INT; +SAVEPOINT s1; +INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50), 1 FROM generate_series(1, 500) g(i); +ROLLBACK TO s1; +COMMIT; +SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + count +------- + 5 +(1 row) + DROP TABLE stream_test; SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot diff --git a/contrib/test_decoding/sql/stream.sql b/contrib/test_decoding/sql/stream.sql index 4feec62972a..7f43f0c2ab7 100644 --- a/contrib/test_decoding/sql/stream.sql +++ b/contrib/test_decoding/sql/stream.sql @@ -44,5 +44,20 @@ toasted-123456789012345678901234567890123456789012345678901234567890123456789012 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); +/* + * Test concurrent abort with toast data. When streaming the second insertion, we + * detect that the subtransaction was aborted, and reset the transaction while having + * the TOAST changes in memory, resulting in deallocating both decoded changes and + * TOAST reconstruction data. Memory usage counters must be updated correctly. + */ +BEGIN; +INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i); +ALTER TABLE stream_test ADD COLUMN i INT; +SAVEPOINT s1; +INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50), 1 FROM generate_series(1, 500) g(i); +ROLLBACK TO s1; +COMMIT; +SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + DROP TABLE stream_test; SELECT pg_drop_replication_slot('regression_slot'); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 00a8327e771..b3139c41e2b 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -467,6 +467,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* Reset the toast hash */ ReorderBufferToastReset(rb, txn); + /* All changes must be deallocated */ + Assert(txn->size == 0); + pfree(txn); } @@ -1506,6 +1509,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) { bool found; dlist_mutable_iter iter; + Size mem_freed = 0; /* cleanup subtransactions & their changes */ dlist_foreach_modify(iter, &txn->subtxns) @@ -1535,9 +1539,20 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* Check we're not mixing changes from different transactions. */ Assert(change->txn == txn); + /* + * Instead of updating the memory counter for individual changes, + * we sum up the size of memory to free so we can update the memory + * counter all together below. This saves costs of maintaining + * the max-heap. + */ + mem_freed += ReorderBufferChangeSize(change); + ReorderBufferReturnChange(rb, change, false); } + /* Update the memory counter */ + ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed); + /* * Cleanup the tuplecids we stored for decoding catalog snapshot access. * They are always stored in the toplevel transaction. @@ -1594,9 +1609,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) if (rbtxn_is_serialized(txn)) ReorderBufferRestoreCleanup(rb, txn); - /* Update the memory counter */ - ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size); - /* deallocate */ ReorderBufferReturnTXN(rb, txn); } @@ -1616,6 +1628,7 @@ static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared) { dlist_mutable_iter iter; + Size mem_freed = 0; /* cleanup subtransactions & their changes */ dlist_foreach_modify(iter, &txn->subtxns) @@ -1648,11 +1661,19 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep /* remove the change from it's containing list */ dlist_delete(&change->node); + /* + * Instead of updating the memory counter for individual changes, + * we sum up the size of memory to free so we can update the memory + * counter all together below. This saves costs of maintaining + * the max-heap. + */ + mem_freed += ReorderBufferChangeSize(change); + ReorderBufferReturnChange(rb, change, false); } /* Update the memory counter */ - ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size); + ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed); /* * Mark the transaction as streamed. @@ -2062,6 +2083,9 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, rb->stream_stop(rb, txn, last_lsn); ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id); } + + /* All changes must be deallocated */ + Assert(txn->size == 0); } /*