mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-25 13:17:41 +03:00 
			
		
		
		
	We were inconsistently specifying the required and optional marking for plugin callbacks. Additionally, this patch improves the description for stream_prepare callback. Author: Wang wei Reviewed-by: Amit Kapila Discussion: https://postgr.es/m/OS3PR01MB627553DAFD39ECDADD08DC909EFC9@OS3PR01MB6275.jpnprd01.prod.outlook.com
		
			
				
	
	
		
			1360 lines
		
	
	
		
			59 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
			
		
		
	
	
			1360 lines
		
	
	
		
			59 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
| <!-- doc/src/sgml/logicaldecoding.sgml -->
 | |
|  <chapter id="logicaldecoding">
 | |
|   <title>Logical Decoding</title>
 | |
|   <indexterm zone="logicaldecoding">
 | |
|    <primary>Logical Decoding</primary>
 | |
|   </indexterm>
 | |
|   <para>
 | |
|    PostgreSQL provides infrastructure to stream the modifications performed
 | |
|    via SQL to external consumers.  This functionality can be used for a
 | |
|    variety of purposes, including replication solutions and auditing.
 | |
|   </para>
 | |
| 
 | |
|   <para>
 | |
|    Changes are sent out in streams identified by logical replication slots.
 | |
|   </para>
 | |
| 
 | |
|   <para>
 | |
|    The format in which those changes are streamed is determined by the output
 | |
|    plugin used.  An example plugin is provided in the PostgreSQL distribution.
 | |
|    Additional plugins can be
 | |
|    written to extend the choice of available formats without modifying any
 | |
|    core code.
 | |
|    Every output plugin has access to each individual new row produced
 | |
|    by <command>INSERT</command> and the new row version created
 | |
|    by <command>UPDATE</command>.  Availability of old row versions for
 | |
|    <command>UPDATE</command> and <command>DELETE</command> depends on
 | |
|    the configured replica identity (see <xref linkend="sql-altertable-replica-identity"/>).
 | |
|   </para>
 | |
| 
 | |
|   <para>
 | |
|    Changes can be consumed either using the streaming replication protocol
 | |
|    (see <xref linkend="protocol-replication"/> and
 | |
|    <xref linkend="logicaldecoding-walsender"/>), or by calling functions
 | |
|    via SQL (see <xref linkend="logicaldecoding-sql"/>). It is also possible
 | |
|    to write additional methods of consuming the output of a replication slot
 | |
|    without modifying core code
 | |
|    (see <xref linkend="logicaldecoding-writer"/>).
 | |
|   </para>
 | |
| 
 | |
|   <sect1 id="logicaldecoding-example">
 | |
|    <title>Logical Decoding Examples</title>
 | |
| 
 | |
|    <para>
 | |
|     The following example demonstrates controlling logical decoding using the
 | |
|     SQL interface.
 | |
|    </para>
 | |
| 
 | |
|    <para>
 | |
|     Before you can use logical decoding, you must set
 | |
|     <xref linkend="guc-wal-level"/> to <literal>logical</literal> and
 | |
|     <xref linkend="guc-max-replication-slots"/> to at least 1.  Then, you
 | |
|     should connect to the target database (in the example
 | |
|     below, <literal>postgres</literal>) as a superuser.
 | |
|    </para>
 | |
| 
 | |
| <programlisting>
 | |
| postgres=# -- Create a slot named 'regression_slot' using the output plugin 'test_decoding'
 | |
| postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
 | |
|     slot_name    |    lsn
 | |
| -----------------+-----------
 | |
|  regression_slot | 0/16B1970
 | |
| (1 row)
 | |
| 
 | |
| postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
 | |
|     slot_name    |    plugin     | slot_type | database | active | restart_lsn | confirmed_flush_lsn
 | |
| -----------------+---------------+-----------+----------+--------+-------------+---------------------
 | |
|  regression_slot | test_decoding | logical   | postgres | f      | 0/16A4408   | 0/16A4440
 | |
| (1 row)
 | |
| 
 | |
| postgres=# -- There are no changes to see yet
 | |
| postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 | |
|  lsn | xid | data
 | |
| -----+-----+------
 | |
| (0 rows)
 | |
| 
 | |
| postgres=# CREATE TABLE data(id serial primary key, data text);
 | |
| CREATE TABLE
 | |
| 
 | |
| postgres=# -- DDL isn't replicated, so all you'll see is the transaction
 | |
| postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 | |
|     lsn    |  xid  |     data
 | |
| -----------+-------+--------------
 | |
|  0/BA2DA58 | 10297 | BEGIN 10297
 | |
|  0/BA5A5A0 | 10297 | COMMIT 10297
 | |
| (2 rows)
 | |
| 
 | |
| postgres=# -- Once changes are read, they're consumed and not emitted
 | |
| postgres=# -- in a subsequent call:
 | |
| postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 | |
|  lsn | xid | data
 | |
| -----+-----+------
 | |
| (0 rows)
 | |
| 
 | |
| postgres=# BEGIN;
 | |
| postgres=*# INSERT INTO data(data) VALUES('1');
 | |
| postgres=*# INSERT INTO data(data) VALUES('2');
 | |
| postgres=*# COMMIT;
 | |
| 
 | |
| postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 | |
|     lsn    |  xid  |                          data
 | |
| -----------+-------+---------------------------------------------------------
 | |
|  0/BA5A688 | 10298 | BEGIN 10298
 | |
|  0/BA5A6F0 | 10298 | table public.data: INSERT: id[integer]:1 data[text]:'1'
 | |
|  0/BA5A7F8 | 10298 | table public.data: INSERT: id[integer]:2 data[text]:'2'
 | |
|  0/BA5A8A8 | 10298 | COMMIT 10298
 | |
| (4 rows)
 | |
| 
 | |
| postgres=# INSERT INTO data(data) VALUES('3');
 | |
| 
 | |
| postgres=# -- You can also peek ahead in the change stream without consuming changes
 | |
| postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
 | |
|     lsn    |  xid  |                          data
 | |
| -----------+-------+---------------------------------------------------------
 | |
|  0/BA5A8E0 | 10299 | BEGIN 10299
 | |
|  0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 | |
|  0/BA5A990 | 10299 | COMMIT 10299
 | |
| (3 rows)
 | |
| 
 | |
| postgres=# -- The next call to pg_logical_slot_peek_changes() returns the same changes again
 | |
| postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
 | |
|     lsn    |  xid  |                          data
 | |
| -----------+-------+---------------------------------------------------------
 | |
|  0/BA5A8E0 | 10299 | BEGIN 10299
 | |
|  0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 | |
|  0/BA5A990 | 10299 | COMMIT 10299
 | |
| (3 rows)
 | |
| 
 | |
| postgres=# -- options can be passed to output plugin, to influence the formatting
 | |
| postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on');
 | |
|     lsn    |  xid  |                          data
 | |
| -----------+-------+---------------------------------------------------------
 | |
|  0/BA5A8E0 | 10299 | BEGIN 10299
 | |
|  0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 | |
|  0/BA5A990 | 10299 | COMMIT 10299 (at 2017-05-10 12:07:21.272494-04)
 | |
| (3 rows)
 | |
| 
 | |
| postgres=# -- Remember to destroy a slot you no longer need to stop it consuming
 | |
| postgres=# -- server resources:
 | |
| postgres=# SELECT pg_drop_replication_slot('regression_slot');
 | |
|  pg_drop_replication_slot
 | |
| --------------------------
 | |
| 
 | |
| (1 row)
 | |
| </programlisting>
 | |
| 
 | |
|    <para>
 | |
|     The following examples show how logical decoding is controlled over the
 | |
|     streaming replication protocol, using the
 | |
|     program <xref linkend="app-pgrecvlogical"/> included in the PostgreSQL
 | |
|     distribution.  This requires that client authentication is set up to allow
 | |
|     replication connections
 | |
|     (see <xref linkend="streaming-replication-authentication"/>) and
 | |
|     that <varname>max_wal_senders</varname> is set sufficiently high to allow
 | |
|     an additional connection.  The second example shows how to stream two-phase
 | |
|     transactions.  Before you use two-phase commands, you must set
 | |
|     <xref linkend="guc-max-prepared-transactions"/> to at least 1.
 | |
|    </para>
 | |
| <programlisting>
 | |
| Example 1:
 | |
| $ pg_recvlogical -d postgres --slot=test --create-slot
 | |
| $ pg_recvlogical -d postgres --slot=test --start -f -
 | |
| <keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
 | |
| $ psql -d postgres -c "INSERT INTO data(data) VALUES('4');"
 | |
| $ fg
 | |
| BEGIN 693
 | |
| table public.data: INSERT: id[integer]:4 data[text]:'4'
 | |
| COMMIT 693
 | |
| <keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
 | |
| $ pg_recvlogical -d postgres --slot=test --drop-slot
 | |
| 
 | |
| Example 2:
 | |
| $ pg_recvlogical -d postgres --slot=test --create-slot --two-phase
 | |
| $ pg_recvlogical -d postgres --slot=test --start -f -
 | |
| <keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
 | |
| $ psql -d postgres -c "BEGIN;INSERT INTO data(data) VALUES('5');PREPARE TRANSACTION 'test';"
 | |
| $ fg
 | |
| BEGIN 694
 | |
| table public.data: INSERT: id[integer]:5 data[text]:'5'
 | |
| PREPARE TRANSACTION 'test', txid 694
 | |
| <keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
 | |
| $ psql -d postgres -c "COMMIT PREPARED 'test';"
 | |
| $ fg
 | |
| COMMIT PREPARED 'test', txid 694
 | |
| <keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
 | |
| $ pg_recvlogical -d postgres --slot=test --drop-slot
 | |
| </programlisting>
 | |
| 
 | |
|   <para>
 | |
|   The following example shows the SQL interface that can be used to decode prepared
 | |
|   transactions. Before you use two-phase commit commands, you must set
 | |
|   <varname>max_prepared_transactions</varname> to at least 1. You must also have
 | |
|   set the two-phase parameter as 'true' while creating the slot using
 | |
|   <function>pg_create_logical_replication_slot</function>.
 | |
|   Note that we will stream the entire transaction after the commit if it
 | |
|   is not already decoded.
 | |
|   </para>
 | |
| <programlisting>
 | |
| postgres=# BEGIN;
 | |
| postgres=*# INSERT INTO data(data) VALUES('5');
 | |
| postgres=*# PREPARE TRANSACTION 'test_prepared1';
 | |
| 
 | |
| postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 | |
|     lsn    | xid |                          data
 | |
| -----------+-----+---------------------------------------------------------
 | |
|  0/1689DC0 | 529 | BEGIN 529
 | |
|  0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5'
 | |
|  0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529
 | |
| (3 rows)
 | |
| 
 | |
| postgres=# COMMIT PREPARED 'test_prepared1';
 | |
| postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 | |
|     lsn    | xid |                    data
 | |
| -----------+-----+--------------------------------------------
 | |
|  0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529
 | |
| (1 row)
 | |
| 
 | |
| postgres=# -- You can also roll back a prepared transaction
 | |
| postgres=# BEGIN;
 | |
| postgres=*# INSERT INTO data(data) VALUES('6');
 | |
| postgres=*# PREPARE TRANSACTION 'test_prepared2';
 | |
| postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 | |
|     lsn    | xid |                          data
 | |
| -----------+-----+---------------------------------------------------------
 | |
|  0/168A180 | 530 | BEGIN 530
 | |
|  0/168A1E8 | 530 | table public.data: INSERT: id[integer]:4 data[text]:'6'
 | |
|  0/168A430 | 530 | PREPARE TRANSACTION 'test_prepared2', txid 530
 | |
| (3 rows)
 | |
| 
 | |
| postgres=# ROLLBACK PREPARED 'test_prepared2';
 | |
| postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 | |
|     lsn    | xid |                     data
 | |
| -----------+-----+----------------------------------------------
 | |
|  0/168A4B8 | 530 | ROLLBACK PREPARED 'test_prepared2', txid 530
 | |
| (1 row)
 | |
| </programlisting>
 | |
| </sect1>
 | |
| 
 | |
|   <sect1 id="logicaldecoding-explanation">
 | |
|    <title>Logical Decoding Concepts</title>
 | |
|    <sect2 id="logicaldecoding-explanation-log-dec">
 | |
|     <title>Logical Decoding</title>
 | |
| 
 | |
|     <indexterm>
 | |
|      <primary>Logical Decoding</primary>
 | |
|     </indexterm>
 | |
| 
 | |
|     <para>
 | |
|      Logical decoding is the process of extracting all persistent changes
 | |
|      to a database's tables into a coherent, easy to understand format which
 | |
|      can be interpreted without detailed knowledge of the database's internal
 | |
|      state.
 | |
|     </para>
 | |
| 
 | |
|     <para>
 | |
|      In <productname>PostgreSQL</productname>, logical decoding is implemented
 | |
|      by decoding the contents of the <link linkend="wal">write-ahead
 | |
|      log</link>, which describe changes on a storage level, into an
 | |
|      application-specific form such as a stream of tuples or SQL statements.
 | |
|     </para>
 | |
|    </sect2>
 | |
| 
 | |
|    <sect2 id="logicaldecoding-replication-slots">
 | |
|     <title>Replication Slots</title>
 | |
| 
 | |
|     <indexterm>
 | |
|      <primary>replication slot</primary>
 | |
|      <secondary>logical replication</secondary>
 | |
|     </indexterm>
 | |
| 
 | |
|     <para>
 | |
|      In the context of logical replication, a slot represents a stream of
 | |
|      changes that can be replayed to a client in the order they were made on
 | |
|      the origin server. Each slot streams a sequence of changes from a single
 | |
|      database.
 | |
|     </para>
 | |
| 
 | |
|     <note>
 | |
|      <para><productname>PostgreSQL</productname> also has streaming replication slots
 | |
|      (see <xref linkend="streaming-replication"/>), but they are used somewhat
 | |
|      differently there.
 | |
|      </para>
 | |
|     </note>
 | |
| 
 | |
|     <para>
 | |
|      A replication slot has an identifier that is unique across all databases
 | |
|      in a <productname>PostgreSQL</productname> cluster. Slots persist
 | |
|      independently of the connection using them and are crash-safe.
 | |
|     </para>
 | |
| 
 | |
|     <para>
 | |
|      A logical slot will emit each change just once in normal operation.
 | |
|      The current position of each slot is persisted only at checkpoint, so in
 | |
|      the case of a crash the slot may return to an earlier LSN, which will
 | |
|      then cause recent changes to be sent again when the server restarts.
 | |
|      Logical decoding clients are responsible for avoiding ill effects from
 | |
|      handling the same message more than once.  Clients may wish to record
 | |
|      the last LSN they saw when decoding and skip over any repeated data or
 | |
|      (when using the replication protocol) request that decoding start from
 | |
|      that LSN rather than letting the server determine the start point.
 | |
|      The Replication Progress Tracking feature is designed for this purpose;
 | |
|      refer to <link linkend="replication-origins">replication origins</link>.
 | |
|     </para>
 | |
| 
 | |
|     <para>
 | |
|      Multiple independent slots may exist for a single database. Each slot has
 | |
|      its own state, allowing different consumers to receive changes from
 | |
|      different points in the database change stream. For most applications, a
 | |
|      separate slot will be required for each consumer.
 | |
|     </para>
 | |
| 
 | |
|     <para>
 | |
|      A logical replication slot knows nothing about the state of the
 | |
|      receiver(s).  It's even possible to have multiple different receivers using
 | |
|      the same slot at different times; they'll just get the changes following
 | |
|      on from when the last receiver stopped consuming them. Only one receiver
 | |
|      may consume changes from a slot at any given time.
 | |
|     </para>
 | |
| 
 | |
|     <caution>
 | |
|      <para>
 | |
|       Replication slots persist across crashes and know nothing about the state
 | |
|       of their consumer(s). They will prevent removal of required resources
 | |
|       even when there is no connection using them. This consumes storage
 | |
|       because neither required WAL nor required rows from the system catalogs
 | |
|       can be removed by <command>VACUUM</command> as long as they are required by a replication
 | |
|       slot.  In extreme cases this could cause the database to shut down to prevent
 | |
|       transaction ID wraparound (see <xref linkend="vacuum-for-wraparound"/>).
 | |
|       So if a slot is no longer required it should be dropped.
 | |
|      </para>
 | |
|     </caution>
 | |
|    </sect2>
 | |
| 
 | |
|    <sect2 id="logicaldecoding-explanation-output-plugins">
 | |
|     <title>Output Plugins</title>
 | |
|     <para>
 | |
|      Output plugins transform the data from the write-ahead log's internal
 | |
|      representation into the format the consumer of a replication slot desires.
 | |
|     </para>
 | |
|    </sect2>
 | |
| 
 | |
|    <sect2 id="logicaldecoding-explanation-exported-snapshots">
 | |
|     <title>Exported Snapshots</title>
 | |
|     <para>
 | |
|      When a new replication slot is created using the streaming replication
 | |
|      interface (see <xref linkend="protocol-replication-create-replication-slot"/>), a
 | |
|      snapshot is exported
 | |
|      (see <xref linkend="functions-snapshot-synchronization"/>), which will show
 | |
|      exactly the state of the database after which all changes will be
 | |
|      included in the change stream. This can be used to create a new replica by
 | |
|      using <link linkend="sql-set-transaction"><literal>SET TRANSACTION
 | |
|      SNAPSHOT</literal></link> to read the state of the database at the moment
 | |
|      the slot was created. This transaction can then be used to dump the
 | |
|      database's state at that point in time, which afterwards can be updated
 | |
|      using the slot's contents without losing any changes.
 | |
|     </para>
 | |
|     <para>
 | |
|      Creation of a snapshot is not always possible.  In particular, it will
 | |
|      fail when connected to a hot standby.  Applications that do not require
 | |
|      snapshot export may suppress it with the <literal>NOEXPORT_SNAPSHOT</literal>
 | |
|      option.
 | |
|     </para>
 | |
|    </sect2>
 | |
|   </sect1>
 | |
| 
 | |
|   <sect1 id="logicaldecoding-walsender">
 | |
|    <title>Streaming Replication Protocol Interface</title>
 | |
| 
 | |
|    <para>
 | |
|     The commands
 | |
|     <itemizedlist>
 | |
|      <listitem>
 | |
|       <para><literal>CREATE_REPLICATION_SLOT <replaceable>slot_name</replaceable> LOGICAL <replaceable>output_plugin</replaceable></literal></para>
 | |
|      </listitem>
 | |
| 
 | |
|      <listitem>
 | |
|       <para><literal>DROP_REPLICATION_SLOT <replaceable>slot_name</replaceable></literal> <optional> <literal>WAIT</literal> </optional></para>
 | |
|      </listitem>
 | |
| 
 | |
|      <listitem>
 | |
|       <para><literal>START_REPLICATION SLOT <replaceable>slot_name</replaceable> LOGICAL ...</literal></para>
 | |
|      </listitem>
 | |
|     </itemizedlist>
 | |
|     are used to create, drop, and stream changes from a replication
 | |
|     slot, respectively. These commands are only available over a replication
 | |
|     connection; they cannot be used via SQL.
 | |
|     See <xref linkend="protocol-replication"/> for details on these commands.
 | |
|    </para>
 | |
| 
 | |
|    <para>
 | |
|     The command <xref linkend="app-pgrecvlogical"/> can be used to control
 | |
|     logical decoding over a streaming replication connection.  (It uses
 | |
|     these commands internally.)
 | |
|    </para>
 | |
|   </sect1>
 | |
| 
 | |
|   <sect1 id="logicaldecoding-sql">
 | |
|    <title>Logical Decoding <acronym>SQL</acronym> Interface</title>
 | |
| 
 | |
|    <para>
 | |
|      See <xref linkend="functions-replication"/> for detailed documentation on
 | |
|      the SQL-level API for interacting with logical decoding.
 | |
|    </para>
 | |
| 
 | |
|    <para>
 | |
|     Synchronous replication (see <xref linkend="synchronous-replication"/>) is
 | |
|     only supported on replication slots used over the streaming replication interface. The
 | |
|     function interface and additional, non-core interfaces do not support
 | |
|     synchronous replication.
 | |
|    </para>
 | |
|   </sect1>
 | |
| 
 | |
|   <sect1 id="logicaldecoding-catalogs">
 | |
|    <title>System Catalogs Related to Logical Decoding</title>
 | |
| 
 | |
|    <para>
 | |
|     The <link linkend="view-pg-replication-slots"><structname>pg_replication_slots</structname></link>
 | |
|     view and the
 | |
|     <link linkend="monitoring-pg-stat-replication-view">
 | |
|     <structname>pg_stat_replication</structname></link>
 | |
|     view provide information about the current state of replication slots and
 | |
|     streaming replication connections respectively. These views apply to both physical and
 | |
|     logical replication. The
 | |
|     <link linkend="monitoring-pg-stat-replication-slots-view">
 | |
|     <structname>pg_stat_replication_slots</structname></link>
 | |
|     view provides statistics information about the logical replication slots.
 | |
|    </para>
 | |
|   </sect1>
 | |
| 
 | |
|   <sect1 id="logicaldecoding-output-plugin">
 | |
|    <title>Logical Decoding Output Plugins</title>
 | |
|    <para>
 | |
|     An example output plugin can be found in the
 | |
|     <link linkend="test-decoding">
 | |
|      <filename>contrib/test_decoding</filename>
 | |
|     </link>
 | |
|     subdirectory of the PostgreSQL source tree.
 | |
|    </para>
 | |
|    <sect2 id="logicaldecoding-output-init">
 | |
|     <title>Initialization Function</title>
 | |
|     <indexterm zone="logicaldecoding-output-init">
 | |
|      <primary>_PG_output_plugin_init</primary>
 | |
|     </indexterm>
 | |
|     <para>
 | |
|      An output plugin is loaded by dynamically loading a shared library with
 | |
|      the output plugin's name as the library base name. The normal library
 | |
|      search path is used to locate the library. To provide the required output
 | |
|      plugin callbacks and to indicate that the library is actually an output
 | |
|      plugin it needs to provide a function named
 | |
|      <function>_PG_output_plugin_init</function>. This function is passed a
 | |
|      struct that needs to be filled with the callback function pointers for
 | |
|      individual actions.
 | |
| <programlisting>
 | |
| typedef struct OutputPluginCallbacks
 | |
| {
 | |
|     LogicalDecodeStartupCB startup_cb;
 | |
|     LogicalDecodeBeginCB begin_cb;
 | |
|     LogicalDecodeChangeCB change_cb;
 | |
|     LogicalDecodeTruncateCB truncate_cb;
 | |
|     LogicalDecodeCommitCB commit_cb;
 | |
|     LogicalDecodeMessageCB message_cb;
 | |
|     LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 | |
|     LogicalDecodeShutdownCB shutdown_cb;
 | |
|     LogicalDecodeFilterPrepareCB filter_prepare_cb;
 | |
|     LogicalDecodeBeginPrepareCB begin_prepare_cb;
 | |
|     LogicalDecodePrepareCB prepare_cb;
 | |
|     LogicalDecodeCommitPreparedCB commit_prepared_cb;
 | |
|     LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
 | |
|     LogicalDecodeStreamStartCB stream_start_cb;
 | |
|     LogicalDecodeStreamStopCB stream_stop_cb;
 | |
|     LogicalDecodeStreamAbortCB stream_abort_cb;
 | |
|     LogicalDecodeStreamPrepareCB stream_prepare_cb;
 | |
|     LogicalDecodeStreamCommitCB stream_commit_cb;
 | |
|     LogicalDecodeStreamChangeCB stream_change_cb;
 | |
|     LogicalDecodeStreamMessageCB stream_message_cb;
 | |
|     LogicalDecodeStreamTruncateCB stream_truncate_cb;
 | |
| } OutputPluginCallbacks;
 | |
| 
 | |
| typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
 | |
| </programlisting>
 | |
|      The <function>begin_cb</function>, <function>change_cb</function>
 | |
|      and <function>commit_cb</function> callbacks are required,
 | |
|      while <function>startup_cb</function>, <function>truncate_cb</function>,
 | |
|      <function>message_cb</function>, <function>filter_by_origin_cb</function>,
 | |
|      and <function>shutdown_cb</function> are optional.
 | |
|      If <function>truncate_cb</function> is not set but a
 | |
|      <command>TRUNCATE</command> is to be decoded, the action will be ignored.
 | |
|     </para>
 | |
| 
 | |
|     <para>
 | |
|      An output plugin may also define functions to support streaming of large,
 | |
|      in-progress transactions. The <function>stream_start_cb</function>,
 | |
|      <function>stream_stop_cb</function>, <function>stream_abort_cb</function>,
 | |
|      <function>stream_commit_cb</function>, and <function>stream_change_cb</function>
 | |
|      are required, while <function>stream_message_cb</function> and
 | |
|      <function>stream_truncate_cb</function> are optional. The
 | |
|      <function>stream_prepare_cb</function> is also required if the output
 | |
|      plugin also supports two-phase commits.
 | |
|     </para>
 | |
| 
 | |
|     <para>
 | |
|     An output plugin may also define functions to support two-phase commits,
 | |
|     which allows actions to be decoded on the <command>PREPARE TRANSACTION</command>.
 | |
|     The <function>begin_prepare_cb</function>, <function>prepare_cb</function>,
 | |
|     <function>commit_prepared_cb</function> and <function>rollback_prepared_cb</function>
 | |
|     callbacks are required, while <function>filter_prepare_cb</function> is optional.
 | |
|     The <function>stream_prepare_cb</function> is also required if the output plugin
 | |
|     also supports the streaming of large in-progress transactions.
 | |
|     </para>
 | |
|    </sect2>
 | |
| 
 | |
|    <sect2 id="logicaldecoding-capabilities">
 | |
|     <title>Capabilities</title>
 | |
| 
 | |
|     <para>
 | |
|      To decode, format and output changes, output plugins can use most of the
 | |
|      backend's normal infrastructure, including calling output functions. Read
 | |
|      only access to relations is permitted as long as only relations are
 | |
|      accessed that either have been created by <command>initdb</command> in
 | |
|      the <literal>pg_catalog</literal> schema, or have been marked as user
 | |
|      provided catalog tables using
 | |
| <programlisting>
 | |
| ALTER TABLE user_catalog_table SET (user_catalog_table = true);
 | |
| CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
 | |
| </programlisting>
 | |
|      Note that access to user catalog tables or regular system catalog tables
 | |
|      in the output plugins has to be done via the <literal>systable_*</literal>
 | |
|      scan APIs only. Access via the <literal>heap_*</literal> scan APIs will
 | |
|      error out. Additionally, any actions leading to transaction ID assignment
 | |
|      are prohibited. That, among others, includes writing to tables, performing
 | |
|      DDL changes, and calling <literal>pg_current_xact_id()</literal>.
 | |
|     </para>
 | |
|    </sect2>
 | |
| 
 | |
|    <sect2 id="logicaldecoding-output-mode">
 | |
|     <title>Output Modes</title>
 | |
| 
 | |
|     <para>
 | |
|      Output plugin callbacks can pass data to the consumer in nearly arbitrary
 | |
|      formats. For some use cases, like viewing the changes via SQL, returning
 | |
|      data in a data type that can contain arbitrary data (e.g., <type>bytea</type>) is
 | |
|      cumbersome. If the output plugin only outputs textual data in the
 | |
|      server's encoding, it can declare that by
 | |
|      setting <literal>OutputPluginOptions.output_type</literal>
 | |
|      to <literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal> instead
 | |
|      of <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal> in
 | |
|      the <link linkend="logicaldecoding-output-plugin-startup">startup
 | |
|      callback</link>. In that case, all the data has to be in the server's encoding
 | |
|      so that a <type>text</type> datum can contain it. This is checked in assertion-enabled
 | |
|      builds.
 | |
|     </para>
 | |
|    </sect2>
 | |
| 
 | |
|    <sect2 id="logicaldecoding-output-plugin-callbacks">
 | |
|     <title>Output Plugin Callbacks</title>
 | |
| 
 | |
|     <para>
 | |
|      An output plugin gets notified about changes that are happening via
 | |
|      various callbacks it needs to provide.
 | |
|     </para>
 | |
| 
 | |
|     <para>
 | |
|      Concurrent transactions are decoded in commit order, and only changes
 | |
|      belonging to a specific transaction are decoded between
 | |
|      the <literal>begin</literal> and <literal>commit</literal>
 | |
|      callbacks. Transactions that were rolled back explicitly or implicitly
 | |
|      never get
 | |
|      decoded. Successful savepoints are
 | |
|      folded into the transaction containing them in the order they were
 | |
|      executed within that transaction. A transaction that is prepared for
 | |
|      a two-phase commit using <command>PREPARE TRANSACTION</command> will
 | |
|      also be decoded if the output plugin callbacks needed for decoding
 | |
|      them are provided. It is possible that the current prepared transaction
 | |
|      which is being decoded is aborted concurrently via a
 | |
|      <command>ROLLBACK PREPARED</command> command. In that case, the logical
 | |
|      decoding of this transaction will be aborted too. All the changes of such
 | |
|      a transaction are skipped once the abort is detected and the
 | |
|      <function>prepare_cb</function> callback is invoked. Thus even in case of
 | |
|      a concurrent abort, enough information is provided to the output plugin
 | |
|      for it to properly deal with <command>ROLLBACK PREPARED</command> once
 | |
|      that is decoded.
 | |
|     </para>
 | |
| 
 | |
|     <note>
 | |
|      <para>
 | |
|       Only transactions that have already safely been flushed to disk will be
 | |
|       decoded. That can lead to a <command>COMMIT</command> not immediately being decoded in a
 | |
|       directly following <literal>pg_logical_slot_get_changes()</literal>
 | |
|       when <varname>synchronous_commit</varname> is set
 | |
|       to <literal>off</literal>.
 | |
|      </para>
 | |
|     </note>
 | |
| 
 | |
|     <sect3 id="logicaldecoding-output-plugin-startup">
 | |
|      <title>Startup Callback</title>
 | |
|      <para>
 | |
|       The optional <function>startup_cb</function> callback is called whenever
 | |
|       a replication slot is created or asked to stream changes, independent
 | |
|       of the number of changes that are ready to be put out.
 | |
| <programlisting>
 | |
| typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
 | |
|                                         OutputPluginOptions *options,
 | |
|                                         bool is_init);
 | |
| </programlisting>
 | |
|       The <literal>is_init</literal> parameter will be true when the
 | |
|       replication slot is being created and false
 | |
|       otherwise. <parameter>options</parameter> points to a struct of options
 | |
|       that output plugins can set:
 | |
| <programlisting>
 | |
| typedef struct OutputPluginOptions
 | |
| {
 | |
|     OutputPluginOutputType output_type;
 | |
|     bool        receive_rewrites;
 | |
| } OutputPluginOptions;
 | |
| </programlisting>
 | |
|       <literal>output_type</literal> has to either be set to
 | |
|       <literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal>
 | |
|       or <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal>. See also
 | |
|       <xref linkend="logicaldecoding-output-mode"/>.
 | |
|       If <literal>receive_rewrites</literal> is true, the output plugin will
 | |
|       also be called for changes made by heap rewrites during certain DDL
 | |
|       operations.  These are of interest to plugins that handle DDL
 | |
|       replication, but they require special handling.
 | |
|      </para>
 | |
| 
 | |
|      <para>
 | |
|       The startup callback should validate the options present in
 | |
|       <literal>ctx->output_plugin_options</literal>. If the output plugin
 | |
|       needs to have a state, it can
 | |
|       use <literal>ctx->output_plugin_private</literal> to store it.
 | |
|      </para>
 | |
|     </sect3>
 | |
| 
 | |
|     <sect3 id="logicaldecoding-output-plugin-shutdown">
 | |
|      <title>Shutdown Callback</title>
 | |
| 
 | |
|      <para>
 | |
|       The optional <function>shutdown_cb</function> callback is called
 | |
|       whenever a formerly active replication slot is not used anymore and can
 | |
|       be used to deallocate resources private to the output plugin. The slot
 | |
|       isn't necessarily being dropped, streaming is just being stopped.
 | |
| <programlisting>
 | |
| typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
 | |
| </programlisting>
 | |
|      </para>
 | |
|     </sect3>
 | |
| 
 | |
|     <sect3 id="logicaldecoding-output-plugin-begin">
 | |
|      <title>Transaction Begin Callback</title>
 | |
| 
 | |
|      <para>
 | |
|       The required <function>begin_cb</function> callback is called whenever a
 | |
|       start of a committed transaction has been decoded. Aborted transactions
 | |
|       and their contents never get decoded.
 | |
| <programlisting>
 | |
| typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
 | |
|                                       ReorderBufferTXN *txn);
 | |
| </programlisting>
 | |
|       The <parameter>txn</parameter> parameter contains meta information about
 | |
|       the transaction, like the time stamp at which it has been committed and
 | |
|       its XID.
 | |
|      </para>
 | |
|     </sect3>
 | |
| 
 | |
|     <sect3 id="logicaldecoding-output-plugin-commit">
 | |
|      <title>Transaction End Callback</title>
 | |
| 
 | |
|      <para>
 | |
|       The required <function>commit_cb</function> callback is called whenever
 | |
|       a transaction commit has been
 | |
|       decoded. The <function>change_cb</function> callbacks for all modified
 | |
|       rows will have been called before this, if there have been any modified
 | |
|       rows.
 | |
| <programlisting>
 | |
| typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
 | |
|                                        ReorderBufferTXN *txn,
 | |
|                                        XLogRecPtr commit_lsn);
 | |
| </programlisting>
 | |
|      </para>
 | |
|     </sect3>
 | |
| 
 | |
|     <sect3 id="logicaldecoding-output-plugin-change">
 | |
|      <title>Change Callback</title>
 | |
| 
 | |
|      <para>
 | |
|       The required <function>change_cb</function> callback is called for every
 | |
|       individual row modification inside a transaction, may it be
 | |
|       an <command>INSERT</command>, <command>UPDATE</command>,
 | |
|       or <command>DELETE</command>. Even if the original command modified
 | |
|       several rows at once, the callback will be called individually for each
 | |
|       row. The <function>change_cb</function> callback may access system or
 | |
|       user catalog tables to aid in the process of outputting the row
 | |
|       modification details. In case of decoding a prepared (but yet
 | |
|       uncommitted) transaction or decoding of an uncommitted transaction, this
 | |
|       change callback might also error out due to simultaneous rollback of
 | |
|       this very same transaction. In that case, the logical decoding of this
 | |
|       aborted transaction is stopped gracefully.
 | |
| <programlisting>
 | |
| typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
 | |
|                                        ReorderBufferTXN *txn,
 | |
|                                        Relation relation,
 | |
|                                        ReorderBufferChange *change);
 | |
| </programlisting>
 | |
|       The <parameter>ctx</parameter> and <parameter>txn</parameter> parameters
 | |
|       have the same contents as for the <function>begin_cb</function>
 | |
|       and <function>commit_cb</function> callbacks, but additionally the
 | |
|       relation descriptor <parameter>relation</parameter> pointing to the
 | |
|       relation the row belongs to and a struct
 | |
|       <parameter>change</parameter> describing the row modification are passed
 | |
|       in.
 | |
|      </para>
 | |
| 
 | |
|      <note>
 | |
|       <para>
 | |
|        Only changes in user defined tables that are not unlogged
 | |
|        (see <xref linkend="sql-createtable-unlogged"/>) and not temporary
 | |
|        (see <xref linkend="sql-createtable-temporary"/>) can be extracted using
 | |
|        logical decoding.
 | |
|       </para>
 | |
|      </note>
 | |
|     </sect3>
 | |
| 
 | |
|     <sect3 id="logicaldecoding-output-plugin-truncate">
 | |
|      <title>Truncate Callback</title>
 | |
| 
 | |
|      <para>
 | |
|       The optional <function>truncate_cb</function> callback is called for a
 | |
|       <command>TRUNCATE</command> command.
 | |
| <programlisting>
 | |
| typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
 | |
|                                          ReorderBufferTXN *txn,
 | |
|                                          int nrelations,
 | |
|                                          Relation relations[],
 | |
|                                          ReorderBufferChange *change);
 | |
| </programlisting>
 | |
|       The parameters are analogous to the <function>change_cb</function>
 | |
|       callback.  However, because <command>TRUNCATE</command> actions on
 | |
|       tables connected by foreign keys need to be executed together, this
 | |
|       callback receives an array of relations instead of just a single one.
 | |
|       See the description of the <xref linkend="sql-truncate"/> statement for
 | |
|       details.
 | |
|      </para>
 | |
|     </sect3>
 | |
| 
 | |
|      <sect3 id="logicaldecoding-output-plugin-filter-origin">
 | |
|      <title>Origin Filter Callback</title>
 | |
| 
 | |
|      <para>
 | |
|        The optional <function>filter_by_origin_cb</function> callback
 | |
|        is called to determine whether data that has been replayed
 | |
|        from <parameter>origin_id</parameter> is of interest to the
 | |
|        output plugin.
 | |
| <programlisting>
 | |
| typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
 | |
|                                                RepOriginId origin_id);
 | |
| </programlisting>
 | |
|       The <parameter>ctx</parameter> parameter has the same contents
 | |
|       as for the other callbacks. No information but the origin is
 | |
|       available. To signal that changes originating on the passed in
 | |
|       node are irrelevant, return true, causing them to be filtered
 | |
|       away; false otherwise. The other callbacks will not be called
 | |
|       for transactions and changes that have been filtered away.
 | |
|      </para>
 | |
|      <para>
 | |
|        This is useful when implementing cascading or multidirectional
 | |
|        replication solutions. Filtering by the origin makes it possible to
 | |
|        prevent replicating the same changes back and forth in such
 | |
|        setups.  While transactions and changes also carry information
 | |
|        about the origin, filtering via this callback is noticeably
 | |
|        more efficient.
 | |
|      </para>
 | |
|      </sect3>
 | |
| 
 | |
|     <sect3 id="logicaldecoding-output-plugin-message">
 | |
|      <title>Generic Message Callback</title>
 | |
| 
 | |
|      <para>
 | |
|       The optional <function>message_cb</function> callback is called whenever
 | |
|       a logical decoding message has been decoded.
 | |
| <programlisting>
 | |
| typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
 | |
|                                         ReorderBufferTXN *txn,
 | |
|                                         XLogRecPtr message_lsn,
 | |
|                                         bool transactional,
 | |
|                                         const char *prefix,
 | |
|                                         Size message_size,
 | |
|                                         const char *message);
 | |
| </programlisting>
 | |
|       The <parameter>txn</parameter> parameter contains meta information about
 | |
|       the transaction, like the time stamp at which it has been committed and
 | |
|       its XID. Note however that it can be NULL when the message is
 | |
|       non-transactional and the XID was not assigned yet in the transaction
 | |
|       which logged the message. The <parameter>message_lsn</parameter> has the WAL
 | |
|       location of the message. The <parameter>transactional</parameter> says
 | |
|       if the message was sent as transactional or not. Similar to the change
 | |
|       callback, in case of decoding a prepared (but yet uncommitted)
 | |
|       transaction or decoding of an uncommitted transaction, this message
 | |
|       callback might also error out due to simultaneous rollback of
 | |
|       this very same transaction. In that case, the logical decoding of this
 | |
|       aborted transaction is stopped gracefully.
 | |
| 
 | |
|       The <parameter>prefix</parameter> is an arbitrary null-terminated prefix
 | |
|       which can be used for identifying interesting messages for the current
 | |
|       plugin. And finally the <parameter>message</parameter> parameter holds
 | |
|       the actual message of <parameter>message_size</parameter> size.
 | |
|      </para>
 | |
|      <para>
 | |
|       Extra care should be taken to ensure that the prefix the output plugin
 | |
|       considers interesting is unique. Using the name of the extension or the
 | |
|       output plugin itself is often a good choice.
 | |
|      </para>
 | |
|     </sect3>
 | |
| 
 | |
|     <sect3 id="logicaldecoding-output-plugin-filter-prepare">
 | |
|      <title>Prepare Filter Callback</title>
 | |
| 
 | |
|      <para>
 | |
|        The optional <function>filter_prepare_cb</function> callback
 | |
|        is called to determine whether data that is part of the current
 | |
|        two-phase commit transaction should be considered for decoding
 | |
|        at this prepare stage or later as a regular one-phase transaction at
 | |
|        <command>COMMIT PREPARED</command> time. To signal that
 | |
|        decoding should be skipped, return <literal>true</literal>;
 | |
|        <literal>false</literal> otherwise. When the callback is not
 | |
|        defined, <literal>false</literal> is assumed (i.e. no filtering, all
 | |
|        transactions using two-phase commit are decoded in two phases as well).
 | |
| <programlisting>
 | |
| typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
 | |
|                                               TransactionId xid,
 | |
|                                               const char *gid);
 | |
| </programlisting>
 | |
|        The <parameter>ctx</parameter> parameter has the same contents as for
 | |
|        the other callbacks. The parameters <parameter>xid</parameter>
 | |
|        and <parameter>gid</parameter> provide two different ways to identify
 | |
|        the transaction.  The later <command>COMMIT PREPARED</command> or
 | |
|        <command>ROLLBACK PREPARED</command> carries both identifiers,
 | |
|        providing an output plugin the choice of what to use.
 | |
|      </para>
 | |
|      <para>
 | |
|        The callback may be invoked multiple times per transaction to decode
 | |
|        and must provide the same static answer for a given pair of
 | |
|        <parameter>xid</parameter> and <parameter>gid</parameter> every time
 | |
|        it is called.
 | |
|      </para>
 | |
|      </sect3>
 | |
| 
 | |
|     <sect3 id="logicaldecoding-output-plugin-begin-prepare">
 | |
|      <title>Transaction Begin Prepare Callback</title>
 | |
| 
 | |
|      <para>
 | |
|       The required <function>begin_prepare_cb</function> callback is called
 | |
|       whenever the start of a prepared transaction has been decoded. The
 | |
|       <parameter>gid</parameter> field, which is part of the
 | |
|       <parameter>txn</parameter> parameter, can be used in this callback to
 | |
|       check if the plugin has already received this <command>PREPARE</command>
 | |
|       in which case it can either error out or skip the remaining changes of
 | |
|       the transaction.
 | |
| <programlisting>
 | |
| typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
 | |
|                                              ReorderBufferTXN *txn);
 | |
| </programlisting>
 | |
|      </para>
 | |
|     </sect3>
 | |
| 
 | |
|     <sect3 id="logicaldecoding-output-plugin-prepare">
 | |
|      <title>Transaction Prepare Callback</title>
 | |
| 
 | |
|      <para>
 | |
|       The required <function>prepare_cb</function> callback is called whenever
 | |
|       a transaction which is prepared for two-phase commit has been
 | |
|       decoded. The <function>change_cb</function> callback for all modified
 | |
|       rows will have been called before this, if there have been any modified
 | |
|       rows. The <parameter>gid</parameter> field, which is part of the
 | |
|       <parameter>txn</parameter> parameter, can be used in this callback.
 | |
| <programlisting>
 | |
| typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
 | |
|                                         ReorderBufferTXN *txn,
 | |
|                                         XLogRecPtr prepare_lsn);
 | |
| </programlisting>
 | |
|      </para>
 | |
|     </sect3>
 | |
| 
 | |
|     <sect3 id="logicaldecoding-output-plugin-commit-prepared">
 | |
|      <title>Transaction Commit Prepared Callback</title>
 | |
| 
 | |
|      <para>
 | |
|       The required <function>commit_prepared_cb</function> callback is called
 | |
|       whenever a transaction <command>COMMIT PREPARED</command> has been decoded.
 | |
|       The <parameter>gid</parameter> field, which is part of the
 | |
|       <parameter>txn</parameter> parameter, can be used in this callback.
 | |
| <programlisting>
 | |
| typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
 | |
|                                                ReorderBufferTXN *txn,
 | |
|                                                XLogRecPtr commit_lsn);
 | |
| </programlisting>
 | |
|      </para>
 | |
|     </sect3>
 | |
| 
 | |
|     <sect3 id="logicaldecoding-output-plugin-rollback-prepared">
 | |
|      <title>Transaction Rollback Prepared Callback</title>
 | |
| 
 | |
|      <para>
 | |
|       The required <function>rollback_prepared_cb</function> callback is called
 | |
|       whenever a transaction <command>ROLLBACK PREPARED</command> has been
 | |
|       decoded. The <parameter>gid</parameter> field, which is part of the
 | |
|       <parameter>txn</parameter> parameter, can be used in this callback. The
 | |
|       parameters <parameter>prepare_end_lsn</parameter> and
 | |
|       <parameter>prepare_time</parameter> can be used to check if the plugin
 | |
|       has received this <command>PREPARE TRANSACTION</command> in which case
 | |
|       it can apply the rollback, otherwise, it can skip the rollback operation. The
 | |
|       <parameter>gid</parameter> alone is not sufficient because the downstream
 | |
|       node can have a prepared transaction with the same identifier.
 | |
| <programlisting>
 | |
| typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
 | |
|                                                  ReorderBufferTXN *txn,
 | |
|                                                  XLogRecPtr prepare_end_lsn,
 | |
|                                                  TimestampTz prepare_time);
 | |
| </programlisting>
 | |
|      </para>
 | |
|     </sect3>
 | |
| 
 | |
|     <sect3 id="logicaldecoding-output-plugin-stream-start">
 | |
|      <title>Stream Start Callback</title>
 | |
|      <para>
 | |
|       The required <function>stream_start_cb</function> callback is called when
 | |
|       opening a block of streamed changes from an in-progress transaction.
 | |
| <programlisting>
 | |
| typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
 | |
|                                             ReorderBufferTXN *txn);
 | |
| </programlisting>
 | |
|      </para>
 | |
|     </sect3>
 | |
| 
 | |
|     <sect3 id="logicaldecoding-output-plugin-stream-stop">
 | |
|      <title>Stream Stop Callback</title>
 | |
|      <para>
 | |
|       The required <function>stream_stop_cb</function> callback is called when
 | |
|       closing a block of streamed changes from an in-progress transaction.
 | |
| <programlisting>
 | |
| typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
 | |
|                                            ReorderBufferTXN *txn);
 | |
| </programlisting>
 | |
|      </para>
 | |
|     </sect3>
 | |
| 
 | |
|     <sect3 id="logicaldecoding-output-plugin-stream-abort">
 | |
|      <title>Stream Abort Callback</title>
 | |
|      <para>
 | |
|       The required <function>stream_abort_cb</function> callback is called to
 | |
|       abort a previously streamed transaction.
 | |
| <programlisting>
 | |
| typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
 | |
|                                             ReorderBufferTXN *txn,
 | |
|                                             XLogRecPtr abort_lsn);
 | |
| </programlisting>
 | |
|      </para>
 | |
|     </sect3>
 | |
| 
 | |
|     <sect3 id="logicaldecoding-output-plugin-stream-prepare">
 | |
|      <title>Stream Prepare Callback</title>
 | |
|      <para>
 | |
|       The <function>stream_prepare_cb</function> callback is called to prepare
 | |
|       a previously streamed transaction as part of a two-phase commit. This
 | |
|       callback is required when the output plugin supports both the streaming
 | |
|       of large in-progress transactions and two-phase commits.
 | |
| <programlisting>
 | |
| typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
 | |
|                                               ReorderBufferTXN *txn,
 | |
|                                               XLogRecPtr prepare_lsn);
 | |
| </programlisting>
 | |
|      </para>
 | |
|     </sect3>
 | |
| 
 | |
|     <sect3 id="logicaldecoding-output-plugin-stream-commit">
 | |
|      <title>Stream Commit Callback</title>
 | |
|      <para>
 | |
|       The required <function>stream_commit_cb</function> callback is called to
 | |
|       commit a previously streamed transaction.
 | |
| <programlisting>
 | |
| typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
 | |
|                                              ReorderBufferTXN *txn,
 | |
|                                              XLogRecPtr commit_lsn);
 | |
| </programlisting>
 | |
|      </para>
 | |
|     </sect3>
 | |
| 
 | |
|     <sect3 id="logicaldecoding-output-plugin-stream-change">
 | |
|      <title>Stream Change Callback</title>
 | |
|      <para>
 | |
|       The required <function>stream_change_cb</function> callback is called
 | |
|       when sending a change in a block of streamed changes (demarcated by
 | |
|       <function>stream_start_cb</function> and <function>stream_stop_cb</function> calls).
 | |
|       The actual changes are not displayed as the transaction can abort at a later
 | |
|       point in time and we don't decode changes for aborted transactions.
 | |
| <programlisting>
 | |
| typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
 | |
|                                              ReorderBufferTXN *txn,
 | |
|                                              Relation relation,
 | |
|                                              ReorderBufferChange *change);
 | |
| </programlisting>
 | |
|      </para>
 | |
|     </sect3>
 | |
| 
 | |
|     <sect3 id="logicaldecoding-output-plugin-stream-message">
 | |
|      <title>Stream Message Callback</title>
 | |
|      <para>
 | |
|       The optional <function>stream_message_cb</function> callback is called when
 | |
|       sending a generic message in a block of streamed changes (demarcated by
 | |
|       <function>stream_start_cb</function> and <function>stream_stop_cb</function> calls).
 | |
|       The message contents for transactional messages are not displayed as the transaction
 | |
|       can abort at a later point in time and we don't decode changes for aborted
 | |
|       transactions.
 | |
| <programlisting>
 | |
| typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
 | |
|                                               ReorderBufferTXN *txn,
 | |
|                                               XLogRecPtr message_lsn,
 | |
|                                               bool transactional,
 | |
|                                               const char *prefix,
 | |
|                                               Size message_size,
 | |
|                                               const char *message);
 | |
| </programlisting>
 | |
|      </para>
 | |
|     </sect3>
 | |
| 
 | |
|     <sect3 id="logicaldecoding-output-plugin-stream-truncate">
 | |
|      <title>Stream Truncate Callback</title>
 | |
|      <para>
 | |
|       The optional <function>stream_truncate_cb</function> callback is called
 | |
|       for a <command>TRUNCATE</command> command in a block of streamed changes
 | |
|       (demarcated by <function>stream_start_cb</function> and
 | |
|       <function>stream_stop_cb</function> calls).
 | |
| <programlisting>
 | |
| typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
 | |
|                                                ReorderBufferTXN *txn,
 | |
|                                                int nrelations,
 | |
|                                                Relation relations[],
 | |
|                                                ReorderBufferChange *change);
 | |
| </programlisting>
 | |
|       The parameters are analogous to the <function>stream_change_cb</function>
 | |
|       callback.  However, because <command>TRUNCATE</command> actions on
 | |
|       tables connected by foreign keys need to be executed together, this
 | |
|       callback receives an array of relations instead of just a single one.
 | |
|       See the description of the <xref linkend="sql-truncate"/> statement for
 | |
|       details.
 | |
|      </para>
 | |
|     </sect3>
 | |
| 
 | |
|    </sect2>
 | |
| 
 | |
|    <sect2 id="logicaldecoding-output-plugin-output">
 | |
|     <title>Functions for Producing Output</title>
 | |
| 
 | |
|     <para>
 | |
|      To actually produce output, output plugins can write data to
 | |
|      the <literal>StringInfo</literal> output buffer
 | |
|      in <literal>ctx->out</literal> when inside
 | |
|      the <function>begin_cb</function>, <function>commit_cb</function>,
 | |
|      or <function>change_cb</function> callbacks. Before writing to the output
 | |
|      buffer, <function>OutputPluginPrepareWrite(ctx, last_write)</function> has
 | |
|      to be called, and after finishing writing to the
 | |
|      buffer, <function>OutputPluginWrite(ctx, last_write)</function> has to be
 | |
|      called to perform the write. The <parameter>last_write</parameter>
 | |
|      indicates whether a particular write was the callback's last write.
 | |
|     </para>
 | |
| 
 | |
|     <para>
 | |
|      The following example shows how to output data to the consumer of an
 | |
|      output plugin:
 | |
| <programlisting>
 | |
| OutputPluginPrepareWrite(ctx, true);
 | |
| appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
 | |
| OutputPluginWrite(ctx, true);
 | |
| </programlisting>
 | |
|     </para>
 | |
|    </sect2>
 | |
|   </sect1>
 | |
| 
 | |
|   <sect1 id="logicaldecoding-writer">
 | |
|    <title>Logical Decoding Output Writers</title>
 | |
| 
 | |
|    <para>
 | |
|     It is possible to add more output methods for logical decoding.
 | |
|     For details, see
 | |
|     <filename>src/backend/replication/logical/logicalfuncs.c</filename>.
 | |
|     Essentially, three functions need to be provided: one to read WAL, one to
 | |
|     prepare writing output, and one to write the output
 | |
|     (see <xref linkend="logicaldecoding-output-plugin-output"/>).
 | |
|    </para>
 | |
|   </sect1>
 | |
| 
 | |
|   <sect1 id="logicaldecoding-synchronous">
 | |
|    <title>Synchronous Replication Support for Logical Decoding</title>
 | |
|    <sect2 id="logicaldecoding-synchronous-overview">
 | |
|     <title>Overview</title>
 | |
| 
 | |
|     <para>
 | |
|      Logical decoding can be used to build
 | |
|      <link linkend="synchronous-replication">synchronous
 | |
|      replication</link> solutions with the same user interface as synchronous
 | |
|      replication for <link linkend="streaming-replication">streaming
 | |
|      replication</link>.  To do this, the streaming replication interface
 | |
|      (see <xref linkend="logicaldecoding-walsender"/>) must be used to stream out
 | |
|      data. Clients have to send <literal>Standby status update (F)</literal>
 | |
|      (see <xref linkend="protocol-replication"/>) messages, just like streaming
 | |
|      replication clients do.
 | |
|     </para>
 | |
| 
 | |
|     <note>
 | |
|      <para>
 | |
|       A synchronous replica receiving changes via logical decoding will work in
 | |
|       the scope of a single database. Since, in contrast to
 | |
|       that, <parameter>synchronous_standby_names</parameter> currently is
 | |
|       server wide, this means this technique will not work properly if more
 | |
|       than one database is actively used.
 | |
|      </para>
 | |
|     </note>
 | |
|    </sect2>
 | |
| 
 | |
|    <sect2 id="logicaldecoding-synchronous-caveats">
 | |
|     <title>Caveats</title>
 | |
| 
 | |
|     <para>
 | |
|      In a synchronous replication setup, a deadlock can happen if the transaction
 | |
|      has locked [user] catalog tables exclusively. See
 | |
|      <xref linkend="logicaldecoding-capabilities"/> for information on user
 | |
|      catalog tables. This is because logical decoding of transactions can lock
 | |
|      catalog tables to access them. To avoid this, users must refrain from taking
 | |
|      an exclusive lock on [user] catalog tables. This can happen in the following
 | |
|      ways:
 | |
| 
 | |
|      <itemizedlist>
 | |
|       <listitem>
 | |
|        <para>
 | |
|         Issuing an explicit <command>LOCK</command> on <structname>pg_class</structname>
 | |
|         in a transaction.
 | |
|        </para>
 | |
|       </listitem>
 | |
| 
 | |
|       <listitem>
 | |
|        <para>
 | |
|         Performing <command>CLUSTER</command> on <structname>pg_class</structname> in
 | |
|         a transaction.
 | |
|        </para>
 | |
|       </listitem>
 | |
| 
 | |
|       <listitem>
 | |
|        <para>
 | |
|         <command>PREPARE TRANSACTION</command> after <command>LOCK</command> command
 | |
|         on <structname>pg_class</structname> and allow logical decoding of two-phase
 | |
|         transactions.
 | |
|        </para>
 | |
|       </listitem>
 | |
| 
 | |
|       <listitem>
 | |
|        <para>
 | |
|         <command>PREPARE TRANSACTION</command> after <command>CLUSTER</command>
 | |
|         command on <structname>pg_trigger</structname> and allow logical decoding of
 | |
|         two-phase transactions. This will lead to a deadlock only when the published
 | |
|         table has a trigger.
 | |
|        </para>
 | |
|       </listitem>
 | |
| 
 | |
|       <listitem>
 | |
|        <para>
 | |
|         Executing <command>TRUNCATE</command> on a [user] catalog table in a
 | |
|         transaction.
 | |
|        </para>
 | |
|       </listitem>
 | |
|      </itemizedlist>
 | |
| 
 | |
|      Note that these commands can cause a deadlock not only on the system catalog
 | |
|      tables explicitly indicated above but also on any other [user] catalog
 | |
|      table.
 | |
|     </para>
 | |
|    </sect2>
 | |
|   </sect1>
 | |
| 
 | |
|   <sect1 id="logicaldecoding-streaming">
 | |
|    <title>Streaming of Large Transactions for Logical Decoding</title>
 | |
| 
 | |
|    <para>
 | |
|     The basic output plugin callbacks (e.g., <function>begin_cb</function>,
 | |
|     <function>change_cb</function>, <function>commit_cb</function> and
 | |
|     <function>message_cb</function>) are only invoked when the transaction
 | |
|     actually commits. The changes are still decoded from the transaction
 | |
|     log, but are only passed to the output plugin at commit (and discarded
 | |
|     if the transaction aborts).
 | |
|    </para>
 | |
| 
 | |
|    <para>
 | |
|     This means that while the decoding happens incrementally, and may spill
 | |
|     to disk to keep memory usage under control, all the decoded changes have
 | |
|     to be transmitted when the transaction finally commits (or more precisely,
 | |
|     when the commit is decoded from the transaction log). Depending on the
 | |
|     size of the transaction and network bandwidth, the transfer time may
 | |
|     significantly increase the apply lag.
 | |
|    </para>
 | |
| 
 | |
|    <para>
 | |
|     To reduce the apply lag caused by large transactions, an output plugin
 | |
|     may provide additional callbacks to support incremental streaming of
 | |
|     in-progress transactions. There are multiple required streaming callbacks
 | |
|     (<function>stream_start_cb</function>, <function>stream_stop_cb</function>,
 | |
|     <function>stream_abort_cb</function>, <function>stream_commit_cb</function>
 | |
|     and <function>stream_change_cb</function>) and two optional callbacks
 | |
|     (<function>stream_message_cb</function> and <function>stream_truncate_cb</function>).
 | |
|     Also, if streaming of two-phase commands is to be supported, then additional
 | |
|     callbacks must be provided. (See <xref linkend="logicaldecoding-two-phase-commits"/>
 | |
|     for details).
 | |
|    </para>
 | |
| 
 | |
|    <para>
 | |
|     When streaming an in-progress transaction, the changes (and messages) are
 | |
|     streamed in blocks demarcated by <function>stream_start_cb</function>
 | |
|     and <function>stream_stop_cb</function> callbacks. Once all the decoded
 | |
|     changes are transmitted, the transaction can be committed using the
 | |
|     <function>stream_commit_cb</function> callback
 | |
|     (or possibly aborted using the <function>stream_abort_cb</function> callback).
 | |
|     If two-phase commits are supported, the transaction can be prepared using the
 | |
|     <function>stream_prepare_cb</function> callback,
 | |
|     <command>COMMIT PREPARED</command> using the
 | |
|     <function>commit_prepared_cb</function> callback or aborted using the
 | |
|     <function>rollback_prepared_cb</function> callback.
 | |
|    </para>
 | |
| 
 | |
|    <para>
 | |
|     One example sequence of streaming callback calls for one transaction may
 | |
|     look like this:
 | |
| <programlisting>
 | |
| stream_start_cb(...);   <-- start of first block of changes
 | |
|   stream_change_cb(...);
 | |
|   stream_change_cb(...);
 | |
|   stream_message_cb(...);
 | |
|   stream_change_cb(...);
 | |
|   ...
 | |
|   stream_change_cb(...);
 | |
| stream_stop_cb(...);    <-- end of first block of changes
 | |
| 
 | |
| stream_start_cb(...);   <-- start of second block of changes
 | |
|   stream_change_cb(...);
 | |
|   stream_change_cb(...);
 | |
|   stream_change_cb(...);
 | |
|   ...
 | |
|   stream_message_cb(...);
 | |
|   stream_change_cb(...);
 | |
| stream_stop_cb(...);    <-- end of second block of changes
 | |
| 
 | |
| 
 | |
| [a. when using normal commit]
 | |
| stream_commit_cb(...);    <-- commit of the streamed transaction
 | |
| 
 | |
| [b. when using two-phase commit]
 | |
| stream_prepare_cb(...);   <-- prepare the streamed transaction
 | |
| commit_prepared_cb(...);  <-- commit of the prepared transaction
 | |
| </programlisting>
 | |
|    </para>
 | |
| 
 | |
|    <para>
 | |
|     The actual sequence of callback calls may be more complicated, of course.
 | |
|     There may be blocks for multiple streamed transactions, some of the
 | |
|     transactions may get aborted, etc.
 | |
|    </para>
 | |
| 
 | |
|    <para>
 | |
|     Similar to spill-to-disk behavior, streaming is triggered when the total
 | |
|     amount of changes decoded from the WAL (for all in-progress transactions)
 | |
|     exceeds the limit defined by the <varname>logical_decoding_work_mem</varname> setting.
 | |
|     At that point, the largest top-level transaction (measured by the amount of memory
 | |
|     currently used for decoded changes) is selected and streamed.  However, in
 | |
|     some cases we still have to spill to disk even if streaming is enabled
 | |
|     because we exceed the memory threshold but still have not decoded the
 | |
|     complete tuple, e.g., only decoded the toast table insert but not the main table
 | |
|     insert.
 | |
|    </para>
 | |
| 
 | |
|    <para>
 | |
|     Even when streaming large transactions, the changes are still applied in
 | |
|     commit order, preserving the same guarantees as the non-streaming mode.
 | |
|    </para>
 | |
| 
 | |
|   </sect1>
 | |
| 
 | |
|   <sect1 id="logicaldecoding-two-phase-commits">
 | |
|    <title>Two-phase Commit Support for Logical Decoding</title>
 | |
| 
 | |
|    <para>
 | |
|     With the basic output plugin callbacks (e.g., <function>begin_cb</function>,
 | |
|     <function>change_cb</function>, <function>commit_cb</function> and
 | |
|     <function>message_cb</function>) two-phase commit commands like
 | |
|     <command>PREPARE TRANSACTION</command>, <command>COMMIT PREPARED</command>
 | |
|     and <command>ROLLBACK PREPARED</command> are not decoded. While the
 | |
|     <command>PREPARE TRANSACTION</command> is ignored,
 | |
|     <command>COMMIT PREPARED</command> is decoded as a <command>COMMIT</command>
 | |
|     and <command>ROLLBACK PREPARED</command> is decoded as a
 | |
|     <command>ROLLBACK</command>.
 | |
|    </para>
 | |
| 
 | |
|    <para>
 | |
|     To support the streaming of two-phase commands, an output plugin needs to
 | |
|     provide additional callbacks. There are multiple two-phase commit callbacks
 | |
|     that are required (<function>begin_prepare_cb</function>,
 | |
|     <function>prepare_cb</function>, <function>commit_prepared_cb</function>,
 | |
|     <function>rollback_prepared_cb</function> and
 | |
|     <function>stream_prepare_cb</function>) and an optional callback
 | |
|     (<function>filter_prepare_cb</function>).
 | |
|    </para>
 | |
| 
 | |
|    <para>
 | |
|     If the output plugin callbacks for decoding two-phase commit commands are
 | |
|     provided, then on <command>PREPARE TRANSACTION</command>, the changes of
 | |
|     that transaction are decoded, passed to the output plugin, and the
 | |
|     <function>prepare_cb</function> callback is invoked. This differs from the
 | |
|     basic decoding setup where changes are only passed to the output plugin
 | |
|     when a transaction is committed. The start of a prepared transaction is
 | |
|     indicated by the <function>begin_prepare_cb</function> callback.
 | |
|    </para>
 | |
| 
 | |
|    <para>
 | |
|     When a prepared transaction is rolled back using the
 | |
|     <command>ROLLBACK PREPARED</command>, then the
 | |
|     <function>rollback_prepared_cb</function> callback is invoked and when the
 | |
|     prepared transaction is committed using <command>COMMIT PREPARED</command>,
 | |
|     then the <function>commit_prepared_cb</function> callback is invoked.
 | |
|    </para>
 | |
| 
 | |
|    <para>
 | |
|     Optionally, the output plugin can define filtering rules via
 | |
|     <function>filter_prepare_cb</function> to decode only specific transactions
 | |
|     in two phases.  This can be achieved by pattern matching on the
 | |
|     <parameter>gid</parameter> or via lookups using the
 | |
|     <parameter>xid</parameter>.
 | |
|    </para>
 | |
| 
 | |
|    <para>
 | |
|     Users who want to decode prepared transactions need to be careful about
 | |
|     the following points:
 | |
| 
 | |
|     <itemizedlist>
 | |
|      <listitem>
 | |
|       <para>
 | |
|        If the prepared transaction has locked [user] catalog tables exclusively
 | |
|        then decoding the prepare can block until the main transaction is committed.
 | |
|       </para>
 | |
|      </listitem>
 | |
| 
 | |
|      <listitem>
 | |
|       <para>
 | |
|        A logical replication solution that builds distributed two-phase commit
 | |
|        using this feature can deadlock if the prepared transaction has locked
 | |
|        [user] catalog tables exclusively. To avoid this, users must refrain from
 | |
|        having locks on catalog tables (e.g. explicit <command>LOCK</command> command)
 | |
|        in such transactions.
 | |
|        See <xref linkend="logicaldecoding-synchronous-caveats"/> for the details.
 | |
|       </para>
 | |
|      </listitem>
 | |
|     </itemizedlist>
 | |
|    </para>
 | |
| 
 | |
|   </sect1>
 | |
|  </chapter>
 |