diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 4dbffea240a..c1b5ad35deb 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -799,7 +799,7 @@ StartReplication(StartReplicationCmd *cmd) } /* Send CommandComplete message */ - pq_puttextmessage('C', "START_STREAMING"); + EndReplicationCommand("START_STREAMING"); } /* @@ -1122,11 +1122,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) static void DropReplicationSlot(DropReplicationSlotCmd *cmd) { - QueryCompletion qc; - ReplicationSlotDrop(cmd->slotname, !cmd->wait); - SetQueryCompletion(&qc, CMDTAG_DROP_REPLICATION_SLOT, 0); - EndCommand(&qc, DestRemote, false); } /* @@ -1517,9 +1513,9 @@ exec_replication_command(const char *cmd_string) { int parse_rc; Node *cmd_node; + const char *cmdtag; MemoryContext cmd_context; MemoryContext old_context; - QueryCompletion qc; /* * If WAL sender has been told that shutdown is getting close, switch its @@ -1619,40 +1615,53 @@ exec_replication_command(const char *cmd_string) switch (cmd_node->type) { case T_IdentifySystemCmd: + cmdtag = "IDENTIFY_SYSTEM"; IdentifySystem(); + EndReplicationCommand(cmdtag); break; case T_BaseBackupCmd: - PreventInTransactionBlock(true, "BASE_BACKUP"); + cmdtag = "BASE_BACKUP"; + PreventInTransactionBlock(true, cmdtag); SendBaseBackup((BaseBackupCmd *) cmd_node); + EndReplicationCommand(cmdtag); break; case T_CreateReplicationSlotCmd: + cmdtag = "CREATE_REPLICATION_SLOT"; CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node); + EndReplicationCommand(cmdtag); break; case T_DropReplicationSlotCmd: + cmdtag = "DROP_REPLICATION_SLOT"; DropReplicationSlot((DropReplicationSlotCmd *) cmd_node); + EndReplicationCommand(cmdtag); break; case T_StartReplicationCmd: { StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node; - PreventInTransactionBlock(true, "START_REPLICATION"); + cmdtag = "START_REPLICATION"; + PreventInTransactionBlock(true, cmdtag); if (cmd->kind == REPLICATION_KIND_PHYSICAL) StartReplication(cmd); else StartLogicalReplication(cmd); + /* callees already sent their own completion message */ + Assert(xlogreader != NULL); break; } case T_TimeLineHistoryCmd: - PreventInTransactionBlock(true, "TIMELINE_HISTORY"); + cmdtag = "TIMELINE_HISTORY"; + PreventInTransactionBlock(true, cmdtag); SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node); + EndReplicationCommand(cmdtag); break; case T_VariableShowStmt: @@ -1660,10 +1669,13 @@ exec_replication_command(const char *cmd_string) DestReceiver *dest = CreateDestReceiver(DestRemoteSimple); VariableShowStmt *n = (VariableShowStmt *) cmd_node; + cmdtag = "SHOW"; + /* syscache access needs a transaction environment */ StartTransactionCommand(); GetPGVariable(n->name, dest); CommitTransactionCommand(); + EndReplicationCommand(cmdtag); } break; @@ -1676,10 +1688,6 @@ exec_replication_command(const char *cmd_string) MemoryContextSwitchTo(old_context); MemoryContextDelete(cmd_context); - /* Send CommandComplete message */ - SetQueryCompletion(&qc, CMDTAG_SELECT, 0); - EndCommand(&qc, DestRemote, true); - /* Report to pgstat that this process is now idle */ pgstat_report_activity(STATE_IDLE, NULL); debug_query_string = NULL; diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c index 7208751ec78..96789f88ef9 100644 --- a/src/backend/tcop/dest.c +++ b/src/backend/tcop/dest.c @@ -211,6 +211,18 @@ EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_o } } +/* ---------------- + * EndReplicationCommand - stripped down version of EndCommand + * + * For use by replication commands. + * ---------------- + */ +void +EndReplicationCommand(const char *commandTag) +{ + pq_putmessage('C', commandTag, strlen(commandTag) + 1); +} + /* ---------------- * NullCommand - tell dest that an empty query string was recognized * diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h index 8ef0f55e748..be94852bbd3 100644 --- a/src/include/tcop/cmdtaglist.h +++ b/src/include/tcop/cmdtaglist.h @@ -157,7 +157,6 @@ PG_CMDTAG(CMDTAG_DROP_OWNED, "DROP OWNED", true, false, false) PG_CMDTAG(CMDTAG_DROP_POLICY, "DROP POLICY", true, false, false) PG_CMDTAG(CMDTAG_DROP_PROCEDURE, "DROP PROCEDURE", true, false, false) PG_CMDTAG(CMDTAG_DROP_PUBLICATION, "DROP PUBLICATION", true, false, false) -PG_CMDTAG(CMDTAG_DROP_REPLICATION_SLOT, "DROP REPLICATION SLOT", false, false, false) PG_CMDTAG(CMDTAG_DROP_ROLE, "DROP ROLE", false, false, false) PG_CMDTAG(CMDTAG_DROP_ROUTINE, "DROP ROUTINE", true, false, false) PG_CMDTAG(CMDTAG_DROP_RULE, "DROP RULE", true, false, false) diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h index 662ce8a56f8..2e07f1516d1 100644 --- a/src/include/tcop/dest.h +++ b/src/include/tcop/dest.h @@ -139,6 +139,7 @@ extern void BeginCommand(CommandTag commandTag, CommandDest dest); extern DestReceiver *CreateDestReceiver(CommandDest dest); extern void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output); +extern void EndReplicationCommand(const char *commandTag); /* Additional functions that go with destination management, more or less. */