1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-02 09:02:37 +03:00

Introduce xid8-based functions to replace txid_XXX.

The txid_XXX family of fmgr functions exposes 64 bit transaction IDs to
users as int8.  Now that we have an SQL type xid8 for FullTransactionId,
define a new set of functions including pg_current_xact_id() and
pg_current_snapshot() based on that.  Keep the old functions around too,
for now.

It's a bit sneaky to use the same C functions for both, but since the
binary representation is identical except for the signedness of the
type, and since older functions are the ones using the wrong signedness,
and since we'll presumably drop the older ones after a reasonable period
of time, it seems reasonable to switch to FullTransactionId internally
and share the code for both.

Reviewed-by: Fujii Masao <masao.fujii@oss.nttdata.com>
Reviewed-by: Takao Fujii <btfujiitkp@oss.nttdata.com>
Reviewed-by: Yoshikazu Imai <imai.yoshikazu@fujitsu.com>
Reviewed-by: Mark Dilger <mark.dilger@enterprisedb.com>
Discussion: https://postgr.es/m/20190725000636.666m5mad25wfbrri%40alap3.anarazel.de
This commit is contained in:
Thomas Munro
2020-04-07 11:33:56 +12:00
parent aeec457de8
commit 4c04be9b05
36 changed files with 871 additions and 355 deletions

View File

@ -101,7 +101,6 @@ OBJS = \
tsvector.o \
tsvector_op.o \
tsvector_parser.o \
txid.o \
uuid.o \
varbit.o \
varchar.o \
@ -109,6 +108,7 @@ OBJS = \
version.o \
windowfuncs.o \
xid.o \
xid8funcs.o \
xml.o
jsonpath_scan.c: FLEXFLAGS = -CF -p -p

View File

@ -1,20 +1,25 @@
/*-------------------------------------------------------------------------
* txid.c
* xid8funcs.c
*
* Export internal transaction IDs to user level.
*
* Note that only top-level transaction IDs are ever converted to TXID.
* This is important because TXIDs frequently persist beyond the global
* Note that only top-level transaction IDs are exposed to user sessions.
* This is important because xid8s frequently persist beyond the global
* xmin horizon, or may even be shipped to other machines, so we cannot
* rely on being able to correlate subtransaction IDs with their parents
* via functions such as SubTransGetTopmostTransaction().
*
* These functions are used to support the txid_XXX functions and the newer
* pg_current_xact, pg_current_snapshot and related fmgr functions, since the
* only difference between them is whether they expose xid8 or int8 values to
* users. The txid_XXX variants should eventually be dropped.
*
*
* Copyright (c) 2003-2020, PostgreSQL Global Development Group
* Author: Jan Wieck, Afilias USA INC.
* 64-bit txids: Marko Kreen, Skype Technologies
*
* src/backend/utils/adt/txid.c
* src/backend/utils/adt/xid8funcs.c
*
*-------------------------------------------------------------------------
*/
@ -34,25 +39,18 @@
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/xid8.h"
/* txid will be signed int8 in database, so must limit to 63 bits */
#define MAX_TXID ((uint64) PG_INT64_MAX)
/* Use unsigned variant internally */
typedef uint64 txid;
/* sprintf format code for uint64 */
#define TXID_FMT UINT64_FORMAT
/*
* If defined, use bsearch() function for searching for txids in snapshots
* If defined, use bsearch() function for searching for xid8s in snapshots
* that have more than the specified number of values.
*/
#define USE_BSEARCH_IF_NXIP_GREATER 30
/*
* Snapshot containing 8byte txids.
* Snapshot containing FullTransactionIds.
*/
typedef struct
{
@ -63,39 +61,17 @@ typedef struct
*/
int32 __varsz;
uint32 nxip; /* number of txids in xip array */
txid xmin;
txid xmax;
/* in-progress txids, xmin <= xip[i] < xmax: */
txid xip[FLEXIBLE_ARRAY_MEMBER];
} TxidSnapshot;
uint32 nxip; /* number of fxids in xip array */
FullTransactionId xmin;
FullTransactionId xmax;
/* in-progress fxids, xmin <= xip[i] < xmax: */
FullTransactionId xip[FLEXIBLE_ARRAY_MEMBER];
} pg_snapshot;
#define TXID_SNAPSHOT_SIZE(nxip) \
(offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
#define TXID_SNAPSHOT_MAX_NXIP \
((MaxAllocSize - offsetof(TxidSnapshot, xip)) / sizeof(txid))
/*
* Epoch values from xact.c
*/
typedef struct
{
TransactionId last_xid;
uint32 epoch;
} TxidEpoch;
/*
* Fetch epoch data from xact.c.
*/
static void
load_xid_epoch(TxidEpoch *state)
{
FullTransactionId fullXid = ReadNextFullTransactionId();
state->last_xid = XidFromFullTransactionId(fullXid);
state->epoch = EpochFromFullTransactionId(fullXid);
}
#define PG_SNAPSHOT_SIZE(nxip) \
(offsetof(pg_snapshot, xip) + sizeof(FullTransactionId) * (nxip))
#define PG_SNAPSHOT_MAX_NXIP \
((MaxAllocSize - offsetof(pg_snapshot, xip)) / sizeof(FullTransactionId))
/*
* Helper to get a TransactionId from a 64-bit xid with wraparound detection.
@ -111,10 +87,10 @@ load_xid_epoch(TxidEpoch *state)
* relating to those XIDs.
*/
static bool
TransactionIdInRecentPast(uint64 xid_with_epoch, TransactionId *extracted_xid)
TransactionIdInRecentPast(FullTransactionId fxid, TransactionId *extracted_xid)
{
uint32 xid_epoch = (uint32) (xid_with_epoch >> 32);
TransactionId xid = (TransactionId) xid_with_epoch;
uint32 xid_epoch = EpochFromFullTransactionId(fxid);
TransactionId xid = XidFromFullTransactionId(fxid);
uint32 now_epoch;
TransactionId now_epoch_next_xid;
FullTransactionId now_fullxid;
@ -134,11 +110,12 @@ TransactionIdInRecentPast(uint64 xid_with_epoch, TransactionId *extracted_xid)
return true;
/* If the transaction ID is in the future, throw an error. */
if (xid_with_epoch >= U64FromFullTransactionId(now_fullxid))
if (!FullTransactionIdPrecedes(fxid, now_fullxid))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("transaction ID %s is in the future",
psprintf(UINT64_FORMAT, xid_with_epoch))));
psprintf(UINT64_FORMAT,
U64FromFullTransactionId(fxid)))));
/*
* ShmemVariableCache->oldestClogXid is protected by CLogTruncationLock,
@ -164,41 +141,46 @@ TransactionIdInRecentPast(uint64 xid_with_epoch, TransactionId *extracted_xid)
}
/*
* do a TransactionId -> txid conversion for an XID near the given epoch
* Convert a TransactionId obtained from a snapshot held by the caller to a
* FullTransactionId. Use next_fxid as a reference FullTransactionId, so that
* we can compute the high order bits. It must have been obtained by the
* caller with ReadNextFullTransactionId() after the snapshot was created.
*/
static txid
convert_xid(TransactionId xid, const TxidEpoch *state)
static FullTransactionId
widen_snapshot_xid(TransactionId xid, FullTransactionId next_fxid)
{
uint64 epoch;
TransactionId next_xid = XidFromFullTransactionId(next_fxid);
uint32 epoch = EpochFromFullTransactionId(next_fxid);
/* return special xid's as-is */
/* Special transaction ID. */
if (!TransactionIdIsNormal(xid))
return (txid) xid;
return FullTransactionIdFromEpochAndXid(0, xid);
/* xid can be on either side when near wrap-around */
epoch = (uint64) state->epoch;
if (xid > state->last_xid &&
TransactionIdPrecedes(xid, state->last_xid))
/*
* The 64 bit result must be <= next_fxid, since next_fxid hadn't been
* issued yet when the snapshot was created. Every TransactionId in the
* snapshot must therefore be from the same epoch as next_fxid, or the
* epoch before. We know this because next_fxid is never allow to get
* more than one epoch ahead of the TransactionIds in any snapshot.
*/
if (xid > next_xid)
epoch--;
else if (xid < state->last_xid &&
TransactionIdFollows(xid, state->last_xid))
epoch++;
return (epoch << 32) | xid;
return FullTransactionIdFromEpochAndXid(epoch, xid);
}
/*
* txid comparator for qsort/bsearch
*/
static int
cmp_txid(const void *aa, const void *bb)
cmp_fxid(const void *aa, const void *bb)
{
txid a = *(const txid *) aa;
txid b = *(const txid *) bb;
FullTransactionId a = *(const FullTransactionId *) aa;
FullTransactionId b = *(const FullTransactionId *) bb;
if (a < b)
if (FullTransactionIdPrecedes(a, b))
return -1;
if (a > b)
if (FullTransactionIdPrecedes(b, a))
return 1;
return 0;
}
@ -211,31 +193,33 @@ cmp_txid(const void *aa, const void *bb)
* will not be used.
*/
static void
sort_snapshot(TxidSnapshot *snap)
sort_snapshot(pg_snapshot *snap)
{
if (snap->nxip > 1)
{
qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid);
snap->nxip = qunique(snap->xip, snap->nxip, sizeof(txid), cmp_txid);
qsort(snap->xip, snap->nxip, sizeof(FullTransactionId), cmp_fxid);
snap->nxip = qunique(snap->xip, snap->nxip, sizeof(FullTransactionId),
cmp_fxid);
}
}
/*
* check txid visibility.
* check fxid visibility.
*/
static bool
is_visible_txid(txid value, const TxidSnapshot *snap)
is_visible_fxid(FullTransactionId value, const pg_snapshot *snap)
{
if (value < snap->xmin)
if (FullTransactionIdPrecedes(value, snap->xmin))
return true;
else if (value >= snap->xmax)
else if (!FullTransactionIdPrecedes(value, snap->xmax))
return false;
#ifdef USE_BSEARCH_IF_NXIP_GREATER
else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
{
void *res;
res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
res = bsearch(&value, snap->xip, snap->nxip, sizeof(FullTransactionId),
cmp_fxid);
/* if found, transaction is still in progress */
return (res) ? false : true;
}
@ -246,7 +230,7 @@ is_visible_txid(txid value, const TxidSnapshot *snap)
for (i = 0; i < snap->nxip; i++)
{
if (value == snap->xip[i])
if (FullTransactionIdEquals(value, snap->xip[i]))
return false;
}
return true;
@ -254,13 +238,13 @@ is_visible_txid(txid value, const TxidSnapshot *snap)
}
/*
* helper functions to use StringInfo for TxidSnapshot creation.
* helper functions to use StringInfo for pg_snapshot creation.
*/
static StringInfo
buf_init(txid xmin, txid xmax)
buf_init(FullTransactionId xmin, FullTransactionId xmax)
{
TxidSnapshot snap;
pg_snapshot snap;
StringInfo buf;
snap.xmin = xmin;
@ -268,25 +252,25 @@ buf_init(txid xmin, txid xmax)
snap.nxip = 0;
buf = makeStringInfo();
appendBinaryStringInfo(buf, (char *) &snap, TXID_SNAPSHOT_SIZE(0));
appendBinaryStringInfo(buf, (char *) &snap, PG_SNAPSHOT_SIZE(0));
return buf;
}
static void
buf_add_txid(StringInfo buf, txid xid)
buf_add_txid(StringInfo buf, FullTransactionId fxid)
{
TxidSnapshot *snap = (TxidSnapshot *) buf->data;
pg_snapshot *snap = (pg_snapshot *) buf->data;
/* do this before possible realloc */
snap->nxip++;
appendBinaryStringInfo(buf, (char *) &xid, sizeof(xid));
appendBinaryStringInfo(buf, (char *) &fxid, sizeof(fxid));
}
static TxidSnapshot *
static pg_snapshot *
buf_finalize(StringInfo buf)
{
TxidSnapshot *snap = (TxidSnapshot *) buf->data;
pg_snapshot *snap = (pg_snapshot *) buf->data;
SET_VARSIZE(snap, buf->len);
@ -297,68 +281,34 @@ buf_finalize(StringInfo buf)
return snap;
}
/*
* simple number parser.
*
* We return 0 on error, which is invalid value for txid.
*/
static txid
str2txid(const char *s, const char **endp)
{
txid val = 0;
txid cutoff = MAX_TXID / 10;
txid cutlim = MAX_TXID % 10;
for (; *s; s++)
{
unsigned d;
if (*s < '0' || *s > '9')
break;
d = *s - '0';
/*
* check for overflow
*/
if (val > cutoff || (val == cutoff && d > cutlim))
{
val = 0;
break;
}
val = val * 10 + d;
}
if (endp)
*endp = s;
return val;
}
/*
* parse snapshot from cstring
*/
static TxidSnapshot *
static pg_snapshot *
parse_snapshot(const char *str)
{
txid xmin;
txid xmax;
txid last_val = 0,
val;
FullTransactionId xmin;
FullTransactionId xmax;
FullTransactionId last_val = InvalidFullTransactionId;
FullTransactionId val;
const char *str_start = str;
const char *endp;
char *endp;
StringInfo buf;
xmin = str2txid(str, &endp);
xmin = FullTransactionIdFromU64(pg_strtouint64(str, &endp, 10));
if (*endp != ':')
goto bad_format;
str = endp + 1;
xmax = str2txid(str, &endp);
xmax = FullTransactionIdFromU64(pg_strtouint64(str, &endp, 10));
if (*endp != ':')
goto bad_format;
str = endp + 1;
/* it should look sane */
if (xmin == 0 || xmax == 0 || xmin > xmax)
if (!FullTransactionIdIsValid(xmin) ||
!FullTransactionIdIsValid(xmax) ||
FullTransactionIdPrecedes(xmax, xmin))
goto bad_format;
/* allocate buffer */
@ -368,15 +318,17 @@ parse_snapshot(const char *str)
while (*str != '\0')
{
/* read next value */
val = str2txid(str, &endp);
val = FullTransactionIdFromU64(pg_strtouint64(str, &endp, 10));
str = endp;
/* require the input to be in order */
if (val < xmin || val >= xmax || val < last_val)
if (FullTransactionIdPrecedes(val, xmin) ||
FullTransactionIdFollowsOrEquals(val, xmax) ||
FullTransactionIdPrecedes(val, last_val))
goto bad_format;
/* skip duplicates */
if (val != last_val)
if (!FullTransactionIdEquals(val, last_val))
buf_add_txid(buf, val);
last_val = val;
@ -392,108 +344,82 @@ bad_format:
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid input syntax for type %s: \"%s\"",
"txid_snapshot", str_start)));
"pg_snapshot", str_start)));
return NULL; /* keep compiler quiet */
}
/*
* Public functions.
* pg_current_xact_id() returns xid8
*
* txid_current() and txid_current_snapshot() are the only ones that
* communicate with core xid machinery. All the others work on data
* returned by them.
*/
/*
* txid_current() returns int8
*
* Return the current toplevel transaction ID as TXID
* Return the current toplevel full transaction ID.
* If the current transaction does not have one, one is assigned.
*
* This value has the epoch as the high 32 bits and the 32-bit xid
* as the low 32 bits.
*/
Datum
txid_current(PG_FUNCTION_ARGS)
pg_current_xact_id(PG_FUNCTION_ARGS)
{
txid val;
TxidEpoch state;
/*
* Must prevent during recovery because if an xid is not assigned we try
* to assign one, which would fail. Programs already rely on this function
* to always return a valid current xid, so we should not change this to
* return NULL or similar invalid xid.
*/
PreventCommandDuringRecovery("txid_current()");
PreventCommandDuringRecovery("pg_current_xact_id()");
load_xid_epoch(&state);
val = convert_xid(GetTopTransactionId(), &state);
PG_RETURN_INT64(val);
PG_RETURN_FULLTRANSACTIONID(GetTopFullTransactionId());
}
/*
* Same as txid_current() but doesn't assign a new xid if there isn't one
* yet.
* Same as pg_current_xact_if_assigned() but doesn't assign a new xid if there
* isn't one yet.
*/
Datum
txid_current_if_assigned(PG_FUNCTION_ARGS)
pg_current_xact_id_if_assigned(PG_FUNCTION_ARGS)
{
txid val;
TxidEpoch state;
TransactionId topxid = GetTopTransactionIdIfAny();
FullTransactionId topfxid = GetTopFullTransactionIdIfAny();
if (topxid == InvalidTransactionId)
if (!FullTransactionIdIsValid(topfxid))
PG_RETURN_NULL();
load_xid_epoch(&state);
val = convert_xid(topxid, &state);
PG_RETURN_INT64(val);
PG_RETURN_FULLTRANSACTIONID(topfxid);
}
/*
* txid_current_snapshot() returns txid_snapshot
* pg_current_snapshot() returns pg_snapshot
*
* Return current snapshot in TXID format
* Return current snapshot
*
* Note that only top-transaction XIDs are included in the snapshot.
*/
Datum
txid_current_snapshot(PG_FUNCTION_ARGS)
pg_current_snapshot(PG_FUNCTION_ARGS)
{
TxidSnapshot *snap;
pg_snapshot *snap;
uint32 nxip,
i;
TxidEpoch state;
Snapshot cur;
FullTransactionId next_fxid = ReadNextFullTransactionId();
cur = GetActiveSnapshot();
if (cur == NULL)
elog(ERROR, "no active snapshot set");
load_xid_epoch(&state);
/*
* Compile-time limits on the procarray (MAX_BACKENDS processes plus
* MAX_BACKENDS prepared transactions) guarantee nxip won't be too large.
*/
StaticAssertStmt(MAX_BACKENDS * 2 <= TXID_SNAPSHOT_MAX_NXIP,
"possible overflow in txid_current_snapshot()");
StaticAssertStmt(MAX_BACKENDS * 2 <= PG_SNAPSHOT_MAX_NXIP,
"possible overflow in pg_current_snapshot()");
/* allocate */
nxip = cur->xcnt;
snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
snap = palloc(PG_SNAPSHOT_SIZE(nxip));
/* fill */
snap->xmin = convert_xid(cur->xmin, &state);
snap->xmax = convert_xid(cur->xmax, &state);
snap->xmin = widen_snapshot_xid(cur->xmin, next_fxid);
snap->xmax = widen_snapshot_xid(cur->xmax, next_fxid);
snap->nxip = nxip;
for (i = 0; i < nxip; i++)
snap->xip[i] = convert_xid(cur->xip[i], &state);
snap->xip[i] = widen_snapshot_xid(cur->xip[i], next_fxid);
/*
* We want them guaranteed to be in ascending order. This also removes
@ -505,21 +431,21 @@ txid_current_snapshot(PG_FUNCTION_ARGS)
sort_snapshot(snap);
/* set size after sorting, because it may have removed duplicate xips */
SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(snap->nxip));
SET_VARSIZE(snap, PG_SNAPSHOT_SIZE(snap->nxip));
PG_RETURN_POINTER(snap);
}
/*
* txid_snapshot_in(cstring) returns txid_snapshot
* pg_snapshot_in(cstring) returns pg_snapshot
*
* input function for type txid_snapshot
* input function for type pg_snapshot
*/
Datum
txid_snapshot_in(PG_FUNCTION_ARGS)
pg_snapshot_in(PG_FUNCTION_ARGS)
{
char *str = PG_GETARG_CSTRING(0);
TxidSnapshot *snap;
pg_snapshot *snap;
snap = parse_snapshot(str);
@ -527,73 +453,81 @@ txid_snapshot_in(PG_FUNCTION_ARGS)
}
/*
* txid_snapshot_out(txid_snapshot) returns cstring
* pg_snapshot_out(pg_snapshot) returns cstring
*
* output function for type txid_snapshot
* output function for type pg_snapshot
*/
Datum
txid_snapshot_out(PG_FUNCTION_ARGS)
pg_snapshot_out(PG_FUNCTION_ARGS)
{
TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
pg_snapshot *snap = (pg_snapshot *) PG_GETARG_VARLENA_P(0);
StringInfoData str;
uint32 i;
initStringInfo(&str);
appendStringInfo(&str, TXID_FMT ":", snap->xmin);
appendStringInfo(&str, TXID_FMT ":", snap->xmax);
appendStringInfo(&str, UINT64_FORMAT ":",
U64FromFullTransactionId(snap->xmin));
appendStringInfo(&str, UINT64_FORMAT ":",
U64FromFullTransactionId(snap->xmax));
for (i = 0; i < snap->nxip; i++)
{
if (i > 0)
appendStringInfoChar(&str, ',');
appendStringInfo(&str, TXID_FMT, snap->xip[i]);
appendStringInfo(&str, UINT64_FORMAT,
U64FromFullTransactionId(snap->xip[i]));
}
PG_RETURN_CSTRING(str.data);
}
/*
* txid_snapshot_recv(internal) returns txid_snapshot
* pg_snapshot_recv(internal) returns pg_snapshot
*
* binary input function for type txid_snapshot
* binary input function for type pg_snapshot
*
* format: int4 nxip, int8 xmin, int8 xmax, int8 xip
*/
Datum
txid_snapshot_recv(PG_FUNCTION_ARGS)
pg_snapshot_recv(PG_FUNCTION_ARGS)
{
StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
TxidSnapshot *snap;
txid last = 0;
pg_snapshot *snap;
FullTransactionId last = InvalidFullTransactionId;
int nxip;
int i;
txid xmin,
xmax;
FullTransactionId xmin;
FullTransactionId xmax;
/* load and validate nxip */
nxip = pq_getmsgint(buf, 4);
if (nxip < 0 || nxip > TXID_SNAPSHOT_MAX_NXIP)
if (nxip < 0 || nxip > PG_SNAPSHOT_MAX_NXIP)
goto bad_format;
xmin = pq_getmsgint64(buf);
xmax = pq_getmsgint64(buf);
if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
xmin = FullTransactionIdFromU64((uint64) pq_getmsgint64(buf));
xmax = FullTransactionIdFromU64((uint64) pq_getmsgint64(buf));
if (!FullTransactionIdIsValid(xmin) ||
!FullTransactionIdIsValid(xmax) ||
FullTransactionIdPrecedes(xmax, xmin))
goto bad_format;
snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
snap = palloc(PG_SNAPSHOT_SIZE(nxip));
snap->xmin = xmin;
snap->xmax = xmax;
for (i = 0; i < nxip; i++)
{
txid cur = pq_getmsgint64(buf);
FullTransactionId cur =
FullTransactionIdFromU64((uint64) pq_getmsgint64(buf));
if (cur < last || cur < xmin || cur >= xmax)
if (FullTransactionIdPrecedes(cur, last) ||
FullTransactionIdPrecedes(cur, xmin) ||
FullTransactionIdPrecedes(xmax, cur))
goto bad_format;
/* skip duplicate xips */
if (cur == last)
if (FullTransactionIdEquals(cur, last))
{
i--;
nxip--;
@ -604,95 +538,95 @@ txid_snapshot_recv(PG_FUNCTION_ARGS)
last = cur;
}
snap->nxip = nxip;
SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
SET_VARSIZE(snap, PG_SNAPSHOT_SIZE(nxip));
PG_RETURN_POINTER(snap);
bad_format:
ereport(ERROR,
(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
errmsg("invalid external txid_snapshot data")));
errmsg("invalid external pg_snapshot data")));
PG_RETURN_POINTER(NULL); /* keep compiler quiet */
}
/*
* txid_snapshot_send(txid_snapshot) returns bytea
* pg_snapshot_send(pg_snapshot) returns bytea
*
* binary output function for type txid_snapshot
* binary output function for type pg_snapshot
*
* format: int4 nxip, int8 xmin, int8 xmax, int8 xip
* format: int4 nxip, u64 xmin, u64 xmax, u64 xip...
*/
Datum
txid_snapshot_send(PG_FUNCTION_ARGS)
pg_snapshot_send(PG_FUNCTION_ARGS)
{
TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
pg_snapshot *snap = (pg_snapshot *) PG_GETARG_VARLENA_P(0);
StringInfoData buf;
uint32 i;
pq_begintypsend(&buf);
pq_sendint32(&buf, snap->nxip);
pq_sendint64(&buf, snap->xmin);
pq_sendint64(&buf, snap->xmax);
pq_sendint64(&buf, (int64) U64FromFullTransactionId(snap->xmin));
pq_sendint64(&buf, (int64) U64FromFullTransactionId(snap->xmax));
for (i = 0; i < snap->nxip; i++)
pq_sendint64(&buf, snap->xip[i]);
pq_sendint64(&buf, (int64) U64FromFullTransactionId(snap->xip[i]));
PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
}
/*
* txid_visible_in_snapshot(int8, txid_snapshot) returns bool
* pg_visible_in_snapshot(xid8, pg_snapshot) returns bool
*
* is txid visible in snapshot ?
*/
Datum
txid_visible_in_snapshot(PG_FUNCTION_ARGS)
pg_visible_in_snapshot(PG_FUNCTION_ARGS)
{
txid value = PG_GETARG_INT64(0);
TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
FullTransactionId value = PG_GETARG_FULLTRANSACTIONID(0);
pg_snapshot *snap = (pg_snapshot *) PG_GETARG_VARLENA_P(1);
PG_RETURN_BOOL(is_visible_txid(value, snap));
PG_RETURN_BOOL(is_visible_fxid(value, snap));
}
/*
* txid_snapshot_xmin(txid_snapshot) returns int8
* pg_snapshot_xmin(pg_snapshot) returns xid8
*
* return snapshot's xmin
*/
Datum
txid_snapshot_xmin(PG_FUNCTION_ARGS)
pg_snapshot_xmin(PG_FUNCTION_ARGS)
{
TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
pg_snapshot *snap = (pg_snapshot *) PG_GETARG_VARLENA_P(0);
PG_RETURN_INT64(snap->xmin);
PG_RETURN_FULLTRANSACTIONID(snap->xmin);
}
/*
* txid_snapshot_xmax(txid_snapshot) returns int8
* pg_snapshot_xmax(pg_snapshot) returns xid8
*
* return snapshot's xmax
*/
Datum
txid_snapshot_xmax(PG_FUNCTION_ARGS)
pg_snapshot_xmax(PG_FUNCTION_ARGS)
{
TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
pg_snapshot *snap = (pg_snapshot *) PG_GETARG_VARLENA_P(0);
PG_RETURN_INT64(snap->xmax);
PG_RETURN_FULLTRANSACTIONID(snap->xmax);
}
/*
* txid_snapshot_xip(txid_snapshot) returns setof int8
* pg_snapshot_xip(pg_snapshot) returns setof xid8
*
* return in-progress TXIDs in snapshot.
* return in-progress xid8s in snapshot.
*/
Datum
txid_snapshot_xip(PG_FUNCTION_ARGS)
pg_snapshot_xip(PG_FUNCTION_ARGS)
{
FuncCallContext *fctx;
TxidSnapshot *snap;
txid value;
pg_snapshot *snap;
FullTransactionId value;
/* on first call initialize fctx and get copy of snapshot */
if (SRF_IS_FIRSTCALL())
{
TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
pg_snapshot *arg = (pg_snapshot *) PG_GETARG_VARLENA_P(0);
fctx = SRF_FIRSTCALL_INIT();
@ -709,7 +643,7 @@ txid_snapshot_xip(PG_FUNCTION_ARGS)
if (fctx->call_cntr < snap->nxip)
{
value = snap->xip[fctx->call_cntr];
SRF_RETURN_NEXT(fctx, Int64GetDatum(value));
SRF_RETURN_NEXT(fctx, FullTransactionIdGetDatum(value));
}
else
{
@ -728,10 +662,10 @@ txid_snapshot_xip(PG_FUNCTION_ARGS)
* though the parent xact may still be in progress or may have aborted.
*/
Datum
txid_status(PG_FUNCTION_ARGS)
pg_xact_status(PG_FUNCTION_ARGS)
{
const char *status;
uint64 xid_with_epoch = PG_GETARG_INT64(0);
FullTransactionId fxid = PG_GETARG_FULLTRANSACTIONID(0);
TransactionId xid;
/*
@ -739,7 +673,7 @@ txid_status(PG_FUNCTION_ARGS)
* an I/O error on SLRU lookup.
*/
LWLockAcquire(CLogTruncationLock, LW_SHARED);
if (TransactionIdInRecentPast(xid_with_epoch, &xid))
if (TransactionIdInRecentPast(fxid, &xid))
{
Assert(TransactionIdIsValid(xid));