1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-29 10:41:53 +03:00

Enhancement of SPI to get access to portals

- New functions to create a portal using a prepared/saved
  SPI plan or lookup an existing portal by name.
- Functions to fetch/move from/in portals. Results are placed
  in the usual SPI_processed and SPI_tuptable, so the entire
  set of utility functions can be used to gain attribute access.
- Prepared/saved SPI plans now use their own memory context
  and SPI_freeplan(plan) can remove them.
- Tuple result sets (SPI_tuptable) now uses it's own memory
  context and can be free'd by SPI_freetuptable(tuptab).

Enhancement of PL/pgSQL

- Uses generic named portals internally in FOR ... SELECT
  loops to avoid running out of memory on huge result sets.
- Support for CURSOR and REFCURSOR syntax using the new SPI
  functionality. Cursors used internally only need no explicit
  transaction block. Refcursor variables can be used inside
  of explicit transaction block to pass cursors between main
  application and functions.


Jan
This commit is contained in:
Jan Wieck
2001-05-21 14:22:19 +00:00
parent be03eb25f3
commit d27f363e3f
13 changed files with 1521 additions and 121 deletions

View File

@ -3,12 +3,13 @@
* spi.c
* Server Programming Interface
*
* $Id: spi.c,v 1.53 2001/03/22 03:59:29 momjian Exp $
* $Id: spi.c,v 1.54 2001/05/21 14:22:17 wieck Exp $
*
*-------------------------------------------------------------------------
*/
#include "executor/spi_priv.h"
#include "access/printtup.h"
#include "commands/command.h"
uint32 SPI_processed = 0;
Oid SPI_lastoid = InvalidOid;
@ -26,6 +27,9 @@ static int _SPI_pquery(QueryDesc *queryDesc, EState *state, int tcount);
static int _SPI_execute_plan(_SPI_plan *plan,
Datum *Values, char *Nulls, int tcount);
static void _SPI_cursor_operation(Portal portal, bool forward, int count,
CommandDest dest);
static _SPI_plan *_SPI_copy_plan(_SPI_plan *plan, int location);
static int _SPI_begin_call(bool execmem);
@ -272,6 +276,18 @@ SPI_saveplan(void *plan)
}
int
SPI_freeplan(void *plan)
{
_SPI_plan *spiplan = (_SPI_plan *)plan;
if (plan == NULL)
return SPI_ERROR_ARGUMENT;
MemoryContextDelete(spiplan->plancxt);
return 0;
}
HeapTuple
SPI_copytuple(HeapTuple tuple)
{
@ -555,6 +571,181 @@ SPI_freetuple(HeapTuple tuple)
heap_freetuple(tuple);
}
void
SPI_freetuptable(SPITupleTable *tuptable)
{
if (tuptable != NULL)
MemoryContextDelete(tuptable->tuptabcxt);
}
/*
* SPI_cursor_open()
*
* Open a prepared SPI plan as a portal
*/
Portal
SPI_cursor_open(char *name, void *plan, Datum *Values, char *Nulls)
{
static int unnamed_portal_count = 0;
_SPI_plan *spiplan = (_SPI_plan *)plan;
List *qtlist = spiplan->qtlist;
List *ptlist = spiplan->ptlist;
Query *queryTree;
Plan *planTree;
QueryDesc *queryDesc;
EState *eState;
TupleDesc attinfo;
MemoryContext oldcontext;
Portal portal;
char portalname[64];
int k;
/* Ensure that the plan contains only one regular SELECT query */
if (length(ptlist) != 1)
elog(ERROR, "cannot open multi-query plan as cursor");
queryTree = (Query *)lfirst(qtlist);
planTree = (Plan *)lfirst(ptlist);
if (queryTree->commandType != CMD_SELECT)
elog(ERROR, "plan in SPI_cursor_open() is not a SELECT");
if (queryTree->isPortal)
elog(ERROR, "plan in SPI_cursor_open() must NOT be a DECLARE already");
else if (queryTree->into != NULL)
elog(ERROR, "plan in SPI_cursor_open() must NOT be a SELECT INTO");
/* Reset SPI result */
SPI_processed = 0;
SPI_tuptable = NULL;
_SPI_current->processed = 0;
_SPI_current->tuptable = NULL;
/* Make up a portal name if none given */
if (name == NULL)
{
for (;;)
{
unnamed_portal_count++;
if (unnamed_portal_count < 0)
unnamed_portal_count = 0;
sprintf(portalname, "<unnamed cursor %d>", unnamed_portal_count);
if (GetPortalByName(portalname) == NULL)
break;
}
name = portalname;
}
/* Ensure the portal doesn't exist already */
portal = GetPortalByName(name);
if (portal != NULL)
elog(ERROR, "cursor \"%s\" already in use", name);
/* Create the portal */
portal = CreatePortal(name);
if (portal == NULL)
elog(ERROR, "failed to create portal \"%s\"", name);
/* Switch to portals memory and copy the parsetree and plan to there */
oldcontext = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
queryTree = copyObject(queryTree);
planTree = copyObject(planTree);
/* Modify the parsetree to be a cursor */
queryTree->isPortal = true;
queryTree->into = pstrdup(name);
queryTree->isBinary = false;
/* Create the QueryDesc object and the executor state */
queryDesc = CreateQueryDesc(queryTree, planTree, SPI);
eState = CreateExecutorState();
/* If the plan has parameters, put them into the executor state */
if (spiplan->nargs > 0)
{
ParamListInfo paramLI = (ParamListInfo) palloc((spiplan->nargs + 1) *
sizeof(ParamListInfoData));
eState->es_param_list_info = paramLI;
for (k = 0; k < spiplan->nargs; paramLI++, k++)
{
paramLI->kind = PARAM_NUM;
paramLI->id = k + 1;
paramLI->isnull = (Nulls && Nulls[k] == 'n');
paramLI->value = Values[k];
}
paramLI->kind = PARAM_INVALID;
}
else
eState->es_param_list_info = NULL;
/* Start the executor */
attinfo = ExecutorStart(queryDesc, eState);
/* Put all the objects into the portal */
PortalSetQuery(portal, queryDesc, attinfo, eState, PortalCleanup);
/* Switch back to the callers memory context */
MemoryContextSwitchTo(oldcontext);
/* Return the created portal */
return portal;
}
/*
* SPI_cursor_find()
*
* Find the portal of an existing open cursor
*/
Portal
SPI_cursor_find(char *name)
{
return GetPortalByName(name);
}
/*
* SPI_cursor_fetch()
*
* Fetch rows in a cursor
*/
void
SPI_cursor_fetch(Portal portal, bool forward, int count)
{
_SPI_cursor_operation(portal, forward, count, SPI);
}
/*
* SPI_cursor_move()
*
* Move in a cursor
*/
void
SPI_cursor_move(Portal portal, bool forward, int count)
{
_SPI_cursor_operation(portal, forward, count, None);
}
/*
* SPI_cursor_close()
*
* Close a cursor
*/
void
SPI_cursor_close(Portal portal)
{
Portal my_portal = portal;
if (!PortalIsValid(my_portal))
elog(ERROR, "invalid portal in SPI cursor operation");
PortalDrop(&my_portal);
}
/* =================== private functions =================== */
/*
@ -568,6 +759,7 @@ spi_printtup(HeapTuple tuple, TupleDesc tupdesc, DestReceiver *self)
{
SPITupleTable *tuptable;
MemoryContext oldcxt;
MemoryContext tuptabcxt;
/*
* When called by Executor _SPI_curid expected to be equal to
@ -583,18 +775,31 @@ spi_printtup(HeapTuple tuple, TupleDesc tupdesc, DestReceiver *self)
tuptable = _SPI_current->tuptable;
if (tuptable == NULL)
{
tuptabcxt = AllocSetContextCreate(CurrentMemoryContext,
"SPI TupTable",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
MemoryContextSwitchTo(tuptabcxt);
_SPI_current->tuptable = tuptable = (SPITupleTable *)
palloc(sizeof(SPITupleTable));
tuptable->tuptabcxt = tuptabcxt;
tuptable->alloced = tuptable->free = 128;
tuptable->vals = (HeapTuple *) palloc(tuptable->alloced * sizeof(HeapTuple));
tuptable->tupdesc = CreateTupleDescCopy(tupdesc);
}
else if (tuptable->free == 0)
else
{
tuptable->free = 256;
tuptable->alloced += tuptable->free;
tuptable->vals = (HeapTuple *) repalloc(tuptable->vals,
tuptable->alloced * sizeof(HeapTuple));
MemoryContextSwitchTo(tuptable->tuptabcxt);
if (tuptable->free == 0)
{
tuptable->free = 256;
tuptable->alloced += tuptable->free;
tuptable->vals = (HeapTuple *) repalloc(tuptable->vals,
tuptable->alloced * sizeof(HeapTuple));
}
}
tuptable->vals[tuptable->alloced - tuptable->free] = heap_copytuple(tuple);
@ -876,6 +1081,86 @@ _SPI_pquery(QueryDesc *queryDesc, EState *state, int tcount)
}
/*
* _SPI_cursor_operation()
*
* Do a FETCH or MOVE in a cursor
*/
static void
_SPI_cursor_operation(Portal portal, bool forward, int count,
CommandDest dest)
{
QueryDesc *querydesc;
EState *estate;
MemoryContext oldcontext;
CommandDest olddest;
/* Check that the portal is valid */
if (!PortalIsValid(portal))
elog(ERROR, "invalid portal in SPI cursor operation");
/* Push the SPI stack */
_SPI_begin_call(true);
/* Reset the SPI result */
SPI_processed = 0;
SPI_tuptable = NULL;
_SPI_current->processed = 0;
_SPI_current->tuptable = NULL;
/* Switch to the portals memory context */
oldcontext = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
querydesc = PortalGetQueryDesc(portal);
estate = PortalGetState(portal);
/* Save the queries command destination and set it to SPI (for fetch) */
/* or None (for move) */
olddest = querydesc->dest;
querydesc->dest = dest;
/* Run the executor like PerformPortalFetch and remember states */
if (forward)
{
if (!portal->atEnd)
{
ExecutorRun(querydesc, estate, EXEC_FOR, (long)count);
_SPI_current->processed = estate->es_processed;
if (estate->es_processed > 0)
portal->atStart = false;
if (count <= 0 || (int) estate->es_processed < count)
portal->atEnd = true;
}
}
else
{
if (!portal->atStart)
{
ExecutorRun(querydesc, estate, EXEC_BACK, (long) count);
_SPI_current->processed = estate->es_processed;
if (estate->es_processed > 0)
portal->atEnd = false;
if (count <= 0 || estate->es_processed < count)
portal->atStart = true;
}
}
/* Restore the old command destination and switch back to callers */
/* memory context */
querydesc->dest = olddest;
MemoryContextSwitchTo(oldcontext);
if (dest == SPI && _SPI_checktuples())
elog(FATAL, "SPI_fetch: # of processed tuples check failed");
/* Put the result into place for access by caller */
SPI_processed = _SPI_current->processed;
SPI_tuptable = _SPI_current->tuptable;
/* Pop the SPI stack */
_SPI_end_call(true);
}
static MemoryContext
_SPI_execmem()
{
@ -956,14 +1241,33 @@ static _SPI_plan *
_SPI_copy_plan(_SPI_plan *plan, int location)
{
_SPI_plan *newplan;
MemoryContext oldcxt = NULL;
MemoryContext oldcxt;
MemoryContext plancxt;
MemoryContext parentcxt = CurrentMemoryContext;
/* Determine correct parent for the plans memory context */
if (location == _SPI_CPLAN_PROCXT)
parentcxt = _SPI_current->procCxt;
/*
oldcxt = MemoryContextSwitchTo(_SPI_current->procCxt);
*/
else if (location == _SPI_CPLAN_TOPCXT)
parentcxt = TopMemoryContext;
/*
oldcxt = MemoryContextSwitchTo(TopMemoryContext);
*/
/* Create a memory context for the plan */
plancxt = AllocSetContextCreate(parentcxt,
"SPI Plan",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
oldcxt = MemoryContextSwitchTo(plancxt);
/* Copy the SPI plan into it's own context */
newplan = (_SPI_plan *) palloc(sizeof(_SPI_plan));
newplan->plancxt = plancxt;
newplan->qtlist = (List *) copyObject(plan->qtlist);
newplan->ptlist = (List *) copyObject(plan->ptlist);
newplan->nargs = plan->nargs;
@ -975,8 +1279,7 @@ _SPI_copy_plan(_SPI_plan *plan, int location)
else
newplan->argtypes = NULL;
if (oldcxt != NULL)
MemoryContextSwitchTo(oldcxt);
MemoryContextSwitchTo(oldcxt);
return newplan;
}