mirror of
https://github.com/postgres/postgres.git
synced 2025-07-28 23:42:10 +03:00
Add option to enable two_phase commits via pg_create_logical_replication_slot.
Commit 0aa8a01d04
extends the output plugin API to allow decoding of
prepared xacts and allowed the user to enable/disable the two-phase option
via pg_logical_slot_get_changes(). This can lead to a problem such that
the first time when it gets changes via pg_logical_slot_get_changes()
without two_phase option enabled it will not get the prepared even though
prepare is after consistent snapshot. Now next time during getting changes,
if the two_phase option is enabled it can skip prepare because by that
time start decoding point has been moved. So the user will only get commit
prepared.
Allow to enable/disable this option at the create slot time and default
will be false. It will break the existing slots which is fine in a major
release.
Author: Ajin Cherian
Reviewed-by: Amit Kapila and Vignesh C
Discussion: https://postgr.es/m/d0f60d60-133d-bf8d-bd70-47784d8fabf3@enterprisedb.com
This commit is contained in:
@ -894,7 +894,8 @@ CREATE VIEW pg_replication_slots AS
|
||||
L.restart_lsn,
|
||||
L.confirmed_flush_lsn,
|
||||
L.wal_status,
|
||||
L.safe_wal_size
|
||||
L.safe_wal_size,
|
||||
L.two_phase
|
||||
FROM pg_get_replication_slots() AS L
|
||||
LEFT JOIN pg_database D ON (L.datoid = D.oid);
|
||||
|
||||
@ -1318,6 +1319,7 @@ AS 'pg_create_physical_replication_slot';
|
||||
CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot(
|
||||
IN slot_name name, IN plugin name,
|
||||
IN temporary boolean DEFAULT false,
|
||||
IN twophase boolean DEFAULT false,
|
||||
OUT slot_name name, OUT lsn pg_lsn)
|
||||
RETURNS RECORD
|
||||
LANGUAGE INTERNAL
|
||||
|
@ -431,6 +431,12 @@ CreateInitDecodingContext(const char *plugin,
|
||||
startup_cb_wrapper(ctx, &ctx->options, true);
|
||||
MemoryContextSwitchTo(old_context);
|
||||
|
||||
/*
|
||||
* We allow decoding of prepared transactions iff the two_phase option is
|
||||
* enabled at the time of slot creation.
|
||||
*/
|
||||
ctx->twophase &= MyReplicationSlot->data.two_phase;
|
||||
|
||||
ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
|
||||
|
||||
return ctx;
|
||||
@ -531,6 +537,12 @@ CreateDecodingContext(XLogRecPtr start_lsn,
|
||||
startup_cb_wrapper(ctx, &ctx->options, false);
|
||||
MemoryContextSwitchTo(old_context);
|
||||
|
||||
/*
|
||||
* We allow decoding of prepared transactions iff the two_phase option is
|
||||
* enabled at the time of slot creation.
|
||||
*/
|
||||
ctx->twophase &= MyReplicationSlot->data.two_phase;
|
||||
|
||||
ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
|
||||
|
||||
ereport(LOG,
|
||||
|
@ -216,10 +216,17 @@ ReplicationSlotValidateName(const char *name, int elevel)
|
||||
* name: Name of the slot
|
||||
* db_specific: logical decoding is db specific; if the slot is going to
|
||||
* be used for that pass true, otherwise false.
|
||||
* two_phase: Allows decoding of prepared transactions. We allow this option
|
||||
* to be enabled only at the slot creation time. If we allow this option
|
||||
* to be changed during decoding then it is quite possible that we skip
|
||||
* prepare first time because this option was not enabled. Now next time
|
||||
* during getting changes, if the two_phase option is enabled it can skip
|
||||
* prepare because by that time start decoding point has been moved. So the
|
||||
* user will only get commit prepared.
|
||||
*/
|
||||
void
|
||||
ReplicationSlotCreate(const char *name, bool db_specific,
|
||||
ReplicationSlotPersistency persistency)
|
||||
ReplicationSlotPersistency persistency, bool two_phase)
|
||||
{
|
||||
ReplicationSlot *slot = NULL;
|
||||
int i;
|
||||
@ -277,6 +284,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
|
||||
namestrcpy(&slot->data.name, name);
|
||||
slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
|
||||
slot->data.persistency = persistency;
|
||||
slot->data.two_phase = two_phase;
|
||||
|
||||
/* and then data only present in shared memory */
|
||||
slot->just_dirtied = false;
|
||||
|
@ -50,7 +50,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
|
||||
|
||||
/* acquire replication slot, this will check for conflicting names */
|
||||
ReplicationSlotCreate(name, false,
|
||||
temporary ? RS_TEMPORARY : RS_PERSISTENT);
|
||||
temporary ? RS_TEMPORARY : RS_PERSISTENT, false);
|
||||
|
||||
if (immediately_reserve)
|
||||
{
|
||||
@ -124,7 +124,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
|
||||
*/
|
||||
static void
|
||||
create_logical_replication_slot(char *name, char *plugin,
|
||||
bool temporary, XLogRecPtr restart_lsn,
|
||||
bool temporary, bool two_phase,
|
||||
XLogRecPtr restart_lsn,
|
||||
bool find_startpoint)
|
||||
{
|
||||
LogicalDecodingContext *ctx = NULL;
|
||||
@ -140,7 +141,7 @@ create_logical_replication_slot(char *name, char *plugin,
|
||||
* error as well.
|
||||
*/
|
||||
ReplicationSlotCreate(name, true,
|
||||
temporary ? RS_TEMPORARY : RS_EPHEMERAL);
|
||||
temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase);
|
||||
|
||||
/*
|
||||
* Create logical decoding context to find start point or, if we don't
|
||||
@ -177,6 +178,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
|
||||
Name name = PG_GETARG_NAME(0);
|
||||
Name plugin = PG_GETARG_NAME(1);
|
||||
bool temporary = PG_GETARG_BOOL(2);
|
||||
bool two_phase = PG_GETARG_BOOL(3);
|
||||
Datum result;
|
||||
TupleDesc tupdesc;
|
||||
HeapTuple tuple;
|
||||
@ -193,6 +195,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
|
||||
create_logical_replication_slot(NameStr(*name),
|
||||
NameStr(*plugin),
|
||||
temporary,
|
||||
two_phase,
|
||||
InvalidXLogRecPtr,
|
||||
true);
|
||||
|
||||
@ -236,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
|
||||
Datum
|
||||
pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||
{
|
||||
#define PG_GET_REPLICATION_SLOTS_COLS 13
|
||||
#define PG_GET_REPLICATION_SLOTS_COLS 14
|
||||
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
||||
TupleDesc tupdesc;
|
||||
Tuplestorestate *tupstore;
|
||||
@ -432,6 +435,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||
values[i++] = Int64GetDatum(failLSN - currlsn);
|
||||
}
|
||||
|
||||
values[i++] = BoolGetDatum(slot_contents.data.two_phase);
|
||||
|
||||
Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
|
||||
|
||||
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
|
||||
@ -796,6 +801,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
|
||||
create_logical_replication_slot(NameStr(*dst_name),
|
||||
plugin,
|
||||
temporary,
|
||||
false,
|
||||
src_restart_lsn,
|
||||
false);
|
||||
}
|
||||
|
@ -938,7 +938,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
|
||||
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
|
||||
{
|
||||
ReplicationSlotCreate(cmd->slotname, false,
|
||||
cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT);
|
||||
cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
|
||||
false);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -952,7 +953,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
|
||||
* they get dropped on error as well.
|
||||
*/
|
||||
ReplicationSlotCreate(cmd->slotname, true,
|
||||
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
|
||||
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
|
||||
cmd->two_phase);
|
||||
}
|
||||
|
||||
if (cmd->kind == REPLICATION_KIND_LOGICAL)
|
||||
|
@ -53,6 +53,6 @@
|
||||
*/
|
||||
|
||||
/* yyyymmddN */
|
||||
#define CATALOG_VERSION_NO 202102191
|
||||
#define CATALOG_VERSION_NO 202103031
|
||||
|
||||
#endif
|
||||
|
@ -10496,16 +10496,16 @@
|
||||
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
|
||||
proretset => 't', provolatile => 's', prorettype => 'record',
|
||||
proargtypes => '',
|
||||
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}',
|
||||
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
|
||||
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size}',
|
||||
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}',
|
||||
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
|
||||
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}',
|
||||
prosrc => 'pg_get_replication_slots' },
|
||||
{ oid => '3786', descr => 'set up a logical replication slot',
|
||||
proname => 'pg_create_logical_replication_slot', provolatile => 'v',
|
||||
proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool',
|
||||
proallargtypes => '{name,name,bool,name,pg_lsn}',
|
||||
proargmodes => '{i,i,i,o,o}',
|
||||
proargnames => '{slot_name,plugin,temporary,slot_name,lsn}',
|
||||
proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool bool',
|
||||
proallargtypes => '{name,name,bool,bool,name,pg_lsn}',
|
||||
proargmodes => '{i,i,i,i,o,o}',
|
||||
proargnames => '{slot_name,plugin,temporary,twophase,slot_name,lsn}',
|
||||
prosrc => 'pg_create_logical_replication_slot' },
|
||||
{ oid => '4222',
|
||||
descr => 'copy a logical replication slot, changing temporality and plugin',
|
||||
|
@ -56,6 +56,7 @@ typedef struct CreateReplicationSlotCmd
|
||||
ReplicationKind kind;
|
||||
char *plugin;
|
||||
bool temporary;
|
||||
bool two_phase;
|
||||
List *options;
|
||||
} CreateReplicationSlotCmd;
|
||||
|
||||
|
@ -98,6 +98,11 @@ typedef struct ReplicationSlotPersistentData
|
||||
*/
|
||||
XLogRecPtr initial_consistent_point;
|
||||
|
||||
/*
|
||||
* Allow decoding of prepared transactions?
|
||||
*/
|
||||
bool two_phase;
|
||||
|
||||
/* plugin name */
|
||||
NameData plugin;
|
||||
} ReplicationSlotPersistentData;
|
||||
@ -199,7 +204,7 @@ extern void ReplicationSlotsShmemInit(void);
|
||||
|
||||
/* management of individual slots */
|
||||
extern void ReplicationSlotCreate(const char *name, bool db_specific,
|
||||
ReplicationSlotPersistency p);
|
||||
ReplicationSlotPersistency p, bool two_phase);
|
||||
extern void ReplicationSlotPersist(void);
|
||||
extern void ReplicationSlotDrop(const char *name, bool nowait);
|
||||
|
||||
|
@ -1477,8 +1477,9 @@ pg_replication_slots| SELECT l.slot_name,
|
||||
l.restart_lsn,
|
||||
l.confirmed_flush_lsn,
|
||||
l.wal_status,
|
||||
l.safe_wal_size
|
||||
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size)
|
||||
l.safe_wal_size,
|
||||
l.two_phase
|
||||
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase)
|
||||
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
|
||||
pg_roles| SELECT pg_authid.rolname,
|
||||
pg_authid.rolsuper,
|
||||
|
Reference in New Issue
Block a user