1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-10 17:42:29 +03:00

Use PqMsg_* macros in walsender.c

Oversights in commits f4b54e1ed9, dc21234005, and 228c370868.

Author: Dave Cramer <davecramer@gmail.com>
Discussion: https://postgr.es/m/CADK3HH%2BowWVdnbmWH4NHG8%3D%2BkXA_wjsyEVLoY719iJnb%3D%2BtT6A%40mail.gmail.com
This commit is contained in:
Nathan Bossart
2025-07-23 10:29:45 -05:00
parent 196063d676
commit 37c7a7eeb6

View File

@@ -65,6 +65,7 @@
#include "funcapi.h" #include "funcapi.h"
#include "libpq/libpq.h" #include "libpq/libpq.h"
#include "libpq/pqformat.h" #include "libpq/pqformat.h"
#include "libpq/protocol.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "nodes/replnodes.h" #include "nodes/replnodes.h"
#include "pgstat.h" #include "pgstat.h"
@@ -735,13 +736,13 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset,
switch (mtype) switch (mtype)
{ {
case 'd': /* CopyData */ case PqMsg_CopyData:
maxmsglen = PQ_LARGE_MESSAGE_LIMIT; maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
break; break;
case 'c': /* CopyDone */ case PqMsg_CopyDone:
case 'f': /* CopyFail */ case PqMsg_CopyFail:
case 'H': /* Flush */ case PqMsg_Flush:
case 'S': /* Sync */ case PqMsg_Sync:
maxmsglen = PQ_SMALL_MESSAGE_LIMIT; maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
break; break;
default: default:
@@ -763,19 +764,19 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset,
/* Process the message */ /* Process the message */
switch (mtype) switch (mtype)
{ {
case 'd': /* CopyData */ case PqMsg_CopyData:
AppendIncrementalManifestData(ib, buf->data, buf->len); AppendIncrementalManifestData(ib, buf->data, buf->len);
return true; return true;
case 'c': /* CopyDone */ case PqMsg_CopyDone:
return false; return false;
case 'H': /* Sync */ case PqMsg_Sync:
case 'S': /* Flush */ case PqMsg_Flush:
/* Ignore these while in CopyOut mode as we do elsewhere. */ /* Ignore these while in CopyOut mode as we do elsewhere. */
return true; return true;
case 'f': case PqMsg_CopyFail:
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_QUERY_CANCELED), (errcode(ERRCODE_QUERY_CANCELED),
errmsg("COPY from stdin failed: %s", errmsg("COPY from stdin failed: %s",
@@ -1569,7 +1570,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
tmpbuf.data, sizeof(int64)); tmpbuf.data, sizeof(int64));
/* output previously gathered data in a CopyData packet */ /* output previously gathered data in a CopyData packet */
pq_putmessage_noblock('d', ctx->out->data, ctx->out->len); pq_putmessage_noblock(PqMsg_CopyData, ctx->out->data, ctx->out->len);
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
@@ -2305,7 +2306,7 @@ ProcessRepliesIfAny(void)
case PqMsg_CopyDone: case PqMsg_CopyDone:
if (!streamingDoneSending) if (!streamingDoneSending)
{ {
pq_putmessage_noblock('c', NULL, 0); pq_putmessage_noblock(PqMsg_CopyDone, NULL, 0);
streamingDoneSending = true; streamingDoneSending = true;
} }
@@ -2758,7 +2759,7 @@ ProcessStandbyPSRequestMessage(void)
pq_sendint64(&output_message, GetCurrentTimestamp()); pq_sendint64(&output_message, GetCurrentTimestamp());
/* ... and send it wrapped in CopyData */ /* ... and send it wrapped in CopyData */
pq_putmessage_noblock('d', output_message.data, output_message.len); pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len);
} }
/* /*
@@ -3306,7 +3307,7 @@ XLogSendPhysical(void)
wal_segment_close(xlogreader); wal_segment_close(xlogreader);
/* Send CopyDone */ /* Send CopyDone */
pq_putmessage_noblock('c', NULL, 0); pq_putmessage_noblock(PqMsg_CopyDone, NULL, 0);
streamingDoneSending = true; streamingDoneSending = true;
WalSndCaughtUp = true; WalSndCaughtUp = true;
@@ -3434,7 +3435,7 @@ retry:
memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)], memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
tmpbuf.data, sizeof(int64)); tmpbuf.data, sizeof(int64));
pq_putmessage_noblock('d', output_message.data, output_message.len); pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len);
sentPtr = endptr; sentPtr = endptr;
@@ -4140,7 +4141,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
pq_sendbyte(&output_message, requestReply ? 1 : 0); pq_sendbyte(&output_message, requestReply ? 1 : 0);
/* ... and send it wrapped in CopyData */ /* ... and send it wrapped in CopyData */
pq_putmessage_noblock('d', output_message.data, output_message.len); pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len);
/* Set local flag */ /* Set local flag */
if (requestReply) if (requestReply)