1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-12 05:01:15 +03:00

Allow internal sorts to be stored in memory rather than in files.

This commit is contained in:
Bruce Momjian
1997-08-06 03:42:21 +00:00
parent 3bea7b138b
commit f5f366e188
11 changed files with 633 additions and 516 deletions

View File

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/utils/sort/Attic/lselect.c,v 1.3 1997/05/20 11:35:48 vadim Exp $
* $Header: /cvsroot/pgsql/src/backend/utils/sort/Attic/lselect.c,v 1.4 1997/08/06 03:41:47 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@@ -26,37 +26,14 @@
#include "utils/psort.h"
#include "utils/lselect.h"
extern Relation SortRdesc; /* later static */
/*
* PUTTUP - writes the next tuple
* ENDRUN - mark end of run
* GETLEN - reads the length of the next tuple
* ALLOCTUP - returns space for the new tuple
* SETTUPLEN - stores the length into the tuple
* GETTUP - reads the tuple
*
* Note:
* LEN field must be a short; FP is a stream
*/
#define PUTTUP(TUP, FP) fwrite((char *)TUP, (TUP)->t_len, 1, FP)
#define ENDRUN(FP) fwrite((char *)&shortzero, sizeof (shortzero), 1, FP)
#define GETLEN(LEN, FP) fread(&(LEN), sizeof (shortzero), 1, FP)
#define ALLOCTUP(LEN) ((HeapTuple)palloc((unsigned)LEN))
#define GETTUP(TUP, LEN, FP)\
fread((char *)(TUP) + sizeof (shortzero), 1, (LEN) - sizeof (shortzero), FP)
#define SETTUPLEN(TUP, LEN) (TUP)->t_len = LEN
/*
* USEMEM - record use of memory
* FREEMEM - record freeing of memory
* FULLMEM - 1 iff a tuple will fit
*/
#define USEMEM(AMT) SortMemory -= (AMT)
#define FREEMEM(AMT) SortMemory += (AMT)
#define LACKMEM() (SortMemory <= BLCKSZ) /* not accurate */
/*
 * Adjust the sort-memory budget held in a LeftistContext.  The context
 * argument is parenthesized so that any pointer expression (e.g. &ctx)
 * can be passed, and each expansion is a single parenthesized expression.
 */
#define USEMEM(context,AMT) ((context)->sortMem -= (AMT))
#define FREEMEM(context,AMT) ((context)->sortMem += (AMT))
/*
* lmerge - merges two leftist trees into one
@@ -67,12 +44,12 @@ extern Relation SortRdesc; /* later static */
* speed up code significantly.
*/
struct leftist *
lmerge(struct leftist *pt, struct leftist *qt)
lmerge(struct leftist *pt, struct leftist *qt, LeftistContext context)
{
register struct leftist *root, *majorLeftist, *minorLeftist;
int dist;
if (tuplecmp(pt->lt_tuple, qt->lt_tuple)) {
if (tuplecmp(pt->lt_tuple, qt->lt_tuple, context)) {
root = pt;
majorLeftist = qt;
} else {
@@ -83,7 +60,7 @@ lmerge(struct leftist *pt, struct leftist *qt)
root->lt_left = majorLeftist;
else {
if ((minorLeftist = root->lt_right) != NULL)
majorLeftist = lmerge(majorLeftist, minorLeftist);
majorLeftist = lmerge(majorLeftist, minorLeftist, context);
if ((dist = root->lt_left->lt_dist) < majorLeftist->lt_dist) {
root->lt_dist = 1 + dist;
root->lt_right = root->lt_left;
@@ -97,11 +74,11 @@ lmerge(struct leftist *pt, struct leftist *qt)
}
static struct leftist *
linsert(struct leftist *root, struct leftist *new1)
linsert(struct leftist *root, struct leftist *new1, LeftistContext context)
{
register struct leftist *left, *right;
if (! tuplecmp(root->lt_tuple, new1->lt_tuple)) {
if (! tuplecmp(root->lt_tuple, new1->lt_tuple, context)) {
new1->lt_left = root;
return(new1);
}
@@ -116,7 +93,7 @@ linsert(struct leftist *root, struct leftist *new1)
}
return(root);
}
right = linsert(right, new1);
right = linsert(right, new1, context);
if (right->lt_dist < left->lt_dist) {
root->lt_dist = 1 + left->lt_dist;
root->lt_left = right;
@@ -142,7 +119,8 @@ linsert(struct leftist *root, struct leftist *new1)
*/
HeapTuple
gettuple(struct leftist **treep,
short *devnum) /* device from which tuple came */
short *devnum, /* device from which tuple came */
LeftistContext context)
{
register struct leftist *tp;
HeapTuple tup;
@@ -153,9 +131,9 @@ gettuple(struct leftist **treep,
if (tp->lt_dist == 1) /* lt_left == NULL */
*treep = tp->lt_left;
else
*treep = lmerge(tp->lt_left, tp->lt_right);
*treep = lmerge(tp->lt_left, tp->lt_right, context);
FREEMEM(sizeof (struct leftist));
FREEMEM(context,sizeof (struct leftist));
FREE(tp);
return(tup);
}
@@ -169,14 +147,17 @@ gettuple(struct leftist **treep,
* Note:
* Currently never returns NULL BUG
*/
int
puttuple(struct leftist **treep, HeapTuple newtuple, int devnum)
void
puttuple(struct leftist **treep,
HeapTuple newtuple,
short devnum,
LeftistContext context)
{
register struct leftist *new1;
register struct leftist *tp;
new1 = (struct leftist *) palloc((unsigned) sizeof (struct leftist));
USEMEM(sizeof (struct leftist));
USEMEM(context,sizeof (struct leftist));
new1->lt_dist = 1;
new1->lt_devnum = devnum;
new1->lt_tuple = newtuple;
@@ -185,38 +166,11 @@ puttuple(struct leftist **treep, HeapTuple newtuple, int devnum)
if ((tp = *treep) == NULL)
*treep = new1;
else
*treep = linsert(tp, new1);
return(1);
*treep = linsert(tp, new1, context);
return;
}
/*
* dumptuples - stores all the tuples in tree into file
*/
void
dumptuples(FILE *file)
{
register struct leftist *tp;
register struct leftist *newp;
HeapTuple tup;
tp = Tuples;
while (tp != NULL) {
tup = tp->lt_tuple;
if (tp->lt_dist == 1) /* lt_right == NULL */
newp = tp->lt_left;
else
newp = lmerge(tp->lt_left, tp->lt_right);
FREEMEM(sizeof (struct leftist));
FREE(tp);
PUTTUP(tup, file);
FREEMEM(tup->t_len);
FREE(tup);
tp = newp;
}
Tuples = NULL;
}
/*
* tuplecmp - Compares two tuples with respect to CmpList
*
@@ -225,7 +179,7 @@ dumptuples(FILE *file)
* Assumptions:
*/
int
tuplecmp(HeapTuple ltup, HeapTuple rtup)
tuplecmp(HeapTuple ltup, HeapTuple rtup, LeftistContext context)
{
register char *lattr, *rattr;
int nkey = 0;
@@ -238,24 +192,27 @@ tuplecmp(HeapTuple ltup, HeapTuple rtup)
return(0);
if (rtup == (HeapTuple)NULL)
return(1);
while (nkey < Nkeys && !result) {
while (nkey < context->nKeys && !result) {
lattr = heap_getattr(ltup, InvalidBuffer,
Key[nkey].sk_attno,
RelationGetTupleDescriptor(SortRdesc),
&isnull);
context->scanKeys[nkey].sk_attno,
context->tupDesc, &isnull);
if (isnull)
return(0);
rattr = heap_getattr(rtup, InvalidBuffer,
Key[nkey].sk_attno,
RelationGetTupleDescriptor(SortRdesc),
context->scanKeys[nkey].sk_attno,
context->tupDesc,
&isnull);
if (isnull)
return(1);
if (Key[nkey].sk_flags & SK_COMMUTE) {
if (!(result = (long) (*Key[nkey].sk_func) (rattr, lattr)))
result = -(long) (*Key[nkey].sk_func) (lattr, rattr);
} else if (!(result = (long) (*Key[nkey].sk_func) (lattr, rattr)))
result = -(long) (*Key[nkey].sk_func) (rattr, lattr);
if (context->scanKeys[nkey].sk_flags & SK_COMMUTE) {
if (!(result =
(long) (*context->scanKeys[nkey].sk_func) (rattr, lattr)))
result =
-(long) (*context->scanKeys[nkey].sk_func) (lattr, rattr);
} else if (!(result =
(long) (*context->scanKeys[nkey].sk_func) (lattr, rattr)))
result =
-(long) (*context->scanKeys[nkey].sk_func) (rattr, lattr);
nkey++;
}
return (result == 1);
@@ -263,7 +220,7 @@ tuplecmp(HeapTuple ltup, HeapTuple rtup)
#ifdef EBUG
void
checktree(struct leftist *tree)
checktree(struct leftist *tree, LeftistContext context)
{
int lnodes;
int rnodes;
@@ -272,8 +229,8 @@ checktree(struct leftist *tree)
puts("Null tree.");
return;
}
lnodes = checktreer(tree->lt_left, 1);
rnodes = checktreer(tree->lt_right, 1);
lnodes = checktreer(tree->lt_left, 1, context);
rnodes = checktreer(tree->lt_right, 1, context);
if (lnodes < 0) {
lnodes = -lnodes;
puts("0:\tBad left side.");
@@ -297,24 +254,24 @@ checktree(struct leftist *tree)
} else if (tree->lt_dist != 1+ tree->lt_right->lt_dist)
puts("0:\tDistance incorrect.");
if (lnodes > 0)
if (tuplecmp(tree->lt_left->lt_tuple, tree->lt_tuple))
if (tuplecmp(tree->lt_left->lt_tuple, tree->lt_tuple, context))
printf("%d:\tLeft child < parent.\n");
if (rnodes > 0)
if (tuplecmp(tree->lt_right->lt_tuple, tree->lt_tuple))
if (tuplecmp(tree->lt_right->lt_tuple, tree->lt_tuple, context))
printf("%d:\tRight child < parent.\n");
printf("Tree has %d nodes\n", 1 + lnodes + rnodes);
}
int
checktreer(struct leftist *tree, int level)
checktreer(struct leftist *tree, int level, LeftistContext context)
{
int lnodes, rnodes;
int error = 0;
if (tree == NULL)
return(0);
lnodes = checktreer(tree->lt_left, level + 1);
rnodes = checktreer(tree->lt_right, level + 1);
lnodes = checktreer(tree->lt_left, level + 1, context);
rnodes = checktreer(tree->lt_right, level + 1, context);
if (lnodes < 0) {
error = 1;
lnodes = -lnodes;
@@ -349,12 +306,12 @@ checktreer(struct leftist *tree, int level)
printf("%d:\tDistance incorrect.\n", level);
}
if (lnodes > 0)
if (tuplecmp(tree->lt_left->lt_tuple, tree->lt_tuple)) {
if (tuplecmp(tree->lt_left->lt_tuple, tree->lt_tuple, context)) {
error = 1;
printf("%d:\tLeft child < parent.\n");
}
if (rnodes > 0)
if (tuplecmp(tree->lt_right->lt_tuple, tree->lt_tuple)) {
if (tuplecmp(tree->lt_right->lt_tuple, tree->lt_tuple, context)) {
error = 1;
printf("%d:\tRight child < parent.\n");
}

View File

@@ -7,11 +7,25 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/utils/sort/Attic/psort.c,v 1.5 1997/07/24 20:18:07 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/utils/sort/Attic/psort.c,v 1.6 1997/08/06 03:41:55 momjian Exp $
*
* NOTES
* Sorts the first relation into the second relation. The sort may
* not be called twice simultaneously.
* Sorts the first relation into the second relation.
*
* The old psort.c's routines formed a temporary relation from the merged
* sort files. This version keeps the files around instead of generating the
* relation from them, and provides interface functions to the file so that
* you can grab tuples, mark a position in the file, and restore a position in
* the file. You must now explicitly call an interface function to end the sort,
* psort_end, when you are done.
* Now most of the global variables are stuck in the Sort nodes, and
* accessed from there (they are passed to all the psort routines) so that
* each sort running has its own separate state. This is facilitated by having
* the Sort nodes passed in to all the interface functions.
* The one global variable that all the sorts still share is SortMemory.
* You should now be allowed to run two or more psorts concurrently,
* so long as the memory they eat up is not greater than SORTMEM, the initial
* value of SortMemory. -Rex 2.15.1995
*
* Use the tape-splitting method (Knuth, Vol. III, pp281-86) in the future.
*
@@ -21,7 +35,6 @@
*/
#include <stdio.h>
#include <math.h>
#include <unistd.h>
#include "postgres.h"
@@ -35,120 +48,150 @@
#include "storage/buf.h"
#include "storage/bufmgr.h" /* for BLCKSZ */
#include "utils/portal.h" /* for {Start,End}PortalAllocMode */
#include "utils/elog.h"
#include "utils/rel.h"
#include "utils/psort.h"
#include "nodes/execnodes.h"
#include "nodes/plannodes.h"
#include "executor/executor.h"
#include "utils/lselect.h"
#include "utils/psort.h"
#include "storage/fd.h"
#ifndef HAVE_MEMMOVE
# include <regex/utils.h>
#else
# include <string.h>
#endif
#define TEMPDIR "./"
int Nkeys;
ScanKey Key;
int SortMemory;
static int TapeRange; /* number of tapes - 1 (T) */
static int Level; /* (l) */
static int TotalDummy; /* summation of tp_dummy */
static struct tape Tape[MAXTAPES];
static long shortzero = 0; /* used to delimit runs */
static struct tuple *LastTuple = NULL; /* last output */
static int BytesRead; /* to keep track of # of IO */
static int BytesWritten;
Relation SortRdesc; /* current tuples in memory */
struct leftist *Tuples; /* current tuples in memory */
extern int SortMem; /* defined as postgres option */
static long shortzero = 0; /* used to delimit runs */
/*
* psort - polyphase merge sort entry point
* old psort global variables
*
* (These are the global variables from the old psort. They are still used,
* but are now accessed from Sort nodes using the PS macro. Note that while
* these variables will be accessed by PS(node)->whatever, they will still
* be called by their original names within the comments! -Rex 2.10.1995)
*
* LeftistContextData treeContext;
*
* static int TapeRange; // number of tapes - 1 (T) //
* static int Level; // (l) //
* static int TotalDummy; // summation of tp_dummy //
* static struct tape *Tape;
*
* static int BytesRead; // to keep track of # of IO //
* static int BytesWritten;
*
* struct leftist *Tuples; // current tuples in memory //
*
* FILE *psort_grab_file; // this holds tuples grabbed
* from merged sort runs //
* long psort_current; // current file position //
* long psort_saved; // file position saved for
* mark and restore //
*/
void
psort(Relation oldrel, Relation newrel, int nkeys, ScanKey key)
/*
 * PS - Macro to access and cast psortstate from a Sort node
 *
 * The argument is parenthesized so that any pointer expression, not just
 * a plain identifier, may be passed (e.g. PS(nodes + 1)).
 */
#define PS(N) ((Psortstate *)(N)->psortstate)
/*
* psort_begin - polyphase merge sort entry point. Sorts the subplan
* into a temporary file psort_grab_file. After
* this is called, calling the interface function
* psort_grabtuple iteratively will get you the sorted
* tuples. psort_end then finishes the sort off, after
* all the tuples have been grabbed.
*
* Allocates and initializes sort node's psort state.
*/
bool
psort_begin(Sort *node, int nkeys, ScanKey key)
{
bool empty; /* to answer: is child node empty? */
node->psortstate = (struct Psortstate *)palloc(sizeof(struct Psortstate));
if (node->psortstate == NULL)
return false;
AssertArg(nkeys >= 1);
AssertArg(key[0].sk_attno != 0);
AssertArg(key[0].sk_procedure != 0);
Nkeys = nkeys;
Key = key;
SortMemory = 0;
SortRdesc = oldrel;
BytesRead = 0;
BytesWritten = 0;
/*
* may not be the best place.
*
* Pass 0 for the "limit" as the argument is currently ignored.
* Previously, only one arg was passed. -mer 12 Nov. 1991
*/
StartPortalAllocMode(StaticAllocMode, (Size)0);
initpsort();
initialrun(oldrel);
/* call finalrun(newrel, mergerun()) instead */
endpsort(newrel, mergeruns());
EndPortalAllocMode();
NDirectFileRead += (int)ceil((double)BytesRead / BLCKSZ);
NDirectFileWrite += (int)ceil((double)BytesWritten / BLCKSZ);
PS(node)->BytesRead = 0;
PS(node)->BytesWritten = 0;
PS(node)->treeContext.tupDesc =
ExecGetTupType(outerPlan((Plan *)node));
PS(node)->treeContext.nKeys = nkeys;
PS(node)->treeContext.scanKeys = key;
PS(node)->treeContext.sortMem = SortMem;
PS(node)->Tuples = NULL;
PS(node)->tupcount = 0;
PS(node)->using_tape_files = false;
PS(node)->memtuples = NULL;
initialrun(node, &empty);
if (empty)
return false;
if (PS(node)->using_tape_files)
PS(node)->psort_grab_file = mergeruns(node);
PS(node)->psort_current = 0;
PS(node)->psort_saved = 0;
return true;
}
/*
* TAPENO - number of tape in Tape
*/
#define TAPENO(NODE) (NODE - Tape)
#define TUPLENO(TUP) ((TUP == NULL) ? -1 : (int) TUP->t_iid)
/*
* initpsort - initializes the tapes
* inittapes - initializes the tapes
* - (polyphase merge Alg.D(D1)--Knuth, Vol.3, p.270)
* Returns:
* number of allocated tapes
*/
void
initpsort()
inittapes(Sort *node)
{
register int i;
register struct tape *tp;
Assert(node != (Sort *) NULL);
Assert(PS(node) != (Psortstate *) NULL);
/*
ASSERT(ntapes >= 3 && ntapes <= MAXTAPES,
"initpsort: Invalid number of tapes to initialize.\n");
"inittapes: Invalid number of tapes to initialize.\n");
*/
tp = Tape;
tp = PS(node)->Tape;
for (i = 0; i < MAXTAPES && (tp->tp_file = gettape()) != NULL; i++) {
tp->tp_dummy = 1;
tp->tp_fib = 1;
tp->tp_prev = tp - 1;
tp++;
}
TapeRange = --tp - Tape;
PS(node)->TapeRange = --tp - PS(node)->Tape;
tp->tp_dummy = 0;
tp->tp_fib = 0;
Tape[0].tp_prev = tp;
PS(node)->Tape[0].tp_prev = tp;
if (TapeRange <= 1)
elog(WARN, "initpsort: Could only allocate %d < 3 tapes\n",
TapeRange + 1);
if (PS(node)->TapeRange <= 1)
elog(WARN, "inittapes: Could only allocate %d < 3 tapes\n",
PS(node)->TapeRange + 1);
Level = 1;
TotalDummy = TapeRange;
SortMemory = SORTMEM;
LastTuple = NULL;
Tuples = NULL;
PS(node)->Level = 1;
PS(node)->TotalDummy = PS(node)->TapeRange;
PS(node)->using_tape_files = true;
}
/*
* resetpsort - resets (frees) malloc'd memory for an aborted Xaction
* resetpsort - resets (pfrees) palloc'd memory for an aborted Xaction
*
* Not implemented yet.
*/
@@ -170,16 +213,18 @@ resetpsort()
* LEN field must be a short; FP is a stream
*/
#define PUTTUP(TUP, FP)\
BytesWritten += (TUP)->t_len; \
fwrite((char *)TUP, (TUP)->t_len, 1, FP)
#define PUTTUP(NODE, TUP, FP) if (1) {\
((Psortstate *)NODE->psortstate)->BytesWritten += (TUP)->t_len; \
fwrite((char *)TUP, (TUP)->t_len, 1, FP);} else
#define ENDRUN(FP) fwrite((char *)&shortzero, sizeof (shortzero), 1, FP)
#define GETLEN(LEN, FP) fread((char *)&(LEN), sizeof (shortzero), 1, FP)
#define ALLOCTUP(LEN) ((HeapTuple)palloc((unsigned)LEN))
#define GETTUP(TUP, LEN, FP)\
#define GETTUP(NODE, TUP, LEN, FP) if (1) {\
IncrProcessed(); \
BytesRead += (LEN) - sizeof (shortzero); \
fread((char *)(TUP) + sizeof (shortzero), (LEN) - sizeof (shortzero), 1, FP)
((Psortstate *)NODE->psortstate)->BytesRead += (LEN) - sizeof (shortzero); \
fread((char *)(TUP) + sizeof (shortzero), (LEN) - sizeof (shortzero), 1, FP);} \
else
#define SETTUPLEN(TUP, LEN) (TUP)->t_len = LEN
/*
@@ -188,9 +233,9 @@ resetpsort()
* LACKMEM - true iff remaining sort memory is at or below the block-size threshold (not accurate)
*/
#define USEMEM(AMT) SortMemory -= (AMT)
#define FREEMEM(AMT) SortMemory += (AMT)
#define LACKMEM() (SortMemory <= BLCKSZ) /* not accurate */
/*
 * These macros operate on the sort state of the node passed as NODE
 * (they previously ignored the parameter and relied on the caller having
 * a variable literally named "node").
 */
#define USEMEM(NODE,AMT) (PS(NODE)->treeContext.sortMem -= (AMT))
#define FREEMEM(NODE,AMT) (PS(NODE)->treeContext.sortMem += (AMT))
#define LACKMEM(NODE) (PS(NODE)->treeContext.sortMem <= MAXBLCKSZ) /* not accurate */
#define TRACEMEM(FUNC)
#define TRACEOUT(FUNC, TUP)
@@ -219,61 +264,66 @@ resetpsort()
* Also, perhaps allocate tapes when needed. Split into 2 funcs.
*/
void
initialrun(Relation rdesc)
initialrun(Sort *node, bool *empty)
{
/* register struct tuple *tup; */
register struct tape *tp;
HeapScanDesc sdesc;
int baseruns; /* D:(a) */
int morepasses; /* EOF */
int extrapasses; /* EOF */
Assert(node != (Sort *) NULL);
Assert(PS(node) != (Psortstate *) NULL);
tp = PS(node)->Tape;
sdesc = heap_beginscan(rdesc, 0, NowTimeQual, 0,
(ScanKey)NULL);
tp = Tape;
if ((bool)createrun(sdesc, tp->tp_file) != false)
morepasses = 0;
if ((bool)createrun(node, tp->tp_file, empty) != false) {
if (! PS(node)->using_tape_files)
inittapes(node);
extrapasses = 0;
}
else
morepasses = 1 + (Tuples != NULL); /* (T != N) ? 2 : 1 */
return; /* if rows fit in memory, we never access tape stuff */
for ( ; ; ) {
tp->tp_dummy--;
TotalDummy--;
PS(node)->TotalDummy--;
if (tp->tp_dummy < (tp + 1)->tp_dummy)
tp++;
else if (tp->tp_dummy != 0)
tp = Tape;
tp = PS(node)->Tape;
else {
Level++;
baseruns = Tape[0].tp_fib;
for (tp = Tape; tp - Tape < TapeRange; tp++) {
TotalDummy +=
PS(node)->Level++;
baseruns = PS(node)->Tape[0].tp_fib;
for (tp = PS(node)->Tape;
tp - PS(node)->Tape < PS(node)->TapeRange; tp++) {
PS(node)->TotalDummy +=
(tp->tp_dummy = baseruns
+ (tp + 1)->tp_fib
- tp->tp_fib);
tp->tp_fib = baseruns
+ (tp + 1)->tp_fib;
}
tp = Tape; /* D4 */
tp = PS(node)->Tape; /* D4 */
} /* D3 */
if (morepasses)
if (--morepasses) {
dumptuples(tp->tp_file);
if (extrapasses)
if (--extrapasses) {
dumptuples(node);
ENDRUN(tp->tp_file);
continue;
} else
break;
if ((bool)createrun(sdesc, tp->tp_file) == false)
morepasses = 1 + (Tuples != NULL);
if ((bool)createrun(node, tp->tp_file, empty) == false)
extrapasses = 1 + (PS(node)->Tuples != NULL);
/* D2 */
}
for (tp = Tape + TapeRange; tp >= Tape; tp--)
rewind(tp->tp_file); /* D. */
heap_endscan(sdesc);
for (tp = PS(node)->Tape + PS(node)->TapeRange; tp >= PS(node)->Tape; tp--)
rewind(tp->tp_file); /* D. */
}
/*
* createrun - places the next run on file
* createrun - places the next run on file, grabbing the tuples by
* executing the subplan passed in
*
* Uses:
* Tuples, which should contain any tuples for this run
@@ -283,7 +333,7 @@ initialrun(Relation rdesc)
* Tuples contains the tuples for the following run upon exit
*/
bool
createrun(HeapScanDesc sdesc, FILE *file)
createrun(Sort *node, FILE *file, bool *empty)
{
register HeapTuple lasttuple;
register HeapTuple btup, tup;
@@ -291,47 +341,74 @@ createrun(HeapScanDesc sdesc, FILE *file)
Buffer b;
bool foundeor;
short junk;
int cr_tuples = 0; /* Count tuples grabbed from plannode */
TupleTableSlot *cr_slot;
Assert(node != (Sort *) NULL);
Assert(PS(node) != (Psortstate *) NULL);
lasttuple = NULL;
nextrun = NULL;
foundeor = false;
for ( ; ; ) {
while (LACKMEM() && Tuples != NULL) {
while (LACKMEM(node) && PS(node)->Tuples != NULL) {
if (lasttuple != NULL) {
FREEMEM(lasttuple->t_len);
FREEMEM(node,lasttuple->t_len);
FREE(lasttuple);
TRACEMEM(createrun);
}
lasttuple = tup = gettuple(&Tuples, &junk);
PUTTUP(tup, file);
lasttuple = tup = gettuple(&PS(node)->Tuples, &junk,
&PS(node)->treeContext);
if (! PS(node)->using_tape_files)
inittapes(node);
PUTTUP(node, tup, PS(node)->Tape->tp_file);
TRACEOUT(createrun, tup);
}
if (LACKMEM())
if (LACKMEM(node))
break;
btup = heap_getnext(sdesc, 0, &b);
if (!HeapTupleIsValid(btup)) {
/* About to call ExecProcNode, it can mess up the state if it
* eventually calls another Sort node. So must stow it away here for
* the meantime. -Rex 2.2.1995
*/
cr_slot = ExecProcNode(outerPlan((Plan *)node), (Plan *)node);
if (TupIsNull(cr_slot)) {
foundeor = true;
break;
}
else {
tup = tuplecopy(cr_slot->val);
ExecClearTuple(cr_slot);
PS(node)->tupcount++;
cr_tuples++;
}
IncrProcessed();
tup = tuplecopy(btup, sdesc->rs_rd, b);
USEMEM(tup->t_len);
USEMEM(node,tup->t_len);
TRACEMEM(createrun);
if (lasttuple != NULL && tuplecmp(tup, lasttuple))
puttuple(&nextrun, tup, 0);
if (lasttuple != NULL && tuplecmp(tup, lasttuple,
&PS(node)->treeContext))
puttuple(&nextrun, tup, 0, &PS(node)->treeContext);
else
puttuple(&Tuples, tup, 0);
ReleaseBuffer(b);
puttuple(&PS(node)->Tuples, tup, 0, &PS(node)->treeContext);
}
if (lasttuple != NULL) {
FREEMEM(lasttuple->t_len);
FREEMEM(node,lasttuple->t_len);
FREE(lasttuple);
TRACEMEM(createrun);
}
dumptuples(file);
ENDRUN(file);
dumptuples(node);
if (PS(node)->using_tape_files)
ENDRUN(file);
/* delimit the end of the run */
Tuples = nextrun;
PS(node)->Tuples = nextrun;
/* if we did not see any tuples, mark empty */
*empty = (cr_tuples > 0) ? false : true;
return((bool)! foundeor); /* XXX - works iff bool is {0,1} */
}
@@ -339,10 +416,10 @@ createrun(HeapScanDesc sdesc, FILE *file)
* tuplecopy - see also tuple.c:palloctup()
*
* This should eventually go there under that name? And this will
* then use malloc directly (see version -r1.2).
* then use palloc directly (see version -r1.2).
*/
HeapTuple
tuplecopy(HeapTuple tup, Relation rdesc, Buffer b)
tuplecopy(HeapTuple tup)
{
HeapTuple rettup;
@@ -362,18 +439,22 @@ tuplecopy(HeapTuple tup, Relation rdesc, Buffer b)
* file of tuples in order
*/
FILE *
mergeruns()
mergeruns(Sort *node)
{
register struct tape *tp;
tp = Tape + TapeRange;
merge(tp);
register struct tape *tp;
Assert(node != (Sort *) NULL);
Assert(PS(node) != (Psortstate *) NULL);
Assert(PS(node)->using_tape_files == true);
tp = PS(node)->Tape + PS(node)->TapeRange;
merge(node, tp);
rewind(tp->tp_file);
while (--Level != 0) {
while (--PS(node)->Level != 0) {
tp = tp->tp_prev;
rewind(tp->tp_file);
/* resettape(tp->tp_file); -not sufficient */
merge(tp);
/* resettape(tp->tp_file); -not sufficient */
merge(node, tp);
rewind(tp->tp_file);
}
return(tp->tp_file);
@@ -384,7 +465,7 @@ mergeruns()
* (polyphase merge Alg.D(D5)--Knuth, Vol.3, p271)
*/
void
merge(struct tape *dest)
merge(Sort *node, struct tape *dest)
{
register HeapTuple tup;
register struct tape *lasttp; /* (TAPE[P]) */
@@ -396,6 +477,10 @@ merge(struct tape *dest)
short fromtape;
long tuplen;
Assert(node != (Sort *) NULL);
Assert(PS(node) != (Psortstate *) NULL);
Assert(PS(node)->using_tape_files == true);
lasttp = dest->tp_prev;
times = lasttp->tp_fib;
for (tp = lasttp ; tp != dest; tp = tp->tp_prev)
@@ -403,95 +488,217 @@ merge(struct tape *dest)
tp->tp_fib += times;
/* Tape[].tp_fib (A[]) is set to proper exit values */
if (TotalDummy < TapeRange) /* no complete dummy runs */
if (PS(node)->TotalDummy < PS(node)->TapeRange)/* no complete dummy runs */
outdummy = 0;
else {
outdummy = TotalDummy; /* a large positive number */
outdummy = PS(node)->TotalDummy; /* a large positive number */
for (tp = lasttp; tp != dest; tp = tp->tp_prev)
if (outdummy > tp->tp_dummy)
outdummy = tp->tp_dummy;
for (tp = lasttp; tp != dest; tp = tp->tp_prev)
tp->tp_dummy -= outdummy;
tp->tp_dummy += outdummy;
TotalDummy -= outdummy * TapeRange;
PS(node)->TotalDummy -= outdummy * PS(node)->TapeRange;
/* do not add the outdummy runs yet */
times -= outdummy;
}
destfile = dest->tp_file;
while (times-- != 0) { /* merge one run */
while (times-- != 0) { /* merge one run */
tuples = NULL;
if (TotalDummy == 0)
if (PS(node)->TotalDummy == 0)
for (tp = dest->tp_prev; tp != dest; tp = tp->tp_prev) {
GETLEN(tuplen, tp->tp_file);
tup = ALLOCTUP(tuplen);
USEMEM(tuplen);
USEMEM(node,tuplen);
TRACEMEM(merge);
SETTUPLEN(tup, tuplen);
GETTUP(tup, tuplen, tp->tp_file);
puttuple(&tuples, tup, TAPENO(tp));
GETTUP(node, tup, tuplen, tp->tp_file);
puttuple(&tuples, tup, tp - PS(node)->Tape,
&PS(node)->treeContext);
}
else {
for (tp = dest->tp_prev; tp != dest; tp = tp->tp_prev) {
if (tp->tp_dummy != 0) {
tp->tp_dummy--;
TotalDummy--;
PS(node)->TotalDummy--;
} else {
GETLEN(tuplen, tp->tp_file);
tup = ALLOCTUP(tuplen);
USEMEM(tuplen);
USEMEM(node,tuplen);
TRACEMEM(merge);
SETTUPLEN(tup, tuplen);
GETTUP(tup, tuplen, tp->tp_file);
puttuple(&tuples, tup, TAPENO(tp));
GETTUP(node, tup, tuplen, tp->tp_file);
puttuple(&tuples, tup, tp - PS(node)->Tape,
&PS(node)->treeContext);
}
}
}
while (tuples != NULL) {
/* possible optimization by using count in tuples */
tup = gettuple(&tuples, &fromtape);
PUTTUP(tup, destfile);
FREEMEM(tup->t_len);
tup = gettuple(&tuples, &fromtape, &PS(node)->treeContext);
PUTTUP(node, tup, destfile);
FREEMEM(node,tup->t_len);
FREE(tup);
TRACEMEM(merge);
GETLEN(tuplen, Tape[fromtape].tp_file);
GETLEN(tuplen, PS(node)->Tape[fromtape].tp_file);
if (tuplen == 0)
;
else {
tup = ALLOCTUP(tuplen);
USEMEM(tuplen);
USEMEM(node,tuplen);
TRACEMEM(merge);
SETTUPLEN(tup, tuplen);
GETTUP(tup, tuplen, Tape[fromtape].tp_file);
puttuple(&tuples, tup, fromtape);
GETTUP(node, tup, tuplen, PS(node)->Tape[fromtape].tp_file);
puttuple(&tuples, tup, fromtape, &PS(node)->treeContext);
}
}
}
ENDRUN(destfile);
}
TotalDummy += outdummy;
PS(node)->TotalDummy += outdummy;
}
/*
* endpsort - creates the new relation and unlinks the tape files
* dumptuples - stores all the tuples in tree into file
*/
void
endpsort(Relation rdesc, FILE *file)
dumptuples(Sort *node)
{
register struct tape *tp;
register HeapTuple tup;
long tuplen;
register struct leftist *tp;
register struct leftist *newp;
struct leftist **treep = &PS(node)->Tuples;
LeftistContext context = &PS(node)->treeContext;
HeapTuple tup;
int memtupindex = 0;
if (! PS(node)->using_tape_files) {
Assert(PS(node)->memtuples == NULL);
PS(node)->memtuples = palloc(PS(node)->tupcount * sizeof(HeapTuple));
}
if (! feof(file))
while (GETLEN(tuplen, file) && tuplen != 0) {
tup = ALLOCTUP(tuplen);
SortMemory += tuplen;
SETTUPLEN(tup, tuplen);
GETTUP(tup, tuplen, file);
heap_insert(rdesc, tup);
tp = *treep;
while (tp != NULL) {
tup = tp->lt_tuple;
if (tp->lt_dist == 1) /* lt_right == NULL */
newp = tp->lt_left;
else
newp = lmerge(tp->lt_left, tp->lt_right, context);
FREEMEM(node,sizeof (struct leftist));
FREE(tp);
if (PS(node)->using_tape_files) {
PUTTUP(node, tup, PS(node)->Tape->tp_file);
FREEMEM(node,tup->t_len);
FREE(tup);
SortMemory -= tuplen;
}
for (tp = Tape + TapeRange; tp >= Tape; tp--)
destroytape(tp->tp_file);
else
PS(node)->memtuples[memtupindex++] = tup;
tp = newp;
}
*treep = NULL;
}
/*
 *	psort_grabtuple - fetch the next tuple of the sorted result.
 *
 *	When the sort used tape files, the tuple is read from
 *	psort_grab_file into freshly palloc'd space and psort_current is
 *	advanced by the tuple's length.  Otherwise the next entry of the
 *	memtuples array is handed back, with psort_current serving as the
 *	array index.
 *
 *	Returns NULL when there are no tuples left.  psort_end should not
 *	be called until this has returned NULL.
 */
HeapTuple
psort_grabtuple(Sort *node)
{
	register HeapTuple next;
	long		len;

	Assert(node != (Sort *) NULL);
	Assert(PS(node) != (Psortstate *) NULL);

	if (PS(node)->using_tape_files != true) {
		/* no tape files: psort_current indexes the memtuples array */
		if (PS(node)->psort_current >= PS(node)->tupcount)
			return NULL;
		return PS(node)->memtuples[PS(node)->psort_current++];
	}

	if (feof(PS(node)->psort_grab_file))
		return NULL;

	/* a failed length read, or a zero length, means nothing more to return */
	if (!GETLEN(len, PS(node)->psort_grab_file) || len == 0)
		return NULL;

	next = (HeapTuple) palloc((unsigned) len);
	SETTUPLEN(next, len);
	GETTUP(node, next, len, PS(node)->psort_grab_file);

	/* Update current merged sort file position */
	PS(node)->psort_current += len;

	return next;
}
/*
 *	psort_markpos - remember the current position of the sort output
 *			(copies psort_current into psort_saved) so that
 *			psort_restorepos can return to it later
 */
void
psort_markpos(Sort *node)
{
	Psortstate *state;

	Assert(node != (Sort *) NULL);
	Assert(PS(node) != (Psortstate *) NULL);

	state = PS(node);
	state->psort_saved = state->psort_current;
}
/*
 *	psort_restorepos - restores current position in merged sort file to
 *			   last saved position
 *
 *	When tape files are in use the grab file is repositioned with fseek;
 *	in either case psort_current is reset to psort_saved.
 */
void
psort_restorepos(Sort *node)
{
	Psortstate *state;

	Assert(node != (Sort *) NULL);
	Assert(PS(node) != (Psortstate *) NULL);

	state = PS(node);

	if (state->using_tape_files == true)
		fseek(state->psort_grab_file, state->psort_saved, SEEK_SET);

	state->psort_current = state->psort_saved;
}
/*
 *	psort_end - unlinks the tape files, and cleans up.  Should not be
 *		    called unless psort_grabtuple has returned a NULL.
 *
 *	Does nothing if the node is already marked cleaned or has no
 *	psortstate.  Otherwise it destroys the tape files (or frees the
 *	memtuples array), adds this sort's byte counts to NDirectFileRead
 *	and NDirectFileWrite in units of BLCKSZ, frees the psort state and
 *	marks the node cleaned.
 */
void
psort_end(Sort *node)
{
	register struct tape *tp;

	/* check the pointer before the first dereference, not after it */
	Assert(node != (Sort *) NULL);

	if (node->cleaned)
		return;

	/*
	 * psortstate is not asserted non-NULL here: per the original author's
	 * note, it is NULL if we are sorting a relation with no tuples.
	 */
	if (PS(node) == (Psortstate *) NULL)
		return;

	if (PS(node)->using_tape_files == true) {
		for (tp = PS(node)->Tape + PS(node)->TapeRange; tp >= PS(node)->Tape; tp--)
			destroytape(tp->tp_file);
	}
	else if (PS(node)->memtuples)
		pfree(PS(node)->memtuples);

	NDirectFileRead +=
		(int)ceil((double)PS(node)->BytesRead / BLCKSZ);
	NDirectFileWrite +=
		(int)ceil((double)PS(node)->BytesWritten / BLCKSZ);

	pfree((void *)node->psortstate);
	/* do not leave a dangling pointer to the freed state behind */
	node->psortstate = NULL;

	node->cleaned = TRUE;
}
/*
@@ -522,26 +729,34 @@ static char Tempfile[MAXPGPATH] = TEMPDIR;
FILE *
gettape()
{
register struct tapelst *tp;
FILE *file;
static int tapeinit = 0;
register struct tapelst *tp;
FILE *file;
static int tapeinit = 0;
char *mktemp();
static unsigned int uniqueFileId = 0;
extern int errno;
char uniqueName[MAXPGPATH];
tp = (struct tapelst *)palloc((unsigned)sizeof (struct tapelst));
if (!tapeinit) {
Tempfile[sizeof (TEMPDIR) - 1] = '/';
memmove(Tempfile + sizeof(TEMPDIR), TAPEEXT, sizeof (TAPEEXT));
tapeinit = 1;
}
tp->tl_name = palloc((unsigned)sizeof(Tempfile));
sprintf(uniqueName, "%spg_psort.%d.%d", TEMPDIR, getpid(), uniqueFileId);
uniqueFileId++;
tapeinit = 1;
tp->tl_name = palloc((unsigned)sizeof(uniqueName));
/*
* now, copy template with final null into malloc'd space
* now, copy template with final null into palloc'd space
*/
memmove(tp->tl_name, Tempfile, sizeof (TEMPDIR) + sizeof (TAPEEXT));
mktemp(tp->tl_name);
memmove(tp->tl_name, uniqueName, strlen(uniqueName));
AllocateFile();
file = fopen(tp->tl_name, "w+");
if (file == NULL) {
elog(NOTICE, "psort: gettape: fopen returned error code %i", errno);
/* XXX this should not happen */
FreeFile();
FREE(tp->tl_name);