From 4fd14794945e60c4aedad84b06ff1e2a7a6236f0 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Mon, 19 Sep 2022 11:20:18 -0400 Subject: [PATCH] walmethods.c/h: Make Walfile a struct, rather than a void * This makes the curent file position and pathname visible in a generic way, so we no longer need current_walfile_name global variable or the get_current_pos() method. Since that purported to be able to fail but never actually did, this also lets us get rid of some unnecessary error-handling code. One risk of this change is that the get_current_pos() method previously cleared the error indicator, and that will no longer happen with the new approach. I looked for a way that this could cause problems and did not find one. The previous code was confused about whether "Walfile" was the implementation-dependent structure representing a WAL file or whether it was a pointer to that stucture. Some of the code used it one way, and some in the other. The compiler tolerated that because void * is interchangeable with void **, but now that Walfile is a struct, it's necessary to be consistent. Hence, some references to "Walfile" have been converted to "Walfile *". Discussion: http://postgr.es/m/CA+TgmoZS0Kw98fOoAcGz8B9iDhdqB4Be4e=vDZaJZ5A-xMYBqA@mail.gmail.com --- src/bin/pg_basebackup/receivelog.c | 48 +++++++--------- src/bin/pg_basebackup/walmethods.c | 89 +++++++++++------------------- src/bin/pg_basebackup/walmethods.h | 27 ++++++--- 3 files changed, 70 insertions(+), 94 deletions(-) diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 5f6fd3201f3..a619176511f 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -25,9 +25,8 @@ #include "receivelog.h" #include "streamutil.h" -/* fd and filename for currently open WAL file */ +/* currently open WAL file */ static Walfile *walfile = NULL; -static char current_walfile_name[MAXPGPATH] = ""; static bool reportFlushPosition = false; static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; @@ -82,8 +81,7 @@ mark_file_as_archived(StreamCtl *stream, const char *fname) * Open a new WAL file in the specified directory. * * Returns true if OK; on failure, returns false after printing an error msg. - * On success, 'walfile' is set to the FD for the file, and the base filename - * (without partial_suffix) is stored in 'current_walfile_name'. + * On success, 'walfile' is set to the opened WAL file. * * The file will be padded to 16Mb with zeroes. */ @@ -94,12 +92,13 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint) char *fn; ssize_t size; XLogSegNo segno; + char walfile_name[MAXPGPATH]; XLByteToSeg(startpoint, segno, WalSegSz); - XLogFileName(current_walfile_name, stream->timeline, segno, WalSegSz); + XLogFileName(walfile_name, stream->timeline, segno, WalSegSz); /* Note that this considers the compression used if necessary */ - fn = stream->walmethod->get_file_name(current_walfile_name, + fn = stream->walmethod->get_file_name(walfile_name, stream->partial_suffix); /* @@ -126,7 +125,7 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint) if (size == WalSegSz) { /* Already padded file. Open it for use */ - f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0); + f = stream->walmethod->open_for_write(walfile_name, stream->partial_suffix, 0); if (f == NULL) { pg_log_error("could not open existing write-ahead log file \"%s\": %s", @@ -165,7 +164,7 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint) /* No file existed, so create one */ - f = stream->walmethod->open_for_write(current_walfile_name, + f = stream->walmethod->open_for_write(walfile_name, stream->partial_suffix, WalSegSz); if (f == NULL) { @@ -191,27 +190,18 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos) char *fn; off_t currpos; int r; + char walfile_name[MAXPGPATH]; if (walfile == NULL) return true; + strlcpy(walfile_name, walfile->pathname, MAXPGPATH); + currpos = walfile->currpos; + /* Note that this considers the compression used if necessary */ - fn = stream->walmethod->get_file_name(current_walfile_name, + fn = stream->walmethod->get_file_name(walfile_name, stream->partial_suffix); - currpos = stream->walmethod->get_current_pos(walfile); - - if (currpos == -1) - { - pg_log_error("could not determine seek position in file \"%s\": %s", - fn, stream->walmethod->getlasterror()); - stream->walmethod->close(walfile, CLOSE_UNLINK); - walfile = NULL; - - pg_free(fn); - return false; - } - if (stream->partial_suffix) { if (currpos == WalSegSz) @@ -247,7 +237,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos) if (currpos == WalSegSz && stream->mark_done) { /* writes error message if failed */ - if (!mark_file_as_archived(stream, current_walfile_name)) + if (!mark_file_as_archived(stream, walfile_name)) return false; } @@ -690,7 +680,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) error: if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0) pg_log_error("could not close file \"%s\": %s", - current_walfile_name, stream->walmethod->getlasterror()); + walfile->pathname, stream->walmethod->getlasterror()); walfile = NULL; return false; } @@ -777,7 +767,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, { if (stream->walmethod->sync(walfile) != 0) pg_fatal("could not fsync file \"%s\": %s", - current_walfile_name, stream->walmethod->getlasterror()); + walfile->pathname, stream->walmethod->getlasterror()); lastFlushPosition = blockpos; /* @@ -1024,7 +1014,7 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, */ if (stream->walmethod->sync(walfile) != 0) pg_fatal("could not fsync file \"%s\": %s", - current_walfile_name, stream->walmethod->getlasterror()); + walfile->pathname, stream->walmethod->getlasterror()); lastFlushPosition = blockpos; } @@ -1092,10 +1082,10 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, else { /* More data in existing segment */ - if (stream->walmethod->get_current_pos(walfile) != xlogoff) + if (walfile->currpos != xlogoff) { pg_log_error("got WAL data offset %08x, expected %08x", - xlogoff, (int) stream->walmethod->get_current_pos(walfile)); + xlogoff, (int) walfile->currpos); return false; } } @@ -1129,7 +1119,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, bytes_to_write) != bytes_to_write) { pg_log_error("could not write %d bytes to WAL file \"%s\": %s", - bytes_to_write, current_walfile_name, + bytes_to_write, walfile->pathname, stream->walmethod->getlasterror()); return false; } diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c index 4d4bc63fbe7..d98a2681b90 100644 --- a/src/bin/pg_basebackup/walmethods.c +++ b/src/bin/pg_basebackup/walmethods.c @@ -62,9 +62,8 @@ static DirectoryMethodData *dir_data = NULL; */ typedef struct DirectoryMethodFile { + Walfile base; int fd; - off_t currpos; - char *pathname; char *fullpath; char *temp_suffix; #ifdef HAVE_LIBZ @@ -104,7 +103,7 @@ dir_get_file_name(const char *pathname, const char *temp_suffix) return filename; } -static Walfile +static Walfile * dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size) { char tmppath[MAXPGPATH]; @@ -279,18 +278,18 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ } #endif + f->base.currpos = 0; + f->base.pathname = pg_strdup(pathname); f->fd = fd; - f->currpos = 0; - f->pathname = pg_strdup(pathname); f->fullpath = pg_strdup(tmppath); if (temp_suffix) f->temp_suffix = pg_strdup(temp_suffix); - return f; + return &f->base; } static ssize_t -dir_write(Walfile f, const void *buf, size_t count) +dir_write(Walfile *f, const void *buf, size_t count) { ssize_t r; DirectoryMethodFile *df = (DirectoryMethodFile *) f; @@ -366,22 +365,12 @@ dir_write(Walfile f, const void *buf, size_t count) } } if (r > 0) - df->currpos += r; + df->base.currpos += r; return r; } -static off_t -dir_get_current_pos(Walfile f) -{ - Assert(f != NULL); - dir_clear_error(); - - /* Use a cached value to prevent lots of reseeks */ - return ((DirectoryMethodFile *) f)->currpos; -} - static int -dir_close(Walfile f, WalCloseMethod method) +dir_close(Walfile *f, WalCloseMethod method) { int r; DirectoryMethodFile *df = (DirectoryMethodFile *) f; @@ -440,13 +429,13 @@ dir_close(Walfile f, WalCloseMethod method) * If we have a temp prefix, normal operation is to rename the * file. */ - filename = dir_get_file_name(df->pathname, df->temp_suffix); + filename = dir_get_file_name(df->base.pathname, df->temp_suffix); snprintf(tmppath, sizeof(tmppath), "%s/%s", dir_data->basedir, filename); pg_free(filename); /* permanent name, so no need for the prefix */ - filename2 = dir_get_file_name(df->pathname, NULL); + filename2 = dir_get_file_name(df->base.pathname, NULL); snprintf(tmppath2, sizeof(tmppath2), "%s/%s", dir_data->basedir, filename2); pg_free(filename2); @@ -467,7 +456,7 @@ dir_close(Walfile f, WalCloseMethod method) char *filename; /* Unlink the file once it's closed */ - filename = dir_get_file_name(df->pathname, df->temp_suffix); + filename = dir_get_file_name(df->base.pathname, df->temp_suffix); snprintf(tmppath, sizeof(tmppath), "%s/%s", dir_data->basedir, filename); pg_free(filename); @@ -498,7 +487,7 @@ dir_close(Walfile f, WalCloseMethod method) LZ4F_freeCompressionContext(df->ctx); #endif - pg_free(df->pathname); + pg_free(df->base.pathname); pg_free(df->fullpath); pg_free(df->temp_suffix); pg_free(df); @@ -507,7 +496,7 @@ dir_close(Walfile f, WalCloseMethod method) } static int -dir_sync(Walfile f) +dir_sync(Walfile *f) { int r; @@ -630,7 +619,6 @@ CreateWalDirectoryMethod(const char *basedir, method = pg_malloc0(sizeof(WalWriteMethod)); method->open_for_write = dir_open_for_write; method->write = dir_write; - method->get_current_pos = dir_get_current_pos; method->get_file_size = dir_get_file_size; method->get_file_name = dir_get_file_name; method->compression_algorithm = dir_compression_algorithm; @@ -665,10 +653,9 @@ FreeWalDirectoryMethod(void) typedef struct TarMethodFile { + Walfile base; off_t ofs_start; /* Where does the *header* for this file start */ - off_t currpos; char header[TAR_BLOCK_SIZE]; - char *pathname; size_t pad_to_size; } TarMethodFile; @@ -755,7 +742,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush) #endif static ssize_t -tar_write(Walfile f, const void *buf, size_t count) +tar_write(Walfile *f, const void *buf, size_t count) { ssize_t r; @@ -773,7 +760,7 @@ tar_write(Walfile f, const void *buf, size_t count) tar_data->lasterrno = errno ? errno : ENOSPC; return -1; } - ((TarMethodFile *) f)->currpos += r; + f->currpos += r; return r; } #ifdef HAVE_LIBZ @@ -781,7 +768,7 @@ tar_write(Walfile f, const void *buf, size_t count) { if (!tar_write_compressed_data(unconstify(void *, buf), count, false)) return -1; - ((TarMethodFile *) f)->currpos += count; + f->currpos += count; return count; } #endif @@ -803,7 +790,7 @@ tar_write_padding_data(TarMethodFile *f, size_t bytes) while (bytesleft) { size_t bytestowrite = Min(bytesleft, XLOG_BLCKSZ); - ssize_t r = tar_write(f, zerobuf.data, bytestowrite); + ssize_t r = tar_write(&f->base, zerobuf.data, bytestowrite); if (r < 0) return false; @@ -824,7 +811,7 @@ tar_get_file_name(const char *pathname, const char *temp_suffix) return filename; } -static Walfile +static Walfile * tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size) { char *tmppath; @@ -920,7 +907,7 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ tar_data->currentfile = NULL; return NULL; } - tar_data->currentfile->currpos = 0; + tar_data->currentfile->base.currpos = 0; if (tar_data->compression_algorithm == PG_COMPRESSION_NONE) { @@ -958,7 +945,7 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ Assert(false); } - tar_data->currentfile->pathname = pg_strdup(pathname); + tar_data->currentfile->base.pathname = pg_strdup(pathname); /* * Uncompressed files are padded on creation, but for compression we can't @@ -981,11 +968,11 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ return NULL; } - tar_data->currentfile->currpos = 0; + tar_data->currentfile->base.currpos = 0; } } - return tar_data->currentfile; + return &tar_data->currentfile->base; } static ssize_t @@ -1004,17 +991,8 @@ tar_compression_algorithm(void) return tar_data->compression_algorithm; } -static off_t -tar_get_current_pos(Walfile f) -{ - Assert(f != NULL); - tar_clear_error(); - - return ((TarMethodFile *) f)->currpos; -} - static int -tar_sync(Walfile f) +tar_sync(Walfile *f) { int r; @@ -1038,7 +1016,7 @@ tar_sync(Walfile f) } static int -tar_close(Walfile f, WalCloseMethod method) +tar_close(Walfile *f, WalCloseMethod method) { ssize_t filesize; int padding; @@ -1066,7 +1044,7 @@ tar_close(Walfile f, WalCloseMethod method) return -1; } - pg_free(tf->pathname); + pg_free(tf->base.pathname); pg_free(tf); tar_data->currentfile = NULL; @@ -1086,7 +1064,7 @@ tar_close(Walfile f, WalCloseMethod method) * A compressed tarfile is padded on close since we cannot know * the size of the compressed output until the end. */ - size_t sizeleft = tf->pad_to_size - tf->currpos; + size_t sizeleft = tf->pad_to_size - tf->base.currpos; if (sizeleft) { @@ -1100,7 +1078,7 @@ tar_close(Walfile f, WalCloseMethod method) * An uncompressed tarfile was padded on creation, so just adjust * the current position as if we seeked to the end. */ - tf->currpos = tf->pad_to_size; + tf->base.currpos = tf->pad_to_size; } } @@ -1108,7 +1086,7 @@ tar_close(Walfile f, WalCloseMethod method) * Get the size of the file, and pad out to a multiple of the tar block * size. */ - filesize = tar_get_current_pos(f); + filesize = f->currpos; padding = tarPaddingBytesRequired(filesize); if (padding) { @@ -1141,7 +1119,7 @@ tar_close(Walfile f, WalCloseMethod method) * We overwrite it with what it was before if we have no tempname, * since we're going to write the buffer anyway. */ - strlcpy(&(tf->header[0]), tf->pathname, 100); + strlcpy(&(tf->header[0]), tf->base.pathname, 100); print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header)); if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start) @@ -1201,11 +1179,11 @@ tar_close(Walfile f, WalCloseMethod method) { /* XXX this seems pretty bogus; why is only this case fatal? */ pg_fatal("could not fsync file \"%s\": %s", - tf->pathname, tar_getlasterror()); + tf->base.pathname, tar_getlasterror()); } /* Clean up and done */ - pg_free(tf->pathname); + pg_free(tf->base.pathname); pg_free(tf); tar_data->currentfile = NULL; @@ -1229,7 +1207,7 @@ tar_finish(void) if (tar_data->currentfile) { - if (tar_close(tar_data->currentfile, CLOSE_NORMAL) != 0) + if (tar_close(&tar_data->currentfile->base, CLOSE_NORMAL) != 0) return false; } @@ -1345,7 +1323,6 @@ CreateWalTarMethod(const char *tarbase, method = pg_malloc0(sizeof(WalWriteMethod)); method->open_for_write = tar_open_for_write; method->write = tar_write; - method->get_current_pos = tar_get_current_pos; method->get_file_size = tar_get_file_size; method->get_file_name = tar_get_file_name; method->compression_algorithm = tar_compression_algorithm; diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h index 76530dc9419..cf5ed87fbe8 100644 --- a/src/bin/pg_basebackup/walmethods.h +++ b/src/bin/pg_basebackup/walmethods.h @@ -11,7 +11,20 @@ #include "common/compression.h" -typedef void *Walfile; +struct WalWriteMethod; +typedef struct WalWriteMethod WalWriteMethod; + +typedef struct +{ + off_t currpos; + char *pathname; + /* + * MORE DATA FOLLOWS AT END OF STRUCT + * + * Each WalWriteMethod is expected to embed this as the first member of + * a larger struct with method-specific fields following. + */ +} Walfile; typedef enum { @@ -30,7 +43,6 @@ typedef enum * care not to clobber errno between a failed method call and use of * getlasterror() to retrieve the message. */ -typedef struct WalWriteMethod WalWriteMethod; struct WalWriteMethod { /* @@ -39,13 +51,13 @@ struct WalWriteMethod * automatically renamed in close(). If pad_to_size is specified, the file * will be padded with NUL up to that size, if supported by the Walmethod. */ - Walfile (*open_for_write) (const char *pathname, const char *temp_suffix, size_t pad_to_size); + Walfile *(*open_for_write) (const char *pathname, const char *temp_suffix, size_t pad_to_size); /* * Close an open Walfile, using one or more methods for handling automatic * unlinking etc. Returns 0 on success, other values for error. */ - int (*close) (Walfile f, WalCloseMethod method); + int (*close) (Walfile *f, WalCloseMethod method); /* Check if a file exist */ bool (*existsfile) (const char *pathname); @@ -66,15 +78,12 @@ struct WalWriteMethod * Write count number of bytes to the file, and return the number of bytes * actually written or -1 for error. */ - ssize_t (*write) (Walfile f, const void *buf, size_t count); - - /* Return the current position in a file or -1 on error */ - off_t (*get_current_pos) (Walfile f); + ssize_t (*write) (Walfile *f, const void *buf, size_t count); /* * fsync the contents of the specified file. Returns 0 on success. */ - int (*sync) (Walfile f); + int (*sync) (Walfile *f); /* * Clean up the Walmethod, closing any shared resources. For methods like