mirror of
https://github.com/postgres/postgres.git
synced 2025-08-30 06:01:21 +03:00
Report catalog_xmin separately in hot_standby_feedback
If the upstream walsender is using a physical replication slot, store the catalog_xmin in the slot's catalog_xmin field. If the upstream doesn't use a slot and has only a PGPROC entry behaviour doesn't change, as we store the combined xmin and catalog_xmin in the PGPROC entry. Author: Craig Ringer
This commit is contained in:
@@ -1175,8 +1175,8 @@ XLogWalRcvSendHSFeedback(bool immed)
|
||||
{
|
||||
TimestampTz now;
|
||||
TransactionId nextXid;
|
||||
uint32 nextEpoch;
|
||||
TransactionId xmin;
|
||||
uint32 xmin_epoch, catalog_xmin_epoch;
|
||||
TransactionId xmin, catalog_xmin;
|
||||
static TimestampTz sendTime = 0;
|
||||
/* initially true so we always send at least one feedback message */
|
||||
static bool master_has_standby_xmin = true;
|
||||
@@ -1221,29 +1221,54 @@ XLogWalRcvSendHSFeedback(bool immed)
|
||||
* everything else has been checked.
|
||||
*/
|
||||
if (hot_standby_feedback)
|
||||
xmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT);
|
||||
{
|
||||
TransactionId slot_xmin;
|
||||
|
||||
/*
|
||||
* Usually GetOldestXmin() would include both global replication slot
|
||||
* xmin and catalog_xmin in its calculations, but we want to derive
|
||||
* separate values for each of those. So we ask for an xmin that
|
||||
* excludes the catalog_xmin.
|
||||
*/
|
||||
xmin = GetOldestXmin(NULL,
|
||||
PROCARRAY_FLAGS_DEFAULT|PROCARRAY_SLOTS_XMIN);
|
||||
|
||||
ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
|
||||
|
||||
if (TransactionIdIsValid(slot_xmin) &&
|
||||
TransactionIdPrecedes(slot_xmin, xmin))
|
||||
xmin = slot_xmin;
|
||||
}
|
||||
else
|
||||
{
|
||||
xmin = InvalidTransactionId;
|
||||
catalog_xmin = InvalidTransactionId;
|
||||
}
|
||||
|
||||
/*
|
||||
* Get epoch and adjust if nextXid and oldestXmin are different sides of
|
||||
* the epoch boundary.
|
||||
*/
|
||||
GetNextXidAndEpoch(&nextXid, &nextEpoch);
|
||||
GetNextXidAndEpoch(&nextXid, &xmin_epoch);
|
||||
catalog_xmin_epoch = xmin_epoch;
|
||||
if (nextXid < xmin)
|
||||
nextEpoch--;
|
||||
xmin_epoch --;
|
||||
if (nextXid < catalog_xmin)
|
||||
catalog_xmin_epoch --;
|
||||
|
||||
elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
|
||||
xmin, nextEpoch);
|
||||
elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
|
||||
xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
|
||||
|
||||
/* Construct the message and send it. */
|
||||
resetStringInfo(&reply_message);
|
||||
pq_sendbyte(&reply_message, 'h');
|
||||
pq_sendint64(&reply_message, GetCurrentTimestamp());
|
||||
pq_sendint(&reply_message, xmin, 4);
|
||||
pq_sendint(&reply_message, nextEpoch, 4);
|
||||
pq_sendint(&reply_message, xmin_epoch, 4);
|
||||
pq_sendint(&reply_message, catalog_xmin, 4);
|
||||
pq_sendint(&reply_message, catalog_xmin_epoch, 4);
|
||||
walrcv_send(wrconn, reply_message.data, reply_message.len);
|
||||
if (TransactionIdIsValid(xmin))
|
||||
if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
|
||||
master_has_standby_xmin = true;
|
||||
else
|
||||
master_has_standby_xmin = false;
|
||||
|
@@ -242,6 +242,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran
|
||||
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
|
||||
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
|
||||
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
|
||||
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
|
||||
|
||||
static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
|
||||
|
||||
@@ -1756,7 +1757,7 @@ ProcessStandbyReplyMessage(void)
|
||||
|
||||
/* compute new replication slot xmin horizon if needed */
|
||||
static void
|
||||
PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
|
||||
PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
|
||||
{
|
||||
bool changed = false;
|
||||
ReplicationSlot *slot = MyReplicationSlot;
|
||||
@@ -1777,6 +1778,14 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
|
||||
slot->data.xmin = feedbackXmin;
|
||||
slot->effective_xmin = feedbackXmin;
|
||||
}
|
||||
if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
|
||||
!TransactionIdIsNormal(feedbackCatalogXmin) ||
|
||||
TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
|
||||
{
|
||||
changed = true;
|
||||
slot->data.catalog_xmin = feedbackCatalogXmin;
|
||||
slot->effective_catalog_xmin = feedbackCatalogXmin;
|
||||
}
|
||||
SpinLockRelease(&slot->mutex);
|
||||
|
||||
if (changed)
|
||||
@@ -1786,60 +1795,93 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Check that the provided xmin/epoch are sane, that is, not in the future
|
||||
* and not so far back as to be already wrapped around.
|
||||
*
|
||||
* Epoch of nextXid should be same as standby, or if the counter has
|
||||
* wrapped, then one greater than standby.
|
||||
*
|
||||
* This check doesn't care about whether clog exists for these xids
|
||||
* at all.
|
||||
*/
|
||||
static bool
|
||||
TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
|
||||
{
|
||||
TransactionId nextXid;
|
||||
uint32 nextEpoch;
|
||||
|
||||
GetNextXidAndEpoch(&nextXid, &nextEpoch);
|
||||
|
||||
if (xid <= nextXid)
|
||||
{
|
||||
if (epoch != nextEpoch)
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (epoch + 1 != nextEpoch)
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!TransactionIdPrecedesOrEquals(xid, nextXid))
|
||||
return false; /* epoch OK, but it's wrapped around */
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Hot Standby feedback
|
||||
*/
|
||||
static void
|
||||
ProcessStandbyHSFeedbackMessage(void)
|
||||
{
|
||||
TransactionId nextXid;
|
||||
uint32 nextEpoch;
|
||||
TransactionId feedbackXmin;
|
||||
uint32 feedbackEpoch;
|
||||
TransactionId feedbackCatalogXmin;
|
||||
uint32 feedbackCatalogEpoch;
|
||||
|
||||
/*
|
||||
* Decipher the reply message. The caller already consumed the msgtype
|
||||
* byte.
|
||||
* byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
|
||||
* of this message.
|
||||
*/
|
||||
(void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
|
||||
feedbackXmin = pq_getmsgint(&reply_message, 4);
|
||||
feedbackEpoch = pq_getmsgint(&reply_message, 4);
|
||||
feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
|
||||
feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
|
||||
|
||||
elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
|
||||
elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
|
||||
feedbackXmin,
|
||||
feedbackEpoch);
|
||||
feedbackEpoch,
|
||||
feedbackCatalogXmin,
|
||||
feedbackCatalogEpoch);
|
||||
|
||||
/* Unset WalSender's xmin if the feedback message value is invalid */
|
||||
if (!TransactionIdIsNormal(feedbackXmin))
|
||||
/*
|
||||
* Unset WalSender's xmins if the feedback message values are invalid.
|
||||
* This happens when the downstream turned hot_standby_feedback off.
|
||||
*/
|
||||
if (!TransactionIdIsNormal(feedbackXmin)
|
||||
&& !TransactionIdIsNormal(feedbackCatalogXmin))
|
||||
{
|
||||
MyPgXact->xmin = InvalidTransactionId;
|
||||
if (MyReplicationSlot != NULL)
|
||||
PhysicalReplicationSlotNewXmin(feedbackXmin);
|
||||
PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Check that the provided xmin/epoch are sane, that is, not in the future
|
||||
* and not so far back as to be already wrapped around. Ignore if not.
|
||||
*
|
||||
* Epoch of nextXid should be same as standby, or if the counter has
|
||||
* wrapped, then one greater than standby.
|
||||
*/
|
||||
GetNextXidAndEpoch(&nextXid, &nextEpoch);
|
||||
if (TransactionIdIsNormal(feedbackXmin) &&
|
||||
!TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
|
||||
return;
|
||||
|
||||
if (feedbackXmin <= nextXid)
|
||||
{
|
||||
if (feedbackEpoch != nextEpoch)
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (feedbackEpoch + 1 != nextEpoch)
|
||||
return;
|
||||
}
|
||||
|
||||
if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
|
||||
return; /* epoch OK, but it's wrapped around */
|
||||
if (TransactionIdIsNormal(feedbackCatalogXmin) &&
|
||||
!TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
|
||||
return;
|
||||
|
||||
/*
|
||||
* Set the WalSender's xmin equal to the standby's requested xmin, so that
|
||||
@@ -1864,15 +1906,23 @@ ProcessStandbyHSFeedbackMessage(void)
|
||||
* already since a VACUUM could have just finished calling GetOldestXmin.)
|
||||
*
|
||||
* If we're using a replication slot we reserve the xmin via that,
|
||||
* otherwise via the walsender's PGXACT entry.
|
||||
* otherwise via the walsender's PGXACT entry. We can only track the
|
||||
* catalog xmin separately when using a slot, so we store the least
|
||||
* of the two provided when not using a slot.
|
||||
*
|
||||
* XXX: It might make sense to generalize the ephemeral slot concept and
|
||||
* always use the slot mechanism to handle the feedback xmin.
|
||||
*/
|
||||
if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
|
||||
PhysicalReplicationSlotNewXmin(feedbackXmin);
|
||||
PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
|
||||
else
|
||||
MyPgXact->xmin = feedbackXmin;
|
||||
{
|
||||
if (TransactionIdIsNormal(feedbackCatalogXmin)
|
||||
&& TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
|
||||
MyPgXact->xmin = feedbackCatalogXmin;
|
||||
else
|
||||
MyPgXact->xmin = feedbackXmin;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
@@ -1264,6 +1264,10 @@ TransactionIdIsActive(TransactionId xid)
|
||||
* corresponding flags is set. Typically, if you want to ignore ones with
|
||||
* PROC_IN_VACUUM flag, you can use PROCARRAY_FLAGS_VACUUM.
|
||||
*
|
||||
* PROCARRAY_SLOTS_XMIN causes GetOldestXmin to ignore the xmin and
|
||||
* catalog_xmin of any replication slots that exist in the system when
|
||||
* calculating the oldest xmin.
|
||||
*
|
||||
* This is used by VACUUM to decide which deleted tuples must be preserved in
|
||||
* the passed in table. For shared relations backends in all databases must be
|
||||
* considered, but for non-shared relations that's not required, since only
|
||||
@@ -1342,7 +1346,7 @@ GetOldestXmin(Relation rel, int flags)
|
||||
volatile PGPROC *proc = &allProcs[pgprocno];
|
||||
volatile PGXACT *pgxact = &allPgXact[pgprocno];
|
||||
|
||||
if (pgxact->vacuumFlags & flags)
|
||||
if (pgxact->vacuumFlags & (flags & PROCARRAY_PROC_FLAGS_MASK))
|
||||
continue;
|
||||
|
||||
if (allDbs ||
|
||||
@@ -1418,7 +1422,8 @@ GetOldestXmin(Relation rel, int flags)
|
||||
/*
|
||||
* Check whether there are replication slots requiring an older xmin.
|
||||
*/
|
||||
if (TransactionIdIsValid(replication_slot_xmin) &&
|
||||
if (!(flags & PROCARRAY_SLOTS_XMIN) &&
|
||||
TransactionIdIsValid(replication_slot_xmin) &&
|
||||
NormalTransactionIdPrecedes(replication_slot_xmin, result))
|
||||
result = replication_slot_xmin;
|
||||
|
||||
@@ -1428,7 +1433,8 @@ GetOldestXmin(Relation rel, int flags)
|
||||
* possible. We need to do so if we're computing the global limit (rel =
|
||||
* NULL) or if the passed relation is a catalog relation of some kind.
|
||||
*/
|
||||
if ((rel == NULL ||
|
||||
if (!(flags & PROCARRAY_SLOTS_XMIN) &&
|
||||
(rel == NULL ||
|
||||
RelationIsAccessibleInLogicalDecoding(rel)) &&
|
||||
TransactionIdIsValid(replication_slot_catalog_xmin) &&
|
||||
NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
|
||||
|
Reference in New Issue
Block a user