1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-09 22:41:56 +03:00

Report progress of streaming base backup.

This commit adds pg_stat_progress_basebackup view that reports
the progress while an application like pg_basebackup is taking
a base backup. This uses the progress reporting infrastructure
added by c16dc1aca5, adding support for streaming base backup.

Bump catversion.

Author: Fujii Masao
Reviewed-by: Kyotaro Horiguchi, Amit Langote, Sergei Kornilov
Discussion: https://postgr.es/m/9ed8b801-8215-1f3d-62d7-65bff53f6e94@oss.nttdata.com
This commit is contained in:
Fujii Masao
2020-03-03 12:03:43 +09:00
parent d79fb88ac7
commit e65497df8f
11 changed files with 339 additions and 6 deletions

View File

@ -19,6 +19,7 @@
#include "access/xlog_internal.h" /* for pg_start/stop_backup */
#include "catalog/pg_type.h"
#include "common/file_perm.h"
#include "commands/progress.h"
#include "lib/stringinfo.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
@ -70,6 +71,7 @@ static void parse_basebackup_options(List *options, basebackup_options *opt);
static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
static int compareWalFileNames(const ListCell *a, const ListCell *b);
static void throttle(size_t increment);
static void update_basebackup_progress(int64 delta);
static bool is_checksummed_file(const char *fullpath, const char *filename);
/* Was the backup currently in-progress initiated in recovery mode? */
@ -121,6 +123,12 @@ static long long int total_checksum_failures;
/* Do not verify checksums. */
static bool noverify_checksums = false;
/* Total amount of backup data that will be streamed */
static int64 backup_total = 0;
/* Amount of backup data already streamed */
static int64 backup_streamed = 0;
/*
* Definition of one element part of an exclusion list, used for paths part
* of checksum validation or base backups. "name" is the name of the file
@ -246,6 +254,10 @@ perform_base_backup(basebackup_options *opt)
int datadirpathlen;
List *tablespaces = NIL;
backup_total = 0;
backup_streamed = 0;
pgstat_progress_start_command(PROGRESS_COMMAND_BASEBACKUP, InvalidOid);
datadirpathlen = strlen(DataDir);
backup_started_in_recovery = RecoveryInProgress();
@ -255,6 +267,8 @@ perform_base_backup(basebackup_options *opt)
total_checksum_failures = 0;
pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
PROGRESS_BASEBACKUP_PHASE_WAIT_CHECKPOINT);
startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
labelfile, &tablespaces,
tblspc_map_file,
@ -271,8 +285,7 @@ perform_base_backup(basebackup_options *opt)
{
ListCell *lc;
tablespaceinfo *ti;
SendXlogRecPtrResult(startptr, starttli);
int tblspc_streamed = 0;
/*
* Calculate the relative path of temporary statistics directory in
@ -291,6 +304,38 @@ perform_base_backup(basebackup_options *opt)
ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true) : -1;
tablespaces = lappend(tablespaces, ti);
/*
* Calculate the total backup size by summing up the size of each
* tablespace
*/
if (opt->progress)
{
foreach(lc, tablespaces)
{
tablespaceinfo *tmp = (tablespaceinfo *) lfirst(lc);
backup_total += tmp->size;
}
}
/* Report that we are now streaming database files as a base backup */
{
const int index[] = {
PROGRESS_BASEBACKUP_PHASE,
PROGRESS_BASEBACKUP_BACKUP_TOTAL,
PROGRESS_BASEBACKUP_TBLSPC_TOTAL
};
const int64 val[] = {
PROGRESS_BASEBACKUP_PHASE_STREAM_BACKUP,
backup_total, list_length(tablespaces)
};
pgstat_progress_update_multi_param(3, index, val);
}
/* Send the starting position of the backup */
SendXlogRecPtrResult(startptr, starttli);
/* Send tablespace header */
SendBackupHeader(tablespaces);
@ -372,8 +417,14 @@ perform_base_backup(basebackup_options *opt)
}
else
pq_putemptymessage('c'); /* CopyDone */
tblspc_streamed++;
pgstat_progress_update_param(PROGRESS_BASEBACKUP_TBLSPC_STREAMED,
tblspc_streamed);
}
pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE);
endptr = do_pg_stop_backup(labelfile->data, !opt->nowait, &endtli);
}
PG_END_ENSURE_ERROR_CLEANUP(do_pg_abort_backup, BoolGetDatum(false));
@ -399,6 +450,9 @@ perform_base_backup(basebackup_options *opt)
ListCell *lc;
TimeLineID tli;
pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL);
/*
* I'd rather not worry about timelines here, so scan pg_wal and
* include all WAL files in the range between 'startptr' and 'endptr',
@ -548,6 +602,7 @@ perform_base_backup(basebackup_options *opt)
if (pq_putmessage('d', buf, cnt))
ereport(ERROR,
(errmsg("base backup could not send data, aborting backup")));
update_basebackup_progress(cnt);
len += cnt;
throttle(cnt);
@ -623,6 +678,7 @@ perform_base_backup(basebackup_options *opt)
errmsg("checksum verification failure during base backup")));
}
pgstat_progress_end_command();
}
/*
@ -949,6 +1005,7 @@ sendFileWithContent(const char *filename, const char *content)
_tarWriteHeader(filename, NULL, &statbuf, false);
/* Send the contents as a CopyData message */
pq_putmessage('d', content, len);
update_basebackup_progress(len);
/* Pad to 512 byte boundary, per tar format requirements */
pad = ((len + 511) & ~511) - len;
@ -958,6 +1015,7 @@ sendFileWithContent(const char *filename, const char *content)
MemSet(buf, 0, pad);
pq_putmessage('d', buf, pad);
update_basebackup_progress(pad);
}
}
@ -1565,6 +1623,7 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
if (pq_putmessage('d', buf, cnt))
ereport(ERROR,
(errmsg("base backup could not send data, aborting backup")));
update_basebackup_progress(cnt);
len += cnt;
throttle(cnt);
@ -1590,6 +1649,7 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
{
cnt = Min(sizeof(buf), statbuf->st_size - len);
pq_putmessage('d', buf, cnt);
update_basebackup_progress(cnt);
len += cnt;
throttle(cnt);
}
@ -1604,6 +1664,7 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
{
MemSet(buf, 0, pad);
pq_putmessage('d', buf, pad);
update_basebackup_progress(pad);
}
FreeFile(fp);
@ -1658,6 +1719,7 @@ _tarWriteHeader(const char *filename, const char *linktarget,
}
pq_putmessage('d', h, sizeof(h));
update_basebackup_progress(sizeof(h));
}
return sizeof(h);
@ -1755,3 +1817,36 @@ throttle(size_t increment)
*/
throttled_last = GetCurrentTimestamp();
}
/*
* Increment the counter for the amount of data already streamed
* by the given number of bytes, and update the progress report for
* pg_stat_progress_basebackup.
*/
static void
update_basebackup_progress(int64 delta)
{
const int index[] = {
PROGRESS_BASEBACKUP_BACKUP_STREAMED,
PROGRESS_BASEBACKUP_BACKUP_TOTAL
};
int64 val[2];
int nparam = 0;
backup_streamed += delta;
val[nparam++] = backup_streamed;
/*
* Avoid overflowing past 100% or the full size. This may make the total
* size number change as we approach the end of the backup (the estimate
* will always be wrong if WAL is included), but that's better than having
* the done column be bigger than the total.
*/
if (backup_total > 0 && backup_streamed > backup_total)
{
backup_total = backup_streamed;
val[nparam++] = backup_total;
}
pgstat_progress_update_multi_param(nparam, index, val);
}