mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-31 10:30:33 +03:00 
			
		
		
		
	Allow parallel aggregate on string_agg and array_agg
This adds combine, serial and deserial functions for the array_agg() and string_agg() aggregate functions, thus allowing these aggregates to partake in partial aggregations. This allows both parallel aggregation to take place when these aggregates are present and also allows additional partition-wise aggregation plan shapes to include plans that require additional aggregation once the partially aggregated results from the partitions have been combined. Author: David Rowley Reviewed-by: Andres Freund, Tomas Vondra, Stephen Frost, Tom Lane Discussion: https://postgr.es/m/CAKJS1f9sx_6GTcvd6TMuZnNtCh0VhBzhX6FZqw17TgVFH-ga_A@mail.gmail.com
This commit is contained in:
		| @@ -19746,7 +19746,7 @@ SELECT NULLIF(value, '(none)') ... | ||||
|        <para> | ||||
|         Collects all the input values, including nulls, into an array. | ||||
|        </para></entry> | ||||
|        <entry>No</entry> | ||||
|        <entry>Yes</entry> | ||||
|       </row> | ||||
| 
 | ||||
|       <row> | ||||
| @@ -19759,7 +19759,7 @@ SELECT NULLIF(value, '(none)') ... | ||||
|         dimension.  (The inputs must all have the same dimensionality, and | ||||
|         cannot be empty or null.) | ||||
|        </para></entry> | ||||
|        <entry>No</entry> | ||||
|        <entry>Yes</entry> | ||||
|       </row> | ||||
| 
 | ||||
|       <row> | ||||
| @@ -20099,7 +20099,7 @@ SELECT NULLIF(value, '(none)') ... | ||||
|         after the first is preceded by the | ||||
|         corresponding <parameter>delimiter</parameter> (if it's not null). | ||||
|        </para></entry> | ||||
|        <entry>No</entry> | ||||
|        <entry>Yes</entry> | ||||
|       </row> | ||||
| 
 | ||||
|       <row> | ||||
|   | ||||
| @@ -305,10 +305,30 @@ preprocess_aggref(Aggref *aggref, PlannerInfo *root) | ||||
| 				 * functions; if not, we can't serialize partial-aggregation | ||||
| 				 * results. | ||||
| 				 */ | ||||
| 				else if (transinfo->aggtranstype == INTERNALOID && | ||||
| 						 (!OidIsValid(transinfo->serialfn_oid) || | ||||
| 						  !OidIsValid(transinfo->deserialfn_oid))) | ||||
| 					root->hasNonSerialAggs = true; | ||||
| 				else if (transinfo->aggtranstype == INTERNALOID) | ||||
| 				{ | ||||
|  | ||||
| 					if (!OidIsValid(transinfo->serialfn_oid) || | ||||
| 						!OidIsValid(transinfo->deserialfn_oid)) | ||||
| 						root->hasNonSerialAggs = true; | ||||
|  | ||||
| 					/* | ||||
| 					 * array_agg_serialize and array_agg_deserialize make use | ||||
| 					 * of the aggregate non-byval input type's send and | ||||
| 					 * receive functions.  There's a chance that the type | ||||
| 					 * being aggregated has one or both of these functions | ||||
| 					 * missing.  In this case we must not allow the | ||||
| 					 * aggregate's serial and deserial functions to be used. | ||||
| 					 * It would be nice not to have to special case this and | ||||
| 					 * instead provide some sort of supporting function within | ||||
| 					 * the aggregate to do this, but for now, that seems like | ||||
| 					 * overkill for this one case. | ||||
| 					 */ | ||||
| 					if ((transinfo->serialfn_oid == F_ARRAY_AGG_SERIALIZE || | ||||
| 						 transinfo->deserialfn_oid == F_ARRAY_AGG_DESERIALIZE) && | ||||
| 						!agg_args_support_sendreceive(aggref)) | ||||
| 						root->hasNonSerialAggs = true; | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		agginfo->transno = transno; | ||||
|   | ||||
| @@ -14,6 +14,7 @@ | ||||
|  */ | ||||
| #include "postgres.h" | ||||
|  | ||||
| #include "access/htup_details.h" | ||||
| #include "catalog/pg_aggregate.h" | ||||
| #include "catalog/pg_constraint.h" | ||||
| #include "catalog/pg_type.h" | ||||
| @@ -28,7 +29,7 @@ | ||||
| #include "rewrite/rewriteManip.h" | ||||
| #include "utils/builtins.h" | ||||
| #include "utils/lsyscache.h" | ||||
|  | ||||
| #include "utils/syscache.h" | ||||
|  | ||||
| typedef struct | ||||
| { | ||||
| @@ -1947,6 +1948,40 @@ resolve_aggregate_transtype(Oid aggfuncid, | ||||
| 	return aggtranstype; | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * agg_args_support_sendreceive | ||||
|  *		Check whether every pass-by-reference argument type of 'aggref' | ||||
|  *		has both a send and a receive function. | ||||
|  * | ||||
|  * Returns false as soon as one such type is found to lack either function, | ||||
|  * and true otherwise.  Pass-by-value argument types are not checked. | ||||
|  */ | ||||
| bool | ||||
| agg_args_support_sendreceive(Aggref *aggref) | ||||
| { | ||||
| 	ListCell   *cell; | ||||
|  | ||||
| 	foreach(cell, aggref->args) | ||||
| 	{ | ||||
| 		TargetEntry *arg = (TargetEntry *) lfirst(cell); | ||||
| 		Oid			argtype = exprType((Node *) arg->expr); | ||||
| 		HeapTuple	tup; | ||||
| 		Form_pg_type typform; | ||||
| 		bool		supported; | ||||
|  | ||||
| 		tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(argtype)); | ||||
| 		if (!HeapTupleIsValid(tup)) | ||||
| 			elog(ERROR, "cache lookup failed for type %u", argtype); | ||||
|  | ||||
| 		typform = (Form_pg_type) GETSTRUCT(tup); | ||||
|  | ||||
| 		/* Read what we need before letting go of the cache entry */ | ||||
| 		supported = typform->typbyval || | ||||
| 			(OidIsValid(typform->typsend) && OidIsValid(typform->typreceive)); | ||||
|  | ||||
| 		ReleaseSysCache(tup); | ||||
|  | ||||
| 		if (!supported) | ||||
| 			return false; | ||||
| 	} | ||||
|  | ||||
| 	return true; | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Create an expression tree for the transition function of an aggregate. | ||||
|  * This is needed so that polymorphic functions can be used within an | ||||
|   | ||||
| @@ -13,12 +13,33 @@ | ||||
| #include "postgres.h" | ||||
|  | ||||
| #include "catalog/pg_type.h" | ||||
| #include "libpq/pqformat.h" | ||||
| #include "common/int.h" | ||||
| #include "port/pg_bitutils.h" | ||||
| #include "utils/array.h" | ||||
| #include "utils/datum.h" | ||||
| #include "utils/builtins.h" | ||||
| #include "utils/lsyscache.h" | ||||
| #include "utils/typcache.h" | ||||
|  | ||||
| /* | ||||
|  * SerialIOData | ||||
|  *		Used for caching element-type data in array_agg_serialize | ||||
|  * | ||||
|  * An instance is stored in the calling function's fn_extra so that the | ||||
|  * element type's send function is only looked up once. | ||||
|  */ | ||||
| typedef struct SerialIOData | ||||
| { | ||||
| 	FmgrInfo	typsend;		/* lookup info for element type's send function */ | ||||
| } SerialIOData; | ||||
|  | ||||
| /* | ||||
|  * DeserialIOData | ||||
|  *		Used for caching element-type data in array_agg_deserialize | ||||
|  * | ||||
|  * An instance is stored in the calling function's fn_extra so that the | ||||
|  * element type's receive function is only looked up once. | ||||
|  */ | ||||
| typedef struct DeserialIOData | ||||
| { | ||||
| 	FmgrInfo	typreceive;		/* lookup info for element type's receive function */ | ||||
| 	Oid			typioparam;		/* typioparam to pass to the receive function */ | ||||
| } DeserialIOData; | ||||
|  | ||||
| static Datum array_position_common(FunctionCallInfo fcinfo); | ||||
|  | ||||
| @@ -499,6 +520,316 @@ array_agg_transfn(PG_FUNCTION_ARGS) | ||||
| 	PG_RETURN_POINTER(state); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * array_agg_combine | ||||
|  *		Combine two ArrayBuildState transition states into one. | ||||
|  * | ||||
|  * Argument 0 (state1) is the state being accumulated into and argument 1 | ||||
|  * (state2) is the state to append; either may be NULL.  The elements of | ||||
|  * state2 are copied after those of state1 and state1 is returned.  SQL NULL | ||||
|  * is returned only when both inputs are NULL. | ||||
|  * | ||||
|  * Raises an error if not called in an aggregate context. | ||||
|  */ | ||||
| Datum | ||||
| array_agg_combine(PG_FUNCTION_ARGS) | ||||
| { | ||||
| 	ArrayBuildState *state1; | ||||
| 	ArrayBuildState *state2; | ||||
| 	MemoryContext agg_context; | ||||
| 	MemoryContext old_context; | ||||
|  | ||||
| 	if (!AggCheckCallContext(fcinfo, &agg_context)) | ||||
| 		elog(ERROR, "aggregate function called in non-aggregate context"); | ||||
|  | ||||
| 	state1 = PG_ARGISNULL(0) ? NULL : (ArrayBuildState *) PG_GETARG_POINTER(0); | ||||
| 	state2 = PG_ARGISNULL(1) ? NULL : (ArrayBuildState *) PG_GETARG_POINTER(1); | ||||
|  | ||||
| 	if (state2 == NULL) | ||||
| 	{ | ||||
| 		/* | ||||
| 		 * NULL state2 is easy, just return state1, which we know is already | ||||
| 		 * in the agg_context | ||||
| 		 */ | ||||
| 		if (state1 == NULL) | ||||
| 			PG_RETURN_NULL(); | ||||
| 		PG_RETURN_POINTER(state1); | ||||
| 	} | ||||
|  | ||||
| 	if (state1 == NULL) | ||||
| 	{ | ||||
| 		/* We must copy state2's data into the agg_context */ | ||||
| 		state1 = initArrayResultWithSize(state2->element_type, agg_context, | ||||
| 										 false, state2->alen); | ||||
|  | ||||
| 		/* datumCopy allocates in the current context, so switch first */ | ||||
| 		old_context = MemoryContextSwitchTo(agg_context); | ||||
|  | ||||
| 		for (int i = 0; i < state2->nelems; i++) | ||||
| 		{ | ||||
| 			if (!state2->dnulls[i]) | ||||
| 				state1->dvalues[i] = datumCopy(state2->dvalues[i], | ||||
| 											   state1->typbyval, | ||||
| 											   state1->typlen); | ||||
| 			else | ||||
| 				state1->dvalues[i] = (Datum) 0; | ||||
| 		} | ||||
|  | ||||
| 		MemoryContextSwitchTo(old_context); | ||||
|  | ||||
| 		memcpy(state1->dnulls, state2->dnulls, sizeof(bool) * state2->nelems); | ||||
|  | ||||
| 		state1->nelems = state2->nelems; | ||||
|  | ||||
| 		PG_RETURN_POINTER(state1); | ||||
| 	} | ||||
| 	else if (state2->nelems > 0) | ||||
| 	{ | ||||
| 		/* We only need to combine the two states if state2 has any elements */ | ||||
| 		int			reqsize = state1->nelems + state2->nelems; | ||||
| 		MemoryContext oldContext = MemoryContextSwitchTo(state1->mcontext); | ||||
|  | ||||
| 		Assert(state1->element_type == state2->element_type); | ||||
|  | ||||
| 		/* Enlarge state1 arrays if needed */ | ||||
| 		if (state1->alen < reqsize) | ||||
| 		{ | ||||
| 			/* Use a power of 2 size rather than allocating just reqsize */ | ||||
| 			state1->alen = pg_nextpower2_32(reqsize); | ||||
| 			state1->dvalues = (Datum *) repalloc(state1->dvalues, | ||||
| 												 state1->alen * sizeof(Datum)); | ||||
| 			state1->dnulls = (bool *) repalloc(state1->dnulls, | ||||
| 											   state1->alen * sizeof(bool)); | ||||
| 		} | ||||
|  | ||||
| 		/* Copy in the state2 elements to the end of the state1 arrays */ | ||||
| 		for (int i = 0; i < state2->nelems; i++) | ||||
| 		{ | ||||
| 			if (!state2->dnulls[i]) | ||||
| 				state1->dvalues[i + state1->nelems] = | ||||
| 					datumCopy(state2->dvalues[i], | ||||
| 							  state1->typbyval, | ||||
| 							  state1->typlen); | ||||
| 			else | ||||
| 				state1->dvalues[i + state1->nelems] = (Datum) 0; | ||||
| 		} | ||||
|  | ||||
| 		/* The null flags go after state1's existing flags */ | ||||
| 		memcpy(&state1->dnulls[state1->nelems], state2->dnulls, | ||||
| 			   sizeof(bool) * state2->nelems); | ||||
|  | ||||
| 		/* Only now bump nelems; the copies above used the old value as offset */ | ||||
| 		state1->nelems = reqsize; | ||||
|  | ||||
| 		MemoryContextSwitchTo(oldContext); | ||||
| 	} | ||||
|  | ||||
| 	PG_RETURN_POINTER(state1); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * array_agg_serialize | ||||
|  *		Serialize ArrayBuildState into bytea. | ||||
|  * | ||||
|  * The fields are written in this order: element_type, nelems, typlen, | ||||
|  * typbyval, typalign, the dnulls array, and finally the element values. | ||||
|  * array_agg_deserialize must read them back in the same order. | ||||
|  */ | ||||
| Datum | ||||
| array_agg_serialize(PG_FUNCTION_ARGS) | ||||
| { | ||||
| 	ArrayBuildState *state; | ||||
| 	StringInfoData buf; | ||||
| 	bytea	   *result; | ||||
|  | ||||
| 	/* cannot be called directly because of internal-type argument */ | ||||
| 	Assert(AggCheckCallContext(fcinfo, NULL)); | ||||
|  | ||||
| 	state = (ArrayBuildState *) PG_GETARG_POINTER(0); | ||||
|  | ||||
| 	pq_begintypsend(&buf); | ||||
|  | ||||
| 	/* | ||||
| 	 * element_type. Putting this first is more convenient in deserialization | ||||
| 	 */ | ||||
| 	pq_sendint32(&buf, state->element_type); | ||||
|  | ||||
| 	/* | ||||
| 	 * nelems -- sent ahead of the arrays so we know how large to make the | ||||
| 	 * dvalues and dnulls array during deserialization. | ||||
| 	 */ | ||||
| 	pq_sendint64(&buf, state->nelems); | ||||
|  | ||||
| 	/* alen can be decided during deserialization */ | ||||
|  | ||||
| 	/* typlen */ | ||||
| 	pq_sendint16(&buf, state->typlen); | ||||
|  | ||||
| 	/* typbyval */ | ||||
| 	pq_sendbyte(&buf, state->typbyval); | ||||
|  | ||||
| 	/* typalign */ | ||||
| 	pq_sendbyte(&buf, state->typalign); | ||||
|  | ||||
| 	/* dnulls */ | ||||
| 	pq_sendbytes(&buf, (char *) state->dnulls, sizeof(bool) * state->nelems); | ||||
|  | ||||
| 	/* | ||||
| 	 * dvalues.  By agreement with array_agg_deserialize, when the element | ||||
| 	 * type is byval, we just transmit the Datum array as-is, including any | ||||
| 	 * null elements.  For by-ref types, we must invoke the element type's | ||||
| 	 * send function, and we skip null elements (which is why the nulls flags | ||||
| 	 * must be sent first). | ||||
| 	 */ | ||||
| 	if (state->typbyval) | ||||
| 		pq_sendbytes(&buf, (char *) state->dvalues, | ||||
| 					 sizeof(Datum) * state->nelems); | ||||
| 	else | ||||
| 	{ | ||||
| 		SerialIOData *iodata; | ||||
| 		int			i; | ||||
|  | ||||
| 		/* Avoid repeat catalog lookups for typsend function */ | ||||
| 		iodata = (SerialIOData *) fcinfo->flinfo->fn_extra; | ||||
| 		if (iodata == NULL) | ||||
| 		{ | ||||
| 			Oid			typsend; | ||||
| 			bool		typisvarlena; | ||||
|  | ||||
| 			/* Allocate in fn_mcxt, alongside the fn_extra pointer that refers to it */ | ||||
| 			iodata = (SerialIOData *) | ||||
| 				MemoryContextAlloc(fcinfo->flinfo->fn_mcxt, | ||||
| 								   sizeof(SerialIOData)); | ||||
| 			getTypeBinaryOutputInfo(state->element_type, &typsend, | ||||
| 									&typisvarlena); | ||||
| 			fmgr_info_cxt(typsend, &iodata->typsend, | ||||
| 						  fcinfo->flinfo->fn_mcxt); | ||||
| 			fcinfo->flinfo->fn_extra = (void *) iodata; | ||||
| 		} | ||||
|  | ||||
| 		for (i = 0; i < state->nelems; i++) | ||||
| 		{ | ||||
| 			bytea	   *outputbytes; | ||||
|  | ||||
| 			if (state->dnulls[i]) | ||||
| 				continue; | ||||
| 			outputbytes = SendFunctionCall(&iodata->typsend, | ||||
| 										   state->dvalues[i]); | ||||
| 			/* Each element goes out as a 4-byte length followed by its data */ | ||||
| 			pq_sendint32(&buf, VARSIZE(outputbytes) - VARHDRSZ); | ||||
| 			pq_sendbytes(&buf, VARDATA(outputbytes), | ||||
| 						 VARSIZE(outputbytes) - VARHDRSZ); | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	result = pq_endtypsend(&buf); | ||||
|  | ||||
| 	PG_RETURN_BYTEA_P(result); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * array_agg_deserialize | ||||
|  *		Rebuild an ArrayBuildState from the bytea produced by | ||||
|  *		array_agg_serialize. | ||||
|  * | ||||
|  * The fields are read in the order array_agg_serialize wrote them.  The | ||||
|  * resulting state is created in CurrentMemoryContext. | ||||
|  * | ||||
|  * Raises an error if not called in an aggregate context, or if a by-ref | ||||
|  * element's length word is negative or exceeds the remaining input. | ||||
|  */ | ||||
| Datum | ||||
| array_agg_deserialize(PG_FUNCTION_ARGS) | ||||
| { | ||||
| 	bytea	   *sstate; | ||||
| 	ArrayBuildState *result; | ||||
| 	StringInfoData buf; | ||||
| 	Oid			element_type; | ||||
| 	int64		nelems; | ||||
| 	const char *temp; | ||||
|  | ||||
| 	if (!AggCheckCallContext(fcinfo, NULL)) | ||||
| 		elog(ERROR, "aggregate function called in non-aggregate context"); | ||||
|  | ||||
| 	sstate = PG_GETARG_BYTEA_PP(0); | ||||
|  | ||||
| 	/* | ||||
| 	 * Copy the bytea into a StringInfo so that we can "receive" it using the | ||||
| 	 * standard recv-function infrastructure. | ||||
| 	 */ | ||||
| 	initStringInfo(&buf); | ||||
| 	appendBinaryStringInfo(&buf, | ||||
| 						   VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate)); | ||||
|  | ||||
| 	/* element_type */ | ||||
| 	element_type = pq_getmsgint(&buf, 4); | ||||
|  | ||||
| 	/* nelems */ | ||||
| 	nelems = pq_getmsgint64(&buf); | ||||
|  | ||||
| 	/* Create output ArrayBuildState with the needed number of elements */ | ||||
| 	result = initArrayResultWithSize(element_type, CurrentMemoryContext, | ||||
| 									 false, nelems); | ||||
| 	result->nelems = nelems; | ||||
|  | ||||
| 	/* typlen */ | ||||
| 	result->typlen = pq_getmsgint(&buf, 2); | ||||
|  | ||||
| 	/* typbyval */ | ||||
| 	result->typbyval = pq_getmsgbyte(&buf); | ||||
|  | ||||
| 	/* typalign */ | ||||
| 	result->typalign = pq_getmsgbyte(&buf); | ||||
|  | ||||
| 	/* dnulls */ | ||||
| 	temp = pq_getmsgbytes(&buf, sizeof(bool) * nelems); | ||||
| 	memcpy(result->dnulls, temp, sizeof(bool) * nelems); | ||||
|  | ||||
| 	/* dvalues --- see comment in array_agg_serialize */ | ||||
| 	if (result->typbyval) | ||||
| 	{ | ||||
| 		temp = pq_getmsgbytes(&buf, sizeof(Datum) * nelems); | ||||
| 		memcpy(result->dvalues, temp, sizeof(Datum) * nelems); | ||||
| 	} | ||||
| 	else | ||||
| 	{ | ||||
| 		DeserialIOData *iodata; | ||||
|  | ||||
| 		/* Avoid repeat catalog lookups for typreceive function */ | ||||
| 		iodata = (DeserialIOData *) fcinfo->flinfo->fn_extra; | ||||
| 		if (iodata == NULL) | ||||
| 		{ | ||||
| 			Oid			typreceive; | ||||
|  | ||||
| 			/* Allocate in fn_mcxt, alongside the fn_extra pointer that refers to it */ | ||||
| 			iodata = (DeserialIOData *) | ||||
| 				MemoryContextAlloc(fcinfo->flinfo->fn_mcxt, | ||||
| 								   sizeof(DeserialIOData)); | ||||
| 			getTypeBinaryInputInfo(element_type, &typreceive, | ||||
| 								   &iodata->typioparam); | ||||
| 			fmgr_info_cxt(typreceive, &iodata->typreceive, | ||||
| 						  fcinfo->flinfo->fn_mcxt); | ||||
| 			fcinfo->flinfo->fn_extra = (void *) iodata; | ||||
| 		} | ||||
|  | ||||
| 		for (int i = 0; i < nelems; i++) | ||||
| 		{ | ||||
| 			int			itemlen; | ||||
| 			StringInfoData elem_buf; | ||||
| 			char		csave; | ||||
|  | ||||
| 			/* Null elements were not transmitted; nothing to read for them */ | ||||
| 			if (result->dnulls[i]) | ||||
| 			{ | ||||
| 				result->dvalues[i] = (Datum) 0; | ||||
| 				continue; | ||||
| 			} | ||||
|  | ||||
| 			itemlen = pq_getmsgint(&buf, 4); | ||||
| 			if (itemlen < 0 || itemlen > (buf.len - buf.cursor)) | ||||
| 				ereport(ERROR, | ||||
| 						(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), | ||||
| 						 errmsg("insufficient data left in message"))); | ||||
|  | ||||
| 			/* | ||||
| 			 * Rather than copying data around, we just set up a phony | ||||
| 			 * StringInfo pointing to the correct portion of the input buffer. | ||||
| 			 * We assume we can scribble on the input buffer so as to maintain | ||||
| 			 * the convention that StringInfos have a trailing null. | ||||
| 			 */ | ||||
| 			elem_buf.data = &buf.data[buf.cursor]; | ||||
| 			elem_buf.maxlen = itemlen + 1; | ||||
| 			elem_buf.len = itemlen; | ||||
| 			elem_buf.cursor = 0; | ||||
|  | ||||
| 			buf.cursor += itemlen; | ||||
|  | ||||
| 			/* Save the byte we are about to overwrite with the terminator */ | ||||
| 			csave = buf.data[buf.cursor]; | ||||
| 			buf.data[buf.cursor] = '\0'; | ||||
|  | ||||
| 			/* Now call the element's receiveproc */ | ||||
| 			result->dvalues[i] = ReceiveFunctionCall(&iodata->typreceive, | ||||
| 													 &elem_buf, | ||||
| 													 iodata->typioparam, | ||||
| 													 -1); | ||||
|  | ||||
| 			/* Restore the overwritten byte; it belongs to the next item */ | ||||
| 			buf.data[buf.cursor] = csave; | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	pq_getmsgend(&buf); | ||||
| 	pfree(buf.data); | ||||
|  | ||||
| 	PG_RETURN_POINTER(result); | ||||
| } | ||||
|  | ||||
| Datum | ||||
| array_agg_finalfn(PG_FUNCTION_ARGS) | ||||
| { | ||||
| @@ -578,6 +909,299 @@ array_agg_array_transfn(PG_FUNCTION_ARGS) | ||||
| 	PG_RETURN_POINTER(state); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * array_agg_array_combine | ||||
|  *		Combine two ArrayBuildStateArr transition states into one. | ||||
|  * | ||||
|  * Argument 0 (state1) is the state being accumulated into and argument 1 | ||||
|  * (state2) is the state to append; either may be NULL.  The data and null | ||||
|  * bitmap of state2 are appended to state1, state1's first dimension is | ||||
|  * extended accordingly, and state1 is returned.  SQL NULL is returned only | ||||
|  * when both inputs are NULL. | ||||
|  * | ||||
|  * Raises an error if not called in an aggregate context, or if the two | ||||
|  * states hold arrays of different dimensionality. | ||||
|  */ | ||||
| Datum | ||||
| array_agg_array_combine(PG_FUNCTION_ARGS) | ||||
| { | ||||
| 	ArrayBuildStateArr *state1; | ||||
| 	ArrayBuildStateArr *state2; | ||||
| 	MemoryContext agg_context; | ||||
| 	MemoryContext old_context; | ||||
|  | ||||
| 	if (!AggCheckCallContext(fcinfo, &agg_context)) | ||||
| 		elog(ERROR, "aggregate function called in non-aggregate context"); | ||||
|  | ||||
| 	state1 = PG_ARGISNULL(0) ? NULL : (ArrayBuildStateArr *) PG_GETARG_POINTER(0); | ||||
| 	state2 = PG_ARGISNULL(1) ? NULL : (ArrayBuildStateArr *) PG_GETARG_POINTER(1); | ||||
|  | ||||
| 	if (state2 == NULL) | ||||
| 	{ | ||||
| 		/* | ||||
| 		 * NULL state2 is easy, just return state1, which we know is already | ||||
| 		 * in the agg_context | ||||
| 		 */ | ||||
| 		if (state1 == NULL) | ||||
| 			PG_RETURN_NULL(); | ||||
| 		PG_RETURN_POINTER(state1); | ||||
| 	} | ||||
|  | ||||
| 	if (state1 == NULL) | ||||
| 	{ | ||||
| 		/* We must copy state2's data into the agg_context */ | ||||
| 		old_context = MemoryContextSwitchTo(agg_context); | ||||
|  | ||||
| 		state1 = initArrayResultArr(state2->array_type, InvalidOid, | ||||
| 									agg_context, false); | ||||
|  | ||||
| 		state1->abytes = state2->abytes; | ||||
| 		state1->data = (char *) palloc(state1->abytes); | ||||
|  | ||||
| 		if (state2->nullbitmap) | ||||
| 		{ | ||||
| 			int			size = (state2->aitems + 7) / 8; | ||||
|  | ||||
| 			state1->nullbitmap = (bits8 *) palloc(size); | ||||
| 			memcpy(state1->nullbitmap, state2->nullbitmap, size); | ||||
| 		} | ||||
|  | ||||
| 		memcpy(state1->data, state2->data, state2->nbytes); | ||||
| 		state1->nbytes = state2->nbytes; | ||||
| 		state1->aitems = state2->aitems; | ||||
| 		state1->nitems = state2->nitems; | ||||
| 		state1->ndims = state2->ndims; | ||||
| 		memcpy(state1->dims, state2->dims, sizeof(state2->dims)); | ||||
| 		memcpy(state1->lbs, state2->lbs, sizeof(state2->lbs)); | ||||
| 		state1->array_type = state2->array_type; | ||||
| 		state1->element_type = state2->element_type; | ||||
|  | ||||
| 		MemoryContextSwitchTo(old_context); | ||||
|  | ||||
| 		PG_RETURN_POINTER(state1); | ||||
| 	} | ||||
|  | ||||
| 	/* We only need to combine the two states if state2 has any items */ | ||||
| 	else if (state2->nitems > 0) | ||||
| 	{ | ||||
| 		MemoryContext oldContext; | ||||
| 		int			reqsize = state1->nbytes + state2->nbytes; | ||||
| 		int			i; | ||||
|  | ||||
| 		/* | ||||
| 		 * Check the states are compatible with each other.  Ensure we use the | ||||
| 		 * same error messages that are listed in accumArrayResultArr so that | ||||
| 		 * the same error is shown as would have been if we'd not used the | ||||
| 		 * combine function for the aggregation. | ||||
| 		 */ | ||||
| 		if (state1->ndims != state2->ndims) | ||||
| 			ereport(ERROR, | ||||
| 					(errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR), | ||||
| 					 errmsg("cannot accumulate arrays of different dimensionality"))); | ||||
|  | ||||
| 		/* Check dimensions match ignoring the first dimension. */ | ||||
| 		for (i = 1; i < state1->ndims; i++) | ||||
| 		{ | ||||
| 			if (state1->dims[i] != state2->dims[i] || state1->lbs[i] != state2->lbs[i]) | ||||
| 				ereport(ERROR, | ||||
| 						(errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR), | ||||
| 						 errmsg("cannot accumulate arrays of different dimensionality"))); | ||||
| 		} | ||||
|  | ||||
|  | ||||
| 		oldContext = MemoryContextSwitchTo(state1->mcontext); | ||||
|  | ||||
| 		/* | ||||
| 		 * If there's not enough space in state1 then we'll need to reallocate | ||||
| 		 * more. | ||||
| 		 */ | ||||
| 		if (state1->abytes < reqsize) | ||||
| 		{ | ||||
| 			/* use a power of 2 size rather than allocating just reqsize */ | ||||
| 			state1->abytes = pg_nextpower2_32(reqsize); | ||||
| 			state1->data = (char *) repalloc(state1->data, state1->abytes); | ||||
| 		} | ||||
|  | ||||
| 		/* | ||||
| 		 * Maintain the null bitmap if either state has one.  This must also | ||||
| 		 * happen when only state1 has a bitmap: state2's items then have to | ||||
| 		 * be marked non-null in it, which array_bitmap_copy does when given | ||||
| 		 * a NULL source bitmap.  Skipping that would leave the bits for | ||||
| 		 * state2's items uninitialized, or even beyond the allocated bitmap. | ||||
| 		 */ | ||||
| 		if (state1->nullbitmap || state2->nullbitmap) | ||||
| 		{ | ||||
| 			int			newnitems = state1->nitems + state2->nitems; | ||||
|  | ||||
| 			if (state1->nullbitmap == NULL) | ||||
| 			{ | ||||
| 				/* | ||||
| 				 * First input with nulls; we must retrospectively handle any | ||||
| 				 * previous inputs by marking all their items non-null. | ||||
| 				 */ | ||||
| 				state1->aitems = pg_nextpower2_32(Max(256, newnitems + 1)); | ||||
| 				state1->nullbitmap = (bits8 *) palloc((state1->aitems + 7) / 8); | ||||
| 				array_bitmap_copy(state1->nullbitmap, 0, | ||||
| 								  NULL, 0, | ||||
| 								  state1->nitems); | ||||
| 			} | ||||
| 			else if (newnitems > state1->aitems) | ||||
| 			{ | ||||
| 				/* | ||||
| 				 * state2->aitems need not cover state2->nitems when state2 | ||||
| 				 * has no bitmap, so make sure there is room for at least | ||||
| 				 * newnitems items. | ||||
| 				 */ | ||||
| 				int			newaitems = Max(state1->aitems + state2->aitems, | ||||
| 											newnitems); | ||||
|  | ||||
| 				state1->aitems = pg_nextpower2_32(newaitems); | ||||
| 				state1->nullbitmap = (bits8 *) | ||||
| 					repalloc(state1->nullbitmap, (state1->aitems + 7) / 8); | ||||
| 			} | ||||
| 			array_bitmap_copy(state1->nullbitmap, state1->nitems, | ||||
| 							  state2->nullbitmap, 0, | ||||
| 							  state2->nitems); | ||||
| 		} | ||||
|  | ||||
| 		memcpy(state1->data + state1->nbytes, state2->data, state2->nbytes); | ||||
| 		state1->nbytes += state2->nbytes; | ||||
| 		state1->nitems += state2->nitems; | ||||
|  | ||||
| 		state1->dims[0] += state2->dims[0]; | ||||
| 		/* remaining dims already match, per test above */ | ||||
|  | ||||
| 		Assert(state1->array_type == state2->array_type); | ||||
| 		Assert(state1->element_type == state2->element_type); | ||||
|  | ||||
| 		MemoryContextSwitchTo(oldContext); | ||||
| 	} | ||||
|  | ||||
| 	PG_RETURN_POINTER(state1); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * array_agg_array_serialize | ||||
|  *		Serialize ArrayBuildStateArr into bytea. | ||||
|  * | ||||
|  * The fields are written in this order: element_type, array_type, nbytes, | ||||
|  * data, abytes, aitems, the null bitmap (only if the state has one), | ||||
|  * nitems, ndims, dims and lbs.  array_agg_array_deserialize must read them | ||||
|  * back in the same order. | ||||
|  */ | ||||
| Datum | ||||
| array_agg_array_serialize(PG_FUNCTION_ARGS) | ||||
| { | ||||
| 	ArrayBuildStateArr *state; | ||||
| 	StringInfoData buf; | ||||
| 	bytea	   *result; | ||||
|  | ||||
| 	/* cannot be called directly because of internal-type argument */ | ||||
| 	Assert(AggCheckCallContext(fcinfo, NULL)); | ||||
|  | ||||
| 	state = (ArrayBuildStateArr *) PG_GETARG_POINTER(0); | ||||
|  | ||||
| 	pq_begintypsend(&buf); | ||||
|  | ||||
| 	/* | ||||
| 	 * element_type. Putting this first is more convenient in deserialization | ||||
| 	 * so that we can init the new state sooner. | ||||
| 	 */ | ||||
| 	pq_sendint32(&buf, state->element_type); | ||||
|  | ||||
| 	/* array_type */ | ||||
| 	pq_sendint32(&buf, state->array_type); | ||||
|  | ||||
| 	/* nbytes */ | ||||
| 	pq_sendint32(&buf, state->nbytes); | ||||
|  | ||||
| 	/* data */ | ||||
| 	pq_sendbytes(&buf, state->data, state->nbytes); | ||||
|  | ||||
| 	/* abytes */ | ||||
| 	pq_sendint32(&buf, state->abytes); | ||||
|  | ||||
| 	/* aitems */ | ||||
| 	pq_sendint32(&buf, state->aitems); | ||||
|  | ||||
| 	/* | ||||
| 	 * nullbitmap -- only sent when present.  array_agg_array_deserialize | ||||
| 	 * decides whether to read one by testing aitems > 0, hence the Assert. | ||||
| 	 */ | ||||
| 	if (state->nullbitmap) | ||||
| 	{ | ||||
| 		Assert(state->aitems > 0); | ||||
| 		pq_sendbytes(&buf, (char *) state->nullbitmap, (state->aitems + 7) / 8); | ||||
| 	} | ||||
|  | ||||
| 	/* nitems */ | ||||
| 	pq_sendint32(&buf, state->nitems); | ||||
|  | ||||
| 	/* ndims */ | ||||
| 	pq_sendint32(&buf, state->ndims); | ||||
|  | ||||
| 	/* dims: XXX should we just send ndims elements? */ | ||||
| 	pq_sendbytes(&buf, (char *) state->dims, sizeof(state->dims)); | ||||
|  | ||||
| 	/* lbs */ | ||||
| 	pq_sendbytes(&buf, (char *) state->lbs, sizeof(state->lbs)); | ||||
|  | ||||
| 	result = pq_endtypsend(&buf); | ||||
|  | ||||
| 	PG_RETURN_BYTEA_P(result); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * array_agg_array_deserialize | ||||
|  *		Rebuild an ArrayBuildStateArr from the bytea produced by | ||||
|  *		array_agg_array_serialize. | ||||
|  * | ||||
|  * The fields are read in the order array_agg_array_serialize wrote them. | ||||
|  * The resulting state is created in CurrentMemoryContext. | ||||
|  */ | ||||
| Datum | ||||
| array_agg_array_deserialize(PG_FUNCTION_ARGS) | ||||
| { | ||||
| 	bytea	   *sstate; | ||||
| 	ArrayBuildStateArr *result; | ||||
| 	StringInfoData buf; | ||||
| 	Oid			element_type; | ||||
| 	Oid			array_type; | ||||
| 	int			nbytes; | ||||
| 	const char *temp; | ||||
|  | ||||
| 	/* cannot be called directly because of internal-type argument */ | ||||
| 	Assert(AggCheckCallContext(fcinfo, NULL)); | ||||
|  | ||||
| 	sstate = PG_GETARG_BYTEA_PP(0); | ||||
|  | ||||
| 	/* | ||||
| 	 * Copy the bytea into a StringInfo so that we can "receive" it using the | ||||
| 	 * standard recv-function infrastructure. | ||||
| 	 */ | ||||
| 	initStringInfo(&buf); | ||||
| 	appendBinaryStringInfo(&buf, | ||||
| 						   VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate)); | ||||
|  | ||||
| 	/* element_type */ | ||||
| 	element_type = pq_getmsgint(&buf, 4); | ||||
|  | ||||
| 	/* array_type */ | ||||
| 	array_type = pq_getmsgint(&buf, 4); | ||||
|  | ||||
| 	/* nbytes */ | ||||
| 	nbytes = pq_getmsgint(&buf, 4); | ||||
|  | ||||
| 	result = initArrayResultArr(array_type, element_type, | ||||
| 								CurrentMemoryContext, false); | ||||
|  | ||||
| 	/* Size the data buffer as a power of 2, at least 1024 bytes */ | ||||
| 	result->abytes = 1024; | ||||
| 	while (result->abytes < nbytes) | ||||
| 		result->abytes *= 2; | ||||
|  | ||||
| 	result->data = (char *) palloc(result->abytes); | ||||
|  | ||||
| 	/* data */ | ||||
| 	temp = pq_getmsgbytes(&buf, nbytes); | ||||
| 	memcpy(result->data, temp, nbytes); | ||||
| 	result->nbytes = nbytes; | ||||
|  | ||||
| 	/* | ||||
| 	 * abytes: this is the sender's allocation size.  We have to consume it | ||||
| 	 * to stay in step with the wire format, but we must not store it: | ||||
| 	 * result->data was allocated with the abytes computed above, and | ||||
| 	 * recording a larger value would misrepresent the size of that buffer. | ||||
| 	 */ | ||||
| 	(void) pq_getmsgint(&buf, 4); | ||||
|  | ||||
| 	/* aitems: might be 0 */ | ||||
| 	result->aitems = pq_getmsgint(&buf, 4); | ||||
|  | ||||
| 	/* nullbitmap: only present in the message when aitems > 0 */ | ||||
| 	if (result->aitems > 0) | ||||
| 	{ | ||||
| 		int			size = (result->aitems + 7) / 8; | ||||
|  | ||||
| 		result->nullbitmap = (bits8 *) palloc(size); | ||||
| 		temp = pq_getmsgbytes(&buf, size); | ||||
| 		memcpy(result->nullbitmap, temp, size); | ||||
| 	} | ||||
| 	else | ||||
| 		result->nullbitmap = NULL; | ||||
|  | ||||
| 	/* nitems */ | ||||
| 	result->nitems = pq_getmsgint(&buf, 4); | ||||
|  | ||||
| 	/* ndims */ | ||||
| 	result->ndims = pq_getmsgint(&buf, 4); | ||||
|  | ||||
| 	/* dims */ | ||||
| 	temp = pq_getmsgbytes(&buf, sizeof(result->dims)); | ||||
| 	memcpy(result->dims, temp, sizeof(result->dims)); | ||||
|  | ||||
| 	/* lbs */ | ||||
| 	temp = pq_getmsgbytes(&buf, sizeof(result->lbs)); | ||||
| 	memcpy(result->lbs, temp, sizeof(result->lbs)); | ||||
|  | ||||
| 	pq_getmsgend(&buf); | ||||
| 	pfree(buf.data); | ||||
|  | ||||
| 	PG_RETURN_POINTER(result); | ||||
| } | ||||
|  | ||||
| Datum | ||||
| array_agg_array_finalfn(PG_FUNCTION_ARGS) | ||||
| { | ||||
|   | ||||
| @@ -5262,6 +5262,24 @@ array_insert_slice(ArrayType *destArray, | ||||
|  */ | ||||
| ArrayBuildState * | ||||
| initArrayResult(Oid element_type, MemoryContext rcontext, bool subcontext) | ||||
| { | ||||
| 	int			initsize; | ||||
|  | ||||
| 	/* | ||||
| 	 * With a private subcontext we can be more generous with the initial | ||||
| 	 * array size.  Without one, start small and hope that most states | ||||
| 	 * never grow much. | ||||
| 	 */ | ||||
| 	if (subcontext) | ||||
| 		initsize = 64; | ||||
| 	else | ||||
| 		initsize = 8; | ||||
|  | ||||
| 	return initArrayResultWithSize(element_type, rcontext, subcontext, | ||||
| 								   initsize); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * initArrayResultWithSize | ||||
|  *		As initArrayResult, but allow the initial size of the allocated arrays | ||||
|  *		to be specified. | ||||
|  */ | ||||
| ArrayBuildState * | ||||
| initArrayResultWithSize(Oid element_type, MemoryContext rcontext, | ||||
| 						bool subcontext, int initsize) | ||||
| { | ||||
| 	ArrayBuildState *astate; | ||||
| 	MemoryContext arr_context = rcontext; | ||||
| @@ -5276,7 +5294,7 @@ initArrayResult(Oid element_type, MemoryContext rcontext, bool subcontext) | ||||
| 		MemoryContextAlloc(arr_context, sizeof(ArrayBuildState)); | ||||
| 	astate->mcontext = arr_context; | ||||
| 	astate->private_cxt = subcontext; | ||||
| 	astate->alen = (subcontext ? 64 : 8);	/* arbitrary starting array size */ | ||||
| 	astate->alen = initsize; | ||||
| 	astate->dvalues = (Datum *) | ||||
| 		MemoryContextAlloc(arr_context, astate->alen * sizeof(Datum)); | ||||
| 	astate->dnulls = (bool *) | ||||
|   | ||||
| @@ -506,29 +506,50 @@ bytea_string_agg_transfn(PG_FUNCTION_ARGS) | ||||
|  | ||||
| 	state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0); | ||||
|  | ||||
| 	/* Append the value unless null. */ | ||||
| 	/* Append the value unless null, preceding it with the delimiter. */ | ||||
| 	if (!PG_ARGISNULL(1)) | ||||
| 	{ | ||||
| 		bytea	   *value = PG_GETARG_BYTEA_PP(1); | ||||
| 		bool		isfirst = false; | ||||
|  | ||||
| 		/* On the first time through, we ignore the delimiter. */ | ||||
| 		/* | ||||
| 		 * You might think we can just throw away the first delimiter, however | ||||
| 		 * we must keep it as we may be a parallel worker doing partial | ||||
| 		 * aggregation building a state to send to the main process.  We need | ||||
| 		 * to keep the delimiter of every aggregation so that the combine | ||||
| 		 * function can properly join up the strings of two separately | ||||
| 		 * partially aggregated results.  The first delimiter is only stripped | ||||
| 		 * off in the final function.  To know how much to strip off the front | ||||
| 		 * of the string, we store the length of the first delimiter in the | ||||
| 		 * StringInfo's cursor field, which we don't otherwise need here. | ||||
| 		 */ | ||||
| 		if (state == NULL) | ||||
| 		{ | ||||
| 			state = makeStringAggState(fcinfo); | ||||
| 		else if (!PG_ARGISNULL(2)) | ||||
| 			isfirst = true; | ||||
| 		} | ||||
|  | ||||
| 		if (!PG_ARGISNULL(2)) | ||||
| 		{ | ||||
| 			bytea	   *delim = PG_GETARG_BYTEA_PP(2); | ||||
|  | ||||
| 			appendBinaryStringInfo(state, VARDATA_ANY(delim), VARSIZE_ANY_EXHDR(delim)); | ||||
| 			appendBinaryStringInfo(state, VARDATA_ANY(delim), | ||||
| 								   VARSIZE_ANY_EXHDR(delim)); | ||||
| 			if (isfirst) | ||||
| 				state->cursor = VARSIZE_ANY_EXHDR(delim); | ||||
| 		} | ||||
|  | ||||
| 		appendBinaryStringInfo(state, VARDATA_ANY(value), VARSIZE_ANY_EXHDR(value)); | ||||
| 		appendBinaryStringInfo(state, VARDATA_ANY(value), | ||||
| 							   VARSIZE_ANY_EXHDR(value)); | ||||
| 	} | ||||
|  | ||||
| 	/* | ||||
| 	 * The transition type for string_agg() is declared to be "internal", | ||||
| 	 * which is a pass-by-value type the same size as a pointer. | ||||
| 	 */ | ||||
| 	PG_RETURN_POINTER(state); | ||||
| 	if (state) | ||||
| 		PG_RETURN_POINTER(state); | ||||
| 	PG_RETURN_NULL(); | ||||
| } | ||||
|  | ||||
| Datum | ||||
| @@ -543,11 +564,13 @@ bytea_string_agg_finalfn(PG_FUNCTION_ARGS) | ||||
|  | ||||
| 	if (state != NULL) | ||||
| 	{ | ||||
| 		/* As per comment in transfn, strip data before the cursor position */ | ||||
| 		bytea	   *result; | ||||
| 		int			strippedlen = state->len - state->cursor; | ||||
|  | ||||
| 		result = (bytea *) palloc(state->len + VARHDRSZ); | ||||
| 		SET_VARSIZE(result, state->len + VARHDRSZ); | ||||
| 		memcpy(VARDATA(result), state->data, state->len); | ||||
| 		result = (bytea *) palloc(strippedlen + VARHDRSZ); | ||||
| 		SET_VARSIZE(result, strippedlen + VARHDRSZ); | ||||
| 		memcpy(VARDATA(result), &state->data[state->cursor], strippedlen); | ||||
| 		PG_RETURN_BYTEA_P(result); | ||||
| 	} | ||||
| 	else | ||||
| @@ -5372,23 +5395,171 @@ string_agg_transfn(PG_FUNCTION_ARGS) | ||||
|  | ||||
| 	state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0); | ||||
|  | ||||
| 	/* Append the value unless null. */ | ||||
| 	/* Append the value unless null, preceding it with the delimiter. */ | ||||
| 	if (!PG_ARGISNULL(1)) | ||||
| 	{ | ||||
| 		/* On the first time through, we ignore the delimiter. */ | ||||
| 		if (state == NULL) | ||||
| 			state = makeStringAggState(fcinfo); | ||||
| 		else if (!PG_ARGISNULL(2)) | ||||
| 			appendStringInfoText(state, PG_GETARG_TEXT_PP(2));	/* delimiter */ | ||||
| 		text	   *value = PG_GETARG_TEXT_PP(1); | ||||
| 		bool		isfirst = false; | ||||
|  | ||||
| 		appendStringInfoText(state, PG_GETARG_TEXT_PP(1));	/* value */ | ||||
| 		/* | ||||
| 		 * You might think we can just throw away the first delimiter, however | ||||
| 		 * we must keep it as we may be a parallel worker doing partial | ||||
| 		 * aggregation building a state to send to the main process.  We need | ||||
| 		 * to keep the delimiter of every aggregation so that the combine | ||||
| 		 * function can properly join up the strings of two separately | ||||
| 		 * partially aggregated results.  The first delimiter is only stripped | ||||
| 		 * off in the final function.  To know how much to strip off the front | ||||
| 		 * of the string, we store the length of the first delimiter in the | ||||
| 		 * StringInfo's cursor field, which we don't otherwise need here. | ||||
| 		 */ | ||||
| 		if (state == NULL) | ||||
| 		{ | ||||
| 			state = makeStringAggState(fcinfo); | ||||
| 			isfirst = true; | ||||
| 		} | ||||
|  | ||||
| 		if (!PG_ARGISNULL(2)) | ||||
| 		{ | ||||
| 			text	   *delim = PG_GETARG_TEXT_PP(2); | ||||
|  | ||||
| 			appendStringInfoText(state, delim); | ||||
| 			if (isfirst) | ||||
| 				state->cursor = VARSIZE_ANY_EXHDR(delim); | ||||
| 		} | ||||
|  | ||||
| 		appendStringInfoText(state, value); | ||||
| 	} | ||||
|  | ||||
| 	/* | ||||
| 	 * The transition type for string_agg() is declared to be "internal", | ||||
| 	 * which is a pass-by-value type the same size as a pointer. | ||||
| 	 */ | ||||
| 	PG_RETURN_POINTER(state); | ||||
| 	if (state) | ||||
| 		PG_RETURN_POINTER(state); | ||||
| 	PG_RETURN_NULL(); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * string_agg_combine | ||||
|  *		Aggregate combine function for string_agg(text) and string_agg(bytea) | ||||
|  * | ||||
|  * Merges two partial aggregate states: argument 0 (state1) and argument 1 | ||||
|  * (state2), either of which may be NULL.  The transition functions keep | ||||
|  * the (non-null) delimiter in front of every value, including the first, | ||||
|  * so appending state2's bytes to state1 yields a correctly delimited | ||||
|  * string.  The leading delimiter of the combined state is only removed by | ||||
|  * the final function, which skips the number of bytes recorded in the | ||||
|  * StringInfo's cursor field. | ||||
|  * | ||||
|  * Returns state1 (building it first if it was NULL), or NULL when both | ||||
|  * inputs are NULL.  Errors out if called outside an aggregate context. | ||||
|  */ | ||||
| Datum | ||||
| string_agg_combine(PG_FUNCTION_ARGS) | ||||
| { | ||||
| 	StringInfo	state1; | ||||
| 	StringInfo	state2; | ||||
| 	MemoryContext agg_context; | ||||
|  | ||||
| 	if (!AggCheckCallContext(fcinfo, &agg_context)) | ||||
| 		elog(ERROR, "aggregate function called in non-aggregate context"); | ||||
|  | ||||
| 	state1 = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0); | ||||
| 	state2 = PG_ARGISNULL(1) ? NULL : (StringInfo) PG_GETARG_POINTER(1); | ||||
|  | ||||
| 	if (state2 == NULL) | ||||
| 	{ | ||||
| 		/* | ||||
| 		 * NULL state2 is easy, just return state1, which we know is already | ||||
| 		 * in the agg_context.  If state1 is NULL as well there is nothing to | ||||
| 		 * combine, so the result is NULL. | ||||
| 		 */ | ||||
| 		if (state1 == NULL) | ||||
| 			PG_RETURN_NULL(); | ||||
| 		PG_RETURN_POINTER(state1); | ||||
| 	} | ||||
|  | ||||
| 	if (state1 == NULL) | ||||
| 	{ | ||||
| 		/* | ||||
| 		 * No state yet on our side: build a fresh state1 and copy state2's | ||||
| 		 * bytes into it with agg_context as the current memory context. | ||||
| 		 * state2's cursor is carried over because its first delimiter is now | ||||
| 		 * the first delimiter of the combined string. | ||||
| 		 */ | ||||
| 		MemoryContext old_context; | ||||
|  | ||||
| 		old_context = MemoryContextSwitchTo(agg_context); | ||||
| 		state1 = makeStringAggState(fcinfo); | ||||
| 		appendBinaryStringInfo(state1, state2->data, state2->len); | ||||
| 		state1->cursor = state2->cursor; | ||||
| 		MemoryContextSwitchTo(old_context); | ||||
| 	} | ||||
| 	else if (state2->len > 0) | ||||
| 	{ | ||||
| 		/* | ||||
| 		 * Combine by appending state2 after state1.  state1->cursor does not | ||||
| 		 * change in this case, as the front of the string is still state1's. | ||||
| 		 * An empty state2 is skipped since there is nothing to append. | ||||
| 		 */ | ||||
| 		appendBinaryStringInfo(state1, state2->data, state2->len); | ||||
| 	} | ||||
|  | ||||
| 	PG_RETURN_POINTER(state1); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * string_agg_serialize | ||||
|  *		Aggregate serialize function for string_agg(text) and string_agg(bytea) | ||||
|  * | ||||
|  * Flattens the StringInfo transition state passed as argument 0 into a | ||||
|  * bytea.  The layout is the cursor value as a 4-byte integer followed | ||||
|  * directly by the state's data bytes; string_agg_deserialize reads the | ||||
|  * same layout back. | ||||
|  * | ||||
|  * This is strict, so we need not handle NULL input | ||||
|  */ | ||||
| Datum | ||||
| string_agg_serialize(PG_FUNCTION_ARGS) | ||||
| { | ||||
| 	StringInfo	state; | ||||
| 	StringInfoData buf; | ||||
| 	bytea	   *result; | ||||
|  | ||||
| 	/* cannot be called directly because of internal-type argument */ | ||||
| 	Assert(AggCheckCallContext(fcinfo, NULL)); | ||||
|  | ||||
| 	state = (StringInfo) PG_GETARG_POINTER(0); | ||||
|  | ||||
| 	pq_begintypsend(&buf); | ||||
|  | ||||
| 	/* cursor: length of the leading delimiter, sent as a 4-byte integer */ | ||||
| 	pq_sendint(&buf, state->cursor, 4); | ||||
|  | ||||
| 	/* data: the accumulated bytes; no separate length field is written */ | ||||
| 	pq_sendbytes(&buf, state->data, state->len); | ||||
|  | ||||
| 	result = pq_endtypsend(&buf); | ||||
|  | ||||
| 	PG_RETURN_BYTEA_P(result); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * string_agg_deserialize | ||||
|  *		Aggregate deserial function for string_agg(text) and string_agg(bytea) | ||||
|  * | ||||
|  * Rebuilds a StringInfo transition state from the bytea passed as | ||||
|  * argument 0, which is expected to hold a 4-byte cursor value followed by | ||||
|  * the data bytes (the layout written by string_agg_serialize).  The data | ||||
|  * length is not stored; it is taken to be the bytea's payload size minus | ||||
|  * the 4 bytes used by the cursor. | ||||
|  * | ||||
|  * This is strict, so we need not handle NULL input | ||||
|  */ | ||||
| Datum | ||||
| string_agg_deserialize(PG_FUNCTION_ARGS) | ||||
| { | ||||
| 	bytea	   *sstate; | ||||
| 	StringInfo	result; | ||||
| 	StringInfoData buf; | ||||
| 	char	   *data; | ||||
| 	int			datalen; | ||||
|  | ||||
| 	/* cannot be called directly because of internal-type argument */ | ||||
| 	Assert(AggCheckCallContext(fcinfo, NULL)); | ||||
|  | ||||
| 	sstate = PG_GETARG_BYTEA_PP(0); | ||||
|  | ||||
| 	/* | ||||
| 	 * Copy the bytea into a StringInfo so that we can "receive" it using the | ||||
| 	 * standard recv-function infrastructure.  The copy is released with | ||||
| 	 * pfree() once the state has been rebuilt. | ||||
| 	 */ | ||||
| 	initStringInfo(&buf); | ||||
| 	appendBinaryStringInfo(&buf, | ||||
| 						   VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate)); | ||||
|  | ||||
| 	result = makeStringAggState(fcinfo); | ||||
|  | ||||
| 	/* cursor: the first 4 bytes of the serialized state */ | ||||
| 	result->cursor = pq_getmsgint(&buf, 4); | ||||
|  | ||||
| 	/* data: everything after the 4-byte cursor is the accumulated string */ | ||||
| 	datalen = VARSIZE_ANY_EXHDR(sstate) - 4; | ||||
| 	data = (char *) pq_getmsgbytes(&buf, datalen); | ||||
| 	appendBinaryStringInfo(result, data, datalen); | ||||
|  | ||||
| 	pq_getmsgend(&buf); | ||||
| 	pfree(buf.data); | ||||
|  | ||||
| 	PG_RETURN_POINTER(result); | ||||
| } | ||||
|  | ||||
| Datum | ||||
| @@ -5402,7 +5573,11 @@ string_agg_finalfn(PG_FUNCTION_ARGS) | ||||
| 	state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0); | ||||
|  | ||||
| 	if (state != NULL) | ||||
| 		PG_RETURN_TEXT_P(cstring_to_text_with_len(state->data, state->len)); | ||||
| 	{ | ||||
| 		/* As per comment in transfn, strip data before the cursor position */ | ||||
| 		PG_RETURN_TEXT_P(cstring_to_text_with_len(&state->data[state->cursor], | ||||
| 												  state->len - state->cursor)); | ||||
| 	} | ||||
| 	else | ||||
| 		PG_RETURN_NULL(); | ||||
| } | ||||
|   | ||||
| @@ -57,6 +57,6 @@ | ||||
|  */ | ||||
|  | ||||
| /*							yyyymmddN */ | ||||
| #define CATALOG_VERSION_NO	202301201 | ||||
| #define CATALOG_VERSION_NO	202301231 | ||||
|  | ||||
| #endif | ||||
|   | ||||
| @@ -537,19 +537,28 @@ | ||||
|  | ||||
| # array | ||||
| { aggfnoid => 'array_agg(anynonarray)', aggtransfn => 'array_agg_transfn', | ||||
|   aggfinalfn => 'array_agg_finalfn', aggfinalextra => 't', | ||||
|   aggtranstype => 'internal' }, | ||||
|   aggcombinefn => 'array_agg_combine', aggserialfn => 'array_agg_serialize', | ||||
|   aggdeserialfn => 'array_agg_deserialize', aggfinalfn => 'array_agg_finalfn', | ||||
|   aggfinalextra => 't', aggtranstype => 'internal' }, | ||||
| { aggfnoid => 'array_agg(anyarray)', aggtransfn => 'array_agg_array_transfn', | ||||
|   aggcombinefn => 'array_agg_array_combine', | ||||
|   aggserialfn => 'array_agg_array_serialize', | ||||
|   aggdeserialfn => 'array_agg_array_deserialize', | ||||
|   aggfinalfn => 'array_agg_array_finalfn', aggfinalextra => 't', | ||||
|   aggtranstype => 'internal' }, | ||||
|  | ||||
| # text | ||||
| { aggfnoid => 'string_agg(text,text)', aggtransfn => 'string_agg_transfn', | ||||
|   aggcombinefn => 'string_agg_combine', aggserialfn => 'string_agg_serialize', | ||||
|   aggdeserialfn => 'string_agg_deserialize', | ||||
|   aggfinalfn => 'string_agg_finalfn', aggtranstype => 'internal' }, | ||||
|  | ||||
| # bytea | ||||
| { aggfnoid => 'string_agg(bytea,bytea)', | ||||
|   aggtransfn => 'bytea_string_agg_transfn', | ||||
|   aggcombinefn => 'string_agg_combine', | ||||
|   aggserialfn => 'string_agg_serialize', | ||||
|   aggdeserialfn => 'string_agg_deserialize', | ||||
|   aggfinalfn => 'bytea_string_agg_finalfn', aggtranstype => 'internal' }, | ||||
|  | ||||
| # range | ||||
|   | ||||
| @@ -1672,6 +1672,15 @@ | ||||
| { oid => '2333', descr => 'aggregate transition function', | ||||
|   proname => 'array_agg_transfn', proisstrict => 'f', prorettype => 'internal', | ||||
|   proargtypes => 'internal anynonarray', prosrc => 'array_agg_transfn' }, | ||||
| { oid => '9328', descr => 'aggregate combine function', | ||||
|   proname => 'array_agg_combine', proisstrict => 'f', prorettype => 'internal', | ||||
|   proargtypes => 'internal internal', prosrc => 'array_agg_combine' }, | ||||
| { oid => '9329', descr => 'aggregate serial function', | ||||
|   proname => 'array_agg_serialize', prorettype => 'bytea', | ||||
|   proargtypes => 'internal', prosrc => 'array_agg_serialize' }, | ||||
| { oid => '9330', descr => 'aggregate deserial function', | ||||
|   proname => 'array_agg_deserialize', prorettype => 'internal', | ||||
|   proargtypes => 'bytea internal', prosrc => 'array_agg_deserialize' }, | ||||
| { oid => '2334', descr => 'aggregate final function', | ||||
|   proname => 'array_agg_finalfn', proisstrict => 'f', prorettype => 'anyarray', | ||||
|   proargtypes => 'internal anynonarray', prosrc => 'array_agg_finalfn' }, | ||||
| @@ -1683,6 +1692,15 @@ | ||||
|   proname => 'array_agg_array_transfn', proisstrict => 'f', | ||||
|   prorettype => 'internal', proargtypes => 'internal anyarray', | ||||
|   prosrc => 'array_agg_array_transfn' }, | ||||
| { oid => '9331', descr => 'aggregate combine function', | ||||
|   proname => 'array_agg_array_combine', proisstrict => 'f', prorettype => 'internal', | ||||
|   proargtypes => 'internal internal', prosrc => 'array_agg_array_combine' }, | ||||
| { oid => '9332', descr => 'aggregate serial function', | ||||
|   proname => 'array_agg_array_serialize', prorettype => 'bytea', | ||||
|   proargtypes => 'internal', prosrc => 'array_agg_array_serialize' }, | ||||
| { oid => '9333', descr => 'aggregate deserial function', | ||||
|   proname => 'array_agg_array_deserialize', prorettype => 'internal', | ||||
|   proargtypes => 'bytea internal', prosrc => 'array_agg_array_deserialize' }, | ||||
| { oid => '4052', descr => 'aggregate final function', | ||||
|   proname => 'array_agg_array_finalfn', proisstrict => 'f', | ||||
|   prorettype => 'anyarray', proargtypes => 'internal anyarray', | ||||
| @@ -4955,6 +4973,15 @@ | ||||
| { oid => '3535', descr => 'aggregate transition function', | ||||
|   proname => 'string_agg_transfn', proisstrict => 'f', prorettype => 'internal', | ||||
|   proargtypes => 'internal text text', prosrc => 'string_agg_transfn' }, | ||||
| { oid => '9334', descr => 'aggregate combine function', | ||||
|   proname => 'string_agg_combine', proisstrict => 'f', prorettype => 'internal', | ||||
|   proargtypes => 'internal internal', prosrc => 'string_agg_combine' }, | ||||
| { oid => '9335', descr => 'aggregate serial function', | ||||
|   proname => 'string_agg_serialize', prorettype => 'bytea', | ||||
|   proargtypes => 'internal', prosrc => 'string_agg_serialize' }, | ||||
| { oid => '9336', descr => 'aggregate deserial function', | ||||
|   proname => 'string_agg_deserialize', prorettype => 'internal', | ||||
|   proargtypes => 'bytea internal', prosrc => 'string_agg_deserialize' }, | ||||
| { oid => '3536', descr => 'aggregate final function', | ||||
|   proname => 'string_agg_finalfn', proisstrict => 'f', prorettype => 'text', | ||||
|   proargtypes => 'internal', prosrc => 'string_agg_finalfn' }, | ||||
|   | ||||
| @@ -35,6 +35,8 @@ extern Oid	resolve_aggregate_transtype(Oid aggfuncid, | ||||
| 										Oid *inputTypes, | ||||
| 										int numArguments); | ||||
|  | ||||
| extern bool agg_args_support_sendreceive(Aggref *aggref); | ||||
|  | ||||
| extern void build_aggregate_transfn_expr(Oid *agg_input_types, | ||||
| 										 int agg_num_inputs, | ||||
| 										 int agg_num_direct_inputs, | ||||
|   | ||||
| @@ -409,6 +409,9 @@ extern bool array_contains_nulls(ArrayType *array); | ||||
|  | ||||
| extern ArrayBuildState *initArrayResult(Oid element_type, | ||||
| 										MemoryContext rcontext, bool subcontext); | ||||
| extern ArrayBuildState *initArrayResultWithSize(Oid element_type, | ||||
| 												MemoryContext rcontext, | ||||
| 												bool subcontext, int initsize); | ||||
| extern ArrayBuildState *accumArrayResult(ArrayBuildState *astate, | ||||
| 										 Datum dvalue, bool disnull, | ||||
| 										 Oid element_type, | ||||
|   | ||||
| @@ -1862,6 +1862,104 @@ select string_agg(v, decode('ee', 'hex')) from bytea_test_table; | ||||
| (1 row) | ||||
|  | ||||
| drop table bytea_test_table; | ||||
| -- Test parallel string_agg and array_agg | ||||
| create table pagg_test (x int, y int); | ||||
| insert into pagg_test | ||||
| select (case x % 4 when 1 then null else x end), x % 10 | ||||
| from generate_series(1,5000) x; | ||||
| set parallel_setup_cost TO 0; | ||||
| set parallel_tuple_cost TO 0; | ||||
| set parallel_leader_participation TO 0; | ||||
| set min_parallel_table_scan_size = 0; | ||||
| set bytea_output = 'escape'; | ||||
| -- create a view as we otherwise have to repeat this query a few times. | ||||
| create view v_pagg_test AS | ||||
| select | ||||
| 	y, | ||||
| 	min(t) AS tmin,max(t) AS tmax,count(distinct t) AS tndistinct, | ||||
| 	min(b) AS bmin,max(b) AS bmax,count(distinct b) AS bndistinct, | ||||
| 	min(a) AS amin,max(a) AS amax,count(distinct a) AS andistinct, | ||||
| 	min(aa) AS aamin,max(aa) AS aamax,count(distinct aa) AS aandistinct | ||||
| from ( | ||||
| 	select | ||||
| 		y, | ||||
| 		unnest(regexp_split_to_array(a1.t, ','))::int AS t, | ||||
| 		unnest(regexp_split_to_array(a1.b::text, ',')) AS b, | ||||
| 		unnest(a1.a) AS a, | ||||
| 		unnest(a1.aa) AS aa | ||||
| 	from ( | ||||
| 		select | ||||
| 			y, | ||||
| 			string_agg(x::text, ',') AS t, | ||||
| 			string_agg(x::text::bytea, ',') AS b, | ||||
| 			array_agg(x) AS a, | ||||
| 			array_agg(ARRAY[x]) AS aa | ||||
| 		from pagg_test | ||||
| 		group by y | ||||
| 	) a1 | ||||
| ) a2 | ||||
| group by y; | ||||
| -- Ensure results are correct. | ||||
| select * from v_pagg_test order by y; | ||||
|  y | tmin | tmax | tndistinct | bmin | bmax | bndistinct | amin | amax | andistinct | aamin | aamax | aandistinct  | ||||
| ---+------+------+------------+------+------+------------+------+------+------------+-------+-------+------------- | ||||
|  0 |   10 | 5000 |        500 | 10   | 990  |        500 |   10 | 5000 |        500 |    10 |  5000 |         500 | ||||
|  1 |   11 | 4991 |        250 | 1011 | 991  |        250 |   11 | 4991 |        250 |    11 |  4991 |         250 | ||||
|  2 |    2 | 4992 |        500 | 1002 | 992  |        500 |    2 | 4992 |        500 |     2 |  4992 |         500 | ||||
|  3 |    3 | 4983 |        250 | 1003 | 983  |        250 |    3 | 4983 |        250 |     3 |  4983 |         250 | ||||
|  4 |    4 | 4994 |        500 | 1004 | 994  |        500 |    4 | 4994 |        500 |     4 |  4994 |         500 | ||||
|  5 |   15 | 4995 |        250 | 1015 | 995  |        250 |   15 | 4995 |        250 |    15 |  4995 |         250 | ||||
|  6 |    6 | 4996 |        500 | 1006 | 996  |        500 |    6 | 4996 |        500 |     6 |  4996 |         500 | ||||
|  7 |    7 | 4987 |        250 | 1007 | 987  |        250 |    7 | 4987 |        250 |     7 |  4987 |         250 | ||||
|  8 |    8 | 4998 |        500 | 1008 | 998  |        500 |    8 | 4998 |        500 |     8 |  4998 |         500 | ||||
|  9 |   19 | 4999 |        250 | 1019 | 999  |        250 |   19 | 4999 |        250 |    19 |  4999 |         250 | ||||
| (10 rows) | ||||
|  | ||||
| -- Ensure parallel aggregation is actually being used. | ||||
| explain (costs off) select * from v_pagg_test order by y; | ||||
|                                                               QUERY PLAN                                                               | ||||
| -------------------------------------------------------------------------------------------------------------------------------------- | ||||
|  GroupAggregate | ||||
|    Group Key: pagg_test.y | ||||
|    ->  Sort | ||||
|          Sort Key: pagg_test.y, (((unnest(regexp_split_to_array((string_agg((pagg_test.x)::text, ','::text)), ','::text))))::integer) | ||||
|          ->  Result | ||||
|                ->  ProjectSet | ||||
|                      ->  Finalize HashAggregate | ||||
|                            Group Key: pagg_test.y | ||||
|                            ->  Gather | ||||
|                                  Workers Planned: 2 | ||||
|                                  ->  Partial HashAggregate | ||||
|                                        Group Key: pagg_test.y | ||||
|                                        ->  Parallel Seq Scan on pagg_test | ||||
| (13 rows) | ||||
|  | ||||
| set max_parallel_workers_per_gather = 0; | ||||
| -- Ensure results are the same without parallel aggregation. | ||||
| select * from v_pagg_test order by y; | ||||
|  y | tmin | tmax | tndistinct | bmin | bmax | bndistinct | amin | amax | andistinct | aamin | aamax | aandistinct  | ||||
| ---+------+------+------------+------+------+------------+------+------+------------+-------+-------+------------- | ||||
|  0 |   10 | 5000 |        500 | 10   | 990  |        500 |   10 | 5000 |        500 |    10 |  5000 |         500 | ||||
|  1 |   11 | 4991 |        250 | 1011 | 991  |        250 |   11 | 4991 |        250 |    11 |  4991 |         250 | ||||
|  2 |    2 | 4992 |        500 | 1002 | 992  |        500 |    2 | 4992 |        500 |     2 |  4992 |         500 | ||||
|  3 |    3 | 4983 |        250 | 1003 | 983  |        250 |    3 | 4983 |        250 |     3 |  4983 |         250 | ||||
|  4 |    4 | 4994 |        500 | 1004 | 994  |        500 |    4 | 4994 |        500 |     4 |  4994 |         500 | ||||
|  5 |   15 | 4995 |        250 | 1015 | 995  |        250 |   15 | 4995 |        250 |    15 |  4995 |         250 | ||||
|  6 |    6 | 4996 |        500 | 1006 | 996  |        500 |    6 | 4996 |        500 |     6 |  4996 |         500 | ||||
|  7 |    7 | 4987 |        250 | 1007 | 987  |        250 |    7 | 4987 |        250 |     7 |  4987 |         250 | ||||
|  8 |    8 | 4998 |        500 | 1008 | 998  |        500 |    8 | 4998 |        500 |     8 |  4998 |         500 | ||||
|  9 |   19 | 4999 |        250 | 1019 | 999  |        250 |   19 | 4999 |        250 |    19 |  4999 |         250 | ||||
| (10 rows) | ||||
|  | ||||
| -- Clean up | ||||
| reset max_parallel_workers_per_gather; | ||||
| reset bytea_output; | ||||
| reset min_parallel_table_scan_size; | ||||
| reset parallel_leader_participation; | ||||
| reset parallel_tuple_cost; | ||||
| reset parallel_setup_cost; | ||||
| drop view v_pagg_test; | ||||
| drop table pagg_test; | ||||
| -- FILTER tests | ||||
| select min(unique1) filter (where unique1 > 100) from tenk1; | ||||
|  min  | ||||
|   | ||||
| @@ -717,6 +717,68 @@ select string_agg(v, decode('ee', 'hex')) from bytea_test_table; | ||||
|  | ||||
| drop table bytea_test_table; | ||||
|  | ||||
| -- Test parallel string_agg and array_agg | ||||
| create table pagg_test (x int, y int); | ||||
| insert into pagg_test | ||||
| select (case x % 4 when 1 then null else x end), x % 10 | ||||
| from generate_series(1,5000) x; | ||||
|  | ||||
| set parallel_setup_cost TO 0; | ||||
| set parallel_tuple_cost TO 0; | ||||
| set parallel_leader_participation TO 0; | ||||
| set min_parallel_table_scan_size = 0; | ||||
| set bytea_output = 'escape'; | ||||
|  | ||||
| -- create a view as we otherwise have to repeat this query a few times. | ||||
| create view v_pagg_test AS | ||||
| select | ||||
| 	y, | ||||
| 	min(t) AS tmin,max(t) AS tmax,count(distinct t) AS tndistinct, | ||||
| 	min(b) AS bmin,max(b) AS bmax,count(distinct b) AS bndistinct, | ||||
| 	min(a) AS amin,max(a) AS amax,count(distinct a) AS andistinct, | ||||
| 	min(aa) AS aamin,max(aa) AS aamax,count(distinct aa) AS aandistinct | ||||
| from ( | ||||
| 	select | ||||
| 		y, | ||||
| 		unnest(regexp_split_to_array(a1.t, ','))::int AS t, | ||||
| 		unnest(regexp_split_to_array(a1.b::text, ',')) AS b, | ||||
| 		unnest(a1.a) AS a, | ||||
| 		unnest(a1.aa) AS aa | ||||
| 	from ( | ||||
| 		select | ||||
| 			y, | ||||
| 			string_agg(x::text, ',') AS t, | ||||
| 			string_agg(x::text::bytea, ',') AS b, | ||||
| 			array_agg(x) AS a, | ||||
| 			array_agg(ARRAY[x]) AS aa | ||||
| 		from pagg_test | ||||
| 		group by y | ||||
| 	) a1 | ||||
| ) a2 | ||||
| group by y; | ||||
|  | ||||
| -- Ensure results are correct. | ||||
| select * from v_pagg_test order by y; | ||||
|  | ||||
| -- Ensure parallel aggregation is actually being used. | ||||
| explain (costs off) select * from v_pagg_test order by y; | ||||
|  | ||||
| set max_parallel_workers_per_gather = 0; | ||||
|  | ||||
| -- Ensure results are the same without parallel aggregation. | ||||
| select * from v_pagg_test order by y; | ||||
|  | ||||
| -- Clean up | ||||
| reset max_parallel_workers_per_gather; | ||||
| reset bytea_output; | ||||
| reset min_parallel_table_scan_size; | ||||
| reset parallel_leader_participation; | ||||
| reset parallel_tuple_cost; | ||||
| reset parallel_setup_cost; | ||||
|  | ||||
| drop view v_pagg_test; | ||||
| drop table pagg_test; | ||||
|  | ||||
| -- FILTER tests | ||||
|  | ||||
| select min(unique1) filter (where unique1 > 100) from tenk1; | ||||
|   | ||||
		Reference in New Issue
	
	Block a user