1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-25 13:17:41 +03:00

DDL support for collations

- collowner field
- CREATE COLLATION
- ALTER COLLATION
- DROP COLLATION
- COMMENT ON COLLATION
- integration with extensions
- pg_dump support for the above
- dependency management
- psql tab completion
- psql \dO command
This commit is contained in:
Peter Eisentraut
2011-02-12 15:54:13 +02:00
parent d31e2a495b
commit b313bca0af
51 changed files with 1860 additions and 91 deletions

View File

@@ -13,7 +13,7 @@ top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
OBJS = aggregatecmds.o alter.o analyze.o async.o cluster.o comment.o \
constraint.o conversioncmds.o copy.o \
collationcmds.o constraint.o conversioncmds.o copy.o \
dbcommands.o define.o discard.o explain.o extension.o \
foreigncmds.o functioncmds.o \
indexcmds.o lockcmds.o operatorcmds.o opclasscmds.o \

View File

@@ -20,6 +20,7 @@
#include "catalog/pg_largeobject.h"
#include "catalog/pg_namespace.h"
#include "commands/alter.h"
#include "commands/collationcmds.h"
#include "commands/conversioncmds.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
@@ -53,6 +54,10 @@ ExecRenameStmt(RenameStmt *stmt)
RenameAggregate(stmt->object, stmt->objarg, stmt->newname);
break;
case OBJECT_COLLATION:
RenameCollation(stmt->object, stmt->newname);
break;
case OBJECT_CONVERSION:
RenameConversion(stmt->object, stmt->newname);
break;
@@ -185,6 +190,10 @@ ExecAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt)
stmt->newschema);
break;
case OBJECT_COLLATION:
AlterCollationNamespace(stmt->object, stmt->newschema);
break;
case OBJECT_CONVERSION:
AlterConversionNamespace(stmt->object, stmt->newschema);
break;
@@ -302,6 +311,10 @@ AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid)
oldNspOid = AlterTypeNamespace_oid(objid, nspOid);
break;
case OCLASS_COLLATION:
oldNspOid = AlterCollationNamespace_oid(objid, nspOid);
break;
case OCLASS_CONVERSION:
oldNspOid = AlterConversionNamespace_oid(objid, nspOid);
break;
@@ -478,6 +491,10 @@ ExecAlterOwnerStmt(AlterOwnerStmt *stmt)
AlterAggregateOwner(stmt->object, stmt->objarg, newowner);
break;
case OBJECT_COLLATION:
AlterCollationOwner(stmt->object, newowner);
break;
case OBJECT_CONVERSION:
AlterConversionOwner(stmt->object, newowner);
break;

View File

@@ -0,0 +1,401 @@
/*-------------------------------------------------------------------------
*
* collationcmds.c
* collation creation command support code
*
* Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/commands/collationcmds.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/heapam.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_collation_fn.h"
#include "commands/alter.h"
#include "commands/collationcmds.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "parser/parse_type.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
static void AlterCollationOwner_internal(Relation rel, Oid collationOid,
Oid newOwnerId);
/*
* CREATE COLLATION
*/
void
DefineCollation(List *names, List *parameters)
{
char *collName;
Oid collNamespace;
AclResult aclresult;
ListCell *pl;
DefElem *fromEl = NULL;
DefElem *localeEl = NULL;
DefElem *lccollateEl = NULL;
DefElem *lcctypeEl = NULL;
char *collcollate = NULL;
char *collctype = NULL;
collNamespace = QualifiedNameGetCreationNamespace(names, &collName);
aclresult = pg_namespace_aclcheck(collNamespace, GetUserId(), ACL_CREATE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
get_namespace_name(collNamespace));
foreach(pl, parameters)
{
DefElem *defel = (DefElem *) lfirst(pl);
DefElem **defelp;
if (pg_strcasecmp(defel->defname, "from") == 0)
defelp = &fromEl;
else if (pg_strcasecmp(defel->defname, "locale") == 0)
defelp = &localeEl;
else if (pg_strcasecmp(defel->defname, "lc_collate") == 0)
defelp = &lccollateEl;
else if (pg_strcasecmp(defel->defname, "lc_ctype") == 0)
defelp = &lcctypeEl;
else
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("collation attribute \"%s\" not recognized",
defel->defname)));
break;
}
*defelp = defel;
}
if ((localeEl && (lccollateEl || lcctypeEl))
|| (fromEl && list_length(parameters) != 1))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
if (fromEl)
{
Oid collid;
HeapTuple tp;
collid = LookupCollation(NULL, defGetQualifiedName(fromEl), -1);
tp = SearchSysCache1(COLLOID, ObjectIdGetDatum(collid));
if (!HeapTupleIsValid(tp))
elog(ERROR, "cache lookup failed for collation %u", collid);
collcollate = pstrdup(NameStr(((Form_pg_collation) GETSTRUCT(tp))->collcollate));
collctype = pstrdup(NameStr(((Form_pg_collation) GETSTRUCT(tp))->collctype));
ReleaseSysCache(tp);
}
if (localeEl)
{
collcollate = defGetString(localeEl);
collctype = defGetString(localeEl);
}
if (lccollateEl)
collcollate = defGetString(lccollateEl);
if (lcctypeEl)
collctype = defGetString(lcctypeEl);
if (!collcollate)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("parameter \"lc_collate\" parameter must be specified")));
if (!collctype)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("parameter \"lc_ctype\" must be specified")));
check_encoding_locale_matches(GetDatabaseEncoding(), collcollate, collctype);
CollationCreate(collName,
collNamespace,
GetUserId(),
GetDatabaseEncoding(),
collcollate,
collctype);
}
/*
* DROP COLLATION
*/
void
DropCollationsCommand(DropStmt *drop)
{
ObjectAddresses *objects;
ListCell *cell;
/*
* First we identify all the collations, then we delete them in a single
* performMultipleDeletions() call. This is to avoid unwanted DROP
* RESTRICT errors if one of the collations depends on another. (Not that
* that is very likely, but we may as well do this consistently.)
*/
objects = new_object_addresses();
foreach(cell, drop->objects)
{
List *name = (List *) lfirst(cell);
Oid collationOid;
HeapTuple tuple;
Form_pg_collation coll;
ObjectAddress object;
collationOid = get_collation_oid(name, drop->missing_ok);
if (!OidIsValid(collationOid))
{
ereport(NOTICE,
(errmsg("collation \"%s\" does not exist, skipping",
NameListToString(name))));
continue;
}
tuple = SearchSysCache1(COLLOID, ObjectIdGetDatum(collationOid));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for collation %u",
collationOid);
coll = (Form_pg_collation) GETSTRUCT(tuple);
/* Permission check: must own collation or its namespace */
if (!pg_collation_ownercheck(collationOid, GetUserId()) &&
!pg_namespace_ownercheck(coll->collnamespace, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_COLLATION,
NameStr(coll->collname));
object.classId = CollationRelationId;
object.objectId = collationOid;
object.objectSubId = 0;
add_exact_object_address(&object, objects);
ReleaseSysCache(tuple);
}
performMultipleDeletions(objects, drop->behavior);
free_object_addresses(objects);
}
/*
* Rename collation
*/
void
RenameCollation(List *name, const char *newname)
{
Oid collationOid;
Oid namespaceOid;
HeapTuple tup;
Relation rel;
AclResult aclresult;
rel = heap_open(CollationRelationId, RowExclusiveLock);
collationOid = get_collation_oid(name, false);
tup = SearchSysCacheCopy1(COLLOID, ObjectIdGetDatum(collationOid));
if (!HeapTupleIsValid(tup)) /* should not happen */
elog(ERROR, "cache lookup failed for collation %u", collationOid);
namespaceOid = ((Form_pg_collation) GETSTRUCT(tup))->collnamespace;
/* make sure the new name doesn't exist */
if (SearchSysCacheExists3(COLLNAMEENCNSP,
CStringGetDatum(newname),
Int32GetDatum(GetDatabaseEncoding()),
ObjectIdGetDatum(namespaceOid)))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("collation \"%s\" for current database encoding \"%s\" already exists in schema \"%s\"",
newname,
GetDatabaseEncodingName(),
get_namespace_name(namespaceOid))));
/* must be owner */
if (!pg_collation_ownercheck(collationOid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_COLLATION,
NameListToString(name));
/* must have CREATE privilege on namespace */
aclresult = pg_namespace_aclcheck(namespaceOid, GetUserId(), ACL_CREATE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
get_namespace_name(namespaceOid));
/* rename */
namestrcpy(&(((Form_pg_collation) GETSTRUCT(tup))->collname), newname);
simple_heap_update(rel, &tup->t_self, tup);
CatalogUpdateIndexes(rel, tup);
heap_close(rel, NoLock);
heap_freetuple(tup);
}
/*
* Change collation owner, by name
*/
void
AlterCollationOwner(List *name, Oid newOwnerId)
{
Oid collationOid;
Relation rel;
rel = heap_open(CollationRelationId, RowExclusiveLock);
collationOid = get_collation_oid(name, false);
AlterCollationOwner_internal(rel, collationOid, newOwnerId);
heap_close(rel, NoLock);
}
/*
* Change collation owner, by oid
*/
void
AlterCollationOwner_oid(Oid collationOid, Oid newOwnerId)
{
Relation rel;
rel = heap_open(CollationRelationId, RowExclusiveLock);
AlterCollationOwner_internal(rel, collationOid, newOwnerId);
heap_close(rel, NoLock);
}
/*
* AlterCollationOwner_internal
*
* Internal routine for changing the owner. rel must be pg_collation, already
* open and suitably locked; it will not be closed.
*/
static void
AlterCollationOwner_internal(Relation rel, Oid collationOid, Oid newOwnerId)
{
Form_pg_collation collForm;
HeapTuple tup;
Assert(RelationGetRelid(rel) == CollationRelationId);
tup = SearchSysCacheCopy1(COLLOID, ObjectIdGetDatum(collationOid));
if (!HeapTupleIsValid(tup)) /* should not happen */
elog(ERROR, "cache lookup failed for collation %u", collationOid);
collForm = (Form_pg_collation) GETSTRUCT(tup);
/*
* If the new owner is the same as the existing owner, consider the
* command to have succeeded. This is for dump restoration purposes.
*/
if (collForm->collowner != newOwnerId)
{
AclResult aclresult;
/* Superusers can always do it */
if (!superuser())
{
/* Otherwise, must be owner of the existing object */
if (!pg_collation_ownercheck(HeapTupleGetOid(tup), GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_COLLATION,
NameStr(collForm->collname));
/* Must be able to become new owner */
check_is_member_of_role(GetUserId(), newOwnerId);
/* New owner must have CREATE privilege on namespace */
aclresult = pg_namespace_aclcheck(collForm->collnamespace,
newOwnerId,
ACL_CREATE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
get_namespace_name(collForm->collnamespace));
}
/*
* Modify the owner --- okay to scribble on tup because it's a copy
*/
collForm->collowner = newOwnerId;
simple_heap_update(rel, &tup->t_self, tup);
CatalogUpdateIndexes(rel, tup);
/* Update owner dependency reference */
changeDependencyOnOwner(CollationRelationId, collationOid,
newOwnerId);
}
heap_freetuple(tup);
}
/*
* Execute ALTER COLLATION SET SCHEMA
*/
void
AlterCollationNamespace(List *name, const char *newschema)
{
Oid collOid, nspOid;
Relation rel;
rel = heap_open(CollationRelationId, RowExclusiveLock);
collOid = get_collation_oid(name, false);
/* get schema OID */
nspOid = LookupCreationNamespace(newschema);
AlterObjectNamespace(rel, COLLOID, -1,
collOid, nspOid,
Anum_pg_collation_collname,
Anum_pg_collation_collnamespace,
Anum_pg_collation_collowner,
ACL_KIND_COLLATION);
heap_close(rel, NoLock);
}
/*
* Change collation schema, by oid
*/
Oid
AlterCollationNamespace_oid(Oid collOid, Oid newNspOid)
{
Oid oldNspOid;
Relation rel;
rel = heap_open(CollationRelationId, RowExclusiveLock);
oldNspOid = AlterObjectNamespace(rel, COLLOID, -1,
collOid, newNspOid,
Anum_pg_collation_collname,
Anum_pg_collation_collnamespace,
Anum_pg_collation_collowner,
ACL_KIND_COLLATION);
heap_close(rel, RowExclusiveLock);
return oldNspOid;
}

View File

@@ -133,6 +133,11 @@ CommentObject(CommentStmt *stmt)
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_NAMESPACE,
strVal(linitial(stmt->objname)));
break;
case OBJECT_COLLATION:
if (!pg_collation_ownercheck(address.objectId, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_COLLATION,
NameListToString(stmt->objname));
break;
case OBJECT_CONVERSION:
if (!pg_conversion_ownercheck(address.objectId, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CONVERSION,

View File

@@ -129,8 +129,6 @@ createdb(const CreatedbStmt *stmt)
char *dbctype = NULL;
int encoding = -1;
int dbconnlimit = -1;
int ctype_encoding;
int collate_encoding;
int notherbackends;
int npreparedxacts;
createdb_failure_params fparms;
@@ -334,60 +332,7 @@ createdb(const CreatedbStmt *stmt)
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("invalid locale name %s", dbctype)));
/*
* Check whether chosen encoding matches chosen locale settings. This
* restriction is necessary because libc's locale-specific code usually
* fails when presented with data in an encoding it's not expecting. We
* allow mismatch in four cases:
*
* 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
* which works with any encoding.
*
* 2. locale encoding = -1, which means that we couldn't determine the
* locale's encoding and have to trust the user to get it right.
*
* 3. selected encoding is UTF8 and platform is win32. This is because
* UTF8 is a pseudo codepage that is supported in all locales since it's
* converted to UTF16 before being used.
*
* 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
* is risky but we have historically allowed it --- notably, the
* regression tests require it.
*
* Note: if you change this policy, fix initdb to match.
*/
ctype_encoding = pg_get_encoding_from_locale(dbctype, true);
collate_encoding = pg_get_encoding_from_locale(dbcollate, true);
if (!(ctype_encoding == encoding ||
ctype_encoding == PG_SQL_ASCII ||
ctype_encoding == -1 ||
#ifdef WIN32
encoding == PG_UTF8 ||
#endif
(encoding == PG_SQL_ASCII && superuser())))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("encoding %s does not match locale %s",
pg_encoding_to_char(encoding),
dbctype),
errdetail("The chosen LC_CTYPE setting requires encoding %s.",
pg_encoding_to_char(ctype_encoding))));
if (!(collate_encoding == encoding ||
collate_encoding == PG_SQL_ASCII ||
collate_encoding == -1 ||
#ifdef WIN32
encoding == PG_UTF8 ||
#endif
(encoding == PG_SQL_ASCII && superuser())))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("encoding %s does not match locale %s",
pg_encoding_to_char(encoding),
dbcollate),
errdetail("The chosen LC_COLLATE setting requires encoding %s.",
pg_encoding_to_char(collate_encoding))));
check_encoding_locale_matches(encoding, dbcollate, dbctype);
/*
* Check that the new encoding and locale settings match the source
@@ -710,6 +655,65 @@ createdb(const CreatedbStmt *stmt)
PointerGetDatum(&fparms));
}
/*
* Check whether chosen encoding matches chosen locale settings. This
* restriction is necessary because libc's locale-specific code usually
* fails when presented with data in an encoding it's not expecting. We
* allow mismatch in four cases:
*
* 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
* which works with any encoding.
*
* 2. locale encoding = -1, which means that we couldn't determine the
* locale's encoding and have to trust the user to get it right.
*
* 3. selected encoding is UTF8 and platform is win32. This is because
* UTF8 is a pseudo codepage that is supported in all locales since it's
* converted to UTF16 before being used.
*
* 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
* is risky but we have historically allowed it --- notably, the
* regression tests require it.
*
* Note: if you change this policy, fix initdb to match.
*/
void
check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
{
int ctype_encoding = pg_get_encoding_from_locale(ctype, true);
int collate_encoding = pg_get_encoding_from_locale(collate, true);
if (!(ctype_encoding == encoding ||
ctype_encoding == PG_SQL_ASCII ||
ctype_encoding == -1 ||
#ifdef WIN32
encoding == PG_UTF8 ||
#endif
(encoding == PG_SQL_ASCII && superuser())))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("encoding %s does not match locale %s",
pg_encoding_to_char(encoding),
ctype),
errdetail("The chosen LC_CTYPE setting requires encoding %s.",
pg_encoding_to_char(ctype_encoding))));
if (!(collate_encoding == encoding ||
collate_encoding == PG_SQL_ASCII ||
collate_encoding == -1 ||
#ifdef WIN32
encoding == PG_UTF8 ||
#endif
(encoding == PG_SQL_ASCII && superuser())))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("encoding %s does not match locale %s",
pg_encoding_to_char(encoding),
collate),
errdetail("The chosen LC_COLLATE setting requires encoding %s.",
pg_encoding_to_char(collate_encoding))));
}
/* Error cleanup callback for createdb */
static void
createdb_failure_callback(int code, Datum arg)

View File

@@ -27,6 +27,7 @@
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_depend.h"
#include "catalog/pg_foreign_table.h"
@@ -293,7 +294,7 @@ static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, bool recu
AlterTableCmd *cmd, LOCKMODE lockmode);
static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
ColumnDef *colDef, bool isOid, LOCKMODE lockmode);
static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid);
static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid, Oid collid);
static void ATPrepAddOids(List **wqueue, Relation rel, bool recurse,
AlterTableCmd *cmd, LOCKMODE lockmode);
static void ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode);
@@ -4369,14 +4370,14 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
/*
* Add needed dependency entries for the new column.
*/
add_column_datatype_dependency(myrelid, newattnum, attribute.atttypid);
add_column_datatype_dependency(myrelid, newattnum, attribute.atttypid, attribute.attcollation);
}
/*
* Install a column's dependency on its datatype.
*/
static void
add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid)
add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid, Oid collid)
{
ObjectAddress myself,
referenced;
@@ -4388,6 +4389,14 @@ add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid)
referenced.objectId = typid;
referenced.objectSubId = 0;
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
if (collid)
{
referenced.classId = CollationRelationId;
referenced.objectId = collid;
referenced.objectSubId = 0;
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
}
}
/*
@@ -6877,6 +6886,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
case OCLASS_PROC:
case OCLASS_TYPE:
case OCLASS_CAST:
case OCLASS_COLLATION:
case OCLASS_CONVERSION:
case OCLASS_LANGUAGE:
case OCLASS_LARGEOBJECT:
@@ -6918,7 +6928,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
/*
* Now scan for dependencies of this column on other things. The only
* thing we should find is the dependency on the column datatype, which we
* want to remove.
* want to remove, and possibly an associated collation.
*/
ScanKeyInit(&key[0],
Anum_pg_depend_classid,
@@ -6943,8 +6953,10 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
if (foundDep->deptype != DEPENDENCY_NORMAL)
elog(ERROR, "found unexpected dependency type '%c'",
foundDep->deptype);
if (foundDep->refclassid != TypeRelationId ||
foundDep->refobjid != attTup->atttypid)
if (!(foundDep->refclassid == TypeRelationId &&
foundDep->refobjid == attTup->atttypid) &&
!(foundDep->refclassid == CollationRelationId &&
foundDep->refobjid == attTup->attcollation))
elog(ERROR, "found unexpected dependency for column");
simple_heap_delete(depRel, &depTup->t_self);
@@ -6977,7 +6989,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
heap_close(attrelation, RowExclusiveLock);
/* Install dependency on new datatype */
add_column_datatype_dependency(RelationGetRelid(rel), attnum, targettype);
add_column_datatype_dependency(RelationGetRelid(rel), attnum, targettype, targetcollid);
/*
* Drop any pg_statistic entry for the column, since it's now wrong type

View File

@@ -1736,6 +1736,7 @@ AlterDomainDefault(List *names, Node *defaultRaw)
InvalidOid,
false, /* a domain isn't an implicit array */
typTup->typbasetype,
typTup->typcollation,
defaultExpr,
true); /* Rebuild is true */