mirror of
https://github.com/postgres/postgres.git
synced 2025-10-24 01:29:19 +03:00
Fix low-probability loss of NOTIFY messages due to XID wraparound.
Up to now async.c has used TransactionIdIsInProgress() to detect whether a notify message's source transaction is still running. However, that function has a quick-exit path that reports that XIDs before RecentXmin are no longer running. If a listening backend is doing nothing but listening, and not running any queries, there is nothing that will advance its value of RecentXmin. Once 2 billion transactions elapse, the RecentXmin check causes active transactions to be reported as not running. If they aren't committed yet according to CLOG, async.c decides they aborted and discards their messages. The timing for that is a bit tight but it can happen when multiple backends are sending notifies concurrently. The net symptom therefore is that a sufficiently-long-surviving listen-only backend starts to miss some fraction of NOTIFY traffic, but only under heavy load. The only function that updates RecentXmin is GetSnapshotData(). A brute-force fix would therefore be to take a snapshot before processing incoming notify messages. But that would add cycles, as well as contention for the ProcArrayLock. We can be smarter: having taken the snapshot, let's use that to check for running XIDs, and not call TransactionIdIsInProgress() at all. In this way we reduce the number of ProcArrayLock acquisitions from one per message to one per notify interrupt; that's the same under light load but should be a benefit under heavy load. Light testing says that this change is a wash performance-wise for normal loads. I looked around for other callers of TransactionIdIsInProgress() that might be at similar risk, and didn't find any; all of them are inside transactions that presumably have already taken a snapshot. Problem report and diagnosis by Marko Tiikkaja, patch by me. Back-patch to all supported branches, since it's been like this since 9.0. Discussion: https://postgr.es/m/20170926182935.14128.65278@wrigleys.postgresql.org
This commit is contained in:
@@ -133,7 +133,9 @@
|
|||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
#include "utils/memutils.h"
|
#include "utils/memutils.h"
|
||||||
#include "utils/ps_status.h"
|
#include "utils/ps_status.h"
|
||||||
|
#include "utils/snapmgr.h"
|
||||||
#include "utils/timestamp.h"
|
#include "utils/timestamp.h"
|
||||||
|
#include "utils/tqual.h"
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@@ -386,7 +388,8 @@ static bool SignalBackends(void);
|
|||||||
static void asyncQueueReadAllNotifications(void);
|
static void asyncQueueReadAllNotifications(void);
|
||||||
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
|
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
|
||||||
QueuePosition stop,
|
QueuePosition stop,
|
||||||
char *page_buffer);
|
char *page_buffer,
|
||||||
|
Snapshot snapshot);
|
||||||
static void asyncQueueAdvanceTail(void);
|
static void asyncQueueAdvanceTail(void);
|
||||||
static void ProcessIncomingNotify(void);
|
static void ProcessIncomingNotify(void);
|
||||||
static void NotifyMyFrontEnd(const char *channel,
|
static void NotifyMyFrontEnd(const char *channel,
|
||||||
@@ -797,7 +800,7 @@ PreCommit_Notify(void)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Queue any pending notifies */
|
/* Queue any pending notifies (must happen after the above) */
|
||||||
if (pendingNotifies)
|
if (pendingNotifies)
|
||||||
{
|
{
|
||||||
ListCell *nextNotify;
|
ListCell *nextNotify;
|
||||||
@@ -986,7 +989,9 @@ Exec_ListenPreCommit(void)
|
|||||||
* have already committed before we started to LISTEN.
|
* have already committed before we started to LISTEN.
|
||||||
*
|
*
|
||||||
* Note that we are not yet listening on anything, so we won't deliver any
|
* Note that we are not yet listening on anything, so we won't deliver any
|
||||||
* notification to the frontend.
|
* notification to the frontend. Also, although our transaction might
|
||||||
|
* have executed NOTIFY, those message(s) aren't queued yet so we can't
|
||||||
|
* see them in the queue.
|
||||||
*
|
*
|
||||||
* This will also advance the global tail pointer if possible.
|
* This will also advance the global tail pointer if possible.
|
||||||
*/
|
*/
|
||||||
@@ -1835,6 +1840,7 @@ asyncQueueReadAllNotifications(void)
|
|||||||
volatile QueuePosition pos;
|
volatile QueuePosition pos;
|
||||||
QueuePosition oldpos;
|
QueuePosition oldpos;
|
||||||
QueuePosition head;
|
QueuePosition head;
|
||||||
|
Snapshot snapshot;
|
||||||
bool advanceTail;
|
bool advanceTail;
|
||||||
|
|
||||||
/* page_buffer must be adequately aligned, so use a union */
|
/* page_buffer must be adequately aligned, so use a union */
|
||||||
@@ -1858,6 +1864,9 @@ asyncQueueReadAllNotifications(void)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Get snapshot we'll use to decide which xacts are still in progress */
|
||||||
|
snapshot = RegisterSnapshot(GetLatestSnapshot());
|
||||||
|
|
||||||
/*----------
|
/*----------
|
||||||
* Note that we deliver everything that we see in the queue and that
|
* Note that we deliver everything that we see in the queue and that
|
||||||
* matches our _current_ listening state.
|
* matches our _current_ listening state.
|
||||||
@@ -1945,7 +1954,8 @@ asyncQueueReadAllNotifications(void)
|
|||||||
* while sending the notifications to the frontend.
|
* while sending the notifications to the frontend.
|
||||||
*/
|
*/
|
||||||
reachedStop = asyncQueueProcessPageEntries(&pos, head,
|
reachedStop = asyncQueueProcessPageEntries(&pos, head,
|
||||||
page_buffer.buf);
|
page_buffer.buf,
|
||||||
|
snapshot);
|
||||||
} while (!reachedStop);
|
} while (!reachedStop);
|
||||||
}
|
}
|
||||||
PG_CATCH();
|
PG_CATCH();
|
||||||
@@ -1973,6 +1983,9 @@ asyncQueueReadAllNotifications(void)
|
|||||||
/* If we were the laziest backend, try to advance the tail pointer */
|
/* If we were the laziest backend, try to advance the tail pointer */
|
||||||
if (advanceTail)
|
if (advanceTail)
|
||||||
asyncQueueAdvanceTail();
|
asyncQueueAdvanceTail();
|
||||||
|
|
||||||
|
/* Done with snapshot */
|
||||||
|
UnregisterSnapshot(snapshot);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@@ -1994,7 +2007,8 @@ asyncQueueReadAllNotifications(void)
|
|||||||
static bool
|
static bool
|
||||||
asyncQueueProcessPageEntries(volatile QueuePosition *current,
|
asyncQueueProcessPageEntries(volatile QueuePosition *current,
|
||||||
QueuePosition stop,
|
QueuePosition stop,
|
||||||
char *page_buffer)
|
char *page_buffer,
|
||||||
|
Snapshot snapshot)
|
||||||
{
|
{
|
||||||
bool reachedStop = false;
|
bool reachedStop = false;
|
||||||
bool reachedEndOfPage;
|
bool reachedEndOfPage;
|
||||||
@@ -2019,7 +2033,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
|
|||||||
/* Ignore messages destined for other databases */
|
/* Ignore messages destined for other databases */
|
||||||
if (qe->dboid == MyDatabaseId)
|
if (qe->dboid == MyDatabaseId)
|
||||||
{
|
{
|
||||||
if (TransactionIdIsInProgress(qe->xid))
|
if (XidInMVCCSnapshot(qe->xid, snapshot))
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* The source transaction is still in progress, so we can't
|
* The source transaction is still in progress, so we can't
|
||||||
@@ -2030,10 +2044,15 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
|
|||||||
* this advance-then-back-up behavior when dealing with an
|
* this advance-then-back-up behavior when dealing with an
|
||||||
* uncommitted message.)
|
* uncommitted message.)
|
||||||
*
|
*
|
||||||
* Note that we must test TransactionIdIsInProgress before we
|
* Note that we must test XidInMVCCSnapshot before we test
|
||||||
* test TransactionIdDidCommit, else we might return a message
|
* TransactionIdDidCommit, else we might return a message from
|
||||||
* from a transaction that is not yet visible to snapshots;
|
* a transaction that is not yet visible to snapshots; compare
|
||||||
* compare the comments at the head of tqual.c.
|
* the comments at the head of tqual.c.
|
||||||
|
*
|
||||||
|
* Also, while our own xact won't be listed in the snapshot,
|
||||||
|
* we need not check for TransactionIdIsCurrentTransactionId
|
||||||
|
* because our transaction cannot (yet) have queued any
|
||||||
|
* messages.
|
||||||
*/
|
*/
|
||||||
*current = thisentry;
|
*current = thisentry;
|
||||||
reachedStop = true;
|
reachedStop = true;
|
||||||
|
@@ -73,8 +73,6 @@ SnapshotData SnapshotSelfData = {HeapTupleSatisfiesSelf};
|
|||||||
SnapshotData SnapshotAnyData = {HeapTupleSatisfiesAny};
|
SnapshotData SnapshotAnyData = {HeapTupleSatisfiesAny};
|
||||||
SnapshotData SnapshotToastData = {HeapTupleSatisfiesToast};
|
SnapshotData SnapshotToastData = {HeapTupleSatisfiesToast};
|
||||||
|
|
||||||
/* local functions */
|
|
||||||
static bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SetHintBits()
|
* SetHintBits()
|
||||||
@@ -1404,10 +1402,11 @@ HeapTupleIsSurelyDead(HeapTuple htup, TransactionId OldestXmin)
|
|||||||
*
|
*
|
||||||
* Note: GetSnapshotData never stores either top xid or subxids of our own
|
* Note: GetSnapshotData never stores either top xid or subxids of our own
|
||||||
* backend into a snapshot, so these xids will not be reported as "running"
|
* backend into a snapshot, so these xids will not be reported as "running"
|
||||||
* by this function. This is OK for current uses, because we actually only
|
* by this function. This is OK for current uses, because we always check
|
||||||
* apply this for known-committed XIDs.
|
* TransactionIdIsCurrentTransactionId first, except when it's known the
|
||||||
|
* XID could not be ours anyway.
|
||||||
*/
|
*/
|
||||||
static bool
|
bool
|
||||||
XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
|
XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
|
||||||
{
|
{
|
||||||
uint32 i;
|
uint32 i;
|
||||||
|
@@ -85,6 +85,7 @@ extern HTSV_Result HeapTupleSatisfiesVacuum(HeapTuple htup,
|
|||||||
TransactionId OldestXmin, Buffer buffer);
|
TransactionId OldestXmin, Buffer buffer);
|
||||||
extern bool HeapTupleIsSurelyDead(HeapTuple htup,
|
extern bool HeapTupleIsSurelyDead(HeapTuple htup,
|
||||||
TransactionId OldestXmin);
|
TransactionId OldestXmin);
|
||||||
|
extern bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
|
||||||
|
|
||||||
extern void HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer,
|
extern void HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer,
|
||||||
uint16 infomask, TransactionId xid);
|
uint16 infomask, TransactionId xid);
|
||||||
|
Reference in New Issue
Block a user