diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 8185e18c718..7b861e73381 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2168,7 +2168,7 @@ WalSndLoop(WalSndSendDataCallback send_data) if (MyWalSnd->state == WALSNDSTATE_CATCHUP) { ereport(DEBUG1, - (errmsg("standby \"%s\" has now caught up with primary", + (errmsg("\"%s\" has now caught up with upstream server", application_name))); WalSndSetState(WALSNDSTATE_STREAMING); } @@ -2757,10 +2757,10 @@ XLogSendLogical(void) char *errm; /* - * Don't know whether we've caught up yet. We'll set it to true in - * WalSndWaitForWal, if we're actually waiting. We also set to true if - * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait - - * i.e. when we're shutting down. + * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to + * true in WalSndWaitForWal, if we're actually waiting. We also set to + * true if XLogReadRecord() had to stop reading but WalSndWaitForWal + * didn't wait - i.e. when we're shutting down. */ WalSndCaughtUp = false; @@ -2773,6 +2773,9 @@ XLogSendLogical(void) if (record != NULL) { + /* XXX: Note that logical decoding cannot be used while in recovery */ + XLogRecPtr flushPtr = GetFlushRecPtr(); + /* * Note the lack of any call to LagTrackerWrite() which is handled by * WalSndUpdateProgress which is called by output plugin through @@ -2781,6 +2784,13 @@ XLogSendLogical(void) LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader); sentPtr = logical_decoding_ctx->reader->EndRecPtr; + + /* + * If we have sent a record that is at or beyond the flushed point, we + * have caught up. + */ + if (sentPtr >= flushPtr) + WalSndCaughtUp = true; } else {