1
0
mirror of https://github.com/postgres/postgres.git synced 2025-08-25 20:23:07 +03:00

Add replication command READ_REPLICATION_SLOT

The command is supported for physical slots for now, and returns the
type of slot, its restart_lsn and its restart_tli.

This will be useful for an upcoming patch related to pg_receivewal, to
allow the tool to be able to stream from the position of a slot, rather
than the last WAL position flushed by the backend (as reported by
IDENTIFY_SYSTEM) if the archive directory is found as empty, which would
be an advantage in the case of switching to a different archive
locations with the same slot used to avoid holes in WAL segment
archives.

Author: Ronan Dunklau
Reviewed-by: Kyotaro Horiguchi, Michael Paquier, Bharath Rupireddy
Discussion: https://postgr.es/m/18708360.4lzOvYHigE@aivenronan
This commit is contained in:
Michael Paquier
2021-10-25 07:40:42 +09:00
parent 70bef49400
commit b4ada4e19f
9 changed files with 224 additions and 3 deletions

View File

@@ -64,6 +64,7 @@ static SQLCmd *make_sqlcmd(void);
/* Keyword tokens. */
%token K_BASE_BACKUP
%token K_IDENTIFY_SYSTEM
%token K_READ_REPLICATION_SLOT
%token K_SHOW
%token K_START_REPLICATION
%token K_CREATE_REPLICATION_SLOT
@@ -94,7 +95,7 @@ static SQLCmd *make_sqlcmd(void);
%type <node> command
%type <node> base_backup start_replication start_logical_replication
create_replication_slot drop_replication_slot identify_system
timeline_history show sql_cmd
read_replication_slot timeline_history show sql_cmd
%type <list> base_backup_legacy_opt_list generic_option_list
%type <defelt> base_backup_legacy_opt generic_option
%type <uintval> opt_timeline
@@ -125,6 +126,7 @@ command:
| start_logical_replication
| create_replication_slot
| drop_replication_slot
| read_replication_slot
| timeline_history
| show
| sql_cmd
@@ -140,6 +142,18 @@ identify_system:
}
;
/*
* READ_REPLICATION_SLOT %s
*/
read_replication_slot:
K_READ_REPLICATION_SLOT var_name
{
ReadReplicationSlotCmd *n = makeNode(ReadReplicationSlotCmd);
n->slotname = $2;
$$ = (Node *) n;
}
;
/*
* SHOW setting
*/

View File

@@ -85,6 +85,7 @@ identifier {ident_start}{ident_cont}*
BASE_BACKUP { return K_BASE_BACKUP; }
FAST { return K_FAST; }
IDENTIFY_SYSTEM { return K_IDENTIFY_SYSTEM; }
READ_REPLICATION_SLOT { return K_READ_REPLICATION_SLOT; }
SHOW { return K_SHOW; }
LABEL { return K_LABEL; }
NOWAIT { return K_NOWAIT; }

View File

@@ -232,6 +232,7 @@ static void XLogSendLogical(void);
static void WalSndDone(WalSndSendDataCallback send_data);
static XLogRecPtr GetStandbyFlushRecPtr(void);
static void IdentifySystem(void);
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
static void StartReplication(StartReplicationCmd *cmd);
@@ -457,6 +458,104 @@ IdentifySystem(void)
end_tup_output(tstate);
}
/* Handle READ_REPLICATION_SLOT command */
static void
ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
{
#define READ_REPLICATION_SLOT_COLS 3
ReplicationSlot *slot;
DestReceiver *dest;
TupOutputState *tstate;
TupleDesc tupdesc;
Datum values[READ_REPLICATION_SLOT_COLS];
bool nulls[READ_REPLICATION_SLOT_COLS];
tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
TEXTOID, -1, 0);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
TEXTOID, -1, 0);
/* TimeLineID is unsigned, so int4 is not wide enough. */
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
INT8OID, -1, 0);
MemSet(values, 0, READ_REPLICATION_SLOT_COLS * sizeof(Datum));
MemSet(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
slot = SearchNamedReplicationSlot(cmd->slotname, false);
if (slot == NULL || !slot->in_use)
{
LWLockRelease(ReplicationSlotControlLock);
}
else
{
ReplicationSlot slot_contents;
int i = 0;
/* Copy slot contents while holding spinlock */
SpinLockAcquire(&slot->mutex);
slot_contents = *slot;
SpinLockRelease(&slot->mutex);
LWLockRelease(ReplicationSlotControlLock);
if (OidIsValid(slot_contents.data.database))
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use \"%s\" with logical replication slot \"%s\"",
"READ_REPLICATION_SLOT",
NameStr(slot_contents.data.name)));
/* slot type */
values[i] = CStringGetTextDatum("physical");
nulls[i] = false;
i++;
/* start LSN */
if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
{
char xloc[64];
snprintf(xloc, sizeof(xloc), "%X/%X",
LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
values[i] = CStringGetTextDatum(xloc);
nulls[i] = false;
}
i++;
/* timeline this WAL was produced on */
if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
{
TimeLineID slots_position_timeline;
TimeLineID current_timeline;
List *timeline_history = NIL;
/*
* While in recovery, use as timeline the currently-replaying one
* to get the LSN position's history.
*/
if (RecoveryInProgress())
(void) GetXLogReplayRecPtr(&current_timeline);
else
current_timeline = ThisTimeLineID;
timeline_history = readTimeLineHistory(current_timeline);
slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
timeline_history);
values[i] = Int64GetDatum((int64) slots_position_timeline);
nulls[i] = false;
}
i++;
Assert(i == READ_REPLICATION_SLOT_COLS);
}
dest = CreateDestReceiver(DestRemoteSimple);
tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
do_tup_output(tstate, values, nulls);
end_tup_output(tstate);
}
/*
* Handle TIMELINE_HISTORY command.
@@ -1622,6 +1721,13 @@ exec_replication_command(const char *cmd_string)
EndReplicationCommand(cmdtag);
break;
case T_ReadReplicationSlotCmd:
cmdtag = "READ_REPLICATION_SLOT";
set_ps_display(cmdtag);
ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
EndReplicationCommand(cmdtag);
break;
case T_BaseBackupCmd:
cmdtag = "BASE_BACKUP";
set_ps_display(cmdtag);