1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-10 17:42:29 +03:00

Add support for temporary replication slots

This allows creating temporary replication slots that are removed
automatically at the end of the session or on error.

From: Petr Jelinek <petr.jelinek@2ndquadrant.com>
This commit is contained in:
Peter Eisentraut
2016-12-08 12:00:00 -05:00
parent e7f051b8f9
commit a924c327e2
18 changed files with 237 additions and 51 deletions

View File

@@ -77,6 +77,7 @@ Node *replication_parse_result;
%token K_LOGICAL
%token K_SLOT
%token K_RESERVE_WAL
%token K_TEMPORARY
%type <node> command
%type <node> base_backup start_replication start_logical_replication
@@ -89,7 +90,7 @@ Node *replication_parse_result;
%type <defelt> plugin_opt_elem
%type <node> plugin_opt_arg
%type <str> opt_slot
%type <boolval> opt_reserve_wal
%type <boolval> opt_reserve_wal opt_temporary
%%
@@ -183,24 +184,26 @@ base_backup_opt:
;
create_replication_slot:
/* CREATE_REPLICATION_SLOT slot PHYSICAL RESERVE_WAL */
K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL opt_reserve_wal
/* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */
K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL opt_reserve_wal
{
CreateReplicationSlotCmd *cmd;
cmd = makeNode(CreateReplicationSlotCmd);
cmd->kind = REPLICATION_KIND_PHYSICAL;
cmd->slotname = $2;
cmd->reserve_wal = $4;
cmd->temporary = $3;
cmd->reserve_wal = $5;
$$ = (Node *) cmd;
}
/* CREATE_REPLICATION_SLOT slot LOGICAL plugin */
| K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT
/* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */
| K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT
{
CreateReplicationSlotCmd *cmd;
cmd = makeNode(CreateReplicationSlotCmd);
cmd->kind = REPLICATION_KIND_LOGICAL;
cmd->slotname = $2;
cmd->plugin = $4;
cmd->temporary = $3;
cmd->plugin = $5;
$$ = (Node *) cmd;
}
;
@@ -276,6 +279,11 @@ opt_reserve_wal:
| /* EMPTY */ { $$ = false; }
;
opt_temporary:
K_TEMPORARY { $$ = true; }
| /* EMPTY */ { $$ = false; }
;
opt_slot:
K_SLOT IDENT
{ $$ = $2; }

View File

@@ -98,6 +98,7 @@ PHYSICAL { return K_PHYSICAL; }
RESERVE_WAL { return K_RESERVE_WAL; }
LOGICAL { return K_LOGICAL; }
SLOT { return K_SLOT; }
TEMPORARY { return K_TEMPORARY; }
"," { return ','; }
";" { return ';'; }

View File

@@ -47,6 +47,7 @@
#include "storage/fd.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
/*
* Replication slot on-disk data structure.
@@ -98,7 +99,9 @@ int max_replication_slots = 0; /* the maximum number of replication
* slots */
static LWLockTranche ReplSlotIOLWLockTranche;
static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);
/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
@@ -329,7 +332,7 @@ ReplicationSlotAcquire(const char *name)
{
ReplicationSlot *slot = NULL;
int i;
int active_pid = 0;
int active_pid = 0; /* Keep compiler quiet */
Assert(MyReplicationSlot == NULL);
@@ -346,7 +349,7 @@ ReplicationSlotAcquire(const char *name)
SpinLockAcquire(&s->mutex);
active_pid = s->active_pid;
if (active_pid == 0)
s->active_pid = MyProcPid;
active_pid = s->active_pid = MyProcPid;
SpinLockRelease(&s->mutex);
slot = s;
break;
@@ -359,7 +362,7 @@ ReplicationSlotAcquire(const char *name)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("replication slot \"%s\" does not exist", name)));
if (active_pid != 0)
if (active_pid != MyProcPid)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
errmsg("replication slot \"%s\" is active for PID %d",
@@ -389,9 +392,12 @@ ReplicationSlotRelease(void)
*/
ReplicationSlotDropAcquired();
}
else
else if (slot->data.persistency == RS_PERSISTENT)
{
/* Mark slot inactive. We're not freeing it, just disconnecting. */
/*
* Mark persistent slot inactive. We're not freeing it, just
* disconnecting.
*/
SpinLockAcquire(&slot->mutex);
slot->active_pid = 0;
SpinLockRelease(&slot->mutex);
@@ -405,6 +411,33 @@ ReplicationSlotRelease(void)
LWLockRelease(ProcArrayLock);
}
/*
* Cleanup all temporary slots created in current session.
*/
void
ReplicationSlotCleanup()
{
int i;
Assert(MyReplicationSlot == NULL);
/*
* No need for locking as we are only interested in slots active in
* current process and those are not touched by other processes.
*/
for (i = 0; i < max_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
if (s->active_pid == MyProcPid)
{
Assert(s->in_use && s->data.persistency == RS_TEMPORARY);
ReplicationSlotDropPtr(s);
}
}
}
/*
* Permanently drop replication slot identified by the passed in name.
*/
@@ -419,14 +452,11 @@ ReplicationSlotDrop(const char *name)
}
/*
* Permanently drop the currently acquired replication slot which will be
* released by the point this function returns.
* Permanently drop the currently acquired replication slot.
*/
static void
ReplicationSlotDropAcquired(void)
{
char path[MAXPGPATH];
char tmppath[MAXPGPATH];
ReplicationSlot *slot = MyReplicationSlot;
Assert(MyReplicationSlot != NULL);
@@ -434,6 +464,19 @@ ReplicationSlotDropAcquired(void)
/* slot isn't acquired anymore */
MyReplicationSlot = NULL;
ReplicationSlotDropPtr(slot);
}
/*
* Permanently drop the replication slot which will be released by the point
* this function returns.
*/
static void
ReplicationSlotDropPtr(ReplicationSlot *slot)
{
char path[MAXPGPATH];
char tmppath[MAXPGPATH];
/*
* If some other backend ran this code concurrently with us, we might try
* to delete a slot with a certain name while someone else was trying to
@@ -448,9 +491,9 @@ ReplicationSlotDropAcquired(void)
/*
* Rename the slot directory on disk, so that we'll no longer recognize
* this as a valid slot. Note that if this fails, we've got to mark the
* slot inactive before bailing out. If we're dropping an ephemeral slot,
* we better never fail hard as the caller won't expect the slot to
* survive and this might get called during error handling.
* slot inactive before bailing out. If we're dropping an ephemeral or
* a temporary slot, we better never fail hard as the caller won't expect
* the slot to survive and this might get called during error handling.
*/
if (rename(path, tmppath) == 0)
{
@@ -469,7 +512,7 @@ ReplicationSlotDropAcquired(void)
}
else
{
bool fail_softly = slot->data.persistency == RS_EPHEMERAL;
bool fail_softly = slot->data.persistency != RS_PERSISTENT;
SpinLockAcquire(&slot->mutex);
slot->active_pid = 0;

View File

@@ -41,6 +41,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
{
Name name = PG_GETARG_NAME(0);
bool immediately_reserve = PG_GETARG_BOOL(1);
bool temporary = PG_GETARG_BOOL(2);
Datum values[2];
bool nulls[2];
TupleDesc tupdesc;
@@ -57,7 +58,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
CheckSlotRequirements();
/* acquire replication slot, this will check for conflicting names */
ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT);
ReplicationSlotCreate(NameStr(*name), false,
temporary ? RS_TEMPORARY : RS_PERSISTENT);
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
nulls[0] = false;
@@ -96,6 +98,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
{
Name name = PG_GETARG_NAME(0);
Name plugin = PG_GETARG_NAME(1);
bool temporary = PG_GETARG_BOOL(2);
LogicalDecodingContext *ctx = NULL;
@@ -116,11 +119,14 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
/*
* Acquire a logical decoding slot, this will check for conflicting names.
* Initially create it as ephemeral - that allows us to nicely handle
* errors during initialization because it'll get dropped if this
* Initially create persisent slot as ephemeral - that allows us to nicely
* handle errors during initialization because it'll get dropped if this
* transaction fails. We'll make it persistent at the end.
* Temporary slots can be created as temporary from beginning as they get
* dropped on error as well.
*/
ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL);
ReplicationSlotCreate(NameStr(*name), true,
temporary ? RS_TEMPORARY : RS_EPHEMERAL);
/*
* Create logical decoding context, to build the initial snapshot.
@@ -143,8 +149,9 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
tuple = heap_form_tuple(tupdesc, values, nulls);
result = HeapTupleGetDatum(tuple);
/* ok, slot is now fully created, mark it as persistent */
ReplicationSlotPersist();
/* ok, slot is now fully created, mark it as persistent if needed */
if (!temporary)
ReplicationSlotPersist();
ReplicationSlotRelease();
PG_RETURN_DATUM(result);
@@ -174,7 +181,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
#define PG_GET_REPLICATION_SLOTS_COLS 10
#define PG_GET_REPLICATION_SLOTS_COLS 11
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -219,6 +226,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
Datum values[PG_GET_REPLICATION_SLOTS_COLS];
bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
ReplicationSlotPersistency persistency;
TransactionId xmin;
TransactionId catalog_xmin;
XLogRecPtr restart_lsn;
@@ -246,6 +254,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
namecpy(&plugin, &slot->data.plugin);
active_pid = slot->active_pid;
persistency = slot->data.persistency;
}
SpinLockRelease(&slot->mutex);
@@ -269,6 +278,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
else
values[i++] = database;
values[i++] = BoolGetDatum(persistency == RS_TEMPORARY);
values[i++] = BoolGetDatum(active_pid != 0);
if (active_pid != 0)

View File

@@ -266,6 +266,8 @@ WalSndErrorCleanup(void)
if (MyReplicationSlot != NULL)
ReplicationSlotRelease();
ReplicationSlotCleanup();
replication_active = false;
if (walsender_ready_to_stop)
proc_exit(0);
@@ -796,18 +798,22 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
{
ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT);
ReplicationSlotCreate(cmd->slotname, false,
cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT);
}
else
{
CheckLogicalDecodingRequirements();
/*
* Initially create the slot as ephemeral - that allows us to nicely
* handle errors during initialization because it'll get dropped if
* this transaction fails. We'll make it persistent at the end.
* Initially create persisent slot as ephemeral - that allows us to
* nicely handle errors during initialization because it'll get
* dropped if this transaction fails. We'll make it persistent at the
* end. Temporary slots can be created as temporary from beginning as
* they get dropped on error as well.
*/
ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL);
ReplicationSlotCreate(cmd->slotname, true,
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
}
initStringInfo(&output_message);
@@ -841,15 +847,18 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
/* don't need the decoding context anymore */
FreeDecodingContext(ctx);
ReplicationSlotPersist();
if (!cmd->temporary)
ReplicationSlotPersist();
}
else if (cmd->kind == REPLICATION_KIND_PHYSICAL && cmd->reserve_wal)
{
ReplicationSlotReserveWal();
/* Write this slot to disk */
ReplicationSlotMarkDirty();
ReplicationSlotSave();
/* Write this slot to disk if it's permanent one. */
if (!cmd->temporary)
ReplicationSlotSave();
}
snprintf(xpos, sizeof(xpos), "%X/%X",
@@ -933,9 +942,6 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
pq_endmessage(&buf);
/*
* release active status again, START_REPLICATION will reacquire it
*/
ReplicationSlotRelease();
}