diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 70762671610..b88833c8ee2 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1,4 +1,4 @@ - + Frontend/Backend Protocol @@ -1284,6 +1284,173 @@ + +Streaming Replication Protocol + + +To initiate streaming replication, the frontend sends the +replication parameter in the startup message. This tells the +backend to go into walsender mode, wherein a small set of replication commands +can be issued instead of SQL statements. Only the simple query protocol can be +used in walsender mode. + +The commands accepted in walsender mode are: + + + + IDENTIFY_SYSTEM + + + Requests the server to identify itself. Server replies with a result + set of a single row, containing two fields: + + + + + + + systemid + + + + The unique system identifier identifying the cluster. This + can be used to check that the base backup used to initialize the + slave came from the same cluster. + + + + + + + timeline + + + + Current TimelineID. Also useful to check that the slave is + consistent with the master. + + + + + + + + + + START_REPLICATION XXX/XXX + + + Instructs server to start streaming WAL, starting at + WAL position XXX/XXX. + The server can reply with an error, e.g. if the requested section of WAL + has already been recycled. On success, server responds with a + CopyOutResponse message, and then starts to stream WAL to the frontend. + WAL will continue to be streamed until the connection is broken; + no further commands will be accepted. + + + + WAL data is sent as a series of CopyData messages. (This allows + other information to be intermixed; in particular the server can send + an ErrorResponse message if it encounters a failure after beginning + to stream.) The payload in each CopyData message follows this format: + + + + + + + XLogData (B) + + + + + + + Byte1('w') + + + + Identifies the message as WAL data. + + + + + + Byte8 + + + + The starting point of the WAL data in this message, given in + XLogRecPtr format. + + + + + + Byte8 + + + + The current end of WAL on the server, given in + XLogRecPtr format. + + + + + + Byte8 + + + + The server's system clock at the time of transmission, + given in TimestampTz format. + + + + + + Byten + + + + A section of the WAL data stream. + + + + + + + + + + + A single WAL record is never split across two CopyData messages. + When a WAL record crosses a WAL page boundary, and is therefore + already split using continuation records, it can be split at the page + boundary. In other words, the first main WAL record and its + continuation records can be sent in different CopyData messages. + + + Note that all fields within the WAL data and the above-described header + will be in the sending server's native format. Endianness, and the + format for the timestamp, are unpredictable unless the receiver has + verified that the sender's system identifier matches its own + pg_control contents. + + + If the WAL sender process is terminated normally (during postmaster + shutdown), it will send a CommandComplete message before exiting. + This might not happen during an abnormal shutdown, of course. + + + + + + + + + Message Data Types @@ -4137,120 +4304,6 @@ not line breaks. - -Streaming Replication Protocol - - -To initiate streaming replication, the frontend sends the "replication" -parameter in the startup message. This tells the backend to go into -walsender mode, where a small set of replication commands can be issued -instead of SQL statements. Only the simple query protocol can be used in -walsender mode. - -The commands accepted in walsender mode are: - - - - IDENTIFY_SYSTEM - - - Requests the server to identify itself. Server replies with a result - set of a single row, and two fields: - - systemid: The unique system identifier identifying the cluster. This - can be used to check that the base backup used to initialize the - slave came from the same cluster. - - timeline: Current TimelineID. Also used to check that the slave is - consistent with the master. - - - - - - START_REPLICATION XXX/XXX - - - Instructs backend to start streaming WAL, starting at point XXX/XXX. - Server can reply with an error e.g if the requested piece of WAL has - already been recycled. On success, server responds with a - CopyOutResponse message, and backend starts to stream WAL as CopyData - messages. - The payload in CopyData message consists of the following format. - - - - - - - XLogData (B) - - - - - - - Byte1('w') - - - - Identifies the message as WAL data. - - - - - - Int32 - - - - The log file number of the LSN, indicating the starting point of - the WAL in the message. - - - - - - Int32 - - - - The byte offset of the LSN, indicating the starting point of - the WAL in the message. - - - - - - Byten - - - - Data that forms part of WAL data stream. - - - - - - - - - - - A single WAL record is never split across two CopyData messages. When - a WAL record crosses a WAL page boundary, however, and is therefore - already split using continuation records, it can be split at the page - boundary. In other words, the first main WAL record and its - continuation records can be split across different CopyData messages. - - - - - - - - - Summary of Changes since Protocol 2.0 diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index ecb2c3a6d39..b31cfb4147d 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -29,7 +29,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.10 2010/04/20 22:55:03 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.11 2010/06/03 22:17:32 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -41,6 +41,7 @@ #include "access/xlog_internal.h" #include "libpq/pqsignal.h" #include "miscadmin.h" +#include "replication/walprotocol.h" #include "replication/walreceiver.h" #include "storage/ipc.h" #include "storage/pmsignal.h" @@ -393,18 +394,18 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) { case 'w': /* WAL records */ { - XLogRecPtr recptr; + WalDataMessageHeader msghdr; - if (len < sizeof(XLogRecPtr)) + if (len < sizeof(WalDataMessageHeader)) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid WAL message received from primary"))); + /* memcpy is required here for alignment reasons */ + memcpy(&msghdr, buf, sizeof(WalDataMessageHeader)); + buf += sizeof(WalDataMessageHeader); + len -= sizeof(WalDataMessageHeader); - memcpy(&recptr, buf, sizeof(XLogRecPtr)); - buf += sizeof(XLogRecPtr); - len -= sizeof(XLogRecPtr); - - XLogWalRcvWrite(buf, len, recptr); + XLogWalRcvWrite(buf, len, msghdr.dataStart); break; } default: diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d2e37fd0086..e337e7e5a6f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -5,11 +5,10 @@ * The WAL sender process (walsender) is new as of Postgres 9.0. It takes * charge of XLOG streaming sender in the primary server. At first, it is * started by the postmaster when the walreceiver in the standby server - * connects to the primary server and requests XLOG streaming replication, - * i.e., unlike any auxiliary process, it is not an always-running process. + * connects to the primary server and requests XLOG streaming replication. * It attempts to keep reading XLOG records from the disk and sending them * to the standby server, as long as the connection is alive (i.e., like - * any backend, there is an one to one relationship between a connection + * any backend, there is a one-to-one relationship between a connection * and a walsender process). * * Normal termination is by SIGTERM, which instructs the walsender to @@ -30,7 +29,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.24 2010/06/03 21:02:12 petere Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.25 2010/06/03 22:17:32 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -44,6 +43,7 @@ #include "libpq/pqformat.h" #include "libpq/pqsignal.h" #include "miscadmin.h" +#include "replication/walprotocol.h" #include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -80,7 +80,7 @@ static uint32 sendOff = 0; /* * How far have we sent WAL already? This is also advertised in - * MyWalSnd->sentPtr. + * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.) */ static XLogRecPtr sentPtr = {0, 0}; @@ -100,19 +100,9 @@ static void InitWalSnd(void); static void WalSndHandshake(void); static void WalSndKill(int code, Datum arg); static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes); -static bool XLogSend(StringInfo outMsg, bool *caughtup); +static bool XLogSend(char *msgbuf, bool *caughtup); static void CheckClosedConnection(void); -/* - * How much WAL to send in one message? Must be >= XLOG_BLCKSZ. - * - * We don't have a good idea of what a good value would be; there's some - * overhead per message in both walsender and walreceiver, but on the other - * hand sending large batches makes walsender less responsive to signals - * because signals are checked only between messages. 128kB (with - * default 8k blocks) seems like a reasonable guess for now. - */ -#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* Main entry point for walsender process */ int @@ -157,6 +147,9 @@ WalSenderMain(void) return WalSndLoop(); } +/* + * Execute commands from walreceiver, until we enter streaming mode. + */ static void WalSndHandshake(void) { @@ -172,6 +165,13 @@ WalSndHandshake(void) /* Wait for a command to arrive */ firstchar = pq_getbyte(); + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. + */ + if (!PostmasterIsAlive(true)) + exit(1); + /* * Check for any other interesting events that happened while we * slept. @@ -211,7 +211,7 @@ WalSndHandshake(void) /* * Reply with a result set with one row, two columns. - * First col is system ID, and second if timeline ID + * First col is system ID, and second is timeline ID */ snprintf(sysid, sizeof(sysid), UINT64_FORMAT, @@ -253,6 +253,7 @@ WalSndHandshake(void) /* Send CommandComplete and ReadyForQuery messages */ EndCommand("SELECT", DestRemote); ReadyForQuery(DestRemote); + /* ReadyForQuery did pq_flush for us */ } else if (sscanf(query_string, "START_REPLICATION %X/%X", &recptr.xlogid, &recptr.xrecoff) == 2) @@ -365,12 +366,17 @@ CheckClosedConnection(void) static int WalSndLoop(void) { - StringInfoData output_message; + char *output_message; bool caughtup = false; - initStringInfo(&output_message); + /* + * Allocate buffer that will be used for each output message. We do this + * just once to reduce palloc overhead. The buffer must be made large + * enough for maximum-sized messages. + */ + output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE); - /* Loop forever */ + /* Loop forever, unless we get an error */ for (;;) { long remain; /* remaining time (us) */ @@ -381,6 +387,7 @@ WalSndLoop(void) */ if (!PostmasterIsAlive(true)) exit(1); + /* Process any requests or signals received recently */ if (got_SIGHUP) { @@ -394,8 +401,8 @@ WalSndLoop(void) */ if (ready_to_stop) { - if (!XLogSend(&output_message, &caughtup)) - goto eof; + if (!XLogSend(output_message, &caughtup)) + break; if (caughtup) shutdown_requested = true; } @@ -435,17 +442,15 @@ WalSndLoop(void) remain -= NAPTIME_PER_CYCLE; } } + /* Attempt to send the log once every loop */ - if (!XLogSend(&output_message, &caughtup)) - goto eof; + if (!XLogSend(output_message, &caughtup)) + break; } - /* can't get here because the above loop never exits */ - return 1; - -eof: - /* + * Get here on send failure. Clean up and exit. + * * Reset whereToSendOutput to prevent ereport from attempting to send any * more messages to the standby. */ @@ -524,6 +529,9 @@ WalSndKill(int code, Datum arg) /* * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr' + * + * XXX probably this should be improved to suck data directly from the + * WAL buffers when possible. */ static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) @@ -634,51 +642,46 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) /* * Read up to MAX_SEND_SIZE bytes of WAL that's been written (and flushed), - * but not yet sent to the client, and send it. If there is no unsent WAL, - * *caughtup is set to true and nothing is sent, otherwise *caughtup is set - * to false. + * but not yet sent to the client, and send it. + * + * msgbuf is a work area in which the output message is constructed. It's + * passed in just so we can avoid re-palloc'ing the buffer on each cycle. + * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE. + * + * If there is no unsent WAL remaining, *caughtup is set to true, otherwise + * *caughtup is set to false. * * Returns true if OK, false if trouble. */ static bool -XLogSend(StringInfo outMsg, bool *caughtup) +XLogSend(char *msgbuf, bool *caughtup) { XLogRecPtr SendRqstPtr; XLogRecPtr startptr; XLogRecPtr endptr; Size nbytes; - char activitymsg[50]; - - /* use volatile pointer to prevent code rearrangement */ - volatile WalSnd *walsnd = MyWalSnd; + WalDataMessageHeader msghdr; /* Attempt to send all records flushed to the disk already */ SendRqstPtr = GetWriteRecPtr(); /* Quick exit if nothing to do */ - if (!XLByteLT(sentPtr, SendRqstPtr)) + if (XLByteLE(SendRqstPtr, sentPtr)) { *caughtup = true; return true; } - /* - * Otherwise let the caller know that we're not fully caught up. Unless - * there's a huge backlog, we'll be caught up to the current WriteRecPtr - * after we've sent everything below, but more WAL could accumulate while - * we're busy sending. - */ - *caughtup = false; /* - * Figure out how much to send in one message. If there's less than + * Figure out how much to send in one message. If there's no more than * MAX_SEND_SIZE bytes to send, send everything. Otherwise send - * MAX_SEND_SIZE bytes, but round to page boundary. + * MAX_SEND_SIZE bytes, but round to logfile or page boundary. * * The rounding is not only for performance reasons. Walreceiver * relies on the fact that we never split a WAL record across two * messages. Since a long WAL record is split at page boundary into * continuation records, page boundary is always a safe cut-off point. - * We also assume that SendRqstPtr never points in the middle of a WAL + * We also assume that SendRqstPtr never points to the middle of a WAL * record. */ startptr = sentPtr; @@ -694,59 +697,78 @@ XLogSend(StringInfo outMsg, bool *caughtup) endptr = startptr; XLByteAdvance(endptr, MAX_SEND_SIZE); - /* round down to page boundary. */ - endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ); + if (endptr.xlogid != startptr.xlogid) + { + /* Don't cross a logfile boundary within one message */ + Assert(endptr.xlogid == startptr.xlogid + 1); + endptr.xlogid = startptr.xlogid; + endptr.xrecoff = XLogFileSize; + } + /* if we went beyond SendRqstPtr, back off */ - if (XLByteLT(SendRqstPtr, endptr)) + if (XLByteLE(SendRqstPtr, endptr)) + { endptr = SendRqstPtr; + *caughtup = true; + } + else + { + /* round down to page boundary. */ + endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ); + *caughtup = false; + } + + nbytes = endptr.xrecoff - startptr.xrecoff; + Assert(nbytes <= MAX_SEND_SIZE); /* * OK to read and send the slice. - * - * We don't need to convert the xlogid/xrecoff from host byte order to - * network byte order because the both server can be expected to have - * the same byte order. If they have different byte order, we don't - * reach here. */ - pq_sendbyte(outMsg, 'w'); - pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr)); - - if (endptr.xlogid != startptr.xlogid) - { - Assert(endptr.xlogid == startptr.xlogid + 1); - nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff; - } - else - nbytes = endptr.xrecoff - startptr.xrecoff; - - sentPtr = endptr; + msgbuf[0] = 'w'; /* - * Read the log directly into the output buffer to prevent extra - * memcpy calls. + * Read the log directly into the output buffer to avoid extra memcpy + * calls. */ - enlargeStringInfo(outMsg, nbytes); + XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes); - XLogRead(&outMsg->data[outMsg->len], startptr, nbytes); - outMsg->len += nbytes; - outMsg->data[outMsg->len] = '\0'; + /* + * We fill the message header last so that the send timestamp is taken + * as late as possible. + */ + msghdr.dataStart = startptr; + msghdr.walEnd = SendRqstPtr; + msghdr.sendTime = GetCurrentTimestamp(); - pq_putmessage('d', outMsg->data, outMsg->len); - resetStringInfo(outMsg); + memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader)); - /* Update shared memory status */ - SpinLockAcquire(&walsnd->mutex); - walsnd->sentPtr = sentPtr; - SpinLockRelease(&walsnd->mutex); + pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes); /* Flush pending output */ if (pq_flush()) return false; + sentPtr = endptr; + + /* Update shared memory status */ + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->sentPtr = sentPtr; + SpinLockRelease(&walsnd->mutex); + } + /* Report progress of XLOG streaming in PS display */ - snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X", - sentPtr.xlogid, sentPtr.xrecoff); - set_ps_display(activitymsg, false); + if (update_process_title) + { + char activitymsg[50]; + + snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X", + sentPtr.xlogid, sentPtr.xrecoff); + set_ps_display(activitymsg, false); + } return true; } diff --git a/src/include/replication/walprotocol.h b/src/include/replication/walprotocol.h new file mode 100644 index 00000000000..15025a277c0 --- /dev/null +++ b/src/include/replication/walprotocol.h @@ -0,0 +1,53 @@ +/*------------------------------------------------------------------------- + * + * walprotocol.h + * Definitions relevant to the streaming WAL transmission protocol. + * + * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group + * + * $PostgreSQL: pgsql/src/include/replication/walprotocol.h,v 1.1 2010/06/03 22:17:32 tgl Exp $ + * + *------------------------------------------------------------------------- + */ +#ifndef _WALPROTOCOL_H +#define _WALPROTOCOL_H + +#include "access/xlogdefs.h" +#include "utils/timestamp.h" + + +/* + * Header for a WAL data message (message type 'w'). This is wrapped within + * a CopyData message at the FE/BE protocol level. + * + * The header is followed by actual WAL data. Note that the data length is + * not specified in the header --- it's just whatever remains in the message. + * + * walEnd and sendTime are not essential data, but are provided in case + * the receiver wants to adjust its behavior depending on how far behind + * it is. + */ +typedef struct +{ + /* WAL start location of the data included in this message */ + XLogRecPtr dataStart; + + /* Current end of WAL on the sender */ + XLogRecPtr walEnd; + + /* Sender's system clock at the time of transmission */ + TimestampTz sendTime; +} WalDataMessageHeader; + +/* + * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ. + * + * We don't have a good idea of what a good value would be; there's some + * overhead per message in both walsender and walreceiver, but on the other + * hand sending large batches makes walsender less responsive to signals + * because signals are checked only between messages. 128kB (with + * default 8k blocks) seems like a reasonable guess for now. + */ +#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) + +#endif /* _WALPROTOCOL_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 4300b80b278..5dcaeba3f33 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -5,7 +5,7 @@ * * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group * - * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.8 2010/02/26 02:01:27 momjian Exp $ + * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.9 2010/06/03 22:17:32 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -14,6 +14,7 @@ #include "access/xlogdefs.h" #include "storage/spin.h" +#include "pgtime.h" extern bool am_walreceiver;