mirror of
https://github.com/postgres/postgres.git
synced 2025-07-31 22:04:40 +03:00
Ensure that the sync slots reach a consistent state after promotion without losing data.
We were directly copying the LSN locations while syncing the slots on the standby. Now, it is possible that at some particular restart_lsn there are some running xacts, which means if we start reading the WAL from that location after promotion, we won't reach a consistent snapshot state at that point. However, on the primary, we would have already been in a consistent snapshot state at that restart_lsn, so we would have just serialized the existing snapshot. To avoid this problem, we will use the advance_slot functionality unless the snapshot already exists at the synced restart_lsn location. This will help us to ensure that snapbuilder/slot statuses are updated properly without generating any changes. Note that the synced slots will remain as RS_TEMPORARY until the decoding from the corresponding restart_lsn can reach a consistent snapshot state, after which they will be marked as RS_PERSISTENT. Per buildfarm. Author: Hou Zhijie. Reviewed-by: Bertrand Drouvot, Shveta Malik, Bharath Rupireddy, Amit Kapila. Discussion: https://postgr.es/m/OS0PR01MB5716B3942AE49F3F725ACA92943B2@OS0PR01MB5716.jpnprd01.prod.outlook.com
This commit is contained in:
@ -36,6 +36,7 @@
|
|||||||
#include "replication/decode.h"
|
#include "replication/decode.h"
|
||||||
#include "replication/logical.h"
|
#include "replication/logical.h"
|
||||||
#include "replication/reorderbuffer.h"
|
#include "replication/reorderbuffer.h"
|
||||||
|
#include "replication/slotsync.h"
|
||||||
#include "replication/snapbuild.h"
|
#include "replication/snapbuild.h"
|
||||||
#include "storage/proc.h"
|
#include "storage/proc.h"
|
||||||
#include "storage/procarray.h"
|
#include "storage/procarray.h"
|
||||||
@ -516,17 +517,23 @@ CreateDecodingContext(XLogRecPtr start_lsn,
|
|||||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||||
errmsg("cannot use physical replication slot for logical decoding")));
|
errmsg("cannot use physical replication slot for logical decoding")));
|
||||||
|
|
||||||
if (slot->data.database != MyDatabaseId)
|
/*
|
||||||
|
* We need to access the system tables during decoding to build the
|
||||||
|
* logical changes unless we are in fast_forward mode where no changes are
|
||||||
|
* generated.
|
||||||
|
*/
|
||||||
|
if (slot->data.database != MyDatabaseId && !fast_forward)
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||||
errmsg("replication slot \"%s\" was not created in this database",
|
errmsg("replication slot \"%s\" was not created in this database",
|
||||||
NameStr(slot->data.name))));
|
NameStr(slot->data.name))));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Do not allow consumption of a "synchronized" slot until the standby
|
* The slots being synced from the primary can't be used for decoding as
|
||||||
* gets promoted.
|
* they are used after failover. However, we do allow advancing the LSNs
|
||||||
|
* during the synchronization of slots. See update_local_synced_slot.
|
||||||
*/
|
*/
|
||||||
if (RecoveryInProgress() && slot->data.synced)
|
if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots())
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||||
errmsg("cannot use replication slot \"%s\" for logical decoding",
|
errmsg("cannot use replication slot \"%s\" for logical decoding",
|
||||||
@ -2034,3 +2041,135 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
|
|||||||
|
|
||||||
return has_pending_wal;
|
return has_pending_wal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Helper function for advancing our logical replication slot forward.
|
||||||
|
*
|
||||||
|
* The slot's restart_lsn is used as start point for reading records, while
|
||||||
|
* confirmed_flush is used as base point for the decoding context.
|
||||||
|
*
|
||||||
|
* We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
|
||||||
|
* because we need to digest WAL to advance restart_lsn allowing to recycle
|
||||||
|
* WAL and removal of old catalog tuples. As decoding is done in fast_forward
|
||||||
|
* mode, no changes are generated anyway.
|
||||||
|
*
|
||||||
|
* *found_consistent_snapshot will be true if the initial decoding snapshot has
|
||||||
|
* been built; otherwise, it will be false.
|
||||||
|
*/
|
||||||
|
XLogRecPtr
|
||||||
|
LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
|
||||||
|
bool *found_consistent_snapshot)
|
||||||
|
{
|
||||||
|
LogicalDecodingContext *ctx;
|
||||||
|
ResourceOwner old_resowner = CurrentResourceOwner;
|
||||||
|
XLogRecPtr retlsn;
|
||||||
|
|
||||||
|
Assert(moveto != InvalidXLogRecPtr);
|
||||||
|
|
||||||
|
if (found_consistent_snapshot)
|
||||||
|
*found_consistent_snapshot = false;
|
||||||
|
|
||||||
|
PG_TRY();
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Create our decoding context in fast_forward mode, passing start_lsn
|
||||||
|
* as InvalidXLogRecPtr, so that we start processing from my slot's
|
||||||
|
* confirmed_flush.
|
||||||
|
*/
|
||||||
|
ctx = CreateDecodingContext(InvalidXLogRecPtr,
|
||||||
|
NIL,
|
||||||
|
true, /* fast_forward */
|
||||||
|
XL_ROUTINE(.page_read = read_local_xlog_page,
|
||||||
|
.segment_open = wal_segment_open,
|
||||||
|
.segment_close = wal_segment_close),
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Wait for specified streaming replication standby servers (if any)
|
||||||
|
* to confirm receipt of WAL up to moveto lsn.
|
||||||
|
*/
|
||||||
|
WaitForStandbyConfirmation(moveto);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Start reading at the slot's restart_lsn, which we know to point to
|
||||||
|
* a valid record.
|
||||||
|
*/
|
||||||
|
XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
|
||||||
|
|
||||||
|
/* invalidate non-timetravel entries */
|
||||||
|
InvalidateSystemCaches();
|
||||||
|
|
||||||
|
/* Decode records until we reach the requested target */
|
||||||
|
while (ctx->reader->EndRecPtr < moveto)
|
||||||
|
{
|
||||||
|
char *errm = NULL;
|
||||||
|
XLogRecord *record;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Read records. No changes are generated in fast_forward mode,
|
||||||
|
* but snapbuilder/slot statuses are updated properly.
|
||||||
|
*/
|
||||||
|
record = XLogReadRecord(ctx->reader, &errm);
|
||||||
|
if (errm)
|
||||||
|
elog(ERROR, "could not find record while advancing replication slot: %s",
|
||||||
|
errm);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Process the record. Storage-level changes are ignored in
|
||||||
|
* fast_forward mode, but other modules (such as snapbuilder)
|
||||||
|
* might still have critical updates to do.
|
||||||
|
*/
|
||||||
|
if (record)
|
||||||
|
LogicalDecodingProcessRecord(ctx, ctx->reader);
|
||||||
|
|
||||||
|
CHECK_FOR_INTERRUPTS();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (found_consistent_snapshot && DecodingContextReady(ctx))
|
||||||
|
*found_consistent_snapshot = true;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Logical decoding could have clobbered CurrentResourceOwner during
|
||||||
|
* transaction management, so restore the executor's value. (This is
|
||||||
|
* a kluge, but it's not worth cleaning up right now.)
|
||||||
|
*/
|
||||||
|
CurrentResourceOwner = old_resowner;
|
||||||
|
|
||||||
|
if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
|
||||||
|
{
|
||||||
|
LogicalConfirmReceivedLocation(moveto);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If only the confirmed_flush LSN has changed the slot won't get
|
||||||
|
* marked as dirty by the above. Callers on the walsender
|
||||||
|
* interface are expected to keep track of their own progress and
|
||||||
|
* don't need it written out. But SQL-interface users cannot
|
||||||
|
* specify their own start positions and it's harder for them to
|
||||||
|
* keep track of their progress, so we should make more of an
|
||||||
|
* effort to save it for them.
|
||||||
|
*
|
||||||
|
* Dirty the slot so it is written out at the next checkpoint. The
|
||||||
|
* LSN position advanced to may still be lost on a crash but this
|
||||||
|
* makes the data consistent after a clean shutdown.
|
||||||
|
*/
|
||||||
|
ReplicationSlotMarkDirty();
|
||||||
|
}
|
||||||
|
|
||||||
|
retlsn = MyReplicationSlot->data.confirmed_flush;
|
||||||
|
|
||||||
|
/* free context, call shutdown callback */
|
||||||
|
FreeDecodingContext(ctx);
|
||||||
|
|
||||||
|
InvalidateSystemCaches();
|
||||||
|
}
|
||||||
|
PG_CATCH();
|
||||||
|
{
|
||||||
|
/* clear all timetravel entries */
|
||||||
|
InvalidateSystemCaches();
|
||||||
|
|
||||||
|
PG_RE_THROW();
|
||||||
|
}
|
||||||
|
PG_END_TRY();
|
||||||
|
|
||||||
|
return retlsn;
|
||||||
|
}
|
||||||
|
@ -25,6 +25,15 @@
|
|||||||
* which slot sync worker can perform the sync periodically or user can call
|
* which slot sync worker can perform the sync periodically or user can call
|
||||||
* pg_sync_replication_slots() periodically to perform the syncs.
|
* pg_sync_replication_slots() periodically to perform the syncs.
|
||||||
*
|
*
|
||||||
|
* If synchronized slots fail to build a consistent snapshot from the
|
||||||
|
* restart_lsn before reaching confirmed_flush_lsn, they would become
|
||||||
|
* unreliable after promotion due to potential data loss from changes
|
||||||
|
* before reaching a consistent point. This can happen because the slots can
|
||||||
|
* be synced at some random time and we may not reach the consistent point
|
||||||
|
* at the same WAL location as the primary. So, we mark such slots as
|
||||||
|
* RS_TEMPORARY. Once the decoding from corresponding LSNs can reach a
|
||||||
|
* consistent point, they will be marked as RS_PERSISTENT.
|
||||||
|
*
|
||||||
* The slot sync worker waits for some time before the next synchronization,
|
* The slot sync worker waits for some time before the next synchronization,
|
||||||
* with the duration varying based on whether any slots were updated during
|
* with the duration varying based on whether any slots were updated during
|
||||||
* the last cycle. Refer to the comments above wait_for_slot_activity() for
|
* the last cycle. Refer to the comments above wait_for_slot_activity() for
|
||||||
@ -49,8 +58,9 @@
|
|||||||
#include "postmaster/fork_process.h"
|
#include "postmaster/fork_process.h"
|
||||||
#include "postmaster/interrupt.h"
|
#include "postmaster/interrupt.h"
|
||||||
#include "postmaster/postmaster.h"
|
#include "postmaster/postmaster.h"
|
||||||
#include "replication/slot.h"
|
#include "replication/logical.h"
|
||||||
#include "replication/slotsync.h"
|
#include "replication/slotsync.h"
|
||||||
|
#include "replication/snapbuild.h"
|
||||||
#include "storage/ipc.h"
|
#include "storage/ipc.h"
|
||||||
#include "storage/lmgr.h"
|
#include "storage/lmgr.h"
|
||||||
#include "storage/proc.h"
|
#include "storage/proc.h"
|
||||||
@ -147,50 +157,85 @@ static void slotsync_failure_callback(int code, Datum arg);
|
|||||||
*
|
*
|
||||||
* If no update was needed (the data of the remote slot is the same as the
|
* If no update was needed (the data of the remote slot is the same as the
|
||||||
* local slot) return false, otherwise true.
|
* local slot) return false, otherwise true.
|
||||||
|
*
|
||||||
|
* *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
|
||||||
|
* modified, and decoding from the corresponding LSNs can reach a
|
||||||
|
* consistent snapshot.
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
|
update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
|
||||||
|
bool *found_consistent_snapshot)
|
||||||
{
|
{
|
||||||
ReplicationSlot *slot = MyReplicationSlot;
|
ReplicationSlot *slot = MyReplicationSlot;
|
||||||
bool xmin_changed;
|
bool slot_updated = false;
|
||||||
bool restart_lsn_changed;
|
|
||||||
NameData plugin_name;
|
|
||||||
|
|
||||||
Assert(slot->data.invalidated == RS_INVAL_NONE);
|
Assert(slot->data.invalidated == RS_INVAL_NONE);
|
||||||
|
|
||||||
xmin_changed = (remote_slot->catalog_xmin != slot->data.catalog_xmin);
|
if (found_consistent_snapshot)
|
||||||
restart_lsn_changed = (remote_slot->restart_lsn != slot->data.restart_lsn);
|
*found_consistent_snapshot = false;
|
||||||
|
|
||||||
if (!xmin_changed &&
|
if (remote_slot->confirmed_lsn != slot->data.confirmed_flush ||
|
||||||
!restart_lsn_changed &&
|
remote_slot->restart_lsn != slot->data.restart_lsn ||
|
||||||
remote_dbid == slot->data.database &&
|
remote_slot->catalog_xmin != slot->data.catalog_xmin)
|
||||||
remote_slot->two_phase == slot->data.two_phase &&
|
{
|
||||||
remote_slot->failover == slot->data.failover &&
|
/*
|
||||||
remote_slot->confirmed_lsn == slot->data.confirmed_flush &&
|
* We can't directly copy the remote slot's LSN or xmin unless there
|
||||||
strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0)
|
* exists a consistent snapshot at that point. Otherwise, after
|
||||||
return false;
|
* promotion, the slots may not reach a consistent point before the
|
||||||
|
* confirmed_flush_lsn, which can lead to data loss. To avoid data
|
||||||
|
* loss, we let slot machinery advance the slot which ensures that
|
||||||
|
* snapbuilder/slot statuses are updated properly.
|
||||||
|
*/
|
||||||
|
if (SnapBuildSnapshotExists(remote_slot->restart_lsn))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Update the slot info directly if there is a serialized snapshot
|
||||||
|
* at the restart_lsn, as the slot can quickly reach consistency
|
||||||
|
* at restart_lsn by restoring the snapshot.
|
||||||
|
*/
|
||||||
|
SpinLockAcquire(&slot->mutex);
|
||||||
|
slot->data.restart_lsn = remote_slot->restart_lsn;
|
||||||
|
slot->data.confirmed_flush = remote_slot->confirmed_lsn;
|
||||||
|
slot->data.catalog_xmin = remote_slot->catalog_xmin;
|
||||||
|
slot->effective_catalog_xmin = remote_slot->catalog_xmin;
|
||||||
|
SpinLockRelease(&slot->mutex);
|
||||||
|
|
||||||
/* Avoid expensive operations while holding a spinlock. */
|
if (found_consistent_snapshot)
|
||||||
namestrcpy(&plugin_name, remote_slot->plugin);
|
*found_consistent_snapshot = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
|
||||||
|
found_consistent_snapshot);
|
||||||
|
}
|
||||||
|
|
||||||
SpinLockAcquire(&slot->mutex);
|
|
||||||
slot->data.plugin = plugin_name;
|
|
||||||
slot->data.database = remote_dbid;
|
|
||||||
slot->data.two_phase = remote_slot->two_phase;
|
|
||||||
slot->data.failover = remote_slot->failover;
|
|
||||||
slot->data.restart_lsn = remote_slot->restart_lsn;
|
|
||||||
slot->data.confirmed_flush = remote_slot->confirmed_lsn;
|
|
||||||
slot->data.catalog_xmin = remote_slot->catalog_xmin;
|
|
||||||
slot->effective_catalog_xmin = remote_slot->catalog_xmin;
|
|
||||||
SpinLockRelease(&slot->mutex);
|
|
||||||
|
|
||||||
if (xmin_changed)
|
|
||||||
ReplicationSlotsComputeRequiredXmin(false);
|
ReplicationSlotsComputeRequiredXmin(false);
|
||||||
|
|
||||||
if (restart_lsn_changed)
|
|
||||||
ReplicationSlotsComputeRequiredLSN();
|
ReplicationSlotsComputeRequiredLSN();
|
||||||
|
|
||||||
return true;
|
slot_updated = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (remote_dbid != slot->data.database ||
|
||||||
|
remote_slot->two_phase != slot->data.two_phase ||
|
||||||
|
remote_slot->failover != slot->data.failover ||
|
||||||
|
strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0)
|
||||||
|
{
|
||||||
|
NameData plugin_name;
|
||||||
|
|
||||||
|
/* Avoid expensive operations while holding a spinlock. */
|
||||||
|
namestrcpy(&plugin_name, remote_slot->plugin);
|
||||||
|
|
||||||
|
SpinLockAcquire(&slot->mutex);
|
||||||
|
slot->data.plugin = plugin_name;
|
||||||
|
slot->data.database = remote_dbid;
|
||||||
|
slot->data.two_phase = remote_slot->two_phase;
|
||||||
|
slot->data.failover = remote_slot->failover;
|
||||||
|
SpinLockRelease(&slot->mutex);
|
||||||
|
|
||||||
|
slot_updated = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return slot_updated;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -413,6 +458,7 @@ static bool
|
|||||||
update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
|
update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
|
||||||
{
|
{
|
||||||
ReplicationSlot *slot = MyReplicationSlot;
|
ReplicationSlot *slot = MyReplicationSlot;
|
||||||
|
bool found_consistent_snapshot = false;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Check if the primary server has caught up. Refer to the comment atop
|
* Check if the primary server has caught up. Refer to the comment atop
|
||||||
@ -443,9 +489,22 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* First time slot update, the function must return true */
|
(void) update_local_synced_slot(remote_slot, remote_dbid,
|
||||||
if (!update_local_synced_slot(remote_slot, remote_dbid))
|
&found_consistent_snapshot);
|
||||||
elog(ERROR, "failed to update slot");
|
|
||||||
|
/*
|
||||||
|
* Don't persist the slot if it cannot reach the consistent point from the
|
||||||
|
* restart_lsn. See comments atop this file.
|
||||||
|
*/
|
||||||
|
if (!found_consistent_snapshot)
|
||||||
|
{
|
||||||
|
ereport(LOG,
|
||||||
|
errmsg("could not sync slot \"%s\"", remote_slot->name),
|
||||||
|
errdetail("Logical decoding cannot find consistent point from local slot's LSN %X/%X.",
|
||||||
|
LSN_FORMAT_ARGS(slot->data.restart_lsn)));
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
ReplicationSlotPersist();
|
ReplicationSlotPersist();
|
||||||
|
|
||||||
@ -578,7 +637,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
|
|||||||
LSN_FORMAT_ARGS(remote_slot->restart_lsn));
|
LSN_FORMAT_ARGS(remote_slot->restart_lsn));
|
||||||
|
|
||||||
/* Make sure the slot changes persist across server restart */
|
/* Make sure the slot changes persist across server restart */
|
||||||
if (update_local_synced_slot(remote_slot, remote_dbid))
|
if (update_local_synced_slot(remote_slot, remote_dbid, NULL))
|
||||||
{
|
{
|
||||||
ReplicationSlotMarkDirty();
|
ReplicationSlotMarkDirty();
|
||||||
ReplicationSlotSave();
|
ReplicationSlotSave();
|
||||||
|
@ -2134,3 +2134,26 @@ CheckPointSnapBuild(void)
|
|||||||
}
|
}
|
||||||
FreeDir(snap_dir);
|
FreeDir(snap_dir);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Check if a logical snapshot at the specified point has been serialized.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
SnapBuildSnapshotExists(XLogRecPtr lsn)
|
||||||
|
{
|
||||||
|
char path[MAXPGPATH];
|
||||||
|
int ret;
|
||||||
|
struct stat stat_buf;
|
||||||
|
|
||||||
|
sprintf(path, "pg_logical/snapshots/%X-%X.snap",
|
||||||
|
LSN_FORMAT_ARGS(lsn));
|
||||||
|
|
||||||
|
ret = stat(path, &stat_buf);
|
||||||
|
|
||||||
|
if (ret != 0 && errno != ENOENT)
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode_for_file_access(),
|
||||||
|
errmsg("could not stat file \"%s\": %m", path)));
|
||||||
|
|
||||||
|
return ret == 0;
|
||||||
|
}
|
||||||
|
@ -492,125 +492,13 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Helper function for advancing our logical replication slot forward.
|
* Advance our logical replication slot forward. See
|
||||||
*
|
* LogicalSlotAdvanceAndCheckSnapState for details.
|
||||||
* The slot's restart_lsn is used as start point for reading records, while
|
|
||||||
* confirmed_flush is used as base point for the decoding context.
|
|
||||||
*
|
|
||||||
* We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
|
|
||||||
* because we need to digest WAL to advance restart_lsn allowing to recycle
|
|
||||||
* WAL and removal of old catalog tuples. As decoding is done in fast_forward
|
|
||||||
* mode, no changes are generated anyway.
|
|
||||||
*/
|
*/
|
||||||
static XLogRecPtr
|
static XLogRecPtr
|
||||||
pg_logical_replication_slot_advance(XLogRecPtr moveto)
|
pg_logical_replication_slot_advance(XLogRecPtr moveto)
|
||||||
{
|
{
|
||||||
LogicalDecodingContext *ctx;
|
return LogicalSlotAdvanceAndCheckSnapState(moveto, NULL);
|
||||||
ResourceOwner old_resowner = CurrentResourceOwner;
|
|
||||||
XLogRecPtr retlsn;
|
|
||||||
|
|
||||||
Assert(moveto != InvalidXLogRecPtr);
|
|
||||||
|
|
||||||
PG_TRY();
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* Create our decoding context in fast_forward mode, passing start_lsn
|
|
||||||
* as InvalidXLogRecPtr, so that we start processing from my slot's
|
|
||||||
* confirmed_flush.
|
|
||||||
*/
|
|
||||||
ctx = CreateDecodingContext(InvalidXLogRecPtr,
|
|
||||||
NIL,
|
|
||||||
true, /* fast_forward */
|
|
||||||
XL_ROUTINE(.page_read = read_local_xlog_page,
|
|
||||||
.segment_open = wal_segment_open,
|
|
||||||
.segment_close = wal_segment_close),
|
|
||||||
NULL, NULL, NULL);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Wait for specified streaming replication standby servers (if any)
|
|
||||||
* to confirm receipt of WAL up to moveto lsn.
|
|
||||||
*/
|
|
||||||
WaitForStandbyConfirmation(moveto);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Start reading at the slot's restart_lsn, which we know to point to
|
|
||||||
* a valid record.
|
|
||||||
*/
|
|
||||||
XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
|
|
||||||
|
|
||||||
/* invalidate non-timetravel entries */
|
|
||||||
InvalidateSystemCaches();
|
|
||||||
|
|
||||||
/* Decode records until we reach the requested target */
|
|
||||||
while (ctx->reader->EndRecPtr < moveto)
|
|
||||||
{
|
|
||||||
char *errm = NULL;
|
|
||||||
XLogRecord *record;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Read records. No changes are generated in fast_forward mode,
|
|
||||||
* but snapbuilder/slot statuses are updated properly.
|
|
||||||
*/
|
|
||||||
record = XLogReadRecord(ctx->reader, &errm);
|
|
||||||
if (errm)
|
|
||||||
elog(ERROR, "could not find record while advancing replication slot: %s",
|
|
||||||
errm);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Process the record. Storage-level changes are ignored in
|
|
||||||
* fast_forward mode, but other modules (such as snapbuilder)
|
|
||||||
* might still have critical updates to do.
|
|
||||||
*/
|
|
||||||
if (record)
|
|
||||||
LogicalDecodingProcessRecord(ctx, ctx->reader);
|
|
||||||
|
|
||||||
CHECK_FOR_INTERRUPTS();
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Logical decoding could have clobbered CurrentResourceOwner during
|
|
||||||
* transaction management, so restore the executor's value. (This is
|
|
||||||
* a kluge, but it's not worth cleaning up right now.)
|
|
||||||
*/
|
|
||||||
CurrentResourceOwner = old_resowner;
|
|
||||||
|
|
||||||
if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
|
|
||||||
{
|
|
||||||
LogicalConfirmReceivedLocation(moveto);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If only the confirmed_flush LSN has changed the slot won't get
|
|
||||||
* marked as dirty by the above. Callers on the walsender
|
|
||||||
* interface are expected to keep track of their own progress and
|
|
||||||
* don't need it written out. But SQL-interface users cannot
|
|
||||||
* specify their own start positions and it's harder for them to
|
|
||||||
* keep track of their progress, so we should make more of an
|
|
||||||
* effort to save it for them.
|
|
||||||
*
|
|
||||||
* Dirty the slot so it is written out at the next checkpoint. The
|
|
||||||
* LSN position advanced to may still be lost on a crash but this
|
|
||||||
* makes the data consistent after a clean shutdown.
|
|
||||||
*/
|
|
||||||
ReplicationSlotMarkDirty();
|
|
||||||
}
|
|
||||||
|
|
||||||
retlsn = MyReplicationSlot->data.confirmed_flush;
|
|
||||||
|
|
||||||
/* free context, call shutdown callback */
|
|
||||||
FreeDecodingContext(ctx);
|
|
||||||
|
|
||||||
InvalidateSystemCaches();
|
|
||||||
}
|
|
||||||
PG_CATCH();
|
|
||||||
{
|
|
||||||
/* clear all timetravel entries */
|
|
||||||
InvalidateSystemCaches();
|
|
||||||
|
|
||||||
PG_RE_THROW();
|
|
||||||
}
|
|
||||||
PG_END_TRY();
|
|
||||||
|
|
||||||
return retlsn;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -149,5 +149,7 @@ extern void ResetLogicalStreamingState(void);
|
|||||||
extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
|
extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
|
||||||
|
|
||||||
extern bool LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal);
|
extern bool LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal);
|
||||||
|
extern XLogRecPtr LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
|
||||||
|
bool *found_consistent_snapshot);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -91,4 +91,6 @@ extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn,
|
|||||||
struct xl_running_xacts *running);
|
struct xl_running_xacts *running);
|
||||||
extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn);
|
extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn);
|
||||||
|
|
||||||
|
extern bool SnapBuildSnapshotExists(XLogRecPtr lsn);
|
||||||
|
|
||||||
#endif /* SNAPBUILD_H */
|
#endif /* SNAPBUILD_H */
|
||||||
|
@ -365,6 +365,68 @@ ok( $stderr =~
|
|||||||
|
|
||||||
$cascading_standby->stop;
|
$cascading_standby->stop;
|
||||||
|
|
||||||
|
##################################################
|
||||||
|
# Create a failover slot and advance the restart_lsn to a position where a
|
||||||
|
# running transaction exists. This setup is for testing that the synced slots
|
||||||
|
# can achieve the consistent snapshot state starting from the restart_lsn
|
||||||
|
# after promotion without losing any data that otherwise would have been
|
||||||
|
# received from the primary.
|
||||||
|
##################################################
|
||||||
|
|
||||||
|
$primary->safe_psql('postgres',
|
||||||
|
"SELECT pg_create_logical_replication_slot('snap_test_slot', 'test_decoding', false, false, true);"
|
||||||
|
);
|
||||||
|
|
||||||
|
$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
|
||||||
|
|
||||||
|
# Two xl_running_xacts logs are generated here. When decoding the first log, it
|
||||||
|
# only serializes the snapshot, without advancing the restart_lsn to the latest
|
||||||
|
# position. This is because if a transaction is running, the restart_lsn can
|
||||||
|
# only move to a position before that transaction. Hence, the second
|
||||||
|
# xl_running_xacts log is needed, the decoding for which allows the restart_lsn
|
||||||
|
# to advance to the last serialized snapshot's position (the first log).
|
||||||
|
$primary->safe_psql(
|
||||||
|
'postgres', qq(
|
||||||
|
BEGIN;
|
||||||
|
SELECT txid_current();
|
||||||
|
SELECT pg_log_standby_snapshot();
|
||||||
|
COMMIT;
|
||||||
|
BEGIN;
|
||||||
|
SELECT txid_current();
|
||||||
|
SELECT pg_log_standby_snapshot();
|
||||||
|
COMMIT;
|
||||||
|
));
|
||||||
|
|
||||||
|
$primary->wait_for_replay_catchup($standby1);
|
||||||
|
|
||||||
|
# Advance the restart_lsn to the position of the first xl_running_xacts log
|
||||||
|
# generated above. Note that there might be concurrent xl_running_xacts logs
|
||||||
|
# written by the bgwriter, which could cause the position to be advanced to an
|
||||||
|
# unexpected point, but that would be a rare scenario and doesn't affect the
|
||||||
|
# test results.
|
||||||
|
$primary->safe_psql('postgres',
|
||||||
|
"SELECT pg_replication_slot_advance('snap_test_slot', pg_current_wal_lsn());"
|
||||||
|
);
|
||||||
|
|
||||||
|
# Log a message that will be consumed on the standby after promotion using the
|
||||||
|
# synced slot. See the test where we promote standby (Promote the standby1 to
|
||||||
|
# primary.)
|
||||||
|
$primary->safe_psql('postgres',
|
||||||
|
"SELECT pg_logical_emit_message(false, 'test', 'test');"
|
||||||
|
);
|
||||||
|
|
||||||
|
# Get the confirmed_flush_lsn for the logical slot snap_test_slot on the primary
|
||||||
|
my $confirmed_flush_lsn = $primary->safe_psql('postgres',
|
||||||
|
"SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'snap_test_slot';");
|
||||||
|
|
||||||
|
$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
|
||||||
|
|
||||||
|
# Verify that confirmed_flush_lsn of snap_test_slot slot is synced to the standby
|
||||||
|
ok( $standby1->poll_query_until(
|
||||||
|
'postgres',
|
||||||
|
"SELECT '$confirmed_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'snap_test_slot' AND synced AND NOT temporary;"),
|
||||||
|
'confirmed_flush_lsn of slot snap_test_slot synced to standby');
|
||||||
|
|
||||||
##################################################
|
##################################################
|
||||||
# Test to confirm that the slot synchronization is protected from malicious
|
# Test to confirm that the slot synchronization is protected from malicious
|
||||||
# users.
|
# users.
|
||||||
@ -458,8 +520,8 @@ $standby1->wait_for_log(qr/slot sync worker started/,
|
|||||||
$log_offset);
|
$log_offset);
|
||||||
|
|
||||||
##################################################
|
##################################################
|
||||||
# Test to confirm that restart_lsn and confirmed_flush_lsn of the logical slot
|
# Test to confirm that confirmed_flush_lsn of the logical slot on the primary
|
||||||
# on the primary is synced to the standby via the slot sync worker.
|
# is synced to the standby via the slot sync worker.
|
||||||
##################################################
|
##################################################
|
||||||
|
|
||||||
# Insert data on the primary
|
# Insert data on the primary
|
||||||
@ -479,8 +541,8 @@ $subscriber1->safe_psql(
|
|||||||
|
|
||||||
$subscriber1->wait_for_subscription_sync;
|
$subscriber1->wait_for_subscription_sync;
|
||||||
|
|
||||||
# Do not allow any further advancement of the restart_lsn and
|
# Do not allow any further advancement of the confirmed_flush_lsn for the
|
||||||
# confirmed_flush_lsn for the lsub1_slot.
|
# lsub1_slot.
|
||||||
$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 DISABLE");
|
$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 DISABLE");
|
||||||
|
|
||||||
# Wait for the replication slot to become inactive on the publisher
|
# Wait for the replication slot to become inactive on the publisher
|
||||||
@ -489,20 +551,15 @@ $primary->poll_query_until(
|
|||||||
"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'",
|
"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'",
|
||||||
1);
|
1);
|
||||||
|
|
||||||
# Get the restart_lsn for the logical slot lsub1_slot on the primary
|
|
||||||
my $primary_restart_lsn = $primary->safe_psql('postgres',
|
|
||||||
"SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';");
|
|
||||||
|
|
||||||
# Get the confirmed_flush_lsn for the logical slot lsub1_slot on the primary
|
# Get the confirmed_flush_lsn for the logical slot lsub1_slot on the primary
|
||||||
my $primary_flush_lsn = $primary->safe_psql('postgres',
|
my $primary_flush_lsn = $primary->safe_psql('postgres',
|
||||||
"SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';");
|
"SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';");
|
||||||
|
|
||||||
# Confirm that restart_lsn and confirmed_flush_lsn of lsub1_slot slot are synced
|
# Confirm that confirmed_flush_lsn of lsub1_slot slot is synced to the standby
|
||||||
# to the standby
|
|
||||||
ok( $standby1->poll_query_until(
|
ok( $standby1->poll_query_until(
|
||||||
'postgres',
|
'postgres',
|
||||||
"SELECT '$primary_restart_lsn' = restart_lsn AND '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"),
|
"SELECT '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"),
|
||||||
'restart_lsn and confirmed_flush_lsn of slot lsub1_slot synced to standby');
|
'confirmed_flush_lsn of slot lsub1_slot synced to standby');
|
||||||
|
|
||||||
##################################################
|
##################################################
|
||||||
# Test that logical failover replication slots wait for the specified
|
# Test that logical failover replication slots wait for the specified
|
||||||
@ -744,8 +801,9 @@ $primary->reload;
|
|||||||
|
|
||||||
##################################################
|
##################################################
|
||||||
# Promote the standby1 to primary. Confirm that:
|
# Promote the standby1 to primary. Confirm that:
|
||||||
# a) the slot 'lsub1_slot' is retained on the new primary
|
# a) the slots 'lsub1_slot' and 'snap_test_slot' are retained on the new primary
|
||||||
# b) logical replication for regress_mysub1 is resumed successfully after failover
|
# b) logical replication for regress_mysub1 is resumed successfully after failover
|
||||||
|
# c) changes can be consumed from the synced slot 'snap_test_slot'
|
||||||
##################################################
|
##################################################
|
||||||
$standby1->start;
|
$standby1->start;
|
||||||
$primary->wait_for_replay_catchup($standby1);
|
$primary->wait_for_replay_catchup($standby1);
|
||||||
@ -759,8 +817,8 @@ $subscriber1->safe_psql('postgres',
|
|||||||
|
|
||||||
# Confirm the synced slot 'lsub1_slot' is retained on the new primary
|
# Confirm the synced slot 'lsub1_slot' is retained on the new primary
|
||||||
is($standby1->safe_psql('postgres',
|
is($standby1->safe_psql('postgres',
|
||||||
q{SELECT slot_name FROM pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;}),
|
q{SELECT count(*) = 2 FROM pg_replication_slots WHERE slot_name IN ('lsub1_slot', 'snap_test_slot') AND synced AND NOT temporary;}),
|
||||||
'lsub1_slot',
|
't',
|
||||||
'synced slot retained on the new primary');
|
'synced slot retained on the new primary');
|
||||||
|
|
||||||
# Insert data on the new primary
|
# Insert data on the new primary
|
||||||
@ -773,4 +831,13 @@ is( $subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}),
|
|||||||
"20",
|
"20",
|
||||||
'data replicated from the new primary');
|
'data replicated from the new primary');
|
||||||
|
|
||||||
|
# Consume the data from the snap_test_slot. The synced slot should reach a
|
||||||
|
# consistent point by restoring the snapshot at the restart_lsn serialized
|
||||||
|
# during slot synchronization.
|
||||||
|
$result = $standby1->safe_psql('postgres',
|
||||||
|
"SELECT count(*) FROM pg_logical_slot_get_changes('snap_test_slot', NULL, NULL) WHERE data ~ 'message*';"
|
||||||
|
);
|
||||||
|
|
||||||
|
is($result, '1', "data can be consumed using snap_test_slot");
|
||||||
|
|
||||||
done_testing();
|
done_testing();
|
||||||
|
Reference in New Issue
Block a user