mirror of
https://github.com/postgres/postgres.git
synced 2025-05-03 22:24:49 +03:00
Fix NOTIFY to cope with I/O problems, such as out-of-disk-space.
The LISTEN/NOTIFY subsystem got confused if SimpleLruZeroPage failed, which would typically happen as a result of a write() failure while attempting to dump a dirty pg_notify page out of memory. Subsequently, all attempts to send more NOTIFY messages would fail with messages like "Could not read from file "pg_notify/nnnn" at offset nnnnn: Success". Only restarting the server would clear this condition. Per reports from Kevin Grittner and Christoph Berg. Back-patch to 9.0, where the problem was introduced during the LISTEN/NOTIFY rewrite.
This commit is contained in:
parent
c8967e38a6
commit
f374098e8c
@ -1285,6 +1285,7 @@ static ListCell *
|
|||||||
asyncQueueAddEntries(ListCell *nextNotify)
|
asyncQueueAddEntries(ListCell *nextNotify)
|
||||||
{
|
{
|
||||||
AsyncQueueEntry qe;
|
AsyncQueueEntry qe;
|
||||||
|
QueuePosition queue_head;
|
||||||
int pageno;
|
int pageno;
|
||||||
int offset;
|
int offset;
|
||||||
int slotno;
|
int slotno;
|
||||||
@ -1292,8 +1293,21 @@ asyncQueueAddEntries(ListCell *nextNotify)
|
|||||||
/* We hold both AsyncQueueLock and AsyncCtlLock during this operation */
|
/* We hold both AsyncQueueLock and AsyncCtlLock during this operation */
|
||||||
LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
|
LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We work with a local copy of QUEUE_HEAD, which we write back to shared
|
||||||
|
* memory upon exiting. The reason for this is that if we have to advance
|
||||||
|
* to a new page, SimpleLruZeroPage might fail (out of disk space, for
|
||||||
|
* instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
|
||||||
|
* subsequent insertions would try to put entries into a page that slru.c
|
||||||
|
* thinks doesn't exist yet.) So, use a local position variable. Note
|
||||||
|
* that if we do fail, any already-inserted queue entries are forgotten;
|
||||||
|
* this is okay, since they'd be useless anyway after our transaction
|
||||||
|
* rolls back.
|
||||||
|
*/
|
||||||
|
queue_head = QUEUE_HEAD;
|
||||||
|
|
||||||
/* Fetch the current page */
|
/* Fetch the current page */
|
||||||
pageno = QUEUE_POS_PAGE(QUEUE_HEAD);
|
pageno = QUEUE_POS_PAGE(queue_head);
|
||||||
slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId);
|
slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId);
|
||||||
/* Note we mark the page dirty before writing in it */
|
/* Note we mark the page dirty before writing in it */
|
||||||
AsyncCtl->shared->page_dirty[slotno] = true;
|
AsyncCtl->shared->page_dirty[slotno] = true;
|
||||||
@ -1305,7 +1319,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
|
|||||||
/* Construct a valid queue entry in local variable qe */
|
/* Construct a valid queue entry in local variable qe */
|
||||||
asyncQueueNotificationToEntry(n, &qe);
|
asyncQueueNotificationToEntry(n, &qe);
|
||||||
|
|
||||||
offset = QUEUE_POS_OFFSET(QUEUE_HEAD);
|
offset = QUEUE_POS_OFFSET(queue_head);
|
||||||
|
|
||||||
/* Check whether the entry really fits on the current page */
|
/* Check whether the entry really fits on the current page */
|
||||||
if (offset + qe.length <= QUEUE_PAGESIZE)
|
if (offset + qe.length <= QUEUE_PAGESIZE)
|
||||||
@ -1331,8 +1345,8 @@ asyncQueueAddEntries(ListCell *nextNotify)
|
|||||||
&qe,
|
&qe,
|
||||||
qe.length);
|
qe.length);
|
||||||
|
|
||||||
/* Advance QUEUE_HEAD appropriately, and note if page is full */
|
/* Advance queue_head appropriately, and detect if page is full */
|
||||||
if (asyncQueueAdvance(&(QUEUE_HEAD), qe.length))
|
if (asyncQueueAdvance(&(queue_head), qe.length))
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Page is full, so we're done here, but first fill the next page
|
* Page is full, so we're done here, but first fill the next page
|
||||||
@ -1342,12 +1356,15 @@ asyncQueueAddEntries(ListCell *nextNotify)
|
|||||||
* asyncQueueIsFull() ensured that there is room to create this
|
* asyncQueueIsFull() ensured that there is room to create this
|
||||||
* page without overrunning the queue.
|
* page without overrunning the queue.
|
||||||
*/
|
*/
|
||||||
slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
|
slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head));
|
||||||
/* And exit the loop */
|
/* And exit the loop */
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Success, so update the global QUEUE_HEAD */
|
||||||
|
QUEUE_HEAD = queue_head;
|
||||||
|
|
||||||
LWLockRelease(AsyncCtlLock);
|
LWLockRelease(AsyncCtlLock);
|
||||||
|
|
||||||
return nextNotify;
|
return nextNotify;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user