1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-11 10:01:57 +03:00

Add optional compression method to SP-GiST

Patch allows to have different types of column and value stored in leaf tuples
of SP-GiST. The main application of feature is to transform complex column type
to simple indexed type or for truncating too long value, transformation could
be lossy.  Simple example: polygons are converted to their bounding boxes,
this opclass follows.

Authors: me, Heikki Linnakangas, Alexander Korotkov, Nikita Glukhov
Reviewed-By: all authors + Darafei Praliaskouski
Discussions:
https://www.postgresql.org/message-id/5447B3FF.2080406@sigaev.ru
https://www.postgresql.org/message-id/flat/54907069.1030506@sigaev.ru#54907069.1030506@sigaev.ru
This commit is contained in:
Teodor Sigaev
2017-12-22 13:33:16 +03:00
parent 9373baa0f7
commit 854823fa33
7 changed files with 182 additions and 37 deletions

View File

@ -1906,14 +1906,37 @@ spgdoinsert(Relation index, SpGistState *state,
procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
/*
* Since we don't use index_form_tuple in this AM, we have to make sure
* Prepare the leaf datum to insert.
*
* If an optional "compress" method is provided, then call it to form
* the leaf datum from the input datum. Otherwise store the input datum as
* is. Since we don't use index_form_tuple in this AM, we have to make sure
* value to be inserted is not toasted; FormIndexDatum doesn't guarantee
* that.
* that. But we assume the "compress" method to return an untoasted value.
*/
if (!isnull && state->attType.attlen == -1)
datum = PointerGetDatum(PG_DETOAST_DATUM(datum));
if (!isnull)
{
if (OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
{
FmgrInfo *compressProcinfo = NULL;
leafDatum = datum;
compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
leafDatum = FunctionCall1Coll(compressProcinfo,
index->rd_indcollation[0],
datum);
}
else
{
Assert(state->attLeafType.type == state->attType.type);
if (state->attType.attlen == -1)
leafDatum = PointerGetDatum(PG_DETOAST_DATUM(datum));
else
leafDatum = datum;
}
}
else
leafDatum = (Datum) 0;
/*
* Compute space needed for a leaf tuple containing the given datum.
@ -1923,7 +1946,7 @@ spgdoinsert(Relation index, SpGistState *state,
*/
if (!isnull)
leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
SpGistGetTypeSize(&state->attType, leafDatum);
SpGistGetTypeSize(&state->attLeafType, leafDatum);
else
leafSize = SGDTSIZE + sizeof(ItemIdData);
@ -2138,7 +2161,7 @@ spgdoinsert(Relation index, SpGistState *state,
{
leafDatum = out.result.matchNode.restDatum;
leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
SpGistGetTypeSize(&state->attType, leafDatum);
SpGistGetTypeSize(&state->attLeafType, leafDatum);
}
/*

View File

@ -40,7 +40,7 @@ typedef struct ScanStackEntry
static void
freeScanStackEntry(SpGistScanOpaque so, ScanStackEntry *stackEntry)
{
if (!so->state.attType.attbyval &&
if (!so->state.attLeafType.attbyval &&
DatumGetPointer(stackEntry->reconstructedValue) != NULL)
pfree(DatumGetPointer(stackEntry->reconstructedValue));
if (stackEntry->traversalValue)
@ -527,8 +527,8 @@ redirect:
if (out.reconstructedValues)
newEntry->reconstructedValue =
datumCopy(out.reconstructedValues[i],
so->state.attType.attbyval,
so->state.attType.attlen);
so->state.attLeafType.attbyval,
so->state.attLeafType.attlen);
else
newEntry->reconstructedValue = (Datum) 0;

View File

@ -125,6 +125,22 @@ spgGetCache(Relation index)
/* Get the information we need about each relevant datatype */
fillTypeDesc(&cache->attType, atttype);
if (OidIsValid(cache->config.leafType) &&
cache->config.leafType != atttype)
{
if (!OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("compress method must not defined when leaf type is different from input type")));
fillTypeDesc(&cache->attLeafType, cache->config.leafType);
}
else
{
cache->attLeafType = cache->attType;
}
fillTypeDesc(&cache->attPrefixType, cache->config.prefixType);
fillTypeDesc(&cache->attLabelType, cache->config.labelType);
@ -164,6 +180,7 @@ initSpGistState(SpGistState *state, Relation index)
state->config = cache->config;
state->attType = cache->attType;
state->attLeafType = cache->attLeafType;
state->attPrefixType = cache->attPrefixType;
state->attLabelType = cache->attLabelType;
@ -618,7 +635,7 @@ spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr,
/* compute space needed (note result is already maxaligned) */
size = SGLTHDRSZ;
if (!isnull)
size += SpGistGetTypeSize(&state->attType, datum);
size += SpGistGetTypeSize(&state->attLeafType, datum);
/*
* Ensure that we can replace the tuple with a dead tuple later. This
@ -634,7 +651,7 @@ spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr,
tup->nextOffset = InvalidOffsetNumber;
tup->heapPtr = *heapPtr;
if (!isnull)
memcpyDatum(SGLTDATAPTR(tup), &state->attType, datum);
memcpyDatum(SGLTDATAPTR(tup), &state->attLeafType, datum);
return tup;
}

View File

@ -22,6 +22,7 @@
#include "catalog/pg_opfamily.h"
#include "catalog/pg_type.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/regproc.h"
#include "utils/syscache.h"
@ -52,6 +53,10 @@ spgvalidate(Oid opclassoid)
OpFamilyOpFuncGroup *opclassgroup;
int i;
ListCell *lc;
spgConfigIn configIn;
spgConfigOut configOut;
Oid configOutLefttype = InvalidOid;
Oid configOutRighttype = InvalidOid;
/* Fetch opclass information */
classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid));
@ -74,6 +79,7 @@ spgvalidate(Oid opclassoid)
/* Fetch all operators and support functions of the opfamily */
oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid));
proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid));
grouplist = identify_opfamily_groups(oprlist, proclist);
/* Check individual support functions */
for (i = 0; i < proclist->n_members; i++)
@ -100,6 +106,40 @@ spgvalidate(Oid opclassoid)
switch (procform->amprocnum)
{
case SPGIST_CONFIG_PROC:
ok = check_amproc_signature(procform->amproc, VOIDOID, true,
2, 2, INTERNALOID, INTERNALOID);
configIn.attType = procform->amproclefttype;
memset(&configOut, 0, sizeof(configOut));
OidFunctionCall2(procform->amproc,
PointerGetDatum(&configIn),
PointerGetDatum(&configOut));
configOutLefttype = procform->amproclefttype;
configOutRighttype = procform->amprocrighttype;
/*
* When leaf and attribute types are the same, compress function
* is not required and we set corresponding bit in functionset
* for later group consistency check.
*/
if (!OidIsValid(configOut.leafType) ||
configOut.leafType == configIn.attType)
{
foreach(lc, grouplist)
{
OpFamilyOpFuncGroup *group = lfirst(lc);
if (group->lefttype == procform->amproclefttype &&
group->righttype == procform->amprocrighttype)
{
group->functionset |=
((uint64) 1) << SPGIST_COMPRESS_PROC;
break;
}
}
}
break;
case SPGIST_CHOOSE_PROC:
case SPGIST_PICKSPLIT_PROC:
case SPGIST_INNER_CONSISTENT_PROC:
@ -110,6 +150,15 @@ spgvalidate(Oid opclassoid)
ok = check_amproc_signature(procform->amproc, BOOLOID, true,
2, 2, INTERNALOID, INTERNALOID);
break;
case SPGIST_COMPRESS_PROC:
if (configOutLefttype != procform->amproclefttype ||
configOutRighttype != procform->amprocrighttype)
ok = false;
else
ok = check_amproc_signature(procform->amproc,
configOut.leafType, true,
1, 1, procform->amproclefttype);
break;
default:
ereport(INFO,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
@ -178,7 +227,6 @@ spgvalidate(Oid opclassoid)
}
/* Now check for inconsistent groups of operators/functions */
grouplist = identify_opfamily_groups(oprlist, proclist);
opclassgroup = NULL;
foreach(lc, grouplist)
{