1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-25 13:17:41 +03:00

Refactor COPY FROM to use format callback functions.

This commit introduces a new CopyFromRoutine struct, which is a set of
callback routines to read tuples in a specific format. It also makes
COPY FROM with the existing formats (text, CSV, and binary) utilize
these format callbacks.

This change is a preliminary step towards making the COPY FROM command
extensible in terms of input formats.

Similar to 2e4127b6d2, this refactoring contributes to a performance
improvement by reducing the number of "if" branches that need to be
checked on a per-row basis when reading field representations in text
or CSV mode. The performance benchmark results showed ~5% performance
gain in text or CSV mode.

Author: Sutou Kouhei <kou@clear-code.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Michael Paquier <michael@paquier.xyz>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Tomas Vondra <tomas.vondra@enterprisedb.com>
Reviewed-by: Junwang Zhao <zhjwpku@gmail.com>
Discussion: https://postgr.es/m/20231204.153548.2126325458835528809.kou@clear-code.com
This commit is contained in:
Masahiko Sawada
2025-02-28 10:29:36 -08:00
parent 77cb08be51
commit 7717f63006
6 changed files with 481 additions and 265 deletions

View File

@@ -28,7 +28,7 @@
#include "access/tableam.h" #include "access/tableam.h"
#include "access/xact.h" #include "access/xact.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "commands/copy.h" #include "commands/copyapi.h"
#include "commands/copyfrom_internal.h" #include "commands/copyfrom_internal.h"
#include "commands/progress.h" #include "commands/progress.h"
#include "commands/trigger.h" #include "commands/trigger.h"
@@ -106,6 +106,145 @@ typedef struct CopyMultiInsertInfo
/* non-export function prototypes */ /* non-export function prototypes */
static void ClosePipeFromProgram(CopyFromState cstate); static void ClosePipeFromProgram(CopyFromState cstate);
/*
* Built-in format-specific routines. One-row callbacks are defined in
* copyfromparse.c.
*/
static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
Oid *typioparam);
static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc);
static void CopyFromTextLikeEnd(CopyFromState cstate);
static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
FmgrInfo *finfo, Oid *typioparam);
static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
static void CopyFromBinaryEnd(CopyFromState cstate);
/*
* COPY FROM routines for built-in formats.
*
* CSV and text formats share the same TextLike routines except for the
* one-row callback.
*/
/*
 * text format
 *
 * Uses the shared TextLike infunc/start/end callbacks; only the one-row
 * callback (CopyFromTextOneRow) is specific to text.
 */
static const CopyFromRoutine CopyFromRoutineText = {
.CopyFromInFunc = CopyFromTextLikeInFunc,
.CopyFromStart = CopyFromTextLikeStart,
.CopyFromOneRow = CopyFromTextOneRow,
.CopyFromEnd = CopyFromTextLikeEnd,
};
/*
 * CSV format
 *
 * Uses the shared TextLike infunc/start/end callbacks; only the one-row
 * callback (CopyFromCSVOneRow) is specific to CSV.
 */
static const CopyFromRoutine CopyFromRoutineCSV = {
.CopyFromInFunc = CopyFromTextLikeInFunc,
.CopyFromStart = CopyFromTextLikeStart,
.CopyFromOneRow = CopyFromCSVOneRow,
.CopyFromEnd = CopyFromTextLikeEnd,
};
/*
 * binary format
 *
 * All four callbacks are binary-specific; none are shared with the
 * text/CSV routines.
 */
static const CopyFromRoutine CopyFromRoutineBinary = {
.CopyFromInFunc = CopyFromBinaryInFunc,
.CopyFromStart = CopyFromBinaryStart,
.CopyFromOneRow = CopyFromBinaryOneRow,
.CopyFromEnd = CopyFromBinaryEnd,
};
/*
 * Pick the built-in COPY FROM routine that matches the given options.
 *
 * csv_mode is checked first, then binary; when neither is set the text
 * routine is returned.
 */
static const CopyFromRoutine *
CopyFromGetRoutine(CopyFormatOptions opts)
{
	/* default is text */
	const CopyFromRoutine *routine = &CopyFromRoutineText;

	if (opts.csv_mode)
		routine = &CopyFromRoutineCSV;
	else if (opts.binary)
		routine = &CopyFromRoutineBinary;

	return routine;
}
/*
 * Start callback shared by the text and CSV formats.
 *
 * Sets up the input buffer, the line buffer, and the workspace that
 * receives the raw fields of each line.  'tupDesc' is not used here.
 */
static void
CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc)
{
	AttrNumber	nfields = list_length(cstate->attnumlist);

	if (!cstate->need_transcoding)
	{
		/* No encoding conversion: input_buf can point at raw_buf itself. */
		cstate->input_buf = cstate->raw_buf;
	}
	else
	{
		/*
		 * Encoding conversion is needed, so the converted input data gets a
		 * buffer of its own.
		 */
		cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
		cstate->input_buf_len = 0;
		cstate->input_buf_index = 0;
	}
	cstate->input_reached_eof = false;

	initStringInfo(&cstate->line_buf);

	/*
	 * Create workspace for CopyReadAttributes results; used by CSV and text
	 * format.
	 */
	cstate->max_fields = nfields;
	cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
}
/*
 * Infunc callback shared by the text and CSV formats.
 *
 * Looks up the input function of type 'atttypid', stores its call
 * information in *finfo, and returns the type's I/O parameter OID in
 * *typioparam.  'cstate' is not used here.
 */
static void
CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
					   Oid *typioparam)
{
	Oid			input_func;

	getTypeInputInfo(atttypid, &input_func, typioparam);
	fmgr_info(input_func, finfo);
}
/*
 * Implementation of the end callback for text and CSV formats.
 *
 * Intentionally empty: this callback performs no cleanup.
 */
static void
CopyFromTextLikeEnd(CopyFromState cstate)
{
/* nothing to do */
}
/*
 * Implementation of the start callback for binary format.
 *
 * The only start-up work is consuming the binary header from the input.
 * 'tupDesc' is not used here.
 */
static void
CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
{
/* Read and verify binary header */
ReceiveCopyBinaryHeader(cstate);
}
/*
 * Infunc callback for the binary format.
 *
 * Looks up the binary input function of type 'atttypid', stores its call
 * information in *finfo, and returns the type's I/O parameter OID in
 * *typioparam.  'cstate' is not used here.
 */
static void
CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
					 FmgrInfo *finfo, Oid *typioparam)
{
	Oid			recv_func;

	getTypeBinaryInputInfo(atttypid, &recv_func, typioparam);
	fmgr_info(recv_func, finfo);
}
/*
 * Implementation of the end callback for binary format.
 *
 * Intentionally empty: this callback performs no cleanup.
 */
static void
CopyFromBinaryEnd(CopyFromState cstate)
{
/* nothing to do */
}
/* /*
* error context callback for COPY FROM * error context callback for COPY FROM
* *
@@ -1403,7 +1542,6 @@ BeginCopyFrom(ParseState *pstate,
num_defaults; num_defaults;
FmgrInfo *in_functions; FmgrInfo *in_functions;
Oid *typioparams; Oid *typioparams;
Oid in_func_oid;
int *defmap; int *defmap;
ExprState **defexprs; ExprState **defexprs;
MemoryContext oldcontext; MemoryContext oldcontext;
@@ -1435,6 +1573,9 @@ BeginCopyFrom(ParseState *pstate,
/* Extract options from the statement node tree */ /* Extract options from the statement node tree */
ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options); ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
/* Set the format routine */
cstate->routine = CopyFromGetRoutine(cstate->opts);
/* Process the target relation */ /* Process the target relation */
cstate->rel = rel; cstate->rel = rel;
@@ -1590,25 +1731,6 @@ BeginCopyFrom(ParseState *pstate,
cstate->raw_buf_index = cstate->raw_buf_len = 0; cstate->raw_buf_index = cstate->raw_buf_len = 0;
cstate->raw_reached_eof = false; cstate->raw_reached_eof = false;
if (!cstate->opts.binary)
{
/*
* If encoding conversion is needed, we need another buffer to hold
* the converted input data. Otherwise, we can just point input_buf
* to the same buffer as raw_buf.
*/
if (cstate->need_transcoding)
{
cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
cstate->input_buf_index = cstate->input_buf_len = 0;
}
else
cstate->input_buf = cstate->raw_buf;
cstate->input_reached_eof = false;
initStringInfo(&cstate->line_buf);
}
initStringInfo(&cstate->attribute_buf); initStringInfo(&cstate->attribute_buf);
/* Assign range table and rteperminfos, we'll need them in CopyFrom. */ /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
@@ -1641,13 +1763,9 @@ BeginCopyFrom(ParseState *pstate,
continue; continue;
/* Fetch the input function and typioparam info */ /* Fetch the input function and typioparam info */
if (cstate->opts.binary) cstate->routine->CopyFromInFunc(cstate, att->atttypid,
getTypeBinaryInputInfo(att->atttypid, &in_functions[attnum - 1],
&in_func_oid, &typioparams[attnum - 1]); &typioparams[attnum - 1]);
else
getTypeInputInfo(att->atttypid,
&in_func_oid, &typioparams[attnum - 1]);
fmgr_info(in_func_oid, &in_functions[attnum - 1]);
/* Get default info if available */ /* Get default info if available */
defexprs[attnum - 1] = NULL; defexprs[attnum - 1] = NULL;
@@ -1782,20 +1900,7 @@ BeginCopyFrom(ParseState *pstate,
pgstat_progress_update_multi_param(3, progress_cols, progress_vals); pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
if (cstate->opts.binary) cstate->routine->CopyFromStart(cstate, tupDesc);
{
/* Read and verify binary header */
ReceiveCopyBinaryHeader(cstate);
}
/* create workspace for CopyReadAttributes results */
if (!cstate->opts.binary)
{
AttrNumber attr_count = list_length(cstate->attnumlist);
cstate->max_fields = attr_count;
cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
}
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
@@ -1808,6 +1913,9 @@ BeginCopyFrom(ParseState *pstate,
void void
EndCopyFrom(CopyFromState cstate) EndCopyFrom(CopyFromState cstate)
{ {
/* Invoke the end callback */
cstate->routine->CopyFromEnd(cstate);
/* No COPY FROM related resources except memory. */ /* No COPY FROM related resources except memory. */
if (cstate->is_program) if (cstate->is_program)
{ {

View File

@@ -62,7 +62,7 @@
#include <unistd.h> #include <unistd.h>
#include <sys/stat.h> #include <sys/stat.h>
#include "commands/copy.h" #include "commands/copyapi.h"
#include "commands/copyfrom_internal.h" #include "commands/copyfrom_internal.h"
#include "commands/progress.h" #include "commands/progress.h"
#include "executor/executor.h" #include "executor/executor.h"
@@ -140,13 +140,18 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
/* non-export function prototypes */ /* non-export function prototypes */
static bool CopyReadLine(CopyFromState cstate); static bool CopyReadLine(CopyFromState cstate, bool is_csv);
static bool CopyReadLineText(CopyFromState cstate); static bool CopyReadLineText(CopyFromState cstate, bool is_csv);
static int CopyReadAttributesText(CopyFromState cstate); static int CopyReadAttributesText(CopyFromState cstate);
static int CopyReadAttributesCSV(CopyFromState cstate); static int CopyReadAttributesCSV(CopyFromState cstate);
static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo, static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo,
Oid typioparam, int32 typmod, Oid typioparam, int32 typmod,
bool *isnull); bool *isnull);
static pg_attribute_always_inline bool CopyFromTextLikeOneRow(CopyFromState cstate,
ExprContext *econtext,
Datum *values,
bool *nulls,
bool is_csv);
/* Low-level communications functions */ /* Low-level communications functions */
@@ -740,9 +745,12 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes)
* in the relation. * in the relation.
* *
* NOTE: force_not_null option are not applied to the returned fields. * NOTE: force_not_null option are not applied to the returned fields.
*
* We use pg_attribute_always_inline to reduce function call overhead
* and to help compilers to optimize away the 'is_csv' condition.
*/ */
bool static pg_attribute_always_inline bool
NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields, bool is_csv)
{ {
int fldct; int fldct;
bool done; bool done;
@@ -759,13 +767,13 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
tupDesc = RelationGetDescr(cstate->rel); tupDesc = RelationGetDescr(cstate->rel);
cstate->cur_lineno++; cstate->cur_lineno++;
done = CopyReadLine(cstate); done = CopyReadLine(cstate, is_csv);
if (cstate->opts.header_line == COPY_HEADER_MATCH) if (cstate->opts.header_line == COPY_HEADER_MATCH)
{ {
int fldnum; int fldnum;
if (cstate->opts.csv_mode) if (is_csv)
fldct = CopyReadAttributesCSV(cstate); fldct = CopyReadAttributesCSV(cstate);
else else
fldct = CopyReadAttributesText(cstate); fldct = CopyReadAttributesText(cstate);
@@ -809,7 +817,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
cstate->cur_lineno++; cstate->cur_lineno++;
/* Actually read the line into memory here */ /* Actually read the line into memory here */
done = CopyReadLine(cstate); done = CopyReadLine(cstate, is_csv);
/* /*
* EOF at start of line means we're done. If we see EOF after some * EOF at start of line means we're done. If we see EOF after some
@@ -820,7 +828,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
return false; return false;
/* Parse the line into de-escaped field values */ /* Parse the line into de-escaped field values */
if (cstate->opts.csv_mode) if (is_csv)
fldct = CopyReadAttributesCSV(cstate); fldct = CopyReadAttributesCSV(cstate);
else else
fldct = CopyReadAttributesText(cstate); fldct = CopyReadAttributesText(cstate);
@@ -847,33 +855,86 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
{ {
TupleDesc tupDesc; TupleDesc tupDesc;
AttrNumber num_phys_attrs, AttrNumber num_phys_attrs,
attr_count,
num_defaults = cstate->num_defaults; num_defaults = cstate->num_defaults;
FmgrInfo *in_functions = cstate->in_functions;
Oid *typioparams = cstate->typioparams;
int i; int i;
int *defmap = cstate->defmap; int *defmap = cstate->defmap;
ExprState **defexprs = cstate->defexprs; ExprState **defexprs = cstate->defexprs;
tupDesc = RelationGetDescr(cstate->rel); tupDesc = RelationGetDescr(cstate->rel);
num_phys_attrs = tupDesc->natts; num_phys_attrs = tupDesc->natts;
attr_count = list_length(cstate->attnumlist);
/* Initialize all values for row to NULL */ /* Initialize all values for row to NULL */
MemSet(values, 0, num_phys_attrs * sizeof(Datum)); MemSet(values, 0, num_phys_attrs * sizeof(Datum));
MemSet(nulls, true, num_phys_attrs * sizeof(bool)); MemSet(nulls, true, num_phys_attrs * sizeof(bool));
MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool)); MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool));
if (!cstate->opts.binary) /* Get one row from source */
if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls))
return false;
/*
* Now compute and insert any defaults available for the columns not
* provided by the input data. Anything not processed here or above will
* remain NULL.
*/
for (i = 0; i < num_defaults; i++)
{ {
/*
* The caller must supply econtext and have switched into the
* per-tuple memory context in it.
*/
Assert(econtext != NULL);
Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
values[defmap[i]] = ExecEvalExpr(defexprs[defmap[i]], econtext,
&nulls[defmap[i]]);
}
return true;
}
/*
 * Implementation of the per-row callback for text format.
 *
 * Thin wrapper around CopyFromTextLikeOneRow() with is_csv fixed to false.
 * Returns whatever the workhorse returns.
 */
bool
CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values,
bool *nulls)
{
return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, false);
}
/*
 * Implementation of the per-row callback for CSV format.
 *
 * Thin wrapper around CopyFromTextLikeOneRow() with is_csv fixed to true.
 * Returns whatever the workhorse returns.
 */
bool
CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values,
bool *nulls)
{
return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, true);
}
/*
* Workhorse for CopyFromTextOneRow() and CopyFromCSVOneRow().
*
* We use pg_attribute_always_inline to reduce function call overhead
* and to help compilers to optimize away the 'is_csv' condition.
*/
static pg_attribute_always_inline bool
CopyFromTextLikeOneRow(CopyFromState cstate, ExprContext *econtext,
Datum *values, bool *nulls, bool is_csv)
{
TupleDesc tupDesc;
AttrNumber attr_count;
FmgrInfo *in_functions = cstate->in_functions;
Oid *typioparams = cstate->typioparams;
ExprState **defexprs = cstate->defexprs;
char **field_strings; char **field_strings;
ListCell *cur; ListCell *cur;
int fldct; int fldct;
int fieldno; int fieldno;
char *string; char *string;
tupDesc = RelationGetDescr(cstate->rel);
attr_count = list_length(cstate->attnumlist);
/* read raw fields in the next line */ /* read raw fields in the next line */
if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) if (!NextCopyFromRawFields(cstate, &field_strings, &fldct, is_csv))
return false; return false;
/* check for overflowing fields */ /* check for overflowing fields */
@@ -905,14 +966,14 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
continue; continue;
} }
if (cstate->opts.csv_mode) if (is_csv)
{ {
if (string == NULL && if (string == NULL &&
cstate->opts.force_notnull_flags[m]) cstate->opts.force_notnull_flags[m])
{ {
/* /*
* FORCE_NOT_NULL option is set and column is NULL - * FORCE_NOT_NULL option is set and column is NULL - convert
* convert it to the NULL string. * it to the NULL string.
*/ */
string = cstate->opts.null_print; string = cstate->opts.null_print;
} }
@@ -921,9 +982,9 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
{ {
/* /*
* FORCE_NULL option is set and column matches the NULL * FORCE_NULL option is set and column matches the NULL
* string. It must have been quoted, or otherwise the * string. It must have been quoted, or otherwise the string
* string would already have been set to NULL. Convert it * would already have been set to NULL. Convert it to NULL as
* to NULL as specified. * specified.
*/ */
string = NULL; string = NULL;
} }
@@ -937,10 +998,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
if (cstate->defaults[m]) if (cstate->defaults[m])
{ {
/* /* We must have switched into the per-tuple memory context */
* The caller must supply econtext and have switched into the
* per-tuple memory context in it.
*/
Assert(econtext != NULL); Assert(econtext != NULL);
Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
@@ -948,8 +1006,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
} }
/* /*
* If ON_ERROR is specified with IGNORE, skip rows with soft * If ON_ERROR is specified with IGNORE, skip rows with soft errors
* errors
*/ */
else if (!InputFunctionCallSafe(&in_functions[m], else if (!InputFunctionCallSafe(&in_functions[m],
string, string,
@@ -966,8 +1023,8 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
{ {
/* /*
* Since we emit line number and column info in the below * Since we emit line number and column info in the below
* notice message, we suppress error context information * notice message, we suppress error context information other
* other than the relation name. * than the relation name.
*/ */
Assert(!cstate->relname_only); Assert(!cstate->relname_only);
cstate->relname_only = true; cstate->relname_only = true;
@@ -1002,13 +1059,25 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
} }
Assert(fieldno == attr_count); Assert(fieldno == attr_count);
}
else return true;
{ }
/* binary */
/* Implementation of the per-row callback for binary format */
bool
CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values,
bool *nulls)
{
TupleDesc tupDesc;
AttrNumber attr_count;
FmgrInfo *in_functions = cstate->in_functions;
Oid *typioparams = cstate->typioparams;
int16 fld_count; int16 fld_count;
ListCell *cur; ListCell *cur;
tupDesc = RelationGetDescr(cstate->rel);
attr_count = list_length(cstate->attnumlist);
cstate->cur_lineno++; cstate->cur_lineno++;
if (!CopyGetInt16(cstate, &fld_count)) if (!CopyGetInt16(cstate, &fld_count))
@@ -1020,12 +1089,12 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
if (fld_count == -1) if (fld_count == -1)
{ {
/* /*
* Received EOF marker. Wait for the protocol-level EOF, and * Received EOF marker. Wait for the protocol-level EOF, and complain
* complain if it doesn't come immediately. In COPY FROM STDIN, * if it doesn't come immediately. In COPY FROM STDIN, this ensures
* this ensures that we correctly handle CopyFail, if client * that we correctly handle CopyFail, if client chooses to send that
* chooses to send that now. When copying from file, we could * now. When copying from file, we could ignore the rest of the file
* ignore the rest of the file like in text mode, but we choose to * like in text mode, but we choose to be consistent with the COPY
* be consistent with the COPY FROM STDIN case. * FROM STDIN case.
*/ */
char dummy; char dummy;
@@ -1056,25 +1125,6 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
&nulls[m]); &nulls[m]);
cstate->cur_attname = NULL; cstate->cur_attname = NULL;
} }
}
/*
* Now compute and insert any defaults available for the columns not
* provided by the input data. Anything not processed here or above will
* remain NULL.
*/
for (i = 0; i < num_defaults; i++)
{
/*
* The caller must supply econtext and have switched into the
* per-tuple memory context in it.
*/
Assert(econtext != NULL);
Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
values[defmap[i]] = ExecEvalExpr(defexprs[defmap[i]], econtext,
&nulls[defmap[i]]);
}
return true; return true;
} }
@@ -1087,7 +1137,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
* in the final value of line_buf. * in the final value of line_buf.
*/ */
static bool static bool
CopyReadLine(CopyFromState cstate) CopyReadLine(CopyFromState cstate, bool is_csv)
{ {
bool result; bool result;
@@ -1095,7 +1145,7 @@ CopyReadLine(CopyFromState cstate)
cstate->line_buf_valid = false; cstate->line_buf_valid = false;
/* Parse data and transfer into line_buf */ /* Parse data and transfer into line_buf */
result = CopyReadLineText(cstate); result = CopyReadLineText(cstate, is_csv);
if (result) if (result)
{ {
@@ -1163,7 +1213,7 @@ CopyReadLine(CopyFromState cstate)
* CopyReadLineText - inner loop of CopyReadLine for text mode * CopyReadLineText - inner loop of CopyReadLine for text mode
*/ */
static bool static bool
CopyReadLineText(CopyFromState cstate) CopyReadLineText(CopyFromState cstate, bool is_csv)
{ {
char *copy_input_buf; char *copy_input_buf;
int input_buf_ptr; int input_buf_ptr;
@@ -1178,7 +1228,7 @@ CopyReadLineText(CopyFromState cstate)
char quotec = '\0'; char quotec = '\0';
char escapec = '\0'; char escapec = '\0';
if (cstate->opts.csv_mode) if (is_csv)
{ {
quotec = cstate->opts.quote[0]; quotec = cstate->opts.quote[0];
escapec = cstate->opts.escape[0]; escapec = cstate->opts.escape[0];
@@ -1255,7 +1305,7 @@ CopyReadLineText(CopyFromState cstate)
prev_raw_ptr = input_buf_ptr; prev_raw_ptr = input_buf_ptr;
c = copy_input_buf[input_buf_ptr++]; c = copy_input_buf[input_buf_ptr++];
if (cstate->opts.csv_mode) if (is_csv)
{ {
/* /*
* If character is '\r', we may need to look ahead below. Force * If character is '\r', we may need to look ahead below. Force
@@ -1294,7 +1344,7 @@ CopyReadLineText(CopyFromState cstate)
} }
/* Process \r */ /* Process \r */
if (c == '\r' && (!cstate->opts.csv_mode || !in_quote)) if (c == '\r' && (!is_csv || !in_quote))
{ {
/* Check for \r\n on first line, _and_ handle \r\n. */ /* Check for \r\n on first line, _and_ handle \r\n. */
if (cstate->eol_type == EOL_UNKNOWN || if (cstate->eol_type == EOL_UNKNOWN ||
@@ -1322,10 +1372,10 @@ CopyReadLineText(CopyFromState cstate)
if (cstate->eol_type == EOL_CRNL) if (cstate->eol_type == EOL_CRNL)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT), (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
!cstate->opts.csv_mode ? !is_csv ?
errmsg("literal carriage return found in data") : errmsg("literal carriage return found in data") :
errmsg("unquoted carriage return found in data"), errmsg("unquoted carriage return found in data"),
!cstate->opts.csv_mode ? !is_csv ?
errhint("Use \"\\r\" to represent carriage return.") : errhint("Use \"\\r\" to represent carriage return.") :
errhint("Use quoted CSV field to represent carriage return."))); errhint("Use quoted CSV field to represent carriage return.")));
@@ -1339,10 +1389,10 @@ CopyReadLineText(CopyFromState cstate)
else if (cstate->eol_type == EOL_NL) else if (cstate->eol_type == EOL_NL)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT), (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
!cstate->opts.csv_mode ? !is_csv ?
errmsg("literal carriage return found in data") : errmsg("literal carriage return found in data") :
errmsg("unquoted carriage return found in data"), errmsg("unquoted carriage return found in data"),
!cstate->opts.csv_mode ? !is_csv ?
errhint("Use \"\\r\" to represent carriage return.") : errhint("Use \"\\r\" to represent carriage return.") :
errhint("Use quoted CSV field to represent carriage return."))); errhint("Use quoted CSV field to represent carriage return.")));
/* If reach here, we have found the line terminator */ /* If reach here, we have found the line terminator */
@@ -1350,15 +1400,15 @@ CopyReadLineText(CopyFromState cstate)
} }
/* Process \n */ /* Process \n */
if (c == '\n' && (!cstate->opts.csv_mode || !in_quote)) if (c == '\n' && (!is_csv || !in_quote))
{ {
if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL) if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT), (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
!cstate->opts.csv_mode ? !is_csv ?
errmsg("literal newline found in data") : errmsg("literal newline found in data") :
errmsg("unquoted newline found in data"), errmsg("unquoted newline found in data"),
!cstate->opts.csv_mode ? !is_csv ?
errhint("Use \"\\n\" to represent newline.") : errhint("Use \"\\n\" to represent newline.") :
errhint("Use quoted CSV field to represent newline."))); errhint("Use quoted CSV field to represent newline.")));
cstate->eol_type = EOL_NL; /* in case not set yet */ cstate->eol_type = EOL_NL; /* in case not set yet */
@@ -1370,7 +1420,7 @@ CopyReadLineText(CopyFromState cstate)
* Process backslash, except in CSV mode where backslash is a normal * Process backslash, except in CSV mode where backslash is a normal
* character. * character.
*/ */
if (c == '\\' && !cstate->opts.csv_mode) if (c == '\\' && !is_csv)
{ {
char c2; char c2;

View File

@@ -107,8 +107,6 @@ extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *where
extern void EndCopyFrom(CopyFromState cstate); extern void EndCopyFrom(CopyFromState cstate);
extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext, extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
Datum *values, bool *nulls); Datum *values, bool *nulls);
extern bool NextCopyFromRawFields(CopyFromState cstate,
char ***fields, int *nfields);
extern void CopyFromErrorCallback(void *arg); extern void CopyFromErrorCallback(void *arg);
extern char *CopyLimitPrintoutLength(const char *str); extern char *CopyLimitPrintoutLength(const char *str);

View File

@@ -1,7 +1,7 @@
/*------------------------------------------------------------------------- /*-------------------------------------------------------------------------
* *
* copyapi.h * copyapi.h
* API for COPY TO handlers * API for COPY TO/FROM handlers
* *
* *
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
@@ -54,4 +54,52 @@ typedef struct CopyToRoutine
void (*CopyToEnd) (CopyToState cstate); void (*CopyToEnd) (CopyToState cstate);
} CopyToRoutine; } CopyToRoutine;
/*
 * API structure for a COPY FROM format implementation. Note this must be
 * allocated in a server-lifetime manner, typically as a static const struct.
 */
typedef struct CopyFromRoutine
{
/*
 * Set input function information. This callback is called at the
 * beginning of COPY FROM.
 *
 * NOTE(review): the call site in BeginCopyFrom indexes in_functions[] and
 * typioparams[] by attnum, so this looks to be invoked once per attribute
 * rather than once per COPY -- confirm.
 *
 * 'atttypid' is the OID of data type used by the relation's attribute.
 *
 * 'finfo' can be optionally filled to provide the catalog information of
 * the input function.
 *
 * 'typioparam' can be optionally filled to define the OID of the type to
 * pass to the input function.
 */
void (*CopyFromInFunc) (CopyFromState cstate, Oid atttypid,
FmgrInfo *finfo, Oid *typioparam);
/*
 * Start a COPY FROM. This callback is called once at the beginning of
 * COPY FROM.
 *
 * 'tupDesc' is the tuple descriptor of the relation where the data needs
 * to be copied. This can be used for any initialization steps required by
 * a format.
 */
void (*CopyFromStart) (CopyFromState cstate, TupleDesc tupDesc);
/*
 * Read one row from the source and fill *values and *nulls.
 *
 * 'econtext' is used to evaluate default expression for each column that
 * is either not read from the file or is using the DEFAULT option of COPY
 * FROM. It is NULL if no default values are used.
 *
 * Returns false if there are no more tuples to read.
 */
bool (*CopyFromOneRow) (CopyFromState cstate, ExprContext *econtext,
Datum *values, bool *nulls);
/*
 * End a COPY FROM. This callback is called once at the end of COPY FROM.
 */
void (*CopyFromEnd) (CopyFromState cstate);
} CopyFromRoutine;
#endif /* COPYAPI_H */ #endif /* COPYAPI_H */

View File

@@ -58,6 +58,9 @@ typedef enum CopyInsertMethod
*/ */
typedef struct CopyFromStateData typedef struct CopyFromStateData
{ {
/* format routine */
const struct CopyFromRoutine *routine;
/* low-level state data */ /* low-level state data */
CopySource copy_src; /* type of copy source */ CopySource copy_src; /* type of copy source */
FILE *copy_file; /* used if copy_src == COPY_FILE */ FILE *copy_file; /* used if copy_src == COPY_FILE */
@@ -183,4 +186,12 @@ typedef struct CopyFromStateData
extern void ReceiveCopyBegin(CopyFromState cstate); extern void ReceiveCopyBegin(CopyFromState cstate);
extern void ReceiveCopyBinaryHeader(CopyFromState cstate); extern void ReceiveCopyBinaryHeader(CopyFromState cstate);
/* One-row callbacks for built-in formats defined in copyfromparse.c */
extern bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext,
Datum *values, bool *nulls);
extern bool CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext,
Datum *values, bool *nulls);
extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext,
Datum *values, bool *nulls);
#endif /* COPYFROM_INTERNAL_H */ #endif /* COPYFROM_INTERNAL_H */

View File

@@ -501,6 +501,7 @@ ConvertRowtypeExpr
CookedConstraint CookedConstraint
CopyDest CopyDest
CopyFormatOptions CopyFormatOptions
CopyFromRoutine
CopyFromState CopyFromState
CopyFromStateData CopyFromStateData
CopyHeaderChoice CopyHeaderChoice