mirror of
https://github.com/postgres/postgres.git
synced 2025-10-24 01:29:19 +03:00
Optimize walsender wake up logic using condition variables
WalSndWakeup() currently loops through all the walsenders slots, with a
spinlock acquisition and release for every iteration, to wake up waiting
walsenders.
This commonly was not a problem before e101dfac3a
. But, to allow logical
decoding on standbys, we need to wake up logical walsenders after every WAL
record is applied on the standby, rather just when flushing WAL or switching
timelines. This causes a performance regression for workloads replaying a lot
of WAL records.
To solve this, we use condition variable (CV) to efficiently wake up
walsenders in WalSndWakeup().
Every walsender prepares to sleep on a shared memory CV. Note that it just
prepares to sleep on the CV (i.e., adds itself to the CV's waitlist), but does
not actually wait on the CV (IOW, it never calls ConditionVariableSleep()). It
still uses WaitEventSetWait() for waiting, because CV infrastructure doesn't
handle FeBe socket events currently. The processes (startup process,
walreceiver etc.) wanting to wake up walsenders use
ConditionVariableBroadcast(), which in turn calls SetLatch(), helping
walsenders come out of WaitEventSetWait().
We use separate shared memory CVs for physical and logical walsenders for
selective wake ups, see WalSndWakeup() for more details.
This approach is simple and reasonably efficient. But not very elegant. But
for 16 it seems to be a better path than a larger redesign of the CV
mechanism. A desirable future improvement would be to add support for CVs
into WaitEventSetWait().
This still leaves us with a small regression in very extreme workloads (due to
the spinlock acquisition in ConditionVariableBroadcast() when there are no
waiters) - but that seems acceptable.
Reported-by: Andres Freund <andres@anarazel.de>
Suggested-by: Andres Freund <andres@anarazel.de>
Author: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Reviewed-by: Zhijie Hou <houzj.fnst@fujitsu.com>
Discussion: https://www.postgresql.org/message-id/20230509190247.3rrplhdgem6su6cg%40awork3.anarazel.de
This commit is contained in:
@@ -3309,6 +3309,9 @@ WalSndShmemInit(void)
|
|||||||
|
|
||||||
SpinLockInit(&walsnd->mutex);
|
SpinLockInit(&walsnd->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ConditionVariableInit(&WalSndCtl->wal_flush_cv);
|
||||||
|
ConditionVariableInit(&WalSndCtl->wal_replay_cv);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -3330,31 +3333,17 @@ WalSndShmemInit(void)
|
|||||||
void
|
void
|
||||||
WalSndWakeup(bool physical, bool logical)
|
WalSndWakeup(bool physical, bool logical)
|
||||||
{
|
{
|
||||||
int i;
|
|
||||||
|
|
||||||
for (i = 0; i < max_wal_senders; i++)
|
|
||||||
{
|
|
||||||
Latch *latch;
|
|
||||||
ReplicationKind kind;
|
|
||||||
WalSnd *walsnd = &WalSndCtl->walsnds[i];
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Get latch pointer with spinlock held, for the unlikely case that
|
* Wake up all the walsenders waiting on WAL being flushed or replayed
|
||||||
* pointer reads aren't atomic (as they're 8 bytes). While at it, also
|
* respectively. Note that waiting walsender would have prepared to sleep
|
||||||
* get kind.
|
* on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
|
||||||
|
* before actually waiting.
|
||||||
*/
|
*/
|
||||||
SpinLockAcquire(&walsnd->mutex);
|
if (physical)
|
||||||
latch = walsnd->latch;
|
ConditionVariableBroadcast(&WalSndCtl->wal_flush_cv);
|
||||||
kind = walsnd->kind;
|
|
||||||
SpinLockRelease(&walsnd->mutex);
|
|
||||||
|
|
||||||
if (latch == NULL)
|
if (logical)
|
||||||
continue;
|
ConditionVariableBroadcast(&WalSndCtl->wal_replay_cv);
|
||||||
|
|
||||||
if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
|
|
||||||
(logical && kind == REPLICATION_KIND_LOGICAL))
|
|
||||||
SetLatch(latch);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@@ -3368,9 +3357,44 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
|
|||||||
WaitEvent event;
|
WaitEvent event;
|
||||||
|
|
||||||
ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
|
ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We use a condition variable to efficiently wake up walsenders in
|
||||||
|
* WalSndWakeup().
|
||||||
|
*
|
||||||
|
* Every walsender prepares to sleep on a shared memory CV. Note that it
|
||||||
|
* just prepares to sleep on the CV (i.e., adds itself to the CV's
|
||||||
|
* waitlist), but does not actually wait on the CV (IOW, it never calls
|
||||||
|
* ConditionVariableSleep()). It still uses WaitEventSetWait() for
|
||||||
|
* waiting, because we also need to wait for socket events. The processes
|
||||||
|
* (startup process, walreceiver etc.) wanting to wake up walsenders use
|
||||||
|
* ConditionVariableBroadcast(), which in turn calls SetLatch(), helping
|
||||||
|
* walsenders come out of WaitEventSetWait().
|
||||||
|
*
|
||||||
|
* This approach is simple and efficient because, one doesn't have to loop
|
||||||
|
* through all the walsenders slots, with a spinlock acquisition and
|
||||||
|
* release for every iteration, just to wake up only the waiting
|
||||||
|
* walsenders. It makes WalSndWakeup() callers' life easy.
|
||||||
|
*
|
||||||
|
* XXX: A desirable future improvement would be to add support for CVs
|
||||||
|
* into WaitEventSetWait().
|
||||||
|
*
|
||||||
|
* And, we use separate shared memory CVs for physical and logical
|
||||||
|
* walsenders for selective wake ups, see WalSndWakeup() for more details.
|
||||||
|
*/
|
||||||
|
if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
|
||||||
|
ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);
|
||||||
|
else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
|
||||||
|
ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv);
|
||||||
|
|
||||||
if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
|
if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
|
||||||
(event.events & WL_POSTMASTER_DEATH))
|
(event.events & WL_POSTMASTER_DEATH))
|
||||||
|
{
|
||||||
|
ConditionVariableCancelSleep();
|
||||||
proc_exit(1);
|
proc_exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
ConditionVariableCancelSleep();
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@@ -17,6 +17,7 @@
|
|||||||
#include "nodes/nodes.h"
|
#include "nodes/nodes.h"
|
||||||
#include "nodes/replnodes.h"
|
#include "nodes/replnodes.h"
|
||||||
#include "replication/syncrep.h"
|
#include "replication/syncrep.h"
|
||||||
|
#include "storage/condition_variable.h"
|
||||||
#include "storage/latch.h"
|
#include "storage/latch.h"
|
||||||
#include "storage/shmem.h"
|
#include "storage/shmem.h"
|
||||||
#include "storage/spin.h"
|
#include "storage/spin.h"
|
||||||
@@ -108,6 +109,10 @@ typedef struct
|
|||||||
*/
|
*/
|
||||||
bool sync_standbys_defined;
|
bool sync_standbys_defined;
|
||||||
|
|
||||||
|
/* used as a registry of physical / logical walsenders to wake */
|
||||||
|
ConditionVariable wal_flush_cv;
|
||||||
|
ConditionVariable wal_replay_cv;
|
||||||
|
|
||||||
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER];
|
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER];
|
||||||
} WalSndCtlData;
|
} WalSndCtlData;
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user