/* -------------------------------------------------------------------------
 *
 * decode.c
 *      This module decodes WAL records read using xlogreader.h's APIs for the
 *      purpose of logical decoding by passing information to the
 *      reorderbuffer module (containing the actual changes) and to the
 *      snapbuild module to build a fitting catalog snapshot (to be able to
 *      properly decode the changes in the reorderbuffer).
 *
 * NOTE:
 *      This basically tries to handle all low level xlog stuff for
 *      reorderbuffer.c and snapbuild.c. There's some minor leakage where a
 *      specific record's struct is used to pass data along, but those just
 *      happen to contain the right amount of data in a convenient
 *      format. There isn't and shouldn't be much intelligence about the
 *      contents of records in here except turning them into a more usable
 *      format.
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *      src/backend/replication/logical/decode.c
 *
 * -------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/heapam_xlog.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogrecord.h"
#include "catalog/pg_control.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/message.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "storage/standbydefs.h"

/* individual record(group)'s handlers */
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);

static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
                         xl_xact_parsed_commit *parsed, TransactionId xid,
                         bool two_phase);
static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
                        xl_xact_parsed_abort *parsed, TransactionId xid,
                        bool two_phase);
static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
                          xl_xact_parsed_prepare *parsed);


/* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple);

/* helper functions for decoding transactions */
static inline bool FilterPrepare(LogicalDecodingContext *ctx,
                                 TransactionId xid, const char *gid);
static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
                              XLogRecordBuffer *buf, Oid txn_dbid,
                              RepOriginId origin_id);

/*
 * Take every XLogReadRecord()ed record and perform the actions required to
 * decode it using the output plugin already setup in the logical decoding
 * context.
 *
 * NB: Note that every record's xid needs to be processed by reorderbuffer
 * (xids contained in the content of records are not relevant for this rule).
 * That means that for records which'd otherwise not go through the
 * reorderbuffer, ReorderBufferProcessXid() has to be called. We don't want to
 * call ReorderBufferProcessXid for each record type by default, because
 * e.g. empty xacts can be handled more efficiently if there's no previous
 * state for them.
 *
 * We also support the ability to fast forward thru records, skipping some
 * record types completely - see individual record types for details.
 */
void
LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
{
    XLogRecordBuffer buf;
    TransactionId txid;
    RmgrData rmgr;

    buf.origptr = ctx->reader->ReadRecPtr;
    buf.endptr = ctx->reader->EndRecPtr;
    buf.record = record;

    txid = XLogRecGetTopXid(record);

    /*
     * If the top-level xid is valid, we need to assign the subxact to the
     * top-level xact. We need to do this for all records, hence we do it
     * before the switch.
     */
    if (TransactionIdIsValid(txid))
    {
        ReorderBufferAssignChild(ctx->reorder,
                                 txid,
                                 XLogRecGetXid(record),
                                 buf.origptr);
    }

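    /* Dispatch the record to the rmgr's decode routine, if it provides one. */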
    rmgr = GetRmgr(XLogRecGetRmid(record));

    if (rmgr.rm_decode != NULL)
        rmgr.rm_decode(ctx, &buf);
    else
    {
        /* just deal with xid, and done */
        ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
                                buf.origptr);
    }
}

/*
 * Handle rmgr XLOG_ID records for LogicalDecodingProcessRecord().
 */
void
xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
    SnapBuild *builder = ctx->snapshot_builder;
    uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;

    ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
                            buf->origptr);

    switch (info)
    {
            /* this is also used in END_OF_RECOVERY checkpoints */
        case XLOG_CHECKPOINT_SHUTDOWN:
        case XLOG_END_OF_RECOVERY:
            SnapBuildSerializationPoint(builder, buf->origptr);

            break;
        case XLOG_CHECKPOINT_ONLINE:

            /*
             * A RUNNING_XACTS record will have been logged near to this, so
             * we can restart from there.
             */
            break;
        case XLOG_PARAMETER_CHANGE:
            {
                xl_parameter_change *xlrec =
                    (xl_parameter_change *) XLogRecGetData(buf->record);

                /*
                 * If wal_level on the primary is reduced to less than
                 * logical, we want to prevent existing logical slots from
                 * being used. Existing logical slots on the standby get
                 * invalidated when this WAL record is replayed; and further,
                 * slot creation fails when wal_level is not sufficient; but
                 * all these operations are not synchronized, so a logical
                 * slot may creep in while the wal_level is being reduced.
                 * Hence this extra check.
                 */
                if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
                {
                    /*
                     * This can occur only on a standby, as a primary would
                     * not allow a restart after changing wal_level < logical
                     * if there is a pre-existing logical slot.
                     */
                    Assert(RecoveryInProgress());
                    ereport(ERROR,
                            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                             errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary")));
                }
                break;
            }
        case XLOG_NOOP:
        case XLOG_NEXTOID:
        case XLOG_SWITCH:
        case XLOG_BACKUP_END:
        case XLOG_RESTORE_POINT:
        case XLOG_FPW_CHANGE:
        case XLOG_FPI_FOR_HINT:
        case XLOG_FPI:
        case XLOG_OVERWRITE_CONTRECORD:
        case XLOG_CHECKPOINT_REDO:
            break;
        default:
            elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
    }
}

/*
 * Handle rmgr XACT_ID records for LogicalDecodingProcessRecord().
 */
void
xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
    SnapBuild *builder = ctx->snapshot_builder;
    ReorderBuffer *reorder = ctx->reorder;
    XLogReaderState *r = buf->record;
    uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;

    /*
     * If the snapshot isn't yet fully built, we cannot decode anything, so
     * bail out.
     */
    if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
        return;

    switch (info)
    {
        case XLOG_XACT_COMMIT:
        case XLOG_XACT_COMMIT_PREPARED:
            {
                xl_xact_commit *xlrec;
                xl_xact_parsed_commit parsed;
                TransactionId xid;
                bool two_phase = false;

                xlrec = (xl_xact_commit *) XLogRecGetData(r);
                ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);

                if (!TransactionIdIsValid(parsed.twophase_xid))
                    xid = XLogRecGetXid(r);
                else
                    xid = parsed.twophase_xid;

                /*
                 * We would like to process the transaction in a two-phase
                 * manner iff the output plugin supports two-phase commits
                 * and doesn't filter the transaction at prepare time.
                 */
                if (info == XLOG_XACT_COMMIT_PREPARED)
                    two_phase = !(FilterPrepare(ctx, xid,
                                                parsed.twophase_gid));

                DecodeCommit(ctx, buf, &parsed, xid, two_phase);
                break;
            }
        case XLOG_XACT_ABORT:
        case XLOG_XACT_ABORT_PREPARED:
            {
                xl_xact_abort *xlrec;
                xl_xact_parsed_abort parsed;
                TransactionId xid;
                bool two_phase = false;

                xlrec = (xl_xact_abort *) XLogRecGetData(r);
                ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);

                if (!TransactionIdIsValid(parsed.twophase_xid))
                    xid = XLogRecGetXid(r);
                else
                    xid = parsed.twophase_xid;

                /*
                 * We would like to process the transaction in a two-phase
                 * manner iff the output plugin supports two-phase commits
                 * and doesn't filter the transaction at prepare time.
                 */
                if (info == XLOG_XACT_ABORT_PREPARED)
                    two_phase = !(FilterPrepare(ctx, xid,
                                                parsed.twophase_gid));

                DecodeAbort(ctx, buf, &parsed, xid, two_phase);
                break;
            }
        case XLOG_XACT_ASSIGNMENT:

            /*
             * We assign subxact to the toplevel xact while processing each
             * record if required. So, we don't need to do anything here. See
             * LogicalDecodingProcessRecord.
             */
            break;
        case XLOG_XACT_INVALIDATIONS:
            {
                TransactionId xid;
                xl_xact_invals *invals;

                xid = XLogRecGetXid(r);
                invals = (xl_xact_invals *) XLogRecGetData(r);

                /*
                 * Execute the invalidations for xid-less transactions;
                 * otherwise, accumulate them so that they can be processed
                 * at commit time.
                 */
                if (TransactionIdIsValid(xid))
                {
                    if (!ctx->fast_forward)
                        ReorderBufferAddInvalidations(reorder, xid,
                                                      buf->origptr,
                                                      invals->nmsgs,
                                                      invals->msgs);
                    ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
                                                      buf->origptr);
                }
                else if (!ctx->fast_forward)
                    ReorderBufferImmediateInvalidation(ctx->reorder,
                                                       invals->nmsgs,
                                                       invals->msgs);

                break;
            }
        case XLOG_XACT_PREPARE:
            {
                xl_xact_parsed_prepare parsed;
                xl_xact_prepare *xlrec;

                /* ok, parse it */
                xlrec = (xl_xact_prepare *) XLogRecGetData(r);
                ParsePrepareRecord(XLogRecGetInfo(buf->record),
                                   xlrec, &parsed);

                /*
                 * We would like to process the transaction in a two-phase
                 * manner iff the output plugin supports two-phase commits
                 * and doesn't filter the transaction at prepare time.
                 */
                if (FilterPrepare(ctx, parsed.twophase_xid,
                                  parsed.twophase_gid))
                {
                    ReorderBufferProcessXid(reorder, parsed.twophase_xid,
                                            buf->origptr);
                    break;
                }

                /*
                 * Note that if the prepared transaction has locked [user]
                 * catalog tables exclusively, then decoding the prepare can
                 * block till the main transaction is committed because it
                 * needs to lock the catalog tables.
                 *
                 * XXX Now, this can even lead to a deadlock if the prepare
                 * transaction is waiting to get it logically replicated for
                 * distributed 2PC. This can be avoided by disallowing
                 * preparing transactions that have locked [user] catalog
                 * tables exclusively, but as of now we ask users not to do
                 * such an operation.
                 */
                DecodePrepare(ctx, buf, &parsed);
                break;
            }
        default:
            elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
    }
}

/*
 * Handle rmgr STANDBY_ID records for LogicalDecodingProcessRecord().
 */
void
standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
    SnapBuild *builder = ctx->snapshot_builder;
    XLogReaderState *r = buf->record;
    uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;

    ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);

    switch (info)
    {
        case XLOG_RUNNING_XACTS:
            {
                xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);

                SnapBuildProcessRunningXacts(builder, buf->origptr, running);

                /*
                 * Abort all transactions that we keep track of, that are
                 * older than the record's oldestRunningXid. This is the most
                 * convenient spot for doing so since, in contrast to shutdown
                 * or end-of-recovery checkpoints, we have information about
                 * all running transactions which includes prepared ones,
                 * while shutdown checkpoints just know that no non-prepared
                 * transactions are in progress.
                 */
                ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
            }
            break;
        case XLOG_STANDBY_LOCK:
            break;
        case XLOG_INVALIDATIONS:

            /*
             * We are processing the invalidations at the command level via
             * XLOG_XACT_INVALIDATIONS. So we don't need to do anything here.
             */
            break;
        default:
            elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
    }
}

/*
 * Handle rmgr HEAP2_ID records for LogicalDecodingProcessRecord().
 */
void
heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
    uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
    TransactionId xid = XLogRecGetXid(buf->record);
    SnapBuild *builder = ctx->snapshot_builder;

    ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);

    /*
     * If we don't have a snapshot or we are just fast-forwarding, there is
     * no point in decoding changes.
     */
    if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
        ctx->fast_forward)
        return;

    switch (info)
    {
        case XLOG_HEAP2_MULTI_INSERT:
            if (SnapBuildProcessChange(builder, xid, buf->origptr))
                DecodeMultiInsert(ctx, buf);
            break;
        case XLOG_HEAP2_NEW_CID:
            {
                xl_heap_new_cid *xlrec;

                xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
                SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);

                break;
            }
        case XLOG_HEAP2_REWRITE:

            /*
             * Although these records only exist to serve the needs of logical
             * decoding, all the work happens as part of crash or archive
             * recovery, so we don't need to do anything here.
             */
            break;

            /*
             * Everything else here is just low level physical stuff we're not
             * interested in.
             */
        case XLOG_HEAP2_PRUNE_ON_ACCESS:
        case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
        case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
        case XLOG_HEAP2_VISIBLE:
        case XLOG_HEAP2_LOCK_UPDATED:
            break;
        default:
            elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
    }
}

/*
 * Handle rmgr HEAP_ID records for LogicalDecodingProcessRecord().
 */
void
heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
    uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
    TransactionId xid = XLogRecGetXid(buf->record);
    SnapBuild *builder = ctx->snapshot_builder;

    ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);

    /*
     * If we don't have a snapshot or we are just fast-forwarding, there is
     * no point in decoding data changes.
     */
    if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
        ctx->fast_forward)
        return;

    switch (info)
    {
        case XLOG_HEAP_INSERT:
            if (SnapBuildProcessChange(builder, xid, buf->origptr))
                DecodeInsert(ctx, buf);
            break;

            /*
             * Treat HOT updates as normal updates. There is no useful
             * information in the fact that we could make it a HOT update
             * locally, and the WAL layout is compatible.
             */
        case XLOG_HEAP_HOT_UPDATE:
        case XLOG_HEAP_UPDATE:
            if (SnapBuildProcessChange(builder, xid, buf->origptr))
                DecodeUpdate(ctx, buf);
            break;

        case XLOG_HEAP_DELETE:
            if (SnapBuildProcessChange(builder, xid, buf->origptr))
                DecodeDelete(ctx, buf);
            break;

        case XLOG_HEAP_TRUNCATE:
            if (SnapBuildProcessChange(builder, xid, buf->origptr))
                DecodeTruncate(ctx, buf);
            break;

        case XLOG_HEAP_INPLACE:

            /*
             * Inplace updates are only ever performed on catalog tuples and
             * can, by definition, not change tuple visibility. Since we
             * don't decode catalog tuples, we're not interested in the
             * record's contents.
             *
             * In-place updates can be used either by XID-bearing transactions
             * (e.g. in CREATE INDEX CONCURRENTLY) or by XID-less
             * transactions (e.g. VACUUM). In the former case, the commit
             * record will include cache invalidations, so we mark the
             * transaction as catalog modifying here. Currently that's
             * redundant because the commit will do that as well, but once we
             * support decoding in-progress relations, this will be important.
             */
            if (!TransactionIdIsValid(xid))
                break;

            (void) SnapBuildProcessChange(builder, xid, buf->origptr);
            ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
            break;

        case XLOG_HEAP_CONFIRM:
            if (SnapBuildProcessChange(builder, xid, buf->origptr))
                DecodeSpecConfirm(ctx, buf);
            break;

        case XLOG_HEAP_LOCK:
            /* we don't care about row level locks for now */
            break;

        default:
            elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
            break;
    }
}

/*
 * Ask output plugin whether we want to skip this PREPARE and send
 * this transaction as a regular commit later.
 */
static inline bool
FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
              const char *gid)
{
    /*
     * Skip if decoding of two-phase transactions at PREPARE time is not
     * enabled. In that case, all two-phase transactions are considered
     * filtered out and will be applied as regular transactions at COMMIT
     * PREPARED.
     */
    if (!ctx->twophase)
        return true;

    /*
     * The filter_prepare callback is optional. When not supplied, all
     * prepared transactions should go through.
     */
    if (ctx->callbacks.filter_prepare_cb == NULL)
        return false;

    return filter_prepare_cb_wrapper(ctx, xid, gid);
}

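/*
 * Ask the output plugin whether changes from this replication origin should
 * be filtered out. Without a filter_by_origin callback, nothing is filtered.
 */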
static inline bool
FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
    if (ctx->callbacks.filter_by_origin_cb == NULL)
        return false;

    return filter_by_origin_cb_wrapper(ctx, origin_id);
}

/*
 * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
 */
void
logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
    SnapBuild *builder = ctx->snapshot_builder;
    XLogReaderState *r = buf->record;
    TransactionId xid = XLogRecGetXid(r);
    uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
    RepOriginId origin_id = XLogRecGetOrigin(r);
    Snapshot snapshot = NULL;
    xl_logical_message *message;

    if (info != XLOG_LOGICAL_MESSAGE)
        elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);

    ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);

    /* If we don't have a snapshot, there is no point in decoding messages */
    if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
        return;

    message = (xl_logical_message *) XLogRecGetData(r);

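    /* Ignore messages for another database or from a filtered-out origin. */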
    if (message->dbId != ctx->slot->data.database ||
        FilterByOrigin(ctx, origin_id))
        return;

    if (message->transactional &&
        !SnapBuildProcessChange(builder, xid, buf->origptr))
        return;
    else if (!message->transactional &&
             (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
              SnapBuildXactNeedsSkip(builder, buf->origptr)))
        return;

    /*
     * We also skip decoding in fast_forward mode. This check must be last
     * because we don't want to set the processing_required flag unless we
     * have a decodable message.
     */
    if (ctx->fast_forward)
    {
        /*
         * We need to set the processing_required flag to notify the caller
         * of the message's existence. Usually, the flag is set when either
         * the COMMIT or ABORT records are decoded, but this must be turned
         * on here because the non-transactional logical message is decoded
         * without waiting for these records.
         */
        if (!message->transactional)
            ctx->processing_required = true;

        return;
    }

    /*
     * If this is a non-transactional change, get the snapshot we're expected
     * to use. We only get here when the snapshot is consistent, and the
     * change is not meant to be skipped.
     *
     * For transactional changes we don't need a snapshot, we'll use the
     * regular snapshot maintained by ReorderBuffer. We just leave it NULL.
     */
    if (!message->transactional)
        snapshot = SnapBuildGetOrBuildSnapshot(builder);

    ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
                              message->transactional,
                              message->message,   /* first part of message is
                                                   * prefix */
                              message->message_size,
                              message->message + message->prefix_size);
}

/*
 * Consolidated commit record handling between the different forms of commit
 * records.
 *
 * 'two_phase' indicates that the caller wants to process the transaction in
 * two phases: first process the prepare if not already done, and then process
 * commit_prepared.
 */
static void
DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
             xl_xact_parsed_commit *parsed, TransactionId xid,
             bool two_phase)
{
    XLogRecPtr origin_lsn = InvalidXLogRecPtr;
    TimestampTz commit_time = parsed->xact_time;
    RepOriginId origin_id = XLogRecGetOrigin(buf->record);
    int i;

    if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
    {
        origin_lsn = parsed->origin_lsn;
        commit_time = parsed->origin_timestamp;
    }

    SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
                       parsed->nsubxacts, parsed->subxacts,
                       parsed->xinfo);

    /* ----
     * Check whether we are interested in this specific transaction, and tell
     * the reorderbuffer to forget the content of the (sub-)transactions
     * if not.
     *
     * We can't just use ReorderBufferAbort() here, because we need to execute
     * the transaction's invalidations. This currently won't be needed if
     * we're just skipping over the transaction because currently we only do
     * so during startup, to get to the first transaction the client needs. As
     * we have reset the catalog caches before starting to read WAL, and we
     * haven't yet touched any catalogs, there can't be anything to invalidate.
     * But if we're "forgetting" this commit because it happened in another
     * database, the invalidations might be important, because they could be
     * for shared catalogs and we might have loaded data into the relevant
     * syscaches.
     * ---
     */
    if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
    {
        for (i = 0; i < parsed->nsubxacts; i++)
        {
            ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
        }
        ReorderBufferForget(ctx->reorder, xid, buf->origptr);

        return;
    }

    /* tell the reorderbuffer about the surviving subtransactions */
    for (i = 0; i < parsed->nsubxacts; i++)
    {
        ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
                                 buf->origptr, buf->endptr);
    }

    /*
     * Send the final commit record if the transaction data is already
     * decoded; otherwise, process the entire transaction.
     */
    if (two_phase)
    {
        ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
                                    SnapBuildGetTwoPhaseAt(ctx->snapshot_builder),
                                    commit_time, origin_id, origin_lsn,
                                    parsed->twophase_gid, true);
    }
    else
    {
        ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
                            commit_time, origin_id, origin_lsn);
    }

    /*
     * Update the decoding stats at transaction prepare/commit/abort.
     * Additionally we send the stats when we spill or stream the changes to
     * avoid losing them in case the decoding is interrupted. It is not clear
     * that sending more or less frequently than this would be better.
     */
    UpdateDecodingStats(ctx);
}

/*
 * Decode PREPARE record. Similar logic as in DecodeCommit.
 *
 * Note that we don't skip the prepare even if we have detected a concurrent
 * abort, because it is quite possible that we had already sent some changes
 * before we detected the abort, in which case we need to abort those changes
 * on the subscriber. To abort such changes, we send the prepare and then the
 * rollback prepared, which is what happened on the publisher side as well.
 * Now, we could invent a new abort API wherein, in such cases, we send an
 * abort and skip sending the prepare and rollback prepared, but that is not
 * straightforward because we might have streamed this transaction by that
 * time, in which case it is handled when the rollback is encountered. It is
 * not impossible to optimize the concurrent abort case, but it can introduce
 * design complexity w.r.t. handling different cases, so we leave it for now
 * as it doesn't seem worth it.
 */
static void
DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
              xl_xact_parsed_prepare *parsed)
{
    SnapBuild *builder = ctx->snapshot_builder;
    XLogRecPtr origin_lsn = parsed->origin_lsn;
    TimestampTz prepare_time = parsed->xact_time;
    RepOriginId origin_id = XLogRecGetOrigin(buf->record);
    int i;
    TransactionId xid = parsed->twophase_xid;

    if (parsed->origin_timestamp != 0)
        prepare_time = parsed->origin_timestamp;

    /*
     * Remember the prepare info for a txn so that it can be used later in
     * commit prepared if required. See ReorderBufferFinishPrepared.
     */
    if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr,
                                          buf->endptr, prepare_time, origin_id,
                                          origin_lsn))
        return;

    /* We can't start streaming unless a consistent state is reached. */
    if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
    {
        ReorderBufferSkipPrepare(ctx->reorder, xid);
        return;
    }

    /*
     * Check whether we need to process this transaction. See
     * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     * transaction.
     *
     * We can't call ReorderBufferForget as we did in DecodeCommit as the txn
     * hasn't yet been committed; removing this txn before a commit might
     * result in the computation of an incorrect restart_lsn. See
     * SnapBuildProcessRunningXacts. But we need to process cache
     * invalidations if there are any for the reasons mentioned in
     * DecodeCommit.
     */
    if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
    {
        ReorderBufferSkipPrepare(ctx->reorder, xid);
        ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
        return;
    }

    /* Tell the reorderbuffer about the surviving subtransactions. */
    for (i = 0; i < parsed->nsubxacts; i++)
    {
        ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
                                 buf->origptr, buf->endptr);
    }

    /* replay actions of all transaction + subtransactions in order */
    ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);

    /*
     * Update the decoding stats at transaction prepare/commit/abort.
     * Additionally we send the stats when we spill or stream the changes to
     * avoid losing them in case the decoding is interrupted. It is not clear
     * that sending more or less frequently than this would be better.
     */
    UpdateDecodingStats(ctx);
}


/*
 * Get the data from the various forms of abort records and pass it on to
 * snapbuild.c and reorderbuffer.c.
 *
 * 'two_phase' indicates that we are finishing a prepared transaction.
 */
static void
DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
            xl_xact_parsed_abort *parsed, TransactionId xid,
            bool two_phase)
{
    int i;
    XLogRecPtr origin_lsn = InvalidXLogRecPtr;
    TimestampTz abort_time = parsed->xact_time;
    RepOriginId origin_id = XLogRecGetOrigin(buf->record);
    bool skip_xact;

    if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
    {
        origin_lsn = parsed->origin_lsn;
        abort_time = parsed->origin_timestamp;
    }

    /*
     * Check whether we need to process this transaction. See
     * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     * transaction.
     */
    skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id);

    /*
     * Send the final rollback record for a prepared transaction unless we
     * need to skip it. For non-two-phase xacts, simply forget the xact.
     */
    if (two_phase && !skip_xact)
    {
        ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
                                    InvalidXLogRecPtr,
                                    abort_time, origin_id, origin_lsn,
                                    parsed->twophase_gid, false);
    }
    else
    {
        for (i = 0; i < parsed->nsubxacts; i++)
        {
            ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
                               buf->record->EndRecPtr, abort_time);
        }

        ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr,
                           abort_time);
    }

    /* update the decoding stats */
    UpdateDecodingStats(ctx);
}

/*
 * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
 *
 * Inserts can contain the new tuple.
 */
static void
DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
    Size datalen;
    char *tupledata;
    Size tuplelen;
    XLogReaderState *r = buf->record;
    xl_heap_insert *xlrec;
    ReorderBufferChange *change;
    RelFileLocator target_locator;

    xlrec = (xl_heap_insert *) XLogRecGetData(r);

    /*
     * Ignore insert records without new tuples (this does happen when
     * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
     */
    if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
        return;

    /* only interested in our database */
    XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    if (target_locator.dbOid != ctx->slot->data.database)
        return;

    /* output plugin doesn't look for this origin, no need to queue */
    if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
        return;

    change = ReorderBufferGetChange(ctx->reorder);
    if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
        change->action = REORDER_BUFFER_CHANGE_INSERT;
    else
        change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
    change->origin_id = XLogRecGetOrigin(r);

    memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));

    tupledata = XLogRecGetBlockData(r, 0, &datalen);
    tuplelen = datalen - SizeOfHeapHeader;

    change->data.tp.newtuple =
        ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);

    DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);

    change->data.tp.clear_toast_afterwards = true;

    ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
                             change,
                             xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
}

/*
 * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
 * in the record, from wal into proper tuplebufs.
 *
 * Updates can possibly contain a new tuple and the old primary key.
 */
static void
DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
    XLogReaderState *r = buf->record;
    xl_heap_update *xlrec;
    ReorderBufferChange *change;
    char *data;
    RelFileLocator target_locator;

    xlrec = (xl_heap_update *) XLogRecGetData(r);

    /* only interested in our database */
    XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    if (target_locator.dbOid != ctx->slot->data.database)
        return;

    /* output plugin doesn't look for this origin, no need to queue */
    if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
        return;

    change = ReorderBufferGetChange(ctx->reorder);
    change->action = REORDER_BUFFER_CHANGE_UPDATE;
    change->origin_id = XLogRecGetOrigin(r);
    memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));

    if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
    {
        Size datalen;
        Size tuplelen;

        data = XLogRecGetBlockData(r, 0, &datalen);

        tuplelen = datalen - SizeOfHeapHeader;

        change->data.tp.newtuple =
            ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);

        DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
    }

    if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
    {
        Size datalen;
        Size tuplelen;

        /* caution, remaining data in record is not aligned */
        data = XLogRecGetData(r) + SizeOfHeapUpdate;
        datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
        tuplelen = datalen - SizeOfHeapHeader;

        change->data.tp.oldtuple =
            ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);

        DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
    }

    change->data.tp.clear_toast_afterwards = true;

    ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
                             change, false);
}

/*
 * Parse XLOG_HEAP_DELETE from wal into proper tuplebufs.
 *
 * Deletes can possibly contain the old primary key.
 */
static void
DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
    XLogReaderState *r = buf->record;
    xl_heap_delete *xlrec;
    ReorderBufferChange *change;
    RelFileLocator target_locator;

    xlrec = (xl_heap_delete *) XLogRecGetData(r);

    /* only interested in our database */
    XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    if (target_locator.dbOid != ctx->slot->data.database)
        return;

    /* output plugin doesn't look for this origin, no need to queue */
    if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
        return;

    change = ReorderBufferGetChange(ctx->reorder);

    if (xlrec->flags & XLH_DELETE_IS_SUPER)
        change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT;
    else
        change->action = REORDER_BUFFER_CHANGE_DELETE;

    change->origin_id = XLogRecGetOrigin(r);

    memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));

    /* old primary key stored */
    if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
    {
        Size datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete;
        Size tuplelen = datalen - SizeOfHeapHeader;

        Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));

        change->data.tp.oldtuple =
            ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);

        DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
                        datalen, change->data.tp.oldtuple);
    }

    change->data.tp.clear_toast_afterwards = true;

    ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
                             change, false);
}

/*
 * Parse XLOG_HEAP_TRUNCATE from wal
 */
static void
DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
    XLogReaderState *r = buf->record;
    xl_heap_truncate *xlrec;
    ReorderBufferChange *change;

    xlrec = (xl_heap_truncate *) XLogRecGetData(r);

    /* only interested in our database */
    if (xlrec->dbId != ctx->slot->data.database)
        return;

    /* output plugin doesn't look for this origin, no need to queue */
    if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
        return;

    change = ReorderBufferGetChange(ctx->reorder);
    change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
    change->origin_id = XLogRecGetOrigin(r);
    if (xlrec->flags & XLH_TRUNCATE_CASCADE)
        change->data.truncate.cascade = true;
    if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
        change->data.truncate.restart_seqs = true;
    change->data.truncate.nrelids = xlrec->nrelids;
    change->data.truncate.relids = ReorderBufferGetRelids(ctx->reorder,
                                                          xlrec->nrelids);
    memcpy(change->data.truncate.relids, xlrec->relids,
           xlrec->nrelids * sizeof(Oid));
    ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
                             buf->origptr, change, false);
}

/*
 * Decode XLOG_HEAP2_MULTI_INSERT record into multiple tuplebufs.
 *
 * Currently MULTI_INSERT will always contain the full tuples.
 */
static void
DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
    XLogReaderState *r = buf->record;
    xl_heap_multi_insert *xlrec;
    int i;
    char *data;
    char *tupledata;
    Size tuplelen;
    RelFileLocator rlocator;

    xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);

    /*
     * Ignore insert records without new tuples. This happens when a
     * multi_insert is done on a catalog or on a non-persistent relation.
     */
    if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
        return;

    /* only interested in our database */
    XLogRecGetBlockTag(r, 0, &rlocator, NULL, NULL);
    if (rlocator.dbOid != ctx->slot->data.database)
        return;

    /* output plugin doesn't look for this origin, no need to queue */
    if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
        return;

    /*
     * We know that this multi_insert isn't for a catalog, so the block should
     * always have data even if a full-page write of it is taken.
     */
    tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
    Assert(tupledata != NULL);

    data = tupledata;
    for (i = 0; i < xlrec->ntuples; i++)
    {
        ReorderBufferChange *change;
        xl_multi_insert_tuple *xlhdr;
        int datalen;
        HeapTuple tuple;
        HeapTupleHeader header;

        change = ReorderBufferGetChange(ctx->reorder);
        change->action = REORDER_BUFFER_CHANGE_INSERT;
        change->origin_id = XLogRecGetOrigin(r);

        memcpy(&change->data.tp.rlocator, &rlocator, sizeof(RelFileLocator));

        xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
        data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
        datalen = xlhdr->datalen;

        change->data.tp.newtuple =
            ReorderBufferGetTupleBuf(ctx->reorder, datalen);

        tuple = change->data.tp.newtuple;
        header = tuple->t_data;

        /* not a disk based tuple */
        ItemPointerSetInvalid(&tuple->t_self);

        /*
         * We can only figure this out after reassembling the transactions.
         */
        tuple->t_tableOid = InvalidOid;

        tuple->t_len = datalen + SizeofHeapTupleHeader;

        memset(header, 0, SizeofHeapTupleHeader);

        memcpy((char *) tuple->t_data + SizeofHeapTupleHeader,
               (char *) data,
               datalen);
        header->t_infomask = xlhdr->t_infomask;
        header->t_infomask2 = xlhdr->t_infomask2;
        header->t_hoff = xlhdr->t_hoff;

        /*
         * Reset toast reassembly state only after the last row in the last
         * xl_multi_insert_tuple record emitted by one heap_multi_insert()
         * call.
         */
        if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
            (i + 1) == xlrec->ntuples)
            change->data.tp.clear_toast_afterwards = true;
        else
            change->data.tp.clear_toast_afterwards = false;

        ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
                                 buf->origptr, change, false);

        /* move to the next xl_multi_insert_tuple entry */
        data += datalen;
    }
    Assert(data == tupledata + tuplelen);
}

/*
 * Parse XLOG_HEAP_CONFIRM from wal into a confirmation change.
 *
 * This is pretty trivial; all the state is essentially already set up by the
 * speculative insertion.
 */
static void
DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
    XLogReaderState *r = buf->record;
    ReorderBufferChange *change;
    RelFileLocator target_locator;

    /* only interested in our database */
    XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    if (target_locator.dbOid != ctx->slot->data.database)
        return;

    /* output plugin doesn't look for this origin, no need to queue */
    if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
        return;

    change = ReorderBufferGetChange(ctx->reorder);
    change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
    change->origin_id = XLogRecGetOrigin(r);

    memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));

    change->data.tp.clear_toast_afterwards = true;

    ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
                             change, false);
}


/*
 * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
 * (but not by heap_multi_insert) into a tuplebuf.
 *
 * The size 'len' and the pointer 'data' in the record need to be
 * computed outside as they are record specific.
 */
static void
DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
{
    xl_heap_header xlhdr;
    int datalen = len - SizeOfHeapHeader;
    HeapTupleHeader header;

    Assert(datalen >= 0);

    tuple->t_len = datalen + SizeofHeapTupleHeader;
    header = tuple->t_data;

    /* not a disk based tuple */
    ItemPointerSetInvalid(&tuple->t_self);

    /* we can only figure this out after reassembling the transactions */
    tuple->t_tableOid = InvalidOid;

    /* data is not stored aligned, copy to aligned storage */
    memcpy((char *) &xlhdr,
           data,
           SizeOfHeapHeader);

    memset(header, 0, SizeofHeapTupleHeader);

    memcpy(((char *) tuple->t_data) + SizeofHeapTupleHeader,
           data + SizeOfHeapHeader,
           datalen);

    header->t_infomask = xlhdr.t_infomask;
    header->t_infomask2 = xlhdr.t_infomask2;
    header->t_hoff = xlhdr.t_hoff;
}

/*
 * Check whether we are interested in this specific transaction.
 *
 * There can be several reasons we might not be interested in this
 * transaction:
 * 1) We might not be interested in decoding transactions up to this
 *    LSN. This can happen because we previously decoded it and are now just
 *    restarting, or because we haven't assembled a consistent snapshot yet.
 * 2) The transaction happened in another database.
 * 3) The output plugin is not interested in the origin.
 * 4) We are doing fast-forwarding.
 */
static bool
DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
                  Oid txn_dbid, RepOriginId origin_id)
{
    if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
        (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
        FilterByOrigin(ctx, origin_id))
        return true;

    /*
     * We also skip decoding in fast_forward mode. In passing set the
     * processing_required flag to indicate that if it were not for
     * fast_forward mode, processing would have been required.
     */
    if (ctx->fast_forward)
    {
        ctx->processing_required = true;
        return true;
    }

    return false;
}