1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-10 17:42:29 +03:00

Send status updates back from standby server to master, indicating how far

the standby has written, flushed, and applied the WAL. At the moment, this
is for informational purposes only: the values are shown only in the
pg_stat_replication system view. In the future they will also be needed
for synchronous replication.

Extracted from Simon Riggs' synchronous replication patch by Robert Haas, with
some tweaking by me.
This commit is contained in:
Heikki Linnakangas
2011-02-10 21:00:29 +02:00
parent 4c468b37a2
commit b186523fd9
15 changed files with 352 additions and 22 deletions

View File

@@ -54,6 +54,9 @@
/* Global variable to indicate if this process is a walreceiver process */
bool am_walreceiver;
/*
 * GUC variable: controls how often the walreceiver reports its status to
 * the master. A value <= 0 disables status reports entirely.
 * NOTE(review): presumably expressed in seconds, since it is multiplied by
 * 1000 before being used as a timestamp-difference threshold -- confirm
 * against the GUC definition.
 */
int wal_receiver_status_interval;
/* libpqreceiver hooks to these when loaded */
walrcv_connect_type walrcv_connect = NULL;
walrcv_receive_type walrcv_receive = NULL;
@@ -88,6 +91,8 @@ static struct
XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
} LogstreamResult;
/*
 * Most recent status reply constructed for the master. XLogWalRcvSendReply()
 * compares the current write/flush positions and send time against it to
 * skip sending a new message when nothing has changed.
 */
static StandbyReplyMessage reply_message;
/*
* About SIGTERM handling:
*
@@ -114,6 +119,7 @@ static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(void);
static void XLogWalRcvSendReply(void);
/* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
@@ -306,12 +312,23 @@ WalReceiverMain(void)
while (walrcv_receive(0, &type, &buf, &len))
XLogWalRcvProcessMsg(type, buf, len);
/* Let the master know that we received some data. */
XLogWalRcvSendReply();
/*
* If we've written some records, flush them to disk and let the
* startup process know about them.
*/
XLogWalRcvFlush();
}
else
{
/*
* We didn't receive anything new, but send a status update to
* the master anyway, to report any progress in applying WAL.
*/
XLogWalRcvSendReply();
}
}
}
@@ -546,5 +563,60 @@ XLogWalRcvFlush(void)
LogstreamResult.Write.xrecoff);
set_ps_display(activitymsg, false);
}
/* Also let the master know that we made some progress */
XLogWalRcvSendReply();
}
}
/*
 * Report our current XLOG write, flush and apply positions, together with
 * the current time, to the primary.
 *
 * Nothing is sent when status reporting is disabled, or when neither the
 * write nor the flush position has moved since the previous report and the
 * configured status interval has not yet elapsed.
 */
static void
XLogWalRcvSendReply(void)
{
	TimestampTz current_time;
	char		msgbuf[1 + sizeof(StandbyReplyMessage)];

	/*
	 * Status reporting is switched off: leave before doing any work
	 * whatsoever.
	 */
	if (wal_receiver_status_interval <= 0)
		return;

	current_time = GetCurrentTimestamp();

	/*
	 * The write and flush positions can be compared with the previously
	 * sent message without any locking. Fetching the apply position needs a
	 * spinlock, so it is only looked up when one of the other positions has
	 * moved or the status interval has elapsed. As a consequence the apply
	 * position seen by the master may lag a little, but that only happens on
	 * an idle system and the value is used purely for reporting.
	 */
	if (XLByteEQ(reply_message.write, LogstreamResult.Write) &&
		XLByteEQ(reply_message.flush, LogstreamResult.Flush))
	{
		if (!TimestampDifferenceExceeds(reply_message.sendTime, current_time,
										wal_receiver_status_interval * 1000))
			return;
	}

	/* Fill in a fresh message with the latest positions. */
	reply_message.sendTime = current_time;
	reply_message.write = LogstreamResult.Write;
	reply_message.flush = LogstreamResult.Flush;
	reply_message.apply = GetXLogReplayRecPtr();

	elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
		 reply_message.write.xlogid, reply_message.write.xrecoff,
		 reply_message.flush.xlogid, reply_message.flush.xrecoff,
		 reply_message.apply.xlogid, reply_message.apply.xrecoff);

	/* The wire format is a one-byte message type followed by the struct. */
	msgbuf[0] = 'r';
	memcpy(msgbuf + 1, &reply_message, sizeof(StandbyReplyMessage));
	walrcv_send(msgbuf, sizeof(StandbyReplyMessage) + 1);
}

View File

@@ -39,6 +39,7 @@
#include "funcapi.h"
#include "access/xlog_internal.h"
#include "access/transam.h"
#include "catalog/pg_type.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
@@ -51,6 +52,7 @@
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc.h"
@@ -106,9 +108,10 @@ static void InitWalSnd(void);
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
static bool XLogSend(char *msgbuf, bool *caughtup);
static void CheckClosedConnection(void);
static void IdentifySystem(void);
static void StartReplication(StartReplicationCmd * cmd);
static void ProcessStandbyReplyMessage(void);
static void ProcessRepliesIfAny(void);
/* Main entry point for walsender process */
@@ -442,7 +445,7 @@ HandleReplicationCommand(const char *cmd_string)
* Check if the remote end has closed the connection.
*/
static void
CheckClosedConnection(void)
ProcessRepliesIfAny(void)
{
unsigned char firstchar;
int r;
@@ -465,6 +468,13 @@ CheckClosedConnection(void)
/* Handle the very limited subset of commands expected in this phase */
switch (firstchar)
{
/*
* 'd' means a standby reply wrapped in a COPY BOTH packet.
*/
case 'd':
ProcessStandbyReplyMessage();
break;
/*
* 'X' means that the standby is closing down the socket.
*/
@@ -479,6 +489,62 @@ CheckClosedConnection(void)
}
}
/*
* Process a status update message received from standby.
*/
static void
ProcessStandbyReplyMessage(void)
{
static StringInfoData input_message;
StandbyReplyMessage reply;
char msgtype;
initStringInfo(&input_message);
/*
* Read the message contents.
*/
if (pq_getmessage(&input_message, 0))
{
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("unexpected EOF on standby connection")));
proc_exit(0);
}
/*
* Check message type from the first byte. At the moment, there is only
* one type.
*/
msgtype = pq_getmsgbyte(&input_message);
if (msgtype != 'r')
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("unexpected message type %c", msgtype)));
pq_copymsgbytes(&input_message, (char *) &reply, sizeof(StandbyReplyMessage));
elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X ",
reply.write.xlogid, reply.write.xrecoff,
reply.flush.xlogid, reply.flush.xrecoff,
reply.apply.xlogid, reply.apply.xrecoff);
/*
* Update shared state for this WalSender process
* based on reply data from standby.
*/
{
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = MyWalSnd;
SpinLockAcquire(&walsnd->mutex);
walsnd->write = reply.write;
walsnd->flush = reply.flush;
walsnd->apply = reply.apply;
SpinLockRelease(&walsnd->mutex);
}
}
/* Main loop of walsender process */
static int
WalSndLoop(void)
@@ -518,6 +584,7 @@ WalSndLoop(void)
{
if (!XLogSend(output_message, &caughtup))
break;
ProcessRepliesIfAny();
if (caughtup)
walsender_shutdown_requested = true;
}
@@ -561,9 +628,6 @@ WalSndLoop(void)
WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
WalSndDelay * 1000L);
}
/* Check if the connection was closed */
CheckClosedConnection();
}
else
{
@@ -574,6 +638,7 @@ WalSndLoop(void)
/* Update our state to indicate if we're behind or not */
WalSndSetState(caughtup ? WALSNDSTATE_STREAMING : WALSNDSTATE_CATCHUP);
ProcessRepliesIfAny();
}
/*
@@ -1104,7 +1169,7 @@ WalSndGetStateString(WalSndState state)
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_WAL_SENDERS_COLS 3
#define PG_STAT_GET_WAL_SENDERS_COLS 6
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -1141,8 +1206,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
char sent_location[MAXFNAMELEN];
char location[MAXFNAMELEN];
XLogRecPtr sentPtr;
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
WalSndState state;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -1153,13 +1221,14 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
SpinLockAcquire(&walsnd->mutex);
sentPtr = walsnd->sentPtr;
state = walsnd->state;
write = walsnd->write;
flush = walsnd->flush;
apply = walsnd->apply;
SpinLockRelease(&walsnd->mutex);
snprintf(sent_location, sizeof(sent_location), "%X/%X",
sentPtr.xlogid, sentPtr.xrecoff);
memset(nulls, 0, sizeof(nulls));
values[0] = Int32GetDatum(walsnd->pid);
if (!superuser())
{
/*
@@ -1168,11 +1237,35 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
*/
nulls[1] = true;
nulls[2] = true;
nulls[3] = true;
nulls[4] = true;
nulls[5] = true;
}
else
{
values[1] = CStringGetTextDatum(WalSndGetStateString(state));
values[2] = CStringGetTextDatum(sent_location);
snprintf(location, sizeof(location), "%X/%X",
sentPtr.xlogid, sentPtr.xrecoff);
values[2] = CStringGetTextDatum(location);
if (write.xlogid == 0 && write.xrecoff == 0)
nulls[3] = true;
snprintf(location, sizeof(location), "%X/%X",
write.xlogid, write.xrecoff);
values[3] = CStringGetTextDatum(location);
if (flush.xlogid == 0 && flush.xrecoff == 0)
nulls[4] = true;
snprintf(location, sizeof(location), "%X/%X",
flush.xlogid, flush.xrecoff);
values[4] = CStringGetTextDatum(location);
if (apply.xlogid == 0 && apply.xrecoff == 0)
nulls[5] = true;
snprintf(location, sizeof(location), "%X/%X",
apply.xlogid, apply.xrecoff);
values[5] = CStringGetTextDatum(location);
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);