1
0
mirror of https://github.com/postgres/postgres.git synced 2025-09-02 04:21:28 +03:00

Introduce macros for protocol characters.

This commit introduces descriptively-named macros for the
identifiers used in wire protocol messages.  These new macros are
placed in a new header file so that they can be easily used by
third-party code.

Author: Dave Cramer
Reviewed-by: Alvaro Herrera, Tatsuo Ishii, Peter Smith, Robert Haas, Tom Lane, Peter Eisentraut, Michael Paquier
Discussion: https://postgr.es/m/CADK3HHKbBmK-PKf1bPNFoMC%2BoBt%2BpD9PH8h5nvmBQskEHm-Ehw%40mail.gmail.com
This commit is contained in:
Nathan Bossart
2023-08-22 19:16:12 -07:00
parent 7114791158
commit f4b54e1ed9
25 changed files with 305 additions and 204 deletions

View File

@@ -20,6 +20,7 @@
#include "access/printsimple.h"
#include "catalog/pg_type.h"
#include "libpq/protocol.h"
#include "libpq/pqformat.h"
#include "utils/builtins.h"
@@ -32,7 +33,7 @@ printsimple_startup(DestReceiver *self, int operation, TupleDesc tupdesc)
StringInfoData buf;
int i;
pq_beginmessage(&buf, 'T'); /* RowDescription */
pq_beginmessage(&buf, PqMsg_RowDescription);
pq_sendint16(&buf, tupdesc->natts);
for (i = 0; i < tupdesc->natts; ++i)
@@ -65,7 +66,7 @@ printsimple(TupleTableSlot *slot, DestReceiver *self)
slot_getallattrs(slot);
/* Prepare and send message */
pq_beginmessage(&buf, 'D');
pq_beginmessage(&buf, PqMsg_DataRow);
pq_sendint16(&buf, tupdesc->natts);
for (i = 0; i < tupdesc->natts; ++i)

View File

@@ -1127,7 +1127,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
switch (msgtype)
{
case 'K': /* BackendKeyData */
case PqMsg_BackendKeyData:
{
int32 pid = pq_getmsgint(msg, 4);
@@ -1137,8 +1137,8 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
break;
}
case 'E': /* ErrorResponse */
case 'N': /* NoticeResponse */
case PqMsg_ErrorResponse:
case PqMsg_NoticeResponse:
{
ErrorData edata;
ErrorContextCallback *save_error_context_stack;
@@ -1183,7 +1183,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
break;
}
case 'A': /* NotifyResponse */
case PqMsg_NotificationResponse:
{
/* Propagate NotifyResponse. */
int32 pid;
@@ -1217,7 +1217,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
break;
}
case 'X': /* Terminate, indicating clean exit */
case PqMsg_Terminate:
{
shm_mq_detach(pcxt->worker[i].error_mqh);
pcxt->worker[i].error_mqh = NULL;
@@ -1372,7 +1372,7 @@ ParallelWorkerMain(Datum main_arg)
* protocol message is defined, but it won't actually be used for anything
* in this case.
*/
pq_beginmessage(&msgbuf, 'K');
pq_beginmessage(&msgbuf, PqMsg_BackendKeyData);
pq_sendint32(&msgbuf, (int32) MyProcPid);
pq_sendint32(&msgbuf, (int32) MyCancelKey);
pq_endmessage(&msgbuf);
@@ -1550,7 +1550,7 @@ ParallelWorkerMain(Datum main_arg)
DetachSession();
/* Report success. */
pq_putmessage('X', NULL, 0);
pq_putmessage(PqMsg_Terminate, NULL, 0);
}
/*

View File

@@ -152,7 +152,7 @@ bbsink_copystream_begin_backup(bbsink *sink)
SendTablespaceList(state->tablespaces);
/* Send a CommandComplete message */
pq_puttextmessage('C', "SELECT");
pq_puttextmessage(PqMsg_CommandComplete, "SELECT");
/* Begin COPY stream. This will be used for all archives + manifest. */
SendCopyOutResponse();
@@ -169,7 +169,7 @@ bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
StringInfoData buf;
ti = list_nth(state->tablespaces, state->tablespace_num);
pq_beginmessage(&buf, 'd'); /* CopyData */
pq_beginmessage(&buf, PqMsg_CopyData);
pq_sendbyte(&buf, 'n'); /* New archive */
pq_sendstring(&buf, archive_name);
pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
@@ -220,7 +220,7 @@ bbsink_copystream_archive_contents(bbsink *sink, size_t len)
{
mysink->last_progress_report_time = now;
pq_beginmessage(&buf, 'd'); /* CopyData */
pq_beginmessage(&buf, PqMsg_CopyData);
pq_sendbyte(&buf, 'p'); /* Progress report */
pq_sendint64(&buf, state->bytes_done);
pq_endmessage(&buf);
@@ -246,7 +246,7 @@ bbsink_copystream_end_archive(bbsink *sink)
mysink->bytes_done_at_last_time_check = state->bytes_done;
mysink->last_progress_report_time = GetCurrentTimestamp();
pq_beginmessage(&buf, 'd'); /* CopyData */
pq_beginmessage(&buf, PqMsg_CopyData);
pq_sendbyte(&buf, 'p'); /* Progress report */
pq_sendint64(&buf, state->bytes_done);
pq_endmessage(&buf);
@@ -261,7 +261,7 @@ bbsink_copystream_begin_manifest(bbsink *sink)
{
StringInfoData buf;
pq_beginmessage(&buf, 'd'); /* CopyData */
pq_beginmessage(&buf, PqMsg_CopyData);
pq_sendbyte(&buf, 'm'); /* Manifest */
pq_endmessage(&buf);
}
@@ -318,7 +318,7 @@ SendCopyOutResponse(void)
{
StringInfoData buf;
pq_beginmessage(&buf, 'H');
pq_beginmessage(&buf, PqMsg_CopyOutResponse);
pq_sendbyte(&buf, 0); /* overall format */
pq_sendint16(&buf, 0); /* natts */
pq_endmessage(&buf);
@@ -330,7 +330,7 @@ SendCopyOutResponse(void)
static void
SendCopyDone(void)
{
pq_putemptymessage('c');
pq_putemptymessage(PqMsg_CopyDone);
}
/*
@@ -368,7 +368,7 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
end_tup_output(tstate);
/* Send a CommandComplete message */
pq_puttextmessage('C', "SELECT");
pq_puttextmessage(PqMsg_CommandComplete, "SELECT");
}
/*

View File

@@ -2281,7 +2281,7 @@ NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
{
StringInfoData buf;
pq_beginmessage(&buf, 'A');
pq_beginmessage(&buf, PqMsg_NotificationResponse);
pq_sendint32(&buf, srcPid);
pq_sendstring(&buf, channel);
pq_sendstring(&buf, payload);

View File

@@ -174,7 +174,7 @@ ReceiveCopyBegin(CopyFromState cstate)
int16 format = (cstate->opts.binary ? 1 : 0);
int i;
pq_beginmessage(&buf, 'G');
pq_beginmessage(&buf, PqMsg_CopyInResponse);
pq_sendbyte(&buf, format); /* overall format */
pq_sendint16(&buf, natts);
for (i = 0; i < natts; i++)
@@ -279,13 +279,13 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
/* Validate message type and set packet size limit */
switch (mtype)
{
case 'd': /* CopyData */
case PqMsg_CopyData:
maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
break;
case 'c': /* CopyDone */
case 'f': /* CopyFail */
case 'H': /* Flush */
case 'S': /* Sync */
case PqMsg_CopyDone:
case PqMsg_CopyFail:
case PqMsg_Flush:
case PqMsg_Sync:
maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
break;
default:
@@ -305,20 +305,20 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
/* ... and process it */
switch (mtype)
{
case 'd': /* CopyData */
case PqMsg_CopyData:
break;
case 'c': /* CopyDone */
case PqMsg_CopyDone:
/* COPY IN correctly terminated by frontend */
cstate->raw_reached_eof = true;
return bytesread;
case 'f': /* CopyFail */
case PqMsg_CopyFail:
ereport(ERROR,
(errcode(ERRCODE_QUERY_CANCELED),
errmsg("COPY from stdin failed: %s",
pq_getmsgstring(cstate->fe_msgbuf))));
break;
case 'H': /* Flush */
case 'S': /* Sync */
case PqMsg_Flush:
case PqMsg_Sync:
/*
* Ignore Flush/Sync for the convenience of client

View File

@@ -144,7 +144,7 @@ SendCopyBegin(CopyToState cstate)
int16 format = (cstate->opts.binary ? 1 : 0);
int i;
pq_beginmessage(&buf, 'H');
pq_beginmessage(&buf, PqMsg_CopyOutResponse);
pq_sendbyte(&buf, format); /* overall format */
pq_sendint16(&buf, natts);
for (i = 0; i < natts; i++)
@@ -159,7 +159,7 @@ SendCopyEnd(CopyToState cstate)
/* Shouldn't have any unsent data */
Assert(cstate->fe_msgbuf->len == 0);
/* Send Copy Done message */
pq_putemptymessage('c');
pq_putemptymessage(PqMsg_CopyDone);
}
/*----------
@@ -247,7 +247,7 @@ CopySendEndOfRow(CopyToState cstate)
CopySendChar(cstate, '\n');
/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
(void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
break;
case COPY_CALLBACK:
cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);

View File

@@ -87,7 +87,7 @@ CheckSASLAuth(const pg_be_sasl_mech *mech, Port *port, char *shadow_pass,
{
pq_startmsgread();
mtype = pq_getbyte();
if (mtype != 'p')
if (mtype != PqMsg_SASLResponse)
{
/* Only log error if client didn't disconnect. */
if (mtype != EOF)

View File

@@ -665,7 +665,7 @@ sendAuthRequest(Port *port, AuthRequest areq, const char *extradata, int extrale
CHECK_FOR_INTERRUPTS();
pq_beginmessage(&buf, 'R');
pq_beginmessage(&buf, PqMsg_AuthenticationRequest);
pq_sendint32(&buf, (int32) areq);
if (extralen > 0)
pq_sendbytes(&buf, extradata, extralen);
@@ -698,7 +698,7 @@ recv_password_packet(Port *port)
/* Expect 'p' message type */
mtype = pq_getbyte();
if (mtype != 'p')
if (mtype != PqMsg_PasswordMessage)
{
/*
* If the client just disconnects without offering a password, don't
@@ -961,7 +961,7 @@ pg_GSS_recvauth(Port *port)
CHECK_FOR_INTERRUPTS();
mtype = pq_getbyte();
if (mtype != 'p')
if (mtype != PqMsg_GSSResponse)
{
/* Only log error if client didn't disconnect. */
if (mtype != EOF)
@@ -1232,7 +1232,7 @@ pg_SSPI_recvauth(Port *port)
{
pq_startmsgread();
mtype = pq_getbyte();
if (mtype != 'p')
if (mtype != PqMsg_GSSResponse)
{
if (sspictx != NULL)
{

View File

@@ -2357,7 +2357,7 @@ SendNegotiateProtocolVersion(List *unrecognized_protocol_options)
StringInfoData buf;
ListCell *lc;
pq_beginmessage(&buf, 'v'); /* NegotiateProtocolVersion */
pq_beginmessage(&buf, PqMsg_NegotiateProtocolVersion);
pq_sendint32(&buf, PG_PROTOCOL_LATEST);
pq_sendint32(&buf, list_length(unrecognized_protocol_options));
foreach(lc, unrecognized_protocol_options)

View File

@@ -603,7 +603,7 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd)
dest->rStartup(dest, CMD_SELECT, tupdesc);
/* Send a DataRow message */
pq_beginmessage(&buf, 'D');
pq_beginmessage(&buf, PqMsg_DataRow);
pq_sendint16(&buf, 2); /* # of columns */
len = strlen(histfname);
pq_sendint32(&buf, len); /* col1 len */
@@ -801,7 +801,7 @@ StartReplication(StartReplicationCmd *cmd)
WalSndSetState(WALSNDSTATE_CATCHUP);
/* Send a CopyBothResponse message, and start streaming */
pq_beginmessage(&buf, 'W');
pq_beginmessage(&buf, PqMsg_CopyBothResponse);
pq_sendbyte(&buf, 0);
pq_sendint16(&buf, 0);
pq_endmessage(&buf);
@@ -1294,7 +1294,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
WalSndSetState(WALSNDSTATE_CATCHUP);
/* Send a CopyBothResponse message, and start streaming */
pq_beginmessage(&buf, 'W');
pq_beginmessage(&buf, PqMsg_CopyBothResponse);
pq_sendbyte(&buf, 0);
pq_sendint16(&buf, 0);
pq_endmessage(&buf);
@@ -1923,11 +1923,11 @@ ProcessRepliesIfAny(void)
/* Validate message type and set packet size limit */
switch (firstchar)
{
case 'd':
case PqMsg_CopyData:
maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
break;
case 'c':
case 'X':
case PqMsg_CopyDone:
case PqMsg_Terminate:
maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
break;
default:
@@ -1955,7 +1955,7 @@ ProcessRepliesIfAny(void)
/*
* 'd' means a standby reply wrapped in a CopyData packet.
*/
case 'd':
case PqMsg_CopyData:
ProcessStandbyMessage();
received = true;
break;
@@ -1964,7 +1964,7 @@ ProcessRepliesIfAny(void)
* CopyDone means the standby requested to finish streaming.
* Reply with CopyDone, if we had not sent that already.
*/
case 'c':
case PqMsg_CopyDone:
if (!streamingDoneSending)
{
pq_putmessage_noblock('c', NULL, 0);
@@ -1978,7 +1978,7 @@ ProcessRepliesIfAny(void)
/*
* 'X' means that the standby is closing down the socket.
*/
case 'X':
case PqMsg_Terminate:
proc_exit(0);
default:

View File

@@ -176,7 +176,7 @@ EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_o
len = BuildQueryCompletionString(completionTag, qc,
force_undecorated_output);
pq_putmessage('C', completionTag, len + 1);
pq_putmessage(PqMsg_Close, completionTag, len + 1);
case DestNone:
case DestDebug:
@@ -200,7 +200,7 @@ EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_o
void
EndReplicationCommand(const char *commandTag)
{
pq_putmessage('C', commandTag, strlen(commandTag) + 1);
pq_putmessage(PqMsg_Close, commandTag, strlen(commandTag) + 1);
}
/* ----------------
@@ -220,7 +220,7 @@ NullCommand(CommandDest dest)
case DestRemoteSimple:
/* Tell the FE that we saw an empty query string */
pq_putemptymessage('I');
pq_putemptymessage(PqMsg_EmptyQueryResponse);
break;
case DestNone:
@@ -258,7 +258,7 @@ ReadyForQuery(CommandDest dest)
{
StringInfoData buf;
pq_beginmessage(&buf, 'Z');
pq_beginmessage(&buf, PqMsg_ReadyForQuery);
pq_sendbyte(&buf, TransactionBlockStatusCode());
pq_endmessage(&buf);
}

View File

@@ -69,7 +69,7 @@ SendFunctionResult(Datum retval, bool isnull, Oid rettype, int16 format)
{
StringInfoData buf;
pq_beginmessage(&buf, 'V');
pq_beginmessage(&buf, PqMsg_FunctionCallResponse);
if (isnull)
{

View File

@@ -402,37 +402,37 @@ SocketBackend(StringInfo inBuf)
*/
switch (qtype)
{
case 'Q': /* simple query */
case PqMsg_Query:
maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
doing_extended_query_message = false;
break;
case 'F': /* fastpath function call */
case PqMsg_FunctionCall:
maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
doing_extended_query_message = false;
break;
case 'X': /* terminate */
case PqMsg_Terminate:
maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
doing_extended_query_message = false;
ignore_till_sync = false;
break;
case 'B': /* bind */
case 'P': /* parse */
case PqMsg_Bind:
case PqMsg_Parse:
maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
doing_extended_query_message = true;
break;
case 'C': /* close */
case 'D': /* describe */
case 'E': /* execute */
case 'H': /* flush */
case PqMsg_Close:
case PqMsg_Describe:
case PqMsg_Execute:
case PqMsg_Flush:
maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
doing_extended_query_message = true;
break;
case 'S': /* sync */
case PqMsg_Sync:
maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
/* stop any active skip-till-Sync */
ignore_till_sync = false;
@@ -440,13 +440,13 @@ SocketBackend(StringInfo inBuf)
doing_extended_query_message = false;
break;
case 'd': /* copy data */
case PqMsg_CopyData:
maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
doing_extended_query_message = false;
break;
case 'c': /* copy done */
case 'f': /* copy fail */
case PqMsg_CopyDone:
case PqMsg_CopyFail:
maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
doing_extended_query_message = false;
break;
@@ -1589,7 +1589,7 @@ exec_parse_message(const char *query_string, /* string to execute */
* Send ParseComplete.
*/
if (whereToSendOutput == DestRemote)
pq_putemptymessage('1');
pq_putemptymessage(PqMsg_ParseComplete);
/*
* Emit duration logging if appropriate.
@@ -2047,7 +2047,7 @@ exec_bind_message(StringInfo input_message)
* Send BindComplete.
*/
if (whereToSendOutput == DestRemote)
pq_putemptymessage('2');
pq_putemptymessage(PqMsg_BindComplete);
/*
* Emit duration logging if appropriate.
@@ -2290,7 +2290,7 @@ exec_execute_message(const char *portal_name, long max_rows)
{
/* Portal run not complete, so send PortalSuspended */
if (whereToSendOutput == DestRemote)
pq_putemptymessage('s');
pq_putemptymessage(PqMsg_PortalSuspended);
/*
* Set XACT_FLAGS_PIPELINING whenever we suspend an Execute message,
@@ -2683,7 +2683,7 @@ exec_describe_statement_message(const char *stmt_name)
NULL);
}
else
pq_putemptymessage('n'); /* NoData */
pq_putemptymessage(PqMsg_NoData);
}
/*
@@ -2736,7 +2736,7 @@ exec_describe_portal_message(const char *portal_name)
FetchPortalTargetList(portal),
portal->formats);
else
pq_putemptymessage('n'); /* NoData */
pq_putemptymessage(PqMsg_NoData);
}
@@ -4239,7 +4239,7 @@ PostgresMain(const char *dbname, const char *username)
{
StringInfoData buf;
pq_beginmessage(&buf, 'K');
pq_beginmessage(&buf, PqMsg_BackendKeyData);
pq_sendint32(&buf, (int32) MyProcPid);
pq_sendint32(&buf, (int32) MyCancelKey);
pq_endmessage(&buf);
@@ -4618,7 +4618,7 @@ PostgresMain(const char *dbname, const char *username)
switch (firstchar)
{
case 'Q': /* simple query */
case PqMsg_Query:
{
const char *query_string;
@@ -4642,7 +4642,7 @@ PostgresMain(const char *dbname, const char *username)
}
break;
case 'P': /* parse */
case PqMsg_Parse:
{
const char *stmt_name;
const char *query_string;
@@ -4672,7 +4672,7 @@ PostgresMain(const char *dbname, const char *username)
}
break;
case 'B': /* bind */
case PqMsg_Bind:
forbidden_in_wal_sender(firstchar);
/* Set statement_timestamp() */
@@ -4687,7 +4687,7 @@ PostgresMain(const char *dbname, const char *username)
/* exec_bind_message does valgrind_report_error_query */
break;
case 'E': /* execute */
case PqMsg_Execute:
{
const char *portal_name;
int max_rows;
@@ -4707,7 +4707,7 @@ PostgresMain(const char *dbname, const char *username)
}
break;
case 'F': /* fastpath function call */
case PqMsg_FunctionCall:
forbidden_in_wal_sender(firstchar);
/* Set statement_timestamp() */
@@ -4742,7 +4742,7 @@ PostgresMain(const char *dbname, const char *username)
send_ready_for_query = true;
break;
case 'C': /* close */
case PqMsg_Close:
{
int close_type;
const char *close_target;
@@ -4782,13 +4782,13 @@ PostgresMain(const char *dbname, const char *username)
}
if (whereToSendOutput == DestRemote)
pq_putemptymessage('3'); /* CloseComplete */
pq_putemptymessage(PqMsg_CloseComplete);
valgrind_report_error_query("CLOSE message");
}
break;
case 'D': /* describe */
case PqMsg_Describe:
{
int describe_type;
const char *describe_target;
@@ -4822,13 +4822,13 @@ PostgresMain(const char *dbname, const char *username)
}
break;
case 'H': /* flush */
case PqMsg_Flush:
pq_getmsgend(&input_message);
if (whereToSendOutput == DestRemote)
pq_flush();
break;
case 'S': /* sync */
case PqMsg_Sync:
pq_getmsgend(&input_message);
finish_xact_command();
valgrind_report_error_query("SYNC message");
@@ -4847,7 +4847,7 @@ PostgresMain(const char *dbname, const char *username)
/* FALLTHROUGH */
case 'X':
case PqMsg_Terminate:
/*
* Reset whereToSendOutput to prevent ereport from attempting
@@ -4865,9 +4865,9 @@ PostgresMain(const char *dbname, const char *username)
*/
proc_exit(0);
case 'd': /* copy data */
case 'c': /* copy done */
case 'f': /* copy fail */
case PqMsg_CopyData:
case PqMsg_CopyDone:
case PqMsg_CopyFail:
/*
* Accept but ignore these messages, per protocol spec; we
@@ -4897,7 +4897,7 @@ forbidden_in_wal_sender(char firstchar)
{
if (am_walsender)
{
if (firstchar == 'F')
if (firstchar == PqMsg_FunctionCall)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("fastpath function calls not supported in a replication connection")));

View File

@@ -3465,7 +3465,10 @@ send_message_to_frontend(ErrorData *edata)
char tbuf[12];
/* 'N' (Notice) is for nonfatal conditions, 'E' is for errors */
pq_beginmessage(&msgbuf, (edata->elevel < ERROR) ? 'N' : 'E');
if (edata->elevel < ERROR)
pq_beginmessage(&msgbuf, PqMsg_NoticeResponse);
else
pq_beginmessage(&msgbuf, PqMsg_ErrorResponse);
sev = error_severity(edata->elevel);
pq_sendbyte(&msgbuf, PG_DIAG_SEVERITY);

View File

@@ -2593,7 +2593,7 @@ ReportGUCOption(struct config_generic *record)
{
StringInfoData msgbuf;
pq_beginmessage(&msgbuf, 'S');
pq_beginmessage(&msgbuf, PqMsg_ParameterStatus);
pq_sendstring(&msgbuf, record->name);
pq_sendstring(&msgbuf, val);
pq_endmessage(&msgbuf);