mirror of https://github.com/postgres/postgres.git, synced 2025-06-23 14:01:44 +03:00
pg_dump can now dump large objects even in plain-text output mode, by using the recently added lo_create() function. The restore logic in pg_restore is greatly simplified as well, since there is no longer any need to adjust database references to match a newly assigned set of blob OIDs.
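For background, the direct-to-database branch of this patch relies on libpq's lo_create(), which recreates a blob under its original OID instead of letting the server assign a new one (the old lo_creat() path, whose OID cross-reference machinery is deleted below). The following is a rough standalone sketch of that call sequence, not pg_dump's own code; the connection string, OID 16401, and payload are invented placeholders.

#include <stdio.h>
#include <string.h>
#include <libpq-fe.h>
#include <libpq/libpq-fs.h>		/* INV_WRITE */

int
main(void)
{
	/* Hypothetical connection string, blob OID, and contents */
	PGconn	   *conn = PQconnectdb("dbname=postgres");
	Oid			oid = 16401;
	const char *data = "blob payload";
	int			fd;

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		return 1;
	}

	/* Large-object operations must run inside a transaction block */
	PQclear(PQexec(conn, "BEGIN"));

	/*
	 * Recreate the blob under its original OID; this is the lo_create()
	 * call the commit depends on, whereas lo_creat() would pick a new OID.
	 */
	if (lo_create(conn, oid) != oid)
	{
		fprintf(stderr, "could not create large object %u\n", oid);
		return 1;
	}

	/* Open it for writing and stream the saved contents back in */
	fd = lo_open(conn, oid, INV_WRITE);
	if (fd == -1 ||
		lo_write(conn, fd, data, strlen(data)) != (int) strlen(data) ||
		lo_close(conn, fd) == -1)
	{
		fprintf(stderr, "could not write large object: %s", PQerrorMessage(conn));
		return 1;
	}

	PQclear(PQexec(conn, "COMMIT"));
	PQfinish(conn);
	return 0;
}

Built against libpq with something like cc -I`pg_config --includedir` demo.c -lpq. In plain-text mode the diff emits the equivalent sequence as SQL via ahprintf(): SELECT lo_open(lo_create(oid), ...), then lowrite(0, ...) for each buffered chunk, then SELECT lo_close(0).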
@@ -15,7 +15,7 @@
 *
 *
 * IDENTIFICATION
 * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.110 2005/06/09 17:56:51 momjian Exp $
 * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.111 2005/06/21 20:45:44 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */

@@ -49,8 +49,6 @@ static void _getObjectDescription(PQExpBuffer buf, TocEntry *te,
static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isData, bool acl_pass);

static void fixPriorBlobRefs(ArchiveHandle *AH, TocEntry *blobte,
             RestoreOptions *ropt);
static void _doSetFixedOutputState(ArchiveHandle *AH);
static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
static void _doSetWithOids(ArchiveHandle *AH, const bool withOids);

@@ -67,12 +65,10 @@ static TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id);
static void _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
static int _discoverArchiveFormat(ArchiveHandle *AH);

static void dump_lo_buf(ArchiveHandle *AH);
static void _write_msg(const char *modulename, const char *fmt, va_list ap);
static void _die_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt, va_list ap);

static int _canRestoreBlobs(ArchiveHandle *AH);
static int _restoringToDB(ArchiveHandle *AH);

static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
@@ -306,22 +302,13 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)

        _printTocEntry(AH, te, ropt, true, false);

        /*
         * Maybe we can't do BLOBS, so check if this node is
         * for BLOBS
         */
        if ((strcmp(te->desc, "BLOBS") == 0) &&
            !_canRestoreBlobs(AH))
        if (strcmp(te->desc, "BLOBS") == 0)
        {
            ahprintf(AH, "--\n-- SKIPPED \n--\n\n");
            ahlog(AH, 1, "restoring blob data\n");

            /*
             * This is a bit nasty - we assume, for the
             * moment, that if a custom output is used, then
             * we don't want warnings.
             */
            if (!AH->CustomOutPtr)
                write_msg(modulename, "WARNING: skipping large-object restoration\n");
            _selectOutputSchema(AH, "pg_catalog");

            (*AH->PrintTocDataPtr) (AH, te, ropt);
        }
        else
        {

@@ -331,7 +318,8 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
            _becomeOwner(AH, te);
            _selectOutputSchema(AH, te->namespace);

            ahlog(AH, 1, "restoring data for table \"%s\"\n", te->tag);
            ahlog(AH, 1, "restoring data for table \"%s\"\n",
                  te->tag);

            /*
             * If we have a copy statement, use it. As of

@@ -349,24 +337,6 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)

            (*AH->PrintTocDataPtr) (AH, te, ropt);

            /*
             * If we just restored blobs, fix references in
             * previously-loaded tables; otherwise, if we
             * previously restored blobs, fix references in
             * this table. Note that in standard cases the
             * BLOBS entry comes after all TABLE DATA entries,
             * but we should cope with other orders in case
             * the user demands reordering.
             */
            if (strcmp(te->desc, "BLOBS") == 0)
                fixPriorBlobRefs(AH, te, ropt);
            else if (AH->createdBlobXref &&
                     strcmp(te->desc, "TABLE DATA") == 0)
            {
                ahlog(AH, 1, "fixing up large-object cross-reference for \"%s\"\n", te->tag);
                FixupBlobRefs(AH, te);
            }

            _enableTriggersIfNecessary(AH, te, ropt);
        }
    }
@@ -415,47 +385,6 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
    {
        PQfinish(AH->connection);
        AH->connection = NULL;

        if (AH->blobConnection)
        {
            PQfinish(AH->blobConnection);
            AH->blobConnection = NULL;
        }
    }
}

/*
 * After restoring BLOBS, fix all blob references in previously-restored
 * tables. (Normally, the BLOBS entry should appear after all TABLE DATA
 * entries, so this will in fact handle all blob references.)
 */
static void
fixPriorBlobRefs(ArchiveHandle *AH, TocEntry *blobte, RestoreOptions *ropt)
{
    TocEntry *te;
    teReqs reqs;

    if (AH->createdBlobXref)
    {
        /* NULL parameter means disable ALL user triggers */
        _disableTriggersIfNecessary(AH, NULL, ropt);

        for (te = AH->toc->next; te != blobte; te = te->next)
        {
            if (strcmp(te->desc, "TABLE DATA") == 0)
            {
                reqs = _tocEntryRequired(te, ropt, false);

                if ((reqs & REQ_DATA) != 0) /* We loaded the data */
                {
                    ahlog(AH, 1, "fixing up large-object cross-reference for \"%s\"\n", te->tag);
                    FixupBlobRefs(AH, te);
                }
            }
        }

        /* NULL parameter means enable ALL user triggers */
        _enableTriggersIfNecessary(AH, NULL, ropt);
    }
}
@@ -477,22 +406,6 @@ NewRestoreOptions(void)
    return opts;
}

/*
 * Returns true if we're restoring directly to the database (and
 * aren't just making a psql script that can do the restoration).
 */
static int
_restoringToDB(ArchiveHandle *AH)
{
    return (AH->ropt->useDB && AH->connection);
}

static int
_canRestoreBlobs(ArchiveHandle *AH)
{
    return _restoringToDB(AH);
}

static void
_disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
{

@@ -500,10 +413,6 @@ _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *rop
    if (!ropt->dataOnly || !ropt->disable_triggers)
        return;

    /* Don't do it for the BLOBS TocEntry, either */
    if (te && strcmp(te->desc, "BLOBS") == 0)
        return;

    /*
     * Become superuser if possible, since they are the only ones who can
     * update pg_class. If -S was not given, assume the initial user

@@ -539,10 +448,6 @@ _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt
    if (!ropt->dataOnly || !ropt->disable_triggers)
        return;

    /* Don't do it for the BLOBS TocEntry, either */
    if (te && strcmp(te->desc, "BLOBS") == 0)
        return;

    /*
     * Become superuser if possible, since they are the only ones who can
     * update pg_class. If -S was not given, assume the initial user
@@ -757,6 +662,11 @@ EndBlob(Archive *AHX, Oid oid)
void
StartRestoreBlobs(ArchiveHandle *AH)
{
    if (AH->connection)
        StartTransaction(AH);
    else
        ahprintf(AH, "BEGIN;\n\n");

    AH->blobCount = 0;
}

@@ -766,17 +676,10 @@ StartRestoreBlobs(ArchiveHandle *AH)
void
EndRestoreBlobs(ArchiveHandle *AH)
{
    if (AH->txActive)
    {
        ahlog(AH, 2, "committing large-object transactions\n");
        if (AH->connection)
            CommitTransaction(AH);
    }

    if (AH->blobTxActive)
        CommitTransactionXref(AH);

    if (AH->createdBlobXref)
        CreateBlobXrefIndex(AH);
    else
        ahprintf(AH, "COMMIT;\n\n");

    ahlog(AH, 1, "restored %d large objects\n", AH->blobCount);
}
@@ -792,40 +695,26 @@ StartRestoreBlob(ArchiveHandle *AH, Oid oid)

    AH->blobCount++;

    if (!AH->createdBlobXref)
    {
        if (!AH->connection)
            die_horribly(AH, modulename, "cannot restore large objects without a database connection\n");

        CreateBlobXrefTable(AH);
        AH->createdBlobXref = 1;
    }

    /* Initialize the LO Buffer */
    AH->lo_buf_used = 0;

    /*
     * Start long-running TXs if necessary
     */
    if (!AH->txActive)
    ahlog(AH, 2, "restoring large object with OID %u\n", oid);

    if (AH->connection)
    {
        ahlog(AH, 2, "starting large-object transactions\n");
        StartTransaction(AH);
        loOid = lo_create(AH->connection, oid);
        if (loOid == 0 || loOid != oid)
            die_horribly(AH, modulename, "could not create large object %u\n",
                         oid);

        AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
        if (AH->loFd == -1)
            die_horribly(AH, modulename, "could not open large object\n");
    }
    else
    {
        ahprintf(AH, "SELECT lo_open(lo_create(%u), %d);\n", oid, INV_WRITE);
    }
    if (!AH->blobTxActive)
        StartTransactionXref(AH);

    loOid = lo_creat(AH->connection, INV_READ | INV_WRITE);
    if (loOid == 0)
        die_horribly(AH, modulename, "could not create large object\n");

    ahlog(AH, 2, "restoring large object with OID %u as %u\n", oid, loOid);

    InsertBlobXref(AH, oid, loOid);

    AH->loFd = lo_open(AH->connection, loOid, INV_WRITE);
    if (AH->loFd == -1)
        die_horribly(AH, modulename, "could not open large object\n");

    AH->writingBlob = 1;
}
@@ -836,29 +725,19 @@ EndRestoreBlob(ArchiveHandle *AH, Oid oid)
    if (AH->lo_buf_used > 0)
    {
        /* Write remaining bytes from the LO buffer */
        size_t res;

        res = lo_write(AH->connection, AH->loFd, (void *) AH->lo_buf, AH->lo_buf_used);

        ahlog(AH, 5, "wrote remaining %lu bytes of large-object data (result = %lu)\n",
              (unsigned long) AH->lo_buf_used, (unsigned long) res);
        if (res != AH->lo_buf_used)
            die_horribly(AH, modulename, "could not write to large object (result: %lu, expected: %lu)\n",
                         (unsigned long) res, (unsigned long) AH->lo_buf_used);
        AH->lo_buf_used = 0;
        dump_lo_buf(AH);
    }

    lo_close(AH->connection, AH->loFd);
    AH->writingBlob = 0;

    /*
     * Commit every BLOB_BATCH_SIZE blobs...
     */
    if (((AH->blobCount / BLOB_BATCH_SIZE) * BLOB_BATCH_SIZE) == AH->blobCount)
    if (AH->connection)
    {
        ahlog(AH, 2, "committing large-object transactions\n");
        CommitTransaction(AH);
        CommitTransactionXref(AH);
        lo_close(AH->connection, AH->loFd);
        AH->loFd = -1;
    }
    else
    {
        ahprintf(AH, "SELECT lo_close(0);\n\n");
    }
}
@@ -1107,6 +986,45 @@ RestoringToDB(ArchiveHandle *AH)
    return (AH->ropt && AH->ropt->useDB && AH->connection);
}

/*
 * Dump the current contents of the LO data buffer while writing a BLOB
 */
static void
dump_lo_buf(ArchiveHandle *AH)
{
    if (AH->connection)
    {
        size_t res;

        res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
        ahlog(AH, 5, "wrote %lu bytes of large object data (result = %lu)\n",
              (unsigned long) AH->lo_buf_used, (unsigned long) res);
        if (res != AH->lo_buf_used)
            die_horribly(AH, modulename,
                         "could not write to large object (result: %lu, expected: %lu)\n",
                         (unsigned long) res, (unsigned long) AH->lo_buf_used);
    }
    else
    {
        unsigned char *str;
        size_t len;

        str = PQescapeBytea((const unsigned char *) AH->lo_buf,
                            AH->lo_buf_used, &len);
        if (!str)
            die_horribly(AH, modulename, "out of memory\n");

        /* Hack: turn off writingBlob so ahwrite doesn't recurse to here */
        AH->writingBlob = 0;
        ahprintf(AH, "SELECT lowrite(0, '%s');\n", str);
        AH->writingBlob = 1;

        free(str);
    }
    AH->lo_buf_used = 0;
}

/*
 * Write buffer to the output file (usually stdout). This is used for
 * outputting 'restore' scripts etc. It is even possible for an archive
@@ -1120,30 +1038,22 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)

    if (AH->writingBlob)
    {
        if (AH->lo_buf_used + size * nmemb > AH->lo_buf_size)
        {
            /* Split LO buffer */
            size_t remaining = AH->lo_buf_size - AH->lo_buf_used;
            size_t slack = nmemb * size - remaining;
        size_t remaining = size * nmemb;

            memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
            res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_size);
            ahlog(AH, 5, "wrote %lu bytes of large object data (result = %lu)\n",
                  (unsigned long) AH->lo_buf_size, (unsigned long) res);
            if (res != AH->lo_buf_size)
                die_horribly(AH, modulename,
                             "could not write to large object (result: %lu, expected: %lu)\n",
                             (unsigned long) res, (unsigned long) AH->lo_buf_size);
            memcpy(AH->lo_buf, (char *) ptr + remaining, slack);
            AH->lo_buf_used = slack;
        }
        else
        while (AH->lo_buf_used + remaining > AH->lo_buf_size)
        {
            /* LO Buffer is still large enough, buffer it */
            memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, size * nmemb);
            AH->lo_buf_used += size * nmemb;
            size_t avail = AH->lo_buf_size - AH->lo_buf_used;

            memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
            ptr = (const void *) ((const char *) ptr + avail);
            remaining -= avail;
            AH->lo_buf_used += avail;
            dump_lo_buf(AH);
        }

        memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
        AH->lo_buf_used += remaining;

        return size * nmemb;
    }
    else if (AH->gzOut)
@@ -1213,8 +1123,6 @@ _die_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt, va_lis
        write_msg(NULL, "*** aborted because of error\n");
        if (AH->connection)
            PQfinish(AH->connection);
        if (AH->blobConnection)
            PQfinish(AH->blobConnection);
    }

    exit(1);