1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-28 11:55:03 +03:00

Implement reindex command

This commit is contained in:
Hiroshi Inoue
2000-02-18 09:30:20 +00:00
parent e3befe4a66
commit e3a97b370c
29 changed files with 1208 additions and 229 deletions

View File

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/indexcmds.c,v 1.20 2000/01/26 05:56:13 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/commands/indexcmds.c,v 1.21 2000/02/18 09:29:37 inoue Exp $
*
*-------------------------------------------------------------------------
*/
@@ -17,12 +17,15 @@
#include "access/genam.h"
#include "access/heapam.h"
#include "catalog/catname.h"
#include "catalog/heap.h"
#include "catalog/index.h"
#include "catalog/pg_index.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "catalog/pg_database.h"
#include "catalog/pg_shadow.h"
#include "commands/defrem.h"
#include "optimizer/clauses.h"
#include "optimizer/planmain.h"
@@ -30,6 +33,9 @@
#include "parser/parsetree.h"
#include "utils/builtins.h"
#include "utils/syscache.h"
#include "miscadmin.h" /* ReindexDatabase() */
#include "utils/portal.h" /* ReindexDatabase() */
#include "catalog/catalog.h" /* ReindexDatabase() */
#define IsFuncIndex(ATTR_LIST) (((IndexElem*)lfirst(ATTR_LIST))->args!=NULL)
@@ -149,6 +155,8 @@ DefineIndex(char *heapRelationName,
CheckPredicate(cnfPred, rangetable, relationId);
}
if (!IsBootstrapProcessingMode() && !IndexesAreActive(relationId, false))
elog(ERROR, "existent indexes are inactive. REINDEX first");
if (IsFuncIndex(attributeList))
{
IndexElem *funcIndex = lfirst(attributeList);
@@ -195,6 +203,7 @@ DefineIndex(char *heapRelationName,
classObjectId, parameterCount, parameterA, (Node *) cnfPred,
lossy, unique, primary);
}
setRelhasindexInplace(relationId, true, false);
}
@@ -570,3 +579,163 @@ RemoveIndex(char *name)
index_drop(tuple->t_data->t_oid);
}
/*
* Reindex
* Recreate an index.
*
* Exceptions:
* "ERROR" if index nonexistent.
* ...
*/
void
ReindexIndex(const char *name, bool force /* currently unused */)
{
HeapTuple tuple;
tuple = SearchSysCacheTuple(RELNAME,
PointerGetDatum(name),
0, 0, 0);
if (!HeapTupleIsValid(tuple))
elog(ERROR, "index \"%s\" nonexistent", name);
if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX)
{
elog(ERROR, "relation \"%s\" is of type \"%c\"",
name,
((Form_pg_class) GETSTRUCT(tuple))->relkind);
}
reindex_index(tuple->t_data->t_oid, force);
}
/*
* ReindexTable
* Recreate indexes of a table.
*
* Exceptions:
* "ERROR" if table nonexistent.
* ...
*/
void
ReindexTable(const char *name, bool force)
{
HeapTuple tuple;
tuple = SearchSysCacheTuple(RELNAME,
PointerGetDatum(name),
0, 0, 0);
if (!HeapTupleIsValid(tuple))
elog(ERROR, "table \"%s\" nonexistent", name);
if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_RELATION)
{
elog(ERROR, "relation \"%s\" is of type \"%c\"",
name,
((Form_pg_class) GETSTRUCT(tuple))->relkind);
}
reindex_relation(tuple->t_data->t_oid, force);
}
/*
* ReindexDatabase
* Recreate indexes of a database.
*
* Exceptions:
* "ERROR" if table nonexistent.
* ...
*/
extern Oid MyDatabaseId;
void
ReindexDatabase(const char *dbname, bool force, bool all)
{
Relation relation, relationRelation;
HeapTuple usertuple, dbtuple, tuple;
HeapScanDesc scan;
int4 user_id, db_owner;
bool superuser;
Oid db_id;
char *username;
ScanKeyData scankey;
PortalVariableMemory pmem;
MemoryContext old;
int relcnt, relalc, i, oncealc = 200;
Oid *relids = (Oid *) NULL;
AssertArg(dbname);
username = GetPgUserName();
usertuple = SearchSysCacheTuple(SHADOWNAME, PointerGetDatum(username),
0, 0, 0);
if (!HeapTupleIsValid(usertuple))
elog(ERROR, "Current user '%s' is invalid.", username);
user_id = ((Form_pg_shadow) GETSTRUCT(usertuple))->usesysid;
superuser = ((Form_pg_shadow) GETSTRUCT(usertuple))->usesuper;
relation = heap_openr(DatabaseRelationName, AccessShareLock);
ScanKeyEntryInitialize(&scankey, 0, Anum_pg_database_datname,
F_NAMEEQ, NameGetDatum(dbname));
scan = heap_beginscan(relation, 0, SnapshotNow, 1, &scankey);
dbtuple = heap_getnext(scan, 0);
if (!HeapTupleIsValid(dbtuple))
elog(ERROR, "Database '%s' doesn't exist", dbname);
db_id = dbtuple->t_data->t_oid;
db_owner = ((Form_pg_database) GETSTRUCT(dbtuple))->datdba;
heap_endscan(scan);
if (user_id != db_owner && !superuser)
elog(ERROR, "REINDEX DATABASE: Permission denied.");
if (db_id != MyDatabaseId)
elog(ERROR, "REINDEX DATABASE: Can be executed only on the currently open database.");
heap_close(relation, NoLock);
/** reindex_database(db_id, force, !all); **/
CommonSpecialPortalOpen();
pmem = CommonSpecialPortalGetMemory();
relationRelation = heap_openr(RelationRelationName, AccessShareLock);
scan = heap_beginscan(relationRelation, false, SnapshotNow, 0, NULL);
relcnt = relalc = 0;
while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
{
if (!all)
{
if (!IsSystemRelationName(NameStr(((Form_pg_class) GETSTRUCT(tuple))->relname)))
continue;
if (((Form_pg_class) GETSTRUCT(tuple))->relhasrules)
continue;
}
if (((Form_pg_class) GETSTRUCT(tuple))->relkind == RELKIND_RELATION)
{
old = MemoryContextSwitchTo((MemoryContext) pmem);
if (relcnt == 0)
{
relalc = oncealc;
relids = palloc(sizeof(Oid) * relalc);
}
else if (relcnt >= relalc)
{
relalc *= 2;
relids = repalloc(relids, sizeof(Oid) * relalc);
}
MemoryContextSwitchTo(old);
relids[relcnt] = tuple->t_data->t_oid;
relcnt++;
}
}
heap_endscan(scan);
heap_close(relationRelation, AccessShareLock);
CommitTransactionCommand();
for (i = 0; i < relcnt; i++)
{
StartTransactionCommand();
reindex_relation(relids[i], force);
CommitTransactionCommand();
}
CommonSpecialPortalClose();
StartTransactionCommand();
}

View File

@@ -7,7 +7,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/trigger.c,v 1.60 2000/02/13 13:21:10 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/commands/trigger.c,v 1.61 2000/02/18 09:29:37 inoue Exp $
*
*-------------------------------------------------------------------------
*/
@@ -433,15 +433,18 @@ RelationBuildTriggers(Relation relation)
Trigger *build;
Relation tgrel;
Form_pg_trigger pg_trigger;
Relation irel;
Relation irel = (Relation) NULL;
ScanKeyData skey;
HeapTupleData tuple;
IndexScanDesc sd;
IndexScanDesc sd = (IndexScanDesc) NULL;
HeapScanDesc tgscan = (HeapScanDesc) NULL;
HeapTuple htup;
RetrieveIndexResult indexRes;
Buffer buffer;
struct varlena *val;
bool isnull;
int found;
bool hasindex;
MemSet(trigdesc, 0, sizeof(TriggerDesc));
@@ -452,25 +455,41 @@ RelationBuildTriggers(Relation relation)
ObjectIdGetDatum(RelationGetRelid(relation)));
tgrel = heap_openr(TriggerRelationName, AccessShareLock);
irel = index_openr(TriggerRelidIndex);
sd = index_beginscan(irel, false, 1, &skey);
hasindex = (tgrel->rd_rel->relhasindex && !IsIgnoringSystemIndexes());
if (hasindex)
{
irel = index_openr(TriggerRelidIndex);
sd = index_beginscan(irel, false, 1, &skey);
}
else
tgscan = heap_beginscan(tgrel, 0, SnapshotNow, 1, &skey);
for (found = 0;;)
{
indexRes = index_getnext(sd, ForwardScanDirection);
if (!indexRes)
break;
if (hasindex)
{
indexRes = index_getnext(sd, ForwardScanDirection);
if (!indexRes)
break;
tuple.t_self = indexRes->heap_iptr;
heap_fetch(tgrel, SnapshotNow, &tuple, &buffer);
pfree(indexRes);
if (!tuple.t_data)
continue;
tuple.t_self = indexRes->heap_iptr;
heap_fetch(tgrel, SnapshotNow, &tuple, &buffer);
pfree(indexRes);
if (!tuple.t_data)
continue;
htup = &tuple;
}
else
{
htup = heap_getnext(tgscan, 0);
if (!HeapTupleIsValid(htup))
break;
}
if (found == ntrigs)
elog(ERROR, "RelationBuildTriggers: unexpected record found for rel %s",
RelationGetRelationName(relation));
pg_trigger = (Form_pg_trigger) GETSTRUCT(&tuple);
pg_trigger = (Form_pg_trigger) GETSTRUCT(htup);
if (triggers == NULL)
triggers = (Trigger *) palloc(sizeof(Trigger));
@@ -478,7 +497,7 @@ RelationBuildTriggers(Relation relation)
triggers = (Trigger *) repalloc(triggers, (found + 1) * sizeof(Trigger));
build = &(triggers[found]);
build->tgoid = tuple.t_data->t_oid;
build->tgoid = htup->t_data->t_oid;
build->tgname = nameout(&pg_trigger->tgname);
build->tgfoid = pg_trigger->tgfoid;
build->tgfunc.fn_addr = NULL;
@@ -489,7 +508,7 @@ RelationBuildTriggers(Relation relation)
build->tginitdeferred = pg_trigger->tginitdeferred;
build->tgnargs = pg_trigger->tgnargs;
memcpy(build->tgattr, &(pg_trigger->tgattr), FUNC_MAX_ARGS * sizeof(int16));
val = (struct varlena *) fastgetattr(&tuple,
val = (struct varlena *) fastgetattr(htup,
Anum_pg_trigger_tgargs,
tgrel->rd_att, &isnull);
if (isnull)
@@ -500,7 +519,7 @@ RelationBuildTriggers(Relation relation)
char *p;
int i;
val = (struct varlena *) fastgetattr(&tuple,
val = (struct varlena *) fastgetattr(htup,
Anum_pg_trigger_tgargs,
tgrel->rd_att, &isnull);
if (isnull)
@@ -518,7 +537,8 @@ RelationBuildTriggers(Relation relation)
build->tgargs = NULL;
found++;
ReleaseBuffer(buffer);
if (hasindex)
ReleaseBuffer(buffer);
}
if (found < ntrigs)
@@ -526,8 +546,13 @@ RelationBuildTriggers(Relation relation)
ntrigs - found,
RelationGetRelationName(relation));
index_endscan(sd);
index_close(irel);
if (hasindex)
{
index_endscan(sd);
index_close(irel);
}
else
heap_endscan(tgscan);
heap_close(tgrel, AccessShareLock);
/* Build trigdesc */
@@ -1460,7 +1485,7 @@ void
DeferredTriggerSetState(ConstraintsSetStmt *stmt)
{
Relation tgrel;
Relation irel;
Relation irel = (Relation) NULL;
List *l;
List *ls;
List *lnext;
@@ -1468,6 +1493,7 @@ DeferredTriggerSetState(ConstraintsSetStmt *stmt)
MemoryContext oldcxt;
bool found;
DeferredTriggerStatus state;
bool hasindex;
/* ----------
* Handle SET CONSTRAINTS ALL ...
@@ -1548,13 +1574,17 @@ DeferredTriggerSetState(ConstraintsSetStmt *stmt)
* ----------
*/
tgrel = heap_openr(TriggerRelationName, AccessShareLock);
irel = index_openr(TriggerConstrNameIndex);
hasindex = (tgrel->rd_rel->relhasindex && !IsIgnoringSystemIndexes());
if (hasindex)
irel = index_openr(TriggerConstrNameIndex);
foreach (l, stmt->constraints)
{
ScanKeyData skey;
HeapTupleData tuple;
IndexScanDesc sd;
IndexScanDesc sd = (IndexScanDesc) NULL;
HeapScanDesc tgscan = (HeapScanDesc) NULL;
HeapTuple htup;
RetrieveIndexResult indexRes;
Buffer buffer;
Form_pg_trigger pg_trigger;
@@ -1577,7 +1607,10 @@ DeferredTriggerSetState(ConstraintsSetStmt *stmt)
(RegProcedure) F_NAMEEQ,
PointerGetDatum((char *)lfirst(l)));
sd = index_beginscan(irel, false, 1, &skey);
if (hasindex)
sd = index_beginscan(irel, false, 1, &skey);
else
tgscan = heap_beginscan(tgrel, 0, SnapshotNow, 1, &skey);
/* ----------
* ... and search for the constraint trigger row
@@ -1586,33 +1619,43 @@ DeferredTriggerSetState(ConstraintsSetStmt *stmt)
found = false;
for (;;)
{
indexRes = index_getnext(sd, ForwardScanDirection);
if (!indexRes)
break;
tuple.t_self = indexRes->heap_iptr;
heap_fetch(tgrel, SnapshotNow, &tuple, &buffer);
pfree(indexRes);
if (!tuple.t_data)
if (hasindex)
{
ReleaseBuffer(buffer);
continue;
indexRes = index_getnext(sd, ForwardScanDirection);
if (!indexRes)
break;
tuple.t_self = indexRes->heap_iptr;
heap_fetch(tgrel, SnapshotNow, &tuple, &buffer);
pfree(indexRes);
if (!tuple.t_data)
{
continue;
}
htup = &tuple;
}
else
{
htup = heap_getnext(tgscan, 0);
if (!HeapTupleIsValid(htup))
break;
}
/* ----------
* If we found some, check that they fit the deferrability
* ----------
*/
pg_trigger = (Form_pg_trigger) GETSTRUCT(&tuple);
pg_trigger = (Form_pg_trigger) GETSTRUCT(htup);
if (stmt->deferred & !pg_trigger->tgdeferrable)
elog(ERROR, "Constraint '%s' is not deferrable",
(char *)lfirst(l));
constr_oid = tuple.t_data->t_oid;
constr_oid = htup->t_data->t_oid;
loid = lappend(loid, (Node *)constr_oid);
found = true;
ReleaseBuffer(buffer);
if (hasindex)
ReleaseBuffer(buffer);
}
/* ----------
@@ -1622,9 +1665,13 @@ DeferredTriggerSetState(ConstraintsSetStmt *stmt)
if (!found)
elog(ERROR, "Constraint '%s' does not exist", (char *)lfirst(l));
index_endscan(sd);
if (hasindex)
index_endscan(sd);
else
heap_endscan(tgscan);
}
index_close(irel);
if (hasindex)
index_close(irel);
heap_close(tgrel, AccessShareLock);

View File

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.138 2000/01/26 05:56:13 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.139 2000/02/18 09:29:37 inoue Exp $
*
*-------------------------------------------------------------------------
*/
@@ -51,7 +51,7 @@
#endif
bool VacuumRunning = false;
bool CommonSpecialPortalInUse = false;
static Portal vc_portal;
@@ -99,6 +99,53 @@ static bool vc_enough_space(VPageDescr vpd, Size len);
static char *vc_show_rusage(struct rusage * ru0);
/*
* This routines handle a special cross-transaction portal.
* However it is automatically closed in case of abort.
*/
void CommonSpecialPortalOpen(void)
{
char *pname;
/*
* Create a portal for safe memory across transactions. We need to
* palloc the name space for it because our hash function expects the
* name to be on a longword boundary. CreatePortal copies the name to
* safe storage for us.
*/
pname = pstrdup(VACPNAME);
vc_portal = CreatePortal(pname);
pfree(pname);
/*
* Set flag to indicate that vc_portal must be removed after an error.
* This global variable is checked in the transaction manager on xact
* abort, and the routine CommonSpecialPortalClose() is called if
* necessary.
*/
CommonSpecialPortalInUse = true;
}
void CommonSpecialPortalClose(void)
{
/* Clear flag first, to avoid recursion if PortalDrop elog's */
CommonSpecialPortalInUse = false;
/*
* Release our portal for cross-transaction memory.
*/
PortalDrop(&vc_portal);
}
PortalVariableMemory CommonSpecialPortalGetMemory(void)
{
return PortalGetVariableMemory(vc_portal);
}
bool CommonSpecialPortalIsOpen(void)
{
return CommonSpecialPortalInUse;
}
void
vacuum(char *vacrel, bool verbose, bool analyze, List *va_spec)
{
@@ -136,7 +183,7 @@ vacuum(char *vacrel, bool verbose, bool analyze, List *va_spec)
strcpy(NameStr(VacRel), vacrel);
/* must also copy the column list, if any, to safe storage */
pmem = PortalGetVariableMemory(vc_portal);
pmem = CommonSpecialPortalGetMemory();
old = MemoryContextSwitchTo((MemoryContext) pmem);
foreach(le, va_spec)
{
@@ -179,24 +226,7 @@ vacuum(char *vacrel, bool verbose, bool analyze, List *va_spec)
static void
vc_init()
{
char *pname;
/*
* Create a portal for safe memory across transactions. We need to
* palloc the name space for it because our hash function expects the
* name to be on a longword boundary. CreatePortal copies the name to
* safe storage for us.
*/
pname = pstrdup(VACPNAME);
vc_portal = CreatePortal(pname);
pfree(pname);
/*
* Set flag to indicate that vc_portal must be removed after an error.
* This global variable is checked in the transaction manager on xact
* abort, and the routine vc_abort() is called if necessary.
*/
VacuumRunning = true;
CommonSpecialPortalOpen();
/* matches the StartTransaction in PostgresMain() */
CommitTransactionCommand();
@@ -219,30 +249,12 @@ vc_shutdown()
*/
unlink(RELCACHE_INIT_FILENAME);
/*
* Release our portal for cross-transaction memory.
*/
PortalDrop(&vc_portal);
/* okay, we're done */
VacuumRunning = false;
CommonSpecialPortalClose();
/* matches the CommitTransaction in PostgresMain() */
StartTransactionCommand();
}
void
vc_abort()
{
/* Clear flag first, to avoid recursion if PortalDrop elog's */
VacuumRunning = false;
/*
* Release our portal for cross-transaction memory.
*/
PortalDrop(&vc_portal);
}
/*
* vc_vacuum() -- vacuum the database.
*
@@ -302,7 +314,7 @@ vc_getrels(NameData *VacRelP)
F_CHAREQ, CharGetDatum('r'));
}
portalmem = PortalGetVariableMemory(vc_portal);
portalmem = CommonSpecialPortalGetMemory();
vrl = cur = (VRelList) NULL;
rel = heap_openr(RelationRelationName, AccessShareLock);
@@ -379,6 +391,7 @@ vc_vacone(Oid relid, bool analyze, List *va_cols)
int32 nindices,
i;
VRelStats *vacrelstats;
bool reindex = false;
StartTransactionCommand();
@@ -552,17 +565,31 @@ vc_vacone(Oid relid, bool analyze, List *va_cols)
GetXmaxRecent(&XmaxRecent);
/* scan it */
reindex = false;
vacuum_pages.vpl_num_pages = fraged_pages.vpl_num_pages = 0;
vc_scanheap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
if (IsIgnoringSystemIndexes() && IsSystemRelationName(RelationGetRelationName(onerel)))
reindex = true;
/* Now open indices */
nindices = 0;
Irel = (Relation *) NULL;
vc_getindices(vacrelstats->relid, &nindices, &Irel);
if (!Irel)
reindex = false;
else if (!RelationGetForm(onerel)->relhasindex)
reindex = true;
if (nindices > 0)
vacrelstats->hasindex = true;
else
vacrelstats->hasindex = false;
if (reindex)
{
for (i = 0; i < nindices; i++)
index_close(Irel[i]);
Irel = (Relation *) NULL;
activate_indexes_of_a_table(relid, false);
}
/* Clean/scan index relation(s) */
if (Irel != (Relation *) NULL)
@@ -590,6 +617,8 @@ vc_vacone(Oid relid, bool analyze, List *va_cols)
* vacuum_pages list */
vc_vacheap(vacrelstats, onerel, &vacuum_pages);
}
if (reindex)
activate_indexes_of_a_table(relid, true);
/* ok - free vacuum_pages list of reaped pages */
if (vacuum_pages.vpl_num_pages > 0)