diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out index 63a9940f73a..261d8886d3b 100644 --- a/contrib/test_decoding/expected/slot.out +++ b/contrib/test_decoding/expected/slot.out @@ -406,3 +406,61 @@ SELECT pg_drop_replication_slot('copied_slot2_notemp'); (1 row) +-- Test failover option of slots. +SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_slot', 'test_decoding', false, false, true); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_logical_replication_slot('failover_false_slot', 'test_decoding', false, false, false); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', 'test_decoding', false, false); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot'); + ?column? +---------- + init +(1 row) + +SELECT slot_name, slot_type, failover FROM pg_replication_slots; + slot_name | slot_type | failover +-----------------------+-----------+---------- + failover_true_slot | logical | t + failover_false_slot | logical | f + failover_default_slot | logical | f + physical_slot | physical | f +(4 rows) + +SELECT pg_drop_replication_slot('failover_true_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('failover_false_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('failover_default_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('physical_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql index 1aa27c56674..45aeae7fd5a 100644 --- a/contrib/test_decoding/sql/slot.sql +++ b/contrib/test_decoding/sql/slot.sql @@ -176,3 +176,16 @@ ORDER BY o.slot_name, c.slot_name; SELECT pg_drop_replication_slot('orig_slot2'); SELECT pg_drop_replication_slot('copied_slot2_no_change'); SELECT pg_drop_replication_slot('copied_slot2_notemp'); + +-- Test failover option of slots. +SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_slot', 'test_decoding', false, false, true); +SELECT 'init' FROM pg_create_logical_replication_slot('failover_false_slot', 'test_decoding', false, false, false); +SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', 'test_decoding', false, false); +SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot'); + +SELECT slot_name, slot_type, failover FROM pg_replication_slots; + +SELECT pg_drop_replication_slot('failover_true_slot'); +SELECT pg_drop_replication_slot('failover_false_slot'); +SELECT pg_drop_replication_slot('failover_default_slot'); +SELECT pg_drop_replication_slot('physical_slot'); diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 5030a1045f9..968e8d59fb5 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -27707,7 +27707,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset pg_create_logical_replication_slot - pg_create_logical_replication_slot ( slot_name name, plugin name , temporary boolean, twophase boolean ) + pg_create_logical_replication_slot ( slot_name name, plugin name , temporary boolean, twophase boolean, failover boolean ) record ( slot_name name, lsn pg_lsn ) @@ -27722,8 +27722,13 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset released upon any error. The optional fourth parameter, twophase, when set to true, specifies that the decoding of prepared transactions is enabled for this - slot. A call to this function has the same effect as the replication - protocol command CREATE_REPLICATION_SLOT ... LOGICAL. + slot. The optional fifth parameter, + failover, when set to true, + specifies that this slot is enabled to be synced to the + standbys so that logical replication can be resumed after + failover. A call to this function has the same effect as + the replication protocol command + CREATE_REPLICATION_SLOT ... LOGICAL. diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 72d01fc624c..dd468b31ea7 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2555,6 +2555,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx + + + + failover bool + + + True if this is a logical slot enabled to be synced to the standbys. + Always false for physical slots. + + diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index f315fecf186..346cfb98a04 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -479,6 +479,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot( IN slot_name name, IN plugin name, IN temporary boolean DEFAULT false, IN twophase boolean DEFAULT false, + IN failover boolean DEFAULT false, OUT slot_name name, OUT lsn pg_lsn) RETURNS RECORD LANGUAGE INTERNAL diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 6288270e2b2..c62aa0074a3 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1023,7 +1023,8 @@ CREATE VIEW pg_replication_slots AS L.wal_status, L.safe_wal_size, L.two_phase, - L.conflict_reason + L.conflict_reason, + L.failover FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 52da694c791..02a14ec210e 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -90,7 +90,7 @@ typedef struct ReplicationSlotOnDisk sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize #define SLOT_MAGIC 0x1051CA1 /* format identifier */ -#define SLOT_VERSION 3 /* version for new files */ +#define SLOT_VERSION 4 /* version for new files */ /* Control array for replication slot management */ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; @@ -248,10 +248,13 @@ ReplicationSlotValidateName(const char *name, int elevel) * during getting changes, if the two_phase option is enabled it can skip * prepare because by that time start decoding point has been moved. So the * user will only get commit prepared. + * failover: If enabled, allows the slot to be synced to standbys so + * that logical replication can be resumed after failover. */ void ReplicationSlotCreate(const char *name, bool db_specific, - ReplicationSlotPersistency persistency, bool two_phase) + ReplicationSlotPersistency persistency, + bool two_phase, bool failover) { ReplicationSlot *slot = NULL; int i; @@ -311,6 +314,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.persistency = persistency; slot->data.two_phase = two_phase; slot->data.two_phase_at = InvalidXLogRecPtr; + slot->data.failover = failover; /* and then data only present in shared memory */ slot->just_dirtied = false; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index cad35dce7fc..eb685089b36 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -42,7 +42,8 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, - temporary ? RS_TEMPORARY : RS_PERSISTENT, false); + temporary ? RS_TEMPORARY : RS_PERSISTENT, false, + false); if (immediately_reserve) { @@ -117,6 +118,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) static void create_logical_replication_slot(char *name, char *plugin, bool temporary, bool two_phase, + bool failover, XLogRecPtr restart_lsn, bool find_startpoint) { @@ -133,7 +135,8 @@ create_logical_replication_slot(char *name, char *plugin, * error as well. */ ReplicationSlotCreate(name, true, - temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase); + temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase, + failover); /* * Create logical decoding context to find start point or, if we don't @@ -171,6 +174,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) Name plugin = PG_GETARG_NAME(1); bool temporary = PG_GETARG_BOOL(2); bool two_phase = PG_GETARG_BOOL(3); + bool failover = PG_GETARG_BOOL(4); Datum result; TupleDesc tupdesc; HeapTuple tuple; @@ -188,6 +192,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) NameStr(*plugin), temporary, two_phase, + failover, InvalidXLogRecPtr, true); @@ -232,7 +237,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 15 +#define PG_GET_REPLICATION_SLOTS_COLS 16 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -426,6 +431,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) } } + values[i++] = BoolGetDatum(slot_contents.data.failover); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, @@ -693,6 +700,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) XLogRecPtr src_restart_lsn; bool src_islogical; bool temporary; + bool failover; char *plugin; Datum values[2]; bool nulls[2]; @@ -748,6 +756,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) src_islogical = SlotIsLogical(&first_slot_contents); src_restart_lsn = first_slot_contents.data.restart_lsn; temporary = (first_slot_contents.data.persistency == RS_TEMPORARY); + failover = first_slot_contents.data.failover; plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL; /* Check type of replication slot */ @@ -787,6 +796,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) plugin, temporary, false, + failover, src_restart_lsn, false); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 087031e9dc2..aa80f3de20f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1212,7 +1212,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false); + false, false); if (reserve_wal) { @@ -1243,7 +1243,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, - two_phase); + two_phase, false); /* * Do options check early so that we can bail before calling the diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index 74e02b3f826..183c2f84eb4 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -666,7 +666,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) * started and stopped several times causing any temporary slots to be * removed. */ - res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, " + res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, " "%s as caught_up, conflict_reason IS NOT NULL as invalid " "FROM pg_catalog.pg_replication_slots " "WHERE slot_type = 'logical' AND " @@ -684,6 +684,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) int i_slotname; int i_plugin; int i_twophase; + int i_failover; int i_caught_up; int i_invalid; @@ -692,6 +693,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) i_slotname = PQfnumber(res, "slot_name"); i_plugin = PQfnumber(res, "plugin"); i_twophase = PQfnumber(res, "two_phase"); + i_failover = PQfnumber(res, "failover"); i_caught_up = PQfnumber(res, "caught_up"); i_invalid = PQfnumber(res, "invalid"); @@ -702,6 +704,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) curr->slotname = pg_strdup(PQgetvalue(res, slotnum, i_slotname)); curr->plugin = pg_strdup(PQgetvalue(res, slotnum, i_plugin)); curr->two_phase = (strcmp(PQgetvalue(res, slotnum, i_twophase), "t") == 0); + curr->failover = (strcmp(PQgetvalue(res, slotnum, i_failover), "t") == 0); curr->caught_up = (strcmp(PQgetvalue(res, slotnum, i_caught_up), "t") == 0); curr->invalid = (strcmp(PQgetvalue(res, slotnum, i_invalid), "t") == 0); } diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c index 14a36f0503d..10c94a6c1fc 100644 --- a/src/bin/pg_upgrade/pg_upgrade.c +++ b/src/bin/pg_upgrade/pg_upgrade.c @@ -916,8 +916,10 @@ create_logical_replication_slots(void) appendStringLiteralConn(query, slot_info->slotname, conn); appendPQExpBuffer(query, ", "); appendStringLiteralConn(query, slot_info->plugin, conn); - appendPQExpBuffer(query, ", false, %s);", - slot_info->two_phase ? "true" : "false"); + + appendPQExpBuffer(query, ", false, %s, %s);", + slot_info->two_phase ? "true" : "false", + slot_info->failover ? "true" : "false"); PQclear(executeQueryOrDie(conn, "%s", query->data)); diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index a1d08c3dab1..d9a848cbfde 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -160,6 +160,8 @@ typedef struct bool two_phase; /* can the slot decode 2PC? */ bool caught_up; /* has the slot caught up to latest changes? */ bool invalid; /* if true, the slot is unusable */ + bool failover; /* is the slot designated to be synced to the + * physical standby? */ } LogicalSlotInfo; typedef struct diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 23944db9e6b..739f9253bfe 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -57,6 +57,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202401251 +#define CATALOG_VERSION_NO 202401252 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index e4115cd0840..29af4ce65d5 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11127,17 +11127,17 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'record', - proargtypes => 'name name bool bool', - proallargtypes => '{name,name,bool,bool,name,pg_lsn}', - proargmodes => '{i,i,i,i,o,o}', - proargnames => '{slot_name,plugin,temporary,twophase,slot_name,lsn}', + proargtypes => 'name name bool bool bool', + proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}', + proargmodes => '{i,i,i,i,i,o,o}', + proargnames => '{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}', prosrc => 'pg_create_logical_replication_slot' }, { oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin', diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 9e39aaf3037..db9bb222661 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -111,6 +111,12 @@ typedef struct ReplicationSlotPersistentData /* plugin name */ NameData plugin; + + /* + * Is this a failover slot (sync candidate for standbys)? Only relevant + * for logical slots on the primary server. + */ + bool failover; } ReplicationSlotPersistentData; /* @@ -218,7 +224,7 @@ extern void ReplicationSlotsShmemInit(void); /* management of individual slots */ extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase); + bool two_phase, bool failover); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 5e846b01e68..abc944e8b82 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1473,8 +1473,9 @@ pg_replication_slots| SELECT l.slot_name, l.wal_status, l.safe_wal_size, l.two_phase, - l.conflict_reason - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason) + l.conflict_reason, + l.failover + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper,