1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-26 12:21:12 +03:00

Logical replication support for TRUNCATE

Update the built-in logical replication system to make use of the
previously added logical decoding for TRUNCATE support.  Add the
required truncate callback to pgoutput and a new logical replication
protocol message.

Publications get a new attribute to determine whether to replicate
truncate actions.  When updating a publication via pg_dump from an older
version, this is not set, thus preserving the previous behavior.

Author: Simon Riggs <simon@2ndquadrant.com>
Author: Marco Nenciarini <marco.nenciarini@2ndquadrant.it>
Author: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Reviewed-by: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
This commit is contained in:
Peter Eisentraut
2018-04-07 11:24:53 -04:00
parent 5dfd1e5a66
commit 039eb6e92f
19 changed files with 572 additions and 111 deletions

View File

@ -39,6 +39,9 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
static void pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, Relation rel,
ReorderBufferChange *change);
static void pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, int nrelations, Relation relations[],
ReorderBufferChange *change);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
@ -77,6 +80,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->startup_cb = pgoutput_startup;
cb->begin_cb = pgoutput_begin_txn;
cb->change_cb = pgoutput_change;
cb->truncate_cb = pgoutput_truncate;
cb->commit_cb = pgoutput_commit_txn;
cb->filter_by_origin_cb = pgoutput_origin_filter;
cb->shutdown_cb = pgoutput_shutdown;
@ -250,6 +254,46 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginWrite(ctx, true);
}
/*
* Write the relation schema if the current schema hasn't been sent yet.
*/
static void
maybe_send_schema(LogicalDecodingContext *ctx,
Relation relation, RelationSyncEntry *relentry)
{
if (!relentry->schema_sent)
{
TupleDesc desc;
int i;
desc = RelationGetDescr(relation);
/*
* Write out type info if needed. We do that only for user created
* types.
*/
for (i = 0; i < desc->natts; i++)
{
Form_pg_attribute att = TupleDescAttr(desc, i);
if (att->attisdropped)
continue;
if (att->atttypid < FirstNormalObjectId)
continue;
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_typ(ctx->out, att->atttypid);
OutputPluginWrite(ctx, false);
}
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_rel(ctx->out, relation);
OutputPluginWrite(ctx, false);
relentry->schema_sent = true;
}
}
/*
* Sends the decoded DML over wire.
*/
@ -288,40 +332,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
/*
* Write the relation schema if the current schema haven't been sent yet.
*/
if (!relentry->schema_sent)
{
TupleDesc desc;
int i;
desc = RelationGetDescr(relation);
/*
* Write out type info if needed. We do that only for user created
* types.
*/
for (i = 0; i < desc->natts; i++)
{
Form_pg_attribute att = TupleDescAttr(desc, i);
if (att->attisdropped)
continue;
if (att->atttypid < FirstNormalObjectId)
continue;
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_typ(ctx->out, att->atttypid);
OutputPluginWrite(ctx, false);
}
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_rel(ctx->out, relation);
OutputPluginWrite(ctx, false);
relentry->schema_sent = true;
}
maybe_send_schema(ctx, relation, relentry);
/* Send the data */
switch (change->action)
@ -363,6 +374,51 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
MemoryContextReset(data->context);
}
static void
pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
MemoryContext old;
RelationSyncEntry *relentry;
int i;
int nrelids;
Oid *relids;
old = MemoryContextSwitchTo(data->context);
relids = palloc0(nrelations * sizeof(Oid));
nrelids = 0;
for (i = 0; i < nrelations; i++)
{
Relation relation = relations[i];
Oid relid = RelationGetRelid(relation);
if (!is_publishable_relation(relation))
continue;
relentry = get_rel_sync_entry(data, relid);
if (!relentry->pubactions.pubtruncate)
continue;
relids[nrelids++] = relid;
maybe_send_schema(ctx, relation, relentry);
}
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_truncate(ctx->out,
nrelids,
relids,
change->data.truncate.cascade,
change->data.truncate.restart_seqs);
OutputPluginWrite(ctx, true);
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
}
/*
* Currently we always forward.
*/
@ -504,7 +560,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
* we only need to consider ones that the subscriber requested.
*/
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
entry->pubactions.pubdelete = false;
entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
foreach(lc, data->publications)
{
@ -515,10 +571,11 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
}
if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
entry->pubactions.pubdelete)
entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
break;
}