From 07082b08cc5d3c378d22c105c65841ec0952e3ed Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Wed, 16 Sep 2020 13:04:38 -0300 Subject: [PATCH] Fix bogus completion tag usage in walsender Since commit fd5942c18f97 (2012, 9.3-era), walsender has been sending completion tags for certain replication commands twice -- and they're not even consistent. Apparently neither libpq nor JDBC have a problem with it, but it's not kosher. Fix by remove the EndCommand() call in the common code path for them all, and inserting specific calls to EndReplicationCommand() specifically in those places where it's needed. EndReplicationCommand() is a new simple function to send the completion tag for replication commands. Do this instead of sending a generic SELECT completion tag for them all, which was also pretty bogus (if innocuous). While at it, change StartReplication() to use EndReplicationCommand() instead of pg_puttextmessage(). In commit 2f9661311b83, I failed to realize that replication commands are not close-enough kin of regular SQL commands, so the DROP_REPLICATION_SLOT tag I added is undeserved and a type pun. Take it out. Backpatch to 13, where the latter commit appeared. The duplicate tag has been sent since 9.3, but since nothing is broken, it doesn't seem worth fixing. Per complaints from Tom Lane. Discussion: https://postgr.es/m/1347966.1600195735@sss.pgh.pa.us --- src/backend/replication/walsender.c | 34 ++++++++++++++++++----------- src/backend/tcop/dest.c | 12 ++++++++++ src/include/tcop/cmdtaglist.h | 1 - src/include/tcop/dest.h | 1 + 4 files changed, 34 insertions(+), 14 deletions(-) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 4dbffea240a..c1b5ad35deb 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -799,7 +799,7 @@ StartReplication(StartReplicationCmd *cmd) } /* Send CommandComplete message */ - pq_puttextmessage('C', "START_STREAMING"); + EndReplicationCommand("START_STREAMING"); } /* @@ -1122,11 +1122,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) static void DropReplicationSlot(DropReplicationSlotCmd *cmd) { - QueryCompletion qc; - ReplicationSlotDrop(cmd->slotname, !cmd->wait); - SetQueryCompletion(&qc, CMDTAG_DROP_REPLICATION_SLOT, 0); - EndCommand(&qc, DestRemote, false); } /* @@ -1517,9 +1513,9 @@ exec_replication_command(const char *cmd_string) { int parse_rc; Node *cmd_node; + const char *cmdtag; MemoryContext cmd_context; MemoryContext old_context; - QueryCompletion qc; /* * If WAL sender has been told that shutdown is getting close, switch its @@ -1619,40 +1615,53 @@ exec_replication_command(const char *cmd_string) switch (cmd_node->type) { case T_IdentifySystemCmd: + cmdtag = "IDENTIFY_SYSTEM"; IdentifySystem(); + EndReplicationCommand(cmdtag); break; case T_BaseBackupCmd: - PreventInTransactionBlock(true, "BASE_BACKUP"); + cmdtag = "BASE_BACKUP"; + PreventInTransactionBlock(true, cmdtag); SendBaseBackup((BaseBackupCmd *) cmd_node); + EndReplicationCommand(cmdtag); break; case T_CreateReplicationSlotCmd: + cmdtag = "CREATE_REPLICATION_SLOT"; CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node); + EndReplicationCommand(cmdtag); break; case T_DropReplicationSlotCmd: + cmdtag = "DROP_REPLICATION_SLOT"; DropReplicationSlot((DropReplicationSlotCmd *) cmd_node); + EndReplicationCommand(cmdtag); break; case T_StartReplicationCmd: { StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node; - PreventInTransactionBlock(true, "START_REPLICATION"); + cmdtag = "START_REPLICATION"; + PreventInTransactionBlock(true, cmdtag); if (cmd->kind == REPLICATION_KIND_PHYSICAL) StartReplication(cmd); else StartLogicalReplication(cmd); + /* callees already sent their own completion message */ + Assert(xlogreader != NULL); break; } case T_TimeLineHistoryCmd: - PreventInTransactionBlock(true, "TIMELINE_HISTORY"); + cmdtag = "TIMELINE_HISTORY"; + PreventInTransactionBlock(true, cmdtag); SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node); + EndReplicationCommand(cmdtag); break; case T_VariableShowStmt: @@ -1660,10 +1669,13 @@ exec_replication_command(const char *cmd_string) DestReceiver *dest = CreateDestReceiver(DestRemoteSimple); VariableShowStmt *n = (VariableShowStmt *) cmd_node; + cmdtag = "SHOW"; + /* syscache access needs a transaction environment */ StartTransactionCommand(); GetPGVariable(n->name, dest); CommitTransactionCommand(); + EndReplicationCommand(cmdtag); } break; @@ -1676,10 +1688,6 @@ exec_replication_command(const char *cmd_string) MemoryContextSwitchTo(old_context); MemoryContextDelete(cmd_context); - /* Send CommandComplete message */ - SetQueryCompletion(&qc, CMDTAG_SELECT, 0); - EndCommand(&qc, DestRemote, true); - /* Report to pgstat that this process is now idle */ pgstat_report_activity(STATE_IDLE, NULL); debug_query_string = NULL; diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c index 7208751ec78..96789f88ef9 100644 --- a/src/backend/tcop/dest.c +++ b/src/backend/tcop/dest.c @@ -211,6 +211,18 @@ EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_o } } +/* ---------------- + * EndReplicationCommand - stripped down version of EndCommand + * + * For use by replication commands. + * ---------------- + */ +void +EndReplicationCommand(const char *commandTag) +{ + pq_putmessage('C', commandTag, strlen(commandTag) + 1); +} + /* ---------------- * NullCommand - tell dest that an empty query string was recognized * diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h index 8ef0f55e748..be94852bbd3 100644 --- a/src/include/tcop/cmdtaglist.h +++ b/src/include/tcop/cmdtaglist.h @@ -157,7 +157,6 @@ PG_CMDTAG(CMDTAG_DROP_OWNED, "DROP OWNED", true, false, false) PG_CMDTAG(CMDTAG_DROP_POLICY, "DROP POLICY", true, false, false) PG_CMDTAG(CMDTAG_DROP_PROCEDURE, "DROP PROCEDURE", true, false, false) PG_CMDTAG(CMDTAG_DROP_PUBLICATION, "DROP PUBLICATION", true, false, false) -PG_CMDTAG(CMDTAG_DROP_REPLICATION_SLOT, "DROP REPLICATION SLOT", false, false, false) PG_CMDTAG(CMDTAG_DROP_ROLE, "DROP ROLE", false, false, false) PG_CMDTAG(CMDTAG_DROP_ROUTINE, "DROP ROUTINE", true, false, false) PG_CMDTAG(CMDTAG_DROP_RULE, "DROP RULE", true, false, false) diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h index 662ce8a56f8..2e07f1516d1 100644 --- a/src/include/tcop/dest.h +++ b/src/include/tcop/dest.h @@ -139,6 +139,7 @@ extern void BeginCommand(CommandTag commandTag, CommandDest dest); extern DestReceiver *CreateDestReceiver(CommandDest dest); extern void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output); +extern void EndReplicationCommand(const char *commandTag); /* Additional functions that go with destination management, more or less. */