1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-09 06:21:09 +03:00

Add decoding of sequences to built-in replication

This commit adds support for decoding of sequences to the built-in
replication (the infrastructure was added by commit 0da92dc530).

The syntax and behavior mostly mimics handling of tables, i.e. a
publication may be defined as FOR ALL SEQUENCES (replicating all
sequences in a database), FOR ALL SEQUENCES IN SCHEMA (replicating
all sequences in a particular schema) or individual sequences.

To publish sequence modifications, the publication has to include
'sequence' action. The protocol is extended with a new message,
describing sequence increments.

A new system view pg_publication_sequences lists all the sequences
added to a publication, both directly and indirectly. Various psql
commands (\d and \dRp) are improved to also display publications
including a given sequence, or sequences included in a publication.

Author: Tomas Vondra, Cary Huang
Reviewed-by: Peter Eisentraut, Amit Kapila, Hannu Krosing, Andres
             Freund, Petr Jelinek
Discussion: https://postgr.es/m/d045f3c2-6cfb-06d3-5540-e63c320df8bc@enterprisedb.com
Discussion: https://postgr.es/m/1710ed7e13b.cd7177461430746.3372264562543607781@highgo.ca
This commit is contained in:
Tomas Vondra
2022-03-24 18:20:21 +01:00
parent 0adb3dc68b
commit 75b1521dae
40 changed files with 3236 additions and 469 deletions

View File

@@ -648,6 +648,56 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
pq_sendbytes(out, message, sz);
}
/*
* Write SEQUENCE to stream
*/
void
logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
XLogRecPtr lsn, bool transactional,
int64 last_value, int64 log_cnt, bool is_called)
{
uint8 flags = 0;
char *relname;
pq_sendbyte(out, LOGICAL_REP_MSG_SEQUENCE);
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
pq_sendint32(out, xid);
pq_sendint8(out, flags);
pq_sendint64(out, lsn);
logicalrep_write_namespace(out, RelationGetNamespace(rel));
relname = RelationGetRelationName(rel);
pq_sendstring(out, relname);
pq_sendint8(out, transactional);
pq_sendint64(out, last_value);
pq_sendint64(out, log_cnt);
pq_sendint8(out, is_called);
}
/*
* Read SEQUENCE from the stream.
*/
void
logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata)
{
/* XXX skipping flags and lsn */
pq_getmsgint(in, 1);
pq_getmsgint64(in);
/* Read relation name from stream */
seqdata->nspname = pstrdup(logicalrep_read_namespace(in));
seqdata->seqname = pstrdup(pq_getmsgstring(in));
seqdata->transactional = pq_getmsgint(in, 1);
seqdata->last_value = pq_getmsgint64(in);
seqdata->log_cnt = pq_getmsgint64(in);
seqdata->is_called = pq_getmsgint(in, 1);
}
/*
* Write relation description to the output stream.
*/
@@ -1203,6 +1253,8 @@ logicalrep_message_type(LogicalRepMsgType action)
return "STREAM ABORT";
case LOGICAL_REP_MSG_STREAM_PREPARE:
return "STREAM PREPARE";
case LOGICAL_REP_MSG_SEQUENCE:
return "SEQUENCE";
}
elog(ERROR, "invalid logical replication message type \"%c\"", action);

View File

@@ -100,6 +100,7 @@
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "commands/sequence.h"
#include "miscadmin.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
@@ -999,6 +1000,95 @@ copy_table(Relation rel)
logicalrep_rel_close(relmapentry, NoLock);
}
/*
* Fetch sequence data (current state) from the remote node.
*/
static void
fetch_sequence_data(char *nspname, char *relname,
int64 *last_value, int64 *log_cnt, bool *is_called)
{
WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
Oid tableRow[3] = {INT8OID, INT8OID, BOOLOID};
initStringInfo(&cmd);
appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called\n"
" FROM %s", quote_qualified_identifier(nspname, relname));
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 3, tableRow);
pfree(cmd.data);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
(errmsg("could not receive list of replicated tables from the publisher: %s",
res->err)));
/* Process the sequence. */
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
{
bool isnull;
*last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull));
Assert(!isnull);
*log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull));
Assert(!isnull);
*is_called = DatumGetBool(slot_getattr(slot, 3, &isnull));
Assert(!isnull);
ExecClearTuple(slot);
}
ExecDropSingleTupleTableSlot(slot);
walrcv_clear_result(res);
}
/*
* Copy existing data of a sequence from publisher.
*
* Caller is responsible for locking the local relation.
*/
static void
copy_sequence(Relation rel)
{
LogicalRepRelMapEntry *relmapentry;
LogicalRepRelation lrel;
List *qual = NIL;
StringInfoData cmd;
int64 last_value = 0,
log_cnt = 0;
bool is_called = 0;
/* Get the publisher relation info. */
fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
RelationGetRelationName(rel), &lrel, &qual);
/* sequences don't have row filters */
Assert(!qual);
/* Put the relation into relmap. */
logicalrep_relmap_update(&lrel);
/* Map the publisher relation to local one. */
relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
Assert(rel == relmapentry->localrel);
/* Start copy on the publisher. */
initStringInfo(&cmd);
Assert(lrel.relkind == RELKIND_SEQUENCE);
fetch_sequence_data(lrel.nspname, lrel.relname, &last_value, &log_cnt, &is_called);
/* tablesync sets the sequences in non-transactional way */
SetSequence(RelationGetRelid(rel), false, last_value, log_cnt, is_called);
logicalrep_rel_close(relmapentry, NoLock);
}
/*
* Determine the tablesync slot name.
*
@@ -1260,10 +1350,21 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
originname)));
}
/* Now do the initial data copy */
PushActiveSnapshot(GetTransactionSnapshot());
copy_table(rel);
PopActiveSnapshot();
/* Do the right action depending on the relation kind. */
if (get_rel_relkind(RelationGetRelid(rel)) == RELKIND_SEQUENCE)
{
/* Now do the initial sequence copy */
PushActiveSnapshot(GetTransactionSnapshot());
copy_sequence(rel);
PopActiveSnapshot();
}
else
{
/* Now do the initial data copy */
PushActiveSnapshot(GetTransactionSnapshot());
copy_table(rel);
PopActiveSnapshot();
}
res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
if (res->status != WALRCV_OK_COMMAND)

View File

@@ -143,6 +143,7 @@
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_tablespace.h"
#include "commands/sequence.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "commands/trigger.h"
@@ -1143,6 +1144,57 @@ apply_handle_origin(StringInfo s)
errmsg_internal("ORIGIN message sent out of order")));
}
/*
* Handle SEQUENCE message.
*/
static void
apply_handle_sequence(StringInfo s)
{
LogicalRepSequence seq;
Oid relid;
if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s))
return;
logicalrep_read_sequence(s, &seq);
/*
* Non-transactional sequence updates should not be part of a remote
* transaction. There should not be any running transaction.
*/
Assert((!seq.transactional) || in_remote_transaction);
Assert(!(!seq.transactional && in_remote_transaction));
Assert(!(!seq.transactional && IsTransactionState()));
/*
* Make sure we're in a transaction (needed by SetSequence). For
* non-transactional updates we're guaranteed to start a new one,
* and we'll commit it at the end.
*/
if (!IsTransactionState())
{
StartTransactionCommand();
maybe_reread_subscription();
}
relid = RangeVarGetRelid(makeRangeVar(seq.nspname,
seq.seqname, -1),
RowExclusiveLock, false);
/* lock the sequence in AccessExclusiveLock, as expected by SetSequence */
LockRelationOid(relid, AccessExclusiveLock);
/* apply the sequence change */
SetSequence(relid, seq.transactional, seq.last_value, seq.log_cnt, seq.is_called);
/*
* Commit the per-stream transaction (we only do this when not in
* remote transaction, i.e. for non-transactional sequence updates.
*/
if (!in_remote_transaction)
CommitTransactionCommand();
}
/*
* Handle STREAM START message.
*/
@@ -2511,6 +2563,10 @@ apply_dispatch(StringInfo s)
*/
break;
case LOGICAL_REP_MSG_SEQUENCE:
apply_handle_sequence(s);
return;
case LOGICAL_REP_MSG_STREAM_START:
apply_handle_stream_start(s);
break;

View File

@@ -15,6 +15,7 @@
#include "access/tupconvert.h"
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_namespace.h"
#include "catalog/pg_publication_rel.h"
#include "commands/defrem.h"
#include "executor/executor.h"
@@ -53,6 +54,10 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
static void pgoutput_sequence(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
Relation relation, bool transactional,
int64 last_value, int64 log_cnt, bool is_called);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
@@ -208,6 +213,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->change_cb = pgoutput_change;
cb->truncate_cb = pgoutput_truncate;
cb->message_cb = pgoutput_message;
cb->sequence_cb = pgoutput_sequence;
cb->commit_cb = pgoutput_commit_txn;
cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
@@ -224,6 +230,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->stream_commit_cb = pgoutput_stream_commit;
cb->stream_change_cb = pgoutput_change;
cb->stream_message_cb = pgoutput_message;
cb->stream_sequence_cb = pgoutput_sequence;
cb->stream_truncate_cb = pgoutput_truncate;
/* transaction streaming - two-phase commit */
cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
@@ -237,6 +244,7 @@ parse_output_parameters(List *options, PGOutputData *data)
bool publication_names_given = false;
bool binary_option_given = false;
bool messages_option_given = false;
bool sequences_option_given = false;
bool streaming_given = false;
bool two_phase_option_given = false;
@@ -244,6 +252,7 @@ parse_output_parameters(List *options, PGOutputData *data)
data->streaming = false;
data->messages = false;
data->two_phase = false;
data->sequences = true;
foreach(lc, options)
{
@@ -312,6 +321,16 @@ parse_output_parameters(List *options, PGOutputData *data)
data->messages = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "sequences") == 0)
{
if (sequences_option_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
sequences_option_given = true;
data->sequences = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "streaming") == 0)
{
if (streaming_given)
@@ -1440,6 +1459,51 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginWrite(ctx, true);
}
static void
pgoutput_sequence(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
Relation relation, bool transactional,
int64 last_value, int64 log_cnt, bool is_called)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
TransactionId xid = InvalidTransactionId;
RelationSyncEntry *relentry;
if (!data->sequences)
return;
if (!is_publishable_relation(relation))
return;
/*
* Remember the xid for the message in streaming mode. See
* pgoutput_change.
*/
if (in_streaming)
xid = txn->xid;
relentry = get_rel_sync_entry(data, relation);
/*
* First check the sequence filter.
*
* We handle just REORDER_BUFFER_CHANGE_SEQUENCE here.
*/
if (!relentry->pubactions.pubsequence)
return;
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_sequence(ctx->out,
relation,
xid,
sequence_lsn,
transactional,
last_value,
log_cnt,
is_called);
OutputPluginWrite(ctx, true);
}
/*
* Currently we always forward.
*/
@@ -1725,7 +1789,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->schema_sent = false;
entry->streamed_txns = NIL;
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
entry->pubactions.pubdelete = entry->pubactions.pubtruncate =
entry->pubactions.pubsequence = false;
entry->new_slot = NULL;
entry->old_slot = NULL;
memset(entry->exprstate, 0, sizeof(entry->exprstate));
@@ -1739,13 +1804,13 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
{
Oid schemaId = get_rel_namespace(relid);
List *pubids = GetRelationPublications(relid);
char objectType = pub_get_object_type_for_relkind(get_rel_relkind(relid));
/*
* We don't acquire a lock on the namespace system table as we build
* the cache entry using a historic snapshot and all the later changes
* are absorbed while decoding WAL.
*/
List *schemaPubids = GetSchemaPublications(schemaId);
List *schemaPubids = GetSchemaPublications(schemaId, objectType);
ListCell *lc;
Oid publish_as_relid = relid;
int publish_ancestor_level = 0;
@@ -1780,6 +1845,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->pubactions.pubupdate = false;
entry->pubactions.pubdelete = false;
entry->pubactions.pubtruncate = false;
entry->pubactions.pubsequence = false;
/*
* Tuple slots cleanups. (Will be rebuilt later if needed).
@@ -1826,9 +1892,11 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
/*
* If this is a FOR ALL TABLES publication, pick the partition root
* and set the ancestor level accordingly.
* and set the ancestor level accordingly. If this is a FOR ALL
* SEQUENCES publication, we publish it too but we don't need to
* pick the partition root etc.
*/
if (pub->alltables)
if (pub->alltables || pub->allsequences)
{
publish = true;
if (pub->pubviaroot && am_partition)
@@ -1889,6 +1957,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
entry->pubactions.pubsequence |= pub->pubactions.pubsequence;
/*
* We want to publish the changes as the top-most ancestor