1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-28 23:42:10 +03:00

Add option to include WAL in base backup

When included, this makes the base backup a complete working
"clone" of the initial database, ready to have a postmaster
started against it without the need to set up any log archiving
or similar.

Magnus Hagander, reviewed by Fujii Masao and Heikki Linnakangas
This commit is contained in:
Magnus Hagander
2011-01-30 21:30:09 +01:00
parent 5d5678d7c3
commit 507069de6d
8 changed files with 197 additions and 44 deletions

View File

@ -37,6 +37,7 @@ typedef struct
const char *label;
bool progress;
bool fastcheckpoint;
bool includewal;
} basebackup_options;
@ -46,11 +47,17 @@ static void _tarWriteHeader(char *filename, char *linktarget,
struct stat * statbuf);
static void send_int8_string(StringInfoData *buf, int64 intval);
static void SendBackupHeader(List *tablespaces);
static void SendBackupDirectory(char *location, char *spcoid);
static void base_backup_cleanup(int code, Datum arg);
static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
static void parse_basebackup_options(List *options, basebackup_options *opt);
/*
* Size of each block sent into the tar stream for larger files.
*
* XLogSegSize *MUST* be evenly dividable by this
*/
#define TAR_SEND_SIZE 32768
typedef struct
{
char *oid;
@ -78,7 +85,10 @@ base_backup_cleanup(int code, Datum arg)
static void
perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
{
do_pg_start_backup(opt->label, opt->fastcheckpoint);
XLogRecPtr startptr;
XLogRecPtr endptr;
startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint);
PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
{
@ -87,12 +97,6 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
struct dirent *de;
tablespaceinfo *ti;
/* Add a node for the base directory */
ti = palloc0(sizeof(tablespaceinfo));
ti->size = opt->progress ? sendDir(".", 1, true) : -1;
tablespaces = lappend(tablespaces, ti);
/* Collect information about all tablespaces */
while ((de = ReadDir(tblspcdir, "pg_tblspc")) != NULL)
{
@ -120,6 +124,10 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
tablespaces = lappend(tablespaces, ti);
}
/* Add a node for the base directory at the end */
ti = palloc0(sizeof(tablespaceinfo));
ti->size = opt->progress ? sendDir(".", 1, true) : -1;
tablespaces = lappend(tablespaces, ti);
/* Send tablespace header */
SendBackupHeader(tablespaces);
@ -128,13 +136,102 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
foreach(lc, tablespaces)
{
tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc);
StringInfoData buf;
SendBackupDirectory(ti->path, ti->oid);
/* Send CopyOutResponse message */
pq_beginmessage(&buf, 'H');
pq_sendbyte(&buf, 0); /* overall format */
pq_sendint(&buf, 0, 2); /* natts */
pq_endmessage(&buf);
sendDir(ti->path == NULL ? "." : ti->path,
ti->path == NULL ? 1 : strlen(ti->path),
false);
/*
* If we're including WAL, and this is the main data directory we
* don't terminate the tar stream here. Instead, we will append
* the xlog files below and terminate it then. This is safe since
* the main data directory is always sent *last*.
*/
if (opt->includewal && ti->path == NULL)
{
Assert(lnext(lc) == NULL);
}
else
pq_putemptymessage('c'); /* CopyDone */
}
}
PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
do_pg_stop_backup();
endptr = do_pg_stop_backup();
if (opt->includewal)
{
/*
* We've left the last tar file "open", so we can now append the
* required WAL files to it.
*/
uint32 logid,
logseg;
uint32 endlogid,
endlogseg;
struct stat statbuf;
MemSet(&statbuf, 0, sizeof(statbuf));
statbuf.st_mode = S_IRUSR | S_IWUSR;
#ifndef WIN32
statbuf.st_uid = geteuid();
statbuf.st_gid = getegid();
#endif
statbuf.st_size = XLogSegSize;
statbuf.st_mtime = time(NULL);
XLByteToSeg(startptr, logid, logseg);
XLByteToPrevSeg(endptr, endlogid, endlogseg);
while (true)
{
/* Send another xlog segment */
char fn[MAXPGPATH];
int i;
XLogFilePath(fn, ThisTimeLineID, logid, logseg);
_tarWriteHeader(fn, NULL, &statbuf);
/* Send the actual WAL file contents, block-by-block */
for (i = 0; i < XLogSegSize / TAR_SEND_SIZE; i++)
{
char buf[TAR_SEND_SIZE];
XLogRecPtr ptr;
ptr.xlogid = logid;
ptr.xrecoff = logseg * XLogSegSize + TAR_SEND_SIZE * i;
XLogRead(buf, ptr, TAR_SEND_SIZE);
if (pq_putmessage('d', buf, TAR_SEND_SIZE))
ereport(ERROR,
(errmsg("base backup could not send data, aborting backup")));
}
/*
* Files are always fixed size, and always end on a 512 byte
* boundary, so padding is never necessary.
*/
/* Advance to the next WAL file */
NextLogSeg(logid, logseg);
/* Have we reached our stop position yet? */
if (logid > endlogid ||
(logid == endlogid && logseg > endlogseg))
break;
}
/* Send CopyDone message for the last tar file */
pq_putemptymessage('c');
}
}
/*
@ -147,6 +244,7 @@ parse_basebackup_options(List *options, basebackup_options *opt)
bool o_label = false;
bool o_progress = false;
bool o_fast = false;
bool o_wal = false;
MemSet(opt, 0, sizeof(*opt));
foreach(lopt, options)
@ -180,6 +278,15 @@ parse_basebackup_options(List *options, basebackup_options *opt)
opt->fastcheckpoint = true;
o_fast = true;
}
else if (strcmp(defel->defname, "wal") == 0)
{
if (o_wal)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("duplicate option \"%s\"", defel->defname)));
opt->includewal = true;
o_wal = true;
}
else
elog(ERROR, "option \"%s\" not recognized",
defel->defname);
@ -316,26 +423,6 @@ SendBackupHeader(List *tablespaces)
pq_puttextmessage('C', "SELECT");
}
static void
SendBackupDirectory(char *location, char *spcoid)
{
StringInfoData buf;
/* Send CopyOutResponse message */
pq_beginmessage(&buf, 'H');
pq_sendbyte(&buf, 0); /* overall format */
pq_sendint(&buf, 0, 2); /* natts */
pq_endmessage(&buf);
/* tar up the data directory if NULL, otherwise the tablespace */
sendDir(location == NULL ? "." : location,
location == NULL ? 1 : strlen(location),
false);
/* Send CopyDone message */
pq_putemptymessage('c');
}
static int64
sendDir(char *path, int basepathlen, bool sizeonly)
@ -506,7 +593,7 @@ static void
sendFile(char *filename, int basepathlen, struct stat * statbuf)
{
FILE *fp;
char buf[32768];
char buf[TAR_SEND_SIZE];
size_t cnt;
pgoff_t len = 0;
size_t pad;

View File

@ -71,6 +71,7 @@ Node *replication_parse_result;
%token K_LABEL
%token K_PROGRESS
%token K_FAST
%token K_WAL
%token K_START_REPLICATION
%type <node> command
@ -106,7 +107,7 @@ identify_system:
;
/*
* BASE_BACKUP [LABEL <label>] [PROGRESS] [FAST]
* BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL]
*/
base_backup:
K_BASE_BACKUP base_backup_opt_list
@ -136,7 +137,12 @@ base_backup_opt:
$$ = makeDefElem("fast",
(Node *)makeInteger(TRUE));
}
| K_WAL
{
$$ = makeDefElem("wal",
(Node *)makeInteger(TRUE));
}
;
/*
* START_REPLICATION %X/%X

View File

@ -61,6 +61,7 @@ FAST { return K_FAST; }
IDENTIFY_SYSTEM { return K_IDENTIFY_SYSTEM; }
LABEL { return K_LABEL; }
PROGRESS { return K_PROGRESS; }
WAL { return K_WAL; }
START_REPLICATION { return K_START_REPLICATION; }
"," { return ','; }
";" { return ';'; }

View File

@ -105,7 +105,6 @@ static int WalSndLoop(void);
static void InitWalSnd(void);
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
static bool XLogSend(char *msgbuf, bool *caughtup);
static void CheckClosedConnection(void);
static void IdentifySystem(void);
@ -649,8 +648,13 @@ WalSndKill(int code, Datum arg)
*
* XXX probably this should be improved to suck data directly from the
* WAL buffers when possible.
*
* Will open, and keep open, one WAL segment stored in the global file
* descriptor sendFile. This means if XLogRead is used once, there will
* always be one descriptor left open until the process ends, but never
* more than one.
*/
static void
void
XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
{
XLogRecPtr startRecPtr = recptr;

View File

@ -33,6 +33,7 @@ char *label = "pg_basebackup base backup";
bool showprogress = false;
int verbose = 0;
int compresslevel = 0;
bool includewal = false;
bool fastcheckpoint = false;
char *dbhost = NULL;
char *dbuser = NULL;
@ -124,6 +125,7 @@ usage(void)
printf(_("\nOptions controlling the output:\n"));
printf(_(" -D, --pgdata=directory receive base backup into directory\n"));
printf(_(" -F, --format=p|t output format (plain, tar)\n"));
printf(_(" -x, --xlog include required WAL files in backup\n"));
printf(_(" -Z, --compress=0-9 compress tar output\n"));
printf(_("\nGeneral options:\n"));
printf(_(" -c, --checkpoint=fast|spread\n"
@ -200,16 +202,20 @@ verify_dir_is_empty_or_create(char *dirname)
static void
progress_report(int tablespacenum, char *fn)
{
int percent = (int) ((totaldone / 1024) * 100 / totalsize);
if (percent > 100)
percent = 100;
if (verbose)
fprintf(stderr,
INT64_FORMAT "/" INT64_FORMAT " kB (%i%%) %i/%i tablespaces (%-30s)\r",
totaldone / 1024, totalsize,
(int) ((totaldone / 1024) * 100 / totalsize),
percent,
tablespacenum, tablespacecount, fn);
else
fprintf(stderr, INT64_FORMAT "/" INT64_FORMAT " kB (%i%%) %i/%i tablespaces\r",
totaldone / 1024, totalsize,
(int) ((totaldone / 1024) * 100 / totalsize),
percent,
tablespacenum, tablespacecount);
}
@ -746,9 +752,10 @@ BaseBackup()
conn = GetConnection();
PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s",
snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s",
escaped_label,
showprogress ? "PROGRESS" : "",
includewal ? "WAL" : "",
fastcheckpoint ? "FAST" : "");
if (PQsendQuery(conn, current_path) == 0)
@ -789,7 +796,7 @@ BaseBackup()
* first once since it can be relocated, and it will be checked before
* we do anything anyway.
*/
if (format == 'p' && i > 0)
if (format == 'p' && !PQgetisnull(res, i, 1))
verify_dir_is_empty_or_create(PQgetvalue(res, i, 1));
}
@ -848,6 +855,7 @@ main(int argc, char **argv)
{"pgdata", required_argument, NULL, 'D'},
{"format", required_argument, NULL, 'F'},
{"checkpoint", required_argument, NULL, 'c'},
{"xlog", no_argument, NULL, 'x'},
{"compress", required_argument, NULL, 'Z'},
{"label", required_argument, NULL, 'l'},
{"host", required_argument, NULL, 'h'},
@ -881,7 +889,7 @@ main(int argc, char **argv)
}
}
while ((c = getopt_long(argc, argv, "D:F:l:Z:c:h:p:U:wWvP",
while ((c = getopt_long(argc, argv, "D:F:l:Z:c:h:p:U:xwWvP",
long_options, &option_index)) != -1)
{
switch (c)
@ -901,6 +909,9 @@ main(int argc, char **argv)
exit(1);
}
break;
case 'x':
includewal = true;
break;
case 'l':
label = xstrdup(optarg);
break;

View File

@ -67,6 +67,7 @@ extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);
extern void WalSndWakeup(void);
extern void WalSndSetState(WalSndState state);
extern void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS);