mirror of
https://github.com/postgres/postgres.git
synced 2025-05-05 09:19:17 +03:00
In successful pg_recvlogical, end PGRES_COPY_OUT cleanly.
pg_recvlogical merely called PQfinish(), so the backend sent messages after the disconnect. When that caused EPIPE in internal_flush(), before a LogicalConfirmReceivedLocation(), the next pg_recvlogical would repeat already-acknowledged records. Whether or not the defect causes EPIPE, post-disconnect messages could contain an ErrorResponse that the user should see. One properly ends PGRES_COPY_OUT by repeating PQgetCopyData() until it returns a negative value. Augment one of the tests to cover the case of WAL past --endpos. Back-patch to v10, where commit 7c030783a5bd07cadffc2a1018bc33119a4c7505 first appeared. Before that commit, pg_recvlogical never reached PGRES_COPY_OUT. Reported by Thomas Munro. Discussion: https://postgr.es/m/CAEepm=1MzM2Z_xNe4foGwZ1a+MO_2S9oYDq3M5D11=JDU_+0Nw@mail.gmail.com
This commit is contained in:
parent
d439e27d5f
commit
357012e17f
@ -605,14 +605,40 @@ StreamLogicalLog(void)
|
|||||||
res = PQgetResult(conn);
|
res = PQgetResult(conn);
|
||||||
if (PQresultStatus(res) == PGRES_COPY_OUT)
|
if (PQresultStatus(res) == PGRES_COPY_OUT)
|
||||||
{
|
{
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We're doing a client-initiated clean exit and have sent CopyDone to
|
* We're doing a client-initiated clean exit and have sent CopyDone to
|
||||||
* the server. We've already sent replay confirmation and fsync'd so
|
* the server. Drain any messages, so we don't miss a last-minute
|
||||||
* we can just clean up the connection now.
|
* ErrorResponse. The walsender stops generating XLogData records once
|
||||||
|
* it sees CopyDone, so expect this to finish quickly. After CopyDone,
|
||||||
|
* it's too late for sendFeedback(), even if this were to take a long
|
||||||
|
* time. Hence, use synchronous-mode PQgetCopyData().
|
||||||
*/
|
*/
|
||||||
goto error;
|
while (1)
|
||||||
|
{
|
||||||
|
int r;
|
||||||
|
|
||||||
|
if (copybuf != NULL)
|
||||||
|
{
|
||||||
|
PQfreemem(copybuf);
|
||||||
|
copybuf = NULL;
|
||||||
|
}
|
||||||
|
r = PQgetCopyData(conn, ©buf, 0);
|
||||||
|
if (r == -1)
|
||||||
|
break;
|
||||||
|
if (r == -2)
|
||||||
|
{
|
||||||
|
fprintf(stderr, _("%s: could not read COPY data: %s"),
|
||||||
|
progname, PQerrorMessage(conn));
|
||||||
|
time_to_abort = false; /* unclean exit */
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
res = PQgetResult(conn);
|
||||||
}
|
}
|
||||||
else if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
fprintf(stderr,
|
fprintf(stderr,
|
||||||
_("%s: unexpected termination of replication stream: %s"),
|
_("%s: unexpected termination of replication stream: %s"),
|
||||||
|
@ -71,6 +71,11 @@ my $endpos = $node_master->safe_psql('postgres',
|
|||||||
);
|
);
|
||||||
print "waiting to replay $endpos\n";
|
print "waiting to replay $endpos\n";
|
||||||
|
|
||||||
|
# Insert some rows after $endpos, which we won't read.
|
||||||
|
$node_master->safe_psql('postgres',
|
||||||
|
qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,50) s;]
|
||||||
|
);
|
||||||
|
|
||||||
my $stdout_recv = $node_master->pg_recvlogical_upto(
|
my $stdout_recv = $node_master->pg_recvlogical_upto(
|
||||||
'postgres', 'test_slot', $endpos, 180,
|
'postgres', 'test_slot', $endpos, 180,
|
||||||
'include-xids' => '0',
|
'include-xids' => '0',
|
||||||
@ -89,7 +94,7 @@ $stdout_recv = $node_master->pg_recvlogical_upto(
|
|||||||
'skip-empty-xacts' => '1');
|
'skip-empty-xacts' => '1');
|
||||||
chomp($stdout_recv);
|
chomp($stdout_recv);
|
||||||
is($stdout_recv, '',
|
is($stdout_recv, '',
|
||||||
'pg_recvlogical acknowledged changes, nothing pending on slot');
|
'pg_recvlogical acknowledged changes');
|
||||||
|
|
||||||
$node_master->safe_psql('postgres', 'CREATE DATABASE otherdb');
|
$node_master->safe_psql('postgres', 'CREATE DATABASE otherdb');
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user