
BRIN: Block Range Indexes

BRIN is a new index access method intended to accelerate scans of very
large tables, without the maintenance overhead of btrees or other
traditional indexes.  It works by maintaining "summary" data about
block ranges.  Bitmap index scans work by reading each summary tuple and
comparing it with the query quals; all pages in the range are returned
in a lossy TID bitmap if the quals are consistent with the values in the
summary tuple, otherwise not.  Normal index scans are not supported
because these indexes do not store TIDs.

As new tuples are added to the table, the summary information is
updated if the block range in which the tuple is added is already
summarized; otherwise, a subsequent pass of VACUUM or the
brin_summarize_new_values() function will create the summary
information.

For data types with natural 1-D sort orders, the summary info consists
of the maximum and the minimum values of each indexed column within each
page range.  We call this type of operator class "Minmax", and we supply
one for most data types that have B-tree opclasses.
Since the BRIN code is generalized, other approaches are possible for
things such as arrays, geometric types, ranges, etc; even for things
such as enum types we could do something different than minmax with
better results.  In this commit I only include minmax.

Catalog version bumped due to new builtin catalog entries.

There's more that could be done here, but this is a good step forwards.

Loosely based on ideas from Simon Riggs; code mostly by Álvaro Herrera,
with contribution by Heikki Linnakangas.

Patch reviewed by: Amit Kapila, Heikki Linnakangas, Robert Haas.
Testing help from Jeff Janes, Erik Rijkers, Emanuel Calvo.

PS:
  The research leading to these results has received funding from the
  European Union's Seventh Framework Programme (FP7/2007-2013) under
  grant agreement n° 318633.
Author: Alvaro Herrera
Date:   2014-11-07 16:38:14 -03:00
Parent: 1961b1c131
Commit: 7516f52594

57 changed files with 6807 additions and 24 deletions

src/backend/access/Makefile

@@ -8,6 +8,6 @@ subdir = src/backend/access
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-SUBDIRS = common gin gist hash heap index nbtree rmgrdesc spgist transam
+SUBDIRS = brin common gin gist hash heap index nbtree rmgrdesc spgist transam
include $(top_srcdir)/src/backend/common.mk

src/backend/access/brin/Makefile

@@ -0,0 +1,18 @@
#-------------------------------------------------------------------------
#
# Makefile--
# Makefile for access/brin
#
# IDENTIFICATION
# src/backend/access/brin/Makefile
#
#-------------------------------------------------------------------------
subdir = src/backend/access/brin
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = brin.o brin_pageops.o brin_revmap.o brin_tuple.o brin_xlog.o \
brin_minmax.o
include $(top_srcdir)/src/backend/common.mk

src/backend/access/brin/README

@@ -0,0 +1,189 @@
Block Range Indexes (BRIN)
==========================
BRIN indexes intend to enable very fast scanning of extremely large tables.
The essential idea of a BRIN index is to keep track of summarizing values in
consecutive groups of heap pages (page ranges); for example, the minimum and
maximum values for datatypes with a btree opclass, or the bounding box for
geometric types. These values can be used to avoid scanning such pages
during a table scan, depending on query quals.
The cost of this is having to update the stored summary values of each page
range as tuples are inserted into them.
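
As an illustration of the pruning decision, here is a simplified, standalone
sketch (not code from this patch): it uses plain ints where the real opclasses
operate on Datums through the support procedures described in the next section.

    /*
     * Simplified sketch: a page range can be skipped for a qual such as
     * "col = key" whenever key falls outside the [min, max] stored for it.
     */
    #include <stdbool.h>

    typedef struct MinmaxSummary
    {
        int         min;        /* smallest indexed value in the page range */
        int         max;        /* largest indexed value in the page range */
    } MinmaxSummary;

    static bool
    range_is_consistent_with_equality(const MinmaxSummary *summary, int key)
    {
        /* consistent iff min <= key <= max; otherwise the range is skipped */
        return summary->min <= key && key <= summary->max;
    }
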
Access Method Design
--------------------
Since item pointers are not stored inside indexes of this type, it is not
possible to support the amgettuple interface. Instead, we only provide
amgetbitmap support. The amgetbitmap routine returns a lossy TIDBitmap
comprising all pages in those page ranges that match the query
qualifications. The recheck step in the BitmapHeapScan node prunes tuples
that are not visible according to the query qualifications.

An operator class must have the following entries:

- generic support procedures (pg_amproc), identical for all opclasses:
  * "opcinfo" (BRIN_PROCNUM_OPCINFO) initializes a structure for index
    creation or scanning
  * "addValue" (BRIN_PROCNUM_ADDVALUE) takes an index tuple and a heap item,
    and possibly changes the index tuple so that it includes the heap item
    values
  * "consistent" (BRIN_PROCNUM_CONSISTENT) takes an index tuple and query
    quals, and returns whether the index tuple values match the query quals.
  * "union" (BRIN_PROCNUM_UNION) takes two index tuples and modifies the first
    one so that it represents the union of the two.

Procedure numbers up to 10 are reserved for future expansion.

Each opclass also needs additional, opclass-specific support functions:

- Minmax-style operator classes:
  * Proc numbers 11-14 are used for the functions implementing inequality
    operators for the type, in this order: less than, less or equal,
    greater or equal, greater than.

Opclasses using a different design will require different additional procedure
numbers.

Operator classes also need operator (pg_amop) entries so that the optimizer
can choose the index to execute queries:

- Minmax-style operator classes:
  * The same operators as btree (<=, <, =, >=, >)

Each index tuple stores some NULL bits and some opclass-specified values, which
are stored in a single null bitmask of length twice the number of columns.  The
generic NULL bits indicate, for each column:

 * bt_hasnulls: Whether there's any NULL value at all in the page range
 * bt_allnulls: Whether all values are NULLs in the page range

The opclass-specified values are:

- Minmax-style operator classes
  * minimum value across all tuples in the range
  * maximum value across all tuples in the range

Note that the addValue and Union support procedures must be careful to
datumCopy() the values they want to store in the in-memory BRIN tuple, and
must pfree() the old copies when replacing older ones.  Since some values
referenced from the tuple persist and others go away, there is no
well-defined lifetime for a memory context that would make this automatic.
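
As an illustration of that bitmask layout, the following standalone sketch
(hypothetical helper names, not code from this patch) reads the two halves the
same way brin_form_tuple fills them: the first natts bits carry "allnulls",
the next natts bits carry "hasnulls", least-significant bit first within each
byte.

    #include <stdbool.h>

    typedef unsigned char bits8;

    static bool
    brin_bit_is_set(const bits8 *bitmask, int bitnum)
    {
        return (bitmask[bitnum / 8] & (1 << (bitnum % 8))) != 0;
    }

    /* attno0 is a 0-based column number */
    static bool
    column_allnulls(const bits8 *bitmask, int natts, int attno0)
    {
        (void) natts;                   /* allnulls bits come first */
        return brin_bit_is_set(bitmask, attno0);
    }

    static bool
    column_hasnulls(const bits8 *bitmask, int natts, int attno0)
    {
        return brin_bit_is_set(bitmask, natts + attno0);
    }
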
The Range Map
-------------
To find the index tuple for a particular page range, we have an internal
structure we call the range map, or "revmap" for short. This stores one TID
per page range, which is the address of the index tuple summarizing that
range. Since the map entries are fixed size, it is possible to compute the
address of the range map entry for any given heap page by simple arithmetic.
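
The "simple arithmetic" looks like the following standalone sketch, which
mirrors the HEAPBLK_TO_REVMAP_BLK and HEAPBLK_TO_REVMAP_INDEX macros defined in
brin_revmap.c (revmapItemsPerPage stands in for REVMAP_PAGE_MAXITEMS; the
physical block holding the entry is this logical page number plus one, to skip
the metapage):

    typedef unsigned int BlockNumber;

    /* logical revmap page containing the entry for heapBlk */
    static BlockNumber
    revmap_logical_page(BlockNumber heapBlk, BlockNumber pagesPerRange,
                        unsigned int revmapItemsPerPage)
    {
        return (heapBlk / pagesPerRange) / revmapItemsPerPage;
    }

    /* index of the entry within that revmap page */
    static unsigned int
    revmap_item_index(BlockNumber heapBlk, BlockNumber pagesPerRange,
                      unsigned int revmapItemsPerPage)
    {
        return (heapBlk / pagesPerRange) % revmapItemsPerPage;
    }
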
When a new heap tuple is inserted in a summarized page range, we compare the
existing index tuple with the new heap tuple. If the heap tuple is outside
the summarization data given by the index tuple for any indexed column (or
if the new heap tuple contains null values but the index tuple indicates
there are no nulls), the index is updated with the new values. In many
cases it is possible to update the index tuple in-place, but if the new
index tuple is larger than the old one and there's not enough space in the
page, it is necessary to create a new index tuple with the new values. The
range map can be updated quickly to point to it; the old index tuple is
removed.
If the range map points to an invalid TID, the corresponding page range is
considered to be not summarized. When tuples are added to unsummarized
pages, nothing needs to happen.
To scan a table following a BRIN index, we scan the range map sequentially.
This yields index tuples in ascending page range order. Query quals are
matched to each index tuple; if they match, each page within the page range
is returned as part of the output TID bitmap. If there's no match, they are
skipped. Range map entries returning invalid index TIDs, that is
unsummarized page ranges, are also returned in the TID bitmap.
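
The effect on the output bitmap can be sketched as follows (standalone code
with hypothetical names, not this patch's implementation):

    #include <stdbool.h>

    typedef unsigned int BlockNumber;

    /*
     * Mark every page of each matching range; the result is lossy at page
     * granularity, and the recheck step in the BitmapHeapScan node discards
     * non-matching tuples later.  The callback must also return true for
     * unsummarized ranges, since nothing is known about their contents.
     */
    static void
    mark_matching_ranges(bool *page_bitmap,        /* one flag per heap page */
                         BlockNumber nheapblocks,
                         BlockNumber pagesPerRange,
                         bool (*range_matches)(BlockNumber rangeStart))
    {
        BlockNumber rangeStart;

        for (rangeStart = 0; rangeStart < nheapblocks;
             rangeStart += pagesPerRange)
        {
            if (range_matches(rangeStart))
            {
                BlockNumber blk;
                BlockNumber rangeEnd = rangeStart + pagesPerRange;

                for (blk = rangeStart; blk < rangeEnd && blk < nheapblocks; blk++)
                    page_bitmap[blk] = true;        /* whole page, lossily */
            }
        }
    }
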
The revmap is stored in the first few blocks of the index main fork,
immediately following the metapage. Whenever the revmap needs to be
extended by another page, existing tuples in that page are moved to some
other page.
Heap tuples can be removed from anywhere without restriction. It might be
useful to mark the corresponding index tuple somehow, if the heap tuple is
one of the constraining values of the summary data (i.e. either min or max
in the case of a btree-opclass-bearing datatype), so that in the future we
are aware of the need to re-execute summarization on that range, leading to
a possible tightening of the summary values.
Summarization
-------------
At index creation time, the whole table is scanned; for each page range the
summarizing values of each indexed column and nulls bitmap are collected and
stored in the index. The partially-filled page range at the end of the
table is also summarized.
As new tuples get inserted at the end of the table, they may update the
index tuple that summarizes the partial page range at the end. Eventually
that page range is complete and new tuples belong in a new page range that
hasn't yet been summarized. Those insertions do not create a new index
entry; instead, the page range remains unsummarized until later.
When VACUUM is run on the table, all unsummarized page ranges are
summarized. This action can also be invoked by the user via
brin_summarize_new_values(). Both these procedures scan all the
unsummarized ranges, and create a summary tuple. Again, this includes the
partially-filled page range at the end of the table.
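
The coverage arithmetic can be illustrated with a standalone sketch
(hypothetical helper, not code from this patch):

    typedef unsigned int BlockNumber;

    /*
     * Ranges summarized at index creation cover blocks
     * 0 .. nblocks_at_creation - 1, including the partially-filled range at
     * the end.  Tuples inserted at or beyond the block returned here land in
     * ranges that stay unsummarized until VACUUM or
     * brin_summarize_new_values() processes them.
     */
    static BlockNumber
    first_unsummarized_block(BlockNumber nblocks_at_creation,
                             BlockNumber pagesPerRange)
    {
        /* ranges summarized at creation, rounding the last one up */
        BlockNumber nranges =
            (nblocks_at_creation + pagesPerRange - 1) / pagesPerRange;

        return nranges * pagesPerRange;
    }
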
Vacuuming
---------
Since no heap TIDs are stored in a BRIN index, it's not necessary to scan the
index when heap tuples are removed. It might be that some summary values can
be tightened if heap tuples have been deleted; but this would represent an
optimization opportunity only, not a correctness issue. It's simpler to
represent this as the need to re-run summarization on the affected page range
rather than "subtracting" values from the existing one. This is not
currently implemented.
Note that if there are no indexes on the table other than the BRIN index,
usage of maintenance_work_mem by vacuum can be decreased significantly, because
no detailed index scan needs to take place (and thus it's not necessary for
vacuum to save TIDs to remove).  It's unlikely that BRIN would be the only
index on a table, though, because primary keys can only be btrees, and so
we don't implement this optimization.
Optimizer
---------
The optimizer selects the index based on the operator class' pg_amop
entries for the column.
Future improvements
-------------------
* Different-size page ranges?
In the current design, each "index entry" in a BRIN index covers the same
number of pages. There's no hard reason for this; it might make sense to
allow the index to self-tune so that some index entries cover smaller page
ranges, if this allows the summary values to be more compact. This would incur
larger BRIN overhead for the index itself, but might allow better pruning of
page ranges during scan. In the limit of one index tuple per page, the index
itself would occupy too much space, even though we would be able to skip
reading most heap pages, because the summary values are tight; in the
opposite limit of a single tuple that summarizes the whole table, we wouldn't
be able to prune anything even though the index is very small. This can
probably be made to work by using the range map as an index in itself.
* More compact representation for TIDBitmap?
TIDBitmap is the structure used to represent bitmap scans. The
representation of lossy page ranges is not optimal for our purposes, because
it uses a Bitmapset to represent pages in the range; since we're going to return
all pages in a large range, it might be more convenient to allow for a
struct that uses start and end page numbers to represent the range, instead.
* Better vacuuming?
It might be useful to enable passing more useful info to BRIN indexes during
vacuuming about tuples that are deleted, i.e. do not require the callback to
pass each tuple's TID. For instance we might need a callback that passes a
block number instead of a TID. That would help determine when to re-run
summarization on blocks that have seen lots of tuple deletions.

File diff suppressed because it is too large.

src/backend/access/brin/brin_minmax.c

@@ -0,0 +1,341 @@
/*
* brin_minmax.c
* Implementation of Min/Max opclass for BRIN
*
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/access/brin/brin_minmax.c
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/brin_internal.h"
#include "access/brin_tuple.h"
#include "access/skey.h"
#include "catalog/pg_type.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
/*
* Procedure numbers must not collide with BRIN_PROCNUM defines in
* brin_internal.h. Note we only need inequality functions.
*/
#define MINMAX_NUM_PROCNUMS 4 /* # support procs we need */
#define PROCNUM_LESS 11
#define PROCNUM_LESSEQUAL 12
#define PROCNUM_GREATEREQUAL 13
#define PROCNUM_GREATER 14
/*
* Subtract this from procnum to obtain index in MinmaxOpaque arrays
* (Must be equal to minimum of private procnums)
*/
#define PROCNUM_BASE 11
static FmgrInfo *minmax_get_procinfo(BrinDesc *bdesc, uint16 attno,
uint16 procnum);
PG_FUNCTION_INFO_V1(minmaxOpcInfo);
PG_FUNCTION_INFO_V1(minmaxAddValue);
PG_FUNCTION_INFO_V1(minmaxConsistent);
PG_FUNCTION_INFO_V1(minmaxUnion);
typedef struct MinmaxOpaque
{
FmgrInfo operators[MINMAX_NUM_PROCNUMS];
bool inited[MINMAX_NUM_PROCNUMS];
} MinmaxOpaque;
Datum
minmaxOpcInfo(PG_FUNCTION_ARGS)
{
Oid typoid = PG_GETARG_OID(0);
BrinOpcInfo *result;
/*
* opaque->operators is initialized lazily, as indicated by 'inited' which
* is initialized to all false by palloc0.
*/
result = palloc0(MAXALIGN(SizeofBrinOpcInfo(2)) +
sizeof(MinmaxOpaque));
result->oi_nstored = 2;
result->oi_opaque = (MinmaxOpaque *)
MAXALIGN((char *) result + SizeofBrinOpcInfo(2));
result->oi_typids[0] = typoid;
result->oi_typids[1] = typoid;
PG_RETURN_POINTER(result);
}
/*
* Examine the given index tuple (which contains partial status of a certain
* page range) by comparing it to the given value that comes from another heap
* tuple. If the new value is outside the min/max range specified by the
* existing tuple values, update the index tuple and return true. Otherwise,
* return false and do not modify in this case.
*/
Datum
minmaxAddValue(PG_FUNCTION_ARGS)
{
BrinDesc *bdesc = (BrinDesc *) PG_GETARG_POINTER(0);
BrinValues *column = (BrinValues *) PG_GETARG_POINTER(1);
Datum newval = PG_GETARG_DATUM(2);
bool isnull = PG_GETARG_DATUM(3);
Oid colloid = PG_GET_COLLATION();
FmgrInfo *cmpFn;
Datum compar;
bool updated = false;
Form_pg_attribute attr;
AttrNumber attno;
/*
* If the new value is null, we record that we saw it if it's the first
* one; otherwise, there's nothing to do.
*/
if (isnull)
{
if (column->bv_hasnulls)
PG_RETURN_BOOL(false);
column->bv_hasnulls = true;
PG_RETURN_BOOL(true);
}
attno = column->bv_attno;
attr = bdesc->bd_tupdesc->attrs[attno - 1];
/*
* If the recorded value is null, store the new value (which we know to be
* not null) as both minimum and maximum, and we're done.
*/
if (column->bv_allnulls)
{
column->bv_values[0] = datumCopy(newval, attr->attbyval, attr->attlen);
column->bv_values[1] = datumCopy(newval, attr->attbyval, attr->attlen);
column->bv_allnulls = false;
PG_RETURN_BOOL(true);
}
/*
* Otherwise, need to compare the new value with the existing boundaries
* and update them accordingly. First check if it's less than the
* existing minimum.
*/
cmpFn = minmax_get_procinfo(bdesc, attno, PROCNUM_LESS);
compar = FunctionCall2Coll(cmpFn, colloid, newval, column->bv_values[0]);
if (DatumGetBool(compar))
{
if (!attr->attbyval)
pfree(DatumGetPointer(column->bv_values[0]));
column->bv_values[0] = datumCopy(newval, attr->attbyval, attr->attlen);
updated = true;
}
/*
* And now compare it to the existing maximum.
*/
cmpFn = minmax_get_procinfo(bdesc, attno, PROCNUM_GREATER);
compar = FunctionCall2Coll(cmpFn, colloid, newval, column->bv_values[1]);
if (DatumGetBool(compar))
{
if (!attr->attbyval)
pfree(DatumGetPointer(column->bv_values[1]));
column->bv_values[1] = datumCopy(newval, attr->attbyval, attr->attlen);
updated = true;
}
PG_RETURN_BOOL(updated);
}
/*
* Given an index tuple corresponding to a certain page range and a scan key,
* return whether the scan key is consistent with the index tuple's min/max
* values. Return true if so, false otherwise.
*/
Datum
minmaxConsistent(PG_FUNCTION_ARGS)
{
BrinDesc *bdesc = (BrinDesc *) PG_GETARG_POINTER(0);
BrinValues *column = (BrinValues *) PG_GETARG_POINTER(1);
ScanKey key = (ScanKey) PG_GETARG_POINTER(2);
Oid colloid = PG_GET_COLLATION();
AttrNumber attno;
Datum value;
Datum matches;
Assert(key->sk_attno == column->bv_attno);
/* handle IS NULL/IS NOT NULL tests */
if (key->sk_flags & SK_ISNULL)
{
if (key->sk_flags & SK_SEARCHNULL)
{
if (column->bv_allnulls || column->bv_hasnulls)
PG_RETURN_BOOL(true);
PG_RETURN_BOOL(false);
}
/*
* For IS NOT NULL, we can only skip ranges that are known to have
* only nulls.
*/
Assert(key->sk_flags & SK_SEARCHNOTNULL);
PG_RETURN_BOOL(!column->bv_allnulls);
}
/* if the range is all empty, it cannot possibly be consistent */
if (column->bv_allnulls)
PG_RETURN_BOOL(false);
attno = key->sk_attno;
value = key->sk_argument;
switch (key->sk_strategy)
{
case BTLessStrategyNumber:
matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
PROCNUM_LESS),
colloid, column->bv_values[0], value);
break;
case BTLessEqualStrategyNumber:
matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
PROCNUM_LESSEQUAL),
colloid, column->bv_values[0], value);
break;
case BTEqualStrategyNumber:
/*
* In the equality case (WHERE col = someval), we want to return
* the current page range if the minimum value in the range <=
* scan key, and the maximum value >= scan key.
*/
matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
PROCNUM_LESSEQUAL),
colloid, column->bv_values[0], value);
if (!DatumGetBool(matches))
break;
/* max() >= scankey */
matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
PROCNUM_GREATEREQUAL),
colloid, column->bv_values[1], value);
break;
case BTGreaterEqualStrategyNumber:
matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
PROCNUM_GREATEREQUAL),
colloid, column->bv_values[1], value);
break;
case BTGreaterStrategyNumber:
matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
PROCNUM_GREATER),
colloid, column->bv_values[1], value);
break;
default:
/* shouldn't happen */
elog(ERROR, "invalid strategy number %d", key->sk_strategy);
matches = 0;
break;
}
PG_RETURN_DATUM(matches);
}
/*
* Given two BrinValues, update the first of them as a union of the summary
* values contained in both. The second one is untouched.
*/
Datum
minmaxUnion(PG_FUNCTION_ARGS)
{
BrinDesc *bdesc = (BrinDesc *) PG_GETARG_POINTER(0);
BrinValues *col_a = (BrinValues *) PG_GETARG_POINTER(1);
BrinValues *col_b = (BrinValues *) PG_GETARG_POINTER(2);
Oid colloid = PG_GET_COLLATION();
AttrNumber attno;
Form_pg_attribute attr;
bool needsadj;
Assert(col_a->bv_attno == col_b->bv_attno);
/* If there are no values in B, there's nothing to do */
if (col_b->bv_allnulls)
PG_RETURN_VOID();
attno = col_a->bv_attno;
attr = bdesc->bd_tupdesc->attrs[attno - 1];
/* Adjust "hasnulls" */
if (col_b->bv_hasnulls && !col_a->bv_hasnulls)
col_a->bv_hasnulls = true;
/*
* Adjust "allnulls". If B has values but A doesn't, just copy the values
* from B into A, and we're done. (We cannot run the operators in this
* case, because values in A might contain garbage.)
*/
if (!col_b->bv_allnulls && col_a->bv_allnulls)
{
col_a->bv_allnulls = false;
col_a->bv_values[0] = datumCopy(col_b->bv_values[0],
attr->attbyval, attr->attlen);
col_a->bv_values[1] = datumCopy(col_b->bv_values[1],
attr->attbyval, attr->attlen);
PG_RETURN_VOID();
}
/* Adjust minimum, if B's min is less than A's min */
needsadj = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
PROCNUM_LESS),
colloid, col_b->bv_values[0], col_a->bv_values[0]);
if (needsadj)
{
if (!attr->attbyval)
pfree(DatumGetPointer(col_a->bv_values[0]));
col_a->bv_values[0] = datumCopy(col_b->bv_values[0],
attr->attbyval, attr->attlen);
}
/* Adjust maximum, if B's max is greater than A's max */
needsadj = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
PROCNUM_GREATER),
colloid, col_b->bv_values[1], col_a->bv_values[1]);
if (needsadj)
{
if (!attr->attbyval)
pfree(DatumGetPointer(col_a->bv_values[1]));
col_a->bv_values[1] = datumCopy(col_b->bv_values[1],
attr->attbyval, attr->attlen);
}
PG_RETURN_VOID();
}
/*
* Return the procedure corresponding to the given function support number.
*/
static FmgrInfo *
minmax_get_procinfo(BrinDesc *bdesc, uint16 attno, uint16 procnum)
{
MinmaxOpaque *opaque;
uint16 basenum = procnum - PROCNUM_BASE;
opaque = (MinmaxOpaque *) bdesc->bd_info[attno - 1]->oi_opaque;
/*
* We cache these in the opaque struct, to avoid repetitive syscache
* lookups.
*/
if (!opaque->inited[basenum])
{
fmgr_info_copy(&opaque->operators[basenum],
index_getprocinfo(bdesc->bd_index, attno, procnum),
bdesc->bd_context);
opaque->inited[basenum] = true;
}
return &opaque->operators[basenum];
}

src/backend/access/brin/brin_pageops.c

@@ -0,0 +1,723 @@
/*
* brin_pageops.c
* Page-handling routines for BRIN indexes
*
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/access/brin/brin_pageops.c
*/
#include "postgres.h"
#include "access/brin_pageops.h"
#include "access/brin_page.h"
#include "access/brin_revmap.h"
#include "access/brin_xlog.h"
#include "access/xloginsert.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "storage/lmgr.h"
#include "storage/smgr.h"
#include "utils/rel.h"
static Buffer brin_getinsertbuffer(Relation irel, Buffer oldbuf, Size itemsz,
bool *was_extended);
static Size br_page_get_freespace(Page page);
/*
* Update tuple origtup (size origsz), located in offset oldoff of buffer
* oldbuf, to newtup (size newsz) as summary tuple for the page range starting
* at heapBlk. oldbuf must not be locked on entry, and is not locked at exit.
*
* If samepage is true, attempt to put the new tuple in the same page, but if
* there's no room, use some other one.
*
* If the update is successful, return true; the revmap is updated to point to
* the new tuple. If the update is not done for whatever reason, return false.
* Caller may retry the update if this happens.
*/
bool
brin_doupdate(Relation idxrel, BlockNumber pagesPerRange,
BrinRevmap *revmap, BlockNumber heapBlk,
Buffer oldbuf, OffsetNumber oldoff,
const BrinTuple *origtup, Size origsz,
const BrinTuple *newtup, Size newsz,
bool samepage)
{
Page oldpage;
ItemId oldlp;
BrinTuple *oldtup;
Size oldsz;
Buffer newbuf;
BrinSpecialSpace *special;
bool extended = false;
newsz = MAXALIGN(newsz);
/* make sure the revmap is long enough to contain the entry we need */
brinRevmapExtend(revmap, heapBlk);
if (!samepage)
{
/* need a page on which to put the item */
newbuf = brin_getinsertbuffer(idxrel, oldbuf, newsz, &extended);
/* XXX delay vacuuming FSM until locks are released? */
if (extended)
FreeSpaceMapVacuum(idxrel);
if (!BufferIsValid(newbuf))
return false;
/*
* Note: it's possible (though unlikely) that the returned newbuf is
* the same as oldbuf, if brin_getinsertbuffer determined that the old
* buffer does in fact have enough space.
*/
if (newbuf == oldbuf)
newbuf = InvalidBuffer;
}
else
{
LockBuffer(oldbuf, BUFFER_LOCK_EXCLUSIVE);
newbuf = InvalidBuffer;
}
oldpage = BufferGetPage(oldbuf);
oldlp = PageGetItemId(oldpage, oldoff);
/*
* Check that the old tuple wasn't updated concurrently: it might have
* moved someplace else entirely ...
*/
if (!ItemIdIsNormal(oldlp))
{
LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
if (BufferIsValid(newbuf))
UnlockReleaseBuffer(newbuf);
return false;
}
oldsz = ItemIdGetLength(oldlp);
oldtup = (BrinTuple *) PageGetItem(oldpage, oldlp);
/*
* ... or it might have been updated in place to different contents.
*/
if (!brin_tuples_equal(oldtup, oldsz, origtup, origsz))
{
LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
if (BufferIsValid(newbuf))
UnlockReleaseBuffer(newbuf);
return false;
}
special = (BrinSpecialSpace *) PageGetSpecialPointer(oldpage);
/*
* Great, the old tuple is intact. We can proceed with the update.
*
* If there's enough room in the old page for the new tuple, replace it.
*
* Note that there might now be enough space on the page even though the
* caller told us there isn't, if a concurrent update moved another tuple
* elsewhere or replaced a tuple with a smaller one.
*/
if (((special->flags & BRIN_EVACUATE_PAGE) == 0) &&
brin_can_do_samepage_update(oldbuf, origsz, newsz))
{
if (BufferIsValid(newbuf))
UnlockReleaseBuffer(newbuf);
START_CRIT_SECTION();
PageIndexDeleteNoCompact(oldpage, &oldoff, 1);
if (PageAddItem(oldpage, (Item) newtup, newsz, oldoff, true,
false) == InvalidOffsetNumber)
elog(ERROR, "failed to add BRIN tuple");
MarkBufferDirty(oldbuf);
/* XLOG stuff */
if (RelationNeedsWAL(idxrel))
{
BlockNumber blk = BufferGetBlockNumber(oldbuf);
xl_brin_samepage_update xlrec;
XLogRecPtr recptr;
XLogRecData rdata[2];
uint8 info = XLOG_BRIN_SAMEPAGE_UPDATE;
xlrec.node = idxrel->rd_node;
ItemPointerSetBlockNumber(&xlrec.tid, blk);
ItemPointerSetOffsetNumber(&xlrec.tid, oldoff);
rdata[0].data = (char *) &xlrec;
rdata[0].len = SizeOfBrinSamepageUpdate;
rdata[0].buffer = InvalidBuffer;
rdata[0].next = &(rdata[1]);
rdata[1].data = (char *) newtup;
rdata[1].len = newsz;
rdata[1].buffer = oldbuf;
rdata[1].buffer_std = true;
rdata[1].next = NULL;
recptr = XLogInsert(RM_BRIN_ID, info, rdata);
PageSetLSN(oldpage, recptr);
}
END_CRIT_SECTION();
LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
return true;
}
else if (newbuf == InvalidBuffer)
{
/*
* Not enough space, but caller said that there was. Tell them to
* start over.
*/
LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
return false;
}
else
{
/*
* Not enough free space on the oldpage. Put the new tuple on the new
* page, and update the revmap.
*/
Page newpage = BufferGetPage(newbuf);
Buffer revmapbuf;
ItemPointerData newtid;
OffsetNumber newoff;
revmapbuf = brinLockRevmapPageForUpdate(revmap, heapBlk);
START_CRIT_SECTION();
PageIndexDeleteNoCompact(oldpage, &oldoff, 1);
newoff = PageAddItem(newpage, (Item) newtup, newsz,
InvalidOffsetNumber, false, false);
if (newoff == InvalidOffsetNumber)
elog(ERROR, "failed to add BRIN tuple to new page");
MarkBufferDirty(oldbuf);
MarkBufferDirty(newbuf);
ItemPointerSet(&newtid, BufferGetBlockNumber(newbuf), newoff);
brinSetHeapBlockItemptr(revmapbuf, pagesPerRange, heapBlk, newtid);
MarkBufferDirty(revmapbuf);
/* XLOG stuff */
if (RelationNeedsWAL(idxrel))
{
xl_brin_update xlrec;
XLogRecPtr recptr;
XLogRecData rdata[4];
uint8 info;
info = XLOG_BRIN_UPDATE | (extended ? XLOG_BRIN_INIT_PAGE : 0);
xlrec.new.node = idxrel->rd_node;
ItemPointerSet(&xlrec.new.tid, BufferGetBlockNumber(newbuf), newoff);
xlrec.new.heapBlk = heapBlk;
xlrec.new.tuplen = newsz;
xlrec.new.revmapBlk = BufferGetBlockNumber(revmapbuf);
xlrec.new.pagesPerRange = pagesPerRange;
ItemPointerSet(&xlrec.oldtid, BufferGetBlockNumber(oldbuf), oldoff);
rdata[0].data = (char *) &xlrec;
rdata[0].len = SizeOfBrinUpdate;
rdata[0].buffer = InvalidBuffer;
rdata[0].next = &(rdata[1]);
rdata[1].data = (char *) newtup;
rdata[1].len = newsz;
rdata[1].buffer = extended ? InvalidBuffer : newbuf;
rdata[1].buffer_std = true;
rdata[1].next = &(rdata[2]);
rdata[2].data = (char *) NULL;
rdata[2].len = 0;
rdata[2].buffer = revmapbuf;
rdata[2].buffer_std = true;
rdata[2].next = &(rdata[3]);
rdata[3].data = (char *) NULL;
rdata[3].len = 0;
rdata[3].buffer = oldbuf;
rdata[3].buffer_std = true;
rdata[3].next = NULL;
recptr = XLogInsert(RM_BRIN_ID, info, rdata);
PageSetLSN(oldpage, recptr);
PageSetLSN(newpage, recptr);
PageSetLSN(BufferGetPage(revmapbuf), recptr);
}
END_CRIT_SECTION();
LockBuffer(revmapbuf, BUFFER_LOCK_UNLOCK);
LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
UnlockReleaseBuffer(newbuf);
return true;
}
}
/*
* Return whether brin_doupdate can do a samepage update.
*/
bool
brin_can_do_samepage_update(Buffer buffer, Size origsz, Size newsz)
{
return
((newsz <= origsz) ||
PageGetExactFreeSpace(BufferGetPage(buffer)) >= (newsz - origsz));
}
/*
* Insert an index tuple into the index relation. The revmap is updated to
* mark the range containing the given page as pointing to the inserted entry.
* A WAL record is written.
*
* The buffer, if valid, is first checked for free space to insert the new
* entry; if there isn't enough, a new buffer is obtained and pinned. No
* buffer lock must be held on entry, no buffer lock is held on exit.
*
* Return value is the offset number where the tuple was inserted.
*/
OffsetNumber
brin_doinsert(Relation idxrel, BlockNumber pagesPerRange,
BrinRevmap *revmap, Buffer *buffer, BlockNumber heapBlk,
BrinTuple *tup, Size itemsz)
{
Page page;
BlockNumber blk;
OffsetNumber off;
Buffer revmapbuf;
ItemPointerData tid;
bool extended = false;
itemsz = MAXALIGN(itemsz);
/* Make sure the revmap is long enough to contain the entry we need */
brinRevmapExtend(revmap, heapBlk);
/*
* Obtain a locked buffer to insert the new tuple. Note
* brin_getinsertbuffer ensures there's enough space in the returned
* buffer.
*/
if (BufferIsValid(*buffer))
{
/*
* It's possible that another backend (or ourselves!) extended the
* revmap over the page we held a pin on, so we cannot assume that
* it's still a regular page.
*/
LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
if (br_page_get_freespace(BufferGetPage(*buffer)) < itemsz)
{
UnlockReleaseBuffer(*buffer);
*buffer = InvalidBuffer;
}
}
if (!BufferIsValid(*buffer))
{
*buffer = brin_getinsertbuffer(idxrel, InvalidBuffer, itemsz, &extended);
Assert(BufferIsValid(*buffer));
Assert(br_page_get_freespace(BufferGetPage(*buffer)) >= itemsz);
}
/* Now obtain lock on revmap buffer */
revmapbuf = brinLockRevmapPageForUpdate(revmap, heapBlk);
page = BufferGetPage(*buffer);
blk = BufferGetBlockNumber(*buffer);
START_CRIT_SECTION();
off = PageAddItem(page, (Item) tup, itemsz, InvalidOffsetNumber,
false, false);
if (off == InvalidOffsetNumber)
elog(ERROR, "could not insert new index tuple to page");
MarkBufferDirty(*buffer);
BRIN_elog(DEBUG2, "inserted tuple (%u,%u) for range starting at %u",
blk, off, heapBlk);
ItemPointerSet(&tid, blk, off);
brinSetHeapBlockItemptr(revmapbuf, pagesPerRange, heapBlk, tid);
MarkBufferDirty(revmapbuf);
/* XLOG stuff */
if (RelationNeedsWAL(idxrel))
{
xl_brin_insert xlrec;
XLogRecPtr recptr;
XLogRecData rdata[3];
uint8 info;
info = XLOG_BRIN_INSERT | (extended ? XLOG_BRIN_INIT_PAGE : 0);
xlrec.node = idxrel->rd_node;
xlrec.heapBlk = heapBlk;
xlrec.pagesPerRange = pagesPerRange;
xlrec.revmapBlk = BufferGetBlockNumber(revmapbuf);
xlrec.tuplen = itemsz;
ItemPointerSet(&xlrec.tid, blk, off);
rdata[0].data = (char *) &xlrec;
rdata[0].len = SizeOfBrinInsert;
rdata[0].buffer = InvalidBuffer;
rdata[0].buffer_std = false;
rdata[0].next = &(rdata[1]);
rdata[1].data = (char *) tup;
rdata[1].len = itemsz;
rdata[1].buffer = extended ? InvalidBuffer : *buffer;
rdata[1].buffer_std = true;
rdata[1].next = &(rdata[2]);
rdata[2].data = (char *) NULL;
rdata[2].len = 0;
rdata[2].buffer = revmapbuf;
rdata[2].buffer_std = false;
rdata[2].next = NULL;
recptr = XLogInsert(RM_BRIN_ID, info, rdata);
PageSetLSN(page, recptr);
PageSetLSN(BufferGetPage(revmapbuf), recptr);
}
END_CRIT_SECTION();
/* Tuple is firmly on buffer; we can release our locks */
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
LockBuffer(revmapbuf, BUFFER_LOCK_UNLOCK);
if (extended)
FreeSpaceMapVacuum(idxrel);
return off;
}
/*
* Initialize a page with the given type.
*
* Caller is responsible for marking it dirty, as appropriate.
*/
void
brin_page_init(Page page, uint16 type)
{
BrinSpecialSpace *special;
PageInit(page, BLCKSZ, sizeof(BrinSpecialSpace));
special = (BrinSpecialSpace *) PageGetSpecialPointer(page);
special->type = type;
}
/*
* Initialize a new BRIN index's metapage.
*/
void
brin_metapage_init(Page page, BlockNumber pagesPerRange, uint16 version)
{
BrinMetaPageData *metadata;
brin_page_init(page, BRIN_PAGETYPE_META);
metadata = (BrinMetaPageData *) PageGetContents(page);
metadata->brinMagic = BRIN_META_MAGIC;
metadata->brinVersion = version;
metadata->pagesPerRange = pagesPerRange;
/*
* Note we cheat here a little. 0 is not a valid revmap block number
* (because it's the metapage buffer), but doing this enables the first
* revmap page to be created when the index is.
*/
metadata->lastRevmapPage = 0;
}
/*
* Initiate page evacuation protocol.
*
* The page must be locked in exclusive mode by the caller.
*
* If the page is not yet initialized or empty, return false without doing
* anything; it can be used for revmap without any further changes. If it
* contains tuples, mark it for evacuation and return true.
*/
bool
brin_start_evacuating_page(Relation idxRel, Buffer buf)
{
OffsetNumber off;
OffsetNumber maxoff;
BrinSpecialSpace *special;
Page page;
page = BufferGetPage(buf);
if (PageIsNew(page))
return false;
special = (BrinSpecialSpace *) PageGetSpecialPointer(page);
maxoff = PageGetMaxOffsetNumber(page);
for (off = FirstOffsetNumber; off <= maxoff; off++)
{
ItemId lp;
lp = PageGetItemId(page, off);
if (ItemIdIsUsed(lp))
{
/* prevent other backends from adding more stuff to this page */
special->flags |= BRIN_EVACUATE_PAGE;
MarkBufferDirtyHint(buf, true);
return true;
}
}
return false;
}
/*
* Move all tuples out of a page.
*
* The caller must hold lock on the page. The lock and pin are released.
*/
void
brin_evacuate_page(Relation idxRel, BlockNumber pagesPerRange,
BrinRevmap *revmap, Buffer buf)
{
OffsetNumber off;
OffsetNumber maxoff;
Page page;
page = BufferGetPage(buf);
Assert(((BrinSpecialSpace *)
PageGetSpecialPointer(page))->flags & BRIN_EVACUATE_PAGE);
maxoff = PageGetMaxOffsetNumber(page);
for (off = FirstOffsetNumber; off <= maxoff; off++)
{
BrinTuple *tup;
Size sz;
ItemId lp;
CHECK_FOR_INTERRUPTS();
lp = PageGetItemId(page, off);
if (ItemIdIsUsed(lp))
{
sz = ItemIdGetLength(lp);
tup = (BrinTuple *) PageGetItem(page, lp);
tup = brin_copy_tuple(tup, sz);
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
if (!brin_doupdate(idxRel, pagesPerRange, revmap, tup->bt_blkno,
buf, off, tup, sz, tup, sz, false))
off--; /* retry */
LockBuffer(buf, BUFFER_LOCK_SHARE);
/* It's possible that someone extended the revmap over this page */
if (!BRIN_IS_REGULAR_PAGE(page))
break;
}
}
UnlockReleaseBuffer(buf);
}
/*
* Return a pinned and exclusively locked buffer which can be used to insert an
* index item of size itemsz. If oldbuf is a valid buffer, it is also locked
* (in an order determined to avoid deadlocks).
*
* If there's no existing page with enough free space to accommodate the new
* item, the relation is extended. If this happens, *extended is set to true.
*
* If we find that the old page is no longer a regular index page (because
* of a revmap extension), the old buffer is unlocked and we return
* InvalidBuffer.
*/
static Buffer
brin_getinsertbuffer(Relation irel, Buffer oldbuf, Size itemsz,
bool *was_extended)
{
BlockNumber oldblk;
BlockNumber newblk;
Page page;
int freespace;
if (BufferIsValid(oldbuf))
oldblk = BufferGetBlockNumber(oldbuf);
else
oldblk = InvalidBlockNumber;
/*
* Loop until we find a page with sufficient free space. By the time we
* return to caller out of this loop, both buffers are valid and locked;
* if we have to restart here, neither buffer is locked and buf is not a
* pinned buffer.
*/
newblk = RelationGetTargetBlock(irel);
if (newblk == InvalidBlockNumber)
newblk = GetPageWithFreeSpace(irel, itemsz);
for (;;)
{
Buffer buf;
bool extensionLockHeld = false;
bool extended = false;
CHECK_FOR_INTERRUPTS();
if (newblk == InvalidBlockNumber)
{
/*
* There's not enough free space in any existing index page,
* according to the FSM: extend the relation to obtain a shiny new
* page.
*/
if (!RELATION_IS_LOCAL(irel))
{
LockRelationForExtension(irel, ExclusiveLock);
extensionLockHeld = true;
}
buf = ReadBuffer(irel, P_NEW);
newblk = BufferGetBlockNumber(buf);
*was_extended = extended = true;
BRIN_elog(DEBUG2, "brin_getinsertbuffer: extending to page %u",
BufferGetBlockNumber(buf));
}
else if (newblk == oldblk)
{
/*
* There's an odd corner-case here where the FSM is out-of-date,
* and gave us the old page.
*/
buf = oldbuf;
}
else
{
buf = ReadBuffer(irel, newblk);
}
/*
* We lock the old buffer first, if it's earlier than the new one; but
* before we do, we need to check that it hasn't been turned into a
* revmap page concurrently; if we detect that it happened, give up
* and tell caller to start over.
*/
if (BufferIsValid(oldbuf) && oldblk < newblk)
{
LockBuffer(oldbuf, BUFFER_LOCK_EXCLUSIVE);
if (!BRIN_IS_REGULAR_PAGE(BufferGetPage(oldbuf)))
{
LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buf);
return InvalidBuffer;
}
}
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
if (extensionLockHeld)
UnlockRelationForExtension(irel, ExclusiveLock);
page = BufferGetPage(buf);
if (extended)
brin_page_init(page, BRIN_PAGETYPE_REGULAR);
/*
* We have a new buffer to insert into. Check that the new page has
* enough free space, and return it if it does; otherwise start over.
* Note that we allow for the FSM to be out of date here, and in that
* case we update it and move on.
*
* (br_page_get_freespace also checks that the FSM didn't hand us a
* page that has since been repurposed for the revmap.)
*/
freespace = br_page_get_freespace(page);
if (freespace >= itemsz)
{
RelationSetTargetBlock(irel, BufferGetBlockNumber(buf));
/*
* Since the target block specification can get lost on cache
* invalidations, make sure we update the more permanent FSM with
* data about it before going away.
*/
if (extended)
RecordPageWithFreeSpace(irel, BufferGetBlockNumber(buf),
freespace);
/*
* Lock the old buffer if not locked already. Note that in this
* case we know for sure it's a regular page: it's later than the
* new page we just got, which is not a revmap page, and revmap
* pages are always consecutive.
*/
if (BufferIsValid(oldbuf) && oldblk > newblk)
{
LockBuffer(oldbuf, BUFFER_LOCK_EXCLUSIVE);
Assert(BRIN_IS_REGULAR_PAGE(BufferGetPage(oldbuf)));
}
return buf;
}
/* This page is no good. */
/*
* If an entirely new page does not contain enough free space for the
* new item, then surely that item is oversized. Complain loudly; but
* first make sure we record the page as free, for next time.
*/
if (extended)
{
RecordPageWithFreeSpace(irel, BufferGetBlockNumber(buf),
freespace);
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
(unsigned long) itemsz,
(unsigned long) freespace,
RelationGetRelationName(irel))));
return InvalidBuffer; /* keep compiler quiet */
}
if (newblk != oldblk)
UnlockReleaseBuffer(buf);
if (BufferIsValid(oldbuf) && oldblk <= newblk)
LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
newblk = RecordAndGetPageWithFreeSpace(irel, newblk, freespace, itemsz);
}
}
/*
* Return the amount of free space on a regular BRIN index page.
*
* If the page is not a regular page, or has been marked with the
* BRIN_EVACUATE_PAGE flag, returns 0.
*/
static Size
br_page_get_freespace(Page page)
{
BrinSpecialSpace *special;
special = (BrinSpecialSpace *) PageGetSpecialPointer(page);
if (!BRIN_IS_REGULAR_PAGE(page) ||
(special->flags & BRIN_EVACUATE_PAGE) != 0)
return 0;
else
return PageGetFreeSpace(page);
}

src/backend/access/brin/brin_revmap.c

@@ -0,0 +1,510 @@
/*
* brin_revmap.c
* Range map for BRIN indexes
*
* The range map (revmap) is a translation structure for BRIN indexes: for each
* page range there is one summary tuple, and its location is tracked by the
* revmap. Whenever a new tuple is inserted into a table that violates the
* previously recorded summary values, a new tuple is inserted into the index
* and the revmap is updated to point to it.
*
* The revmap is stored in the first pages of the index, immediately following
* the metapage. When the revmap needs to be expanded, all tuples on the
* regular BRIN page at that block (if any) are moved out of the way.
*
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/access/brin/brin_revmap.c
*/
#include "postgres.h"
#include "access/brin_page.h"
#include "access/brin_pageops.h"
#include "access/brin_revmap.h"
#include "access/brin_tuple.h"
#include "access/brin_xlog.h"
#include "access/rmgr.h"
#include "access/xloginsert.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "utils/rel.h"
/*
* In revmap pages, each item stores an ItemPointerData. These defines let one
* find the logical revmap page number and index number of the revmap item for
* the given heap block number.
*/
#define HEAPBLK_TO_REVMAP_BLK(pagesPerRange, heapBlk) \
((heapBlk / pagesPerRange) / REVMAP_PAGE_MAXITEMS)
#define HEAPBLK_TO_REVMAP_INDEX(pagesPerRange, heapBlk) \
((heapBlk / pagesPerRange) % REVMAP_PAGE_MAXITEMS)
struct BrinRevmap
{
Relation rm_irel;
BlockNumber rm_pagesPerRange;
BlockNumber rm_lastRevmapPage; /* cached from the metapage */
Buffer rm_metaBuf;
Buffer rm_currBuf;
};
/* typedef appears in brin_revmap.h */
static BlockNumber revmap_get_blkno(BrinRevmap *revmap,
BlockNumber heapBlk);
static Buffer revmap_get_buffer(BrinRevmap *revmap, BlockNumber heapBlk);
static BlockNumber revmap_extend_and_get_blkno(BrinRevmap *revmap,
BlockNumber heapBlk);
static void revmap_physical_extend(BrinRevmap *revmap);
/*
* Initialize an access object for a range map. This must be freed by
* brinRevmapTerminate when caller is done with it.
*/
BrinRevmap *
brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange)
{
BrinRevmap *revmap;
Buffer meta;
BrinMetaPageData *metadata;
meta = ReadBuffer(idxrel, BRIN_METAPAGE_BLKNO);
LockBuffer(meta, BUFFER_LOCK_SHARE);
metadata = (BrinMetaPageData *) PageGetContents(BufferGetPage(meta));
revmap = palloc(sizeof(BrinRevmap));
revmap->rm_irel = idxrel;
revmap->rm_pagesPerRange = metadata->pagesPerRange;
revmap->rm_lastRevmapPage = metadata->lastRevmapPage;
revmap->rm_metaBuf = meta;
revmap->rm_currBuf = InvalidBuffer;
*pagesPerRange = metadata->pagesPerRange;
LockBuffer(meta, BUFFER_LOCK_UNLOCK);
return revmap;
}
/*
* Release resources associated with a revmap access object.
*/
void
brinRevmapTerminate(BrinRevmap *revmap)
{
ReleaseBuffer(revmap->rm_metaBuf);
if (revmap->rm_currBuf != InvalidBuffer)
ReleaseBuffer(revmap->rm_currBuf);
pfree(revmap);
}
/*
* Extend the revmap to cover the given heap block number.
*/
void
brinRevmapExtend(BrinRevmap *revmap, BlockNumber heapBlk)
{
BlockNumber mapBlk;
mapBlk = revmap_extend_and_get_blkno(revmap, heapBlk);
/* Ensure the buffer we got is in the expected range */
Assert(mapBlk != InvalidBlockNumber &&
mapBlk != BRIN_METAPAGE_BLKNO &&
mapBlk <= revmap->rm_lastRevmapPage);
}
/*
* Prepare to insert an entry into the revmap; the revmap buffer in which the
* entry is to reside is locked and returned. Most callers should call
* brinRevmapExtend beforehand, as this routine does not extend the revmap if
* it's not long enough.
*
* The returned buffer is also recorded in the revmap struct; finishing that
* releases the buffer, therefore the caller needn't do it explicitly.
*/
Buffer
brinLockRevmapPageForUpdate(BrinRevmap *revmap, BlockNumber heapBlk)
{
Buffer rmBuf;
rmBuf = revmap_get_buffer(revmap, heapBlk);
LockBuffer(rmBuf, BUFFER_LOCK_EXCLUSIVE);
return rmBuf;
}
/*
* In the given revmap buffer (locked appropriately by caller), which is used
* in a BRIN index of pagesPerRange pages per range, set the element
* corresponding to heap block number heapBlk to the given TID.
*
* Once the operation is complete, the caller must update the LSN on the
* returned buffer.
*
* This is used both in regular operation and during WAL replay.
*/
void
brinSetHeapBlockItemptr(Buffer buf, BlockNumber pagesPerRange,
BlockNumber heapBlk, ItemPointerData tid)
{
RevmapContents *contents;
ItemPointerData *iptr;
Page page;
/* The correct page should already be pinned and locked */
page = BufferGetPage(buf);
contents = (RevmapContents *) PageGetContents(page);
iptr = (ItemPointerData *) contents->rm_tids;
iptr += HEAPBLK_TO_REVMAP_INDEX(pagesPerRange, heapBlk);
ItemPointerSet(iptr,
ItemPointerGetBlockNumber(&tid),
ItemPointerGetOffsetNumber(&tid));
}
/*
* Fetch the BrinTuple for a given heap block.
*
* The buffer containing the tuple is locked, and returned in *buf. As an
* optimization, the caller can pass a pinned buffer *buf on entry, which will
* avoid a pin-unpin cycle when the next tuple is on the same page as a
* previous one.
*
* If no tuple is found for the given heap range, returns NULL. In that case,
* *buf might still be updated, but it's not locked.
*
* The output tuple offset within the buffer is returned in *off, and its size
* is returned in *size.
*/
BrinTuple *
brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber heapBlk,
Buffer *buf, OffsetNumber *off, Size *size, int mode)
{
Relation idxRel = revmap->rm_irel;
BlockNumber mapBlk;
RevmapContents *contents;
ItemPointerData *iptr;
BlockNumber blk;
Page page;
ItemId lp;
BrinTuple *tup;
ItemPointerData previptr;
/* normalize the heap block number to be the first page in the range */
heapBlk = (heapBlk / revmap->rm_pagesPerRange) * revmap->rm_pagesPerRange;
/* Compute the revmap page number we need */
mapBlk = revmap_get_blkno(revmap, heapBlk);
if (mapBlk == InvalidBlockNumber)
{
*off = InvalidOffsetNumber;
return NULL;
}
ItemPointerSetInvalid(&previptr);
for (;;)
{
CHECK_FOR_INTERRUPTS();
if (revmap->rm_currBuf == InvalidBuffer ||
BufferGetBlockNumber(revmap->rm_currBuf) != mapBlk)
{
if (revmap->rm_currBuf != InvalidBuffer)
ReleaseBuffer(revmap->rm_currBuf);
Assert(mapBlk != InvalidBlockNumber);
revmap->rm_currBuf = ReadBuffer(revmap->rm_irel, mapBlk);
}
LockBuffer(revmap->rm_currBuf, BUFFER_LOCK_SHARE);
contents = (RevmapContents *)
PageGetContents(BufferGetPage(revmap->rm_currBuf));
iptr = contents->rm_tids;
iptr += HEAPBLK_TO_REVMAP_INDEX(revmap->rm_pagesPerRange, heapBlk);
if (!ItemPointerIsValid(iptr))
{
LockBuffer(revmap->rm_currBuf, BUFFER_LOCK_UNLOCK);
return NULL;
}
/*
* Check the TID we got in a previous iteration, if any, and save the
* current TID we got from the revmap; if we loop, we can sanity-check
* that the next one we get is different. Otherwise we might be stuck
* looping forever if the revmap is somehow badly broken.
*/
if (ItemPointerIsValid(&previptr) && ItemPointerEquals(&previptr, iptr))
ereport(ERROR,
(errcode(ERRCODE_INDEX_CORRUPTED),
errmsg_internal("corrupted BRIN index: inconsistent range map")));
previptr = *iptr;
blk = ItemPointerGetBlockNumber(iptr);
*off = ItemPointerGetOffsetNumber(iptr);
LockBuffer(revmap->rm_currBuf, BUFFER_LOCK_UNLOCK);
/* Ok, got a pointer to where the BrinTuple should be. Fetch it. */
if (!BufferIsValid(*buf) || BufferGetBlockNumber(*buf) != blk)
{
if (BufferIsValid(*buf))
ReleaseBuffer(*buf);
*buf = ReadBuffer(idxRel, blk);
}
LockBuffer(*buf, mode);
page = BufferGetPage(*buf);
/* If we land on a revmap page, start over */
if (BRIN_IS_REGULAR_PAGE(page))
{
lp = PageGetItemId(page, *off);
if (ItemIdIsUsed(lp))
{
tup = (BrinTuple *) PageGetItem(page, lp);
if (tup->bt_blkno == heapBlk)
{
if (size)
*size = ItemIdGetLength(lp);
/* found it! */
return tup;
}
}
}
/*
* No luck. Assume that the revmap was updated concurrently.
*/
LockBuffer(*buf, BUFFER_LOCK_UNLOCK);
}
/* not reached, but keep compiler quiet */
return NULL;
}
/*
* Given a heap block number, find the corresponding physical revmap block
* number and return it. If the revmap page hasn't been allocated yet, return
* InvalidBlockNumber.
*/
static BlockNumber
revmap_get_blkno(BrinRevmap *revmap, BlockNumber heapBlk)
{
BlockNumber targetblk;
/* obtain revmap block number, skip 1 for metapage block */
targetblk = HEAPBLK_TO_REVMAP_BLK(revmap->rm_pagesPerRange, heapBlk) + 1;
/* Normal case: the revmap page is already allocated */
if (targetblk <= revmap->rm_lastRevmapPage)
return targetblk;
return InvalidBlockNumber;
}
/*
* Obtain and return a buffer containing the revmap page for the given heap
* page. The revmap must have been previously extended to cover that page.
* The returned buffer is also recorded in the revmap struct; finishing that
* releases the buffer, therefore the caller needn't do it explicitly.
*/
static Buffer
revmap_get_buffer(BrinRevmap *revmap, BlockNumber heapBlk)
{
BlockNumber mapBlk;
/* Translate the heap block number to physical index location. */
mapBlk = revmap_get_blkno(revmap, heapBlk);
if (mapBlk == InvalidBlockNumber)
elog(ERROR, "revmap does not cover heap block %u", heapBlk);
/* Ensure the buffer we got is in the expected range */
Assert(mapBlk != BRIN_METAPAGE_BLKNO &&
mapBlk <= revmap->rm_lastRevmapPage);
BRIN_elog(DEBUG2, "getting revmap page for logical page %lu (physical %u) for heap %u",
HEAPBLK_TO_REVMAP_BLK(revmap->rm_pagesPerRange, heapBlk),
mapBlk, heapBlk);
/*
* Obtain the buffer from which we need to read. If we already have the
* correct buffer in our access struct, use that; otherwise, release that,
* (if valid) and read the one we need.
*/
if (revmap->rm_currBuf == InvalidBuffer ||
mapBlk != BufferGetBlockNumber(revmap->rm_currBuf))
{
if (revmap->rm_currBuf != InvalidBuffer)
ReleaseBuffer(revmap->rm_currBuf);
revmap->rm_currBuf = ReadBuffer(revmap->rm_irel, mapBlk);
}
return revmap->rm_currBuf;
}
/*
* Given a heap block number, find the corresponding physical revmap block
* number and return it. If the revmap page hasn't been allocated yet, extend
* the revmap until it is.
*/
static BlockNumber
revmap_extend_and_get_blkno(BrinRevmap *revmap, BlockNumber heapBlk)
{
BlockNumber targetblk;
/* obtain revmap block number, skip 1 for metapage block */
targetblk = HEAPBLK_TO_REVMAP_BLK(revmap->rm_pagesPerRange, heapBlk) + 1;
/* Extend the revmap, if necessary */
while (targetblk > revmap->rm_lastRevmapPage)
{
CHECK_FOR_INTERRUPTS();
revmap_physical_extend(revmap);
}
return targetblk;
}
/*
* Try to extend the revmap by one page. This might not happen for a number of
* reasons; caller is expected to retry until the expected outcome is obtained.
*/
static void
revmap_physical_extend(BrinRevmap *revmap)
{
Buffer buf;
Page page;
Page metapage;
BrinMetaPageData *metadata;
BlockNumber mapBlk;
BlockNumber nblocks;
Relation irel = revmap->rm_irel;
bool needLock = !RELATION_IS_LOCAL(irel);
/*
* Lock the metapage. This locks out concurrent extensions of the revmap,
* but note that we still need to grab the relation extension lock because
* another backend can extend the index with regular BRIN pages.
*/
LockBuffer(revmap->rm_metaBuf, BUFFER_LOCK_EXCLUSIVE);
metapage = BufferGetPage(revmap->rm_metaBuf);
metadata = (BrinMetaPageData *) PageGetContents(metapage);
/*
* Check that our cached lastRevmapPage value was up-to-date; if it
* wasn't, update the cached copy and have caller start over.
*/
if (metadata->lastRevmapPage != revmap->rm_lastRevmapPage)
{
revmap->rm_lastRevmapPage = metadata->lastRevmapPage;
LockBuffer(revmap->rm_metaBuf, BUFFER_LOCK_UNLOCK);
return;
}
mapBlk = metadata->lastRevmapPage + 1;
nblocks = RelationGetNumberOfBlocks(irel);
if (mapBlk < nblocks)
{
buf = ReadBuffer(irel, mapBlk);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
page = BufferGetPage(buf);
}
else
{
if (needLock)
LockRelationForExtension(irel, ExclusiveLock);
buf = ReadBuffer(irel, P_NEW);
if (BufferGetBlockNumber(buf) != mapBlk)
{
/*
* Very rare corner case: somebody extended the relation
* concurrently after we read its length. If this happens, give
* up and have caller start over. We will have to evacuate that
* page from under whoever is using it.
*/
if (needLock)
UnlockRelationForExtension(irel, ExclusiveLock);
LockBuffer(revmap->rm_metaBuf, BUFFER_LOCK_UNLOCK);
return;
}
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
page = BufferGetPage(buf);
if (needLock)
UnlockRelationForExtension(irel, ExclusiveLock);
}
/* Check that it's a regular block (or an empty page) */
if (!PageIsNew(page) && !BRIN_IS_REGULAR_PAGE(page))
ereport(ERROR,
(errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("unexpected page type 0x%04X in BRIN index \"%s\" block %u",
BRIN_PAGE_TYPE(page),
RelationGetRelationName(irel),
BufferGetBlockNumber(buf))));
/* If the page is in use, evacuate it and restart */
if (brin_start_evacuating_page(irel, buf))
{
LockBuffer(revmap->rm_metaBuf, BUFFER_LOCK_UNLOCK);
brin_evacuate_page(irel, revmap->rm_pagesPerRange, revmap, buf);
/* have caller start over */
return;
}
/*
* Ok, we have now locked the metapage and the target block. Re-initialize
* it as a revmap page.
*/
START_CRIT_SECTION();
/* the rm_tids array is initialized to all invalid by PageInit */
brin_page_init(page, BRIN_PAGETYPE_REVMAP);
MarkBufferDirty(buf);
metadata->lastRevmapPage = mapBlk;
MarkBufferDirty(revmap->rm_metaBuf);
if (RelationNeedsWAL(revmap->rm_irel))
{
xl_brin_revmap_extend xlrec;
XLogRecPtr recptr;
XLogRecData rdata[2];
xlrec.node = revmap->rm_irel->rd_node;
xlrec.targetBlk = mapBlk;
rdata[0].data = (char *) &xlrec;
rdata[0].len = SizeOfBrinRevmapExtend;
rdata[0].buffer = InvalidBuffer;
rdata[0].buffer_std = false;
rdata[0].next = &(rdata[1]);
rdata[1].data = (char *) NULL;
rdata[1].len = 0;
rdata[1].buffer = revmap->rm_metaBuf;
rdata[1].buffer_std = false;
rdata[1].next = NULL;
recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_REVMAP_EXTEND, rdata);
PageSetLSN(metapage, recptr);
PageSetLSN(page, recptr);
}
END_CRIT_SECTION();
LockBuffer(revmap->rm_metaBuf, BUFFER_LOCK_UNLOCK);
UnlockReleaseBuffer(buf);
}

src/backend/access/brin/brin_tuple.c

@@ -0,0 +1,554 @@
/*
* brin_tuple.c
* Method implementations for tuples in BRIN indexes.
*
* Intended usage is that code outside this file only deals with
* BrinMemTuples, and convert to and from the on-disk representation through
* functions in this file.
*
* NOTES
*
* A BRIN tuple is similar to a heap tuple, with a few key differences. The
* first interesting difference is that the tuple header is much simpler, only
* containing its total length and a small area for flags. Also, the stored
* data does not match the relation tuple descriptor exactly: for each
* attribute in the descriptor, the index tuple carries an arbitrary number
* of values, depending on the opclass.
*
* Also, for each column of the index relation there are two null bits: one
* (hasnulls) stores whether any tuple within the page range has that column
* set to null; the other one (allnulls) stores whether the column values are
* all null. If allnulls is true, then the tuple data area does not contain
* values for that column at all, whereas it does if hasnulls is set.
* Note the size of the null bitmask may not be the same as that of the
* datum array.
*
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/access/brin/brin_tuple.c
*/
#include "postgres.h"
#include "access/htup_details.h"
#include "access/brin_tuple.h"
#include "access/tupdesc.h"
#include "access/tupmacs.h"
#include "utils/datum.h"
#include "utils/memutils.h"
static inline void brin_deconstruct_tuple(BrinDesc *brdesc,
char *tp, bits8 *nullbits, bool nulls,
Datum *values, bool *allnulls, bool *hasnulls);
/*
* Return a tuple descriptor used for on-disk storage of BRIN tuples.
*/
static TupleDesc
brtuple_disk_tupdesc(BrinDesc *brdesc)
{
/* We cache these in the BrinDesc */
if (brdesc->bd_disktdesc == NULL)
{
int i;
int j;
AttrNumber attno = 1;
TupleDesc tupdesc;
MemoryContext oldcxt;
/* make sure it's in the bdesc's context */
oldcxt = MemoryContextSwitchTo(brdesc->bd_context);
tupdesc = CreateTemplateTupleDesc(brdesc->bd_totalstored, false);
for (i = 0; i < brdesc->bd_tupdesc->natts; i++)
{
for (j = 0; j < brdesc->bd_info[i]->oi_nstored; j++)
TupleDescInitEntry(tupdesc, attno++, NULL,
brdesc->bd_info[i]->oi_typids[j],
-1, 0);
}
MemoryContextSwitchTo(oldcxt);
brdesc->bd_disktdesc = tupdesc;
}
return brdesc->bd_disktdesc;
}
/*
* Generate a new on-disk tuple to be inserted in a BRIN index.
*
* See brin_form_placeholder_tuple if you touch this.
*/
BrinTuple *
brin_form_tuple(BrinDesc *brdesc, BlockNumber blkno, BrinMemTuple *tuple,
Size *size)
{
Datum *values;
bool *nulls;
bool anynulls = false;
BrinTuple *rettuple;
int keyno;
int idxattno;
uint16 phony_infomask;
bits8 *phony_nullbitmap;
Size len,
hoff,
data_len;
Assert(brdesc->bd_totalstored > 0);
values = palloc(sizeof(Datum) * brdesc->bd_totalstored);
nulls = palloc0(sizeof(bool) * brdesc->bd_totalstored);
phony_nullbitmap = palloc(sizeof(bits8) * BITMAPLEN(brdesc->bd_totalstored));
/*
* Set up the values/nulls arrays for heap_fill_tuple
*/
idxattno = 0;
for (keyno = 0; keyno < brdesc->bd_tupdesc->natts; keyno++)
{
int datumno;
/*
* "allnulls" is set when there's no nonnull value in any row in the
* column; when this happens, there is no data to store.  Thus we set the
* null flags for all data elements of this column and we're done.
*/
if (tuple->bt_columns[keyno].bv_allnulls)
{
for (datumno = 0;
datumno < brdesc->bd_info[keyno]->oi_nstored;
datumno++)
nulls[idxattno++] = true;
anynulls = true;
continue;
}
/*
* The "hasnulls" bit is set when there are some null values in the
* data. We still need to store a real value, but the presence of
* this means we need a null bitmap.
*/
if (tuple->bt_columns[keyno].bv_hasnulls)
anynulls = true;
for (datumno = 0;
datumno < brdesc->bd_info[keyno]->oi_nstored;
datumno++)
values[idxattno++] = tuple->bt_columns[keyno].bv_values[datumno];
}
/* compute total space needed */
len = SizeOfBrinTuple;
if (anynulls)
{
/*
* We need a double-length bitmap on an on-disk BRIN index tuple; the
* first half stores the "allnulls" bits, the second stores
* "hasnulls".
*/
len += BITMAPLEN(brdesc->bd_tupdesc->natts * 2);
}
len = hoff = MAXALIGN(len);
data_len = heap_compute_data_size(brtuple_disk_tupdesc(brdesc),
values, nulls);
len += data_len;
rettuple = palloc0(len);
rettuple->bt_blkno = blkno;
rettuple->bt_info = hoff;
Assert((rettuple->bt_info & BRIN_OFFSET_MASK) == hoff);
/*
* The infomask and null bitmap as computed by heap_fill_tuple are useless
* to us. However, that function will not accept a null infomask; and we
* need to pass a valid null bitmap so that it will correctly skip
* outputting null attributes in the data area.
*/
heap_fill_tuple(brtuple_disk_tupdesc(brdesc),
values,
nulls,
(char *) rettuple + hoff,
data_len,
&phony_infomask,
phony_nullbitmap);
/* done with these */
pfree(values);
pfree(nulls);
pfree(phony_nullbitmap);
/*
* Now fill in the real null bitmasks. allnulls first.
*/
if (anynulls)
{
bits8 *bitP;
int bitmask;
rettuple->bt_info |= BRIN_NULLS_MASK;
/*
* Note that we reverse the sense of null bits in this module: we
* store a 1 for a null attribute rather than a 0. So we must reverse
* the sense of the att_isnull test in brin_deconstruct_tuple as well.
*/
bitP = ((bits8 *) ((char *) rettuple + SizeOfBrinTuple)) - 1;
bitmask = HIGHBIT;
for (keyno = 0; keyno < brdesc->bd_tupdesc->natts; keyno++)
{
if (bitmask != HIGHBIT)
bitmask <<= 1;
else
{
bitP += 1;
*bitP = 0x0;
bitmask = 1;
}
if (!tuple->bt_columns[keyno].bv_allnulls)
continue;
*bitP |= bitmask;
}
/* hasnulls bits follow */
for (keyno = 0; keyno < brdesc->bd_tupdesc->natts; keyno++)
{
if (bitmask != HIGHBIT)
bitmask <<= 1;
else
{
bitP += 1;
*bitP = 0x0;
bitmask = 1;
}
if (!tuple->bt_columns[keyno].bv_hasnulls)
continue;
*bitP |= bitmask;
}
bitP = ((bits8 *) ((char *) rettuple + SizeOfBrinTuple)) - 1;
}
if (tuple->bt_placeholder)
rettuple->bt_info |= BRIN_PLACEHOLDER_MASK;
*size = len;
return rettuple;
}
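
To make the bitmap layout written above concrete, here is a minimal read-back sketch; it is illustrative only and assumes variables named as in brin_form_tuple (rettuple, brdesc) plus a 0-based column number keyno.

/*
 * Illustrative only, not part of the patch: the allnulls bits occupy bit
 * positions 0 .. natts-1 and the hasnulls bits natts .. 2*natts-1, with a
 * set bit meaning "null" (the reverse of the usual att_isnull convention).
 */
bits8      *nullbits = (bits8 *) ((char *) rettuple + SizeOfBrinTuple);
int         natts = brdesc->bd_tupdesc->natts;
bool        col_allnulls = !att_isnull(keyno, nullbits);
bool        col_hasnulls = !att_isnull(natts + keyno, nullbits);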
/*
* Generate a new on-disk tuple with no data values, marked as placeholder.
*
* This is a cut-down version of brin_form_tuple.
*/
BrinTuple *
brin_form_placeholder_tuple(BrinDesc *brdesc, BlockNumber blkno, Size *size)
{
Size len;
Size hoff;
BrinTuple *rettuple;
int keyno;
bits8 *bitP;
int bitmask;
/* compute total space needed: always add nulls */
len = SizeOfBrinTuple;
len += BITMAPLEN(brdesc->bd_tupdesc->natts * 2);
len = hoff = MAXALIGN(len);
rettuple = palloc0(len);
rettuple->bt_blkno = blkno;
rettuple->bt_info = hoff;
rettuple->bt_info |= BRIN_NULLS_MASK | BRIN_PLACEHOLDER_MASK;
bitP = ((bits8 *) ((char *) rettuple + SizeOfBrinTuple)) - 1;
bitmask = HIGHBIT;
/* set allnulls true for all attributes */
for (keyno = 0; keyno < brdesc->bd_tupdesc->natts; keyno++)
{
if (bitmask != HIGHBIT)
bitmask <<= 1;
else
{
bitP += 1;
*bitP = 0x0;
bitmask = 1;
}
*bitP |= bitmask;
}
/* no need to set hasnulls */
*size = len;
return rettuple;
}
/*
* Free a tuple created by brin_form_tuple
*/
void
brin_free_tuple(BrinTuple *tuple)
{
pfree(tuple);
}
/*
* Create a palloc'd copy of a BrinTuple.
*/
BrinTuple *
brin_copy_tuple(BrinTuple *tuple, Size len)
{
BrinTuple *newtup;
newtup = palloc(len);
memcpy(newtup, tuple, len);
return newtup;
}
/*
* Return whether two BrinTuples are bitwise identical.
*/
bool
brin_tuples_equal(const BrinTuple *a, Size alen, const BrinTuple *b, Size blen)
{
if (alen != blen)
return false;
if (memcmp(a, b, alen) != 0)
return false;
return true;
}
/*
* Create a new BrinMemTuple from scratch, and initialize it to an empty
* state.
*
* Note: we don't provide any means to free a deformed tuple, so make sure to
* use a temporary memory context.
*/
BrinMemTuple *
brin_new_memtuple(BrinDesc *brdesc)
{
BrinMemTuple *dtup;
char *currdatum;
long basesize;
int i;
basesize = MAXALIGN(sizeof(BrinMemTuple) +
sizeof(BrinValues) * brdesc->bd_tupdesc->natts);
dtup = palloc0(basesize + sizeof(Datum) * brdesc->bd_totalstored);
currdatum = (char *) dtup + basesize;
for (i = 0; i < brdesc->bd_tupdesc->natts; i++)
{
dtup->bt_columns[i].bv_attno = i + 1;
dtup->bt_columns[i].bv_allnulls = true;
dtup->bt_columns[i].bv_hasnulls = false;
dtup->bt_columns[i].bv_values = (Datum *) currdatum;
currdatum += sizeof(Datum) * brdesc->bd_info[i]->oi_nstored;
}
dtup->bt_context = AllocSetContextCreate(CurrentMemoryContext,
"brin dtuple",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
return dtup;
}
/*
* Reset a BrinMemTuple to initial state
*/
void
brin_memtuple_initialize(BrinMemTuple *dtuple, BrinDesc *brdesc)
{
int i;
MemoryContextReset(dtuple->bt_context);
for (i = 0; i < brdesc->bd_tupdesc->natts; i++)
{
dtuple->bt_columns[i].bv_allnulls = true;
dtuple->bt_columns[i].bv_hasnulls = false;
}
}
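
A plausible calling pattern, shown as a hedged sketch only (nblocks, pagesPerRange and the summarization step are assumptions, not taken from this file): allocate one BrinMemTuple per index build and reset it for each page range instead of allocating a new one each time.

    BrinMemTuple *dtup = brin_new_memtuple(brdesc);
    BlockNumber   heapBlk;

    for (heapBlk = 0; heapBlk < nblocks; heapBlk += pagesPerRange)
    {
        /* reuse the same deformed tuple; its bt_context is reset here */
        brin_memtuple_initialize(dtup, brdesc);
        /* ... accumulate summary values for heapBlk .. heapBlk + pagesPerRange - 1 ... */
    }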
/*
* Convert a BrinTuple back to a BrinMemTuple. This is the reverse of
* brin_form_tuple.
*
* Note we don't need the "on disk tupdesc" here; we rely on our own routine to
* deconstruct the tuple from the on-disk format.
*/
BrinMemTuple *
brin_deform_tuple(BrinDesc *brdesc, BrinTuple *tuple)
{
BrinMemTuple *dtup;
Datum *values;
bool *allnulls;
bool *hasnulls;
char *tp;
bits8 *nullbits;
int keyno;
int valueno;
MemoryContext oldcxt;
dtup = brin_new_memtuple(brdesc);
if (BrinTupleIsPlaceholder(tuple))
dtup->bt_placeholder = true;
dtup->bt_blkno = tuple->bt_blkno;
values = palloc(sizeof(Datum) * brdesc->bd_totalstored);
allnulls = palloc(sizeof(bool) * brdesc->bd_tupdesc->natts);
hasnulls = palloc(sizeof(bool) * brdesc->bd_tupdesc->natts);
tp = (char *) tuple + BrinTupleDataOffset(tuple);
if (BrinTupleHasNulls(tuple))
nullbits = (bits8 *) ((char *) tuple + SizeOfBrinTuple);
else
nullbits = NULL;
brin_deconstruct_tuple(brdesc,
tp, nullbits, BrinTupleHasNulls(tuple),
values, allnulls, hasnulls);
/*
* Iterate to assign each of the values to the corresponding item in the
* values array of each column. The copies occur in the tuple's context.
*/
oldcxt = MemoryContextSwitchTo(dtup->bt_context);
for (valueno = 0, keyno = 0; keyno < brdesc->bd_tupdesc->natts; keyno++)
{
int i;
if (allnulls[keyno])
{
valueno += brdesc->bd_info[keyno]->oi_nstored;
continue;
}
/*
* We would like to skip datumCopy'ing the values datum in some cases,
* caller permitting ...
*/
for (i = 0; i < brdesc->bd_info[keyno]->oi_nstored; i++)
dtup->bt_columns[keyno].bv_values[i] =
datumCopy(values[valueno++],
brdesc->bd_tupdesc->attrs[keyno]->attbyval,
brdesc->bd_tupdesc->attrs[keyno]->attlen);
dtup->bt_columns[keyno].bv_hasnulls = hasnulls[keyno];
dtup->bt_columns[keyno].bv_allnulls = false;
}
MemoryContextSwitchTo(oldcxt);
pfree(values);
pfree(allnulls);
pfree(hasnulls);
return dtup;
}
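
A minimal consumer-side sketch of the deformed representation; illustrative only, assuming bdesc and btup are a valid BrinDesc and on-disk BrinTuple.

    BrinMemTuple *dtup = brin_deform_tuple(bdesc, btup);
    int           keyno;

    for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
    {
        BrinValues *col = &dtup->bt_columns[keyno];

        if (col->bv_allnulls)
            continue;           /* no stored values for this column */
        /* col->bv_values[0 .. oi_nstored - 1] hold the summary datums */
    }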
/*
* brin_deconstruct_tuple
* Guts of attribute extraction from an on-disk BRIN tuple.
*
* Its arguments are:
* brdesc BRIN descriptor for the stored tuple
* tp pointer to the tuple data area
* nullbits pointer to the tuple nulls bitmask
* nulls "has nulls" bit in tuple infomask
* values output values, array of size brdesc->bd_totalstored
* allnulls output "allnulls", size brdesc->bd_tupdesc->natts
* hasnulls output "hasnulls", size brdesc->bd_tupdesc->natts
*
* Output arrays must have been allocated by caller.
*/
static inline void
brin_deconstruct_tuple(BrinDesc *brdesc,
char *tp, bits8 *nullbits, bool nulls,
Datum *values, bool *allnulls, bool *hasnulls)
{
int attnum;
int stored;
TupleDesc diskdsc;
long off;
/*
* First, loop over the attributes to obtain both null flags for each one.
* Note that we reverse the sense of the att_isnull test, because we store
* a 1 for a null value (rather than a 1 for a not-null value, which is the
* att_isnull convention used elsewhere).  See brin_form_tuple.
*/
for (attnum = 0; attnum < brdesc->bd_tupdesc->natts; attnum++)
{
/*
* the "all nulls" bit means that all values in the page range for
* this column are nulls. Therefore there are no values in the tuple
* data area.
*/
allnulls[attnum] = nulls && !att_isnull(attnum, nullbits);
/*
* the "has nulls" bit means that some tuples have nulls, but others
* have not-null values. Therefore we know the tuple contains data
* for this column.
*
* The hasnulls bits follow the allnulls bits in the same bitmask.
*/
hasnulls[attnum] =
nulls && !att_isnull(brdesc->bd_tupdesc->natts + attnum, nullbits);
}
/*
* Iterate to obtain each attribute's stored values. Note that since we
* may reuse attribute entries for more than one column, we cannot cache
* offsets here.
*/
diskdsc = brtuple_disk_tupdesc(brdesc);
stored = 0;
off = 0;
for (attnum = 0; attnum < brdesc->bd_tupdesc->natts; attnum++)
{
int datumno;
if (allnulls[attnum])
{
stored += brdesc->bd_info[attnum]->oi_nstored;
continue;
}
for (datumno = 0;
datumno < brdesc->bd_info[attnum]->oi_nstored;
datumno++)
{
Form_pg_attribute thisatt = diskdsc->attrs[stored];
if (thisatt->attlen == -1)
{
off = att_align_pointer(off, thisatt->attalign, -1,
tp + off);
}
else
{
/* not varlena, so safe to use att_align_nominal */
off = att_align_nominal(off, thisatt->attalign);
}
values[stored++] = fetchatt(thisatt, tp + off);
off = att_addlength_pointer(off, thisatt->attlen, tp + off);
}
}
}

View File

@@ -0,0 +1,291 @@
/*
* brin_xlog.c
* XLog replay routines for BRIN indexes
*
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/access/brin/brin_xlog.c
*/
#include "postgres.h"
#include "access/brin_page.h"
#include "access/brin_pageops.h"
#include "access/brin_xlog.h"
#include "access/xlogutils.h"
/*
* xlog replay routines
*/
static void
brin_xlog_createidx(XLogRecPtr lsn, XLogRecord *record)
{
xl_brin_createidx *xlrec = (xl_brin_createidx *) XLogRecGetData(record);
Buffer buf;
Page page;
/* Backup blocks are not used in create_index records */
Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
/* create the index's metapage */
buf = XLogReadBuffer(xlrec->node, BRIN_METAPAGE_BLKNO, true);
Assert(BufferIsValid(buf));
page = (Page) BufferGetPage(buf);
brin_metapage_init(page, xlrec->pagesPerRange, xlrec->version);
PageSetLSN(page, lsn);
MarkBufferDirty(buf);
UnlockReleaseBuffer(buf);
}
/*
* Common part of an insert or update. Inserts the new tuple and updates the
* revmap.
*/
static void
brin_xlog_insert_update(XLogRecPtr lsn, XLogRecord *record,
xl_brin_insert *xlrec, BrinTuple *tuple)
{
BlockNumber blkno;
Buffer buffer;
Page page;
XLogRedoAction action;
blkno = ItemPointerGetBlockNumber(&xlrec->tid);
/*
* If we inserted the first and only tuple on the page, re-initialize the
* page from scratch.
*/
if (record->xl_info & XLOG_BRIN_INIT_PAGE)
{
XLogReadBufferForRedoExtended(lsn, record, 0,
xlrec->node, MAIN_FORKNUM, blkno,
RBM_ZERO, false, &buffer);
page = BufferGetPage(buffer);
brin_page_init(page, BRIN_PAGETYPE_REGULAR);
action = BLK_NEEDS_REDO;
}
else
{
action = XLogReadBufferForRedo(lsn, record, 0,
xlrec->node, blkno, &buffer);
}
/* insert the index item into the page */
if (action == BLK_NEEDS_REDO)
{
OffsetNumber offnum;
Assert(tuple->bt_blkno == xlrec->heapBlk);
page = (Page) BufferGetPage(buffer);
offnum = ItemPointerGetOffsetNumber(&(xlrec->tid));
if (PageGetMaxOffsetNumber(page) + 1 < offnum)
elog(PANIC, "brin_xlog_insert_update: invalid max offset number");
offnum = PageAddItem(page, (Item) tuple, xlrec->tuplen, offnum, true,
false);
if (offnum == InvalidOffsetNumber)
elog(PANIC, "brin_xlog_insert_update: failed to add tuple");
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
/* update the revmap */
action = XLogReadBufferForRedo(lsn, record, 1, xlrec->node,
xlrec->revmapBlk, &buffer);
if (action == BLK_NEEDS_REDO)
{
page = (Page) BufferGetPage(buffer);
brinSetHeapBlockItemptr(buffer, xlrec->pagesPerRange, xlrec->heapBlk,
xlrec->tid);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
/* XXX no FSM updates here ... */
}
/*
* replay a BRIN index insertion
*/
static void
brin_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
{
xl_brin_insert *xlrec = (xl_brin_insert *) XLogRecGetData(record);
BrinTuple *newtup;
newtup = (BrinTuple *) ((char *) xlrec + SizeOfBrinInsert);
brin_xlog_insert_update(lsn, record, xlrec, newtup);
}
/*
* replay a BRIN index update
*/
static void
brin_xlog_update(XLogRecPtr lsn, XLogRecord *record)
{
xl_brin_update *xlrec = (xl_brin_update *) XLogRecGetData(record);
BlockNumber blkno;
Buffer buffer;
BrinTuple *newtup;
XLogRedoAction action;
newtup = (BrinTuple *) ((char *) xlrec + SizeOfBrinUpdate);
/* First remove the old tuple */
blkno = ItemPointerGetBlockNumber(&(xlrec->oldtid));
action = XLogReadBufferForRedo(lsn, record, 2, xlrec->new.node,
blkno, &buffer);
if (action == BLK_NEEDS_REDO)
{
Page page;
OffsetNumber offnum;
page = (Page) BufferGetPage(buffer);
offnum = ItemPointerGetOffsetNumber(&(xlrec->oldtid));
if (PageGetMaxOffsetNumber(page) + 1 < offnum)
elog(PANIC, "brin_xlog_update: invalid max offset number");
PageIndexDeleteNoCompact(page, &offnum, 1);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
/* Then insert the new tuple and update revmap, like in an insertion. */
brin_xlog_insert_update(lsn, record, &xlrec->new, newtup);
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
}
/*
* Update a tuple on a single page.
*/
static void
brin_xlog_samepage_update(XLogRecPtr lsn, XLogRecord *record)
{
xl_brin_samepage_update *xlrec;
BlockNumber blkno;
Buffer buffer;
XLogRedoAction action;
xlrec = (xl_brin_samepage_update *) XLogRecGetData(record);
blkno = ItemPointerGetBlockNumber(&(xlrec->tid));
action = XLogReadBufferForRedo(lsn, record, 0, xlrec->node, blkno,
&buffer);
if (action == BLK_NEEDS_REDO)
{
int tuplen;
BrinTuple *mmtuple;
Page page;
OffsetNumber offnum;
tuplen = record->xl_len - SizeOfBrinSamepageUpdate;
mmtuple = (BrinTuple *) ((char *) xlrec + SizeOfBrinSamepageUpdate);
page = (Page) BufferGetPage(buffer);
offnum = ItemPointerGetOffsetNumber(&(xlrec->tid));
if (PageGetMaxOffsetNumber(page) + 1 < offnum)
elog(PANIC, "brin_xlog_samepage_update: invalid max offset number");
PageIndexDeleteNoCompact(page, &offnum, 1);
offnum = PageAddItem(page, (Item) mmtuple, tuplen, offnum, true, false);
if (offnum == InvalidOffsetNumber)
elog(PANIC, "brin_xlog_samepage_update: failed to add tuple");
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
/* XXX no FSM updates here ... */
}
/*
* Replay a revmap page extension
*/
static void
brin_xlog_revmap_extend(XLogRecPtr lsn, XLogRecord *record)
{
xl_brin_revmap_extend *xlrec;
Buffer metabuf;
Buffer buf;
Page page;
XLogRedoAction action;
xlrec = (xl_brin_revmap_extend *) XLogRecGetData(record);
/* Update the metapage */
action = XLogReadBufferForRedo(lsn, record, 0, xlrec->node,
BRIN_METAPAGE_BLKNO, &metabuf);
if (action == BLK_NEEDS_REDO)
{
Page metapg;
BrinMetaPageData *metadata;
metapg = BufferGetPage(metabuf);
metadata = (BrinMetaPageData *) PageGetContents(metapg);
Assert(metadata->lastRevmapPage == xlrec->targetBlk - 1);
metadata->lastRevmapPage = xlrec->targetBlk;
PageSetLSN(metapg, lsn);
MarkBufferDirty(metabuf);
}
/*
* Re-init the target block as a revmap page.  There's never a full-page
* image here.
*/
buf = XLogReadBuffer(xlrec->node, xlrec->targetBlk, true);
page = (Page) BufferGetPage(buf);
brin_page_init(page, BRIN_PAGETYPE_REVMAP);
PageSetLSN(page, lsn);
MarkBufferDirty(buf);
UnlockReleaseBuffer(buf);
if (BufferIsValid(metabuf))
UnlockReleaseBuffer(metabuf);
}
void
brin_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
switch (info & XLOG_BRIN_OPMASK)
{
case XLOG_BRIN_CREATE_INDEX:
brin_xlog_createidx(lsn, record);
break;
case XLOG_BRIN_INSERT:
brin_xlog_insert(lsn, record);
break;
case XLOG_BRIN_UPDATE:
brin_xlog_update(lsn, record);
break;
case XLOG_BRIN_SAMEPAGE_UPDATE:
brin_xlog_samepage_update(lsn, record);
break;
case XLOG_BRIN_REVMAP_EXTEND:
brin_xlog_revmap_extend(lsn, record);
break;
default:
elog(PANIC, "brin_redo: unknown op code %u", info);
}
}
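
For context, brin_redo is reached through the resource-manager table, alongside the brin_desc and brin_identify routines that appear in brindesc.c below. The entry sketched here is an assumption about the PG_RMGR format in rmgrlist.h at the time of this commit, shown only for orientation:

/* Hypothetical rmgrlist.h entry (assumption, for illustration only) */
PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)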

View File

@@ -209,6 +209,13 @@ static relopt_int intRelOpts[] =
RELOPT_KIND_HEAP | RELOPT_KIND_TOAST
}, -1, 0, 2000000000
},
{
{
"pages_per_range",
"Number of pages that each page range covers in a BRIN index",
RELOPT_KIND_BRIN
}, 128, 1, 131072
},
/* list terminator */
{{NULL}}
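
The three trailing numbers in the new entry are the default, minimum and maximum for pages_per_range. As a hedged worked example (assuming the standard 8 kB block size), the default translates to about 1 MB of heap per summary tuple:

    /* Illustrative only: heap bytes covered by one range at the default */
    Size        range_bytes = (Size) 128 * BLCKSZ;   /* 128 * 8192 = 1 MB */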

View File

@@ -272,6 +272,8 @@ initscan(HeapScanDesc scan, ScanKey key, bool is_rescan)
scan->rs_startblock = 0;
}
scan->rs_initblock = 0;
scan->rs_numblocks = InvalidBlockNumber;
scan->rs_inited = false;
scan->rs_ctup.t_data = NULL;
ItemPointerSetInvalid(&scan->rs_ctup.t_self);
@@ -297,6 +299,14 @@ initscan(HeapScanDesc scan, ScanKey key, bool is_rescan)
pgstat_count_heap_scan(scan->rs_rd);
}
void
heap_setscanlimits(HeapScanDesc scan, BlockNumber startBlk, BlockNumber numBlks)
{
scan->rs_startblock = startBlk;
scan->rs_initblock = startBlk;
scan->rs_numblocks = numBlks;
}
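
A hedged usage sketch of the new function (heapRel, snapshot, heapBlk and pagesPerRange are assumed variables; the summarization step is a placeholder): restrict a freshly started heap scan to a single page range before fetching tuples.

    HeapScanDesc scan;
    HeapTuple    tuple;

    scan = heap_beginscan(heapRel, snapshot, 0, NULL);

    /* visit only blocks heapBlk .. heapBlk + pagesPerRange - 1 */
    heap_setscanlimits(scan, heapBlk, pagesPerRange);

    while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    {
        /* ... summarize the tuple into the range's summary data ... */
    }

    heap_endscan(scan);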
/*
* heapgetpage - subroutine for heapgettup()
*
@@ -637,7 +647,8 @@ heapgettup(HeapScanDesc scan,
*/
if (backward)
{
finished = (page == scan->rs_startblock);
finished = (page == scan->rs_startblock) ||
(scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false);
if (page == 0)
page = scan->rs_nblocks;
page--;
@@ -647,7 +658,8 @@ heapgettup(HeapScanDesc scan,
page++;
if (page >= scan->rs_nblocks)
page = 0;
finished = (page == scan->rs_startblock);
finished = (page == scan->rs_startblock) ||
(scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false);
/*
* Report our new scan position for synchronization purposes. We
@@ -898,7 +910,8 @@ heapgettup_pagemode(HeapScanDesc scan,
*/
if (backward)
{
finished = (page == scan->rs_startblock);
finished = (page == scan->rs_startblock) ||
(scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false);
if (page == 0)
page = scan->rs_nblocks;
page--;
@@ -908,7 +921,8 @@ heapgettup_pagemode(HeapScanDesc scan,
page++;
if (page >= scan->rs_nblocks)
page = 0;
finished = (page == scan->rs_startblock);
finished = (page == scan->rs_startblock) ||
(scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false);
/*
* Report our new scan position for synchronization purposes. We

View File

@@ -8,7 +8,8 @@ subdir = src/backend/access/rmgrdesc
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = clogdesc.o dbasedesc.o gindesc.o gistdesc.o hashdesc.o heapdesc.o \
OBJS = brindesc.o clogdesc.o dbasedesc.o gindesc.o gistdesc.o \
hashdesc.o heapdesc.o \
mxactdesc.o nbtdesc.o relmapdesc.o seqdesc.o smgrdesc.o spgdesc.o \
standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o

View File

@@ -0,0 +1,112 @@
/*-------------------------------------------------------------------------
*
* brindesc.c
* rmgr descriptor routines for BRIN indexes
*
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/access/rmgrdesc/brindesc.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/brin_xlog.h"
void
brin_desc(StringInfo buf, XLogRecord *record)
{
char *rec = XLogRecGetData(record);
uint8 info = record->xl_info & ~XLR_INFO_MASK;
info &= XLOG_BRIN_OPMASK;
if (info == XLOG_BRIN_CREATE_INDEX)
{
xl_brin_createidx *xlrec = (xl_brin_createidx *) rec;
appendStringInfo(buf, "v%d pagesPerRange %u rel %u/%u/%u",
xlrec->version, xlrec->pagesPerRange,
xlrec->node.spcNode, xlrec->node.dbNode,
xlrec->node.relNode);
}
else if (info == XLOG_BRIN_INSERT)
{
xl_brin_insert *xlrec = (xl_brin_insert *) rec;
appendStringInfo(buf, "rel %u/%u/%u heapBlk %u revmapBlk %u pagesPerRange %u TID (%u,%u)",
xlrec->node.spcNode, xlrec->node.dbNode,
xlrec->node.relNode,
xlrec->heapBlk, xlrec->revmapBlk,
xlrec->pagesPerRange,
ItemPointerGetBlockNumber(&xlrec->tid),
ItemPointerGetOffsetNumber(&xlrec->tid));
}
else if (info == XLOG_BRIN_UPDATE)
{
xl_brin_update *xlrec = (xl_brin_update *) rec;
appendStringInfo(buf, "rel %u/%u/%u heapBlk %u revmapBlk %u pagesPerRange %u old TID (%u,%u) TID (%u,%u)",
xlrec->new.node.spcNode, xlrec->new.node.dbNode,
xlrec->new.node.relNode,
xlrec->new.heapBlk, xlrec->new.revmapBlk,
xlrec->new.pagesPerRange,
ItemPointerGetBlockNumber(&xlrec->oldtid),
ItemPointerGetOffsetNumber(&xlrec->oldtid),
ItemPointerGetBlockNumber(&xlrec->new.tid),
ItemPointerGetOffsetNumber(&xlrec->new.tid));
}
else if (info == XLOG_BRIN_SAMEPAGE_UPDATE)
{
xl_brin_samepage_update *xlrec = (xl_brin_samepage_update *) rec;
appendStringInfo(buf, "rel %u/%u/%u TID (%u,%u)",
xlrec->node.spcNode, xlrec->node.dbNode,
xlrec->node.relNode,
ItemPointerGetBlockNumber(&xlrec->tid),
ItemPointerGetOffsetNumber(&xlrec->tid));
}
else if (info == XLOG_BRIN_REVMAP_EXTEND)
{
xl_brin_revmap_extend *xlrec = (xl_brin_revmap_extend *) rec;
appendStringInfo(buf, "rel %u/%u/%u targetBlk %u",
xlrec->node.spcNode, xlrec->node.dbNode,
xlrec->node.relNode, xlrec->targetBlk);
}
}
const char *
brin_identify(uint8 info)
{
const char *id = NULL;
switch (info & ~XLR_INFO_MASK)
{
case XLOG_BRIN_CREATE_INDEX:
id = "CREATE_INDEX";
break;
case XLOG_BRIN_INSERT:
id = "INSERT";
break;
case XLOG_BRIN_INSERT | XLOG_BRIN_INIT_PAGE:
id = "INSERT+INIT";
break;
case XLOG_BRIN_UPDATE:
id = "UPDATE";
break;
case XLOG_BRIN_UPDATE | XLOG_BRIN_INIT_PAGE:
id = "UPDATE+INIT";
break;
case XLOG_BRIN_SAMEPAGE_UPDATE:
id = "SAMEPAGE_UPDATE";
break;
case XLOG_BRIN_REVMAP_EXTEND:
id = "REVMAP_EXTEND";
break;
}
return id;
}

View File

@@ -12,6 +12,7 @@
#include "access/gist_private.h"
#include "access/hash.h"
#include "access/heapam_xlog.h"
#include "access/brin_xlog.h"
#include "access/multixact.h"
#include "access/nbtree.h"
#include "access/spgist.h"