mirror of
https://github.com/postgres/postgres.git
synced 2025-07-15 19:21:59 +03:00
Prevent WAL files created by pg_basebackup -x/X from being archived again.
WAL (and timeline history) files created by pg_basebackup did not maintain the new base backup's archive status. That's currently not a problem if the new node is used as a standby - but if that node is promoted, all files that still exist can get archived again. With a high wal_keep_segments setting that can happen a significant time later - which is quite confusing. Change both the backend (for the -x/-X fetch case) and pg_basebackup itself (for -X stream) to always mark WAL/timeline files included in the base backup as .done. That's in line with walreceiver.c doing so. The verbosity of the pg_basebackup changes shows pretty clearly that it needs some refactoring, but that would result in changes that are not backpatchable. Backpatch to 9.1, where pg_basebackup was introduced. Discussion: 20141205002854.GE21964@awork2.anarazel.de
This commit is contained in:
@ -400,6 +400,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
|
|||||||
errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
|
errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* send the WAL file itself */
|
||||||
_tarWriteHeader(pathbuf, NULL, &statbuf);
|
_tarWriteHeader(pathbuf, NULL, &statbuf);
|
||||||
|
|
||||||
while ((cnt = fread(buf, 1, Min(sizeof(buf), XLogSegSize - len), fp)) > 0)
|
while ((cnt = fread(buf, 1, Min(sizeof(buf), XLogSegSize - len), fp)) > 0)
|
||||||
@ -424,7 +425,17 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* XLogSegSize is a multiple of 512, so no need for padding */
|
/* XLogSegSize is a multiple of 512, so no need for padding */
|
||||||
|
|
||||||
FreeFile(fp);
|
FreeFile(fp);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Mark file as archived, otherwise files can get archived again
|
||||||
|
* after promotion of a new node. This is in line with
|
||||||
|
* walreceiver.c always doing a XLogArchiveForceDone() after a
|
||||||
|
* complete segment.
|
||||||
|
*/
|
||||||
|
StatusFilePath(pathbuf, walFiles[i], ".done");
|
||||||
|
sendFileWithContent(pathbuf, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -447,6 +458,10 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
|
|||||||
errmsg("could not stat file \"%s\": %m", pathbuf)));
|
errmsg("could not stat file \"%s\": %m", pathbuf)));
|
||||||
|
|
||||||
sendFile(pathbuf, pathbuf, &statbuf, false);
|
sendFile(pathbuf, pathbuf, &statbuf, false);
|
||||||
|
|
||||||
|
/* unconditionally mark file as archived */
|
||||||
|
StatusFilePath(pathbuf, fname, ".done");
|
||||||
|
sendFileWithContent(pathbuf, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Send CopyDone message for the last tar file */
|
/* Send CopyDone message for the last tar file */
|
||||||
@ -881,6 +896,15 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces)
|
|||||||
_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
|
_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
|
||||||
}
|
}
|
||||||
size += 512; /* Size of the header just added */
|
size += 512; /* Size of the header just added */
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Also send archive_status directory (by hackishly reusing
|
||||||
|
* statbuf from above ...).
|
||||||
|
*/
|
||||||
|
if (!sizeonly)
|
||||||
|
_tarWriteHeader("./pg_xlog/archive_status", NULL, &statbuf);
|
||||||
|
size += 512; /* Size of the header just added */
|
||||||
|
|
||||||
continue; /* don't recurse into pg_xlog */
|
continue; /* don't recurse into pg_xlog */
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -259,7 +259,7 @@ LogStreamerMain(logstreamer_param *param)
|
|||||||
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
|
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
|
||||||
param->sysidentifier, param->xlogdir,
|
param->sysidentifier, param->xlogdir,
|
||||||
reached_end_position, standby_message_timeout,
|
reached_end_position, standby_message_timeout,
|
||||||
true))
|
true, true))
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Any errors will already have been reported in the function process,
|
* Any errors will already have been reported in the function process,
|
||||||
@ -281,6 +281,7 @@ static void
|
|||||||
StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
|
StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
|
||||||
{
|
{
|
||||||
logstreamer_param *param;
|
logstreamer_param *param;
|
||||||
|
char statusdir[MAXPGPATH];
|
||||||
|
|
||||||
param = xmalloc0(sizeof(logstreamer_param));
|
param = xmalloc0(sizeof(logstreamer_param));
|
||||||
param->timeline = timeline;
|
param->timeline = timeline;
|
||||||
@ -314,13 +315,23 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
|
|||||||
/* Error message already written in GetConnection() */
|
/* Error message already written in GetConnection() */
|
||||||
exit(1);
|
exit(1);
|
||||||
|
|
||||||
/*
|
|
||||||
* Always in plain format, so we can write to basedir/pg_xlog. But the
|
|
||||||
* directory entry in the tar file may arrive later, so make sure it's
|
|
||||||
* created before we start.
|
|
||||||
*/
|
|
||||||
snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
|
snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
|
||||||
verify_dir_is_empty_or_create(param->xlogdir);
|
|
||||||
|
/*
|
||||||
|
* Create pg_xlog/archive_status (and thus pg_xlog) so we can write to
|
||||||
|
* basedir/pg_xlog as the directory entry in the tar file may arrive
|
||||||
|
* later.
|
||||||
|
*/
|
||||||
|
snprintf(statusdir, sizeof(statusdir), "%s/pg_xlog/archive_status",
|
||||||
|
basedir);
|
||||||
|
|
||||||
|
if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
|
||||||
|
{
|
||||||
|
fprintf(stderr,
|
||||||
|
_("%s: could not create directory \"%s\": %s\n"),
|
||||||
|
progname, statusdir, strerror(errno));
|
||||||
|
disconnect_and_exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Start a child process and tell it to start streaming. On Unix, this is
|
* Start a child process and tell it to start streaming. On Unix, this is
|
||||||
@ -403,6 +414,23 @@ verify_dir_is_empty_or_create(char *dirname)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
 * Report whether `end' is a suffix of `str'.
 *
 * An empty `end' is a suffix of every string; an `end' that is longer than
 * `str' never is.
 */
static bool
pg_str_endswith(const char *str, const char *end)
{
	const size_t str_len = strlen(str);
	const size_t end_len = strlen(end);

	/* a suffix can only match when it fits inside the string */
	if (str_len >= end_len)
		return strcmp(str + (str_len - end_len), end) == 0;

	return false;
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Print a progress report based on the global variables. If verbose output
|
* Print a progress report based on the global variables. If verbose output
|
||||||
@ -835,10 +863,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
|
|||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* When streaming WAL, pg_xlog will have been created
|
* When streaming WAL, pg_xlog will have been created
|
||||||
* by the wal receiver process, so just ignore failure
|
* by the wal receiver process. So just ignore creation
|
||||||
* on that.
|
* failures on related directories.
|
||||||
*/
|
*/
|
||||||
if (!streamwal || strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0)
|
if (!((pg_str_endswith(filename, "/pg_xlog") ||
|
||||||
|
pg_str_endswith(filename, "/archive_status")) &&
|
||||||
|
errno == EEXIST))
|
||||||
{
|
{
|
||||||
fprintf(stderr,
|
fprintf(stderr,
|
||||||
_("%s: could not create directory \"%s\": %s\n"),
|
_("%s: could not create directory \"%s\": %s\n"),
|
||||||
|
@ -321,7 +321,7 @@ StreamLog(void)
|
|||||||
progname, startpos.xlogid, startpos.xrecoff, timeline);
|
progname, startpos.xlogid, startpos.xrecoff, timeline);
|
||||||
|
|
||||||
ReceiveXlogStream(conn, startpos, timeline, NULL, basedir,
|
ReceiveXlogStream(conn, startpos, timeline, NULL, basedir,
|
||||||
stop_streaming, standby_message_timeout, false);
|
stop_streaming, standby_message_timeout, false, true);
|
||||||
|
|
||||||
PQfinish(conn);
|
PQfinish(conn);
|
||||||
}
|
}
|
||||||
|
@ -44,6 +44,35 @@ const XLogRecPtr InvalidXLogRecPtr = {0, 0};
|
|||||||
static int walfile = -1;
|
static int walfile = -1;
|
||||||
|
|
||||||
|
|
||||||
|
static bool
|
||||||
|
mark_file_as_archived(const char *basedir, const char *fname)
|
||||||
|
{
|
||||||
|
int fd;
|
||||||
|
static char tmppath[MAXPGPATH];
|
||||||
|
|
||||||
|
snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done",
|
||||||
|
basedir, fname);
|
||||||
|
|
||||||
|
fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
|
||||||
|
if (fd < 0)
|
||||||
|
{
|
||||||
|
fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
|
||||||
|
progname, tmppath, strerror(errno));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fsync(fd) != 0)
|
||||||
|
{
|
||||||
|
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
|
||||||
|
progname, tmppath, strerror(errno));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
close(fd);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Open a new WAL file in the specified directory. Store the name
|
* Open a new WAL file in the specified directory. Store the name
|
||||||
* (not including the full directory) in namebuf. Assumes there is
|
* (not including the full directory) in namebuf. Assumes there is
|
||||||
@ -133,7 +162,8 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
|
|||||||
* completed writing the whole segment.
|
* completed writing the whole segment.
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
close_walfile(char *basedir, char *walname, bool segment_complete)
|
close_walfile(char *basedir, char *walname, bool segment_complete,
|
||||||
|
bool mark_done)
|
||||||
{
|
{
|
||||||
off_t currpos = lseek(walfile, 0, SEEK_CUR);
|
off_t currpos = lseek(walfile, 0, SEEK_CUR);
|
||||||
|
|
||||||
@ -184,6 +214,19 @@ close_walfile(char *basedir, char *walname, bool segment_complete)
|
|||||||
_("%s: not renaming \"%s\", segment is not complete\n"),
|
_("%s: not renaming \"%s\", segment is not complete\n"),
|
||||||
progname, walname);
|
progname, walname);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Mark file as archived if requested by the caller - pg_basebackup needs
|
||||||
|
* to do so as files can otherwise get archived again after promotion of a
|
||||||
|
* new node. This is in line with walreceiver.c always doing a
|
||||||
|
* XLogArchiveForceDone() after a complete segment.
|
||||||
|
*/
|
||||||
|
if (currpos == XLOG_SEG_SIZE && mark_done)
|
||||||
|
{
|
||||||
|
/* writes error message if failed */
|
||||||
|
if (!mark_file_as_archived(basedir, walname))
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -284,7 +327,8 @@ bool
|
|||||||
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
|
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
|
||||||
char *sysidentifier, char *basedir,
|
char *sysidentifier, char *basedir,
|
||||||
stream_stop_callback stream_stop,
|
stream_stop_callback stream_stop,
|
||||||
int standby_message_timeout, bool rename_partial)
|
int standby_message_timeout, bool rename_partial,
|
||||||
|
bool mark_done)
|
||||||
{
|
{
|
||||||
char query[128];
|
char query[128];
|
||||||
char current_walfile_name[MAXPGPATH];
|
char current_walfile_name[MAXPGPATH];
|
||||||
@ -343,7 +387,6 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Receive the actual xlog data
|
* Receive the actual xlog data
|
||||||
*/
|
*/
|
||||||
@ -367,7 +410,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
|
|||||||
if (stream_stop && stream_stop(blockpos, timeline, false))
|
if (stream_stop && stream_stop(blockpos, timeline, false))
|
||||||
{
|
{
|
||||||
if (walfile != -1 && !close_walfile(basedir, current_walfile_name,
|
if (walfile != -1 && !close_walfile(basedir, current_walfile_name,
|
||||||
rename_partial))
|
rename_partial, mark_done))
|
||||||
/* Potential error message is written by close_walfile */
|
/* Potential error message is written by close_walfile */
|
||||||
goto error;
|
goto error;
|
||||||
return true;
|
return true;
|
||||||
@ -579,7 +622,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
|
|||||||
/* Did we reach the end of a WAL segment? */
|
/* Did we reach the end of a WAL segment? */
|
||||||
if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
|
if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
|
||||||
{
|
{
|
||||||
if (!close_walfile(basedir, current_walfile_name, false))
|
if (!close_walfile(basedir, current_walfile_name, false,
|
||||||
|
mark_done))
|
||||||
/* Error message written in close_walfile() */
|
/* Error message written in close_walfile() */
|
||||||
goto error;
|
goto error;
|
||||||
|
|
||||||
|
@ -13,4 +13,5 @@ extern bool ReceiveXlogStream(PGconn *conn,
|
|||||||
char *basedir,
|
char *basedir,
|
||||||
stream_stop_callback stream_stop,
|
stream_stop_callback stream_stop,
|
||||||
int standby_message_timeout,
|
int standby_message_timeout,
|
||||||
bool rename_partial);
|
bool rename_partial,
|
||||||
|
bool mark_done);
|
||||||
|
Reference in New Issue
Block a user