1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-13 07:41:39 +03:00

Attached is an update to contrib/tablefunc. It introduces a new

function, connectby(), which can serve as a reference implementation for

the changes made in the last few days -- namely the ability of a
function to return an entire tuplestore, and the ability of a function
to make use of the query provided "expected" tuple description.

Description:

   connectby(text relname, text keyid_fld, text parent_keyid_fld,
     text start_with, int max_depth [, text branch_delim])
   - returns keyid, parent_keyid, level, and an optional branch string
   - requires anonymous composite type syntax in the FROM clause. See
     the instructions in the documentation below.

Joe Conway
This commit is contained in:
Bruce Momjian
2002-09-02 05:44:05 +00:00
parent 9fd842c4b2
commit 6aa4482f2f
5 changed files with 602 additions and 12 deletions

View File

@ -32,16 +32,42 @@
#include "fmgr.h"
#include "funcapi.h"
#include "executor/spi.h"
#include "executor/spi.h"
#include "miscadmin.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "tablefunc.h"
static bool compatTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
static void validateConnectbyTupleDesc(TupleDesc tupdesc, bool show_branch);
static bool compatCrosstabTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
static bool compatConnectbyTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
static void get_normal_pair(float8 *x1, float8 *x2);
static TupleDesc make_crosstab_tupledesc(TupleDesc spi_tupdesc, int num_catagories);
static TupleDesc make_crosstab_tupledesc(TupleDesc spi_tupdesc,
int num_catagories);
static Tuplestorestate *connectby(char *relname,
char *key_fld,
char *parent_key_fld,
char *branch_delim,
char *start_with,
int max_depth,
bool show_branch,
MemoryContext per_query_ctx,
AttInMetadata *attinmeta);
static Tuplestorestate *build_tuplestore_recursively(char *key_fld,
char *parent_key_fld,
char *relname,
char *branch_delim,
char *start_with,
char *branch,
int level,
int max_depth,
bool show_branch,
MemoryContext per_query_ctx,
AttInMetadata *attinmeta,
Tuplestorestate *tupstore);
static char *quote_ident_cstr(char *rawstr);
typedef struct
{
@ -68,6 +94,9 @@ typedef struct
} \
} while (0)
/* sign, 10 digits, '\0' */
#define INT32_STRLEN 12
/*
* normal_rand - return requested number of random values
* with a Gaussian (Normal) distribution.
@ -358,7 +387,7 @@ crosstab(PG_FUNCTION_ARGS)
* from ret_relname, at least based on number and type of
* attributes
*/
if (!compatTupleDescs(tupdesc, spi_tupdesc))
if (!compatCrosstabTupleDescs(tupdesc, spi_tupdesc))
elog(ERROR, "crosstab: return and sql tuple descriptions are"
" incompatible");
@ -558,11 +587,401 @@ crosstab(PG_FUNCTION_ARGS)
}
}
/*
* connectby_text - produce a result set from a hierarchical (parent/child)
* table.
*
* e.g. given table foo:
*
* keyid parent_keyid
* ------+--------------
* row1 NULL
* row2 row1
* row3 row1
* row4 row2
* row5 row2
* row6 row4
* row7 row3
* row8 row6
* row9 row5
*
*
* connectby(text relname, text keyid_fld, text parent_keyid_fld,
* text start_with, int max_depth [, text branch_delim])
* connectby('foo', 'keyid', 'parent_keyid', 'row2', 0, '~') returns:
*
* keyid parent_id level branch
* ------+-----------+--------+-----------------------
* row2 NULL 0 row2
* row4 row2 1 row2~row4
* row6 row4 2 row2~row4~row6
* row8 row6 3 row2~row4~row6~row8
* row5 row2 1 row2~row5
* row9 row5 2 row2~row5~row9
*
*/
PG_FUNCTION_INFO_V1(connectby_text);
#define CONNECTBY_NCOLS 4
#define CONNECTBY_NCOLS_NOBRANCH 3
Datum
connectby_text(PG_FUNCTION_ARGS)
{
char *relname = GET_STR(PG_GETARG_TEXT_P(0));
char *key_fld = GET_STR(PG_GETARG_TEXT_P(1));
char *parent_key_fld = GET_STR(PG_GETARG_TEXT_P(2));
char *start_with = GET_STR(PG_GETARG_TEXT_P(3));
int max_depth = PG_GETARG_INT32(4);
char *branch_delim = NULL;
bool show_branch = false;
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
AttInMetadata *attinmeta;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
if (fcinfo->nargs == 6)
{
branch_delim = GET_STR(PG_GETARG_TEXT_P(5));
show_branch = true;
}
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
oldcontext = MemoryContextSwitchTo(per_query_ctx);
/* get the requested return tuple description */
tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
/* does it meet our needs */
validateConnectbyTupleDesc(tupdesc, show_branch);
/* OK, use it then */
attinmeta = TupleDescGetAttInMetadata(tupdesc);
/* check to see if caller supports us returning a tuplestore */
if (!rsinfo->allowedModes & SFRM_Materialize)
elog(ERROR, "connectby requires Materialize mode, but it is not "
"allowed in this context");
/* OK, go to work */
rsinfo->returnMode = SFRM_Materialize;
rsinfo->setResult = connectby(relname,
key_fld,
parent_key_fld,
branch_delim,
start_with,
max_depth,
show_branch,
per_query_ctx,
attinmeta);
rsinfo->setDesc = tupdesc;
MemoryContextSwitchTo(oldcontext);
/*
* SFRM_Materialize mode expects us to return a NULL Datum.
* The actual tuples are in our tuplestore and passed back through
* rsinfo->setResult. rsinfo->setDesc is set to the tuple description
* that we actually used to build our tuples with, so the caller can
* verify we did what it was expecting.
*/
return (Datum) 0;
}
/*
* connectby - does the real work for connectby_text()
*/
static Tuplestorestate *
connectby(char *relname,
char *key_fld,
char *parent_key_fld,
char *branch_delim,
char *start_with,
int max_depth,
bool show_branch,
MemoryContext per_query_ctx,
AttInMetadata *attinmeta)
{
Tuplestorestate *tupstore = NULL;
int ret;
MemoryContext oldcontext;
/* Connect to SPI manager */
if ((ret = SPI_connect()) < 0)
elog(ERROR, "connectby: SPI_connect returned %d", ret);
/* switch to longer term context to create the tuple store */
oldcontext = MemoryContextSwitchTo(per_query_ctx);
/* initialize our tuplestore */
tupstore = tuplestore_begin_heap(true, SortMem);
MemoryContextSwitchTo(oldcontext);
/* now go get the whole tree */
tupstore = build_tuplestore_recursively(key_fld,
parent_key_fld,
relname,
branch_delim,
start_with,
start_with, /* current_branch */
0, /* initial level is 0 */
max_depth,
show_branch,
per_query_ctx,
attinmeta,
tupstore);
SPI_finish();
oldcontext = MemoryContextSwitchTo(per_query_ctx);
tuplestore_donestoring(tupstore);
MemoryContextSwitchTo(oldcontext);
return tupstore;
}
static Tuplestorestate *
build_tuplestore_recursively(char *key_fld,
char *parent_key_fld,
char *relname,
char *branch_delim,
char *start_with,
char *branch,
int level,
int max_depth,
bool show_branch,
MemoryContext per_query_ctx,
AttInMetadata *attinmeta,
Tuplestorestate *tupstore)
{
TupleDesc tupdesc = attinmeta->tupdesc;
MemoryContext oldcontext;
StringInfo sql = makeStringInfo();
int ret;
int proc;
if(max_depth > 0 && level > max_depth)
return tupstore;
/* Build initial sql statement */
appendStringInfo(sql, "SELECT %s, %s FROM %s WHERE %s = '%s' AND %s IS NOT NULL",
quote_ident_cstr(key_fld),
quote_ident_cstr(parent_key_fld),
quote_ident_cstr(relname),
quote_ident_cstr(parent_key_fld),
start_with,
quote_ident_cstr(key_fld));
/* Retrieve the desired rows */
ret = SPI_exec(sql->data, 0);
proc = SPI_processed;
/* Check for qualifying tuples */
if ((ret == SPI_OK_SELECT) && (proc > 0))
{
HeapTuple tuple;
HeapTuple spi_tuple;
SPITupleTable *tuptable = SPI_tuptable;
TupleDesc spi_tupdesc = tuptable->tupdesc;
int i;
char *current_key;
char *current_key_parent;
char current_level[INT32_STRLEN];
char *current_branch;
char **values;
if (show_branch)
values = (char **) palloc(CONNECTBY_NCOLS * sizeof(char *));
else
values = (char **) palloc(CONNECTBY_NCOLS_NOBRANCH * sizeof(char *));
/* First time through, do a little setup */
if (level == 0)
{
/*
* Check that return tupdesc is compatible with the one we got
* from the query, but only at level 0 -- no need to check more
* than once
*/
if (!compatConnectbyTupleDescs(tupdesc, spi_tupdesc))
elog(ERROR, "connectby: return and sql tuple descriptions are "
"incompatible");
/* root value is the one we initially start with */
values[0] = start_with;
/* root value has no parent */
values[1] = NULL;
/* root level is 0 */
sprintf(current_level, "%d", level);
values[2] = current_level;
/* root branch is just starting root value */
if (show_branch)
values[3] = start_with;
/* construct the tuple */
tuple = BuildTupleFromCStrings(attinmeta, values);
/* switch to long lived context while storing the tuple */
oldcontext = MemoryContextSwitchTo(per_query_ctx);
/* now store it */
tuplestore_puttuple(tupstore, tuple);
/* now reset the context */
MemoryContextSwitchTo(oldcontext);
/* increment level */
level++;
}
for (i = 0; i < proc; i++)
{
StringInfo branchstr = NULL;
/* start a new branch */
if (show_branch)
{
branchstr = makeStringInfo();
appendStringInfo(branchstr, "%s", branch);
}
/* get the next sql result tuple */
spi_tuple = tuptable->vals[i];
/* get the current key and parent */
current_key = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
current_key_parent = pstrdup(SPI_getvalue(spi_tuple, spi_tupdesc, 2));
/* get the current level */
sprintf(current_level, "%d", level);
/* extend the branch */
if (show_branch)
{
appendStringInfo(branchstr, "%s%s", branch_delim, current_key);
current_branch = branchstr->data;
}
else
current_branch = NULL;
/* build a tuple */
values[0] = pstrdup(current_key);
values[1] = current_key_parent;
values[2] = current_level;
if (show_branch)
values[3] = current_branch;
tuple = BuildTupleFromCStrings(attinmeta, values);
xpfree(current_key);
xpfree(current_key_parent);
/* switch to long lived context while storing the tuple */
oldcontext = MemoryContextSwitchTo(per_query_ctx);
/* store the tuple for later use */
tuplestore_puttuple(tupstore, tuple);
/* now reset the context */
MemoryContextSwitchTo(oldcontext);
heap_freetuple(tuple);
/* recurse using current_key_parent as the new start_with */
tupstore = build_tuplestore_recursively(key_fld,
parent_key_fld,
relname,
branch_delim,
values[0],
current_branch,
level + 1,
max_depth,
show_branch,
per_query_ctx,
attinmeta,
tupstore);
}
}
return tupstore;
}
/*
* Check expected (query runtime) tupdesc suitable for Connectby
*/
static void
validateConnectbyTupleDesc(TupleDesc tupdesc, bool show_branch)
{
/* are there the correct number of columns */
if (show_branch)
{
if (tupdesc->natts != CONNECTBY_NCOLS)
elog(ERROR, "Query-specified return tuple not valid for Connectby: "
"wrong number of columns");
}
else
{
if (tupdesc->natts != CONNECTBY_NCOLS_NOBRANCH)
elog(ERROR, "Query-specified return tuple not valid for Connectby: "
"wrong number of columns");
}
/* check that the types of the first two columns match */
if (tupdesc->attrs[0]->atttypid != tupdesc->attrs[1]->atttypid)
elog(ERROR, "Query-specified return tuple not valid for Connectby: "
"first two columns must be the same type");
/* check that the type of the third column is INT4 */
if (tupdesc->attrs[2]->atttypid != INT4OID)
elog(ERROR, "Query-specified return tuple not valid for Connectby: "
"third column must be type %s", format_type_be(INT4OID));
/* check that the type of the forth column is TEXT if applicable */
if (show_branch && tupdesc->attrs[3]->atttypid != TEXTOID)
elog(ERROR, "Query-specified return tuple not valid for Connectby: "
"third column must be type %s", format_type_be(TEXTOID));
/* OK, the tupdesc is valid for our purposes */
}
/*
* Check if spi sql tupdesc and return tupdesc are compatible
*/
static bool
compatConnectbyTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
{
Oid ret_atttypid;
Oid sql_atttypid;
/* check the key_fld types match */
ret_atttypid = ret_tupdesc->attrs[0]->atttypid;
sql_atttypid = sql_tupdesc->attrs[0]->atttypid;
if (ret_atttypid != sql_atttypid)
elog(ERROR, "compatConnectbyTupleDescs: SQL key field datatype does "
"not match return key field datatype");
/* check the parent_key_fld types match */
ret_atttypid = ret_tupdesc->attrs[1]->atttypid;
sql_atttypid = sql_tupdesc->attrs[1]->atttypid;
if (ret_atttypid != sql_atttypid)
elog(ERROR, "compatConnectbyTupleDescs: SQL parent key field datatype "
"does not match return parent key field datatype");
/* OK, the two tupdescs are compatible for our purposes */
return true;
}
/*
* Check if two tupdescs match in type of attributes
*/
static bool
compatTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
compatCrosstabTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
{
int i;
Form_pg_attribute ret_attr;
@ -574,7 +993,7 @@ compatTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
ret_atttypid = ret_tupdesc->attrs[0]->atttypid;
sql_atttypid = sql_tupdesc->attrs[0]->atttypid;
if (ret_atttypid != sql_atttypid)
elog(ERROR, "compatTupleDescs: SQL rowid datatype does not match"
elog(ERROR, "compatCrosstabTupleDescs: SQL rowid datatype does not match"
" return rowid datatype");
/*
@ -643,3 +1062,20 @@ make_crosstab_tupledesc(TupleDesc spi_tupdesc, int num_catagories)
return tupdesc;
}
/*
* Return a properly quoted identifier.
* Uses quote_ident in quote.c
*/
static char *
quote_ident_cstr(char *rawstr)
{
text *rawstr_text;
text *result_text;
char *result;
rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
result_text = DatumGetTextP(DirectFunctionCall1(quote_ident, PointerGetDatum(rawstr_text)));
result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
return result;
}