diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 5fd47689dd2..6a28becdad5 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -711,14 +711,24 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync) for (i = 0; i < max_wal_senders; i++) { + XLogRecPtr flush; + WalSndState state; + int pid; + walsnd = &WalSndCtl->walsnds[i]; + SpinLockAcquire(&walsnd->mutex); + pid = walsnd->pid; + flush = walsnd->flush; + state = walsnd->state; + SpinLockRelease(&walsnd->mutex); + /* Must be active */ - if (walsnd->pid == 0) + if (pid == 0) continue; /* Must be streaming */ - if (walsnd->state != WALSNDSTATE_STREAMING) + if (state != WALSNDSTATE_STREAMING) continue; /* Must be synchronous */ @@ -726,7 +736,7 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync) continue; /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(walsnd->flush)) + if (XLogRecPtrIsInvalid(flush)) continue; /* @@ -780,14 +790,24 @@ SyncRepGetSyncStandbysPriority(bool *am_sync) */ for (i = 0; i < max_wal_senders; i++) { + XLogRecPtr flush; + WalSndState state; + int pid; + walsnd = &WalSndCtl->walsnds[i]; + SpinLockAcquire(&walsnd->mutex); + pid = walsnd->pid; + flush = walsnd->flush; + state = walsnd->state; + SpinLockRelease(&walsnd->mutex); + /* Must be active */ - if (walsnd->pid == 0) + if (pid == 0) continue; /* Must be streaming */ - if (walsnd->state != WALSNDSTATE_STREAMING) + if (state != WALSNDSTATE_STREAMING) continue; /* Must be synchronous */ @@ -796,7 +816,7 @@ SyncRepGetSyncStandbysPriority(bool *am_sync) continue; /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(walsnd->flush)) + if (XLogRecPtrIsInvalid(flush)) continue; /* diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 8a249e22b9f..ea9d21a46b3 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -1379,7 +1379,8 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) TupleDesc tupdesc; Datum *values; bool *nulls; - WalRcvData *walrcv = WalRcv; + int pid; + bool ready_to_display; WalRcvState state; XLogRecPtr receive_start_lsn; TimeLineID receive_start_tli; @@ -1392,11 +1393,28 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) char *slotname; char *conninfo; + /* Take a lock to ensure value consistency */ + SpinLockAcquire(&WalRcv->mutex); + pid = (int) WalRcv->pid; + ready_to_display = WalRcv->ready_to_display; + state = WalRcv->walRcvState; + receive_start_lsn = WalRcv->receiveStart; + receive_start_tli = WalRcv->receiveStartTLI; + received_lsn = WalRcv->receivedUpto; + received_tli = WalRcv->receivedTLI; + last_send_time = WalRcv->lastMsgSendTime; + last_receipt_time = WalRcv->lastMsgReceiptTime; + latest_end_lsn = WalRcv->latestWalEnd; + latest_end_time = WalRcv->latestWalEndTime; + slotname = pstrdup(WalRcv->slotname); + conninfo = pstrdup(WalRcv->conninfo); + SpinLockRelease(&WalRcv->mutex); + /* * No WAL receiver (or not ready yet), just return a tuple with NULL * values */ - if (walrcv->pid == 0 || !walrcv->ready_to_display) + if (pid == 0 || !ready_to_display) PG_RETURN_NULL(); /* determine result type */ @@ -1406,23 +1424,8 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) values = palloc0(sizeof(Datum) * tupdesc->natts); nulls = palloc0(sizeof(bool) * tupdesc->natts); - /* Take a lock to ensure value consistency */ - SpinLockAcquire(&walrcv->mutex); - state = walrcv->walRcvState; - receive_start_lsn = walrcv->receiveStart; - receive_start_tli = walrcv->receiveStartTLI; - received_lsn = walrcv->receivedUpto; - received_tli = walrcv->receivedTLI; - last_send_time = walrcv->lastMsgSendTime; - last_receipt_time = walrcv->lastMsgReceiptTime; - latest_end_lsn = walrcv->latestWalEnd; - latest_end_time = walrcv->latestWalEndTime; - slotname = pstrdup(walrcv->slotname); - conninfo = pstrdup(walrcv->conninfo); - SpinLockRelease(&walrcv->mutex); - /* Fetch values */ - values[0] = Int32GetDatum(walrcv->pid); + values[0] = Int32GetDatum(pid); if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS)) { @@ -1473,6 +1476,5 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) } /* Returns the record as Datum */ - PG_RETURN_DATUM(HeapTupleGetDatum( - heap_form_tuple(tupdesc, values, nulls))); + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index da0553e016a..002143b26a2 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -668,13 +668,9 @@ StartReplication(StartReplicationCmd *cmd) sentPtr = cmd->startpoint; /* Initialize shared memory status, too */ - { - WalSnd *walsnd = MyWalSnd; - - SpinLockAcquire(&walsnd->mutex); - walsnd->sentPtr = sentPtr; - SpinLockRelease(&walsnd->mutex); - } + SpinLockAcquire(&MyWalSnd->mutex); + MyWalSnd->sentPtr = sentPtr; + SpinLockRelease(&MyWalSnd->mutex); SyncRepInitConfig(); @@ -1093,13 +1089,9 @@ StartLogicalReplication(StartReplicationCmd *cmd) sentPtr = MyReplicationSlot->data.confirmed_flush; /* Also update the sent position status in shared memory */ - { - WalSnd *walsnd = MyWalSnd; - - SpinLockAcquire(&walsnd->mutex); - walsnd->sentPtr = MyReplicationSlot->data.restart_lsn; - SpinLockRelease(&walsnd->mutex); - } + SpinLockAcquire(&MyWalSnd->mutex); + MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn; + SpinLockRelease(&MyWalSnd->mutex); replication_active = true; @@ -2892,10 +2884,12 @@ WalSndRqstFileReload(void) { WalSnd *walsnd = &WalSndCtl->walsnds[i]; - if (walsnd->pid == 0) - continue; - SpinLockAcquire(&walsnd->mutex); + if (walsnd->pid == 0) + { + SpinLockRelease(&walsnd->mutex); + continue; + } walsnd->needreload = true; SpinLockRelease(&walsnd->mutex); } @@ -3071,7 +3065,6 @@ WalSndWaitStopping(void) for (i = 0; i < max_wal_senders; i++) { - WalSndState state; WalSnd *walsnd = &WalSndCtl->walsnds[i]; SpinLockAcquire(&walsnd->mutex); @@ -3082,14 +3075,13 @@ WalSndWaitStopping(void) continue; } - state = walsnd->state; - SpinLockRelease(&walsnd->mutex); - - if (state != WALSNDSTATE_STOPPING) + if (walsnd->state != WALSNDSTATE_STOPPING) { all_stopped = false; + SpinLockRelease(&walsnd->mutex); break; } + SpinLockRelease(&walsnd->mutex); } /* safe to leave if confirmation is done for all WAL senders */ @@ -3210,14 +3202,18 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) TimeOffset flushLag; TimeOffset applyLag; int priority; + int pid; WalSndState state; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; - if (walsnd->pid == 0) - continue; - SpinLockAcquire(&walsnd->mutex); + if (walsnd->pid == 0) + { + SpinLockRelease(&walsnd->mutex); + continue; + } + pid = walsnd->pid; sentPtr = walsnd->sentPtr; state = walsnd->state; write = walsnd->write; @@ -3230,7 +3226,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) SpinLockRelease(&walsnd->mutex); memset(nulls, 0, sizeof(nulls)); - values[0] = Int32GetDatum(walsnd->pid); + values[0] = Int32GetDatum(pid); if (!superuser()) { @@ -3265,7 +3261,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) * which always returns an invalid flush location, as an * asynchronous standby. */ - priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority; + priority = XLogRecPtrIsInvalid(flush) ? 0 : priority; if (writeLag < 0) nulls[6] = true; diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index c8652dbd489..9a8b2e207ec 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -114,6 +114,9 @@ typedef struct */ char slotname[NAMEDATALEN]; + /* set true once conninfo is ready to display (obfuscated pwds etc) */ + bool ready_to_display; + slock_t mutex; /* locks shared variables shown above */ /* @@ -122,9 +125,6 @@ typedef struct */ bool force_reply; - /* set true once conninfo is ready to display (obfuscated pwds etc) */ - bool ready_to_display; - /* * Latch used by startup process to wake up walreceiver after telling it * where to start streaming (after setting receiveStart and diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 0aa80d5c3e2..17c68cba235 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -30,10 +30,17 @@ typedef enum WalSndState /* * Each walsender has a WalSnd struct in shared memory. + * + * This struct is protected by 'mutex', with two exceptions: one is + * sync_standby_priority as noted below. The other exception is that some + * members are only written by the walsender process itself, and thus that + * process is free to read those members without holding spinlock. pid and + * needreload always require the spinlock to be held for all accesses. */ typedef struct WalSnd { - pid_t pid; /* this walsender's process id, or 0 */ + pid_t pid; /* this walsender's PID, or 0 if not active */ + WalSndState state; /* this walsender's state */ XLogRecPtr sentPtr; /* WAL has been sent up to this point */ bool needreload; /* does currently-open file need to be