diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 77372425163..876adab38e5 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1929,7 +1929,22 @@ init_rel_sync_cache(MemoryContext cachectx) Assert(RelationSyncCache != NULL); + /* We must update the cache entry for a relation after a relcache flush */ CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0); + + /* + * Flush all cache entries after a pg_namespace change, in case it was a + * schema rename affecting a relation being replicated. + */ + CacheRegisterSyscacheCallback(NAMESPACEOID, + rel_sync_cache_publication_cb, + (Datum) 0); + + /* + * Flush all cache entries after any publication changes. (We need no + * callback entry for pg_publication, because publication_invalidation_cb + * will take care of it.) + */ CacheRegisterSyscacheCallback(PUBLICATIONRELMAP, rel_sync_cache_publication_cb, (Datum) 0); @@ -2325,8 +2340,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) /* * Publication relation/schema map syscache invalidation callback * - * Called for invalidations on pg_publication, pg_publication_rel, and - * pg_publication_namespace. + * Called for invalidations on pg_publication, pg_publication_rel, + * pg_publication_namespace, and pg_namespace. */ static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) @@ -2337,14 +2352,14 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) /* * We can get here if the plugin was used in SQL interface as the * RelSchemaSyncCache is destroyed when the decoding finishes, but there - * is no way to unregister the relcache invalidation callback. + * is no way to unregister the invalidation callbacks. */ if (RelationSyncCache == NULL) return; /* - * There is no way to find which entry in our cache the hash belongs to so - * mark the whole cache as invalid. + * We have no easy way to identify which cache entries this invalidation + * event might have affected, so just mark them all invalid. */ hash_seq_init(&status, RelationSyncCache); while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL) diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl index 143caac792e..036576acab5 100644 --- a/src/test/subscription/t/100_bugs.pl +++ b/src/test/subscription/t/100_bugs.pl @@ -70,9 +70,10 @@ $node_publisher->wait_for_catchup('sub1'); pass('index predicates do not cause crash'); # We'll re-use these nodes below, so drop their replication state. -# We don't bother to drop the tables though. $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub1"); $node_publisher->safe_psql('postgres', "DROP PUBLICATION pub1"); +# Drop the tables too. +$node_publisher->safe_psql('postgres', "DROP TABLE tab1"); $node_publisher->stop('fast'); $node_subscriber->stop('fast'); @@ -307,6 +308,68 @@ is( $node_subscriber->safe_psql( qq(-1|1), "update works with REPLICA IDENTITY"); +# Clean up +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub"); +$node_publisher->safe_psql('postgres', "DROP TABLE tab_replidentity_index"); +$node_subscriber->safe_psql('postgres', "DROP TABLE tab_replidentity_index"); + +# Test schema invalidation by renaming the schema + +# Create tables on publisher +$node_publisher->safe_psql('postgres', "CREATE SCHEMA sch1"); +$node_publisher->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)"); + +# Create tables on subscriber +$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch1"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)"); +$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch2"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE sch2.t1 (c1 int)"); + +# Setup logical replication that will cover t1 under both schema names +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_sch FOR ALL TABLES"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_sch CONNECTION '$publisher_connstr' PUBLICATION tap_pub_sch" +); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_sch'); + +# Check what happens to data inserted before and after schema rename +$node_publisher->safe_psql( + 'postgres', + "begin; +insert into sch1.t1 values(1); +alter schema sch1 rename to sch2; +create schema sch1; +create table sch1.t1(c1 int); +insert into sch1.t1 values(2); +insert into sch2.t1 values(3); +commit;"); + +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_sch'); + +# Subscriber's sch1.t1 should receive the row inserted into the new sch1.t1, +# but not the row inserted into the old sch1.t1 post-rename. +my $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1"); +is( $result, qq(1 +2), 'check data in subscriber sch1.t1 after schema rename'); + +# Subscriber's sch2.t1 won't have gotten anything yet ... +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch2.t1"); +is($result, '', 'no data yet in subscriber sch2.t1 after schema rename'); + +# ... but it should show up after REFRESH. +$node_subscriber->safe_psql('postgres', + 'ALTER SUBSCRIPTION tap_sub_sch REFRESH PUBLICATION'); + +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_sch'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch2.t1"); +is( $result, qq(1 +3), 'check data in subscriber sch2.t1 after schema rename'); + $node_publisher->stop('fast'); $node_subscriber->stop('fast');