1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-09 06:21:09 +03:00

ALTER TABLE rewrite. New cool stuff:

* ALTER ... ADD COLUMN with defaults and NOT NULL constraints works per SQL
spec.  A default is implemented by rewriting the table with the new value
stored in each row.

* ALTER COLUMN TYPE.  You can change a column's datatype to anything you
want, so long as you can specify how to convert the old value.  Rewrites
the table.  (Possible future improvement: optimize no-op conversions such
as varchar(N) to varchar(N+1).)

* Multiple ALTER actions in a single ALTER TABLE command.  You can perform
any number of column additions, type changes, and constraint additions with
only one pass over the table contents.

Basic documentation provided in ALTER TABLE ref page, but some more docs
work is needed.

Original patch from Rod Taylor, additional work from Tom Lane.
This commit is contained in:
Tom Lane
2004-05-05 04:48:48 +00:00
parent 3e3cb0a14a
commit 077db40fa1
30 changed files with 3302 additions and 1906 deletions

View File

@@ -11,7 +11,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.120 2004/03/23 19:35:16 tgl Exp $
* $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.121 2004/05/05 04:48:45 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -64,11 +64,7 @@ typedef struct
static void cluster_rel(RelToCluster *rv, bool recheck);
static Oid make_new_heap(Oid OIDOldHeap, const char *NewName);
static void copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex);
static List *get_indexattr_list(Relation OldHeap, Oid OldIndex);
static void rebuild_indexes(Oid OIDOldHeap, List *indexes);
static void swap_relfilenodes(Oid r1, Oid r2);
static List *get_tables_to_cluster(MemoryContext cluster_context);
@@ -479,7 +475,7 @@ rebuild_relation(Relation OldHeap, Oid indexOid)
/*
* Create the new table that we will fill with correctly-ordered data.
*/
static Oid
Oid
make_new_heap(Oid OIDOldHeap, const char *NewName)
{
TupleDesc OldHeapDesc,
@@ -578,7 +574,7 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
* Get the necessary info about the indexes of the relation and
* return a list of IndexAttrs structures.
*/
static List *
List *
get_indexattr_list(Relation OldHeap, Oid OldIndex)
{
List *indexes = NIL;
@@ -621,7 +617,7 @@ get_indexattr_list(Relation OldHeap, Oid OldIndex)
* Create new indexes and swap the filenodes with old indexes. Then drop
* the new index (carrying the old index filenode along).
*/
static void
void
rebuild_indexes(Oid OIDOldHeap, List *indexes)
{
List *elem;
@@ -646,10 +642,15 @@ rebuild_indexes(Oid OIDOldHeap, List *indexes)
* matter: after the filenode swap the index will keep the
* constraint status of the old index.
*/
newIndexOID = index_create(OIDOldHeap, newIndexName,
attrs->indexInfo, attrs->accessMethodOID,
attrs->classOID, false,
false, allowSystemTableMods);
newIndexOID = index_create(OIDOldHeap,
newIndexName,
attrs->indexInfo,
attrs->accessMethodOID,
attrs->classOID,
false,
false,
allowSystemTableMods,
false);
CommandCounterIncrement();
/* Swap the filenodes. */
@@ -698,7 +699,7 @@ rebuild_indexes(Oid OIDOldHeap, List *indexes)
* Also swap any TOAST links, so that the toast data moves along with
* the main-table data.
*/
static void
void
swap_relfilenodes(Oid r1, Oid r2)
{
Relation relRelation,
@@ -789,9 +790,9 @@ swap_relfilenodes(Oid r1, Oid r2)
* their new owning relations. Otherwise the wrong one will get
* dropped ...
*
* NOTE: for now, we can assume the new table will have a TOAST table if
* and only if the old one does. This logic might need work if we get
* smarter about dropped columns.
* NOTE: it is possible that only one table has a toast table; this
* can happen in CLUSTER if there were dropped columns in the old table,
* and in ALTER TABLE when adding or changing type of columns.
*
* NOTE: at present, a TOAST table's only dependency is the one on its
* owning table. If more are ever created, we'd need to use something
@@ -804,35 +805,43 @@ swap_relfilenodes(Oid r1, Oid r2)
toastobject;
long count;
if (!(relform1->reltoastrelid && relform2->reltoastrelid))
elog(ERROR, "expected both swapped tables to have TOAST tables");
/* Delete old dependencies */
count = deleteDependencyRecordsFor(RelOid_pg_class,
relform1->reltoastrelid);
if (count != 1)
elog(ERROR, "expected one dependency record for TOAST table, found %ld",
count);
count = deleteDependencyRecordsFor(RelOid_pg_class,
relform2->reltoastrelid);
if (count != 1)
elog(ERROR, "expected one dependency record for TOAST table, found %ld",
count);
if (relform1->reltoastrelid)
{
count = deleteDependencyRecordsFor(RelOid_pg_class,
relform1->reltoastrelid);
if (count != 1)
elog(ERROR, "expected one dependency record for TOAST table, found %ld",
count);
}
if (relform2->reltoastrelid)
{
count = deleteDependencyRecordsFor(RelOid_pg_class,
relform2->reltoastrelid);
if (count != 1)
elog(ERROR, "expected one dependency record for TOAST table, found %ld",
count);
}
/* Register new dependencies */
baseobject.classId = RelOid_pg_class;
baseobject.objectId = r1;
baseobject.objectSubId = 0;
toastobject.classId = RelOid_pg_class;
toastobject.objectId = relform1->reltoastrelid;
toastobject.objectSubId = 0;
recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
if (relform1->reltoastrelid)
{
baseobject.objectId = r1;
toastobject.objectId = relform1->reltoastrelid;
recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
}
baseobject.objectId = r2;
toastobject.objectId = relform2->reltoastrelid;
recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
if (relform2->reltoastrelid)
{
baseobject.objectId = r2;
toastobject.objectId = relform2->reltoastrelid;
recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
}
}
/*

View File

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.117 2003/12/28 21:57:36 tgl Exp $
* $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.118 2004/05/05 04:48:45 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -31,6 +31,7 @@
#include "miscadmin.h"
#include "optimizer/clauses.h"
#include "optimizer/prep.h"
#include "parser/analyze.h"
#include "parser/parsetree.h"
#include "parser/parse_coerce.h"
#include "parser/parse_expr.h"
@@ -38,38 +39,62 @@
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
/* non-export function prototypes */
static void CheckPredicate(Expr *predicate);
static void ComputeIndexAttrs(IndexInfo *indexInfo, Oid *classOidP,
List *attList,
Oid relId,
char *accessMethodName, Oid accessMethodId);
List *attList,
Oid relId,
char *accessMethodName, Oid accessMethodId,
bool isconstraint);
static Oid GetIndexOpClass(List *opclass, Oid attrType,
char *accessMethodName, Oid accessMethodId);
static Oid GetDefaultOpClass(Oid attrType, Oid accessMethodId);
static char *CreateIndexName(const char *table_name, const char *column_name,
const char *label, Oid inamespace);
static bool relationHasPrimaryKey(Relation rel);
/*
* DefineIndex
* Creates a new index.
*
* 'attributeList' is a list of IndexElem specifying columns and expressions
* 'heapRelation': the relation the index will apply to.
* 'indexRelationName': the name for the new index, or NULL to indicate
* that a nonconflicting default name should be picked.
* 'accessMethodName': name of the AM to use.
* 'attributeList': a list of IndexElem specifying columns and expressions
* to index on.
* 'predicate' is the qual specified in the where clause.
* 'rangetable' is needed to interpret the predicate.
* 'predicate': the partial-index condition, or NULL if none.
* 'rangetable': needed to interpret the predicate.
* 'unique': make the index enforce uniqueness.
* 'primary': mark the index as a primary key in the catalogs.
* 'isconstraint': index is for a PRIMARY KEY or UNIQUE constraint,
* so build a pg_constraint entry for it.
* 'is_alter_table': this is due to an ALTER rather than a CREATE operation.
* 'check_rights': check for CREATE rights in the namespace. (This should
* be true except when ALTER is deleting/recreating an index.)
* 'skip_build': make the catalog entries but leave the index file empty;
* it will be filled later.
* 'quiet': suppress the NOTICE chatter ordinarily provided for constraints.
*/
void
DefineIndex(RangeVar *heapRelation,
char *indexRelationName,
char *accessMethodName,
List *attributeList,
Expr *predicate,
List *rangetable,
bool unique,
bool primary,
bool isconstraint,
Expr *predicate,
List *rangetable)
bool is_alter_table,
bool check_rights,
bool skip_build,
bool quiet)
{
Oid *classObjectId;
Oid accessMethodId;
@@ -111,15 +136,13 @@ DefineIndex(RangeVar *heapRelation,
relationId = RelationGetRelid(rel);
namespaceId = RelationGetNamespace(rel);
heap_close(rel, NoLock);
/*
* Verify we (still) have CREATE rights in the rel's namespace.
* (Presumably we did when the rel was created, but maybe not
* anymore.) Skip check if bootstrapping, since permissions machinery
* may not be working yet.
* anymore.) Skip check if caller doesn't want it. Also skip check
* if bootstrapping, since permissions machinery may not be working yet.
*/
if (!IsBootstrapProcessingMode())
if (check_rights && !IsBootstrapProcessingMode())
{
AclResult aclresult;
@@ -130,6 +153,27 @@ DefineIndex(RangeVar *heapRelation,
get_namespace_name(namespaceId));
}
/*
* Select name for index if caller didn't specify
*/
if (indexRelationName == NULL)
{
if (primary)
indexRelationName = CreateIndexName(RelationGetRelationName(rel),
NULL,
"pkey",
namespaceId);
else
{
IndexElem *iparam = (IndexElem *) lfirst(attributeList);
indexRelationName = CreateIndexName(RelationGetRelationName(rel),
iparam->name,
"key",
namespaceId);
}
}
/*
* look up the access method, verify it can handle the requested
* features
@@ -177,13 +221,33 @@ DefineIndex(RangeVar *heapRelation,
CheckPredicate(predicate);
/*
* Check that all of the attributes in a primary key are marked as not
* null, otherwise attempt to ALTER TABLE .. SET NOT NULL
* Extra checks when creating a PRIMARY KEY index.
*/
if (primary)
{
List *cmds;
List *keys;
/*
* If ALTER TABLE, check that there isn't already a PRIMARY KEY.
* In CREATE TABLE, we have faith that the parser rejected multiple
* pkey clauses; and CREATE INDEX doesn't have a way to say
* PRIMARY KEY, so it's no problem either.
*/
if (is_alter_table &&
relationHasPrimaryKey(rel))
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("multiple primary keys for table \"%s\" are not allowed",
RelationGetRelationName(rel))));
}
/*
* Check that all of the attributes in a primary key are marked as not
* null, otherwise attempt to ALTER TABLE .. SET NOT NULL
*/
cmds = NIL;
foreach(keys, attributeList)
{
IndexElem *key = (IndexElem *) lfirst(keys);
@@ -203,29 +267,43 @@ DefineIndex(RangeVar *heapRelation,
{
if (!((Form_pg_attribute) GETSTRUCT(atttuple))->attnotnull)
{
/*
* Try to make it NOT NULL.
*
* XXX: Shouldn't the ALTER TABLE .. SET NOT NULL cascade
* to child tables? Currently, since the PRIMARY KEY
* itself doesn't cascade, we don't cascade the
* notnull constraint either; but this is pretty
* debatable.
*/
AlterTableAlterColumnSetNotNull(relationId, false,
key->name);
/* Add a subcommand to make this one NOT NULL */
AlterTableCmd *cmd = makeNode(AlterTableCmd);
cmd->subtype = AT_SetNotNull;
cmd->name = key->name;
cmds = lappend(cmds, cmd);
}
ReleaseSysCache(atttuple);
}
else
{
/* This shouldn't happen if parser did its job ... */
/*
* This shouldn't happen during CREATE TABLE, but can
* happen during ALTER TABLE. Keep message in sync with
* transformIndexConstraints() in parser/analyze.c.
*/
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" named in key does not exist",
key->name)));
}
}
/*
* XXX: Shouldn't the ALTER TABLE .. SET NOT NULL cascade
* to child tables? Currently, since the PRIMARY KEY
* itself doesn't cascade, we don't cascade the
* notnull constraint(s) either; but this is pretty debatable.
*
* XXX: possible future improvement: when being called from
* ALTER TABLE, it would be more efficient to merge this with
* the outer ALTER TABLE, so as to avoid two scans. But that
* seems to complicate DefineIndex's API unduly.
*/
if (cmds)
AlterTableInternal(relationId, cmds, false);
}
/*
@@ -242,11 +320,26 @@ DefineIndex(RangeVar *heapRelation,
classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
ComputeIndexAttrs(indexInfo, classObjectId, attributeList,
relationId, accessMethodName, accessMethodId);
relationId, accessMethodName, accessMethodId,
isconstraint);
heap_close(rel, NoLock);
/*
* Report index creation if appropriate (delay this till after most
* of the error checks)
*/
if (isconstraint && !quiet)
ereport(NOTICE,
(errmsg("%s %s will create implicit index \"%s\" for table \"%s\"",
is_alter_table ? "ALTER TABLE / ADD" : "CREATE TABLE /",
primary ? "PRIMARY KEY" : "UNIQUE",
indexRelationName, RelationGetRelationName(rel))));
index_create(relationId, indexRelationName,
indexInfo, accessMethodId, classObjectId,
primary, isconstraint, allowSystemTableMods);
primary, isconstraint,
allowSystemTableMods, skip_build);
/*
* We update the relation's pg_class tuple even if it already has
@@ -303,7 +396,8 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
List *attList, /* list of IndexElem's */
Oid relId,
char *accessMethodName,
Oid accessMethodId)
Oid accessMethodId,
bool isconstraint)
{
List *rest;
int attn = 0;
@@ -325,10 +419,19 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
Assert(attribute->expr == NULL);
atttuple = SearchSysCacheAttName(relId, attribute->name);
if (!HeapTupleIsValid(atttuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" does not exist",
attribute->name)));
{
/* difference in error message spellings is historical */
if (isconstraint)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" named in key does not exist",
attribute->name)));
else
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" does not exist",
attribute->name)));
}
attform = (Form_pg_attribute) GETSTRUCT(atttuple);
indexInfo->ii_KeyAttrNumbers[attn] = attform->attnum;
atttype = attform->atttypid;
@@ -553,6 +656,79 @@ GetDefaultOpClass(Oid attrType, Oid accessMethodId)
return InvalidOid;
}
/*
* Select a nonconflicting name for an index.
*/
static char *
CreateIndexName(const char *table_name, const char *column_name,
const char *label, Oid inamespace)
{
int pass = 0;
char *iname = NULL;
char typename[NAMEDATALEN];
/*
* The type name for makeObjectName is label, or labelN if that's
* necessary to prevent collision with existing indexes.
*/
strncpy(typename, label, sizeof(typename));
for (;;)
{
iname = makeObjectName(table_name, column_name, typename);
if (!OidIsValid(get_relname_relid(iname, inamespace)))
break;
/* found a conflict, so try a new name component */
pfree(iname);
snprintf(typename, sizeof(typename), "%s%d", label, ++pass);
}
return iname;
}
/*
* relationHasPrimaryKey -
*
* See whether an existing relation has a primary key.
*/
static bool
relationHasPrimaryKey(Relation rel)
{
bool result = false;
List *indexoidlist,
*indexoidscan;
/*
* Get the list of index OIDs for the table from the relcache, and
* look up each one in the pg_index syscache until we find one marked
* primary key (hopefully there isn't more than one such).
*/
indexoidlist = RelationGetIndexList(rel);
foreach(indexoidscan, indexoidlist)
{
Oid indexoid = lfirsto(indexoidscan);
HeapTuple indexTuple;
indexTuple = SearchSysCache(INDEXRELID,
ObjectIdGetDatum(indexoid),
0, 0, 0);
if (!HeapTupleIsValid(indexTuple)) /* should not happen */
elog(ERROR, "cache lookup failed for index %u", indexoid);
result = ((Form_pg_index) GETSTRUCT(indexTuple))->indisprimary;
ReleaseSysCache(indexTuple);
if (result)
break;
}
freeList(indexoidlist);
return result;
}
/*
* RemoveIndex
* Deletes an index.

File diff suppressed because it is too large Load Diff