1
0
mirror of https://github.com/postgres/postgres.git synced 2025-08-25 20:23:07 +03:00

Add support for prepared transactions to built-in logical replication.

To add support for streaming transactions at prepare time into the
built-in logical replication, we need to do the following things:

* Modify the output plugin (pgoutput) to implement the new two-phase API
callbacks, by leveraging the extended replication protocol.

* Modify the replication apply worker, to properly handle two-phase
transactions by replaying them on prepare.

* Add a new SUBSCRIPTION option "two_phase" to allow users to enable
two-phase transactions. We enable the two_phase once the initial data sync
is over.

We however must explicitly disable replication of two-phase transactions
during replication slot creation, even if the plugin supports it. We
don't need to replicate the changes accumulated during this phase,
and moreover, we don't have a replication connection open so we don't know
where to send the data anyway.

The streaming option is not allowed with this new two_phase option. This
can be done as a separate patch.

We don't allow to toggle two_phase option of a subscription because it can
lead to an inconsistent replica. For the same reason, we don't allow to
refresh the publication once the two_phase is enabled for a subscription
unless copy_data option is false.

Author: Peter Smith, Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich
Reviewed-by: Amit Kapila, Sawada Masahiko, Vignesh C, Dilip Kumar, Takamichi Osumi, Greg Nancarrow
Tested-By: Haiying Tang
Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru
Discussion: https://postgr.es/m/CAA4eK1+opiV4aFTmWWUF9h_32=HfPOW9vZASHarT0UA5oBrtGw@mail.gmail.com
This commit is contained in:
Amit Kapila
2021-07-14 07:33:50 +05:30
parent 6c9c283166
commit a8fd13cab0
43 changed files with 2382 additions and 191 deletions

View File

@@ -73,6 +73,7 @@ static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
static char *libpqrcv_create_slot(WalReceiverConn *conn,
const char *slotname,
bool temporary,
bool two_phase,
CRSSnapshotAction snapshot_action,
XLogRecPtr *lsn);
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
@@ -436,6 +437,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
PQserverVersion(conn->streamConn) >= 140000)
appendStringInfoString(&cmd, ", streaming 'on'");
if (options->proto.logical.twophase &&
PQserverVersion(conn->streamConn) >= 150000)
appendStringInfoString(&cmd, ", two_phase 'on'");
pubnames = options->proto.logical.publication_names;
pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
if (!pubnames_str)
@@ -851,7 +856,7 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
*/
static char *
libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
bool temporary, CRSSnapshotAction snapshot_action,
bool temporary, bool two_phase, CRSSnapshotAction snapshot_action,
XLogRecPtr *lsn)
{
PGresult *res;
@@ -868,6 +873,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
if (conn->logical)
{
appendStringInfoString(&cmd, " LOGICAL pgoutput");
if (two_phase)
appendStringInfoString(&cmd, " TWO_PHASE");
switch (snapshot_action)
{
case CRS_EXPORT_SNAPSHOT:

View File

@@ -374,11 +374,10 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
*
* XXX Now, this can even lead to a deadlock if the prepare
* transaction is waiting to get it logically replicated for
* distributed 2PC. Currently, we don't have an in-core
* implementation of prepares for distributed 2PC but some
* out-of-core logical replication solution can have such an
* implementation. They need to inform users to not have locks
* on catalog tables in such transactions.
* distributed 2PC. This can be avoided by disallowing
* preparing transactions that have locked [user] catalog
* tables exclusively but as of now, we ask users not to do
* such an operation.
*/
DecodePrepare(ctx, buf, &parsed);
break;
@@ -735,7 +734,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
if (two_phase)
{
ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
SnapBuildInitialConsistentPoint(ctx->snapshot_builder),
SnapBuildGetTwoPhaseAt(ctx->snapshot_builder),
commit_time, origin_id, origin_lsn,
parsed->twophase_gid, true);
}

View File

@@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder = ReorderBufferAllocate();
ctx->snapshot_builder =
AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
need_full_snapshot, slot->data.initial_consistent_point);
need_full_snapshot, slot->data.two_phase_at);
ctx->reorder->private_data = ctx;
@@ -432,10 +432,12 @@ CreateInitDecodingContext(const char *plugin,
MemoryContextSwitchTo(old_context);
/*
* We allow decoding of prepared transactions iff the two_phase option is
* enabled at the time of slot creation.
* We allow decoding of prepared transactions when the two_phase is
* enabled at the time of slot creation, or when the two_phase option is
* given at the streaming start, provided the plugin supports all the
* callbacks for two-phase.
*/
ctx->twophase &= MyReplicationSlot->data.two_phase;
ctx->twophase &= slot->data.two_phase;
ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
@@ -538,10 +540,22 @@ CreateDecodingContext(XLogRecPtr start_lsn,
MemoryContextSwitchTo(old_context);
/*
* We allow decoding of prepared transactions iff the two_phase option is
* enabled at the time of slot creation.
* We allow decoding of prepared transactions when the two_phase is
* enabled at the time of slot creation, or when the two_phase option is
* given at the streaming start, provided the plugin supports all the
* callbacks for two-phase.
*/
ctx->twophase &= MyReplicationSlot->data.two_phase;
ctx->twophase &= (slot->data.two_phase || ctx->twophase_opt_given);
/* Mark slot to allow two_phase decoding if not already marked */
if (ctx->twophase && !slot->data.two_phase)
{
slot->data.two_phase = true;
slot->data.two_phase_at = start_lsn;
ReplicationSlotMarkDirty();
ReplicationSlotSave();
SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn);
}
ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
@@ -602,7 +616,8 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
SpinLockAcquire(&slot->mutex);
slot->data.confirmed_flush = ctx->reader->EndRecPtr;
slot->data.initial_consistent_point = ctx->reader->EndRecPtr;
if (slot->data.two_phase)
slot->data.two_phase_at = ctx->reader->EndRecPtr;
SpinLockRelease(&slot->mutex);
}

View File

@@ -973,8 +973,11 @@ replorigin_advance(RepOriginId node,
/*
* Due to - harmless - race conditions during a checkpoint we could see
* values here that are older than the ones we already have in memory.
* Don't overwrite those.
* values here that are older than the ones we already have in memory. We
* could also see older values for prepared transactions when the prepare
* is sent at a later point of time along with commit prepared and there
* are other transactions commits between prepare and commit prepared. See
* ReorderBufferFinishPrepared. Don't overwrite those.
*/
if (go_backward || replication_state->remote_lsn < remote_commit)
replication_state->remote_lsn = remote_commit;

View File

@@ -49,7 +49,7 @@ logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
/* fixed fields */
pq_sendint64(out, txn->final_lsn);
pq_sendint64(out, txn->commit_time);
pq_sendint64(out, txn->xact_time.commit_time);
pq_sendint32(out, txn->xid);
}
@@ -85,7 +85,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
/* send fields */
pq_sendint64(out, commit_lsn);
pq_sendint64(out, txn->end_lsn);
pq_sendint64(out, txn->commit_time);
pq_sendint64(out, txn->xact_time.commit_time);
}
/*
@@ -106,6 +106,217 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
commit_data->committime = pq_getmsgint64(in);
}
/*
* Write BEGIN PREPARE to the output stream.
*/
void
logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
{
pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
/* fixed fields */
pq_sendint64(out, txn->final_lsn);
pq_sendint64(out, txn->end_lsn);
pq_sendint64(out, txn->xact_time.prepare_time);
pq_sendint32(out, txn->xid);
/* send gid */
pq_sendstring(out, txn->gid);
}
/*
* Read transaction BEGIN PREPARE from the stream.
*/
void
logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
{
/* read fields */
begin_data->prepare_lsn = pq_getmsgint64(in);
if (begin_data->prepare_lsn == InvalidXLogRecPtr)
elog(ERROR, "prepare_lsn not set in begin prepare message");
begin_data->end_lsn = pq_getmsgint64(in);
if (begin_data->end_lsn == InvalidXLogRecPtr)
elog(ERROR, "end_lsn not set in begin prepare message");
begin_data->prepare_time = pq_getmsgint64(in);
begin_data->xid = pq_getmsgint(in, 4);
/* read gid (copy it into a pre-allocated buffer) */
strcpy(begin_data->gid, pq_getmsgstring(in));
}
/*
* Write PREPARE to the output stream.
*/
void
logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn)
{
uint8 flags = 0;
pq_sendbyte(out, LOGICAL_REP_MSG_PREPARE);
/*
* This should only ever happen for two-phase commit transactions, in
* which case we expect to have a valid GID.
*/
Assert(txn->gid != NULL);
Assert(rbtxn_prepared(txn));
/* send the flags field */
pq_sendbyte(out, flags);
/* send fields */
pq_sendint64(out, prepare_lsn);
pq_sendint64(out, txn->end_lsn);
pq_sendint64(out, txn->xact_time.prepare_time);
pq_sendint32(out, txn->xid);
/* send gid */
pq_sendstring(out, txn->gid);
}
/*
* Read transaction PREPARE from the stream.
*/
void
logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
{
/* read flags */
uint8 flags = pq_getmsgbyte(in);
if (flags != 0)
elog(ERROR, "unrecognized flags %u in prepare message", flags);
/* read fields */
prepare_data->prepare_lsn = pq_getmsgint64(in);
if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
elog(ERROR, "prepare_lsn is not set in prepare message");
prepare_data->end_lsn = pq_getmsgint64(in);
if (prepare_data->end_lsn == InvalidXLogRecPtr)
elog(ERROR, "end_lsn is not set in prepare message");
prepare_data->prepare_time = pq_getmsgint64(in);
prepare_data->xid = pq_getmsgint(in, 4);
/* read gid (copy it into a pre-allocated buffer) */
strcpy(prepare_data->gid, pq_getmsgstring(in));
}
/*
* Write COMMIT PREPARED to the output stream.
*/
void
logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
uint8 flags = 0;
pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
/*
* This should only ever happen for two-phase commit transactions, in
* which case we expect to have a valid GID.
*/
Assert(txn->gid != NULL);
/* send the flags field */
pq_sendbyte(out, flags);
/* send fields */
pq_sendint64(out, commit_lsn);
pq_sendint64(out, txn->end_lsn);
pq_sendint64(out, txn->xact_time.commit_time);
pq_sendint32(out, txn->xid);
/* send gid */
pq_sendstring(out, txn->gid);
}
/*
* Read transaction COMMIT PREPARED from the stream.
*/
void
logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
{
/* read flags */
uint8 flags = pq_getmsgbyte(in);
if (flags != 0)
elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
/* read fields */
prepare_data->commit_lsn = pq_getmsgint64(in);
if (prepare_data->commit_lsn == InvalidXLogRecPtr)
elog(ERROR, "commit_lsn is not set in commit prepared message");
prepare_data->end_lsn = pq_getmsgint64(in);
if (prepare_data->end_lsn == InvalidXLogRecPtr)
elog(ERROR, "end_lsn is not set in commit prepared message");
prepare_data->commit_time = pq_getmsgint64(in);
prepare_data->xid = pq_getmsgint(in, 4);
/* read gid (copy it into a pre-allocated buffer) */
strcpy(prepare_data->gid, pq_getmsgstring(in));
}
/*
* Write ROLLBACK PREPARED to the output stream.
*/
void
logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
XLogRecPtr prepare_end_lsn,
TimestampTz prepare_time)
{
uint8 flags = 0;
pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
/*
* This should only ever happen for two-phase commit transactions, in
* which case we expect to have a valid GID.
*/
Assert(txn->gid != NULL);
/* send the flags field */
pq_sendbyte(out, flags);
/* send fields */
pq_sendint64(out, prepare_end_lsn);
pq_sendint64(out, txn->end_lsn);
pq_sendint64(out, prepare_time);
pq_sendint64(out, txn->xact_time.commit_time);
pq_sendint32(out, txn->xid);
/* send gid */
pq_sendstring(out, txn->gid);
}
/*
* Read transaction ROLLBACK PREPARED from the stream.
*/
void
logicalrep_read_rollback_prepared(StringInfo in,
LogicalRepRollbackPreparedTxnData *rollback_data)
{
/* read flags */
uint8 flags = pq_getmsgbyte(in);
if (flags != 0)
elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
/* read fields */
rollback_data->prepare_end_lsn = pq_getmsgint64(in);
if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
rollback_data->rollback_end_lsn = pq_getmsgint64(in);
if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
rollback_data->prepare_time = pq_getmsgint64(in);
rollback_data->rollback_time = pq_getmsgint64(in);
rollback_data->xid = pq_getmsgint(in, 4);
/* read gid (copy it into a pre-allocated buffer) */
strcpy(rollback_data->gid, pq_getmsgstring(in));
}
/*
* Write ORIGIN to the output stream.
*/
@@ -841,7 +1052,7 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
/* send fields */
pq_sendint64(out, commit_lsn);
pq_sendint64(out, txn->end_lsn);
pq_sendint64(out, txn->commit_time);
pq_sendint64(out, txn->xact_time.commit_time);
}
/*

View File

@@ -2576,7 +2576,7 @@ ReorderBufferReplay(ReorderBufferTXN *txn,
txn->final_lsn = commit_lsn;
txn->end_lsn = end_lsn;
txn->commit_time = commit_time;
txn->xact_time.commit_time = commit_time;
txn->origin_id = origin_id;
txn->origin_lsn = origin_lsn;
@@ -2667,7 +2667,7 @@ ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
*/
txn->final_lsn = prepare_lsn;
txn->end_lsn = end_lsn;
txn->commit_time = prepare_time;
txn->xact_time.prepare_time = prepare_time;
txn->origin_id = origin_id;
txn->origin_lsn = origin_lsn;
@@ -2714,7 +2714,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
Assert(txn->final_lsn != InvalidXLogRecPtr);
ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
txn->commit_time, txn->origin_id, txn->origin_lsn);
txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
/*
* We send the prepare for the concurrently aborted xacts so that later
@@ -2734,7 +2734,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
void
ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
XLogRecPtr initial_consistent_point,
XLogRecPtr two_phase_at,
TimestampTz commit_time, RepOriginId origin_id,
XLogRecPtr origin_lsn, char *gid, bool is_commit)
{
@@ -2753,19 +2753,20 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
* be later used for rollback.
*/
prepare_end_lsn = txn->end_lsn;
prepare_time = txn->commit_time;
prepare_time = txn->xact_time.prepare_time;
/* add the gid in the txn */
txn->gid = pstrdup(gid);
/*
* It is possible that this transaction is not decoded at prepare time
* either because by that time we didn't have a consistent snapshot or it
* was decoded earlier but we have restarted. We only need to send the
* prepare if it was not decoded earlier. We don't need to decode the xact
* for aborts if it is not done already.
* either because by that time we didn't have a consistent snapshot, or
* two_phase was not enabled, or it was decoded earlier but we have
* restarted. We only need to send the prepare if it was not decoded
* earlier. We don't need to decode the xact for aborts if it is not done
* already.
*/
if ((txn->final_lsn < initial_consistent_point) && is_commit)
if ((txn->final_lsn < two_phase_at) && is_commit)
{
txn->txn_flags |= RBTXN_PREPARE;
@@ -2783,12 +2784,12 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
* prepared after the restart.
*/
ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
txn->commit_time, txn->origin_id, txn->origin_lsn);
txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
}
txn->final_lsn = commit_lsn;
txn->end_lsn = end_lsn;
txn->commit_time = commit_time;
txn->xact_time.commit_time = commit_time;
txn->origin_id = origin_id;
txn->origin_lsn = origin_lsn;

View File

@@ -165,15 +165,15 @@ struct SnapBuild
XLogRecPtr start_decoding_at;
/*
* LSN at which we found a consistent point at the time of slot creation.
* This is also the point where we have exported a snapshot for the
* initial copy.
* LSN at which two-phase decoding was enabled or LSN at which we found a
* consistent point at the time of slot creation.
*
* The prepared transactions that are not covered by initial snapshot
* needs to be sent later along with commit prepared and they must be
* before this point.
* The prepared transactions, that were skipped because previously
* two-phase was not enabled or are not covered by initial snapshot, need
* to be sent later along with commit prepared and they must be before
* this point.
*/
XLogRecPtr initial_consistent_point;
XLogRecPtr two_phase_at;
/*
* Don't start decoding WAL until the "xl_running_xacts" information
@@ -281,7 +281,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
TransactionId xmin_horizon,
XLogRecPtr start_lsn,
bool need_full_snapshot,
XLogRecPtr initial_consistent_point)
XLogRecPtr two_phase_at)
{
MemoryContext context;
MemoryContext oldcontext;
@@ -309,7 +309,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
builder->initial_xmin_horizon = xmin_horizon;
builder->start_decoding_at = start_lsn;
builder->building_full_snapshot = need_full_snapshot;
builder->initial_consistent_point = initial_consistent_point;
builder->two_phase_at = two_phase_at;
MemoryContextSwitchTo(oldcontext);
@@ -370,12 +370,21 @@ SnapBuildCurrentState(SnapBuild *builder)
}
/*
* Return the LSN at which the snapshot was exported
* Return the LSN at which the two-phase decoding was first enabled.
*/
XLogRecPtr
SnapBuildInitialConsistentPoint(SnapBuild *builder)
SnapBuildGetTwoPhaseAt(SnapBuild *builder)
{
return builder->initial_consistent_point;
return builder->two_phase_at;
}
/*
* Set the LSN at which two-phase decoding is enabled.
*/
void
SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
{
builder->two_phase_at = ptr;
}
/*

View File

@@ -96,6 +96,7 @@
#include "access/table.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
@@ -114,8 +115,11 @@
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
static bool table_states_valid = false;
static List *table_states_not_ready = NIL;
static bool FetchTableStates(bool *started_tx);
StringInfo copybuf = NULL;
@@ -362,7 +366,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Oid relid;
TimestampTz last_start_time;
};
static List *table_states = NIL;
static HTAB *last_start_times = NULL;
ListCell *lc;
bool started_tx = false;
@@ -370,42 +373,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Assert(!IsTransactionState());
/* We need up-to-date sync state info for subscription tables here. */
if (!table_states_valid)
{
MemoryContext oldctx;
List *rstates;
ListCell *lc;
SubscriptionRelState *rstate;
/* Clean the old list. */
list_free_deep(table_states);
table_states = NIL;
StartTransactionCommand();
started_tx = true;
/* Fetch all non-ready tables. */
rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
/* Allocate the tracking info in a permanent memory context. */
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
foreach(lc, rstates)
{
rstate = palloc(sizeof(SubscriptionRelState));
memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
table_states = lappend(table_states, rstate);
}
MemoryContextSwitchTo(oldctx);
table_states_valid = true;
}
FetchTableStates(&started_tx);
/*
* Prepare a hash table for tracking last start times of workers, to avoid
* immediate restarts. We don't need it if there are no tables that need
* syncing.
*/
if (table_states && !last_start_times)
if (table_states_not_ready && !last_start_times)
{
HASHCTL ctl;
@@ -419,16 +394,38 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* Clean up the hash table when we're done with all tables (just to
* release the bit of memory).
*/
else if (!table_states && last_start_times)
else if (!table_states_not_ready && last_start_times)
{
hash_destroy(last_start_times);
last_start_times = NULL;
}
/*
* Even when the two_phase mode is requested by the user, it remains as
* 'pending' until all tablesyncs have reached READY state.
*
* When this happens, we restart the apply worker and (if the conditions
* are still ok) then the two_phase tri-state will become 'enabled' at
* that time.
*
* Note: If the subscription has no tables then leave the state as
* PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
* work.
*/
if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
AllTablesyncsReady())
{
ereport(LOG,
(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
MySubscription->name)));
proc_exit(0);
}
/*
* Process all tables that are being synchronized.
*/
foreach(lc, table_states)
foreach(lc, table_states_not_ready)
{
SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
@@ -1071,7 +1068,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* slot leading to a dangling slot on the server.
*/
HOLD_INTERRUPTS();
walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ ,
walrcv_create_slot(LogRepWorkerWalRcvConn,
slotname, false /* permanent */ , false /* two_phase */ ,
CRS_USE_SNAPSHOT, origin_startpos);
RESUME_INTERRUPTS();
@@ -1158,3 +1156,134 @@ copy_table_done:
wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
return slotname;
}
/*
* Common code to fetch the up-to-date sync state info into the static lists.
*
* Returns true if subscription has 1 or more tables, else false.
*
* Note: If this function started the transaction (indicated by the parameter)
* then it is the caller's responsibility to commit it.
*/
static bool
FetchTableStates(bool *started_tx)
{
static bool has_subrels = false;
*started_tx = false;
if (!table_states_valid)
{
MemoryContext oldctx;
List *rstates;
ListCell *lc;
SubscriptionRelState *rstate;
/* Clean the old lists. */
list_free_deep(table_states_not_ready);
table_states_not_ready = NIL;
if (!IsTransactionState())
{
StartTransactionCommand();
*started_tx = true;
}
/* Fetch all non-ready tables. */
rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
/* Allocate the tracking info in a permanent memory context. */
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
foreach(lc, rstates)
{
rstate = palloc(sizeof(SubscriptionRelState));
memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
table_states_not_ready = lappend(table_states_not_ready, rstate);
}
MemoryContextSwitchTo(oldctx);
/*
* Does the subscription have tables?
*
* If there were not-READY relations found then we know it does. But
* if table_state_not_ready was empty we still need to check again to
* see if there are 0 tables.
*/
has_subrels = (list_length(table_states_not_ready) > 0) ||
HasSubscriptionRelations(MySubscription->oid);
table_states_valid = true;
}
return has_subrels;
}
/*
* If the subscription has no tables then return false.
*
* Otherwise, are all tablesyncs READY?
*
* Note: This function is not suitable to be called from outside of apply or
* tablesync workers because MySubscription needs to be already initialized.
*/
bool
AllTablesyncsReady(void)
{
bool started_tx = false;
bool has_subrels = false;
/* We need up-to-date sync state info for subscription tables here. */
has_subrels = FetchTableStates(&started_tx);
if (started_tx)
{
CommitTransactionCommand();
pgstat_report_stat(false);
}
/*
* Return false when there are no tables in subscription or not all tables
* are in ready state; true otherwise.
*/
return has_subrels && list_length(table_states_not_ready) == 0;
}
/*
* Update the two_phase state of the specified subscription in pg_subscription.
*/
void
UpdateTwoPhaseState(Oid suboid, char new_state)
{
Relation rel;
HeapTuple tup;
bool nulls[Natts_pg_subscription];
bool replaces[Natts_pg_subscription];
Datum values[Natts_pg_subscription];
Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
rel = table_open(SubscriptionRelationId, RowExclusiveLock);
tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
if (!HeapTupleIsValid(tup))
elog(ERROR,
"cache lookup failed for subscription oid %u",
suboid);
/* Form a new tuple. */
memset(values, 0, sizeof(values));
memset(nulls, false, sizeof(nulls));
memset(replaces, false, sizeof(replaces));
/* And update/set two_phase state */
values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
tup = heap_modify_tuple(tup, RelationGetDescr(rel),
values, nulls, replaces);
CatalogTupleUpdate(rel, &tup->t_self, tup);
heap_freetuple(tup);
table_close(rel, RowExclusiveLock);
}

View File

@@ -49,6 +49,79 @@
* a new way to pass filenames to BufFile APIs so that we are allowed to open
* the file we desired across multiple stream-open calls for the same
* transaction.
*
* TWO_PHASE TRANSACTIONS
* ----------------------
* Two phase transactions are replayed at prepare and then committed or
* rolled back at commit prepared and rollback prepared respectively. It is
* possible to have a prepared transaction that arrives at the apply worker
* when the tablesync is busy doing the initial copy. In this case, the apply
* worker skips all the prepared operations [e.g. inserts] while the tablesync
* is still busy (see the condition of should_apply_changes_for_rel). The
* tablesync worker might not get such a prepared transaction because say it
* was prior to the initial consistent point but might have got some later
* commits. Now, the tablesync worker will exit without doing anything for the
* prepared transaction skipped by the apply worker as the sync location for it
* will be already ahead of the apply worker's current location. This would lead
* to an "empty prepare", because later when the apply worker does the commit
* prepare, there is nothing in it (the inserts were skipped earlier).
*
* To avoid this, and similar prepare confusions the subscription's two_phase
* commit is enabled only after the initial sync is over. The two_phase option
* has been implemented as a tri-state with values DISABLED, PENDING, and
* ENABLED.
*
* Even if the user specifies they want a subscription with two_phase = on,
* internally it will start with a tri-state of PENDING which only becomes
* ENABLED after all tablesync initializations are completed - i.e. when all
* tablesync workers have reached their READY state. In other words, the value
* PENDING is only a temporary state for subscription start-up.
*
* Until the two_phase is properly available (ENABLED) the subscription will
* behave as if two_phase = off. When the apply worker detects that all
* tablesyncs have become READY (while the tri-state was PENDING) it will
* restart the apply worker process. This happens in
* process_syncing_tables_for_apply.
*
* When the (re-started) apply worker finds that all tablesyncs are READY for a
* two_phase tri-state of PENDING it start streaming messages with the
* two_phase option which in turn enables the decoding of two-phase commits at
* the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
* Now, it is possible that during the time we have not enabled two_phase, the
* publisher (replication server) would have skipped some prepares but we
* ensure that such prepares are sent along with commit prepare, see
* ReorderBufferFinishPrepared.
*
* If the subscription has no tables then a two_phase tri-state PENDING is
* left unchanged. This lets the user still do an ALTER TABLE REFRESH
* PUBLICATION which might otherwise be disallowed (see below).
*
* If ever a user needs to be aware of the tri-state value, they can fetch it
* from the pg_subscription catalog (see column subtwophasestate).
*
* We don't allow to toggle two_phase option of a subscription because it can
* lead to an inconsistent replica. Consider, initially, it was on and we have
* received some prepare then we turn it off, now at commit time the server
* will send the entire transaction data along with the commit. With some more
* analysis, we can allow changing this option from off to on but not sure if
* that alone would be useful.
*
* Finally, to avoid problems mentioned in previous paragraphs from any
* subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
* to 'off' and then again back to 'on') there is a restriction for
* ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
* the two_phase tri-state is ENABLED, except when copy_data = false.
*
* We can get prepare of the same GID more than once for the genuine cases
* where we have defined multiple subscriptions for publications on the same
* server and prepared transaction has operations on tables subscribed to those
* subscriptions. For such cases, if we use the GID sent by publisher one of
* the prepares will be successful and others will fail, in which case the
* server will send them again. Now, this can lead to a deadlock if user has
* set synchronous_standby_names for all the subscriptions on subscriber. To
* avoid such deadlocks, we generate a unique GID (consisting of the
* subscription oid and the xid of the prepared transaction) for each prepare
* transaction on the subscriber.
*-------------------------------------------------------------------------
*/
@@ -59,6 +132,7 @@
#include "access/table.h"
#include "access/tableam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
@@ -256,6 +330,10 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
LogicalRepTupleData *newtup,
CmdType operation);
/* Compute GID for two_phase transactions */
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
/*
* Should this worker apply changes for given relation.
*
@@ -783,6 +861,185 @@ apply_handle_commit(StringInfo s)
pgstat_report_activity(STATE_IDLE, NULL);
}
/*
* Handle BEGIN PREPARE message.
*/
static void
apply_handle_begin_prepare(StringInfo s)
{
LogicalRepPreparedTxnData begin_data;
/* Tablesync should never receive prepare. */
if (am_tablesync_worker())
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
logicalrep_read_begin_prepare(s, &begin_data);
remote_final_lsn = begin_data.prepare_lsn;
in_remote_transaction = true;
pgstat_report_activity(STATE_RUNNING, NULL);
}
/*
* Handle PREPARE message.
*/
static void
apply_handle_prepare(StringInfo s)
{
LogicalRepPreparedTxnData prepare_data;
char gid[GIDSIZE];
logicalrep_read_prepare(s, &prepare_data);
if (prepare_data.prepare_lsn != remote_final_lsn)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
LSN_FORMAT_ARGS(remote_final_lsn))));
/*
* Compute unique GID for two_phase transactions. We don't use GID of
* prepared transaction sent by server as that can lead to deadlock when
* we have multiple subscriptions from same node point to publications on
* the same node. See comments atop worker.c
*/
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
gid, sizeof(gid));
/*
* Unlike commit, here, we always prepare the transaction even though no
* change has happened in this transaction. It is done this way because at
* commit prepared time, we won't know whether we have skipped preparing a
* transaction because of no change.
*
* XXX, We can optimize such that at commit prepared time, we first check
* whether we have prepared the transaction or not but that doesn't seem
* worthwhile because such cases shouldn't be common.
*/
begin_replication_step();
/*
* BeginTransactionBlock is necessary to balance the EndTransactionBlock
* called within the PrepareTransactionBlock below.
*/
BeginTransactionBlock();
CommitTransactionCommand(); /* Completes the preceding Begin command. */
/*
* Update origin state so we can restart streaming from correct position
* in case of crash.
*/
replorigin_session_origin_lsn = prepare_data.end_lsn;
replorigin_session_origin_timestamp = prepare_data.prepare_time;
PrepareTransactionBlock(gid);
end_replication_step();
CommitTransactionCommand();
pgstat_report_stat(false);
store_flush_position(prepare_data.end_lsn);
in_remote_transaction = false;
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(prepare_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL);
}
/*
* Handle a COMMIT PREPARED of a previously PREPARED transaction.
*/
static void
apply_handle_commit_prepared(StringInfo s)
{
LogicalRepCommitPreparedTxnData prepare_data;
char gid[GIDSIZE];
logicalrep_read_commit_prepared(s, &prepare_data);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
gid, sizeof(gid));
/* There is no transaction when COMMIT PREPARED is called */
begin_replication_step();
/*
* Update origin state so we can restart streaming from correct position
* in case of crash.
*/
replorigin_session_origin_lsn = prepare_data.end_lsn;
replorigin_session_origin_timestamp = prepare_data.commit_time;
FinishPreparedTransaction(gid, true);
end_replication_step();
CommitTransactionCommand();
pgstat_report_stat(false);
store_flush_position(prepare_data.end_lsn);
in_remote_transaction = false;
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(prepare_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL);
}
/*
* Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
*/
static void
apply_handle_rollback_prepared(StringInfo s)
{
LogicalRepRollbackPreparedTxnData rollback_data;
char gid[GIDSIZE];
logicalrep_read_rollback_prepared(s, &rollback_data);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
gid, sizeof(gid));
/*
* It is possible that we haven't received prepare because it occurred
* before walsender reached a consistent point or the two_phase was still
* not enabled by that time, so in such cases, we need to skip rollback
* prepared.
*/
if (LookupGXact(gid, rollback_data.prepare_end_lsn,
rollback_data.prepare_time))
{
/*
* Update origin state so we can restart streaming from correct
* position in case of crash.
*/
replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
replorigin_session_origin_timestamp = rollback_data.rollback_time;
/* There is no transaction when ABORT/ROLLBACK PREPARED is called */
begin_replication_step();
FinishPreparedTransaction(gid, false);
end_replication_step();
CommitTransactionCommand();
}
pgstat_report_stat(false);
store_flush_position(rollback_data.rollback_end_lsn);
in_remote_transaction = false;
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(rollback_data.rollback_end_lsn);
pgstat_report_activity(STATE_IDLE, NULL);
}
/*
* Handle ORIGIN message.
*
@@ -2060,6 +2317,22 @@ apply_dispatch(StringInfo s)
case LOGICAL_REP_MSG_STREAM_COMMIT:
apply_handle_stream_commit(s);
return;
case LOGICAL_REP_MSG_BEGIN_PREPARE:
apply_handle_begin_prepare(s);
return;
case LOGICAL_REP_MSG_PREPARE:
apply_handle_prepare(s);
return;
case LOGICAL_REP_MSG_COMMIT_PREPARED:
apply_handle_commit_prepared(s);
return;
case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
apply_handle_rollback_prepared(s);
return;
}
ereport(ERROR,
@@ -2539,6 +2812,9 @@ maybe_reread_subscription(void)
/* !slotname should never happen when enabled is true. */
Assert(newsub->slotname);
/* two-phase should not be altered */
Assert(newsub->twophasestate == MySubscription->twophasestate);
/*
* Exit if any parameter that affects the remote connection was changed.
* The launcher will start a new worker.
@@ -3040,6 +3316,24 @@ cleanup_subxact_info()
subxact_data.nsubxacts_max = 0;
}
/*
* Form the prepared transaction GID for two_phase transactions.
*
* Return the GID in the supplied buffer.
*/
static void
TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
{
Assert(subid != InvalidRepOriginId);
if (!TransactionIdIsValid(xid))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid two-phase transaction ID")));
snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
}
/* Logical Replication Apply worker entry point */
void
ApplyWorkerMain(Datum main_arg)
@@ -3050,6 +3344,7 @@ ApplyWorkerMain(Datum main_arg)
XLogRecPtr origin_startpos;
char *myslotname;
WalRcvStreamOptions options;
int server_version;
/* Attach to slot */
logicalrep_worker_attach(worker_slot);
@@ -3208,15 +3503,59 @@ ApplyWorkerMain(Datum main_arg)
options.logical = true;
options.startpoint = origin_startpos;
options.slotname = myslotname;
server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
options.proto.logical.proto_version =
walrcv_server_version(LogRepWorkerWalRcvConn) >= 140000 ?
LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM;
server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
LOGICALREP_PROTO_VERSION_NUM;
options.proto.logical.publication_names = MySubscription->publications;
options.proto.logical.binary = MySubscription->binary;
options.proto.logical.streaming = MySubscription->stream;
options.proto.logical.twophase = false;
/* Start normal logical streaming replication. */
walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
if (!am_tablesync_worker())
{
/*
* Even when the two_phase mode is requested by the user, it remains
* as the tri-state PENDING until all tablesyncs have reached READY
* state. Only then, can it become ENABLED.
*
* Note: If the subscription has no tables then leave the state as
* PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
* work.
*/
if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
AllTablesyncsReady())
{
/* Start streaming with two_phase enabled */
options.proto.logical.twophase = true;
walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
StartTransactionCommand();
UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
CommitTransactionCommand();
}
else
{
walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
}
ereport(DEBUG1,
(errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s.",
MySubscription->name,
MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
"?")));
}
else
{
/* Start normal logical streaming replication. */
walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
}
/* Run the main loop. */
LogicalRepApplyLoop(origin_startpos);

View File

@@ -51,6 +51,16 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
Size sz, const char *message);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_end_lsn,
TimestampTz prepare_time);
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
@@ -70,6 +80,9 @@ static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
static void send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx);
static void send_repl_origin(LogicalDecodingContext *ctx,
RepOriginId origin_id, XLogRecPtr origin_lsn,
bool send_origin);
/*
* Entry in the map used to remember which relation schemas we sent.
@@ -145,6 +158,11 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->truncate_cb = pgoutput_truncate;
cb->message_cb = pgoutput_message;
cb->commit_cb = pgoutput_commit_txn;
cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
cb->prepare_cb = pgoutput_prepare_txn;
cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
cb->filter_by_origin_cb = pgoutput_origin_filter;
cb->shutdown_cb = pgoutput_shutdown;
@@ -156,6 +174,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->stream_change_cb = pgoutput_change;
cb->stream_message_cb = pgoutput_message;
cb->stream_truncate_cb = pgoutput_truncate;
/* transaction streaming - two-phase commit */
cb->stream_prepare_cb = NULL;
}
static void
@@ -167,10 +187,12 @@ parse_output_parameters(List *options, PGOutputData *data)
bool binary_option_given = false;
bool messages_option_given = false;
bool streaming_given = false;
bool two_phase_option_given = false;
data->binary = false;
data->streaming = false;
data->messages = false;
data->two_phase = false;
foreach(lc, options)
{
@@ -246,8 +268,29 @@ parse_output_parameters(List *options, PGOutputData *data)
data->streaming = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "two_phase") == 0)
{
if (two_phase_option_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
two_phase_option_given = true;
data->two_phase = defGetBoolean(defel);
}
else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
/*
* Do additional checking for the disallowed combination of two_phase
* and streaming. While streaming and two_phase can theoretically be
* supported, it needs more analysis to allow them together.
*/
if (data->two_phase && data->streaming)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("%s and %s are mutually exclusive options",
"two_phase", "streaming")));
}
}
@@ -319,6 +362,27 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
/* Also remember we're currently not streaming any transaction. */
in_streaming = false;
/*
* Here, we just check whether the two-phase option is passed by
* plugin and decide whether to enable it at later point of time. It
* remains enabled if the previous start-up has done so. But we only
* allow the option to be passed in with sufficient version of the
* protocol, and when the output plugin supports it.
*/
if (!data->two_phase)
ctx->twophase_opt_given = false;
else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
else if (!ctx->twophase)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("two-phase commit requested, but not supported by output plugin")));
else
ctx->twophase_opt_given = true;
/* Init publication state. */
data->publications = NIL;
publications_valid = false;
@@ -331,8 +395,12 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
}
else
{
/* Disable the streaming during the slot initialization mode. */
/*
* Disable the streaming and prepared transactions during the slot
* initialization mode.
*/
ctx->streaming = false;
ctx->twophase = false;
}
}
@@ -347,29 +415,8 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
OutputPluginPrepareWrite(ctx, !send_replication_origin);
logicalrep_write_begin(ctx->out, txn);
if (send_replication_origin)
{
char *origin;
/*----------
* XXX: which behaviour do we want here?
*
* Alternatives:
* - don't send origin message if origin name not found
* (that's what we do now)
* - throw error - that will break replication, not good
* - send some special "unknown" origin
*----------
*/
if (replorigin_by_oid(txn->origin_id, true, &origin))
{
/* Message boundary */
OutputPluginWrite(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
}
}
send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
send_replication_origin);
OutputPluginWrite(ctx, true);
}
@@ -388,6 +435,68 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginWrite(ctx, true);
}
/*
* BEGIN PREPARE callback
*/
static void
pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
OutputPluginPrepareWrite(ctx, !send_replication_origin);
logicalrep_write_begin_prepare(ctx->out, txn);
send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
send_replication_origin);
OutputPluginWrite(ctx, true);
}
/*
* PREPARE callback
*/
static void
pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn)
{
OutputPluginUpdateProgress(ctx);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
OutputPluginWrite(ctx, true);
}
/*
* COMMIT PREPARED callback
*/
static void
pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
OutputPluginUpdateProgress(ctx);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
OutputPluginWrite(ctx, true);
}
/*
* ROLLBACK PREPARED callback
*/
static void
pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_end_lsn,
TimestampTz prepare_time)
{
OutputPluginUpdateProgress(ctx);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
prepare_time);
OutputPluginWrite(ctx, true);
}
/*
* Write the current schema of the relation and its ancestor (if any) if not
* done yet.
@@ -839,18 +948,8 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx,
OutputPluginPrepareWrite(ctx, !send_replication_origin);
logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
if (send_replication_origin)
{
char *origin;
if (replorigin_by_oid(txn->origin_id, true, &origin))
{
/* Message boundary */
OutputPluginWrite(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr);
}
}
send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
send_replication_origin);
OutputPluginWrite(ctx, true);
@@ -1270,3 +1369,33 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
entry->pubactions.pubtruncate = false;
}
}
/* Send Replication origin */
static void
send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
XLogRecPtr origin_lsn, bool send_origin)
{
if (send_origin)
{
char *origin;
/*----------
* XXX: which behaviour do we want here?
*
* Alternatives:
* - don't send origin message if origin name not found
* (that's what we do now)
* - throw error - that will break replication, not good
* - send some special "unknown" origin
*----------
*/
if (replorigin_by_oid(origin_id, true, &origin))
{
/* Message boundary */
OutputPluginWrite(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_origin(ctx->out, origin, origin_lsn);
}
}
}

View File

@@ -283,6 +283,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
slot->data.persistency = persistency;
slot->data.two_phase = two_phase;
slot->data.two_phase_at = InvalidXLogRecPtr;
/* and then data only present in shared memory */
slot->just_dirtied = false;

View File

@@ -370,7 +370,7 @@ WalReceiverMain(void)
"pg_walreceiver_%lld",
(long long int) walrcv_get_backend_pid(wrconn));
walrcv_create_slot(wrconn, slotname, true, 0, NULL);
walrcv_create_slot(wrconn, slotname, true, false, 0, NULL);
SpinLockAcquire(&walrcv->mutex);
strlcpy(walrcv->slotname, slotname, NAMEDATALEN);