mirror of
https://github.com/postgres/postgres.git
synced 2025-07-02 09:02:37 +03:00
pgindent run for release 9.3
This is the first run of the Perl-based pgindent script. Also update pgindent instructions.
This commit is contained in:
@ -58,7 +58,7 @@ static void base_backup_cleanup(int code, Datum arg);
|
||||
static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
|
||||
static void parse_basebackup_options(List *options, basebackup_options *opt);
|
||||
static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
|
||||
static int compareWalFileNames(const void *a, const void *b);
|
||||
static int compareWalFileNames(const void *a, const void *b);
|
||||
|
||||
/* Was the backup currently in-progress initiated in recovery mode? */
|
||||
static bool backup_started_in_recovery = false;
|
||||
@ -249,8 +249,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
|
||||
* I'd rather not worry about timelines here, so scan pg_xlog and
|
||||
* include all WAL files in the range between 'startptr' and 'endptr',
|
||||
* regardless of the timeline the file is stamped with. If there are
|
||||
* some spurious WAL files belonging to timelines that don't belong
|
||||
* in this server's history, they will be included too. Normally there
|
||||
* some spurious WAL files belonging to timelines that don't belong in
|
||||
* this server's history, they will be included too. Normally there
|
||||
* shouldn't be such files, but if there are, there's little harm in
|
||||
* including them.
|
||||
*/
|
||||
@ -262,7 +262,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
|
||||
dir = AllocateDir("pg_xlog");
|
||||
if (!dir)
|
||||
ereport(ERROR,
|
||||
(errmsg("could not open directory \"%s\": %m", "pg_xlog")));
|
||||
(errmsg("could not open directory \"%s\": %m", "pg_xlog")));
|
||||
while ((de = ReadDir(dir, "pg_xlog")) != NULL)
|
||||
{
|
||||
/* Does it look like a WAL segment, and is it in the range? */
|
||||
@ -290,9 +290,9 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
|
||||
CheckXLogRemoved(startsegno, ThisTimeLineID);
|
||||
|
||||
/*
|
||||
* Put the WAL filenames into an array, and sort. We send the files
|
||||
* in order from oldest to newest, to reduce the chance that a file
|
||||
* is recycled before we get a chance to send it over.
|
||||
* Put the WAL filenames into an array, and sort. We send the files in
|
||||
* order from oldest to newest, to reduce the chance that a file is
|
||||
* recycled before we get a chance to send it over.
|
||||
*/
|
||||
nWalFiles = list_length(walFileList);
|
||||
walFiles = palloc(nWalFiles * sizeof(char *));
|
||||
@ -310,28 +310,31 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
|
||||
XLogFromFileName(walFiles[0], &tli, &segno);
|
||||
if (segno != startsegno)
|
||||
{
|
||||
char startfname[MAXFNAMELEN];
|
||||
char startfname[MAXFNAMELEN];
|
||||
|
||||
XLogFileName(startfname, ThisTimeLineID, startsegno);
|
||||
ereport(ERROR,
|
||||
(errmsg("could not find WAL file \"%s\"", startfname)));
|
||||
}
|
||||
for (i = 0; i < nWalFiles; i++)
|
||||
{
|
||||
XLogSegNo currsegno = segno;
|
||||
XLogSegNo nextsegno = segno + 1;
|
||||
XLogSegNo currsegno = segno;
|
||||
XLogSegNo nextsegno = segno + 1;
|
||||
|
||||
XLogFromFileName(walFiles[i], &tli, &segno);
|
||||
if (!(nextsegno == segno || currsegno == segno))
|
||||
{
|
||||
char nextfname[MAXFNAMELEN];
|
||||
char nextfname[MAXFNAMELEN];
|
||||
|
||||
XLogFileName(nextfname, ThisTimeLineID, nextsegno);
|
||||
ereport(ERROR,
|
||||
(errmsg("could not find WAL file \"%s\"", nextfname)));
|
||||
(errmsg("could not find WAL file \"%s\"", nextfname)));
|
||||
}
|
||||
}
|
||||
if (segno != endsegno)
|
||||
{
|
||||
char endfname[MAXFNAMELEN];
|
||||
char endfname[MAXFNAMELEN];
|
||||
|
||||
XLogFileName(endfname, ThisTimeLineID, endsegno);
|
||||
ereport(ERROR,
|
||||
(errmsg("could not find WAL file \"%s\"", endfname)));
|
||||
@ -373,7 +376,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
|
||||
CheckXLogRemoved(segno, tli);
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
|
||||
errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
|
||||
}
|
||||
|
||||
_tarWriteHeader(pathbuf, NULL, &statbuf);
|
||||
@ -396,7 +399,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
|
||||
CheckXLogRemoved(segno, tli);
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
|
||||
errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
|
||||
}
|
||||
|
||||
/* XLogSegSize is a multiple of 512, so no need for padding */
|
||||
@ -408,13 +411,14 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
|
||||
* file is required for recovery, and even that only if there happens
|
||||
* to be a timeline switch in the first WAL segment that contains the
|
||||
* checkpoint record, or if we're taking a base backup from a standby
|
||||
* server and the target timeline changes while the backup is taken.
|
||||
* server and the target timeline changes while the backup is taken.
|
||||
* But they are small and highly useful for debugging purposes, so
|
||||
* better include them all, always.
|
||||
*/
|
||||
foreach(lc, historyFileList)
|
||||
{
|
||||
char *fname = lfirst(lc);
|
||||
char *fname = lfirst(lc);
|
||||
|
||||
snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", fname);
|
||||
|
||||
if (lstat(pathbuf, &statbuf) != 0)
|
||||
@ -438,8 +442,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
|
||||
static int
|
||||
compareWalFileNames(const void *a, const void *b)
|
||||
{
|
||||
char *fna = *((char **) a);
|
||||
char *fnb = *((char **) b);
|
||||
char *fna = *((char **) a);
|
||||
char *fnb = *((char **) b);
|
||||
|
||||
return strcmp(fna + 8, fnb + 8);
|
||||
}
|
||||
@ -657,11 +661,12 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
|
||||
pq_sendstring(&buf, "tli");
|
||||
pq_sendint(&buf, 0, 4); /* table oid */
|
||||
pq_sendint(&buf, 0, 2); /* attnum */
|
||||
|
||||
/*
|
||||
* int8 may seem like a surprising data type for this, but in thory int4
|
||||
* would not be wide enough for this, as TimeLineID is unsigned.
|
||||
*/
|
||||
pq_sendint(&buf, INT8OID, 4); /* type oid */
|
||||
pq_sendint(&buf, INT8OID, 4); /* type oid */
|
||||
pq_sendint(&buf, -1, 2);
|
||||
pq_sendint(&buf, 0, 4);
|
||||
pq_sendint(&buf, 0, 2);
|
||||
@ -729,7 +734,7 @@ sendFileWithContent(const char *filename, const char *content)
|
||||
|
||||
/*
|
||||
* Include the tablespace directory pointed to by 'path' in the output tar
|
||||
* stream. If 'sizeonly' is true, we just calculate a total length and return
|
||||
* stream. If 'sizeonly' is true, we just calculate a total length and return
|
||||
* it, without actually sending anything.
|
||||
*/
|
||||
static int64
|
||||
@ -747,7 +752,8 @@ sendTablespace(char *path, bool sizeonly)
|
||||
TABLESPACE_VERSION_DIRECTORY);
|
||||
|
||||
/*
|
||||
* Store a directory entry in the tar file so we get the permissions right.
|
||||
* Store a directory entry in the tar file so we get the permissions
|
||||
* right.
|
||||
*/
|
||||
if (lstat(pathbuf, &statbuf) != 0)
|
||||
{
|
||||
@ -762,7 +768,7 @@ sendTablespace(char *path, bool sizeonly)
|
||||
}
|
||||
if (!sizeonly)
|
||||
_tarWriteHeader(TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf);
|
||||
size = 512; /* Size of the header just added */
|
||||
size = 512; /* Size of the header just added */
|
||||
|
||||
/* Send all the files in the tablespace version directory */
|
||||
size += sendDir(pathbuf, strlen(path), sizeonly);
|
||||
@ -818,9 +824,9 @@ sendDir(char *path, int basepathlen, bool sizeonly)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("the standby was promoted during online backup"),
|
||||
errhint("This means that the backup being taken is corrupt "
|
||||
"and should not be used. "
|
||||
"Try taking another online backup.")));
|
||||
errhint("This means that the backup being taken is corrupt "
|
||||
"and should not be used. "
|
||||
"Try taking another online backup.")));
|
||||
|
||||
snprintf(pathbuf, MAXPGPATH, "%s/%s", path, de->d_name);
|
||||
|
||||
@ -923,7 +929,7 @@ sendDir(char *path, int basepathlen, bool sizeonly)
|
||||
}
|
||||
else if (S_ISREG(statbuf.st_mode))
|
||||
{
|
||||
bool sent = false;
|
||||
bool sent = false;
|
||||
|
||||
if (!sizeonly)
|
||||
sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf,
|
||||
@ -933,7 +939,7 @@ sendDir(char *path, int basepathlen, bool sizeonly)
|
||||
{
|
||||
/* Add size, rounded up to 512byte block */
|
||||
size += ((statbuf.st_size + 511) & ~511);
|
||||
size += 512; /* Size of the header of the file */
|
||||
size += 512; /* Size of the header of the file */
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -967,7 +973,7 @@ sendDir(char *path, int basepathlen, bool sizeonly)
|
||||
* and the file did not exist.
|
||||
*/
|
||||
static bool
|
||||
sendFile(char *readfilename, char *tarfilename, struct stat *statbuf,
|
||||
sendFile(char *readfilename, char *tarfilename, struct stat * statbuf,
|
||||
bool missing_ok)
|
||||
{
|
||||
FILE *fp;
|
||||
|
@ -51,7 +51,7 @@ static void libpqrcv_identify_system(TimeLineID *primary_tli);
|
||||
static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
|
||||
static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint);
|
||||
static void libpqrcv_endstreaming(TimeLineID *next_tli);
|
||||
static int libpqrcv_receive(int timeout, char **buffer);
|
||||
static int libpqrcv_receive(int timeout, char **buffer);
|
||||
static void libpqrcv_send(const char *buffer, int nbytes);
|
||||
static void libpqrcv_disconnect(void);
|
||||
|
||||
@ -209,12 +209,13 @@ libpqrcv_endstreaming(TimeLineID *next_tli)
|
||||
|
||||
if (PQputCopyEnd(streamConn, NULL) <= 0 || PQflush(streamConn))
|
||||
ereport(ERROR,
|
||||
(errmsg("could not send end-of-streaming message to primary: %s",
|
||||
PQerrorMessage(streamConn))));
|
||||
(errmsg("could not send end-of-streaming message to primary: %s",
|
||||
PQerrorMessage(streamConn))));
|
||||
|
||||
/*
|
||||
* After COPY is finished, we should receive a result set indicating the
|
||||
* next timeline's ID, or just CommandComplete if the server was shut down.
|
||||
* next timeline's ID, or just CommandComplete if the server was shut
|
||||
* down.
|
||||
*
|
||||
* If we had not yet received CopyDone from the backend, PGRES_COPY_IN
|
||||
* would also be possible. However, at the moment this function is only
|
||||
@ -456,7 +457,7 @@ libpqrcv_disconnect(void)
|
||||
* 0 if no data was available within timeout, or wait was interrupted
|
||||
* by signal.
|
||||
*
|
||||
* -1 if the server ended the COPY.
|
||||
* -1 if the server ended the COPY.
|
||||
*
|
||||
* ereports on error.
|
||||
*/
|
||||
|
@ -443,7 +443,7 @@ SyncRepReleaseWaiters(void)
|
||||
|
||||
elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
|
||||
numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
|
||||
numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
|
||||
numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
|
||||
|
||||
/*
|
||||
* If we are managing the highest priority standby, though we weren't
|
||||
|
@ -86,7 +86,7 @@ walrcv_disconnect_type walrcv_disconnect = NULL;
|
||||
* corresponding the filename of recvFile.
|
||||
*/
|
||||
static int recvFile = -1;
|
||||
static TimeLineID recvFileTLI = 0;
|
||||
static TimeLineID recvFileTLI = 0;
|
||||
static XLogSegNo recvSegNo = 0;
|
||||
static uint32 recvOff = 0;
|
||||
|
||||
@ -107,8 +107,8 @@ static struct
|
||||
XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
|
||||
} LogstreamResult;
|
||||
|
||||
static StringInfoData reply_message;
|
||||
static StringInfoData incoming_message;
|
||||
static StringInfoData reply_message;
|
||||
static StringInfoData incoming_message;
|
||||
|
||||
/*
|
||||
* About SIGTERM handling:
|
||||
@ -332,12 +332,13 @@ WalReceiverMain(void)
|
||||
|
||||
/*
|
||||
* Get any missing history files. We do this always, even when we're
|
||||
* not interested in that timeline, so that if we're promoted to become
|
||||
* the master later on, we don't select the same timeline that was
|
||||
* already used in the current master. This isn't bullet-proof - you'll
|
||||
* need some external software to manage your cluster if you need to
|
||||
* ensure that a unique timeline id is chosen in every case, but let's
|
||||
* avoid the confusion of timeline id collisions where we can.
|
||||
* not interested in that timeline, so that if we're promoted to
|
||||
* become the master later on, we don't select the same timeline that
|
||||
* was already used in the current master. This isn't bullet-proof -
|
||||
* you'll need some external software to manage your cluster if you
|
||||
* need to ensure that a unique timeline id is chosen in every case,
|
||||
* but let's avoid the confusion of timeline id collisions where we
|
||||
* can.
|
||||
*/
|
||||
WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
|
||||
|
||||
@ -356,18 +357,18 @@ WalReceiverMain(void)
|
||||
ThisTimeLineID = startpointTLI;
|
||||
if (walrcv_startstreaming(startpointTLI, startpoint))
|
||||
{
|
||||
bool endofwal = false;
|
||||
bool endofwal = false;
|
||||
|
||||
if (first_stream)
|
||||
ereport(LOG,
|
||||
(errmsg("started streaming WAL from primary at %X/%X on timeline %u",
|
||||
(uint32) (startpoint >> 32), (uint32) startpoint,
|
||||
(uint32) (startpoint >> 32), (uint32) startpoint,
|
||||
startpointTLI)));
|
||||
else
|
||||
ereport(LOG,
|
||||
(errmsg("restarted WAL streaming at %X/%X on timeline %u",
|
||||
(uint32) (startpoint >> 32), (uint32) startpoint,
|
||||
startpointTLI)));
|
||||
(errmsg("restarted WAL streaming at %X/%X on timeline %u",
|
||||
(uint32) (startpoint >> 32), (uint32) startpoint,
|
||||
startpointTLI)));
|
||||
first_stream = false;
|
||||
|
||||
/* Initialize LogstreamResult and buffers for processing messages */
|
||||
@ -387,7 +388,8 @@ WalReceiverMain(void)
|
||||
|
||||
/*
|
||||
* Emergency bailout if postmaster has died. This is to avoid
|
||||
* the necessity for manual cleanup of all postmaster children.
|
||||
* the necessity for manual cleanup of all postmaster
|
||||
* children.
|
||||
*/
|
||||
if (!PostmasterIsAlive())
|
||||
exit(1);
|
||||
@ -422,7 +424,10 @@ WalReceiverMain(void)
|
||||
{
|
||||
if (len > 0)
|
||||
{
|
||||
/* Something was received from master, so reset timeout */
|
||||
/*
|
||||
* Something was received from master, so reset
|
||||
* timeout
|
||||
*/
|
||||
last_recv_timestamp = GetCurrentTimestamp();
|
||||
ping_sent = false;
|
||||
XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
|
||||
@ -457,12 +462,13 @@ WalReceiverMain(void)
|
||||
/*
|
||||
* We didn't receive anything new. If we haven't heard
|
||||
* anything from the server for more than
|
||||
* wal_receiver_timeout / 2, ping the server. Also, if it's
|
||||
* been longer than wal_receiver_status_interval since the
|
||||
* last update we sent, send a status update to the master
|
||||
* anyway, to report any progress in applying WAL.
|
||||
* wal_receiver_timeout / 2, ping the server. Also, if
|
||||
* it's been longer than wal_receiver_status_interval
|
||||
* since the last update we sent, send a status update to
|
||||
* the master anyway, to report any progress in applying
|
||||
* WAL.
|
||||
*/
|
||||
bool requestReply = false;
|
||||
bool requestReply = false;
|
||||
|
||||
/*
|
||||
* Check if time since last receive from standby has
|
||||
@ -482,13 +488,13 @@ WalReceiverMain(void)
|
||||
(errmsg("terminating walreceiver due to timeout")));
|
||||
|
||||
/*
|
||||
* We didn't receive anything new, for half of receiver
|
||||
* replication timeout. Ping the server.
|
||||
* We didn't receive anything new, for half of
|
||||
* receiver replication timeout. Ping the server.
|
||||
*/
|
||||
if (!ping_sent)
|
||||
{
|
||||
timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
|
||||
(wal_receiver_timeout/2));
|
||||
(wal_receiver_timeout / 2));
|
||||
if (now >= timeout)
|
||||
{
|
||||
requestReply = true;
|
||||
@ -511,9 +517,9 @@ WalReceiverMain(void)
|
||||
DisableWalRcvImmediateExit();
|
||||
|
||||
/*
|
||||
* If the server had switched to a new timeline that we didn't know
|
||||
* about when we began streaming, fetch its timeline history file
|
||||
* now.
|
||||
* If the server had switched to a new timeline that we didn't
|
||||
* know about when we began streaming, fetch its timeline history
|
||||
* file now.
|
||||
*/
|
||||
WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
|
||||
}
|
||||
@ -614,8 +620,8 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
|
||||
if (walrcv->walRcvState == WALRCV_STOPPING)
|
||||
{
|
||||
/*
|
||||
* We should've received SIGTERM if the startup process wants
|
||||
* us to die, but might as well check it here too.
|
||||
* We should've received SIGTERM if the startup process wants us
|
||||
* to die, but might as well check it here too.
|
||||
*/
|
||||
SpinLockRelease(&walrcv->mutex);
|
||||
exit(1);
|
||||
@ -643,7 +649,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
|
||||
static void
|
||||
WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
|
||||
{
|
||||
TimeLineID tli;
|
||||
TimeLineID tli;
|
||||
|
||||
for (tli = first; tli <= last; tli++)
|
||||
{
|
||||
@ -664,8 +670,9 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
|
||||
DisableWalRcvImmediateExit();
|
||||
|
||||
/*
|
||||
* Check that the filename on the master matches what we calculated
|
||||
* ourselves. This is just a sanity check, it should always match.
|
||||
* Check that the filename on the master matches what we
|
||||
* calculated ourselves. This is just a sanity check, it should
|
||||
* always match.
|
||||
*/
|
||||
TLHistoryFileName(expectedfname, tli);
|
||||
if (strcmp(fname, expectedfname) != 0)
|
||||
@ -791,7 +798,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
|
||||
int hdrlen;
|
||||
XLogRecPtr dataStart;
|
||||
XLogRecPtr walEnd;
|
||||
TimestampTz sendTime;
|
||||
TimestampTz sendTime;
|
||||
bool replyRequested;
|
||||
|
||||
resetStringInfo(&incoming_message);
|
||||
@ -812,7 +819,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
|
||||
dataStart = pq_getmsgint64(&incoming_message);
|
||||
walEnd = pq_getmsgint64(&incoming_message);
|
||||
sendTime = IntegerTimestampToTimestampTz(
|
||||
pq_getmsgint64(&incoming_message));
|
||||
pq_getmsgint64(&incoming_message));
|
||||
ProcessWalSndrMessage(walEnd, sendTime);
|
||||
|
||||
buf += hdrlen;
|
||||
@ -833,7 +840,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
|
||||
/* read the fields */
|
||||
walEnd = pq_getmsgint64(&incoming_message);
|
||||
sendTime = IntegerTimestampToTimestampTz(
|
||||
pq_getmsgint64(&incoming_message));
|
||||
pq_getmsgint64(&incoming_message));
|
||||
replyRequested = pq_getmsgbyte(&incoming_message);
|
||||
|
||||
ProcessWalSndrMessage(walEnd, sendTime);
|
||||
@ -890,8 +897,8 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
|
||||
XLogFileNameP(recvFileTLI, recvSegNo))));
|
||||
|
||||
/*
|
||||
* Create .done file forcibly to prevent the streamed segment from
|
||||
* being archived later.
|
||||
* Create .done file forcibly to prevent the streamed segment
|
||||
* from being archived later.
|
||||
*/
|
||||
XLogFileName(xlogfname, recvFileTLI, recvSegNo);
|
||||
XLogArchiveForceDone(xlogfname);
|
||||
@ -920,9 +927,9 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
|
||||
if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
|
||||
ereport(PANIC,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not seek in log segment %s, to offset %u: %m",
|
||||
XLogFileNameP(recvFileTLI, recvSegNo),
|
||||
startoff)));
|
||||
errmsg("could not seek in log segment %s, to offset %u: %m",
|
||||
XLogFileNameP(recvFileTLI, recvSegNo),
|
||||
startoff)));
|
||||
recvOff = startoff;
|
||||
}
|
||||
|
||||
@ -1110,7 +1117,7 @@ XLogWalRcvSendHSFeedback(bool immed)
|
||||
* Send feedback at most once per wal_receiver_status_interval.
|
||||
*/
|
||||
if (!TimestampDifferenceExceeds(sendTime, now,
|
||||
wal_receiver_status_interval * 1000))
|
||||
wal_receiver_status_interval * 1000))
|
||||
return;
|
||||
sendTime = now;
|
||||
}
|
||||
|
@ -94,12 +94,13 @@ bool am_cascading_walsender = false; /* Am I cascading WAL to
|
||||
|
||||
/* User-settable parameters for walsender */
|
||||
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
|
||||
int wal_sender_timeout = 60 * 1000; /* maximum time to send one
|
||||
int wal_sender_timeout = 60 * 1000; /* maximum time to send one
|
||||
* WAL data message */
|
||||
|
||||
/*
|
||||
* State for WalSndWakeupRequest
|
||||
*/
|
||||
bool wake_wal_senders = false;
|
||||
bool wake_wal_senders = false;
|
||||
|
||||
/*
|
||||
* These variables are used similarly to openLogFile/Id/Seg/Off,
|
||||
@ -110,7 +111,7 @@ static XLogSegNo sendSegNo = 0;
|
||||
static uint32 sendOff = 0;
|
||||
|
||||
/* Timeline ID of the currently open file */
|
||||
static TimeLineID curFileTimeLine = 0;
|
||||
static TimeLineID curFileTimeLine = 0;
|
||||
|
||||
/*
|
||||
* These variables keep track of the state of the timeline we're currently
|
||||
@ -118,10 +119,10 @@ static TimeLineID curFileTimeLine = 0;
|
||||
* the timeline is not the latest timeline on this server, and the server's
|
||||
* history forked off from that timeline at sendTimeLineValidUpto.
|
||||
*/
|
||||
static TimeLineID sendTimeLine = 0;
|
||||
static TimeLineID sendTimeLineNextTLI = 0;
|
||||
static bool sendTimeLineIsHistoric = false;
|
||||
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
|
||||
static TimeLineID sendTimeLine = 0;
|
||||
static TimeLineID sendTimeLineNextTLI = 0;
|
||||
static bool sendTimeLineIsHistoric = false;
|
||||
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
|
||||
|
||||
/*
|
||||
* How far have we sent WAL already? This is also advertised in
|
||||
@ -138,8 +139,9 @@ static StringInfoData tmpbuf;
|
||||
* Timestamp of the last receipt of the reply from the standby.
|
||||
*/
|
||||
static TimestampTz last_reply_timestamp;
|
||||
|
||||
/* Have we sent a heartbeat message asking for reply, since last reply? */
|
||||
static bool ping_sent = false;
|
||||
static bool ping_sent = false;
|
||||
|
||||
/*
|
||||
* While streaming WAL in Copy mode, streamingDoneSending is set to true
|
||||
@ -147,8 +149,8 @@ static bool ping_sent = false;
|
||||
* after that. streamingDoneReceiving is set to true when we receive CopyDone
|
||||
* from the other end. When both become true, it's time to exit Copy mode.
|
||||
*/
|
||||
static bool streamingDoneSending;
|
||||
static bool streamingDoneReceiving;
|
||||
static bool streamingDoneSending;
|
||||
static bool streamingDoneReceiving;
|
||||
|
||||
/* Flags set by signal handlers for later service in main loop */
|
||||
static volatile sig_atomic_t got_SIGHUP = false;
|
||||
@ -322,8 +324,8 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd)
|
||||
off_t bytesleft;
|
||||
|
||||
/*
|
||||
* Reply with a result set with one row, and two columns. The first col
|
||||
* is the name of the history file, 2nd is the contents.
|
||||
* Reply with a result set with one row, and two columns. The first col is
|
||||
* the name of the history file, 2nd is the contents.
|
||||
*/
|
||||
|
||||
TLHistoryFileName(histfname, cmd->timeline);
|
||||
@ -343,7 +345,7 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd)
|
||||
pq_sendint(&buf, 0, 2); /* format code */
|
||||
|
||||
/* second field */
|
||||
pq_sendstring(&buf, "content"); /* col name */
|
||||
pq_sendstring(&buf, "content"); /* col name */
|
||||
pq_sendint(&buf, 0, 4); /* table oid */
|
||||
pq_sendint(&buf, 0, 2); /* attnum */
|
||||
pq_sendint(&buf, BYTEAOID, 4); /* type oid */
|
||||
@ -355,7 +357,7 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd)
|
||||
/* Send a DataRow message */
|
||||
pq_beginmessage(&buf, 'D');
|
||||
pq_sendint(&buf, 2, 2); /* # of columns */
|
||||
pq_sendint(&buf, strlen(histfname), 4); /* col1 len */
|
||||
pq_sendint(&buf, strlen(histfname), 4); /* col1 len */
|
||||
pq_sendbytes(&buf, histfname, strlen(histfname));
|
||||
|
||||
fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
|
||||
@ -373,15 +375,15 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd)
|
||||
if (lseek(fd, 0, SEEK_SET) != 0)
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not seek to beginning of file \"%s\": %m", path)));
|
||||
errmsg("could not seek to beginning of file \"%s\": %m", path)));
|
||||
|
||||
pq_sendint(&buf, histfilelen, 4); /* col2 len */
|
||||
|
||||
bytesleft = histfilelen;
|
||||
while (bytesleft > 0)
|
||||
{
|
||||
char rbuf[BLCKSZ];
|
||||
int nread;
|
||||
char rbuf[BLCKSZ];
|
||||
int nread;
|
||||
|
||||
nread = read(fd, rbuf, sizeof(rbuf));
|
||||
if (nread <= 0)
|
||||
@ -407,7 +409,7 @@ static void
|
||||
StartReplication(StartReplicationCmd *cmd)
|
||||
{
|
||||
StringInfoData buf;
|
||||
XLogRecPtr FlushPtr;
|
||||
XLogRecPtr FlushPtr;
|
||||
|
||||
/*
|
||||
* We assume here that we're logging enough information in the WAL for
|
||||
@ -420,8 +422,8 @@ StartReplication(StartReplicationCmd *cmd)
|
||||
|
||||
/*
|
||||
* Select the timeline. If it was given explicitly by the client, use
|
||||
* that. Otherwise use the timeline of the last replayed record, which
|
||||
* is kept in ThisTimeLineID.
|
||||
* that. Otherwise use the timeline of the last replayed record, which is
|
||||
* kept in ThisTimeLineID.
|
||||
*/
|
||||
if (am_cascading_walsender)
|
||||
{
|
||||
@ -448,8 +450,8 @@ StartReplication(StartReplicationCmd *cmd)
|
||||
sendTimeLineIsHistoric = true;
|
||||
|
||||
/*
|
||||
* Check that the timeline the client requested for exists, and the
|
||||
* requested start location is on that timeline.
|
||||
* Check that the timeline the client requested for exists, and
|
||||
* the requested start location is on that timeline.
|
||||
*/
|
||||
timeLineHistory = readTimeLineHistory(ThisTimeLineID);
|
||||
switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
|
||||
@ -461,14 +463,14 @@ StartReplication(StartReplicationCmd *cmd)
|
||||
* requested startpoint is on that timeline in our history.
|
||||
*
|
||||
* This is quite loose on purpose. We only check that we didn't
|
||||
* fork off the requested timeline before the switchpoint. We don't
|
||||
* check that we switched *to* it before the requested starting
|
||||
* point. This is because the client can legitimately request to
|
||||
* start replication from the beginning of the WAL segment that
|
||||
* contains switchpoint, but on the new timeline, so that it
|
||||
* doesn't end up with a partial segment. If you ask for a too old
|
||||
* starting point, you'll get an error later when we fail to find
|
||||
* the requested WAL segment in pg_xlog.
|
||||
* fork off the requested timeline before the switchpoint. We
|
||||
* don't check that we switched *to* it before the requested
|
||||
* starting point. This is because the client can legitimately
|
||||
* request to start replication from the beginning of the WAL
|
||||
* segment that contains switchpoint, but on the new timeline, so
|
||||
* that it doesn't end up with a partial segment. If you ask for a
|
||||
* too old starting point, you'll get an error later when we fail
|
||||
* to find the requested WAL segment in pg_xlog.
|
||||
*
|
||||
* XXX: we could be more strict here and only allow a startpoint
|
||||
* that's older than the switchpoint, if it it's still in the same
|
||||
@ -503,12 +505,13 @@ StartReplication(StartReplicationCmd *cmd)
|
||||
if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
|
||||
{
|
||||
/*
|
||||
* When we first start replication the standby will be behind the primary.
|
||||
* For some applications, for example, synchronous replication, it is
|
||||
* important to have a clear state for this initial catchup mode, so we
|
||||
* can trigger actions when we change streaming state later. We may stay
|
||||
* in this state for a long time, which is exactly why we want to be able
|
||||
* to monitor whether or not we are still here.
|
||||
* When we first start replication the standby will be behind the
|
||||
* primary. For some applications, for example, synchronous
|
||||
* replication, it is important to have a clear state for this initial
|
||||
* catchup mode, so we can trigger actions when we change streaming
|
||||
* state later. We may stay in this state for a long time, which is
|
||||
* exactly why we want to be able to monitor whether or not we are
|
||||
* still here.
|
||||
*/
|
||||
WalSndSetState(WALSNDSTATE_CATCHUP);
|
||||
|
||||
@ -568,20 +571,21 @@ StartReplication(StartReplicationCmd *cmd)
|
||||
if (sendTimeLineIsHistoric)
|
||||
{
|
||||
char tli_str[11];
|
||||
char startpos_str[8+1+8+1];
|
||||
char startpos_str[8 + 1 + 8 + 1];
|
||||
|
||||
snprintf(tli_str, sizeof(tli_str), "%u", sendTimeLineNextTLI);
|
||||
snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
|
||||
(uint32) (sendTimeLineValidUpto >> 32),
|
||||
(uint32) sendTimeLineValidUpto);
|
||||
|
||||
pq_beginmessage(&buf, 'T'); /* RowDescription */
|
||||
pq_sendint(&buf, 2, 2); /* 2 fields */
|
||||
pq_beginmessage(&buf, 'T'); /* RowDescription */
|
||||
pq_sendint(&buf, 2, 2); /* 2 fields */
|
||||
|
||||
/* Field header */
|
||||
pq_sendstring(&buf, "next_tli");
|
||||
pq_sendint(&buf, 0, 4); /* table oid */
|
||||
pq_sendint(&buf, 0, 2); /* attnum */
|
||||
pq_sendint(&buf, 0, 4); /* table oid */
|
||||
pq_sendint(&buf, 0, 2); /* attnum */
|
||||
|
||||
/*
|
||||
* int8 may seem like a surprising data type for this, but in theory
|
||||
* int4 would not be wide enough for this, as TimeLineID is unsigned.
|
||||
@ -592,8 +596,8 @@ StartReplication(StartReplicationCmd *cmd)
|
||||
pq_sendint(&buf, 0, 2);
|
||||
|
||||
pq_sendstring(&buf, "next_tli_startpos");
|
||||
pq_sendint(&buf, 0, 4); /* table oid */
|
||||
pq_sendint(&buf, 0, 2); /* attnum */
|
||||
pq_sendint(&buf, 0, 4); /* table oid */
|
||||
pq_sendint(&buf, 0, 2); /* attnum */
|
||||
pq_sendint(&buf, TEXTOID, 4); /* type oid */
|
||||
pq_sendint(&buf, -1, 2);
|
||||
pq_sendint(&buf, 0, 4);
|
||||
@ -602,12 +606,12 @@ StartReplication(StartReplicationCmd *cmd)
|
||||
|
||||
/* Data row */
|
||||
pq_beginmessage(&buf, 'D');
|
||||
pq_sendint(&buf, 2, 2); /* number of columns */
|
||||
pq_sendint(&buf, 2, 2); /* number of columns */
|
||||
|
||||
pq_sendint(&buf, strlen(tli_str), 4); /* length */
|
||||
pq_sendbytes(&buf, tli_str, strlen(tli_str));
|
||||
|
||||
pq_sendint(&buf, strlen(startpos_str), 4); /* length */
|
||||
pq_sendint(&buf, strlen(startpos_str), 4); /* length */
|
||||
pq_sendbytes(&buf, startpos_str, strlen(startpos_str));
|
||||
|
||||
pq_endmessage(&buf);
|
||||
@ -840,7 +844,7 @@ ProcessStandbyReplyMessage(void)
|
||||
writePtr = pq_getmsgint64(&reply_message);
|
||||
flushPtr = pq_getmsgint64(&reply_message);
|
||||
applyPtr = pq_getmsgint64(&reply_message);
|
||||
(void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
|
||||
(void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
|
||||
replyRequested = pq_getmsgbyte(&reply_message);
|
||||
|
||||
elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
|
||||
@ -887,7 +891,7 @@ ProcessStandbyHSFeedbackMessage(void)
|
||||
* Decipher the reply message. The caller already consumed the msgtype
|
||||
* byte.
|
||||
*/
|
||||
(void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
|
||||
(void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
|
||||
feedbackXmin = pq_getmsgint(&reply_message, 4);
|
||||
feedbackEpoch = pq_getmsgint(&reply_message, 4);
|
||||
|
||||
@ -932,11 +936,11 @@ ProcessStandbyHSFeedbackMessage(void)
|
||||
* cleanup conflicts on the standby server.
|
||||
*
|
||||
* There is a small window for a race condition here: although we just
|
||||
* checked that feedbackXmin precedes nextXid, the nextXid could have gotten
|
||||
* advanced between our fetching it and applying the xmin below, perhaps
|
||||
* far enough to make feedbackXmin wrap around. In that case the xmin we
|
||||
* set here would be "in the future" and have no effect. No point in
|
||||
* worrying about this since it's too late to save the desired data
|
||||
* checked that feedbackXmin precedes nextXid, the nextXid could have
|
||||
* gotten advanced between our fetching it and applying the xmin below,
|
||||
* perhaps far enough to make feedbackXmin wrap around. In that case the
|
||||
* xmin we set here would be "in the future" and have no effect. No point
|
||||
* in worrying about this since it's too late to save the desired data
|
||||
* anyway. Assuming that the standby sends us an increasing sequence of
|
||||
* xmins, this could only happen during the first reply cycle, else our
|
||||
* own xmin would prevent nextXid from advancing so far.
|
||||
@ -969,8 +973,8 @@ WalSndLoop(void)
|
||||
ping_sent = false;
|
||||
|
||||
/*
|
||||
* Loop until we reach the end of this timeline or the client requests
|
||||
* to stop streaming.
|
||||
* Loop until we reach the end of this timeline or the client requests to
|
||||
* stop streaming.
|
||||
*/
|
||||
for (;;)
|
||||
{
|
||||
@ -1082,8 +1086,8 @@ WalSndLoop(void)
|
||||
{
|
||||
/*
|
||||
* If half of wal_sender_timeout has lapsed without receiving
|
||||
* any reply from standby, send a keep-alive message to standby
|
||||
* requesting an immediate reply.
|
||||
* any reply from standby, send a keep-alive message to
|
||||
* standby requesting an immediate reply.
|
||||
*/
|
||||
timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
|
||||
wal_sender_timeout / 2);
|
||||
@ -1133,6 +1137,7 @@ WalSndLoop(void)
|
||||
return;
|
||||
|
||||
send_failure:
|
||||
|
||||
/*
|
||||
* Get here on send failure. Clean up and exit.
|
||||
*
|
||||
@ -1290,7 +1295,7 @@ retry:
|
||||
curFileTimeLine = sendTimeLine;
|
||||
if (sendTimeLineIsHistoric)
|
||||
{
|
||||
XLogSegNo endSegNo;
|
||||
XLogSegNo endSegNo;
|
||||
|
||||
XLByteToSeg(sendTimeLineValidUpto, endSegNo);
|
||||
if (sendSegNo == endSegNo)
|
||||
@ -1311,7 +1316,7 @@ retry:
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("requested WAL segment %s has already been removed",
|
||||
XLogFileNameP(curFileTimeLine, sendSegNo))));
|
||||
XLogFileNameP(curFileTimeLine, sendSegNo))));
|
||||
else
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
@ -1327,9 +1332,9 @@ retry:
|
||||
if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not seek in log segment %s to offset %u: %m",
|
||||
XLogFileNameP(curFileTimeLine, sendSegNo),
|
||||
startoff)));
|
||||
errmsg("could not seek in log segment %s to offset %u: %m",
|
||||
XLogFileNameP(curFileTimeLine, sendSegNo),
|
||||
startoff)));
|
||||
sendOff = startoff;
|
||||
}
|
||||
|
||||
@ -1344,9 +1349,9 @@ retry:
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not read from log segment %s, offset %u, length %lu: %m",
|
||||
XLogFileNameP(curFileTimeLine, sendSegNo),
|
||||
sendOff, (unsigned long) segbytes)));
|
||||
errmsg("could not read from log segment %s, offset %u, length %lu: %m",
|
||||
XLogFileNameP(curFileTimeLine, sendSegNo),
|
||||
sendOff, (unsigned long) segbytes)));
|
||||
}
|
||||
|
||||
/* Update state for read */
|
||||
@ -1431,16 +1436,16 @@ XLogSend(bool *caughtup)
|
||||
/*
|
||||
* Streaming the latest timeline on a standby.
|
||||
*
|
||||
* Attempt to send all WAL that has already been replayed, so that
|
||||
* we know it's valid. If we're receiving WAL through streaming
|
||||
* Attempt to send all WAL that has already been replayed, so that we
|
||||
* know it's valid. If we're receiving WAL through streaming
|
||||
* replication, it's also OK to send any WAL that has been received
|
||||
* but not replayed.
|
||||
*
|
||||
* The timeline we're recovering from can change, or we can be
|
||||
* promoted. In either case, the current timeline becomes historic.
|
||||
* We need to detect that so that we don't try to stream past the
|
||||
* point where we switched to another timeline. We check for promotion
|
||||
* or timeline switch after calculating FlushPtr, to avoid a race
|
||||
* promoted. In either case, the current timeline becomes historic. We
|
||||
* need to detect that so that we don't try to stream past the point
|
||||
* where we switched to another timeline. We check for promotion or
|
||||
* timeline switch after calculating FlushPtr, to avoid a race
|
||||
* condition: if the timeline becomes historic just after we checked
|
||||
* that it was still current, it's still be OK to stream it up to the
|
||||
* FlushPtr that was calculated before it became historic.
|
||||
@ -1496,7 +1501,7 @@ XLogSend(bool *caughtup)
|
||||
*
|
||||
* Attempt to send all data that's already been written out and
|
||||
* fsync'd to disk. We cannot go further than what's been written out
|
||||
* given the current implementation of XLogRead(). And in any case
|
||||
* given the current implementation of XLogRead(). And in any case
|
||||
* it's unsafe to send WAL that is not securely down to disk on the
|
||||
* master: if the master subsequently crashes and restarts, slaves
|
||||
* must not have applied any WAL that gets lost on the master.
|
||||
@ -1509,13 +1514,14 @@ XLogSend(bool *caughtup)
|
||||
* forked to the next timeline, stop streaming.
|
||||
*
|
||||
* Note: We might already have sent WAL > sendTimeLineValidUpto. The
|
||||
* startup process will normally replay all WAL that has been received from
|
||||
* the master, before promoting, but if the WAL streaming is terminated at
|
||||
* a WAL page boundary, the valid portion of the timeline might end in the
|
||||
* middle of a WAL record. We might've already sent the first half of that
|
||||
* partial WAL record to the cascading standby, so that sentPtr >
|
||||
* sendTimeLineValidUpto. That's OK; the cascading standby can't replay the
|
||||
* partial WAL record either, so it can still follow our timeline switch.
|
||||
* startup process will normally replay all WAL that has been received
|
||||
* from the master, before promoting, but if the WAL streaming is
|
||||
* terminated at a WAL page boundary, the valid portion of the timeline
|
||||
* might end in the middle of a WAL record. We might've already sent the
|
||||
* first half of that partial WAL record to the cascading standby, so that
|
||||
* sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
|
||||
* replay the partial WAL record either, so it can still follow our
|
||||
* timeline switch.
|
||||
*/
|
||||
if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
|
||||
{
|
||||
@ -1585,8 +1591,8 @@ XLogSend(bool *caughtup)
|
||||
pq_sendbyte(&output_message, 'w');
|
||||
|
||||
pq_sendint64(&output_message, startptr); /* dataStart */
|
||||
pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
|
||||
pq_sendint64(&output_message, 0); /* sendtime, filled in last */
|
||||
pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
|
||||
pq_sendint64(&output_message, 0); /* sendtime, filled in last */
|
||||
|
||||
/*
|
||||
* Read the log directly into the output buffer to avoid extra memcpy
|
||||
@ -1643,16 +1649,16 @@ XLogSend(bool *caughtup)
|
||||
static XLogRecPtr
|
||||
GetStandbyFlushRecPtr(void)
|
||||
{
|
||||
XLogRecPtr replayPtr;
|
||||
TimeLineID replayTLI;
|
||||
XLogRecPtr receivePtr;
|
||||
TimeLineID receiveTLI;
|
||||
XLogRecPtr replayPtr;
|
||||
TimeLineID replayTLI;
|
||||
XLogRecPtr receivePtr;
|
||||
TimeLineID receiveTLI;
|
||||
XLogRecPtr result;
|
||||
|
||||
/*
|
||||
* We can safely send what's already been replayed. Also, if walreceiver
|
||||
* is streaming WAL from the same timeline, we can send anything that
|
||||
* it has streamed, but hasn't been replayed yet.
|
||||
* is streaming WAL from the same timeline, we can send anything that it
|
||||
* has streamed, but hasn't been replayed yet.
|
||||
*/
|
||||
|
||||
receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
|
||||
@ -1742,8 +1748,8 @@ WalSndSignals(void)
|
||||
pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config
|
||||
* file */
|
||||
pqsignal(SIGINT, SIG_IGN); /* not used */
|
||||
pqsignal(SIGTERM, die); /* request shutdown */
|
||||
pqsignal(SIGQUIT, quickdie); /* hard crash time */
|
||||
pqsignal(SIGTERM, die); /* request shutdown */
|
||||
pqsignal(SIGQUIT, quickdie); /* hard crash time */
|
||||
InitializeTimeouts(); /* establishes SIGALRM handler */
|
||||
pqsignal(SIGPIPE, SIG_IGN);
|
||||
pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
|
||||
|
Reference in New Issue
Block a user