1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-30 21:42:05 +03:00

Revert "Rewrite some RI code to avoid using SPI"

This reverts commit 99392cdd78.
We'd rather rewrite ri_triggers.c as a whole rather than piecemeal.

Discussion: https://postgr.es/m/E1ncXX2-000mFt-Pe@gemulon.postgresql.org
This commit is contained in:
Alvaro Herrera
2022-04-07 23:42:13 +02:00
parent 3e707fbb40
commit a90641eac2
7 changed files with 317 additions and 605 deletions

View File

@ -176,9 +176,8 @@ static void FormPartitionKeyDatum(PartitionDispatch pd,
EState *estate,
Datum *values,
bool *isnull);
static int get_partition_for_tuple(PartitionKey key,
PartitionDesc partdesc,
Datum *values, bool *isnull);
static int get_partition_for_tuple(PartitionDispatch pd, Datum *values,
bool *isnull);
static char *ExecBuildSlotPartitionKeyDescription(Relation rel,
Datum *values,
bool *isnull,
@ -319,9 +318,7 @@ ExecFindPartition(ModifyTableState *mtstate,
* these values, error out.
*/
if (partdesc->nparts == 0 ||
(partidx = get_partition_for_tuple(dispatch->key,
dispatch->partdesc,
values, isnull)) < 0)
(partidx = get_partition_for_tuple(dispatch, values, isnull)) < 0)
{
char *val_desc;
@ -1344,12 +1341,12 @@ FormPartitionKeyDatum(PartitionDispatch pd,
* found or -1 if none found.
*/
static int
get_partition_for_tuple(PartitionKey key,
PartitionDesc partdesc,
Datum *values, bool *isnull)
get_partition_for_tuple(PartitionDispatch pd, Datum *values, bool *isnull)
{
int bound_offset;
int part_index = -1;
PartitionKey key = pd->key;
PartitionDesc partdesc = pd->partdesc;
PartitionBoundInfo boundinfo = partdesc->boundinfo;
/* Route as appropriate based on partitioning strategy. */
@ -1441,165 +1438,6 @@ get_partition_for_tuple(PartitionKey key,
return part_index;
}
/*
* ExecGetLeafPartitionForKey
* Finds the leaf partition of partitioned table 'root_rel' that would
* contain the specified key tuple.
*
* A subset of the table's columns (including all of the partition key columns)
* must be specified:
* - 'key_natts' indicats the number of columns contained in the key
* - 'key_attnums' indicates their attribute numbers as defined in 'root_rel'
* - 'key_vals' and 'key_nulls' specify the key tuple
*
* Returns the leaf partition, locked with the given lockmode, or NULL if
* there isn't one. Caller is responsibly for closing it. All intermediate
* partitions are also locked with the same lockmode. Caller must have locked
* the root already.
*
* In addition, the OID of the index of a unique constraint on the root table
* must be given as 'root_idxoid'; *leaf_idxoid will be set to the OID of the
* corresponding index on the returned leaf partition. (This can be used by
* caller to search for a tuple matching the key in the leaf partition.)
*
* This works because the unique key defined on the root relation is required
* to contain the partition key columns of all of the ancestors that lead up to
* a given leaf partition.
*/
Relation
ExecGetLeafPartitionForKey(Relation root_rel, int key_natts,
const AttrNumber *key_attnums,
Datum *key_vals, char *key_nulls,
Oid root_idxoid, int lockmode,
Oid *leaf_idxoid)
{
Relation found_leafpart = NULL;
Relation rel = root_rel;
Oid constr_idxoid = root_idxoid;
PartitionDirectory partdir;
Assert(root_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
*leaf_idxoid = InvalidOid;
partdir = CreatePartitionDirectory(CurrentMemoryContext, true);
/*
* Descend through partitioned parents to find the leaf partition that
* would accept a row with the provided key values, starting with the root
* parent.
*/
for (;;)
{
PartitionKey partkey = RelationGetPartitionKey(rel);
PartitionDesc partdesc;
Datum partkey_vals[PARTITION_MAX_KEYS];
bool partkey_isnull[PARTITION_MAX_KEYS];
AttrNumber *root_partattrs = partkey->partattrs;
int found_att;
int partidx;
Oid partoid;
CHECK_FOR_INTERRUPTS();
/*
* Collect partition key values from the unique key.
*
* Because we only have the root table's copy of pk_attnums, must map
* any non-root table's partition key attribute numbers to the root
* table's.
*/
if (rel != root_rel)
{
/*
* map->attnums will contain root table attribute numbers for each
* attribute of the current partitioned relation.
*/
AttrMap *map;
map = build_attrmap_by_name_if_req(RelationGetDescr(root_rel),
RelationGetDescr(rel));
if (map)
{
root_partattrs = palloc(partkey->partnatts *
sizeof(AttrNumber));
for (int att = 0; att < partkey->partnatts; att++)
{
AttrNumber partattno = partkey->partattrs[att];
root_partattrs[att] = map->attnums[partattno - 1];
}
free_attrmap(map);
}
}
/*
* Map the values/isnulls to match the partition description, as
* necessary.
*
* (Referenced key specification does not allow expressions, so there
* would not be expressions in the partition keys either.)
*/
Assert(partkey->partexprs == NIL);
found_att = 0;
for (int keyatt = 0; keyatt < key_natts; keyatt++)
{
for (int att = 0; att < partkey->partnatts; att++)
{
if (root_partattrs[att] == key_attnums[keyatt])
{
partkey_vals[found_att] = key_vals[keyatt];
partkey_isnull[found_att] = (key_nulls[keyatt] == 'n');
found_att++;
break;
}
}
}
/* We had better have found values for all partition keys */
Assert(found_att == partkey->partnatts);
if (root_partattrs != partkey->partattrs)
pfree(root_partattrs);
/* Get the PartitionDesc using the partition directory machinery. */
partdesc = PartitionDirectoryLookup(partdir, rel);
if (partdesc->nparts == 0)
break;
/* Find the partition for the key. */
partidx = get_partition_for_tuple(partkey, partdesc,
partkey_vals, partkey_isnull);
Assert(partidx < 0 || partidx < partdesc->nparts);
/* close the previous parent if any, but keep lock */
if (rel != root_rel)
table_close(rel, NoLock);
/* No partition found. */
if (partidx < 0)
break;
partoid = partdesc->oids[partidx];
rel = table_open(partoid, lockmode);
constr_idxoid = index_get_partition(rel, constr_idxoid);
/*
* We're done if the partition is a leaf, else find its partition in
* the next iteration.
*/
if (partdesc->is_leaf[partidx])
{
*leaf_idxoid = constr_idxoid;
found_leafpart = rel;
break;
}
}
DestroyPartitionDirectory(partdir);
return found_leafpart;
}
/*
* ExecBuildSlotPartitionKeyDescription
*

View File

@ -79,7 +79,10 @@ lnext:
Datum datum;
bool isNull;
ItemPointerData tid;
TM_FailureData tmfd;
LockTupleMode lockmode;
int lockflags = 0;
TM_Result test;
TupleTableSlot *markSlot;
/* clear any leftover test tuple for this rel */
@ -176,11 +179,74 @@ lnext:
break;
}
/* skip tuple if it couldn't be locked */
if (!ExecLockTableTuple(erm->relation, &tid, markSlot,
estate->es_snapshot, estate->es_output_cid,
lockmode, erm->waitPolicy, &epq_needed))
goto lnext;
lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
if (!IsolationUsesXactSnapshot())
lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
test = table_tuple_lock(erm->relation, &tid, estate->es_snapshot,
markSlot, estate->es_output_cid,
lockmode, erm->waitPolicy,
lockflags,
&tmfd);
switch (test)
{
case TM_WouldBlock:
/* couldn't lock tuple in SKIP LOCKED mode */
goto lnext;
case TM_SelfModified:
/*
* The target tuple was already updated or deleted by the
* current command, or by a later command in the current
* transaction. We *must* ignore the tuple in the former
* case, so as to avoid the "Halloween problem" of repeated
* update attempts. In the latter case it might be sensible
* to fetch the updated tuple instead, but doing so would
* require changing heap_update and heap_delete to not
* complain about updating "invisible" tuples, which seems
* pretty scary (table_tuple_lock will not complain, but few
* callers expect TM_Invisible, and we're not one of them). So
* for now, treat the tuple as deleted and do not process.
*/
goto lnext;
case TM_Ok:
/*
* Got the lock successfully, the locked tuple saved in
* markSlot for, if needed, EvalPlanQual testing below.
*/
if (tmfd.traversed)
epq_needed = true;
break;
case TM_Updated:
if (IsolationUsesXactSnapshot())
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent update")));
elog(ERROR, "unexpected table_tuple_lock status: %u",
test);
break;
case TM_Deleted:
if (IsolationUsesXactSnapshot())
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent update")));
/* tuple was deleted so don't return it */
goto lnext;
case TM_Invisible:
elog(ERROR, "attempted to lock invisible tuple");
break;
default:
elog(ERROR, "unrecognized table_tuple_lock status: %u",
test);
}
/* Remember locked tuple's TID for EPQ testing and WHERE CURRENT OF */
erm->curCtid = tid;
@ -215,91 +281,6 @@ lnext:
return slot;
}
/*
* ExecLockTableTuple
* Locks tuple with the specified TID in lockmode following given wait
* policy
*
* Returns true if the tuple was successfully locked. Locked tuple is loaded
* into provided slot.
*/
bool
ExecLockTableTuple(Relation relation, ItemPointer tid, TupleTableSlot *slot,
Snapshot snapshot, CommandId cid,
LockTupleMode lockmode, LockWaitPolicy waitPolicy,
bool *epq_needed)
{
TM_FailureData tmfd;
int lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
TM_Result test;
if (!IsolationUsesXactSnapshot())
lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
test = table_tuple_lock(relation, tid, snapshot, slot, cid, lockmode,
waitPolicy, lockflags, &tmfd);
switch (test)
{
case TM_WouldBlock:
/* couldn't lock tuple in SKIP LOCKED mode */
return false;
case TM_SelfModified:
/*
* The target tuple was already updated or deleted by the current
* command, or by a later command in the current transaction. We
* *must* ignore the tuple in the former case, so as to avoid the
* "Halloween problem" of repeated update attempts. In the latter
* case it might be sensible to fetch the updated tuple instead,
* but doing so would require changing heap_update and heap_delete
* to not complain about updating "invisible" tuples, which seems
* pretty scary (table_tuple_lock will not complain, but few
* callers expect TM_Invisible, and we're not one of them). So for
* now, treat the tuple as deleted and do not process.
*/
return false;
case TM_Ok:
/*
* Got the lock successfully, the locked tuple saved in slot for
* EvalPlanQual, if asked by the caller.
*/
if (tmfd.traversed && epq_needed)
*epq_needed = true;
break;
case TM_Updated:
if (IsolationUsesXactSnapshot())
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent update")));
elog(ERROR, "unexpected table_tuple_lock status: %u",
test);
break;
case TM_Deleted:
if (IsolationUsesXactSnapshot())
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent update")));
/* tuple was deleted so don't return it */
return false;
case TM_Invisible:
elog(ERROR, "attempted to lock invisible tuple");
return false;
default:
elog(ERROR, "unrecognized table_tuple_lock status: %u", test);
return false;
}
return true;
}
/* ----------------------------------------------------------------
* ExecInitLockRows
*