1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-12 21:01:52 +03:00

Allow altering of two_phase option of a SUBSCRIPTION.

The two_phase option is controlled by both the publisher (as a slot
option) and the subscriber (as a subscription option), so the slot option
must also be modified.

Changing the 'two_phase' option for a subscription from 'true' to 'false'
is permitted only when there are no pending prepared transactions
corresponding to that subscription. Otherwise, the changes of already
prepared transactions can be replicated again along with their corresponding
commit leading to duplicate data or errors.

To avoid data loss, the 'two_phase' option for a subscription can only be
changed from 'false' to 'true' once the initial data synchronization is
completed. Therefore this is performed later by the logical replication worker.

Author: Hayato Kuroda, Ajin Cherian, Amit Kapila
Reviewed-by: Peter Smith, Hou Zhijie, Amit Kapila, Vitaly Davydov, Vignesh C
Discussion: https://postgr.es/m/8fab8-65d74c80-1-2f28e880@39088166
This commit is contained in:
Amit Kapila
2024-07-24 10:13:36 +05:30
parent 774d47b6c0
commit 1462aad2e4
17 changed files with 458 additions and 120 deletions

View File

@ -80,7 +80,7 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
CRSSnapshotAction snapshot_action,
XLogRecPtr *lsn);
static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
bool failover);
const bool *failover, const bool *two_phase);
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
const char *query,
@ -1121,15 +1121,27 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
*/
static void
libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
bool failover)
const bool *failover, const bool *two_phase)
{
StringInfoData cmd;
PGresult *res;
initStringInfo(&cmd);
appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )",
quote_identifier(slotname),
failover ? "true" : "false");
appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( ",
quote_identifier(slotname));
if (failover)
appendStringInfo(&cmd, "FAILOVER %s",
*failover ? "true" : "false");
if (failover && two_phase)
appendStringInfo(&cmd, ", ");
if (two_phase)
appendStringInfo(&cmd, "TWO_PHASE %s",
*two_phase ? "true" : "false");
appendStringInfoString(&cmd, " );");
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
pfree(cmd.data);

View File

@ -272,11 +272,14 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
* the subscription, instead of just one.
*/
List *
logicalrep_workers_find(Oid subid, bool only_running)
logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
{
int i;
List *res = NIL;
if (acquire_lock)
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
/* Search for attached worker for a given subscription id. */
@ -288,6 +291,9 @@ logicalrep_workers_find(Oid subid, bool only_running)
res = lappend(res, w);
}
if (acquire_lock)
LWLockRelease(LogicalRepWorkerLock);
return res;
}
@ -759,7 +765,7 @@ logicalrep_worker_detach(void)
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true);
workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
foreach(lc, workers)
{
LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);

View File

@ -401,9 +401,6 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
LogicalRepTupleData *newtup,
CmdType operation);
/* Compute GID for two_phase transactions */
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
/* Functions for skipping changes */
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
static void stop_skipping_changes(void);
@ -3911,7 +3908,7 @@ maybe_reread_subscription(void)
/* !slotname should never happen when enabled is true. */
Assert(newsub->slotname);
/* two-phase should not be altered */
/* two-phase cannot be altered while the worker is running */
Assert(newsub->twophasestate == MySubscription->twophasestate);
/*
@ -4396,24 +4393,6 @@ cleanup_subxact_info()
subxact_data.nsubxacts_max = 0;
}
/*
* Form the prepared transaction GID for two_phase transactions.
*
* Return the GID in the supplied buffer.
*/
static void
TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
{
Assert(subid != InvalidRepOriginId);
if (!TransactionIdIsValid(xid))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid two-phase transaction ID")));
snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
}
/*
* Common function to run the apply loop with error handling. Disable the
* subscription, if necessary.
@ -5014,7 +4993,7 @@ AtEOXact_LogicalRepWorkers(bool isCommit)
List *workers;
ListCell *lc2;
workers = logicalrep_workers_find(subid, true);
workers = logicalrep_workers_find(subid, true, false);
foreach(lc2, workers)
{
LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);

View File

@ -804,9 +804,13 @@ ReplicationSlotDrop(const char *name, bool nowait)
* Change the definition of the slot identified by the specified name.
*/
void
ReplicationSlotAlter(const char *name, bool failover)
ReplicationSlotAlter(const char *name, const bool *failover,
const bool *two_phase)
{
bool update_slot = false;
Assert(MyReplicationSlot == NULL);
Assert(failover || two_phase);
ReplicationSlotAcquire(name, false);
@ -832,28 +836,45 @@ ReplicationSlotAlter(const char *name, bool failover)
* Do not allow users to enable failover on the standby as we do not
* support sync to the cascading standby.
*/
if (failover)
if (failover && *failover)
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot enable failover for a replication slot"
" on the standby"));
}
/*
* Do not allow users to enable failover for temporary slots as we do not
* support syncing temporary slots to the standby.
*/
if (failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot enable failover for a temporary replication slot"));
if (failover)
{
/*
* Do not allow users to enable failover for temporary slots as we do
* not support syncing temporary slots to the standby.
*/
if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot enable failover for a temporary replication slot"));
if (MyReplicationSlot->data.failover != failover)
if (MyReplicationSlot->data.failover != *failover)
{
SpinLockAcquire(&MyReplicationSlot->mutex);
MyReplicationSlot->data.failover = *failover;
SpinLockRelease(&MyReplicationSlot->mutex);
update_slot = true;
}
}
if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
{
SpinLockAcquire(&MyReplicationSlot->mutex);
MyReplicationSlot->data.failover = failover;
MyReplicationSlot->data.two_phase = *two_phase;
SpinLockRelease(&MyReplicationSlot->mutex);
update_slot = true;
}
if (update_slot)
{
ReplicationSlotMarkDirty();
ReplicationSlotSave();
}

View File

@ -1407,12 +1407,15 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
}
/*
* Process extra options given to ALTER_REPLICATION_SLOT.
* Change the definition of a replication slot.
*/
static void
ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
{
bool failover_given = false;
bool two_phase_given = false;
bool failover;
bool two_phase;
/* Parse options */
foreach_ptr(DefElem, defel, cmd->options)
@ -1424,23 +1427,24 @@ ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
failover_given = true;
*failover = defGetBoolean(defel);
failover = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "two_phase") == 0)
{
if (two_phase_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
two_phase_given = true;
two_phase = defGetBoolean(defel);
}
else
elog(ERROR, "unrecognized option: %s", defel->defname);
}
}
/*
* Change the definition of a replication slot.
*/
static void
AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
{
bool failover = false;
ParseAlterReplSlotOptions(cmd, &failover);
ReplicationSlotAlter(cmd->slotname, failover);
ReplicationSlotAlter(cmd->slotname,
failover_given ? &failover : NULL,
two_phase_given ? &two_phase : NULL);
}
/*