mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-31 10:30:33 +03:00 
			
		
		
		
	Logical decoding should not publish anything about tables created as
part of a heap rewrite during DDL.  Those tables don't exist externally,
so consumers of logical decoding cannot do anything sensible with that
information.  In ab28feae2b, we worked
around this for built-in logical replication, but that was hack.
This is a more proper fix: We mark such transient heaps using the new
field pg_class.relwrite, linking to the original relation OID.  By
default, we ignore them in logical decoding before they get to the
output plugin.  Optionally, a plugin can register their interest in
getting such changes, if they handle DDL specially, in which case the
new field will help them get information about the actual table.
Reviewed-by: Craig Ringer <craig@2ndquadrant.com>
		
	
		
			
				
	
	
		
			494 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			494 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
| /*-------------------------------------------------------------------------
 | |
|  *
 | |
|  * test_decoding.c
 | |
|  *		  example logical decoding output plugin
 | |
|  *
 | |
|  * Copyright (c) 2012-2018, PostgreSQL Global Development Group
 | |
|  *
 | |
|  * IDENTIFICATION
 | |
|  *		  contrib/test_decoding/test_decoding.c
 | |
|  *
 | |
|  *-------------------------------------------------------------------------
 | |
|  */
 | |
| #include "postgres.h"
 | |
| 
 | |
| #include "catalog/pg_type.h"
 | |
| 
 | |
| #include "replication/logical.h"
 | |
| #include "replication/origin.h"
 | |
| 
 | |
| #include "utils/builtins.h"
 | |
| #include "utils/lsyscache.h"
 | |
| #include "utils/memutils.h"
 | |
| #include "utils/rel.h"
 | |
| 
 | |
| PG_MODULE_MAGIC;
 | |
| 
 | |
| /* These must be available to pg_dlsym() */
 | |
| extern void _PG_init(void);
 | |
| extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
 | |
| 
 | |
| typedef struct
 | |
| {
 | |
| 	MemoryContext context;
 | |
| 	bool		include_xids;
 | |
| 	bool		include_timestamp;
 | |
| 	bool		skip_empty_xacts;
 | |
| 	bool		xact_wrote_changes;
 | |
| 	bool		only_local;
 | |
| } TestDecodingData;
 | |
| 
 | |
| static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 | |
| 				  bool is_init);
 | |
| static void pg_decode_shutdown(LogicalDecodingContext *ctx);
 | |
| static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
 | |
| 					ReorderBufferTXN *txn);
 | |
| static void pg_output_begin(LogicalDecodingContext *ctx,
 | |
| 				TestDecodingData *data,
 | |
| 				ReorderBufferTXN *txn,
 | |
| 				bool last_write);
 | |
| static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
 | |
| 					 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
 | |
| static void pg_decode_change(LogicalDecodingContext *ctx,
 | |
| 				 ReorderBufferTXN *txn, Relation rel,
 | |
| 				 ReorderBufferChange *change);
 | |
| static bool pg_decode_filter(LogicalDecodingContext *ctx,
 | |
| 				 RepOriginId origin_id);
 | |
| static void pg_decode_message(LogicalDecodingContext *ctx,
 | |
| 				  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 | |
| 				  bool transactional, const char *prefix,
 | |
| 				  Size sz, const char *message);
 | |
| 
 | |
| void
 | |
| _PG_init(void)
 | |
| {
 | |
| 	/* other plugins can perform things here */
 | |
| }
 | |
| 
 | |
| /* specify output plugin callbacks */
 | |
| void
 | |
| _PG_output_plugin_init(OutputPluginCallbacks *cb)
 | |
| {
 | |
| 	AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
 | |
| 
 | |
| 	cb->startup_cb = pg_decode_startup;
 | |
| 	cb->begin_cb = pg_decode_begin_txn;
 | |
| 	cb->change_cb = pg_decode_change;
 | |
| 	cb->commit_cb = pg_decode_commit_txn;
 | |
| 	cb->filter_by_origin_cb = pg_decode_filter;
 | |
| 	cb->shutdown_cb = pg_decode_shutdown;
 | |
| 	cb->message_cb = pg_decode_message;
 | |
| }
 | |
| 
 | |
| 
 | |
| /* initialize this plugin */
 | |
| static void
 | |
| pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 | |
| 				  bool is_init)
 | |
| {
 | |
| 	ListCell   *option;
 | |
| 	TestDecodingData *data;
 | |
| 
 | |
| 	data = palloc0(sizeof(TestDecodingData));
 | |
| 	data->context = AllocSetContextCreate(ctx->context,
 | |
| 										  "text conversion context",
 | |
| 										  ALLOCSET_DEFAULT_SIZES);
 | |
| 	data->include_xids = true;
 | |
| 	data->include_timestamp = false;
 | |
| 	data->skip_empty_xacts = false;
 | |
| 	data->only_local = false;
 | |
| 
 | |
| 	ctx->output_plugin_private = data;
 | |
| 
 | |
| 	opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
 | |
| 	opt->receive_rewrites = false;
 | |
| 
 | |
| 	foreach(option, ctx->output_plugin_options)
 | |
| 	{
 | |
| 		DefElem    *elem = lfirst(option);
 | |
| 
 | |
| 		Assert(elem->arg == NULL || IsA(elem->arg, String));
 | |
| 
 | |
| 		if (strcmp(elem->defname, "include-xids") == 0)
 | |
| 		{
 | |
| 			/* if option does not provide a value, it means its value is true */
 | |
| 			if (elem->arg == NULL)
 | |
| 				data->include_xids = true;
 | |
| 			else if (!parse_bool(strVal(elem->arg), &data->include_xids))
 | |
| 				ereport(ERROR,
 | |
| 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 | |
| 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 | |
| 								strVal(elem->arg), elem->defname)));
 | |
| 		}
 | |
| 		else if (strcmp(elem->defname, "include-timestamp") == 0)
 | |
| 		{
 | |
| 			if (elem->arg == NULL)
 | |
| 				data->include_timestamp = true;
 | |
| 			else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
 | |
| 				ereport(ERROR,
 | |
| 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 | |
| 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 | |
| 								strVal(elem->arg), elem->defname)));
 | |
| 		}
 | |
| 		else if (strcmp(elem->defname, "force-binary") == 0)
 | |
| 		{
 | |
| 			bool		force_binary;
 | |
| 
 | |
| 			if (elem->arg == NULL)
 | |
| 				continue;
 | |
| 			else if (!parse_bool(strVal(elem->arg), &force_binary))
 | |
| 				ereport(ERROR,
 | |
| 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 | |
| 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 | |
| 								strVal(elem->arg), elem->defname)));
 | |
| 
 | |
| 			if (force_binary)
 | |
| 				opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
 | |
| 		}
 | |
| 		else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
 | |
| 		{
 | |
| 
 | |
| 			if (elem->arg == NULL)
 | |
| 				data->skip_empty_xacts = true;
 | |
| 			else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
 | |
| 				ereport(ERROR,
 | |
| 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 | |
| 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 | |
| 								strVal(elem->arg), elem->defname)));
 | |
| 		}
 | |
| 		else if (strcmp(elem->defname, "only-local") == 0)
 | |
| 		{
 | |
| 
 | |
| 			if (elem->arg == NULL)
 | |
| 				data->only_local = true;
 | |
| 			else if (!parse_bool(strVal(elem->arg), &data->only_local))
 | |
| 				ereport(ERROR,
 | |
| 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 | |
| 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 | |
| 								strVal(elem->arg), elem->defname)));
 | |
| 		}
 | |
| 		else if (strcmp(elem->defname, "include-rewrites") == 0)
 | |
| 		{
 | |
| 
 | |
| 			if (elem->arg == NULL)
 | |
| 				continue;
 | |
| 			else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
 | |
| 				ereport(ERROR,
 | |
| 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 | |
| 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 | |
| 								strVal(elem->arg), elem->defname)));
 | |
| 		}
 | |
| 		else
 | |
| 		{
 | |
| 			ereport(ERROR,
 | |
| 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 | |
| 					 errmsg("option \"%s\" = \"%s\" is unknown",
 | |
| 							elem->defname,
 | |
| 							elem->arg ? strVal(elem->arg) : "(null)")));
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| /* cleanup this plugin's resources */
 | |
| static void
 | |
| pg_decode_shutdown(LogicalDecodingContext *ctx)
 | |
| {
 | |
| 	TestDecodingData *data = ctx->output_plugin_private;
 | |
| 
 | |
| 	/* cleanup our own resources via memory context reset */
 | |
| 	MemoryContextDelete(data->context);
 | |
| }
 | |
| 
 | |
| /* BEGIN callback */
 | |
| static void
 | |
| pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 | |
| {
 | |
| 	TestDecodingData *data = ctx->output_plugin_private;
 | |
| 
 | |
| 	data->xact_wrote_changes = false;
 | |
| 	if (data->skip_empty_xacts)
 | |
| 		return;
 | |
| 
 | |
| 	pg_output_begin(ctx, data, txn, true);
 | |
| }
 | |
| 
 | |
| static void
 | |
| pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 | |
| {
 | |
| 	OutputPluginPrepareWrite(ctx, last_write);
 | |
| 	if (data->include_xids)
 | |
| 		appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
 | |
| 	else
 | |
| 		appendStringInfoString(ctx->out, "BEGIN");
 | |
| 	OutputPluginWrite(ctx, last_write);
 | |
| }
 | |
| 
 | |
| /* COMMIT callback */
 | |
| static void
 | |
| pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 | |
| 					 XLogRecPtr commit_lsn)
 | |
| {
 | |
| 	TestDecodingData *data = ctx->output_plugin_private;
 | |
| 
 | |
| 	if (data->skip_empty_xacts && !data->xact_wrote_changes)
 | |
| 		return;
 | |
| 
 | |
| 	OutputPluginPrepareWrite(ctx, true);
 | |
| 	if (data->include_xids)
 | |
| 		appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
 | |
| 	else
 | |
| 		appendStringInfoString(ctx->out, "COMMIT");
 | |
| 
 | |
| 	if (data->include_timestamp)
 | |
| 		appendStringInfo(ctx->out, " (at %s)",
 | |
| 						 timestamptz_to_str(txn->commit_time));
 | |
| 
 | |
| 	OutputPluginWrite(ctx, true);
 | |
| }
 | |
| 
 | |
| static bool
 | |
| pg_decode_filter(LogicalDecodingContext *ctx,
 | |
| 				 RepOriginId origin_id)
 | |
| {
 | |
| 	TestDecodingData *data = ctx->output_plugin_private;
 | |
| 
 | |
| 	if (data->only_local && origin_id != InvalidRepOriginId)
 | |
| 		return true;
 | |
| 	return false;
 | |
| }
 | |
| 
 | |
| /*
 | |
|  * Print literal `outputstr' already represented as string of type `typid'
 | |
|  * into stringbuf `s'.
 | |
|  *
 | |
|  * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
 | |
|  * if standard_conforming_strings were enabled.
 | |
|  */
 | |
| static void
 | |
| print_literal(StringInfo s, Oid typid, char *outputstr)
 | |
| {
 | |
| 	const char *valptr;
 | |
| 
 | |
| 	switch (typid)
 | |
| 	{
 | |
| 		case INT2OID:
 | |
| 		case INT4OID:
 | |
| 		case INT8OID:
 | |
| 		case OIDOID:
 | |
| 		case FLOAT4OID:
 | |
| 		case FLOAT8OID:
 | |
| 		case NUMERICOID:
 | |
| 			/* NB: We don't care about Inf, NaN et al. */
 | |
| 			appendStringInfoString(s, outputstr);
 | |
| 			break;
 | |
| 
 | |
| 		case BITOID:
 | |
| 		case VARBITOID:
 | |
| 			appendStringInfo(s, "B'%s'", outputstr);
 | |
| 			break;
 | |
| 
 | |
| 		case BOOLOID:
 | |
| 			if (strcmp(outputstr, "t") == 0)
 | |
| 				appendStringInfoString(s, "true");
 | |
| 			else
 | |
| 				appendStringInfoString(s, "false");
 | |
| 			break;
 | |
| 
 | |
| 		default:
 | |
| 			appendStringInfoChar(s, '\'');
 | |
| 			for (valptr = outputstr; *valptr; valptr++)
 | |
| 			{
 | |
| 				char		ch = *valptr;
 | |
| 
 | |
| 				if (SQL_STR_DOUBLE(ch, false))
 | |
| 					appendStringInfoChar(s, ch);
 | |
| 				appendStringInfoChar(s, ch);
 | |
| 			}
 | |
| 			appendStringInfoChar(s, '\'');
 | |
| 			break;
 | |
| 	}
 | |
| }
 | |
| 
 | |
| /* print the tuple 'tuple' into the StringInfo s */
 | |
| static void
 | |
| tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
 | |
| {
 | |
| 	int			natt;
 | |
| 	Oid			oid;
 | |
| 
 | |
| 	/* print oid of tuple, it's not included in the TupleDesc */
 | |
| 	if ((oid = HeapTupleHeaderGetOid(tuple->t_data)) != InvalidOid)
 | |
| 	{
 | |
| 		appendStringInfo(s, " oid[oid]:%u", oid);
 | |
| 	}
 | |
| 
 | |
| 	/* print all columns individually */
 | |
| 	for (natt = 0; natt < tupdesc->natts; natt++)
 | |
| 	{
 | |
| 		Form_pg_attribute attr; /* the attribute itself */
 | |
| 		Oid			typid;		/* type of current attribute */
 | |
| 		Oid			typoutput;	/* output function */
 | |
| 		bool		typisvarlena;
 | |
| 		Datum		origval;	/* possibly toasted Datum */
 | |
| 		bool		isnull;		/* column is null? */
 | |
| 
 | |
| 		attr = TupleDescAttr(tupdesc, natt);
 | |
| 
 | |
| 		/*
 | |
| 		 * don't print dropped columns, we can't be sure everything is
 | |
| 		 * available for them
 | |
| 		 */
 | |
| 		if (attr->attisdropped)
 | |
| 			continue;
 | |
| 
 | |
| 		/*
 | |
| 		 * Don't print system columns, oid will already have been printed if
 | |
| 		 * present.
 | |
| 		 */
 | |
| 		if (attr->attnum < 0)
 | |
| 			continue;
 | |
| 
 | |
| 		typid = attr->atttypid;
 | |
| 
 | |
| 		/* get Datum from tuple */
 | |
| 		origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
 | |
| 
 | |
| 		if (isnull && skip_nulls)
 | |
| 			continue;
 | |
| 
 | |
| 		/* print attribute name */
 | |
| 		appendStringInfoChar(s, ' ');
 | |
| 		appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
 | |
| 
 | |
| 		/* print attribute type */
 | |
| 		appendStringInfoChar(s, '[');
 | |
| 		appendStringInfoString(s, format_type_be(typid));
 | |
| 		appendStringInfoChar(s, ']');
 | |
| 
 | |
| 		/* query output function */
 | |
| 		getTypeOutputInfo(typid,
 | |
| 						  &typoutput, &typisvarlena);
 | |
| 
 | |
| 		/* print separator */
 | |
| 		appendStringInfoChar(s, ':');
 | |
| 
 | |
| 		/* print data */
 | |
| 		if (isnull)
 | |
| 			appendStringInfoString(s, "null");
 | |
| 		else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
 | |
| 			appendStringInfoString(s, "unchanged-toast-datum");
 | |
| 		else if (!typisvarlena)
 | |
| 			print_literal(s, typid,
 | |
| 						  OidOutputFunctionCall(typoutput, origval));
 | |
| 		else
 | |
| 		{
 | |
| 			Datum		val;	/* definitely detoasted Datum */
 | |
| 
 | |
| 			val = PointerGetDatum(PG_DETOAST_DATUM(origval));
 | |
| 			print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| /*
 | |
|  * callback for individual changed tuples
 | |
|  */
 | |
| static void
 | |
| pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 | |
| 				 Relation relation, ReorderBufferChange *change)
 | |
| {
 | |
| 	TestDecodingData *data;
 | |
| 	Form_pg_class class_form;
 | |
| 	TupleDesc	tupdesc;
 | |
| 	MemoryContext old;
 | |
| 
 | |
| 	data = ctx->output_plugin_private;
 | |
| 
 | |
| 	/* output BEGIN if we haven't yet */
 | |
| 	if (data->skip_empty_xacts && !data->xact_wrote_changes)
 | |
| 	{
 | |
| 		pg_output_begin(ctx, data, txn, false);
 | |
| 	}
 | |
| 	data->xact_wrote_changes = true;
 | |
| 
 | |
| 	class_form = RelationGetForm(relation);
 | |
| 	tupdesc = RelationGetDescr(relation);
 | |
| 
 | |
| 	/* Avoid leaking memory by using and resetting our own context */
 | |
| 	old = MemoryContextSwitchTo(data->context);
 | |
| 
 | |
| 	OutputPluginPrepareWrite(ctx, true);
 | |
| 
 | |
| 	appendStringInfoString(ctx->out, "table ");
 | |
| 	appendStringInfoString(ctx->out,
 | |
| 						   quote_qualified_identifier(
 | |
| 													  get_namespace_name(
 | |
| 																		 get_rel_namespace(RelationGetRelid(relation))),
 | |
| 													  class_form->relrewrite ?
 | |
| 													  get_rel_name(class_form->relrewrite) :
 | |
| 													  NameStr(class_form->relname)));
 | |
| 	appendStringInfoChar(ctx->out, ':');
 | |
| 
 | |
| 	switch (change->action)
 | |
| 	{
 | |
| 		case REORDER_BUFFER_CHANGE_INSERT:
 | |
| 			appendStringInfoString(ctx->out, " INSERT:");
 | |
| 			if (change->data.tp.newtuple == NULL)
 | |
| 				appendStringInfoString(ctx->out, " (no-tuple-data)");
 | |
| 			else
 | |
| 				tuple_to_stringinfo(ctx->out, tupdesc,
 | |
| 									&change->data.tp.newtuple->tuple,
 | |
| 									false);
 | |
| 			break;
 | |
| 		case REORDER_BUFFER_CHANGE_UPDATE:
 | |
| 			appendStringInfoString(ctx->out, " UPDATE:");
 | |
| 			if (change->data.tp.oldtuple != NULL)
 | |
| 			{
 | |
| 				appendStringInfoString(ctx->out, " old-key:");
 | |
| 				tuple_to_stringinfo(ctx->out, tupdesc,
 | |
| 									&change->data.tp.oldtuple->tuple,
 | |
| 									true);
 | |
| 				appendStringInfoString(ctx->out, " new-tuple:");
 | |
| 			}
 | |
| 
 | |
| 			if (change->data.tp.newtuple == NULL)
 | |
| 				appendStringInfoString(ctx->out, " (no-tuple-data)");
 | |
| 			else
 | |
| 				tuple_to_stringinfo(ctx->out, tupdesc,
 | |
| 									&change->data.tp.newtuple->tuple,
 | |
| 									false);
 | |
| 			break;
 | |
| 		case REORDER_BUFFER_CHANGE_DELETE:
 | |
| 			appendStringInfoString(ctx->out, " DELETE:");
 | |
| 
 | |
| 			/* if there was no PK, we only know that a delete happened */
 | |
| 			if (change->data.tp.oldtuple == NULL)
 | |
| 				appendStringInfoString(ctx->out, " (no-tuple-data)");
 | |
| 			/* In DELETE, only the replica identity is present; display that */
 | |
| 			else
 | |
| 				tuple_to_stringinfo(ctx->out, tupdesc,
 | |
| 									&change->data.tp.oldtuple->tuple,
 | |
| 									true);
 | |
| 			break;
 | |
| 		default:
 | |
| 			Assert(false);
 | |
| 	}
 | |
| 
 | |
| 	MemoryContextSwitchTo(old);
 | |
| 	MemoryContextReset(data->context);
 | |
| 
 | |
| 	OutputPluginWrite(ctx, true);
 | |
| }
 | |
| 
 | |
| static void
 | |
| pg_decode_message(LogicalDecodingContext *ctx,
 | |
| 				  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
 | |
| 				  const char *prefix, Size sz, const char *message)
 | |
| {
 | |
| 	OutputPluginPrepareWrite(ctx, true);
 | |
| 	appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
 | |
| 					 transactional, prefix, sz);
 | |
| 	appendBinaryStringInfo(ctx->out, message, sz);
 | |
| 	OutputPluginWrite(ctx, true);
 | |
| }
 |