1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-09 06:21:09 +03:00

Introduce latches. A latch is a boolean variable, with the capability to

wait until it is set. Latches can be used to reliably wait until a signal
arrives, which is hard otherwise because signals don't interrupt select()
on some platforms, and even when they do, there's race conditions.

On Unix, latches use the so called self-pipe trick under the covers to
implement the sleep until the latch is set, without race conditions. On
Windows, Windows events are used.

Use the new latch abstraction to sleep in walsender, so that as soon as
a transaction finishes, walsender is woken up to immediately send the WAL
to the standby. This reduces the latency between master and standby, which
is good.

Preliminary work by Fujii Masao. The latch implementation is by me, with
helpful comments from many people.
This commit is contained in:
Heikki Linnakangas
2010-09-11 15:48:04 +00:00
parent 81624db39a
commit 2746e5f21d
13 changed files with 928 additions and 39 deletions

View File

@@ -28,12 +28,13 @@
* Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.29 2010/07/22 13:03:11 rhaas Exp $
* $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.30 2010/09/11 15:48:04 heikki Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/xlog_internal.h"
@@ -66,9 +67,6 @@ bool am_walsender = false; /* Am I a walsender process ? */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int WalSndDelay = 200; /* max sleep time between some actions */
#define NAPTIME_PER_CYCLE 100000L /* max sleep time between cycles
* (100ms) */
/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
* but for walsender to read the XLOG.
@@ -93,6 +91,7 @@ static volatile sig_atomic_t ready_to_stop = false;
static void WalSndSigHupHandler(SIGNAL_ARGS);
static void WalSndShutdownHandler(SIGNAL_ARGS);
static void WalSndQuickDieHandler(SIGNAL_ARGS);
static void WalSndXLogSendHandler(SIGNAL_ARGS);
static void WalSndLastCycleHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
@@ -144,6 +143,16 @@ WalSenderMain(void)
/* Handle handshake messages before streaming */
WalSndHandshake();
/* Initialize shared memory status */
{
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = MyWalSnd;
SpinLockAcquire(&walsnd->mutex);
walsnd->sentPtr = sentPtr;
SpinLockRelease(&walsnd->mutex);
}
/* Main loop of walsender */
return WalSndLoop();
}
@@ -380,8 +389,6 @@ WalSndLoop(void)
/* Loop forever, unless we get an error */
for (;;)
{
long remain; /* remaining time (us) */
/*
* Emergency bailout if postmaster has died. This is to avoid the
* necessity for manual cleanup of all postmaster children.
@@ -421,32 +428,42 @@ WalSndLoop(void)
/*
* If we had sent all accumulated WAL in last round, nap for the
* configured time before retrying.
*
* On some platforms, signals won't interrupt the sleep. To ensure we
* respond reasonably promptly when someone signals us, break down the
* sleep into NAPTIME_PER_CYCLE increments, and check for interrupts
* after each nap.
*/
if (caughtup)
{
remain = WalSndDelay * 1000L;
while (remain > 0)
/*
* Even if we wrote all the WAL that was available when we started
* sending, more might have arrived while we were sending this
* batch. We had the latch set while sending, so we have not
* received any signals from that time. Let's arm the latch
* again, and after that check that we're still up-to-date.
*/
ResetLatch(&MyWalSnd->latch);
if (!XLogSend(output_message, &caughtup))
break;
if (caughtup && !got_SIGHUP && !ready_to_stop && !shutdown_requested)
{
/* Check for interrupts */
if (got_SIGHUP || shutdown_requested || ready_to_stop)
break;
/*
* XXX: We don't really need the periodic wakeups anymore,
* WaitLatchOrSocket should reliably wake up as soon as
* something interesting happens.
*/
/* Sleep and check that the connection is still alive */
pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
CheckClosedConnection();
remain -= NAPTIME_PER_CYCLE;
/* Sleep */
WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
WalSndDelay);
}
}
/* Attempt to send the log once every loop */
if (!XLogSend(output_message, &caughtup))
break;
/* Check if the connection was closed */
CheckClosedConnection();
}
else
{
/* Attempt to send the log once every loop */
if (!XLogSend(output_message, &caughtup))
break;
}
}
/*
@@ -493,10 +510,15 @@ InitWalSnd(void)
}
else
{
/* found */
MyWalSnd = (WalSnd *) walsnd;
/*
* Found a free slot. Take ownership of the latch and initialize
* the other fields.
*/
OwnLatch((Latch *) &walsnd->latch);
walsnd->pid = MyProcPid;
MemSet(&MyWalSnd->sentPtr, 0, sizeof(XLogRecPtr));
MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
/* Set MyWalSnd only after it's fully initialized. */
MyWalSnd = (WalSnd *) walsnd;
SpinLockRelease(&walsnd->mutex);
break;
}
@@ -523,6 +545,7 @@ WalSndKill(int code, Datum arg)
* for this.
*/
MyWalSnd->pid = 0;
DisownLatch(&MyWalSnd->latch);
/* WalSnd struct isn't mine anymore */
MyWalSnd = NULL;
@@ -787,6 +810,8 @@ static void
WalSndSigHupHandler(SIGNAL_ARGS)
{
got_SIGHUP = true;
if (MyWalSnd)
SetLatch(&MyWalSnd->latch);
}
/* SIGTERM: set flag to shut down */
@@ -794,6 +819,8 @@ static void
WalSndShutdownHandler(SIGNAL_ARGS)
{
shutdown_requested = true;
if (MyWalSnd)
SetLatch(&MyWalSnd->latch);
}
/*
@@ -828,11 +855,20 @@ WalSndQuickDieHandler(SIGNAL_ARGS)
exit(2);
}
/* SIGUSR1: set flag to send WAL records */
static void
WalSndXLogSendHandler(SIGNAL_ARGS)
{
latch_sigusr1_handler();
}
/* SIGUSR2: set flag to do a last cycle and shut down afterwards */
static void
WalSndLastCycleHandler(SIGNAL_ARGS)
{
ready_to_stop = true;
if (MyWalSnd)
SetLatch(&MyWalSnd->latch);
}
/* Set up signal handlers */
@@ -847,7 +883,7 @@ WalSndSignals(void)
pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGUSR1, SIG_IGN); /* not used */
pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
* shutdown */
@@ -891,10 +927,21 @@ WalSndShmemInit(void)
WalSnd *walsnd = &WalSndCtl->walsnds[i];
SpinLockInit(&walsnd->mutex);
InitSharedLatch(&walsnd->latch);
}
}
}
/* Wake up all walsenders */
void
WalSndWakeup(void)
{
int i;
for (i = 0; i < max_wal_senders; i++)
SetLatch(&WalSndCtl->walsnds[i].latch);
}
/*
* This isn't currently used for anything. Monitoring tools might be
* interested in the future, and we'll need something like this in the