1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-12 21:01:52 +03:00

Avoid bogus TwoPhaseState locking sequences

The optimized code in 728bd991c3 contains a few invalid locking
sequences.  To wit, the original code would try to acquire an lwlock
that it already holds.  Avoid this by moving lock acquisitions to
higher-level code, and install appropriate assertions in low-level that
the correct mode is held.

Authors: Michael Paquier, Álvaro Herrera
Reported-By: chuanting wang
Bug: #14680
Discussion: https://postgr.es/m/20170531033228.1487.10124@wrigleys.postgresql.org
This commit is contained in:
Alvaro Herrera
2017-06-14 11:29:05 -04:00
parent 0d9bdbcaae
commit e90ceeaa49
2 changed files with 85 additions and 74 deletions

View File

@ -5351,6 +5351,8 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
int i;
TimestampTz commit_time;
Assert(TransactionIdIsValid(xid));
max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts);
/*
@ -5518,6 +5520,8 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
int i;
TransactionId max_xid;
Assert(TransactionIdIsValid(xid));
/*
* Make sure nextXid is beyond any XID mentioned in the record.
*
@ -5598,51 +5602,49 @@ xact_redo(XLogReaderState *record)
/* Backup blocks are not used in xact records */
Assert(!XLogRecHasAnyBlockRefs(record));
if (info == XLOG_XACT_COMMIT || info == XLOG_XACT_COMMIT_PREPARED)
if (info == XLOG_XACT_COMMIT)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
xl_xact_parsed_commit parsed;
ParseCommitRecord(XLogRecGetInfo(record), xlrec,
&parsed);
if (info == XLOG_XACT_COMMIT)
{
Assert(!TransactionIdIsValid(parsed.twophase_xid));
xact_redo_commit(&parsed, XLogRecGetXid(record),
record->EndRecPtr, XLogRecGetOrigin(record));
}
else
{
Assert(TransactionIdIsValid(parsed.twophase_xid));
xact_redo_commit(&parsed, parsed.twophase_xid,
record->EndRecPtr, XLogRecGetOrigin(record));
/* Delete TwoPhaseState gxact entry and/or 2PC file. */
PrepareRedoRemove(parsed.twophase_xid, false);
}
ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed);
xact_redo_commit(&parsed, XLogRecGetXid(record),
record->EndRecPtr, XLogRecGetOrigin(record));
}
else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED)
else if (info == XLOG_XACT_COMMIT_PREPARED)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
xl_xact_parsed_commit parsed;
ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed);
xact_redo_commit(&parsed, parsed.twophase_xid,
record->EndRecPtr, XLogRecGetOrigin(record));
/* Delete TwoPhaseState gxact entry and/or 2PC file. */
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
PrepareRedoRemove(parsed.twophase_xid, false);
LWLockRelease(TwoPhaseStateLock);
}
else if (info == XLOG_XACT_ABORT)
{
xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
xl_xact_parsed_abort parsed;
ParseAbortRecord(XLogRecGetInfo(record), xlrec,
&parsed);
ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
xact_redo_abort(&parsed, XLogRecGetXid(record));
}
else if (info == XLOG_XACT_ABORT_PREPARED)
{
xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
xl_xact_parsed_abort parsed;
if (info == XLOG_XACT_ABORT)
{
Assert(!TransactionIdIsValid(parsed.twophase_xid));
xact_redo_abort(&parsed, XLogRecGetXid(record));
}
else
{
Assert(TransactionIdIsValid(parsed.twophase_xid));
xact_redo_abort(&parsed, parsed.twophase_xid);
ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
xact_redo_abort(&parsed, parsed.twophase_xid);
/* Delete TwoPhaseState gxact entry and/or 2PC file. */
PrepareRedoRemove(parsed.twophase_xid, false);
}
/* Delete TwoPhaseState gxact entry and/or 2PC file. */
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
PrepareRedoRemove(parsed.twophase_xid, false);
LWLockRelease(TwoPhaseStateLock);
}
else if (info == XLOG_XACT_PREPARE)
{
@ -5650,9 +5652,11 @@ xact_redo(XLogReaderState *record)
* Store xid and start/end pointers of the WAL record in TwoPhaseState
* gxact entry.
*/
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
PrepareRedoAdd(XLogRecGetData(record),
record->ReadRecPtr,
record->EndRecPtr);
LWLockRelease(TwoPhaseStateLock);
}
else if (info == XLOG_XACT_ASSIGNMENT)
{