1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-09 06:21:09 +03:00

Teach btree to handle ScalarArrayOpExpr quals natively.

This allows "indexedcol op ANY(ARRAY[...])" conditions to be used in plain
indexscans, and particularly in index-only scans.
This commit is contained in:
Tom Lane
2011-10-16 15:39:24 -04:00
parent 0898d71f66
commit 9e8da0f757
13 changed files with 726 additions and 129 deletions

View File

@@ -276,39 +276,63 @@ btgettuple(PG_FUNCTION_ARGS)
scan->xs_recheck = false;
/*
* If we've already initialized this scan, we can just advance it in the
* appropriate direction. If we haven't done so yet, we call a routine to
* get the first item in the scan.
* If we have any array keys, initialize them during first call for a
* scan. We can't do this in btrescan because we don't know the scan
* direction at that time.
*/
if (BTScanPosIsValid(so->currPos))
if (so->numArrayKeys && !BTScanPosIsValid(so->currPos))
{
/* punt if we have any unsatisfiable array keys */
if (so->numArrayKeys < 0)
PG_RETURN_BOOL(false);
_bt_start_array_keys(scan, dir);
}
/* This loop handles advancing to the next array elements, if any */
do
{
/*
* Check to see if we should kill the previously-fetched tuple.
* If we've already initialized this scan, we can just advance it in
* the appropriate direction. If we haven't done so yet, we call
* _bt_first() to get the first item in the scan.
*/
if (scan->kill_prior_tuple)
if (!BTScanPosIsValid(so->currPos))
res = _bt_first(scan, dir);
else
{
/*
* Yes, remember it for later. (We'll deal with all such tuples
* at once right before leaving the index page.) The test for
* numKilled overrun is not just paranoia: if the caller reverses
* direction in the indexscan then the same item might get entered
* multiple times. It's not worth trying to optimize that, so we
* don't detect it, but instead just forget any excess entries.
* Check to see if we should kill the previously-fetched tuple.
*/
if (so->killedItems == NULL)
so->killedItems = (int *)
palloc(MaxIndexTuplesPerPage * sizeof(int));
if (so->numKilled < MaxIndexTuplesPerPage)
so->killedItems[so->numKilled++] = so->currPos.itemIndex;
if (scan->kill_prior_tuple)
{
/*
* Yes, remember it for later. (We'll deal with all such
* tuples at once right before leaving the index page.) The
* test for numKilled overrun is not just paranoia: if the
* caller reverses direction in the indexscan then the same
* item might get entered multiple times. It's not worth
* trying to optimize that, so we don't detect it, but instead
* just forget any excess entries.
*/
if (so->killedItems == NULL)
so->killedItems = (int *)
palloc(MaxIndexTuplesPerPage * sizeof(int));
if (so->numKilled < MaxIndexTuplesPerPage)
so->killedItems[so->numKilled++] = so->currPos.itemIndex;
}
/*
* Now continue the scan.
*/
res = _bt_next(scan, dir);
}
/*
* Now continue the scan.
*/
res = _bt_next(scan, dir);
}
else
res = _bt_first(scan, dir);
/* If we have a tuple, return it ... */
if (res)
break;
/* ... otherwise see if we have more array keys to deal with */
} while (so->numArrayKeys && _bt_advance_array_keys(scan, dir));
PG_RETURN_BOOL(res);
}
@@ -325,35 +349,50 @@ btgetbitmap(PG_FUNCTION_ARGS)
int64 ntids = 0;
ItemPointer heapTid;
/* Fetch the first page & tuple. */
if (!_bt_first(scan, ForwardScanDirection))
/*
* If we have any array keys, initialize them.
*/
if (so->numArrayKeys)
{
/* empty scan */
PG_RETURN_INT64(0);
}
/* Save tuple ID, and continue scanning */
heapTid = &scan->xs_ctup.t_self;
tbm_add_tuples(tbm, heapTid, 1, false);
ntids++;
/* punt if we have any unsatisfiable array keys */
if (so->numArrayKeys < 0)
PG_RETURN_INT64(ntids);
for (;;)
_bt_start_array_keys(scan, ForwardScanDirection);
}
/* This loop handles advancing to the next array elements, if any */
do
{
/*
* Advance to next tuple within page. This is the same as the easy
* case in _bt_next().
*/
if (++so->currPos.itemIndex > so->currPos.lastItem)
/* Fetch the first page & tuple */
if (_bt_first(scan, ForwardScanDirection))
{
/* let _bt_next do the heavy lifting */
if (!_bt_next(scan, ForwardScanDirection))
break;
}
/* Save tuple ID, and continue scanning */
heapTid = &scan->xs_ctup.t_self;
tbm_add_tuples(tbm, heapTid, 1, false);
ntids++;
/* Save tuple ID, and continue scanning */
heapTid = &so->currPos.items[so->currPos.itemIndex].heapTid;
tbm_add_tuples(tbm, heapTid, 1, false);
ntids++;
}
for (;;)
{
/*
* Advance to next tuple within page. This is the same as the
* easy case in _bt_next().
*/
if (++so->currPos.itemIndex > so->currPos.lastItem)
{
/* let _bt_next do the heavy lifting */
if (!_bt_next(scan, ForwardScanDirection))
break;
}
/* Save tuple ID, and continue scanning */
heapTid = &so->currPos.items[so->currPos.itemIndex].heapTid;
tbm_add_tuples(tbm, heapTid, 1, false);
ntids++;
}
}
/* Now see if we have more array keys to deal with */
} while (so->numArrayKeys && _bt_advance_array_keys(scan, ForwardScanDirection));
PG_RETURN_INT64(ntids);
}
@@ -383,6 +422,12 @@ btbeginscan(PG_FUNCTION_ARGS)
so->keyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData));
else
so->keyData = NULL;
so->arrayKeyData = NULL; /* assume no array keys for now */
so->numArrayKeys = 0;
so->arrayKeys = NULL;
so->arrayContext = NULL;
so->killedItems = NULL; /* until needed */
so->numKilled = 0;
@@ -460,6 +505,9 @@ btrescan(PG_FUNCTION_ARGS)
scan->numberOfKeys * sizeof(ScanKeyData));
so->numberOfKeys = 0; /* until _bt_preprocess_keys sets it */
/* If any keys are SK_SEARCHARRAY type, set up array-key info */
_bt_preprocess_array_keys(scan);
PG_RETURN_VOID();
}
@@ -490,10 +538,13 @@ btendscan(PG_FUNCTION_ARGS)
so->markItemIndex = -1;
/* Release storage */
if (so->killedItems != NULL)
pfree(so->killedItems);
if (so->keyData != NULL)
pfree(so->keyData);
/* so->arrayKeyData and so->arrayKeys are in arrayContext */
if (so->arrayContext != NULL)
MemoryContextDelete(so->arrayContext);
if (so->killedItems != NULL)
pfree(so->killedItems);
if (so->currTuples != NULL)
pfree(so->currTuples);
/* so->markTuples should not be pfree'd, see btrescan */

View File

@@ -21,10 +21,26 @@
#include "access/reloptions.h"
#include "access/relscan.h"
#include "miscadmin.h"
#include "utils/array.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
typedef struct BTSortArrayContext
{
FmgrInfo flinfo;
Oid collation;
bool reverse;
} BTSortArrayContext;
static Datum _bt_find_extreme_element(IndexScanDesc scan, ScanKey skey,
StrategyNumber strat,
Datum *elems, int nelems);
static int _bt_sort_array_elements(IndexScanDesc scan, ScanKey skey,
bool reverse,
Datum *elems, int nelems);
static int _bt_compare_array_elements(const void *a, const void *b, void *arg);
static bool _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
ScanKey leftarg, ScanKey rightarg,
bool *result);
@@ -158,13 +174,433 @@ _bt_freestack(BTStack stack)
}
/*
* _bt_preprocess_array_keys() -- Preprocess SK_SEARCHARRAY scan keys
*
* If there are any SK_SEARCHARRAY scan keys, deconstruct the array(s) and
* set up BTArrayKeyInfo info for each one that is an equality-type key.
* Prepare modified scan keys in so->arrayKeyData, which will hold the current
* array elements during each primitive indexscan operation. For inequality
* array keys, it's sufficient to find the extreme element value and replace
* the whole array with that scalar value.
*
* Note: the reason we need so->arrayKeyData, rather than just scribbling
* on scan->keyData, is that callers are permitted to call btrescan without
* supplying a new set of scankey data.
*/
void
_bt_preprocess_array_keys(IndexScanDesc scan)
{
BTScanOpaque so = (BTScanOpaque) scan->opaque;
int numberOfKeys = scan->numberOfKeys;
int16 *indoption = scan->indexRelation->rd_indoption;
int numArrayKeys;
ScanKey cur;
int i;
MemoryContext oldContext;
/* Quick check to see if there are any array keys */
numArrayKeys = 0;
for (i = 0; i < numberOfKeys; i++)
{
cur = &scan->keyData[i];
if (cur->sk_flags & SK_SEARCHARRAY)
{
numArrayKeys++;
Assert(!(cur->sk_flags & (SK_ROW_HEADER | SK_SEARCHNULL | SK_SEARCHNOTNULL)));
/* If any arrays are null as a whole, we can quit right now. */
if (cur->sk_flags & SK_ISNULL)
{
so->numArrayKeys = -1;
so->arrayKeyData = NULL;
return;
}
}
}
/* Quit if nothing to do. */
if (numArrayKeys == 0)
{
so->numArrayKeys = 0;
so->arrayKeyData = NULL;
return;
}
/*
* Make a scan-lifespan context to hold array-associated data, or reset
* it if we already have one from a previous rescan cycle.
*/
if (so->arrayContext == NULL)
so->arrayContext = AllocSetContextCreate(CurrentMemoryContext,
"BTree Array Context",
ALLOCSET_SMALL_MINSIZE,
ALLOCSET_SMALL_INITSIZE,
ALLOCSET_SMALL_MAXSIZE);
else
MemoryContextReset(so->arrayContext);
oldContext = MemoryContextSwitchTo(so->arrayContext);
/* Create modifiable copy of scan->keyData in the workspace context */
so->arrayKeyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData));
memcpy(so->arrayKeyData,
scan->keyData,
scan->numberOfKeys * sizeof(ScanKeyData));
/* Allocate space for per-array data in the workspace context */
so->arrayKeys = (BTArrayKeyInfo *) palloc0(numArrayKeys * sizeof(BTArrayKeyInfo));
/* Now process each array key */
numArrayKeys = 0;
for (i = 0; i < numberOfKeys; i++)
{
ArrayType *arrayval;
int16 elmlen;
bool elmbyval;
char elmalign;
int num_elems;
Datum *elem_values;
bool *elem_nulls;
int num_nonnulls;
int j;
cur = &so->arrayKeyData[i];
if (!(cur->sk_flags & SK_SEARCHARRAY))
continue;
/*
* First, deconstruct the array into elements. Anything allocated
* here (including a possibly detoasted array value) is in the
* workspace context.
*/
arrayval = DatumGetArrayTypeP(cur->sk_argument);
/* We could cache this data, but not clear it's worth it */
get_typlenbyvalalign(ARR_ELEMTYPE(arrayval),
&elmlen, &elmbyval, &elmalign);
deconstruct_array(arrayval,
ARR_ELEMTYPE(arrayval),
elmlen, elmbyval, elmalign,
&elem_values, &elem_nulls, &num_elems);
/*
* Compress out any null elements. We can ignore them since we assume
* all btree operators are strict.
*/
num_nonnulls = 0;
for (j = 0; j < num_elems; j++)
{
if (!elem_nulls[j])
elem_values[num_nonnulls++] = elem_values[j];
}
/* We could pfree(elem_nulls) now, but not worth the cycles */
/* If there's no non-nulls, the scan qual is unsatisfiable */
if (num_nonnulls == 0)
{
numArrayKeys = -1;
break;
}
/*
* If the comparison operator is not equality, then the array qual
* degenerates to a simple comparison against the smallest or largest
* non-null array element, as appropriate.
*/
switch (cur->sk_strategy)
{
case BTLessStrategyNumber:
case BTLessEqualStrategyNumber:
cur->sk_argument =
_bt_find_extreme_element(scan, cur,
BTGreaterStrategyNumber,
elem_values, num_nonnulls);
continue;
case BTEqualStrategyNumber:
/* proceed with rest of loop */
break;
case BTGreaterEqualStrategyNumber:
case BTGreaterStrategyNumber:
cur->sk_argument =
_bt_find_extreme_element(scan, cur,
BTLessStrategyNumber,
elem_values, num_nonnulls);
continue;
default:
elog(ERROR, "unrecognized StrategyNumber: %d",
(int) cur->sk_strategy);
break;
}
/*
* Sort the non-null elements and eliminate any duplicates. We must
* sort in the same ordering used by the index column, so that the
* successive primitive indexscans produce data in index order.
*/
num_elems = _bt_sort_array_elements(scan, cur,
(indoption[cur->sk_attno - 1] & INDOPTION_DESC) != 0,
elem_values, num_nonnulls);
/*
* And set up the BTArrayKeyInfo data.
*/
so->arrayKeys[numArrayKeys].scan_key = i;
so->arrayKeys[numArrayKeys].num_elems = num_elems;
so->arrayKeys[numArrayKeys].elem_values = elem_values;
numArrayKeys++;
}
so->numArrayKeys = numArrayKeys;
MemoryContextSwitchTo(oldContext);
}
/*
* _bt_find_extreme_element() -- get least or greatest array element
*
* scan and skey identify the index column, whose opfamily determines the
* comparison semantics. strat should be BTLessStrategyNumber to get the
* least element, or BTGreaterStrategyNumber to get the greatest.
*/
static Datum
_bt_find_extreme_element(IndexScanDesc scan, ScanKey skey,
StrategyNumber strat,
Datum *elems, int nelems)
{
Relation rel = scan->indexRelation;
Oid elemtype,
cmp_op;
RegProcedure cmp_proc;
FmgrInfo flinfo;
Datum result;
int i;
/*
* Determine the nominal datatype of the array elements. We have to
* support the convention that sk_subtype == InvalidOid means the opclass
* input type; this is a hack to simplify life for ScanKeyInit().
*/
elemtype = skey->sk_subtype;
if (elemtype == InvalidOid)
elemtype = rel->rd_opcintype[skey->sk_attno - 1];
/*
* Look up the appropriate comparison operator in the opfamily.
*
* Note: it's possible that this would fail, if the opfamily is incomplete,
* but it seems quite unlikely that an opfamily would omit non-cross-type
* comparison operators for any datatype that it supports at all.
*/
cmp_op = get_opfamily_member(rel->rd_opfamily[skey->sk_attno - 1],
elemtype,
elemtype,
strat);
if (!OidIsValid(cmp_op))
elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
strat, elemtype, elemtype,
rel->rd_opfamily[skey->sk_attno - 1]);
cmp_proc = get_opcode(cmp_op);
if (!RegProcedureIsValid(cmp_proc))
elog(ERROR, "missing oprcode for operator %u", cmp_op);
fmgr_info(cmp_proc, &flinfo);
Assert(nelems > 0);
result = elems[0];
for (i = 1; i < nelems; i++)
{
if (DatumGetBool(FunctionCall2Coll(&flinfo,
skey->sk_collation,
elems[i],
result)))
result = elems[i];
}
return result;
}
/*
* _bt_sort_array_elements() -- sort and de-dup array elements
*
* The array elements are sorted in-place, and the new number of elements
* after duplicate removal is returned.
*
* scan and skey identify the index column, whose opfamily determines the
* comparison semantics. If reverse is true, we sort in descending order.
*/
static int
_bt_sort_array_elements(IndexScanDesc scan, ScanKey skey,
bool reverse,
Datum *elems, int nelems)
{
Relation rel = scan->indexRelation;
Oid elemtype;
RegProcedure cmp_proc;
BTSortArrayContext cxt;
int last_non_dup;
int i;
if (nelems <= 1)
return nelems; /* no work to do */
/*
* Determine the nominal datatype of the array elements. We have to
* support the convention that sk_subtype == InvalidOid means the opclass
* input type; this is a hack to simplify life for ScanKeyInit().
*/
elemtype = skey->sk_subtype;
if (elemtype == InvalidOid)
elemtype = rel->rd_opcintype[skey->sk_attno - 1];
/*
* Look up the appropriate comparison function in the opfamily.
*
* Note: it's possible that this would fail, if the opfamily is incomplete,
* but it seems quite unlikely that an opfamily would omit non-cross-type
* support functions for any datatype that it supports at all.
*/
cmp_proc = get_opfamily_proc(rel->rd_opfamily[skey->sk_attno - 1],
elemtype,
elemtype,
BTORDER_PROC);
if (!RegProcedureIsValid(cmp_proc))
elog(ERROR, "missing support function %d(%u,%u) in opfamily %u",
BTORDER_PROC, elemtype, elemtype,
rel->rd_opfamily[skey->sk_attno - 1]);
/* Sort the array elements */
fmgr_info(cmp_proc, &cxt.flinfo);
cxt.collation = skey->sk_collation;
cxt.reverse = reverse;
qsort_arg((void *) elems, nelems, sizeof(Datum),
_bt_compare_array_elements, (void *) &cxt);
/* Now scan the sorted elements and remove duplicates */
last_non_dup = 0;
for (i = 1; i < nelems; i++)
{
int32 compare;
compare = DatumGetInt32(FunctionCall2Coll(&cxt.flinfo,
cxt.collation,
elems[last_non_dup],
elems[i]));
if (compare != 0)
elems[++last_non_dup] = elems[i];
}
return last_non_dup + 1;
}
/*
* qsort_arg comparator for sorting array elements
*/
static int
_bt_compare_array_elements(const void *a, const void *b, void *arg)
{
Datum da = *((const Datum *) a);
Datum db = *((const Datum *) b);
BTSortArrayContext *cxt = (BTSortArrayContext *) arg;
int32 compare;
compare = DatumGetInt32(FunctionCall2Coll(&cxt->flinfo,
cxt->collation,
da, db));
if (cxt->reverse)
compare = -compare;
return compare;
}
/*
* _bt_start_array_keys() -- Initialize array keys at start of a scan
*
* Set up the cur_elem counters and fill in the first sk_argument value for
* each array scankey. We can't do this until we know the scan direction.
*/
void
_bt_start_array_keys(IndexScanDesc scan, ScanDirection dir)
{
BTScanOpaque so = (BTScanOpaque) scan->opaque;
int i;
for (i = 0; i < so->numArrayKeys; i++)
{
BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
ScanKey skey = &so->arrayKeyData[curArrayKey->scan_key];
Assert(curArrayKey->num_elems > 0);
if (ScanDirectionIsBackward(dir))
curArrayKey->cur_elem = curArrayKey->num_elems - 1;
else
curArrayKey->cur_elem = 0;
skey->sk_argument = curArrayKey->elem_values[curArrayKey->cur_elem];
}
}
/*
* _bt_advance_array_keys() -- Advance to next set of array elements
*
* Returns TRUE if there is another set of values to consider, FALSE if not.
* On TRUE result, the scankeys are initialized with the next set of values.
*/
bool
_bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir)
{
BTScanOpaque so = (BTScanOpaque) scan->opaque;
bool found = false;
int i;
/*
* We must advance the last array key most quickly, since it will
* correspond to the lowest-order index column among the available
* qualifications. This is necessary to ensure correct ordering of output
* when there are multiple array keys.
*/
for (i = so->numArrayKeys - 1; i >= 0; i--)
{
BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
ScanKey skey = &so->arrayKeyData[curArrayKey->scan_key];
int cur_elem = curArrayKey->cur_elem;
int num_elems = curArrayKey->num_elems;
if (ScanDirectionIsBackward(dir))
{
if (--cur_elem < 0)
{
cur_elem = num_elems - 1;
found = false; /* need to advance next array key */
}
else
found = true;
}
else
{
if (++cur_elem >= num_elems)
{
cur_elem = 0;
found = false; /* need to advance next array key */
}
else
found = true;
}
curArrayKey->cur_elem = cur_elem;
skey->sk_argument = curArrayKey->elem_values[cur_elem];
if (found)
break;
}
return found;
}
/*
* _bt_preprocess_keys() -- Preprocess scan keys
*
* The caller-supplied search-type keys (in scan->keyData[]) are copied to
* so->keyData[] with possible transformation. scan->numberOfKeys is
* the number of input keys, so->numberOfKeys gets the number of output
* keys (possibly less, never greater).
* The given search-type keys (in scan->keyData[] or so->arrayKeyData[])
* are copied to so->keyData[] with possible transformation.
* scan->numberOfKeys is the number of input keys, so->numberOfKeys gets
* the number of output keys (possibly less, never greater).
*
* The output keys are marked with additional sk_flag bits beyond the
* system-standard bits supplied by the caller. The DESC and NULLS_FIRST
@@ -226,8 +662,8 @@ _bt_freestack(BTStack stack)
*
* Note: the reason we have to copy the preprocessed scan keys into private
* storage is that we are modifying the array based on comparisons of the
* key argument values, which could change on a rescan. Therefore we can't
* overwrite the caller's data structure.
* key argument values, which could change on a rescan or after moving to
* new elements of array keys. Therefore we can't overwrite the source data.
*/
void
_bt_preprocess_keys(IndexScanDesc scan)
@@ -253,7 +689,14 @@ _bt_preprocess_keys(IndexScanDesc scan)
if (numberOfKeys < 1)
return; /* done if qual-less scan */
inkeys = scan->keyData;
/*
* Read so->arrayKeyData if array keys are present, else scan->keyData
*/
if (so->arrayKeyData != NULL)
inkeys = so->arrayKeyData;
else
inkeys = scan->keyData;
outkeys = so->keyData;
cur = &inkeys[0];
/* we check that input keys are correctly ordered */