mirror of
https://github.com/postgres/postgres.git
synced 2025-06-26 12:21:12 +03:00
Fix re-distributing previously distributed invalidation messages during logical decoding.
Commit 4909b38af0
introduced logic to distribute invalidation messages from catalog-modifying transactions to all concurrent in-progress transactions. However, since each transaction distributes not only its original invalidation messages but also previously distributed messages to other transactions, this leads to an exponential increase in allocation request size for invalidation messages, ultimately causing memory allocation failure. This commit fixes this issue by tracking distributed invalidation messages separately per decoded transaction and not redistributing these messages to other in-progress transactions. The maximum size of distributed invalidation messages that one transaction can store is limited to MAX_DISTR_INVAL_MSG_PER_TXN (8MB). Once the size of the distributed invalidation messages exceeds this threshold, we invalidate all caches in locations where distributed invalidation messages need to be executed. Back-patch to all supported versions where we introduced the fix by commit 4909b38af0
. Note that this commit adds two new fields to ReorderBufferTXN to store the distributed invalidation messages. This change breaks ABI compatibility in back branches, affecting third-party extensions that depend on the size of the ReorderBufferTXN struct, though this scenario seems unlikely. Additionally, it adds a new flag to the txn_flags field of ReorderBufferTXN to indicate distributed invalidation message overflow. This should not affect existing implementations, as it is unlikely that third-party extensions use unused bits in the txn_flags field. Bug: #18938 #18942 Author: vignesh C <vignesh21@gmail.com> Reported-by: Duncan Sands <duncan.sands@deepbluecap.com> Reported-by: John Hutchins <john.hutchins@wicourts.gov> Reported-by: Laurence Parry <greenreaper@hotmail.com> Reported-by: Max Madden <maxmmadden@gmail.com> Reported-by: Braulio Fdo Gonzalez <brauliofg@gmail.com> Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com> Discussion: https://postgr.es/m/680bdaf6-f7d1-4536-b580-05c2760c67c6@deepbluecap.com Discussion: https://postgr.es/m/18942-0ab1e5ae156613ad@postgresql.org Discussion: https://postgr.es/m/18938-57c9a1c463b68ce0@postgresql.org Discussion: https://postgr.es/m/CAD1FGCT2sYrP_70RTuo56QTizyc+J3wJdtn2gtO3VttQFpdMZg@mail.gmail.com Discussion: https://postgr.es/m/CANO2=B=2BT1hSYCE=nuuTnVTnjidMg0+-FfnRnqM6kd23qoygg@mail.gmail.com Backpatch-through: 13
This commit is contained in:
@ -1,4 +1,4 @@
|
||||
Parsed test spec with 2 sessions
|
||||
Parsed test spec with 3 sessions
|
||||
|
||||
starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes
|
||||
step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
|
||||
@ -18,3 +18,24 @@ count
|
||||
stop
|
||||
(1 row)
|
||||
|
||||
|
||||
starting permutation: s1_begin s1_insert_tbl1 s3_begin s3_insert_tbl1 s2_alter_pub_add_tbl s1_insert_tbl1 s1_commit s3_commit s2_get_binary_changes
|
||||
step s1_begin: BEGIN;
|
||||
step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
|
||||
step s3_begin: BEGIN;
|
||||
step s3_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (2, 2);
|
||||
step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
|
||||
step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
|
||||
step s1_commit: COMMIT;
|
||||
step s3_commit: COMMIT;
|
||||
step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
|
||||
count
|
||||
-----
|
||||
0
|
||||
(1 row)
|
||||
|
||||
?column?
|
||||
--------
|
||||
stop
|
||||
(1 row)
|
||||
|
||||
|
@ -28,5 +28,16 @@ setup { SET synchronous_commit=on; }
|
||||
step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
|
||||
step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
|
||||
|
||||
session "s3"
|
||||
setup { SET synchronous_commit=on; }
|
||||
step "s3_begin" { BEGIN; }
|
||||
step "s3_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (2, 2); }
|
||||
step "s3_commit" { COMMIT; }
|
||||
|
||||
# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
|
||||
permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes"
|
||||
|
||||
# Expect to get no change because both s1's and s3's transactions
|
||||
# use the snapshot from before adding the table tbl1 to the
|
||||
# publication by "s2_alter_pub_add_tbl".
|
||||
permutation "s1_begin" "s1_insert_tbl1" "s3_begin" "s3_insert_tbl1" "s2_alter_pub_add_tbl" "s1_insert_tbl1" "s1_commit" "s3_commit" "s2_get_binary_changes"
|
||||
|
@ -103,12 +103,24 @@
|
||||
#include "storage/sinval.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/combocid.h"
|
||||
#include "utils/inval.h"
|
||||
#include "utils/memdebug.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/relfilenodemap.h"
|
||||
|
||||
|
||||
/*
|
||||
* Each transaction has an 8MB limit for invalidation messages distributed from
|
||||
* other transactions. This limit is set considering scenarios with many
|
||||
* concurrent logical decoding operations. When the distributed invalidation
|
||||
* messages reach this threshold, the transaction is marked as
|
||||
* RBTXN_DISTR_INVAL_OVERFLOWED to invalidate the complete cache as we have lost
|
||||
* some inval messages and hence don't know what needs to be invalidated.
|
||||
*/
|
||||
#define MAX_DISTR_INVAL_MSG_PER_TXN \
|
||||
((8 * 1024 * 1024) / sizeof(SharedInvalidationMessage))
|
||||
|
||||
/* entry for a hash table we use to map from xid to our transaction state */
|
||||
typedef struct ReorderBufferTXNByIdEnt
|
||||
{
|
||||
@ -220,7 +232,8 @@ static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
|
||||
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
|
||||
ReorderBufferIterTXNState *state);
|
||||
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);
|
||||
static void ReorderBufferExecuteInvalidations(uint32 nmsgs,
|
||||
SharedInvalidationMessage *msgs);
|
||||
|
||||
/*
|
||||
* ---------------------------------------
|
||||
@ -406,6 +419,12 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
|
||||
txn->invalidations = NULL;
|
||||
}
|
||||
|
||||
if (txn->invalidations_distributed)
|
||||
{
|
||||
pfree(txn->invalidations_distributed);
|
||||
txn->invalidations_distributed = NULL;
|
||||
}
|
||||
|
||||
/* Reset the toast hash */
|
||||
ReorderBufferToastReset(rb, txn);
|
||||
|
||||
@ -1883,7 +1902,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
|
||||
* see new catalog contents, so execute all
|
||||
* invalidations.
|
||||
*/
|
||||
ReorderBufferExecuteInvalidations(rb, txn);
|
||||
ReorderBufferExecuteInvalidations(txn->ninvalidations,
|
||||
txn->invalidations);
|
||||
}
|
||||
|
||||
break;
|
||||
@ -1921,7 +1941,17 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
|
||||
AbortCurrentTransaction();
|
||||
|
||||
/* make sure there's no cache pollution */
|
||||
ReorderBufferExecuteInvalidations(rb, txn);
|
||||
if (rbtxn_distr_inval_overflowed(txn))
|
||||
{
|
||||
Assert(txn->ninvalidations_distributed == 0);
|
||||
InvalidateSystemCaches();
|
||||
}
|
||||
else
|
||||
{
|
||||
ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
|
||||
ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
|
||||
txn->invalidations_distributed);
|
||||
}
|
||||
|
||||
if (using_subtxn)
|
||||
RollbackAndReleaseCurrentSubTransaction();
|
||||
@ -1947,7 +1977,17 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
|
||||
AbortCurrentTransaction();
|
||||
|
||||
/* make sure there's no cache pollution */
|
||||
ReorderBufferExecuteInvalidations(rb, txn);
|
||||
if (rbtxn_distr_inval_overflowed(txn))
|
||||
{
|
||||
Assert(txn->ninvalidations_distributed == 0);
|
||||
InvalidateSystemCaches();
|
||||
}
|
||||
else
|
||||
{
|
||||
ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
|
||||
ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
|
||||
txn->invalidations_distributed);
|
||||
}
|
||||
|
||||
if (using_subtxn)
|
||||
RollbackAndReleaseCurrentSubTransaction();
|
||||
@ -2060,9 +2100,10 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
|
||||
txn->final_lsn = lsn;
|
||||
|
||||
/*
|
||||
* Process cache invalidation messages if there are any. Even if we're not
|
||||
* interested in the transaction's contents, it could have manipulated the
|
||||
* catalog and we need to update the caches according to that.
|
||||
* Process only cache invalidation messages in this transaction if there
|
||||
* are any. Even if we're not interested in the transaction's contents, it
|
||||
* could have manipulated the catalog and we need to update the caches
|
||||
* according to that.
|
||||
*/
|
||||
if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
|
||||
ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
|
||||
@ -2253,6 +2294,36 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
|
||||
txn->ntuplecids++;
|
||||
}
|
||||
|
||||
/*
|
||||
* A helper function for ReorderBufferAddInvalidations() and
|
||||
* ReorderBufferAddDistributedInvalidations() to accumulate the invalidation
|
||||
* messages to the **invals_out.
|
||||
*/
|
||||
static void
|
||||
ReorderBufferAccumulateInvalidations(SharedInvalidationMessage **invals_out,
|
||||
uint32 *ninvals_out,
|
||||
SharedInvalidationMessage *msgs_new,
|
||||
Size nmsgs_new)
|
||||
{
|
||||
if (*ninvals_out == 0)
|
||||
{
|
||||
*ninvals_out = nmsgs_new;
|
||||
*invals_out = (SharedInvalidationMessage *)
|
||||
palloc(sizeof(SharedInvalidationMessage) * nmsgs_new);
|
||||
memcpy(*invals_out, msgs_new, sizeof(SharedInvalidationMessage) * nmsgs_new);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Enlarge the array of inval messages */
|
||||
*invals_out = (SharedInvalidationMessage *)
|
||||
repalloc(*invals_out, sizeof(SharedInvalidationMessage) *
|
||||
(*ninvals_out + nmsgs_new));
|
||||
memcpy(*invals_out + *ninvals_out, msgs_new,
|
||||
nmsgs_new * sizeof(SharedInvalidationMessage));
|
||||
*ninvals_out += nmsgs_new;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Setup the invalidation of the toplevel transaction.
|
||||
*
|
||||
@ -2282,24 +2353,74 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
|
||||
|
||||
Assert(nmsgs > 0);
|
||||
|
||||
/* Accumulate invalidations. */
|
||||
if (txn->ninvalidations == 0)
|
||||
{
|
||||
txn->ninvalidations = nmsgs;
|
||||
txn->invalidations = (SharedInvalidationMessage *)
|
||||
palloc(sizeof(SharedInvalidationMessage) * nmsgs);
|
||||
memcpy(txn->invalidations, msgs,
|
||||
sizeof(SharedInvalidationMessage) * nmsgs);
|
||||
}
|
||||
else
|
||||
{
|
||||
txn->invalidations = (SharedInvalidationMessage *)
|
||||
repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
|
||||
(txn->ninvalidations + nmsgs));
|
||||
ReorderBufferAccumulateInvalidations(&txn->invalidations,
|
||||
&txn->ninvalidations,
|
||||
msgs, nmsgs);
|
||||
|
||||
memcpy(txn->invalidations + txn->ninvalidations, msgs,
|
||||
nmsgs * sizeof(SharedInvalidationMessage));
|
||||
txn->ninvalidations += nmsgs;
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Accumulate the invalidations distributed by other committed transactions
|
||||
* for executing them later.
|
||||
*
|
||||
* This function is similar to ReorderBufferAddInvalidations() but stores
|
||||
* the given inval messages to the txn->invalidations_distributed with the
|
||||
* overflow check.
|
||||
*
|
||||
* This needs to be called by committed transactions to distribute their
|
||||
* inval messages to in-progress transactions.
|
||||
*/
|
||||
void
|
||||
ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid,
|
||||
XLogRecPtr lsn, Size nmsgs,
|
||||
SharedInvalidationMessage *msgs)
|
||||
{
|
||||
ReorderBufferTXN *txn;
|
||||
MemoryContext oldcontext;
|
||||
|
||||
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
|
||||
|
||||
oldcontext = MemoryContextSwitchTo(rb->context);
|
||||
|
||||
/*
|
||||
* Collect all the invalidations under the top transaction, if available,
|
||||
* so that we can execute them all together.
|
||||
*/
|
||||
if (txn->toplevel_xid)
|
||||
{
|
||||
txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, true, NULL, lsn,
|
||||
true);
|
||||
}
|
||||
|
||||
Assert(nmsgs > 0);
|
||||
|
||||
if (!rbtxn_distr_inval_overflowed(txn))
|
||||
{
|
||||
/*
|
||||
* Check the transaction has enough space for storing distributed
|
||||
* invalidation messages.
|
||||
*/
|
||||
if (txn->ninvalidations_distributed + nmsgs >= MAX_DISTR_INVAL_MSG_PER_TXN)
|
||||
{
|
||||
/*
|
||||
* Mark the invalidation message as overflowed and free up the
|
||||
* messages accumulated so far.
|
||||
*/
|
||||
txn->txn_flags |= RBTXN_DISTR_INVAL_OVERFLOWED;
|
||||
|
||||
if (txn->invalidations_distributed)
|
||||
{
|
||||
pfree(txn->invalidations_distributed);
|
||||
txn->invalidations_distributed = NULL;
|
||||
txn->ninvalidations_distributed = 0;
|
||||
}
|
||||
}
|
||||
else
|
||||
ReorderBufferAccumulateInvalidations(&txn->invalidations_distributed,
|
||||
&txn->ninvalidations_distributed,
|
||||
msgs, nmsgs);
|
||||
}
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
@ -2310,12 +2431,12 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
|
||||
* in the changestream but we don't know which those are.
|
||||
*/
|
||||
static void
|
||||
ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
|
||||
ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
|
||||
{
|
||||
int i;
|
||||
|
||||
for (i = 0; i < txn->ninvalidations; i++)
|
||||
LocalExecuteInvalidationMessage(&txn->invalidations[i]);
|
||||
for (i = 0; i < nmsgs; i++)
|
||||
LocalExecuteInvalidationMessage(&msgs[i]);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -927,6 +927,13 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
|
||||
* contents built by the current transaction even after its decoding,
|
||||
* which should have been invalidated due to concurrent catalog
|
||||
* changing transaction.
|
||||
*
|
||||
* Distribute only the invalidation messages generated by the current
|
||||
* committed transaction. Invalidation messages received from other
|
||||
* transactions would have already been propagated to the relevant
|
||||
* in-progress transactions. This transaction would have processed
|
||||
* those invalidations, ensuring that subsequent transactions observe
|
||||
* a consistent cache state.
|
||||
*/
|
||||
if (txn->xid != xid)
|
||||
{
|
||||
@ -940,8 +947,9 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
|
||||
{
|
||||
Assert(msgs != NULL);
|
||||
|
||||
ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
|
||||
ninvalidations, msgs);
|
||||
ReorderBufferAddDistributedInvalidations(builder->reorder,
|
||||
txn->xid, lsn,
|
||||
ninvalidations, msgs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -160,9 +160,10 @@ typedef struct ReorderBufferChange
|
||||
} ReorderBufferChange;
|
||||
|
||||
/* ReorderBufferTXN txn_flags */
|
||||
#define RBTXN_HAS_CATALOG_CHANGES 0x0001
|
||||
#define RBTXN_IS_SUBXACT 0x0002
|
||||
#define RBTXN_IS_SERIALIZED 0x0004
|
||||
#define RBTXN_HAS_CATALOG_CHANGES 0x0001
|
||||
#define RBTXN_IS_SUBXACT 0x0002
|
||||
#define RBTXN_IS_SERIALIZED 0x0004
|
||||
#define RBTXN_DISTR_INVAL_OVERFLOWED 0x0008
|
||||
|
||||
/* Does the transaction have catalog changes? */
|
||||
#define rbtxn_has_catalog_changes(txn) \
|
||||
@ -182,6 +183,12 @@ typedef struct ReorderBufferChange
|
||||
((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
|
||||
)
|
||||
|
||||
/* Is the array of distributed inval messages overflowed? */
|
||||
#define rbtxn_distr_inval_overflowed(txn) \
|
||||
( \
|
||||
((txn)->txn_flags & RBTXN_DISTR_INVAL_OVERFLOWED) != 0 \
|
||||
)
|
||||
|
||||
typedef struct ReorderBufferTXN
|
||||
{
|
||||
/* See above */
|
||||
@ -311,6 +318,12 @@ typedef struct ReorderBufferTXN
|
||||
* Size of this transaction (changes currently in memory, in bytes).
|
||||
*/
|
||||
Size size;
|
||||
|
||||
/*
|
||||
* Stores cache invalidation messages distributed by other transactions.
|
||||
*/
|
||||
uint32 ninvalidations_distributed;
|
||||
SharedInvalidationMessage *invalidations_distributed;
|
||||
} ReorderBufferTXN;
|
||||
|
||||
/* so we can define the callbacks used inside struct ReorderBuffer itself */
|
||||
@ -451,6 +464,9 @@ void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr ls
|
||||
CommandId cmin, CommandId cmax, CommandId combocid);
|
||||
void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
|
||||
Size nmsgs, SharedInvalidationMessage *msgs);
|
||||
void ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid,
|
||||
XLogRecPtr lsn, Size nmsgs,
|
||||
SharedInvalidationMessage *msgs);
|
||||
void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations,
|
||||
SharedInvalidationMessage *invalidations);
|
||||
void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
|
||||
|
Reference in New Issue
Block a user