mirror of
https://github.com/postgres/postgres.git
synced 2025-06-16 06:01:02 +03:00
Code review for holdable-cursors patch. Fix error recovery, memory
context sloppiness, some other things. Includes Neil's mopup patch of 22-Apr.
This commit is contained in:
@ -8,7 +8,7 @@
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* $Header: /cvsroot/pgsql/src/backend/commands/portalcmds.c,v 1.11 2003/03/27 16:51:27 momjian Exp $
|
||||
* $Header: /cvsroot/pgsql/src/backend/commands/portalcmds.c,v 1.12 2003/04/29 03:21:29 tgl Exp $
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
@ -24,14 +24,12 @@
|
||||
#include "rewrite/rewriteHandler.h"
|
||||
#include "utils/memutils.h"
|
||||
|
||||
|
||||
static long DoRelativeFetch(Portal portal,
|
||||
bool forward,
|
||||
long count,
|
||||
CommandDest dest);
|
||||
static long DoRelativeStoreFetch(Portal portal,
|
||||
bool forward,
|
||||
long count,
|
||||
CommandDest dest);
|
||||
static uint32 RunFromStore(Portal portal, ScanDirection direction, long count);
|
||||
static void DoPortalRewind(Portal portal);
|
||||
static Portal PreparePortal(DeclareCursorStmt *stmt);
|
||||
|
||||
@ -52,11 +50,9 @@ PerformCursorOpen(DeclareCursorStmt *stmt, CommandDest dest)
|
||||
QueryDesc *queryDesc;
|
||||
|
||||
/*
|
||||
* If this is a non-holdable cursor, we ensure that this statement
|
||||
* If this is a non-holdable cursor, we require that this statement
|
||||
* has been executed inside a transaction block (or else, it would
|
||||
* have no user-visible effect).
|
||||
*
|
||||
* XXX: surely there is a better way to check this?
|
||||
*/
|
||||
if (!(stmt->options & CURSOR_OPT_HOLD))
|
||||
RequireTransactionChain((void *) stmt, "DECLARE CURSOR");
|
||||
@ -106,7 +102,7 @@ PerformCursorOpen(DeclareCursorStmt *stmt, CommandDest dest)
|
||||
ExecutorStart(queryDesc);
|
||||
|
||||
/* Arrange to shut down the executor if portal is dropped */
|
||||
PortalSetQuery(portal, queryDesc, PortalCleanup);
|
||||
PortalSetQuery(portal, queryDesc);
|
||||
|
||||
/*
|
||||
* We're done; the query won't actually be run until PerformPortalFetch
|
||||
@ -352,15 +348,11 @@ DoRelativeFetch(Portal portal,
|
||||
CommandDest dest)
|
||||
{
|
||||
QueryDesc *queryDesc;
|
||||
EState *estate;
|
||||
ScanDirection direction;
|
||||
QueryDesc temp_queryDesc;
|
||||
|
||||
if (portal->holdStore)
|
||||
return DoRelativeStoreFetch(portal, forward, count, dest);
|
||||
ScanDirection direction;
|
||||
uint32 nprocessed;
|
||||
|
||||
queryDesc = PortalGetQueryDesc(portal);
|
||||
estate = queryDesc->estate;
|
||||
|
||||
/*
|
||||
* If the requested destination is not the same as the query's
|
||||
@ -403,19 +395,26 @@ DoRelativeFetch(Portal portal,
|
||||
if (count == FETCH_ALL)
|
||||
count = 0;
|
||||
|
||||
ExecutorRun(queryDesc, direction, count);
|
||||
if (portal->holdStore)
|
||||
nprocessed = RunFromStore(portal, direction, count);
|
||||
else
|
||||
{
|
||||
Assert(portal->executorRunning);
|
||||
ExecutorRun(queryDesc, direction, count);
|
||||
nprocessed = queryDesc->estate->es_processed;
|
||||
}
|
||||
|
||||
if (direction != NoMovementScanDirection)
|
||||
{
|
||||
long oldPos;
|
||||
|
||||
if (estate->es_processed > 0)
|
||||
if (nprocessed > 0)
|
||||
portal->atStart = false; /* OK to go backward now */
|
||||
if (count == 0 ||
|
||||
(unsigned long) estate->es_processed < (unsigned long) count)
|
||||
(unsigned long) nprocessed < (unsigned long) count)
|
||||
portal->atEnd = true; /* we retrieved 'em all */
|
||||
oldPos = portal->portalPos;
|
||||
portal->portalPos += estate->es_processed;
|
||||
portal->portalPos += nprocessed;
|
||||
/* portalPos doesn't advance when we fall off the end */
|
||||
if (portal->portalPos < oldPos)
|
||||
portal->posOverflow = true;
|
||||
@ -436,17 +435,24 @@ DoRelativeFetch(Portal portal,
|
||||
if (count == FETCH_ALL)
|
||||
count = 0;
|
||||
|
||||
ExecutorRun(queryDesc, direction, count);
|
||||
if (portal->holdStore)
|
||||
nprocessed = RunFromStore(portal, direction, count);
|
||||
else
|
||||
{
|
||||
Assert(portal->executorRunning);
|
||||
ExecutorRun(queryDesc, direction, count);
|
||||
nprocessed = queryDesc->estate->es_processed;
|
||||
}
|
||||
|
||||
if (direction != NoMovementScanDirection)
|
||||
{
|
||||
if (estate->es_processed > 0 && portal->atEnd)
|
||||
if (nprocessed > 0 && portal->atEnd)
|
||||
{
|
||||
portal->atEnd = false; /* OK to go forward now */
|
||||
portal->portalPos++; /* adjust for endpoint case */
|
||||
}
|
||||
if (count == 0 ||
|
||||
(unsigned long) estate->es_processed < (unsigned long) count)
|
||||
(unsigned long) nprocessed < (unsigned long) count)
|
||||
{
|
||||
portal->atStart = true; /* we retrieved 'em all */
|
||||
portal->portalPos = 0;
|
||||
@ -457,7 +463,7 @@ DoRelativeFetch(Portal portal,
|
||||
long oldPos;
|
||||
|
||||
oldPos = portal->portalPos;
|
||||
portal->portalPos -= estate->es_processed;
|
||||
portal->portalPos -= nprocessed;
|
||||
if (portal->portalPos > oldPos ||
|
||||
portal->portalPos <= 0)
|
||||
portal->posOverflow = true;
|
||||
@ -465,76 +471,75 @@ DoRelativeFetch(Portal portal,
|
||||
}
|
||||
}
|
||||
|
||||
return estate->es_processed;
|
||||
return nprocessed;
|
||||
}
|
||||
|
||||
/*
|
||||
* DoRelativeStoreFetch
|
||||
* Do fetch for a simple N-rows-forward-or-backward case, getting
|
||||
* the results from the portal's tuple store.
|
||||
* RunFromStore
|
||||
* Fetch tuples from the portal's tuple store.
|
||||
*
|
||||
* Calling conventions are similar to ExecutorRun, except that we
|
||||
* do not depend on having an estate, and therefore return the number
|
||||
* of tuples processed as the result, not in estate->es_processed.
|
||||
*
|
||||
* One difference from ExecutorRun is that the destination receiver functions
|
||||
* are run in the caller's memory context (since we have no estate). Watch
|
||||
* out for memory leaks.
|
||||
*/
|
||||
static long
|
||||
DoRelativeStoreFetch(Portal portal,
|
||||
bool forward,
|
||||
long count,
|
||||
CommandDest dest)
|
||||
static uint32
|
||||
RunFromStore(Portal portal, ScanDirection direction, long count)
|
||||
{
|
||||
QueryDesc *queryDesc = PortalGetQueryDesc(portal);
|
||||
DestReceiver *destfunc;
|
||||
QueryDesc *queryDesc = portal->queryDesc;
|
||||
long rows_fetched = 0;
|
||||
long current_tuple_count = 0;
|
||||
|
||||
if (!forward && portal->scrollType == DISABLE_SCROLL)
|
||||
elog(ERROR, "Cursor can only scan forward"
|
||||
"\n\tDeclare it with SCROLL option to enable backward scan");
|
||||
|
||||
destfunc = DestToFunction(dest);
|
||||
destfunc = DestToFunction(queryDesc->dest);
|
||||
(*destfunc->setup) (destfunc, queryDesc->operation,
|
||||
portal->name, queryDesc->tupDesc);
|
||||
queryDesc->portalName, queryDesc->tupDesc);
|
||||
|
||||
for (;;)
|
||||
if (direction == NoMovementScanDirection)
|
||||
{
|
||||
HeapTuple tup;
|
||||
bool should_free;
|
||||
/* do nothing except start/stop the destination */
|
||||
}
|
||||
else
|
||||
{
|
||||
bool forward = (direction == ForwardScanDirection);
|
||||
|
||||
if (rows_fetched >= count)
|
||||
break;
|
||||
if (portal->atEnd && forward)
|
||||
break;
|
||||
if (portal->atStart && !forward)
|
||||
break;
|
||||
|
||||
tup = tuplestore_getheaptuple(portal->holdStore, forward, &should_free);
|
||||
|
||||
if (tup == NULL)
|
||||
for (;;)
|
||||
{
|
||||
if (forward)
|
||||
portal->atEnd = true;
|
||||
else
|
||||
portal->atStart = true;
|
||||
MemoryContext oldcontext;
|
||||
HeapTuple tup;
|
||||
bool should_free;
|
||||
|
||||
break;
|
||||
oldcontext = MemoryContextSwitchTo(portal->holdContext);
|
||||
|
||||
tup = tuplestore_getheaptuple(portal->holdStore, forward,
|
||||
&should_free);
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
if (tup == NULL)
|
||||
break;
|
||||
|
||||
(*destfunc->receiveTuple) (tup, queryDesc->tupDesc, destfunc);
|
||||
|
||||
if (should_free)
|
||||
pfree(tup);
|
||||
|
||||
/*
|
||||
* check our tuple count.. if we've processed the proper number
|
||||
* then quit, else loop again and process more tuples. Zero
|
||||
* count means no limit.
|
||||
*/
|
||||
current_tuple_count++;
|
||||
if (count && count == current_tuple_count)
|
||||
break;
|
||||
}
|
||||
|
||||
(*destfunc->receiveTuple) (tup, queryDesc->tupDesc, destfunc);
|
||||
|
||||
rows_fetched++;
|
||||
if (forward)
|
||||
portal->portalPos++;
|
||||
else
|
||||
portal->portalPos--;
|
||||
|
||||
if (forward && portal->atStart)
|
||||
portal->atStart = false;
|
||||
if (!forward && portal->atEnd)
|
||||
portal->atEnd = false;
|
||||
|
||||
if (should_free)
|
||||
pfree(tup);
|
||||
}
|
||||
|
||||
(*destfunc->cleanup) (destfunc);
|
||||
|
||||
return rows_fetched;
|
||||
return (uint32) current_tuple_count;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -544,9 +549,17 @@ static void
|
||||
DoPortalRewind(Portal portal)
|
||||
{
|
||||
if (portal->holdStore)
|
||||
{
|
||||
MemoryContext oldcontext;
|
||||
|
||||
oldcontext = MemoryContextSwitchTo(portal->holdContext);
|
||||
tuplestore_rescan(portal->holdStore);
|
||||
else
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
}
|
||||
if (portal->executorRunning)
|
||||
{
|
||||
ExecutorRewind(PortalGetQueryDesc(portal));
|
||||
}
|
||||
|
||||
portal->atStart = true;
|
||||
portal->atEnd = false;
|
||||
@ -630,12 +643,11 @@ PreparePortal(DeclareCursorStmt *stmt)
|
||||
/*
|
||||
* PortalCleanup
|
||||
*
|
||||
* Clean up a portal when it's dropped. Since this mainly exists to run
|
||||
* ExecutorEnd(), it should not be set as the cleanup hook until we have
|
||||
* called ExecutorStart() on the portal's query.
|
||||
* Clean up a portal when it's dropped. This is the standard cleanup hook
|
||||
* for portals.
|
||||
*/
|
||||
void
|
||||
PortalCleanup(Portal portal)
|
||||
PortalCleanup(Portal portal, bool isError)
|
||||
{
|
||||
/*
|
||||
* sanity checks
|
||||
@ -643,11 +655,31 @@ PortalCleanup(Portal portal)
|
||||
AssertArg(PortalIsValid(portal));
|
||||
AssertArg(portal->cleanup == PortalCleanup);
|
||||
|
||||
/*
|
||||
* Delete tuplestore if present. (Note: portalmem.c is responsible
|
||||
* for removing holdContext.) We should do this even under error
|
||||
* conditions.
|
||||
*/
|
||||
if (portal->holdStore)
|
||||
tuplestore_end(portal->holdStore);
|
||||
else
|
||||
ExecutorEnd(PortalGetQueryDesc(portal));
|
||||
{
|
||||
MemoryContext oldcontext;
|
||||
|
||||
oldcontext = MemoryContextSwitchTo(portal->holdContext);
|
||||
tuplestore_end(portal->holdStore);
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
portal->holdStore = NULL;
|
||||
}
|
||||
/*
|
||||
* Shut down executor, if still running. We skip this during error
|
||||
* abort, since other mechanisms will take care of releasing executor
|
||||
* resources, and we can't be sure that ExecutorEnd itself wouldn't fail.
|
||||
*/
|
||||
if (portal->executorRunning)
|
||||
{
|
||||
portal->executorRunning = false;
|
||||
if (!isError)
|
||||
ExecutorEnd(PortalGetQueryDesc(portal));
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@ -661,8 +693,10 @@ PortalCleanup(Portal portal)
|
||||
void
|
||||
PersistHoldablePortal(Portal portal)
|
||||
{
|
||||
MemoryContext oldcxt;
|
||||
QueryDesc *queryDesc = PortalGetQueryDesc(portal);
|
||||
MemoryContext oldcxt;
|
||||
CommandDest olddest;
|
||||
TupleDesc tupdesc;
|
||||
|
||||
/*
|
||||
* If we're preserving a holdable portal, we had better be
|
||||
@ -672,17 +706,15 @@ PersistHoldablePortal(Portal portal)
|
||||
Assert(portal->holdStore == NULL);
|
||||
|
||||
/*
|
||||
* This context is used to store portal data that needs to persist
|
||||
* between transactions.
|
||||
* This context is used to store the tuple set.
|
||||
* Caller must have created it already.
|
||||
*/
|
||||
Assert(portal->holdContext != NULL);
|
||||
oldcxt = MemoryContextSwitchTo(portal->holdContext);
|
||||
|
||||
/* XXX: Should SortMem be used for this? */
|
||||
portal->holdStore = tuplestore_begin_heap(true, true, SortMem);
|
||||
|
||||
/* Set the destination to output to the tuplestore */
|
||||
queryDesc->dest = Tuplestore;
|
||||
|
||||
/*
|
||||
* Rewind the executor: we need to store the entire result set in
|
||||
* the tuplestore, so that subsequent backward FETCHs can be
|
||||
@ -690,8 +722,29 @@ PersistHoldablePortal(Portal portal)
|
||||
*/
|
||||
ExecutorRewind(queryDesc);
|
||||
|
||||
/* Set the destination to output to the tuplestore */
|
||||
olddest = queryDesc->dest;
|
||||
queryDesc->dest = Tuplestore;
|
||||
|
||||
/* Fetch the result set into the tuplestore */
|
||||
ExecutorRun(queryDesc, ForwardScanDirection, 0);
|
||||
ExecutorRun(queryDesc, ForwardScanDirection, 0L);
|
||||
|
||||
queryDesc->dest = olddest;
|
||||
|
||||
/*
|
||||
* Before closing down the executor, we must copy the tupdesc, since
|
||||
* it was created in executor memory.
|
||||
*/
|
||||
tupdesc = CreateTupleDescCopy(queryDesc->tupDesc);
|
||||
|
||||
/*
|
||||
* Now shut down the inner executor.
|
||||
*/
|
||||
portal->executorRunning = false;
|
||||
ExecutorEnd(queryDesc);
|
||||
|
||||
/* ExecutorEnd clears this, so must wait to save copied pointer */
|
||||
queryDesc->tupDesc = tupdesc;
|
||||
|
||||
/*
|
||||
* Reset the position in the result set: ideally, this could be
|
||||
@ -702,69 +755,26 @@ PersistHoldablePortal(Portal portal)
|
||||
*/
|
||||
if (!portal->atEnd)
|
||||
{
|
||||
int store_pos = 0;
|
||||
bool should_free;
|
||||
long store_pos;
|
||||
|
||||
tuplestore_rescan(portal->holdStore);
|
||||
|
||||
while (store_pos < portal->portalPos)
|
||||
for (store_pos = 0; store_pos < portal->portalPos; store_pos++)
|
||||
{
|
||||
HeapTuple tmp = tuplestore_gettuple(portal->holdStore,
|
||||
true, &should_free);
|
||||
HeapTuple tup;
|
||||
bool should_free;
|
||||
|
||||
if (tmp == NULL)
|
||||
tup = tuplestore_gettuple(portal->holdStore, true,
|
||||
&should_free);
|
||||
|
||||
if (tup == NULL)
|
||||
elog(ERROR,
|
||||
"PersistHoldablePortal: unexpected end of tuple stream");
|
||||
|
||||
store_pos++;
|
||||
|
||||
/*
|
||||
* This could probably be optimized by creating and then
|
||||
* deleting a separate memory context for this series of
|
||||
* operations.
|
||||
*/
|
||||
if (should_free)
|
||||
pfree(tmp);
|
||||
pfree(tup);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* The current Portal structure contains some data that will be
|
||||
* needed by the holdable cursor, but it has been allocated in a
|
||||
* memory context that is not sufficiently long-lived: we need to
|
||||
* copy it into the portal's long-term memory context.
|
||||
*/
|
||||
{
|
||||
TupleDesc tupDescCopy;
|
||||
QueryDesc *queryDescCopy;
|
||||
|
||||
/*
|
||||
* We need to use this order as ExecutorEnd invalidates the
|
||||
* queryDesc's tuple descriptor
|
||||
*/
|
||||
tupDescCopy = CreateTupleDescCopy(queryDesc->tupDesc);
|
||||
|
||||
ExecutorEnd(queryDesc);
|
||||
|
||||
queryDescCopy = palloc(sizeof(*queryDescCopy));
|
||||
|
||||
/*
|
||||
* This doesn't copy all the dependant data in the QueryDesc,
|
||||
* but that's okay -- the only complex field we need to keep is
|
||||
* the query's tupledesc, which we've copied ourselves.
|
||||
*/
|
||||
memcpy(queryDescCopy, queryDesc, sizeof(*queryDesc));
|
||||
|
||||
FreeQueryDesc(queryDesc);
|
||||
|
||||
queryDescCopy->tupDesc = tupDescCopy;
|
||||
portal->queryDesc = queryDescCopy;
|
||||
}
|
||||
|
||||
/*
|
||||
* We no longer need the portal's short-term memory context.
|
||||
*/
|
||||
MemoryContextDelete(PortalGetHeapMemory(portal));
|
||||
|
||||
PortalGetHeapMemory(portal) = NULL;
|
||||
MemoryContextSwitchTo(oldcxt);
|
||||
}
|
||||
|
Reference in New Issue
Block a user