1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-09 06:21:09 +03:00

Expand usage of macros for protocol characters.

This commit makes use of the existing PqMsg_* macros in more places
and adds new PqReplMsg_* and PqBackupMsg_* macros for use in
special replication and backup messages, respectively.

Author: Dave Cramer <davecramer@gmail.com>
Co-authored-by: Fabrízio de Royes Mello <fabriziomello@gmail.com>
Reviewed-by: Jacob Champion <jacob.champion@enterprisedb.com>
Reviewed-by: Álvaro Herrera <alvherre@kurilemu.de>
Reviewed-by: Euler Taveira <euler@eulerto.com>
Discussion: https://postgr.es/m/aIECfYfevCUpenBT@nathan
Discussion: https://postgr.es/m/CAFcNs%2Br73NOUb7%2BqKrV4HHEki02CS96Z%2Bx19WaFgE087BWwEng%40mail.gmail.com
This commit is contained in:
Nathan Bossart
2025-08-06 13:37:00 -05:00
parent 35baa60cc7
commit 9ea3b6f751
9 changed files with 70 additions and 43 deletions

View File

@@ -778,10 +778,10 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
/*
* The first byte of messages sent from leader apply worker to
* parallel apply workers can only be 'w'.
* parallel apply workers can only be PqReplMsg_WALData.
*/
c = pq_getmsgbyte(&s);
if (c != 'w')
if (c != PqReplMsg_WALData)
elog(ERROR, "unexpected message \"%c\"", c);
/*

View File

@@ -3994,7 +3994,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
c = pq_getmsgbyte(&s);
if (c == 'w')
if (c == PqReplMsg_WALData)
{
XLogRecPtr start_lsn;
XLogRecPtr end_lsn;
@@ -4016,7 +4016,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
maybe_advance_nonremovable_xid(&rdt_data, false);
}
else if (c == 'k')
else if (c == PqReplMsg_Keepalive)
{
XLogRecPtr end_lsn;
TimestampTz timestamp;
@@ -4035,7 +4035,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
UpdateWorkerStats(last_received, timestamp, true);
}
else if (c == 's') /* Primary status update */
else if (c == PqReplMsg_PrimaryStatusUpdate)
{
rdt_data.remote_lsn = pq_getmsgint64(&s);
rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
@@ -4267,7 +4267,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
else
resetStringInfo(reply_message);
pq_sendbyte(reply_message, 'r');
pq_sendbyte(reply_message, PqReplMsg_StandbyStatusUpdate);
pq_sendint64(reply_message, recvpos); /* write */
pq_sendint64(reply_message, flushpos); /* flush */
pq_sendint64(reply_message, writepos); /* apply */
@@ -4438,7 +4438,7 @@ request_publisher_status(RetainDeadTuplesData *rdt_data)
* Send the current time to update the remote walsender's latest reply
* message received time.
*/
pq_sendbyte(request_message, 'p');
pq_sendbyte(request_message, PqReplMsg_PrimaryStatusRequest);
pq_sendint64(request_message, GetCurrentTimestamp());
elog(DEBUG2, "sending publisher status request message");

View File

@@ -826,7 +826,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
switch (type)
{
case 'w': /* WAL records */
case PqReplMsg_WALData:
{
StringInfoData incoming_message;
@@ -850,7 +850,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
XLogWalRcvWrite(buf, len, dataStart, tli);
break;
}
case 'k': /* Keepalive */
case PqReplMsg_Keepalive:
{
StringInfoData incoming_message;
@@ -1130,7 +1130,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
applyPtr = GetXLogReplayRecPtr(NULL);
resetStringInfo(&reply_message);
pq_sendbyte(&reply_message, 'r');
pq_sendbyte(&reply_message, PqReplMsg_StandbyStatusUpdate);
pq_sendint64(&reply_message, writePtr);
pq_sendint64(&reply_message, flushPtr);
pq_sendint64(&reply_message, applyPtr);
@@ -1234,7 +1234,7 @@ XLogWalRcvSendHSFeedback(bool immed)
/* Construct the message and send it. */
resetStringInfo(&reply_message);
pq_sendbyte(&reply_message, 'h');
pq_sendbyte(&reply_message, PqReplMsg_HotStandbyFeedback);
pq_sendint64(&reply_message, GetCurrentTimestamp());
pq_sendint32(&reply_message, xmin);
pq_sendint32(&reply_message, xmin_epoch);

View File

@@ -1534,7 +1534,7 @@ WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
resetStringInfo(ctx->out);
pq_sendbyte(ctx->out, 'w');
pq_sendbyte(ctx->out, PqReplMsg_WALData);
pq_sendint64(ctx->out, lsn); /* dataStart */
pq_sendint64(ctx->out, lsn); /* walEnd */
@@ -2292,7 +2292,8 @@ ProcessRepliesIfAny(void)
switch (firstchar)
{
/*
* 'd' means a standby reply wrapped in a CopyData packet.
* PqMsg_CopyData means a standby reply wrapped in a CopyData
* packet.
*/
case PqMsg_CopyData:
ProcessStandbyMessage();
@@ -2300,8 +2301,9 @@ ProcessRepliesIfAny(void)
break;
/*
* CopyDone means the standby requested to finish streaming.
* Reply with CopyDone, if we had not sent that already.
* PqMsg_CopyDone means the standby requested to finish
* streaming. Reply with CopyDone, if we had not sent that
* already.
*/
case PqMsg_CopyDone:
if (!streamingDoneSending)
@@ -2315,7 +2317,8 @@ ProcessRepliesIfAny(void)
break;
/*
* 'X' means that the standby is closing down the socket.
* PqMsg_Terminate means that the standby is closing down the
* socket.
*/
case PqMsg_Terminate:
proc_exit(0);
@@ -2350,15 +2353,15 @@ ProcessStandbyMessage(void)
switch (msgtype)
{
case 'r':
case PqReplMsg_StandbyStatusUpdate:
ProcessStandbyReplyMessage();
break;
case 'h':
case PqReplMsg_HotStandbyFeedback:
ProcessStandbyHSFeedbackMessage();
break;
case 'p':
case PqReplMsg_PrimaryStatusRequest:
ProcessStandbyPSRequestMessage();
break;
@@ -2752,7 +2755,7 @@ ProcessStandbyPSRequestMessage(void)
/* construct the message... */
resetStringInfo(&output_message);
pq_sendbyte(&output_message, 's');
pq_sendbyte(&output_message, PqReplMsg_PrimaryStatusUpdate);
pq_sendint64(&output_message, lsn);
pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit));
pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid));
@@ -3364,7 +3367,7 @@ XLogSendPhysical(void)
* OK to read and send the slice.
*/
resetStringInfo(&output_message);
pq_sendbyte(&output_message, 'w');
pq_sendbyte(&output_message, PqReplMsg_WALData);
pq_sendint64(&output_message, startptr); /* dataStart */
pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
@@ -4135,7 +4138,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
/* construct the message... */
resetStringInfo(&output_message);
pq_sendbyte(&output_message, 'k');
pq_sendbyte(&output_message, PqReplMsg_Keepalive);
pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
pq_sendint64(&output_message, GetCurrentTimestamp());
pq_sendbyte(&output_message, requestReply ? 1 : 0);