diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 0289c4bc4f8..6a2e557809a 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -78,7 +78,7 @@ static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum); static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum); static void BaseBackup(void); -static bool segment_callback(XLogRecPtr segendpos, uint32 timeline); +static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline, bool segment_finished); #ifdef HAVE_LIBZ static const char * @@ -129,8 +129,7 @@ usage(void) /* - * Called in the background process whenever a complete segment of WAL - * has been received. + * Called in the background process every time data is received. * On Unix, we check to see if there is any data on our pipe * (which would mean we have a stop position), and if it is, check if * it is time to stop. @@ -138,7 +137,7 @@ usage(void) * time to stop. */ static bool -segment_callback(XLogRecPtr segendpos, uint32 timeline) +reached_end_position(XLogRecPtr segendpos, uint32 timeline, bool segment_finished) { if (!has_xlogendptr) { @@ -231,7 +230,7 @@ LogStreamerMain(logstreamer_param * param) { if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->sysidentifier, param->xlogdir, - segment_callback, NULL, standby_message_timeout)) + reached_end_position, standby_message_timeout, true)) /* * Any errors will already have been reported in the function process, diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c index 2134c8729cc..01f20f372a4 100644 --- a/src/bin/pg_basebackup/pg_receivexlog.c +++ b/src/bin/pg_basebackup/pg_receivexlog.c @@ -43,7 +43,7 @@ volatile bool time_to_abort = false; static void usage(void); static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline); static void StreamLog(); -static bool segment_callback(XLogRecPtr segendpos, uint32 timeline); +static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline, bool segment_finished); static void usage(void) @@ -69,21 +69,12 @@ usage(void) } static bool -segment_callback(XLogRecPtr segendpos, uint32 timeline) +stop_streaming(XLogRecPtr segendpos, uint32 timeline, bool segment_finished) { - if (verbose) + if (verbose && segment_finished) fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"), progname, segendpos.xlogid, segendpos.xrecoff, timeline); - /* - * Never abort from this - we handle all aborting in continue_streaming() - */ - return false; -} - -static bool -continue_streaming(void) -{ if (time_to_abort) { fprintf(stderr, _("%s: received interrupt signal, exiting.\n"), @@ -268,8 +259,8 @@ StreamLog(void) progname, startpos.xlogid, startpos.xrecoff, timeline); ReceiveXlogStream(conn, startpos, timeline, NULL, basedir, - segment_callback, continue_streaming, - standby_message_timeout); + stop_streaming, + standby_message_timeout, false); PQfinish(conn); } diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index b0cf836968d..efbc4ca6533 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -113,8 +113,14 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu return f; } +/* + * Close the current WAL file, and rename it to the correct filename if it's complete. + * + * If segment_complete is true, rename the current WAL file even if we've not + * completed writing the whole segment. + */ static bool -close_walfile(int walfile, char *basedir, char *walname) +close_walfile(int walfile, char *basedir, char *walname, bool segment_complete) { off_t currpos = lseek(walfile, 0, SEEK_CUR); @@ -141,9 +147,9 @@ close_walfile(int walfile, char *basedir, char *walname) /* * Rename the .partial file only if we've completed writing the - * whole segment. + * whole segment or segment_complete is true. */ - if (currpos == XLOG_SEG_SIZE) + if (currpos == XLOG_SEG_SIZE || segment_complete) { char oldfn[MAXPGPATH]; char newfn[MAXPGPATH]; @@ -199,11 +205,10 @@ localGetCurrentTimestamp(void) * All received segments will be written to the directory * specified by basedir. * - * The segment_finish callback will be called after each segment - * has been finished, and the stream_continue callback will be - * called every time data is received. If either of these callbacks - * return true, the streaming will stop and the function - * return. As long as they return false, streaming will continue + * The stream_stop callback will be called every time data + * is received, and whenever a segment is completed. If it returns + * true, the streaming will stop and the function + * return. As long as it returns false, streaming will continue * indefinitely. * * standby_message_timeout controls how often we send a message @@ -214,7 +219,7 @@ localGetCurrentTimestamp(void) * Note: The log position *must* be at a log segment start! */ bool -ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, segment_finish_callback segment_finish, stream_continue_callback stream_continue, int standby_message_timeout) +ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, bool rename_partial) { char query[128]; char current_walfile_name[MAXPGPATH]; @@ -288,11 +293,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi /* * Check if we should continue streaming, or abort at this point. */ - if (stream_continue && stream_continue()) + if (stream_stop && stream_stop(blockpos, timeline, false)) { if (walfile != -1) /* Potential error message is written by close_walfile */ - return close_walfile(walfile, basedir, current_walfile_name); + return close_walfile(walfile, basedir, current_walfile_name, rename_partial); return true; } @@ -486,20 +491,20 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi /* Did we reach the end of a WAL segment? */ if (blockpos.xrecoff % XLOG_SEG_SIZE == 0) { - if (!close_walfile(walfile, basedir, current_walfile_name)) + if (!close_walfile(walfile, basedir, current_walfile_name, false)) /* Error message written in close_walfile() */ return false; walfile = -1; xlogoff = 0; - if (segment_finish != NULL) + if (stream_stop != NULL) { /* * Callback when the segment finished, and return if it * told us to. */ - if (segment_finish(blockpos, timeline)) + if (stream_stop(blockpos, timeline, true)) return true; } } diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h index 1c61ea8ac1d..0a803ee4ac1 100644 --- a/src/bin/pg_basebackup/receivelog.h +++ b/src/bin/pg_basebackup/receivelog.h @@ -1,22 +1,16 @@ #include "access/xlogdefs.h" /* - * Called whenever a segment is finished, return true to stop - * the streaming at this point. + * Called before trying to read more data or when a segment is + * finished. Return true to stop streaming. */ -typedef bool (*segment_finish_callback)(XLogRecPtr segendpos, uint32 timeline); - -/* - * Called before trying to read more data. Return true to stop - * the streaming at this point. - */ -typedef bool (*stream_continue_callback)(void); +typedef bool (*stream_stop_callback)(XLogRecPtr segendpos, uint32 timeline, bool segment_finished); extern bool ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, - segment_finish_callback segment_finish, - stream_continue_callback stream_continue, - int standby_message_timeout); + stream_stop_callback stream_stop, + int standby_message_timeout, + bool rename_partial);