1
0
mirror of https://github.com/postgres/postgres.git synced 2025-08-18 12:22:09 +03:00

BRIN de-summarization

When the BRIN summary tuple for a page range becomes too "wide" for the
values actually stored in the table (because the tuples that were
present originally are no longer present due to updates or deletes), it
can be useful to remove the outdated summary tuple, so that a future
summarization can install a tighter summary.

This commit introduces a SQL-callable interface to do so.

Author: Álvaro Herrera
Reviewed-by: Eiji Seki
Discussion: https://postgr.es/m/20170228045643.n2ri74ara4fhhfxf@alvherre.pgsql
This commit is contained in:
Alvaro Herrera
2017-04-01 16:10:04 -03:00
parent 3a82129a40
commit c655899ba9
12 changed files with 340 additions and 5 deletions

View File

@@ -908,6 +908,80 @@ brin_summarize_range(PG_FUNCTION_ARGS)
PG_RETURN_INT32((int32) numSummarized);
}
/*
* SQL-callable interface to mark a range as no longer summarized
*/
Datum
brin_desummarize_range(PG_FUNCTION_ARGS)
{
Oid indexoid = PG_GETARG_OID(0);
int64 heapBlk64 = PG_GETARG_INT64(1);
BlockNumber heapBlk;
Oid heapoid;
Relation heapRel;
Relation indexRel;
bool done;
if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
{
char *blk = psprintf(INT64_FORMAT, heapBlk64);
ereport(ERROR,
(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
errmsg("block number out of range: %s", blk)));
}
heapBlk = (BlockNumber) heapBlk64;
/*
* We must lock table before index to avoid deadlocks. However, if the
* passed indexoid isn't an index then IndexGetRelation() will fail.
* Rather than emitting a not-very-helpful error message, postpone
* complaining, expecting that the is-it-an-index test below will fail.
*/
heapoid = IndexGetRelation(indexoid, true);
if (OidIsValid(heapoid))
heapRel = heap_open(heapoid, ShareUpdateExclusiveLock);
else
heapRel = NULL;
indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
/* Must be a BRIN index */
if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
indexRel->rd_rel->relam != BRIN_AM_OID)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a BRIN index",
RelationGetRelationName(indexRel))));
/* User must own the index (comparable to privileges needed for VACUUM) */
if (!pg_class_ownercheck(indexoid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
RelationGetRelationName(indexRel));
/*
* Since we did the IndexGetRelation call above without any lock, it's
* barely possible that a race against an index drop/recreation could have
* netted us the wrong table. Recheck.
*/
if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_TABLE),
errmsg("could not open parent table of index %s",
RelationGetRelationName(indexRel))));
/* the revmap does the hard work */
do {
done = brinRevmapDesummarizeRange(indexRel, heapBlk);
}
while (!done);
relation_close(indexRel, ShareUpdateExclusiveLock);
relation_close(heapRel, ShareUpdateExclusiveLock);
PG_RETURN_VOID();
}
/*
* Build a BrinDesc used to create or scan a BRIN index
*/

View File

@@ -168,9 +168,12 @@ brinSetHeapBlockItemptr(Buffer buf, BlockNumber pagesPerRange,
iptr = (ItemPointerData *) contents->rm_tids;
iptr += HEAPBLK_TO_REVMAP_INDEX(pagesPerRange, heapBlk);
ItemPointerSet(iptr,
ItemPointerGetBlockNumber(&tid),
ItemPointerGetOffsetNumber(&tid));
if (ItemPointerIsValid(&tid))
ItemPointerSet(iptr,
ItemPointerGetBlockNumber(&tid),
ItemPointerGetOffsetNumber(&tid));
else
ItemPointerSetInvalid(iptr);
}
/*
@@ -304,6 +307,137 @@ brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber heapBlk,
return NULL;
}
/*
* Delete an index tuple, marking a page range as unsummarized.
*
* Index must be locked in ShareUpdateExclusiveLock mode.
*
* Return FALSE if caller should retry.
*/
bool
brinRevmapDesummarizeRange(Relation idxrel, BlockNumber heapBlk)
{
BrinRevmap *revmap;
BlockNumber pagesPerRange;
RevmapContents *contents;
ItemPointerData *iptr;
ItemPointerData invalidIptr;
BlockNumber revmapBlk;
Buffer revmapBuf;
Buffer regBuf;
Page revmapPg;
Page regPg;
OffsetNumber revmapOffset;
OffsetNumber regOffset;
ItemId lp;
BrinTuple *tup;
revmap = brinRevmapInitialize(idxrel, &pagesPerRange, NULL);
revmapBlk = revmap_get_blkno(revmap, heapBlk);
if (!BlockNumberIsValid(revmapBlk))
{
/* revmap page doesn't exist: range not summarized, we're done */
brinRevmapTerminate(revmap);
return true;
}
/* Lock the revmap page, obtain the index tuple pointer from it */
revmapBuf = brinLockRevmapPageForUpdate(revmap, heapBlk);
revmapPg = BufferGetPage(revmapBuf);
revmapOffset = HEAPBLK_TO_REVMAP_INDEX(revmap->rm_pagesPerRange, heapBlk);
contents = (RevmapContents *) PageGetContents(revmapPg);
iptr = contents->rm_tids;
iptr += revmapOffset;
if (!ItemPointerIsValid(iptr))
{
/* no index tuple: range not summarized, we're done */
LockBuffer(revmapBuf, BUFFER_LOCK_UNLOCK);
brinRevmapTerminate(revmap);
return true;
}
regBuf = ReadBuffer(idxrel, ItemPointerGetBlockNumber(iptr));
LockBuffer(regBuf, BUFFER_LOCK_EXCLUSIVE);
regPg = BufferGetPage(regBuf);
/* if this is no longer a regular page, tell caller to start over */
if (!BRIN_IS_REGULAR_PAGE(regPg))
{
LockBuffer(revmapBuf, BUFFER_LOCK_UNLOCK);
LockBuffer(regBuf, BUFFER_LOCK_UNLOCK);
brinRevmapTerminate(revmap);
return false;
}
regOffset = ItemPointerGetOffsetNumber(iptr);
if (regOffset > PageGetMaxOffsetNumber(regPg))
ereport(ERROR,
(errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("corrupted BRIN index: inconsistent range map")));
lp = PageGetItemId(regPg, regOffset);
if (!ItemIdIsUsed(lp))
ereport(ERROR,
(errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("corrupted BRIN index: inconsistent range map")));
tup = (BrinTuple *) PageGetItem(regPg, lp);
/* XXX apply sanity checks? Might as well delete a bogus tuple ... */
/*
* We're only removing data, not reading it, so there's no need to
* TestForOldSnapshot here.
*/
/*
* Because of SUE lock, this function shouldn't run concurrently with
* summarization. Placeholder tuples can only exist as leftovers from
* crashed summarization, so if we detect any, we complain but proceed.
*/
if (BrinTupleIsPlaceholder(tup))
ereport(WARNING,
(errmsg("leftover placeholder tuple detected in BRIN index \"%s\", deleting",
RelationGetRelationName(idxrel))));
START_CRIT_SECTION();
ItemPointerSetInvalid(&invalidIptr);
brinSetHeapBlockItemptr(revmapBuf, revmap->rm_pagesPerRange, heapBlk,
invalidIptr);
PageIndexTupleDeleteNoCompact(regPg, regOffset);
/* XXX record free space in FSM? */
MarkBufferDirty(regBuf);
MarkBufferDirty(revmapBuf);
if (RelationNeedsWAL(idxrel))
{
xl_brin_desummarize xlrec;
XLogRecPtr recptr;
xlrec.heapBlk = heapBlk;
xlrec.regOffset = regOffset;
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfBrinDesummarize);
XLogRegisterBuffer(0, revmapBuf, 0);
XLogRegisterBuffer(1, regBuf, REGBUF_STANDARD);
recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_DESUMMARIZE);
PageSetLSN(revmapPg, recptr);
PageSetLSN(regPg, recptr);
}
END_CRIT_SECTION();
UnlockReleaseBuffer(regBuf);
LockBuffer(revmapBuf, BUFFER_LOCK_UNLOCK);
brinRevmapTerminate(revmap);
return true;
}
/*
* Given a heap block number, find the corresponding physical revmap block
* number and return it. If the revmap page hasn't been allocated yet, return

View File

@@ -254,6 +254,46 @@ brin_xlog_revmap_extend(XLogReaderState *record)
UnlockReleaseBuffer(metabuf);
}
static void
brin_xlog_desummarize_page(XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
xl_brin_desummarize *xlrec;
Buffer buffer;
XLogRedoAction action;
xlrec = (xl_brin_desummarize *) XLogRecGetData(record);
/* Update the revmap */
action = XLogReadBufferForRedo(record, 0, &buffer);
if (action == BLK_NEEDS_REDO)
{
ItemPointerData iptr;
ItemPointerSetInvalid(&iptr);
brinSetHeapBlockItemptr(buffer, xlrec->pagesPerRange, xlrec->heapBlk, iptr);
PageSetLSN(BufferGetPage(buffer), lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
/* remove the leftover entry from the regular page */
action = XLogReadBufferForRedo(record, 1, &buffer);
if (action == BLK_NEEDS_REDO)
{
Page regPg = BufferGetPage(buffer);
PageIndexTupleDeleteNoCompact(regPg, xlrec->regOffset);
PageSetLSN(regPg, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
}
void
brin_redo(XLogReaderState *record)
{
@@ -276,6 +316,9 @@ brin_redo(XLogReaderState *record)
case XLOG_BRIN_REVMAP_EXTEND:
brin_xlog_revmap_extend(record);
break;
case XLOG_BRIN_DESUMMARIZE:
brin_xlog_desummarize_page(record);
break;
default:
elog(PANIC, "brin_redo: unknown op code %u", info);
}

View File

@@ -61,6 +61,13 @@ brin_desc(StringInfo buf, XLogReaderState *record)
appendStringInfo(buf, "targetBlk %u", xlrec->targetBlk);
}
else if (info == XLOG_BRIN_DESUMMARIZE)
{
xl_brin_desummarize *xlrec = (xl_brin_desummarize *) rec;
appendStringInfo(buf, "pagesPerRange %u, heapBlk %u, page offset %u",
xlrec->pagesPerRange, xlrec->heapBlk, xlrec->regOffset);
}
}
const char *
@@ -91,6 +98,9 @@ brin_identify(uint8 info)
case XLOG_BRIN_REVMAP_EXTEND:
id = "REVMAP_EXTEND";
break;
case XLOG_BRIN_DESUMMARIZE:
id = "DESUMMARIZE";
break;
}
return id;