mirror of
https://github.com/postgres/postgres.git
synced 2025-12-10 14:22:35 +03:00
Remove fundamentally-redundant processing in jsonb_agg() et al.
The various variants of jsonb_agg() operate as follows, for each aggregate input value: 1. Build a JsonbValue tree representation of the input value. 2. Flatten the JsonbValue tree into a Jsonb in on-disk format. 3. Iterate through the Jsonb, building a JsonbValue that is part of the aggregate's state stored in aggcontext, but is otherwise identical to what phase 1 built. This is very slightly less silly than it sounds, because phase 1 involves calling non-JSONB code such as datatype output functions, which are likely to leak memory, and we don't want to leak into the aggcontext. Nonetheless, phases 2 and 3 are accomplishing exactly nothing that is useful if we can make phase 1 put the JsonbValue tree where we need it. We could probably do that with a bunch of MemoryContextSwitchTo's, but what seems more robust is to give pushJsonbValue the responsibility of building the JsonbValue tree in a specified non-current memory context. The previous patch created the infrastructure for that, and this patch simply makes the aggregate functions use it and then rips out phases 2 and 3. For me, this makes jsonb_agg() with a text column as input run about 2X faster than before. It's not yet on par with json_agg(), but this removes a whole lot of the difference. Author: Tom Lane <tgl@sss.pgh.pa.us> Reviewed-by: jian he <jian.universality@gmail.com> Reviewed-by: Chao Li <li.evan.chao@gmail.com> Discussion: https://postgr.es/m/1060917.1753202222@sss.pgh.pa.us
This commit is contained in:
@@ -27,7 +27,7 @@
|
||||
|
||||
typedef struct JsonbAggState
|
||||
{
|
||||
JsonbInState *res;
|
||||
JsonbInState pstate;
|
||||
JsonTypeCategory key_category;
|
||||
Oid key_output_func;
|
||||
JsonTypeCategory val_category;
|
||||
@@ -54,7 +54,6 @@ static void datum_to_jsonb_internal(Datum val, bool is_null, JsonbInState *resul
|
||||
bool key_scalar);
|
||||
static void add_jsonb(Datum val, bool is_null, JsonbInState *result,
|
||||
Oid val_type, bool key_scalar);
|
||||
static JsonbParseState *clone_parse_state(JsonbParseState *state);
|
||||
static char *JsonbToCStringWorker(StringInfo out, JsonbContainer *in, int estimated_len, bool indent);
|
||||
static void add_indent(StringInfo out, bool indent, int level);
|
||||
|
||||
@@ -1454,54 +1453,16 @@ close_object:
|
||||
|
||||
|
||||
/*
|
||||
* shallow clone of a parse state, suitable for use in aggregate
|
||||
* final functions that will only append to the values rather than
|
||||
* change them.
|
||||
* Functions for jsonb_agg, jsonb_object_agg, and variants
|
||||
*/
|
||||
static JsonbParseState *
|
||||
clone_parse_state(JsonbParseState *state)
|
||||
{
|
||||
JsonbParseState *result,
|
||||
*icursor,
|
||||
*ocursor;
|
||||
|
||||
if (state == NULL)
|
||||
return NULL;
|
||||
|
||||
result = palloc(sizeof(JsonbParseState));
|
||||
icursor = state;
|
||||
ocursor = result;
|
||||
for (;;)
|
||||
{
|
||||
ocursor->contVal = icursor->contVal;
|
||||
ocursor->size = icursor->size;
|
||||
ocursor->unique_keys = icursor->unique_keys;
|
||||
ocursor->skip_nulls = icursor->skip_nulls;
|
||||
icursor = icursor->next;
|
||||
if (icursor == NULL)
|
||||
break;
|
||||
ocursor->next = palloc(sizeof(JsonbParseState));
|
||||
ocursor = ocursor->next;
|
||||
}
|
||||
ocursor->next = NULL;
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
static Datum
|
||||
jsonb_agg_transfn_worker(FunctionCallInfo fcinfo, bool absent_on_null)
|
||||
{
|
||||
MemoryContext oldcontext,
|
||||
aggcontext;
|
||||
MemoryContext aggcontext;
|
||||
JsonbAggState *state;
|
||||
JsonbInState elem;
|
||||
Datum val;
|
||||
JsonbInState *result;
|
||||
bool single_scalar = false;
|
||||
JsonbIterator *it;
|
||||
Jsonb *jbelem;
|
||||
JsonbValue v;
|
||||
JsonbIteratorToken type;
|
||||
|
||||
if (!AggCheckCallContext(fcinfo, &aggcontext))
|
||||
{
|
||||
@@ -1520,12 +1481,10 @@ jsonb_agg_transfn_worker(FunctionCallInfo fcinfo, bool absent_on_null)
|
||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("could not determine input data type")));
|
||||
|
||||
oldcontext = MemoryContextSwitchTo(aggcontext);
|
||||
state = palloc(sizeof(JsonbAggState));
|
||||
result = palloc0(sizeof(JsonbInState));
|
||||
state->res = result;
|
||||
state = MemoryContextAllocZero(aggcontext, sizeof(JsonbAggState));
|
||||
result = &state->pstate;
|
||||
result->outcontext = aggcontext;
|
||||
pushJsonbValue(result, WJB_BEGIN_ARRAY, NULL);
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
json_categorize_type(arg_type, true, &state->val_category,
|
||||
&state->val_output_func);
|
||||
@@ -1533,74 +1492,23 @@ jsonb_agg_transfn_worker(FunctionCallInfo fcinfo, bool absent_on_null)
|
||||
else
|
||||
{
|
||||
state = (JsonbAggState *) PG_GETARG_POINTER(0);
|
||||
result = state->res;
|
||||
result = &state->pstate;
|
||||
}
|
||||
|
||||
if (absent_on_null && PG_ARGISNULL(1))
|
||||
PG_RETURN_POINTER(state);
|
||||
|
||||
/* turn the argument into jsonb in the normal function context */
|
||||
|
||||
/*
|
||||
* We run this code in the normal function context, so that we don't leak
|
||||
* any cruft from datatype output functions and such into the aggcontext.
|
||||
* But the "result" JsonbValue will be constructed in aggcontext, so that
|
||||
* it remains available across calls.
|
||||
*/
|
||||
val = PG_ARGISNULL(1) ? (Datum) 0 : PG_GETARG_DATUM(1);
|
||||
|
||||
memset(&elem, 0, sizeof(JsonbInState));
|
||||
|
||||
datum_to_jsonb_internal(val, PG_ARGISNULL(1), &elem, state->val_category,
|
||||
datum_to_jsonb_internal(val, PG_ARGISNULL(1), result, state->val_category,
|
||||
state->val_output_func, false);
|
||||
|
||||
jbelem = JsonbValueToJsonb(elem.result);
|
||||
|
||||
/* switch to the aggregate context for accumulation operations */
|
||||
|
||||
oldcontext = MemoryContextSwitchTo(aggcontext);
|
||||
|
||||
it = JsonbIteratorInit(&jbelem->root);
|
||||
|
||||
while ((type = JsonbIteratorNext(&it, &v, false)) != WJB_DONE)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case WJB_BEGIN_ARRAY:
|
||||
if (v.val.array.rawScalar)
|
||||
single_scalar = true;
|
||||
else
|
||||
pushJsonbValue(result, type, NULL);
|
||||
break;
|
||||
case WJB_END_ARRAY:
|
||||
if (!single_scalar)
|
||||
pushJsonbValue(result, type, NULL);
|
||||
break;
|
||||
case WJB_BEGIN_OBJECT:
|
||||
case WJB_END_OBJECT:
|
||||
pushJsonbValue(result, type, NULL);
|
||||
break;
|
||||
case WJB_ELEM:
|
||||
case WJB_KEY:
|
||||
case WJB_VALUE:
|
||||
if (v.type == jbvString)
|
||||
{
|
||||
/* copy string values in the aggregate context */
|
||||
char *buf = palloc(v.val.string.len + 1);
|
||||
|
||||
snprintf(buf, v.val.string.len + 1, "%s", v.val.string.val);
|
||||
v.val.string.val = buf;
|
||||
}
|
||||
else if (v.type == jbvNumeric)
|
||||
{
|
||||
/* same for numeric */
|
||||
v.val.numeric =
|
||||
DatumGetNumeric(DirectFunctionCall1(numeric_uplus,
|
||||
NumericGetDatum(v.val.numeric)));
|
||||
}
|
||||
pushJsonbValue(result, type, &v);
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unknown jsonb iterator token type");
|
||||
}
|
||||
}
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
PG_RETURN_POINTER(state);
|
||||
}
|
||||
|
||||
@@ -1638,17 +1546,18 @@ jsonb_agg_finalfn(PG_FUNCTION_ARGS)
|
||||
arg = (JsonbAggState *) PG_GETARG_POINTER(0);
|
||||
|
||||
/*
|
||||
* We need to do a shallow clone of the argument in case the final
|
||||
* function is called more than once, so we avoid changing the argument. A
|
||||
* shallow clone is sufficient as we aren't going to change any of the
|
||||
* values, just add the final array end marker.
|
||||
* The final function can be called more than once, so we must not change
|
||||
* the stored JsonbValue data structure. Fortunately, the WJB_END_ARRAY
|
||||
* action will only change fields in the JsonbInState struct itself, so we
|
||||
* can simply invoke pushJsonbValue on a local copy of that.
|
||||
*/
|
||||
memset(&result, 0, sizeof(JsonbInState));
|
||||
|
||||
result.parseState = clone_parse_state(arg->res->parseState);
|
||||
result = arg->pstate;
|
||||
|
||||
pushJsonbValue(&result, WJB_END_ARRAY, NULL);
|
||||
|
||||
/* We expect result.parseState == NULL after closing the array */
|
||||
Assert(result.parseState == NULL);
|
||||
|
||||
out = JsonbValueToJsonb(result.result);
|
||||
|
||||
PG_RETURN_POINTER(out);
|
||||
@@ -1658,18 +1567,10 @@ static Datum
|
||||
jsonb_object_agg_transfn_worker(FunctionCallInfo fcinfo,
|
||||
bool absent_on_null, bool unique_keys)
|
||||
{
|
||||
MemoryContext oldcontext,
|
||||
aggcontext;
|
||||
JsonbInState elem;
|
||||
MemoryContext aggcontext;
|
||||
JsonbAggState *state;
|
||||
Datum val;
|
||||
JsonbInState *result;
|
||||
bool single_scalar;
|
||||
JsonbIterator *it;
|
||||
Jsonb *jbkey,
|
||||
*jbval;
|
||||
JsonbValue v;
|
||||
JsonbIteratorToken type;
|
||||
bool skip;
|
||||
|
||||
if (!AggCheckCallContext(fcinfo, &aggcontext))
|
||||
@@ -1684,16 +1585,13 @@ jsonb_object_agg_transfn_worker(FunctionCallInfo fcinfo,
|
||||
{
|
||||
Oid arg_type;
|
||||
|
||||
oldcontext = MemoryContextSwitchTo(aggcontext);
|
||||
state = palloc(sizeof(JsonbAggState));
|
||||
result = palloc0(sizeof(JsonbInState));
|
||||
state->res = result;
|
||||
state = MemoryContextAllocZero(aggcontext, sizeof(JsonbAggState));
|
||||
result = &state->pstate;
|
||||
result->outcontext = aggcontext;
|
||||
pushJsonbValue(result, WJB_BEGIN_OBJECT, NULL);
|
||||
result->parseState->unique_keys = unique_keys;
|
||||
result->parseState->skip_nulls = absent_on_null;
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
arg_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
|
||||
|
||||
if (arg_type == InvalidOid)
|
||||
@@ -1717,11 +1615,9 @@ jsonb_object_agg_transfn_worker(FunctionCallInfo fcinfo,
|
||||
else
|
||||
{
|
||||
state = (JsonbAggState *) PG_GETARG_POINTER(0);
|
||||
result = state->res;
|
||||
result = &state->pstate;
|
||||
}
|
||||
|
||||
/* turn the argument into jsonb in the normal function context */
|
||||
|
||||
if (PG_ARGISNULL(1))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
@@ -1736,133 +1632,22 @@ jsonb_object_agg_transfn_worker(FunctionCallInfo fcinfo,
|
||||
if (skip && !unique_keys)
|
||||
PG_RETURN_POINTER(state);
|
||||
|
||||
/*
|
||||
* We run this code in the normal function context, so that we don't leak
|
||||
* any cruft from datatype output functions and such into the aggcontext.
|
||||
* But the "result" JsonbValue will be constructed in aggcontext, so that
|
||||
* it remains available across calls.
|
||||
*/
|
||||
val = PG_GETARG_DATUM(1);
|
||||
|
||||
memset(&elem, 0, sizeof(JsonbInState));
|
||||
|
||||
datum_to_jsonb_internal(val, false, &elem, state->key_category,
|
||||
datum_to_jsonb_internal(val, false, result, state->key_category,
|
||||
state->key_output_func, true);
|
||||
|
||||
jbkey = JsonbValueToJsonb(elem.result);
|
||||
|
||||
val = PG_ARGISNULL(2) ? (Datum) 0 : PG_GETARG_DATUM(2);
|
||||
|
||||
memset(&elem, 0, sizeof(JsonbInState));
|
||||
|
||||
datum_to_jsonb_internal(val, PG_ARGISNULL(2), &elem, state->val_category,
|
||||
datum_to_jsonb_internal(val, PG_ARGISNULL(2), result, state->val_category,
|
||||
state->val_output_func, false);
|
||||
|
||||
jbval = JsonbValueToJsonb(elem.result);
|
||||
|
||||
it = JsonbIteratorInit(&jbkey->root);
|
||||
|
||||
/* switch to the aggregate context for accumulation operations */
|
||||
|
||||
oldcontext = MemoryContextSwitchTo(aggcontext);
|
||||
|
||||
/*
|
||||
* keys should be scalar, and we should have already checked for that
|
||||
* above when calling datum_to_jsonb, so we only need to look for these
|
||||
* things.
|
||||
*/
|
||||
|
||||
while ((type = JsonbIteratorNext(&it, &v, false)) != WJB_DONE)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case WJB_BEGIN_ARRAY:
|
||||
if (!v.val.array.rawScalar)
|
||||
elog(ERROR, "unexpected structure for key");
|
||||
break;
|
||||
case WJB_ELEM:
|
||||
if (v.type == jbvString)
|
||||
{
|
||||
/* copy string values in the aggregate context */
|
||||
char *buf = palloc(v.val.string.len + 1);
|
||||
|
||||
snprintf(buf, v.val.string.len + 1, "%s", v.val.string.val);
|
||||
v.val.string.val = buf;
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("object keys must be strings")));
|
||||
}
|
||||
pushJsonbValue(result, WJB_KEY, &v);
|
||||
|
||||
if (skip)
|
||||
{
|
||||
v.type = jbvNull;
|
||||
pushJsonbValue(result, WJB_VALUE, &v);
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
PG_RETURN_POINTER(state);
|
||||
}
|
||||
|
||||
break;
|
||||
case WJB_END_ARRAY:
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unexpected structure for key");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
it = JsonbIteratorInit(&jbval->root);
|
||||
|
||||
single_scalar = false;
|
||||
|
||||
/*
|
||||
* values can be anything, including structured and null, so we treat them
|
||||
* as in json_agg_transfn, except that single scalars are always pushed as
|
||||
* WJB_VALUE items.
|
||||
*/
|
||||
|
||||
while ((type = JsonbIteratorNext(&it, &v, false)) != WJB_DONE)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case WJB_BEGIN_ARRAY:
|
||||
if (v.val.array.rawScalar)
|
||||
single_scalar = true;
|
||||
else
|
||||
pushJsonbValue(result, type, NULL);
|
||||
break;
|
||||
case WJB_END_ARRAY:
|
||||
if (!single_scalar)
|
||||
pushJsonbValue(result, type, NULL);
|
||||
break;
|
||||
case WJB_BEGIN_OBJECT:
|
||||
case WJB_END_OBJECT:
|
||||
pushJsonbValue(result, type, NULL);
|
||||
break;
|
||||
case WJB_ELEM:
|
||||
case WJB_KEY:
|
||||
case WJB_VALUE:
|
||||
if (v.type == jbvString)
|
||||
{
|
||||
/* copy string values in the aggregate context */
|
||||
char *buf = palloc(v.val.string.len + 1);
|
||||
|
||||
snprintf(buf, v.val.string.len + 1, "%s", v.val.string.val);
|
||||
v.val.string.val = buf;
|
||||
}
|
||||
else if (v.type == jbvNumeric)
|
||||
{
|
||||
/* same for numeric */
|
||||
v.val.numeric =
|
||||
DatumGetNumeric(DirectFunctionCall1(numeric_uplus,
|
||||
NumericGetDatum(v.val.numeric)));
|
||||
}
|
||||
pushJsonbValue(result, single_scalar ? WJB_VALUE : type, &v);
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unknown jsonb iterator token type");
|
||||
}
|
||||
}
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
PG_RETURN_POINTER(state);
|
||||
}
|
||||
|
||||
@@ -1919,18 +1704,20 @@ jsonb_object_agg_finalfn(PG_FUNCTION_ARGS)
|
||||
arg = (JsonbAggState *) PG_GETARG_POINTER(0);
|
||||
|
||||
/*
|
||||
* We need to do a shallow clone of the argument's res field in case the
|
||||
* final function is called more than once, so we avoid changing the
|
||||
* aggregate state value. A shallow clone is sufficient as we aren't
|
||||
* going to change any of the values, just add the final object end
|
||||
* marker.
|
||||
* The final function can be called more than once, so we must not change
|
||||
* the stored JsonbValue data structure. Fortunately, the WJB_END_OBJECT
|
||||
* action will only destructively change fields in the JsonbInState struct
|
||||
* itself, so we can simply invoke pushJsonbValue on a local copy of that.
|
||||
* (This technique results in running uniqueifyJsonbObject each time, but
|
||||
* for now we won't bother trying to avoid that.)
|
||||
*/
|
||||
memset(&result, 0, sizeof(JsonbInState));
|
||||
|
||||
result.parseState = clone_parse_state(arg->res->parseState);
|
||||
result = arg->pstate;
|
||||
|
||||
pushJsonbValue(&result, WJB_END_OBJECT, NULL);
|
||||
|
||||
/* We expect result.parseState == NULL after closing the object */
|
||||
Assert(result.parseState == NULL);
|
||||
|
||||
out = JsonbValueToJsonb(result.result);
|
||||
|
||||
PG_RETURN_POINTER(out);
|
||||
|
||||
Reference in New Issue
Block a user