1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-09 06:21:09 +03:00

Use a lexer and grammar for parsing walsender commands

Makes it easier to parse mainly the BASE_BACKUP command
with it's options, and avoids having to manually deal
with quoted identifiers in the label (previously broken),
and makes it easier to add new commands and options in
the future.

In passing, refactor the case statement in the walsender
to put each command in it's own function.
This commit is contained in:
Magnus Hagander
2011-01-14 16:30:33 +01:00
parent 688423d004
commit fcd810c69a
12 changed files with 603 additions and 128 deletions

View File

@@ -45,6 +45,7 @@
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "replication/basebackup.h"
#include "replication/replnodes.h"
#include "replication/walprotocol.h"
#include "replication/walsender.h"
#include "storage/fd.h"
@@ -99,6 +100,7 @@ static void WalSndXLogSendHandler(SIGNAL_ARGS);
static void WalSndLastCycleHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
static bool HandleReplicationCommand(const char *cmd_string);
static int WalSndLoop(void);
static void InitWalSnd(void);
static void WalSndHandshake(void);
@@ -106,6 +108,8 @@ static void WalSndKill(int code, Datum arg);
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
static bool XLogSend(char *msgbuf, bool *caughtup);
static void CheckClosedConnection(void);
static void IdentifySystem(void);
static void StartReplication(StartReplicationCmd * cmd);
/* Main entry point for walsender process */
@@ -218,118 +222,14 @@ WalSndHandshake(void)
case 'Q': /* Query message */
{
const char *query_string;
XLogRecPtr recptr;
query_string = pq_getmsgstring(&input_message);
pq_getmsgend(&input_message);
if (strcmp(query_string, "IDENTIFY_SYSTEM") == 0)
{
StringInfoData buf;
char sysid[32];
char tli[11];
/*
* Reply with a result set with one row, two columns.
* First col is system ID, and second is timeline ID
*/
snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
GetSystemIdentifier());
snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
/* Send a RowDescription message */
pq_beginmessage(&buf, 'T');
pq_sendint(&buf, 2, 2); /* 2 fields */
/* first field */
pq_sendstring(&buf, "systemid"); /* col name */
pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, 0, 2); /* attnum */
pq_sendint(&buf, TEXTOID, 4); /* type oid */
pq_sendint(&buf, -1, 2); /* typlen */
pq_sendint(&buf, 0, 4); /* typmod */
pq_sendint(&buf, 0, 2); /* format code */
/* second field */
pq_sendstring(&buf, "timeline"); /* col name */
pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, 0, 2); /* attnum */
pq_sendint(&buf, INT4OID, 4); /* type oid */
pq_sendint(&buf, 4, 2); /* typlen */
pq_sendint(&buf, 0, 4); /* typmod */
pq_sendint(&buf, 0, 2); /* format code */
pq_endmessage(&buf);
/* Send a DataRow message */
pq_beginmessage(&buf, 'D');
pq_sendint(&buf, 2, 2); /* # of columns */
pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
pq_sendint(&buf, strlen(tli), 4); /* col2 len */
pq_sendbytes(&buf, (char *) tli, strlen(tli));
pq_endmessage(&buf);
/* Send CommandComplete and ReadyForQuery messages */
EndCommand("SELECT", DestRemote);
ReadyForQuery(DestRemote);
/* ReadyForQuery did pq_flush for us */
}
else if (sscanf(query_string, "START_REPLICATION %X/%X",
&recptr.xlogid, &recptr.xrecoff) == 2)
{
StringInfoData buf;
/*
* Check that we're logging enough information in the
* WAL for log-shipping.
*
* NOTE: This only checks the current value of
* wal_level. Even if the current setting is not
* 'minimal', there can be old WAL in the pg_xlog
* directory that was created with 'minimal'. So this
* is not bulletproof, the purpose is just to give a
* user-friendly error message that hints how to
* configure the system correctly.
*/
if (wal_level == WAL_LEVEL_MINIMAL)
ereport(FATAL,
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg("standby connections not allowed because wal_level=minimal")));
/* Send a CopyBothResponse message, and start streaming */
pq_beginmessage(&buf, 'W');
pq_sendbyte(&buf, 0);
pq_sendint(&buf, 0, 2);
pq_endmessage(&buf);
pq_flush();
/*
* Initialize position to the received one, then the
* xlog records begin to be shipped from that position
*/
sentPtr = recptr;
/* break out of the loop */
if (HandleReplicationCommand(query_string))
replication_started = true;
}
else if (strncmp(query_string, "BASE_BACKUP ", 12) == 0)
{
/* Command is BASE_BACKUP <options>;<label> */
SendBaseBackup(query_string + strlen("BASE_BACKUP "));
/* Send CommandComplete and ReadyForQuery messages */
EndCommand("SELECT", DestRemote);
ReadyForQuery(DestRemote);
/* ReadyForQuery did pq_flush for us */
}
else
{
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid standby query string: %s", query_string)));
}
break;
}
break;
case 'X':
/* standby is closing the connection */
@@ -350,6 +250,170 @@ WalSndHandshake(void)
}
}
/*
* IDENTIFY_SYSTEM
*/
static void
IdentifySystem(void)
{
StringInfoData buf;
char sysid[32];
char tli[11];
/*
* Reply with a result set with one row, two columns. First col is system
* ID, and second is timeline ID
*/
snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
GetSystemIdentifier());
snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
/* Send a RowDescription message */
pq_beginmessage(&buf, 'T');
pq_sendint(&buf, 2, 2); /* 2 fields */
/* first field */
pq_sendstring(&buf, "systemid"); /* col name */
pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, 0, 2); /* attnum */
pq_sendint(&buf, TEXTOID, 4); /* type oid */
pq_sendint(&buf, -1, 2); /* typlen */
pq_sendint(&buf, 0, 4); /* typmod */
pq_sendint(&buf, 0, 2); /* format code */
/* second field */
pq_sendstring(&buf, "timeline"); /* col name */
pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, 0, 2); /* attnum */
pq_sendint(&buf, INT4OID, 4); /* type oid */
pq_sendint(&buf, 4, 2); /* typlen */
pq_sendint(&buf, 0, 4); /* typmod */
pq_sendint(&buf, 0, 2); /* format code */
pq_endmessage(&buf);
/* Send a DataRow message */
pq_beginmessage(&buf, 'D');
pq_sendint(&buf, 2, 2); /* # of columns */
pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
pq_sendint(&buf, strlen(tli), 4); /* col2 len */
pq_sendbytes(&buf, (char *) tli, strlen(tli));
pq_endmessage(&buf);
/* Send CommandComplete and ReadyForQuery messages */
EndCommand("SELECT", DestRemote);
ReadyForQuery(DestRemote);
/* ReadyForQuery did pq_flush for us */
}
/*
* START_REPLICATION
*/
static void
StartReplication(StartReplicationCmd * cmd)
{
StringInfoData buf;
/*
* Check that we're logging enough information in the WAL for
* log-shipping.
*
* NOTE: This only checks the current value of wal_level. Even if the
* current setting is not 'minimal', there can be old WAL in the pg_xlog
* directory that was created with 'minimal'. So this is not bulletproof,
* the purpose is just to give a user-friendly error message that hints
* how to configure the system correctly.
*/
if (wal_level == WAL_LEVEL_MINIMAL)
ereport(FATAL,
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg("standby connections not allowed because wal_level=minimal")));
/* Send a CopyBothResponse message, and start streaming */
pq_beginmessage(&buf, 'W');
pq_sendbyte(&buf, 0);
pq_sendint(&buf, 0, 2);
pq_endmessage(&buf);
pq_flush();
/*
* Initialize position to the received one, then the xlog records begin to
* be shipped from that position
*/
sentPtr = cmd->startpoint;
}
/*
* Execute an incoming replication command.
*/
static bool
HandleReplicationCommand(const char *cmd_string)
{
bool replication_started = false;
int parse_rc;
Node *cmd_node;
MemoryContext cmd_context;
MemoryContext old_context;
elog(DEBUG1, "received replication command: %s", cmd_string);
cmd_context = AllocSetContextCreate(CurrentMemoryContext,
"Replication command context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
old_context = MemoryContextSwitchTo(cmd_context);
replication_scanner_init(cmd_string);
parse_rc = replication_yyparse();
if (parse_rc != 0)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
(errmsg_internal("replication command parser returned %d",
parse_rc))));
cmd_node = replication_parse_result;
switch (cmd_node->type)
{
case T_IdentifySystemCmd:
IdentifySystem();
break;
case T_StartReplicationCmd:
StartReplication((StartReplicationCmd *) cmd_node);
/* break out of the loop */
replication_started = true;
break;
case T_BaseBackupCmd:
{
BaseBackupCmd *cmd = (BaseBackupCmd *) cmd_node;
SendBaseBackup(cmd->label, cmd->progress);
/* Send CommandComplete and ReadyForQuery messages */
EndCommand("SELECT", DestRemote);
ReadyForQuery(DestRemote);
/* ReadyForQuery did pq_flush for us */
break;
}
default:
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid standby query string: %s", cmd_string)));
}
/* done */
MemoryContextSwitchTo(old_context);
MemoryContextDelete(cmd_context);
return replication_started;
}
/*
* Check if the remote end has closed the connection.
*/