1
0
mirror of https://github.com/postgres/postgres.git synced 2025-05-18 17:41:14 +03:00

Prevent WAL files created by pg_basebackup -x/X from being archived again.

WAL (and timeline history) files created by pg_basebackup did not
maintain the new base backup's archive status. That's currently not a
problem if the new node is used as a standby - but if that node is
promoted all still existing files can get archived again.  With a high
wal_keep_segment settings that can happen a significant time later -
which is quite confusing.

Change both the backend (for the -x/-X fetch case) and pg_basebackup
(for -X stream) itself to always mark WAL/timeline files included in
the base backup as .done. That's in line with walreceiver.c doing so.

The verbosity of the pg_basebackup changes show pretty clearly that it
needs some refactoring, but that'd result in not be backpatchable
changes.

Backpatch to 9.1 where pg_basebackup was introduced.

Discussion: 20141205002854.GE21964@awork2.anarazel.de
This commit is contained in:
Andres Freund 2015-01-03 20:51:52 +01:00
parent bb2e2ce6e2
commit f6cea45029
5 changed files with 116 additions and 22 deletions

View File

@ -404,6 +404,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
}
/* send the WAL file itself */
_tarWriteHeader(pathbuf, NULL, &statbuf);
while ((cnt = fread(buf, 1, Min(sizeof(buf), XLogSegSize - len), fp)) > 0)
@ -428,7 +429,17 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
}
/* XLogSegSize is a multiple of 512, so no need for padding */
FreeFile(fp);
/*
* Mark file as archived, otherwise files can get archived again
* after promotion of a new node. This is in line with
* walreceiver.c always doing a XLogArchiveForceDone() after a
* complete segment.
*/
StatusFilePath(pathbuf, walFiles[i], ".done");
sendFileWithContent(pathbuf, "");
}
/*
@ -452,6 +463,10 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
errmsg("could not stat file \"%s\": %m", pathbuf)));
sendFile(pathbuf, pathbuf, &statbuf, false);
/* unconditionally mark file as archived */
StatusFilePath(pathbuf, fname, ".done");
sendFileWithContent(pathbuf, "");
}
/* Send CopyDone message for the last tar file */
@ -900,6 +915,15 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces)
_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
}
size += 512; /* Size of the header just added */
/*
* Also send archive_status directory (by hackishly reusing
* statbuf from above ...).
*/
if (!sizeonly)
_tarWriteHeader("./pg_xlog/archive_status", NULL, &statbuf);
size += 512; /* Size of the header just added */
continue; /* don't recurse into pg_xlog */
}

View File

@ -28,6 +28,7 @@
#include <zlib.h>
#endif
#include "common/string.h"
#include "getopt_long.h"
#include "receivelog.h"
@ -266,7 +267,7 @@ LogStreamerMain(logstreamer_param *param)
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout,
NULL))
NULL, true))
/*
* Any errors will already have been reported in the function process,
@ -290,6 +291,7 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
logstreamer_param *param;
uint32 hi,
lo;
char statusdir[MAXPGPATH];
param = pg_malloc0(sizeof(logstreamer_param));
param->timeline = timeline;
@ -324,13 +326,23 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
/* Error message already written in GetConnection() */
exit(1);
/*
* Always in plain format, so we can write to basedir/pg_xlog. But the
* directory entry in the tar file may arrive later, so make sure it's
* created before we start.
*/
snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
verify_dir_is_empty_or_create(param->xlogdir);
/*
* Create pg_xlog/archive_status (and thus pg_xlog) so we can can write to
* basedir/pg_xlog as the directory entry in the tar file may arrive
* later.
*/
snprintf(statusdir, sizeof(statusdir), "%s/pg_xlog/archive_status",
basedir);
if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
{
fprintf(stderr,
_("%s: could not create directory \"%s\": %s\n"),
progname, statusdir, strerror(errno));
disconnect_and_exit(1);
}
/*
* Start a child process and tell it to start streaming. On Unix, this is
@ -1003,10 +1015,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
{
/*
* When streaming WAL, pg_xlog will have been created
* by the wal receiver process, so just ignore failure
* on that.
* by the wal receiver process. So just ignore creation
* failures on related directories.
*/
if (!streamwal || strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0)
if (!((pg_str_endswith(filename, "/pg_xlog") ||
pg_str_endswith(filename, "/archive_status")) &&
errno == EEXIST))
{
fprintf(stderr,
_("%s: could not create directory \"%s\": %s\n"),

View File

@ -335,7 +335,8 @@ StreamLog(void)
starttli);
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
stop_streaming, standby_message_timeout, ".partial");
stop_streaming, standby_message_timeout, ".partial",
false);
PQfinish(conn);
}

View File

@ -35,11 +35,41 @@ static char current_walfile_name[MAXPGPATH] = "";
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
char *partial_suffix, XLogRecPtr *stoppos);
char *partial_suffix, XLogRecPtr *stoppos,
bool mark_done);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
static bool
mark_file_as_archived(const char *basedir, const char *fname)
{
int fd;
static char tmppath[MAXPGPATH];
snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done",
basedir, fname);
fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
if (fd < 0)
{
fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
progname, tmppath, strerror(errno));
return false;
}
if (fsync(fd) != 0)
{
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
progname, tmppath, strerror(errno));
return false;
}
close(fd);
return true;
}
/*
* Open a new WAL file in the specified directory.
*
@ -133,7 +163,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
* and returns false, otherwise returns true.
*/
static bool
close_walfile(char *basedir, char *partial_suffix)
close_walfile(char *basedir, char *partial_suffix, bool mark_done)
{
off_t currpos;
@ -187,6 +217,19 @@ close_walfile(char *basedir, char *partial_suffix)
_("%s: not renaming \"%s%s\", segment is not complete\n"),
progname, current_walfile_name, partial_suffix);
/*
* Mark file as archived if requested by the caller - pg_basebackup needs
* to do so as files can otherwise get archived again after promotion of a
* new node. This is in line with walreceiver.c always doing a
* XLogArchiveForceDone() after a complete segment.
*/
if (currpos == XLOG_SEG_SIZE && mark_done)
{
/* writes error message if failed */
if (!mark_file_as_archived(basedir, current_walfile_name))
return false;
}
return true;
}
@ -285,7 +328,8 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
}
static bool
writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *content)
writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
char *content, bool mark_done)
{
int size = strlen(content);
char path[MAXPGPATH];
@ -364,6 +408,14 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co
return false;
}
/* Maintain archive_status, check close_walfile() for details. */
if (mark_done)
{
/* writes error message if failed */
if (!mark_file_as_archived(basedir, histfname))
return false;
}
return true;
}
@ -508,7 +560,8 @@ bool
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix)
int standby_message_timeout, char *partial_suffix,
bool mark_done)
{
char query[128];
PGresult *res;
@ -593,7 +646,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Write the history file to disk */
writeTimeLineHistoryFile(basedir, timeline,
PQgetvalue(res, 0, 0),
PQgetvalue(res, 0, 1));
PQgetvalue(res, 0, 1),
mark_done);
PQclear(res);
}
@ -622,7 +676,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
&stoppos);
&stoppos, mark_done);
if (res == NULL)
goto error;
@ -787,7 +841,7 @@ static PGresult *
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
XLogRecPtr *stoppos)
XLogRecPtr *stoppos, bool mark_done)
{
char *copybuf = NULL;
int64 last_status = -1;
@ -814,7 +868,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
*/
if (still_sending && stream_stop(blockpos, timeline, false))
{
if (!close_walfile(basedir, partial_suffix))
if (!close_walfile(basedir, partial_suffix, mark_done))
{
/* Potential error message is written by close_walfile */
goto error;
@ -913,7 +967,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
*/
if (still_sending)
{
if (!close_walfile(basedir, partial_suffix))
if (!close_walfile(basedir, partial_suffix, mark_done))
{
/* Error message written in close_walfile() */
PQclear(res);
@ -1081,7 +1135,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Did we reach the end of a WAL segment? */
if (blockpos % XLOG_SEG_SIZE == 0)
{
if (!close_walfile(basedir, partial_suffix))
if (!close_walfile(basedir, partial_suffix, mark_done))
/* Error message written in close_walfile() */
goto error;

View File

@ -14,4 +14,5 @@ extern bool ReceiveXlogStream(PGconn *conn,
char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout,
char *partial_suffix);
char *partial_suffix,
bool mark_done);