diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index ab9692a6d56..94852877a2d 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -372,10 +372,20 @@ typedef struct static int LogStreamerMain(logstreamer_param *param) { - if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, - param->sysidentifier, param->xlogdir, - reached_end_position, standby_message_timeout, - NULL, false, true)) + StreamCtl stream; + + MemSet(&stream, sizeof(stream), 0); + stream.startpos = param->startptr; + stream.timeline = param->timeline; + stream.sysidentifier = param->sysidentifier; + stream.stream_stop = reached_end_position; + stream.standby_message_timeout = standby_message_timeout; + stream.synchronous = false; + stream.mark_done = true; + stream.basedir = param->xlogdir; + stream.partial_suffix = NULL; + + if (!ReceiveXlogStream(param->bgconn, &stream)) /* * Any errors will already have been reported in the function process, diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c index f96b547b0f3..7f7ee9dc9ba 100644 --- a/src/bin/pg_basebackup/pg_receivexlog.c +++ b/src/bin/pg_basebackup/pg_receivexlog.c @@ -276,10 +276,11 @@ FindStreamingStart(uint32 *tli) static void StreamLog(void) { - XLogRecPtr startpos, - serverpos; - TimeLineID starttli, - servertli; + XLogRecPtr serverpos; + TimeLineID servertli; + StreamCtl stream; + + MemSet(&stream, 0, sizeof(stream)); /* * Connect in replication mode to the server @@ -311,17 +312,17 @@ StreamLog(void) /* * Figure out where to start streaming. */ - startpos = FindStreamingStart(&starttli); - if (startpos == InvalidXLogRecPtr) + stream.startpos = FindStreamingStart(&stream.timeline); + if (stream.startpos == InvalidXLogRecPtr) { - startpos = serverpos; - starttli = servertli; + stream.startpos = serverpos; + stream.timeline = servertli; } /* * Always start streaming at the beginning of a segment */ - startpos -= startpos % XLOG_SEG_SIZE; + stream.startpos -= stream.startpos % XLOG_SEG_SIZE; /* * Start the replication @@ -329,12 +330,17 @@ StreamLog(void) if (verbose) fprintf(stderr, _("%s: starting log streaming at %X/%X (timeline %u)\n"), - progname, (uint32) (startpos >> 32), (uint32) startpos, - starttli); + progname, (uint32) (stream.startpos >> 32), (uint32) stream.startpos, + stream.timeline); - ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, - stop_streaming, standby_message_timeout, ".partial", - synchronous, false); + stream.stream_stop = stop_streaming; + stream.standby_message_timeout = standby_message_timeout; + stream.synchronous = synchronous; + stream.mark_done = false; + stream.basedir = basedir; + stream.partial_suffix = ".partial"; + + ReceiveXlogStream(conn, &stream); PQfinish(conn); conn = NULL; diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 01c42fc0639..595213f0420 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -33,27 +33,18 @@ static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; static bool still_sending = true; /* feedback still needs to be sent? */ -static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, - uint32 timeline, char *basedir, - stream_stop_callback stream_stop, int standby_message_timeout, - char *partial_suffix, XLogRecPtr *stoppos, - bool synchronous, bool mark_done); +static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream, + XLogRecPtr *stoppos); static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, XLogRecPtr blockpos, int64 *last_status); -static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, - XLogRecPtr *blockpos, uint32 timeline, - char *basedir, stream_stop_callback stream_stop, - char *partial_suffix, bool mark_done); -static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf, - XLogRecPtr blockpos, char *basedir, char *partial_suffix, - XLogRecPtr *stoppos, bool mark_done); -static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, - uint32 timeline, char *basedir, - stream_stop_callback stream_stop, - char *partial_suffix, XLogRecPtr *stoppos, - bool mark_done); +static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, + XLogRecPtr *blockpos); +static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, + XLogRecPtr blockpos, XLogRecPtr *stoppos); +static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos, + XLogRecPtr *stoppos); static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, int64 last_status); @@ -99,8 +90,7 @@ mark_file_as_archived(const char *basedir, const char *fname) * partial_suffix) is stored in current_walfile_name. */ static bool -open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, - char *partial_suffix) +open_walfile(StreamCtl *stream, XLogRecPtr startpoint) { int f; char fn[MAXPGPATH]; @@ -110,10 +100,10 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, XLogSegNo segno; XLByteToSeg(startpoint, segno); - XLogFileName(current_walfile_name, timeline, segno); + XLogFileName(current_walfile_name, stream->timeline, segno); - snprintf(fn, sizeof(fn), "%s/%s%s", basedir, current_walfile_name, - partial_suffix ? partial_suffix : ""); + snprintf(fn, sizeof(fn), "%s/%s%s", stream->basedir, current_walfile_name, + stream->partial_suffix ? stream->partial_suffix : ""); f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); if (f == -1) { @@ -185,7 +175,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, * and returns false, otherwise returns true. */ static bool -close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_done) +close_walfile(StreamCtl *stream, XLogRecPtr pos) { off_t currpos; @@ -220,13 +210,13 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don /* * If we finished writing a .partial file, rename it into place. */ - if (currpos == XLOG_SEG_SIZE && partial_suffix) + if (currpos == XLOG_SEG_SIZE && stream->partial_suffix) { char oldfn[MAXPGPATH]; char newfn[MAXPGPATH]; - snprintf(oldfn, sizeof(oldfn), "%s/%s%s", basedir, current_walfile_name, partial_suffix); - snprintf(newfn, sizeof(newfn), "%s/%s", basedir, current_walfile_name); + snprintf(oldfn, sizeof(oldfn), "%s/%s%s", stream->basedir, current_walfile_name, stream->partial_suffix); + snprintf(newfn, sizeof(newfn), "%s/%s", stream->basedir, current_walfile_name); if (rename(oldfn, newfn) != 0) { fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"), @@ -234,10 +224,10 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don return false; } } - else if (partial_suffix) + else if (stream->partial_suffix) fprintf(stderr, _("%s: not renaming \"%s%s\", segment is not complete\n"), - progname, current_walfile_name, partial_suffix); + progname, current_walfile_name, stream->partial_suffix); /* * Mark file as archived if requested by the caller - pg_basebackup needs @@ -245,10 +235,10 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don * new node. This is in line with walreceiver.c always doing a * XLogArchiveForceDone() after a complete segment. */ - if (currpos == XLOG_SEG_SIZE && mark_done) + if (currpos == XLOG_SEG_SIZE && stream->mark_done) { /* writes error message if failed */ - if (!mark_file_as_archived(basedir, current_walfile_name)) + if (!mark_file_as_archived(stream->basedir, current_walfile_name)) return false; } @@ -261,7 +251,7 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don * Check if a timeline history file exists. */ static bool -existsTimeLineHistoryFile(char *basedir, TimeLineID tli) +existsTimeLineHistoryFile(StreamCtl *stream) { char path[MAXPGPATH]; char histfname[MAXFNAMELEN]; @@ -271,12 +261,12 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli) * Timeline 1 never has a history file. We treat that as if it existed, * since we never need to stream it. */ - if (tli == 1) + if (stream->timeline == 1) return true; - TLHistoryFileName(histfname, tli); + TLHistoryFileName(histfname, stream->timeline); - snprintf(path, sizeof(path), "%s/%s", basedir, histfname); + snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname); fd = open(path, O_RDONLY | PG_BINARY, 0); if (fd < 0) @@ -294,8 +284,7 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli) } static bool -writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, - char *content, bool mark_done) +writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content) { int size = strlen(content); char path[MAXPGPATH]; @@ -307,15 +296,15 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, * Check that the server's idea of how timeline history files should be * named matches ours. */ - TLHistoryFileName(histfname, tli); + TLHistoryFileName(histfname, stream->timeline); if (strcmp(histfname, filename) != 0) { fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"), - progname, tli, filename); + progname, stream->timeline, filename); return false; } - snprintf(path, sizeof(path), "%s/%s", basedir, histfname); + snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname); /* * Write into a temp file name. @@ -375,10 +364,10 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, } /* Maintain archive_status, check close_walfile() for details. */ - if (mark_done) + if (stream->mark_done) { /* writes error message if failed */ - if (!mark_file_as_archived(basedir, histfname)) + if (!mark_file_as_archived(stream->basedir, histfname)) return false; } @@ -468,6 +457,8 @@ CheckServerVersionForStreaming(PGconn *conn) /* * Receive a log stream starting at the specified position. * + * Individual parameters are passed through the StreamCtl structure. + * * If sysidentifier is specified, validate that both the system * identifier and the timeline matches the specified ones * (by sending an extra IDENTIFY_SYSTEM command) @@ -498,11 +489,7 @@ CheckServerVersionForStreaming(PGconn *conn) * Note: The log position *must* be at a log segment start! */ bool -ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, - char *sysidentifier, char *basedir, - stream_stop_callback stream_stop, - int standby_message_timeout, char *partial_suffix, - bool synchronous, bool mark_done) +ReceiveXlogStream(PGconn *conn, StreamCtl *stream) { char query[128]; char slotcmd[128]; @@ -539,7 +526,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, slotcmd[0] = 0; } - if (sysidentifier != NULL) + if (stream->sysidentifier != NULL) { /* Validate system identifier hasn't changed */ res = PQexec(conn, "IDENTIFY_SYSTEM"); @@ -559,7 +546,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, PQclear(res); return false; } - if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0) + if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0) { fprintf(stderr, _("%s: system identifier does not match between base backup and streaming connection\n"), @@ -567,11 +554,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, PQclear(res); return false; } - if (timeline > atoi(PQgetvalue(res, 0, 1))) + if (stream->timeline > atoi(PQgetvalue(res, 0, 1))) { fprintf(stderr, _("%s: starting timeline %u is not present in the server\n"), - progname, timeline); + progname, stream->timeline); PQclear(res); return false; } @@ -582,7 +569,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * initialize flush position to starting point, it's the caller's * responsibility that that's sane. */ - lastFlushPosition = startpos; + lastFlushPosition = stream->startpos; while (1) { @@ -590,9 +577,9 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * Fetch the timeline history file for this timeline, if we don't have * it already. */ - if (!existsTimeLineHistoryFile(basedir, timeline)) + if (!existsTimeLineHistoryFile(stream)) { - snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", timeline); + snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline); res = PQexec(conn, query); if (PQresultStatus(res) != PGRES_TUPLES_OK) { @@ -615,10 +602,9 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, } /* Write the history file to disk */ - writeTimeLineHistoryFile(basedir, timeline, + writeTimeLineHistoryFile(stream, PQgetvalue(res, 0, 0), - PQgetvalue(res, 0, 1), - mark_done); + PQgetvalue(res, 0, 1)); PQclear(res); } @@ -627,14 +613,14 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * Before we start streaming from the requested location, check if the * callback tells us to stop here. */ - if (stream_stop(startpos, timeline, false)) + if (stream->stream_stop(stream->startpos, stream->timeline, false)) return true; /* Initiate the replication stream at specified location */ snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u", slotcmd, - (uint32) (startpos >> 32), (uint32) startpos, - timeline); + (uint32) (stream->startpos >> 32), (uint32) stream->startpos, + stream->timeline); res = PQexec(conn, query); if (PQresultStatus(res) != PGRES_COPY_BOTH) { @@ -646,9 +632,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, PQclear(res); /* Stream the WAL */ - res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, - standby_message_timeout, partial_suffix, - &stoppos, synchronous, mark_done); + res = HandleCopyStream(conn, stream, &stoppos); if (res == NULL) goto error; @@ -676,26 +660,26 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, uint32 newtimeline; bool parsed; - parsed = ReadEndOfStreamingResult(res, &startpos, &newtimeline); + parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline); PQclear(res); if (!parsed) goto error; /* Sanity check the values the server gave us */ - if (newtimeline <= timeline) + if (newtimeline <= stream->timeline) { fprintf(stderr, _("%s: server reported unexpected next timeline %u, following timeline %u\n"), - progname, newtimeline, timeline); + progname, newtimeline, stream->timeline); goto error; } - if (startpos > stoppos) + if (stream->startpos > stoppos) { fprintf(stderr, _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"), progname, - timeline, (uint32) (stoppos >> 32), (uint32) stoppos, - newtimeline, (uint32) (startpos >> 32), (uint32) startpos); + stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos, + newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos); goto error; } @@ -715,8 +699,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * Loop back to start streaming from the new timeline. Always * start streaming at the beginning of a segment. */ - timeline = newtimeline; - startpos = startpos - (startpos % XLOG_SEG_SIZE); + stream->timeline = newtimeline; + stream->startpos = stream->startpos - (stream->startpos % XLOG_SEG_SIZE); continue; } else if (PQresultStatus(res) == PGRES_COMMAND_OK) @@ -729,7 +713,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * Check if the callback thinks it's OK to stop here. If not, * complain. */ - if (stream_stop(stoppos, timeline, false)) + if (stream->stream_stop(stoppos, stream->timeline, false)) return true; else { @@ -810,14 +794,12 @@ ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline) * On any other sort of error, returns NULL. */ static PGresult * -HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, - char *basedir, stream_stop_callback stream_stop, - int standby_message_timeout, char *partial_suffix, - XLogRecPtr *stoppos, bool synchronous, bool mark_done) +HandleCopyStream(PGconn *conn, StreamCtl *stream, + XLogRecPtr *stoppos) { char *copybuf = NULL; int64 last_status = -1; - XLogRecPtr blockpos = startpos; + XLogRecPtr blockpos = stream->startpos; still_sending = true; @@ -830,9 +812,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* * Check if we should continue streaming, or abort at this point. */ - if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir, - stream_stop, partial_suffix, stoppos, - mark_done)) + if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos)) goto error; now = feGetCurrentTimestamp(); @@ -841,7 +821,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * If synchronous option is true, issue sync command as soon as there * are WAL data which has not been flushed yet. */ - if (synchronous && lastFlushPosition < blockpos && walfile != -1) + if (stream->synchronous && lastFlushPosition < blockpos && walfile != -1) { if (fsync(walfile) != 0) { @@ -863,9 +843,9 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* * Potentially send a status message to the master */ - if (still_sending && standby_message_timeout > 0 && + if (still_sending && stream->standby_message_timeout > 0 && feTimestampDifferenceExceeds(last_status, now, - standby_message_timeout)) + stream->standby_message_timeout)) { /* Time to send feedback! */ if (!sendFeedback(conn, blockpos, now, false)) @@ -876,7 +856,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* * Calculate how long send/receive loops should sleep */ - sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout, + sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout, last_status); r = CopyStreamReceive(conn, sleeptime, ©buf); @@ -886,9 +866,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, goto error; if (r == -2) { - PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos, - basedir, partial_suffix, - stoppos, mark_done); + PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos); if (res == NULL) goto error; @@ -905,18 +883,14 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, } else if (copybuf[0] == 'w') { - if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos, - timeline, basedir, stream_stop, - partial_suffix, mark_done)) + if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos)) goto error; /* * Check if we should continue streaming, or abort at this * point. */ - if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir, - stream_stop, partial_suffix, stoppos, - mark_done)) + if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos)) goto error; } else @@ -1114,10 +1088,8 @@ ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, * Process XLogData message. */ static bool -ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, - XLogRecPtr *blockpos, uint32 timeline, - char *basedir, stream_stop_callback stream_stop, - char *partial_suffix, bool mark_done) +ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, + XLogRecPtr *blockpos) { int xlogoff; int bytes_left; @@ -1197,8 +1169,7 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, if (walfile == -1) { - if (!open_walfile(*blockpos, timeline, - basedir, partial_suffix)) + if (!open_walfile(stream, *blockpos)) { /* Error logged by open_walfile */ return false; @@ -1225,13 +1196,13 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, /* Did we reach the end of a WAL segment? */ if (*blockpos % XLOG_SEG_SIZE == 0) { - if (!close_walfile(basedir, partial_suffix, *blockpos, mark_done)) + if (!close_walfile(stream, *blockpos)) /* Error message written in close_walfile() */ return false; xlogoff = 0; - if (still_sending && stream_stop(*blockpos, timeline, true)) + if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true)) { if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) { @@ -1253,9 +1224,8 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, * Handle end of the copy stream. */ static PGresult * -HandleEndOfCopyStream(PGconn *conn, char *copybuf, - XLogRecPtr blockpos, char *basedir, char *partial_suffix, - XLogRecPtr *stoppos, bool mark_done) +HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, + XLogRecPtr blockpos, XLogRecPtr *stoppos) { PGresult *res = PQgetResult(conn); @@ -1266,7 +1236,7 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf, */ if (still_sending) { - if (!close_walfile(basedir, partial_suffix, blockpos, mark_done)) + if (!close_walfile(stream, blockpos)) { /* Error message written in close_walfile() */ PQclear(res); @@ -1296,13 +1266,12 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf, * Check if we should continue streaming, or abort at this point. */ static bool -CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline, - char *basedir, stream_stop_callback stream_stop, - char *partial_suffix, XLogRecPtr *stoppos, bool mark_done) +CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos, + XLogRecPtr *stoppos) { - if (still_sending && stream_stop(blockpos, timeline, false)) + if (still_sending && stream->stream_stop(blockpos, stream->timeline, false)) { - if (!close_walfile(basedir, partial_suffix, blockpos, mark_done)) + if (!close_walfile(stream, blockpos)) { /* Potential error message is written by close_walfile */ return false; diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h index 8d4dbf285b4..554ff8b5b28 100644 --- a/src/bin/pg_basebackup/receivelog.h +++ b/src/bin/pg_basebackup/receivelog.h @@ -22,16 +22,31 @@ */ typedef bool (*stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished); +/* + * Global parameters when receiving xlog stream. For details about the individual fields, + * see the function comment for ReceiveXlogStream(). + */ +typedef struct StreamCtl +{ + XLogRecPtr startpos; /* Start position for streaming */ + TimeLineID timeline; /* Timeline to stream data from */ + char *sysidentifier; /* Validate this system identifier and + * timeline */ + int standby_message_timeout; /* Send status messages this + * often */ + bool synchronous; /* Flush data on write */ + bool mark_done; /* Mark segment as done in generated archive */ + + stream_stop_callback stream_stop; /* Stop streaming when returns true */ + + char *basedir; /* Received segments written to this dir */ + char *partial_suffix; /* Suffix appended to partially received files */ +} StreamCtl; + + + extern bool CheckServerVersionForStreaming(PGconn *conn); extern bool ReceiveXlogStream(PGconn *conn, - XLogRecPtr startpos, - uint32 timeline, - char *sysidentifier, - char *basedir, - stream_stop_callback stream_stop, - int standby_message_timeout, - char *partial_suffix, - bool synchronous, - bool mark_done); + StreamCtl *stream); #endif /* RECEIVELOG_H */