mirror of
https://github.com/postgres/postgres.git
synced 2025-10-24 01:29:19 +03:00
Simplify determining logical replication worker types.
We deduce a LogicalRepWorker's type from the values of several different fields ('relid' and 'leader_pid') whenever logic needs to know it. In fact, the logical replication worker type is already known at the time of launching the LogicalRepWorker and it never changes for the lifetime of that process. Instead of deducing the type, it is simpler to just store it one time, and access it directly thereafter. Author: Peter Smith Reviewed-by: Amit Kapila, Bharath Rupireddy Discussion: http://postgr.es/m/CAHut+PttPSuP0yoZ=9zLDXKqTJ=d0bhxwKaEaNcaym1XqcvDEg@mail.gmail.com
This commit is contained in:
@@ -435,7 +435,8 @@ pa_launch_parallel_worker(void)
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
|
launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
|
||||||
|
MyLogicalRepWorker->dbid,
|
||||||
MySubscription->oid,
|
MySubscription->oid,
|
||||||
MySubscription->name,
|
MySubscription->name,
|
||||||
MyLogicalRepWorker->userid,
|
MyLogicalRepWorker->userid,
|
||||||
|
@@ -303,7 +303,8 @@ logicalrep_workers_find(Oid subid, bool only_running)
|
|||||||
* Returns true on success, false on failure.
|
* Returns true on success, false on failure.
|
||||||
*/
|
*/
|
||||||
bool
|
bool
|
||||||
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
|
logicalrep_worker_launch(LogicalRepWorkerType wtype,
|
||||||
|
Oid dbid, Oid subid, const char *subname, Oid userid,
|
||||||
Oid relid, dsm_handle subworker_dsm)
|
Oid relid, dsm_handle subworker_dsm)
|
||||||
{
|
{
|
||||||
BackgroundWorker bgw;
|
BackgroundWorker bgw;
|
||||||
@@ -315,10 +316,18 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
|
|||||||
int nsyncworkers;
|
int nsyncworkers;
|
||||||
int nparallelapplyworkers;
|
int nparallelapplyworkers;
|
||||||
TimestampTz now;
|
TimestampTz now;
|
||||||
bool is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
|
bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
|
||||||
|
bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
|
||||||
|
|
||||||
/* Sanity check - tablesync worker cannot be a subworker */
|
/*----------
|
||||||
Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
|
* Sanity checks:
|
||||||
|
* - must be valid worker type
|
||||||
|
* - tablesync workers are only ones to have relid
|
||||||
|
* - parallel apply worker is the only kind of subworker
|
||||||
|
*/
|
||||||
|
Assert(wtype != WORKERTYPE_UNKNOWN);
|
||||||
|
Assert(is_tablesync_worker == OidIsValid(relid));
|
||||||
|
Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
|
||||||
|
|
||||||
ereport(DEBUG1,
|
ereport(DEBUG1,
|
||||||
(errmsg_internal("starting logical replication worker for subscription \"%s\"",
|
(errmsg_internal("starting logical replication worker for subscription \"%s\"",
|
||||||
@@ -393,7 +402,7 @@ retry:
|
|||||||
* sync worker limit per subscription. So, just return silently as we
|
* sync worker limit per subscription. So, just return silently as we
|
||||||
* might get here because of an otherwise harmless race condition.
|
* might get here because of an otherwise harmless race condition.
|
||||||
*/
|
*/
|
||||||
if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
|
if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
|
||||||
{
|
{
|
||||||
LWLockRelease(LogicalRepWorkerLock);
|
LWLockRelease(LogicalRepWorkerLock);
|
||||||
return false;
|
return false;
|
||||||
@@ -427,6 +436,7 @@ retry:
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Prepare the worker slot. */
|
/* Prepare the worker slot. */
|
||||||
|
worker->type = wtype;
|
||||||
worker->launch_time = now;
|
worker->launch_time = now;
|
||||||
worker->in_use = true;
|
worker->in_use = true;
|
||||||
worker->generation++;
|
worker->generation++;
|
||||||
@@ -466,7 +476,7 @@ retry:
|
|||||||
subid);
|
subid);
|
||||||
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
|
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
|
||||||
}
|
}
|
||||||
else if (OidIsValid(relid))
|
else if (is_tablesync_worker)
|
||||||
{
|
{
|
||||||
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
|
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
|
||||||
snprintf(bgw.bgw_name, BGW_MAXLEN,
|
snprintf(bgw.bgw_name, BGW_MAXLEN,
|
||||||
@@ -847,7 +857,7 @@ logicalrep_sync_worker_count(Oid subid)
|
|||||||
{
|
{
|
||||||
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
|
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
|
||||||
|
|
||||||
if (w->subid == subid && OidIsValid(w->relid))
|
if (w->subid == subid && isTablesyncWorker(w))
|
||||||
res++;
|
res++;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1180,7 +1190,8 @@ ApplyLauncherMain(Datum main_arg)
|
|||||||
(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
|
(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
|
||||||
{
|
{
|
||||||
ApplyLauncherSetWorkerStartTime(sub->oid, now);
|
ApplyLauncherSetWorkerStartTime(sub->oid, now);
|
||||||
logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
|
logicalrep_worker_launch(WORKERTYPE_APPLY,
|
||||||
|
sub->dbid, sub->oid, sub->name,
|
||||||
sub->owner, InvalidOid,
|
sub->owner, InvalidOid,
|
||||||
DSM_HANDLE_INVALID);
|
DSM_HANDLE_INVALID);
|
||||||
}
|
}
|
||||||
@@ -1290,7 +1301,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
|
|||||||
worker_pid = worker.proc->pid;
|
worker_pid = worker.proc->pid;
|
||||||
|
|
||||||
values[0] = ObjectIdGetDatum(worker.subid);
|
values[0] = ObjectIdGetDatum(worker.subid);
|
||||||
if (OidIsValid(worker.relid))
|
if (isTablesyncWorker(&worker))
|
||||||
values[1] = ObjectIdGetDatum(worker.relid);
|
values[1] = ObjectIdGetDatum(worker.relid);
|
||||||
else
|
else
|
||||||
nulls[1] = true;
|
nulls[1] = true;
|
||||||
|
@@ -587,7 +587,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
|
|||||||
TimestampDifferenceExceeds(hentry->last_start_time, now,
|
TimestampDifferenceExceeds(hentry->last_start_time, now,
|
||||||
wal_retrieve_retry_interval))
|
wal_retrieve_retry_interval))
|
||||||
{
|
{
|
||||||
logicalrep_worker_launch(MyLogicalRepWorker->dbid,
|
logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
|
||||||
|
MyLogicalRepWorker->dbid,
|
||||||
MySubscription->oid,
|
MySubscription->oid,
|
||||||
MySubscription->name,
|
MySubscription->name,
|
||||||
MyLogicalRepWorker->userid,
|
MyLogicalRepWorker->userid,
|
||||||
|
@@ -27,9 +27,20 @@
|
|||||||
#include "storage/shm_toc.h"
|
#include "storage/shm_toc.h"
|
||||||
#include "storage/spin.h"
|
#include "storage/spin.h"
|
||||||
|
|
||||||
|
/* Different types of worker */
|
||||||
|
typedef enum LogicalRepWorkerType
|
||||||
|
{
|
||||||
|
WORKERTYPE_UNKNOWN = 0,
|
||||||
|
WORKERTYPE_TABLESYNC,
|
||||||
|
WORKERTYPE_APPLY,
|
||||||
|
WORKERTYPE_PARALLEL_APPLY
|
||||||
|
} LogicalRepWorkerType;
|
||||||
|
|
||||||
typedef struct LogicalRepWorker
|
typedef struct LogicalRepWorker
|
||||||
{
|
{
|
||||||
|
/* What type of worker is this? */
|
||||||
|
LogicalRepWorkerType type;
|
||||||
|
|
||||||
/* Time at which this worker was launched. */
|
/* Time at which this worker was launched. */
|
||||||
TimestampTz launch_time;
|
TimestampTz launch_time;
|
||||||
|
|
||||||
@@ -232,7 +243,8 @@ extern void logicalrep_worker_attach(int slot);
|
|||||||
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
|
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
|
||||||
bool only_running);
|
bool only_running);
|
||||||
extern List *logicalrep_workers_find(Oid subid, bool only_running);
|
extern List *logicalrep_workers_find(Oid subid, bool only_running);
|
||||||
extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
|
extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
|
||||||
|
Oid dbid, Oid subid, const char *subname,
|
||||||
Oid userid, Oid relid,
|
Oid userid, Oid relid,
|
||||||
dsm_handle subworker_dsm);
|
dsm_handle subworker_dsm);
|
||||||
extern void logicalrep_worker_stop(Oid subid, Oid relid);
|
extern void logicalrep_worker_stop(Oid subid, Oid relid);
|
||||||
@@ -315,19 +327,19 @@ extern void pa_decr_and_wait_stream_block(void);
|
|||||||
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
|
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
|
||||||
XLogRecPtr remote_lsn);
|
XLogRecPtr remote_lsn);
|
||||||
|
|
||||||
#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
|
#define isParallelApplyWorker(worker) ((worker)->type == WORKERTYPE_PARALLEL_APPLY)
|
||||||
|
#define isTablesyncWorker(worker) ((worker)->type == WORKERTYPE_TABLESYNC)
|
||||||
|
|
||||||
static inline bool
|
static inline bool
|
||||||
am_tablesync_worker(void)
|
am_tablesync_worker(void)
|
||||||
{
|
{
|
||||||
return OidIsValid(MyLogicalRepWorker->relid);
|
return isTablesyncWorker(MyLogicalRepWorker);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline bool
|
static inline bool
|
||||||
am_leader_apply_worker(void)
|
am_leader_apply_worker(void)
|
||||||
{
|
{
|
||||||
return (!am_tablesync_worker() &&
|
return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
|
||||||
!isParallelApplyWorker(MyLogicalRepWorker));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline bool
|
static inline bool
|
||||||
|
@@ -1500,6 +1500,7 @@ LogicalRepStreamAbortData
|
|||||||
LogicalRepTupleData
|
LogicalRepTupleData
|
||||||
LogicalRepTyp
|
LogicalRepTyp
|
||||||
LogicalRepWorker
|
LogicalRepWorker
|
||||||
|
LogicalRepWorkerType
|
||||||
LogicalRewriteMappingData
|
LogicalRewriteMappingData
|
||||||
LogicalTape
|
LogicalTape
|
||||||
LogicalTapeSet
|
LogicalTapeSet
|
||||||
|
Reference in New Issue
Block a user