1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-29 10:41:53 +03:00

Implement array_send/array_recv (binary I/O for arrays). This exposed

the folly of not passing element type to typsend/typreceive, so fix that.
This commit is contained in:
Tom Lane
2003-05-09 23:01:45 +00:00
parent b1ee615a7f
commit ba1e066e46
7 changed files with 316 additions and 35 deletions

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/utils/adt/arrayfuncs.c,v 1.88 2003/05/08 22:19:56 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/utils/adt/arrayfuncs.c,v 1.89 2003/05/09 23:01:45 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -19,6 +19,7 @@
#include "access/tupmacs.h"
#include "catalog/catalog.h"
#include "catalog/pg_type.h"
#include "libpq/pqformat.h"
#include "parser/parse_coerce.h"
#include "utils/array.h"
#include "utils/builtins.h"
@ -69,6 +70,15 @@
#define RETURN_NULL(type) do { *isNull = true; return (type) 0; } while (0)
/* I/O function selector for system_cache_lookup */
typedef enum IOFuncSelector
{
IOFunc_input,
IOFunc_output,
IOFunc_receive,
IOFunc_send
} IOFuncSelector;
static int ArrayCount(char *str, int *dim, char typdelim);
static Datum *ReadArrayStr(char *arrayStr, int nitems, int ndim, int *dim,
@ -76,12 +86,17 @@ static Datum *ReadArrayStr(char *arrayStr, int nitems, int ndim, int *dim,
char typdelim,
int typlen, bool typbyval, char typalign,
int *nbytes);
static Datum *ReadArrayBinary(StringInfo buf, int nitems,
FmgrInfo *receiveproc, Oid typelem,
int typlen, bool typbyval, char typalign,
int *nbytes);
static void CopyArrayEls(char *p, Datum *values, int nitems,
int typlen, bool typbyval, char typalign,
bool freedata);
static void system_cache_lookup(Oid element_type, bool input, int *typlen,
bool *typbyval, char *typdelim, Oid *typelem,
Oid *proc, char *typalign);
static void system_cache_lookup(Oid element_type, IOFuncSelector which_func,
int *typlen, bool *typbyval,
char *typdelim, Oid *typelem,
Oid *proc, char *typalign);
static Datum ArrayCast(char *value, bool byval, int len);
static int ArrayCastAndSet(Datum src,
int typlen, bool typbyval, char typalign,
@ -141,7 +156,8 @@ array_in(PG_FUNCTION_ARGS)
char typalign;
/* Get info about element type, including its input conversion proc */
system_cache_lookup(element_type, true, &typlen, &typbyval, &typdelim,
system_cache_lookup(element_type, IOFunc_input,
&typlen, &typbyval, &typdelim,
&typelem, &typinput, &typalign);
fmgr_info(typinput, &inputproc);
@ -622,8 +638,9 @@ array_out(PG_FUNCTION_ARGS)
*dim;
element_type = ARR_ELEMTYPE(v);
system_cache_lookup(element_type, false, &typlen, &typbyval,
&typdelim, &typelem, &typoutput, &typalign);
system_cache_lookup(element_type, IOFunc_output,
&typlen, &typbyval, &typdelim,
&typelem, &typoutput, &typalign);
fmgr_info(typoutput, &outputproc);
ndim = ARR_NDIM(v);
@ -763,10 +780,178 @@ array_out(PG_FUNCTION_ARGS)
Datum
array_recv(PG_FUNCTION_ARGS)
{
elog(ERROR, "array_recv: not implemented yet");
return 0;
StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
Oid spec_element_type = PG_GETARG_OID(1); /* type of an array
* element */
Oid element_type;
int typlen;
bool typbyval;
char typdelim;
Oid typreceive;
Oid typelem;
FmgrInfo receiveproc;
int i,
nitems;
int32 nbytes;
Datum *dataPtr;
ArrayType *retval;
int ndim,
flags,
dim[MAXDIM],
lBound[MAXDIM];
char typalign;
/* Get the array header information */
ndim = pq_getmsgint(buf, 4);
if (ndim < 0 || ndim > MAXDIM)
elog(ERROR, "array_recv: invalid number of dimensions");
flags = pq_getmsgint(buf, 4);
if (flags != 0)
elog(ERROR, "array_recv: invalid array flags");
element_type = pq_getmsgint(buf, sizeof(Oid));
if (element_type != spec_element_type)
{
/* XXX Can we allow taking the input element type in any cases? */
elog(ERROR, "array_recv: wrong element type");
}
for (i = 0; i < ndim; i++)
{
dim[i] = pq_getmsgint(buf, 4);
lBound[i] = pq_getmsgint(buf, 4);
}
nitems = ArrayGetNItems(ndim, dim);
if (nitems == 0)
{
/* Return empty array */
retval = (ArrayType *) palloc0(sizeof(ArrayType));
retval->size = sizeof(ArrayType);
retval->elemtype = element_type;
PG_RETURN_ARRAYTYPE_P(retval);
}
/* Get info about element type, including its receive conversion proc */
system_cache_lookup(element_type, IOFunc_receive,
&typlen, &typbyval, &typdelim,
&typelem, &typreceive, &typalign);
if (!OidIsValid(typreceive))
elog(ERROR, "No binary input function available for type %s",
format_type_be(element_type));
fmgr_info(typreceive, &receiveproc);
dataPtr = ReadArrayBinary(buf, nitems, &receiveproc, typelem,
typlen, typbyval, typalign,
&nbytes);
nbytes += ARR_OVERHEAD(ndim);
retval = (ArrayType *) palloc0(nbytes);
retval->size = nbytes;
retval->ndim = ndim;
retval->elemtype = element_type;
memcpy((char *) ARR_DIMS(retval), (char *) dim,
ndim * sizeof(int));
memcpy((char *) ARR_LBOUND(retval), (char *) lBound,
ndim * sizeof(int));
CopyArrayEls(ARR_DATA_PTR(retval), dataPtr, nitems,
typlen, typbyval, typalign, true);
pfree(dataPtr);
PG_RETURN_ARRAYTYPE_P(retval);
}
/*---------------------------------------------------------------------------
* ReadArrayBinary:
* collect the data elements of an array being read in binary style.
* result :
* returns a palloc'd array of Datum representations of the array elements.
* If element type is pass-by-ref, the Datums point to palloc'd values.
* *nbytes is set to the amount of data space needed for the array,
* including alignment padding but not including array header overhead.
*---------------------------------------------------------------------------
*/
static Datum *
ReadArrayBinary(StringInfo buf,
int nitems,
FmgrInfo *receiveproc,
Oid typelem,
int typlen,
bool typbyval,
char typalign,
int *nbytes)
{
Datum *values;
int i;
values = (Datum *) palloc(nitems * sizeof(Datum));
for (i = 0; i < nitems; i++)
{
int itemlen;
StringInfoData elem_buf;
char csave;
/* Get and check the item length */
itemlen = pq_getmsgint(buf, 4);
if (itemlen < 0 || itemlen > (buf->len - buf->cursor))
elog(ERROR, "insufficient data left in message");
/*
* Rather than copying data around, we just set up a phony
* StringInfo pointing to the correct portion of the input
* buffer. We assume we can scribble on the input buffer
* so as to maintain the convention that StringInfos have
* a trailing null.
*/
elem_buf.data = &buf->data[buf->cursor];
elem_buf.maxlen = itemlen + 1;
elem_buf.len = itemlen;
elem_buf.cursor = 0;
buf->cursor += itemlen;
csave = buf->data[buf->cursor];
buf->data[buf->cursor] = '\0';
/* Now call the element's receiveproc */
values[i] = FunctionCall2(receiveproc,
PointerGetDatum(&elem_buf),
ObjectIdGetDatum(typelem));
/* Trouble if it didn't eat the whole buffer */
if (elem_buf.cursor != itemlen)
elog(ERROR, "Improper binary format in array element %d",
i + 1);
buf->data[buf->cursor] = csave;
}
/*
* Compute total data space needed
*/
if (typlen > 0)
{
*nbytes = nitems * att_align(typlen, typalign);
}
else
{
Assert(!typbyval);
*nbytes = 0;
for (i = 0; i < nitems; i++)
{
/* let's just make sure data is not toasted */
if (typlen == -1)
values[i] = PointerGetDatum(PG_DETOAST_DATUM(values[i]));
*nbytes = att_addlength(*nbytes, typlen, values[i]);
*nbytes = att_align(*nbytes, typalign);
}
}
return values;
}
/*-------------------------------------------------------------------------
* array_send :
* takes the internal representation of an array and returns a bytea
@ -776,8 +961,70 @@ array_recv(PG_FUNCTION_ARGS)
Datum
array_send(PG_FUNCTION_ARGS)
{
elog(ERROR, "array_send: not implemented yet");
return 0;
ArrayType *v = PG_GETARG_ARRAYTYPE_P(0);
Oid element_type;
int typlen;
bool typbyval;
char typdelim;
Oid typsend,
typelem;
FmgrInfo sendproc;
char typalign;
char *p;
int nitems,
i;
int ndim,
*dim;
StringInfoData buf;
/* Get information about the element type and the array dimensions */
element_type = ARR_ELEMTYPE(v);
system_cache_lookup(element_type, IOFunc_send, &typlen, &typbyval,
&typdelim, &typelem, &typsend, &typalign);
if (!OidIsValid(typsend))
elog(ERROR, "No binary output function available for type %s",
format_type_be(element_type));
fmgr_info(typsend, &sendproc);
ndim = ARR_NDIM(v);
dim = ARR_DIMS(v);
nitems = ArrayGetNItems(ndim, dim);
pq_begintypsend(&buf);
/* Send the array header information */
pq_sendint(&buf, ndim, 4);
pq_sendint(&buf, v->flags, 4);
pq_sendint(&buf, element_type, sizeof(Oid));
for (i = 0; i < ndim; i++)
{
pq_sendint(&buf, ARR_DIMS(v)[i], 4);
pq_sendint(&buf, ARR_LBOUND(v)[i], 4);
}
/* Send the array elements using the element's own sendproc */
p = ARR_DATA_PTR(v);
for (i = 0; i < nitems; i++)
{
Datum itemvalue;
bytea *outputbytes;
itemvalue = fetch_att(p, typbyval, typlen);
outputbytes = DatumGetByteaP(FunctionCall2(&sendproc,
itemvalue,
ObjectIdGetDatum(typelem)));
/* We assume the result will not have been toasted */
pq_sendint(&buf, VARSIZE(outputbytes) - VARHDRSZ, 4);
pq_sendbytes(&buf, VARDATA(outputbytes),
VARSIZE(outputbytes) - VARHDRSZ);
pfree(outputbytes);
p = att_addlength(p, typlen, PointerGetDatum(p));
p = (char *) att_align(p, typalign);
}
PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
}
/*-------------------------------------------------------------------------
@ -1583,9 +1830,9 @@ array_map(FunctionCallInfo fcinfo, Oid inpType, Oid retType)
PG_RETURN_ARRAYTYPE_P(v);
/* Lookup source and result types. Unneeded variables are reused. */
system_cache_lookup(inpType, false, &inp_typlen, &inp_typbyval,
system_cache_lookup(inpType, IOFunc_input, &inp_typlen, &inp_typbyval,
&typdelim, &typelem, &proc, &inp_typalign);
system_cache_lookup(retType, false, &typlen, &typbyval,
system_cache_lookup(retType, IOFunc_input, &typlen, &typbyval,
&typdelim, &typelem, &proc, &typalign);
/* Allocate temporary array for new values */
@ -1832,7 +2079,7 @@ array_eq(PG_FUNCTION_ARGS)
static void
system_cache_lookup(Oid element_type,
bool input,
IOFuncSelector which_func,
int *typlen,
bool *typbyval,
char *typdelim,
@ -1855,10 +2102,21 @@ system_cache_lookup(Oid element_type,
*typdelim = typeStruct->typdelim;
*typelem = typeStruct->typelem;
*typalign = typeStruct->typalign;
if (input)
*proc = typeStruct->typinput;
else
*proc = typeStruct->typoutput;
switch (which_func)
{
case IOFunc_input:
*proc = typeStruct->typinput;
break;
case IOFunc_output:
*proc = typeStruct->typoutput;
break;
case IOFunc_receive:
*proc = typeStruct->typreceive;
break;
case IOFunc_send:
*proc = typeStruct->typsend;
break;
}
ReleaseSysCache(typeTuple);
}