diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index a6641f5040d..d2bc8b8350a 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -39,7 +39,7 @@ submake-test_decoding:
REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
decoding_into_rel binary prepared replorigin time messages \
- spill
+ spill slot
regresscheck: | submake-regress submake-test_decoding temp-install
$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index a9ba615b5bb..c104c4802d1 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -702,7 +702,7 @@ SELECT pg_drop_replication_slot('regression_slot');
/* check that the slot is gone */
SELECT * FROM pg_replication_slots;
- slot_name | plugin | slot_type | datoid | database | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
------------+--------+-----------+--------+----------+--------+------------+------+--------------+-------------+---------------------
+ slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
+-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
(0 rows)
diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out
new file mode 100644
index 00000000000..5e6b70ba38a
--- /dev/null
+++ b/contrib/test_decoding/expected/slot.out
@@ -0,0 +1,58 @@
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t', 'test_decoding', true);
+ ?column?
+----------
+ init
+(1 row)
+
+SELECT pg_drop_replication_slot('regression_slot_p');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test_decoding', false);
+ ?column?
+----------
+ init
+(1 row)
+
+-- reconnect to clean temp slots
+\c
+SELECT pg_drop_replication_slot('regression_slot_p');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
+-- should fail because the temporary slot was dropped automatically
+SELECT pg_drop_replication_slot('regression_slot_t');
+ERROR: replication slot "regression_slot_t" does not exist
+-- test switching between slots in a session
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding', true);
+ ?column?
+----------
+ init
+(1 row)
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot2', 'test_decoding', true);
+ ?column?
+----------
+ init
+(1 row)
+
+SELECT * FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL);
+ location | xid | data
+----------+-----+------
+(0 rows)
+
+SELECT * FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL);
+ location | xid | data
+----------+-----+------
+(0 rows)
+
diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql
new file mode 100644
index 00000000000..3b0aecd6a88
--- /dev/null
+++ b/contrib/test_decoding/sql/slot.sql
@@ -0,0 +1,20 @@
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t', 'test_decoding', true);
+
+SELECT pg_drop_replication_slot('regression_slot_p');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test_decoding', false);
+
+-- reconnect to clean temp slots
+\c
+
+SELECT pg_drop_replication_slot('regression_slot_p');
+
+-- should fail because the temporary slot was dropped automatically
+SELECT pg_drop_replication_slot('regression_slot_t');
+
+
+-- test switching between slots in a session
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding', true);
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot2', 'test_decoding', true);
+SELECT * FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL);
+SELECT * FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL);
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index eca98dfd349..0f9c9bf1296 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -18465,7 +18465,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
pg_create_physical_replication_slot
- pg_create_physical_replication_slot(slot_name name , immediately_reserve> boolean> )
+ pg_create_physical_replication_slot(slot_name name , immediately_reserve> boolean>, temporary> boolean>)
(slot_name name, xlog_position pg_lsn)
@@ -18478,7 +18478,11 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
the LSN> is reserved on first connection from a streaming
replication client. Streaming changes from a physical slot is only
possible with the streaming-replication protocol —
- see . This function corresponds
+ see . The optional third
+ parameter, temporary>, when set to true, specifies that
+ the slot should not be permanently stored to disk and is only meant
+ for use by current session. Temporary slots are also
+ released upon any error. This function corresponds
to the replication protocol command CREATE_REPLICATION_SLOT
... PHYSICAL.
@@ -18505,7 +18509,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
pg_create_logical_replication_slot
- pg_create_logical_replication_slot(slot_name name, plugin name)
+ pg_create_logical_replication_slot(slot_name name, plugin name , temporary> boolean>)
(slot_name name, xlog_position pg_lsn)
@@ -18513,7 +18517,11 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
Creates a new logical (decoding) replication slot named
slot_name using the output plugin
- plugin. A call to this function has the same
+ plugin. The optional third
+ parameter, temporary>, when set to true, specifies that
+ the slot should not be permanently stored to disk and is only meant
+ for use by current session. Temporary slots are also
+ released upon any error. A call to this function has the same
effect as the replication protocol command
CREATE_REPLICATION_SLOT ... LOGICAL.
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 50cf5274274..9ba147cae5e 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1434,7 +1434,7 @@ The commands accepted in walsender mode are:
- CREATE_REPLICATION_SLOT slot_name> { PHYSICAL> [ RESERVE_WAL> ] | LOGICAL> output_plugin> }
+ CREATE_REPLICATION_SLOT slot_name> [ TEMPORARY> ] { PHYSICAL> [ RESERVE_WAL> ] | LOGICAL> output_plugin> }
CREATE_REPLICATION_SLOT
@@ -1464,6 +1464,17 @@ The commands accepted in walsender mode are:
+
+ TEMPORARY>
+
+
+ Specify that this replication slot is a temporary one. Temporary
+ slots are not saved to disk and are automatically dropped on error
+ or when the session has finished.
+
+
+
+
RESERVE_WAL>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index df59d1819ca..48e7c4b7f95 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -726,6 +726,7 @@ CREATE VIEW pg_replication_slots AS
L.slot_type,
L.datoid,
D.datname AS database,
+ L.temporary,
L.active,
L.active_pid,
L.xmin,
@@ -991,12 +992,22 @@ AS 'pg_logical_slot_peek_binary_changes';
CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
IN slot_name name, IN immediately_reserve boolean DEFAULT false,
+ IN temporary boolean DEFAULT false,
OUT slot_name name, OUT xlog_position pg_lsn)
RETURNS RECORD
LANGUAGE INTERNAL
STRICT VOLATILE
AS 'pg_create_physical_replication_slot';
+CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot(
+ IN slot_name name, IN plugin name,
+ IN temporary boolean DEFAULT false,
+ OUT slot_name text, OUT xlog_position pg_lsn)
+RETURNS RECORD
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_create_logical_replication_slot';
+
CREATE OR REPLACE FUNCTION
make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0,
days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0,
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index fd0fa6dde08..e75516c8d25 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -77,6 +77,7 @@ Node *replication_parse_result;
%token K_LOGICAL
%token K_SLOT
%token K_RESERVE_WAL
+%token K_TEMPORARY
%type command
%type base_backup start_replication start_logical_replication
@@ -89,7 +90,7 @@ Node *replication_parse_result;
%type plugin_opt_elem
%type plugin_opt_arg
%type opt_slot
-%type opt_reserve_wal
+%type opt_reserve_wal opt_temporary
%%
@@ -183,24 +184,26 @@ base_backup_opt:
;
create_replication_slot:
- /* CREATE_REPLICATION_SLOT slot PHYSICAL RESERVE_WAL */
- K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL opt_reserve_wal
+ /* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */
+ K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL opt_reserve_wal
{
CreateReplicationSlotCmd *cmd;
cmd = makeNode(CreateReplicationSlotCmd);
cmd->kind = REPLICATION_KIND_PHYSICAL;
cmd->slotname = $2;
- cmd->reserve_wal = $4;
+ cmd->temporary = $3;
+ cmd->reserve_wal = $5;
$$ = (Node *) cmd;
}
- /* CREATE_REPLICATION_SLOT slot LOGICAL plugin */
- | K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT
+ /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */
+ | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT
{
CreateReplicationSlotCmd *cmd;
cmd = makeNode(CreateReplicationSlotCmd);
cmd->kind = REPLICATION_KIND_LOGICAL;
cmd->slotname = $2;
- cmd->plugin = $4;
+ cmd->temporary = $3;
+ cmd->plugin = $5;
$$ = (Node *) cmd;
}
;
@@ -276,6 +279,11 @@ opt_reserve_wal:
| /* EMPTY */ { $$ = false; }
;
+opt_temporary:
+ K_TEMPORARY { $$ = true; }
+ | /* EMPTY */ { $$ = false; }
+ ;
+
opt_slot:
K_SLOT IDENT
{ $$ = $2; }
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index f83ec538b67..9f50ce64a54 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -98,6 +98,7 @@ PHYSICAL { return K_PHYSICAL; }
RESERVE_WAL { return K_RESERVE_WAL; }
LOGICAL { return K_LOGICAL; }
SLOT { return K_SLOT; }
+TEMPORARY { return K_TEMPORARY; }
"," { return ','; }
";" { return ';'; }
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 0b2575ee9d0..d8ed005e7ec 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -47,6 +47,7 @@
#include "storage/fd.h"
#include "storage/proc.h"
#include "storage/procarray.h"
+#include "utils/builtins.h"
/*
* Replication slot on-disk data structure.
@@ -98,7 +99,9 @@ int max_replication_slots = 0; /* the maximum number of replication
* slots */
static LWLockTranche ReplSlotIOLWLockTranche;
+
static void ReplicationSlotDropAcquired(void);
+static void ReplicationSlotDropPtr(ReplicationSlot *slot);
/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
@@ -329,7 +332,7 @@ ReplicationSlotAcquire(const char *name)
{
ReplicationSlot *slot = NULL;
int i;
- int active_pid = 0;
+ int active_pid = 0; /* Keep compiler quiet */
Assert(MyReplicationSlot == NULL);
@@ -346,7 +349,7 @@ ReplicationSlotAcquire(const char *name)
SpinLockAcquire(&s->mutex);
active_pid = s->active_pid;
if (active_pid == 0)
- s->active_pid = MyProcPid;
+ active_pid = s->active_pid = MyProcPid;
SpinLockRelease(&s->mutex);
slot = s;
break;
@@ -359,7 +362,7 @@ ReplicationSlotAcquire(const char *name)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("replication slot \"%s\" does not exist", name)));
- if (active_pid != 0)
+ if (active_pid != MyProcPid)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
errmsg("replication slot \"%s\" is active for PID %d",
@@ -389,9 +392,12 @@ ReplicationSlotRelease(void)
*/
ReplicationSlotDropAcquired();
}
- else
+ else if (slot->data.persistency == RS_PERSISTENT)
{
- /* Mark slot inactive. We're not freeing it, just disconnecting. */
+ /*
+ * Mark persistent slot inactive. We're not freeing it, just
+ * disconnecting.
+ */
SpinLockAcquire(&slot->mutex);
slot->active_pid = 0;
SpinLockRelease(&slot->mutex);
@@ -405,6 +411,33 @@ ReplicationSlotRelease(void)
LWLockRelease(ProcArrayLock);
}
+/*
+ * Cleanup all temporary slots created in current session.
+ */
+void
+ReplicationSlotCleanup()
+{
+ int i;
+
+ Assert(MyReplicationSlot == NULL);
+
+ /*
+ * No need for locking as we are only interested in slots active in
+ * current process and those are not touched by other processes.
+ */
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+ if (s->active_pid == MyProcPid)
+ {
+ Assert(s->in_use && s->data.persistency == RS_TEMPORARY);
+
+ ReplicationSlotDropPtr(s);
+ }
+ }
+}
+
/*
* Permanently drop replication slot identified by the passed in name.
*/
@@ -419,14 +452,11 @@ ReplicationSlotDrop(const char *name)
}
/*
- * Permanently drop the currently acquired replication slot which will be
- * released by the point this function returns.
+ * Permanently drop the currently acquired replication slot.
*/
static void
ReplicationSlotDropAcquired(void)
{
- char path[MAXPGPATH];
- char tmppath[MAXPGPATH];
ReplicationSlot *slot = MyReplicationSlot;
Assert(MyReplicationSlot != NULL);
@@ -434,6 +464,19 @@ ReplicationSlotDropAcquired(void)
/* slot isn't acquired anymore */
MyReplicationSlot = NULL;
+ ReplicationSlotDropPtr(slot);
+}
+
+/*
+ * Permanently drop the replication slot which will be released by the point
+ * this function returns.
+ */
+static void
+ReplicationSlotDropPtr(ReplicationSlot *slot)
+{
+ char path[MAXPGPATH];
+ char tmppath[MAXPGPATH];
+
/*
* If some other backend ran this code concurrently with us, we might try
* to delete a slot with a certain name while someone else was trying to
@@ -448,9 +491,9 @@ ReplicationSlotDropAcquired(void)
/*
* Rename the slot directory on disk, so that we'll no longer recognize
* this as a valid slot. Note that if this fails, we've got to mark the
- * slot inactive before bailing out. If we're dropping an ephemeral slot,
- * we better never fail hard as the caller won't expect the slot to
- * survive and this might get called during error handling.
+ * slot inactive before bailing out. If we're dropping an ephemeral or
+ * a temporary slot, we better never fail hard as the caller won't expect
+ * the slot to survive and this might get called during error handling.
*/
if (rename(path, tmppath) == 0)
{
@@ -469,7 +512,7 @@ ReplicationSlotDropAcquired(void)
}
else
{
- bool fail_softly = slot->data.persistency == RS_EPHEMERAL;
+ bool fail_softly = slot->data.persistency != RS_PERSISTENT;
SpinLockAcquire(&slot->mutex);
slot->active_pid = 0;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index f9087619d2b..1f1c56cc21f 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -41,6 +41,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
{
Name name = PG_GETARG_NAME(0);
bool immediately_reserve = PG_GETARG_BOOL(1);
+ bool temporary = PG_GETARG_BOOL(2);
Datum values[2];
bool nulls[2];
TupleDesc tupdesc;
@@ -57,7 +58,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
CheckSlotRequirements();
/* acquire replication slot, this will check for conflicting names */
- ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT);
+ ReplicationSlotCreate(NameStr(*name), false,
+ temporary ? RS_TEMPORARY : RS_PERSISTENT);
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
nulls[0] = false;
@@ -96,6 +98,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
{
Name name = PG_GETARG_NAME(0);
Name plugin = PG_GETARG_NAME(1);
+ bool temporary = PG_GETARG_BOOL(2);
LogicalDecodingContext *ctx = NULL;
@@ -116,11 +119,14 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
/*
* Acquire a logical decoding slot, this will check for conflicting names.
- * Initially create it as ephemeral - that allows us to nicely handle
- * errors during initialization because it'll get dropped if this
+ * Initially create persisent slot as ephemeral - that allows us to nicely
+ * handle errors during initialization because it'll get dropped if this
* transaction fails. We'll make it persistent at the end.
+ * Temporary slots can be created as temporary from beginning as they get
+ * dropped on error as well.
*/
- ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL);
+ ReplicationSlotCreate(NameStr(*name), true,
+ temporary ? RS_TEMPORARY : RS_EPHEMERAL);
/*
* Create logical decoding context, to build the initial snapshot.
@@ -143,8 +149,9 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
tuple = heap_form_tuple(tupdesc, values, nulls);
result = HeapTupleGetDatum(tuple);
- /* ok, slot is now fully created, mark it as persistent */
- ReplicationSlotPersist();
+ /* ok, slot is now fully created, mark it as persistent if needed */
+ if (!temporary)
+ ReplicationSlotPersist();
ReplicationSlotRelease();
PG_RETURN_DATUM(result);
@@ -174,7 +181,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_GET_REPLICATION_SLOTS_COLS 10
+#define PG_GET_REPLICATION_SLOTS_COLS 11
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -219,6 +226,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
Datum values[PG_GET_REPLICATION_SLOTS_COLS];
bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
+ ReplicationSlotPersistency persistency;
TransactionId xmin;
TransactionId catalog_xmin;
XLogRecPtr restart_lsn;
@@ -246,6 +254,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
namecpy(&plugin, &slot->data.plugin);
active_pid = slot->active_pid;
+ persistency = slot->data.persistency;
}
SpinLockRelease(&slot->mutex);
@@ -269,6 +278,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
else
values[i++] = database;
+ values[i++] = BoolGetDatum(persistency == RS_TEMPORARY);
values[i++] = BoolGetDatum(active_pid != 0);
if (active_pid != 0)
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index aa42d596104..b14d82153af 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -266,6 +266,8 @@ WalSndErrorCleanup(void)
if (MyReplicationSlot != NULL)
ReplicationSlotRelease();
+ ReplicationSlotCleanup();
+
replication_active = false;
if (walsender_ready_to_stop)
proc_exit(0);
@@ -796,18 +798,22 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
{
- ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT);
+ ReplicationSlotCreate(cmd->slotname, false,
+ cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT);
}
else
{
CheckLogicalDecodingRequirements();
/*
- * Initially create the slot as ephemeral - that allows us to nicely
- * handle errors during initialization because it'll get dropped if
- * this transaction fails. We'll make it persistent at the end.
+ * Initially create persisent slot as ephemeral - that allows us to
+ * nicely handle errors during initialization because it'll get
+ * dropped if this transaction fails. We'll make it persistent at the
+ * end. Temporary slots can be created as temporary from beginning as
+ * they get dropped on error as well.
*/
- ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL);
+ ReplicationSlotCreate(cmd->slotname, true,
+ cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
}
initStringInfo(&output_message);
@@ -841,15 +847,18 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
/* don't need the decoding context anymore */
FreeDecodingContext(ctx);
- ReplicationSlotPersist();
+ if (!cmd->temporary)
+ ReplicationSlotPersist();
}
else if (cmd->kind == REPLICATION_KIND_PHYSICAL && cmd->reserve_wal)
{
ReplicationSlotReserveWal();
- /* Write this slot to disk */
ReplicationSlotMarkDirty();
- ReplicationSlotSave();
+
+ /* Write this slot to disk if it's permanent one. */
+ if (!cmd->temporary)
+ ReplicationSlotSave();
}
snprintf(xpos, sizeof(xpos), "%X/%X",
@@ -933,9 +942,6 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
pq_endmessage(&buf);
- /*
- * release active status again, START_REPLICATION will reacquire it
- */
ReplicationSlotRelease();
}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 83e9ca15d18..276261bd7b3 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -810,6 +810,9 @@ ProcKill(int code, Datum arg)
if (MyReplicationSlot != NULL)
ReplicationSlotRelease();
+ /* Also cleanup all the temporary slots. */
+ ReplicationSlotCleanup();
+
/*
* Detach from any lock group of which we are a member. If the leader
* exist before all other group members, it's PGPROC will remain allocated
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index cc847548a9f..b17923106a6 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -3878,6 +3878,9 @@ PostgresMain(int argc, char *argv[],
if (MyReplicationSlot != NULL)
ReplicationSlotRelease();
+ /* We also want to cleanup temporary slots on error. */
+ ReplicationSlotCleanup();
+
/*
* Now return to normal top-level context and clear ErrorContext for
* next time.
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 96e77ec4379..cd7b9098120 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5178,13 +5178,13 @@ DATA(insert OID = 5016 ( spg_box_quad_leaf_consistent PGNSP PGUID 12 1 0 0 0 f
DESCR("SP-GiST support for quad tree over box");
/* replication slots */
-DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 16" "{19,16,19,3220}" "{i,i,o,o}" "{slot_name,immediately_reserve,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
+DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 16 16" "{19,16,16,19,3220}" "{i,i,i,o,o}" "{slot_name,immediately_reserve,temporary,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
DESCR("create a physical replication slot");
DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
DESCR("drop a replication slot");
-DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
+DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
DESCR("information about replication slots currently in use");
-DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
+DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 19 16" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,temporary,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
DESCR("set up a logical replication slot");
DATA(insert OID = 3782 ( pg_logical_slot_get_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,25}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_get_changes _null_ _null_ _null_ ));
DESCR("get changes from replication slot");
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index d2f1edbf0d1..024b965a24b 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -55,6 +55,7 @@ typedef struct CreateReplicationSlotCmd
char *slotname;
ReplicationKind kind;
char *plugin;
+ bool temporary;
bool reserve_wal;
} CreateReplicationSlotCmd;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index e00562d274f..b653e5c1962 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -28,7 +28,8 @@
typedef enum ReplicationSlotPersistency
{
RS_PERSISTENT,
- RS_EPHEMERAL
+ RS_EPHEMERAL,
+ RS_TEMPORARY
} ReplicationSlotPersistency;
/*
@@ -165,6 +166,7 @@ extern void ReplicationSlotDrop(const char *name);
extern void ReplicationSlotAcquire(const char *name);
extern void ReplicationSlotRelease(void);
+extern void ReplicationSlotCleanup(void);
extern void ReplicationSlotSave(void);
extern void ReplicationSlotMarkDirty(void);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index a8f35a76fab..5314b9c207f 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1427,13 +1427,14 @@ pg_replication_slots| SELECT l.slot_name,
l.slot_type,
l.datoid,
d.datname AS database,
+ l.temporary,
l.active,
l.active_pid,
l.xmin,
l.catalog_xmin,
l.restart_lsn,
l.confirmed_flush_lsn
- FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+ FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper,