mirror of
https://github.com/postgres/postgres.git
synced 2025-06-10 09:21:54 +03:00
Add 'active_in' column to pg_replication_slots.
Right now it is visible whether a replication slot is active in any session, but not in which. Adding the active_in column, containing the pid of the backend having acquired the slot, makes it much easier to associate pg_replication_slots entries with the corresponding pg_stat_replication/pg_stat_activity row. This should have been done from the start, but I (Andres) dropped the ball there somehow. Author: Craig Ringer, revised by me Discussion: CAMsr+YFKgZca5_7_ouaMWxA5PneJC9LNViPzpDHusaPhU9pA7g@mail.gmail.com
This commit is contained in:
parent
528c2e44ab
commit
d811c037ce
@ -603,7 +603,7 @@ SELECT pg_drop_replication_slot('regression_slot');
|
|||||||
|
|
||||||
/* check that the slot is gone */
|
/* check that the slot is gone */
|
||||||
SELECT * FROM pg_replication_slots;
|
SELECT * FROM pg_replication_slots;
|
||||||
slot_name | plugin | slot_type | datoid | database | active | xmin | catalog_xmin | restart_lsn
|
slot_name | plugin | slot_type | datoid | database | active | active_in | xmin | catalog_xmin | restart_lsn
|
||||||
-----------+--------+-----------+--------+----------+--------+------+--------------+-------------
|
-----------+--------+-----------+--------+----------+--------+-----------+------+--------------+-------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
|
@ -5400,6 +5400,16 @@
|
|||||||
<entry>True if this slot is currently actively being used</entry>
|
<entry>True if this slot is currently actively being used</entry>
|
||||||
</row>
|
</row>
|
||||||
|
|
||||||
|
<row>
|
||||||
|
<entry><structfield>active_in</structfield></entry>
|
||||||
|
<entry><type>integer</type></entry>
|
||||||
|
<entry></entry>
|
||||||
|
<entry>The process ID of the session using this slot if the slot
|
||||||
|
is currently actively being used. <literal>NULL</literal> if
|
||||||
|
inactive.
|
||||||
|
</entry>
|
||||||
|
</row>
|
||||||
|
|
||||||
<row>
|
<row>
|
||||||
<entry><structfield>xmin</structfield></entry>
|
<entry><structfield>xmin</structfield></entry>
|
||||||
<entry><type>xid</type></entry>
|
<entry><type>xid</type></entry>
|
||||||
|
@ -62,10 +62,10 @@ postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', '
|
|||||||
regression_slot | 0/16B1970
|
regression_slot | 0/16B1970
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
postgres=# SELECT * FROM pg_replication_slots;
|
postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn FROM pg_replication_slots;
|
||||||
slot_name | plugin | slot_type | datoid | database | active | xmin | catalog_xmin | restart_lsn
|
slot_name | plugin | slot_type | database | active | restart_lsn
|
||||||
-----------------+---------------+-----------+--------+----------+--------+--------+--------------+-------------
|
-----------------+---------------+-----------+----------+--------+-------------
|
||||||
regression_slot | test_decoding | logical | 12052 | postgres | f | | 684 | 0/16A4408
|
regression_slot | test_decoding | logical | postgres | f | 0/16A4408
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
postgres=# -- There are no changes to see yet
|
postgres=# -- There are no changes to see yet
|
||||||
|
@ -665,6 +665,7 @@ CREATE VIEW pg_replication_slots AS
|
|||||||
L.datoid,
|
L.datoid,
|
||||||
D.datname AS database,
|
D.datname AS database,
|
||||||
L.active,
|
L.active,
|
||||||
|
L.active_in,
|
||||||
L.xmin,
|
L.xmin,
|
||||||
L.catalog_xmin,
|
L.catalog_xmin,
|
||||||
L.restart_lsn
|
L.restart_lsn
|
||||||
|
@ -262,7 +262,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
|
|||||||
* be doing that. So it's safe to initialize the slot.
|
* be doing that. So it's safe to initialize the slot.
|
||||||
*/
|
*/
|
||||||
Assert(!slot->in_use);
|
Assert(!slot->in_use);
|
||||||
Assert(!slot->active);
|
Assert(slot->active_pid == 0);
|
||||||
slot->data.persistency = persistency;
|
slot->data.persistency = persistency;
|
||||||
slot->data.xmin = InvalidTransactionId;
|
slot->data.xmin = InvalidTransactionId;
|
||||||
slot->effective_xmin = InvalidTransactionId;
|
slot->effective_xmin = InvalidTransactionId;
|
||||||
@ -291,8 +291,8 @@ ReplicationSlotCreate(const char *name, bool db_specific,
|
|||||||
volatile ReplicationSlot *vslot = slot;
|
volatile ReplicationSlot *vslot = slot;
|
||||||
|
|
||||||
SpinLockAcquire(&slot->mutex);
|
SpinLockAcquire(&slot->mutex);
|
||||||
Assert(!vslot->active);
|
Assert(vslot->active_pid == 0);
|
||||||
vslot->active = true;
|
vslot->active_pid = MyProcPid;
|
||||||
SpinLockRelease(&slot->mutex);
|
SpinLockRelease(&slot->mutex);
|
||||||
MyReplicationSlot = slot;
|
MyReplicationSlot = slot;
|
||||||
}
|
}
|
||||||
@ -314,7 +314,7 @@ ReplicationSlotAcquire(const char *name)
|
|||||||
{
|
{
|
||||||
ReplicationSlot *slot = NULL;
|
ReplicationSlot *slot = NULL;
|
||||||
int i;
|
int i;
|
||||||
bool active = false;
|
int active_pid = 0;
|
||||||
|
|
||||||
Assert(MyReplicationSlot == NULL);
|
Assert(MyReplicationSlot == NULL);
|
||||||
|
|
||||||
@ -331,8 +331,9 @@ ReplicationSlotAcquire(const char *name)
|
|||||||
volatile ReplicationSlot *vslot = s;
|
volatile ReplicationSlot *vslot = s;
|
||||||
|
|
||||||
SpinLockAcquire(&s->mutex);
|
SpinLockAcquire(&s->mutex);
|
||||||
active = vslot->active;
|
active_pid = vslot->active_pid;
|
||||||
vslot->active = true;
|
if (active_pid == 0)
|
||||||
|
vslot->active_pid = MyProcPid;
|
||||||
SpinLockRelease(&s->mutex);
|
SpinLockRelease(&s->mutex);
|
||||||
slot = s;
|
slot = s;
|
||||||
break;
|
break;
|
||||||
@ -345,10 +346,11 @@ ReplicationSlotAcquire(const char *name)
|
|||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_UNDEFINED_OBJECT),
|
(errcode(ERRCODE_UNDEFINED_OBJECT),
|
||||||
errmsg("replication slot \"%s\" does not exist", name)));
|
errmsg("replication slot \"%s\" does not exist", name)));
|
||||||
if (active)
|
if (active_pid != 0)
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_OBJECT_IN_USE),
|
(errcode(ERRCODE_OBJECT_IN_USE),
|
||||||
errmsg("replication slot \"%s\" is already active", name)));
|
errmsg("replication slot \"%s\" is already active for pid %d",
|
||||||
|
name, active_pid)));
|
||||||
|
|
||||||
/* We made this slot active, so it's ours now. */
|
/* We made this slot active, so it's ours now. */
|
||||||
MyReplicationSlot = slot;
|
MyReplicationSlot = slot;
|
||||||
@ -363,7 +365,7 @@ ReplicationSlotRelease(void)
|
|||||||
{
|
{
|
||||||
ReplicationSlot *slot = MyReplicationSlot;
|
ReplicationSlot *slot = MyReplicationSlot;
|
||||||
|
|
||||||
Assert(slot != NULL && slot->active);
|
Assert(slot != NULL && slot->active_pid != 0);
|
||||||
|
|
||||||
if (slot->data.persistency == RS_EPHEMERAL)
|
if (slot->data.persistency == RS_EPHEMERAL)
|
||||||
{
|
{
|
||||||
@ -380,7 +382,7 @@ ReplicationSlotRelease(void)
|
|||||||
volatile ReplicationSlot *vslot = slot;
|
volatile ReplicationSlot *vslot = slot;
|
||||||
|
|
||||||
SpinLockAcquire(&slot->mutex);
|
SpinLockAcquire(&slot->mutex);
|
||||||
vslot->active = false;
|
vslot->active_pid = 0;
|
||||||
SpinLockRelease(&slot->mutex);
|
SpinLockRelease(&slot->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -460,7 +462,7 @@ ReplicationSlotDropAcquired(void)
|
|||||||
bool fail_softly = slot->data.persistency == RS_EPHEMERAL;
|
bool fail_softly = slot->data.persistency == RS_EPHEMERAL;
|
||||||
|
|
||||||
SpinLockAcquire(&slot->mutex);
|
SpinLockAcquire(&slot->mutex);
|
||||||
vslot->active = false;
|
vslot->active_pid = 0;
|
||||||
SpinLockRelease(&slot->mutex);
|
SpinLockRelease(&slot->mutex);
|
||||||
|
|
||||||
ereport(fail_softly ? WARNING : ERROR,
|
ereport(fail_softly ? WARNING : ERROR,
|
||||||
@ -477,7 +479,7 @@ ReplicationSlotDropAcquired(void)
|
|||||||
* scanning the array.
|
* scanning the array.
|
||||||
*/
|
*/
|
||||||
LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
|
LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
|
||||||
slot->active = false;
|
slot->active_pid = 0;
|
||||||
slot->in_use = false;
|
slot->in_use = false;
|
||||||
LWLockRelease(ReplicationSlotControlLock);
|
LWLockRelease(ReplicationSlotControlLock);
|
||||||
|
|
||||||
@ -749,7 +751,7 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
|
|||||||
/* count slots with spinlock held */
|
/* count slots with spinlock held */
|
||||||
SpinLockAcquire(&s->mutex);
|
SpinLockAcquire(&s->mutex);
|
||||||
(*nslots)++;
|
(*nslots)++;
|
||||||
if (s->active)
|
if (s->active_pid != 0)
|
||||||
(*nactive)++;
|
(*nactive)++;
|
||||||
SpinLockRelease(&s->mutex);
|
SpinLockRelease(&s->mutex);
|
||||||
}
|
}
|
||||||
@ -1227,7 +1229,7 @@ RestoreSlotFromDisk(const char *name)
|
|||||||
slot->candidate_restart_valid = InvalidXLogRecPtr;
|
slot->candidate_restart_valid = InvalidXLogRecPtr;
|
||||||
|
|
||||||
slot->in_use = true;
|
slot->in_use = true;
|
||||||
slot->active = false;
|
slot->active_pid = 0;
|
||||||
|
|
||||||
restored = true;
|
restored = true;
|
||||||
break;
|
break;
|
||||||
|
@ -158,7 +158,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
|
|||||||
Datum
|
Datum
|
||||||
pg_get_replication_slots(PG_FUNCTION_ARGS)
|
pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
#define PG_GET_REPLICATION_SLOTS_COLS 8
|
#define PG_GET_REPLICATION_SLOTS_COLS 9
|
||||||
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
||||||
TupleDesc tupdesc;
|
TupleDesc tupdesc;
|
||||||
Tuplestorestate *tupstore;
|
Tuplestorestate *tupstore;
|
||||||
@ -206,7 +206,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
|||||||
TransactionId xmin;
|
TransactionId xmin;
|
||||||
TransactionId catalog_xmin;
|
TransactionId catalog_xmin;
|
||||||
XLogRecPtr restart_lsn;
|
XLogRecPtr restart_lsn;
|
||||||
bool active;
|
pid_t active_pid;
|
||||||
Oid database;
|
Oid database;
|
||||||
NameData slot_name;
|
NameData slot_name;
|
||||||
NameData plugin;
|
NameData plugin;
|
||||||
@ -227,7 +227,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
|||||||
namecpy(&slot_name, &slot->data.name);
|
namecpy(&slot_name, &slot->data.name);
|
||||||
namecpy(&plugin, &slot->data.plugin);
|
namecpy(&plugin, &slot->data.plugin);
|
||||||
|
|
||||||
active = slot->active;
|
active_pid = slot->active_pid;
|
||||||
}
|
}
|
||||||
SpinLockRelease(&slot->mutex);
|
SpinLockRelease(&slot->mutex);
|
||||||
|
|
||||||
@ -251,7 +251,12 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
|||||||
else
|
else
|
||||||
values[i++] = database;
|
values[i++] = database;
|
||||||
|
|
||||||
values[i++] = BoolGetDatum(active);
|
values[i++] = BoolGetDatum(active_pid != 0);
|
||||||
|
|
||||||
|
if (active_pid != 0)
|
||||||
|
values[i++] = Int32GetDatum(active_pid);
|
||||||
|
else
|
||||||
|
nulls[i++] = true;
|
||||||
|
|
||||||
if (xmin != InvalidTransactionId)
|
if (xmin != InvalidTransactionId)
|
||||||
values[i++] = TransactionIdGetDatum(xmin);
|
values[i++] = TransactionIdGetDatum(xmin);
|
||||||
|
@ -5106,7 +5106,7 @@ DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0
|
|||||||
DESCR("create a physical replication slot");
|
DESCR("create a physical replication slot");
|
||||||
DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
|
DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
|
||||||
DESCR("drop a replication slot");
|
DESCR("drop a replication slot");
|
||||||
DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,28,28,3220}" "{o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,xmin,catalog_xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ ));
|
DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220}" "{o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_in,xmin,catalog_xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ ));
|
||||||
DESCR("information about replication slots currently in use");
|
DESCR("information about replication slots currently in use");
|
||||||
DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
|
DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
|
||||||
DESCR("set up a logical replication slot");
|
DESCR("set up a logical replication slot");
|
||||||
|
@ -84,8 +84,8 @@ typedef struct ReplicationSlot
|
|||||||
/* is this slot defined */
|
/* is this slot defined */
|
||||||
bool in_use;
|
bool in_use;
|
||||||
|
|
||||||
/* is somebody streaming out changes for this slot */
|
/* Who is streaming out changes for this slot? 0 in unused slots. */
|
||||||
bool active;
|
pid_t active_pid;
|
||||||
|
|
||||||
/* any outstanding modifications? */
|
/* any outstanding modifications? */
|
||||||
bool just_dirtied;
|
bool just_dirtied;
|
||||||
|
@ -1396,10 +1396,11 @@ pg_replication_slots| SELECT l.slot_name,
|
|||||||
l.datoid,
|
l.datoid,
|
||||||
d.datname AS database,
|
d.datname AS database,
|
||||||
l.active,
|
l.active,
|
||||||
|
l.active_in,
|
||||||
l.xmin,
|
l.xmin,
|
||||||
l.catalog_xmin,
|
l.catalog_xmin,
|
||||||
l.restart_lsn
|
l.restart_lsn
|
||||||
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, xmin, catalog_xmin, restart_lsn)
|
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, active_in, xmin, catalog_xmin, restart_lsn)
|
||||||
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
|
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
|
||||||
pg_roles| SELECT pg_authid.rolname,
|
pg_roles| SELECT pg_authid.rolname,
|
||||||
pg_authid.rolsuper,
|
pg_authid.rolsuper,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user