1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-30 11:03:19 +03:00

Implement ALTER TABLE .. SET LOGGED / UNLOGGED

This enables changing permanent (logged) tables to unlogged and
vice-versa.

(Docs for ALTER TABLE / SET TABLESPACE got shuffled in an order that
hopefully makes more sense than the original.)

Author: Fabrízio de Royes Mello
Reviewed by: Christoph Berg, Andres Freund, Thom Brown
Some tweaking by Álvaro Herrera
This commit is contained in:
Alvaro Herrera
2014-08-22 14:27:00 -04:00
parent 01d15a2677
commit f41872d0c1
11 changed files with 472 additions and 55 deletions

View File

@ -574,7 +574,8 @@ rebuild_relation(Relation OldHeap, Oid indexOid, bool verbose)
heap_close(OldHeap, NoLock);
/* Create the transient table that will receive the re-ordered data */
OIDNewHeap = make_new_heap(tableOid, tableSpace, false,
OIDNewHeap = make_new_heap(tableOid, tableSpace,
OldHeap->rd_rel->relpersistence,
AccessExclusiveLock);
/* Copy the heap data into the new table in the desired order */
@ -595,13 +596,14 @@ rebuild_relation(Relation OldHeap, Oid indexOid, bool verbose)
* Create the transient table that will be filled with new data during
* CLUSTER, ALTER TABLE, and similar operations. The transient table
* duplicates the logical structure of the OldHeap, but is placed in
* NewTableSpace which might be different from OldHeap's.
* NewTableSpace which might be different from OldHeap's. Also, it's built
* with the specified persistence, which might differ from the original's.
*
* After this, the caller should load the new heap with transferred/modified
* data, then call finish_heap_swap to complete the operation.
*/
Oid
make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, bool forcetemp,
make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, char relpersistence,
LOCKMODE lockmode)
{
TupleDesc OldHeapDesc;
@ -613,7 +615,6 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, bool forcetemp,
Datum reloptions;
bool isNull;
Oid namespaceid;
char relpersistence;
OldHeap = heap_open(OIDOldHeap, lockmode);
OldHeapDesc = RelationGetDescr(OldHeap);
@ -636,16 +637,10 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, bool forcetemp,
if (isNull)
reloptions = (Datum) 0;
if (forcetemp)
{
if (relpersistence == RELPERSISTENCE_TEMP)
namespaceid = LookupCreationNamespace("pg_temp");
relpersistence = RELPERSISTENCE_TEMP;
}
else
{
namespaceid = RelationGetNamespace(OldHeap);
relpersistence = OldHeap->rd_rel->relpersistence;
}
/*
* Create the new heap, using a temporary name in the same namespace as
@ -1109,8 +1104,10 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, bool verbose,
/*
* Swap the physical files of two given relations.
*
* We swap the physical identity (reltablespace and relfilenode) while
* keeping the same logical identities of the two relations.
* We swap the physical identity (reltablespace, relfilenode) while keeping the
* same logical identities of the two relations. relpersistence is also
* swapped, which is critical since it determines where buffers live for each
* relation.
*
* We can swap associated TOAST data in either of two ways: recursively swap
* the physical content of the toast tables (and their indexes), or swap the
@ -1146,6 +1143,7 @@ swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class,
Oid relfilenode1,
relfilenode2;
Oid swaptemp;
char swptmpchr;
CatalogIndexState indstate;
/* We need writable copies of both pg_class tuples. */
@ -1166,7 +1164,10 @@ swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class,
if (OidIsValid(relfilenode1) && OidIsValid(relfilenode2))
{
/* Normal non-mapped relations: swap relfilenodes and reltablespaces */
/*
* Normal non-mapped relations: swap relfilenodes, reltablespaces,
* relpersistence
*/
Assert(!target_is_pg_class);
swaptemp = relform1->relfilenode;
@ -1177,6 +1178,10 @@ swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class,
relform1->reltablespace = relform2->reltablespace;
relform2->reltablespace = swaptemp;
swptmpchr = relform1->relpersistence;
relform1->relpersistence = relform2->relpersistence;
relform2->relpersistence = swptmpchr;
/* Also swap toast links, if we're swapping by links */
if (!swap_toast_by_content)
{
@ -1196,15 +1201,18 @@ swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class,
NameStr(relform1->relname));
/*
* We can't change the tablespace of a mapped rel, and we can't handle
* toast link swapping for one either, because we must not apply any
* critical changes to its pg_class row. These cases should be
* prevented by upstream permissions tests, so this check is a
* non-user-facing emergency backstop.
* We can't change the tablespace nor persistence of a mapped rel, and
* we can't handle toast link swapping for one either, because we must
* not apply any critical changes to its pg_class row. These cases
* should be prevented by upstream permissions tests, so these checks
* are non-user-facing emergency backstop.
*/
if (relform1->reltablespace != relform2->reltablespace)
elog(ERROR, "cannot change tablespace of mapped relation \"%s\"",
NameStr(relform1->relname));
if (relform1->relpersistence != relform2->relpersistence)
elog(ERROR, "cannot change persistence of mapped relation \"%s\"",
NameStr(relform1->relname));
if (!swap_toast_by_content &&
(relform1->reltoastrelid || relform2->reltoastrelid))
elog(ERROR, "cannot swap toast by links for mapped relation \"%s\"",

View File

@ -147,6 +147,7 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
DestReceiver *dest;
bool concurrent;
LOCKMODE lockmode;
char relpersistence;
/* Determine strength of lock needed. */
concurrent = stmt->concurrent;
@ -233,9 +234,15 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
/* Concurrent refresh builds new data in temp tablespace, and does diff. */
if (concurrent)
{
tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP);
relpersistence = RELPERSISTENCE_TEMP;
}
else
{
tableSpace = matviewRel->rd_rel->reltablespace;
relpersistence = matviewRel->rd_rel->relpersistence;
}
owner = matviewRel->rd_rel->relowner;
@ -244,7 +251,7 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
* it against access by any other process until commit (by which time it
* will be gone).
*/
OIDNewHeap = make_new_heap(matviewOid, tableSpace, concurrent,
OIDNewHeap = make_new_heap(matviewOid, tableSpace, relpersistence,
ExclusiveLock);
LockRelationOid(OIDNewHeap, AccessExclusiveLock);
dest = CreateTransientRelDestReceiver(OIDNewHeap);

View File

@ -152,6 +152,8 @@ typedef struct AlteredTableInfo
bool new_notnull; /* T if we added new NOT NULL constraints */
bool rewrite; /* T if a rewrite is forced */
Oid newTableSpace; /* new tablespace; 0 means no change */
bool chgLoggedness; /* T if SET LOGGED/UNLOGGED is used */
char newrelpersistence; /* if above is true */
/* Objects to rebuild after completing ALTER TYPE operations */
List *changedConstraintOids; /* OIDs of constraints to rebuild */
List *changedConstraintDefs; /* string definitions of same */
@ -372,7 +374,8 @@ static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
AlterTableCmd *cmd, LOCKMODE lockmode);
static void ATExecAlterColumnGenericOptions(Relation rel, const char *colName,
List *options, LOCKMODE lockmode);
static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode);
static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab,
LOCKMODE lockmode);
static void ATPostAlterTypeParse(Oid oldId, Oid oldRelId, Oid refRelId,
char *cmd, List **wqueue, LOCKMODE lockmode,
bool rewrite);
@ -382,8 +385,11 @@ static void change_owner_fix_column_acls(Oid relationOid,
Oid oldOwnerId, Oid newOwnerId);
static void change_owner_recurse_to_sequences(Oid relationOid,
Oid newOwnerId, LOCKMODE lockmode);
static void ATExecClusterOn(Relation rel, const char *indexName, LOCKMODE lockmode);
static void ATExecClusterOn(Relation rel, const char *indexName,
LOCKMODE lockmode);
static void ATExecDropCluster(Relation rel, LOCKMODE lockmode);
static bool ATPrepChangeLoggedness(Relation rel, bool toLogged);
static void ATChangeIndexesLoggedness(Oid relid, char relpersistence);
static void ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel,
char *tablespacename, LOCKMODE lockmode);
static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace, LOCKMODE lockmode);
@ -2949,6 +2955,11 @@ AlterTableGetLockLevel(List *cmds)
cmd_lockmode = ShareUpdateExclusiveLock;
break;
case AT_SetLogged:
case AT_SetUnLogged:
cmd_lockmode = AccessExclusiveLock;
break;
case AT_ValidateConstraint: /* Uses MVCC in
* getConstraints() */
cmd_lockmode = ShareUpdateExclusiveLock;
@ -3161,6 +3172,24 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
/* No command-specific prep needed */
pass = AT_PASS_MISC;
break;
case AT_SetLogged: /* SET LOGGED */
ATSimplePermissions(rel, ATT_TABLE);
tab->chgLoggedness = ATPrepChangeLoggedness(rel, true);
tab->newrelpersistence = RELPERSISTENCE_PERMANENT;
/* force rewrite if necessary */
if (tab->chgLoggedness)
tab->rewrite = true;
pass = AT_PASS_MISC;
break;
case AT_SetUnLogged: /* SET UNLOGGED */
ATSimplePermissions(rel, ATT_TABLE);
tab->chgLoggedness = ATPrepChangeLoggedness(rel, false);
tab->newrelpersistence = RELPERSISTENCE_UNLOGGED;
/* force rewrite if necessary */
if (tab->chgLoggedness)
tab->rewrite = true;
pass = AT_PASS_MISC;
break;
case AT_AddOids: /* SET WITH OIDS */
ATSimplePermissions(rel, ATT_TABLE);
if (!rel->rd_rel->relhasoids || recursing)
@ -3431,6 +3460,9 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
case AT_DropCluster: /* SET WITHOUT CLUSTER */
ATExecDropCluster(rel, lockmode);
break;
case AT_SetLogged: /* SET LOGGED */
case AT_SetUnLogged: /* SET UNLOGGED */
break;
case AT_AddOids: /* SET WITH OIDS */
/* Use the ADD COLUMN code, unless prep decided to do nothing */
if (cmd->def != NULL)
@ -3584,7 +3616,8 @@ ATRewriteTables(List **wqueue, LOCKMODE lockmode)
/*
* We only need to rewrite the table if at least one column needs to
* be recomputed, or we are adding/removing the OID column.
* be recomputed, we are adding/removing the OID column, or we are
* changing its persistence.
*/
if (tab->rewrite)
{
@ -3592,6 +3625,7 @@ ATRewriteTables(List **wqueue, LOCKMODE lockmode)
Relation OldHeap;
Oid OIDNewHeap;
Oid NewTableSpace;
char persistence;
OldHeap = heap_open(tab->relid, NoLock);
@ -3630,10 +3664,31 @@ ATRewriteTables(List **wqueue, LOCKMODE lockmode)
else
NewTableSpace = OldHeap->rd_rel->reltablespace;
/*
* Select persistence of transient table (same as original unless
* user requested a change)
*/
persistence = tab->chgLoggedness ?
tab->newrelpersistence : OldHeap->rd_rel->relpersistence;
heap_close(OldHeap, NoLock);
/* Create transient table that will receive the modified data */
OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, false,
/*
* Create transient table that will receive the modified data.
*
* Ensure it is marked correctly as logged or unlogged. We have
* to do this here so that buffers for the new relfilenode will
* have the right persistence set, and at the same time ensure
* that the original filenode's buffers will get read in with the
* correct setting (i.e. the original one). Otherwise a rollback
* after the rewrite would possibly result with buffers for the
* original filenode having the wrong persistence setting.
*
* NB: This relies on swap_relation_files() also swapping the
* persistence. That wouldn't work for pg_class, but that can't be
* unlogged anyway.
*/
OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, persistence,
lockmode);
/*
@ -3643,6 +3698,16 @@ ATRewriteTables(List **wqueue, LOCKMODE lockmode)
*/
ATRewriteTable(tab, OIDNewHeap, lockmode);
/*
* Change the persistence marking of indexes, if necessary. This
* is so that the new copies are built with the right persistence
* in the reindex step below. Note we cannot do this earlier,
* because the rewrite step might read the indexes, and that would
* cause buffers for them to have the wrong setting.
*/
if (tab->chgLoggedness)
ATChangeIndexesLoggedness(tab->relid, tab->newrelpersistence);
/*
* Swap the physical files of the old and new heaps, then rebuild
* indexes and discard the old heap. We can use RecentXmin for
@ -4053,6 +4118,8 @@ ATGetQueueEntry(List **wqueue, Relation rel)
tab->relid = relid;
tab->relkind = rel->rd_rel->relkind;
tab->oldDesc = CreateTupleDescCopy(RelationGetDescr(rel));
tab->newrelpersistence = RELPERSISTENCE_PERMANENT;
tab->chgLoggedness = false;
*wqueue = lappend(*wqueue, tab);
@ -10600,6 +10667,168 @@ ATExecGenericOptions(Relation rel, List *options)
heap_freetuple(tuple);
}
/*
* Preparation phase for SET LOGGED/UNLOGGED
*
* This verifies that we're not trying to change a temp table. Also,
* existing foreign key constraints are checked to avoid ending up with
* permanent tables referencing unlogged tables.
*
* Return value is false if the operation is a no-op (in which case the
* checks are skipped), otherwise true.
*/
static bool
ATPrepChangeLoggedness(Relation rel, bool toLogged)
{
Relation pg_constraint;
HeapTuple tuple;
SysScanDesc scan;
ScanKeyData skey[1];
/*
* Disallow changing status for a temp table. Also verify whether we can
* get away with doing nothing; in such cases we don't need to run the
* checks below, either.
*/
switch (rel->rd_rel->relpersistence)
{
case RELPERSISTENCE_TEMP:
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("cannot change logged status of table %s",
RelationGetRelationName(rel)),
errdetail("Table %s is temporary.",
RelationGetRelationName(rel)),
errtable(rel)));
break;
case RELPERSISTENCE_PERMANENT:
if (toLogged)
/* nothing to do */
return false;
break;
case RELPERSISTENCE_UNLOGGED:
if (!toLogged)
/* nothing to do */
return false;
break;
}
/*
* Check existing foreign key constraints to preserve the invariant that
* no permanent tables cannot reference unlogged ones. Self-referencing
* foreign keys can safely be ignored.
*/
pg_constraint = heap_open(ConstraintRelationId, AccessShareLock);
/*
* Scan conrelid if changing to permanent, else confrelid. This also
* determines whether an useful index exists.
*/
ScanKeyInit(&skey[0],
toLogged ? Anum_pg_constraint_conrelid :
Anum_pg_constraint_confrelid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(rel)));
scan = systable_beginscan(pg_constraint,
toLogged ? ConstraintRelidIndexId : InvalidOid,
true, NULL, 1, skey);
while (HeapTupleIsValid(tuple = systable_getnext(scan)))
{
Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
if (con->contype == CONSTRAINT_FOREIGN)
{
Oid foreignrelid;
Relation foreignrel;
/* the opposite end of what we used as scankey */
foreignrelid = toLogged ? con->confrelid : con->conrelid;
/* ignore if self-referencing */
if (RelationGetRelid(rel) == foreignrelid)
continue;
foreignrel = relation_open(foreignrelid, AccessShareLock);
if (toLogged)
{
if (foreignrel->rd_rel->relpersistence != RELPERSISTENCE_PERMANENT)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("cannot change status of table %s to logged",
RelationGetRelationName(rel)),
errdetail("Table %s references unlogged table %s.",
RelationGetRelationName(rel),
RelationGetRelationName(foreignrel)),
errtableconstraint(rel, NameStr(con->conname))));
}
else
{
if (foreignrel->rd_rel->relpersistence == RELPERSISTENCE_PERMANENT)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("cannot change status of table %s to unlogged",
RelationGetRelationName(rel)),
errdetail("Logged table %s is referenced by table %s.",
RelationGetRelationName(foreignrel),
RelationGetRelationName(rel)),
errtableconstraint(rel, NameStr(con->conname))));
}
relation_close(foreignrel, AccessShareLock);
}
}
systable_endscan(scan);
heap_close(pg_constraint, AccessShareLock);
return true;
}
/*
* Update the pg_class entry of each index for the given relation to the
* given persistence.
*/
static void
ATChangeIndexesLoggedness(Oid relid, char relpersistence)
{
Relation rel;
Relation pg_class;
List *indexes;
ListCell *cell;
pg_class = heap_open(RelationRelationId, RowExclusiveLock);
/* We already have a lock on the table */
rel = relation_open(relid, NoLock);
indexes = RelationGetIndexList(rel);
foreach(cell, indexes)
{
Oid indexid = lfirst_oid(cell);
HeapTuple tuple;
Form_pg_class pg_class_form;
tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(indexid));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for relation %u",
indexid);
pg_class_form = (Form_pg_class) GETSTRUCT(tuple);
pg_class_form->relpersistence = relpersistence;
simple_heap_update(pg_class, &tuple->t_self, tuple);
/* keep catalog indexes current */
CatalogUpdateIndexes(pg_class, tuple);
heap_freetuple(tuple);
}
heap_close(pg_class, RowExclusiveLock);
heap_close(rel, NoLock);
}
/*
* Execute ALTER TABLE SET SCHEMA
*/