diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 886e626be80..370cdc2e1a1 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2708,10 +2708,10 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
stream_bytesbigint
- Amount of decoded in-progress transaction data streamed to the decoding
- output plugin while decoding changes from WAL for this slot. This and other
- streaming counters for this slot can be used to gauge the network I/O which
- occurred during logical decoding and allow tuning logical_decoding_work_mem.
+ Amount of transaction data decoded for streaming in-progress
+ transactions to the decoding output plugin while decoding changes from
+ WAL for this slot. This and other streaming counters for this slot can
+ be used to tune logical_decoding_work_mem.
@@ -2733,10 +2733,9 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
total_bytesbigint
- Amount of decoded transaction data sent to the decoding output plugin
- while decoding the changes from WAL for this slot. This can be used to
- gauge the total amount of data sent during logical decoding. Note that
- this includes data that is streamed and/or spilled.
+ Amount of transaction data decoded for sending transactions to the
+ decoding output plugin while decoding changes from WAL for this slot.
+ Note that this includes data that is streamed and/or spilled.
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 7924581cdcd..888e064ec0f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -746,9 +746,10 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
}
/*
- * Update the decoding stats at transaction prepare/commit/abort. It is
- * not clear that sending more or less frequently than this would be
- * better.
+ * Update the decoding stats at transaction prepare/commit/abort.
+ * Additionally we send the stats when we spill or stream the changes to
+ * avoid losing them in case the decoding is interrupted. It is not clear
+ * that sending more or less frequently than this would be better.
*/
UpdateDecodingStats(ctx);
}
@@ -828,9 +829,10 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
/*
- * Update the decoding stats at transaction prepare/commit/abort. It is
- * not clear that sending more or less frequently than this would be
- * better.
+ * Update the decoding stats at transaction prepare/commit/abort.
+ * Additionally we send the stats when we spill or stream the changes to
+ * avoid losing them in case the decoding is interrupted. It is not clear
+ * that sending more or less frequently than this would be better.
*/
UpdateDecodingStats(ctx);
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c79425fbb73..e80a195472e 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -3559,6 +3559,9 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* don't consider already serialized transactions */
rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
+
+ /* update the decoding stats */
+ UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
}
Assert(spilled == txn->nentries_mem);
@@ -3928,6 +3931,9 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* Don't consider already streamed transaction. */
rb->streamTxns += (txn_is_streamed) ? 0 : 1;
+ /* update the decoding stats */
+ UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
+
Assert(dlist_is_empty(&txn->changes));
Assert(txn->nentries == 0);
Assert(txn->nentries_mem == 0);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index bfab8303ee7..53cdfa5d88f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -617,14 +617,14 @@ struct ReorderBuffer
/* Statistics about transactions streamed to the decoding output plugin */
int64 streamTxns; /* number of transactions streamed */
int64 streamCount; /* streaming invocation counter */
- int64 streamBytes; /* amount of data streamed */
+ int64 streamBytes; /* amount of data decoded */
/*
* Statistics about all the transactions sent to the decoding output
* plugin
*/
int64 totalTxns; /* total number of transactions sent */
- int64 totalBytes; /* total amount of data sent */
+ int64 totalBytes; /* total amount of data decoded */
};