mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-25 13:17:41 +03:00 
			
		
		
		
	Provide vectored variant of ReadBuffer().
Break ReadBuffer() up into two steps. StartReadBuffers() and WaitReadBuffers() give us two main advantages: 1. Multiple consecutive blocks can be read with one system call. 2. Advice (hints of future reads) can optionally be issued to the kernel ahead of time. The traditional ReadBuffer() function is now implemented in terms of those functions, to avoid duplication. A new GUC io_combine_limit is defined, and the functions for limiting per-backend pin counts are made into public APIs. Those are provided for use by callers of StartReadBuffers(), when deciding how many buffers to read at once. The following commit will add a higher level mechanism for doing that automatically with a practical interface. With some more infrastructure in later work, StartReadBuffers() could be extended to start real asynchronous I/O instead of just issuing advice and leaving WaitReadBuffers() to do the work synchronously. Author: Thomas Munro <thomas.munro@gmail.com> Author: Andres Freund <andres@anarazel.de> (some optimization tweaks) Reviewed-by: Melanie Plageman <melanieplageman@gmail.com> Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi> Reviewed-by: Nazir Bilal Yavuz <byavuz81@gmail.com> Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com> Reviewed-by: Andres Freund <andres@anarazel.de> Tested-by: Tomas Vondra <tomas.vondra@enterprisedb.com> Discussion: https://postgr.es/m/CA+hUKGJkOiOCa+mag4BF+zHo7qo=o9CFheB8=g6uT5TUm2gkvA@mail.gmail.com
This commit is contained in:
		| @@ -2708,6 +2708,20 @@ include_dir 'conf.d' | |||||||
|        </listitem> |        </listitem> | ||||||
|       </varlistentry> |       </varlistentry> | ||||||
|  |  | ||||||
|  |       <varlistentry id="guc-io-combine-limit" xreflabel="io_combine_limit"> | ||||||
|  |        <term><varname>io_combine_limit</varname> (<type>integer</type>) | ||||||
|  |        <indexterm> | ||||||
|  |         <primary><varname>io_combine_limit</varname> configuration parameter</primary> | ||||||
|  |        </indexterm> | ||||||
|  |        </term> | ||||||
|  |        <listitem> | ||||||
|  |         <para> | ||||||
|  |          Controls the largest I/O size in operations that combine I/O. | ||||||
|  |          The default is 128kB. | ||||||
|  |         </para> | ||||||
|  |        </listitem> | ||||||
|  |       </varlistentry> | ||||||
|  |  | ||||||
|       <varlistentry id="guc-max-worker-processes" xreflabel="max_worker_processes"> |       <varlistentry id="guc-max-worker-processes" xreflabel="max_worker_processes"> | ||||||
|        <term><varname>max_worker_processes</varname> (<type>integer</type>) |        <term><varname>max_worker_processes</varname> (<type>integer</type>) | ||||||
|        <indexterm> |        <indexterm> | ||||||
|   | |||||||
| @@ -19,6 +19,10 @@ | |||||||
|  *		and pin it so that no one can destroy it while this process |  *		and pin it so that no one can destroy it while this process | ||||||
|  *		is using it. |  *		is using it. | ||||||
|  * |  * | ||||||
|  |  * StartReadBuffer() -- as above, with separate wait step | ||||||
|  |  * StartReadBuffers() -- multiple block version | ||||||
|  |  * WaitReadBuffers() -- second step of above | ||||||
|  |  * | ||||||
|  * ReleaseBuffer() -- unpin a buffer |  * ReleaseBuffer() -- unpin a buffer | ||||||
|  * |  * | ||||||
|  * MarkBufferDirty() -- mark a pinned buffer's contents as "dirty". |  * MarkBufferDirty() -- mark a pinned buffer's contents as "dirty". | ||||||
| @@ -152,6 +156,13 @@ int			effective_io_concurrency = DEFAULT_EFFECTIVE_IO_CONCURRENCY; | |||||||
|  */ |  */ | ||||||
| int			maintenance_io_concurrency = DEFAULT_MAINTENANCE_IO_CONCURRENCY; | int			maintenance_io_concurrency = DEFAULT_MAINTENANCE_IO_CONCURRENCY; | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * Limit on how many blocks should be handled in single I/O operations. | ||||||
|  |  * StartReadBuffers() callers should respect it, as should other operations | ||||||
|  |  * that call smgr APIs directly. | ||||||
|  |  */ | ||||||
|  | int			io_combine_limit = DEFAULT_IO_COMBINE_LIMIT; | ||||||
|  |  | ||||||
| /* | /* | ||||||
|  * GUC variables about triggering kernel writeback for buffers written; OS |  * GUC variables about triggering kernel writeback for buffers written; OS | ||||||
|  * dependent defaults are set via the GUC mechanism. |  * dependent defaults are set via the GUC mechanism. | ||||||
| @@ -471,10 +482,10 @@ ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref) | |||||||
| ) | ) | ||||||
|  |  | ||||||
|  |  | ||||||
| static Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence, | static Buffer ReadBuffer_common(Relation rel, | ||||||
|  | 								SMgrRelation smgr, char smgr_persistence, | ||||||
| 								ForkNumber forkNum, BlockNumber blockNum, | 								ForkNumber forkNum, BlockNumber blockNum, | ||||||
| 								ReadBufferMode mode, BufferAccessStrategy strategy, | 								ReadBufferMode mode, BufferAccessStrategy strategy); | ||||||
| 								bool *hit); |  | ||||||
| static BlockNumber ExtendBufferedRelCommon(BufferManagerRelation bmr, | static BlockNumber ExtendBufferedRelCommon(BufferManagerRelation bmr, | ||||||
| 										   ForkNumber fork, | 										   ForkNumber fork, | ||||||
| 										   BufferAccessStrategy strategy, | 										   BufferAccessStrategy strategy, | ||||||
| @@ -500,13 +511,13 @@ static uint32 WaitBufHdrUnlocked(BufferDesc *buf); | |||||||
| static int	SyncOneBuffer(int buf_id, bool skip_recently_used, | static int	SyncOneBuffer(int buf_id, bool skip_recently_used, | ||||||
| 						  WritebackContext *wb_context); | 						  WritebackContext *wb_context); | ||||||
| static void WaitIO(BufferDesc *buf); | static void WaitIO(BufferDesc *buf); | ||||||
| static bool StartBufferIO(BufferDesc *buf, bool forInput); | static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait); | ||||||
| static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, | static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, | ||||||
| 							  uint32 set_flag_bits, bool forget_owner); | 							  uint32 set_flag_bits, bool forget_owner); | ||||||
| static void AbortBufferIO(Buffer buffer); | static void AbortBufferIO(Buffer buffer); | ||||||
| static void shared_buffer_write_error_callback(void *arg); | static void shared_buffer_write_error_callback(void *arg); | ||||||
| static void local_buffer_write_error_callback(void *arg); | static void local_buffer_write_error_callback(void *arg); | ||||||
| static BufferDesc *BufferAlloc(SMgrRelation smgr, | static inline BufferDesc *BufferAlloc(SMgrRelation smgr, | ||||||
| 									  char relpersistence, | 									  char relpersistence, | ||||||
| 									  ForkNumber forkNum, | 									  ForkNumber forkNum, | ||||||
| 									  BlockNumber blockNum, | 									  BlockNumber blockNum, | ||||||
| @@ -777,11 +788,10 @@ ReadBuffer(Relation reln, BlockNumber blockNum) | |||||||
|  * If strategy is not NULL, a nondefault buffer access strategy is used. |  * If strategy is not NULL, a nondefault buffer access strategy is used. | ||||||
|  * See buffer/README for details. |  * See buffer/README for details. | ||||||
|  */ |  */ | ||||||
| Buffer | inline Buffer | ||||||
| ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum, | ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum, | ||||||
| 				   ReadBufferMode mode, BufferAccessStrategy strategy) | 				   ReadBufferMode mode, BufferAccessStrategy strategy) | ||||||
| { | { | ||||||
| 	bool		hit; |  | ||||||
| 	Buffer		buf; | 	Buffer		buf; | ||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| @@ -798,11 +808,9 @@ ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum, | |||||||
| 	 * Read the buffer, and update pgstat counters to reflect a cache hit or | 	 * Read the buffer, and update pgstat counters to reflect a cache hit or | ||||||
| 	 * miss. | 	 * miss. | ||||||
| 	 */ | 	 */ | ||||||
| 	pgstat_count_buffer_read(reln); | 	buf = ReadBuffer_common(reln, RelationGetSmgr(reln), 0, | ||||||
| 	buf = ReadBuffer_common(RelationGetSmgr(reln), reln->rd_rel->relpersistence, | 							forkNum, blockNum, mode, strategy); | ||||||
| 							forkNum, blockNum, mode, strategy, &hit); |  | ||||||
| 	if (hit) |  | ||||||
| 		pgstat_count_buffer_hit(reln); |  | ||||||
| 	return buf; | 	return buf; | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -822,13 +830,12 @@ ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum, | |||||||
| 						  BlockNumber blockNum, ReadBufferMode mode, | 						  BlockNumber blockNum, ReadBufferMode mode, | ||||||
| 						  BufferAccessStrategy strategy, bool permanent) | 						  BufferAccessStrategy strategy, bool permanent) | ||||||
| { | { | ||||||
| 	bool		hit; |  | ||||||
|  |  | ||||||
| 	SMgrRelation smgr = smgropen(rlocator, INVALID_PROC_NUMBER); | 	SMgrRelation smgr = smgropen(rlocator, INVALID_PROC_NUMBER); | ||||||
|  |  | ||||||
| 	return ReadBuffer_common(smgr, permanent ? RELPERSISTENCE_PERMANENT : | 	return ReadBuffer_common(NULL, smgr, | ||||||
| 							 RELPERSISTENCE_UNLOGGED, forkNum, blockNum, | 							 permanent ? RELPERSISTENCE_PERMANENT : RELPERSISTENCE_UNLOGGED, | ||||||
| 							 mode, strategy, &hit); | 							 forkNum, blockNum, | ||||||
|  | 							 mode, strategy); | ||||||
| } | } | ||||||
|  |  | ||||||
| /* | /* | ||||||
| @@ -994,35 +1001,162 @@ ExtendBufferedRelTo(BufferManagerRelation bmr, | |||||||
| 	 */ | 	 */ | ||||||
| 	if (buffer == InvalidBuffer) | 	if (buffer == InvalidBuffer) | ||||||
| 	{ | 	{ | ||||||
| 		bool		hit; |  | ||||||
|  |  | ||||||
| 		Assert(extended_by == 0); | 		Assert(extended_by == 0); | ||||||
| 		buffer = ReadBuffer_common(bmr.smgr, bmr.relpersistence, | 		buffer = ReadBuffer_common(bmr.rel, bmr.smgr, 0, | ||||||
| 								   fork, extend_to - 1, mode, strategy, | 								   fork, extend_to - 1, mode, strategy); | ||||||
| 								   &hit); |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return buffer; | 	return buffer; | ||||||
| } | } | ||||||
|  |  | ||||||
| /* | /* | ||||||
|  * ReadBuffer_common -- common logic for all ReadBuffer variants |  * Zero a buffer and lock it, as part of the implementation of | ||||||
|  * |  * RBM_ZERO_AND_LOCK or RBM_ZERO_AND_CLEANUP_LOCK.  The buffer must be already | ||||||
|  * *hit is set to true if the request was satisfied from shared buffer cache. |  * pinned.  It does not have to be valid, but it is valid and locked on | ||||||
|  |  * return. | ||||||
|  */ |  */ | ||||||
| static Buffer | static void | ||||||
| ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, | ZeroBuffer(Buffer buffer, ReadBufferMode mode) | ||||||
| 				  BlockNumber blockNum, ReadBufferMode mode, | { | ||||||
| 				  BufferAccessStrategy strategy, bool *hit) | 	BufferDesc *bufHdr; | ||||||
|  | 	uint32		buf_state; | ||||||
|  |  | ||||||
|  | 	Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK); | ||||||
|  |  | ||||||
|  | 	if (BufferIsLocal(buffer)) | ||||||
|  | 		bufHdr = GetLocalBufferDescriptor(-buffer - 1); | ||||||
|  | 	else | ||||||
|  | 	{ | ||||||
|  | 		bufHdr = GetBufferDescriptor(buffer - 1); | ||||||
|  | 		if (mode == RBM_ZERO_AND_LOCK) | ||||||
|  | 			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); | ||||||
|  | 		else | ||||||
|  | 			LockBufferForCleanup(buffer); | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	memset(BufferGetPage(buffer), 0, BLCKSZ); | ||||||
|  |  | ||||||
|  | 	if (BufferIsLocal(buffer)) | ||||||
|  | 	{ | ||||||
|  | 		buf_state = pg_atomic_read_u32(&bufHdr->state); | ||||||
|  | 		buf_state |= BM_VALID; | ||||||
|  | 		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); | ||||||
|  | 	} | ||||||
|  | 	else | ||||||
|  | 	{ | ||||||
|  | 		buf_state = LockBufHdr(bufHdr); | ||||||
|  | 		buf_state |= BM_VALID; | ||||||
|  | 		UnlockBufHdr(bufHdr, buf_state); | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * Pin a buffer for a given block.  *foundPtr is set to true if the block was | ||||||
|  |  * already present, or false if more work is required to either read it in or | ||||||
|  |  * zero it. | ||||||
|  |  */ | ||||||
|  | static pg_attribute_always_inline Buffer | ||||||
|  | PinBufferForBlock(Relation rel, | ||||||
|  | 				  SMgrRelation smgr, | ||||||
|  | 				  char smgr_persistence, | ||||||
|  | 				  ForkNumber forkNum, | ||||||
|  | 				  BlockNumber blockNum, | ||||||
|  | 				  BufferAccessStrategy strategy, | ||||||
|  | 				  bool *foundPtr) | ||||||
| { | { | ||||||
| 	BufferDesc *bufHdr; | 	BufferDesc *bufHdr; | ||||||
| 	Block		bufBlock; |  | ||||||
| 	bool		found; |  | ||||||
| 	IOContext	io_context; | 	IOContext	io_context; | ||||||
| 	IOObject	io_object; | 	IOObject	io_object; | ||||||
| 	bool		isLocalBuf = SmgrIsTemp(smgr); | 	char		persistence; | ||||||
|  |  | ||||||
| 	*hit = false; | 	Assert(blockNum != P_NEW); | ||||||
|  |  | ||||||
|  | 	/* | ||||||
|  | 	 * If there is no Relation it usually implies recovery and thus permanent, | ||||||
|  | 	 * but we take an argmument because CreateAndCopyRelationData can reach us | ||||||
|  | 	 * with only an SMgrRelation for an unlogged relation that we don't want | ||||||
|  | 	 * to flag with BM_PERMANENT. | ||||||
|  | 	 */ | ||||||
|  | 	if (rel) | ||||||
|  | 		persistence = rel->rd_rel->relpersistence; | ||||||
|  | 	else if (smgr_persistence == 0) | ||||||
|  | 		persistence = RELPERSISTENCE_PERMANENT; | ||||||
|  | 	else | ||||||
|  | 		persistence = smgr_persistence; | ||||||
|  |  | ||||||
|  | 	if (persistence == RELPERSISTENCE_TEMP) | ||||||
|  | 	{ | ||||||
|  | 		io_context = IOCONTEXT_NORMAL; | ||||||
|  | 		io_object = IOOBJECT_TEMP_RELATION; | ||||||
|  | 	} | ||||||
|  | 	else | ||||||
|  | 	{ | ||||||
|  | 		io_context = IOContextForStrategy(strategy); | ||||||
|  | 		io_object = IOOBJECT_RELATION; | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum, | ||||||
|  | 									   smgr->smgr_rlocator.locator.spcOid, | ||||||
|  | 									   smgr->smgr_rlocator.locator.dbOid, | ||||||
|  | 									   smgr->smgr_rlocator.locator.relNumber, | ||||||
|  | 									   smgr->smgr_rlocator.backend); | ||||||
|  |  | ||||||
|  | 	if (persistence == RELPERSISTENCE_TEMP) | ||||||
|  | 	{ | ||||||
|  | 		bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, foundPtr); | ||||||
|  | 		if (*foundPtr) | ||||||
|  | 			pgBufferUsage.local_blks_hit++; | ||||||
|  | 	} | ||||||
|  | 	else | ||||||
|  | 	{ | ||||||
|  | 		bufHdr = BufferAlloc(smgr, persistence, forkNum, blockNum, | ||||||
|  | 							 strategy, foundPtr, io_context); | ||||||
|  | 		if (*foundPtr) | ||||||
|  | 			pgBufferUsage.shared_blks_hit++; | ||||||
|  | 	} | ||||||
|  | 	if (rel) | ||||||
|  | 	{ | ||||||
|  | 		/* | ||||||
|  | 		 * While pgBufferUsage's "read" counter isn't bumped unless we reach | ||||||
|  | 		 * WaitReadBuffers() (so, not for hits, and not for buffers that are | ||||||
|  | 		 * zeroed instead), the per-relation stats always count them. | ||||||
|  | 		 */ | ||||||
|  | 		pgstat_count_buffer_read(rel); | ||||||
|  | 		if (*foundPtr) | ||||||
|  | 			pgstat_count_buffer_hit(rel); | ||||||
|  | 	} | ||||||
|  | 	if (*foundPtr) | ||||||
|  | 	{ | ||||||
|  | 		VacuumPageHit++; | ||||||
|  | 		pgstat_count_io_op(io_object, io_context, IOOP_HIT); | ||||||
|  | 		if (VacuumCostActive) | ||||||
|  | 			VacuumCostBalance += VacuumCostPageHit; | ||||||
|  |  | ||||||
|  | 		TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum, | ||||||
|  | 										  smgr->smgr_rlocator.locator.spcOid, | ||||||
|  | 										  smgr->smgr_rlocator.locator.dbOid, | ||||||
|  | 										  smgr->smgr_rlocator.locator.relNumber, | ||||||
|  | 										  smgr->smgr_rlocator.backend, | ||||||
|  | 										  true); | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return BufferDescriptorGetBuffer(bufHdr); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * ReadBuffer_common -- common logic for all ReadBuffer variants | ||||||
|  |  * | ||||||
|  |  * smgr is required, rel is optional unless using P_NEW. | ||||||
|  |  */ | ||||||
|  | static pg_attribute_always_inline Buffer | ||||||
|  | ReadBuffer_common(Relation rel, SMgrRelation smgr, char smgr_persistence, | ||||||
|  | 				  ForkNumber forkNum, | ||||||
|  | 				  BlockNumber blockNum, ReadBufferMode mode, | ||||||
|  | 				  BufferAccessStrategy strategy) | ||||||
|  | { | ||||||
|  | 	ReadBuffersOperation operation; | ||||||
|  | 	Buffer		buffer; | ||||||
|  | 	int			flags; | ||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| 	 * Backward compatibility path, most code should use ExtendBufferedRel() | 	 * Backward compatibility path, most code should use ExtendBufferedRel() | ||||||
| @@ -1041,152 +1175,330 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, | |||||||
| 		if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) | 		if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) | ||||||
| 			flags |= EB_LOCK_FIRST; | 			flags |= EB_LOCK_FIRST; | ||||||
|  |  | ||||||
| 		return ExtendBufferedRel(BMR_SMGR(smgr, relpersistence), | 		return ExtendBufferedRel(BMR_REL(rel), forkNum, strategy, flags); | ||||||
| 								 forkNum, strategy, flags); |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum, | 	if (unlikely(mode == RBM_ZERO_AND_CLEANUP_LOCK || | ||||||
| 									   smgr->smgr_rlocator.locator.spcOid, | 				 mode == RBM_ZERO_AND_LOCK)) | ||||||
| 									   smgr->smgr_rlocator.locator.dbOid, | 	{ | ||||||
| 									   smgr->smgr_rlocator.locator.relNumber, | 		bool		found; | ||||||
| 									   smgr->smgr_rlocator.backend); |  | ||||||
|  |  | ||||||
| 	if (isLocalBuf) | 		buffer = PinBufferForBlock(rel, smgr, smgr_persistence, | ||||||
|  | 								   forkNum, blockNum, strategy, &found); | ||||||
|  | 		ZeroBuffer(buffer, mode); | ||||||
|  | 		return buffer; | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if (mode == RBM_ZERO_ON_ERROR) | ||||||
|  | 		flags = READ_BUFFERS_ZERO_ON_ERROR; | ||||||
|  | 	else | ||||||
|  | 		flags = 0; | ||||||
|  | 	operation.smgr = smgr; | ||||||
|  | 	operation.rel = rel; | ||||||
|  | 	operation.smgr_persistence = smgr_persistence; | ||||||
|  | 	operation.forknum = forkNum; | ||||||
|  | 	operation.strategy = strategy; | ||||||
|  | 	if (StartReadBuffer(&operation, | ||||||
|  | 						&buffer, | ||||||
|  | 						blockNum, | ||||||
|  | 						flags)) | ||||||
|  | 		WaitReadBuffers(&operation); | ||||||
|  |  | ||||||
|  | 	return buffer; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | static pg_attribute_always_inline bool | ||||||
|  | StartReadBuffersImpl(ReadBuffersOperation *operation, | ||||||
|  | 					 Buffer *buffers, | ||||||
|  | 					 BlockNumber blockNum, | ||||||
|  | 					 int *nblocks, | ||||||
|  | 					 int flags) | ||||||
|  | { | ||||||
|  | 	int			actual_nblocks = *nblocks; | ||||||
|  | 	int			io_buffers_len = 0; | ||||||
|  |  | ||||||
|  | 	Assert(*nblocks > 0); | ||||||
|  | 	Assert(*nblocks <= MAX_IO_COMBINE_LIMIT); | ||||||
|  |  | ||||||
|  | 	for (int i = 0; i < actual_nblocks; ++i) | ||||||
|  | 	{ | ||||||
|  | 		bool		found; | ||||||
|  |  | ||||||
|  | 		buffers[i] = PinBufferForBlock(operation->rel, | ||||||
|  | 									   operation->smgr, | ||||||
|  | 									   operation->smgr_persistence, | ||||||
|  | 									   operation->forknum, | ||||||
|  | 									   blockNum + i, | ||||||
|  | 									   operation->strategy, | ||||||
|  | 									   &found); | ||||||
|  |  | ||||||
|  | 		if (found) | ||||||
| 		{ | 		{ | ||||||
| 			/* | 			/* | ||||||
| 		 * We do not use a BufferAccessStrategy for I/O of temporary tables. | 			 * Terminate the read as soon as we get a hit.  It could be a | ||||||
| 		 * However, in some cases, the "strategy" may not be NULL, so we can't | 			 * single buffer hit, or it could be a hit that follows a readable | ||||||
| 		 * rely on IOContextForStrategy() to set the right IOContext for us. | 			 * range.  We don't want to create more than one readable range, | ||||||
| 		 * This may happen in cases like CREATE TEMPORARY TABLE AS... | 			 * so we stop here. | ||||||
| 			 */ | 			 */ | ||||||
|  | 			actual_nblocks = i + 1; | ||||||
|  | 			break; | ||||||
|  | 		} | ||||||
|  | 		else | ||||||
|  | 		{ | ||||||
|  | 			/* Extend the readable range to cover this block. */ | ||||||
|  | 			io_buffers_len++; | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	*nblocks = actual_nblocks; | ||||||
|  |  | ||||||
|  | 	if (likely(io_buffers_len == 0)) | ||||||
|  | 		return false; | ||||||
|  |  | ||||||
|  | 	/* Populate information needed for I/O. */ | ||||||
|  | 	operation->buffers = buffers; | ||||||
|  | 	operation->blocknum = blockNum; | ||||||
|  | 	operation->flags = flags; | ||||||
|  | 	operation->nblocks = actual_nblocks; | ||||||
|  | 	operation->io_buffers_len = io_buffers_len; | ||||||
|  |  | ||||||
|  | 	if (flags & READ_BUFFERS_ISSUE_ADVICE) | ||||||
|  | 	{ | ||||||
|  | 		/* | ||||||
|  | 		 * In theory we should only do this if PinBufferForBlock() had to | ||||||
|  | 		 * allocate new buffers above.  That way, if two calls to | ||||||
|  | 		 * StartReadBuffers() were made for the same blocks before | ||||||
|  | 		 * WaitReadBuffers(), only the first would issue the advice. That'd be | ||||||
|  | 		 * a better simulation of true asynchronous I/O, which would only | ||||||
|  | 		 * start the I/O once, but isn't done here for simplicity.  Note also | ||||||
|  | 		 * that the following call might actually issue two advice calls if we | ||||||
|  | 		 * cross a segment boundary; in a true asynchronous version we might | ||||||
|  | 		 * choose to process only one real I/O at a time in that case. | ||||||
|  | 		 */ | ||||||
|  | 		smgrprefetch(operation->smgr, | ||||||
|  | 					 operation->forknum, | ||||||
|  | 					 blockNum, | ||||||
|  | 					 operation->io_buffers_len); | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	/* Indicate that WaitReadBuffers() should be called. */ | ||||||
|  | 	return true; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * Begin reading a range of blocks beginning at blockNum and extending for | ||||||
|  |  * *nblocks.  On return, up to *nblocks pinned buffers holding those blocks | ||||||
|  |  * are written into the buffers array, and *nblocks is updated to contain the | ||||||
|  |  * actual number, which may be fewer than requested.  Caller sets some of the | ||||||
|  |  * members of operation; see struct definition. | ||||||
|  |  * | ||||||
|  |  * If false is returned, no I/O is necessary.  If true is returned, one I/O | ||||||
|  |  * has been started, and WaitReadBuffers() must be called with the same | ||||||
|  |  * operation object before the buffers are accessed.  Along with the operation | ||||||
|  |  * object, the caller-supplied array of buffers must remain valid until | ||||||
|  |  * WaitReadBuffers() is called. | ||||||
|  |  * | ||||||
|  |  * Currently the I/O is only started with optional operating system advice if | ||||||
|  |  * requested by the caller with READ_BUFFERS_ISSUE_ADVICE, and the real I/O | ||||||
|  |  * happens synchronously in WaitReadBuffers().  In future work, true I/O could | ||||||
|  |  * be initiated here. | ||||||
|  |  */ | ||||||
|  | bool | ||||||
|  | StartReadBuffers(ReadBuffersOperation *operation, | ||||||
|  | 				 Buffer *buffers, | ||||||
|  | 				 BlockNumber blockNum, | ||||||
|  | 				 int *nblocks, | ||||||
|  | 				 int flags) | ||||||
|  | { | ||||||
|  | 	return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * Single block version of the StartReadBuffers().  This might save a few | ||||||
|  |  * instructions when called from another translation unit, because it is | ||||||
|  |  * specialized for nblocks == 1. | ||||||
|  |  */ | ||||||
|  | bool | ||||||
|  | StartReadBuffer(ReadBuffersOperation *operation, | ||||||
|  | 				Buffer *buffer, | ||||||
|  | 				BlockNumber blocknum, | ||||||
|  | 				int flags) | ||||||
|  | { | ||||||
|  | 	int			nblocks = 1; | ||||||
|  | 	bool		result; | ||||||
|  |  | ||||||
|  | 	result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags); | ||||||
|  | 	Assert(nblocks == 1);		/* single block can't be short */ | ||||||
|  |  | ||||||
|  | 	return result; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | static inline bool | ||||||
|  | WaitReadBuffersCanStartIO(Buffer buffer, bool nowait) | ||||||
|  | { | ||||||
|  | 	if (BufferIsLocal(buffer)) | ||||||
|  | 	{ | ||||||
|  | 		BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1); | ||||||
|  |  | ||||||
|  | 		return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0; | ||||||
|  | 	} | ||||||
|  | 	else | ||||||
|  | 		return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | void | ||||||
|  | WaitReadBuffers(ReadBuffersOperation *operation) | ||||||
|  | { | ||||||
|  | 	Buffer	   *buffers; | ||||||
|  | 	int			nblocks; | ||||||
|  | 	BlockNumber blocknum; | ||||||
|  | 	ForkNumber	forknum; | ||||||
|  | 	IOContext	io_context; | ||||||
|  | 	IOObject	io_object; | ||||||
|  | 	char		persistence; | ||||||
|  |  | ||||||
|  | 	/* | ||||||
|  | 	 * Currently operations are only allowed to include a read of some range, | ||||||
|  | 	 * with an optional extra buffer that is already pinned at the end.  So | ||||||
|  | 	 * nblocks can be at most one more than io_buffers_len. | ||||||
|  | 	 */ | ||||||
|  | 	Assert((operation->nblocks == operation->io_buffers_len) || | ||||||
|  | 		   (operation->nblocks == operation->io_buffers_len + 1)); | ||||||
|  |  | ||||||
|  | 	/* Find the range of the physical read we need to perform. */ | ||||||
|  | 	nblocks = operation->io_buffers_len; | ||||||
|  | 	if (nblocks == 0) | ||||||
|  | 		return;					/* nothing to do */ | ||||||
|  |  | ||||||
|  | 	buffers = &operation->buffers[0]; | ||||||
|  | 	blocknum = operation->blocknum; | ||||||
|  | 	forknum = operation->forknum; | ||||||
|  |  | ||||||
|  | 	persistence = operation->rel | ||||||
|  | 		? operation->rel->rd_rel->relpersistence | ||||||
|  | 		: RELPERSISTENCE_PERMANENT; | ||||||
|  | 	if (persistence == RELPERSISTENCE_TEMP) | ||||||
|  | 	{ | ||||||
| 		io_context = IOCONTEXT_NORMAL; | 		io_context = IOCONTEXT_NORMAL; | ||||||
| 		io_object = IOOBJECT_TEMP_RELATION; | 		io_object = IOOBJECT_TEMP_RELATION; | ||||||
| 		bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found); |  | ||||||
| 		if (found) |  | ||||||
| 			pgBufferUsage.local_blks_hit++; |  | ||||||
| 		else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG || |  | ||||||
| 				 mode == RBM_ZERO_ON_ERROR) |  | ||||||
| 			pgBufferUsage.local_blks_read++; |  | ||||||
| 	} | 	} | ||||||
| 	else | 	else | ||||||
| 	{ | 	{ | ||||||
| 		/* | 		io_context = IOContextForStrategy(operation->strategy); | ||||||
| 		 * lookup the buffer.  IO_IN_PROGRESS is set if the requested block is |  | ||||||
| 		 * not currently in memory. |  | ||||||
| 		 */ |  | ||||||
| 		io_context = IOContextForStrategy(strategy); |  | ||||||
| 		io_object = IOOBJECT_RELATION; | 		io_object = IOOBJECT_RELATION; | ||||||
| 		bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum, |  | ||||||
| 							 strategy, &found, io_context); |  | ||||||
| 		if (found) |  | ||||||
| 			pgBufferUsage.shared_blks_hit++; |  | ||||||
| 		else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG || |  | ||||||
| 				 mode == RBM_ZERO_ON_ERROR) |  | ||||||
| 			pgBufferUsage.shared_blks_read++; |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	/* At this point we do NOT hold any locks. */ | 	/* | ||||||
|  | 	 * We count all these blocks as read by this backend.  This is traditional | ||||||
|  | 	 * behavior, but might turn out to be not true if we find that someone | ||||||
|  | 	 * else has beaten us and completed the read of some of these blocks.  In | ||||||
|  | 	 * that case the system globally double-counts, but we traditionally don't | ||||||
|  | 	 * count this as a "hit", and we don't have a separate counter for "miss, | ||||||
|  | 	 * but another backend completed the read". | ||||||
|  | 	 */ | ||||||
|  | 	if (persistence == RELPERSISTENCE_TEMP) | ||||||
|  | 		pgBufferUsage.local_blks_read += nblocks; | ||||||
|  | 	else | ||||||
|  | 		pgBufferUsage.shared_blks_read += nblocks; | ||||||
|  |  | ||||||
| 	/* if it was already in the buffer pool, we're done */ | 	for (int i = 0; i < nblocks; ++i) | ||||||
| 	if (found) |  | ||||||
| 	{ | 	{ | ||||||
| 		/* Just need to update stats before we exit */ | 		int			io_buffers_len; | ||||||
| 		*hit = true; | 		Buffer		io_buffers[MAX_IO_COMBINE_LIMIT]; | ||||||
| 		VacuumPageHit++; | 		void	   *io_pages[MAX_IO_COMBINE_LIMIT]; | ||||||
| 		pgstat_count_io_op(io_object, io_context, IOOP_HIT); | 		instr_time	io_start; | ||||||
|  | 		BlockNumber io_first_block; | ||||||
| 		if (VacuumCostActive) |  | ||||||
| 			VacuumCostBalance += VacuumCostPageHit; |  | ||||||
|  |  | ||||||
| 		TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum, |  | ||||||
| 										  smgr->smgr_rlocator.locator.spcOid, |  | ||||||
| 										  smgr->smgr_rlocator.locator.dbOid, |  | ||||||
| 										  smgr->smgr_rlocator.locator.relNumber, |  | ||||||
| 										  smgr->smgr_rlocator.backend, |  | ||||||
| 										  found); |  | ||||||
|  |  | ||||||
| 		/* | 		/* | ||||||
| 		 * In RBM_ZERO_AND_LOCK mode the caller expects the page to be locked | 		 * Skip this block if someone else has already completed it.  If an | ||||||
| 		 * on return. | 		 * I/O is already in progress in another backend, this will wait for | ||||||
|  | 		 * the outcome: either done, or something went wrong and we will | ||||||
|  | 		 * retry. | ||||||
| 		 */ | 		 */ | ||||||
| 		if (!isLocalBuf) | 		if (!WaitReadBuffersCanStartIO(buffers[i], false)) | ||||||
| 		{ | 		{ | ||||||
| 			if (mode == RBM_ZERO_AND_LOCK) | 			/* | ||||||
| 				LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), | 			 * Report this as a 'hit' for this backend, even though it must | ||||||
| 							  LW_EXCLUSIVE); | 			 * have started out as a miss in PinBufferForBlock(). | ||||||
| 			else if (mode == RBM_ZERO_AND_CLEANUP_LOCK) | 			 */ | ||||||
| 				LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr)); | 			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + i, | ||||||
|  | 											  operation->smgr->smgr_rlocator.locator.spcOid, | ||||||
|  | 											  operation->smgr->smgr_rlocator.locator.dbOid, | ||||||
|  | 											  operation->smgr->smgr_rlocator.locator.relNumber, | ||||||
|  | 											  operation->smgr->smgr_rlocator.backend, | ||||||
|  | 											  true); | ||||||
|  | 			continue; | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		return BufferDescriptorGetBuffer(bufHdr); | 		/* We found a buffer that we need to read in. */ | ||||||
|  | 		io_buffers[0] = buffers[i]; | ||||||
|  | 		io_pages[0] = BufferGetBlock(buffers[i]); | ||||||
|  | 		io_first_block = blocknum + i; | ||||||
|  | 		io_buffers_len = 1; | ||||||
|  |  | ||||||
|  | 		/* | ||||||
|  | 		 * How many neighboring-on-disk blocks can we can scatter-read into | ||||||
|  | 		 * other buffers at the same time?  In this case we don't wait if we | ||||||
|  | 		 * see an I/O already in progress.  We already hold BM_IO_IN_PROGRESS | ||||||
|  | 		 * for the head block, so we should get on with that I/O as soon as | ||||||
|  | 		 * possible.  We'll come back to this block again, above. | ||||||
|  | 		 */ | ||||||
|  | 		while ((i + 1) < nblocks && | ||||||
|  | 			   WaitReadBuffersCanStartIO(buffers[i + 1], true)) | ||||||
|  | 		{ | ||||||
|  | 			/* Must be consecutive block numbers. */ | ||||||
|  | 			Assert(BufferGetBlockNumber(buffers[i + 1]) == | ||||||
|  | 				   BufferGetBlockNumber(buffers[i]) + 1); | ||||||
|  |  | ||||||
|  | 			io_buffers[io_buffers_len] = buffers[++i]; | ||||||
|  | 			io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]); | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 	/* | 		io_start = pgstat_prepare_io_time(track_io_timing); | ||||||
| 	 * if we have gotten to this point, we have allocated a buffer for the | 		smgrreadv(operation->smgr, forknum, io_first_block, io_pages, io_buffers_len); | ||||||
| 	 * page but its contents are not yet valid.  IO_IN_PROGRESS is set for it, | 		pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start, | ||||||
| 	 * if it's a shared buffer. | 								io_buffers_len); | ||||||
| 	 */ |  | ||||||
| 	Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID));	/* spinlock not needed */ |  | ||||||
|  |  | ||||||
| 	bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); | 		/* Verify each block we read, and terminate the I/O. */ | ||||||
|  | 		for (int j = 0; j < io_buffers_len; ++j) | ||||||
|  | 		{ | ||||||
|  | 			BufferDesc *bufHdr; | ||||||
|  | 			Block		bufBlock; | ||||||
|  |  | ||||||
| 	/* | 			if (persistence == RELPERSISTENCE_TEMP) | ||||||
| 	 * Read in the page, unless the caller intends to overwrite it and just | 			{ | ||||||
| 	 * wants us to allocate a buffer. | 				bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1); | ||||||
| 	 */ | 				bufBlock = LocalBufHdrGetBlock(bufHdr); | ||||||
| 	if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) | 			} | ||||||
| 		MemSet((char *) bufBlock, 0, BLCKSZ); |  | ||||||
| 			else | 			else | ||||||
| 			{ | 			{ | ||||||
| 		instr_time	io_start = pgstat_prepare_io_time(track_io_timing); | 				bufHdr = GetBufferDescriptor(io_buffers[j] - 1); | ||||||
|  | 				bufBlock = BufHdrGetBlock(bufHdr); | ||||||
| 		smgrread(smgr, forkNum, blockNum, bufBlock); | 			} | ||||||
|  |  | ||||||
| 		pgstat_count_io_op_time(io_object, io_context, |  | ||||||
| 								IOOP_READ, io_start, 1); |  | ||||||
|  |  | ||||||
| 			/* check for garbage data */ | 			/* check for garbage data */ | ||||||
| 		if (!PageIsVerifiedExtended((Page) bufBlock, blockNum, | 			if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j, | ||||||
| 										PIV_LOG_WARNING | PIV_REPORT_STAT)) | 										PIV_LOG_WARNING | PIV_REPORT_STAT)) | ||||||
| 			{ | 			{ | ||||||
| 			if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages) | 				if ((operation->flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages) | ||||||
| 				{ | 				{ | ||||||
| 					ereport(WARNING, | 					ereport(WARNING, | ||||||
| 							(errcode(ERRCODE_DATA_CORRUPTED), | 							(errcode(ERRCODE_DATA_CORRUPTED), | ||||||
| 							 errmsg("invalid page in block %u of relation %s; zeroing out page", | 							 errmsg("invalid page in block %u of relation %s; zeroing out page", | ||||||
| 								blockNum, | 									io_first_block + j, | ||||||
| 								relpath(smgr->smgr_rlocator, forkNum)))); | 									relpath(operation->smgr->smgr_rlocator, forknum)))); | ||||||
| 				MemSet((char *) bufBlock, 0, BLCKSZ); | 					memset(bufBlock, 0, BLCKSZ); | ||||||
| 				} | 				} | ||||||
| 				else | 				else | ||||||
| 					ereport(ERROR, | 					ereport(ERROR, | ||||||
| 							(errcode(ERRCODE_DATA_CORRUPTED), | 							(errcode(ERRCODE_DATA_CORRUPTED), | ||||||
| 							 errmsg("invalid page in block %u of relation %s", | 							 errmsg("invalid page in block %u of relation %s", | ||||||
| 								blockNum, | 									io_first_block + j, | ||||||
| 								relpath(smgr->smgr_rlocator, forkNum)))); | 									relpath(operation->smgr->smgr_rlocator, forknum)))); | ||||||
| 		} |  | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 	/* | 			/* Terminate I/O and set BM_VALID. */ | ||||||
| 	 * In RBM_ZERO_AND_LOCK / RBM_ZERO_AND_CLEANUP_LOCK mode, grab the buffer | 			if (persistence == RELPERSISTENCE_TEMP) | ||||||
| 	 * content lock before marking the page as valid, to make sure that no |  | ||||||
| 	 * other backend sees the zeroed page before the caller has had a chance |  | ||||||
| 	 * to initialize it. |  | ||||||
| 	 * |  | ||||||
| 	 * Since no-one else can be looking at the page contents yet, there is no |  | ||||||
| 	 * difference between an exclusive lock and a cleanup-strength lock. (Note |  | ||||||
| 	 * that we cannot use LockBuffer() or LockBufferForCleanup() here, because |  | ||||||
| 	 * they assert that the buffer is already valid.) |  | ||||||
| 	 */ |  | ||||||
| 	if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) && |  | ||||||
| 		!isLocalBuf) |  | ||||||
| 			{ | 			{ | ||||||
| 		LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE); |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if (isLocalBuf) |  | ||||||
| 	{ |  | ||||||
| 		/* Only need to adjust flags */ |  | ||||||
| 				uint32		buf_state = pg_atomic_read_u32(&bufHdr->state); | 				uint32		buf_state = pg_atomic_read_u32(&bufHdr->state); | ||||||
|  |  | ||||||
| 				buf_state |= BM_VALID; | 				buf_state |= BM_VALID; | ||||||
| @@ -1198,24 +1510,25 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, | |||||||
| 				TerminateBufferIO(bufHdr, false, BM_VALID, true); | 				TerminateBufferIO(bufHdr, false, BM_VALID, true); | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 	VacuumPageMiss++; | 			/* Report I/Os as completing individually. */ | ||||||
|  | 			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j, | ||||||
|  | 											  operation->smgr->smgr_rlocator.locator.spcOid, | ||||||
|  | 											  operation->smgr->smgr_rlocator.locator.dbOid, | ||||||
|  | 											  operation->smgr->smgr_rlocator.locator.relNumber, | ||||||
|  | 											  operation->smgr->smgr_rlocator.backend, | ||||||
|  | 											  false); | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		VacuumPageMiss += io_buffers_len; | ||||||
| 		if (VacuumCostActive) | 		if (VacuumCostActive) | ||||||
| 		VacuumCostBalance += VacuumCostPageMiss; | 			VacuumCostBalance += VacuumCostPageMiss * io_buffers_len; | ||||||
|  | 	} | ||||||
| 	TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum, |  | ||||||
| 									  smgr->smgr_rlocator.locator.spcOid, |  | ||||||
| 									  smgr->smgr_rlocator.locator.dbOid, |  | ||||||
| 									  smgr->smgr_rlocator.locator.relNumber, |  | ||||||
| 									  smgr->smgr_rlocator.backend, |  | ||||||
| 									  found); |  | ||||||
|  |  | ||||||
| 	return BufferDescriptorGetBuffer(bufHdr); |  | ||||||
| } | } | ||||||
|  |  | ||||||
| /* | /* | ||||||
|  * BufferAlloc -- subroutine for ReadBuffer.  Handles lookup of a shared |  * BufferAlloc -- subroutine for PinBufferForBlock.  Handles lookup of a shared | ||||||
|  *		buffer.  If no buffer exists already, selects a replacement |  *		buffer.  If no buffer exists already, selects a replacement victim and | ||||||
|  *		victim and evicts the old page, but does NOT read in new page. |  *		evicts the old page, but does NOT read in new page. | ||||||
|  * |  * | ||||||
|  * "strategy" can be a buffer replacement strategy object, or NULL for |  * "strategy" can be a buffer replacement strategy object, or NULL for | ||||||
|  * the default strategy.  The selected buffer's usage_count is advanced when |  * the default strategy.  The selected buffer's usage_count is advanced when | ||||||
| @@ -1223,11 +1536,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, | |||||||
|  * |  * | ||||||
|  * The returned buffer is pinned and is already marked as holding the |  * The returned buffer is pinned and is already marked as holding the | ||||||
|  * desired page.  If it already did have the desired page, *foundPtr is |  * desired page.  If it already did have the desired page, *foundPtr is | ||||||
|  * set true.  Otherwise, *foundPtr is set false and the buffer is marked |  * set true.  Otherwise, *foundPtr is set false. | ||||||
|  * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it. |  | ||||||
|  * |  | ||||||
|  * *foundPtr is actually redundant with the buffer's BM_VALID flag, but |  | ||||||
|  * we keep it for simplicity in ReadBuffer. |  | ||||||
|  * |  * | ||||||
|  * io_context is passed as an output parameter to avoid calling |  * io_context is passed as an output parameter to avoid calling | ||||||
|  * IOContextForStrategy() when there is a shared buffers hit and no IO |  * IOContextForStrategy() when there is a shared buffers hit and no IO | ||||||
| @@ -1235,7 +1544,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, | |||||||
|  * |  * | ||||||
|  * No locks are held either at entry or exit. |  * No locks are held either at entry or exit. | ||||||
|  */ |  */ | ||||||
| static BufferDesc * | static pg_attribute_always_inline BufferDesc * | ||||||
| BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, | BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, | ||||||
| 			BlockNumber blockNum, | 			BlockNumber blockNum, | ||||||
| 			BufferAccessStrategy strategy, | 			BufferAccessStrategy strategy, | ||||||
| @@ -1286,20 +1595,11 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, | |||||||
| 		{ | 		{ | ||||||
| 			/* | 			/* | ||||||
| 			 * We can only get here if (a) someone else is still reading in | 			 * We can only get here if (a) someone else is still reading in | ||||||
| 			 * the page, or (b) a previous read attempt failed.  We have to | 			 * the page, (b) a previous read attempt failed, or (c) someone | ||||||
| 			 * wait for any active read attempt to finish, and then set up our | 			 * called StartReadBuffers() but not yet WaitReadBuffers(). | ||||||
| 			 * own read attempt if the page is still not BM_VALID. |  | ||||||
| 			 * StartBufferIO does it all. |  | ||||||
| 			 */ |  | ||||||
| 			if (StartBufferIO(buf, true)) |  | ||||||
| 			{ |  | ||||||
| 				/* |  | ||||||
| 				 * If we get here, previous attempts to read the buffer must |  | ||||||
| 				 * have failed ... but we shall bravely try again. |  | ||||||
| 			 */ | 			 */ | ||||||
| 			*foundPtr = false; | 			*foundPtr = false; | ||||||
| 		} | 		} | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		return buf; | 		return buf; | ||||||
| 	} | 	} | ||||||
| @@ -1363,20 +1663,11 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, | |||||||
| 		{ | 		{ | ||||||
| 			/* | 			/* | ||||||
| 			 * We can only get here if (a) someone else is still reading in | 			 * We can only get here if (a) someone else is still reading in | ||||||
| 			 * the page, or (b) a previous read attempt failed.  We have to | 			 * the page, (b) a previous read attempt failed, or (c) someone | ||||||
| 			 * wait for any active read attempt to finish, and then set up our | 			 * called StartReadBuffers() but not yet WaitReadBuffers(). | ||||||
| 			 * own read attempt if the page is still not BM_VALID. |  | ||||||
| 			 * StartBufferIO does it all. |  | ||||||
| 			 */ |  | ||||||
| 			if (StartBufferIO(existing_buf_hdr, true)) |  | ||||||
| 			{ |  | ||||||
| 				/* |  | ||||||
| 				 * If we get here, previous attempts to read the buffer must |  | ||||||
| 				 * have failed ... but we shall bravely try again. |  | ||||||
| 			 */ | 			 */ | ||||||
| 			*foundPtr = false; | 			*foundPtr = false; | ||||||
| 		} | 		} | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		return existing_buf_hdr; | 		return existing_buf_hdr; | ||||||
| 	} | 	} | ||||||
| @@ -1407,15 +1698,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, | |||||||
| 	LWLockRelease(newPartitionLock); | 	LWLockRelease(newPartitionLock); | ||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| 	 * Buffer contents are currently invalid.  Try to obtain the right to | 	 * Buffer contents are currently invalid. | ||||||
| 	 * start I/O.  If StartBufferIO returns false, then someone else managed |  | ||||||
| 	 * to read it before we did, so there's nothing left for BufferAlloc() to |  | ||||||
| 	 * do. |  | ||||||
| 	 */ | 	 */ | ||||||
| 	if (StartBufferIO(victim_buf_hdr, true)) |  | ||||||
| 	*foundPtr = false; | 	*foundPtr = false; | ||||||
| 	else |  | ||||||
| 		*foundPtr = true; |  | ||||||
|  |  | ||||||
| 	return victim_buf_hdr; | 	return victim_buf_hdr; | ||||||
| } | } | ||||||
| @@ -1769,7 +2054,7 @@ again: | |||||||
|  * pessimistic, but outside of toy-sized shared_buffers it should allow |  * pessimistic, but outside of toy-sized shared_buffers it should allow | ||||||
|  * sufficient pins. |  * sufficient pins. | ||||||
|  */ |  */ | ||||||
| static void | void | ||||||
| LimitAdditionalPins(uint32 *additional_pins) | LimitAdditionalPins(uint32 *additional_pins) | ||||||
| { | { | ||||||
| 	uint32		max_backends; | 	uint32		max_backends; | ||||||
| @@ -2034,7 +2319,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, | |||||||
|  |  | ||||||
| 				buf_state &= ~BM_VALID; | 				buf_state &= ~BM_VALID; | ||||||
| 				UnlockBufHdr(existing_hdr, buf_state); | 				UnlockBufHdr(existing_hdr, buf_state); | ||||||
| 			} while (!StartBufferIO(existing_hdr, true)); | 			} while (!StartBufferIO(existing_hdr, true, false)); | ||||||
| 		} | 		} | ||||||
| 		else | 		else | ||||||
| 		{ | 		{ | ||||||
| @@ -2057,7 +2342,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, | |||||||
| 			LWLockRelease(partition_lock); | 			LWLockRelease(partition_lock); | ||||||
|  |  | ||||||
| 			/* XXX: could combine the locked operations in it with the above */ | 			/* XXX: could combine the locked operations in it with the above */ | ||||||
| 			StartBufferIO(victim_buf_hdr, true); | 			StartBufferIO(victim_buf_hdr, true, false); | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -2372,7 +2657,12 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) | |||||||
| 	else | 	else | ||||||
| 	{ | 	{ | ||||||
| 		/* | 		/* | ||||||
| 		 * If we previously pinned the buffer, it must surely be valid. | 		 * If we previously pinned the buffer, it is likely to be valid, but | ||||||
|  | 		 * it may not be if StartReadBuffers() was called and | ||||||
|  | 		 * WaitReadBuffers() hasn't been called yet.  We'll check by loading | ||||||
|  | 		 * the flags without locking.  This is racy, but it's OK to return | ||||||
|  | 		 * false spuriously: when WaitReadBuffers() calls StartBufferIO(), | ||||||
|  | 		 * it'll see that it's now valid. | ||||||
| 		 * | 		 * | ||||||
| 		 * Note: We deliberately avoid a Valgrind client request here. | 		 * Note: We deliberately avoid a Valgrind client request here. | ||||||
| 		 * Individual access methods can optionally superimpose buffer page | 		 * Individual access methods can optionally superimpose buffer page | ||||||
| @@ -2381,7 +2671,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) | |||||||
| 		 * that the buffer page is legitimately non-accessible here.  We | 		 * that the buffer page is legitimately non-accessible here.  We | ||||||
| 		 * cannot meddle with that. | 		 * cannot meddle with that. | ||||||
| 		 */ | 		 */ | ||||||
| 		result = true; | 		result = (pg_atomic_read_u32(&buf->state) & BM_VALID) != 0; | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	ref->refcount++; | 	ref->refcount++; | ||||||
| @@ -3449,7 +3739,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, | |||||||
| 	 * someone else flushed the buffer before we could, so we need not do | 	 * someone else flushed the buffer before we could, so we need not do | ||||||
| 	 * anything. | 	 * anything. | ||||||
| 	 */ | 	 */ | ||||||
| 	if (!StartBufferIO(buf, false)) | 	if (!StartBufferIO(buf, false, false)) | ||||||
| 		return; | 		return; | ||||||
|  |  | ||||||
| 	/* Setup error traceback support for ereport() */ | 	/* Setup error traceback support for ereport() */ | ||||||
| @@ -5184,9 +5474,15 @@ WaitIO(BufferDesc *buf) | |||||||
|  * |  * | ||||||
|  * Returns true if we successfully marked the buffer as I/O busy, |  * Returns true if we successfully marked the buffer as I/O busy, | ||||||
|  * false if someone else already did the work. |  * false if someone else already did the work. | ||||||
|  |  * | ||||||
|  |  * If nowait is true, then we don't wait for an I/O to be finished by another | ||||||
|  |  * backend.  In that case, false indicates either that the I/O was already | ||||||
|  |  * finished, or is still in progress.  This is useful for callers that want to | ||||||
|  |  * find out if they can perform the I/O as part of a larger operation, without | ||||||
|  |  * waiting for the answer or distinguishing the reasons why not. | ||||||
|  */ |  */ | ||||||
| static bool | static bool | ||||||
| StartBufferIO(BufferDesc *buf, bool forInput) | StartBufferIO(BufferDesc *buf, bool forInput, bool nowait) | ||||||
| { | { | ||||||
| 	uint32		buf_state; | 	uint32		buf_state; | ||||||
|  |  | ||||||
| @@ -5199,6 +5495,8 @@ StartBufferIO(BufferDesc *buf, bool forInput) | |||||||
| 		if (!(buf_state & BM_IO_IN_PROGRESS)) | 		if (!(buf_state & BM_IO_IN_PROGRESS)) | ||||||
| 			break; | 			break; | ||||||
| 		UnlockBufHdr(buf, buf_state); | 		UnlockBufHdr(buf, buf_state); | ||||||
|  | 		if (nowait) | ||||||
|  | 			return false; | ||||||
| 		WaitIO(buf); | 		WaitIO(buf); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
| @@ -108,10 +108,9 @@ PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum, | |||||||
|  * LocalBufferAlloc - |  * LocalBufferAlloc - | ||||||
|  *	  Find or create a local buffer for the given page of the given relation. |  *	  Find or create a local buffer for the given page of the given relation. | ||||||
|  * |  * | ||||||
|  * API is similar to bufmgr.c's BufferAlloc, except that we do not need |  * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do | ||||||
|  * to do any locking since this is all local.   Also, IO_IN_PROGRESS |  * any locking since this is all local.  We support only default access | ||||||
|  * does not get set.  Lastly, we support only default access strategy |  * strategy (hence, usage_count is always advanced). | ||||||
|  * (hence, usage_count is always advanced). |  | ||||||
|  */ |  */ | ||||||
| BufferDesc * | BufferDesc * | ||||||
| LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, | LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, | ||||||
| @@ -287,7 +286,7 @@ GetLocalVictimBuffer(void) | |||||||
| } | } | ||||||
|  |  | ||||||
| /* see LimitAdditionalPins() */ | /* see LimitAdditionalPins() */ | ||||||
| static void | void | ||||||
| LimitAdditionalLocalPins(uint32 *additional_pins) | LimitAdditionalLocalPins(uint32 *additional_pins) | ||||||
| { | { | ||||||
| 	uint32		max_pins; | 	uint32		max_pins; | ||||||
| @@ -297,9 +296,10 @@ LimitAdditionalLocalPins(uint32 *additional_pins) | |||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| 	 * In contrast to LimitAdditionalPins() other backends don't play a role | 	 * In contrast to LimitAdditionalPins() other backends don't play a role | ||||||
| 	 * here. We can allow up to NLocBuffer pins in total. | 	 * here. We can allow up to NLocBuffer pins in total, but it might not be | ||||||
|  | 	 * initialized yet so read num_temp_buffers. | ||||||
| 	 */ | 	 */ | ||||||
| 	max_pins = (NLocBuffer - NLocalPinnedBuffers); | 	max_pins = (num_temp_buffers - NLocalPinnedBuffers); | ||||||
|  |  | ||||||
| 	if (*additional_pins >= max_pins) | 	if (*additional_pins >= max_pins) | ||||||
| 		*additional_pins = max_pins; | 		*additional_pins = max_pins; | ||||||
|   | |||||||
| @@ -3129,6 +3129,20 @@ struct config_int ConfigureNamesInt[] = | |||||||
| 		NULL | 		NULL | ||||||
| 	}, | 	}, | ||||||
|  |  | ||||||
|  | 	{ | ||||||
|  | 		{"io_combine_limit", | ||||||
|  | 			PGC_USERSET, | ||||||
|  | 			RESOURCES_ASYNCHRONOUS, | ||||||
|  | 			gettext_noop("Limit on the size of data reads and writes."), | ||||||
|  | 			NULL, | ||||||
|  | 			GUC_UNIT_BLOCKS | ||||||
|  | 		}, | ||||||
|  | 		&io_combine_limit, | ||||||
|  | 		DEFAULT_IO_COMBINE_LIMIT, | ||||||
|  | 		1, MAX_IO_COMBINE_LIMIT, | ||||||
|  | 		NULL, NULL, NULL | ||||||
|  | 	}, | ||||||
|  |  | ||||||
| 	{ | 	{ | ||||||
| 		{"backend_flush_after", PGC_USERSET, RESOURCES_ASYNCHRONOUS, | 		{"backend_flush_after", PGC_USERSET, RESOURCES_ASYNCHRONOUS, | ||||||
| 			gettext_noop("Number of pages after which previously performed writes are flushed to disk."), | 			gettext_noop("Number of pages after which previously performed writes are flushed to disk."), | ||||||
|   | |||||||
| @@ -203,6 +203,7 @@ | |||||||
| #backend_flush_after = 0		# measured in pages, 0 disables | #backend_flush_after = 0		# measured in pages, 0 disables | ||||||
| #effective_io_concurrency = 1		# 1-1000; 0 disables prefetching | #effective_io_concurrency = 1		# 1-1000; 0 disables prefetching | ||||||
| #maintenance_io_concurrency = 10	# 1-1000; 0 disables prefetching | #maintenance_io_concurrency = 10	# 1-1000; 0 disables prefetching | ||||||
|  | #io_combine_limit = 128kB		# usually 1-32 blocks (depends on OS) | ||||||
| #max_worker_processes = 8		# (change requires restart) | #max_worker_processes = 8		# (change requires restart) | ||||||
| #max_parallel_workers_per_gather = 2	# limited by max_parallel_workers | #max_parallel_workers_per_gather = 2	# limited by max_parallel_workers | ||||||
| #max_parallel_maintenance_workers = 2	# limited by max_parallel_workers | #max_parallel_maintenance_workers = 2	# limited by max_parallel_workers | ||||||
|   | |||||||
| @@ -14,6 +14,7 @@ | |||||||
| #ifndef BUFMGR_H | #ifndef BUFMGR_H | ||||||
| #define BUFMGR_H | #define BUFMGR_H | ||||||
|  |  | ||||||
|  | #include "port/pg_iovec.h" | ||||||
| #include "storage/block.h" | #include "storage/block.h" | ||||||
| #include "storage/buf.h" | #include "storage/buf.h" | ||||||
| #include "storage/bufpage.h" | #include "storage/bufpage.h" | ||||||
| @@ -106,6 +107,41 @@ typedef struct BufferManagerRelation | |||||||
| #define BMR_REL(p_rel) ((BufferManagerRelation){.rel = p_rel}) | #define BMR_REL(p_rel) ((BufferManagerRelation){.rel = p_rel}) | ||||||
| #define BMR_SMGR(p_smgr, p_relpersistence) ((BufferManagerRelation){.smgr = p_smgr, .relpersistence = p_relpersistence}) | #define BMR_SMGR(p_smgr, p_relpersistence) ((BufferManagerRelation){.smgr = p_smgr, .relpersistence = p_relpersistence}) | ||||||
|  |  | ||||||
|  | typedef enum ReadBuffersFlags | ||||||
|  | { | ||||||
|  | 	/* Zero out page if reading fails. */ | ||||||
|  | 	READ_BUFFERS_ZERO_ON_ERROR = (1 << 0), | ||||||
|  |  | ||||||
|  | 	/* Call smgrprefetch() if I/O necessary. */ | ||||||
|  | 	READ_BUFFERS_ISSUE_ADVICE = (1 << 1), | ||||||
|  | } ReadBuffersFlags; | ||||||
|  |  | ||||||
|  | struct ReadBuffersOperation | ||||||
|  | { | ||||||
|  | 	/* | ||||||
|  | 	 * The following members should be set by the caller.  If only smgr is | ||||||
|  | 	 * provided without rel, then smgr_persistence can be set to override the | ||||||
|  | 	 * default assumption of RELPERSISTENCE_PERMANENT. | ||||||
|  | 	 */ | ||||||
|  | 	Relation	rel; | ||||||
|  | 	struct SMgrRelationData *smgr; | ||||||
|  | 	char		smgr_persistence; | ||||||
|  | 	ForkNumber	forknum; | ||||||
|  | 	BufferAccessStrategy strategy; | ||||||
|  |  | ||||||
|  | 	/* | ||||||
|  | 	 * The following private members are private state for communication | ||||||
|  | 	 * between StartReadBuffers() and WaitReadBuffers(), initialized only if | ||||||
|  | 	 * an actual read is required, and should not be modified. | ||||||
|  | 	 */ | ||||||
|  | 	Buffer	   *buffers; | ||||||
|  | 	BlockNumber blocknum; | ||||||
|  | 	int			flags; | ||||||
|  | 	int16		nblocks; | ||||||
|  | 	int16		io_buffers_len; | ||||||
|  | }; | ||||||
|  |  | ||||||
|  | typedef struct ReadBuffersOperation ReadBuffersOperation; | ||||||
|  |  | ||||||
| /* forward declared, to avoid having to expose buf_internals.h here */ | /* forward declared, to avoid having to expose buf_internals.h here */ | ||||||
| struct WritebackContext; | struct WritebackContext; | ||||||
| @@ -133,6 +169,10 @@ extern PGDLLIMPORT bool track_io_timing; | |||||||
| extern PGDLLIMPORT int effective_io_concurrency; | extern PGDLLIMPORT int effective_io_concurrency; | ||||||
| extern PGDLLIMPORT int maintenance_io_concurrency; | extern PGDLLIMPORT int maintenance_io_concurrency; | ||||||
|  |  | ||||||
|  | #define MAX_IO_COMBINE_LIMIT PG_IOV_MAX | ||||||
|  | #define DEFAULT_IO_COMBINE_LIMIT Min(MAX_IO_COMBINE_LIMIT, (128 * 1024) / BLCKSZ) | ||||||
|  | extern PGDLLIMPORT int io_combine_limit; | ||||||
|  |  | ||||||
| extern PGDLLIMPORT int checkpoint_flush_after; | extern PGDLLIMPORT int checkpoint_flush_after; | ||||||
| extern PGDLLIMPORT int backend_flush_after; | extern PGDLLIMPORT int backend_flush_after; | ||||||
| extern PGDLLIMPORT int bgwriter_flush_after; | extern PGDLLIMPORT int bgwriter_flush_after; | ||||||
| @@ -177,6 +217,18 @@ extern Buffer ReadBufferWithoutRelcache(RelFileLocator rlocator, | |||||||
| 										ForkNumber forkNum, BlockNumber blockNum, | 										ForkNumber forkNum, BlockNumber blockNum, | ||||||
| 										ReadBufferMode mode, BufferAccessStrategy strategy, | 										ReadBufferMode mode, BufferAccessStrategy strategy, | ||||||
| 										bool permanent); | 										bool permanent); | ||||||
|  |  | ||||||
|  | extern bool StartReadBuffer(ReadBuffersOperation *operation, | ||||||
|  | 							Buffer *buffer, | ||||||
|  | 							BlockNumber blocknum, | ||||||
|  | 							int flags); | ||||||
|  | extern bool StartReadBuffers(ReadBuffersOperation *operation, | ||||||
|  | 							 Buffer *buffers, | ||||||
|  | 							 BlockNumber blocknum, | ||||||
|  | 							 int *nblocks, | ||||||
|  | 							 int flags); | ||||||
|  | extern void WaitReadBuffers(ReadBuffersOperation *operation); | ||||||
|  |  | ||||||
| extern void ReleaseBuffer(Buffer buffer); | extern void ReleaseBuffer(Buffer buffer); | ||||||
| extern void UnlockReleaseBuffer(Buffer buffer); | extern void UnlockReleaseBuffer(Buffer buffer); | ||||||
| extern bool BufferIsExclusiveLocked(Buffer buffer); | extern bool BufferIsExclusiveLocked(Buffer buffer); | ||||||
| @@ -250,6 +302,9 @@ extern bool HoldingBufferPinThatDelaysRecovery(void); | |||||||
|  |  | ||||||
| extern bool BgBufferSync(struct WritebackContext *wb_context); | extern bool BgBufferSync(struct WritebackContext *wb_context); | ||||||
|  |  | ||||||
|  | extern void LimitAdditionalPins(uint32 *additional_pins); | ||||||
|  | extern void LimitAdditionalLocalPins(uint32 *additional_pins); | ||||||
|  |  | ||||||
| /* in buf_init.c */ | /* in buf_init.c */ | ||||||
| extern void InitBufferPool(void); | extern void InitBufferPool(void); | ||||||
| extern Size BufferShmemSize(void); | extern Size BufferShmemSize(void); | ||||||
|   | |||||||
| @@ -2288,6 +2288,8 @@ ReInitializeDSMForeignScan_function | |||||||
| ReScanForeignScan_function | ReScanForeignScan_function | ||||||
| ReadBufPtrType | ReadBufPtrType | ||||||
| ReadBufferMode | ReadBufferMode | ||||||
|  | ReadBuffersFlags | ||||||
|  | ReadBuffersOperation | ||||||
| ReadBytePtrType | ReadBytePtrType | ||||||
| ReadExtraTocPtrType | ReadExtraTocPtrType | ||||||
| ReadFunc | ReadFunc | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user