1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-09 06:21:09 +03:00

Preserve conflict-relevant data during logical replication.

Logical replication requires reliable conflict detection to maintain data
consistency across nodes. To achieve this, we must prevent premature
removal of tuples deleted by other origins and their associated commit_ts
data by VACUUM, which could otherwise lead to incorrect conflict reporting
and resolution.

This patch introduces a mechanism to retain deleted tuples on the
subscriber during the application of concurrent transactions from remote
nodes. Retaining these tuples allows us to correctly ignore concurrent
updates to the same tuple. Without this, an UPDATE might be misinterpreted
as an INSERT during resolutions due to the absence of the original tuple.

Additionally, we ensure that origin metadata is not prematurely removed by
vacuum freeze, which is essential for detecting update_origin_differs and
delete_origin_differs conflicts.

To support this, a new replication slot named pg_conflict_detection is
created and maintained by the launcher on the subscriber. Each apply
worker tracks its own non-removable transaction ID, which the launcher
aggregates to determine the appropriate xmin for the slot, thereby
retaining necessary tuples.

Conflict information retention (deleted tuples and commit_ts) can be
enabled per subscription via the retain_conflict_info option. This is
disabled by default to avoid unnecessary overhead for configurations that
do not require conflict resolution or logging.

During upgrades, if any subscription on the old cluster has
retain_conflict_info enabled, a conflict detection slot will be created to
protect relevant tuples from deletion when the new cluster starts.

This is a foundational work to correctly detect update_deleted conflict
which will be done in a follow-up patch.

Author: Zhijie Hou <houzj.fnst@fujitsu.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Nisha Moond <nisha.moond412@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/OS0PR01MB5716BE80DAEB0EE2A6A5D1F5949D2@OS0PR01MB5716.jpnprd01.prod.outlook.com
This commit is contained in:
Amit Kapila
2025-07-23 02:56:00 +00:00
parent 039f7ee0fe
commit 228c370868
45 changed files with 2234 additions and 221 deletions

View File

@@ -441,7 +441,8 @@ pa_launch_parallel_worker(void)
MySubscription->name,
MyLogicalRepWorker->userid,
InvalidOid,
dsm_segment_handle(winfo->dsm_seg));
dsm_segment_handle(winfo->dsm_seg),
false);
if (launched)
{

View File

@@ -32,6 +32,7 @@
#include "postmaster/interrupt.h"
#include "replication/logicallauncher.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
@@ -91,7 +92,6 @@ static dshash_table *last_start_times = NULL;
static bool on_commit_launcher_wakeup = false;
static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
@@ -100,6 +100,9 @@ static int logicalrep_pa_worker_count(Oid subid);
static void logicalrep_launcher_attach_dshmem(void);
static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
static bool acquire_conflict_slot_if_exists(void);
static void advance_conflict_slot_xmin(TransactionId new_xmin);
/*
@@ -148,6 +151,7 @@ get_subscription_list(void)
sub->owner = subform->subowner;
sub->enabled = subform->subenabled;
sub->name = pstrdup(NameStr(subform->subname));
sub->retaindeadtuples = subform->subretaindeadtuples;
/* We don't fill fields we are not interested in. */
res = lappend(res, sub);
@@ -309,7 +313,8 @@ logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
bool
logicalrep_worker_launch(LogicalRepWorkerType wtype,
Oid dbid, Oid subid, const char *subname, Oid userid,
Oid relid, dsm_handle subworker_dsm)
Oid relid, dsm_handle subworker_dsm,
bool retain_dead_tuples)
{
BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
@@ -328,10 +333,13 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
* - must be valid worker type
* - tablesync workers are only ones to have relid
* - parallel apply worker is the only kind of subworker
* - The replication slot used in conflict detection is created when
* retain_dead_tuples is enabled
*/
Assert(wtype != WORKERTYPE_UNKNOWN);
Assert(is_tablesync_worker == OidIsValid(relid));
Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
Assert(!retain_dead_tuples || MyReplicationSlot);
ereport(DEBUG1,
(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -454,6 +462,9 @@ retry:
worker->stream_fileset = NULL;
worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
worker->parallel_apply = is_parallel_apply_worker;
worker->oldest_nonremovable_xid = retain_dead_tuples
? MyReplicationSlot->data.xmin
: InvalidTransactionId;
worker->last_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->last_send_time);
TIMESTAMP_NOBEGIN(worker->last_recv_time);
@@ -1118,7 +1129,10 @@ ApplyLauncherWakeupAtCommit(void)
on_commit_launcher_wakeup = true;
}
static void
/*
* Wakeup the launcher immediately.
*/
void
ApplyLauncherWakeup(void)
{
if (LogicalRepCtx->launcher_pid != 0)
@@ -1150,6 +1164,12 @@ ApplyLauncherMain(Datum main_arg)
*/
BackgroundWorkerInitializeConnection(NULL, NULL, 0);
/*
* Acquire the conflict detection slot at startup to ensure it can be
* dropped if no longer needed after a restart.
*/
acquire_conflict_slot_if_exists();
/* Enter main loop */
for (;;)
{
@@ -1159,6 +1179,9 @@ ApplyLauncherMain(Datum main_arg)
MemoryContext subctx;
MemoryContext oldctx;
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
bool can_advance_xmin = true;
bool retain_dead_tuples = false;
TransactionId xmin = InvalidTransactionId;
CHECK_FOR_INTERRUPTS();
@@ -1168,7 +1191,14 @@ ApplyLauncherMain(Datum main_arg)
ALLOCSET_DEFAULT_SIZES);
oldctx = MemoryContextSwitchTo(subctx);
/* Start any missing workers for enabled subscriptions. */
/*
* Start any missing workers for enabled subscriptions.
*
* Also, during the iteration through all subscriptions, we compute
* the minimum XID required to protect deleted tuples for conflict
* detection if one of the subscription enables retain_dead_tuples
* option.
*/
sublist = get_subscription_list();
foreach(lc, sublist)
{
@@ -1178,6 +1208,38 @@ ApplyLauncherMain(Datum main_arg)
TimestampTz now;
long elapsed;
if (sub->retaindeadtuples)
{
retain_dead_tuples = true;
/*
* Can't advance xmin of the slot unless all the subscriptions
* with retain_dead_tuples are enabled. This is required to
* ensure that we don't advance the xmin of
* CONFLICT_DETECTION_SLOT if one of the subscriptions is not
* enabled. Otherwise, we won't be able to detect conflicts
* reliably for such a subscription even though it has set the
* retain_dead_tuples option.
*/
can_advance_xmin &= sub->enabled;
/*
* Create a replication slot to retain information necessary
* for conflict detection such as dead tuples, commit
* timestamps, and origins.
*
* The slot is created before starting the apply worker to
* prevent it from unnecessarily maintaining its
* oldest_nonremovable_xid.
*
* The slot is created even for a disabled subscription to
* ensure that conflict-related information is available when
* applying remote changes that occurred before the
* subscription was enabled.
*/
CreateConflictDetectionSlot();
}
if (!sub->enabled)
continue;
@@ -1186,7 +1248,27 @@ ApplyLauncherMain(Datum main_arg)
LWLockRelease(LogicalRepWorkerLock);
if (w != NULL)
continue; /* worker is running already */
{
/*
* Compute the minimum xmin required to protect dead tuples
* required for conflict detection among all running apply
* workers that enables retain_dead_tuples.
*/
if (sub->retaindeadtuples && can_advance_xmin)
compute_min_nonremovable_xid(w, &xmin);
/* worker is running already */
continue;
}
/*
* Can't advance xmin of the slot unless all the workers
* corresponding to subscriptions with retain_dead_tuples are
* running, disabling the further computation of the minimum
* nonremovable xid.
*/
if (sub->retaindeadtuples)
can_advance_xmin = false;
/*
* If the worker is eligible to start now, launch it. Otherwise,
@@ -1210,7 +1292,8 @@ ApplyLauncherMain(Datum main_arg)
if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
sub->dbid, sub->oid, sub->name,
sub->owner, InvalidOid,
DSM_HANDLE_INVALID))
DSM_HANDLE_INVALID,
sub->retaindeadtuples))
{
/*
* We get here either if we failed to launch a worker
@@ -1230,6 +1313,20 @@ ApplyLauncherMain(Datum main_arg)
}
}
/*
* Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
* that requires us to retain dead tuples. Otherwise, if required,
* advance the slot's xmin to protect dead tuples required for the
* conflict detection.
*/
if (MyReplicationSlot)
{
if (!retain_dead_tuples)
ReplicationSlotDropAcquired();
else if (can_advance_xmin)
advance_conflict_slot_xmin(xmin);
}
/* Switch back to original memory context. */
MemoryContextSwitchTo(oldctx);
/* Clean the temporary memory. */
@@ -1257,6 +1354,125 @@ ApplyLauncherMain(Datum main_arg)
/* Not reachable */
}
/*
* Determine the minimum non-removable transaction ID across all apply workers
* for subscriptions that have retain_dead_tuples enabled. Store the result
* in *xmin.
*/
static void
compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
{
TransactionId nonremovable_xid;
Assert(worker != NULL);
/*
* The replication slot for conflict detection must be created before the
* worker starts.
*/
Assert(MyReplicationSlot);
SpinLockAcquire(&worker->relmutex);
nonremovable_xid = worker->oldest_nonremovable_xid;
SpinLockRelease(&worker->relmutex);
Assert(TransactionIdIsValid(nonremovable_xid));
if (!TransactionIdIsValid(*xmin) ||
TransactionIdPrecedes(nonremovable_xid, *xmin))
*xmin = nonremovable_xid;
}
/*
* Acquire the replication slot used to retain information for conflict
* detection, if it exists.
*
* Return true if successfully acquired, otherwise return false.
*/
static bool
acquire_conflict_slot_if_exists(void)
{
if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
return false;
ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
return true;
}
/*
* Advance the xmin the replication slot used to retain information required
* for conflict detection.
*/
static void
advance_conflict_slot_xmin(TransactionId new_xmin)
{
Assert(MyReplicationSlot);
Assert(TransactionIdIsValid(new_xmin));
Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
/* Return if the xmin value of the slot cannot be advanced */
if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
return;
SpinLockAcquire(&MyReplicationSlot->mutex);
MyReplicationSlot->effective_xmin = new_xmin;
MyReplicationSlot->data.xmin = new_xmin;
SpinLockRelease(&MyReplicationSlot->mutex);
elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
ReplicationSlotMarkDirty();
ReplicationSlotsComputeRequiredXmin(false);
/*
* Like PhysicalConfirmReceivedLocation(), do not save slot information
* each time. This is acceptable because all concurrent transactions on
* the publisher that require the data preceding the slot's xmin should
* have already been applied and flushed on the subscriber before the xmin
* is advanced. So, even if the slot's xmin regresses after a restart, it
* will be advanced again in the next cycle. Therefore, no data required
* for conflict detection will be prematurely removed.
*/
return;
}
/*
* Create and acquire the replication slot used to retain information for
* conflict detection, if not yet.
*/
void
CreateConflictDetectionSlot(void)
{
TransactionId xmin_horizon;
/* Exit early, if the replication slot is already created and acquired */
if (MyReplicationSlot)
return;
ereport(LOG,
errmsg("creating replication conflict detection slot"));
ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
false, false);
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
xmin_horizon = GetOldestSafeDecodingTransactionId(false);
SpinLockAcquire(&MyReplicationSlot->mutex);
MyReplicationSlot->effective_xmin = xmin_horizon;
MyReplicationSlot->data.xmin = xmin_horizon;
SpinLockRelease(&MyReplicationSlot->mutex);
ReplicationSlotsComputeRequiredXmin(true);
LWLockRelease(ProcArrayLock);
/* Write this slot to disk */
ReplicationSlotMarkDirty();
ReplicationSlotSave();
}
/*
* Is current process the logical replication launcher?
*/

View File

@@ -4917,7 +4917,7 @@ StartupReorderBuffer(void)
continue;
/* if it cannot be a slot, skip the directory */
if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
if (!ReplicationSlotValidateName(logical_de->d_name, true, DEBUG2))
continue;
/*

View File

@@ -615,7 +615,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
MySubscription->name,
MyLogicalRepWorker->userid,
rstate->relid,
DSM_HANDLE_INVALID);
DSM_HANDLE_INVALID,
false);
}
}
}

View File

@@ -132,6 +132,96 @@
* failover = true when creating the subscription. Enabling failover allows us
* to smoothly transition to the promoted standby, ensuring that we can
* subscribe to the new primary without losing any data.
*
* RETAIN DEAD TUPLES
* ----------------------
* Each apply worker that enabled retain_dead_tuples option maintains a
* non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
* prevent dead rows from being removed prematurely when the apply worker still
* needs them to detect conflicts reliably. This helps to retain the required
* commit_ts module information, which further helps to detect
* update_origin_differs and delete_origin_differs conflicts reliably, as
* otherwise, vacuum freeze could remove the required information.
*
* The logical replication launcher manages an internal replication slot named
* "pg_conflict_detection". It asynchronously aggregates the non-removable
* transaction ID from all apply workers to determine the appropriate xmin for
* the slot, thereby retaining necessary tuples.
*
* The non-removable transaction ID in the apply worker is advanced to the
* oldest running transaction ID once all concurrent transactions on the
* publisher have been applied and flushed locally. The process involves:
*
* - RDT_GET_CANDIDATE_XID:
* Call GetOldestActiveTransactionId() to take oldestRunningXid as the
* candidate xid.
*
* - RDT_REQUEST_PUBLISHER_STATUS:
* Send a message to the walsender requesting the publisher status, which
* includes the latest WAL write position and information about transactions
* that are in the commit phase.
*
* - RDT_WAIT_FOR_PUBLISHER_STATUS:
* Wait for the status from the walsender. After receiving the first status,
* do not proceed if there are concurrent remote transactions that are still
* in the commit phase. These transactions might have been assigned an
* earlier commit timestamp but have not yet written the commit WAL record.
* Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS)
* until all these transactions have completed.
*
* - RDT_WAIT_FOR_LOCAL_FLUSH:
* Advance the non-removable transaction ID if the current flush location has
* reached or surpassed the last received WAL position.
*
* The overall state progression is: GET_CANDIDATE_XID ->
* REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
* REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
* WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
*
* Retaining the dead tuples for this period is sufficient for ensuring
* eventual consistency using last-update-wins strategy, as dead tuples are
* useful for detecting conflicts only during the application of concurrent
* transactions from remote nodes. After applying and flushing all remote
* transactions that occurred concurrently with the tuple DELETE, any
* subsequent UPDATE from a remote node should have a later timestamp. In such
* cases, it is acceptable to detect an update_missing scenario and convert the
* UPDATE to an INSERT when applying it. But, detecting concurrent remote
* transactions with earlier timestamps than the DELETE is necessary, as the
* UPDATEs in remote transactions should be ignored if their timestamp is
* earlier than that of the dead tuples.
*
* Note that advancing the non-removable transaction ID is not supported if the
* publisher is also a physical standby. This is because the logical walsender
* on the standby can only get the WAL replay position but there may be more
* WALs that are being replicated from the primary and those WALs could have
* earlier commit timestamp.
*
* Similarly, when the publisher has subscribed to another publisher,
* information necessary for conflict detection cannot be retained for
* changes from origins other than the publisher. This is because publisher
* lacks the information on concurrent transactions of other publishers to
* which it subscribes. As the information on concurrent transactions is
* unavailable beyond subscriber's immediate publishers, the non-removable
* transaction ID might be advanced prematurely before changes from other
* origins have been fully applied.
*
* XXX Retaining information for changes from other origins might be possible
* by requesting the subscription on that origin to enable retain_dead_tuples
* and fetching the conflict detection slot.xmin along with the publisher's
* status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could
* wait for the remote slot's xmin to reach the oldest active transaction ID,
* ensuring that all transactions from other origins have been applied on the
* publisher, thereby getting the latest WAL position that includes all
* concurrent changes. However, this approach may impact performance, so it
* might not worth the effort.
*
* XXX It seems feasible to get the latest commit's WAL location from the
* publisher and wait till that is applied. However, we can't do that
* because commit timestamps can regress as a commit with a later LSN is not
* guaranteed to have a later timestamp than those with earlier LSNs. Having
* said that, even if that is possible, it won't improve performance much as
* the apply always lag and moves slowly as compared with the transactions
* on the publisher.
*-------------------------------------------------------------------------
*/
@@ -140,6 +230,7 @@
#include <sys/stat.h>
#include <unistd.h>
#include "access/commit_ts.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/twophase.h"
@@ -148,6 +239,7 @@
#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/subscriptioncmds.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
@@ -166,12 +258,14 @@
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/buffile.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/dynahash.h"
@@ -268,6 +362,78 @@ typedef enum
TRANS_PARALLEL_APPLY,
} TransApplyAction;
/*
* The phases involved in advancing the non-removable transaction ID.
*
* See comments atop worker.c for details of the transition between these
* phases.
*/
typedef enum
{
RDT_GET_CANDIDATE_XID,
RDT_REQUEST_PUBLISHER_STATUS,
RDT_WAIT_FOR_PUBLISHER_STATUS,
RDT_WAIT_FOR_LOCAL_FLUSH
} RetainDeadTuplesPhase;
/*
* Critical information for managing phase transitions within the
* RetainDeadTuplesPhase.
*/
typedef struct RetainDeadTuplesData
{
RetainDeadTuplesPhase phase; /* current phase */
XLogRecPtr remote_lsn; /* WAL write position on the publisher */
/*
* Oldest transaction ID that was in the commit phase on the publisher.
* Use FullTransactionId to prevent issues with transaction ID wraparound,
* where a new remote_oldestxid could falsely appear to originate from the
* past and block advancement.
*/
FullTransactionId remote_oldestxid;
/*
* Next transaction ID to be assigned on the publisher. Use
* FullTransactionId for consistency and to allow straightforward
* comparisons with remote_oldestxid.
*/
FullTransactionId remote_nextxid;
TimestampTz reply_time; /* when the publisher responds with status */
/*
* Publisher transaction ID that must be awaited to complete before
* entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use
* FullTransactionId for the same reason as remote_nextxid.
*/
FullTransactionId remote_wait_for;
TransactionId candidate_xid; /* candidate for the non-removable
* transaction ID */
TimestampTz flushpos_update_time; /* when the remote flush position was
* updated in final phase
* (RDT_WAIT_FOR_LOCAL_FLUSH) */
/*
* The following fields are used to determine the timing for the next
* round of transaction ID advancement.
*/
TimestampTz last_recv_time; /* when the last message was received */
TimestampTz candidate_xid_time; /* when the candidate_xid is decided */
int xid_advance_interval; /* how much time (ms) to wait before
* attempting to advance the
* non-removable transaction ID */
} RetainDeadTuplesData;
/*
* The minimum (100ms) and maximum (3 minutes) intervals for advancing
* non-removable transaction IDs. The maximum interval is a bit arbitrary but
* is sufficient to not cause any undue network traffic.
*/
#define MIN_XID_ADVANCE_INTERVAL 100
#define MAX_XID_ADVANCE_INTERVAL 180000
/* errcontext tracker */
static ApplyErrorCallbackArg apply_error_callback_arg =
{
@@ -332,6 +498,13 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;
/*
* The remote WAL position that has been applied and flushed locally. We record
* and use this information both while sending feedback to the server and
* advancing oldest_nonremovable_xid.
*/
static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
typedef struct SubXactInfo
{
TransactionId xid; /* XID of the subxact */
@@ -372,6 +545,19 @@ static void stream_close_file(void);
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
bool status_received);
static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data);
static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
bool status_received);
static void get_candidate_xid(RetainDeadTuplesData *rdt_data);
static void request_publisher_status(RetainDeadTuplesData *rdt_data);
static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
bool status_received);
static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
bool new_xid_found);
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
static void apply_handle_insert_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
@@ -3577,6 +3763,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
bool ping_sent = false;
TimeLineID tli;
ErrorContextCallback errcallback;
RetainDeadTuplesData rdt_data = {0};
/*
* Init the ApplyMessageContext which we clean up after each replication
@@ -3655,6 +3842,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
last_recv_timestamp = GetCurrentTimestamp();
ping_sent = false;
rdt_data.last_recv_time = last_recv_timestamp;
/* Ensure we are reading the data into our memory context. */
MemoryContextSwitchTo(ApplyMessageContext);
@@ -3681,6 +3870,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
UpdateWorkerStats(last_received, send_time, false);
apply_dispatch(&s);
maybe_advance_nonremovable_xid(&rdt_data, false);
}
else if (c == 'k')
{
@@ -3696,8 +3887,31 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
last_received = end_lsn;
send_feedback(last_received, reply_requested, false);
maybe_advance_nonremovable_xid(&rdt_data, false);
UpdateWorkerStats(last_received, timestamp, true);
}
else if (c == 's') /* Primary status update */
{
rdt_data.remote_lsn = pq_getmsgint64(&s);
rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
rdt_data.reply_time = pq_getmsgint64(&s);
/*
* This should never happen, see
* ProcessStandbyPSRequestMessage. But if it happens
* due to a bug, we don't want to proceed as it can
* incorrectly advance oldest_nonremovable_xid.
*/
if (XLogRecPtrIsInvalid(rdt_data.remote_lsn))
elog(ERROR, "cannot get the latest WAL position from the publisher");
maybe_advance_nonremovable_xid(&rdt_data, true);
UpdateWorkerStats(last_received, rdt_data.reply_time, false);
}
/* other message types are purposefully ignored */
MemoryContextReset(ApplyMessageContext);
@@ -3710,6 +3924,11 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/* confirm all writes so far */
send_feedback(last_received, false, false);
/* Reset the timestamp if no message was received */
rdt_data.last_recv_time = 0;
maybe_advance_nonremovable_xid(&rdt_data, false);
if (!in_remote_transaction && !in_streamed_transaction)
{
/*
@@ -3744,6 +3963,14 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
else
wait_time = NAPTIME_PER_CYCLE;
/*
* Ensure to wake up when it's possible to advance the non-removable
* transaction ID.
*/
if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
rdt_data.xid_advance_interval)
wait_time = Min(wait_time, rdt_data.xid_advance_interval);
rc = WaitLatchOrSocket(MyLatch,
WL_SOCKET_READABLE | WL_LATCH_SET |
WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
@@ -3807,6 +4034,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
send_feedback(last_received, requestReply, requestReply);
maybe_advance_nonremovable_xid(&rdt_data, false);
/*
* Force reporting to ensure long idle periods don't lead to
* arbitrarily delayed stats. Stats can only be reported outside
@@ -3842,7 +4071,6 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
static XLogRecPtr last_writepos = InvalidXLogRecPtr;
static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
XLogRecPtr writepos;
XLogRecPtr flushpos;
@@ -3920,6 +4148,367 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
last_flushpos = flushpos;
}
/*
* Attempt to advance the non-removable transaction ID.
*
* See comments atop worker.c for details.
*/
static void
maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
bool status_received)
{
if (!can_advance_nonremovable_xid(rdt_data))
return;
process_rdt_phase_transition(rdt_data, status_received);
}
/*
* Preliminary check to determine if advancing the non-removable transaction ID
* is allowed.
*/
static bool
can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
{
/*
* It is sufficient to manage non-removable transaction ID for a
* subscription by the main apply worker to detect conflicts reliably even
* for table sync or parallel apply workers.
*/
if (!am_leader_apply_worker())
return false;
/* No need to advance if retaining dead tuples is not required */
if (!MySubscription->retaindeadtuples)
return false;
return true;
}
/*
* Process phase transitions during the non-removable transaction ID
* advancement. See comments atop worker.c for details of the transition.
*/
static void
process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
bool status_received)
{
switch (rdt_data->phase)
{
case RDT_GET_CANDIDATE_XID:
get_candidate_xid(rdt_data);
break;
case RDT_REQUEST_PUBLISHER_STATUS:
request_publisher_status(rdt_data);
break;
case RDT_WAIT_FOR_PUBLISHER_STATUS:
wait_for_publisher_status(rdt_data, status_received);
break;
case RDT_WAIT_FOR_LOCAL_FLUSH:
wait_for_local_flush(rdt_data);
break;
}
}
/*
* Workhorse for the RDT_GET_CANDIDATE_XID phase.
*/
static void
get_candidate_xid(RetainDeadTuplesData *rdt_data)
{
TransactionId oldest_running_xid;
TimestampTz now;
/*
* Use last_recv_time when applying changes in the loop to avoid
* unnecessary system time retrieval. If last_recv_time is not available,
* obtain the current timestamp.
*/
now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
/*
* Compute the candidate_xid and request the publisher status at most once
* per xid_advance_interval. Refer to adjust_xid_advance_interval() for
* details on how this value is dynamically adjusted. This is to avoid
* using CPU and network resources without making much progress.
*/
if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
rdt_data->xid_advance_interval))
return;
/*
* Immediately update the timer, even if the function returns later
* without setting candidate_xid due to inactivity on the subscriber. This
* avoids frequent calls to GetOldestActiveTransactionId.
*/
rdt_data->candidate_xid_time = now;
/*
* Consider transactions in the current database, as only dead tuples from
* this database are required for conflict detection.
*/
oldest_running_xid = GetOldestActiveTransactionId(false, false);
/*
* Oldest active transaction ID (oldest_running_xid) can't be behind any
* of its previously computed value.
*/
Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
oldest_running_xid));
/* Return if the oldest_nonremovable_xid cannot be advanced */
if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
oldest_running_xid))
{
adjust_xid_advance_interval(rdt_data, false);
return;
}
adjust_xid_advance_interval(rdt_data, true);
rdt_data->candidate_xid = oldest_running_xid;
rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
/* process the next phase */
process_rdt_phase_transition(rdt_data, false);
}
/*
* Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase.
*/
static void
request_publisher_status(RetainDeadTuplesData *rdt_data)
{
static StringInfo request_message = NULL;
if (!request_message)
{
MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
request_message = makeStringInfo();
MemoryContextSwitchTo(oldctx);
}
else
resetStringInfo(request_message);
/*
* Send the current time to update the remote walsender's latest reply
* message received time.
*/
pq_sendbyte(request_message, 'p');
pq_sendint64(request_message, GetCurrentTimestamp());
elog(DEBUG2, "sending publisher status request message");
/* Send a request for the publisher status */
walrcv_send(LogRepWorkerWalRcvConn,
request_message->data, request_message->len);
rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS;
/*
* Skip calling maybe_advance_nonremovable_xid() since further transition
* is possible only once we receive the publisher status message.
*/
}
/*
* Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase.
*/
static void
wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
bool status_received)
{
/*
* Return if we have requested but not yet received the publisher status.
*/
if (!status_received)
return;
if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
rdt_data->remote_wait_for = rdt_data->remote_nextxid;
/*
* Check if all remote concurrent transactions that were active at the
* first status request have now completed. If completed, proceed to the
* next phase; otherwise, continue checking the publisher status until
* these transactions finish.
*
* It's possible that transactions in the commit phase during the last
* cycle have now finished committing, but remote_oldestxid remains older
* than remote_wait_for. This can happen if some old transaction came in
* the commit phase when we requested status in this cycle. We do not
* handle this case explicitly as it's rare and the benefit doesn't
* justify the required complexity. Tracking would require either caching
* all xids at the publisher or sending them to subscribers. The condition
* will resolve naturally once the remaining transactions are finished.
*
* Directly advancing the non-removable transaction ID is possible if
* there are no activities on the publisher since the last advancement
* cycle. However, it requires maintaining two fields, last_remote_nextxid
* and last_remote_lsn, within the structure for comparison with the
* current cycle's values. Considering the minimal cost of continuing in
* RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
* advance the transaction ID here.
*/
if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
rdt_data->remote_oldestxid))
rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH;
else
rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
/* process the next phase */
process_rdt_phase_transition(rdt_data, false);
}
/*
* Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase.
*/
static void
wait_for_local_flush(RetainDeadTuplesData *rdt_data)
{
Assert(!XLogRecPtrIsInvalid(rdt_data->remote_lsn) &&
TransactionIdIsValid(rdt_data->candidate_xid));
/*
* We expect the publisher and subscriber clocks to be in sync using time
* sync service like NTP. Otherwise, we will advance this worker's
* oldest_nonremovable_xid prematurely, leading to the removal of rows
* required to detect conflicts reliably. This check primarily addresses
* scenarios where the publisher's clock falls behind; if the publisher's
* clock is ahead, subsequent transactions will naturally bear later
* commit timestamps, conforming to the design outlined atop worker.c.
*
* XXX Consider waiting for the publisher's clock to catch up with the
* subscriber's before proceeding to the next phase.
*/
if (TimestampDifferenceExceeds(rdt_data->reply_time,
rdt_data->candidate_xid_time, 0))
ereport(ERROR,
errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
errdetail_internal("The clock on the publisher is behind that of the subscriber."));
/*
* Do not attempt to advance the non-removable transaction ID when table
* sync is in progress. During this time, changes from a single
* transaction may be applied by multiple table sync workers corresponding
* to the target tables. So, it's necessary for all table sync workers to
* apply and flush the corresponding changes before advancing the
* transaction ID, otherwise, dead tuples that are still needed for
* conflict detection in table sync workers could be removed prematurely.
* However, confirming the apply and flush progress across all table sync
* workers is complex and not worth the effort, so we simply return if not
* all tables are in the READY state.
*
* It is safe to add new tables with initial states to the subscription
* after this check because any changes applied to these tables should
* have a WAL position greater than the rdt_data->remote_lsn.
*/
if (!AllTablesyncsReady())
return;
/*
* Update and check the remote flush position if we are applying changes
* in a loop. This is done at most once per WalWriterDelay to avoid
* performing costly operations in get_flush_position() too frequently
* during change application.
*/
if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
rdt_data->last_recv_time, WalWriterDelay))
{
XLogRecPtr writepos;
XLogRecPtr flushpos;
bool have_pending_txes;
/* Fetch the latest remote flush position */
get_flush_position(&writepos, &flushpos, &have_pending_txes);
if (flushpos > last_flushpos)
last_flushpos = flushpos;
rdt_data->flushpos_update_time = rdt_data->last_recv_time;
}
/* Return to wait for the changes to be applied */
if (last_flushpos < rdt_data->remote_lsn)
return;
/*
* Reaching here means the remote WAL position has been received, and all
* transactions up to that position on the publisher have been applied and
* flushed locally. So, we can advance the non-removable transaction ID.
*/
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid;
SpinLockRelease(&MyLogicalRepWorker->relmutex);
elog(DEBUG2, "confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u",
LSN_FORMAT_ARGS(rdt_data->remote_lsn),
rdt_data->candidate_xid);
/* Notify launcher to update the xmin of the conflict slot */
ApplyLauncherWakeup();
/*
* Reset all data fields except those used to determine the timing for the
* next round of transaction ID advancement. We can even use
* flushpos_update_time in the next round to decide whether to get the
* latest flush position.
*/
rdt_data->phase = RDT_GET_CANDIDATE_XID;
rdt_data->remote_lsn = InvalidXLogRecPtr;
rdt_data->remote_oldestxid = InvalidFullTransactionId;
rdt_data->remote_nextxid = InvalidFullTransactionId;
rdt_data->reply_time = 0;
rdt_data->remote_wait_for = InvalidFullTransactionId;
rdt_data->candidate_xid = InvalidTransactionId;
/* process the next phase */
process_rdt_phase_transition(rdt_data, false);
}
/*
* Adjust the interval for advancing non-removable transaction IDs.
*
* We double the interval to try advancing the non-removable transaction IDs
* if there is no activity on the node. The maximum value of the interval is
* capped by wal_receiver_status_interval if it is not zero, otherwise to a
* 3 minutes which should be sufficient to avoid using CPU or network
* resources without much benefit.
*
* The interval is reset to a minimum value of 100ms once there is some
* activity on the node.
*
* XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
* consider the other interval or a separate GUC if the need arises.
*/
static void
adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
{
if (!new_xid_found && rdt_data->xid_advance_interval)
{
int max_interval = wal_receiver_status_interval
? wal_receiver_status_interval * 1000
: MAX_XID_ADVANCE_INTERVAL;
/*
* No new transaction ID has been assigned since the last check, so
* double the interval, but not beyond the maximum allowable value.
*/
rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
max_interval);
}
else
{
/*
* A new transaction ID was found or the interval is not yet
* initialized, so set the interval to the minimum value.
*/
rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
}
}
/*
* Exit routine for apply workers due to subscription parameter changes.
*/
@@ -4708,6 +5297,30 @@ InitializeLogRepWorker(void)
apply_worker_exit();
}
/*
* Restart the worker if retain_dead_tuples was enabled during startup.
*
* At this point, the replication slot used for conflict detection might
* not exist yet, or could be dropped soon if the launcher perceives
* retain_dead_tuples as disabled. To avoid unnecessary tracking of
* oldest_nonremovable_xid when the slot is absent or at risk of being
* dropped, a restart is initiated.
*
* The oldest_nonremovable_xid should be initialized only when the
* retain_dead_tuples is enabled before launching the worker. See
* logicalrep_worker_launch.
*/
if (am_leader_apply_worker() &&
MySubscription->retaindeadtuples &&
!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
{
ereport(LOG,
errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
MySubscription->name, "retain_dead_tuples"));
apply_worker_exit();
}
/* Setup synchronous commit according to the user's wishes */
SetConfigOption("synchronous_commit", MySubscription->synccommit,
PGC_BACKEND, PGC_S_OVERRIDE);
@@ -4864,6 +5477,14 @@ DisableSubscriptionAndExit(void)
errmsg("subscription \"%s\" has been disabled because of an error",
MySubscription->name));
/*
* Skip the track_commit_timestamp check when disabling the worker due to
* an error, as verifying commit timestamps is unnecessary in this
* context.
*/
if (MySubscription->retaindeadtuples)
CheckSubDeadTupleRetention(false, true, WARNING);
proc_exit(0);
}

View File

@@ -47,6 +47,7 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/logicallauncher.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/walsender_private.h"
@@ -172,6 +173,7 @@ static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
static void ReplicationSlotShmemExit(int code, Datum arg);
static bool IsSlotForConflictCheck(const char *name);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);
/* internal persistency functions */
@@ -258,13 +260,17 @@ ReplicationSlotShmemExit(int code, Datum arg)
/*
* Check whether the passed slot name is valid and report errors at elevel.
*
* An error will be reported for a reserved replication slot name if
* allow_reserved_name is set to false.
*
* Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
* the name to be used as a directory name on every supported OS.
*
* Returns whether the directory name is valid or not if elevel < ERROR.
*/
bool
ReplicationSlotValidateName(const char *name, int elevel)
ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
int elevel)
{
const char *cp;
@@ -300,9 +306,31 @@ ReplicationSlotValidateName(const char *name, int elevel)
return false;
}
}
if (!allow_reserved_name && IsSlotForConflictCheck(name))
{
ereport(elevel,
errcode(ERRCODE_RESERVED_NAME),
errmsg("replication slot name \"%s\" is reserved",
name),
errdetail("The name \"%s\" is reserved for the conflict detection slot.",
CONFLICT_DETECTION_SLOT));
return false;
}
return true;
}
/*
* Return true if the replication slot name is "pg_conflict_detection".
*/
static bool
IsSlotForConflictCheck(const char *name)
{
return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
}
/*
* Create a new replication slot and mark it as used by this backend.
*
@@ -330,7 +358,12 @@ ReplicationSlotCreate(const char *name, bool db_specific,
Assert(MyReplicationSlot == NULL);
ReplicationSlotValidateName(name, ERROR);
/*
* The logical launcher or pg_upgrade may create or migrate an internal
* slot, so using a reserved name is allowed in these cases.
*/
ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
ERROR);
if (failover)
{
@@ -581,6 +614,17 @@ retry:
name)));
}
/*
* Do not allow users to acquire the reserved slot. This scenario may
* occur if the launcher that owns the slot has terminated unexpectedly
* due to an error, and a backend process attempts to reuse the slot.
*/
if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
ereport(ERROR,
errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("cannot acquire replication slot \"%s\"", name),
errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
/*
* This is the slot we want; check if it's active under some other
* process. In single user mode, we don't need this check.

View File

@@ -84,6 +84,7 @@
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
@@ -258,6 +259,7 @@ static void StartLogicalReplication(StartReplicationCmd *cmd);
static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessStandbyPSRequestMessage(void);
static void ProcessRepliesIfAny(void);
static void ProcessPendingWrites(void);
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr);
@@ -2355,6 +2357,10 @@ ProcessStandbyMessage(void)
ProcessStandbyHSFeedbackMessage();
break;
case 'p':
ProcessStandbyPSRequestMessage();
break;
default:
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -2701,6 +2707,60 @@ ProcessStandbyHSFeedbackMessage(void)
}
}
/*
* Process the request for a primary status update message.
*/
static void
ProcessStandbyPSRequestMessage(void)
{
XLogRecPtr lsn = InvalidXLogRecPtr;
TransactionId oldestXidInCommit;
FullTransactionId nextFullXid;
FullTransactionId fullOldestXidInCommit;
WalSnd *walsnd = MyWalSnd;
TimestampTz replyTime;
/*
* This shouldn't happen because we don't support getting primary status
* message from standby.
*/
if (RecoveryInProgress())
elog(ERROR, "the primary status is unavailable during recovery");
replyTime = pq_getmsgint64(&reply_message);
/*
* Update shared state for this WalSender process based on reply data from
* standby.
*/
SpinLockAcquire(&walsnd->mutex);
walsnd->replyTime = replyTime;
SpinLockRelease(&walsnd->mutex);
/*
* Consider transactions in the current database, as only these are the
* ones replicated.
*/
oldestXidInCommit = GetOldestActiveTransactionId(true, false);
nextFullXid = ReadNextFullTransactionId();
fullOldestXidInCommit = FullTransactionIdFromAllowableAt(nextFullXid,
oldestXidInCommit);
lsn = GetXLogWriteRecPtr();
elog(DEBUG2, "sending primary status");
/* construct the message... */
resetStringInfo(&output_message);
pq_sendbyte(&output_message, 's');
pq_sendint64(&output_message, lsn);
pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit));
pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid));
pq_sendint64(&output_message, GetCurrentTimestamp());
/* ... and send it wrapped in CopyData */
pq_putmessage_noblock('d', output_message.data, output_message.len);
}
/*
* Compute how long send/receive loops should sleep.
*