mirror of
https://github.com/postgres/postgres.git
synced 2025-11-10 17:42:29 +03:00
Introduce logical decoding.
This feature, building on previous commits, allows the write-ahead log stream to be decoded into a series of logical changes; that is, inserts, updates, and deletes and the transactions which contain them. It is capable of handling decoding even across changes to the schema of the effected tables. The output format is controlled by a so-called "output plugin"; an example is included. To make use of this in a real replication system, the output plugin will need to be modified to produce output in the format appropriate to that system, and to perform filtering. Currently, information can be extracted from the logical decoding system only via SQL; future commits will add the ability to stream changes via walsender. Andres Freund, with review and other contributions from many other people, including Álvaro Herrera, Abhijit Menon-Sen, Peter Gheogegan, Kevin Grittner, Robert Haas, Heikki Linnakangas, Fujii Masao, Abhijit Menon-Sen, Michael Paquier, Simon Riggs, Craig Ringer, and Steve Singer.
This commit is contained in:
@@ -17,6 +17,8 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
|
||||
OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
|
||||
repl_gram.o slot.o slotfuncs.o syncrep.o
|
||||
|
||||
SUBDIRS = logical
|
||||
|
||||
include $(top_srcdir)/src/backend/common.mk
|
||||
|
||||
# repl_scanner is compiled as part of repl_gram
|
||||
|
||||
19
src/backend/replication/logical/Makefile
Normal file
19
src/backend/replication/logical/Makefile
Normal file
@@ -0,0 +1,19 @@
|
||||
#-------------------------------------------------------------------------
|
||||
#
|
||||
# Makefile--
|
||||
# Makefile for src/backend/replication/logical
|
||||
#
|
||||
# IDENTIFICATION
|
||||
# src/backend/replication/logical/Makefile
|
||||
#
|
||||
#-------------------------------------------------------------------------
|
||||
|
||||
subdir = src/backend/replication/logical
|
||||
top_builddir = ../../../..
|
||||
include $(top_builddir)/src/Makefile.global
|
||||
|
||||
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
|
||||
|
||||
OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o snapbuild.o
|
||||
|
||||
include $(top_srcdir)/src/backend/common.mk
|
||||
826
src/backend/replication/logical/decode.c
Normal file
826
src/backend/replication/logical/decode.c
Normal file
@@ -0,0 +1,826 @@
|
||||
/* -------------------------------------------------------------------------
|
||||
*
|
||||
* decode.c
|
||||
* This module decodes WAL records read using xlogreader.h's APIs for the
|
||||
* purpose of logical decoding by passing information to the
|
||||
* reorderbuffer module (containing the actual changes) and to the
|
||||
* snapbuild module to build a fitting catalog snapshot (to be able to
|
||||
* properly decode the changes in the reorderbuffer).
|
||||
*
|
||||
* NOTE:
|
||||
* This basically tries to handle all low level xlog stuff for
|
||||
* reorderbuffer.c and snapbuild.c. There's some minor leakage where a
|
||||
* specific record's struct is used to pass data along, but those just
|
||||
* happen to contain the right amount of data in a convenient
|
||||
* format. There isn't and shouldn't be much intelligence about the
|
||||
* contents of records in here except turning them into a more usable
|
||||
* format.
|
||||
*
|
||||
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/backend/replication/logical/decode.c
|
||||
*
|
||||
* -------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/heapam.h"
|
||||
#include "access/heapam_xlog.h"
|
||||
#include "access/transam.h"
|
||||
#include "access/xact.h"
|
||||
#include "access/xlog_internal.h"
|
||||
#include "access/xlogreader.h"
|
||||
|
||||
#include "catalog/pg_control.h"
|
||||
|
||||
#include "replication/decode.h"
|
||||
#include "replication/logical.h"
|
||||
#include "replication/reorderbuffer.h"
|
||||
#include "replication/snapbuild.h"
|
||||
|
||||
#include "storage/standby.h"
|
||||
|
||||
typedef struct XLogRecordBuffer
|
||||
{
|
||||
XLogRecPtr origptr;
|
||||
XLogRecPtr endptr;
|
||||
XLogRecord record;
|
||||
char *record_data;
|
||||
} XLogRecordBuffer;
|
||||
|
||||
/* RMGR Handlers */
|
||||
static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||
static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||
static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||
static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||
static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||
|
||||
/* individual record(group)'s handlers */
|
||||
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||
static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
|
||||
TransactionId xid, Oid dboid,
|
||||
TimestampTz commit_time,
|
||||
int nsubxacts, TransactionId *sub_xids,
|
||||
int ninval_msgs, SharedInvalidationMessage *msg);
|
||||
static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn,
|
||||
TransactionId xid, TransactionId *sub_xids, int nsubxacts);
|
||||
|
||||
/* common function to decode tuples */
|
||||
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
|
||||
|
||||
/*
|
||||
* Take every XLogReadRecord()ed record and perform the actions required to
|
||||
* decode it using the output plugin already setup in the logical decoding
|
||||
* context.
|
||||
*/
|
||||
void
|
||||
LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record)
|
||||
{
|
||||
XLogRecordBuffer buf;
|
||||
|
||||
buf.origptr = ctx->reader->ReadRecPtr;
|
||||
buf.endptr = ctx->reader->EndRecPtr;
|
||||
buf.record = *record;
|
||||
buf.record_data = XLogRecGetData(record);
|
||||
|
||||
/* cast so we get a warning when new rmgrs are added */
|
||||
switch ((RmgrIds) buf.record.xl_rmid)
|
||||
{
|
||||
/*
|
||||
* Rmgrs we care about for logical decoding. Add new rmgrs in
|
||||
* rmgrlist.h's order.
|
||||
*/
|
||||
case RM_XLOG_ID:
|
||||
DecodeXLogOp(ctx, &buf);
|
||||
break;
|
||||
|
||||
case RM_XACT_ID:
|
||||
DecodeXactOp(ctx, &buf);
|
||||
break;
|
||||
|
||||
case RM_STANDBY_ID:
|
||||
DecodeStandbyOp(ctx, &buf);
|
||||
break;
|
||||
|
||||
case RM_HEAP2_ID:
|
||||
DecodeHeap2Op(ctx, &buf);
|
||||
break;
|
||||
|
||||
case RM_HEAP_ID:
|
||||
DecodeHeapOp(ctx, &buf);
|
||||
break;
|
||||
|
||||
/*
|
||||
* Rmgrs irrelevant for logical decoding; they describe stuff not
|
||||
* represented in logical decoding. Add new rmgrs in rmgrlist.h's
|
||||
* order.
|
||||
*/
|
||||
case RM_SMGR_ID:
|
||||
case RM_CLOG_ID:
|
||||
case RM_DBASE_ID:
|
||||
case RM_TBLSPC_ID:
|
||||
case RM_MULTIXACT_ID:
|
||||
case RM_RELMAP_ID:
|
||||
case RM_BTREE_ID:
|
||||
case RM_HASH_ID:
|
||||
case RM_GIN_ID:
|
||||
case RM_GIST_ID:
|
||||
case RM_SEQ_ID:
|
||||
case RM_SPGIST_ID:
|
||||
break;
|
||||
case RM_NEXT_ID:
|
||||
elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) buf.record.xl_rmid);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer().
|
||||
*/
|
||||
static void
|
||||
DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||
{
|
||||
SnapBuild *builder = ctx->snapshot_builder;
|
||||
uint8 info = buf->record.xl_info & ~XLR_INFO_MASK;
|
||||
|
||||
switch (info)
|
||||
{
|
||||
/* this is also used in END_OF_RECOVERY checkpoints */
|
||||
case XLOG_CHECKPOINT_SHUTDOWN:
|
||||
case XLOG_END_OF_RECOVERY:
|
||||
SnapBuildSerializationPoint(builder, buf->origptr);
|
||||
|
||||
break;
|
||||
case XLOG_CHECKPOINT_ONLINE:
|
||||
/*
|
||||
* a RUNNING_XACTS record will have been logged near to this, we
|
||||
* can restart from there.
|
||||
*/
|
||||
break;
|
||||
case XLOG_NOOP:
|
||||
case XLOG_NEXTOID:
|
||||
case XLOG_SWITCH:
|
||||
case XLOG_BACKUP_END:
|
||||
case XLOG_PARAMETER_CHANGE:
|
||||
case XLOG_RESTORE_POINT:
|
||||
case XLOG_FPW_CHANGE:
|
||||
case XLOG_FPI:
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer().
|
||||
*/
|
||||
static void
|
||||
DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||
{
|
||||
SnapBuild *builder = ctx->snapshot_builder;
|
||||
ReorderBuffer *reorder = ctx->reorder;
|
||||
XLogRecord *r = &buf->record;
|
||||
uint8 info = r->xl_info & ~XLR_INFO_MASK;
|
||||
|
||||
/* no point in doing anything yet, data could not be decoded anyway */
|
||||
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
|
||||
return;
|
||||
|
||||
switch (info)
|
||||
{
|
||||
case XLOG_XACT_COMMIT:
|
||||
{
|
||||
xl_xact_commit *xlrec;
|
||||
TransactionId *subxacts = NULL;
|
||||
SharedInvalidationMessage *invals = NULL;
|
||||
|
||||
xlrec = (xl_xact_commit *) buf->record_data;
|
||||
|
||||
subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
|
||||
invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
|
||||
|
||||
DecodeCommit(ctx, buf, r->xl_xid, xlrec->dbId,
|
||||
xlrec->xact_time,
|
||||
xlrec->nsubxacts, subxacts,
|
||||
xlrec->nmsgs, invals);
|
||||
|
||||
break;
|
||||
}
|
||||
case XLOG_XACT_COMMIT_PREPARED:
|
||||
{
|
||||
xl_xact_commit_prepared *prec;
|
||||
xl_xact_commit *xlrec;
|
||||
TransactionId *subxacts;
|
||||
SharedInvalidationMessage *invals = NULL;
|
||||
|
||||
/* Prepared commits contain a normal commit record... */
|
||||
prec = (xl_xact_commit_prepared *) buf->record_data;
|
||||
xlrec = &prec->crec;
|
||||
|
||||
subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
|
||||
invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
|
||||
|
||||
DecodeCommit(ctx, buf, r->xl_xid, xlrec->dbId,
|
||||
xlrec->xact_time,
|
||||
xlrec->nsubxacts, subxacts,
|
||||
xlrec->nmsgs, invals);
|
||||
|
||||
break;
|
||||
}
|
||||
case XLOG_XACT_COMMIT_COMPACT:
|
||||
{
|
||||
xl_xact_commit_compact *xlrec;
|
||||
|
||||
xlrec = (xl_xact_commit_compact *) buf->record_data;
|
||||
|
||||
DecodeCommit(ctx, buf, r->xl_xid, InvalidOid,
|
||||
xlrec->xact_time,
|
||||
xlrec->nsubxacts, xlrec->subxacts,
|
||||
0, NULL);
|
||||
break;
|
||||
}
|
||||
case XLOG_XACT_ABORT:
|
||||
{
|
||||
xl_xact_abort *xlrec;
|
||||
TransactionId *sub_xids;
|
||||
|
||||
xlrec = (xl_xact_abort *) buf->record_data;
|
||||
|
||||
sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
|
||||
|
||||
DecodeAbort(ctx, buf->origptr, r->xl_xid,
|
||||
sub_xids, xlrec->nsubxacts);
|
||||
break;
|
||||
}
|
||||
case XLOG_XACT_ABORT_PREPARED:
|
||||
{
|
||||
xl_xact_abort_prepared *prec;
|
||||
xl_xact_abort *xlrec;
|
||||
TransactionId *sub_xids;
|
||||
|
||||
/* prepared abort contain a normal commit abort... */
|
||||
prec = (xl_xact_abort_prepared *) buf->record_data;
|
||||
xlrec = &prec->arec;
|
||||
|
||||
sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
|
||||
|
||||
/* r->xl_xid is committed in a separate record */
|
||||
DecodeAbort(ctx, buf->origptr, prec->xid,
|
||||
sub_xids, xlrec->nsubxacts);
|
||||
break;
|
||||
}
|
||||
|
||||
case XLOG_XACT_ASSIGNMENT:
|
||||
{
|
||||
xl_xact_assignment *xlrec;
|
||||
int i;
|
||||
TransactionId *sub_xid;
|
||||
|
||||
xlrec = (xl_xact_assignment *) buf->record_data;
|
||||
|
||||
sub_xid = &xlrec->xsub[0];
|
||||
|
||||
for (i = 0; i < xlrec->nsubxacts; i++)
|
||||
{
|
||||
ReorderBufferAssignChild(reorder, xlrec->xtop,
|
||||
*(sub_xid++), buf->origptr);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case XLOG_XACT_PREPARE:
|
||||
/*
|
||||
* Currently decoding ignores PREPARE TRANSACTION and will just
|
||||
* decode the transaction when the COMMIT PREPARED is sent or
|
||||
* throw away the transaction's contents when a ROLLBACK PREPARED
|
||||
* is received. In the future we could add code to expose prepared
|
||||
* transactions in the changestream allowing for a kind of
|
||||
* distributed 2PC.
|
||||
*/
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer().
|
||||
*/
|
||||
static void
|
||||
DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||
{
|
||||
SnapBuild *builder = ctx->snapshot_builder;
|
||||
XLogRecord *r = &buf->record;
|
||||
uint8 info = r->xl_info & ~XLR_INFO_MASK;
|
||||
|
||||
switch (info)
|
||||
{
|
||||
case XLOG_RUNNING_XACTS:
|
||||
{
|
||||
xl_running_xacts *running = (xl_running_xacts *) buf->record_data;
|
||||
SnapBuildProcessRunningXacts(builder, buf->origptr, running);
|
||||
/*
|
||||
* Abort all transactions that we keep track of, that are
|
||||
* older than the record's oldestRunningXid. This is the most
|
||||
* convenient spot for doing so since, in contrast to shutdown
|
||||
* or end-of-recovery checkpoints, we have information about
|
||||
* all running transactions which includes prepared ones,
|
||||
* while shutdown checkpoints just know that no non-prepared
|
||||
* transactions are in progress.
|
||||
*/
|
||||
ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
|
||||
}
|
||||
break;
|
||||
case XLOG_STANDBY_LOCK:
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer().
|
||||
*/
|
||||
static void
|
||||
DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||
{
|
||||
uint8 info = buf->record.xl_info & XLOG_HEAP_OPMASK;
|
||||
TransactionId xid = buf->record.xl_xid;
|
||||
SnapBuild *builder = ctx->snapshot_builder;
|
||||
|
||||
/* no point in doing anything yet */
|
||||
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
|
||||
return;
|
||||
|
||||
switch (info)
|
||||
{
|
||||
case XLOG_HEAP2_MULTI_INSERT:
|
||||
if (SnapBuildProcessChange(builder, xid, buf->origptr))
|
||||
DecodeMultiInsert(ctx, buf);
|
||||
break;
|
||||
case XLOG_HEAP2_NEW_CID:
|
||||
{
|
||||
xl_heap_new_cid *xlrec;
|
||||
xlrec = (xl_heap_new_cid *) buf->record_data;
|
||||
SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
|
||||
|
||||
break;
|
||||
}
|
||||
case XLOG_HEAP2_REWRITE:
|
||||
/*
|
||||
* Although these records only exist to serve the needs of logical
|
||||
* decoding, all the work happens as part of crash or archive
|
||||
* recovery, so we don't need to do anything here.
|
||||
*/
|
||||
break;
|
||||
/*
|
||||
* Everything else here is just low level physical stuff we're
|
||||
* not interested in.
|
||||
*/
|
||||
case XLOG_HEAP2_FREEZE_PAGE:
|
||||
case XLOG_HEAP2_CLEAN:
|
||||
case XLOG_HEAP2_CLEANUP_INFO:
|
||||
case XLOG_HEAP2_VISIBLE:
|
||||
case XLOG_HEAP2_LOCK_UPDATED:
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer().
|
||||
*/
|
||||
static void
|
||||
DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||
{
|
||||
uint8 info = buf->record.xl_info & XLOG_HEAP_OPMASK;
|
||||
TransactionId xid = buf->record.xl_xid;
|
||||
SnapBuild *builder = ctx->snapshot_builder;
|
||||
|
||||
/* no point in doing anything yet */
|
||||
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
|
||||
return;
|
||||
|
||||
switch (info)
|
||||
{
|
||||
case XLOG_HEAP_INSERT:
|
||||
if (SnapBuildProcessChange(builder, xid, buf->origptr))
|
||||
DecodeInsert(ctx, buf);
|
||||
break;
|
||||
|
||||
/*
|
||||
* Treat HOT update as normal updates. There is no useful
|
||||
* information in the fact that we could make it a HOT update
|
||||
* locally and the WAL layout is compatible.
|
||||
*/
|
||||
case XLOG_HEAP_HOT_UPDATE:
|
||||
case XLOG_HEAP_UPDATE:
|
||||
if (SnapBuildProcessChange(builder, xid, buf->origptr))
|
||||
DecodeUpdate(ctx, buf);
|
||||
break;
|
||||
|
||||
case XLOG_HEAP_DELETE:
|
||||
if (SnapBuildProcessChange(builder, xid, buf->origptr))
|
||||
DecodeDelete(ctx, buf);
|
||||
break;
|
||||
|
||||
case XLOG_HEAP_NEWPAGE:
|
||||
/*
|
||||
* This is only used in places like indexams and CLUSTER which
|
||||
* don't contain changes relevant for logical replication.
|
||||
*/
|
||||
break;
|
||||
|
||||
case XLOG_HEAP_INPLACE:
|
||||
/*
|
||||
* Inplace updates are only ever performed on catalog tuples and
|
||||
* can, per definition, not change tuple visibility. Since we
|
||||
* don't decode catalog tuples, we're not interested in the
|
||||
* record's contents.
|
||||
*
|
||||
* In-place updates can be used either by XID-bearing transactions
|
||||
* (e.g. in CREATE INDEX CONCURRENTLY) or by XID-less
|
||||
* transactions (e.g. VACUUM). In the former case, the commit
|
||||
* record will include cache invalidations, so we mark the
|
||||
* transaction as catalog modifying here. Currently that's
|
||||
* redundant because the commit will do that as well, but once we
|
||||
* support decoding in-progress relations, this will be important.
|
||||
*/
|
||||
if (!TransactionIdIsValid(xid))
|
||||
break;
|
||||
|
||||
SnapBuildProcessChange(builder, xid, buf->origptr);
|
||||
ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
|
||||
break;
|
||||
|
||||
case XLOG_HEAP_LOCK:
|
||||
/* we don't care about row level locks for now */
|
||||
break;
|
||||
|
||||
default:
|
||||
elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Consolidated commit record handling between the different form of commit
|
||||
* records.
|
||||
*/
|
||||
static void
|
||||
DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
|
||||
TransactionId xid, Oid dboid,
|
||||
TimestampTz commit_time,
|
||||
int nsubxacts, TransactionId *sub_xids,
|
||||
int ninval_msgs, SharedInvalidationMessage *msgs)
|
||||
{
|
||||
int i;
|
||||
|
||||
/*
|
||||
* Process invalidation messages, even if we're not interested in the
|
||||
* transaction's contents, since the various caches need to always be
|
||||
* consistent.
|
||||
*/
|
||||
if (ninval_msgs > 0)
|
||||
{
|
||||
ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
|
||||
ninval_msgs, msgs);
|
||||
ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
|
||||
}
|
||||
|
||||
SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
|
||||
nsubxacts, sub_xids);
|
||||
|
||||
/* ----
|
||||
* Check whether we are interested in this specific transaction, and tell
|
||||
* the the reorderbuffer to forget the content of the (sub-)transactions
|
||||
* if not.
|
||||
*
|
||||
* There basically two reasons we might not be interested in this
|
||||
* transaction:
|
||||
* 1) We might not be interested in decoding transactions up to this
|
||||
* LSN. This can happen because we previously decoded it and now just
|
||||
* are restarting or if we haven't assembled a consistent snapshot yet.
|
||||
* 2) The transaction happened in another database.
|
||||
*
|
||||
* We can't just use ReorderBufferAbort() here, because we need to execute
|
||||
* the transaction's invalidations. This currently won't be needed if
|
||||
* we're just skipping over the transaction because currently we only do
|
||||
* so during startup, to get to the first transaction the client needs. As
|
||||
* we have reset the catalog caches before starting to read WAL, and we
|
||||
* haven't yet touched any catalogs, there can't be anything to invalidate.
|
||||
* But if we're "forgetting" this commit because it's it happened in
|
||||
* another database, the invalidations might be important, because they
|
||||
* could be for shared catalogs and we might have loaded data into the
|
||||
* relevant syscaches.
|
||||
* ---
|
||||
*/
|
||||
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
|
||||
(dboid != InvalidOid && dboid != ctx->slot->data.database))
|
||||
{
|
||||
for (i = 0; i < nsubxacts; i++)
|
||||
{
|
||||
ReorderBufferForget(ctx->reorder, *sub_xids, buf->origptr);
|
||||
sub_xids++;
|
||||
}
|
||||
ReorderBufferForget(ctx->reorder, xid, buf->origptr);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
/* tell the reorderbuffer about the surviving subtransactions */
|
||||
for (i = 0; i < nsubxacts; i++)
|
||||
{
|
||||
ReorderBufferCommitChild(ctx->reorder, xid, *sub_xids,
|
||||
buf->origptr, buf->endptr);
|
||||
sub_xids++;
|
||||
}
|
||||
|
||||
/* replay actions of all transaction + subtransactions in order */
|
||||
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
|
||||
commit_time);
|
||||
}
|
||||
|
||||
/*
|
||||
* Get the data from the various forms of abort records and pass it on to
|
||||
* snapbuild.c and reorderbuffer.c
|
||||
*/
|
||||
static void
|
||||
DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
|
||||
TransactionId *sub_xids, int nsubxacts)
|
||||
{
|
||||
int i;
|
||||
|
||||
SnapBuildAbortTxn(ctx->snapshot_builder, lsn, xid, nsubxacts, sub_xids);
|
||||
|
||||
for (i = 0; i < nsubxacts; i++)
|
||||
{
|
||||
ReorderBufferAbort(ctx->reorder, *sub_xids, lsn);
|
||||
sub_xids++;
|
||||
}
|
||||
|
||||
ReorderBufferAbort(ctx->reorder, xid, lsn);
|
||||
}
|
||||
|
||||
/*
|
||||
* Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
|
||||
*
|
||||
* Deletes can contain the new tuple.
|
||||
*/
|
||||
static void
|
||||
DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||
{
|
||||
XLogRecord *r = &buf->record;
|
||||
xl_heap_insert *xlrec;
|
||||
ReorderBufferChange *change;
|
||||
|
||||
xlrec = (xl_heap_insert *) buf->record_data;
|
||||
|
||||
/* only interested in our database */
|
||||
if (xlrec->target.node.dbNode != ctx->slot->data.database)
|
||||
return;
|
||||
|
||||
change = ReorderBufferGetChange(ctx->reorder);
|
||||
change->action = REORDER_BUFFER_CHANGE_INSERT;
|
||||
memcpy(&change->tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
|
||||
|
||||
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
|
||||
{
|
||||
Assert(r->xl_len > (SizeOfHeapInsert + SizeOfHeapHeader));
|
||||
|
||||
change->tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
|
||||
|
||||
DecodeXLogTuple((char *) xlrec + SizeOfHeapInsert,
|
||||
r->xl_len - SizeOfHeapInsert,
|
||||
change->tp.newtuple);
|
||||
}
|
||||
|
||||
ReorderBufferQueueChange(ctx->reorder, r->xl_xid, buf->origptr, change);
|
||||
}
|
||||
|
||||
/*
|
||||
* Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
|
||||
* in the record, from wal into proper tuplebufs.
|
||||
*
|
||||
* Updates can possibly contain a new tuple and the old primary key.
|
||||
*/
|
||||
static void
|
||||
DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||
{
|
||||
XLogRecord *r = &buf->record;
|
||||
xl_heap_update *xlrec;
|
||||
xl_heap_header_len *xlhdr;
|
||||
ReorderBufferChange *change;
|
||||
char *data;
|
||||
|
||||
xlrec = (xl_heap_update *) buf->record_data;
|
||||
xlhdr = (xl_heap_header_len *) (buf->record_data + SizeOfHeapUpdate);
|
||||
|
||||
/* only interested in our database */
|
||||
if (xlrec->target.node.dbNode != ctx->slot->data.database)
|
||||
return;
|
||||
|
||||
change = ReorderBufferGetChange(ctx->reorder);
|
||||
change->action = REORDER_BUFFER_CHANGE_UPDATE;
|
||||
memcpy(&change->tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
|
||||
|
||||
data = (char *) &xlhdr->header;
|
||||
|
||||
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
|
||||
{
|
||||
Assert(r->xl_len > (SizeOfHeapUpdate + SizeOfHeapHeaderLen));
|
||||
|
||||
change->tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
|
||||
|
||||
DecodeXLogTuple(data,
|
||||
xlhdr->t_len + SizeOfHeapHeader,
|
||||
change->tp.newtuple);
|
||||
/* skip over the rest of the tuple header */
|
||||
data += SizeOfHeapHeader;
|
||||
/* skip over the tuple data */
|
||||
data += xlhdr->t_len;
|
||||
}
|
||||
|
||||
if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD)
|
||||
{
|
||||
xlhdr = (xl_heap_header_len *) data;
|
||||
change->tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
|
||||
DecodeXLogTuple((char *) &xlhdr->header,
|
||||
xlhdr->t_len + SizeOfHeapHeader,
|
||||
change->tp.oldtuple);
|
||||
data = (char *) &xlhdr->header;
|
||||
data += SizeOfHeapHeader;
|
||||
data += xlhdr->t_len;
|
||||
}
|
||||
|
||||
ReorderBufferQueueChange(ctx->reorder, r->xl_xid, buf->origptr, change);
|
||||
}
|
||||
|
||||
/*
|
||||
* Parse XLOG_HEAP_DELETE from wal into proper tuplebufs.
|
||||
*
|
||||
* Deletes can possibly contain the old primary key.
|
||||
*/
|
||||
static void
|
||||
DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||
{
|
||||
XLogRecord *r = &buf->record;
|
||||
xl_heap_delete *xlrec;
|
||||
ReorderBufferChange *change;
|
||||
|
||||
xlrec = (xl_heap_delete *) buf->record_data;
|
||||
|
||||
/* only interested in our database */
|
||||
if (xlrec->target.node.dbNode != ctx->slot->data.database)
|
||||
return;
|
||||
|
||||
change = ReorderBufferGetChange(ctx->reorder);
|
||||
change->action = REORDER_BUFFER_CHANGE_DELETE;
|
||||
|
||||
memcpy(&change->tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
|
||||
|
||||
/* old primary key stored */
|
||||
if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD)
|
||||
{
|
||||
Assert(r->xl_len > (SizeOfHeapDelete + SizeOfHeapHeader));
|
||||
|
||||
change->tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
|
||||
|
||||
DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
|
||||
r->xl_len - SizeOfHeapDelete,
|
||||
change->tp.oldtuple);
|
||||
}
|
||||
ReorderBufferQueueChange(ctx->reorder, r->xl_xid, buf->origptr, change);
|
||||
}
|
||||
|
||||
/*
|
||||
* Decode XLOG_HEAP2_MULTI_INSERT_insert record into multiple tuplebufs.
|
||||
*
|
||||
* Currently MULTI_INSERT will always contain the full tuples.
|
||||
*/
|
||||
static void
|
||||
DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||
{
|
||||
XLogRecord *r = &buf->record;
|
||||
xl_heap_multi_insert *xlrec;
|
||||
int i;
|
||||
char *data;
|
||||
bool isinit = (r->xl_info & XLOG_HEAP_INIT_PAGE) != 0;
|
||||
|
||||
xlrec = (xl_heap_multi_insert *) buf->record_data;
|
||||
|
||||
/* only interested in our database */
|
||||
if (xlrec->node.dbNode != ctx->slot->data.database)
|
||||
return;
|
||||
|
||||
data = buf->record_data + SizeOfHeapMultiInsert;
|
||||
|
||||
/*
|
||||
* OffsetNumbers (which are not of interest to us) are stored when
|
||||
* XLOG_HEAP_INIT_PAGE is not set -- skip over them.
|
||||
*/
|
||||
if (!isinit)
|
||||
data += sizeof(OffsetNumber) * xlrec->ntuples;
|
||||
|
||||
for (i = 0; i < xlrec->ntuples; i++)
|
||||
{
|
||||
ReorderBufferChange *change;
|
||||
xl_multi_insert_tuple *xlhdr;
|
||||
int datalen;
|
||||
ReorderBufferTupleBuf *tuple;
|
||||
|
||||
change = ReorderBufferGetChange(ctx->reorder);
|
||||
change->action = REORDER_BUFFER_CHANGE_INSERT;
|
||||
memcpy(&change->tp.relnode, &xlrec->node, sizeof(RelFileNode));
|
||||
|
||||
/*
|
||||
* CONTAINS_NEW_TUPLE will always be set currently as multi_insert
|
||||
* isn't used for catalogs, but better be future proof.
|
||||
*
|
||||
* We decode the tuple in pretty much the same way as DecodeXLogTuple,
|
||||
* but since the layout is slightly different, we can't use it here.
|
||||
*/
|
||||
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
|
||||
{
|
||||
change->tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
|
||||
|
||||
tuple = change->tp.newtuple;
|
||||
|
||||
/* not a disk based tuple */
|
||||
ItemPointerSetInvalid(&tuple->tuple.t_self);
|
||||
|
||||
xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
|
||||
data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
|
||||
datalen = xlhdr->datalen;
|
||||
|
||||
/*
|
||||
* We can only figure this out after reassembling the
|
||||
* transactions.
|
||||
*/
|
||||
tuple->tuple.t_tableOid = InvalidOid;
|
||||
tuple->tuple.t_data = &tuple->header;
|
||||
tuple->tuple.t_len = datalen
|
||||
+ offsetof(HeapTupleHeaderData, t_bits);
|
||||
|
||||
memset(&tuple->header, 0, sizeof(HeapTupleHeaderData));
|
||||
|
||||
memcpy((char *) &tuple->header
|
||||
+ offsetof(HeapTupleHeaderData, t_bits),
|
||||
(char *) data,
|
||||
datalen);
|
||||
data += datalen;
|
||||
|
||||
tuple->header.t_infomask = xlhdr->t_infomask;
|
||||
tuple->header.t_infomask2 = xlhdr->t_infomask2;
|
||||
tuple->header.t_hoff = xlhdr->t_hoff;
|
||||
}
|
||||
|
||||
ReorderBufferQueueChange(ctx->reorder, r->xl_xid,
|
||||
buf->origptr, change);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
|
||||
* (but not by heap_multi_insert) into a tuplebuf.
|
||||
*
|
||||
* The size 'len' and the pointer 'data' in the record need to be
|
||||
* computed outside as they are record specific.
|
||||
*/
|
||||
static void
|
||||
DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
|
||||
{
|
||||
xl_heap_header xlhdr;
|
||||
int datalen = len - SizeOfHeapHeader;
|
||||
|
||||
Assert(datalen >= 0);
|
||||
Assert(datalen <= MaxHeapTupleSize);
|
||||
|
||||
tuple->tuple.t_len = datalen + offsetof(HeapTupleHeaderData, t_bits);
|
||||
|
||||
/* not a disk based tuple */
|
||||
ItemPointerSetInvalid(&tuple->tuple.t_self);
|
||||
|
||||
/* we can only figure this out after reassembling the transactions */
|
||||
tuple->tuple.t_tableOid = InvalidOid;
|
||||
tuple->tuple.t_data = &tuple->header;
|
||||
|
||||
/* data is not stored aligned, copy to aligned storage */
|
||||
memcpy((char *) &xlhdr,
|
||||
data,
|
||||
SizeOfHeapHeader);
|
||||
|
||||
memset(&tuple->header, 0, sizeof(HeapTupleHeaderData));
|
||||
|
||||
memcpy((char *) &tuple->header + offsetof(HeapTupleHeaderData, t_bits),
|
||||
data + SizeOfHeapHeader,
|
||||
datalen);
|
||||
|
||||
tuple->header.t_infomask = xlhdr.t_infomask;
|
||||
tuple->header.t_infomask2 = xlhdr.t_infomask2;
|
||||
tuple->header.t_hoff = xlhdr.t_hoff;
|
||||
}
|
||||
920
src/backend/replication/logical/logical.c
Normal file
920
src/backend/replication/logical/logical.c
Normal file
@@ -0,0 +1,920 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
* logical.c
|
||||
* PostgreSQL logical decoding coordination
|
||||
*
|
||||
* Copyright (c) 2012-2014, PostgreSQL Global Development Group
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/backend/replication/logical/logical.c
|
||||
*
|
||||
* NOTES
|
||||
* This file coordinates interaction between the various modules that
|
||||
* together providethe logical decoding, primarily by providing so
|
||||
* called LogicalDecodingContexts. The goal is to encapsulate most of the
|
||||
* internal complexity for consumers of logical decoding, so they can
|
||||
* create and consume a changestream with a low amount of code.
|
||||
*
|
||||
* The idea is that a consumer provides three callbacks, one to read WAL,
|
||||
* one to prepare a data write, and a final one for actually writing since
|
||||
* their implementation depends on the type of consumer. Check
|
||||
* logicalfunc.c for an example implementations of a fairly simple consumer
|
||||
* and a implementation of a WAL reading callback that's suitable for
|
||||
* simpler consumers.
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include <unistd.h>
|
||||
#include <sys/stat.h>
|
||||
|
||||
#include "miscadmin.h"
|
||||
|
||||
#include "access/xact.h"
|
||||
|
||||
#include "replication/decode.h"
|
||||
#include "replication/logical.h"
|
||||
#include "replication/reorderbuffer.h"
|
||||
#include "replication/snapbuild.h"
|
||||
|
||||
#include "storage/proc.h"
|
||||
#include "storage/procarray.h"
|
||||
|
||||
#include "utils/memutils.h"
|
||||
|
||||
/* data for errcontext callback */
|
||||
typedef struct LogicalErrorCallbackState
|
||||
{
|
||||
LogicalDecodingContext *ctx;
|
||||
const char *callback_name;
|
||||
XLogRecPtr report_location;
|
||||
} LogicalErrorCallbackState;
|
||||
|
||||
/* wrappers around output plugin callbacks */
|
||||
static void output_plugin_error_callback(void *arg);
|
||||
static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
|
||||
bool is_init);
|
||||
static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
|
||||
static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
|
||||
static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||
XLogRecPtr commit_lsn);
|
||||
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||
Relation relation, ReorderBufferChange *change);
|
||||
|
||||
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
|
||||
|
||||
/*
|
||||
* Make sure the current settings & environment are capable of doing logical
|
||||
* decoding.
|
||||
*/
|
||||
void
|
||||
CheckLogicalDecodingRequirements(void)
|
||||
{
|
||||
CheckSlotRequirements();
|
||||
|
||||
if (wal_level < WAL_LEVEL_LOGICAL)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("logical decoding requires wal_level >= logical")));
|
||||
|
||||
if (MyDatabaseId == InvalidOid)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("logical decoding requires a database connection")));
|
||||
|
||||
/* ----
|
||||
* TODO: We got to change that someday soon...
|
||||
*
|
||||
* There's basically three things missing to allow this:
|
||||
* 1) We need to be able to correctly and quickly identify the timeline a
|
||||
* LSN belongs to
|
||||
* 2) We need to force hot_standby_feedback to be enabled at all times so
|
||||
* the primary cannot remove rows we need.
|
||||
* 3) support dropping replication slots referring to a database, in
|
||||
* dbase_redo. There can't be any active ones due to HS recovery
|
||||
* conflicts, so that should be relatively easy.
|
||||
* ----
|
||||
*/
|
||||
if (RecoveryInProgress())
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("logical decoding cannot be used while in recovery")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Helper function for CreateInitialDecodingContext() and
|
||||
* CreateDecodingContext() performing common tasks.
|
||||
*/
|
||||
static LogicalDecodingContext *
|
||||
StartupDecodingContext(List *output_plugin_options,
|
||||
XLogRecPtr start_lsn,
|
||||
TransactionId xmin_horizon,
|
||||
XLogPageReadCB read_page,
|
||||
LogicalOutputPluginWriterPrepareWrite prepare_write,
|
||||
LogicalOutputPluginWriterWrite do_write)
|
||||
{
|
||||
ReplicationSlot *slot;
|
||||
MemoryContext context, old_context;
|
||||
LogicalDecodingContext *ctx;
|
||||
|
||||
/* shorter lines... */
|
||||
slot = MyReplicationSlot;
|
||||
|
||||
context = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"Changeset Extraction Context",
|
||||
ALLOCSET_DEFAULT_MINSIZE,
|
||||
ALLOCSET_DEFAULT_INITSIZE,
|
||||
ALLOCSET_DEFAULT_MAXSIZE);
|
||||
old_context = MemoryContextSwitchTo(context);
|
||||
ctx = palloc0(sizeof(LogicalDecodingContext));
|
||||
|
||||
ctx->context = context;
|
||||
|
||||
/* (re-)load output plugins, so we detect a bad (removed) output plugin now. */
|
||||
LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
|
||||
|
||||
/*
|
||||
* Now that the slot's xmin has been set, we can announce ourselves as a
|
||||
* logical decoding backend which doesn't need to be checked individually
|
||||
* when computing the xmin horizon because the xmin is enforced via
|
||||
* replication slots.
|
||||
*/
|
||||
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
|
||||
MyPgXact->vacuumFlags |= PROC_IN_LOGICAL_DECODING;
|
||||
LWLockRelease(ProcArrayLock);
|
||||
|
||||
ctx->slot = slot;
|
||||
|
||||
ctx->reader = XLogReaderAllocate(read_page, ctx);
|
||||
ctx->reader->private_data = ctx;
|
||||
|
||||
ctx->reorder = ReorderBufferAllocate();
|
||||
ctx->snapshot_builder =
|
||||
AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn);
|
||||
|
||||
ctx->reorder->private_data = ctx;
|
||||
|
||||
/* wrap output plugin callbacks, so we can add error context information */
|
||||
ctx->reorder->begin = begin_cb_wrapper;
|
||||
ctx->reorder->apply_change = change_cb_wrapper;
|
||||
ctx->reorder->commit = commit_cb_wrapper;
|
||||
|
||||
ctx->out = makeStringInfo();
|
||||
ctx->prepare_write = prepare_write;
|
||||
ctx->write = do_write;
|
||||
|
||||
ctx->output_plugin_options = output_plugin_options;
|
||||
|
||||
MemoryContextSwitchTo(old_context);
|
||||
|
||||
return ctx;
|
||||
}
|
||||
|
||||
/*
|
||||
* Create a new decoding context, for a new logical slot.
|
||||
*
|
||||
* plugin contains the name of the output plugin
|
||||
* output_plugin_options contains options passed to the output plugin
|
||||
* read_page, prepare_write, do_write are callbacks that have to be filled to
|
||||
* perform the use-case dependent, actual, work.
|
||||
*
|
||||
* Needs to be called while in a memory context that's at least as long lived
|
||||
* as the the decoding context because further memory contexts will be created
|
||||
* inside it.
|
||||
*
|
||||
* Returns an initialized decoding context after calling the output plugin's
|
||||
* startup function.
|
||||
*/
|
||||
LogicalDecodingContext *
|
||||
CreateInitDecodingContext(char *plugin,
|
||||
List *output_plugin_options,
|
||||
XLogPageReadCB read_page,
|
||||
LogicalOutputPluginWriterPrepareWrite prepare_write,
|
||||
LogicalOutputPluginWriterWrite do_write)
|
||||
{
|
||||
TransactionId xmin_horizon = InvalidTransactionId;
|
||||
ReplicationSlot *slot;
|
||||
LogicalDecodingContext *ctx;
|
||||
MemoryContext old_context;
|
||||
|
||||
/* shorter lines... */
|
||||
slot = MyReplicationSlot;
|
||||
|
||||
/* first some sanity checks that are unlikely to be violated */
|
||||
if (slot == NULL)
|
||||
elog(ERROR, "cannot perform logical decoding without a acquired slot");
|
||||
|
||||
if (plugin == NULL)
|
||||
elog(ERROR, "cannot initialize logical decoding without a specified plugin");
|
||||
|
||||
/* Make sure the passed slot is suitable. These are user facing errors. */
|
||||
if (slot->data.database == InvalidOid)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("cannot use physical replication slot created for logical decoding")));
|
||||
|
||||
if (slot->data.database != MyDatabaseId)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("replication slot \"%s\" was not created in this database",
|
||||
NameStr(slot->data.name))));
|
||||
|
||||
if (IsTransactionState() &&
|
||||
GetTopTransactionIdIfAny() != InvalidTransactionId)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot create logical replication slot in transaction that has performed writes")));
|
||||
|
||||
/* register output plugin name with slot */
|
||||
SpinLockAcquire(&slot->mutex);
|
||||
strncpy(NameStr(slot->data.plugin), plugin,
|
||||
NAMEDATALEN);
|
||||
NameStr(slot->data.plugin)[NAMEDATALEN - 1] = '\0';
|
||||
SpinLockRelease(&slot->mutex);
|
||||
|
||||
/*
|
||||
* The replication slot mechanism is used to prevent removal of required
|
||||
* WAL. As there is no interlock between this and checkpoints required WAL
|
||||
* could be removed before ReplicationSlotsComputeRequiredLSN() has been
|
||||
* called to prevent that. In the very unlikely case that this happens
|
||||
* we'll just retry.
|
||||
*/
|
||||
while (true)
|
||||
{
|
||||
XLogSegNo segno;
|
||||
|
||||
/*
|
||||
* Let's start with enough information if we can, so log a standby
|
||||
* snapshot and start decoding at exactly that position.
|
||||
*/
|
||||
if (!RecoveryInProgress())
|
||||
{
|
||||
XLogRecPtr flushptr;
|
||||
|
||||
/* start at current insert position*/
|
||||
slot->data.restart_lsn = GetXLogInsertRecPtr();
|
||||
|
||||
/* make sure we have enough information to start */
|
||||
flushptr = LogStandbySnapshot();
|
||||
|
||||
/* and make sure it's fsynced to disk */
|
||||
XLogFlush(flushptr);
|
||||
}
|
||||
else
|
||||
slot->data.restart_lsn = GetRedoRecPtr();
|
||||
|
||||
/* prevent WAL removal as fast as possible */
|
||||
ReplicationSlotsComputeRequiredLSN();
|
||||
|
||||
/*
|
||||
* If all required WAL is still there, great, otherwise retry. The
|
||||
* slot should prevent further removal of WAL, unless there's a
|
||||
* concurrent ReplicationSlotsComputeRequiredLSN() after we've written
|
||||
* the new restart_lsn above, so normally we should never need to loop
|
||||
* more than twice.
|
||||
*/
|
||||
XLByteToSeg(slot->data.restart_lsn, segno);
|
||||
if (XLogGetLastRemovedSegno() < segno)
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
/* ----
|
||||
* This is a bit tricky: We need to determine a safe xmin horizon to start
|
||||
* decoding from, to avoid starting from a running xacts record referring
|
||||
* to xids whose rows have been vacuumed or pruned
|
||||
* already. GetOldestSafeDecodingTransactionId() returns such a value, but
|
||||
* without further interlock it's return value might immediately be out of
|
||||
* date.
|
||||
*
|
||||
* So we have to acquire the ProcArrayLock to prevent computation of new
|
||||
* xmin horizons by other backends, get the safe decoding xid, and inform
|
||||
* the slot machinery about the new limit. Once that's done the
|
||||
* ProcArrayLock can be be released as the slot machinery now is
|
||||
* protecting against vacuum.
|
||||
* ----
|
||||
*/
|
||||
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
|
||||
|
||||
slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId();
|
||||
slot->data.catalog_xmin = slot->effective_catalog_xmin;
|
||||
|
||||
ReplicationSlotsComputeRequiredXmin(true);
|
||||
|
||||
LWLockRelease(ProcArrayLock);
|
||||
|
||||
/*
|
||||
* tell the snapshot builder to only assemble snapshot once reaching
|
||||
* the a running_xact's record with the respective xmin.
|
||||
*/
|
||||
xmin_horizon = slot->data.catalog_xmin;
|
||||
|
||||
ReplicationSlotMarkDirty();
|
||||
ReplicationSlotSave();
|
||||
|
||||
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
|
||||
read_page, prepare_write, do_write);
|
||||
|
||||
/* call output plugin initialization callback */
|
||||
old_context = MemoryContextSwitchTo(ctx->context);
|
||||
if (ctx->callbacks.startup_cb != NULL)
|
||||
startup_cb_wrapper(ctx, &ctx->options, true);
|
||||
MemoryContextSwitchTo(old_context);
|
||||
|
||||
return ctx;
|
||||
}
|
||||
|
||||
/*
|
||||
* Create a new decoding context, for a logical slot that has previously been
|
||||
* used already.
|
||||
*
|
||||
* start_lsn contains the LSN of the last received data or InvalidXLogRecPtr
|
||||
* output_plugin_options contains options passed to the output plugin
|
||||
* read_page, prepare_write, do_write are callbacks that have to be filled to
|
||||
* perform the use-case dependent, actual, work.
|
||||
*
|
||||
* Needs to be called while in a memory context that's at least as long lived
|
||||
* as the the decoding context because further memory contexts will be created
|
||||
* inside it.
|
||||
*
|
||||
* Returns an initialized decoding context after calling the output plugin's
|
||||
* startup function.
|
||||
*/
|
||||
LogicalDecodingContext *
|
||||
CreateDecodingContext(XLogRecPtr start_lsn,
|
||||
List *output_plugin_options,
|
||||
XLogPageReadCB read_page,
|
||||
LogicalOutputPluginWriterPrepareWrite prepare_write,
|
||||
LogicalOutputPluginWriterWrite do_write)
|
||||
{
|
||||
LogicalDecodingContext *ctx;
|
||||
ReplicationSlot *slot;
|
||||
MemoryContext old_context;
|
||||
|
||||
/* shorter lines... */
|
||||
slot = MyReplicationSlot;
|
||||
|
||||
/* first some sanity checks that are unlikely to be violated */
|
||||
if (slot == NULL)
|
||||
elog(ERROR, "cannot perform logical decoding without a acquired slot");
|
||||
|
||||
/* make sure the passed slot is suitable, these are user facing errors */
|
||||
if (slot->data.database == InvalidOid)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
(errmsg("cannot use physical replication slot for logical decoding"))));
|
||||
|
||||
if (slot->data.database != MyDatabaseId)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
(errmsg("replication slot \"%s\" was not created in this database",
|
||||
NameStr(slot->data.name)))));
|
||||
|
||||
if (start_lsn == InvalidXLogRecPtr)
|
||||
{
|
||||
/* continue from last position */
|
||||
start_lsn = slot->data.confirmed_flush;
|
||||
}
|
||||
else if (start_lsn < slot->data.confirmed_flush)
|
||||
{
|
||||
/*
|
||||
* It might seem like we should error out in this case, but it's
|
||||
* pretty common for a client to acknowledge a LSN it doesn't have to
|
||||
* do anything for, and thus didn't store persistently, because the
|
||||
* xlog records didn't result in anything relevant for logical
|
||||
* decoding. Clients have to be able to do that to support
|
||||
* synchronous replication.
|
||||
*/
|
||||
start_lsn = slot->data.confirmed_flush;
|
||||
elog(DEBUG1, "cannot stream from %X/%X, minimum is %X/%X, forwarding",
|
||||
(uint32)(start_lsn >> 32), (uint32)start_lsn,
|
||||
(uint32)(slot->data.confirmed_flush >> 32),
|
||||
(uint32)slot->data.confirmed_flush);
|
||||
}
|
||||
|
||||
ctx = StartupDecodingContext(output_plugin_options,
|
||||
start_lsn, InvalidTransactionId,
|
||||
read_page, prepare_write, do_write);
|
||||
|
||||
/* call output plugin initialization callback */
|
||||
old_context = MemoryContextSwitchTo(ctx->context);
|
||||
if (ctx->callbacks.startup_cb != NULL)
|
||||
startup_cb_wrapper(ctx, &ctx->options, true);
|
||||
MemoryContextSwitchTo(old_context);
|
||||
|
||||
ereport(LOG,
|
||||
(errmsg("starting logical decoding for slot %s",
|
||||
NameStr(slot->data.name)),
|
||||
errdetail("streaming transactions committing after %X/%X, reading WAL from %X/%X",
|
||||
(uint32)(slot->data.confirmed_flush >> 32),
|
||||
(uint32)slot->data.confirmed_flush,
|
||||
(uint32)(slot->data.restart_lsn >> 32),
|
||||
(uint32)slot->data.restart_lsn)));
|
||||
|
||||
return ctx;
|
||||
}
|
||||
|
||||
/*
|
||||
* Returns true if an consistent initial decoding snapshot has been built.
|
||||
*/
|
||||
bool
|
||||
DecodingContextReady(LogicalDecodingContext *ctx)
|
||||
{
|
||||
return SnapBuildCurrentState(ctx->snapshot_builder) == SNAPBUILD_CONSISTENT;
|
||||
}
|
||||
|
||||
/*
|
||||
* Read from the decoding slot, until it is ready to start extracting changes.
|
||||
*/
|
||||
void
|
||||
DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
|
||||
{
|
||||
XLogRecPtr startptr;
|
||||
|
||||
/* Initialize from where to start reading WAL. */
|
||||
startptr = ctx->slot->data.restart_lsn;
|
||||
|
||||
elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
|
||||
(uint32)(ctx->slot->data.restart_lsn >> 32),
|
||||
(uint32)ctx->slot->data.restart_lsn);
|
||||
|
||||
/* Wait for a consistent starting point */
|
||||
for (;;)
|
||||
{
|
||||
XLogRecord *record;
|
||||
char *err = NULL;
|
||||
|
||||
/*
|
||||
* If the caller requires that interrupts be checked, the read_page
|
||||
* callback should do so, as those will often wait.
|
||||
*/
|
||||
|
||||
/* the read_page callback waits for new WAL */
|
||||
record = XLogReadRecord(ctx->reader, startptr, &err);
|
||||
if (err)
|
||||
elog(ERROR, "%s", err);
|
||||
|
||||
Assert(record);
|
||||
|
||||
startptr = InvalidXLogRecPtr;
|
||||
|
||||
LogicalDecodingProcessRecord(ctx, record);
|
||||
|
||||
/* only continue till we found a consistent spot */
|
||||
if (DecodingContextReady(ctx))
|
||||
break;
|
||||
}
|
||||
|
||||
ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
|
||||
}
|
||||
|
||||
/*
|
||||
* Free a previously allocated decoding context, invoking the shutdown
|
||||
* callback if necessary.
|
||||
*/
|
||||
void
|
||||
FreeDecodingContext(LogicalDecodingContext *ctx)
|
||||
{
|
||||
if (ctx->callbacks.shutdown_cb != NULL)
|
||||
shutdown_cb_wrapper(ctx);
|
||||
|
||||
ReorderBufferFree(ctx->reorder);
|
||||
FreeSnapshotBuilder(ctx->snapshot_builder);
|
||||
XLogReaderFree(ctx->reader);
|
||||
MemoryContextDelete(ctx->context);
|
||||
}
|
||||
|
||||
/*
|
||||
* Prepare a write using the context's output routine.
|
||||
*/
|
||||
void
|
||||
OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
|
||||
{
|
||||
if (!ctx->accept_writes)
|
||||
elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
|
||||
|
||||
ctx->prepare_write(ctx, ctx->write_location, ctx->write_xid, last_write);
|
||||
ctx->prepared_write = true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Perform a write using the context's output routine.
|
||||
*/
|
||||
void
|
||||
OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
|
||||
{
|
||||
if (!ctx->prepared_write)
|
||||
elog(ERROR, "OutputPluginPrepareWrite needs to be called before OutputPluginWrite");
|
||||
|
||||
ctx->write(ctx, ctx->write_location, ctx->write_xid, last_write);
|
||||
ctx->prepared_write = false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Load the output plugin, lookup its output plugin init function, and check
|
||||
* that it provides the required callbacks.
|
||||
*/
|
||||
static void
|
||||
LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin)
|
||||
{
|
||||
LogicalOutputPluginInit plugin_init;
|
||||
|
||||
plugin_init = (LogicalOutputPluginInit)
|
||||
load_external_function(plugin, "_PG_output_plugin_init", false, NULL);
|
||||
|
||||
if (plugin_init == NULL)
|
||||
elog(ERROR, "output plugins have to declare the _PG_output_plugin_init symbol");
|
||||
|
||||
/* ask the output plugin to fill the callback struct */
|
||||
plugin_init(callbacks);
|
||||
|
||||
if (callbacks->begin_cb == NULL)
|
||||
elog(ERROR, "output plugins have to register a begin callback");
|
||||
if (callbacks->change_cb == NULL)
|
||||
elog(ERROR, "output plugins have to register a change callback");
|
||||
if (callbacks->commit_cb == NULL)
|
||||
elog(ERROR, "output plugins have to register a commit callback");
|
||||
}
|
||||
|
||||
static void
|
||||
output_plugin_error_callback(void *arg)
|
||||
{
|
||||
LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg;
|
||||
/* not all callbacks have an associated LSN */
|
||||
if (state->report_location != InvalidXLogRecPtr)
|
||||
errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
|
||||
NameStr(state->ctx->slot->data.name),
|
||||
NameStr(state->ctx->slot->data.plugin),
|
||||
state->callback_name,
|
||||
(uint32)(state->report_location >> 32),
|
||||
(uint32)state->report_location);
|
||||
else
|
||||
errcontext("slot \"%s\", output plugin \"%s\", in the %s callback",
|
||||
NameStr(state->ctx->slot->data.name),
|
||||
NameStr(state->ctx->slot->data.plugin),
|
||||
state->callback_name);
|
||||
}
|
||||
|
||||
static void
|
||||
startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
|
||||
{
|
||||
LogicalErrorCallbackState state;
|
||||
ErrorContextCallback errcallback;
|
||||
|
||||
/* Push callback + info on the error context stack */
|
||||
state.ctx = ctx;
|
||||
state.callback_name = "startup";
|
||||
state.report_location = InvalidXLogRecPtr;
|
||||
errcallback.callback = output_plugin_error_callback;
|
||||
errcallback.arg = (void *) &state;
|
||||
errcallback.previous = error_context_stack;
|
||||
error_context_stack = &errcallback;
|
||||
|
||||
/* set output state */
|
||||
ctx->accept_writes = false;
|
||||
|
||||
/* do the actual work: call callback */
|
||||
ctx->callbacks.startup_cb(ctx, opt, is_init);
|
||||
|
||||
/* Pop the error context stack */
|
||||
error_context_stack = errcallback.previous;
|
||||
}
|
||||
|
||||
static void
|
||||
shutdown_cb_wrapper(LogicalDecodingContext *ctx)
|
||||
{
|
||||
LogicalErrorCallbackState state;
|
||||
ErrorContextCallback errcallback;
|
||||
|
||||
/* Push callback + info on the error context stack */
|
||||
state.ctx = ctx;
|
||||
state.callback_name = "shutdown";
|
||||
state.report_location = InvalidXLogRecPtr;
|
||||
errcallback.callback = output_plugin_error_callback;
|
||||
errcallback.arg = (void *) &state;
|
||||
errcallback.previous = error_context_stack;
|
||||
error_context_stack = &errcallback;
|
||||
|
||||
/* set output state */
|
||||
ctx->accept_writes = false;
|
||||
|
||||
/* do the actual work: call callback */
|
||||
ctx->callbacks.shutdown_cb(ctx);
|
||||
|
||||
/* Pop the error context stack */
|
||||
error_context_stack = errcallback.previous;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Callbacks for ReorderBuffer which add in some more information and then call
|
||||
* output_plugin.h plugins.
|
||||
*/
|
||||
static void
|
||||
begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
|
||||
{
|
||||
LogicalDecodingContext *ctx = cache->private_data;
|
||||
LogicalErrorCallbackState state;
|
||||
ErrorContextCallback errcallback;
|
||||
|
||||
/* Push callback + info on the error context stack */
|
||||
state.ctx = ctx;
|
||||
state.callback_name = "begin";
|
||||
state.report_location = txn->first_lsn;
|
||||
errcallback.callback = output_plugin_error_callback;
|
||||
errcallback.arg = (void *) &state;
|
||||
errcallback.previous = error_context_stack;
|
||||
error_context_stack = &errcallback;
|
||||
|
||||
/* set output state */
|
||||
ctx->accept_writes = true;
|
||||
ctx->write_xid = txn->xid;
|
||||
ctx->write_location = txn->first_lsn;
|
||||
|
||||
/* do the actual work: call callback */
|
||||
ctx->callbacks.begin_cb(ctx, txn);
|
||||
|
||||
/* Pop the error context stack */
|
||||
error_context_stack = errcallback.previous;
|
||||
}
|
||||
|
||||
static void
|
||||
commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||
XLogRecPtr commit_lsn)
|
||||
{
|
||||
LogicalDecodingContext *ctx = cache->private_data;
|
||||
LogicalErrorCallbackState state;
|
||||
ErrorContextCallback errcallback;
|
||||
|
||||
/* Push callback + info on the error context stack */
|
||||
state.ctx = ctx;
|
||||
state.callback_name = "commit";
|
||||
state.report_location = txn->final_lsn; /* beginning of commit record */
|
||||
errcallback.callback = output_plugin_error_callback;
|
||||
errcallback.arg = (void *) &state;
|
||||
errcallback.previous = error_context_stack;
|
||||
error_context_stack = &errcallback;
|
||||
|
||||
/* set output state */
|
||||
ctx->accept_writes = true;
|
||||
ctx->write_xid = txn->xid;
|
||||
ctx->write_location = txn->end_lsn; /* points to the end of the record */
|
||||
|
||||
/* do the actual work: call callback */
|
||||
ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
|
||||
|
||||
/* Pop the error context stack */
|
||||
error_context_stack = errcallback.previous;
|
||||
}
|
||||
|
||||
static void
|
||||
change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||
Relation relation, ReorderBufferChange *change)
|
||||
{
|
||||
LogicalDecodingContext *ctx = cache->private_data;
|
||||
LogicalErrorCallbackState state;
|
||||
ErrorContextCallback errcallback;
|
||||
|
||||
/* Push callback + info on the error context stack */
|
||||
state.ctx = ctx;
|
||||
state.callback_name = "change";
|
||||
state.report_location = change->lsn;
|
||||
errcallback.callback = output_plugin_error_callback;
|
||||
errcallback.arg = (void *) &state;
|
||||
errcallback.previous = error_context_stack;
|
||||
error_context_stack = &errcallback;
|
||||
|
||||
/* set output state */
|
||||
ctx->accept_writes = true;
|
||||
ctx->write_xid = txn->xid;
|
||||
/*
|
||||
* report this change's lsn so replies from clients can give an up2date
|
||||
* answer. This won't ever be enough (and shouldn't be!) to confirm
|
||||
* receipt of this transaction, but it might allow another transaction's
|
||||
* commit to be confirmed with one message.
|
||||
*/
|
||||
ctx->write_location = change->lsn;
|
||||
|
||||
ctx->callbacks.change_cb(ctx, txn, relation, change);
|
||||
|
||||
/* Pop the error context stack */
|
||||
error_context_stack = errcallback.previous;
|
||||
}
|
||||
|
||||
/*
|
||||
* Set the required catalog xmin horizon for historic snapshots in the current
|
||||
* replication slot.
|
||||
*
|
||||
* Note that in the most cases, we won't be able to immediately use the xmin
|
||||
* to increase the xmin horizon, we need to wait till the client has confirmed
|
||||
* receiving current_lsn with LogicalConfirmReceivedLocation().
|
||||
*/
|
||||
void
|
||||
LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
|
||||
{
|
||||
bool updated_xmin = false;
|
||||
ReplicationSlot *slot;
|
||||
|
||||
slot = MyReplicationSlot;
|
||||
|
||||
Assert(slot != NULL);
|
||||
|
||||
SpinLockAcquire(&slot->mutex);
|
||||
|
||||
/*
|
||||
* don't overwrite if we already have a newer xmin. This can
|
||||
* happen if we restart decoding in a slot.
|
||||
*/
|
||||
if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
|
||||
{
|
||||
}
|
||||
/*
|
||||
* If the client has already confirmed up to this lsn, we directly
|
||||
* can mark this as accepted. This can happen if we restart
|
||||
* decoding in a slot.
|
||||
*/
|
||||
else if (current_lsn <= slot->data.confirmed_flush)
|
||||
{
|
||||
slot->candidate_catalog_xmin = xmin;
|
||||
slot->candidate_xmin_lsn = current_lsn;
|
||||
|
||||
/* our candidate can directly be used */
|
||||
updated_xmin = true;
|
||||
}
|
||||
/*
|
||||
* Only increase if the previous values have been applied, otherwise we
|
||||
* might never end up updating if the receiver acks too slowly.
|
||||
*/
|
||||
else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
|
||||
{
|
||||
slot->candidate_catalog_xmin = xmin;
|
||||
slot->candidate_xmin_lsn = current_lsn;
|
||||
}
|
||||
SpinLockRelease(&slot->mutex);
|
||||
|
||||
/* candidate already valid with the current flush position, apply */
|
||||
if (updated_xmin)
|
||||
LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
|
||||
}
|
||||
|
||||
/*
|
||||
* Mark the minimal LSN (restart_lsn) we need to read to replay all
|
||||
* transactions that have not yet committed at current_lsn.
|
||||
*
|
||||
* Just like IncreaseRestartDecodingForSlot this nly takes effect when the
|
||||
* client has confirmed to have received current_lsn.
|
||||
*/
|
||||
void
|
||||
LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
|
||||
{
|
||||
bool updated_lsn = false;
|
||||
ReplicationSlot *slot;
|
||||
|
||||
slot = MyReplicationSlot;
|
||||
|
||||
Assert(slot != NULL);
|
||||
Assert(restart_lsn != InvalidXLogRecPtr);
|
||||
Assert(current_lsn != InvalidXLogRecPtr);
|
||||
|
||||
SpinLockAcquire(&slot->mutex);
|
||||
|
||||
/* don't overwrite if have a newer restart lsn*/
|
||||
if (restart_lsn <= slot->data.restart_lsn)
|
||||
{
|
||||
}
|
||||
/*
|
||||
* We might have already flushed far enough to directly accept this lsn, in
|
||||
* this case there is no need to check for existing candidate LSNs
|
||||
*/
|
||||
else if (current_lsn <= slot->data.confirmed_flush)
|
||||
{
|
||||
slot->candidate_restart_valid = current_lsn;
|
||||
slot->candidate_restart_lsn = restart_lsn;
|
||||
|
||||
/* our candidate can directly be used */
|
||||
updated_lsn = true;
|
||||
}
|
||||
/*
|
||||
* Only increase if the previous values have been applied, otherwise we
|
||||
* might never end up updating if the receiver acks too slowly. A missed
|
||||
* value here will just cause some extra effort after reconnecting.
|
||||
*/
|
||||
if (slot->candidate_restart_valid == InvalidXLogRecPtr)
|
||||
{
|
||||
slot->candidate_restart_valid = current_lsn;
|
||||
slot->candidate_restart_lsn = restart_lsn;
|
||||
|
||||
elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
|
||||
(uint32) (restart_lsn >> 32), (uint32) restart_lsn,
|
||||
(uint32) (current_lsn >> 32), (uint32) current_lsn);
|
||||
}
|
||||
else
|
||||
{
|
||||
elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
|
||||
(uint32) (restart_lsn >> 32), (uint32) restart_lsn,
|
||||
(uint32) (current_lsn >> 32), (uint32) current_lsn,
|
||||
(uint32) (slot->candidate_restart_lsn >> 32),
|
||||
(uint32) slot->candidate_restart_lsn,
|
||||
(uint32) (slot->candidate_restart_valid >> 32),
|
||||
(uint32) slot->candidate_restart_valid,
|
||||
(uint32) (slot->data.confirmed_flush >> 32),
|
||||
(uint32) slot->data.confirmed_flush
|
||||
);
|
||||
}
|
||||
SpinLockRelease(&slot->mutex);
|
||||
|
||||
/* candidates are already valid with the current flush position, apply */
|
||||
if (updated_lsn)
|
||||
LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
|
||||
}
|
||||
|
||||
/*
|
||||
* Handle a consumer's conformation having received all changes up to lsn.
|
||||
*/
|
||||
void
|
||||
LogicalConfirmReceivedLocation(XLogRecPtr lsn)
|
||||
{
|
||||
Assert(lsn != InvalidXLogRecPtr);
|
||||
|
||||
/* Do an unlocked check for candidate_lsn first. */
|
||||
if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr ||
|
||||
MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr)
|
||||
{
|
||||
bool updated_xmin = false;
|
||||
bool updated_restart = false;
|
||||
|
||||
/* use volatile pointer to prevent code rearrangement */
|
||||
volatile ReplicationSlot *slot = MyReplicationSlot;
|
||||
|
||||
SpinLockAcquire(&slot->mutex);
|
||||
|
||||
slot->data.confirmed_flush = lsn;
|
||||
|
||||
/* if were past the location required for bumping xmin, do so */
|
||||
if (slot->candidate_xmin_lsn != InvalidXLogRecPtr &&
|
||||
slot->candidate_xmin_lsn <= lsn)
|
||||
{
|
||||
/*
|
||||
* We have to write the changed xmin to disk *before* we change
|
||||
* the in-memory value, otherwise after a crash we wouldn't know
|
||||
* that some catalog tuples might have been removed already.
|
||||
*
|
||||
* Ensure that by first writing to ->xmin and only update
|
||||
* ->effective_xmin once the new state is synced to disk. After a
|
||||
* crash ->effective_xmin is set to ->xmin.
|
||||
*/
|
||||
if (TransactionIdIsValid(slot->candidate_catalog_xmin) &&
|
||||
slot->data.catalog_xmin != slot->candidate_catalog_xmin)
|
||||
{
|
||||
slot->data.catalog_xmin = slot->candidate_catalog_xmin;
|
||||
slot->candidate_catalog_xmin = InvalidTransactionId;
|
||||
slot->candidate_xmin_lsn = InvalidXLogRecPtr;
|
||||
updated_xmin = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (slot->candidate_restart_valid != InvalidXLogRecPtr &&
|
||||
slot->candidate_restart_valid <= lsn)
|
||||
{
|
||||
Assert(slot->candidate_restart_lsn != InvalidXLogRecPtr);
|
||||
|
||||
slot->data.restart_lsn = slot->candidate_restart_lsn;
|
||||
slot->candidate_restart_lsn = InvalidXLogRecPtr;
|
||||
slot->candidate_restart_valid = InvalidXLogRecPtr;
|
||||
updated_restart = true;
|
||||
}
|
||||
|
||||
SpinLockRelease(&slot->mutex);
|
||||
|
||||
/* first write new xmin to disk, so we know whats up after a crash */
|
||||
if (updated_xmin || updated_restart)
|
||||
{
|
||||
ReplicationSlotMarkDirty();
|
||||
ReplicationSlotSave();
|
||||
elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
|
||||
}
|
||||
/*
|
||||
* Now the new xmin is safely on disk, we can let the global value
|
||||
* advance. We do not take ProcArrayLock or similar since we only
|
||||
* advance xmin here and there's not much harm done by a concurrent
|
||||
* computation missing that.
|
||||
*/
|
||||
if (updated_xmin)
|
||||
{
|
||||
SpinLockAcquire(&slot->mutex);
|
||||
slot->effective_catalog_xmin = slot->data.catalog_xmin;
|
||||
SpinLockRelease(&slot->mutex);
|
||||
|
||||
ReplicationSlotsComputeRequiredXmin(false);
|
||||
ReplicationSlotsComputeRequiredLSN();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
volatile ReplicationSlot *slot = MyReplicationSlot;
|
||||
|
||||
SpinLockAcquire(&slot->mutex);
|
||||
slot->data.confirmed_flush = lsn;
|
||||
SpinLockRelease(&slot->mutex);
|
||||
}
|
||||
}
|
||||
509
src/backend/replication/logical/logicalfuncs.c
Normal file
509
src/backend/replication/logical/logicalfuncs.c
Normal file
@@ -0,0 +1,509 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* logicalfuncs.c
|
||||
*
|
||||
* Support functions for using logical decoding and managemnt of
|
||||
* logical replication slots via SQL.
|
||||
*
|
||||
*
|
||||
* Copyright (c) 2012-2014, PostgreSQL Global Development Group
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/backend/replication/logicalfuncs.c
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include <unistd.h>
|
||||
|
||||
#include "fmgr.h"
|
||||
#include "funcapi.h"
|
||||
#include "miscadmin.h"
|
||||
|
||||
#include "catalog/pg_type.h"
|
||||
|
||||
#include "nodes/makefuncs.h"
|
||||
|
||||
#include "mb/pg_wchar.h"
|
||||
|
||||
#include "utils/array.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/inval.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/pg_lsn.h"
|
||||
#include "utils/resowner.h"
|
||||
#include "utils/lsyscache.h"
|
||||
|
||||
#include "replication/decode.h"
|
||||
#include "replication/logical.h"
|
||||
#include "replication/logicalfuncs.h"
|
||||
|
||||
#include "storage/fd.h"
|
||||
|
||||
/* private date for writing out data */
|
||||
typedef struct DecodingOutputState {
|
||||
Tuplestorestate *tupstore;
|
||||
TupleDesc tupdesc;
|
||||
bool binary_output;
|
||||
int64 returned_rows;
|
||||
} DecodingOutputState;
|
||||
|
||||
/*
|
||||
* Prepare for a output plugin write.
|
||||
*/
|
||||
static void
|
||||
LogicalOutputPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
|
||||
bool last_write)
|
||||
{
|
||||
resetStringInfo(ctx->out);
|
||||
}
|
||||
|
||||
/*
|
||||
* Perform output plugin write into tuplestore.
|
||||
*/
|
||||
static void
|
||||
LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
|
||||
bool last_write)
|
||||
{
|
||||
Datum values[3];
|
||||
bool nulls[3];
|
||||
DecodingOutputState *p;
|
||||
|
||||
/* SQL Datums can only be of a limited length... */
|
||||
if (ctx->out->len > MaxAllocSize - VARHDRSZ)
|
||||
elog(ERROR, "too much output for sql interface");
|
||||
|
||||
p = (DecodingOutputState *) ctx->output_writer_private;
|
||||
|
||||
memset(nulls, 0, sizeof(nulls));
|
||||
values[0] = LSNGetDatum(lsn);
|
||||
values[1] = TransactionIdGetDatum(xid);
|
||||
|
||||
/*
|
||||
* Assert ctx->out is in database encoding when we're writing textual
|
||||
* output.
|
||||
*/
|
||||
if (!p->binary_output)
|
||||
Assert(pg_verify_mbstr(GetDatabaseEncoding(),
|
||||
ctx->out->data, ctx->out->len,
|
||||
false));
|
||||
|
||||
/* ick, but cstring_to_text_with_len works for bytea perfectly fine */
|
||||
values[2] = PointerGetDatum(
|
||||
cstring_to_text_with_len(ctx->out->data, ctx->out->len));
|
||||
|
||||
tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
|
||||
p->returned_rows++;
|
||||
}
|
||||
|
||||
/*
|
||||
* TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but
|
||||
* we currently don't have the infrastructure (elog!) to share it.
|
||||
*/
|
||||
static void
|
||||
XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
|
||||
{
|
||||
char *p;
|
||||
XLogRecPtr recptr;
|
||||
Size nbytes;
|
||||
|
||||
static int sendFile = -1;
|
||||
static XLogSegNo sendSegNo = 0;
|
||||
static uint32 sendOff = 0;
|
||||
|
||||
p = buf;
|
||||
recptr = startptr;
|
||||
nbytes = count;
|
||||
|
||||
while (nbytes > 0)
|
||||
{
|
||||
uint32 startoff;
|
||||
int segbytes;
|
||||
int readbytes;
|
||||
|
||||
startoff = recptr % XLogSegSize;
|
||||
|
||||
if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
|
||||
{
|
||||
char path[MAXPGPATH];
|
||||
|
||||
/* Switch to another logfile segment */
|
||||
if (sendFile >= 0)
|
||||
close(sendFile);
|
||||
|
||||
XLByteToSeg(recptr, sendSegNo);
|
||||
|
||||
XLogFilePath(path, tli, sendSegNo);
|
||||
|
||||
sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
|
||||
|
||||
if (sendFile < 0)
|
||||
{
|
||||
if (errno == ENOENT)
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("requested WAL segment %s has already been removed",
|
||||
path)));
|
||||
else
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not open file \"%s\": %m",
|
||||
path)));
|
||||
}
|
||||
sendOff = 0;
|
||||
}
|
||||
|
||||
/* Need to seek in the file? */
|
||||
if (sendOff != startoff)
|
||||
{
|
||||
if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
|
||||
{
|
||||
char path[MAXPGPATH];
|
||||
|
||||
XLogFilePath(path, tli, sendSegNo);
|
||||
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not seek in log segment %s to offset %u: %m",
|
||||
path, startoff)));
|
||||
}
|
||||
sendOff = startoff;
|
||||
}
|
||||
|
||||
/* How many bytes are within this segment? */
|
||||
if (nbytes > (XLogSegSize - startoff))
|
||||
segbytes = XLogSegSize - startoff;
|
||||
else
|
||||
segbytes = nbytes;
|
||||
|
||||
readbytes = read(sendFile, p, segbytes);
|
||||
if (readbytes <= 0)
|
||||
{
|
||||
char path[MAXPGPATH];
|
||||
|
||||
XLogFilePath(path, tli, sendSegNo);
|
||||
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not read from log segment %s, offset %u, length %lu: %m",
|
||||
path, sendOff, (unsigned long) segbytes)));
|
||||
}
|
||||
|
||||
/* Update state for read */
|
||||
recptr += readbytes;
|
||||
|
||||
sendOff += readbytes;
|
||||
nbytes -= readbytes;
|
||||
p += readbytes;
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
check_permissions(void)
|
||||
{
|
||||
if (!superuser() && !has_rolreplication(GetUserId()))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
(errmsg("must be superuser or replication role to use replication slots"))));
|
||||
}
|
||||
|
||||
/*
|
||||
* read_page callback for logical decoding contexts.
|
||||
*
|
||||
* Public because it would likely be very helpful for someone writing another
|
||||
* output method outside walsender, e.g. in a bgworker.
|
||||
*
|
||||
* TODO: The walsender has it's own version of this, but it relies on the
|
||||
* walsender's latch being set whenever WAL is flushed. No such infrastructure
|
||||
* exists for normal backends, so we have to do a check/sleep/repeat style of
|
||||
* loop for now.
|
||||
*/
|
||||
int
|
||||
logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
|
||||
int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
|
||||
{
|
||||
XLogRecPtr flushptr,
|
||||
loc;
|
||||
int count;
|
||||
|
||||
loc = targetPagePtr + reqLen;
|
||||
while (1)
|
||||
{
|
||||
/*
|
||||
* TODO: we're going to have to do something more intelligent about
|
||||
* timelines on standbys. Use readTimeLineHistory() and
|
||||
* tliOfPointInHistory() to get the proper LSN? For now we'll catch
|
||||
* that case earlier, but the code and TODO is left in here for when
|
||||
* that changes.
|
||||
*/
|
||||
if (!RecoveryInProgress())
|
||||
{
|
||||
*pageTLI = ThisTimeLineID;
|
||||
flushptr = GetFlushRecPtr();
|
||||
}
|
||||
else
|
||||
flushptr = GetXLogReplayRecPtr(pageTLI);
|
||||
|
||||
if (loc <= flushptr)
|
||||
break;
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
pg_usleep(1000L);
|
||||
}
|
||||
|
||||
/* more than one block available */
|
||||
if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
|
||||
count = XLOG_BLCKSZ;
|
||||
/* not enough data there */
|
||||
else if (targetPagePtr + reqLen > flushptr)
|
||||
return -1;
|
||||
/* part of the page available */
|
||||
else
|
||||
count = flushptr - targetPagePtr;
|
||||
|
||||
XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ);
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
/*
|
||||
* Helper function for the various SQL callable logical decoding functions.
|
||||
*/
|
||||
static Datum
|
||||
pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary)
|
||||
{
|
||||
Name name = PG_GETARG_NAME(0);
|
||||
XLogRecPtr upto_lsn;
|
||||
int32 upto_nchanges;
|
||||
|
||||
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
||||
MemoryContext per_query_ctx;
|
||||
MemoryContext oldcontext;
|
||||
|
||||
XLogRecPtr end_of_wal;
|
||||
XLogRecPtr startptr;
|
||||
|
||||
LogicalDecodingContext *ctx;
|
||||
|
||||
ResourceOwner old_resowner = CurrentResourceOwner;
|
||||
ArrayType *arr;
|
||||
Size ndim;
|
||||
List *options = NIL;
|
||||
DecodingOutputState *p;
|
||||
|
||||
if (PG_ARGISNULL(1))
|
||||
upto_lsn = InvalidXLogRecPtr;
|
||||
else
|
||||
upto_lsn = PG_GETARG_LSN(1);
|
||||
|
||||
if (PG_ARGISNULL(2))
|
||||
upto_nchanges = InvalidXLogRecPtr;
|
||||
else
|
||||
upto_nchanges = PG_GETARG_INT32(2);
|
||||
|
||||
/* check to see if caller supports us returning a tuplestore */
|
||||
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("set-valued function called in context that cannot accept a set")));
|
||||
if (!(rsinfo->allowedModes & SFRM_Materialize))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("materialize mode required, but it is not allowed in this context")));
|
||||
|
||||
/* state to write output to */
|
||||
p = palloc0(sizeof(DecodingOutputState));
|
||||
|
||||
p->binary_output = binary;
|
||||
|
||||
/* Build a tuple descriptor for our result type */
|
||||
if (get_call_result_type(fcinfo, NULL, &p->tupdesc) != TYPEFUNC_COMPOSITE)
|
||||
elog(ERROR, "return type must be a row type");
|
||||
|
||||
check_permissions();
|
||||
|
||||
CheckLogicalDecodingRequirements();
|
||||
|
||||
arr = PG_GETARG_ARRAYTYPE_P(3);
|
||||
ndim = ARR_NDIM(arr);
|
||||
|
||||
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
|
||||
oldcontext = MemoryContextSwitchTo(per_query_ctx);
|
||||
|
||||
if (ndim > 1)
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("array must be one-dimensional")));
|
||||
}
|
||||
else if (array_contains_nulls(arr))
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("array must not contain nulls")));
|
||||
}
|
||||
else if (ndim == 1)
|
||||
{
|
||||
int nelems;
|
||||
Datum *datum_opts;
|
||||
int i;
|
||||
|
||||
Assert(ARR_ELEMTYPE(arr) == TEXTOID);
|
||||
|
||||
deconstruct_array(arr, TEXTOID, -1, false, 'i',
|
||||
&datum_opts, NULL, &nelems);
|
||||
|
||||
if (nelems % 2 != 0)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("array must have even number of elements")));
|
||||
|
||||
for (i = 0; i < nelems; i += 2)
|
||||
{
|
||||
char *name = TextDatumGetCString(datum_opts[i]);
|
||||
char *opt = TextDatumGetCString(datum_opts[i + 1]);
|
||||
|
||||
options = lappend(options, makeDefElem(name, (Node *) makeString(opt)));
|
||||
}
|
||||
}
|
||||
|
||||
p->tupstore = tuplestore_begin_heap(true, false, work_mem);
|
||||
rsinfo->returnMode = SFRM_Materialize;
|
||||
rsinfo->setResult = p->tupstore;
|
||||
rsinfo->setDesc = p->tupdesc;
|
||||
|
||||
/* compute the current end-of-wal */
|
||||
if (!RecoveryInProgress())
|
||||
end_of_wal = GetFlushRecPtr();
|
||||
else
|
||||
end_of_wal = GetXLogReplayRecPtr(NULL);
|
||||
|
||||
CheckLogicalDecodingRequirements();
|
||||
ReplicationSlotAcquire(NameStr(*name));
|
||||
|
||||
PG_TRY();
|
||||
{
|
||||
ctx = CreateDecodingContext(InvalidXLogRecPtr,
|
||||
options,
|
||||
logical_read_local_xlog_page,
|
||||
LogicalOutputPrepareWrite,
|
||||
LogicalOutputWrite);
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
/*
|
||||
* Check whether the output pluggin writes textual output if that's
|
||||
* what we need.
|
||||
*/
|
||||
if (!binary &&
|
||||
ctx->options.output_type != OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("output plugin cannot produce text output")));
|
||||
|
||||
ctx->output_writer_private = p;
|
||||
|
||||
startptr = MyReplicationSlot->data.restart_lsn;
|
||||
|
||||
CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner, "logical decoding");
|
||||
|
||||
/* invalidate non-timetravel entries */
|
||||
InvalidateSystemCaches();
|
||||
|
||||
while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
|
||||
(ctx->reader->EndRecPtr && ctx->reader->EndRecPtr < end_of_wal))
|
||||
{
|
||||
XLogRecord *record;
|
||||
char *errm = NULL;
|
||||
|
||||
record = XLogReadRecord(ctx->reader, startptr, &errm);
|
||||
if (errm)
|
||||
elog(ERROR, "%s", errm);
|
||||
|
||||
startptr = InvalidXLogRecPtr;
|
||||
|
||||
/*
|
||||
* The {begin_txn,change,commit_txn}_wrapper callbacks above will
|
||||
* store the description into our tuplestore.
|
||||
*/
|
||||
if (record != NULL)
|
||||
LogicalDecodingProcessRecord(ctx, record);
|
||||
|
||||
/* check limits */
|
||||
if (upto_lsn != InvalidXLogRecPtr &&
|
||||
upto_lsn <= ctx->reader->EndRecPtr)
|
||||
break;
|
||||
if (upto_nchanges != 0 &&
|
||||
upto_nchanges <= p->returned_rows)
|
||||
break;
|
||||
}
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
/* clear all timetravel entries */
|
||||
InvalidateSystemCaches();
|
||||
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
tuplestore_donestoring(tupstore);
|
||||
|
||||
CurrentResourceOwner = old_resowner;
|
||||
|
||||
/*
|
||||
* Next time, start where we left off. (Hunting things, the family
|
||||
* business..)
|
||||
*/
|
||||
if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
|
||||
LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
|
||||
|
||||
/* free context, call shutdown callback */
|
||||
FreeDecodingContext(ctx);
|
||||
|
||||
ReplicationSlotRelease();
|
||||
InvalidateSystemCaches();
|
||||
|
||||
return (Datum) 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* SQL function returning the changestream as text, consuming the data.
|
||||
*/
|
||||
Datum
|
||||
pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Datum ret = pg_logical_slot_get_changes_guts(fcinfo, true, false);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
|
||||
* SQL function returning the changestream as text, only peeking ahead.
|
||||
*/
|
||||
Datum
|
||||
pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Datum ret = pg_logical_slot_get_changes_guts(fcinfo, false, false);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
|
||||
* SQL function returning the changestream in binary, consuming the data.
|
||||
*/
|
||||
Datum
|
||||
pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Datum ret = pg_logical_slot_get_changes_guts(fcinfo, true, true);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
|
||||
* SQL function returning the changestream in binary, only peeking ahead.
|
||||
*/
|
||||
Datum
|
||||
pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Datum ret = pg_logical_slot_get_changes_guts(fcinfo, false, true);
|
||||
return ret;
|
||||
}
|
||||
3059
src/backend/replication/logical/reorderbuffer.c
Normal file
3059
src/backend/replication/logical/reorderbuffer.c
Normal file
File diff suppressed because it is too large
Load Diff
1885
src/backend/replication/logical/snapbuild.c
Normal file
1885
src/backend/replication/logical/snapbuild.c
Normal file
File diff suppressed because it is too large
Load Diff
@@ -43,6 +43,7 @@
|
||||
#include "miscadmin.h"
|
||||
#include "replication/slot.h"
|
||||
#include "storage/fd.h"
|
||||
#include "storage/proc.h"
|
||||
#include "storage/procarray.h"
|
||||
|
||||
/*
|
||||
@@ -82,6 +83,8 @@ ReplicationSlot *MyReplicationSlot = NULL;
|
||||
/* GUCs */
|
||||
int max_replication_slots = 0; /* the maximum number of replication slots */
|
||||
|
||||
static void ReplicationSlotDropAcquired(void);
|
||||
|
||||
/* internal persistency functions */
|
||||
static void RestoreSlotFromDisk(const char *name);
|
||||
static void CreateSlotOnDisk(ReplicationSlot *slot);
|
||||
@@ -190,11 +193,12 @@ ReplicationSlotValidateName(const char *name, int elevel)
|
||||
* Create a new replication slot and mark it as used by this backend.
|
||||
*
|
||||
* name: Name of the slot
|
||||
* db_specific: changeset extraction is db specific, if the slot is going to
|
||||
* db_specific: logical decoding is db specific; if the slot is going to
|
||||
* be used for that pass true, otherwise false.
|
||||
*/
|
||||
void
|
||||
ReplicationSlotCreate(const char *name, bool db_specific)
|
||||
ReplicationSlotCreate(const char *name, bool db_specific,
|
||||
ReplicationSlotPersistency persistency)
|
||||
{
|
||||
ReplicationSlot *slot = NULL;
|
||||
int i;
|
||||
@@ -246,6 +250,7 @@ ReplicationSlotCreate(const char *name, bool db_specific)
|
||||
*/
|
||||
Assert(!slot->in_use);
|
||||
Assert(!slot->active);
|
||||
slot->data.persistency = persistency;
|
||||
slot->data.xmin = InvalidTransactionId;
|
||||
slot->effective_xmin = InvalidTransactionId;
|
||||
strncpy(NameStr(slot->data.name), name, NAMEDATALEN);
|
||||
@@ -348,14 +353,30 @@ ReplicationSlotRelease(void)
|
||||
|
||||
Assert(slot != NULL && slot->active);
|
||||
|
||||
/* Mark slot inactive. We're not freeing it, just disconnecting. */
|
||||
if (slot->data.persistency == RS_EPHEMERAL)
|
||||
{
|
||||
/*
|
||||
* Delete the slot. There is no !PANIC case where this is allowed to
|
||||
* fail, all that may happen is an incomplete cleanup of the on-disk
|
||||
* data.
|
||||
*/
|
||||
ReplicationSlotDropAcquired();
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Mark slot inactive. We're not freeing it, just disconnecting. */
|
||||
volatile ReplicationSlot *vslot = slot;
|
||||
SpinLockAcquire(&slot->mutex);
|
||||
vslot->active = false;
|
||||
SpinLockRelease(&slot->mutex);
|
||||
MyReplicationSlot = NULL;
|
||||
}
|
||||
|
||||
MyReplicationSlot = NULL;
|
||||
|
||||
/* might not have been set when we've been a plain slot */
|
||||
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
|
||||
MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING;
|
||||
LWLockRelease(ProcArrayLock);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -364,52 +385,36 @@ ReplicationSlotRelease(void)
|
||||
void
|
||||
ReplicationSlotDrop(const char *name)
|
||||
{
|
||||
ReplicationSlot *slot = NULL;
|
||||
int i;
|
||||
bool active;
|
||||
Assert(MyReplicationSlot == NULL);
|
||||
|
||||
ReplicationSlotAcquire(name);
|
||||
|
||||
ReplicationSlotDropAcquired();
|
||||
}
|
||||
|
||||
/*
|
||||
* Permanently drop the currently acquired replication slot which will be
|
||||
* released by the point this function returns.
|
||||
*/
|
||||
static void
|
||||
ReplicationSlotDropAcquired(void)
|
||||
{
|
||||
char path[MAXPGPATH];
|
||||
char tmppath[MAXPGPATH];
|
||||
ReplicationSlot *slot = MyReplicationSlot;
|
||||
|
||||
ReplicationSlotValidateName(name, ERROR);
|
||||
Assert(MyReplicationSlot != NULL);
|
||||
|
||||
/* slot isn't acquired anymore */
|
||||
MyReplicationSlot = NULL;
|
||||
|
||||
/*
|
||||
* If some other backend ran this code currently with us, we might both
|
||||
* try to free the same slot at the same time. Or we might try to delete
|
||||
* a slot with a certain name while someone else was trying to create a
|
||||
* slot with the same name.
|
||||
* If some other backend ran this code concurrently with us, we might try
|
||||
* to delete a slot with a certain name while someone else was trying to
|
||||
* create a slot with the same name.
|
||||
*/
|
||||
LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
|
||||
|
||||
/* Search for the named slot and mark it active if we find it. */
|
||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||
for (i = 0; i < max_replication_slots; i++)
|
||||
{
|
||||
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
||||
|
||||
if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
|
||||
{
|
||||
volatile ReplicationSlot *vslot = s;
|
||||
|
||||
SpinLockAcquire(&s->mutex);
|
||||
active = vslot->active;
|
||||
vslot->active = true;
|
||||
SpinLockRelease(&s->mutex);
|
||||
slot = s;
|
||||
break;
|
||||
}
|
||||
}
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
|
||||
/* If we did not find the slot or it was already active, error out. */
|
||||
if (slot == NULL)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_UNDEFINED_OBJECT),
|
||||
errmsg("replication slot \"%s\" does not exist", name)));
|
||||
if (active)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_IN_USE),
|
||||
errmsg("replication slot \"%s\" is already active", name)));
|
||||
|
||||
/* Generate pathnames. */
|
||||
sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
|
||||
sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
|
||||
@@ -417,34 +422,40 @@ ReplicationSlotDrop(const char *name)
|
||||
/*
|
||||
* Rename the slot directory on disk, so that we'll no longer recognize
|
||||
* this as a valid slot. Note that if this fails, we've got to mark the
|
||||
* slot inactive again before bailing out.
|
||||
* slot inactive before bailing out. If we're dropping a ephemeral slot,
|
||||
* we better never fail hard as the caller won't expect the slot to
|
||||
* survive and this might get called during error handling.
|
||||
*/
|
||||
if (rename(path, tmppath) != 0)
|
||||
if (rename(path, tmppath) == 0)
|
||||
{
|
||||
/*
|
||||
* We need to fsync() the directory we just renamed and its parent to
|
||||
* make sure that our changes are on disk in a crash-safe fashion. If
|
||||
* fsync() fails, we can't be sure whether the changes are on disk or
|
||||
* not. For now, we handle that by panicking;
|
||||
* StartupReplicationSlots() will try to straighten it out after
|
||||
* restart.
|
||||
*/
|
||||
START_CRIT_SECTION();
|
||||
fsync_fname(tmppath, true);
|
||||
fsync_fname("pg_replslot", true);
|
||||
END_CRIT_SECTION();
|
||||
}
|
||||
else
|
||||
{
|
||||
volatile ReplicationSlot *vslot = slot;
|
||||
bool fail_softly = slot->data.persistency == RS_EPHEMERAL;
|
||||
|
||||
SpinLockAcquire(&slot->mutex);
|
||||
vslot->active = false;
|
||||
SpinLockRelease(&slot->mutex);
|
||||
|
||||
ereport(ERROR,
|
||||
ereport(fail_softly ? WARNING : ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not rename \"%s\" to \"%s\": %m",
|
||||
path, tmppath)));
|
||||
}
|
||||
|
||||
/*
|
||||
* We need to fsync() the directory we just renamed and its parent to make
|
||||
* sure that our changes are on disk in a crash-safe fashion. If fsync()
|
||||
* fails, we can't be sure whether the changes are on disk or not. For
|
||||
* now, we handle that by panicking; StartupReplicationSlots() will
|
||||
* try to straighten it out after restart.
|
||||
*/
|
||||
START_CRIT_SECTION();
|
||||
fsync_fname(tmppath, true);
|
||||
fsync_fname("pg_replslot", true);
|
||||
END_CRIT_SECTION();
|
||||
|
||||
/*
|
||||
* The slot is definitely gone. Lock out concurrent scans of the array
|
||||
* long enough to kill it. It's OK to clear the active flag here without
|
||||
@@ -461,7 +472,7 @@ ReplicationSlotDrop(const char *name)
|
||||
* Slot is dead and doesn't prevent resource removal anymore, recompute
|
||||
* limits.
|
||||
*/
|
||||
ReplicationSlotsComputeRequiredXmin();
|
||||
ReplicationSlotsComputeRequiredXmin(false);
|
||||
ReplicationSlotsComputeRequiredLSN();
|
||||
|
||||
/*
|
||||
@@ -518,22 +529,50 @@ ReplicationSlotMarkDirty(void)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Convert a slot that's marked as RS_DROP_ON_ERROR to a RS_PERSISTENT slot,
|
||||
* guaranteeing it will be there after a eventual crash.
|
||||
*/
|
||||
void
|
||||
ReplicationSlotPersist(void)
|
||||
{
|
||||
ReplicationSlot *slot = MyReplicationSlot;
|
||||
|
||||
Assert(slot != NULL);
|
||||
Assert(slot->data.persistency != RS_PERSISTENT);
|
||||
|
||||
{
|
||||
volatile ReplicationSlot *vslot = slot;
|
||||
|
||||
SpinLockAcquire(&slot->mutex);
|
||||
vslot->data.persistency = RS_PERSISTENT;
|
||||
SpinLockRelease(&slot->mutex);
|
||||
}
|
||||
|
||||
ReplicationSlotMarkDirty();
|
||||
ReplicationSlotSave();
|
||||
}
|
||||
|
||||
/*
|
||||
* Compute the oldest xmin across all slots and store it in the ProcArray.
|
||||
*/
|
||||
void
|
||||
ReplicationSlotsComputeRequiredXmin(void)
|
||||
ReplicationSlotsComputeRequiredXmin(bool already_locked)
|
||||
{
|
||||
int i;
|
||||
TransactionId agg_xmin = InvalidTransactionId;
|
||||
TransactionId agg_catalog_xmin = InvalidTransactionId;
|
||||
|
||||
Assert(ReplicationSlotCtl != NULL);
|
||||
|
||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||
if (!already_locked)
|
||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||
|
||||
for (i = 0; i < max_replication_slots; i++)
|
||||
{
|
||||
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
||||
TransactionId effective_xmin;
|
||||
TransactionId effective_catalog_xmin;
|
||||
|
||||
if (!s->in_use)
|
||||
continue;
|
||||
@@ -543,6 +582,7 @@ ReplicationSlotsComputeRequiredXmin(void)
|
||||
|
||||
SpinLockAcquire(&s->mutex);
|
||||
effective_xmin = vslot->effective_xmin;
|
||||
effective_catalog_xmin = vslot->effective_catalog_xmin;
|
||||
SpinLockRelease(&s->mutex);
|
||||
}
|
||||
|
||||
@@ -551,10 +591,18 @@ ReplicationSlotsComputeRequiredXmin(void)
|
||||
(!TransactionIdIsValid(agg_xmin) ||
|
||||
TransactionIdPrecedes(effective_xmin, agg_xmin)))
|
||||
agg_xmin = effective_xmin;
|
||||
}
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
|
||||
ProcArraySetReplicationSlotXmin(agg_xmin);
|
||||
/* check the catalog xmin */
|
||||
if (TransactionIdIsValid(effective_catalog_xmin) &&
|
||||
(!TransactionIdIsValid(agg_catalog_xmin) ||
|
||||
TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
|
||||
agg_catalog_xmin = effective_catalog_xmin;
|
||||
}
|
||||
|
||||
if (!already_locked)
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
|
||||
ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -595,6 +643,110 @@ ReplicationSlotsComputeRequiredLSN(void)
|
||||
XLogSetReplicationSlotMinimumLSN(min_required);
|
||||
}
|
||||
|
||||
/*
|
||||
* Compute the oldest WAL LSN required by *logical* decoding slots..
|
||||
*
|
||||
* Returns InvalidXLogRecPtr if logical decoding is disabled or no logicals
|
||||
* slots exist.
|
||||
*
|
||||
* NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
|
||||
* ignores physical replication slots.
|
||||
*
|
||||
* The results aren't required frequently, so we don't maintain a precomputed
|
||||
* value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
|
||||
*/
|
||||
XLogRecPtr
|
||||
ReplicationSlotsComputeLogicalRestartLSN(void)
|
||||
{
|
||||
XLogRecPtr result = InvalidXLogRecPtr;
|
||||
int i;
|
||||
|
||||
if (max_replication_slots <= 0)
|
||||
return InvalidXLogRecPtr;
|
||||
|
||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||
|
||||
for (i = 0; i < max_replication_slots; i++)
|
||||
{
|
||||
volatile ReplicationSlot *s;
|
||||
XLogRecPtr restart_lsn;
|
||||
|
||||
s = &ReplicationSlotCtl->replication_slots[i];
|
||||
|
||||
/* cannot change while ReplicationSlotCtlLock is held */
|
||||
if (!s->in_use)
|
||||
continue;
|
||||
|
||||
/* we're only interested in logical slots */
|
||||
if (s->data.database == InvalidOid)
|
||||
continue;
|
||||
|
||||
/* read once, it's ok if it increases while we're checking */
|
||||
SpinLockAcquire(&s->mutex);
|
||||
restart_lsn = s->data.restart_lsn;
|
||||
SpinLockRelease(&s->mutex);
|
||||
|
||||
if (result == InvalidXLogRecPtr ||
|
||||
restart_lsn < result)
|
||||
result = restart_lsn;
|
||||
}
|
||||
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
* ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
|
||||
* passed database oid.
|
||||
*
|
||||
* Returns true if there are any slots referencing the database. *nslots will
|
||||
* be set to the absolute number of slots in the database, *nactive to ones
|
||||
* currently active.
|
||||
*/
|
||||
bool
|
||||
ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
|
||||
{
|
||||
int i;
|
||||
|
||||
*nslots = *nactive = 0;
|
||||
|
||||
if (max_replication_slots <= 0)
|
||||
return false;
|
||||
|
||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||
for (i = 0; i < max_replication_slots; i++)
|
||||
{
|
||||
volatile ReplicationSlot *s;
|
||||
|
||||
s = &ReplicationSlotCtl->replication_slots[i];
|
||||
|
||||
/* cannot change while ReplicationSlotCtlLock is held */
|
||||
if (!s->in_use)
|
||||
continue;
|
||||
|
||||
/* not database specific, skip */
|
||||
if (s->data.database == InvalidOid)
|
||||
|
||||
/* not our database, skip */
|
||||
if (s->data.database != dboid)
|
||||
continue;
|
||||
|
||||
/* count slots with spinlock held */
|
||||
SpinLockAcquire(&s->mutex);
|
||||
(*nslots)++;
|
||||
if (s->active)
|
||||
(*nactive)++;
|
||||
SpinLockRelease(&s->mutex);
|
||||
}
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
|
||||
if (*nslots > 0)
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Check whether the server's configuration supports using replication
|
||||
* slots.
|
||||
@@ -723,7 +875,7 @@ StartupReplicationSlots(XLogRecPtr checkPointRedo)
|
||||
return;
|
||||
|
||||
/* Now that we have recovered all the data, compute replication xmin */
|
||||
ReplicationSlotsComputeRequiredXmin();
|
||||
ReplicationSlotsComputeRequiredXmin(false);
|
||||
ReplicationSlotsComputeRequiredLSN();
|
||||
}
|
||||
|
||||
@@ -1050,8 +1202,19 @@ RestoreSlotFromDisk(const char *name)
|
||||
memcpy(&slot->data, &cp.slotdata,
|
||||
sizeof(ReplicationSlotPersistentData));
|
||||
|
||||
/* Don't restore the slot if it's not parked as persistent. */
|
||||
if (slot->data.persistency != RS_PERSISTENT)
|
||||
return;
|
||||
|
||||
/* initialize in memory state */
|
||||
slot->effective_xmin = cp.slotdata.xmin;
|
||||
slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
|
||||
|
||||
slot->candidate_catalog_xmin = InvalidTransactionId;
|
||||
slot->candidate_xmin_lsn = InvalidXLogRecPtr;
|
||||
slot->candidate_restart_lsn = InvalidXLogRecPtr;
|
||||
slot->candidate_restart_valid = InvalidXLogRecPtr;
|
||||
|
||||
slot->in_use = true;
|
||||
slot->active = false;
|
||||
|
||||
|
||||
@@ -15,13 +15,13 @@
|
||||
|
||||
#include "funcapi.h"
|
||||
#include "miscadmin.h"
|
||||
|
||||
#include "access/htup_details.h"
|
||||
#include "replication/slot.h"
|
||||
#include "replication/logical.h"
|
||||
#include "replication/logicalfuncs.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/pg_lsn.h"
|
||||
#include "replication/slot.h"
|
||||
|
||||
Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS);
|
||||
Datum pg_drop_replication_slot(PG_FUNCTION_ARGS);
|
||||
|
||||
static void
|
||||
check_permissions(void)
|
||||
@@ -54,7 +54,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
|
||||
elog(ERROR, "return type must be a row type");
|
||||
|
||||
/* acquire replication slot, this will check for conflicting names*/
|
||||
ReplicationSlotCreate(NameStr(*name), false);
|
||||
ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT);
|
||||
|
||||
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
|
||||
|
||||
@@ -69,6 +69,68 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
|
||||
PG_RETURN_DATUM(result);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SQL function for creating a new logical replication slot.
|
||||
*/
|
||||
Datum
|
||||
pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Name name = PG_GETARG_NAME(0);
|
||||
Name plugin = PG_GETARG_NAME(1);
|
||||
|
||||
LogicalDecodingContext *ctx = NULL;
|
||||
|
||||
TupleDesc tupdesc;
|
||||
HeapTuple tuple;
|
||||
Datum result;
|
||||
Datum values[2];
|
||||
bool nulls[2];
|
||||
|
||||
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
|
||||
elog(ERROR, "return type must be a row type");
|
||||
|
||||
check_permissions();
|
||||
|
||||
CheckLogicalDecodingRequirements();
|
||||
|
||||
Assert(!MyReplicationSlot);
|
||||
|
||||
/*
|
||||
* Acquire a logical decoding slot, this will check for conflicting
|
||||
* names.
|
||||
*/
|
||||
ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL);
|
||||
|
||||
/*
|
||||
* Create logical decoding context, to build the initial snapshot.
|
||||
*/
|
||||
ctx = CreateInitDecodingContext(
|
||||
NameStr(*plugin), NIL,
|
||||
logical_read_local_xlog_page, NULL, NULL);
|
||||
|
||||
/* build initial snapshot, might take a while */
|
||||
DecodingContextFindStartpoint(ctx);
|
||||
|
||||
values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name));
|
||||
values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
|
||||
|
||||
/* don't need the decoding context anymore */
|
||||
FreeDecodingContext(ctx);
|
||||
|
||||
memset(nulls, 0, sizeof(nulls));
|
||||
|
||||
tuple = heap_form_tuple(tupdesc, values, nulls);
|
||||
result = HeapTupleGetDatum(tuple);
|
||||
|
||||
/* ok, slot is now fully created, mark it as persistent */
|
||||
ReplicationSlotPersist();
|
||||
ReplicationSlotRelease();
|
||||
|
||||
PG_RETURN_DATUM(result);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SQL function for dropping a replication slot.
|
||||
*/
|
||||
@@ -92,7 +154,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
|
||||
Datum
|
||||
pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||
{
|
||||
#define PG_STAT_GET_REPLICATION_SLOTS_COLS 6
|
||||
#define PG_GET_REPLICATION_SLOTS_COLS 8
|
||||
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
||||
TupleDesc tupdesc;
|
||||
Tuplestorestate *tupstore;
|
||||
@@ -134,15 +196,16 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||
for (slotno = 0; slotno < max_replication_slots; slotno++)
|
||||
{
|
||||
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
|
||||
Datum values[PG_STAT_GET_REPLICATION_SLOTS_COLS];
|
||||
bool nulls[PG_STAT_GET_REPLICATION_SLOTS_COLS];
|
||||
Datum values[PG_GET_REPLICATION_SLOTS_COLS];
|
||||
bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
|
||||
|
||||
TransactionId xmin;
|
||||
TransactionId catalog_xmin;
|
||||
XLogRecPtr restart_lsn;
|
||||
bool active;
|
||||
Oid database;
|
||||
NameData slot_name;
|
||||
|
||||
NameData plugin;
|
||||
int i;
|
||||
|
||||
SpinLockAcquire(&slot->mutex);
|
||||
@@ -154,9 +217,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||
else
|
||||
{
|
||||
xmin = slot->data.xmin;
|
||||
catalog_xmin = slot->data.catalog_xmin;
|
||||
database = slot->data.database;
|
||||
restart_lsn = slot->data.restart_lsn;
|
||||
namecpy(&slot_name, &slot->data.name);
|
||||
namecpy(&plugin, &slot->data.plugin);
|
||||
|
||||
active = slot->active;
|
||||
}
|
||||
@@ -166,19 +231,34 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||
|
||||
i = 0;
|
||||
values[i++] = NameGetDatum(&slot_name);
|
||||
|
||||
if (database == InvalidOid)
|
||||
nulls[i++] = true;
|
||||
else
|
||||
values[i++] = NameGetDatum(&plugin);
|
||||
|
||||
if (database == InvalidOid)
|
||||
values[i++] = CStringGetTextDatum("physical");
|
||||
else
|
||||
values[i++] = CStringGetTextDatum("logical");
|
||||
|
||||
if (database == InvalidOid)
|
||||
nulls[i++] = true;
|
||||
else
|
||||
values[i++] = database;
|
||||
|
||||
values[i++] = BoolGetDatum(active);
|
||||
|
||||
if (xmin != InvalidTransactionId)
|
||||
values[i++] = TransactionIdGetDatum(xmin);
|
||||
else
|
||||
nulls[i++] = true;
|
||||
|
||||
if (catalog_xmin != InvalidTransactionId)
|
||||
values[i++] = TransactionIdGetDatum(catalog_xmin);
|
||||
else
|
||||
nulls[i++] = true;
|
||||
|
||||
if (restart_lsn != InvalidTransactionId)
|
||||
values[i++] = LSNGetDatum(restart_lsn);
|
||||
else
|
||||
|
||||
@@ -1147,7 +1147,7 @@ XLogWalRcvSendHSFeedback(bool immed)
|
||||
* everything else has been checked.
|
||||
*/
|
||||
if (hot_standby_feedback)
|
||||
xmin = GetOldestXmin(true, false);
|
||||
xmin = GetOldestXmin(NULL, false);
|
||||
else
|
||||
xmin = InvalidTransactionId;
|
||||
|
||||
|
||||
@@ -55,6 +55,7 @@
|
||||
#include "replication/basebackup.h"
|
||||
#include "replication/slot.h"
|
||||
#include "replication/syncrep.h"
|
||||
#include "replication/slot.h"
|
||||
#include "replication/walreceiver.h"
|
||||
#include "replication/walsender.h"
|
||||
#include "replication/walsender_private.h"
|
||||
@@ -434,7 +435,7 @@ StartReplication(StartReplicationCmd *cmd)
|
||||
if (MyReplicationSlot->data.database != InvalidOid)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
(errmsg("cannot use a replication slot created for changeset extraction for streaming replication"))));
|
||||
(errmsg("cannot use a logical replication slot for physical replication"))));
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -656,7 +657,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
|
||||
sendTimeLineIsHistoric = false;
|
||||
sendTimeLine = ThisTimeLineID;
|
||||
|
||||
ReplicationSlotCreate(cmd->slotname, cmd->kind == REPLICATION_KIND_LOGICAL);
|
||||
ReplicationSlotCreate(cmd->slotname,
|
||||
cmd->kind == REPLICATION_KIND_LOGICAL,
|
||||
RS_PERSISTENT);
|
||||
|
||||
initStringInfo(&output_message);
|
||||
|
||||
@@ -766,7 +769,7 @@ exec_replication_command(const char *cmd_string)
|
||||
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
|
||||
StartReplication(cmd);
|
||||
else
|
||||
elog(ERROR, "cannot handle changeset extraction yet");
|
||||
elog(ERROR, "cannot handle logical decoding yet");
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -1017,7 +1020,7 @@ ProcessStandbyReplyMessage(void)
|
||||
if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
|
||||
{
|
||||
if (MyReplicationSlot->data.database != InvalidOid)
|
||||
elog(ERROR, "cannot handle changeset extraction yet");
|
||||
elog(ERROR, "cannot handle logical decoding yet");
|
||||
else
|
||||
PhysicalConfirmReceivedLocation(flushPtr);
|
||||
}
|
||||
@@ -1050,7 +1053,7 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
|
||||
if (changed)
|
||||
{
|
||||
ReplicationSlotMarkDirty();
|
||||
ReplicationSlotsComputeRequiredXmin();
|
||||
ReplicationSlotsComputeRequiredXmin(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user