1
0
mirror of https://github.com/postgres/postgres.git synced 2025-05-20 05:13:53 +03:00
Tom Lane 32ebb35128 Fix logical replication's ideas about which type OIDs are built-in.
Only hand-assigned type OIDs should be presumed to match across different
PG servers; those assigned during genbki.pl or during initdb are likely
to change due to addition or removal of unrelated objects.

This means that the cutoff should be FirstGenbkiObjectId (in HEAD)
or FirstBootstrapObjectId (before that), not FirstNormalObjectId.
Compare postgres_fdw's is_builtin() test.

It's likely that this error has no observable consequence in a
normally-functioning system, since ATM the only affected type OIDs are
system catalog rowtypes and information_schema types, which would not
typically be interesting for logical replication.  But you could
probably break it if you tried hard, so back-patch.

Discussion: https://postgr.es/m/15150.1557257111@sss.pgh.pa.us
2019-05-13 17:23:00 -04:00

462 lines
12 KiB
C

/*-------------------------------------------------------------------------
* relation.c
* PostgreSQL logical replication
*
* Copyright (c) 2016-2019, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/logical/relation.c
*
* NOTES
* This file contains helper functions for logical replication relation
* mapping cache.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/sysattr.h"
#include "access/table.h"
#include "catalog/namespace.h"
#include "catalog/pg_subscription_rel.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "replication/logicalrelation.h"
#include "replication/worker_internal.h"
#include "utils/builtins.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
static MemoryContext LogicalRepRelMapContext = NULL;
static HTAB *LogicalRepRelMap = NULL;
static HTAB *LogicalRepTypMap = NULL;
/*
* Relcache invalidation callback for our relation map cache.
*/
static void
logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
{
LogicalRepRelMapEntry *entry;
/* Just to be sure. */
if (LogicalRepRelMap == NULL)
return;
if (reloid != InvalidOid)
{
HASH_SEQ_STATUS status;
hash_seq_init(&status, LogicalRepRelMap);
/* TODO, use inverse lookup hashtable? */
while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
{
if (entry->localreloid == reloid)
{
entry->localreloid = InvalidOid;
hash_seq_term(&status);
break;
}
}
}
else
{
/* invalidate all cache entries */
HASH_SEQ_STATUS status;
hash_seq_init(&status, LogicalRepRelMap);
while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
entry->localreloid = InvalidOid;
}
}
/*
* Initialize the relation map cache.
*/
static void
logicalrep_relmap_init(void)
{
HASHCTL ctl;
if (!LogicalRepRelMapContext)
LogicalRepRelMapContext =
AllocSetContextCreate(CacheMemoryContext,
"LogicalRepRelMapContext",
ALLOCSET_DEFAULT_SIZES);
/* Initialize the relation hash table. */
MemSet(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(LogicalRepRelId);
ctl.entrysize = sizeof(LogicalRepRelMapEntry);
ctl.hcxt = LogicalRepRelMapContext;
LogicalRepRelMap = hash_create("logicalrep relation map cache", 128, &ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
/* Initialize the type hash table. */
MemSet(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(Oid);
ctl.entrysize = sizeof(LogicalRepTyp);
ctl.hcxt = LogicalRepRelMapContext;
/* This will usually be small. */
LogicalRepTypMap = hash_create("logicalrep type map cache", 2, &ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
(Datum) 0);
}
/*
* Free the entry of a relation map cache.
*/
static void
logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
{
LogicalRepRelation *remoterel;
remoterel = &entry->remoterel;
pfree(remoterel->nspname);
pfree(remoterel->relname);
if (remoterel->natts > 0)
{
int i;
for (i = 0; i < remoterel->natts; i++)
pfree(remoterel->attnames[i]);
pfree(remoterel->attnames);
pfree(remoterel->atttyps);
}
bms_free(remoterel->attkeys);
if (entry->attrmap)
pfree(entry->attrmap);
}
/*
* Add new entry or update existing entry in the relation map cache.
*
* Called when new relation mapping is sent by the publisher to update
* our expected view of incoming data from said publisher.
*/
void
logicalrep_relmap_update(LogicalRepRelation *remoterel)
{
MemoryContext oldctx;
LogicalRepRelMapEntry *entry;
bool found;
int i;
if (LogicalRepRelMap == NULL)
logicalrep_relmap_init();
/*
* HASH_ENTER returns the existing entry if present or creates a new one.
*/
entry = hash_search(LogicalRepRelMap, (void *) &remoterel->remoteid,
HASH_ENTER, &found);
if (found)
logicalrep_relmap_free_entry(entry);
memset(entry, 0, sizeof(LogicalRepRelMapEntry));
/* Make cached copy of the data */
oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
entry->remoterel.remoteid = remoterel->remoteid;
entry->remoterel.nspname = pstrdup(remoterel->nspname);
entry->remoterel.relname = pstrdup(remoterel->relname);
entry->remoterel.natts = remoterel->natts;
entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
for (i = 0; i < remoterel->natts; i++)
{
entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
entry->remoterel.atttyps[i] = remoterel->atttyps[i];
}
entry->remoterel.replident = remoterel->replident;
entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
MemoryContextSwitchTo(oldctx);
}
/*
* Find attribute index in TupleDesc struct by attribute name.
*
* Returns -1 if not found.
*/
static int
logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
{
int i;
for (i = 0; i < remoterel->natts; i++)
{
if (strcmp(remoterel->attnames[i], attname) == 0)
return i;
}
return -1;
}
/*
* Open the local relation associated with the remote one.
*
* Optionally rebuilds the Relcache mapping if it was invalidated
* by local DDL.
*/
LogicalRepRelMapEntry *
logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
{
LogicalRepRelMapEntry *entry;
bool found;
if (LogicalRepRelMap == NULL)
logicalrep_relmap_init();
/* Search for existing entry. */
entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
HASH_FIND, &found);
if (!found)
elog(ERROR, "no relation map entry for remote relation ID %u",
remoteid);
/* Need to update the local cache? */
if (!OidIsValid(entry->localreloid))
{
Oid relid;
int i;
int found;
Bitmapset *idkey;
TupleDesc desc;
LogicalRepRelation *remoterel;
MemoryContext oldctx;
remoterel = &entry->remoterel;
/* Try to find and lock the relation by name. */
relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname,
remoterel->relname, -1),
lockmode, true);
if (!OidIsValid(relid))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical replication target relation \"%s.%s\" does not exist",
remoterel->nspname, remoterel->relname)));
entry->localrel = table_open(relid, NoLock);
/* Check for supported relkind. */
CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind,
remoterel->nspname, remoterel->relname);
/*
* Build the mapping of local attribute numbers to remote attribute
* numbers and validate that we don't miss any replicated columns as
* that would result in potentially unwanted data loss.
*/
desc = RelationGetDescr(entry->localrel);
oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
entry->attrmap = palloc(desc->natts * sizeof(int));
MemoryContextSwitchTo(oldctx);
found = 0;
for (i = 0; i < desc->natts; i++)
{
int attnum;
Form_pg_attribute attr = TupleDescAttr(desc, i);
if (attr->attisdropped || attr->attgenerated)
{
entry->attrmap[i] = -1;
continue;
}
attnum = logicalrep_rel_att_by_name(remoterel,
NameStr(attr->attname));
entry->attrmap[i] = attnum;
if (attnum >= 0)
found++;
}
/* TODO, detail message with names of missing columns */
if (found < remoterel->natts)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical replication target relation \"%s.%s\" is missing "
"some replicated columns",
remoterel->nspname, remoterel->relname)));
/*
* Check that replica identity matches. We allow for stricter replica
* identity (fewer columns) on subscriber as that will not stop us
* from finding unique tuple. IE, if publisher has identity
* (id,timestamp) and subscriber just (id) this will not be a problem,
* but in the opposite scenario it will.
*
* Don't throw any error here just mark the relation entry as not
* updatable, as replica identity is only for updates and deletes but
* inserts can be replicated even without it.
*/
entry->updatable = true;
idkey = RelationGetIndexAttrBitmap(entry->localrel,
INDEX_ATTR_BITMAP_IDENTITY_KEY);
/* fallback to PK if no replica identity */
if (idkey == NULL)
{
idkey = RelationGetIndexAttrBitmap(entry->localrel,
INDEX_ATTR_BITMAP_PRIMARY_KEY);
/*
* If no replica identity index and no PK, the published table
* must have replica identity FULL.
*/
if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
entry->updatable = false;
}
i = -1;
while ((i = bms_next_member(idkey, i)) >= 0)
{
int attnum = i + FirstLowInvalidHeapAttributeNumber;
if (!AttrNumberIsForUserDefinedAttr(attnum))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical replication target relation \"%s.%s\" uses "
"system columns in REPLICA IDENTITY index",
remoterel->nspname, remoterel->relname)));
attnum = AttrNumberGetAttrOffset(attnum);
if (!bms_is_member(entry->attrmap[attnum], remoterel->attkeys))
{
entry->updatable = false;
break;
}
}
entry->localreloid = relid;
}
else
entry->localrel = table_open(entry->localreloid, lockmode);
if (entry->state != SUBREL_STATE_READY)
entry->state = GetSubscriptionRelState(MySubscription->oid,
entry->localreloid,
&entry->statelsn,
true);
return entry;
}
/*
* Close the previously opened logical relation.
*/
void
logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
{
table_close(rel->localrel, lockmode);
rel->localrel = NULL;
}
/*
* Free the type map cache entry data.
*/
static void
logicalrep_typmap_free_entry(LogicalRepTyp *entry)
{
pfree(entry->nspname);
pfree(entry->typname);
}
/*
* Add new entry or update existing entry in the type map cache.
*/
void
logicalrep_typmap_update(LogicalRepTyp *remotetyp)
{
MemoryContext oldctx;
LogicalRepTyp *entry;
bool found;
if (LogicalRepTypMap == NULL)
logicalrep_relmap_init();
/*
* HASH_ENTER returns the existing entry if present or creates a new one.
*/
entry = hash_search(LogicalRepTypMap, (void *) &remotetyp->remoteid,
HASH_ENTER, &found);
if (found)
logicalrep_typmap_free_entry(entry);
/* Make cached copy of the data */
entry->remoteid = remotetyp->remoteid;
oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
entry->nspname = pstrdup(remotetyp->nspname);
entry->typname = pstrdup(remotetyp->typname);
MemoryContextSwitchTo(oldctx);
}
/*
* Fetch type name from the cache by remote type OID.
*
* Return a substitute value if we cannot find the data type; no message is
* sent to the log in that case, because this is used by error callback
* already.
*/
char *
logicalrep_typmap_gettypname(Oid remoteid)
{
LogicalRepTyp *entry;
bool found;
/* Internal types are mapped directly. */
if (remoteid < FirstGenbkiObjectId)
{
if (!get_typisdefined(remoteid))
{
/*
* This can be caused by having a publisher with a higher
* PostgreSQL major version than the subscriber.
*/
return psprintf("unrecognized %u", remoteid);
}
return format_type_be(remoteid);
}
if (LogicalRepTypMap == NULL)
{
/*
* If the typemap is not initialized yet, we cannot possibly attempt
* to search the hash table; but there's no way we know the type
* locally yet, since we haven't received a message about this type,
* so this is the best we can do.
*/
return psprintf("unrecognized %u", remoteid);
}
/* search the mapping */
entry = hash_search(LogicalRepTypMap, (void *) &remoteid,
HASH_FIND, &found);
if (!found)
return psprintf("unrecognized %u", remoteid);
Assert(OidIsValid(entry->remoteid));
return psprintf("%s.%s", entry->nspname, entry->typname);
}