mirror of
https://github.com/postgres/postgres.git
synced 2025-06-17 17:02:08 +03:00
pgindent run for 9.0
This commit is contained in:
@ -7,7 +7,7 @@
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.154 2010/02/20 21:24:02 tgl Exp $
|
||||
* $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.155 2010/02/26 02:00:37 momjian Exp $
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
@ -149,7 +149,7 @@
|
||||
*
|
||||
* This struct declaration has the maximal length, but in a real queue entry
|
||||
* the data area is only big enough for the actual channel and payload strings
|
||||
* (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible
|
||||
* (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible
|
||||
* entry size, if both channel and payload strings are empty (but note it
|
||||
* doesn't include alignment padding).
|
||||
*
|
||||
@ -158,11 +158,11 @@
|
||||
*/
|
||||
typedef struct AsyncQueueEntry
|
||||
{
|
||||
int length; /* total allocated length of entry */
|
||||
Oid dboid; /* sender's database OID */
|
||||
TransactionId xid; /* sender's XID */
|
||||
int32 srcPid; /* sender's PID */
|
||||
char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
|
||||
int length; /* total allocated length of entry */
|
||||
Oid dboid; /* sender's database OID */
|
||||
TransactionId xid; /* sender's XID */
|
||||
int32 srcPid; /* sender's PID */
|
||||
char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
|
||||
} AsyncQueueEntry;
|
||||
|
||||
/* Currently, no field of AsyncQueueEntry requires more than int alignment */
|
||||
@ -175,8 +175,8 @@ typedef struct AsyncQueueEntry
|
||||
*/
|
||||
typedef struct QueuePosition
|
||||
{
|
||||
int page; /* SLRU page number */
|
||||
int offset; /* byte offset within page */
|
||||
int page; /* SLRU page number */
|
||||
int offset; /* byte offset within page */
|
||||
} QueuePosition;
|
||||
|
||||
#define QUEUE_POS_PAGE(x) ((x).page)
|
||||
@ -202,11 +202,11 @@ typedef struct QueuePosition
|
||||
*/
|
||||
typedef struct QueueBackendStatus
|
||||
{
|
||||
int32 pid; /* either a PID or InvalidPid */
|
||||
QueuePosition pos; /* backend has read queue up to here */
|
||||
int32 pid; /* either a PID or InvalidPid */
|
||||
QueuePosition pos; /* backend has read queue up to here */
|
||||
} QueueBackendStatus;
|
||||
|
||||
#define InvalidPid (-1)
|
||||
#define InvalidPid (-1)
|
||||
|
||||
/*
|
||||
* Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
|
||||
@ -230,15 +230,15 @@ typedef struct QueueBackendStatus
|
||||
*/
|
||||
typedef struct AsyncQueueControl
|
||||
{
|
||||
QueuePosition head; /* head points to the next free location */
|
||||
QueuePosition tail; /* the global tail is equivalent to the
|
||||
tail of the "slowest" backend */
|
||||
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
|
||||
QueueBackendStatus backend[1]; /* actually of length MaxBackends+1 */
|
||||
QueuePosition head; /* head points to the next free location */
|
||||
QueuePosition tail; /* the global tail is equivalent to the tail
|
||||
* of the "slowest" backend */
|
||||
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
|
||||
QueueBackendStatus backend[1]; /* actually of length MaxBackends+1 */
|
||||
/* DO NOT ADD FURTHER STRUCT MEMBERS HERE */
|
||||
} AsyncQueueControl;
|
||||
|
||||
static AsyncQueueControl *asyncQueueControl;
|
||||
static AsyncQueueControl *asyncQueueControl;
|
||||
|
||||
#define QUEUE_HEAD (asyncQueueControl->head)
|
||||
#define QUEUE_TAIL (asyncQueueControl->tail)
|
||||
@ -248,11 +248,11 @@ static AsyncQueueControl *asyncQueueControl;
|
||||
/*
|
||||
* The SLRU buffer area through which we access the notification queue
|
||||
*/
|
||||
static SlruCtlData AsyncCtlData;
|
||||
static SlruCtlData AsyncCtlData;
|
||||
|
||||
#define AsyncCtl (&AsyncCtlData)
|
||||
#define QUEUE_PAGESIZE BLCKSZ
|
||||
#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
|
||||
#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
|
||||
|
||||
/*
|
||||
* slru.c currently assumes that all filenames are four characters of hex
|
||||
@ -265,7 +265,7 @@ static SlruCtlData AsyncCtlData;
|
||||
*
|
||||
* The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
|
||||
* pages, because more than that would confuse slru.c into thinking there
|
||||
* was a wraparound condition. With the default BLCKSZ this means there
|
||||
* was a wraparound condition. With the default BLCKSZ this means there
|
||||
* can be up to 8GB of queued-and-not-read data.
|
||||
*
|
||||
* Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
|
||||
@ -309,7 +309,7 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */
|
||||
|
||||
/*
|
||||
* State for outbound notifies consists of a list of all channels+payloads
|
||||
* NOTIFYed in the current transaction. We do not actually perform a NOTIFY
|
||||
* NOTIFYed in the current transaction. We do not actually perform a NOTIFY
|
||||
* until and unless the transaction commits. pendingNotifies is NIL if no
|
||||
* NOTIFYs have been done in the current transaction.
|
||||
*
|
||||
@ -325,11 +325,11 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */
|
||||
*/
|
||||
typedef struct Notification
|
||||
{
|
||||
char *channel; /* channel name */
|
||||
char *payload; /* payload string (can be empty) */
|
||||
char *channel; /* channel name */
|
||||
char *payload; /* payload string (can be empty) */
|
||||
} Notification;
|
||||
|
||||
static List *pendingNotifies = NIL; /* list of Notifications */
|
||||
static List *pendingNotifies = NIL; /* list of Notifications */
|
||||
|
||||
static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
|
||||
|
||||
@ -348,8 +348,10 @@ static volatile sig_atomic_t notifyInterruptOccurred = 0;
|
||||
|
||||
/* True if we've registered an on_shmem_exit cleanup */
|
||||
static bool unlistenExitRegistered = false;
|
||||
|
||||
/* has this backend sent notifications in the current transaction? */
|
||||
static bool backendHasSentNotifications = false;
|
||||
|
||||
/* has this backend executed its first LISTEN in the current transaction? */
|
||||
static bool backendHasExecutedInitialListen = false;
|
||||
|
||||
@ -380,8 +382,8 @@ static bool asyncQueueProcessPageEntries(QueuePosition *current,
|
||||
static void asyncQueueAdvanceTail(void);
|
||||
static void ProcessIncomingNotify(void);
|
||||
static void NotifyMyFrontEnd(const char *channel,
|
||||
const char *payload,
|
||||
int32 srcPid);
|
||||
const char *payload,
|
||||
int32 srcPid);
|
||||
static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
|
||||
static void ClearPendingActionsAndNotifies(void);
|
||||
|
||||
@ -408,17 +410,17 @@ asyncQueuePagePrecedesLogically(int p, int q)
|
||||
int diff;
|
||||
|
||||
/*
|
||||
* We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should
|
||||
* be in the range 0..QUEUE_MAX_PAGE.
|
||||
* We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be
|
||||
* in the range 0..QUEUE_MAX_PAGE.
|
||||
*/
|
||||
Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
|
||||
Assert(q >= 0 && q <= QUEUE_MAX_PAGE);
|
||||
|
||||
diff = p - q;
|
||||
if (diff >= ((QUEUE_MAX_PAGE+1)/2))
|
||||
diff -= QUEUE_MAX_PAGE+1;
|
||||
else if (diff < -((QUEUE_MAX_PAGE+1)/2))
|
||||
diff += QUEUE_MAX_PAGE+1;
|
||||
if (diff >= ((QUEUE_MAX_PAGE + 1) / 2))
|
||||
diff -= QUEUE_MAX_PAGE + 1;
|
||||
else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
|
||||
diff += QUEUE_MAX_PAGE + 1;
|
||||
return diff < 0;
|
||||
}
|
||||
|
||||
@ -428,7 +430,7 @@ asyncQueuePagePrecedesLogically(int p, int q)
|
||||
Size
|
||||
AsyncShmemSize(void)
|
||||
{
|
||||
Size size;
|
||||
Size size;
|
||||
|
||||
/* This had better match AsyncShmemInit */
|
||||
size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
|
||||
@ -445,9 +447,9 @@ AsyncShmemSize(void)
|
||||
void
|
||||
AsyncShmemInit(void)
|
||||
{
|
||||
bool found;
|
||||
int slotno;
|
||||
Size size;
|
||||
bool found;
|
||||
int slotno;
|
||||
Size size;
|
||||
|
||||
/*
|
||||
* Create or attach to the AsyncQueueControl structure.
|
||||
@ -468,7 +470,7 @@ AsyncShmemInit(void)
|
||||
if (!found)
|
||||
{
|
||||
/* First time through, so initialize it */
|
||||
int i;
|
||||
int i;
|
||||
|
||||
SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
|
||||
SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
|
||||
@ -598,8 +600,8 @@ Async_Notify(const char *channel, const char *payload)
|
||||
n->payload = "";
|
||||
|
||||
/*
|
||||
* We want to preserve the order so we need to append every
|
||||
* notification. See comments at AsyncExistsPendingNotify().
|
||||
* We want to preserve the order so we need to append every notification.
|
||||
* See comments at AsyncExistsPendingNotify().
|
||||
*/
|
||||
pendingNotifies = lappend(pendingNotifies, n);
|
||||
|
||||
@ -698,13 +700,13 @@ Async_UnlistenAll(void)
|
||||
Datum
|
||||
pg_listening_channels(PG_FUNCTION_ARGS)
|
||||
{
|
||||
FuncCallContext *funcctx;
|
||||
ListCell **lcp;
|
||||
FuncCallContext *funcctx;
|
||||
ListCell **lcp;
|
||||
|
||||
/* stuff done only on the first call of the function */
|
||||
if (SRF_IS_FIRSTCALL())
|
||||
{
|
||||
MemoryContext oldcontext;
|
||||
MemoryContext oldcontext;
|
||||
|
||||
/* create a function context for cross-call persistence */
|
||||
funcctx = SRF_FIRSTCALL_INIT();
|
||||
@ -726,7 +728,7 @@ pg_listening_channels(PG_FUNCTION_ARGS)
|
||||
|
||||
while (*lcp != NULL)
|
||||
{
|
||||
char *channel = (char *) lfirst(*lcp);
|
||||
char *channel = (char *) lfirst(*lcp);
|
||||
|
||||
*lcp = lnext(*lcp);
|
||||
SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
|
||||
@ -818,9 +820,9 @@ PreCommit_Notify(void)
|
||||
|
||||
/*
|
||||
* Make sure that we have an XID assigned to the current transaction.
|
||||
* GetCurrentTransactionId is cheap if we already have an XID, but
|
||||
* not so cheap if we don't, and we'd prefer not to do that work
|
||||
* while holding AsyncQueueLock.
|
||||
* GetCurrentTransactionId is cheap if we already have an XID, but not
|
||||
* so cheap if we don't, and we'd prefer not to do that work while
|
||||
* holding AsyncQueueLock.
|
||||
*/
|
||||
(void) GetCurrentTransactionId();
|
||||
|
||||
@ -850,7 +852,7 @@ PreCommit_Notify(void)
|
||||
while (nextNotify != NULL)
|
||||
{
|
||||
/*
|
||||
* Add the pending notifications to the queue. We acquire and
|
||||
* Add the pending notifications to the queue. We acquire and
|
||||
* release AsyncQueueLock once per page, which might be overkill
|
||||
* but it does allow readers to get in while we're doing this.
|
||||
*
|
||||
@ -866,7 +868,7 @@ PreCommit_Notify(void)
|
||||
if (asyncQueueIsFull())
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
|
||||
errmsg("too many notifications in the NOTIFY queue")));
|
||||
errmsg("too many notifications in the NOTIFY queue")));
|
||||
nextNotify = asyncQueueAddEntries(nextNotify);
|
||||
LWLockRelease(AsyncQueueLock);
|
||||
}
|
||||
@ -915,8 +917,8 @@ AtCommit_Notify(void)
|
||||
}
|
||||
|
||||
/*
|
||||
* If we did an initial LISTEN, listenChannels now has the entry, so
|
||||
* we no longer need or want the flag to be set.
|
||||
* If we did an initial LISTEN, listenChannels now has the entry, so we no
|
||||
* longer need or want the flag to be set.
|
||||
*/
|
||||
backendHasExecutedInitialListen = false;
|
||||
|
||||
@ -943,15 +945,15 @@ Exec_ListenPreCommit(void)
|
||||
elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
|
||||
|
||||
/*
|
||||
* We need this variable to detect an aborted initial LISTEN.
|
||||
* In that case we would set up our pointer but not listen on any channel.
|
||||
* This flag gets cleared in AtCommit_Notify or AtAbort_Notify().
|
||||
* We need this variable to detect an aborted initial LISTEN. In that case
|
||||
* we would set up our pointer but not listen on any channel. This flag
|
||||
* gets cleared in AtCommit_Notify or AtAbort_Notify().
|
||||
*/
|
||||
backendHasExecutedInitialListen = true;
|
||||
|
||||
/*
|
||||
* Before registering, make sure we will unlisten before dying.
|
||||
* (Note: this action does not get undone if we abort later.)
|
||||
* Before registering, make sure we will unlisten before dying. (Note:
|
||||
* this action does not get undone if we abort later.)
|
||||
*/
|
||||
if (!unlistenExitRegistered)
|
||||
{
|
||||
@ -977,8 +979,8 @@ Exec_ListenPreCommit(void)
|
||||
* already-committed notifications. Still, we could get notifications that
|
||||
* have already committed before we started to LISTEN.
|
||||
*
|
||||
* Note that we are not yet listening on anything, so we won't deliver
|
||||
* any notification to the frontend.
|
||||
* Note that we are not yet listening on anything, so we won't deliver any
|
||||
* notification to the frontend.
|
||||
*
|
||||
* This will also advance the global tail pointer if possible.
|
||||
*/
|
||||
@ -1020,8 +1022,8 @@ Exec_ListenCommit(const char *channel)
|
||||
static void
|
||||
Exec_UnlistenCommit(const char *channel)
|
||||
{
|
||||
ListCell *q;
|
||||
ListCell *prev;
|
||||
ListCell *q;
|
||||
ListCell *prev;
|
||||
|
||||
if (Trace_notify)
|
||||
elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
|
||||
@ -1029,7 +1031,7 @@ Exec_UnlistenCommit(const char *channel)
|
||||
prev = NULL;
|
||||
foreach(q, listenChannels)
|
||||
{
|
||||
char *lchan = (char *) lfirst(q);
|
||||
char *lchan = (char *) lfirst(q);
|
||||
|
||||
if (strcmp(lchan, channel) == 0)
|
||||
{
|
||||
@ -1078,12 +1080,12 @@ Exec_UnlistenAllCommit(void)
|
||||
* The reason that this is not done in AtCommit_Notify is that there is
|
||||
* a nonzero chance of errors here (for example, encoding conversion errors
|
||||
* while trying to format messages to our frontend). An error during
|
||||
* AtCommit_Notify would be a PANIC condition. The timing is also arranged
|
||||
* AtCommit_Notify would be a PANIC condition. The timing is also arranged
|
||||
* to ensure that a transaction's self-notifies are delivered to the frontend
|
||||
* before it gets the terminating ReadyForQuery message.
|
||||
*
|
||||
* Note that we send signals and process the queue even if the transaction
|
||||
* eventually aborted. This is because we need to clean out whatever got
|
||||
* eventually aborted. This is because we need to clean out whatever got
|
||||
* added to the queue.
|
||||
*
|
||||
* NOTE: we are outside of any transaction here.
|
||||
@ -1098,9 +1100,9 @@ ProcessCompletedNotifies(void)
|
||||
return;
|
||||
|
||||
/*
|
||||
* We reset the flag immediately; otherwise, if any sort of error
|
||||
* occurs below, we'd be locked up in an infinite loop, because
|
||||
* control will come right back here after error cleanup.
|
||||
* We reset the flag immediately; otherwise, if any sort of error occurs
|
||||
* below, we'd be locked up in an infinite loop, because control will come
|
||||
* right back here after error cleanup.
|
||||
*/
|
||||
backendHasSentNotifications = false;
|
||||
|
||||
@ -1108,8 +1110,8 @@ ProcessCompletedNotifies(void)
|
||||
elog(DEBUG1, "ProcessCompletedNotifies");
|
||||
|
||||
/*
|
||||
* We must run asyncQueueReadAllNotifications inside a transaction,
|
||||
* else bad things happen if it gets an error.
|
||||
* We must run asyncQueueReadAllNotifications inside a transaction, else
|
||||
* bad things happen if it gets an error.
|
||||
*/
|
||||
StartTransactionCommand();
|
||||
|
||||
@ -1125,11 +1127,11 @@ ProcessCompletedNotifies(void)
|
||||
{
|
||||
/*
|
||||
* If we found no other listening backends, and we aren't listening
|
||||
* ourselves, then we must execute asyncQueueAdvanceTail to flush
|
||||
* the queue, because ain't nobody else gonna do it. This prevents
|
||||
* queue overflow when we're sending useless notifies to nobody.
|
||||
* (A new listener could have joined since we looked, but if so this
|
||||
* is harmless.)
|
||||
* ourselves, then we must execute asyncQueueAdvanceTail to flush the
|
||||
* queue, because ain't nobody else gonna do it. This prevents queue
|
||||
* overflow when we're sending useless notifies to nobody. (A new
|
||||
* listener could have joined since we looked, but if so this is
|
||||
* harmless.)
|
||||
*/
|
||||
asyncQueueAdvanceTail();
|
||||
}
|
||||
@ -1164,14 +1166,14 @@ IsListeningOn(const char *channel)
|
||||
|
||||
/*
|
||||
* Remove our entry from the listeners array when we are no longer listening
|
||||
* on any channel. NB: must not fail if we're already not listening.
|
||||
* on any channel. NB: must not fail if we're already not listening.
|
||||
*/
|
||||
static void
|
||||
asyncQueueUnregister(void)
|
||||
{
|
||||
bool advanceTail;
|
||||
bool advanceTail;
|
||||
|
||||
Assert(listenChannels == NIL); /* else caller error */
|
||||
Assert(listenChannels == NIL); /* else caller error */
|
||||
|
||||
LWLockAcquire(AsyncQueueLock, LW_SHARED);
|
||||
/* check if entry is valid and oldest ... */
|
||||
@ -1200,7 +1202,7 @@ asyncQueueIsFull(void)
|
||||
/*
|
||||
* The queue is full if creating a new head page would create a page that
|
||||
* logically precedes the current global tail pointer, ie, the head
|
||||
* pointer would wrap around compared to the tail. We cannot create such
|
||||
* pointer would wrap around compared to the tail. We cannot create such
|
||||
* a head page for fear of confusing slru.c. For safety we round the tail
|
||||
* pointer back to a segment boundary (compare the truncation logic in
|
||||
* asyncQueueAdvanceTail).
|
||||
@ -1219,15 +1221,15 @@ asyncQueueIsFull(void)
|
||||
|
||||
/*
|
||||
* Advance the QueuePosition to the next entry, assuming that the current
|
||||
* entry is of length entryLength. If we jump to a new page the function
|
||||
* entry is of length entryLength. If we jump to a new page the function
|
||||
* returns true, else false.
|
||||
*/
|
||||
static bool
|
||||
asyncQueueAdvance(QueuePosition *position, int entryLength)
|
||||
{
|
||||
int pageno = QUEUE_POS_PAGE(*position);
|
||||
int offset = QUEUE_POS_OFFSET(*position);
|
||||
bool pageJump = false;
|
||||
int pageno = QUEUE_POS_PAGE(*position);
|
||||
int offset = QUEUE_POS_OFFSET(*position);
|
||||
bool pageJump = false;
|
||||
|
||||
/*
|
||||
* Move to the next writing position: First jump over what we have just
|
||||
@ -1245,7 +1247,7 @@ asyncQueueAdvance(QueuePosition *position, int entryLength)
|
||||
{
|
||||
pageno++;
|
||||
if (pageno > QUEUE_MAX_PAGE)
|
||||
pageno = 0; /* wrap around */
|
||||
pageno = 0; /* wrap around */
|
||||
offset = 0;
|
||||
pageJump = true;
|
||||
}
|
||||
@ -1260,9 +1262,9 @@ asyncQueueAdvance(QueuePosition *position, int entryLength)
|
||||
static void
|
||||
asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
|
||||
{
|
||||
size_t channellen = strlen(n->channel);
|
||||
size_t payloadlen = strlen(n->payload);
|
||||
int entryLength;
|
||||
size_t channellen = strlen(n->channel);
|
||||
size_t payloadlen = strlen(n->payload);
|
||||
int entryLength;
|
||||
|
||||
Assert(channellen < NAMEDATALEN);
|
||||
Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
|
||||
@ -1288,7 +1290,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
|
||||
* the last byte which simplifies reading the page later.
|
||||
*
|
||||
* We are passed the list cell containing the next notification to write
|
||||
* and return the first still-unwritten cell back. Eventually we will return
|
||||
* and return the first still-unwritten cell back. Eventually we will return
|
||||
* NULL indicating all is done.
|
||||
*
|
||||
* We are holding AsyncQueueLock already from the caller and grab AsyncCtlLock
|
||||
@ -1297,10 +1299,10 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
|
||||
static ListCell *
|
||||
asyncQueueAddEntries(ListCell *nextNotify)
|
||||
{
|
||||
AsyncQueueEntry qe;
|
||||
int pageno;
|
||||
int offset;
|
||||
int slotno;
|
||||
AsyncQueueEntry qe;
|
||||
int pageno;
|
||||
int offset;
|
||||
int slotno;
|
||||
|
||||
/* We hold both AsyncQueueLock and AsyncCtlLock during this operation */
|
||||
LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
|
||||
@ -1313,7 +1315,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
|
||||
|
||||
while (nextNotify != NULL)
|
||||
{
|
||||
Notification *n = (Notification *) lfirst(nextNotify);
|
||||
Notification *n = (Notification *) lfirst(nextNotify);
|
||||
|
||||
/* Construct a valid queue entry in local variable qe */
|
||||
asyncQueueNotificationToEntry(n, &qe);
|
||||
@ -1335,8 +1337,8 @@ asyncQueueAddEntries(ListCell *nextNotify)
|
||||
*/
|
||||
qe.length = QUEUE_PAGESIZE - offset;
|
||||
qe.dboid = InvalidOid;
|
||||
qe.data[0] = '\0'; /* empty channel */
|
||||
qe.data[1] = '\0'; /* empty payload */
|
||||
qe.data[0] = '\0'; /* empty channel */
|
||||
qe.data[1] = '\0'; /* empty payload */
|
||||
}
|
||||
|
||||
/* Now copy qe into the shared buffer page */
|
||||
@ -1348,12 +1350,12 @@ asyncQueueAddEntries(ListCell *nextNotify)
|
||||
if (asyncQueueAdvance(&(QUEUE_HEAD), qe.length))
|
||||
{
|
||||
/*
|
||||
* Page is full, so we're done here, but first fill the next
|
||||
* page with zeroes. The reason to do this is to ensure that
|
||||
* slru.c's idea of the head page is always the same as ours,
|
||||
* which avoids boundary problems in SimpleLruTruncate. The
|
||||
* test in asyncQueueIsFull() ensured that there is room to
|
||||
* create this page without overrunning the queue.
|
||||
* Page is full, so we're done here, but first fill the next page
|
||||
* with zeroes. The reason to do this is to ensure that slru.c's
|
||||
* idea of the head page is always the same as ours, which avoids
|
||||
* boundary problems in SimpleLruTruncate. The test in
|
||||
* asyncQueueIsFull() ensured that there is room to create this
|
||||
* page without overrunning the queue.
|
||||
*/
|
||||
slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
|
||||
/* And exit the loop */
|
||||
@ -1377,24 +1379,24 @@ asyncQueueAddEntries(ListCell *nextNotify)
|
||||
static void
|
||||
asyncQueueFillWarning(void)
|
||||
{
|
||||
int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
|
||||
int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
|
||||
int occupied;
|
||||
double fillDegree;
|
||||
TimestampTz t;
|
||||
int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
|
||||
int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
|
||||
int occupied;
|
||||
double fillDegree;
|
||||
TimestampTz t;
|
||||
|
||||
occupied = headPage - tailPage;
|
||||
|
||||
if (occupied == 0)
|
||||
return; /* fast exit for common case */
|
||||
|
||||
|
||||
if (occupied < 0)
|
||||
{
|
||||
/* head has wrapped around, tail not yet */
|
||||
occupied += QUEUE_MAX_PAGE+1;
|
||||
occupied += QUEUE_MAX_PAGE + 1;
|
||||
}
|
||||
|
||||
fillDegree = (double) occupied / (double) ((QUEUE_MAX_PAGE+1)/2);
|
||||
fillDegree = (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
|
||||
|
||||
if (fillDegree < 0.5)
|
||||
return;
|
||||
@ -1404,9 +1406,9 @@ asyncQueueFillWarning(void)
|
||||
if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
|
||||
t, QUEUE_FULL_WARN_INTERVAL))
|
||||
{
|
||||
QueuePosition min = QUEUE_HEAD;
|
||||
int32 minPid = InvalidPid;
|
||||
int i;
|
||||
QueuePosition min = QUEUE_HEAD;
|
||||
int32 minPid = InvalidPid;
|
||||
int i;
|
||||
|
||||
for (i = 1; i <= MaxBackends; i++)
|
||||
{
|
||||
@ -1455,13 +1457,13 @@ SignalBackends(void)
|
||||
int32 pid;
|
||||
|
||||
/*
|
||||
* Identify all backends that are listening and not already up-to-date.
|
||||
* We don't want to send signals while holding the AsyncQueueLock, so
|
||||
* we just build a list of target PIDs.
|
||||
* Identify all backends that are listening and not already up-to-date. We
|
||||
* don't want to send signals while holding the AsyncQueueLock, so we just
|
||||
* build a list of target PIDs.
|
||||
*
|
||||
* XXX in principle these pallocs could fail, which would be bad.
|
||||
* Maybe preallocate the arrays? But in practice this is only run
|
||||
* in trivial transactions, so there should surely be space available.
|
||||
* XXX in principle these pallocs could fail, which would be bad. Maybe
|
||||
* preallocate the arrays? But in practice this is only run in trivial
|
||||
* transactions, so there should surely be space available.
|
||||
*/
|
||||
pids = (int32 *) palloc(MaxBackends * sizeof(int32));
|
||||
ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
|
||||
@ -1493,8 +1495,8 @@ SignalBackends(void)
|
||||
/*
|
||||
* Note: assuming things aren't broken, a signal failure here could
|
||||
* only occur if the target backend exited since we released
|
||||
* AsyncQueueLock; which is unlikely but certainly possible.
|
||||
* So we just log a low-level debug message if it happens.
|
||||
* AsyncQueueLock; which is unlikely but certainly possible. So we
|
||||
* just log a low-level debug message if it happens.
|
||||
*/
|
||||
if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
|
||||
elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
|
||||
@ -1521,8 +1523,8 @@ AtAbort_Notify(void)
|
||||
{
|
||||
/*
|
||||
* If we LISTEN but then roll back the transaction we have set our pointer
|
||||
* but have not made any entry in listenChannels. In that case, remove
|
||||
* our pointer again.
|
||||
* but have not made any entry in listenChannels. In that case, remove our
|
||||
* pointer again.
|
||||
*/
|
||||
if (backendHasExecutedInitialListen)
|
||||
{
|
||||
@ -1778,7 +1780,7 @@ EnableNotifyInterrupt(void)
|
||||
* is disabled until the next EnableNotifyInterrupt call.
|
||||
*
|
||||
* The PROCSIG_CATCHUP_INTERRUPT signal handler also needs to call this,
|
||||
* so as to prevent conflicts if one signal interrupts the other. So we
|
||||
* so as to prevent conflicts if one signal interrupts the other. So we
|
||||
* must return the previous state of the flag.
|
||||
*/
|
||||
bool
|
||||
@ -1799,15 +1801,17 @@ DisableNotifyInterrupt(void)
|
||||
static void
|
||||
asyncQueueReadAllNotifications(void)
|
||||
{
|
||||
QueuePosition pos;
|
||||
QueuePosition oldpos;
|
||||
QueuePosition head;
|
||||
QueuePosition pos;
|
||||
QueuePosition oldpos;
|
||||
QueuePosition head;
|
||||
bool advanceTail;
|
||||
|
||||
/* page_buffer must be adequately aligned, so use a union */
|
||||
union {
|
||||
union
|
||||
{
|
||||
char buf[QUEUE_PAGESIZE];
|
||||
AsyncQueueEntry align;
|
||||
} page_buffer;
|
||||
} page_buffer;
|
||||
|
||||
/* Fetch current state */
|
||||
LWLockAcquire(AsyncQueueLock, LW_SHARED);
|
||||
@ -1829,16 +1833,16 @@ asyncQueueReadAllNotifications(void)
|
||||
* Especially we do not take into account different commit times.
|
||||
* Consider the following example:
|
||||
*
|
||||
* Backend 1: Backend 2:
|
||||
* Backend 1: Backend 2:
|
||||
*
|
||||
* transaction starts
|
||||
* NOTIFY foo;
|
||||
* commit starts
|
||||
* transaction starts
|
||||
* LISTEN foo;
|
||||
* commit starts
|
||||
* transaction starts
|
||||
* LISTEN foo;
|
||||
* commit starts
|
||||
* commit to clog
|
||||
* commit to clog
|
||||
* commit to clog
|
||||
*
|
||||
* It could happen that backend 2 sees the notification from backend 1 in
|
||||
* the queue. Even though the notifying transaction committed before
|
||||
@ -1861,7 +1865,7 @@ asyncQueueReadAllNotifications(void)
|
||||
{
|
||||
bool reachedStop;
|
||||
|
||||
do
|
||||
do
|
||||
{
|
||||
int curpage = QUEUE_POS_PAGE(pos);
|
||||
int curoffset = QUEUE_POS_OFFSET(pos);
|
||||
@ -1871,7 +1875,7 @@ asyncQueueReadAllNotifications(void)
|
||||
/*
|
||||
* We copy the data from SLRU into a local buffer, so as to avoid
|
||||
* holding the AsyncCtlLock while we are examining the entries and
|
||||
* possibly transmitting them to our frontend. Copy only the part
|
||||
* possibly transmitting them to our frontend. Copy only the part
|
||||
* of the page we will actually inspect.
|
||||
*/
|
||||
slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, curpage,
|
||||
@ -1881,7 +1885,7 @@ asyncQueueReadAllNotifications(void)
|
||||
/* we only want to read as far as head */
|
||||
copysize = QUEUE_POS_OFFSET(head) - curoffset;
|
||||
if (copysize < 0)
|
||||
copysize = 0; /* just for safety */
|
||||
copysize = 0; /* just for safety */
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1899,9 +1903,9 @@ asyncQueueReadAllNotifications(void)
|
||||
* uncommitted message.
|
||||
*
|
||||
* Our stop position is what we found to be the head's position
|
||||
* when we entered this function. It might have changed
|
||||
* already. But if it has, we will receive (or have already
|
||||
* received and queued) another signal and come here again.
|
||||
* when we entered this function. It might have changed already.
|
||||
* But if it has, we will receive (or have already received and
|
||||
* queued) another signal and come here again.
|
||||
*
|
||||
* We are not holding AsyncQueueLock here! The queue can only
|
||||
* extend beyond the head pointer (see above) and we leave our
|
||||
@ -1945,7 +1949,7 @@ asyncQueueReadAllNotifications(void)
|
||||
* and deliver relevant ones to my frontend.
|
||||
*
|
||||
* The current page must have been fetched into page_buffer from shared
|
||||
* memory. (We could access the page right in shared memory, but that
|
||||
* memory. (We could access the page right in shared memory, but that
|
||||
* would imply holding the AsyncCtlLock throughout this routine.)
|
||||
*
|
||||
* We stop if we reach the "stop" position, or reach a notification from an
|
||||
@ -1963,11 +1967,11 @@ asyncQueueProcessPageEntries(QueuePosition *current,
|
||||
{
|
||||
bool reachedStop = false;
|
||||
bool reachedEndOfPage;
|
||||
AsyncQueueEntry *qe;
|
||||
AsyncQueueEntry *qe;
|
||||
|
||||
do
|
||||
{
|
||||
QueuePosition thisentry = *current;
|
||||
QueuePosition thisentry = *current;
|
||||
|
||||
if (QUEUE_POS_EQUAL(thisentry, stop))
|
||||
break;
|
||||
@ -1975,9 +1979,9 @@ asyncQueueProcessPageEntries(QueuePosition *current,
|
||||
qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
|
||||
|
||||
/*
|
||||
* Advance *current over this message, possibly to the next page.
|
||||
* As noted in the comments for asyncQueueReadAllNotifications, we
|
||||
* must do this before possibly failing while processing the message.
|
||||
* Advance *current over this message, possibly to the next page. As
|
||||
* noted in the comments for asyncQueueReadAllNotifications, we must
|
||||
* do this before possibly failing while processing the message.
|
||||
*/
|
||||
reachedEndOfPage = asyncQueueAdvance(current, qe->length);
|
||||
|
||||
@ -1987,12 +1991,12 @@ asyncQueueProcessPageEntries(QueuePosition *current,
|
||||
if (TransactionIdDidCommit(qe->xid))
|
||||
{
|
||||
/* qe->data is the null-terminated channel name */
|
||||
char *channel = qe->data;
|
||||
char *channel = qe->data;
|
||||
|
||||
if (IsListeningOn(channel))
|
||||
{
|
||||
/* payload follows channel name */
|
||||
char *payload = qe->data + strlen(channel) + 1;
|
||||
char *payload = qe->data + strlen(channel) + 1;
|
||||
|
||||
NotifyMyFrontEnd(channel, payload, qe->srcPid);
|
||||
}
|
||||
@ -2008,12 +2012,12 @@ asyncQueueProcessPageEntries(QueuePosition *current,
|
||||
{
|
||||
/*
|
||||
* The transaction has neither committed nor aborted so far,
|
||||
* so we can't process its message yet. Break out of the loop,
|
||||
* but first back up *current so we will reprocess the message
|
||||
* next time. (Note: it is unlikely but not impossible for
|
||||
* TransactionIdDidCommit to fail, so we can't really avoid
|
||||
* this advance-then-back-up behavior when dealing with an
|
||||
* uncommitted message.)
|
||||
* so we can't process its message yet. Break out of the
|
||||
* loop, but first back up *current so we will reprocess the
|
||||
* message next time. (Note: it is unlikely but not
|
||||
* impossible for TransactionIdDidCommit to fail, so we can't
|
||||
* really avoid this advance-then-back-up behavior when
|
||||
* dealing with an uncommitted message.)
|
||||
*/
|
||||
*current = thisentry;
|
||||
reachedStop = true;
|
||||
@ -2037,11 +2041,11 @@ asyncQueueProcessPageEntries(QueuePosition *current,
|
||||
static void
|
||||
asyncQueueAdvanceTail(void)
|
||||
{
|
||||
QueuePosition min;
|
||||
int i;
|
||||
int oldtailpage;
|
||||
int newtailpage;
|
||||
int boundary;
|
||||
QueuePosition min;
|
||||
int i;
|
||||
int oldtailpage;
|
||||
int newtailpage;
|
||||
int boundary;
|
||||
|
||||
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
|
||||
min = QUEUE_HEAD;
|
||||
@ -2058,16 +2062,16 @@ asyncQueueAdvanceTail(void)
|
||||
* We can truncate something if the global tail advanced across an SLRU
|
||||
* segment boundary.
|
||||
*
|
||||
* XXX it might be better to truncate only once every several segments,
|
||||
* to reduce the number of directory scans.
|
||||
* XXX it might be better to truncate only once every several segments, to
|
||||
* reduce the number of directory scans.
|
||||
*/
|
||||
newtailpage = QUEUE_POS_PAGE(min);
|
||||
boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
|
||||
if (asyncQueuePagePrecedesLogically(oldtailpage, boundary))
|
||||
{
|
||||
/*
|
||||
* SimpleLruTruncate() will ask for AsyncCtlLock but will also
|
||||
* release the lock again.
|
||||
* SimpleLruTruncate() will ask for AsyncCtlLock but will also release
|
||||
* the lock again.
|
||||
*/
|
||||
SimpleLruTruncate(AsyncCtl, newtailpage);
|
||||
}
|
||||
@ -2104,8 +2108,8 @@ ProcessIncomingNotify(void)
|
||||
notifyInterruptOccurred = 0;
|
||||
|
||||
/*
|
||||
* We must run asyncQueueReadAllNotifications inside a transaction,
|
||||
* else bad things happen if it gets an error.
|
||||
* We must run asyncQueueReadAllNotifications inside a transaction, else
|
||||
* bad things happen if it gets an error.
|
||||
*/
|
||||
StartTransactionCommand();
|
||||
|
||||
|
Reference in New Issue
Block a user