diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 14fc1bd1248..323382dcfa8 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1252,12 +1252,13 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, Buffer *buffers, BlockNumber blockNum, int *nblocks, - int flags) + int flags, + bool allow_forwarding) { int actual_nblocks = *nblocks; - int io_buffers_len = 0; int maxcombine = 0; + Assert(*nblocks == 1 || allow_forwarding); Assert(*nblocks > 0); Assert(*nblocks <= MAX_IO_COMBINE_LIMIT); @@ -1265,30 +1266,81 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, { bool found; - buffers[i] = PinBufferForBlock(operation->rel, - operation->smgr, - operation->persistence, - operation->forknum, - blockNum + i, - operation->strategy, - &found); + if (allow_forwarding && buffers[i] != InvalidBuffer) + { + BufferDesc *bufHdr; + + /* + * This is a buffer that was pinned by an earlier call to + * StartReadBuffers(), but couldn't be handled in one operation at + * that time. The operation was split, and the caller has passed + * an already pinned buffer back to us to handle the rest of the + * operation. It must continue at the expected block number. + */ + Assert(BufferGetBlockNumber(buffers[i]) == blockNum + i); + + /* + * It might be an already valid buffer (a hit) that followed the + * final contiguous block of an earlier I/O (a miss) marking the + * end of it, or a buffer that some other backend has since made + * valid by performing the I/O for us, in which case we can handle + * it as a hit now. It is safe to check for a BM_VALID flag with + * a relaxed load, because we got a fresh view of it while pinning + * it in the previous call. + * + * On the other hand if we don't see BM_VALID yet, it must be an + * I/O that was split by the previous call and we need to try to + * start a new I/O from this block. We're also racing against any + * other backend that might start the I/O or even manage to mark + * it BM_VALID after this check, but StartBufferIO() will handle + * those cases. + */ + if (BufferIsLocal(buffers[i])) + bufHdr = GetLocalBufferDescriptor(-buffers[i] - 1); + else + bufHdr = GetBufferDescriptor(buffers[i] - 1); + Assert(pg_atomic_read_u32(&bufHdr->state) & BM_TAG_VALID); + found = pg_atomic_read_u32(&bufHdr->state) & BM_VALID; + } + else + { + buffers[i] = PinBufferForBlock(operation->rel, + operation->smgr, + operation->persistence, + operation->forknum, + blockNum + i, + operation->strategy, + &found); + } if (found) { /* - * Terminate the read as soon as we get a hit. It could be a - * single buffer hit, or it could be a hit that follows a readable - * range. We don't want to create more than one readable range, - * so we stop here. + * We have a hit. If it's the first block in the requested range, + * we can return it immediately and report that WaitReadBuffers() + * does not need to be called. If the initial value of *nblocks + * was larger, the caller will have to call again for the rest. */ - actual_nblocks = i + 1; + if (i == 0) + { + *nblocks = 1; + return false; + } + + /* + * Otherwise we already have an I/O to perform, but this block + * can't be included as it is already valid. Split the I/O here. + * There may or may not be more blocks requiring I/O after this + * one, we haven't checked, but they can't be contiguous with this + * one in the way. We'll leave this buffer pinned, forwarding it + * to the next call, avoiding the need to unpin it here and re-pin + * it in the next call. + */ + actual_nblocks = i; break; } else { - /* Extend the readable range to cover this block. */ - io_buffers_len++; - /* * Check how many blocks we can cover with the same IO. The smgr * implementation might e.g. be limited due to a segment boundary. @@ -1309,15 +1361,11 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, } *nblocks = actual_nblocks; - if (likely(io_buffers_len == 0)) - return false; - /* Populate information needed for I/O. */ operation->buffers = buffers; operation->blocknum = blockNum; operation->flags = flags; operation->nblocks = actual_nblocks; - operation->io_buffers_len = io_buffers_len; if (flags & READ_BUFFERS_ISSUE_ADVICE) { @@ -1332,7 +1380,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, smgrprefetch(operation->smgr, operation->forknum, blockNum, - operation->io_buffers_len); + actual_nblocks); } /* Indicate that WaitReadBuffers() should be called. */ @@ -1341,16 +1389,26 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, /* * Begin reading a range of blocks beginning at blockNum and extending for - * *nblocks. On return, up to *nblocks pinned buffers holding those blocks - * are written into the buffers array, and *nblocks is updated to contain the - * actual number, which may be fewer than requested. Caller sets some of the - * members of operation; see struct definition. + * *nblocks. *nblocks and the buffers array are in/out parameters. On entry, + * the buffers elements covered by *nblocks must hold either InvalidBuffer or + * buffers forwarded by an earlier call to StartReadBuffers() that was split + * and is now being continued. On return, *nblocks holds the number of blocks + * accepted by this operation. If it is less than the original number then + * this operation has been split, but buffer elements up to the original + * requested size may hold forwarded buffers to be used for a continuing + * operation. The caller must either start a new I/O beginning at the block + * immediately following the blocks accepted by this call and pass those + * buffers back in, or release them if it chooses not to. It shouldn't make + * any other use of or assumptions about forwarded buffers. * - * If false is returned, no I/O is necessary. If true is returned, one I/O - * has been started, and WaitReadBuffers() must be called with the same - * operation object before the buffers are accessed. Along with the operation - * object, the caller-supplied array of buffers must remain valid until - * WaitReadBuffers() is called. + * If false is returned, no I/O is necessary and the buffers covered by + * *nblocks on exit are valid and ready to be accessed. If true is returned, + * an I/O has been started, and WaitReadBuffers() must be called with the same + * operation object before the buffers covered by *nblocks on exit can be + * accessed. Along with the operation object, the caller-supplied array of + * buffers must remain valid until WaitReadBuffers() is called, and any + * forwarded buffers must also be preserved for a continuing call unless + * they are explicitly released. * * Currently the I/O is only started with optional operating system advice if * requested by the caller with READ_BUFFERS_ISSUE_ADVICE, and the real I/O @@ -1364,13 +1422,17 @@ StartReadBuffers(ReadBuffersOperation *operation, int *nblocks, int flags) { - return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags); + return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags, + true /* expect forwarded buffers */ ); } /* * Single block version of the StartReadBuffers(). This might save a few * instructions when called from another translation unit, because it is * specialized for nblocks == 1. + * + * This version does not support "forwarded" buffers: they cannot be created + * by reading only one block and *buffer is ignored on entry. */ bool StartReadBuffer(ReadBuffersOperation *operation, @@ -1381,7 +1443,8 @@ StartReadBuffer(ReadBuffersOperation *operation, int nblocks = 1; bool result; - result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags); + result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags, + false /* single block, no forwarding */ ); Assert(nblocks == 1); /* single block can't be short */ return result; @@ -1407,24 +1470,16 @@ WaitReadBuffers(ReadBuffersOperation *operation) IOObject io_object; char persistence; - /* - * Currently operations are only allowed to include a read of some range, - * with an optional extra buffer that is already pinned at the end. So - * nblocks can be at most one more than io_buffers_len. - */ - Assert((operation->nblocks == operation->io_buffers_len) || - (operation->nblocks == operation->io_buffers_len + 1)); - /* Find the range of the physical read we need to perform. */ - nblocks = operation->io_buffers_len; - if (nblocks == 0) - return; /* nothing to do */ - + nblocks = operation->nblocks; buffers = &operation->buffers[0]; blocknum = operation->blocknum; forknum = operation->forknum; persistence = operation->persistence; + Assert(nblocks > 0); + Assert(nblocks <= MAX_IO_COMBINE_LIMIT); + if (persistence == RELPERSISTENCE_TEMP) { io_context = IOCONTEXT_NORMAL; diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index ecc1c3a909b..538b890a51d 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -130,7 +130,6 @@ struct ReadBuffersOperation BlockNumber blocknum; int flags; int16 nblocks; - int16 io_buffers_len; }; typedef struct ReadBuffersOperation ReadBuffersOperation;