1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-20 15:22:23 +03:00

New version attached. The following is implemented:

- CLUSTER ALL clusters all the tables that have some index with
  indisclustered set and the calling user owns.
- CLUSTER tablename clusters the named table, using the index with
  indisclustered set.  If no index has the bit set, throws elog(ERROR).
- The multi-relation version (CLUSTER ALL) uses a multitransaction
  approach, similar to what VACUUM does.

Alvaro Herrera
This commit is contained in:
Bruce Momjian
2002-11-15 03:09:39 +00:00
parent 5b7eb4dd45
commit 8bc717cb88
8 changed files with 484 additions and 38 deletions

View File

@ -11,7 +11,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.93 2002/11/11 22:19:21 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.94 2002/11/15 03:09:35 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@ -25,9 +25,11 @@
#include "catalog/index.h"
#include "catalog/indexing.h"
#include "catalog/catname.h"
#include "catalog/namespace.h"
#include "commands/cluster.h"
#include "commands/tablecmds.h"
#include "miscadmin.h"
#include "utils/acl.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
@ -48,12 +50,27 @@ typedef struct
bool isclustered;
} IndexAttrs;
/* This struct is used to pass around the information on tables to be
* clustered. We need this so we can make a list of them when invoked without
* a specific table/index pair.
*/
typedef struct
{
Oid tableOid;
Oid indexOid;
bool isPrevious;
} relToCluster;
static Oid make_new_heap(Oid OIDOldHeap, const char *NewName);
static void copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex);
static List *get_indexattr_list(Relation OldHeap, Oid OldIndex);
static void recreate_indexattr(Oid OIDOldHeap, List *indexes);
static void swap_relfilenodes(Oid r1, Oid r2);
static void cluster_rel(relToCluster *rv);
static bool check_cluster_ownership(Oid relOid);
static List *get_tables_to_cluster(Oid owner);
static MemoryContext cluster_context = NULL;
/*
* cluster
@ -69,43 +86,70 @@ static void swap_relfilenodes(Oid r1, Oid r2);
* the new table, it's better to create the indexes afterwards than to fill
* them incrementally while we load the table.
*
* Permissions checks were done already.
* Since we may open a new transaction for each relation, we have to
* check that the relation still is what we think it is.
*/
void
cluster(RangeVar *oldrelation, char *oldindexname)
cluster_rel(relToCluster *rvtc)
{
Oid OIDOldHeap,
OIDOldIndex,
OIDNewHeap;
Oid OIDNewHeap;
Relation OldHeap,
OldIndex;
char NewHeapName[NAMEDATALEN];
ObjectAddress object;
List *indexes;
/* Check for user-requested abort. */
CHECK_FOR_INTERRUPTS();
/* Check if the relation and index still exist before opening them
*/
if (!SearchSysCacheExists(RELOID,
ObjectIdGetDatum(rvtc->tableOid),
0, 0, 0) ||
!SearchSysCacheExists(RELOID,
ObjectIdGetDatum(rvtc->indexOid),
0, 0, 0))
return;
/* Check that the user still owns the relation */
if (!check_cluster_ownership(rvtc->tableOid))
return;
/* Check that the index is still the one with indisclustered set.
* If this is a standalone cluster, skip this test.
*/
if (rvtc->isPrevious)
{
HeapTuple tuple;
Form_pg_index indexForm;
tuple = SearchSysCache(INDEXRELID,
ObjectIdGetDatum(rvtc->indexOid),
0, 0, 0);
indexForm = (Form_pg_index) GETSTRUCT(tuple);
if (!indexForm->indisclustered)
{
ReleaseSysCache(tuple);
return;
}
ReleaseSysCache(tuple);
}
/*
* We grab exclusive access to the target rel and index for the
* duration of the transaction.
*/
OldHeap = heap_openrv(oldrelation, AccessExclusiveLock);
OIDOldHeap = RelationGetRelid(OldHeap);
OldHeap = heap_open(rvtc->tableOid, AccessExclusiveLock);
/*
* The index is expected to be in the same namespace as the relation.
*/
OIDOldIndex = get_relname_relid(oldindexname,
RelationGetNamespace(OldHeap));
if (!OidIsValid(OIDOldIndex))
elog(ERROR, "CLUSTER: cannot find index \"%s\" for table \"%s\"",
oldindexname, RelationGetRelationName(OldHeap));
OldIndex = index_open(OIDOldIndex);
OldIndex = index_open(rvtc->indexOid);
LockRelation(OldIndex, AccessExclusiveLock);
/*
* Check that index is in fact an index on the given relation
*/
if (OldIndex->rd_index == NULL ||
OldIndex->rd_index->indrelid != OIDOldHeap)
OldIndex->rd_index->indrelid != rvtc->tableOid)
elog(ERROR, "CLUSTER: \"%s\" is not an index for table \"%s\"",
RelationGetRelationName(OldIndex),
RelationGetRelationName(OldHeap));
@ -122,7 +166,7 @@ cluster(RangeVar *oldrelation, char *oldindexname)
RelationGetRelationName(OldHeap));
/* Save the information of all indexes on the relation. */
indexes = get_indexattr_list(OldHeap, OIDOldIndex);
indexes = get_indexattr_list(OldHeap, rvtc->indexOid);
/* Drop relcache refcnts, but do NOT give up the locks */
index_close(OldIndex);
@ -136,9 +180,9 @@ cluster(RangeVar *oldrelation, char *oldindexname)
* namespace from the old, or we will have problems with the TEMP
* status of temp tables.
*/
snprintf(NewHeapName, NAMEDATALEN, "pg_temp_%u", OIDOldHeap);
snprintf(NewHeapName, NAMEDATALEN, "pg_temp_%u", rvtc->tableOid);
OIDNewHeap = make_new_heap(OIDOldHeap, NewHeapName);
OIDNewHeap = make_new_heap(rvtc->tableOid, NewHeapName);
/*
* We don't need CommandCounterIncrement() because make_new_heap did
@ -148,13 +192,13 @@ cluster(RangeVar *oldrelation, char *oldindexname)
/*
* Copy the heap data into the new table in the desired order.
*/
copy_heap_data(OIDNewHeap, OIDOldHeap, OIDOldIndex);
copy_heap_data(OIDNewHeap, rvtc->tableOid, rvtc->indexOid);
/* To make the new heap's data visible (probably not needed?). */
CommandCounterIncrement();
/* Swap the relfilenodes of the old and new heaps. */
swap_relfilenodes(OIDOldHeap, OIDNewHeap);
swap_relfilenodes(rvtc->tableOid, OIDNewHeap);
CommandCounterIncrement();
@ -175,7 +219,7 @@ cluster(RangeVar *oldrelation, char *oldindexname)
* Recreate each index on the relation. We do not need
* CommandCounterIncrement() because recreate_indexattr does it.
*/
recreate_indexattr(OIDOldHeap, indexes);
recreate_indexattr(rvtc->tableOid, indexes);
}
/*
@ -571,3 +615,236 @@ swap_relfilenodes(Oid r1, Oid r2)
heap_close(relRelation, RowExclusiveLock);
}
/*---------------------------------------------------------------------------
* This cluster code allows for clustering multiple tables at once. Because
* of this, we cannot just run everything on a single transaction, or we
* would be forced to acquire exclusive locks on all the tables being
* clustered. To solve this we follow a similar strategy to VACUUM code,
* clustering each relation in a separate transaction. For this to work,
* we need to:
* - provide a separate memory context so that we can pass information in
* a way that trascends transactions
* - start a new transaction every time a new relation is clustered
* - check for validity of the information on to-be-clustered relations,
* as someone might have deleted a relation behind our back, or
* clustered one on a different index
* - end the transaction
*
* The single relation code does not have any overhead.
*
* We also allow a relation being specified without index. In that case,
* the indisclustered bit will be looked up, and an ERROR will be thrown
* if there is no index with the bit set.
*---------------------------------------------------------------------------
*/
void
cluster(ClusterStmt *stmt)
{
/* This is the single relation case. */
if (stmt->relation != NULL)
{
Oid indexOid = InvalidOid,
tableOid;
relToCluster rvtc;
HeapTuple tuple;
Form_pg_class classForm;
tableOid = RangeVarGetRelid(stmt->relation, false);
if (!check_cluster_ownership(tableOid))
elog(ERROR, "CLUSTER: You do not own relation %s",
stmt->relation->relname);
tuple = SearchSysCache(RELOID,
ObjectIdGetDatum(tableOid),
0, 0, 0);
if (!HeapTupleIsValid(tuple))
elog(ERROR, "Cache lookup failed for relation %u", tableOid);
classForm = (Form_pg_class) GETSTRUCT(tuple);
if (stmt->indexname == NULL)
{
List *index;
Relation rel = RelationIdGetRelation(tableOid);
HeapTuple ituple = NULL,
idxtuple = NULL;
/* We need to fetch the index that has indisclustered set. */
foreach (index, RelationGetIndexList(rel))
{
Form_pg_index indexForm;
indexOid = lfirsti(index);
ituple = SearchSysCache(RELOID,
ObjectIdGetDatum(indexOid),
0, 0, 0);
if (!HeapTupleIsValid(ituple))
elog(ERROR, "Cache lookup failed for relation %u", indexOid);
idxtuple = SearchSysCache(INDEXRELID,
ObjectIdGetDatum(HeapTupleGetOid(ituple)),
0, 0, 0);
if (!HeapTupleIsValid(idxtuple))
elog(ERROR, "Cache lookup failed for index %u", HeapTupleGetOid(ituple));
indexForm = (Form_pg_index) GETSTRUCT(idxtuple);
if (indexForm->indisclustered)
break;
indexOid = InvalidOid;
}
if (indexOid == InvalidOid)
elog(ERROR, "CLUSTER: No previously clustered index found on table %s",
stmt->relation->relname);
RelationClose(rel);
ReleaseSysCache(ituple);
ReleaseSysCache(idxtuple);
}
else
{
/* The index is expected to be in the same namespace as the relation. */
indexOid = get_relname_relid(stmt->indexname, classForm->relnamespace);
}
ReleaseSysCache(tuple);
/* XXX Maybe the namespace should be reported as well */
if (!OidIsValid(indexOid))
elog(ERROR, "CLUSTER: cannot find index \"%s\" for table \"%s\"",
stmt->indexname, stmt->relation->relname);
rvtc.tableOid = tableOid;
rvtc.indexOid = indexOid;
rvtc.isPrevious = false;
/* Do the job */
cluster_rel(&rvtc);
}
else
{
/*
* This is the "no relation" case. We need to cluster all tables
* that have some index with indisclustered set.
*/
relToCluster *rvtc;
List *rv,
*rvs;
/*
* We cannot run CLUSTER ALL inside a user transaction block; if we were inside
* a transaction, then our commit- and start-transaction-command calls
* would not have the intended effect!
*/
if (IsTransactionBlock())
elog(ERROR, "CLUSTER cannot run inside a BEGIN/END block");
/* Running CLUSTER from a function would free the function context */
if (!MemoryContextContains(QueryContext, stmt))
elog(ERROR, "CLUSTER cannot be called from a function");
/*
* Create special memory context for cross-transaction storage.
*
* Since it is a child of QueryContext, it will go away even in case
* of error.
*/
cluster_context = AllocSetContextCreate(QueryContext,
"Cluster",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/*
* Build the list of relations to cluster. Note that this lives in
* cluster_context.
*/
rvs = get_tables_to_cluster(GetUserId());
/* Ok, now that we've got them all, cluster them one by one */
foreach (rv, rvs)
{
rvtc = (relToCluster *)lfirst(rv);
/* Start a new transaction for this relation. */
StartTransactionCommand(true);
cluster_rel(rvtc);
CommitTransactionCommand(true);
}
}
/* Start a new transaction for the cleanup work. */
StartTransactionCommand(true);
/* Clean up working storage */
if (stmt->relation == NULL)
{
MemoryContextDelete(cluster_context);
cluster_context = NULL;
}
}
/* Checks if the user owns the relation. Superusers
* are allowed to cluster any table.
*/
bool
check_cluster_ownership(Oid relOid)
{
/* Superusers bypass this check */
return pg_class_ownercheck(relOid, GetUserId());
}
/* Get a list of tables that the current user owns and
* have indisclustered set. Return the list in a List * of rvsToCluster
* with the tableOid and the indexOid on which the table is already
* clustered.
*/
List *
get_tables_to_cluster(Oid owner)
{
Relation indRelation;
HeapScanDesc scan;
ScanKeyData entry;
HeapTuple indexTuple;
Form_pg_index index;
relToCluster *rvtc;
List *rvs = NIL;
/*
* Get all indexes that have indisclustered set. System
* relations or nailed-in relations cannot ever have
* indisclustered set, because CLUSTER will refuse to
* set it when called with one of them as argument.
*/
indRelation = relation_openr(IndexRelationName, RowExclusiveLock);
ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indisclustered,
F_BOOLEQ, true);
scan = heap_beginscan(indRelation, SnapshotNow, 1, &entry);
while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
MemoryContext old_context = NULL;
index = (Form_pg_index) GETSTRUCT(indexTuple);
if (!check_cluster_ownership(index->indrelid))
continue;
/*
* We have to build the struct in a different memory context so
* it will survive the cross-transaction processing
*/
old_context = MemoryContextSwitchTo(cluster_context);
rvtc = (relToCluster *)palloc(sizeof(relToCluster));
rvtc->indexOid = index->indexrelid;
rvtc->tableOid = index->indrelid;
rvtc->isPrevious = true;
rvs = lcons((void *)rvtc, rvs);
MemoryContextSwitchTo(old_context);
}
heap_endscan(scan);
/*
* Release the lock on pg_index. We will check the indexes
* later again.
*
*/
relation_close(indRelation, RowExclusiveLock);
return rvs;
}