1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-30 11:03:19 +03:00

Back-patch assorted latch-related fixes.

Fix a whole bunch of signal handlers that had been hacked to do things that
might change errno, without adding the necessary save/restore logic for
errno.  Also make some minor fixes in unix_latch.c, and clean up bizarre
and unsafe scheme for disowning the process's latch.  While at it, rename
the PGPROC latch field to procLatch for consistency with 9.2.

Issues noted while reviewing a patch by Peter Geoghegan.
This commit is contained in:
Tom Lane
2011-08-10 12:20:45 -04:00
parent 74d099494c
commit 989f530d3f
9 changed files with 124 additions and 39 deletions

View File

@ -9881,34 +9881,50 @@ startupproc_quickdie(SIGNAL_ARGS)
static void static void
StartupProcSigUsr1Handler(SIGNAL_ARGS) StartupProcSigUsr1Handler(SIGNAL_ARGS)
{ {
int save_errno = errno;
latch_sigusr1_handler(); latch_sigusr1_handler();
errno = save_errno;
} }
/* SIGUSR2: set flag to finish recovery */ /* SIGUSR2: set flag to finish recovery */
static void static void
StartupProcTriggerHandler(SIGNAL_ARGS) StartupProcTriggerHandler(SIGNAL_ARGS)
{ {
int save_errno = errno;
promote_triggered = true; promote_triggered = true;
WakeupRecovery(); WakeupRecovery();
errno = save_errno;
} }
/* SIGHUP: set flag to re-read config file at next convenient time */ /* SIGHUP: set flag to re-read config file at next convenient time */
static void static void
StartupProcSigHupHandler(SIGNAL_ARGS) StartupProcSigHupHandler(SIGNAL_ARGS)
{ {
int save_errno = errno;
got_SIGHUP = true; got_SIGHUP = true;
WakeupRecovery(); WakeupRecovery();
errno = save_errno;
} }
/* SIGTERM: set flag to abort redo and exit */ /* SIGTERM: set flag to abort redo and exit */
static void static void
StartupProcShutdownHandler(SIGNAL_ARGS) StartupProcShutdownHandler(SIGNAL_ARGS)
{ {
int save_errno = errno;
if (in_restore_command) if (in_restore_command)
proc_exit(1); proc_exit(1);
else else
shutdown_requested = true; shutdown_requested = true;
WakeupRecovery(); WakeupRecovery();
errno = save_errno;
} }
/* Handle SIGHUP and SIGTERM signals of startup process */ /* Handle SIGHUP and SIGTERM signals of startup process */

View File

@ -135,9 +135,12 @@ DisownLatch(volatile Latch *latch)
* If the latch is already set, the function returns immediately. * If the latch is already set, the function returns immediately.
* *
* The 'timeout' is given in milliseconds, and -1 means wait forever. * The 'timeout' is given in milliseconds, and -1 means wait forever.
* On some platforms, signals cause the timeout to be restarted, so beware * On some platforms, signals do not interrupt the wait, or even
* that the function can sleep for several times longer than the specified * cause the timeout to be restarted, so beware that the function can sleep
* timeout. * for several times longer than the requested timeout. However, this
* difficulty is not so great as it seems, because the signal handlers for any
* signals that the caller should respond to ought to be programmed to end the
* wait by calling SetLatch. Ideally, the timeout parameter is vestigial.
* *
* The latch must be owned by the current process, ie. it must be a * The latch must be owned by the current process, ie. it must be a
* backend-local latch initialized with InitLatch, or a shared latch * backend-local latch initialized with InitLatch, or a shared latch
@ -228,6 +231,7 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, bool forRead,
{ {
if (errno == EINTR) if (errno == EINTR)
continue; continue;
waiting = false;
ereport(ERROR, ereport(ERROR,
(errcode_for_socket_access(), (errcode_for_socket_access(),
errmsg("select() failed: %m"))); errmsg("select() failed: %m")));
@ -255,6 +259,10 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, bool forRead,
* Sets a latch and wakes up anyone waiting on it. * Sets a latch and wakes up anyone waiting on it.
* *
* This is cheap if the latch is already set, otherwise not so much. * This is cheap if the latch is already set, otherwise not so much.
*
* NB: when calling this in a signal handler, be sure to save and restore
* errno around it. (That's standard practice in most signal handlers, of
* course, but we used to omit it in handlers that only set a flag.)
*/ */
void void
SetLatch(volatile Latch *latch) SetLatch(volatile Latch *latch)
@ -300,7 +308,10 @@ SetLatch(volatile Latch *latch)
if (owner_pid == 0) if (owner_pid == 0)
return; return;
else if (owner_pid == MyProcPid) else if (owner_pid == MyProcPid)
sendSelfPipeByte(); {
if (waiting)
sendSelfPipeByte();
}
else else
kill(owner_pid, SIGUSR1); kill(owner_pid, SIGUSR1);
} }
@ -332,7 +343,11 @@ ResetLatch(volatile Latch *latch)
* SetLatch uses SIGUSR1 to wake up the process waiting on the latch. * SetLatch uses SIGUSR1 to wake up the process waiting on the latch.
* *
* Wake up WaitLatch, if we're waiting. (We might not be, since SIGUSR1 is * Wake up WaitLatch, if we're waiting. (We might not be, since SIGUSR1 is
* overloaded for multiple purposes.) * overloaded for multiple purposes; or we might not have reached WaitLatch
* yet, in which case we don't need to fill the pipe either.)
*
* NB: when calling this in a signal handler, be sure to save and restore
* errno around it.
*/ */
void void
latch_sigusr1_handler(void) latch_sigusr1_handler(void)
@ -396,13 +411,19 @@ retry:
} }
} }
/* Read all available data from the self-pipe */ /*
* Read all available data from the self-pipe
*
* Note: this is only called when waiting = true. If it fails and doesn't
* return, it must reset that flag first (though ideally, this will never
* happen).
*/
static void static void
drainSelfPipe(void) drainSelfPipe(void)
{ {
/* /*
* There shouldn't normally be more than one byte in the pipe, or maybe a * There shouldn't normally be more than one byte in the pipe, or maybe a
* few more if multiple processes run SetLatch at the same instant. * few bytes if multiple processes run SetLatch at the same instant.
*/ */
char buf[16]; char buf[16];
int rc; int rc;
@ -417,9 +438,21 @@ drainSelfPipe(void)
else if (errno == EINTR) else if (errno == EINTR)
continue; /* retry */ continue; /* retry */
else else
{
waiting = false;
elog(ERROR, "read() on self-pipe failed: %m"); elog(ERROR, "read() on self-pipe failed: %m");
}
} }
else if (rc == 0) else if (rc == 0)
{
waiting = false;
elog(ERROR, "unexpected EOF on self-pipe"); elog(ERROR, "unexpected EOF on self-pipe");
}
else if (rc < sizeof(buf))
{
/* we successfully drained the pipe; no need to read() again */
break;
}
/* else buffer wasn't big enough, so read again */
} }
} }

View File

@ -111,9 +111,6 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
Assert(WalSndCtl != NULL); Assert(WalSndCtl != NULL);
/* Reset the latch before adding ourselves to the queue. */
ResetLatch(&MyProc->waitLatch);
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING); Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
@ -167,7 +164,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
int syncRepState; int syncRepState;
/* Must reset the latch before testing state. */ /* Must reset the latch before testing state. */
ResetLatch(&MyProc->waitLatch); ResetLatch(&MyProc->procLatch);
/* /*
* Try checking the state without the lock first. There's no * Try checking the state without the lock first. There's no
@ -247,11 +244,11 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
} }
/* /*
* Wait on latch for up to 60 seconds. This allows us to check for * Wait on latch for up to 10 seconds. This allows us to check for
* cancel/die signal or postmaster death regularly while waiting. Note * cancel/die signal or postmaster death regularly while waiting. Note
* that timeout here does not necessarily release from loop. * that timeout here does not necessarily release from loop.
*/ */
WaitLatch(&MyProc->waitLatch, 60000L); WaitLatch(&MyProc->procLatch, 10000L);
} }
/* /*
@ -322,7 +319,7 @@ SyncRepCancelWait(void)
} }
void void
SyncRepCleanupAtProcExit(int code, Datum arg) SyncRepCleanupAtProcExit(void)
{ {
if (!SHMQueueIsDetached(&(MyProc->syncRepLinks))) if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
{ {
@ -330,8 +327,6 @@ SyncRepCleanupAtProcExit(int code, Datum arg)
SHMQueueDelete(&(MyProc->syncRepLinks)); SHMQueueDelete(&(MyProc->syncRepLinks));
LWLockRelease(SyncRepLock); LWLockRelease(SyncRepLock);
} }
DisownLatch(&MyProc->waitLatch);
} }
/* /*
@ -560,9 +555,7 @@ SyncRepWakeQueue(bool all)
/* /*
* Wake only when we have set state and removed from queue. * Wake only when we have set state and removed from queue.
*/ */
Assert(SHMQueueIsDetached(&(thisproc->syncRepLinks))); SetLatch(&(thisproc->procLatch));
Assert(thisproc->syncRepState == SYNC_REP_WAIT_COMPLETE);
SetLatch(&(thisproc->waitLatch));
numprocs++; numprocs++;
} }

View File

@ -373,11 +373,15 @@ WalRcvSigHupHandler(SIGNAL_ARGS)
static void static void
WalRcvShutdownHandler(SIGNAL_ARGS) WalRcvShutdownHandler(SIGNAL_ARGS)
{ {
int save_errno = errno;
got_SIGTERM = true; got_SIGTERM = true;
/* Don't joggle the elbow of proc_exit */ /* Don't joggle the elbow of proc_exit */
if (!proc_exit_inprogress && WalRcvImmediateInterruptOK) if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
ProcessWalRcvInterrupts(); ProcessWalRcvInterrupts();
errno = save_errno;
} }
/* /*

View File

@ -1188,18 +1188,26 @@ XLogSend(char *msgbuf, bool *caughtup)
static void static void
WalSndSigHupHandler(SIGNAL_ARGS) WalSndSigHupHandler(SIGNAL_ARGS)
{ {
int save_errno = errno;
got_SIGHUP = true; got_SIGHUP = true;
if (MyWalSnd) if (MyWalSnd)
SetLatch(&MyWalSnd->latch); SetLatch(&MyWalSnd->latch);
errno = save_errno;
} }
/* SIGTERM: set flag to shut down */ /* SIGTERM: set flag to shut down */
static void static void
WalSndShutdownHandler(SIGNAL_ARGS) WalSndShutdownHandler(SIGNAL_ARGS)
{ {
int save_errno = errno;
walsender_shutdown_requested = true; walsender_shutdown_requested = true;
if (MyWalSnd) if (MyWalSnd)
SetLatch(&MyWalSnd->latch); SetLatch(&MyWalSnd->latch);
errno = save_errno;
} }
/* /*
@ -1238,16 +1246,24 @@ WalSndQuickDieHandler(SIGNAL_ARGS)
static void static void
WalSndXLogSendHandler(SIGNAL_ARGS) WalSndXLogSendHandler(SIGNAL_ARGS)
{ {
int save_errno = errno;
latch_sigusr1_handler(); latch_sigusr1_handler();
errno = save_errno;
} }
/* SIGUSR2: set flag to do a last cycle and shut down afterwards */ /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
static void static void
WalSndLastCycleHandler(SIGNAL_ARGS) WalSndLastCycleHandler(SIGNAL_ARGS)
{ {
int save_errno = errno;
walsender_ready_to_stop = true; walsender_ready_to_stop = true;
if (MyWalSnd) if (MyWalSnd)
SetLatch(&MyWalSnd->latch); SetLatch(&MyWalSnd->latch);
errno = save_errno;
} }
/* Set up signal handlers */ /* Set up signal handlers */

View File

@ -187,7 +187,8 @@ InitProcGlobal(void)
ProcGlobal->spins_per_delay = DEFAULT_SPINS_PER_DELAY; ProcGlobal->spins_per_delay = DEFAULT_SPINS_PER_DELAY;
/* /*
* Pre-create the PGPROC structures and create a semaphore for each. * Pre-create the PGPROC structures and create a semaphore and latch
* for each.
*/ */
procs = (PGPROC *) ShmemAlloc((MaxConnections) * sizeof(PGPROC)); procs = (PGPROC *) ShmemAlloc((MaxConnections) * sizeof(PGPROC));
if (!procs) if (!procs)
@ -198,9 +199,9 @@ InitProcGlobal(void)
for (i = 0; i < MaxConnections; i++) for (i = 0; i < MaxConnections; i++)
{ {
PGSemaphoreCreate(&(procs[i].sem)); PGSemaphoreCreate(&(procs[i].sem));
InitSharedLatch(&procs[i].procLatch);
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs; procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs;
ProcGlobal->freeProcs = &procs[i]; ProcGlobal->freeProcs = &procs[i];
InitSharedLatch(&procs[i].waitLatch);
} }
/* /*
@ -217,9 +218,9 @@ InitProcGlobal(void)
for (i = 0; i < autovacuum_max_workers + 1; i++) for (i = 0; i < autovacuum_max_workers + 1; i++)
{ {
PGSemaphoreCreate(&(procs[i].sem)); PGSemaphoreCreate(&(procs[i].sem));
InitSharedLatch(&procs[i].procLatch);
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs; procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs;
ProcGlobal->autovacFreeProcs = &procs[i]; ProcGlobal->autovacFreeProcs = &procs[i];
InitSharedLatch(&procs[i].waitLatch);
} }
/* /*
@ -230,7 +231,7 @@ InitProcGlobal(void)
{ {
AuxiliaryProcs[i].pid = 0; /* marks auxiliary proc as not in use */ AuxiliaryProcs[i].pid = 0; /* marks auxiliary proc as not in use */
PGSemaphoreCreate(&(AuxiliaryProcs[i].sem)); PGSemaphoreCreate(&(AuxiliaryProcs[i].sem));
InitSharedLatch(&procs[i].waitLatch); InitSharedLatch(&AuxiliaryProcs[i].procLatch);
} }
/* Create ProcStructLock spinlock, too */ /* Create ProcStructLock spinlock, too */
@ -306,8 +307,8 @@ InitProcess(void)
MarkPostmasterChildActive(); MarkPostmasterChildActive();
/* /*
* Initialize all fields of MyProc, except for the semaphore which was * Initialize all fields of MyProc, except for the semaphore and latch,
* prepared for us by InitProcGlobal. * which were prepared for us by InitProcGlobal.
*/ */
SHMQueueElemInit(&(MyProc->links)); SHMQueueElemInit(&(MyProc->links));
MyProc->waitStatus = STATUS_OK; MyProc->waitStatus = STATUS_OK;
@ -333,12 +334,17 @@ InitProcess(void)
SHMQueueInit(&(MyProc->myProcLocks[i])); SHMQueueInit(&(MyProc->myProcLocks[i]));
MyProc->recoveryConflictPending = false; MyProc->recoveryConflictPending = false;
/* Initialise for sync rep */ /* Initialize fields for sync rep */
MyProc->waitLSN.xlogid = 0; MyProc->waitLSN.xlogid = 0;
MyProc->waitLSN.xrecoff = 0; MyProc->waitLSN.xrecoff = 0;
MyProc->syncRepState = SYNC_REP_NOT_WAITING; MyProc->syncRepState = SYNC_REP_NOT_WAITING;
SHMQueueElemInit(&(MyProc->syncRepLinks)); SHMQueueElemInit(&(MyProc->syncRepLinks));
OwnLatch(&MyProc->waitLatch);
/*
* Acquire ownership of the PGPROC's latch, so that we can use WaitLatch.
* Note that there's no particular need to do ResetLatch here.
*/
OwnLatch(&MyProc->procLatch);
/* /*
* We might be reusing a semaphore that belonged to a failed process. So * We might be reusing a semaphore that belonged to a failed process. So
@ -379,7 +385,6 @@ InitProcessPhase2(void)
/* /*
* Arrange to clean that up at backend exit. * Arrange to clean that up at backend exit.
*/ */
on_shmem_exit(SyncRepCleanupAtProcExit, 0);
on_shmem_exit(RemoveProcFromArray, 0); on_shmem_exit(RemoveProcFromArray, 0);
} }
@ -454,8 +459,8 @@ InitAuxiliaryProcess(void)
SpinLockRelease(ProcStructLock); SpinLockRelease(ProcStructLock);
/* /*
* Initialize all fields of MyProc, except for the semaphore which was * Initialize all fields of MyProc, except for the semaphore and latch,
* prepared for us by InitProcGlobal. * which were prepared for us by InitProcGlobal.
*/ */
SHMQueueElemInit(&(MyProc->links)); SHMQueueElemInit(&(MyProc->links));
MyProc->waitStatus = STATUS_OK; MyProc->waitStatus = STATUS_OK;
@ -475,6 +480,12 @@ InitAuxiliaryProcess(void)
for (i = 0; i < NUM_LOCK_PARTITIONS; i++) for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
SHMQueueInit(&(MyProc->myProcLocks[i])); SHMQueueInit(&(MyProc->myProcLocks[i]));
/*
* Acquire ownership of the PGPROC's latch, so that we can use WaitLatch.
* Note that there's no particular need to do ResetLatch here.
*/
OwnLatch(&MyProc->procLatch);
/* /*
* We might be reusing a semaphore that belonged to a failed process. So * We might be reusing a semaphore that belonged to a failed process. So
* be careful and reinitialize its value here. (This is not strictly * be careful and reinitialize its value here. (This is not strictly
@ -677,6 +688,9 @@ ProcKill(int code, Datum arg)
Assert(MyProc != NULL); Assert(MyProc != NULL);
/* Make sure we're out of the sync rep lists */
SyncRepCleanupAtProcExit();
/* /*
* Release any LW locks I am holding. There really shouldn't be any, but * Release any LW locks I am holding. There really shouldn't be any, but
* it's cheap to check again before we cut the knees off the LWLock * it's cheap to check again before we cut the knees off the LWLock
@ -684,6 +698,9 @@ ProcKill(int code, Datum arg)
*/ */
LWLockReleaseAll(); LWLockReleaseAll();
/* Release ownership of the process's latch, too */
DisownLatch(&MyProc->procLatch);
SpinLockAcquire(ProcStructLock); SpinLockAcquire(ProcStructLock);
/* Return PGPROC structure (and semaphore) to appropriate freelist */ /* Return PGPROC structure (and semaphore) to appropriate freelist */
@ -739,6 +756,9 @@ AuxiliaryProcKill(int code, Datum arg)
/* Release any LW locks I am holding (see notes above) */ /* Release any LW locks I am holding (see notes above) */
LWLockReleaseAll(); LWLockReleaseAll();
/* Release ownership of the process's latch, too */
DisownLatch(&MyProc->procLatch);
SpinLockAcquire(ProcStructLock); SpinLockAcquire(ProcStructLock);
/* Mark auxiliary proc no longer in use */ /* Mark auxiliary proc no longer in use */

View File

@ -2643,11 +2643,12 @@ die(SIGNAL_ARGS)
InterruptHoldoffCount--; InterruptHoldoffCount--;
ProcessInterrupts(); ProcessInterrupts();
} }
/* Interrupt any sync rep wait which is currently in progress. */
SetLatch(&(MyProc->waitLatch));
} }
/* If we're still here, waken anything waiting on the process latch */
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno; errno = save_errno;
} }
@ -2684,11 +2685,12 @@ StatementCancelHandler(SIGNAL_ARGS)
InterruptHoldoffCount--; InterruptHoldoffCount--;
ProcessInterrupts(); ProcessInterrupts();
} }
/* Interrupt any sync rep wait which is currently in progress. */
SetLatch(&(MyProc->waitLatch));
} }
/* If we're still here, waken anything waiting on the process latch */
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno; errno = save_errno;
} }

View File

@ -33,8 +33,8 @@ extern char *SyncRepStandbyNames;
/* called by user backend */ /* called by user backend */
extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN); extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN);
/* callback at backend exit */ /* called at backend exit */
extern void SyncRepCleanupAtProcExit(int code, Datum arg); extern void SyncRepCleanupAtProcExit(void);
/* called by wal sender */ /* called by wal sender */
extern void SyncRepInitConfig(void); extern void SyncRepInitConfig(void);

View File

@ -118,13 +118,14 @@ struct PGPROC
LOCKMASK heldLocks; /* bitmask for lock types already held on this LOCKMASK heldLocks; /* bitmask for lock types already held on this
* lock object by this backend */ * lock object by this backend */
Latch procLatch; /* generic latch for process */
/* /*
* Info to allow us to wait for synchronous replication, if needed. * Info to allow us to wait for synchronous replication, if needed.
* waitLSN is InvalidXLogRecPtr if not waiting; set only by user backend. * waitLSN is InvalidXLogRecPtr if not waiting; set only by user backend.
* syncRepState must not be touched except by owning process or WALSender. * syncRepState must not be touched except by owning process or WALSender.
* syncRepLinks used only while holding SyncRepLock. * syncRepLinks used only while holding SyncRepLock.
*/ */
Latch waitLatch; /* allow us to wait for sync rep */
XLogRecPtr waitLSN; /* waiting for this LSN or higher */ XLogRecPtr waitLSN; /* waiting for this LSN or higher */
int syncRepState; /* wait state for sync rep */ int syncRepState; /* wait state for sync rep */
SHM_QUEUE syncRepLinks; /* list link if process is in syncrep queue */ SHM_QUEUE syncRepLinks; /* list link if process is in syncrep queue */