Support buffer forwarding in StartReadBuffers().
StartReadBuffers() reports a short read when it finds a cached block that ends a range needing I/O by updating the caller's *nblocks. It doesn't want to have to unpin the trailing hit that it knows the caller wants, so the v17 version used sleight of hand in the name of simplicity: it included it in *nblocks as if it were part of the I/O, but internally tracked the shorter real I/O size in io_buffers_len (now removed).

This API change "forwards" the delimiting buffer to the next call. It's still pinned, and still stored in the caller's array, but *nblocks no longer includes stray buffers that are not really part of the operation. The expectation is that the caller still wants the rest of the blocks and will call again starting from that point, and now it can pass the already pinned buffer back in (or choose not to and release it).

The change is needed for the coming asynchronous I/O version's larger version of the problem: by definition it must move BM_IO_IN_PROGRESS negotiation from WaitReadBuffers() to StartReadBuffers(), but it might already have many buffers pinned before it discovers a need to split an I/O. (The current synchronous I/O version hides that detail from callers by looping over smaller reads if required to make all covered buffers valid in WaitReadBuffers(), so it looks like one operation but it might occasionally be several under the covers.)

Aside from avoiding unnecessary pin traffic, this will also be important for later work on out-of-order streams: you can't prioritize data that is already available right now if that fact is hidden from you.

The new API is natural for read_stream.c (see ed0b87ca). After a short read it leaves forwarded buffers where they fell in its circular queue for the continuing call to pick up.

Single-block StartReadBuffer() and traditional ReadBuffer() share code but are not affected by the change. They don't do multi-block I/O.

Reviewed-by: Andres Freund <andres@anarazel.de> (earlier versions)
Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com
parent ed0b87caac
commit ce1a75c4fe
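Editorial note: the calling convention described above can be illustrated with a hypothetical caller-side sketch; it is not part of this commit (read_stream.c is the in-tree caller). It assumes a ReadBuffersOperation whose caller-set members (rel/smgr, forknum, strategy, persistence) have already been filled in as the struct definition in bufmgr.h requires; the function name read_range() and the parameters op, first and count are invented for illustration, and flags is simply 0. The key points are that every array slot starts out as InvalidBuffer, and that after a split any forwarded buffers are left sitting in the trailing slots, so the continuing call that starts at the next sub-array position finds them already pinned.

#include "postgres.h"

#include "storage/bufmgr.h"

static void
read_range(ReadBuffersOperation *op, BlockNumber first, int count)
{
    Buffer     *bufs = palloc(sizeof(Buffer) * count);
    int         done = 0;

    /* Before the first call, every slot must hold InvalidBuffer. */
    for (int i = 0; i < count; i++)
        bufs[i] = InvalidBuffer;

    while (done < count)
    {
        int         nblocks = Min(count - done, MAX_IO_COMBINE_LIMIT);

        /*
         * Buffers forwarded by the previous iteration are already sitting
         * in bufs[done], bufs[done + 1], ..., so they are passed straight
         * back in here; all other slots still hold InvalidBuffer.
         */
        if (StartReadBuffers(op, &bufs[done], first + done, &nblocks, 0))
            WaitReadBuffers(op);    /* an I/O was started; wait for it */

        /*
         * bufs[done .. done + nblocks - 1] are now valid and pinned; a real
         * caller would use them and eventually ReleaseBuffer() each one.
         * If nblocks came back smaller than requested, the operation was
         * split and the next iteration continues from the split point.
         */
        done += nblocks;
    }

    pfree(bufs);
}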
src/backend/storage/buffer/bufmgr.c

@@ -1252,12 +1252,13 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
                      Buffer *buffers,
                      BlockNumber blockNum,
                      int *nblocks,
-                     int flags)
+                     int flags,
+                     bool allow_forwarding)
 {
     int         actual_nblocks = *nblocks;
-    int         io_buffers_len = 0;
     int         maxcombine = 0;
 
+    Assert(*nblocks == 1 || allow_forwarding);
     Assert(*nblocks > 0);
     Assert(*nblocks <= MAX_IO_COMBINE_LIMIT);
 
@@ -1265,6 +1266,44 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
     {
         bool        found;
 
+        if (allow_forwarding && buffers[i] != InvalidBuffer)
+        {
+            BufferDesc *bufHdr;
+
+            /*
+             * This is a buffer that was pinned by an earlier call to
+             * StartReadBuffers(), but couldn't be handled in one operation at
+             * that time. The operation was split, and the caller has passed
+             * an already pinned buffer back to us to handle the rest of the
+             * operation. It must continue at the expected block number.
+             */
+            Assert(BufferGetBlockNumber(buffers[i]) == blockNum + i);
+
+            /*
+             * It might be an already valid buffer (a hit) that followed the
+             * final contiguous block of an earlier I/O (a miss) marking the
+             * end of it, or a buffer that some other backend has since made
+             * valid by performing the I/O for us, in which case we can handle
+             * it as a hit now. It is safe to check for a BM_VALID flag with
+             * a relaxed load, because we got a fresh view of it while pinning
+             * it in the previous call.
+             *
+             * On the other hand if we don't see BM_VALID yet, it must be an
+             * I/O that was split by the previous call and we need to try to
+             * start a new I/O from this block. We're also racing against any
+             * other backend that might start the I/O or even manage to mark
+             * it BM_VALID after this check, but StartBufferIO() will handle
+             * those cases.
+             */
+            if (BufferIsLocal(buffers[i]))
+                bufHdr = GetLocalBufferDescriptor(-buffers[i] - 1);
+            else
+                bufHdr = GetBufferDescriptor(buffers[i] - 1);
+            Assert(pg_atomic_read_u32(&bufHdr->state) & BM_TAG_VALID);
+            found = pg_atomic_read_u32(&bufHdr->state) & BM_VALID;
+        }
+        else
+        {
             buffers[i] = PinBufferForBlock(operation->rel,
                                            operation->smgr,
                                            operation->persistence,
@@ -1272,23 +1311,36 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
                                            blockNum + i,
                                            operation->strategy,
                                            &found);
+        }
 
         if (found)
         {
             /*
-             * Terminate the read as soon as we get a hit. It could be a
-             * single buffer hit, or it could be a hit that follows a readable
-             * range. We don't want to create more than one readable range,
-             * so we stop here.
+             * We have a hit. If it's the first block in the requested range,
+             * we can return it immediately and report that WaitReadBuffers()
+             * does not need to be called. If the initial value of *nblocks
+             * was larger, the caller will have to call again for the rest.
              */
-            actual_nblocks = i + 1;
+            if (i == 0)
+            {
+                *nblocks = 1;
+                return false;
+            }
+
+            /*
+             * Otherwise we already have an I/O to perform, but this block
+             * can't be included as it is already valid. Split the I/O here.
+             * There may or may not be more blocks requiring I/O after this
+             * one, we haven't checked, but they can't be contiguous with this
+             * one in the way. We'll leave this buffer pinned, forwarding it
+             * to the next call, avoiding the need to unpin it here and re-pin
+             * it in the next call.
+             */
+            actual_nblocks = i;
             break;
         }
         else
         {
-            /* Extend the readable range to cover this block. */
-            io_buffers_len++;
-
             /*
              * Check how many blocks we can cover with the same IO. The smgr
              * implementation might e.g. be limited due to a segment boundary.
@@ -1309,15 +1361,11 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
     }
     *nblocks = actual_nblocks;
 
-    if (likely(io_buffers_len == 0))
-        return false;
-
     /* Populate information needed for I/O. */
     operation->buffers = buffers;
     operation->blocknum = blockNum;
     operation->flags = flags;
     operation->nblocks = actual_nblocks;
-    operation->io_buffers_len = io_buffers_len;
 
     if (flags & READ_BUFFERS_ISSUE_ADVICE)
     {
@@ -1332,7 +1380,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
         smgrprefetch(operation->smgr,
                      operation->forknum,
                      blockNum,
-                     operation->io_buffers_len);
+                     actual_nblocks);
     }
 
     /* Indicate that WaitReadBuffers() should be called. */
@@ -1341,16 +1389,26 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 
 /*
  * Begin reading a range of blocks beginning at blockNum and extending for
- * *nblocks. On return, up to *nblocks pinned buffers holding those blocks
- * are written into the buffers array, and *nblocks is updated to contain the
- * actual number, which may be fewer than requested. Caller sets some of the
- * members of operation; see struct definition.
+ * *nblocks. *nblocks and the buffers array are in/out parameters. On entry,
+ * the buffers elements covered by *nblocks must hold either InvalidBuffer or
+ * buffers forwarded by an earlier call to StartReadBuffers() that was split
+ * and is now being continued. On return, *nblocks holds the number of blocks
+ * accepted by this operation. If it is less than the original number then
+ * this operation has been split, but buffer elements up to the original
+ * requested size may hold forwarded buffers to be used for a continuing
+ * operation. The caller must either start a new I/O beginning at the block
+ * immediately following the blocks accepted by this call and pass those
+ * buffers back in, or release them if it chooses not to. It shouldn't make
+ * any other use of or assumptions about forwarded buffers.
  *
- * If false is returned, no I/O is necessary. If true is returned, one I/O
- * has been started, and WaitReadBuffers() must be called with the same
- * operation object before the buffers are accessed. Along with the operation
- * object, the caller-supplied array of buffers must remain valid until
- * WaitReadBuffers() is called.
+ * If false is returned, no I/O is necessary and the buffers covered by
+ * *nblocks on exit are valid and ready to be accessed. If true is returned,
+ * an I/O has been started, and WaitReadBuffers() must be called with the same
+ * operation object before the buffers covered by *nblocks on exit can be
+ * accessed. Along with the operation object, the caller-supplied array of
+ * buffers must remain valid until WaitReadBuffers() is called, and any
+ * forwarded buffers must also be preserved for a continuing call unless
+ * they are explicitly released.
  *
  * Currently the I/O is only started with optional operating system advice if
  * requested by the caller with READ_BUFFERS_ISSUE_ADVICE, and the real I/O
@@ -1364,13 +1422,17 @@ StartReadBuffers(ReadBuffersOperation *operation,
                  int *nblocks,
                  int flags)
 {
-    return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags);
+    return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags,
+                                true /* expect forwarded buffers */ );
 }
 
 /*
  * Single block version of the StartReadBuffers(). This might save a few
  * instructions when called from another translation unit, because it is
  * specialized for nblocks == 1.
+ *
+ * This version does not support "forwarded" buffers: they cannot be created
+ * by reading only one block and *buffer is ignored on entry.
  */
 bool
 StartReadBuffer(ReadBuffersOperation *operation,
@@ -1381,7 +1443,8 @@ StartReadBuffer(ReadBuffersOperation *operation,
     int         nblocks = 1;
     bool        result;
 
-    result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags);
+    result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags,
+                                  false /* single block, no forwarding */ );
     Assert(nblocks == 1);       /* single block can't be short */
 
     return result;
@@ -1407,24 +1470,16 @@ WaitReadBuffers(ReadBuffersOperation *operation)
     IOObject    io_object;
     char        persistence;
 
-    /*
-     * Currently operations are only allowed to include a read of some range,
-     * with an optional extra buffer that is already pinned at the end. So
-     * nblocks can be at most one more than io_buffers_len.
-     */
-    Assert((operation->nblocks == operation->io_buffers_len) ||
-           (operation->nblocks == operation->io_buffers_len + 1));
-
     /* Find the range of the physical read we need to perform. */
-    nblocks = operation->io_buffers_len;
-    if (nblocks == 0)
-        return;                 /* nothing to do */
-
+    nblocks = operation->nblocks;
     buffers = &operation->buffers[0];
     blocknum = operation->blocknum;
     forknum = operation->forknum;
     persistence = operation->persistence;
 
+    Assert(nblocks > 0);
+    Assert(nblocks <= MAX_IO_COMBINE_LIMIT);
+
     if (persistence == RELPERSISTENCE_TEMP)
     {
         io_context = IOCONTEXT_NORMAL;

src/include/storage/bufmgr.h

@@ -130,7 +130,6 @@ struct ReadBuffersOperation
     BlockNumber blocknum;
     int         flags;
     int16       nblocks;
-    int16       io_buffers_len;
 };
 
 typedef struct ReadBuffersOperation ReadBuffersOperation;
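Editorial note: conversely, a caller that decides not to continue after a split must drop the pins on any buffers that were forwarded to it, per the updated header comment above. A minimal hypothetical sketch of that cleanup follows; the function name read_prefix_only() and its parameters are invented, op is assumed to be a caller-initialized ReadBuffersOperation, and bufs is assumed to point to at least "requested" slots that were all set to InvalidBuffer before the call.

static void
read_prefix_only(ReadBuffersOperation *op, Buffer *bufs,
                 BlockNumber blkno, int requested)
{
    int         nblocks = requested;

    if (StartReadBuffers(op, bufs, blkno, &nblocks, 0))
        WaitReadBuffers(op);

    /* bufs[0 .. nblocks - 1] are valid and pinned; use them as usual. */

    /*
     * Abandon the rest of the range: release any buffers that were
     * forwarded into the trailing slots instead of passing them back to a
     * continuing StartReadBuffers() call.  Slots that were never touched
     * still hold InvalidBuffer and are skipped.
     */
    for (int i = nblocks; i < requested; i++)
    {
        if (bufs[i] != InvalidBuffer)
        {
            ReleaseBuffer(bufs[i]);
            bufs[i] = InvalidBuffer;
        }
    }
}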