From a9dad56441e4bc5d20c34bd56daf8e72df1b22d3 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 8 Nov 2012 10:25:58 +0200 Subject: [PATCH] Teach pg_basebackup and pg_receivexlog to reply to server keepalives. Without this, the connection will be killed after timeout if wal_sender_timeout is set in the server. Original patch by Amit Kapila, modified by me to fit recent changes in the code. --- doc/src/sgml/ref/pg_basebackup.sgml | 8 ++--- doc/src/sgml/ref/pg_receivexlog.sgml | 7 +++-- src/bin/pg_basebackup/receivelog.c | 46 ++++++++++++++++++++++------ 3 files changed, 45 insertions(+), 16 deletions(-) diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml index a951d6b0f04..0bc3ca27b16 100644 --- a/doc/src/sgml/ref/pg_basebackup.sgml +++ b/doc/src/sgml/ref/pg_basebackup.sgml @@ -377,10 +377,10 @@ PostgreSQL documentation Specifies the number of seconds between status packets sent back to the - server. This is required when streaming the transaction log (using - --xlog=stream) if replication timeout is configured - on the server, and allows for easier monitoring. A value of zero disables - the status updates completely. The default value is 10 seconds. + server. This allows for easier monitoring of the progress from server. + A value of zero disables the periodic status updates completely, + although an update will still be sent when requested by the server, to + avoid timeout disconnect. The default value is 10 seconds. diff --git a/doc/src/sgml/ref/pg_receivexlog.sgml b/doc/src/sgml/ref/pg_receivexlog.sgml index 7f62fd9e615..d06dd1f171c 100644 --- a/doc/src/sgml/ref/pg_receivexlog.sgml +++ b/doc/src/sgml/ref/pg_receivexlog.sgml @@ -155,9 +155,10 @@ PostgreSQL documentation Specifies the number of seconds between status packets sent back to the - server. This is required if replication timeout is configured on the - server, and allows for easier monitoring. A value of zero disables the - status updates completely. The default value is 10 seconds. + server. This allows for easier monitoring of the progress from server. + A value of zero disables the periodic status updates completely, + although an update will still be sent when requested by the server, to + avoid timeout disconnect. The default value is 10 seconds. diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index f653650ffb1..de82ff54d8e 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -287,7 +287,7 @@ recvint64(char *buf) * Send a Standby Status Update message to server. */ static bool -sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now) +sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested) { char replybuf[1 + 8 + 8 + 8 + 8 + 1]; int len = 0; @@ -302,7 +302,7 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now) len += 8; sendint64(now, &replybuf[len]); /* sendTime */ len += 8; - replybuf[len] = 0; /* replyRequested */ + replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */ len += 1; if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn)) @@ -413,6 +413,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, int bytes_left; int bytes_written; int64 now; + int hdr_len; if (copybuf != NULL) { @@ -441,7 +442,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, standby_message_timeout)) { /* Time to send feedback! */ - if (!sendFeedback(conn, blockpos, now)) + if (!sendFeedback(conn, blockpos, now, false)) goto error; last_status = now; } @@ -520,10 +521,34 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* Check the message type. */ if (copybuf[0] == 'k') { + int pos; + bool replyRequested; + /* - * keepalive message, sent in 9.2 and newer. We just ignore this - * message completely, but need to skip past it in the stream. + * Parse the keepalive message, enclosed in the CopyData message. + * We just check if the server requested a reply, and ignore the + * rest. */ + pos = 1; /* skip msgtype 'k' */ + pos += 8; /* skip walEnd */ + pos += 8; /* skip sendTime */ + + if (r < pos + 1) + { + fprintf(stderr, _("%s: streaming header too small: %d\n"), + progname, r); + goto error; + } + replyRequested = copybuf[pos]; + + /* If the server requested an immediate reply, send one. */ + if (replyRequested) + { + now = localGetCurrentTimestamp(); + if (!sendFeedback(conn, blockpos, now, false)) + goto error; + last_status = now; + } continue; } else if (copybuf[0] != 'w') @@ -538,8 +563,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * message. We only need the WAL location field (dataStart), the rest * of the header is ignored. */ -#define STREAMING_HEADER_SIZE (1 /* msgtype */ + 8 /* dataStart */ + 8 /* walEnd */ + 8 /* sendTime */) - if (r < STREAMING_HEADER_SIZE + 1) + hdr_len = 1; /* msgtype 'w' */ + hdr_len += 8; /* dataStart */ + hdr_len += 8; /* walEnd */ + hdr_len += 8; /* sendTime */ + if (r < hdr_len + 1) { fprintf(stderr, _("%s: streaming header too small: %d\n"), progname, r); @@ -578,7 +606,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, } } - bytes_left = r - STREAMING_HEADER_SIZE; + bytes_left = r - hdr_len; bytes_written = 0; while (bytes_left) @@ -604,7 +632,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, } if (write(walfile, - copybuf + STREAMING_HEADER_SIZE + bytes_written, + copybuf + hdr_len + bytes_written, bytes_to_write) != bytes_to_write) { fprintf(stderr,