1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-16 06:01:02 +03:00

Add ALTER TYPE ... ADD/DROP/ALTER/RENAME ATTRIBUTE

Like with tables, this also requires allowing the existence of
composite types with zero attributes.

reviewed by KaiGai Kohei
This commit is contained in:
Peter Eisentraut
2010-09-26 14:41:03 +03:00
parent 899beb7894
commit e440e12c56
15 changed files with 636 additions and 148 deletions

View File

@ -263,12 +263,13 @@ static void ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
static void ATRewriteTables(List **wqueue, LOCKMODE lockmode);
static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode);
static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
static void ATSimplePermissions(Relation rel, bool allowView);
static void ATSimplePermissions(Relation rel, bool allowView, bool allowType);
static void ATSimplePermissionsRelationOrIndex(Relation rel);
static void ATSimpleRecursion(List **wqueue, Relation rel,
AlterTableCmd *cmd, bool recurse, LOCKMODE lockmode);
static void ATOneLevelRecursion(List **wqueue, Relation rel,
AlterTableCmd *cmd, LOCKMODE lockmode);
static void find_typed_table_dependencies(Oid typeOid, const char *typeName);
static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
AlterTableCmd *cmd, LOCKMODE lockmode);
static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
@ -1978,6 +1979,10 @@ renameatt(Oid myrelid,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot rename column of typed table")));
if (targetrelation->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
find_typed_table_dependencies(targetrelation->rd_rel->reltype,
RelationGetRelationName(targetrelation));
/*
* Renaming the columns of sequences or toast tables doesn't actually
* break anything from the system's point of view, since internal
@ -2368,8 +2373,13 @@ AlterTable(AlterTableStmt *stmt)
/*
* For mostly-historical reasons, we allow ALTER TABLE to apply to
* all relation types.
* almost all relation types.
*/
if (rel->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table",
RelationGetRelationName(rel))));
break;
case OBJECT_INDEX:
@ -2388,6 +2398,14 @@ AlterTable(AlterTableStmt *stmt)
RelationGetRelationName(rel))));
break;
case OBJECT_TYPE:
if (rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a composite type",
RelationGetRelationName(rel))));
break;
case OBJECT_VIEW:
if (rel->rd_rel->relkind != RELKIND_VIEW)
ereport(ERROR,
@ -2639,14 +2657,14 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
switch (cmd->subtype)
{
case AT_AddColumn: /* ADD COLUMN */
ATSimplePermissions(rel, false);
ATSimplePermissions(rel, false, true);
/* Performs own recursion */
ATPrepAddColumn(wqueue, rel, recurse, cmd, lockmode);
pass = AT_PASS_ADD_COL;
break;
case AT_AddColumnToView: /* add column via CREATE OR REPLACE
* VIEW */
ATSimplePermissions(rel, true);
ATSimplePermissions(rel, true, false);
/* Performs own recursion */
ATPrepAddColumn(wqueue, rel, recurse, cmd, lockmode);
pass = AT_PASS_ADD_COL;
@ -2659,19 +2677,19 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
* substitutes default values into INSERTs before it expands
* rules.
*/
ATSimplePermissions(rel, true);
ATSimplePermissions(rel, true, false);
ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
/* No command-specific prep needed */
pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP;
break;
case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
ATSimplePermissions(rel, false);
ATSimplePermissions(rel, false, false);
ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
/* No command-specific prep needed */
pass = AT_PASS_DROP;
break;
case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
ATSimplePermissions(rel, false);
ATSimplePermissions(rel, false, false);
ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
/* No command-specific prep needed */
pass = AT_PASS_ADD_CONSTR;
@ -2689,25 +2707,25 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
pass = AT_PASS_MISC;
break;
case AT_SetStorage: /* ALTER COLUMN SET STORAGE */
ATSimplePermissions(rel, false);
ATSimplePermissions(rel, false, false);
ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
/* No command-specific prep needed */
pass = AT_PASS_MISC;
break;
case AT_DropColumn: /* DROP COLUMN */
ATSimplePermissions(rel, false);
ATSimplePermissions(rel, false, true);
ATPrepDropColumn(rel, recurse, cmd);
/* Recursion occurs during execution phase */
pass = AT_PASS_DROP;
break;
case AT_AddIndex: /* ADD INDEX */
ATSimplePermissions(rel, false);
ATSimplePermissions(rel, false, false);
/* This command never recurses */
/* No command-specific prep needed */
pass = AT_PASS_ADD_INDEX;
break;
case AT_AddConstraint: /* ADD CONSTRAINT */
ATSimplePermissions(rel, false);
ATSimplePermissions(rel, false, false);
/* Recursion occurs during execution phase */
/* No command-specific prep needed except saving recurse flag */
if (recurse)
@ -2715,7 +2733,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
pass = AT_PASS_ADD_CONSTR;
break;
case AT_DropConstraint: /* DROP CONSTRAINT */
ATSimplePermissions(rel, false);
ATSimplePermissions(rel, false, false);
/* Recursion occurs during execution phase */
/* No command-specific prep needed except saving recurse flag */
if (recurse)
@ -2723,7 +2741,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
pass = AT_PASS_DROP;
break;
case AT_AlterColumnType: /* ALTER COLUMN TYPE */
ATSimplePermissions(rel, false);
ATSimplePermissions(rel, false, true);
/* Performs own recursion */
ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd, lockmode);
pass = AT_PASS_ALTER_TYPE;
@ -2735,20 +2753,20 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
break;
case AT_ClusterOn: /* CLUSTER ON */
case AT_DropCluster: /* SET WITHOUT CLUSTER */
ATSimplePermissions(rel, false);
ATSimplePermissions(rel, false, false);
/* These commands never recurse */
/* No command-specific prep needed */
pass = AT_PASS_MISC;
break;
case AT_AddOids: /* SET WITH OIDS */
ATSimplePermissions(rel, false);
ATSimplePermissions(rel, false, false);
/* Performs own recursion */
if (!rel->rd_rel->relhasoids || recursing)
ATPrepAddOids(wqueue, rel, recurse, cmd, lockmode);
pass = AT_PASS_ADD_COL;
break;
case AT_DropOids: /* SET WITHOUT OIDS */
ATSimplePermissions(rel, false);
ATSimplePermissions(rel, false, false);
/* Performs own recursion */
if (rel->rd_rel->relhasoids)
{
@ -2775,7 +2793,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
pass = AT_PASS_MISC;
break;
case AT_AddInherit: /* INHERIT */
ATSimplePermissions(rel, false);
ATSimplePermissions(rel, false, false);
/* This command never recurses */
ATPrepAddInherit(rel);
pass = AT_PASS_MISC;
@ -2793,7 +2811,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
case AT_EnableReplicaRule:
case AT_DisableRule:
case AT_DropInherit: /* NO INHERIT */
ATSimplePermissions(rel, false);
ATSimplePermissions(rel, false, false);
/* These commands never recurse */
/* No command-specific prep needed */
pass = AT_PASS_MISC;
@ -3519,7 +3537,7 @@ ATGetQueueEntry(List **wqueue, Relation rel)
* - Ensure that it is not a system table
*/
static void
ATSimplePermissions(Relation rel, bool allowView)
ATSimplePermissions(Relation rel, bool allowView, bool allowType)
{
if (rel->rd_rel->relkind != RELKIND_RELATION)
{
@ -3531,6 +3549,14 @@ ATSimplePermissions(Relation rel, bool allowView)
errmsg("\"%s\" is not a table or view",
RelationGetRelationName(rel))));
}
else if (allowType)
{
if (rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table or composite type",
RelationGetRelationName(rel))));
}
else
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
@ -3758,6 +3784,44 @@ find_composite_type_dependencies(Oid typeOid,
}
/*
* find_typed_table_dependencies
*
* Check to see if a composite type is being used as the type of a
* typed table. Eventually, we'd like to propagate the alter
* operation into such tables, but for now, just error out if we find
* any.
*/
static void
find_typed_table_dependencies(Oid typeOid, const char *typeName)
{
Relation classRel;
ScanKeyData key[1];
HeapScanDesc scan;
HeapTuple tuple;
classRel = heap_open(RelationRelationId, AccessShareLock);
ScanKeyInit(&key[0],
Anum_pg_class_reloftype,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(typeOid));
scan = heap_beginscan(classRel, SnapshotNow, 1, key);
if (HeapTupleIsValid(tuple = heap_getnext(scan, ForwardScanDirection)))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot alter type \"%s\" because it is the type of a typed table",
typeName)));
}
heap_endscan(scan);
heap_close(classRel, AccessShareLock);
}
/*
* ALTER TABLE ADD COLUMN
*
@ -3804,6 +3868,10 @@ ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("column must be added to child tables too")));
}
if (rel->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
find_typed_table_dependencies(rel->rd_rel->reltype,
RelationGetRelationName(rel));
}
static void
@ -4007,7 +4075,7 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
* defaults, not even for domain-typed columns. And in any case we
* mustn't invoke Phase 3 on a view, since it has no storage.
*/
if (relkind != RELKIND_VIEW && attribute.attnum > 0)
if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE && attribute.attnum > 0)
{
defval = (Expr *) build_column_default(rel, attribute.attnum);
@ -4535,6 +4603,10 @@ ATPrepDropColumn(Relation rel, bool recurse, AlterTableCmd *cmd)
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot drop column from typed table")));
if (rel->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
find_typed_table_dependencies(rel->rd_rel->reltype,
RelationGetRelationName(rel));
/* No command-specific prep needed except saving recurse flag */
if (recurse)
cmd->subtype = AT_DropColumnRecurse;
@ -4554,7 +4626,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
/* At top level, permission check was done in ATPrepCmd, else do it */
if (recursing)
ATSimplePermissions(rel, false);
ATSimplePermissions(rel, false, true);
/*
* get the number of the attribute
@ -4858,7 +4930,7 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
/* At top level, permission check was done in ATPrepCmd, else do it */
if (recursing)
ATSimplePermissions(rel, false);
ATSimplePermissions(rel, false, false);
/*
* Call AddRelationNewConstraints to do the work, making sure it works on
@ -5801,7 +5873,7 @@ ATExecDropConstraint(Relation rel, const char *constrName,
/* At top level, permission check was done in ATPrepCmd, else do it */
if (recursing)
ATSimplePermissions(rel, false);
ATSimplePermissions(rel, false, false);
conrel = heap_open(ConstraintRelationId, RowExclusiveLock);
@ -6033,77 +6105,94 @@ ATPrepAlterColumnType(List **wqueue,
/* make sure datatype is legal for a column */
CheckAttributeType(colName, targettype, false);
/*
* Set up an expression to transform the old data value to the new type.
* If a USING option was given, transform and use that expression, else
* just take the old value and try to coerce it. We do this first so that
* type incompatibility can be detected before we waste effort, and
* because we need the expression to be parsed against the original table
* rowtype.
*/
if (cmd->transform)
if (tab->relkind == RELKIND_RELATION)
{
RangeTblEntry *rte;
/*
* Set up an expression to transform the old data value to the new type.
* If a USING option was given, transform and use that expression, else
* just take the old value and try to coerce it. We do this first so that
* type incompatibility can be detected before we waste effort, and
* because we need the expression to be parsed against the original table
* rowtype.
*/
if (cmd->transform)
{
RangeTblEntry *rte;
/* Expression must be able to access vars of old table */
rte = addRangeTableEntryForRelation(pstate,
rel,
NULL,
false,
true);
addRTEtoQuery(pstate, rte, false, true, true);
/* Expression must be able to access vars of old table */
rte = addRangeTableEntryForRelation(pstate,
rel,
NULL,
false,
true);
addRTEtoQuery(pstate, rte, false, true, true);
transform = transformExpr(pstate, cmd->transform);
transform = transformExpr(pstate, cmd->transform);
/* It can't return a set */
if (expression_returns_set(transform))
/* It can't return a set */
if (expression_returns_set(transform))
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("transform expression must not return a set")));
/* No subplans or aggregates, either... */
if (pstate->p_hasSubLinks)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use subquery in transform expression")));
if (pstate->p_hasAggs)
ereport(ERROR,
(errcode(ERRCODE_GROUPING_ERROR),
errmsg("cannot use aggregate function in transform expression")));
if (pstate->p_hasWindowFuncs)
ereport(ERROR,
(errcode(ERRCODE_WINDOWING_ERROR),
errmsg("cannot use window function in transform expression")));
}
else
{
transform = (Node *) makeVar(1, attnum,
attTup->atttypid, attTup->atttypmod,
0);
}
transform = coerce_to_target_type(pstate,
transform, exprType(transform),
targettype, targettypmod,
COERCION_ASSIGNMENT,
COERCE_IMPLICIT_CAST,
-1);
if (transform == NULL)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("transform expression must not return a set")));
errmsg("column \"%s\" cannot be cast to type %s",
colName, format_type_be(targettype))));
/* No subplans or aggregates, either... */
if (pstate->p_hasSubLinks)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use subquery in transform expression")));
if (pstate->p_hasAggs)
ereport(ERROR,
(errcode(ERRCODE_GROUPING_ERROR),
errmsg("cannot use aggregate function in transform expression")));
if (pstate->p_hasWindowFuncs)
ereport(ERROR,
(errcode(ERRCODE_WINDOWING_ERROR),
errmsg("cannot use window function in transform expression")));
/*
* Add a work queue item to make ATRewriteTable update the column
* contents.
*/
newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
newval->attnum = attnum;
newval->expr = (Expr *) transform;
tab->newvals = lappend(tab->newvals, newval);
}
else
if (tab->relkind == RELKIND_COMPOSITE_TYPE)
{
transform = (Node *) makeVar(1, attnum,
attTup->atttypid, attTup->atttypmod,
0);
/*
* For composite types, do this check now. Tables will check
* it later when the table is being rewritten.
*/
find_composite_type_dependencies(rel->rd_rel->reltype,
NULL,
RelationGetRelationName(rel));
find_typed_table_dependencies(rel->rd_rel->reltype,
RelationGetRelationName(rel));
}
transform = coerce_to_target_type(pstate,
transform, exprType(transform),
targettype, targettypmod,
COERCION_ASSIGNMENT,
COERCE_IMPLICIT_CAST,
-1);
if (transform == NULL)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("column \"%s\" cannot be cast to type %s",
colName, format_type_be(targettype))));
/*
* Add a work queue item to make ATRewriteTable update the column
* contents.
*/
newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
newval->attnum = attnum;
newval->expr = (Expr *) transform;
tab->newvals = lappend(tab->newvals, newval);
ReleaseSysCache(tuple);
/*
@ -7367,7 +7456,7 @@ ATExecAddInherit(Relation child_rel, RangeVar *parent, LOCKMODE lockmode)
* Must be owner of both parent and child -- child was checked by
* ATSimplePermissions call in ATPrepCmd
*/
ATSimplePermissions(parent_rel, false);
ATSimplePermissions(parent_rel, false, false);
/* Permanent rels cannot inherit from temporary ones */
if (parent_rel->rd_istemp && !child_rel->rd_istemp)