1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-05 23:56:58 +03:00

Refactor COPY TO to use format callback functions.

This commit introduces a new CopyToRoutine struct, which is a set of
callback routines to copy tuples in a specific format. It also makes
the existing formats (text, CSV, and binary) utilize these format
callbacks.

This change is a preliminary step towards making the COPY TO command
extensible in terms of output formats.

Additionally, this refactoring contributes to a performance
improvement by reducing the number of "if" branches that need to be
checked on a per-row basis when sending field representations in text
or CSV mode. The performance benchmark results showed ~5% performance
gain in text or CSV mode.

Author: Sutou Kouhei <kou@clear-code.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Michael Paquier <michael@paquier.xyz>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Tomas Vondra <tomas.vondra@enterprisedb.com>
Reviewed-by: Junwang Zhao <zhjwpku@gmail.com>
Discussion: https://postgr.es/m/20231204.153548.2126325458835528809.kou@clear-code.com
This commit is contained in:
Masahiko Sawada 2025-02-27 15:03:52 -08:00
parent 555960a0fb
commit 2e4127b6d2
3 changed files with 355 additions and 141 deletions

View File

@ -19,7 +19,7 @@
#include <sys/stat.h>
#include "access/tableam.h"
#include "commands/copy.h"
#include "commands/copyapi.h"
#include "commands/progress.h"
#include "executor/execdesc.h"
#include "executor/executor.h"
@ -64,6 +64,9 @@ typedef enum CopyDest
*/
typedef struct CopyToStateData
{
/* format-specific routines */
const CopyToRoutine *routine;
/* low-level state data */
CopyDest copy_dest; /* type of copy source/destination */
FILE *copy_file; /* used if copy_dest == COPY_FILE */
@ -114,6 +117,19 @@ static void CopyAttributeOutText(CopyToState cstate, const char *string);
static void CopyAttributeOutCSV(CopyToState cstate, const char *string,
bool use_quote);
/* built-in format-specific routines */
static void CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc);
static void CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo);
static void CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot);
static void CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot);
static void CopyToTextLikeOneRow(CopyToState cstate, TupleTableSlot *slot,
bool is_csv);
static void CopyToTextLikeEnd(CopyToState cstate);
static void CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc);
static void CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo);
static void CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot);
static void CopyToBinaryEnd(CopyToState cstate);
/* Low-level communications functions */
static void SendCopyBegin(CopyToState cstate);
static void SendCopyEnd(CopyToState cstate);
@ -121,9 +137,251 @@ static void CopySendData(CopyToState cstate, const void *databuf, int datasize);
static void CopySendString(CopyToState cstate, const char *str);
static void CopySendChar(CopyToState cstate, char c);
static void CopySendEndOfRow(CopyToState cstate);
static void CopySendTextLikeEndOfRow(CopyToState cstate);
static void CopySendInt32(CopyToState cstate, int32 val);
static void CopySendInt16(CopyToState cstate, int16 val);
/*
* COPY TO routines for built-in formats.
*
* CSV and text formats share the same TextLike routines except for the
* one-row callback.
*/
/*
 * Callback table for the text format.
 *
 * Everything except the per-row callback is shared with the CSV format.
 */
static const CopyToRoutine CopyToRoutineText = {
	.CopyToOutFunc = CopyToTextLikeOutFunc,
	.CopyToStart = CopyToTextLikeStart,
	.CopyToOneRow = CopyToTextOneRow,
	.CopyToEnd = CopyToTextLikeEnd,
};
/*
 * Callback table for the CSV format.
 *
 * Everything except the per-row callback is shared with the text format.
 */
static const CopyToRoutine CopyToRoutineCSV = {
	.CopyToOutFunc = CopyToTextLikeOutFunc,
	.CopyToStart = CopyToTextLikeStart,
	.CopyToOneRow = CopyToCSVOneRow,
	.CopyToEnd = CopyToTextLikeEnd,
};
/*
 * Callback table for the binary format.  All four callbacks are
 * binary-specific.
 */
static const CopyToRoutine CopyToRoutineBinary = {
	.CopyToOutFunc = CopyToBinaryOutFunc,
	.CopyToStart = CopyToBinaryStart,
	.CopyToOneRow = CopyToBinaryOneRow,
	.CopyToEnd = CopyToBinaryEnd,
};
/*
 * Select the COPY TO callback table that matches the given options.
 *
 * csv_mode is tested first, then binary; if neither is set the text
 * routine is returned.
 */
static const CopyToRoutine *
CopyToGetRoutine(CopyFormatOptions opts)
{
	/* text is the fallback when no other format is requested */
	const CopyToRoutine *selected = &CopyToRoutineText;

	if (opts.csv_mode)
		selected = &CopyToRoutineCSV;
	else if (opts.binary)
		selected = &CopyToRoutineBinary;

	return selected;
}
/*
 * Start callback shared by the text and CSV formats.
 *
 * Prepares the client-encoded NULL marker when transcoding is needed and,
 * if a header line was requested, emits the column names as the first row.
 */
static void
CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc)
{
	bool		first_column = true;

	/*
	 * null_print is sent verbatim with CopySendString, so it has to be
	 * converted to the file encoding up front.
	 */
	if (cstate->need_transcoding)
		cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
														  cstate->opts.null_print_len,
														  cstate->file_encoding);

	/* without a requested header line there is nothing more to send */
	if (!cstate->opts.header_line)
		return;

	foreach_int(attnum, cstate->attnumlist)
	{
		char	   *colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);

		/* a delimiter goes between columns, not before the first one */
		if (!first_column)
			CopySendChar(cstate, cstate->opts.delim[0]);
		first_column = false;

		if (cstate->opts.csv_mode)
			CopyAttributeOutCSV(cstate, colname, false);
		else
			CopyAttributeOutText(cstate, colname);
	}

	CopySendTextLikeEndOfRow(cstate);
}
/*
 * Outfunc callback shared by the text and CSV formats.
 *
 * Looks up the text output function of type 'atttypid' and stores its
 * call information in *finfo.
 */
static void
CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
{
	Oid			outfunc_oid;
	bool		typisvarlena;	/* filled by the lookup, not used afterwards */

	getTypeOutputInfo(atttypid, &outfunc_oid, &typisvarlena);
	fmgr_info(outfunc_oid, finfo);
}
/*
 * Per-row callback for the text format: delegates to the shared text/CSV
 * workhorse with CSV handling turned off.
 */
static void
CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot)
{
	const bool	is_csv = false;

	CopyToTextLikeOneRow(cstate, slot, is_csv);
}
/*
 * Per-row callback for the CSV format: delegates to the shared text/CSV
 * workhorse with CSV handling turned on.
 */
static void
CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot)
{
	const bool	is_csv = true;

	CopyToTextLikeOneRow(cstate, slot, is_csv);
}
/*
 * Shared row writer behind CopyToTextOneRow() and CopyToCSVOneRow().
 *
 * Emits every attribute listed in cstate->attnumlist, separated by the
 * configured delimiter, then terminates the row.  NULL attributes are
 * written as the client-encoded NULL marker.
 *
 * Marked pg_attribute_always_inline so that each caller gets its own copy
 * and the compiler can fold away the constant 'is_csv' test.
 */
static pg_attribute_always_inline void
CopyToTextLikeOneRow(CopyToState cstate,
					 TupleTableSlot *slot,
					 bool is_csv)
{
	FmgrInfo   *outfuncs = cstate->out_functions;
	bool		first_field = true;

	foreach_int(attnum, cstate->attnumlist)
	{
		int			idx = attnum - 1;	/* attnums are 1-based */
		char	   *outstr;

		/* a delimiter goes between fields, not before the first one */
		if (!first_field)
			CopySendChar(cstate, cstate->opts.delim[0]);
		first_field = false;

		if (slot->tts_isnull[idx])
		{
			CopySendString(cstate, cstate->opts.null_print_client);
			continue;
		}

		outstr = OutputFunctionCall(&outfuncs[idx], slot->tts_values[idx]);

		if (is_csv)
			CopyAttributeOutCSV(cstate, outstr,
								cstate->opts.force_quote_flags[idx]);
		else
			CopyAttributeOutText(cstate, outstr);
	}

	CopySendTextLikeEndOfRow(cstate);
}
/*
 * Implementation of the end callback for text and CSV formats.
 *
 * Intentionally empty: this callback sends nothing for these formats
 * (contrast with CopyToBinaryEnd(), which sends a trailer).
 */
static void
CopyToTextLikeEnd(CopyToState cstate)
{
	/* Nothing to do here */
}
/*
 * Start callback for the binary format.
 *
 * Sends the binary COPY header: the 11-byte signature followed by two
 * int32 zeros (the flags field and an empty header extension).
 * 'tupDesc' is not needed for this format.
 */
static void
CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc)
{
	/* Signature */
	CopySendData(cstate, BinarySignature, 11);

	/* Flags field: none set */
	CopySendInt32(cstate, 0);

	/* No header extension */
	CopySendInt32(cstate, 0);
}
/*
 * Outfunc callback for the binary format.
 *
 * Looks up the binary output (send) function of type 'atttypid' and stores
 * its call information in *finfo.
 */
static void
CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
{
	Oid			sendfunc_oid;
	bool		typisvarlena;	/* filled by the lookup, not used afterwards */

	getTypeBinaryOutputInfo(atttypid, &sendfunc_oid, &typisvarlena);
	fmgr_info(sendfunc_oid, finfo);
}
/*
 * Per-row callback for the binary format.
 *
 * Writes an int16 field count, then for each attribute in
 * cstate->attnumlist either -1 (NULL) or an int32 byte length followed by
 * the bytes produced by the attribute's send function.
 */
static void
CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
{
	FmgrInfo   *sendfuncs = cstate->out_functions;

	/* Binary per-tuple header */
	CopySendInt16(cstate, list_length(cstate->attnumlist));

	foreach_int(attnum, cstate->attnumlist)
	{
		int			idx = attnum - 1;	/* attnums are 1-based */
		bytea	   *payload;
		int32		payload_len;

		if (slot->tts_isnull[idx])
		{
			/* a length word of -1 marks a NULL field */
			CopySendInt32(cstate, -1);
			continue;
		}

		payload = SendFunctionCall(&sendfuncs[idx], slot->tts_values[idx]);

		/* only the data after the varlena header goes on the wire */
		payload_len = VARSIZE(payload) - VARHDRSZ;
		CopySendInt32(cstate, payload_len);
		CopySendData(cstate, VARDATA(payload), payload_len);
	}

	CopySendEndOfRow(cstate);
}
/*
 * End callback for the binary format: emit the file trailer and flush it.
 */
static void
CopyToBinaryEnd(CopyToState cstate)
{
	/* the trailer is an int16 word holding -1 */
	const int16 file_trailer = -1;

	CopySendInt16(cstate, file_trailer);

	/* the trailer only sits in the buffer until the row is ended */
	CopySendEndOfRow(cstate);
}
/*
* Send copy start/stop messages for frontend copies. These have changed
@ -191,16 +449,6 @@ CopySendEndOfRow(CopyToState cstate)
switch (cstate->copy_dest)
{
case COPY_FILE:
if (!cstate->opts.binary)
{
/* Default line termination depends on platform */
#ifndef WIN32
CopySendChar(cstate, '\n');
#else
CopySendString(cstate, "\r\n");
#endif
}
if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
cstate->copy_file) != 1 ||
ferror(cstate->copy_file))
@ -235,10 +483,6 @@ CopySendEndOfRow(CopyToState cstate)
}
break;
case COPY_FRONTEND:
/* The FE/BE protocol uses \n as newline for all platforms */
if (!cstate->opts.binary)
CopySendChar(cstate, '\n');
/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
break;
@ -254,6 +498,35 @@ CopySendEndOfRow(CopyToState cstate)
resetStringInfo(fe_msgbuf);
}
/*
 * Wrapper around CopySendEndOfRow() for the text and CSV formats.
 *
 * Appends the line terminator appropriate for the destination and then
 * performs the common end-of-row processing.
 */
static inline void
CopySendTextLikeEndOfRow(CopyToState cstate)
{
	if (cstate->copy_dest == COPY_FILE)
	{
		/* Default line termination depends on platform */
#ifdef WIN32
		CopySendString(cstate, "\r\n");
#else
		CopySendChar(cstate, '\n');
#endif
	}
	else if (cstate->copy_dest == COPY_FRONTEND)
	{
		/* The FE/BE protocol uses \n as newline for all platforms */
		CopySendChar(cstate, '\n');
	}
	/* any other destination gets no terminator added here */

	/* Now take the actions related to the end of a row */
	CopySendEndOfRow(cstate);
}
/*
* These functions do apply some data conversion
*/
@ -426,6 +699,9 @@ BeginCopyTo(ParseState *pstate,
/* Extract options from the statement node tree */
ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
/* Set format routine */
cstate->routine = CopyToGetRoutine(cstate->opts);
/* Process the source/target relation or query */
if (rel)
{
@ -772,19 +1048,10 @@ DoCopyTo(CopyToState cstate)
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
Oid out_func_oid;
bool isvarlena;
Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
if (cstate->opts.binary)
getTypeBinaryOutputInfo(attr->atttypid,
&out_func_oid,
&isvarlena);
else
getTypeOutputInfo(attr->atttypid,
&out_func_oid,
&isvarlena);
fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
cstate->routine->CopyToOutFunc(cstate, attr->atttypid,
&cstate->out_functions[attnum - 1]);
}
/*
@ -797,56 +1064,7 @@ DoCopyTo(CopyToState cstate)
"COPY TO",
ALLOCSET_DEFAULT_SIZES);
if (cstate->opts.binary)
{
/* Generate header for a binary copy */
int32 tmp;
/* Signature */
CopySendData(cstate, BinarySignature, 11);
/* Flags field */
tmp = 0;
CopySendInt32(cstate, tmp);
/* No header extension */
tmp = 0;
CopySendInt32(cstate, tmp);
}
else
{
/*
* For non-binary copy, we need to convert null_print to file
* encoding, because it will be sent directly with CopySendString.
*/
if (cstate->need_transcoding)
cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
cstate->opts.null_print_len,
cstate->file_encoding);
/* if a header has been requested send the line */
if (cstate->opts.header_line)
{
bool hdr_delim = false;
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
char *colname;
if (hdr_delim)
CopySendChar(cstate, cstate->opts.delim[0]);
hdr_delim = true;
colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
if (cstate->opts.csv_mode)
CopyAttributeOutCSV(cstate, colname, false);
else
CopyAttributeOutText(cstate, colname);
}
CopySendEndOfRow(cstate);
}
}
cstate->routine->CopyToStart(cstate, tupDesc);
if (cstate->rel)
{
@ -885,13 +1103,7 @@ DoCopyTo(CopyToState cstate)
processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
}
if (cstate->opts.binary)
{
/* Generate trailer for a binary copy */
CopySendInt16(cstate, -1);
/* Need to flush out the trailer */
CopySendEndOfRow(cstate);
}
cstate->routine->CopyToEnd(cstate);
MemoryContextDelete(cstate->rowcontext);
@ -904,74 +1116,18 @@ DoCopyTo(CopyToState cstate)
/*
* Emit one row during DoCopyTo().
*/
static void
static inline void
CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
{
FmgrInfo *out_functions = cstate->out_functions;
MemoryContext oldcontext;
MemoryContextReset(cstate->rowcontext);
oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
if (cstate->opts.binary)
{
/* Binary per-tuple header */
CopySendInt16(cstate, list_length(cstate->attnumlist));
}
/* Make sure the tuple is fully deconstructed */
slot_getallattrs(slot);
if (!cstate->opts.binary)
{
bool need_delim = false;
foreach_int(attnum, cstate->attnumlist)
{
Datum value = slot->tts_values[attnum - 1];
bool isnull = slot->tts_isnull[attnum - 1];
char *string;
if (need_delim)
CopySendChar(cstate, cstate->opts.delim[0]);
need_delim = true;
if (isnull)
CopySendString(cstate, cstate->opts.null_print_client);
else
{
string = OutputFunctionCall(&out_functions[attnum - 1],
value);
if (cstate->opts.csv_mode)
CopyAttributeOutCSV(cstate, string,
cstate->opts.force_quote_flags[attnum - 1]);
else
CopyAttributeOutText(cstate, string);
}
}
}
else
{
foreach_int(attnum, cstate->attnumlist)
{
Datum value = slot->tts_values[attnum - 1];
bool isnull = slot->tts_isnull[attnum - 1];
bytea *outputbytes;
if (isnull)
CopySendInt32(cstate, -1);
else
{
outputbytes = SendFunctionCall(&out_functions[attnum - 1],
value);
CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
CopySendData(cstate, VARDATA(outputbytes),
VARSIZE(outputbytes) - VARHDRSZ);
}
}
}
CopySendEndOfRow(cstate);
cstate->routine->CopyToOneRow(cstate, slot);
MemoryContextSwitchTo(oldcontext);
}

View File

@ -0,0 +1,57 @@
/*-------------------------------------------------------------------------
*
* copyapi.h
* API for COPY TO handlers
*
*
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/commands/copyapi.h
*
*-------------------------------------------------------------------------
*/
#ifndef COPYAPI_H
#define COPYAPI_H
#include "commands/copy.h"
/*
 * API structure for a COPY TO format implementation. Note this must be
 * allocated in a server-lifetime manner, typically as a static const struct.
 *
 * Each member is a callback invoked by the COPY TO code with the current
 * CopyToState.
 */
typedef struct CopyToRoutine
{
	/*
	 * Set output function information. This callback is called at the
	 * beginning of COPY TO, once for each column being output, before
	 * CopyToStart.
	 *
	 * 'finfo' can be optionally filled to provide the catalog information of
	 * the output function.
	 *
	 * 'atttypid' is the OID of data type used by the relation's attribute.
	 */
	void		(*CopyToOutFunc) (CopyToState cstate, Oid atttypid,
								  FmgrInfo *finfo);

	/*
	 * Start a COPY TO. This callback is called once at the beginning of COPY
	 * TO.
	 *
	 * 'tupDesc' is the tuple descriptor of the relation from where the data
	 * is read.
	 */
	void		(*CopyToStart) (CopyToState cstate, TupleDesc tupDesc);

	/*
	 * Write one row stored in 'slot' to the destination. This callback is
	 * called once for every row being copied.
	 */
	void		(*CopyToOneRow) (CopyToState cstate, TupleTableSlot *slot);

	/*
	 * End a COPY TO. This callback is called once at the end of COPY TO.
	 */
	void		(*CopyToEnd) (CopyToState cstate);
} CopyToRoutine;
#endif /* COPYAPI_H */

View File

@ -512,6 +512,7 @@ CopyMultiInsertInfo
CopyOnErrorChoice
CopySource
CopyStmt
CopyToRoutine
CopyToState
CopyToStateData
Cost