diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 40c42f572ed..657c591618d 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -13,19 +13,16 @@ */ /*------------------------------------------------------------------------- - * Async Notification Model as of 9.0: + * Async Notification Model as of v19: * - * 1. Multiple backends on same machine. Multiple backends listening on - * several channels. (Channels are also called "conditions" in other - * parts of the code.) + * 1. Multiple backends on same machine. Multiple backends may be listening + * on each of several channels. * * 2. There is one central queue in disk-based storage (directory pg_notify/), * with actively-used pages mapped into shared memory by the slru.c module. * All notification messages are placed in the queue and later read out - * by listening backends. - * - * There is no central knowledge of which backend listens on which channel; - * every backend has its own list of interesting channels. + * by listening backends. The single queue allows us to guarantee that + * notifications are received in commit order. * * Although there is only one queue, notifications are treated as being * database-local; this is done by including the sender's database OID @@ -62,22 +59,17 @@ * page number and the offset in that page. This is done before marking the * transaction as committed in clog. If we run into problems writing the * notifications, we can still call elog(ERROR, ...) and the transaction - * will roll back. + * will roll back safely. * * Once we have put all of the notifications into the queue, we return to * CommitTransaction() which will then do the actual transaction commit. * * After commit we are called another time (AtCommit_Notify()). Here we - * make any actual updates to the effective listen state (listenChannels). + * make any required updates to the effective listen state (see below). * Then we signal any backends that may be interested in our messages * (including our own backend, if listening). This is done by - * SignalBackends(), which scans the list of listening backends and sends a - * PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't - * know which backend is listening on which channel so we must signal them - * all). We can exclude backends that are already up to date, though, and - * we can also exclude backends that are in other databases (unless they - * are way behind and should be kicked to make them advance their - * pointers). + * SignalBackends(), which sends a PROCSIG_NOTIFY_INTERRUPT signal to + * each relevant backend, as described below. * * Finally, after we are out of the transaction altogether and about to go * idle, we scan the queue for messages that need to be sent to our @@ -109,6 +101,47 @@ * often. We make sending backends do this work if they advanced the queue * head into a new page, but only once every QUEUE_CLEANUP_DELAY pages. * + * 7. So far we have not discussed how backends change their listening state, + * nor how notification senders know which backends to awaken. To handle + * the latter, we maintain a global channel table (implemented as a dynamic + * shared hash table, or dshash) that maps channel names to the set of + * backends listening on each channel. This table is created lazily on the + * first LISTEN command and grows dynamically as needed. There is also a + * local channel table (a plain dynahash table) in each listening backend, + * tracking which channels that backend is listening to. The local table + * serves to reduce the number of accesses needed to the shared table. + * + * If the current transaction has executed any LISTEN/UNLISTEN actions, + * PreCommit_Notify() prepares to commit those. For LISTEN, it + * pre-allocates entries in both the per-backend localChannelTable and the + * shared globalChannelTable (with listening=false so that these entries + * are no-ops for the moment). It also records the final per-channel + * intent in pendingListenActions, so post-commit/abort processing can + * apply that in a single step. Since all these allocations happen before + * committing to clog, we can safely abort the transaction on failure. + * + * After commit, AtCommit_Notify() runs through pendingListenActions and + * updates the backend's per-channel listening flags to activate or + * deactivate listening. This happens before sending signals. + * + * SignalBackends() consults the shared global channel table to identify + * listeners for the channels that the current transaction sent + * notification(s) to. Each selected backend is marked as having a wakeup + * pending to avoid duplicate signals, and a PROCSIG_NOTIFY_INTERRUPT + * signal is sent to it. + * + * 8. While writing notifications, PreCommit_Notify() records the queue head + * position both before and after the write. Because all writers serialize + * on a cluster-wide heavyweight lock, no other backend can insert entries + * between these two points. SignalBackends() uses this fact to directly + * advance the queue pointer for any backend that is still positioned at + * the old head, or within the range written, but is not interested in any + * of our notifications. This avoids unnecessary wakeups for idle + * listeners that have nothing to read. Backends that are not interested + * in our notifications, but cannot be directly advanced, are signaled only + * if they are far behind the current queue head; that is to ensure that + * we can advance the queue tail without undue delay. + * * An application that listens on the same channel it notifies will get * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful, * by comparing be_pid in the NOTIFY message to the application's own backend's @@ -119,7 +152,7 @@ * The amount of shared memory used for notify management (notify_buffers) * can be varied without affecting anything but performance. The maximum * amount of notification data that can be queued at one time is determined - * by max_notify_queue_pages GUC. + * by the max_notify_queue_pages GUC. *------------------------------------------------------------------------- */ @@ -137,14 +170,17 @@ #include "commands/async.h" #include "common/hashfn.h" #include "funcapi.h" +#include "lib/dshash.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "miscadmin.h" +#include "storage/dsm_registry.h" #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/procsignal.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" +#include "utils/dsa.h" #include "utils/guc_hooks.h" #include "utils/memutils.h" #include "utils/ps_status.h" @@ -224,11 +260,17 @@ typedef struct QueuePosition (x).page != (y).page ? (x) : \ (x).offset > (y).offset ? (x) : (y)) +/* returns true if x comes before y in queue order */ +#define QUEUE_POS_PRECEDES(x,y) \ + (asyncQueuePagePrecedes((x).page, (y).page) || \ + ((x).page == (y).page && (x).offset < (y).offset)) + /* * Parameter determining how often we try to advance the tail pointer: * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is - * also the distance by which a backend in another database needs to be - * behind before we'll decide we need to wake it up to advance its pointer. + * also the distance by which a backend that's not interested in our + * notifications needs to be behind before we'll decide we need to wake it + * up so it can advance its pointer. * * Resist the temptation to make this really large. While that would save * work in some places, it would add cost in others. In particular, this @@ -246,6 +288,8 @@ typedef struct QueueBackendStatus Oid dboid; /* backend's database OID, or InvalidOid */ ProcNumber nextListener; /* id of next listener, or INVALID_PROC_NUMBER */ QueuePosition pos; /* backend has read queue up to here */ + bool wakeupPending; /* signal sent to backend, not yet processed */ + bool isAdvancing; /* backend is advancing its position */ } QueueBackendStatus; /* @@ -260,14 +304,18 @@ typedef struct QueueBackendStatus * (since no other backend will inspect it). * * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the - * entries of other backends and also change the head pointer. When holding - * both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends - * can change the tail pointers. + * entries of other backends and also change the head pointer. They can + * also advance other backends' queue positions, unless the other backend + * has isAdvancing set (i.e., is in process of doing that itself). + * + * When holding both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE + * mode, backends can change the tail pointers. * * SLRU buffer pool is divided in banks and bank wise SLRU lock is used as * the control lock for the pg_notify SLRU buffers. * In order to avoid deadlocks, whenever we need multiple locks, we first get - * NotifyQueueTailLock, then NotifyQueueLock, and lastly SLRU bank lock. + * NotifyQueueTailLock, then NotifyQueueLock, then SLRU bank lock, and lastly + * globalChannelTable partition locks. * * Each backend uses the backend[] array entry with index equal to its * ProcNumber. We rely on this to make SendProcSignal fast. @@ -288,6 +336,9 @@ typedef struct AsyncQueueControl ProcNumber firstListener; /* id of first listener, or * INVALID_PROC_NUMBER */ TimestampTz lastQueueFillWarn; /* time of last queue-full msg */ + dsa_handle globalChannelTableDSA; /* global channel table's DSA handle */ + dshash_table_handle globalChannelTableDSH; /* and its dshash handle */ + /* Array with room for MaxBackends entries: */ QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]; } AsyncQueueControl; @@ -301,6 +352,8 @@ static AsyncQueueControl *asyncQueueControl; #define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid) #define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener) #define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos) +#define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending) +#define QUEUE_BACKEND_IS_ADVANCING(i) (asyncQueueControl->backend[i].isAdvancing) /* * The SLRU buffer area through which we access the notification queue @@ -313,16 +366,54 @@ static SlruCtlData NotifyCtlData; #define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */ /* - * listenChannels identifies the channels we are actually listening to - * (ie, have committed a LISTEN on). It is a simple list of channel names, - * allocated in TopMemoryContext. + * Global channel table definitions + * + * This hash table maps (database OID, channel name) keys to arrays of + * ProcNumbers representing the backends listening or about to listen + * on each channel. The "listening" flags allow us to create hash table + * entries pre-commit and not have to assume that creating them post-commit + * will succeed. */ -static List *listenChannels = NIL; /* list of C strings */ +#define INITIAL_LISTENERS_ARRAY_SIZE 4 + +typedef struct GlobalChannelKey +{ + Oid dboid; + char channel[NAMEDATALEN]; +} GlobalChannelKey; + +typedef struct ListenerEntry +{ + ProcNumber procNo; /* listener's ProcNumber */ + bool listening; /* true if committed listener */ +} ListenerEntry; + +typedef struct GlobalChannelEntry +{ + GlobalChannelKey key; /* hash key */ + dsa_pointer listenersArray; /* DSA pointer to ListenerEntry array */ + int numListeners; /* Number of listeners currently stored */ + int allocatedListeners; /* Allocated size of array */ +} GlobalChannelEntry; + +static dshash_table *globalChannelTable = NULL; +static dsa_area *globalChannelDSA = NULL; + +/* + * localChannelTable caches the channel names this backend is listening on + * (including those we have staged to be listened on, but not yet committed). + * Used by IsListeningOn() for fast lookups when reading notifications. + */ +static HTAB *localChannelTable = NULL; + +/* We test this condition to detect that we're not listening at all */ +#define LocalChannelTableIsEmpty() \ + (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0) /* * State for pending LISTEN/UNLISTEN actions consists of an ordered list of * all actions requested in the current transaction. As explained above, - * we don't actually change listenChannels until we reach transaction commit. + * we don't actually change listen state until we reach transaction commit. * * The list is kept in CurTransactionContext. In subtransactions, each * subtransaction has its own list in its own CurTransactionContext, but @@ -351,6 +442,28 @@ typedef struct ActionList static ActionList *pendingActions = NULL; +/* + * Hash table recording the final listen/unlisten intent per channel for + * the current transaction. Key is channel name, value is PENDING_LISTEN or + * PENDING_UNLISTEN. This keeps critical commit/abort processing to one step + * per channel instead of replaying every action. This is built from the + * pendingActions list by PreCommit_Notify, then used by AtCommit_Notify or + * AtAbort_Notify. + */ +typedef enum +{ + PENDING_LISTEN, + PENDING_UNLISTEN, +} PendingListenAction; + +typedef struct PendingListenEntry +{ + char channel[NAMEDATALEN]; /* hash key */ + PendingListenAction action; /* which action should we perform? */ +} PendingListenEntry; + +static HTAB *pendingListenActions = NULL; + /* * State for outbound notifies consists of a list of all channels+payloads * NOTIFYed in the current transaction. We do not actually perform a NOTIFY @@ -391,6 +504,8 @@ typedef struct NotificationList int nestingLevel; /* current transaction nesting depth */ List *events; /* list of Notification structs */ HTAB *hashtab; /* hash of NotificationHash structs, or NULL */ + List *uniqueChannelNames; /* unique channel names being notified */ + HTAB *uniqueChannelHash; /* hash of unique channel names, or NULL */ struct NotificationList *upper; /* details for upper transaction levels */ } NotificationList; @@ -403,6 +518,15 @@ struct NotificationHash static NotificationList *pendingNotifies = NULL; +/* + * Hash entry in NotificationList.uniqueChannelHash or localChannelTable + * (both just carry the channel name, with no payload). + */ +typedef struct ChannelName +{ + char channel[NAMEDATALEN]; /* hash key */ +} ChannelName; + /* * Inbound notifications are initially processed by HandleNotifyInterrupt(), * called from inside a signal handler. That just sets the @@ -418,6 +542,23 @@ static bool unlistenExitRegistered = false; /* True if we're currently registered as a listener in asyncQueueControl */ static bool amRegisteredListener = false; +/* + * Queue head positions for direct advancement. + * These are captured during PreCommit_Notify while holding the heavyweight + * lock on database 0, ensuring no other backend can insert notifications + * between them. SignalBackends uses these to advance idle backends. + */ +static QueuePosition queueHeadBeforeWrite; +static QueuePosition queueHeadAfterWrite; + +/* + * Workspace arrays for SignalBackends. These are preallocated in + * PreCommit_Notify to avoid needing memory allocation after committing to + * clog. + */ +static int32 *signalPids = NULL; +static ProcNumber *signalProcnos = NULL; + /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */ static bool tryAdvanceTail = false; @@ -430,12 +571,23 @@ int max_notify_queue_pages = 1048576; /* local function prototypes */ static inline int64 asyncQueuePageDiff(int64 p, int64 q); static inline bool asyncQueuePagePrecedes(int64 p, int64 q); +static inline void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, + const char *channel); +static dshash_hash globalChannelTableHash(const void *key, size_t size, + void *arg); +static void initGlobalChannelTable(void); +static void initLocalChannelTable(void); static void queue_listen(ListenActionKind action, const char *channel); static void Async_UnlistenOnExit(int code, Datum arg); -static void Exec_ListenPreCommit(void); -static void Exec_ListenCommit(const char *channel); -static void Exec_UnlistenCommit(const char *channel); -static void Exec_UnlistenAllCommit(void); +static void BecomeRegisteredListener(void); +static void PrepareTableEntriesForListen(const char *channel); +static void PrepareTableEntriesForUnlisten(const char *channel); +static void PrepareTableEntriesForUnlistenAll(void); +static void RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr, + ListenerEntry *listeners, + int idx); +static void ApplyPendingListenActions(bool isCommit); +static void CleanupListenersOnExit(void); static bool IsListeningOn(const char *channel); static void asyncQueueUnregister(void); static bool asyncQueueIsFull(void); @@ -477,6 +629,145 @@ asyncQueuePagePrecedes(int64 p, int64 q) return p < q; } +/* + * GlobalChannelKeyInit + * Prepare a global channel table key for hashing. + */ +static inline void +GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel) +{ + memset(key, 0, sizeof(GlobalChannelKey)); + key->dboid = dboid; + strlcpy(key->channel, channel, NAMEDATALEN); +} + +/* + * globalChannelTableHash + * Hash function for global channel table keys. + */ +static dshash_hash +globalChannelTableHash(const void *key, size_t size, void *arg) +{ + const GlobalChannelKey *k = (const GlobalChannelKey *) key; + dshash_hash h; + + h = DatumGetUInt32(hash_uint32(k->dboid)); + h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel, + strnlen(k->channel, NAMEDATALEN))); + + return h; +} + +/* parameters for the global channel table */ +static const dshash_parameters globalChannelTableDSHParams = { + sizeof(GlobalChannelKey), + sizeof(GlobalChannelEntry), + dshash_memcmp, + globalChannelTableHash, + dshash_memcpy, + LWTRANCHE_NOTIFY_CHANNEL_HASH +}; + +/* + * initGlobalChannelTable + * Lazy initialization of the global channel table. + */ +static void +initGlobalChannelTable(void) +{ + MemoryContext oldcontext; + + /* Quick exit if we already did this */ + if (asyncQueueControl->globalChannelTableDSH != DSHASH_HANDLE_INVALID && + globalChannelTable != NULL) + return; + + /* Otherwise, use a lock to ensure only one process creates the table */ + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + + /* Be sure any local memory allocated by DSA routines is persistent */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + + if (asyncQueueControl->globalChannelTableDSH == DSHASH_HANDLE_INVALID) + { + /* Initialize dynamic shared hash table for global channels */ + globalChannelDSA = dsa_create(LWTRANCHE_NOTIFY_CHANNEL_HASH); + dsa_pin(globalChannelDSA); + dsa_pin_mapping(globalChannelDSA); + globalChannelTable = dshash_create(globalChannelDSA, + &globalChannelTableDSHParams, + NULL); + + /* Store handles in shared memory for other backends to use */ + asyncQueueControl->globalChannelTableDSA = dsa_get_handle(globalChannelDSA); + asyncQueueControl->globalChannelTableDSH = + dshash_get_hash_table_handle(globalChannelTable); + } + else if (!globalChannelTable) + { + /* Attach to existing dynamic shared hash table */ + globalChannelDSA = dsa_attach(asyncQueueControl->globalChannelTableDSA); + dsa_pin_mapping(globalChannelDSA); + globalChannelTable = dshash_attach(globalChannelDSA, + &globalChannelTableDSHParams, + asyncQueueControl->globalChannelTableDSH, + NULL); + } + + MemoryContextSwitchTo(oldcontext); + LWLockRelease(NotifyQueueLock); +} + +/* + * initLocalChannelTable + * Lazy initialization of the local channel table. + * Once created, this table lasts for the life of the session. + */ +static void +initLocalChannelTable(void) +{ + HASHCTL hash_ctl; + + /* Quick exit if we already did this */ + if (localChannelTable != NULL) + return; + + /* Initialize local hash table for this backend's listened channels */ + hash_ctl.keysize = NAMEDATALEN; + hash_ctl.entrysize = sizeof(ChannelName); + + localChannelTable = + hash_create("Local Listen Channels", + 64, + &hash_ctl, + HASH_ELEM | HASH_STRINGS); +} + +/* + * initPendingListenActions + * Lazy initialization of the pending listen actions hash table. + * This is allocated in CurTransactionContext during PreCommit_Notify, + * and destroyed at transaction end. + */ +static void +initPendingListenActions(void) +{ + HASHCTL hash_ctl; + + if (pendingListenActions != NULL) + return; + + hash_ctl.keysize = NAMEDATALEN; + hash_ctl.entrysize = sizeof(PendingListenEntry); + hash_ctl.hcxt = CurTransactionContext; + + pendingListenActions = + hash_create("Pending Listen Actions", + list_length(pendingActions->actions), + &hash_ctl, + HASH_ELEM | HASH_STRINGS | HASH_CONTEXT); +} + /* * Report space needed for our shared memory area */ @@ -520,12 +811,16 @@ AsyncShmemInit(void) QUEUE_STOP_PAGE = 0; QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER; asyncQueueControl->lastQueueFillWarn = 0; + asyncQueueControl->globalChannelTableDSA = DSA_HANDLE_INVALID; + asyncQueueControl->globalChannelTableDSH = DSHASH_HANDLE_INVALID; for (int i = 0; i < MaxBackends; i++) { QUEUE_BACKEND_PID(i) = InvalidPid; QUEUE_BACKEND_DBOID(i) = InvalidOid; QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER; SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0); + QUEUE_BACKEND_WAKEUP_PENDING(i) = false; + QUEUE_BACKEND_IS_ADVANCING(i) = false; } } @@ -656,6 +951,9 @@ Async_Notify(const char *channel, const char *payload) notifies->events = list_make1(n); /* We certainly don't need a hashtable yet */ notifies->hashtab = NULL; + /* We won't build uniqueChannelNames/Hash till later, either */ + notifies->uniqueChannelNames = NIL; + notifies->uniqueChannelHash = NULL; notifies->upper = pendingNotifies; pendingNotifies = notifies; } @@ -682,8 +980,8 @@ Async_Notify(const char *channel, const char *payload) * Common code for listen, unlisten, unlisten all commands. * * Adds the request to the list of pending actions. - * Actual update of the listenChannels list happens during transaction - * commit. + * Actual update of localChannelTable and globalChannelTable happens during + * PreCommit_Notify, with staged changes committed in AtCommit_Notify. */ static void queue_listen(ListenActionKind action, const char *channel) @@ -693,10 +991,9 @@ queue_listen(ListenActionKind action, const char *channel) int my_level = GetCurrentTransactionNestLevel(); /* - * Unlike Async_Notify, we don't try to collapse out duplicates. It would - * be too complicated to ensure we get the right interactions of - * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there - * would be any performance benefit anyway in sane applications. + * Unlike Async_Notify, we don't try to collapse out duplicates here. We + * keep the ordered list to preserve interactions like UNLISTEN ALL; the + * final per-channel intent is computed during PreCommit_Notify. */ oldcontext = MemoryContextSwitchTo(CurTransactionContext); @@ -782,30 +1079,49 @@ Async_UnlistenAll(void) * SQL function: return a set of the channel names this backend is actively * listening to. * - * Note: this coding relies on the fact that the listenChannels list cannot + * Note: this coding relies on the fact that the localChannelTable cannot * change within a transaction. */ Datum pg_listening_channels(PG_FUNCTION_ARGS) { FuncCallContext *funcctx; + HASH_SEQ_STATUS *status; /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) { /* create a function context for cross-call persistence */ funcctx = SRF_FIRSTCALL_INIT(); + + /* Initialize hash table iteration if we have any channels */ + if (localChannelTable != NULL) + { + MemoryContext oldcontext; + + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS)); + hash_seq_init(status, localChannelTable); + funcctx->user_fctx = status; + MemoryContextSwitchTo(oldcontext); + } + else + { + funcctx->user_fctx = NULL; + } } /* stuff done on every call of the function */ funcctx = SRF_PERCALL_SETUP(); + status = (HASH_SEQ_STATUS *) funcctx->user_fctx; - if (funcctx->call_cntr < list_length(listenChannels)) + if (status != NULL) { - char *channel = (char *) list_nth(listenChannels, - funcctx->call_cntr); + ChannelName *entry; - SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel)); + entry = (ChannelName *) hash_seq_search(status); + if (entry != NULL) + SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(entry->channel)); } SRF_RETURN_DONE(funcctx); @@ -821,7 +1137,7 @@ pg_listening_channels(PG_FUNCTION_ARGS) static void Async_UnlistenOnExit(int code, Datum arg) { - Exec_UnlistenAllCommit(); + CleanupListenersOnExit(); asyncQueueUnregister(); } @@ -868,8 +1184,16 @@ PreCommit_Notify(void) elog(DEBUG1, "PreCommit_Notify"); /* Preflight for any pending listen/unlisten actions */ + initGlobalChannelTable(); + if (pendingActions != NULL) { + /* Ensure we have a local channel table */ + initLocalChannelTable(); + /* Create pendingListenActions hash table for this transaction */ + initPendingListenActions(); + + /* Stage all the actions this transaction wants to perform */ foreach(p, pendingActions->actions) { ListenAction *actrec = (ListenAction *) lfirst(p); @@ -877,13 +1201,14 @@ PreCommit_Notify(void) switch (actrec->action) { case LISTEN_LISTEN: - Exec_ListenPreCommit(); + BecomeRegisteredListener(); + PrepareTableEntriesForListen(actrec->channel); break; case LISTEN_UNLISTEN: - /* there is no Exec_UnlistenPreCommit() */ + PrepareTableEntriesForUnlisten(actrec->channel); break; case LISTEN_UNLISTEN_ALL: - /* there is no Exec_UnlistenAllPreCommit() */ + PrepareTableEntriesForUnlistenAll(); break; } } @@ -893,6 +1218,60 @@ PreCommit_Notify(void) if (pendingNotifies) { ListCell *nextNotify; + bool firstIteration = true; + + /* + * Build list of unique channel names being notified for use by + * SignalBackends(). + * + * If uniqueChannelHash is available, use it to efficiently get the + * unique channels. Otherwise, fall back to the O(N^2) approach. + */ + pendingNotifies->uniqueChannelNames = NIL; + if (pendingNotifies->uniqueChannelHash != NULL) + { + HASH_SEQ_STATUS status; + ChannelName *channelEntry; + + hash_seq_init(&status, pendingNotifies->uniqueChannelHash); + while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL) + pendingNotifies->uniqueChannelNames = + lappend(pendingNotifies->uniqueChannelNames, + channelEntry->channel); + } + else + { + /* O(N^2) approach is better for small number of notifications */ + foreach_ptr(Notification, n, pendingNotifies->events) + { + char *channel = n->data; + bool found = false; + + /* Name present in list? */ + foreach_ptr(char, oldchan, pendingNotifies->uniqueChannelNames) + { + if (strcmp(oldchan, channel) == 0) + { + found = true; + break; + } + } + /* Add if not already in list */ + if (!found) + pendingNotifies->uniqueChannelNames = + lappend(pendingNotifies->uniqueChannelNames, + channel); + } + } + + /* Preallocate workspace that will be needed by SignalBackends() */ + if (signalPids == NULL) + signalPids = MemoryContextAlloc(TopMemoryContext, + MaxBackends * sizeof(int32)); + + if (signalProcnos == NULL) + signalProcnos = MemoryContextAlloc(TopMemoryContext, + MaxBackends * sizeof(ProcNumber)); /* * Make sure that we have an XID assigned to the current transaction. @@ -921,6 +1300,23 @@ PreCommit_Notify(void) LockSharedObject(DatabaseRelationId, InvalidOid, 0, AccessExclusiveLock); + /* + * For the direct advancement optimization in SignalBackends(), we + * need to ensure that no other backend can insert queue entries + * between queueHeadBeforeWrite and queueHeadAfterWrite. The + * heavyweight lock above provides this guarantee, since it serializes + * all writers. + * + * Note: if the heavyweight lock were ever removed for scalability + * reasons, we could achieve the same guarantee by holding + * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather + * than releasing and reacquiring it for each page as we do below. + */ + + /* Initialize values to a safe default in case list is empty */ + SET_QUEUE_POS(queueHeadBeforeWrite, 0, 0); + SET_QUEUE_POS(queueHeadAfterWrite, 0, 0); + /* Now push the notifications into the queue */ nextNotify = list_head(pendingNotifies->events); while (nextNotify != NULL) @@ -938,12 +1334,18 @@ PreCommit_Notify(void) * point in time we can still roll the transaction back. */ LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + if (firstIteration) + { + queueHeadBeforeWrite = QUEUE_HEAD; + firstIteration = false; + } asyncQueueFillWarning(); if (asyncQueueIsFull()) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("too many notifications in the NOTIFY queue"))); nextNotify = asyncQueueAddEntries(nextNotify); + queueHeadAfterWrite = QUEUE_HEAD; LWLockRelease(NotifyQueueLock); } @@ -956,7 +1358,7 @@ PreCommit_Notify(void) * * This is called at transaction commit, after committing to clog. * - * Update listenChannels and clear transaction-local state. + * Apply pending listen/unlisten changes and clear transaction-local state. * * If we issued any notifications in the transaction, send signals to * listening backends (possibly including ourselves) to process them. @@ -966,8 +1368,6 @@ PreCommit_Notify(void) void AtCommit_Notify(void) { - ListCell *p; - /* * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to * return as soon as possible @@ -978,30 +1378,11 @@ AtCommit_Notify(void) if (Trace_notify) elog(DEBUG1, "AtCommit_Notify"); - /* Perform any pending listen/unlisten actions */ - if (pendingActions != NULL) - { - foreach(p, pendingActions->actions) - { - ListenAction *actrec = (ListenAction *) lfirst(p); - - switch (actrec->action) - { - case LISTEN_LISTEN: - Exec_ListenCommit(actrec->channel); - break; - case LISTEN_UNLISTEN: - Exec_UnlistenCommit(actrec->channel); - break; - case LISTEN_UNLISTEN_ALL: - Exec_UnlistenAllCommit(); - break; - } - } - } + /* Apply staged listen/unlisten changes */ + ApplyPendingListenActions(true); /* If no longer listening to anything, get out of listener array */ - if (amRegisteredListener && listenChannels == NIL) + if (amRegisteredListener && LocalChannelTableIsEmpty()) asyncQueueUnregister(); /* @@ -1032,12 +1413,12 @@ AtCommit_Notify(void) } /* - * Exec_ListenPreCommit --- subroutine for PreCommit_Notify + * BecomeRegisteredListener --- subroutine for PreCommit_Notify * * This function must make sure we are ready to catch any incoming messages. */ static void -Exec_ListenPreCommit(void) +BecomeRegisteredListener(void) { QueuePosition head; QueuePosition max; @@ -1051,7 +1432,7 @@ Exec_ListenPreCommit(void) return; if (Trace_notify) - elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid); + elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid); /* * Before registering, make sure we will unlisten before dying. (Note: @@ -1098,6 +1479,8 @@ Exec_ListenPreCommit(void) QUEUE_BACKEND_POS(MyProcNumber) = max; QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid; QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId; + QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false; + QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false; /* Insert backend into list of listeners at correct position */ if (prevListener != INVALID_PROC_NUMBER) { @@ -1127,99 +1510,393 @@ Exec_ListenPreCommit(void) } /* - * Exec_ListenCommit --- subroutine for AtCommit_Notify + * PrepareTableEntriesForListen --- subroutine for PreCommit_Notify * - * Add the channel to the list of channels we are listening on. + * Prepare a LISTEN by recording it in pendingListenActions, pre-allocating + * an entry in localChannelTable, and pre-allocating an entry in the shared + * globalChannelTable with listening=false. The listening flag will be set + * to true in AtCommit_Notify. If we abort later, unwanted table entries + * will be removed. */ static void -Exec_ListenCommit(const char *channel) +PrepareTableEntriesForListen(const char *channel) { - MemoryContext oldcontext; - - /* Do nothing if we are already listening on this channel */ - if (IsListeningOn(channel)) - return; + GlobalChannelKey key; + GlobalChannelEntry *entry; + bool found; + ListenerEntry *listeners; + PendingListenEntry *pending; /* - * Add the new channel name to listenChannels. - * - * XXX It is theoretically possible to get an out-of-memory failure here, - * which would be bad because we already committed. For the moment it - * doesn't seem worth trying to guard against that, but maybe improve this - * later. + * Record in local pending hash that we want to LISTEN, overwriting any + * earlier attempt to UNLISTEN. */ - oldcontext = MemoryContextSwitchTo(TopMemoryContext); - listenChannels = lappend(listenChannels, pstrdup(channel)); - MemoryContextSwitchTo(oldcontext); -} + pending = (PendingListenEntry *) + hash_search(pendingListenActions, channel, HASH_ENTER, NULL); + pending->action = PENDING_LISTEN; -/* - * Exec_UnlistenCommit --- subroutine for AtCommit_Notify - * - * Remove the specified channel name from listenChannels. - */ -static void -Exec_UnlistenCommit(const char *channel) -{ - ListCell *q; + /* + * Ensure that there is an entry for the channel in localChannelTable. + * (Should this fail, we can just roll back.) If the transaction fails + * after this point, we will remove the entry if appropriate during + * ApplyPendingListenActions. Note that this entry allows IsListeningOn() + * to return TRUE; we assume nothing is going to consult that before + * AtCommit_Notify/AtAbort_Notify. However, if later actions attempt to + * UNLISTEN this channel or UNLISTEN *, we need to have the local entry + * present to ensure they do the right things; see + * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll. + */ + (void) hash_search(localChannelTable, channel, HASH_ENTER, NULL); - if (Trace_notify) - elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid); + /* Pre-allocate entry in shared globalChannelTable with listening=false */ + GlobalChannelKeyInit(&key, MyDatabaseId, channel); + entry = dshash_find_or_insert(globalChannelTable, &key, &found); - foreach(q, listenChannels) + if (!found) { - char *lchan = (char *) lfirst(q); - - if (strcmp(lchan, channel) == 0) - { - listenChannels = foreach_delete_current(listenChannels, q); - pfree(lchan); - break; - } + /* New channel entry, so initialize it to a safe state */ + entry->listenersArray = InvalidDsaPointer; + entry->numListeners = 0; + entry->allocatedListeners = 0; } /* - * We do not complain about unlistening something not being listened; - * should we? + * Create listenersArray if entry doesn't have one. It's tempting to fold + * this into the !found case, but this coding allows us to cope in case + * dsa_allocate() failed in an earlier attempt. */ + if (!DsaPointerIsValid(entry->listenersArray)) + { + entry->listenersArray = dsa_allocate(globalChannelDSA, + sizeof(ListenerEntry) * INITIAL_LISTENERS_ARRAY_SIZE); + entry->allocatedListeners = INITIAL_LISTENERS_ARRAY_SIZE; + } + + listeners = (ListenerEntry *) + dsa_get_address(globalChannelDSA, entry->listenersArray); + + /* + * Check if we already have a ListenerEntry (possibly from earlier in this + * transaction) + */ + for (int i = 0; i < entry->numListeners; i++) + { + if (listeners[i].procNo == MyProcNumber) + { + /* Already have an entry; listening flag stays as-is until commit */ + dshash_release_lock(globalChannelTable, entry); + return; + } + } + + /* Need to add a new entry; grow array if necessary */ + if (entry->numListeners >= entry->allocatedListeners) + { + int new_size = entry->allocatedListeners * 2; + dsa_pointer old_array = entry->listenersArray; + dsa_pointer new_array = dsa_allocate(globalChannelDSA, + sizeof(ListenerEntry) * new_size); + ListenerEntry *new_listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA, new_array); + + memcpy(new_listeners, listeners, sizeof(ListenerEntry) * entry->numListeners); + entry->listenersArray = new_array; + entry->allocatedListeners = new_size; + dsa_free(globalChannelDSA, old_array); + listeners = new_listeners; + } + + listeners[entry->numListeners].procNo = MyProcNumber; + listeners[entry->numListeners].listening = false; /* staged, not yet + * committed */ + entry->numListeners++; + + dshash_release_lock(globalChannelTable, entry); } /* - * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify + * PrepareTableEntriesForUnlisten --- subroutine for PreCommit_Notify * - * Unlisten on all channels for this backend. + * Prepare an UNLISTEN by recording it in pendingListenActions, but only if + * we're currently listening (committed or staged). We don't touch + * globalChannelTable yet - the listener keeps receiving signals until + * commit, when the entry is removed. */ static void -Exec_UnlistenAllCommit(void) +PrepareTableEntriesForUnlisten(const char *channel) { - if (Trace_notify) - elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid); + PendingListenEntry *pending; - list_free_deep(listenChannels); - listenChannels = NIL; + /* + * If the channel name is not in localChannelTable, then we are neither + * listening on it nor preparing to listen on it, so we don't need to + * record an UNLISTEN action. + */ + Assert(localChannelTable != NULL); + if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL) + return; + + /* + * Record in local pending hash that we want to UNLISTEN, overwriting any + * earlier attempt to LISTEN. Don't touch localChannelTable or + * globalChannelTable yet - we keep receiving signals until commit. + */ + pending = (PendingListenEntry *) + hash_search(pendingListenActions, channel, HASH_ENTER, NULL); + pending->action = PENDING_UNLISTEN; +} + +/* + * PrepareTableEntriesForUnlistenAll --- subroutine for PreCommit_Notify + * + * Prepare UNLISTEN * by recording an UNLISTEN for all listened or + * about-to-be-listened channels in pendingListenActions. + */ +static void +PrepareTableEntriesForUnlistenAll(void) +{ + HASH_SEQ_STATUS seq; + ChannelName *channelEntry; + PendingListenEntry *pending; + + /* + * Scan localChannelTable, which will have the names of all channels that + * we are listening on or have prepared to listen on. Record an UNLISTEN + * action for each one, overwriting any earlier attempt to LISTEN. + */ + hash_seq_init(&seq, localChannelTable); + while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL) + { + pending = (PendingListenEntry *) + hash_search(pendingListenActions, channelEntry->channel, HASH_ENTER, NULL); + pending->action = PENDING_UNLISTEN; + } +} + +/* + * RemoveListenerFromChannel --- remove idx'th listener from global channel entry + * + * Decrements numListeners, compacts the array, and frees the entry if empty. + * Sets *entry_ptr to NULL if the entry was deleted. + * + * We could get the listeners pointer from the entry, but all callers + * already have it at hand. + */ +static void +RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr, + ListenerEntry *listeners, + int idx) +{ + GlobalChannelEntry *entry = *entry_ptr; + + entry->numListeners--; + if (idx < entry->numListeners) + memmove(&listeners[idx], &listeners[idx + 1], + sizeof(ListenerEntry) * (entry->numListeners - idx)); + + if (entry->numListeners == 0) + { + dsa_free(globalChannelDSA, entry->listenersArray); + dshash_delete_entry(globalChannelTable, entry); + /* tells caller not to release the entry's lock: */ + *entry_ptr = NULL; + } +} + +/* + * ApplyPendingListenActions + * + * Apply, or revert, staged listen/unlisten changes to the local and global + * hash tables. + */ +static void +ApplyPendingListenActions(bool isCommit) +{ + HASH_SEQ_STATUS seq; + PendingListenEntry *pending; + + /* Quick exit if nothing to do */ + if (pendingListenActions == NULL) + return; + + /* We made a globalChannelTable before building pendingListenActions */ + if (globalChannelTable == NULL) + elog(PANIC, "global channel table missing post-commit/abort"); + + /* For each staged action ... */ + hash_seq_init(&seq, pendingListenActions); + while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL) + { + GlobalChannelKey key; + GlobalChannelEntry *entry; + bool removeLocal = true; + bool foundListener = false; + + /* + * Find the global entry for this channel. If isCommit, it had better + * exist (it was created in PreCommit). In an abort, it might not + * exist, in which case we are not listening and should discard any + * local entry that PreCommit may have managed to create. + */ + GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel); + entry = dshash_find(globalChannelTable, &key, true); + if (entry != NULL) + { + /* Scan entry to find the ListenerEntry for this backend */ + ListenerEntry *listeners; + + listeners = (ListenerEntry *) + dsa_get_address(globalChannelDSA, entry->listenersArray); + + for (int i = 0; i < entry->numListeners; i++) + { + if (listeners[i].procNo != MyProcNumber) + continue; + foundListener = true; + if (isCommit) + { + if (pending->action == PENDING_LISTEN) + { + /* + * LISTEN being committed: set listening=true. + * localChannelTable entry was created during + * PreCommit and should be kept. + */ + listeners[i].listening = true; + removeLocal = false; + } + else + { + /* + * UNLISTEN being committed: remove pre-allocated + * entries from both tables. + */ + RemoveListenerFromChannel(&entry, listeners, i); + } + } + else + { + /* + * Note: this part is reachable only if the transaction + * aborts after PreCommit_Notify() has made some + * pendingListenActions entries, so it's pretty hard to + * test. + */ + if (!listeners[i].listening) + { + /* + * Staged LISTEN (or LISTEN+UNLISTEN) being aborted, + * and we weren't listening before, so remove + * pre-allocated entries from both tables. + */ + RemoveListenerFromChannel(&entry, listeners, i); + } + else + { + /* + * We're aborting, but the previous state was that + * we're listening, so keep localChannelTable entry. + */ + removeLocal = false; + } + } + break; /* there shouldn't be another match */ + } + + /* We might have already released the entry by removing it */ + if (entry != NULL) + dshash_release_lock(globalChannelTable, entry); + } + + /* + * If we're committing a LISTEN action, we should have found a + * matching ListenerEntry, but otherwise it's okay if we didn't. + */ + if (isCommit && pending->action == PENDING_LISTEN && !foundListener) + elog(PANIC, "could not find listener entry for channel \"%s\" backend %d", + pending->channel, MyProcNumber); + + /* + * If we did not find a globalChannelTable entry for our backend, or + * if we are unlistening, remove any localChannelTable entry that may + * exist. (Note in particular that this cleans up if we created a + * localChannelTable entry and then failed while trying to create a + * globalChannelTable entry.) + */ + if (removeLocal && localChannelTable != NULL) + (void) hash_search(localChannelTable, pending->channel, + HASH_REMOVE, NULL); + } +} + +/* + * CleanupListenersOnExit --- called from Async_UnlistenOnExit + * + * Remove this backend from all channels in the shared global table. + */ +static void +CleanupListenersOnExit(void) +{ + dshash_seq_status status; + GlobalChannelEntry *entry; + + if (Trace_notify) + elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid); + + /* Clear our local cache (not really necessary, but be consistent) */ + if (localChannelTable != NULL) + { + hash_destroy(localChannelTable); + localChannelTable = NULL; + } + + /* Now remove our entries from the shared globalChannelTable */ + if (globalChannelTable == NULL) + return; + + dshash_seq_init(&status, globalChannelTable, true); + while ((entry = dshash_seq_next(&status)) != NULL) + { + ListenerEntry *listeners; + + if (entry->key.dboid != MyDatabaseId) + continue; /* not relevant */ + + listeners = (ListenerEntry *) + dsa_get_address(globalChannelDSA, entry->listenersArray); + + for (int i = 0; i < entry->numListeners; i++) + { + if (listeners[i].procNo == MyProcNumber) + { + entry->numListeners--; + if (i < entry->numListeners) + memmove(&listeners[i], &listeners[i + 1], + sizeof(ListenerEntry) * (entry->numListeners - i)); + + if (entry->numListeners == 0) + { + dsa_free(globalChannelDSA, entry->listenersArray); + dshash_delete_current(&status); + } + break; + } + } + } + dshash_seq_term(&status); } /* * Test whether we are actively listening on the given channel name. * * Note: this function is executed for every notification found in the queue. - * Perhaps it is worth further optimization, eg convert the list to a sorted - * array so we can binary-search it. In practice the list is likely to be - * fairly short, though. */ static bool IsListeningOn(const char *channel) { - ListCell *p; + if (localChannelTable == NULL) + return false; - foreach(p, listenChannels) - { - char *lchan = (char *) lfirst(p); - - if (strcmp(lchan, channel) == 0) - return true; - } - return false; + return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL); } /* @@ -1229,7 +1906,7 @@ IsListeningOn(const char *channel) static void asyncQueueUnregister(void) { - Assert(listenChannels == NIL); /* else caller error */ + Assert(LocalChannelTableIsEmpty()); /* else caller error */ if (!amRegisteredListener) /* nothing to do */ return; @@ -1241,6 +1918,8 @@ asyncQueueUnregister(void) /* Mark our entry as invalid */ QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid; QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid; + QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false; + QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false; /* and remove it from the list */ if (QUEUE_FIRST_LISTENER == MyProcNumber) QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber); @@ -1565,11 +2244,9 @@ asyncQueueFillWarning(void) /* * Send signals to listening backends. * - * Normally we signal only backends in our own database, since only those - * backends could be interested in notifies we send. However, if there's - * notify traffic in our database but no traffic in another database that - * does have listener(s), those listeners will fall further and further - * behind. Waken them anyway if they're far enough behind, so that they'll + * Normally we signal only backends that are interested in the notifies that + * we just sent. However, that will leave idle listeners falling further and + * further behind. Waken them anyway if they're far enough behind, so they'll * advance their queue position pointers, allowing the global tail to advance. * * Since we know the ProcNumber and the Pid the signaling is quite cheap. @@ -1580,60 +2257,124 @@ asyncQueueFillWarning(void) static void SignalBackends(void) { - int32 *pids; - ProcNumber *procnos; int count; + /* Can't get here without PreCommit_Notify having made the global table */ + Assert(globalChannelTable != NULL); + + /* It should have set up these arrays, too */ + Assert(signalPids != NULL && signalProcnos != NULL); + /* * Identify backends that we need to signal. We don't want to send - * signals while holding the NotifyQueueLock, so this loop just builds a - * list of target PIDs. - * - * XXX in principle these pallocs could fail, which would be bad. Maybe - * preallocate the arrays? They're not that large, though. + * signals while holding the NotifyQueueLock, so this part just builds a + * list of target PIDs in signalPids[] and signalProcnos[]. */ - pids = (int32 *) palloc(MaxBackends * sizeof(int32)); - procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber)); count = 0; LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + + /* Scan each channel name that we notified in this transaction */ + foreach_ptr(char, channel, pendingNotifies->uniqueChannelNames) + { + GlobalChannelKey key; + GlobalChannelEntry *entry; + ListenerEntry *listeners; + + GlobalChannelKeyInit(&key, MyDatabaseId, channel); + entry = dshash_find(globalChannelTable, &key, false); + if (entry == NULL) + continue; /* nobody is listening */ + + listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA, + entry->listenersArray); + + /* Identify listeners that now need waking, add them to arrays */ + for (int j = 0; j < entry->numListeners; j++) + { + ProcNumber i; + int32 pid; + QueuePosition pos; + + if (!listeners[j].listening) + continue; /* ignore not-yet-committed listeners */ + + i = listeners[j].procNo; + + if (QUEUE_BACKEND_WAKEUP_PENDING(i)) + continue; /* already signaled, no need to repeat */ + + pid = QUEUE_BACKEND_PID(i); + pos = QUEUE_BACKEND_POS(i); + + if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) + continue; /* it's fully caught up already */ + + Assert(pid != InvalidPid); + + QUEUE_BACKEND_WAKEUP_PENDING(i) = true; + signalPids[count] = pid; + signalProcnos[count] = i; + count++; + } + + dshash_release_lock(globalChannelTable, entry); + } + + /* + * Scan all listeners. Any that are not already pending wakeup must not + * be interested in our notifications (else we'd have set their wakeup + * flags above). Check to see if we can directly advance their queue + * pointers to save a wakeup. Otherwise, if they are far behind, wake + * them anyway so they will catch up. + */ for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) { - int32 pid = QUEUE_BACKEND_PID(i); + int32 pid; QueuePosition pos; - Assert(pid != InvalidPid); + if (QUEUE_BACKEND_WAKEUP_PENDING(i)) + continue; + + /* If it's currently advancing, we should not touch it */ + if (QUEUE_BACKEND_IS_ADVANCING(i)) + continue; + + pid = QUEUE_BACKEND_PID(i); pos = QUEUE_BACKEND_POS(i); - if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) + + /* + * We can directly advance the other backend's queue pointer if it's + * not currently advancing (else there are race conditions), and its + * current pointer is not behind queueHeadBeforeWrite (else we'd make + * it miss some older messages), and we'd not be moving the pointer + * backward. + */ + if (!QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite) && + QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite)) { - /* - * Always signal listeners in our own database, unless they're - * already caught up (unlikely, but possible). - */ - if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) - continue; + /* We can directly advance its pointer past what we wrote */ + QUEUE_BACKEND_POS(i) = queueHeadAfterWrite; } - else + else if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), + QUEUE_POS_PAGE(pos)) >= QUEUE_CLEANUP_DELAY) { - /* - * Listeners in other databases should be signaled only if they - * are far behind. - */ - if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), - QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY) - continue; + /* It's idle and far behind, so wake it up */ + Assert(pid != InvalidPid); + + QUEUE_BACKEND_WAKEUP_PENDING(i) = true; + signalPids[count] = pid; + signalProcnos[count] = i; + count++; } - /* OK, need to signal this one */ - pids[count] = pid; - procnos[count] = i; - count++; } + LWLockRelease(NotifyQueueLock); /* Now send signals */ for (int i = 0; i < count; i++) { - int32 pid = pids[i]; + int32 pid = signalPids[i]; /* * If we are signaling our own process, no need to involve the kernel; @@ -1651,12 +2392,9 @@ SignalBackends(void) * NotifyQueueLock; which is unlikely but certainly possible. So we * just log a low-level debug message if it happens. */ - if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0) + if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, signalProcnos[i]) < 0) elog(DEBUG3, "could not signal backend with PID %d: %m", pid); } - - pfree(pids); - pfree(procnos); } /* @@ -1664,18 +2402,18 @@ SignalBackends(void) * * This is called at transaction abort. * - * Gets rid of pending actions and outbound notifies that we would have - * executed if the transaction got committed. + * Revert any staged listen/unlisten changes and clean up transaction state. + * This only does anything if we abort after PreCommit_Notify has staged + * some entries. */ void AtAbort_Notify(void) { - /* - * If we LISTEN but then roll back the transaction after PreCommit_Notify, - * we have registered as a listener but have not made any entry in - * listenChannels. In that case, deregister again. - */ - if (amRegisteredListener && listenChannels == NIL) + /* Revert staged listen/unlisten changes */ + ApplyPendingListenActions(false); + + /* If we're no longer listening on anything, unregister */ + if (amRegisteredListener && LocalChannelTableIsEmpty()) asyncQueueUnregister(); /* And clean up */ @@ -1854,20 +2592,27 @@ asyncQueueReadAllNotifications(void) QueuePosition head; Snapshot snapshot; - /* Fetch current state */ + /* + * Fetch current state, indicate to others that we have woken up, and that + * we are in process of advancing our position. + */ LWLockAcquire(NotifyQueueLock, LW_SHARED); /* Assert checks that we have a valid state entry */ Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber)); + QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false; pos = QUEUE_BACKEND_POS(MyProcNumber); head = QUEUE_HEAD; - LWLockRelease(NotifyQueueLock); if (QUEUE_POS_EQUAL(pos, head)) { /* Nothing to do, we have read all notifications already. */ + LWLockRelease(NotifyQueueLock); return; } + QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = true; + LWLockRelease(NotifyQueueLock); + /*---------- * Get snapshot we'll use to decide which xacts are still in progress. * This is trickier than it might seem, because of race conditions. @@ -1902,7 +2647,7 @@ asyncQueueReadAllNotifications(void) * * What we do guarantee is that we'll see all notifications from * transactions committing after the snapshot we take here. - * Exec_ListenPreCommit has already added us to the listener array, + * BecomeRegisteredListener has already added us to the listener array, * so no not-yet-committed messages can be removed from the queue * before we see them. *---------- @@ -1955,6 +2700,7 @@ asyncQueueReadAllNotifications(void) /* Update shared state */ LWLockAcquire(NotifyQueueLock, LW_SHARED); QUEUE_BACKEND_POS(MyProcNumber) = pos; + QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false; LWLockRelease(NotifyQueueLock); ExitOnAnyError = save_ExitOnAnyError; @@ -2049,9 +2795,11 @@ asyncQueueProcessPageEntries(QueuePosition *current, * that if there's a bad entry in the queue for which * TransactionIdDidCommit() fails for some reason, we can skip * over it on the first LISTEN in a session, and not get stuck on - * it indefinitely. + * it indefinitely. (This is a little trickier than it looks: it + * works because BecomeRegisteredListener runs this code before we + * have made the first entry in localChannelTable.) */ - if (listenChannels == NIL) + if (LocalChannelTableIsEmpty()) continue; if (TransactionIdDidCommit(qe->xid)) @@ -2306,7 +3054,7 @@ ProcessIncomingNotify(bool flush) notifyInterruptPending = false; /* Do nothing else if we aren't actively listening */ - if (listenChannels == NIL) + if (LocalChannelTableIsEmpty()) return; if (Trace_notify) @@ -2410,7 +3158,7 @@ AddEventToPendingNotifies(Notification *n) { Assert(pendingNotifies->events != NIL); - /* Create the hash table if it's time to */ + /* Create the hash tables if it's time to */ if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES && pendingNotifies->hashtab == NULL) { @@ -2429,10 +3177,22 @@ AddEventToPendingNotifies(Notification *n) &hash_ctl, HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT); + /* Create the unique channel name table */ + Assert(pendingNotifies->uniqueChannelHash == NULL); + hash_ctl.keysize = NAMEDATALEN; + hash_ctl.entrysize = sizeof(ChannelName); + hash_ctl.hcxt = CurTransactionContext; + pendingNotifies->uniqueChannelHash = + hash_create("Pending Notify Channel Names", + 64L, + &hash_ctl, + HASH_ELEM | HASH_STRINGS | HASH_CONTEXT); + /* Insert all the already-existing events */ foreach(l, pendingNotifies->events) { Notification *oldn = (Notification *) lfirst(l); + char *channel = oldn->data; bool found; (void) hash_search(pendingNotifies->hashtab, @@ -2440,15 +3200,22 @@ AddEventToPendingNotifies(Notification *n) HASH_ENTER, &found); Assert(!found); + + /* Add channel name to uniqueChannelHash; might be there already */ + (void) hash_search(pendingNotifies->uniqueChannelHash, + channel, + HASH_ENTER, + NULL); } } /* Add new event to the list, in order */ pendingNotifies->events = lappend(pendingNotifies->events, n); - /* Add event to the hash table if needed */ + /* Add event to the hash tables if needed */ if (pendingNotifies->hashtab != NULL) { + char *channel = n->data; bool found; (void) hash_search(pendingNotifies->hashtab, @@ -2456,6 +3223,12 @@ AddEventToPendingNotifies(Notification *n) HASH_ENTER, &found); Assert(!found); + + /* Add channel name to uniqueChannelHash; might be there already */ + (void) hash_search(pendingNotifies->uniqueChannelHash, + channel, + HASH_ENTER, + NULL); } } @@ -2505,6 +3278,8 @@ ClearPendingActionsAndNotifies(void) */ pendingActions = NULL; pendingNotifies = NULL; + /* Also clear pendingListenActions, which is derived from pendingActions */ + pendingListenActions = NULL; } /* diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 3299de23bb3..7194aee3532 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -372,6 +372,7 @@ SubtransBuffer "Waiting for I/O on a sub-transaction SLRU buffer." MultiXactOffsetBuffer "Waiting for I/O on a multixact offset SLRU buffer." MultiXactMemberBuffer "Waiting for I/O on a multixact member SLRU buffer." NotifyBuffer "Waiting for I/O on a NOTIFY message SLRU buffer." +NotifyChannelHash "Waiting to access the NOTIFY channel hash table." SerialBuffer "Waiting for I/O on a serializable transaction conflict SLRU buffer." WALInsert "Waiting to insert WAL data into a memory buffer." BufferContent "Waiting to access a data page in memory." diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h index 94f818b9f10..e6b32daff99 100644 --- a/src/include/storage/lwlocklist.h +++ b/src/include/storage/lwlocklist.h @@ -102,6 +102,7 @@ PG_LWLOCKTRANCHE(SUBTRANS_BUFFER, SubtransBuffer) PG_LWLOCKTRANCHE(MULTIXACTOFFSET_BUFFER, MultiXactOffsetBuffer) PG_LWLOCKTRANCHE(MULTIXACTMEMBER_BUFFER, MultiXactMemberBuffer) PG_LWLOCKTRANCHE(NOTIFY_BUFFER, NotifyBuffer) +PG_LWLOCKTRANCHE(NOTIFY_CHANNEL_HASH, NotifyChannelHash) PG_LWLOCKTRANCHE(SERIAL_BUFFER, SerialBuffer) PG_LWLOCKTRANCHE(WAL_INSERT, WALInsert) PG_LWLOCKTRANCHE(BUFFER_CONTENT, BufferContent) diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out index 556e1805893..5d6bcce2b02 100644 --- a/src/test/isolation/expected/async-notify.out +++ b/src/test/isolation/expected/async-notify.out @@ -1,4 +1,4 @@ -Parsed test spec with 3 sessions +Parsed test spec with 7 sessions starting permutation: listenc notify1 notify2 notify3 notifyf step listenc: LISTEN c1; LISTEN c2; @@ -47,6 +47,115 @@ notifier: NOTIFY "c2" with payload "payload" from notifier notifier: NOTIFY "c1" with payload "payloads" from notifier notifier: NOTIFY "c2" with payload "payloads" from notifier +starting permutation: listenc notifys_simple +step listenc: LISTEN c1; LISTEN c2; +step notifys_simple: + BEGIN; + SAVEPOINT s1; + NOTIFY c1, 'simple1'; + NOTIFY c2, 'simple2'; + RELEASE SAVEPOINT s1; + COMMIT; + +notifier: NOTIFY "c1" with payload "simple1" from notifier +notifier: NOTIFY "c2" with payload "simple2" from notifier + +starting permutation: lsbegin lssavepoint lslisten lsrelease lscommit lsnotify +step lsbegin: BEGIN; +step lssavepoint: SAVEPOINT s1; +step lslisten: LISTEN c1; LISTEN c2; +step lsrelease: RELEASE SAVEPOINT s1; +step lscommit: COMMIT; +step lsnotify: NOTIFY c1, 'subxact_test'; +listen_subxact: NOTIFY "c1" with payload "subxact_test" from listen_subxact + +starting permutation: lsbegin lslisten_outer lssavepoint lslisten lsrelease lscommit lsnotify +step lsbegin: BEGIN; +step lslisten_outer: LISTEN c3; +step lssavepoint: SAVEPOINT s1; +step lslisten: LISTEN c1; LISTEN c2; +step lsrelease: RELEASE SAVEPOINT s1; +step lscommit: COMMIT; +step lsnotify: NOTIFY c1, 'subxact_test'; +listen_subxact: NOTIFY "c1" with payload "subxact_test" from listen_subxact + +starting permutation: lsbegin lssavepoint lslisten lsrollback lscommit lsnotify_check +step lsbegin: BEGIN; +step lssavepoint: SAVEPOINT s1; +step lslisten: LISTEN c1; LISTEN c2; +step lsrollback: ROLLBACK TO SAVEPOINT s1; +step lscommit: COMMIT; +step lsnotify_check: NOTIFY c1, 'should_not_receive'; + +starting permutation: lunlisten_all notify1 lcheck +step lunlisten_all: BEGIN; LISTEN c1; UNLISTEN *; COMMIT; +step notify1: NOTIFY c1; +step lcheck: SELECT 1 AS x; +x +- +1 +(1 row) + + +starting permutation: listenc notify_many_with_dup +step listenc: LISTEN c1; LISTEN c2; +step notify_many_with_dup: + BEGIN; + SELECT pg_notify('c1', 'msg' || s::text) FROM generate_series(1, 17) s; + SELECT pg_notify('c1', 'msg1'); + COMMIT; + +pg_notify +--------- + + + + + + + + + + + + + + + + + +(17 rows) + +pg_notify +--------- + +(1 row) + +notifier: NOTIFY "c1" with payload "msg1" from notifier +notifier: NOTIFY "c1" with payload "msg2" from notifier +notifier: NOTIFY "c1" with payload "msg3" from notifier +notifier: NOTIFY "c1" with payload "msg4" from notifier +notifier: NOTIFY "c1" with payload "msg5" from notifier +notifier: NOTIFY "c1" with payload "msg6" from notifier +notifier: NOTIFY "c1" with payload "msg7" from notifier +notifier: NOTIFY "c1" with payload "msg8" from notifier +notifier: NOTIFY "c1" with payload "msg9" from notifier +notifier: NOTIFY "c1" with payload "msg10" from notifier +notifier: NOTIFY "c1" with payload "msg11" from notifier +notifier: NOTIFY "c1" with payload "msg12" from notifier +notifier: NOTIFY "c1" with payload "msg13" from notifier +notifier: NOTIFY "c1" with payload "msg14" from notifier +notifier: NOTIFY "c1" with payload "msg15" from notifier +notifier: NOTIFY "c1" with payload "msg16" from notifier +notifier: NOTIFY "c1" with payload "msg17" from notifier + +starting permutation: listenc llisten l2listen l3listen lslisten +step listenc: LISTEN c1; LISTEN c2; +step llisten: LISTEN c1; LISTEN c2; +step l2listen: LISTEN c1; +step l3listen: LISTEN c1; +step lslisten: LISTEN c1; LISTEN c2; + starting permutation: llisten notify1 notify2 notify3 notifyf lcheck step llisten: LISTEN c1; LISTEN c2; step notify1: NOTIFY c1; @@ -95,6 +204,8 @@ listener: NOTIFY "c2" with payload "" from notifier starting permutation: l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop step l2listen: LISTEN c1; +listener2: NOTIFY "c1" with payload "" from notifier +listener2: NOTIFY "c1" with payload "" from notifier step l2begin: BEGIN; step notify1: NOTIFY c1; step lbegins: BEGIN ISOLATION LEVEL SERIALIZABLE; @@ -104,6 +215,17 @@ step l2commit: COMMIT; listener2: NOTIFY "c1" with payload "" from notifier step l2stop: UNLISTEN *; +starting permutation: lch_listen nch_notify lch_check +step lch_listen: LISTEN ch; +step nch_notify: NOTIFY ch, 'aa'; +step lch_check: SELECT 1 AS x; +x +- +1 +(1 row) + +listener_ch: NOTIFY "ch" with payload "aa" from notifier_ch + starting permutation: llisten lbegin usage bignotify usage step llisten: LISTEN c1; LISTEN c2; step lbegin: BEGIN; diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec index 0b8cfd91083..d09c2297f09 100644 --- a/src/test/isolation/specs/async-notify.spec +++ b/src/test/isolation/specs/async-notify.spec @@ -31,6 +31,20 @@ step notifys1 { ROLLBACK TO SAVEPOINT s2; COMMIT; } +step notifys_simple { + BEGIN; + SAVEPOINT s1; + NOTIFY c1, 'simple1'; + NOTIFY c2, 'simple2'; + RELEASE SAVEPOINT s1; + COMMIT; +} +step notify_many_with_dup { + BEGIN; + SELECT pg_notify('c1', 'msg' || s::text) FROM generate_series(1, 17) s; + SELECT pg_notify('c1', 'msg1'); + COMMIT; +} step usage { SELECT pg_notification_queue_usage() > 0 AS nonzero; } step bignotify { SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; } teardown { UNLISTEN *; } @@ -43,6 +57,7 @@ step lcheck { SELECT 1 AS x; } step lbegin { BEGIN; } step lbegins { BEGIN ISOLATION LEVEL SERIALIZABLE; } step lcommit { COMMIT; } +step lunlisten_all { BEGIN; LISTEN c1; UNLISTEN *; COMMIT; } teardown { UNLISTEN *; } # In some tests we need a second listener, just to block the queue. @@ -53,6 +68,38 @@ step l2begin { BEGIN; } step l2commit { COMMIT; } step l2stop { UNLISTEN *; } +# Third listener session for testing array growth. + +session listener3 +step l3listen { LISTEN c1; } +teardown { UNLISTEN *; } + +# Listener session for cross-session notification test with channel 'ch'. + +session listener_ch +step lch_listen { LISTEN ch; } +step lch_check { SELECT 1 AS x; } +teardown { UNLISTEN *; } + +# Notifier session for cross-session notification test with channel 'ch'. + +session notifier_ch +step nch_notify { NOTIFY ch, 'aa'; } + +# Session for testing LISTEN in subtransaction with separate steps. + +session listen_subxact +step lsbegin { BEGIN; } +step lslisten_outer { LISTEN c3; } +step lssavepoint { SAVEPOINT s1; } +step lslisten { LISTEN c1; LISTEN c2; } +step lsrelease { RELEASE SAVEPOINT s1; } +step lsrollback { ROLLBACK TO SAVEPOINT s1; } +step lscommit { COMMIT; } +step lsnotify { NOTIFY c1, 'subxact_test'; } +step lsnotify_check { NOTIFY c1, 'should_not_receive'; } +teardown { UNLISTEN *; } + # Trivial cases. permutation listenc notify1 notify2 notify3 notifyf @@ -60,6 +107,27 @@ permutation listenc notify1 notify2 notify3 notifyf # Check simple and less-simple deduplication. permutation listenc notifyd1 notifyd2 notifys1 +# Check simple NOTIFY reparenting when parent has no action. +permutation listenc notifys_simple + +# Check LISTEN reparenting in subtransaction. +permutation lsbegin lssavepoint lslisten lsrelease lscommit lsnotify + +# Check LISTEN merge path when both outer and inner transactions have actions. +permutation lsbegin lslisten_outer lssavepoint lslisten lsrelease lscommit lsnotify + +# Check LISTEN abort path (ROLLBACK TO SAVEPOINT discards pending actions). +permutation lsbegin lssavepoint lslisten lsrollback lscommit lsnotify_check + +# Check UNLISTEN * cancels a LISTEN in the same transaction. +permutation lunlisten_all notify1 lcheck + +# Check notification_match function (triggered by hash table duplicate detection). +permutation listenc notify_many_with_dup + +# Check ChannelHashAddListener array growth. +permutation listenc llisten l2listen l3listen lslisten + # Cross-backend notification delivery. We use a "select 1" to force the # listener session to check for notifies. In principle we could just wait # for delivery, but that would require extra support in isolationtester @@ -73,6 +141,10 @@ permutation listenc llisten notify1 notify2 notify3 notifyf lcheck # and notify queue is not empty permutation l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop +# Check that notifications sent from a backend that has not done LISTEN +# are properly delivered to a listener in another backend. +permutation lch_listen nch_notify lch_check + # Verify that pg_notification_queue_usage correctly reports a non-zero result, # after submitting notifications while another connection is listening for # those notifications and waiting inside an active transaction. We have to diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 14dec2d49c1..3f3a888fd0e 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -422,6 +422,7 @@ CatalogIdMapEntry CatalogIndexState ChangeVarNodes_callback ChangeVarNodes_context +ChannelName CheckPoint CheckPointStmt CheckpointStatsData @@ -1111,6 +1112,8 @@ GistSplitUnion GistSplitVector GistTsVectorOptions GistVacState +GlobalChannelEntry +GlobalChannelKey GlobalTransaction GlobalTransactionData GlobalVisHorizonKind @@ -1580,6 +1583,7 @@ ListParsedLex ListenAction ListenActionKind ListenStmt +ListenerEntry LoInfo LoadStmt LocalBufferLookupEnt @@ -2176,6 +2180,8 @@ PatternInfoArray Pattern_Prefix_Status Pattern_Type PendingFsyncEntry +PendingListenAction +PendingListenEntry PendingRelDelete PendingRelSync PendingUnlinkEntry