mirror of
https://github.com/postgres/postgres.git
synced 2025-11-10 17:42:29 +03:00
Ability to advance replication slots
Ability to advance both physical and logical replication slots using a new user function pg_replication_slot_advance(). For logical advance that means records are consumed as fast as possible and changes are not given to output plugin for sending. Makes 2nd phase (after we reached SNAPBUILD_FULL_SNAPSHOT) of replication slot creation faster, especially when there are big transactions as the reorder buffer does not have to deal with data changes and does not have to spill to disk. Author: Petr Jelinek Reviewed-by: Simon Riggs
This commit is contained in:
@@ -88,6 +88,9 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
|
||||
* call ReorderBufferProcessXid for each record type by default, because
|
||||
* e.g. empty xacts can be handled more efficiently if there's no previous
|
||||
* state for them.
|
||||
*
|
||||
* We also support the ability to fast forward thru records, skipping some
|
||||
* record types completely - see individual record types for details.
|
||||
*/
|
||||
void
|
||||
LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
|
||||
@@ -332,8 +335,10 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||
xl_invalidations *invalidations =
|
||||
(xl_invalidations *) XLogRecGetData(r);
|
||||
|
||||
ReorderBufferImmediateInvalidation(
|
||||
ctx->reorder, invalidations->nmsgs, invalidations->msgs);
|
||||
if (!ctx->fast_forward)
|
||||
ReorderBufferImmediateInvalidation(ctx->reorder,
|
||||
invalidations->nmsgs,
|
||||
invalidations->msgs);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
@@ -353,14 +358,19 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||
|
||||
ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
|
||||
|
||||
/* no point in doing anything yet */
|
||||
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
|
||||
/*
|
||||
* If we don't have snapshot or we are just fast-forwarding, there is no
|
||||
* point in decoding changes.
|
||||
*/
|
||||
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
|
||||
ctx->fast_forward)
|
||||
return;
|
||||
|
||||
switch (info)
|
||||
{
|
||||
case XLOG_HEAP2_MULTI_INSERT:
|
||||
if (SnapBuildProcessChange(builder, xid, buf->origptr))
|
||||
if (!ctx->fast_forward &&
|
||||
SnapBuildProcessChange(builder, xid, buf->origptr))
|
||||
DecodeMultiInsert(ctx, buf);
|
||||
break;
|
||||
case XLOG_HEAP2_NEW_CID:
|
||||
@@ -408,8 +418,12 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||
|
||||
ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
|
||||
|
||||
/* no point in doing anything yet */
|
||||
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
|
||||
/*
|
||||
* If we don't have snapshot or we are just fast-forwarding, there is no
|
||||
* point in decoding data changes.
|
||||
*/
|
||||
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
|
||||
ctx->fast_forward)
|
||||
return;
|
||||
|
||||
switch (info)
|
||||
@@ -501,8 +515,12 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||
|
||||
ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
|
||||
|
||||
/* No point in doing anything yet. */
|
||||
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
|
||||
/*
|
||||
* If we don't have snapshot or we are just fast-forwarding, there is no
|
||||
* point in decoding messages.
|
||||
*/
|
||||
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
|
||||
ctx->fast_forward)
|
||||
return;
|
||||
|
||||
message = (xl_logical_message *) XLogRecGetData(r);
|
||||
@@ -554,8 +572,9 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
|
||||
*/
|
||||
if (parsed->nmsgs > 0)
|
||||
{
|
||||
ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
|
||||
parsed->nmsgs, parsed->msgs);
|
||||
if (!ctx->fast_forward)
|
||||
ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
|
||||
parsed->nmsgs, parsed->msgs);
|
||||
ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
|
||||
}
|
||||
|
||||
@@ -574,6 +593,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
|
||||
* are restarting or if we haven't assembled a consistent snapshot yet.
|
||||
* 2) The transaction happened in another database.
|
||||
* 3) The output plugin is not interested in the origin.
|
||||
* 4) We are doing fast-forwarding
|
||||
*
|
||||
* We can't just use ReorderBufferAbort() here, because we need to execute
|
||||
* the transaction's invalidations. This currently won't be needed if
|
||||
@@ -589,7 +609,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
|
||||
*/
|
||||
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
|
||||
(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
|
||||
FilterByOrigin(ctx, origin_id))
|
||||
ctx->fast_forward || FilterByOrigin(ctx, origin_id))
|
||||
{
|
||||
for (i = 0; i < parsed->nsubxacts; i++)
|
||||
{
|
||||
|
||||
@@ -115,6 +115,7 @@ StartupDecodingContext(List *output_plugin_options,
|
||||
XLogRecPtr start_lsn,
|
||||
TransactionId xmin_horizon,
|
||||
bool need_full_snapshot,
|
||||
bool fast_forward,
|
||||
XLogPageReadCB read_page,
|
||||
LogicalOutputPluginWriterPrepareWrite prepare_write,
|
||||
LogicalOutputPluginWriterWrite do_write,
|
||||
@@ -140,7 +141,8 @@ StartupDecodingContext(List *output_plugin_options,
|
||||
* (re-)load output plugins, so we detect a bad (removed) output plugin
|
||||
* now.
|
||||
*/
|
||||
LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
|
||||
if (!fast_forward)
|
||||
LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
|
||||
|
||||
/*
|
||||
* Now that the slot's xmin has been set, we can announce ourselves as a
|
||||
@@ -191,6 +193,8 @@ StartupDecodingContext(List *output_plugin_options,
|
||||
|
||||
ctx->output_plugin_options = output_plugin_options;
|
||||
|
||||
ctx->fast_forward = fast_forward;
|
||||
|
||||
MemoryContextSwitchTo(old_context);
|
||||
|
||||
return ctx;
|
||||
@@ -303,8 +307,9 @@ CreateInitDecodingContext(char *plugin,
|
||||
ReplicationSlotSave();
|
||||
|
||||
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
|
||||
need_full_snapshot, read_page, prepare_write,
|
||||
do_write, update_progress);
|
||||
need_full_snapshot, true,
|
||||
read_page, prepare_write, do_write,
|
||||
update_progress);
|
||||
|
||||
/* call output plugin initialization callback */
|
||||
old_context = MemoryContextSwitchTo(ctx->context);
|
||||
@@ -342,6 +347,7 @@ CreateInitDecodingContext(char *plugin,
|
||||
LogicalDecodingContext *
|
||||
CreateDecodingContext(XLogRecPtr start_lsn,
|
||||
List *output_plugin_options,
|
||||
bool fast_forward,
|
||||
XLogPageReadCB read_page,
|
||||
LogicalOutputPluginWriterPrepareWrite prepare_write,
|
||||
LogicalOutputPluginWriterWrite do_write,
|
||||
@@ -395,8 +401,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
|
||||
|
||||
ctx = StartupDecodingContext(output_plugin_options,
|
||||
start_lsn, InvalidTransactionId, false,
|
||||
read_page, prepare_write, do_write,
|
||||
update_progress);
|
||||
fast_forward, read_page, prepare_write,
|
||||
do_write, update_progress);
|
||||
|
||||
/* call output plugin initialization callback */
|
||||
old_context = MemoryContextSwitchTo(ctx->context);
|
||||
@@ -573,6 +579,8 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
|
||||
LogicalErrorCallbackState state;
|
||||
ErrorContextCallback errcallback;
|
||||
|
||||
Assert(!ctx->fast_forward);
|
||||
|
||||
/* Push callback + info on the error context stack */
|
||||
state.ctx = ctx;
|
||||
state.callback_name = "startup";
|
||||
@@ -598,6 +606,8 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
|
||||
LogicalErrorCallbackState state;
|
||||
ErrorContextCallback errcallback;
|
||||
|
||||
Assert(!ctx->fast_forward);
|
||||
|
||||
/* Push callback + info on the error context stack */
|
||||
state.ctx = ctx;
|
||||
state.callback_name = "shutdown";
|
||||
@@ -629,6 +639,8 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
|
||||
LogicalErrorCallbackState state;
|
||||
ErrorContextCallback errcallback;
|
||||
|
||||
Assert(!ctx->fast_forward);
|
||||
|
||||
/* Push callback + info on the error context stack */
|
||||
state.ctx = ctx;
|
||||
state.callback_name = "begin";
|
||||
@@ -658,6 +670,8 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||
LogicalErrorCallbackState state;
|
||||
ErrorContextCallback errcallback;
|
||||
|
||||
Assert(!ctx->fast_forward);
|
||||
|
||||
/* Push callback + info on the error context stack */
|
||||
state.ctx = ctx;
|
||||
state.callback_name = "commit";
|
||||
@@ -687,6 +701,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||
LogicalErrorCallbackState state;
|
||||
ErrorContextCallback errcallback;
|
||||
|
||||
Assert(!ctx->fast_forward);
|
||||
|
||||
/* Push callback + info on the error context stack */
|
||||
state.ctx = ctx;
|
||||
state.callback_name = "change";
|
||||
@@ -721,6 +737,8 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
|
||||
ErrorContextCallback errcallback;
|
||||
bool ret;
|
||||
|
||||
Assert(!ctx->fast_forward);
|
||||
|
||||
/* Push callback + info on the error context stack */
|
||||
state.ctx = ctx;
|
||||
state.callback_name = "filter_by_origin";
|
||||
@@ -751,6 +769,8 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||
LogicalErrorCallbackState state;
|
||||
ErrorContextCallback errcallback;
|
||||
|
||||
Assert(!ctx->fast_forward);
|
||||
|
||||
if (ctx->callbacks.message_cb == NULL)
|
||||
return;
|
||||
|
||||
|
||||
@@ -251,6 +251,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
|
||||
/* restart at slot's confirmed_flush */
|
||||
ctx = CreateDecodingContext(InvalidXLogRecPtr,
|
||||
options,
|
||||
false,
|
||||
logical_read_local_xlog_page,
|
||||
LogicalOutputPrepareWrite,
|
||||
LogicalOutputWrite, NULL);
|
||||
|
||||
@@ -17,11 +17,14 @@
|
||||
#include "miscadmin.h"
|
||||
|
||||
#include "access/htup_details.h"
|
||||
#include "replication/decode.h"
|
||||
#include "replication/slot.h"
|
||||
#include "replication/logical.h"
|
||||
#include "replication/logicalfuncs.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/inval.h"
|
||||
#include "utils/pg_lsn.h"
|
||||
#include "utils/resowner.h"
|
||||
|
||||
static void
|
||||
check_permissions(void)
|
||||
@@ -312,3 +315,200 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||
|
||||
return (Datum) 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Helper function for advancing physical replication slot forward.
|
||||
*/
|
||||
static XLogRecPtr
|
||||
pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
|
||||
{
|
||||
XLogRecPtr retlsn = InvalidXLogRecPtr;
|
||||
|
||||
SpinLockAcquire(&MyReplicationSlot->mutex);
|
||||
if (MyReplicationSlot->data.restart_lsn < moveto)
|
||||
{
|
||||
MyReplicationSlot->data.restart_lsn = moveto;
|
||||
retlsn = moveto;
|
||||
}
|
||||
SpinLockRelease(&MyReplicationSlot->mutex);
|
||||
|
||||
return retlsn;
|
||||
}
|
||||
|
||||
/*
|
||||
* Helper function for advancing logical replication slot forward.
|
||||
*/
|
||||
static XLogRecPtr
|
||||
pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
|
||||
{
|
||||
LogicalDecodingContext *ctx;
|
||||
ResourceOwner old_resowner = CurrentResourceOwner;
|
||||
XLogRecPtr retlsn = InvalidXLogRecPtr;
|
||||
|
||||
PG_TRY();
|
||||
{
|
||||
/* restart at slot's confirmed_flush */
|
||||
ctx = CreateDecodingContext(InvalidXLogRecPtr,
|
||||
NIL,
|
||||
true,
|
||||
logical_read_local_xlog_page,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner,
|
||||
"logical decoding");
|
||||
|
||||
/* invalidate non-timetravel entries */
|
||||
InvalidateSystemCaches();
|
||||
|
||||
/* Decode until we run out of records */
|
||||
while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) ||
|
||||
(ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto))
|
||||
{
|
||||
XLogRecord *record;
|
||||
char *errm = NULL;
|
||||
|
||||
record = XLogReadRecord(ctx->reader, startlsn, &errm);
|
||||
if (errm)
|
||||
elog(ERROR, "%s", errm);
|
||||
|
||||
/*
|
||||
* Now that we've set up the xlog reader state, subsequent calls
|
||||
* pass InvalidXLogRecPtr to say "continue from last record"
|
||||
*/
|
||||
startlsn = InvalidXLogRecPtr;
|
||||
|
||||
/*
|
||||
* The {begin_txn,change,commit_txn}_wrapper callbacks above will
|
||||
* store the description into our tuplestore.
|
||||
*/
|
||||
if (record != NULL)
|
||||
LogicalDecodingProcessRecord(ctx, ctx->reader);
|
||||
|
||||
/* check limits */
|
||||
if (moveto <= ctx->reader->EndRecPtr)
|
||||
break;
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
}
|
||||
|
||||
CurrentResourceOwner = old_resowner;
|
||||
|
||||
if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
|
||||
{
|
||||
LogicalConfirmReceivedLocation(moveto);
|
||||
|
||||
/*
|
||||
* If only the confirmed_flush_lsn has changed the slot won't get
|
||||
* marked as dirty by the above. Callers on the walsender
|
||||
* interface are expected to keep track of their own progress and
|
||||
* don't need it written out. But SQL-interface users cannot
|
||||
* specify their own start positions and it's harder for them to
|
||||
* keep track of their progress, so we should make more of an
|
||||
* effort to save it for them.
|
||||
*
|
||||
* Dirty the slot so it's written out at the next checkpoint.
|
||||
* We'll still lose its position on crash, as documented, but it's
|
||||
* better than always losing the position even on clean restart.
|
||||
*/
|
||||
ReplicationSlotMarkDirty();
|
||||
}
|
||||
|
||||
retlsn = MyReplicationSlot->data.confirmed_flush;
|
||||
|
||||
/* free context, call shutdown callback */
|
||||
FreeDecodingContext(ctx);
|
||||
|
||||
InvalidateSystemCaches();
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
/* clear all timetravel entries */
|
||||
InvalidateSystemCaches();
|
||||
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
return retlsn;
|
||||
}
|
||||
|
||||
/*
|
||||
* SQL function for moving the position in a replication slot.
|
||||
*/
|
||||
Datum
|
||||
pg_replication_slot_advance(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Name slotname = PG_GETARG_NAME(0);
|
||||
XLogRecPtr moveto = PG_GETARG_LSN(1);
|
||||
XLogRecPtr endlsn;
|
||||
XLogRecPtr startlsn;
|
||||
TupleDesc tupdesc;
|
||||
Datum values[2];
|
||||
bool nulls[2];
|
||||
HeapTuple tuple;
|
||||
Datum result;
|
||||
|
||||
Assert(!MyReplicationSlot);
|
||||
|
||||
check_permissions();
|
||||
|
||||
if (XLogRecPtrIsInvalid(moveto))
|
||||
ereport(ERROR,
|
||||
(errmsg("invalid target wal lsn")));
|
||||
|
||||
/* Build a tuple descriptor for our result type */
|
||||
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
|
||||
elog(ERROR, "return type must be a row type");
|
||||
|
||||
/*
|
||||
* We can't move slot past what's been flushed/replayed so clamp the
|
||||
* target possition accordingly.
|
||||
*/
|
||||
if (!RecoveryInProgress())
|
||||
moveto = Min(moveto, GetFlushRecPtr());
|
||||
else
|
||||
moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
|
||||
|
||||
/* Acquire the slot so we "own" it */
|
||||
ReplicationSlotAcquire(NameStr(*slotname), true);
|
||||
|
||||
startlsn = MyReplicationSlot->data.confirmed_flush;
|
||||
if (moveto < startlsn)
|
||||
{
|
||||
ReplicationSlotRelease();
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot move slot to %X/%X, minimum is %X/%X",
|
||||
(uint32) (moveto >> 32), (uint32) moveto,
|
||||
(uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
|
||||
(uint32) (MyReplicationSlot->data.confirmed_flush))));
|
||||
}
|
||||
|
||||
if (OidIsValid(MyReplicationSlot->data.database))
|
||||
endlsn = pg_logical_replication_slot_advance(startlsn, moveto);
|
||||
else
|
||||
endlsn = pg_physical_replication_slot_advance(startlsn, moveto);
|
||||
|
||||
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
|
||||
nulls[0] = false;
|
||||
|
||||
/* Update the on disk state when lsn was updated. */
|
||||
if (XLogRecPtrIsInvalid(endlsn))
|
||||
{
|
||||
ReplicationSlotMarkDirty();
|
||||
ReplicationSlotsComputeRequiredXmin(false);
|
||||
ReplicationSlotsComputeRequiredLSN();
|
||||
ReplicationSlotSave();
|
||||
}
|
||||
|
||||
ReplicationSlotRelease();
|
||||
|
||||
/* Return the reached position. */
|
||||
values[1] = LSNGetDatum(endlsn);
|
||||
nulls[1] = false;
|
||||
|
||||
tuple = heap_form_tuple(tupdesc, values, nulls);
|
||||
result = HeapTupleGetDatum(tuple);
|
||||
|
||||
PG_RETURN_DATUM(result);
|
||||
}
|
||||
|
||||
@@ -1075,6 +1075,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
|
||||
* to be shipped from that position.
|
||||
*/
|
||||
logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
|
||||
false,
|
||||
logical_read_xlog_page,
|
||||
WalSndPrepareWrite,
|
||||
WalSndWriteData,
|
||||
|
||||
Reference in New Issue
Block a user