1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-09 22:41:56 +03:00

Add exclusion constraints, which generalize the concept of uniqueness to

support any indexable commutative operator, not just equality.  Two rows
violate the exclusion constraint if "row1.col OP row2.col" is TRUE for
each of the columns in the constraint.

Jeff Davis, reviewed by Robert Haas
This commit is contained in:
Tom Lane
2009-12-07 05:22:23 +00:00
parent 8de7472b45
commit 0cb65564e5
43 changed files with 1613 additions and 236 deletions

View File

@ -11,7 +11,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/parser/gram.y,v 2.694 2009/11/20 20:38:10 tgl Exp $
* $PostgreSQL: pgsql/src/backend/parser/gram.y,v 2.695 2009/12/07 05:22:22 tgl Exp $
*
* HISTORY
* AUTHOR DATE MAJOR EVENT
@ -352,6 +352,8 @@ static TypeName *TableFuncTypeName(List *columns);
%type <node> def_arg columnElem where_clause where_or_current_clause
a_expr b_expr c_expr func_expr AexprConst indirection_el
columnref in_expr having_clause func_table array_expr
ExclusionWhereClause
%type <list> ExclusionConstraintList ExclusionConstraintElem
%type <list> func_arg_list
%type <node> func_arg_expr
%type <list> row type_list array_expr_list
@ -475,7 +477,7 @@ static TypeName *TableFuncTypeName(List *columns);
DICTIONARY DISABLE_P DISCARD DISTINCT DO DOCUMENT_P DOMAIN_P DOUBLE_P DROP
EACH ELSE ENABLE_P ENCODING ENCRYPTED END_P ENUM_P ESCAPE EXCEPT
EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN EXTERNAL EXTRACT
EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN EXTERNAL EXTRACT
FALSE_P FAMILY FETCH FIRST_P FLOAT_P FOLLOWING FOR FORCE FOREIGN FORWARD
FREEZE FROM FULL FUNCTION FUNCTIONS
@ -1591,8 +1593,16 @@ alter_table_cmds:
;
alter_table_cmd:
/* ALTER TABLE <name> ADD [COLUMN] <coldef> */
ADD_P opt_column columnDef
/* ALTER TABLE <name> ADD <coldef> */
ADD_P columnDef
{
AlterTableCmd *n = makeNode(AlterTableCmd);
n->subtype = AT_AddColumn;
n->def = $2;
$$ = (Node *)n;
}
/* ALTER TABLE <name> ADD COLUMN <coldef> */
| ADD_P COLUMN columnDef
{
AlterTableCmd *n = makeNode(AlterTableCmd);
n->subtype = AT_AddColumn;
@ -2487,6 +2497,22 @@ ConstraintElem:
n->initdeferred = ($8 & 2) != 0;
$$ = (Node *)n;
}
| EXCLUDE access_method_clause '(' ExclusionConstraintList ')'
opt_definition OptConsTableSpace ExclusionWhereClause
ConstraintAttributeSpec
{
Constraint *n = makeNode(Constraint);
n->contype = CONSTR_EXCLUSION;
n->location = @1;
n->access_method = $2;
n->exclusions = $4;
n->options = $6;
n->indexspace = $7;
n->where_clause = $8;
n->deferrable = ($9 & 1) != 0;
n->initdeferred = ($9 & 2) != 0;
$$ = (Node *)n;
}
| FOREIGN KEY '(' columnList ')' REFERENCES qualified_name
opt_column_list key_match key_actions ConstraintAttributeSpec
{
@ -2544,6 +2570,28 @@ key_match: MATCH FULL
}
;
ExclusionConstraintList:
ExclusionConstraintElem { $$ = list_make1($1); }
| ExclusionConstraintList ',' ExclusionConstraintElem
{ $$ = lappend($1, $3); }
;
ExclusionConstraintElem: index_elem WITH any_operator
{
$$ = list_make2($1, $3);
}
/* allow OPERATOR() decoration for the benefit of ruleutils.c */
| index_elem WITH OPERATOR '(' any_operator ')'
{
$$ = list_make2($1, $5);
}
;
ExclusionWhereClause:
WHERE '(' a_expr ')' { $$ = $3; }
| /*EMPTY*/ { $$ = NULL; }
;
/*
* We combine the update and delete actions into one value temporarily
* for simplicity of parsing, and then break them down again in the
@ -10619,6 +10667,7 @@ unreserved_keyword:
| ENCRYPTED
| ENUM_P
| ESCAPE
| EXCLUDE
| EXCLUDING
| EXCLUSIVE
| EXECUTE

View File

@ -19,7 +19,7 @@
* Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/backend/parser/parse_utilcmd.c,v 2.30 2009/11/13 23:49:23 tgl Exp $
* $PostgreSQL: pgsql/src/backend/parser/parse_utilcmd.c,v 2.31 2009/12/07 05:22:22 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -35,6 +35,7 @@
#include "catalog/namespace.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_type.h"
#include "commands/comment.h"
#include "commands/defrem.h"
@ -456,6 +457,10 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt,
saw_default = true;
break;
case CONSTR_CHECK:
cxt->ckconstraints = lappend(cxt->ckconstraints, constraint);
break;
case CONSTR_PRIMARY:
case CONSTR_UNIQUE:
if (constraint->keys == NIL)
@ -463,8 +468,9 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt,
cxt->ixconstraints = lappend(cxt->ixconstraints, constraint);
break;
case CONSTR_CHECK:
cxt->ckconstraints = lappend(cxt->ckconstraints, constraint);
case CONSTR_EXCLUSION:
/* grammar does not allow EXCLUDE as a column constraint */
elog(ERROR, "column exclusion constraints are not supported");
break;
case CONSTR_FOREIGN:
@ -503,6 +509,7 @@ transformTableConstraint(ParseState *pstate, CreateStmtContext *cxt,
{
case CONSTR_PRIMARY:
case CONSTR_UNIQUE:
case CONSTR_EXCLUSION:
cxt->ixconstraints = lappend(cxt->ixconstraints, constraint);
break;
@ -814,7 +821,7 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
/*
* chooseIndexName
*
* Set name to unnamed index. See also the same logic in DefineIndex.
* Set name for unnamed index. See also the same logic in DefineIndex.
*/
static char *
chooseIndexName(const RangeVar *relation, IndexStmt *index_stmt)
@ -828,6 +835,13 @@ chooseIndexName(const RangeVar *relation, IndexStmt *index_stmt)
return ChooseRelationName(relation->relname, NULL,
"pkey", namespaceId);
}
else if (index_stmt->excludeOpNames != NIL)
{
IndexElem *iparam = (IndexElem *) linitial(index_stmt->indexParams);
return ChooseRelationName(relation->relname, iparam->name,
"exclusion", namespaceId);
}
else
{
IndexElem *iparam = (IndexElem *) linitial(index_stmt->indexParams);
@ -880,7 +894,7 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
/* Fetch pg_am tuple for source index from relcache entry */
amrec = source_idx->rd_am;
/* Must get indclass the hard way, since it's not stored in relcache */
/* Extract indclass from the pg_index tuple */
datum = SysCacheGetAttr(INDEXRELID, ht_idx,
Anum_pg_index_indclass, &isnull);
Assert(!isnull);
@ -905,12 +919,12 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
index->idxname = NULL;
/*
* If the index is marked PRIMARY, it's certainly from a constraint; else,
* if it's not marked UNIQUE, it certainly isn't. If it is or might be
* from a constraint, we have to fetch the constraint to check for
* deferrability attributes.
* If the index is marked PRIMARY or has an exclusion condition, it's
* certainly from a constraint; else, if it's not marked UNIQUE, it
* certainly isn't. If it is or might be from a constraint, we have to
* fetch the pg_constraint record.
*/
if (index->primary || index->unique)
if (index->primary || index->unique || idxrelrec->relhasexclusion)
{
Oid constraintId = get_index_constraint(source_relid);
@ -931,6 +945,53 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
index->deferrable = conrec->condeferrable;
index->initdeferred = conrec->condeferred;
/* If it's an exclusion constraint, we need the operator names */
if (idxrelrec->relhasexclusion)
{
Datum *elems;
int nElems;
int i;
Assert(conrec->contype == CONSTRAINT_EXCLUSION);
/* Extract operator OIDs from the pg_constraint tuple */
datum = SysCacheGetAttr(CONSTROID, ht_constr,
Anum_pg_constraint_conexclop,
&isnull);
if (isnull)
elog(ERROR, "null conexclop for constraint %u",
constraintId);
deconstruct_array(DatumGetArrayTypeP(datum),
OIDOID, sizeof(Oid), true, 'i',
&elems, NULL, &nElems);
for (i = 0; i < nElems; i++)
{
Oid operid = DatumGetObjectId(elems[i]);
HeapTuple opertup;
Form_pg_operator operform;
char *oprname;
char *nspname;
List *namelist;
opertup = SearchSysCache(OPEROID,
ObjectIdGetDatum(operid),
0, 0, 0);
if (!HeapTupleIsValid(opertup))
elog(ERROR, "cache lookup failed for operator %u",
operid);
operform = (Form_pg_operator) GETSTRUCT(opertup);
oprname = pstrdup(NameStr(operform->oprname));
/* For simplicity we always schema-qualify the op name */
nspname = get_namespace_name(operform->oprnamespace);
namelist = list_make2(makeString(nspname),
makeString(oprname));
index->excludeOpNames = lappend(index->excludeOpNames,
namelist);
ReleaseSysCache(opertup);
}
}
ReleaseSysCache(ht_constr);
}
else
@ -1087,7 +1148,7 @@ get_opclass(Oid opclass, Oid actual_datatype)
/*
* transformIndexConstraints
* Handle UNIQUE and PRIMARY KEY constraints, which create indexes.
* Handle UNIQUE, PRIMARY KEY, EXCLUDE constraints, which create indexes.
* We also merge in any index definitions arising from
* LIKE ... INCLUDING INDEXES.
*/
@ -1100,8 +1161,9 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt)
/*
* Run through the constraints that need to generate an index. For PRIMARY
* KEY, mark each column as NOT NULL and create an index. For UNIQUE,
* create an index as for PRIMARY KEY, but do not insist on NOT NULL.
* KEY, mark each column as NOT NULL and create an index. For UNIQUE or
* EXCLUDE, create an index as for PRIMARY KEY, but do not insist on NOT
* NULL.
*/
foreach(lc, cxt->ixconstraints)
{
@ -1109,7 +1171,8 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt)
Assert(IsA(constraint, Constraint));
Assert(constraint->contype == CONSTR_PRIMARY ||
constraint->contype == CONSTR_UNIQUE);
constraint->contype == CONSTR_UNIQUE ||
constraint->contype == CONSTR_EXCLUSION);
index = transformIndexConstraint(constraint, cxt);
@ -1167,6 +1230,7 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt)
if (equal(index->indexParams, priorindex->indexParams) &&
equal(index->whereClause, priorindex->whereClause) &&
equal(index->excludeOpNames, priorindex->excludeOpNames) &&
strcmp(index->accessMethod, priorindex->accessMethod) == 0 &&
index->deferrable == priorindex->deferrable &&
index->initdeferred == priorindex->initdeferred)
@ -1193,19 +1257,18 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt)
/*
* transformIndexConstraint
* Transform one UNIQUE or PRIMARY KEY constraint for
* Transform one UNIQUE, PRIMARY KEY, or EXCLUDE constraint for
* transformIndexConstraints.
*/
static IndexStmt *
transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
{
IndexStmt *index;
ListCell *keys;
IndexElem *iparam;
ListCell *lc;
index = makeNode(IndexStmt);
index->unique = true;
index->unique = (constraint->contype != CONSTR_EXCLUSION);
index->primary = (constraint->contype == CONSTR_PRIMARY);
if (index->primary)
{
@ -1231,25 +1294,55 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
index->idxname = NULL; /* DefineIndex will choose name */
index->relation = cxt->relation;
index->accessMethod = DEFAULT_INDEX_TYPE;
index->accessMethod = constraint->access_method ? constraint->access_method : DEFAULT_INDEX_TYPE;
index->options = constraint->options;
index->tableSpace = constraint->indexspace;
index->whereClause = constraint->where_clause;
index->indexParams = NIL;
index->whereClause = NULL;
index->excludeOpNames = NIL;
index->concurrent = false;
/*
* If it's an EXCLUDE constraint, the grammar returns a list of pairs
* of IndexElems and operator names. We have to break that apart into
* separate lists.
*/
if (constraint->contype == CONSTR_EXCLUSION)
{
foreach(lc, constraint->exclusions)
{
List *pair = (List *) lfirst(lc);
IndexElem *elem;
List *opname;
Assert(list_length(pair) == 2);
elem = (IndexElem *) linitial(pair);
Assert(IsA(elem, IndexElem));
opname = (List *) lsecond(pair);
Assert(IsA(opname, List));
index->indexParams = lappend(index->indexParams, elem);
index->excludeOpNames = lappend(index->excludeOpNames, opname);
}
return index;
}
/*
* For UNIQUE and PRIMARY KEY, we just have a list of column names.
*
* Make sure referenced keys exist. If we are making a PRIMARY KEY index,
* also make sure they are NOT NULL, if possible. (Although we could leave
* it to DefineIndex to mark the columns NOT NULL, it's more efficient to
* get it right the first time.)
*/
foreach(keys, constraint->keys)
foreach(lc, constraint->keys)
{
char *key = strVal(lfirst(keys));
char *key = strVal(lfirst(lc));
bool found = false;
ColumnDef *column = NULL;
ListCell *columns;
IndexElem *iparam;
foreach(columns, cxt->columns)
{
@ -2000,6 +2093,7 @@ transformConstraintAttrs(ParseState *pstate, List *constraintList)
((node) != NULL && \
((node)->contype == CONSTR_PRIMARY || \
(node)->contype == CONSTR_UNIQUE || \
(node)->contype == CONSTR_EXCLUSION || \
(node)->contype == CONSTR_FOREIGN))
foreach(clist, constraintList)