
tableam: Add and use scan APIs.

To allow table accesses not to depend directly on heap, several new
abstractions are needed. Specifically:

1) Heap scans need to be generalized into table scans. Do this by
   introducing TableScanDesc, which will be the "base class" for
   individual AMs. This contains the AM-independent fields from
   HeapScanDesc.

   The previous heap_{beginscan,rescan,endscan} et al. have been
   replaced with a table_ version.

   There's no direct replacement for heap_getnext(), as that returned
   a HeapTuple, which is undesirable for other AMs. Instead there's
   table_scan_getnextslot().  But note that heap_getnext() lives on;
   it's still used widely to access catalog tables.

   This is achieved by the new scan_begin, scan_end, scan_rescan and
   scan_getnextslot callbacks.
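
   As a rough illustration (not part of this commit's diff), a
   sequential scan through the new AM-independent API might look like
   this; the helper name count_tuples and the assumption that an
   active snapshot has been pushed are just for the example:

   #include "postgres.h"

   #include "access/tableam.h"
   #include "executor/tuptable.h"
   #include "utils/rel.h"
   #include "utils/snapmgr.h"

   /*
    * Count the tuples in "rel" visible to the active snapshot, without
    * referring to heap directly.
    */
   static uint64
   count_tuples(Relation rel)
   {
       TupleTableSlot *slot = table_slot_create(rel, NULL);
       TableScanDesc scan = table_beginscan(rel, GetActiveSnapshot(),
                                            0, NULL);
       uint64      ntuples = 0;

       while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
           ntuples++;

       table_endscan(scan);
       ExecDropSingleTupleTableSlot(slot);
       return ntuples;
   }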

2) The portion of parallel scans that's shared between backends needs
   to be set up without the caller doing per-AM work. To achieve
   that, new parallelscan_{estimate, initialize, reinitialize}
   callbacks are introduced; they operate on a new
   ParallelTableScanDesc, which again can be subclassed by AMs.

   As it is likely that several AMs are going to be block-oriented,
   block-oriented callbacks that can be shared between such AMs are
   provided and used by heap: table_block_parallelscan_{estimate,
   initialize, reinitialize} serve as the callbacks, and
   table_block_parallelscan_{startblock_init, nextpage} are for use
   within AMs. These operate on a ParallelBlockTableScanDesc.
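
   A hedged sketch of the caller-side pattern (mirroring what this
   commit does in nbtsort.c); "rel", "snapshot" and the ParallelContext
   "pcxt" are assumed to exist, and variable names are illustrative:

   Size        estscan;
   ParallelTableScanDesc pscan;
   TableScanDesc scan;

   /*
    * Leader: size and initialize the shared descriptor.  This assumes
    * "pcxt" already has enough toc space reserved; see
    * _bt_begin_parallel() below for the real sequence.
    */
   estscan = table_parallelscan_estimate(rel, snapshot);
   pscan = (ParallelTableScanDesc) shm_toc_allocate(pcxt->toc, estscan);
   table_parallelscan_initialize(rel, pscan, snapshot);

   /* Each participant (workers, and the leader if it joins) attaches: */
   scan = table_beginscan_parallel(rel, pscan);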

3) Index scans need to be able to access tables to return a tuple, and
   state like buffer pins needs to be kept across individual accesses
   to the table. That's now handled by introducing a sort-of-scan,
   IndexFetchTableData, which again is intended to be subclassed by
   individual AMs (IndexFetchHeapData for heap).

   The relevant callbacks for an AM are index_fetch_{begin, reset,
   end} to manage the necessary state, and index_fetch_tuple to
   retrieve an indexed tuple.  Note that index_fetch_tuple
   implementations need to be smarter than blindly fetching the tuple:
   for AMs with optimizations similar to heap's HOT, the currently
   live tuple in the update chain needs to be fetched if appropriate.

   As with table_scan_getnextslot(), it's undesirable to continue
   returning HeapTuples. Thus index_fetch_heap (which might be renamed
   later) now accepts a slot as an argument. Core code doesn't have
   many call sites performing index scans without going through the
   systable_* API (in contrast to the many heap_getnext() calls that
   work directly with HeapTuples).

   Index scans now store the result of a search in
   IndexScanDesc->xs_heaptid, rather than xs_ctup.t_self. As the
   target is not generally a HeapTuple anymore, that seems cleaner.
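
   For callers, the slot-based replacement for index_getnext() looks
   roughly like this (illustrative only; "heapRelation",
   "indexRelation", "skeys" and "nkeys" are assumed to be set up, and
   an active snapshot to be available):

   TupleTableSlot *slot = table_slot_create(heapRelation, NULL);
   IndexScanDesc scan = index_beginscan(heapRelation, indexRelation,
                                        GetActiveSnapshot(), nkeys, 0);

   index_rescan(scan, skeys, nkeys, NULL, 0);
   while (index_getnext_slot(scan, ForwardScanDirection, slot))
   {
       /* the matching table tuple is now stored in "slot" */
   }
   index_endscan(scan);
   ExecDropSingleTupleTableSlot(slot);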

To be able to sensibly adapt code to use the above, two further
callbacks have been introduced:

a) slot_callbacks returns a TupleTableSlotOps* suitable for creating
   slots capable of holding a tuple of the AM's
   type. table_slot_callbacks() and table_slot_create() are based
   upon that, but have additional logic to deal with views, foreign
   tables, etc.

   While this change could have been done separately, nearly all the
   call sites that needed to be adapted for the rest of this commit
   would also have needed to be adapted for table_slot_callbacks(),
   making separation not worthwhile.
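
   For example, executor code that used to hard-code heap tuple slots
   can now ask the relation which slot type fits; a small sketch,
   assuming an EState "estate" and a Relation "rel" are at hand:

   const TupleTableSlotOps *tts_ops = table_slot_callbacks(rel);
   TupleTableSlot *slot = ExecInitExtraTupleSlot(estate,
                                                 RelationGetDescr(rel),
                                                 tts_ops);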

b) tuple_satisfies_snapshot checks whether the tuple in a slot is
   currently visible according to a snapshot. That's required as a few
   places now don't have a buffer + HeapTuple around, but only a
   slot (which in heap's case internally has that information).
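
   A caller-side sketch, with "slot" still holding a tuple previously
   fetched from "rel": visibility can be rechecked without touching
   buffers or HeapTuples directly:

   if (table_tuple_satisfies_snapshot(rel, slot, snapshot))
   {
       /* the tuple in "slot" is visible to "snapshot" */
   }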

Additionally a few infrastructure changes were needed:

I) SysScanDesc, as used by systable_{beginscan, getnext} et al., now
   internally uses a slot to keep track of tuples. While
   systable_getnext() still returns HeapTuples, and will do so for the
   foreseeable future, the index API (see 3) above) now only deals with
   slots.
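
   Callers of the systable_* API are unaffected; the usual pattern
   keeps working, with the slot handled internally. A sketch, with the
   catalog relation, index OID and scan keys assumed to be set up
   elsewhere:

   SysScanDesc sscan = systable_beginscan(catrel, indexId, true,
                                          NULL, nkeys, skey);
   HeapTuple   tup;

   while (HeapTupleIsValid(tup = systable_getnext(sscan)))
   {
       /*
        * "tup" must not be modified and is only valid until the next
        * systable_getnext()/systable_endscan() call.
        */
   }
   systable_endscan(sscan);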

The remainder, and largest part, of this commit is then adjusting all
scans in postgres to use the new APIs.

Author: Andres Freund, Haribabu Kommi, Alvaro Herrera
Discussion:
    https://postgr.es/m/20180703070645.wchpu5muyto5n647@alap3.anarazel.de
    https://postgr.es/m/20160812231527.GA690404@alvherre.pgsql
Committed by: Andres Freund
Date: 2019-03-11 12:46:41 -07:00
commit c2fe139c20
parent a478415281
63 changed files with 2030 additions and 1265 deletions


@@ -561,7 +561,7 @@ getNextNearest(IndexScanDesc scan)
if (GISTSearchItemIsHeap(*item))
{
/* found a heap item at currently minimal distance */
scan->xs_ctup.t_self = item->data.heap.heapPtr;
scan->xs_heaptid = item->data.heap.heapPtr;
scan->xs_recheck = item->data.heap.recheck;
index_store_float8_orderby_distances(scan, so->orderByTypes,
@@ -650,7 +650,7 @@ gistgettuple(IndexScanDesc scan, ScanDirection dir)
so->pageData[so->curPageData - 1].offnum;
}
/* continuing to return tuples from a leaf page */
scan->xs_ctup.t_self = so->pageData[so->curPageData].heapPtr;
scan->xs_heaptid = so->pageData[so->curPageData].heapPtr;
scan->xs_recheck = so->pageData[so->curPageData].recheck;
/* in an index-only scan, also return the reconstructed tuple */


@@ -119,7 +119,7 @@ _hash_next(IndexScanDesc scan, ScanDirection dir)
/* OK, itemIndex says what to return */
currItem = &so->currPos.items[so->currPos.itemIndex];
scan->xs_ctup.t_self = currItem->heapTid;
scan->xs_heaptid = currItem->heapTid;
return true;
}
@@ -432,7 +432,7 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
/* OK, itemIndex says what to return */
currItem = &so->currPos.items[so->currPos.itemIndex];
scan->xs_ctup.t_self = currItem->heapTid;
scan->xs_heaptid = currItem->heapTid;
/* if we're here, _hash_readpage found a valid tuples */
return true;

File diff suppressed because it is too large


@@ -19,15 +19,181 @@
*/
#include "postgres.h"
#include "access/heapam.h"
#include "access/tableam.h"
#include "storage/bufmgr.h"
#include "utils/builtins.h"
static const TableAmRoutine heapam_methods;
/* ------------------------------------------------------------------------
* Slot related callbacks for heap AM
* ------------------------------------------------------------------------
*/
static const TupleTableSlotOps *
heapam_slot_callbacks(Relation relation)
{
return &TTSOpsBufferHeapTuple;
}
/* ------------------------------------------------------------------------
* Index Scan Callbacks for heap AM
* ------------------------------------------------------------------------
*/
static IndexFetchTableData *
heapam_index_fetch_begin(Relation rel)
{
IndexFetchHeapData *hscan = palloc0(sizeof(IndexFetchHeapData));
hscan->xs_base.rel = rel;
hscan->xs_cbuf = InvalidBuffer;
return &hscan->xs_base;
}
static void
heapam_index_fetch_reset(IndexFetchTableData *scan)
{
IndexFetchHeapData *hscan = (IndexFetchHeapData *) scan;
if (BufferIsValid(hscan->xs_cbuf))
{
ReleaseBuffer(hscan->xs_cbuf);
hscan->xs_cbuf = InvalidBuffer;
}
}
static void
heapam_index_fetch_end(IndexFetchTableData *scan)
{
IndexFetchHeapData *hscan = (IndexFetchHeapData *) scan;
heapam_index_fetch_reset(scan);
pfree(hscan);
}
static bool
heapam_index_fetch_tuple(struct IndexFetchTableData *scan,
ItemPointer tid,
Snapshot snapshot,
TupleTableSlot *slot,
bool *call_again, bool *all_dead)
{
IndexFetchHeapData *hscan = (IndexFetchHeapData *) scan;
BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
bool got_heap_tuple;
Assert(TTS_IS_BUFFERTUPLE(slot));
/* We can skip the buffer-switching logic if we're in mid-HOT chain. */
if (!*call_again)
{
/* Switch to correct buffer if we don't have it already */
Buffer prev_buf = hscan->xs_cbuf;
hscan->xs_cbuf = ReleaseAndReadBuffer(hscan->xs_cbuf,
hscan->xs_base.rel,
ItemPointerGetBlockNumber(tid));
/*
* Prune page, but only if we weren't already on this page
*/
if (prev_buf != hscan->xs_cbuf)
heap_page_prune_opt(hscan->xs_base.rel, hscan->xs_cbuf);
}
/* Obtain share-lock on the buffer so we can examine visibility */
LockBuffer(hscan->xs_cbuf, BUFFER_LOCK_SHARE);
got_heap_tuple = heap_hot_search_buffer(tid,
hscan->xs_base.rel,
hscan->xs_cbuf,
snapshot,
&bslot->base.tupdata,
all_dead,
!*call_again);
bslot->base.tupdata.t_self = *tid;
LockBuffer(hscan->xs_cbuf, BUFFER_LOCK_UNLOCK);
if (got_heap_tuple)
{
/*
* Only in a non-MVCC snapshot can more than one member of the HOT
* chain be visible.
*/
*call_again = !IsMVCCSnapshot(snapshot);
slot->tts_tableOid = RelationGetRelid(scan->rel);
ExecStoreBufferHeapTuple(&bslot->base.tupdata, slot, hscan->xs_cbuf);
}
else
{
/* We've reached the end of the HOT chain. */
*call_again = false;
}
return got_heap_tuple;
}
/* ------------------------------------------------------------------------
* Callbacks for non-modifying operations on individual tuples for heap AM
* ------------------------------------------------------------------------
*/
static bool
heapam_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot,
Snapshot snapshot)
{
BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
bool res;
Assert(TTS_IS_BUFFERTUPLE(slot));
Assert(BufferIsValid(bslot->buffer));
/*
* We need buffer pin and lock to call HeapTupleSatisfiesVisibility.
* Caller should be holding pin, but not lock.
*/
LockBuffer(bslot->buffer, BUFFER_LOCK_SHARE);
res = HeapTupleSatisfiesVisibility(bslot->base.tuple, snapshot,
bslot->buffer);
LockBuffer(bslot->buffer, BUFFER_LOCK_UNLOCK);
return res;
}
/* ------------------------------------------------------------------------
* Definition of the heap table access method.
* ------------------------------------------------------------------------
*/
static const TableAmRoutine heapam_methods = {
.type = T_TableAmRoutine,
.slot_callbacks = heapam_slot_callbacks,
.scan_begin = heap_beginscan,
.scan_end = heap_endscan,
.scan_rescan = heap_rescan,
.scan_getnextslot = heap_getnextslot,
.parallelscan_estimate = table_block_parallelscan_estimate,
.parallelscan_initialize = table_block_parallelscan_initialize,
.parallelscan_reinitialize = table_block_parallelscan_reinitialize,
.index_fetch_begin = heapam_index_fetch_begin,
.index_fetch_reset = heapam_index_fetch_reset,
.index_fetch_end = heapam_index_fetch_end,
.index_fetch_tuple = heapam_index_fetch_tuple,
.tuple_satisfies_snapshot = heapam_tuple_satisfies_snapshot,
};
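
For context on the struct above: a table AM is exposed to the system
through an amhandler function that simply returns its routine. Heap's
own heap_tableam_handler (elsewhere in this file, not part of this
hunk) returns &heapam_methods, and a hypothetical external AM would
follow the same pattern; my_tableam_handler and my_am_methods below
are made-up names:

PG_FUNCTION_INFO_V1(my_tableam_handler);

Datum
my_tableam_handler(PG_FUNCTION_ARGS)
{
    PG_RETURN_POINTER(&my_am_methods);
}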


@@ -22,6 +22,7 @@
#include "access/genam.h"
#include "access/heapam.h"
#include "access/relscan.h"
#include "access/tableam.h"
#include "access/transam.h"
#include "catalog/index.h"
#include "lib/stringinfo.h"
@@ -83,6 +84,7 @@ RelationGetIndexScan(Relation indexRelation, int nkeys, int norderbys)
scan = (IndexScanDesc) palloc(sizeof(IndexScanDescData));
scan->heapRelation = NULL; /* may be set later */
scan->xs_heapfetch = NULL;
scan->indexRelation = indexRelation;
scan->xs_snapshot = InvalidSnapshot; /* caller must initialize this */
scan->numberOfKeys = nkeys;
@@ -123,11 +125,6 @@ RelationGetIndexScan(Relation indexRelation, int nkeys, int norderbys)
scan->xs_hitup = NULL;
scan->xs_hitupdesc = NULL;
ItemPointerSetInvalid(&scan->xs_ctup.t_self);
scan->xs_ctup.t_data = NULL;
scan->xs_cbuf = InvalidBuffer;
scan->xs_continue_hot = false;
return scan;
}
@@ -335,6 +332,7 @@ systable_beginscan(Relation heapRelation,
sysscan->heap_rel = heapRelation;
sysscan->irel = irel;
sysscan->slot = table_slot_create(heapRelation, NULL);
if (snapshot == NULL)
{
@@ -384,9 +382,9 @@ systable_beginscan(Relation heapRelation,
* disadvantage; and there are no compensating advantages, because
* it's unlikely that such scans will occur in parallel.
*/
sysscan->scan = heap_beginscan_strat(heapRelation, snapshot,
nkeys, key,
true, false);
sysscan->scan = table_beginscan_strat(heapRelation, snapshot,
nkeys, key,
true, false);
sysscan->iscan = NULL;
}
@@ -401,28 +399,46 @@ systable_beginscan(Relation heapRelation,
* Note that returned tuple is a reference to data in a disk buffer;
* it must not be modified, and should be presumed inaccessible after
* next getnext() or endscan() call.
*
* XXX: It'd probably make sense to offer a slot based interface, at least
* optionally.
*/
HeapTuple
systable_getnext(SysScanDesc sysscan)
{
HeapTuple htup;
HeapTuple htup = NULL;
if (sysscan->irel)
{
htup = index_getnext(sysscan->iscan, ForwardScanDirection);
if (index_getnext_slot(sysscan->iscan, ForwardScanDirection, sysscan->slot))
{
bool shouldFree;
/*
* We currently don't need to support lossy index operators for any
* system catalog scan. It could be done here, using the scan keys to
* drive the operator calls, if we arranged to save the heap attnums
* during systable_beginscan(); this is practical because we still
* wouldn't need to support indexes on expressions.
*/
if (htup && sysscan->iscan->xs_recheck)
elog(ERROR, "system catalog scans with lossy index conditions are not implemented");
htup = ExecFetchSlotHeapTuple(sysscan->slot, false, &shouldFree);
Assert(!shouldFree);
/*
* We currently don't need to support lossy index operators for
* any system catalog scan. It could be done here, using the scan
* keys to drive the operator calls, if we arranged to save the
* heap attnums during systable_beginscan(); this is practical
* because we still wouldn't need to support indexes on
* expressions.
*/
if (sysscan->iscan->xs_recheck)
elog(ERROR, "system catalog scans with lossy index conditions are not implemented");
}
}
else
htup = heap_getnext(sysscan->scan, ForwardScanDirection);
{
if (table_scan_getnextslot(sysscan->scan, ForwardScanDirection, sysscan->slot))
{
bool shouldFree;
htup = ExecFetchSlotHeapTuple(sysscan->slot, false, &shouldFree);
Assert(!shouldFree);
}
}
return htup;
}
@@ -446,37 +462,20 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup)
Snapshot freshsnap;
bool result;
Assert(tup == ExecFetchSlotHeapTuple(sysscan->slot, false, NULL));
/*
* Trust that LockBuffer() and HeapTupleSatisfiesMVCC() do not themselves
* Trust that table_tuple_satisfies_snapshot() and its subsidiaries
* (commonly LockBuffer() and HeapTupleSatisfiesMVCC()) do not themselves
* acquire snapshots, so we need not register the snapshot. Those
* facilities are too low-level to have any business scanning tables.
*/
freshsnap = GetCatalogSnapshot(RelationGetRelid(sysscan->heap_rel));
if (sysscan->irel)
{
IndexScanDesc scan = sysscan->iscan;
result = table_tuple_satisfies_snapshot(sysscan->heap_rel,
sysscan->slot,
freshsnap);
Assert(IsMVCCSnapshot(scan->xs_snapshot));
Assert(tup == &scan->xs_ctup);
Assert(BufferIsValid(scan->xs_cbuf));
/* must hold a buffer lock to call HeapTupleSatisfiesVisibility */
LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE);
result = HeapTupleSatisfiesVisibility(tup, freshsnap, scan->xs_cbuf);
LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
}
else
{
HeapScanDesc scan = sysscan->scan;
Assert(IsMVCCSnapshot(scan->rs_snapshot));
Assert(tup == &scan->rs_ctup);
Assert(BufferIsValid(scan->rs_cbuf));
/* must hold a buffer lock to call HeapTupleSatisfiesVisibility */
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
result = HeapTupleSatisfiesVisibility(tup, freshsnap, scan->rs_cbuf);
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
}
return result;
}
@@ -488,13 +487,19 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup)
void
systable_endscan(SysScanDesc sysscan)
{
if (sysscan->slot)
{
ExecDropSingleTupleTableSlot(sysscan->slot);
sysscan->slot = NULL;
}
if (sysscan->irel)
{
index_endscan(sysscan->iscan);
index_close(sysscan->irel, AccessShareLock);
}
else
heap_endscan(sysscan->scan);
table_endscan(sysscan->scan);
if (sysscan->snapshot)
UnregisterSnapshot(sysscan->snapshot);
@@ -541,6 +546,7 @@ systable_beginscan_ordered(Relation heapRelation,
sysscan->heap_rel = heapRelation;
sysscan->irel = indexRelation;
sysscan->slot = table_slot_create(heapRelation, NULL);
if (snapshot == NULL)
{
@@ -586,10 +592,12 @@ systable_beginscan_ordered(Relation heapRelation,
HeapTuple
systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction)
{
HeapTuple htup;
HeapTuple htup = NULL;
Assert(sysscan->irel);
htup = index_getnext(sysscan->iscan, direction);
if (index_getnext_slot(sysscan->iscan, direction, sysscan->slot))
htup = ExecFetchSlotHeapTuple(sysscan->slot, false, NULL);
/* See notes in systable_getnext */
if (htup && sysscan->iscan->xs_recheck)
elog(ERROR, "system catalog scans with lossy index conditions are not implemented");
@@ -603,6 +611,12 @@ systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction)
void
systable_endscan_ordered(SysScanDesc sysscan)
{
if (sysscan->slot)
{
ExecDropSingleTupleTableSlot(sysscan->slot);
sysscan->slot = NULL;
}
Assert(sysscan->irel);
index_endscan(sysscan->iscan);
if (sysscan->snapshot)


@@ -72,6 +72,7 @@
#include "access/amapi.h"
#include "access/heapam.h"
#include "access/relscan.h"
#include "access/tableam.h"
#include "access/transam.h"
#include "access/xlog.h"
#include "catalog/index.h"
@@ -235,6 +236,9 @@ index_beginscan(Relation heapRelation,
scan->heapRelation = heapRelation;
scan->xs_snapshot = snapshot;
/* prepare to fetch index matches from table */
scan->xs_heapfetch = table_index_fetch_begin(heapRelation);
return scan;
}
@@ -318,16 +322,12 @@ index_rescan(IndexScanDesc scan,
Assert(nkeys == scan->numberOfKeys);
Assert(norderbys == scan->numberOfOrderBys);
/* Release any held pin on a heap page */
if (BufferIsValid(scan->xs_cbuf))
{
ReleaseBuffer(scan->xs_cbuf);
scan->xs_cbuf = InvalidBuffer;
}
scan->xs_continue_hot = false;
/* Release resources (like buffer pins) from table accesses */
if (scan->xs_heapfetch)
table_index_fetch_reset(scan->xs_heapfetch);
scan->kill_prior_tuple = false; /* for safety */
scan->xs_heap_continue = false;
scan->indexRelation->rd_indam->amrescan(scan, keys, nkeys,
orderbys, norderbys);
@@ -343,11 +343,11 @@ index_endscan(IndexScanDesc scan)
SCAN_CHECKS;
CHECK_SCAN_PROCEDURE(amendscan);
/* Release any held pin on a heap page */
if (BufferIsValid(scan->xs_cbuf))
/* Release resources (like buffer pins) from table accesses */
if (scan->xs_heapfetch)
{
ReleaseBuffer(scan->xs_cbuf);
scan->xs_cbuf = InvalidBuffer;
table_index_fetch_end(scan->xs_heapfetch);
scan->xs_heapfetch = NULL;
}
/* End the AM's scan */
@@ -379,17 +379,16 @@ index_markpos(IndexScanDesc scan)
/* ----------------
* index_restrpos - restore a scan position
*
* NOTE: this only restores the internal scan state of the index AM.
* The current result tuple (scan->xs_ctup) doesn't change. See comments
* for ExecRestrPos().
* NOTE: this only restores the internal scan state of the index AM. See
* comments for ExecRestrPos().
*
* NOTE: in the presence of HOT chains, mark/restore only works correctly
* if the scan's snapshot is MVCC-safe; that ensures that there's at most one
* returnable tuple in each HOT chain, and so restoring the prior state at the
* granularity of the index AM is sufficient. Since the only current user
* of mark/restore functionality is nodeMergejoin.c, this effectively means
* that merge-join plans only work for MVCC snapshots. This could be fixed
* if necessary, but for now it seems unimportant.
* NOTE: For heap, in the presence of HOT chains, mark/restore only works
* correctly if the scan's snapshot is MVCC-safe; that ensures that there's at
* most one returnable tuple in each HOT chain, and so restoring the prior
* state at the granularity of the index AM is sufficient. Since the only
* current user of mark/restore functionality is nodeMergejoin.c, this
* effectively means that merge-join plans only work for MVCC snapshots. This
* could be fixed if necessary, but for now it seems unimportant.
* ----------------
*/
void
@@ -400,9 +399,12 @@ index_restrpos(IndexScanDesc scan)
SCAN_CHECKS;
CHECK_SCAN_PROCEDURE(amrestrpos);
scan->xs_continue_hot = false;
/* release resources (like buffer pins) from table accesses */
if (scan->xs_heapfetch)
table_index_fetch_reset(scan->xs_heapfetch);
scan->kill_prior_tuple = false; /* for safety */
scan->xs_heap_continue = false;
scan->indexRelation->rd_indam->amrestrpos(scan);
}
@@ -483,6 +485,9 @@ index_parallelrescan(IndexScanDesc scan)
{
SCAN_CHECKS;
if (scan->xs_heapfetch)
table_index_fetch_reset(scan->xs_heapfetch);
/* amparallelrescan is optional; assume no-op if not provided by AM */
if (scan->indexRelation->rd_indam->amparallelrescan != NULL)
scan->indexRelation->rd_indam->amparallelrescan(scan);
@@ -513,6 +518,9 @@ index_beginscan_parallel(Relation heaprel, Relation indexrel, int nkeys,
scan->heapRelation = heaprel;
scan->xs_snapshot = snapshot;
/* prepare to fetch index matches from table */
scan->xs_heapfetch = table_index_fetch_begin(heaprel);
return scan;
}
@@ -535,7 +543,7 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction)
/*
* The AM's amgettuple proc finds the next index entry matching the scan
* keys, and puts the TID into scan->xs_ctup.t_self. It should also set
* keys, and puts the TID into scan->xs_heaptid. It should also set
* scan->xs_recheck and possibly scan->xs_itup/scan->xs_hitup, though we
* pay no attention to those fields here.
*/
@@ -543,23 +551,23 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction)
/* Reset kill flag immediately for safety */
scan->kill_prior_tuple = false;
scan->xs_heap_continue = false;
/* If we're out of index entries, we're done */
if (!found)
{
/* ... but first, release any held pin on a heap page */
if (BufferIsValid(scan->xs_cbuf))
{
ReleaseBuffer(scan->xs_cbuf);
scan->xs_cbuf = InvalidBuffer;
}
/* release resources (like buffer pins) from table accesses */
if (scan->xs_heapfetch)
table_index_fetch_reset(scan->xs_heapfetch);
return NULL;
}
Assert(ItemPointerIsValid(&scan->xs_heaptid));
pgstat_count_index_tuples(scan->indexRelation, 1);
/* Return the TID of the tuple we found. */
return &scan->xs_ctup.t_self;
return &scan->xs_heaptid;
}
/* ----------------
@@ -580,53 +588,18 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction)
* enough information to do it efficiently in the general case.
* ----------------
*/
HeapTuple
index_fetch_heap(IndexScanDesc scan)
bool
index_fetch_heap(IndexScanDesc scan, TupleTableSlot *slot)
{
ItemPointer tid = &scan->xs_ctup.t_self;
bool all_dead = false;
bool got_heap_tuple;
bool found;
/* We can skip the buffer-switching logic if we're in mid-HOT chain. */
if (!scan->xs_continue_hot)
{
/* Switch to correct buffer if we don't have it already */
Buffer prev_buf = scan->xs_cbuf;
found = table_index_fetch_tuple(scan->xs_heapfetch, &scan->xs_heaptid,
scan->xs_snapshot, slot,
&scan->xs_heap_continue, &all_dead);
scan->xs_cbuf = ReleaseAndReadBuffer(scan->xs_cbuf,
scan->heapRelation,
ItemPointerGetBlockNumber(tid));
/*
* Prune page, but only if we weren't already on this page
*/
if (prev_buf != scan->xs_cbuf)
heap_page_prune_opt(scan->heapRelation, scan->xs_cbuf);
}
/* Obtain share-lock on the buffer so we can examine visibility */
LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE);
got_heap_tuple = heap_hot_search_buffer(tid, scan->heapRelation,
scan->xs_cbuf,
scan->xs_snapshot,
&scan->xs_ctup,
&all_dead,
!scan->xs_continue_hot);
LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
if (got_heap_tuple)
{
/*
* Only in a non-MVCC snapshot can more than one member of the HOT
* chain be visible.
*/
scan->xs_continue_hot = !IsMVCCSnapshot(scan->xs_snapshot);
if (found)
pgstat_count_heap_fetch(scan->indexRelation);
return &scan->xs_ctup;
}
/* We've reached the end of the HOT chain. */
scan->xs_continue_hot = false;
/*
* If we scanned a whole HOT chain and found only dead tuples, tell index
@@ -638,17 +611,17 @@ index_fetch_heap(IndexScanDesc scan)
if (!scan->xactStartedInRecovery)
scan->kill_prior_tuple = all_dead;
return NULL;
return found;
}
/* ----------------
* index_getnext - get the next heap tuple from a scan
* index_getnext_slot - get the next tuple from a scan
*
* The result is the next heap tuple satisfying the scan keys and the
* snapshot, or NULL if no more matching tuples exist.
* The result is true if a tuple satisfying the scan keys and the snapshot was
* found, false otherwise. The tuple is stored in the specified slot.
*
* On success, the buffer containing the heap tup is pinned (the pin will be
* dropped in a future index_getnext_tid, index_fetch_heap or index_endscan
* On success, resources (like buffer pins) are likely to be held, and will be
* dropped by a future index_getnext_tid, index_fetch_heap or index_endscan
* call).
*
* Note: caller must check scan->xs_recheck, and perform rechecking of the
@@ -656,32 +629,23 @@ index_fetch_heap(IndexScanDesc scan)
* enough information to do it efficiently in the general case.
* ----------------
*/
HeapTuple
index_getnext(IndexScanDesc scan, ScanDirection direction)
bool
index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *slot)
{
HeapTuple heapTuple;
ItemPointer tid;
for (;;)
{
if (scan->xs_continue_hot)
{
/*
* We are resuming scan of a HOT chain after having returned an
* earlier member. Must still hold pin on current heap page.
*/
Assert(BufferIsValid(scan->xs_cbuf));
Assert(ItemPointerGetBlockNumber(&scan->xs_ctup.t_self) ==
BufferGetBlockNumber(scan->xs_cbuf));
}
else
if (!scan->xs_heap_continue)
{
ItemPointer tid;
/* Time to fetch the next TID from the index */
tid = index_getnext_tid(scan, direction);
/* If we're out of index entries, we're done */
if (tid == NULL)
break;
Assert(ItemPointerEquals(tid, &scan->xs_heaptid));
}
/*
@@ -689,12 +653,12 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
* If we don't find anything, loop around and grab the next TID from
* the index.
*/
heapTuple = index_fetch_heap(scan);
if (heapTuple != NULL)
return heapTuple;
Assert(ItemPointerIsValid(&scan->xs_heaptid));
if (index_fetch_heap(scan, slot))
return true;
}
return NULL; /* failure exit */
return false;
}
/* ----------------


@@ -310,7 +310,7 @@ btgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
if (_bt_first(scan, ForwardScanDirection))
{
/* Save tuple ID, and continue scanning */
heapTid = &scan->xs_ctup.t_self;
heapTid = &scan->xs_heaptid;
tbm_add_tuples(tbm, heapTid, 1, false);
ntids++;


@@ -1135,7 +1135,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
readcomplete:
/* OK, itemIndex says what to return */
currItem = &so->currPos.items[so->currPos.itemIndex];
scan->xs_ctup.t_self = currItem->heapTid;
scan->xs_heaptid = currItem->heapTid;
if (scan->xs_want_itup)
scan->xs_itup = (IndexTuple) (so->currTuples + currItem->tupleOffset);
@@ -1185,7 +1185,7 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
/* OK, itemIndex says what to return */
currItem = &so->currPos.items[so->currPos.itemIndex];
scan->xs_ctup.t_self = currItem->heapTid;
scan->xs_heaptid = currItem->heapTid;
if (scan->xs_want_itup)
scan->xs_itup = (IndexTuple) (so->currTuples + currItem->tupleOffset);
@@ -1964,7 +1964,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
/* OK, itemIndex says what to return */
currItem = &so->currPos.items[so->currPos.itemIndex];
scan->xs_ctup.t_self = currItem->heapTid;
scan->xs_heaptid = currItem->heapTid;
if (scan->xs_want_itup)
scan->xs_itup = (IndexTuple) (so->currTuples + currItem->tupleOffset);


@@ -61,6 +61,7 @@
#include "access/nbtree.h"
#include "access/parallel.h"
#include "access/relscan.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "access/xloginsert.h"
@@ -158,9 +159,9 @@ typedef struct BTShared
/*
* This variable-sized field must come last.
*
* See _bt_parallel_estimate_shared() and heap_parallelscan_estimate().
* See _bt_parallel_estimate_shared() and table_parallelscan_estimate().
*/
ParallelHeapScanDescData heapdesc;
ParallelTableScanDescData heapdesc;
} BTShared;
/*
@@ -282,7 +283,7 @@ static void _bt_load(BTWriteState *wstate,
static void _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent,
int request);
static void _bt_end_parallel(BTLeader *btleader);
static Size _bt_parallel_estimate_shared(Snapshot snapshot);
static Size _bt_parallel_estimate_shared(Relation heap, Snapshot snapshot);
static double _bt_parallel_heapscan(BTBuildState *buildstate,
bool *brokenhotchain);
static void _bt_leader_participate_as_worker(BTBuildState *buildstate);
@@ -1275,7 +1276,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
* Estimate size for our own PARALLEL_KEY_BTREE_SHARED workspace, and
* PARALLEL_KEY_TUPLESORT tuplesort workspace
*/
estbtshared = _bt_parallel_estimate_shared(snapshot);
estbtshared = _bt_parallel_estimate_shared(btspool->heap, snapshot);
shm_toc_estimate_chunk(&pcxt->estimator, estbtshared);
estsort = tuplesort_estimate_shared(scantuplesortstates);
shm_toc_estimate_chunk(&pcxt->estimator, estsort);
@@ -1316,7 +1317,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
btshared->havedead = false;
btshared->indtuples = 0.0;
btshared->brokenhotchain = false;
heap_parallelscan_initialize(&btshared->heapdesc, btspool->heap, snapshot);
table_parallelscan_initialize(btspool->heap, &btshared->heapdesc,
snapshot);
/*
* Store shared tuplesort-private state, for which we reserved space.
@@ -1403,10 +1405,10 @@ _bt_end_parallel(BTLeader *btleader)
* btree index build based on the snapshot its parallel scan will use.
*/
static Size
_bt_parallel_estimate_shared(Snapshot snapshot)
_bt_parallel_estimate_shared(Relation heap, Snapshot snapshot)
{
return add_size(offsetof(BTShared, heapdesc),
heap_parallelscan_estimate(snapshot));
table_parallelscan_estimate(heap, snapshot));
}
/*
@@ -1617,7 +1619,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
{
SortCoordinate coordinate;
BTBuildState buildstate;
HeapScanDesc scan;
TableScanDesc scan;
double reltuples;
IndexInfo *indexInfo;
@@ -1670,7 +1672,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
/* Join parallel scan */
indexInfo = BuildIndexInfo(btspool->index);
indexInfo->ii_Concurrent = btshared->isconcurrent;
scan = heap_beginscan_parallel(btspool->heap, &btshared->heapdesc);
scan = table_beginscan_parallel(btspool->heap, &btshared->heapdesc);
reltuples = IndexBuildHeapScan(btspool->heap, btspool->index, indexInfo,
true, _bt_build_callback,
(void *) &buildstate, scan);


@@ -927,7 +927,7 @@ spggettuple(IndexScanDesc scan, ScanDirection dir)
if (so->iPtr < so->nPtrs)
{
/* continuing to return reported tuples */
scan->xs_ctup.t_self = so->heapPtrs[so->iPtr];
scan->xs_heaptid = so->heapPtrs[so->iPtr];
scan->xs_recheck = so->recheck[so->iPtr];
scan->xs_hitup = so->reconTups[so->iPtr];


@@ -6,13 +6,304 @@
* Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/backend/access/table/tableam.c
*
* IDENTIFICATION
* src/backend/access/table/tableam.c
*
* NOTES
* Note that most function in here are documented in tableam.h, rather than
* here. That's because there's a lot of inline functions in tableam.h and
* it'd be harder to understand if one constantly had to switch between files.
*
*----------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/heapam.h" /* for ss_* */
#include "access/tableam.h"
#include "access/xact.h"
#include "storage/bufmgr.h"
#include "storage/shmem.h"
/* GUC variables */
char *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD;
bool synchronize_seqscans = true;
/* ----------------------------------------------------------------------------
* Slot functions.
* ----------------------------------------------------------------------------
*/
const TupleTableSlotOps *
table_slot_callbacks(Relation relation)
{
const TupleTableSlotOps *tts_cb;
if (relation->rd_tableam)
tts_cb = relation->rd_tableam->slot_callbacks(relation);
else if (relation->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
{
/*
* Historically FDWs expect to store heap tuples in slots. Continue
* handing them one, to make it less painful to adapt FDWs to new
* versions. The cost of a heap slot over a virtual slot is pretty
* small.
*/
tts_cb = &TTSOpsHeapTuple;
}
else
{
/*
* These need to be supported, as some parts of the code (like COPY)
* need to create slots for such relations too. It seems better to
* centralize the knowledge that a heap slot is the right thing in
* that case here.
*/
Assert(relation->rd_rel->relkind == RELKIND_VIEW ||
relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
tts_cb = &TTSOpsVirtual;
}
return tts_cb;
}
TupleTableSlot *
table_slot_create(Relation relation, List **reglist)
{
const TupleTableSlotOps *tts_cb;
TupleTableSlot *slot;
tts_cb = table_slot_callbacks(relation);
slot = MakeSingleTupleTableSlot(RelationGetDescr(relation), tts_cb);
if (reglist)
*reglist = lappend(*reglist, slot);
return slot;
}
/* ----------------------------------------------------------------------------
* Table scan functions.
* ----------------------------------------------------------------------------
*/
TableScanDesc
table_beginscan_catalog(Relation relation, int nkeys, struct ScanKeyData *key)
{
Oid relid = RelationGetRelid(relation);
Snapshot snapshot = RegisterSnapshot(GetCatalogSnapshot(relid));
return relation->rd_tableam->scan_begin(relation, snapshot, nkeys, key, NULL,
true, true, true, false, false, true);
}
void
table_scan_update_snapshot(TableScanDesc scan, Snapshot snapshot)
{
Assert(IsMVCCSnapshot(snapshot));
RegisterSnapshot(snapshot);
scan->rs_snapshot = snapshot;
scan->rs_temp_snap = true;
}
/* ----------------------------------------------------------------------------
* Parallel table scan related functions.
* ----------------------------------------------------------------------------
*/
Size
table_parallelscan_estimate(Relation rel, Snapshot snapshot)
{
Size sz = 0;
if (IsMVCCSnapshot(snapshot))
sz = add_size(sz, EstimateSnapshotSpace(snapshot));
else
Assert(snapshot == SnapshotAny);
sz = add_size(sz, rel->rd_tableam->parallelscan_estimate(rel));
return sz;
}
void
table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan,
Snapshot snapshot)
{
Size snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan);
pscan->phs_snapshot_off = snapshot_off;
if (IsMVCCSnapshot(snapshot))
{
SerializeSnapshot(snapshot, (char *) pscan + pscan->phs_snapshot_off);
pscan->phs_snapshot_any = false;
}
else
{
Assert(snapshot == SnapshotAny);
pscan->phs_snapshot_any = true;
}
}
TableScanDesc
table_beginscan_parallel(Relation relation, ParallelTableScanDesc parallel_scan)
{
Snapshot snapshot;
Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
if (!parallel_scan->phs_snapshot_any)
{
/* Snapshot was serialized -- restore it */
snapshot = RestoreSnapshot((char *) parallel_scan +
parallel_scan->phs_snapshot_off);
RegisterSnapshot(snapshot);
}
else
{
/* SnapshotAny passed by caller (not serialized) */
snapshot = SnapshotAny;
}
return relation->rd_tableam->scan_begin(relation, snapshot, 0, NULL, parallel_scan,
true, true, true, false, false, !parallel_scan->phs_snapshot_any);
}
/* ----------------------------------------------------------------------------
* Helper functions to implement parallel scans for block oriented AMs.
* ----------------------------------------------------------------------------
*/
Size
table_block_parallelscan_estimate(Relation rel)
{
return sizeof(ParallelBlockTableScanDescData);
}
Size
table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
{
ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan;
bpscan->base.phs_relid = RelationGetRelid(rel);
bpscan->phs_nblocks = RelationGetNumberOfBlocks(rel);
/* compare phs_syncscan initialization to similar logic in initscan */
bpscan->base.phs_syncscan = synchronize_seqscans &&
!RelationUsesLocalBuffers(rel) &&
bpscan->phs_nblocks > NBuffers / 4;
SpinLockInit(&bpscan->phs_mutex);
bpscan->phs_startblock = InvalidBlockNumber;
pg_atomic_init_u64(&bpscan->phs_nallocated, 0);
return sizeof(ParallelBlockTableScanDescData);
}
void
table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
{
ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan;
pg_atomic_write_u64(&bpscan->phs_nallocated, 0);
}
/*
* find and set the scan's startblock
*
* Determine where the parallel seq scan should start. This function may be
* called many times, once by each parallel worker. We must be careful only
* to set the startblock once.
*/
void
table_block_parallelscan_startblock_init(Relation rel, ParallelBlockTableScanDesc pbscan)
{
BlockNumber sync_startpage = InvalidBlockNumber;
retry:
/* Grab the spinlock. */
SpinLockAcquire(&pbscan->phs_mutex);
/*
* If the scan's startblock has not yet been initialized, we must do so
* now. If this is not a synchronized scan, we just start at block 0, but
* if it is a synchronized scan, we must get the starting position from
* the synchronized scan machinery. We can't hold the spinlock while
* doing that, though, so release the spinlock, get the information we
* need, and retry. If nobody else has initialized the scan in the
* meantime, we'll fill in the value we fetched on the second time
* through.
*/
if (pbscan->phs_startblock == InvalidBlockNumber)
{
if (!pbscan->base.phs_syncscan)
pbscan->phs_startblock = 0;
else if (sync_startpage != InvalidBlockNumber)
pbscan->phs_startblock = sync_startpage;
else
{
SpinLockRelease(&pbscan->phs_mutex);
sync_startpage = ss_get_location(rel, pbscan->phs_nblocks);
goto retry;
}
}
SpinLockRelease(&pbscan->phs_mutex);
}
/*
* get the next page to scan
*
* Get the next page to scan. Even if there are no pages left to scan,
* another backend could have grabbed a page to scan and not yet finished
* looking at it, so it doesn't follow that the scan is done when the first
* backend gets an InvalidBlockNumber return.
*/
BlockNumber
table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbscan)
{
BlockNumber page;
uint64 nallocated;
/*
* phs_nallocated tracks how many pages have been allocated to workers
* already. When phs_nallocated >= rs_nblocks, all blocks have been
* allocated.
*
* Because we use an atomic fetch-and-add to fetch the current value, the
* phs_nallocated counter will exceed rs_nblocks, because workers will
* still increment the value, when they try to allocate the next block but
* all blocks have been allocated already. The counter must be 64 bits
* wide because of that, to avoid wrapping around when rs_nblocks is close
* to 2^32.
*
* The actual page to return is calculated by adding the counter to the
* starting block number, modulo nblocks.
*/
nallocated = pg_atomic_fetch_add_u64(&pbscan->phs_nallocated, 1);
if (nallocated >= pbscan->phs_nblocks)
page = InvalidBlockNumber; /* all blocks have been allocated */
else
page = (nallocated + pbscan->phs_startblock) % pbscan->phs_nblocks;
/*
* Report scan location. Normally, we report the current page number.
* When we reach the end of the scan, though, we report the starting page,
* not the ending page, just so the starting positions for later scans
* doesn't slew backwards. We only report the position at the end of the
* scan once, though: subsequent callers will report nothing.
*/
if (pbscan->base.phs_syncscan)
{
if (page != InvalidBlockNumber)
ss_report_location(rel, page);
else if (nallocated == pbscan->phs_nblocks)
ss_report_location(rel, pbscan->phs_startblock);
}
return page;
}


@@ -44,6 +44,26 @@ GetTableAmRoutine(Oid amhandler)
elog(ERROR, "Table access method handler %u did not return a TableAmRoutine struct",
amhandler);
/*
* Assert that all required callbacks are present. That makes it a bit
* easier to keep AMs up to date, e.g. when forward porting them to a new
* major version.
*/
Assert(routine->scan_begin != NULL);
Assert(routine->scan_end != NULL);
Assert(routine->scan_rescan != NULL);
Assert(routine->parallelscan_estimate != NULL);
Assert(routine->parallelscan_initialize != NULL);
Assert(routine->parallelscan_reinitialize != NULL);
Assert(routine->index_fetch_begin != NULL);
Assert(routine->index_fetch_reset != NULL);
Assert(routine->index_fetch_end != NULL);
Assert(routine->index_fetch_tuple != NULL);
Assert(routine->tuple_satisfies_snapshot != NULL);
return routine;
}
@@ -98,7 +118,7 @@ get_table_am_oid(const char *tableamname, bool missing_ok)
{
Oid result;
Relation rel;
HeapScanDesc scandesc;
TableScanDesc scandesc;
HeapTuple tuple;
ScanKeyData entry[1];
@@ -113,7 +133,7 @@ get_table_am_oid(const char *tableamname, bool missing_ok)
Anum_pg_am_amname,
BTEqualStrategyNumber, F_NAMEEQ,
CStringGetDatum(tableamname));
scandesc = heap_beginscan_catalog(rel, 1, entry);
scandesc = table_beginscan_catalog(rel, 1, entry);
tuple = heap_getnext(scandesc, ForwardScanDirection);
/* We assume that there can be at most one matching tuple */
@@ -123,7 +143,7 @@ get_table_am_oid(const char *tableamname, bool missing_ok)
else
result = InvalidOid;
heap_endscan(scandesc);
table_endscan(scandesc);
heap_close(rel, AccessShareLock);
if (!OidIsValid(result) && !missing_ok)


@@ -180,7 +180,8 @@ static BlockNumber
system_nextsampleblock(SampleScanState *node)
{
SystemSamplerData *sampler = (SystemSamplerData *) node->tsm_state;
HeapScanDesc scan = node->ss.ss_currentScanDesc;
TableScanDesc scan = node->ss.ss_currentScanDesc;
HeapScanDesc hscan = (HeapScanDesc) scan;
BlockNumber nextblock = sampler->nextblock;
uint32 hashinput[2];
@@ -199,7 +200,7 @@ system_nextsampleblock(SampleScanState *node)
* Loop over block numbers until finding suitable block or reaching end of
* relation.
*/
for (; nextblock < scan->rs_nblocks; nextblock++)
for (; nextblock < hscan->rs_nblocks; nextblock++)
{
uint32 hash;
@@ -211,7 +212,7 @@ system_nextsampleblock(SampleScanState *node)
break;
}
if (nextblock < scan->rs_nblocks)
if (nextblock < hscan->rs_nblocks)
{
/* Found a suitable block; remember where we should start next time */
sampler->nextblock = nextblock + 1;