1
0
mirror of https://github.com/postgres/postgres.git synced 2025-12-21 05:21:08 +03:00

Fix assorted bugs related to identity column in partitioned tables

When changing the data type of a column of a partitioned table, craft
the ALTER SEQUENCE command only once.  Partitions do not have identity
sequences of their own and thus do not need a ALTER SEQUENCE command
for each partition.

Fix getIdentitySequence() to fetch the identity sequence associated
with the top-level partitioned table when a Relation of a partition is
passed to it.  While doing so, translate the attribute number of the
partition into the attribute number of the partitioned table.

Author: Ashutosh Bapat <ashutosh.bapat@enterprisedb.com>
Reported-by: Alexander Lakhin <exclusion@gmail.com>
Reviewed-by: Dmitry Dolgov <9erthalion6@gmail.com>
Discussion: https://www.postgresql.org/message-id/3b8a9dc1-bbc7-0ef5-6863-c432afac7d59@gmail.com
This commit is contained in:
Peter Eisentraut
2024-05-07 22:42:32 +02:00
parent 832c4f657f
commit 509199587d
7 changed files with 100 additions and 47 deletions

View File

@@ -23,10 +23,12 @@
#include "catalog/pg_constraint.h"
#include "catalog/pg_depend.h"
#include "catalog/pg_extension.h"
#include "catalog/partition.h"
#include "commands/extension.h"
#include "miscadmin.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "utils/rel.h"
@@ -941,10 +943,35 @@ getOwnedSequences(Oid relid)
* Get owned identity sequence, error if not exactly one.
*/
Oid
getIdentitySequence(Oid relid, AttrNumber attnum, bool missing_ok)
getIdentitySequence(Relation rel, AttrNumber attnum, bool missing_ok)
{
List *seqlist = getOwnedSequences_internal(relid, attnum, DEPENDENCY_INTERNAL);
Oid relid;
List *seqlist;
/*
* The identity sequence is associated with the topmost partitioned table,
* which might have column order different than the given partition.
*/
if (RelationGetForm(rel)->relispartition)
{
List *ancestors =
get_partition_ancestors(RelationGetRelid(rel));
HeapTuple ctup = SearchSysCacheAttNum(RelationGetRelid(rel), attnum);
const char *attname = NameStr(((Form_pg_attribute) GETSTRUCT(ctup))->attname);
HeapTuple ptup;
relid = llast_oid(ancestors);
ptup = SearchSysCacheAttName(relid, attname);
attnum = ((Form_pg_attribute) GETSTRUCT(ptup))->attnum;
ReleaseSysCache(ctup);
ReleaseSysCache(ptup);
list_free(ancestors);
}
else
relid = RelationGetRelid(rel);
seqlist = getOwnedSequences_internal(relid, attnum, DEPENDENCY_INTERNAL);
if (list_length(seqlist) > 1)
elog(ERROR, "more than one owned sequence found");
else if (seqlist == NIL)

View File

@@ -8535,7 +8535,7 @@ ATExecDropIdentity(Relation rel, const char *colName, bool missing_ok, LOCKMODE
if (!recursing)
{
/* drop the internal sequence */
seqid = getIdentitySequence(RelationGetRelid(rel), attnum, false);
seqid = getIdentitySequence(rel, attnum, false);
deleteDependencyRecordsForClass(RelationRelationId, seqid,
RelationRelationId, DEPENDENCY_INTERNAL);
CommandCounterIncrement();

View File

@@ -1136,7 +1136,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
* find sequence owned by old column; extract sequence parameters;
* build new create sequence command
*/
seq_relid = getIdentitySequence(RelationGetRelid(relation), attribute->attnum, false);
seq_relid = getIdentitySequence(relation, attribute->attnum, false);
seq_options = sequence_options(seq_relid);
generateSerialExtraStmts(cxt, def,
InvalidOid, seq_options,
@@ -3716,28 +3716,36 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
/*
* For identity column, create ALTER SEQUENCE command to
* change the data type of the sequence.
* change the data type of the sequence. Identity sequence
* is associated with the top level partitioned table.
* Hence ignore partitions.
*/
attnum = get_attnum(relid, cmd->name);
if (attnum == InvalidAttrNumber)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" of relation \"%s\" does not exist",
cmd->name, RelationGetRelationName(rel))));
if (attnum > 0 &&
TupleDescAttr(tupdesc, attnum - 1)->attidentity)
if (!RelationGetForm(rel)->relispartition)
{
Oid seq_relid = getIdentitySequence(relid, attnum, false);
Oid typeOid = typenameTypeId(pstate, def->typeName);
AlterSeqStmt *altseqstmt = makeNode(AlterSeqStmt);
attnum = get_attnum(relid, cmd->name);
if (attnum == InvalidAttrNumber)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" of relation \"%s\" does not exist",
cmd->name, RelationGetRelationName(rel))));
altseqstmt->sequence = makeRangeVar(get_namespace_name(get_rel_namespace(seq_relid)),
get_rel_name(seq_relid),
-1);
altseqstmt->options = list_make1(makeDefElem("as", (Node *) makeTypeNameFromOid(typeOid, -1), -1));
altseqstmt->for_identity = true;
cxt.blist = lappend(cxt.blist, altseqstmt);
if (attnum > 0 &&
TupleDescAttr(tupdesc, attnum - 1)->attidentity)
{
Oid seq_relid = getIdentitySequence(rel, attnum, false);
Oid typeOid = typenameTypeId(pstate, def->typeName);
AlterSeqStmt *altseqstmt = makeNode(AlterSeqStmt);
altseqstmt->sequence
= makeRangeVar(get_namespace_name(get_rel_namespace(seq_relid)),
get_rel_name(seq_relid),
-1);
altseqstmt->options = list_make1(makeDefElem("as",
(Node *) makeTypeNameFromOid(typeOid, -1),
-1));
altseqstmt->for_identity = true;
cxt.blist = lappend(cxt.blist, altseqstmt);
}
}
newcmds = lappend(newcmds, cmd);
@@ -3803,7 +3811,7 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
errmsg("column \"%s\" of relation \"%s\" does not exist",
cmd->name, RelationGetRelationName(rel))));
seq_relid = getIdentitySequence(relid, attnum, true);
seq_relid = getIdentitySequence(rel, attnum, true);
if (seq_relid)
{

View File

@@ -24,7 +24,6 @@
#include "access/sysattr.h"
#include "access/table.h"
#include "catalog/dependency.h"
#include "catalog/partition.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "foreign/fdwapi.h"
@@ -1233,24 +1232,8 @@ build_column_default(Relation rel, int attrno)
if (att_tup->attidentity)
{
NextValueExpr *nve = makeNode(NextValueExpr);
Oid reloid;
/*
* The identity sequence is associated with the topmost partitioned
* table.
*/
if (rel->rd_rel->relispartition)
{
List *ancestors =
get_partition_ancestors(RelationGetRelid(rel));
reloid = llast_oid(ancestors);
list_free(ancestors);
}
else
reloid = RelationGetRelid(rel);
nve->seqid = getIdentitySequence(reloid, attrno, false);
nve->seqid = getIdentitySequence(rel, attrno, false);
nve->typeId = att_tup->atttypid;
return (Node *) nve;