1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-06 07:49:08 +03:00

Local partitioned indexes

When CREATE INDEX is run on a partitioned table, create catalog entries
for an index on the partitioned table (which is just a placeholder since
the table proper has no data of its own), and recurse to create actual
indexes on the existing partitions; create them in future partitions
also.

As a convenience gadget, if the new index definition matches some
existing index in partitions, these are picked up and used instead of
creating new ones.  Whichever way these indexes come about, they become
attached to the index on the parent table and are dropped alongside it,
and cannot be dropped on isolation unless they are detached first.

To support pg_dump'ing these indexes, add commands
    CREATE INDEX ON ONLY <table>
(which creates the index on the parent partitioned table, without
recursing) and
    ALTER INDEX ATTACH PARTITION
(which is used after the indexes have been created individually on each
partition, to attach them to the parent index).  These reconstruct prior
database state exactly.

Reviewed-by: (in alphabetical order) Peter Eisentraut, Robert Haas, Amit
	Langote, Jesper Pedersen, Simon Riggs, David Rowley
Discussion: https://postgr.es/m/20171113170646.gzweigyrgg6pwsg4@alvherre.pgsql
This commit is contained in:
Alvaro Herrera
2018-01-19 11:49:22 -03:00
parent 1ef61ddce9
commit 8b08f7d482
49 changed files with 3172 additions and 182 deletions

View File

@@ -23,7 +23,10 @@
#include "catalog/catalog.h"
#include "catalog/index.h"
#include "catalog/indexing.h"
#include "catalog/partition.h"
#include "catalog/pg_am.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_inherits_fn.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_opfamily.h"
#include "catalog/pg_tablespace.h"
@@ -35,6 +38,7 @@
#include "commands/tablespace.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/clauses.h"
#include "optimizer/planner.h"
@@ -42,6 +46,7 @@
#include "parser/parse_coerce.h"
#include "parser/parse_func.h"
#include "parser/parse_oper.h"
#include "rewrite/rewriteManip.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
@@ -77,6 +82,7 @@ static char *ChooseIndexNameAddition(List *colnames);
static List *ChooseIndexColumnNames(List *indexElems);
static void RangeVarCallbackForReindexIndex(const RangeVar *relation,
Oid relId, Oid oldRelId, void *arg);
static void ReindexPartitionedIndex(Relation parentIdx);
/*
* CheckIndexCompatible
@@ -183,6 +189,7 @@ CheckIndexCompatible(Oid oldId,
indexInfo->ii_ExclusionOps = NULL;
indexInfo->ii_ExclusionProcs = NULL;
indexInfo->ii_ExclusionStrats = NULL;
indexInfo->ii_Am = accessMethodId;
indexInfo->ii_AmCache = NULL;
indexInfo->ii_Context = CurrentMemoryContext;
typeObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
@@ -292,14 +299,15 @@ CheckIndexCompatible(Oid oldId,
* 'stmt': IndexStmt describing the properties of the new index.
* 'indexRelationId': normally InvalidOid, but during bootstrap can be
* nonzero to specify a preselected OID for the index.
* 'parentIndexId': the OID of the parent index; InvalidOid if not the child
* of a partitioned index.
* 'is_alter_table': this is due to an ALTER rather than a CREATE operation.
* 'check_rights': check for CREATE rights in namespace and tablespace. (This
* should be true except when ALTER is deleting/recreating an index.)
* 'check_not_in_use': check for table not already in use in current session.
* This should be true unless caller is holding the table open, in which
* case the caller had better have checked it earlier.
* 'skip_build': make the catalog entries but leave the index file empty;
* it will be filled later.
* 'skip_build': make the catalog entries but don't create the index files
* 'quiet': suppress the NOTICE chatter ordinarily provided for constraints.
*
* Returns the object address of the created index.
@@ -308,6 +316,7 @@ ObjectAddress
DefineIndex(Oid relationId,
IndexStmt *stmt,
Oid indexRelationId,
Oid parentIndexId,
bool is_alter_table,
bool check_rights,
bool check_not_in_use,
@@ -330,6 +339,7 @@ DefineIndex(Oid relationId,
IndexAmRoutine *amRoutine;
bool amcanorder;
amoptions_function amoptions;
bool partitioned;
Datum reloptions;
int16 *coloptions;
IndexInfo *indexInfo;
@@ -382,23 +392,56 @@ DefineIndex(Oid relationId,
{
case RELKIND_RELATION:
case RELKIND_MATVIEW:
case RELKIND_PARTITIONED_TABLE:
/* OK */
break;
case RELKIND_FOREIGN_TABLE:
/*
* Custom error message for FOREIGN TABLE since the term is close
* to a regular table and can confuse the user.
*/
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot create index on foreign table \"%s\"",
RelationGetRelationName(rel))));
case RELKIND_PARTITIONED_TABLE:
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot create index on partitioned table \"%s\"",
RelationGetRelationName(rel))));
default:
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table or materialized view",
RelationGetRelationName(rel))));
break;
}
/*
* Establish behavior for partitioned tables, and verify sanity of
* parameters.
*
* We do not build an actual index in this case; we only create a few
* catalog entries. The actual indexes are built by recursing for each
* partition.
*/
partitioned = rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE;
if (partitioned)
{
if (stmt->concurrent)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create index on partitioned table \"%s\" concurrently",
RelationGetRelationName(rel))));
if (stmt->unique)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create unique index on partitioned table \"%s\"",
RelationGetRelationName(rel))));
if (stmt->excludeOpNames)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create exclusion constraints on partitioned table \"%s\"",
RelationGetRelationName(rel))));
if (stmt->primary || stmt->isconstraint)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create constraints on partitioned tables")));
}
/*
@@ -574,6 +617,7 @@ DefineIndex(Oid relationId,
indexInfo->ii_ReadyForInserts = !stmt->concurrent;
indexInfo->ii_Concurrent = stmt->concurrent;
indexInfo->ii_BrokenHotChain = false;
indexInfo->ii_Am = accessMethodId;
indexInfo->ii_AmCache = NULL;
indexInfo->ii_Context = CurrentMemoryContext;
@@ -665,19 +709,24 @@ DefineIndex(Oid relationId,
/*
* Make the catalog entries for the index, including constraints. This
* step also actually builds the index, except if caller requested not to
* or in concurrent mode, in which case it'll be done later.
* or in concurrent mode, in which case it'll be done later, or
* doing a partitioned index (because those don't have storage).
*/
flags = constr_flags = 0;
if (stmt->isconstraint)
flags |= INDEX_CREATE_ADD_CONSTRAINT;
if (skip_build || stmt->concurrent)
if (skip_build || stmt->concurrent || partitioned)
flags |= INDEX_CREATE_SKIP_BUILD;
if (stmt->if_not_exists)
flags |= INDEX_CREATE_IF_NOT_EXISTS;
if (stmt->concurrent)
flags |= INDEX_CREATE_CONCURRENT;
if (partitioned)
flags |= INDEX_CREATE_PARTITIONED;
if (stmt->primary)
flags |= INDEX_CREATE_IS_PRIMARY;
if (partitioned && stmt->relation && !stmt->relation->inh)
flags |= INDEX_CREATE_INVALID;
if (stmt->deferrable)
constr_flags |= INDEX_CONSTR_CREATE_DEFERRABLE;
@@ -685,8 +734,8 @@ DefineIndex(Oid relationId,
constr_flags |= INDEX_CONSTR_CREATE_INIT_DEFERRED;
indexRelationId =
index_create(rel, indexRelationName, indexRelationId, stmt->oldNode,
indexInfo, indexColNames,
index_create(rel, indexRelationName, indexRelationId, parentIndexId,
stmt->oldNode, indexInfo, indexColNames,
accessMethodId, tablespaceId,
collationObjectId, classObjectId,
coloptions, reloptions,
@@ -706,6 +755,160 @@ DefineIndex(Oid relationId,
CreateComments(indexRelationId, RelationRelationId, 0,
stmt->idxcomment);
if (partitioned)
{
/*
* Unless caller specified to skip this step (via ONLY), process
* each partition to make sure they all contain a corresponding index.
*
* If we're called internally (no stmt->relation), recurse always.
*/
if (!stmt->relation || stmt->relation->inh)
{
PartitionDesc partdesc = RelationGetPartitionDesc(rel);
int nparts = partdesc->nparts;
Oid *part_oids = palloc(sizeof(Oid) * nparts);
bool invalidate_parent = false;
TupleDesc parentDesc;
Oid *opfamOids;
memcpy(part_oids, partdesc->oids, sizeof(Oid) * nparts);
parentDesc = CreateTupleDescCopy(RelationGetDescr(rel));
opfamOids = palloc(sizeof(Oid) * numberOfAttributes);
for (i = 0; i < numberOfAttributes; i++)
opfamOids[i] = get_opclass_family(classObjectId[i]);
heap_close(rel, NoLock);
/*
* For each partition, scan all existing indexes; if one matches
* our index definition and is not already attached to some other
* parent index, attach it to the one we just created.
*
* If none matches, build a new index by calling ourselves
* recursively with the same options (except for the index name).
*/
for (i = 0; i < nparts; i++)
{
Oid childRelid = part_oids[i];
Relation childrel;
List *childidxs;
ListCell *cell;
AttrNumber *attmap;
bool found = false;
int maplen;
childrel = heap_open(childRelid, lockmode);
childidxs = RelationGetIndexList(childrel);
attmap =
convert_tuples_by_name_map(RelationGetDescr(childrel),
parentDesc,
gettext_noop("could not convert row type"));
maplen = parentDesc->natts;
foreach(cell, childidxs)
{
Oid cldidxid = lfirst_oid(cell);
Relation cldidx;
IndexInfo *cldIdxInfo;
/* this index is already partition of another one */
if (has_superclass(cldidxid))
continue;
cldidx = index_open(cldidxid, lockmode);
cldIdxInfo = BuildIndexInfo(cldidx);
if (CompareIndexInfo(cldIdxInfo, indexInfo,
cldidx->rd_indcollation,
collationObjectId,
cldidx->rd_opfamily,
opfamOids,
attmap, maplen))
{
/*
* Found a match. Attach index to parent and we're
* done, but keep lock till commit.
*/
IndexSetParentIndex(cldidx, indexRelationId);
if (!IndexIsValid(cldidx->rd_index))
invalidate_parent = true;
found = true;
index_close(cldidx, NoLock);
break;
}
index_close(cldidx, lockmode);
}
list_free(childidxs);
heap_close(childrel, NoLock);
/*
* If no matching index was found, create our own.
*/
if (!found)
{
IndexStmt *childStmt = copyObject(stmt);
bool found_whole_row;
childStmt->whereClause =
map_variable_attnos(stmt->whereClause, 1, 0,
attmap, maplen,
InvalidOid, &found_whole_row);
if (found_whole_row)
elog(ERROR, "cannot convert whole-row table reference");
childStmt->idxname = NULL;
childStmt->relationId = childRelid;
DefineIndex(childRelid, childStmt,
InvalidOid, /* no predefined OID */
indexRelationId, /* this is our child */
false, check_rights, check_not_in_use,
false, quiet);
}
pfree(attmap);
}
/*
* The pg_index row we inserted for this index was marked
* indisvalid=true. But if we attached an existing index that
* is invalid, this is incorrect, so update our row to
* invalid too.
*/
if (invalidate_parent)
{
Relation pg_index = heap_open(IndexRelationId, RowExclusiveLock);
HeapTuple tup,
newtup;
tup = SearchSysCache1(INDEXRELID,
ObjectIdGetDatum(indexRelationId));
if (!tup)
elog(ERROR, "cache lookup failed for index %u",
indexRelationId);
newtup = heap_copytuple(tup);
((Form_pg_index) GETSTRUCT(newtup))->indisvalid = false;
CatalogTupleUpdate(pg_index, &tup->t_self, newtup);
ReleaseSysCache(tup);
heap_close(pg_index, RowExclusiveLock);
heap_freetuple(newtup);
}
}
else
heap_close(rel, NoLock);
/*
* Indexes on partitioned tables are not themselves built, so we're
* done here.
*/
return address;
}
if (!stmt->concurrent)
{
/* Close the heap and we're done, in the non-concurrent case */
@@ -1765,7 +1968,7 @@ ChooseIndexColumnNames(List *indexElems)
* ReindexIndex
* Recreate a specific index.
*/
Oid
void
ReindexIndex(RangeVar *indexRelation, int options)
{
Oid indOid;
@@ -1788,12 +1991,17 @@ ReindexIndex(RangeVar *indexRelation, int options)
* lock on the index.
*/
irel = index_open(indOid, NoLock);
if (irel->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
{
ReindexPartitionedIndex(irel);
return;
}
persistence = irel->rd_rel->relpersistence;
index_close(irel, NoLock);
reindex_index(indOid, false, persistence, options);
return indOid;
}
/*
@@ -1832,7 +2040,8 @@ RangeVarCallbackForReindexIndex(const RangeVar *relation,
relkind = get_rel_relkind(relId);
if (!relkind)
return;
if (relkind != RELKIND_INDEX)
if (relkind != RELKIND_INDEX &&
relkind != RELKIND_PARTITIONED_INDEX)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not an index", relation->relname)));
@@ -1976,6 +2185,12 @@ ReindexMultipleTables(const char *objectName, ReindexObjectType objectKind,
/*
* Only regular tables and matviews can have indexes, so ignore any
* other kind of relation.
*
* It is tempting to also consider partitioned tables here, but that
* has the problem that if the children are in the same schema, they
* would be processed twice. Maybe we could have a separate list of
* partitioned tables, and expand that afterwards into relids,
* ignoring any duplicates.
*/
if (classtuple->relkind != RELKIND_RELATION &&
classtuple->relkind != RELKIND_MATVIEW)
@@ -2038,3 +2253,155 @@ ReindexMultipleTables(const char *objectName, ReindexObjectType objectKind,
MemoryContextDelete(private_context);
}
/*
* ReindexPartitionedIndex
* Reindex each child of the given partitioned index.
*
* Not yet implemented.
*/
static void
ReindexPartitionedIndex(Relation parentIdx)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("REINDEX is not yet implemented for partitioned indexes")));
}
/*
* Insert or delete an appropriate pg_inherits tuple to make the given index
* be a partition of the indicated parent index.
*
* This also corrects the pg_depend information for the affected index.
*/
void
IndexSetParentIndex(Relation partitionIdx, Oid parentOid)
{
Relation pg_inherits;
ScanKeyData key[2];
SysScanDesc scan;
Oid partRelid = RelationGetRelid(partitionIdx);
HeapTuple tuple;
bool fix_dependencies;
/* Make sure this is an index */
Assert(partitionIdx->rd_rel->relkind == RELKIND_INDEX ||
partitionIdx->rd_rel->relkind == RELKIND_PARTITIONED_INDEX);
/*
* Scan pg_inherits for rows linking our index to some parent.
*/
pg_inherits = relation_open(InheritsRelationId, RowExclusiveLock);
ScanKeyInit(&key[0],
Anum_pg_inherits_inhrelid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(partRelid));
ScanKeyInit(&key[1],
Anum_pg_inherits_inhseqno,
BTEqualStrategyNumber, F_INT4EQ,
Int32GetDatum(1));
scan = systable_beginscan(pg_inherits, InheritsRelidSeqnoIndexId, true,
NULL, 2, key);
tuple = systable_getnext(scan);
if (!HeapTupleIsValid(tuple))
{
if (parentOid == InvalidOid)
{
/*
* No pg_inherits row, and no parent wanted: nothing to do in
* this case.
*/
fix_dependencies = false;
}
else
{
Datum values[Natts_pg_inherits];
bool isnull[Natts_pg_inherits];
/*
* No pg_inherits row exists, and we want a parent for this index,
* so insert it.
*/
values[Anum_pg_inherits_inhrelid - 1] = ObjectIdGetDatum(partRelid);
values[Anum_pg_inherits_inhparent - 1] =
ObjectIdGetDatum(parentOid);
values[Anum_pg_inherits_inhseqno - 1] = Int32GetDatum(1);
memset(isnull, false, sizeof(isnull));
tuple = heap_form_tuple(RelationGetDescr(pg_inherits),
values, isnull);
CatalogTupleInsert(pg_inherits, tuple);
fix_dependencies = true;
}
}
else
{
Form_pg_inherits inhForm = (Form_pg_inherits) GETSTRUCT(tuple);
if (parentOid == InvalidOid)
{
/*
* There exists a pg_inherits row, which we want to clear; do so.
*/
CatalogTupleDelete(pg_inherits, &tuple->t_self);
fix_dependencies = true;
}
else
{
/*
* A pg_inherits row exists. If it's the same we want, then we're
* good; if it differs, that amounts to a corrupt catalog and
* should not happen.
*/
if (inhForm->inhparent != parentOid)
{
/* unexpected: we should not get called in this case */
elog(ERROR, "bogus pg_inherit row: inhrelid %u inhparent %u",
inhForm->inhrelid, inhForm->inhparent);
}
/* already in the right state */
fix_dependencies = false;
}
}
/* done with pg_inherits */
systable_endscan(scan);
relation_close(pg_inherits, RowExclusiveLock);
if (fix_dependencies)
{
ObjectAddress partIdx;
/*
* Insert/delete pg_depend rows. If setting a parent, add an
* INTERNAL_AUTO dependency to the parent index; if making standalone,
* remove all existing rows and put back the regular dependency on the
* table.
*/
ObjectAddressSet(partIdx, RelationRelationId, partRelid);
if (OidIsValid(parentOid))
{
ObjectAddress parentIdx;
ObjectAddressSet(parentIdx, RelationRelationId, parentOid);
recordDependencyOn(&partIdx, &parentIdx, DEPENDENCY_INTERNAL_AUTO);
}
else
{
ObjectAddress partitionTbl;
ObjectAddressSet(partitionTbl, RelationRelationId,
partitionIdx->rd_index->indrelid);
deleteDependencyRecordsForClass(RelationRelationId, partRelid,
RelationRelationId,
DEPENDENCY_INTERNAL_AUTO);
recordDependencyOn(&partIdx, &partitionTbl, DEPENDENCY_AUTO);
}
}
}

View File

@@ -266,6 +266,12 @@ static const struct dropmsgstrings dropmsgstringarray[] = {
gettext_noop("table \"%s\" does not exist, skipping"),
gettext_noop("\"%s\" is not a table"),
gettext_noop("Use DROP TABLE to remove a table.")},
{RELKIND_PARTITIONED_INDEX,
ERRCODE_UNDEFINED_OBJECT,
gettext_noop("index \"%s\" does not exist"),
gettext_noop("index \"%s\" does not exist, skipping"),
gettext_noop("\"%s\" is not an index"),
gettext_noop("Use DROP INDEX to remove an index.")},
{'\0', 0, NULL, NULL, NULL, NULL}
};
@@ -284,6 +290,7 @@ struct DropRelationCallbackState
#define ATT_INDEX 0x0008
#define ATT_COMPOSITE_TYPE 0x0010
#define ATT_FOREIGN_TABLE 0x0020
#define ATT_PARTITIONED_INDEX 0x0040
/*
* Partition tables are expected to be dropped when the parent partitioned
@@ -475,11 +482,17 @@ static void CreateInheritance(Relation child_rel, Relation parent_rel);
static void RemoveInheritance(Relation child_rel, Relation parent_rel);
static ObjectAddress ATExecAttachPartition(List **wqueue, Relation rel,
PartitionCmd *cmd);
static void AttachPartitionEnsureIndexes(Relation rel, Relation attachrel);
static void ValidatePartitionConstraints(List **wqueue, Relation scanrel,
List *scanrel_children,
List *partConstraint,
bool validate_default);
static ObjectAddress ATExecDetachPartition(Relation rel, RangeVar *name);
static ObjectAddress ATExecAttachPartitionIdx(List **wqueue, Relation rel,
RangeVar *name);
static void validatePartitionedIndex(Relation partedIdx, Relation partedTbl);
static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx,
Relation partitionTbl);
/* ----------------------------------------------------------------
@@ -897,6 +910,53 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
StorePartitionKey(rel, strategy, partnatts, partattrs, partexprs,
partopclass, partcollation);
/* make it all visible */
CommandCounterIncrement();
}
/*
* If we're creating a partition, create now all the indexes defined in
* the parent. We can't do it earlier, because DefineIndex wants to know
* the partition key which we just stored.
*/
if (stmt->partbound)
{
Oid parentId = linitial_oid(inheritOids);
Relation parent;
List *idxlist;
ListCell *cell;
/* Already have strong enough lock on the parent */
parent = heap_open(parentId, NoLock);
idxlist = RelationGetIndexList(parent);
/*
* For each index in the parent table, create one in the partition
*/
foreach(cell, idxlist)
{
Relation idxRel = index_open(lfirst_oid(cell), AccessShareLock);
AttrNumber *attmap;
IndexStmt *idxstmt;
attmap = convert_tuples_by_name_map(RelationGetDescr(rel),
RelationGetDescr(parent),
gettext_noop("could not convert row type"));
idxstmt =
generateClonedIndexStmt(NULL, RelationGetRelid(rel), idxRel,
attmap, RelationGetDescr(rel)->natts);
DefineIndex(RelationGetRelid(rel),
idxstmt,
InvalidOid,
RelationGetRelid(idxRel),
false, false, false, false, false);
index_close(idxRel, AccessShareLock);
}
list_free(idxlist);
heap_close(parent, NoLock);
}
/*
@@ -1179,10 +1239,13 @@ RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, Oid oldRelOid,
* but RemoveRelations() can only pass one relkind for a given relation.
* It chooses RELKIND_RELATION for both regular and partitioned tables.
* That means we must be careful before giving the wrong type error when
* the relation is RELKIND_PARTITIONED_TABLE.
* the relation is RELKIND_PARTITIONED_TABLE. An equivalent problem
* exists with indexes.
*/
if (classform->relkind == RELKIND_PARTITIONED_TABLE)
expected_relkind = RELKIND_RELATION;
else if (classform->relkind == RELKIND_PARTITIONED_INDEX)
expected_relkind = RELKIND_INDEX;
else
expected_relkind = classform->relkind;
@@ -1210,7 +1273,8 @@ RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, Oid oldRelOid,
* we do it the other way around. No error if we don't find a pg_index
* entry, though --- the relation may have been dropped.
*/
if (relkind == RELKIND_INDEX && relOid != oldRelOid)
if ((relkind == RELKIND_INDEX || relkind == RELKIND_PARTITIONED_INDEX) &&
relOid != oldRelOid)
{
state->heapOid = IndexGetRelation(relOid, true);
if (OidIsValid(state->heapOid))
@@ -2396,27 +2460,11 @@ StoreCatalogInheritance1(Oid relationId, Oid parentOid,
int32 seqNumber, Relation inhRelation,
bool child_is_partition)
{
TupleDesc desc = RelationGetDescr(inhRelation);
Datum values[Natts_pg_inherits];
bool nulls[Natts_pg_inherits];
ObjectAddress childobject,
parentobject;
HeapTuple tuple;
/*
* Make the pg_inherits entry
*/
values[Anum_pg_inherits_inhrelid - 1] = ObjectIdGetDatum(relationId);
values[Anum_pg_inherits_inhparent - 1] = ObjectIdGetDatum(parentOid);
values[Anum_pg_inherits_inhseqno - 1] = Int32GetDatum(seqNumber);
memset(nulls, 0, sizeof(nulls));
tuple = heap_form_tuple(desc, values, nulls);
CatalogTupleInsert(inhRelation, tuple);
heap_freetuple(tuple);
/* store the pg_inherits row */
StoreSingleInheritance(relationId, parentOid, seqNumber);
/*
* Store a dependency too
@@ -2540,6 +2588,7 @@ renameatt_check(Oid myrelid, Form_pg_class classform, bool recursing)
relkind != RELKIND_MATVIEW &&
relkind != RELKIND_COMPOSITE_TYPE &&
relkind != RELKIND_INDEX &&
relkind != RELKIND_PARTITIONED_INDEX &&
relkind != RELKIND_FOREIGN_TABLE &&
relkind != RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
@@ -3019,7 +3068,8 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal)
/*
* Also rename the associated constraint, if any.
*/
if (targetrelation->rd_rel->relkind == RELKIND_INDEX)
if (targetrelation->rd_rel->relkind == RELKIND_INDEX ||
targetrelation->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
{
Oid constraintId = get_index_constraint(myrelid);
@@ -3073,6 +3123,7 @@ CheckTableNotInUse(Relation rel, const char *stmt)
stmt, RelationGetRelationName(rel))));
if (rel->rd_rel->relkind != RELKIND_INDEX &&
rel->rd_rel->relkind != RELKIND_PARTITIONED_INDEX &&
AfterTriggerPendingOnRel(RelationGetRelid(rel)))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
@@ -3764,6 +3815,10 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
pass = AT_PASS_MISC;
break;
case AT_AttachPartition:
ATSimplePermissions(rel, ATT_TABLE | ATT_PARTITIONED_INDEX);
/* No command-specific prep needed */
pass = AT_PASS_MISC;
break;
case AT_DetachPartition:
ATSimplePermissions(rel, ATT_TABLE);
/* No command-specific prep needed */
@@ -4112,9 +4167,15 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
ATExecGenericOptions(rel, (List *) cmd->def);
break;
case AT_AttachPartition:
ATExecAttachPartition(wqueue, rel, (PartitionCmd *) cmd->def);
if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
ATExecAttachPartition(wqueue, rel, (PartitionCmd *) cmd->def);
else
ATExecAttachPartitionIdx(wqueue, rel,
((PartitionCmd *) cmd->def)->name);
break;
case AT_DetachPartition:
/* ATPrepCmd ensures it must be a table */
Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
ATExecDetachPartition(rel, ((PartitionCmd *) cmd->def)->name);
break;
default: /* oops */
@@ -4148,9 +4209,13 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode)
{
AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
/* Foreign tables have no storage, nor do partitioned tables. */
/*
* Foreign tables have no storage, nor do partitioned tables and
* indexes.
*/
if (tab->relkind == RELKIND_FOREIGN_TABLE ||
tab->relkind == RELKIND_PARTITIONED_TABLE)
tab->relkind == RELKIND_PARTITIONED_TABLE ||
tab->relkind == RELKIND_PARTITIONED_INDEX)
continue;
/*
@@ -4752,6 +4817,9 @@ ATSimplePermissions(Relation rel, int allowed_targets)
case RELKIND_INDEX:
actual_target = ATT_INDEX;
break;
case RELKIND_PARTITIONED_INDEX:
actual_target = ATT_PARTITIONED_INDEX;
break;
case RELKIND_COMPOSITE_TYPE:
actual_target = ATT_COMPOSITE_TYPE;
break;
@@ -6194,6 +6262,7 @@ ATPrepSetStatistics(Relation rel, const char *colName, int16 colNum, Node *newVa
if (rel->rd_rel->relkind != RELKIND_RELATION &&
rel->rd_rel->relkind != RELKIND_MATVIEW &&
rel->rd_rel->relkind != RELKIND_INDEX &&
rel->rd_rel->relkind != RELKIND_PARTITIONED_INDEX &&
rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
@@ -6205,7 +6274,9 @@ ATPrepSetStatistics(Relation rel, const char *colName, int16 colNum, Node *newVa
* We allow referencing columns by numbers only for indexes, since table
* column numbers could contain gaps if columns are later dropped.
*/
if (rel->rd_rel->relkind != RELKIND_INDEX && !colName)
if (rel->rd_rel->relkind != RELKIND_INDEX &&
rel->rd_rel->relkind != RELKIND_PARTITIONED_INDEX &&
!colName)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot refer to non-index column by number")));
@@ -6283,7 +6354,8 @@ ATExecSetStatistics(Relation rel, const char *colName, int16 colNum, Node *newVa
errmsg("cannot alter system column \"%s\"",
colName)));
if (rel->rd_rel->relkind == RELKIND_INDEX &&
if ((rel->rd_rel->relkind == RELKIND_INDEX ||
rel->rd_rel->relkind == RELKIND_PARTITIONED_INDEX) &&
rel->rd_index->indkey.values[attnum - 1] != 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@@ -6736,6 +6808,7 @@ ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
address = DefineIndex(RelationGetRelid(rel),
stmt,
InvalidOid, /* no predefined OID */
InvalidOid, /* no parent index */
true, /* is_alter_table */
check_rights,
false, /* check_not_in_use - we did it already */
@@ -9139,7 +9212,8 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
{
char relKind = get_rel_relkind(foundObject.objectId);
if (relKind == RELKIND_INDEX)
if (relKind == RELKIND_INDEX ||
relKind == RELKIND_PARTITIONED_INDEX)
{
Assert(foundObject.objectSubId == 0);
if (!list_member_oid(tab->changedIndexOids, foundObject.objectId))
@@ -9982,6 +10056,15 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing, LOCKMODE lock
newOwnerId = tuple_class->relowner;
}
break;
case RELKIND_PARTITIONED_INDEX:
if (recursing)
break;
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot change owner of index \"%s\"",
NameStr(tuple_class->relname)),
errhint("Change the ownership of the index's table, instead.")));
break;
case RELKIND_SEQUENCE:
if (!recursing &&
tuple_class->relowner != newOwnerId)
@@ -10103,6 +10186,7 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing, LOCKMODE lock
*/
if (tuple_class->relkind != RELKIND_COMPOSITE_TYPE &&
tuple_class->relkind != RELKIND_INDEX &&
tuple_class->relkind != RELKIND_PARTITIONED_INDEX &&
tuple_class->relkind != RELKIND_TOASTVALUE)
changeDependencyOnOwner(RelationRelationId, relationOid,
newOwnerId);
@@ -10110,7 +10194,8 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing, LOCKMODE lock
/*
* Also change the ownership of the table's row type, if it has one
*/
if (tuple_class->relkind != RELKIND_INDEX)
if (tuple_class->relkind != RELKIND_INDEX &&
tuple_class->relkind != RELKIND_PARTITIONED_INDEX)
AlterTypeOwnerInternal(tuple_class->reltype, newOwnerId);
/*
@@ -10119,6 +10204,7 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing, LOCKMODE lock
* relation, as well as its toast table (if it has one).
*/
if (tuple_class->relkind == RELKIND_RELATION ||
tuple_class->relkind == RELKIND_PARTITIONED_TABLE ||
tuple_class->relkind == RELKIND_MATVIEW ||
tuple_class->relkind == RELKIND_TOASTVALUE)
{
@@ -10427,6 +10513,7 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
(void) view_reloptions(newOptions, true);
break;
case RELKIND_INDEX:
case RELKIND_PARTITIONED_INDEX:
(void) index_reloptions(rel->rd_amroutine->amoptions, newOptions, true);
break;
default:
@@ -10839,7 +10926,8 @@ AlterTableMoveAll(AlterTableMoveAllStmt *stmt)
relForm->relkind != RELKIND_RELATION &&
relForm->relkind != RELKIND_PARTITIONED_TABLE) ||
(stmt->objtype == OBJECT_INDEX &&
relForm->relkind != RELKIND_INDEX) ||
relForm->relkind != RELKIND_INDEX &&
relForm->relkind != RELKIND_PARTITIONED_INDEX) ||
(stmt->objtype == OBJECT_MATVIEW &&
relForm->relkind != RELKIND_MATVIEW))
continue;
@@ -11633,45 +11721,18 @@ RemoveInheritance(Relation child_rel, Relation parent_rel)
Relation catalogRelation;
SysScanDesc scan;
ScanKeyData key[3];
HeapTuple inheritsTuple,
attributeTuple,
HeapTuple attributeTuple,
constraintTuple;
List *connames;
bool found = false;
bool found;
bool child_is_partition = false;
/* If parent_rel is a partitioned table, child_rel must be a partition */
if (parent_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
child_is_partition = true;
/*
* Find and destroy the pg_inherits entry linking the two, or error out if
* there is none.
*/
catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
ScanKeyInit(&key[0],
Anum_pg_inherits_inhrelid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(child_rel)));
scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId,
true, NULL, 1, key);
while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
{
Oid inhparent;
inhparent = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent;
if (inhparent == RelationGetRelid(parent_rel))
{
CatalogTupleDelete(catalogRelation, &inheritsTuple->t_self);
found = true;
break;
}
}
systable_endscan(scan);
heap_close(catalogRelation, RowExclusiveLock);
found = DeleteInheritsTuple(RelationGetRelid(child_rel),
RelationGetRelid(parent_rel));
if (!found)
{
if (child_is_partition)
@@ -13226,7 +13287,8 @@ RangeVarCallbackForAlterRelation(const RangeVar *rv, Oid relid, Oid oldrelid,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a composite type", rv->relname)));
if (reltype == OBJECT_INDEX && relkind != RELKIND_INDEX
if (reltype == OBJECT_INDEX && relkind != RELKIND_INDEX &&
relkind != RELKIND_PARTITIONED_INDEX
&& !IsA(stmt, RenameStmt))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
@@ -13946,6 +14008,9 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
/* Update the pg_class entry. */
StorePartitionBound(attachrel, rel, cmd->bound);
/* Ensure there exists a correct set of indexes in the partition. */
AttachPartitionEnsureIndexes(rel, attachrel);
/*
* Generate partition constraint from the partition bound specification.
* If the parent itself is a partition, make sure to include its
@@ -14015,6 +14080,127 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
return address;
}
/*
* AttachPartitionEnsureIndexes
* subroutine for ATExecAttachPartition to create/match indexes
*
* Enforce the indexing rule for partitioned tables during ALTER TABLE / ATTACH
* PARTITION: every partition must have an index attached to each index on the
* partitioned table.
*/
static void
AttachPartitionEnsureIndexes(Relation rel, Relation attachrel)
{
List *idxes;
List *attachRelIdxs;
Relation *attachrelIdxRels;
IndexInfo **attachInfos;
int i;
ListCell *cell;
MemoryContext cxt;
MemoryContext oldcxt;
cxt = AllocSetContextCreate(CurrentMemoryContext,
"AttachPartitionEnsureIndexes",
ALLOCSET_DEFAULT_SIZES);
oldcxt = MemoryContextSwitchTo(cxt);
idxes = RelationGetIndexList(rel);
attachRelIdxs = RelationGetIndexList(attachrel);
attachrelIdxRels = palloc(sizeof(Relation) * list_length(attachRelIdxs));
attachInfos = palloc(sizeof(IndexInfo *) * list_length(attachRelIdxs));
/* Build arrays of all existing indexes and their IndexInfos */
i = 0;
foreach(cell, attachRelIdxs)
{
Oid cldIdxId = lfirst_oid(cell);
attachrelIdxRels[i] = index_open(cldIdxId, AccessShareLock);
attachInfos[i] = BuildIndexInfo(attachrelIdxRels[i]);
i++;
}
/*
* For each index on the partitioned table, find a matching one in the
* partition-to-be; if one is not found, create one.
*/
foreach(cell, idxes)
{
Oid idx = lfirst_oid(cell);
Relation idxRel = index_open(idx, AccessShareLock);
IndexInfo *info;
AttrNumber *attmap;
bool found = false;
/*
* Ignore indexes in the partitioned table other than partitioned
* indexes.
*/
if (idxRel->rd_rel->relkind != RELKIND_PARTITIONED_INDEX)
{
index_close(idxRel, AccessShareLock);
continue;
}
/* construct an indexinfo to compare existing indexes against */
info = BuildIndexInfo(idxRel);
attmap = convert_tuples_by_name_map(RelationGetDescr(attachrel),
RelationGetDescr(rel),
gettext_noop("could not convert row type"));
/*
* Scan the list of existing indexes in the partition-to-be, and mark
* the first matching, unattached one we find, if any, as partition of
* the parent index. If we find one, we're done.
*/
for (i = 0; i < list_length(attachRelIdxs); i++)
{
/* does this index have a parent? if so, can't use it */
if (has_superclass(RelationGetRelid(attachrelIdxRels[i])))
continue;
if (CompareIndexInfo(attachInfos[i], info,
attachrelIdxRels[i]->rd_indcollation,
idxRel->rd_indcollation,
attachrelIdxRels[i]->rd_opfamily,
idxRel->rd_opfamily,
attmap,
RelationGetDescr(rel)->natts))
{
/* bingo. */
IndexSetParentIndex(attachrelIdxRels[i], idx);
found = true;
break;
}
}
/*
* If no suitable index was found in the partition-to-be, create one
* now.
*/
if (!found)
{
IndexStmt *stmt;
stmt = generateClonedIndexStmt(NULL, RelationGetRelid(attachrel),
idxRel, attmap,
RelationGetDescr(rel)->natts);
DefineIndex(RelationGetRelid(attachrel), stmt, InvalidOid,
RelationGetRelid(idxRel),
false, false, false, false, false);
}
index_close(idxRel, AccessShareLock);
}
/* Clean up. */
for (i = 0; i < list_length(attachRelIdxs); i++)
index_close(attachrelIdxRels[i], AccessShareLock);
MemoryContextSwitchTo(oldcxt);
MemoryContextDelete(cxt);
}
/*
* ALTER TABLE DETACH PARTITION
*
@@ -14033,6 +14219,8 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
new_repl[Natts_pg_class];
ObjectAddress address;
Oid defaultPartOid;
List *indexes;
ListCell *cell;
/*
* We must lock the default partition, because detaching this partition
@@ -14094,6 +14282,24 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
}
}
/* detach indexes too */
indexes = RelationGetIndexList(partRel);
foreach(cell, indexes)
{
Oid idxid = lfirst_oid(cell);
Relation idx;
if (!has_superclass(idxid))
continue;
Assert((IndexGetRelation(get_partition_parent(idxid), false) ==
RelationGetRelid(rel)));
idx = index_open(idxid, AccessExclusiveLock);
IndexSetParentIndex(idx, InvalidOid);
relation_close(idx, AccessExclusiveLock);
}
/*
* Invalidate the parent's relcache so that the partition is no longer
* included in its partition descriptor.
@@ -14107,3 +14313,328 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
return address;
}
/*
* Before acquiring lock on an index, acquire the same lock on the owning
* table.
*/
struct AttachIndexCallbackState
{
Oid partitionOid;
Oid parentTblOid;
bool lockedParentTbl;
};
static void
RangeVarCallbackForAttachIndex(const RangeVar *rv, Oid relOid, Oid oldRelOid,
void *arg)
{
struct AttachIndexCallbackState *state;
Form_pg_class classform;
HeapTuple tuple;
state = (struct AttachIndexCallbackState *) arg;
if (!state->lockedParentTbl)
{
LockRelationOid(state->parentTblOid, AccessShareLock);
state->lockedParentTbl = true;
}
/*
* If we previously locked some other heap, and the name we're looking up
* no longer refers to an index on that relation, release the now-useless
* lock. XXX maybe we should do *after* we verify whether the index does
* not actually belong to the same relation ...
*/
if (relOid != oldRelOid && OidIsValid(state->partitionOid))
{
UnlockRelationOid(state->partitionOid, AccessShareLock);
state->partitionOid = InvalidOid;
}
/* Didn't find a relation, so no need for locking or permission checks. */
if (!OidIsValid(relOid))
return;
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
if (!HeapTupleIsValid(tuple))
return; /* concurrently dropped, so nothing to do */
classform = (Form_pg_class) GETSTRUCT(tuple);
if (classform->relkind != RELKIND_PARTITIONED_INDEX &&
classform->relkind != RELKIND_INDEX)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("\"%s\" is not an index", rv->relname)));
ReleaseSysCache(tuple);
/*
* Since we need only examine the heap's tupledesc, an access share lock
* on it (preventing any DDL) is sufficient.
*/
state->partitionOid = IndexGetRelation(relOid, false);
LockRelationOid(state->partitionOid, AccessShareLock);
}
/*
* ALTER INDEX i1 ATTACH PARTITION i2
*/
static ObjectAddress
ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name)
{
Relation partIdx;
Relation partTbl;
Relation parentTbl;
ObjectAddress address;
Oid partIdxId;
Oid currParent;
struct AttachIndexCallbackState state;
/*
* We need to obtain lock on the index 'name' to modify it, but we also
* need to read its owning table's tuple descriptor -- so we need to lock
* both. To avoid deadlocks, obtain lock on the table before doing so on
* the index. Furthermore, we need to examine the parent table of the
* partition, so lock that one too.
*/
state.partitionOid = InvalidOid;
state.parentTblOid = parentIdx->rd_index->indrelid;
state.lockedParentTbl = false;
partIdxId =
RangeVarGetRelidExtended(name, AccessExclusiveLock, false, false,
RangeVarCallbackForAttachIndex,
(void *) &state);
/* Not there? */
if (!OidIsValid(partIdxId))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("index \"%s\" does not exist", name->relname)));
/* no deadlock risk: RangeVarGetRelidExtended already acquired the lock */
partIdx = relation_open(partIdxId, AccessExclusiveLock);
/* we already hold locks on both tables, so this is safe: */
parentTbl = relation_open(parentIdx->rd_index->indrelid, AccessShareLock);
partTbl = relation_open(partIdx->rd_index->indrelid, NoLock);
ObjectAddressSet(address, RelationRelationId, RelationGetRelid(partIdx));
/* Silently do nothing if already in the right state */
currParent = !has_superclass(partIdxId) ? InvalidOid :
get_partition_parent(partIdxId);
if (currParent != RelationGetRelid(parentIdx))
{
IndexInfo *childInfo;
IndexInfo *parentInfo;
AttrNumber *attmap;
bool found;
int i;
PartitionDesc partDesc;
/*
* If this partition already has an index attached, refuse the operation.
*/
refuseDupeIndexAttach(parentIdx, partIdx, partTbl);
if (OidIsValid(currParent))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot attach index \"%s\" as a partition of index \"%s\"",
RelationGetRelationName(partIdx),
RelationGetRelationName(parentIdx)),
errdetail("Index \"%s\" is already attached to another index.",
RelationGetRelationName(partIdx))));
/* Make sure it indexes a partition of the other index's table */
partDesc = RelationGetPartitionDesc(parentTbl);
found = false;
for (i = 0; i < partDesc->nparts; i++)
{
if (partDesc->oids[i] == state.partitionOid)
{
found = true;
break;
}
}
if (!found)
ereport(ERROR,
(errmsg("cannot attach index \"%s\" as a partition of index \"%s\"",
RelationGetRelationName(partIdx),
RelationGetRelationName(parentIdx)),
errdetail("Index \"%s\" is not an index on any partition of table \"%s\".",
RelationGetRelationName(partIdx),
RelationGetRelationName(parentTbl))));
/* Ensure the indexes are compatible */
childInfo = BuildIndexInfo(partIdx);
parentInfo = BuildIndexInfo(parentIdx);
attmap = convert_tuples_by_name_map(RelationGetDescr(partTbl),
RelationGetDescr(parentTbl),
gettext_noop("could not convert row type"));
if (!CompareIndexInfo(childInfo, parentInfo,
partIdx->rd_indcollation,
parentIdx->rd_indcollation,
partIdx->rd_opfamily,
parentIdx->rd_opfamily,
attmap,
RelationGetDescr(partTbl)->natts))
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("cannot attach index \"%s\" as a partition of index \"%s\"",
RelationGetRelationName(partIdx),
RelationGetRelationName(parentIdx)),
errdetail("The index definitions do not match.")));
/* All good -- do it */
IndexSetParentIndex(partIdx, RelationGetRelid(parentIdx));
pfree(attmap);
CommandCounterIncrement();
validatePartitionedIndex(parentIdx, parentTbl);
}
relation_close(parentTbl, AccessShareLock);
/* keep these locks till commit */
relation_close(partTbl, NoLock);
relation_close(partIdx, NoLock);
return address;
}
/*
* Verify whether the given partition already contains an index attached
* to the given partitioned index. If so, raise an error.
*/
static void
refuseDupeIndexAttach(Relation parentIdx, Relation partIdx, Relation partitionTbl)
{
Relation pg_inherits;
ScanKeyData key;
HeapTuple tuple;
SysScanDesc scan;
pg_inherits = heap_open(InheritsRelationId, AccessShareLock);
ScanKeyInit(&key, Anum_pg_inherits_inhparent,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(parentIdx)));
scan = systable_beginscan(pg_inherits, InheritsParentIndexId, true,
NULL, 1, &key);
while (HeapTupleIsValid(tuple = systable_getnext(scan)))
{
Form_pg_inherits inhForm;
Oid tab;
inhForm = (Form_pg_inherits) GETSTRUCT(tuple);
tab = IndexGetRelation(inhForm->inhrelid, false);
if (tab == RelationGetRelid(partitionTbl))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot attach index \"%s\" as a partition of index \"%s\"",
RelationGetRelationName(partIdx),
RelationGetRelationName(parentIdx)),
errdetail("Another index is already attached for partition \"%s\".",
RelationGetRelationName(partitionTbl))));
}
systable_endscan(scan);
heap_close(pg_inherits, AccessShareLock);
}
/*
* Verify whether the set of attached partition indexes to a parent index on
* a partitioned table is complete. If it is, mark the parent index valid.
*
* This should be called each time a partition index is attached.
*/
static void
validatePartitionedIndex(Relation partedIdx, Relation partedTbl)
{
Relation inheritsRel;
SysScanDesc scan;
ScanKeyData key;
int tuples = 0;
HeapTuple inhTup;
bool updated = false;
Assert(partedIdx->rd_rel->relkind == RELKIND_PARTITIONED_INDEX);
/*
* Scan pg_inherits for this parent index. Count each valid index we find
* (verifying the pg_index entry for each), and if we reach the total
* amount we expect, we can mark this parent index as valid.
*/
inheritsRel = heap_open(InheritsRelationId, AccessShareLock);
ScanKeyInit(&key, Anum_pg_inherits_inhparent,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(partedIdx)));
scan = systable_beginscan(inheritsRel, InheritsParentIndexId, true,
NULL, 1, &key);
while ((inhTup = systable_getnext(scan)) != NULL)
{
Form_pg_inherits inhForm = (Form_pg_inherits) GETSTRUCT(inhTup);
HeapTuple indTup;
Form_pg_index indexForm;
indTup = SearchSysCache1(INDEXRELID,
ObjectIdGetDatum(inhForm->inhrelid));
if (!indTup)
elog(ERROR, "cache lookup failed for index %u",
inhForm->inhrelid);
indexForm = (Form_pg_index) GETSTRUCT(indTup);
if (IndexIsValid(indexForm))
tuples += 1;
ReleaseSysCache(indTup);
}
/* Done with pg_inherits */
systable_endscan(scan);
heap_close(inheritsRel, AccessShareLock);
/*
* If we found as many inherited indexes as the partitioned table has
* partitions, we're good; update pg_index to set indisvalid.
*/
if (tuples == RelationGetPartitionDesc(partedTbl)->nparts)
{
Relation idxRel;
HeapTuple newtup;
idxRel = heap_open(IndexRelationId, RowExclusiveLock);
newtup = heap_copytuple(partedIdx->rd_indextuple);
((Form_pg_index) GETSTRUCT(newtup))->indisvalid = true;
updated = true;
CatalogTupleUpdate(idxRel, &partedIdx->rd_indextuple->t_self, newtup);
heap_close(idxRel, RowExclusiveLock);
}
/*
* If this index is in turn a partition of a larger index, validating it
* might cause the parent to become valid also. Try that.
*/
if (updated &&
has_superclass(RelationGetRelid(partedIdx)))
{
Oid parentIdxId,
parentTblId;
Relation parentIdx,
parentTbl;
/* make sure we see the validation we just did */
CommandCounterIncrement();
parentIdxId = get_partition_parent(RelationGetRelid(partedIdx));
parentTblId = get_partition_parent(RelationGetRelid(partedTbl));
parentIdx = relation_open(parentIdxId, AccessExclusiveLock);
parentTbl = relation_open(parentTblId, AccessExclusiveLock);
Assert(!parentIdx->rd_index->indisvalid);
validatePartitionedIndex(parentIdx, parentTbl);
relation_close(parentIdx, AccessExclusiveLock);
relation_close(parentTbl, AccessExclusiveLock);
}
}