1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-25 13:17:41 +03:00

Emit invalidations to standby for transactions without xid.

So far, when a transaction with pending invalidations, but without an
assigned xid, committed, we simply ignored those invalidation
messages. That's problematic, because those are actually sent for a
reason.

Known symptoms of this include that existing sessions on a hot-standby
replica sometimes fail to notice new concurrently built indexes and
visibility map updates.

The solution is to WAL log such invalidations in transactions without an
xid. We considered to alternatively force-assign an xid, but that'd be
problematic for vacuum, which might be run in systems with few xids.

Important: This adds a new WAL record, but as the patch has to be
back-patched, we can't bump the WAL page magic. This means that standbys
have to be updated before primaries; otherwise
"PANIC: standby_redo: unknown op code 32" errors can be encountered.

XXX:

Reported-By: Васильев Дмитрий, Masahiko Sawada
Discussion:
    CAB-SwXY6oH=9twBkXJtgR4UC1NqT-vpYAtxCseME62ADwyK5OA@mail.gmail.com
    CAD21AoDpZ6Xjg=gFrGPnSn4oTRRcwK1EBrWCq9OqOHuAcMMC=w@mail.gmail.com
This commit is contained in:
Andres Freund
2016-04-23 19:18:00 -07:00
parent 2ac3be2e76
commit c6ff84b06a
10 changed files with 181 additions and 48 deletions

View File

@@ -58,6 +58,14 @@ standby_desc(StringInfo buf, XLogReaderState *record)
standby_desc_running_xacts(buf, xlrec);
}
else if (info == XLOG_INVALIDATIONS)
{
xl_invalidations *xlrec = (xl_invalidations *) rec;
standby_desc_invalidations(buf, xlrec->nmsgs, xlrec->msgs,
xlrec->dbId, xlrec->tsId,
xlrec->relcacheInitFileInval);
}
}
const char *
@@ -73,7 +81,53 @@ standby_identify(uint8 info)
case XLOG_RUNNING_XACTS:
id = "RUNNING_XACTS";
break;
case XLOG_INVALIDATIONS:
id = "INVALIDATIONS";
break;
}
return id;
}
/*
* This routine is used by both standby_desc and xact_desc, because
* transaction commits and XLOG_INVALIDATIONS messages contain invalidations;
* it seems pointless to duplicate the code.
*/
void
standby_desc_invalidations(StringInfo buf,
int nmsgs, SharedInvalidationMessage *msgs,
Oid dbId, Oid tsId,
bool relcacheInitFileInval)
{
int i;
if (relcacheInitFileInval)
appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
dbId, tsId);
appendStringInfoString(buf, "; inval msgs:");
for (i = 0; i < nmsgs; i++)
{
SharedInvalidationMessage *msg = &msgs[i];
if (msg->id >= 0)
appendStringInfo(buf, " catcache %d", msg->id);
else if (msg->id == SHAREDINVALCATALOG_ID)
appendStringInfo(buf, " catalog %u", msg->cat.catId);
else if (msg->id == SHAREDINVALRELCACHE_ID)
appendStringInfo(buf, " relcache %u", msg->rc.relId);
/* not expected, but print something anyway */
else if (msg->id == SHAREDINVALSMGR_ID)
appendStringInfoString(buf, " smgr");
/* not expected, but print something anyway */
else if (msg->id == SHAREDINVALRELMAP_ID)
appendStringInfoString(buf, " relmap");
else if (msg->id == SHAREDINVALRELMAP_ID)
appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
else if (msg->id == SHAREDINVALSNAPSHOT_ID)
appendStringInfo(buf, " snapshot %u", msg->sn.relId);
else
appendStringInfo(buf, " unknown id %d", msg->id);
}
}

View File

@@ -18,6 +18,7 @@
#include "access/xact.h"
#include "catalog/catalog.h"
#include "storage/sinval.h"
#include "storage/standbydefs.h"
#include "utils/timestamp.h"
/*
@@ -203,32 +204,9 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId
}
if (parsed.nmsgs > 0)
{
if (XactCompletionRelcacheInitFileInval(parsed.xinfo))
appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
parsed.dbId, parsed.tsId);
appendStringInfoString(buf, "; inval msgs:");
for (i = 0; i < parsed.nmsgs; i++)
{
SharedInvalidationMessage *msg = &parsed.msgs[i];
if (msg->id >= 0)
appendStringInfo(buf, " catcache %d", msg->id);
else if (msg->id == SHAREDINVALCATALOG_ID)
appendStringInfo(buf, " catalog %u", msg->cat.catId);
else if (msg->id == SHAREDINVALRELCACHE_ID)
appendStringInfo(buf, " relcache %u", msg->rc.relId);
/* not expected, but print something anyway */
else if (msg->id == SHAREDINVALSMGR_ID)
appendStringInfoString(buf, " smgr");
/* not expected, but print something anyway */
else if (msg->id == SHAREDINVALRELMAP_ID)
appendStringInfoString(buf, " relmap");
else if (msg->id == SHAREDINVALSNAPSHOT_ID)
appendStringInfo(buf, " snapshot %u", msg->sn.relId);
else
appendStringInfo(buf, " unknown id %d", msg->id);
}
standby_desc_invalidations(
buf, parsed.nmsgs, parsed.msgs, parsed.dbId, parsed.tsId,
XactCompletionRelcacheInitFileInval(parsed.xinfo));
}
if (XactCompletionForceSyncCommit(parsed.xinfo))

View File

@@ -1163,6 +1163,24 @@ RecordTransactionCommit(void)
/* Can't have child XIDs either; AssignTransactionId enforces this */
Assert(nchildren == 0);
/*
* Transactions without an assigned xid can contain invalidation
* messages (e.g. explicit relcache invalidations or catcache
* invalidations for inplace updates); standbys need to process
* those. We can't emit a commit record without an xid, and we don't
* want to force assigning an xid, because that'd be problematic for
* e.g. vacuum. Hence we emit a bespoke record for the
* invalidations. We don't want to use that in case a commit record is
* emitted, so they happen synchronously with commits (besides not
* wanting to emit more WAL recoreds).
*/
if (nmsgs != 0)
{
LogStandbyInvalidations(nmsgs, invalMessages,
RelcacheInitFileInval);
wrote_xlog = true; /* not strictly necessary */
}
/*
* If we didn't create XLOG entries, we're done here; otherwise we
* should trigger flushing those entries the same as a commit record

View File

@@ -327,6 +327,15 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
break;
case XLOG_STANDBY_LOCK:
break;
case XLOG_INVALIDATIONS:
{
xl_invalidations *invalidations =
(xl_invalidations *) XLogRecGetData(r);
ReorderBufferImmediateInvalidation(
ctx->reorder, invalidations->nmsgs, invalidations->msgs);
}
break;
default:
elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
}

View File

@@ -1810,26 +1810,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
* catalog and we need to update the caches according to that.
*/
if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
{
bool use_subtxn = IsTransactionOrTransactionBlock();
if (use_subtxn)
BeginInternalSubTransaction("replay");
/*
* Force invalidations to happen outside of a valid transaction - that
* way entries will just be marked as invalid without accessing the
* catalog. That's advantageous because we don't need to setup the
* full state necessary for catalog access.
*/
if (use_subtxn)
AbortCurrentTransaction();
ReorderBufferExecuteInvalidations(rb, txn);
if (use_subtxn)
RollbackAndReleaseCurrentSubTransaction();
}
ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
txn->invalidations);
else
Assert(txn->ninvalidations == 0);
@@ -1837,6 +1819,37 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
ReorderBufferCleanupTXN(rb, txn);
}
/*
* Execute invalidations happening outside the context of a decoded
* transaction. That currently happens either for xid-less commits
* (c.f. RecordTransactionCommit()) or for invalidations in uninteresting
* transactions (via ReorderBufferForget()).
*/
void
ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
SharedInvalidationMessage *invalidations)
{
bool use_subtxn = IsTransactionOrTransactionBlock();
int i;
if (use_subtxn)
BeginInternalSubTransaction("replay");
/*
* Force invalidations to happen outside of a valid transaction - that
* way entries will just be marked as invalid without accessing the
* catalog. That's advantageous because we don't need to setup the
* full state necessary for catalog access.
*/
if (use_subtxn)
AbortCurrentTransaction();
for (i = 0; i < ninvalidations; i++)
LocalExecuteInvalidationMessage(&invalidations[i]);
if (use_subtxn)
RollbackAndReleaseCurrentSubTransaction();
}
/*
* Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at

View File

@@ -825,6 +825,16 @@ standby_redo(XLogReaderState *record)
ProcArrayApplyRecoveryInfo(&running);
}
else if (info == XLOG_INVALIDATIONS)
{
xl_invalidations *xlrec = (xl_invalidations *) XLogRecGetData(record);
ProcessCommittedInvalidationMessages(xlrec->msgs,
xlrec->nmsgs,
xlrec->relcacheInitFileInval,
xlrec->dbId,
xlrec->tsId);
}
else
elog(PANIC, "standby_redo: unknown op code %u", info);
}
@@ -1068,3 +1078,28 @@ LogAccessExclusiveLockPrepare(void)
*/
(void) GetTopTransactionId();
}
/*
* Emit WAL for invalidations. This currently is only used for commits without
* an xid but which contain invalidations.
*/
void
LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
bool relcacheInitFileInval)
{
xl_invalidations xlrec;
/* prepare record */
memset(&xlrec, 0, sizeof(xlrec));
xlrec.dbId = MyDatabaseId;
xlrec.tsId = MyDatabaseTableSpace;
xlrec.relcacheInitFileInval = relcacheInitFileInval;
xlrec.nmsgs = nmsgs;
/* perform insertion */
XLogBeginInsert();
XLogRegisterData((char *) (&xlrec), MinSizeOfInvalidations);
XLogRegisterData((char *) msgs,
nmsgs * sizeof(SharedInvalidationMessage));
XLogInsert(RM_STANDBY_ID, XLOG_INVALIDATIONS);
}

View File

@@ -842,8 +842,9 @@ xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
}
/*
* ProcessCommittedInvalidationMessages is executed by xact_redo_commit()
* to process invalidation messages added to commit records.
* ProcessCommittedInvalidationMessages is executed by xact_redo_commit() or
* standby_redo() to process invalidation messages. Currently that happens
* only at end-of-xact.
*
* Relcache init file invalidation requires processing both
* before and after we send the SI messages. See AtEOXact_Inval()

View File

@@ -391,6 +391,8 @@ void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn
CommandId cmin, CommandId cmax, CommandId combocid);
void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
Size nmsgs, SharedInvalidationMessage *msgs);
void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations,
SharedInvalidationMessage *invalidations);
void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);

View File

@@ -85,5 +85,7 @@ extern void LogAccessExclusiveLock(Oid dbOid, Oid relOid);
extern void LogAccessExclusiveLockPrepare(void);
extern XLogRecPtr LogStandbySnapshot(void);
extern void LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
bool relcacheInitFileInval);
#endif /* STANDBY_H */

View File

@@ -17,17 +17,23 @@
#include "access/xlogreader.h"
#include "lib/stringinfo.h"
#include "storage/lockdefs.h"
#include "storage/sinval.h"
/* Recovery handlers for the Standby Rmgr (RM_STANDBY_ID) */
extern void standby_redo(XLogReaderState *record);
extern void standby_desc(StringInfo buf, XLogReaderState *record);
extern const char *standby_identify(uint8 info);
extern void standby_desc_invalidations(StringInfo buf,
int nmsgs, SharedInvalidationMessage *msgs,
Oid dbId, Oid tsId,
bool relcacheInitFileInval);
/*
* XLOG message types
*/
#define XLOG_STANDBY_LOCK 0x00
#define XLOG_RUNNING_XACTS 0x10
#define XLOG_INVALIDATIONS 0x20
typedef struct xl_standby_locks
{
@@ -50,4 +56,19 @@ typedef struct xl_running_xacts
TransactionId xids[FLEXIBLE_ARRAY_MEMBER];
} xl_running_xacts;
/*
* Invalidations for standby, currently only when transactions without an
* assigned xid commit.
*/
typedef struct xl_invalidations
{
Oid dbId; /* MyDatabaseId */
Oid tsId; /* MyDatabaseTableSpace */
bool relcacheInitFileInval; /* invalidate relcache init file */
int nmsgs; /* number of shared inval msgs */
SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
} xl_invalidations;
#define MinSizeOfInvalidations offsetof(xl_invalidations, msgs)
#endif /* STANDBYDEFS_H */