1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-20 05:03:10 +03:00

Fix up pg_dump's treatment of large object ownership and ACLs. We now emit

a separate archive entry for each BLOB, and use pg_dump's standard methods
for dealing with its ownership, ACL if any, and comment if any.  This means
that switches like --no-owner and --no-privileges do what they're supposed
to.  Preliminary testing says that performance is still reasonable even
with many blobs, though we'll have to see how that shakes out in the field.

KaiGai Kohei, revised by me
This commit is contained in:
Tom Lane
2010-02-18 01:29:10 +00:00
parent 2b44d74dd4
commit c0d5be5d6a
8 changed files with 284 additions and 248 deletions

View File

@ -15,7 +15,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.178 2010/01/19 18:39:19 tgl Exp $
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.179 2010/02/18 01:29:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -98,6 +98,7 @@ static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
static teReqs _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls);
static bool _tocEntryIsACL(TocEntry *te);
static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id);
@ -329,9 +330,9 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
AH->currentTE = te;
reqs = _tocEntryRequired(te, ropt, false /* needn't drop ACLs */ );
if (((reqs & REQ_SCHEMA) != 0) && te->dropStmt)
/* We want anything that's selected and has a dropStmt */
if (((reqs & (REQ_SCHEMA|REQ_DATA)) != 0) && te->dropStmt)
{
/* We want the schema */
ahlog(AH, 1, "dropping %s %s\n", te->desc, te->tag);
/* Select owner and schema as necessary */
_becomeOwner(AH, te);
@ -381,7 +382,8 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
/* Work out what, if anything, we want from this entry */
reqs = _tocEntryRequired(te, ropt, true);
if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */
/* Both schema and data objects might now have ownership/ACLs */
if ((reqs & (REQ_SCHEMA|REQ_DATA)) != 0)
{
ahlog(AH, 1, "setting owner and privileges for %s %s\n",
te->desc, te->tag);
@ -905,6 +907,7 @@ EndRestoreBlobs(ArchiveHandle *AH)
void
StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
{
bool old_blob_style = (AH->version < K_VERS_1_12);
Oid loOid;
AH->blobCount++;
@ -914,24 +917,32 @@ StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
ahlog(AH, 2, "restoring large object with OID %u\n", oid);
if (drop)
/* With an old archive we must do drop and create logic here */
if (old_blob_style && drop)
DropBlobIfExists(AH, oid);
if (AH->connection)
{
loOid = lo_create(AH->connection, oid);
if (loOid == 0 || loOid != oid)
die_horribly(AH, modulename, "could not create large object %u\n",
oid);
if (old_blob_style)
{
loOid = lo_create(AH->connection, oid);
if (loOid == 0 || loOid != oid)
die_horribly(AH, modulename, "could not create large object %u\n",
oid);
}
AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
if (AH->loFd == -1)
die_horribly(AH, modulename, "could not open large object\n");
die_horribly(AH, modulename, "could not open large object %u\n",
oid);
}
else
{
ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
oid, INV_WRITE);
if (old_blob_style)
ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
oid, INV_WRITE);
else
ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
oid, INV_WRITE);
}
AH->writingBlob = 1;
@ -1829,6 +1840,9 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
AH->vmin = K_VERS_MINOR;
AH->vrev = K_VERS_REV;
/* Make a convenient integer <maj><min><rev>00 */
AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
/* initialize for backwards compatible string processing */
AH->public.encoding = 0; /* PG_SQL_ASCII */
AH->public.std_strings = false;
@ -2068,12 +2082,13 @@ ReadToc(ArchiveHandle *AH)
else
{
/*
* rules for pre-8.4 archives wherein pg_dump hasn't classified
* the entries into sections
* Rules for pre-8.4 archives wherein pg_dump hasn't classified
* the entries into sections. This list need not cover entry
* types added later than 8.4.
*/
if (strcmp(te->desc, "COMMENT") == 0 ||
strcmp(te->desc, "ACL") == 0 ||
strcmp(te->desc, "DEFAULT ACL") == 0)
strcmp(te->desc, "ACL LANGUAGE") == 0)
te->section = SECTION_NONE;
else if (strcmp(te->desc, "TABLE DATA") == 0 ||
strcmp(te->desc, "BLOBS") == 0 ||
@ -2228,10 +2243,10 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls)
return 0;
/* If it's an ACL, maybe ignore it */
if ((!include_acls || ropt->aclsSkip) &&
(strcmp(te->desc, "ACL") == 0 || strcmp(te->desc, "DEFAULT ACL") == 0))
if ((!include_acls || ropt->aclsSkip) && _tocEntryIsACL(te))
return 0;
/* Ignore DATABASE entry unless we should create it */
if (!ropt->create && strcmp(te->desc, "DATABASE") == 0)
return 0;
@ -2286,9 +2301,18 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls)
if (!te->hadDumper)
{
/*
* Special Case: If 'SEQUENCE SET' then it is considered a data entry
* Special Case: If 'SEQUENCE SET' or anything to do with BLOBs,
* then it is considered a data entry. We don't need to check for
* the BLOBS entry or old-style BLOB COMMENTS, because they will
* have hadDumper = true ... but we do need to check new-style
* BLOB comments.
*/
if (strcmp(te->desc, "SEQUENCE SET") == 0)
if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
strcmp(te->desc, "BLOB") == 0 ||
(strcmp(te->desc, "ACL") == 0 &&
strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
(strcmp(te->desc, "COMMENT") == 0 &&
strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
res = res & REQ_DATA;
else
res = res & ~REQ_DATA;
@ -2320,6 +2344,20 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls)
return res;
}
/*
* Identify TOC entries that are ACLs.
*/
static bool
_tocEntryIsACL(TocEntry *te)
{
/* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
if (strcmp(te->desc, "ACL") == 0 ||
strcmp(te->desc, "ACL LANGUAGE") == 0 ||
strcmp(te->desc, "DEFAULT ACL") == 0)
return true;
return false;
}
/*
* Issue SET commands for parameters that we want to have set the same way
* at all times during execution of a restore script.
@ -2685,6 +2723,13 @@ _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
return;
}
/* BLOBs just have a name, but it's numeric so must not use fmtId */
if (strcmp(type, "BLOB") == 0)
{
appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
return;
}
/*
* These object types require additional decoration. Fortunately, the
* information needed is exactly what's in the DROP command.
@ -2723,14 +2768,12 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat
/* ACLs are dumped only during acl pass */
if (acl_pass)
{
if (!(strcmp(te->desc, "ACL") == 0 ||
strcmp(te->desc, "DEFAULT ACL") == 0))
if (!_tocEntryIsACL(te))
return;
}
else
{
if (strcmp(te->desc, "ACL") == 0 ||
strcmp(te->desc, "DEFAULT ACL") == 0)
if (_tocEntryIsACL(te))
return;
}
@ -2824,6 +2867,7 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat
strlen(te->owner) > 0 && strlen(te->dropStmt) > 0)
{
if (strcmp(te->desc, "AGGREGATE") == 0 ||
strcmp(te->desc, "BLOB") == 0 ||
strcmp(te->desc, "CONVERSION") == 0 ||
strcmp(te->desc, "DATABASE") == 0 ||
strcmp(te->desc, "DOMAIN") == 0 ||
@ -2873,7 +2917,7 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat
* If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
* commands, so we can no longer assume we know the current auth setting.
*/
if (strncmp(te->desc, "ACL", 3) == 0)
if (acl_pass)
{
if (AH->currUser)
free(AH->currUser);