diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 6cb2d445f0d..f26269b5eae 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -218,6 +218,7 @@ typedef struct QueueBackendStatus { int32 pid; /* either a PID or InvalidPid */ Oid dboid; /* backend's database OID, or InvalidOid */ + BackendId nextListener; /* id of next listener, or InvalidBackendId */ QueuePosition pos; /* backend has read queue up to here */ } QueueBackendStatus; @@ -241,12 +242,19 @@ typedef struct QueueBackendStatus * Each backend uses the backend[] array entry with index equal to its * BackendId (which can range from 1 to MaxBackends). We rely on this to make * SendProcSignal fast. + * + * The backend[] array entries for actively-listening backends are threaded + * together using firstListener and the nextListener links, so that we can + * scan them without having to iterate over inactive entries. We keep this + * list in order by BackendId so that the scan is cache-friendly when there + * are many active entries. */ typedef struct AsyncQueueControl { QueuePosition head; /* head points to the next free location */ QueuePosition tail; /* the global tail is equivalent to the pos of * the "slowest" backend */ + BackendId firstListener; /* id of first listener, or InvalidBackendId */ TimestampTz lastQueueFillWarn; /* time of last queue-full msg */ QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]; /* backend[0] is not used; used entries are from [1] to [MaxBackends] */ @@ -256,8 +264,10 @@ static AsyncQueueControl *asyncQueueControl; #define QUEUE_HEAD (asyncQueueControl->head) #define QUEUE_TAIL (asyncQueueControl->tail) +#define QUEUE_FIRST_LISTENER (asyncQueueControl->firstListener) #define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid) #define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid) +#define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener) #define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos) /* @@ -490,16 +500,16 @@ AsyncShmemInit(void) if (!found) { /* First time through, so initialize it */ - int i; - SET_QUEUE_POS(QUEUE_HEAD, 0, 0); SET_QUEUE_POS(QUEUE_TAIL, 0, 0); + QUEUE_FIRST_LISTENER = InvalidBackendId; asyncQueueControl->lastQueueFillWarn = 0; /* zero'th entry won't be used, but let's initialize it anyway */ - for (i = 0; i <= MaxBackends; i++) + for (int i = 0; i <= MaxBackends; i++) { QUEUE_BACKEND_PID(i) = InvalidPid; QUEUE_BACKEND_DBOID(i) = InvalidOid; + QUEUE_NEXT_LISTENER(i) = InvalidBackendId; SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0); } } @@ -959,7 +969,7 @@ Exec_ListenPreCommit(void) { QueuePosition head; QueuePosition max; - int i; + BackendId prevListener; /* * Nothing to do if we are already listening to something, nor if we @@ -996,26 +1006,37 @@ Exec_ListenPreCommit(void) * our database; any notifications it's already advanced over are surely * committed and need not be re-examined by us. (We must consider only * backends connected to our DB, because others will not have bothered to - * check committed-ness of notifications in our DB.) But we only bother - * with that if there's more than a page worth of notifications - * outstanding, otherwise scanning all the other backends isn't worth it. + * check committed-ness of notifications in our DB.) * - * We need exclusive lock here so we can look at other backends' entries. + * We need exclusive lock here so we can look at other backends' entries + * and manipulate the list links. */ LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); head = QUEUE_HEAD; max = QUEUE_TAIL; - if (QUEUE_POS_PAGE(max) != QUEUE_POS_PAGE(head)) + prevListener = InvalidBackendId; + for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) { - for (i = 1; i <= MaxBackends; i++) - { - if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) - max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i)); - } + if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) + max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i)); + /* Also find last listening backend before this one */ + if (i < MyBackendId) + prevListener = i; } QUEUE_BACKEND_POS(MyBackendId) = max; QUEUE_BACKEND_PID(MyBackendId) = MyProcPid; QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId; + /* Insert backend into list of listeners at correct position */ + if (prevListener > 0) + { + QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_NEXT_LISTENER(prevListener); + QUEUE_NEXT_LISTENER(prevListener) = MyBackendId; + } + else + { + QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_FIRST_LISTENER; + QUEUE_FIRST_LISTENER = MyBackendId; + } LWLockRelease(AsyncQueueLock); /* Now we are listed in the global array, so remember we're listening */ @@ -1228,13 +1249,31 @@ asyncQueueUnregister(void) if (!amRegisteredListener) /* nothing to do */ return; - LWLockAcquire(AsyncQueueLock, LW_SHARED); + /* + * Need exclusive lock here to manipulate list links. + */ + LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); /* check if entry is valid and oldest ... */ advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) && QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL); /* ... then mark it invalid */ QUEUE_BACKEND_PID(MyBackendId) = InvalidPid; QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid; + /* and remove it from the list */ + if (QUEUE_FIRST_LISTENER == MyBackendId) + QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyBackendId); + else + { + for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) + { + if (QUEUE_NEXT_LISTENER(i) == MyBackendId) + { + QUEUE_NEXT_LISTENER(i) = QUEUE_NEXT_LISTENER(MyBackendId); + break; + } + } + } + QUEUE_NEXT_LISTENER(MyBackendId) = InvalidBackendId; LWLockRelease(AsyncQueueLock); /* mark ourselves as no longer listed in the global array */ @@ -1508,16 +1547,13 @@ asyncQueueFillWarning(void) { QueuePosition min = QUEUE_HEAD; int32 minPid = InvalidPid; - int i; - for (i = 1; i <= MaxBackends; i++) + for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) { - if (QUEUE_BACKEND_PID(i) != InvalidPid) - { - min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); - if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i))) - minPid = QUEUE_BACKEND_PID(i); - } + Assert(QUEUE_BACKEND_PID(i) != InvalidPid); + min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); + if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i))) + minPid = QUEUE_BACKEND_PID(i); } ereport(WARNING, @@ -1553,7 +1589,6 @@ SignalBackends(void) int32 *pids; BackendId *ids; int count; - int i; int32 pid; /* @@ -1570,10 +1605,11 @@ SignalBackends(void) count = 0; LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); - for (i = 1; i <= MaxBackends; i++) + for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) { pid = QUEUE_BACKEND_PID(i); - if (pid != InvalidPid && pid != MyProcPid) + Assert(pid != InvalidPid); + if (pid != MyProcPid) { QueuePosition pos = QUEUE_BACKEND_POS(i); @@ -1588,7 +1624,7 @@ SignalBackends(void) LWLockRelease(AsyncQueueLock); /* Now send signals */ - for (i = 0; i < count; i++) + for (int i = 0; i < count; i++) { pid = pids[i]; @@ -2064,17 +2100,16 @@ static void asyncQueueAdvanceTail(void) { QueuePosition min; - int i; int oldtailpage; int newtailpage; int boundary; LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); min = QUEUE_HEAD; - for (i = 1; i <= MaxBackends; i++) + for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) { - if (QUEUE_BACKEND_PID(i) != InvalidPid) - min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); + Assert(QUEUE_BACKEND_PID(i) != InvalidPid); + min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); } oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL); QUEUE_TAIL = min;