mirror of
https://github.com/postgres/postgres.git
synced 2025-05-17 06:41:24 +03:00
When doing record typmod remapping, tqueue.c did fresh catalog lookups for each tuple it processed, which was pretty horrible performance-wise (it seemed to about halve the already none-too-quick speed of bulk reads in parallel mode). Worse, it insisted on putting bits of that data into TopMemoryContext, from where it never freed them, causing a session-lifespan memory leak. (I suppose this was coded with the idea that the sender process would quit after finishing the query --- but the receiver uses the same code.) Restructure to avoid repetitive catalog lookups and to keep that data in a query-lifespan context, in or below the context where the TQueueDestReceiver or TupleQueueReader itself lives. Fix some other bugs such as continuing to use a tupledesc after releasing our refcount on it. Clean up cavalier datatype choices (typmods are int32, please, not int, and certainly not Oid). Improve comments and error message wording.
1279 lines
38 KiB
C
1279 lines
38 KiB
C
/*-------------------------------------------------------------------------
|
|
*
|
|
* tqueue.c
|
|
* Use shm_mq to send & receive tuples between parallel backends
|
|
*
|
|
* Most of the complexity in this module arises from transient RECORD types,
|
|
* which all have type RECORDOID and are distinguished by typmod numbers
|
|
* that are managed per-backend (see src/backend/utils/cache/typcache.c).
|
|
* The sender's set of RECORD typmod assignments probably doesn't match the
|
|
* receiver's. To deal with this, we make the sender send a description
|
|
* of each transient RECORD type appearing in the data it sends. The
|
|
* receiver finds or creates a matching type in its own typcache, and then
|
|
* maps the sender's typmod for that type to its own typmod.
|
|
*
|
|
* A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
|
|
* under the hood, writes tuples from the executor to a shm_mq. If
|
|
* necessary, it also writes control messages describing transient
|
|
* record types used within the tuple.
|
|
*
|
|
* A TupleQueueReader reads tuples, and control messages if any are sent,
|
|
* from a shm_mq and returns the tuples. If transient record types are
|
|
* in use, it registers those types locally based on the control messages
|
|
* and rewrites the typmods sent by the remote side to the corresponding
|
|
* local record typmods.
|
|
*
|
|
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
|
|
* Portions Copyright (c) 1994, Regents of the University of California
|
|
*
|
|
* IDENTIFICATION
|
|
* src/backend/executor/tqueue.c
|
|
*
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
|
|
#include "postgres.h"
|
|
|
|
#include "access/htup_details.h"
|
|
#include "catalog/pg_type.h"
|
|
#include "executor/tqueue.h"
|
|
#include "funcapi.h"
|
|
#include "lib/stringinfo.h"
|
|
#include "miscadmin.h"
|
|
#include "utils/array.h"
|
|
#include "utils/lsyscache.h"
|
|
#include "utils/memutils.h"
|
|
#include "utils/rangetypes.h"
|
|
#include "utils/syscache.h"
|
|
#include "utils/typcache.h"
|
|
|
|
|
|
/*
|
|
* The data transferred through the shm_mq is divided into messages.
|
|
* One-byte messages are mode-switch messages, telling the receiver to switch
|
|
* between "control" and "data" modes. (We always start up in "data" mode.)
|
|
* Otherwise, when in "data" mode, each message is a tuple. When in "control"
|
|
* mode, each message defines one transient-typmod-to-tupledesc mapping to
|
|
* let us interpret future tuples. Both of those cases certainly require
|
|
* more than one byte, so no confusion is possible.
|
|
*/
|
|
#define TUPLE_QUEUE_MODE_CONTROL 'c' /* mode-switch message contents */
|
|
#define TUPLE_QUEUE_MODE_DATA 'd'
|
|
|
|
/*
|
|
* Both the sender and receiver build trees of TupleRemapInfo nodes to help
|
|
* them identify which (sub) fields of transmitted tuples are composite and
|
|
* may thus need remap processing. We might need to look within arrays and
|
|
* ranges, not only composites, to find composite sub-fields. A NULL
|
|
* TupleRemapInfo pointer indicates that it is known that the described field
|
|
* is not composite and has no composite substructure.
|
|
*
|
|
* Note that we currently have to look at each composite field at runtime,
|
|
* even if we believe it's of a named composite type (i.e., not RECORD).
|
|
* This is because we allow the actual value to be a compatible transient
|
|
* RECORD type. That's grossly inefficient, and it would be good to get
|
|
* rid of the requirement, but it's not clear what would need to change.
|
|
*
|
|
* Also, we allow the top-level tuple structure, as well as the actual
|
|
* structure of composite subfields, to change from one tuple to the next
|
|
* at runtime. This may well be entirely historical, but it's mostly free
|
|
* to support given the previous requirement; and other places in the system
|
|
* also permit this, so it's not entirely clear if we could drop it.
|
|
*/
|
|
|
|
typedef enum
|
|
{
|
|
TQUEUE_REMAP_ARRAY, /* array */
|
|
TQUEUE_REMAP_RANGE, /* range */
|
|
TQUEUE_REMAP_RECORD /* composite type, named or transient */
|
|
} TupleRemapClass;
|
|
|
|
typedef struct TupleRemapInfo TupleRemapInfo;
|
|
|
|
typedef struct ArrayRemapInfo
|
|
{
|
|
int16 typlen; /* array element type's storage properties */
|
|
bool typbyval;
|
|
char typalign;
|
|
TupleRemapInfo *element_remap; /* array element type's remap info */
|
|
} ArrayRemapInfo;
|
|
|
|
typedef struct RangeRemapInfo
|
|
{
|
|
TypeCacheEntry *typcache; /* range type's typcache entry */
|
|
TupleRemapInfo *bound_remap; /* range bound type's remap info */
|
|
} RangeRemapInfo;
|
|
|
|
typedef struct RecordRemapInfo
|
|
{
|
|
/* Original (remote) type ID info last seen for this composite field */
|
|
Oid rectypid;
|
|
int32 rectypmod;
|
|
/* Local RECORD typmod, or -1 if unset; not used on sender side */
|
|
int32 localtypmod;
|
|
/* If no fields of the record require remapping, these are NULL: */
|
|
TupleDesc tupledesc; /* copy of record's tupdesc */
|
|
TupleRemapInfo **field_remap; /* each field's remap info */
|
|
} RecordRemapInfo;
|
|
|
|
struct TupleRemapInfo
|
|
{
|
|
TupleRemapClass remapclass;
|
|
union
|
|
{
|
|
ArrayRemapInfo arr;
|
|
RangeRemapInfo rng;
|
|
RecordRemapInfo rec;
|
|
} u;
|
|
};
|
|
|
|
/*
|
|
* DestReceiver object's private contents
|
|
*
|
|
* queue and tupledesc are pointers to data supplied by DestReceiver's caller.
|
|
* The recordhtab and remap info are owned by the DestReceiver and are kept
|
|
* in mycontext. tmpcontext is a tuple-lifespan context to hold cruft
|
|
* created while traversing each tuple to find record subfields.
|
|
*/
|
|
typedef struct TQueueDestReceiver
|
|
{
|
|
DestReceiver pub; /* public fields */
|
|
shm_mq_handle *queue; /* shm_mq to send to */
|
|
MemoryContext mycontext; /* context containing TQueueDestReceiver */
|
|
MemoryContext tmpcontext; /* per-tuple context, if needed */
|
|
HTAB *recordhtab; /* table of transmitted typmods, if needed */
|
|
char mode; /* current message mode */
|
|
TupleDesc tupledesc; /* current top-level tuple descriptor */
|
|
TupleRemapInfo **field_remapinfo; /* current top-level remap info */
|
|
} TQueueDestReceiver;
|
|
|
|
/*
|
|
* Hash table entries for mapping remote to local typmods.
|
|
*/
|
|
typedef struct RecordTypmodMap
|
|
{
|
|
int32 remotetypmod; /* hash key (must be first!) */
|
|
int32 localtypmod;
|
|
} RecordTypmodMap;
|
|
|
|
/*
|
|
* TupleQueueReader object's private contents
|
|
*
|
|
* queue and tupledesc are pointers to data supplied by reader's caller.
|
|
* The typmodmap and remap info are owned by the TupleQueueReader and
|
|
* are kept in mycontext.
|
|
*
|
|
* "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h
|
|
*/
|
|
struct TupleQueueReader
|
|
{
|
|
shm_mq_handle *queue; /* shm_mq to receive from */
|
|
MemoryContext mycontext; /* context containing TupleQueueReader */
|
|
HTAB *typmodmap; /* RecordTypmodMap hashtable, if needed */
|
|
char mode; /* current message mode */
|
|
TupleDesc tupledesc; /* current top-level tuple descriptor */
|
|
TupleRemapInfo **field_remapinfo; /* current top-level remap info */
|
|
};
|
|
|
|
/* Local function prototypes */
|
|
static void TQExamine(TQueueDestReceiver *tqueue,
|
|
TupleRemapInfo *remapinfo,
|
|
Datum value);
|
|
static void TQExamineArray(TQueueDestReceiver *tqueue,
|
|
ArrayRemapInfo *remapinfo,
|
|
Datum value);
|
|
static void TQExamineRange(TQueueDestReceiver *tqueue,
|
|
RangeRemapInfo *remapinfo,
|
|
Datum value);
|
|
static void TQExamineRecord(TQueueDestReceiver *tqueue,
|
|
RecordRemapInfo *remapinfo,
|
|
Datum value);
|
|
static void TQSendRecordInfo(TQueueDestReceiver *tqueue, int32 typmod,
|
|
TupleDesc tupledesc);
|
|
static void TupleQueueHandleControlMessage(TupleQueueReader *reader,
|
|
Size nbytes, char *data);
|
|
static HeapTuple TupleQueueHandleDataMessage(TupleQueueReader *reader,
|
|
Size nbytes, HeapTupleHeader data);
|
|
static HeapTuple TQRemapTuple(TupleQueueReader *reader,
|
|
TupleDesc tupledesc,
|
|
TupleRemapInfo **field_remapinfo,
|
|
HeapTuple tuple);
|
|
static Datum TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo,
|
|
Datum value, bool *changed);
|
|
static Datum TQRemapArray(TupleQueueReader *reader, ArrayRemapInfo *remapinfo,
|
|
Datum value, bool *changed);
|
|
static Datum TQRemapRange(TupleQueueReader *reader, RangeRemapInfo *remapinfo,
|
|
Datum value, bool *changed);
|
|
static Datum TQRemapRecord(TupleQueueReader *reader, RecordRemapInfo *remapinfo,
|
|
Datum value, bool *changed);
|
|
static TupleRemapInfo *BuildTupleRemapInfo(Oid typid, MemoryContext mycontext);
|
|
static TupleRemapInfo *BuildArrayRemapInfo(Oid elemtypid,
|
|
MemoryContext mycontext);
|
|
static TupleRemapInfo *BuildRangeRemapInfo(Oid rngtypid,
|
|
MemoryContext mycontext);
|
|
static TupleRemapInfo **BuildFieldRemapInfo(TupleDesc tupledesc,
|
|
MemoryContext mycontext);
|
|
|
|
|
|
/*
|
|
* Receive a tuple from a query, and send it to the designated shm_mq.
|
|
*
|
|
* Returns TRUE if successful, FALSE if shm_mq has been detached.
|
|
*/
|
|
static bool
|
|
tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
|
|
{
|
|
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
|
|
TupleDesc tupledesc = slot->tts_tupleDescriptor;
|
|
HeapTuple tuple;
|
|
shm_mq_result result;
|
|
|
|
/*
|
|
* If first time through, compute remapping info for the top-level fields.
|
|
* On later calls, if the tupledesc has changed, set up for the new
|
|
* tupledesc. (This is a strange test both because the executor really
|
|
* shouldn't change the tupledesc, and also because it would be unsafe if
|
|
* the old tupledesc could be freed and a new one allocated at the same
|
|
* address. But since some very old code in printtup.c uses a similar
|
|
* approach, we adopt it here as well.)
|
|
*
|
|
* Here and elsewhere in this module, when replacing remapping info we
|
|
* pfree the top-level object because that's easy, but we don't bother to
|
|
* recursively free any substructure. This would lead to query-lifespan
|
|
* memory leaks if the mapping info actually changed frequently, but since
|
|
* we don't expect that to happen, it doesn't seem worth expending code to
|
|
* prevent it.
|
|
*/
|
|
if (tqueue->tupledesc != tupledesc)
|
|
{
|
|
/* Is it worth trying to free substructure of the remap tree? */
|
|
if (tqueue->field_remapinfo != NULL)
|
|
pfree(tqueue->field_remapinfo);
|
|
tqueue->field_remapinfo = BuildFieldRemapInfo(tupledesc,
|
|
tqueue->mycontext);
|
|
tqueue->tupledesc = tupledesc;
|
|
}
|
|
|
|
/*
|
|
* When, because of the types being transmitted, no record typmod mapping
|
|
* can be needed, we can skip a good deal of work.
|
|
*/
|
|
if (tqueue->field_remapinfo != NULL)
|
|
{
|
|
TupleRemapInfo **remapinfo = tqueue->field_remapinfo;
|
|
int i;
|
|
MemoryContext oldcontext = NULL;
|
|
|
|
/* Deform the tuple so we can examine fields, if not done already. */
|
|
slot_getallattrs(slot);
|
|
|
|
/* Iterate over each attribute and search it for transient typmods. */
|
|
for (i = 0; i < tupledesc->natts; i++)
|
|
{
|
|
/* Ignore nulls and types that don't need special handling. */
|
|
if (slot->tts_isnull[i] || remapinfo[i] == NULL)
|
|
continue;
|
|
|
|
/* Switch to temporary memory context to avoid leaking. */
|
|
if (oldcontext == NULL)
|
|
{
|
|
if (tqueue->tmpcontext == NULL)
|
|
tqueue->tmpcontext =
|
|
AllocSetContextCreate(tqueue->mycontext,
|
|
"tqueue sender temp context",
|
|
ALLOCSET_DEFAULT_MINSIZE,
|
|
ALLOCSET_DEFAULT_INITSIZE,
|
|
ALLOCSET_DEFAULT_MAXSIZE);
|
|
oldcontext = MemoryContextSwitchTo(tqueue->tmpcontext);
|
|
}
|
|
|
|
/* Examine the value. */
|
|
TQExamine(tqueue, remapinfo[i], slot->tts_values[i]);
|
|
}
|
|
|
|
/* If we used the temp context, reset it and restore prior context. */
|
|
if (oldcontext != NULL)
|
|
{
|
|
MemoryContextSwitchTo(oldcontext);
|
|
MemoryContextReset(tqueue->tmpcontext);
|
|
}
|
|
|
|
/* If we entered control mode, switch back to data mode. */
|
|
if (tqueue->mode != TUPLE_QUEUE_MODE_DATA)
|
|
{
|
|
tqueue->mode = TUPLE_QUEUE_MODE_DATA;
|
|
shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false);
|
|
}
|
|
}
|
|
|
|
/* Send the tuple itself. */
|
|
tuple = ExecMaterializeSlot(slot);
|
|
result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false);
|
|
|
|
/* Check for failure. */
|
|
if (result == SHM_MQ_DETACHED)
|
|
return false;
|
|
else if (result != SHM_MQ_SUCCESS)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("could not send tuple to shared-memory queue")));
|
|
|
|
return true;
|
|
}
|
|
|
|
/*
|
|
* Examine the given datum and send any necessary control messages for
|
|
* transient record types contained in it.
|
|
*
|
|
* remapinfo is previously-computed remapping info about the datum's type.
|
|
*
|
|
* This function just dispatches based on the remap class.
|
|
*/
|
|
static void
|
|
TQExamine(TQueueDestReceiver *tqueue, TupleRemapInfo *remapinfo, Datum value)
|
|
{
|
|
/* This is recursive, so it could be driven to stack overflow. */
|
|
check_stack_depth();
|
|
|
|
switch (remapinfo->remapclass)
|
|
{
|
|
case TQUEUE_REMAP_ARRAY:
|
|
TQExamineArray(tqueue, &remapinfo->u.arr, value);
|
|
break;
|
|
case TQUEUE_REMAP_RANGE:
|
|
TQExamineRange(tqueue, &remapinfo->u.rng, value);
|
|
break;
|
|
case TQUEUE_REMAP_RECORD:
|
|
TQExamineRecord(tqueue, &remapinfo->u.rec, value);
|
|
break;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Examine a record datum and send any necessary control messages for
|
|
* transient record types contained in it.
|
|
*/
|
|
static void
|
|
TQExamineRecord(TQueueDestReceiver *tqueue, RecordRemapInfo *remapinfo,
|
|
Datum value)
|
|
{
|
|
HeapTupleHeader tup;
|
|
Oid typid;
|
|
int32 typmod;
|
|
TupleDesc tupledesc;
|
|
|
|
/* Extract type OID and typmod from tuple. */
|
|
tup = DatumGetHeapTupleHeader(value);
|
|
typid = HeapTupleHeaderGetTypeId(tup);
|
|
typmod = HeapTupleHeaderGetTypMod(tup);
|
|
|
|
/*
|
|
* If first time through, or if this isn't the same composite type as last
|
|
* time, consider sending a control message, and then look up the
|
|
* necessary information for examining the fields.
|
|
*/
|
|
if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod)
|
|
{
|
|
/* Free any old data. */
|
|
if (remapinfo->tupledesc != NULL)
|
|
FreeTupleDesc(remapinfo->tupledesc);
|
|
/* Is it worth trying to free substructure of the remap tree? */
|
|
if (remapinfo->field_remap != NULL)
|
|
pfree(remapinfo->field_remap);
|
|
|
|
/* Look up tuple descriptor in typcache. */
|
|
tupledesc = lookup_rowtype_tupdesc(typid, typmod);
|
|
|
|
/*
|
|
* If this is a transient record type, send the tupledesc in a control
|
|
* message. (TQSendRecordInfo is smart enough to do this only once
|
|
* per typmod.)
|
|
*/
|
|
if (typid == RECORDOID)
|
|
TQSendRecordInfo(tqueue, typmod, tupledesc);
|
|
|
|
/* Figure out whether fields need recursive processing. */
|
|
remapinfo->field_remap = BuildFieldRemapInfo(tupledesc,
|
|
tqueue->mycontext);
|
|
if (remapinfo->field_remap != NULL)
|
|
{
|
|
/*
|
|
* We need to inspect the record contents, so save a copy of the
|
|
* tupdesc. (We could possibly just reference the typcache's
|
|
* copy, but then it's problematic when to release the refcount.)
|
|
*/
|
|
MemoryContext oldcontext = MemoryContextSwitchTo(tqueue->mycontext);
|
|
|
|
remapinfo->tupledesc = CreateTupleDescCopy(tupledesc);
|
|
MemoryContextSwitchTo(oldcontext);
|
|
}
|
|
else
|
|
{
|
|
/* No fields of the record require remapping. */
|
|
remapinfo->tupledesc = NULL;
|
|
}
|
|
remapinfo->rectypid = typid;
|
|
remapinfo->rectypmod = typmod;
|
|
|
|
/* Release reference count acquired by lookup_rowtype_tupdesc. */
|
|
DecrTupleDescRefCount(tupledesc);
|
|
}
|
|
|
|
/*
|
|
* If field remapping is required, deform the tuple and examine each
|
|
* field.
|
|
*/
|
|
if (remapinfo->field_remap != NULL)
|
|
{
|
|
Datum *values;
|
|
bool *isnull;
|
|
HeapTupleData tdata;
|
|
int i;
|
|
|
|
/* Deform the tuple so we can check each column within. */
|
|
tupledesc = remapinfo->tupledesc;
|
|
values = (Datum *) palloc(tupledesc->natts * sizeof(Datum));
|
|
isnull = (bool *) palloc(tupledesc->natts * sizeof(bool));
|
|
tdata.t_len = HeapTupleHeaderGetDatumLength(tup);
|
|
ItemPointerSetInvalid(&(tdata.t_self));
|
|
tdata.t_tableOid = InvalidOid;
|
|
tdata.t_data = tup;
|
|
heap_deform_tuple(&tdata, tupledesc, values, isnull);
|
|
|
|
/* Recursively check each interesting non-NULL attribute. */
|
|
for (i = 0; i < tupledesc->natts; i++)
|
|
{
|
|
if (!isnull[i] && remapinfo->field_remap[i])
|
|
TQExamine(tqueue, remapinfo->field_remap[i], values[i]);
|
|
}
|
|
|
|
/* Need not clean up, since we're in a short-lived context. */
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Examine an array datum and send any necessary control messages for
|
|
* transient record types contained in it.
|
|
*/
|
|
static void
|
|
TQExamineArray(TQueueDestReceiver *tqueue, ArrayRemapInfo *remapinfo,
|
|
Datum value)
|
|
{
|
|
ArrayType *arr = DatumGetArrayTypeP(value);
|
|
Oid typid = ARR_ELEMTYPE(arr);
|
|
Datum *elem_values;
|
|
bool *elem_nulls;
|
|
int num_elems;
|
|
int i;
|
|
|
|
/* Deconstruct the array. */
|
|
deconstruct_array(arr, typid, remapinfo->typlen,
|
|
remapinfo->typbyval, remapinfo->typalign,
|
|
&elem_values, &elem_nulls, &num_elems);
|
|
|
|
/* Examine each element. */
|
|
for (i = 0; i < num_elems; i++)
|
|
{
|
|
if (!elem_nulls[i])
|
|
TQExamine(tqueue, remapinfo->element_remap, elem_values[i]);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Examine a range datum and send any necessary control messages for
|
|
* transient record types contained in it.
|
|
*/
|
|
static void
|
|
TQExamineRange(TQueueDestReceiver *tqueue, RangeRemapInfo *remapinfo,
|
|
Datum value)
|
|
{
|
|
RangeType *range = DatumGetRangeType(value);
|
|
RangeBound lower;
|
|
RangeBound upper;
|
|
bool empty;
|
|
|
|
/* Extract the lower and upper bounds. */
|
|
range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty);
|
|
|
|
/* Nothing to do for an empty range. */
|
|
if (empty)
|
|
return;
|
|
|
|
/* Examine each bound, if present. */
|
|
if (!upper.infinite)
|
|
TQExamine(tqueue, remapinfo->bound_remap, upper.val);
|
|
if (!lower.infinite)
|
|
TQExamine(tqueue, remapinfo->bound_remap, lower.val);
|
|
}
|
|
|
|
/*
|
|
* Send tuple descriptor information for a transient typmod, unless we've
|
|
* already done so previously.
|
|
*/
|
|
static void
|
|
TQSendRecordInfo(TQueueDestReceiver *tqueue, int32 typmod, TupleDesc tupledesc)
|
|
{
|
|
StringInfoData buf;
|
|
bool found;
|
|
int i;
|
|
|
|
/* Initialize hash table if not done yet. */
|
|
if (tqueue->recordhtab == NULL)
|
|
{
|
|
HASHCTL ctl;
|
|
|
|
MemSet(&ctl, 0, sizeof(ctl));
|
|
/* Hash table entries are just typmods */
|
|
ctl.keysize = sizeof(int32);
|
|
ctl.entrysize = sizeof(int32);
|
|
ctl.hcxt = tqueue->mycontext;
|
|
tqueue->recordhtab = hash_create("tqueue sender record type hashtable",
|
|
100, &ctl,
|
|
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
|
|
}
|
|
|
|
/* Have we already seen this record type? If not, must report it. */
|
|
hash_search(tqueue->recordhtab, &typmod, HASH_ENTER, &found);
|
|
if (found)
|
|
return;
|
|
|
|
elog(DEBUG3, "sending tqueue control message for record typmod %d", typmod);
|
|
|
|
/* If message queue is in data mode, switch to control mode. */
|
|
if (tqueue->mode != TUPLE_QUEUE_MODE_CONTROL)
|
|
{
|
|
tqueue->mode = TUPLE_QUEUE_MODE_CONTROL;
|
|
shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false);
|
|
}
|
|
|
|
/* Assemble a control message. */
|
|
initStringInfo(&buf);
|
|
appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int32));
|
|
appendBinaryStringInfo(&buf, (char *) &tupledesc->natts, sizeof(int));
|
|
appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid, sizeof(bool));
|
|
for (i = 0; i < tupledesc->natts; i++)
|
|
{
|
|
appendBinaryStringInfo(&buf, (char *) tupledesc->attrs[i],
|
|
sizeof(FormData_pg_attribute));
|
|
}
|
|
|
|
/* Send control message. */
|
|
shm_mq_send(tqueue->queue, buf.len, buf.data, false);
|
|
|
|
/* We assume it's OK to leak buf because we're in a short-lived context. */
|
|
}
|
|
|
|
/*
|
|
* Prepare to receive tuples from executor.
|
|
*/
|
|
static void
|
|
tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
|
|
{
|
|
/* do nothing */
|
|
}
|
|
|
|
/*
|
|
* Clean up at end of an executor run
|
|
*/
|
|
static void
|
|
tqueueShutdownReceiver(DestReceiver *self)
|
|
{
|
|
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
|
|
|
|
shm_mq_detach(shm_mq_get_queue(tqueue->queue));
|
|
}
|
|
|
|
/*
|
|
* Destroy receiver when done with it
|
|
*/
|
|
static void
|
|
tqueueDestroyReceiver(DestReceiver *self)
|
|
{
|
|
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
|
|
|
|
if (tqueue->tmpcontext != NULL)
|
|
MemoryContextDelete(tqueue->tmpcontext);
|
|
if (tqueue->recordhtab != NULL)
|
|
hash_destroy(tqueue->recordhtab);
|
|
/* Is it worth trying to free substructure of the remap tree? */
|
|
if (tqueue->field_remapinfo != NULL)
|
|
pfree(tqueue->field_remapinfo);
|
|
pfree(self);
|
|
}
|
|
|
|
/*
|
|
* Create a DestReceiver that writes tuples to a tuple queue.
|
|
*/
|
|
DestReceiver *
|
|
CreateTupleQueueDestReceiver(shm_mq_handle *handle)
|
|
{
|
|
TQueueDestReceiver *self;
|
|
|
|
self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
|
|
|
|
self->pub.receiveSlot = tqueueReceiveSlot;
|
|
self->pub.rStartup = tqueueStartupReceiver;
|
|
self->pub.rShutdown = tqueueShutdownReceiver;
|
|
self->pub.rDestroy = tqueueDestroyReceiver;
|
|
self->pub.mydest = DestTupleQueue;
|
|
self->queue = handle;
|
|
self->mycontext = CurrentMemoryContext;
|
|
self->tmpcontext = NULL;
|
|
self->recordhtab = NULL;
|
|
self->mode = TUPLE_QUEUE_MODE_DATA;
|
|
/* Top-level tupledesc is not known yet */
|
|
self->tupledesc = NULL;
|
|
self->field_remapinfo = NULL;
|
|
|
|
return (DestReceiver *) self;
|
|
}
|
|
|
|
/*
|
|
* Create a tuple queue reader.
|
|
*/
|
|
TupleQueueReader *
|
|
CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
|
|
{
|
|
TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
|
|
|
|
reader->queue = handle;
|
|
reader->mycontext = CurrentMemoryContext;
|
|
reader->typmodmap = NULL;
|
|
reader->mode = TUPLE_QUEUE_MODE_DATA;
|
|
reader->tupledesc = tupledesc;
|
|
reader->field_remapinfo = BuildFieldRemapInfo(tupledesc, reader->mycontext);
|
|
|
|
return reader;
|
|
}
|
|
|
|
/*
|
|
* Destroy a tuple queue reader.
|
|
*/
|
|
void
|
|
DestroyTupleQueueReader(TupleQueueReader *reader)
|
|
{
|
|
shm_mq_detach(shm_mq_get_queue(reader->queue));
|
|
if (reader->typmodmap != NULL)
|
|
hash_destroy(reader->typmodmap);
|
|
/* Is it worth trying to free substructure of the remap tree? */
|
|
if (reader->field_remapinfo != NULL)
|
|
pfree(reader->field_remapinfo);
|
|
pfree(reader);
|
|
}
|
|
|
|
/*
|
|
* Fetch a tuple from a tuple queue reader.
|
|
*
|
|
* The return value is NULL if there are no remaining tuples or if
|
|
* nowait = true and no tuple is ready to return. *done, if not NULL,
|
|
* is set to true when there are no remaining tuples and otherwise to false.
|
|
*
|
|
* The returned tuple, if any, is allocated in CurrentMemoryContext.
|
|
* That should be a short-lived (tuple-lifespan) context, because we are
|
|
* pretty cavalier about leaking memory in that context if we have to do
|
|
* tuple remapping.
|
|
*
|
|
* Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
|
|
* accumulate bytes from a partially-read message, so it's useful to call
|
|
* this with nowait = true even if nothing is returned.
|
|
*/
|
|
HeapTuple
|
|
TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
|
|
{
|
|
shm_mq_result result;
|
|
|
|
if (done != NULL)
|
|
*done = false;
|
|
|
|
for (;;)
|
|
{
|
|
Size nbytes;
|
|
void *data;
|
|
|
|
/* Attempt to read a message. */
|
|
result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
|
|
|
|
/* If queue is detached, set *done and return NULL. */
|
|
if (result == SHM_MQ_DETACHED)
|
|
{
|
|
if (done != NULL)
|
|
*done = true;
|
|
return NULL;
|
|
}
|
|
|
|
/* In non-blocking mode, bail out if no message ready yet. */
|
|
if (result == SHM_MQ_WOULD_BLOCK)
|
|
return NULL;
|
|
Assert(result == SHM_MQ_SUCCESS);
|
|
|
|
/*
|
|
* We got a message (see message spec at top of file). Process it.
|
|
*/
|
|
if (nbytes == 1)
|
|
{
|
|
/* Mode switch message. */
|
|
reader->mode = ((char *) data)[0];
|
|
}
|
|
else if (reader->mode == TUPLE_QUEUE_MODE_DATA)
|
|
{
|
|
/* Tuple data. */
|
|
return TupleQueueHandleDataMessage(reader, nbytes, data);
|
|
}
|
|
else if (reader->mode == TUPLE_QUEUE_MODE_CONTROL)
|
|
{
|
|
/* Control message, describing a transient record type. */
|
|
TupleQueueHandleControlMessage(reader, nbytes, data);
|
|
}
|
|
else
|
|
elog(ERROR, "unrecognized tqueue mode: %d", (int) reader->mode);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Handle a data message - that is, a tuple - from the remote side.
|
|
*/
|
|
static HeapTuple
|
|
TupleQueueHandleDataMessage(TupleQueueReader *reader,
|
|
Size nbytes,
|
|
HeapTupleHeader data)
|
|
{
|
|
HeapTupleData htup;
|
|
|
|
/*
|
|
* Set up a dummy HeapTupleData pointing to the data from the shm_mq
|
|
* (which had better be sufficiently aligned).
|
|
*/
|
|
ItemPointerSetInvalid(&htup.t_self);
|
|
htup.t_tableOid = InvalidOid;
|
|
htup.t_len = nbytes;
|
|
htup.t_data = data;
|
|
|
|
/*
|
|
* Either just copy the data into a regular palloc'd tuple, or remap it,
|
|
* as required.
|
|
*/
|
|
return TQRemapTuple(reader,
|
|
reader->tupledesc,
|
|
reader->field_remapinfo,
|
|
&htup);
|
|
}
|
|
|
|
/*
|
|
* Copy the given tuple, remapping any transient typmods contained in it.
|
|
*/
|
|
static HeapTuple
|
|
TQRemapTuple(TupleQueueReader *reader,
|
|
TupleDesc tupledesc,
|
|
TupleRemapInfo **field_remapinfo,
|
|
HeapTuple tuple)
|
|
{
|
|
Datum *values;
|
|
bool *isnull;
|
|
bool changed = false;
|
|
int i;
|
|
|
|
/*
|
|
* If no remapping is necessary, just copy the tuple into a single
|
|
* palloc'd chunk, as caller will expect.
|
|
*/
|
|
if (field_remapinfo == NULL)
|
|
return heap_copytuple(tuple);
|
|
|
|
/* Deform tuple so we can remap record typmods for individual attrs. */
|
|
values = (Datum *) palloc(tupledesc->natts * sizeof(Datum));
|
|
isnull = (bool *) palloc(tupledesc->natts * sizeof(bool));
|
|
heap_deform_tuple(tuple, tupledesc, values, isnull);
|
|
|
|
/* Recursively process each interesting non-NULL attribute. */
|
|
for (i = 0; i < tupledesc->natts; i++)
|
|
{
|
|
if (isnull[i] || field_remapinfo[i] == NULL)
|
|
continue;
|
|
values[i] = TQRemap(reader, field_remapinfo[i], values[i], &changed);
|
|
}
|
|
|
|
/* Reconstruct the modified tuple, if anything was modified. */
|
|
if (changed)
|
|
return heap_form_tuple(tupledesc, values, isnull);
|
|
else
|
|
return heap_copytuple(tuple);
|
|
}
|
|
|
|
/*
|
|
* Process the given datum and replace any transient record typmods
|
|
* contained in it. Set *changed to TRUE if we actually changed the datum.
|
|
*
|
|
* remapinfo is previously-computed remapping info about the datum's type.
|
|
*
|
|
* This function just dispatches based on the remap class.
|
|
*/
|
|
static Datum
|
|
TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo,
|
|
Datum value, bool *changed)
|
|
{
|
|
/* This is recursive, so it could be driven to stack overflow. */
|
|
check_stack_depth();
|
|
|
|
switch (remapinfo->remapclass)
|
|
{
|
|
case TQUEUE_REMAP_ARRAY:
|
|
return TQRemapArray(reader, &remapinfo->u.arr, value, changed);
|
|
|
|
case TQUEUE_REMAP_RANGE:
|
|
return TQRemapRange(reader, &remapinfo->u.rng, value, changed);
|
|
|
|
case TQUEUE_REMAP_RECORD:
|
|
return TQRemapRecord(reader, &remapinfo->u.rec, value, changed);
|
|
}
|
|
|
|
elog(ERROR, "unrecognized tqueue remap class: %d",
|
|
(int) remapinfo->remapclass);
|
|
return (Datum) 0;
|
|
}
|
|
|
|
/*
|
|
* Process the given array datum and replace any transient record typmods
|
|
* contained in it. Set *changed to TRUE if we actually changed the datum.
|
|
*/
|
|
static Datum
|
|
TQRemapArray(TupleQueueReader *reader, ArrayRemapInfo *remapinfo,
|
|
Datum value, bool *changed)
|
|
{
|
|
ArrayType *arr = DatumGetArrayTypeP(value);
|
|
Oid typid = ARR_ELEMTYPE(arr);
|
|
bool element_changed = false;
|
|
Datum *elem_values;
|
|
bool *elem_nulls;
|
|
int num_elems;
|
|
int i;
|
|
|
|
/* Deconstruct the array. */
|
|
deconstruct_array(arr, typid, remapinfo->typlen,
|
|
remapinfo->typbyval, remapinfo->typalign,
|
|
&elem_values, &elem_nulls, &num_elems);
|
|
|
|
/* Remap each element. */
|
|
for (i = 0; i < num_elems; i++)
|
|
{
|
|
if (!elem_nulls[i])
|
|
elem_values[i] = TQRemap(reader,
|
|
remapinfo->element_remap,
|
|
elem_values[i],
|
|
&element_changed);
|
|
}
|
|
|
|
if (element_changed)
|
|
{
|
|
/* Reconstruct and return the array. */
|
|
*changed = true;
|
|
arr = construct_md_array(elem_values, elem_nulls,
|
|
ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr),
|
|
typid, remapinfo->typlen,
|
|
remapinfo->typbyval, remapinfo->typalign);
|
|
return PointerGetDatum(arr);
|
|
}
|
|
|
|
/* Else just return the value as-is. */
|
|
return value;
|
|
}
|
|
|
|
/*
|
|
* Process the given range datum and replace any transient record typmods
|
|
* contained in it. Set *changed to TRUE if we actually changed the datum.
|
|
*/
|
|
static Datum
|
|
TQRemapRange(TupleQueueReader *reader, RangeRemapInfo *remapinfo,
|
|
Datum value, bool *changed)
|
|
{
|
|
RangeType *range = DatumGetRangeType(value);
|
|
bool bound_changed = false;
|
|
RangeBound lower;
|
|
RangeBound upper;
|
|
bool empty;
|
|
|
|
/* Extract the lower and upper bounds. */
|
|
range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty);
|
|
|
|
/* Nothing to do for an empty range. */
|
|
if (empty)
|
|
return value;
|
|
|
|
/* Remap each bound, if present. */
|
|
if (!upper.infinite)
|
|
upper.val = TQRemap(reader, remapinfo->bound_remap,
|
|
upper.val, &bound_changed);
|
|
if (!lower.infinite)
|
|
lower.val = TQRemap(reader, remapinfo->bound_remap,
|
|
lower.val, &bound_changed);
|
|
|
|
if (bound_changed)
|
|
{
|
|
/* Reserialize. */
|
|
*changed = true;
|
|
range = range_serialize(remapinfo->typcache, &lower, &upper, empty);
|
|
return RangeTypeGetDatum(range);
|
|
}
|
|
|
|
/* Else just return the value as-is. */
|
|
return value;
|
|
}
|
|
|
|
/*
|
|
* Process the given record datum and replace any transient record typmods
|
|
* contained in it. Set *changed to TRUE if we actually changed the datum.
|
|
*/
|
|
static Datum
|
|
TQRemapRecord(TupleQueueReader *reader, RecordRemapInfo *remapinfo,
|
|
Datum value, bool *changed)
|
|
{
|
|
HeapTupleHeader tup;
|
|
Oid typid;
|
|
int32 typmod;
|
|
bool changed_typmod;
|
|
TupleDesc tupledesc;
|
|
|
|
/* Extract type OID and typmod from tuple. */
|
|
tup = DatumGetHeapTupleHeader(value);
|
|
typid = HeapTupleHeaderGetTypeId(tup);
|
|
typmod = HeapTupleHeaderGetTypMod(tup);
|
|
|
|
/*
|
|
* If first time through, or if this isn't the same composite type as last
|
|
* time, identify the required typmod mapping, and then look up the
|
|
* necessary information for processing the fields.
|
|
*/
|
|
if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod)
|
|
{
|
|
/* Free any old data. */
|
|
if (remapinfo->tupledesc != NULL)
|
|
FreeTupleDesc(remapinfo->tupledesc);
|
|
/* Is it worth trying to free substructure of the remap tree? */
|
|
if (remapinfo->field_remap != NULL)
|
|
pfree(remapinfo->field_remap);
|
|
|
|
/* If transient record type, look up matching local typmod. */
|
|
if (typid == RECORDOID)
|
|
{
|
|
RecordTypmodMap *mapent;
|
|
|
|
Assert(reader->typmodmap != NULL);
|
|
mapent = hash_search(reader->typmodmap, &typmod,
|
|
HASH_FIND, NULL);
|
|
if (mapent == NULL)
|
|
elog(ERROR, "tqueue received unrecognized remote typmod %d",
|
|
typmod);
|
|
remapinfo->localtypmod = mapent->localtypmod;
|
|
}
|
|
else
|
|
remapinfo->localtypmod = -1;
|
|
|
|
/* Look up tuple descriptor in typcache. */
|
|
tupledesc = lookup_rowtype_tupdesc(typid, remapinfo->localtypmod);
|
|
|
|
/* Figure out whether fields need recursive processing. */
|
|
remapinfo->field_remap = BuildFieldRemapInfo(tupledesc,
|
|
reader->mycontext);
|
|
if (remapinfo->field_remap != NULL)
|
|
{
|
|
/*
|
|
* We need to inspect the record contents, so save a copy of the
|
|
* tupdesc. (We could possibly just reference the typcache's
|
|
* copy, but then it's problematic when to release the refcount.)
|
|
*/
|
|
MemoryContext oldcontext = MemoryContextSwitchTo(reader->mycontext);
|
|
|
|
remapinfo->tupledesc = CreateTupleDescCopy(tupledesc);
|
|
MemoryContextSwitchTo(oldcontext);
|
|
}
|
|
else
|
|
{
|
|
/* No fields of the record require remapping. */
|
|
remapinfo->tupledesc = NULL;
|
|
}
|
|
remapinfo->rectypid = typid;
|
|
remapinfo->rectypmod = typmod;
|
|
|
|
/* Release reference count acquired by lookup_rowtype_tupdesc. */
|
|
DecrTupleDescRefCount(tupledesc);
|
|
}
|
|
|
|
/* If transient record, replace remote typmod with local typmod. */
|
|
if (typid == RECORDOID && typmod != remapinfo->localtypmod)
|
|
{
|
|
typmod = remapinfo->localtypmod;
|
|
changed_typmod = true;
|
|
}
|
|
else
|
|
changed_typmod = false;
|
|
|
|
/*
|
|
* If we need to change the typmod, or if there are any potentially
|
|
* remappable fields, replace the tuple.
|
|
*/
|
|
if (changed_typmod || remapinfo->field_remap != NULL)
|
|
{
|
|
HeapTupleData htup;
|
|
HeapTuple atup;
|
|
|
|
/* For now, assume we always need to change the tuple in this case. */
|
|
*changed = true;
|
|
|
|
/* Copy tuple, possibly remapping contained fields. */
|
|
ItemPointerSetInvalid(&htup.t_self);
|
|
htup.t_tableOid = InvalidOid;
|
|
htup.t_len = HeapTupleHeaderGetDatumLength(tup);
|
|
htup.t_data = tup;
|
|
atup = TQRemapTuple(reader,
|
|
remapinfo->tupledesc,
|
|
remapinfo->field_remap,
|
|
&htup);
|
|
|
|
/* Apply the correct labeling for a local Datum. */
|
|
HeapTupleHeaderSetTypeId(atup->t_data, typid);
|
|
HeapTupleHeaderSetTypMod(atup->t_data, typmod);
|
|
HeapTupleHeaderSetDatumLength(atup->t_data, htup.t_len);
|
|
|
|
/* And return the results. */
|
|
return HeapTupleHeaderGetDatum(atup->t_data);
|
|
}
|
|
|
|
/* Else just return the value as-is. */
|
|
return value;
|
|
}
|
|
|
|
/*
|
|
* Handle a control message from the tuple queue reader.
|
|
*
|
|
* Control messages are sent when the remote side is sending tuples that
|
|
* contain transient record types. We need to arrange to bless those
|
|
* record types locally and translate between remote and local typmods.
|
|
*/
|
|
static void
|
|
TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes,
|
|
char *data)
|
|
{
|
|
int32 remotetypmod;
|
|
int natts;
|
|
bool hasoid;
|
|
Size offset = 0;
|
|
Form_pg_attribute *attrs;
|
|
TupleDesc tupledesc;
|
|
RecordTypmodMap *mapent;
|
|
bool found;
|
|
int i;
|
|
|
|
/* Extract remote typmod. */
|
|
memcpy(&remotetypmod, &data[offset], sizeof(int32));
|
|
offset += sizeof(int32);
|
|
|
|
/* Extract attribute count. */
|
|
memcpy(&natts, &data[offset], sizeof(int));
|
|
offset += sizeof(int);
|
|
|
|
/* Extract hasoid flag. */
|
|
memcpy(&hasoid, &data[offset], sizeof(bool));
|
|
offset += sizeof(bool);
|
|
|
|
/* Extract attribute details. The tupledesc made here is just transient. */
|
|
attrs = palloc(natts * sizeof(Form_pg_attribute));
|
|
for (i = 0; i < natts; i++)
|
|
{
|
|
attrs[i] = palloc(sizeof(FormData_pg_attribute));
|
|
memcpy(attrs[i], &data[offset], sizeof(FormData_pg_attribute));
|
|
offset += sizeof(FormData_pg_attribute);
|
|
}
|
|
|
|
/* We should have read the whole message. */
|
|
Assert(offset == nbytes);
|
|
|
|
/* Construct TupleDesc, and assign a local typmod. */
|
|
tupledesc = CreateTupleDesc(natts, hasoid, attrs);
|
|
tupledesc = BlessTupleDesc(tupledesc);
|
|
|
|
/* Create mapping hashtable if it doesn't exist already. */
|
|
if (reader->typmodmap == NULL)
|
|
{
|
|
HASHCTL ctl;
|
|
|
|
MemSet(&ctl, 0, sizeof(ctl));
|
|
ctl.keysize = sizeof(int32);
|
|
ctl.entrysize = sizeof(RecordTypmodMap);
|
|
ctl.hcxt = reader->mycontext;
|
|
reader->typmodmap = hash_create("tqueue receiver record type hashtable",
|
|
100, &ctl,
|
|
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
|
|
}
|
|
|
|
/* Create map entry. */
|
|
mapent = hash_search(reader->typmodmap, &remotetypmod, HASH_ENTER,
|
|
&found);
|
|
if (found)
|
|
elog(ERROR, "duplicate tqueue control message for typmod %d",
|
|
remotetypmod);
|
|
mapent->localtypmod = tupledesc->tdtypmod;
|
|
|
|
elog(DEBUG3, "tqueue mapping remote typmod %d to local typmod %d",
|
|
remotetypmod, mapent->localtypmod);
|
|
}
|
|
|
|
/*
|
|
* Build remap info for the specified data type, storing it in mycontext.
|
|
* Returns NULL if neither the type nor any subtype could require remapping.
|
|
*/
|
|
static TupleRemapInfo *
|
|
BuildTupleRemapInfo(Oid typid, MemoryContext mycontext)
|
|
{
|
|
HeapTuple tup;
|
|
Form_pg_type typ;
|
|
|
|
/* This is recursive, so it could be driven to stack overflow. */
|
|
check_stack_depth();
|
|
|
|
restart:
|
|
tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
|
|
if (!HeapTupleIsValid(tup))
|
|
elog(ERROR, "cache lookup failed for type %u", typid);
|
|
typ = (Form_pg_type) GETSTRUCT(tup);
|
|
|
|
/* Look through domains to underlying base type. */
|
|
if (typ->typtype == TYPTYPE_DOMAIN)
|
|
{
|
|
typid = typ->typbasetype;
|
|
ReleaseSysCache(tup);
|
|
goto restart;
|
|
}
|
|
|
|
/* If it's a true array type, deal with it that way. */
|
|
if (OidIsValid(typ->typelem) && typ->typlen == -1)
|
|
{
|
|
typid = typ->typelem;
|
|
ReleaseSysCache(tup);
|
|
return BuildArrayRemapInfo(typid, mycontext);
|
|
}
|
|
|
|
/* Similarly, deal with ranges appropriately. */
|
|
if (typ->typtype == TYPTYPE_RANGE)
|
|
{
|
|
ReleaseSysCache(tup);
|
|
return BuildRangeRemapInfo(typid, mycontext);
|
|
}
|
|
|
|
/*
|
|
* If it's a composite type (including RECORD), set up for remapping. We
|
|
* don't attempt to determine the status of subfields here, since we do
|
|
* not have enough information yet; just mark everything invalid.
|
|
*/
|
|
if (typ->typtype == TYPTYPE_COMPOSITE || typid == RECORDOID)
|
|
{
|
|
TupleRemapInfo *remapinfo;
|
|
|
|
remapinfo = (TupleRemapInfo *)
|
|
MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
|
|
remapinfo->remapclass = TQUEUE_REMAP_RECORD;
|
|
remapinfo->u.rec.rectypid = InvalidOid;
|
|
remapinfo->u.rec.rectypmod = -1;
|
|
remapinfo->u.rec.localtypmod = -1;
|
|
remapinfo->u.rec.tupledesc = NULL;
|
|
remapinfo->u.rec.field_remap = NULL;
|
|
ReleaseSysCache(tup);
|
|
return remapinfo;
|
|
}
|
|
|
|
/* Nothing else can possibly need remapping attention. */
|
|
ReleaseSysCache(tup);
|
|
return NULL;
|
|
}
|
|
|
|
static TupleRemapInfo *
|
|
BuildArrayRemapInfo(Oid elemtypid, MemoryContext mycontext)
|
|
{
|
|
TupleRemapInfo *remapinfo;
|
|
TupleRemapInfo *element_remapinfo;
|
|
|
|
/* See if element type requires remapping. */
|
|
element_remapinfo = BuildTupleRemapInfo(elemtypid, mycontext);
|
|
/* If not, the array doesn't either. */
|
|
if (element_remapinfo == NULL)
|
|
return NULL;
|
|
/* OK, set up to remap the array. */
|
|
remapinfo = (TupleRemapInfo *)
|
|
MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
|
|
remapinfo->remapclass = TQUEUE_REMAP_ARRAY;
|
|
get_typlenbyvalalign(elemtypid,
|
|
&remapinfo->u.arr.typlen,
|
|
&remapinfo->u.arr.typbyval,
|
|
&remapinfo->u.arr.typalign);
|
|
remapinfo->u.arr.element_remap = element_remapinfo;
|
|
return remapinfo;
|
|
}
|
|
|
|
static TupleRemapInfo *
|
|
BuildRangeRemapInfo(Oid rngtypid, MemoryContext mycontext)
|
|
{
|
|
TupleRemapInfo *remapinfo;
|
|
TupleRemapInfo *bound_remapinfo;
|
|
TypeCacheEntry *typcache;
|
|
|
|
/*
|
|
* Get range info from the typcache. We assume this pointer will stay
|
|
* valid for the duration of the query.
|
|
*/
|
|
typcache = lookup_type_cache(rngtypid, TYPECACHE_RANGE_INFO);
|
|
if (typcache->rngelemtype == NULL)
|
|
elog(ERROR, "type %u is not a range type", rngtypid);
|
|
|
|
/* See if range bound type requires remapping. */
|
|
bound_remapinfo = BuildTupleRemapInfo(typcache->rngelemtype->type_id,
|
|
mycontext);
|
|
/* If not, the range doesn't either. */
|
|
if (bound_remapinfo == NULL)
|
|
return NULL;
|
|
/* OK, set up to remap the range. */
|
|
remapinfo = (TupleRemapInfo *)
|
|
MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
|
|
remapinfo->remapclass = TQUEUE_REMAP_RANGE;
|
|
remapinfo->u.rng.typcache = typcache;
|
|
remapinfo->u.rng.bound_remap = bound_remapinfo;
|
|
return remapinfo;
|
|
}
|
|
|
|
/*
|
|
* Build remap info for fields of the type described by the given tupdesc.
|
|
* Returns an array of TupleRemapInfo pointers, or NULL if no field
|
|
* requires remapping. Data is allocated in mycontext.
|
|
*/
|
|
static TupleRemapInfo **
|
|
BuildFieldRemapInfo(TupleDesc tupledesc, MemoryContext mycontext)
|
|
{
|
|
TupleRemapInfo **remapinfo;
|
|
bool noop = true;
|
|
int i;
|
|
|
|
/* Recursively determine the remapping status of each field. */
|
|
remapinfo = (TupleRemapInfo **)
|
|
MemoryContextAlloc(mycontext,
|
|
tupledesc->natts * sizeof(TupleRemapInfo *));
|
|
for (i = 0; i < tupledesc->natts; i++)
|
|
{
|
|
Form_pg_attribute attr = tupledesc->attrs[i];
|
|
|
|
if (attr->attisdropped)
|
|
{
|
|
remapinfo[i] = NULL;
|
|
continue;
|
|
}
|
|
remapinfo[i] = BuildTupleRemapInfo(attr->atttypid, mycontext);
|
|
if (remapinfo[i] != NULL)
|
|
noop = false;
|
|
}
|
|
|
|
/* If no fields require remapping, report that by returning NULL. */
|
|
if (noop)
|
|
{
|
|
pfree(remapinfo);
|
|
remapinfo = NULL;
|
|
}
|
|
|
|
return remapinfo;
|
|
}
|