mirror of
				https://github.com/postgres/postgres.git
				synced 2025-11-03 09:13:20 +03:00 
			
		
		
		
	Commit 2367503177 fixed this in RelationFindReplTupleByIndex(), but I
missed two other similar cases.
Per report from Ranier Vilela.
Discussion: https://www.postgresql.org/message-id/CAEudQArUT1dE45WN87F-Gb7XMy_hW6x1DFd3sqdhhxP-RMDa0Q@mail.gmail.com
Backpatch-through: 13
		
	
		
			
				
	
	
		
			712 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			712 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
/*-------------------------------------------------------------------------
 | 
						|
 *
 | 
						|
 * execReplication.c
 | 
						|
 *	  miscellaneous executor routines for logical replication
 | 
						|
 *
 | 
						|
 * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
 | 
						|
 * Portions Copyright (c) 1994, Regents of the University of California
 | 
						|
 *
 | 
						|
 * IDENTIFICATION
 | 
						|
 *	  src/backend/executor/execReplication.c
 | 
						|
 *
 | 
						|
 *-------------------------------------------------------------------------
 | 
						|
 */
 | 
						|
 | 
						|
#include "postgres.h"
 | 
						|
 | 
						|
#include "access/genam.h"
 | 
						|
#include "access/relscan.h"
 | 
						|
#include "access/tableam.h"
 | 
						|
#include "access/transam.h"
 | 
						|
#include "access/xact.h"
 | 
						|
#include "commands/trigger.h"
 | 
						|
#include "executor/executor.h"
 | 
						|
#include "executor/nodeModifyTable.h"
 | 
						|
#include "nodes/nodeFuncs.h"
 | 
						|
#include "parser/parse_relation.h"
 | 
						|
#include "parser/parsetree.h"
 | 
						|
#include "replication/logicalrelation.h"
 | 
						|
#include "storage/bufmgr.h"
 | 
						|
#include "storage/lmgr.h"
 | 
						|
#include "utils/builtins.h"
 | 
						|
#include "utils/datum.h"
 | 
						|
#include "utils/lsyscache.h"
 | 
						|
#include "utils/memutils.h"
 | 
						|
#include "utils/rel.h"
 | 
						|
#include "utils/snapmgr.h"
 | 
						|
#include "utils/syscache.h"
 | 
						|
#include "utils/typcache.h"
 | 
						|
 | 
						|
 | 
						|
static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
 | 
						|
						 TypeCacheEntry **eq);
 | 
						|
 | 
						|
/*
 | 
						|
 * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
 | 
						|
 * is setup to match 'rel' (*NOT* idxrel!).
 | 
						|
 *
 | 
						|
 * Returns how many columns to use for the index scan.
 | 
						|
 *
 | 
						|
 * This is not generic routine, idxrel must be PK, RI, or an index that can be
 | 
						|
 * used for REPLICA IDENTITY FULL table. See FindUsableIndexForReplicaIdentityFull()
 | 
						|
 * for details.
 | 
						|
 *
 | 
						|
 * By definition, replication identity of a rel meets all limitations associated
 | 
						|
 * with that. Note that any other index could also meet these limitations.
 | 
						|
 */
 | 
						|
static int
 | 
						|
build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 | 
						|
						 TupleTableSlot *searchslot)
 | 
						|
{
 | 
						|
	int			index_attoff;
 | 
						|
	int			skey_attoff = 0;
 | 
						|
	Datum		indclassDatum;
 | 
						|
	oidvector  *opclass;
 | 
						|
	int2vector *indkey = &idxrel->rd_index->indkey;
 | 
						|
 | 
						|
	indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
 | 
						|
										   Anum_pg_index_indclass);
 | 
						|
	opclass = (oidvector *) DatumGetPointer(indclassDatum);
 | 
						|
 | 
						|
	/* Build scankey for every non-expression attribute in the index. */
 | 
						|
	for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
 | 
						|
		 index_attoff++)
 | 
						|
	{
 | 
						|
		Oid			operator;
 | 
						|
		Oid			optype;
 | 
						|
		Oid			opfamily;
 | 
						|
		RegProcedure regop;
 | 
						|
		int			table_attno = indkey->values[index_attoff];
 | 
						|
 | 
						|
		if (!AttributeNumberIsValid(table_attno))
 | 
						|
		{
 | 
						|
			/*
 | 
						|
			 * XXX: Currently, we don't support expressions in the scan key,
 | 
						|
			 * see code below.
 | 
						|
			 */
 | 
						|
			continue;
 | 
						|
		}
 | 
						|
 | 
						|
		/*
 | 
						|
		 * Load the operator info.  We need this to get the equality operator
 | 
						|
		 * function for the scan key.
 | 
						|
		 */
 | 
						|
		optype = get_opclass_input_type(opclass->values[index_attoff]);
 | 
						|
		opfamily = get_opclass_family(opclass->values[index_attoff]);
 | 
						|
 | 
						|
		operator = get_opfamily_member(opfamily, optype,
 | 
						|
									   optype,
 | 
						|
									   BTEqualStrategyNumber);
 | 
						|
		if (!OidIsValid(operator))
 | 
						|
			elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
 | 
						|
				 BTEqualStrategyNumber, optype, optype, opfamily);
 | 
						|
 | 
						|
		regop = get_opcode(operator);
 | 
						|
 | 
						|
		/* Initialize the scankey. */
 | 
						|
		ScanKeyInit(&skey[skey_attoff],
 | 
						|
					index_attoff + 1,
 | 
						|
					BTEqualStrategyNumber,
 | 
						|
					regop,
 | 
						|
					searchslot->tts_values[table_attno - 1]);
 | 
						|
 | 
						|
		skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
 | 
						|
 | 
						|
		/* Check for null value. */
 | 
						|
		if (searchslot->tts_isnull[table_attno - 1])
 | 
						|
			skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
 | 
						|
 | 
						|
		skey_attoff++;
 | 
						|
	}
 | 
						|
 | 
						|
	/* There must always be at least one attribute for the index scan. */
 | 
						|
	Assert(skey_attoff > 0);
 | 
						|
 | 
						|
	return skey_attoff;
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
 * Search the relation 'rel' for tuple using the index.
 | 
						|
 *
 | 
						|
 * If a matching tuple is found, lock it with lockmode, fill the slot with its
 | 
						|
 * contents, and return true.  Return false otherwise.
 | 
						|
 */
 | 
						|
bool
 | 
						|
RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 | 
						|
							 LockTupleMode lockmode,
 | 
						|
							 TupleTableSlot *searchslot,
 | 
						|
							 TupleTableSlot *outslot)
 | 
						|
{
 | 
						|
	ScanKeyData skey[INDEX_MAX_KEYS];
 | 
						|
	int			skey_attoff;
 | 
						|
	IndexScanDesc scan;
 | 
						|
	SnapshotData snap;
 | 
						|
	TransactionId xwait;
 | 
						|
	Relation	idxrel;
 | 
						|
	bool		found;
 | 
						|
	TypeCacheEntry **eq = NULL;
 | 
						|
	bool		isIdxSafeToSkipDuplicates;
 | 
						|
 | 
						|
	/* Open the index. */
 | 
						|
	idxrel = index_open(idxoid, RowExclusiveLock);
 | 
						|
 | 
						|
	isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
 | 
						|
 | 
						|
	InitDirtySnapshot(snap);
 | 
						|
 | 
						|
	/* Build scan key. */
 | 
						|
	skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
 | 
						|
 | 
						|
	/* Start an index scan. */
 | 
						|
	scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
 | 
						|
 | 
						|
retry:
 | 
						|
	found = false;
 | 
						|
 | 
						|
	index_rescan(scan, skey, skey_attoff, NULL, 0);
 | 
						|
 | 
						|
	/* Try to find the tuple */
 | 
						|
	while (index_getnext_slot(scan, ForwardScanDirection, outslot))
 | 
						|
	{
 | 
						|
		/*
 | 
						|
		 * Avoid expensive equality check if the index is primary key or
 | 
						|
		 * replica identity index.
 | 
						|
		 */
 | 
						|
		if (!isIdxSafeToSkipDuplicates)
 | 
						|
		{
 | 
						|
			if (eq == NULL)
 | 
						|
				eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
 | 
						|
 | 
						|
			if (!tuples_equal(outslot, searchslot, eq))
 | 
						|
				continue;
 | 
						|
		}
 | 
						|
 | 
						|
		ExecMaterializeSlot(outslot);
 | 
						|
 | 
						|
		xwait = TransactionIdIsValid(snap.xmin) ?
 | 
						|
			snap.xmin : snap.xmax;
 | 
						|
 | 
						|
		/*
 | 
						|
		 * If the tuple is locked, wait for locking transaction to finish and
 | 
						|
		 * retry.
 | 
						|
		 */
 | 
						|
		if (TransactionIdIsValid(xwait))
 | 
						|
		{
 | 
						|
			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
 | 
						|
			goto retry;
 | 
						|
		}
 | 
						|
 | 
						|
		/* Found our tuple and it's not locked */
 | 
						|
		found = true;
 | 
						|
		break;
 | 
						|
	}
 | 
						|
 | 
						|
	/* Found tuple, try to lock it in the lockmode. */
 | 
						|
	if (found)
 | 
						|
	{
 | 
						|
		TM_FailureData tmfd;
 | 
						|
		TM_Result	res;
 | 
						|
 | 
						|
		PushActiveSnapshot(GetLatestSnapshot());
 | 
						|
 | 
						|
		res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
 | 
						|
							   outslot,
 | 
						|
							   GetCurrentCommandId(false),
 | 
						|
							   lockmode,
 | 
						|
							   LockWaitBlock,
 | 
						|
							   0 /* don't follow updates */ ,
 | 
						|
							   &tmfd);
 | 
						|
 | 
						|
		PopActiveSnapshot();
 | 
						|
 | 
						|
		switch (res)
 | 
						|
		{
 | 
						|
			case TM_Ok:
 | 
						|
				break;
 | 
						|
			case TM_Updated:
 | 
						|
				/* XXX: Improve handling here */
 | 
						|
				if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
 | 
						|
					ereport(LOG,
 | 
						|
							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 | 
						|
							 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
 | 
						|
				else
 | 
						|
					ereport(LOG,
 | 
						|
							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 | 
						|
							 errmsg("concurrent update, retrying")));
 | 
						|
				goto retry;
 | 
						|
			case TM_Deleted:
 | 
						|
				/* XXX: Improve handling here */
 | 
						|
				ereport(LOG,
 | 
						|
						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 | 
						|
						 errmsg("concurrent delete, retrying")));
 | 
						|
				goto retry;
 | 
						|
			case TM_Invisible:
 | 
						|
				elog(ERROR, "attempted to lock invisible tuple");
 | 
						|
				break;
 | 
						|
			default:
 | 
						|
				elog(ERROR, "unexpected table_tuple_lock status: %u", res);
 | 
						|
				break;
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	index_endscan(scan);
 | 
						|
 | 
						|
	/* Don't release lock until commit. */
 | 
						|
	index_close(idxrel, NoLock);
 | 
						|
 | 
						|
	return found;
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
 * Compare the tuples in the slots by checking if they have equal values.
 | 
						|
 */
 | 
						|
static bool
 | 
						|
tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
 | 
						|
			 TypeCacheEntry **eq)
 | 
						|
{
 | 
						|
	int			attrnum;
 | 
						|
 | 
						|
	Assert(slot1->tts_tupleDescriptor->natts ==
 | 
						|
		   slot2->tts_tupleDescriptor->natts);
 | 
						|
 | 
						|
	slot_getallattrs(slot1);
 | 
						|
	slot_getallattrs(slot2);
 | 
						|
 | 
						|
	/* Check equality of the attributes. */
 | 
						|
	for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
 | 
						|
	{
 | 
						|
		Form_pg_attribute att;
 | 
						|
		TypeCacheEntry *typentry;
 | 
						|
 | 
						|
		att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
 | 
						|
 | 
						|
		/*
 | 
						|
		 * Ignore dropped and generated columns as the publisher doesn't send
 | 
						|
		 * those
 | 
						|
		 */
 | 
						|
		if (att->attisdropped || att->attgenerated)
 | 
						|
			continue;
 | 
						|
 | 
						|
		/*
 | 
						|
		 * If one value is NULL and other is not, then they are certainly not
 | 
						|
		 * equal
 | 
						|
		 */
 | 
						|
		if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
 | 
						|
			return false;
 | 
						|
 | 
						|
		/*
 | 
						|
		 * If both are NULL, they can be considered equal.
 | 
						|
		 */
 | 
						|
		if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
 | 
						|
			continue;
 | 
						|
 | 
						|
		typentry = eq[attrnum];
 | 
						|
		if (typentry == NULL)
 | 
						|
		{
 | 
						|
			typentry = lookup_type_cache(att->atttypid,
 | 
						|
										 TYPECACHE_EQ_OPR_FINFO);
 | 
						|
			if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
 | 
						|
				ereport(ERROR,
 | 
						|
						(errcode(ERRCODE_UNDEFINED_FUNCTION),
 | 
						|
						 errmsg("could not identify an equality operator for type %s",
 | 
						|
								format_type_be(att->atttypid))));
 | 
						|
			eq[attrnum] = typentry;
 | 
						|
		}
 | 
						|
 | 
						|
		if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
 | 
						|
											att->attcollation,
 | 
						|
											slot1->tts_values[attrnum],
 | 
						|
											slot2->tts_values[attrnum])))
 | 
						|
			return false;
 | 
						|
	}
 | 
						|
 | 
						|
	return true;
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
 * Search the relation 'rel' for tuple using the sequential scan.
 | 
						|
 *
 | 
						|
 * If a matching tuple is found, lock it with lockmode, fill the slot with its
 | 
						|
 * contents, and return true.  Return false otherwise.
 | 
						|
 *
 | 
						|
 * Note that this stops on the first matching tuple.
 | 
						|
 *
 | 
						|
 * This can obviously be quite slow on tables that have more than few rows.
 | 
						|
 */
 | 
						|
bool
 | 
						|
RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 | 
						|
						 TupleTableSlot *searchslot, TupleTableSlot *outslot)
 | 
						|
{
 | 
						|
	TupleTableSlot *scanslot;
 | 
						|
	TableScanDesc scan;
 | 
						|
	SnapshotData snap;
 | 
						|
	TypeCacheEntry **eq;
 | 
						|
	TransactionId xwait;
 | 
						|
	bool		found;
 | 
						|
	TupleDesc	desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
 | 
						|
 | 
						|
	Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
 | 
						|
 | 
						|
	eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
 | 
						|
 | 
						|
	/* Start a heap scan. */
 | 
						|
	InitDirtySnapshot(snap);
 | 
						|
	scan = table_beginscan(rel, &snap, 0, NULL);
 | 
						|
	scanslot = table_slot_create(rel, NULL);
 | 
						|
 | 
						|
retry:
 | 
						|
	found = false;
 | 
						|
 | 
						|
	table_rescan(scan, NULL);
 | 
						|
 | 
						|
	/* Try to find the tuple */
 | 
						|
	while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
 | 
						|
	{
 | 
						|
		if (!tuples_equal(scanslot, searchslot, eq))
 | 
						|
			continue;
 | 
						|
 | 
						|
		found = true;
 | 
						|
		ExecCopySlot(outslot, scanslot);
 | 
						|
 | 
						|
		xwait = TransactionIdIsValid(snap.xmin) ?
 | 
						|
			snap.xmin : snap.xmax;
 | 
						|
 | 
						|
		/*
 | 
						|
		 * If the tuple is locked, wait for locking transaction to finish and
 | 
						|
		 * retry.
 | 
						|
		 */
 | 
						|
		if (TransactionIdIsValid(xwait))
 | 
						|
		{
 | 
						|
			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
 | 
						|
			goto retry;
 | 
						|
		}
 | 
						|
 | 
						|
		/* Found our tuple and it's not locked */
 | 
						|
		break;
 | 
						|
	}
 | 
						|
 | 
						|
	/* Found tuple, try to lock it in the lockmode. */
 | 
						|
	if (found)
 | 
						|
	{
 | 
						|
		TM_FailureData tmfd;
 | 
						|
		TM_Result	res;
 | 
						|
 | 
						|
		PushActiveSnapshot(GetLatestSnapshot());
 | 
						|
 | 
						|
		res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
 | 
						|
							   outslot,
 | 
						|
							   GetCurrentCommandId(false),
 | 
						|
							   lockmode,
 | 
						|
							   LockWaitBlock,
 | 
						|
							   0 /* don't follow updates */ ,
 | 
						|
							   &tmfd);
 | 
						|
 | 
						|
		PopActiveSnapshot();
 | 
						|
 | 
						|
		switch (res)
 | 
						|
		{
 | 
						|
			case TM_Ok:
 | 
						|
				break;
 | 
						|
			case TM_Updated:
 | 
						|
				/* XXX: Improve handling here */
 | 
						|
				if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
 | 
						|
					ereport(LOG,
 | 
						|
							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 | 
						|
							 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
 | 
						|
				else
 | 
						|
					ereport(LOG,
 | 
						|
							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 | 
						|
							 errmsg("concurrent update, retrying")));
 | 
						|
				goto retry;
 | 
						|
			case TM_Deleted:
 | 
						|
				/* XXX: Improve handling here */
 | 
						|
				ereport(LOG,
 | 
						|
						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 | 
						|
						 errmsg("concurrent delete, retrying")));
 | 
						|
				goto retry;
 | 
						|
			case TM_Invisible:
 | 
						|
				elog(ERROR, "attempted to lock invisible tuple");
 | 
						|
				break;
 | 
						|
			default:
 | 
						|
				elog(ERROR, "unexpected table_tuple_lock status: %u", res);
 | 
						|
				break;
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	table_endscan(scan);
 | 
						|
	ExecDropSingleTupleTableSlot(scanslot);
 | 
						|
 | 
						|
	return found;
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
 * Insert tuple represented in the slot to the relation, update the indexes,
 | 
						|
 * and execute any constraints and per-row triggers.
 | 
						|
 *
 | 
						|
 * Caller is responsible for opening the indexes.
 | 
						|
 */
 | 
						|
void
 | 
						|
ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 | 
						|
						 EState *estate, TupleTableSlot *slot)
 | 
						|
{
 | 
						|
	bool		skip_tuple = false;
 | 
						|
	Relation	rel = resultRelInfo->ri_RelationDesc;
 | 
						|
 | 
						|
	/* For now we support only tables. */
 | 
						|
	Assert(rel->rd_rel->relkind == RELKIND_RELATION);
 | 
						|
 | 
						|
	CheckCmdReplicaIdentity(rel, CMD_INSERT);
 | 
						|
 | 
						|
	/* BEFORE ROW INSERT Triggers */
 | 
						|
	if (resultRelInfo->ri_TrigDesc &&
 | 
						|
		resultRelInfo->ri_TrigDesc->trig_insert_before_row)
 | 
						|
	{
 | 
						|
		if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
 | 
						|
			skip_tuple = true;	/* "do nothing" */
 | 
						|
	}
 | 
						|
 | 
						|
	if (!skip_tuple)
 | 
						|
	{
 | 
						|
		List	   *recheckIndexes = NIL;
 | 
						|
 | 
						|
		/* Compute stored generated columns */
 | 
						|
		if (rel->rd_att->constr &&
 | 
						|
			rel->rd_att->constr->has_generated_stored)
 | 
						|
			ExecComputeStoredGenerated(resultRelInfo, estate, slot,
 | 
						|
									   CMD_INSERT);
 | 
						|
 | 
						|
		/* Check the constraints of the tuple */
 | 
						|
		if (rel->rd_att->constr)
 | 
						|
			ExecConstraints(resultRelInfo, slot, estate);
 | 
						|
		if (rel->rd_rel->relispartition)
 | 
						|
			ExecPartitionCheck(resultRelInfo, slot, estate, true);
 | 
						|
 | 
						|
		/* OK, store the tuple and create index entries for it */
 | 
						|
		simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
 | 
						|
 | 
						|
		if (resultRelInfo->ri_NumIndices > 0)
 | 
						|
			recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
 | 
						|
												   slot, estate, false, false,
 | 
						|
												   NULL, NIL, false);
 | 
						|
 | 
						|
		/* AFTER ROW INSERT Triggers */
 | 
						|
		ExecARInsertTriggers(estate, resultRelInfo, slot,
 | 
						|
							 recheckIndexes, NULL);
 | 
						|
 | 
						|
		/*
 | 
						|
		 * XXX we should in theory pass a TransitionCaptureState object to the
 | 
						|
		 * above to capture transition tuples, but after statement triggers
 | 
						|
		 * don't actually get fired by replication yet anyway
 | 
						|
		 */
 | 
						|
 | 
						|
		list_free(recheckIndexes);
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
 * Find the searchslot tuple and update it with data in the slot,
 | 
						|
 * update the indexes, and execute any constraints and per-row triggers.
 | 
						|
 *
 | 
						|
 * Caller is responsible for opening the indexes.
 | 
						|
 */
 | 
						|
void
 | 
						|
ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
 | 
						|
						 EState *estate, EPQState *epqstate,
 | 
						|
						 TupleTableSlot *searchslot, TupleTableSlot *slot)
 | 
						|
{
 | 
						|
	bool		skip_tuple = false;
 | 
						|
	Relation	rel = resultRelInfo->ri_RelationDesc;
 | 
						|
	ItemPointer tid = &(searchslot->tts_tid);
 | 
						|
 | 
						|
	/*
 | 
						|
	 * We support only non-system tables, with
 | 
						|
	 * check_publication_add_relation() accountable.
 | 
						|
	 */
 | 
						|
	Assert(rel->rd_rel->relkind == RELKIND_RELATION);
 | 
						|
	Assert(!IsCatalogRelation(rel));
 | 
						|
 | 
						|
	CheckCmdReplicaIdentity(rel, CMD_UPDATE);
 | 
						|
 | 
						|
	/* BEFORE ROW UPDATE Triggers */
 | 
						|
	if (resultRelInfo->ri_TrigDesc &&
 | 
						|
		resultRelInfo->ri_TrigDesc->trig_update_before_row)
 | 
						|
	{
 | 
						|
		if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
 | 
						|
								  tid, NULL, slot, NULL, NULL))
 | 
						|
			skip_tuple = true;	/* "do nothing" */
 | 
						|
	}
 | 
						|
 | 
						|
	if (!skip_tuple)
 | 
						|
	{
 | 
						|
		List	   *recheckIndexes = NIL;
 | 
						|
		TU_UpdateIndexes update_indexes;
 | 
						|
 | 
						|
		/* Compute stored generated columns */
 | 
						|
		if (rel->rd_att->constr &&
 | 
						|
			rel->rd_att->constr->has_generated_stored)
 | 
						|
			ExecComputeStoredGenerated(resultRelInfo, estate, slot,
 | 
						|
									   CMD_UPDATE);
 | 
						|
 | 
						|
		/* Check the constraints of the tuple */
 | 
						|
		if (rel->rd_att->constr)
 | 
						|
			ExecConstraints(resultRelInfo, slot, estate);
 | 
						|
		if (rel->rd_rel->relispartition)
 | 
						|
			ExecPartitionCheck(resultRelInfo, slot, estate, true);
 | 
						|
 | 
						|
		simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
 | 
						|
								  &update_indexes);
 | 
						|
 | 
						|
		if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
 | 
						|
			recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
 | 
						|
												   slot, estate, true, false,
 | 
						|
												   NULL, NIL,
 | 
						|
												   (update_indexes == TU_Summarizing));
 | 
						|
 | 
						|
		/* AFTER ROW UPDATE Triggers */
 | 
						|
		ExecARUpdateTriggers(estate, resultRelInfo,
 | 
						|
							 NULL, NULL,
 | 
						|
							 tid, NULL, slot,
 | 
						|
							 recheckIndexes, NULL, false);
 | 
						|
 | 
						|
		list_free(recheckIndexes);
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
 * Find the searchslot tuple and delete it, and execute any constraints
 | 
						|
 * and per-row triggers.
 | 
						|
 *
 | 
						|
 * Caller is responsible for opening the indexes.
 | 
						|
 */
 | 
						|
void
 | 
						|
ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
 | 
						|
						 EState *estate, EPQState *epqstate,
 | 
						|
						 TupleTableSlot *searchslot)
 | 
						|
{
 | 
						|
	bool		skip_tuple = false;
 | 
						|
	Relation	rel = resultRelInfo->ri_RelationDesc;
 | 
						|
	ItemPointer tid = &searchslot->tts_tid;
 | 
						|
 | 
						|
	CheckCmdReplicaIdentity(rel, CMD_DELETE);
 | 
						|
 | 
						|
	/* BEFORE ROW DELETE Triggers */
 | 
						|
	if (resultRelInfo->ri_TrigDesc &&
 | 
						|
		resultRelInfo->ri_TrigDesc->trig_delete_before_row)
 | 
						|
	{
 | 
						|
		skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
 | 
						|
										   tid, NULL, NULL, NULL, NULL);
 | 
						|
	}
 | 
						|
 | 
						|
	if (!skip_tuple)
 | 
						|
	{
 | 
						|
		/* OK, delete the tuple */
 | 
						|
		simple_table_tuple_delete(rel, tid, estate->es_snapshot);
 | 
						|
 | 
						|
		/* AFTER ROW DELETE Triggers */
 | 
						|
		ExecARDeleteTriggers(estate, resultRelInfo,
 | 
						|
							 tid, NULL, NULL, false);
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
 * Check if command can be executed with current replica identity.
 | 
						|
 */
 | 
						|
void
 | 
						|
CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
 | 
						|
{
 | 
						|
	PublicationDesc pubdesc;
 | 
						|
 | 
						|
	/*
 | 
						|
	 * Skip checking the replica identity for partitioned tables, because the
 | 
						|
	 * operations are actually performed on the leaf partitions.
 | 
						|
	 */
 | 
						|
	if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 | 
						|
		return;
 | 
						|
 | 
						|
	/* We only need to do checks for UPDATE and DELETE. */
 | 
						|
	if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
 | 
						|
		return;
 | 
						|
 | 
						|
	/*
 | 
						|
	 * It is only safe to execute UPDATE/DELETE when all columns, referenced
 | 
						|
	 * in the row filters from publications which the relation is in, are
 | 
						|
	 * valid - i.e. when all referenced columns are part of REPLICA IDENTITY
 | 
						|
	 * or the table does not publish UPDATEs or DELETEs.
 | 
						|
	 *
 | 
						|
	 * XXX We could optimize it by first checking whether any of the
 | 
						|
	 * publications have a row filter for this relation. If not and relation
 | 
						|
	 * has replica identity then we can avoid building the descriptor but as
 | 
						|
	 * this happens only one time it doesn't seem worth the additional
 | 
						|
	 * complexity.
 | 
						|
	 */
 | 
						|
	RelationBuildPublicationDesc(rel, &pubdesc);
 | 
						|
	if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
 | 
						|
		ereport(ERROR,
 | 
						|
				(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
 | 
						|
				 errmsg("cannot update table \"%s\"",
 | 
						|
						RelationGetRelationName(rel)),
 | 
						|
				 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
 | 
						|
	else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
 | 
						|
		ereport(ERROR,
 | 
						|
				(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
 | 
						|
				 errmsg("cannot update table \"%s\"",
 | 
						|
						RelationGetRelationName(rel)),
 | 
						|
				 errdetail("Column list used by the publication does not cover the replica identity.")));
 | 
						|
	else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
 | 
						|
		ereport(ERROR,
 | 
						|
				(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
 | 
						|
				 errmsg("cannot delete from table \"%s\"",
 | 
						|
						RelationGetRelationName(rel)),
 | 
						|
				 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
 | 
						|
	else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
 | 
						|
		ereport(ERROR,
 | 
						|
				(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
 | 
						|
				 errmsg("cannot delete from table \"%s\"",
 | 
						|
						RelationGetRelationName(rel)),
 | 
						|
				 errdetail("Column list used by the publication does not cover the replica identity.")));
 | 
						|
 | 
						|
	/* If relation has replica identity we are always good. */
 | 
						|
	if (OidIsValid(RelationGetReplicaIndex(rel)))
 | 
						|
		return;
 | 
						|
 | 
						|
	/* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
 | 
						|
	if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
 | 
						|
		return;
 | 
						|
 | 
						|
	/*
 | 
						|
	 * This is UPDATE/DELETE and there is no replica identity.
 | 
						|
	 *
 | 
						|
	 * Check if the table publishes UPDATES or DELETES.
 | 
						|
	 */
 | 
						|
	if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
 | 
						|
		ereport(ERROR,
 | 
						|
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 | 
						|
				 errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
 | 
						|
						RelationGetRelationName(rel)),
 | 
						|
				 errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
 | 
						|
	else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
 | 
						|
		ereport(ERROR,
 | 
						|
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 | 
						|
				 errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
 | 
						|
						RelationGetRelationName(rel)),
 | 
						|
				 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
/*
 | 
						|
 * Check if we support writing into specific relkind.
 | 
						|
 *
 | 
						|
 * The nspname and relname are only needed for error reporting.
 | 
						|
 */
 | 
						|
void
 | 
						|
CheckSubscriptionRelkind(char relkind, const char *nspname,
 | 
						|
						 const char *relname)
 | 
						|
{
 | 
						|
	if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
 | 
						|
		ereport(ERROR,
 | 
						|
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
 | 
						|
				 errmsg("cannot use relation \"%s.%s\" as logical replication target",
 | 
						|
						nspname, relname),
 | 
						|
				 errdetail_relkind_not_supported(relkind)));
 | 
						|
}
 |