1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-31 22:04:40 +03:00

Allow multiple tables to be specified in one VACUUM or ANALYZE command.

Not much to say about this; does what it says on the tin.

However, formerly, if there was a column list then the ANALYZE action was
implied; now it must be specified, or you get an error.  This is because
it would otherwise be a bit unclear what the user meant if some tables
have column lists and some don't.

Nathan Bossart, reviewed by Michael Paquier and Masahiko Sawada, with some
editorialization by me

Discussion: https://postgr.es/m/E061A8E3-5E3D-494D-94F0-E8A9B312BBFC@amazon.com
This commit is contained in:
Tom Lane
2017-10-03 18:53:44 -04:00
parent 45f9d08684
commit 11d8d72c27
14 changed files with 329 additions and 159 deletions

View File

@ -37,6 +37,7 @@
#include "commands/cluster.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
#include "storage/bufmgr.h"
@ -67,7 +68,8 @@ static BufferAccessStrategy vac_strategy;
/* non-export function prototypes */
static List *get_rel_oids(Oid relid, const RangeVar *vacrel);
static List *expand_vacuum_rel(VacuumRelation *vrel);
static List *get_all_vacuum_rels(void);
static void vac_truncate_clog(TransactionId frozenXID,
MultiXactId minMulti,
TransactionId lastSaneFrozenXid,
@ -90,9 +92,26 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
Assert(vacstmt->options & (VACOPT_VACUUM | VACOPT_ANALYZE));
Assert((vacstmt->options & VACOPT_VACUUM) ||
!(vacstmt->options & (VACOPT_FULL | VACOPT_FREEZE)));
Assert((vacstmt->options & VACOPT_ANALYZE) || vacstmt->va_cols == NIL);
Assert(!(vacstmt->options & VACOPT_SKIPTOAST));
/*
* Make sure VACOPT_ANALYZE is specified if any column lists are present.
*/
if (!(vacstmt->options & VACOPT_ANALYZE))
{
ListCell *lc;
foreach(lc, vacstmt->rels)
{
VacuumRelation *vrel = lfirst_node(VacuumRelation, lc);
if (vrel->va_cols != NIL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("ANALYZE option must be specified when a column list is provided")));
}
}
/*
* All freeze ages are zero if the FREEZE option is given; otherwise pass
* them as -1 which means to use the default values.
@ -119,26 +138,22 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
params.log_min_duration = -1;
/* Now go through the common routine */
vacuum(vacstmt->options, vacstmt->relation, InvalidOid, &params,
vacstmt->va_cols, NULL, isTopLevel);
vacuum(vacstmt->options, vacstmt->rels, &params, NULL, isTopLevel);
}
/*
* Primary entry point for VACUUM and ANALYZE commands.
* Internal entry point for VACUUM and ANALYZE commands.
*
* options is a bitmask of VacuumOption flags, indicating what to do.
*
* relid, if not InvalidOid, indicates the relation to process; otherwise,
* if a RangeVar is supplied, that's what to process; otherwise, we process
* all relevant tables in the database. (If both relid and a RangeVar are
* supplied, the relid is what is processed, but we use the RangeVar's name
* to report any open/lock failure.)
* relations, if not NIL, is a list of VacuumRelation to process; otherwise,
* we process all relevant tables in the database. For each VacuumRelation,
* if a valid OID is supplied, the table with that OID is what to process;
* otherwise, the VacuumRelation's RangeVar indicates what to process.
*
* params contains a set of parameters that can be used to customize the
* behavior.
*
* va_cols is a list of columns to analyze, or NIL to process them all.
*
* bstrategy is normally given as NULL, but in autovacuum it can be passed
* in to use the same buffer strategy object across multiple vacuum() calls.
*
@ -148,14 +163,14 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
* memory context that will not disappear at transaction commit.
*/
void
vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
List *va_cols, BufferAccessStrategy bstrategy, bool isTopLevel)
vacuum(int options, List *relations, VacuumParams *params,
BufferAccessStrategy bstrategy, bool isTopLevel)
{
static bool in_vacuum = false;
const char *stmttype;
volatile bool in_outer_xact,
use_own_xacts;
List *relations;
static bool in_vacuum = false;
Assert(params != NULL);
@ -228,10 +243,29 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
vac_strategy = bstrategy;
/*
* Build list of relation OID(s) to process, putting it in vac_context for
* safekeeping.
* Build list of relation(s) to process, putting any new data in
* vac_context for safekeeping.
*/
relations = get_rel_oids(relid, relation);
if (relations != NIL)
{
List *newrels = NIL;
ListCell *lc;
foreach(lc, relations)
{
VacuumRelation *vrel = lfirst_node(VacuumRelation, lc);
List *sublist;
MemoryContext old_context;
sublist = expand_vacuum_rel(vrel);
old_context = MemoryContextSwitchTo(vac_context);
newrels = list_concat(newrels, sublist);
MemoryContextSwitchTo(old_context);
}
relations = newrels;
}
else
relations = get_all_vacuum_rels();
/*
* Decide whether we need to start/commit our own transactions.
@ -282,7 +316,7 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
CommitTransactionCommand();
}
/* Turn vacuum cost accounting on or off */
/* Turn vacuum cost accounting on or off, and set/clear in_vacuum */
PG_TRY();
{
ListCell *cur;
@ -299,11 +333,11 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
*/
foreach(cur, relations)
{
Oid relid = lfirst_oid(cur);
VacuumRelation *vrel = lfirst_node(VacuumRelation, cur);
if (options & VACOPT_VACUUM)
{
if (!vacuum_rel(relid, relation, options, params))
if (!vacuum_rel(vrel->oid, vrel->relation, options, params))
continue;
}
@ -320,8 +354,8 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
PushActiveSnapshot(GetTransactionSnapshot());
}
analyze_rel(relid, relation, options, params,
va_cols, in_outer_xact, vac_strategy);
analyze_rel(vrel->oid, vrel->relation, options, params,
vrel->va_cols, in_outer_xact, vac_strategy);
if (use_own_xacts)
{
@ -375,25 +409,33 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
}
/*
* Build a list of Oids for each relation to be processed
* Given a VacuumRelation, fill in the table OID if it wasn't specified,
* and optionally add VacuumRelations for partitions of the table.
*
* The list is built in vac_context so that it will survive across our
* per-relation transactions.
* If a VacuumRelation does not have an OID supplied and is a partitioned
* table, an extra entry will be added to the output for each partition.
* Presently, only autovacuum supplies OIDs when calling vacuum(), and
* it does not want us to expand partitioned tables.
*
* We take care not to modify the input data structure, but instead build
* new VacuumRelation(s) to return. (But note that they will reference
* unmodified parts of the input, eg column lists.) New data structures
* are made in vac_context.
*/
static List *
get_rel_oids(Oid relid, const RangeVar *vacrel)
expand_vacuum_rel(VacuumRelation *vrel)
{
List *oid_list = NIL;
List *vacrels = NIL;
MemoryContext oldcontext;
/* OID supplied by VACUUM's caller? */
if (OidIsValid(relid))
/* If caller supplied OID, there's nothing we need do here. */
if (OidIsValid(vrel->oid))
{
oldcontext = MemoryContextSwitchTo(vac_context);
oid_list = lappend_oid(oid_list, relid);
vacrels = lappend(vacrels, vrel);
MemoryContextSwitchTo(oldcontext);
}
else if (vacrel)
else
{
/* Process a specific relation, and possibly partitions thereof */
Oid relid;
@ -406,7 +448,16 @@ get_rel_oids(Oid relid, const RangeVar *vacrel)
* below, as well as find_all_inheritors's expectation that the caller
* holds some lock on the starting relation.
*/
relid = RangeVarGetRelid(vacrel, AccessShareLock, false);
relid = RangeVarGetRelid(vrel->relation, AccessShareLock, false);
/*
* Make a returnable VacuumRelation for this rel.
*/
oldcontext = MemoryContextSwitchTo(vac_context);
vacrels = lappend(vacrels, makeVacuumRelation(vrel->relation,
relid,
vrel->va_cols));
MemoryContextSwitchTo(oldcontext);
/*
* To check whether the relation is a partitioned table, fetch its
@ -420,19 +471,36 @@ get_rel_oids(Oid relid, const RangeVar *vacrel)
ReleaseSysCache(tuple);
/*
* Make relation list entries for this rel and its partitions, if any.
* Note that the list returned by find_all_inheritors() includes the
* passed-in OID at its head. There's no point in taking locks on the
* individual partitions yet, and doing so would just add unnecessary
* deadlock risk.
* If it is, make relation list entries for its partitions. Note that
* the list returned by find_all_inheritors() includes the passed-in
* OID, so we have to skip that. There's no point in taking locks on
* the individual partitions yet, and doing so would just add
* unnecessary deadlock risk.
*/
oldcontext = MemoryContextSwitchTo(vac_context);
if (include_parts)
oid_list = list_concat(oid_list,
find_all_inheritors(relid, NoLock, NULL));
else
oid_list = lappend_oid(oid_list, relid);
MemoryContextSwitchTo(oldcontext);
{
List *part_oids = find_all_inheritors(relid, NoLock, NULL);
ListCell *part_lc;
foreach(part_lc, part_oids)
{
Oid part_oid = lfirst_oid(part_lc);
if (part_oid == relid)
continue; /* ignore original table */
/*
* We omit a RangeVar since it wouldn't be appropriate to
* complain about failure to open one of these relations
* later.
*/
oldcontext = MemoryContextSwitchTo(vac_context);
vacrels = lappend(vacrels, makeVacuumRelation(NULL,
part_oid,
vrel->va_cols));
MemoryContextSwitchTo(oldcontext);
}
}
/*
* Release lock again. This means that by the time we actually try to
@ -447,45 +515,57 @@ get_rel_oids(Oid relid, const RangeVar *vacrel)
*/
UnlockRelationOid(relid, AccessShareLock);
}
else
return vacrels;
}
/*
* Construct a list of VacuumRelations for all vacuumable rels in
* the current database. The list is built in vac_context.
*/
static List *
get_all_vacuum_rels(void)
{
List *vacrels = NIL;
Relation pgclass;
HeapScanDesc scan;
HeapTuple tuple;
pgclass = heap_open(RelationRelationId, AccessShareLock);
scan = heap_beginscan_catalog(pgclass, 0, NULL);
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
Form_pg_class classForm = (Form_pg_class) GETSTRUCT(tuple);
MemoryContext oldcontext;
/*
* Process all plain relations and materialized views listed in
* pg_class
* We include partitioned tables here; depending on which operation is
* to be performed, caller will decide whether to process or ignore
* them.
*/
Relation pgclass;
HeapScanDesc scan;
HeapTuple tuple;
if (classForm->relkind != RELKIND_RELATION &&
classForm->relkind != RELKIND_MATVIEW &&
classForm->relkind != RELKIND_PARTITIONED_TABLE)
continue;
pgclass = heap_open(RelationRelationId, AccessShareLock);
scan = heap_beginscan_catalog(pgclass, 0, NULL);
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
Form_pg_class classForm = (Form_pg_class) GETSTRUCT(tuple);
/*
* We include partitioned tables here; depending on which
* operation is to be performed, caller will decide whether to
* process or ignore them.
*/
if (classForm->relkind != RELKIND_RELATION &&
classForm->relkind != RELKIND_MATVIEW &&
classForm->relkind != RELKIND_PARTITIONED_TABLE)
continue;
/* Make a relation list entry for this rel */
oldcontext = MemoryContextSwitchTo(vac_context);
oid_list = lappend_oid(oid_list, HeapTupleGetOid(tuple));
MemoryContextSwitchTo(oldcontext);
}
heap_endscan(scan);
heap_close(pgclass, AccessShareLock);
/*
* Build VacuumRelation(s) specifying the table OIDs to be processed.
* We omit a RangeVar since it wouldn't be appropriate to complain
* about failure to open one of these relations later.
*/
oldcontext = MemoryContextSwitchTo(vac_context);
vacrels = lappend(vacrels, makeVacuumRelation(NULL,
HeapTupleGetOid(tuple),
NIL));
MemoryContextSwitchTo(oldcontext);
}
return oid_list;
heap_endscan(scan);
heap_close(pgclass, AccessShareLock);
return vacrels;
}
/*