diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index ef34421f1ca..07c73f39de8 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -4219,6 +4219,8 @@ afterTriggerInvokeEvents(AfterTriggerEventList *events, { rInfo = ExecGetTriggerResultRel(estate, evtshared->ats_relid); rel = rInfo->ri_RelationDesc; + /* Catch calls with insufficient relcache refcounting */ + Assert(!RelationHasReferenceCountZero(rel)); trigdesc = rInfo->ri_TrigDesc; finfo = rInfo->ri_TrigFunctions; instr = rInfo->ri_TrigInstrument; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 452e5f3df7f..6ba447ea974 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -136,6 +136,18 @@ typedef struct SlotErrCallbackArg int remote_attnum; } SlotErrCallbackArg; +typedef struct ApplyExecutionData +{ + EState *estate; /* executor state, used to track resources */ + + LogicalRepRelMapEntry *targetRel; /* replication target rel */ + ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */ + + /* These fields are used when the target relation is partitioned: */ + ModifyTableState *mtstate; /* dummy ModifyTable state */ + PartitionTupleRouting *proute; /* partition routing info */ +} ApplyExecutionData; + /* * Stream xid hash entry. Whenever we see a new xid we create this entry in the * xidhash and along with it create the streaming file and store the fileset handle. @@ -226,24 +238,23 @@ static void apply_dispatch(StringInfo s); static void apply_handle_commit_internal(StringInfo s, LogicalRepCommitData *commit_data); -static void apply_handle_insert_internal(ResultRelInfo *relinfo, - EState *estate, TupleTableSlot *remoteslot); -static void apply_handle_update_internal(ResultRelInfo *relinfo, - EState *estate, TupleTableSlot *remoteslot, - LogicalRepTupleData *newtup, - LogicalRepRelMapEntry *relmapentry); -static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate, +static void apply_handle_insert_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo, + TupleTableSlot *remoteslot); +static void apply_handle_update_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo, TupleTableSlot *remoteslot, - LogicalRepRelation *remoterel); + LogicalRepTupleData *newtup); +static void apply_handle_delete_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo, + TupleTableSlot *remoteslot); static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot); -static void apply_handle_tuple_routing(ResultRelInfo *relinfo, - EState *estate, +static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, - LogicalRepRelMapEntry *relmapentry, CmdType operation); /* @@ -336,27 +347,29 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) /* * Executor state preparation for evaluation of constraint expressions, - * indexes and triggers. + * indexes and triggers for the specified relation. * - * resultRelInfo is a ResultRelInfo for the relation to be passed to the - * executor routines. The caller must open and close any indexes to be - * updated independently of the relation registered here. + * Note that the caller must open and close any indexes to be updated. */ -static EState * -create_estate_for_relation(LogicalRepRelMapEntry *rel, - ResultRelInfo **resultRelInfo) +static ApplyExecutionData * +create_edata_for_relation(LogicalRepRelMapEntry *rel) { + ApplyExecutionData *edata; EState *estate; RangeTblEntry *rte; + ResultRelInfo *resultRelInfo; /* * Input functions may need an active snapshot, as may AFTER triggers - * invoked during finish_estate. For safety, ensure an active snapshot + * invoked during finish_edata. For safety, ensure an active snapshot * exists throughout all our usage of the executor. */ PushActiveSnapshot(GetTransactionSnapshot()); - estate = CreateExecutorState(); + edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData)); + edata->targetRel = rel; + + edata->estate = estate = CreateExecutorState(); rte = makeNode(RangeTblEntry); rte->rtekind = RTE_RELATION; @@ -365,48 +378,62 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel, rte->rellockmode = AccessShareLock; ExecInitRangeTable(estate, list_make1(rte)); - *resultRelInfo = makeNode(ResultRelInfo); + edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo); /* * Use Relation opened by logicalrep_rel_open() instead of opening it * again. */ - InitResultRelInfo(*resultRelInfo, rel->localrel, 1, NULL, 0); + InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0); /* * We put the ResultRelInfo in the es_opened_result_relations list, even * though we don't populate the es_result_relations array. That's a bit * bogus, but it's enough to make ExecGetTriggerResultRel() find them. - * Also, because we did not open the Relation ourselves here, there is no - * need to worry about closing it. * * ExecOpenIndices() is not called here either, each execution path doing * an apply operation being responsible for that. */ estate->es_opened_result_relations = - lappend(estate->es_opened_result_relations, *resultRelInfo); + lappend(estate->es_opened_result_relations, resultRelInfo); estate->es_output_cid = GetCurrentCommandId(true); /* Prepare to catch AFTER triggers. */ AfterTriggerBeginQuery(); - return estate; + /* other fields of edata remain NULL for now */ + + return edata; } /* * Finish any operations related to the executor state created by - * create_estate_for_relation(). + * create_edata_for_relation(). */ static void -finish_estate(EState *estate) +finish_edata(ApplyExecutionData *edata) { + EState *estate = edata->estate; + /* Handle any queued AFTER triggers. */ AfterTriggerEndQuery(estate); - /* Cleanup. */ + /* Shut down tuple routing, if any was done. */ + if (edata->proute) + ExecCleanupTupleRouting(edata->mtstate, edata->proute); + + /* + * Cleanup. It might seem that we should call ExecCloseResultRelations() + * here, but we intentionally don't. It would close the rel we added to + * es_opened_result_relations above, which is wrong because we took no + * corresponding refcount. We rely on ExecCleanupTupleRouting() to close + * any other relations opened during execution. + */ ExecResetTupleTable(estate->es_tupleTable, false); FreeExecutorState(estate); + pfree(edata); + PopActiveSnapshot(); } @@ -1189,10 +1216,10 @@ GetRelationIdentityOrPK(Relation rel) static void apply_handle_insert(StringInfo s) { - ResultRelInfo *resultRelInfo; LogicalRepRelMapEntry *rel; LogicalRepTupleData newtup; LogicalRepRelId relid; + ApplyExecutionData *edata; EState *estate; TupleTableSlot *remoteslot; MemoryContext oldctx; @@ -1215,7 +1242,8 @@ apply_handle_insert(StringInfo s) } /* Initialize the executor state. */ - estate = create_estate_for_relation(rel, &resultRelInfo); + edata = create_edata_for_relation(rel); + estate = edata->estate; remoteslot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel->localrel), &TTSOpsVirtual); @@ -1228,24 +1256,32 @@ apply_handle_insert(StringInfo s) /* For a partitioned table, insert the tuple into a partition. */ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - apply_handle_tuple_routing(resultRelInfo, estate, - remoteslot, NULL, rel, CMD_INSERT); + apply_handle_tuple_routing(edata, + remoteslot, NULL, CMD_INSERT); else - apply_handle_insert_internal(resultRelInfo, estate, + apply_handle_insert_internal(edata, edata->targetRelInfo, remoteslot); - finish_estate(estate); + finish_edata(edata); logicalrep_rel_close(rel, NoLock); CommandCounterIncrement(); } -/* Workhorse for apply_handle_insert() */ +/* + * Workhorse for apply_handle_insert() + * relinfo is for the relation we're actually inserting into + * (could be a child partition of edata->targetRelInfo) + */ static void -apply_handle_insert_internal(ResultRelInfo *relinfo, - EState *estate, TupleTableSlot *remoteslot) +apply_handle_insert_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo, + TupleTableSlot *remoteslot) { + EState *estate = edata->estate; + + /* We must open indexes here. */ ExecOpenIndices(relinfo, false); /* Do the insert. */ @@ -1296,9 +1332,9 @@ check_relation_updatable(LogicalRepRelMapEntry *rel) static void apply_handle_update(StringInfo s) { - ResultRelInfo *resultRelInfo; LogicalRepRelMapEntry *rel; LogicalRepRelId relid; + ApplyExecutionData *edata; EState *estate; LogicalRepTupleData oldtup; LogicalRepTupleData newtup; @@ -1329,7 +1365,8 @@ apply_handle_update(StringInfo s) check_relation_updatable(rel); /* Initialize the executor state. */ - estate = create_estate_for_relation(rel, &resultRelInfo); + edata = create_edata_for_relation(rel); + estate = edata->estate; remoteslot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel->localrel), &TTSOpsVirtual); @@ -1369,26 +1406,32 @@ apply_handle_update(StringInfo s) /* For a partitioned table, apply update to correct partition. */ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - apply_handle_tuple_routing(resultRelInfo, estate, - remoteslot, &newtup, rel, CMD_UPDATE); + apply_handle_tuple_routing(edata, + remoteslot, &newtup, CMD_UPDATE); else - apply_handle_update_internal(resultRelInfo, estate, - remoteslot, &newtup, rel); + apply_handle_update_internal(edata, edata->targetRelInfo, + remoteslot, &newtup); - finish_estate(estate); + finish_edata(edata); logicalrep_rel_close(rel, NoLock); CommandCounterIncrement(); } -/* Workhorse for apply_handle_update() */ +/* + * Workhorse for apply_handle_update() + * relinfo is for the relation we're actually updating in + * (could be a child partition of edata->targetRelInfo) + */ static void -apply_handle_update_internal(ResultRelInfo *relinfo, - EState *estate, TupleTableSlot *remoteslot, - LogicalRepTupleData *newtup, - LogicalRepRelMapEntry *relmapentry) +apply_handle_update_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo, + TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup) { + EState *estate = edata->estate; + LogicalRepRelMapEntry *relmapentry = edata->targetRel; Relation localrel = relinfo->ri_RelationDesc; EPQState epqstate; TupleTableSlot *localslot; @@ -1447,10 +1490,10 @@ apply_handle_update_internal(ResultRelInfo *relinfo, static void apply_handle_delete(StringInfo s) { - ResultRelInfo *resultRelInfo; LogicalRepRelMapEntry *rel; LogicalRepTupleData oldtup; LogicalRepRelId relid; + ApplyExecutionData *edata; EState *estate; TupleTableSlot *remoteslot; MemoryContext oldctx; @@ -1476,7 +1519,8 @@ apply_handle_delete(StringInfo s) check_relation_updatable(rel); /* Initialize the executor state. */ - estate = create_estate_for_relation(rel, &resultRelInfo); + edata = create_edata_for_relation(rel); + estate = edata->estate; remoteslot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel->localrel), &TTSOpsVirtual); @@ -1488,26 +1532,32 @@ apply_handle_delete(StringInfo s) /* For a partitioned table, apply delete to correct partition. */ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - apply_handle_tuple_routing(resultRelInfo, estate, - remoteslot, NULL, rel, CMD_DELETE); + apply_handle_tuple_routing(edata, + remoteslot, NULL, CMD_DELETE); else - apply_handle_delete_internal(resultRelInfo, estate, - remoteslot, &rel->remoterel); + apply_handle_delete_internal(edata, edata->targetRelInfo, + remoteslot); - finish_estate(estate); + finish_edata(edata); logicalrep_rel_close(rel, NoLock); CommandCounterIncrement(); } -/* Workhorse for apply_handle_delete() */ +/* + * Workhorse for apply_handle_delete() + * relinfo is for the relation we're actually deleting from + * (could be a child partition of edata->targetRelInfo) + */ static void -apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate, - TupleTableSlot *remoteslot, - LogicalRepRelation *remoterel) +apply_handle_delete_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo, + TupleTableSlot *remoteslot) { + EState *estate = edata->estate; Relation localrel = relinfo->ri_RelationDesc; + LogicalRepRelation *remoterel = &edata->targetRel->remoterel; EPQState epqstate; TupleTableSlot *localslot; bool found; @@ -1577,16 +1627,17 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel, * This handles insert, update, delete on a partitioned table. */ static void -apply_handle_tuple_routing(ResultRelInfo *relinfo, - EState *estate, +apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, - LogicalRepRelMapEntry *relmapentry, CmdType operation) { + EState *estate = edata->estate; + LogicalRepRelMapEntry *relmapentry = edata->targetRel; + ResultRelInfo *relinfo = edata->targetRelInfo; Relation parentrel = relinfo->ri_RelationDesc; - ModifyTableState *mtstate = NULL; - PartitionTupleRouting *proute = NULL; + ModifyTableState *mtstate; + PartitionTupleRouting *proute; ResultRelInfo *partrelinfo; Relation partrel; TupleTableSlot *remoteslot_part; @@ -1594,12 +1645,14 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo, MemoryContext oldctx; /* ModifyTableState is needed for ExecFindPartition(). */ - mtstate = makeNode(ModifyTableState); + edata->mtstate = mtstate = makeNode(ModifyTableState); mtstate->ps.plan = NULL; mtstate->ps.state = estate; mtstate->operation = operation; mtstate->resultRelInfo = relinfo; - proute = ExecSetupPartitionTupleRouting(estate, parentrel); + + /* ... as is PartitionTupleRouting. */ + edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel); /* * Find the partition to which the "search tuple" belongs. @@ -1633,14 +1686,13 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo, switch (operation) { case CMD_INSERT: - apply_handle_insert_internal(partrelinfo, estate, + apply_handle_insert_internal(edata, partrelinfo, remoteslot_part); break; case CMD_DELETE: - apply_handle_delete_internal(partrelinfo, estate, - remoteslot_part, - &relmapentry->remoterel); + apply_handle_delete_internal(edata, partrelinfo, + remoteslot_part); break; case CMD_UPDATE: @@ -1752,9 +1804,8 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo, Assert(partrelinfo_new != partrelinfo); /* DELETE old tuple found in the old partition. */ - apply_handle_delete_internal(partrelinfo, estate, - localslot, - &relmapentry->remoterel); + apply_handle_delete_internal(edata, partrelinfo, + localslot); /* INSERT new tuple into the new partition. */ @@ -1782,7 +1833,7 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo, slot_getallattrs(remoteslot); } MemoryContextSwitchTo(oldctx); - apply_handle_insert_internal(partrelinfo_new, estate, + apply_handle_insert_internal(edata, partrelinfo_new, remoteslot_part); } } @@ -1792,8 +1843,6 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo, elog(ERROR, "unrecognized CmdType: %d", (int) operation); break; } - - ExecCleanupTupleRouting(mtstate, proute); } /*