1
0
mirror of https://github.com/postgres/postgres.git synced 2025-05-06 19:59:18 +03:00

Reduce memory consumption for pending invalidation messages.

The existing data structures in inval.c are fairly inefficient for
the common case of a command or subtransaction that registers a small
number of cache invalidation events.  While this doesn't matter if we
commit right away, it can build up to a lot of bloat in a transaction
that contains many DDL operations.  By making a few more assumptions
about the expected use-case, we can switch to a representation using
densely-packed arrays.  Although this eliminates some data-copying,
it doesn't seem to make much difference time-wise.  But the space
consumption decreases substantially.

Patch by me; thanks to Nathan Bossart for review.

Discussion: https://postgr.es/m/2380555.1622395376@sss.pgh.pa.us
This commit is contained in:
Tom Lane 2021-08-16 16:48:25 -04:00
parent 069d33d0c5
commit 3aafc030a5

View File

@ -71,11 +71,6 @@
* manipulating the init file is in relcache.c, but we keep track of the * manipulating the init file is in relcache.c, but we keep track of the
* need for it here. * need for it here.
* *
* The request lists proper are kept in CurTransactionContext of their
* creating (sub)transaction, since they can be forgotten on abort of that
* transaction but must be kept till top-level commit otherwise. For
* simplicity we keep the controlling list-of-lists in TopTransactionContext.
*
* Currently, inval messages are sent without regard for the possibility * Currently, inval messages are sent without regard for the possibility
* that the object described by the catalog tuple might be a session-local * that the object described by the catalog tuple might be a session-local
* object such as a temporary table. This is because (1) this code has * object such as a temporary table. This is because (1) this code has
@ -106,7 +101,6 @@
#include "catalog/catalog.h" #include "catalog/catalog.h"
#include "catalog/pg_constraint.h" #include "catalog/pg_constraint.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "port/pg_bitutils.h"
#include "storage/sinval.h" #include "storage/sinval.h"
#include "storage/smgr.h" #include "storage/smgr.h"
#include "utils/catcache.h" #include "utils/catcache.h"
@ -121,35 +115,86 @@
/* /*
* To minimize palloc traffic, we keep pending requests in successively- * Pending requests are stored as ready-to-send SharedInvalidationMessages.
* larger chunks (a slightly more sophisticated version of an expansible * We keep the messages themselves in arrays in TopTransactionContext
* array). All request types can be stored as SharedInvalidationMessage * (there are separate arrays for catcache and relcache messages). Control
* records. The ordering of requests within a list is never significant. * information is kept in a chain of TransInvalidationInfo structs, also
* allocated in TopTransactionContext. (We could keep a subtransaction's
* TransInvalidationInfo in its CurTransactionContext; but that's more
* wasteful not less so, since in very many scenarios it'd be the only
* allocation in the subtransaction's CurTransactionContext.)
*
* We can store the message arrays densely, and yet avoid moving data around
* within an array, because within any one subtransaction we need only
* distinguish between messages emitted by prior commands and those emitted
* by the current command. Once a command completes and we've done local
* processing on its messages, we can fold those into the prior-commands
* messages just by changing array indexes in the TransInvalidationInfo
* struct. Similarly, we need distinguish messages of prior subtransactions
* from those of the current subtransaction only until the subtransaction
* completes, after which we adjust the array indexes in the parent's
* TransInvalidationInfo to include the subtransaction's messages.
*
* The ordering of the individual messages within a command's or
* subtransaction's output is not considered significant, although this
* implementation happens to preserve the order in which they were queued.
* (Previous versions of this code did not preserve it.)
*
* For notational convenience, control information is kept in two-element
* arrays, the first for catcache messages and the second for relcache
* messages.
*/ */
typedef struct InvalidationChunk #define CatCacheMsgs 0
{ #define RelCacheMsgs 1
struct InvalidationChunk *next; /* list link */
int nitems; /* # items currently stored in chunk */
int maxitems; /* size of allocated array in this chunk */
SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
} InvalidationChunk;
typedef struct InvalidationListHeader /* Pointers to main arrays in TopTransactionContext */
typedef struct InvalMessageArray
{ {
InvalidationChunk *cclist; /* list of chunks holding catcache msgs */ SharedInvalidationMessage *msgs; /* palloc'd array (can be expanded) */
InvalidationChunk *rclist; /* list of chunks holding relcache msgs */ int maxmsgs; /* current allocated size of array */
} InvalidationListHeader; } InvalMessageArray;
static InvalMessageArray InvalMessageArrays[2];
/* Control information for one logical group of messages */
typedef struct InvalidationMsgsGroup
{
int firstmsg[2]; /* first index in relevant array */
int nextmsg[2]; /* last+1 index */
} InvalidationMsgsGroup;
/* Macros to help preserve InvalidationMsgsGroup abstraction */
#define SetSubGroupToFollow(targetgroup, priorgroup, subgroup) \
do { \
(targetgroup)->firstmsg[subgroup] = \
(targetgroup)->nextmsg[subgroup] = \
(priorgroup)->nextmsg[subgroup]; \
} while (0)
#define SetGroupToFollow(targetgroup, priorgroup) \
do { \
SetSubGroupToFollow(targetgroup, priorgroup, CatCacheMsgs); \
SetSubGroupToFollow(targetgroup, priorgroup, RelCacheMsgs); \
} while (0)
#define NumMessagesInSubGroup(group, subgroup) \
((group)->nextmsg[subgroup] - (group)->firstmsg[subgroup])
#define NumMessagesInGroup(group) \
(NumMessagesInSubGroup(group, CatCacheMsgs) + \
NumMessagesInSubGroup(group, RelCacheMsgs))
/*---------------- /*----------------
* Invalidation info is divided into two lists: * Invalidation messages are divided into two groups:
* 1) events so far in current command, not yet reflected to caches. * 1) events so far in current command, not yet reflected to caches.
* 2) events in previous commands of current transaction; these have * 2) events in previous commands of current transaction; these have
* been reflected to local caches, and must be either broadcast to * been reflected to local caches, and must be either broadcast to
* other backends or rolled back from local cache when we commit * other backends or rolled back from local cache when we commit
* or abort the transaction. * or abort the transaction.
* Actually, we need two such lists for each level of nested transaction, * Actually, we need such groups for each level of nested transaction,
* so that we can discard events from an aborted subtransaction. When * so that we can discard events from an aborted subtransaction. When
* a subtransaction commits, we append its lists to the parent's lists. * a subtransaction commits, we append its events to the parent's groups.
* *
* The relcache-file-invalidated flag can just be a simple boolean, * The relcache-file-invalidated flag can just be a simple boolean,
* since we only act on it at transaction commit; we don't care which * since we only act on it at transaction commit; we don't care which
@ -165,11 +210,11 @@ typedef struct TransInvalidationInfo
/* Subtransaction nesting depth */ /* Subtransaction nesting depth */
int my_level; int my_level;
/* head of current-command event list */ /* Events emitted by current command */
InvalidationListHeader CurrentCmdInvalidMsgs; InvalidationMsgsGroup CurrentCmdInvalidMsgs;
/* head of previous-commands event list */ /* Events emitted by previous commands of this (sub)transaction */
InvalidationListHeader PriorCmdInvalidMsgs; InvalidationMsgsGroup PriorCmdInvalidMsgs;
/* init file must be invalidated? */ /* init file must be invalidated? */
bool RelcacheInitFileInval; bool RelcacheInitFileInval;
@ -177,10 +222,6 @@ typedef struct TransInvalidationInfo
static TransInvalidationInfo *transInvalInfo = NULL; static TransInvalidationInfo *transInvalInfo = NULL;
static SharedInvalidationMessage *SharedInvalidMessagesArray;
static int numSharedInvalidMessagesArray;
static int maxSharedInvalidMessagesArray;
/* GUC storage */ /* GUC storage */
int debug_discard_caches = 0; int debug_discard_caches = 0;
@ -218,124 +259,118 @@ static struct RELCACHECALLBACK
static int relcache_callback_count = 0; static int relcache_callback_count = 0;
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
* Invalidation list support functions * Invalidation subgroup support functions
*
* These three routines encapsulate processing of the "chunked"
* representation of what is logically just a list of messages.
* ---------------------------------------------------------------- * ----------------------------------------------------------------
*/ */
/* /*
* AddInvalidationMessage * AddInvalidationMessage
* Add an invalidation message to a list (of chunks). * Add an invalidation message to a (sub)group.
* *
* Note that we do not pay any great attention to maintaining the original * The group must be the last active one, since we assume we can add to the
* ordering of the messages. * end of the relevant InvalMessageArray.
*
* subgroup must be CatCacheMsgs or RelCacheMsgs.
*/ */
static void static void
AddInvalidationMessage(InvalidationChunk **listHdr, AddInvalidationMessage(InvalidationMsgsGroup *group, int subgroup,
SharedInvalidationMessage *msg) const SharedInvalidationMessage *msg)
{ {
InvalidationChunk *chunk = *listHdr; InvalMessageArray *ima = &InvalMessageArrays[subgroup];
int nextindex = group->nextmsg[subgroup];
if (chunk == NULL) if (nextindex >= ima->maxmsgs)
{ {
/* First time through; create initial chunk */ if (ima->msgs == NULL)
#define FIRSTCHUNKSIZE 32 {
chunk = (InvalidationChunk *) /* Create new storage array in TopTransactionContext */
MemoryContextAlloc(CurTransactionContext, int reqsize = 32; /* arbitrary */
offsetof(InvalidationChunk, msgs) +
FIRSTCHUNKSIZE * sizeof(SharedInvalidationMessage));
chunk->nitems = 0;
chunk->maxitems = FIRSTCHUNKSIZE;
chunk->next = *listHdr;
*listHdr = chunk;
}
else if (chunk->nitems >= chunk->maxitems)
{
/* Need another chunk; double size of last chunk */
int chunksize = 2 * chunk->maxitems;
chunk = (InvalidationChunk *) ima->msgs = (SharedInvalidationMessage *)
MemoryContextAlloc(CurTransactionContext, MemoryContextAlloc(TopTransactionContext,
offsetof(InvalidationChunk, msgs) + reqsize * sizeof(SharedInvalidationMessage));
chunksize * sizeof(SharedInvalidationMessage)); ima->maxmsgs = reqsize;
chunk->nitems = 0; Assert(nextindex == 0);
chunk->maxitems = chunksize; }
chunk->next = *listHdr; else
*listHdr = chunk; {
/* Enlarge storage array */
int reqsize = 2 * ima->maxmsgs;
ima->msgs = (SharedInvalidationMessage *)
repalloc(ima->msgs,
reqsize * sizeof(SharedInvalidationMessage));
ima->maxmsgs = reqsize;
}
} }
/* Okay, add message to current chunk */ /* Okay, add message to current group */
chunk->msgs[chunk->nitems] = *msg; ima->msgs[nextindex] = *msg;
chunk->nitems++; group->nextmsg[subgroup]++;
} }
/* /*
* Append one list of invalidation message chunks to another, resetting * Append one subgroup of invalidation messages to another, resetting
* the source chunk-list pointer to NULL. * the source subgroup to empty.
*/ */
static void static void
AppendInvalidationMessageList(InvalidationChunk **destHdr, AppendInvalidationMessageSubGroup(InvalidationMsgsGroup *dest,
InvalidationChunk **srcHdr) InvalidationMsgsGroup *src,
int subgroup)
{ {
InvalidationChunk *chunk = *srcHdr; /* Messages must be adjacent in main array */
Assert(dest->nextmsg[subgroup] == src->firstmsg[subgroup]);
if (chunk == NULL) /* ... which makes this easy: */
return; /* nothing to do */ dest->nextmsg[subgroup] = src->nextmsg[subgroup];
while (chunk->next != NULL) /*
chunk = chunk->next; * This is handy for some callers and irrelevant for others. But we do it
* always, reasoning that it's bad to leave different groups pointing at
chunk->next = *destHdr; * the same fragment of the message array.
*/
*destHdr = *srcHdr; SetSubGroupToFollow(src, dest, subgroup);
*srcHdr = NULL;
} }
/* /*
* Process a list of invalidation messages. * Process a subgroup of invalidation messages.
* *
* This is a macro that executes the given code fragment for each message in * This is a macro that executes the given code fragment for each message in
* a message chunk list. The fragment should refer to the message as *msg. * a message subgroup. The fragment should refer to the message as *msg.
*/ */
#define ProcessMessageList(listHdr, codeFragment) \ #define ProcessMessageSubGroup(group, subgroup, codeFragment) \
do { \ do { \
InvalidationChunk *_chunk; \ int _msgindex = (group)->firstmsg[subgroup]; \
for (_chunk = (listHdr); _chunk != NULL; _chunk = _chunk->next) \ int _endmsg = (group)->nextmsg[subgroup]; \
for (; _msgindex < _endmsg; _msgindex++) \
{ \ { \
int _cindex; \ SharedInvalidationMessage *msg = \
for (_cindex = 0; _cindex < _chunk->nitems; _cindex++) \ &InvalMessageArrays[subgroup].msgs[_msgindex]; \
{ \ codeFragment; \
SharedInvalidationMessage *msg = &_chunk->msgs[_cindex]; \
codeFragment; \
} \
} \ } \
} while (0) } while (0)
/* /*
* Process a list of invalidation messages group-wise. * Process a subgroup of invalidation messages as an array.
* *
* As above, but the code fragment can handle an array of messages. * As above, but the code fragment can handle an array of messages.
* The fragment should refer to the messages as msgs[], with n entries. * The fragment should refer to the messages as msgs[], with n entries.
*/ */
#define ProcessMessageListMulti(listHdr, codeFragment) \ #define ProcessMessageSubGroupMulti(group, subgroup, codeFragment) \
do { \ do { \
InvalidationChunk *_chunk; \ int n = NumMessagesInSubGroup(group, subgroup); \
for (_chunk = (listHdr); _chunk != NULL; _chunk = _chunk->next) \ if (n > 0) { \
{ \ SharedInvalidationMessage *msgs = \
SharedInvalidationMessage *msgs = _chunk->msgs; \ &InvalMessageArrays[subgroup].msgs[(group)->firstmsg[subgroup]]; \
int n = _chunk->nitems; \
codeFragment; \ codeFragment; \
} \ } \
} while (0) } while (0)
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
* Invalidation set support functions * Invalidation group support functions
* *
* These routines understand about the division of a logical invalidation * These routines understand about the division of a logical invalidation
* list into separate physical lists for catcache and relcache entries. * group into separate physical arrays for catcache and relcache entries.
* ---------------------------------------------------------------- * ----------------------------------------------------------------
*/ */
@ -343,7 +378,7 @@ AppendInvalidationMessageList(InvalidationChunk **destHdr,
* Add a catcache inval entry * Add a catcache inval entry
*/ */
static void static void
AddCatcacheInvalidationMessage(InvalidationListHeader *hdr, AddCatcacheInvalidationMessage(InvalidationMsgsGroup *group,
int id, uint32 hashValue, Oid dbId) int id, uint32 hashValue, Oid dbId)
{ {
SharedInvalidationMessage msg; SharedInvalidationMessage msg;
@ -364,14 +399,14 @@ AddCatcacheInvalidationMessage(InvalidationListHeader *hdr,
*/ */
VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg)); VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
AddInvalidationMessage(&hdr->cclist, &msg); AddInvalidationMessage(group, CatCacheMsgs, &msg);
} }
/* /*
* Add a whole-catalog inval entry * Add a whole-catalog inval entry
*/ */
static void static void
AddCatalogInvalidationMessage(InvalidationListHeader *hdr, AddCatalogInvalidationMessage(InvalidationMsgsGroup *group,
Oid dbId, Oid catId) Oid dbId, Oid catId)
{ {
SharedInvalidationMessage msg; SharedInvalidationMessage msg;
@ -382,14 +417,14 @@ AddCatalogInvalidationMessage(InvalidationListHeader *hdr,
/* check AddCatcacheInvalidationMessage() for an explanation */ /* check AddCatcacheInvalidationMessage() for an explanation */
VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg)); VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
AddInvalidationMessage(&hdr->cclist, &msg); AddInvalidationMessage(group, CatCacheMsgs, &msg);
} }
/* /*
* Add a relcache inval entry * Add a relcache inval entry
*/ */
static void static void
AddRelcacheInvalidationMessage(InvalidationListHeader *hdr, AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
Oid dbId, Oid relId) Oid dbId, Oid relId)
{ {
SharedInvalidationMessage msg; SharedInvalidationMessage msg;
@ -399,11 +434,11 @@ AddRelcacheInvalidationMessage(InvalidationListHeader *hdr,
* it will never change. InvalidOid for relId means all relations so we * it will never change. InvalidOid for relId means all relations so we
* don't need to add individual ones when it is present. * don't need to add individual ones when it is present.
*/ */
ProcessMessageList(hdr->rclist, ProcessMessageSubGroup(group, RelCacheMsgs,
if (msg->rc.id == SHAREDINVALRELCACHE_ID && if (msg->rc.id == SHAREDINVALRELCACHE_ID &&
(msg->rc.relId == relId || (msg->rc.relId == relId ||
msg->rc.relId == InvalidOid)) msg->rc.relId == InvalidOid))
return); return);
/* OK, add the item */ /* OK, add the item */
msg.rc.id = SHAREDINVALRELCACHE_ID; msg.rc.id = SHAREDINVALRELCACHE_ID;
@ -412,24 +447,26 @@ AddRelcacheInvalidationMessage(InvalidationListHeader *hdr,
/* check AddCatcacheInvalidationMessage() for an explanation */ /* check AddCatcacheInvalidationMessage() for an explanation */
VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg)); VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
AddInvalidationMessage(&hdr->rclist, &msg); AddInvalidationMessage(group, RelCacheMsgs, &msg);
} }
/* /*
* Add a snapshot inval entry * Add a snapshot inval entry
*
* We put these into the relcache subgroup for simplicity.
*/ */
static void static void
AddSnapshotInvalidationMessage(InvalidationListHeader *hdr, AddSnapshotInvalidationMessage(InvalidationMsgsGroup *group,
Oid dbId, Oid relId) Oid dbId, Oid relId)
{ {
SharedInvalidationMessage msg; SharedInvalidationMessage msg;
/* Don't add a duplicate item */ /* Don't add a duplicate item */
/* We assume dbId need not be checked because it will never change */ /* We assume dbId need not be checked because it will never change */
ProcessMessageList(hdr->rclist, ProcessMessageSubGroup(group, RelCacheMsgs,
if (msg->sn.id == SHAREDINVALSNAPSHOT_ID && if (msg->sn.id == SHAREDINVALSNAPSHOT_ID &&
msg->sn.relId == relId) msg->sn.relId == relId)
return); return);
/* OK, add the item */ /* OK, add the item */
msg.sn.id = SHAREDINVALSNAPSHOT_ID; msg.sn.id = SHAREDINVALSNAPSHOT_ID;
@ -438,33 +475,33 @@ AddSnapshotInvalidationMessage(InvalidationListHeader *hdr,
/* check AddCatcacheInvalidationMessage() for an explanation */ /* check AddCatcacheInvalidationMessage() for an explanation */
VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg)); VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
AddInvalidationMessage(&hdr->rclist, &msg); AddInvalidationMessage(group, RelCacheMsgs, &msg);
} }
/* /*
* Append one list of invalidation messages to another, resetting * Append one group of invalidation messages to another, resetting
* the source list to empty. * the source group to empty.
*/ */
static void static void
AppendInvalidationMessages(InvalidationListHeader *dest, AppendInvalidationMessages(InvalidationMsgsGroup *dest,
InvalidationListHeader *src) InvalidationMsgsGroup *src)
{ {
AppendInvalidationMessageList(&dest->cclist, &src->cclist); AppendInvalidationMessageSubGroup(dest, src, CatCacheMsgs);
AppendInvalidationMessageList(&dest->rclist, &src->rclist); AppendInvalidationMessageSubGroup(dest, src, RelCacheMsgs);
} }
/* /*
* Execute the given function for all the messages in an invalidation list. * Execute the given function for all the messages in an invalidation group.
* The list is not altered. * The group is not altered.
* *
* catcache entries are processed first, for reasons mentioned above. * catcache entries are processed first, for reasons mentioned above.
*/ */
static void static void
ProcessInvalidationMessages(InvalidationListHeader *hdr, ProcessInvalidationMessages(InvalidationMsgsGroup *group,
void (*func) (SharedInvalidationMessage *msg)) void (*func) (SharedInvalidationMessage *msg))
{ {
ProcessMessageList(hdr->cclist, func(msg)); ProcessMessageSubGroup(group, CatCacheMsgs, func(msg));
ProcessMessageList(hdr->rclist, func(msg)); ProcessMessageSubGroup(group, RelCacheMsgs, func(msg));
} }
/* /*
@ -472,11 +509,11 @@ ProcessInvalidationMessages(InvalidationListHeader *hdr,
* rather than just one at a time. * rather than just one at a time.
*/ */
static void static void
ProcessInvalidationMessagesMulti(InvalidationListHeader *hdr, ProcessInvalidationMessagesMulti(InvalidationMsgsGroup *group,
void (*func) (const SharedInvalidationMessage *msgs, int n)) void (*func) (const SharedInvalidationMessage *msgs, int n))
{ {
ProcessMessageListMulti(hdr->cclist, func(msgs, n)); ProcessMessageSubGroupMulti(group, CatCacheMsgs, func(msgs, n));
ProcessMessageListMulti(hdr->rclist, func(msgs, n)); ProcessMessageSubGroupMulti(group, RelCacheMsgs, func(msgs, n));
} }
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
@ -731,7 +768,7 @@ AcceptInvalidationMessages(void)
/* /*
* PrepareInvalidationState * PrepareInvalidationState
* Initialize inval lists for the current (sub)transaction. * Initialize inval data for the current (sub)transaction.
*/ */
static void static void
PrepareInvalidationState(void) PrepareInvalidationState(void)
@ -748,12 +785,45 @@ PrepareInvalidationState(void)
myInfo->parent = transInvalInfo; myInfo->parent = transInvalInfo;
myInfo->my_level = GetCurrentTransactionNestLevel(); myInfo->my_level = GetCurrentTransactionNestLevel();
/* /* Now, do we have a previous stack entry? */
* If there's any previous entry, this one should be for a deeper nesting if (transInvalInfo != NULL)
* level. {
*/ /* Yes; this one should be for a deeper nesting level. */
Assert(transInvalInfo == NULL || Assert(myInfo->my_level > transInvalInfo->my_level);
myInfo->my_level > transInvalInfo->my_level);
/*
* The parent (sub)transaction must not have any current (i.e.,
* not-yet-locally-processed) messages. If it did, we'd have a
* semantic problem: the new subtransaction presumably ought not be
* able to see those events yet, but since the CommandCounter is
* linear, that can't work once the subtransaction advances the
* counter. This is a convenient place to check for that, as well as
* being important to keep management of the message arrays simple.
*/
if (NumMessagesInGroup(&transInvalInfo->CurrentCmdInvalidMsgs) != 0)
elog(ERROR, "cannot start a subtransaction when there are unprocessed inval messages");
/*
* MemoryContextAllocZero set firstmsg = nextmsg = 0 in each group,
* which is fine for the first (sub)transaction, but otherwise we need
* to update them to follow whatever is already in the arrays.
*/
SetGroupToFollow(&myInfo->PriorCmdInvalidMsgs,
&transInvalInfo->CurrentCmdInvalidMsgs);
SetGroupToFollow(&myInfo->CurrentCmdInvalidMsgs,
&myInfo->PriorCmdInvalidMsgs);
}
else
{
/*
* Here, we need only clear any array pointers left over from a prior
* transaction.
*/
InvalMessageArrays[CatCacheMsgs].msgs = NULL;
InvalMessageArrays[CatCacheMsgs].maxmsgs = 0;
InvalMessageArrays[RelCacheMsgs].msgs = NULL;
InvalMessageArrays[RelCacheMsgs].maxmsgs = 0;
}
transInvalInfo = myInfo; transInvalInfo = myInfo;
} }
@ -777,47 +847,8 @@ PostPrepare_Inval(void)
} }
/* /*
* Collect invalidation messages into SharedInvalidMessagesArray array. * xactGetCommittedInvalidationMessages() is called by
*/ * RecordTransactionCommit() to collect invalidation messages to add to the
static void
MakeSharedInvalidMessagesArray(const SharedInvalidationMessage *msgs, int n)
{
/*
* Initialise array first time through in each commit
*/
if (SharedInvalidMessagesArray == NULL)
{
maxSharedInvalidMessagesArray = FIRSTCHUNKSIZE;
numSharedInvalidMessagesArray = 0;
/*
* Although this is being palloc'd we don't actually free it directly.
* We're so close to EOXact that we know we're going to lose it anyhow.
*/
SharedInvalidMessagesArray = palloc(maxSharedInvalidMessagesArray
* sizeof(SharedInvalidationMessage));
}
if ((numSharedInvalidMessagesArray + n) > maxSharedInvalidMessagesArray)
{
maxSharedInvalidMessagesArray = pg_nextpower2_32(numSharedInvalidMessagesArray + n);
SharedInvalidMessagesArray = repalloc(SharedInvalidMessagesArray,
maxSharedInvalidMessagesArray
* sizeof(SharedInvalidationMessage));
}
/*
* Append the next chunk onto the array
*/
memcpy(SharedInvalidMessagesArray + numSharedInvalidMessagesArray,
msgs, n * sizeof(SharedInvalidationMessage));
numSharedInvalidMessagesArray += n;
}
/*
* xactGetCommittedInvalidationMessages() is executed by
* RecordTransactionCommit() to add invalidation messages onto the
* commit record. This applies only to commit message types, never to * commit record. This applies only to commit message types, never to
* abort records. Must always run before AtEOXact_Inval(), since that * abort records. Must always run before AtEOXact_Inval(), since that
* removes the data we need to see. * removes the data we need to see.
@ -832,7 +863,9 @@ int
xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs, xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
bool *RelcacheInitFileInval) bool *RelcacheInitFileInval)
{ {
MemoryContext oldcontext; SharedInvalidationMessage *msgarray;
int nummsgs;
int nmsgs;
/* Quick exit if we haven't done anything with invalidation messages. */ /* Quick exit if we haven't done anything with invalidation messages. */
if (transInvalInfo == NULL) if (transInvalInfo == NULL)
@ -853,27 +886,48 @@ xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
*RelcacheInitFileInval = transInvalInfo->RelcacheInitFileInval; *RelcacheInitFileInval = transInvalInfo->RelcacheInitFileInval;
/* /*
* Walk through TransInvalidationInfo to collect all the messages into a * Collect all the pending messages into a single contiguous array of
* single contiguous array of invalidation messages. It must be contiguous * invalidation messages, to simplify what needs to happen while building
* so we can copy directly into WAL message. Maintain the order that they * the commit WAL message. Maintain the order that they would be
* would be processed in by AtEOXact_Inval(), to ensure emulated behaviour * processed in by AtEOXact_Inval(), to ensure emulated behaviour in redo
* in redo is as similar as possible to original. We want the same bugs, * is as similar as possible to original. We want the same bugs, if any,
* if any, not new ones. * not new ones.
*/ */
oldcontext = MemoryContextSwitchTo(CurTransactionContext); nummsgs = NumMessagesInGroup(&transInvalInfo->PriorCmdInvalidMsgs) +
NumMessagesInGroup(&transInvalInfo->CurrentCmdInvalidMsgs);
ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs, *msgs = msgarray = (SharedInvalidationMessage *)
MakeSharedInvalidMessagesArray); MemoryContextAlloc(CurTransactionContext,
ProcessInvalidationMessagesMulti(&transInvalInfo->PriorCmdInvalidMsgs, nummsgs * sizeof(SharedInvalidationMessage));
MakeSharedInvalidMessagesArray);
MemoryContextSwitchTo(oldcontext);
Assert(!(numSharedInvalidMessagesArray > 0 && nmsgs = 0;
SharedInvalidMessagesArray == NULL)); ProcessMessageSubGroupMulti(&transInvalInfo->PriorCmdInvalidMsgs,
CatCacheMsgs,
(memcpy(msgarray + nmsgs,
msgs,
n * sizeof(SharedInvalidationMessage)),
nmsgs += n));
ProcessMessageSubGroupMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
CatCacheMsgs,
(memcpy(msgarray + nmsgs,
msgs,
n * sizeof(SharedInvalidationMessage)),
nmsgs += n));
ProcessMessageSubGroupMulti(&transInvalInfo->PriorCmdInvalidMsgs,
RelCacheMsgs,
(memcpy(msgarray + nmsgs,
msgs,
n * sizeof(SharedInvalidationMessage)),
nmsgs += n));
ProcessMessageSubGroupMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
RelCacheMsgs,
(memcpy(msgarray + nmsgs,
msgs,
n * sizeof(SharedInvalidationMessage)),
nmsgs += n));
Assert(nmsgs == nummsgs);
*msgs = SharedInvalidMessagesArray; return nmsgs;
return numSharedInvalidMessagesArray;
} }
/* /*
@ -942,7 +996,7 @@ ProcessCommittedInvalidationMessages(SharedInvalidationMessage *msgs,
* about CurrentCmdInvalidMsgs too, since those changes haven't touched * about CurrentCmdInvalidMsgs too, since those changes haven't touched
* the caches yet. * the caches yet.
* *
* In any case, reset the various lists to empty. We need not physically * In any case, reset our state to empty. We need not physically
* free memory here, since TopTransactionContext is about to be emptied * free memory here, since TopTransactionContext is about to be emptied
* anyway. * anyway.
* *
@ -986,8 +1040,6 @@ AtEOXact_Inval(bool isCommit)
/* Need not free anything explicitly */ /* Need not free anything explicitly */
transInvalInfo = NULL; transInvalInfo = NULL;
SharedInvalidMessagesArray = NULL;
numSharedInvalidMessagesArray = 0;
} }
/* /*
@ -1043,10 +1095,21 @@ AtEOSubXact_Inval(bool isCommit)
return; return;
} }
/* Pass up my inval messages to parent */ /*
* Pass up my inval messages to parent. Notice that we stick them in
* PriorCmdInvalidMsgs, not CurrentCmdInvalidMsgs, since they've
* already been locally processed. (This would trigger the Assert in
* AppendInvalidationMessageSubGroup if the parent's
* CurrentCmdInvalidMsgs isn't empty; but we already checked that in
* PrepareInvalidationState.)
*/
AppendInvalidationMessages(&myInfo->parent->PriorCmdInvalidMsgs, AppendInvalidationMessages(&myInfo->parent->PriorCmdInvalidMsgs,
&myInfo->PriorCmdInvalidMsgs); &myInfo->PriorCmdInvalidMsgs);
/* Must readjust parent's CurrentCmdInvalidMsgs indexes now */
SetGroupToFollow(&myInfo->parent->CurrentCmdInvalidMsgs,
&myInfo->parent->PriorCmdInvalidMsgs);
/* Pending relcache inval becomes parent's problem too */ /* Pending relcache inval becomes parent's problem too */
if (myInfo->RelcacheInitFileInval) if (myInfo->RelcacheInitFileInval)
myInfo->parent->RelcacheInitFileInval = true; myInfo->parent->RelcacheInitFileInval = true;
@ -1514,31 +1577,24 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
/* /*
* LogLogicalInvalidations * LogLogicalInvalidations
* *
* Emit WAL for invalidations. This is currently only used for logging * Emit WAL for invalidations caused by the current command.
* invalidations at the command end or at commit time if any invalidations *
* are pending. * This is currently only used for logging invalidations at the command end
* or at commit time if any invalidations are pending.
*/ */
void void
LogLogicalInvalidations() LogLogicalInvalidations(void)
{ {
xl_xact_invals xlrec; xl_xact_invals xlrec;
SharedInvalidationMessage *invalMessages; InvalidationMsgsGroup *group;
int nmsgs = 0; int nmsgs;
/* Quick exit if we haven't done anything with invalidation messages. */ /* Quick exit if we haven't done anything with invalidation messages. */
if (transInvalInfo == NULL) if (transInvalInfo == NULL)
return; return;
ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs, group = &transInvalInfo->CurrentCmdInvalidMsgs;
MakeSharedInvalidMessagesArray); nmsgs = NumMessagesInGroup(group);
Assert(!(numSharedInvalidMessagesArray > 0 &&
SharedInvalidMessagesArray == NULL));
invalMessages = SharedInvalidMessagesArray;
nmsgs = numSharedInvalidMessagesArray;
SharedInvalidMessagesArray = NULL;
numSharedInvalidMessagesArray = 0;
if (nmsgs > 0) if (nmsgs > 0)
{ {
@ -1549,10 +1605,12 @@ LogLogicalInvalidations()
/* perform insertion */ /* perform insertion */
XLogBeginInsert(); XLogBeginInsert();
XLogRegisterData((char *) (&xlrec), MinSizeOfXactInvals); XLogRegisterData((char *) (&xlrec), MinSizeOfXactInvals);
XLogRegisterData((char *) invalMessages, ProcessMessageSubGroupMulti(group, CatCacheMsgs,
nmsgs * sizeof(SharedInvalidationMessage)); XLogRegisterData((char *) msgs,
n * sizeof(SharedInvalidationMessage)));
ProcessMessageSubGroupMulti(group, RelCacheMsgs,
XLogRegisterData((char *) msgs,
n * sizeof(SharedInvalidationMessage)));
XLogInsert(RM_XACT_ID, XLOG_XACT_INVALIDATIONS); XLogInsert(RM_XACT_ID, XLOG_XACT_INVALIDATIONS);
pfree(invalMessages);
} }
} }