diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 2b6e7f49aef..a805adf9e5b 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -133,7 +133,9 @@ #include "utils/builtins.h" #include "utils/memutils.h" #include "utils/ps_status.h" +#include "utils/snapmgr.h" #include "utils/timestamp.h" +#include "utils/tqual.h" /* @@ -388,7 +390,8 @@ static bool SignalBackends(void); static void asyncQueueReadAllNotifications(void); static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, - char *page_buffer); + char *page_buffer, + Snapshot snapshot); static void asyncQueueAdvanceTail(void); static void ProcessIncomingNotify(void); static void NotifyMyFrontEnd(const char *channel, @@ -799,7 +802,7 @@ PreCommit_Notify(void) } } - /* Queue any pending notifies */ + /* Queue any pending notifies (must happen after the above) */ if (pendingNotifies) { ListCell *nextNotify; @@ -988,7 +991,9 @@ Exec_ListenPreCommit(void) * have already committed before we started to LISTEN. * * Note that we are not yet listening on anything, so we won't deliver any - * notification to the frontend. + * notification to the frontend. Also, although our transaction might + * have executed NOTIFY, those message(s) aren't queued yet so we can't + * see them in the queue. * * This will also advance the global tail pointer if possible. */ @@ -1837,6 +1842,7 @@ asyncQueueReadAllNotifications(void) volatile QueuePosition pos; QueuePosition oldpos; QueuePosition head; + Snapshot snapshot; bool advanceTail; /* page_buffer must be adequately aligned, so use a union */ @@ -1860,6 +1866,9 @@ asyncQueueReadAllNotifications(void) return; } + /* Get snapshot we'll use to decide which xacts are still in progress */ + snapshot = RegisterSnapshot(GetLatestSnapshot()); + /*---------- * Note that we deliver everything that we see in the queue and that * matches our _current_ listening state. @@ -1947,7 +1956,8 @@ asyncQueueReadAllNotifications(void) * while sending the notifications to the frontend. */ reachedStop = asyncQueueProcessPageEntries(&pos, head, - page_buffer.buf); + page_buffer.buf, + snapshot); } while (!reachedStop); } PG_CATCH(); @@ -1975,6 +1985,9 @@ asyncQueueReadAllNotifications(void) /* If we were the laziest backend, try to advance the tail pointer */ if (advanceTail) asyncQueueAdvanceTail(); + + /* Done with snapshot */ + UnregisterSnapshot(snapshot); } /* @@ -1996,7 +2009,8 @@ asyncQueueReadAllNotifications(void) static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, - char *page_buffer) + char *page_buffer, + Snapshot snapshot) { bool reachedStop = false; bool reachedEndOfPage; @@ -2021,7 +2035,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, /* Ignore messages destined for other databases */ if (qe->dboid == MyDatabaseId) { - if (TransactionIdIsInProgress(qe->xid)) + if (XidInMVCCSnapshot(qe->xid, snapshot)) { /* * The source transaction is still in progress, so we can't @@ -2032,10 +2046,15 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, * this advance-then-back-up behavior when dealing with an * uncommitted message.) * - * Note that we must test TransactionIdIsInProgress before we - * test TransactionIdDidCommit, else we might return a message - * from a transaction that is not yet visible to snapshots; - * compare the comments at the head of tqual.c. + * Note that we must test XidInMVCCSnapshot before we test + * TransactionIdDidCommit, else we might return a message from + * a transaction that is not yet visible to snapshots; compare + * the comments at the head of tqual.c. + * + * Also, while our own xact won't be listed in the snapshot, + * we need not check for TransactionIdIsCurrentTransactionId + * because our transaction cannot (yet) have queued any + * messages. */ *current = thisentry; reachedStop = true; diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c index 931e2fb51b7..df7f43910f9 100644 --- a/src/backend/utils/time/tqual.c +++ b/src/backend/utils/time/tqual.c @@ -73,8 +73,6 @@ SnapshotData SnapshotSelfData = {HeapTupleSatisfiesSelf}; SnapshotData SnapshotAnyData = {HeapTupleSatisfiesAny}; SnapshotData SnapshotToastData = {HeapTupleSatisfiesToast}; -/* local functions */ -static bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot); /* @@ -1575,10 +1573,11 @@ HeapTupleIsSurelyDead(HeapTupleHeader tuple, TransactionId OldestXmin) * * Note: GetSnapshotData never stores either top xid or subxids of our own * backend into a snapshot, so these xids will not be reported as "running" - * by this function. This is OK for current uses, because we actually only - * apply this for known-committed XIDs. + * by this function. This is OK for current uses, because we always check + * TransactionIdIsCurrentTransactionId first, except when it's known the + * XID could not be ours anyway. */ -static bool +bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot) { uint32 i; diff --git a/src/include/utils/tqual.h b/src/include/utils/tqual.h index a7df3390b85..a19dac6875d 100644 --- a/src/include/utils/tqual.h +++ b/src/include/utils/tqual.h @@ -85,6 +85,7 @@ extern HTSV_Result HeapTupleSatisfiesVacuum(HeapTupleHeader tuple, TransactionId OldestXmin, Buffer buffer); extern bool HeapTupleIsSurelyDead(HeapTupleHeader tuple, TransactionId OldestXmin); +extern bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot); extern void HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer, uint16 infomask, TransactionId xid);