mirror of
https://github.com/postgres/postgres.git
synced 2025-11-10 17:42:29 +03:00
Generic Messages for Logical Decoding
API and mechanism to allow generic messages to be inserted into WAL that are intended to be read by logical decoding plugins. This commit adds an optional new callback to the logical decoding API. Messages are either text or bytea. Messages can be transactional, or not, and are identified by a prefix to allow multiple concurrent decoding plugins. (Not to be confused with Generic WAL records, which are intended to allow crash recovery of extensible objects.) Author: Petr Jelinek and Andres Freund Reviewers: Artur Zakirov, Tomas Vondra, Simon Riggs Discussion: 5685F999.6010202@2ndquadrant.com
This commit is contained in:
@@ -14,7 +14,7 @@ include $(top_builddir)/src/Makefile.global
|
||||
|
||||
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
|
||||
|
||||
OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o origin.o \
|
||||
OBJS = decode.o logical.o logicalfuncs.o message.o origin.o reorderbuffer.o \
|
||||
snapbuild.o
|
||||
|
||||
include $(top_srcdir)/src/backend/common.mk
|
||||
|
||||
@@ -39,6 +39,7 @@
|
||||
|
||||
#include "replication/decode.h"
|
||||
#include "replication/logical.h"
|
||||
#include "replication/message.h"
|
||||
#include "replication/reorderbuffer.h"
|
||||
#include "replication/origin.h"
|
||||
#include "replication/snapbuild.h"
|
||||
@@ -58,6 +59,7 @@ static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||
static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||
static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||
static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||
static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||
|
||||
/* individual record(group)'s handlers */
|
||||
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||
@@ -123,6 +125,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
|
||||
DecodeHeapOp(ctx, &buf);
|
||||
break;
|
||||
|
||||
case RM_LOGICALMSG_ID:
|
||||
DecodeLogicalMsgOp(ctx, &buf);
|
||||
break;
|
||||
|
||||
/*
|
||||
* Rmgrs irrelevant for logical decoding; they describe stuff not
|
||||
* represented in logical decoding. Add new rmgrs in rmgrlist.h's
|
||||
@@ -458,6 +464,46 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
|
||||
*/
|
||||
static void
|
||||
DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||
{
|
||||
SnapBuild *builder = ctx->snapshot_builder;
|
||||
XLogReaderState *r = buf->record;
|
||||
TransactionId xid = XLogRecGetXid(r);
|
||||
uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
|
||||
Snapshot snapshot;
|
||||
xl_logical_message *message;
|
||||
|
||||
if (info != XLOG_LOGICAL_MESSAGE)
|
||||
elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
|
||||
|
||||
ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
|
||||
|
||||
/* No point in doing anything yet. */
|
||||
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
|
||||
return;
|
||||
|
||||
message = (xl_logical_message *) XLogRecGetData(r);
|
||||
|
||||
if (message->transactional &&
|
||||
!SnapBuildProcessChange(builder, xid, buf->origptr))
|
||||
return;
|
||||
else if (!message->transactional &&
|
||||
(SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
|
||||
SnapBuildXactNeedsSkip(builder, buf->origptr)))
|
||||
return;
|
||||
|
||||
snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
|
||||
ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
|
||||
message->transactional,
|
||||
message->message, /* first part of message is prefix */
|
||||
message->message_size,
|
||||
message->message + message->prefix_size);
|
||||
}
|
||||
|
||||
static inline bool
|
||||
FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
|
||||
{
|
||||
|
||||
@@ -62,6 +62,9 @@ static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||
XLogRecPtr commit_lsn);
|
||||
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||
Relation relation, ReorderBufferChange *change);
|
||||
static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||
XLogRecPtr message_lsn, bool transactional,
|
||||
const char *prefix, Size message_size, const char *message);
|
||||
|
||||
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
|
||||
|
||||
@@ -178,6 +181,7 @@ StartupDecodingContext(List *output_plugin_options,
|
||||
ctx->reorder->begin = begin_cb_wrapper;
|
||||
ctx->reorder->apply_change = change_cb_wrapper;
|
||||
ctx->reorder->commit = commit_cb_wrapper;
|
||||
ctx->reorder->message = message_cb_wrapper;
|
||||
|
||||
ctx->out = makeStringInfo();
|
||||
ctx->prepare_write = prepare_write;
|
||||
@@ -702,6 +706,40 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void
|
||||
message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||
XLogRecPtr message_lsn, bool transactional,
|
||||
const char *prefix, Size message_size, const char *message)
|
||||
{
|
||||
LogicalDecodingContext *ctx = cache->private_data;
|
||||
LogicalErrorCallbackState state;
|
||||
ErrorContextCallback errcallback;
|
||||
|
||||
if (ctx->callbacks.message_cb == NULL)
|
||||
return;
|
||||
|
||||
/* Push callback + info on the error context stack */
|
||||
state.ctx = ctx;
|
||||
state.callback_name = "message";
|
||||
state.report_location = message_lsn;
|
||||
errcallback.callback = output_plugin_error_callback;
|
||||
errcallback.arg = (void *) &state;
|
||||
errcallback.previous = error_context_stack;
|
||||
error_context_stack = &errcallback;
|
||||
|
||||
/* set output state */
|
||||
ctx->accept_writes = true;
|
||||
ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
|
||||
ctx->write_location = message_lsn;
|
||||
|
||||
/* do the actual work: call callback */
|
||||
ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
|
||||
message_size, message);
|
||||
|
||||
/* Pop the error context stack */
|
||||
error_context_stack = errcallback.previous;
|
||||
}
|
||||
|
||||
/*
|
||||
* Set the required catalog xmin horizon for historic snapshots in the current
|
||||
* replication slot.
|
||||
|
||||
@@ -24,6 +24,8 @@
|
||||
#include "access/xlog_internal.h"
|
||||
#include "access/xlogutils.h"
|
||||
|
||||
#include "access/xact.h"
|
||||
|
||||
#include "catalog/pg_type.h"
|
||||
|
||||
#include "nodes/makefuncs.h"
|
||||
@@ -41,6 +43,7 @@
|
||||
#include "replication/decode.h"
|
||||
#include "replication/logical.h"
|
||||
#include "replication/logicalfuncs.h"
|
||||
#include "replication/message.h"
|
||||
|
||||
#include "storage/fd.h"
|
||||
|
||||
@@ -380,3 +383,27 @@ pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
|
||||
{
|
||||
return pg_logical_slot_get_changes_guts(fcinfo, false, true);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SQL function for writing logical decding message into WAL.
|
||||
*/
|
||||
Datum
|
||||
pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
|
||||
{
|
||||
bool transactional = PG_GETARG_BOOL(0);
|
||||
char *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
|
||||
bytea *data = PG_GETARG_BYTEA_PP(2);
|
||||
XLogRecPtr lsn;
|
||||
|
||||
lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
|
||||
transactional);
|
||||
PG_RETURN_LSN(lsn);
|
||||
}
|
||||
|
||||
Datum
|
||||
pg_logical_emit_message_text(PG_FUNCTION_ARGS)
|
||||
{
|
||||
/* bytea and text are compatible */
|
||||
return pg_logical_emit_message_bytea(fcinfo);
|
||||
}
|
||||
|
||||
87
src/backend/replication/logical/message.c
Normal file
87
src/backend/replication/logical/message.c
Normal file
@@ -0,0 +1,87 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* message.c
|
||||
* Generic logical messages.
|
||||
*
|
||||
* Copyright (c) 2013-2016, PostgreSQL Global Development Group
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/backend/replication/logical/message.c
|
||||
*
|
||||
* NOTES
|
||||
*
|
||||
* Generic logical messages allow XLOG logging of arbitrary binary blobs that
|
||||
* get passed to the logical decoding plugin. In normal XLOG processing they
|
||||
* are same as NOOP.
|
||||
*
|
||||
* These messages can be either transactional or non-transactional.
|
||||
* Transactional messages are part of current transaction and will be sent to
|
||||
* decoding plugin using in a same way as DML operations.
|
||||
* Non-transactional messages are sent to the plugin at the time when the
|
||||
* logical decoding reads them from XLOG. This also means that transactional
|
||||
* messages won't be delivered if the transaction was rolled back but the
|
||||
* non-transactional one will be delivered always.
|
||||
*
|
||||
* Every message carries prefix to avoid conflicts between different decoding
|
||||
* plugins. The plugin authors must take extra care to use unique prefix,
|
||||
* good options seems to be for example to use the name of the extension.
|
||||
*
|
||||
* ---------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/xact.h"
|
||||
|
||||
#include "catalog/indexing.h"
|
||||
|
||||
#include "nodes/execnodes.h"
|
||||
|
||||
#include "replication/message.h"
|
||||
#include "replication/logical.h"
|
||||
|
||||
#include "utils/memutils.h"
|
||||
|
||||
/*
|
||||
* Write logical decoding message into XLog.
|
||||
*/
|
||||
XLogRecPtr
|
||||
LogLogicalMessage(const char *prefix, const char *message, size_t size,
|
||||
bool transactional)
|
||||
{
|
||||
xl_logical_message xlrec;
|
||||
|
||||
/*
|
||||
* Force xid to be allocated if we're emitting a transactional message.
|
||||
*/
|
||||
if (transactional)
|
||||
{
|
||||
Assert(IsTransactionState());
|
||||
GetCurrentTransactionId();
|
||||
}
|
||||
|
||||
xlrec.transactional = transactional;
|
||||
xlrec.prefix_size = strlen(prefix) + 1;
|
||||
xlrec.message_size = size;
|
||||
|
||||
XLogBeginInsert();
|
||||
XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
|
||||
XLogRegisterData((char *) prefix, xlrec.prefix_size);
|
||||
XLogRegisterData((char *) message, size);
|
||||
|
||||
return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
|
||||
}
|
||||
|
||||
/*
|
||||
* Redo is basically just noop for logical decoding messages.
|
||||
*/
|
||||
void
|
||||
logicalmsg_redo(XLogReaderState *record)
|
||||
{
|
||||
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
|
||||
|
||||
if (info != XLOG_LOGICAL_MESSAGE)
|
||||
elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
|
||||
|
||||
/* This is only interesting for logical decoding, see decode.c. */
|
||||
}
|
||||
@@ -414,6 +414,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
|
||||
change->data.tp.oldtuple = NULL;
|
||||
}
|
||||
break;
|
||||
case REORDER_BUFFER_CHANGE_MESSAGE:
|
||||
if (change->data.msg.prefix != NULL)
|
||||
pfree(change->data.msg.prefix);
|
||||
change->data.msg.prefix = NULL;
|
||||
if (change->data.msg.message != NULL)
|
||||
pfree(change->data.msg.message);
|
||||
change->data.msg.message = NULL;
|
||||
break;
|
||||
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
|
||||
if (change->data.snapshot)
|
||||
{
|
||||
@@ -627,6 +635,61 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
|
||||
ReorderBufferCheckSerializeTXN(rb, txn);
|
||||
}
|
||||
|
||||
/*
|
||||
* Queue message into a transaction so it can be processed upon commit.
|
||||
*/
|
||||
void
|
||||
ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
|
||||
Snapshot snapshot, XLogRecPtr lsn,
|
||||
bool transactional, const char *prefix,
|
||||
Size message_size, const char *message)
|
||||
{
|
||||
if (transactional)
|
||||
{
|
||||
MemoryContext oldcontext;
|
||||
ReorderBufferChange *change;
|
||||
|
||||
Assert(xid != InvalidTransactionId);
|
||||
|
||||
oldcontext = MemoryContextSwitchTo(rb->context);
|
||||
|
||||
change = ReorderBufferGetChange(rb);
|
||||
change->action = REORDER_BUFFER_CHANGE_MESSAGE;
|
||||
change->data.msg.prefix = pstrdup(prefix);
|
||||
change->data.msg.message_size = message_size;
|
||||
change->data.msg.message = palloc(message_size);
|
||||
memcpy(change->data.msg.message, message, message_size);
|
||||
|
||||
ReorderBufferQueueChange(rb, xid, lsn, change);
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
}
|
||||
else
|
||||
{
|
||||
ReorderBufferTXN *txn = NULL;
|
||||
volatile Snapshot snapshot_now = snapshot;
|
||||
|
||||
if (xid != InvalidTransactionId)
|
||||
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
|
||||
|
||||
/* setup snapshot to allow catalog access */
|
||||
SetupHistoricSnapshot(snapshot_now, NULL);
|
||||
PG_TRY();
|
||||
{
|
||||
rb->message(rb, txn, lsn, false, prefix, message_size, message);
|
||||
|
||||
TeardownHistoricSnapshot(false);
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
TeardownHistoricSnapshot(true);
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
AssertTXNLsnOrder(ReorderBuffer *rb)
|
||||
{
|
||||
@@ -1493,6 +1556,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
|
||||
specinsert = change;
|
||||
break;
|
||||
|
||||
case REORDER_BUFFER_CHANGE_MESSAGE:
|
||||
rb->message(rb, txn, change->lsn, true,
|
||||
change->data.msg.prefix,
|
||||
change->data.msg.message_size,
|
||||
change->data.msg.message);
|
||||
break;
|
||||
|
||||
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
|
||||
/* get rid of the old */
|
||||
TeardownHistoricSnapshot(false);
|
||||
@@ -2157,6 +2227,33 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
memcpy(data, newtup->tuple.t_data, newlen);
|
||||
data += newlen;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case REORDER_BUFFER_CHANGE_MESSAGE:
|
||||
{
|
||||
char *data;
|
||||
Size prefix_size = strlen(change->data.msg.prefix) + 1;
|
||||
|
||||
sz += prefix_size + change->data.msg.message_size +
|
||||
sizeof(Size) + sizeof(Size);
|
||||
ReorderBufferSerializeReserve(rb, sz);
|
||||
|
||||
data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
|
||||
|
||||
/* write the prefix including the size */
|
||||
memcpy(data, &prefix_size, sizeof(Size));
|
||||
data += sizeof(Size);
|
||||
memcpy(data, change->data.msg.prefix,
|
||||
prefix_size);
|
||||
data += prefix_size;
|
||||
|
||||
/* write the message including the size */
|
||||
memcpy(data, &change->data.msg.message_size, sizeof(Size));
|
||||
data += sizeof(Size);
|
||||
memcpy(data, change->data.msg.message,
|
||||
change->data.msg.message_size);
|
||||
data += change->data.msg.message_size;
|
||||
|
||||
break;
|
||||
}
|
||||
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
|
||||
@@ -2415,6 +2512,30 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
}
|
||||
|
||||
break;
|
||||
case REORDER_BUFFER_CHANGE_MESSAGE:
|
||||
{
|
||||
Size prefix_size;
|
||||
|
||||
/* read prefix */
|
||||
memcpy(&prefix_size, data, sizeof(Size));
|
||||
data += sizeof(Size);
|
||||
change->data.msg.prefix = MemoryContextAlloc(rb->context,
|
||||
prefix_size);
|
||||
memcpy(change->data.msg.prefix, data, prefix_size);
|
||||
Assert(change->data.msg.prefix[prefix_size-1] == '\0');
|
||||
data += prefix_size;
|
||||
|
||||
/* read the messsage */
|
||||
memcpy(&change->data.msg.message_size, data, sizeof(Size));
|
||||
data += sizeof(Size);
|
||||
change->data.msg.message = MemoryContextAlloc(rb->context,
|
||||
change->data.msg.message_size);
|
||||
memcpy(change->data.msg.message, data,
|
||||
change->data.msg.message_size);
|
||||
data += change->data.msg.message_size;
|
||||
|
||||
break;
|
||||
}
|
||||
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
|
||||
{
|
||||
Snapshot oldsnap;
|
||||
|
||||
@@ -604,6 +604,25 @@ SnapBuildExportSnapshot(SnapBuild *builder)
|
||||
return snapname;
|
||||
}
|
||||
|
||||
/*
|
||||
* Ensure there is a snapshot and if not build one for current transaction.
|
||||
*/
|
||||
Snapshot
|
||||
SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
|
||||
{
|
||||
Assert(builder->state == SNAPBUILD_CONSISTENT);
|
||||
|
||||
/* only build a new snapshot if we don't have a prebuilt one */
|
||||
if (builder->snapshot == NULL)
|
||||
{
|
||||
builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
|
||||
/* inrease refcount for the snapshot builder */
|
||||
SnapBuildSnapIncRefcount(builder->snapshot);
|
||||
}
|
||||
|
||||
return builder->snapshot;
|
||||
}
|
||||
|
||||
/*
|
||||
* Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
|
||||
* any. Aborts the previously started transaction and resets the resource
|
||||
|
||||
Reference in New Issue
Block a user