diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 43092fe62a6..380be5fb17c 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -6433,6 +6433,82 @@ Begin
+
+
+Message
+
+
+
+
+
+
+
+ Byte1('M')
+
+
+
+ Identifies the message as a logical decoding message.
+
+
+
+
+
+ Int32
+
+
+
+ Xid of the transaction. The XID is sent only when user has
+ requested streaming of in-progress transactions.
+
+
+
+
+
+ Int8
+
+
+
+ Flags; Either 0 for no flags or 1 if the logical decoding
+ message is transactional.
+
+
+
+
+
+ Int64
+
+
+
+ The LSN of the logical decoding message.
+
+
+
+
+
+ String
+
+
+
+ The prefix of the logical decoding message.
+
+
+
+
+
+ Byten
+
+
+
+ The content of the logical decoding message.
+
+
+
+
+
+
+
+
+
Commit
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index f2c85cabb52..2a1f9830e05 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -25,6 +25,7 @@
*/
#define LOGICALREP_IS_REPLICA_IDENTITY 1
+#define MESSAGE_TRANSACTIONAL (1<<0)
#define TRUNCATE_CASCADE (1<<0)
#define TRUNCATE_RESTART_SEQS (1<<1)
@@ -361,6 +362,33 @@ logicalrep_read_truncate(StringInfo in,
return relids;
}
+/*
+ * Write MESSAGE to stream
+ */
+void
+logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+ bool transactional, const char *prefix, Size sz,
+ const char *message)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
+
+ /* encode and send message flags */
+ if (transactional)
+ flags |= MESSAGE_TRANSACTIONAL;
+
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
+ pq_sendint8(out, flags);
+ pq_sendint64(out, lsn);
+ pq_sendstring(out, prefix);
+ pq_sendint32(out, sz);
+ pq_sendbytes(out, message, sz);
+}
+
/*
* Write relation description to the output stream.
*/
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 354fbe4b4bc..74d538b5e37 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1939,6 +1939,15 @@ apply_dispatch(StringInfo s)
apply_handle_origin(s);
return;
+ case LOGICAL_REP_MSG_MESSAGE:
+
+ /*
+ * Logical replication does not use generic logical messages yet.
+ * Although, it could be used by other applications that use this
+ * output plugin.
+ */
+ return;
+
case LOGICAL_REP_MSG_STREAM_START:
apply_handle_stream_start(s);
return;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 6146c5acdb3..f68348dcf45 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -45,6 +45,10 @@ static void pgoutput_change(LogicalDecodingContext *ctx,
static void pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, int nrelations, Relation relations[],
ReorderBufferChange *change);
+static void pgoutput_message(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+ bool transactional, const char *prefix,
+ Size sz, const char *message);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
@@ -142,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->begin_cb = pgoutput_begin_txn;
cb->change_cb = pgoutput_change;
cb->truncate_cb = pgoutput_truncate;
+ cb->message_cb = pgoutput_message;
cb->commit_cb = pgoutput_commit_txn;
cb->filter_by_origin_cb = pgoutput_origin_filter;
cb->shutdown_cb = pgoutput_shutdown;
@@ -152,6 +157,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->stream_abort_cb = pgoutput_stream_abort;
cb->stream_commit_cb = pgoutput_stream_commit;
cb->stream_change_cb = pgoutput_change;
+ cb->stream_message_cb = pgoutput_message;
cb->stream_truncate_cb = pgoutput_truncate;
}
@@ -162,10 +168,12 @@ parse_output_parameters(List *options, PGOutputData *data)
bool protocol_version_given = false;
bool publication_names_given = false;
bool binary_option_given = false;
+ bool messages_option_given = false;
bool streaming_given = false;
data->binary = false;
data->streaming = false;
+ data->messages = false;
foreach(lc, options)
{
@@ -221,6 +229,16 @@ parse_output_parameters(List *options, PGOutputData *data)
data->binary = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "messages") == 0)
+ {
+ if (messages_option_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ messages_option_given = true;
+
+ data->messages = defGetBoolean(defel);
+ }
else if (strcmp(defel->defname, "streaming") == 0)
{
if (streaming_given)
@@ -689,6 +707,35 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
MemoryContextReset(data->context);
}
+static void
+pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
+ const char *message)
+{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ TransactionId xid = InvalidTransactionId;
+
+ if (!data->messages)
+ return;
+
+ /*
+ * Remember the xid for the message in streaming mode. See
+ * pgoutput_change.
+ */
+ if (in_streaming)
+ xid = txn->xid;
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_message(ctx->out,
+ xid,
+ message_lsn,
+ transactional,
+ prefix,
+ sz,
+ message);
+ OutputPluginWrite(ctx, true);
+}
+
/*
* Currently we always forward.
*/
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index fa4c37277b1..55b90c03eac 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -54,6 +54,7 @@ typedef enum LogicalRepMsgType
LOGICAL_REP_MSG_TRUNCATE = 'T',
LOGICAL_REP_MSG_RELATION = 'R',
LOGICAL_REP_MSG_TYPE = 'Y',
+ LOGICAL_REP_MSG_MESSAGE = 'M',
LOGICAL_REP_MSG_STREAM_START = 'S',
LOGICAL_REP_MSG_STREAM_END = 'E',
LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
@@ -151,6 +152,8 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
bool cascade, bool restart_seqs);
extern List *logicalrep_read_truncate(StringInfo in,
bool *cascade, bool *restart_seqs);
+extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+ bool transactional, const char *prefix, Size sz, const char *message);
extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
Relation rel);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index bb383d523ee..51e7c0348da 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -26,6 +26,7 @@ typedef struct PGOutputData
List *publications;
bool binary;
bool streaming;
+ bool messages;
} PGOutputData;
#endif /* PGOUTPUT_H */
diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl
new file mode 100644
index 00000000000..d9123ed3ef1
--- /dev/null
+++ b/src/test/subscription/t/020_messages.pl
@@ -0,0 +1,148 @@
+# Tests that logical decoding messages
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 5;
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_test (a int primary key)");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab_test (a int primary key)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_test");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
+);
+
+# Ensure a transactional logical decoding message shows up on the slot
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
+
+$node_publisher->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message')"
+);
+
+my $result = $node_publisher->safe_psql(
+ 'postgres', qq(
+ SELECT get_byte(data, 0)
+ FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+ 'proto_version', '1',
+ 'publication_names', 'tap_pub',
+ 'messages', 'true')
+));
+
+# 66 77 67 == B M C == BEGIN MESSAGE COMMIT
+is($result, qq(66
+77
+67),
+ 'messages on slot are B M C with message option');
+
+$result = $node_publisher->safe_psql(
+ 'postgres', qq(
+ SELECT get_byte(data, 1), encode(substr(data, 11, 8), 'escape')
+ FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+ 'proto_version', '1',
+ 'publication_names', 'tap_pub',
+ 'messages', 'true')
+ OFFSET 1 LIMIT 1
+));
+
+is($result, qq(1|pgoutput),
+ "flag transactional is set to 1 and prefix is pgoutput");
+
+$result = $node_publisher->safe_psql(
+ 'postgres', qq(
+ SELECT get_byte(data, 0)
+ FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+ 'proto_version', '1',
+ 'publication_names', 'tap_pub')
+));
+
+# 66 67 == B C == BEGIN COMMIT
+is($result, qq(66
+67),
+ 'option messages defaults to false so message (M) is not available on slot');
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE");
+$node_publisher->wait_for_catchup('tap_sub');
+
+# ensure a non-transactional logical decoding message shows up on the slot
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (1)");
+
+my $message_lsn = $node_publisher->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(false, 'pgoutput', 'a non-transactional message')");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (2)");
+
+$result = $node_publisher->safe_psql(
+ 'postgres', qq(
+ SELECT get_byte(data, 0), get_byte(data, 1)
+ FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+ 'proto_version', '1',
+ 'publication_names', 'tap_pub',
+ 'messages', 'true')
+ WHERE lsn = '$message_lsn' AND xid = 0
+));
+
+is($result, qq(77|0), 'non-transactional message on slot is M');
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE");
+$node_publisher->wait_for_catchup('tap_sub');
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
+
+# wait for the replication connection to drop from the publisher
+$node_publisher->poll_query_until('postgres',
+ 'SELECT COUNT(*) FROM pg_catalog.pg_stat_replication', 0);
+
+# Ensure a non-transactional logical decoding message shows up on the slot when
+# it is emitted within an aborted transaction. The message won't emit until
+# something advances the LSN, hence, we intentionally forces the server to
+# switch to a new WAL file.
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ BEGIN;
+ SELECT pg_logical_emit_message(false, 'pgoutput',
+ 'a non-transactional message is available even if the transaction is aborted 1');
+ INSERT INTO tab_test VALUES (3);
+ SELECT pg_logical_emit_message(true, 'pgoutput',
+ 'a transactional message is not available if the transaction is aborted');
+ SELECT pg_logical_emit_message(false, 'pgoutput',
+ 'a non-transactional message is available even if the transaction is aborted 2');
+ ROLLBACK;
+ SELECT pg_switch_wal();
+));
+
+$result = $node_publisher->safe_psql(
+ 'postgres', qq(
+ SELECT get_byte(data, 0), get_byte(data, 1)
+ FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+ 'proto_version', '1',
+ 'publication_names', 'tap_pub',
+ 'messages', 'true')
+));
+
+is($result, qq(77|0
+77|0),
+ 'non-transactional message on slot from aborted transaction is M');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');