1
0
mirror of https://github.com/postgres/postgres.git synced 2025-04-25 21:42:33 +03:00

Back-patch contrib/vacuumlo's new -l (limit) option into 9.0 and 9.1.

Since 9.0, removing lots of large objects in a single transaction risks
exceeding max_locks_per_transaction, because we merged large object removal
into the generic object-drop mechanism, which takes out an exclusive lock
on each object to be dropped.  This creates a hazard for contrib/vacuumlo,
which has historically tried to drop all unreferenced large objects in one
transaction.  There doesn't seem to be any correctness requirement to do it
that way, though; we only need to drop enough large objects per transaction
to amortize the commit costs.

To prevent a regression from pre-9.0 releases wherein vacuumlo worked just
fine, back-patch commits b69f2e36402aaa222ed03c1769b3de6d5be5f302 and
64c604898e812aa93c124c666e8709fff1b8dd26, which break vacuumlo's deletions
into multiple transactions with a user-controllable upper limit on the
number of objects dropped per transaction.

Tim Lewis, Robert Haas, Tom Lane
This commit is contained in:
Tom Lane 2012-03-21 13:04:07 -04:00
parent 16f42be840
commit 5bd06e619c
2 changed files with 133 additions and 32 deletions

View File

@ -3,7 +3,7 @@
* vacuumlo.c * vacuumlo.c
* This removes orphaned large objects from a database. * This removes orphaned large objects from a database.
* *
* Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* *
@ -22,7 +22,6 @@
#endif #endif
#include "libpq-fe.h" #include "libpq-fe.h"
#include "libpq/libpq-fs.h"
#define atooid(x) ((Oid) strtoul((x), NULL, 10)) #define atooid(x) ((Oid) strtoul((x), NULL, 10))
@ -30,8 +29,7 @@
extern char *optarg; extern char *optarg;
extern int optind, extern int optind,
opterr, opterr;
optopt;
enum trivalue enum trivalue
{ {
@ -48,29 +46,32 @@ struct _param
char *pg_host; char *pg_host;
int verbose; int verbose;
int dry_run; int dry_run;
long transaction_limit;
}; };
int vacuumlo(char *, struct _param *); static int vacuumlo(const char *database, const struct _param * param);
void usage(const char *progname); static void usage(const char *progname);
/* /*
* This vacuums LOs of one database. It returns 0 on success, -1 on failure. * This vacuums LOs of one database. It returns 0 on success, -1 on failure.
*/ */
int static int
vacuumlo(char *database, struct _param * param) vacuumlo(const char *database, const struct _param * param)
{ {
PGconn *conn; PGconn *conn;
PGresult *res, PGresult *res,
*res2; *res2;
char buf[BUFSIZE]; char buf[BUFSIZE];
int matched; long matched;
int deleted; long deleted;
int i; int i;
static char *password = NULL; static char *password = NULL;
bool new_pass; bool new_pass;
bool success = true;
/* Note: password can be carried over from a previous call */
if (param->pg_prompt == TRI_YES && password == NULL) if (param->pg_prompt == TRI_YES && password == NULL)
password = simple_prompt("Password: ", 100, false); password = simple_prompt("Password: ", 100, false);
@ -118,7 +119,7 @@ vacuumlo(char *database, struct _param * param)
if (param->verbose) if (param->verbose)
{ {
fprintf(stdout, "Connected to %s\n", database); fprintf(stdout, "Connected to database \"%s\"\n", database);
if (param->dry_run) if (param->dry_run)
fprintf(stdout, "Test run: no large objects will be removed!\n"); fprintf(stdout, "Test run: no large objects will be removed!\n");
} }
@ -219,9 +220,21 @@ vacuumlo(char *database, struct _param * param)
if (param->verbose) if (param->verbose)
fprintf(stdout, "Checking %s in %s.%s\n", field, schema, table); fprintf(stdout, "Checking %s in %s.%s\n", field, schema, table);
schema = PQescapeIdentifier(conn, schema, strlen(schema));
table = PQescapeIdentifier(conn, table, strlen(table));
field = PQescapeIdentifier(conn, field, strlen(field));
if (!schema || !table || !field)
{
fprintf(stderr, "Out of memory\n");
PQclear(res);
PQfinish(conn);
return -1;
}
snprintf(buf, BUFSIZE, snprintf(buf, BUFSIZE,
"DELETE FROM vacuum_l " "DELETE FROM vacuum_l "
"WHERE lo IN (SELECT \"%s\" FROM \"%s\".\"%s\")", "WHERE lo IN (SELECT %s FROM %s.%s)",
field, schema, table); field, schema, table);
res2 = PQexec(conn, buf); res2 = PQexec(conn, buf);
if (PQresultStatus(res2) != PGRES_COMMAND_OK) if (PQresultStatus(res2) != PGRES_COMMAND_OK)
@ -235,23 +248,35 @@ vacuumlo(char *database, struct _param * param)
return -1; return -1;
} }
PQclear(res2); PQclear(res2);
PQfreemem(schema);
PQfreemem(table);
PQfreemem(field);
} }
PQclear(res); PQclear(res);
/* /*
* Run the actual deletes in a single transaction. Note that this would * Now, those entries remaining in vacuum_l are orphans. Delete 'em.
* be a bad idea in pre-7.1 Postgres releases (since rolling back a table *
* delete used to cause problems), but it should be safe now. * We don't want to run each delete as an individual transaction, because
* the commit overhead would be high. However, since 9.0 the backend will
* acquire a lock per deleted LO, so deleting too many LOs per transaction
* risks running out of room in the shared-memory lock table.
* Accordingly, we delete up to transaction_limit LOs per transaction.
*/ */
res = PQexec(conn, "begin"); res = PQexec(conn, "begin");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Failed to start transaction:\n");
fprintf(stderr, "%s", PQerrorMessage(conn));
PQclear(res);
PQfinish(conn);
return -1;
}
PQclear(res); PQclear(res);
/*
* Finally, those entries remaining in vacuum_l are orphans.
*/
buf[0] = '\0'; buf[0] = '\0';
strcat(buf, "SELECT lo "); strcat(buf, "SELECT lo FROM vacuum_l");
strcat(buf, "FROM vacuum_l");
res = PQexec(conn, buf); res = PQexec(conn, buf);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ {
@ -280,37 +305,87 @@ vacuumlo(char *database, struct _param * param)
{ {
fprintf(stderr, "\nFailed to remove lo %u: ", lo); fprintf(stderr, "\nFailed to remove lo %u: ", lo);
fprintf(stderr, "%s", PQerrorMessage(conn)); fprintf(stderr, "%s", PQerrorMessage(conn));
if (PQtransactionStatus(conn) == PQTRANS_INERROR)
{
success = false;
break;
}
} }
else else
deleted++; deleted++;
} }
else else
deleted++; deleted++;
if (param->transaction_limit > 0 &&
(deleted % param->transaction_limit) == 0)
{
res2 = PQexec(conn, "commit");
if (PQresultStatus(res2) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Failed to commit transaction:\n");
fprintf(stderr, "%s", PQerrorMessage(conn));
PQclear(res2);
PQclear(res);
PQfinish(conn);
return -1;
}
PQclear(res2);
res2 = PQexec(conn, "begin");
if (PQresultStatus(res2) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Failed to start transaction:\n");
fprintf(stderr, "%s", PQerrorMessage(conn));
PQclear(res2);
PQclear(res);
PQfinish(conn);
return -1;
}
PQclear(res2);
}
} }
PQclear(res); PQclear(res);
/* /*
* That's all folks! * That's all folks!
*/ */
res = PQexec(conn, "end"); res = PQexec(conn, "commit");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Failed to commit transaction:\n");
fprintf(stderr, "%s", PQerrorMessage(conn));
PQclear(res);
PQfinish(conn);
return -1;
}
PQclear(res); PQclear(res);
PQfinish(conn); PQfinish(conn);
if (param->verbose) if (param->verbose)
fprintf(stdout, "\r%s %d large objects from %s.\n", {
(param->dry_run ? "Would remove" : "Removed"), deleted, database); if (param->dry_run)
fprintf(stdout, "\rWould remove %ld large objects from database \"%s\".\n",
deleted, database);
else if (success)
fprintf(stdout,
"\rSuccessfully removed %ld large objects from database \"%s\".\n",
deleted, database);
else
fprintf(stdout, "\rRemoval from database \"%s\" failed at object %ld of %ld.\n",
database, deleted, matched);
}
return 0; return ((param->dry_run || success) ? 0 : -1);
} }
void static void
usage(const char *progname) usage(const char *progname)
{ {
printf("%s removes unreferenced large objects from databases.\n\n", progname); printf("%s removes unreferenced large objects from databases.\n\n", progname);
printf("Usage:\n %s [OPTION]... DBNAME...\n\n", progname); printf("Usage:\n %s [OPTION]... DBNAME...\n\n", progname);
printf("Options:\n"); printf("Options:\n");
printf(" -h HOSTNAME database server host or socket directory\n"); printf(" -h HOSTNAME database server host or socket directory\n");
printf(" -l LIMIT commit after removing each LIMIT large objects\n");
printf(" -n don't remove large objects, just show what would be done\n"); printf(" -n don't remove large objects, just show what would be done\n");
printf(" -p PORT database server port\n"); printf(" -p PORT database server port\n");
printf(" -U USERNAME user name to connect as\n"); printf(" -U USERNAME user name to connect as\n");
@ -335,14 +410,16 @@ main(int argc, char **argv)
progname = get_progname(argv[0]); progname = get_progname(argv[0]);
/* Parameter handling */ /* Set default parameter values */
param.pg_user = NULL; param.pg_user = NULL;
param.pg_prompt = TRI_DEFAULT; param.pg_prompt = TRI_DEFAULT;
param.pg_host = NULL; param.pg_host = NULL;
param.pg_port = NULL; param.pg_port = NULL;
param.verbose = 0; param.verbose = 0;
param.dry_run = 0; param.dry_run = 0;
param.transaction_limit = 1000;
/* Process command-line arguments */
if (argc > 1) if (argc > 1)
{ {
if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0) if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
@ -359,7 +436,7 @@ main(int argc, char **argv)
while (1) while (1)
{ {
c = getopt(argc, argv, "h:U:p:vnwW"); c = getopt(argc, argv, "h:l:U:p:vnwW");
if (c == -1) if (c == -1)
break; break;
@ -377,6 +454,16 @@ main(int argc, char **argv)
param.dry_run = 1; param.dry_run = 1;
param.verbose = 1; param.verbose = 1;
break; break;
case 'l':
param.transaction_limit = strtol(optarg, NULL, 10);
if (param.transaction_limit < 0)
{
fprintf(stderr,
"%s: transaction limit must not be negative (0 disables)\n",
progname);
exit(1);
}
break;
case 'U': case 'U':
param.pg_user = strdup(optarg); param.pg_user = strdup(optarg);
break; break;
@ -405,7 +492,7 @@ main(int argc, char **argv)
if (optind >= argc) if (optind >= argc)
{ {
fprintf(stderr, "vacuumlo: missing required argument: database name\n"); fprintf(stderr, "vacuumlo: missing required argument: database name\n");
fprintf(stderr, "Try 'vacuumlo -?' for help.\n"); fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
exit(1); exit(1);
} }

View File

@ -49,6 +49,19 @@ vacuumlo [options] database [database2 ... databaseN]
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><option>-l</option> <replaceable>limit</></term>
<listitem>
<para>
Remove no more than <replaceable>limit</> large objects per
transaction (default 1000). Since the server acquires a lock per LO
removed, removing too many LOs in one transaction risks exceeding
<xref linkend="guc-max-locks-per-transaction">. Set the limit to
zero if you want all removals done in a single transaction.
</para>
</listitem>
</varlistentry>
<varlistentry> <varlistentry>
<term><option>-U</option> <replaceable>username</></term> <term><option>-U</option> <replaceable>username</></term>
<listitem> <listitem>
@ -110,18 +123,19 @@ vacuumlo [options] database [database2 ... databaseN]
<title>Method</title> <title>Method</title>
<para> <para>
First, it builds a temporary table which contains all of the OIDs of the First, <application>vacuumlo</> builds a temporary table which contains all
large objects in that database. of the OIDs of the large objects in the selected database.
</para> </para>
<para> <para>
It then scans through all columns in the database that are of type It then scans through all columns in the database that are of type
<type>oid</> or <type>lo</>, and removes matching entries from the <type>oid</> or <type>lo</>, and removes matching entries from the
temporary table. temporary table. (Note: only types with these names are considered;
in particular, domains over them are not considered.)
</para> </para>
<para> <para>
The remaining entries in the temp table identify orphaned LOs. The remaining entries in the temporary table identify orphaned LOs.
These are removed. These are removed.
</para> </para>
</sect2> </sect2>