diff --git a/src/bin/pg_rewind/libpq_source.c b/src/bin/pg_rewind/libpq_source.c index 997d4e2b482..011c9cce6eb 100644 --- a/src/bin/pg_rewind/libpq_source.c +++ b/src/bin/pg_rewind/libpq_source.c @@ -63,6 +63,7 @@ static void process_queued_fetch_requests(libpq_source *src); /* public interface functions */ static void libpq_traverse_files(rewind_source *source, process_file_callback_t callback); +static void libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len); static void libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off, size_t len); static void libpq_finish_fetch(rewind_source *source); @@ -88,6 +89,7 @@ init_libpq_source(PGconn *conn) src->common.traverse_files = libpq_traverse_files; src->common.fetch_file = libpq_fetch_file; + src->common.queue_fetch_file = libpq_queue_fetch_file; src->common.queue_fetch_range = libpq_queue_fetch_range; src->common.finish_fetch = libpq_finish_fetch; src->common.get_current_wal_insert_lsn = libpq_get_current_wal_insert_lsn; @@ -307,6 +309,36 @@ libpq_traverse_files(rewind_source *source, process_file_callback_t callback) PQclear(res); } +/* + * Queue up a request to fetch a file from remote system. + */ +static void +libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len) +{ + /* + * Truncate the target file immediately, and queue a request to fetch it + * from the source. If the file is small, smaller than MAX_CHUNK_SIZE, + * request fetching a full-sized chunk anyway, so that if the file has + * become larger in the source system, after we scanned the source + * directory, we still fetch the whole file. This only works for files up + * to MAX_CHUNK_SIZE, but that's good enough for small configuration files + * and such that are changed every now and then, but not WAL-logged. For + * larger files, we fetch up to the original size. 
+ * + * Even with that mechanism, there is an inherent race condition if the + * file is modified at the same instant that we're copying it, so that we + * might copy a torn version of the file with one half from the old + * version and another half from the new. But pg_basebackup has the same + * problem, and it hasn't been a problem in practice. + * + * It might seem more natural to truncate the file later, when we receive + * it from the source server, but then we'd need to track which + * fetch-requests are for a whole file. + */ + open_target_file(path, true); + libpq_queue_fetch_range(source, path, 0, Max(len, MAX_CHUNK_SIZE)); +} + /* * Queue up a request to fetch a piece of a file from remote system. */ diff --git a/src/bin/pg_rewind/local_source.c b/src/bin/pg_rewind/local_source.c index 3406fc7037d..58699effcc4 100644 --- a/src/bin/pg_rewind/local_source.c +++ b/src/bin/pg_rewind/local_source.c @@ -29,8 +29,10 @@ static void local_traverse_files(rewind_source *source, process_file_callback_t callback); static char *local_fetch_file(rewind_source *source, const char *path, size_t *filesize); -static void local_fetch_file_range(rewind_source *source, const char *path, - off_t off, size_t len); +static void local_queue_fetch_file(rewind_source *source, const char *path, + size_t len); +static void local_queue_fetch_range(rewind_source *source, const char *path, + off_t off, size_t len); static void local_finish_fetch(rewind_source *source); static void local_destroy(rewind_source *source); @@ -43,7 +45,8 @@ init_local_source(const char *datadir) src->common.traverse_files = local_traverse_files; src->common.fetch_file = local_fetch_file; - src->common.queue_fetch_range = local_fetch_file_range; + src->common.queue_fetch_file = local_queue_fetch_file; + src->common.queue_fetch_range = local_queue_fetch_range; src->common.finish_fetch = local_finish_fetch; src->common.get_current_wal_insert_lsn = NULL; src->common.destroy = local_destroy; @@ -65,12 +68,65 @@ 
local_fetch_file(rewind_source *source, const char *path, size_t *filesize) return slurpFile(((local_source *) source)->datadir, path, filesize); } +/* + * Copy a file from source to target. + * + * 'len' is the expected length of the file. + */ +static void +local_queue_fetch_file(rewind_source *source, const char *path, size_t len) +{ + const char *datadir = ((local_source *) source)->datadir; + PGAlignedBlock buf; + char srcpath[MAXPGPATH]; + int srcfd; + size_t written_len; + + snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path); + + /* Open source file for reading */ + srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0); + if (srcfd < 0) + pg_fatal("could not open source file \"%s\": %m", + srcpath); + + /* Truncate and open the target file for writing */ + open_target_file(path, true); + + written_len = 0; + for (;;) + { + ssize_t read_len; + + read_len = read(srcfd, buf.data, sizeof(buf)); + + if (read_len < 0) + pg_fatal("could not read file \"%s\": %m", srcpath); + else if (read_len == 0) + break; /* EOF reached */ + + write_target_range(buf.data, written_len, read_len); + written_len += read_len; + } + + /* + * A local source is not expected to change while we're rewinding, so + * check that the size of the file matches our earlier expectation. + */ + if (written_len != len) + pg_fatal("size of source file \"%s\" changed concurrently: " UINT64_FORMAT " bytes expected, " UINT64_FORMAT " copied", + srcpath, len, written_len); + + if (close(srcfd) != 0) + pg_fatal("could not close file \"%s\": %m", srcpath); +} + /* * Copy a file from source to target, starting at 'off', for 'len' bytes. 
*/ static void -local_fetch_file_range(rewind_source *source, const char *path, off_t off, - size_t len) +local_queue_fetch_range(rewind_source *source, const char *path, off_t off, + size_t len) { const char *datadir = ((local_source *) source)->datadir; PGAlignedBlock buf; @@ -94,14 +150,14 @@ local_fetch_file_range(rewind_source *source, const char *path, off_t off, while (end - begin > 0) { ssize_t readlen; - size_t len; + size_t thislen; if (end - begin > sizeof(buf)) - len = sizeof(buf); + thislen = sizeof(buf); else - len = end - begin; + thislen = end - begin; - readlen = read(srcfd, buf.data, len); + readlen = read(srcfd, buf.data, thislen); if (readlen < 0) pg_fatal("could not read file \"%s\": %m", srcpath); @@ -120,7 +176,7 @@ static void local_finish_fetch(rewind_source *source) { /* - * Nothing to do, local_fetch_file_range() copies the ranges immediately. + * Nothing to do, local_queue_fetch_range() copies the ranges immediately. */ } diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c index b39b5c1aacc..6cc44172fbf 100644 --- a/src/bin/pg_rewind/pg_rewind.c +++ b/src/bin/pg_rewind/pg_rewind.c @@ -537,10 +537,7 @@ perform_rewind(filemap_t *filemap, rewind_source *source, break; case FILE_ACTION_COPY: - /* Truncate the old file out of the way, if any */ - open_target_file(entry->path, true); - source->queue_fetch_range(source, entry->path, - 0, entry->source_size); + source->queue_fetch_file(source, entry->path, entry->source_size); break; case FILE_ACTION_TRUNCATE: diff --git a/src/bin/pg_rewind/rewind_source.h b/src/bin/pg_rewind/rewind_source.h index 78a12eb0f33..1310e86e75a 100644 --- a/src/bin/pg_rewind/rewind_source.h +++ b/src/bin/pg_rewind/rewind_source.h @@ -47,6 +47,19 @@ typedef struct rewind_source void (*queue_fetch_range) (struct rewind_source *, const char *path, off_t offset, size_t len); + /* + * Like queue_fetch_range(), but requests replacing the whole local file + * from the source system. 
'len' is the expected length of the file, + * although when the source is a live server, the file may change + * concurrently. The implementation is not obliged to copy more than 'len' + * bytes, even if the file is larger. However, to avoid copying a + * truncated version of the file, which can cause trouble if e.g. a + * configuration file is modified concurrently, the implementation should + * try to copy the whole file, even if it's larger than expected. + */ + void (*queue_fetch_file) (struct rewind_source *, const char *path, + size_t len); + /* + * Execute all requests queued up with queue_fetch_range(). + */ diff --git a/src/bin/pg_rewind/t/009_growing_files.pl b/src/bin/pg_rewind/t/009_growing_files.pl new file mode 100644 index 00000000000..752781e637c --- /dev/null +++ b/src/bin/pg_rewind/t/009_growing_files.pl @@ -0,0 +1,76 @@ + +# Copyright (c) 2021-2022, PostgreSQL Global Development Group + +use strict; +use warnings; +use PostgreSQL::Test::Utils; +use Test::More; + +use FindBin; +use lib $FindBin::RealBin; + +use RewindTest; + +RewindTest::setup_cluster("local"); +RewindTest::start_primary(); + +# Create a test table and insert a row in primary. +primary_psql("CREATE TABLE tbl1 (d text)"); +primary_psql("INSERT INTO tbl1 VALUES ('in primary')"); +primary_psql("CHECKPOINT"); + +RewindTest::create_standby("local"); + +# Insert additional data on primary that will be replicated to standby +primary_psql("INSERT INTO tbl1 values ('in primary, before promotion')"); +primary_psql('CHECKPOINT'); + +RewindTest::promote_standby(); + +# Insert a row in the old primary. This causes the primary and standby to have +# "diverged", it's no longer possible to just apply the standby's logs over +# primary directory - you need to rewind. Also insert a new row in the +# standby, which won't be present in the old primary. 
+primary_psql("INSERT INTO tbl1 VALUES ('in primary, after promotion')"); +standby_psql("INSERT INTO tbl1 VALUES ('in standby, after promotion')"); + +# Stop the nodes before running pg_rewind +$node_standby->stop; +$node_primary->stop; + +my $primary_pgdata = $node_primary->data_dir; +my $standby_pgdata = $node_standby->data_dir; + +# Add an extra file that we can tamper with without interfering with the data +# directory data files. +mkdir "$standby_pgdata/tst_both_dir"; +append_to_file "$standby_pgdata/tst_both_dir/file1", 'a'; + +# Run pg_rewind and pipe the output from the run into the extra file we want +# to copy. This will ensure that the file is continuously growing during the +# copy operation and the result will be an error. +my $ret = run_log( + [ + 'pg_rewind', '--debug', + '--source-pgdata', $standby_pgdata, + '--target-pgdata', $primary_pgdata, + '--no-sync', + ], + '2>>', "$standby_pgdata/tst_both_dir/file1"); +ok(!$ret, 'Error out on copying growing file'); + +# Ensure that the files are of different size, the final error message should +# only be in one of them making them guaranteed to be different +my $primary_size = -s "$primary_pgdata/tst_both_dir/file1"; +my $standby_size = -s "$standby_pgdata/tst_both_dir/file1"; +isnt($standby_size, $primary_size, "File sizes should differ"); + +# Extract the last line from the verbose output as that should have the error +# message for the unexpected file size +my $last; +open my $f, '<', "$standby_pgdata/tst_both_dir/file1"; +$last = $_ while (<$f>); +close $f; +like($last, qr/fatal: size of source file/, "Check error message"); + +done_testing();