1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-16 06:01:02 +03:00

Add the notion of REPLICA IDENTITY for a table.

Pending patches for logical replication will use this to determine
which columns of a tuple ought to be considered as its candidate key.

Andres Freund, with minor, mostly cosmetic adjustments by me
This commit is contained in:
Robert Haas
2013-11-08 12:30:43 -05:00
parent b97ee66cc1
commit 07cacba983
23 changed files with 902 additions and 49 deletions

View File

@ -399,6 +399,7 @@ static void ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode)
static void drop_parent_dependency(Oid relid, Oid refclassid, Oid refobjid);
static void ATExecAddOf(Relation rel, const TypeName *ofTypename, LOCKMODE lockmode);
static void ATExecDropOf(Relation rel, LOCKMODE lockmode);
static void ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode);
static void ATExecGenericOptions(Relation rel, List *options);
static void copy_relation_data(SMgrRelation rel, SMgrRelation dst,
@ -2809,6 +2810,7 @@ AlterTableGetLockLevel(List *cmds)
case AT_DisableTrigUser:
case AT_AddIndex: /* from ADD CONSTRAINT */
case AT_AddIndexConstraint:
case AT_ReplicaIdentity:
cmd_lockmode = ShareRowExclusiveLock;
break;
@ -3140,6 +3142,12 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
cmd->subtype = AT_ValidateConstraintRecurse;
pass = AT_PASS_MISC;
break;
case AT_ReplicaIdentity: /* REPLICA IDENTITY ... */
ATSimplePermissions(rel, ATT_TABLE | ATT_MATVIEW);
pass = AT_PASS_MISC;
/* This command never recurses */
/* No command-specific prep needed */
break;
case AT_EnableTrig: /* ENABLE TRIGGER variants */
case AT_EnableAlwaysTrig:
case AT_EnableReplicaTrig:
@ -3440,6 +3448,9 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
case AT_DropOf:
ATExecDropOf(rel, lockmode);
break;
case AT_ReplicaIdentity:
ATExecReplicaIdentity(rel, (ReplicaIdentityStmt *) cmd->def, lockmode);
break;
case AT_GenericOptions:
ATExecGenericOptions(rel, (List *) cmd->def);
break;
@ -10009,6 +10020,217 @@ ATExecDropOf(Relation rel, LOCKMODE lockmode)
heap_close(relationRelation, RowExclusiveLock);
}
/*
* relation_mark_replica_identity: Update a table's replica identity
*
* Iff ri_type = REPLICA_IDENTITY_INDEX, indexOid must be the Oid of a suitable
* index. Otherwise, it should be InvalidOid.
*/
static void
relation_mark_replica_identity(Relation rel, char ri_type, Oid indexOid,
bool is_internal)
{
Relation pg_index;
Relation pg_class;
HeapTuple pg_class_tuple;
HeapTuple pg_index_tuple;
Form_pg_class pg_class_form;
Form_pg_index pg_index_form;
ListCell *index;
/*
* Check whether relreplident has changed, and update it if so.
*/
pg_class = heap_open(RelationRelationId, RowExclusiveLock);
pg_class_tuple = SearchSysCacheCopy1(RELOID,
ObjectIdGetDatum(RelationGetRelid(rel)));
if (!HeapTupleIsValid(pg_class_tuple))
elog(ERROR, "cache lookup failed for relation \"%s\"",
RelationGetRelationName(rel));
pg_class_form = (Form_pg_class) GETSTRUCT(pg_class_tuple);
if (pg_class_form->relreplident != ri_type)
{
pg_class_form->relreplident = ri_type;
simple_heap_update(pg_class, &pg_class_tuple->t_self, pg_class_tuple);
CatalogUpdateIndexes(pg_class, pg_class_tuple);
}
heap_close(pg_class, RowExclusiveLock);
heap_freetuple(pg_class_tuple);
/*
* Check whether the correct index is marked indisreplident; if so, we're
* done.
*/
if (OidIsValid(indexOid))
{
Assert(ri_type == REPLICA_IDENTITY_INDEX);
pg_index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexOid));
if (!HeapTupleIsValid(pg_index_tuple))
elog(ERROR, "cache lookup failed for index %u", indexOid);
pg_index_form = (Form_pg_index) GETSTRUCT(pg_index_tuple);
if (pg_index_form->indisreplident)
{
ReleaseSysCache(pg_index_tuple);
return;
}
ReleaseSysCache(pg_index_tuple);
}
/*
* Clear the indisreplident flag from any index that had it previously, and
* set it for any index that should have it now.
*/
pg_index = heap_open(IndexRelationId, RowExclusiveLock);
foreach(index, RelationGetIndexList(rel))
{
Oid thisIndexOid = lfirst_oid(index);
bool dirty = false;
pg_index_tuple = SearchSysCacheCopy1(INDEXRELID,
ObjectIdGetDatum(thisIndexOid));
if (!HeapTupleIsValid(pg_index_tuple))
elog(ERROR, "cache lookup failed for index %u", thisIndexOid);
pg_index_form = (Form_pg_index) GETSTRUCT(pg_index_tuple);
/*
* Unset the bit if set. We know it's wrong because we checked this
* earlier.
*/
if (pg_index_form->indisreplident)
{
dirty = true;
pg_index_form->indisreplident = false;
}
else if (thisIndexOid == indexOid)
{
dirty = true;
pg_index_form->indisreplident = true;
}
if (dirty)
{
simple_heap_update(pg_index, &pg_index_tuple->t_self, pg_index_tuple);
CatalogUpdateIndexes(pg_index, pg_index_tuple);
InvokeObjectPostAlterHookArg(IndexRelationId, thisIndexOid, 0,
InvalidOid, is_internal);
}
heap_freetuple(pg_index_tuple);
}
heap_close(pg_index, RowExclusiveLock);
}
/*
* ALTER TABLE <name> REPLICA IDENTITY ...
*/
static void
ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode)
{
Oid indexOid;
Relation indexRel;
int key;
if (stmt->identity_type == REPLICA_IDENTITY_DEFAULT)
{
relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true);
return;
}
else if (stmt->identity_type == REPLICA_IDENTITY_FULL)
{
relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true);
return;
}
else if (stmt->identity_type == REPLICA_IDENTITY_NOTHING)
{
relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true);
return;
}
else if (stmt->identity_type == REPLICA_IDENTITY_INDEX)
{
/* fallthrough */;
}
else
elog(ERROR, "unexpected identity type %u", stmt->identity_type);
/* Check that the index exists */
indexOid = get_relname_relid(stmt->name, rel->rd_rel->relnamespace);
if (!OidIsValid(indexOid))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("index \"%s\" for table \"%s\" does not exist",
stmt->name, RelationGetRelationName(rel))));
indexRel = index_open(indexOid, ShareLock);
/* Check that the index is on the relation we're altering. */
if (indexRel->rd_index == NULL ||
indexRel->rd_index->indrelid != RelationGetRelid(rel))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not an index for table \"%s\"",
RelationGetRelationName(indexRel),
RelationGetRelationName(rel))));
/* The AM must support uniqueness, and the index must in fact be unique. */
if (!indexRel->rd_am->amcanunique || !indexRel->rd_index->indisunique)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot use non-unique index \"%s\" as replica identity",
RelationGetRelationName(indexRel))));
/* Deferred indexes are not guaranteed to be always unique. */
if (!indexRel->rd_index->indimmediate)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use non-immediate index \"%s\" as replica identity",
RelationGetRelationName(indexRel))));
/* Expression indexes aren't supported. */
if (RelationGetIndexExpressions(indexRel) != NIL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use expression index \"%s\" as replica identity",
RelationGetRelationName(indexRel))));
/* Predicate indexes aren't supported. */
if (RelationGetIndexPredicate(indexRel) != NIL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use partial index \"%s\" as replica identity",
RelationGetRelationName(indexRel))));
/* And neither are invalid indexes. */
if (!IndexIsValid(indexRel->rd_index))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use invalid index \"%s\" as replica identity",
RelationGetRelationName(indexRel))));
/* Check index for nullable columns. */
for (key = 0; key < indexRel->rd_index->indnatts; key++)
{
int16 attno = indexRel->rd_index->indkey.values[key];
Form_pg_attribute attr;
/* Of the system columns, only oid is indexable. */
if (attno <= 0 && attno != ObjectIdAttributeNumber)
elog(ERROR, "internal column %u in unique index \"%s\"",
attno, RelationGetRelationName(indexRel));
attr = rel->rd_att->attrs[attno - 1];
if (!attr->attnotnull)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("index \"%s\" cannot be used as replica identity because column \"%s\" is nullable",
RelationGetRelationName(indexRel),
NameStr(attr->attname))));
}
/* This index is suitable for use as a replica identity. Mark it. */
relation_mark_replica_identity(rel, stmt->identity_type, indexOid, true);
index_close(indexRel, NoLock);
}
/*
* ALTER FOREIGN TABLE <name> OPTIONS (...)
*/