1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-12 05:01:15 +03:00

Restructure index access method API to hide most of it at the C level.

This patch reduces pg_am to just two columns, a name and a handler
function.  All the data formerly obtained from pg_am is now provided
in a C struct returned by the handler function.  This is similar to
the designs we've adopted for FDWs and tablesample methods.  There
are multiple advantages.  For one, the index AM's support functions
are now simple C functions, making them faster to call and much less
error-prone, since the C compiler can now check function signatures.
For another, this will make it far more practical to define index access
methods in installable extensions.

A disadvantage is that SQL-level code can no longer see attributes
of index AMs; in particular, some of the crosschecks in the opr_sanity
regression test are no longer possible from SQL.  We've addressed that
by adding a facility for the index AM to perform such checks instead.
(Much more could be done in that line, but for now we're content if the
amvalidate functions more or less replace what opr_sanity used to do.)
We might also want to expose some sort of reporting functionality, but
this patch doesn't do that.

Alexander Korotkov, reviewed by Petr Jelínek, and rather heavily
editorialized on by me.
This commit is contained in:
Tom Lane
2016-01-17 19:36:59 -05:00
parent 8d290c8ec6
commit 65c5fcd353
93 changed files with 2493 additions and 1924 deletions

View File

@@ -13,6 +13,6 @@ top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = brin.o brin_pageops.o brin_revmap.o brin_tuple.o brin_xlog.o \
brin_minmax.o brin_inclusion.o
brin_minmax.o brin_inclusion.o brin_validate.o
include $(top_srcdir)/src/backend/common.mk

View File

@@ -16,22 +16,21 @@
#include "postgres.h"
#include "access/brin.h"
#include "access/brin_internal.h"
#include "access/brin_page.h"
#include "access/brin_pageops.h"
#include "access/brin_xlog.h"
#include "access/reloptions.h"
#include "access/relscan.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "catalog/index.h"
#include "catalog/pg_am.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "utils/index_selfuncs.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
/*
@@ -71,6 +70,50 @@ static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
/*
* BRIN handler function: return IndexAmRoutine with access method parameters
* and callbacks.
*/
Datum
brinhandler(PG_FUNCTION_ARGS)
{
IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
amroutine->amstrategies = 0;
amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
amroutine->amcanorder = false;
amroutine->amcanorderbyop = false;
amroutine->amcanbackward = false;
amroutine->amcanunique = false;
amroutine->amcanmulticol = true;
amroutine->amoptionalkey = true;
amroutine->amsearcharray = false;
amroutine->amsearchnulls = true;
amroutine->amstorage = true;
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = brinbuild;
amroutine->ambuildempty = brinbuildempty;
amroutine->aminsert = brininsert;
amroutine->ambulkdelete = brinbulkdelete;
amroutine->amvacuumcleanup = brinvacuumcleanup;
amroutine->amcanreturn = NULL;
amroutine->amcostestimate = brincostestimate;
amroutine->amoptions = brinoptions;
amroutine->amvalidate = brinvalidate;
amroutine->ambeginscan = brinbeginscan;
amroutine->amrescan = brinrescan;
amroutine->amgettuple = NULL;
amroutine->amgetbitmap = bringetbitmap;
amroutine->amendscan = brinendscan;
amroutine->ammarkpos = NULL;
amroutine->amrestrpos = NULL;
PG_RETURN_POINTER(amroutine);
}
/*
* A tuple in the heap is being inserted. To keep a brin index up to date,
* we need to obtain the relevant index tuple and compare its stored values
@@ -80,15 +123,11 @@ static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
* If the range is not currently summarized (i.e. the revmap returns NULL for
* it), there's nothing to do.
*/
Datum
brininsert(PG_FUNCTION_ARGS)
bool
brininsert(Relation idxRel, Datum *values, bool *nulls,
ItemPointer heaptid, Relation heapRel,
IndexUniqueCheck checkUnique)
{
Relation idxRel = (Relation) PG_GETARG_POINTER(0);
Datum *values = (Datum *) PG_GETARG_POINTER(1);
bool *nulls = (bool *) PG_GETARG_POINTER(2);
ItemPointer heaptid = (ItemPointer) PG_GETARG_POINTER(3);
/* we ignore the rest of our arguments */
BlockNumber pagesPerRange;
BrinDesc *bdesc = NULL;
BrinRevmap *revmap;
@@ -226,7 +265,7 @@ brininsert(PG_FUNCTION_ARGS)
MemoryContextDelete(tupcxt);
}
return BoolGetDatum(false);
return false;
}
/*
@@ -236,12 +275,9 @@ brininsert(PG_FUNCTION_ARGS)
* index was built with. Note that since this cannot be changed while we're
* holding lock on index, it's not necessary to recompute it during brinrescan.
*/
Datum
brinbeginscan(PG_FUNCTION_ARGS)
IndexScanDesc
brinbeginscan(Relation r, int nkeys, int norderbys)
{
Relation r = (Relation) PG_GETARG_POINTER(0);
int nkeys = PG_GETARG_INT32(1);
int norderbys = PG_GETARG_INT32(2);
IndexScanDesc scan;
BrinOpaque *opaque;
@@ -252,7 +288,7 @@ brinbeginscan(PG_FUNCTION_ARGS)
opaque->bo_bdesc = brin_build_desc(r);
scan->opaque = opaque;
PG_RETURN_POINTER(scan);
return scan;
}
/*
@@ -267,11 +303,9 @@ brinbeginscan(PG_FUNCTION_ARGS)
* unsummarized. Pages in those ranges need to be returned regardless of scan
* keys.
*/
Datum
bringetbitmap(PG_FUNCTION_ARGS)
int64
bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
TIDBitmap *tbm = (TIDBitmap *) PG_GETARG_POINTER(1);
Relation idxRel = scan->indexRelation;
Buffer buf = InvalidBuffer;
BrinDesc *bdesc;
@@ -451,20 +485,16 @@ bringetbitmap(PG_FUNCTION_ARGS)
* returns, but we don't have a precise idea of the number of heap tuples
* involved.
*/
PG_RETURN_INT64(totalpages * 10);
return totalpages * 10;
}
/*
* Re-initialize state for a BRIN index scan
*/
Datum
brinrescan(PG_FUNCTION_ARGS)
void
brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
ScanKey orderbys, int norderbys)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanKey scankey = (ScanKey) PG_GETARG_POINTER(1);
/* other arguments ignored */
/*
* Other index AMs preprocess the scan keys at this point, or sometime
* early during the scan; this lets them optimize by removing redundant
@@ -476,38 +506,19 @@ brinrescan(PG_FUNCTION_ARGS)
if (scankey && scan->numberOfKeys > 0)
memmove(scan->keyData, scankey,
scan->numberOfKeys * sizeof(ScanKeyData));
PG_RETURN_VOID();
}
/*
* Close down a BRIN index scan
*/
Datum
brinendscan(PG_FUNCTION_ARGS)
void
brinendscan(IndexScanDesc scan)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
brinRevmapTerminate(opaque->bo_rmAccess);
brin_free_desc(opaque->bo_bdesc);
pfree(opaque);
PG_RETURN_VOID();
}
Datum
brinmarkpos(PG_FUNCTION_ARGS)
{
elog(ERROR, "BRIN does not support mark/restore");
PG_RETURN_VOID();
}
Datum
brinrestrpos(PG_FUNCTION_ARGS)
{
elog(ERROR, "BRIN does not support mark/restore");
PG_RETURN_VOID();
}
/*
@@ -579,12 +590,9 @@ brinbuildCallback(Relation index,
/*
* brinbuild() -- build a new BRIN index.
*/
Datum
brinbuild(PG_FUNCTION_ARGS)
IndexBuildResult *
brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
Relation heap = (Relation) PG_GETARG_POINTER(0);
Relation index = (Relation) PG_GETARG_POINTER(1);
IndexInfo *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
IndexBuildResult *result;
double reltuples;
double idxtuples;
@@ -663,13 +671,12 @@ brinbuild(PG_FUNCTION_ARGS)
result->heap_tuples = reltuples;
result->index_tuples = idxtuples;
PG_RETURN_POINTER(result);
return result;
}
Datum
brinbuildempty(PG_FUNCTION_ARGS)
void
brinbuildempty(Relation index)
{
Relation index = (Relation) PG_GETARG_POINTER(0);
Buffer metabuf;
/* An empty BRIN index has a metapage only. */
@@ -686,8 +693,6 @@ brinbuildempty(PG_FUNCTION_ARGS)
END_CRIT_SECTION();
UnlockReleaseBuffer(metabuf);
PG_RETURN_VOID();
}
/*
@@ -699,35 +704,29 @@ brinbuildempty(PG_FUNCTION_ARGS)
* tuple is deleted), meaning the need to re-run summarization on the affected
* range. Would need to add an extra flag in brintuples for that.
*/
Datum
brinbulkdelete(PG_FUNCTION_ARGS)
IndexBulkDeleteResult *
brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
IndexBulkDeleteCallback callback, void *callback_state)
{
/* other arguments are not currently used */
IndexBulkDeleteResult *stats =
(IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
/* allocate stats if first time through, else re-use existing struct */
if (stats == NULL)
stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
PG_RETURN_POINTER(stats);
return stats;
}
/*
* This routine is in charge of "vacuuming" a BRIN index: we just summarize
* ranges that are currently unsummarized.
*/
Datum
brinvacuumcleanup(PG_FUNCTION_ARGS)
IndexBulkDeleteResult *
brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
IndexBulkDeleteResult *stats =
(IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
Relation heapRel;
/* No-op in ANALYZE ONLY mode */
if (info->analyze_only)
PG_RETURN_POINTER(stats);
return stats;
if (!stats)
stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
@@ -744,17 +743,15 @@ brinvacuumcleanup(PG_FUNCTION_ARGS)
heap_close(heapRel, AccessShareLock);
PG_RETURN_POINTER(stats);
return stats;
}
/*
* reloptions processor for BRIN indexes
*/
Datum
brinoptions(PG_FUNCTION_ARGS)
bytea *
brinoptions(Datum reloptions, bool validate)
{
Datum reloptions = PG_GETARG_DATUM(0);
bool validate = PG_GETARG_BOOL(1);
relopt_value *options;
BrinOptions *rdopts;
int numoptions;
@@ -767,7 +764,7 @@ brinoptions(PG_FUNCTION_ARGS)
/* if none set, we're done */
if (numoptions == 0)
PG_RETURN_NULL();
return NULL;
rdopts = allocateReloptStruct(sizeof(BrinOptions), options, numoptions);
@@ -776,7 +773,7 @@ brinoptions(PG_FUNCTION_ARGS)
pfree(options);
PG_RETURN_BYTEA_P(rdopts);
return (bytea *) rdopts;
}
/*

View File

@@ -0,0 +1,152 @@
/*-------------------------------------------------------------------------
*
* brin_validate.c
* Opclass validator for BRIN.
*
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/access/brin/brin_validate.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/brin_internal.h"
#include "access/htup_details.h"
#include "catalog/pg_amop.h"
#include "catalog/pg_amproc.h"
#include "catalog/pg_opclass.h"
#include "utils/catcache.h"
#include "utils/syscache.h"
/*
* Validator for a BRIN opclass.
*/
bool
brinvalidate(Oid opclassoid)
{
HeapTuple classtup;
Form_pg_opclass classform;
Oid opfamilyoid;
Oid opcintype;
int numclassops;
int32 classfuncbits;
CatCList *proclist,
*oprlist;
int i,
j;
/* Fetch opclass information */
classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid));
if (!HeapTupleIsValid(classtup))
elog(ERROR, "cache lookup failed for operator class %u", opclassoid);
classform = (Form_pg_opclass) GETSTRUCT(classtup);
opfamilyoid = classform->opcfamily;
opcintype = classform->opcintype;
ReleaseSysCache(classtup);
/* Fetch all operators and support functions of the opfamily */
oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid));
proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid));
/* We'll track the ops and functions belonging to the named opclass */
numclassops = 0;
classfuncbits = 0;
/* Check support functions */
for (i = 0; i < proclist->n_members; i++)
{
HeapTuple proctup = &proclist->members[i]->tuple;
Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);
/* Check that only allowed procedure numbers exist */
if (procform->amprocnum < 1 ||
procform->amprocnum > BRIN_LAST_OPTIONAL_PROCNUM)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("brin opfamily %u contains invalid support number %d for procedure %u",
opfamilyoid,
procform->amprocnum, procform->amproc)));
/* Remember functions that are specifically for the named opclass */
if (procform->amproclefttype == opcintype &&
procform->amprocrighttype == opcintype)
classfuncbits |= (1 << procform->amprocnum);
}
/* Check operators */
for (i = 0; i < oprlist->n_members; i++)
{
HeapTuple oprtup = &oprlist->members[i]->tuple;
Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup);
bool found = false;
/* TODO: Check that only allowed strategy numbers exist */
if (oprform->amopstrategy < 1)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("brin opfamily %u contains invalid strategy number %d for operator %u",
opfamilyoid,
oprform->amopstrategy, oprform->amopopr)));
/* TODO: check more thoroughly for missing support functions */
for (j = 0; j < proclist->n_members; j++)
{
HeapTuple proctup = &proclist->members[j]->tuple;
Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);
/* note only the operator's lefttype matters */
if (procform->amproclefttype == oprform->amoplefttype &&
procform->amprocrighttype == oprform->amoplefttype)
{
found = true;
break;
}
}
if (!found)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("brin opfamily %u lacks support function for operator %u",
opfamilyoid, oprform->amopopr)));
/* brin doesn't support ORDER BY operators */
if (oprform->amoppurpose != AMOP_SEARCH ||
OidIsValid(oprform->amopsortfamily))
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("brin opfamily %u contains invalid ORDER BY specification for operator %u",
opfamilyoid, oprform->amopopr)));
/* Count operators that are specifically for the named opclass */
if (oprform->amoplefttype == opcintype &&
oprform->amoprighttype == opcintype)
numclassops++;
}
/* Check that the named opclass is complete */
if (numclassops == 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("brin opclass %u is missing operator(s)",
opclassoid)));
for (i = 1; i <= BRIN_MANDATORY_NPROCS; i++)
{
if ((classfuncbits & (1 << i)) != 0)
continue; /* got it */
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("brin opclass %u is missing required support function %d",
opclassoid, i)));
}
ReleaseCatCacheList(proclist);
ReleaseCatCacheList(oprlist);
return true;
}

View File

@@ -887,11 +887,13 @@ untransformRelOptions(Datum options)
* other uses, consider grabbing the rd_options pointer from the relcache entry
* instead.
*
* tupdesc is pg_class' tuple descriptor. amoptions is the amoptions regproc
* in the case of the tuple corresponding to an index, or InvalidOid otherwise.
* tupdesc is pg_class' tuple descriptor. amoptions is a pointer to the index
* AM's options parser function in the case of a tuple corresponding to an
* index, or NULL otherwise.
*/
bytea *
extractRelOptions(HeapTuple tuple, TupleDesc tupdesc, Oid amoptions)
extractRelOptions(HeapTuple tuple, TupleDesc tupdesc,
amoptions_function amoptions)
{
bytea *options;
bool isnull;
@@ -1374,39 +1376,20 @@ heap_reloptions(char relkind, Datum reloptions, bool validate)
/*
* Parse options for indexes.
*
* amoptions Oid of option parser
* amoptions index AM's option parser function
* reloptions options as text[] datum
* validate error flag
*/
bytea *
index_reloptions(RegProcedure amoptions, Datum reloptions, bool validate)
index_reloptions(amoptions_function amoptions, Datum reloptions, bool validate)
{
FmgrInfo flinfo;
FunctionCallInfoData fcinfo;
Datum result;
Assert(RegProcedureIsValid(amoptions));
Assert(amoptions != NULL);
/* Assume function is strict */
if (!PointerIsValid(DatumGetPointer(reloptions)))
return NULL;
/* Can't use OidFunctionCallN because we might get a NULL result */
fmgr_info(amoptions, &flinfo);
InitFunctionCallInfoData(fcinfo, &flinfo, 2, InvalidOid, NULL, NULL);
fcinfo.arg[0] = reloptions;
fcinfo.arg[1] = BoolGetDatum(validate);
fcinfo.argnull[0] = false;
fcinfo.argnull[1] = false;
result = FunctionCallInvoke(&fcinfo);
if (fcinfo.isnull || DatumGetPointer(result) == NULL)
return NULL;
return DatumGetByteaP(result);
return amoptions(reloptions, validate);
}
/*

View File

@@ -14,6 +14,6 @@ include $(top_builddir)/src/Makefile.global
OBJS = ginutil.o gininsert.o ginxlog.o ginentrypage.o gindatapage.o \
ginbtree.o ginscan.o ginget.o ginvacuum.o ginarrayproc.o \
ginbulk.o ginfast.o ginpostinglist.o ginlogic.o
ginbulk.o ginfast.o ginpostinglist.o ginlogic.o ginvalidate.o
include $(top_srcdir)/src/backend/common.mk

View File

@@ -1772,11 +1772,9 @@ scanPendingInsert(IndexScanDesc scan, TIDBitmap *tbm, int64 *ntids)
#define GinIsVoidRes(s) ( ((GinScanOpaque) scan->opaque)->isVoidRes )
Datum
gingetbitmap(PG_FUNCTION_ARGS)
int64
gingetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
TIDBitmap *tbm = (TIDBitmap *) PG_GETARG_POINTER(1);
GinScanOpaque so = (GinScanOpaque) scan->opaque;
int64 ntids;
ItemPointerData iptr;
@@ -1790,7 +1788,7 @@ gingetbitmap(PG_FUNCTION_ARGS)
ginNewScanKey(scan);
if (GinIsVoidRes(scan))
PG_RETURN_INT64(0);
return 0;
ntids = 0;
@@ -1827,5 +1825,5 @@ gingetbitmap(PG_FUNCTION_ARGS)
ntids++;
}
PG_RETURN_INT64(ntids);
return ntids;
}

View File

@@ -306,12 +306,9 @@ ginBuildCallback(Relation index, HeapTuple htup, Datum *values,
MemoryContextSwitchTo(oldCtx);
}
Datum
ginbuild(PG_FUNCTION_ARGS)
IndexBuildResult *
ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
Relation heap = (Relation) PG_GETARG_POINTER(0);
Relation index = (Relation) PG_GETARG_POINTER(1);
IndexInfo *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
IndexBuildResult *result;
double reltuples;
GinBuildState buildstate;
@@ -429,16 +426,15 @@ ginbuild(PG_FUNCTION_ARGS)
result->heap_tuples = reltuples;
result->index_tuples = buildstate.indtuples;
PG_RETURN_POINTER(result);
return result;
}
/*
* ginbuildempty() -- build an empty gin index in the initialization fork
*/
Datum
ginbuildempty(PG_FUNCTION_ARGS)
void
ginbuildempty(Relation index)
{
Relation index = (Relation) PG_GETARG_POINTER(0);
Buffer RootBuffer,
MetaBuffer;
@@ -463,8 +459,6 @@ ginbuildempty(PG_FUNCTION_ARGS)
/* Unlock and release the buffers. */
UnlockReleaseBuffer(MetaBuffer);
UnlockReleaseBuffer(RootBuffer);
PG_RETURN_VOID();
}
/*
@@ -489,18 +483,11 @@ ginHeapTupleInsert(GinState *ginstate, OffsetNumber attnum,
item, 1, NULL);
}
Datum
gininsert(PG_FUNCTION_ARGS)
bool
gininsert(Relation index, Datum *values, bool *isnull,
ItemPointer ht_ctid, Relation heapRel,
IndexUniqueCheck checkUnique)
{
Relation index = (Relation) PG_GETARG_POINTER(0);
Datum *values = (Datum *) PG_GETARG_POINTER(1);
bool *isnull = (bool *) PG_GETARG_POINTER(2);
ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
#ifdef NOT_USED
Relation heapRel = (Relation) PG_GETARG_POINTER(4);
IndexUniqueCheck checkUnique = (IndexUniqueCheck) PG_GETARG_INT32(5);
#endif
GinState ginstate;
MemoryContext oldCtx;
MemoryContext insertCtx;
@@ -541,5 +528,5 @@ gininsert(PG_FUNCTION_ARGS)
MemoryContextSwitchTo(oldCtx);
MemoryContextDelete(insertCtx);
PG_RETURN_BOOL(false);
return false;
}

View File

@@ -21,12 +21,9 @@
#include "utils/rel.h"
Datum
ginbeginscan(PG_FUNCTION_ARGS)
IndexScanDesc
ginbeginscan(Relation rel, int nkeys, int norderbys)
{
Relation rel = (Relation) PG_GETARG_POINTER(0);
int nkeys = PG_GETARG_INT32(1);
int norderbys = PG_GETARG_INT32(2);
IndexScanDesc scan;
GinScanOpaque so;
@@ -53,7 +50,7 @@ ginbeginscan(PG_FUNCTION_ARGS)
scan->opaque = so;
PG_RETURN_POINTER(scan);
return scan;
}
/*
@@ -417,13 +414,10 @@ ginNewScanKey(IndexScanDesc scan)
pgstat_count_index_scan(scan->indexRelation);
}
Datum
ginrescan(PG_FUNCTION_ARGS)
void
ginrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
ScanKey orderbys, int norderbys)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanKey scankey = (ScanKey) PG_GETARG_POINTER(1);
/* remaining arguments are ignored */
GinScanOpaque so = (GinScanOpaque) scan->opaque;
ginFreeScanKeys(so);
@@ -433,15 +427,12 @@ ginrescan(PG_FUNCTION_ARGS)
memmove(scan->keyData, scankey,
scan->numberOfKeys * sizeof(ScanKeyData));
}
PG_RETURN_VOID();
}
Datum
ginendscan(PG_FUNCTION_ARGS)
void
ginendscan(IndexScanDesc scan)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
GinScanOpaque so = (GinScanOpaque) scan->opaque;
ginFreeScanKeys(so);
@@ -450,20 +441,4 @@ ginendscan(PG_FUNCTION_ARGS)
MemoryContextDelete(so->keyCtx);
pfree(so);
PG_RETURN_VOID();
}
Datum
ginmarkpos(PG_FUNCTION_ARGS)
{
elog(ERROR, "GIN does not support mark/restore");
PG_RETURN_VOID();
}
Datum
ginrestrpos(PG_FUNCTION_ARGS)
{
elog(ERROR, "GIN does not support mark/restore");
PG_RETURN_VOID();
}

View File

@@ -1,7 +1,7 @@
/*-------------------------------------------------------------------------
*
* ginutil.c
* utilities routines for the postgres inverted index access method.
* Utility routines for the Postgres inverted index access method.
*
*
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
@@ -22,8 +22,53 @@
#include "miscadmin.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "utils/index_selfuncs.h"
/*
* GIN handler function: return IndexAmRoutine with access method parameters
* and callbacks.
*/
Datum
ginhandler(PG_FUNCTION_ARGS)
{
IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
amroutine->amstrategies = 0;
amroutine->amsupport = 6;
amroutine->amcanorder = false;
amroutine->amcanorderbyop = false;
amroutine->amcanbackward = false;
amroutine->amcanunique = false;
amroutine->amcanmulticol = true;
amroutine->amoptionalkey = true;
amroutine->amsearcharray = false;
amroutine->amsearchnulls = false;
amroutine->amstorage = true;
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = ginbuild;
amroutine->ambuildempty = ginbuildempty;
amroutine->aminsert = gininsert;
amroutine->ambulkdelete = ginbulkdelete;
amroutine->amvacuumcleanup = ginvacuumcleanup;
amroutine->amcanreturn = NULL;
amroutine->amcostestimate = gincostestimate;
amroutine->amoptions = ginoptions;
amroutine->amvalidate = ginvalidate;
amroutine->ambeginscan = ginbeginscan;
amroutine->amrescan = ginrescan;
amroutine->amgettuple = NULL;
amroutine->amgetbitmap = gingetbitmap;
amroutine->amendscan = ginendscan;
amroutine->ammarkpos = NULL;
amroutine->amrestrpos = NULL;
PG_RETURN_POINTER(amroutine);
}
/*
* initGinState: fill in an empty GinState struct to describe the index
*
@@ -516,11 +561,9 @@ ginExtractEntries(GinState *ginstate, OffsetNumber attnum,
return entries;
}
Datum
ginoptions(PG_FUNCTION_ARGS)
bytea *
ginoptions(Datum reloptions, bool validate)
{
Datum reloptions = PG_GETARG_DATUM(0);
bool validate = PG_GETARG_BOOL(1);
relopt_value *options;
GinOptions *rdopts;
int numoptions;
@@ -535,7 +578,7 @@ ginoptions(PG_FUNCTION_ARGS)
/* if none set, we're done */
if (numoptions == 0)
PG_RETURN_NULL();
return NULL;
rdopts = allocateReloptStruct(sizeof(GinOptions), options, numoptions);
@@ -544,7 +587,7 @@ ginoptions(PG_FUNCTION_ARGS)
pfree(options);
PG_RETURN_BYTEA_P(rdopts);
return (bytea *) rdopts;
}
/*

View File

@@ -513,13 +513,10 @@ ginVacuumEntryPage(GinVacuumState *gvs, Buffer buffer, BlockNumber *roots, uint3
return (tmppage == origpage) ? NULL : tmppage;
}
Datum
ginbulkdelete(PG_FUNCTION_ARGS)
IndexBulkDeleteResult *
ginbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
IndexBulkDeleteCallback callback, void *callback_state)
{
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
void *callback_state = (void *) PG_GETARG_POINTER(3);
Relation index = info->index;
BlockNumber blkno = GIN_ROOT_BLKNO;
GinVacuumState gvs;
@@ -634,14 +631,12 @@ ginbulkdelete(PG_FUNCTION_ARGS)
MemoryContextDelete(gvs.tmpCxt);
PG_RETURN_POINTER(gvs.result);
return gvs.result;
}
Datum
ginvacuumcleanup(PG_FUNCTION_ARGS)
IndexBulkDeleteResult *
ginvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
Relation index = info->index;
bool needLock;
BlockNumber npages,
@@ -661,7 +656,7 @@ ginvacuumcleanup(PG_FUNCTION_ARGS)
initGinState(&ginstate, index);
ginInsertCleanup(&ginstate, true, true, stats);
}
PG_RETURN_POINTER(stats);
return stats;
}
/*
@@ -746,5 +741,5 @@ ginvacuumcleanup(PG_FUNCTION_ARGS)
if (needLock)
UnlockRelationForExtension(index, ExclusiveLock);
PG_RETURN_POINTER(stats);
return stats;
}

View File

@@ -0,0 +1,145 @@
/*-------------------------------------------------------------------------
*
* ginvalidate.c
* Opclass validator for GIN.
*
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/access/gin/ginvalidate.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/gin_private.h"
#include "access/htup_details.h"
#include "catalog/pg_amop.h"
#include "catalog/pg_amproc.h"
#include "catalog/pg_opclass.h"
#include "utils/catcache.h"
#include "utils/syscache.h"
/*
* Validator for a GIN opclass.
*/
bool
ginvalidate(Oid opclassoid)
{
HeapTuple classtup;
Form_pg_opclass classform;
Oid opfamilyoid;
Oid opcintype;
int numclassops;
int32 classfuncbits;
CatCList *proclist,
*oprlist;
int i;
/* Fetch opclass information */
classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid));
if (!HeapTupleIsValid(classtup))
elog(ERROR, "cache lookup failed for operator class %u", opclassoid);
classform = (Form_pg_opclass) GETSTRUCT(classtup);
opfamilyoid = classform->opcfamily;
opcintype = classform->opcintype;
ReleaseSysCache(classtup);
/* Fetch all operators and support functions of the opfamily */
oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid));
proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid));
/* We'll track the ops and functions belonging to the named opclass */
numclassops = 0;
classfuncbits = 0;
/* Check support functions */
for (i = 0; i < proclist->n_members; i++)
{
HeapTuple proctup = &proclist->members[i]->tuple;
Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);
/* Check that only allowed procedure numbers exist */
if (procform->amprocnum < 1 ||
procform->amprocnum > GINNProcs)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("gin opfamily %u contains invalid support number %d for procedure %u",
opfamilyoid,
procform->amprocnum, procform->amproc)));
/* Remember functions that are specifically for the named opclass */
if (procform->amproclefttype == opcintype &&
procform->amprocrighttype == opcintype)
classfuncbits |= (1 << procform->amprocnum);
}
/* Check operators */
for (i = 0; i < oprlist->n_members; i++)
{
HeapTuple oprtup = &oprlist->members[i]->tuple;
Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup);
/* TODO: Check that only allowed strategy numbers exist */
if (oprform->amopstrategy < 1)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("gin opfamily %u contains invalid strategy number %d for operator %u",
opfamilyoid,
oprform->amopstrategy, oprform->amopopr)));
/* gin doesn't support ORDER BY operators */
if (oprform->amoppurpose != AMOP_SEARCH ||
OidIsValid(oprform->amopsortfamily))
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("gin opfamily %u contains invalid ORDER BY specification for operator %u",
opfamilyoid, oprform->amopopr)));
/* Count operators that are specifically for the named opclass */
if (oprform->amoplefttype == opcintype &&
oprform->amoprighttype == opcintype)
numclassops++;
}
/* Check that the named opclass is complete */
/* XXX needs work: we need to detect applicability of ANYARRAY operators */
#ifdef NOT_USED
if (numclassops == 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("gin opclass %u is missing operator(s)",
opclassoid)));
#endif
for (i = 1; i <= GINNProcs; i++)
{
if ((classfuncbits & (1 << i)) != 0)
continue; /* got it */
if (i == GIN_COMPARE_PARTIAL_PROC)
continue; /* optional method */
if (i == GIN_CONSISTENT_PROC || i == GIN_TRICONSISTENT_PROC)
continue; /* don't need to have both, see check below
* loop */
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("gin opclass %u is missing required support function %d",
opclassoid, i)));
}
if ((classfuncbits & (1 << GIN_CONSISTENT_PROC)) == 0 &&
(classfuncbits & (1 << GIN_TRICONSISTENT_PROC)) == 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("gin opclass %u is missing required support function",
opclassoid)));
ReleaseCatCacheList(proclist);
ReleaseCatCacheList(oprlist);
return true;
}

View File

@@ -13,6 +13,6 @@ top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = gist.o gistutil.o gistxlog.o gistvacuum.o gistget.o gistscan.o \
gistproc.o gistsplit.o gistbuild.o gistbuildbuffers.o
gistproc.o gistsplit.o gistbuild.o gistbuildbuffers.o gistvalidate.o
include $(top_srcdir)/src/backend/common.mk

View File

@@ -14,17 +14,15 @@
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/gist_private.h"
#include "access/xloginsert.h"
#include "catalog/index.h"
#include "access/gistscan.h"
#include "catalog/pg_collation.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/indexfsm.h"
#include "utils/index_selfuncs.h"
#include "utils/memutils.h"
#include "utils/rel.h"
/* non-export function prototypes */
static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
static bool gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
@@ -49,6 +47,50 @@ static void gistvacuumpage(Relation rel, Page page, Buffer buffer);
} while(0)
/*
* GiST handler function: return IndexAmRoutine with access method parameters
* and callbacks.
*/
Datum
gisthandler(PG_FUNCTION_ARGS)
{
IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
amroutine->amstrategies = 0;
amroutine->amsupport = 9;
amroutine->amcanorder = false;
amroutine->amcanorderbyop = true;
amroutine->amcanbackward = false;
amroutine->amcanunique = false;
amroutine->amcanmulticol = true;
amroutine->amoptionalkey = true;
amroutine->amsearcharray = false;
amroutine->amsearchnulls = true;
amroutine->amstorage = true;
amroutine->amclusterable = true;
amroutine->ampredlocks = false;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = gistbuild;
amroutine->ambuildempty = gistbuildempty;
amroutine->aminsert = gistinsert;
amroutine->ambulkdelete = gistbulkdelete;
amroutine->amvacuumcleanup = gistvacuumcleanup;
amroutine->amcanreturn = gistcanreturn;
amroutine->amcostestimate = gistcostestimate;
amroutine->amoptions = gistoptions;
amroutine->amvalidate = gistvalidate;
amroutine->ambeginscan = gistbeginscan;
amroutine->amrescan = gistrescan;
amroutine->amgettuple = gistgettuple;
amroutine->amgetbitmap = gistgetbitmap;
amroutine->amendscan = gistendscan;
amroutine->ammarkpos = NULL;
amroutine->amrestrpos = NULL;
PG_RETURN_POINTER(amroutine);
}
/*
* Create and return a temporary memory context for use by GiST. We
* _always_ invoke user-provided methods in a temporary memory
@@ -70,10 +112,9 @@ createTempGistContext(void)
/*
* gistbuildempty() -- build an empty gist index in the initialization fork
*/
Datum
gistbuildempty(PG_FUNCTION_ARGS)
void
gistbuildempty(Relation index)
{
Relation index = (Relation) PG_GETARG_POINTER(0);
Buffer buffer;
/* Initialize the root page */
@@ -89,8 +130,6 @@ gistbuildempty(PG_FUNCTION_ARGS)
/* Unlock and release the buffer */
UnlockReleaseBuffer(buffer);
PG_RETURN_VOID();
}
/*
@@ -99,18 +138,11 @@ gistbuildempty(PG_FUNCTION_ARGS)
* This is the public interface routine for tuple insertion in GiSTs.
* It doesn't do any work; just locks the relation and passes the buck.
*/
Datum
gistinsert(PG_FUNCTION_ARGS)
bool
gistinsert(Relation r, Datum *values, bool *isnull,
ItemPointer ht_ctid, Relation heapRel,
IndexUniqueCheck checkUnique)
{
Relation r = (Relation) PG_GETARG_POINTER(0);
Datum *values = (Datum *) PG_GETARG_POINTER(1);
bool *isnull = (bool *) PG_GETARG_POINTER(2);
ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
#ifdef NOT_USED
Relation heapRel = (Relation) PG_GETARG_POINTER(4);
IndexUniqueCheck checkUnique = (IndexUniqueCheck) PG_GETARG_INT32(5);
#endif
IndexTuple itup;
GISTSTATE *giststate;
MemoryContext oldCxt;
@@ -136,7 +168,7 @@ gistinsert(PG_FUNCTION_ARGS)
MemoryContextSwitchTo(oldCxt);
freeGISTstate(giststate);
PG_RETURN_BOOL(false);
return false;
}

View File

@@ -109,12 +109,9 @@ static BlockNumber gistGetParent(GISTBuildState *buildstate, BlockNumber child);
* but switches to more efficient buffering build algorithm after a certain
* number of tuples (unless buffering mode is disabled).
*/
Datum
gistbuild(PG_FUNCTION_ARGS)
IndexBuildResult *
gistbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
Relation heap = (Relation) PG_GETARG_POINTER(0);
Relation index = (Relation) PG_GETARG_POINTER(1);
IndexInfo *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
IndexBuildResult *result;
double reltuples;
GISTBuildState buildstate;
@@ -232,7 +229,7 @@ gistbuild(PG_FUNCTION_ARGS)
result->heap_tuples = reltuples;
result->index_tuples = (double) buildstate.indtuples;
PG_RETURN_POINTER(result);
return result;
}
/*

View File

@@ -618,18 +618,16 @@ getNextNearest(IndexScanDesc scan)
/*
* gistgettuple() -- Get the next tuple in the scan
*/
Datum
gistgettuple(PG_FUNCTION_ARGS)
bool
gistgettuple(IndexScanDesc scan, ScanDirection dir)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
GISTScanOpaque so = (GISTScanOpaque) scan->opaque;
if (dir != ForwardScanDirection)
elog(ERROR, "GiST only supports forward scan direction");
if (!so->qual_ok)
PG_RETURN_BOOL(false);
return false;
if (so->firstCall)
{
@@ -651,7 +649,7 @@ gistgettuple(PG_FUNCTION_ARGS)
if (scan->numberOfOrderBys > 0)
{
/* Must fetch tuples in strict distance order */
PG_RETURN_BOOL(getNextNearest(scan));
return getNextNearest(scan);
}
else
{
@@ -688,7 +686,7 @@ gistgettuple(PG_FUNCTION_ARGS)
so->curPageData++;
PG_RETURN_BOOL(true);
return true;
}
/*
@@ -726,7 +724,7 @@ gistgettuple(PG_FUNCTION_ARGS)
item = getNextGISTSearchItem(so);
if (!item)
PG_RETURN_BOOL(false);
return false;
CHECK_FOR_INTERRUPTS();
@@ -750,17 +748,15 @@ gistgettuple(PG_FUNCTION_ARGS)
/*
* gistgetbitmap() -- Get a bitmap of all heap tuple locations
*/
Datum
gistgetbitmap(PG_FUNCTION_ARGS)
int64
gistgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
TIDBitmap *tbm = (TIDBitmap *) PG_GETARG_POINTER(1);
GISTScanOpaque so = (GISTScanOpaque) scan->opaque;
int64 ntids = 0;
GISTSearchItem fakeItem;
if (!so->qual_ok)
PG_RETURN_INT64(0);
return 0;
pgstat_count_index_scan(scan->indexRelation);
@@ -791,7 +787,7 @@ gistgetbitmap(PG_FUNCTION_ARGS)
pfree(item);
}
PG_RETURN_INT64(ntids);
return ntids;
}
/*
@@ -799,14 +795,11 @@ gistgetbitmap(PG_FUNCTION_ARGS)
*
* Opclasses that implement a fetch function support index-only scans.
*/
Datum
gistcanreturn(PG_FUNCTION_ARGS)
bool
gistcanreturn(Relation index, int attno)
{
Relation index = (Relation) PG_GETARG_POINTER(0);
int attno = PG_GETARG_INT32(1);
if (OidIsValid(index_getprocid(index, attno, GIST_FETCH_PROC)))
PG_RETURN_BOOL(true);
return true;
else
PG_RETURN_BOOL(false);
return false;
}

View File

@@ -54,12 +54,9 @@ pairingheap_GISTSearchItem_cmp(const pairingheap_node *a, const pairingheap_node
* Index AM API functions for scanning GiST indexes
*/
Datum
gistbeginscan(PG_FUNCTION_ARGS)
IndexScanDesc
gistbeginscan(Relation r, int nkeys, int norderbys)
{
Relation r = (Relation) PG_GETARG_POINTER(0);
int nkeys = PG_GETARG_INT32(1);
int norderbys = PG_GETARG_INT32(2);
IndexScanDesc scan;
GISTSTATE *giststate;
GISTScanOpaque so;
@@ -107,16 +104,13 @@ gistbeginscan(PG_FUNCTION_ARGS)
MemoryContextSwitchTo(oldCxt);
PG_RETURN_POINTER(scan);
return scan;
}
Datum
gistrescan(PG_FUNCTION_ARGS)
void
gistrescan(IndexScanDesc scan, ScanKey key, int nkeys,
ScanKey orderbys, int norderbys)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanKey key = (ScanKey) PG_GETARG_POINTER(1);
ScanKey orderbys = (ScanKey) PG_GETARG_POINTER(3);
/* nkeys and norderbys arguments are ignored */
GISTScanOpaque so = (GISTScanOpaque) scan->opaque;
bool first_time;
@@ -314,28 +308,11 @@ gistrescan(PG_FUNCTION_ARGS)
if (!first_time)
pfree(fn_extras);
}
PG_RETURN_VOID();
}
Datum
gistmarkpos(PG_FUNCTION_ARGS)
void
gistendscan(IndexScanDesc scan)
{
elog(ERROR, "GiST does not support mark/restore");
PG_RETURN_VOID();
}
Datum
gistrestrpos(PG_FUNCTION_ARGS)
{
elog(ERROR, "GiST does not support mark/restore");
PG_RETURN_VOID();
}
Datum
gistendscan(PG_FUNCTION_ARGS)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
GISTScanOpaque so = (GISTScanOpaque) scan->opaque;
/*
@@ -343,6 +320,4 @@ gistendscan(PG_FUNCTION_ARGS)
* as well as the queueCxt if there is a separate context for it.
*/
freeGISTstate(so->giststate);
PG_RETURN_VOID();
}

View File

@@ -808,11 +808,9 @@ gistNewBuffer(Relation r)
return buffer;
}
Datum
gistoptions(PG_FUNCTION_ARGS)
bytea *
gistoptions(Datum reloptions, bool validate)
{
Datum reloptions = PG_GETARG_DATUM(0);
bool validate = PG_GETARG_BOOL(1);
relopt_value *options;
GiSTOptions *rdopts;
int numoptions;
@@ -826,7 +824,7 @@ gistoptions(PG_FUNCTION_ARGS)
/* if none set, we're done */
if (numoptions == 0)
PG_RETURN_NULL();
return NULL;
rdopts = allocateReloptStruct(sizeof(GiSTOptions), options, numoptions);
@@ -835,8 +833,7 @@ gistoptions(PG_FUNCTION_ARGS)
pfree(options);
PG_RETURN_BYTEA_P(rdopts);
return (bytea *) rdopts;
}
/*

View File

@@ -25,11 +25,9 @@
/*
* VACUUM cleanup: update FSM
*/
Datum
gistvacuumcleanup(PG_FUNCTION_ARGS)
IndexBulkDeleteResult *
gistvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
Relation rel = info->index;
BlockNumber npages,
blkno;
@@ -38,7 +36,7 @@ gistvacuumcleanup(PG_FUNCTION_ARGS)
/* No-op in ANALYZE ONLY mode */
if (info->analyze_only)
PG_RETURN_POINTER(stats);
return stats;
/* Set up all-zero stats if gistbulkdelete wasn't called */
if (stats == NULL)
@@ -98,7 +96,7 @@ gistvacuumcleanup(PG_FUNCTION_ARGS)
if (needLock)
UnlockRelationForExtension(rel, ExclusiveLock);
PG_RETURN_POINTER(stats);
return stats;
}
typedef struct GistBDItem
@@ -137,13 +135,10 @@ pushStackIfSplited(Page page, GistBDItem *stack)
*
* Result: a palloc'd struct containing statistical info for VACUUM displays.
*/
Datum
gistbulkdelete(PG_FUNCTION_ARGS)
IndexBulkDeleteResult *
gistbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
IndexBulkDeleteCallback callback, void *callback_state)
{
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
void *callback_state = (void *) PG_GETARG_POINTER(3);
Relation rel = info->index;
GistBDItem *stack,
*ptr;
@@ -276,5 +271,5 @@ gistbulkdelete(PG_FUNCTION_ARGS)
vacuum_delay_point();
}
PG_RETURN_POINTER(stats);
return stats;
}

View File

@@ -0,0 +1,133 @@
/*-------------------------------------------------------------------------
*
* gistvalidate.c
* Opclass validator for GiST.
*
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/access/gist/gistvalidate.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/gist_private.h"
#include "access/htup_details.h"
#include "catalog/pg_amop.h"
#include "catalog/pg_amproc.h"
#include "catalog/pg_opclass.h"
#include "utils/catcache.h"
#include "utils/syscache.h"
/*
* Validator for a GiST opclass.
*/
bool
gistvalidate(Oid opclassoid)
{
HeapTuple classtup;
Form_pg_opclass classform;
Oid opfamilyoid;
Oid opcintype;
int numclassops;
int32 classfuncbits;
CatCList *proclist,
*oprlist;
int i;
/* Fetch opclass information */
classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid));
if (!HeapTupleIsValid(classtup))
elog(ERROR, "cache lookup failed for operator class %u", opclassoid);
classform = (Form_pg_opclass) GETSTRUCT(classtup);
opfamilyoid = classform->opcfamily;
opcintype = classform->opcintype;
ReleaseSysCache(classtup);
/* Fetch all operators and support functions of the opfamily */
oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid));
proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid));
/* We'll track the ops and functions belonging to the named opclass */
numclassops = 0;
classfuncbits = 0;
/* Check support functions */
for (i = 0; i < proclist->n_members; i++)
{
HeapTuple proctup = &proclist->members[i]->tuple;
Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);
/* Check that only allowed procedure numbers exist */
if (procform->amprocnum < 1 ||
procform->amprocnum > GISTNProcs)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("gist opfamily %u contains invalid support number %d for procedure %u",
opfamilyoid,
procform->amprocnum, procform->amproc)));
/* Remember functions that are specifically for the named opclass */
if (procform->amproclefttype == opcintype &&
procform->amprocrighttype == opcintype)
classfuncbits |= (1 << procform->amprocnum);
}
/* Check operators */
for (i = 0; i < oprlist->n_members; i++)
{
HeapTuple oprtup = &oprlist->members[i]->tuple;
Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup);
/* TODO: Check that only allowed strategy numbers exist */
if (oprform->amopstrategy < 1)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("gist opfamily %u contains invalid strategy number %d for operator %u",
opfamilyoid,
oprform->amopstrategy, oprform->amopopr)));
/* GiST supports ORDER BY operators, but must have distance proc */
if (oprform->amoppurpose != AMOP_SEARCH &&
oprform->amoplefttype == opcintype &&
oprform->amoprighttype == opcintype &&
(classfuncbits & (1 << GIST_DISTANCE_PROC)) == 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("gist opfamily %u contains unsupported ORDER BY specification for operator %u",
opfamilyoid, oprform->amopopr)));
/* Count operators that are specifically for the named opclass */
/* XXX we consider only lefttype here */
if (oprform->amoplefttype == opcintype)
numclassops++;
}
/* Check that the named opclass is complete */
if (numclassops == 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("gist opclass %u is missing operator(s)",
opclassoid)));
for (i = 1; i <= GISTNProcs; i++)
{
if ((classfuncbits & (1 << i)) != 0)
continue; /* got it */
if (i == GIST_DISTANCE_PROC || i == GIST_FETCH_PROC)
continue; /* optional methods */
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("gist opclass %u is missing required support function %d",
opclassoid, i)));
}
ReleaseCatCacheList(proclist);
ReleaseCatCacheList(oprlist);
return true;
}

View File

@@ -13,6 +13,6 @@ top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = hash.o hashfunc.o hashinsert.o hashovfl.o hashpage.o hashscan.o \
hashsearch.o hashsort.o hashutil.o
hashsearch.o hashsort.o hashutil.o hashvalidate.o
include $(top_srcdir)/src/backend/common.mk

View File

@@ -22,9 +22,8 @@
#include "access/relscan.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "optimizer/cost.h"
#include "optimizer/plancat.h"
#include "storage/bufmgr.h"
#include "utils/index_selfuncs.h"
#include "utils/rel.h"
@@ -44,14 +43,55 @@ static void hashbuildCallback(Relation index,
/*
* hashbuild() -- build a new hash index.
* Hash handler function: return IndexAmRoutine with access method parameters
* and callbacks.
*/
Datum
hashbuild(PG_FUNCTION_ARGS)
hashhandler(PG_FUNCTION_ARGS)
{
IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
amroutine->amstrategies = 1;
amroutine->amsupport = 1;
amroutine->amcanorder = false;
amroutine->amcanorderbyop = false;
amroutine->amcanbackward = true;
amroutine->amcanunique = false;
amroutine->amcanmulticol = false;
amroutine->amoptionalkey = false;
amroutine->amsearcharray = false;
amroutine->amsearchnulls = false;
amroutine->amstorage = false;
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
amroutine->amkeytype = INT4OID;
amroutine->ambuild = hashbuild;
amroutine->ambuildempty = hashbuildempty;
amroutine->aminsert = hashinsert;
amroutine->ambulkdelete = hashbulkdelete;
amroutine->amvacuumcleanup = hashvacuumcleanup;
amroutine->amcanreturn = NULL;
amroutine->amcostestimate = hashcostestimate;
amroutine->amoptions = hashoptions;
amroutine->amvalidate = hashvalidate;
amroutine->ambeginscan = hashbeginscan;
amroutine->amrescan = hashrescan;
amroutine->amgettuple = hashgettuple;
amroutine->amgetbitmap = hashgetbitmap;
amroutine->amendscan = hashendscan;
amroutine->ammarkpos = NULL;
amroutine->amrestrpos = NULL;
PG_RETURN_POINTER(amroutine);
}
/*
* hashbuild() -- build a new hash index.
*/
IndexBuildResult *
hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
Relation heap = (Relation) PG_GETARG_POINTER(0);
Relation index = (Relation) PG_GETARG_POINTER(1);
IndexInfo *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
IndexBuildResult *result;
BlockNumber relpages;
double reltuples;
@@ -112,20 +152,16 @@ hashbuild(PG_FUNCTION_ARGS)
result->heap_tuples = reltuples;
result->index_tuples = buildstate.indtuples;
PG_RETURN_POINTER(result);
return result;
}
/*
* hashbuildempty() -- build an empty hash index in the initialization fork
*/
Datum
hashbuildempty(PG_FUNCTION_ARGS)
void
hashbuildempty(Relation index)
{
Relation index = (Relation) PG_GETARG_POINTER(0);
_hash_metapinit(index, 0, INIT_FORKNUM);
PG_RETURN_VOID();
}
/*
@@ -167,18 +203,11 @@ hashbuildCallback(Relation index,
* Hash on the heap tuple's key, form an index tuple with hash code.
* Find the appropriate location for the new tuple, and put it there.
*/
Datum
hashinsert(PG_FUNCTION_ARGS)
bool
hashinsert(Relation rel, Datum *values, bool *isnull,
ItemPointer ht_ctid, Relation heapRel,
IndexUniqueCheck checkUnique)
{
Relation rel = (Relation) PG_GETARG_POINTER(0);
Datum *values = (Datum *) PG_GETARG_POINTER(1);
bool *isnull = (bool *) PG_GETARG_POINTER(2);
ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
#ifdef NOT_USED
Relation heapRel = (Relation) PG_GETARG_POINTER(4);
IndexUniqueCheck checkUnique = (IndexUniqueCheck) PG_GETARG_INT32(5);
#endif
IndexTuple itup;
/*
@@ -191,7 +220,7 @@ hashinsert(PG_FUNCTION_ARGS)
* chosen in 1986, not of the way nulls are handled here.
*/
if (isnull[0])
PG_RETURN_BOOL(false);
return false;
/* generate an index tuple */
itup = _hash_form_tuple(rel, values, isnull);
@@ -201,18 +230,16 @@ hashinsert(PG_FUNCTION_ARGS)
pfree(itup);
PG_RETURN_BOOL(false);
return false;
}
/*
* hashgettuple() -- Get the next tuple in the scan.
*/
Datum
hashgettuple(PG_FUNCTION_ARGS)
bool
hashgettuple(IndexScanDesc scan, ScanDirection dir)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
HashScanOpaque so = (HashScanOpaque) scan->opaque;
Relation rel = scan->indexRelation;
Buffer buf;
@@ -314,18 +341,16 @@ hashgettuple(PG_FUNCTION_ARGS)
/* Return current heap TID on success */
scan->xs_ctup.t_self = so->hashso_heappos;
PG_RETURN_BOOL(res);
return res;
}
/*
* hashgetbitmap() -- get all tuples at once
*/
Datum
hashgetbitmap(PG_FUNCTION_ARGS)
int64
hashgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
TIDBitmap *tbm = (TIDBitmap *) PG_GETARG_POINTER(1);
HashScanOpaque so = (HashScanOpaque) scan->opaque;
bool res;
int64 ntids = 0;
@@ -362,19 +387,16 @@ hashgetbitmap(PG_FUNCTION_ARGS)
res = _hash_next(scan, ForwardScanDirection);
}
PG_RETURN_INT64(ntids);
return ntids;
}
/*
* hashbeginscan() -- start a scan on a hash index
*/
Datum
hashbeginscan(PG_FUNCTION_ARGS)
IndexScanDesc
hashbeginscan(Relation rel, int nkeys, int norderbys)
{
Relation rel = (Relation) PG_GETARG_POINTER(0);
int nkeys = PG_GETARG_INT32(1);
int norderbys = PG_GETARG_INT32(2);
IndexScanDesc scan;
HashScanOpaque so;
@@ -396,19 +418,16 @@ hashbeginscan(PG_FUNCTION_ARGS)
/* register scan in case we change pages it's using */
_hash_regscan(scan);
PG_RETURN_POINTER(scan);
return scan;
}
/*
* hashrescan() -- rescan an index relation
*/
Datum
hashrescan(PG_FUNCTION_ARGS)
void
hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
ScanKey orderbys, int norderbys)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanKey scankey = (ScanKey) PG_GETARG_POINTER(1);
/* remaining arguments are ignored */
HashScanOpaque so = (HashScanOpaque) scan->opaque;
Relation rel = scan->indexRelation;
@@ -434,17 +453,14 @@ hashrescan(PG_FUNCTION_ARGS)
scan->numberOfKeys * sizeof(ScanKeyData));
so->hashso_bucket_valid = false;
}
PG_RETURN_VOID();
}
/*
* hashendscan() -- close down a scan
*/
Datum
hashendscan(PG_FUNCTION_ARGS)
void
hashendscan(IndexScanDesc scan)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
HashScanOpaque so = (HashScanOpaque) scan->opaque;
Relation rel = scan->indexRelation;
@@ -463,28 +479,6 @@ hashendscan(PG_FUNCTION_ARGS)
pfree(so);
scan->opaque = NULL;
PG_RETURN_VOID();
}
/*
* hashmarkpos() -- save current scan position
*/
Datum
hashmarkpos(PG_FUNCTION_ARGS)
{
elog(ERROR, "hash does not support mark/restore");
PG_RETURN_VOID();
}
/*
* hashrestrpos() -- restore scan to last saved position
*/
Datum
hashrestrpos(PG_FUNCTION_ARGS)
{
elog(ERROR, "hash does not support mark/restore");
PG_RETURN_VOID();
}
/*
@@ -494,13 +488,10 @@ hashrestrpos(PG_FUNCTION_ARGS)
*
* Result: a palloc'd struct containing statistical info for VACUUM displays.
*/
Datum
hashbulkdelete(PG_FUNCTION_ARGS)
IndexBulkDeleteResult *
hashbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
IndexBulkDeleteCallback callback, void *callback_state)
{
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
void *callback_state = (void *) PG_GETARG_POINTER(3);
Relation rel = info->index;
double tuples_removed;
double num_index_tuples;
@@ -670,7 +661,7 @@ loop_top:
stats->tuples_removed += tuples_removed;
/* hashvacuumcleanup will fill in num_pages */
PG_RETURN_POINTER(stats);
return stats;
}
/*
@@ -678,24 +669,22 @@ loop_top:
*
* Result: a palloc'd struct containing statistical info for VACUUM displays.
*/
Datum
hashvacuumcleanup(PG_FUNCTION_ARGS)
IndexBulkDeleteResult *
hashvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
Relation rel = info->index;
BlockNumber num_pages;
/* If hashbulkdelete wasn't called, return NULL signifying no change */
/* Note: this covers the analyze_only case too */
if (stats == NULL)
PG_RETURN_POINTER(NULL);
return NULL;
/* update statistics */
num_pages = RelationGetNumberOfBlocks(rel);
stats->num_pages = num_pages;
PG_RETURN_POINTER(stats);
return stats;
}

View File

@@ -217,18 +217,10 @@ _hash_checkpage(Relation rel, Buffer buf, int flags)
}
}
Datum
hashoptions(PG_FUNCTION_ARGS)
bytea *
hashoptions(Datum reloptions, bool validate)
{
Datum reloptions = PG_GETARG_DATUM(0);
bool validate = PG_GETARG_BOOL(1);
bytea *result;
result = default_reloptions(reloptions, validate, RELOPT_KIND_HASH);
if (result)
PG_RETURN_BYTEA_P(result);
PG_RETURN_NULL();
return default_reloptions(reloptions, validate, RELOPT_KIND_HASH);
}
/*

View File

@@ -0,0 +1,157 @@
/*-------------------------------------------------------------------------
*
* hashvalidate.c
* Opclass validator for hash.
*
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/access/hash/hashvalidate.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/hash.h"
#include "access/htup_details.h"
#include "catalog/pg_amop.h"
#include "catalog/pg_amproc.h"
#include "catalog/pg_opclass.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/syscache.h"
/*
* Validator for a hash opclass.
*
* Some of the checks done here cover the whole opfamily, and therefore are
* redundant when checking each opclass in a family. But they don't run long
* enough to be much of a problem, so we accept the duplication rather than
* complicate the amvalidate API.
*
* Some of the code here relies on the fact that hash has only one operator
* strategy and support function; we don't have to check for incomplete sets.
*/
bool
hashvalidate(Oid opclassoid)
{
HeapTuple classtup;
Form_pg_opclass classform;
Oid opfamilyoid;
Oid opcintype;
int numclassops;
int32 classfuncbits;
CatCList *proclist,
*oprlist;
int i,
j;
/* Fetch opclass information */
classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid));
if (!HeapTupleIsValid(classtup))
elog(ERROR, "cache lookup failed for operator class %u", opclassoid);
classform = (Form_pg_opclass) GETSTRUCT(classtup);
opfamilyoid = classform->opcfamily;
opcintype = classform->opcintype;
ReleaseSysCache(classtup);
/* Fetch all operators and support functions of the opfamily */
oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid));
proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid));
/* We'll track the ops and functions belonging to the named opclass */
numclassops = 0;
classfuncbits = 0;
/* Check support functions */
for (i = 0; i < proclist->n_members; i++)
{
HeapTuple proctup = &proclist->members[i]->tuple;
Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);
/* Check that only allowed procedure numbers exist */
if (procform->amprocnum != HASHPROC)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("hash opfamily %u contains invalid support number %d for procedure %u",
opfamilyoid,
procform->amprocnum, procform->amproc)));
/* Remember functions that are specifically for the named opclass */
if (procform->amproclefttype == opcintype &&
procform->amprocrighttype == opcintype)
classfuncbits |= (1 << procform->amprocnum);
}
/* Check operators */
for (i = 0; i < oprlist->n_members; i++)
{
HeapTuple oprtup = &oprlist->members[i]->tuple;
Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup);
bool leftFound = false,
rightFound = false;
/* Check that only allowed strategy numbers exist */
if (oprform->amopstrategy < 1 ||
oprform->amopstrategy > HTMaxStrategyNumber)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("hash opfamily %u contains invalid strategy number %d for operator %u",
opfamilyoid,
oprform->amopstrategy, oprform->amopopr)));
/*
* There should be relevant hash procedures for each operator
*/
for (j = 0; j < proclist->n_members; j++)
{
HeapTuple proctup = &proclist->members[j]->tuple;
Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);
if (procform->amproclefttype == oprform->amoplefttype)
leftFound = true;
if (procform->amproclefttype == oprform->amoprighttype)
rightFound = true;
}
if (!leftFound || !rightFound)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("hash opfamily %u lacks support function for operator %u",
opfamilyoid, oprform->amopopr)));
/* hash doesn't support ORDER BY operators */
if (oprform->amoppurpose != AMOP_SEARCH ||
OidIsValid(oprform->amopsortfamily))
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("hash opfamily %u contains invalid ORDER BY specification for operator %u",
opfamilyoid, oprform->amopopr)));
/* Count operators that are specifically for the named opclass */
if (oprform->amoplefttype == opcintype &&
oprform->amoprighttype == opcintype)
numclassops++;
}
/* Check that the named opclass is complete */
if (numclassops != HTMaxStrategyNumber)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("hash opclass %u is missing operator(s)",
opclassoid)));
if ((classfuncbits & (1 << HASHPROC)) == 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("hash opclass %u is missing required support function",
opclassoid)));
ReleaseCatCacheList(proclist);
ReleaseCatCacheList(oprlist);
return true;
}

View File

@@ -12,6 +12,6 @@ subdir = src/backend/access/index
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = genam.o indexam.o
OBJS = amapi.o genam.o indexam.o
include $(top_srcdir)/src/backend/common.mk

View File

@@ -0,0 +1,114 @@
/*-------------------------------------------------------------------------
*
* amapi.c
* Support routines for API for Postgres index access methods.
*
* Copyright (c) 2015-2016, PostgreSQL Global Development Group
*
*
* IDENTIFICATION
* src/backend/access/index/amapi.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/amapi.h"
#include "access/htup_details.h"
#include "catalog/pg_am.h"
#include "catalog/pg_opclass.h"
#include "utils/syscache.h"
/*
* GetIndexAmRoutine - call the specified access method handler routine to get
* its IndexAmRoutine struct, which will be palloc'd in the caller's context.
*
* Note that if the amhandler function is built-in, this will not involve
* any catalog access. It's therefore safe to use this while bootstrapping
* indexes for the system catalogs. relcache.c relies on that.
*/
IndexAmRoutine *
GetIndexAmRoutine(Oid amhandler)
{
Datum datum;
IndexAmRoutine *routine;
datum = OidFunctionCall0(amhandler);
routine = (IndexAmRoutine *) DatumGetPointer(datum);
if (routine == NULL || !IsA(routine, IndexAmRoutine))
elog(ERROR, "index access method handler function %u did not return an IndexAmRoutine struct",
amhandler);
return routine;
}
/*
* GetIndexAmRoutineByAmId - look up the handler of the index access method
* with the given OID, and get its IndexAmRoutine struct.
*/
IndexAmRoutine *
GetIndexAmRoutineByAmId(Oid amoid)
{
HeapTuple tuple;
Form_pg_am amform;
regproc amhandler;
/* Get handler function OID for the access method */
tuple = SearchSysCache1(AMOID, ObjectIdGetDatum(amoid));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for access method %u",
amoid);
amform = (Form_pg_am) GETSTRUCT(tuple);
amhandler = amform->amhandler;
/* Complain if handler OID is invalid */
if (!RegProcedureIsValid(amhandler))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("index access method \"%s\" does not have a handler",
NameStr(amform->amname))));
ReleaseSysCache(tuple);
/* And finally, call the handler function to get the API struct. */
return GetIndexAmRoutine(amhandler);
}
/*
* Ask appropriate access method to validate the specified opclass.
*/
Datum
amvalidate(PG_FUNCTION_ARGS)
{
Oid opclassoid = PG_GETARG_OID(0);
bool result;
HeapTuple classtup;
Form_pg_opclass classform;
Oid amoid;
IndexAmRoutine *amroutine;
classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid));
if (!HeapTupleIsValid(classtup))
elog(ERROR, "cache lookup failed for operator class %u", opclassoid);
classform = (Form_pg_opclass) GETSTRUCT(classtup);
amoid = classform->opcmethod;
ReleaseSysCache(classtup);
amroutine = GetIndexAmRoutineByAmId(amoid);
if (amroutine->amvalidate == NULL)
elog(ERROR, "function amvalidate is not defined for index access method %u",
amoid);
result = amroutine->amvalidate(opclassoid);
pfree(amroutine);
PG_RETURN_BOOL(result);
}

View File

@@ -65,12 +65,12 @@
#include "postgres.h"
#include "access/amapi.h"
#include "access/relscan.h"
#include "access/transam.h"
#include "access/xlog.h"
#include "catalog/index.h"
#include "catalog/catalog.h"
#include "catalog/index.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
@@ -94,7 +94,7 @@
#define RELATION_CHECKS \
( \
AssertMacro(RelationIsValid(indexRelation)), \
AssertMacro(PointerIsValid(indexRelation->rd_am)), \
AssertMacro(PointerIsValid(indexRelation->rd_amroutine)), \
AssertMacro(!ReindexIsProcessingIndex(RelationGetRelid(indexRelation))) \
)
@@ -102,38 +102,21 @@
( \
AssertMacro(IndexScanIsValid(scan)), \
AssertMacro(RelationIsValid(scan->indexRelation)), \
AssertMacro(PointerIsValid(scan->indexRelation->rd_am)) \
AssertMacro(PointerIsValid(scan->indexRelation->rd_amroutine)) \
)
#define GET_REL_PROCEDURE(pname) \
#define CHECK_REL_PROCEDURE(pname) \
do { \
procedure = &indexRelation->rd_aminfo->pname; \
if (!OidIsValid(procedure->fn_oid)) \
{ \
RegProcedure procOid = indexRelation->rd_am->pname; \
if (!RegProcedureIsValid(procOid)) \
elog(ERROR, "invalid %s regproc", CppAsString(pname)); \
fmgr_info_cxt(procOid, procedure, indexRelation->rd_indexcxt); \
} \
if (indexRelation->rd_amroutine->pname == NULL) \
elog(ERROR, "function %s is not defined for index %s", \
CppAsString(pname), RelationGetRelationName(indexRelation)); \
} while(0)
#define GET_UNCACHED_REL_PROCEDURE(pname) \
#define CHECK_SCAN_PROCEDURE(pname) \
do { \
if (!RegProcedureIsValid(indexRelation->rd_am->pname)) \
elog(ERROR, "invalid %s regproc", CppAsString(pname)); \
fmgr_info(indexRelation->rd_am->pname, &procedure); \
} while(0)
#define GET_SCAN_PROCEDURE(pname) \
do { \
procedure = &scan->indexRelation->rd_aminfo->pname; \
if (!OidIsValid(procedure->fn_oid)) \
{ \
RegProcedure procOid = scan->indexRelation->rd_am->pname; \
if (!RegProcedureIsValid(procOid)) \
elog(ERROR, "invalid %s regproc", CppAsString(pname)); \
fmgr_info_cxt(procOid, procedure, scan->indexRelation->rd_indexcxt); \
} \
if (scan->indexRelation->rd_amroutine->pname == NULL) \
elog(ERROR, "function %s is not defined for index %s", \
CppAsString(pname), RelationGetRelationName(scan->indexRelation)); \
} while(0)
static IndexScanDesc index_beginscan_internal(Relation indexRelation,
@@ -210,26 +193,17 @@ index_insert(Relation indexRelation,
Relation heapRelation,
IndexUniqueCheck checkUnique)
{
FmgrInfo *procedure;
RELATION_CHECKS;
GET_REL_PROCEDURE(aminsert);
CHECK_REL_PROCEDURE(aminsert);
if (!(indexRelation->rd_am->ampredlocks))
if (!(indexRelation->rd_amroutine->ampredlocks))
CheckForSerializableConflictIn(indexRelation,
(HeapTuple) NULL,
InvalidBuffer);
/*
* have the am's insert proc do all the work.
*/
return DatumGetBool(FunctionCall6(procedure,
PointerGetDatum(indexRelation),
PointerGetDatum(values),
PointerGetDatum(isnull),
PointerGetDatum(heap_t_ctid),
PointerGetDatum(heapRelation),
Int32GetDatum((int32) checkUnique)));
return indexRelation->rd_amroutine->aminsert(indexRelation, values, isnull,
heap_t_ctid, heapRelation,
checkUnique);
}
/*
@@ -288,13 +262,10 @@ static IndexScanDesc
index_beginscan_internal(Relation indexRelation,
int nkeys, int norderbys, Snapshot snapshot)
{
IndexScanDesc scan;
FmgrInfo *procedure;
RELATION_CHECKS;
GET_REL_PROCEDURE(ambeginscan);
CHECK_REL_PROCEDURE(ambeginscan);
if (!(indexRelation->rd_am->ampredlocks))
if (!(indexRelation->rd_amroutine->ampredlocks))
PredicateLockRelation(indexRelation, snapshot);
/*
@@ -305,13 +276,8 @@ index_beginscan_internal(Relation indexRelation,
/*
* Tell the AM to open a scan.
*/
scan = (IndexScanDesc)
DatumGetPointer(FunctionCall3(procedure,
PointerGetDatum(indexRelation),
Int32GetDatum(nkeys),
Int32GetDatum(norderbys)));
return scan;
return indexRelation->rd_amroutine->ambeginscan(indexRelation, nkeys,
norderbys);
}
/* ----------------
@@ -331,10 +297,8 @@ index_rescan(IndexScanDesc scan,
ScanKey keys, int nkeys,
ScanKey orderbys, int norderbys)
{
FmgrInfo *procedure;
SCAN_CHECKS;
GET_SCAN_PROCEDURE(amrescan);
CHECK_SCAN_PROCEDURE(amrescan);
Assert(nkeys == scan->numberOfKeys);
Assert(norderbys == scan->numberOfOrderBys);
@@ -350,12 +314,8 @@ index_rescan(IndexScanDesc scan,
scan->kill_prior_tuple = false; /* for safety */
FunctionCall5(procedure,
PointerGetDatum(scan),
PointerGetDatum(keys),
Int32GetDatum(nkeys),
PointerGetDatum(orderbys),
Int32GetDatum(norderbys));
scan->indexRelation->rd_amroutine->amrescan(scan, keys, nkeys,
orderbys, norderbys);
}
/* ----------------
@@ -365,10 +325,8 @@ index_rescan(IndexScanDesc scan,
void
index_endscan(IndexScanDesc scan)
{
FmgrInfo *procedure;
SCAN_CHECKS;
GET_SCAN_PROCEDURE(amendscan);
CHECK_SCAN_PROCEDURE(amendscan);
/* Release any held pin on a heap page */
if (BufferIsValid(scan->xs_cbuf))
@@ -378,7 +336,7 @@ index_endscan(IndexScanDesc scan)
}
/* End the AM's scan */
FunctionCall1(procedure, PointerGetDatum(scan));
scan->indexRelation->rd_amroutine->amendscan(scan);
/* Release index refcount acquired by index_beginscan */
RelationDecrementReferenceCount(scan->indexRelation);
@@ -394,12 +352,10 @@ index_endscan(IndexScanDesc scan)
void
index_markpos(IndexScanDesc scan)
{
FmgrInfo *procedure;
SCAN_CHECKS;
GET_SCAN_PROCEDURE(ammarkpos);
CHECK_SCAN_PROCEDURE(ammarkpos);
FunctionCall1(procedure, PointerGetDatum(scan));
scan->indexRelation->rd_amroutine->ammarkpos(scan);
}
/* ----------------
@@ -421,18 +377,16 @@ index_markpos(IndexScanDesc scan)
void
index_restrpos(IndexScanDesc scan)
{
FmgrInfo *procedure;
Assert(IsMVCCSnapshot(scan->xs_snapshot));
SCAN_CHECKS;
GET_SCAN_PROCEDURE(amrestrpos);
CHECK_SCAN_PROCEDURE(amrestrpos);
scan->xs_continue_hot = false;
scan->kill_prior_tuple = false; /* for safety */
FunctionCall1(procedure, PointerGetDatum(scan));
scan->indexRelation->rd_amroutine->amrestrpos(scan);
}
/* ----------------
@@ -445,11 +399,10 @@ index_restrpos(IndexScanDesc scan)
ItemPointer
index_getnext_tid(IndexScanDesc scan, ScanDirection direction)
{
FmgrInfo *procedure;
bool found;
SCAN_CHECKS;
GET_SCAN_PROCEDURE(amgettuple);
CHECK_SCAN_PROCEDURE(amgettuple);
Assert(TransactionIdIsValid(RecentGlobalXmin));
@@ -459,9 +412,7 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction)
* scan->xs_recheck and possibly scan->xs_itup, though we pay no attention
* to those fields here.
*/
found = DatumGetBool(FunctionCall2(procedure,
PointerGetDatum(scan),
Int32GetDatum(direction)));
found = scan->indexRelation->rd_amroutine->amgettuple(scan, direction);
/* Reset kill flag immediately for safety */
scan->kill_prior_tuple = false;
@@ -635,12 +586,10 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
int64
index_getbitmap(IndexScanDesc scan, TIDBitmap *bitmap)
{
FmgrInfo *procedure;
int64 ntids;
Datum d;
SCAN_CHECKS;
GET_SCAN_PROCEDURE(amgetbitmap);
CHECK_SCAN_PROCEDURE(amgetbitmap);
/* just make sure this is false... */
scan->kill_prior_tuple = false;
@@ -648,16 +597,7 @@ index_getbitmap(IndexScanDesc scan, TIDBitmap *bitmap)
/*
* have the am's getbitmap proc do all the work.
*/
d = FunctionCall2(procedure,
PointerGetDatum(scan),
PointerGetDatum(bitmap));
ntids = DatumGetInt64(d);
/* If int8 is pass-by-ref, must free the result to avoid memory leak */
#ifndef USE_FLOAT8_BYVAL
pfree(DatumGetPointer(d));
#endif
ntids = scan->indexRelation->rd_amroutine->amgetbitmap(scan, bitmap);
pgstat_count_index_tuples(scan->indexRelation, ntids);
@@ -680,20 +620,12 @@ index_bulk_delete(IndexVacuumInfo *info,
void *callback_state)
{
Relation indexRelation = info->index;
FmgrInfo procedure;
IndexBulkDeleteResult *result;
RELATION_CHECKS;
GET_UNCACHED_REL_PROCEDURE(ambulkdelete);
CHECK_REL_PROCEDURE(ambulkdelete);
result = (IndexBulkDeleteResult *)
DatumGetPointer(FunctionCall4(&procedure,
PointerGetDatum(info),
PointerGetDatum(stats),
PointerGetDatum((Pointer) callback),
PointerGetDatum(callback_state)));
return result;
return indexRelation->rd_amroutine->ambulkdelete(info, stats,
callback, callback_state);
}
/* ----------------
@@ -707,18 +639,11 @@ index_vacuum_cleanup(IndexVacuumInfo *info,
IndexBulkDeleteResult *stats)
{
Relation indexRelation = info->index;
FmgrInfo procedure;
IndexBulkDeleteResult *result;
RELATION_CHECKS;
GET_UNCACHED_REL_PROCEDURE(amvacuumcleanup);
CHECK_REL_PROCEDURE(amvacuumcleanup);
result = (IndexBulkDeleteResult *)
DatumGetPointer(FunctionCall2(&procedure,
PointerGetDatum(info),
PointerGetDatum(stats)));
return result;
return indexRelation->rd_amroutine->amvacuumcleanup(info, stats);
}
/* ----------------
@@ -731,19 +656,13 @@ index_vacuum_cleanup(IndexVacuumInfo *info,
bool
index_can_return(Relation indexRelation, int attno)
{
FmgrInfo *procedure;
RELATION_CHECKS;
/* amcanreturn is optional; assume FALSE if not provided by AM */
if (!RegProcedureIsValid(indexRelation->rd_am->amcanreturn))
if (indexRelation->rd_amroutine->amcanreturn == NULL)
return false;
GET_REL_PROCEDURE(amcanreturn);
return DatumGetBool(FunctionCall2(procedure,
PointerGetDatum(indexRelation),
Int32GetDatum(attno)));
return indexRelation->rd_amroutine->amcanreturn(indexRelation, attno);
}
/* ----------------
@@ -781,7 +700,7 @@ index_getprocid(Relation irel,
int nproc;
int procindex;
nproc = irel->rd_am->amsupport;
nproc = irel->rd_amroutine->amsupport;
Assert(procnum > 0 && procnum <= (uint16) nproc);
@@ -815,7 +734,7 @@ index_getprocinfo(Relation irel,
int nproc;
int procindex;
nproc = irel->rd_am->amsupport;
nproc = irel->rd_amroutine->amsupport;
Assert(procnum > 0 && procnum <= (uint16) nproc);

View File

@@ -13,6 +13,6 @@ top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = nbtcompare.o nbtinsert.o nbtpage.o nbtree.o nbtsearch.o \
nbtutils.o nbtsort.o nbtxlog.o
nbtutils.o nbtsort.o nbtvalidate.o nbtxlog.o
include $(top_srcdir)/src/backend/common.mk

View File

@@ -28,7 +28,7 @@
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/smgr.h"
#include "tcop/tcopprot.h"
#include "utils/index_selfuncs.h"
#include "utils/memutils.h"
@@ -77,14 +77,55 @@ static void btvacuumpage(BTVacState *vstate, BlockNumber blkno,
/*
* btbuild() -- build a new btree index.
* Btree handler function: return IndexAmRoutine with access method parameters
* and callbacks.
*/
Datum
btbuild(PG_FUNCTION_ARGS)
bthandler(PG_FUNCTION_ARGS)
{
IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
amroutine->amstrategies = 5;
amroutine->amsupport = 2;
amroutine->amcanorder = true;
amroutine->amcanorderbyop = false;
amroutine->amcanbackward = true;
amroutine->amcanunique = true;
amroutine->amcanmulticol = true;
amroutine->amoptionalkey = true;
amroutine->amsearcharray = true;
amroutine->amsearchnulls = true;
amroutine->amstorage = false;
amroutine->amclusterable = true;
amroutine->ampredlocks = true;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = btbuild;
amroutine->ambuildempty = btbuildempty;
amroutine->aminsert = btinsert;
amroutine->ambulkdelete = btbulkdelete;
amroutine->amvacuumcleanup = btvacuumcleanup;
amroutine->amcanreturn = btcanreturn;
amroutine->amcostestimate = btcostestimate;
amroutine->amoptions = btoptions;
amroutine->amvalidate = btvalidate;
amroutine->ambeginscan = btbeginscan;
amroutine->amrescan = btrescan;
amroutine->amgettuple = btgettuple;
amroutine->amgetbitmap = btgetbitmap;
amroutine->amendscan = btendscan;
amroutine->ammarkpos = btmarkpos;
amroutine->amrestrpos = btrestrpos;
PG_RETURN_POINTER(amroutine);
}
/*
* btbuild() -- build a new btree index.
*/
IndexBuildResult *
btbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
Relation heap = (Relation) PG_GETARG_POINTER(0);
Relation index = (Relation) PG_GETARG_POINTER(1);
IndexInfo *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
IndexBuildResult *result;
double reltuples;
BTBuildState buildstate;
@@ -156,7 +197,7 @@ btbuild(PG_FUNCTION_ARGS)
result->heap_tuples = reltuples;
result->index_tuples = buildstate.indtuples;
PG_RETURN_POINTER(result);
return result;
}
/*
@@ -191,10 +232,9 @@ btbuildCallback(Relation index,
/*
* btbuildempty() -- build an empty btree index in the initialization fork
*/
Datum
btbuildempty(PG_FUNCTION_ARGS)
void
btbuildempty(Relation index)
{
Relation index = (Relation) PG_GETARG_POINTER(0);
Page metapage;
/* Construct metapage. */
@@ -215,8 +255,6 @@ btbuildempty(PG_FUNCTION_ARGS)
* checkpoint may have moved the redo pointer past our xlog record.
*/
smgrimmedsync(index->rd_smgr, INIT_FORKNUM);
PG_RETURN_VOID();
}
/*
@@ -225,15 +263,11 @@ btbuildempty(PG_FUNCTION_ARGS)
* Descend the tree recursively, find the appropriate location for our
* new tuple, and put it there.
*/
Datum
btinsert(PG_FUNCTION_ARGS)
bool
btinsert(Relation rel, Datum *values, bool *isnull,
ItemPointer ht_ctid, Relation heapRel,
IndexUniqueCheck checkUnique)
{
Relation rel = (Relation) PG_GETARG_POINTER(0);
Datum *values = (Datum *) PG_GETARG_POINTER(1);
bool *isnull = (bool *) PG_GETARG_POINTER(2);
ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
Relation heapRel = (Relation) PG_GETARG_POINTER(4);
IndexUniqueCheck checkUnique = (IndexUniqueCheck) PG_GETARG_INT32(5);
bool result;
IndexTuple itup;
@@ -245,17 +279,15 @@ btinsert(PG_FUNCTION_ARGS)
pfree(itup);
PG_RETURN_BOOL(result);
return result;
}
/*
* btgettuple() -- Get the next tuple in the scan.
*/
Datum
btgettuple(PG_FUNCTION_ARGS)
bool
btgettuple(IndexScanDesc scan, ScanDirection dir)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
BTScanOpaque so = (BTScanOpaque) scan->opaque;
bool res;
@@ -271,7 +303,7 @@ btgettuple(PG_FUNCTION_ARGS)
{
/* punt if we have any unsatisfiable array keys */
if (so->numArrayKeys < 0)
PG_RETURN_BOOL(false);
return false;
_bt_start_array_keys(scan, dir);
}
@@ -321,17 +353,15 @@ btgettuple(PG_FUNCTION_ARGS)
/* ... otherwise see if we have more array keys to deal with */
} while (so->numArrayKeys && _bt_advance_array_keys(scan, dir));
PG_RETURN_BOOL(res);
return res;
}
/*
* btgetbitmap() -- gets all matching tuples, and adds them to a bitmap
*/
Datum
btgetbitmap(PG_FUNCTION_ARGS)
int64
btgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
TIDBitmap *tbm = (TIDBitmap *) PG_GETARG_POINTER(1);
BTScanOpaque so = (BTScanOpaque) scan->opaque;
int64 ntids = 0;
ItemPointer heapTid;
@@ -343,7 +373,7 @@ btgetbitmap(PG_FUNCTION_ARGS)
{
/* punt if we have any unsatisfiable array keys */
if (so->numArrayKeys < 0)
PG_RETURN_INT64(ntids);
return ntids;
_bt_start_array_keys(scan, ForwardScanDirection);
}
@@ -381,18 +411,15 @@ btgetbitmap(PG_FUNCTION_ARGS)
/* Now see if we have more array keys to deal with */
} while (so->numArrayKeys && _bt_advance_array_keys(scan, ForwardScanDirection));
PG_RETURN_INT64(ntids);
return ntids;
}
/*
* btbeginscan() -- start a scan on a btree index
*/
Datum
btbeginscan(PG_FUNCTION_ARGS)
IndexScanDesc
btbeginscan(Relation rel, int nkeys, int norderbys)
{
Relation rel = (Relation) PG_GETARG_POINTER(0);
int nkeys = PG_GETARG_INT32(1);
int norderbys = PG_GETARG_INT32(2);
IndexScanDesc scan;
BTScanOpaque so;
@@ -430,19 +457,16 @@ btbeginscan(PG_FUNCTION_ARGS)
scan->opaque = so;
PG_RETURN_POINTER(scan);
return scan;
}
/*
* btrescan() -- rescan an index relation
*/
Datum
btrescan(PG_FUNCTION_ARGS)
void
btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
ScanKey orderbys, int norderbys)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanKey scankey = (ScanKey) PG_GETARG_POINTER(1);
/* remaining arguments are ignored */
BTScanOpaque so = (BTScanOpaque) scan->opaque;
/* we aren't holding any read locks, but gotta drop the pins */
@@ -493,17 +517,14 @@ btrescan(PG_FUNCTION_ARGS)
/* If any keys are SK_SEARCHARRAY type, set up array-key info */
_bt_preprocess_array_keys(scan);
PG_RETURN_VOID();
}
/*
* btendscan() -- close down a scan
*/
Datum
btendscan(PG_FUNCTION_ARGS)
void
btendscan(IndexScanDesc scan)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
BTScanOpaque so = (BTScanOpaque) scan->opaque;
/* we aren't holding any read locks, but gotta drop the pins */
@@ -532,17 +553,14 @@ btendscan(PG_FUNCTION_ARGS)
pfree(so->currTuples);
/* so->markTuples should not be pfree'd, see btrescan */
pfree(so);
PG_RETURN_VOID();
}
/*
* btmarkpos() -- save current scan position
*/
Datum
btmarkpos(PG_FUNCTION_ARGS)
void
btmarkpos(IndexScanDesc scan)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
BTScanOpaque so = (BTScanOpaque) scan->opaque;
/* There may be an old mark with a pin (but no lock). */
@@ -565,17 +583,14 @@ btmarkpos(PG_FUNCTION_ARGS)
/* Also record the current positions of any array keys */
if (so->numArrayKeys)
_bt_mark_array_keys(scan);
PG_RETURN_VOID();
}
/*
* btrestrpos() -- restore scan to last saved position
*/
Datum
btrestrpos(PG_FUNCTION_ARGS)
void
btrestrpos(IndexScanDesc scan)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
BTScanOpaque so = (BTScanOpaque) scan->opaque;
/* Restore the marked positions of any array keys */
@@ -643,8 +658,6 @@ btrestrpos(PG_FUNCTION_ARGS)
else
BTScanPosInvalidate(so->currPos);
}
PG_RETURN_VOID();
}
/*
@@ -654,13 +667,10 @@ btrestrpos(PG_FUNCTION_ARGS)
*
* Result: a palloc'd struct containing statistical info for VACUUM displays.
*/
Datum
btbulkdelete(PG_FUNCTION_ARGS)
IndexBulkDeleteResult *
btbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
IndexBulkDeleteCallback callback, void *callback_state)
{
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
IndexBulkDeleteResult *volatile stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
void *callback_state = (void *) PG_GETARG_POINTER(3);
Relation rel = info->index;
BTCycleId cycleid;
@@ -679,7 +689,7 @@ btbulkdelete(PG_FUNCTION_ARGS)
PG_END_ENSURE_ERROR_CLEANUP(_bt_end_vacuum_callback, PointerGetDatum(rel));
_bt_end_vacuum(rel);
PG_RETURN_POINTER(stats);
return stats;
}
/*
@@ -687,15 +697,12 @@ btbulkdelete(PG_FUNCTION_ARGS)
*
* Result: a palloc'd struct containing statistical info for VACUUM displays.
*/
Datum
btvacuumcleanup(PG_FUNCTION_ARGS)
IndexBulkDeleteResult *
btvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
/* No-op in ANALYZE ONLY mode */
if (info->analyze_only)
PG_RETURN_POINTER(stats);
return stats;
/*
* If btbulkdelete was called, we need not do anything, just return the
@@ -727,7 +734,7 @@ btvacuumcleanup(PG_FUNCTION_ARGS)
stats->num_index_tuples = info->num_heap_tuples;
}
PG_RETURN_POINTER(stats);
return stats;
}
/*
@@ -1148,8 +1155,8 @@ restart:
*
* btrees always do, so this is trivial.
*/
Datum
btcanreturn(PG_FUNCTION_ARGS)
bool
btcanreturn(Relation index, int attno)
{
PG_RETURN_BOOL(true);
return true;
}

View File

@@ -2058,15 +2058,8 @@ BTreeShmemInit(void)
Assert(found);
}
Datum
btoptions(PG_FUNCTION_ARGS)
bytea *
btoptions(Datum reloptions, bool validate)
{
Datum reloptions = PG_GETARG_DATUM(0);
bool validate = PG_GETARG_BOOL(1);
bytea *result;
result = default_reloptions(reloptions, validate, RELOPT_KIND_BTREE);
if (result)
PG_RETURN_BYTEA_P(result);
PG_RETURN_NULL();
return default_reloptions(reloptions, validate, RELOPT_KIND_BTREE);
}

View File

@@ -0,0 +1,204 @@
/*-------------------------------------------------------------------------
*
* nbtvalidate.c
* Opclass validator for btree.
*
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/access/nbtree/nbtvalidate.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
#include "catalog/pg_amop.h"
#include "catalog/pg_amproc.h"
#include "catalog/pg_opclass.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/syscache.h"
/*
* Validator for a btree opclass.
*
* Some of the checks done here cover the whole opfamily, and therefore are
* redundant when checking each opclass in a family. But they don't run long
* enough to be much of a problem, so we accept the duplication rather than
* complicate the amvalidate API.
*/
bool
btvalidate(Oid opclassoid)
{
HeapTuple classtup;
Form_pg_opclass classform;
Oid opfamilyoid;
Oid opcintype;
int numclassops;
int32 classfuncbits;
CatCList *proclist,
*oprlist;
Oid lastlefttype,
lastrighttype;
int numOps;
int i,
j;
/* Fetch opclass information */
classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid));
if (!HeapTupleIsValid(classtup))
elog(ERROR, "cache lookup failed for operator class %u", opclassoid);
classform = (Form_pg_opclass) GETSTRUCT(classtup);
opfamilyoid = classform->opcfamily;
opcintype = classform->opcintype;
ReleaseSysCache(classtup);
/* Fetch all operators and support functions of the opfamily */
oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid));
proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid));
/* We rely on the oprlist to be ordered */
if (!oprlist->ordered)
elog(ERROR, "cannot validate btree opclass without ordered data");
/* We'll track the ops and functions belonging to the named opclass */
numclassops = 0;
classfuncbits = 0;
/* Check support functions */
for (i = 0; i < proclist->n_members; i++)
{
HeapTuple proctup = &proclist->members[i]->tuple;
Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);
/* Check that only allowed procedure numbers exist */
if (procform->amprocnum != BTORDER_PROC &&
procform->amprocnum != BTSORTSUPPORT_PROC)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("btree opfamily %u contains invalid support number %d for procedure %u",
opfamilyoid,
procform->amprocnum, procform->amproc)));
/* Remember functions that are specifically for the named opclass */
if (procform->amproclefttype == opcintype &&
procform->amprocrighttype == opcintype)
classfuncbits |= (1 << procform->amprocnum);
}
/* Check operators */
lastlefttype = lastrighttype = InvalidOid;
numOps = 0;
for (i = 0; i < oprlist->n_members; i++)
{
HeapTuple oprtup = &oprlist->members[i]->tuple;
Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup);
/* Check that only allowed strategy numbers exist */
if (oprform->amopstrategy < 1 ||
oprform->amopstrategy > BTMaxStrategyNumber)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("btree opfamily %u contains invalid strategy number %d for operator %u",
opfamilyoid,
oprform->amopstrategy, oprform->amopopr)));
/*
* Check that we have all strategies for each supported datatype
* combination. This is easy since the list will be sorted in
* datatype order and there can't be duplicate strategy numbers.
*/
if (oprform->amoplefttype == lastlefttype &&
oprform->amoprighttype == lastrighttype)
numOps++;
else
{
/* reached a group boundary, so check ... */
if (numOps > 0 && numOps != BTMaxStrategyNumber)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("btree opfamily %u has a partial set of operators for datatypes %s and %s",
opfamilyoid,
format_type_be(lastlefttype),
format_type_be(lastrighttype))));
/* ... and reset for new group */
lastlefttype = oprform->amoplefttype;
lastrighttype = oprform->amoprighttype;
numOps = 1;
}
/*
* There should be a relevant support function for each operator, but
* we only need to check this once per pair of datatypes.
*/
if (numOps == 1)
{
bool found = false;
for (j = 0; j < proclist->n_members; j++)
{
HeapTuple proctup = &proclist->members[j]->tuple;
Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);
if (procform->amprocnum == BTORDER_PROC &&
procform->amproclefttype == oprform->amoplefttype &&
procform->amprocrighttype == oprform->amoprighttype)
{
found = true;
break;
}
}
if (!found)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("btree opfamily %u lacks support function for operator %u",
opfamilyoid, oprform->amopopr)));
}
/* btree doesn't support ORDER BY operators */
if (oprform->amoppurpose != AMOP_SEARCH ||
OidIsValid(oprform->amopsortfamily))
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("btree opfamily %u contains invalid ORDER BY specification for operator %u",
opfamilyoid, oprform->amopopr)));
/* Count operators that are specifically for the named opclass */
if (oprform->amoplefttype == opcintype &&
oprform->amoprighttype == opcintype)
numclassops++;
}
/* don't forget to check the last batch of operators for completeness */
if (numOps > 0 && numOps != BTMaxStrategyNumber)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("btree opfamily %u has a partial set of operators for datatypes %s and %s",
opfamilyoid,
format_type_be(lastlefttype),
format_type_be(lastrighttype))));
/* Check that the named opclass is complete */
if (numclassops != BTMaxStrategyNumber)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("btree opclass %u is missing operator(s)",
opclassoid)));
if ((classfuncbits & (1 << BTORDER_PROC)) == 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("btree opclass %u is missing required support function",
opclassoid)));
ReleaseCatCacheList(proclist);
ReleaseCatCacheList(oprlist);
return true;
}

View File

@@ -12,7 +12,7 @@ subdir = src/backend/access/spgist
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = spgutils.o spginsert.o spgscan.o spgvacuum.o \
OBJS = spgutils.o spginsert.o spgscan.o spgvacuum.o spgvalidate.o \
spgdoinsert.o spgxlog.o \
spgtextproc.o spgquadtreeproc.o spgkdtreeproc.o

View File

@@ -65,12 +65,9 @@ spgistBuildCallback(Relation index, HeapTuple htup, Datum *values,
/*
* Build an SP-GiST index.
*/
Datum
spgbuild(PG_FUNCTION_ARGS)
IndexBuildResult *
spgbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
Relation heap = (Relation) PG_GETARG_POINTER(0);
Relation index = (Relation) PG_GETARG_POINTER(1);
IndexInfo *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
IndexBuildResult *result;
double reltuples;
SpGistBuildState buildstate;
@@ -151,16 +148,15 @@ spgbuild(PG_FUNCTION_ARGS)
result = (IndexBuildResult *) palloc0(sizeof(IndexBuildResult));
result->heap_tuples = result->index_tuples = reltuples;
PG_RETURN_POINTER(result);
return result;
}
/*
* Build an empty SPGiST index in the initialization fork
*/
Datum
spgbuildempty(PG_FUNCTION_ARGS)
void
spgbuildempty(Relation index)
{
Relation index = (Relation) PG_GETARG_POINTER(0);
Page page;
/* Construct metapage. */
@@ -201,25 +197,16 @@ spgbuildempty(PG_FUNCTION_ARGS)
* checkpoint may have moved the redo pointer past our xlog record.
*/
smgrimmedsync(index->rd_smgr, INIT_FORKNUM);
PG_RETURN_VOID();
}
/*
* Insert one new tuple into an SPGiST index.
*/
Datum
spginsert(PG_FUNCTION_ARGS)
bool
spginsert(Relation index, Datum *values, bool *isnull,
ItemPointer ht_ctid, Relation heapRel,
IndexUniqueCheck checkUnique)
{
Relation index = (Relation) PG_GETARG_POINTER(0);
Datum *values = (Datum *) PG_GETARG_POINTER(1);
bool *isnull = (bool *) PG_GETARG_POINTER(2);
ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
#ifdef NOT_USED
Relation heapRel = (Relation) PG_GETARG_POINTER(4);
IndexUniqueCheck checkUnique = (IndexUniqueCheck) PG_GETARG_INT32(5);
#endif
SpGistState spgstate;
MemoryContext oldCtx;
MemoryContext insertCtx;
@@ -251,5 +238,5 @@ spginsert(PG_FUNCTION_ARGS)
MemoryContextDelete(insertCtx);
/* return false since we've not done any unique check */
PG_RETURN_BOOL(false);
return false;
}

View File

@@ -173,13 +173,9 @@ spgPrepareScanKeys(IndexScanDesc scan)
}
}
Datum
spgbeginscan(PG_FUNCTION_ARGS)
IndexScanDesc
spgbeginscan(Relation rel, int keysz, int orderbysz)
{
Relation rel = (Relation) PG_GETARG_POINTER(0);
int keysz = PG_GETARG_INT32(1);
/* ScanKey scankey = (ScanKey) PG_GETARG_POINTER(2); */
IndexScanDesc scan;
SpGistScanOpaque so;
@@ -202,15 +198,14 @@ spgbeginscan(PG_FUNCTION_ARGS)
scan->opaque = so;
PG_RETURN_POINTER(scan);
return scan;
}
Datum
spgrescan(PG_FUNCTION_ARGS)
void
spgrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
ScanKey orderbys, int norderbys)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
ScanKey scankey = (ScanKey) PG_GETARG_POINTER(1);
/* copy scankeys into local storage */
if (scankey && scan->numberOfKeys > 0)
@@ -224,33 +219,14 @@ spgrescan(PG_FUNCTION_ARGS)
/* set up starting stack entries */
resetSpGistScanOpaque(so);
PG_RETURN_VOID();
}
Datum
spgendscan(PG_FUNCTION_ARGS)
void
spgendscan(IndexScanDesc scan)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
MemoryContextDelete(so->tempCxt);
PG_RETURN_VOID();
}
Datum
spgmarkpos(PG_FUNCTION_ARGS)
{
elog(ERROR, "SPGiST does not support mark/restore");
PG_RETURN_VOID();
}
Datum
spgrestrpos(PG_FUNCTION_ARGS)
{
elog(ERROR, "SPGiST does not support mark/restore");
PG_RETURN_VOID();
}
/*
@@ -571,11 +547,9 @@ storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr,
so->ntids++;
}
Datum
spggetbitmap(PG_FUNCTION_ARGS)
int64
spggetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
TIDBitmap *tbm = (TIDBitmap *) PG_GETARG_POINTER(1);
SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
/* Copy want_itup to *so so we don't need to pass it around separately */
@@ -586,7 +560,7 @@ spggetbitmap(PG_FUNCTION_ARGS)
spgWalk(scan->indexRelation, so, true, storeBitmap);
PG_RETURN_INT64(so->ntids);
return so->ntids;
}
/* storeRes subroutine for gettuple case */
@@ -610,11 +584,9 @@ storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
so->nPtrs++;
}
Datum
spggettuple(PG_FUNCTION_ARGS)
bool
spggettuple(IndexScanDesc scan, ScanDirection dir)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
if (dir != ForwardScanDirection)
@@ -632,7 +604,7 @@ spggettuple(PG_FUNCTION_ARGS)
scan->xs_recheck = so->recheck[so->iPtr];
scan->xs_itup = so->indexTups[so->iPtr];
so->iPtr++;
PG_RETURN_BOOL(true);
return true;
}
if (so->want_itup)
@@ -651,19 +623,16 @@ spggettuple(PG_FUNCTION_ARGS)
break; /* must have completed scan */
}
PG_RETURN_BOOL(false);
return false;
}
Datum
spgcanreturn(PG_FUNCTION_ARGS)
bool
spgcanreturn(Relation index, int attno)
{
Relation index = (Relation) PG_GETARG_POINTER(0);
/* int i = PG_GETARG_INT32(1); */
SpGistCache *cache;
/* We can do it if the opclass config function says so */
cache = spgGetCache(index);
PG_RETURN_BOOL(cache->config.canReturnData);
return cache->config.canReturnData;
}

View File

@@ -15,7 +15,6 @@
#include "postgres.h"
#include "access/genam.h"
#include "access/reloptions.h"
#include "access/spgist_private.h"
#include "access/transam.h"
@@ -23,9 +22,54 @@
#include "storage/bufmgr.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "utils/index_selfuncs.h"
#include "utils/lsyscache.h"
/*
* SP-GiST handler function: return IndexAmRoutine with access method parameters
* and callbacks.
*/
Datum
spghandler(PG_FUNCTION_ARGS)
{
IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
amroutine->amstrategies = 0;
amroutine->amsupport = 5;
amroutine->amcanorder = false;
amroutine->amcanorderbyop = false;
amroutine->amcanbackward = false;
amroutine->amcanunique = false;
amroutine->amcanmulticol = false;
amroutine->amoptionalkey = true;
amroutine->amsearcharray = false;
amroutine->amsearchnulls = true;
amroutine->amstorage = false;
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = spgbuild;
amroutine->ambuildempty = spgbuildempty;
amroutine->aminsert = spginsert;
amroutine->ambulkdelete = spgbulkdelete;
amroutine->amvacuumcleanup = spgvacuumcleanup;
amroutine->amcanreturn = spgcanreturn;
amroutine->amcostestimate = spgcostestimate;
amroutine->amoptions = spgoptions;
amroutine->amvalidate = spgvalidate;
amroutine->ambeginscan = spgbeginscan;
amroutine->amrescan = spgrescan;
amroutine->amgettuple = spggettuple;
amroutine->amgetbitmap = spggetbitmap;
amroutine->amendscan = spgendscan;
amroutine->ammarkpos = NULL;
amroutine->amrestrpos = NULL;
PG_RETURN_POINTER(amroutine);
}
/* Fill in a SpGistTypeDesc struct with info about the specified data type */
static void
fillTypeDesc(SpGistTypeDesc *desc, Oid type)
@@ -489,18 +533,10 @@ SpGistInitMetapage(Page page)
/*
* reloptions processing for SPGiST
*/
Datum
spgoptions(PG_FUNCTION_ARGS)
bytea *
spgoptions(Datum reloptions, bool validate)
{
Datum reloptions = PG_GETARG_DATUM(0);
bool validate = PG_GETARG_BOOL(1);
bytea *result;
result = default_reloptions(reloptions, validate, RELOPT_KIND_SPGIST);
if (result)
PG_RETURN_BYTEA_P(result);
PG_RETURN_NULL();
return default_reloptions(reloptions, validate, RELOPT_KIND_SPGIST);
}
/*

View File

@@ -881,13 +881,10 @@ spgvacuumscan(spgBulkDeleteState *bds)
*
* Result: a palloc'd struct containing statistical info for VACUUM displays.
*/
Datum
spgbulkdelete(PG_FUNCTION_ARGS)
IndexBulkDeleteResult *
spgbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
IndexBulkDeleteCallback callback, void *callback_state)
{
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
void *callback_state = (void *) PG_GETARG_POINTER(3);
spgBulkDeleteState bds;
/* allocate stats if first time through, else re-use existing struct */
@@ -900,7 +897,7 @@ spgbulkdelete(PG_FUNCTION_ARGS)
spgvacuumscan(&bds);
PG_RETURN_POINTER(stats);
return stats;
}
/* Dummy callback to delete no tuples during spgvacuumcleanup */
@@ -915,17 +912,15 @@ dummy_callback(ItemPointer itemptr, void *state)
*
* Result: a palloc'd struct containing statistical info for VACUUM displays.
*/
Datum
spgvacuumcleanup(PG_FUNCTION_ARGS)
IndexBulkDeleteResult *
spgvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
Relation index = info->index;
spgBulkDeleteState bds;
/* No-op in ANALYZE ONLY mode */
if (info->analyze_only)
PG_RETURN_POINTER(stats);
return stats;
/*
* We don't need to scan the index if there was a preceding bulkdelete
@@ -959,5 +954,5 @@ spgvacuumcleanup(PG_FUNCTION_ARGS)
stats->num_index_tuples = info->num_heap_tuples;
}
PG_RETURN_POINTER(stats);
return stats;
}

View File

@@ -0,0 +1,129 @@
/*-------------------------------------------------------------------------
*
* spgvalidate.c
* Opclass validator for SP-GiST.
*
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/access/spgist/spgvalidate.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/htup_details.h"
#include "access/spgist_private.h"
#include "catalog/pg_amop.h"
#include "catalog/pg_amproc.h"
#include "catalog/pg_opclass.h"
#include "utils/catcache.h"
#include "utils/syscache.h"
/*
* Validator for an SP-GiST opclass.
*/
bool
spgvalidate(Oid opclassoid)
{
HeapTuple classtup;
Form_pg_opclass classform;
Oid opfamilyoid;
Oid opcintype;
int numclassops;
int32 classfuncbits;
CatCList *proclist,
*oprlist;
int i;
/* Fetch opclass information */
classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid));
if (!HeapTupleIsValid(classtup))
elog(ERROR, "cache lookup failed for operator class %u", opclassoid);
classform = (Form_pg_opclass) GETSTRUCT(classtup);
opfamilyoid = classform->opcfamily;
opcintype = classform->opcintype;
ReleaseSysCache(classtup);
/* Fetch all operators and support functions of the opfamily */
oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid));
proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid));
/* We'll track the ops and functions belonging to the named opclass */
numclassops = 0;
classfuncbits = 0;
/* Check support functions */
for (i = 0; i < proclist->n_members; i++)
{
HeapTuple proctup = &proclist->members[i]->tuple;
Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);
/* Check that only allowed procedure numbers exist */
if (procform->amprocnum < 1 ||
procform->amprocnum > SPGISTNProc)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("spgist opfamily %u contains invalid support number %d for procedure %u",
opfamilyoid,
procform->amprocnum, procform->amproc)));
/* Remember functions that are specifically for the named opclass */
if (procform->amproclefttype == opcintype &&
procform->amprocrighttype == opcintype)
classfuncbits |= (1 << procform->amprocnum);
}
/* Check operators */
for (i = 0; i < oprlist->n_members; i++)
{
HeapTuple oprtup = &oprlist->members[i]->tuple;
Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup);
/* TODO: Check that only allowed strategy numbers exist */
if (oprform->amopstrategy < 1)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("spgist opfamily %u contains invalid strategy number %d for operator %u",
opfamilyoid,
oprform->amopstrategy, oprform->amopopr)));
/* spgist doesn't support ORDER BY operators */
if (oprform->amoppurpose != AMOP_SEARCH ||
OidIsValid(oprform->amopsortfamily))
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("spgist opfamily %u contains invalid ORDER BY specification for operator %u",
opfamilyoid, oprform->amopopr)));
/* Count operators that are specifically for the named opclass */
if (oprform->amoplefttype == opcintype &&
oprform->amoprighttype == opcintype)
numclassops++;
}
/* Check that the named opclass is complete */
if (numclassops == 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("spgist opclass %u is missing operator(s)",
opclassoid)));
for (i = 1; i <= SPGISTNProc; i++)
{
if ((classfuncbits & (1 << i)) != 0)
continue; /* got it */
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("spgist opclass %u is missing required support function %d",
opclassoid, i)));
}
ReleaseCatCacheList(proclist);
ReleaseCatCacheList(oprlist);
return true;
}