1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-30 11:03:19 +03:00

Add facility to copy replication slots

This allows the user to create duplicates of existing replication slots,
either logical or physical, and even changing properties such as whether
they are temporary or the output plugin used.

There are multiple uses for this, such as initializing multiple replicas
using the slot for one base backup; when doing investigation of logical
replication issues; and to select a different output plugins.

Author: Masahiko Sawada
Reviewed-by: Michael Paquier, Andres Freund, Petr Jelinek
Discussion: https://postgr.es/m/CAD21AoAm7XX8y_tOPP6j4Nzzch12FvA1wPqiO690RCk+uYVstg@mail.gmail.com
This commit is contained in:
Alvaro Herrera
2019-04-05 14:52:45 -03:00
parent de2b38419c
commit 9f06d79ef8
9 changed files with 736 additions and 50 deletions

View File

@ -211,11 +211,15 @@ StartupDecodingContext(List *output_plugin_options,
/*
* Create a new decoding context, for a new logical slot.
*
* plugin contains the name of the output plugin
* output_plugin_options contains options passed to the output plugin
* read_page, prepare_write, do_write, update_progress
* callbacks that have to be filled to perform the use-case dependent,
* actual, work.
* plugin -- contains the name of the output plugin
* output_plugin_options -- contains options passed to the output plugin
* restart_lsn -- if given as invalid, it's this routine's responsibility to
* mark WAL as reserved by setting a convenient restart_lsn for the slot.
* Otherwise, we set for decoding to start from the given LSN without
* marking WAL reserved beforehand. In that scenario, it's up to the
* caller to guarantee that WAL remains available.
* read_page, prepare_write, do_write, update_progress --
* callbacks that perform the use-case dependent, actual, work.
*
* Needs to be called while in a memory context that's at least as long lived
* as the decoding context because further memory contexts will be created
@ -228,6 +232,7 @@ LogicalDecodingContext *
CreateInitDecodingContext(char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
XLogRecPtr restart_lsn,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
@ -271,7 +276,14 @@ CreateInitDecodingContext(char *plugin,
StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
SpinLockRelease(&slot->mutex);
ReplicationSlotReserveWal();
if (XLogRecPtrIsInvalid(restart_lsn))
ReplicationSlotReserveWal();
else
{
SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = restart_lsn;
SpinLockRelease(&slot->mutex);
}
/* ----
* This is a bit tricky: We need to determine a safe xmin horizon to start
@ -316,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
ReplicationSlotMarkDirty();
ReplicationSlotSave();
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
need_full_snapshot, false,
read_page, prepare_write, do_write,
update_progress);

View File

@ -10,13 +10,12 @@
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/htup_details.h"
#include "access/xlog_internal.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "access/htup_details.h"
#include "replication/decode.h"
#include "replication/slot.h"
#include "replication/logical.h"
@ -35,6 +34,38 @@ check_permissions(void)
(errmsg("must be superuser or replication role to use replication slots"))));
}
/*
* Helper function for creating a new physical replication slot with
* given arguments. Note that this function doesn't release the created
* slot.
*
* If restart_lsn is a valid value, we use it without WAL reservation
* routine. So the caller must guarantee that WAL is available.
*/
static void
create_physical_replication_slot(char *name, bool immediately_reserve,
bool temporary, XLogRecPtr restart_lsn)
{
Assert(!MyReplicationSlot);
/* acquire replication slot, this will check for conflicting names */
ReplicationSlotCreate(name, false,
temporary ? RS_TEMPORARY : RS_PERSISTENT);
if (immediately_reserve)
{
/* Reserve WAL as the user asked for it */
if (XLogRecPtrIsInvalid(restart_lsn))
ReplicationSlotReserveWal();
else
MyReplicationSlot->data.restart_lsn = restart_lsn;
/* Write this slot to disk */
ReplicationSlotMarkDirty();
ReplicationSlotSave();
}
}
/*
* SQL function for creating a new physical (streaming replication)
* replication slot.
@ -51,8 +82,6 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
HeapTuple tuple;
Datum result;
Assert(!MyReplicationSlot);
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
@ -60,29 +89,21 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
CheckSlotRequirements();
/* acquire replication slot, this will check for conflicting names */
ReplicationSlotCreate(NameStr(*name), false,
temporary ? RS_TEMPORARY : RS_PERSISTENT);
create_physical_replication_slot(NameStr(*name),
immediately_reserve,
temporary,
InvalidXLogRecPtr);
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
nulls[0] = false;
if (immediately_reserve)
{
/* Reserve WAL as the user asked for it */
ReplicationSlotReserveWal();
/* Write this slot to disk */
ReplicationSlotMarkDirty();
ReplicationSlotSave();
values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
nulls[1] = false;
}
else
{
nulls[1] = true;
}
tuple = heap_form_tuple(tupdesc, values, nulls);
result = HeapTupleGetDatum(tuple);
@ -94,32 +115,18 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
/*
* SQL function for creating a new logical replication slot.
* Helper function for creating a new logical replication slot with
* given arguments. Note that this function doesn't release the created
* slot.
*/
Datum
pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
static void
create_logical_replication_slot(char *name, char *plugin,
bool temporary, XLogRecPtr restart_lsn)
{
Name name = PG_GETARG_NAME(0);
Name plugin = PG_GETARG_NAME(1);
bool temporary = PG_GETARG_BOOL(2);
LogicalDecodingContext *ctx = NULL;
TupleDesc tupdesc;
HeapTuple tuple;
Datum result;
Datum values[2];
bool nulls[2];
Assert(!MyReplicationSlot);
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
check_permissions();
CheckLogicalDecodingRequirements();
/*
* Acquire a logical decoding slot, this will check for conflicting names.
* Initially create persistent slot as ephemeral - that allows us to
@ -128,25 +135,54 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
* slots can be created as temporary from beginning as they get dropped on
* error as well.
*/
ReplicationSlotCreate(NameStr(*name), true,
ReplicationSlotCreate(name, true,
temporary ? RS_TEMPORARY : RS_EPHEMERAL);
/*
* Create logical decoding context, to build the initial snapshot.
*/
ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
ctx = CreateInitDecodingContext(plugin, NIL,
false, /* do not build snapshot */
restart_lsn,
logical_read_local_xlog_page, NULL, NULL,
NULL);
/* build initial snapshot, might take a while */
DecodingContextFindStartpoint(ctx);
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
/* don't need the decoding context anymore */
FreeDecodingContext(ctx);
}
/*
* SQL function for creating a new logical replication slot.
*/
Datum
pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
{
Name name = PG_GETARG_NAME(0);
Name plugin = PG_GETARG_NAME(1);
bool temporary = PG_GETARG_BOOL(2);
Datum result;
TupleDesc tupdesc;
HeapTuple tuple;
Datum values[2];
bool nulls[2];
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
check_permissions();
CheckLogicalDecodingRequirements();
create_logical_replication_slot(NameStr(*name),
NameStr(*plugin),
temporary,
InvalidXLogRecPtr);
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
memset(nulls, 0, sizeof(nulls));
@ -558,3 +594,235 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
PG_RETURN_DATUM(result);
}
/*
* Helper function of copying a replication slot.
*/
static Datum
copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
{
Name src_name = PG_GETARG_NAME(0);
Name dst_name = PG_GETARG_NAME(1);
ReplicationSlot *src = NULL;
XLogRecPtr src_restart_lsn;
bool src_islogical;
bool temporary;
char *plugin;
Datum values[2];
bool nulls[2];
Datum result;
TupleDesc tupdesc;
HeapTuple tuple;
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
check_permissions();
if (logical_slot)
CheckLogicalDecodingRequirements();
else
CheckSlotRequirements();
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
/*
* We need to prevent the source slot's reserved WAL from being removed,
* but we don't want to lock that slot for very long, and it can advance
* in the meantime. So obtain the source slot's data, and create a new
* slot using its restart_lsn. Afterwards we lock the source slot again
* and verify that the data we copied (name, type) has not changed
* incompatibly. No inconvenient WAL removal can occur once the new slot
* is created -- but since WAL removal could have occurred before we
* managed to create the new slot, we advance the new slot's restart_lsn
* to the source slot's updated restart_lsn the second time we lock it.
*/
for (int i = 0; i < max_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
{
SpinLockAcquire(&s->mutex);
src_islogical = SlotIsLogical(s);
src_restart_lsn = s->data.restart_lsn;
temporary = s->data.persistency == RS_TEMPORARY;
plugin = logical_slot ? pstrdup(NameStr(s->data.plugin)) : NULL;
SpinLockRelease(&s->mutex);
src = s;
break;
}
}
LWLockRelease(ReplicationSlotControlLock);
if (src == NULL)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
/* Check type of replication slot */
if (src_islogical != logical_slot)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
src_islogical ?
errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
NameStr(*src_name)) :
errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
NameStr(*src_name))));
/* Copying non-reserved slot doesn't make sense */
if (XLogRecPtrIsInvalid(src_restart_lsn))
{
Assert(!logical_slot);
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
(errmsg("cannot copy a replication slot that doesn't reserve WAL"))));
}
/* Overwrite params from optional arguments */
if (PG_NARGS() >= 3)
temporary = PG_GETARG_BOOL(2);
if (PG_NARGS() >= 4)
{
Assert(logical_slot);
plugin = NameStr(*(PG_GETARG_NAME(3)));
}
/* Create new slot and acquire it */
if (logical_slot)
create_logical_replication_slot(NameStr(*dst_name),
plugin,
temporary,
src_restart_lsn);
else
create_physical_replication_slot(NameStr(*dst_name),
true,
temporary,
src_restart_lsn);
/*
* Update the destination slot to current values of the source slot;
* recheck that the source slot is still the one we saw previously.
*/
{
TransactionId copy_effective_xmin;
TransactionId copy_effective_catalog_xmin;
TransactionId copy_xmin;
TransactionId copy_catalog_xmin;
XLogRecPtr copy_restart_lsn;
bool copy_islogical;
char *copy_name;
/* Copy data of source slot again */
SpinLockAcquire(&src->mutex);
copy_effective_xmin = src->effective_xmin;
copy_effective_catalog_xmin = src->effective_catalog_xmin;
copy_xmin = src->data.xmin;
copy_catalog_xmin = src->data.catalog_xmin;
copy_restart_lsn = src->data.restart_lsn;
/* for existence check */
copy_name = pstrdup(NameStr(src->data.name));
copy_islogical = SlotIsLogical(src);
SpinLockRelease(&src->mutex);
/*
* Check if the source slot still exists and is valid. We regards it
* as invalid if the type of replication slot or name has been
* changed, or the restart_lsn either is invalid or has gone backward.
* (The restart_lsn could go backwards if the source slot is dropped
* and copied from an older slot during installation.)
*
* Since erroring out will release and drop the destination slot we
* don't need to release it here.
*/
if (copy_restart_lsn < src_restart_lsn ||
src_islogical != copy_islogical ||
strcmp(copy_name, NameStr(*src_name)) != 0)
ereport(ERROR,
(errmsg("could not copy replication slot \"%s\"",
NameStr(*src_name)),
errdetail("The source replication slot was modified incompatibly during the copy operation.")));
/* Install copied values again */
SpinLockAcquire(&MyReplicationSlot->mutex);
MyReplicationSlot->effective_xmin = copy_effective_xmin;
MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
MyReplicationSlot->data.xmin = copy_xmin;
MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
SpinLockRelease(&MyReplicationSlot->mutex);
ReplicationSlotMarkDirty();
ReplicationSlotsComputeRequiredXmin(false);
ReplicationSlotsComputeRequiredLSN();
ReplicationSlotSave();
#ifdef USE_ASSERT_CHECKING
/* Check that the restart_lsn is available */
{
XLogSegNo segno;
XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
Assert(XLogGetLastRemovedSegno() < segno);
}
#endif
}
/* target slot fully created, mark as persistent if needed */
if (logical_slot && !temporary)
ReplicationSlotPersist();
/* All done. Set up the return values */
values[0] = NameGetDatum(dst_name);
nulls[0] = false;
if (!XLogRecPtrIsInvalid(MyReplicationSlot->data.confirmed_flush))
{
values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
nulls[1] = false;
}
else
nulls[1] = true;
tuple = heap_form_tuple(tupdesc, values, nulls);
result = HeapTupleGetDatum(tuple);
ReplicationSlotRelease();
PG_RETURN_DATUM(result);
}
/* The wrappers below are all to appease opr_sanity */
Datum
pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)
{
return copy_replication_slot(fcinfo, true);
}
Datum
pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)
{
return copy_replication_slot(fcinfo, true);
}
Datum
pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)
{
return copy_replication_slot(fcinfo, true);
}
Datum
pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)
{
return copy_replication_slot(fcinfo, false);
}
Datum
pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
{
return copy_replication_slot(fcinfo, false);
}

View File

@ -934,6 +934,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
}
ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
InvalidXLogRecPtr,
logical_read_xlog_page,
WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress);

View File

@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 201904031
#define CATALOG_VERSION_NO 201904051
#endif

View File

@ -9774,6 +9774,20 @@
proargmodes => '{i,i,i,o,o}',
proargnames => '{slot_name,immediately_reserve,temporary,slot_name,lsn}',
prosrc => 'pg_create_physical_replication_slot' },
{ oid => '4220', descr => 'copy a physical replication slot, changing temporality',
proname => 'pg_copy_physical_replication_slot', provolatile => 'v',
proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool',
proallargtypes => '{name,name,bool,name,pg_lsn}',
proargmodes => '{i,i,i,o,o}',
proargnames => '{src_slot_name,dst_slot_name,temporary,slot_name,lsn}',
prosrc => 'pg_copy_physical_replication_slot_a' },
{ oid => '4221', descr => 'copy a physical replication slot',
proname => 'pg_copy_physical_replication_slot', provolatile => 'v',
proparallel => 'u', prorettype => 'record', proargtypes => 'name name',
proallargtypes => '{name,name,name,pg_lsn}',
proargmodes => '{i,i,o,o}',
proargnames => '{src_slot_name,dst_slot_name,slot_name,lsn}',
prosrc => 'pg_copy_physical_replication_slot_b' },
{ oid => '3780', descr => 'drop a replication slot',
proname => 'pg_drop_replication_slot', provolatile => 'v', proparallel => 'u',
prorettype => 'void', proargtypes => 'name',
@ -9794,6 +9808,27 @@
proargmodes => '{i,i,i,o,o}',
proargnames => '{slot_name,plugin,temporary,slot_name,lsn}',
prosrc => 'pg_create_logical_replication_slot' },
{ oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin',
proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool name',
proallargtypes => '{name,name,bool,name,name,pg_lsn}',
proargmodes => '{i,i,i,i,o,o}',
proargnames => '{src_slot_name,dst_slot_name,temporary,plugin,slot_name,lsn}',
prosrc => 'pg_copy_logical_replication_slot_a' },
{ oid => '4223', descr => 'copy a logical replication slot, changing temporality',
proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool',
proallargtypes => '{name,name,bool,name,pg_lsn}',
proargmodes => '{i,i,i,o,o}',
proargnames => '{src_slot_name,dst_slot_name,temporary,slot_name,lsn}',
prosrc => 'pg_copy_logical_replication_slot_b' },
{ oid => '4224', descr => 'copy a logical replication slot',
proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
proparallel => 'u', prorettype => 'record', proargtypes => 'name name',
proallargtypes => '{name,name,name,pg_lsn}',
proargmodes => '{i,i,o,o}',
proargnames => '{src_slot_name,dst_slot_name,slot_name,lsn}',
prosrc => 'pg_copy_logical_replication_slot_c' },
{ oid => '3782', descr => 'get changes from replication slot',
proname => 'pg_logical_slot_get_changes', procost => '1000',
prorows => '1000', provariadic => 'text', proisstrict => 'f',

View File

@ -97,6 +97,7 @@ extern void CheckLogicalDecodingRequirements(void);
extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
XLogRecPtr restart_lsn,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,