mirror of
https://github.com/postgres/postgres.git
synced 2025-12-21 05:21:08 +03:00
On Publication rename, we need to only invalidate the RelationSyncCache entries corresponding to relations that are part of the publication being renamed. As part of this patch, we introduce a new invalidation message to invalidate the cache maintained by the logical decoding output plugin. We can't use existing relcache invalidation for this purpose, as that would unnecessarily cause relcache invalidations in other backends. This will improve performance by building fewer relation cache entries during logical replication. Author: Hayato Kuroda <kuroda.hayato@fujitsu.com> Author: Shlok Kyal <shlok.kyal.oss@gmail.com> Reviewed-by: Hou Zhijie <houzj.fnst@fujitsu.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Discussion: https://postgr.es/m/OSCPR01MB14966C09AA201EFFA706576A7F5C92@OSCPR01MB14966.jpnprd01.prod.outlook.com
141 lines
3.7 KiB
C
141 lines
3.7 KiB
C
/*-------------------------------------------------------------------------
 *
 * standbydesc.c
 *	  rmgr descriptor routines for storage/ipc/standby.c
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/access/rmgrdesc/standbydesc.c
 *
 *-------------------------------------------------------------------------
 */
|
|
#include "postgres.h"
|
|
|
|
#include "storage/standbydefs.h"
|
|
|
|
static void
|
|
standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
|
|
{
|
|
int i;
|
|
|
|
appendStringInfo(buf, "nextXid %u latestCompletedXid %u oldestRunningXid %u",
|
|
xlrec->nextXid,
|
|
xlrec->latestCompletedXid,
|
|
xlrec->oldestRunningXid);
|
|
if (xlrec->xcnt > 0)
|
|
{
|
|
appendStringInfo(buf, "; %d xacts:", xlrec->xcnt);
|
|
for (i = 0; i < xlrec->xcnt; i++)
|
|
appendStringInfo(buf, " %u", xlrec->xids[i]);
|
|
}
|
|
|
|
if (xlrec->subxid_overflow)
|
|
appendStringInfoString(buf, "; subxid overflowed");
|
|
|
|
if (xlrec->subxcnt > 0)
|
|
{
|
|
appendStringInfo(buf, "; %d subxacts:", xlrec->subxcnt);
|
|
for (i = 0; i < xlrec->subxcnt; i++)
|
|
appendStringInfo(buf, " %u", xlrec->xids[xlrec->xcnt + i]);
|
|
}
|
|
}
|
|
|
|
void
|
|
standby_desc(StringInfo buf, XLogReaderState *record)
|
|
{
|
|
char *rec = XLogRecGetData(record);
|
|
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
|
|
|
|
if (info == XLOG_STANDBY_LOCK)
|
|
{
|
|
xl_standby_locks *xlrec = (xl_standby_locks *) rec;
|
|
int i;
|
|
|
|
for (i = 0; i < xlrec->nlocks; i++)
|
|
appendStringInfo(buf, "xid %u db %u rel %u ",
|
|
xlrec->locks[i].xid, xlrec->locks[i].dbOid,
|
|
xlrec->locks[i].relOid);
|
|
}
|
|
else if (info == XLOG_RUNNING_XACTS)
|
|
{
|
|
xl_running_xacts *xlrec = (xl_running_xacts *) rec;
|
|
|
|
standby_desc_running_xacts(buf, xlrec);
|
|
}
|
|
else if (info == XLOG_INVALIDATIONS)
|
|
{
|
|
xl_invalidations *xlrec = (xl_invalidations *) rec;
|
|
|
|
standby_desc_invalidations(buf, xlrec->nmsgs, xlrec->msgs,
|
|
xlrec->dbId, xlrec->tsId,
|
|
xlrec->relcacheInitFileInval);
|
|
}
|
|
}
|
|
|
|
const char *
|
|
standby_identify(uint8 info)
|
|
{
|
|
const char *id = NULL;
|
|
|
|
switch (info & ~XLR_INFO_MASK)
|
|
{
|
|
case XLOG_STANDBY_LOCK:
|
|
id = "LOCK";
|
|
break;
|
|
case XLOG_RUNNING_XACTS:
|
|
id = "RUNNING_XACTS";
|
|
break;
|
|
case XLOG_INVALIDATIONS:
|
|
id = "INVALIDATIONS";
|
|
break;
|
|
}
|
|
|
|
return id;
|
|
}
|
|
|
|
/* also used by non-standby records having analogous invalidation fields */
|
|
void
|
|
standby_desc_invalidations(StringInfo buf,
|
|
int nmsgs, SharedInvalidationMessage *msgs,
|
|
Oid dbId, Oid tsId,
|
|
bool relcacheInitFileInval)
|
|
{
|
|
int i;
|
|
|
|
/* Do nothing if there are no invalidation messages */
|
|
if (nmsgs <= 0)
|
|
return;
|
|
|
|
if (relcacheInitFileInval)
|
|
appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
|
|
dbId, tsId);
|
|
|
|
appendStringInfoString(buf, "; inval msgs:");
|
|
for (i = 0; i < nmsgs; i++)
|
|
{
|
|
SharedInvalidationMessage *msg = &msgs[i];
|
|
|
|
if (msg->id >= 0)
|
|
appendStringInfo(buf, " catcache %d", msg->id);
|
|
else if (msg->id == SHAREDINVALCATALOG_ID)
|
|
appendStringInfo(buf, " catalog %u", msg->cat.catId);
|
|
else if (msg->id == SHAREDINVALRELCACHE_ID)
|
|
appendStringInfo(buf, " relcache %u", msg->rc.relId);
|
|
/* not expected, but print something anyway */
|
|
else if (msg->id == SHAREDINVALSMGR_ID)
|
|
appendStringInfoString(buf, " smgr");
|
|
/* not expected, but print something anyway */
|
|
else if (msg->id == SHAREDINVALRELMAP_ID)
|
|
appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
|
|
else if (msg->id == SHAREDINVALSNAPSHOT_ID)
|
|
appendStringInfo(buf, " snapshot %u", msg->sn.relId);
|
|
else if (msg->id == SHAREDINVALRELSYNC_ID)
|
|
appendStringInfo(buf, " relsync %u", msg->rs.relid);
|
|
else
|
|
appendStringInfo(buf, " unrecognized id %d", msg->id);
|
|
}
|
|
}
|