mirror of
https://github.com/postgres/postgres.git
synced 2026-01-26 09:41:40 +03:00
Optimize LISTEN/NOTIFY via shared channel map and direct advancement.
This patch reworks LISTEN/NOTIFY to avoid waking backends that have no need to process the notification messages we just sent. The primary change is to create a shared hash table that tracks which processes are listening to which channels (where a "channel" is defined by a database OID and channel name). This allows a notifying process to accurately determine which listeners are interested, replacing the previous weak approximation that listeners in other databases couldn't be interested. Secondly, if a listener is known not to be interested and is currently stopped at the old queue head, we avoid waking it at all and just directly advance its queue pointer past the notifications we inserted. These changes permit very significant improvements (integer multiples) in NOTIFY throughput, as well as a noticeable reduction in latency, when there are many listeners but only a few are interested in any specific message. There is no improvement for the simplest case where every listener reads every message, but any loss seems below the noise level. Author: Joel Jacobson <joel@compiler.org> Reviewed-by: Tom Lane <tgl@sss.pgh.pa.us> Discussion: https://postgr.es/m/6899c044-4a82-49be-8117-e6f669765f7e@app.fastmail.com
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -372,6 +372,7 @@ SubtransBuffer "Waiting for I/O on a sub-transaction SLRU buffer."
|
||||
MultiXactOffsetBuffer "Waiting for I/O on a multixact offset SLRU buffer."
|
||||
MultiXactMemberBuffer "Waiting for I/O on a multixact member SLRU buffer."
|
||||
NotifyBuffer "Waiting for I/O on a <command>NOTIFY</command> message SLRU buffer."
|
||||
NotifyChannelHash "Waiting to access the <command>NOTIFY</command> channel hash table."
|
||||
SerialBuffer "Waiting for I/O on a serializable transaction conflict SLRU buffer."
|
||||
WALInsert "Waiting to insert WAL data into a memory buffer."
|
||||
BufferContent "Waiting to access a data page in memory."
|
||||
|
||||
@@ -102,6 +102,7 @@ PG_LWLOCKTRANCHE(SUBTRANS_BUFFER, SubtransBuffer)
|
||||
PG_LWLOCKTRANCHE(MULTIXACTOFFSET_BUFFER, MultiXactOffsetBuffer)
|
||||
PG_LWLOCKTRANCHE(MULTIXACTMEMBER_BUFFER, MultiXactMemberBuffer)
|
||||
PG_LWLOCKTRANCHE(NOTIFY_BUFFER, NotifyBuffer)
|
||||
PG_LWLOCKTRANCHE(NOTIFY_CHANNEL_HASH, NotifyChannelHash)
|
||||
PG_LWLOCKTRANCHE(SERIAL_BUFFER, SerialBuffer)
|
||||
PG_LWLOCKTRANCHE(WAL_INSERT, WALInsert)
|
||||
PG_LWLOCKTRANCHE(BUFFER_CONTENT, BufferContent)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
Parsed test spec with 3 sessions
|
||||
Parsed test spec with 7 sessions
|
||||
|
||||
starting permutation: listenc notify1 notify2 notify3 notifyf
|
||||
step listenc: LISTEN c1; LISTEN c2;
|
||||
@@ -47,6 +47,115 @@ notifier: NOTIFY "c2" with payload "payload" from notifier
|
||||
notifier: NOTIFY "c1" with payload "payloads" from notifier
|
||||
notifier: NOTIFY "c2" with payload "payloads" from notifier
|
||||
|
||||
starting permutation: listenc notifys_simple
|
||||
step listenc: LISTEN c1; LISTEN c2;
|
||||
step notifys_simple:
|
||||
BEGIN;
|
||||
SAVEPOINT s1;
|
||||
NOTIFY c1, 'simple1';
|
||||
NOTIFY c2, 'simple2';
|
||||
RELEASE SAVEPOINT s1;
|
||||
COMMIT;
|
||||
|
||||
notifier: NOTIFY "c1" with payload "simple1" from notifier
|
||||
notifier: NOTIFY "c2" with payload "simple2" from notifier
|
||||
|
||||
starting permutation: lsbegin lssavepoint lslisten lsrelease lscommit lsnotify
|
||||
step lsbegin: BEGIN;
|
||||
step lssavepoint: SAVEPOINT s1;
|
||||
step lslisten: LISTEN c1; LISTEN c2;
|
||||
step lsrelease: RELEASE SAVEPOINT s1;
|
||||
step lscommit: COMMIT;
|
||||
step lsnotify: NOTIFY c1, 'subxact_test';
|
||||
listen_subxact: NOTIFY "c1" with payload "subxact_test" from listen_subxact
|
||||
|
||||
starting permutation: lsbegin lslisten_outer lssavepoint lslisten lsrelease lscommit lsnotify
|
||||
step lsbegin: BEGIN;
|
||||
step lslisten_outer: LISTEN c3;
|
||||
step lssavepoint: SAVEPOINT s1;
|
||||
step lslisten: LISTEN c1; LISTEN c2;
|
||||
step lsrelease: RELEASE SAVEPOINT s1;
|
||||
step lscommit: COMMIT;
|
||||
step lsnotify: NOTIFY c1, 'subxact_test';
|
||||
listen_subxact: NOTIFY "c1" with payload "subxact_test" from listen_subxact
|
||||
|
||||
starting permutation: lsbegin lssavepoint lslisten lsrollback lscommit lsnotify_check
|
||||
step lsbegin: BEGIN;
|
||||
step lssavepoint: SAVEPOINT s1;
|
||||
step lslisten: LISTEN c1; LISTEN c2;
|
||||
step lsrollback: ROLLBACK TO SAVEPOINT s1;
|
||||
step lscommit: COMMIT;
|
||||
step lsnotify_check: NOTIFY c1, 'should_not_receive';
|
||||
|
||||
starting permutation: lunlisten_all notify1 lcheck
|
||||
step lunlisten_all: BEGIN; LISTEN c1; UNLISTEN *; COMMIT;
|
||||
step notify1: NOTIFY c1;
|
||||
step lcheck: SELECT 1 AS x;
|
||||
x
|
||||
-
|
||||
1
|
||||
(1 row)
|
||||
|
||||
|
||||
starting permutation: listenc notify_many_with_dup
|
||||
step listenc: LISTEN c1; LISTEN c2;
|
||||
step notify_many_with_dup:
|
||||
BEGIN;
|
||||
SELECT pg_notify('c1', 'msg' || s::text) FROM generate_series(1, 17) s;
|
||||
SELECT pg_notify('c1', 'msg1');
|
||||
COMMIT;
|
||||
|
||||
pg_notify
|
||||
---------
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
(17 rows)
|
||||
|
||||
pg_notify
|
||||
---------
|
||||
|
||||
(1 row)
|
||||
|
||||
notifier: NOTIFY "c1" with payload "msg1" from notifier
|
||||
notifier: NOTIFY "c1" with payload "msg2" from notifier
|
||||
notifier: NOTIFY "c1" with payload "msg3" from notifier
|
||||
notifier: NOTIFY "c1" with payload "msg4" from notifier
|
||||
notifier: NOTIFY "c1" with payload "msg5" from notifier
|
||||
notifier: NOTIFY "c1" with payload "msg6" from notifier
|
||||
notifier: NOTIFY "c1" with payload "msg7" from notifier
|
||||
notifier: NOTIFY "c1" with payload "msg8" from notifier
|
||||
notifier: NOTIFY "c1" with payload "msg9" from notifier
|
||||
notifier: NOTIFY "c1" with payload "msg10" from notifier
|
||||
notifier: NOTIFY "c1" with payload "msg11" from notifier
|
||||
notifier: NOTIFY "c1" with payload "msg12" from notifier
|
||||
notifier: NOTIFY "c1" with payload "msg13" from notifier
|
||||
notifier: NOTIFY "c1" with payload "msg14" from notifier
|
||||
notifier: NOTIFY "c1" with payload "msg15" from notifier
|
||||
notifier: NOTIFY "c1" with payload "msg16" from notifier
|
||||
notifier: NOTIFY "c1" with payload "msg17" from notifier
|
||||
|
||||
starting permutation: listenc llisten l2listen l3listen lslisten
|
||||
step listenc: LISTEN c1; LISTEN c2;
|
||||
step llisten: LISTEN c1; LISTEN c2;
|
||||
step l2listen: LISTEN c1;
|
||||
step l3listen: LISTEN c1;
|
||||
step lslisten: LISTEN c1; LISTEN c2;
|
||||
|
||||
starting permutation: llisten notify1 notify2 notify3 notifyf lcheck
|
||||
step llisten: LISTEN c1; LISTEN c2;
|
||||
step notify1: NOTIFY c1;
|
||||
@@ -95,6 +204,8 @@ listener: NOTIFY "c2" with payload "" from notifier
|
||||
|
||||
starting permutation: l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop
|
||||
step l2listen: LISTEN c1;
|
||||
listener2: NOTIFY "c1" with payload "" from notifier
|
||||
listener2: NOTIFY "c1" with payload "" from notifier
|
||||
step l2begin: BEGIN;
|
||||
step notify1: NOTIFY c1;
|
||||
step lbegins: BEGIN ISOLATION LEVEL SERIALIZABLE;
|
||||
@@ -104,6 +215,17 @@ step l2commit: COMMIT;
|
||||
listener2: NOTIFY "c1" with payload "" from notifier
|
||||
step l2stop: UNLISTEN *;
|
||||
|
||||
starting permutation: lch_listen nch_notify lch_check
|
||||
step lch_listen: LISTEN ch;
|
||||
step nch_notify: NOTIFY ch, 'aa';
|
||||
step lch_check: SELECT 1 AS x;
|
||||
x
|
||||
-
|
||||
1
|
||||
(1 row)
|
||||
|
||||
listener_ch: NOTIFY "ch" with payload "aa" from notifier_ch
|
||||
|
||||
starting permutation: llisten lbegin usage bignotify usage
|
||||
step llisten: LISTEN c1; LISTEN c2;
|
||||
step lbegin: BEGIN;
|
||||
|
||||
@@ -31,6 +31,20 @@ step notifys1 {
|
||||
ROLLBACK TO SAVEPOINT s2;
|
||||
COMMIT;
|
||||
}
|
||||
step notifys_simple {
|
||||
BEGIN;
|
||||
SAVEPOINT s1;
|
||||
NOTIFY c1, 'simple1';
|
||||
NOTIFY c2, 'simple2';
|
||||
RELEASE SAVEPOINT s1;
|
||||
COMMIT;
|
||||
}
|
||||
step notify_many_with_dup {
|
||||
BEGIN;
|
||||
SELECT pg_notify('c1', 'msg' || s::text) FROM generate_series(1, 17) s;
|
||||
SELECT pg_notify('c1', 'msg1');
|
||||
COMMIT;
|
||||
}
|
||||
step usage { SELECT pg_notification_queue_usage() > 0 AS nonzero; }
|
||||
step bignotify { SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; }
|
||||
teardown { UNLISTEN *; }
|
||||
@@ -43,6 +57,7 @@ step lcheck { SELECT 1 AS x; }
|
||||
step lbegin { BEGIN; }
|
||||
step lbegins { BEGIN ISOLATION LEVEL SERIALIZABLE; }
|
||||
step lcommit { COMMIT; }
|
||||
step lunlisten_all { BEGIN; LISTEN c1; UNLISTEN *; COMMIT; }
|
||||
teardown { UNLISTEN *; }
|
||||
|
||||
# In some tests we need a second listener, just to block the queue.
|
||||
@@ -53,6 +68,38 @@ step l2begin { BEGIN; }
|
||||
step l2commit { COMMIT; }
|
||||
step l2stop { UNLISTEN *; }
|
||||
|
||||
# Third listener session for testing array growth.
|
||||
|
||||
session listener3
|
||||
step l3listen { LISTEN c1; }
|
||||
teardown { UNLISTEN *; }
|
||||
|
||||
# Listener session for cross-session notification test with channel 'ch'.
|
||||
|
||||
session listener_ch
|
||||
step lch_listen { LISTEN ch; }
|
||||
step lch_check { SELECT 1 AS x; }
|
||||
teardown { UNLISTEN *; }
|
||||
|
||||
# Notifier session for cross-session notification test with channel 'ch'.
|
||||
|
||||
session notifier_ch
|
||||
step nch_notify { NOTIFY ch, 'aa'; }
|
||||
|
||||
# Session for testing LISTEN in subtransaction with separate steps.
|
||||
|
||||
session listen_subxact
|
||||
step lsbegin { BEGIN; }
|
||||
step lslisten_outer { LISTEN c3; }
|
||||
step lssavepoint { SAVEPOINT s1; }
|
||||
step lslisten { LISTEN c1; LISTEN c2; }
|
||||
step lsrelease { RELEASE SAVEPOINT s1; }
|
||||
step lsrollback { ROLLBACK TO SAVEPOINT s1; }
|
||||
step lscommit { COMMIT; }
|
||||
step lsnotify { NOTIFY c1, 'subxact_test'; }
|
||||
step lsnotify_check { NOTIFY c1, 'should_not_receive'; }
|
||||
teardown { UNLISTEN *; }
|
||||
|
||||
|
||||
# Trivial cases.
|
||||
permutation listenc notify1 notify2 notify3 notifyf
|
||||
@@ -60,6 +107,27 @@ permutation listenc notify1 notify2 notify3 notifyf
|
||||
# Check simple and less-simple deduplication.
|
||||
permutation listenc notifyd1 notifyd2 notifys1
|
||||
|
||||
# Check simple NOTIFY reparenting when parent has no action.
|
||||
permutation listenc notifys_simple
|
||||
|
||||
# Check LISTEN reparenting in subtransaction.
|
||||
permutation lsbegin lssavepoint lslisten lsrelease lscommit lsnotify
|
||||
|
||||
# Check LISTEN merge path when both outer and inner transactions have actions.
|
||||
permutation lsbegin lslisten_outer lssavepoint lslisten lsrelease lscommit lsnotify
|
||||
|
||||
# Check LISTEN abort path (ROLLBACK TO SAVEPOINT discards pending actions).
|
||||
permutation lsbegin lssavepoint lslisten lsrollback lscommit lsnotify_check
|
||||
|
||||
# Check UNLISTEN * cancels a LISTEN in the same transaction.
|
||||
permutation lunlisten_all notify1 lcheck
|
||||
|
||||
# Check notification_match function (triggered by hash table duplicate detection).
|
||||
permutation listenc notify_many_with_dup
|
||||
|
||||
# Check ChannelHashAddListener array growth.
|
||||
permutation listenc llisten l2listen l3listen lslisten
|
||||
|
||||
# Cross-backend notification delivery. We use a "select 1" to force the
|
||||
# listener session to check for notifies. In principle we could just wait
|
||||
# for delivery, but that would require extra support in isolationtester
|
||||
@@ -73,6 +141,10 @@ permutation listenc llisten notify1 notify2 notify3 notifyf lcheck
|
||||
# and notify queue is not empty
|
||||
permutation l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop
|
||||
|
||||
# Check that notifications sent from a backend that has not done LISTEN
|
||||
# are properly delivered to a listener in another backend.
|
||||
permutation lch_listen nch_notify lch_check
|
||||
|
||||
# Verify that pg_notification_queue_usage correctly reports a non-zero result,
|
||||
# after submitting notifications while another connection is listening for
|
||||
# those notifications and waiting inside an active transaction. We have to
|
||||
|
||||
@@ -422,6 +422,7 @@ CatalogIdMapEntry
|
||||
CatalogIndexState
|
||||
ChangeVarNodes_callback
|
||||
ChangeVarNodes_context
|
||||
ChannelName
|
||||
CheckPoint
|
||||
CheckPointStmt
|
||||
CheckpointStatsData
|
||||
@@ -1111,6 +1112,8 @@ GistSplitUnion
|
||||
GistSplitVector
|
||||
GistTsVectorOptions
|
||||
GistVacState
|
||||
GlobalChannelEntry
|
||||
GlobalChannelKey
|
||||
GlobalTransaction
|
||||
GlobalTransactionData
|
||||
GlobalVisHorizonKind
|
||||
@@ -1580,6 +1583,7 @@ ListParsedLex
|
||||
ListenAction
|
||||
ListenActionKind
|
||||
ListenStmt
|
||||
ListenerEntry
|
||||
LoInfo
|
||||
LoadStmt
|
||||
LocalBufferLookupEnt
|
||||
@@ -2176,6 +2180,8 @@ PatternInfoArray
|
||||
Pattern_Prefix_Status
|
||||
Pattern_Type
|
||||
PendingFsyncEntry
|
||||
PendingListenAction
|
||||
PendingListenEntry
|
||||
PendingRelDelete
|
||||
PendingRelSync
|
||||
PendingUnlinkEntry
|
||||
|
||||
Reference in New Issue
Block a user