mirror of
https://github.com/postgres/postgres.git
synced 2025-09-02 04:21:28 +03:00
Make WAL segment size configurable at initdb time.
For performance reasons a larger segment size than the default 16MB can be useful. A larger segment size has two main benefits: firstly, in setups using archiving, it makes it easier to write scripts that can keep up with higher amounts of WAL; secondly, the WAL has to be written and synced to disk less frequently. But at the same time large segment sizes are disadvantageous for smaller databases. So far the segment size had to be configured at compile time, often making it unrealistic to choose one fitting a particular load. Therefore change it to an initdb time setting. This includes a breaking change to the xlogreader.h API, which now requires the current segment size to be configured. For that and similar reasons a number of binaries had to be taught how to recognize the current segment size. Author: Beena Emerson, editorialized by Andres Freund Reviewed-By: Andres Freund, David Steele, Kuntal Ghosh, Michael Paquier, Peter Eisentraut, Robert Haas, Tushar Ahuja Discussion: https://postgr.es/m/CAOG9ApEAcQ--1ieKbhFzXSQPw_YLmepaa4hNdnY5+ZULpt81Mw@mail.gmail.com
This commit is contained in:
@@ -26,6 +26,7 @@
|
||||
#include <zlib.h>
|
||||
#endif
|
||||
|
||||
#include "access/xlog_internal.h"
|
||||
#include "common/file_utils.h"
|
||||
#include "common/string.h"
|
||||
#include "fe_utils/string_utils.h"
|
||||
@@ -555,7 +556,7 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
|
||||
}
|
||||
param->startptr = ((uint64) hi) << 32 | lo;
|
||||
/* Round off to even segment position */
|
||||
param->startptr -= param->startptr % XLOG_SEG_SIZE;
|
||||
param->startptr -= XLogSegmentOffset(param->startptr, WalSegSz);
|
||||
|
||||
#ifndef WIN32
|
||||
/* Create our background pipe */
|
||||
@@ -2397,6 +2398,10 @@ main(int argc, char **argv)
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/* determine remote server's xlog segment size */
|
||||
if (!RetrieveWalSegSize(conn))
|
||||
disconnect_and_exit(1);
|
||||
|
||||
/* Create pg_wal symlink, if required */
|
||||
if (xlog_dir)
|
||||
{
|
||||
|
@@ -191,7 +191,7 @@ close_destination_dir(DIR *dest_dir, char *dest_folder)
|
||||
/*
|
||||
* Determine starting location for streaming, based on any existing xlog
|
||||
* segments in the directory. We start at the end of the last one that is
|
||||
* complete (size matches XLogSegSize), on the timeline with highest ID.
|
||||
* complete (size matches wal segment size), on the timeline with highest ID.
|
||||
*
|
||||
* If there are no WAL files in the directory, returns InvalidXLogRecPtr.
|
||||
*/
|
||||
@@ -242,7 +242,7 @@ FindStreamingStart(uint32 *tli)
|
||||
/*
|
||||
* Looks like an xlog file. Parse its position.
|
||||
*/
|
||||
XLogFromFileName(dirent->d_name, &tli, &segno);
|
||||
XLogFromFileName(dirent->d_name, &tli, &segno, WalSegSz);
|
||||
|
||||
/*
|
||||
* Check that the segment has the right size, if it's supposed to be
|
||||
@@ -267,7 +267,7 @@ FindStreamingStart(uint32 *tli)
|
||||
disconnect_and_exit(1);
|
||||
}
|
||||
|
||||
if (statbuf.st_size != XLOG_SEG_SIZE)
|
||||
if (statbuf.st_size != WalSegSz)
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: segment file \"%s\" has incorrect size %d, skipping\n"),
|
||||
@@ -308,7 +308,7 @@ FindStreamingStart(uint32 *tli)
|
||||
bytes_out = (buf[3] << 24) | (buf[2] << 16) |
|
||||
(buf[1] << 8) | buf[0];
|
||||
|
||||
if (bytes_out != XLOG_SEG_SIZE)
|
||||
if (bytes_out != WalSegSz)
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: compressed segment file \"%s\" has incorrect uncompressed size %d, skipping\n"),
|
||||
@@ -349,7 +349,7 @@ FindStreamingStart(uint32 *tli)
|
||||
if (!high_ispartial)
|
||||
high_segno++;
|
||||
|
||||
XLogSegNoOffsetToRecPtr(high_segno, 0, high_ptr);
|
||||
XLogSegNoOffsetToRecPtr(high_segno, 0, high_ptr, WalSegSz);
|
||||
|
||||
*tli = high_tli;
|
||||
return high_ptr;
|
||||
@@ -410,7 +410,7 @@ StreamLog(void)
|
||||
/*
|
||||
* Always start streaming at the beginning of a segment
|
||||
*/
|
||||
stream.startpos -= stream.startpos % XLOG_SEG_SIZE;
|
||||
stream.startpos -= XLogSegmentOffset(stream.startpos, WalSegSz);
|
||||
|
||||
/*
|
||||
* Start the replication
|
||||
@@ -689,6 +689,10 @@ main(int argc, char **argv)
|
||||
if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
|
||||
disconnect_and_exit(1);
|
||||
|
||||
/* determine remote server's xlog segment size */
|
||||
if (!RetrieveWalSegSize(conn))
|
||||
disconnect_and_exit(1);
|
||||
|
||||
/*
|
||||
* Check that there is a database associated with connection, none should
|
||||
* be defined in this context.
|
||||
|
@@ -95,17 +95,17 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
|
||||
ssize_t size;
|
||||
XLogSegNo segno;
|
||||
|
||||
XLByteToSeg(startpoint, segno);
|
||||
XLogFileName(current_walfile_name, stream->timeline, segno);
|
||||
XLByteToSeg(startpoint, segno, WalSegSz);
|
||||
XLogFileName(current_walfile_name, stream->timeline, segno, WalSegSz);
|
||||
|
||||
snprintf(fn, sizeof(fn), "%s%s", current_walfile_name,
|
||||
stream->partial_suffix ? stream->partial_suffix : "");
|
||||
|
||||
/*
|
||||
* When streaming to files, if an existing file exists we verify that it's
|
||||
* either empty (just created), or a complete XLogSegSize segment (in
|
||||
* which case it has been created and padded). Anything else indicates a
|
||||
* corrupt file.
|
||||
* either empty (just created), or a complete WalSegSz segment (in which
|
||||
* case it has been created and padded). Anything else indicates a corrupt
|
||||
* file.
|
||||
*
|
||||
* When streaming to tar, no file with this name will exist before, so we
|
||||
* never have to verify a size.
|
||||
@@ -120,7 +120,7 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
|
||||
progname, fn, stream->walmethod->getlasterror());
|
||||
return false;
|
||||
}
|
||||
if (size == XLogSegSize)
|
||||
if (size == WalSegSz)
|
||||
{
|
||||
/* Already padded file. Open it for use */
|
||||
f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0);
|
||||
@@ -154,7 +154,7 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
|
||||
ngettext("%s: write-ahead log file \"%s\" has %d byte, should be 0 or %d\n",
|
||||
"%s: write-ahead log file \"%s\" has %d bytes, should be 0 or %d\n",
|
||||
size),
|
||||
progname, fn, (int) size, XLogSegSize);
|
||||
progname, fn, (int) size, WalSegSz);
|
||||
return false;
|
||||
}
|
||||
/* File existed and was empty, so fall through and open */
|
||||
@@ -162,7 +162,8 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
|
||||
|
||||
/* No file existed, so create one */
|
||||
|
||||
f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, XLogSegSize);
|
||||
f = stream->walmethod->open_for_write(current_walfile_name,
|
||||
stream->partial_suffix, WalSegSz);
|
||||
if (f == NULL)
|
||||
{
|
||||
fprintf(stderr,
|
||||
@@ -203,7 +204,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
|
||||
|
||||
if (stream->partial_suffix)
|
||||
{
|
||||
if (currpos == XLOG_SEG_SIZE)
|
||||
if (currpos == WalSegSz)
|
||||
r = stream->walmethod->close(walfile, CLOSE_NORMAL);
|
||||
else
|
||||
{
|
||||
@@ -231,7 +232,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
|
||||
* new node. This is in line with walreceiver.c always doing a
|
||||
* XLogArchiveForceDone() after a complete segment.
|
||||
*/
|
||||
if (currpos == XLOG_SEG_SIZE && stream->mark_done)
|
||||
if (currpos == WalSegSz && stream->mark_done)
|
||||
{
|
||||
/* writes error message if failed */
|
||||
if (!mark_file_as_archived(stream, current_walfile_name))
|
||||
@@ -676,7 +677,8 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
|
||||
* start streaming at the beginning of a segment.
|
||||
*/
|
||||
stream->timeline = newtimeline;
|
||||
stream->startpos = stream->startpos - (stream->startpos % XLOG_SEG_SIZE);
|
||||
stream->startpos = stream->startpos -
|
||||
XLogSegmentOffset(stream->startpos, WalSegSz);
|
||||
continue;
|
||||
}
|
||||
else if (PQresultStatus(res) == PGRES_COMMAND_OK)
|
||||
@@ -1111,7 +1113,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
|
||||
*blockpos = fe_recvint64(&copybuf[1]);
|
||||
|
||||
/* Extract WAL location for this block */
|
||||
xlogoff = *blockpos % XLOG_SEG_SIZE;
|
||||
xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
|
||||
|
||||
/*
|
||||
* Verify that the initial location in the stream matches where we think
|
||||
@@ -1148,11 +1150,11 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
|
||||
int bytes_to_write;
|
||||
|
||||
/*
|
||||
* If crossing a WAL boundary, only write up until we reach
|
||||
* XLOG_SEG_SIZE.
|
||||
* If crossing a WAL boundary, only write up until we reach wal
|
||||
* segment size.
|
||||
*/
|
||||
if (xlogoff + bytes_left > XLOG_SEG_SIZE)
|
||||
bytes_to_write = XLOG_SEG_SIZE - xlogoff;
|
||||
if (xlogoff + bytes_left > WalSegSz)
|
||||
bytes_to_write = WalSegSz - xlogoff;
|
||||
else
|
||||
bytes_to_write = bytes_left;
|
||||
|
||||
@@ -1182,7 +1184,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
|
||||
xlogoff += bytes_to_write;
|
||||
|
||||
/* Did we reach the end of a WAL segment? */
|
||||
if (*blockpos % XLOG_SEG_SIZE == 0)
|
||||
if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
|
||||
{
|
||||
if (!close_walfile(stream, *blockpos))
|
||||
/* Error message written in close_walfile() */
|
||||
|
@@ -25,12 +25,18 @@
|
||||
#include "receivelog.h"
|
||||
#include "streamutil.h"
|
||||
|
||||
#include "access/xlog_internal.h"
|
||||
#include "pqexpbuffer.h"
|
||||
#include "common/fe_memutils.h"
|
||||
#include "datatype/timestamp.h"
|
||||
|
||||
#define ERRCODE_DUPLICATE_OBJECT "42710"
|
||||
|
||||
uint32 WalSegSz;
|
||||
|
||||
/* SHOW command for replication connection was introduced in version 10 */
|
||||
#define MINIMUM_VERSION_FOR_SHOW_CMD 100000
|
||||
|
||||
const char *progname;
|
||||
char *connection_string = NULL;
|
||||
char *dbhost = NULL;
|
||||
@@ -231,6 +237,76 @@ GetConnection(void)
|
||||
return tmpconn;
|
||||
}
|
||||
|
||||
/*
|
||||
* From version 10, explicitly set wal segment size using SHOW wal_segment_size
|
||||
* since ControlFile is not accessible here.
|
||||
*/
|
||||
bool
|
||||
RetrieveWalSegSize(PGconn *conn)
|
||||
{
|
||||
PGresult *res;
|
||||
char xlog_unit[3];
|
||||
int xlog_val,
|
||||
multiplier = 1;
|
||||
|
||||
/* check connection existence */
|
||||
Assert(conn != NULL);
|
||||
|
||||
/* for previous versions set the default xlog seg size */
|
||||
if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_SHOW_CMD)
|
||||
{
|
||||
WalSegSz = DEFAULT_XLOG_SEG_SIZE;
|
||||
return true;
|
||||
}
|
||||
|
||||
res = PQexec(conn, "SHOW wal_segment_size");
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
{
|
||||
fprintf(stderr, _("%s: could not send replication command \"%s\": %s\n"),
|
||||
progname, "SHOW wal_segment_size", PQerrorMessage(conn));
|
||||
|
||||
PQclear(res);
|
||||
return false;
|
||||
}
|
||||
if (PQntuples(res) != 1 || PQnfields(res) < 1)
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: could not fetch WAL segment size: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
|
||||
progname, PQntuples(res), PQnfields(res), 1, 1);
|
||||
|
||||
PQclear(res);
|
||||
return false;
|
||||
}
|
||||
|
||||
/* fetch xlog value and unit from the result */
|
||||
if (sscanf(PQgetvalue(res, 0, 0), "%d%s", &xlog_val, xlog_unit) != 2)
|
||||
{
|
||||
fprintf(stderr, _("%s: WAL segment size could not be parsed\n"),
|
||||
progname);
|
||||
return false;
|
||||
}
|
||||
|
||||
/* set the multiplier based on unit to convert xlog_val to bytes */
|
||||
if (strcmp(xlog_unit, "MB") == 0)
|
||||
multiplier = 1024 * 1024;
|
||||
else if (strcmp(xlog_unit, "GB") == 0)
|
||||
multiplier = 1024 * 1024 * 1024;
|
||||
|
||||
/* convert and set WalSegSz */
|
||||
WalSegSz = xlog_val * multiplier;
|
||||
|
||||
if (!IsValidWalSegSize(WalSegSz))
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: WAL segment size must be a power of two between 1MB and 1GB, but the remote server reported a value of %d bytes\n"),
|
||||
progname, WalSegSz);
|
||||
return false;
|
||||
}
|
||||
|
||||
PQclear(res);
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Run IDENTIFY_SYSTEM through a given connection and give back to caller
|
||||
* some result information if requested:
|
||||
|
@@ -24,6 +24,7 @@ extern char *dbuser;
|
||||
extern char *dbport;
|
||||
extern char *dbname;
|
||||
extern int dbgetpassword;
|
||||
extern uint32 WalSegSz;
|
||||
|
||||
/* Connection kept global so we can disconnect easily */
|
||||
extern PGconn *conn;
|
||||
@@ -39,6 +40,7 @@ extern bool RunIdentifySystem(PGconn *conn, char **sysid,
|
||||
TimeLineID *starttli,
|
||||
XLogRecPtr *startpos,
|
||||
char **db_name);
|
||||
extern bool RetrieveWalSegSize(PGconn *conn);
|
||||
extern TimestampTz feGetCurrentTimestamp(void);
|
||||
extern void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
|
||||
long *secs, int *microsecs);
|
||||
|
Reference in New Issue
Block a user