1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-27 23:21:58 +03:00

Add parallel query support functions for assorted aggregates.

This lets us use parallel aggregate for a variety of useful cases
that didn't work before, like sum(int8), sum(numeric), several
versions of avg(), and various other functions.

Add some regression tests, as well, testing the general sanity of
these and future catalog entries.

David Rowley, reviewed by Tomas Vondra, with a few further changes
by me.
This commit is contained in:
Robert Haas
2016-04-05 14:24:59 -04:00
parent 7117685461
commit 11c8669c0c
11 changed files with 1318 additions and 80 deletions

View File

@ -436,6 +436,7 @@ static int32 numericvar_to_int32(NumericVar *var);
static bool numericvar_to_int64(NumericVar *var, int64 *result);
static void int64_to_numericvar(int64 val, NumericVar *var);
#ifdef HAVE_INT128
static bool numericvar_to_int128(NumericVar *var, int128 *result);
static void int128_to_numericvar(int128 val, NumericVar *var);
#endif
static double numeric_to_double_no_overflow(Numeric num);
@ -3348,6 +3349,77 @@ numeric_accum(PG_FUNCTION_ARGS)
PG_RETURN_POINTER(state);
}
/*
* Generic combine function for numeric aggregates which require sumX2
*/
Datum
numeric_combine(PG_FUNCTION_ARGS)
{
NumericAggState *state1;
NumericAggState *state2;
MemoryContext agg_context;
MemoryContext old_context;
if (!AggCheckCallContext(fcinfo, &agg_context))
elog(ERROR, "aggregate function called in non-aggregate context");
state1 = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
state2 = PG_ARGISNULL(1) ? NULL : (NumericAggState *) PG_GETARG_POINTER(1);
if (state2 == NULL)
PG_RETURN_POINTER(state1);
/* manually copy all fields from state2 to state1 */
if (state1 == NULL)
{
old_context = MemoryContextSwitchTo(agg_context);
state1 = makeNumericAggState(fcinfo, true);
state1->N = state2->N;
state1->NaNcount = state2->NaNcount;
state1->maxScale = state2->maxScale;
state1->maxScaleCount = state2->maxScaleCount;
init_var(&state1->sumX);
set_var_from_var(&state2->sumX, &state1->sumX);
init_var(&state1->sumX2);
set_var_from_var(&state2->sumX2, &state1->sumX2);
MemoryContextSwitchTo(old_context);
PG_RETURN_POINTER(state1);
}
if (state2->N > 0)
{
state1->N += state2->N;
state1->NaNcount += state2->NaNcount;
/*
* These are currently only needed for moving aggregates, but let's
* do the right thing anyway...
*/
if (state2->maxScale > state1->maxScale)
{
state1->maxScale = state2->maxScale;
state1->maxScaleCount = state2->maxScaleCount;
}
else if (state2->maxScale == state1->maxScale)
state1->maxScaleCount += state2->maxScaleCount;
/* The rest of this needs to work in the aggregate context */
old_context = MemoryContextSwitchTo(agg_context);
/* Accumulate sums */
add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
add_var(&(state1->sumX2), &(state2->sumX2), &(state1->sumX2));
MemoryContextSwitchTo(old_context);
}
PG_RETURN_POINTER(state1);
}
/*
* Generic transition function for numeric aggregates that don't require sumX2.
*/
@ -3368,6 +3440,307 @@ numeric_avg_accum(PG_FUNCTION_ARGS)
PG_RETURN_POINTER(state);
}
/*
* Combine function for numeric aggregates which don't require sumX2
*/
Datum
numeric_avg_combine(PG_FUNCTION_ARGS)
{
NumericAggState *state1;
NumericAggState *state2;
MemoryContext agg_context;
MemoryContext old_context;
if (!AggCheckCallContext(fcinfo, &agg_context))
elog(ERROR, "aggregate function called in non-aggregate context");
state1 = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
state2 = PG_ARGISNULL(1) ? NULL : (NumericAggState *) PG_GETARG_POINTER(1);
if (state2 == NULL)
PG_RETURN_POINTER(state1);
/* manually copy all fields from state2 to state1 */
if (state1 == NULL)
{
old_context = MemoryContextSwitchTo(agg_context);
state1 = makeNumericAggState(fcinfo, false);
state1->N = state2->N;
state1->NaNcount = state2->NaNcount;
state1->maxScale = state2->maxScale;
state1->maxScaleCount = state2->maxScaleCount;
init_var(&state1->sumX);
set_var_from_var(&state2->sumX, &state1->sumX);
MemoryContextSwitchTo(old_context);
PG_RETURN_POINTER(state1);
}
if (state2->N > 0)
{
state1->N += state2->N;
state1->NaNcount += state2->NaNcount;
/*
* These are currently only needed for moving aggregates, but let's
* do the right thing anyway...
*/
if (state2->maxScale > state1->maxScale)
{
state1->maxScale = state2->maxScale;
state1->maxScaleCount = state2->maxScaleCount;
}
else if (state2->maxScale == state1->maxScale)
state1->maxScaleCount += state2->maxScaleCount;
/* The rest of this needs to work in the aggregate context */
old_context = MemoryContextSwitchTo(agg_context);
/* Accumulate sums */
add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
MemoryContextSwitchTo(old_context);
}
PG_RETURN_POINTER(state1);
}
/*
* numeric_avg_serialize
* Serialize NumericAggState for numeric aggregates that don't require
* sumX2. Serializes NumericAggState into bytea using the standard pq API.
*
* numeric_avg_deserialize(numeric_avg_serialize(state)) must result in a state
* which matches the original input state.
*/
Datum
numeric_avg_serialize(PG_FUNCTION_ARGS)
{
NumericAggState *state;
StringInfoData buf;
Datum temp;
bytea *sumX;
bytea *result;
/* Ensure we disallow calling when not in aggregate context */
if (!AggCheckCallContext(fcinfo, NULL))
elog(ERROR, "aggregate function called in non-aggregate context");
state = (NumericAggState *) PG_GETARG_POINTER(0);
/*
* This is a little wasteful since make_result converts the NumericVar
* into a Numeric and numeric_send converts it back again. Is it worth
* splitting the tasks in numeric_send into separate functions to stop
* this? Doing so would also remove the fmgr call overhead.
*/
temp = DirectFunctionCall1(numeric_send,
NumericGetDatum(make_result(&state->sumX)));
sumX = DatumGetByteaP(temp);
pq_begintypsend(&buf);
/* N */
pq_sendint64(&buf, state->N);
/* sumX */
pq_sendbytes(&buf, VARDATA(sumX), VARSIZE(sumX) - VARHDRSZ);
/* maxScale */
pq_sendint(&buf, state->maxScale, 4);
/* maxScaleCount */
pq_sendint64(&buf, state->maxScaleCount);
/* NaNcount */
pq_sendint64(&buf, state->NaNcount);
result = pq_endtypsend(&buf);
PG_RETURN_BYTEA_P(result);
}
/*
* numeric_avg_deserialize
* Deserialize bytea into NumericAggState for numeric aggregates that
* don't require sumX2. Deserializes bytea into NumericAggState using the
* standard pq API.
*
* numeric_avg_serialize(numeric_avg_deserialize(bytea)) must result in a value
* which matches the original bytea value.
*/
Datum
numeric_avg_deserialize(PG_FUNCTION_ARGS)
{
bytea *sstate = PG_GETARG_BYTEA_P(0);
NumericAggState *result;
Datum temp;
StringInfoData buf;
if (!AggCheckCallContext(fcinfo, NULL))
elog(ERROR, "aggregate function called in non-aggregate context");
/*
* Copy the bytea into a StringInfo so that we can "receive" it using the
* standard pq API.
*/
initStringInfo(&buf);
appendBinaryStringInfo(&buf, VARDATA(sstate), VARSIZE(sstate) - VARHDRSZ);
result = makeNumericAggState(fcinfo, false);
/* N */
result->N = pq_getmsgint64(&buf);
/* sumX */
temp = DirectFunctionCall3(numeric_recv,
PointerGetDatum(&buf),
InvalidOid,
-1);
set_var_from_num(DatumGetNumeric(temp), &result->sumX);
/* maxScale */
result->maxScale = pq_getmsgint(&buf, 4);
/* maxScaleCount */
result->maxScaleCount = pq_getmsgint64(&buf);
/* NaNcount */
result->NaNcount = pq_getmsgint64(&buf);
pq_getmsgend(&buf);
pfree(buf.data);
PG_RETURN_POINTER(result);
}
/*
* numeric_serialize
* Serialization function for NumericAggState for numeric aggregates that
* require sumX2. Serializes NumericAggState into bytea using the standard
* pq API.
*
* numeric_deserialize(numeric_serialize(state)) must result in a state which
* matches the original input state.
*/
Datum
numeric_serialize(PG_FUNCTION_ARGS)
{
NumericAggState *state;
StringInfoData buf;
Datum temp;
bytea *sumX;
bytea *sumX2;
bytea *result;
/* Ensure we disallow calling when not in aggregate context */
if (!AggCheckCallContext(fcinfo, NULL))
elog(ERROR, "aggregate function called in non-aggregate context");
state = (NumericAggState *) PG_GETARG_POINTER(0);
/*
* This is a little wasteful since make_result converts the NumericVar
* into a Numeric and numeric_send converts it back again. Is it worth
* splitting the tasks in numeric_send into separate functions to stop
* this? Doing so would also remove the fmgr call overhead.
*/
temp = DirectFunctionCall1(numeric_send,
NumericGetDatum(make_result(&state->sumX)));
sumX = DatumGetByteaP(temp);
temp = DirectFunctionCall1(numeric_send,
NumericGetDatum(make_result(&state->sumX2)));
sumX2 = DatumGetByteaP(temp);
pq_begintypsend(&buf);
/* N */
pq_sendint64(&buf, state->N);
/* sumX */
pq_sendbytes(&buf, VARDATA(sumX), VARSIZE(sumX) - VARHDRSZ);
/* sumX2 */
pq_sendbytes(&buf, VARDATA(sumX2), VARSIZE(sumX2) - VARHDRSZ);
/* maxScale */
pq_sendint(&buf, state->maxScale, 4);
/* maxScaleCount */
pq_sendint64(&buf, state->maxScaleCount);
/* NaNcount */
pq_sendint64(&buf, state->NaNcount);
result = pq_endtypsend(&buf);
PG_RETURN_BYTEA_P(result);
}
/*
* numeric_deserialize
* Deserialization function for NumericAggState for numeric aggregates that
* require sumX2. Deserializes bytea into into NumericAggState using the
* standard pq API.
*
* numeric_serialize(numeric_deserialize(bytea)) must result in a value which
* matches the original bytea value.
*/
Datum
numeric_deserialize(PG_FUNCTION_ARGS)
{
bytea *sstate = PG_GETARG_BYTEA_P(0);
NumericAggState *result;
Datum temp;
StringInfoData buf;
if (!AggCheckCallContext(fcinfo, NULL))
elog(ERROR, "aggregate function called in non-aggregate context");
/*
* Copy the bytea into a StringInfo so that we can "receive" it using the
* standard pq API.
*/
initStringInfo(&buf);
appendBinaryStringInfo(&buf, VARDATA(sstate), VARSIZE(sstate) - VARHDRSZ);
result = makeNumericAggState(fcinfo, false);
/* N */
result->N = pq_getmsgint64(&buf);
/* sumX */
temp = DirectFunctionCall3(numeric_recv,
PointerGetDatum(&buf),
InvalidOid,
-1);
set_var_from_num(DatumGetNumeric(temp), &result->sumX);
/* sumX2 */
temp = DirectFunctionCall3(numeric_recv,
PointerGetDatum(&buf),
InvalidOid,
-1);
set_var_from_num(DatumGetNumeric(temp), &result->sumX2);
/* maxScale */
result->maxScale = pq_getmsgint(&buf, 4);
/* maxScaleCount */
result->maxScaleCount = pq_getmsgint64(&buf);
/* NaNcount */
result->NaNcount = pq_getmsgint64(&buf);
pq_getmsgend(&buf);
pfree(buf.data);
PG_RETURN_POINTER(result);
}
/*
* Generic inverse transition function for numeric aggregates
* (with or without requirement for X^2).
@ -3551,6 +3924,215 @@ int8_accum(PG_FUNCTION_ARGS)
PG_RETURN_POINTER(state);
}
/*
* Combine function for numeric aggregates which require sumX2
*/
Datum
numeric_poly_combine(PG_FUNCTION_ARGS)
{
PolyNumAggState *state1;
PolyNumAggState *state2;
MemoryContext agg_context;
MemoryContext old_context;
if (!AggCheckCallContext(fcinfo, &agg_context))
elog(ERROR, "aggregate function called in non-aggregate context");
state1 = PG_ARGISNULL(0) ? NULL : (PolyNumAggState *) PG_GETARG_POINTER(0);
state2 = PG_ARGISNULL(1) ? NULL : (PolyNumAggState *) PG_GETARG_POINTER(1);
if (state2 == NULL)
PG_RETURN_POINTER(state1);
/* manually copy all fields from state2 to state1 */
if (state1 == NULL)
{
old_context = MemoryContextSwitchTo(agg_context);
state1 = makePolyNumAggState(fcinfo, true);
state1->N = state2->N;
#ifdef HAVE_INT128
state1->sumX = state2->sumX;
state1->sumX2 = state2->sumX2;
#else
init_var(&(state1->sumX));
set_var_from_var(&(state2->sumX), &(state1->sumX));
init_var(&state1->sumX2);
set_var_from_var(&(state2->sumX2), &(state1->sumX2));
#endif
MemoryContextSwitchTo(old_context);
PG_RETURN_POINTER(state1);
}
if (state2->N > 0)
{
state1->N += state2->N;
#ifdef HAVE_INT128
state1->sumX += state2->sumX;
state1->sumX2 += state2->sumX2;
#else
/* The rest of this needs to work in the aggregate context */
old_context = MemoryContextSwitchTo(agg_context);
/* Accumulate sums */
add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
add_var(&(state1->sumX2), &(state2->sumX2), &(state1->sumX2));
MemoryContextSwitchTo(old_context);
#endif
}
PG_RETURN_POINTER(state1);
}
/*
* numeric_poly_serialize
* Serialize PolyNumAggState into bytea using the standard pq API for
* aggregate functions which require sumX2.
*
* numeric_poly_deserialize(numeric_poly_serialize(state)) must result in a
* state which matches the original input state.
*/
Datum
numeric_poly_serialize(PG_FUNCTION_ARGS)
{
PolyNumAggState *state;
StringInfoData buf;
bytea *sumX;
bytea *sumX2;
bytea *result;
/* Ensure we disallow calling when not in aggregate context */
if (!AggCheckCallContext(fcinfo, NULL))
elog(ERROR, "aggregate function called in non-aggregate context");
state = (PolyNumAggState *) PG_GETARG_POINTER(0);
/*
* If the platform supports int128 then sumX and sumX2 will be a 128 bit
* integer type. Here we'll convert that into a numeric type so that the
* combine state is in the same format for both int128 enabled machines
* and machines which don't support that type. The logic here is that one
* day we might like to send these over to another server for further
* processing and we want a standard format to work with.
*/
{
Datum temp;
#ifdef HAVE_INT128
NumericVar num;
init_var(&num);
int128_to_numericvar(state->sumX, &num);
temp = DirectFunctionCall1(numeric_send,
NumericGetDatum(make_result(&num)));
sumX = DatumGetByteaP(temp);
int128_to_numericvar(state->sumX2, &num);
temp = DirectFunctionCall1(numeric_send,
NumericGetDatum(make_result(&num)));
sumX2 = DatumGetByteaP(temp);
free_var(&num);
#else
temp = DirectFunctionCall1(numeric_send,
NumericGetDatum(make_result(&state->sumX)));
sumX = DatumGetByteaP(temp);
temp = DirectFunctionCall1(numeric_send,
NumericGetDatum(make_result(&state->sumX2)));
sumX2 = DatumGetByteaP(temp);
#endif
}
pq_begintypsend(&buf);
/* N */
pq_sendint64(&buf, state->N);
/* sumX */
pq_sendbytes(&buf, VARDATA(sumX), VARSIZE(sumX) - VARHDRSZ);
/* sumX2 */
pq_sendbytes(&buf, VARDATA(sumX2), VARSIZE(sumX2) - VARHDRSZ);
result = pq_endtypsend(&buf);
PG_RETURN_BYTEA_P(result);
}
/*
* numeric_poly_deserialize
* Deserialize PolyNumAggState from bytea using the standard pq API for
* aggregate functions which require sumX2.
*
* numeric_poly_serialize(numeric_poly_deserialize(bytea)) must result in a
* state which matches the original input state.
*/
Datum
numeric_poly_deserialize(PG_FUNCTION_ARGS)
{
bytea *sstate = PG_GETARG_BYTEA_P(0);
PolyNumAggState *result;
Datum sumX;
Datum sumX2;
StringInfoData buf;
if (!AggCheckCallContext(fcinfo, NULL))
elog(ERROR, "aggregate function called in non-aggregate context");
/*
* Copy the bytea into a StringInfo so that we can "receive" it using the
* standard pq API.
*/
initStringInfo(&buf);
appendBinaryStringInfo(&buf, VARDATA(sstate), VARSIZE(sstate) - VARHDRSZ);
result = makePolyNumAggState(fcinfo, false);
/* N */
result->N = pq_getmsgint64(&buf);
/* sumX */
sumX = DirectFunctionCall3(numeric_recv,
PointerGetDatum(&buf),
InvalidOid,
-1);
/* sumX2 */
sumX2 = DirectFunctionCall3(numeric_recv,
PointerGetDatum(&buf),
InvalidOid,
-1);
#ifdef HAVE_INT128
{
NumericVar num;
init_var(&num);
set_var_from_num(DatumGetNumeric(sumX), &num);
numericvar_to_int128(&num, &result->sumX);
set_var_from_num(DatumGetNumeric(sumX2), &num);
numericvar_to_int128(&num, &result->sumX2);
free_var(&num);
}
#else
set_var_from_num(DatumGetNumeric(sumX), &result->sumX);
set_var_from_num(DatumGetNumeric(sumX2), &result->sumX2);
#endif
pq_getmsgend(&buf);
pfree(buf.data);
PG_RETURN_POINTER(result);
}
/*
* Transition function for int8 input when we don't need sumX2.
*/
@ -3581,6 +4163,180 @@ int8_avg_accum(PG_FUNCTION_ARGS)
PG_RETURN_POINTER(state);
}
/*
* Combine function for PolyNumAggState for aggregates which don't require
* sumX2
*/
Datum
int8_avg_combine(PG_FUNCTION_ARGS)
{
PolyNumAggState *state1;
PolyNumAggState *state2;
MemoryContext agg_context;
MemoryContext old_context;
if (!AggCheckCallContext(fcinfo, &agg_context))
elog(ERROR, "aggregate function called in non-aggregate context");
state1 = PG_ARGISNULL(0) ? NULL : (PolyNumAggState *) PG_GETARG_POINTER(0);
state2 = PG_ARGISNULL(1) ? NULL : (PolyNumAggState *) PG_GETARG_POINTER(1);
if (state2 == NULL)
PG_RETURN_POINTER(state1);
/* manually copy all fields from state2 to state1 */
if (state1 == NULL)
{
old_context = MemoryContextSwitchTo(agg_context);
state1 = makePolyNumAggState(fcinfo, false);
state1->N = state2->N;
#ifdef HAVE_INT128
state1->sumX = state2->sumX;
#else
init_var(&state1->sumX);
set_var_from_var(&state2->sumX, &state1->sumX);
#endif
MemoryContextSwitchTo(old_context);
PG_RETURN_POINTER(state1);
}
if (state2->N > 0)
{
state1->N += state2->N;
#ifdef HAVE_INT128
state1->sumX += state2->sumX;
#else
/* The rest of this needs to work in the aggregate context */
old_context = MemoryContextSwitchTo(agg_context);
/* Accumulate sums */
add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
MemoryContextSwitchTo(old_context);
#endif
}
PG_RETURN_POINTER(state1);
}
/*
* int8_avg_serialize
* Serialize PolyNumAggState into bytea using the standard pq API.
*
* int8_avg_deserialize(int8_avg_serialize(state)) must result in a state which
* matches the original input state.
*/
Datum
int8_avg_serialize(PG_FUNCTION_ARGS)
{
PolyNumAggState *state;
StringInfoData buf;
bytea *sumX;
bytea *result;
/* Ensure we disallow calling when not in aggregate context */
if (!AggCheckCallContext(fcinfo, NULL))
elog(ERROR, "aggregate function called in non-aggregate context");
state = (PolyNumAggState *) PG_GETARG_POINTER(0);
/*
* If the platform supports int128 then sumX will be a 128 integer type.
* Here we'll convert that into a numeric type so that the combine state
* is in the same format for both int128 enabled machines and machines
* which don't support that type. The logic here is that one day we might
* like to send these over to another server for further processing and we
* want a standard format to work with.
*/
{
Datum temp;
#ifdef HAVE_INT128
NumericVar num;
init_var(&num);
int128_to_numericvar(state->sumX, &num);
temp = DirectFunctionCall1(numeric_send,
NumericGetDatum(make_result(&num)));
free_var(&num);
sumX = DatumGetByteaP(temp);
#else
temp = DirectFunctionCall1(numeric_send,
NumericGetDatum(make_result(&state->sumX)));
sumX = DatumGetByteaP(temp);
#endif
}
pq_begintypsend(&buf);
/* N */
pq_sendint64(&buf, state->N);
/* sumX */
pq_sendbytes(&buf, VARDATA(sumX), VARSIZE(sumX) - VARHDRSZ);
result = pq_endtypsend(&buf);
PG_RETURN_BYTEA_P(result);
}
/*
* int8_avg_deserialize
* Deserialize bytea back into PolyNumAggState.
*
* int8_avg_serialize(int8_avg_deserialize(bytea)) must result in a value which
* matches the original bytea value.
*/
Datum
int8_avg_deserialize(PG_FUNCTION_ARGS)
{
bytea *sstate = PG_GETARG_BYTEA_P(0);
PolyNumAggState *result;
StringInfoData buf;
Datum temp;
if (!AggCheckCallContext(fcinfo, NULL))
elog(ERROR, "aggregate function called in non-aggregate context");
/*
* Copy the bytea into a StringInfo so that we can "receive" it using the
* standard pq API.
*/
initStringInfo(&buf);
appendBinaryStringInfo(&buf, VARDATA(sstate), VARSIZE(sstate) - VARHDRSZ);
result = makePolyNumAggState(fcinfo, false);
/* N */
result->N = pq_getmsgint64(&buf);
/* sumX */
temp = DirectFunctionCall3(numeric_recv,
PointerGetDatum(&buf),
InvalidOid,
-1);
#ifdef HAVE_INT128
{
NumericVar num;
init_var(&num);
set_var_from_num(DatumGetNumeric(temp), &num);
numericvar_to_int128(&num, &result->sumX);
free_var(&num);
}
#else
set_var_from_num(DatumGetNumeric(temp), &result->sumX);
#endif
pq_getmsgend(&buf);
pfree(buf.data);
PG_RETURN_POINTER(result);
}
/*
* Inverse transition functions to go with the above.
@ -4309,6 +5065,37 @@ int4_avg_accum(PG_FUNCTION_ARGS)
PG_RETURN_ARRAYTYPE_P(transarray);
}
Datum
int4_avg_combine(PG_FUNCTION_ARGS)
{
ArrayType *transarray1;
ArrayType *transarray2;
Int8TransTypeData *state1;
Int8TransTypeData *state2;
if (!AggCheckCallContext(fcinfo, NULL))
elog(ERROR, "aggregate function called in non-aggregate context");
transarray1 = PG_GETARG_ARRAYTYPE_P(0);
transarray2 = PG_GETARG_ARRAYTYPE_P(1);
if (ARR_HASNULL(transarray1) ||
ARR_SIZE(transarray1) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
elog(ERROR, "expected 2-element int8 array");
if (ARR_HASNULL(transarray2) ||
ARR_SIZE(transarray2) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
elog(ERROR, "expected 2-element int8 array");
state1 = (Int8TransTypeData *) ARR_DATA_PTR(transarray1);
state2 = (Int8TransTypeData *) ARR_DATA_PTR(transarray2);
state1->count += state2->count;
state1->sum += state2->sum;
PG_RETURN_ARRAYTYPE_P(transarray1);
}
Datum
int2_avg_accum_inv(PG_FUNCTION_ARGS)
{
@ -5307,6 +6094,79 @@ int64_to_numericvar(int64 val, NumericVar *var)
}
#ifdef HAVE_INT128
/*
* Convert numeric to int128, rounding if needed.
*
* If overflow, return FALSE (no error is raised). Return TRUE if okay.
*/
static bool
numericvar_to_int128(NumericVar *var, int128 *result)
{
NumericDigit *digits;
int ndigits;
int weight;
int i;
int128 val,
oldval;
bool neg;
NumericVar rounded;
/* Round to nearest integer */
init_var(&rounded);
set_var_from_var(var, &rounded);
round_var(&rounded, 0);
/* Check for zero input */
strip_var(&rounded);
ndigits = rounded.ndigits;
if (ndigits == 0)
{
*result = 0;
free_var(&rounded);
return true;
}
/*
* For input like 10000000000, we must treat stripped digits as real. So
* the loop assumes there are weight+1 digits before the decimal point.
*/
weight = rounded.weight;
Assert(weight >= 0 && ndigits <= weight + 1);
/* Construct the result */
digits = rounded.digits;
neg = (rounded.sign == NUMERIC_NEG);
val = digits[0];
for (i = 1; i <= weight; i++)
{
oldval = val;
val *= NBASE;
if (i < ndigits)
val += digits[i];
/*
* The overflow check is a bit tricky because we want to accept
* INT128_MIN, which will overflow the positive accumulator. We can
* detect this case easily though because INT128_MIN is the only
* nonzero value for which -val == val (on a two's complement machine,
* anyway).
*/
if ((val / NBASE) != oldval) /* possible overflow? */
{
if (!neg || (-val) != val || val == 0 || oldval < 0)
{
free_var(&rounded);
return false;
}
}
}
free_var(&rounded);
*result = neg ? -val : val;
return true;
}
/*
* Convert 128 bit integer to numeric.
*/