1
0
mirror of https://github.com/postgres/postgres.git synced 2025-09-03 15:22:11 +03:00

For inplace update durability, make heap_update() callers wait.

The previous commit fixed some ways of losing an inplace update.  It
remained possible to lose one when a backend working toward a
heap_update() copied a tuple into memory just before inplace update of
that tuple.  In catalogs eligible for inplace update, use LOCKTAG_TUPLE
to govern admission to the steps of copying an old tuple, modifying it,
and issuing heap_update().  This includes MERGE commands.  To avoid
changing most of the pg_class DDL, don't require LOCKTAG_TUPLE when
holding a relation lock sufficient to exclude inplace updaters.
Back-patch to v12 (all supported versions).  In v13 and v12, "UPDATE
pg_class" or "UPDATE pg_database" can still lose an inplace update.  The
v14+ UPDATE fix needs commit 86dc90056d,
and it wasn't worth reimplementing that fix without such infrastructure.

Reviewed by Nitin Motiani and (in earlier versions) Heikki Linnakangas.

Discussion: https://postgr.es/m/20231027214946.79.nmisch@google.com
This commit is contained in:
Noah Misch
2024-09-24 15:25:18 -07:00
parent a8ad1929d2
commit 14c57cb639
17 changed files with 423 additions and 34 deletions

View File

@@ -1053,6 +1053,7 @@ RenameDatabase(const char *oldname, const char *newname)
{
Oid db_id;
HeapTuple newtup;
ItemPointerData otid;
Relation rel;
int notherbackends;
int npreparedxacts;
@@ -1124,11 +1125,13 @@ RenameDatabase(const char *oldname, const char *newname)
errdetail_busy_db(notherbackends, npreparedxacts)));
/* rename */
newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
newtup = SearchSysCacheLockedCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
if (!HeapTupleIsValid(newtup))
elog(ERROR, "cache lookup failed for database %u", db_id);
otid = newtup->t_self;
namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
CatalogTupleUpdate(rel, &newtup->t_self, newtup);
CatalogTupleUpdate(rel, &otid, newtup);
UnlockTuple(rel, &otid, InplaceUpdateTupleLock);
InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
@@ -1372,6 +1375,7 @@ movedb(const char *dbname, const char *tblspcname)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", dbname)));
LockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
MemSet(new_record, 0, sizeof(new_record));
MemSet(new_record_nulls, false, sizeof(new_record_nulls));
@@ -1384,6 +1388,7 @@ movedb(const char *dbname, const char *tblspcname)
new_record,
new_record_nulls, new_record_repl);
CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
UnlockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
@@ -1620,6 +1625,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", stmt->dbname)));
LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
datform = (Form_pg_database) GETSTRUCT(tuple);
dboid = datform->oid;
@@ -1673,6 +1679,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
new_record_nulls, new_record_repl);
CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
@@ -1783,6 +1790,8 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied to change owner of database")));
LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
memset(repl_null, false, sizeof(repl_null));
memset(repl_repl, false, sizeof(repl_repl));
@@ -1807,6 +1816,7 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
heap_freetuple(newtuple);

View File

@@ -3816,14 +3816,17 @@ update_relispartition(Oid relationId, bool newval)
{
HeapTuple tup;
Relation classRel;
ItemPointerData otid;
classRel = table_open(RelationRelationId, RowExclusiveLock);
tup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relationId));
tup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relationId));
if (!HeapTupleIsValid(tup))
elog(ERROR, "cache lookup failed for relation %u", relationId);
otid = tup->t_self;
Assert(((Form_pg_class) GETSTRUCT(tup))->relispartition != newval);
((Form_pg_class) GETSTRUCT(tup))->relispartition = newval;
CatalogTupleUpdate(classRel, &tup->t_self, tup);
CatalogTupleUpdate(classRel, &otid, tup);
UnlockTuple(classRel, &otid, InplaceUpdateTupleLock);
heap_freetuple(tup);
table_close(classRel, RowExclusiveLock);
}

View File

@@ -3143,6 +3143,7 @@ SetRelationTableSpace(Relation rel,
{
Relation pg_class;
HeapTuple tuple;
ItemPointerData otid;
Form_pg_class rd_rel;
Oid reloid = RelationGetRelid(rel);
@@ -3151,9 +3152,10 @@ SetRelationTableSpace(Relation rel,
/* Get a modifiable copy of the relation's pg_class row. */
pg_class = table_open(RelationRelationId, RowExclusiveLock);
tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(reloid));
tuple = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(reloid));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for relation %u", reloid);
otid = tuple->t_self;
rd_rel = (Form_pg_class) GETSTRUCT(tuple);
/* Update the pg_class row. */
@@ -3161,7 +3163,8 @@ SetRelationTableSpace(Relation rel,
InvalidOid : newTableSpaceId;
if (OidIsValid(newRelFileNode))
rd_rel->relfilenode = newRelFileNode;
CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
CatalogTupleUpdate(pg_class, &otid, tuple);
UnlockTuple(pg_class, &otid, InplaceUpdateTupleLock);
/*
* Record dependency on tablespace. This is only required for relations
@@ -3655,6 +3658,7 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bo
{
Relation targetrelation;
Relation relrelation; /* for RELATION relation */
ItemPointerData otid;
HeapTuple reltup;
Form_pg_class relform;
Oid namespaceId;
@@ -3677,7 +3681,8 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bo
*/
relrelation = table_open(RelationRelationId, RowExclusiveLock);
reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(myrelid));
reltup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(myrelid));
otid = reltup->t_self;
if (!HeapTupleIsValid(reltup)) /* shouldn't happen */
elog(ERROR, "cache lookup failed for relation %u", myrelid);
relform = (Form_pg_class) GETSTRUCT(reltup);
@@ -3704,7 +3709,8 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bo
*/
namestrcpy(&(relform->relname), newrelname);
CatalogTupleUpdate(relrelation, &reltup->t_self, reltup);
CatalogTupleUpdate(relrelation, &otid, reltup);
UnlockTuple(relrelation, &otid, InplaceUpdateTupleLock);
InvokeObjectPostAlterHookArg(RelationRelationId, myrelid, 0,
InvalidOid, is_internal);
@@ -13488,7 +13494,7 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
/* Fetch heap tuple */
relid = RelationGetRelid(rel);
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
tuple = SearchSysCacheLocked1(RELOID, ObjectIdGetDatum(relid));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for relation %u", relid);
@@ -13591,6 +13597,7 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
repl_val, repl_null, repl_repl);
CatalogTupleUpdate(pgclass, &newtuple->t_self, newtuple);
UnlockTuple(pgclass, &tuple->t_self, InplaceUpdateTupleLock);
InvokeObjectPostAlterHook(RelationRelationId, RelationGetRelid(rel), 0);
@@ -15659,7 +15666,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
ObjectAddress thisobj;
bool already_done = false;
classTup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relOid));
/* no rel lock for relkind=c so use LOCKTAG_TUPLE */
classTup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relOid));
if (!HeapTupleIsValid(classTup))
elog(ERROR, "cache lookup failed for relation %u", relOid);
classForm = (Form_pg_class) GETSTRUCT(classTup);
@@ -15678,6 +15686,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
already_done = object_address_present(&thisobj, objsMoved);
if (!already_done && oldNspOid != newNspOid)
{
ItemPointerData otid = classTup->t_self;
/* check for duplicate name (more friendly than unique-index failure) */
if (get_relname_relid(NameStr(classForm->relname),
newNspOid) != InvalidOid)
@@ -15690,7 +15700,9 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
/* classTup is a copy, so OK to scribble on */
classForm->relnamespace = newNspOid;
CatalogTupleUpdate(classRel, &classTup->t_self, classTup);
CatalogTupleUpdate(classRel, &otid, classTup);
UnlockTuple(classRel, &otid, InplaceUpdateTupleLock);
/* Update dependency on schema if caller said so */
if (hasDependEntry &&
@@ -15702,6 +15714,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
elog(ERROR, "failed to change schema dependency for relation \"%s\"",
NameStr(classForm->relname));
}
else
UnlockTuple(classRel, &classTup->t_self, InplaceUpdateTupleLock);
if (!already_done)
{
add_exact_object_address(&thisobj, objsMoved);