1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-30 11:03:19 +03:00

Add contrib/pg_logicalinspect.

This module provides SQL functions that allow to inspect logical
decoding components.

It currently allows to inspect the contents of serialized logical
snapshots of a running database cluster, which is useful for debugging
or educational purposes.

Author: Bertrand Drouvot
Reviewed-by: Amit Kapila, Shveta Malik, Peter Smith, Peter Eisentraut
Reviewed-by: David G. Johnston
Discussion: https://postgr.es/m/ZscuZ92uGh3wm4tW%40ip-10-97-1-34.eu-west-3.compute.internal
This commit is contained in:
Masahiko Sawada
2024-10-14 17:22:02 -07:00
parent e2fd615ecc
commit 7cdfeee320
18 changed files with 598 additions and 39 deletions

View File

@ -32,6 +32,7 @@ SUBDIRS = \
passwordcheck \
pg_buffercache \
pg_freespacemap \
pg_logicalinspect \
pg_prewarm \
pg_stat_statements \
pg_surgery \

View File

@ -46,6 +46,7 @@ subdir('passwordcheck')
subdir('pg_buffercache')
subdir('pgcrypto')
subdir('pg_freespacemap')
subdir('pg_logicalinspect')
subdir('pg_prewarm')
subdir('pgrowlocks')
subdir('pg_stat_statements')

6
contrib/pg_logicalinspect/.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
# Generated subdirectories
/log/
/results/
/output_iso/
/tmp_check/
/tmp_check_iso/

View File

@ -0,0 +1,31 @@
# contrib/pg_logicalinspect/Makefile
MODULE_big = pg_logicalinspect
OBJS = \
$(WIN32RES) \
pg_logicalinspect.o
PGFILEDESC = "pg_logicalinspect - functions to inspect logical decoding components"
EXTENSION = pg_logicalinspect
DATA = pg_logicalinspect--1.0.sql
EXTRA_INSTALL = contrib/test_decoding
ISOLATION = logical_inspect
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/pg_logicalinspect/logicalinspect.conf
# Disabled because these tests require "wal_level=logical", which
# some installcheck users do not have (e.g. buildfarm clients).
NO_INSTALLCHECK = 1
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
else
subdir = contrib/pg_logicalinspect
top_builddir = ../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif

View File

@ -0,0 +1,52 @@
Parsed test spec with 2 sessions
starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes s1_get_logical_snapshot_info s1_get_logical_snapshot_meta
step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
?column?
--------
init
(1 row)
step s0_begin: BEGIN;
step s0_savepoint: SAVEPOINT sp1;
step s0_truncate: TRUNCATE tbl1;
step s1_checkpoint: CHECKPOINT;
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
data
----
(0 rows)
step s0_commit: COMMIT;
step s0_begin: BEGIN;
step s0_insert: INSERT INTO tbl1 VALUES (1);
step s1_checkpoint: CHECKPOINT;
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
data
---------------------------------------
BEGIN
table public.tbl1: TRUNCATE: (no-flags)
COMMIT
(3 rows)
step s0_commit: COMMIT;
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
data
-------------------------------------------------------------
BEGIN
table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
COMMIT
(3 rows)
step s1_get_logical_snapshot_info: SELECT info.state, info.catchange_count, array_length(info.catchange_xip,1) AS catchange_array_length, info.committed_count, array_length(info.committed_xip,1) AS committed_array_length FROM pg_ls_logicalsnapdir(), pg_get_logical_snapshot_info(name) AS info ORDER BY 2;
state |catchange_count|catchange_array_length|committed_count|committed_array_length
----------+---------------+----------------------+---------------+----------------------
consistent| 0| | 2| 2
consistent| 2| 2| 0|
(2 rows)
step s1_get_logical_snapshot_meta: SELECT COUNT(meta.*) from pg_ls_logicalsnapdir(), pg_get_logical_snapshot_meta(name) as meta;
count
-----
2
(1 row)

View File

@ -0,0 +1 @@
wal_level = logical

View File

@ -0,0 +1,39 @@
# Copyright (c) 2024, PostgreSQL Global Development Group
pg_logicalinspect_sources = files('pg_logicalinspect.c')
if host_system == 'windows'
pg_logicalinspect_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
'--NAME', 'pg_logicalinspect',
'--FILEDESC', 'pg_logicalinspect - functions to inspect logical decoding components',])
endif
pg_logicalinspect = shared_module('pg_logicalinspect',
pg_logicalinspect_sources,
kwargs: contrib_mod_args + {
'dependencies': contrib_mod_args['dependencies'],
},
)
contrib_targets += pg_logicalinspect
install_data(
'pg_logicalinspect.control',
'pg_logicalinspect--1.0.sql',
kwargs: contrib_data_args,
)
tests += {
'name': 'pg_logicalinspect',
'sd': meson.current_source_dir(),
'bd': meson.current_build_dir(),
'isolation': {
'specs': [
'logical_inspect',
],
'regress_args': [
'--temp-config', files('logicalinspect.conf'),
],
# see above
'runningcheck': false,
},
}

View File

@ -0,0 +1,43 @@
/* contrib/pg_logicalinspect/pg_logicalinspect--1.0.sql */
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION pg_logicalinspect" to load this file. \quit
--
-- pg_get_logical_snapshot_meta()
--
CREATE FUNCTION pg_get_logical_snapshot_meta(IN filename text,
OUT magic int4,
OUT checksum int8,
OUT version int4
)
AS 'MODULE_PATHNAME', 'pg_get_logical_snapshot_meta'
LANGUAGE C STRICT PARALLEL SAFE;
REVOKE EXECUTE ON FUNCTION pg_get_logical_snapshot_meta(text) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION pg_get_logical_snapshot_meta(text) TO pg_read_server_files;
--
-- pg_get_logical_snapshot_info()
--
CREATE FUNCTION pg_get_logical_snapshot_info(IN filename text,
OUT state text,
OUT xmin xid,
OUT xmax xid,
OUT start_decoding_at pg_lsn,
OUT two_phase_at pg_lsn,
OUT initial_xmin_horizon xid,
OUT building_full_snapshot boolean,
OUT in_slot_creation boolean,
OUT last_serialized_snapshot pg_lsn,
OUT next_phase_at xid,
OUT committed_count int4,
OUT committed_xip xid[],
OUT catchange_count int4,
OUT catchange_xip xid[]
)
AS 'MODULE_PATHNAME', 'pg_get_logical_snapshot_info'
LANGUAGE C STRICT PARALLEL SAFE;
REVOKE EXECUTE ON FUNCTION pg_get_logical_snapshot_info(text) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION pg_get_logical_snapshot_info(text) TO pg_read_server_files;

View File

@ -0,0 +1,167 @@
/*-------------------------------------------------------------------------
*
* pg_logicalinspect.c
* Functions to inspect contents of PostgreSQL logical snapshots
*
* Copyright (c) 2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
* contrib/pg_logicalinspect/pg_logicalinspect.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "funcapi.h"
#include "replication/snapbuild_internal.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(pg_get_logical_snapshot_meta);
PG_FUNCTION_INFO_V1(pg_get_logical_snapshot_info);
/* Return the description of SnapBuildState */
static const char *
get_snapbuild_state_desc(SnapBuildState state)
{
const char *stateDesc = "unknown state";
switch (state)
{
case SNAPBUILD_START:
stateDesc = "start";
break;
case SNAPBUILD_BUILDING_SNAPSHOT:
stateDesc = "building";
break;
case SNAPBUILD_FULL_SNAPSHOT:
stateDesc = "full";
break;
case SNAPBUILD_CONSISTENT:
stateDesc = "consistent";
break;
}
return stateDesc;
}
/*
* Retrieve the logical snapshot file metadata.
*/
Datum
pg_get_logical_snapshot_meta(PG_FUNCTION_ARGS)
{
#define PG_GET_LOGICAL_SNAPSHOT_META_COLS 3
SnapBuildOnDisk ondisk;
HeapTuple tuple;
Datum values[PG_GET_LOGICAL_SNAPSHOT_META_COLS] = {0};
bool nulls[PG_GET_LOGICAL_SNAPSHOT_META_COLS] = {0};
TupleDesc tupdesc;
char path[MAXPGPATH];
int i = 0;
text *filename_t = PG_GETARG_TEXT_PP(0);
/* Build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
sprintf(path, "%s/%s",
PG_LOGICAL_SNAPSHOTS_DIR,
text_to_cstring(filename_t));
/* Validate and restore the snapshot to 'ondisk' */
SnapBuildRestoreSnapshot(&ondisk, path, CurrentMemoryContext, false);
values[i++] = UInt32GetDatum(ondisk.magic);
values[i++] = Int64GetDatum((int64) ondisk.checksum);
values[i++] = UInt32GetDatum(ondisk.version);
Assert(i == PG_GET_LOGICAL_SNAPSHOT_META_COLS);
tuple = heap_form_tuple(tupdesc, values, nulls);
PG_RETURN_DATUM(HeapTupleGetDatum(tuple));
#undef PG_GET_LOGICAL_SNAPSHOT_META_COLS
}
Datum
pg_get_logical_snapshot_info(PG_FUNCTION_ARGS)
{
#define PG_GET_LOGICAL_SNAPSHOT_INFO_COLS 14
SnapBuildOnDisk ondisk;
HeapTuple tuple;
Datum values[PG_GET_LOGICAL_SNAPSHOT_INFO_COLS] = {0};
bool nulls[PG_GET_LOGICAL_SNAPSHOT_INFO_COLS] = {0};
TupleDesc tupdesc;
char path[MAXPGPATH];
int i = 0;
text *filename_t = PG_GETARG_TEXT_PP(0);
/* Build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
sprintf(path, "%s/%s",
PG_LOGICAL_SNAPSHOTS_DIR,
text_to_cstring(filename_t));
/* Validate and restore the snapshot to 'ondisk' */
SnapBuildRestoreSnapshot(&ondisk, path, CurrentMemoryContext, false);
values[i++] = CStringGetTextDatum(get_snapbuild_state_desc(ondisk.builder.state));
values[i++] = TransactionIdGetDatum(ondisk.builder.xmin);
values[i++] = TransactionIdGetDatum(ondisk.builder.xmax);
values[i++] = LSNGetDatum(ondisk.builder.start_decoding_at);
values[i++] = LSNGetDatum(ondisk.builder.two_phase_at);
values[i++] = TransactionIdGetDatum(ondisk.builder.initial_xmin_horizon);
values[i++] = BoolGetDatum(ondisk.builder.building_full_snapshot);
values[i++] = BoolGetDatum(ondisk.builder.in_slot_creation);
values[i++] = LSNGetDatum(ondisk.builder.last_serialized_snapshot);
values[i++] = TransactionIdGetDatum(ondisk.builder.next_phase_at);
values[i++] = UInt32GetDatum(ondisk.builder.committed.xcnt);
if (ondisk.builder.committed.xcnt > 0)
{
Datum *arrayelems;
arrayelems = (Datum *) palloc(ondisk.builder.committed.xcnt * sizeof(Datum));
for (int j = 0; j < ondisk.builder.committed.xcnt; j++)
arrayelems[j] = TransactionIdGetDatum(ondisk.builder.committed.xip[j]);
values[i++] = PointerGetDatum(construct_array_builtin(arrayelems,
ondisk.builder.committed.xcnt,
XIDOID));
}
else
nulls[i++] = true;
values[i++] = UInt32GetDatum(ondisk.builder.catchange.xcnt);
if (ondisk.builder.catchange.xcnt > 0)
{
Datum *arrayelems;
arrayelems = (Datum *) palloc(ondisk.builder.catchange.xcnt * sizeof(Datum));
for (int j = 0; j < ondisk.builder.catchange.xcnt; j++)
arrayelems[j] = TransactionIdGetDatum(ondisk.builder.catchange.xip[j]);
values[i++] = PointerGetDatum(construct_array_builtin(arrayelems,
ondisk.builder.catchange.xcnt,
XIDOID));
}
else
nulls[i++] = true;
Assert(i == PG_GET_LOGICAL_SNAPSHOT_INFO_COLS);
tuple = heap_form_tuple(tupdesc, values, nulls);
PG_RETURN_DATUM(HeapTupleGetDatum(tuple));
#undef PG_GET_LOGICAL_SNAPSHOT_INFO_COLS
}

View File

@ -0,0 +1,5 @@
# pg_logicalinspect extension
comment = 'functions to inspect logical decoding components'
default_version = '1.0'
module_pathname = '$libdir/pg_logicalinspect'
relocatable = true

View File

@ -0,0 +1,34 @@
# Test the pg_logicalinspect functions: that needs some permutation to
# ensure that we are creating multiple logical snapshots and that one of them
# contains ongoing catalogs changes.
setup
{
DROP TABLE IF EXISTS tbl1;
CREATE TABLE tbl1 (val1 integer, val2 integer);
CREATE EXTENSION pg_logicalinspect;
}
teardown
{
DROP TABLE tbl1;
SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
DROP EXTENSION pg_logicalinspect;
}
session "s0"
setup { SET synchronous_commit=on; }
step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); }
step "s0_begin" { BEGIN; }
step "s0_savepoint" { SAVEPOINT sp1; }
step "s0_truncate" { TRUNCATE tbl1; }
step "s0_insert" { INSERT INTO tbl1 VALUES (1); }
step "s0_commit" { COMMIT; }
session "s1"
setup { SET synchronous_commit=on; }
step "s1_checkpoint" { CHECKPOINT; }
step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
step "s1_get_logical_snapshot_meta" { SELECT COUNT(meta.*) from pg_ls_logicalsnapdir(), pg_get_logical_snapshot_meta(name) as meta;}
step "s1_get_logical_snapshot_info" { SELECT info.state, info.catchange_count, array_length(info.catchange_xip,1) AS catchange_array_length, info.committed_count, array_length(info.committed_xip,1) AS committed_array_length FROM pg_ls_logicalsnapdir(), pg_get_logical_snapshot_info(name) AS info ORDER BY 2; }
permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" "s1_get_logical_snapshot_info" "s1_get_logical_snapshot_meta"