mirror of
https://github.com/postgres/postgres.git
synced 2025-08-21 10:42:50 +03:00
Add support for incremental backup.
To take an incremental backup, you use the new replication command UPLOAD_MANIFEST to upload the manifest for the prior backup. This prior backup could either be a full backup or another incremental backup. You then use BASE_BACKUP with the INCREMENTAL option to take the backup. pg_basebackup now has an --incremental=PATH_TO_MANIFEST option to trigger this behavior. An incremental backup is like a regular full backup except that some relation files are replaced with files with names like INCREMENTAL.${ORIGINAL_NAME}, and the backup_label file contains additional lines identifying it as an incremental backup. The new pg_combinebackup tool can be used to reconstruct a data directory from a full backup and a series of incremental backups. Patch by me. Reviewed by Matthias van de Meent, Dilip Kumar, Jakub Wartak, Peter Eisentraut, and Álvaro Herrera. Thanks especially to Jakub for incredibly helpful and extensive testing. Discussion: http://postgr.es/m/CA+TgmoYOYZfMCyOXFyC-P+-mdrZqm5pP2N7S-r0z3_402h9rsA@mail.gmail.com
This commit is contained in:
@@ -20,8 +20,10 @@
|
||||
#include "access/xlogbackup.h"
|
||||
#include "backup/backup_manifest.h"
|
||||
#include "backup/basebackup.h"
|
||||
#include "backup/basebackup_incremental.h"
|
||||
#include "backup/basebackup_sink.h"
|
||||
#include "backup/basebackup_target.h"
|
||||
#include "catalog/pg_tablespace_d.h"
|
||||
#include "commands/defrem.h"
|
||||
#include "common/compression.h"
|
||||
#include "common/file_perm.h"
|
||||
@@ -33,6 +35,7 @@
|
||||
#include "pgtar.h"
|
||||
#include "port.h"
|
||||
#include "postmaster/syslogger.h"
|
||||
#include "postmaster/walsummarizer.h"
|
||||
#include "replication/walsender.h"
|
||||
#include "replication/walsender_private.h"
|
||||
#include "storage/bufpage.h"
|
||||
@@ -64,6 +67,7 @@ typedef struct
|
||||
bool fastcheckpoint;
|
||||
bool nowait;
|
||||
bool includewal;
|
||||
bool incremental;
|
||||
uint32 maxrate;
|
||||
bool sendtblspcmapfile;
|
||||
bool send_to_client;
|
||||
@@ -76,21 +80,28 @@ typedef struct
|
||||
} basebackup_options;
|
||||
|
||||
static int64 sendTablespace(bbsink *sink, char *path, Oid spcoid, bool sizeonly,
|
||||
struct backup_manifest_info *manifest);
|
||||
struct backup_manifest_info *manifest,
|
||||
IncrementalBackupInfo *ib);
|
||||
static int64 sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
|
||||
List *tablespaces, bool sendtblspclinks,
|
||||
backup_manifest_info *manifest, Oid spcoid);
|
||||
backup_manifest_info *manifest, Oid spcoid,
|
||||
IncrementalBackupInfo *ib);
|
||||
static bool sendFile(bbsink *sink, const char *readfilename, const char *tarfilename,
|
||||
struct stat *statbuf, bool missing_ok,
|
||||
Oid dboid, Oid spcoid, RelFileNumber relfilenumber,
|
||||
unsigned segno,
|
||||
backup_manifest_info *manifest);
|
||||
backup_manifest_info *manifest,
|
||||
unsigned num_incremental_blocks,
|
||||
BlockNumber *incremental_blocks,
|
||||
unsigned truncation_block_length);
|
||||
static off_t read_file_data_into_buffer(bbsink *sink,
|
||||
const char *readfilename, int fd,
|
||||
off_t offset, size_t length,
|
||||
BlockNumber blkno,
|
||||
bool verify_checksum,
|
||||
int *checksum_failures);
|
||||
static void push_to_sink(bbsink *sink, pg_checksum_context *checksum_ctx,
|
||||
size_t *bytes_done, void *data, size_t length);
|
||||
static bool verify_page_checksum(Page page, XLogRecPtr start_lsn,
|
||||
BlockNumber blkno,
|
||||
uint16 *expected_checksum);
|
||||
@@ -102,7 +113,8 @@ static int64 _tarWriteHeader(bbsink *sink, const char *filename,
|
||||
bool sizeonly);
|
||||
static void _tarWritePadding(bbsink *sink, int len);
|
||||
static void convert_link_to_directory(const char *pathbuf, struct stat *statbuf);
|
||||
static void perform_base_backup(basebackup_options *opt, bbsink *sink);
|
||||
static void perform_base_backup(basebackup_options *opt, bbsink *sink,
|
||||
IncrementalBackupInfo *ib);
|
||||
static void parse_basebackup_options(List *options, basebackup_options *opt);
|
||||
static int compareWalFileNames(const ListCell *a, const ListCell *b);
|
||||
static int basebackup_read_file(int fd, char *buf, size_t nbytes, off_t offset,
|
||||
@@ -220,7 +232,8 @@ static const struct exclude_list_item excludeFiles[] =
|
||||
* clobbered by longjmp" from stupider versions of gcc.
|
||||
*/
|
||||
static void
|
||||
perform_base_backup(basebackup_options *opt, bbsink *sink)
|
||||
perform_base_backup(basebackup_options *opt, bbsink *sink,
|
||||
IncrementalBackupInfo *ib)
|
||||
{
|
||||
bbsink_state state;
|
||||
XLogRecPtr endptr;
|
||||
@@ -270,6 +283,10 @@ perform_base_backup(basebackup_options *opt, bbsink *sink)
|
||||
ListCell *lc;
|
||||
tablespaceinfo *newti;
|
||||
|
||||
/* If this is an incremental backup, execute preparatory steps. */
|
||||
if (ib != NULL)
|
||||
PrepareForIncrementalBackup(ib, backup_state);
|
||||
|
||||
/* Add a node for the base directory at the end */
|
||||
newti = palloc0(sizeof(tablespaceinfo));
|
||||
newti->size = -1;
|
||||
@@ -289,10 +306,10 @@ perform_base_backup(basebackup_options *opt, bbsink *sink)
|
||||
|
||||
if (tmp->path == NULL)
|
||||
tmp->size = sendDir(sink, ".", 1, true, state.tablespaces,
|
||||
true, NULL, InvalidOid);
|
||||
true, NULL, InvalidOid, NULL);
|
||||
else
|
||||
tmp->size = sendTablespace(sink, tmp->path, tmp->oid, true,
|
||||
NULL);
|
||||
NULL, NULL);
|
||||
state.bytes_total += tmp->size;
|
||||
}
|
||||
state.bytes_total_is_valid = true;
|
||||
@@ -330,7 +347,7 @@ perform_base_backup(basebackup_options *opt, bbsink *sink)
|
||||
|
||||
/* Then the bulk of the files... */
|
||||
sendDir(sink, ".", 1, false, state.tablespaces,
|
||||
sendtblspclinks, &manifest, InvalidOid);
|
||||
sendtblspclinks, &manifest, InvalidOid, ib);
|
||||
|
||||
/* ... and pg_control after everything else. */
|
||||
if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
|
||||
@@ -340,7 +357,7 @@ perform_base_backup(basebackup_options *opt, bbsink *sink)
|
||||
XLOG_CONTROL_FILE)));
|
||||
sendFile(sink, XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf,
|
||||
false, InvalidOid, InvalidOid,
|
||||
InvalidRelFileNumber, 0, &manifest);
|
||||
InvalidRelFileNumber, 0, &manifest, 0, NULL, 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -348,7 +365,7 @@ perform_base_backup(basebackup_options *opt, bbsink *sink)
|
||||
|
||||
bbsink_begin_archive(sink, archive_name);
|
||||
|
||||
sendTablespace(sink, ti->path, ti->oid, false, &manifest);
|
||||
sendTablespace(sink, ti->path, ti->oid, false, &manifest, ib);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -610,7 +627,7 @@ perform_base_backup(basebackup_options *opt, bbsink *sink)
|
||||
|
||||
sendFile(sink, pathbuf, pathbuf, &statbuf, false,
|
||||
InvalidOid, InvalidOid, InvalidRelFileNumber, 0,
|
||||
&manifest);
|
||||
&manifest, 0, NULL, 0);
|
||||
|
||||
/* unconditionally mark file as archived */
|
||||
StatusFilePath(pathbuf, fname, ".done");
|
||||
@@ -686,6 +703,7 @@ parse_basebackup_options(List *options, basebackup_options *opt)
|
||||
bool o_checkpoint = false;
|
||||
bool o_nowait = false;
|
||||
bool o_wal = false;
|
||||
bool o_incremental = false;
|
||||
bool o_maxrate = false;
|
||||
bool o_tablespace_map = false;
|
||||
bool o_noverify_checksums = false;
|
||||
@@ -764,6 +782,20 @@ parse_basebackup_options(List *options, basebackup_options *opt)
|
||||
opt->includewal = defGetBoolean(defel);
|
||||
o_wal = true;
|
||||
}
|
||||
else if (strcmp(defel->defname, "incremental") == 0)
|
||||
{
|
||||
if (o_incremental)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("duplicate option \"%s\"", defel->defname)));
|
||||
opt->incremental = defGetBoolean(defel);
|
||||
if (opt->incremental && !summarize_wal)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("incremental backups cannot be taken unless WAL summarization is enabled")));
|
||||
opt->incremental = defGetBoolean(defel);
|
||||
o_incremental = true;
|
||||
}
|
||||
else if (strcmp(defel->defname, "max_rate") == 0)
|
||||
{
|
||||
int64 maxrate;
|
||||
@@ -956,7 +988,7 @@ parse_basebackup_options(List *options, basebackup_options *opt)
|
||||
* the filesystem, bypassing the buffer cache.
|
||||
*/
|
||||
void
|
||||
SendBaseBackup(BaseBackupCmd *cmd)
|
||||
SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
|
||||
{
|
||||
basebackup_options opt;
|
||||
bbsink *sink;
|
||||
@@ -980,6 +1012,20 @@ SendBaseBackup(BaseBackupCmd *cmd)
|
||||
set_ps_display(activitymsg);
|
||||
}
|
||||
|
||||
/*
|
||||
* If we're asked to perform an incremental backup and the user has not
|
||||
* supplied a manifest, that's an ERROR.
|
||||
*
|
||||
* If we're asked to perform a full backup and the user did supply a
|
||||
* manifest, just ignore it.
|
||||
*/
|
||||
if (!opt.incremental)
|
||||
ib = NULL;
|
||||
else if (ib == NULL)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("must UPLOAD_MANIFEST before performing an incremental BASE_BACKUP")));
|
||||
|
||||
/*
|
||||
* If the target is specifically 'client' then set up to stream the backup
|
||||
* to the client; otherwise, it's being sent someplace else and should not
|
||||
@@ -1011,7 +1057,7 @@ SendBaseBackup(BaseBackupCmd *cmd)
|
||||
*/
|
||||
PG_TRY();
|
||||
{
|
||||
perform_base_backup(&opt, sink);
|
||||
perform_base_backup(&opt, sink, ib);
|
||||
}
|
||||
PG_FINALLY();
|
||||
{
|
||||
@@ -1089,7 +1135,7 @@ sendFileWithContent(bbsink *sink, const char *filename, const char *content,
|
||||
*/
|
||||
static int64
|
||||
sendTablespace(bbsink *sink, char *path, Oid spcoid, bool sizeonly,
|
||||
backup_manifest_info *manifest)
|
||||
backup_manifest_info *manifest, IncrementalBackupInfo *ib)
|
||||
{
|
||||
int64 size;
|
||||
char pathbuf[MAXPGPATH];
|
||||
@@ -1123,7 +1169,7 @@ sendTablespace(bbsink *sink, char *path, Oid spcoid, bool sizeonly,
|
||||
|
||||
/* Send all the files in the tablespace version directory */
|
||||
size += sendDir(sink, pathbuf, strlen(path), sizeonly, NIL, true, manifest,
|
||||
spcoid);
|
||||
spcoid, ib);
|
||||
|
||||
return size;
|
||||
}
|
||||
@@ -1143,7 +1189,7 @@ sendTablespace(bbsink *sink, char *path, Oid spcoid, bool sizeonly,
|
||||
static int64
|
||||
sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
|
||||
List *tablespaces, bool sendtblspclinks, backup_manifest_info *manifest,
|
||||
Oid spcoid)
|
||||
Oid spcoid, IncrementalBackupInfo *ib)
|
||||
{
|
||||
DIR *dir;
|
||||
struct dirent *de;
|
||||
@@ -1152,7 +1198,16 @@ sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
|
||||
int64 size = 0;
|
||||
const char *lastDir; /* Split last dir from parent path. */
|
||||
bool isRelationDir = false; /* Does directory contain relations? */
|
||||
bool isGlobalDir = false;
|
||||
Oid dboid = InvalidOid;
|
||||
BlockNumber *relative_block_numbers = NULL;
|
||||
|
||||
/*
|
||||
* Since this array is relatively large, avoid putting it on the stack.
|
||||
* But we don't need it at all if this is not an incremental backup.
|
||||
*/
|
||||
if (ib != NULL)
|
||||
relative_block_numbers = palloc(sizeof(BlockNumber) * RELSEG_SIZE);
|
||||
|
||||
/*
|
||||
* Determine if the current path is a database directory that can contain
|
||||
@@ -1185,7 +1240,10 @@ sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
|
||||
}
|
||||
}
|
||||
else if (strcmp(path, "./global") == 0)
|
||||
{
|
||||
isRelationDir = true;
|
||||
isGlobalDir = true;
|
||||
}
|
||||
|
||||
dir = AllocateDir(path);
|
||||
while ((de = ReadDir(dir, path)) != NULL)
|
||||
@@ -1334,11 +1392,13 @@ sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
|
||||
&statbuf, sizeonly);
|
||||
|
||||
/*
|
||||
* Also send archive_status directory (by hackishly reusing
|
||||
* statbuf from above ...).
|
||||
* Also send archive_status and summaries directories (by
|
||||
* hackishly reusing statbuf from above ...).
|
||||
*/
|
||||
size += _tarWriteHeader(sink, "./pg_wal/archive_status", NULL,
|
||||
&statbuf, sizeonly);
|
||||
size += _tarWriteHeader(sink, "./pg_wal/summaries", NULL,
|
||||
&statbuf, sizeonly);
|
||||
|
||||
continue; /* don't recurse into pg_wal */
|
||||
}
|
||||
@@ -1407,16 +1467,64 @@ sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
|
||||
|
||||
if (!skip_this_dir)
|
||||
size += sendDir(sink, pathbuf, basepathlen, sizeonly, tablespaces,
|
||||
sendtblspclinks, manifest, spcoid);
|
||||
sendtblspclinks, manifest, spcoid, ib);
|
||||
}
|
||||
else if (S_ISREG(statbuf.st_mode))
|
||||
{
|
||||
bool sent = false;
|
||||
unsigned num_blocks_required = 0;
|
||||
unsigned truncation_block_length = 0;
|
||||
char tarfilenamebuf[MAXPGPATH * 2];
|
||||
char *tarfilename = pathbuf + basepathlen + 1;
|
||||
FileBackupMethod method = BACK_UP_FILE_FULLY;
|
||||
|
||||
if (ib != NULL && isRelationFile)
|
||||
{
|
||||
Oid relspcoid;
|
||||
char *lookup_path;
|
||||
|
||||
if (OidIsValid(spcoid))
|
||||
{
|
||||
relspcoid = spcoid;
|
||||
lookup_path = psprintf("pg_tblspc/%u/%s", spcoid,
|
||||
tarfilename);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (isGlobalDir)
|
||||
relspcoid = GLOBALTABLESPACE_OID;
|
||||
else
|
||||
relspcoid = DEFAULTTABLESPACE_OID;
|
||||
lookup_path = pstrdup(tarfilename);
|
||||
}
|
||||
|
||||
method = GetFileBackupMethod(ib, lookup_path, dboid, relspcoid,
|
||||
relfilenumber, relForkNum,
|
||||
segno, statbuf.st_size,
|
||||
&num_blocks_required,
|
||||
relative_block_numbers,
|
||||
&truncation_block_length);
|
||||
if (method == BACK_UP_FILE_INCREMENTALLY)
|
||||
{
|
||||
statbuf.st_size =
|
||||
GetIncrementalFileSize(num_blocks_required);
|
||||
snprintf(tarfilenamebuf, sizeof(tarfilenamebuf),
|
||||
"%s/INCREMENTAL.%s",
|
||||
path + basepathlen + 1,
|
||||
de->d_name);
|
||||
tarfilename = tarfilenamebuf;
|
||||
}
|
||||
|
||||
pfree(lookup_path);
|
||||
}
|
||||
|
||||
if (!sizeonly)
|
||||
sent = sendFile(sink, pathbuf, pathbuf + basepathlen + 1, &statbuf,
|
||||
sent = sendFile(sink, pathbuf, tarfilename, &statbuf,
|
||||
true, dboid, spcoid,
|
||||
relfilenumber, segno, manifest);
|
||||
relfilenumber, segno, manifest,
|
||||
num_blocks_required,
|
||||
method == BACK_UP_FILE_INCREMENTALLY ? relative_block_numbers : NULL,
|
||||
truncation_block_length);
|
||||
|
||||
if (sent || sizeonly)
|
||||
{
|
||||
@@ -1434,6 +1542,10 @@ sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
|
||||
ereport(WARNING,
|
||||
(errmsg("skipping special file \"%s\"", pathbuf)));
|
||||
}
|
||||
|
||||
if (relative_block_numbers != NULL)
|
||||
pfree(relative_block_numbers);
|
||||
|
||||
FreeDir(dir);
|
||||
return size;
|
||||
}
|
||||
@@ -1446,6 +1558,12 @@ sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
|
||||
* If dboid is anything other than InvalidOid then any checksum failures
|
||||
* detected will get reported to the cumulative stats system.
|
||||
*
|
||||
* If the file is to be sent incrementally, then num_incremental_blocks
|
||||
* should be the number of blocks to be sent, and incremental_blocks
|
||||
* an array of block numbers relative to the start of the current segment.
|
||||
* If the whole file is to be sent, then incremental_blocks should be NULL,
|
||||
* and num_incremental_blocks can have any value, as it will be ignored.
|
||||
*
|
||||
* Returns true if the file was successfully sent, false if 'missing_ok',
|
||||
* and the file did not exist.
|
||||
*/
|
||||
@@ -1453,7 +1571,8 @@ static bool
|
||||
sendFile(bbsink *sink, const char *readfilename, const char *tarfilename,
|
||||
struct stat *statbuf, bool missing_ok, Oid dboid, Oid spcoid,
|
||||
RelFileNumber relfilenumber, unsigned segno,
|
||||
backup_manifest_info *manifest)
|
||||
backup_manifest_info *manifest, unsigned num_incremental_blocks,
|
||||
BlockNumber *incremental_blocks, unsigned truncation_block_length)
|
||||
{
|
||||
int fd;
|
||||
BlockNumber blkno = 0;
|
||||
@@ -1462,6 +1581,7 @@ sendFile(bbsink *sink, const char *readfilename, const char *tarfilename,
|
||||
pgoff_t bytes_done = 0;
|
||||
bool verify_checksum = false;
|
||||
pg_checksum_context checksum_ctx;
|
||||
int ibindex = 0;
|
||||
|
||||
if (pg_checksum_init(&checksum_ctx, manifest->checksum_type) < 0)
|
||||
elog(ERROR, "could not initialize checksum of file \"%s\"",
|
||||
@@ -1494,22 +1614,111 @@ sendFile(bbsink *sink, const char *readfilename, const char *tarfilename,
|
||||
RelFileNumberIsValid(relfilenumber))
|
||||
verify_checksum = true;
|
||||
|
||||
/*
|
||||
* If we're sending an incremental file, write the file header.
|
||||
*/
|
||||
if (incremental_blocks != NULL)
|
||||
{
|
||||
unsigned magic = INCREMENTAL_MAGIC;
|
||||
size_t header_bytes_done = 0;
|
||||
|
||||
/* Emit header data. */
|
||||
push_to_sink(sink, &checksum_ctx, &header_bytes_done,
|
||||
&magic, sizeof(magic));
|
||||
push_to_sink(sink, &checksum_ctx, &header_bytes_done,
|
||||
&num_incremental_blocks, sizeof(num_incremental_blocks));
|
||||
push_to_sink(sink, &checksum_ctx, &header_bytes_done,
|
||||
&truncation_block_length, sizeof(truncation_block_length));
|
||||
push_to_sink(sink, &checksum_ctx, &header_bytes_done,
|
||||
incremental_blocks,
|
||||
sizeof(BlockNumber) * num_incremental_blocks);
|
||||
|
||||
/* Flush out any data still in the buffer so it's again empty. */
|
||||
if (header_bytes_done > 0)
|
||||
{
|
||||
bbsink_archive_contents(sink, header_bytes_done);
|
||||
if (pg_checksum_update(&checksum_ctx,
|
||||
(uint8 *) sink->bbs_buffer,
|
||||
header_bytes_done) < 0)
|
||||
elog(ERROR, "could not update checksum of base backup");
|
||||
}
|
||||
|
||||
/* Update our notion of file position. */
|
||||
bytes_done += sizeof(magic);
|
||||
bytes_done += sizeof(num_incremental_blocks);
|
||||
bytes_done += sizeof(truncation_block_length);
|
||||
bytes_done += sizeof(BlockNumber) * num_incremental_blocks;
|
||||
}
|
||||
|
||||
/*
|
||||
* Loop until we read the amount of data the caller told us to expect. The
|
||||
* file could be longer, if it was extended while we were sending it, but
|
||||
* for a base backup we can ignore such extended data. It will be restored
|
||||
* from WAL.
|
||||
*/
|
||||
while (bytes_done < statbuf->st_size)
|
||||
while (1)
|
||||
{
|
||||
size_t remaining = statbuf->st_size - bytes_done;
|
||||
/*
|
||||
* Determine whether we've read all the data that we need, and if not,
|
||||
* read some more.
|
||||
*/
|
||||
if (incremental_blocks == NULL)
|
||||
{
|
||||
size_t remaining = statbuf->st_size - bytes_done;
|
||||
|
||||
/* Try to read some more data. */
|
||||
cnt = read_file_data_into_buffer(sink, readfilename, fd, bytes_done,
|
||||
remaining,
|
||||
blkno + segno * RELSEG_SIZE,
|
||||
verify_checksum,
|
||||
&checksum_failures);
|
||||
/*
|
||||
* If we've read the required number of bytes, then it's time to
|
||||
* stop.
|
||||
*/
|
||||
if (bytes_done >= statbuf->st_size)
|
||||
break;
|
||||
|
||||
/*
|
||||
* Read as many bytes as will fit in the buffer, or however many
|
||||
* are left to read, whichever is less.
|
||||
*/
|
||||
cnt = read_file_data_into_buffer(sink, readfilename, fd,
|
||||
bytes_done, remaining,
|
||||
blkno + segno * RELSEG_SIZE,
|
||||
verify_checksum,
|
||||
&checksum_failures);
|
||||
}
|
||||
else
|
||||
{
|
||||
BlockNumber relative_blkno;
|
||||
|
||||
/*
|
||||
* If we've read all the blocks, then it's time to stop.
|
||||
*/
|
||||
if (ibindex >= num_incremental_blocks)
|
||||
break;
|
||||
|
||||
/*
|
||||
* Read just one block, whichever one is the next that we're
|
||||
* supposed to include.
|
||||
*/
|
||||
relative_blkno = incremental_blocks[ibindex++];
|
||||
cnt = read_file_data_into_buffer(sink, readfilename, fd,
|
||||
relative_blkno * BLCKSZ,
|
||||
BLCKSZ,
|
||||
relative_blkno + segno * RELSEG_SIZE,
|
||||
verify_checksum,
|
||||
&checksum_failures);
|
||||
|
||||
/*
|
||||
* If we get a partial read, that must mean that the relation is
|
||||
* being truncated. Ultimately, it should be truncated to a
|
||||
* multiple of BLCKSZ, since this path should only be reached for
|
||||
* relation files, but we might transiently observe an
|
||||
* intermediate value.
|
||||
*
|
||||
* It should be fine to treat this just as if the entire block had
|
||||
* been truncated away - i.e. fill this and all later blocks with
|
||||
* zeroes. WAL replay will fix things up.
|
||||
*/
|
||||
if (cnt < BLCKSZ)
|
||||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
* If the amount of data we were able to read was not a multiple of
|
||||
@@ -1692,6 +1901,56 @@ read_file_data_into_buffer(bbsink *sink, const char *readfilename, int fd,
|
||||
return cnt;
|
||||
}
|
||||
|
||||
/*
|
||||
* Push data into a bbsink.
|
||||
*
|
||||
* It's better, when possible, to read data directly into the bbsink's buffer,
|
||||
* rather than using this function to copy it into the buffer; this function is
|
||||
* for cases where that approach is not practical.
|
||||
*
|
||||
* bytes_done should point to a count of the number of bytes that are
|
||||
* currently used in the bbsink's buffer. Upon return, the bytes identified by
|
||||
* data and length will have been copied into the bbsink's buffer, flushing
|
||||
* as required, and *bytes_done will have been updated accordingly. If the
|
||||
* buffer was flushed, the previous contents will also have been fed to
|
||||
* checksum_ctx.
|
||||
*
|
||||
* Note that after one or more calls to this function it is the caller's
|
||||
* responsibility to perform any required final flush.
|
||||
*/
|
||||
static void
|
||||
push_to_sink(bbsink *sink, pg_checksum_context *checksum_ctx,
|
||||
size_t *bytes_done, void *data, size_t length)
|
||||
{
|
||||
while (length > 0)
|
||||
{
|
||||
size_t bytes_to_copy;
|
||||
|
||||
/*
|
||||
* We use < here rather than <= so that if the data exactly fills the
|
||||
* remaining buffer space, we trigger a flush now.
|
||||
*/
|
||||
if (length < sink->bbs_buffer_length - *bytes_done)
|
||||
{
|
||||
/* Append remaining data to buffer. */
|
||||
memcpy(sink->bbs_buffer + *bytes_done, data, length);
|
||||
*bytes_done += length;
|
||||
return;
|
||||
}
|
||||
|
||||
/* Copy until buffer is full and flush it. */
|
||||
bytes_to_copy = sink->bbs_buffer_length - *bytes_done;
|
||||
memcpy(sink->bbs_buffer + *bytes_done, data, bytes_to_copy);
|
||||
data = ((char *) data) + bytes_to_copy;
|
||||
length -= bytes_to_copy;
|
||||
bbsink_archive_contents(sink, sink->bbs_buffer_length);
|
||||
if (pg_checksum_update(checksum_ctx, (uint8 *) sink->bbs_buffer,
|
||||
sink->bbs_buffer_length) < 0)
|
||||
elog(ERROR, "could not update checksum");
|
||||
*bytes_done = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Try to verify the checksum for the provided page, if it seems appropriate
|
||||
* to do so.
|
||||
|
Reference in New Issue
Block a user