diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 86a2b14807f..961110c94be 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -515,7 +515,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 }
 
 /*
- * Wake up (using latch) the logical replication worker.
+ * Wake up (using latch) any logical replication worker for specified sub/rel.
  */
 void
 logicalrep_worker_wakeup(Oid subid, Oid relid)
@@ -523,19 +523,25 @@ logicalrep_worker_wakeup(Oid subid, Oid relid)
 	LogicalRepWorker *worker;
 
 	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
 	worker = logicalrep_worker_find(subid, relid, true);
-	LWLockRelease(LogicalRepWorkerLock);
 
 	if (worker)
 		logicalrep_worker_wakeup_ptr(worker);
+
+	LWLockRelease(LogicalRepWorkerLock);
 }
 
 /*
- * Wake up (using latch) the logical replication worker.
+ * Wake up (using latch) the specified logical replication worker.
+ *
+ * Caller must hold lock, else worker->proc could change under us.
  */
 void
 logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
 {
+	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
 	SetLatch(&worker->proc->procLatch);
 }
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 3ef12dfd26a..32abf5b368a 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -212,8 +212,7 @@ wait_for_relation_state_change(Oid relid, char expected_state)
  *
  * Used when transitioning from SYNCWAIT state to CATCHUP.
  *
- * Returns false if the apply worker has disappeared or the table state has been
- * reset.
+ * Returns false if the apply worker has disappeared.
  */
 static bool
 wait_for_worker_state_change(char expected_state)
@@ -226,17 +225,30 @@ wait_for_worker_state_change(char expected_state)
 
 		CHECK_FOR_INTERRUPTS();
 
-		/* Bail if the apply has died. */
-		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-		worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
-										InvalidOid, false);
-		LWLockRelease(LogicalRepWorkerLock);
-		if (!worker)
-			return false;
-
+		/*
+		 * Done if already in correct state. (We assume this fetch is atomic
+		 * enough to not give a misleading answer if we do it with no lock.)
+		 */
 		if (MyLogicalRepWorker->relstate == expected_state)
 			return true;
 
+		/*
+		 * Bail out if the apply worker has died, else signal it we're
+		 * waiting.
+		 */
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
+										InvalidOid, false);
+		if (worker && worker->proc)
+			logicalrep_worker_wakeup_ptr(worker);
+		LWLockRelease(LogicalRepWorkerLock);
+		if (!worker)
+			break;
+
+		/*
+		 * Wait. We expect to get a latch signal back from the apply worker,
+		 * but use a timeout in case it dies without sending one.
+		 */
 		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
 					   1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
@@ -245,7 +257,8 @@ wait_for_worker_state_change(char expected_state)
 		if (rc & WL_POSTMASTER_DEATH)
 			proc_exit(1);
 
-		ResetLatch(MyLatch);
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
 	}
 
 	return false;
@@ -422,83 +435,96 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		else
 		{
 			LogicalRepWorker *syncworker;
-			int			nsyncworkers = 0;
 
+			/*
+			 * Look for a sync worker for this relation.
+			 */
 			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
 			syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
 												rstate->relid, false);
+
 			if (syncworker)
 			{
+				/* Found one, update our copy of its state */
 				SpinLockAcquire(&syncworker->relmutex);
 				rstate->state = syncworker->relstate;
 				rstate->lsn = syncworker->relstate_lsn;
+				if (rstate->state == SUBREL_STATE_SYNCWAIT)
+				{
+					/*
+					 * Sync worker is waiting for apply. Tell sync worker it
+					 * can catchup now.
+					 */
+					syncworker->relstate = SUBREL_STATE_CATCHUP;
+					syncworker->relstate_lsn =
+						Max(syncworker->relstate_lsn, current_lsn);
+				}
 				SpinLockRelease(&syncworker->relmutex);
+
+				/* If we told worker to catch up, wait for it. */
+				if (rstate->state == SUBREL_STATE_SYNCWAIT)
+				{
+					/* Signal the sync worker, as it may be waiting for us. */
+					if (syncworker->proc)
+						logicalrep_worker_wakeup_ptr(syncworker);
+
+					/* Now safe to release the LWLock */
+					LWLockRelease(LogicalRepWorkerLock);
+
+					/*
+					 * Enter busy loop and wait for synchronization worker to
+					 * reach expected state (or die trying).
+					 */
+					if (!started_tx)
+					{
+						StartTransactionCommand();
+						started_tx = true;
+					}
+
+					wait_for_relation_state_change(rstate->relid,
+												   SUBREL_STATE_SYNCDONE);
+				}
+				else
+					LWLockRelease(LogicalRepWorkerLock);
 			}
 			else
-
+			{
 				/*
 				 * If there is no sync worker for this table yet, count
 				 * running sync workers for this subscription, while we have
-				 * the lock, for later.
+				 * the lock.
 				 */
-				nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
-			LWLockRelease(LogicalRepWorkerLock);
+				int			nsyncworkers =
+					logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
 
-			/*
-			 * There is a worker synchronizing the relation and waiting for
-			 * apply to do something.
-			 */
-			if (syncworker && rstate->state == SUBREL_STATE_SYNCWAIT)
-			{
-				/*
-				 * Tell sync worker it can catchup now. We'll wait for it so
-				 * it does not get lost.
-				 */
-				SpinLockAcquire(&syncworker->relmutex);
-				syncworker->relstate = SUBREL_STATE_CATCHUP;
-				syncworker->relstate_lsn =
-					Max(syncworker->relstate_lsn, current_lsn);
-				SpinLockRelease(&syncworker->relmutex);
-
-				/* Signal the sync worker, as it may be waiting for us. */
-				logicalrep_worker_wakeup_ptr(syncworker);
+				/* Now safe to release the LWLock */
+				LWLockRelease(LogicalRepWorkerLock);
 
 				/*
-				 * Enter busy loop and wait for synchronization worker to
-				 * reach expected state (or die trying).
+				 * If there are free sync worker slot(s), start a new sync
+				 * worker for the table.
 				 */
-				if (!started_tx)
+				if (nsyncworkers < max_sync_workers_per_subscription)
 				{
-					StartTransactionCommand();
-					started_tx = true;
-				}
-				wait_for_relation_state_change(rstate->relid,
-											   SUBREL_STATE_SYNCDONE);
-			}
+					TimestampTz now = GetCurrentTimestamp();
+					struct tablesync_start_time_mapping *hentry;
+					bool		found;
 
-			/*
-			 * If there is no sync worker registered for the table and there
-			 * is some free sync worker slot, start a new sync worker for the
-			 * table.
-			 */
-			else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription)
-			{
-				TimestampTz now = GetCurrentTimestamp();
-				struct tablesync_start_time_mapping *hentry;
-				bool		found;
+					hentry = hash_search(last_start_times, &rstate->relid,
										 HASH_ENTER, &found);
 
-				hentry = hash_search(last_start_times, &rstate->relid, HASH_ENTER, &found);
-
-				if (!found ||
-					TimestampDifferenceExceeds(hentry->last_start_time, now,
-											   wal_retrieve_retry_interval))
-				{
-					logicalrep_worker_launch(MyLogicalRepWorker->dbid,
-											 MySubscription->oid,
-											 MySubscription->name,
-											 MyLogicalRepWorker->userid,
-											 rstate->relid);
-					hentry->last_start_time = now;
+					if (!found ||
+						TimestampDifferenceExceeds(hentry->last_start_time, now,
												   wal_retrieve_retry_interval))
+					{
+						logicalrep_worker_launch(MyLogicalRepWorker->dbid,
												 MySubscription->oid,
												 MySubscription->name,
												 MyLogicalRepWorker->userid,
												 rstate->relid);
+						hentry->last_start_time = now;
+					}
 				}
 			}
 		}
@@ -512,7 +538,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 }
 
 /*
- * Process state possible change(s) of tables that are being synchronized.
+ * Process possible state change(s) of tables that are being synchronized.
  */
 void
 process_syncing_tables(XLogRecPtr current_lsn)
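
For readers following the locking change, the wait/wakeup handshake the patch establishes between the tablesync worker and the apply worker reduces to roughly the sketch below. This is a condensed illustration only, not code from the patch: "peer" and "peer_is_done()" are placeholders standing in for the worker slot and the state test, while the latch and LWLock primitives are the same ones used in the hunks above.

/*
 * Condensed sketch of the handshake (illustrative only).
 * The waiter signals the peer's latch while holding the lock that keeps
 * the peer's PGPROC pointer valid, then sleeps on its own latch with a
 * timeout so a dead peer cannot strand it forever.
 */
static bool
wait_for_peer(void)
{
	for (;;)
	{
		int			rc;

		CHECK_FOR_INTERRUPTS();

		if (peer_is_done())		/* placeholder: the state we are waiting for */
			return true;

		/* Signal the peer under the lock, per the new wakeup_ptr contract. */
		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
		if (peer && peer->proc)
			SetLatch(&peer->proc->procLatch);
		LWLockRelease(LogicalRepWorkerLock);
		if (!peer)
			return false;		/* peer has gone away */

		/* Sleep until signalled back, or time out in case the peer died. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
					   1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);

		if (rc & WL_POSTMASTER_DEATH)
			proc_exit(1);

		/* Only consume the latch if it was actually set. */
		if (rc & WL_LATCH_SET)
			ResetLatch(MyLatch);
	}
}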