diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index c5be15bf223..7b774cc5e51 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -448,7 +448,6 @@ static void SignalBackends(void); static void asyncQueueReadAllNotifications(void); static bool asyncQueueProcessPageEntries(QueuePosition *current, QueuePosition stop, - char *page_buffer, Snapshot snapshot); static void asyncQueueAdvanceTail(void); static void ProcessIncomingNotify(bool flush); @@ -1854,13 +1853,6 @@ asyncQueueReadAllNotifications(void) QueuePosition head; Snapshot snapshot; - /* page_buffer must be adequately aligned, so use a union */ - union - { - char buf[QUEUE_PAGESIZE]; - AsyncQueueEntry align; - } page_buffer; - /* Fetch current state */ LWLockAcquire(NotifyQueueLock, LW_SHARED); /* Assert checks that we have a valid state entry */ @@ -1941,37 +1933,6 @@ asyncQueueReadAllNotifications(void) do { - int64 curpage = QUEUE_POS_PAGE(pos); - int curoffset = QUEUE_POS_OFFSET(pos); - int slotno; - int copysize; - - /* - * We copy the data from SLRU into a local buffer, so as to avoid - * holding the SLRU lock while we are examining the entries and - * possibly transmitting them to our frontend. Copy only the part - * of the page we will actually inspect. - */ - slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage, - InvalidTransactionId); - if (curpage == QUEUE_POS_PAGE(head)) - { - /* we only want to read as far as head */ - copysize = QUEUE_POS_OFFSET(head) - curoffset; - if (copysize < 0) - copysize = 0; /* just for safety */ - } - else - { - /* fetch all the rest of the page */ - copysize = QUEUE_PAGESIZE - curoffset; - } - memcpy(page_buffer.buf + curoffset, - NotifyCtl->shared->page_buffer[slotno] + curoffset, - copysize); - /* Release lock that we got from SimpleLruReadPage_ReadOnly() */ - LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage)); - /* * Process messages up to the stop position, end of page, or an * uncommitted message. @@ -1987,9 +1948,7 @@ asyncQueueReadAllNotifications(void) * rewrite pages under us. Especially we don't want to hold a lock * while sending the notifications to the frontend. */ - reachedStop = asyncQueueProcessPageEntries(&pos, head, - page_buffer.buf, - snapshot); + reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot); } while (!reachedStop); /* Update shared state */ @@ -2008,13 +1967,6 @@ asyncQueueReadAllNotifications(void) * Fetch notifications from the shared queue, beginning at position current, * and deliver relevant ones to my frontend. * - * The current page must have been fetched into page_buffer from shared - * memory. (We could access the page right in shared memory, but that - * would imply holding the SLRU bank lock throughout this routine.) - * - * We stop if we reach the "stop" position, or reach a notification from an - * uncommitted transaction, or reach the end of the page. - * * The function returns true once we have reached the stop position or an * uncommitted notification, and false if we have finished with the page. * In other words: once it returns true there is no need to look further. @@ -2023,16 +1975,34 @@ asyncQueueReadAllNotifications(void) static bool asyncQueueProcessPageEntries(QueuePosition *current, QueuePosition stop, - char *page_buffer, Snapshot snapshot) { + int64 curpage = QUEUE_POS_PAGE(*current); + int slotno; + char *page_buffer; bool reachedStop = false; bool reachedEndOfPage; - AsyncQueueEntry *qe; + + /* + * We copy the entries into a local buffer to avoid holding the SLRU lock + * while we transmit them to our frontend. The local buffer must be + * adequately aligned, so use a union. + */ + union + { + char buf[QUEUE_PAGESIZE]; + AsyncQueueEntry align; + } local_buf; + char *local_buf_end = local_buf.buf; + + slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage, + InvalidTransactionId); + page_buffer = NotifyCtl->shared->page_buffer[slotno]; do { QueuePosition thisentry = *current; + AsyncQueueEntry *qe; if (QUEUE_POS_EQUAL(thisentry, stop)) break; @@ -2074,18 +2044,23 @@ asyncQueueProcessPageEntries(QueuePosition *current, reachedStop = true; break; } - else if (TransactionIdDidCommit(qe->xid)) + + /* + * Quick check for the case that we're not listening on any + * channels, before calling TransactionIdDidCommit(). This makes + * that case a little faster, but more importantly, it ensures + * that if there's a bad entry in the queue for which + * TransactionIdDidCommit() fails for some reason, we can skip + * over it on the first LISTEN in a session, and not get stuck on + * it indefinitely. + */ + if (listenChannels == NIL) + continue; + + if (TransactionIdDidCommit(qe->xid)) { - /* qe->data is the null-terminated channel name */ - char *channel = qe->data; - - if (IsListeningOn(channel)) - { - /* payload follows channel name */ - char *payload = qe->data + strlen(channel) + 1; - - NotifyMyFrontEnd(channel, payload, qe->srcPid); - } + memcpy(local_buf_end, qe, qe->length); + local_buf_end += qe->length; } else { @@ -2099,6 +2074,32 @@ asyncQueueProcessPageEntries(QueuePosition *current, /* Loop back if we're not at end of page */ } while (!reachedEndOfPage); + /* Release lock that we got from SimpleLruReadPage_ReadOnly() */ + LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage)); + + /* + * Now that we have let go of the SLRU bank lock, send the notifications + * to our backend + */ + Assert(local_buf_end - local_buf.buf <= BLCKSZ); + for (char *p = local_buf.buf; p < local_buf_end;) + { + AsyncQueueEntry *qe = (AsyncQueueEntry *) p; + + /* qe->data is the null-terminated channel name */ + char *channel = qe->data; + + if (IsListeningOn(channel)) + { + /* payload follows channel name */ + char *payload = qe->data + strlen(channel) + 1; + + NotifyMyFrontEnd(channel, payload, qe->srcPid); + } + + p += qe->length; + } + if (QUEUE_POS_EQUAL(*current, stop)) reachedStop = true;