|
|
|
@ -24,11 +24,17 @@
|
|
|
|
|
* are treated as not a crash but approximately normal termination;
|
|
|
|
|
* the walsender will exit quickly without sending any more XLOG records.
|
|
|
|
|
*
|
|
|
|
|
* If the server is shut down, postmaster sends us SIGUSR2 after all
|
|
|
|
|
* regular backends have exited and the shutdown checkpoint has been written.
|
|
|
|
|
* This instructs walsender to send any outstanding WAL, including the
|
|
|
|
|
* shutdown checkpoint record, wait for it to be replicated to the standby,
|
|
|
|
|
* and then exit.
|
|
|
|
|
* If the server is shut down, checkpointer sends us
|
|
|
|
|
* PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
|
|
|
|
|
* the backend is idle or runs an SQL query this causes the backend to
|
|
|
|
|
* shutdown, if logical replication is in progress all existing WAL records
|
|
|
|
|
* are processed followed by a shutdown. Otherwise this causes the walsender
|
|
|
|
|
* to switch to the "stopping" state. In this state, the walsender will reject
|
|
|
|
|
* any further replication commands. The checkpointer begins the shutdown
|
|
|
|
|
* checkpoint once all walsenders are confirmed as stopping. When the shutdown
|
|
|
|
|
* checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
|
|
|
|
|
* walsender to send any outstanding WAL, including the shutdown checkpoint
|
|
|
|
|
* record, wait for it to be replicated to the standby, and then exit.
|
|
|
|
|
*
|
|
|
|
|
*
|
|
|
|
|
* Portions Copyright (c) 2010-2014, PostgreSQL Global Development Group
|
|
|
|
@ -169,13 +175,14 @@ static bool WalSndCaughtUp = false;
|
|
|
|
|
|
|
|
|
|
/* Flags set by signal handlers for later service in main loop */
|
|
|
|
|
static volatile sig_atomic_t got_SIGHUP = false;
|
|
|
|
|
static volatile sig_atomic_t walsender_ready_to_stop = false;
|
|
|
|
|
static volatile sig_atomic_t got_SIGUSR2 = false;
|
|
|
|
|
static volatile sig_atomic_t got_STOPPING = false;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* This is set while we are streaming. When not set, SIGUSR2 signal will be
|
|
|
|
|
* handled like SIGTERM. When set, the main loop is responsible for checking
|
|
|
|
|
* walsender_ready_to_stop and terminating when it's set (after streaming any
|
|
|
|
|
* remaining WAL).
|
|
|
|
|
* This is set while we are streaming. When not set
|
|
|
|
|
* PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
|
|
|
|
|
* the main loop is responsible for checking got_STOPPING and terminating when
|
|
|
|
|
* it's set (after streaming any remaining WAL).
|
|
|
|
|
*/
|
|
|
|
|
static volatile sig_atomic_t replication_active = false;
|
|
|
|
|
|
|
|
|
@ -261,7 +268,8 @@ WalSndErrorCleanup()
|
|
|
|
|
ReplicationSlotRelease();
|
|
|
|
|
|
|
|
|
|
replication_active = false;
|
|
|
|
|
if (walsender_ready_to_stop)
|
|
|
|
|
|
|
|
|
|
if (got_STOPPING || got_SIGUSR2)
|
|
|
|
|
proc_exit(0);
|
|
|
|
|
|
|
|
|
|
/* Revert back to startup state */
|
|
|
|
@ -669,7 +677,7 @@ StartReplication(StartReplicationCmd *cmd)
|
|
|
|
|
WalSndLoop(XLogSendPhysical);
|
|
|
|
|
|
|
|
|
|
replication_active = false;
|
|
|
|
|
if (walsender_ready_to_stop)
|
|
|
|
|
if (got_STOPPING)
|
|
|
|
|
proc_exit(0);
|
|
|
|
|
WalSndSetState(WALSNDSTATE_STARTUP);
|
|
|
|
|
|
|
|
|
@ -959,7 +967,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
|
|
|
|
|
{
|
|
|
|
|
ereport(LOG,
|
|
|
|
|
(errmsg("terminating walsender process after promotion")));
|
|
|
|
|
walsender_ready_to_stop = true;
|
|
|
|
|
got_STOPPING = true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
WalSndSetState(WALSNDSTATE_CATCHUP);
|
|
|
|
@ -1014,7 +1022,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
|
|
|
|
|
ReplicationSlotRelease();
|
|
|
|
|
|
|
|
|
|
replication_active = false;
|
|
|
|
|
if (walsender_ready_to_stop)
|
|
|
|
|
if (got_STOPPING)
|
|
|
|
|
proc_exit(0);
|
|
|
|
|
WalSndSetState(WALSNDSTATE_STARTUP);
|
|
|
|
|
|
|
|
|
@ -1193,6 +1201,14 @@ WalSndWaitForWal(XLogRecPtr loc)
|
|
|
|
|
/* Clear any already-pending wakeups */
|
|
|
|
|
ResetLatch(&MyWalSnd->latch);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* If we're shutting down, trigger pending WAL to be written out,
|
|
|
|
|
* otherwise we'd possibly end up waiting for WAL that never gets
|
|
|
|
|
* written, because walwriter has shut down already.
|
|
|
|
|
*/
|
|
|
|
|
if (got_STOPPING)
|
|
|
|
|
XLogBackgroundFlush();
|
|
|
|
|
|
|
|
|
|
/* Update our idea of the currently flushed position. */
|
|
|
|
|
if (!RecoveryInProgress())
|
|
|
|
|
RecentFlushPtr = GetFlushRecPtr();
|
|
|
|
@ -1208,7 +1224,7 @@ WalSndWaitForWal(XLogRecPtr loc)
|
|
|
|
|
* RecentFlushPtr, so we can send all remaining data before shutting
|
|
|
|
|
* down.
|
|
|
|
|
*/
|
|
|
|
|
if (walsender_ready_to_stop)
|
|
|
|
|
if (got_STOPPING)
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
@ -1281,6 +1297,22 @@ exec_replication_command(const char *cmd_string)
|
|
|
|
|
MemoryContext cmd_context;
|
|
|
|
|
MemoryContext old_context;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* If WAL sender has been told that shutdown is getting close, switch its
|
|
|
|
|
* status accordingly to handle the next replication commands correctly.
|
|
|
|
|
*/
|
|
|
|
|
if (got_STOPPING)
|
|
|
|
|
WalSndSetState(WALSNDSTATE_STOPPING);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Throw error if in stopping mode. We need prevent commands that could
|
|
|
|
|
* generate WAL while the shutdown checkpoint is being written. To be
|
|
|
|
|
* safe, we just prohibit all new commands.
|
|
|
|
|
*/
|
|
|
|
|
if (MyWalSnd->state == WALSNDSTATE_STOPPING)
|
|
|
|
|
ereport(ERROR,
|
|
|
|
|
(errmsg("cannot execute new commands while WAL sender is in stopping mode")));
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
|
|
|
|
|
* command arrives. Clean up the old stuff if there's anything.
|
|
|
|
@ -1875,7 +1907,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
|
|
|
|
|
* normal termination at shutdown, or a promotion, the walsender
|
|
|
|
|
* is not sure which.
|
|
|
|
|
*/
|
|
|
|
|
if (walsender_ready_to_stop)
|
|
|
|
|
if (got_SIGUSR2)
|
|
|
|
|
WalSndDone(send_data);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -2194,6 +2226,10 @@ XLogSendPhysical(void)
|
|
|
|
|
XLogRecPtr endptr;
|
|
|
|
|
Size nbytes;
|
|
|
|
|
|
|
|
|
|
/* If requested switch the WAL sender to the stopping state. */
|
|
|
|
|
if (got_STOPPING)
|
|
|
|
|
WalSndSetState(WALSNDSTATE_STOPPING);
|
|
|
|
|
|
|
|
|
|
if (streamingDoneSending)
|
|
|
|
|
{
|
|
|
|
|
WalSndCaughtUp = true;
|
|
|
|
@ -2454,7 +2490,16 @@ XLogSendLogical(void)
|
|
|
|
|
* point, then we're caught up.
|
|
|
|
|
*/
|
|
|
|
|
if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
|
|
|
|
|
{
|
|
|
|
|
WalSndCaughtUp = true;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Have WalSndLoop() terminate the connection in an orderly
|
|
|
|
|
* manner, after writing out all the pending data.
|
|
|
|
|
*/
|
|
|
|
|
if (got_STOPPING)
|
|
|
|
|
got_SIGUSR2 = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Update shared memory status */
|
|
|
|
@ -2567,6 +2612,30 @@ WalSndRqstFileReload(void)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Handle PROCSIG_WALSND_INIT_STOPPING signal.
|
|
|
|
|
*/
|
|
|
|
|
void
|
|
|
|
|
HandleWalSndInitStopping(void)
|
|
|
|
|
{
|
|
|
|
|
Assert(am_walsender);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* If replication has not yet started, die like with SIGTERM. If
|
|
|
|
|
* replication is active, only set a flag and wake up the main loop. It
|
|
|
|
|
* will send any outstanding WAL, wait for it to be replicated to the
|
|
|
|
|
* standby, and then exit gracefully.
|
|
|
|
|
*/
|
|
|
|
|
if (!replication_active)
|
|
|
|
|
kill(MyProcPid, SIGTERM);
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
got_STOPPING = true;
|
|
|
|
|
if (MyWalSnd)
|
|
|
|
|
SetLatch(&MyWalSnd->latch);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* SIGHUP: set flag to re-read config file at next convenient time */
|
|
|
|
|
static void
|
|
|
|
|
WalSndSigHupHandler(SIGNAL_ARGS)
|
|
|
|
@ -2580,22 +2649,16 @@ WalSndSigHupHandler(SIGNAL_ARGS)
|
|
|
|
|
errno = save_errno;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* SIGUSR2: set flag to do a last cycle and shut down afterwards */
|
|
|
|
|
/*
|
|
|
|
|
* SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
|
|
|
|
|
* sender should already have been switched to WALSNDSTATE_STOPPING at
|
|
|
|
|
* this point.
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
WalSndLastCycleHandler(SIGNAL_ARGS)
|
|
|
|
|
{
|
|
|
|
|
int save_errno = errno;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* If replication has not yet started, die like with SIGTERM. If
|
|
|
|
|
* replication is active, only set a flag and wake up the main loop. It
|
|
|
|
|
* will send any outstanding WAL, wait for it to be replicated to the
|
|
|
|
|
* standby, and then exit gracefully.
|
|
|
|
|
*/
|
|
|
|
|
if (!replication_active)
|
|
|
|
|
kill(MyProcPid, SIGTERM);
|
|
|
|
|
|
|
|
|
|
walsender_ready_to_stop = true;
|
|
|
|
|
if (MyWalSnd)
|
|
|
|
|
SetLatch(&MyWalSnd->latch);
|
|
|
|
|
|
|
|
|
@ -2681,6 +2744,77 @@ WalSndWakeup(void)
|
|
|
|
|
SetLatch(&WalSndCtl->walsnds[i].latch);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Signal all walsenders to move to stopping state.
|
|
|
|
|
*
|
|
|
|
|
* This will trigger walsenders to move to a state where no further WAL can be
|
|
|
|
|
* generated. See this file's header for details.
|
|
|
|
|
*/
|
|
|
|
|
void
|
|
|
|
|
WalSndInitStopping(void)
|
|
|
|
|
{
|
|
|
|
|
int i;
|
|
|
|
|
|
|
|
|
|
for (i = 0; i < max_wal_senders; i++)
|
|
|
|
|
{
|
|
|
|
|
WalSnd *walsnd = &WalSndCtl->walsnds[i];
|
|
|
|
|
pid_t pid;
|
|
|
|
|
|
|
|
|
|
SpinLockAcquire(&walsnd->mutex);
|
|
|
|
|
pid = walsnd->pid;
|
|
|
|
|
SpinLockRelease(&walsnd->mutex);
|
|
|
|
|
|
|
|
|
|
if (pid == 0)
|
|
|
|
|
continue;
|
|
|
|
|
|
|
|
|
|
SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Wait that all the WAL senders have quit or reached the stopping state. This
|
|
|
|
|
* is used by the checkpointer to control when the shutdown checkpoint can
|
|
|
|
|
* safely be performed.
|
|
|
|
|
*/
|
|
|
|
|
void
|
|
|
|
|
WalSndWaitStopping(void)
|
|
|
|
|
{
|
|
|
|
|
for (;;)
|
|
|
|
|
{
|
|
|
|
|
int i;
|
|
|
|
|
bool all_stopped = true;
|
|
|
|
|
|
|
|
|
|
for (i = 0; i < max_wal_senders; i++)
|
|
|
|
|
{
|
|
|
|
|
WalSndState state;
|
|
|
|
|
WalSnd *walsnd = &WalSndCtl->walsnds[i];
|
|
|
|
|
|
|
|
|
|
SpinLockAcquire(&walsnd->mutex);
|
|
|
|
|
|
|
|
|
|
if (walsnd->pid == 0)
|
|
|
|
|
{
|
|
|
|
|
SpinLockRelease(&walsnd->mutex);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
state = walsnd->state;
|
|
|
|
|
SpinLockRelease(&walsnd->mutex);
|
|
|
|
|
|
|
|
|
|
if (state != WALSNDSTATE_STOPPING)
|
|
|
|
|
{
|
|
|
|
|
all_stopped = false;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* safe to leave if confirmation is done for all WAL senders */
|
|
|
|
|
if (all_stopped)
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
pg_usleep(10000L); /* wait for 10 msec */
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Set state for current walsender (only called in walsender) */
|
|
|
|
|
void
|
|
|
|
|
WalSndSetState(WalSndState state)
|
|
|
|
@ -2715,6 +2849,8 @@ WalSndGetStateString(WalSndState state)
|
|
|
|
|
return "catchup";
|
|
|
|
|
case WALSNDSTATE_STREAMING:
|
|
|
|
|
return "streaming";
|
|
|
|
|
case WALSNDSTATE_STOPPING:
|
|
|
|
|
return "stopping";
|
|
|
|
|
}
|
|
|
|
|
return "UNKNOWN";
|
|
|
|
|
}
|
|
|
|
|