mirror of
https://github.com/postgres/postgres.git
synced 2025-05-28 05:21:27 +03:00
94 lines
2.6 KiB
C
94 lines
2.6 KiB
C
/*-------------------------------------------------------------------------
|
|
*
|
|
* message.c
|
|
* Generic logical messages.
|
|
*
|
|
* Copyright (c) 2013-2018, PostgreSQL Global Development Group
|
|
*
|
|
* IDENTIFICATION
|
|
* src/backend/replication/logical/message.c
|
|
*
|
|
* NOTES
|
|
*
|
|
* Generic logical messages allow XLOG logging of arbitrary binary blobs that
|
|
* get passed to the logical decoding plugin. In normal XLOG processing they
|
|
* are the same as NOOP.
|
|
*
|
|
* These messages can be either transactional or non-transactional.
|
|
* Transactional messages are part of current transaction and will be sent to
|
|
* decoding plugin in the same way as DML operations.
|
|
* Non-transactional messages are sent to the plugin at the time when the
|
|
* logical decoding reads them from XLOG. This also means that transactional
|
|
* messages won't be delivered if the transaction was rolled back but the
|
|
* non-transactional ones will always be delivered.
|
|
*
|
|
* Every message carries a prefix to avoid conflicts between different decoding
|
|
* plugins. Plugin authors must take extra care to use a unique prefix;
|
|
* a good option is, for example, to use the name of the extension.
|
|
*
|
|
* ---------------------------------------------------------------------------
|
|
*/
|
|
|
|
#include "postgres.h"
|
|
|
|
#include "miscadmin.h"
|
|
|
|
#include "access/xact.h"
|
|
|
|
#include "catalog/indexing.h"
|
|
|
|
#include "nodes/execnodes.h"
|
|
|
|
#include "replication/message.h"
|
|
#include "replication/logical.h"
|
|
|
|
#include "utils/memutils.h"
|
|
|
|
/*
|
|
* Write logical decoding message into XLog.
|
|
*/
|
|
XLogRecPtr
|
|
LogLogicalMessage(const char *prefix, const char *message, size_t size,
|
|
bool transactional)
|
|
{
|
|
xl_logical_message xlrec;
|
|
|
|
/*
|
|
* Force xid to be allocated if we're emitting a transactional message.
|
|
*/
|
|
if (transactional)
|
|
{
|
|
Assert(IsTransactionState());
|
|
GetCurrentTransactionId();
|
|
}
|
|
|
|
xlrec.dbId = MyDatabaseId;
|
|
xlrec.transactional = transactional;
|
|
xlrec.prefix_size = strlen(prefix) + 1;
|
|
xlrec.message_size = size;
|
|
|
|
XLogBeginInsert();
|
|
XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
|
|
XLogRegisterData((char *) prefix, xlrec.prefix_size);
|
|
XLogRegisterData((char *) message, size);
|
|
|
|
/* allow origin filtering */
|
|
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
|
|
|
|
return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
|
|
}
|
|
|
|
/*
|
|
* Redo is basically just noop for logical decoding messages.
|
|
*/
|
|
void
|
|
logicalmsg_redo(XLogReaderState *record)
|
|
{
|
|
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
|
|
|
|
if (info != XLOG_LOGICAL_MESSAGE)
|
|
elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
|
|
|
|
/* This is only interesting for logical decoding, see decode.c. */
|
|
}
|