
Change seqscan logic so that we check visibility of all tuples on a page
when we first read the page, rather than checking them one at a time.
This allows us to take and release the buffer content lock just once
per page, instead of once per tuple.  Since it's a shared lock the
contention penalty for holding the lock longer shouldn't be too bad.
We can safely do this only when using an MVCC snapshot; else the
assumption that visibility won't change over time is uncool.  Therefore
there are now two code paths depending on the snapshot type.  I also
made the same change in nodeBitmapHeapscan.c, where it can always be done
because we only support MVCC snapshots for bitmap scans anyway.
Also make some incidental cleanups in the APIs of these functions.
Per a suggestion from Qingqing Zhou.
Author: Tom Lane
Date:   2005-11-26 03:03:07 +00:00
parent  290166f934
commit  70f1482de3

6 changed files with 620 additions and 278 deletions
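
Before the diff, a condensed standalone sketch of the idea may help. Everything
below is a toy model, not the committed code: ToyPage, ToyScan, tuple_is_visible,
and the snapshot cutoff are hypothetical stand-ins for the real buffer, snapshot,
and HeapTupleSatisfiesVisibility() machinery, and a pthreads rwlock plays the
role of the shared buffer content lock. The point is structural: one lock
acquisition per page covers all the visibility checks, after which the scan
walks the precomputed rs_vistuples[] array holding only a pin.

	/*
	 * Toy sketch of page-at-a-time visibility checking (NOT the committed
	 * code; see the diff below for that).  A pthreads rwlock stands in for
	 * the shared buffer content lock, and the xmin cutoff stands in for
	 * HeapTupleSatisfiesVisibility() against an MVCC snapshot.
	 */
	#include <pthread.h>
	#include <stdbool.h>
	#include <stdio.h>

	#define MAX_TUPLES_PER_PAGE 64		/* analog of MaxHeapTuplesPerPage */

	typedef struct
	{
		pthread_rwlock_t content_lock;	/* analog of BUFFER_LOCK_SHARE */
		int			ntuples;
		int			xmin[MAX_TUPLES_PER_PAGE];	/* toy visibility input */
	} ToyPage;

	typedef struct
	{
		int			rs_ntuples;		/* number of visible tuples on page */
		int			rs_vistuples[MAX_TUPLES_PER_PAGE];	/* their offsets */
	} ToyScan;

	/* toy MVCC test: visible if inserted before the snapshot's cutoff */
	static bool
	tuple_is_visible(const ToyPage *page, int off, int snapshot_cutoff)
	{
		return page->xmin[off] < snapshot_cutoff;
	}

	/*
	 * Analog of heapgetpage(): one shared-lock acquisition covers the
	 * visibility checks for every tuple on the page; afterwards the scan
	 * iterates over rs_vistuples[] with no content lock held.
	 */
	static void
	toy_getpage(ToyScan *scan, ToyPage *page, int snapshot_cutoff)
	{
		int			ntup = 0;

		pthread_rwlock_rdlock(&page->content_lock);
		for (int off = 0; off < page->ntuples; off++)
		{
			if (tuple_is_visible(page, off, snapshot_cutoff))
				scan->rs_vistuples[ntup++] = off;
		}
		pthread_rwlock_unlock(&page->content_lock);

		scan->rs_ntuples = ntup;
	}

	int
	main(void)
	{
		ToyPage		page = {PTHREAD_RWLOCK_INITIALIZER, 4, {5, 42, 7, 99}};
		ToyScan		scan;

		toy_getpage(&scan, &page, 10);	/* snapshot sees xmin < 10 */
		for (int i = 0; i < scan.rs_ntuples; i++)
			printf("visible offset: %d\n", scan.rs_vistuples[i]);
		return 0;
	}

The payoff is the one the commit message claims: a page with n tuples previously
cost n lock/unlock cycles on the content lock, and now costs one, with the
per-tuple work in the pagemode path touching only scan-local state.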

src/backend/access/heap/heapam.c

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.203 2005/11/22 18:17:06 momjian Exp $
* $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.204 2005/11/26 03:03:07 tgl Exp $
*
*
* INTERFACE ROUTINES
@@ -78,12 +78,17 @@ initscan(HeapScanDesc scan, ScanKey key)
*/
scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
scan->rs_inited = false;
scan->rs_ctup.t_data = NULL;
ItemPointerSetInvalid(&scan->rs_ctup.t_self);
scan->rs_cbuf = InvalidBuffer;
scan->rs_cblock = InvalidBlockNumber;
/* we don't have a marked position... */
ItemPointerSetInvalid(&(scan->rs_mctid));
/* page-at-a-time fields are always invalid when not rs_inited */
/*
* copy the scan key, if appropriate
*/
@@ -93,79 +98,128 @@ initscan(HeapScanDesc scan, ScanKey key)
pgstat_count_heap_scan(&scan->rs_pgstat_info);
}
/*
* heapgetpage - subroutine for heapgettup()
*
* This routine reads and pins the specified page of the relation.
* In page-at-a-time mode it performs additional work, namely determining
* which tuples on the page are visible.
*/
static void
heapgetpage(HeapScanDesc scan, BlockNumber page)
{
Buffer buffer;
Snapshot snapshot;
Page dp;
int lines;
int ntup;
OffsetNumber lineoff;
ItemId lpp;
Assert(page < scan->rs_nblocks);
scan->rs_cbuf = ReleaseAndReadBuffer(scan->rs_cbuf,
scan->rs_rd,
page);
scan->rs_cblock = page;
if (!scan->rs_pageatatime)
return;
buffer = scan->rs_cbuf;
snapshot = scan->rs_snapshot;
/*
* We must hold share lock on the buffer content while examining
* tuple visibility. Afterwards, however, the tuples we have found
* to be visible are guaranteed good as long as we hold the buffer pin.
*/
LockBuffer(buffer, BUFFER_LOCK_SHARE);
dp = (Page) BufferGetPage(buffer);
lines = PageGetMaxOffsetNumber(dp);
ntup = 0;
for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff);
lineoff <= lines;
lineoff++, lpp++)
{
if (ItemIdIsUsed(lpp))
{
HeapTupleData loctup;
bool valid;
loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
loctup.t_len = ItemIdGetLength(lpp);
ItemPointerSet(&(loctup.t_self), page, lineoff);
valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
if (valid)
scan->rs_vistuples[ntup++] = lineoff;
}
}
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
Assert(ntup <= MaxHeapTuplesPerPage);
scan->rs_ntuples = ntup;
}
/* ----------------
* heapgettup - fetch next heap tuple
*
* routine used by heap_getnext() which does most of the
* real work in scanning tuples.
* Initialize the scan if not already done; then advance to the next
* tuple as indicated by "dir"; return the next tuple in scan->rs_ctup,
* or set scan->rs_ctup.t_data = NULL if no more tuples.
*
* The passed-in *buffer must be either InvalidBuffer or the pinned
* current page of the scan. If we have to move to another page,
* we will unpin this buffer (if valid). On return, *buffer is either
* InvalidBuffer or the ID of a pinned buffer.
* dir == 0 means "re-fetch the tuple indicated by scan->rs_ctup".
*
* Note: the reason nkeys/key are passed separately, even though they are
* kept in the scan descriptor, is that the caller may not want us to check
* the scankeys.
*
* Note: when we fall off the end of the scan in either direction, we
* reset rs_inited. This means that a further request with the same
* scan direction will restart the scan, which is a bit odd, but a
* request with the opposite scan direction will start a fresh scan
* in the proper direction. The latter is required behavior for cursors,
* while the former case is generally undefined behavior in Postgres
* so we don't care too much.
* ----------------
*/
static void
heapgettup(Relation relation,
heapgettup(HeapScanDesc scan,
int dir,
HeapTuple tuple,
Buffer *buffer,
Snapshot snapshot,
int nkeys,
ScanKey key,
BlockNumber pages)
ScanKey key)
{
ItemId lpp;
Page dp;
HeapTuple tuple = &(scan->rs_ctup);
ItemPointer tid = &(tuple->t_self);
Snapshot snapshot = scan->rs_snapshot;
BlockNumber pages = scan->rs_nblocks;
BlockNumber page;
Page dp;
int lines;
OffsetNumber lineoff;
int linesleft;
ItemPointer tid;
ItemId lpp;
tid = (tuple->t_data == NULL) ? NULL : &(tuple->t_self);
/*
* debugging stuff
*
* check validity of arguments, here and for other functions too
*
* Note: no locking manipulations needed--this is a local function
*/
#ifdef HEAPDEBUGALL
if (ItemPointerIsValid(tid))
elog(DEBUG2, "heapgettup(%s, tid=0x%x[%d,%d], dir=%d, ...)",
RelationGetRelationName(relation), tid, tid->ip_blkid,
tid->ip_posid, dir);
else
elog(DEBUG2, "heapgettup(%s, tid=0x%x, dir=%d, ...)",
RelationGetRelationName(relation), tid, dir);
elog(DEBUG2, "heapgettup(..., b=0x%x, nkeys=%d, key=0x%x", buffer, nkeys, key);
elog(DEBUG2, "heapgettup: relation(%c)=`%s', %p",
relation->rd_rel->relkind, RelationGetRelationName(relation),
snapshot);
#endif /* HEAPDEBUGALL */
if (!ItemPointerIsValid(tid))
if (!scan->rs_inited)
{
Assert(!PointerIsValid(tid));
tid = NULL;
/*
* return null immediately if relation is empty
*/
if (pages == 0)
{
Assert(!BufferIsValid(scan->rs_cbuf));
tuple->t_data = NULL;
return;
}
}
tuple->t_tableOid = RelationGetRelid(relation);
/*
* return null immediately if relation is empty
*/
if (pages == 0)
else
{
if (BufferIsValid(*buffer))
ReleaseBuffer(*buffer);
*buffer = InvalidBuffer;
tuple->t_data = NULL;
return;
/* resuming scan from tuple indicated by scan->rs_ctup.t_self */
Assert(ItemPointerIsValid(tid));
}
/*
@@ -174,30 +228,26 @@ heapgettup(Relation relation,
if (dir == 0)
{
/*
* ``no movement'' scan direction: refetch same tuple
* ``no movement'' scan direction: refetch prior tuple
*/
if (tid == NULL)
if (!scan->rs_inited)
{
if (BufferIsValid(*buffer))
ReleaseBuffer(*buffer);
*buffer = InvalidBuffer;
Assert(!BufferIsValid(scan->rs_cbuf));
tuple->t_data = NULL;
return;
}
*buffer = ReleaseAndReadBuffer(*buffer,
relation,
ItemPointerGetBlockNumber(tid));
page = ItemPointerGetBlockNumber(tid);
if (page != scan->rs_cblock)
heapgetpage(scan, page);
LockBuffer(*buffer, BUFFER_LOCK_SHARE);
dp = (Page) BufferGetPage(*buffer);
/* Since the tuple was previously fetched, needn't lock page here */
dp = (Page) BufferGetPage(scan->rs_cbuf);
lineoff = ItemPointerGetOffsetNumber(tid);
lpp = PageGetItemId(dp, lineoff);
tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
tuple->t_len = ItemIdGetLength(lpp);
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
return;
}
@@ -206,28 +256,23 @@ heapgettup(Relation relation,
/*
* reverse scan direction
*/
if (tid == NULL)
{
if (!scan->rs_inited)
page = pages - 1; /* final page */
}
else
{
page = ItemPointerGetBlockNumber(tid); /* current page */
}
Assert(page < pages);
if (page != scan->rs_cblock)
heapgetpage(scan, page);
*buffer = ReleaseAndReadBuffer(*buffer,
relation,
page);
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
LockBuffer(*buffer, BUFFER_LOCK_SHARE);
dp = (Page) BufferGetPage(*buffer);
dp = (Page) BufferGetPage(scan->rs_cbuf);
lines = PageGetMaxOffsetNumber(dp);
if (tid == NULL)
if (!scan->rs_inited)
{
lineoff = lines; /* final offnum */
scan->rs_inited = true;
}
else
{
@@ -241,10 +286,11 @@ heapgettup(Relation relation,
/*
* forward scan direction
*/
if (tid == NULL)
if (!scan->rs_inited)
{
page = 0; /* first page */
lineoff = FirstOffsetNumber; /* first offnum */
scan->rs_inited = true;
}
else
{
@@ -253,15 +299,12 @@ heapgettup(Relation relation,
OffsetNumberNext(ItemPointerGetOffsetNumber(tid));
}
Assert(page < pages);
if (page != scan->rs_cblock)
heapgetpage(scan, page);
*buffer = ReleaseAndReadBuffer(*buffer,
relation,
page);
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
LockBuffer(*buffer, BUFFER_LOCK_SHARE);
dp = (Page) BufferGetPage(*buffer);
dp = (Page) BufferGetPage(scan->rs_cbuf);
lines = PageGetMaxOffsetNumber(dp);
/* page and lineoff now reference the physically next tid */
}
@@ -269,22 +312,21 @@ heapgettup(Relation relation,
/* 'dir' is now non-zero */
/*
* calculate line pointer and number of remaining items to check on this
* page.
* calculate number of remaining items to check on this page
*/
lpp = PageGetItemId(dp, lineoff);
if (dir < 0)
linesleft = lineoff - 1;
linesleft = lineoff;
else
linesleft = lines - lineoff;
linesleft = lines - lineoff + 1;
/*
* advance the scan until we find a qualifying tuple or run out of stuff
* to scan
*/
lpp = PageGetItemId(dp, lineoff);
for (;;)
{
while (linesleft >= 0)
while (linesleft > 0)
{
if (ItemIdIsUsed(lpp))
{
@@ -297,11 +339,17 @@ heapgettup(Relation relation,
/*
* if current tuple qualifies, return it.
*/
HeapTupleSatisfies(tuple, relation, *buffer, (PageHeader) dp,
snapshot, nkeys, key, valid);
valid = HeapTupleSatisfiesVisibility(tuple,
snapshot,
scan->rs_cbuf);
if (valid && key != NULL)
HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
nkeys, key, valid);
if (valid)
{
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
return;
}
}
@@ -326,32 +374,31 @@ heapgettup(Relation relation,
* if we get here, it means we've exhausted the items on this page and
* it's time to move to the next.
*/
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
/*
* return NULL if we've exhausted all the pages
*/
if ((dir < 0) ? (page == 0) : (page + 1 >= pages))
{
if (BufferIsValid(*buffer))
ReleaseBuffer(*buffer);
*buffer = InvalidBuffer;
if (BufferIsValid(scan->rs_cbuf))
ReleaseBuffer(scan->rs_cbuf);
scan->rs_cbuf = InvalidBuffer;
scan->rs_cblock = InvalidBlockNumber;
tuple->t_data = NULL;
scan->rs_inited = false;
return;
}
page = (dir < 0) ? (page - 1) : (page + 1);
Assert(page < pages);
heapgetpage(scan, page);
*buffer = ReleaseAndReadBuffer(*buffer,
relation,
page);
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
LockBuffer(*buffer, BUFFER_LOCK_SHARE);
dp = (Page) BufferGetPage(*buffer);
dp = (Page) BufferGetPage(scan->rs_cbuf);
lines = PageGetMaxOffsetNumber((Page) dp);
linesleft = lines - 1;
linesleft = lines;
if (dir < 0)
{
lineoff = lines;
@@ -365,6 +412,233 @@ heapgettup(Relation relation,
}
}
/* ----------------
* heapgettup_pagemode - fetch next heap tuple in page-at-a-time mode
*
* Same API as heapgettup, but used in page-at-a-time mode
*
* The internal logic is much the same as heapgettup's too, but there are some
* differences: we do not take the buffer content lock (that only needs to
* happen inside heapgetpage), and we iterate through just the tuples listed
* in rs_vistuples[] rather than all tuples on the page. Notice that
* lineindex is 0-based, where the corresponding loop variable lineoff in
* heapgettup is 1-based.
* ----------------
*/
static void
heapgettup_pagemode(HeapScanDesc scan,
int dir,
int nkeys,
ScanKey key)
{
HeapTuple tuple = &(scan->rs_ctup);
ItemPointer tid = &(tuple->t_self);
BlockNumber pages = scan->rs_nblocks;
BlockNumber page;
Page dp;
int lines;
int lineindex;
OffsetNumber lineoff;
int linesleft;
ItemId lpp;
if (!scan->rs_inited)
{
/*
* return null immediately if relation is empty
*/
if (pages == 0)
{
Assert(!BufferIsValid(scan->rs_cbuf));
tuple->t_data = NULL;
return;
}
}
else
{
/* resuming scan from tuple indicated by scan->rs_ctup.t_self */
Assert(ItemPointerIsValid(tid));
}
/*
* calculate next starting lineindex, given scan direction
*/
if (dir == 0)
{
/*
* ``no movement'' scan direction: refetch prior tuple
*/
if (!scan->rs_inited)
{
Assert(!BufferIsValid(scan->rs_cbuf));
tuple->t_data = NULL;
return;
}
page = ItemPointerGetBlockNumber(tid);
if (page != scan->rs_cblock)
heapgetpage(scan, page);
/* Since the tuple was previously fetched, needn't lock page here */
dp = (Page) BufferGetPage(scan->rs_cbuf);
lineoff = ItemPointerGetOffsetNumber(tid);
lpp = PageGetItemId(dp, lineoff);
tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
tuple->t_len = ItemIdGetLength(lpp);
/* check that rs_cindex is in sync */
Assert(scan->rs_cindex < scan->rs_ntuples);
Assert(lineoff == scan->rs_vistuples[scan->rs_cindex]);
return;
}
else if (dir < 0)
{
/*
* reverse scan direction
*/
if (!scan->rs_inited)
page = pages - 1; /* final page */
else
page = ItemPointerGetBlockNumber(tid); /* current page */
if (page != scan->rs_cblock)
heapgetpage(scan, page);
dp = (Page) BufferGetPage(scan->rs_cbuf);
lines = scan->rs_ntuples;
if (!scan->rs_inited)
{
lineindex = lines - 1;
scan->rs_inited = true;
}
else
{
lineindex = scan->rs_cindex - 1;
}
/* page and lineindex now reference the previous visible tid */
}
else
{
/*
* forward scan direction
*/
if (!scan->rs_inited)
{
page = 0; /* first page */
lineindex = 0;
scan->rs_inited = true;
}
else
{
page = ItemPointerGetBlockNumber(tid); /* current page */
lineindex = scan->rs_cindex + 1;
}
if (page != scan->rs_cblock)
heapgetpage(scan, page);
dp = (Page) BufferGetPage(scan->rs_cbuf);
lines = scan->rs_ntuples;
/* page and lineindex now reference the next visible tid */
}
/* 'dir' is now non-zero */
/*
* calculate number of remaining items to check on this page
*/
if (dir < 0)
linesleft = lineindex + 1;
else
linesleft = lines - lineindex;
/*
* advance the scan until we find a qualifying tuple or run out of stuff
* to scan
*/
for (;;)
{
while (linesleft > 0)
{
lineoff = scan->rs_vistuples[lineindex];
lpp = PageGetItemId(dp, lineoff);
Assert(ItemIdIsUsed(lpp));
tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
tuple->t_len = ItemIdGetLength(lpp);
ItemPointerSet(&(tuple->t_self), page, lineoff);
/*
* if current tuple qualifies, return it.
*/
if (key != NULL)
{
bool valid;
HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
nkeys, key, valid);
if (valid)
{
scan->rs_cindex = lineindex;
return;
}
}
else
{
scan->rs_cindex = lineindex;
return;
}
/*
* otherwise move to the next item on the page
*/
--linesleft;
if (dir < 0)
{
--lineindex;
}
else
{
++lineindex;
}
}
/*
* if we get here, it means we've exhausted the items on this page and
* it's time to move to the next.
*/
/*
* return NULL if we've exhausted all the pages
*/
if ((dir < 0) ? (page == 0) : (page + 1 >= pages))
{
if (BufferIsValid(scan->rs_cbuf))
ReleaseBuffer(scan->rs_cbuf);
scan->rs_cbuf = InvalidBuffer;
scan->rs_cblock = InvalidBlockNumber;
tuple->t_data = NULL;
scan->rs_inited = false;
return;
}
page = (dir < 0) ? (page - 1) : (page + 1);
heapgetpage(scan, page);
dp = (Page) BufferGetPage(scan->rs_cbuf);
lines = scan->rs_ntuples;
linesleft = lines;
if (dir < 0)
lineindex = lines - 1;
else
lineindex = 0;
}
}
#if defined(DISABLE_COMPLEX_MACRO)
/*
@@ -642,6 +916,14 @@ heap_beginscan(Relation relation, Snapshot snapshot,
scan->rs_snapshot = snapshot;
scan->rs_nkeys = nkeys;
/*
* we can use page-at-a-time mode if it's an MVCC-safe snapshot
*/
scan->rs_pageatatime = IsMVCCSnapshot(snapshot);
/* we only need to set this up once */
scan->rs_ctup.t_tableOid = RelationGetRelid(relation);
/*
* we do this here instead of in initscan() because heap_rescan also calls
* initscan() and we don't want to allocate memory again
@@ -741,16 +1023,14 @@ heap_getnext(HeapScanDesc scan, ScanDirection direction)
/*
* Note: we depend here on the -1/0/1 encoding of ScanDirection.
*/
heapgettup(scan->rs_rd,
(int) direction,
&(scan->rs_ctup),
&(scan->rs_cbuf),
scan->rs_snapshot,
scan->rs_nkeys,
scan->rs_key,
scan->rs_nblocks);
if (scan->rs_pageatatime)
heapgettup_pagemode(scan, (int) direction,
scan->rs_nkeys, scan->rs_key);
else
heapgettup(scan, (int) direction,
scan->rs_nkeys, scan->rs_key);
if (scan->rs_ctup.t_data == NULL && !BufferIsValid(scan->rs_cbuf))
if (scan->rs_ctup.t_data == NULL)
{
HEAPDEBUG_2; /* heap_getnext returning EOS */
return NULL;
@@ -760,13 +1040,11 @@ heap_getnext(HeapScanDesc scan, ScanDirection direction)
* if we get here it means we have a new current scan tuple, so point to
* the proper return buffer and return the tuple.
*/
HEAPDEBUG_3; /* heap_getnext returning tuple */
if (scan->rs_ctup.t_data != NULL)
pgstat_count_heap_getnext(&scan->rs_pgstat_info);
pgstat_count_heap_getnext(&scan->rs_pgstat_info);
return ((scan->rs_ctup.t_data == NULL) ? NULL : &(scan->rs_ctup));
return &(scan->rs_ctup);
}
/*
@@ -903,8 +1181,7 @@ heap_release_fetch(Relation relation,
/*
* check time qualification of tuple, then release lock
*/
HeapTupleSatisfies(tuple, relation, buffer, dp,
snapshot, 0, NULL, valid);
valid = HeapTupleSatisfiesVisibility(tuple, snapshot, buffer);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
@@ -1038,8 +1315,7 @@ heap_get_latest_tid(Relation relation,
* Check time qualification of tuple; if visible, set it as the new
* result candidate.
*/
HeapTupleSatisfies(&tp, relation, buffer, dp,
snapshot, 0, NULL, valid);
valid = HeapTupleSatisfiesVisibility(&tp, snapshot, buffer);
if (valid)
*tid = ctid;
@@ -2439,7 +2715,11 @@ heap_markpos(HeapScanDesc scan)
/* Note: no locking manipulations needed */
if (scan->rs_ctup.t_data != NULL)
{
scan->rs_mctid = scan->rs_ctup.t_self;
if (scan->rs_pageatatime)
scan->rs_mindex = scan->rs_cindex;
}
else
ItemPointerSetInvalid(&scan->rs_mctid);
}
@@ -2453,31 +2733,38 @@ heap_restrpos(HeapScanDesc scan)
{
/* XXX no amrestrpos checking that ammarkpos called */
/* Note: no locking manipulations needed */
/*
* unpin scan buffers
*/
if (BufferIsValid(scan->rs_cbuf))
ReleaseBuffer(scan->rs_cbuf);
scan->rs_cbuf = InvalidBuffer;
if (!ItemPointerIsValid(&scan->rs_mctid))
{
scan->rs_ctup.t_data = NULL;
/*
* unpin scan buffers
*/
if (BufferIsValid(scan->rs_cbuf))
ReleaseBuffer(scan->rs_cbuf);
scan->rs_cbuf = InvalidBuffer;
scan->rs_cblock = InvalidBlockNumber;
}
else
{
/*
* If we reached end of scan, rs_inited will now be false. We must
* reset it to true to keep heapgettup from doing the wrong thing.
*/
scan->rs_inited = true;
scan->rs_ctup.t_self = scan->rs_mctid;
scan->rs_ctup.t_data = (HeapTupleHeader) 0x1; /* for heapgettup */
heapgettup(scan->rs_rd,
0,
&(scan->rs_ctup),
&(scan->rs_cbuf),
scan->rs_snapshot,
0,
NULL,
scan->rs_nblocks);
if (scan->rs_pageatatime)
{
scan->rs_cindex = scan->rs_mindex;
heapgettup_pagemode(scan,
0, /* "no movement" */
0, /* needn't recheck scan keys */
NULL);
}
else
heapgettup(scan,
0, /* "no movement" */
0, /* needn't recheck scan keys */
NULL);
}
}
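
One consequence worth noting: the split into heapgettup() and
heapgettup_pagemode() is invisible to callers, since heap_getnext() dispatches
on rs_pageatatime. A typical caller-side loop still looks like the sketch below
(obtaining rel and snapshot is elided, and error handling is omitted):

	HeapScanDesc scan;
	HeapTuple	tuple;

	scan = heap_beginscan(rel, snapshot, 0, NULL);
	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		/* process tuple; it remains valid until the next heap_getnext() */
	}
	heap_endscan(scan);

Whether the scan precomputes rs_vistuples[] per page or locks the buffer per
tuple is decided once in heap_beginscan(), from IsMVCCSnapshot(snapshot).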