mirror of
https://github.com/postgres/postgres.git
synced 2025-11-18 02:02:55 +03:00
Add mem_exceeded_count column to pg_stat_replication_slots.
This commit introduces a new column mem_exceeded_count to the pg_stat_replication_slots view. This counter tracks how often the memory used by logical decoding exceeds the logical_decoding_work_mem limit. The new statistic helps users determine whether exceeding the logical_decoding_work_mem limit is a rare occurrences or a frequent issue, information that wasn't available through existing statistics. Bumps catversion. Author: Bertrand Drouvot <bertranddrouvot.pg@gmail.com> Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: shveta malik <shveta.malik@gmail.com> Reviewed-by: Ashutosh Bapat <ashutosh.bapat.oss@gmail.com> Reviewed-by: Chao Li <li.evan.chao@gmail.com> Discussion: https://postgr.es/m/978D21E8-9D3B-40EA-A4B1-F87BABE7868C@yesql.se
This commit is contained in:
@@ -1063,6 +1063,7 @@ CREATE VIEW pg_stat_replication_slots AS
|
||||
s.stream_txns,
|
||||
s.stream_count,
|
||||
s.stream_bytes,
|
||||
s.mem_exceeded_count,
|
||||
s.total_txns,
|
||||
s.total_bytes,
|
||||
s.stats_reset
|
||||
|
||||
@@ -1955,10 +1955,11 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
|
||||
PgStat_StatReplSlotEntry repSlotStat;
|
||||
|
||||
/* Nothing to do if we don't have any replication stats to be sent. */
|
||||
if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
|
||||
if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0 &&
|
||||
rb->memExceededCount <= 0)
|
||||
return;
|
||||
|
||||
elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
|
||||
elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
|
||||
rb,
|
||||
rb->spillTxns,
|
||||
rb->spillCount,
|
||||
@@ -1966,6 +1967,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
|
||||
rb->streamTxns,
|
||||
rb->streamCount,
|
||||
rb->streamBytes,
|
||||
rb->memExceededCount,
|
||||
rb->totalTxns,
|
||||
rb->totalBytes);
|
||||
|
||||
@@ -1975,6 +1977,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
|
||||
repSlotStat.stream_txns = rb->streamTxns;
|
||||
repSlotStat.stream_count = rb->streamCount;
|
||||
repSlotStat.stream_bytes = rb->streamBytes;
|
||||
repSlotStat.mem_exceeded_count = rb->memExceededCount;
|
||||
repSlotStat.total_txns = rb->totalTxns;
|
||||
repSlotStat.total_bytes = rb->totalBytes;
|
||||
|
||||
@@ -1986,6 +1989,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
|
||||
rb->streamTxns = 0;
|
||||
rb->streamCount = 0;
|
||||
rb->streamBytes = 0;
|
||||
rb->memExceededCount = 0;
|
||||
rb->totalTxns = 0;
|
||||
rb->totalBytes = 0;
|
||||
}
|
||||
|
||||
@@ -390,6 +390,7 @@ ReorderBufferAllocate(void)
|
||||
buffer->streamTxns = 0;
|
||||
buffer->streamCount = 0;
|
||||
buffer->streamBytes = 0;
|
||||
buffer->memExceededCount = 0;
|
||||
buffer->totalTxns = 0;
|
||||
buffer->totalBytes = 0;
|
||||
|
||||
@@ -3898,14 +3899,26 @@ static void
|
||||
ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
|
||||
{
|
||||
ReorderBufferTXN *txn;
|
||||
bool update_stats = true;
|
||||
|
||||
/*
|
||||
* Bail out if debug_logical_replication_streaming is buffered and we
|
||||
* haven't exceeded the memory limit.
|
||||
*/
|
||||
if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED &&
|
||||
rb->size < logical_decoding_work_mem * (Size) 1024)
|
||||
if (rb->size >= logical_decoding_work_mem * (Size) 1024)
|
||||
{
|
||||
/*
|
||||
* Update the statistics as the memory usage has reached the limit. We
|
||||
* report the statistics update later in this function since we can
|
||||
* update the slot statistics altogether while streaming or
|
||||
* serializing transactions in most cases.
|
||||
*/
|
||||
rb->memExceededCount += 1;
|
||||
}
|
||||
else if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED)
|
||||
{
|
||||
/*
|
||||
* Bail out if debug_logical_replication_streaming is buffered and we
|
||||
* haven't exceeded the memory limit.
|
||||
*/
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* If debug_logical_replication_streaming is immediate, loop until there's
|
||||
@@ -3965,8 +3978,17 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
|
||||
*/
|
||||
Assert(txn->size == 0);
|
||||
Assert(txn->nentries_mem == 0);
|
||||
|
||||
/*
|
||||
* We've reported the memExceededCount update while streaming or
|
||||
* serializing the transaction.
|
||||
*/
|
||||
update_stats = false;
|
||||
}
|
||||
|
||||
if (update_stats)
|
||||
UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
|
||||
|
||||
/* We must be under the memory limit now. */
|
||||
Assert(rb->size < logical_decoding_work_mem * (Size) 1024);
|
||||
}
|
||||
|
||||
@@ -94,6 +94,7 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re
|
||||
REPLSLOT_ACC(stream_txns);
|
||||
REPLSLOT_ACC(stream_count);
|
||||
REPLSLOT_ACC(stream_bytes);
|
||||
REPLSLOT_ACC(mem_exceeded_count);
|
||||
REPLSLOT_ACC(total_txns);
|
||||
REPLSLOT_ACC(total_bytes);
|
||||
#undef REPLSLOT_ACC
|
||||
|
||||
@@ -2121,7 +2121,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
|
||||
Datum
|
||||
pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
|
||||
{
|
||||
#define PG_STAT_GET_REPLICATION_SLOT_COLS 10
|
||||
#define PG_STAT_GET_REPLICATION_SLOT_COLS 11
|
||||
text *slotname_text = PG_GETARG_TEXT_P(0);
|
||||
NameData slotname;
|
||||
TupleDesc tupdesc;
|
||||
@@ -2146,11 +2146,13 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
|
||||
INT8OID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 7, "stream_bytes",
|
||||
INT8OID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 8, "total_txns",
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 8, "mem_exceeded_count",
|
||||
INT8OID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_bytes",
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_txns",
|
||||
INT8OID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_bytes",
|
||||
INT8OID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset",
|
||||
TIMESTAMPTZOID, -1, 0);
|
||||
BlessTupleDesc(tupdesc);
|
||||
|
||||
@@ -2173,13 +2175,14 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
|
||||
values[4] = Int64GetDatum(slotent->stream_txns);
|
||||
values[5] = Int64GetDatum(slotent->stream_count);
|
||||
values[6] = Int64GetDatum(slotent->stream_bytes);
|
||||
values[7] = Int64GetDatum(slotent->total_txns);
|
||||
values[8] = Int64GetDatum(slotent->total_bytes);
|
||||
values[7] = Int64GetDatum(slotent->mem_exceeded_count);
|
||||
values[8] = Int64GetDatum(slotent->total_txns);
|
||||
values[9] = Int64GetDatum(slotent->total_bytes);
|
||||
|
||||
if (slotent->stat_reset_timestamp == 0)
|
||||
nulls[9] = true;
|
||||
nulls[10] = true;
|
||||
else
|
||||
values[9] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
|
||||
values[10] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
|
||||
|
||||
/* Returns the record as Datum */
|
||||
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
|
||||
|
||||
Reference in New Issue
Block a user