diff --git a/doc/src/sgml/ref/cluster.sgml b/doc/src/sgml/ref/cluster.sgml
index e7aaa522811..a49d4d7d396 100644
--- a/doc/src/sgml/ref/cluster.sgml
+++ b/doc/src/sgml/ref/cluster.sgml
@@ -1,5 +1,5 @@
@@ -22,6 +22,8 @@ PostgreSQL documentation
CLUSTER indexname ON tablename
+CLUSTER tablename
+CLUSTER ALL
@@ -104,6 +106,20 @@ CLUSTER
periodically re-cluster by issuing the command again.
+
+ When a table is clustered, PostgreSQL
+ remembers on which index it was clustered. In calls to
+ CLUSTER tablename,
+ the table is clustered on the same index that it was clustered before.
+
+
+
+ In calls to CLUSTER ALL, all the tables in the database
+ that the calling user owns are clustered using the saved information. This
+ form of CLUSTER cannot be called from inside a
+ transaction or function.
+
+
1998-09-08
@@ -141,8 +157,15 @@ CLUSTER
- CLUSTER preserves GRANT, inheritance, index, foreign key, and other
- ancillary information about the table.
+ CLUSTER preserves GRANT, inheritance, index, foreign
+ key, and other ancillary information about the table.
+
+
+
+ Because CLUSTER remembers the clustering information,
+ one can cluster the tables one wants clustered manually the first time, and
+ setup a timed event similar to VACUUM so that the tables
+ are periodically and automatically clustered.
@@ -192,6 +215,18 @@ SELECT columnlist INTO TABLE
CLUSTER emp_ind ON emp;
+
+ Cluster the employees relation using the same index that was used before:
+
+
+CLUSTER emp;
+
+
+ Cluster all the tables on the database that have previously been clustered:
+
+
+CLUSTER ALL;
+
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 48cc81ea841..db78a179b69 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -11,7 +11,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.93 2002/11/11 22:19:21 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.94 2002/11/15 03:09:35 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@@ -25,9 +25,11 @@
#include "catalog/index.h"
#include "catalog/indexing.h"
#include "catalog/catname.h"
+#include "catalog/namespace.h"
#include "commands/cluster.h"
#include "commands/tablecmds.h"
#include "miscadmin.h"
+#include "utils/acl.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
@@ -48,12 +50,27 @@ typedef struct
bool isclustered;
} IndexAttrs;
+/* This struct is used to pass around the information on tables to be
+ * clustered. We need this so we can make a list of them when invoked without
+ * a specific table/index pair.
+ */
+typedef struct
+{
+ Oid tableOid;
+ Oid indexOid;
+ bool isPrevious;
+} relToCluster;
+
static Oid make_new_heap(Oid OIDOldHeap, const char *NewName);
static void copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex);
static List *get_indexattr_list(Relation OldHeap, Oid OldIndex);
static void recreate_indexattr(Oid OIDOldHeap, List *indexes);
static void swap_relfilenodes(Oid r1, Oid r2);
+static void cluster_rel(relToCluster *rv);
+static bool check_cluster_ownership(Oid relOid);
+static List *get_tables_to_cluster(Oid owner);
+static MemoryContext cluster_context = NULL;
/*
* cluster
@@ -69,43 +86,70 @@ static void swap_relfilenodes(Oid r1, Oid r2);
* the new table, it's better to create the indexes afterwards than to fill
* them incrementally while we load the table.
*
- * Permissions checks were done already.
+ * Since we may open a new transaction for each relation, we have to
+ * check that the relation still is what we think it is.
*/
void
-cluster(RangeVar *oldrelation, char *oldindexname)
+cluster_rel(relToCluster *rvtc)
{
- Oid OIDOldHeap,
- OIDOldIndex,
- OIDNewHeap;
+ Oid OIDNewHeap;
Relation OldHeap,
OldIndex;
char NewHeapName[NAMEDATALEN];
ObjectAddress object;
List *indexes;
+ /* Check for user-requested abort. */
+ CHECK_FOR_INTERRUPTS();
+
+ /* Check if the relation and index still exist before opening them
+ */
+ if (!SearchSysCacheExists(RELOID,
+ ObjectIdGetDatum(rvtc->tableOid),
+ 0, 0, 0) ||
+ !SearchSysCacheExists(RELOID,
+ ObjectIdGetDatum(rvtc->indexOid),
+ 0, 0, 0))
+ return;
+
+ /* Check that the user still owns the relation */
+ if (!check_cluster_ownership(rvtc->tableOid))
+ return;
+
+ /* Check that the index is still the one with indisclustered set.
+ * If this is a standalone cluster, skip this test.
+ */
+ if (rvtc->isPrevious)
+ {
+ HeapTuple tuple;
+ Form_pg_index indexForm;
+
+ tuple = SearchSysCache(INDEXRELID,
+ ObjectIdGetDatum(rvtc->indexOid),
+ 0, 0, 0);
+ indexForm = (Form_pg_index) GETSTRUCT(tuple);
+ if (!indexForm->indisclustered)
+ {
+ ReleaseSysCache(tuple);
+ return;
+ }
+ ReleaseSysCache(tuple);
+ }
+
/*
* We grab exclusive access to the target rel and index for the
* duration of the transaction.
*/
- OldHeap = heap_openrv(oldrelation, AccessExclusiveLock);
- OIDOldHeap = RelationGetRelid(OldHeap);
+ OldHeap = heap_open(rvtc->tableOid, AccessExclusiveLock);
- /*
- * The index is expected to be in the same namespace as the relation.
- */
- OIDOldIndex = get_relname_relid(oldindexname,
- RelationGetNamespace(OldHeap));
- if (!OidIsValid(OIDOldIndex))
- elog(ERROR, "CLUSTER: cannot find index \"%s\" for table \"%s\"",
- oldindexname, RelationGetRelationName(OldHeap));
- OldIndex = index_open(OIDOldIndex);
+ OldIndex = index_open(rvtc->indexOid);
LockRelation(OldIndex, AccessExclusiveLock);
/*
* Check that index is in fact an index on the given relation
*/
if (OldIndex->rd_index == NULL ||
- OldIndex->rd_index->indrelid != OIDOldHeap)
+ OldIndex->rd_index->indrelid != rvtc->tableOid)
elog(ERROR, "CLUSTER: \"%s\" is not an index for table \"%s\"",
RelationGetRelationName(OldIndex),
RelationGetRelationName(OldHeap));
@@ -122,7 +166,7 @@ cluster(RangeVar *oldrelation, char *oldindexname)
RelationGetRelationName(OldHeap));
/* Save the information of all indexes on the relation. */
- indexes = get_indexattr_list(OldHeap, OIDOldIndex);
+ indexes = get_indexattr_list(OldHeap, rvtc->indexOid);
/* Drop relcache refcnts, but do NOT give up the locks */
index_close(OldIndex);
@@ -136,9 +180,9 @@ cluster(RangeVar *oldrelation, char *oldindexname)
* namespace from the old, or we will have problems with the TEMP
* status of temp tables.
*/
- snprintf(NewHeapName, NAMEDATALEN, "pg_temp_%u", OIDOldHeap);
+ snprintf(NewHeapName, NAMEDATALEN, "pg_temp_%u", rvtc->tableOid);
- OIDNewHeap = make_new_heap(OIDOldHeap, NewHeapName);
+ OIDNewHeap = make_new_heap(rvtc->tableOid, NewHeapName);
/*
* We don't need CommandCounterIncrement() because make_new_heap did
@@ -148,13 +192,13 @@ cluster(RangeVar *oldrelation, char *oldindexname)
/*
* Copy the heap data into the new table in the desired order.
*/
- copy_heap_data(OIDNewHeap, OIDOldHeap, OIDOldIndex);
+ copy_heap_data(OIDNewHeap, rvtc->tableOid, rvtc->indexOid);
/* To make the new heap's data visible (probably not needed?). */
CommandCounterIncrement();
/* Swap the relfilenodes of the old and new heaps. */
- swap_relfilenodes(OIDOldHeap, OIDNewHeap);
+ swap_relfilenodes(rvtc->tableOid, OIDNewHeap);
CommandCounterIncrement();
@@ -175,7 +219,7 @@ cluster(RangeVar *oldrelation, char *oldindexname)
* Recreate each index on the relation. We do not need
* CommandCounterIncrement() because recreate_indexattr does it.
*/
- recreate_indexattr(OIDOldHeap, indexes);
+ recreate_indexattr(rvtc->tableOid, indexes);
}
/*
@@ -571,3 +615,236 @@ swap_relfilenodes(Oid r1, Oid r2)
heap_close(relRelation, RowExclusiveLock);
}
+
+/*---------------------------------------------------------------------------
+ * This cluster code allows for clustering multiple tables at once. Because
+ * of this, we cannot just run everything on a single transaction, or we
+ * would be forced to acquire exclusive locks on all the tables being
+ * clustered. To solve this we follow a similar strategy to VACUUM code,
+ * clustering each relation in a separate transaction. For this to work,
+ * we need to:
+ * - provide a separate memory context so that we can pass information in
+ * a way that trascends transactions
+ * - start a new transaction every time a new relation is clustered
+ * - check for validity of the information on to-be-clustered relations,
+ * as someone might have deleted a relation behind our back, or
+ * clustered one on a different index
+ * - end the transaction
+ *
+ * The single relation code does not have any overhead.
+ *
+ * We also allow a relation being specified without index. In that case,
+ * the indisclustered bit will be looked up, and an ERROR will be thrown
+ * if there is no index with the bit set.
+ *---------------------------------------------------------------------------
+ */
+void
+cluster(ClusterStmt *stmt)
+{
+
+ /* This is the single relation case. */
+ if (stmt->relation != NULL)
+ {
+ Oid indexOid = InvalidOid,
+ tableOid;
+ relToCluster rvtc;
+ HeapTuple tuple;
+ Form_pg_class classForm;
+
+ tableOid = RangeVarGetRelid(stmt->relation, false);
+ if (!check_cluster_ownership(tableOid))
+ elog(ERROR, "CLUSTER: You do not own relation %s",
+ stmt->relation->relname);
+
+ tuple = SearchSysCache(RELOID,
+ ObjectIdGetDatum(tableOid),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "Cache lookup failed for relation %u", tableOid);
+ classForm = (Form_pg_class) GETSTRUCT(tuple);
+
+ if (stmt->indexname == NULL)
+ {
+ List *index;
+ Relation rel = RelationIdGetRelation(tableOid);
+ HeapTuple ituple = NULL,
+ idxtuple = NULL;
+
+ /* We need to fetch the index that has indisclustered set. */
+ foreach (index, RelationGetIndexList(rel))
+ {
+ Form_pg_index indexForm;
+
+ indexOid = lfirsti(index);
+ ituple = SearchSysCache(RELOID,
+ ObjectIdGetDatum(indexOid),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(ituple))
+ elog(ERROR, "Cache lookup failed for relation %u", indexOid);
+ idxtuple = SearchSysCache(INDEXRELID,
+ ObjectIdGetDatum(HeapTupleGetOid(ituple)),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(idxtuple))
+ elog(ERROR, "Cache lookup failed for index %u", HeapTupleGetOid(ituple));
+ indexForm = (Form_pg_index) GETSTRUCT(idxtuple);
+ if (indexForm->indisclustered)
+ break;
+ indexOid = InvalidOid;
+ }
+ if (indexOid == InvalidOid)
+ elog(ERROR, "CLUSTER: No previously clustered index found on table %s",
+ stmt->relation->relname);
+ RelationClose(rel);
+ ReleaseSysCache(ituple);
+ ReleaseSysCache(idxtuple);
+ }
+ else
+ {
+ /* The index is expected to be in the same namespace as the relation. */
+ indexOid = get_relname_relid(stmt->indexname, classForm->relnamespace);
+ }
+ ReleaseSysCache(tuple);
+
+ /* XXX Maybe the namespace should be reported as well */
+ if (!OidIsValid(indexOid))
+ elog(ERROR, "CLUSTER: cannot find index \"%s\" for table \"%s\"",
+ stmt->indexname, stmt->relation->relname);
+ rvtc.tableOid = tableOid;
+ rvtc.indexOid = indexOid;
+ rvtc.isPrevious = false;
+
+ /* Do the job */
+ cluster_rel(&rvtc);
+ }
+ else
+ {
+ /*
+ * This is the "no relation" case. We need to cluster all tables
+ * that have some index with indisclustered set.
+ */
+
+ relToCluster *rvtc;
+ List *rv,
+ *rvs;
+
+ /*
+ * We cannot run CLUSTER ALL inside a user transaction block; if we were inside
+ * a transaction, then our commit- and start-transaction-command calls
+ * would not have the intended effect!
+ */
+ if (IsTransactionBlock())
+ elog(ERROR, "CLUSTER cannot run inside a BEGIN/END block");
+
+ /* Running CLUSTER from a function would free the function context */
+ if (!MemoryContextContains(QueryContext, stmt))
+ elog(ERROR, "CLUSTER cannot be called from a function");
+ /*
+ * Create special memory context for cross-transaction storage.
+ *
+ * Since it is a child of QueryContext, it will go away even in case
+ * of error.
+ */
+ cluster_context = AllocSetContextCreate(QueryContext,
+ "Cluster",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ /*
+ * Build the list of relations to cluster. Note that this lives in
+ * cluster_context.
+ */
+ rvs = get_tables_to_cluster(GetUserId());
+
+ /* Ok, now that we've got them all, cluster them one by one */
+ foreach (rv, rvs)
+ {
+ rvtc = (relToCluster *)lfirst(rv);
+
+ /* Start a new transaction for this relation. */
+ StartTransactionCommand(true);
+ cluster_rel(rvtc);
+ CommitTransactionCommand(true);
+ }
+ }
+
+ /* Start a new transaction for the cleanup work. */
+ StartTransactionCommand(true);
+
+ /* Clean up working storage */
+ if (stmt->relation == NULL)
+ {
+ MemoryContextDelete(cluster_context);
+ cluster_context = NULL;
+ }
+}
+
+/* Checks if the user owns the relation. Superusers
+ * are allowed to cluster any table.
+ */
+bool
+check_cluster_ownership(Oid relOid)
+{
+ /* Superusers bypass this check */
+ return pg_class_ownercheck(relOid, GetUserId());
+}
+
+/* Get a list of tables that the current user owns and
+ * have indisclustered set. Return the list in a List * of rvsToCluster
+ * with the tableOid and the indexOid on which the table is already
+ * clustered.
+ */
+List *
+get_tables_to_cluster(Oid owner)
+{
+ Relation indRelation;
+ HeapScanDesc scan;
+ ScanKeyData entry;
+ HeapTuple indexTuple;
+ Form_pg_index index;
+ relToCluster *rvtc;
+ List *rvs = NIL;
+
+ /*
+ * Get all indexes that have indisclustered set. System
+ * relations or nailed-in relations cannot ever have
+ * indisclustered set, because CLUSTER will refuse to
+ * set it when called with one of them as argument.
+ */
+ indRelation = relation_openr(IndexRelationName, RowExclusiveLock);
+ ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indisclustered,
+ F_BOOLEQ, true);
+ scan = heap_beginscan(indRelation, SnapshotNow, 1, &entry);
+ while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ MemoryContext old_context = NULL;
+
+ index = (Form_pg_index) GETSTRUCT(indexTuple);
+ if (!check_cluster_ownership(index->indrelid))
+ continue;
+
+ /*
+ * We have to build the struct in a different memory context so
+ * it will survive the cross-transaction processing
+ */
+
+ old_context = MemoryContextSwitchTo(cluster_context);
+
+ rvtc = (relToCluster *)palloc(sizeof(relToCluster));
+ rvtc->indexOid = index->indexrelid;
+ rvtc->tableOid = index->indrelid;
+ rvtc->isPrevious = true;
+ rvs = lcons((void *)rvtc, rvs);
+
+ MemoryContextSwitchTo(old_context);
+ }
+ heap_endscan(scan);
+
+ /*
+ * Release the lock on pg_index. We will check the indexes
+ * later again.
+ *
+ */
+ relation_close(indRelation, RowExclusiveLock);
+ return rvs;
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index b3ca71013c9..2eef0c04904 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -11,7 +11,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/parser/gram.y,v 2.378 2002/11/15 02:50:08 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/parser/gram.y,v 2.379 2002/11/15 03:09:35 momjian Exp $
*
* HISTORY
* AUTHOR DATE MAJOR EVENT
@@ -3761,6 +3761,8 @@ CreateConversionStmt:
*
* QUERY:
* cluster on
+ * cluster
+ * cluster ALL
*
*****************************************************************************/
@@ -3772,6 +3774,20 @@ ClusterStmt:
n->indexname = $2;
$$ = (Node*)n;
}
+ | CLUSTER qualified_name
+ {
+ ClusterStmt *n = makeNode(ClusterStmt);
+ n->relation = $2;
+ n->indexname = NULL;
+ $$ = (Node*)n;
+ }
+ | CLUSTER ALL
+ {
+ ClusterStmt *n = makeNode(ClusterStmt);
+ n->relation = NULL;
+ n->indexname = NULL;
+ $$ = (Node*)n;
+ }
;
/*****************************************************************************
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index ddd5c7626a6..493dbd7587a 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -10,7 +10,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/tcop/utility.c,v 1.181 2002/11/13 00:44:09 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/tcop/utility.c,v 1.182 2002/11/15 03:09:38 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@@ -186,7 +186,6 @@ ProcessUtility(Node *parsetree,
CommandDest dest,
char *completionTag)
{
- char *relname;
if (completionTag)
completionTag[0] = '\0';
@@ -702,9 +701,7 @@ ProcessUtility(Node *parsetree,
{
ClusterStmt *stmt = (ClusterStmt *) parsetree;
- CheckOwnership(stmt->relation, true);
-
- cluster(stmt->relation, stmt->indexname);
+ cluster(stmt);
}
break;
@@ -833,8 +830,8 @@ ProcessUtility(Node *parsetree,
switch (stmt->reindexType)
{
+ char *relname;
case INDEX:
- relname = (char *) stmt->relation->relname;
CheckOwnership(stmt->relation, false);
ReindexIndex(stmt->relation, stmt->force);
break;
diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h
index ff3a8d82d39..2490278b6ca 100644
--- a/src/include/commands/cluster.h
+++ b/src/include/commands/cluster.h
@@ -6,16 +6,17 @@
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
* Portions Copyright (c) 1994-5, Regents of the University of California
*
- * $Id: cluster.h,v 1.15 2002/08/10 21:00:34 momjian Exp $
+ * $Id: cluster.h,v 1.16 2002/11/15 03:09:39 momjian Exp $
*
*-------------------------------------------------------------------------
*/
#ifndef CLUSTER_H
#define CLUSTER_H
+#include
/*
* functions
*/
-extern void cluster(RangeVar *oldrelation, char *oldindexname);
+extern void cluster(ClusterStmt *stmt);
#endif /* CLUSTER_H */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 1198a81de5e..cd976cd1a14 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: parsenodes.h,v 1.214 2002/11/15 02:50:12 momjian Exp $
+ * $Id: parsenodes.h,v 1.215 2002/11/15 03:09:39 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@@ -1533,7 +1533,7 @@ typedef struct DropdbStmt
typedef struct ClusterStmt
{
NodeTag type;
- RangeVar *relation; /* relation being indexed */
+ RangeVar *relation; /* relation being indexed, or NULL if all */
char *indexname; /* original index defined */
} ClusterStmt;
diff --git a/src/test/regress/expected/cluster.out b/src/test/regress/expected/cluster.out
index 6a2ba61e832..ee39c3a8482 100644
--- a/src/test/regress/expected/cluster.out
+++ b/src/test/regress/expected/cluster.out
@@ -285,3 +285,67 @@ WHERE pg_class.oid=indexrelid
clstr_tst_c
(1 row)
+-- Verify that clustering all tables does in fact cluster the right ones
+CREATE USER clstr_user;
+CREATE TABLE clstr_1 (a INT PRIMARY KEY);
+NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index 'clstr_1_pkey' for table 'clstr_1'
+CREATE TABLE clstr_2 (a INT PRIMARY KEY);
+NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index 'clstr_2_pkey' for table 'clstr_2'
+CREATE TABLE clstr_3 (a INT PRIMARY KEY);
+NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index 'clstr_3_pkey' for table 'clstr_3'
+ALTER TABLE clstr_1 OWNER TO clstr_user;
+ALTER TABLE clstr_3 OWNER TO clstr_user;
+GRANT SELECT ON clstr_2 TO clstr_user;
+INSERT INTO clstr_1 VALUES (2);
+INSERT INTO clstr_1 VALUES (1);
+INSERT INTO clstr_2 VALUES (2);
+INSERT INTO clstr_2 VALUES (1);
+INSERT INTO clstr_3 VALUES (2);
+INSERT INTO clstr_3 VALUES (1);
+CLUSTER clstr_1_pkey ON clstr_1;
+CLUSTER clstr_2_pkey ON clstr_2;
+SELECT * FROM clstr_1 UNION ALL
+ SELECT * FROM clstr_2 UNION ALL
+ SELECT * FROM clstr_3;
+ a
+---
+ 1
+ 2
+ 1
+ 2
+ 2
+ 1
+(6 rows)
+
+-- revert to the original state
+DELETE FROM clstr_1;
+DELETE FROM clstr_2;
+DELETE FROM clstr_3;
+INSERT INTO clstr_1 VALUES (2);
+INSERT INTO clstr_1 VALUES (1);
+INSERT INTO clstr_2 VALUES (2);
+INSERT INTO clstr_2 VALUES (1);
+INSERT INTO clstr_3 VALUES (2);
+INSERT INTO clstr_3 VALUES (1);
+-- this user can only cluster clstr_1 and clstr_3, but the latter
+-- has not been clustered
+SET SESSION AUTHORIZATION clstr_user;
+CLUSTER ALL;
+SELECT * FROM clstr_1 UNION ALL
+ SELECT * FROM clstr_2 UNION ALL
+ SELECT * FROM clstr_3;
+ a
+---
+ 1
+ 2
+ 2
+ 1
+ 2
+ 1
+(6 rows)
+
+-- clean up
+\c -
+DROP TABLE clstr_1;
+DROP TABLE clstr_3;
+DROP USER clstr_user;
diff --git a/src/test/regress/sql/cluster.sql b/src/test/regress/sql/cluster.sql
index 384a185d09e..dc05f9f3635 100644
--- a/src/test/regress/sql/cluster.sql
+++ b/src/test/regress/sql/cluster.sql
@@ -86,3 +86,59 @@ WHERE pg_class.oid=indexrelid
AND indrelid=pg_class_2.oid
AND pg_class_2.relname = 'clstr_tst'
AND indisclustered;
+
+-- Verify that clustering all tables does in fact cluster the right ones
+CREATE USER clstr_user;
+CREATE TABLE clstr_1 (a INT PRIMARY KEY);
+CREATE TABLE clstr_2 (a INT PRIMARY KEY);
+CREATE TABLE clstr_3 (a INT PRIMARY KEY);
+ALTER TABLE clstr_1 OWNER TO clstr_user;
+ALTER TABLE clstr_3 OWNER TO clstr_user;
+GRANT SELECT ON clstr_2 TO clstr_user;
+INSERT INTO clstr_1 VALUES (2);
+INSERT INTO clstr_1 VALUES (1);
+INSERT INTO clstr_2 VALUES (2);
+INSERT INTO clstr_2 VALUES (1);
+INSERT INTO clstr_3 VALUES (2);
+INSERT INTO clstr_3 VALUES (1);
+
+-- "CLUSTER " on a table that hasn't been clustered
+CLUSTER clstr_2;
+
+CLUSTER clstr_1_pkey ON clstr_1;
+CLUSTER clstr_2_pkey ON clstr_2;
+SELECT * FROM clstr_1 UNION ALL
+ SELECT * FROM clstr_2 UNION ALL
+ SELECT * FROM clstr_3;
+
+-- revert to the original state
+DELETE FROM clstr_1;
+DELETE FROM clstr_2;
+DELETE FROM clstr_3;
+INSERT INTO clstr_1 VALUES (2);
+INSERT INTO clstr_1 VALUES (1);
+INSERT INTO clstr_2 VALUES (2);
+INSERT INTO clstr_2 VALUES (1);
+INSERT INTO clstr_3 VALUES (2);
+INSERT INTO clstr_3 VALUES (1);
+
+-- this user can only cluster clstr_1 and clstr_3, but the latter
+-- has not been clustered
+SET SESSION AUTHORIZATION clstr_user;
+CLUSTER ALL;
+SELECT * FROM clstr_1 UNION ALL
+ SELECT * FROM clstr_2 UNION ALL
+ SELECT * FROM clstr_3;
+
+-- cluster a single table using the indisclustered bit previously set
+DELETE FROM clstr_1;
+INSERT INTO clstr_1 VALUES (2);
+INSERT INTO clstr_1 VALUES (1);
+CLUSTER clstr_1;
+SELECT * FROM clstr_1;
+
+-- clean up
+\c -
+DROP TABLE clstr_1;
+DROP TABLE clstr_3;
+DROP USER clstr_user;