diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 012987a9727..c7f1597f96e 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -276,25 +276,31 @@ logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, static Datum pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary) { - Name name = PG_GETARG_NAME(0); + Name name; XLogRecPtr upto_lsn; int32 upto_nchanges; - ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; MemoryContext per_query_ctx; MemoryContext oldcontext; - XLogRecPtr end_of_wal; XLogRecPtr startptr; - LogicalDecodingContext *ctx; - ResourceOwner old_resowner = CurrentResourceOwner; ArrayType *arr; Size ndim; List *options = NIL; DecodingOutputState *p; + check_permissions(); + + CheckLogicalDecodingRequirements(); + + if (PG_ARGISNULL(0)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("slot name must not be null"))); + name = PG_GETARG_NAME(0); + if (PG_ARGISNULL(1)) upto_lsn = InvalidXLogRecPtr; else @@ -305,6 +311,12 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin else upto_nchanges = PG_GETARG_INT32(2); + if (PG_ARGISNULL(3)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("options array must not be null"))); + arr = PG_GETARG_ARRAYTYPE_P(3); + /* check to see if caller supports us returning a tuplestore */ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) ereport(ERROR, @@ -324,16 +336,11 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin if (get_call_result_type(fcinfo, NULL, &p->tupdesc) != TYPEFUNC_COMPOSITE) elog(ERROR, "return type must be a row type"); - check_permissions(); - - CheckLogicalDecodingRequirements(); - - arr = PG_GETARG_ARRAYTYPE_P(3); - ndim = ARR_NDIM(arr); - per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; oldcontext = MemoryContextSwitchTo(per_query_ctx); + /* Deconstruct options array */ + ndim = ARR_NDIM(arr); if (ndim > 1) { ereport(ERROR, @@ -382,7 +389,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin else end_of_wal = GetXLogReplayRecPtr(NULL); - CheckLogicalDecodingRequirements(); ReplicationSlotAcquire(NameStr(*name)); PG_TRY(); @@ -444,6 +450,23 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin break; CHECK_FOR_INTERRUPTS(); } + + tuplestore_donestoring(tupstore); + + CurrentResourceOwner = old_resowner; + + /* + * Next time, start where we left off. (Hunting things, the family + * business..) + */ + if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm) + LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr); + + /* free context, call shutdown callback */ + FreeDecodingContext(ctx); + + ReplicationSlotRelease(); + InvalidateSystemCaches(); } PG_CATCH(); { @@ -454,23 +477,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin } PG_END_TRY(); - tuplestore_donestoring(tupstore); - - CurrentResourceOwner = old_resowner; - - /* - * Next time, start where we left off. (Hunting things, the family - * business..) - */ - if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm) - LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr); - - /* free context, call shutdown callback */ - FreeDecodingContext(ctx); - - ReplicationSlotRelease(); - InvalidateSystemCaches(); - return (Datum) 0; } @@ -480,9 +486,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin Datum pg_logical_slot_get_changes(PG_FUNCTION_ARGS) { - Datum ret = pg_logical_slot_get_changes_guts(fcinfo, true, false); - - return ret; + return pg_logical_slot_get_changes_guts(fcinfo, true, false); } /* @@ -491,9 +495,7 @@ pg_logical_slot_get_changes(PG_FUNCTION_ARGS) Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS) { - Datum ret = pg_logical_slot_get_changes_guts(fcinfo, false, false); - - return ret; + return pg_logical_slot_get_changes_guts(fcinfo, false, false); } /* @@ -502,9 +504,7 @@ pg_logical_slot_peek_changes(PG_FUNCTION_ARGS) Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS) { - Datum ret = pg_logical_slot_get_changes_guts(fcinfo, true, true); - - return ret; + return pg_logical_slot_get_changes_guts(fcinfo, true, true); } /* @@ -513,7 +513,5 @@ pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS) Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS) { - Datum ret = pg_logical_slot_get_changes_guts(fcinfo, false, true); - - return ret; + return pg_logical_slot_get_changes_guts(fcinfo, false, true); } diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index c72fb933066..ae96a554175 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -602,7 +602,7 @@ DATA(insert OID = 3800 ( brincostestimate PGNSP PGUID 12 1 0 0 0 f f f f t f v DESCR("brin(internal)"); DATA(insert OID = 3801 ( brinoptions PGNSP PGUID 12 1 0 0 0 f f f f t f s 2 0 17 "1009 16" _null_ _null_ _null_ _null_ _null_ brinoptions _null_ _null_ _null_ )); DESCR("brin(internal)"); -DATA(insert OID = 3952 ( brin_summarize_new_values PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 23 "2205" _null_ _null_ _null_ _null_ _null_ brin_summarize_new_values _null_ _null_ _null_ )); +DATA(insert OID = 3952 ( brin_summarize_new_values PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 23 "2205" _null_ _null_ _null_ _null_ _null_ brin_summarize_new_values _null_ _null_ _null_ )); DESCR("brin: standalone scan new table pages"); DATA(insert OID = 339 ( poly_same PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 16 "604 604" _null_ _null_ _null_ _null_ _null_ poly_same _null_ _null_ _null_ )); @@ -2912,9 +2912,9 @@ DATA(insert OID = 2274 ( pg_stat_reset PGNSP PGUID 12 1 0 0 0 f f f f f f v DESCR("statistics: reset collected statistics for current database"); DATA(insert OID = 3775 ( pg_stat_reset_shared PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2278 "25" _null_ _null_ _null_ _null_ _null_ pg_stat_reset_shared _null_ _null_ _null_ )); DESCR("statistics: reset collected statistics shared across the cluster"); -DATA(insert OID = 3776 ( pg_stat_reset_single_table_counters PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "26" _null_ _null_ _null_ _null_ _null_ pg_stat_reset_single_table_counters _null_ _null_ _null_ )); +DATA(insert OID = 3776 ( pg_stat_reset_single_table_counters PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2278 "26" _null_ _null_ _null_ _null_ _null_ pg_stat_reset_single_table_counters _null_ _null_ _null_ )); DESCR("statistics: reset collected statistics for a single table or index in the current database"); -DATA(insert OID = 3777 ( pg_stat_reset_single_function_counters PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "26" _null_ _null_ _null_ _null_ _null_ pg_stat_reset_single_function_counters _null_ _null_ _null_ )); +DATA(insert OID = 3777 ( pg_stat_reset_single_function_counters PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2278 "26" _null_ _null_ _null_ _null_ _null_ pg_stat_reset_single_function_counters _null_ _null_ _null_ )); DESCR("statistics: reset collected statistics for a single function in the current database"); DATA(insert OID = 3163 ( pg_trigger_depth PGNSP PGUID 12 1 0 0 0 f f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ _null_ pg_trigger_depth _null_ _null_ _null_ )); @@ -5187,13 +5187,13 @@ DATA(insert OID = 3473 ( spg_range_quad_leaf_consistent PGNSP PGUID 12 1 0 0 0 DESCR("SP-GiST support for quad tree over range"); /* replication slots */ -DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2249 "19" "{19,19,3220}" "{i,o,o}" "{slot_name,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ )); +DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2249 "19" "{19,19,3220}" "{i,o,o}" "{slot_name,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ )); DESCR("create a physical replication slot"); -DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ )); +DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ )); DESCR("drop a replication slot"); DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220}" "{o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_pid,xmin,catalog_xmin,restart_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ )); DESCR("information about replication slots currently in use"); -DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ )); +DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ )); DESCR("set up a logical replication slot"); DATA(insert OID = 3782 ( pg_logical_slot_get_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,25}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_get_changes _null_ _null_ _null_ )); DESCR("get changes from replication slot");