diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index bacc08eb84f..a93c81bca28 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -137,7 +137,9 @@ #include "utils/builtins.h" #include "utils/memutils.h" #include "utils/ps_status.h" +#include "utils/snapmgr.h" #include "utils/timestamp.h" +#include "utils/tqual.h" /* @@ -387,7 +389,8 @@ static bool SignalBackends(void); static void asyncQueueReadAllNotifications(void); static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, - char *page_buffer); + char *page_buffer, + Snapshot snapshot); static void asyncQueueAdvanceTail(void); static void ProcessIncomingNotify(void); static bool AsyncExistsPendingNotify(const char *channel, const char *payload); @@ -798,7 +801,7 @@ PreCommit_Notify(void) } } - /* Queue any pending notifies */ + /* Queue any pending notifies (must happen after the above) */ if (pendingNotifies) { ListCell *nextNotify; @@ -987,7 +990,9 @@ Exec_ListenPreCommit(void) * have already committed before we started to LISTEN. * * Note that we are not yet listening on anything, so we won't deliver any - * notification to the frontend. + * notification to the frontend. Also, although our transaction might + * have executed NOTIFY, those message(s) aren't queued yet so we can't + * see them in the queue. * * This will also advance the global tail pointer if possible. */ @@ -1744,6 +1749,7 @@ asyncQueueReadAllNotifications(void) volatile QueuePosition pos; QueuePosition oldpos; QueuePosition head; + Snapshot snapshot; bool advanceTail; /* page_buffer must be adequately aligned, so use a union */ @@ -1767,6 +1773,9 @@ asyncQueueReadAllNotifications(void) return; } + /* Get snapshot we'll use to decide which xacts are still in progress */ + snapshot = RegisterSnapshot(GetLatestSnapshot()); + /*---------- * Note that we deliver everything that we see in the queue and that * matches our _current_ listening state. @@ -1854,7 +1863,8 @@ asyncQueueReadAllNotifications(void) * while sending the notifications to the frontend. */ reachedStop = asyncQueueProcessPageEntries(&pos, head, - page_buffer.buf); + page_buffer.buf, + snapshot); } while (!reachedStop); } PG_CATCH(); @@ -1882,6 +1892,9 @@ asyncQueueReadAllNotifications(void) /* If we were the laziest backend, try to advance the tail pointer */ if (advanceTail) asyncQueueAdvanceTail(); + + /* Done with snapshot */ + UnregisterSnapshot(snapshot); } /* @@ -1903,7 +1916,8 @@ asyncQueueReadAllNotifications(void) static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, - char *page_buffer) + char *page_buffer, + Snapshot snapshot) { bool reachedStop = false; bool reachedEndOfPage; @@ -1928,7 +1942,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, /* Ignore messages destined for other databases */ if (qe->dboid == MyDatabaseId) { - if (TransactionIdIsInProgress(qe->xid)) + if (XidInMVCCSnapshot(qe->xid, snapshot)) { /* * The source transaction is still in progress, so we can't @@ -1939,10 +1953,15 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, * this advance-then-back-up behavior when dealing with an * uncommitted message.) * - * Note that we must test TransactionIdIsInProgress before we - * test TransactionIdDidCommit, else we might return a message - * from a transaction that is not yet visible to snapshots; - * compare the comments at the head of tqual.c. + * Note that we must test XidInMVCCSnapshot before we test + * TransactionIdDidCommit, else we might return a message from + * a transaction that is not yet visible to snapshots; compare + * the comments at the head of tqual.c. + * + * Also, while our own xact won't be listed in the snapshot, + * we need not check for TransactionIdIsCurrentTransactionId + * because our transaction cannot (yet) have queued any + * messages. */ *current = thisentry; reachedStop = true; diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c index bbac4083c98..b7aab0dd190 100644 --- a/src/backend/utils/time/tqual.c +++ b/src/backend/utils/time/tqual.c @@ -81,8 +81,6 @@ SnapshotData SnapshotSelfData = {HeapTupleSatisfiesSelf}; SnapshotData SnapshotAnyData = {HeapTupleSatisfiesAny}; -/* local functions */ -static bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot); /* * SetHintBits() @@ -1479,10 +1477,10 @@ HeapTupleIsSurelyDead(HeapTuple htup, TransactionId OldestXmin) * Note: GetSnapshotData never stores either top xid or subxids of our own * backend into a snapshot, so these xids will not be reported as "running" * by this function. This is OK for current uses, because we always check - * TransactionIdIsCurrentTransactionId first, except for known-committed - * XIDs which could not be ours anyway. + * TransactionIdIsCurrentTransactionId first, except when it's known the + * XID could not be ours anyway. */ -static bool +bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot) { uint32 i; diff --git a/src/include/utils/tqual.h b/src/include/utils/tqual.h index 9a3b56e5f03..96eaf01ca0e 100644 --- a/src/include/utils/tqual.h +++ b/src/include/utils/tqual.h @@ -78,6 +78,7 @@ extern HTSV_Result HeapTupleSatisfiesVacuum(HeapTuple htup, TransactionId OldestXmin, Buffer buffer); extern bool HeapTupleIsSurelyDead(HeapTuple htup, TransactionId OldestXmin); +extern bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot); extern void HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer, uint16 infomask, TransactionId xid);