1
0
mirror of https://github.com/postgres/postgres.git synced 2025-08-24 09:27:52 +03:00

Database-level collation version tracking

This adds to database objects the same version tracking that collation
objects have.  There is a new pg_database column datcollversion that
stores the version, a new function
pg_database_collation_actual_version() to get the version from the
operating system, and a new subcommand ALTER DATABASE ... REFRESH
COLLATION VERSION.

This was not originally added together with pg_collation.collversion,
since originally version tracking was only supported for ICU, and ICU
on a database-level is not currently supported.  But we now have
version tracking for glibc (since PG13), FreeBSD (since PG14), and
Windows (since PG13), so this is useful to have now.

Reviewed-by: Julien Rouhaud <rjuju123@gmail.com>
Discussion: https://www.postgresql.org/message-id/flat/f0ff3190-29a3-5b39-a179-fa32eee57db6%40enterprisedb.com
This commit is contained in:
Peter Eisentraut
2022-02-14 08:09:04 +01:00
parent 9898c5e03c
commit 37851a8b83
22 changed files with 367 additions and 15 deletions

View File

@@ -36,6 +36,7 @@
#include "catalog/indexing.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_database.h"
#include "catalog/pg_db_role_setting.h"
#include "catalog/pg_subscription.h"
@@ -85,7 +86,8 @@ static bool get_db_info(const char *name, LOCKMODE lockmode,
Oid *dbIdP, Oid *ownerIdP,
int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
Oid *dbTablespace, char **dbCollate, char **dbCtype);
Oid *dbTablespace, char **dbCollate, char **dbCtype,
char **dbCollversion);
static bool have_createdb_privilege(void);
static void remove_dbtablespaces(Oid db_id);
static bool check_db_file_conflict(Oid db_id);
@@ -105,6 +107,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
int src_encoding = -1;
char *src_collate = NULL;
char *src_ctype = NULL;
char *src_collversion = NULL;
bool src_istemplate;
bool src_allowconn;
TransactionId src_frozenxid = InvalidTransactionId;
@@ -128,6 +131,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
DefElem *distemplate = NULL;
DefElem *dallowconnections = NULL;
DefElem *dconnlimit = NULL;
DefElem *dcollversion = NULL;
char *dbname = stmt->dbname;
char *dbowner = NULL;
const char *dbtemplate = NULL;
@@ -138,6 +142,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
bool dbistemplate = false;
bool dballowconnections = true;
int dbconnlimit = -1;
char *dbcollversion = NULL;
int notherbackends;
int npreparedxacts;
createdb_failure_params fparms;
@@ -207,6 +212,12 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
errorConflictingDefElem(defel, pstate);
dconnlimit = defel;
}
else if (strcmp(defel->defname, "collation_version") == 0)
{
if (dcollversion)
errorConflictingDefElem(defel, pstate);
dcollversion = defel;
}
else if (strcmp(defel->defname, "location") == 0)
{
ereport(WARNING,
@@ -305,6 +316,8 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid connection limit: %d", dbconnlimit)));
}
if (dcollversion)
dbcollversion = defGetString(dcollversion);
/* obtain OID of proposed owner */
if (dbowner)
@@ -342,7 +355,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
&src_dboid, &src_owner, &src_encoding,
&src_istemplate, &src_allowconn,
&src_frozenxid, &src_minmxid, &src_deftablespace,
&src_collate, &src_ctype))
&src_collate, &src_ctype, &src_collversion))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("template database \"%s\" does not exist",
@@ -424,6 +437,52 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
}
/*
* If we got a collation version for the template database, check that it
* matches the actual OS collation version. Otherwise error; the user
* needs to fix the template database first. Don't complain if a
* collation version was specified explicitly as a statement option; that
* is used by pg_upgrade to reproduce the old state exactly.
*
* (If the template database has no collation version, then either the
* platform/provider does not support collation versioning, or it's
* template0, for which we stipulate that it does not contain
* collation-using objects.)
*/
if (src_collversion && !dcollversion)
{
char *actual_versionstr;
actual_versionstr = get_collation_actual_version(COLLPROVIDER_LIBC, dbcollate);
if (!actual_versionstr)
ereport(ERROR,
(errmsg("template database \"%s\" has a collation version, but no actual collation version could be determined",
dbtemplate)));
if (strcmp(actual_versionstr, src_collversion) != 0)
ereport(ERROR,
(errmsg("template database \"%s\" has a collation version mismatch",
dbtemplate),
errdetail("The template database was created using collation version %s, "
"but the operating system provides version %s.",
src_collversion, actual_versionstr),
errhint("Rebuild all objects in the template database that use the default collation and run "
"ALTER DATABASE %s REFRESH COLLATION VERSION, "
"or build PostgreSQL with the right library version.",
quote_identifier(dbtemplate))));
}
if (dbcollversion == NULL)
dbcollversion = src_collversion;
/*
* Normally, we copy the collation version from the template database.
* This last resort only applies if the template database does not have a
* collation version, which is normally only the case for template0.
*/
if (dbcollversion == NULL)
dbcollversion = get_collation_actual_version(COLLPROVIDER_LIBC, dbcollate);
/* Resolve default tablespace for new database */
if (dtablespacename && dtablespacename->arg)
{
@@ -578,6 +637,10 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
new_record[Anum_pg_database_datcollate - 1] = CStringGetTextDatum(dbcollate);
new_record[Anum_pg_database_datctype - 1] = CStringGetTextDatum(dbctype);
if (dbcollversion)
new_record[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(dbcollversion);
else
new_record_nulls[Anum_pg_database_datcollversion - 1] = true;
/*
* We deliberately set datacl to default (NULL), rather than copying it
@@ -844,7 +907,7 @@ dropdb(const char *dbname, bool missing_ok, bool force)
pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
&db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL))
&db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
{
if (!missing_ok)
{
@@ -1046,7 +1109,7 @@ RenameDatabase(const char *oldname, const char *newname)
rel = table_open(DatabaseRelationId, RowExclusiveLock);
if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
NULL, NULL, NULL, NULL, NULL, NULL, NULL))
NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", oldname)));
@@ -1159,7 +1222,7 @@ movedb(const char *dbname, const char *tblspcname)
pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL))
NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL, NULL))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", dbname)));
@@ -1651,6 +1714,88 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
}
/*
* ALTER DATABASE name REFRESH COLLATION VERSION
*/
ObjectAddress
AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
{
Relation rel;
ScanKeyData scankey;
SysScanDesc scan;
Oid db_id;
HeapTuple tuple;
Form_pg_database datForm;
ObjectAddress address;
Datum datum;
bool isnull;
char *oldversion;
char *newversion;
rel = table_open(DatabaseRelationId, RowExclusiveLock);
ScanKeyInit(&scankey,
Anum_pg_database_datname,
BTEqualStrategyNumber, F_NAMEEQ,
CStringGetDatum(stmt->dbname));
scan = systable_beginscan(rel, DatabaseNameIndexId, true,
NULL, 1, &scankey);
tuple = systable_getnext(scan);
if (!HeapTupleIsValid(tuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", stmt->dbname)));
datForm = (Form_pg_database) GETSTRUCT(tuple);
db_id = datForm->oid;
if (!pg_database_ownercheck(db_id, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
stmt->dbname);
datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
oldversion = isnull ? NULL : TextDatumGetCString(datum);
datum = heap_getattr(tuple, Anum_pg_database_datcollate, RelationGetDescr(rel), &isnull);
Assert(!isnull);
newversion = get_collation_actual_version(COLLPROVIDER_LIBC, TextDatumGetCString(datum));
/* cannot change from NULL to non-NULL or vice versa */
if ((!oldversion && newversion) || (oldversion && !newversion))
elog(ERROR, "invalid collation version change");
else if (oldversion && newversion && strcmp(newversion, oldversion) != 0)
{
bool nulls[Natts_pg_database] = {0};
bool replaces[Natts_pg_database] = {0};
Datum values[Natts_pg_database] = {0};
ereport(NOTICE,
(errmsg("changing version from %s to %s",
oldversion, newversion)));
values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
replaces[Anum_pg_database_datcollversion - 1] = true;
tuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
values, nulls, replaces);
CatalogTupleUpdate(rel, &tuple->t_self, tuple);
heap_freetuple(tuple);
}
else
ereport(NOTICE,
(errmsg("version has not changed")));
InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
ObjectAddressSet(address, DatabaseRelationId, db_id);
systable_endscan(scan);
table_close(rel, NoLock);
return address;
}
/*
* ALTER DATABASE name SET ...
*/
@@ -1793,6 +1938,34 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
}
Datum
pg_database_collation_actual_version(PG_FUNCTION_ARGS)
{
Oid dbid = PG_GETARG_OID(0);
HeapTuple tp;
Datum datum;
bool isnull;
char *version;
tp = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
if (!HeapTupleIsValid(tp))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("database with OID %u does not exist", dbid)));
datum = SysCacheGetAttr(DATABASEOID, tp, Anum_pg_database_datcollate, &isnull);
Assert(!isnull);
version = get_collation_actual_version(COLLPROVIDER_LIBC, TextDatumGetCString(datum));
ReleaseSysCache(tp);
if (version)
PG_RETURN_TEXT_P(cstring_to_text(version));
else
PG_RETURN_NULL();
}
/*
* Helper functions
*/
@@ -1808,7 +1981,8 @@ get_db_info(const char *name, LOCKMODE lockmode,
Oid *dbIdP, Oid *ownerIdP,
int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
Oid *dbTablespace, char **dbCollate, char **dbCtype)
Oid *dbTablespace, char **dbCollate, char **dbCtype,
char **dbCollversion)
{
bool result = false;
Relation relation;
@@ -1913,6 +2087,14 @@ get_db_info(const char *name, LOCKMODE lockmode,
Assert(!isnull);
*dbCtype = TextDatumGetCString(datum);
}
if (dbCollversion)
{
datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datcollversion, &isnull);
if (isnull)
*dbCollversion = NULL;
else
*dbCollversion = TextDatumGetCString(datum);
}
ReleaseSysCache(tuple);
result = true;
break;