1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-25 01:02:05 +03:00

Refactor handling of nbtree array redundancies.

Teach _bt_preprocess_array_keys to eliminate redundant array equality
scan keys directly, rather than just marking them as redundant.  Its
_bt_preprocess_keys caller is no longer required to ignore input scan
keys that were marked redundant in this way.  Oversights like the one
fixed by commit f22e17f7 are no longer possible.

The new scheme also makes it easier for _bt_preprocess_keys to output a
so.keyData[] scan key array with _more_ scan keys than it was passed in
its scan.keyData[] input scan key array.  An upcoming patch that adds
skip scan optimizations to nbtree will take advantage of this.

In passing, remove and rename certain _bt_preprocess_keys variables to
make the difference between our input scan key array and our output scan
key array clearer.

Author: Peter Geoghegan <pg@bowt.ie>
Reviewed-By: Tomas Vondra <tomas@vondra.me>
Discussion: https://postgr.es/m/CAH2-Wz=9A_UtM7HzUThSkQ+BcrQsQZuNhWOvQWK06PRkEp=SKQ@mail.gmail.com
This commit is contained in:
Peter Geoghegan
2024-09-21 13:25:49 -04:00
parent 54562c9cfa
commit b524974106

View File

@ -41,8 +41,8 @@ typedef struct BTSortArrayContext
typedef struct BTScanKeyPreproc typedef struct BTScanKeyPreproc
{ {
ScanKey skey; ScanKey inkey;
int ikey; int inkeyi;
int arrayidx; int arrayidx;
} BTScanKeyPreproc; } BTScanKeyPreproc;
@ -62,7 +62,7 @@ static bool _bt_compare_array_scankey_args(IndexScanDesc scan,
ScanKey arraysk, ScanKey skey, ScanKey arraysk, ScanKey skey,
FmgrInfo *orderproc, BTArrayKeyInfo *array, FmgrInfo *orderproc, BTArrayKeyInfo *array,
bool *qual_ok); bool *qual_ok);
static ScanKey _bt_preprocess_array_keys(IndexScanDesc scan); static ScanKey _bt_preprocess_array_keys(IndexScanDesc scan, int *new_numberOfKeys);
static void _bt_preprocess_array_keys_final(IndexScanDesc scan, int *keyDataMap); static void _bt_preprocess_array_keys_final(IndexScanDesc scan, int *keyDataMap);
static int _bt_compare_array_elements(const void *a, const void *b, void *arg); static int _bt_compare_array_elements(const void *a, const void *b, void *arg);
static inline int32 _bt_compare_array_skey(FmgrInfo *orderproc, static inline int32 _bt_compare_array_skey(FmgrInfo *orderproc,
@ -248,12 +248,13 @@ _bt_freestack(BTStack stack)
* as eliminating whole array scan keys as redundant. It can also allow us to * as eliminating whole array scan keys as redundant. It can also allow us to
* detect contradictory quals. * detect contradictory quals.
* *
* It is convenient for _bt_preprocess_keys caller to have to deal with no * Caller must pass *new_numberOfKeys to give us a way to change the number of
* more than one equality strategy array scan key per index attribute. We'll * scan keys that caller treats as input to standard preprocessing steps. The
* always be able to set things up that way when complete opfamilies are used. * returned array is smaller than scan->keyData[] when we could eliminate a
* Eliminated array scan keys can be recognized as those that have had their * redundant array scan key (redundant with another array scan key). It is
* sk_strategy field set to InvalidStrategy here by us. Caller should avoid * convenient for _bt_preprocess_keys caller to have to deal with no more than
* including these in the scan's so->keyData[] output array. * one equality strategy array scan key per index attribute. We'll always be
* able to set things up that way when complete opfamilies are used.
* *
* We set the scan key references from the scan's BTArrayKeyInfo info array to * We set the scan key references from the scan's BTArrayKeyInfo info array to
* offsets into the temp modified input array returned to caller. Scans that * offsets into the temp modified input array returned to caller. Scans that
@ -266,13 +267,14 @@ _bt_freestack(BTStack stack)
* without supplying a new set of scankey data. * without supplying a new set of scankey data.
*/ */
static ScanKey static ScanKey
_bt_preprocess_array_keys(IndexScanDesc scan) _bt_preprocess_array_keys(IndexScanDesc scan, int *new_numberOfKeys)
{ {
BTScanOpaque so = (BTScanOpaque) scan->opaque; BTScanOpaque so = (BTScanOpaque) scan->opaque;
Relation rel = scan->indexRelation; Relation rel = scan->indexRelation;
int numberOfKeys = scan->numberOfKeys; int numberOfKeys = scan->numberOfKeys;
int16 *indoption = rel->rd_indoption; int16 *indoption = rel->rd_indoption;
int numArrayKeys; int numArrayKeys,
output_ikey = 0;
int origarrayatt = InvalidAttrNumber, int origarrayatt = InvalidAttrNumber,
origarraykey = -1; origarraykey = -1;
Oid origelemtype = InvalidOid; Oid origelemtype = InvalidOid;
@ -317,9 +319,8 @@ _bt_preprocess_array_keys(IndexScanDesc scan)
oldContext = MemoryContextSwitchTo(so->arrayContext); oldContext = MemoryContextSwitchTo(so->arrayContext);
/* Create modifiable copy of scan->keyData in the workspace context */ /* Create output scan keys in the workspace context */
arrayKeyData = (ScanKey) palloc(numberOfKeys * sizeof(ScanKeyData)); arrayKeyData = (ScanKey) palloc(numberOfKeys * sizeof(ScanKeyData));
memcpy(arrayKeyData, scan->keyData, numberOfKeys * sizeof(ScanKeyData));
/* Allocate space for per-array data in the workspace context */ /* Allocate space for per-array data in the workspace context */
so->arrayKeys = (BTArrayKeyInfo *) palloc(numArrayKeys * sizeof(BTArrayKeyInfo)); so->arrayKeys = (BTArrayKeyInfo *) palloc(numArrayKeys * sizeof(BTArrayKeyInfo));
@ -329,7 +330,7 @@ _bt_preprocess_array_keys(IndexScanDesc scan)
/* Now process each array key */ /* Now process each array key */
numArrayKeys = 0; numArrayKeys = 0;
for (int i = 0; i < numberOfKeys; i++) for (int input_ikey = 0; input_ikey < numberOfKeys; input_ikey++)
{ {
FmgrInfo sortproc; FmgrInfo sortproc;
FmgrInfo *sortprocp = &sortproc; FmgrInfo *sortprocp = &sortproc;
@ -345,14 +346,21 @@ _bt_preprocess_array_keys(IndexScanDesc scan)
int num_nonnulls; int num_nonnulls;
int j; int j;
cur = &arrayKeyData[i]; /*
* Provisionally copy scan key into arrayKeyData[] array we'll return
* to _bt_preprocess_keys caller
*/
cur = &arrayKeyData[output_ikey];
*cur = scan->keyData[input_ikey];
if (!(cur->sk_flags & SK_SEARCHARRAY)) if (!(cur->sk_flags & SK_SEARCHARRAY))
{
output_ikey++; /* keep this non-array scan key */
continue; continue;
}
/* /*
* First, deconstruct the array into elements. Anything allocated * Deconstruct the array into elements
* here (including a possibly detoasted array value) is in the
* workspace context.
*/ */
arrayval = DatumGetArrayTypeP(cur->sk_argument); arrayval = DatumGetArrayTypeP(cur->sk_argument);
/* We could cache this data, but not clear it's worth it */ /* We could cache this data, but not clear it's worth it */
@ -406,6 +414,7 @@ _bt_preprocess_array_keys(IndexScanDesc scan)
_bt_find_extreme_element(scan, cur, elemtype, _bt_find_extreme_element(scan, cur, elemtype,
BTGreaterStrategyNumber, BTGreaterStrategyNumber,
elem_values, num_nonnulls); elem_values, num_nonnulls);
output_ikey++; /* keep this transformed scan key */
continue; continue;
case BTEqualStrategyNumber: case BTEqualStrategyNumber:
/* proceed with rest of loop */ /* proceed with rest of loop */
@ -416,6 +425,7 @@ _bt_preprocess_array_keys(IndexScanDesc scan)
_bt_find_extreme_element(scan, cur, elemtype, _bt_find_extreme_element(scan, cur, elemtype,
BTLessStrategyNumber, BTLessStrategyNumber,
elem_values, num_nonnulls); elem_values, num_nonnulls);
output_ikey++; /* keep this transformed scan key */
continue; continue;
default: default:
elog(ERROR, "unrecognized StrategyNumber: %d", elog(ERROR, "unrecognized StrategyNumber: %d",
@ -432,7 +442,7 @@ _bt_preprocess_array_keys(IndexScanDesc scan)
* sortproc just points to the same proc used during binary searches. * sortproc just points to the same proc used during binary searches.
*/ */
_bt_setup_array_cmp(scan, cur, elemtype, _bt_setup_array_cmp(scan, cur, elemtype,
&so->orderProcs[i], &sortprocp); &so->orderProcs[output_ikey], &sortprocp);
/* /*
* Sort the non-null elements and eliminate any duplicates. We must * Sort the non-null elements and eliminate any duplicates. We must
@ -476,11 +486,7 @@ _bt_preprocess_array_keys(IndexScanDesc scan)
break; break;
} }
/* /* Throw away this scan key/array */
* Indicate to _bt_preprocess_keys caller that it must ignore
* this scan key
*/
cur->sk_strategy = InvalidStrategy;
continue; continue;
} }
@ -511,13 +517,17 @@ _bt_preprocess_array_keys(IndexScanDesc scan)
* Note: _bt_preprocess_array_keys_final will fix-up each array's * Note: _bt_preprocess_array_keys_final will fix-up each array's
* scan_key field later on, after so->keyData[] has been finalized. * scan_key field later on, after so->keyData[] has been finalized.
*/ */
so->arrayKeys[numArrayKeys].scan_key = i; so->arrayKeys[numArrayKeys].scan_key = output_ikey;
so->arrayKeys[numArrayKeys].num_elems = num_elems; so->arrayKeys[numArrayKeys].num_elems = num_elems;
so->arrayKeys[numArrayKeys].elem_values = elem_values; so->arrayKeys[numArrayKeys].elem_values = elem_values;
numArrayKeys++; numArrayKeys++;
output_ikey++; /* keep this scan key/array */
} }
/* Set final number of equality-type array keys */
so->numArrayKeys = numArrayKeys; so->numArrayKeys = numArrayKeys;
/* Set number of scan keys remaining in arrayKeyData[] */
*new_numberOfKeys = output_ikey;
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
@ -2463,7 +2473,8 @@ end_toplevel_scan:
* The given search-type keys (taken from scan->keyData[]) * The given search-type keys (taken from scan->keyData[])
* are copied to so->keyData[] with possible transformation. * are copied to so->keyData[] with possible transformation.
* scan->numberOfKeys is the number of input keys, so->numberOfKeys gets * scan->numberOfKeys is the number of input keys, so->numberOfKeys gets
* the number of output keys (possibly less, never greater). * the number of output keys. Calling here a second or subsequent time
* (during the same btrescan) is a no-op.
* *
* The output keys are marked with additional sk_flags bits beyond the * The output keys are marked with additional sk_flags bits beyond the
* system-standard bits supplied by the caller. The DESC and NULLS_FIRST * system-standard bits supplied by the caller. The DESC and NULLS_FIRST
@ -2551,12 +2562,8 @@ _bt_preprocess_keys(IndexScanDesc scan)
int new_numberOfKeys; int new_numberOfKeys;
int numberOfEqualCols; int numberOfEqualCols;
ScanKey inkeys; ScanKey inkeys;
ScanKey outkeys;
ScanKey cur;
BTScanKeyPreproc xform[BTMaxStrategyNumber]; BTScanKeyPreproc xform[BTMaxStrategyNumber];
bool test_result; bool test_result;
int i,
j;
AttrNumber attno; AttrNumber attno;
ScanKey arrayKeyData; ScanKey arrayKeyData;
int *keyDataMap = NULL; int *keyDataMap = NULL;
@ -2584,7 +2591,7 @@ _bt_preprocess_keys(IndexScanDesc scan)
return; /* done if qual-less scan */ return; /* done if qual-less scan */
/* If any keys are SK_SEARCHARRAY type, set up array-key info */ /* If any keys are SK_SEARCHARRAY type, set up array-key info */
arrayKeyData = _bt_preprocess_array_keys(scan); arrayKeyData = _bt_preprocess_array_keys(scan, &numberOfKeys);
if (!so->qual_ok) if (!so->qual_ok)
{ {
/* unmatchable array, so give up */ /* unmatchable array, so give up */
@ -2607,23 +2614,21 @@ _bt_preprocess_keys(IndexScanDesc scan)
else else
inkeys = scan->keyData; inkeys = scan->keyData;
outkeys = so->keyData;
cur = &inkeys[0];
/* we check that input keys are correctly ordered */ /* we check that input keys are correctly ordered */
if (cur->sk_attno < 1) if (inkeys[0].sk_attno < 1)
elog(ERROR, "btree index keys must be ordered by attribute"); elog(ERROR, "btree index keys must be ordered by attribute");
/* We can short-circuit most of the work if there's just one key */ /* We can short-circuit most of the work if there's just one key */
if (numberOfKeys == 1) if (numberOfKeys == 1)
{ {
/* Apply indoption to scankey (might change sk_strategy!) */ /* Apply indoption to scankey (might change sk_strategy!) */
if (!_bt_fix_scankey_strategy(cur, indoption)) if (!_bt_fix_scankey_strategy(&inkeys[0], indoption))
so->qual_ok = false; so->qual_ok = false;
memcpy(outkeys, cur, sizeof(ScanKeyData)); memcpy(&so->keyData[0], &inkeys[0], sizeof(ScanKeyData));
so->numberOfKeys = 1; so->numberOfKeys = 1;
/* We can mark the qual as required if it's for first index col */ /* We can mark the qual as required if it's for first index col */
if (cur->sk_attno == 1) if (inkeys[0].sk_attno == 1)
_bt_mark_scankey_required(outkeys); _bt_mark_scankey_required(&so->keyData[0]);
if (arrayKeyData) if (arrayKeyData)
{ {
/* /*
@ -2631,8 +2636,8 @@ _bt_preprocess_keys(IndexScanDesc scan)
* (we'll miss out on the single value array transformation, but * (we'll miss out on the single value array transformation, but
* that's not nearly as important when there's only one scan key) * that's not nearly as important when there's only one scan key)
*/ */
Assert(cur->sk_flags & SK_SEARCHARRAY); Assert(so->keyData[0].sk_flags & SK_SEARCHARRAY);
Assert(cur->sk_strategy != BTEqualStrategyNumber || Assert(so->keyData[0].sk_strategy != BTEqualStrategyNumber ||
(so->arrayKeys[0].scan_key == 0 && (so->arrayKeys[0].scan_key == 0 &&
OidIsValid(so->orderProcs[0].fn_oid))); OidIsValid(so->orderProcs[0].fn_oid)));
} }
@ -2660,12 +2665,15 @@ _bt_preprocess_keys(IndexScanDesc scan)
* handle after-last-key processing. Actual exit from the loop is at the * handle after-last-key processing. Actual exit from the loop is at the
* "break" statement below. * "break" statement below.
*/ */
for (i = 0;; cur++, i++) for (int i = 0;; i++)
{ {
ScanKey inkey = inkeys + i;
int j;
if (i < numberOfKeys) if (i < numberOfKeys)
{ {
/* Apply indoption to scankey (might change sk_strategy!) */ /* Apply indoption to scankey (might change sk_strategy!) */
if (!_bt_fix_scankey_strategy(cur, indoption)) if (!_bt_fix_scankey_strategy(inkey, indoption))
{ {
/* NULL can't be matched, so give up */ /* NULL can't be matched, so give up */
so->qual_ok = false; so->qual_ok = false;
@ -2677,12 +2685,12 @@ _bt_preprocess_keys(IndexScanDesc scan)
* If we are at the end of the keys for a particular attr, finish up * If we are at the end of the keys for a particular attr, finish up
* processing and emit the cleaned-up keys. * processing and emit the cleaned-up keys.
*/ */
if (i == numberOfKeys || cur->sk_attno != attno) if (i == numberOfKeys || inkey->sk_attno != attno)
{ {
int priorNumberOfEqualCols = numberOfEqualCols; int priorNumberOfEqualCols = numberOfEqualCols;
/* check input keys are correctly ordered */ /* check input keys are correctly ordered */
if (i < numberOfKeys && cur->sk_attno < attno) if (i < numberOfKeys && inkey->sk_attno < attno)
elog(ERROR, "btree index keys must be ordered by attribute"); elog(ERROR, "btree index keys must be ordered by attribute");
/* /*
@ -2696,9 +2704,9 @@ _bt_preprocess_keys(IndexScanDesc scan)
* check, and we've rejected any combination of it with a regular * check, and we've rejected any combination of it with a regular
* equality condition; but not with other types of conditions. * equality condition; but not with other types of conditions.
*/ */
if (xform[BTEqualStrategyNumber - 1].skey) if (xform[BTEqualStrategyNumber - 1].inkey)
{ {
ScanKey eq = xform[BTEqualStrategyNumber - 1].skey; ScanKey eq = xform[BTEqualStrategyNumber - 1].inkey;
BTArrayKeyInfo *array = NULL; BTArrayKeyInfo *array = NULL;
FmgrInfo *orderproc = NULL; FmgrInfo *orderproc = NULL;
@ -2707,7 +2715,7 @@ _bt_preprocess_keys(IndexScanDesc scan)
int eq_in_ikey, int eq_in_ikey,
eq_arrayidx; eq_arrayidx;
eq_in_ikey = xform[BTEqualStrategyNumber - 1].ikey; eq_in_ikey = xform[BTEqualStrategyNumber - 1].inkeyi;
eq_arrayidx = xform[BTEqualStrategyNumber - 1].arrayidx; eq_arrayidx = xform[BTEqualStrategyNumber - 1].arrayidx;
array = &so->arrayKeys[eq_arrayidx - 1]; array = &so->arrayKeys[eq_arrayidx - 1];
orderproc = so->orderProcs + eq_in_ikey; orderproc = so->orderProcs + eq_in_ikey;
@ -2718,7 +2726,7 @@ _bt_preprocess_keys(IndexScanDesc scan)
for (j = BTMaxStrategyNumber; --j >= 0;) for (j = BTMaxStrategyNumber; --j >= 0;)
{ {
ScanKey chk = xform[j].skey; ScanKey chk = xform[j].inkey;
if (!chk || j == (BTEqualStrategyNumber - 1)) if (!chk || j == (BTEqualStrategyNumber - 1))
continue; continue;
@ -2742,8 +2750,8 @@ _bt_preprocess_keys(IndexScanDesc scan)
} }
/* else discard the redundant non-equality key */ /* else discard the redundant non-equality key */
Assert(!array || array->num_elems > 0); Assert(!array || array->num_elems > 0);
xform[j].skey = NULL; xform[j].inkey = NULL;
xform[j].ikey = -1; xform[j].inkeyi = -1;
} }
/* else, cannot determine redundancy, keep both keys */ /* else, cannot determine redundancy, keep both keys */
} }
@ -2752,53 +2760,53 @@ _bt_preprocess_keys(IndexScanDesc scan)
} }
/* try to keep only one of <, <= */ /* try to keep only one of <, <= */
if (xform[BTLessStrategyNumber - 1].skey if (xform[BTLessStrategyNumber - 1].inkey &&
&& xform[BTLessEqualStrategyNumber - 1].skey) xform[BTLessEqualStrategyNumber - 1].inkey)
{ {
ScanKey lt = xform[BTLessStrategyNumber - 1].skey; ScanKey lt = xform[BTLessStrategyNumber - 1].inkey;
ScanKey le = xform[BTLessEqualStrategyNumber - 1].skey; ScanKey le = xform[BTLessEqualStrategyNumber - 1].inkey;
if (_bt_compare_scankey_args(scan, le, lt, le, NULL, NULL, if (_bt_compare_scankey_args(scan, le, lt, le, NULL, NULL,
&test_result)) &test_result))
{ {
if (test_result) if (test_result)
xform[BTLessEqualStrategyNumber - 1].skey = NULL; xform[BTLessEqualStrategyNumber - 1].inkey = NULL;
else else
xform[BTLessStrategyNumber - 1].skey = NULL; xform[BTLessStrategyNumber - 1].inkey = NULL;
} }
} }
/* try to keep only one of >, >= */ /* try to keep only one of >, >= */
if (xform[BTGreaterStrategyNumber - 1].skey if (xform[BTGreaterStrategyNumber - 1].inkey &&
&& xform[BTGreaterEqualStrategyNumber - 1].skey) xform[BTGreaterEqualStrategyNumber - 1].inkey)
{ {
ScanKey gt = xform[BTGreaterStrategyNumber - 1].skey; ScanKey gt = xform[BTGreaterStrategyNumber - 1].inkey;
ScanKey ge = xform[BTGreaterEqualStrategyNumber - 1].skey; ScanKey ge = xform[BTGreaterEqualStrategyNumber - 1].inkey;
if (_bt_compare_scankey_args(scan, ge, gt, ge, NULL, NULL, if (_bt_compare_scankey_args(scan, ge, gt, ge, NULL, NULL,
&test_result)) &test_result))
{ {
if (test_result) if (test_result)
xform[BTGreaterEqualStrategyNumber - 1].skey = NULL; xform[BTGreaterEqualStrategyNumber - 1].inkey = NULL;
else else
xform[BTGreaterStrategyNumber - 1].skey = NULL; xform[BTGreaterStrategyNumber - 1].inkey = NULL;
} }
} }
/* /*
* Emit the cleaned-up keys into the outkeys[] array, and then * Emit the cleaned-up keys into the so->keyData[] array, and then
* mark them if they are required. They are required (possibly * mark them if they are required. They are required (possibly
* only in one direction) if all attrs before this one had "=". * only in one direction) if all attrs before this one had "=".
*/ */
for (j = BTMaxStrategyNumber; --j >= 0;) for (j = BTMaxStrategyNumber; --j >= 0;)
{ {
if (xform[j].skey) if (xform[j].inkey)
{ {
ScanKey outkey = &outkeys[new_numberOfKeys++]; ScanKey outkey = &so->keyData[new_numberOfKeys++];
memcpy(outkey, xform[j].skey, sizeof(ScanKeyData)); memcpy(outkey, xform[j].inkey, sizeof(ScanKeyData));
if (arrayKeyData) if (arrayKeyData)
keyDataMap[new_numberOfKeys - 1] = xform[j].ikey; keyDataMap[new_numberOfKeys - 1] = xform[j].inkeyi;
if (priorNumberOfEqualCols == attno - 1) if (priorNumberOfEqualCols == attno - 1)
_bt_mark_scankey_required(outkey); _bt_mark_scankey_required(outkey);
} }
@ -2811,19 +2819,19 @@ _bt_preprocess_keys(IndexScanDesc scan)
break; break;
/* Re-initialize for new attno */ /* Re-initialize for new attno */
attno = cur->sk_attno; attno = inkey->sk_attno;
memset(xform, 0, sizeof(xform)); memset(xform, 0, sizeof(xform));
} }
/* check strategy this key's operator corresponds to */ /* check strategy this key's operator corresponds to */
j = cur->sk_strategy - 1; j = inkey->sk_strategy - 1;
/* if row comparison, push it directly to the output array */ /* if row comparison, push it directly to the output array */
if (cur->sk_flags & SK_ROW_HEADER) if (inkey->sk_flags & SK_ROW_HEADER)
{ {
ScanKey outkey = &outkeys[new_numberOfKeys++]; ScanKey outkey = &so->keyData[new_numberOfKeys++];
memcpy(outkey, cur, sizeof(ScanKeyData)); memcpy(outkey, inkey, sizeof(ScanKeyData));
if (arrayKeyData) if (arrayKeyData)
keyDataMap[new_numberOfKeys - 1] = i; keyDataMap[new_numberOfKeys - 1] = i;
if (numberOfEqualCols == attno - 1) if (numberOfEqualCols == attno - 1)
@ -2837,21 +2845,10 @@ _bt_preprocess_keys(IndexScanDesc scan)
continue; continue;
} }
/* if (inkey->sk_strategy == BTEqualStrategyNumber &&
* Does this input scan key require further processing as an array? (inkey->sk_flags & SK_SEARCHARRAY))
*/
if (cur->sk_strategy == InvalidStrategy)
{ {
/* _bt_preprocess_array_keys marked this array key redundant */ /* must track how input scan keys map to arrays */
Assert(arrayKeyData);
Assert(cur->sk_flags & SK_SEARCHARRAY);
continue;
}
if (cur->sk_strategy == BTEqualStrategyNumber &&
(cur->sk_flags & SK_SEARCHARRAY))
{
/* _bt_preprocess_array_keys kept this array key */
Assert(arrayKeyData); Assert(arrayKeyData);
arrayidx++; arrayidx++;
} }
@ -2860,11 +2857,11 @@ _bt_preprocess_keys(IndexScanDesc scan)
* have we seen a scan key for this same attribute and using this same * have we seen a scan key for this same attribute and using this same
* operator strategy before now? * operator strategy before now?
*/ */
if (xform[j].skey == NULL) if (xform[j].inkey == NULL)
{ {
/* nope, so this scan key wins by default (at least for now) */ /* nope, so this scan key wins by default (at least for now) */
xform[j].skey = cur; xform[j].inkey = inkey;
xform[j].ikey = i; xform[j].inkeyi = i;
xform[j].arrayidx = arrayidx; xform[j].arrayidx = arrayidx;
} }
else else
@ -2881,7 +2878,7 @@ _bt_preprocess_keys(IndexScanDesc scan)
/* /*
* Have to set up array keys * Have to set up array keys
*/ */
if ((cur->sk_flags & SK_SEARCHARRAY)) if (inkey->sk_flags & SK_SEARCHARRAY)
{ {
array = &so->arrayKeys[arrayidx - 1]; array = &so->arrayKeys[arrayidx - 1];
orderproc = so->orderProcs + i; orderproc = so->orderProcs + i;
@ -2889,12 +2886,12 @@ _bt_preprocess_keys(IndexScanDesc scan)
Assert(array->scan_key == i); Assert(array->scan_key == i);
Assert(OidIsValid(orderproc->fn_oid)); Assert(OidIsValid(orderproc->fn_oid));
} }
else if ((xform[j].skey->sk_flags & SK_SEARCHARRAY)) else if (xform[j].inkey->sk_flags & SK_SEARCHARRAY)
{ {
array = &so->arrayKeys[xform[j].arrayidx - 1]; array = &so->arrayKeys[xform[j].arrayidx - 1];
orderproc = so->orderProcs + xform[j].ikey; orderproc = so->orderProcs + xform[j].inkeyi;
Assert(array->scan_key == xform[j].ikey); Assert(array->scan_key == xform[j].inkeyi);
Assert(OidIsValid(orderproc->fn_oid)); Assert(OidIsValid(orderproc->fn_oid));
} }
@ -2909,7 +2906,7 @@ _bt_preprocess_keys(IndexScanDesc scan)
*/ */
} }
if (_bt_compare_scankey_args(scan, cur, cur, xform[j].skey, if (_bt_compare_scankey_args(scan, inkey, inkey, xform[j].inkey,
array, orderproc, &test_result)) array, orderproc, &test_result))
{ {
/* Have all we need to determine redundancy */ /* Have all we need to determine redundancy */
@ -2921,10 +2918,10 @@ _bt_preprocess_keys(IndexScanDesc scan)
* New key is more restrictive, and so replaces old key... * New key is more restrictive, and so replaces old key...
*/ */
if (j != (BTEqualStrategyNumber - 1) || if (j != (BTEqualStrategyNumber - 1) ||
!(xform[j].skey->sk_flags & SK_SEARCHARRAY)) !(xform[j].inkey->sk_flags & SK_SEARCHARRAY))
{ {
xform[j].skey = cur; xform[j].inkey = inkey;
xform[j].ikey = i; xform[j].inkeyi = i;
xform[j].arrayidx = arrayidx; xform[j].arrayidx = arrayidx;
} }
else else
@ -2936,7 +2933,7 @@ _bt_preprocess_keys(IndexScanDesc scan)
* scan key. _bt_compare_scankey_args expects us to * scan key. _bt_compare_scankey_args expects us to
* always keep arrays (and discard non-arrays). * always keep arrays (and discard non-arrays).
*/ */
Assert(!(cur->sk_flags & SK_SEARCHARRAY)); Assert(!(inkey->sk_flags & SK_SEARCHARRAY));
} }
} }
else if (j == (BTEqualStrategyNumber - 1)) else if (j == (BTEqualStrategyNumber - 1))
@ -2959,15 +2956,15 @@ _bt_preprocess_keys(IndexScanDesc scan)
* even with incomplete opfamilies. _bt_advance_array_keys * even with incomplete opfamilies. _bt_advance_array_keys
* depends on this. * depends on this.
*/ */
ScanKey outkey = &outkeys[new_numberOfKeys++]; ScanKey outkey = &so->keyData[new_numberOfKeys++];
memcpy(outkey, xform[j].skey, sizeof(ScanKeyData)); memcpy(outkey, xform[j].inkey, sizeof(ScanKeyData));
if (arrayKeyData) if (arrayKeyData)
keyDataMap[new_numberOfKeys - 1] = xform[j].ikey; keyDataMap[new_numberOfKeys - 1] = xform[j].inkeyi;
if (numberOfEqualCols == attno - 1) if (numberOfEqualCols == attno - 1)
_bt_mark_scankey_required(outkey); _bt_mark_scankey_required(outkey);
xform[j].skey = cur; xform[j].inkey = inkey;
xform[j].ikey = i; xform[j].inkeyi = i;
xform[j].arrayidx = arrayidx; xform[j].arrayidx = arrayidx;
} }
} }
@ -3380,13 +3377,6 @@ _bt_fix_scankey_strategy(ScanKey skey, int16 *indoption)
return true; return true;
} }
if (skey->sk_strategy == InvalidStrategy)
{
/* Already-eliminated array scan key; don't need to fix anything */
Assert(skey->sk_flags & SK_SEARCHARRAY);
return true;
}
/* Adjust strategy for DESC, if we didn't already */ /* Adjust strategy for DESC, if we didn't already */
if ((addflags & SK_BT_DESC) && !(skey->sk_flags & SK_BT_DESC)) if ((addflags & SK_BT_DESC) && !(skey->sk_flags & SK_BT_DESC))
skey->sk_strategy = BTCommuteStrategyNumber(skey->sk_strategy); skey->sk_strategy = BTCommuteStrategyNumber(skey->sk_strategy);