1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-16 06:01:02 +03:00

Implement ALTER DATABASE SET TABLESPACE to move a whole database (or at least

as much of it as lives in its default tablespace) to a new tablespace.

Guillaume Lelarge, with some help from Bernd Helmle and Tom Lane
This commit is contained in:
Tom Lane
2008-11-07 18:25:07 +00:00
parent 85e2cedf98
commit 6517f377d6
8 changed files with 437 additions and 13 deletions

View File

@ -13,7 +13,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.215 2008/11/02 01:45:27 tgl Exp $
* $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.216 2008/11/07 18:25:06 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -42,6 +42,7 @@
#include "pgstat.h"
#include "postmaster/bgwriter.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/ipc.h"
#include "storage/procarray.h"
@ -53,6 +54,7 @@
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/pg_locale.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/tqual.h"
@ -63,8 +65,16 @@ typedef struct
Oid dest_dboid; /* DB we are trying to create */
} createdb_failure_params;
typedef struct
{
Oid dest_dboid; /* DB we are trying to move */
Oid dest_tsoid; /* tablespace we are trying to move to */
} movedb_failure_params;
/* non-export function prototypes */
static void createdb_failure_callback(int code, Datum arg);
static void movedb(const char *dbname, const char *tblspcname);
static void movedb_failure_callback(int code, Datum arg);
static bool get_db_info(const char *name, LOCKMODE lockmode,
Oid *dbIdP, Oid *ownerIdP,
int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
@ -934,11 +944,330 @@ RenameDatabase(const char *oldname, const char *newname)
}
/*
* ALTER DATABASE SET TABLESPACE
*/
static void
movedb(const char *dbname, const char *tblspcname)
{
Oid db_id;
Relation pgdbrel;
int notherbackends;
int npreparedxacts;
HeapTuple oldtuple, newtuple;
Oid src_tblspcoid, dst_tblspcoid;
Datum new_record[Natts_pg_database];
bool new_record_nulls[Natts_pg_database];
bool new_record_repl[Natts_pg_database];
ScanKeyData scankey;
SysScanDesc sysscan;
AclResult aclresult;
char *src_dbpath;
char *dst_dbpath;
DIR *dstdir;
struct dirent *xlde;
movedb_failure_params fparms;
/*
* Look up the target database's OID, and get exclusive lock on it. We
* need this to ensure that no new backend starts up in the database while
* we are moving it, and that no one is using it as a CREATE DATABASE
* template or trying to delete it.
*/
pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", dbname)));
/*
* We actually need a session lock, so that the lock will persist across
* the commit/restart below. (We could almost get away with letting the
* lock be released at commit, except that someone could try to move
* relations of the DB back into the old directory while we rmtree() it.)
*/
LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
AccessExclusiveLock);
/*
* Permission checks
*/
if (!pg_database_ownercheck(db_id, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
dbname);
/*
* Obviously can't move the tables of my own database
*/
if (db_id == MyDatabaseId)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
errmsg("cannot change the tablespace of the currently open database")));
/*
* Get tablespace's oid
*/
dst_tblspcoid = get_tablespace_oid(tblspcname);
if (dst_tblspcoid == InvalidOid)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("tablespace \"%s\" does not exist", tblspcname)));
/*
* Permission checks
*/
aclresult = pg_tablespace_aclcheck(dst_tblspcoid, GetUserId(),
ACL_CREATE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
tblspcname);
/*
* pg_global must never be the default tablespace
*/
if (dst_tblspcoid == GLOBALTABLESPACE_OID)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("pg_global cannot be used as default tablespace")));
/*
* No-op if same tablespace
*/
if (src_tblspcoid == dst_tblspcoid)
{
heap_close(pgdbrel, NoLock);
UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
AccessExclusiveLock);
return;
}
/*
* Check for other backends in the target database. (Because we hold the
* database lock, no new ones can start after this.)
*
* As in CREATE DATABASE, check this after other error conditions.
*/
if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
errmsg("database \"%s\" is being accessed by other users",
dbname),
errdetail_busy_db(notherbackends, npreparedxacts)));
/*
* Get old and new database paths
*/
src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
/*
* Force a checkpoint before proceeding. This will force dirty buffers out
* to disk, to ensure source database is up-to-date on disk for the
* copy. FlushDatabaseBuffers() would suffice for that, but we also want
* to process any pending unlink requests. Otherwise, the check for
* existing files in the target directory might fail unnecessarily, not to
* mention that the copy might fail due to source files getting deleted
* under it. On Windows, this also ensures that the bgwriter doesn't hold
* any open files, which would cause rmdir() to fail.
*/
RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
/*
* Check for existence of files in the target directory, i.e., objects of
* this database that are already in the target tablespace. We can't
* allow the move in such a case, because we would need to change those
* relations' pg_class.reltablespace entries to zero, and we don't have
* access to the DB's pg_class to do so.
*/
dstdir = AllocateDir(dst_dbpath);
if (dstdir != NULL)
{
while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
{
if (strcmp(xlde->d_name, ".") == 0 ||
strcmp(xlde->d_name, "..") == 0)
continue;
ereport(ERROR,
(errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
dbname, tblspcname),
errhint("You must move them back to the database's default tablespace before using this command.")));
}
FreeDir(dstdir);
/*
* The directory exists but is empty.
* We must remove it before using the copydir function.
*/
if (rmdir(dst_dbpath) != 0)
elog(ERROR, "could not remove directory \"%s\": %m",
dst_dbpath);
}
/*
* Use an ENSURE block to make sure we remove the debris if the copy fails
* (eg, due to out-of-disk-space). This is not a 100% solution, because
* of the possibility of failure during transaction commit, but it should
* handle most scenarios.
*/
fparms.dest_dboid = db_id;
fparms.dest_tsoid = dst_tblspcoid;
PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
PointerGetDatum(&fparms));
{
/*
* Copy files from the old tablespace to the new one
*/
copydir(src_dbpath, dst_dbpath, false);
/*
* Record the filesystem change in XLOG
*/
{
xl_dbase_create_rec xlrec;
XLogRecData rdata[1];
xlrec.db_id = db_id;
xlrec.tablespace_id = dst_tblspcoid;
xlrec.src_db_id = db_id;
xlrec.src_tablespace_id = src_tblspcoid;
rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof(xl_dbase_create_rec);
rdata[0].buffer = InvalidBuffer;
rdata[0].next = NULL;
(void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
}
/*
* Update the database's pg_database tuple
*/
ScanKeyInit(&scankey,
Anum_pg_database_datname,
BTEqualStrategyNumber, F_NAMEEQ,
NameGetDatum(dbname));
sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
SnapshotNow, 1, &scankey);
oldtuple = systable_getnext(sysscan);
if (!HeapTupleIsValid(oldtuple)) /* shouldn't happen... */
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", dbname)));
MemSet(new_record, 0, sizeof(new_record));
MemSet(new_record_nulls, false, sizeof(new_record_nulls));
MemSet(new_record_repl, false, sizeof(new_record_repl));
new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
new_record_repl[Anum_pg_database_dattablespace - 1] = true;
newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
new_record,
new_record_nulls, new_record_repl);
simple_heap_update(pgdbrel, &oldtuple->t_self, newtuple);
/* Update indexes */
CatalogUpdateIndexes(pgdbrel, newtuple);
systable_endscan(sysscan);
/*
* Force another checkpoint here. As in CREATE DATABASE, this is to
* ensure that we don't have to replay a committed XLOG_DBASE_CREATE
* operation, which would cause us to lose any unlogged operations
* done in the new DB tablespace before the next checkpoint.
*/
RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
/*
* Set flag to update flat database file at commit. Note: this also
* forces synchronous commit, which minimizes the window between
* copying the database files and commital of the transaction. If we
* crash before committing, we'll leave an orphaned set of files on
* disk, which is not fatal but not good either.
*/
database_file_update_needed();
/*
* Close pg_database, but keep lock till commit (this is important to
* prevent any risk of deadlock failure while updating flat file)
*/
heap_close(pgdbrel, NoLock);
}
PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
PointerGetDatum(&fparms));
/*
* Commit the transaction so that the pg_database update is committed.
* If we crash while removing files, the database won't be corrupt,
* we'll just leave some orphaned files in the old directory.
*
* (This is OK because we know we aren't inside a transaction block.)
*
* XXX would it be safe/better to do this inside the ensure block? Not
* convinced it's a good idea; consider elog just after the transaction
* really commits.
*/
PopActiveSnapshot();
CommitTransactionCommand();
/* Start new transaction for the remaining work; don't need a snapshot */
StartTransactionCommand();
/*
* Remove files from the old tablespace
*/
if (!rmtree(src_dbpath, true))
ereport(WARNING,
(errmsg("some useless files may be left behind in old database directory \"%s\"",
src_dbpath)));
/*
* Record the filesystem change in XLOG
*/
{
xl_dbase_drop_rec xlrec;
XLogRecData rdata[1];
xlrec.db_id = db_id;
xlrec.tablespace_id = src_tblspcoid;
rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof(xl_dbase_drop_rec);
rdata[0].buffer = InvalidBuffer;
rdata[0].next = NULL;
(void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
}
/* Now it's safe to release the database lock */
UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
AccessExclusiveLock);
}
/* Error cleanup callback for movedb */
static void
movedb_failure_callback(int code, Datum arg)
{
movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
char *dstpath;
/* Get rid of anything we managed to copy to the target directory */
dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
(void) rmtree(dstpath, true);
}
/*
* ALTER DATABASE name ...
*/
void
AlterDatabase(AlterDatabaseStmt *stmt)
AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
{
Relation rel;
HeapTuple tuple,
@ -948,6 +1277,7 @@ AlterDatabase(AlterDatabaseStmt *stmt)
ListCell *option;
int connlimit = -1;
DefElem *dconnlimit = NULL;
DefElem *dtablespace = NULL;
Datum new_record[Natts_pg_database];
bool new_record_nulls[Natts_pg_database];
bool new_record_repl[Natts_pg_database];
@ -965,11 +1295,29 @@ AlterDatabase(AlterDatabaseStmt *stmt)
errmsg("conflicting or redundant options")));
dconnlimit = defel;
}
else if (strcmp(defel->defname, "tablespace") == 0)
{
if (dtablespace)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
dtablespace = defel;
}
else
elog(ERROR, "option \"%s\" not recognized",
defel->defname);
}
if (dtablespace)
{
/* currently, can't be specified along with any other options */
Assert(!dconnlimit);
/* this case isn't allowed within a transaction block */
PreventTransactionChain(isTopLevel, "ALTER DATABASE SET TABLESPACE");
movedb(stmt->dbname, strVal(dtablespace->arg));
return;
}
if (dconnlimit)
connlimit = intVal(dconnlimit->arg);