mirror of
https://github.com/postgres/postgres.git
synced 2025-04-29 13:56:47 +03:00
Reported-by: Simon Riggs Author: Takamichi Osumi Reviewed-by: Amit Kapila Backpatch-through: 9.6 Discussion: https://www.postgresql.org/message-id/20210222222847.tpnb6eg3yiykzpky@alap3.anarazel.de
1328 lines
57 KiB
Plaintext
1328 lines
57 KiB
Plaintext
<!-- doc/src/sgml/logicaldecoding.sgml -->
|
|
<chapter id="logicaldecoding">
|
|
<title>Logical Decoding</title>
|
|
<indexterm zone="logicaldecoding">
|
|
<primary>Logical Decoding</primary>
|
|
</indexterm>
|
|
<para>
|
|
PostgreSQL provides infrastructure to stream the modifications performed
|
|
via SQL to external consumers. This functionality can be used for a
|
|
variety of purposes, including replication solutions and auditing.
|
|
</para>
|
|
|
|
<para>
|
|
Changes are sent out in streams identified by logical replication slots.
|
|
</para>
|
|
|
|
<para>
|
|
The format in which those changes are streamed is determined by the output
|
|
plugin used. An example plugin is provided in the PostgreSQL distribution.
|
|
Additional plugins can be
|
|
written to extend the choice of available formats without modifying any
|
|
core code.
|
|
Every output plugin has access to each individual new row produced
|
|
by <command>INSERT</command> and the new row version created
|
|
by <command>UPDATE</command>. Availability of old row versions for
|
|
<command>UPDATE</command> and <command>DELETE</command> depends on
|
|
the configured replica identity (see <xref linkend="sql-altertable-replica-identity"/>).
|
|
</para>
|
|
|
|
<para>
|
|
Changes can be consumed either using the streaming replication protocol
|
|
(see <xref linkend="protocol-replication"/> and
|
|
<xref linkend="logicaldecoding-walsender"/>), or by calling functions
|
|
via SQL (see <xref linkend="logicaldecoding-sql"/>). It is also possible
|
|
to write additional methods of consuming the output of a replication slot
|
|
without modifying core code
|
|
(see <xref linkend="logicaldecoding-writer"/>).
|
|
</para>
|
|
|
|
<sect1 id="logicaldecoding-example">
|
|
<title>Logical Decoding Examples</title>
|
|
|
|
<para>
|
|
The following example demonstrates controlling logical decoding using the
|
|
SQL interface.
|
|
</para>
|
|
|
|
<para>
|
|
Before you can use logical decoding, you must set
|
|
<xref linkend="guc-wal-level"/> to <literal>logical</literal> and
|
|
<xref linkend="guc-max-replication-slots"/> to at least 1. Then, you
|
|
should connect to the target database (in the example
|
|
below, <literal>postgres</literal>) as a superuser.
|
|
</para>
|
|
|
|
<programlisting>
|
|
postgres=# -- Create a slot named 'regression_slot' using the output plugin 'test_decoding'
|
|
postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
|
|
slot_name | lsn
|
|
-----------------+-----------
|
|
regression_slot | 0/16B1970
|
|
(1 row)
|
|
|
|
postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
|
|
slot_name | plugin | slot_type | database | active | restart_lsn | confirmed_flush_lsn
|
|
-----------------+---------------+-----------+----------+--------+-------------+-----------------
|
|
regression_slot | test_decoding | logical | postgres | f | 0/16A4408 | 0/16A4440
|
|
(1 row)
|
|
|
|
postgres=# -- There are no changes to see yet
|
|
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
|
|
lsn | xid | data
|
|
-----+-----+------
|
|
(0 rows)
|
|
|
|
postgres=# CREATE TABLE data(id serial primary key, data text);
|
|
CREATE TABLE
|
|
|
|
postgres=# -- DDL isn't replicated, so all you'll see is the transaction
|
|
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
|
|
lsn | xid | data
|
|
-----------+-------+--------------
|
|
0/BA2DA58 | 10297 | BEGIN 10297
|
|
0/BA5A5A0 | 10297 | COMMIT 10297
|
|
(2 rows)
|
|
|
|
postgres=# -- Once changes are read, they're consumed and not emitted
|
|
postgres=# -- in a subsequent call:
|
|
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
|
|
lsn | xid | data
|
|
-----+-----+------
|
|
(0 rows)
|
|
|
|
postgres=# BEGIN;
|
|
postgres=*# INSERT INTO data(data) VALUES('1');
|
|
postgres=*# INSERT INTO data(data) VALUES('2');
|
|
postgres=*# COMMIT;
|
|
|
|
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
|
|
lsn | xid | data
|
|
-----------+-------+---------------------------------------------------------
|
|
0/BA5A688 | 10298 | BEGIN 10298
|
|
0/BA5A6F0 | 10298 | table public.data: INSERT: id[integer]:1 data[text]:'1'
|
|
0/BA5A7F8 | 10298 | table public.data: INSERT: id[integer]:2 data[text]:'2'
|
|
0/BA5A8A8 | 10298 | COMMIT 10298
|
|
(4 rows)
|
|
|
|
postgres=# INSERT INTO data(data) VALUES('3');
|
|
|
|
postgres=# -- You can also peek ahead in the change stream without consuming changes
|
|
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
|
|
lsn | xid | data
|
|
-----------+-------+---------------------------------------------------------
|
|
0/BA5A8E0 | 10299 | BEGIN 10299
|
|
0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
|
|
0/BA5A990 | 10299 | COMMIT 10299
|
|
(3 rows)
|
|
|
|
postgres=# -- The next call to pg_logical_slot_peek_changes() returns the same changes again
|
|
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
|
|
lsn | xid | data
|
|
-----------+-------+---------------------------------------------------------
|
|
0/BA5A8E0 | 10299 | BEGIN 10299
|
|
0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
|
|
0/BA5A990 | 10299 | COMMIT 10299
|
|
(3 rows)
|
|
|
|
postgres=# -- options can be passed to output plugin, to influence the formatting
|
|
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on');
|
|
lsn | xid | data
|
|
-----------+-------+---------------------------------------------------------
|
|
0/BA5A8E0 | 10299 | BEGIN 10299
|
|
0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
|
|
0/BA5A990 | 10299 | COMMIT 10299 (at 2017-05-10 12:07:21.272494-04)
|
|
(3 rows)
|
|
|
|
postgres=# -- Remember to destroy a slot you no longer need to stop it consuming
|
|
postgres=# -- server resources:
|
|
postgres=# SELECT pg_drop_replication_slot('regression_slot');
|
|
pg_drop_replication_slot
|
|
-----------------------
|
|
|
|
(1 row)
|
|
</programlisting>
|
|
|
|
<para>
|
|
The following example shows how logical decoding is controlled over the
|
|
streaming replication protocol, using the
|
|
program <xref linkend="app-pgrecvlogical"/> included in the PostgreSQL
|
|
distribution. This requires that client authentication is set up to allow
|
|
replication connections
|
|
(see <xref linkend="streaming-replication-authentication"/>) and
|
|
that <varname>max_wal_senders</varname> is set sufficiently high to allow
|
|
an additional connection.
|
|
</para>
|
|
<programlisting>
|
|
$ pg_recvlogical -d postgres --slot=test --create-slot
|
|
$ pg_recvlogical -d postgres --slot=test --start -f -
|
|
<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
|
|
$ psql -d postgres -c "INSERT INTO data(data) VALUES('4');"
|
|
$ fg
|
|
BEGIN 693
|
|
table public.data: INSERT: id[integer]:4 data[text]:'4'
|
|
COMMIT 693
|
|
<keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
|
|
$ pg_recvlogical -d postgres --slot=test --drop-slot
|
|
</programlisting>
|
|
|
|
<para>
|
|
The following example shows the SQL interface that can be used to decode prepared
|
|
transactions. Before you use two-phase commit commands, you must set
|
|
<varname>max_prepared_transactions</varname> to at least 1. You must also have
|
|
set the two-phase parameter to <literal>true</literal> while creating the slot using
|
|
<function>pg_create_logical_replication_slot</function>.
|
|
Note that the entire transaction will be streamed after the commit if it
|
|
has not already been decoded.
|
|
</para>
|
|
<programlisting>
|
|
postgres=# BEGIN;
|
|
postgres=*# INSERT INTO data(data) VALUES('5');
|
|
postgres=*# PREPARE TRANSACTION 'test_prepared1';
|
|
|
|
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
|
|
lsn | xid | data
|
|
-----------+-----+---------------------------------------------------------
|
|
0/1689DC0 | 529 | BEGIN 529
|
|
0/1689DC0 | 529 | table public.data: INSERT: id[integer]:5 data[text]:'5'
|
|
0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529
|
|
(3 rows)
|
|
|
|
postgres=# COMMIT PREPARED 'test_prepared1';
|
|
postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
|
|
lsn | xid | data
|
|
-----------+-----+--------------------------------------------
|
|
0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529
|
|
(1 row)
|
|
|
|
postgres=# -- You can also roll back a prepared transaction
|
|
postgres=# BEGIN;
|
|
postgres=*# INSERT INTO data(data) VALUES('6');
|
|
postgres=*# PREPARE TRANSACTION 'test_prepared2';
|
|
postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
|
|
lsn | xid | data
|
|
-----------+-----+---------------------------------------------------------
|
|
0/168A180 | 530 | BEGIN 530
|
|
0/168A1E8 | 530 | table public.data: INSERT: id[integer]:6 data[text]:'6'
|
|
0/168A430 | 530 | PREPARE TRANSACTION 'test_prepared2', txid 530
|
|
(3 rows)
|
|
|
|
postgres=# ROLLBACK PREPARED 'test_prepared2';
|
|
postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
|
|
lsn | xid | data
|
|
-----------+-----+----------------------------------------------
|
|
0/168A4B8 | 530 | ROLLBACK PREPARED 'test_prepared2', txid 530
|
|
(1 row)
|
|
</programlisting>
|
|
</sect1>
|
|
|
|
<sect1 id="logicaldecoding-explanation">
|
|
<title>Logical Decoding Concepts</title>
|
|
<sect2>
|
|
<title>Logical Decoding</title>
|
|
|
|
<indexterm>
|
|
<primary>Logical Decoding</primary>
|
|
</indexterm>
|
|
|
|
<para>
|
|
Logical decoding is the process of extracting all persistent changes
|
|
to a database's tables into a coherent, easy to understand format which
|
|
can be interpreted without detailed knowledge of the database's internal
|
|
state.
|
|
</para>
|
|
|
|
<para>
|
|
In <productname>PostgreSQL</productname>, logical decoding is implemented
|
|
by decoding the contents of the <link linkend="wal">write-ahead
|
|
log</link>, which describe changes on a storage level, into an
|
|
application-specific form such as a stream of tuples or SQL statements.
|
|
</para>
|
|
</sect2>
|
|
|
|
<sect2 id="logicaldecoding-replication-slots">
|
|
<title>Replication Slots</title>
|
|
|
|
<indexterm>
|
|
<primary>replication slot</primary>
|
|
<secondary>logical replication</secondary>
|
|
</indexterm>
|
|
|
|
<para>
|
|
In the context of logical replication, a slot represents a stream of
|
|
changes that can be replayed to a client in the order they were made on
|
|
the origin server. Each slot streams a sequence of changes from a single
|
|
database.
|
|
</para>
|
|
|
|
<note>
|
|
<para><productname>PostgreSQL</productname> also has streaming replication slots
|
|
(see <xref linkend="streaming-replication"/>), but they are used somewhat
|
|
differently there.
|
|
</para>
|
|
</note>
|
|
|
|
<para>
|
|
A replication slot has an identifier that is unique across all databases
|
|
in a <productname>PostgreSQL</productname> cluster. Slots persist
|
|
independently of the connection using them and are crash-safe.
|
|
</para>
|
|
|
|
<para>
|
|
A logical slot will emit each change just once in normal operation.
|
|
The current position of each slot is persisted only at checkpoint, so in
|
|
the case of a crash the slot may return to an earlier LSN, which will
|
|
then cause recent changes to be sent again when the server restarts.
|
|
Logical decoding clients are responsible for avoiding ill effects from
|
|
handling the same message more than once. Clients may wish to record
|
|
the last LSN they saw when decoding and skip over any repeated data or
|
|
(when using the replication protocol) request that decoding start from
|
|
that LSN rather than letting the server determine the start point.
|
|
The Replication Progress Tracking feature is designed for this purpose;
|
|
refer to <link linkend="replication-origins">replication origins</link>.
|
|
</para>
|
|
|
|
<para>
|
|
Multiple independent slots may exist for a single database. Each slot has
|
|
its own state, allowing different consumers to receive changes from
|
|
different points in the database change stream. For most applications, a
|
|
separate slot will be required for each consumer.
|
|
</para>
|
|
|
|
<para>
|
|
A logical replication slot knows nothing about the state of the
|
|
receiver(s). It's even possible to have multiple different receivers using
|
|
the same slot at different times; they'll just get the changes following
|
|
on from when the last receiver stopped consuming them. Only one receiver
|
|
may consume changes from a slot at any given time.
|
|
</para>
|
|
|
|
<caution>
|
|
<para>
|
|
Replication slots persist across crashes and know nothing about the state
|
|
of their consumer(s). They will prevent removal of required resources
|
|
even when there is no connection using them. This consumes storage
|
|
because neither required WAL nor required rows from the system catalogs
|
|
can be removed by <command>VACUUM</command> as long as they are required by a replication
|
|
slot. In extreme cases this could cause the database to shut down to prevent
|
|
transaction ID wraparound (see <xref linkend="vacuum-for-wraparound"/>).
|
|
So if a slot is no longer required it should be dropped.
|
|
</para>
|
|
</caution>
|
|
</sect2>
|
|
|
|
<sect2>
|
|
<title>Output Plugins</title>
|
|
<para>
|
|
Output plugins transform the data from the write-ahead log's internal
|
|
representation into the format the consumer of a replication slot desires.
|
|
</para>
|
|
</sect2>
|
|
|
|
<sect2>
|
|
<title>Exported Snapshots</title>
|
|
<para>
|
|
When a new replication slot is created using the streaming replication
|
|
interface (see <xref linkend="protocol-replication-create-slot"/>), a
|
|
snapshot is exported
|
|
(see <xref linkend="functions-snapshot-synchronization"/>), which will show
|
|
exactly the state of the database after which all changes will be
|
|
included in the change stream. This can be used to create a new replica by
|
|
using <link linkend="sql-set-transaction"><literal>SET TRANSACTION
|
|
SNAPSHOT</literal></link> to read the state of the database at the moment
|
|
the slot was created. This transaction can then be used to dump the
|
|
database's state at that point in time, which afterwards can be updated
|
|
using the slot's contents without losing any changes.
|
|
</para>
|
|
<para>
|
|
Creation of a snapshot is not always possible. In particular, it will
|
|
fail when connected to a hot standby. Applications that do not require
|
|
snapshot export may suppress it with the <literal>NOEXPORT_SNAPSHOT</literal>
|
|
option.
|
|
</para>
|
|
</sect2>
|
|
</sect1>
|
|
|
|
<sect1 id="logicaldecoding-walsender">
|
|
<title>Streaming Replication Protocol Interface</title>
|
|
|
|
<para>
|
|
The commands
|
|
<itemizedlist>
|
|
<listitem>
|
|
<para><literal>CREATE_REPLICATION_SLOT <replaceable>slot_name</replaceable> LOGICAL <replaceable>output_plugin</replaceable></literal></para>
|
|
</listitem>
|
|
|
|
<listitem>
|
|
<para><literal>DROP_REPLICATION_SLOT <replaceable>slot_name</replaceable></literal> <optional> <literal>WAIT</literal> </optional></para>
|
|
</listitem>
|
|
|
|
<listitem>
|
|
<para><literal>START_REPLICATION SLOT <replaceable>slot_name</replaceable> LOGICAL ...</literal></para>
|
|
</listitem>
|
|
</itemizedlist>
|
|
are used to create, drop, and stream changes from a replication
|
|
slot, respectively. These commands are only available over a replication
|
|
connection; they cannot be used via SQL.
|
|
See <xref linkend="protocol-replication"/> for details on these commands.
|
|
</para>
|
|
|
|
<para>
|
|
The command <xref linkend="app-pgrecvlogical"/> can be used to control
|
|
logical decoding over a streaming replication connection. (It uses
|
|
these commands internally.)
|
|
</para>
|
|
</sect1>
|
|
|
|
<sect1 id="logicaldecoding-sql">
|
|
<title>Logical Decoding <acronym>SQL</acronym> Interface</title>
|
|
|
|
<para>
|
|
See <xref linkend="functions-replication"/> for detailed documentation on
|
|
the SQL-level API for interacting with logical decoding.
|
|
</para>
|
|
|
|
<para>
|
|
Synchronous replication (see <xref linkend="synchronous-replication"/>) is
|
|
only supported on replication slots used over the streaming replication interface. The
|
|
function interface and additional, non-core interfaces do not support
|
|
synchronous replication.
|
|
</para>
|
|
</sect1>
|
|
|
|
<sect1 id="logicaldecoding-catalogs">
|
|
<title>System Catalogs Related to Logical Decoding</title>
|
|
|
|
<para>
|
|
The <link linkend="view-pg-replication-slots"><structname>pg_replication_slots</structname></link>
|
|
view and the
|
|
<link linkend="monitoring-pg-stat-replication-view">
|
|
<structname>pg_stat_replication</structname></link>
|
|
view provide information about the current state of replication slots and
|
|
streaming replication connections, respectively. These views apply to both physical and
|
|
logical replication. The
|
|
<link linkend="monitoring-pg-stat-replication-slots-view">
|
|
<structname>pg_stat_replication_slots</structname></link>
|
|
view provides statistics about the logical replication slots.
|
|
</para>
|
|
</sect1>
|
|
|
|
<sect1 id="logicaldecoding-output-plugin">
|
|
<title>Logical Decoding Output Plugins</title>
|
|
<para>
|
|
An example output plugin can be found in the
|
|
<link linkend="test-decoding">
|
|
<filename>contrib/test_decoding</filename>
|
|
</link>
|
|
subdirectory of the PostgreSQL source tree.
|
|
</para>
|
|
<sect2 id="logicaldecoding-output-init">
|
|
<title>Initialization Function</title>
|
|
<indexterm zone="logicaldecoding-output-init">
|
|
<primary>_PG_output_plugin_init</primary>
|
|
</indexterm>
|
|
<para>
|
|
An output plugin is loaded by dynamically loading a shared library with
|
|
the output plugin's name as the library base name. The normal library
|
|
search path is used to locate the library. To provide the required output
|
|
plugin callbacks and to indicate that the library is actually an output
|
|
plugin it needs to provide a function named
|
|
<function>_PG_output_plugin_init</function>. This function is passed a
|
|
struct that needs to be filled with the callback function pointers for
|
|
individual actions.
|
|
<programlisting>
|
|
typedef struct OutputPluginCallbacks
|
|
{
|
|
LogicalDecodeStartupCB startup_cb;
|
|
LogicalDecodeBeginCB begin_cb;
|
|
LogicalDecodeChangeCB change_cb;
|
|
LogicalDecodeTruncateCB truncate_cb;
|
|
LogicalDecodeCommitCB commit_cb;
|
|
LogicalDecodeMessageCB message_cb;
|
|
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
|
|
LogicalDecodeShutdownCB shutdown_cb;
|
|
LogicalDecodeFilterPrepareCB filter_prepare_cb;
|
|
LogicalDecodeBeginPrepareCB begin_prepare_cb;
|
|
LogicalDecodePrepareCB prepare_cb;
|
|
LogicalDecodeCommitPreparedCB commit_prepared_cb;
|
|
LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
|
|
LogicalDecodeStreamStartCB stream_start_cb;
|
|
LogicalDecodeStreamStopCB stream_stop_cb;
|
|
LogicalDecodeStreamAbortCB stream_abort_cb;
|
|
LogicalDecodeStreamPrepareCB stream_prepare_cb;
|
|
LogicalDecodeStreamCommitCB stream_commit_cb;
|
|
LogicalDecodeStreamChangeCB stream_change_cb;
|
|
LogicalDecodeStreamMessageCB stream_message_cb;
|
|
LogicalDecodeStreamTruncateCB stream_truncate_cb;
|
|
} OutputPluginCallbacks;
|
|
|
|
typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
|
|
</programlisting>
|
|
The <function>begin_cb</function>, <function>change_cb</function>
|
|
and <function>commit_cb</function> callbacks are required,
|
|
while <function>startup_cb</function>,
|
|
<function>truncate_cb</function>, <function>message_cb</function>, <function>filter_by_origin_cb</function>,
|
|
and <function>shutdown_cb</function> are optional.
|
|
If <function>truncate_cb</function> is not set but a
|
|
<command>TRUNCATE</command> is to be decoded, the action will be ignored.
|
|
</para>
|
|
|
|
<para>
|
|
An output plugin may also define functions to support streaming of large,
|
|
in-progress transactions. The <function>stream_start_cb</function>,
|
|
<function>stream_stop_cb</function>, <function>stream_abort_cb</function>,
|
|
<function>stream_commit_cb</function>, and <function>stream_change_cb</function>
|
|
callbacks are required, while <function>stream_message_cb</function> and
|
|
<function>stream_truncate_cb</function> are optional. The
|
|
<function>stream_prepare_cb</function> callback is also required if the
|
|
output plugin supports two-phase commits.
|
|
</para>
|
|
|
|
<para>
|
|
An output plugin may also define functions to support two-phase commits,
|
|
which allow transactions to be decoded at <command>PREPARE TRANSACTION</command> time.
|
|
The <function>begin_prepare_cb</function>, <function>prepare_cb</function>,
|
|
<function>stream_prepare_cb</function>,
|
|
<function>commit_prepared_cb</function> and <function>rollback_prepared_cb</function>
|
|
callbacks are required, while <function>filter_prepare_cb</function> is optional.
|
|
</para>
|
|
</sect2>
|
|
|
|
<sect2 id="logicaldecoding-capabilities">
|
|
<title>Capabilities</title>
|
|
|
|
<para>
|
|
To decode, format and output changes, output plugins can use most of the
|
|
backend's normal infrastructure, including calling output functions.
|
|
Read-only access to relations is permitted, but only to relations that
|
|
either have been created by <command>initdb</command> in
|
|
the <literal>pg_catalog</literal> schema, or have been marked as
|
|
user-provided catalog tables using
|
|
<programlisting>
|
|
ALTER TABLE user_catalog_table SET (user_catalog_table = true);
|
|
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
|
|
</programlisting>
|
|
Note that access to user catalog tables or regular system catalog tables
|
|
in the output plugins has to be done via the <literal>systable_*</literal>
|
|
scan APIs only. Access via the <literal>heap_*</literal> scan APIs will
|
|
error out. Additionally, any actions leading to transaction ID assignment
|
|
are prohibited. That, among others, includes writing to tables, performing
|
|
DDL changes, and calling <literal>pg_current_xact_id()</literal>.
|
|
</para>
|
|
</sect2>
|
|
|
|
<sect2 id="logicaldecoding-output-mode">
|
|
<title>Output Modes</title>
|
|
|
|
<para>
|
|
Output plugin callbacks can pass data to the consumer in nearly arbitrary
|
|
formats. For some use cases, like viewing the changes via SQL, returning
|
|
data in a data type that can contain arbitrary data (e.g., <type>bytea</type>) is
|
|
cumbersome. If the output plugin only outputs textual data in the
|
|
server's encoding, it can declare that by
|
|
setting <literal>OutputPluginOptions.output_type</literal>
|
|
to <literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal> instead
|
|
of <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal> in
|
|
the <link linkend="logicaldecoding-output-plugin-startup">startup
|
|
callback</link>. In that case, all the data has to be in the server's encoding
|
|
so that a <type>text</type> datum can contain it. This is checked in assertion-enabled
|
|
builds.
|
|
</para>
|
|
</sect2>
|
|
|
|
<sect2 id="logicaldecoding-output-plugin-callbacks">
|
|
<title>Output Plugin Callbacks</title>
|
|
|
|
<para>
|
|
An output plugin gets notified about changes that are happening via
|
|
various callbacks it needs to provide.
|
|
</para>
|
|
|
|
<para>
|
|
Concurrent transactions are decoded in commit order, and only changes
|
|
belonging to a specific transaction are decoded between
|
|
the <literal>begin</literal> and <literal>commit</literal>
|
|
callbacks. Transactions that were rolled back explicitly or implicitly
|
|
never get
|
|
decoded. Successful savepoints are
|
|
folded into the transaction containing them in the order they were
|
|
executed within that transaction. A transaction that is prepared for
|
|
a two-phase commit using <command>PREPARE TRANSACTION</command> will
|
|
also be decoded if the output plugin callbacks needed for decoding
|
|
it are provided. It is possible that the current prepared transaction
|
|
which is being decoded is aborted concurrently via a
|
|
<command>ROLLBACK PREPARED</command> command. In that case, the logical
|
|
decoding of this transaction will be aborted too. All the changes of such
|
|
a transaction are skipped once the abort is detected and the
|
|
<function>prepare_cb</function> callback is invoked. Thus even in case of
|
|
a concurrent abort, enough information is provided to the output plugin
|
|
for it to properly deal with <command>ROLLBACK PREPARED</command> once
|
|
that is decoded.
|
|
</para>
|
|
|
|
<note>
|
|
<para>
|
|
Only transactions that have already safely been flushed to disk will be
|
|
decoded. That can lead to a <command>COMMIT</command> not immediately being decoded in a
|
|
directly following <literal>pg_logical_slot_get_changes()</literal>
|
|
when <varname>synchronous_commit</varname> is set
|
|
to <literal>off</literal>.
|
|
</para>
|
|
</note>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-startup">
|
|
<title>Startup Callback</title>
|
|
<para>
|
|
The optional <function>startup_cb</function> callback is called whenever
|
|
a replication slot is created or asked to stream changes, independent
|
|
of the number of changes that are ready to be put out.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
|
|
OutputPluginOptions *options,
|
|
bool is_init);
|
|
</programlisting>
|
|
The <literal>is_init</literal> parameter will be true when the
|
|
replication slot is being created and false
|
|
otherwise. <parameter>options</parameter> points to a struct of options
|
|
that output plugins can set:
|
|
<programlisting>
|
|
typedef struct OutputPluginOptions
|
|
{
|
|
OutputPluginOutputType output_type;
|
|
bool receive_rewrites;
|
|
} OutputPluginOptions;
|
|
</programlisting>
|
|
<literal>output_type</literal> has to be set to either
|
|
<literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal>
|
|
or <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal>. See also
|
|
<xref linkend="logicaldecoding-output-mode"/>.
|
|
If <literal>receive_rewrites</literal> is true, the output plugin will
|
|
also be called for changes made by heap rewrites during certain DDL
|
|
operations. These are of interest to plugins that handle DDL
|
|
replication, but they require special handling.
|
|
</para>
|
|
|
|
<para>
|
|
The startup callback should validate the options present in
|
|
<literal>ctx->output_plugin_options</literal>. If the output plugin
|
|
needs to have a state, it can
|
|
use <literal>ctx->output_plugin_private</literal> to store it.
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-shutdown">
|
|
<title>Shutdown Callback</title>
|
|
|
|
<para>
|
|
The optional <function>shutdown_cb</function> callback is called
|
|
whenever a formerly active replication slot is not used anymore and can
|
|
be used to deallocate resources private to the output plugin. The slot
|
|
isn't necessarily being dropped, streaming is just being stopped.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
|
|
</programlisting>
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-begin">
|
|
<title>Transaction Begin Callback</title>
|
|
|
|
<para>
|
|
The required <function>begin_cb</function> callback is called whenever the
|
|
start of a committed transaction has been decoded. Aborted transactions
|
|
and their contents never get decoded.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn);
|
|
</programlisting>
|
|
The <parameter>txn</parameter> parameter contains meta information about
|
|
the transaction, like the time stamp at which it has been committed and
|
|
its XID.
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-commit">
|
|
<title>Transaction End Callback</title>
|
|
|
|
<para>
|
|
The required <function>commit_cb</function> callback is called whenever
|
|
a transaction commit has been
|
|
decoded. The <function>change_cb</function> callbacks for all modified
|
|
rows will have been called before this, if there have been any modified
|
|
rows.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn,
|
|
XLogRecPtr commit_lsn);
|
|
</programlisting>
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-change">
|
|
<title>Change Callback</title>
|
|
|
|
<para>
|
|
The required <function>change_cb</function> callback is called for every
|
|
individual row modification inside a transaction, be it
|
|
an <command>INSERT</command>, <command>UPDATE</command>,
|
|
or <command>DELETE</command>. Even if the original command modified
|
|
several rows at once the callback will be called individually for each
|
|
row. The <function>change_cb</function> callback may access system or
|
|
user catalog tables to aid in the process of outputting the row
|
|
modification details. In case of decoding a prepared (but not yet
|
|
committed) transaction or decoding of an uncommitted transaction, this
|
|
change callback might also error out due to simultaneous rollback of
|
|
this very same transaction. In that case, the logical decoding of this
|
|
aborted transaction is stopped gracefully.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn,
|
|
Relation relation,
|
|
ReorderBufferChange *change);
|
|
</programlisting>
|
|
The <parameter>ctx</parameter> and <parameter>txn</parameter> parameters
|
|
have the same contents as for the <function>begin_cb</function>
|
|
and <function>commit_cb</function> callbacks, but additionally the
|
|
relation descriptor <parameter>relation</parameter> points to the
|
|
relation the row belongs to and a struct
|
|
<parameter>change</parameter> describing the row modification are passed
|
|
in.
|
|
</para>
|
|
|
|
<note>
|
|
<para>
|
|
Only changes in user defined tables that are not unlogged
|
|
(see <xref linkend="sql-createtable-unlogged"/>) and not temporary
|
|
(see <xref linkend="sql-createtable-temporary"/>) can be extracted using
|
|
logical decoding.
|
|
</para>
|
|
</note>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-truncate">
|
|
<title>Truncate Callback</title>
|
|
|
|
<para>
|
|
The <function>truncate_cb</function> callback is called for a
|
|
<command>TRUNCATE</command> command.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn,
|
|
int nrelations,
|
|
Relation relations[],
|
|
ReorderBufferChange *change);
|
|
</programlisting>
|
|
The parameters are analogous to the <function>change_cb</function>
|
|
callback. However, because <command>TRUNCATE</command> actions on
|
|
tables connected by foreign keys need to be executed together, this
|
|
callback receives an array of relations instead of just a single one.
|
|
See the description of the <xref linkend="sql-truncate"/> statement for
|
|
details.
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-filter-origin">
|
|
<title>Origin Filter Callback</title>
|
|
|
|
<para>
|
|
The optional <function>filter_by_origin_cb</function> callback
|
|
is called to determine whether data that has been replayed
|
|
from <parameter>origin_id</parameter> is of interest to the
|
|
output plugin.
|
|
<programlisting>
|
|
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
|
|
RepOriginId origin_id);
|
|
</programlisting>
|
|
The <parameter>ctx</parameter> parameter has the same contents
|
|
as for the other callbacks. No information but the origin is
|
|
available. To signal that changes originating on the passed-in
|
|
node are irrelevant, return true, causing them to be filtered
|
|
away; false otherwise. The other callbacks will not be called
|
|
for transactions and changes that have been filtered away.
|
|
</para>
|
|
<para>
|
|
This is useful when implementing cascading or multidirectional
|
|
replication solutions. Filtering by the origin makes it possible to
|
|
prevent replicating the same changes back and forth in such
|
|
setups. While transactions and changes also carry information
|
|
about the origin, filtering via this callback is noticeably
|
|
more efficient.
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-message">
|
|
<title>Generic Message Callback</title>
|
|
|
|
<para>
|
|
The optional <function>message_cb</function> callback is called whenever
|
|
a logical decoding message has been decoded.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn,
|
|
XLogRecPtr message_lsn,
|
|
bool transactional,
|
|
const char *prefix,
|
|
Size message_size,
|
|
const char *message);
|
|
</programlisting>
|
|
The <parameter>txn</parameter> parameter contains meta information about
|
|
the transaction, like the time stamp at which it has been committed and
|
|
its XID. Note however that it can be NULL when the message is
|
|
non-transactional and the XID was not assigned yet in the transaction
|
|
which logged the message. The <parameter>message_lsn</parameter> has the WAL
|
|
location of the message. The <parameter>transactional</parameter> says
|
|
whether the message was sent as transactional or not. Similar to the change
|
|
callback, in case of decoding a prepared (but not yet committed)
|
|
transaction or decoding of an uncommitted transaction, this message
|
|
callback might also error out due to simultaneous rollback of
|
|
this very same transaction. In that case, the logical decoding of this
|
|
aborted transaction is stopped gracefully.
|
|
|
|
The <parameter>prefix</parameter> is an arbitrary null-terminated prefix
|
|
which can be used for identifying interesting messages for the current
|
|
plugin. And finally the <parameter>message</parameter> parameter holds
|
|
the actual message of <parameter>message_size</parameter> size.
|
|
</para>
|
|
<para>
|
|
Extra care should be taken to ensure that the prefix the output plugin
|
|
considers interesting is unique. Using the name of the extension or the
|
|
output plugin itself is often a good choice.
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-filter-prepare">
|
|
<title>Prepare Filter Callback</title>
|
|
|
|
<para>
|
|
The optional <function>filter_prepare_cb</function> callback
|
|
is called to determine whether data that is part of the current
|
|
two-phase commit transaction should be considered for decoding
|
|
at this prepare stage or later as a regular one-phase transaction at
|
|
<command>COMMIT PREPARED</command> time. To signal that
|
|
decoding should be skipped, return <literal>true</literal>;
|
|
<literal>false</literal> otherwise. When the callback is not
|
|
defined, <literal>false</literal> is assumed (i.e. no filtering, all
|
|
transactions using two-phase commit are decoded in two phases as well).
|
|
<programlisting>
|
|
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
|
|
TransactionId xid,
|
|
const char *gid);
|
|
</programlisting>
|
|
The <parameter>ctx</parameter> parameter has the same contents as for
|
|
the other callbacks. The parameters <parameter>xid</parameter>
|
|
and <parameter>gid</parameter> provide two different ways to identify
|
|
the transaction. The later <command>COMMIT PREPARED</command> or
|
|
<command>ROLLBACK PREPARED</command> carries both identifiers,
|
|
providing an output plugin the choice of what to use.
|
|
</para>
|
|
<para>
|
|
The callback may be invoked multiple times per transaction to decode
|
|
and must provide the same static answer for a given pair of
|
|
<parameter>xid</parameter> and <parameter>gid</parameter> every time
|
|
it is called.
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-begin-prepare">
|
|
<title>Transaction Begin Prepare Callback</title>
|
|
|
|
<para>
|
|
The required <function>begin_prepare_cb</function> callback is called
|
|
whenever the start of a prepared transaction has been decoded. The
|
|
<parameter>gid</parameter> field, which is part of the
|
|
<parameter>txn</parameter> parameter, can be used in this callback to
|
|
check if the plugin has already received this <command>PREPARE</command>,
|
|
in which case it can either error out or skip the remaining changes of
|
|
the transaction.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn);
|
|
</programlisting>
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-prepare">
|
|
<title>Transaction Prepare Callback</title>
|
|
|
|
<para>
|
|
The required <function>prepare_cb</function> callback is called whenever
|
|
a transaction which is prepared for two-phase commit has been
|
|
decoded. The <function>change_cb</function> callback for all modified
|
|
rows will have been called before this, if there have been any modified
|
|
rows. The <parameter>gid</parameter> field, which is part of the
|
|
<parameter>txn</parameter> parameter, can be used in this callback.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn,
|
|
XLogRecPtr prepare_lsn);
|
|
</programlisting>
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-commit-prepared">
|
|
<title>Transaction Commit Prepared Callback</title>
|
|
|
|
<para>
|
|
The required <function>commit_prepared_cb</function> callback is called
|
|
whenever a transaction <command>COMMIT PREPARED</command> has been decoded.
|
|
The <parameter>gid</parameter> field, which is part of the
|
|
<parameter>txn</parameter> parameter, can be used in this callback.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn,
|
|
XLogRecPtr commit_lsn);
|
|
</programlisting>
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-rollback-prepared">
|
|
<title>Transaction Rollback Prepared Callback</title>
|
|
|
|
<para>
|
|
The required <function>rollback_prepared_cb</function> callback is called
|
|
whenever a transaction <command>ROLLBACK PREPARED</command> has been
|
|
decoded. The <parameter>gid</parameter> field, which is part of the
|
|
<parameter>txn</parameter> parameter, can be used in this callback. The
|
|
parameters <parameter>prepare_end_lsn</parameter> and
|
|
<parameter>prepare_time</parameter> can be used to check if the plugin
|
|
has received this <command>PREPARE TRANSACTION</command>, in which case
|
|
it can apply the rollback; otherwise, it can skip the rollback operation. The
|
|
<parameter>gid</parameter> alone is not sufficient because the downstream
|
|
node can have a prepared transaction with the same identifier.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn,
|
|
XLogRecPtr prepare_end_lsn,
|
|
TimestampTz prepare_time);
|
|
</programlisting>
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-stream-start">
|
|
<title>Stream Start Callback</title>
|
|
<para>
|
|
The <function>stream_start_cb</function> callback is called when opening
|
|
a block of streamed changes from an in-progress transaction.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn);
|
|
</programlisting>
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-stream-stop">
|
|
<title>Stream Stop Callback</title>
|
|
<para>
|
|
The <function>stream_stop_cb</function> callback is called when closing
|
|
a block of streamed changes from an in-progress transaction.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn);
|
|
</programlisting>
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-stream-abort">
|
|
<title>Stream Abort Callback</title>
|
|
<para>
|
|
The <function>stream_abort_cb</function> callback is called to abort
|
|
a previously streamed transaction.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn,
|
|
XLogRecPtr abort_lsn);
|
|
</programlisting>
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-stream-prepare">
|
|
<title>Stream Prepare Callback</title>
|
|
<para>
|
|
The <function>stream_prepare_cb</function> callback is called to prepare
|
|
a previously streamed transaction as part of a two-phase commit.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn,
|
|
XLogRecPtr prepare_lsn);
|
|
</programlisting>
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-stream-commit">
|
|
<title>Stream Commit Callback</title>
|
|
<para>
|
|
The <function>stream_commit_cb</function> callback is called to commit
|
|
a previously streamed transaction.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn,
|
|
XLogRecPtr commit_lsn);
|
|
</programlisting>
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-stream-change">
|
|
<title>Stream Change Callback</title>
|
|
<para>
|
|
The <function>stream_change_cb</function> callback is called when sending
|
|
a change in a block of streamed changes (demarcated by
|
|
<function>stream_start_cb</function> and <function>stream_stop_cb</function> calls).
|
|
The actual changes are not displayed as the transaction can abort at a later
|
|
point in time and we don't decode changes for aborted transactions.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn,
|
|
Relation relation,
|
|
ReorderBufferChange *change);
|
|
</programlisting>
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-stream-message">
|
|
<title>Stream Message Callback</title>
|
|
<para>
|
|
The <function>stream_message_cb</function> callback is called when sending
|
|
a generic message in a block of streamed changes (demarcated by
|
|
<function>stream_start_cb</function> and <function>stream_stop_cb</function> calls).
|
|
The message contents for transactional messages are not displayed as the transaction
|
|
can abort at a later point in time and we don't decode changes for aborted
|
|
transactions.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn,
|
|
XLogRecPtr message_lsn,
|
|
bool transactional,
|
|
const char *prefix,
|
|
Size message_size,
|
|
const char *message);
|
|
</programlisting>
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-stream-truncate">
|
|
<title>Stream Truncate Callback</title>
|
|
<para>
|
|
The <function>stream_truncate_cb</function> callback is called for a
|
|
<command>TRUNCATE</command> command in a block of streamed changes
|
|
(demarcated by <function>stream_start_cb</function> and
|
|
<function>stream_stop_cb</function> calls).
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn,
|
|
int nrelations,
|
|
Relation relations[],
|
|
ReorderBufferChange *change);
|
|
</programlisting>
|
|
The parameters are analogous to the <function>stream_change_cb</function>
|
|
callback. However, because <command>TRUNCATE</command> actions on
|
|
tables connected by foreign keys need to be executed together, this
|
|
callback receives an array of relations instead of just a single one.
|
|
See the description of the <xref linkend="sql-truncate"/> statement for
|
|
details.
|
|
</para>
|
|
</sect3>
|
|
|
|
</sect2>
|
|
|
|
<sect2 id="logicaldecoding-output-plugin-output">
|
|
<title>Functions for Producing Output</title>
|
|
|
|
<para>
|
|
To actually produce output, output plugins can write data to
|
|
the <literal>StringInfo</literal> output buffer
|
|
in <literal>ctx->out</literal> when inside
|
|
the <function>begin_cb</function>, <function>commit_cb</function>,
|
|
or <function>change_cb</function> callbacks. Before writing to the output
|
|
buffer, <function>OutputPluginPrepareWrite(ctx, last_write)</function> has
|
|
to be called, and after finishing writing to the
|
|
buffer, <function>OutputPluginWrite(ctx, last_write)</function> has to be
|
|
called to perform the write. The <parameter>last_write</parameter>
|
|
indicates whether a particular write was the callback's last write.
|
|
</para>
|
|
|
|
<para>
|
|
The following example shows how to output data to the consumer of an
|
|
output plugin:
|
|
<programlisting>
|
|
OutputPluginPrepareWrite(ctx, true);
|
|
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
|
|
OutputPluginWrite(ctx, true);
|
|
</programlisting>
|
|
</para>
|
|
</sect2>
|
|
</sect1>
|
|
|
|
<sect1 id="logicaldecoding-writer">
|
|
<title>Logical Decoding Output Writers</title>
|
|
|
|
<para>
|
|
It is possible to add more output methods for logical decoding.
|
|
For details, see
|
|
<filename>src/backend/replication/logical/logicalfuncs.c</filename>.
|
|
Essentially, three functions need to be provided: one to read WAL, one to
|
|
prepare writing output, and one to write the output
|
|
(see <xref linkend="logicaldecoding-output-plugin-output"/>).
|
|
</para>
|
|
</sect1>
|
|
|
|
<sect1 id="logicaldecoding-synchronous">
|
|
<title>Synchronous Replication Support for Logical Decoding</title>
|
|
<sect2>
|
|
<title>Overview</title>
|
|
|
|
<para>
|
|
Logical decoding can be used to build
|
|
<link linkend="synchronous-replication">synchronous
|
|
replication</link> solutions with the same user interface as synchronous
|
|
replication for <link linkend="streaming-replication">streaming
|
|
replication</link>. To do this, the streaming replication interface
|
|
(see <xref linkend="logicaldecoding-walsender"/>) must be used to stream out
|
|
data. Clients have to send <literal>Standby status update (F)</literal>
|
|
(see <xref linkend="protocol-replication"/>) messages, just like streaming
|
|
replication clients do.
|
|
</para>
|
|
|
|
<note>
|
|
<para>
|
|
A synchronous replica receiving changes via logical decoding will work in
|
|
the scope of a single database. Since, in contrast to
|
|
that, <parameter>synchronous_standby_names</parameter> currently is
|
|
server-wide, this means this technique will not work properly if more
|
|
than one database is actively used.
|
|
</para>
|
|
</note>
|
|
</sect2>
|
|
|
|
<sect2 id="logicaldecoding-synchronous-caveats">
|
|
<title>Caveats</title>
|
|
|
|
<para>
|
|
In a synchronous replication setup, a deadlock can happen if the transaction
|
|
has locked [user] catalog tables exclusively. See
|
|
<xref linkend="logicaldecoding-capabilities"/> for information on user
|
|
catalog tables. This is because logical decoding of transactions can lock
|
|
catalog tables to access them. To avoid this users must refrain from taking
|
|
an exclusive lock on [user] catalog tables. This can happen in the following
|
|
ways:
|
|
|
|
<itemizedlist>
|
|
<listitem>
|
|
<para>
|
|
Issuing an explicit <command>LOCK</command> on <structname>pg_class</structname>
|
|
in a transaction.
|
|
</para>
|
|
</listitem>
|
|
|
|
<listitem>
|
|
<para>
|
|
Performing <command>CLUSTER</command> on <structname>pg_class</structname> in
|
|
a transaction.
|
|
</para>
|
|
</listitem>
|
|
|
|
<listitem>
|
|
<para>
|
|
<command>PREPARE TRANSACTION</command> after a <command>LOCK</command> command
|
|
on <structname>pg_class</structname> with logical decoding of two-phase
|
|
transactions enabled.
|
|
</para>
|
|
</listitem>
|
|
|
|
<listitem>
|
|
<para>
|
|
<command>PREPARE TRANSACTION</command> after a <command>CLUSTER</command>
|
|
command on <structname>pg_trigger</structname> with logical decoding of
|
|
two-phase transactions enabled. This will lead to a deadlock only when the
|
|
published table has a trigger.
|
|
</para>
|
|
</listitem>
|
|
|
|
<listitem>
|
|
<para>
|
|
Executing <command>TRUNCATE</command> on a [user] catalog table in a
|
|
transaction.
|
|
</para>
|
|
</listitem>
|
|
</itemizedlist>
|
|
|
|
Note that these commands can cause a deadlock not only on the system catalog
|
|
tables explicitly mentioned above but also on any other [user] catalog
|
|
table.
|
|
</para>
|
|
</sect2>
|
|
</sect1>
|
|
|
|
<sect1 id="logicaldecoding-streaming">
|
|
<title>Streaming of Large Transactions for Logical Decoding</title>
|
|
|
|
<para>
|
|
The basic output plugin callbacks (e.g., <function>begin_cb</function>,
|
|
<function>change_cb</function>, <function>commit_cb</function> and
|
|
<function>message_cb</function>) are only invoked when the transaction
|
|
actually commits. The changes are still decoded from the transaction
|
|
log, but are only passed to the output plugin at commit (and discarded
|
|
if the transaction aborts).
|
|
</para>
|
|
|
|
<para>
|
|
This means that while the decoding happens incrementally, and may spill
|
|
to disk to keep memory usage under control, all the decoded changes have
|
|
to be transmitted when the transaction finally commits (or more precisely,
|
|
when the commit is decoded from the transaction log). Depending on the
|
|
size of the transaction and network bandwidth, the transfer time may
|
|
significantly increase the apply lag.
|
|
</para>
|
|
|
|
<para>
|
|
To reduce the apply lag caused by large transactions, an output plugin
|
|
may provide additional callbacks to support incremental streaming of
|
|
in-progress transactions. There are multiple required streaming callbacks
|
|
(<function>stream_start_cb</function>, <function>stream_stop_cb</function>,
|
|
<function>stream_abort_cb</function>, <function>stream_commit_cb</function>
|
|
and <function>stream_change_cb</function>) and two optional callbacks
|
|
(<function>stream_message_cb</function> and <function>stream_truncate_cb</function>).
|
|
</para>
|
|
|
|
<para>
|
|
When streaming an in-progress transaction, the changes (and messages) are
|
|
streamed in blocks demarcated by <function>stream_start_cb</function>
|
|
and <function>stream_stop_cb</function> callbacks. Once all the decoded
|
|
changes are transmitted, the transaction can be committed using the
|
|
<function>stream_commit_cb</function> callback
|
|
(or possibly aborted using the <function>stream_abort_cb</function> callback).
|
|
If two-phase commits are supported, the transaction can be prepared using the
|
|
<function>stream_prepare_cb</function> callback, and later committed using the
|
|
<function>commit_prepared_cb</function> callback (on <command>COMMIT PREPARED</command>)
|
|
or rolled back using the <function>rollback_prepared_cb</function> callback
|
|
(on <command>ROLLBACK PREPARED</command>).
|
|
</para>
|
|
|
|
<para>
|
|
One example sequence of streaming callback calls for one transaction may
|
|
look like this:
|
|
<programlisting>
|
|
stream_start_cb(...); <-- start of first block of changes
|
|
stream_change_cb(...);
|
|
stream_change_cb(...);
|
|
stream_message_cb(...);
|
|
stream_change_cb(...);
|
|
...
|
|
stream_change_cb(...);
|
|
stream_stop_cb(...); <-- end of first block of changes
|
|
|
|
stream_start_cb(...); <-- start of second block of changes
|
|
stream_change_cb(...);
|
|
stream_change_cb(...);
|
|
stream_change_cb(...);
|
|
...
|
|
stream_message_cb(...);
|
|
stream_change_cb(...);
|
|
stream_stop_cb(...); <-- end of second block of changes
|
|
|
|
stream_commit_cb(...); <-- commit of the streamed transaction
|
|
</programlisting>
|
|
</para>
|
|
|
|
<para>
|
|
The actual sequence of callback calls may be more complicated, of course.
|
|
There may be blocks for multiple streamed transactions, some of the
|
|
transactions may get aborted, etc.
|
|
</para>
|
|
|
|
<para>
|
|
Similar to spill-to-disk behavior, streaming is triggered when the total
|
|
amount of changes decoded from the WAL (for all in-progress transactions)
|
|
exceeds the limit defined by <varname>logical_decoding_work_mem</varname> setting.
|
|
At that point, the largest top-level transaction (measured by the amount of memory
|
|
currently used for decoded changes) is selected and streamed. However, in
|
|
some cases we still have to spill to disk even if streaming is enabled
|
|
because we exceed the memory threshold but still have not decoded the
|
|
complete tuple, e.g., only the TOAST table insert has been decoded but not
|
|
the main table insert.
|
|
</para>
|
|
|
|
<para>
|
|
Even when streaming large transactions, the changes are still applied in
|
|
commit order, preserving the same guarantees as the non-streaming mode.
|
|
</para>
|
|
|
|
</sect1>
|
|
|
|
<sect1 id="logicaldecoding-two-phase-commits">
|
|
<title>Two-phase commit support for Logical Decoding</title>
|
|
|
|
<para>
|
|
With the basic output plugin callbacks (e.g., <function>begin_cb</function>,
|
|
<function>change_cb</function>, <function>commit_cb</function> and
|
|
<function>message_cb</function>) two-phase commit commands like
|
|
<command>PREPARE TRANSACTION</command>, <command>COMMIT PREPARED</command>
|
|
and <command>ROLLBACK PREPARED</command> are not decoded. While the
|
|
<command>PREPARE TRANSACTION</command> is ignored,
|
|
<command>COMMIT PREPARED</command> is decoded as a <command>COMMIT</command>
|
|
and <command>ROLLBACK PREPARED</command> is decoded as a
|
|
<command>ROLLBACK</command>.
|
|
</para>
|
|
|
|
<para>
|
|
To support the streaming of two-phase commands, an output plugin needs to
|
|
provide additional callbacks. There are multiple two-phase commit callbacks
|
|
that are required (<function>begin_prepare_cb</function>,
|
|
<function>prepare_cb</function>, <function>commit_prepared_cb</function>,
|
|
<function>rollback_prepared_cb</function> and
|
|
<function>stream_prepare_cb</function>) and an optional callback
|
|
(<function>filter_prepare_cb</function>).
|
|
</para>
|
|
|
|
<para>
|
|
If the output plugin callbacks for decoding two-phase commit commands are
|
|
provided, then on <command>PREPARE TRANSACTION</command>, the changes of
|
|
that transaction are decoded, passed to the output plugin, and the
|
|
<function>prepare_cb</function> callback is invoked. This differs from the
|
|
basic decoding setup where changes are only passed to the output plugin
|
|
when a transaction is committed. The start of a prepared transaction is
|
|
indicated by the <function>begin_prepare_cb</function> callback.
|
|
</para>
|
|
|
|
<para>
|
|
When a prepared transaction is rolled back using the
|
|
<command>ROLLBACK PREPARED</command>, then the
|
|
<function>rollback_prepared_cb</function> callback is invoked and when the
|
|
prepared transaction is committed using <command>COMMIT PREPARED</command>,
|
|
then the <function>commit_prepared_cb</function> callback is invoked.
|
|
</para>
|
|
|
|
<para>
|
|
Optionally the output plugin can define filtering rules via
|
|
<function>filter_prepare_cb</function> to decode only specific transactions
|
|
in two phases. This can be achieved by pattern matching on the
|
|
<parameter>gid</parameter> or via lookups using the
|
|
<parameter>xid</parameter>.
|
|
</para>
|
|
|
|
<para>
|
|
Users who want to decode prepared transactions need to be careful about
|
|
the following points:
|
|
|
|
<itemizedlist>
|
|
<listitem>
|
|
<para>
|
|
If the prepared transaction has locked [user] catalog tables exclusively
|
|
then decoding the prepare can block until the main transaction is committed.
|
|
</para>
|
|
</listitem>
|
|
|
|
<listitem>
|
|
<para>
|
|
The logical replication solution that builds distributed two-phase commit
|
|
using this feature can deadlock if the prepared transaction has locked
|
|
[user] catalog tables exclusively. To avoid this users must refrain from
|
|
having locks on catalog tables (e.g., an explicit <command>LOCK</command> command)
|
|
in such transactions.
|
|
See <xref linkend="logicaldecoding-synchronous-caveats"/> for the details.
|
|
</para>
|
|
</listitem>
|
|
</itemizedlist>
|
|
</para>
|
|
|
|
</sect1>
|
|
</chapter>
|