diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 4c53de08b9b..f433658a0d1 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -90,11 +90,14 @@ #include "utils/guc.h" #include "utils/memutils.h" #include "utils/pg_lsn.h" -#include "utils/portal.h" +#include "utils/pgstat_internal.h" #include "utils/ps_status.h" #include "utils/timeout.h" #include "utils/timestamp.h" +/* Minimum interval used by walsender for stats flushes, in ms */ +#define WALSENDER_STATS_FLUSH_INTERVAL 1000 + /* * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ. * @@ -1540,6 +1543,7 @@ WalSndWaitForWal(XLogRecPtr loc) { int wakeEvents; static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; + TimestampTz last_flush = 0; /* * Fast path to avoid acquiring the spinlock in case we already know we @@ -1559,6 +1563,7 @@ WalSndWaitForWal(XLogRecPtr loc) for (;;) { long sleeptime; + TimestampTz now; /* Clear any already-pending wakeups */ ResetLatch(MyLatch); @@ -1648,13 +1653,22 @@ WalSndWaitForWal(XLogRecPtr loc) * new WAL to be generated. (But if we have nothing to send, we don't * want to wake on socket-writable.) */ - sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); + now = GetCurrentTimestamp(); + sleeptime = WalSndComputeSleeptime(now); wakeEvents = WL_SOCKET_READABLE; if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; + /* Report IO statistics, if needed */ + if (TimestampDifferenceExceeds(last_flush, now, + WALSENDER_STATS_FLUSH_INTERVAL)) + { + pgstat_flush_io(false); + last_flush = now; + } + WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL); } @@ -2446,6 +2460,8 @@ WalSndCheckTimeOut(void) static void WalSndLoop(WalSndSendDataCallback send_data) { + TimestampTz last_flush = 0; + /* * Initialize the last reply timestamp. That enables timeout processing * from hereon. @@ -2540,6 +2556,9 @@ WalSndLoop(WalSndSendDataCallback send_data) * WalSndWaitForWal() handle any other blocking; idle receivers need * its additional actions. For physical replication, also block if * caught up; its send_data does not block. + * + * The IO statistics are reported in WalSndWaitForWal() for the + * logical WAL senders. */ if ((WalSndCaughtUp && send_data != XLogSendLogical && !streamingDoneSending) || @@ -2547,6 +2566,7 @@ WalSndLoop(WalSndSendDataCallback send_data) { long sleeptime; int wakeEvents; + TimestampTz now; if (!streamingDoneReceiving) wakeEvents = WL_SOCKET_READABLE; @@ -2557,11 +2577,20 @@ WalSndLoop(WalSndSendDataCallback send_data) * Use fresh timestamp, not last_processing, to reduce the chance * of reaching wal_sender_timeout before sending a keepalive. */ - sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); + now = GetCurrentTimestamp(); + sleeptime = WalSndComputeSleeptime(now); if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; + /* Report IO statistics, if needed */ + if (TimestampDifferenceExceeds(last_flush, now, + WALSENDER_STATS_FLUSH_INTERVAL)) + { + pgstat_flush_io(false); + last_flush = now; + } + /* Sleep until something happens or we time out */ WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN); }