mirror of
https://github.com/postgres/postgres.git
synced 2025-05-05 09:19:17 +03:00
%d can be used to track if the current connection is in a transaction block or not, and adding it by default to the prompt has the advantage to not need a modification of .psqlrc, something not possible depending on the environment. This discussion has happened across various sources, and there was a strong consensus in favor of this change. Author: Vik Fearing Reviewed-by: Fabien Coelho Discussion: https://postgr.es/m/09502c40-cfe1-bb29-10f9-4b3fa7b2bbb2@2ndquadrant.com
750 lines
31 KiB
Plaintext
750 lines
31 KiB
Plaintext
<!-- doc/src/sgml/logicaldecoding.sgml -->
|
|
<chapter id="logicaldecoding">
|
|
<title>Logical Decoding</title>
|
|
<indexterm zone="logicaldecoding">
|
|
<primary>Logical Decoding</primary>
|
|
</indexterm>
|
|
<para>
|
|
PostgreSQL provides infrastructure to stream the modifications performed
|
|
via SQL to external consumers. This functionality can be used for a
|
|
variety of purposes, including replication solutions and auditing.
|
|
</para>
|
|
|
|
<para>
|
|
Changes are sent out in streams identified by logical replication slots.
|
|
</para>
|
|
|
|
<para>
|
|
The format in which those changes are streamed is determined by the output
|
|
plugin used. An example plugin is provided in the PostgreSQL distribution.
|
|
Additional plugins can be
|
|
written to extend the choice of available formats without modifying any
|
|
core code.
|
|
Every output plugin has access to each individual new row produced
|
|
by <command>INSERT</command> and the new row version created
|
|
by <command>UPDATE</command>. Availability of old row versions for
|
|
<command>UPDATE</command> and <command>DELETE</command> depends on
|
|
the configured replica identity (see <xref linkend="sql-createtable-replica-identity"/>).
|
|
</para>
|
|
|
|
<para>
|
|
Changes can be consumed either using the streaming replication protocol
|
|
(see <xref linkend="protocol-replication"/> and
|
|
<xref linkend="logicaldecoding-walsender"/>), or by calling functions
|
|
via SQL (see <xref linkend="logicaldecoding-sql"/>). It is also possible
|
|
to write additional methods of consuming the output of a replication slot
|
|
without modifying core code
|
|
(see <xref linkend="logicaldecoding-writer"/>).
|
|
</para>
|
|
|
|
<sect1 id="logicaldecoding-example">
|
|
<title>Logical Decoding Examples</title>
|
|
|
|
<para>
|
|
The following example demonstrates controlling logical decoding using the
|
|
SQL interface.
|
|
</para>
|
|
|
|
<para>
|
|
Before you can use logical decoding, you must set
|
|
<xref linkend="guc-wal-level"/> to <literal>logical</literal> and
|
|
<xref linkend="guc-max-replication-slots"/> to at least 1. Then, you
|
|
should connect to the target database (in the example
|
|
below, <literal>postgres</literal>) as a superuser.
|
|
</para>
|
|
|
|
<programlisting>
|
|
postgres=# -- Create a slot named 'regression_slot' using the output plugin 'test_decoding'
|
|
postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
|
|
slot_name | lsn
|
|
-----------------+-----------
|
|
regression_slot | 0/16B1970
|
|
(1 row)
|
|
|
|
postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
|
|
slot_name | plugin | slot_type | database | active | restart_lsn | confirmed_flush_lsn
|
|
-----------------+---------------+-----------+----------+--------+-------------+-----------------
|
|
regression_slot | test_decoding | logical | postgres | f | 0/16A4408 | 0/16A4440
|
|
(1 row)
|
|
|
|
postgres=# -- There are no changes to see yet
|
|
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
|
|
lsn | xid | data
|
|
-----+-----+------
|
|
(0 rows)
|
|
|
|
postgres=# CREATE TABLE data(id serial primary key, data text);
|
|
CREATE TABLE
|
|
|
|
postgres=# -- DDL isn't replicated, so all you'll see is the transaction
|
|
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
|
|
lsn | xid | data
|
|
-----------+-------+--------------
|
|
0/BA2DA58 | 10297 | BEGIN 10297
|
|
0/BA5A5A0 | 10297 | COMMIT 10297
|
|
(2 rows)
|
|
|
|
postgres=# -- Once changes are read, they're consumed and not emitted
|
|
postgres=# -- in a subsequent call:
|
|
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
|
|
lsn | xid | data
|
|
-----+-----+------
|
|
(0 rows)
|
|
|
|
postgres=# BEGIN;
|
|
postgres=*# INSERT INTO data(data) VALUES('1');
|
|
postgres=*# INSERT INTO data(data) VALUES('2');
|
|
postgres=*# COMMIT;
|
|
|
|
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
|
|
lsn | xid | data
|
|
-----------+-------+---------------------------------------------------------
|
|
0/BA5A688 | 10298 | BEGIN 10298
|
|
0/BA5A6F0 | 10298 | table public.data: INSERT: id[integer]:1 data[text]:'1'
|
|
0/BA5A7F8 | 10298 | table public.data: INSERT: id[integer]:2 data[text]:'2'
|
|
0/BA5A8A8 | 10298 | COMMIT 10298
|
|
(4 rows)
|
|
|
|
postgres=# INSERT INTO data(data) VALUES('3');
|
|
|
|
postgres=# -- You can also peek ahead in the change stream without consuming changes
|
|
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
|
|
lsn | xid | data
|
|
-----------+-------+---------------------------------------------------------
|
|
0/BA5A8E0 | 10299 | BEGIN 10299
|
|
0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
|
|
0/BA5A990 | 10299 | COMMIT 10299
|
|
(3 rows)
|
|
|
|
postgres=# -- The next call to pg_logical_slot_peek_changes() returns the same changes again
|
|
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
|
|
lsn | xid | data
|
|
-----------+-------+---------------------------------------------------------
|
|
0/BA5A8E0 | 10299 | BEGIN 10299
|
|
0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
|
|
0/BA5A990 | 10299 | COMMIT 10299
|
|
(3 rows)
|
|
|
|
postgres=# -- options can be passed to output plugin, to influence the formatting
|
|
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on');
|
|
lsn | xid | data
|
|
-----------+-------+---------------------------------------------------------
|
|
0/BA5A8E0 | 10299 | BEGIN 10299
|
|
0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
|
|
0/BA5A990 | 10299 | COMMIT 10299 (at 2017-05-10 12:07:21.272494-04)
|
|
(3 rows)
|
|
|
|
postgres=# -- Remember to destroy a slot you no longer need to stop it consuming
|
|
postgres=# -- server resources:
|
|
postgres=# SELECT pg_drop_replication_slot('regression_slot');
|
|
pg_drop_replication_slot
|
|
-----------------------
|
|
|
|
(1 row)
|
|
</programlisting>
|
|
|
|
<para>
|
|
The following example shows how logical decoding is controlled over the
|
|
streaming replication protocol, using the
|
|
program <xref linkend="app-pgrecvlogical"/> included in the PostgreSQL
|
|
distribution. This requires that client authentication is set up to allow
|
|
replication connections
|
|
(see <xref linkend="streaming-replication-authentication"/>) and
|
|
that <varname>max_wal_senders</varname> is set sufficiently high to allow
|
|
an additional connection.
|
|
</para>
|
|
<programlisting>
|
|
$ pg_recvlogical -d postgres --slot=test --create-slot
|
|
$ pg_recvlogical -d postgres --slot=test --start -f -
|
|
<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
|
|
$ psql -d postgres -c "INSERT INTO data(data) VALUES('4');"
|
|
$ fg
|
|
BEGIN 693
|
|
table public.data: INSERT: id[integer]:4 data[text]:'4'
|
|
COMMIT 693
|
|
<keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
|
|
$ pg_recvlogical -d postgres --slot=test --drop-slot
|
|
</programlisting>
|
|
</sect1>
|
|
|
|
<sect1 id="logicaldecoding-explanation">
|
|
<title>Logical Decoding Concepts</title>
|
|
<sect2>
|
|
<title>Logical Decoding</title>
|
|
|
|
<indexterm>
|
|
<primary>Logical Decoding</primary>
|
|
</indexterm>
|
|
|
|
<para>
|
|
Logical decoding is the process of extracting all persistent changes
|
|
to a database's tables into a coherent, easy to understand format which
|
|
can be interpreted without detailed knowledge of the database's internal
|
|
state.
|
|
</para>
|
|
|
|
<para>
|
|
In <productname>PostgreSQL</productname>, logical decoding is implemented
|
|
by decoding the contents of the <link linkend="wal">write-ahead
|
|
log</link>, which describe changes on a storage level, into an
|
|
application-specific form such as a stream of tuples or SQL statements.
|
|
</para>
|
|
</sect2>
|
|
|
|
<sect2 id="logicaldecoding-replication-slots">
|
|
<title>Replication Slots</title>
|
|
|
|
<indexterm>
|
|
<primary>replication slot</primary>
|
|
<secondary>logical replication</secondary>
|
|
</indexterm>
|
|
|
|
<para>
|
|
In the context of logical replication, a slot represents a stream of
|
|
changes that can be replayed to a client in the order they were made on
|
|
the origin server. Each slot streams a sequence of changes from a single
|
|
database.
|
|
</para>
|
|
|
|
<note>
|
|
<para><productname>PostgreSQL</productname> also has streaming replication slots
|
|
(see <xref linkend="streaming-replication"/>), but they are used somewhat
|
|
differently there.
|
|
</para>
|
|
</note>
|
|
|
|
<para>
|
|
A replication slot has an identifier that is unique across all databases
|
|
in a <productname>PostgreSQL</productname> cluster. Slots persist
|
|
independently of the connection using them and are crash-safe.
|
|
</para>
|
|
|
|
<para>
|
|
A logical slot will emit each change just once in normal operation.
|
|
The current position of each slot is persisted only at checkpoint, so in
|
|
the case of a crash the slot may return to an earlier LSN, which will
|
|
then cause recent changes to be resent when the server restarts.
|
|
Logical decoding clients are responsible for avoiding ill effects from
|
|
handling the same message more than once. Clients may wish to record
|
|
the last LSN they saw when decoding and skip over any repeated data or
|
|
(when using the replication protocol) request that decoding start from
|
|
that LSN rather than letting the server determine the start point.
|
|
The Replication Progress Tracking feature is designed for this purpose;
|
|
refer to <link linkend="replication-origins">replication origins</link>.
|
|
</para>
|
|
|
|
<para>
|
|
Multiple independent slots may exist for a single database. Each slot has
|
|
its own state, allowing different consumers to receive changes from
|
|
different points in the database change stream. For most applications, a
|
|
separate slot will be required for each consumer.
|
|
</para>
|
|
|
|
<para>
|
|
A logical replication slot knows nothing about the state of the
|
|
receiver(s). It's even possible to have multiple different receivers using
|
|
the same slot at different times; they'll just get the changes following
|
|
on from when the last receiver stopped consuming them. Only one receiver
|
|
may consume changes from a slot at any given time.
|
|
</para>
|
|
|
|
<caution>
|
|
<para>
|
|
Replication slots persist across crashes and know nothing about the state
|
|
of their consumer(s). They will prevent removal of required resources
|
|
even when there is no connection using them. This consumes storage
|
|
because neither required WAL nor required rows from the system catalogs
|
|
can be removed by <command>VACUUM</command> as long as they are required by a replication
|
|
slot. In extreme cases this could cause the database to shut down to prevent
|
|
transaction ID wraparound (see <xref linkend="vacuum-for-wraparound"/>).
|
|
So if a slot is no longer required it should be dropped.
|
|
</para>
|
|
</caution>
|
|
</sect2>
|
|
|
|
<sect2>
|
|
<title>Output Plugins</title>
|
|
<para>
|
|
Output plugins transform the data from the write-ahead log's internal
|
|
representation into the format the consumer of a replication slot desires.
|
|
</para>
|
|
</sect2>
|
|
|
|
<sect2>
|
|
<title>Exported Snapshots</title>
|
|
<para>
|
|
When a new replication slot is created using the streaming replication
|
|
interface (see <xref linkend="protocol-replication-create-slot"/>), a
|
|
snapshot is exported
|
|
(see <xref linkend="functions-snapshot-synchronization"/>), which will show
|
|
exactly the state of the database after which all changes will be
|
|
included in the change stream. This can be used to create a new replica by
|
|
using <link linkend="sql-set-transaction"><literal>SET TRANSACTION
|
|
SNAPSHOT</literal></link> to read the state of the database at the moment
|
|
the slot was created. This transaction can then be used to dump the
|
|
database's state at that point in time, which afterwards can be updated
|
|
using the slot's contents without losing any changes.
|
|
</para>
|
|
<para>
|
|
Creation of a snapshot is not always possible. In particular, it will
|
|
fail when connected to a hot standby. Applications that do not require
|
|
snapshot export may suppress it with the <literal>NOEXPORT_SNAPSHOT</literal>
|
|
option.
|
|
</para>
|
|
</sect2>
|
|
</sect1>
|
|
|
|
<sect1 id="logicaldecoding-walsender">
|
|
<title>Streaming Replication Protocol Interface</title>
|
|
|
|
<para>
|
|
The commands
|
|
<itemizedlist>
|
|
<listitem>
|
|
<para><literal>CREATE_REPLICATION_SLOT <replaceable>slot_name</replaceable> LOGICAL <replaceable>output_plugin</replaceable></literal></para>
|
|
</listitem>
|
|
|
|
<listitem>
|
|
<para><literal>DROP_REPLICATION_SLOT <replaceable>slot_name</replaceable></literal> <optional> <literal>WAIT</literal> </optional></para>
|
|
</listitem>
|
|
|
|
<listitem>
|
|
<para><literal>START_REPLICATION SLOT <replaceable>slot_name</replaceable> LOGICAL ...</literal></para>
|
|
</listitem>
|
|
</itemizedlist>
|
|
are used to create, drop, and stream changes from a replication
|
|
slot, respectively. These commands are only available over a replication
|
|
connection; they cannot be used via SQL.
|
|
See <xref linkend="protocol-replication"/> for details on these commands.
|
|
</para>
|
|
|
|
<para>
|
|
The command <xref linkend="app-pgrecvlogical"/> can be used to control
|
|
logical decoding over a streaming replication connection. (It uses
|
|
these commands internally.)
|
|
</para>
|
|
</sect1>
|
|
|
|
<sect1 id="logicaldecoding-sql">
|
|
<title>Logical Decoding <acronym>SQL</acronym> Interface</title>
|
|
|
|
<para>
|
|
See <xref linkend="functions-replication"/> for detailed documentation on
|
|
the SQL-level API for interacting with logical decoding.
|
|
</para>
|
|
|
|
<para>
|
|
Synchronous replication (see <xref linkend="synchronous-replication"/>) is
|
|
only supported on replication slots used over the streaming replication interface. The
|
|
function interface and additional, non-core interfaces do not support
|
|
synchronous replication.
|
|
</para>
|
|
</sect1>
|
|
|
|
<sect1 id="logicaldecoding-catalogs">
|
|
<title>System Catalogs Related to Logical Decoding</title>
|
|
|
|
<para>
|
|
The <link linkend="view-pg-replication-slots"><structname>pg_replication_slots</structname></link>
|
|
view and the
|
|
<link linkend="pg-stat-replication-view"><structname>pg_stat_replication</structname></link>
|
|
view provide information about the current state of replication slots and
|
|
streaming replication connections, respectively. These views apply to both physical and
|
|
logical replication.
|
|
</para>
|
|
</sect1>
|
|
|
|
<sect1 id="logicaldecoding-output-plugin">
|
|
<title>Logical Decoding Output Plugins</title>
|
|
<para>
|
|
An example output plugin can be found in the
|
|
<link linkend="test-decoding">
|
|
<filename>contrib/test_decoding</filename>
|
|
</link>
|
|
subdirectory of the PostgreSQL source tree.
|
|
</para>
|
|
<sect2 id="logicaldecoding-output-init">
|
|
<title>Initialization Function</title>
|
|
<indexterm zone="logicaldecoding-output-init">
|
|
<primary>_PG_output_plugin_init</primary>
|
|
</indexterm>
|
|
<para>
|
|
An output plugin is loaded by dynamically loading a shared library with
|
|
the output plugin's name as the library base name. The normal library
|
|
search path is used to locate the library. To provide the required output
|
|
plugin callbacks and to indicate that the library is actually an output
|
|
plugin it needs to provide a function named
|
|
<function>_PG_output_plugin_init</function>. This function is passed a
|
|
struct that needs to be filled with the callback function pointers for
|
|
individual actions.
|
|
<programlisting>
|
|
typedef struct OutputPluginCallbacks
|
|
{
|
|
LogicalDecodeStartupCB startup_cb;
|
|
LogicalDecodeBeginCB begin_cb;
|
|
LogicalDecodeChangeCB change_cb;
|
|
LogicalDecodeTruncateCB truncate_cb;
|
|
LogicalDecodeCommitCB commit_cb;
|
|
LogicalDecodeMessageCB message_cb;
|
|
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
|
|
LogicalDecodeShutdownCB shutdown_cb;
|
|
} OutputPluginCallbacks;
|
|
|
|
typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
|
|
</programlisting>
|
|
The <function>begin_cb</function>, <function>change_cb</function>
|
|
and <function>commit_cb</function> callbacks are required,
|
|
while <function>startup_cb</function>,
|
|
<function>filter_by_origin_cb</function>, <function>truncate_cb</function>, <function>message_cb</function>,
|
|
and <function>shutdown_cb</function> are optional.
|
|
If <function>truncate_cb</function> is not set but a
|
|
<command>TRUNCATE</command> is to be decoded, the action will be ignored.
|
|
</para>
|
|
</sect2>
|
|
|
|
<sect2 id="logicaldecoding-capabilities">
|
|
<title>Capabilities</title>
|
|
|
|
<para>
|
|
To decode, format and output changes, output plugins can use most of the
|
|
backend's normal infrastructure, including calling output functions. Read
|
|
only access to relations is permitted as long as only relations are
|
|
accessed that either have been created by <command>initdb</command> in
|
|
the <literal>pg_catalog</literal> schema, or have been marked as user
|
|
provided catalog tables using
|
|
<programlisting>
|
|
ALTER TABLE user_catalog_table SET (user_catalog_table = true);
|
|
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
|
|
</programlisting>
|
|
Any actions leading to transaction ID assignment are prohibited. That, among others,
|
|
includes writing to tables, performing DDL changes, and
|
|
calling <literal>txid_current()</literal>.
|
|
</para>
|
|
</sect2>
|
|
|
|
<sect2 id="logicaldecoding-output-mode">
|
|
<title>Output Modes</title>
|
|
|
|
<para>
|
|
Output plugin callbacks can pass data to the consumer in nearly arbitrary
|
|
formats. For some use cases, like viewing the changes via SQL, returning
|
|
data in a data type that can contain arbitrary data (e.g., <type>bytea</type>) is
|
|
cumbersome. If the output plugin only outputs textual data in the
|
|
server's encoding, it can declare that by
|
|
setting <literal>OutputPluginOptions.output_type</literal>
|
|
to <literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal> instead
|
|
of <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal> in
|
|
the <link linkend="logicaldecoding-output-plugin-startup">startup
|
|
callback</link>. In that case, all the data has to be in the server's encoding
|
|
so that a <type>text</type> datum can contain it. This is checked in assertion-enabled
|
|
builds.
|
|
</para>
|
|
</sect2>
|
|
|
|
<sect2 id="logicaldecoding-output-plugin-callbacks">
|
|
<title>Output Plugin Callbacks</title>
|
|
|
|
<para>
|
|
An output plugin gets notified about changes that are happening via
|
|
various callbacks it needs to provide.
|
|
</para>
|
|
|
|
<para>
|
|
Concurrent transactions are decoded in commit order, and only changes
|
|
belonging to a specific transaction are decoded between
|
|
the <literal>begin</literal> and <literal>commit</literal>
|
|
callbacks. Transactions that were rolled back explicitly or implicitly
|
|
never get
|
|
decoded. Successful savepoints are
|
|
folded into the transaction containing them in the order they were
|
|
executed within that transaction.
|
|
</para>
|
|
|
|
<note>
|
|
<para>
|
|
Only transactions that have already safely been flushed to disk will be
|
|
decoded. That can lead to a <command>COMMIT</command> not immediately being decoded in a
|
|
directly following <literal>pg_logical_slot_get_changes()</literal>
|
|
when <varname>synchronous_commit</varname> is set
|
|
to <literal>off</literal>.
|
|
</para>
|
|
</note>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-startup">
|
|
<title>Startup Callback</title>
|
|
<para>
|
|
The optional <function>startup_cb</function> callback is called whenever
|
|
a replication slot is created or asked to stream changes, independent
|
|
of the number of changes that are ready to be put out.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
|
|
OutputPluginOptions *options,
|
|
bool is_init);
|
|
</programlisting>
|
|
The <literal>is_init</literal> parameter will be true when the
|
|
replication slot is being created and false
|
|
otherwise. <parameter>options</parameter> points to a struct of options
|
|
that output plugins can set:
|
|
<programlisting>
|
|
typedef struct OutputPluginOptions
|
|
{
|
|
OutputPluginOutputType output_type;
|
|
bool receive_rewrites;
|
|
} OutputPluginOptions;
|
|
</programlisting>
|
|
<literal>output_type</literal> has to either be set to
|
|
<literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal>
|
|
or <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal>. See also
|
|
<xref linkend="logicaldecoding-output-mode"/>.
|
|
If <literal>receive_rewrites</literal> is true, the output plugin will
|
|
also be called for changes made by heap rewrites during certain DDL
|
|
operations. These are of interest to plugins that handle DDL
|
|
replication, but they require special handling.
|
|
</para>
|
|
|
|
<para>
|
|
The startup callback should validate the options present in
|
|
<literal>ctx->output_plugin_options</literal>. If the output plugin
|
|
needs to have a state, it can
|
|
use <literal>ctx->output_plugin_private</literal> to store it.
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-shutdown">
|
|
<title>Shutdown Callback</title>
|
|
|
|
<para>
|
|
The optional <function>shutdown_cb</function> callback is called
|
|
whenever a formerly active replication slot is not used anymore and can
|
|
be used to deallocate resources private to the output plugin. The slot
|
|
isn't necessarily being dropped, streaming is just being stopped.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
|
|
</programlisting>
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-begin">
|
|
<title>Transaction Begin Callback</title>
|
|
|
|
<para>
|
|
The required <function>begin_cb</function> callback is called whenever a
|
|
start of a committed transaction has been decoded. Aborted transactions
|
|
and their contents never get decoded.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn);
|
|
</programlisting>
|
|
The <parameter>txn</parameter> parameter contains meta information about
|
|
the transaction, like the time stamp at which it has been committed and
|
|
its XID.
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-commit">
|
|
<title>Transaction End Callback</title>
|
|
|
|
<para>
|
|
The required <function>commit_cb</function> callback is called whenever
|
|
a transaction commit has been
|
|
decoded. The <function>change_cb</function> callbacks for all modified
|
|
rows will have been called before this, if there have been any modified
|
|
rows.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn,
|
|
XLogRecPtr commit_lsn);
|
|
</programlisting>
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-change">
|
|
<title>Change Callback</title>
|
|
|
|
<para>
|
|
The required <function>change_cb</function> callback is called for every
|
|
individual row modification inside a transaction, whether it is
|
|
an <command>INSERT</command>, <command>UPDATE</command>,
|
|
or <command>DELETE</command>. Even if the original command modified
|
|
several rows at once the callback will be called individually for each
|
|
row.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn,
|
|
Relation relation,
|
|
ReorderBufferChange *change);
|
|
</programlisting>
|
|
The <parameter>ctx</parameter> and <parameter>txn</parameter> parameters
|
|
have the same contents as for the <function>begin_cb</function>
|
|
and <function>commit_cb</function> callbacks, but additionally the
|
|
relation descriptor <parameter>relation</parameter> points to the
|
|
relation the row belongs to and a struct
|
|
<parameter>change</parameter> describing the row modification are passed
|
|
in.
|
|
</para>
|
|
|
|
<note>
|
|
<para>
|
|
Only changes in user-defined tables that are not unlogged
|
|
(see <xref linkend="sql-createtable-unlogged"/>) and not temporary
|
|
(see <xref linkend="sql-createtable-temporary"/>) can be extracted using
|
|
logical decoding.
|
|
</para>
|
|
</note>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-truncate">
|
|
<title>Truncate Callback</title>
|
|
|
|
<para>
|
|
The optional <function>truncate_cb</function> callback is called for a
|
|
<command>TRUNCATE</command> command.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn,
|
|
int nrelations,
|
|
Relation relations[],
|
|
ReorderBufferChange *change);
|
|
</programlisting>
|
|
The parameters are analogous to the <function>change_cb</function>
|
|
callback. However, because <command>TRUNCATE</command> actions on
|
|
tables connected by foreign keys need to be executed together, this
|
|
callback receives an array of relations instead of just a single one.
|
|
See the description of the <xref linkend="sql-truncate"/> statement for
|
|
details.
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-filter-origin">
|
|
<title>Origin Filter Callback</title>
|
|
|
|
<para>
|
|
The optional <function>filter_by_origin_cb</function> callback
|
|
is called to determine whether data that has been replayed
|
|
from <parameter>origin_id</parameter> is of interest to the
|
|
output plugin.
|
|
<programlisting>
|
|
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
|
|
RepOriginId origin_id);
|
|
</programlisting>
|
|
The <parameter>ctx</parameter> parameter has the same contents
|
|
as for the other callbacks. No information but the origin is
|
|
available. To signal that changes originating on the passed in
|
|
node are irrelevant, return true, causing them to be filtered
|
|
away; false otherwise. The other callbacks will not be called
|
|
for transactions and changes that have been filtered away.
|
|
</para>
|
|
<para>
|
|
This is useful when implementing cascading or multidirectional
|
|
replication solutions. Filtering by the origin makes it possible to
|
|
prevent replicating the same changes back and forth in such
|
|
setups. While transactions and changes also carry information
|
|
about the origin, filtering via this callback is noticeably
|
|
more efficient.
|
|
</para>
|
|
</sect3>
|
|
|
|
<sect3 id="logicaldecoding-output-plugin-message">
|
|
<title>Generic Message Callback</title>
|
|
|
|
<para>
|
|
The optional <function>message_cb</function> callback is called whenever
|
|
a logical decoding message has been decoded.
|
|
<programlisting>
|
|
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn,
|
|
XLogRecPtr message_lsn,
|
|
bool transactional,
|
|
const char *prefix,
|
|
Size message_size,
|
|
const char *message);
|
|
</programlisting>
|
|
The <parameter>txn</parameter> parameter contains meta information about
|
|
the transaction, like the time stamp at which it has been committed and
|
|
its XID. Note however that it can be NULL when the message is
|
|
non-transactional and the XID was not assigned yet in the transaction
|
|
which logged the message. The <parameter>message_lsn</parameter> has the WAL
|
|
location of the message. The <parameter>transactional</parameter> says
|
|
if the message was sent as transactional or not.
|
|
The <parameter>prefix</parameter> is an arbitrary null-terminated prefix
|
|
which can be used for identifying interesting messages for the current
|
|
plugin. And finally the <parameter>message</parameter> parameter holds
|
|
the actual message of <parameter>message_size</parameter> size.
|
|
</para>
|
|
<para>
|
|
Extra care should be taken to ensure that the prefix the output plugin
|
|
considers interesting is unique. Using the name of the extension or the
|
|
output plugin itself is often a good choice.
|
|
</para>
|
|
</sect3>
|
|
|
|
</sect2>
|
|
|
|
<sect2 id="logicaldecoding-output-plugin-output">
|
|
<title>Functions for Producing Output</title>
|
|
|
|
<para>
|
|
To actually produce output, output plugins can write data to
|
|
the <literal>StringInfo</literal> output buffer
|
|
in <literal>ctx->out</literal> when inside
|
|
the <function>begin_cb</function>, <function>commit_cb</function>,
|
|
or <function>change_cb</function> callbacks. Before writing to the output
|
|
buffer, <function>OutputPluginPrepareWrite(ctx, last_write)</function> has
|
|
to be called, and after finishing writing to the
|
|
buffer, <function>OutputPluginWrite(ctx, last_write)</function> has to be
|
|
called to perform the write. The <parameter>last_write</parameter>
|
|
indicates whether a particular write was the callback's last write.
|
|
</para>
|
|
|
|
<para>
|
|
The following example shows how to output data to the consumer of an
|
|
output plugin:
|
|
<programlisting>
|
|
OutputPluginPrepareWrite(ctx, true);
|
|
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
|
|
OutputPluginWrite(ctx, true);
|
|
</programlisting>
|
|
</para>
|
|
</sect2>
|
|
</sect1>
|
|
|
|
<sect1 id="logicaldecoding-writer">
|
|
<title>Logical Decoding Output Writers</title>
|
|
|
|
<para>
|
|
It is possible to add more output methods for logical decoding.
|
|
For details, see
|
|
<filename>src/backend/replication/logical/logicalfuncs.c</filename>.
|
|
Essentially, three functions need to be provided: one to read WAL, one to
|
|
prepare writing output, and one to write the output
|
|
(see <xref linkend="logicaldecoding-output-plugin-output"/>).
|
|
</para>
|
|
</sect1>
|
|
|
|
<sect1 id="logicaldecoding-synchronous">
|
|
<title>Synchronous Replication Support for Logical Decoding</title>
|
|
|
|
<para>
|
|
Logical decoding can be used to build
|
|
<link linkend="synchronous-replication">synchronous
|
|
replication</link> solutions with the same user interface as synchronous
|
|
replication for <link linkend="streaming-replication">streaming
|
|
replication</link>. To do this, the streaming replication interface
|
|
(see <xref linkend="logicaldecoding-walsender"/>) must be used to stream out
|
|
data. Clients have to send <literal>Standby status update (F)</literal>
|
|
(see <xref linkend="protocol-replication"/>) messages, just like streaming
|
|
replication clients do.
|
|
</para>
|
|
|
|
<note>
|
|
<para>
|
|
A synchronous replica receiving changes via logical decoding will work in
|
|
the scope of a single database. Since, in contrast to
|
|
that, <parameter>synchronous_standby_names</parameter> currently is
|
|
server wide, this means this technique will not work properly if more
|
|
than one database is actively used.
|
|
</para>
|
|
</note>
|
|
</sect1>
|
|
</chapter>
|