1
0
mirror of https://github.com/postgres/postgres.git synced 2025-04-22 23:02:54 +03:00

Add new function pg_notification_queue_usage.

This tells you what fraction of NOTIFY's queue is currently filled.

Brendan Jurd, reviewed by Merlin Moncure and Gurjeet Singh.  A few
further tweaks by me.
This commit is contained in:
Robert Haas 2015-07-17 09:12:03 -04:00
parent 43d89a23d5
commit a04bb65f70
10 changed files with 115 additions and 19 deletions

View File

@ -14805,6 +14805,12 @@ SELECT * FROM pg_ls_dir('.') WITH ORDINALITY AS t(ls,n);
<entry>channel names that the session is currently listening on</entry> <entry>channel names that the session is currently listening on</entry>
</row> </row>
<row>
<entry><literal><function>pg_notification_queue_usage()</function></literal></entry>
<entry><type>double</type></entry>
<entry>fraction of the asynchronous notification queue currently occupied (0-1)</entry>
</row>
<row> <row>
<entry><literal><function>pg_my_temp_schema()</function></literal></entry> <entry><literal><function>pg_my_temp_schema()</function></literal></entry>
<entry><type>oid</type></entry> <entry><type>oid</type></entry>
@ -14945,10 +14951,19 @@ SET search_path TO <replaceable>schema</> <optional>, <replaceable>schema</>, ..
<primary>pg_listening_channels</primary> <primary>pg_listening_channels</primary>
</indexterm> </indexterm>
<indexterm>
<primary>pg_notification_queue_usage</primary>
</indexterm>
<para> <para>
<function>pg_listening_channels</function> returns a set of names of <function>pg_listening_channels</function> returns a set of names of
channels that the current session is listening to. See <xref asynchronous notification channels that the current session is listening
linkend="sql-listen"> for more information. to. <function>pg_notification_queue_usage</function> returns the
fraction of the total available space for notifications currently
occupied by notifications that are waiting to be processed, as a
<type>double</type> in the range 0-1.
See <xref linkend="sql-listen"> and <xref linkend="sql-notify">
for more information.
</para> </para>
<indexterm> <indexterm>

View File

@ -165,6 +165,11 @@ NOTIFY <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable cla
cleanup. In this case you should make sure that this session ends its cleanup. In this case you should make sure that this session ends its
current transaction so that cleanup can proceed. current transaction so that cleanup can proceed.
</para> </para>
<para>
The function <function>pg_notification_queue_usage</function> returns the
fraction of the queue that is currently occupied by pending notifications.
See <xref linkend="functions-info"> for more information.
</para>
<para> <para>
A transaction that has executed <command>NOTIFY</command> cannot be A transaction that has executed <command>NOTIFY</command> cannot be
prepared for two-phase commit. prepared for two-phase commit.

View File

@ -371,6 +371,7 @@ static bool asyncQueueIsFull(void);
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength); static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe); static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
static ListCell *asyncQueueAddEntries(ListCell *nextNotify); static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
static double asyncQueueUsage(void);
static void asyncQueueFillWarning(void); static void asyncQueueFillWarning(void);
static bool SignalBackends(void); static bool SignalBackends(void);
static void asyncQueueReadAllNotifications(void); static void asyncQueueReadAllNotifications(void);
@ -1361,6 +1362,48 @@ asyncQueueAddEntries(ListCell *nextNotify)
return nextNotify; return nextNotify;
} }
/*
* SQL function to return the fraction of the notification queue currently
* occupied.
*/
Datum
pg_notification_queue_usage(PG_FUNCTION_ARGS)
{
double usage;
LWLockAcquire(AsyncQueueLock, LW_SHARED);
usage = asyncQueueUsage();
LWLockRelease(AsyncQueueLock);
PG_RETURN_FLOAT8(usage);
}
/*
* Return the fraction of the queue that is currently occupied.
*
* The caller must hold AysncQueueLock in (at least) shared mode.
*/
static double
asyncQueueUsage(void)
{
int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
int occupied;
occupied = headPage - tailPage;
if (occupied == 0)
return (double) 0; /* fast exit for common case */
if (occupied < 0)
{
/* head has wrapped around, tail not yet */
occupied += QUEUE_MAX_PAGE + 1;
}
return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
}
/* /*
* Check whether the queue is at least half full, and emit a warning if so. * Check whether the queue is at least half full, and emit a warning if so.
* *
@ -1372,25 +1415,10 @@ asyncQueueAddEntries(ListCell *nextNotify)
static void static void
asyncQueueFillWarning(void) asyncQueueFillWarning(void)
{ {
int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
int occupied;
double fillDegree; double fillDegree;
TimestampTz t; TimestampTz t;
occupied = headPage - tailPage; fillDegree = asyncQueueUsage();
if (occupied == 0)
return; /* fast exit for common case */
if (occupied < 0)
{
/* head has wrapped around, tail not yet */
occupied += QUEUE_MAX_PAGE + 1;
}
fillDegree = (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
if (fillDegree < 0.5) if (fillDegree < 0.5)
return; return;

View File

@ -53,6 +53,6 @@
*/ */
/* yyyymmddN */ /* yyyymmddN */
#define CATALOG_VERSION_NO 201507021 #define CATALOG_VERSION_NO 201507171
#endif #endif

View File

@ -4046,10 +4046,14 @@ DATA(insert OID = 2856 ( pg_timezone_names PGNSP PGUID 12 1 1000 0 0 f f f f t
DESCR("get the available time zone names"); DESCR("get the available time zone names");
DATA(insert OID = 2730 ( pg_get_triggerdef PGNSP PGUID 12 1 0 0 0 f f f f t f s 2 0 25 "26 16" _null_ _null_ _null_ _null_ _null_ pg_get_triggerdef_ext _null_ _null_ _null_ )); DATA(insert OID = 2730 ( pg_get_triggerdef PGNSP PGUID 12 1 0 0 0 f f f f t f s 2 0 25 "26 16" _null_ _null_ _null_ _null_ _null_ pg_get_triggerdef_ext _null_ _null_ _null_ ));
DESCR("trigger description with pretty-print option"); DESCR("trigger description with pretty-print option");
/* asynchronous notifications */
DATA(insert OID = 3035 ( pg_listening_channels PGNSP PGUID 12 1 10 0 0 f f f f t t s 0 0 25 "" _null_ _null_ _null_ _null_ _null_ pg_listening_channels _null_ _null_ _null_ )); DATA(insert OID = 3035 ( pg_listening_channels PGNSP PGUID 12 1 10 0 0 f f f f t t s 0 0 25 "" _null_ _null_ _null_ _null_ _null_ pg_listening_channels _null_ _null_ _null_ ));
DESCR("get the channels that the current backend listens to"); DESCR("get the channels that the current backend listens to");
DATA(insert OID = 3036 ( pg_notify PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2278 "25 25" _null_ _null_ _null_ _null_ _null_ pg_notify _null_ _null_ _null_ )); DATA(insert OID = 3036 ( pg_notify PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2278 "25 25" _null_ _null_ _null_ _null_ _null_ pg_notify _null_ _null_ _null_ ));
DESCR("send a notification event"); DESCR("send a notification event");
DATA(insert OID = 3296 ( pg_notification_queue_usage PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 701 "" _null_ _null_ _null_ _null_ _null_ pg_notification_queue_usage _null_ _null_ _null_ ));
DESCR("get the fraction of the asynchronous notification queue currently in use");
/* non-persistent series generator */ /* non-persistent series generator */
DATA(insert OID = 1066 ( generate_series PGNSP PGUID 12 1 1000 0 0 f f f f t t i 3 0 23 "23 23 23" _null_ _null_ _null_ _null_ _null_ generate_series_step_int4 _null_ _null_ _null_ )); DATA(insert OID = 1066 ( generate_series PGNSP PGUID 12 1 1000 0 0 f f f f t t i 3 0 23 "23 23 23" _null_ _null_ _null_ _null_ _null_ generate_series_step_int4 _null_ _null_ _null_ ));

View File

@ -37,6 +37,7 @@ extern void Async_UnlistenAll(void);
/* notify-related SQL functions */ /* notify-related SQL functions */
extern Datum pg_listening_channels(PG_FUNCTION_ARGS); extern Datum pg_listening_channels(PG_FUNCTION_ARGS);
extern Datum pg_notify(PG_FUNCTION_ARGS); extern Datum pg_notify(PG_FUNCTION_ARGS);
extern Datum pg_notification_queue_usage(PG_FUNCTION_ARGS);
/* perform (or cancel) outbound notify processing at transaction commit */ /* perform (or cancel) outbound notify processing at transaction commit */
extern void PreCommit_Notify(void); extern void PreCommit_Notify(void);

View File

@ -0,0 +1,17 @@
Parsed test spec with 2 sessions
starting permutation: listen begin check notify check
step listen: LISTEN a;
step begin: BEGIN;
step check: SELECT pg_notification_queue_usage() > 0 AS nonzero;
nonzero
f
step notify: SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s;
count
1000
step check: SELECT pg_notification_queue_usage() > 0 AS nonzero;
nonzero
t

View File

@ -0,0 +1,14 @@
# Verify that pg_notification_queue_usage correctly reports a non-zero result,
# after submitting notifications while another connection is listening for
# those notifications and waiting inside an active transaction.
session "listener"
step "listen" { LISTEN a; }
step "begin" { BEGIN; }
teardown { ROLLBACK; }
session "notifier"
step "check" { SELECT pg_notification_queue_usage() > 0 AS nonzero; }
step "notify" { SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s; }
permutation "listen" "begin" "check" "notify" "check"

View File

@ -32,3 +32,11 @@ NOTIFY notify_async2;
LISTEN notify_async2; LISTEN notify_async2;
UNLISTEN notify_async2; UNLISTEN notify_async2;
UNLISTEN *; UNLISTEN *;
-- Should return zero while there are no pending notifications.
-- src/test/isolation/specs/async-notify.spec tests for actual usage.
SELECT pg_notification_queue_usage();
pg_notification_queue_usage
-----------------------------
0
(1 row)

View File

@ -17,3 +17,7 @@ NOTIFY notify_async2;
LISTEN notify_async2; LISTEN notify_async2;
UNLISTEN notify_async2; UNLISTEN notify_async2;
UNLISTEN *; UNLISTEN *;
-- Should return zero while there are no pending notifications.
-- src/test/isolation/specs/async-notify.spec tests for actual usage.
SELECT pg_notification_queue_usage();