mirror of
https://github.com/postgres/postgres.git
synced 2025-04-22 23:02:54 +03:00
Commit 9a3cebeaa changed things so that parallel workers didn't obtain any lock of their own on tables they access. That was clearly a bad idea, but I'd mistakenly supposed that it was the intended end result of the series of patches for simplifying the executor's lock management. Undo that change in relation_open(), and adjust ExecOpenScanRelation() so that it gets the correct lock if inside a parallel worker. In passing, clean up some more obsolete comments about when locks are acquired. Discussion: https://postgr.es/m/468c85d9-540e-66a2-1dde-fec2b741e688@lab.ntt.co.jp
327 lines
8.7 KiB
C
327 lines
8.7 KiB
C
/*-------------------------------------------------------------------------
|
|
*
|
|
* nodeSeqscan.c
|
|
* Support routines for sequential scans of relations.
|
|
*
|
|
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
|
|
* Portions Copyright (c) 1994, Regents of the University of California
|
|
*
|
|
*
|
|
* IDENTIFICATION
|
|
* src/backend/executor/nodeSeqscan.c
|
|
*
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
/*
 * INTERFACE ROUTINES
 *		ExecSeqScan				sequentially scans a relation.
 *		SeqNext					retrieves the next tuple in sequential order.
 *		ExecInitSeqScan			creates and initializes a seqscan node.
 *		ExecEndSeqScan			releases any storage allocated.
 *		ExecReScanSeqScan		rescans the relation.
 *
 *		ExecSeqScanEstimate		estimates DSM space needed for parallel scan
 *		ExecSeqScanInitializeDSM initialize DSM for parallel scan
 *		ExecSeqScanReInitializeDSM reinitialize DSM for fresh parallel scan
 *		ExecSeqScanInitializeWorker attach to DSM info in parallel worker
 */
|
|
#include "postgres.h"
|
|
|
|
#include "access/relscan.h"
|
|
#include "executor/execdebug.h"
|
|
#include "executor/nodeSeqscan.h"
|
|
#include "utils/rel.h"
|
|
|
|
static TupleTableSlot *SeqNext(SeqScanState *node);
|
|
|
|
/* ----------------------------------------------------------------
|
|
* Scan Support
|
|
* ----------------------------------------------------------------
|
|
*/
|
|
|
|
/* ----------------------------------------------------------------
|
|
* SeqNext
|
|
*
|
|
* This is a workhorse for ExecSeqScan
|
|
* ----------------------------------------------------------------
|
|
*/
|
|
static TupleTableSlot *
|
|
SeqNext(SeqScanState *node)
|
|
{
|
|
HeapTuple tuple;
|
|
HeapScanDesc scandesc;
|
|
EState *estate;
|
|
ScanDirection direction;
|
|
TupleTableSlot *slot;
|
|
|
|
/*
|
|
* get information from the estate and scan state
|
|
*/
|
|
scandesc = node->ss.ss_currentScanDesc;
|
|
estate = node->ss.ps.state;
|
|
direction = estate->es_direction;
|
|
slot = node->ss.ss_ScanTupleSlot;
|
|
|
|
if (scandesc == NULL)
|
|
{
|
|
/*
|
|
* We reach here if the scan is not parallel, or if we're serially
|
|
* executing a scan that was planned to be parallel.
|
|
*/
|
|
scandesc = heap_beginscan(node->ss.ss_currentRelation,
|
|
estate->es_snapshot,
|
|
0, NULL);
|
|
node->ss.ss_currentScanDesc = scandesc;
|
|
}
|
|
|
|
/*
|
|
* get the next tuple from the table
|
|
*/
|
|
tuple = heap_getnext(scandesc, direction);
|
|
|
|
/*
|
|
* save the tuple and the buffer returned to us by the access methods in
|
|
* our scan tuple slot and return the slot. Note: we pass 'false' because
|
|
* tuples returned by heap_getnext() are pointers onto disk pages and were
|
|
* not created with palloc() and so should not be pfree()'d. Note also
|
|
* that ExecStoreHeapTuple will increment the refcount of the buffer; the
|
|
* refcount will not be dropped until the tuple table slot is cleared.
|
|
*/
|
|
if (tuple)
|
|
ExecStoreBufferHeapTuple(tuple, /* tuple to store */
|
|
slot, /* slot to store in */
|
|
scandesc->rs_cbuf); /* buffer associated
|
|
* with this tuple */
|
|
else
|
|
ExecClearTuple(slot);
|
|
|
|
return slot;
|
|
}
|
|
|
|
/*
|
|
* SeqRecheck -- access method routine to recheck a tuple in EvalPlanQual
|
|
*/
|
|
static bool
|
|
SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
|
|
{
|
|
/*
|
|
* Note that unlike IndexScan, SeqScan never use keys in heap_beginscan
|
|
* (and this is very bad) - so, here we do not check are keys ok or not.
|
|
*/
|
|
return true;
|
|
}
|
|
|
|
/* ----------------------------------------------------------------
|
|
* ExecSeqScan(node)
|
|
*
|
|
* Scans the relation sequentially and returns the next qualifying
|
|
* tuple.
|
|
* We call the ExecScan() routine and pass it the appropriate
|
|
* access method functions.
|
|
* ----------------------------------------------------------------
|
|
*/
|
|
static TupleTableSlot *
|
|
ExecSeqScan(PlanState *pstate)
|
|
{
|
|
SeqScanState *node = castNode(SeqScanState, pstate);
|
|
|
|
return ExecScan(&node->ss,
|
|
(ExecScanAccessMtd) SeqNext,
|
|
(ExecScanRecheckMtd) SeqRecheck);
|
|
}
|
|
|
|
|
|
/* ----------------------------------------------------------------
|
|
* ExecInitSeqScan
|
|
* ----------------------------------------------------------------
|
|
*/
|
|
SeqScanState *
|
|
ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
|
|
{
|
|
SeqScanState *scanstate;
|
|
|
|
/*
|
|
* Once upon a time it was possible to have an outerPlan of a SeqScan, but
|
|
* not any more.
|
|
*/
|
|
Assert(outerPlan(node) == NULL);
|
|
Assert(innerPlan(node) == NULL);
|
|
|
|
/*
|
|
* create state structure
|
|
*/
|
|
scanstate = makeNode(SeqScanState);
|
|
scanstate->ss.ps.plan = (Plan *) node;
|
|
scanstate->ss.ps.state = estate;
|
|
scanstate->ss.ps.ExecProcNode = ExecSeqScan;
|
|
|
|
/*
|
|
* Miscellaneous initialization
|
|
*
|
|
* create expression context for node
|
|
*/
|
|
ExecAssignExprContext(estate, &scanstate->ss.ps);
|
|
|
|
/*
|
|
* open the scan relation
|
|
*/
|
|
scanstate->ss.ss_currentRelation =
|
|
ExecOpenScanRelation(estate,
|
|
node->scanrelid,
|
|
eflags);
|
|
|
|
/* and create slot with the appropriate rowtype */
|
|
ExecInitScanTupleSlot(estate, &scanstate->ss,
|
|
RelationGetDescr(scanstate->ss.ss_currentRelation));
|
|
|
|
/*
|
|
* Initialize result slot, type and projection.
|
|
*/
|
|
ExecInitResultTupleSlotTL(estate, &scanstate->ss.ps);
|
|
ExecAssignScanProjectionInfo(&scanstate->ss);
|
|
|
|
/*
|
|
* initialize child expressions
|
|
*/
|
|
scanstate->ss.ps.qual =
|
|
ExecInitQual(node->plan.qual, (PlanState *) scanstate);
|
|
|
|
return scanstate;
|
|
}
|
|
|
|
/* ----------------------------------------------------------------
|
|
* ExecEndSeqScan
|
|
*
|
|
* frees any storage allocated through C routines.
|
|
* ----------------------------------------------------------------
|
|
*/
|
|
void
|
|
ExecEndSeqScan(SeqScanState *node)
|
|
{
|
|
HeapScanDesc scanDesc;
|
|
|
|
/*
|
|
* get information from node
|
|
*/
|
|
scanDesc = node->ss.ss_currentScanDesc;
|
|
|
|
/*
|
|
* Free the exprcontext
|
|
*/
|
|
ExecFreeExprContext(&node->ss.ps);
|
|
|
|
/*
|
|
* clean out the tuple table
|
|
*/
|
|
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
|
|
ExecClearTuple(node->ss.ss_ScanTupleSlot);
|
|
|
|
/*
|
|
* close heap scan
|
|
*/
|
|
if (scanDesc != NULL)
|
|
heap_endscan(scanDesc);
|
|
}
|
|
|
|
/* ----------------------------------------------------------------
|
|
* Join Support
|
|
* ----------------------------------------------------------------
|
|
*/
|
|
|
|
/* ----------------------------------------------------------------
|
|
* ExecReScanSeqScan
|
|
*
|
|
* Rescans the relation.
|
|
* ----------------------------------------------------------------
|
|
*/
|
|
void
|
|
ExecReScanSeqScan(SeqScanState *node)
|
|
{
|
|
HeapScanDesc scan;
|
|
|
|
scan = node->ss.ss_currentScanDesc;
|
|
|
|
if (scan != NULL)
|
|
heap_rescan(scan, /* scan desc */
|
|
NULL); /* new scan keys */
|
|
|
|
ExecScanReScan((ScanState *) node);
|
|
}
|
|
|
|
/* ----------------------------------------------------------------
|
|
* Parallel Scan Support
|
|
* ----------------------------------------------------------------
|
|
*/
|
|
|
|
/* ----------------------------------------------------------------
|
|
* ExecSeqScanEstimate
|
|
*
|
|
* Compute the amount of space we'll need in the parallel
|
|
* query DSM, and inform pcxt->estimator about our needs.
|
|
* ----------------------------------------------------------------
|
|
*/
|
|
void
|
|
ExecSeqScanEstimate(SeqScanState *node,
|
|
ParallelContext *pcxt)
|
|
{
|
|
EState *estate = node->ss.ps.state;
|
|
|
|
node->pscan_len = heap_parallelscan_estimate(estate->es_snapshot);
|
|
shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
|
|
shm_toc_estimate_keys(&pcxt->estimator, 1);
|
|
}
|
|
|
|
/* ----------------------------------------------------------------
|
|
* ExecSeqScanInitializeDSM
|
|
*
|
|
* Set up a parallel heap scan descriptor.
|
|
* ----------------------------------------------------------------
|
|
*/
|
|
void
|
|
ExecSeqScanInitializeDSM(SeqScanState *node,
|
|
ParallelContext *pcxt)
|
|
{
|
|
EState *estate = node->ss.ps.state;
|
|
ParallelHeapScanDesc pscan;
|
|
|
|
pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
|
|
heap_parallelscan_initialize(pscan,
|
|
node->ss.ss_currentRelation,
|
|
estate->es_snapshot);
|
|
shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
|
|
node->ss.ss_currentScanDesc =
|
|
heap_beginscan_parallel(node->ss.ss_currentRelation, pscan);
|
|
}
|
|
|
|
/* ----------------------------------------------------------------
|
|
* ExecSeqScanReInitializeDSM
|
|
*
|
|
* Reset shared state before beginning a fresh scan.
|
|
* ----------------------------------------------------------------
|
|
*/
|
|
void
|
|
ExecSeqScanReInitializeDSM(SeqScanState *node,
|
|
ParallelContext *pcxt)
|
|
{
|
|
HeapScanDesc scan = node->ss.ss_currentScanDesc;
|
|
|
|
heap_parallelscan_reinitialize(scan->rs_parallel);
|
|
}
|
|
|
|
/* ----------------------------------------------------------------
|
|
* ExecSeqScanInitializeWorker
|
|
*
|
|
* Copy relevant information from TOC into planstate.
|
|
* ----------------------------------------------------------------
|
|
*/
|
|
void
|
|
ExecSeqScanInitializeWorker(SeqScanState *node,
|
|
ParallelWorkerContext *pwcxt)
|
|
{
|
|
ParallelHeapScanDesc pscan;
|
|
|
|
pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
|
|
node->ss.ss_currentScanDesc =
|
|
heap_beginscan_parallel(node->ss.ss_currentRelation, pscan);
|
|
}
|