mirror of
https://github.com/postgres/postgres.git
synced 2025-04-20 00:42:27 +03:00
This is a mechanical change in preparation for a later commit that will change the layout of TupleDesc. Introducing a macro to abstract the details of where attributes are stored will allow us to change that in separate step and revise it in future. Author: Thomas Munro, editorialized by Andres Freund Reviewed-By: Andres Freund Discussion: https://postgr.es/m/CAEepm=0ZtQ-SpsgCyzzYpsXS6e=kZWqk3g5Ygn3MDV7A8dabUA@mail.gmail.com
1277 lines
38 KiB
C
1277 lines
38 KiB
C
/*-------------------------------------------------------------------------
|
|
*
|
|
* tqueue.c
|
|
* Use shm_mq to send & receive tuples between parallel backends
|
|
*
|
|
* Most of the complexity in this module arises from transient RECORD types,
|
|
* which all have type RECORDOID and are distinguished by typmod numbers
|
|
* that are managed per-backend (see src/backend/utils/cache/typcache.c).
|
|
* The sender's set of RECORD typmod assignments probably doesn't match the
|
|
* receiver's. To deal with this, we make the sender send a description
|
|
* of each transient RECORD type appearing in the data it sends. The
|
|
* receiver finds or creates a matching type in its own typcache, and then
|
|
* maps the sender's typmod for that type to its own typmod.
|
|
*
|
|
* A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
|
|
* under the hood, writes tuples from the executor to a shm_mq. If
|
|
* necessary, it also writes control messages describing transient
|
|
* record types used within the tuple.
|
|
*
|
|
* A TupleQueueReader reads tuples, and control messages if any are sent,
|
|
* from a shm_mq and returns the tuples. If transient record types are
|
|
* in use, it registers those types locally based on the control messages
|
|
* and rewrites the typmods sent by the remote side to the corresponding
|
|
* local record typmods.
|
|
*
|
|
* Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
|
|
* Portions Copyright (c) 1994, Regents of the University of California
|
|
*
|
|
* IDENTIFICATION
|
|
* src/backend/executor/tqueue.c
|
|
*
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
|
|
#include "postgres.h"
|
|
|
|
#include "access/htup_details.h"
|
|
#include "catalog/pg_type.h"
|
|
#include "executor/tqueue.h"
|
|
#include "funcapi.h"
|
|
#include "lib/stringinfo.h"
|
|
#include "miscadmin.h"
|
|
#include "utils/array.h"
|
|
#include "utils/lsyscache.h"
|
|
#include "utils/memutils.h"
|
|
#include "utils/rangetypes.h"
|
|
#include "utils/syscache.h"
|
|
#include "utils/typcache.h"
|
|
|
|
|
|
/*
|
|
* The data transferred through the shm_mq is divided into messages.
|
|
* One-byte messages are mode-switch messages, telling the receiver to switch
|
|
* between "control" and "data" modes. (We always start up in "data" mode.)
|
|
* Otherwise, when in "data" mode, each message is a tuple. When in "control"
|
|
* mode, each message defines one transient-typmod-to-tupledesc mapping to
|
|
* let us interpret future tuples. Both of those cases certainly require
|
|
* more than one byte, so no confusion is possible.
|
|
*/
|
|
#define TUPLE_QUEUE_MODE_CONTROL 'c' /* mode-switch message contents */
|
|
#define TUPLE_QUEUE_MODE_DATA 'd'
|
|
|
|
/*
|
|
* Both the sender and receiver build trees of TupleRemapInfo nodes to help
|
|
* them identify which (sub) fields of transmitted tuples are composite and
|
|
* may thus need remap processing. We might need to look within arrays and
|
|
* ranges, not only composites, to find composite sub-fields. A NULL
|
|
* TupleRemapInfo pointer indicates that it is known that the described field
|
|
* is not composite and has no composite substructure.
|
|
*
|
|
* Note that we currently have to look at each composite field at runtime,
|
|
* even if we believe it's of a named composite type (i.e., not RECORD).
|
|
* This is because we allow the actual value to be a compatible transient
|
|
* RECORD type. That's grossly inefficient, and it would be good to get
|
|
* rid of the requirement, but it's not clear what would need to change.
|
|
*
|
|
* Also, we allow the top-level tuple structure, as well as the actual
|
|
* structure of composite subfields, to change from one tuple to the next
|
|
* at runtime. This may well be entirely historical, but it's mostly free
|
|
* to support given the previous requirement; and other places in the system
|
|
* also permit this, so it's not entirely clear if we could drop it.
|
|
*/
|
|
|
|
typedef enum
|
|
{
|
|
TQUEUE_REMAP_ARRAY, /* array */
|
|
TQUEUE_REMAP_RANGE, /* range */
|
|
TQUEUE_REMAP_RECORD /* composite type, named or transient */
|
|
} TupleRemapClass;
|
|
|
|
typedef struct TupleRemapInfo TupleRemapInfo;
|
|
|
|
typedef struct ArrayRemapInfo
|
|
{
|
|
int16 typlen; /* array element type's storage properties */
|
|
bool typbyval;
|
|
char typalign;
|
|
TupleRemapInfo *element_remap; /* array element type's remap info */
|
|
} ArrayRemapInfo;
|
|
|
|
typedef struct RangeRemapInfo
|
|
{
|
|
TypeCacheEntry *typcache; /* range type's typcache entry */
|
|
TupleRemapInfo *bound_remap; /* range bound type's remap info */
|
|
} RangeRemapInfo;
|
|
|
|
typedef struct RecordRemapInfo
|
|
{
|
|
/* Original (remote) type ID info last seen for this composite field */
|
|
Oid rectypid;
|
|
int32 rectypmod;
|
|
/* Local RECORD typmod, or -1 if unset; not used on sender side */
|
|
int32 localtypmod;
|
|
/* If no fields of the record require remapping, these are NULL: */
|
|
TupleDesc tupledesc; /* copy of record's tupdesc */
|
|
TupleRemapInfo **field_remap; /* each field's remap info */
|
|
} RecordRemapInfo;
|
|
|
|
struct TupleRemapInfo
|
|
{
|
|
TupleRemapClass remapclass;
|
|
union
|
|
{
|
|
ArrayRemapInfo arr;
|
|
RangeRemapInfo rng;
|
|
RecordRemapInfo rec;
|
|
} u;
|
|
};
|
|
|
|
/*
|
|
* DestReceiver object's private contents
|
|
*
|
|
* queue and tupledesc are pointers to data supplied by DestReceiver's caller.
|
|
* The recordhtab and remap info are owned by the DestReceiver and are kept
|
|
* in mycontext. tmpcontext is a tuple-lifespan context to hold cruft
|
|
* created while traversing each tuple to find record subfields.
|
|
*/
|
|
typedef struct TQueueDestReceiver
|
|
{
|
|
DestReceiver pub; /* public fields */
|
|
shm_mq_handle *queue; /* shm_mq to send to */
|
|
MemoryContext mycontext; /* context containing TQueueDestReceiver */
|
|
MemoryContext tmpcontext; /* per-tuple context, if needed */
|
|
HTAB *recordhtab; /* table of transmitted typmods, if needed */
|
|
char mode; /* current message mode */
|
|
TupleDesc tupledesc; /* current top-level tuple descriptor */
|
|
TupleRemapInfo **field_remapinfo; /* current top-level remap info */
|
|
} TQueueDestReceiver;
|
|
|
|
/*
|
|
* Hash table entries for mapping remote to local typmods.
|
|
*/
|
|
typedef struct RecordTypmodMap
|
|
{
|
|
int32 remotetypmod; /* hash key (must be first!) */
|
|
int32 localtypmod;
|
|
} RecordTypmodMap;
|
|
|
|
/*
|
|
* TupleQueueReader object's private contents
|
|
*
|
|
* queue and tupledesc are pointers to data supplied by reader's caller.
|
|
* The typmodmap and remap info are owned by the TupleQueueReader and
|
|
* are kept in mycontext.
|
|
*
|
|
* "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h
|
|
*/
|
|
struct TupleQueueReader
|
|
{
|
|
shm_mq_handle *queue; /* shm_mq to receive from */
|
|
MemoryContext mycontext; /* context containing TupleQueueReader */
|
|
HTAB *typmodmap; /* RecordTypmodMap hashtable, if needed */
|
|
char mode; /* current message mode */
|
|
TupleDesc tupledesc; /* current top-level tuple descriptor */
|
|
TupleRemapInfo **field_remapinfo; /* current top-level remap info */
|
|
};
|
|
|
|
/* Local function prototypes */
|
|
static void TQExamine(TQueueDestReceiver *tqueue,
|
|
TupleRemapInfo *remapinfo,
|
|
Datum value);
|
|
static void TQExamineArray(TQueueDestReceiver *tqueue,
|
|
ArrayRemapInfo *remapinfo,
|
|
Datum value);
|
|
static void TQExamineRange(TQueueDestReceiver *tqueue,
|
|
RangeRemapInfo *remapinfo,
|
|
Datum value);
|
|
static void TQExamineRecord(TQueueDestReceiver *tqueue,
|
|
RecordRemapInfo *remapinfo,
|
|
Datum value);
|
|
static void TQSendRecordInfo(TQueueDestReceiver *tqueue, int32 typmod,
|
|
TupleDesc tupledesc);
|
|
static void TupleQueueHandleControlMessage(TupleQueueReader *reader,
|
|
Size nbytes, char *data);
|
|
static HeapTuple TupleQueueHandleDataMessage(TupleQueueReader *reader,
|
|
Size nbytes, HeapTupleHeader data);
|
|
static HeapTuple TQRemapTuple(TupleQueueReader *reader,
|
|
TupleDesc tupledesc,
|
|
TupleRemapInfo **field_remapinfo,
|
|
HeapTuple tuple);
|
|
static Datum TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo,
|
|
Datum value, bool *changed);
|
|
static Datum TQRemapArray(TupleQueueReader *reader, ArrayRemapInfo *remapinfo,
|
|
Datum value, bool *changed);
|
|
static Datum TQRemapRange(TupleQueueReader *reader, RangeRemapInfo *remapinfo,
|
|
Datum value, bool *changed);
|
|
static Datum TQRemapRecord(TupleQueueReader *reader, RecordRemapInfo *remapinfo,
|
|
Datum value, bool *changed);
|
|
static TupleRemapInfo *BuildTupleRemapInfo(Oid typid, MemoryContext mycontext);
|
|
static TupleRemapInfo *BuildArrayRemapInfo(Oid elemtypid,
|
|
MemoryContext mycontext);
|
|
static TupleRemapInfo *BuildRangeRemapInfo(Oid rngtypid,
|
|
MemoryContext mycontext);
|
|
static TupleRemapInfo **BuildFieldRemapInfo(TupleDesc tupledesc,
|
|
MemoryContext mycontext);
|
|
|
|
|
|
/*
|
|
* Receive a tuple from a query, and send it to the designated shm_mq.
|
|
*
|
|
* Returns TRUE if successful, FALSE if shm_mq has been detached.
|
|
*/
|
|
static bool
|
|
tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
|
|
{
|
|
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
|
|
TupleDesc tupledesc = slot->tts_tupleDescriptor;
|
|
HeapTuple tuple;
|
|
shm_mq_result result;
|
|
|
|
/*
|
|
* If first time through, compute remapping info for the top-level fields.
|
|
* On later calls, if the tupledesc has changed, set up for the new
|
|
* tupledesc. (This is a strange test both because the executor really
|
|
* shouldn't change the tupledesc, and also because it would be unsafe if
|
|
* the old tupledesc could be freed and a new one allocated at the same
|
|
* address. But since some very old code in printtup.c uses a similar
|
|
* approach, we adopt it here as well.)
|
|
*
|
|
* Here and elsewhere in this module, when replacing remapping info we
|
|
* pfree the top-level object because that's easy, but we don't bother to
|
|
* recursively free any substructure. This would lead to query-lifespan
|
|
* memory leaks if the mapping info actually changed frequently, but since
|
|
* we don't expect that to happen, it doesn't seem worth expending code to
|
|
* prevent it.
|
|
*/
|
|
if (tqueue->tupledesc != tupledesc)
|
|
{
|
|
/* Is it worth trying to free substructure of the remap tree? */
|
|
if (tqueue->field_remapinfo != NULL)
|
|
pfree(tqueue->field_remapinfo);
|
|
tqueue->field_remapinfo = BuildFieldRemapInfo(tupledesc,
|
|
tqueue->mycontext);
|
|
tqueue->tupledesc = tupledesc;
|
|
}
|
|
|
|
/*
|
|
* When, because of the types being transmitted, no record typmod mapping
|
|
* can be needed, we can skip a good deal of work.
|
|
*/
|
|
if (tqueue->field_remapinfo != NULL)
|
|
{
|
|
TupleRemapInfo **remapinfo = tqueue->field_remapinfo;
|
|
int i;
|
|
MemoryContext oldcontext = NULL;
|
|
|
|
/* Deform the tuple so we can examine fields, if not done already. */
|
|
slot_getallattrs(slot);
|
|
|
|
/* Iterate over each attribute and search it for transient typmods. */
|
|
for (i = 0; i < tupledesc->natts; i++)
|
|
{
|
|
/* Ignore nulls and types that don't need special handling. */
|
|
if (slot->tts_isnull[i] || remapinfo[i] == NULL)
|
|
continue;
|
|
|
|
/* Switch to temporary memory context to avoid leaking. */
|
|
if (oldcontext == NULL)
|
|
{
|
|
if (tqueue->tmpcontext == NULL)
|
|
tqueue->tmpcontext =
|
|
AllocSetContextCreate(tqueue->mycontext,
|
|
"tqueue sender temp context",
|
|
ALLOCSET_DEFAULT_SIZES);
|
|
oldcontext = MemoryContextSwitchTo(tqueue->tmpcontext);
|
|
}
|
|
|
|
/* Examine the value. */
|
|
TQExamine(tqueue, remapinfo[i], slot->tts_values[i]);
|
|
}
|
|
|
|
/* If we used the temp context, reset it and restore prior context. */
|
|
if (oldcontext != NULL)
|
|
{
|
|
MemoryContextSwitchTo(oldcontext);
|
|
MemoryContextReset(tqueue->tmpcontext);
|
|
}
|
|
|
|
/* If we entered control mode, switch back to data mode. */
|
|
if (tqueue->mode != TUPLE_QUEUE_MODE_DATA)
|
|
{
|
|
tqueue->mode = TUPLE_QUEUE_MODE_DATA;
|
|
shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false);
|
|
}
|
|
}
|
|
|
|
/* Send the tuple itself. */
|
|
tuple = ExecMaterializeSlot(slot);
|
|
result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false);
|
|
|
|
/* Check for failure. */
|
|
if (result == SHM_MQ_DETACHED)
|
|
return false;
|
|
else if (result != SHM_MQ_SUCCESS)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("could not send tuple to shared-memory queue")));
|
|
|
|
return true;
|
|
}
|
|
|
|
/*
|
|
* Examine the given datum and send any necessary control messages for
|
|
* transient record types contained in it.
|
|
*
|
|
* remapinfo is previously-computed remapping info about the datum's type.
|
|
*
|
|
* This function just dispatches based on the remap class.
|
|
*/
|
|
static void
|
|
TQExamine(TQueueDestReceiver *tqueue, TupleRemapInfo *remapinfo, Datum value)
|
|
{
|
|
/* This is recursive, so it could be driven to stack overflow. */
|
|
check_stack_depth();
|
|
|
|
switch (remapinfo->remapclass)
|
|
{
|
|
case TQUEUE_REMAP_ARRAY:
|
|
TQExamineArray(tqueue, &remapinfo->u.arr, value);
|
|
break;
|
|
case TQUEUE_REMAP_RANGE:
|
|
TQExamineRange(tqueue, &remapinfo->u.rng, value);
|
|
break;
|
|
case TQUEUE_REMAP_RECORD:
|
|
TQExamineRecord(tqueue, &remapinfo->u.rec, value);
|
|
break;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Examine a record datum and send any necessary control messages for
|
|
* transient record types contained in it.
|
|
*/
|
|
static void
|
|
TQExamineRecord(TQueueDestReceiver *tqueue, RecordRemapInfo *remapinfo,
|
|
Datum value)
|
|
{
|
|
HeapTupleHeader tup;
|
|
Oid typid;
|
|
int32 typmod;
|
|
TupleDesc tupledesc;
|
|
|
|
/* Extract type OID and typmod from tuple. */
|
|
tup = DatumGetHeapTupleHeader(value);
|
|
typid = HeapTupleHeaderGetTypeId(tup);
|
|
typmod = HeapTupleHeaderGetTypMod(tup);
|
|
|
|
/*
|
|
* If first time through, or if this isn't the same composite type as last
|
|
* time, consider sending a control message, and then look up the
|
|
* necessary information for examining the fields.
|
|
*/
|
|
if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod)
|
|
{
|
|
/* Free any old data. */
|
|
if (remapinfo->tupledesc != NULL)
|
|
FreeTupleDesc(remapinfo->tupledesc);
|
|
/* Is it worth trying to free substructure of the remap tree? */
|
|
if (remapinfo->field_remap != NULL)
|
|
pfree(remapinfo->field_remap);
|
|
|
|
/* Look up tuple descriptor in typcache. */
|
|
tupledesc = lookup_rowtype_tupdesc(typid, typmod);
|
|
|
|
/*
|
|
* If this is a transient record type, send the tupledesc in a control
|
|
* message. (TQSendRecordInfo is smart enough to do this only once
|
|
* per typmod.)
|
|
*/
|
|
if (typid == RECORDOID)
|
|
TQSendRecordInfo(tqueue, typmod, tupledesc);
|
|
|
|
/* Figure out whether fields need recursive processing. */
|
|
remapinfo->field_remap = BuildFieldRemapInfo(tupledesc,
|
|
tqueue->mycontext);
|
|
if (remapinfo->field_remap != NULL)
|
|
{
|
|
/*
|
|
* We need to inspect the record contents, so save a copy of the
|
|
* tupdesc. (We could possibly just reference the typcache's
|
|
* copy, but then it's problematic when to release the refcount.)
|
|
*/
|
|
MemoryContext oldcontext = MemoryContextSwitchTo(tqueue->mycontext);
|
|
|
|
remapinfo->tupledesc = CreateTupleDescCopy(tupledesc);
|
|
MemoryContextSwitchTo(oldcontext);
|
|
}
|
|
else
|
|
{
|
|
/* No fields of the record require remapping. */
|
|
remapinfo->tupledesc = NULL;
|
|
}
|
|
remapinfo->rectypid = typid;
|
|
remapinfo->rectypmod = typmod;
|
|
|
|
/* Release reference count acquired by lookup_rowtype_tupdesc. */
|
|
DecrTupleDescRefCount(tupledesc);
|
|
}
|
|
|
|
/*
|
|
* If field remapping is required, deform the tuple and examine each
|
|
* field.
|
|
*/
|
|
if (remapinfo->field_remap != NULL)
|
|
{
|
|
Datum *values;
|
|
bool *isnull;
|
|
HeapTupleData tdata;
|
|
int i;
|
|
|
|
/* Deform the tuple so we can check each column within. */
|
|
tupledesc = remapinfo->tupledesc;
|
|
values = (Datum *) palloc(tupledesc->natts * sizeof(Datum));
|
|
isnull = (bool *) palloc(tupledesc->natts * sizeof(bool));
|
|
tdata.t_len = HeapTupleHeaderGetDatumLength(tup);
|
|
ItemPointerSetInvalid(&(tdata.t_self));
|
|
tdata.t_tableOid = InvalidOid;
|
|
tdata.t_data = tup;
|
|
heap_deform_tuple(&tdata, tupledesc, values, isnull);
|
|
|
|
/* Recursively check each interesting non-NULL attribute. */
|
|
for (i = 0; i < tupledesc->natts; i++)
|
|
{
|
|
if (!isnull[i] && remapinfo->field_remap[i])
|
|
TQExamine(tqueue, remapinfo->field_remap[i], values[i]);
|
|
}
|
|
|
|
/* Need not clean up, since we're in a short-lived context. */
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Examine an array datum and send any necessary control messages for
|
|
* transient record types contained in it.
|
|
*/
|
|
static void
|
|
TQExamineArray(TQueueDestReceiver *tqueue, ArrayRemapInfo *remapinfo,
|
|
Datum value)
|
|
{
|
|
ArrayType *arr = DatumGetArrayTypeP(value);
|
|
Oid typid = ARR_ELEMTYPE(arr);
|
|
Datum *elem_values;
|
|
bool *elem_nulls;
|
|
int num_elems;
|
|
int i;
|
|
|
|
/* Deconstruct the array. */
|
|
deconstruct_array(arr, typid, remapinfo->typlen,
|
|
remapinfo->typbyval, remapinfo->typalign,
|
|
&elem_values, &elem_nulls, &num_elems);
|
|
|
|
/* Examine each element. */
|
|
for (i = 0; i < num_elems; i++)
|
|
{
|
|
if (!elem_nulls[i])
|
|
TQExamine(tqueue, remapinfo->element_remap, elem_values[i]);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Examine a range datum and send any necessary control messages for
|
|
* transient record types contained in it.
|
|
*/
|
|
static void
|
|
TQExamineRange(TQueueDestReceiver *tqueue, RangeRemapInfo *remapinfo,
|
|
Datum value)
|
|
{
|
|
RangeType *range = DatumGetRangeType(value);
|
|
RangeBound lower;
|
|
RangeBound upper;
|
|
bool empty;
|
|
|
|
/* Extract the lower and upper bounds. */
|
|
range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty);
|
|
|
|
/* Nothing to do for an empty range. */
|
|
if (empty)
|
|
return;
|
|
|
|
/* Examine each bound, if present. */
|
|
if (!upper.infinite)
|
|
TQExamine(tqueue, remapinfo->bound_remap, upper.val);
|
|
if (!lower.infinite)
|
|
TQExamine(tqueue, remapinfo->bound_remap, lower.val);
|
|
}
|
|
|
|
/*
|
|
* Send tuple descriptor information for a transient typmod, unless we've
|
|
* already done so previously.
|
|
*/
|
|
static void
|
|
TQSendRecordInfo(TQueueDestReceiver *tqueue, int32 typmod, TupleDesc tupledesc)
|
|
{
|
|
StringInfoData buf;
|
|
bool found;
|
|
int i;
|
|
|
|
/* Initialize hash table if not done yet. */
|
|
if (tqueue->recordhtab == NULL)
|
|
{
|
|
HASHCTL ctl;
|
|
|
|
MemSet(&ctl, 0, sizeof(ctl));
|
|
/* Hash table entries are just typmods */
|
|
ctl.keysize = sizeof(int32);
|
|
ctl.entrysize = sizeof(int32);
|
|
ctl.hcxt = tqueue->mycontext;
|
|
tqueue->recordhtab = hash_create("tqueue sender record type hashtable",
|
|
100, &ctl,
|
|
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
|
|
}
|
|
|
|
/* Have we already seen this record type? If not, must report it. */
|
|
hash_search(tqueue->recordhtab, &typmod, HASH_ENTER, &found);
|
|
if (found)
|
|
return;
|
|
|
|
elog(DEBUG3, "sending tqueue control message for record typmod %d", typmod);
|
|
|
|
/* If message queue is in data mode, switch to control mode. */
|
|
if (tqueue->mode != TUPLE_QUEUE_MODE_CONTROL)
|
|
{
|
|
tqueue->mode = TUPLE_QUEUE_MODE_CONTROL;
|
|
shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false);
|
|
}
|
|
|
|
/* Assemble a control message. */
|
|
initStringInfo(&buf);
|
|
appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int32));
|
|
appendBinaryStringInfo(&buf, (char *) &tupledesc->natts, sizeof(int));
|
|
appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid, sizeof(bool));
|
|
for (i = 0; i < tupledesc->natts; i++)
|
|
{
|
|
appendBinaryStringInfo(&buf, (char *) TupleDescAttr(tupledesc, i),
|
|
sizeof(FormData_pg_attribute));
|
|
}
|
|
|
|
/* Send control message. */
|
|
shm_mq_send(tqueue->queue, buf.len, buf.data, false);
|
|
|
|
/* We assume it's OK to leak buf because we're in a short-lived context. */
|
|
}
|
|
|
|
/*
|
|
* Prepare to receive tuples from executor.
|
|
*/
|
|
static void
|
|
tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
|
|
{
|
|
/* do nothing */
|
|
}
|
|
|
|
/*
|
|
* Clean up at end of an executor run
|
|
*/
|
|
static void
|
|
tqueueShutdownReceiver(DestReceiver *self)
|
|
{
|
|
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
|
|
|
|
shm_mq_detach(shm_mq_get_queue(tqueue->queue));
|
|
}
|
|
|
|
/*
|
|
* Destroy receiver when done with it
|
|
*/
|
|
static void
|
|
tqueueDestroyReceiver(DestReceiver *self)
|
|
{
|
|
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
|
|
|
|
if (tqueue->tmpcontext != NULL)
|
|
MemoryContextDelete(tqueue->tmpcontext);
|
|
if (tqueue->recordhtab != NULL)
|
|
hash_destroy(tqueue->recordhtab);
|
|
/* Is it worth trying to free substructure of the remap tree? */
|
|
if (tqueue->field_remapinfo != NULL)
|
|
pfree(tqueue->field_remapinfo);
|
|
pfree(self);
|
|
}
|
|
|
|
/*
|
|
* Create a DestReceiver that writes tuples to a tuple queue.
|
|
*/
|
|
DestReceiver *
|
|
CreateTupleQueueDestReceiver(shm_mq_handle *handle)
|
|
{
|
|
TQueueDestReceiver *self;
|
|
|
|
self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
|
|
|
|
self->pub.receiveSlot = tqueueReceiveSlot;
|
|
self->pub.rStartup = tqueueStartupReceiver;
|
|
self->pub.rShutdown = tqueueShutdownReceiver;
|
|
self->pub.rDestroy = tqueueDestroyReceiver;
|
|
self->pub.mydest = DestTupleQueue;
|
|
self->queue = handle;
|
|
self->mycontext = CurrentMemoryContext;
|
|
self->tmpcontext = NULL;
|
|
self->recordhtab = NULL;
|
|
self->mode = TUPLE_QUEUE_MODE_DATA;
|
|
/* Top-level tupledesc is not known yet */
|
|
self->tupledesc = NULL;
|
|
self->field_remapinfo = NULL;
|
|
|
|
return (DestReceiver *) self;
|
|
}
|
|
|
|
/*
|
|
* Create a tuple queue reader.
|
|
*/
|
|
TupleQueueReader *
|
|
CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
|
|
{
|
|
TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
|
|
|
|
reader->queue = handle;
|
|
reader->mycontext = CurrentMemoryContext;
|
|
reader->typmodmap = NULL;
|
|
reader->mode = TUPLE_QUEUE_MODE_DATA;
|
|
reader->tupledesc = tupledesc;
|
|
reader->field_remapinfo = BuildFieldRemapInfo(tupledesc, reader->mycontext);
|
|
|
|
return reader;
|
|
}
|
|
|
|
/*
|
|
* Destroy a tuple queue reader.
|
|
*/
|
|
void
|
|
DestroyTupleQueueReader(TupleQueueReader *reader)
|
|
{
|
|
shm_mq_detach(shm_mq_get_queue(reader->queue));
|
|
if (reader->typmodmap != NULL)
|
|
hash_destroy(reader->typmodmap);
|
|
/* Is it worth trying to free substructure of the remap tree? */
|
|
if (reader->field_remapinfo != NULL)
|
|
pfree(reader->field_remapinfo);
|
|
pfree(reader);
|
|
}
|
|
|
|
/*
|
|
* Fetch a tuple from a tuple queue reader.
|
|
*
|
|
* The return value is NULL if there are no remaining tuples or if
|
|
* nowait = true and no tuple is ready to return. *done, if not NULL,
|
|
* is set to true when there are no remaining tuples and otherwise to false.
|
|
*
|
|
* The returned tuple, if any, is allocated in CurrentMemoryContext.
|
|
* That should be a short-lived (tuple-lifespan) context, because we are
|
|
* pretty cavalier about leaking memory in that context if we have to do
|
|
* tuple remapping.
|
|
*
|
|
* Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
|
|
* accumulate bytes from a partially-read message, so it's useful to call
|
|
* this with nowait = true even if nothing is returned.
|
|
*/
|
|
HeapTuple
|
|
TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
|
|
{
|
|
shm_mq_result result;
|
|
|
|
if (done != NULL)
|
|
*done = false;
|
|
|
|
for (;;)
|
|
{
|
|
Size nbytes;
|
|
void *data;
|
|
|
|
/* Attempt to read a message. */
|
|
result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
|
|
|
|
/* If queue is detached, set *done and return NULL. */
|
|
if (result == SHM_MQ_DETACHED)
|
|
{
|
|
if (done != NULL)
|
|
*done = true;
|
|
return NULL;
|
|
}
|
|
|
|
/* In non-blocking mode, bail out if no message ready yet. */
|
|
if (result == SHM_MQ_WOULD_BLOCK)
|
|
return NULL;
|
|
Assert(result == SHM_MQ_SUCCESS);
|
|
|
|
/*
|
|
* We got a message (see message spec at top of file). Process it.
|
|
*/
|
|
if (nbytes == 1)
|
|
{
|
|
/* Mode switch message. */
|
|
reader->mode = ((char *) data)[0];
|
|
}
|
|
else if (reader->mode == TUPLE_QUEUE_MODE_DATA)
|
|
{
|
|
/* Tuple data. */
|
|
return TupleQueueHandleDataMessage(reader, nbytes, data);
|
|
}
|
|
else if (reader->mode == TUPLE_QUEUE_MODE_CONTROL)
|
|
{
|
|
/* Control message, describing a transient record type. */
|
|
TupleQueueHandleControlMessage(reader, nbytes, data);
|
|
}
|
|
else
|
|
elog(ERROR, "unrecognized tqueue mode: %d", (int) reader->mode);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Handle a data message - that is, a tuple - from the remote side.
|
|
*/
|
|
static HeapTuple
|
|
TupleQueueHandleDataMessage(TupleQueueReader *reader,
|
|
Size nbytes,
|
|
HeapTupleHeader data)
|
|
{
|
|
HeapTupleData htup;
|
|
|
|
/*
|
|
* Set up a dummy HeapTupleData pointing to the data from the shm_mq
|
|
* (which had better be sufficiently aligned).
|
|
*/
|
|
ItemPointerSetInvalid(&htup.t_self);
|
|
htup.t_tableOid = InvalidOid;
|
|
htup.t_len = nbytes;
|
|
htup.t_data = data;
|
|
|
|
/*
|
|
* Either just copy the data into a regular palloc'd tuple, or remap it,
|
|
* as required.
|
|
*/
|
|
return TQRemapTuple(reader,
|
|
reader->tupledesc,
|
|
reader->field_remapinfo,
|
|
&htup);
|
|
}
|
|
|
|
/*
|
|
* Copy the given tuple, remapping any transient typmods contained in it.
|
|
*/
|
|
static HeapTuple
|
|
TQRemapTuple(TupleQueueReader *reader,
|
|
TupleDesc tupledesc,
|
|
TupleRemapInfo **field_remapinfo,
|
|
HeapTuple tuple)
|
|
{
|
|
Datum *values;
|
|
bool *isnull;
|
|
bool changed = false;
|
|
int i;
|
|
|
|
/*
|
|
* If no remapping is necessary, just copy the tuple into a single
|
|
* palloc'd chunk, as caller will expect.
|
|
*/
|
|
if (field_remapinfo == NULL)
|
|
return heap_copytuple(tuple);
|
|
|
|
/* Deform tuple so we can remap record typmods for individual attrs. */
|
|
values = (Datum *) palloc(tupledesc->natts * sizeof(Datum));
|
|
isnull = (bool *) palloc(tupledesc->natts * sizeof(bool));
|
|
heap_deform_tuple(tuple, tupledesc, values, isnull);
|
|
|
|
/* Recursively process each interesting non-NULL attribute. */
|
|
for (i = 0; i < tupledesc->natts; i++)
|
|
{
|
|
if (isnull[i] || field_remapinfo[i] == NULL)
|
|
continue;
|
|
values[i] = TQRemap(reader, field_remapinfo[i], values[i], &changed);
|
|
}
|
|
|
|
/* Reconstruct the modified tuple, if anything was modified. */
|
|
if (changed)
|
|
return heap_form_tuple(tupledesc, values, isnull);
|
|
else
|
|
return heap_copytuple(tuple);
|
|
}
|
|
|
|
/*
|
|
* Process the given datum and replace any transient record typmods
|
|
* contained in it. Set *changed to TRUE if we actually changed the datum.
|
|
*
|
|
* remapinfo is previously-computed remapping info about the datum's type.
|
|
*
|
|
* This function just dispatches based on the remap class.
|
|
*/
|
|
static Datum
|
|
TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo,
|
|
Datum value, bool *changed)
|
|
{
|
|
/* This is recursive, so it could be driven to stack overflow. */
|
|
check_stack_depth();
|
|
|
|
switch (remapinfo->remapclass)
|
|
{
|
|
case TQUEUE_REMAP_ARRAY:
|
|
return TQRemapArray(reader, &remapinfo->u.arr, value, changed);
|
|
|
|
case TQUEUE_REMAP_RANGE:
|
|
return TQRemapRange(reader, &remapinfo->u.rng, value, changed);
|
|
|
|
case TQUEUE_REMAP_RECORD:
|
|
return TQRemapRecord(reader, &remapinfo->u.rec, value, changed);
|
|
}
|
|
|
|
elog(ERROR, "unrecognized tqueue remap class: %d",
|
|
(int) remapinfo->remapclass);
|
|
return (Datum) 0;
|
|
}
|
|
|
|
/*
|
|
* Process the given array datum and replace any transient record typmods
|
|
* contained in it. Set *changed to TRUE if we actually changed the datum.
|
|
*/
|
|
static Datum
|
|
TQRemapArray(TupleQueueReader *reader, ArrayRemapInfo *remapinfo,
|
|
Datum value, bool *changed)
|
|
{
|
|
ArrayType *arr = DatumGetArrayTypeP(value);
|
|
Oid typid = ARR_ELEMTYPE(arr);
|
|
bool element_changed = false;
|
|
Datum *elem_values;
|
|
bool *elem_nulls;
|
|
int num_elems;
|
|
int i;
|
|
|
|
/* Deconstruct the array. */
|
|
deconstruct_array(arr, typid, remapinfo->typlen,
|
|
remapinfo->typbyval, remapinfo->typalign,
|
|
&elem_values, &elem_nulls, &num_elems);
|
|
|
|
/* Remap each element. */
|
|
for (i = 0; i < num_elems; i++)
|
|
{
|
|
if (!elem_nulls[i])
|
|
elem_values[i] = TQRemap(reader,
|
|
remapinfo->element_remap,
|
|
elem_values[i],
|
|
&element_changed);
|
|
}
|
|
|
|
if (element_changed)
|
|
{
|
|
/* Reconstruct and return the array. */
|
|
*changed = true;
|
|
arr = construct_md_array(elem_values, elem_nulls,
|
|
ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr),
|
|
typid, remapinfo->typlen,
|
|
remapinfo->typbyval, remapinfo->typalign);
|
|
return PointerGetDatum(arr);
|
|
}
|
|
|
|
/* Else just return the value as-is. */
|
|
return value;
|
|
}
|
|
|
|
/*
|
|
* Process the given range datum and replace any transient record typmods
|
|
* contained in it. Set *changed to TRUE if we actually changed the datum.
|
|
*/
|
|
static Datum
|
|
TQRemapRange(TupleQueueReader *reader, RangeRemapInfo *remapinfo,
|
|
Datum value, bool *changed)
|
|
{
|
|
RangeType *range = DatumGetRangeType(value);
|
|
bool bound_changed = false;
|
|
RangeBound lower;
|
|
RangeBound upper;
|
|
bool empty;
|
|
|
|
/* Extract the lower and upper bounds. */
|
|
range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty);
|
|
|
|
/* Nothing to do for an empty range. */
|
|
if (empty)
|
|
return value;
|
|
|
|
/* Remap each bound, if present. */
|
|
if (!upper.infinite)
|
|
upper.val = TQRemap(reader, remapinfo->bound_remap,
|
|
upper.val, &bound_changed);
|
|
if (!lower.infinite)
|
|
lower.val = TQRemap(reader, remapinfo->bound_remap,
|
|
lower.val, &bound_changed);
|
|
|
|
if (bound_changed)
|
|
{
|
|
/* Reserialize. */
|
|
*changed = true;
|
|
range = range_serialize(remapinfo->typcache, &lower, &upper, empty);
|
|
return RangeTypeGetDatum(range);
|
|
}
|
|
|
|
/* Else just return the value as-is. */
|
|
return value;
|
|
}
|
|
|
|
/*
|
|
* Process the given record datum and replace any transient record typmods
|
|
* contained in it. Set *changed to TRUE if we actually changed the datum.
|
|
*/
|
|
static Datum
|
|
TQRemapRecord(TupleQueueReader *reader, RecordRemapInfo *remapinfo,
|
|
Datum value, bool *changed)
|
|
{
|
|
HeapTupleHeader tup;
|
|
Oid typid;
|
|
int32 typmod;
|
|
bool changed_typmod;
|
|
TupleDesc tupledesc;
|
|
|
|
/* Extract type OID and typmod from tuple. */
|
|
tup = DatumGetHeapTupleHeader(value);
|
|
typid = HeapTupleHeaderGetTypeId(tup);
|
|
typmod = HeapTupleHeaderGetTypMod(tup);
|
|
|
|
/*
|
|
* If first time through, or if this isn't the same composite type as last
|
|
* time, identify the required typmod mapping, and then look up the
|
|
* necessary information for processing the fields.
|
|
*/
|
|
if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod)
|
|
{
|
|
/* Free any old data. */
|
|
if (remapinfo->tupledesc != NULL)
|
|
FreeTupleDesc(remapinfo->tupledesc);
|
|
/* Is it worth trying to free substructure of the remap tree? */
|
|
if (remapinfo->field_remap != NULL)
|
|
pfree(remapinfo->field_remap);
|
|
|
|
/* If transient record type, look up matching local typmod. */
|
|
if (typid == RECORDOID)
|
|
{
|
|
RecordTypmodMap *mapent;
|
|
|
|
Assert(reader->typmodmap != NULL);
|
|
mapent = hash_search(reader->typmodmap, &typmod,
|
|
HASH_FIND, NULL);
|
|
if (mapent == NULL)
|
|
elog(ERROR, "tqueue received unrecognized remote typmod %d",
|
|
typmod);
|
|
remapinfo->localtypmod = mapent->localtypmod;
|
|
}
|
|
else
|
|
remapinfo->localtypmod = -1;
|
|
|
|
/* Look up tuple descriptor in typcache. */
|
|
tupledesc = lookup_rowtype_tupdesc(typid, remapinfo->localtypmod);
|
|
|
|
/* Figure out whether fields need recursive processing. */
|
|
remapinfo->field_remap = BuildFieldRemapInfo(tupledesc,
|
|
reader->mycontext);
|
|
if (remapinfo->field_remap != NULL)
|
|
{
|
|
/*
|
|
* We need to inspect the record contents, so save a copy of the
|
|
* tupdesc. (We could possibly just reference the typcache's
|
|
* copy, but then it's problematic when to release the refcount.)
|
|
*/
|
|
MemoryContext oldcontext = MemoryContextSwitchTo(reader->mycontext);
|
|
|
|
remapinfo->tupledesc = CreateTupleDescCopy(tupledesc);
|
|
MemoryContextSwitchTo(oldcontext);
|
|
}
|
|
else
|
|
{
|
|
/* No fields of the record require remapping. */
|
|
remapinfo->tupledesc = NULL;
|
|
}
|
|
remapinfo->rectypid = typid;
|
|
remapinfo->rectypmod = typmod;
|
|
|
|
/* Release reference count acquired by lookup_rowtype_tupdesc. */
|
|
DecrTupleDescRefCount(tupledesc);
|
|
}
|
|
|
|
/* If transient record, replace remote typmod with local typmod. */
|
|
if (typid == RECORDOID && typmod != remapinfo->localtypmod)
|
|
{
|
|
typmod = remapinfo->localtypmod;
|
|
changed_typmod = true;
|
|
}
|
|
else
|
|
changed_typmod = false;
|
|
|
|
/*
|
|
* If we need to change the typmod, or if there are any potentially
|
|
* remappable fields, replace the tuple.
|
|
*/
|
|
if (changed_typmod || remapinfo->field_remap != NULL)
|
|
{
|
|
HeapTupleData htup;
|
|
HeapTuple atup;
|
|
|
|
/* For now, assume we always need to change the tuple in this case. */
|
|
*changed = true;
|
|
|
|
/* Copy tuple, possibly remapping contained fields. */
|
|
ItemPointerSetInvalid(&htup.t_self);
|
|
htup.t_tableOid = InvalidOid;
|
|
htup.t_len = HeapTupleHeaderGetDatumLength(tup);
|
|
htup.t_data = tup;
|
|
atup = TQRemapTuple(reader,
|
|
remapinfo->tupledesc,
|
|
remapinfo->field_remap,
|
|
&htup);
|
|
|
|
/* Apply the correct labeling for a local Datum. */
|
|
HeapTupleHeaderSetTypeId(atup->t_data, typid);
|
|
HeapTupleHeaderSetTypMod(atup->t_data, typmod);
|
|
HeapTupleHeaderSetDatumLength(atup->t_data, htup.t_len);
|
|
|
|
/* And return the results. */
|
|
return HeapTupleHeaderGetDatum(atup->t_data);
|
|
}
|
|
|
|
/* Else just return the value as-is. */
|
|
return value;
|
|
}
|
|
|
|
/*
|
|
* Handle a control message from the tuple queue reader.
|
|
*
|
|
* Control messages are sent when the remote side is sending tuples that
|
|
* contain transient record types. We need to arrange to bless those
|
|
* record types locally and translate between remote and local typmods.
|
|
*/
|
|
static void
|
|
TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes,
|
|
char *data)
|
|
{
|
|
int32 remotetypmod;
|
|
int natts;
|
|
bool hasoid;
|
|
Size offset = 0;
|
|
Form_pg_attribute *attrs;
|
|
TupleDesc tupledesc;
|
|
RecordTypmodMap *mapent;
|
|
bool found;
|
|
int i;
|
|
|
|
/* Extract remote typmod. */
|
|
memcpy(&remotetypmod, &data[offset], sizeof(int32));
|
|
offset += sizeof(int32);
|
|
|
|
/* Extract attribute count. */
|
|
memcpy(&natts, &data[offset], sizeof(int));
|
|
offset += sizeof(int);
|
|
|
|
/* Extract hasoid flag. */
|
|
memcpy(&hasoid, &data[offset], sizeof(bool));
|
|
offset += sizeof(bool);
|
|
|
|
/* Extract attribute details. The tupledesc made here is just transient. */
|
|
attrs = palloc(natts * sizeof(Form_pg_attribute));
|
|
for (i = 0; i < natts; i++)
|
|
{
|
|
attrs[i] = palloc(sizeof(FormData_pg_attribute));
|
|
memcpy(attrs[i], &data[offset], sizeof(FormData_pg_attribute));
|
|
offset += sizeof(FormData_pg_attribute);
|
|
}
|
|
|
|
/* We should have read the whole message. */
|
|
Assert(offset == nbytes);
|
|
|
|
/* Construct TupleDesc, and assign a local typmod. */
|
|
tupledesc = CreateTupleDesc(natts, hasoid, attrs);
|
|
tupledesc = BlessTupleDesc(tupledesc);
|
|
|
|
/* Create mapping hashtable if it doesn't exist already. */
|
|
if (reader->typmodmap == NULL)
|
|
{
|
|
HASHCTL ctl;
|
|
|
|
MemSet(&ctl, 0, sizeof(ctl));
|
|
ctl.keysize = sizeof(int32);
|
|
ctl.entrysize = sizeof(RecordTypmodMap);
|
|
ctl.hcxt = reader->mycontext;
|
|
reader->typmodmap = hash_create("tqueue receiver record type hashtable",
|
|
100, &ctl,
|
|
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
|
|
}
|
|
|
|
/* Create map entry. */
|
|
mapent = hash_search(reader->typmodmap, &remotetypmod, HASH_ENTER,
|
|
&found);
|
|
if (found)
|
|
elog(ERROR, "duplicate tqueue control message for typmod %d",
|
|
remotetypmod);
|
|
mapent->localtypmod = tupledesc->tdtypmod;
|
|
|
|
elog(DEBUG3, "tqueue mapping remote typmod %d to local typmod %d",
|
|
remotetypmod, mapent->localtypmod);
|
|
}
|
|
|
|
/*
|
|
* Build remap info for the specified data type, storing it in mycontext.
|
|
* Returns NULL if neither the type nor any subtype could require remapping.
|
|
*/
|
|
static TupleRemapInfo *
|
|
BuildTupleRemapInfo(Oid typid, MemoryContext mycontext)
|
|
{
|
|
HeapTuple tup;
|
|
Form_pg_type typ;
|
|
|
|
/* This is recursive, so it could be driven to stack overflow. */
|
|
check_stack_depth();
|
|
|
|
restart:
|
|
tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
|
|
if (!HeapTupleIsValid(tup))
|
|
elog(ERROR, "cache lookup failed for type %u", typid);
|
|
typ = (Form_pg_type) GETSTRUCT(tup);
|
|
|
|
/* Look through domains to underlying base type. */
|
|
if (typ->typtype == TYPTYPE_DOMAIN)
|
|
{
|
|
typid = typ->typbasetype;
|
|
ReleaseSysCache(tup);
|
|
goto restart;
|
|
}
|
|
|
|
/* If it's a true array type, deal with it that way. */
|
|
if (OidIsValid(typ->typelem) && typ->typlen == -1)
|
|
{
|
|
typid = typ->typelem;
|
|
ReleaseSysCache(tup);
|
|
return BuildArrayRemapInfo(typid, mycontext);
|
|
}
|
|
|
|
/* Similarly, deal with ranges appropriately. */
|
|
if (typ->typtype == TYPTYPE_RANGE)
|
|
{
|
|
ReleaseSysCache(tup);
|
|
return BuildRangeRemapInfo(typid, mycontext);
|
|
}
|
|
|
|
/*
|
|
* If it's a composite type (including RECORD), set up for remapping. We
|
|
* don't attempt to determine the status of subfields here, since we do
|
|
* not have enough information yet; just mark everything invalid.
|
|
*/
|
|
if (typ->typtype == TYPTYPE_COMPOSITE || typid == RECORDOID)
|
|
{
|
|
TupleRemapInfo *remapinfo;
|
|
|
|
remapinfo = (TupleRemapInfo *)
|
|
MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
|
|
remapinfo->remapclass = TQUEUE_REMAP_RECORD;
|
|
remapinfo->u.rec.rectypid = InvalidOid;
|
|
remapinfo->u.rec.rectypmod = -1;
|
|
remapinfo->u.rec.localtypmod = -1;
|
|
remapinfo->u.rec.tupledesc = NULL;
|
|
remapinfo->u.rec.field_remap = NULL;
|
|
ReleaseSysCache(tup);
|
|
return remapinfo;
|
|
}
|
|
|
|
/* Nothing else can possibly need remapping attention. */
|
|
ReleaseSysCache(tup);
|
|
return NULL;
|
|
}
|
|
|
|
static TupleRemapInfo *
|
|
BuildArrayRemapInfo(Oid elemtypid, MemoryContext mycontext)
|
|
{
|
|
TupleRemapInfo *remapinfo;
|
|
TupleRemapInfo *element_remapinfo;
|
|
|
|
/* See if element type requires remapping. */
|
|
element_remapinfo = BuildTupleRemapInfo(elemtypid, mycontext);
|
|
/* If not, the array doesn't either. */
|
|
if (element_remapinfo == NULL)
|
|
return NULL;
|
|
/* OK, set up to remap the array. */
|
|
remapinfo = (TupleRemapInfo *)
|
|
MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
|
|
remapinfo->remapclass = TQUEUE_REMAP_ARRAY;
|
|
get_typlenbyvalalign(elemtypid,
|
|
&remapinfo->u.arr.typlen,
|
|
&remapinfo->u.arr.typbyval,
|
|
&remapinfo->u.arr.typalign);
|
|
remapinfo->u.arr.element_remap = element_remapinfo;
|
|
return remapinfo;
|
|
}
|
|
|
|
static TupleRemapInfo *
|
|
BuildRangeRemapInfo(Oid rngtypid, MemoryContext mycontext)
|
|
{
|
|
TupleRemapInfo *remapinfo;
|
|
TupleRemapInfo *bound_remapinfo;
|
|
TypeCacheEntry *typcache;
|
|
|
|
/*
|
|
* Get range info from the typcache. We assume this pointer will stay
|
|
* valid for the duration of the query.
|
|
*/
|
|
typcache = lookup_type_cache(rngtypid, TYPECACHE_RANGE_INFO);
|
|
if (typcache->rngelemtype == NULL)
|
|
elog(ERROR, "type %u is not a range type", rngtypid);
|
|
|
|
/* See if range bound type requires remapping. */
|
|
bound_remapinfo = BuildTupleRemapInfo(typcache->rngelemtype->type_id,
|
|
mycontext);
|
|
/* If not, the range doesn't either. */
|
|
if (bound_remapinfo == NULL)
|
|
return NULL;
|
|
/* OK, set up to remap the range. */
|
|
remapinfo = (TupleRemapInfo *)
|
|
MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
|
|
remapinfo->remapclass = TQUEUE_REMAP_RANGE;
|
|
remapinfo->u.rng.typcache = typcache;
|
|
remapinfo->u.rng.bound_remap = bound_remapinfo;
|
|
return remapinfo;
|
|
}
|
|
|
|
/*
|
|
* Build remap info for fields of the type described by the given tupdesc.
|
|
* Returns an array of TupleRemapInfo pointers, or NULL if no field
|
|
* requires remapping. Data is allocated in mycontext.
|
|
*/
|
|
static TupleRemapInfo **
|
|
BuildFieldRemapInfo(TupleDesc tupledesc, MemoryContext mycontext)
|
|
{
|
|
TupleRemapInfo **remapinfo;
|
|
bool noop = true;
|
|
int i;
|
|
|
|
/* Recursively determine the remapping status of each field. */
|
|
remapinfo = (TupleRemapInfo **)
|
|
MemoryContextAlloc(mycontext,
|
|
tupledesc->natts * sizeof(TupleRemapInfo *));
|
|
for (i = 0; i < tupledesc->natts; i++)
|
|
{
|
|
Form_pg_attribute attr = TupleDescAttr(tupledesc, i);
|
|
|
|
if (attr->attisdropped)
|
|
{
|
|
remapinfo[i] = NULL;
|
|
continue;
|
|
}
|
|
remapinfo[i] = BuildTupleRemapInfo(attr->atttypid, mycontext);
|
|
if (remapinfo[i] != NULL)
|
|
noop = false;
|
|
}
|
|
|
|
/* If no fields require remapping, report that by returning NULL. */
|
|
if (noop)
|
|
{
|
|
pfree(remapinfo);
|
|
remapinfo = NULL;
|
|
}
|
|
|
|
return remapinfo;
|
|
}
|