1
0
mirror of https://github.com/postgres/postgres.git synced 2025-08-15 14:02:29 +03:00

Add transforms feature

This provides a mechanism for specifying conversions between SQL data
types and procedural languages.  As examples, there are transforms
for hstore and ltree for PL/Perl and PL/Python.

reviews by Pavel Stěhule and Andres Freund
This commit is contained in:
Peter Eisentraut
2015-04-26 10:33:14 -04:00
parent f320cbb615
commit cac7658205
101 changed files with 6034 additions and 2811 deletions

View File

@@ -45,6 +45,7 @@
#include "catalog/pg_namespace.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_proc_fn.h"
#include "catalog/pg_transform.h"
#include "catalog/pg_type.h"
#include "catalog/pg_type_fn.h"
#include "commands/alter.h"
@@ -583,6 +584,7 @@ static void
compute_attributes_sql_style(List *options,
List **as,
char **language,
Node **transform,
bool *windowfunc_p,
char *volatility_p,
bool *strict_p,
@@ -595,6 +597,7 @@ compute_attributes_sql_style(List *options,
ListCell *option;
DefElem *as_item = NULL;
DefElem *language_item = NULL;
DefElem *transform_item = NULL;
DefElem *windowfunc_item = NULL;
DefElem *volatility_item = NULL;
DefElem *strict_item = NULL;
@@ -624,6 +627,14 @@ compute_attributes_sql_style(List *options,
errmsg("conflicting or redundant options")));
language_item = defel;
}
else if (strcmp(defel->defname, "transform") == 0)
{
if (transform_item)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
transform_item = defel;
}
else if (strcmp(defel->defname, "window") == 0)
{
if (windowfunc_item)
@@ -671,6 +682,8 @@ compute_attributes_sql_style(List *options,
}
/* process optional items */
if (transform_item)
*transform = transform_item->arg;
if (windowfunc_item)
*windowfunc_p = intVal(windowfunc_item->arg);
if (volatility_item)
@@ -807,7 +820,6 @@ interpret_AS_clause(Oid languageOid, const char *languageName,
}
/*
* CreateFunction
* Execute a CREATE FUNCTION utility statement.
@@ -822,6 +834,7 @@ CreateFunction(CreateFunctionStmt *stmt, const char *queryString)
char *language;
Oid languageOid;
Oid languageValidator;
Node *transformDefElem = NULL;
char *funcname;
Oid namespaceId;
AclResult aclresult;
@@ -831,6 +844,8 @@ CreateFunction(CreateFunctionStmt *stmt, const char *queryString)
ArrayType *parameterNames;
List *parameterDefaults;
Oid variadicArgType;
List *trftypes_list = NIL;
ArrayType *trftypes;
Oid requiredResultType;
bool isWindowFunc,
isStrict,
@@ -866,7 +881,7 @@ CreateFunction(CreateFunctionStmt *stmt, const char *queryString)
/* override attributes from explicit list */
compute_attributes_sql_style(stmt->options,
&as_clause, &language,
&as_clause, &language, &transformDefElem,
&isWindowFunc, &volatility,
&isStrict, &security, &isLeakProof,
&proconfig, &procost, &prorows);
@@ -915,6 +930,23 @@ CreateFunction(CreateFunctionStmt *stmt, const char *queryString)
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("only superuser can define a leakproof function")));
if (transformDefElem)
{
ListCell *lc;
Assert(IsA(transformDefElem, List));
foreach (lc, (List *) transformDefElem)
{
Oid typeid = typenameTypeId(NULL, lfirst(lc));
Oid elt = get_base_element_type(typeid);
typeid = elt ? elt : typeid;
get_transform_oid(typeid, languageOid, false);
trftypes_list = lappend_oid(trftypes_list, typeid);
}
}
/*
* Convert remaining parameters of CREATE to form wanted by
* ProcedureCreate.
@@ -958,6 +990,25 @@ CreateFunction(CreateFunctionStmt *stmt, const char *queryString)
returnsSet = false;
}
if (list_length(trftypes_list) > 0)
{
ListCell *lc;
Datum *arr;
int i;
arr = palloc(list_length(trftypes_list) * sizeof(Datum));
i = 0;
foreach (lc, trftypes_list)
arr[i++] = ObjectIdGetDatum(lfirst_oid(lc));
trftypes = construct_array(arr, list_length(trftypes_list),
OIDOID, sizeof(Oid), true, 'i');
}
else
{
/* store SQL NULL instead of emtpy array */
trftypes = NULL;
}
compute_attributes_with_style(stmt->withClause, &isStrict, &volatility);
interpret_AS_clause(languageOid, language, funcname, as_clause,
@@ -1014,6 +1065,7 @@ CreateFunction(CreateFunctionStmt *stmt, const char *queryString)
PointerGetDatum(parameterModes),
PointerGetDatum(parameterNames),
parameterDefaults,
PointerGetDatum(trftypes),
PointerGetDatum(proconfig),
procost,
prorows);
@@ -1653,6 +1705,293 @@ DropCastById(Oid castOid)
heap_close(relation, RowExclusiveLock);
}
static void
check_transform_function(Form_pg_proc procstruct)
{
if (procstruct->provolatile == PROVOLATILE_VOLATILE)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("transform function must not be volatile")));
if (procstruct->proisagg)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("transform function must not be an aggregate function")));
if (procstruct->proiswindow)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("transform function must not be a window function")));
if (procstruct->proretset)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("transform function must not return a set")));
if (procstruct->pronargs != 1)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("transform function must take one argument")));
if (procstruct->proargtypes.values[0] != INTERNALOID)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("first argument of transform function must be type \"internal\"")));
}
/*
* CREATE TRANSFORM
*/
Oid
CreateTransform(CreateTransformStmt *stmt)
{
Oid typeid;
char typtype;
Oid langid;
Oid fromsqlfuncid;
Oid tosqlfuncid;
AclResult aclresult;
Form_pg_proc procstruct;
Datum values[Natts_pg_transform];
bool nulls[Natts_pg_transform];
bool replaces[Natts_pg_transform];
Oid transformid;
HeapTuple tuple;
HeapTuple newtuple;
Relation relation;
ObjectAddress myself,
referenced;
bool is_replace;
/*
* Get the type
*/
typeid = typenameTypeId(NULL, stmt->type_name);
typtype = get_typtype(typeid);
if (typtype == TYPTYPE_PSEUDO)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("data type %s is a pseudo-type",
TypeNameToString(stmt->type_name))));
if (typtype == TYPTYPE_DOMAIN)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("data type %s is a domain",
TypeNameToString(stmt->type_name))));
if (!pg_type_ownercheck(typeid, GetUserId()))
aclcheck_error_type(ACLCHECK_NOT_OWNER, typeid);
aclresult = pg_type_aclcheck(typeid, GetUserId(), ACL_USAGE);
if (aclresult != ACLCHECK_OK)
aclcheck_error_type(aclresult, typeid);
/*
* Get the language
*/
langid = get_language_oid(stmt->lang, false);
aclresult = pg_language_aclcheck(langid, GetUserId(), ACL_USAGE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_LANGUAGE, stmt->lang);
/*
* Get the functions
*/
if (stmt->fromsql)
{
fromsqlfuncid = LookupFuncNameTypeNames(stmt->fromsql->funcname, stmt->fromsql->funcargs, false);
if (!pg_proc_ownercheck(fromsqlfuncid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_PROC, NameListToString(stmt->fromsql->funcname));
aclresult = pg_proc_aclcheck(fromsqlfuncid, GetUserId(), ACL_EXECUTE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_PROC, NameListToString(stmt->fromsql->funcname));
tuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(fromsqlfuncid));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for function %u", fromsqlfuncid);
procstruct = (Form_pg_proc) GETSTRUCT(tuple);
if (procstruct->prorettype != INTERNALOID)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("return data type of FROM SQL function must be \"internal\"")));
check_transform_function(procstruct);
ReleaseSysCache(tuple);
}
else
fromsqlfuncid = InvalidOid;
if (stmt->tosql)
{
tosqlfuncid = LookupFuncNameTypeNames(stmt->tosql->funcname, stmt->tosql->funcargs, false);
if (!pg_proc_ownercheck(tosqlfuncid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_PROC, NameListToString(stmt->tosql->funcname));
aclresult = pg_proc_aclcheck(tosqlfuncid, GetUserId(), ACL_EXECUTE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_PROC, NameListToString(stmt->tosql->funcname));
tuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(tosqlfuncid));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for function %u", tosqlfuncid);
procstruct = (Form_pg_proc) GETSTRUCT(tuple);
if (procstruct->prorettype != typeid)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("return data type of TO SQL function must be the transform data type")));
check_transform_function(procstruct);
ReleaseSysCache(tuple);
}
else
tosqlfuncid = InvalidOid;
/*
* Ready to go
*/
values[Anum_pg_transform_trftype - 1] = ObjectIdGetDatum(typeid);
values[Anum_pg_transform_trflang - 1] = ObjectIdGetDatum(langid);
values[Anum_pg_transform_trffromsql - 1] = ObjectIdGetDatum(fromsqlfuncid);
values[Anum_pg_transform_trftosql - 1] = ObjectIdGetDatum(tosqlfuncid);
MemSet(nulls, false, sizeof(nulls));
relation = heap_open(TransformRelationId, RowExclusiveLock);
tuple = SearchSysCache2(TRFTYPELANG,
ObjectIdGetDatum(typeid),
ObjectIdGetDatum(langid));
if (HeapTupleIsValid(tuple))
{
if (!stmt->replace)
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("transform for type %s language %s already exists",
format_type_be(typeid),
stmt->lang)));
MemSet(replaces, false, sizeof(replaces));
replaces[Anum_pg_transform_trffromsql - 1] = true;
replaces[Anum_pg_transform_trftosql - 1] = true;
newtuple = heap_modify_tuple(tuple, RelationGetDescr(relation), values, nulls, replaces);
simple_heap_update(relation, &newtuple->t_self, newtuple);
transformid = HeapTupleGetOid(tuple);
ReleaseSysCache(tuple);
is_replace = true;
}
else
{
newtuple = heap_form_tuple(RelationGetDescr(relation), values, nulls);
transformid = simple_heap_insert(relation, newtuple);
is_replace = false;
}
CatalogUpdateIndexes(relation, newtuple);
if (is_replace)
deleteDependencyRecordsFor(TransformRelationId, transformid, true);
/* make dependency entries */
myself.classId = TransformRelationId;
myself.objectId = transformid;
myself.objectSubId = 0;
/* dependency on language */
referenced.classId = LanguageRelationId;
referenced.objectId = langid;
referenced.objectSubId = 0;
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
/* dependency on type */
referenced.classId = TypeRelationId;
referenced.objectId = typeid;
referenced.objectSubId = 0;
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
/* dependencies on functions */
if (OidIsValid(fromsqlfuncid))
{
referenced.classId = ProcedureRelationId;
referenced.objectId = fromsqlfuncid;
referenced.objectSubId = 0;
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
}
if (OidIsValid(tosqlfuncid))
{
referenced.classId = ProcedureRelationId;
referenced.objectId = tosqlfuncid;
referenced.objectSubId = 0;
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
}
/* dependency on extension */
recordDependencyOnCurrentExtension(&myself, is_replace);
/* Post creation hook for new transform */
InvokeObjectPostCreateHook(TransformRelationId, transformid, 0);
heap_freetuple(newtuple);
heap_close(relation, RowExclusiveLock);
return transformid;
}
/*
* get_transform_oid - given type OID and language OID, look up a transform OID
*
* If missing_ok is false, throw an error if the transform is not found. If
* true, just return InvalidOid.
*/
Oid
get_transform_oid(Oid type_id, Oid lang_id, bool missing_ok)
{
Oid oid;
oid = GetSysCacheOid2(TRFTYPELANG,
ObjectIdGetDatum(type_id),
ObjectIdGetDatum(lang_id));
if (!OidIsValid(oid) && !missing_ok)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("transform for type %s language \"%s\" does not exist",
format_type_be(type_id),
get_language_name(lang_id, false))));
return oid;
}
void
DropTransformById(Oid transformOid)
{
Relation relation;
ScanKeyData scankey;
SysScanDesc scan;
HeapTuple tuple;
relation = heap_open(TransformRelationId, RowExclusiveLock);
ScanKeyInit(&scankey,
ObjectIdAttributeNumber,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(transformOid));
scan = systable_beginscan(relation, TransformOidIndexId, true,
NULL, 1, &scankey);
tuple = systable_getnext(scan);
if (!HeapTupleIsValid(tuple))
elog(ERROR, "could not find tuple for transform %u", transformOid);
simple_heap_delete(relation, &tuple->t_self);
systable_endscan(scan);
heap_close(relation, RowExclusiveLock);
}
/*
* Subroutine for ALTER FUNCTION/AGGREGATE SET SCHEMA/RENAME
*