1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-14 18:42:34 +03:00

Trial implementation of ALTER DROP COLUMN.

They are #ifdef'd.
Add -D_DROP_COLUMN_HACK__ compile option
to evaluate it.
This commit is contained in:
Hiroshi Inoue
2000-03-09 05:00:26 +00:00
parent 6513946cbb
commit fd9ff86bd9
8 changed files with 514 additions and 10 deletions

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/Attic/command.c,v 1.69 2000/02/15 18:15:12 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/commands/Attic/command.c,v 1.70 2000/03/09 05:00:23 inoue Exp $
*
* NOTES
* The PortalExecutorHeapMemory crap needs to be eliminated
@ -43,6 +43,15 @@
#include "utils/syscache.h"
#include "utils/temprel.h"
#include "commands/trigger.h"
#ifdef _DROP_COLUMN_HACK__
#include "catalog/pg_index.h"
#include "catalog/pg_relcheck.h"
#include "commands/defrem.h"
#include "commands/comment.h"
#include "access/genam.h"
#include "optimizer/clauses.h"
#include "../parser/parse.h"
#endif /* _DROP_COLUMN_HACK__ */
/* ----------------
* PortalExecutorHeapMemory stuff
@ -668,6 +677,213 @@ drop_default(Oid relid, int16 attnum)
}
#ifdef _DROP_COLUMN_HACK__
/*
* ALTER TABLE DROP COLUMN trial implementation
*
*/
/*
* system table scan(index scan/sequential scan)
*/
typedef struct SysScanDescData
{
Relation heap_rel;
Relation irel;
HeapScanDesc scan;
IndexScanDesc iscan;
HeapTupleData tuple;
Buffer buffer;
} SysScanDescData, *SysScanDesc;
static void *
systable_beginscan(Relation rel, const char *indexRelname, int nkeys, ScanKey entry)
{
bool hasindex = (rel->rd_rel->relhasindex && !IsIgnoringSystemIndexes());
SysScanDesc sysscan;
sysscan = (SysScanDesc) palloc(sizeof(SysScanDescData));
sysscan->heap_rel = rel;
sysscan->irel = (Relation) NULL;
sysscan->tuple.t_datamcxt = NULL;
sysscan->tuple.t_data = NULL;
sysscan->buffer = InvalidBuffer;
if (hasindex)
{
sysscan->irel = index_openr((char *)indexRelname);
sysscan->iscan = index_beginscan(sysscan->irel, false, nkeys, entry);
}
else
sysscan->scan = heap_beginscan(rel, false, SnapshotNow, nkeys, entry);
return (void *) sysscan;
}
static void
systable_endscan(void *scan)
{
SysScanDesc sysscan = (SysScanDesc) scan;
if (sysscan->irel)
{
if (BufferIsValid(sysscan->buffer))
ReleaseBuffer(sysscan->buffer);
index_endscan(sysscan->iscan);
index_close(sysscan->irel);
}
else
heap_endscan(sysscan->scan);
pfree(scan);
}
static HeapTuple
systable_getnext(void *scan)
{
SysScanDesc sysscan = (SysScanDesc) scan;
HeapTuple htup = (HeapTuple) NULL;
RetrieveIndexResult indexRes;
if (sysscan->irel)
{
if (BufferIsValid(sysscan->buffer))
{
ReleaseBuffer(sysscan->buffer);
sysscan->buffer = InvalidBuffer;
}
while (indexRes = index_getnext(sysscan->iscan, ForwardScanDirection), indexRes != NULL)
{
sysscan->tuple.t_self = indexRes->heap_iptr;
heap_fetch(sysscan->heap_rel, SnapshotNow, &sysscan->tuple, &(sysscan->buffer));
pfree(indexRes);
if (sysscan->tuple.t_data != NULL)
{
htup = &sysscan->tuple;
break;
}
}
}
else
htup = heap_getnext(sysscan->scan, 0);
return htup;
}
/*
* find a specified attribute in a node entry
*/
static bool
find_attribute_walker(Node *node, int attnum)
{
if (node == NULL)
return false;
if (IsA(node, Var))
{
Var *var = (Var *) node;
if (var->varlevelsup == 0 && var->varno == 1 &&
var->varattno == attnum)
return true;
}
return expression_tree_walker(node, find_attribute_walker, (void *)attnum);
}
static bool
find_attribute_in_node(Node *node, int attnum)
{
return expression_tree_walker(node, find_attribute_walker, (void *)attnum);
}
/*
* Remove/check references for the column
*/
static bool
RemoveColumnReferences(Oid reloid, int attnum, bool checkonly, HeapTuple reltup)
{
Relation indexRelation, rcrel;
ScanKeyData entry;
HeapScanDesc scan;
void *sysscan;
HeapTuple htup, indexTuple;
Form_pg_index index;
Form_pg_relcheck relcheck;
Form_pg_class pgcform = (Form_pg_class) NULL;
int i;
bool checkok = true;
if (!checkonly)
pgcform = (Form_pg_class) GETSTRUCT (reltup);
/*
* Remove/check constraints here
*/
ScanKeyEntryInitialize(&entry, (bits16) 0x0, Anum_pg_relcheck_rcrelid,
(RegProcedure) F_OIDEQ, ObjectIdGetDatum(reloid));
rcrel = heap_openr(RelCheckRelationName, RowExclusiveLock);
sysscan = systable_beginscan(rcrel, RelCheckIndex,1 ,&entry);
while (HeapTupleIsValid(htup = systable_getnext(sysscan)))
{
char *ccbin;
Node *node;
relcheck = (Form_pg_relcheck) GETSTRUCT(htup);
ccbin = textout(&relcheck->rcbin);
if (!ccbin)
continue;
node = stringToNode(ccbin);
pfree(ccbin);
if (find_attribute_in_node(node, attnum))
{
if (checkonly)
{
checkok = false;
elog(ERROR, "target column is used in a constraint");
}
else
{
heap_delete(rcrel, &htup->t_self, NULL);
pgcform->relchecks--;
}
}
}
systable_endscan(sysscan);
heap_close(rcrel, NoLock);
/*
* What to do with triggers/rules/views/procedues ?
*/
/*
* Remove/check indexes
*/
indexRelation = heap_openr(IndexRelationName, RowExclusiveLock);
ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid, F_OIDEQ,
ObjectIdGetDatum(reloid));
scan = heap_beginscan(indexRelation, false, SnapshotNow, 1, &entry);
while (HeapTupleIsValid(indexTuple = heap_getnext(scan, 0)))
{
index = (Form_pg_index) GETSTRUCT(indexTuple);
for (i = 0; i < INDEX_MAX_KEYS; i++)
{
if (index->indkey[i] == InvalidAttrNumber)
break;
else if (index->indkey[i] == attnum)
{
if (checkonly)
{
checkok = false;
elog(ERROR, "target column is used in an index");
}
else
{
htup = SearchSysCacheTuple(RELOID,
ObjectIdGetDatum(index->indexrelid),
0, 0, 0);
RemoveIndex(NameStr(((Form_pg_class) GETSTRUCT(htup))->relname));
}
break;
}
}
}
heap_endscan(scan);
heap_close(indexRelation, NoLock);
return checkok;
}
#endif /* _DROP_COLUMN_HACK__ */
/*
* ALTER TABLE DROP COLUMN
@ -677,7 +893,170 @@ AlterTableDropColumn(const char *relationName,
bool inh, const char *colName,
int behavior)
{
#ifdef _DROP_COLUMN_HACK__
Relation rel, attrdesc, adrel;
Oid myrelid, attoid;
HeapTuple reltup;
HeapTupleData classtuple;
Buffer buffer;
Form_pg_attribute attribute;
HeapTuple tup;
Relation idescs[Num_pg_attr_indices];
int attnum;
bool hasindex;
char dropColname[32];
void *sysscan;
ScanKeyData scankeys[2];
if (inh)
elog(ERROR, "ALTER TABLE / DROP COLUMN with inherit option is not supported yet");
/*
* permissions checking. this would normally be done in utility.c,
* but this particular routine is recursive.
*
* normally, only the owner of a class can change its schema.
*/
if (!allowSystemTableMods && IsSystemRelationName(relationName))
elog(ERROR, "ALTER TABLE: relation \"%s\" is a system catalog",
relationName);
#ifndef NO_SECURITY
if (!pg_ownercheck(UserName, relationName, RELNAME))
elog(ERROR, "ALTER TABLE: permission denied");
#endif
/*
* Grab an exclusive lock on the target table, which we will NOT release
* until end of transaction.
*/
rel = heap_openr(relationName, AccessExclusiveLock);
myrelid = RelationGetRelid(rel);
heap_close(rel, NoLock); /* close rel but keep lock! */
/*
* What to do when rel has inheritors ?
*/
if (length(find_all_inheritors(myrelid)) > 1)
elog(ERROR, "ALTER TABLE: cannot drop a column on table that is inherited from");
/*
* lock the pg_class tuple for update
*/
reltup = SearchSysCacheTuple(RELNAME, PointerGetDatum(relationName),
0, 0, 0);
if (!HeapTupleIsValid(reltup))
elog(ERROR, "ALTER TABLE: relation \"%s\" not found",
relationName);
rel = heap_openr(RelationRelationName, RowExclusiveLock);
classtuple.t_self = reltup->t_self;
switch (heap_mark4update(rel, &classtuple, &buffer))
{
case HeapTupleSelfUpdated:
case HeapTupleMayBeUpdated:
break;
default:
elog(ERROR, "couldn't lock pg_class tuple");
}
reltup = heap_copytuple(&classtuple);
ReleaseBuffer(buffer);
/*
* XXX is the following check sufficient?
*/
if (((Form_pg_class) GETSTRUCT(reltup))->relkind != RELKIND_RELATION)
{
elog(ERROR, "ALTER TABLE: relation \"%s\" is not a table",
relationName);
}
attrdesc = heap_openr(AttributeRelationName, RowExclusiveLock);
/*
* Get the target pg_attribute tuple
*/
tup = SearchSysCacheTupleCopy(ATTNAME,
ObjectIdGetDatum(reltup->t_data->t_oid),
PointerGetDatum(colName), 0, 0);
if (!HeapTupleIsValid(tup))
elog(ERROR, "ALTER TABLE: column name \"%s\" doesn't exist in table \"%s\"",
colName, relationName);
attribute = (Form_pg_attribute) GETSTRUCT(tup);
if (attribute->attnum <= 0)
elog(ERROR, "ALTER TABLE: column name \"%s\" was already dropped", colName);
attnum = attribute->attnum;
attoid = tup->t_data->t_oid;
/*
* Check constraints/indices etc here
*/
if (behavior != CASCADE)
{
if (!RemoveColumnReferences(myrelid, attnum, true, NULL))
elog(ERROR, "the column is referenced");
}
/*
* change the target pg_attribute tuple
*/
sprintf(dropColname, "*already Dropped*%d", attnum);
namestrcpy(&(attribute->attname), dropColname);
ATTRIBUTE_DROP_COLUMN(attribute);
heap_update(attrdesc, &tup->t_self, tup, NULL);
hasindex = (!IsIgnoringSystemIndexes() && RelationGetForm(attrdesc)->relhasindex);
if (hasindex)
{
CatalogOpenIndices(Num_pg_attr_indices, Name_pg_attr_indices, idescs);
CatalogIndexInsert(idescs, Num_pg_attr_indices,
attrdesc, tup);
CatalogCloseIndices(Num_pg_attr_indices, idescs);
}
heap_close(attrdesc, NoLock);
heap_freetuple(tup);
/* delete comments */
DeleteComments(attoid);
/* delete attrdef */
adrel = heap_openr(AttrDefaultRelationName, RowExclusiveLock);
ScanKeyEntryInitialize(&scankeys[0], 0x0, Anum_pg_attrdef_adrelid,
F_OIDEQ, ObjectIdGetDatum(myrelid));
/* Oops pg_attrdef doesn't have (adrelid,adnum) index
ScanKeyEntryInitialize(&scankeys[1], 0x0, Anum_pg_attrdef_adnum,
F_INT2EQ, Int16GetDatum(attnum));
sysscan = systable_beginscan(adrel, AttrDefaultIndex, 2, scankeys);
*/
sysscan = systable_beginscan(adrel, AttrDefaultIndex, 1, scankeys);
while (HeapTupleIsValid(tup = systable_getnext(sysscan)))
{
if (((Form_pg_attrdef) GETSTRUCT(tup))->adnum == attnum)
{
heap_delete(adrel, &tup->t_self, NULL);
break;
}
}
systable_endscan(sysscan);
heap_close(adrel, NoLock);
/*
* Remove objects which reference this column
*/
if (behavior == CASCADE)
{
Relation ridescs[Num_pg_class_indices];
RemoveColumnReferences(myrelid, attnum, false, reltup);
/* update pg_class tuple */
heap_update(rel, &reltup->t_self, reltup, NULL);
CatalogOpenIndices(Num_pg_class_indices, Name_pg_class_indices, ridescs);
CatalogIndexInsert(ridescs, Num_pg_class_indices, rel, reltup);
CatalogCloseIndices(Num_pg_class_indices, ridescs);
}
heap_freetuple(reltup);
heap_close(rel, NoLock);
#else
elog(ERROR, "ALTER TABLE / DROP COLUMN is not implemented");
#endif /* _DROP_COLUMN_HACK__ */
}