mirror of
https://github.com/postgres/postgres.git
synced 2025-05-29 16:21:20 +03:00
Fix read_stream.c for changing io_combine_limit.
In a couple of places, read_stream.c assumed that io_combine_limit would be stable during the lifetime of a stream. That is not true in at least one unusual case: streams held by CURSORs where you could change the GUC between FETCH commands, with unpredictable results. Fix, by storing stream->io_combine_limit and referring only to that after construction. This mirrors the treatment of the other important setting {effective,maintenance}_io_concurrency, which is stored in stream->max_ios. One of the cases was the queue overflow space, which was sized for io_combine_limit and could be overrun if the GUC was increased. Since that coding was a little hard to follow, also introduce a variable for better readability instead of open-coding the arithmetic. Doing so revealed an off-by-one thinko while clamping max_pinned_buffers to INT16_MAX, though that wasn't a live bug due to the current limits on GUC values. Back-patch to 17. Discussion: https://postgr.es/m/CA%2BhUKG%2B2T9p-%2BzM6Eeou-RAJjTML6eit1qn26f9twznX59qtCA%40mail.gmail.com
This commit is contained in:
parent
8b2392ae3d
commit
e273468070
@ -109,6 +109,7 @@ typedef struct InProgressIO
|
|||||||
struct ReadStream
|
struct ReadStream
|
||||||
{
|
{
|
||||||
int16 max_ios;
|
int16 max_ios;
|
||||||
|
int16 io_combine_limit;
|
||||||
int16 ios_in_progress;
|
int16 ios_in_progress;
|
||||||
int16 queue_size;
|
int16 queue_size;
|
||||||
int16 max_pinned_buffers;
|
int16 max_pinned_buffers;
|
||||||
@ -219,7 +220,7 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
|
|||||||
|
|
||||||
/* This should only be called with a pending read. */
|
/* This should only be called with a pending read. */
|
||||||
Assert(stream->pending_read_nblocks > 0);
|
Assert(stream->pending_read_nblocks > 0);
|
||||||
Assert(stream->pending_read_nblocks <= io_combine_limit);
|
Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
|
||||||
|
|
||||||
/* We had better not exceed the pin limit by starting this read. */
|
/* We had better not exceed the pin limit by starting this read. */
|
||||||
Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
|
Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
|
||||||
@ -307,7 +308,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
|
|||||||
int16 buffer_index;
|
int16 buffer_index;
|
||||||
void *per_buffer_data;
|
void *per_buffer_data;
|
||||||
|
|
||||||
if (stream->pending_read_nblocks == io_combine_limit)
|
if (stream->pending_read_nblocks == stream->io_combine_limit)
|
||||||
{
|
{
|
||||||
read_stream_start_pending_read(stream, suppress_advice);
|
read_stream_start_pending_read(stream, suppress_advice);
|
||||||
suppress_advice = false;
|
suppress_advice = false;
|
||||||
@ -367,7 +368,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
|
|||||||
* signaled end-of-stream, we start the read immediately.
|
* signaled end-of-stream, we start the read immediately.
|
||||||
*/
|
*/
|
||||||
if (stream->pending_read_nblocks > 0 &&
|
if (stream->pending_read_nblocks > 0 &&
|
||||||
(stream->pending_read_nblocks == io_combine_limit ||
|
(stream->pending_read_nblocks == stream->io_combine_limit ||
|
||||||
(stream->pending_read_nblocks == stream->distance &&
|
(stream->pending_read_nblocks == stream->distance &&
|
||||||
stream->pinned_buffers == 0) ||
|
stream->pinned_buffers == 0) ||
|
||||||
stream->distance == 0) &&
|
stream->distance == 0) &&
|
||||||
@ -396,6 +397,7 @@ read_stream_begin_relation(int flags,
|
|||||||
ReadStream *stream;
|
ReadStream *stream;
|
||||||
size_t size;
|
size_t size;
|
||||||
int16 queue_size;
|
int16 queue_size;
|
||||||
|
int16 queue_overflow;
|
||||||
int max_ios;
|
int max_ios;
|
||||||
int strategy_pin_limit;
|
int strategy_pin_limit;
|
||||||
uint32 max_pinned_buffers;
|
uint32 max_pinned_buffers;
|
||||||
@ -429,6 +431,14 @@ read_stream_begin_relation(int flags,
|
|||||||
/* Cap to INT16_MAX to avoid overflowing below */
|
/* Cap to INT16_MAX to avoid overflowing below */
|
||||||
max_ios = Min(max_ios, PG_INT16_MAX);
|
max_ios = Min(max_ios, PG_INT16_MAX);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If starting a multi-block I/O near the end of the queue, we might
|
||||||
|
* temporarily need extra space for overflowing buffers before they are
|
||||||
|
* moved to regular circular position. This is the maximum extra space we
|
||||||
|
* could need.
|
||||||
|
*/
|
||||||
|
queue_overflow = io_combine_limit - 1;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Choose the maximum number of buffers we're prepared to pin. We try to
|
* Choose the maximum number of buffers we're prepared to pin. We try to
|
||||||
* pin fewer if we can, though. We clamp it to at least io_combine_limit
|
* pin fewer if we can, though. We clamp it to at least io_combine_limit
|
||||||
@ -439,7 +449,7 @@ read_stream_begin_relation(int flags,
|
|||||||
*/
|
*/
|
||||||
max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
|
max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
|
||||||
max_pinned_buffers = Min(max_pinned_buffers,
|
max_pinned_buffers = Min(max_pinned_buffers,
|
||||||
PG_INT16_MAX - io_combine_limit - 1);
|
PG_INT16_MAX - queue_overflow - 1);
|
||||||
|
|
||||||
/* Give the strategy a chance to limit the number of buffers we pin. */
|
/* Give the strategy a chance to limit the number of buffers we pin. */
|
||||||
strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
|
strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
|
||||||
@ -465,18 +475,17 @@ read_stream_begin_relation(int flags,
|
|||||||
* one big chunk. Though we have queue_size buffers, we want to be able
|
* one big chunk. Though we have queue_size buffers, we want to be able
|
||||||
* to assume that all the buffers for a single read are contiguous (i.e.
|
* to assume that all the buffers for a single read are contiguous (i.e.
|
||||||
* don't wrap around halfway through), so we allow temporary overflows of
|
* don't wrap around halfway through), so we allow temporary overflows of
|
||||||
* up to the maximum possible read size by allocating an extra
|
* up to the maximum possible overflow size.
|
||||||
* io_combine_limit - 1 elements.
|
|
||||||
*/
|
*/
|
||||||
size = offsetof(ReadStream, buffers);
|
size = offsetof(ReadStream, buffers);
|
||||||
size += sizeof(Buffer) * (queue_size + io_combine_limit - 1);
|
size += sizeof(Buffer) * (queue_size + queue_overflow);
|
||||||
size += sizeof(InProgressIO) * Max(1, max_ios);
|
size += sizeof(InProgressIO) * Max(1, max_ios);
|
||||||
size += per_buffer_data_size * queue_size;
|
size += per_buffer_data_size * queue_size;
|
||||||
size += MAXIMUM_ALIGNOF * 2;
|
size += MAXIMUM_ALIGNOF * 2;
|
||||||
stream = (ReadStream *) palloc(size);
|
stream = (ReadStream *) palloc(size);
|
||||||
memset(stream, 0, offsetof(ReadStream, buffers));
|
memset(stream, 0, offsetof(ReadStream, buffers));
|
||||||
stream->ios = (InProgressIO *)
|
stream->ios = (InProgressIO *)
|
||||||
MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]);
|
MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
|
||||||
if (per_buffer_data_size > 0)
|
if (per_buffer_data_size > 0)
|
||||||
stream->per_buffer_data = (void *)
|
stream->per_buffer_data = (void *)
|
||||||
MAXALIGN(&stream->ios[Max(1, max_ios)]);
|
MAXALIGN(&stream->ios[Max(1, max_ios)]);
|
||||||
@ -503,7 +512,14 @@ read_stream_begin_relation(int flags,
|
|||||||
if (max_ios == 0)
|
if (max_ios == 0)
|
||||||
max_ios = 1;
|
max_ios = 1;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Capture stable values for these two GUC-derived numbers for the
|
||||||
|
* lifetime of this stream, so we don't have to worry about the GUCs
|
||||||
|
* changing underneath us beyond this point.
|
||||||
|
*/
|
||||||
stream->max_ios = max_ios;
|
stream->max_ios = max_ios;
|
||||||
|
stream->io_combine_limit = io_combine_limit;
|
||||||
|
|
||||||
stream->per_buffer_data_size = per_buffer_data_size;
|
stream->per_buffer_data_size = per_buffer_data_size;
|
||||||
stream->max_pinned_buffers = max_pinned_buffers;
|
stream->max_pinned_buffers = max_pinned_buffers;
|
||||||
stream->queue_size = queue_size;
|
stream->queue_size = queue_size;
|
||||||
@ -517,7 +533,7 @@ read_stream_begin_relation(int flags,
|
|||||||
* doing full io_combine_limit sized reads (behavior B).
|
* doing full io_combine_limit sized reads (behavior B).
|
||||||
*/
|
*/
|
||||||
if (flags & READ_STREAM_FULL)
|
if (flags & READ_STREAM_FULL)
|
||||||
stream->distance = Min(max_pinned_buffers, io_combine_limit);
|
stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
|
||||||
else
|
else
|
||||||
stream->distance = 1;
|
stream->distance = 1;
|
||||||
|
|
||||||
@ -683,14 +699,14 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
/* No advice; move towards io_combine_limit (behavior B). */
|
/* No advice; move towards io_combine_limit (behavior B). */
|
||||||
if (stream->distance > io_combine_limit)
|
if (stream->distance > stream->io_combine_limit)
|
||||||
{
|
{
|
||||||
stream->distance--;
|
stream->distance--;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
distance = stream->distance * 2;
|
distance = stream->distance * 2;
|
||||||
distance = Min(distance, io_combine_limit);
|
distance = Min(distance, stream->io_combine_limit);
|
||||||
distance = Min(distance, stream->max_pinned_buffers);
|
distance = Min(distance, stream->max_pinned_buffers);
|
||||||
stream->distance = distance;
|
stream->distance = distance;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user