mirror of
https://github.com/postgres/postgres.git
synced 2025-05-02 11:44:50 +03:00
1023 lines
28 KiB
C
1023 lines
28 KiB
C
/*-------------------------------------------------------------------------
 *
 * tqueue.c
 *	  Use shm_mq to send & receive tuples between parallel backends
 *
 * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
 * under the hood, writes tuples from the executor to a shm_mq.  If
 * necessary, it also writes control messages describing transient
 * record types used within the tuple.
 *
 * A TupleQueueReader reads tuples, and control messages if any are sent,
 * from a shm_mq and returns the tuples.  If transient record types are
 * in use, it registers those types based on the received control messages
 * and rewrites the typmods sent by the remote side to the corresponding
 * local record typmods.
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/executor/tqueue.c
 *
 *-------------------------------------------------------------------------
 */
|
|
|
|
#include "postgres.h"
|
|
|
|
#include "access/htup_details.h"
|
|
#include "catalog/pg_type.h"
|
|
#include "executor/tqueue.h"
|
|
#include "funcapi.h"
|
|
#include "lib/stringinfo.h"
|
|
#include "miscadmin.h"
|
|
#include "utils/array.h"
|
|
#include "utils/lsyscache.h"
|
|
#include "utils/memutils.h"
|
|
#include "utils/rangetypes.h"
|
|
#include "utils/syscache.h"
|
|
#include "utils/typcache.h"
|
|
|
|
/*
 * Classification of a data type according to the kind of walk needed to
 * find transient record typmods nested inside its values.
 *
 * TQUEUE_REMAP_NONE must remain the first (zero) member: BuildRemapInfo
 * zero-fills its mapping array before classifying each column.
 */
typedef enum RemapClass
{
	TQUEUE_REMAP_NONE,			/* no special processing required */
	TQUEUE_REMAP_ARRAY,			/* array */
	TQUEUE_REMAP_RANGE,			/* range */
	TQUEUE_REMAP_RECORD			/* composite type, named or anonymous */
} RemapClass;
|
|
|
|
typedef struct
|
|
{
|
|
int natts;
|
|
RemapClass mapping[FLEXIBLE_ARRAY_MEMBER];
|
|
} RemapInfo;
|
|
|
|
typedef struct
|
|
{
|
|
DestReceiver pub;
|
|
shm_mq_handle *handle;
|
|
MemoryContext tmpcontext;
|
|
HTAB *recordhtab;
|
|
char mode;
|
|
TupleDesc tupledesc;
|
|
RemapInfo *remapinfo;
|
|
} TQueueDestReceiver;
|
|
|
|
/*
 * Entry of the reader's typmod hash table, translating a transient record
 * typmod assigned by the sending backend into the one assigned locally.
 * The remote typmod is the hash key and therefore must be the first field.
 */
typedef struct RecordTypemodMap
{
	int			remotetypmod;	/* hash key: typmod on the sending side */
	int			localtypmod;	/* equivalent typmod in this backend */
} RecordTypemodMap;
|
|
|
|
struct TupleQueueReader
|
|
{
|
|
shm_mq_handle *queue;
|
|
char mode;
|
|
TupleDesc tupledesc;
|
|
RemapInfo *remapinfo;
|
|
HTAB *typmodmap;
|
|
};
|
|
|
|
/*
 * Queue modes.  A one-byte message carrying one of these values switches the
 * interpretation of the messages that follow it.
 */
#define TUPLE_QUEUE_MODE_CONTROL	'c' /* messages describe record typmods */
#define TUPLE_QUEUE_MODE_DATA		'd' /* messages are tuples */
|
|
|
|
static void tqueueWalk(TQueueDestReceiver *tqueue, RemapClass walktype,
|
|
Datum value);
|
|
static void tqueueWalkRecord(TQueueDestReceiver *tqueue, Datum value);
|
|
static void tqueueWalkArray(TQueueDestReceiver *tqueue, Datum value);
|
|
static void tqueueWalkRange(TQueueDestReceiver *tqueue, Datum value);
|
|
static void tqueueSendTypmodInfo(TQueueDestReceiver *tqueue, int typmod,
|
|
TupleDesc tupledesc);
|
|
static void TupleQueueHandleControlMessage(TupleQueueReader *reader,
|
|
Size nbytes, char *data);
|
|
static HeapTuple TupleQueueHandleDataMessage(TupleQueueReader *reader,
|
|
Size nbytes, HeapTupleHeader data);
|
|
static HeapTuple TupleQueueRemapTuple(TupleQueueReader *reader,
|
|
TupleDesc tupledesc, RemapInfo *remapinfo,
|
|
HeapTuple tuple);
|
|
static Datum TupleQueueRemap(TupleQueueReader *reader, RemapClass remapclass,
|
|
Datum value);
|
|
static Datum TupleQueueRemapArray(TupleQueueReader *reader, Datum value);
|
|
static Datum TupleQueueRemapRange(TupleQueueReader *reader, Datum value);
|
|
static Datum TupleQueueRemapRecord(TupleQueueReader *reader, Datum value);
|
|
static RemapClass GetRemapClass(Oid typeid);
|
|
static RemapInfo *BuildRemapInfo(TupleDesc tupledesc);
|
|
|
|
/*
|
|
* Receive a tuple.
|
|
*
|
|
* This is, at core, pretty simple: just send the tuple to the designated
|
|
* shm_mq. The complicated part is that if the tuple contains transient
|
|
* record types (see lookup_rowtype_tupdesc), we need to send control
|
|
* information to the shm_mq receiver so that those typemods can be correctly
|
|
* interpreted, as they are merely held in a backend-local cache. Worse, the
|
|
* record type may not at the top level: we could have a range over an array
|
|
* type over a range type over a range type over an array type over a record,
|
|
* or something like that.
|
|
*/
|
|
static bool
|
|
tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
|
|
{
|
|
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
|
|
TupleDesc tupledesc = slot->tts_tupleDescriptor;
|
|
HeapTuple tuple;
|
|
shm_mq_result result;
|
|
|
|
/*
|
|
* Test to see whether the tupledesc has changed; if so, set up for the
|
|
* new tupledesc. This is a strange test both because the executor really
|
|
* shouldn't change the tupledesc, and also because it would be unsafe if
|
|
* the old tupledesc could be freed and a new one allocated at the same
|
|
* address. But since some very old code in printtup.c uses a similar
|
|
* test, we adopt it here as well.
|
|
*/
|
|
if (tqueue->tupledesc != tupledesc)
|
|
{
|
|
if (tqueue->remapinfo != NULL)
|
|
pfree(tqueue->remapinfo);
|
|
tqueue->remapinfo = BuildRemapInfo(tupledesc);
|
|
tqueue->tupledesc = tupledesc;
|
|
}
|
|
|
|
tuple = ExecMaterializeSlot(slot);
|
|
|
|
/*
|
|
* When, because of the types being transmitted, no record typemod mapping
|
|
* can be needed, we can skip a good deal of work.
|
|
*/
|
|
if (tqueue->remapinfo != NULL)
|
|
{
|
|
RemapInfo *remapinfo = tqueue->remapinfo;
|
|
AttrNumber i;
|
|
MemoryContext oldcontext = NULL;
|
|
|
|
/* Deform the tuple so we can examine it, if not done already. */
|
|
slot_getallattrs(slot);
|
|
|
|
/* Iterate over each attribute and search it for transient typemods. */
|
|
Assert(slot->tts_tupleDescriptor->natts == remapinfo->natts);
|
|
for (i = 0; i < remapinfo->natts; ++i)
|
|
{
|
|
/* Ignore nulls and types that don't need special handling. */
|
|
if (slot->tts_isnull[i] ||
|
|
remapinfo->mapping[i] == TQUEUE_REMAP_NONE)
|
|
continue;
|
|
|
|
/* Switch to temporary memory context to avoid leaking. */
|
|
if (oldcontext == NULL)
|
|
{
|
|
if (tqueue->tmpcontext == NULL)
|
|
tqueue->tmpcontext =
|
|
AllocSetContextCreate(TopMemoryContext,
|
|
"tqueue temporary context",
|
|
ALLOCSET_DEFAULT_MINSIZE,
|
|
ALLOCSET_DEFAULT_INITSIZE,
|
|
ALLOCSET_DEFAULT_MAXSIZE);
|
|
oldcontext = MemoryContextSwitchTo(tqueue->tmpcontext);
|
|
}
|
|
|
|
/* Invoke the appropriate walker function. */
|
|
tqueueWalk(tqueue, remapinfo->mapping[i], slot->tts_values[i]);
|
|
}
|
|
|
|
/* If we used the temp context, reset it and restore prior context. */
|
|
if (oldcontext != NULL)
|
|
{
|
|
MemoryContextSwitchTo(oldcontext);
|
|
MemoryContextReset(tqueue->tmpcontext);
|
|
}
|
|
|
|
/* If we entered control mode, switch back to data mode. */
|
|
if (tqueue->mode != TUPLE_QUEUE_MODE_DATA)
|
|
{
|
|
tqueue->mode = TUPLE_QUEUE_MODE_DATA;
|
|
shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false);
|
|
}
|
|
}
|
|
|
|
/* Send the tuple itself. */
|
|
result = shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
|
|
|
|
if (result == SHM_MQ_DETACHED)
|
|
return false;
|
|
else if (result != SHM_MQ_SUCCESS)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("unable to send tuples")));
|
|
|
|
return true;
|
|
}
|
|
|
|
/*
|
|
* Invoke the appropriate walker function based on the given RemapClass.
|
|
*/
|
|
static void
|
|
tqueueWalk(TQueueDestReceiver *tqueue, RemapClass walktype, Datum value)
|
|
{
|
|
check_stack_depth();
|
|
|
|
switch (walktype)
|
|
{
|
|
case TQUEUE_REMAP_NONE:
|
|
break;
|
|
case TQUEUE_REMAP_ARRAY:
|
|
tqueueWalkArray(tqueue, value);
|
|
break;
|
|
case TQUEUE_REMAP_RANGE:
|
|
tqueueWalkRange(tqueue, value);
|
|
break;
|
|
case TQUEUE_REMAP_RECORD:
|
|
tqueueWalkRecord(tqueue, value);
|
|
break;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Walk a record and send control messages for transient record types
|
|
* contained therein.
|
|
*/
|
|
static void
|
|
tqueueWalkRecord(TQueueDestReceiver *tqueue, Datum value)
|
|
{
|
|
HeapTupleHeader tup;
|
|
Oid typeid;
|
|
Oid typmod;
|
|
TupleDesc tupledesc;
|
|
RemapInfo *remapinfo;
|
|
|
|
/* Extract typmod from tuple. */
|
|
tup = DatumGetHeapTupleHeader(value);
|
|
typeid = HeapTupleHeaderGetTypeId(tup);
|
|
typmod = HeapTupleHeaderGetTypMod(tup);
|
|
|
|
/* Look up tuple descriptor in typecache. */
|
|
tupledesc = lookup_rowtype_tupdesc(typeid, typmod);
|
|
|
|
/*
|
|
* If this is a transient record time, send its TupleDesc as a control
|
|
* message. (tqueueSendTypemodInfo is smart enough to do this only once
|
|
* per typmod.)
|
|
*/
|
|
if (typeid == RECORDOID)
|
|
tqueueSendTypmodInfo(tqueue, typmod, tupledesc);
|
|
|
|
/*
|
|
* Build the remap information for this tupledesc. We might want to think
|
|
* about keeping a cache of this information keyed by typeid and typemod,
|
|
* but let's keep it simple for now.
|
|
*/
|
|
remapinfo = BuildRemapInfo(tupledesc);
|
|
|
|
/*
|
|
* If remapping is required, deform the tuple and process each field. When
|
|
* BuildRemapInfo is null, the data types are such that there can be no
|
|
* transient record types here, so we can skip all this work.
|
|
*/
|
|
if (remapinfo != NULL)
|
|
{
|
|
Datum *values;
|
|
bool *isnull;
|
|
HeapTupleData tdata;
|
|
AttrNumber i;
|
|
|
|
/* Deform the tuple so we can check each column within. */
|
|
values = palloc(tupledesc->natts * sizeof(Datum));
|
|
isnull = palloc(tupledesc->natts * sizeof(bool));
|
|
tdata.t_len = HeapTupleHeaderGetDatumLength(tup);
|
|
ItemPointerSetInvalid(&(tdata.t_self));
|
|
tdata.t_tableOid = InvalidOid;
|
|
tdata.t_data = tup;
|
|
heap_deform_tuple(&tdata, tupledesc, values, isnull);
|
|
|
|
/* Recursively check each non-NULL attribute. */
|
|
for (i = 0; i < tupledesc->natts; ++i)
|
|
if (!isnull[i])
|
|
tqueueWalk(tqueue, remapinfo->mapping[i], values[i]);
|
|
}
|
|
|
|
/* Release reference count acquired by lookup_rowtype_tupdesc. */
|
|
DecrTupleDescRefCount(tupledesc);
|
|
}
|
|
|
|
/*
|
|
* Walk a record and send control messages for transient record types
|
|
* contained therein.
|
|
*/
|
|
static void
|
|
tqueueWalkArray(TQueueDestReceiver *tqueue, Datum value)
|
|
{
|
|
ArrayType *arr = DatumGetArrayTypeP(value);
|
|
Oid typeid = ARR_ELEMTYPE(arr);
|
|
RemapClass remapclass;
|
|
int16 typlen;
|
|
bool typbyval;
|
|
char typalign;
|
|
Datum *elem_values;
|
|
bool *elem_nulls;
|
|
int num_elems;
|
|
int i;
|
|
|
|
remapclass = GetRemapClass(typeid);
|
|
|
|
/*
|
|
* If the elements of the array don't need to be walked, we shouldn't have
|
|
* been called in the first place: GetRemapClass should have returned NULL
|
|
* when asked about this array type.
|
|
*/
|
|
Assert(remapclass != TQUEUE_REMAP_NONE);
|
|
|
|
/* Deconstruct the array. */
|
|
get_typlenbyvalalign(typeid, &typlen, &typbyval, &typalign);
|
|
deconstruct_array(arr, typeid, typlen, typbyval, typalign,
|
|
&elem_values, &elem_nulls, &num_elems);
|
|
|
|
/* Walk each element. */
|
|
for (i = 0; i < num_elems; ++i)
|
|
if (!elem_nulls[i])
|
|
tqueueWalk(tqueue, remapclass, elem_values[i]);
|
|
}
|
|
|
|
/*
|
|
* Walk a range type and send control messages for transient record types
|
|
* contained therein.
|
|
*/
|
|
static void
|
|
tqueueWalkRange(TQueueDestReceiver *tqueue, Datum value)
|
|
{
|
|
RangeType *range = DatumGetRangeType(value);
|
|
Oid typeid = RangeTypeGetOid(range);
|
|
RemapClass remapclass;
|
|
TypeCacheEntry *typcache;
|
|
RangeBound lower;
|
|
RangeBound upper;
|
|
bool empty;
|
|
|
|
/*
|
|
* Extract the lower and upper bounds. It might be worth implementing
|
|
* some caching scheme here so that we don't look up the same typeids in
|
|
* the type cache repeatedly, but for now let's keep it simple.
|
|
*/
|
|
typcache = lookup_type_cache(typeid, TYPECACHE_RANGE_INFO);
|
|
if (typcache->rngelemtype == NULL)
|
|
elog(ERROR, "type %u is not a range type", typeid);
|
|
range_deserialize(typcache, range, &lower, &upper, &empty);
|
|
|
|
/* Nothing to do for an empty range. */
|
|
if (empty)
|
|
return;
|
|
|
|
/*
|
|
* If the range bounds don't need to be walked, we shouldn't have been
|
|
* called in the first place: GetRemapClass should have returned NULL when
|
|
* asked about this range type.
|
|
*/
|
|
remapclass = GetRemapClass(typeid);
|
|
Assert(remapclass != TQUEUE_REMAP_NONE);
|
|
|
|
/* Walk each bound, if present. */
|
|
if (!upper.infinite)
|
|
tqueueWalk(tqueue, remapclass, upper.val);
|
|
if (!lower.infinite)
|
|
tqueueWalk(tqueue, remapclass, lower.val);
|
|
}
|
|
|
|
/*
|
|
* Send tuple descriptor information for a transient typemod, unless we've
|
|
* already done so previously.
|
|
*/
|
|
static void
|
|
tqueueSendTypmodInfo(TQueueDestReceiver *tqueue, int typmod,
|
|
TupleDesc tupledesc)
|
|
{
|
|
StringInfoData buf;
|
|
bool found;
|
|
AttrNumber i;
|
|
|
|
/* Initialize hash table if not done yet. */
|
|
if (tqueue->recordhtab == NULL)
|
|
{
|
|
HASHCTL ctl;
|
|
|
|
ctl.keysize = sizeof(int);
|
|
ctl.entrysize = sizeof(int);
|
|
ctl.hcxt = TopMemoryContext;
|
|
tqueue->recordhtab = hash_create("tqueue record hashtable",
|
|
100, &ctl, HASH_ELEM | HASH_CONTEXT);
|
|
}
|
|
|
|
/* Have we already seen this record type? If not, must report it. */
|
|
hash_search(tqueue->recordhtab, &typmod, HASH_ENTER, &found);
|
|
if (found)
|
|
return;
|
|
|
|
/* If message queue is in data mode, switch to control mode. */
|
|
if (tqueue->mode != TUPLE_QUEUE_MODE_CONTROL)
|
|
{
|
|
tqueue->mode = TUPLE_QUEUE_MODE_CONTROL;
|
|
shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false);
|
|
}
|
|
|
|
/* Assemble a control message. */
|
|
initStringInfo(&buf);
|
|
appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int));
|
|
appendBinaryStringInfo(&buf, (char *) &tupledesc->natts, sizeof(int));
|
|
appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid,
|
|
sizeof(bool));
|
|
for (i = 0; i < tupledesc->natts; ++i)
|
|
appendBinaryStringInfo(&buf, (char *) tupledesc->attrs[i],
|
|
sizeof(FormData_pg_attribute));
|
|
|
|
/* Send control message. */
|
|
shm_mq_send(tqueue->handle, buf.len, buf.data, false);
|
|
}
|
|
|
|
/*
|
|
* Prepare to receive tuples from executor.
|
|
*/
|
|
static void
|
|
tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
|
|
{
|
|
/* do nothing */
|
|
}
|
|
|
|
/*
|
|
* Clean up at end of an executor run
|
|
*/
|
|
static void
|
|
tqueueShutdownReceiver(DestReceiver *self)
|
|
{
|
|
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
|
|
|
|
shm_mq_detach(shm_mq_get_queue(tqueue->handle));
|
|
}
|
|
|
|
/*
|
|
* Destroy receiver when done with it
|
|
*/
|
|
static void
|
|
tqueueDestroyReceiver(DestReceiver *self)
|
|
{
|
|
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
|
|
|
|
if (tqueue->tmpcontext != NULL)
|
|
MemoryContextDelete(tqueue->tmpcontext);
|
|
if (tqueue->recordhtab != NULL)
|
|
hash_destroy(tqueue->recordhtab);
|
|
if (tqueue->remapinfo != NULL)
|
|
pfree(tqueue->remapinfo);
|
|
pfree(self);
|
|
}
|
|
|
|
/*
|
|
* Create a DestReceiver that writes tuples to a tuple queue.
|
|
*/
|
|
DestReceiver *
|
|
CreateTupleQueueDestReceiver(shm_mq_handle *handle)
|
|
{
|
|
TQueueDestReceiver *self;
|
|
|
|
self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
|
|
|
|
self->pub.receiveSlot = tqueueReceiveSlot;
|
|
self->pub.rStartup = tqueueStartupReceiver;
|
|
self->pub.rShutdown = tqueueShutdownReceiver;
|
|
self->pub.rDestroy = tqueueDestroyReceiver;
|
|
self->pub.mydest = DestTupleQueue;
|
|
self->handle = handle;
|
|
self->tmpcontext = NULL;
|
|
self->recordhtab = NULL;
|
|
self->mode = TUPLE_QUEUE_MODE_DATA;
|
|
self->remapinfo = NULL;
|
|
|
|
return (DestReceiver *) self;
|
|
}
|
|
|
|
/*
|
|
* Create a tuple queue reader.
|
|
*/
|
|
TupleQueueReader *
|
|
CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
|
|
{
|
|
TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
|
|
|
|
reader->queue = handle;
|
|
reader->mode = TUPLE_QUEUE_MODE_DATA;
|
|
reader->tupledesc = tupledesc;
|
|
reader->remapinfo = BuildRemapInfo(tupledesc);
|
|
|
|
return reader;
|
|
}
|
|
|
|
/*
|
|
* Destroy a tuple queue reader.
|
|
*/
|
|
void
|
|
DestroyTupleQueueReader(TupleQueueReader *reader)
|
|
{
|
|
shm_mq_detach(shm_mq_get_queue(reader->queue));
|
|
if (reader->remapinfo != NULL)
|
|
pfree(reader->remapinfo);
|
|
pfree(reader);
|
|
}
|
|
|
|
/*
|
|
* Fetch a tuple from a tuple queue reader.
|
|
*
|
|
* Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
|
|
* accumulate bytes from a partially-read message, so it's useful to call
|
|
* this with nowait = true even if nothing is returned.
|
|
*
|
|
* The return value is NULL if there are no remaining queues or if
|
|
* nowait = true and no tuple is ready to return. *done, if not NULL,
|
|
* is set to true when queue is detached and otherwise to false.
|
|
*/
|
|
HeapTuple
|
|
TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
|
|
{
|
|
shm_mq_result result;
|
|
|
|
if (done != NULL)
|
|
*done = false;
|
|
|
|
for (;;)
|
|
{
|
|
Size nbytes;
|
|
void *data;
|
|
|
|
/* Attempt to read a message. */
|
|
result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
|
|
|
|
/* If queue is detached, set *done and return NULL. */
|
|
if (result == SHM_MQ_DETACHED)
|
|
{
|
|
if (done != NULL)
|
|
*done = true;
|
|
return NULL;
|
|
}
|
|
|
|
/* In non-blocking mode, bail out if no message ready yet. */
|
|
if (result == SHM_MQ_WOULD_BLOCK)
|
|
return NULL;
|
|
Assert(result == SHM_MQ_SUCCESS);
|
|
|
|
/*
|
|
* OK, we got a message. Process it.
|
|
*
|
|
* One-byte messages are mode switch messages, so that we can switch
|
|
* between "control" and "data" mode. When in "data" mode, each
|
|
* message (unless exactly one byte) is a tuple. When in "control"
|
|
* mode, each message provides a transient-typmod-to-tupledesc mapping
|
|
* so we can interpret future tuples.
|
|
*/
|
|
if (nbytes == 1)
|
|
{
|
|
/* Mode switch message. */
|
|
reader->mode = ((char *) data)[0];
|
|
}
|
|
else if (reader->mode == TUPLE_QUEUE_MODE_DATA)
|
|
{
|
|
/* Tuple data. */
|
|
return TupleQueueHandleDataMessage(reader, nbytes, data);
|
|
}
|
|
else if (reader->mode == TUPLE_QUEUE_MODE_CONTROL)
|
|
{
|
|
/* Control message, describing a transient record type. */
|
|
TupleQueueHandleControlMessage(reader, nbytes, data);
|
|
}
|
|
else
|
|
elog(ERROR, "invalid mode: %d", (int) reader->mode);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Handle a data message - that is, a tuple - from the remote side.
|
|
*/
|
|
static HeapTuple
|
|
TupleQueueHandleDataMessage(TupleQueueReader *reader,
|
|
Size nbytes,
|
|
HeapTupleHeader data)
|
|
{
|
|
HeapTupleData htup;
|
|
|
|
ItemPointerSetInvalid(&htup.t_self);
|
|
htup.t_tableOid = InvalidOid;
|
|
htup.t_len = nbytes;
|
|
htup.t_data = data;
|
|
|
|
return TupleQueueRemapTuple(reader, reader->tupledesc, reader->remapinfo,
|
|
&htup);
|
|
}
|
|
|
|
/*
|
|
* Remap tuple typmods per control information received from remote side.
|
|
*/
|
|
static HeapTuple
|
|
TupleQueueRemapTuple(TupleQueueReader *reader, TupleDesc tupledesc,
|
|
RemapInfo *remapinfo, HeapTuple tuple)
|
|
{
|
|
Datum *values;
|
|
bool *isnull;
|
|
int i;
|
|
|
|
/*
|
|
* If no remapping is necessary, just copy the tuple into a single
|
|
* palloc'd chunk, as caller will expect.
|
|
*/
|
|
if (remapinfo == NULL)
|
|
return heap_copytuple(tuple);
|
|
|
|
/* Deform tuple so we can remap record typmods for individual attrs. */
|
|
values = palloc(tupledesc->natts * sizeof(Datum));
|
|
isnull = palloc(tupledesc->natts * sizeof(bool));
|
|
heap_deform_tuple(tuple, tupledesc, values, isnull);
|
|
Assert(tupledesc->natts == remapinfo->natts);
|
|
|
|
/* Recursively check each non-NULL attribute. */
|
|
for (i = 0; i < tupledesc->natts; ++i)
|
|
{
|
|
if (isnull[i] || remapinfo->mapping[i] == TQUEUE_REMAP_NONE)
|
|
continue;
|
|
values[i] = TupleQueueRemap(reader, remapinfo->mapping[i], values[i]);
|
|
}
|
|
|
|
/* Reform the modified tuple. */
|
|
return heap_form_tuple(tupledesc, values, isnull);
|
|
}
|
|
|
|
/*
|
|
* Remap a value based on the specified remap class.
|
|
*/
|
|
static Datum
|
|
TupleQueueRemap(TupleQueueReader *reader, RemapClass remapclass, Datum value)
|
|
{
|
|
check_stack_depth();
|
|
|
|
switch (remapclass)
|
|
{
|
|
case TQUEUE_REMAP_NONE:
|
|
/* caller probably shouldn't have called us at all, but... */
|
|
return value;
|
|
|
|
case TQUEUE_REMAP_ARRAY:
|
|
return TupleQueueRemapArray(reader, value);
|
|
|
|
case TQUEUE_REMAP_RANGE:
|
|
return TupleQueueRemapRange(reader, value);
|
|
|
|
case TQUEUE_REMAP_RECORD:
|
|
return TupleQueueRemapRecord(reader, value);
|
|
}
|
|
|
|
elog(ERROR, "unknown remap class: %d", (int) remapclass);
|
|
return (Datum) 0;
|
|
}
|
|
|
|
/*
|
|
* Remap an array.
|
|
*/
|
|
static Datum
|
|
TupleQueueRemapArray(TupleQueueReader *reader, Datum value)
|
|
{
|
|
ArrayType *arr = DatumGetArrayTypeP(value);
|
|
Oid typeid = ARR_ELEMTYPE(arr);
|
|
RemapClass remapclass;
|
|
int16 typlen;
|
|
bool typbyval;
|
|
char typalign;
|
|
Datum *elem_values;
|
|
bool *elem_nulls;
|
|
int num_elems;
|
|
int i;
|
|
|
|
remapclass = GetRemapClass(typeid);
|
|
|
|
/*
|
|
* If the elements of the array don't need to be walked, we shouldn't have
|
|
* been called in the first place: GetRemapClass should have returned NULL
|
|
* when asked about this array type.
|
|
*/
|
|
Assert(remapclass != TQUEUE_REMAP_NONE);
|
|
|
|
/* Deconstruct the array. */
|
|
get_typlenbyvalalign(typeid, &typlen, &typbyval, &typalign);
|
|
deconstruct_array(arr, typeid, typlen, typbyval, typalign,
|
|
&elem_values, &elem_nulls, &num_elems);
|
|
|
|
/* Remap each element. */
|
|
for (i = 0; i < num_elems; ++i)
|
|
if (!elem_nulls[i])
|
|
elem_values[i] = TupleQueueRemap(reader, remapclass,
|
|
elem_values[i]);
|
|
|
|
/* Reconstruct and return the array. */
|
|
arr = construct_md_array(elem_values, elem_nulls,
|
|
ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr),
|
|
typeid, typlen, typbyval, typalign);
|
|
return PointerGetDatum(arr);
|
|
}
|
|
|
|
/*
|
|
* Remap a range type.
|
|
*/
|
|
static Datum
|
|
TupleQueueRemapRange(TupleQueueReader *reader, Datum value)
|
|
{
|
|
RangeType *range = DatumGetRangeType(value);
|
|
Oid typeid = RangeTypeGetOid(range);
|
|
RemapClass remapclass;
|
|
TypeCacheEntry *typcache;
|
|
RangeBound lower;
|
|
RangeBound upper;
|
|
bool empty;
|
|
|
|
/*
|
|
* Extract the lower and upper bounds. As in tqueueWalkRange, some
|
|
* caching might be a good idea here.
|
|
*/
|
|
typcache = lookup_type_cache(typeid, TYPECACHE_RANGE_INFO);
|
|
if (typcache->rngelemtype == NULL)
|
|
elog(ERROR, "type %u is not a range type", typeid);
|
|
range_deserialize(typcache, range, &lower, &upper, &empty);
|
|
|
|
/* Nothing to do for an empty range. */
|
|
if (empty)
|
|
return value;
|
|
|
|
/*
|
|
* If the range bounds don't need to be walked, we shouldn't have been
|
|
* called in the first place: GetRemapClass should have returned NULL when
|
|
* asked about this range type.
|
|
*/
|
|
remapclass = GetRemapClass(typeid);
|
|
Assert(remapclass != TQUEUE_REMAP_NONE);
|
|
|
|
/* Remap each bound, if present. */
|
|
if (!upper.infinite)
|
|
upper.val = TupleQueueRemap(reader, remapclass, upper.val);
|
|
if (!lower.infinite)
|
|
lower.val = TupleQueueRemap(reader, remapclass, lower.val);
|
|
|
|
/* And reserialize. */
|
|
range = range_serialize(typcache, &lower, &upper, empty);
|
|
return RangeTypeGetDatum(range);
|
|
}
|
|
|
|
/*
|
|
* Remap a record.
|
|
*/
|
|
static Datum
|
|
TupleQueueRemapRecord(TupleQueueReader *reader, Datum value)
|
|
{
|
|
HeapTupleHeader tup;
|
|
Oid typeid;
|
|
int typmod;
|
|
RecordTypemodMap *mapent;
|
|
TupleDesc tupledesc;
|
|
RemapInfo *remapinfo;
|
|
HeapTupleData htup;
|
|
HeapTuple atup;
|
|
|
|
/* Fetch type OID and typemod. */
|
|
tup = DatumGetHeapTupleHeader(value);
|
|
typeid = HeapTupleHeaderGetTypeId(tup);
|
|
typmod = HeapTupleHeaderGetTypMod(tup);
|
|
|
|
/* If transient record, replace remote typmod with local typmod. */
|
|
if (typeid == RECORDOID)
|
|
{
|
|
Assert(reader->typmodmap != NULL);
|
|
mapent = hash_search(reader->typmodmap, &typmod,
|
|
HASH_FIND, NULL);
|
|
if (mapent == NULL)
|
|
elog(ERROR, "found unrecognized remote typmod %d", typmod);
|
|
typmod = mapent->localtypmod;
|
|
}
|
|
|
|
/*
|
|
* Fetch tupledesc and compute remap info. We should probably cache this
|
|
* so that we don't have to keep recomputing it.
|
|
*/
|
|
tupledesc = lookup_rowtype_tupdesc(typeid, typmod);
|
|
remapinfo = BuildRemapInfo(tupledesc);
|
|
DecrTupleDescRefCount(tupledesc);
|
|
|
|
/* Remap tuple. */
|
|
ItemPointerSetInvalid(&htup.t_self);
|
|
htup.t_tableOid = InvalidOid;
|
|
htup.t_len = HeapTupleHeaderGetDatumLength(tup);
|
|
htup.t_data = tup;
|
|
atup = TupleQueueRemapTuple(reader, tupledesc, remapinfo, &htup);
|
|
HeapTupleHeaderSetTypeId(atup->t_data, typeid);
|
|
HeapTupleHeaderSetTypMod(atup->t_data, typmod);
|
|
HeapTupleHeaderSetDatumLength(atup->t_data, htup.t_len);
|
|
|
|
/* And return the results. */
|
|
return HeapTupleHeaderGetDatum(atup->t_data);
|
|
}
|
|
|
|
/*
|
|
* Handle a control message from the tuple queue reader.
|
|
*
|
|
* Control messages are sent when the remote side is sending tuples that
|
|
* contain transient record types. We need to arrange to bless those
|
|
* record types locally and translate between remote and local typmods.
|
|
*/
|
|
static void
|
|
TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes,
|
|
char *data)
|
|
{
|
|
int natts;
|
|
int remotetypmod;
|
|
bool hasoid;
|
|
char *buf = data;
|
|
int rc = 0;
|
|
int i;
|
|
Form_pg_attribute *attrs;
|
|
MemoryContext oldcontext;
|
|
TupleDesc tupledesc;
|
|
RecordTypemodMap *mapent;
|
|
bool found;
|
|
|
|
/* Extract remote typmod. */
|
|
memcpy(&remotetypmod, &buf[rc], sizeof(int));
|
|
rc += sizeof(int);
|
|
|
|
/* Extract attribute count. */
|
|
memcpy(&natts, &buf[rc], sizeof(int));
|
|
rc += sizeof(int);
|
|
|
|
/* Extract hasoid flag. */
|
|
memcpy(&hasoid, &buf[rc], sizeof(bool));
|
|
rc += sizeof(bool);
|
|
|
|
/* Extract attribute details. */
|
|
oldcontext = MemoryContextSwitchTo(CurTransactionContext);
|
|
attrs = palloc(natts * sizeof(Form_pg_attribute));
|
|
for (i = 0; i < natts; ++i)
|
|
{
|
|
attrs[i] = palloc(sizeof(FormData_pg_attribute));
|
|
memcpy(attrs[i], &buf[rc], sizeof(FormData_pg_attribute));
|
|
rc += sizeof(FormData_pg_attribute);
|
|
}
|
|
MemoryContextSwitchTo(oldcontext);
|
|
|
|
/* We should have read the whole message. */
|
|
Assert(rc == nbytes);
|
|
|
|
/* Construct TupleDesc. */
|
|
tupledesc = CreateTupleDesc(natts, hasoid, attrs);
|
|
tupledesc = BlessTupleDesc(tupledesc);
|
|
|
|
/* Create map if it doesn't exist already. */
|
|
if (reader->typmodmap == NULL)
|
|
{
|
|
HASHCTL ctl;
|
|
|
|
ctl.keysize = sizeof(int);
|
|
ctl.entrysize = sizeof(RecordTypemodMap);
|
|
ctl.hcxt = CurTransactionContext;
|
|
reader->typmodmap = hash_create("typmodmap hashtable",
|
|
100, &ctl, HASH_ELEM | HASH_CONTEXT);
|
|
}
|
|
|
|
/* Create map entry. */
|
|
mapent = hash_search(reader->typmodmap, &remotetypmod, HASH_ENTER,
|
|
&found);
|
|
if (found)
|
|
elog(ERROR, "duplicate message for typmod %d",
|
|
remotetypmod);
|
|
mapent->localtypmod = tupledesc->tdtypmod;
|
|
elog(DEBUG3, "mapping remote typmod %d to local typmod %d",
|
|
remotetypmod, tupledesc->tdtypmod);
|
|
}
|
|
|
|
/*
|
|
* Build a mapping indicating what remapping class applies to each attribute
|
|
* described by a tupledesc.
|
|
*/
|
|
static RemapInfo *
|
|
BuildRemapInfo(TupleDesc tupledesc)
|
|
{
|
|
RemapInfo *remapinfo;
|
|
Size size;
|
|
AttrNumber i;
|
|
bool noop = true;
|
|
|
|
size = offsetof(RemapInfo, mapping) +
|
|
sizeof(RemapClass) * tupledesc->natts;
|
|
remapinfo = MemoryContextAllocZero(TopMemoryContext, size);
|
|
remapinfo->natts = tupledesc->natts;
|
|
for (i = 0; i < tupledesc->natts; ++i)
|
|
{
|
|
Form_pg_attribute attr = tupledesc->attrs[i];
|
|
|
|
if (attr->attisdropped)
|
|
{
|
|
remapinfo->mapping[i] = TQUEUE_REMAP_NONE;
|
|
continue;
|
|
}
|
|
|
|
remapinfo->mapping[i] = GetRemapClass(attr->atttypid);
|
|
if (remapinfo->mapping[i] != TQUEUE_REMAP_NONE)
|
|
noop = false;
|
|
}
|
|
|
|
if (noop)
|
|
{
|
|
pfree(remapinfo);
|
|
remapinfo = NULL;
|
|
}
|
|
|
|
return remapinfo;
|
|
}
|
|
|
|
/*
|
|
* Determine the remap class assocociated with a particular data type.
|
|
*
|
|
* Transient record types need to have the typmod applied on the sending side
|
|
* replaced with a value on the receiving side that has the same meaning.
|
|
*
|
|
* Arrays, range types, and all record types (including named composite types)
|
|
* need to searched for transient record values buried within them.
|
|
* Surprisingly, a walker is required even when the indicated type is a
|
|
* composite type, because the actual value may be a compatible transient
|
|
* record type.
|
|
*/
|
|
static RemapClass
|
|
GetRemapClass(Oid typeid)
|
|
{
|
|
RemapClass forceResult = TQUEUE_REMAP_NONE;
|
|
RemapClass innerResult = TQUEUE_REMAP_NONE;
|
|
|
|
for (;;)
|
|
{
|
|
HeapTuple tup;
|
|
Form_pg_type typ;
|
|
|
|
/* Simple cases. */
|
|
if (typeid == RECORDOID)
|
|
{
|
|
innerResult = TQUEUE_REMAP_RECORD;
|
|
break;
|
|
}
|
|
if (typeid == RECORDARRAYOID)
|
|
{
|
|
innerResult = TQUEUE_REMAP_ARRAY;
|
|
break;
|
|
}
|
|
|
|
/* Otherwise, we need a syscache lookup to figure it out. */
|
|
tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typeid));
|
|
if (!HeapTupleIsValid(tup))
|
|
elog(ERROR, "cache lookup failed for type %u", typeid);
|
|
typ = (Form_pg_type) GETSTRUCT(tup);
|
|
|
|
/* Look through domains to underlying base type. */
|
|
if (typ->typtype == TYPTYPE_DOMAIN)
|
|
{
|
|
typeid = typ->typbasetype;
|
|
ReleaseSysCache(tup);
|
|
continue;
|
|
}
|
|
|
|
/*
|
|
* Look through arrays to underlying base type, but the final return
|
|
* value must be either TQUEUE_REMAP_ARRAY or TQUEUE_REMAP_NONE. (If
|
|
* this is an array of integers, for example, we don't need to walk
|
|
* it.)
|
|
*/
|
|
if (OidIsValid(typ->typelem) && typ->typlen == -1)
|
|
{
|
|
typeid = typ->typelem;
|
|
ReleaseSysCache(tup);
|
|
if (forceResult == TQUEUE_REMAP_NONE)
|
|
forceResult = TQUEUE_REMAP_ARRAY;
|
|
continue;
|
|
}
|
|
|
|
/*
|
|
* Similarly, look through ranges to the underlying base type, but the
|
|
* final return value must be either TQUEUE_REMAP_RANGE or
|
|
* TQUEUE_REMAP_NONE.
|
|
*/
|
|
if (typ->typtype == TYPTYPE_RANGE)
|
|
{
|
|
ReleaseSysCache(tup);
|
|
if (forceResult == TQUEUE_REMAP_NONE)
|
|
forceResult = TQUEUE_REMAP_RANGE;
|
|
typeid = get_range_subtype(typeid);
|
|
continue;
|
|
}
|
|
|
|
/* Walk composite types. Nothing else needs special handling. */
|
|
if (typ->typtype == TYPTYPE_COMPOSITE)
|
|
innerResult = TQUEUE_REMAP_RECORD;
|
|
ReleaseSysCache(tup);
|
|
break;
|
|
}
|
|
|
|
if (innerResult != TQUEUE_REMAP_NONE && forceResult != TQUEUE_REMAP_NONE)
|
|
return forceResult;
|
|
return innerResult;
|
|
}
|