1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-22 14:32:25 +03:00

Improve implementation of range-contains-element tests.

Implement these tests directly instead of constructing a singleton range
and then applying range-contains.  This saves a range serialize/deserialize
cycle as well as a couple of redundant bound-comparison steps, and adds
very little code on net.

Remove elem_contained_by_range from the GiST opclass: it doesn't belong
there because there is no way to use it in an index clause (where the
indexed column would have to be on the left).  Its commutator is in the
opclass, and that's what counts.
This commit is contained in:
Tom Lane
2011-11-22 17:45:02 -05:00
parent f1b4aa2a84
commit cddc819e45
5 changed files with 92 additions and 84 deletions

View File

@@ -65,6 +65,8 @@ static char *range_deparse(char flags, const char *lbound_str,
static char *range_bound_escape(const char *value);
static bool range_contains_internal(TypeCacheEntry *typcache,
RangeType *r1, RangeType *r2);
static bool range_contains_elem_internal(TypeCacheEntry *typcache,
RangeType *r, Datum val);
static Size datum_compute_size(Size sz, Datum datum, bool typbyval,
char typalign, int16 typlen, char typstorage);
static Pointer datum_write(Pointer ptr, Datum datum, bool typbyval,
@@ -555,18 +557,13 @@ range_upper_inf(PG_FUNCTION_ARGS)
Datum
range_contains_elem(PG_FUNCTION_ARGS)
{
RangeType *r1 = PG_GETARG_RANGE(0);
RangeType *r = PG_GETARG_RANGE(0);
Datum val = PG_GETARG_DATUM(1);
TypeCacheEntry *typcache;
RangeType *r2;
typcache = range_get_typcache(fcinfo, RangeTypeGetOid(r1));
typcache = range_get_typcache(fcinfo, RangeTypeGetOid(r));
/* Construct a singleton range representing just "val" */
r2 = make_singleton_range(typcache, val);
/* And use range_contains */
PG_RETURN_BOOL(range_contains_internal(typcache, r1, r2));
PG_RETURN_BOOL(range_contains_elem_internal(typcache, r, val));
}
/* contained by? */
@@ -574,17 +571,12 @@ Datum
elem_contained_by_range(PG_FUNCTION_ARGS)
{
Datum val = PG_GETARG_DATUM(0);
RangeType *r1 = PG_GETARG_RANGE(1);
RangeType *r = PG_GETARG_RANGE(1);
TypeCacheEntry *typcache;
RangeType *r2;
typcache = range_get_typcache(fcinfo, RangeTypeGetOid(r1));
typcache = range_get_typcache(fcinfo, RangeTypeGetOid(r));
/* Construct a singleton range representing just "val" */
r2 = make_singleton_range(typcache, val);
/* And use range_contains */
PG_RETURN_BOOL(range_contains_internal(typcache, r1, r2));
PG_RETURN_BOOL(range_contains_elem_internal(typcache, r, val));
}
@@ -2173,6 +2165,47 @@ range_contains_internal(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2)
return true;
}
/*
* Test whether range r contains a specific element value.
*/
static bool
range_contains_elem_internal(TypeCacheEntry *typcache, RangeType *r, Datum val)
{
RangeBound lower;
RangeBound upper;
bool empty;
int32 cmp;
range_deserialize(typcache, r, &lower, &upper, &empty);
if (empty)
return false;
if (!lower.infinite)
{
cmp = DatumGetInt32(FunctionCall2Coll(&typcache->rng_cmp_proc_finfo,
typcache->rng_collation,
lower.val, val));
if (cmp > 0)
return false;
if (cmp == 0 && !lower.inclusive)
return false;
}
if (!upper.infinite)
{
cmp = DatumGetInt32(FunctionCall2Coll(&typcache->rng_cmp_proc_finfo,
typcache->rng_collation,
upper.val, val));
if (cmp < 0)
return false;
if (cmp == 0 && !upper.inclusive)
return false;
}
return true;
}
/*
* datum_compute_size() and datum_write() are used to insert the bound

View File

@@ -31,7 +31,6 @@
#define RANGESTRAT_CONTAINS 7
#define RANGESTRAT_CONTAINED_BY 8
#define RANGESTRAT_CONTAINS_ELEM 16
#define RANGESTRAT_ELEM_CONTAINED_BY 17
#define RANGESTRAT_EQ 18
#define RANGESTRAT_NE 19
@@ -48,11 +47,11 @@ typedef struct
static RangeType *range_super_union(TypeCacheEntry *typcache, RangeType * r1,
RangeType * r2);
static bool range_gist_consistent_int(FmgrInfo *flinfo,
StrategyNumber strategy, RangeType * key,
RangeType * query);
StrategyNumber strategy, RangeType *key,
Datum query);
static bool range_gist_consistent_leaf(FmgrInfo *flinfo,
StrategyNumber strategy, RangeType * key,
RangeType * query);
StrategyNumber strategy, RangeType *key,
Datum query);
static int sort_item_cmp(const void *a, const void *b);
@@ -61,39 +60,15 @@ Datum
range_gist_consistent(PG_FUNCTION_ARGS)
{
GISTENTRY *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
Datum dquery = PG_GETARG_DATUM(1);
Datum query = PG_GETARG_DATUM(1);
StrategyNumber strategy = (StrategyNumber) PG_GETARG_UINT16(2);
/* Oid subtype = PG_GETARG_OID(3); */
bool *recheck = (bool *) PG_GETARG_POINTER(4);
RangeType *key = DatumGetRangeType(entry->key);
TypeCacheEntry *typcache;
RangeType *query;
/* All operators served by this function are exact */
*recheck = false;
switch (strategy)
{
/*
* For element contains and contained by operators, the other operand
* is a "point" of the subtype. Construct a singleton range
* containing just that value. (Since range_contains_elem and
* elem_contained_by_range would do that anyway, it's actually more
* efficient not less so to merge these cases into range containment
* at this step. But revisit this if we ever change the implementation
* of those functions.)
*/
case RANGESTRAT_CONTAINS_ELEM:
case RANGESTRAT_ELEM_CONTAINED_BY:
typcache = range_get_typcache(fcinfo, RangeTypeGetOid(key));
query = make_singleton_range(typcache, dquery);
break;
default:
query = DatumGetRangeType(dquery);
break;
}
if (GIST_LEAF(entry))
PG_RETURN_BOOL(range_gist_consistent_leaf(fcinfo->flinfo, strategy,
key, query));
@@ -170,21 +145,23 @@ range_gist_penalty(PG_FUNCTION_ARGS)
subtype_diff = &typcache->rng_subdiff_finfo;
/* we want to compare the size of "orig" to size of "orig union new" */
/*
* We want to compare the size of "orig" to size of "orig union new".
* The penalty will be the sum of the reduction in the lower bound plus
* the increase in the upper bound.
*/
s_union = range_super_union(typcache, orig, new);
range_deserialize(typcache, orig, &lower1, &upper1, &empty1);
range_deserialize(typcache, s_union, &lower2, &upper2, &empty2);
/* if orig isn't empty, s_union can't be either */
Assert(empty1 || !empty2);
/* handle cases where orig is empty */
if (empty1 && empty2)
{
*penalty = 0;
PG_RETURN_POINTER(penalty);
}
else if (empty1 && !empty2)
else if (empty1)
{
if (lower2.infinite || upper2.infinite)
{
@@ -199,6 +176,7 @@ range_gist_penalty(PG_FUNCTION_ARGS)
typcache->rng_collation,
upper2.val,
lower2.val));
/* upper2 must be >= lower2 */
if (*penalty < 0)
*penalty = 0; /* subtype_diff is broken */
PG_RETURN_POINTER(penalty);
@@ -211,46 +189,53 @@ range_gist_penalty(PG_FUNCTION_ARGS)
}
}
/* if orig isn't empty, s_union can't be either */
Assert(!empty2);
/* similarly, if orig's lower bound is infinite, s_union's must be too */
Assert(lower2.infinite || !lower1.infinite);
if (lower2.infinite && !lower1.infinite)
lower_diff = get_float8_infinity();
else if (lower2.infinite && lower1.infinite)
if (lower2.infinite && lower1.infinite)
lower_diff = 0;
else if (lower2.infinite)
lower_diff = get_float8_infinity();
else if (OidIsValid(subtype_diff->fn_oid))
{
lower_diff = DatumGetFloat8(FunctionCall2Coll(subtype_diff,
typcache->rng_collation,
lower1.val,
lower2.val));
/* orig's lower bound must be >= s_union's */
if (lower_diff < 0)
lower_diff = 0; /* subtype_diff is broken */
}
else
{
/* only know whether there is a difference or not */
lower_diff = (float) range_cmp_bounds(typcache, &lower1, &lower2);
lower_diff = range_cmp_bounds(typcache, &lower1, &lower2) > 0 ? 1 : 0;
}
/* similarly, if orig's upper bound is infinite, s_union's must be too */
Assert(upper2.infinite || !upper1.infinite);
if (upper2.infinite && !upper1.infinite)
upper_diff = get_float8_infinity();
else if (upper2.infinite && upper1.infinite)
if (upper2.infinite && upper1.infinite)
upper_diff = 0;
else if (upper2.infinite)
upper_diff = get_float8_infinity();
else if (OidIsValid(subtype_diff->fn_oid))
{
upper_diff = DatumGetFloat8(FunctionCall2Coll(subtype_diff,
typcache->rng_collation,
upper2.val,
upper1.val));
/* orig's upper bound must be <= s_union's */
if (upper_diff < 0)
upper_diff = 0; /* subtype_diff is broken */
}
else
{
/* only know whether there is a difference or not */
upper_diff = (float) range_cmp_bounds(typcache, &upper2, &upper1);
upper_diff = range_cmp_bounds(typcache, &upper2, &upper1) > 0 ? 1 : 0;
}
Assert(lower_diff >= 0 && upper_diff >= 0);
@@ -450,7 +435,7 @@ TrickFunctionCall2(PGFunction proc, FmgrInfo *flinfo, Datum arg1, Datum arg2)
*/
static bool
range_gist_consistent_int(FmgrInfo *flinfo, StrategyNumber strategy,
RangeType * key, RangeType * query)
RangeType *key, Datum query)
{
PGFunction proc;
bool negate = false;
@@ -486,22 +471,23 @@ range_gist_consistent_int(FmgrInfo *flinfo, StrategyNumber strategy,
negate = true;
break;
case RANGESTRAT_ADJACENT:
if (RangeIsEmpty(key) || RangeIsEmpty(query))
if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeType(query)))
return false;
if (DatumGetBool(TrickFunctionCall2(range_adjacent, flinfo,
RangeTypeGetDatum(key),
RangeTypeGetDatum(query))))
query)))
return true;
proc = range_overlaps;
break;
case RANGESTRAT_CONTAINS:
case RANGESTRAT_CONTAINS_ELEM:
proc = range_contains;
break;
case RANGESTRAT_CONTAINED_BY:
case RANGESTRAT_ELEM_CONTAINED_BY:
return true;
break;
case RANGESTRAT_CONTAINS_ELEM:
proc = range_contains_elem;
break;
case RANGESTRAT_EQ:
proc = range_contains;
break;
@@ -516,7 +502,7 @@ range_gist_consistent_int(FmgrInfo *flinfo, StrategyNumber strategy,
retval = DatumGetBool(TrickFunctionCall2(proc, flinfo,
RangeTypeGetDatum(key),
RangeTypeGetDatum(query)));
query));
if (negate)
retval = !retval;
@@ -528,48 +514,39 @@ range_gist_consistent_int(FmgrInfo *flinfo, StrategyNumber strategy,
*/
static bool
range_gist_consistent_leaf(FmgrInfo *flinfo, StrategyNumber strategy,
RangeType * key, RangeType * query)
RangeType *key, Datum query)
{
PGFunction proc;
switch (strategy)
{
case RANGESTRAT_BEFORE:
if (RangeIsEmpty(key) || RangeIsEmpty(query))
return false;
proc = range_before;
break;
case RANGESTRAT_OVERLEFT:
if (RangeIsEmpty(key) || RangeIsEmpty(query))
return false;
proc = range_overleft;
break;
case RANGESTRAT_OVERLAPS:
proc = range_overlaps;
break;
case RANGESTRAT_OVERRIGHT:
if (RangeIsEmpty(key) || RangeIsEmpty(query))
return false;
proc = range_overright;
break;
case RANGESTRAT_AFTER:
if (RangeIsEmpty(key) || RangeIsEmpty(query))
return false;
proc = range_after;
break;
case RANGESTRAT_ADJACENT:
if (RangeIsEmpty(key) || RangeIsEmpty(query))
return false;
proc = range_adjacent;
break;
case RANGESTRAT_CONTAINS:
case RANGESTRAT_CONTAINS_ELEM:
proc = range_contains;
break;
case RANGESTRAT_CONTAINED_BY:
case RANGESTRAT_ELEM_CONTAINED_BY:
proc = range_contained_by;
break;
case RANGESTRAT_CONTAINS_ELEM:
proc = range_contains_elem;
break;
case RANGESTRAT_EQ:
proc = range_eq;
break;
@@ -584,7 +561,7 @@ range_gist_consistent_leaf(FmgrInfo *flinfo, StrategyNumber strategy,
return DatumGetBool(TrickFunctionCall2(proc, flinfo,
RangeTypeGetDatum(key),
RangeTypeGetDatum(query)));
query));
}
/*
@@ -649,7 +626,7 @@ sort_item_cmp(const void *a, const void *b)
else if (lower2.infinite)
return 1;
else if (upper1.infinite && upper2.infinite)
return -1 * range_cmp_bounds(typcache, &lower1, &lower2);
return -(range_cmp_bounds(typcache, &lower1, &lower2));
else if (upper1.infinite)
return 1;
else if (upper2.infinite)

View File

@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 201111211
#define CATALOG_VERSION_NO 201111221
#endif

View File

@@ -735,7 +735,6 @@ DATA(insert ( 3919 3831 3831 6 s 3897 783 0 ));
DATA(insert ( 3919 3831 3831 7 s 3890 783 0 ));
DATA(insert ( 3919 3831 3831 8 s 3892 783 0 ));
DATA(insert ( 3919 3831 2283 16 s 3889 783 0 ));
DATA(insert ( 3919 2283 3831 17 s 3891 783 0 ));
DATA(insert ( 3919 3831 3831 18 s 3882 783 0 ));
DATA(insert ( 3919 3831 3831 19 s 3883 783 0 ));

View File

@@ -1040,7 +1040,6 @@ ORDER BY 1, 2, 3;
783 | 14 | @
783 | 15 | <->
783 | 16 | @>
783 | 17 | <@
783 | 18 | =
783 | 19 | <>
783 | 27 | @>
@@ -1055,7 +1054,7 @@ ORDER BY 1, 2, 3;
2742 | 2 | @@@
2742 | 3 | <@
2742 | 4 | =
(45 rows)
(44 rows)
-- Check that all opclass search operators have selectivity estimators.
-- This is not absolutely required, but it seems a reasonable thing