1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-09 06:21:09 +03:00

Improve BRIN infra, minmax opclass and regression test

The minmax opclass was using the wrong support functions when
cross-datatypes queries were run.  Instead of trying to fix the
pg_amproc definitions (which apparently is not possible), use the
already correct pg_amop entries instead.  This requires jumping through
more hoops (read: extra syscache lookups) to obtain the underlying
functions to execute, but it is necessary for correctness.

Author: Emre Hasegeli, tweaked by Álvaro
Review: Andreas Karlsson

Also change BrinOpcInfo to record each stored type's typecache entry
instead of just the OID.  Turns out that the full type cache is
necessary in brin_deform_tuple: the original code used the indexed
type's byval and typlen properties to extract the stored tuple, which is
correct in Minmax; but in other implementations that want to store
something different, that's wrong.  The realization that this is a bug
comes from Emre also, but I did not use his patch.

I also adopted Emre's regression test code (with smallish changes),
which is more complete.
This commit is contained in:
Alvaro Herrera
2015-05-07 13:02:22 -03:00
parent 7be47c56af
commit db5f98ab4f
10 changed files with 367 additions and 402 deletions

View File

@@ -15,35 +15,21 @@
#include "access/brin_tuple.h"
#include "access/skey.h"
#include "catalog/pg_type.h"
#include "catalog/pg_amop.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
/*
* Procedure numbers must not collide with BRIN_PROCNUM defines in
* brin_internal.h. Note we only need inequality functions.
*/
#define MINMAX_NUM_PROCNUMS 4 /* # support procs we need */
#define PROCNUM_LESS 11
#define PROCNUM_LESSEQUAL 12
#define PROCNUM_GREATEREQUAL 13
#define PROCNUM_GREATER 14
/*
* Subtract this from procnum to obtain index in MinmaxOpaque arrays
* (Must be equal to minimum of private procnums)
*/
#define PROCNUM_BASE 11
typedef struct MinmaxOpaque
{
FmgrInfo operators[MINMAX_NUM_PROCNUMS];
bool inited[MINMAX_NUM_PROCNUMS];
Oid cached_subtype;
FmgrInfo strategy_procinfos[BTMaxStrategyNumber];
} MinmaxOpaque;
static FmgrInfo *minmax_get_procinfo(BrinDesc *bdesc, uint16 attno,
uint16 procnum);
static FmgrInfo *minmax_get_strategy_procinfo(BrinDesc *bdesc, uint16 attno,
Oid subtype, uint16 strategynum);
Datum
@@ -53,8 +39,8 @@ brin_minmax_opcinfo(PG_FUNCTION_ARGS)
BrinOpcInfo *result;
/*
* opaque->operators is initialized lazily, as indicated by 'inited' which
* is initialized to all false by palloc0.
* opaque->strategy_procinfos is initialized lazily; here it is set to
* all-uninitialized by palloc0 which sets fn_oid to InvalidOid.
*/
result = palloc0(MAXALIGN(SizeofBrinOpcInfo(2)) +
@@ -62,8 +48,8 @@ brin_minmax_opcinfo(PG_FUNCTION_ARGS)
result->oi_nstored = 2;
result->oi_opaque = (MinmaxOpaque *)
MAXALIGN((char *) result + SizeofBrinOpcInfo(2));
result->oi_typids[0] = typoid;
result->oi_typids[1] = typoid;
result->oi_typcache[0] = result->oi_typcache[1] =
lookup_type_cache(typoid, 0);
PG_RETURN_POINTER(result);
}
@@ -122,7 +108,8 @@ brin_minmax_add_value(PG_FUNCTION_ARGS)
* and update them accordingly. First check if it's less than the
* existing minimum.
*/
cmpFn = minmax_get_procinfo(bdesc, attno, PROCNUM_LESS);
cmpFn = minmax_get_strategy_procinfo(bdesc, attno, attr->atttypid,
BTLessStrategyNumber);
compar = FunctionCall2Coll(cmpFn, colloid, newval, column->bv_values[0]);
if (DatumGetBool(compar))
{
@@ -135,7 +122,8 @@ brin_minmax_add_value(PG_FUNCTION_ARGS)
/*
* And now compare it to the existing maximum.
*/
cmpFn = minmax_get_procinfo(bdesc, attno, PROCNUM_GREATER);
cmpFn = minmax_get_strategy_procinfo(bdesc, attno, attr->atttypid,
BTGreaterStrategyNumber);
compar = FunctionCall2Coll(cmpFn, colloid, newval, column->bv_values[1]);
if (DatumGetBool(compar))
{
@@ -159,10 +147,12 @@ brin_minmax_consistent(PG_FUNCTION_ARGS)
BrinDesc *bdesc = (BrinDesc *) PG_GETARG_POINTER(0);
BrinValues *column = (BrinValues *) PG_GETARG_POINTER(1);
ScanKey key = (ScanKey) PG_GETARG_POINTER(2);
Oid colloid = PG_GET_COLLATION();
Oid colloid = PG_GET_COLLATION(),
subtype;
AttrNumber attno;
Datum value;
Datum matches;
FmgrInfo *finfo;
Assert(key->sk_attno == column->bv_attno);
@@ -189,18 +179,16 @@ brin_minmax_consistent(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(false);
attno = key->sk_attno;
subtype = key->sk_subtype;
value = key->sk_argument;
switch (key->sk_strategy)
{
case BTLessStrategyNumber:
matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
PROCNUM_LESS),
colloid, column->bv_values[0], value);
break;
case BTLessEqualStrategyNumber:
matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
PROCNUM_LESSEQUAL),
colloid, column->bv_values[0], value);
finfo = minmax_get_strategy_procinfo(bdesc, attno, subtype,
key->sk_strategy);
matches = FunctionCall2Coll(finfo, colloid, column->bv_values[0],
value);
break;
case BTEqualStrategyNumber:
@@ -209,25 +197,24 @@ brin_minmax_consistent(PG_FUNCTION_ARGS)
* the current page range if the minimum value in the range <=
* scan key, and the maximum value >= scan key.
*/
matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
PROCNUM_LESSEQUAL),
colloid, column->bv_values[0], value);
finfo = minmax_get_strategy_procinfo(bdesc, attno, subtype,
BTLessEqualStrategyNumber);
matches = FunctionCall2Coll(finfo, colloid, column->bv_values[0],
value);
if (!DatumGetBool(matches))
break;
/* max() >= scankey */
matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
PROCNUM_GREATEREQUAL),
colloid, column->bv_values[1], value);
finfo = minmax_get_strategy_procinfo(bdesc, attno, subtype,
BTGreaterEqualStrategyNumber);
matches = FunctionCall2Coll(finfo, colloid, column->bv_values[1],
value);
break;
case BTGreaterEqualStrategyNumber:
matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
PROCNUM_GREATEREQUAL),
colloid, column->bv_values[1], value);
break;
case BTGreaterStrategyNumber:
matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
PROCNUM_GREATER),
colloid, column->bv_values[1], value);
finfo = minmax_get_strategy_procinfo(bdesc, attno, subtype,
key->sk_strategy);
matches = FunctionCall2Coll(finfo, colloid, column->bv_values[1],
value);
break;
default:
/* shouldn't happen */
@@ -252,6 +239,7 @@ brin_minmax_union(PG_FUNCTION_ARGS)
Oid colloid = PG_GET_COLLATION();
AttrNumber attno;
Form_pg_attribute attr;
FmgrInfo *finfo;
bool needsadj;
Assert(col_a->bv_attno == col_b->bv_attno);
@@ -284,9 +272,10 @@ brin_minmax_union(PG_FUNCTION_ARGS)
}
/* Adjust minimum, if B's min is less than A's min */
needsadj = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
PROCNUM_LESS),
colloid, col_b->bv_values[0], col_a->bv_values[0]);
finfo = minmax_get_strategy_procinfo(bdesc, attno, attr->atttypid,
BTLessStrategyNumber);
needsadj = FunctionCall2Coll(finfo, colloid, col_b->bv_values[0],
col_a->bv_values[0]);
if (needsadj)
{
if (!attr->attbyval)
@@ -296,9 +285,10 @@ brin_minmax_union(PG_FUNCTION_ARGS)
}
/* Adjust maximum, if B's max is greater than A's max */
needsadj = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
PROCNUM_GREATER),
colloid, col_b->bv_values[1], col_a->bv_values[1]);
finfo = minmax_get_strategy_procinfo(bdesc, attno, attr->atttypid,
BTGreaterStrategyNumber);
needsadj = FunctionCall2Coll(finfo, colloid, col_b->bv_values[1],
col_a->bv_values[1]);
if (needsadj)
{
if (!attr->attbyval)
@@ -311,27 +301,61 @@ brin_minmax_union(PG_FUNCTION_ARGS)
}
/*
* Return the procedure corresponding to the given function support number.
* Cache and return the procedure for the given strategy.
*/
static FmgrInfo *
minmax_get_procinfo(BrinDesc *bdesc, uint16 attno, uint16 procnum)
FmgrInfo *
minmax_get_strategy_procinfo(BrinDesc *bdesc, uint16 attno, Oid subtype,
uint16 strategynum)
{
MinmaxOpaque *opaque;
uint16 basenum = procnum - PROCNUM_BASE;
Assert(strategynum >= 1 &&
strategynum <= BTMaxStrategyNumber);
opaque = (MinmaxOpaque *) bdesc->bd_info[attno - 1]->oi_opaque;
/*
* We cache these in the opaque struct, to avoid repetitive syscache
* lookups.
* We cache the procedures for the previous subtype in the opaque struct,
* to avoid repetitive syscache lookups. If the subtype changed,
* invalidate all the cached entries.
*/
if (!opaque->inited[basenum])
if (opaque->cached_subtype != subtype)
{
fmgr_info_copy(&opaque->operators[basenum],
index_getprocinfo(bdesc->bd_index, attno, procnum),
bdesc->bd_context);
opaque->inited[basenum] = true;
uint16 i;
for (i = 1; i <= BTMaxStrategyNumber; i++)
opaque->strategy_procinfos[i - 1].fn_oid = InvalidOid;
opaque->cached_subtype = subtype;
}
return &opaque->operators[basenum];
if (opaque->strategy_procinfos[strategynum - 1].fn_oid == InvalidOid)
{
Form_pg_attribute attr;
HeapTuple tuple;
Oid opfamily,
oprid;
bool isNull;
opfamily = bdesc->bd_index->rd_opfamily[attno - 1];
attr = bdesc->bd_tupdesc->attrs[attno - 1];
tuple = SearchSysCache4(AMOPSTRATEGY, ObjectIdGetDatum(opfamily),
ObjectIdGetDatum(attr->atttypid),
ObjectIdGetDatum(subtype),
Int16GetDatum(strategynum));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
strategynum, attr->atttypid, subtype, opfamily);
oprid = DatumGetObjectId(SysCacheGetAttr(AMOPSTRATEGY, tuple,
Anum_pg_amop_amopopr, &isNull));
ReleaseSysCache(tuple);
Assert(!isNull && RegProcedureIsValid(oprid));
fmgr_info_cxt(get_opcode(oprid),
&opaque->strategy_procinfos[strategynum - 1],
bdesc->bd_context);
}
return &opaque->strategy_procinfos[strategynum - 1];
}

View File

@@ -68,7 +68,7 @@ brtuple_disk_tupdesc(BrinDesc *brdesc)
{
for (j = 0; j < brdesc->bd_info[i]->oi_nstored; j++)
TupleDescInitEntry(tupdesc, attno++, NULL,
brdesc->bd_info[i]->oi_typids[j],
brdesc->bd_info[i]->oi_typcache[j]->type_id,
-1, 0);
}
@@ -444,8 +444,8 @@ brin_deform_tuple(BrinDesc *brdesc, BrinTuple *tuple)
for (i = 0; i < brdesc->bd_info[keyno]->oi_nstored; i++)
dtup->bt_columns[keyno].bv_values[i] =
datumCopy(values[valueno++],
brdesc->bd_tupdesc->attrs[keyno]->attbyval,
brdesc->bd_tupdesc->attrs[keyno]->attlen);
brdesc->bd_info[keyno]->oi_typcache[i]->typbyval,
brdesc->bd_info[keyno]->oi_typcache[i]->typlen);
dtup->bt_columns[keyno].bv_hasnulls = hasnulls[keyno];
dtup->bt_columns[keyno].bv_allnulls = false;