mirror of
				https://github.com/postgres/postgres.git
				synced 2025-11-03 09:13:20 +03:00 
			
		
		
		
	Fix read_stream.c for changing io_combine_limit.
In a couple of places, read_stream.c assumed that io_combine_limit would
be stable during the lifetime of a stream.  That is not true in at least
one unusual case: streams held by CURSORs where you could change the GUC
between FETCH commands, with unpredictable results.
Fix, by storing stream->io_combine_limit and referring only to that
after construction.  This mirrors the treatment of the other important
setting {effective,maintenance}_io_concurrency, which is stored in
stream->max_ios.
One of the cases was the queue overflow space, which was sized for
io_combine_limit and could be overrun if the GUC was increased.  Since
that coding was a little hard to follow, also introduce a variable for
better readability instead of open-coding the arithmetic.  Doing so
revealed an off-by-one thinko while clamping max_pinned_buffers to
INT16_MAX, though that wasn't a live bug due to the current limits on
GUC values.
Back-patch to 17.
Discussion: https://postgr.es/m/CA%2BhUKG%2B2T9p-%2BzM6Eeou-RAJjTML6eit1qn26f9twznX59qtCA%40mail.gmail.com
			
			
This commit is contained in:
		@@ -109,6 +109,7 @@ typedef struct InProgressIO
 | 
				
			|||||||
struct ReadStream
 | 
					struct ReadStream
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	int16		max_ios;
 | 
						int16		max_ios;
 | 
				
			||||||
 | 
						int16		io_combine_limit;
 | 
				
			||||||
	int16		ios_in_progress;
 | 
						int16		ios_in_progress;
 | 
				
			||||||
	int16		queue_size;
 | 
						int16		queue_size;
 | 
				
			||||||
	int16		max_pinned_buffers;
 | 
						int16		max_pinned_buffers;
 | 
				
			||||||
@@ -236,7 +237,7 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	/* This should only be called with a pending read. */
 | 
						/* This should only be called with a pending read. */
 | 
				
			||||||
	Assert(stream->pending_read_nblocks > 0);
 | 
						Assert(stream->pending_read_nblocks > 0);
 | 
				
			||||||
	Assert(stream->pending_read_nblocks <= io_combine_limit);
 | 
						Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	/* We had better not exceed the pin limit by starting this read. */
 | 
						/* We had better not exceed the pin limit by starting this read. */
 | 
				
			||||||
	Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
 | 
						Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
 | 
				
			||||||
@@ -324,7 +325,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 | 
				
			|||||||
		int16		buffer_index;
 | 
							int16		buffer_index;
 | 
				
			||||||
		void	   *per_buffer_data;
 | 
							void	   *per_buffer_data;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if (stream->pending_read_nblocks == io_combine_limit)
 | 
							if (stream->pending_read_nblocks == stream->io_combine_limit)
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			read_stream_start_pending_read(stream, suppress_advice);
 | 
								read_stream_start_pending_read(stream, suppress_advice);
 | 
				
			||||||
			suppress_advice = false;
 | 
								suppress_advice = false;
 | 
				
			||||||
@@ -384,7 +385,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 | 
				
			|||||||
	 * signaled end-of-stream, we start the read immediately.
 | 
						 * signaled end-of-stream, we start the read immediately.
 | 
				
			||||||
	 */
 | 
						 */
 | 
				
			||||||
	if (stream->pending_read_nblocks > 0 &&
 | 
						if (stream->pending_read_nblocks > 0 &&
 | 
				
			||||||
		(stream->pending_read_nblocks == io_combine_limit ||
 | 
							(stream->pending_read_nblocks == stream->io_combine_limit ||
 | 
				
			||||||
		 (stream->pending_read_nblocks == stream->distance &&
 | 
							 (stream->pending_read_nblocks == stream->distance &&
 | 
				
			||||||
		  stream->pinned_buffers == 0) ||
 | 
							  stream->pinned_buffers == 0) ||
 | 
				
			||||||
		 stream->distance == 0) &&
 | 
							 stream->distance == 0) &&
 | 
				
			||||||
@@ -415,6 +416,7 @@ read_stream_begin_impl(int flags,
 | 
				
			|||||||
	ReadStream *stream;
 | 
						ReadStream *stream;
 | 
				
			||||||
	size_t		size;
 | 
						size_t		size;
 | 
				
			||||||
	int16		queue_size;
 | 
						int16		queue_size;
 | 
				
			||||||
 | 
						int16		queue_overflow;
 | 
				
			||||||
	int			max_ios;
 | 
						int			max_ios;
 | 
				
			||||||
	int			strategy_pin_limit;
 | 
						int			strategy_pin_limit;
 | 
				
			||||||
	uint32		max_pinned_buffers;
 | 
						uint32		max_pinned_buffers;
 | 
				
			||||||
@@ -445,6 +447,14 @@ read_stream_begin_impl(int flags,
 | 
				
			|||||||
	/* Cap to INT16_MAX to avoid overflowing below */
 | 
						/* Cap to INT16_MAX to avoid overflowing below */
 | 
				
			||||||
	max_ios = Min(max_ios, PG_INT16_MAX);
 | 
						max_ios = Min(max_ios, PG_INT16_MAX);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						/*
 | 
				
			||||||
 | 
						 * If starting a multi-block I/O near the end of the queue, we might
 | 
				
			||||||
 | 
						 * temporarily need extra space for overflowing buffers before they are
 | 
				
			||||||
 | 
						 * moved to regular circular position.  This is the maximum extra space we
 | 
				
			||||||
 | 
						 * could need.
 | 
				
			||||||
 | 
						 */
 | 
				
			||||||
 | 
						queue_overflow = io_combine_limit - 1;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	/*
 | 
						/*
 | 
				
			||||||
	 * Choose the maximum number of buffers we're prepared to pin.  We try to
 | 
						 * Choose the maximum number of buffers we're prepared to pin.  We try to
 | 
				
			||||||
	 * pin fewer if we can, though.  We add one so that we can make progress
 | 
						 * pin fewer if we can, though.  We add one so that we can make progress
 | 
				
			||||||
@@ -459,7 +469,7 @@ read_stream_begin_impl(int flags,
 | 
				
			|||||||
	 */
 | 
						 */
 | 
				
			||||||
	max_pinned_buffers = (max_ios + 1) * io_combine_limit;
 | 
						max_pinned_buffers = (max_ios + 1) * io_combine_limit;
 | 
				
			||||||
	max_pinned_buffers = Min(max_pinned_buffers,
 | 
						max_pinned_buffers = Min(max_pinned_buffers,
 | 
				
			||||||
							 PG_INT16_MAX - io_combine_limit - 1);
 | 
												 PG_INT16_MAX - queue_overflow - 1);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	/* Give the strategy a chance to limit the number of buffers we pin. */
 | 
						/* Give the strategy a chance to limit the number of buffers we pin. */
 | 
				
			||||||
	strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
 | 
						strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
 | 
				
			||||||
@@ -485,18 +495,17 @@ read_stream_begin_impl(int flags,
 | 
				
			|||||||
	 * one big chunk.  Though we have queue_size buffers, we want to be able
 | 
						 * one big chunk.  Though we have queue_size buffers, we want to be able
 | 
				
			||||||
	 * to assume that all the buffers for a single read are contiguous (i.e.
 | 
						 * to assume that all the buffers for a single read are contiguous (i.e.
 | 
				
			||||||
	 * don't wrap around halfway through), so we allow temporary overflows of
 | 
						 * don't wrap around halfway through), so we allow temporary overflows of
 | 
				
			||||||
	 * up to the maximum possible read size by allocating an extra
 | 
						 * up to the maximum possible overflow size.
 | 
				
			||||||
	 * io_combine_limit - 1 elements.
 | 
					 | 
				
			||||||
	 */
 | 
						 */
 | 
				
			||||||
	size = offsetof(ReadStream, buffers);
 | 
						size = offsetof(ReadStream, buffers);
 | 
				
			||||||
	size += sizeof(Buffer) * (queue_size + io_combine_limit - 1);
 | 
						size += sizeof(Buffer) * (queue_size + queue_overflow);
 | 
				
			||||||
	size += sizeof(InProgressIO) * Max(1, max_ios);
 | 
						size += sizeof(InProgressIO) * Max(1, max_ios);
 | 
				
			||||||
	size += per_buffer_data_size * queue_size;
 | 
						size += per_buffer_data_size * queue_size;
 | 
				
			||||||
	size += MAXIMUM_ALIGNOF * 2;
 | 
						size += MAXIMUM_ALIGNOF * 2;
 | 
				
			||||||
	stream = (ReadStream *) palloc(size);
 | 
						stream = (ReadStream *) palloc(size);
 | 
				
			||||||
	memset(stream, 0, offsetof(ReadStream, buffers));
 | 
						memset(stream, 0, offsetof(ReadStream, buffers));
 | 
				
			||||||
	stream->ios = (InProgressIO *)
 | 
						stream->ios = (InProgressIO *)
 | 
				
			||||||
		MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]);
 | 
							MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
 | 
				
			||||||
	if (per_buffer_data_size > 0)
 | 
						if (per_buffer_data_size > 0)
 | 
				
			||||||
		stream->per_buffer_data = (void *)
 | 
							stream->per_buffer_data = (void *)
 | 
				
			||||||
			MAXALIGN(&stream->ios[Max(1, max_ios)]);
 | 
								MAXALIGN(&stream->ios[Max(1, max_ios)]);
 | 
				
			||||||
@@ -523,7 +532,14 @@ read_stream_begin_impl(int flags,
 | 
				
			|||||||
	if (max_ios == 0)
 | 
						if (max_ios == 0)
 | 
				
			||||||
		max_ios = 1;
 | 
							max_ios = 1;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						/*
 | 
				
			||||||
 | 
						 * Capture stable values for these two GUC-derived numbers for the
 | 
				
			||||||
 | 
						 * lifetime of this stream, so we don't have to worry about the GUCs
 | 
				
			||||||
 | 
						 * changing underneath us beyond this point.
 | 
				
			||||||
 | 
						 */
 | 
				
			||||||
	stream->max_ios = max_ios;
 | 
						stream->max_ios = max_ios;
 | 
				
			||||||
 | 
						stream->io_combine_limit = io_combine_limit;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	stream->per_buffer_data_size = per_buffer_data_size;
 | 
						stream->per_buffer_data_size = per_buffer_data_size;
 | 
				
			||||||
	stream->max_pinned_buffers = max_pinned_buffers;
 | 
						stream->max_pinned_buffers = max_pinned_buffers;
 | 
				
			||||||
	stream->queue_size = queue_size;
 | 
						stream->queue_size = queue_size;
 | 
				
			||||||
@@ -537,7 +553,7 @@ read_stream_begin_impl(int flags,
 | 
				
			|||||||
	 * doing full io_combine_limit sized reads (behavior B).
 | 
						 * doing full io_combine_limit sized reads (behavior B).
 | 
				
			||||||
	 */
 | 
						 */
 | 
				
			||||||
	if (flags & READ_STREAM_FULL)
 | 
						if (flags & READ_STREAM_FULL)
 | 
				
			||||||
		stream->distance = Min(max_pinned_buffers, io_combine_limit);
 | 
							stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
 | 
				
			||||||
	else
 | 
						else
 | 
				
			||||||
		stream->distance = 1;
 | 
							stream->distance = 1;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -752,14 +768,14 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 | 
				
			|||||||
		else
 | 
							else
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			/* No advice; move towards io_combine_limit (behavior B). */
 | 
								/* No advice; move towards io_combine_limit (behavior B). */
 | 
				
			||||||
			if (stream->distance > io_combine_limit)
 | 
								if (stream->distance > stream->io_combine_limit)
 | 
				
			||||||
			{
 | 
								{
 | 
				
			||||||
				stream->distance--;
 | 
									stream->distance--;
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			else
 | 
								else
 | 
				
			||||||
			{
 | 
								{
 | 
				
			||||||
				distance = stream->distance * 2;
 | 
									distance = stream->distance * 2;
 | 
				
			||||||
				distance = Min(distance, io_combine_limit);
 | 
									distance = Min(distance, stream->io_combine_limit);
 | 
				
			||||||
				distance = Min(distance, stream->max_pinned_buffers);
 | 
									distance = Min(distance, stream->max_pinned_buffers);
 | 
				
			||||||
				stream->distance = distance;
 | 
									stream->distance = distance;
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user