mirror of
https://github.com/postgres/postgres.git
synced 2025-07-28 23:42:10 +03:00
Restructure command destination handling so that we pass around
DestReceiver pointers instead of just CommandDest values. The DestReceiver is made at the point where the destination is selected, rather than deep inside the executor. This cleans up the original kluge implementation of tstoreReceiver.c, and makes it easy to support retrieving results from utility statements inside portals. Thus, you can now do fun things like Bind and Execute a FETCH or EXPLAIN command, and it'll all work as expected (e.g., you can Describe the portal, or use Execute's count parameter to suspend the output partway through). Implementation involves stuffing the utility command's output into a Tuplestore, which would be kind of annoying for huge output sets, but should be quite acceptable for typical uses of utility commands.
This commit is contained in:
@ -8,13 +8,14 @@
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* $Header: /cvsroot/pgsql/src/backend/tcop/pquery.c,v 1.61 2003/05/06 00:20:33 tgl Exp $
|
||||
* $Header: /cvsroot/pgsql/src/backend/tcop/pquery.c,v 1.62 2003/05/06 20:26:27 tgl Exp $
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#include "executor/executor.h"
|
||||
#include "executor/tstoreReceiver.h"
|
||||
#include "miscadmin.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "tcop/pquery.h"
|
||||
@ -24,18 +25,18 @@
|
||||
|
||||
|
||||
static uint32 RunFromStore(Portal portal, ScanDirection direction, long count,
|
||||
CommandDest dest);
|
||||
DestReceiver *dest);
|
||||
static long PortalRunSelect(Portal portal, bool forward, long count,
|
||||
CommandDest dest);
|
||||
DestReceiver *dest);
|
||||
static void PortalRunUtility(Portal portal, Query *query,
|
||||
CommandDest dest, char *completionTag);
|
||||
DestReceiver *dest, char *completionTag);
|
||||
static void PortalRunMulti(Portal portal,
|
||||
CommandDest dest, CommandDest altdest,
|
||||
DestReceiver *dest, DestReceiver *altdest,
|
||||
char *completionTag);
|
||||
static long DoPortalRunFetch(Portal portal,
|
||||
FetchDirection fdirection,
|
||||
long count,
|
||||
CommandDest dest);
|
||||
DestReceiver *dest);
|
||||
static void DoPortalRewind(Portal portal);
|
||||
|
||||
|
||||
@ -45,7 +46,7 @@ static void DoPortalRewind(Portal portal);
|
||||
QueryDesc *
|
||||
CreateQueryDesc(Query *parsetree,
|
||||
Plan *plantree,
|
||||
CommandDest dest,
|
||||
DestReceiver *dest,
|
||||
const char *portalName,
|
||||
ParamListInfo params,
|
||||
bool doInstrument)
|
||||
@ -103,7 +104,7 @@ ProcessQuery(Query *parsetree,
|
||||
Plan *plan,
|
||||
ParamListInfo params,
|
||||
const char *portalName,
|
||||
CommandDest dest,
|
||||
DestReceiver *dest,
|
||||
char *completionTag)
|
||||
{
|
||||
int operation = parsetree->commandType;
|
||||
@ -123,7 +124,7 @@ ProcessQuery(Query *parsetree,
|
||||
* special-cases this case. (Perhaps would be cleaner to have
|
||||
* an additional destination type?)
|
||||
*/
|
||||
dest = None;
|
||||
dest = None_Receiver;
|
||||
}
|
||||
}
|
||||
|
||||
@ -185,6 +186,39 @@ ProcessQuery(Query *parsetree,
|
||||
FreeQueryDesc(queryDesc);
|
||||
}
|
||||
|
||||
/*
|
||||
* ChoosePortalStrategy
|
||||
* Select portal execution strategy given the intended query list.
|
||||
*
|
||||
* See the comments in portal.h.
|
||||
*/
|
||||
PortalStrategy
|
||||
ChoosePortalStrategy(List *parseTrees)
|
||||
{
|
||||
PortalStrategy strategy;
|
||||
|
||||
strategy = PORTAL_MULTI_QUERY; /* default assumption */
|
||||
|
||||
if (length(parseTrees) == 1)
|
||||
{
|
||||
Query *query = (Query *) lfirst(parseTrees);
|
||||
|
||||
if (query->commandType == CMD_SELECT &&
|
||||
query->canSetTag &&
|
||||
query->into == NULL)
|
||||
{
|
||||
strategy = PORTAL_ONE_SELECT;
|
||||
}
|
||||
else if (query->commandType == CMD_UTILITY &&
|
||||
query->canSetTag &&
|
||||
query->utilityStmt != NULL)
|
||||
{
|
||||
if (UtilityReturnsTuples(query->utilityStmt))
|
||||
strategy = PORTAL_UTIL_SELECT;
|
||||
}
|
||||
}
|
||||
return strategy;
|
||||
}
|
||||
|
||||
/*
|
||||
* PortalStart
|
||||
@ -202,7 +236,6 @@ void
|
||||
PortalStart(Portal portal, ParamListInfo params)
|
||||
{
|
||||
MemoryContext oldContext;
|
||||
Query *query = NULL;
|
||||
QueryDesc *queryDesc;
|
||||
|
||||
AssertArg(PortalIsValid(portal));
|
||||
@ -215,23 +248,9 @@ PortalStart(Portal portal, ParamListInfo params)
|
||||
portal->portalParams = params;
|
||||
|
||||
/*
|
||||
* Determine the portal execution strategy (see comments in portal.h)
|
||||
* Determine the portal execution strategy
|
||||
*/
|
||||
portal->strategy = PORTAL_MULTI_QUERY; /* default assumption */
|
||||
if (length(portal->parseTrees) == 1)
|
||||
{
|
||||
query = (Query *) lfirst(portal->parseTrees);
|
||||
if (query->commandType == CMD_SELECT &&
|
||||
query->canSetTag &&
|
||||
query->into == NULL)
|
||||
portal->strategy = PORTAL_ONE_SELECT;
|
||||
else if (query->commandType == CMD_UTILITY &&
|
||||
query->canSetTag &&
|
||||
query->utilityStmt != NULL)
|
||||
{
|
||||
/* XXX check for things that can be PORTAL_UTIL_SELECT */
|
||||
}
|
||||
}
|
||||
portal->strategy = ChoosePortalStrategy(portal->parseTrees);
|
||||
|
||||
/*
|
||||
* Fire her up according to the strategy
|
||||
@ -247,9 +266,9 @@ PortalStart(Portal portal, ParamListInfo params)
|
||||
* Create QueryDesc in portal's context; for the moment, set
|
||||
* the destination to None.
|
||||
*/
|
||||
queryDesc = CreateQueryDesc(query,
|
||||
queryDesc = CreateQueryDesc((Query *) lfirst(portal->parseTrees),
|
||||
(Plan *) lfirst(portal->planTrees),
|
||||
None,
|
||||
None_Receiver,
|
||||
portal->name,
|
||||
params,
|
||||
false);
|
||||
@ -261,6 +280,9 @@ PortalStart(Portal portal, ParamListInfo params)
|
||||
* This tells PortalCleanup to shut down the executor
|
||||
*/
|
||||
portal->queryDesc = queryDesc;
|
||||
/*
|
||||
* Remember tuple descriptor
|
||||
*/
|
||||
portal->tupDesc = queryDesc->tupDesc;
|
||||
/*
|
||||
* Reset cursor position data to "start of query"
|
||||
@ -272,10 +294,19 @@ PortalStart(Portal portal, ParamListInfo params)
|
||||
break;
|
||||
|
||||
case PORTAL_UTIL_SELECT:
|
||||
/* XXX implement later */
|
||||
/* XXX query snapshot here? no, RunUtility will do it */
|
||||
/* xxx what about Params? */
|
||||
portal->tupDesc = NULL;
|
||||
/*
|
||||
* We don't set query snapshot here, because PortalRunUtility
|
||||
* will take care of it.
|
||||
*/
|
||||
portal->tupDesc =
|
||||
UtilityTupleDescriptor(((Query *) lfirst(portal->parseTrees))->utilityStmt);
|
||||
/*
|
||||
* Reset cursor position data to "start of query"
|
||||
*/
|
||||
portal->atStart = true;
|
||||
portal->atEnd = false; /* allow fetches */
|
||||
portal->portalPos = 0;
|
||||
portal->posOverflow = false;
|
||||
break;
|
||||
|
||||
case PORTAL_MULTI_QUERY:
|
||||
@ -310,11 +341,11 @@ PortalStart(Portal portal, ParamListInfo params)
|
||||
* suspended due to exhaustion of the count parameter.
|
||||
*/
|
||||
bool
|
||||
PortalRun(Portal portal, long count, CommandDest dest, CommandDest altdest,
|
||||
PortalRun(Portal portal, long count,
|
||||
DestReceiver *dest, DestReceiver *altdest,
|
||||
char *completionTag)
|
||||
{
|
||||
bool result;
|
||||
Portal saveCurrentPortal;
|
||||
MemoryContext savePortalContext;
|
||||
MemoryContext saveQueryContext;
|
||||
MemoryContext oldContext;
|
||||
@ -336,10 +367,8 @@ PortalRun(Portal portal, long count, CommandDest dest, CommandDest altdest,
|
||||
portal->portalActive = true;
|
||||
|
||||
/*
|
||||
* Set global portal and context pointers.
|
||||
* Set global portal context pointers.
|
||||
*/
|
||||
saveCurrentPortal = CurrentPortal;
|
||||
CurrentPortal = portal;
|
||||
savePortalContext = PortalContext;
|
||||
PortalContext = PortalGetHeapMemory(portal);
|
||||
saveQueryContext = QueryContext;
|
||||
@ -367,8 +396,14 @@ PortalRun(Portal portal, long count, CommandDest dest, CommandDest altdest,
|
||||
*/
|
||||
if (!portal->portalUtilReady)
|
||||
{
|
||||
DestReceiver *treceiver;
|
||||
|
||||
PortalCreateHoldStore(portal);
|
||||
treceiver = CreateTuplestoreDestReceiver(portal->holdStore,
|
||||
portal->holdContext);
|
||||
PortalRunUtility(portal, lfirst(portal->parseTrees),
|
||||
Tuplestore, NULL);
|
||||
treceiver, NULL);
|
||||
(*treceiver->destroy) (treceiver);
|
||||
portal->portalUtilReady = true;
|
||||
}
|
||||
/*
|
||||
@ -404,7 +439,6 @@ PortalRun(Portal portal, long count, CommandDest dest, CommandDest altdest,
|
||||
/* Mark portal not active */
|
||||
portal->portalActive = false;
|
||||
|
||||
CurrentPortal = saveCurrentPortal;
|
||||
PortalContext = savePortalContext;
|
||||
QueryContext = saveQueryContext;
|
||||
|
||||
@ -431,7 +465,7 @@ long
|
||||
PortalRunSelect(Portal portal,
|
||||
bool forward,
|
||||
long count,
|
||||
CommandDest dest)
|
||||
DestReceiver *dest)
|
||||
{
|
||||
QueryDesc *queryDesc;
|
||||
ScanDirection direction;
|
||||
@ -568,21 +602,18 @@ PortalRunSelect(Portal portal,
|
||||
*/
|
||||
static uint32
|
||||
RunFromStore(Portal portal, ScanDirection direction, long count,
|
||||
CommandDest dest)
|
||||
DestReceiver *dest)
|
||||
{
|
||||
DestReceiver *destfunc;
|
||||
List *targetlist;
|
||||
long current_tuple_count = 0;
|
||||
|
||||
destfunc = DestToFunction(dest);
|
||||
|
||||
if (portal->strategy == PORTAL_ONE_SELECT)
|
||||
targetlist = ((Plan *) lfirst(portal->planTrees))->targetlist;
|
||||
else
|
||||
targetlist = NIL;
|
||||
|
||||
(*destfunc->setup) (destfunc, CMD_SELECT, portal->name, portal->tupDesc,
|
||||
targetlist);
|
||||
(*dest->startup) (dest, CMD_SELECT, portal->name, portal->tupDesc,
|
||||
targetlist);
|
||||
|
||||
if (direction == NoMovementScanDirection)
|
||||
{
|
||||
@ -608,7 +639,7 @@ RunFromStore(Portal portal, ScanDirection direction, long count,
|
||||
if (tup == NULL)
|
||||
break;
|
||||
|
||||
(*destfunc->receiveTuple) (tup, portal->tupDesc, destfunc);
|
||||
(*dest->receiveTuple) (tup, portal->tupDesc, dest);
|
||||
|
||||
if (should_free)
|
||||
pfree(tup);
|
||||
@ -624,7 +655,7 @@ RunFromStore(Portal portal, ScanDirection direction, long count,
|
||||
}
|
||||
}
|
||||
|
||||
(*destfunc->cleanup) (destfunc);
|
||||
(*dest->shutdown) (dest);
|
||||
|
||||
return (uint32) current_tuple_count;
|
||||
}
|
||||
@ -635,7 +666,7 @@ RunFromStore(Portal portal, ScanDirection direction, long count,
|
||||
*/
|
||||
static void
|
||||
PortalRunUtility(Portal portal, Query *query,
|
||||
CommandDest dest, char *completionTag)
|
||||
DestReceiver *dest, char *completionTag)
|
||||
{
|
||||
Node *utilityStmt = query->utilityStmt;
|
||||
|
||||
@ -690,7 +721,7 @@ PortalRunUtility(Portal portal, Query *query,
|
||||
*/
|
||||
static void
|
||||
PortalRunMulti(Portal portal,
|
||||
CommandDest dest, CommandDest altdest,
|
||||
DestReceiver *dest, DestReceiver *altdest,
|
||||
char *completionTag)
|
||||
{
|
||||
List *plantree_list = portal->planTrees;
|
||||
@ -807,10 +838,9 @@ long
|
||||
PortalRunFetch(Portal portal,
|
||||
FetchDirection fdirection,
|
||||
long count,
|
||||
CommandDest dest)
|
||||
DestReceiver *dest)
|
||||
{
|
||||
long result;
|
||||
Portal saveCurrentPortal;
|
||||
MemoryContext savePortalContext;
|
||||
MemoryContext saveQueryContext;
|
||||
MemoryContext oldContext;
|
||||
@ -828,10 +858,8 @@ PortalRunFetch(Portal portal,
|
||||
portal->portalActive = true;
|
||||
|
||||
/*
|
||||
* Set global portal and context pointers.
|
||||
* Set global portal context pointers.
|
||||
*/
|
||||
saveCurrentPortal = CurrentPortal;
|
||||
CurrentPortal = portal;
|
||||
savePortalContext = PortalContext;
|
||||
PortalContext = PortalGetHeapMemory(portal);
|
||||
saveQueryContext = QueryContext;
|
||||
@ -856,7 +884,6 @@ PortalRunFetch(Portal portal,
|
||||
/* Mark portal not active */
|
||||
portal->portalActive = false;
|
||||
|
||||
CurrentPortal = saveCurrentPortal;
|
||||
PortalContext = savePortalContext;
|
||||
QueryContext = saveQueryContext;
|
||||
|
||||
@ -873,7 +900,7 @@ static long
|
||||
DoPortalRunFetch(Portal portal,
|
||||
FetchDirection fdirection,
|
||||
long count,
|
||||
CommandDest dest)
|
||||
DestReceiver *dest)
|
||||
{
|
||||
bool forward;
|
||||
|
||||
@ -912,7 +939,8 @@ DoPortalRunFetch(Portal portal,
|
||||
{
|
||||
DoPortalRewind(portal);
|
||||
if (count > 1)
|
||||
PortalRunSelect(portal, true, count-1, None);
|
||||
PortalRunSelect(portal, true, count-1,
|
||||
None_Receiver);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -921,9 +949,11 @@ DoPortalRunFetch(Portal portal,
|
||||
if (portal->atEnd)
|
||||
pos++; /* need one extra fetch if off end */
|
||||
if (count <= pos)
|
||||
PortalRunSelect(portal, false, pos-count+1, None);
|
||||
PortalRunSelect(portal, false, pos-count+1,
|
||||
None_Receiver);
|
||||
else if (count > pos+1)
|
||||
PortalRunSelect(portal, true, count-pos-1, None);
|
||||
PortalRunSelect(portal, true, count-pos-1,
|
||||
None_Receiver);
|
||||
}
|
||||
return PortalRunSelect(portal, true, 1L, dest);
|
||||
}
|
||||
@ -936,9 +966,9 @@ DoPortalRunFetch(Portal portal,
|
||||
* (Is it worth considering case where count > half of size
|
||||
* of query? We could rewind once we know the size ...)
|
||||
*/
|
||||
PortalRunSelect(portal, true, FETCH_ALL, None);
|
||||
PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);
|
||||
if (count < -1)
|
||||
PortalRunSelect(portal, false, -count-1, None);
|
||||
PortalRunSelect(portal, false, -count-1, None_Receiver);
|
||||
return PortalRunSelect(portal, false, 1L, dest);
|
||||
}
|
||||
else /* count == 0 */
|
||||
@ -955,7 +985,7 @@ DoPortalRunFetch(Portal portal,
|
||||
* Definition: advance count-1 rows, return next row (if any).
|
||||
*/
|
||||
if (count > 1)
|
||||
PortalRunSelect(portal, true, count-1, None);
|
||||
PortalRunSelect(portal, true, count-1, None_Receiver);
|
||||
return PortalRunSelect(portal, true, 1L, dest);
|
||||
}
|
||||
else if (count < 0)
|
||||
@ -965,7 +995,7 @@ DoPortalRunFetch(Portal portal,
|
||||
* (if any).
|
||||
*/
|
||||
if (count < -1)
|
||||
PortalRunSelect(portal, false, -count-1, None);
|
||||
PortalRunSelect(portal, false, -count-1, None_Receiver);
|
||||
return PortalRunSelect(portal, false, 1L, dest);
|
||||
}
|
||||
else /* count == 0 */
|
||||
@ -995,7 +1025,7 @@ DoPortalRunFetch(Portal portal,
|
||||
/* Are we sitting on a row? */
|
||||
on_row = (!portal->atStart && !portal->atEnd);
|
||||
|
||||
if (dest == None)
|
||||
if (dest->mydest == None)
|
||||
{
|
||||
/* MOVE 0 returns 0/1 based on if FETCH 0 would return a row */
|
||||
return on_row ? 1L : 0L;
|
||||
@ -1011,7 +1041,7 @@ DoPortalRunFetch(Portal portal,
|
||||
*/
|
||||
if (on_row)
|
||||
{
|
||||
PortalRunSelect(portal, false, 1L, None);
|
||||
PortalRunSelect(portal, false, 1L, None_Receiver);
|
||||
/* Set up to fetch one row forward */
|
||||
count = 1;
|
||||
forward = true;
|
||||
@ -1022,7 +1052,7 @@ DoPortalRunFetch(Portal portal,
|
||||
/*
|
||||
* Optimize MOVE BACKWARD ALL into a Rewind.
|
||||
*/
|
||||
if (!forward && count == FETCH_ALL && dest == None)
|
||||
if (!forward && count == FETCH_ALL && dest->mydest == None)
|
||||
{
|
||||
long result = portal->portalPos;
|
||||
|
||||
|
Reference in New Issue
Block a user