1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-09 06:21:09 +03:00

Rethink the way FSM truncation works. Instead of WAL-logging FSM

truncations in FSM code, call FreeSpaceMapTruncateRel from smgr_redo. To
make that cleaner from modularity point of view, move the WAL-logging one
level up to RelationTruncate, and move RelationTruncate and all the
related WAL-logging to new src/backend/catalog/storage.c file. Introduce
new RelationCreateStorage and RelationDropStorage functions that are used
instead of calling smgrcreate/smgrscheduleunlink directly. Move the
pending rel deletion stuff from smgrcreate/smgrscheduleunlink to the new
functions. This leaves smgr.c as a thin wrapper around md.c; all the
transactional stuff is now in storage.c.

This will make it easier to add new forks with similar truncation logic,
like the visibility map.
This commit is contained in:
Heikki Linnakangas
2008-11-19 10:34:52 +00:00
parent 26e6c896c9
commit 3396000684
30 changed files with 675 additions and 794 deletions

View File

@@ -7,7 +7,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.47 2008/11/02 21:24:51 tgl Exp $
* $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.48 2008/11/19 10:34:50 heikki Exp $
*
* NOTES
* Each global transaction is associated with a global transaction
@@ -48,7 +48,9 @@
#include "access/twophase.h"
#include "access/twophase_rmgr.h"
#include "access/xact.h"
#include "access/xlogutils.h"
#include "catalog/pg_type.h"
#include "catalog/storage.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pg_trace.h"
@@ -141,12 +143,12 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
int nrels,
RelFileFork *rels);
RelFileNode *rels);
static void RecordTransactionAbortPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
int nrels,
RelFileFork *rels);
RelFileNode *rels);
static void ProcessRecords(char *bufptr, TransactionId xid,
const TwoPhaseCallback callbacks[]);
@@ -694,8 +696,8 @@ TwoPhaseGetDummyProc(TransactionId xid)
*
* 1. TwoPhaseFileHeader
* 2. TransactionId[] (subtransactions)
* 3. RelFileFork[] (files to be deleted at commit)
* 4. RelFileFork[] (files to be deleted at abort)
* 3. RelFileNode[] (files to be deleted at commit)
* 4. RelFileNode[] (files to be deleted at abort)
* 5. TwoPhaseRecordOnDisk
* 6. ...
* 7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
@@ -793,8 +795,8 @@ StartPrepare(GlobalTransaction gxact)
TransactionId xid = gxact->proc.xid;
TwoPhaseFileHeader hdr;
TransactionId *children;
RelFileFork *commitrels;
RelFileFork *abortrels;
RelFileNode *commitrels;
RelFileNode *abortrels;
/* Initialize linked list */
records.head = palloc0(sizeof(XLogRecData));
@@ -832,12 +834,12 @@ StartPrepare(GlobalTransaction gxact)
}
if (hdr.ncommitrels > 0)
{
save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileFork));
save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
pfree(commitrels);
}
if (hdr.nabortrels > 0)
{
save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileFork));
save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
pfree(abortrels);
}
}
@@ -1140,8 +1142,10 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
TwoPhaseFileHeader *hdr;
TransactionId latestXid;
TransactionId *children;
RelFileFork *commitrels;
RelFileFork *abortrels;
RelFileNode *commitrels;
RelFileNode *abortrels;
RelFileNode *delrels;
int ndelrels;
int i;
/*
@@ -1169,10 +1173,10 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
children = (TransactionId *) bufptr;
bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
commitrels = (RelFileFork *) bufptr;
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileFork));
abortrels = (RelFileFork *) bufptr;
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileFork));
commitrels = (RelFileNode *) bufptr;
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
abortrels = (RelFileNode *) bufptr;
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
/* compute latestXid among all children */
latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
@@ -1214,21 +1218,28 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
*/
if (isCommit)
{
for (i = 0; i < hdr->ncommitrels; i++)
{
SMgrRelation srel = smgropen(commitrels[i].rnode);
smgrdounlink(srel, commitrels[i].forknum, false, false);
smgrclose(srel);
}
delrels = commitrels;
ndelrels = hdr->ncommitrels;
}
else
{
for (i = 0; i < hdr->nabortrels; i++)
delrels = abortrels;
ndelrels = hdr->nabortrels;
}
for (i = 0; i < ndelrels; i++)
{
SMgrRelation srel = smgropen(delrels[i]);
ForkNumber fork;
for (fork = 0; fork <= MAX_FORKNUM; fork++)
{
SMgrRelation srel = smgropen(abortrels[i].rnode);
smgrdounlink(srel, abortrels[i].forknum, false, false);
smgrclose(srel);
if (smgrexists(srel, fork))
{
XLogDropRelation(delrels[i], fork);
smgrdounlink(srel, fork, false, true);
}
}
smgrclose(srel);
}
/* And now do the callbacks */
@@ -1639,8 +1650,8 @@ RecoverPreparedTransactions(void)
bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
subxids = (TransactionId *) bufptr;
bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileFork));
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileFork));
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
/*
* Reconstruct subtrans state for the transaction --- needed
@@ -1693,7 +1704,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
int nrels,
RelFileFork *rels)
RelFileNode *rels)
{
XLogRecData rdata[3];
int lastrdata = 0;
@@ -1718,7 +1729,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
{
rdata[0].next = &(rdata[1]);
rdata[1].data = (char *) rels;
rdata[1].len = nrels * sizeof(RelFileFork);
rdata[1].len = nrels * sizeof(RelFileNode);
rdata[1].buffer = InvalidBuffer;
lastrdata = 1;
}
@@ -1766,7 +1777,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
int nrels,
RelFileFork *rels)
RelFileNode *rels)
{
XLogRecData rdata[3];
int lastrdata = 0;
@@ -1796,7 +1807,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
{
rdata[0].next = &(rdata[1]);
rdata[1].data = (char *) rels;
rdata[1].len = nrels * sizeof(RelFileFork);
rdata[1].len = nrels * sizeof(RelFileNode);
rdata[1].buffer = InvalidBuffer;
lastrdata = 1;
}