mirror of
https://github.com/postgres/postgres.git
synced 2025-09-03 15:22:11 +03:00
Simplify and improve ProcessStandbyHSFeedbackMessage logic.
There's no need to clamp the standby's xmin to be greater than GetOldestXmin's result; if there were any such need, this logic would be hopelessly inadequate anyway, because it fails to account for within-database versus cluster-wide values of GetOldestXmin. So get rid of that, and just rely on sanity-checking that the xmin is not wrapped around relative to the nextXid counter. Also, don't reset the walsender's xmin if the current feedback xmin is indeed out of range; that just creates more problems than we already had. In addition, don't bother to take the ProcArrayLock; there's no need to do that to set xmin. Finally, improve the comments about this in GetOldestXmin itself.
This commit is contained in:
@@ -642,76 +642,67 @@ static void
|
||||
ProcessStandbyHSFeedbackMessage(void)
|
||||
{
|
||||
StandbyHSFeedbackMessage reply;
|
||||
TransactionId newxmin = InvalidTransactionId;
|
||||
TransactionId nextXid;
|
||||
uint32 nextEpoch;
|
||||
|
||||
pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyHSFeedbackMessage));
|
||||
/* Decipher the reply message */
|
||||
pq_copymsgbytes(&reply_message, (char *) &reply,
|
||||
sizeof(StandbyHSFeedbackMessage));
|
||||
|
||||
elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
|
||||
reply.xmin,
|
||||
reply.epoch);
|
||||
|
||||
/*
|
||||
* Update the WalSender's proc xmin to allow it to be visible to
|
||||
* snapshots. This will hold back the removal of dead rows and thereby
|
||||
* prevent the generation of cleanup conflicts on the standby server.
|
||||
*/
|
||||
if (TransactionIdIsValid(reply.xmin))
|
||||
{
|
||||
TransactionId nextXid;
|
||||
uint32 nextEpoch;
|
||||
bool epochOK = false;
|
||||
|
||||
GetNextXidAndEpoch(&nextXid, &nextEpoch);
|
||||
|
||||
/*
|
||||
* Epoch of oldestXmin should be same as standby or if the counter has
|
||||
* wrapped, then one less than reply.
|
||||
*/
|
||||
if (reply.xmin <= nextXid)
|
||||
{
|
||||
if (reply.epoch == nextEpoch)
|
||||
epochOK = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (nextEpoch > 0 && reply.epoch == nextEpoch - 1)
|
||||
epochOK = true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Feedback from standby must not go backwards, nor should it go
|
||||
* forwards further than our most recent xid.
|
||||
*/
|
||||
if (epochOK && TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
|
||||
{
|
||||
if (!TransactionIdIsValid(MyProc->xmin))
|
||||
{
|
||||
TransactionId oldestXmin = GetOldestXmin(true, true);
|
||||
|
||||
if (TransactionIdPrecedes(oldestXmin, reply.xmin))
|
||||
newxmin = reply.xmin;
|
||||
else
|
||||
newxmin = oldestXmin;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (TransactionIdPrecedes(MyProc->xmin, reply.xmin))
|
||||
newxmin = reply.xmin;
|
||||
else
|
||||
newxmin = MyProc->xmin; /* stay the same */
|
||||
}
|
||||
}
|
||||
}
|
||||
/* Ignore invalid xmin (can't actually happen with current walreceiver) */
|
||||
if (!TransactionIdIsNormal(reply.xmin))
|
||||
return;
|
||||
|
||||
/*
|
||||
* Grab the ProcArrayLock to set xmin, or invalidate for bad reply
|
||||
* Check that the provided xmin/epoch are sane, that is, not in the future
|
||||
* and not so far back as to be already wrapped around. Ignore if not.
|
||||
*
|
||||
* Epoch of nextXid should be same as standby, or if the counter has
|
||||
* wrapped, then one greater than standby.
|
||||
*/
|
||||
if (MyProc->xmin != newxmin)
|
||||
GetNextXidAndEpoch(&nextXid, &nextEpoch);
|
||||
|
||||
if (reply.xmin <= nextXid)
|
||||
{
|
||||
LWLockAcquire(ProcArrayLock, LW_SHARED);
|
||||
MyProc->xmin = newxmin;
|
||||
LWLockRelease(ProcArrayLock);
|
||||
if (reply.epoch != nextEpoch)
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (reply.epoch + 1 != nextEpoch)
|
||||
return;
|
||||
}
|
||||
|
||||
if (!TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
|
||||
return; /* epoch OK, but it's wrapped around */
|
||||
|
||||
/*
|
||||
* Set the WalSender's xmin equal to the standby's requested xmin, so that
|
||||
* the xmin will be taken into account by GetOldestXmin. This will hold
|
||||
* back the removal of dead rows and thereby prevent the generation of
|
||||
* cleanup conflicts on the standby server.
|
||||
*
|
||||
* There is a small window for a race condition here: although we just
|
||||
* checked that reply.xmin precedes nextXid, the nextXid could have gotten
|
||||
* advanced between our fetching it and applying the xmin below, perhaps
|
||||
* far enough to make reply.xmin wrap around. In that case the xmin we
|
||||
* set here would be "in the future" and have no effect. No point in
|
||||
* worrying about this since it's too late to save the desired data
|
||||
* anyway. Assuming that the standby sends us an increasing sequence of
|
||||
* xmins, this could only happen during the first reply cycle, else our
|
||||
* own xmin would prevent nextXid from advancing so far.
|
||||
*
|
||||
* We don't bother taking the ProcArrayLock here. Setting the xmin field
|
||||
* is assumed atomic, and there's no real need to prevent a concurrent
|
||||
* GetOldestXmin. (If we're moving our xmin forward, this is obviously
|
||||
* safe, and if we're moving it backwards, well, the data is at risk
|
||||
* already since a VACUUM could have just finished calling GetOldestXmin.)
|
||||
*/
|
||||
MyProc->xmin = reply.xmin;
|
||||
}
|
||||
|
||||
/* Main loop of walsender process */
|
||||
|
Reference in New Issue
Block a user