From e174f699c476a4cc01875211a5f43e57c3190a37 Mon Sep 17 00:00:00 2001 From: Michael Paquier Date: Fri, 1 Nov 2019 22:51:05 +0900 Subject: [PATCH] Add some assertions in syncrep.c A couple of routines assume that the LWLock SyncRepLock needs to be taken, so add a couple of assertions to be sure of that. Also, when waiting for a given LSN at transaction commit, the code implied that the syncrep queue cleanup happens while holding interrupts, but the code never checked after that. Author: Michael Paquier Reviewed-by: Fujii Masao, Kyotaro Horiguchi, Dongming Liu Discussion: https://postgr.es/m/a0806273-8bbb-43b3-bbe1-c45a58f6ae21.lingce.ldm@alibaba-inc.com --- src/backend/replication/syncrep.c | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 59a3d5bda6b..7bf7a1b7f8b 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -149,6 +149,13 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) const char *old_status; int mode; + /* + * This should be called while holding interrupts during a transaction + * commit to prevent the follow-up shared memory queue cleanups to be + * influenced by external interruptions. + */ + Assert(InterruptHoldoffCount > 0); + /* Cap the level for anything other than commit to remote flush only. */ if (commit) mode = SyncRepWaitMode; @@ -516,6 +523,8 @@ SyncRepReleaseWaiters(void) /* * Calculate the synced Write, Flush and Apply positions among sync standbys. * + * The caller must hold SyncRepLock. + * * Return false if the number of sync standbys is less than * synchronous_standby_names specifies. Otherwise return true and * store the positions into *writePtr, *flushPtr and *applyPtr. @@ -529,6 +538,8 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, { List *sync_standbys; + Assert(LWLockHeldByMe(SyncRepLock)); + *writePtr = InvalidXLogRecPtr; *flushPtr = InvalidXLogRecPtr; *applyPtr = InvalidXLogRecPtr; @@ -688,6 +699,8 @@ cmp_lsn(const void *a, const void *b) List * SyncRepGetSyncStandbys(bool *am_sync) { + Assert(LWLockHeldByMe(SyncRepLock)); + /* Set default result */ if (am_sync != NULL) *am_sync = false; @@ -992,7 +1005,7 @@ SyncRepGetStandbyPriority(void) * Pass all = true to wake whole queue; otherwise, just wake up to * the walsender's LSN. * - * Must hold SyncRepLock. + * The caller must hold SyncRepLock in exclusive mode. */ static int SyncRepWakeQueue(bool all, int mode) @@ -1003,6 +1016,7 @@ SyncRepWakeQueue(bool all, int mode) int numprocs = 0; Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); + Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE)); Assert(SyncRepQueueIsOrderedByLSN(mode)); proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),