mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-25 13:17:41 +03:00 
			
		
		
		
	Optimize fileset usage in apply worker.
Use one fileset for the entire worker lifetime instead of using separate filesets for each streaming transaction. Now, the changes/subxacts files for every streaming transaction will be created under the same fileset and the files will be deleted after the transaction is completed. This patch extends the BufFileOpenFileSet and BufFileDeleteFileSet APIs to allow users to specify whether to give an error on missing files. Author: Dilip Kumar, based on suggestion by Thomas Munro Reviewed-by: Hou Zhijie, Masahiko Sawada, Amit Kapila Discussion: https://postgr.es/m/E1mCC6U-0004Ik-Fs@gemulon.postgresql.org
This commit is contained in:
		| @@ -379,6 +379,7 @@ retry: | |||||||
| 	worker->relid = relid; | 	worker->relid = relid; | ||||||
| 	worker->relstate = SUBREL_STATE_UNKNOWN; | 	worker->relstate = SUBREL_STATE_UNKNOWN; | ||||||
| 	worker->relstate_lsn = InvalidXLogRecPtr; | 	worker->relstate_lsn = InvalidXLogRecPtr; | ||||||
|  | 	worker->stream_fileset = NULL; | ||||||
| 	worker->last_lsn = InvalidXLogRecPtr; | 	worker->last_lsn = InvalidXLogRecPtr; | ||||||
| 	TIMESTAMP_NOBEGIN(worker->last_send_time); | 	TIMESTAMP_NOBEGIN(worker->last_send_time); | ||||||
| 	TIMESTAMP_NOBEGIN(worker->last_recv_time); | 	TIMESTAMP_NOBEGIN(worker->last_recv_time); | ||||||
| @@ -648,8 +649,9 @@ logicalrep_worker_onexit(int code, Datum arg) | |||||||
|  |  | ||||||
| 	logicalrep_worker_detach(); | 	logicalrep_worker_detach(); | ||||||
|  |  | ||||||
| 	/* Cleanup filesets used for streaming transactions. */ | 	/* Cleanup fileset used for streaming transactions. */ | ||||||
| 	logicalrep_worker_cleanupfileset(); | 	if (MyLogicalRepWorker->stream_fileset != NULL) | ||||||
|  | 		FileSetDeleteAll(MyLogicalRepWorker->stream_fileset); | ||||||
|  |  | ||||||
| 	ApplyLauncherWakeup(); | 	ApplyLauncherWakeup(); | ||||||
| } | } | ||||||
|   | |||||||
| @@ -236,20 +236,6 @@ static ApplyErrorCallbackArg apply_error_callback_arg = | |||||||
| 	.ts = 0, | 	.ts = 0, | ||||||
| }; | }; | ||||||
|  |  | ||||||
| /* |  | ||||||
|  * Stream xid hash entry. Whenever we see a new xid we create this entry in the |  | ||||||
|  * xidhash and along with it create the streaming file and store the fileset handle. |  | ||||||
|  * The subxact file is created iff there is any subxact info under this xid. This |  | ||||||
|  * entry is used on the subsequent streams for the xid to get the corresponding |  | ||||||
|  * fileset handles, so storing them in hash makes the search faster. |  | ||||||
|  */ |  | ||||||
| typedef struct StreamXidHash |  | ||||||
| { |  | ||||||
| 	TransactionId xid;			/* xid is the hash key and must be first */ |  | ||||||
| 	FileSet    *stream_fileset; /* file set for stream data */ |  | ||||||
| 	FileSet    *subxact_fileset;	/* file set for subxact info */ |  | ||||||
| } StreamXidHash; |  | ||||||
|  |  | ||||||
| static MemoryContext ApplyMessageContext = NULL; | static MemoryContext ApplyMessageContext = NULL; | ||||||
| MemoryContext ApplyContext = NULL; | MemoryContext ApplyContext = NULL; | ||||||
|  |  | ||||||
| @@ -269,12 +255,6 @@ static bool in_streamed_transaction = false; | |||||||
|  |  | ||||||
| static TransactionId stream_xid = InvalidTransactionId; | static TransactionId stream_xid = InvalidTransactionId; | ||||||
|  |  | ||||||
| /* |  | ||||||
|  * Hash table for storing the streaming xid information along with filesets |  | ||||||
|  * for streaming and subxact files. |  | ||||||
|  */ |  | ||||||
| static HTAB *xidhash = NULL; |  | ||||||
|  |  | ||||||
| /* BufFile handle of the current streaming file */ | /* BufFile handle of the current streaming file */ | ||||||
| static BufFile *stream_fd = NULL; | static BufFile *stream_fd = NULL; | ||||||
|  |  | ||||||
| @@ -1118,7 +1098,6 @@ static void | |||||||
| apply_handle_stream_start(StringInfo s) | apply_handle_stream_start(StringInfo s) | ||||||
| { | { | ||||||
| 	bool		first_segment; | 	bool		first_segment; | ||||||
| 	HASHCTL		hash_ctl; |  | ||||||
|  |  | ||||||
| 	if (in_streamed_transaction) | 	if (in_streamed_transaction) | ||||||
| 		ereport(ERROR, | 		ereport(ERROR, | ||||||
| @@ -1148,17 +1127,23 @@ apply_handle_stream_start(StringInfo s) | |||||||
| 	set_apply_error_context_xact(stream_xid, 0); | 	set_apply_error_context_xact(stream_xid, 0); | ||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| 	 * Initialize the xidhash table if we haven't yet. This will be used for | 	 * Initialize the worker's stream_fileset if we haven't yet. This will be | ||||||
| 	 * the entire duration of the apply worker so create it in permanent | 	 * used for the entire duration of the worker so create it in a permanent | ||||||
| 	 * context. | 	 * context. We create this on the very first streaming message from any | ||||||
|  | 	 * transaction and then use it for this and other streaming transactions. | ||||||
|  | 	 * Now, we could create a fileset at the start of the worker as well but | ||||||
|  | 	 * then we won't be sure that it will ever be used. | ||||||
| 	 */ | 	 */ | ||||||
| 	if (xidhash == NULL) | 	if (MyLogicalRepWorker->stream_fileset == NULL) | ||||||
| 	{ | 	{ | ||||||
| 		hash_ctl.keysize = sizeof(TransactionId); | 		MemoryContext oldctx; | ||||||
| 		hash_ctl.entrysize = sizeof(StreamXidHash); |  | ||||||
| 		hash_ctl.hcxt = ApplyContext; | 		oldctx = MemoryContextSwitchTo(ApplyContext); | ||||||
| 		xidhash = hash_create("StreamXidHash", 1024, &hash_ctl, |  | ||||||
| 							  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); | 		MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet)); | ||||||
|  | 		FileSetInit(MyLogicalRepWorker->stream_fileset); | ||||||
|  |  | ||||||
|  | 		MemoryContextSwitchTo(oldctx); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	/* open the spool file for this transaction */ | 	/* open the spool file for this transaction */ | ||||||
| @@ -1253,7 +1238,6 @@ apply_handle_stream_abort(StringInfo s) | |||||||
| 		BufFile    *fd; | 		BufFile    *fd; | ||||||
| 		bool		found = false; | 		bool		found = false; | ||||||
| 		char		path[MAXPGPATH]; | 		char		path[MAXPGPATH]; | ||||||
| 		StreamXidHash *ent; |  | ||||||
|  |  | ||||||
| 		set_apply_error_context_xact(subxid, 0); | 		set_apply_error_context_xact(subxid, 0); | ||||||
|  |  | ||||||
| @@ -1285,19 +1269,10 @@ apply_handle_stream_abort(StringInfo s) | |||||||
| 			return; | 			return; | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		ent = (StreamXidHash *) hash_search(xidhash, |  | ||||||
| 											(void *) &xid, |  | ||||||
| 											HASH_FIND, |  | ||||||
| 											NULL); |  | ||||||
| 		if (!ent) |  | ||||||
| 			ereport(ERROR, |  | ||||||
| 					(errcode(ERRCODE_PROTOCOL_VIOLATION), |  | ||||||
| 					 errmsg_internal("transaction %u not found in stream XID hash table", |  | ||||||
| 									 xid))); |  | ||||||
|  |  | ||||||
| 		/* open the changes file */ | 		/* open the changes file */ | ||||||
| 		changes_filename(path, MyLogicalRepWorker->subid, xid); | 		changes_filename(path, MyLogicalRepWorker->subid, xid); | ||||||
| 		fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR); | 		fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, | ||||||
|  | 								O_RDWR, false); | ||||||
|  |  | ||||||
| 		/* OK, truncate the file at the right offset */ | 		/* OK, truncate the file at the right offset */ | ||||||
| 		BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno, | 		BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno, | ||||||
| @@ -1327,7 +1302,6 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) | |||||||
| 	int			nchanges; | 	int			nchanges; | ||||||
| 	char		path[MAXPGPATH]; | 	char		path[MAXPGPATH]; | ||||||
| 	char	   *buffer = NULL; | 	char	   *buffer = NULL; | ||||||
| 	StreamXidHash *ent; |  | ||||||
| 	MemoryContext oldcxt; | 	MemoryContext oldcxt; | ||||||
| 	BufFile    *fd; | 	BufFile    *fd; | ||||||
|  |  | ||||||
| @@ -1345,17 +1319,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) | |||||||
| 	changes_filename(path, MyLogicalRepWorker->subid, xid); | 	changes_filename(path, MyLogicalRepWorker->subid, xid); | ||||||
| 	elog(DEBUG1, "replaying changes from file \"%s\"", path); | 	elog(DEBUG1, "replaying changes from file \"%s\"", path); | ||||||
|  |  | ||||||
| 	ent = (StreamXidHash *) hash_search(xidhash, | 	fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY, | ||||||
| 										(void *) &xid, | 							false); | ||||||
| 										HASH_FIND, |  | ||||||
| 										NULL); |  | ||||||
| 	if (!ent) |  | ||||||
| 		ereport(ERROR, |  | ||||||
| 				(errcode(ERRCODE_PROTOCOL_VIOLATION), |  | ||||||
| 				 errmsg_internal("transaction %u not found in stream XID hash table", |  | ||||||
| 								 xid))); |  | ||||||
|  |  | ||||||
| 	fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDONLY); |  | ||||||
|  |  | ||||||
| 	buffer = palloc(BLCKSZ); | 	buffer = palloc(BLCKSZ); | ||||||
| 	initStringInfo(&s2); | 	initStringInfo(&s2); | ||||||
| @@ -2541,30 +2506,6 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| /* |  | ||||||
|  * Cleanup filesets. |  | ||||||
|  */ |  | ||||||
| void |  | ||||||
| logicalrep_worker_cleanupfileset(void) |  | ||||||
| { |  | ||||||
| 	HASH_SEQ_STATUS status; |  | ||||||
| 	StreamXidHash *hentry; |  | ||||||
|  |  | ||||||
| 	/* Remove all the pending stream and subxact filesets. */ |  | ||||||
| 	if (xidhash) |  | ||||||
| 	{ |  | ||||||
| 		hash_seq_init(&status, xidhash); |  | ||||||
| 		while ((hentry = (StreamXidHash *) hash_seq_search(&status)) != NULL) |  | ||||||
| 		{ |  | ||||||
| 			FileSetDeleteAll(hentry->stream_fileset); |  | ||||||
|  |  | ||||||
| 			/* Delete the subxact fileset iff it is created. */ |  | ||||||
| 			if (hentry->subxact_fileset) |  | ||||||
| 				FileSetDeleteAll(hentry->subxact_fileset); |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| /* | /* | ||||||
|  * Apply main loop. |  * Apply main loop. | ||||||
|  */ |  */ | ||||||
| @@ -3026,58 +2967,30 @@ subxact_info_write(Oid subid, TransactionId xid) | |||||||
| { | { | ||||||
| 	char		path[MAXPGPATH]; | 	char		path[MAXPGPATH]; | ||||||
| 	Size		len; | 	Size		len; | ||||||
| 	StreamXidHash *ent; |  | ||||||
| 	BufFile    *fd; | 	BufFile    *fd; | ||||||
|  |  | ||||||
| 	Assert(TransactionIdIsValid(xid)); | 	Assert(TransactionIdIsValid(xid)); | ||||||
|  |  | ||||||
| 	/* Find the xid entry in the xidhash */ | 	/* construct the subxact filename */ | ||||||
| 	ent = (StreamXidHash *) hash_search(xidhash, | 	subxact_filename(path, subid, xid); | ||||||
| 										(void *) &xid, |  | ||||||
| 										HASH_FIND, |  | ||||||
| 										NULL); |  | ||||||
| 	/* By this time we must have created the transaction entry */ |  | ||||||
| 	Assert(ent); |  | ||||||
|  |  | ||||||
| 	/* | 	/* Delete the subxacts file, if exists. */ | ||||||
| 	 * If there is no subtransaction then nothing to do, but if already have |  | ||||||
| 	 * subxact file then delete that. |  | ||||||
| 	 */ |  | ||||||
| 	if (subxact_data.nsubxacts == 0) | 	if (subxact_data.nsubxacts == 0) | ||||||
| 	{ | 	{ | ||||||
| 		if (ent->subxact_fileset) | 		cleanup_subxact_info(); | ||||||
| 		{ | 		BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true); | ||||||
| 			cleanup_subxact_info(); |  | ||||||
| 			FileSetDeleteAll(ent->subxact_fileset); |  | ||||||
| 			pfree(ent->subxact_fileset); |  | ||||||
| 			ent->subxact_fileset = NULL; |  | ||||||
| 		} |  | ||||||
| 		return; | 		return; | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	subxact_filename(path, subid, xid); |  | ||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| 	 * Create the subxact file if it not already created, otherwise open the | 	 * Create the subxact file if it not already created, otherwise open the | ||||||
| 	 * existing file. | 	 * existing file. | ||||||
| 	 */ | 	 */ | ||||||
| 	if (ent->subxact_fileset == NULL) | 	fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR, | ||||||
| 	{ | 							true); | ||||||
| 		MemoryContext oldctx; | 	if (fd == NULL) | ||||||
|  | 		fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path); | ||||||
| 		/* |  | ||||||
| 		 * We need to maintain fileset across multiple stream start/stop |  | ||||||
| 		 * calls. So, need to allocate it in a persistent context. |  | ||||||
| 		 */ |  | ||||||
| 		oldctx = MemoryContextSwitchTo(ApplyContext); |  | ||||||
| 		ent->subxact_fileset = palloc(sizeof(FileSet)); |  | ||||||
| 		FileSetInit(ent->subxact_fileset); |  | ||||||
| 		MemoryContextSwitchTo(oldctx); |  | ||||||
|  |  | ||||||
| 		fd = BufFileCreateFileSet(ent->subxact_fileset, path); |  | ||||||
| 	} |  | ||||||
| 	else |  | ||||||
| 		fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDWR); |  | ||||||
|  |  | ||||||
| 	len = sizeof(SubXactInfo) * subxact_data.nsubxacts; | 	len = sizeof(SubXactInfo) * subxact_data.nsubxacts; | ||||||
|  |  | ||||||
| @@ -3104,34 +3017,21 @@ subxact_info_read(Oid subid, TransactionId xid) | |||||||
| 	char		path[MAXPGPATH]; | 	char		path[MAXPGPATH]; | ||||||
| 	Size		len; | 	Size		len; | ||||||
| 	BufFile    *fd; | 	BufFile    *fd; | ||||||
| 	StreamXidHash *ent; |  | ||||||
| 	MemoryContext oldctx; | 	MemoryContext oldctx; | ||||||
|  |  | ||||||
| 	Assert(!subxact_data.subxacts); | 	Assert(!subxact_data.subxacts); | ||||||
| 	Assert(subxact_data.nsubxacts == 0); | 	Assert(subxact_data.nsubxacts == 0); | ||||||
| 	Assert(subxact_data.nsubxacts_max == 0); | 	Assert(subxact_data.nsubxacts_max == 0); | ||||||
|  |  | ||||||
| 	/* Find the stream xid entry in the xidhash */ |  | ||||||
| 	ent = (StreamXidHash *) hash_search(xidhash, |  | ||||||
| 										(void *) &xid, |  | ||||||
| 										HASH_FIND, |  | ||||||
| 										NULL); |  | ||||||
| 	if (!ent) |  | ||||||
| 		ereport(ERROR, |  | ||||||
| 				(errcode(ERRCODE_PROTOCOL_VIOLATION), |  | ||||||
| 				 errmsg_internal("transaction %u not found in stream XID hash table", |  | ||||||
| 								 xid))); |  | ||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| 	 * If subxact_fileset is not valid that mean we don't have any subxact | 	 * If the subxact file doesn't exist that means we don't have any subxact | ||||||
| 	 * info | 	 * info. | ||||||
| 	 */ | 	 */ | ||||||
| 	if (ent->subxact_fileset == NULL) |  | ||||||
| 		return; |  | ||||||
|  |  | ||||||
| 	subxact_filename(path, subid, xid); | 	subxact_filename(path, subid, xid); | ||||||
|  | 	fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY, | ||||||
| 	fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDONLY); | 							true); | ||||||
|  | 	if (fd == NULL) | ||||||
|  | 		return; | ||||||
|  |  | ||||||
| 	/* read number of subxact items */ | 	/* read number of subxact items */ | ||||||
| 	if (BufFileRead(fd, &subxact_data.nsubxacts, | 	if (BufFileRead(fd, &subxact_data.nsubxacts, | ||||||
| @@ -3267,42 +3167,21 @@ changes_filename(char *path, Oid subid, TransactionId xid) | |||||||
|  *	  Cleanup files for a subscription / toplevel transaction. |  *	  Cleanup files for a subscription / toplevel transaction. | ||||||
|  * |  * | ||||||
|  * Remove files with serialized changes and subxact info for a particular |  * Remove files with serialized changes and subxact info for a particular | ||||||
|  * toplevel transaction. Each subscription has a separate set of files. |  * toplevel transaction. Each subscription has a separate set of files | ||||||
|  |  * for any toplevel transaction. | ||||||
|  */ |  */ | ||||||
| static void | static void | ||||||
| stream_cleanup_files(Oid subid, TransactionId xid) | stream_cleanup_files(Oid subid, TransactionId xid) | ||||||
| { | { | ||||||
| 	char		path[MAXPGPATH]; | 	char		path[MAXPGPATH]; | ||||||
| 	StreamXidHash *ent; |  | ||||||
|  |  | ||||||
| 	/* Find the xid entry in the xidhash */ | 	/* Delete the changes file. */ | ||||||
| 	ent = (StreamXidHash *) hash_search(xidhash, |  | ||||||
| 										(void *) &xid, |  | ||||||
| 										HASH_FIND, |  | ||||||
| 										NULL); |  | ||||||
| 	if (!ent) |  | ||||||
| 		ereport(ERROR, |  | ||||||
| 				(errcode(ERRCODE_PROTOCOL_VIOLATION), |  | ||||||
| 				 errmsg_internal("transaction %u not found in stream XID hash table", |  | ||||||
| 								 xid))); |  | ||||||
|  |  | ||||||
| 	/* Delete the change file and release the stream fileset memory */ |  | ||||||
| 	changes_filename(path, subid, xid); | 	changes_filename(path, subid, xid); | ||||||
| 	FileSetDeleteAll(ent->stream_fileset); | 	BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false); | ||||||
| 	pfree(ent->stream_fileset); |  | ||||||
| 	ent->stream_fileset = NULL; |  | ||||||
|  |  | ||||||
| 	/* Delete the subxact file and release the memory, if it exist */ | 	/* Delete the subxact file, if it exists. */ | ||||||
| 	if (ent->subxact_fileset) | 	subxact_filename(path, subid, xid); | ||||||
| 	{ | 	BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true); | ||||||
| 		subxact_filename(path, subid, xid); |  | ||||||
| 		FileSetDeleteAll(ent->subxact_fileset); |  | ||||||
| 		pfree(ent->subxact_fileset); |  | ||||||
| 		ent->subxact_fileset = NULL; |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	/* Remove the xid entry from the stream xid hash */ |  | ||||||
| 	hash_search(xidhash, (void *) &xid, HASH_REMOVE, NULL); |  | ||||||
| } | } | ||||||
|  |  | ||||||
| /* | /* | ||||||
| @@ -3312,8 +3191,8 @@ stream_cleanup_files(Oid subid, TransactionId xid) | |||||||
|  * |  * | ||||||
|  * Open a file for streamed changes from a toplevel transaction identified |  * Open a file for streamed changes from a toplevel transaction identified | ||||||
|  * by stream_xid (global variable). If it's the first chunk of streamed |  * by stream_xid (global variable). If it's the first chunk of streamed | ||||||
|  * changes for this transaction, initialize the fileset and create the buffile, |  * changes for this transaction, create the buffile, otherwise open the | ||||||
|  * otherwise open the previously created file. |  * previously created file. | ||||||
|  * |  * | ||||||
|  * This can only be called at the beginning of a "streaming" block, i.e. |  * This can only be called at the beginning of a "streaming" block, i.e. | ||||||
|  * between stream_start/stream_stop messages from the upstream. |  * between stream_start/stream_stop messages from the upstream. | ||||||
| @@ -3322,20 +3201,13 @@ static void | |||||||
| stream_open_file(Oid subid, TransactionId xid, bool first_segment) | stream_open_file(Oid subid, TransactionId xid, bool first_segment) | ||||||
| { | { | ||||||
| 	char		path[MAXPGPATH]; | 	char		path[MAXPGPATH]; | ||||||
| 	bool		found; |  | ||||||
| 	MemoryContext oldcxt; | 	MemoryContext oldcxt; | ||||||
| 	StreamXidHash *ent; |  | ||||||
|  |  | ||||||
| 	Assert(in_streamed_transaction); | 	Assert(in_streamed_transaction); | ||||||
| 	Assert(OidIsValid(subid)); | 	Assert(OidIsValid(subid)); | ||||||
| 	Assert(TransactionIdIsValid(xid)); | 	Assert(TransactionIdIsValid(xid)); | ||||||
| 	Assert(stream_fd == NULL); | 	Assert(stream_fd == NULL); | ||||||
|  |  | ||||||
| 	/* create or find the xid entry in the xidhash */ |  | ||||||
| 	ent = (StreamXidHash *) hash_search(xidhash, |  | ||||||
| 										(void *) &xid, |  | ||||||
| 										HASH_ENTER, |  | ||||||
| 										&found); |  | ||||||
|  |  | ||||||
| 	changes_filename(path, subid, xid); | 	changes_filename(path, subid, xid); | ||||||
| 	elog(DEBUG1, "opening file \"%s\" for streamed changes", path); | 	elog(DEBUG1, "opening file \"%s\" for streamed changes", path); | ||||||
| @@ -3347,49 +3219,20 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) | |||||||
| 	oldcxt = MemoryContextSwitchTo(LogicalStreamingContext); | 	oldcxt = MemoryContextSwitchTo(LogicalStreamingContext); | ||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| 	 * If this is the first streamed segment, the file must not exist, so make | 	 * If this is the first streamed segment, create the changes file. | ||||||
| 	 * sure we're the ones creating it. Otherwise just open the file for | 	 * Otherwise, just open the file for writing, in append mode. | ||||||
| 	 * writing, in append mode. |  | ||||||
| 	 */ | 	 */ | ||||||
| 	if (first_segment) | 	if (first_segment) | ||||||
| 	{ | 		stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, | ||||||
| 		MemoryContext savectx; | 										 path); | ||||||
| 		FileSet    *fileset; |  | ||||||
|  |  | ||||||
| 		if (found) |  | ||||||
| 			ereport(ERROR, |  | ||||||
| 					(errcode(ERRCODE_PROTOCOL_VIOLATION), |  | ||||||
| 					 errmsg_internal("incorrect first-segment flag for streamed replication transaction"))); |  | ||||||
|  |  | ||||||
| 		/* |  | ||||||
| 		 * We need to maintain fileset across multiple stream start/stop |  | ||||||
| 		 * calls. So, need to allocate it in a persistent context. |  | ||||||
| 		 */ |  | ||||||
| 		savectx = MemoryContextSwitchTo(ApplyContext); |  | ||||||
| 		fileset = palloc(sizeof(FileSet)); |  | ||||||
|  |  | ||||||
| 		FileSetInit(fileset); |  | ||||||
| 		MemoryContextSwitchTo(savectx); |  | ||||||
|  |  | ||||||
| 		stream_fd = BufFileCreateFileSet(fileset, path); |  | ||||||
|  |  | ||||||
| 		/* Remember the fileset for the next stream of the same transaction */ |  | ||||||
| 		ent->xid = xid; |  | ||||||
| 		ent->stream_fileset = fileset; |  | ||||||
| 		ent->subxact_fileset = NULL; |  | ||||||
| 	} |  | ||||||
| 	else | 	else | ||||||
| 	{ | 	{ | ||||||
| 		if (!found) |  | ||||||
| 			ereport(ERROR, |  | ||||||
| 					(errcode(ERRCODE_PROTOCOL_VIOLATION), |  | ||||||
| 					 errmsg_internal("incorrect first-segment flag for streamed replication transaction"))); |  | ||||||
|  |  | ||||||
| 		/* | 		/* | ||||||
| 		 * Open the file and seek to the end of the file because we always | 		 * Open the file and seek to the end of the file because we always | ||||||
| 		 * append the changes file. | 		 * append the changes file. | ||||||
| 		 */ | 		 */ | ||||||
| 		stream_fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR); | 		stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, | ||||||
|  | 									   path, O_RDWR, false); | ||||||
| 		BufFileSeek(stream_fd, 0, 0, SEEK_END); | 		BufFileSeek(stream_fd, 0, 0, SEEK_END); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
| @@ -278,10 +278,13 @@ BufFileCreateFileSet(FileSet *fileset, const char *name) | |||||||
|  * with BufFileCreateFileSet in the same FileSet using the same name. |  * with BufFileCreateFileSet in the same FileSet using the same name. | ||||||
|  * The backend that created the file must have called BufFileClose() or |  * The backend that created the file must have called BufFileClose() or | ||||||
|  * BufFileExportFileSet() to make sure that it is ready to be opened by other |  * BufFileExportFileSet() to make sure that it is ready to be opened by other | ||||||
|  * backends and render it read-only. |  * backends and render it read-only.  If missing_ok is true, which indicates | ||||||
|  |  * that missing files can be safely ignored, then return NULL if the BufFile | ||||||
|  |  * with the given name is not found, otherwise, throw an error. | ||||||
|  */ |  */ | ||||||
| BufFile * | BufFile * | ||||||
| BufFileOpenFileSet(FileSet *fileset, const char *name, int mode) | BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, | ||||||
|  | 				   bool missing_ok) | ||||||
| { | { | ||||||
| 	BufFile    *file; | 	BufFile    *file; | ||||||
| 	char		segment_name[MAXPGPATH]; | 	char		segment_name[MAXPGPATH]; | ||||||
| @@ -318,10 +321,18 @@ BufFileOpenFileSet(FileSet *fileset, const char *name, int mode) | |||||||
| 	 * name. | 	 * name. | ||||||
| 	 */ | 	 */ | ||||||
| 	if (nfiles == 0) | 	if (nfiles == 0) | ||||||
|  | 	{ | ||||||
|  | 		/* free the memory */ | ||||||
|  | 		pfree(files); | ||||||
|  |  | ||||||
|  | 		if (missing_ok) | ||||||
|  | 			return NULL; | ||||||
|  |  | ||||||
| 		ereport(ERROR, | 		ereport(ERROR, | ||||||
| 				(errcode_for_file_access(), | 				(errcode_for_file_access(), | ||||||
| 				 errmsg("could not open temporary file \"%s\" from BufFile \"%s\": %m", | 				 errmsg("could not open temporary file \"%s\" from BufFile \"%s\": %m", | ||||||
| 						segment_name, name))); | 						segment_name, name))); | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	file = makeBufFileCommon(nfiles); | 	file = makeBufFileCommon(nfiles); | ||||||
| 	file->files = files; | 	file->files = files; | ||||||
| @@ -341,10 +352,11 @@ BufFileOpenFileSet(FileSet *fileset, const char *name, int mode) | |||||||
|  * the FileSet to be cleaned up. |  * the FileSet to be cleaned up. | ||||||
|  * |  * | ||||||
|  * Only one backend should attempt to delete a given name, and should know |  * Only one backend should attempt to delete a given name, and should know | ||||||
|  * that it exists and has been exported or closed. |  * that it exists and has been exported or closed otherwise missing_ok should | ||||||
|  |  * be passed true. | ||||||
|  */ |  */ | ||||||
| void | void | ||||||
| BufFileDeleteFileSet(FileSet *fileset, const char *name) | BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok) | ||||||
| { | { | ||||||
| 	char		segment_name[MAXPGPATH]; | 	char		segment_name[MAXPGPATH]; | ||||||
| 	int			segment = 0; | 	int			segment = 0; | ||||||
| @@ -366,7 +378,7 @@ BufFileDeleteFileSet(FileSet *fileset, const char *name) | |||||||
| 		CHECK_FOR_INTERRUPTS(); | 		CHECK_FOR_INTERRUPTS(); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if (!found) | 	if (!found && !missing_ok) | ||||||
| 		elog(ERROR, "could not delete unknown BufFile \"%s\"", name); | 		elog(ERROR, "could not delete unknown BufFile \"%s\"", name); | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -564,7 +564,7 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared, | |||||||
| 		lt = <s->tapes[i]; | 		lt = <s->tapes[i]; | ||||||
|  |  | ||||||
| 		pg_itoa(i, filename); | 		pg_itoa(i, filename); | ||||||
| 		file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY); | 		file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY, false); | ||||||
| 		filesize = BufFileSize(file); | 		filesize = BufFileSize(file); | ||||||
|  |  | ||||||
| 		/* | 		/* | ||||||
|   | |||||||
| @@ -560,7 +560,8 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data) | |||||||
|  |  | ||||||
| 				sts_filename(name, accessor, accessor->read_participant); | 				sts_filename(name, accessor, accessor->read_participant); | ||||||
| 				accessor->read_file = | 				accessor->read_file = | ||||||
| 					BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY); | 					BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY, | ||||||
|  | 									   false); | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			/* Seek and load the chunk header. */ | 			/* Seek and load the chunk header. */ | ||||||
|   | |||||||
| @@ -50,6 +50,15 @@ typedef struct LogicalRepWorker | |||||||
| 	XLogRecPtr	relstate_lsn; | 	XLogRecPtr	relstate_lsn; | ||||||
| 	slock_t		relmutex; | 	slock_t		relmutex; | ||||||
|  |  | ||||||
|  | 	/* | ||||||
|  | 	 * Used to create the changes and subxact files for the streaming | ||||||
|  | 	 * transactions.  Upon the arrival of the first streaming transaction, the | ||||||
|  | 	 * fileset will be initialized, and it will be deleted when the worker | ||||||
|  | 	 * exits.  Under this, separate buffiles would be created for each | ||||||
|  | 	 * transaction which will be deleted after the transaction is finished. | ||||||
|  | 	 */ | ||||||
|  | 	FileSet    *stream_fileset; | ||||||
|  |  | ||||||
| 	/* Stats. */ | 	/* Stats. */ | ||||||
| 	XLogRecPtr	last_lsn; | 	XLogRecPtr	last_lsn; | ||||||
| 	TimestampTz last_send_time; | 	TimestampTz last_send_time; | ||||||
| @@ -79,7 +88,6 @@ extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, | |||||||
| extern void logicalrep_worker_stop(Oid subid, Oid relid); | extern void logicalrep_worker_stop(Oid subid, Oid relid); | ||||||
| extern void logicalrep_worker_wakeup(Oid subid, Oid relid); | extern void logicalrep_worker_wakeup(Oid subid, Oid relid); | ||||||
| extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); | extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); | ||||||
| extern void logicalrep_worker_cleanupfileset(void); |  | ||||||
|  |  | ||||||
| extern int	logicalrep_sync_worker_count(Oid subid); | extern int	logicalrep_sync_worker_count(Oid subid); | ||||||
|  |  | ||||||
|   | |||||||
| @@ -49,8 +49,9 @@ extern long BufFileAppend(BufFile *target, BufFile *source); | |||||||
| extern BufFile *BufFileCreateFileSet(FileSet *fileset, const char *name); | extern BufFile *BufFileCreateFileSet(FileSet *fileset, const char *name); | ||||||
| extern void BufFileExportFileSet(BufFile *file); | extern void BufFileExportFileSet(BufFile *file); | ||||||
| extern BufFile *BufFileOpenFileSet(FileSet *fileset, const char *name, | extern BufFile *BufFileOpenFileSet(FileSet *fileset, const char *name, | ||||||
| 								   int mode); | 								   int mode, bool missing_ok); | ||||||
| extern void BufFileDeleteFileSet(FileSet *fileset, const char *name); | extern void BufFileDeleteFileSet(FileSet *fileset, const char *name, | ||||||
|  | 								 bool missing_ok); | ||||||
| extern void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset); | extern void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset); | ||||||
|  |  | ||||||
| #endif							/* BUFFILE_H */ | #endif							/* BUFFILE_H */ | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user