1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-14 18:42:34 +03:00

ALTER TABLE SET TABLESPACE. Gavin Sherry, some rework by Tom Lane.

This commit is contained in:
Tom Lane
2004-07-11 23:13:58 +00:00
parent 08d89db34d
commit af4de81469
6 changed files with 405 additions and 76 deletions

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.118 2004/07/01 00:50:10 tgl Exp $
* $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.119 2004/07/11 23:13:53 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -52,6 +52,7 @@
#include "parser/parse_relation.h"
#include "parser/parse_type.h"
#include "rewrite/rewriteHandler.h"
#include "storage/smgr.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
@ -120,6 +121,7 @@ typedef struct AlteredTableInfo
/* Information saved by Phases 1/2 for Phase 3: */
List *constraints; /* List of NewConstraint */
List *newvals; /* List of NewColumnValue */
Oid newTableSpace; /* new tablespace; 0 means no change */
/* Objects to rebuild after completing ALTER TYPE operations */
List *changedConstraintOids; /* OIDs of constraints to rebuild */
List *changedConstraintDefs; /* string definitions of same */
@ -237,6 +239,10 @@ static void ATPostAlterTypeParse(char *cmd, List **wqueue);
static void ATExecChangeOwner(Oid relationOid, int32 newOwnerSysId);
static void ATExecClusterOn(Relation rel, const char *indexName);
static void ATExecDropCluster(Relation rel);
static void ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel,
char *tablespacename);
static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace);
static void copy_relation_data(Relation rel, SMgrRelation dst);
static int ri_trigger_type(Oid tgfoid);
static void update_ri_trigger_args(Oid relid,
const char *oldname,
@ -1946,6 +1952,11 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
}
pass = AT_PASS_DROP;
break;
case AT_SetTableSpace: /* SET TABLESPACE */
/* This command never recurses */
ATPrepSetTableSpace(tab, rel, cmd->name);
pass = AT_PASS_MISC; /* doesn't actually matter */
break;
default: /* oops */
elog(ERROR, "unrecognized alter table type: %d",
(int) cmd->subtype);
@ -2097,6 +2108,11 @@ ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd)
* to do the real work
*/
break;
case AT_SetTableSpace: /* SET TABLESPACE */
/*
* Nothing to do here; Phase 3 does the work
*/
break;
default: /* oops */
elog(ERROR, "unrecognized alter table type: %d",
(int) cmd->subtype);
@ -2132,6 +2148,7 @@ ATRewriteTables(List **wqueue)
/* Build a temporary relation and copy data */
Oid OIDNewHeap;
char NewHeapName[NAMEDATALEN];
Oid NewTableSpace;
Relation OldHeap;
ObjectAddress object;
@ -2157,6 +2174,15 @@ ATRewriteTables(List **wqueue)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot rewrite temporary tables of other sessions")));
/*
* Select destination tablespace (same as original unless user
* requested a change)
*/
if (tab->newTableSpace)
NewTableSpace = tab->newTableSpace;
else
NewTableSpace = OldHeap->rd_rel->reltablespace;
heap_close(OldHeap, NoLock);
/*
@ -2170,7 +2196,7 @@ ATRewriteTables(List **wqueue)
snprintf(NewHeapName, sizeof(NewHeapName),
"pg_temp_%u", tab->relid);
OIDNewHeap = make_new_heap(tab->relid, NewHeapName);
OIDNewHeap = make_new_heap(tab->relid, NewHeapName, NewTableSpace);
/*
* Copy the heap data into the new table with the desired
@ -2179,8 +2205,8 @@ ATRewriteTables(List **wqueue)
*/
ATRewriteTable(tab, OIDNewHeap);
/* Swap the relfilenodes of the old and new heaps. */
swap_relfilenodes(tab->relid, OIDNewHeap);
/* Swap the physical files of the old and new heaps. */
swap_relation_files(tab->relid, OIDNewHeap);
CommandCounterIncrement();
@ -2203,13 +2229,20 @@ ATRewriteTables(List **wqueue)
*/
reindex_relation(tab->relid, false);
}
else if (tab->constraints != NIL)
else
{
/*
* Test the current data within the table against new constraints
* generated by ALTER TABLE commands, but don't rebuild data.
*/
ATRewriteTable(tab, InvalidOid);
if (tab->constraints != NIL)
ATRewriteTable(tab, InvalidOid);
/*
* If we had SET TABLESPACE but no reason to reconstruct tuples,
* just do a block-by-block copy.
*/
if (tab->newTableSpace)
ATExecSetTableSpace(tab->relid, tab->newTableSpace);
}
}
@ -5185,6 +5218,249 @@ ATExecDropCluster(Relation rel)
mark_index_clustered(rel, InvalidOid);
}
/*
* ALTER TABLE SET TABLESPACE
*/
static void
ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename)
{
Oid tablespaceId;
AclResult aclresult;
/*
* We do our own permission checking because we want to allow this on
* indexes.
*/
if (rel->rd_rel->relkind != RELKIND_RELATION &&
rel->rd_rel->relkind != RELKIND_INDEX)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table or index",
RelationGetRelationName(rel))));
/* Permissions checks */
if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
RelationGetRelationName(rel));
if (!allowSystemTableMods && IsSystemRelation(rel))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied: \"%s\" is a system catalog",
RelationGetRelationName(rel))));
/* Check that the tablespace exists */
tablespaceId = get_tablespace_oid(tablespacename);
if (!OidIsValid(tablespaceId))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("tablespace \"%s\" does not exist", tablespacename)));
/* Check its permissions */
aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(), ACL_CREATE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_TABLESPACE, tablespacename);
/* Save info for Phase 3 to do the real work */
if (OidIsValid(tab->newTableSpace))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("multiple SET TABLESPACE subcommands are not valid")));
tab->newTableSpace = tablespaceId;
}
/*
* Execute ALTER TABLE SET TABLESPACE for cases where there is no tuple
* rewriting to be done, so we just want to copy the data as fast as possible.
*/
static void
ATExecSetTableSpace(Oid tableOid, Oid newTableSpace)
{
Relation rel;
Oid oldTableSpace;
Oid reltoastrelid;
Oid reltoastidxid;
RelFileNode newrnode;
SMgrRelation dstrel;
Relation pg_class;
HeapTuple tuple;
Form_pg_class rd_rel;
rel = relation_open(tableOid, NoLock);
/*
* We can never allow moving of shared or nailed-in-cache relations,
* because we can't support changing their reltablespace values.
*/
if (rel->rd_rel->relisshared || rel->rd_isnailed)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot move system relation \"%s\"",
RelationGetRelationName(rel))));
/*
* Don't allow moving temp tables of other backends ... their
* local buffer manager is not going to cope.
*/
if (isOtherTempNamespace(RelationGetNamespace(rel)))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot move temporary tables of other sessions")));
/*
* No work if no change in tablespace.
*/
oldTableSpace = rel->rd_rel->reltablespace;
if (newTableSpace == oldTableSpace ||
(newTableSpace == MyDatabaseTableSpace && oldTableSpace == 0))
{
relation_close(rel, NoLock);
return;
}
reltoastrelid = rel->rd_rel->reltoastrelid;
reltoastidxid = rel->rd_rel->reltoastidxid;
/* Get a modifiable copy of the relation's pg_class row */
pg_class = heap_openr(RelationRelationName, RowExclusiveLock);
tuple = SearchSysCacheCopy(RELOID,
ObjectIdGetDatum(tableOid),
0, 0, 0);
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for relation %u", tableOid);
rd_rel = (Form_pg_class) GETSTRUCT(tuple);
/* create another storage file. Is it a little ugly ? */
/* NOTE: any conflict in relfilenode value will be caught here */
newrnode = rel->rd_node;
newrnode.spcNode = newTableSpace;
dstrel = smgropen(newrnode);
smgrcreate(dstrel, rel->rd_istemp, false);
/* schedule unlinking old physical file */
if (rel->rd_smgr == NULL)
rel->rd_smgr = smgropen(rel->rd_node);
smgrscheduleunlink(rel->rd_smgr, rel->rd_istemp);
/* copy relation data to the new physical file */
copy_relation_data(rel, dstrel);
/*
* Now drop smgr references. We need not smgrclose() the old file,
* since it will be dropped anyway at commit by the pending unlink.
* We do need to get rid of relcache's reference to it, however.
*/
smgrclose(dstrel);
rel->rd_smgr = NULL;
/* update the pg_class row */
rd_rel->reltablespace = (newTableSpace == MyDatabaseTableSpace) ? InvalidOid : newTableSpace;
simple_heap_update(pg_class, &tuple->t_self, tuple);
CatalogUpdateIndexes(pg_class, tuple);
heap_freetuple(tuple);
heap_close(pg_class, RowExclusiveLock);
relation_close(rel, NoLock);
/* Make sure the reltablespace change is visible */
CommandCounterIncrement();
/* Move associated toast relation and/or index, too */
if (OidIsValid(reltoastrelid))
ATExecSetTableSpace(reltoastrelid, newTableSpace);
if (OidIsValid(reltoastidxid))
ATExecSetTableSpace(reltoastidxid, newTableSpace);
}
/*
* Copy data, block by block
*/
static void
copy_relation_data(Relation rel, SMgrRelation dst)
{
SMgrRelation src = rel->rd_smgr;
bool use_wal;
BlockNumber nblocks;
BlockNumber blkno;
char buf[BLCKSZ];
Page page = (Page) buf;
/*
* Since we copy the data directly without looking at the shared buffers,
* we'd better first flush out any pages of the source relation that are
* in shared buffers. We assume no new pages will get loaded into
* buffers while we are holding exclusive lock on the rel.
*/
FlushRelationBuffers(rel, 0);
/*
* We need to log the copied data in WAL iff WAL archiving is enabled
* AND it's not a temp rel.
*
* XXX when WAL archiving is actually supported, this test will likely
* need to change; and the hardwired extern is cruddy anyway ...
*/
{
extern char XLOG_archive_dir[];
use_wal = XLOG_archive_dir[0] && !rel->rd_istemp;
}
nblocks = RelationGetNumberOfBlocks(rel);
for (blkno = 0; blkno < nblocks; blkno++)
{
smgrread(src, blkno, buf);
/* XLOG stuff */
if (use_wal)
{
xl_heap_newpage xlrec;
XLogRecPtr recptr;
XLogRecData rdata[2];
/* NO ELOG(ERROR) from here till newpage op is logged */
START_CRIT_SECTION();
xlrec.node = dst->smgr_rnode;
xlrec.blkno = blkno;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &xlrec;
rdata[0].len = SizeOfHeapNewpage;
rdata[0].next = &(rdata[1]);
rdata[1].buffer = InvalidBuffer;
rdata[1].data = (char *) page;
rdata[1].len = BLCKSZ;
rdata[1].next = NULL;
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
PageSetLSN(page, recptr);
PageSetSUI(page, ThisStartUpID);
END_CRIT_SECTION();
}
/*
* Now write the page. If not using WAL, say isTemp = true, to
* suppress duplicate fsync. If we are using WAL, it surely isn't a
* temp rel, so !use_wal is a sufficient condition.
*/
smgrwrite(dst, blkno, buf, !use_wal);
}
/*
* If we weren't using WAL, and the rel isn't temp, we must fsync it
* down to disk before it's safe to commit the transaction.
*/
if (!use_wal && !rel->rd_istemp)
smgrimmedsync(dst);
}
/*
* ALTER TABLE CREATE TOAST TABLE