1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-25 13:17:41 +03:00

Optimize fileset usage in apply worker.

Use one fileset for the entire worker lifetime instead of using
separate filesets for each streaming transaction. Now, the
changes/subxacts files for every streaming transaction will be
created under the same fileset and the files will be deleted
after the transaction is completed.

This patch extends the BufFileOpenFileSet and BufFileDeleteFileSet
APIs to allow users to specify whether to give an error on missing
files.

Author: Dilip Kumar, based on suggestion by Thomas Munro
Reviewed-by: Hou Zhijie, Masahiko Sawada, Amit Kapila
Discussion: https://postgr.es/m/E1mCC6U-0004Ik-Fs@gemulon.postgresql.org
This commit is contained in:
Amit Kapila
2021-09-02 08:13:46 +05:30
parent 163074ea84
commit 31c389d8de
7 changed files with 86 additions and 219 deletions

View File

@@ -379,6 +379,7 @@ retry:
worker->relid = relid;
worker->relstate = SUBREL_STATE_UNKNOWN;
worker->relstate_lsn = InvalidXLogRecPtr;
worker->stream_fileset = NULL;
worker->last_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->last_send_time);
TIMESTAMP_NOBEGIN(worker->last_recv_time);
@@ -648,8 +649,9 @@ logicalrep_worker_onexit(int code, Datum arg)
logicalrep_worker_detach();
/* Cleanup filesets used for streaming transactions. */
logicalrep_worker_cleanupfileset();
/* Cleanup fileset used for streaming transactions. */
if (MyLogicalRepWorker->stream_fileset != NULL)
FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
ApplyLauncherWakeup();
}

View File

@@ -236,20 +236,6 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
.ts = 0,
};
/*
* Stream xid hash entry. Whenever we see a new xid we create this entry in the
* xidhash and along with it create the streaming file and store the fileset handle.
* The subxact file is created iff there is any subxact info under this xid. This
* entry is used on the subsequent streams for the xid to get the corresponding
* fileset handles, so storing them in hash makes the search faster.
*/
typedef struct StreamXidHash
{
TransactionId xid; /* xid is the hash key and must be first */
FileSet *stream_fileset; /* file set for stream data */
FileSet *subxact_fileset; /* file set for subxact info */
} StreamXidHash;
static MemoryContext ApplyMessageContext = NULL;
MemoryContext ApplyContext = NULL;
@@ -269,12 +255,6 @@ static bool in_streamed_transaction = false;
static TransactionId stream_xid = InvalidTransactionId;
/*
* Hash table for storing the streaming xid information along with filesets
* for streaming and subxact files.
*/
static HTAB *xidhash = NULL;
/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;
@@ -1118,7 +1098,6 @@ static void
apply_handle_stream_start(StringInfo s)
{
bool first_segment;
HASHCTL hash_ctl;
if (in_streamed_transaction)
ereport(ERROR,
@@ -1148,17 +1127,23 @@ apply_handle_stream_start(StringInfo s)
set_apply_error_context_xact(stream_xid, 0);
/*
* Initialize the xidhash table if we haven't yet. This will be used for
* the entire duration of the apply worker so create it in permanent
* context.
* Initialize the worker's stream_fileset if we haven't yet. This will be
* used for the entire duration of the worker so create it in a permanent
* context. We create this on the very first streaming message from any
* transaction and then use it for this and other streaming transactions.
* Now, we could create a fileset at the start of the worker as well but
* then we won't be sure that it will ever be used.
*/
if (xidhash == NULL)
if (MyLogicalRepWorker->stream_fileset == NULL)
{
hash_ctl.keysize = sizeof(TransactionId);
hash_ctl.entrysize = sizeof(StreamXidHash);
hash_ctl.hcxt = ApplyContext;
xidhash = hash_create("StreamXidHash", 1024, &hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
MemoryContext oldctx;
oldctx = MemoryContextSwitchTo(ApplyContext);
MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
FileSetInit(MyLogicalRepWorker->stream_fileset);
MemoryContextSwitchTo(oldctx);
}
/* open the spool file for this transaction */
@@ -1253,7 +1238,6 @@ apply_handle_stream_abort(StringInfo s)
BufFile *fd;
bool found = false;
char path[MAXPGPATH];
StreamXidHash *ent;
set_apply_error_context_xact(subxid, 0);
@@ -1285,19 +1269,10 @@ apply_handle_stream_abort(StringInfo s)
return;
}
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_FIND,
NULL);
if (!ent)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("transaction %u not found in stream XID hash table",
xid)));
/* open the changes file */
changes_filename(path, MyLogicalRepWorker->subid, xid);
fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
O_RDWR, false);
/* OK, truncate the file at the right offset */
BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
@@ -1327,7 +1302,6 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
int nchanges;
char path[MAXPGPATH];
char *buffer = NULL;
StreamXidHash *ent;
MemoryContext oldcxt;
BufFile *fd;
@@ -1345,17 +1319,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
changes_filename(path, MyLogicalRepWorker->subid, xid);
elog(DEBUG1, "replaying changes from file \"%s\"", path);
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_FIND,
NULL);
if (!ent)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("transaction %u not found in stream XID hash table",
xid)));
fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDONLY);
fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
false);
buffer = palloc(BLCKSZ);
initStringInfo(&s2);
@@ -2541,30 +2506,6 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
}
}
/*
* Cleanup filesets.
*/
void
logicalrep_worker_cleanupfileset(void)
{
HASH_SEQ_STATUS status;
StreamXidHash *hentry;
/* Remove all the pending stream and subxact filesets. */
if (xidhash)
{
hash_seq_init(&status, xidhash);
while ((hentry = (StreamXidHash *) hash_seq_search(&status)) != NULL)
{
FileSetDeleteAll(hentry->stream_fileset);
/* Delete the subxact fileset iff it is created. */
if (hentry->subxact_fileset)
FileSetDeleteAll(hentry->subxact_fileset);
}
}
}
/*
* Apply main loop.
*/
@@ -3026,58 +2967,30 @@ subxact_info_write(Oid subid, TransactionId xid)
{
char path[MAXPGPATH];
Size len;
StreamXidHash *ent;
BufFile *fd;
Assert(TransactionIdIsValid(xid));
/* Find the xid entry in the xidhash */
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_FIND,
NULL);
/* By this time we must have created the transaction entry */
Assert(ent);
/* construct the subxact filename */
subxact_filename(path, subid, xid);
/*
* If there is no subtransaction then nothing to do, but if already have
* subxact file then delete that.
*/
/* Delete the subxacts file, if exists. */
if (subxact_data.nsubxacts == 0)
{
if (ent->subxact_fileset)
{
cleanup_subxact_info();
FileSetDeleteAll(ent->subxact_fileset);
pfree(ent->subxact_fileset);
ent->subxact_fileset = NULL;
}
BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
return;
}
subxact_filename(path, subid, xid);
/*
* Create the subxact file if it not already created, otherwise open the
* existing file.
*/
if (ent->subxact_fileset == NULL)
{
MemoryContext oldctx;
/*
* We need to maintain fileset across multiple stream start/stop
* calls. So, need to allocate it in a persistent context.
*/
oldctx = MemoryContextSwitchTo(ApplyContext);
ent->subxact_fileset = palloc(sizeof(FileSet));
FileSetInit(ent->subxact_fileset);
MemoryContextSwitchTo(oldctx);
fd = BufFileCreateFileSet(ent->subxact_fileset, path);
}
else
fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDWR);
fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
true);
if (fd == NULL)
fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
@@ -3104,34 +3017,21 @@ subxact_info_read(Oid subid, TransactionId xid)
char path[MAXPGPATH];
Size len;
BufFile *fd;
StreamXidHash *ent;
MemoryContext oldctx;
Assert(!subxact_data.subxacts);
Assert(subxact_data.nsubxacts == 0);
Assert(subxact_data.nsubxacts_max == 0);
/* Find the stream xid entry in the xidhash */
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_FIND,
NULL);
if (!ent)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("transaction %u not found in stream XID hash table",
xid)));
/*
* If subxact_fileset is not valid that mean we don't have any subxact
* info
* If the subxact file doesn't exist that means we don't have any subxact
* info.
*/
if (ent->subxact_fileset == NULL)
return;
subxact_filename(path, subid, xid);
fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDONLY);
fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
true);
if (fd == NULL)
return;
/* read number of subxact items */
if (BufFileRead(fd, &subxact_data.nsubxacts,
@@ -3267,42 +3167,21 @@ changes_filename(char *path, Oid subid, TransactionId xid)
* Cleanup files for a subscription / toplevel transaction.
*
* Remove files with serialized changes and subxact info for a particular
* toplevel transaction. Each subscription has a separate set of files.
* toplevel transaction. Each subscription has a separate set of files
* for any toplevel transaction.
*/
static void
stream_cleanup_files(Oid subid, TransactionId xid)
{
char path[MAXPGPATH];
StreamXidHash *ent;
/* Find the xid entry in the xidhash */
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_FIND,
NULL);
if (!ent)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("transaction %u not found in stream XID hash table",
xid)));
/* Delete the change file and release the stream fileset memory */
/* Delete the changes file. */
changes_filename(path, subid, xid);
FileSetDeleteAll(ent->stream_fileset);
pfree(ent->stream_fileset);
ent->stream_fileset = NULL;
BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
/* Delete the subxact file and release the memory, if it exist */
if (ent->subxact_fileset)
{
/* Delete the subxact file, if it exists. */
subxact_filename(path, subid, xid);
FileSetDeleteAll(ent->subxact_fileset);
pfree(ent->subxact_fileset);
ent->subxact_fileset = NULL;
}
/* Remove the xid entry from the stream xid hash */
hash_search(xidhash, (void *) &xid, HASH_REMOVE, NULL);
BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
}
/*
@@ -3312,8 +3191,8 @@ stream_cleanup_files(Oid subid, TransactionId xid)
*
* Open a file for streamed changes from a toplevel transaction identified
* by stream_xid (global variable). If it's the first chunk of streamed
* changes for this transaction, initialize the fileset and create the buffile,
* otherwise open the previously created file.
* changes for this transaction, create the buffile, otherwise open the
* previously created file.
*
* This can only be called at the beginning of a "streaming" block, i.e.
* between stream_start/stream_stop messages from the upstream.
@@ -3322,20 +3201,13 @@ static void
stream_open_file(Oid subid, TransactionId xid, bool first_segment)
{
char path[MAXPGPATH];
bool found;
MemoryContext oldcxt;
StreamXidHash *ent;
Assert(in_streamed_transaction);
Assert(OidIsValid(subid));
Assert(TransactionIdIsValid(xid));
Assert(stream_fd == NULL);
/* create or find the xid entry in the xidhash */
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_ENTER,
&found);
changes_filename(path, subid, xid);
elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
@@ -3347,49 +3219,20 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
/*
* If this is the first streamed segment, the file must not exist, so make
* sure we're the ones creating it. Otherwise just open the file for
* writing, in append mode.
* If this is the first streamed segment, create the changes file.
* Otherwise, just open the file for writing, in append mode.
*/
if (first_segment)
{
MemoryContext savectx;
FileSet *fileset;
if (found)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
/*
* We need to maintain fileset across multiple stream start/stop
* calls. So, need to allocate it in a persistent context.
*/
savectx = MemoryContextSwitchTo(ApplyContext);
fileset = palloc(sizeof(FileSet));
FileSetInit(fileset);
MemoryContextSwitchTo(savectx);
stream_fd = BufFileCreateFileSet(fileset, path);
/* Remember the fileset for the next stream of the same transaction */
ent->xid = xid;
ent->stream_fileset = fileset;
ent->subxact_fileset = NULL;
}
stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
path);
else
{
if (!found)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
/*
* Open the file and seek to the end of the file because we always
* append the changes file.
*/
stream_fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
path, O_RDWR, false);
BufFileSeek(stream_fd, 0, 0, SEEK_END);
}

View File

@@ -278,10 +278,13 @@ BufFileCreateFileSet(FileSet *fileset, const char *name)
* with BufFileCreateFileSet in the same FileSet using the same name.
* The backend that created the file must have called BufFileClose() or
* BufFileExportFileSet() to make sure that it is ready to be opened by other
* backends and render it read-only.
* backends and render it read-only. If missing_ok is true, which indicates
* that missing files can be safely ignored, then return NULL if the BufFile
* with the given name is not found, otherwise, throw an error.
*/
BufFile *
BufFileOpenFileSet(FileSet *fileset, const char *name, int mode)
BufFileOpenFileSet(FileSet *fileset, const char *name, int mode,
bool missing_ok)
{
BufFile *file;
char segment_name[MAXPGPATH];
@@ -318,10 +321,18 @@ BufFileOpenFileSet(FileSet *fileset, const char *name, int mode)
* name.
*/
if (nfiles == 0)
{
/* free the memory */
pfree(files);
if (missing_ok)
return NULL;
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open temporary file \"%s\" from BufFile \"%s\": %m",
segment_name, name)));
}
file = makeBufFileCommon(nfiles);
file->files = files;
@@ -341,10 +352,11 @@ BufFileOpenFileSet(FileSet *fileset, const char *name, int mode)
* the FileSet to be cleaned up.
*
* Only one backend should attempt to delete a given name, and should know
* that it exists and has been exported or closed.
* that it exists and has been exported or closed otherwise missing_ok should
* be passed true.
*/
void
BufFileDeleteFileSet(FileSet *fileset, const char *name)
BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
{
char segment_name[MAXPGPATH];
int segment = 0;
@@ -366,7 +378,7 @@ BufFileDeleteFileSet(FileSet *fileset, const char *name)
CHECK_FOR_INTERRUPTS();
}
if (!found)
if (!found && !missing_ok)
elog(ERROR, "could not delete unknown BufFile \"%s\"", name);
}

View File

@@ -564,7 +564,7 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
lt = &lts->tapes[i];
pg_itoa(i, filename);
file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY);
file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY, false);
filesize = BufFileSize(file);
/*

View File

@@ -560,7 +560,8 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
sts_filename(name, accessor, accessor->read_participant);
accessor->read_file =
BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY);
BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
false);
}
/* Seek and load the chunk header. */

View File

@@ -50,6 +50,15 @@ typedef struct LogicalRepWorker
XLogRecPtr relstate_lsn;
slock_t relmutex;
/*
* Used to create the changes and subxact files for the streaming
* transactions. Upon the arrival of the first streaming transaction, the
* fileset will be initialized, and it will be deleted when the worker
* exits. Under this, separate buffiles would be created for each
* transaction which will be deleted after the transaction is finished.
*/
FileSet *stream_fileset;
/* Stats. */
XLogRecPtr last_lsn;
TimestampTz last_send_time;
@@ -79,7 +88,6 @@ extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
extern void logicalrep_worker_stop(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
extern void logicalrep_worker_cleanupfileset(void);
extern int logicalrep_sync_worker_count(Oid subid);

View File

@@ -49,8 +49,9 @@ extern long BufFileAppend(BufFile *target, BufFile *source);
extern BufFile *BufFileCreateFileSet(FileSet *fileset, const char *name);
extern void BufFileExportFileSet(BufFile *file);
extern BufFile *BufFileOpenFileSet(FileSet *fileset, const char *name,
int mode);
extern void BufFileDeleteFileSet(FileSet *fileset, const char *name);
int mode, bool missing_ok);
extern void BufFileDeleteFileSet(FileSet *fileset, const char *name,
bool missing_ok);
extern void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset);
#endif /* BUFFILE_H */