1
0
mirror of https://github.com/postgres/postgres.git synced 2025-08-24 09:27:52 +03:00

Introduce 'bbsink' abstraction to modularize base backup code.

The base backup code has accumulated a healthy number of new
features over the years, but it's becoming increasingly difficult
to maintain and further enhance that code because there's no
real separation of concerns. For example, the code that
knows the details of how we send data to the client
using the libpq protocol is scattered throughout basebackup.c,
rather than being centralized in one place.

To try to improve this situation, introduce a new 'bbsink' object
which acts as a recipient for archives generated during the base
backup process and also for the backup manifest. This commit
introduces three types of bbsink: a 'copytblspc' bbsink forwards the
backup to the client using one COPY OUT operation per tablespace and
another for the manifest, a 'progress' bbsink performs command
progress reporting, and a 'throttle' bbsink performs rate-limiting.
The 'progress' and 'throttle' bbsink types also forward the data to a
successor bbsink; at present, the last bbsink in the chain will
always be of type 'copytblspc'. There are plans to add more types
of 'bbsink' in future commits.

This abstraction is a bit leaky in the case of progress reporting,
but this still seems cleaner than what we had before.

Patch by me, reviewed and tested by Andres Freund, Sumanta Mukherjee,
Dilip Kumar, Suraj Kharage, Dipesh Pandit, Tushar Ahuja, Mark Dilger,
and Jeevan Ladhe.

Discussion: https://postgr.es/m/CA+TgmoZGwR=ZVWFeecncubEyPdwghnvfkkdBe9BLccLSiqdf9Q@mail.gmail.com
Discussion: https://postgr.es/m/CA+TgmoZvqk7UuzxsX1xjJRmMGkqoUGYTZLDCH8SmU1xTPr1Xig@mail.gmail.com
This commit is contained in:
Robert Haas
2021-11-05 10:08:30 -04:00
parent bd807be693
commit bef47ff85d
9 changed files with 1416 additions and 518 deletions

View File

@@ -17,6 +17,10 @@ override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS)
OBJS = \
backup_manifest.o \
basebackup.o \
basebackup_copy.o \
basebackup_progress.o \
basebackup_sink.o \
basebackup_throttle.o \
repl_gram.o \
slot.o \
slotfuncs.o \

View File

@@ -17,6 +17,7 @@
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "replication/backup_manifest.h"
#include "replication/basebackup_sink.h"
#include "utils/builtins.h"
#include "utils/json.h"
@@ -310,9 +311,8 @@ AddWALInfoToBackupManifest(backup_manifest_info *manifest, XLogRecPtr startptr,
* Finalize the backup manifest, and send it to the client.
*/
void
SendBackupManifest(backup_manifest_info *manifest)
SendBackupManifest(backup_manifest_info *manifest, bbsink *sink)
{
StringInfoData protobuf;
uint8 checksumbuf[PG_SHA256_DIGEST_LENGTH];
char checksumstringbuf[PG_SHA256_DIGEST_STRING_LENGTH];
size_t manifest_bytes_done = 0;
@@ -352,38 +352,28 @@ SendBackupManifest(backup_manifest_info *manifest)
(errcode_for_file_access(),
errmsg("could not rewind temporary file")));
/* Send CopyOutResponse message */
pq_beginmessage(&protobuf, 'H');
pq_sendbyte(&protobuf, 0); /* overall format */
pq_sendint16(&protobuf, 0); /* natts */
pq_endmessage(&protobuf);
/*
* Send CopyData messages.
*
* We choose to read back the data from the temporary file in chunks of
* size BLCKSZ; this isn't necessary, but buffile.c uses that as the I/O
* size, so it seems to make sense to match that value here.
* Send the backup manifest.
*/
bbsink_begin_manifest(sink);
while (manifest_bytes_done < manifest->manifest_size)
{
char manifestbuf[BLCKSZ];
size_t bytes_to_read;
size_t rc;
bytes_to_read = Min(sizeof(manifestbuf),
bytes_to_read = Min(sink->bbs_buffer_length,
manifest->manifest_size - manifest_bytes_done);
rc = BufFileRead(manifest->buffile, manifestbuf, bytes_to_read);
rc = BufFileRead(manifest->buffile, sink->bbs_buffer,
bytes_to_read);
if (rc != bytes_to_read)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from temporary file: %m")));
pq_putmessage('d', manifestbuf, bytes_to_read);
bbsink_manifest_contents(sink, bytes_to_read);
manifest_bytes_done += bytes_to_read;
}
/* No more data, so send CopyDone message */
pq_putemptymessage('c');
bbsink_end_manifest(sink);
/* Release resources */
BufFileClose(manifest->buffile);

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,335 @@
/*-------------------------------------------------------------------------
*
* basebackup_copy.c
* send basebackup archives using one COPY OUT operation per
* tablespace, and an additional COPY OUT for the backup manifest
*
* Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/basebackup_copy.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/pg_type_d.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "replication/basebackup.h"
#include "replication/basebackup_sink.h"
/* Callbacks installed in bbsink_copytblspc_ops below. */
static void bbsink_copytblspc_begin_backup(bbsink *sink);
static void bbsink_copytblspc_begin_archive(bbsink *sink,
const char *archive_name);
static void bbsink_copytblspc_archive_contents(bbsink *sink, size_t len);
static void bbsink_copytblspc_end_archive(bbsink *sink);
static void bbsink_copytblspc_begin_manifest(bbsink *sink);
static void bbsink_copytblspc_manifest_contents(bbsink *sink, size_t len);
static void bbsink_copytblspc_end_manifest(bbsink *sink);
static void bbsink_copytblspc_end_backup(bbsink *sink, XLogRecPtr endptr,
TimeLineID endtli);
static void bbsink_copytblspc_cleanup(bbsink *sink);
/* File-local helpers that emit individual wire-protocol messages. */
static void SendCopyOutResponse(void);
static void SendCopyData(const char *data, size_t len);
static void SendCopyDone(void);
static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
static void SendTablespaceList(List *tablespaces);
static void send_int8_string(StringInfoData *buf, int64 intval);
/*
 * Callback table for the 'copytblspc' sink.
 *
 * Every callback is implemented in this file; none of the generic
 * bbsink_forward_* implementations is used here.
 */
const bbsink_ops bbsink_copytblspc_ops = {
.begin_backup = bbsink_copytblspc_begin_backup,
.begin_archive = bbsink_copytblspc_begin_archive,
.archive_contents = bbsink_copytblspc_archive_contents,
.end_archive = bbsink_copytblspc_end_archive,
.begin_manifest = bbsink_copytblspc_begin_manifest,
.manifest_contents = bbsink_copytblspc_manifest_contents,
.end_manifest = bbsink_copytblspc_end_manifest,
.end_backup = bbsink_copytblspc_end_backup,
.cleanup = bbsink_copytblspc_cleanup
};
/*
 * Create a new 'copytblspc' bbsink.
 *
 * The sink is allocated with palloc0 and only bbs_ops is filled in here; no
 * data buffer is allocated until the begin_backup callback runs.
 *
 * Returns the newly allocated sink.
 */
bbsink *
bbsink_copytblspc_new(void)
{
bbsink *sink = palloc0(sizeof(bbsink));
/*
 * The ops pointer is stored through a cast rather than by plain assignment;
 * presumably bbs_ops is declared const in bbsink -- confirm against
 * basebackup_sink.h.
 */
*((const bbsink_ops **) &sink->bbs_ops) = &bbsink_copytblspc_ops;
return sink;
}
/*
 * Start-of-backup processing for the 'copytblspc' sink.
 *
 * Allocates the data buffer (bbs_buffer_length bytes) and then sends the
 * client the introductory result sets: first the backup start location and
 * timeline, then the list of tablespaces followed by a CommandComplete
 * message.
 */
static void
bbsink_copytblspc_begin_backup(bbsink *sink)
{
	/* Buffer whose contents the *_contents callbacks send to the client. */
	sink->bbs_buffer = palloc(sink->bbs_buffer_length);

	/* Result set #1: where the backup starts. */
	SendXlogRecPtrResult(sink->bbs_state->startptr,
						 sink->bbs_state->starttli);

	/*
	 * Result set #2: the tablespaces.  SendTablespaceList() does not emit
	 * CommandComplete itself, so do that here.
	 */
	SendTablespaceList(sink->bbs_state->tablespaces);
	pq_puttextmessage('C', "SELECT");
}
/*
 * Each archive is sent as a separate stream of COPY data, and thus begins
 * with a CopyOutResponse message.
 *
 * archive_name is not used by this sink.
 */
static void
bbsink_copytblspc_begin_archive(bbsink *sink, const char *archive_name)
{
SendCopyOutResponse();
}
/*
 * Each chunk of data within the archive is sent as a CopyData message.
 *
 * The payload is the first 'len' bytes of sink->bbs_buffer.
 */
static void
bbsink_copytblspc_archive_contents(bbsink *sink, size_t len)
{
SendCopyData(sink->bbs_buffer, len);
}
/*
 * The archive is terminated by a CopyDone message, which closes the COPY
 * stream opened by the begin_archive callback.
 */
static void
bbsink_copytblspc_end_archive(bbsink *sink)
{
SendCopyDone();
}
/*
 * The backup manifest is sent as a separate stream of COPY data, and thus
 * begins with a CopyOutResponse message, exactly as each archive does.
 */
static void
bbsink_copytblspc_begin_manifest(bbsink *sink)
{
SendCopyOutResponse();
}
/*
 * Each chunk of manifest data is sent using a CopyData message.
 *
 * The payload is the first 'len' bytes of sink->bbs_buffer.
 */
static void
bbsink_copytblspc_manifest_contents(bbsink *sink, size_t len)
{
SendCopyData(sink->bbs_buffer, len);
}
/*
 * When we've finished sending the manifest, send a CopyDone message to close
 * the COPY stream opened by the begin_manifest callback.
 */
static void
bbsink_copytblspc_end_manifest(bbsink *sink)
{
SendCopyDone();
}
/*
 * Send end-of-backup wire protocol messages.
 *
 * This is a single result set carrying endptr and endtli, followed by the
 * CommandComplete message that SendXlogRecPtrResult() emits.
 */
static void
bbsink_copytblspc_end_backup(bbsink *sink, XLogRecPtr endptr,
TimeLineID endtli)
{
SendXlogRecPtrResult(endptr, endtli);
}
/*
 * Cleanup.
 *
 * This sink has nothing to release here; note that the buffer allocated in
 * the begin_backup callback is not explicitly freed by this function.
 */
static void
bbsink_copytblspc_cleanup(bbsink *sink)
{
/* Nothing to do. */
}
/*
 * Send a CopyOutResponse message.
 *
 * The message has type byte 'H', an overall format code of 0 and a column
 * count of zero, so no per-column format codes follow.
 */
static void
SendCopyOutResponse(void)
{
StringInfoData buf;
pq_beginmessage(&buf, 'H');
pq_sendbyte(&buf, 0); /* overall format */
pq_sendint16(&buf, 0); /* natts */
pq_endmessage(&buf);
}
/*
 * Send a CopyData message (type byte 'd') whose payload is the 'len' bytes
 * starting at 'data'.
 */
static void
SendCopyData(const char *data, size_t len)
{
pq_putmessage('d', data, len);
}
/*
 * Send a CopyDone message (type byte 'c', no payload).
 */
static void
SendCopyDone(void)
{
pq_putemptymessage('c');
}
/*
 * Send a single result set containing one row with two columns, both in
 * text format: "recptr" (the XLogRecPtr, formatted as %X/%X) and "tli" (the
 * timeline ID, formatted as an unsigned decimal).
 *
 * Three messages are emitted, in this order: RowDescription ('T'),
 * DataRow ('D') and CommandComplete ('C').
 */
static void
SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
{
StringInfoData buf;
char str[MAXFNAMELEN];
Size len;
pq_beginmessage(&buf, 'T'); /* RowDescription */
pq_sendint16(&buf, 2); /* 2 fields */
/* Field headers */
pq_sendstring(&buf, "recptr");
pq_sendint32(&buf, 0); /* table oid */
pq_sendint16(&buf, 0); /* attnum */
pq_sendint32(&buf, TEXTOID); /* type oid */
pq_sendint16(&buf, -1); /* typlen */
pq_sendint32(&buf, 0); /* typmod */
pq_sendint16(&buf, 0); /* format code */
pq_sendstring(&buf, "tli");
pq_sendint32(&buf, 0); /* table oid */
pq_sendint16(&buf, 0); /* attnum */
/*
 * int8 may seem like a surprising data type for this, but in theory int4
 * would not be wide enough for this, as TimeLineID is unsigned.
 */
pq_sendint32(&buf, INT8OID); /* type oid */
pq_sendint16(&buf, -1); /* typlen */
pq_sendint32(&buf, 0); /* typmod */
pq_sendint16(&buf, 0); /* format code */
pq_endmessage(&buf);
/* Data row */
pq_beginmessage(&buf, 'D');
pq_sendint16(&buf, 2); /* number of columns */
/* Each column value is sent as a 4-byte length followed by that many bytes. */
len = snprintf(str, sizeof(str),
"%X/%X", LSN_FORMAT_ARGS(ptr));
pq_sendint32(&buf, len);
pq_sendbytes(&buf, str, len);
len = snprintf(str, sizeof(str), "%u", tli);
pq_sendint32(&buf, len);
pq_sendbytes(&buf, str, len);
pq_endmessage(&buf);
/* Send a CommandComplete message */
pq_puttextmessage('C', "SELECT");
}
/*
 * Send a result set via libpq describing the tablespace list.
 *
 * A RowDescription message announces three columns (spcoid, spclocation and
 * size); it is followed by one DataRow message per element of 'tablespaces'.
 * No CommandComplete message is sent here; that is left to the caller.
 *
 * For an entry whose path is NULL, both spcoid and spclocation are sent as
 * NULL.  The size column carries the entry's size divided by 1024, or NULL
 * when the recorded size is negative.
 */
static void
SendTablespaceList(List *tablespaces)
{
	StringInfoData msg;
	ListCell   *cell;

	/* RowDescription: three columns follow. */
	pq_beginmessage(&msg, 'T');
	pq_sendint16(&msg, 3);

	/* Column 1: spcoid */
	pq_sendstring(&msg, "spcoid");
	pq_sendint32(&msg, 0);		/* table oid */
	pq_sendint16(&msg, 0);		/* attnum */
	pq_sendint32(&msg, OIDOID); /* type oid */
	pq_sendint16(&msg, 4);		/* typlen */
	pq_sendint32(&msg, 0);		/* typmod */
	pq_sendint16(&msg, 0);		/* format code */

	/* Column 2: spclocation */
	pq_sendstring(&msg, "spclocation");
	pq_sendint32(&msg, 0);		/* table oid */
	pq_sendint16(&msg, 0);		/* attnum */
	pq_sendint32(&msg, TEXTOID);	/* type oid */
	pq_sendint16(&msg, -1);		/* typlen */
	pq_sendint32(&msg, 0);		/* typmod */
	pq_sendint16(&msg, 0);		/* format code */

	/* Column 3: size */
	pq_sendstring(&msg, "size");
	pq_sendint32(&msg, 0);		/* table oid */
	pq_sendint16(&msg, 0);		/* attnum */
	pq_sendint32(&msg, INT8OID);	/* type oid */
	pq_sendint16(&msg, 8);		/* typlen */
	pq_sendint32(&msg, 0);		/* typmod */
	pq_sendint16(&msg, 0);		/* format code */
	pq_endmessage(&msg);

	/* One DataRow per tablespace. */
	foreach(cell, tablespaces)
	{
		tablespaceinfo *ts = lfirst(cell);

		pq_beginmessage(&msg, 'D');
		pq_sendint16(&msg, 3);	/* number of columns */

		if (ts->path != NULL)
		{
			Size		oidlen = strlen(ts->oid);
			Size		pathlen = strlen(ts->path);

			pq_sendint32(&msg, oidlen);
			pq_sendbytes(&msg, ts->oid, oidlen);
			pq_sendint32(&msg, pathlen);
			pq_sendbytes(&msg, ts->path, pathlen);
		}
		else
		{
			/* A length of -1 denotes NULL; applies to both columns. */
			pq_sendint32(&msg, -1);
			pq_sendint32(&msg, -1);
		}

		if (ts->size < 0)
			pq_sendint32(&msg, -1); /* NULL */
		else
			send_int8_string(&msg, ts->size / 1024);

		pq_endmessage(&msg);
	}
}
/*
 * Send a 64-bit integer as a string via the wire protocol.
 *
 * The value is rendered in decimal and appended to 'buf' as a 4-byte length
 * followed by that many bytes of text (no terminating NUL is sent).
 */
static void
send_int8_string(StringInfoData *buf, int64 intval)
{
	char		text[32];
	int			textlen;

	/* 32 bytes is ample for any int64, including sign and terminator. */
	textlen = snprintf(text, sizeof(text), INT64_FORMAT, intval);

	pq_sendint32(buf, textlen);
	pq_sendbytes(buf, text, textlen);
}

View File

@@ -0,0 +1,246 @@
/*-------------------------------------------------------------------------
*
* basebackup_progress.c
* Basebackup sink implementing progress tracking, including but not
* limited to command progress reporting.
*
* This should be used even if the PROGRESS option to the replication
* command BASE_BACKUP is not specified. Without that option, we won't
* have tallied up the size of the files that are going to need to be
* backed up, but we can still report to the command progress reporting
* facility how much data we've processed.
*
* Moreover, we also use this as a convenient place to update certain
* fields of the bbsink_state. That work is accurately described as
* keeping track of our progress, but it's not just for introspection.
* We need those fields to be updated properly in order for base backups
* to work.
*
* This particular basebackup sink requires extra callbacks that most base
* backup sinks don't. Rather than cramming those into the interface, we just
* have a few extra functions here that basebackup.c can call. (We could put
* the logic directly into that file as it's fairly simple, but it seems
* cleaner to have everything related to progress reporting in one place.)
*
* Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/basebackup_progress.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "commands/progress.h"
#include "miscadmin.h"
#include "replication/basebackup.h"
#include "replication/basebackup_sink.h"
#include "pgstat.h"
#include "storage/latch.h"
#include "utils/timestamp.h"
/* Callbacks this sink implements itself; see bbsink_progress_ops below. */
static void bbsink_progress_begin_backup(bbsink *sink);
static void bbsink_progress_archive_contents(bbsink *sink, size_t len);
static void bbsink_progress_end_archive(bbsink *sink);
/*
 * Callback table for the 'progress' sink.
 *
 * Only begin_backup, archive_contents and end_archive do progress-related
 * work; all other callbacks are passed straight to the successor sink via
 * the generic bbsink_forward_* implementations.
 */
const bbsink_ops bbsink_progress_ops = {
.begin_backup = bbsink_progress_begin_backup,
.begin_archive = bbsink_forward_begin_archive,
.archive_contents = bbsink_progress_archive_contents,
.end_archive = bbsink_progress_end_archive,
.begin_manifest = bbsink_forward_begin_manifest,
.manifest_contents = bbsink_forward_manifest_contents,
.end_manifest = bbsink_forward_end_manifest,
.end_backup = bbsink_forward_end_backup,
.cleanup = bbsink_forward_cleanup
};
/*
 * Create a new basebackup sink that performs progress tracking functions and
 * forwards data to a successor sink.
 *
 * 'next' is the successor sink and must not be NULL.
 *
 * NOTE(review): 'estimate_backup_size' is not referenced anywhere in this
 * function's body; the total size is unconditionally initialized to -1.
 *
 * As a side effect, command progress reporting for a base backup is started
 * here, at construction time, not in the begin_backup callback.
 */
bbsink *
bbsink_progress_new(bbsink *next, bool estimate_backup_size)
{
bbsink *sink;
Assert(next != NULL);
sink = palloc0(sizeof(bbsink));
*((const bbsink_ops **) &sink->bbs_ops) = &bbsink_progress_ops;
sink->bbs_next = next;
/*
 * Report that a base backup is in progress, and set the total size of the
 * backup to -1, which will get translated to NULL. If we're estimating
 * the backup size, we'll insert the real estimate when we have it.
 */
pgstat_progress_start_command(PROGRESS_COMMAND_BASEBACKUP, InvalidOid);
pgstat_progress_update_param(PROGRESS_BASEBACKUP_BACKUP_TOTAL, -1);
return sink;
}
/*
 * Progress reporting at start of backup.
 *
 * Publishes, in one multi-parameter update, that the backup has entered the
 * streaming phase, the estimated total size (or -1 when no valid estimate
 * exists) and the number of tablespaces.  The callback is then passed on to
 * the successor sink.
 */
static void
bbsink_progress_begin_backup(bbsink *sink)
{
	bbsink_state *state = sink->bbs_state;
	const int	params[] = {
		PROGRESS_BASEBACKUP_PHASE,
		PROGRESS_BASEBACKUP_BACKUP_TOTAL,
		PROGRESS_BASEBACKUP_TBLSPC_TOTAL
	};
	int64		values[3];

	/* We are now streaming database files as a base backup. */
	values[0] = PROGRESS_BASEBACKUP_PHASE_STREAM_BACKUP;

	/* Total size: -1 unless a valid estimate is available. */
	values[1] = -1;
	if (state->bytes_total_is_valid)
		values[1] = state->bytes_total;

	/* Number of tablespaces that will be streamed. */
	values[2] = list_length(state->tablespaces);

	pgstat_progress_update_multi_param(3, params, values);

	/* Hand the callback to the next sink in the chain. */
	bbsink_forward_begin_backup(sink);
}
/*
 * End-of-archive progress reporting.
 *
 * Reports one more tablespace as streamed (unless that would exceed the
 * total), forwards the callback to the successor sink, and finally advances
 * the shared bbsink_state's current-tablespace counter.
 */
static void
bbsink_progress_end_archive(bbsink *sink)
{
	bbsink_state *state = sink->bbs_state;

	/*
	 * One archive is expected per tablespace, so finishing an archive means
	 * finishing a tablespace.  (Some day we might have a reason to decouple
	 * these concepts.)
	 *
	 * When WAL is included in the backup, the last tablespace is marked
	 * complete before the last archive ends; the guard keeps the reported
	 * count from exceeding the number of tablespaces.
	 */
	if (state->tablespace_num < list_length(state->tablespaces))
		pgstat_progress_update_param(PROGRESS_BASEBACKUP_TBLSPC_STREAMED,
									 state->tablespace_num + 1);

	/* Hand the callback to the next sink in the chain. */
	bbsink_forward_end_archive(sink);

	/*
	 * Advance the notion of the current tablespace.  The bbsink_state object
	 * is shared across all bbsink objects involved, but we're the outermost
	 * one and this is the very last thing we do.
	 */
	state->tablespace_num += 1;
}
/*
 * Handle progress tracking for new archive contents.
 *
 * Adds 'len' to the running count of bytes done in the shared bbsink_state,
 * forwards the data to the successor sink, and then updates the progress
 * report for pg_stat_progress_basebackup.
 */
static void
bbsink_progress_archive_contents(bbsink *sink, size_t len)
{
	bbsink_state *state = sink->bbs_state;
	const int	params[] = {
		PROGRESS_BASEBACKUP_BACKUP_STREAMED,
		PROGRESS_BASEBACKUP_BACKUP_TOTAL
	};
	int64		values[2];
	int			nvalues = 1;

	/* Account for the new bytes before passing them down the chain. */
	state->bytes_done += len;
	bbsink_forward_archive_contents(sink, len);

	/* The number of bytes streamed is always reported. */
	values[0] = state->bytes_done;

	/*
	 * If we have gone past a valid estimate, raise the reported total to
	 * match, so that the done column never exceeds the total.  This can make
	 * the total change as the backup nears its end (the estimate will always
	 * be wrong if WAL is included), but that's better than reporting more
	 * than 100%.
	 */
	if (state->bytes_total_is_valid && state->bytes_done > state->bytes_total)
	{
		values[1] = state->bytes_done;
		nvalues = 2;
	}

	pgstat_progress_update_multi_param(nvalues, params, values);
}
/*
 * Advertise that we are waiting for the start-of-backup checkpoint, by
 * setting the reported phase to PROGRESS_BASEBACKUP_PHASE_WAIT_CHECKPOINT.
 */
void
basebackup_progress_wait_checkpoint(void)
{
pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
PROGRESS_BASEBACKUP_PHASE_WAIT_CHECKPOINT);
}
/*
 * Advertise that we are estimating the backup size, by setting the reported
 * phase to PROGRESS_BASEBACKUP_PHASE_ESTIMATE_BACKUP_SIZE.  This only updates
 * the phase; it does not compute or report the estimate itself.
 */
void
basebackup_progress_estimate_backup_size(void)
{
pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
PROGRESS_BASEBACKUP_PHASE_ESTIMATE_BACKUP_SIZE);
}
/*
 * Advertise that we are waiting for WAL archiving at end-of-backup.
 *
 * Besides switching the reported phase, this reports every tablespace in
 * state->tablespaces as streamed.
 */
void
basebackup_progress_wait_wal_archive(bbsink_state *state)
{
	const int	params[] = {
		PROGRESS_BASEBACKUP_PHASE,
		PROGRESS_BASEBACKUP_TBLSPC_STREAMED
	};

	/*
	 * All tablespaces are reported as finished at this point, even if the
	 * archive for the main tablespace is still open, because what's going to
	 * be added is WAL files, not files that are really from the main
	 * tablespace.
	 */
	int64		values[] = {
		PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE,
		list_length(state->tablespaces)
	};

	pgstat_progress_update_multi_param(2, params, values);
}
/*
 * Advertise that we are transferring WAL files into the final archive, by
 * setting the reported phase to PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL.
 */
void
basebackup_progress_transfer_wal(void)
{
pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL);
}
/*
 * Advertise that we are no longer performing a backup.
 *
 * This ends the command progress reporting that bbsink_progress_new()
 * started with pgstat_progress_start_command().
 */
void
basebackup_progress_done(void)
{
pgstat_progress_end_command();
}

View File

@@ -0,0 +1,125 @@
/*-------------------------------------------------------------------------
*
* basebackup_sink.c
* Default implementations for bbsink (basebackup sink) callbacks.
*
* Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
*
* src/backend/replication/basebackup_sink.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "replication/basebackup_sink.h"
/*
 * Forward begin_backup callback.
 *
 * Only use this implementation if you want the bbsink you're implementing to
 * share a buffer with the successor bbsink.
 *
 * The successor is started with this sink's state and buffer length, after
 * which this sink's bbs_buffer is pointed at the successor's buffer.
 */
void
bbsink_forward_begin_backup(bbsink *sink)
{
Assert(sink->bbs_next != NULL);
Assert(sink->bbs_state != NULL);
bbsink_begin_backup(sink->bbs_next, sink->bbs_state,
sink->bbs_buffer_length);
/* Share the successor's buffer rather than allocating our own. */
sink->bbs_buffer = sink->bbs_next->bbs_buffer;
}
/*
 * Forward begin_archive callback: pass archive_name unchanged to the
 * successor sink, which must exist.
 */
void
bbsink_forward_begin_archive(bbsink *sink, const char *archive_name)
{
Assert(sink->bbs_next != NULL);
bbsink_begin_archive(sink->bbs_next, archive_name);
}
/*
 * Forward archive_contents callback.
 *
 * Code that wants to use this should initialize its own bbs_buffer and
 * bbs_buffer_length fields to the values from the successor sink. In cases
 * where the buffer isn't shared, the data needs to be copied before forwarding
 * the callback. We don't try to do that here, because there's really no
 * reason to have separately allocated buffers containing the same identical
 * data.
 *
 * 'sink' is the forwarding sink itself, not its successor; the successor is
 * looked up here through sink->bbs_next.
 */
void
bbsink_forward_archive_contents(bbsink *sink, size_t len)
{
Assert(sink->bbs_next != NULL);
/* Verify the buffer-sharing precondition described above. */
Assert(sink->bbs_buffer == sink->bbs_next->bbs_buffer);
Assert(sink->bbs_buffer_length == sink->bbs_next->bbs_buffer_length);
bbsink_archive_contents(sink->bbs_next, len);
}
/*
 * Forward end_archive callback to the successor sink, which must exist.
 */
void
bbsink_forward_end_archive(bbsink *sink)
{
Assert(sink->bbs_next != NULL);
bbsink_end_archive(sink->bbs_next);
}
/*
 * Forward begin_manifest callback to the successor sink, which must exist.
 */
void
bbsink_forward_begin_manifest(bbsink *sink)
{
Assert(sink->bbs_next != NULL);
bbsink_begin_manifest(sink->bbs_next);
}
/*
 * Forward manifest_contents callback.
 *
 * As with the archive_contents callback, it's expected that the buffer is
 * shared.
 *
 * 'sink' is the forwarding sink itself, not its successor; the successor is
 * looked up here through sink->bbs_next.
 */
void
bbsink_forward_manifest_contents(bbsink *sink, size_t len)
{
Assert(sink->bbs_next != NULL);
/* Verify that this sink and its successor share one buffer. */
Assert(sink->bbs_buffer == sink->bbs_next->bbs_buffer);
Assert(sink->bbs_buffer_length == sink->bbs_next->bbs_buffer_length);
bbsink_manifest_contents(sink->bbs_next, len);
}
/*
 * Forward end_manifest callback to the successor sink, which must exist.
 */
void
bbsink_forward_end_manifest(bbsink *sink)
{
Assert(sink->bbs_next != NULL);
bbsink_end_manifest(sink->bbs_next);
}
/*
 * Forward end_backup callback: pass endptr and endtli unchanged to the
 * successor sink, which must exist.
 */
void
bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
{
Assert(sink->bbs_next != NULL);
bbsink_end_backup(sink->bbs_next, endptr, endtli);
}
/*
 * Forward cleanup callback to the successor sink, which must exist.
 */
void
bbsink_forward_cleanup(bbsink *sink)
{
Assert(sink->bbs_next != NULL);
bbsink_cleanup(sink->bbs_next);
}

View File

@@ -0,0 +1,199 @@
/*-------------------------------------------------------------------------
*
* basebackup_throttle.c
* Basebackup sink implementing throttling. Data is forwarded to the
* next base backup sink in the chain at a rate no greater than the
* configured maximum.
*
* Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/basebackup_throttle.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "replication/basebackup_sink.h"
#include "pgstat.h"
#include "storage/latch.h"
#include "utils/timestamp.h"
/*
 * A throttling sink: the generic bbsink followed by rate-limiting state.
 *
 * 'base' has to remain the first member, because the callbacks in this file
 * cast the bbsink pointer they are given to bbsink_throttle *.
 */
typedef struct bbsink_throttle
{
/* Common information for all types of sink. */
bbsink base;
/*
 * The actual number of bytes, transfer of which may cause sleep. Computed
 * in bbsink_throttle_new() as maxrate * 1024 / THROTTLING_FREQUENCY.
 */
uint64 throttling_sample;
/* Amount of data already transferred but not yet throttled. */
int64 throttling_counter;
/*
 * The minimum time required to transfer throttling_sample bytes, in
 * microseconds (USECS_PER_SEC / THROTTLING_FREQUENCY).
 */
TimeOffset elapsed_min_unit;
/* The last check of the transfer rate. */
TimestampTz throttled_last;
} bbsink_throttle;
/* Callbacks this sink implements itself; see bbsink_throttle_ops below. */
static void bbsink_throttle_begin_backup(bbsink *sink);
static void bbsink_throttle_archive_contents(bbsink *sink, size_t len);
static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len);
/* Shared rate-limiting routine used by both *_contents callbacks. */
static void throttle(bbsink_throttle *sink, size_t increment);
/*
 * Callback table for the 'throttle' sink.
 *
 * Only begin_backup and the two *_contents callbacks are implemented here;
 * all other callbacks are passed straight to the successor sink via the
 * generic bbsink_forward_* implementations.
 */
const bbsink_ops bbsink_throttle_ops = {
.begin_backup = bbsink_throttle_begin_backup,
.begin_archive = bbsink_forward_begin_archive,
.archive_contents = bbsink_throttle_archive_contents,
.end_archive = bbsink_forward_end_archive,
.begin_manifest = bbsink_forward_begin_manifest,
.manifest_contents = bbsink_throttle_manifest_contents,
.end_manifest = bbsink_forward_end_manifest,
.end_backup = bbsink_forward_end_backup,
.cleanup = bbsink_forward_cleanup
};
/*
 * How frequently to throttle, as a fraction of the specified rate-second.
 *
 * With a value of 8, bbsink_throttle_new() sets the sample size to one
 * eighth of the per-second byte budget and the minimum time per sample to
 * one eighth of a second.
 */
#define THROTTLING_FREQUENCY 8
/*
 * Create a new basebackup sink that performs throttling and forwards data
 * to a successor sink.
 *
 * 'next' is the successor sink and must not be NULL.  'maxrate' is the
 * transfer rate limit and must be greater than zero; it is multiplied by
 * 1024 here, so it is presumably expressed in kilobytes per second --
 * confirm against the caller.
 *
 * Returns a pointer to the generic bbsink embedded in the new object.
 */
bbsink *
bbsink_throttle_new(bbsink *next, uint32 maxrate)
{
	bbsink_throttle *tsink;

	Assert(next != NULL);
	Assert(maxrate > 0);

	tsink = palloc0(sizeof(bbsink_throttle));
	tsink->base.bbs_next = next;
	*((const bbsink_ops **) &tsink->base.bbs_ops) = &bbsink_throttle_ops;

	/*
	 * A throttling decision is made once per "sample": the number of bytes
	 * allowed in 1/THROTTLING_FREQUENCY of a second, paired with that time
	 * span as the minimum time in which a sample may be transferred.
	 */
	tsink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
	tsink->throttling_sample =
		(int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;

	return &tsink->base;
}
/*
 * There's no real work to do here, but we need to record the current time so
 * that it can be used for future calculations.
 *
 * The callback is forwarded to the successor sink first, so the timestamp
 * is taken only after the successor's begin_backup processing has finished.
 */
static void
bbsink_throttle_begin_backup(bbsink *sink)
{
bbsink_throttle *mysink = (bbsink_throttle *) sink;
bbsink_forward_begin_backup(sink);
/* The 'real data' starts now (header was ignored). */
mysink->throttled_last = GetCurrentTimestamp();
}
/*
 * First throttle, and then pass archive contents to next sink.
 *
 * Any sleep required by the rate limit therefore happens before the 'len'
 * bytes are handed to the successor.
 */
static void
bbsink_throttle_archive_contents(bbsink *sink, size_t len)
{
throttle((bbsink_throttle *) sink, len);
bbsink_forward_archive_contents(sink, len);
}
/*
 * First throttle, and then pass manifest contents to next sink.
 *
 * Any sleep required by the rate limit happens before the 'len' bytes are
 * handed to the successor.
 */
static void
bbsink_throttle_manifest_contents(bbsink *sink, size_t len)
{
	throttle((bbsink_throttle *) sink, len);

	/*
	 * Pass our own sink, not sink->bbs_next: the forwarding function itself
	 * looks up the successor through its argument's bbs_next.  Passing the
	 * successor would skip it entirely and, when the successor is the last
	 * sink in the chain, trip over its NULL bbs_next.
	 */
	bbsink_forward_manifest_contents(sink, len);
}
/*
 * Increment the network transfer counter by the given number of bytes,
 * and sleep if necessary to comply with the requested network transfer
 * rate.
 *
 * Nothing happens until at least throttling_sample bytes have accumulated.
 * Once they have, we sleep until the minimum time for the accumulated whole
 * samples has passed since throttled_last, then keep the remainder of the
 * counter and restart the clock.
 */
static void
throttle(bbsink_throttle *sink, size_t increment)
{
TimeOffset elapsed_min;
Assert(sink->throttling_counter >= 0);
sink->throttling_counter += increment;
/* Less than one full sample pending: no throttling decision yet. */
if (sink->throttling_counter < sink->throttling_sample)
return;
/* How much time should have elapsed at minimum? */
elapsed_min = sink->elapsed_min_unit *
(sink->throttling_counter / sink->throttling_sample);
/*
 * Since the latch could be set repeatedly because of concurrent WAL
 * activity, sleep in a loop to ensure enough time has passed.
 */
for (;;)
{
TimeOffset elapsed,
sleep;
int wait_result;
/* Time elapsed since the last measurement (and possible wake up). */
elapsed = GetCurrentTimestamp() - sink->throttled_last;
/* sleep if the transfer is faster than it should be */
sleep = elapsed_min - elapsed;
if (sleep <= 0)
break;
ResetLatch(MyLatch);
/* We're eating a potentially set latch, so check for interrupts */
CHECK_FOR_INTERRUPTS();
/*
 * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
 * the maximum time to sleep. Thus the cast to long is safe.
 *
 * 'sleep' is in microseconds (elapsed_min_unit derives from
 * USECS_PER_SEC); dividing by 1000 converts it to milliseconds,
 * presumably the unit WaitLatch expects for its timeout -- confirm
 * against storage/latch.h.
 */
wait_result = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
(long) (sleep / 1000),
WAIT_EVENT_BASE_BACKUP_THROTTLE);
if (wait_result & WL_LATCH_SET)
CHECK_FOR_INTERRUPTS();
/* Done waiting? */
if (wait_result & WL_TIMEOUT)
break;
}
/*
 * As we work with integers, only whole multiple of throttling_sample was
 * processed. The rest will be done during the next call of this function.
 */
sink->throttling_counter %= sink->throttling_sample;
/*
 * Time interval for the remaining amount and possible next increments
 * starts now.
 */
sink->throttled_last = GetCurrentTimestamp();
}