1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-16 06:01:02 +03:00

Implement CREATE TABLE LIKE ... INCLUDING INDEXES. Patch from NikhilS,

based in part on an earlier patch from Trevor Hardcastle, and reviewed
by myself.
This commit is contained in:
Neil Conway
2007-07-17 05:02:03 +00:00
parent 77d27e43e5
commit 474774918b
15 changed files with 574 additions and 198 deletions

View File

@ -19,20 +19,23 @@
* Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/backend/parser/parse_utilcmd.c,v 2.1 2007/06/23 22:12:51 tgl Exp $
* $PostgreSQL: pgsql/src/backend/parser/parse_utilcmd.c,v 2.2 2007/07/17 05:02:02 neilc Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "catalog/heap.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/clauses.h"
@ -47,6 +50,7 @@
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
@ -63,6 +67,7 @@ typedef struct
List *ckconstraints; /* CHECK constraints */
List *fkconstraints; /* FOREIGN KEY constraints */
List *ixconstraints; /* index-creating constraints */
List *inh_indexes; /* cloned indexes from INCLUDING INDEXES */
List *blist; /* "before list" of things to do before
* creating the table */
List *alist; /* "after list" of things to do after creating
@ -93,8 +98,13 @@ static void transformTableConstraint(ParseState *pstate,
Constraint *constraint);
static void transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
InhRelation *inhrelation);
static IndexStmt *generateClonedIndexStmt(CreateStmtContext *cxt,
Relation parent_index, AttrNumber *attmap);
static List *get_opclass(Oid opclass, Oid actual_datatype);
static void transformIndexConstraints(ParseState *pstate,
CreateStmtContext *cxt);
static IndexStmt *transformIndexConstraint(Constraint *constraint,
CreateStmtContext *cxt);
static void transformFKConstraints(ParseState *pstate,
CreateStmtContext *cxt,
bool skipValidation,
@ -146,6 +156,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
cxt.ckconstraints = NIL;
cxt.fkconstraints = NIL;
cxt.ixconstraints = NIL;
cxt.inh_indexes = NIL;
cxt.blist = NIL;
cxt.alist = NIL;
cxt.pkey = NULL;
@ -555,11 +566,6 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
}
}
if (including_indexes)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("LIKE INCLUDING INDEXES is not implemented")));
/*
* Insert the copied attributes into the cxt for the new table
* definition.
@ -657,6 +663,35 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
}
}
if (including_indexes && relation->rd_rel->relhasindex)
{
AttrNumber *attmap;
List *parent_indexes;
ListCell *l;
attmap = varattnos_map_schema(tupleDesc, cxt->columns);
parent_indexes = RelationGetIndexList(relation);
foreach(l, parent_indexes)
{
Oid parent_index_oid = lfirst_oid(l);
Relation parent_index;
IndexStmt *index_stmt;
parent_index = index_open(parent_index_oid, AccessShareLock);
/* Build CREATE INDEX statement to recreate the parent_index */
index_stmt = generateClonedIndexStmt(cxt, parent_index,
attmap);
/* Add the new IndexStmt to the create context */
cxt->inh_indexes = lappend(cxt->inh_indexes, index_stmt);
/* Keep our lock on the index till xact commit */
index_close(parent_index, NoLock);
}
}
/*
* Close the parent rel, but keep our AccessShareLock on it until xact
* commit. That will prevent someone else from deleting or ALTERing the
@ -665,189 +700,255 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
heap_close(relation, NoLock);
}
/*
* Generate an IndexStmt entry using information from an already
* existing index "source_idx".
*
* Note: Much of this functionality is cribbed from pg_get_indexdef.
*/
static IndexStmt *
generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
AttrNumber *attmap)
{
HeapTuple ht_idx;
HeapTuple ht_idxrel;
HeapTuple ht_am;
Form_pg_index idxrec;
Form_pg_class idxrelrec;
Form_pg_am amrec;
List *indexprs = NIL;
ListCell *indexpr_item;
Oid indrelid;
Oid source_relid;
int keyno;
Oid keycoltype;
Datum indclassDatum;
Datum indoptionDatum;
bool isnull;
oidvector *indclass;
int2vector *indoption;
IndexStmt *index;
Datum reloptions;
source_relid = RelationGetRelid(source_idx);
/* Fetch pg_index tuple for source index */
ht_idx = SearchSysCache(INDEXRELID,
ObjectIdGetDatum(source_relid),
0, 0, 0);
if (!HeapTupleIsValid(ht_idx))
elog(ERROR, "cache lookup failed for index %u", source_relid);
idxrec = (Form_pg_index) GETSTRUCT(ht_idx);
Assert(source_relid == idxrec->indexrelid);
indrelid = idxrec->indrelid;
index = makeNode(IndexStmt);
index->unique = idxrec->indisunique;
index->concurrent = false;
index->primary = idxrec->indisprimary;
index->relation = cxt->relation;
index->isconstraint = false;
/*
* We don't try to preserve the name of the source index; instead, just
* let DefineIndex() choose a reasonable name.
*/
index->idxname = NULL;
/* Must get indclass and indoption the hard way */
indclassDatum = SysCacheGetAttr(INDEXRELID, ht_idx,
Anum_pg_index_indclass, &isnull);
Assert(!isnull);
indclass = (oidvector *) DatumGetPointer(indclassDatum);
indoptionDatum = SysCacheGetAttr(INDEXRELID, ht_idx,
Anum_pg_index_indoption, &isnull);
Assert(!isnull);
indoption = (int2vector *) DatumGetPointer(indoptionDatum);
/* Fetch pg_class tuple of source index */
ht_idxrel = SearchSysCache(RELOID,
ObjectIdGetDatum(source_relid),
0, 0, 0);
if (!HeapTupleIsValid(ht_idxrel))
elog(ERROR, "cache lookup failed for relation %u", source_relid);
/*
* Store the reloptions for later use by this new index
*/
reloptions = SysCacheGetAttr(RELOID, ht_idxrel,
Anum_pg_class_reloptions, &isnull);
if (!isnull)
index->src_options = flatten_reloptions(source_relid);
idxrelrec = (Form_pg_class) GETSTRUCT(ht_idxrel);
/* Fetch pg_am tuple for the index's access method */
ht_am = SearchSysCache(AMOID,
ObjectIdGetDatum(idxrelrec->relam),
0, 0, 0);
if (!HeapTupleIsValid(ht_am))
elog(ERROR, "cache lookup failed for access method %u",
idxrelrec->relam);
amrec = (Form_pg_am) GETSTRUCT(ht_am);
index->accessMethod = pstrdup(NameStr(amrec->amname));
/* Get the index expressions, if any */
if (!heap_attisnull(ht_idx, Anum_pg_index_indexprs))
{
Datum exprsDatum;
bool isnull;
char *exprsString;
exprsDatum = SysCacheGetAttr(INDEXRELID, ht_idx,
Anum_pg_index_indexprs, &isnull);
exprsString = DatumGetCString(DirectFunctionCall1(textout,
exprsDatum));
Assert(!isnull);
indexprs = (List *) stringToNode(exprsString);
}
indexpr_item = list_head(indexprs);
for (keyno = 0; keyno < idxrec->indnatts; keyno++)
{
IndexElem *iparam;
AttrNumber attnum = idxrec->indkey.values[keyno];
int16 opt = indoption->values[keyno];
iparam = makeNode(IndexElem);
if (AttributeNumberIsValid(attnum))
{
/* Simple index column */
char *attname;
attname = get_relid_attribute_name(indrelid, attnum);
keycoltype = get_atttype(indrelid, attnum);
iparam->name = attname;
iparam->expr = NULL;
}
else
{
/* Expressional index */
Node *indexkey;
if (indexpr_item == NULL)
elog(ERROR, "too few entries in indexprs list");
indexkey = (Node *) lfirst(indexpr_item);
change_varattnos_of_a_node(indexkey, attmap);
iparam->name = NULL;
iparam->expr = indexkey;
indexpr_item = lnext(indexpr_item);
keycoltype = exprType(indexkey);
}
/* Add the operator class name, if non-default */
iparam->opclass = get_opclass(indclass->values[keyno], keycoltype);
iparam->ordering = SORTBY_DEFAULT;
iparam->nulls_ordering = SORTBY_NULLS_DEFAULT;
/* Adjust options if necessary */
if (amrec->amcanorder)
{
/* If it supports sort ordering, report DESC and NULLS opts */
if (opt & INDOPTION_DESC)
iparam->ordering = SORTBY_DESC;
if (opt & INDOPTION_NULLS_FIRST)
iparam->nulls_ordering = SORTBY_NULLS_FIRST;
}
index->indexParams = lappend(index->indexParams, iparam);
}
/* Use the same tablespace as the source index */
index->tableSpace = get_tablespace_name(source_idx->rd_node.spcNode);
/* If it's a partial index, decompile and append the predicate */
if (!heap_attisnull(ht_idx, Anum_pg_index_indpred))
{
Datum pred_datum;
bool isnull;
char *pred_str;
/* Convert text string to node tree */
pred_datum = SysCacheGetAttr(INDEXRELID, ht_idx,
Anum_pg_index_indpred, &isnull);
Assert(!isnull);
pred_str = DatumGetCString(DirectFunctionCall1(textout,
pred_datum));
index->whereClause = (Node *) stringToNode(pred_str);
change_varattnos_of_a_node(index->whereClause, attmap);
}
/* Clean up */
ReleaseSysCache(ht_idx);
ReleaseSysCache(ht_idxrel);
ReleaseSysCache(ht_am);
return index;
}
/*
* get_opclass - fetch name of an index operator class
*
* If the opclass is the default for the given actual_datatype, then
* the return value is NIL.
*/
static List *
get_opclass(Oid opclass, Oid actual_datatype)
{
HeapTuple ht_opc;
Form_pg_opclass opc_rec;
List *result = NIL;
ht_opc = SearchSysCache(CLAOID,
ObjectIdGetDatum(opclass),
0, 0, 0);
if (!HeapTupleIsValid(ht_opc))
elog(ERROR, "cache lookup failed for opclass %u", opclass);
opc_rec = (Form_pg_opclass) GETSTRUCT(ht_opc);
if (!OidIsValid(actual_datatype) ||
GetDefaultOpClass(actual_datatype, opc_rec->opcmethod) != opclass)
{
char *nsp_name = get_namespace_name(opc_rec->opcnamespace);
char *opc_name = NameStr(opc_rec->opcname);
result = list_make2(makeString(nsp_name), makeString(opc_name));
}
ReleaseSysCache(ht_opc);
return result;
}
/*
* transformIndexConstraints
* Handle UNIQUE and PRIMARY KEY constraints, which create indexes
* Handle UNIQUE and PRIMARY KEY constraints, which create
* indexes. We also merge index definitions arising from
* LIKE ... INCLUDING INDEXES.
*/
static void
transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt)
{
IndexStmt *index;
List *indexlist = NIL;
ListCell *listptr;
ListCell *l;
ListCell *lc;
/*
* Run through the constraints that need to generate an index. For PRIMARY
* KEY, mark each column as NOT NULL and create an index. For UNIQUE,
* create an index as for PRIMARY KEY, but do not insist on NOT NULL.
*/
foreach(listptr, cxt->ixconstraints)
foreach(lc, cxt->ixconstraints)
{
Constraint *constraint = lfirst(listptr);
ListCell *keys;
IndexElem *iparam;
Assert(IsA(constraint, Constraint));
Assert((constraint->contype == CONSTR_PRIMARY)
|| (constraint->contype == CONSTR_UNIQUE));
index = makeNode(IndexStmt);
index->unique = true;
index->primary = (constraint->contype == CONSTR_PRIMARY);
if (index->primary)
{
if (cxt->pkey != NULL)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("multiple primary keys for table \"%s\" are not allowed",
cxt->relation->relname)));
cxt->pkey = index;
/*
* In ALTER TABLE case, a primary index might already exist, but
* DefineIndex will check for it.
*/
}
index->isconstraint = true;
if (constraint->name != NULL)
index->idxname = pstrdup(constraint->name);
else
index->idxname = NULL; /* DefineIndex will choose name */
index->relation = cxt->relation;
index->accessMethod = DEFAULT_INDEX_TYPE;
index->options = constraint->options;
index->tableSpace = constraint->indexspace;
index->indexParams = NIL;
index->whereClause = NULL;
index->concurrent = false;
/*
* Make sure referenced keys exist. If we are making a PRIMARY KEY
* index, also make sure they are NOT NULL, if possible. (Although we
* could leave it to DefineIndex to mark the columns NOT NULL, it's
* more efficient to get it right the first time.)
*/
foreach(keys, constraint->keys)
{
char *key = strVal(lfirst(keys));
bool found = false;
ColumnDef *column = NULL;
ListCell *columns;
foreach(columns, cxt->columns)
{
column = (ColumnDef *) lfirst(columns);
Assert(IsA(column, ColumnDef));
if (strcmp(column->colname, key) == 0)
{
found = true;
break;
}
}
if (found)
{
/* found column in the new table; force it to be NOT NULL */
if (constraint->contype == CONSTR_PRIMARY)
column->is_not_null = TRUE;
}
else if (SystemAttributeByName(key, cxt->hasoids) != NULL)
{
/*
* column will be a system column in the new table, so accept
* it. System columns can't ever be null, so no need to worry
* about PRIMARY/NOT NULL constraint.
*/
found = true;
}
else if (cxt->inhRelations)
{
/* try inherited tables */
ListCell *inher;
foreach(inher, cxt->inhRelations)
{
RangeVar *inh = (RangeVar *) lfirst(inher);
Relation rel;
int count;
Assert(IsA(inh, RangeVar));
rel = heap_openrv(inh, AccessShareLock);
if (rel->rd_rel->relkind != RELKIND_RELATION)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("inherited relation \"%s\" is not a table",
inh->relname)));
for (count = 0; count < rel->rd_att->natts; count++)
{
Form_pg_attribute inhattr = rel->rd_att->attrs[count];
char *inhname = NameStr(inhattr->attname);
if (inhattr->attisdropped)
continue;
if (strcmp(key, inhname) == 0)
{
found = true;
/*
* We currently have no easy way to force an
* inherited column to be NOT NULL at creation, if
* its parent wasn't so already. We leave it to
* DefineIndex to fix things up in this case.
*/
break;
}
}
heap_close(rel, NoLock);
if (found)
break;
}
}
/*
* In the ALTER TABLE case, don't complain about index keys not
* created in the command; they may well exist already.
* DefineIndex will complain about them if not, and will also take
* care of marking them NOT NULL.
*/
if (!found && !cxt->isalter)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" named in key does not exist",
key)));
/* Check for PRIMARY KEY(foo, foo) */
foreach(columns, index->indexParams)
{
iparam = (IndexElem *) lfirst(columns);
if (iparam->name && strcmp(key, iparam->name) == 0)
{
if (index->primary)
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_COLUMN),
errmsg("column \"%s\" appears twice in primary key constraint",
key)));
else
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_COLUMN),
errmsg("column \"%s\" appears twice in unique constraint",
key)));
}
}
/* OK, add it to the index definition */
iparam = makeNode(IndexElem);
iparam->name = pstrdup(key);
iparam->expr = NULL;
iparam->opclass = NIL;
iparam->ordering = SORTBY_DEFAULT;
iparam->nulls_ordering = SORTBY_NULLS_DEFAULT;
index->indexParams = lappend(index->indexParams, iparam);
}
Constraint *constraint = (Constraint *) lfirst(lc);
index = transformIndexConstraint(constraint, cxt);
indexlist = lappend(indexlist, index);
}
@ -867,12 +968,12 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt)
cxt->alist = list_make1(cxt->pkey);
}
foreach(l, indexlist)
foreach(lc, indexlist)
{
bool keep = true;
ListCell *k;
index = lfirst(l);
index = lfirst(lc);
/* if it's pkey, it's already in cxt->alist */
if (index == cxt->pkey)
@ -900,6 +1001,194 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt)
if (keep)
cxt->alist = lappend(cxt->alist, index);
}
/* Copy indexes defined by LIKE ... INCLUDING INDEXES */
foreach(lc, cxt->inh_indexes)
{
index = (IndexStmt *) lfirst(lc);
if (index->primary)
{
if (cxt->pkey)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("multiple primary keys for table \"%s\" are not allowed",
cxt->relation->relname)));
cxt->pkey = index;
}
cxt->alist = lappend(cxt->alist, index);
}
}
static IndexStmt *
transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
{
IndexStmt *index;
ListCell *keys;
IndexElem *iparam;
Assert(constraint->contype == CONSTR_PRIMARY ||
constraint->contype == CONSTR_UNIQUE);
index = makeNode(IndexStmt);
index->unique = true;
index->primary = (constraint->contype == CONSTR_PRIMARY);
if (index->primary)
{
if (cxt->pkey != NULL)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("multiple primary keys for table \"%s\" are not allowed",
cxt->relation->relname)));
cxt->pkey = index;
/*
* In ALTER TABLE case, a primary index might already exist, but
* DefineIndex will check for it.
*/
}
index->isconstraint = true;
if (constraint->name != NULL)
index->idxname = pstrdup(constraint->name);
else
index->idxname = NULL; /* DefineIndex will choose name */
index->relation = cxt->relation;
index->accessMethod = DEFAULT_INDEX_TYPE;
index->options = constraint->options;
index->tableSpace = constraint->indexspace;
index->indexParams = NIL;
index->whereClause = NULL;
index->concurrent = false;
/*
* Make sure referenced keys exist. If we are making a PRIMARY KEY
* index, also make sure they are NOT NULL, if possible. (Although we
* could leave it to DefineIndex to mark the columns NOT NULL, it's
* more efficient to get it right the first time.)
*/
foreach(keys, constraint->keys)
{
char *key = strVal(lfirst(keys));
bool found = false;
ColumnDef *column = NULL;
ListCell *columns;
foreach(columns, cxt->columns)
{
column = (ColumnDef *) lfirst(columns);
Assert(IsA(column, ColumnDef));
if (strcmp(column->colname, key) == 0)
{
found = true;
break;
}
}
if (found)
{
/* found column in the new table; force it to be NOT NULL */
if (constraint->contype == CONSTR_PRIMARY)
column->is_not_null = TRUE;
}
else if (SystemAttributeByName(key, cxt->hasoids) != NULL)
{
/*
* column will be a system column in the new table, so accept
* it. System columns can't ever be null, so no need to worry
* about PRIMARY/NOT NULL constraint.
*/
found = true;
}
else if (cxt->inhRelations)
{
/* try inherited tables */
ListCell *inher;
foreach(inher, cxt->inhRelations)
{
RangeVar *inh = (RangeVar *) lfirst(inher);
Relation rel;
int count;
Assert(IsA(inh, RangeVar));
rel = heap_openrv(inh, AccessShareLock);
if (rel->rd_rel->relkind != RELKIND_RELATION)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("inherited relation \"%s\" is not a table",
inh->relname)));
for (count = 0; count < rel->rd_att->natts; count++)
{
Form_pg_attribute inhattr = rel->rd_att->attrs[count];
char *inhname = NameStr(inhattr->attname);
if (inhattr->attisdropped)
continue;
if (strcmp(key, inhname) == 0)
{
found = true;
/*
* We currently have no easy way to force an
* inherited column to be NOT NULL at creation, if
* its parent wasn't so already. We leave it to
* DefineIndex to fix things up in this case.
*/
break;
}
}
heap_close(rel, NoLock);
if (found)
break;
}
}
/*
* In the ALTER TABLE case, don't complain about index keys not
* created in the command; they may well exist already.
* DefineIndex will complain about them if not, and will also take
* care of marking them NOT NULL.
*/
if (!found && !cxt->isalter)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" named in key does not exist",
key)));
/* Check for PRIMARY KEY(foo, foo) */
foreach(columns, index->indexParams)
{
iparam = (IndexElem *) lfirst(columns);
if (iparam->name && strcmp(key, iparam->name) == 0)
{
if (index->primary)
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_COLUMN),
errmsg("column \"%s\" appears twice in primary key constraint",
key)));
else
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_COLUMN),
errmsg("column \"%s\" appears twice in unique constraint",
key)));
}
}
/* OK, add it to the index definition */
iparam = makeNode(IndexElem);
iparam->name = pstrdup(key);
iparam->expr = NULL;
iparam->opclass = NIL;
iparam->ordering = SORTBY_DEFAULT;
iparam->nulls_ordering = SORTBY_NULLS_DEFAULT;
index->indexParams = lappend(index->indexParams, iparam);
}
return index;
}
/*
@ -1376,6 +1665,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
cxt.ckconstraints = NIL;
cxt.fkconstraints = NIL;
cxt.ixconstraints = NIL;
cxt.inh_indexes = NIL;
cxt.blist = NIL;
cxt.alist = NIL;
cxt.pkey = NULL;