1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-12 21:01:52 +03:00

Support synchronization of snapshots through an export/import procedure.

A transaction can export a snapshot with pg_export_snapshot(), and then
others can import it with SET TRANSACTION SNAPSHOT.  The data does not
leave the server so there are no security issues.  A snapshot can only
be imported while the exporting transaction is still running, and there
are some other restrictions.

I'm not totally convinced that we've covered all the bases for SSI (true
serializable) mode, but it works fine for lesser isolation modes.

Joachim Wieland, reviewed by Marko Tiikkaja, and rather heavily modified
by Tom Lane
This commit is contained in:
Tom Lane
2011-10-22 18:22:45 -04:00
parent b436c72f61
commit bb446b689b
17 changed files with 1030 additions and 34 deletions

View File

@ -15,12 +15,16 @@
* handle this reference as an internally-tracked registration, so that this
* module is entirely lower-level than ResourceOwners.
*
* Likewise, any snapshots that have been exported by pg_export_snapshot
* have regd_count = 1 and are counted in RegisteredSnapshots, but are not
* tracked by any resource owner.
*
* These arrangements let us reset MyProc->xmin when there are no snapshots
* referenced by this transaction. (One possible improvement would be to be
* able to advance Xmin when the snapshot with the earliest Xmin is no longer
* referenced. That's a bit harder though, it requires more locking, and
* anyway it should be rather uncommon to keep snapshots referenced for too
* long.)
* anyway it should be rather uncommon to keep temporary snapshots referenced
* for too long.)
*
*
* Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
@ -33,12 +37,16 @@
*/
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/transam.h"
#include "access/xact.h"
#include "miscadmin.h"
#include "storage/predicate.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/memutils.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/tqual.h"
@ -111,6 +119,15 @@ bool FirstSnapshotSet = false;
*/
static Snapshot FirstXactSnapshot = NULL;
/*
* Define pathname of exported-snapshot files.
*
* SNAPSHOT_EXPORT_DIR is the directory, given as a relative path, in which
* the files live.
*
* XactExportFilePath formats a file name of the form
*     SNAPSHOT_EXPORT_DIR/<xid>-<num><suffix>
* into "path", where <xid> is printed as at least eight upper-case hex digits
* (zero padded) and <num> as a decimal integer.  Because the macro uses
* sizeof(path) as the buffer size, "path" must be a char array in the
* caller's scope; passing a pointer would yield the wrong size.
*/
#define SNAPSHOT_EXPORT_DIR "pg_snapshots"
#define XactExportFilePath(path, xid, num, suffix) \
snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%d%s", \
xid, num, suffix)
/*
* Current xact's exported snapshots (a list of Snapshot structs).
* ExportSnapshot appends to this list; its length is also used as the
* sequence number embedded in the exported file's name.
*/
static List *exportedSnapshots = NIL;
static Snapshot CopySnapshot(Snapshot snapshot);
static void FreeSnapshot(Snapshot snapshot);
@ -139,7 +156,8 @@ GetTransactionSnapshot(void)
* In transaction-snapshot mode, the first snapshot must live until
* end of xact regardless of what the caller does with it, so we must
* make a copy of it rather than returning CurrentSnapshotData
* directly.
* directly. Furthermore, if we're running in serializable mode,
* predicate.c needs to wrap the snapshot fetch in its own processing.
*/
if (IsolationUsesXactSnapshot())
{
@ -203,6 +221,88 @@ SnapshotSetCommandId(CommandId curcid)
SecondarySnapshot->curcid = curcid;
}
/*
* SetTransactionSnapshot
* Set the transaction's snapshot from an imported MVCC snapshot.
*
* sourcesnap is the snapshot to adopt: its xmin, xmax, xip[], subxip[],
* suboverflowed and takenDuringRecovery fields are copied into the
* snapshot returned by GetSnapshotData(&CurrentSnapshotData).  sourcexid is
* the XID of the transaction the snapshot came from; it is passed to
* ProcArrayInstallImportedXmin, and an ERROR is raised if that call
* returns false.
*
* On success FirstSnapshotSet is set to true.  In transaction-snapshot mode
* a copy of the snapshot is additionally saved as FirstXactSnapshot and
* counted in RegisteredSnapshots.
*
* Note that this is very closely tied to GetTransactionSnapshot --- it
* must take care of all the same considerations as the first-snapshot case
* in GetTransactionSnapshot.
*/
static void
SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid)
{
/* Caller should have checked this already */
Assert(!FirstSnapshotSet);
Assert(RegisteredSnapshots == 0);
Assert(FirstXactSnapshot == NULL);
/*
* Even though we are not going to use the snapshot it computes, we must
* call GetSnapshotData, for two reasons: (1) to be sure that
* CurrentSnapshotData's XID arrays have been allocated, and (2) to update
* RecentXmin and RecentGlobalXmin. (We could alternatively include those
* two variables in exported snapshot files, but it seems better to have
* snapshot importers compute reasonably up-to-date values for them.)
*/
CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
/*
* Now copy appropriate fields from the source snapshot.
*
* The Asserts check the source counts against the maximum counts before
* each memcpy into the destination arrays (presumably those arrays are
* sized for the maximum counts --- confirm against GetSnapshotData).
*/
CurrentSnapshot->xmin = sourcesnap->xmin;
CurrentSnapshot->xmax = sourcesnap->xmax;
CurrentSnapshot->xcnt = sourcesnap->xcnt;
Assert(sourcesnap->xcnt <= GetMaxSnapshotXidCount());
memcpy(CurrentSnapshot->xip, sourcesnap->xip,
sourcesnap->xcnt * sizeof(TransactionId));
CurrentSnapshot->subxcnt = sourcesnap->subxcnt;
Assert(sourcesnap->subxcnt <= GetMaxSnapshotSubxidCount());
memcpy(CurrentSnapshot->subxip, sourcesnap->subxip,
sourcesnap->subxcnt * sizeof(TransactionId));
CurrentSnapshot->suboverflowed = sourcesnap->suboverflowed;
CurrentSnapshot->takenDuringRecovery = sourcesnap->takenDuringRecovery;
/* NB: curcid should NOT be copied, it's a local matter */
/*
* Now we have to fix what GetSnapshotData did with MyProc->xmin and
* TransactionXmin. There is a race condition: to make sure we are not
* causing the global xmin to go backwards, we have to test that the
* source transaction is still running, and that has to be done atomically.
* So let procarray.c do it.
*
* Note: in serializable mode, predicate.c will do this a second time.
* It doesn't seem worth contorting the logic here to avoid two calls,
* especially since it's not clear that predicate.c *must* do this.
*/
if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not import the requested snapshot"),
errdetail("The source transaction %u is not running anymore.",
sourcexid)));
/*
* In transaction-snapshot mode, the first snapshot must live until end of
* xact, so we must make a copy of it. Furthermore, if we're running in
* serializable mode, predicate.c needs to do its own processing.
*/
if (IsolationUsesXactSnapshot())
{
/* predicate.c sees the snapshot before the saved copy is made */
if (IsolationIsSerializable())
SetSerializableTransactionSnapshot(CurrentSnapshot, sourcexid);
/* Make a saved copy */
CurrentSnapshot = CopySnapshot(CurrentSnapshot);
FirstXactSnapshot = CurrentSnapshot;
/* Mark it as "registered" in FirstXactSnapshot */
FirstXactSnapshot->regd_count++;
RegisteredSnapshots++;
}
/* From here on the transaction counts as having taken its first snapshot */
FirstSnapshotSet = true;
}
/*
* CopySnapshot
* Copy the given snapshot.
@ -558,6 +658,42 @@ AtEOXact_Snapshot(bool isCommit)
}
FirstXactSnapshot = NULL;
/*
* If we exported any snapshots, clean them up.
*/
if (exportedSnapshots != NIL)
{
TransactionId myxid = GetTopTransactionId();
int i;
char buf[MAXPGPATH];
/*
* Get rid of the files. Unlink failure is only a WARNING because
* (1) it's too late to abort the transaction, and (2) leaving a
* leaked file around has little real consequence anyway.
*/
for (i = 1; i <= list_length(exportedSnapshots); i++)
{
XactExportFilePath(buf, myxid, i, "");
if (unlink(buf))
elog(WARNING, "could not unlink file \"%s\": %m", buf);
}
/*
* As with the FirstXactSnapshot, we needn't spend any effort on
* cleaning up the per-snapshot data structures, but we do need to
* adjust the RegisteredSnapshots count to prevent a warning below.
*
* Note: you might be thinking "why do we have the exportedSnapshots
* list at all? All we need is a counter!". You're right, but we do
* it this way in case we ever feel like improving xmin management.
*/
Assert(RegisteredSnapshots >= list_length(exportedSnapshots));
RegisteredSnapshots -= list_length(exportedSnapshots);
exportedSnapshots = NIL;
}
/* On commit, complain about leftover snapshots */
if (isCommit)
{
@ -586,3 +722,464 @@ AtEOXact_Snapshot(bool isCommit)
SnapshotResetXmin();
}
/*
 * ExportSnapshot
 *		Write the given snapshot to a file under SNAPSHOT_EXPORT_DIR so that
 *		other backends can import it.
 *
 * The return value is a pstrdup'd copy of the file's base name; that string
 * is the token an importer must present to adopt this snapshot.
 *
 * Raises ERROR when called inside a subtransaction, or when the file cannot
 * be created, written, closed or renamed.
 */
static char *
ExportSnapshot(Snapshot snapshot)
{
	TransactionId myTopXid;
	TransactionId *subxids;
	int			nsubxids;
	int			includeTopXid;
	int			snapNum;
	int			idx;
	StringInfoData text;
	FILE	   *fp;
	MemoryContext savedcxt;
	char		finalPath[MAXPGPATH];
	char		tmpPath[MAXPGPATH];

	/*
	 * RequireTransactionChain is deliberately not called here.  It would be
	 * tempting, since a snapshot that vanishes right away is of little use,
	 * but we cannot tell whether we are at top level: we might be inside a
	 * plpgsql function that will launch other transactions via dblink.
	 * Rather than forbid legitimate uses, no check is made.
	 *
	 * Likewise no restriction is placed on our isolation level; importers
	 * that are serializable must check the level themselves.
	 */

	/* This assigns a transaction ID if we do not have one yet. */
	myTopXid = GetTopTransactionId();

	/*
	 * Exporting from a subtransaction is not allowed, because importers
	 * have no easy way to verify that the same subtransaction is still
	 * running.
	 */
	if (IsSubTransaction())
		ereport(ERROR,
				(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
				 errmsg("cannot export a snapshot from a subtransaction")));

	/*
	 * Previously committed subtransactions are fine, though.  Importers
	 * must treat them as still running, so collect their XIDs for
	 * inclusion in the exported data.
	 */
	nsubxids = xactGetCommittedChildren(&subxids);

	/*
	 * Take a private copy of the snapshot, remember it in the
	 * exportedSnapshots list (which lives in TopTransactionContext), and
	 * mark it pseudo-registered.  This keeps the snapshot's xmin honored
	 * for the rest of the transaction.  (With SnapshotResetXmin as simple
	 * as it is today this is overkill, but that routine may get smarter.)
	 */
	snapshot = CopySnapshot(snapshot);

	savedcxt = MemoryContextSwitchTo(TopTransactionContext);
	exportedSnapshots = lappend(exportedSnapshots, snapshot);
	MemoryContextSwitchTo(savedcxt);

	snapshot->regd_count++;
	RegisteredSnapshots++;

	/* This export's sequence number within the transaction */
	snapNum = list_length(exportedSnapshots);

	/*
	 * Build the text form of the snapshot plus identifying data about this
	 * transaction.  ImportSnapshot expects a rigid format: every line is
	 * fieldname:value.
	 */
	initStringInfo(&text);

	appendStringInfo(&text, "xid:%u\n", myTopXid);
	appendStringInfo(&text, "dbid:%u\n", MyDatabaseId);
	appendStringInfo(&text, "iso:%d\n", XactIsoLevel);
	appendStringInfo(&text, "ro:%d\n", XactReadOnly);
	appendStringInfo(&text, "xmin:%u\n", snapshot->xmin);
	appendStringInfo(&text, "xmax:%u\n", snapshot->xmax);

	/*
	 * Our own top XID has to appear among the top-level XIDs: we will by
	 * definition still be running when an importer adopts the snapshot,
	 * yet GetSnapshotData never lists our own XID.  (So there must be room
	 * for one more entry.)
	 *
	 * The exception is when our XID is not before xmax; xip[] members are
	 * expected to precede xmax, so it is left out then.  (subxip[] members
	 * need no such check, see snapshot.h.)
	 */
	if (TransactionIdPrecedes(myTopXid, snapshot->xmax))
		includeTopXid = 1;
	else
		includeTopXid = 0;

	appendStringInfo(&text, "xcnt:%d\n", snapshot->xcnt + includeTopXid);
	for (idx = 0; idx < snapshot->xcnt; idx++)
		appendStringInfo(&text, "xip:%u\n", snapshot->xip[idx]);
	if (includeTopXid)
		appendStringInfo(&text, "xip:%u\n", myTopXid);

	/*
	 * In the same way, our subcommitted child XIDs are appended to the
	 * subxid data, unless that would overflow, in which case the snapshot
	 * is simply reported as overflowed.
	 */
	if (!snapshot->suboverflowed &&
		snapshot->subxcnt + nsubxids <= GetMaxSnapshotSubxidCount())
	{
		appendStringInfoString(&text, "sof:0\n");
		appendStringInfo(&text, "sxcnt:%d\n", snapshot->subxcnt + nsubxids);
		for (idx = 0; idx < snapshot->subxcnt; idx++)
			appendStringInfo(&text, "sxp:%u\n", snapshot->subxip[idx]);
		for (idx = 0; idx < nsubxids; idx++)
			appendStringInfo(&text, "sxp:%u\n", subxids[idx]);
	}
	else
		appendStringInfoString(&text, "sof:1\n");

	appendStringInfo(&text, "rec:%u\n", snapshot->takenDuringRecovery);

	/*
	 * Write the text out.  It goes to a ".tmp" name first and is renamed
	 * only once complete, so no other backend can read a partial file
	 * (ImportSnapshot's valid-characters check rejects the ".tmp" name).
	 */
	XactExportFilePath(tmpPath, myTopXid, snapNum, ".tmp");

	fp = AllocateFile(tmpPath, PG_BINARY_W);
	if (fp == NULL)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m", tmpPath)));

	if (fwrite(text.data, text.len, 1, fp) != 1)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m", tmpPath)));

	/* no fsync() since file need not survive a system crash */

	if (FreeFile(fp) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m", tmpPath)));

	/* Everything is on disk under the .tmp name; drop the suffix. */
	XactExportFilePath(finalPath, myTopXid, snapNum, "");

	if (rename(tmpPath, finalPath) < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmpPath, finalPath)));

	/*
	 * pg_export_snapshot() returns the file's base name.  finalPath begins
	 * with SNAPSHOT_EXPORT_DIR and a slash, so step past those and pstrdup
	 * the remainder rather than returning the address of a local array.
	 */
	return pstrdup(finalPath + strlen(SNAPSHOT_EXPORT_DIR) + 1);
}
/*
 * pg_export_snapshot
 *		SQL-callable wrapper for ExportSnapshot.
 *
 * Exports the active snapshot and returns the resulting token as text.
 */
Datum
pg_export_snapshot(PG_FUNCTION_ARGS)
{
	PG_RETURN_TEXT_P(cstring_to_text(ExportSnapshot(GetActiveSnapshot())));
}
/*
 * Parsing subroutines for ImportSnapshot.
 *
 * Each one expects the text at *s to begin with the given prefix, followed
 * by a value and, further on, a newline.  On success the value is returned
 * and *s is advanced to just past that newline.  Any mismatch raises an
 * ERROR naming the file being parsed (filename is used only for that).
 */
static int
parseIntFromText(const char *prefix, char **s, const char *filename)
{
	char	   *cursor = *s;
	size_t		plen = strlen(prefix);
	int			result;

	/* the line must start with the expected field name */
	if (strncmp(cursor, prefix, plen) != 0)
		goto invalid;
	cursor += plen;

	/* a signed decimal integer must follow */
	if (sscanf(cursor, "%d", &result) != 1)
		goto invalid;

	/* and the line must be terminated by a newline */
	cursor = strchr(cursor, '\n');
	if (cursor == NULL)
		goto invalid;

	*s = cursor + 1;
	return result;

invalid:
	ereport(ERROR,
			(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
			 errmsg("invalid snapshot data in file \"%s\"", filename)));
	return 0;					/* keep compiler quiet */
}
/*
 * Parse one "prefix" + unsigned value line at *s and return the value as a
 * TransactionId, advancing *s to the start of the following line.  Raises
 * an ERROR mentioning filename if the prefix does not match, no unsigned
 * decimal number follows it, or the line has no terminating newline.
 */
static TransactionId
parseXidFromText(const char *prefix, char **s, const char *filename)
{
	char	   *cursor = *s;
	size_t		plen = strlen(prefix);
	TransactionId result;

	/* the line must start with the expected field name */
	if (strncmp(cursor, prefix, plen) != 0)
		goto invalid;
	cursor += plen;

	/* an unsigned decimal number must follow */
	if (sscanf(cursor, "%u", &result) != 1)
		goto invalid;

	/* and the line must be terminated by a newline */
	cursor = strchr(cursor, '\n');
	if (cursor == NULL)
		goto invalid;

	*s = cursor + 1;
	return result;

invalid:
	ereport(ERROR,
			(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
			 errmsg("invalid snapshot data in file \"%s\"", filename)));
	return 0;					/* keep compiler quiet */
}
/*
* ImportSnapshot
* Import a previously exported snapshot. The argument should be a
* filename in SNAPSHOT_EXPORT_DIR. Load the snapshot from that file.
* This is called by "SET TRANSACTION SNAPSHOT 'foo'".
*/
void
ImportSnapshot(const char *idstr)
{
char path[MAXPGPATH];
FILE *f;
struct stat stat_buf;
char *filebuf;
int xcnt;
int i;
TransactionId src_xid;
Oid src_dbid;
int src_isolevel;
bool src_readonly;
SnapshotData snapshot;
/*
* Must be at top level of a fresh transaction. Note in particular that
* we check we haven't acquired an XID --- if we have, it's conceivable
* that the snapshot would show it as not running, making for very
* screwy behavior.
*/
if (FirstSnapshotSet ||
GetTopTransactionIdIfAny() != InvalidTransactionId ||
IsSubTransaction())
ereport(ERROR,
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("SET TRANSACTION SNAPSHOT must be called before any query")));
/*
* If we are in read committed mode then the next query would execute
* with a new snapshot thus making this function call quite useless.
*/
if (!IsolationUsesXactSnapshot())
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("a snapshot-importing transaction must have isolation level SERIALIZABLE or REPEATABLE READ")));
/*
* Verify the identifier: only 0-9, A-F and hyphens are allowed. We do
* this mainly to prevent reading arbitrary files.
*/
if (strspn(idstr, "0123456789ABCDEF-") != strlen(idstr))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid snapshot identifier \"%s\"", idstr)));
/* OK, read the file */
snprintf(path, MAXPGPATH, SNAPSHOT_EXPORT_DIR "/%s", idstr);
f = AllocateFile(path, PG_BINARY_R);
if (!f)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid snapshot identifier \"%s\"", idstr)));
/* get the size of the file so that we know how much memory we need */
if (fstat(fileno(f), &stat_buf))
elog(ERROR, "could not stat file \"%s\": %m", path);
/* and read the file into a palloc'd string */
filebuf = (char *) palloc(stat_buf.st_size + 1);
if (fread(filebuf, stat_buf.st_size, 1, f) != 1)
elog(ERROR, "could not read file \"%s\": %m", path);
filebuf[stat_buf.st_size] = '\0';
FreeFile(f);
/*
* Construct a snapshot struct by parsing the file content.
*/
memset(&snapshot, 0, sizeof(snapshot));
src_xid = parseXidFromText("xid:", &filebuf, path);
/* we abuse parseXidFromText a bit here ... */
src_dbid = parseXidFromText("dbid:", &filebuf, path);
src_isolevel = parseIntFromText("iso:", &filebuf, path);
src_readonly = parseIntFromText("ro:", &filebuf, path);
snapshot.xmin = parseXidFromText("xmin:", &filebuf, path);
snapshot.xmax = parseXidFromText("xmax:", &filebuf, path);
snapshot.xcnt = xcnt = parseIntFromText("xcnt:", &filebuf, path);
/* sanity-check the xid count before palloc */
if (xcnt < 0 || xcnt > GetMaxSnapshotXidCount())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid snapshot data in file \"%s\"", path)));
snapshot.xip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
for (i = 0; i < xcnt; i++)
snapshot.xip[i] = parseXidFromText("xip:", &filebuf, path);
snapshot.suboverflowed = parseIntFromText("sof:", &filebuf, path);
if (!snapshot.suboverflowed)
{
snapshot.subxcnt = xcnt = parseIntFromText("sxcnt:", &filebuf, path);
/* sanity-check the xid count before palloc */
if (xcnt < 0 || xcnt > GetMaxSnapshotSubxidCount())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid snapshot data in file \"%s\"", path)));
snapshot.subxip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
for (i = 0; i < xcnt; i++)
snapshot.subxip[i] = parseXidFromText("sxp:", &filebuf, path);
}
else
{
snapshot.subxcnt = 0;
snapshot.subxip = NULL;
}
snapshot.takenDuringRecovery = parseIntFromText("rec:", &filebuf, path);
/*
* Do some additional sanity checking, just to protect ourselves. We
* don't trouble to check the array elements, just the most critical
* fields.
*/
if (!TransactionIdIsNormal(src_xid) ||
!OidIsValid(src_dbid) ||
!TransactionIdIsNormal(snapshot.xmin) ||
!TransactionIdIsNormal(snapshot.xmax))
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid snapshot data in file \"%s\"", path)));
/*
* If we're serializable, the source transaction must be too, otherwise
* predicate.c has problems (SxactGlobalXmin could go backwards). Also,
* a non-read-only transaction can't adopt a snapshot from a read-only
* transaction, as predicate.c handles the cases very differently.
*/
if (IsolationIsSerializable())
{
if (src_isolevel != XACT_SERIALIZABLE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("a serializable transaction cannot import a snapshot from a non-serializable transaction")));
if (src_readonly && !XactReadOnly)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("a non-read-only serializable transaction cannot import a snapshot from a read-only transaction")));
}
/*
* We cannot import a snapshot that was taken in a different database,
* because vacuum calculates OldestXmin on a per-database basis; so the
* source transaction's xmin doesn't protect us from data loss. This
* restriction could be removed if the source transaction were to mark
* its xmin as being globally applicable. But that would require some
* additional syntax, since that has to be known when the snapshot is
* initially taken. (See pgsql-hackers discussion of 2011-10-21.)
*/
if (src_dbid != MyDatabaseId)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot import a snapshot from a different database")));
/* OK, install the snapshot */
SetTransactionSnapshot(&snapshot, src_xid);
}
/*
 * XactHasExportedSnapshots
 *		Report whether the current transaction has exported any snapshots,
 *		i.e. whether the exportedSnapshots list is non-empty.
 */
bool
XactHasExportedSnapshots(void)
{
	if (exportedSnapshots == NIL)
		return false;

	return true;
}
/*
 * DeleteAllExportedSnapshotFiles
 *		Remove every file found in SNAPSHOT_EXPORT_DIR, so that files left
 *		behind by a crashed backend that had exported snapshots go away.
 *
 * This should be called during database startup or crash recovery.
 * Failure to open the directory or to unlink a file is only logged.
 */
void
DeleteAllExportedSnapshotFiles(void)
{
	char		filepath[MAXPGPATH];
	DIR		   *dir;
	struct dirent *entry;

	dir = AllocateDir(SNAPSHOT_EXPORT_DIR);
	if (dir == NULL)
	{
		/*
		 * A sane cluster really ought to have this directory.  Still, its
		 * absence is not serious enough to warrant FATAL.  Since we're
		 * running in the postmaster, LOG is our best bet.
		 */
		elog(LOG, "could not open directory \"%s\": %m", SNAPSHOT_EXPORT_DIR);
		return;
	}

	for (;;)
	{
		entry = ReadDir(dir, SNAPSHOT_EXPORT_DIR);
		if (entry == NULL)
			break;

		/* everything except the "." and ".." entries gets removed */
		if (strcmp(entry->d_name, ".") != 0 &&
			strcmp(entry->d_name, "..") != 0)
		{
			snprintf(filepath, sizeof(filepath),
					 SNAPSHOT_EXPORT_DIR "/%s", entry->d_name);

			/* Again, unlink failure is not worthy of FATAL */
			if (unlink(filepath) != 0)
				elog(LOG, "could not unlink file \"%s\": %m", filepath);
		}
	}

	FreeDir(dir);
}