1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-17 17:02:08 +03:00

Identity columns

This is the SQL standard-conforming variant of PostgreSQL's serial
columns.  It fixes a few usability issues that serial columns have:

- CREATE TABLE / LIKE copies default but refers to same sequence
- cannot add/drop serialness with ALTER TABLE
- dropping default does not drop sequence
- need to grant separate privileges to sequence
- other slight weirdnesses because serial is some kind of special macro

Reviewed-by: Vitaly Burovoy <vitaly.burovoy@gmail.com>
This commit is contained in:
Peter Eisentraut
2017-04-06 08:33:16 -04:00
parent 6bad580d9e
commit 3217327053
57 changed files with 2140 additions and 202 deletions

View File

@ -42,6 +42,7 @@
#include "catalog/pg_type.h"
#include "commands/comment.h"
#include "commands/defrem.h"
#include "commands/sequence.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "miscadmin.h"
@ -356,6 +357,132 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
return result;
}
static void
generateSerialExtraStmts(CreateStmtContext *cxt, ColumnDef *column,
Oid seqtypid, List *seqoptions, bool for_identity,
char **snamespace_p, char **sname_p)
{
ListCell *option;
DefElem *nameEl = NULL;
Oid snamespaceid;
char *snamespace;
char *sname;
CreateSeqStmt *seqstmt;
AlterSeqStmt *altseqstmt;
List *attnamelist;
/*
* Determine namespace and name to use for the sequence.
*
* First, check if a sequence name was passed in as an option. This is
* used by pg_dump. Else, generate a name.
*
* Although we use ChooseRelationName, it's not guaranteed that the
* selected sequence name won't conflict; given sufficiently long
* field names, two different serial columns in the same table could
* be assigned the same sequence name, and we'd not notice since we
* aren't creating the sequence quite yet. In practice this seems
* quite unlikely to be a problem, especially since few people would
* need two serial columns in one table.
*/
foreach(option, seqoptions)
{
DefElem *defel = castNode(DefElem, lfirst(option));
if (strcmp(defel->defname, "sequence_name") == 0)
{
if (nameEl)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
nameEl = defel;
}
}
if (nameEl)
{
RangeVar *rv = makeRangeVarFromNameList(castNode(List, nameEl->arg));
snamespace = rv->schemaname;
sname = rv->relname;
seqoptions = list_delete_ptr(seqoptions, nameEl);
}
else
{
if (cxt->rel)
snamespaceid = RelationGetNamespace(cxt->rel);
else
{
snamespaceid = RangeVarGetCreationNamespace(cxt->relation);
RangeVarAdjustRelationPersistence(cxt->relation, snamespaceid);
}
snamespace = get_namespace_name(snamespaceid);
sname = ChooseRelationName(cxt->relation->relname,
column->colname,
"seq",
snamespaceid);
}
ereport(DEBUG1,
(errmsg("%s will create implicit sequence \"%s\" for serial column \"%s.%s\"",
cxt->stmtType, sname,
cxt->relation->relname, column->colname)));
/*
* Build a CREATE SEQUENCE command to create the sequence object, and
* add it to the list of things to be done before this CREATE/ALTER
* TABLE.
*/
seqstmt = makeNode(CreateSeqStmt);
seqstmt->for_identity = for_identity;
seqstmt->sequence = makeRangeVar(snamespace, sname, -1);
seqstmt->options = seqoptions;
/*
* If a sequence data type was specified, add it to the options. Prepend
* to the list rather than append; in case a user supplied their own AS
* clause, the "redundant options" error will point to their occurrence,
* not our synthetic one.
*/
if (seqtypid)
seqstmt->options = lcons(makeDefElem("as", (Node *) makeTypeNameFromOid(seqtypid, -1), -1),
seqstmt->options);
/*
* If this is ALTER ADD COLUMN, make sure the sequence will be owned
* by the table's owner. The current user might be someone else
* (perhaps a superuser, or someone who's only a member of the owning
* role), but the SEQUENCE OWNED BY mechanisms will bleat unless table
* and sequence have exactly the same owning role.
*/
if (cxt->rel)
seqstmt->ownerId = cxt->rel->rd_rel->relowner;
else
seqstmt->ownerId = InvalidOid;
cxt->blist = lappend(cxt->blist, seqstmt);
/*
* Build an ALTER SEQUENCE ... OWNED BY command to mark the sequence
* as owned by this column, and add it to the list of things to be
* done after this CREATE/ALTER TABLE.
*/
altseqstmt = makeNode(AlterSeqStmt);
altseqstmt->sequence = makeRangeVar(snamespace, sname, -1);
attnamelist = list_make3(makeString(snamespace),
makeString(cxt->relation->relname),
makeString(column->colname));
altseqstmt->options = list_make1(makeDefElem("owned_by",
(Node *) attnamelist, -1));
altseqstmt->for_identity = for_identity;
cxt->alist = lappend(cxt->alist, altseqstmt);
if (snamespace_p)
*snamespace_p = snamespace;
if (sname_p)
*sname_p = sname;
}
/*
* transformColumnDefinition -
* transform a single ColumnDef within CREATE TABLE
@ -367,7 +494,7 @@ transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column)
bool is_serial;
bool saw_nullable;
bool saw_default;
Constraint *constraint;
bool saw_identity;
ListCell *clist;
cxt->columns = lappend(cxt->columns, column);
@ -422,83 +549,17 @@ transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column)
/* Special actions for SERIAL pseudo-types */
if (is_serial)
{
Oid snamespaceid;
char *snamespace;
char *sname;
char *qstring;
A_Const *snamenode;
TypeCast *castnode;
FuncCall *funccallnode;
CreateSeqStmt *seqstmt;
AlterSeqStmt *altseqstmt;
List *attnamelist;
Constraint *constraint;
/*
* Determine namespace and name to use for the sequence.
*
* Although we use ChooseRelationName, it's not guaranteed that the
* selected sequence name won't conflict; given sufficiently long
* field names, two different serial columns in the same table could
* be assigned the same sequence name, and we'd not notice since we
* aren't creating the sequence quite yet. In practice this seems
* quite unlikely to be a problem, especially since few people would
* need two serial columns in one table.
*/
if (cxt->rel)
snamespaceid = RelationGetNamespace(cxt->rel);
else
{
snamespaceid = RangeVarGetCreationNamespace(cxt->relation);
RangeVarAdjustRelationPersistence(cxt->relation, snamespaceid);
}
snamespace = get_namespace_name(snamespaceid);
sname = ChooseRelationName(cxt->relation->relname,
column->colname,
"seq",
snamespaceid);
ereport(DEBUG1,
(errmsg("%s will create implicit sequence \"%s\" for serial column \"%s.%s\"",
cxt->stmtType, sname,
cxt->relation->relname, column->colname)));
/*
* Build a CREATE SEQUENCE command to create the sequence object, and
* add it to the list of things to be done before this CREATE/ALTER
* TABLE.
*/
seqstmt = makeNode(CreateSeqStmt);
seqstmt->sequence = makeRangeVar(snamespace, sname, -1);
seqstmt->options = list_make1(makeDefElem("as", (Node *) makeTypeNameFromOid(column->typeName->typeOid, -1), -1));
/*
* If this is ALTER ADD COLUMN, make sure the sequence will be owned
* by the table's owner. The current user might be someone else
* (perhaps a superuser, or someone who's only a member of the owning
* role), but the SEQUENCE OWNED BY mechanisms will bleat unless table
* and sequence have exactly the same owning role.
*/
if (cxt->rel)
seqstmt->ownerId = cxt->rel->rd_rel->relowner;
else
seqstmt->ownerId = InvalidOid;
cxt->blist = lappend(cxt->blist, seqstmt);
/*
* Build an ALTER SEQUENCE ... OWNED BY command to mark the sequence
* as owned by this column, and add it to the list of things to be
* done after this CREATE/ALTER TABLE.
*/
altseqstmt = makeNode(AlterSeqStmt);
altseqstmt->sequence = makeRangeVar(snamespace, sname, -1);
attnamelist = list_make3(makeString(snamespace),
makeString(cxt->relation->relname),
makeString(column->colname));
altseqstmt->options = list_make1(makeDefElem("owned_by",
(Node *) attnamelist, -1));
cxt->alist = lappend(cxt->alist, altseqstmt);
generateSerialExtraStmts(cxt, column,
column->typeName->typeOid, NIL, false,
&snamespace, &sname);
/*
* Create appropriate constraints for SERIAL. We do this in full,
@ -540,10 +601,11 @@ transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column)
saw_nullable = false;
saw_default = false;
saw_identity = false;
foreach(clist, column->constraints)
{
constraint = castNode(Constraint, lfirst(clist));
Constraint *constraint = castNode(Constraint, lfirst(clist));
switch (constraint->contype)
{
@ -584,6 +646,33 @@ transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column)
saw_default = true;
break;
case CONSTR_IDENTITY:
{
Type ctype;
Oid typeOid;
ctype = typenameType(cxt->pstate, column->typeName, NULL);
typeOid = HeapTupleGetOid(ctype);
ReleaseSysCache(ctype);
if (saw_identity)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("multiple identity specifications for column \"%s\" of table \"%s\"",
column->colname, cxt->relation->relname),
parser_errposition(cxt->pstate,
constraint->location)));
generateSerialExtraStmts(cxt, column,
typeOid, constraint->options, true,
NULL, NULL);
column->identity = constraint->generated_when;
saw_identity = true;
column->is_not_null = TRUE;
break;
}
case CONSTR_CHECK:
cxt->ckconstraints = lappend(cxt->ckconstraints, constraint);
break;
@ -660,6 +749,14 @@ transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column)
constraint->contype);
break;
}
if (saw_default && saw_identity)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("both default and identity specified for column \"%s\" of table \"%s\"",
column->colname, cxt->relation->relname),
parser_errposition(cxt->pstate,
constraint->location)));
}
/*
@ -932,6 +1029,27 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
def->cooked_default = this_default;
}
/*
* Copy identity if requested
*/
if (attribute->attidentity &&
(table_like_clause->options & CREATE_TABLE_LIKE_IDENTITY))
{
Oid seq_relid;
List *seq_options;
/*
* find sequence owned by old column; extract sequence parameters;
* build new create sequence command
*/
seq_relid = getOwnedSequence(RelationGetRelid(relation), attribute->attnum);
seq_options = sequence_options(seq_relid);
generateSerialExtraStmts(cxt, def,
InvalidOid, seq_options, true,
NULL, NULL);
def->identity = attribute->attidentity;
}
/* Likewise, copy storage if requested */
if (table_like_clause->options & CREATE_TABLE_LIKE_STORAGE)
def->storage = attribute->attstorage;
@ -2628,6 +2746,7 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
case AT_AlterColumnType:
{
ColumnDef *def = (ColumnDef *) cmd->def;
AttrNumber attnum;
/*
* For ALTER COLUMN TYPE, transform the USING clause if
@ -2640,6 +2759,103 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
EXPR_KIND_ALTER_COL_TRANSFORM);
}
/*
* For identity column, create ALTER SEQUENCE command to
* change the data type of the sequence.
*/
attnum = get_attnum(relid, cmd->name);
/* if attribute not found, something will error about it later */
if (attnum != InvalidAttrNumber && get_attidentity(relid, attnum))
{
Oid seq_relid = getOwnedSequence(relid, attnum);
Oid typeOid = typenameTypeId(pstate, def->typeName);
AlterSeqStmt *altseqstmt = makeNode(AlterSeqStmt);
altseqstmt->sequence = makeRangeVar(get_namespace_name(get_rel_namespace(seq_relid)),
get_rel_name(seq_relid),
-1);
altseqstmt->options = list_make1(makeDefElem("as", (Node *) makeTypeNameFromOid(typeOid, -1), -1));
altseqstmt->for_identity = true;
cxt.blist = lappend(cxt.blist, altseqstmt);
}
newcmds = lappend(newcmds, cmd);
break;
}
case AT_AddIdentity:
{
Constraint *def = castNode(Constraint, cmd->def);
ColumnDef *newdef = makeNode(ColumnDef);
AttrNumber attnum;
newdef->colname = cmd->name;
newdef->identity = def->generated_when;
cmd->def = (Node *) newdef;
attnum = get_attnum(relid, cmd->name);
/* if attribute not found, something will error about it later */
if (attnum != InvalidAttrNumber)
generateSerialExtraStmts(&cxt, newdef,
get_atttype(relid, attnum),
def->options, true,
NULL, NULL);
newcmds = lappend(newcmds, cmd);
break;
}
case AT_SetIdentity:
{
/*
* Create an ALTER SEQUENCE statement for the internal
* sequence of the identity column.
*/
ListCell *lc;
List *newseqopts = NIL;
List *newdef = NIL;
List *seqlist;
AttrNumber attnum;
/*
* Split options into those handled by ALTER SEQUENCE and
* those for ALTER TABLE proper.
*/
foreach(lc, castNode(List, cmd->def))
{
DefElem *def = castNode(DefElem, lfirst(lc));
if (strcmp(def->defname, "generated") == 0)
newdef = lappend(newdef, def);
else
newseqopts = lappend(newseqopts, def);
}
attnum = get_attnum(relid, cmd->name);
if (attnum)
{
seqlist = getOwnedSequences(relid, attnum);
if (seqlist)
{
AlterSeqStmt *seqstmt;
Oid seq_relid;
seqstmt = makeNode(AlterSeqStmt);
seq_relid = linitial_oid(seqlist);
seqstmt->sequence = makeRangeVar(get_namespace_name(get_rel_namespace(seq_relid)),
get_rel_name(seq_relid), -1);
seqstmt->options = newseqopts;
seqstmt->for_identity = true;
seqstmt->missing_ok = false;
cxt.alist = lappend(cxt.alist, seqstmt);
}
}
/* If column was not found or was not an identity column, we
* just let the ALTER TABLE command error out later. */
cmd->def = (Node *) newdef;
newcmds = lappend(newcmds, cmd);
break;
}