diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 4007bb2baf0..105da98c9ba 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -16,6 +16,7 @@ #include "postgres.h" +#include "access/relation.h" #include "access/sysattr.h" #include "access/table.h" #include "catalog/namespace.h" @@ -77,7 +78,7 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid) { if (entry->localreloid == reloid) { - entry->localreloid = InvalidOid; + entry->localrelvalid = false; hash_seq_term(&status); break; } @@ -91,7 +92,7 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid) hash_seq_init(&status, LogicalRepRelMap); while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) - entry->localreloid = InvalidOid; + entry->localrelvalid = false; } } @@ -230,15 +231,13 @@ logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname) /* * Open the local relation associated with the remote one. * - * Optionally rebuilds the Relcache mapping if it was invalidated - * by local DDL. + * Rebuilds the Relcache mapping if it was invalidated by local DDL. */ LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) { LogicalRepRelMapEntry *entry; bool found; - Oid relid = InvalidOid; LogicalRepRelation *remoterel; if (LogicalRepRelMap == NULL) @@ -254,14 +253,45 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) remoterel = &entry->remoterel; + /* Ensure we don't leak a relcache refcount. */ + if (entry->localrel) + elog(ERROR, "remote relation ID %u is already open", remoteid); + /* * When opening and locking a relation, pending invalidation messages are - * processed which can invalidate the relation. We need to update the - * local cache both when we are first time accessing the relation and when - * the relation is invalidated (aka entry->localreloid is set InvalidOid). + * processed which can invalidate the relation. Hence, if the entry is + * currently considered valid, try to open the local relation by OID and + * see if invalidation ensues. */ - if (!OidIsValid(entry->localreloid)) + if (entry->localrelvalid) { + entry->localrel = try_relation_open(entry->localreloid, lockmode); + if (!entry->localrel) + { + /* Table was renamed or dropped. */ + entry->localrelvalid = false; + } + else if (!entry->localrelvalid) + { + /* Note we release the no-longer-useful lock here. */ + table_close(entry->localrel, lockmode); + entry->localrel = NULL; + } + } + + /* + * If the entry has been marked invalid since we last had lock on it, + * re-open the local relation by name and rebuild all derived data. + */ + if (!entry->localrelvalid) + { + Oid relid; + int found; + Bitmapset *idkey; + TupleDesc desc; + MemoryContext oldctx; + int i; + /* Try to find and lock the relation by name. */ relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname, remoterel->relname, -1), @@ -272,21 +302,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) errmsg("logical replication target relation \"%s.%s\" does not exist", remoterel->nspname, remoterel->relname))); entry->localrel = table_open(relid, NoLock); - - } - else - { - relid = entry->localreloid; - entry->localrel = table_open(entry->localreloid, lockmode); - } - - if (!OidIsValid(entry->localreloid)) - { - int found; - Bitmapset *idkey; - TupleDesc desc; - MemoryContext oldctx; - int i; + entry->localreloid = relid; /* Check for supported relkind. */ CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind, @@ -380,7 +396,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) } } - entry->localreloid = relid; + entry->localrelvalid = true; } if (entry->state != SUBREL_STATE_READY) @@ -523,7 +539,7 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid) { if (entry->localreloid == reloid) { - entry->localreloid = InvalidOid; + entry->localrelvalid = false; hash_seq_term(&status); break; } @@ -537,7 +553,7 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid) hash_seq_init(&status, LogicalRepPartMap); while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) - entry->localreloid = InvalidOid; + entry->localrelvalid = false; } } @@ -656,6 +672,8 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, entry->updatable = root->updatable; + entry->localrelvalid = true; + /* state and statelsn are left set to 0. */ MemoryContextSwitchTo(oldctx); diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index a6b44b12bd1..7e431af753f 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -19,14 +19,16 @@ typedef struct LogicalRepRelMapEntry { LogicalRepRelation remoterel; /* key is remoterel.remoteid */ - /* Mapping to local relation, filled as needed. */ + /* Mapping to local relation. */ Oid localreloid; /* local relation id */ - Relation localrel; /* relcache entry */ + Relation localrel; /* relcache entry (NULL when closed) */ AttrMap *attrmap; /* map of local attributes to remote ones */ bool updatable; /* Can apply updates/deletes? */ /* Sync state. */ char state; + /* Validity flag ... inserted here to avoid ABI break in back branches. */ + bool localrelvalid; XLogRecPtr statelsn; } LogicalRepRelMapEntry;