1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-30 21:42:05 +03:00

Create explain_dr.c and move DestReceiver-related code there.

explain.c has grown rather large, and the code that deals with the
DestReceiver that supports the SERIALIZE option is pretty easily severable
from the rest of explain.c; hence, move it to a separate file.

Reviewed-by: Peter Geoghegan <pg@bowt.ie>
Discussion: http://postgr.es/m/CA+TgmoYutMw1Jgo8BWUmB3TqnOhsEAJiYO=rOQufF4gPLWmkLQ@mail.gmail.com
This commit is contained in:
Robert Haas
2025-02-27 13:14:16 -05:00
parent 9173e8b604
commit 555960a0fb
5 changed files with 341 additions and 298 deletions

View File

@ -17,6 +17,7 @@
#include "catalog/pg_type.h"
#include "commands/createas.h"
#include "commands/defrem.h"
#include "commands/explain_dr.h"
#include "commands/explain_format.h"
#include "commands/prepare.h"
#include "foreign/fdwapi.h"
@ -50,14 +51,6 @@ ExplainOneQuery_hook_type ExplainOneQuery_hook = NULL;
explain_get_index_name_hook_type explain_get_index_name_hook = NULL;
/* Instrumentation data for SERIALIZE option */
typedef struct SerializeMetrics
{
uint64 bytesSent; /* # of bytes serialized */
instr_time timeSpent; /* time spent serializing */
BufferUsage bufferUsage; /* buffers accessed during serialization */
} SerializeMetrics;
/*
* Various places within need to convert bytes to kilobytes. Round these up
* to the next whole kilobyte.
@ -161,7 +154,6 @@ static ExplainWorkersState *ExplainCreateWorkersState(int num_workers);
static void ExplainOpenWorker(int n, ExplainState *es);
static void ExplainCloseWorker(int n, ExplainState *es);
static void ExplainFlushWorkersState(ExplainState *es);
static SerializeMetrics GetSerializationMetrics(DestReceiver *dest);
@ -4939,292 +4931,3 @@ ExplainFlushWorkersState(ExplainState *es)
pfree(wstate->worker_state_save);
pfree(wstate);
}
/*
* DestReceiver functions for SERIALIZE option
*
* A DestReceiver for query tuples, that serializes passed rows into RowData
* messages while measuring the resources expended and total serialized size,
* while never sending the data to the client. This allows measuring the
* overhead of deTOASTing and datatype out/sendfuncs, which are not otherwise
* exercisable without actually hitting the network.
*/
typedef struct SerializeDestReceiver
{
DestReceiver pub;
ExplainState *es; /* this EXPLAIN statement's ExplainState */
int8 format; /* text or binary, like pq wire protocol */
TupleDesc attrinfo; /* the output tuple desc */
int nattrs; /* current number of columns */
FmgrInfo *finfos; /* precomputed call info for output fns */
MemoryContext tmpcontext; /* per-row temporary memory context */
StringInfoData buf; /* buffer to hold the constructed message */
SerializeMetrics metrics; /* collected metrics */
} SerializeDestReceiver;
/*
* Get the function lookup info that we'll need for output.
*
* This is a subset of what printtup_prepare_info() does. We don't need to
* cope with format choices varying across columns, so it's slightly simpler.
*/
static void
serialize_prepare_info(SerializeDestReceiver *receiver,
TupleDesc typeinfo, int nattrs)
{
/* get rid of any old data */
if (receiver->finfos)
pfree(receiver->finfos);
receiver->finfos = NULL;
receiver->attrinfo = typeinfo;
receiver->nattrs = nattrs;
if (nattrs <= 0)
return;
receiver->finfos = (FmgrInfo *) palloc0(nattrs * sizeof(FmgrInfo));
for (int i = 0; i < nattrs; i++)
{
FmgrInfo *finfo = receiver->finfos + i;
Form_pg_attribute attr = TupleDescAttr(typeinfo, i);
Oid typoutput;
Oid typsend;
bool typisvarlena;
if (receiver->format == 0)
{
/* wire protocol format text */
getTypeOutputInfo(attr->atttypid,
&typoutput,
&typisvarlena);
fmgr_info(typoutput, finfo);
}
else if (receiver->format == 1)
{
/* wire protocol format binary */
getTypeBinaryOutputInfo(attr->atttypid,
&typsend,
&typisvarlena);
fmgr_info(typsend, finfo);
}
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("unsupported format code: %d", receiver->format)));
}
}
/*
* serializeAnalyzeReceive - collect tuples for EXPLAIN (SERIALIZE)
*
* This should match printtup() in printtup.c as closely as possible,
* except for the addition of measurement code.
*/
static bool
serializeAnalyzeReceive(TupleTableSlot *slot, DestReceiver *self)
{
TupleDesc typeinfo = slot->tts_tupleDescriptor;
SerializeDestReceiver *myState = (SerializeDestReceiver *) self;
MemoryContext oldcontext;
StringInfo buf = &myState->buf;
int natts = typeinfo->natts;
instr_time start,
end;
BufferUsage instr_start;
/* only measure time, buffers if requested */
if (myState->es->timing)
INSTR_TIME_SET_CURRENT(start);
if (myState->es->buffers)
instr_start = pgBufferUsage;
/* Set or update my derived attribute info, if needed */
if (myState->attrinfo != typeinfo || myState->nattrs != natts)
serialize_prepare_info(myState, typeinfo, natts);
/* Make sure the tuple is fully deconstructed */
slot_getallattrs(slot);
/* Switch into per-row context so we can recover memory below */
oldcontext = MemoryContextSwitchTo(myState->tmpcontext);
/*
* Prepare a DataRow message (note buffer is in per-query context)
*
* Note that we fill a StringInfo buffer the same as printtup() does, so
* as to capture the costs of manipulating the strings accurately.
*/
pq_beginmessage_reuse(buf, PqMsg_DataRow);
pq_sendint16(buf, natts);
/*
* send the attributes of this tuple
*/
for (int i = 0; i < natts; i++)
{
FmgrInfo *finfo = myState->finfos + i;
Datum attr = slot->tts_values[i];
if (slot->tts_isnull[i])
{
pq_sendint32(buf, -1);
continue;
}
if (myState->format == 0)
{
/* Text output */
char *outputstr;
outputstr = OutputFunctionCall(finfo, attr);
pq_sendcountedtext(buf, outputstr, strlen(outputstr));
}
else
{
/* Binary output */
bytea *outputbytes;
outputbytes = SendFunctionCall(finfo, attr);
pq_sendint32(buf, VARSIZE(outputbytes) - VARHDRSZ);
pq_sendbytes(buf, VARDATA(outputbytes),
VARSIZE(outputbytes) - VARHDRSZ);
}
}
/*
* We mustn't call pq_endmessage_reuse(), since that would actually send
* the data to the client. Just count the data, instead. We can leave
* the buffer alone; it'll be reset on the next iteration (as would also
* happen in printtup()).
*/
myState->metrics.bytesSent += buf->len;
/* Return to caller's context, and flush row's temporary memory */
MemoryContextSwitchTo(oldcontext);
MemoryContextReset(myState->tmpcontext);
/* Update timing data */
if (myState->es->timing)
{
INSTR_TIME_SET_CURRENT(end);
INSTR_TIME_ACCUM_DIFF(myState->metrics.timeSpent, end, start);
}
/* Update buffer metrics */
if (myState->es->buffers)
BufferUsageAccumDiff(&myState->metrics.bufferUsage,
&pgBufferUsage,
&instr_start);
return true;
}
/*
* serializeAnalyzeStartup - start up the serializeAnalyze receiver
*/
static void
serializeAnalyzeStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
{
SerializeDestReceiver *receiver = (SerializeDestReceiver *) self;
Assert(receiver->es != NULL);
switch (receiver->es->serialize)
{
case EXPLAIN_SERIALIZE_NONE:
Assert(false);
break;
case EXPLAIN_SERIALIZE_TEXT:
receiver->format = 0; /* wire protocol format text */
break;
case EXPLAIN_SERIALIZE_BINARY:
receiver->format = 1; /* wire protocol format binary */
break;
}
/* Create per-row temporary memory context */
receiver->tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
"SerializeTupleReceive",
ALLOCSET_DEFAULT_SIZES);
/* The output buffer is re-used across rows, as in printtup.c */
initStringInfo(&receiver->buf);
/* Initialize results counters */
memset(&receiver->metrics, 0, sizeof(SerializeMetrics));
INSTR_TIME_SET_ZERO(receiver->metrics.timeSpent);
}
/*
* serializeAnalyzeShutdown - shut down the serializeAnalyze receiver
*/
static void
serializeAnalyzeShutdown(DestReceiver *self)
{
SerializeDestReceiver *receiver = (SerializeDestReceiver *) self;
if (receiver->finfos)
pfree(receiver->finfos);
receiver->finfos = NULL;
if (receiver->buf.data)
pfree(receiver->buf.data);
receiver->buf.data = NULL;
if (receiver->tmpcontext)
MemoryContextDelete(receiver->tmpcontext);
receiver->tmpcontext = NULL;
}
/*
* serializeAnalyzeDestroy - destroy the serializeAnalyze receiver
*/
static void
serializeAnalyzeDestroy(DestReceiver *self)
{
pfree(self);
}
/*
* Build a DestReceiver for EXPLAIN (SERIALIZE) instrumentation.
*/
DestReceiver *
CreateExplainSerializeDestReceiver(ExplainState *es)
{
SerializeDestReceiver *self;
self = (SerializeDestReceiver *) palloc0(sizeof(SerializeDestReceiver));
self->pub.receiveSlot = serializeAnalyzeReceive;
self->pub.rStartup = serializeAnalyzeStartup;
self->pub.rShutdown = serializeAnalyzeShutdown;
self->pub.rDestroy = serializeAnalyzeDestroy;
self->pub.mydest = DestExplainSerialize;
self->es = es;
return (DestReceiver *) self;
}
/*
* GetSerializationMetrics - collect metrics
*
* We have to be careful here since the receiver could be an IntoRel
* receiver if the subject statement is CREATE TABLE AS. In that
* case, return all-zeroes stats.
*/
static SerializeMetrics
GetSerializationMetrics(DestReceiver *dest)
{
SerializeMetrics empty;
if (dest->mydest == DestExplainSerialize)
return ((SerializeDestReceiver *) dest)->metrics;
memset(&empty, 0, sizeof(SerializeMetrics));
INSTR_TIME_SET_ZERO(empty.timeSpent);
return empty;
}