1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-02 09:02:37 +03:00

Get rid of ReferentialIntegritySnapshotOverride by extending Executor API

to allow es_snapshot to be set to SnapshotNow rather than a query snapshot.
This solves a bug reported by Wade Klaver, wherein triggers fired as a
result of RI cascade updates could misbehave.
This commit is contained in:
Tom Lane
2003-09-25 18:58:36 +00:00
parent 7ab5c5b83e
commit c63a5452d8
16 changed files with 101 additions and 94 deletions

View File

@ -26,7 +26,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/execMain.c,v 1.218 2003/09/25 06:57:59 petere Exp $
* $Header: /cvsroot/pgsql/src/backend/executor/execMain.c,v 1.219 2003/09/25 18:58:35 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -104,6 +104,9 @@ static void EvalPlanQualStop(evalPlanQual *epq);
* field of the QueryDesc is filled in to describe the tuples that will be
* returned, and the internal fields (estate and planstate) are set up.
*
* If useSnapshotNow is true, run the query with SnapshotNow time qual rules
* instead of the normal use of QuerySnapshot.
*
* If explainOnly is true, we are not actually intending to run the plan,
* only to set up for EXPLAIN; so skip unwanted side-effects.
*
@ -112,7 +115,7 @@ static void EvalPlanQualStop(evalPlanQual *epq);
* ----------------------------------------------------------------
*/
void
ExecutorStart(QueryDesc *queryDesc, bool explainOnly)
ExecutorStart(QueryDesc *queryDesc, bool useSnapshotNow, bool explainOnly)
{
EState *estate;
MemoryContext oldcontext;
@ -154,7 +157,16 @@ ExecutorStart(QueryDesc *queryDesc, bool explainOnly)
* the life of this query, even if it outlives the current command and
* current snapshot.
*/
estate->es_snapshot = CopyQuerySnapshot();
if (useSnapshotNow)
{
estate->es_snapshot = SnapshotNow;
estate->es_snapshot_cid = GetCurrentCommandId();
}
else
{
estate->es_snapshot = CopyQuerySnapshot();
estate->es_snapshot_cid = estate->es_snapshot->curcid;
}
/*
* Initialize the plan state tree
@ -1106,7 +1118,7 @@ lnext: ;
tuple.t_self = *((ItemPointer) DatumGetPointer(datum));
test = heap_mark4update(erm->relation, &tuple, &buffer,
estate->es_snapshot->curcid);
estate->es_snapshot_cid);
ReleaseBuffer(buffer);
switch (test)
{
@ -1266,7 +1278,7 @@ ExecSelect(TupleTableSlot *slot,
if (estate->es_into_relation_descriptor != NULL)
{
heap_insert(estate->es_into_relation_descriptor, tuple,
estate->es_snapshot->curcid);
estate->es_snapshot_cid);
IncrAppended();
}
@ -1342,7 +1354,7 @@ ExecInsert(TupleTableSlot *slot,
* insert the tuple
*/
newId = heap_insert(resultRelationDesc, tuple,
estate->es_snapshot->curcid);
estate->es_snapshot_cid);
IncrAppended();
(estate->es_processed)++;
@ -1394,7 +1406,7 @@ ExecDelete(TupleTableSlot *slot,
bool dodelete;
dodelete = ExecBRDeleteTriggers(estate, resultRelInfo, tupleid,
estate->es_snapshot->curcid);
estate->es_snapshot_cid);
if (!dodelete) /* "do nothing" */
return;
@ -1406,7 +1418,7 @@ ExecDelete(TupleTableSlot *slot,
ldelete:;
result = heap_delete(resultRelationDesc, tupleid,
&ctid,
estate->es_snapshot->curcid,
estate->es_snapshot_cid,
true /* wait for commit */);
switch (result)
{
@ -1505,7 +1517,7 @@ ExecUpdate(TupleTableSlot *slot,
newtuple = ExecBRUpdateTriggers(estate, resultRelInfo,
tupleid, tuple,
estate->es_snapshot->curcid);
estate->es_snapshot_cid);
if (newtuple == NULL) /* "do nothing" */
return;
@ -1541,7 +1553,7 @@ lreplace:;
*/
result = heap_update(resultRelationDesc, tupleid, tuple,
&ctid,
estate->es_snapshot->curcid,
estate->es_snapshot_cid,
true /* wait for commit */);
switch (result)
{
@ -2027,6 +2039,7 @@ EvalPlanQualStart(evalPlanQual *epq, EState *estate, evalPlanQual *priorepq)
*/
epqstate->es_direction = ForwardScanDirection;
epqstate->es_snapshot = estate->es_snapshot;
epqstate->es_snapshot_cid = estate->es_snapshot_cid;
epqstate->es_range_table = estate->es_range_table;
epqstate->es_result_relations = estate->es_result_relations;
epqstate->es_num_result_relations = estate->es_num_result_relations;

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/execUtils.c,v 1.104 2003/09/24 18:54:01 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/executor/execUtils.c,v 1.105 2003/09/25 18:58:35 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -178,6 +178,7 @@ CreateExecutorState(void)
*/
estate->es_direction = ForwardScanDirection;
estate->es_snapshot = SnapshotNow;
estate->es_snapshot_cid = FirstCommandId;
estate->es_range_table = NIL;
estate->es_result_relations = NULL;

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/functions.c,v 1.74 2003/09/25 06:57:59 petere Exp $
* $Header: /cvsroot/pgsql/src/backend/executor/functions.c,v 1.75 2003/09/25 18:58:35 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -291,7 +291,7 @@ postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
/* Utility commands don't need Executor. */
if (es->qd->operation != CMD_UTILITY)
ExecutorStart(es->qd, false);
ExecutorStart(es->qd, false, false);
es->status = F_EXEC_RUN;
}

View File

@ -7,7 +7,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/nodeSubplan.c,v 1.56 2003/09/25 06:57:59 petere Exp $
* $Header: /cvsroot/pgsql/src/backend/executor/nodeSubplan.c,v 1.57 2003/09/25 18:58:35 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -709,6 +709,7 @@ ExecInitSubPlan(SubPlanState *node, EState *estate)
sp_estate->es_tupleTable =
ExecCreateTupleTable(ExecCountSlotsNode(subplan->plan) + 10);
sp_estate->es_snapshot = estate->es_snapshot;
sp_estate->es_snapshot_cid = estate->es_snapshot_cid;
sp_estate->es_instrument = estate->es_instrument;
/*

View File

@ -12,7 +12,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/nodeSubqueryscan.c,v 1.20 2003/08/04 02:39:59 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/executor/nodeSubqueryscan.c,v 1.21 2003/09/25 18:58:35 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -177,6 +177,7 @@ ExecInitSubqueryScan(SubqueryScan *node, EState *estate)
sp_estate->es_tupleTable =
ExecCreateTupleTable(ExecCountSlotsNode(node->subplan) + 10);
sp_estate->es_snapshot = estate->es_snapshot;
sp_estate->es_snapshot_cid = estate->es_snapshot_cid;
sp_estate->es_instrument = estate->es_instrument;
/*

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/spi.c,v 1.105 2003/09/23 15:11:33 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/executor/spi.c,v 1.106 2003/09/25 18:58:35 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -32,10 +32,12 @@ static int _SPI_connected = -1;
static int _SPI_curid = -1;
static int _SPI_execute(const char *src, int tcount, _SPI_plan *plan);
static int _SPI_pquery(QueryDesc *queryDesc, bool runit, int tcount);
static int _SPI_pquery(QueryDesc *queryDesc, bool runit,
bool useSnapshotNow, int tcount);
static int _SPI_execute_plan(_SPI_plan *plan,
Datum *Values, const char *Nulls, int tcount);
Datum *Values, const char *Nulls,
bool useSnapshotNow, int tcount);
static void _SPI_cursor_operation(Portal portal, bool forward, int count,
DestReceiver *dest);
@ -236,7 +238,33 @@ SPI_execp(void *plan, Datum *Values, const char *Nulls, int tcount)
if (res < 0)
return res;
res = _SPI_execute_plan((_SPI_plan *) plan, Values, Nulls, tcount);
res = _SPI_execute_plan((_SPI_plan *) plan, Values, Nulls, false, tcount);
_SPI_end_call(true);
return res;
}
/*
* SPI_execp_now -- identical to SPI_execp, except that we use SnapshotNow
* instead of the normal QuerySnapshot. This is currently not documented
* in spi.sgml because it is only intended for use by RI triggers.
*/
int
SPI_execp_now(void *plan, Datum *Values, const char *Nulls, int tcount)
{
int res;
if (plan == NULL || tcount < 0)
return SPI_ERROR_ARGUMENT;
if (((_SPI_plan *) plan)->nargs > 0 && Values == NULL)
return SPI_ERROR_PARAM;
res = _SPI_begin_call(true);
if (res < 0)
return res;
res = _SPI_execute_plan((_SPI_plan *) plan, Values, Nulls, true, tcount);
_SPI_end_call(true);
return res;
@ -1068,7 +1096,7 @@ _SPI_execute(const char *src, int tcount, _SPI_plan *plan)
{
qdesc = CreateQueryDesc(queryTree, planTree, dest,
NULL, false);
res = _SPI_pquery(qdesc, true,
res = _SPI_pquery(qdesc, true, false,
queryTree->canSetTag ? tcount : 0);
if (res < 0)
return res;
@ -1078,7 +1106,7 @@ _SPI_execute(const char *src, int tcount, _SPI_plan *plan)
{
qdesc = CreateQueryDesc(queryTree, planTree, dest,
NULL, false);
res = _SPI_pquery(qdesc, false, 0);
res = _SPI_pquery(qdesc, false, false, 0);
if (res < 0)
return res;
}
@ -1096,7 +1124,7 @@ _SPI_execute(const char *src, int tcount, _SPI_plan *plan)
static int
_SPI_execute_plan(_SPI_plan *plan, Datum *Values, const char *Nulls,
int tcount)
bool useSnapshotNow, int tcount)
{
List *query_list_list = plan->qtlist;
List *plan_list = plan->ptlist;
@ -1167,7 +1195,7 @@ _SPI_execute_plan(_SPI_plan *plan, Datum *Values, const char *Nulls,
{
qdesc = CreateQueryDesc(queryTree, planTree, dest,
paramLI, false);
res = _SPI_pquery(qdesc, true,
res = _SPI_pquery(qdesc, true, useSnapshotNow,
queryTree->canSetTag ? tcount : 0);
if (res < 0)
return res;
@ -1180,7 +1208,7 @@ _SPI_execute_plan(_SPI_plan *plan, Datum *Values, const char *Nulls,
}
static int
_SPI_pquery(QueryDesc *queryDesc, bool runit, int tcount)
_SPI_pquery(QueryDesc *queryDesc, bool runit, bool useSnapshotNow, int tcount)
{
int operation = queryDesc->operation;
int res;
@ -1217,7 +1245,7 @@ _SPI_pquery(QueryDesc *queryDesc, bool runit, int tcount)
ResetUsage();
#endif
ExecutorStart(queryDesc, false);
ExecutorStart(queryDesc, useSnapshotNow, false);
ExecutorRun(queryDesc, ForwardScanDirection, (long) tcount);