From ac4645c0157fc5fcef0af8ff571512aa284a2cec Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Tue, 6 Apr 2021 08:40:47 +0530 Subject: [PATCH] Allow pgoutput to send logical decoding messages. The output plugin accepts a new parameter (messages) that controls if logical decoding messages are written into the replication stream. It is useful for those clients that use pgoutput as an output plugin and needs to process messages that were written by pg_logical_emit_message(). Although logical streaming replication protocol supports logical decoding messages now, logical replication does not use this feature yet. Author: David Pirotte, Euler Taveira Reviewed-by: Euler Taveira, Andres Freund, Ashutosh Bapat, Amit Kapila Discussion: https://postgr.es/m/CADK3HHJ-+9SO7KuRLH=9Wa1rAo60Yreq1GFNkH_kd0=CdaWM+A@mail.gmail.com --- doc/src/sgml/protocol.sgml | 76 ++++++++++ src/backend/replication/logical/proto.c | 28 ++++ src/backend/replication/logical/worker.c | 9 ++ src/backend/replication/pgoutput/pgoutput.c | 47 +++++++ src/include/replication/logicalproto.h | 3 + src/include/replication/pgoutput.h | 1 + src/test/subscription/t/020_messages.pl | 148 ++++++++++++++++++++ 7 files changed, 312 insertions(+) create mode 100644 src/test/subscription/t/020_messages.pl diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 43092fe62a6..380be5fb17c 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -6433,6 +6433,82 @@ Begin + + +Message + + + + + + + + Byte1('M') + + + + Identifies the message as a logical decoding message. + + + + + + Int32 + + + + Xid of the transaction. The XID is sent only when user has + requested streaming of in-progress transactions. + + + + + + Int8 + + + + Flags; Either 0 for no flags or 1 if the logical decoding + message is transactional. + + + + + + Int64 + + + + The LSN of the logical decoding message. + + + + + + String + + + + The prefix of the logical decoding message. + + + + + + Byten + + + + The content of the logical decoding message. + + + + + + + + + Commit diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index f2c85cabb52..2a1f9830e05 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -25,6 +25,7 @@ */ #define LOGICALREP_IS_REPLICA_IDENTITY 1 +#define MESSAGE_TRANSACTIONAL (1<<0) #define TRUNCATE_CASCADE (1<<0) #define TRUNCATE_RESTART_SEQS (1<<1) @@ -361,6 +362,33 @@ logicalrep_read_truncate(StringInfo in, return relids; } +/* + * Write MESSAGE to stream + */ +void +logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, + bool transactional, const char *prefix, Size sz, + const char *message) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE); + + /* encode and send message flags */ + if (transactional) + flags |= MESSAGE_TRANSACTIONAL; + + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + + pq_sendint8(out, flags); + pq_sendint64(out, lsn); + pq_sendstring(out, prefix); + pq_sendint32(out, sz); + pq_sendbytes(out, message, sz); +} + /* * Write relation description to the output stream. */ diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 354fbe4b4bc..74d538b5e37 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1939,6 +1939,15 @@ apply_dispatch(StringInfo s) apply_handle_origin(s); return; + case LOGICAL_REP_MSG_MESSAGE: + + /* + * Logical replication does not use generic logical messages yet. + * Although, it could be used by other applications that use this + * output plugin. + */ + return; + case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); return; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 6146c5acdb3..f68348dcf45 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -45,6 +45,10 @@ static void pgoutput_change(LogicalDecodingContext *ctx, static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); +static void pgoutput_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + bool transactional, const char *prefix, + Size sz, const char *message); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, @@ -142,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->begin_cb = pgoutput_begin_txn; cb->change_cb = pgoutput_change; cb->truncate_cb = pgoutput_truncate; + cb->message_cb = pgoutput_message; cb->commit_cb = pgoutput_commit_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; cb->shutdown_cb = pgoutput_shutdown; @@ -152,6 +157,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_abort_cb = pgoutput_stream_abort; cb->stream_commit_cb = pgoutput_stream_commit; cb->stream_change_cb = pgoutput_change; + cb->stream_message_cb = pgoutput_message; cb->stream_truncate_cb = pgoutput_truncate; } @@ -162,10 +168,12 @@ parse_output_parameters(List *options, PGOutputData *data) bool protocol_version_given = false; bool publication_names_given = false; bool binary_option_given = false; + bool messages_option_given = false; bool streaming_given = false; data->binary = false; data->streaming = false; + data->messages = false; foreach(lc, options) { @@ -221,6 +229,16 @@ parse_output_parameters(List *options, PGOutputData *data) data->binary = defGetBoolean(defel); } + else if (strcmp(defel->defname, "messages") == 0) + { + if (messages_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + messages_option_given = true; + + data->messages = defGetBoolean(defel); + } else if (strcmp(defel->defname, "streaming") == 0) { if (streaming_given) @@ -689,6 +707,35 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, MemoryContextReset(data->context); } +static void +pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, + const char *message) +{ + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + TransactionId xid = InvalidTransactionId; + + if (!data->messages) + return; + + /* + * Remember the xid for the message in streaming mode. See + * pgoutput_change. + */ + if (in_streaming) + xid = txn->xid; + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_message(ctx->out, + xid, + message_lsn, + transactional, + prefix, + sz, + message); + OutputPluginWrite(ctx, true); +} + /* * Currently we always forward. */ diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index fa4c37277b1..55b90c03eac 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -54,6 +54,7 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_TRUNCATE = 'T', LOGICAL_REP_MSG_RELATION = 'R', LOGICAL_REP_MSG_TYPE = 'Y', + LOGICAL_REP_MSG_MESSAGE = 'M', LOGICAL_REP_MSG_STREAM_START = 'S', LOGICAL_REP_MSG_STREAM_END = 'E', LOGICAL_REP_MSG_STREAM_COMMIT = 'c', @@ -151,6 +152,8 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid, bool cascade, bool restart_seqs); extern List *logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs); +extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, + bool transactional, const char *prefix, Size sz, const char *message); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index bb383d523ee..51e7c0348da 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -26,6 +26,7 @@ typedef struct PGOutputData List *publications; bool binary; bool streaming; + bool messages; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl new file mode 100644 index 00000000000..d9123ed3ef1 --- /dev/null +++ b/src/test/subscription/t/020_messages.pl @@ -0,0 +1,148 @@ +# Tests that logical decoding messages +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 5; + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_test (a int primary key)"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_test (a int primary key)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_test"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" +); + +# Ensure a transactional logical decoding message shows up on the slot +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); + +$node_publisher->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message')" +); + +my $result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 0) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub', + 'messages', 'true') +)); + +# 66 77 67 == B M C == BEGIN MESSAGE COMMIT +is($result, qq(66 +77 +67), + 'messages on slot are B M C with message option'); + +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 1), encode(substr(data, 11, 8), 'escape') + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub', + 'messages', 'true') + OFFSET 1 LIMIT 1 +)); + +is($result, qq(1|pgoutput), + "flag transactional is set to 1 and prefix is pgoutput"); + +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 0) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub') +)); + +# 66 67 == B C == BEGIN COMMIT +is($result, qq(66 +67), + 'option messages defaults to false so message (M) is not available on slot'); + +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE"); +$node_publisher->wait_for_catchup('tap_sub'); + +# ensure a non-transactional logical decoding message shows up on the slot +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (1)"); + +my $message_lsn = $node_publisher->safe_psql('postgres', + "SELECT pg_logical_emit_message(false, 'pgoutput', 'a non-transactional message')"); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (2)"); + +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 0), get_byte(data, 1) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub', + 'messages', 'true') + WHERE lsn = '$message_lsn' AND xid = 0 +)); + +is($result, qq(77|0), 'non-transactional message on slot is M'); + +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE"); +$node_publisher->wait_for_catchup('tap_sub'); + +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); + +# wait for the replication connection to drop from the publisher +$node_publisher->poll_query_until('postgres', + 'SELECT COUNT(*) FROM pg_catalog.pg_stat_replication', 0); + +# Ensure a non-transactional logical decoding message shows up on the slot when +# it is emitted within an aborted transaction. The message won't emit until +# something advances the LSN, hence, we intentionally forces the server to +# switch to a new WAL file. +$node_publisher->safe_psql( + 'postgres', qq( + BEGIN; + SELECT pg_logical_emit_message(false, 'pgoutput', + 'a non-transactional message is available even if the transaction is aborted 1'); + INSERT INTO tab_test VALUES (3); + SELECT pg_logical_emit_message(true, 'pgoutput', + 'a transactional message is not available if the transaction is aborted'); + SELECT pg_logical_emit_message(false, 'pgoutput', + 'a non-transactional message is available even if the transaction is aborted 2'); + ROLLBACK; + SELECT pg_switch_wal(); +)); + +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 0), get_byte(data, 1) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub', + 'messages', 'true') +)); + +is($result, qq(77|0 +77|0), + 'non-transactional message on slot from aborted transaction is M'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast');