1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-25 13:17:41 +03:00

bufmgr: Introduce infrastructure for faster relation extension

The primary bottlenecks for relation extension are:

1) The extension lock is held while acquiring a victim buffer for the new
   page. Acquiring a victim buffer can require writing out the old page
   contents including possibly needing to flush WAL.

2) When extending via ReadBuffer() et al, we write a zero page during the
   extension, and then later write out the actual page contents. This can
   nearly double the write rate.

3) The existing bulk relation extension infrastructure in hio.c just amortized
   the cost of acquiring the relation extension lock, but none of the other
   costs.

Unfortunately 1) cannot currently be addressed in a central manner as the
callers to ReadBuffer() need to acquire the extension lock. To address that,
this commit moves the responsibility for acquiring the extension lock
into bufmgr.c functions. That allows acquiring the relation extension lock
for just the required time. This will also allow us to improve relation
extension further, without changing callers.

The reason we write all-zeroes pages during relation extension is that we hope
to get ENOSPC errors earlier that way (largely works, except for CoW
filesystems). It is easier to handle out-of-space errors gracefully if the
page doesn't yet contain actual tuples. This commit addresses 2), by using the
recently introduced smgrzeroextend(), which extends the relation, without
dirtying the kernel page cache for all the extended pages.

To address 3), this commit introduces a function to extend a relation by
multiple blocks at a time.

There are three new exposed functions: ExtendBufferedRel() for extending the
relation by a single block, ExtendBufferedRelBy() to extend a relation by
multiple blocks at once, and ExtendBufferedRelTo() for extending a relation up
to a certain size.

To avoid duplicating code between ReadBuffer(P_NEW) and the new functions,
ReadBuffer(P_NEW) now implements relation extension with
ExtendBufferedRel(), using a flag to tell ExtendBufferedRel() that the
relation lock is already held.

Note that this commit does not yet lead to a meaningful performance or
scalability improvement - for that uses of ReadBuffer(P_NEW) will need to be
converted to ExtendBuffered*(), which will be done in subsequent commits.

Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Discussion: https://postgr.es/m/20221029025420.eplyow6k7tgu6he3@awork3.anarazel.de
This commit is contained in:
Andres Freund
2023-04-05 16:21:09 -07:00
parent 8eda731465
commit 31966b151e
9 changed files with 905 additions and 185 deletions

View File

@@ -48,6 +48,7 @@
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/smgr.h"
#include "storage/standby.h"
@@ -450,6 +451,22 @@ static Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence,
ForkNumber forkNum, BlockNumber blockNum,
ReadBufferMode mode, BufferAccessStrategy strategy,
bool *hit);
static BlockNumber ExtendBufferedRelCommon(ExtendBufferedWhat eb,
ForkNumber fork,
BufferAccessStrategy strategy,
uint32 flags,
uint32 extend_by,
BlockNumber extend_upto,
Buffer *buffers,
uint32 *extended_by);
static BlockNumber ExtendBufferedRelShared(ExtendBufferedWhat eb,
ForkNumber fork,
BufferAccessStrategy strategy,
uint32 flags,
uint32 extend_by,
BlockNumber extend_upto,
Buffer *buffers,
uint32 *extended_by);
static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy);
static void PinBuffer_Locked(BufferDesc *buf);
static void UnpinBuffer(BufferDesc *buf);
@@ -785,6 +802,180 @@ ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum,
mode, strategy, &hit);
}
/*
 * ExtendBufferedRel -- extend a relation fork by exactly one block.
 *
 * Thin convenience wrapper around ExtendBufferedRelBy(): requests a
 * single-block extension and hands back the one buffer that
 * ExtendBufferedRelBy() stored for the new block.
 */
Buffer
ExtendBufferedRel(ExtendBufferedWhat eb,
				  ForkNumber forkNum,
				  BufferAccessStrategy strategy,
				  uint32 flags)
{
	Buffer		new_buffer;
	uint32		num_extended = 1;

	/* the block number returned by ExtendBufferedRelBy() is not needed here */
	ExtendBufferedRelBy(eb, forkNum, strategy, flags, 1,
						&new_buffer, &num_extended);

	return new_buffer;
}
/*
 * ExtendBufferedRelBy -- extend a relation fork by several blocks at once.
 *
 * Attempts to append extend_by new blocks to the given fork. Depending on
 * the availability of resources, fewer blocks may end up being added (unless
 * an error is thrown, always at least one). On return *extended_by holds
 * the number of blocks the relation was actually extended by.
 *
 * buffers must point to an array with room for at least extend_by entries.
 * Upon completion, the first *extended_by array elements each refer to a
 * pinned buffer.
 *
 * If EB_LOCK_FIRST is part of flags, the first returned buffer is locked.
 * This is useful for callers that want a buffer that is guaranteed to be
 * empty.
 *
 * The relation is identified either through eb.rel or through eb.smgr
 * (exactly one of the two); in the latter case eb.relpersistence must be
 * set as well.
 *
 * Returns the block number of the first newly added block, as reported by
 * ExtendBufferedRelCommon().
 */
BlockNumber
ExtendBufferedRelBy(ExtendBufferedWhat eb,
					ForkNumber fork,
					BufferAccessStrategy strategy,
					uint32 flags,
					uint32 extend_by,
					Buffer *buffers,
					uint32 *extended_by)
{
	/* exactly one of rel / smgr may be supplied */
	Assert((eb.rel != NULL) != (eb.smgr != NULL));
	/* when identified via smgr, the persistence has to be passed in too */
	Assert(eb.smgr == NULL || eb.relpersistence != 0);
	Assert(extend_by > 0);

	/* Derive the smgr-level identity from the relcache entry, if needed */
	if (!eb.smgr)
	{
		eb.relpersistence = eb.rel->rd_rel->relpersistence;
		eb.smgr = RelationGetSmgr(eb.rel);
	}

	/* InvalidBlockNumber: no upper bound on the resulting relation size */
	return ExtendBufferedRelCommon(eb, fork, strategy, flags,
								   extend_by, InvalidBlockNumber,
								   buffers, extended_by);
}
/*
 * ExtendBufferedRelTo -- extend the relation so it is at least extend_to
 * blocks large, and return the buffer for block (extend_to - 1).
 *
 * This is useful for callers that want to write a specific page, regardless
 * of the current size of the relation (e.g. useful for visibilitymap and for
 * crash recovery).
 *
 * eb identifies the relation either via eb.rel or via eb.smgr (exactly one
 * of the two; with eb.smgr, eb.relpersistence must be set).
 *
 * flags:
 *   EB_CREATE_FORK_IF_NEEDED - create the fork first if it does not exist
 *   EB_PERFORMING_RECOVERY   - passed on to smgrcreate() when creating it
 *   EB_CLEAR_SIZE_CACHE      - forget the cached size before asking for it
 *
 * mode must be RBM_NORMAL, RBM_ZERO_ON_ERROR or RBM_ZERO_AND_LOCK. With
 * RBM_ZERO_AND_LOCK the target buffer is requested locked
 * (EB_LOCK_TARGET).
 *
 * The extension is performed in batches of at most lengthof(buffers)
 * blocks. Every buffer other than the one for block (extend_to - 1) is
 * released again right away.
 */
Buffer
ExtendBufferedRelTo(ExtendBufferedWhat eb,
					ForkNumber fork,
					BufferAccessStrategy strategy,
					uint32 flags,
					BlockNumber extend_to,
					ReadBufferMode mode)
{
	BlockNumber current_size;
	uint32		extended_by = 0;
	Buffer		buffer = InvalidBuffer;
	Buffer		buffers[64];

	Assert((eb.rel != NULL) != (eb.smgr != NULL));
	Assert(eb.smgr == NULL || eb.relpersistence != 0);
	Assert(extend_to != InvalidBlockNumber && extend_to > 0);
	Assert(mode == RBM_NORMAL || mode == RBM_ZERO_ON_ERROR ||
		   mode == RBM_ZERO_AND_LOCK);

	if (eb.smgr == NULL)
	{
		eb.smgr = RelationGetSmgr(eb.rel);
		eb.relpersistence = eb.rel->rd_rel->relpersistence;
	}

	/*
	 * If desired, create the file if it doesn't exist.  If
	 * smgr_cached_nblocks[fork] is positive then it must exist, no need for
	 * an smgrexists call.
	 *
	 * NOTE(review): this path passes eb.rel to LockRelationForExtension()
	 * unconditionally, so callers using EB_CREATE_FORK_IF_NEEDED presumably
	 * have to identify the relation via eb.rel -- confirm against callers.
	 */
	if ((flags & EB_CREATE_FORK_IF_NEEDED) &&
		(eb.smgr->smgr_cached_nblocks[fork] == 0 ||
		 eb.smgr->smgr_cached_nblocks[fork] == InvalidBlockNumber) &&
		!smgrexists(eb.smgr, fork))
	{
		LockRelationForExtension(eb.rel, ExclusiveLock);

		/* could have been closed while waiting for lock */
		if (eb.rel)
			eb.smgr = RelationGetSmgr(eb.rel);

		/* recheck, fork might have been created concurrently */
		if (!smgrexists(eb.smgr, fork))
			smgrcreate(eb.smgr, fork, flags & EB_PERFORMING_RECOVERY);

		UnlockRelationForExtension(eb.rel, ExclusiveLock);
	}

	/*
	 * If requested, invalidate size cache, so that smgrnblocks asks the
	 * kernel.
	 */
	if (flags & EB_CLEAR_SIZE_CACHE)
		eb.smgr->smgr_cached_nblocks[fork] = InvalidBlockNumber;

	/*
	 * Estimate how many pages we'll need to extend by. This avoids acquiring
	 * unnecessarily many victim buffers.
	 */
	current_size = smgrnblocks(eb.smgr, fork);

	if (mode == RBM_ZERO_AND_LOCK)
		flags |= EB_LOCK_TARGET;

	while (current_size < extend_to)
	{
		uint32		num_pages = lengthof(buffers);
		BlockNumber first_block;

		/* never ask for more than is needed to reach extend_to */
		if ((uint64) current_size + num_pages > extend_to)
			num_pages = extend_to - current_size;

		first_block = ExtendBufferedRelCommon(eb, fork, strategy, flags,
											  num_pages, extend_to,
											  buffers, &extended_by);

		/*
		 * Note that current_size may legitimately end up larger than
		 * extend_to here: if another backend concurrently extended the
		 * relation beyond extend_to, the callee reports the relation's
		 * actual size as first_block together with extended_by == 0.  So
		 * there must be no assertion that current_size <= extend_to.
		 */
		current_size = first_block + extended_by;
		Assert(num_pages != 0 || current_size >= extend_to);

		/* keep only the buffer for the target block, drop all other pins */
		for (uint32 i = 0; i < extended_by; i++)
		{
			if (first_block + i != extend_to - 1)
				ReleaseBuffer(buffers[i]);
			else
				buffer = buffers[i];
		}
	}

	/*
	 * It's possible that another backend concurrently extended the relation.
	 * In that case read the buffer.
	 *
	 * XXX: Should we control this via a flag?
	 */
	if (buffer == InvalidBuffer)
	{
		bool		hit;

		Assert(extended_by == 0);
		buffer = ReadBuffer_common(eb.smgr, eb.relpersistence,
								   fork, extend_to - 1, mode, strategy,
								   &hit);
	}

	return buffer;
}
/*
* ReadBuffer_common -- common logic for all ReadBuffer variants
@@ -801,35 +992,38 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
bool found;
IOContext io_context;
IOObject io_object;
bool isExtend;
bool isLocalBuf = SmgrIsTemp(smgr);
*hit = false;
/*
* Backward compatibility path, most code should use ExtendBufferedRel()
* instead, as acquiring the extension lock inside ExtendBufferedRel()
* scales a lot better.
*/
if (unlikely(blockNum == P_NEW))
{
uint32 flags = EB_SKIP_EXTENSION_LOCK;
Assert(mode == RBM_NORMAL ||
mode == RBM_ZERO_AND_LOCK ||
mode == RBM_ZERO_ON_ERROR);
if (mode == RBM_ZERO_AND_LOCK)
flags |= EB_LOCK_FIRST;
return ExtendBufferedRel(EB_SMGR(smgr, relpersistence),
forkNum, strategy, flags);
}
/* Make sure we will have room to remember the buffer pin */
ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
isExtend = (blockNum == P_NEW);
TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
smgr->smgr_rlocator.locator.spcOid,
smgr->smgr_rlocator.locator.dbOid,
smgr->smgr_rlocator.locator.relNumber,
smgr->smgr_rlocator.backend,
isExtend);
/* Substitute proper block number if caller asked for P_NEW */
if (isExtend)
{
blockNum = smgrnblocks(smgr, forkNum);
/* Fail if relation is already at maximum possible length */
if (blockNum == P_NEW)
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("cannot extend relation %s beyond %u blocks",
relpath(smgr->smgr_rlocator, forkNum),
P_NEW)));
}
smgr->smgr_rlocator.backend);
if (isLocalBuf)
{
@@ -844,8 +1038,6 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
if (found)
pgBufferUsage.local_blks_hit++;
else if (isExtend)
pgBufferUsage.local_blks_written++;
else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
mode == RBM_ZERO_ON_ERROR)
pgBufferUsage.local_blks_read++;
@@ -862,8 +1054,6 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
strategy, &found, io_context);
if (found)
pgBufferUsage.shared_blks_hit++;
else if (isExtend)
pgBufferUsage.shared_blks_written++;
else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
mode == RBM_ZERO_ON_ERROR)
pgBufferUsage.shared_blks_read++;
@@ -874,175 +1064,91 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
/* if it was already in the buffer pool, we're done */
if (found)
{
if (!isExtend)
{
/* Just need to update stats before we exit */
*hit = true;
VacuumPageHit++;
pgstat_count_io_op(io_object, io_context, IOOP_HIT);
/* Just need to update stats before we exit */
*hit = true;
VacuumPageHit++;
pgstat_count_io_op(io_object, io_context, IOOP_HIT);
if (VacuumCostActive)
VacuumCostBalance += VacuumCostPageHit;
if (VacuumCostActive)
VacuumCostBalance += VacuumCostPageHit;
TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
smgr->smgr_rlocator.locator.spcOid,
smgr->smgr_rlocator.locator.dbOid,
smgr->smgr_rlocator.locator.relNumber,
smgr->smgr_rlocator.backend,
isExtend,
found);
/*
* In RBM_ZERO_AND_LOCK mode the caller expects the page to be
* locked on return.
*/
if (!isLocalBuf)
{
if (mode == RBM_ZERO_AND_LOCK)
LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
LW_EXCLUSIVE);
else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
}
return BufferDescriptorGetBuffer(bufHdr);
}
TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
smgr->smgr_rlocator.locator.spcOid,
smgr->smgr_rlocator.locator.dbOid,
smgr->smgr_rlocator.locator.relNumber,
smgr->smgr_rlocator.backend,
found);
/*
* We get here only in the corner case where we are trying to extend
* the relation but we found a pre-existing buffer marked BM_VALID.
* This can happen because mdread doesn't complain about reads beyond
* EOF (when zero_damaged_pages is ON) and so a previous attempt to
* read a block beyond EOF could have left a "valid" zero-filled
* buffer. Unfortunately, we have also seen this case occurring
* because of buggy Linux kernels that sometimes return an
* lseek(SEEK_END) result that doesn't account for a recent write. In
* that situation, the pre-existing buffer would contain valid data
* that we don't want to overwrite. Since the legitimate case should
* always have left a zero-filled buffer, complain if not PageIsNew.
* In RBM_ZERO_AND_LOCK mode the caller expects the page to be locked
* on return.
*/
bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
if (!PageIsNew((Page) bufBlock))
ereport(ERROR,
(errmsg("unexpected data beyond EOF in block %u of relation %s",
blockNum, relpath(smgr->smgr_rlocator, forkNum)),
errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
/*
* We *must* do smgrextend before succeeding, else the page will not
* be reserved by the kernel, and the next P_NEW call will decide to
* return the same page. Clear the BM_VALID bit, do the StartBufferIO
* call that BufferAlloc didn't, and proceed.
*/
if (isLocalBuf)
if (!isLocalBuf)
{
/* Only need to adjust flags */
uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
Assert(buf_state & BM_VALID);
buf_state &= ~BM_VALID;
pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
if (mode == RBM_ZERO_AND_LOCK)
LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
LW_EXCLUSIVE);
else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
}
else
{
/*
* Loop to handle the very small possibility that someone re-sets
* BM_VALID between our clearing it and StartBufferIO inspecting
* it.
*/
do
{
uint32 buf_state = LockBufHdr(bufHdr);
Assert(buf_state & BM_VALID);
buf_state &= ~BM_VALID;
UnlockBufHdr(bufHdr, buf_state);
} while (!StartBufferIO(bufHdr, true));
}
return BufferDescriptorGetBuffer(bufHdr);
}
/*
* if we have gotten to this point, we have allocated a buffer for the
* page but its contents are not yet valid. IO_IN_PROGRESS is set for it,
* if it's a shared buffer.
*
* Note: if smgrextend fails, we will end up with a buffer that is
* allocated but not marked BM_VALID. P_NEW will still select the same
* block number (because the relation didn't get any longer on disk) and
* so future attempts to extend the relation will find the same buffer (if
* it's not been recycled) but come right back here to try smgrextend
* again.
*/
Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */
bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
if (isExtend)
{
/* new buffers are zero-filled */
/*
* Read in the page, unless the caller intends to overwrite it and just
* wants us to allocate a buffer.
*/
if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
MemSet((char *) bufBlock, 0, BLCKSZ);
/* don't set checksum for all-zero page */
smgrextend(smgr, forkNum, blockNum, bufBlock, false);
pgstat_count_io_op(io_object, io_context, IOOP_EXTEND);
/*
* NB: we're *not* doing a ScheduleBufferTagForWriteback here;
* although we're essentially performing a write. At least on linux
* doing so defeats the 'delayed allocation' mechanism, leading to
* increased file fragmentation.
*/
}
else
{
/*
* Read in the page, unless the caller intends to overwrite it and
* just wants us to allocate a buffer.
*/
if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
MemSet((char *) bufBlock, 0, BLCKSZ);
else
instr_time io_start,
io_time;
if (track_io_timing)
INSTR_TIME_SET_CURRENT(io_start);
smgrread(smgr, forkNum, blockNum, bufBlock);
if (track_io_timing)
{
instr_time io_start,
io_time;
INSTR_TIME_SET_CURRENT(io_time);
INSTR_TIME_SUBTRACT(io_time, io_start);
pgstat_count_buffer_read_time(INSTR_TIME_GET_MICROSEC(io_time));
INSTR_TIME_ADD(pgBufferUsage.blk_read_time, io_time);
}
if (track_io_timing)
INSTR_TIME_SET_CURRENT(io_start);
pgstat_count_io_op(io_object, io_context, IOOP_READ);
/* check for garbage data */
if (!PageIsVerifiedExtended((Page) bufBlock, blockNum,
PIV_LOG_WARNING | PIV_REPORT_STAT))
{
if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
{
ereport(WARNING,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("invalid page in block %u of relation %s; zeroing out page",
blockNum,
relpath(smgr->smgr_rlocator, forkNum))));
MemSet((char *) bufBlock, 0, BLCKSZ);
}
else
INSTR_TIME_SET_ZERO(io_start);
smgrread(smgr, forkNum, blockNum, bufBlock);
pgstat_count_io_op(io_object, io_context, IOOP_READ);
if (track_io_timing)
{
INSTR_TIME_SET_CURRENT(io_time);
INSTR_TIME_SUBTRACT(io_time, io_start);
pgstat_count_buffer_read_time(INSTR_TIME_GET_MICROSEC(io_time));
INSTR_TIME_ADD(pgBufferUsage.blk_read_time, io_time);
}
/* check for garbage data */
if (!PageIsVerifiedExtended((Page) bufBlock, blockNum,
PIV_LOG_WARNING | PIV_REPORT_STAT))
{
if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
{
ereport(WARNING,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("invalid page in block %u of relation %s; zeroing out page",
blockNum,
relpath(smgr->smgr_rlocator, forkNum))));
MemSet((char *) bufBlock, 0, BLCKSZ);
}
else
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("invalid page in block %u of relation %s",
blockNum,
relpath(smgr->smgr_rlocator, forkNum))));
}
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("invalid page in block %u of relation %s",
blockNum,
relpath(smgr->smgr_rlocator, forkNum))));
}
}
@@ -1085,7 +1191,6 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
smgr->smgr_rlocator.locator.dbOid,
smgr->smgr_rlocator.locator.relNumber,
smgr->smgr_rlocator.backend,
isExtend,
found);
return BufferDescriptorGetBuffer(bufHdr);
@@ -1219,8 +1324,8 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
UnpinBuffer(victim_buf_hdr);
/*
* The victim buffer we acquired peviously is clean and unused,
* let it be found again quickly
* The victim buffer we acquired previously is clean and unused, let it
* be found again quickly
*/
StrategyFreeBuffer(victim_buf_hdr);
@@ -1633,6 +1738,365 @@ again:
return buf;
}
/*
 * Limit the number of pins a batch operation may additionally acquire, to
 * avoid running out of pinnable buffers.
 *
 * One additional pin is always allowed, as otherwise the operation likely
 * cannot be performed at all.
 *
 * The number of allowed pins for a backend is computed based on
 * shared_buffers and the maximum number of connections possible. That's very
 * pessimistic, but outside of toy-sized shared_buffers it should allow
 * sufficient pins.
 *
 * *additional_pins is the number of pins the caller would like to acquire;
 * it is reduced in place if it exceeds the computed limit, but never below
 * one.
 */
static void
LimitAdditionalPins(uint32 *additional_pins)
{
	uint32		max_backends;
	int			max_proportional_pins;

	/* zero or one pin is always fine, nothing to limit */
	if (*additional_pins <= 1)
		return;

	max_backends = MaxBackends + NUM_AUXILIARY_PROCS;
	max_proportional_pins = NBuffers / max_backends;

	/*
	 * Subtract the approximate number of buffers already pinned by this
	 * backend. We get the number of "overflowed" pins for free, but don't
	 * know the number of pins in PrivateRefCountArray. The cost of
	 * calculating that exactly doesn't seem worth it, so just assume the max.
	 */
	max_proportional_pins -= PrivateRefCountOverflowed + REFCOUNT_ARRAY_ENTRIES;

	/*
	 * Guarantee at least one pin. The comparison has to include zero: a
	 * limit of exactly 0 would otherwise reduce *additional_pins to 0,
	 * leaving the caller without any buffer to work with.
	 */
	if (max_proportional_pins <= 0)
		max_proportional_pins = 1;

	/* max_proportional_pins is known to be positive here */
	if (*additional_pins > (uint32) max_proportional_pins)
		*additional_pins = max_proportional_pins;
}
/*
 * ExtendBufferedRelCommon -- logic shared between ExtendBufferedRelBy() and
 * ExtendBufferedRelTo().
 *
 * Exists only so that the tracing and the relpersistence-based dispatch are
 * not duplicated: temporary relations are handed to
 * ExtendBufferedRelLocal(), everything else to ExtendBufferedRelShared().
 *
 * Returns the first block number reported by the chosen implementation and
 * stores the number of blocks actually added in *extended_by.
 */
static BlockNumber
ExtendBufferedRelCommon(ExtendBufferedWhat eb,
						ForkNumber fork,
						BufferAccessStrategy strategy,
						uint32 flags,
						uint32 extend_by,
						BlockNumber extend_upto,
						Buffer *buffers,
						uint32 *extended_by)
{
	BlockNumber start_block;
	uint32		num_extended = extend_by;

	TRACE_POSTGRESQL_BUFFER_EXTEND_START(fork,
										 eb.smgr->smgr_rlocator.locator.spcOid,
										 eb.smgr->smgr_rlocator.locator.dbOid,
										 eb.smgr->smgr_rlocator.locator.relNumber,
										 eb.smgr->smgr_rlocator.backend,
										 extend_by);

	/* temp relations go through local buffers, all others through shared */
	if (eb.relpersistence != RELPERSISTENCE_TEMP)
		start_block = ExtendBufferedRelShared(eb, fork, strategy, flags,
											  extend_by, extend_upto,
											  buffers, &num_extended);
	else
		start_block = ExtendBufferedRelLocal(eb, fork, flags,
											 extend_by, extend_upto,
											 buffers, &num_extended);

	/* report the achieved extension size back to the caller */
	*extended_by = num_extended;

	TRACE_POSTGRESQL_BUFFER_EXTEND_DONE(fork,
										eb.smgr->smgr_rlocator.locator.spcOid,
										eb.smgr->smgr_rlocator.locator.dbOid,
										eb.smgr->smgr_rlocator.locator.relNumber,
										eb.smgr->smgr_rlocator.backend,
										num_extended,
										start_block);

	return start_block;
}
/*
* Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
* shared buffers.
*
* Outline of the steps performed, in order:
*  1. cap extend_by via LimitAdditionalPins()
*  2. acquire and zero extend_by victim buffers (extension lock not held)
*  3. take the relation extension lock, unless EB_SKIP_EXTENSION_LOCK is set
*  4. determine the current size (first_block) with smgrnblocks()
*  5. if extend_upto is not InvalidBlockNumber, trim extend_by so the
*     relation does not grow beyond extend_upto, giving back surplus victim
*     buffers; this can leave nothing to do (*extended_by = 0)
*  6. enter the buffers into the buffer mapping table and start IO on them
*  7. extend the file with smgrzeroextend()
*  8. release the extension lock, then mark the buffers valid
*
* eb, fork: relation and fork to extend
* strategy: buffer access strategy used for victim buffer selection
* flags: EB_* flags; EB_SKIP_EXTENSION_LOCK, EB_CLEAR_SIZE_CACHE,
*   EB_LOCK_FIRST and EB_LOCK_TARGET are examined here
* extend_by: number of blocks the caller would like to add
* extend_upto: upper bound for the resulting size, or InvalidBlockNumber
* buffers: output array, needs room for extend_by entries
* extended_by: output, number of blocks actually added
*
* Returns the relation size found in step 4, which is the block number of
* the first added block if any block was added.
*/
static BlockNumber
ExtendBufferedRelShared(ExtendBufferedWhat eb,
ForkNumber fork,
BufferAccessStrategy strategy,
uint32 flags,
uint32 extend_by,
BlockNumber extend_upto,
Buffer *buffers,
uint32 *extended_by)
{
BlockNumber first_block;
IOContext io_context = IOContextForStrategy(strategy);
/* may reduce extend_by, see LimitAdditionalPins() */
LimitAdditionalPins(&extend_by);
/*
* Acquire victim buffers for extension without holding extension lock.
* Writing out victim buffers is the most expensive part of extending the
* relation, particularly when doing so requires WAL flushes. Zeroing out
* the buffers is also quite expensive, so do that before holding the
* extension lock as well.
*
* These pages are pinned by us and not valid. While we hold the pin they
* can't be acquired as victim buffers by another backend.
*/
for (uint32 i = 0; i < extend_by; i++)
{
Block buf_block;
buffers[i] = GetVictimBuffer(strategy, io_context);
buf_block = BufHdrGetBlock(GetBufferDescriptor(buffers[i] - 1));
/* new buffers are zero-filled */
MemSet((char *) buf_block, 0, BLCKSZ);
}
/* in case we need to pin an existing buffer below */
ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
/*
* Lock relation against concurrent extensions, unless requested not to.
*
* We use the same extension lock for all forks. That's unnecessarily
* restrictive, but currently extensions for forks don't happen often
* enough to make it worth locking more granularly.
*
* Note that another backend might have extended the relation by the time
* we get the lock.
*/
if (!(flags & EB_SKIP_EXTENSION_LOCK))
{
LockRelationForExtension(eb.rel, ExclusiveLock);
/* re-fetch the smgr handle after having waited for the lock */
if (eb.rel)
eb.smgr = RelationGetSmgr(eb.rel);
}
/*
* If requested, invalidate size cache, so that smgrnblocks asks the
* kernel.
*/
if (flags & EB_CLEAR_SIZE_CACHE)
eb.smgr->smgr_cached_nblocks[fork] = InvalidBlockNumber;
first_block = smgrnblocks(eb.smgr, fork);
/*
* Now that we have the accurate relation size, check if the caller wants
* us to extend to only up to a specific size. If there were concurrent
* extensions, we might have acquired too many buffers and need to release
* them.
*/
if (extend_upto != InvalidBlockNumber)
{
uint32 orig_extend_by = extend_by;
/* relation already larger than requested: nothing to add at all */
if (first_block > extend_upto)
extend_by = 0;
/* otherwise add only as many blocks as needed to reach extend_upto */
else if ((uint64) first_block + extend_by > extend_upto)
extend_by = extend_upto - first_block;
/* give back the victim buffers that turned out not to be needed */
for (uint32 i = extend_by; i < orig_extend_by; i++)
{
BufferDesc *buf_hdr = GetBufferDescriptor(buffers[i] - 1);
/*
* The victim buffer we acquired previously is clean and unused,
* let it be found again quickly
*/
StrategyFreeBuffer(buf_hdr);
UnpinBuffer(buf_hdr);
}
/*
* Nothing left to do. Note that in this case first_block, the current
* relation size, may be larger than extend_upto.
*/
if (extend_by == 0)
{
if (!(flags & EB_SKIP_EXTENSION_LOCK))
UnlockRelationForExtension(eb.rel, ExclusiveLock);
*extended_by = extend_by;
return first_block;
}
}
/* Fail if relation is already at maximum possible length */
if ((uint64) first_block + extend_by >= MaxBlockNumber)
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("cannot extend relation %s beyond %u blocks",
relpath(eb.smgr->smgr_rlocator, fork),
MaxBlockNumber)));
/*
* Insert buffers into buffer table, mark as IO_IN_PROGRESS.
*
* This needs to happen before we extend the relation, because as soon as
* we do, other backends can start to read in those pages.
*/
/* NOTE(review): i is int here but compared against the uint32 extend_by */
for (int i = 0; i < extend_by; i++)
{
Buffer victim_buf = buffers[i];
BufferDesc *victim_buf_hdr = GetBufferDescriptor(victim_buf - 1);
BufferTag tag;
uint32 hash;
LWLock *partition_lock;
int existing_id;
/* the i'th buffer is going to hold block first_block + i */
InitBufferTag(&tag, &eb.smgr->smgr_rlocator.locator, fork, first_block + i);
hash = BufTableHashCode(&tag);
partition_lock = BufMappingPartitionLock(hash);
LWLockAcquire(partition_lock, LW_EXCLUSIVE);
/* a result >= 0 is treated below as "an entry for this tag already exists" */
existing_id = BufTableInsert(&tag, hash, victim_buf_hdr->buf_id);
/*
* We get here only in the corner case where we are trying to extend
* the relation but we found a pre-existing buffer. This can happen
* because a prior attempt at extending the relation failed, and
* because mdread doesn't complain about reads beyond EOF (when
* zero_damaged_pages is ON) and so a previous attempt to read a block
* beyond EOF could have left a "valid" zero-filled buffer.
* Unfortunately, we have also seen this case occurring because of
* buggy Linux kernels that sometimes return an lseek(SEEK_END) result
* that doesn't account for a recent write. In that situation, the
* pre-existing buffer would contain valid data that we don't want to
* overwrite. Since the legitimate cases should always have left a
* zero-filled buffer, complain if not PageIsNew.
*/
if (existing_id >= 0)
{
BufferDesc *existing_hdr = GetBufferDescriptor(existing_id);
Block buf_block;
bool valid;
/*
* Pin the existing buffer before releasing the partition lock,
* preventing it from being evicted.
*/
valid = PinBuffer(existing_hdr, strategy);
LWLockRelease(partition_lock);
/*
* The victim buffer we acquired previously is clean and unused,
* let it be found again quickly
*/
StrategyFreeBuffer(victim_buf_hdr);
UnpinBuffer(victim_buf_hdr);
/* from here on, use the pre-existing buffer instead of the victim */
buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
buf_block = BufHdrGetBlock(existing_hdr);
if (valid && !PageIsNew((Page) buf_block))
ereport(ERROR,
(errmsg("unexpected data beyond EOF in block %u of relation %s",
existing_hdr->tag.blockNum, relpath(eb.smgr->smgr_rlocator, fork)),
errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
/*
* We *must* do smgr[zero]extend before succeeding, else the page
* will not be reserved by the kernel, and the next P_NEW call
* will decide to return the same page. Clear the BM_VALID bit,
* do StartBufferIO() and proceed.
*
* Loop to handle the very small possibility that someone re-sets
* BM_VALID between our clearing it and StartBufferIO inspecting
* it.
*/
do
{
uint32 buf_state = LockBufHdr(existing_hdr);
buf_state &= ~BM_VALID;
UnlockBufHdr(existing_hdr, buf_state);
} while (!StartBufferIO(existing_hdr, true));
}
else
{
/* normal case: the victim buffer becomes the buffer for the new block */
uint32 buf_state;
buf_state = LockBufHdr(victim_buf_hdr);
/* some sanity checks while we hold the buffer header lock */
Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));
Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 1);
victim_buf_hdr->tag = tag;
buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
/* BM_PERMANENT is set for permanent relations and for any init fork */
if (eb.relpersistence == RELPERSISTENCE_PERMANENT || fork == INIT_FORKNUM)
buf_state |= BM_PERMANENT;
UnlockBufHdr(victim_buf_hdr, buf_state);
LWLockRelease(partition_lock);
/* XXX: could combine the locked operations in it with the above */
StartBufferIO(victim_buf_hdr, true);
}
}
/*
* Note: if smgrzeroextend fails, we will end up with buffers that are
* allocated but not marked BM_VALID. The next relation extension will
* still select the same block number (because the relation didn't get any
* longer on disk) and so future attempts to extend the relation will find
* the same buffers (if they have not been recycled) but come right back
* here to try smgrzeroextend again.
*
* We don't need to set checksum for all-zero pages.
*/
smgrzeroextend(eb.smgr, fork, first_block, extend_by, false);
/*
* Release the file-extension lock; it's now OK for someone else to extend
* the relation some more.
*
* We remove IO_IN_PROGRESS after this, as waking up waiting backends can
* take noticeable time.
*/
if (!(flags & EB_SKIP_EXTENSION_LOCK))
UnlockRelationForExtension(eb.rel, ExclusiveLock);
/* Set BM_VALID, terminate IO, and wake up any waiters */
for (int i = 0; i < extend_by; i++)
{
Buffer buf = buffers[i];
BufferDesc *buf_hdr = GetBufferDescriptor(buf - 1);
bool lock = false;
/* EB_LOCK_FIRST: lock the buffer of the first added block */
if (flags & EB_LOCK_FIRST && i == 0)
lock = true;
/* EB_LOCK_TARGET: lock the buffer of block (extend_upto - 1) */
else if (flags & EB_LOCK_TARGET)
{
Assert(extend_upto != InvalidBlockNumber);
if (first_block + i + 1 == extend_upto)
lock = true;
}
/* the content lock is taken before the buffer is marked valid */
if (lock)
LWLockAcquire(BufferDescriptorGetContentLock(buf_hdr), LW_EXCLUSIVE);
TerminateBufferIO(buf_hdr, false, BM_VALID);
}
/* account for the extension in buffer usage and IO statistics */
pgBufferUsage.shared_blks_written += extend_by;
pgstat_count_io_op_n(IOOBJECT_RELATION, io_context, IOOP_EXTEND,
extend_by);
*extended_by = extend_by;
return first_block;
}
/*
* MarkBufferDirty
*