diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index df2ea94d468..9fd879ee0c8 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -65,6 +65,13 @@ static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, static bool publications_valid; static bool in_streaming; +/* + * Private memory context for publication data, created in + * PGOutputData->context when starting pgoutput, and set to NULL when its + * parent context is reset via a dedicated MemoryContextCallback. + */ +static MemoryContext pubctx = NULL; + static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); @@ -252,6 +259,15 @@ parse_output_parameters(List *options, PGOutputData *data) } } +/* + * Callback of PGOutputData->context in charge of cleaning pubctx. + */ +static void +pgoutput_pubctx_reset_callback(void *arg) +{ + pubctx = NULL; +} + /* * Initialize this plugin */ @@ -261,12 +277,22 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, { PGOutputData *data = palloc0(sizeof(PGOutputData)); static bool publication_callback_registered = false; + MemoryContextCallback *mcallback; /* Create our memory context for private allocations. */ data->context = AllocSetContextCreate(ctx->context, "logical replication output context", ALLOCSET_DEFAULT_SIZES); + Assert(pubctx == NULL); + pubctx = AllocSetContextCreate(ctx->context, + "logical replication publication list context", + ALLOCSET_SMALL_SIZES); + + mcallback = palloc0(sizeof(MemoryContextCallback)); + mcallback->func = pgoutput_pubctx_reset_callback; + MemoryContextRegisterResetCallback(ctx->context, mcallback); + ctx->output_plugin_private = data; /* This plugin uses binary protocol. */ @@ -786,8 +812,9 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx, /* * Shutdown the output plugin. * - * Note, we don't need to clean the data->context as it's child context - * of the ctx->context so it will be cleaned up by logical decoding machinery. + * Note, we don't need to clean the data->context and pubctx as they are + * child contexts of the ctx->context so they will be cleaned up by logical + * decoding machinery. */ static void pgoutput_shutdown(LogicalDecodingContext *ctx) @@ -797,6 +824,9 @@ pgoutput_shutdown(LogicalDecodingContext *ctx) hash_destroy(RelationSyncCache); RelationSyncCache = NULL; } + + /* Better safe than sorry */ + pubctx = NULL; } /* @@ -1071,9 +1101,10 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) /* Reload publications if needed before use. */ if (!publications_valid) { - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - if (data->publications) - list_free_deep(data->publications); + Assert(pubctx); + + MemoryContextReset(pubctx); + oldctx = MemoryContextSwitchTo(pubctx); data->publications = LoadPublications(data->publication_names); MemoryContextSwitchTo(oldctx);