diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index e3da7d36250..5adf253583b 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -208,6 +208,8 @@ StartupDecodingContext(List *output_plugin_options, * * plugin -- contains the name of the output plugin * output_plugin_options -- contains options passed to the output plugin + * need_full_snapshot -- if true, must obtain a snapshot able to read all + * tables; if false, one that can read only catalogs is acceptable. * restart_lsn -- if given as invalid, it's this routine's responsibility to * mark WAL as reserved by setting a convenient restart_lsn for the slot. * Otherwise, we set for decoding to start from the given LSN without diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 2c9d5de6d90..beb735d87b6 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -118,10 +118,14 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) * Helper function for creating a new logical replication slot with * given arguments. Note that this function doesn't release the created * slot. + * + * When find_startpoint is false, the slot's confirmed_flush is not set; it's + * caller's responsibility to ensure it's set to something sensible. */ static void create_logical_replication_slot(char *name, char *plugin, - bool temporary, XLogRecPtr restart_lsn) + bool temporary, XLogRecPtr restart_lsn, + bool find_startpoint) { LogicalDecodingContext *ctx = NULL; @@ -139,16 +143,24 @@ create_logical_replication_slot(char *name, char *plugin, temporary ? RS_TEMPORARY : RS_EPHEMERAL); /* - * Create logical decoding context, to build the initial snapshot. + * Create logical decoding context to find start point or, if we don't + * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity. + * + * Note: when !find_startpoint this is still important, because it's at + * this point that the output plugin is validated. */ ctx = CreateInitDecodingContext(plugin, NIL, - false, /* do not build snapshot */ + false, /* just catalogs is OK */ restart_lsn, logical_read_local_xlog_page, NULL, NULL, NULL); - /* build initial snapshot, might take a while */ - DecodingContextFindStartpoint(ctx); + /* + * If caller needs us to determine the decoding start point, do so now. + * This might take a while. + */ + if (find_startpoint) + DecodingContextFindStartpoint(ctx); /* don't need the decoding context anymore */ FreeDecodingContext(ctx); @@ -179,7 +191,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) create_logical_replication_slot(NameStr(*name), NameStr(*plugin), temporary, - InvalidXLogRecPtr); + InvalidXLogRecPtr, + true); values[0] = NameGetDatum(&MyReplicationSlot->data.name); values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush); @@ -683,10 +696,18 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) /* Create new slot and acquire it */ if (logical_slot) + { + /* + * We must not try to read WAL, since we haven't reserved it yet -- + * hence pass find_startpoint false. confirmed_flush will be set + * below, by copying from the source slot. + */ create_logical_replication_slot(NameStr(*dst_name), plugin, temporary, - src_restart_lsn); + src_restart_lsn, + false); + } else create_physical_replication_slot(NameStr(*dst_name), true, @@ -703,6 +724,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) TransactionId copy_xmin; TransactionId copy_catalog_xmin; XLogRecPtr copy_restart_lsn; + XLogRecPtr copy_confirmed_flush; bool copy_islogical; char *copy_name; @@ -714,6 +736,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) copy_xmin = src->data.xmin; copy_catalog_xmin = src->data.catalog_xmin; copy_restart_lsn = src->data.restart_lsn; + copy_confirmed_flush = src->data.confirmed_flush; /* for existence check */ copy_name = pstrdup(NameStr(src->data.name)); @@ -738,6 +761,14 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) NameStr(*src_name)), errdetail("The source replication slot was modified incompatibly during the copy operation."))); + /* The source slot must have a consistent snapshot */ + if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot copy unfinished logical replication slot \"%s\"", + NameStr(*src_name)), + errhint("Retry when the source replication slot's confirmed_flush_lsn is valid."))); + /* Install copied values again */ SpinLockAcquire(&MyReplicationSlot->mutex); MyReplicationSlot->effective_xmin = copy_effective_xmin; @@ -746,6 +777,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) MyReplicationSlot->data.xmin = copy_xmin; MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin; MyReplicationSlot->data.restart_lsn = copy_restart_lsn; + MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush; SpinLockRelease(&MyReplicationSlot->mutex); ReplicationSlotMarkDirty();