1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-24 01:29:19 +03:00

Allow ALTER TABLE name {OF type | NOT OF}.

This syntax allows a standalone table to be made into a typed table,
or a typed table to be made standalone.  This is possibly a mildly
useful feature in its own right, but the real motivation for this
change is that we need it to make pg_upgrade work with typed tables.
This doesn't actually fix that problem, but it's necessary
infrastructure.

Noah Misch
This commit is contained in:
Robert Haas
2011-04-20 21:35:15 -04:00
parent 520bcd9c9b
commit 68739ba856
8 changed files with 376 additions and 38 deletions

View File

@@ -81,6 +81,7 @@
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/tqual.h"
#include "utils/typcache.h"
/*
@@ -357,6 +358,9 @@ static void ATExecEnableDisableRule(Relation rel, char *rulename,
static void ATPrepAddInherit(Relation child_rel);
static void ATExecAddInherit(Relation child_rel, RangeVar *parent, LOCKMODE lockmode);
static void ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode);
static void drop_parent_dependency(Oid relid, Oid refclassid, Oid refobjid);
static void ATExecAddOf(Relation rel, const TypeName *ofTypename, LOCKMODE lockmode);
static void ATExecDropOf(Relation rel, LOCKMODE lockmode);
static void ATExecGenericOptions(Relation rel, List *options);
static void copy_relation_data(SMgrRelation rel, SMgrRelation dst,
@@ -2683,6 +2687,16 @@ AlterTableGetLockLevel(List *cmds)
cmd_lockmode = ShareUpdateExclusiveLock;
break;
/*
* These subcommands affect implicit row type conversion. They
* have affects similar to CREATE/DROP CAST on queries. We
* don't provide for invalidating parse trees as a result of
* such changes. Do avoid concurrent pg_class updates, though.
*/
case AT_AddOf:
case AT_DropOf:
cmd_lockmode = ShareUpdateExclusiveLock;
/*
* These subcommands affect general strategies for performance
* and maintenance, though don't change the semantic results
@@ -2942,16 +2956,14 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
case AT_EnableAlwaysRule:
case AT_EnableReplicaRule:
case AT_DisableRule:
case AT_DropInherit: /* NO INHERIT */
case AT_AddOf: /* OF */
case AT_DropOf: /* NOT OF */
ATSimplePermissions(rel, ATT_TABLE);
/* These commands never recurse */
/* No command-specific prep needed */
pass = AT_PASS_MISC;
break;
case AT_DropInherit: /* NO INHERIT */
ATSimplePermissions(rel, ATT_TABLE);
/* No command-specific prep needed */
pass = AT_PASS_MISC;
break;
case AT_GenericOptions:
ATSimplePermissions(rel, ATT_FOREIGN_TABLE);
/* No command-specific prep needed */
@@ -3211,6 +3223,12 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
case AT_DropInherit:
ATExecDropInherit(rel, (RangeVar *) cmd->def, lockmode);
break;
case AT_AddOf:
ATExecAddOf(rel, (TypeName *) cmd->def, lockmode);
break;
case AT_DropOf:
ATExecDropOf(rel, lockmode);
break;
case AT_GenericOptions:
ATExecGenericOptions(rel, (List *) cmd->def);
break;
@@ -4045,6 +4063,42 @@ find_typed_table_dependencies(Oid typeOid, const char *typeName, DropBehavior be
}
/*
* check_of_type
*
* Check whether a type is suitable for CREATE TABLE OF/ALTER TABLE OF. If it
* isn't suitable, throw an error. Currently, we require that the type
* originated with CREATE TABLE AS. We could support any row type, but doing so
* would require handling a number of extra corner cases in the DDL commands.
*/
void
check_of_type(HeapTuple typetuple)
{
Form_pg_type typ = (Form_pg_type) GETSTRUCT(typetuple);
bool typeOk = false;
if (typ->typtype == TYPTYPE_COMPOSITE)
{
Relation typeRelation;
Assert(OidIsValid(typ->typrelid));
typeRelation = relation_open(typ->typrelid, AccessShareLock);
typeOk = (typeRelation->rd_rel->relkind == RELKIND_COMPOSITE_TYPE);
/*
* Close the parent rel, but keep our AccessShareLock on it until xact
* commit. That will prevent someone else from deleting or ALTERing
* the type before the typed table creation/conversion commits.
*/
relation_close(typeRelation, NoLock);
}
if (!typeOk)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("type %s is not a composite type",
format_type_be(HeapTupleGetOid(typetuple)))));
}
/*
* ALTER TABLE ADD COLUMN
*
@@ -8355,8 +8409,7 @@ ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode)
ScanKeyData key[3];
HeapTuple inheritsTuple,
attributeTuple,
constraintTuple,
depTuple;
constraintTuple;
List *connames;
bool found = false;
@@ -8522,11 +8575,29 @@ ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode)
systable_endscan(scan);
heap_close(catalogRelation, RowExclusiveLock);
/*
* Drop the dependency
*
* There's no convenient way to do this, so go trawling through pg_depend
*/
drop_parent_dependency(RelationGetRelid(rel),
RelationRelationId,
RelationGetRelid(parent_rel));
/* keep our lock on the parent relation until commit */
heap_close(parent_rel, NoLock);
}
/*
* Drop the dependency created by StoreCatalogInheritance1 (CREATE TABLE
* INHERITS/ALTER TABLE INHERIT -- refclassid will be RelationRelationId) or
* heap_create_with_catalog (CREATE TABLE OF/ALTER TABLE OF -- refclassid will
* be TypeRelationId). There's no convenient way to do this, so go trawling
* through pg_depend.
*/
static void
drop_parent_dependency(Oid relid, Oid refclassid, Oid refobjid)
{
Relation catalogRelation;
SysScanDesc scan;
ScanKeyData key[3];
HeapTuple depTuple;
catalogRelation = heap_open(DependRelationId, RowExclusiveLock);
ScanKeyInit(&key[0],
@@ -8536,7 +8607,7 @@ ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode)
ScanKeyInit(&key[1],
Anum_pg_depend_objid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(rel)));
ObjectIdGetDatum(relid));
ScanKeyInit(&key[2],
Anum_pg_depend_objsubid,
BTEqualStrategyNumber, F_INT4EQ,
@@ -8549,8 +8620,8 @@ ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode)
{
Form_pg_depend dep = (Form_pg_depend) GETSTRUCT(depTuple);
if (dep->refclassid == RelationRelationId &&
dep->refobjid == RelationGetRelid(parent_rel) &&
if (dep->refclassid == refclassid &&
dep->refobjid == refobjid &&
dep->refobjsubid == 0 &&
dep->deptype == DEPENDENCY_NORMAL)
simple_heap_delete(catalogRelation, &depTuple->t_self);
@@ -8558,9 +8629,181 @@ ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode)
systable_endscan(scan);
heap_close(catalogRelation, RowExclusiveLock);
}
/* keep our lock on the parent relation until commit */
heap_close(parent_rel, NoLock);
/*
* ALTER TABLE OF
*
* Attach a table to a composite type, as though it had been created with CREATE
* TABLE OF. All attname, atttypid, atttypmod and attcollation must match. The
* subject table must not have inheritance parents. These restrictions ensure
* that you cannot create a configuration impossible with CREATE TABLE OF alone.
*/
static void
ATExecAddOf(Relation rel, const TypeName *ofTypename, LOCKMODE lockmode)
{
Oid relid = RelationGetRelid(rel);
Type typetuple;
Form_pg_type typ;
Oid typeid;
Relation inheritsRelation,
relationRelation;
SysScanDesc scan;
ScanKeyData key;
AttrNumber table_attno,
type_attno;
TupleDesc typeTupleDesc,
tableTupleDesc;
ObjectAddress tableobj,
typeobj;
HeapTuple classtuple;
/* Validate the type. */
typetuple = typenameType(NULL, ofTypename, NULL);
check_of_type(typetuple);
typ = (Form_pg_type) GETSTRUCT(typetuple);
typeid = HeapTupleGetOid(typetuple);
/* Fail if the table has any inheritance parents. */
inheritsRelation = heap_open(InheritsRelationId, AccessShareLock);
ScanKeyInit(&key,
Anum_pg_inherits_inhrelid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(relid));
scan = systable_beginscan(inheritsRelation, InheritsRelidSeqnoIndexId,
true, SnapshotNow, 1, &key);
if (HeapTupleIsValid(systable_getnext(scan)))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("typed tables cannot inherit")));
systable_endscan(scan);
heap_close(inheritsRelation, AccessShareLock);
/*
* Check the tuple descriptors for compatibility. Unlike inheritance, we
* require that the order also match. However, attnotnull need not match.
* Also unlike inheritance, we do not require matching relhasoids.
*/
typeTupleDesc = lookup_rowtype_tupdesc(typeid, -1);
tableTupleDesc = RelationGetDescr(rel);
table_attno = 1;
for (type_attno = 1; type_attno <= typeTupleDesc->natts; type_attno++)
{
Form_pg_attribute type_attr,
table_attr;
const char *type_attname,
*table_attname;
/* Get the next non-dropped type attribute. */
type_attr = typeTupleDesc->attrs[type_attno - 1];
if (type_attr->attisdropped)
continue;
type_attname = NameStr(type_attr->attname);
/* Get the next non-dropped table attribute. */
do
{
if (table_attno > tableTupleDesc->natts)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("table is missing column \"%s\"",
type_attname)));
table_attr = tableTupleDesc->attrs[table_attno++ - 1];
} while (table_attr->attisdropped);
table_attname = NameStr(table_attr->attname);
/* Compare name. */
if (strncmp(table_attname, type_attname, NAMEDATALEN) != 0)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("table has column \"%s\" where type requires \"%s\"",
table_attname, type_attname)));
/* Compare type. */
if (table_attr->atttypid != type_attr->atttypid ||
table_attr->atttypmod != type_attr->atttypmod ||
table_attr->attcollation != type_attr->attcollation)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("table \"%s\" has different type for column \"%s\"",
RelationGetRelationName(rel), type_attname)));
}
DecrTupleDescRefCount(typeTupleDesc);
/* Any remaining columns at the end of the table had better be dropped. */
for (; table_attno <= tableTupleDesc->natts; table_attno++)
{
Form_pg_attribute table_attr = tableTupleDesc->attrs[table_attno - 1];
if (!table_attr->attisdropped)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("table has extra column \"%s\"",
NameStr(table_attr->attname))));
}
/* If the table was already typed, drop the existing dependency. */
if (rel->rd_rel->reloftype)
drop_parent_dependency(relid, TypeRelationId, rel->rd_rel->reloftype);
/* Record a dependency on the new type. */
tableobj.classId = RelationRelationId;
tableobj.objectId = relid;
tableobj.objectSubId = 0;
typeobj.classId = TypeRelationId;
typeobj.objectId = typeid;
typeobj.objectSubId = 0;
recordDependencyOn(&tableobj, &typeobj, DEPENDENCY_NORMAL);
/* Update pg_class.reloftype */
relationRelation = heap_open(RelationRelationId, RowExclusiveLock);
classtuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
if (!HeapTupleIsValid(classtuple))
elog(ERROR, "cache lookup failed for relation %u", relid);
((Form_pg_class) GETSTRUCT(classtuple))->reloftype = typeid;
simple_heap_update(relationRelation, &classtuple->t_self, classtuple);
CatalogUpdateIndexes(relationRelation, classtuple);
heap_freetuple(classtuple);
heap_close(relationRelation, RowExclusiveLock);
ReleaseSysCache(typetuple);
}
/*
* ALTER TABLE NOT OF
*
* Detach a typed table from its originating type. Just clear reloftype and
* remove the dependency.
*/
static void
ATExecDropOf(Relation rel, LOCKMODE lockmode)
{
Oid relid = RelationGetRelid(rel);
Relation relationRelation;
HeapTuple tuple;
if (!OidIsValid(rel->rd_rel->reloftype))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a typed table",
RelationGetRelationName(rel))));
/*
* We don't bother to check ownership of the type --- ownership of the table
* is presumed enough rights. No lock required on the type, either.
*/
drop_parent_dependency(relid, TypeRelationId, rel->rd_rel->reloftype);
/* Clear pg_class.reloftype */
relationRelation = heap_open(RelationRelationId, RowExclusiveLock);
tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for relation %u", relid);
((Form_pg_class) GETSTRUCT(tuple))->reloftype = InvalidOid;
simple_heap_update(relationRelation, &tuple->t_self, tuple);
CatalogUpdateIndexes(relationRelation, tuple);
heap_freetuple(tuple);
heap_close(relationRelation, RowExclusiveLock);
}
/*