mirror of
https://github.com/postgres/postgres.git
synced 2025-11-09 06:21:09 +03:00
Move "hot" members of PGPROC into a separate PGXACT array.
This speeds up snapshot-taking and reduces ProcArrayLock contention. Also, the PGPROC (and PGXACT) structures used by two-phase commit are now allocated as part of the main array, rather than in a separate array, and we keep ProcArray sorted in pointer order. These changes are intended to minimize the number of cache lines that must be pulled in to take a snapshot, and testing shows a substantial increase in performance on both read and write workloads at high concurrencies. Pavan Deolasee, Heikki Linnakangas, Robert Haas
This commit is contained in:
@@ -113,7 +113,8 @@ int max_prepared_xacts = 0;
|
||||
|
||||
typedef struct GlobalTransactionData
|
||||
{
|
||||
PGPROC proc; /* dummy proc */
|
||||
GlobalTransaction next;
|
||||
int pgprocno; /* dummy proc */
|
||||
BackendId dummyBackendId; /* similar to backend id for backends */
|
||||
TimestampTz prepared_at; /* time of preparation */
|
||||
XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */
|
||||
@@ -207,7 +208,8 @@ TwoPhaseShmemInit(void)
|
||||
sizeof(GlobalTransaction) * max_prepared_xacts));
|
||||
for (i = 0; i < max_prepared_xacts; i++)
|
||||
{
|
||||
gxacts[i].proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
|
||||
gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
|
||||
gxacts[i].next = TwoPhaseState->freeGXacts;
|
||||
TwoPhaseState->freeGXacts = &gxacts[i];
|
||||
|
||||
/*
|
||||
@@ -243,6 +245,8 @@ MarkAsPreparing(TransactionId xid, const char *gid,
|
||||
TimestampTz prepared_at, Oid owner, Oid databaseid)
|
||||
{
|
||||
GlobalTransaction gxact;
|
||||
PGPROC *proc;
|
||||
PGXACT *pgxact;
|
||||
int i;
|
||||
|
||||
if (strlen(gid) >= GIDSIZE)
|
||||
@@ -274,7 +278,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
|
||||
TwoPhaseState->numPrepXacts--;
|
||||
TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
|
||||
/* and put it back in the freelist */
|
||||
gxact->proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
|
||||
gxact->next = TwoPhaseState->freeGXacts;
|
||||
TwoPhaseState->freeGXacts = gxact;
|
||||
/* Back up index count too, so we don't miss scanning one */
|
||||
i--;
|
||||
@@ -302,32 +306,36 @@ MarkAsPreparing(TransactionId xid, const char *gid,
|
||||
errhint("Increase max_prepared_transactions (currently %d).",
|
||||
max_prepared_xacts)));
|
||||
gxact = TwoPhaseState->freeGXacts;
|
||||
TwoPhaseState->freeGXacts = (GlobalTransaction) gxact->proc.links.next;
|
||||
TwoPhaseState->freeGXacts = (GlobalTransaction) gxact->next;
|
||||
|
||||
/* Initialize it */
|
||||
MemSet(&gxact->proc, 0, sizeof(PGPROC));
|
||||
SHMQueueElemInit(&(gxact->proc.links));
|
||||
gxact->proc.waitStatus = STATUS_OK;
|
||||
proc = &ProcGlobal->allProcs[gxact->pgprocno];
|
||||
pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
|
||||
|
||||
/* Initialize the PGPROC entry */
|
||||
MemSet(proc, 0, sizeof(PGPROC));
|
||||
proc->pgprocno = gxact->pgprocno;
|
||||
SHMQueueElemInit(&(proc->links));
|
||||
proc->waitStatus = STATUS_OK;
|
||||
/* We set up the gxact's VXID as InvalidBackendId/XID */
|
||||
gxact->proc.lxid = (LocalTransactionId) xid;
|
||||
gxact->proc.xid = xid;
|
||||
gxact->proc.xmin = InvalidTransactionId;
|
||||
gxact->proc.pid = 0;
|
||||
gxact->proc.backendId = InvalidBackendId;
|
||||
gxact->proc.databaseId = databaseid;
|
||||
gxact->proc.roleId = owner;
|
||||
gxact->proc.inCommit = false;
|
||||
gxact->proc.vacuumFlags = 0;
|
||||
gxact->proc.lwWaiting = false;
|
||||
gxact->proc.lwExclusive = false;
|
||||
gxact->proc.lwWaitLink = NULL;
|
||||
gxact->proc.waitLock = NULL;
|
||||
gxact->proc.waitProcLock = NULL;
|
||||
proc->lxid = (LocalTransactionId) xid;
|
||||
pgxact->xid = xid;
|
||||
pgxact->xmin = InvalidTransactionId;
|
||||
pgxact->inCommit = false;
|
||||
pgxact->vacuumFlags = 0;
|
||||
proc->pid = 0;
|
||||
proc->backendId = InvalidBackendId;
|
||||
proc->databaseId = databaseid;
|
||||
proc->roleId = owner;
|
||||
proc->lwWaiting = false;
|
||||
proc->lwExclusive = false;
|
||||
proc->lwWaitLink = NULL;
|
||||
proc->waitLock = NULL;
|
||||
proc->waitProcLock = NULL;
|
||||
for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
|
||||
SHMQueueInit(&(gxact->proc.myProcLocks[i]));
|
||||
SHMQueueInit(&(proc->myProcLocks[i]));
|
||||
/* subxid data must be filled later by GXactLoadSubxactData */
|
||||
gxact->proc.subxids.overflowed = false;
|
||||
gxact->proc.subxids.nxids = 0;
|
||||
pgxact->overflowed = false;
|
||||
pgxact->nxids = 0;
|
||||
|
||||
gxact->prepared_at = prepared_at;
|
||||
/* initialize LSN to 0 (start of WAL) */
|
||||
@@ -358,17 +366,19 @@ static void
|
||||
GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
|
||||
TransactionId *children)
|
||||
{
|
||||
PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
|
||||
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
|
||||
/* We need no extra lock since the GXACT isn't valid yet */
|
||||
if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
|
||||
{
|
||||
gxact->proc.subxids.overflowed = true;
|
||||
pgxact->overflowed = true;
|
||||
nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
|
||||
}
|
||||
if (nsubxacts > 0)
|
||||
{
|
||||
memcpy(gxact->proc.subxids.xids, children,
|
||||
memcpy(proc->subxids.xids, children,
|
||||
nsubxacts * sizeof(TransactionId));
|
||||
gxact->proc.subxids.nxids = nsubxacts;
|
||||
pgxact->nxids = nsubxacts;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -389,7 +399,7 @@ MarkAsPrepared(GlobalTransaction gxact)
|
||||
* Put it into the global ProcArray so TransactionIdIsInProgress considers
|
||||
* the XID as still running.
|
||||
*/
|
||||
ProcArrayAdd(&gxact->proc);
|
||||
ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -406,6 +416,7 @@ LockGXact(const char *gid, Oid user)
|
||||
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
|
||||
{
|
||||
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
|
||||
PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
|
||||
|
||||
/* Ignore not-yet-valid GIDs */
|
||||
if (!gxact->valid)
|
||||
@@ -436,7 +447,7 @@ LockGXact(const char *gid, Oid user)
|
||||
* there may be some other issues as well. Hence disallow until
|
||||
* someone gets motivated to make it work.
|
||||
*/
|
||||
if (MyDatabaseId != gxact->proc.databaseId)
|
||||
if (MyDatabaseId != proc->databaseId)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("prepared transaction belongs to another database"),
|
||||
@@ -483,7 +494,7 @@ RemoveGXact(GlobalTransaction gxact)
|
||||
TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
|
||||
|
||||
/* and put it back in the freelist */
|
||||
gxact->proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
|
||||
gxact->next = TwoPhaseState->freeGXacts;
|
||||
TwoPhaseState->freeGXacts = gxact;
|
||||
|
||||
LWLockRelease(TwoPhaseStateLock);
|
||||
@@ -518,8 +529,9 @@ TransactionIdIsPrepared(TransactionId xid)
|
||||
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
|
||||
{
|
||||
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
|
||||
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
|
||||
|
||||
if (gxact->valid && gxact->proc.xid == xid)
|
||||
if (gxact->valid && pgxact->xid == xid)
|
||||
{
|
||||
result = true;
|
||||
break;
|
||||
@@ -642,6 +654,8 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
|
||||
while (status->array != NULL && status->currIdx < status->ngxacts)
|
||||
{
|
||||
GlobalTransaction gxact = &status->array[status->currIdx++];
|
||||
PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
|
||||
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
|
||||
Datum values[5];
|
||||
bool nulls[5];
|
||||
HeapTuple tuple;
|
||||
@@ -656,11 +670,11 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
|
||||
MemSet(values, 0, sizeof(values));
|
||||
MemSet(nulls, 0, sizeof(nulls));
|
||||
|
||||
values[0] = TransactionIdGetDatum(gxact->proc.xid);
|
||||
values[0] = TransactionIdGetDatum(pgxact->xid);
|
||||
values[1] = CStringGetTextDatum(gxact->gid);
|
||||
values[2] = TimestampTzGetDatum(gxact->prepared_at);
|
||||
values[3] = ObjectIdGetDatum(gxact->owner);
|
||||
values[4] = ObjectIdGetDatum(gxact->proc.databaseId);
|
||||
values[4] = ObjectIdGetDatum(proc->databaseId);
|
||||
|
||||
tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
|
||||
result = HeapTupleGetDatum(tuple);
|
||||
@@ -711,10 +725,11 @@ TwoPhaseGetDummyProc(TransactionId xid)
|
||||
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
|
||||
{
|
||||
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
|
||||
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
|
||||
|
||||
if (gxact->proc.xid == xid)
|
||||
if (pgxact->xid == xid)
|
||||
{
|
||||
result = &gxact->proc;
|
||||
result = &ProcGlobal->allProcs[gxact->pgprocno];
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -841,7 +856,9 @@ save_state_data(const void *data, uint32 len)
|
||||
void
|
||||
StartPrepare(GlobalTransaction gxact)
|
||||
{
|
||||
TransactionId xid = gxact->proc.xid;
|
||||
PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
|
||||
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
|
||||
TransactionId xid = pgxact->xid;
|
||||
TwoPhaseFileHeader hdr;
|
||||
TransactionId *children;
|
||||
RelFileNode *commitrels;
|
||||
@@ -865,7 +882,7 @@ StartPrepare(GlobalTransaction gxact)
|
||||
hdr.magic = TWOPHASE_MAGIC;
|
||||
hdr.total_len = 0; /* EndPrepare will fill this in */
|
||||
hdr.xid = xid;
|
||||
hdr.database = gxact->proc.databaseId;
|
||||
hdr.database = proc->databaseId;
|
||||
hdr.prepared_at = gxact->prepared_at;
|
||||
hdr.owner = gxact->owner;
|
||||
hdr.nsubxacts = xactGetCommittedChildren(&children);
|
||||
@@ -913,7 +930,8 @@ StartPrepare(GlobalTransaction gxact)
|
||||
void
|
||||
EndPrepare(GlobalTransaction gxact)
|
||||
{
|
||||
TransactionId xid = gxact->proc.xid;
|
||||
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
|
||||
TransactionId xid = pgxact->xid;
|
||||
TwoPhaseFileHeader *hdr;
|
||||
char path[MAXPGPATH];
|
||||
XLogRecData *record;
|
||||
@@ -1021,7 +1039,7 @@ EndPrepare(GlobalTransaction gxact)
|
||||
*/
|
||||
START_CRIT_SECTION();
|
||||
|
||||
MyProc->inCommit = true;
|
||||
MyPgXact->inCommit = true;
|
||||
|
||||
gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
|
||||
records.head);
|
||||
@@ -1069,7 +1087,7 @@ EndPrepare(GlobalTransaction gxact)
|
||||
* checkpoint starting after this will certainly see the gxact as a
|
||||
* candidate for fsyncing.
|
||||
*/
|
||||
MyProc->inCommit = false;
|
||||
MyPgXact->inCommit = false;
|
||||
|
||||
END_CRIT_SECTION();
|
||||
|
||||
@@ -1242,6 +1260,8 @@ void
|
||||
FinishPreparedTransaction(const char *gid, bool isCommit)
|
||||
{
|
||||
GlobalTransaction gxact;
|
||||
PGPROC *proc;
|
||||
PGXACT *pgxact;
|
||||
TransactionId xid;
|
||||
char *buf;
|
||||
char *bufptr;
|
||||
@@ -1260,7 +1280,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
|
||||
* try to commit the same GID at once.
|
||||
*/
|
||||
gxact = LockGXact(gid, GetUserId());
|
||||
xid = gxact->proc.xid;
|
||||
proc = &ProcGlobal->allProcs[gxact->pgprocno];
|
||||
pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
|
||||
xid = pgxact->xid;
|
||||
|
||||
/*
|
||||
* Read and validate the state file
|
||||
@@ -1309,7 +1331,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
|
||||
hdr->nsubxacts, children,
|
||||
hdr->nabortrels, abortrels);
|
||||
|
||||
ProcArrayRemove(&gxact->proc, latestXid);
|
||||
ProcArrayRemove(proc, latestXid);
|
||||
|
||||
/*
|
||||
* In case we fail while running the callbacks, mark the gxact invalid so
|
||||
@@ -1540,10 +1562,11 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
|
||||
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
|
||||
{
|
||||
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
|
||||
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
|
||||
|
||||
if (gxact->valid &&
|
||||
XLByteLE(gxact->prepare_lsn, redo_horizon))
|
||||
xids[nxids++] = gxact->proc.xid;
|
||||
xids[nxids++] = pgxact->xid;
|
||||
}
|
||||
|
||||
LWLockRelease(TwoPhaseStateLock);
|
||||
@@ -1972,7 +1995,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
|
||||
START_CRIT_SECTION();
|
||||
|
||||
/* See notes in RecordTransactionCommit */
|
||||
MyProc->inCommit = true;
|
||||
MyPgXact->inCommit = true;
|
||||
|
||||
/* Emit the XLOG commit record */
|
||||
xlrec.xid = xid;
|
||||
@@ -2037,7 +2060,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
|
||||
TransactionIdCommitTree(xid, nchildren, children);
|
||||
|
||||
/* Checkpoint can proceed now */
|
||||
MyProc->inCommit = false;
|
||||
MyPgXact->inCommit = false;
|
||||
|
||||
END_CRIT_SECTION();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user