1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-10 17:42:29 +03:00

Make the streaming replication protocol messages architecture-independent.

We used to send structs wrapped in CopyData messages, which works as long as
the client and server agree on things like endianess, timestamp format and
alignment. That's good enough for running a standby server, which has to run
on the same platform anyway, but it's useful for tools like pg_receivexlog
to work across platforms.

This breaks protocol compatibility of streaming replication, but we never
promised that to be compatible across versions, anyway.
This commit is contained in:
Heikki Linnakangas
2012-11-07 18:59:12 +02:00
parent ed5699dd1b
commit add6c3179a
7 changed files with 384 additions and 367 deletions

View File

@@ -39,9 +39,9 @@
#include <unistd.h>
#include "access/xlog_internal.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "replication/walprotocol.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
@@ -93,8 +93,8 @@ static struct
XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
} LogstreamResult;
static StandbyReplyMessage reply_message;
static StandbyHSFeedbackMessage feedback_message;
static StringInfoData reply_message;
static StringInfoData incoming_message;
/*
* About SIGTERM handling:
@@ -279,10 +279,10 @@ WalReceiverMain(void)
walrcv_connect(conninfo, startpoint);
DisableWalRcvImmediateExit();
/* Initialize LogstreamResult, reply_message and feedback_message */
/* Initialize LogstreamResult and buffers for processing messages */
LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
MemSet(&reply_message, 0, sizeof(reply_message));
MemSet(&feedback_message, 0, sizeof(feedback_message));
initStringInfo(&reply_message);
initStringInfo(&incoming_message);
/* Initialize the last recv timestamp */
last_recv_timestamp = GetCurrentTimestamp();
@@ -480,41 +480,58 @@ WalRcvQuickDieHandler(SIGNAL_ARGS)
static void
XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
{
int hdrlen;
XLogRecPtr dataStart;
XLogRecPtr walEnd;
TimestampTz sendTime;
bool replyRequested;
resetStringInfo(&incoming_message);
switch (type)
{
case 'w': /* WAL records */
{
WalDataMessageHeader msghdr;
if (len < sizeof(WalDataMessageHeader))
/* copy message to StringInfo */
hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
if (len < hdrlen)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid WAL message received from primary")));
/* memcpy is required here for alignment reasons */
memcpy(&msghdr, buf, sizeof(WalDataMessageHeader));
appendBinaryStringInfo(&incoming_message, buf, hdrlen);
ProcessWalSndrMessage(msghdr.walEnd, msghdr.sendTime);
/* read the fields */
dataStart = pq_getmsgint64(&incoming_message);
walEnd = pq_getmsgint64(&incoming_message);
sendTime = IntegerTimestampToTimestampTz(
pq_getmsgint64(&incoming_message));
ProcessWalSndrMessage(walEnd, sendTime);
buf += sizeof(WalDataMessageHeader);
len -= sizeof(WalDataMessageHeader);
XLogWalRcvWrite(buf, len, msghdr.dataStart);
buf += hdrlen;
len -= hdrlen;
XLogWalRcvWrite(buf, len, dataStart);
break;
}
case 'k': /* Keepalive */
{
PrimaryKeepaliveMessage keepalive;
if (len != sizeof(PrimaryKeepaliveMessage))
/* copy message to StringInfo */
hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
if (len != hdrlen)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid keepalive message received from primary")));
/* memcpy is required here for alignment reasons */
memcpy(&keepalive, buf, sizeof(PrimaryKeepaliveMessage));
appendBinaryStringInfo(&incoming_message, buf, hdrlen);
ProcessWalSndrMessage(keepalive.walEnd, keepalive.sendTime);
/* read the fields */
walEnd = pq_getmsgint64(&incoming_message);
sendTime = IntegerTimestampToTimestampTz(
pq_getmsgint64(&incoming_message));
replyRequested = pq_getmsgbyte(&incoming_message);
ProcessWalSndrMessage(walEnd, sendTime);
/* If the primary requested a reply, send one immediately */
if (keepalive.replyRequested)
if (replyRequested)
XLogWalRcvSendReply(true, false);
break;
}
@@ -685,7 +702,10 @@ XLogWalRcvFlush(bool dying)
static void
XLogWalRcvSendReply(bool force, bool requestReply)
{
char buf[sizeof(StandbyReplyMessage) + 1];
static XLogRecPtr writePtr = 0;
static XLogRecPtr flushPtr = 0;
XLogRecPtr applyPtr;
static TimestampTz sendTime = 0;
TimestampTz now;
/*
@@ -708,28 +728,34 @@ XLogWalRcvSendReply(bool force, bool requestReply)
* probably OK.
*/
if (!force
&& XLByteEQ(reply_message.write, LogstreamResult.Write)
&& XLByteEQ(reply_message.flush, LogstreamResult.Flush)
&& !TimestampDifferenceExceeds(reply_message.sendTime, now,
&& XLByteEQ(writePtr, LogstreamResult.Write)
&& XLByteEQ(flushPtr, LogstreamResult.Flush)
&& !TimestampDifferenceExceeds(sendTime, now,
wal_receiver_status_interval * 1000))
return;
sendTime = now;
/* Construct a new message */
reply_message.write = LogstreamResult.Write;
reply_message.flush = LogstreamResult.Flush;
reply_message.apply = GetXLogReplayRecPtr(NULL);
reply_message.sendTime = now;
reply_message.replyRequested = requestReply;
writePtr = LogstreamResult.Write;
flushPtr = LogstreamResult.Flush;
applyPtr = GetXLogReplayRecPtr(NULL);
elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
(uint32) (reply_message.write >> 32), (uint32) reply_message.write,
(uint32) (reply_message.flush >> 32), (uint32) reply_message.flush,
(uint32) (reply_message.apply >> 32), (uint32) reply_message.apply);
resetStringInfo(&reply_message);
pq_sendbyte(&reply_message, 'r');
pq_sendint64(&reply_message, writePtr);
pq_sendint64(&reply_message, flushPtr);
pq_sendint64(&reply_message, applyPtr);
pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
pq_sendbyte(&reply_message, requestReply ? 1 : 0);
/* Prepend with the message type and send it. */
buf[0] = 'r';
memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage));
walrcv_send(buf, sizeof(StandbyReplyMessage) + 1);
/* Send it */
elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
(uint32) (writePtr >> 32), (uint32) writePtr,
(uint32) (flushPtr >> 32), (uint32) flushPtr,
(uint32) (applyPtr >> 32), (uint32) applyPtr,
requestReply ? " (reply requested)" : "");
walrcv_send(reply_message.data, reply_message.len);
}
/*
@@ -739,11 +765,11 @@ XLogWalRcvSendReply(bool force, bool requestReply)
static void
XLogWalRcvSendHSFeedback(void)
{
char buf[sizeof(StandbyHSFeedbackMessage) + 1];
TimestampTz now;
TransactionId nextXid;
uint32 nextEpoch;
TransactionId xmin;
static TimestampTz sendTime = 0;
/*
* If the user doesn't want status to be reported to the master, be sure
@@ -758,9 +784,10 @@ XLogWalRcvSendHSFeedback(void)
/*
* Send feedback at most once per wal_receiver_status_interval.
*/
if (!TimestampDifferenceExceeds(feedback_message.sendTime, now,
if (!TimestampDifferenceExceeds(sendTime, now,
wal_receiver_status_interval * 1000))
return;
sendTime = now;
/*
* If Hot Standby is not yet active there is nothing to send. Check this
@@ -783,25 +810,23 @@ XLogWalRcvSendHSFeedback(void)
if (nextXid < xmin)
nextEpoch--;
/*
* Always send feedback message.
*/
feedback_message.sendTime = now;
feedback_message.xmin = xmin;
feedback_message.epoch = nextEpoch;
elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
feedback_message.xmin,
feedback_message.epoch);
xmin, nextEpoch);
/* Prepend with the message type and send it. */
buf[0] = 'h';
memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage));
walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1);
/* Construct the the message and send it. */
resetStringInfo(&reply_message);
pq_sendbyte(&reply_message, 'h');
pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
pq_sendint(&reply_message, xmin, 4);
pq_sendint(&reply_message, nextEpoch, 4);
walrcv_send(reply_message.data, reply_message.len);
}
/*
* Keep track of important messages from primary.
* Update shared memory status upon receiving a message from primary.
*
* 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
* message, reported by primary.
*/
static void
ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)