mirror of
https://github.com/postgres/postgres.git
synced 2025-05-08 07:21:33 +03:00
Remove the unnecessary PrepareWrite in pgoutput.
This issue exists from the inception of this code (PG-10) but got exposed by the recent commit ce0fdbfe97 where we are using origins in tablesync workers. The problem was that we were sometimes sending the prepare_write ('w') message but then the actual message was not being sent and on the subscriber side, we always expect a message after prepare_write message which led to this bug. I refrained from backpatching this because there is no way in the core code to hit this prior to commit ce0fdbfe97 and we haven't received any complaints so far. Reported-by: Erik Rijkers Author: Amit Kapila and Vignesh C Tested-by: Erik Rijkers Discussion: https://postgr.es/m/1295168140.139428.1613133237154@webmailclassic.xs4all.nl
This commit is contained in:
parent
8001cb77ee
commit
f672df5fdd
@ -342,10 +342,6 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
|
|||||||
{
|
{
|
||||||
char *origin;
|
char *origin;
|
||||||
|
|
||||||
/* Message boundary */
|
|
||||||
OutputPluginWrite(ctx, false);
|
|
||||||
OutputPluginPrepareWrite(ctx, true);
|
|
||||||
|
|
||||||
/*----------
|
/*----------
|
||||||
* XXX: which behaviour do we want here?
|
* XXX: which behaviour do we want here?
|
||||||
*
|
*
|
||||||
@ -357,7 +353,13 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
|
|||||||
*----------
|
*----------
|
||||||
*/
|
*/
|
||||||
if (replorigin_by_oid(txn->origin_id, true, &origin))
|
if (replorigin_by_oid(txn->origin_id, true, &origin))
|
||||||
|
{
|
||||||
|
/* Message boundary */
|
||||||
|
OutputPluginWrite(ctx, false);
|
||||||
|
OutputPluginPrepareWrite(ctx, true);
|
||||||
logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
|
logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
OutputPluginWrite(ctx, true);
|
OutputPluginWrite(ctx, true);
|
||||||
@ -780,12 +782,13 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx,
|
|||||||
{
|
{
|
||||||
char *origin;
|
char *origin;
|
||||||
|
|
||||||
/* Message boundary */
|
|
||||||
OutputPluginWrite(ctx, false);
|
|
||||||
OutputPluginPrepareWrite(ctx, true);
|
|
||||||
|
|
||||||
if (replorigin_by_oid(txn->origin_id, true, &origin))
|
if (replorigin_by_oid(txn->origin_id, true, &origin))
|
||||||
|
{
|
||||||
|
/* Message boundary */
|
||||||
|
OutputPluginWrite(ctx, false);
|
||||||
|
OutputPluginPrepareWrite(ctx, true);
|
||||||
logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr);
|
logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
OutputPluginWrite(ctx, true);
|
OutputPluginWrite(ctx, true);
|
||||||
|
@ -153,3 +153,72 @@ is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"),
|
|||||||
$rows * 2, "2x$rows rows in t");
|
$rows * 2, "2x$rows rows in t");
|
||||||
is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t2"),
|
is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t2"),
|
||||||
$rows * 2, "2x$rows rows in t2");
|
$rows * 2, "2x$rows rows in t2");
|
||||||
|
|
||||||
|
# Verify table data is synced with cascaded replication setup. This is mainly
|
||||||
|
# to test whether the data written by tablesync worker gets replicated.
|
||||||
|
my $node_pub = get_new_node('testpublisher1');
|
||||||
|
$node_pub->init(allows_streaming => 'logical');
|
||||||
|
$node_pub->start;
|
||||||
|
|
||||||
|
my $node_pub_sub = get_new_node('testpublisher_subscriber');
|
||||||
|
$node_pub_sub->init(allows_streaming => 'logical');
|
||||||
|
$node_pub_sub->start;
|
||||||
|
|
||||||
|
my $node_sub = get_new_node('testsubscriber1');
|
||||||
|
$node_sub->init(allows_streaming => 'logical');
|
||||||
|
$node_sub->start;
|
||||||
|
|
||||||
|
# Create the tables in all nodes.
|
||||||
|
$node_pub->safe_psql('postgres', "CREATE TABLE tab1 (a int)");
|
||||||
|
$node_pub_sub->safe_psql('postgres', "CREATE TABLE tab1 (a int)");
|
||||||
|
$node_sub->safe_psql('postgres', "CREATE TABLE tab1 (a int)");
|
||||||
|
|
||||||
|
# Create a cascaded replication setup like:
|
||||||
|
# N1 - Create publication testpub1.
|
||||||
|
# N2 - Create publication testpub2 and also include subscriber which subscribes
|
||||||
|
# to testpub1.
|
||||||
|
# N3 - Create subscription testsub2 subscribes to testpub2.
|
||||||
|
#
|
||||||
|
# Note that subscription on N3 needs to be created before subscription on N2 to
|
||||||
|
# test whether the data written by tablesync worker of N2 gets replicated.
|
||||||
|
$node_pub->safe_psql('postgres',
|
||||||
|
"CREATE PUBLICATION testpub1 FOR TABLE tab1");
|
||||||
|
|
||||||
|
$node_pub_sub->safe_psql('postgres',
|
||||||
|
"CREATE PUBLICATION testpub2 FOR TABLE tab1");
|
||||||
|
|
||||||
|
my $publisher1_connstr = $node_pub->connstr . ' dbname=postgres';
|
||||||
|
my $publisher2_connstr = $node_pub_sub->connstr . ' dbname=postgres';
|
||||||
|
|
||||||
|
$node_sub->safe_psql('postgres',
|
||||||
|
"CREATE SUBSCRIPTION testsub2 CONNECTION '$publisher2_connstr' PUBLICATION testpub2"
|
||||||
|
);
|
||||||
|
|
||||||
|
$node_pub_sub->safe_psql('postgres',
|
||||||
|
"CREATE SUBSCRIPTION testsub1 CONNECTION '$publisher1_connstr' PUBLICATION testpub1"
|
||||||
|
);
|
||||||
|
|
||||||
|
$node_pub->safe_psql('postgres',
|
||||||
|
"INSERT INTO tab1 values(generate_series(1,10))");
|
||||||
|
|
||||||
|
# Verify that the data is cascaded from testpub1 to testsub1 and further from
|
||||||
|
# testpub2 (which had testsub1) to testsub2.
|
||||||
|
$node_pub->wait_for_catchup('testsub1');
|
||||||
|
$node_pub_sub->wait_for_catchup('testsub2');
|
||||||
|
|
||||||
|
# Drop subscriptions as we don't need them anymore
|
||||||
|
$node_pub_sub->safe_psql('postgres', "DROP SUBSCRIPTION testsub1");
|
||||||
|
$node_sub->safe_psql('postgres', "DROP SUBSCRIPTION testsub2");
|
||||||
|
|
||||||
|
# Drop publications as we don't need them anymore
|
||||||
|
$node_pub->safe_psql('postgres', "DROP PUBLICATION testpub1");
|
||||||
|
$node_pub_sub->safe_psql('postgres', "DROP PUBLICATION testpub2");
|
||||||
|
|
||||||
|
# Clean up the tables on both publisher and subscriber as we don't need them
|
||||||
|
$node_pub->safe_psql('postgres', "DROP TABLE tab1");
|
||||||
|
$node_pub_sub->safe_psql('postgres', "DROP TABLE tab1");
|
||||||
|
$node_sub->safe_psql('postgres', "DROP TABLE tab1");
|
||||||
|
|
||||||
|
$node_pub->stop('fast');
|
||||||
|
$node_pub_sub->stop('fast');
|
||||||
|
$node_sub->stop('fast');
|
||||||
|
Loading…
x
Reference in New Issue
Block a user