diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index a0fc480646f..8b79d670012 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -624,6 +624,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i /* set output state */ ctx->accept_writes = false; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.startup_cb(ctx, opt, is_init); @@ -651,6 +652,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx) /* set output state */ ctx->accept_writes = false; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.shutdown_cb(ctx); @@ -686,6 +688,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->first_lsn; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.begin_cb(ctx, txn); @@ -717,6 +720,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; /* points to the end of the record */ + ctx->end_xact = true; /* do the actual work: call callback */ ctx->callbacks.commit_cb(ctx, txn, commit_lsn); @@ -756,6 +760,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; + ctx->end_xact = false; + ctx->callbacks.change_cb(ctx, txn, relation, change); /* Pop the error context stack */ @@ -796,6 +802,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; + ctx->end_xact = false; + ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change); /* Pop the error context stack */ @@ -822,6 +830,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) /* set output state */ ctx->accept_writes = false; + ctx->end_xact = false; /* do the actual work: 
call callback */ ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id); @@ -859,6 +868,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; ctx->write_location = message_lsn; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix, diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index d317fd70063..6710f983ea4 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -50,6 +50,7 @@ static bool publications_valid; static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); +static void update_replication_progress(LogicalDecodingContext *ctx); /* Entry in the map used to remember which relation schemas we sent. */ typedef struct RelationSyncEntry @@ -247,7 +248,7 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - OutputPluginUpdateProgress(ctx); + update_replication_progress(ctx); OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit(ctx->out, txn, commit_lsn); @@ -309,6 +310,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, MemoryContext old; RelationSyncEntry *relentry; + update_replication_progress(ctx); + if (!is_publishable_relation(relation)) return; @@ -389,6 +392,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelids; Oid *relids; + update_replication_progress(ctx); + old = MemoryContextSwitchTo(data->context); relids = palloc0(nrelations * sizeof(Oid)); @@ -660,3 +665,36 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL) entry->replicate_valid = false; } + +/* + * Try to update 
progress and send a keepalive message if too many changes were + * processed. + * + * For a large transaction, if we don't send any change to the downstream for a + * long time (exceeds the wal_receiver_timeout of standby) then it can time out. + * This can happen when all or most of the changes are not published. + */ +static void +update_replication_progress(LogicalDecodingContext *ctx) +{ + static int changes_count = 0; + + /* + * We don't want to try sending a keepalive message after processing each + * change as that can have overhead. Tests revealed that there is no + * noticeable overhead in doing it after continuously processing 100 or so + * changes. + */ +#define CHANGES_THRESHOLD 100 + + /* + * If we are at the end of transaction LSN, update progress tracking. + * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we + * try to send a keepalive message if required. + */ + if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD) + { + OutputPluginUpdateProgress(ctx); + changes_count = 0; + } +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3698135e493..c2c0c7df430 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -244,6 +244,7 @@ static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); static void ProcessRepliesIfAny(void); +static void ProcessPendingWrites(void); static void WalSndKeepalive(bool requestReply); static void WalSndKeepaliveIfNecessary(void); static void WalSndCheckTimeOut(void); @@ -1214,6 +1215,16 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, } /* If we have pending write here, go to slow path */ + ProcessPendingWrites(); +} + +/* + * Wait until there is no pending write. Also process replies from the other + * side and check timeouts during that. 
+ */ +static void +ProcessPendingWrites(void) +{ for (;;) { int wakeEvents; @@ -1273,18 +1284,35 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId { static TimestampTz sendTime = 0; TimestampTz now = GetCurrentTimestamp(); + bool end_xact = ctx->end_xact; /* * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to * avoid flooding the lag tracker when we commit frequently. + * + * We don't have a mechanism to get the ack for any LSN other than end + * xact LSN from the downstream. So, we track lag only for end of + * transaction LSN. */ #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000 - if (!TimestampDifferenceExceeds(sendTime, now, - WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS)) - return; + if (end_xact && TimestampDifferenceExceeds(sendTime, now, + WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS)) + { + LagTrackerWrite(lsn, now); + sendTime = now; + } - LagTrackerWrite(lsn, now); - sendTime = now; + /* + * Try to send a keepalive if required. We don't need to try sending + * keepalive messages at the transaction end as that will be done at a later + * point in time. This is required only for large transactions where we + * don't send any changes to the downstream and the receiver can time out + * due to that. + */ + if (!end_xact && + now >= TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2)) + ProcessPendingWrites(); } /* diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 31c796b7651..718080d54a7 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -50,6 +50,9 @@ typedef struct LogicalDecodingContext */ bool fast_forward; + /* Are we processing the end LSN of a transaction? */ + bool end_xact; + OutputPluginCallbacks callbacks; OutputPluginOptions options;