1
0
mirror of https://github.com/postgres/postgres.git synced 2025-08-28 18:48:04 +03:00

For inplace update durability, make heap_update() callers wait.

The previous commit fixed some ways of losing an inplace update.  It
remained possible to lose one when a backend working toward a
heap_update() copied a tuple into memory just before inplace update of
that tuple.  In catalogs eligible for inplace update, use LOCKTAG_TUPLE
to govern admission to the steps of copying an old tuple, modifying it,
and issuing heap_update().  This includes MERGE commands.  To avoid
changing most of the pg_class DDL, don't require LOCKTAG_TUPLE when
holding a relation lock sufficient to exclude inplace updaters.
Back-patch to v12 (all supported versions).  In v13 and v12, "UPDATE
pg_class" or "UPDATE pg_database" can still lose an inplace update.  The
v14+ UPDATE fix needs commit 86dc90056d,
and it wasn't worth reimplementing that fix without such infrastructure.

Reviewed by Nitin Motiani and (in earlier versions) Heikki Linnakangas.

Discussion: https://postgr.es/m/20231027214946.79.nmisch@google.com
This commit is contained in:
Noah Misch
2024-09-24 15:25:18 -07:00
parent 8590c942c1
commit 5c837f8fa0
19 changed files with 490 additions and 49 deletions

View File

@@ -1750,6 +1750,7 @@ RenameDatabase(const char *oldname, const char *newname)
{
Oid db_id;
HeapTuple newtup;
ItemPointerData otid;
Relation rel;
int notherbackends;
int npreparedxacts;
@@ -1821,11 +1822,13 @@ RenameDatabase(const char *oldname, const char *newname)
errdetail_busy_db(notherbackends, npreparedxacts)));
/* rename */
newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
newtup = SearchSysCacheLockedCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
if (!HeapTupleIsValid(newtup))
elog(ERROR, "cache lookup failed for database %u", db_id);
otid = newtup->t_self;
namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
CatalogTupleUpdate(rel, &newtup->t_self, newtup);
CatalogTupleUpdate(rel, &otid, newtup);
UnlockTuple(rel, &otid, InplaceUpdateTupleLock);
InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
@@ -2073,6 +2076,7 @@ movedb(const char *dbname, const char *tblspcname)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", dbname)));
LockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
MemSet(new_record, 0, sizeof(new_record));
MemSet(new_record_nulls, false, sizeof(new_record_nulls));
@@ -2085,6 +2089,7 @@ movedb(const char *dbname, const char *tblspcname)
new_record,
new_record_nulls, new_record_repl);
CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
UnlockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
@@ -2315,6 +2320,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", stmt->dbname)));
LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
datform = (Form_pg_database) GETSTRUCT(tuple);
dboid = datform->oid;
@@ -2368,6 +2374,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
new_record_nulls, new_record_repl);
CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
@@ -2417,6 +2424,7 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
if (!pg_database_ownercheck(db_id, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
stmt->dbname);
LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
oldversion = isnull ? NULL : TextDatumGetCString(datum);
@@ -2434,6 +2442,7 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
bool nulls[Natts_pg_database] = {0};
bool replaces[Natts_pg_database] = {0};
Datum values[Natts_pg_database] = {0};
HeapTuple newtuple;
ereport(NOTICE,
(errmsg("changing version from %s to %s",
@@ -2442,14 +2451,15 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
replaces[Anum_pg_database_datcollversion - 1] = true;
tuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
values, nulls, replaces);
CatalogTupleUpdate(rel, &tuple->t_self, tuple);
heap_freetuple(tuple);
newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
values, nulls, replaces);
CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
heap_freetuple(newtuple);
}
else
ereport(NOTICE,
(errmsg("version has not changed")));
UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
@@ -2561,6 +2571,8 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied to change owner of database")));
LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
memset(repl_null, false, sizeof(repl_null));
memset(repl_repl, false, sizeof(repl_repl));
@@ -2585,6 +2597,7 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
heap_freetuple(newtuple);

View File

@@ -4287,14 +4287,17 @@ update_relispartition(Oid relationId, bool newval)
{
HeapTuple tup;
Relation classRel;
ItemPointerData otid;
classRel = table_open(RelationRelationId, RowExclusiveLock);
tup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relationId));
tup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relationId));
if (!HeapTupleIsValid(tup))
elog(ERROR, "cache lookup failed for relation %u", relationId);
otid = tup->t_self;
Assert(((Form_pg_class) GETSTRUCT(tup))->relispartition != newval);
((Form_pg_class) GETSTRUCT(tup))->relispartition = newval;
CatalogTupleUpdate(classRel, &tup->t_self, tup);
CatalogTupleUpdate(classRel, &otid, tup);
UnlockTuple(classRel, &otid, InplaceUpdateTupleLock);
heap_freetuple(tup);
table_close(classRel, RowExclusiveLock);
}

View File

@@ -3354,6 +3354,7 @@ SetRelationTableSpace(Relation rel,
{
Relation pg_class;
HeapTuple tuple;
ItemPointerData otid;
Form_pg_class rd_rel;
Oid reloid = RelationGetRelid(rel);
@@ -3362,9 +3363,10 @@ SetRelationTableSpace(Relation rel,
/* Get a modifiable copy of the relation's pg_class row. */
pg_class = table_open(RelationRelationId, RowExclusiveLock);
tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(reloid));
tuple = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(reloid));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for relation %u", reloid);
otid = tuple->t_self;
rd_rel = (Form_pg_class) GETSTRUCT(tuple);
/* Update the pg_class row. */
@@ -3372,7 +3374,8 @@ SetRelationTableSpace(Relation rel,
InvalidOid : newTableSpaceId;
if (OidIsValid(newRelFileNode))
rd_rel->relfilenode = newRelFileNode;
CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
CatalogTupleUpdate(pg_class, &otid, tuple);
UnlockTuple(pg_class, &otid, InplaceUpdateTupleLock);
/*
* Record dependency on tablespace. This is only required for relations
@@ -3866,6 +3869,7 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bo
{
Relation targetrelation;
Relation relrelation; /* for RELATION relation */
ItemPointerData otid;
HeapTuple reltup;
Form_pg_class relform;
Oid namespaceId;
@@ -3888,7 +3892,8 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bo
*/
relrelation = table_open(RelationRelationId, RowExclusiveLock);
reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(myrelid));
reltup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(myrelid));
otid = reltup->t_self;
if (!HeapTupleIsValid(reltup)) /* shouldn't happen */
elog(ERROR, "cache lookup failed for relation %u", myrelid);
relform = (Form_pg_class) GETSTRUCT(reltup);
@@ -3915,7 +3920,8 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bo
*/
namestrcpy(&(relform->relname), newrelname);
CatalogTupleUpdate(relrelation, &reltup->t_self, reltup);
CatalogTupleUpdate(relrelation, &otid, reltup);
UnlockTuple(relrelation, &otid, InplaceUpdateTupleLock);
InvokeObjectPostAlterHookArg(RelationRelationId, myrelid, 0,
InvalidOid, is_internal);
@@ -14405,7 +14411,7 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
/* Fetch heap tuple */
relid = RelationGetRelid(rel);
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
tuple = SearchSysCacheLocked1(RELOID, ObjectIdGetDatum(relid));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for relation %u", relid);
@@ -14509,6 +14515,7 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
repl_val, repl_null, repl_repl);
CatalogTupleUpdate(pgclass, &newtuple->t_self, newtuple);
UnlockTuple(pgclass, &tuple->t_self, InplaceUpdateTupleLock);
InvokeObjectPostAlterHook(RelationRelationId, RelationGetRelid(rel), 0);
@@ -16705,7 +16712,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
ObjectAddress thisobj;
bool already_done = false;
classTup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relOid));
/* no rel lock for relkind=c so use LOCKTAG_TUPLE */
classTup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relOid));
if (!HeapTupleIsValid(classTup))
elog(ERROR, "cache lookup failed for relation %u", relOid);
classForm = (Form_pg_class) GETSTRUCT(classTup);
@@ -16724,6 +16732,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
already_done = object_address_present(&thisobj, objsMoved);
if (!already_done && oldNspOid != newNspOid)
{
ItemPointerData otid = classTup->t_self;
/* check for duplicate name (more friendly than unique-index failure) */
if (get_relname_relid(NameStr(classForm->relname),
newNspOid) != InvalidOid)
@@ -16736,7 +16746,9 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
/* classTup is a copy, so OK to scribble on */
classForm->relnamespace = newNspOid;
CatalogTupleUpdate(classRel, &classTup->t_self, classTup);
CatalogTupleUpdate(classRel, &otid, classTup);
UnlockTuple(classRel, &otid, InplaceUpdateTupleLock);
/* Update dependency on schema if caller said so */
if (hasDependEntry &&
@@ -16748,6 +16760,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
elog(ERROR, "failed to change schema dependency for relation \"%s\"",
NameStr(classForm->relname));
}
else
UnlockTuple(classRel, &classTup->t_self, InplaceUpdateTupleLock);
if (!already_done)
{
add_exact_object_address(&thisobj, objsMoved);