mirror of
				https://github.com/postgres/postgres.git
				synced 2025-11-03 09:13:20 +03:00 
			
		
		
		
	Currently, there are quite a few places in reorderbuffer.c that tries to access top-transaction for a subtransaction. This makes the code to access top-transaction consistent and easier to follow. Author: Peter Smith Reviewed-by: Vignesh C, Sawada Masahiko Discussion: https://postgr.es/m/CAHut+PuCznOyTqBQwjRUu-ibG-=KHyCv-0FTcWQtZUdR88umfg@mail.gmail.com
		
			
				
	
	
		
			977 lines
		
	
	
		
			28 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			977 lines
		
	
	
		
			28 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
/*-------------------------------------------------------------------------
 | 
						|
 *
 | 
						|
 * test_decoding.c
 | 
						|
 *		  example logical decoding output plugin
 | 
						|
 *
 | 
						|
 * Copyright (c) 2012-2023, PostgreSQL Global Development Group
 | 
						|
 *
 | 
						|
 * IDENTIFICATION
 | 
						|
 *		  contrib/test_decoding/test_decoding.c
 | 
						|
 *
 | 
						|
 *-------------------------------------------------------------------------
 | 
						|
 */
 | 
						|
#include "postgres.h"
 | 
						|
 | 
						|
#include "catalog/pg_type.h"
 | 
						|
 | 
						|
#include "replication/logical.h"
 | 
						|
#include "replication/origin.h"
 | 
						|
 | 
						|
#include "utils/builtins.h"
 | 
						|
#include "utils/lsyscache.h"
 | 
						|
#include "utils/memutils.h"
 | 
						|
#include "utils/rel.h"
 | 
						|
 | 
						|
PG_MODULE_MAGIC;
 | 
						|
 | 
						|
typedef struct
 | 
						|
{
 | 
						|
	MemoryContext context;
 | 
						|
	bool		include_xids;
 | 
						|
	bool		include_timestamp;
 | 
						|
	bool		skip_empty_xacts;
 | 
						|
	bool		only_local;
 | 
						|
} TestDecodingData;
 | 
						|
 | 
						|
/*
 | 
						|
 * Maintain the per-transaction level variables to track whether the
 | 
						|
 * transaction and or streams have written any changes. In streaming mode the
 | 
						|
 * transaction can be decoded in streams so along with maintaining whether the
 | 
						|
 * transaction has written any changes, we also need to track whether the
 | 
						|
 * current stream has written any changes. This is required so that if user
 | 
						|
 * has requested to skip the empty transactions we can skip the empty streams
 | 
						|
 * even though the transaction has written some changes.
 | 
						|
 */
 | 
						|
typedef struct
 | 
						|
{
 | 
						|
	bool		xact_wrote_changes;
 | 
						|
	bool		stream_wrote_changes;
 | 
						|
} TestDecodingTxnData;
 | 
						|
 | 
						|
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 | 
						|
							  bool is_init);
 | 
						|
static void pg_decode_shutdown(LogicalDecodingContext *ctx);
 | 
						|
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
 | 
						|
								ReorderBufferTXN *txn);
 | 
						|
static void pg_output_begin(LogicalDecodingContext *ctx,
 | 
						|
							TestDecodingData *data,
 | 
						|
							ReorderBufferTXN *txn,
 | 
						|
							bool last_write);
 | 
						|
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
 | 
						|
								 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
 | 
						|
static void pg_decode_change(LogicalDecodingContext *ctx,
 | 
						|
							 ReorderBufferTXN *txn, Relation relation,
 | 
						|
							 ReorderBufferChange *change);
 | 
						|
static void pg_decode_truncate(LogicalDecodingContext *ctx,
 | 
						|
							   ReorderBufferTXN *txn,
 | 
						|
							   int nrelations, Relation relations[],
 | 
						|
							   ReorderBufferChange *change);
 | 
						|
static bool pg_decode_filter(LogicalDecodingContext *ctx,
 | 
						|
							 RepOriginId origin_id);
 | 
						|
static void pg_decode_message(LogicalDecodingContext *ctx,
 | 
						|
							  ReorderBufferTXN *txn, XLogRecPtr lsn,
 | 
						|
							  bool transactional, const char *prefix,
 | 
						|
							  Size sz, const char *message);
 | 
						|
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
 | 
						|
									 TransactionId xid,
 | 
						|
									 const char *gid);
 | 
						|
static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 | 
						|
										ReorderBufferTXN *txn);
 | 
						|
static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
 | 
						|
								  ReorderBufferTXN *txn,
 | 
						|
								  XLogRecPtr prepare_lsn);
 | 
						|
static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
 | 
						|
										  ReorderBufferTXN *txn,
 | 
						|
										  XLogRecPtr commit_lsn);
 | 
						|
static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
 | 
						|
											ReorderBufferTXN *txn,
 | 
						|
											XLogRecPtr prepare_end_lsn,
 | 
						|
											TimestampTz prepare_time);
 | 
						|
static void pg_decode_stream_start(LogicalDecodingContext *ctx,
 | 
						|
								   ReorderBufferTXN *txn);
 | 
						|
static void pg_output_stream_start(LogicalDecodingContext *ctx,
 | 
						|
								   TestDecodingData *data,
 | 
						|
								   ReorderBufferTXN *txn,
 | 
						|
								   bool last_write);
 | 
						|
static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
 | 
						|
								  ReorderBufferTXN *txn);
 | 
						|
static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
 | 
						|
								   ReorderBufferTXN *txn,
 | 
						|
								   XLogRecPtr abort_lsn);
 | 
						|
static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
 | 
						|
									 ReorderBufferTXN *txn,
 | 
						|
									 XLogRecPtr prepare_lsn);
 | 
						|
static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
 | 
						|
									ReorderBufferTXN *txn,
 | 
						|
									XLogRecPtr commit_lsn);
 | 
						|
static void pg_decode_stream_change(LogicalDecodingContext *ctx,
 | 
						|
									ReorderBufferTXN *txn,
 | 
						|
									Relation relation,
 | 
						|
									ReorderBufferChange *change);
 | 
						|
static void pg_decode_stream_message(LogicalDecodingContext *ctx,
 | 
						|
									 ReorderBufferTXN *txn, XLogRecPtr lsn,
 | 
						|
									 bool transactional, const char *prefix,
 | 
						|
									 Size sz, const char *message);
 | 
						|
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
 | 
						|
									  ReorderBufferTXN *txn,
 | 
						|
									  int nrelations, Relation relations[],
 | 
						|
									  ReorderBufferChange *change);
 | 
						|
 | 
						|
void
 | 
						|
_PG_init(void)
 | 
						|
{
 | 
						|
	/* other plugins can perform things here */
 | 
						|
}
 | 
						|
 | 
						|
/* specify output plugin callbacks */
 | 
						|
void
 | 
						|
_PG_output_plugin_init(OutputPluginCallbacks *cb)
 | 
						|
{
 | 
						|
	cb->startup_cb = pg_decode_startup;
 | 
						|
	cb->begin_cb = pg_decode_begin_txn;
 | 
						|
	cb->change_cb = pg_decode_change;
 | 
						|
	cb->truncate_cb = pg_decode_truncate;
 | 
						|
	cb->commit_cb = pg_decode_commit_txn;
 | 
						|
	cb->filter_by_origin_cb = pg_decode_filter;
 | 
						|
	cb->shutdown_cb = pg_decode_shutdown;
 | 
						|
	cb->message_cb = pg_decode_message;
 | 
						|
	cb->filter_prepare_cb = pg_decode_filter_prepare;
 | 
						|
	cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
 | 
						|
	cb->prepare_cb = pg_decode_prepare_txn;
 | 
						|
	cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
 | 
						|
	cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
 | 
						|
	cb->stream_start_cb = pg_decode_stream_start;
 | 
						|
	cb->stream_stop_cb = pg_decode_stream_stop;
 | 
						|
	cb->stream_abort_cb = pg_decode_stream_abort;
 | 
						|
	cb->stream_prepare_cb = pg_decode_stream_prepare;
 | 
						|
	cb->stream_commit_cb = pg_decode_stream_commit;
 | 
						|
	cb->stream_change_cb = pg_decode_stream_change;
 | 
						|
	cb->stream_message_cb = pg_decode_stream_message;
 | 
						|
	cb->stream_truncate_cb = pg_decode_stream_truncate;
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
/* initialize this plugin */
 | 
						|
static void
 | 
						|
pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 | 
						|
				  bool is_init)
 | 
						|
{
 | 
						|
	ListCell   *option;
 | 
						|
	TestDecodingData *data;
 | 
						|
	bool		enable_streaming = false;
 | 
						|
 | 
						|
	data = palloc0(sizeof(TestDecodingData));
 | 
						|
	data->context = AllocSetContextCreate(ctx->context,
 | 
						|
										  "text conversion context",
 | 
						|
										  ALLOCSET_DEFAULT_SIZES);
 | 
						|
	data->include_xids = true;
 | 
						|
	data->include_timestamp = false;
 | 
						|
	data->skip_empty_xacts = false;
 | 
						|
	data->only_local = false;
 | 
						|
 | 
						|
	ctx->output_plugin_private = data;
 | 
						|
 | 
						|
	opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
 | 
						|
	opt->receive_rewrites = false;
 | 
						|
 | 
						|
	foreach(option, ctx->output_plugin_options)
 | 
						|
	{
 | 
						|
		DefElem    *elem = lfirst(option);
 | 
						|
 | 
						|
		Assert(elem->arg == NULL || IsA(elem->arg, String));
 | 
						|
 | 
						|
		if (strcmp(elem->defname, "include-xids") == 0)
 | 
						|
		{
 | 
						|
			/* if option does not provide a value, it means its value is true */
 | 
						|
			if (elem->arg == NULL)
 | 
						|
				data->include_xids = true;
 | 
						|
			else if (!parse_bool(strVal(elem->arg), &data->include_xids))
 | 
						|
				ereport(ERROR,
 | 
						|
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 | 
						|
						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 | 
						|
								strVal(elem->arg), elem->defname)));
 | 
						|
		}
 | 
						|
		else if (strcmp(elem->defname, "include-timestamp") == 0)
 | 
						|
		{
 | 
						|
			if (elem->arg == NULL)
 | 
						|
				data->include_timestamp = true;
 | 
						|
			else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
 | 
						|
				ereport(ERROR,
 | 
						|
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 | 
						|
						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 | 
						|
								strVal(elem->arg), elem->defname)));
 | 
						|
		}
 | 
						|
		else if (strcmp(elem->defname, "force-binary") == 0)
 | 
						|
		{
 | 
						|
			bool		force_binary;
 | 
						|
 | 
						|
			if (elem->arg == NULL)
 | 
						|
				continue;
 | 
						|
			else if (!parse_bool(strVal(elem->arg), &force_binary))
 | 
						|
				ereport(ERROR,
 | 
						|
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 | 
						|
						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 | 
						|
								strVal(elem->arg), elem->defname)));
 | 
						|
 | 
						|
			if (force_binary)
 | 
						|
				opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
 | 
						|
		}
 | 
						|
		else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
 | 
						|
		{
 | 
						|
 | 
						|
			if (elem->arg == NULL)
 | 
						|
				data->skip_empty_xacts = true;
 | 
						|
			else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
 | 
						|
				ereport(ERROR,
 | 
						|
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 | 
						|
						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 | 
						|
								strVal(elem->arg), elem->defname)));
 | 
						|
		}
 | 
						|
		else if (strcmp(elem->defname, "only-local") == 0)
 | 
						|
		{
 | 
						|
 | 
						|
			if (elem->arg == NULL)
 | 
						|
				data->only_local = true;
 | 
						|
			else if (!parse_bool(strVal(elem->arg), &data->only_local))
 | 
						|
				ereport(ERROR,
 | 
						|
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 | 
						|
						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 | 
						|
								strVal(elem->arg), elem->defname)));
 | 
						|
		}
 | 
						|
		else if (strcmp(elem->defname, "include-rewrites") == 0)
 | 
						|
		{
 | 
						|
 | 
						|
			if (elem->arg == NULL)
 | 
						|
				continue;
 | 
						|
			else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
 | 
						|
				ereport(ERROR,
 | 
						|
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 | 
						|
						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 | 
						|
								strVal(elem->arg), elem->defname)));
 | 
						|
		}
 | 
						|
		else if (strcmp(elem->defname, "stream-changes") == 0)
 | 
						|
		{
 | 
						|
			if (elem->arg == NULL)
 | 
						|
				continue;
 | 
						|
			else if (!parse_bool(strVal(elem->arg), &enable_streaming))
 | 
						|
				ereport(ERROR,
 | 
						|
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 | 
						|
						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 | 
						|
								strVal(elem->arg), elem->defname)));
 | 
						|
		}
 | 
						|
		else
 | 
						|
		{
 | 
						|
			ereport(ERROR,
 | 
						|
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 | 
						|
					 errmsg("option \"%s\" = \"%s\" is unknown",
 | 
						|
							elem->defname,
 | 
						|
							elem->arg ? strVal(elem->arg) : "(null)")));
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	ctx->streaming &= enable_streaming;
 | 
						|
}
 | 
						|
 | 
						|
/* cleanup this plugin's resources */
 | 
						|
static void
 | 
						|
pg_decode_shutdown(LogicalDecodingContext *ctx)
 | 
						|
{
 | 
						|
	TestDecodingData *data = ctx->output_plugin_private;
 | 
						|
 | 
						|
	/* cleanup our own resources via memory context reset */
 | 
						|
	MemoryContextDelete(data->context);
 | 
						|
}
 | 
						|
 | 
						|
/* BEGIN callback */
 | 
						|
static void
 | 
						|
pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 | 
						|
{
 | 
						|
	TestDecodingData *data = ctx->output_plugin_private;
 | 
						|
	TestDecodingTxnData *txndata =
 | 
						|
	MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
 | 
						|
 | 
						|
	txndata->xact_wrote_changes = false;
 | 
						|
	txn->output_plugin_private = txndata;
 | 
						|
 | 
						|
	/*
 | 
						|
	 * If asked to skip empty transactions, we'll emit BEGIN at the point
 | 
						|
	 * where the first operation is received for this transaction.
 | 
						|
	 */
 | 
						|
	if (data->skip_empty_xacts)
 | 
						|
		return;
 | 
						|
 | 
						|
	pg_output_begin(ctx, data, txn, true);
 | 
						|
}
 | 
						|
 | 
						|
static void
 | 
						|
pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 | 
						|
{
 | 
						|
	OutputPluginPrepareWrite(ctx, last_write);
 | 
						|
	if (data->include_xids)
 | 
						|
		appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
 | 
						|
	else
 | 
						|
		appendStringInfoString(ctx->out, "BEGIN");
 | 
						|
	OutputPluginWrite(ctx, last_write);
 | 
						|
}
 | 
						|
 | 
						|
/* COMMIT callback */
 | 
						|
static void
 | 
						|
pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 | 
						|
					 XLogRecPtr commit_lsn)
 | 
						|
{
 | 
						|
	TestDecodingData *data = ctx->output_plugin_private;
 | 
						|
	TestDecodingTxnData *txndata = txn->output_plugin_private;
 | 
						|
	bool		xact_wrote_changes = txndata->xact_wrote_changes;
 | 
						|
 | 
						|
	pfree(txndata);
 | 
						|
	txn->output_plugin_private = NULL;
 | 
						|
 | 
						|
	if (data->skip_empty_xacts && !xact_wrote_changes)
 | 
						|
		return;
 | 
						|
 | 
						|
	OutputPluginPrepareWrite(ctx, true);
 | 
						|
	if (data->include_xids)
 | 
						|
		appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
 | 
						|
	else
 | 
						|
		appendStringInfoString(ctx->out, "COMMIT");
 | 
						|
 | 
						|
	if (data->include_timestamp)
 | 
						|
		appendStringInfo(ctx->out, " (at %s)",
 | 
						|
						 timestamptz_to_str(txn->xact_time.commit_time));
 | 
						|
 | 
						|
	OutputPluginWrite(ctx, true);
 | 
						|
}
 | 
						|
 | 
						|
/* BEGIN PREPARE callback */
 | 
						|
static void
 | 
						|
pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 | 
						|
{
 | 
						|
	TestDecodingData *data = ctx->output_plugin_private;
 | 
						|
	TestDecodingTxnData *txndata =
 | 
						|
	MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
 | 
						|
 | 
						|
	txndata->xact_wrote_changes = false;
 | 
						|
	txn->output_plugin_private = txndata;
 | 
						|
 | 
						|
	/*
 | 
						|
	 * If asked to skip empty transactions, we'll emit BEGIN at the point
 | 
						|
	 * where the first operation is received for this transaction.
 | 
						|
	 */
 | 
						|
	if (data->skip_empty_xacts)
 | 
						|
		return;
 | 
						|
 | 
						|
	pg_output_begin(ctx, data, txn, true);
 | 
						|
}
 | 
						|
 | 
						|
/* PREPARE callback */
 | 
						|
static void
 | 
						|
pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 | 
						|
					  XLogRecPtr prepare_lsn)
 | 
						|
{
 | 
						|
	TestDecodingData *data = ctx->output_plugin_private;
 | 
						|
	TestDecodingTxnData *txndata = txn->output_plugin_private;
 | 
						|
 | 
						|
	/*
 | 
						|
	 * If asked to skip empty transactions, we'll emit PREPARE at the point
 | 
						|
	 * where the first operation is received for this transaction.
 | 
						|
	 */
 | 
						|
	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 | 
						|
		return;
 | 
						|
 | 
						|
	OutputPluginPrepareWrite(ctx, true);
 | 
						|
 | 
						|
	appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
 | 
						|
					 quote_literal_cstr(txn->gid));
 | 
						|
 | 
						|
	if (data->include_xids)
 | 
						|
		appendStringInfo(ctx->out, ", txid %u", txn->xid);
 | 
						|
 | 
						|
	if (data->include_timestamp)
 | 
						|
		appendStringInfo(ctx->out, " (at %s)",
 | 
						|
						 timestamptz_to_str(txn->xact_time.prepare_time));
 | 
						|
 | 
						|
	OutputPluginWrite(ctx, true);
 | 
						|
}
 | 
						|
 | 
						|
/* COMMIT PREPARED callback */
 | 
						|
static void
 | 
						|
pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 | 
						|
							  XLogRecPtr commit_lsn)
 | 
						|
{
 | 
						|
	TestDecodingData *data = ctx->output_plugin_private;
 | 
						|
 | 
						|
	OutputPluginPrepareWrite(ctx, true);
 | 
						|
 | 
						|
	appendStringInfo(ctx->out, "COMMIT PREPARED %s",
 | 
						|
					 quote_literal_cstr(txn->gid));
 | 
						|
 | 
						|
	if (data->include_xids)
 | 
						|
		appendStringInfo(ctx->out, ", txid %u", txn->xid);
 | 
						|
 | 
						|
	if (data->include_timestamp)
 | 
						|
		appendStringInfo(ctx->out, " (at %s)",
 | 
						|
						 timestamptz_to_str(txn->xact_time.commit_time));
 | 
						|
 | 
						|
	OutputPluginWrite(ctx, true);
 | 
						|
}
 | 
						|
 | 
						|
/* ROLLBACK PREPARED callback */
 | 
						|
static void
 | 
						|
pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
 | 
						|
								ReorderBufferTXN *txn,
 | 
						|
								XLogRecPtr prepare_end_lsn,
 | 
						|
								TimestampTz prepare_time)
 | 
						|
{
 | 
						|
	TestDecodingData *data = ctx->output_plugin_private;
 | 
						|
 | 
						|
	OutputPluginPrepareWrite(ctx, true);
 | 
						|
 | 
						|
	appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
 | 
						|
					 quote_literal_cstr(txn->gid));
 | 
						|
 | 
						|
	if (data->include_xids)
 | 
						|
		appendStringInfo(ctx->out, ", txid %u", txn->xid);
 | 
						|
 | 
						|
	if (data->include_timestamp)
 | 
						|
		appendStringInfo(ctx->out, " (at %s)",
 | 
						|
						 timestamptz_to_str(txn->xact_time.commit_time));
 | 
						|
 | 
						|
	OutputPluginWrite(ctx, true);
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
 * Filter out two-phase transactions.
 | 
						|
 *
 | 
						|
 * Each plugin can implement its own filtering logic. Here we demonstrate a
 | 
						|
 * simple logic by checking the GID. If the GID contains the "_nodecode"
 | 
						|
 * substring, then we filter it out.
 | 
						|
 */
 | 
						|
static bool
 | 
						|
pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
 | 
						|
						 const char *gid)
 | 
						|
{
 | 
						|
	if (strstr(gid, "_nodecode") != NULL)
 | 
						|
		return true;
 | 
						|
 | 
						|
	return false;
 | 
						|
}
 | 
						|
 | 
						|
static bool
 | 
						|
pg_decode_filter(LogicalDecodingContext *ctx,
 | 
						|
				 RepOriginId origin_id)
 | 
						|
{
 | 
						|
	TestDecodingData *data = ctx->output_plugin_private;
 | 
						|
 | 
						|
	if (data->only_local && origin_id != InvalidRepOriginId)
 | 
						|
		return true;
 | 
						|
	return false;
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
 * Print literal `outputstr' already represented as string of type `typid'
 | 
						|
 * into stringbuf `s'.
 | 
						|
 *
 | 
						|
 * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
 | 
						|
 * if standard_conforming_strings were enabled.
 | 
						|
 */
 | 
						|
static void
 | 
						|
print_literal(StringInfo s, Oid typid, char *outputstr)
 | 
						|
{
 | 
						|
	const char *valptr;
 | 
						|
 | 
						|
	switch (typid)
 | 
						|
	{
 | 
						|
		case INT2OID:
 | 
						|
		case INT4OID:
 | 
						|
		case INT8OID:
 | 
						|
		case OIDOID:
 | 
						|
		case FLOAT4OID:
 | 
						|
		case FLOAT8OID:
 | 
						|
		case NUMERICOID:
 | 
						|
			/* NB: We don't care about Inf, NaN et al. */
 | 
						|
			appendStringInfoString(s, outputstr);
 | 
						|
			break;
 | 
						|
 | 
						|
		case BITOID:
 | 
						|
		case VARBITOID:
 | 
						|
			appendStringInfo(s, "B'%s'", outputstr);
 | 
						|
			break;
 | 
						|
 | 
						|
		case BOOLOID:
 | 
						|
			if (strcmp(outputstr, "t") == 0)
 | 
						|
				appendStringInfoString(s, "true");
 | 
						|
			else
 | 
						|
				appendStringInfoString(s, "false");
 | 
						|
			break;
 | 
						|
 | 
						|
		default:
 | 
						|
			appendStringInfoChar(s, '\'');
 | 
						|
			for (valptr = outputstr; *valptr; valptr++)
 | 
						|
			{
 | 
						|
				char		ch = *valptr;
 | 
						|
 | 
						|
				if (SQL_STR_DOUBLE(ch, false))
 | 
						|
					appendStringInfoChar(s, ch);
 | 
						|
				appendStringInfoChar(s, ch);
 | 
						|
			}
 | 
						|
			appendStringInfoChar(s, '\'');
 | 
						|
			break;
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
/* print the tuple 'tuple' into the StringInfo s */
 | 
						|
static void
 | 
						|
tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
 | 
						|
{
 | 
						|
	int			natt;
 | 
						|
 | 
						|
	/* print all columns individually */
 | 
						|
	for (natt = 0; natt < tupdesc->natts; natt++)
 | 
						|
	{
 | 
						|
		Form_pg_attribute attr; /* the attribute itself */
 | 
						|
		Oid			typid;		/* type of current attribute */
 | 
						|
		Oid			typoutput;	/* output function */
 | 
						|
		bool		typisvarlena;
 | 
						|
		Datum		origval;	/* possibly toasted Datum */
 | 
						|
		bool		isnull;		/* column is null? */
 | 
						|
 | 
						|
		attr = TupleDescAttr(tupdesc, natt);
 | 
						|
 | 
						|
		/*
 | 
						|
		 * don't print dropped columns, we can't be sure everything is
 | 
						|
		 * available for them
 | 
						|
		 */
 | 
						|
		if (attr->attisdropped)
 | 
						|
			continue;
 | 
						|
 | 
						|
		/*
 | 
						|
		 * Don't print system columns, oid will already have been printed if
 | 
						|
		 * present.
 | 
						|
		 */
 | 
						|
		if (attr->attnum < 0)
 | 
						|
			continue;
 | 
						|
 | 
						|
		typid = attr->atttypid;
 | 
						|
 | 
						|
		/* get Datum from tuple */
 | 
						|
		origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
 | 
						|
 | 
						|
		if (isnull && skip_nulls)
 | 
						|
			continue;
 | 
						|
 | 
						|
		/* print attribute name */
 | 
						|
		appendStringInfoChar(s, ' ');
 | 
						|
		appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
 | 
						|
 | 
						|
		/* print attribute type */
 | 
						|
		appendStringInfoChar(s, '[');
 | 
						|
		appendStringInfoString(s, format_type_be(typid));
 | 
						|
		appendStringInfoChar(s, ']');
 | 
						|
 | 
						|
		/* query output function */
 | 
						|
		getTypeOutputInfo(typid,
 | 
						|
						  &typoutput, &typisvarlena);
 | 
						|
 | 
						|
		/* print separator */
 | 
						|
		appendStringInfoChar(s, ':');
 | 
						|
 | 
						|
		/* print data */
 | 
						|
		if (isnull)
 | 
						|
			appendStringInfoString(s, "null");
 | 
						|
		else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
 | 
						|
			appendStringInfoString(s, "unchanged-toast-datum");
 | 
						|
		else if (!typisvarlena)
 | 
						|
			print_literal(s, typid,
 | 
						|
						  OidOutputFunctionCall(typoutput, origval));
 | 
						|
		else
 | 
						|
		{
 | 
						|
			Datum		val;	/* definitely detoasted Datum */
 | 
						|
 | 
						|
			val = PointerGetDatum(PG_DETOAST_DATUM(origval));
 | 
						|
			print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
 * callback for individual changed tuples
 | 
						|
 */
 | 
						|
static void
 | 
						|
pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 | 
						|
				 Relation relation, ReorderBufferChange *change)
 | 
						|
{
 | 
						|
	TestDecodingData *data;
 | 
						|
	TestDecodingTxnData *txndata;
 | 
						|
	Form_pg_class class_form;
 | 
						|
	TupleDesc	tupdesc;
 | 
						|
	MemoryContext old;
 | 
						|
 | 
						|
	data = ctx->output_plugin_private;
 | 
						|
	txndata = txn->output_plugin_private;
 | 
						|
 | 
						|
	/* output BEGIN if we haven't yet */
 | 
						|
	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 | 
						|
	{
 | 
						|
		pg_output_begin(ctx, data, txn, false);
 | 
						|
	}
 | 
						|
	txndata->xact_wrote_changes = true;
 | 
						|
 | 
						|
	class_form = RelationGetForm(relation);
 | 
						|
	tupdesc = RelationGetDescr(relation);
 | 
						|
 | 
						|
	/* Avoid leaking memory by using and resetting our own context */
 | 
						|
	old = MemoryContextSwitchTo(data->context);
 | 
						|
 | 
						|
	OutputPluginPrepareWrite(ctx, true);
 | 
						|
 | 
						|
	appendStringInfoString(ctx->out, "table ");
 | 
						|
	appendStringInfoString(ctx->out,
 | 
						|
						   quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
 | 
						|
													  class_form->relrewrite ?
 | 
						|
													  get_rel_name(class_form->relrewrite) :
 | 
						|
													  NameStr(class_form->relname)));
 | 
						|
	appendStringInfoChar(ctx->out, ':');
 | 
						|
 | 
						|
	switch (change->action)
 | 
						|
	{
 | 
						|
		case REORDER_BUFFER_CHANGE_INSERT:
 | 
						|
			appendStringInfoString(ctx->out, " INSERT:");
 | 
						|
			if (change->data.tp.newtuple == NULL)
 | 
						|
				appendStringInfoString(ctx->out, " (no-tuple-data)");
 | 
						|
			else
 | 
						|
				tuple_to_stringinfo(ctx->out, tupdesc,
 | 
						|
									&change->data.tp.newtuple->tuple,
 | 
						|
									false);
 | 
						|
			break;
 | 
						|
		case REORDER_BUFFER_CHANGE_UPDATE:
 | 
						|
			appendStringInfoString(ctx->out, " UPDATE:");
 | 
						|
			if (change->data.tp.oldtuple != NULL)
 | 
						|
			{
 | 
						|
				appendStringInfoString(ctx->out, " old-key:");
 | 
						|
				tuple_to_stringinfo(ctx->out, tupdesc,
 | 
						|
									&change->data.tp.oldtuple->tuple,
 | 
						|
									true);
 | 
						|
				appendStringInfoString(ctx->out, " new-tuple:");
 | 
						|
			}
 | 
						|
 | 
						|
			if (change->data.tp.newtuple == NULL)
 | 
						|
				appendStringInfoString(ctx->out, " (no-tuple-data)");
 | 
						|
			else
 | 
						|
				tuple_to_stringinfo(ctx->out, tupdesc,
 | 
						|
									&change->data.tp.newtuple->tuple,
 | 
						|
									false);
 | 
						|
			break;
 | 
						|
		case REORDER_BUFFER_CHANGE_DELETE:
 | 
						|
			appendStringInfoString(ctx->out, " DELETE:");
 | 
						|
 | 
						|
			/* if there was no PK, we only know that a delete happened */
 | 
						|
			if (change->data.tp.oldtuple == NULL)
 | 
						|
				appendStringInfoString(ctx->out, " (no-tuple-data)");
 | 
						|
			/* In DELETE, only the replica identity is present; display that */
 | 
						|
			else
 | 
						|
				tuple_to_stringinfo(ctx->out, tupdesc,
 | 
						|
									&change->data.tp.oldtuple->tuple,
 | 
						|
									true);
 | 
						|
			break;
 | 
						|
		default:
 | 
						|
			Assert(false);
 | 
						|
	}
 | 
						|
 | 
						|
	MemoryContextSwitchTo(old);
 | 
						|
	MemoryContextReset(data->context);
 | 
						|
 | 
						|
	OutputPluginWrite(ctx, true);
 | 
						|
}
 | 
						|
 | 
						|
static void
 | 
						|
pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 | 
						|
				   int nrelations, Relation relations[], ReorderBufferChange *change)
 | 
						|
{
 | 
						|
	TestDecodingData *data;
 | 
						|
	TestDecodingTxnData *txndata;
 | 
						|
	MemoryContext old;
 | 
						|
	int			i;
 | 
						|
 | 
						|
	data = ctx->output_plugin_private;
 | 
						|
	txndata = txn->output_plugin_private;
 | 
						|
 | 
						|
	/* output BEGIN if we haven't yet */
 | 
						|
	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 | 
						|
	{
 | 
						|
		pg_output_begin(ctx, data, txn, false);
 | 
						|
	}
 | 
						|
	txndata->xact_wrote_changes = true;
 | 
						|
 | 
						|
	/* Avoid leaking memory by using and resetting our own context */
 | 
						|
	old = MemoryContextSwitchTo(data->context);
 | 
						|
 | 
						|
	OutputPluginPrepareWrite(ctx, true);
 | 
						|
 | 
						|
	appendStringInfoString(ctx->out, "table ");
 | 
						|
 | 
						|
	for (i = 0; i < nrelations; i++)
 | 
						|
	{
 | 
						|
		if (i > 0)
 | 
						|
			appendStringInfoString(ctx->out, ", ");
 | 
						|
 | 
						|
		appendStringInfoString(ctx->out,
 | 
						|
							   quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
 | 
						|
														  NameStr(relations[i]->rd_rel->relname)));
 | 
						|
	}
 | 
						|
 | 
						|
	appendStringInfoString(ctx->out, ": TRUNCATE:");
 | 
						|
 | 
						|
	if (change->data.truncate.restart_seqs
 | 
						|
		|| change->data.truncate.cascade)
 | 
						|
	{
 | 
						|
		if (change->data.truncate.restart_seqs)
 | 
						|
			appendStringInfoString(ctx->out, " restart_seqs");
 | 
						|
		if (change->data.truncate.cascade)
 | 
						|
			appendStringInfoString(ctx->out, " cascade");
 | 
						|
	}
 | 
						|
	else
 | 
						|
		appendStringInfoString(ctx->out, " (no-flags)");
 | 
						|
 | 
						|
	MemoryContextSwitchTo(old);
 | 
						|
	MemoryContextReset(data->context);
 | 
						|
 | 
						|
	OutputPluginWrite(ctx, true);
 | 
						|
}
 | 
						|
 | 
						|
static void
 | 
						|
pg_decode_message(LogicalDecodingContext *ctx,
 | 
						|
				  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
 | 
						|
				  const char *prefix, Size sz, const char *message)
 | 
						|
{
 | 
						|
	OutputPluginPrepareWrite(ctx, true);
 | 
						|
	appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
 | 
						|
					 transactional, prefix, sz);
 | 
						|
	appendBinaryStringInfo(ctx->out, message, sz);
 | 
						|
	OutputPluginWrite(ctx, true);
 | 
						|
}
 | 
						|
 | 
						|
static void
 | 
						|
pg_decode_stream_start(LogicalDecodingContext *ctx,
 | 
						|
					   ReorderBufferTXN *txn)
 | 
						|
{
 | 
						|
	TestDecodingData *data = ctx->output_plugin_private;
 | 
						|
	TestDecodingTxnData *txndata = txn->output_plugin_private;
 | 
						|
 | 
						|
	/*
 | 
						|
	 * Allocate the txn plugin data for the first stream in the transaction.
 | 
						|
	 */
 | 
						|
	if (txndata == NULL)
 | 
						|
	{
 | 
						|
		txndata =
 | 
						|
			MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
 | 
						|
		txndata->xact_wrote_changes = false;
 | 
						|
		txn->output_plugin_private = txndata;
 | 
						|
	}
 | 
						|
 | 
						|
	txndata->stream_wrote_changes = false;
 | 
						|
	if (data->skip_empty_xacts)
 | 
						|
		return;
 | 
						|
	pg_output_stream_start(ctx, data, txn, true);
 | 
						|
}
 | 
						|
 | 
						|
static void
 | 
						|
pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 | 
						|
{
 | 
						|
	OutputPluginPrepareWrite(ctx, last_write);
 | 
						|
	if (data->include_xids)
 | 
						|
		appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
 | 
						|
	else
 | 
						|
		appendStringInfoString(ctx->out, "opening a streamed block for transaction");
 | 
						|
	OutputPluginWrite(ctx, last_write);
 | 
						|
}
 | 
						|
 | 
						|
static void
 | 
						|
pg_decode_stream_stop(LogicalDecodingContext *ctx,
 | 
						|
					  ReorderBufferTXN *txn)
 | 
						|
{
 | 
						|
	TestDecodingData *data = ctx->output_plugin_private;
 | 
						|
	TestDecodingTxnData *txndata = txn->output_plugin_private;
 | 
						|
 | 
						|
	if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
 | 
						|
		return;
 | 
						|
 | 
						|
	OutputPluginPrepareWrite(ctx, true);
 | 
						|
	if (data->include_xids)
 | 
						|
		appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
 | 
						|
	else
 | 
						|
		appendStringInfoString(ctx->out, "closing a streamed block for transaction");
 | 
						|
	OutputPluginWrite(ctx, true);
 | 
						|
}
 | 
						|
 | 
						|
static void
 | 
						|
pg_decode_stream_abort(LogicalDecodingContext *ctx,
 | 
						|
					   ReorderBufferTXN *txn,
 | 
						|
					   XLogRecPtr abort_lsn)
 | 
						|
{
 | 
						|
	TestDecodingData *data = ctx->output_plugin_private;
 | 
						|
 | 
						|
	/*
 | 
						|
	 * stream abort can be sent for an individual subtransaction but we
 | 
						|
	 * maintain the output_plugin_private only under the toptxn so if this is
 | 
						|
	 * not the toptxn then fetch the toptxn.
 | 
						|
	 */
 | 
						|
	ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
 | 
						|
	TestDecodingTxnData *txndata = toptxn->output_plugin_private;
 | 
						|
	bool		xact_wrote_changes = txndata->xact_wrote_changes;
 | 
						|
 | 
						|
	if (rbtxn_is_toptxn(txn))
 | 
						|
	{
 | 
						|
		Assert(txn->output_plugin_private != NULL);
 | 
						|
		pfree(txndata);
 | 
						|
		txn->output_plugin_private = NULL;
 | 
						|
	}
 | 
						|
 | 
						|
	if (data->skip_empty_xacts && !xact_wrote_changes)
 | 
						|
		return;
 | 
						|
 | 
						|
	OutputPluginPrepareWrite(ctx, true);
 | 
						|
	if (data->include_xids)
 | 
						|
		appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
 | 
						|
	else
 | 
						|
		appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
 | 
						|
	OutputPluginWrite(ctx, true);
 | 
						|
}
 | 
						|
 | 
						|
static void
 | 
						|
pg_decode_stream_prepare(LogicalDecodingContext *ctx,
 | 
						|
						 ReorderBufferTXN *txn,
 | 
						|
						 XLogRecPtr prepare_lsn)
 | 
						|
{
 | 
						|
	TestDecodingData *data = ctx->output_plugin_private;
 | 
						|
	TestDecodingTxnData *txndata = txn->output_plugin_private;
 | 
						|
 | 
						|
	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 | 
						|
		return;
 | 
						|
 | 
						|
	OutputPluginPrepareWrite(ctx, true);
 | 
						|
 | 
						|
	if (data->include_xids)
 | 
						|
		appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
 | 
						|
						 quote_literal_cstr(txn->gid), txn->xid);
 | 
						|
	else
 | 
						|
		appendStringInfo(ctx->out, "preparing streamed transaction %s",
 | 
						|
						 quote_literal_cstr(txn->gid));
 | 
						|
 | 
						|
	if (data->include_timestamp)
 | 
						|
		appendStringInfo(ctx->out, " (at %s)",
 | 
						|
						 timestamptz_to_str(txn->xact_time.prepare_time));
 | 
						|
 | 
						|
	OutputPluginWrite(ctx, true);
 | 
						|
}
 | 
						|
 | 
						|
static void
 | 
						|
pg_decode_stream_commit(LogicalDecodingContext *ctx,
 | 
						|
						ReorderBufferTXN *txn,
 | 
						|
						XLogRecPtr commit_lsn)
 | 
						|
{
 | 
						|
	TestDecodingData *data = ctx->output_plugin_private;
 | 
						|
	TestDecodingTxnData *txndata = txn->output_plugin_private;
 | 
						|
	bool		xact_wrote_changes = txndata->xact_wrote_changes;
 | 
						|
 | 
						|
	pfree(txndata);
 | 
						|
	txn->output_plugin_private = NULL;
 | 
						|
 | 
						|
	if (data->skip_empty_xacts && !xact_wrote_changes)
 | 
						|
		return;
 | 
						|
 | 
						|
	OutputPluginPrepareWrite(ctx, true);
 | 
						|
 | 
						|
	if (data->include_xids)
 | 
						|
		appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
 | 
						|
	else
 | 
						|
		appendStringInfoString(ctx->out, "committing streamed transaction");
 | 
						|
 | 
						|
	if (data->include_timestamp)
 | 
						|
		appendStringInfo(ctx->out, " (at %s)",
 | 
						|
						 timestamptz_to_str(txn->xact_time.commit_time));
 | 
						|
 | 
						|
	OutputPluginWrite(ctx, true);
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
 * In streaming mode, we don't display the changes as the transaction can abort
 | 
						|
 * at a later point in time.  We don't want users to see the changes until the
 | 
						|
 * transaction is committed.
 | 
						|
 */
 | 
						|
static void
 | 
						|
pg_decode_stream_change(LogicalDecodingContext *ctx,
 | 
						|
						ReorderBufferTXN *txn,
 | 
						|
						Relation relation,
 | 
						|
						ReorderBufferChange *change)
 | 
						|
{
 | 
						|
	TestDecodingData *data = ctx->output_plugin_private;
 | 
						|
	TestDecodingTxnData *txndata = txn->output_plugin_private;
 | 
						|
 | 
						|
	/* output stream start if we haven't yet */
 | 
						|
	if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
 | 
						|
	{
 | 
						|
		pg_output_stream_start(ctx, data, txn, false);
 | 
						|
	}
 | 
						|
	txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
 | 
						|
 | 
						|
	OutputPluginPrepareWrite(ctx, true);
 | 
						|
	if (data->include_xids)
 | 
						|
		appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
 | 
						|
	else
 | 
						|
		appendStringInfoString(ctx->out, "streaming change for transaction");
 | 
						|
	OutputPluginWrite(ctx, true);
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
 * In streaming mode, we don't display the contents for transactional messages
 | 
						|
 * as the transaction can abort at a later point in time.  We don't want users to
 | 
						|
 * see the message contents until the transaction is committed.
 | 
						|
 */
 | 
						|
static void
 | 
						|
pg_decode_stream_message(LogicalDecodingContext *ctx,
 | 
						|
						 ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
 | 
						|
						 const char *prefix, Size sz, const char *message)
 | 
						|
{
 | 
						|
	OutputPluginPrepareWrite(ctx, true);
 | 
						|
 | 
						|
	if (transactional)
 | 
						|
	{
 | 
						|
		appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
 | 
						|
						 transactional, prefix, sz);
 | 
						|
	}
 | 
						|
	else
 | 
						|
	{
 | 
						|
		appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
 | 
						|
						 transactional, prefix, sz);
 | 
						|
		appendBinaryStringInfo(ctx->out, message, sz);
 | 
						|
	}
 | 
						|
 | 
						|
	OutputPluginWrite(ctx, true);
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
 * In streaming mode, we don't display the detailed information of Truncate.
 | 
						|
 * See pg_decode_stream_change.
 | 
						|
 */
 | 
						|
static void
 | 
						|
pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 | 
						|
						  int nrelations, Relation relations[],
 | 
						|
						  ReorderBufferChange *change)
 | 
						|
{
 | 
						|
	TestDecodingData *data = ctx->output_plugin_private;
 | 
						|
	TestDecodingTxnData *txndata = txn->output_plugin_private;
 | 
						|
 | 
						|
	if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
 | 
						|
	{
 | 
						|
		pg_output_stream_start(ctx, data, txn, false);
 | 
						|
	}
 | 
						|
	txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
 | 
						|
 | 
						|
	OutputPluginPrepareWrite(ctx, true);
 | 
						|
	if (data->include_xids)
 | 
						|
		appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
 | 
						|
	else
 | 
						|
		appendStringInfoString(ctx->out, "streaming truncate for transaction");
 | 
						|
	OutputPluginWrite(ctx, true);
 | 
						|
}
 |