From 1cdc6d86bfc3bbe2189a4d9d54a4fa8b6c98ea0a Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Tue, 22 Aug 2023 08:44:09 +0530 Subject: [PATCH] Simplify the logical worker type checks by using the switch on worker type. The current code uses if/else statements at various places to take worker specific actions. Change those to use the switch on worker type added by commit 2a8b40e368. This makes code easier to read and understand. Author: Peter Smith Reviewed-by: Amit Kapila, Hou Zhijie Discussion: http://postgr.es/m/CAHut+PttPSuP0yoZ=9zLDXKqTJ=d0bhxwKaEaNcaym1XqcvDEg@mail.gmail.com --- src/backend/replication/logical/launcher.c | 57 +++++++++++---------- src/backend/replication/logical/tablesync.c | 33 ++++++++---- src/backend/replication/logical/worker.c | 41 +++++++++------ 3 files changed, 78 insertions(+), 53 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 7cc0a16d3bc..72e44d5a02d 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -468,39 +468,44 @@ retry: bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres"); - if (is_parallel_apply_worker) + switch (worker->type) { - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain"); - snprintf(bgw.bgw_name, BGW_MAXLEN, - "logical replication parallel apply worker for subscription %u", - subid); - snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker"); - } - else if (is_tablesync_worker) - { - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain"); - snprintf(bgw.bgw_name, BGW_MAXLEN, - "logical replication tablesync worker for subscription %u sync %u", - subid, - relid); - snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker"); - } - else - { - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain"); - snprintf(bgw.bgw_name, BGW_MAXLEN, - "logical replication apply worker for subscription %u", - subid); - snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker"); + case WORKERTYPE_APPLY: + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication apply worker for subscription %u", + subid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker"); + break; + + case WORKERTYPE_PARALLEL_APPLY: + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication parallel apply worker for subscription %u", + subid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker"); + + memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle)); + break; + + case WORKERTYPE_TABLESYNC: + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication tablesync worker for subscription %u sync %u", + subid, + relid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker"); + break; + + case WORKERTYPE_UNKNOWN: + /* Should never happen. */ + elog(ERROR, "unknown worker type"); } bgw.bgw_restart_time = BGW_NEVER_RESTART; bgw.bgw_notify_pid = MyProcPid; bgw.bgw_main_arg = Int32GetDatum(slot); - if (is_parallel_apply_worker) - memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle)); - if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) { /* Failed to start worker, so clean up the worker slot. */ diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 67bdd14095e..e2cee92cf26 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -649,18 +649,29 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) void process_syncing_tables(XLogRecPtr current_lsn) { - /* - * Skip for parallel apply workers because they only operate on tables - * that are in a READY state. See pa_can_start() and - * should_apply_changes_for_rel(). - */ - if (am_parallel_apply_worker()) - return; + switch (MyLogicalRepWorker->type) + { + case WORKERTYPE_PARALLEL_APPLY: - if (am_tablesync_worker()) - process_syncing_tables_for_sync(current_lsn); - else - process_syncing_tables_for_apply(current_lsn); + /* + * Skip for parallel apply workers because they only operate on + * tables that are in a READY state. See pa_can_start() and + * should_apply_changes_for_rel(). + */ + break; + + case WORKERTYPE_TABLESYNC: + process_syncing_tables_for_sync(current_lsn); + break; + + case WORKERTYPE_APPLY: + process_syncing_tables_for_apply(current_lsn); + break; + + case WORKERTYPE_UNKNOWN: + /* Should never happen. */ + elog(ERROR, "Unknown worker type"); + } } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index a20d4c11716..597947410f8 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -485,25 +485,34 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) { - if (am_tablesync_worker()) - return MyLogicalRepWorker->relid == rel->localreloid; - else if (am_parallel_apply_worker()) + switch (MyLogicalRepWorker->type) { - /* We don't synchronize rel's that are in unknown state. */ - if (rel->state != SUBREL_STATE_READY && - rel->state != SUBREL_STATE_UNKNOWN) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical replication parallel apply worker for subscription \"%s\" will stop", - MySubscription->name), - errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized."))); + case WORKERTYPE_TABLESYNC: + return MyLogicalRepWorker->relid == rel->localreloid; - return rel->state == SUBREL_STATE_READY; + case WORKERTYPE_PARALLEL_APPLY: + /* We don't synchronize rel's that are in unknown state. */ + if (rel->state != SUBREL_STATE_READY && + rel->state != SUBREL_STATE_UNKNOWN) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication parallel apply worker for subscription \"%s\" will stop", + MySubscription->name), + errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized."))); + + return rel->state == SUBREL_STATE_READY; + + case WORKERTYPE_APPLY: + return (rel->state == SUBREL_STATE_READY || + (rel->state == SUBREL_STATE_SYNCDONE && + rel->statelsn <= remote_final_lsn)); + + case WORKERTYPE_UNKNOWN: + /* Should never happen. */ + elog(ERROR, "Unknown worker type"); } - else - return (rel->state == SUBREL_STATE_READY || - (rel->state == SUBREL_STATE_SYNCDONE && - rel->statelsn <= remote_final_lsn)); + + return false; /* dummy for compiler */ } /*