mirror of
https://github.com/postgres/postgres.git
synced 2025-07-05 07:21:24 +03:00
Hot Standby feedback for avoidance of cleanup conflicts on standby.
Standby optionally sends back information about oldestXmin of queries which is then checked and applied to the WALSender's proc->xmin. GetOldestXmin() is modified slightly to agree with GetSnapshotData(), so that all backends on primary include WALSender within their snapshots. Note this does nothing to change the snapshot xmin on either master or standby. Feedback piggybacks on the standby reply message. vacuum_defer_cleanup_age is no longer used on standby, though parameter still exists on primary, since some use cases still exist. Simon Riggs, review comments from Fujii Masao, Heikki Linnakangas, Robert Haas
This commit is contained in:
@ -158,6 +158,11 @@ static XLogRecPtr LastRec;
|
||||
* known, need to check the shared state".
|
||||
*/
|
||||
static bool LocalRecoveryInProgress = true;
|
||||
/*
|
||||
* Local copy of SharedHotStandbyActive variable. False actually means "not
|
||||
* known, need to check the shared state".
|
||||
*/
|
||||
static bool LocalHotStandbyActive = false;
|
||||
|
||||
/*
|
||||
* Local state for XLogInsertAllowed():
|
||||
@ -405,6 +410,12 @@ typedef struct XLogCtlData
|
||||
*/
|
||||
bool SharedRecoveryInProgress;
|
||||
|
||||
/*
|
||||
* SharedHotStandbyActive indicates if we're still in crash or archive
|
||||
* recovery. Protected by info_lck.
|
||||
*/
|
||||
bool SharedHotStandbyActive;
|
||||
|
||||
/*
|
||||
* recoveryWakeupLatch is used to wake up the startup process to
|
||||
* continue WAL replay, if it is waiting for WAL to arrive or failover
|
||||
@ -4917,6 +4928,7 @@ XLOGShmemInit(void)
|
||||
*/
|
||||
XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
|
||||
XLogCtl->SharedRecoveryInProgress = true;
|
||||
XLogCtl->SharedHotStandbyActive = false;
|
||||
XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
|
||||
SpinLockInit(&XLogCtl->info_lck);
|
||||
InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
|
||||
@ -6790,8 +6802,6 @@ StartupXLOG(void)
|
||||
static void
|
||||
CheckRecoveryConsistency(void)
|
||||
{
|
||||
static bool backendsAllowed = false;
|
||||
|
||||
/*
|
||||
* Have we passed our safe starting point?
|
||||
*/
|
||||
@ -6811,11 +6821,19 @@ CheckRecoveryConsistency(void)
|
||||
* enabling connections.
|
||||
*/
|
||||
if (standbyState == STANDBY_SNAPSHOT_READY &&
|
||||
!backendsAllowed &&
|
||||
!LocalHotStandbyActive &&
|
||||
reachedMinRecoveryPoint &&
|
||||
IsUnderPostmaster)
|
||||
{
|
||||
backendsAllowed = true;
|
||||
/* use volatile pointer to prevent code rearrangement */
|
||||
volatile XLogCtlData *xlogctl = XLogCtl;
|
||||
|
||||
SpinLockAcquire(&xlogctl->info_lck);
|
||||
xlogctl->SharedHotStandbyActive = true;
|
||||
SpinLockRelease(&xlogctl->info_lck);
|
||||
|
||||
LocalHotStandbyActive = true;
|
||||
|
||||
SendPostmasterSignal(PMSIGNAL_BEGIN_HOT_STANDBY);
|
||||
}
|
||||
}
|
||||
@ -6862,6 +6880,38 @@ RecoveryInProgress(void)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Is HotStandby active yet? This is only important in special backends
|
||||
* since normal backends won't ever be able to connect until this returns
|
||||
* true. Postmaster knows this by way of signal, not via shared memory.
|
||||
*
|
||||
* Unlike testing standbyState, this works in any process that's connected to
|
||||
* shared memory.
|
||||
*/
|
||||
bool
|
||||
HotStandbyActive(void)
|
||||
{
|
||||
/*
|
||||
* We check shared state each time only until Hot Standby is active. We
|
||||
* can't de-activate Hot Standby, so there's no need to keep checking after
|
||||
* the shared variable has once been seen true.
|
||||
*/
|
||||
if (LocalHotStandbyActive)
|
||||
return true;
|
||||
else
|
||||
{
|
||||
/* use volatile pointer to prevent code rearrangement */
|
||||
volatile XLogCtlData *xlogctl = XLogCtl;
|
||||
|
||||
/* spinlock is essential on machines with weak memory ordering! */
|
||||
SpinLockAcquire(&xlogctl->info_lck);
|
||||
LocalHotStandbyActive = xlogctl->SharedHotStandbyActive;
|
||||
SpinLockRelease(&xlogctl->info_lck);
|
||||
|
||||
return LocalHotStandbyActive;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Is this process allowed to insert new WAL records?
|
||||
*
|
||||
|
@ -38,6 +38,7 @@
|
||||
#include <signal.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "access/transam.h"
|
||||
#include "access/xlog_internal.h"
|
||||
#include "libpq/pqsignal.h"
|
||||
#include "miscadmin.h"
|
||||
@ -45,6 +46,7 @@
|
||||
#include "replication/walreceiver.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/pmsignal.h"
|
||||
#include "storage/procarray.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/memutils.h"
|
||||
@ -56,6 +58,7 @@ bool am_walreceiver;
|
||||
|
||||
/* GUC variable */
|
||||
int wal_receiver_status_interval;
|
||||
bool hot_standby_feedback;
|
||||
|
||||
/* libpqreceiver hooks to these when loaded */
|
||||
walrcv_connect_type walrcv_connect = NULL;
|
||||
@ -610,16 +613,43 @@ XLogWalRcvSendReply(void)
|
||||
wal_receiver_status_interval * 1000))
|
||||
return;
|
||||
|
||||
/* Construct a new message. */
|
||||
/* Construct a new message */
|
||||
reply_message.write = LogstreamResult.Write;
|
||||
reply_message.flush = LogstreamResult.Flush;
|
||||
reply_message.apply = GetXLogReplayRecPtr();
|
||||
reply_message.sendTime = now;
|
||||
|
||||
elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
|
||||
/*
|
||||
* Get the OldestXmin and its associated epoch
|
||||
*/
|
||||
if (hot_standby_feedback && HotStandbyActive())
|
||||
{
|
||||
TransactionId nextXid;
|
||||
uint32 nextEpoch;
|
||||
|
||||
reply_message.xmin = GetOldestXmin(true, false);
|
||||
|
||||
/*
|
||||
* Get epoch and adjust if nextXid and oldestXmin are different
|
||||
* sides of the epoch boundary.
|
||||
*/
|
||||
GetNextXidAndEpoch(&nextXid, &nextEpoch);
|
||||
if (nextXid < reply_message.xmin)
|
||||
nextEpoch--;
|
||||
reply_message.epoch = nextEpoch;
|
||||
}
|
||||
else
|
||||
{
|
||||
reply_message.xmin = InvalidTransactionId;
|
||||
reply_message.epoch = 0;
|
||||
}
|
||||
|
||||
elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
|
||||
reply_message.write.xlogid, reply_message.write.xrecoff,
|
||||
reply_message.flush.xlogid, reply_message.flush.xrecoff,
|
||||
reply_message.apply.xlogid, reply_message.apply.xrecoff);
|
||||
reply_message.apply.xlogid, reply_message.apply.xrecoff,
|
||||
reply_message.xmin,
|
||||
reply_message.epoch);
|
||||
|
||||
/* Prepend with the message type and send it. */
|
||||
buf[0] = 'r';
|
||||
|
@ -53,6 +53,7 @@
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/pmsignal.h"
|
||||
#include "storage/proc.h"
|
||||
#include "storage/procarray.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/guc.h"
|
||||
@ -502,6 +503,7 @@ ProcessStandbyReplyMessage(void)
|
||||
{
|
||||
StandbyReplyMessage reply;
|
||||
char msgtype;
|
||||
TransactionId newxmin = InvalidTransactionId;
|
||||
|
||||
resetStringInfo(&reply_message);
|
||||
|
||||
@ -531,10 +533,12 @@ ProcessStandbyReplyMessage(void)
|
||||
|
||||
pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
|
||||
|
||||
elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X ",
|
||||
elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
|
||||
reply.write.xlogid, reply.write.xrecoff,
|
||||
reply.flush.xlogid, reply.flush.xrecoff,
|
||||
reply.apply.xlogid, reply.apply.xrecoff);
|
||||
reply.apply.xlogid, reply.apply.xrecoff,
|
||||
reply.xmin,
|
||||
reply.epoch);
|
||||
|
||||
/*
|
||||
* Update shared state for this WalSender process
|
||||
@ -550,6 +554,69 @@ ProcessStandbyReplyMessage(void)
|
||||
walsnd->apply = reply.apply;
|
||||
SpinLockRelease(&walsnd->mutex);
|
||||
}
|
||||
|
||||
/*
|
||||
* Update the WalSender's proc xmin to allow it to be visible
|
||||
* to snapshots. This will hold back the removal of dead rows
|
||||
* and thereby prevent the generation of cleanup conflicts
|
||||
* on the standby server.
|
||||
*/
|
||||
if (TransactionIdIsValid(reply.xmin))
|
||||
{
|
||||
TransactionId nextXid;
|
||||
uint32 nextEpoch;
|
||||
bool epochOK;
|
||||
|
||||
GetNextXidAndEpoch(&nextXid, &nextEpoch);
|
||||
|
||||
/*
|
||||
* Epoch of oldestXmin should be same as standby or
|
||||
* if the counter has wrapped, then one less than reply.
|
||||
*/
|
||||
if (reply.xmin <= nextXid)
|
||||
{
|
||||
if (reply.epoch == nextEpoch)
|
||||
epochOK = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (nextEpoch > 0 && reply.epoch == nextEpoch - 1)
|
||||
epochOK = true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Feedback from standby must not go backwards, nor should it go
|
||||
* forwards further than our most recent xid.
|
||||
*/
|
||||
if (epochOK && TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
|
||||
{
|
||||
if (!TransactionIdIsValid(MyProc->xmin))
|
||||
{
|
||||
TransactionId oldestXmin = GetOldestXmin(true, true);
|
||||
if (TransactionIdPrecedes(oldestXmin, reply.xmin))
|
||||
newxmin = reply.xmin;
|
||||
else
|
||||
newxmin = oldestXmin;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (TransactionIdPrecedes(MyProc->xmin, reply.xmin))
|
||||
newxmin = reply.xmin;
|
||||
else
|
||||
newxmin = MyProc->xmin; /* stay the same */
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Grab the ProcArrayLock to set xmin, or invalidate for bad reply
|
||||
*/
|
||||
if (MyProc->xmin != newxmin)
|
||||
{
|
||||
LWLockAcquire(ProcArrayLock, LW_SHARED);
|
||||
MyProc->xmin = newxmin;
|
||||
LWLockRelease(ProcArrayLock);
|
||||
}
|
||||
}
|
||||
|
||||
/* Main loop of walsender process */
|
||||
|
@ -1034,7 +1034,9 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
|
||||
if (ignoreVacuum && (proc->vacuumFlags & PROC_IN_VACUUM))
|
||||
continue;
|
||||
|
||||
if (allDbs || proc->databaseId == MyDatabaseId)
|
||||
if (allDbs ||
|
||||
proc->databaseId == MyDatabaseId ||
|
||||
proc->databaseId == 0) /* include WalSender */
|
||||
{
|
||||
/* Fetch xid just once - see GetNewTransactionId */
|
||||
TransactionId xid = proc->xid;
|
||||
@ -1066,28 +1068,35 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
|
||||
*/
|
||||
TransactionId kaxmin = KnownAssignedXidsGetOldestXmin();
|
||||
|
||||
LWLockRelease(ProcArrayLock);
|
||||
|
||||
if (TransactionIdIsNormal(kaxmin) &&
|
||||
TransactionIdPrecedes(kaxmin, result))
|
||||
result = kaxmin;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* No other information needed, so release the lock immediately.
|
||||
*/
|
||||
LWLockRelease(ProcArrayLock);
|
||||
|
||||
LWLockRelease(ProcArrayLock);
|
||||
|
||||
/*
|
||||
* Compute the cutoff XID, being careful not to generate a "permanent"
|
||||
* XID.
|
||||
*
|
||||
* vacuum_defer_cleanup_age provides some additional "slop" for the
|
||||
* benefit of hot standby queries on slave servers. This is quick and
|
||||
* dirty, and perhaps not all that useful unless the master has a
|
||||
* predictable transaction rate, but it's what we've got. Note that we
|
||||
* are assuming vacuum_defer_cleanup_age isn't large enough to cause
|
||||
* wraparound --- so guc.c should limit it to no more than the
|
||||
* xidStopLimit threshold in varsup.c.
|
||||
*/
|
||||
result -= vacuum_defer_cleanup_age;
|
||||
if (!TransactionIdIsNormal(result))
|
||||
result = FirstNormalTransactionId;
|
||||
/*
|
||||
* Compute the cutoff XID, being careful not to generate a "permanent"
|
||||
* XID. We need do this only on the primary, never on standby.
|
||||
*
|
||||
* vacuum_defer_cleanup_age provides some additional "slop" for the
|
||||
* benefit of hot standby queries on slave servers. This is quick and
|
||||
* dirty, and perhaps not all that useful unless the master has a
|
||||
* predictable transaction rate, but it's what we've got. Note that we
|
||||
* are assuming vacuum_defer_cleanup_age isn't large enough to cause
|
||||
* wraparound --- so guc.c should limit it to no more than the
|
||||
* xidStopLimit threshold in varsup.c.
|
||||
*/
|
||||
result -= vacuum_defer_cleanup_age;
|
||||
if (!TransactionIdIsNormal(result))
|
||||
result = FirstNormalTransactionId;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
@ -1278,6 +1278,15 @@ static struct config_bool ConfigureNamesBool[] =
|
||||
false, NULL, NULL
|
||||
},
|
||||
|
||||
{
|
||||
{"hot_standby_feedback", PGC_SIGHUP, WAL_STANDBY_SERVERS,
|
||||
gettext_noop("Allows feedback from a hot standby primary that will avoid query conflicts."),
|
||||
NULL
|
||||
},
|
||||
&hot_standby_feedback,
|
||||
false, NULL, NULL
|
||||
},
|
||||
|
||||
{
|
||||
{"allow_system_table_mods", PGC_POSTMASTER, DEVELOPER_OPTIONS,
|
||||
gettext_noop("Allows modifications of the structure of system tables."),
|
||||
|
@ -196,6 +196,7 @@
|
||||
|
||||
#hot_standby = off # "on" allows queries during recovery
|
||||
# (change requires restart)
|
||||
#hot_standby_feedback = off # info from standby to prevent query conflicts
|
||||
#max_standby_archive_delay = 30s # max delay before canceling queries
|
||||
# when reading WAL from archive;
|
||||
# -1 allows indefinite delay
|
||||
|
Reference in New Issue
Block a user