1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-24 01:29:19 +03:00

Suppress useless wakeups in walreceiver.

Instead of waking up 10 times per second to check for various timeout
conditions, keep track of when we next have periodic work to do.

Author: Thomas Munro <thomas.munro@gmail.com>
Author: Nathan Bossart <nathandbossart@gmail.com>
Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
Discussion: https://postgr.es/m/CA%2BhUKGJGhX4r2LPUE3Oy9BX71Eum6PBcS8L3sJpScR9oKaTVaA%40mail.gmail.com
This commit is contained in:
Thomas Munro
2022-11-08 20:36:36 +13:00
parent bd95816f74
commit 05a7be9355
2 changed files with 115 additions and 57 deletions

View File

@@ -95,8 +95,6 @@ bool hot_standby_feedback;
static WalReceiverConn *wrconn = NULL; static WalReceiverConn *wrconn = NULL;
WalReceiverFunctionsType *WalReceiverFunctions = NULL; WalReceiverFunctionsType *WalReceiverFunctions = NULL;
#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
/* /*
* These variables are used similarly to openLogFile/SegNo, * These variables are used similarly to openLogFile/SegNo,
* but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
@@ -116,6 +114,23 @@ static struct
XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
} LogstreamResult; } LogstreamResult;
/*
* Reasons to wake up and perform periodic tasks.
*/
typedef enum WalRcvWakeupReason
{
WALRCV_WAKEUP_TERMINATE,
WALRCV_WAKEUP_PING,
WALRCV_WAKEUP_REPLY,
WALRCV_WAKEUP_HSFEEDBACK,
NUM_WALRCV_WAKEUPS
} WalRcvWakeupReason;
/*
* Wake up times for periodic tasks.
*/
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS];
static StringInfoData reply_message; static StringInfoData reply_message;
static StringInfoData incoming_message; static StringInfoData incoming_message;
@@ -132,6 +147,7 @@ static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
static void XLogWalRcvSendReply(bool force, bool requestReply); static void XLogWalRcvSendReply(bool force, bool requestReply);
static void XLogWalRcvSendHSFeedback(bool immed); static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
/* /*
* Process any interrupts the walreceiver process may have received. * Process any interrupts the walreceiver process may have received.
@@ -179,9 +195,7 @@ WalReceiverMain(void)
TimeLineID primaryTLI; TimeLineID primaryTLI;
bool first_stream; bool first_stream;
WalRcvData *walrcv = WalRcv; WalRcvData *walrcv = WalRcv;
TimestampTz last_recv_timestamp; TimestampTz now;
TimestampTz starttime;
bool ping_sent;
char *err; char *err;
char *sender_host = NULL; char *sender_host = NULL;
int sender_port = 0; int sender_port = 0;
@@ -192,7 +206,7 @@ WalReceiverMain(void)
*/ */
Assert(walrcv != NULL); Assert(walrcv != NULL);
starttime = GetCurrentTimestamp(); now = GetCurrentTimestamp();
/* /*
* Mark walreceiver as running in shared memory. * Mark walreceiver as running in shared memory.
@@ -248,7 +262,7 @@ WalReceiverMain(void)
/* Initialise to a sanish value */ /* Initialise to a sanish value */
walrcv->lastMsgSendTime = walrcv->lastMsgSendTime =
walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = starttime; walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
/* Report the latch to use to awaken this process */ /* Report the latch to use to awaken this process */
walrcv->latch = &MyProc->procLatch; walrcv->latch = &MyProc->procLatch;
@@ -414,9 +428,10 @@ WalReceiverMain(void)
initStringInfo(&reply_message); initStringInfo(&reply_message);
initStringInfo(&incoming_message); initStringInfo(&incoming_message);
/* Initialize the last recv timestamp */ /* Initialize nap wakeup times. */
last_recv_timestamp = GetCurrentTimestamp(); now = GetCurrentTimestamp();
ping_sent = false; for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
WalRcvComputeNextWakeup(i, now);
/* Loop until end-of-streaming or error */ /* Loop until end-of-streaming or error */
for (;;) for (;;)
@@ -426,6 +441,8 @@ WalReceiverMain(void)
bool endofwal = false; bool endofwal = false;
pgsocket wait_fd = PGINVALID_SOCKET; pgsocket wait_fd = PGINVALID_SOCKET;
int rc; int rc;
TimestampTz nextWakeup;
int nap;
/* /*
* Exit walreceiver if we're not in recovery. This should not * Exit walreceiver if we're not in recovery. This should not
@@ -443,11 +460,15 @@ WalReceiverMain(void)
{ {
ConfigReloadPending = false; ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP); ProcessConfigFile(PGC_SIGHUP);
now = GetCurrentTimestamp();
for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
WalRcvComputeNextWakeup(i, now);
XLogWalRcvSendHSFeedback(true); XLogWalRcvSendHSFeedback(true);
} }
/* See if we can read data immediately */ /* See if we can read data immediately */
len = walrcv_receive(wrconn, &buf, &wait_fd); len = walrcv_receive(wrconn, &buf, &wait_fd);
now = GetCurrentTimestamp();
if (len != 0) if (len != 0)
{ {
/* /*
@@ -459,11 +480,12 @@ WalReceiverMain(void)
if (len > 0) if (len > 0)
{ {
/* /*
* Something was received from primary, so reset * Something was received from primary, so adjust
* timeout * the ping and terminate wakeup times.
*/ */
last_recv_timestamp = GetCurrentTimestamp(); WalRcvComputeNextWakeup(WALRCV_WAKEUP_TERMINATE,
ping_sent = false; now);
WalRcvComputeNextWakeup(WALRCV_WAKEUP_PING, now);
XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1, XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
startpointTLI); startpointTLI);
} }
@@ -480,6 +502,7 @@ WalReceiverMain(void)
break; break;
} }
len = walrcv_receive(wrconn, &buf, &wait_fd); len = walrcv_receive(wrconn, &buf, &wait_fd);
now = GetCurrentTimestamp();
} }
/* Let the primary know that we received some data. */ /* Let the primary know that we received some data. */
@@ -497,6 +520,20 @@ WalReceiverMain(void)
if (endofwal) if (endofwal)
break; break;
/* Find the soonest wakeup time, to limit our nap. */
nextWakeup = PG_INT64_MAX;
for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
nextWakeup = Min(wakeup[i], nextWakeup);
/*
* Calculate the nap time. WaitLatchOrSocket() doesn't accept
* timeouts longer than INT_MAX milliseconds, so we limit the
* result accordingly. Also, we round up to the next
* millisecond to avoid waking up too early and spinning until
* one of the wakeup times.
*/
nap = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000));
/* /*
* Ideally we would reuse a WaitEventSet object repeatedly * Ideally we would reuse a WaitEventSet object repeatedly
* here to avoid the overheads of WaitLatchOrSocket on epoll * here to avoid the overheads of WaitLatchOrSocket on epoll
@@ -513,8 +550,9 @@ WalReceiverMain(void)
WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
WL_TIMEOUT | WL_LATCH_SET, WL_TIMEOUT | WL_LATCH_SET,
wait_fd, wait_fd,
NAPTIME_PER_CYCLE, nap,
WAIT_EVENT_WAL_RECEIVER_MAIN); WAIT_EVENT_WAL_RECEIVER_MAIN);
now = GetCurrentTimestamp();
if (rc & WL_LATCH_SET) if (rc & WL_LATCH_SET)
{ {
ResetLatch(MyLatch); ResetLatch(MyLatch);
@@ -550,34 +588,19 @@ WalReceiverMain(void)
* Check if time since last receive from primary has * Check if time since last receive from primary has
* reached the configured limit. * reached the configured limit.
*/ */
if (wal_receiver_timeout > 0) if (now >= wakeup[WALRCV_WAKEUP_TERMINATE])
{
TimestampTz now = GetCurrentTimestamp();
TimestampTz timeout;
timeout =
TimestampTzPlusMilliseconds(last_recv_timestamp,
wal_receiver_timeout);
if (now >= timeout)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE), (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("terminating walreceiver due to timeout"))); errmsg("terminating walreceiver due to timeout")));
/* /*
* We didn't receive anything new, for half of * We didn't receive anything new, for half of receiver
* receiver replication timeout. Ping the server. * replication timeout. Ping the server.
*/ */
if (!ping_sent) if (now >= wakeup[WALRCV_WAKEUP_PING])
{
timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
(wal_receiver_timeout / 2));
if (now >= timeout)
{ {
requestReply = true; requestReply = true;
ping_sent = true; wakeup[WALRCV_WAKEUP_PING] = PG_INT64_MAX;
}
}
} }
XLogWalRcvSendReply(requestReply, requestReply); XLogWalRcvSendReply(requestReply, requestReply);
@@ -1076,7 +1099,6 @@ XLogWalRcvSendReply(bool force, bool requestReply)
static XLogRecPtr writePtr = 0; static XLogRecPtr writePtr = 0;
static XLogRecPtr flushPtr = 0; static XLogRecPtr flushPtr = 0;
XLogRecPtr applyPtr; XLogRecPtr applyPtr;
static TimestampTz sendTime = 0;
TimestampTz now; TimestampTz now;
/* /*
@@ -1101,10 +1123,11 @@ XLogWalRcvSendReply(bool force, bool requestReply)
if (!force if (!force
&& writePtr == LogstreamResult.Write && writePtr == LogstreamResult.Write
&& flushPtr == LogstreamResult.Flush && flushPtr == LogstreamResult.Flush
&& !TimestampDifferenceExceeds(sendTime, now, && now < wakeup[WALRCV_WAKEUP_REPLY])
wal_receiver_status_interval * 1000))
return; return;
sendTime = now;
/* Make sure we wake up when it's time to send another reply. */
WalRcvComputeNextWakeup(WALRCV_WAKEUP_REPLY, now);
/* Construct a new message */ /* Construct a new message */
writePtr = LogstreamResult.Write; writePtr = LogstreamResult.Write;
@@ -1149,7 +1172,6 @@ XLogWalRcvSendHSFeedback(bool immed)
catalog_xmin_epoch; catalog_xmin_epoch;
TransactionId xmin, TransactionId xmin,
catalog_xmin; catalog_xmin;
static TimestampTz sendTime = 0;
/* initially true so we always send at least one feedback message */ /* initially true so we always send at least one feedback message */
static bool primary_has_standby_xmin = true; static bool primary_has_standby_xmin = true;
@@ -1165,16 +1187,12 @@ XLogWalRcvSendHSFeedback(bool immed)
/* Get current timestamp. */ /* Get current timestamp. */
now = GetCurrentTimestamp(); now = GetCurrentTimestamp();
if (!immed) /* Send feedback at most once per wal_receiver_status_interval. */
{ if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK])
/*
* Send feedback at most once per wal_receiver_status_interval.
*/
if (!TimestampDifferenceExceeds(sendTime, now,
wal_receiver_status_interval * 1000))
return; return;
sendTime = now;
} /* Make sure we wake up when it's time to send feedback again. */
WalRcvComputeNextWakeup(WALRCV_WAKEUP_HSFEEDBACK, now);
/* /*
* If Hot Standby is not yet accepting connections there is nothing to * If Hot Standby is not yet accepting connections there is nothing to
@@ -1285,6 +1303,45 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
} }
} }
/*
* Compute the next wakeup time for a given wakeup reason. Can be called to
* initialize a wakeup time, to adjust it for the next wakeup, or to
* reinitialize it when GUCs have changed.
*/
static void
WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
{
switch (reason)
{
case WALRCV_WAKEUP_TERMINATE:
if (wal_receiver_timeout <= 0)
wakeup[reason] = PG_INT64_MAX;
else
wakeup[reason] = now + wal_receiver_timeout * INT64CONST(1000);
break;
case WALRCV_WAKEUP_PING:
if (wal_receiver_timeout <= 0)
wakeup[reason] = PG_INT64_MAX;
else
wakeup[reason] = now + (wal_receiver_timeout / 2) * INT64CONST(1000);
break;
case WALRCV_WAKEUP_HSFEEDBACK:
if (!hot_standby_feedback || wal_receiver_status_interval <= 0)
wakeup[reason] = PG_INT64_MAX;
else
wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
break;
case WALRCV_WAKEUP_REPLY:
if (wal_receiver_status_interval <= 0)
wakeup[reason] = PG_INT64_MAX;
else
wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
break;
default:
break;
}
}
/* /*
* Wake up the walreceiver main loop. * Wake up the walreceiver main loop.
* *

View File

@@ -2927,6 +2927,7 @@ WALInsertLock
WALInsertLockPadded WALInsertLockPadded
WALOpenSegment WALOpenSegment
WALReadError WALReadError
WalRcvWakeupReason
WALSegmentCloseCB WALSegmentCloseCB
WALSegmentContext WALSegmentContext
WALSegmentOpenCB WALSegmentOpenCB