mirror of
https://github.com/postgres/postgres.git
synced 2025-07-30 11:03:19 +03:00
pg_rewind: Fetch small files according to new size.
There's a race condition if a file changes in the source system after we have collected the file list. If the file becomes larger, we only fetched up to its original size. That can easily result in a truncated file. That's not a problem for relation files, files in pg_xact, etc. because any actions on them will be replayed from the WAL. However, configuration files are affected. This commit mitigates the race condition by fetching small files in whole, even if they have grown. A test is added in which an extra file copied is concurrently grown with the output of pg_rewind thus guaranteeing it to have changed in size during the operation. This is not a full fix: we still believe the original file size for files larger than 1 MB. That should be enough for configuration files, and doing more than that would require big changes to the chunking logic in libpq_source.c. This mitigates the race condition if the file is modified between the original scan of files and copying the file, but there's still a race condition if a file is changed while it's being copied. That's a much smaller window, though, and pg_basebackup has the same issue. This race can be seen with pg_auto_failover, which frequently uses ALTER SYSTEM, which updates postgresql.auto.conf. Often, pg_rewind will fail, because the postgresql.auto.conf file changed concurrently and a partial version of it was copied to the target. The partial file would fail to parse, preventing the server from starting up. Author: Heikki Linnakangas Reviewed-by: Cary Huang Discussion: https://postgr.es/m/f67feb24-5833-88cb-1020-19a4a2b83ac7%40iki.fi
This commit is contained in:
@ -29,8 +29,10 @@ static void local_traverse_files(rewind_source *source,
|
||||
process_file_callback_t callback);
|
||||
static char *local_fetch_file(rewind_source *source, const char *path,
|
||||
size_t *filesize);
|
||||
static void local_fetch_file_range(rewind_source *source, const char *path,
|
||||
off_t off, size_t len);
|
||||
static void local_queue_fetch_file(rewind_source *source, const char *path,
|
||||
size_t len);
|
||||
static void local_queue_fetch_range(rewind_source *source, const char *path,
|
||||
off_t off, size_t len);
|
||||
static void local_finish_fetch(rewind_source *source);
|
||||
static void local_destroy(rewind_source *source);
|
||||
|
||||
@ -43,7 +45,8 @@ init_local_source(const char *datadir)
|
||||
|
||||
src->common.traverse_files = local_traverse_files;
|
||||
src->common.fetch_file = local_fetch_file;
|
||||
src->common.queue_fetch_range = local_fetch_file_range;
|
||||
src->common.queue_fetch_file = local_queue_fetch_file;
|
||||
src->common.queue_fetch_range = local_queue_fetch_range;
|
||||
src->common.finish_fetch = local_finish_fetch;
|
||||
src->common.get_current_wal_insert_lsn = NULL;
|
||||
src->common.destroy = local_destroy;
|
||||
@ -65,12 +68,65 @@ local_fetch_file(rewind_source *source, const char *path, size_t *filesize)
|
||||
return slurpFile(((local_source *) source)->datadir, path, filesize);
|
||||
}
|
||||
|
||||
/*
|
||||
* Copy a file from source to target.
|
||||
*
|
||||
* 'len' is the expected length of the file.
|
||||
*/
|
||||
static void
|
||||
local_queue_fetch_file(rewind_source *source, const char *path, size_t len)
|
||||
{
|
||||
const char *datadir = ((local_source *) source)->datadir;
|
||||
PGAlignedBlock buf;
|
||||
char srcpath[MAXPGPATH];
|
||||
int srcfd;
|
||||
size_t written_len;
|
||||
|
||||
snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
|
||||
|
||||
/* Open source file for reading */
|
||||
srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
|
||||
if (srcfd < 0)
|
||||
pg_fatal("could not open source file \"%s\": %m",
|
||||
srcpath);
|
||||
|
||||
/* Truncate and open the target file for writing */
|
||||
open_target_file(path, true);
|
||||
|
||||
written_len = 0;
|
||||
for (;;)
|
||||
{
|
||||
ssize_t read_len;
|
||||
|
||||
read_len = read(srcfd, buf.data, sizeof(buf));
|
||||
|
||||
if (read_len < 0)
|
||||
pg_fatal("could not read file \"%s\": %m", srcpath);
|
||||
else if (read_len == 0)
|
||||
break; /* EOF reached */
|
||||
|
||||
write_target_range(buf.data, written_len, read_len);
|
||||
written_len += read_len;
|
||||
}
|
||||
|
||||
/*
|
||||
* A local source is not expected to change while we're rewinding, so
|
||||
* check that the size of the file matches our earlier expectation.
|
||||
*/
|
||||
if (written_len != len)
|
||||
pg_fatal("size of source file \"%s\" changed concurrently: " UINT64_FORMAT " bytes expected, " UINT64_FORMAT " copied",
|
||||
srcpath, len, written_len);
|
||||
|
||||
if (close(srcfd) != 0)
|
||||
pg_fatal("could not close file \"%s\": %m", srcpath);
|
||||
}
|
||||
|
||||
/*
|
||||
* Copy a file from source to target, starting at 'off', for 'len' bytes.
|
||||
*/
|
||||
static void
|
||||
local_fetch_file_range(rewind_source *source, const char *path, off_t off,
|
||||
size_t len)
|
||||
local_queue_fetch_range(rewind_source *source, const char *path, off_t off,
|
||||
size_t len)
|
||||
{
|
||||
const char *datadir = ((local_source *) source)->datadir;
|
||||
PGAlignedBlock buf;
|
||||
@ -94,14 +150,14 @@ local_fetch_file_range(rewind_source *source, const char *path, off_t off,
|
||||
while (end - begin > 0)
|
||||
{
|
||||
ssize_t readlen;
|
||||
size_t len;
|
||||
size_t thislen;
|
||||
|
||||
if (end - begin > sizeof(buf))
|
||||
len = sizeof(buf);
|
||||
thislen = sizeof(buf);
|
||||
else
|
||||
len = end - begin;
|
||||
thislen = end - begin;
|
||||
|
||||
readlen = read(srcfd, buf.data, len);
|
||||
readlen = read(srcfd, buf.data, thislen);
|
||||
|
||||
if (readlen < 0)
|
||||
pg_fatal("could not read file \"%s\": %m", srcpath);
|
||||
@ -120,7 +176,7 @@ static void
|
||||
local_finish_fetch(rewind_source *source)
|
||||
{
|
||||
/*
|
||||
* Nothing to do, local_fetch_file_range() copies the ranges immediately.
|
||||
* Nothing to do, local_queue_fetch_range() copies the ranges immediately.
|
||||
*/
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user