diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 1a58dd76497..c3ec97a0a62 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -93,6 +93,11 @@ static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *tx static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); +/* callback to update progress while a txn's changes are being processed */ +static void update_progress_txn_cb_wrapper(ReorderBuffer *cache, + ReorderBufferTXN *txn, + XLogRecPtr lsn); + static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin); /* @@ -278,6 +283,12 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->commit_prepared = commit_prepared_cb_wrapper; ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper; + /* + * Callback to support updating progress while the data of a + * transaction (and its subtransactions) is sent to the output plugin. 
+ */ + ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper; + ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; ctx->write = do_write; @@ -1584,6 +1595,45 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "update_progress_txn"; + state.report_location = lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state; accept_writes is turned off here */ + ctx->accept_writes = false; + ctx->write_xid = txn->xid; + + /* + * Report this change's lsn so replies from clients can give an up-to-date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. + */ + ctx->write_location = lsn; + + ctx->end_xact = false; + + OutputPluginUpdateProgress(ctx, false); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + /* * Set the required catalog xmin horizon for historic snapshots in the current * replication slot. 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 0468d12936f..d5f90a5f5d2 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2100,6 +2100,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, PG_TRY(); { ReorderBufferChange *change; + int changes_count = 0; /* changes processed since the last + * progress update */ if (using_subtxn) BeginInternalSubTransaction(streaming ? "stream" : "replay"); @@ -2440,6 +2442,24 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, elog(ERROR, "tuplecid value in changequeue"); break; } + + /* + * It is possible that the data is not sent to downstream for a + * long time either because the output plugin filtered it or there + * is a DDL that generates a lot of data that is not processed by + * the plugin. In such cases, the downstream can time out. To + * avoid that we try to send a keepalive message if required. + * Trying to send a keepalive message after every change has some + * overhead, but testing showed there is no noticeable overhead if + * we do it after every ~100 changes. 
+ */ +#define CHANGES_THRESHOLD 100 + + if (++changes_count >= CHANGES_THRESHOLD) + { + rb->update_progress_txn(rb, txn, change->lsn); + changes_count = 0; + } } /* speculative insertion record must be freed by now */ diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index e4938d8888f..73b080060da 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -92,8 +92,6 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid, static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); -static void update_replication_progress(LogicalDecodingContext *ctx, - bool skipped_xact); /* * Only 3 publication actions are used for row filtering ("insert", "update", @@ -586,7 +584,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * from this transaction has been sent to the downstream. */ sent_begin_txn = txndata->sent_begin_txn; - update_replication_progress(ctx, !sent_begin_txn); + OutputPluginUpdateProgress(ctx, !sent_begin_txn); pfree(txndata); txn->output_plugin_private = NULL; @@ -625,7 +623,7 @@ static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn) { - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_prepare(ctx->out, txn, prepare_lsn); @@ -639,7 +637,7 @@ static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn); @@ -655,7 +653,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time) { - update_replication_progress(ctx, 
false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn, @@ -1401,8 +1399,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TupleTableSlot *old_slot = NULL; TupleTableSlot *new_slot = NULL; - update_replication_progress(ctx, false); - if (!is_publishable_relation(relation)) return; @@ -1637,8 +1633,6 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Oid *relids; TransactionId xid = InvalidTransactionId; - update_replication_progress(ctx, false); - /* Remember the xid for the change in streaming mode. See pgoutput_change. */ if (in_streaming) xid = change->txn->xid; @@ -1702,8 +1696,6 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; TransactionId xid = InvalidTransactionId; - update_replication_progress(ctx, false); - if (!data->messages) return; @@ -1903,7 +1895,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, Assert(!in_streaming); Assert(rbtxn_is_streamed(txn)); - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_commit(ctx->out, txn, commit_lsn); @@ -1924,7 +1916,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, { Assert(rbtxn_is_streamed(txn)); - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn); OutputPluginWrite(ctx, true); @@ -2424,37 +2416,3 @@ send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, } } } - -/* - * Try to update progress and send a keepalive message if too many changes were - * processed. - * - * For a large transaction, if we don't send any change to the downstream for a - * long time (exceeds the wal_receiver_timeout of standby) then it can timeout. 
- * This can happen when all or most of the changes are either not published or - * got filtered out. - */ -static void -update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact) -{ - static int changes_count = 0; - - /* - * We don't want to try sending a keepalive message after processing each - * change as that can have overhead. Tests revealed that there is no - * noticeable overhead in doing it after continuously processing 100 or so - * changes. - */ -#define CHANGES_THRESHOLD 100 - - /* - * If we are at the end of transaction LSN, update progress tracking. - * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we - * try to send a keepalive message if required. - */ - if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD) - { - OutputPluginUpdateProgress(ctx, skipped_xact); - changes_count = 0; - } -} diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index e5db041df18..215d1494e90 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -526,6 +526,12 @@ typedef void (*ReorderBufferStreamTruncateCB) ( Relation relations[], ReorderBufferChange *change); +/* update progress txn callback signature */ +typedef void (*ReorderBufferUpdateProgressTxnCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr lsn); + struct ReorderBuffer { /* @@ -589,6 +595,12 @@ struct ReorderBuffer ReorderBufferStreamMessageCB stream_message; ReorderBufferStreamTruncateCB stream_truncate; + /* + * Callback to be called when updating progress while the data of a + * transaction (and its subtransactions) is sent to the output plugin. + */ + ReorderBufferUpdateProgressTxnCB update_progress_txn; + /* * Pointer that will be passed untouched to the callbacks. 
*/ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 07fbb7ccf6e..d3224dfc36e 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2311,6 +2311,7 @@ ReorderBufferToastEnt ReorderBufferTupleBuf ReorderBufferTupleCidEnt ReorderBufferTupleCidKey +ReorderBufferUpdateProgressTxnCB ReorderTuple RepOriginId ReparameterizeForeignPathByChild_function