mirror of
https://github.com/postgres/postgres.git
synced 2025-08-25 20:23:07 +03:00
Refactor sharedfileset.c to separate out fileset implementation.
Move fileset related implementation out of sharedfileset.c to allow its usage by backends that don't want to share filesets among different processes. After this split, fileset infrastructure is used by both sharedfileset.c and worker.c for the named temporary files that survive across transactions. Author: Dilip Kumar, based on suggestion by Andres Freund Reviewed-by: Hou Zhijie, Masahiko Sawada, Amit Kapila Discussion: https://postgr.es/m/E1mCC6U-0004Ik-Fs@gemulon.postgresql.org
This commit is contained in:
@@ -648,6 +648,9 @@ logicalrep_worker_onexit(int code, Datum arg)
|
||||
|
||||
logicalrep_worker_detach();
|
||||
|
||||
/* Cleanup filesets used for streaming transactions. */
|
||||
logicalrep_worker_cleanupfileset();
|
||||
|
||||
ApplyLauncherWakeup();
|
||||
}
|
||||
|
||||
|
@@ -39,13 +39,13 @@
|
||||
* BufFile infrastructure supports temporary files that exceed the OS file size
|
||||
* limit, (b) provides a way for automatic clean up on the error and (c) provides
|
||||
* a way to survive these files across local transactions and allow to open and
|
||||
* close at stream start and close. We decided to use SharedFileSet
|
||||
* close at stream start and close. We decided to use FileSet
|
||||
* infrastructure as without that it deletes the files on the closure of the
|
||||
* file and if we decide to keep stream files open across the start/stop stream
|
||||
* then it will consume a lot of memory (more than 8K for each BufFile and
|
||||
* there could be multiple such BufFiles as the subscriber could receive
|
||||
* multiple start/stop streams for different transactions before getting the
|
||||
* commit). Moreover, if we don't use SharedFileSet then we also need to invent
|
||||
* commit). Moreover, if we don't use FileSet then we also need to invent
|
||||
* a new way to pass filenames to BufFile APIs so that we are allowed to open
|
||||
* the file we desired across multiple stream-open calls for the same
|
||||
* transaction.
|
||||
@@ -246,8 +246,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
|
||||
typedef struct StreamXidHash
|
||||
{
|
||||
TransactionId xid; /* xid is the hash key and must be first */
|
||||
SharedFileSet *stream_fileset; /* shared file set for stream data */
|
||||
SharedFileSet *subxact_fileset; /* shared file set for subxact info */
|
||||
FileSet *stream_fileset; /* file set for stream data */
|
||||
FileSet *subxact_fileset; /* file set for subxact info */
|
||||
} StreamXidHash;
|
||||
|
||||
static MemoryContext ApplyMessageContext = NULL;
|
||||
@@ -270,8 +270,8 @@ static bool in_streamed_transaction = false;
|
||||
static TransactionId stream_xid = InvalidTransactionId;
|
||||
|
||||
/*
|
||||
* Hash table for storing the streaming xid information along with shared file
|
||||
* set for streaming and subxact files.
|
||||
* Hash table for storing the streaming xid information along with filesets
|
||||
* for streaming and subxact files.
|
||||
*/
|
||||
static HTAB *xidhash = NULL;
|
||||
|
||||
@@ -1297,11 +1297,11 @@ apply_handle_stream_abort(StringInfo s)
|
||||
|
||||
/* open the changes file */
|
||||
changes_filename(path, MyLogicalRepWorker->subid, xid);
|
||||
fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
|
||||
fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
|
||||
|
||||
/* OK, truncate the file at the right offset */
|
||||
BufFileTruncateShared(fd, subxact_data.subxacts[subidx].fileno,
|
||||
subxact_data.subxacts[subidx].offset);
|
||||
BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
|
||||
subxact_data.subxacts[subidx].offset);
|
||||
BufFileClose(fd);
|
||||
|
||||
/* discard the subxacts added later */
|
||||
@@ -1355,7 +1355,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
|
||||
errmsg_internal("transaction %u not found in stream XID hash table",
|
||||
xid)));
|
||||
|
||||
fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
|
||||
fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDONLY);
|
||||
|
||||
buffer = palloc(BLCKSZ);
|
||||
initStringInfo(&s2);
|
||||
@@ -2541,6 +2541,30 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Cleanup filesets.
|
||||
*/
|
||||
void
|
||||
logicalrep_worker_cleanupfileset(void)
|
||||
{
|
||||
HASH_SEQ_STATUS status;
|
||||
StreamXidHash *hentry;
|
||||
|
||||
/* Remove all the pending stream and subxact filesets. */
|
||||
if (xidhash)
|
||||
{
|
||||
hash_seq_init(&status, xidhash);
|
||||
while ((hentry = (StreamXidHash *) hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
FileSetDeleteAll(hentry->stream_fileset);
|
||||
|
||||
/* Delete the subxact fileset iff it is created. */
|
||||
if (hentry->subxact_fileset)
|
||||
FileSetDeleteAll(hentry->subxact_fileset);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Apply main loop.
|
||||
*/
|
||||
@@ -3024,7 +3048,7 @@ subxact_info_write(Oid subid, TransactionId xid)
|
||||
if (ent->subxact_fileset)
|
||||
{
|
||||
cleanup_subxact_info();
|
||||
SharedFileSetDeleteAll(ent->subxact_fileset);
|
||||
FileSetDeleteAll(ent->subxact_fileset);
|
||||
pfree(ent->subxact_fileset);
|
||||
ent->subxact_fileset = NULL;
|
||||
}
|
||||
@@ -3042,18 +3066,18 @@ subxact_info_write(Oid subid, TransactionId xid)
|
||||
MemoryContext oldctx;
|
||||
|
||||
/*
|
||||
* We need to maintain shared fileset across multiple stream
|
||||
* start/stop calls. So, need to allocate it in a persistent context.
|
||||
* We need to maintain fileset across multiple stream start/stop
|
||||
* calls. So, need to allocate it in a persistent context.
|
||||
*/
|
||||
oldctx = MemoryContextSwitchTo(ApplyContext);
|
||||
ent->subxact_fileset = palloc(sizeof(SharedFileSet));
|
||||
SharedFileSetInit(ent->subxact_fileset, NULL);
|
||||
ent->subxact_fileset = palloc(sizeof(FileSet));
|
||||
FileSetInit(ent->subxact_fileset);
|
||||
MemoryContextSwitchTo(oldctx);
|
||||
|
||||
fd = BufFileCreateShared(ent->subxact_fileset, path);
|
||||
fd = BufFileCreateFileSet(ent->subxact_fileset, path);
|
||||
}
|
||||
else
|
||||
fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
|
||||
fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDWR);
|
||||
|
||||
len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
|
||||
|
||||
@@ -3107,7 +3131,7 @@ subxact_info_read(Oid subid, TransactionId xid)
|
||||
|
||||
subxact_filename(path, subid, xid);
|
||||
|
||||
fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY);
|
||||
fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDONLY);
|
||||
|
||||
/* read number of subxact items */
|
||||
if (BufFileRead(fd, &subxact_data.nsubxacts,
|
||||
@@ -3264,7 +3288,7 @@ stream_cleanup_files(Oid subid, TransactionId xid)
|
||||
|
||||
/* Delete the change file and release the stream fileset memory */
|
||||
changes_filename(path, subid, xid);
|
||||
SharedFileSetDeleteAll(ent->stream_fileset);
|
||||
FileSetDeleteAll(ent->stream_fileset);
|
||||
pfree(ent->stream_fileset);
|
||||
ent->stream_fileset = NULL;
|
||||
|
||||
@@ -3272,7 +3296,7 @@ stream_cleanup_files(Oid subid, TransactionId xid)
|
||||
if (ent->subxact_fileset)
|
||||
{
|
||||
subxact_filename(path, subid, xid);
|
||||
SharedFileSetDeleteAll(ent->subxact_fileset);
|
||||
FileSetDeleteAll(ent->subxact_fileset);
|
||||
pfree(ent->subxact_fileset);
|
||||
ent->subxact_fileset = NULL;
|
||||
}
|
||||
@@ -3288,8 +3312,8 @@ stream_cleanup_files(Oid subid, TransactionId xid)
|
||||
*
|
||||
* Open a file for streamed changes from a toplevel transaction identified
|
||||
* by stream_xid (global variable). If it's the first chunk of streamed
|
||||
* changes for this transaction, initialize the shared fileset and create the
|
||||
* buffile, otherwise open the previously created file.
|
||||
* changes for this transaction, initialize the fileset and create the buffile,
|
||||
* otherwise open the previously created file.
|
||||
*
|
||||
* This can only be called at the beginning of a "streaming" block, i.e.
|
||||
* between stream_start/stream_stop messages from the upstream.
|
||||
@@ -3330,7 +3354,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
|
||||
if (first_segment)
|
||||
{
|
||||
MemoryContext savectx;
|
||||
SharedFileSet *fileset;
|
||||
FileSet *fileset;
|
||||
|
||||
if (found)
|
||||
ereport(ERROR,
|
||||
@@ -3338,16 +3362,16 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
|
||||
errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
|
||||
|
||||
/*
|
||||
* We need to maintain shared fileset across multiple stream
|
||||
* start/stop calls. So, need to allocate it in a persistent context.
|
||||
* We need to maintain fileset across multiple stream start/stop
|
||||
* calls. So, need to allocate it in a persistent context.
|
||||
*/
|
||||
savectx = MemoryContextSwitchTo(ApplyContext);
|
||||
fileset = palloc(sizeof(SharedFileSet));
|
||||
fileset = palloc(sizeof(FileSet));
|
||||
|
||||
SharedFileSetInit(fileset, NULL);
|
||||
FileSetInit(fileset);
|
||||
MemoryContextSwitchTo(savectx);
|
||||
|
||||
stream_fd = BufFileCreateShared(fileset, path);
|
||||
stream_fd = BufFileCreateFileSet(fileset, path);
|
||||
|
||||
/* Remember the fileset for the next stream of the same transaction */
|
||||
ent->xid = xid;
|
||||
@@ -3365,7 +3389,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
|
||||
* Open the file and seek to the end of the file because we always
|
||||
* append the changes file.
|
||||
*/
|
||||
stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
|
||||
stream_fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
|
||||
BufFileSeek(stream_fd, 0, 0, SEEK_END);
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user