diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 3876c0188df..e95398db05a 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -122,8 +122,8 @@ typedef enum WalRcvWakeupReason WALRCV_WAKEUP_TERMINATE, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_REPLY, - WALRCV_WAKEUP_HSFEEDBACK, - NUM_WALRCV_WAKEUPS + WALRCV_WAKEUP_HSFEEDBACK +#define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1) } WalRcvWakeupReason; /* @@ -206,8 +206,6 @@ WalReceiverMain(void) */ Assert(walrcv != NULL); - now = GetCurrentTimestamp(); - /* * Mark walreceiver as running in shared memory. * @@ -261,6 +259,7 @@ WalReceiverMain(void) Assert(!is_temp_slot || (slotname[0] == '\0')); /* Initialise to a sanish value */ + now = GetCurrentTimestamp(); walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now; @@ -464,6 +463,7 @@ WalReceiverMain(void) { ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); + /* recompute wakeup times */ now = GetCurrentTimestamp(); for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i) WalRcvComputeNextWakeup(i, now); @@ -472,7 +472,6 @@ WalReceiverMain(void) /* See if we can read data immediately */ len = walrcv_receive(wrconn, &buf, &wait_fd); - now = GetCurrentTimestamp(); if (len != 0) { /* @@ -487,6 +486,7 @@ WalReceiverMain(void) * Something was received from primary, so adjust * the ping and terminate wakeup times. */ + now = GetCurrentTimestamp(); WalRcvComputeNextWakeup(WALRCV_WAKEUP_TERMINATE, now); WalRcvComputeNextWakeup(WALRCV_WAKEUP_PING, now); @@ -506,7 +506,6 @@ WalReceiverMain(void) break; } len = walrcv_receive(wrconn, &buf, &wait_fd); - now = GetCurrentTimestamp(); } /* Let the primary know that we received some data. */ @@ -525,7 +524,7 @@ WalReceiverMain(void) break; /* Find the soonest wakeup time, to limit our nap. */ - nextWakeup = PG_INT64_MAX; + nextWakeup = TIMESTAMP_INFINITY; for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i) nextWakeup = Min(wakeup[i], nextWakeup); @@ -536,6 +535,7 @@ WalReceiverMain(void) * millisecond to avoid waking up too early and spinning until * one of the wakeup times. */ + now = GetCurrentTimestamp(); nap = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000)); /* @@ -556,7 +556,6 @@ WalReceiverMain(void) wait_fd, nap, WAIT_EVENT_WAL_RECEIVER_MAIN); - now = GetCurrentTimestamp(); if (rc & WL_LATCH_SET) { ResetLatch(MyLatch); @@ -592,19 +591,20 @@ WalReceiverMain(void) * Check if time since last receive from primary has * reached the configured limit. */ + now = GetCurrentTimestamp(); if (now >= wakeup[WALRCV_WAKEUP_TERMINATE]) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("terminating walreceiver due to timeout"))); /* - * We didn't receive anything new, for half of receiver - * replication timeout. Ping the server. + * If we didn't receive anything new for half of receiver + * replication timeout, then ping the server. */ if (now >= wakeup[WALRCV_WAKEUP_PING]) { requestReply = true; - wakeup[WALRCV_WAKEUP_PING] = PG_INT64_MAX; + wakeup[WALRCV_WAKEUP_PING] = TIMESTAMP_INFINITY; } XLogWalRcvSendReply(requestReply, requestReply); @@ -1266,7 +1266,6 @@ static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) { WalRcvData *walrcv = WalRcv; - TimestampTz lastMsgReceiptTime = GetCurrentTimestamp(); /* Update shared-memory status */ @@ -1310,7 +1309,10 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) /* * Compute the next wakeup time for a given wakeup reason. Can be called to * initialize a wakeup time, to adjust it for the next wakeup, or to - * reinitialize it when GUCs have changed. + * reinitialize it when GUCs have changed. We ask the caller to pass in the + * value of "now" because this frequently avoids multiple calls of + * GetCurrentTimestamp(). It had better be a reasonably up-to-date value + * though. */ static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now) @@ -1319,30 +1321,29 @@ WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now) { case WALRCV_WAKEUP_TERMINATE: if (wal_receiver_timeout <= 0) - wakeup[reason] = PG_INT64_MAX; + wakeup[reason] = TIMESTAMP_INFINITY; else - wakeup[reason] = now + wal_receiver_timeout * INT64CONST(1000); + wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout); break; case WALRCV_WAKEUP_PING: if (wal_receiver_timeout <= 0) - wakeup[reason] = PG_INT64_MAX; + wakeup[reason] = TIMESTAMP_INFINITY; else - wakeup[reason] = now + (wal_receiver_timeout / 2) * INT64CONST(1000); + wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2); break; case WALRCV_WAKEUP_HSFEEDBACK: if (!hot_standby_feedback || wal_receiver_status_interval <= 0) - wakeup[reason] = PG_INT64_MAX; + wakeup[reason] = TIMESTAMP_INFINITY; else - wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000); + wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval); break; case WALRCV_WAKEUP_REPLY: if (wal_receiver_status_interval <= 0) - wakeup[reason] = PG_INT64_MAX; + wakeup[reason] = TIMESTAMP_INFINITY; else - wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000); - break; - default: + wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval); break; + /* there's intentionally no default: here */ } } diff --git a/src/include/datatype/timestamp.h b/src/include/datatype/timestamp.h index 21a37e21e9b..ab8ccf89ca9 100644 --- a/src/include/datatype/timestamp.h +++ b/src/include/datatype/timestamp.h @@ -143,10 +143,17 @@ struct pg_itm_in #define TZDISP_LIMIT ((MAX_TZDISP_HOUR + 1) * SECS_PER_HOUR) /* - * DT_NOBEGIN represents timestamp -infinity; DT_NOEND represents +infinity + * We reserve the minimum and maximum integer values to represent + * timestamp (or timestamptz) -infinity and +infinity. */ -#define DT_NOBEGIN PG_INT64_MIN -#define DT_NOEND PG_INT64_MAX +#define TIMESTAMP_MINUS_INFINITY PG_INT64_MIN +#define TIMESTAMP_INFINITY PG_INT64_MAX + +/* + * Historically these alias for infinity have been used. + */ +#define DT_NOBEGIN TIMESTAMP_MINUS_INFINITY +#define DT_NOEND TIMESTAMP_INFINITY #define TIMESTAMP_NOBEGIN(j) \ do {(j) = DT_NOBEGIN;} while (0) diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h index 42f802bb9dc..edd59dc4322 100644 --- a/src/include/utils/timestamp.h +++ b/src/include/utils/timestamp.h @@ -81,7 +81,9 @@ IntervalPGetDatum(const Interval *X) #define INTERVAL_PRECISION(t) ((t) & INTERVAL_PRECISION_MASK) #define INTERVAL_RANGE(t) (((t) >> 16) & INTERVAL_RANGE_MASK) +/* Macros for doing timestamp arithmetic without assuming timestamp's units */ #define TimestampTzPlusMilliseconds(tz,ms) ((tz) + ((ms) * (int64) 1000)) +#define TimestampTzPlusSeconds(tz,s) ((tz) + ((s) * (int64) 1000000)) /* Set at postmaster start */