1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-18 02:02:55 +03:00

Refactor COPY FROM to use format callback functions.

This commit introduces a new CopyFromRoutine struct, which is a set of
callback routines to read tuples in a specific format. It also makes
COPY FROM with the existing formats (text, CSV, and binary) utilize
these format callbacks.

This change is a preliminary step towards making the COPY FROM command
extensible in terms of input formats.

Similar to 2e4127b6d2, this refactoring contributes to a performance
improvement by reducing the number of "if" branches that need to be
checked on a per-row basis when sending field representations in text
or CSV mode. The performance benchmark results showed ~5% performance
gain in text or CSV mode.

Author: Sutou Kouhei <kou@clear-code.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Michael Paquier <michael@paquier.xyz>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Tomas Vondra <tomas.vondra@enterprisedb.com>
Reviewed-by: Junwang Zhao <zhjwpku@gmail.com>
Discussion: https://postgr.es/m/20231204.153548.2126325458835528809.kou@clear-code.com
This commit is contained in:
Masahiko Sawada
2025-02-28 10:29:36 -08:00
parent 77cb08be51
commit 7717f63006
6 changed files with 481 additions and 265 deletions

View File

@@ -62,7 +62,7 @@
#include <unistd.h>
#include <sys/stat.h>
#include "commands/copy.h"
#include "commands/copyapi.h"
#include "commands/copyfrom_internal.h"
#include "commands/progress.h"
#include "executor/executor.h"
@@ -140,13 +140,18 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
/* non-export function prototypes */
static bool CopyReadLine(CopyFromState cstate);
static bool CopyReadLineText(CopyFromState cstate);
static bool CopyReadLine(CopyFromState cstate, bool is_csv);
static bool CopyReadLineText(CopyFromState cstate, bool is_csv);
static int CopyReadAttributesText(CopyFromState cstate);
static int CopyReadAttributesCSV(CopyFromState cstate);
static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo,
Oid typioparam, int32 typmod,
bool *isnull);
static pg_attribute_always_inline bool CopyFromTextLikeOneRow(CopyFromState cstate,
ExprContext *econtext,
Datum *values,
bool *nulls,
bool is_csv);
/* Low-level communications functions */
@@ -740,9 +745,12 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes)
* in the relation.
*
* NOTE: force_not_null option are not applied to the returned fields.
*
* We use pg_attribute_always_inline to reduce function call overhead
* and to help compilers to optimize away the 'is_csv' condition.
*/
bool
NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
static pg_attribute_always_inline bool
NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields, bool is_csv)
{
int fldct;
bool done;
@@ -759,13 +767,13 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
tupDesc = RelationGetDescr(cstate->rel);
cstate->cur_lineno++;
done = CopyReadLine(cstate);
done = CopyReadLine(cstate, is_csv);
if (cstate->opts.header_line == COPY_HEADER_MATCH)
{
int fldnum;
if (cstate->opts.csv_mode)
if (is_csv)
fldct = CopyReadAttributesCSV(cstate);
else
fldct = CopyReadAttributesText(cstate);
@@ -809,7 +817,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
cstate->cur_lineno++;
/* Actually read the line into memory here */
done = CopyReadLine(cstate);
done = CopyReadLine(cstate, is_csv);
/*
* EOF at start of line means we're done. If we see EOF after some
@@ -820,7 +828,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
return false;
/* Parse the line into de-escaped field values */
if (cstate->opts.csv_mode)
if (is_csv)
fldct = CopyReadAttributesCSV(cstate);
else
fldct = CopyReadAttributesText(cstate);
@@ -847,216 +855,22 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
{
TupleDesc tupDesc;
AttrNumber num_phys_attrs,
attr_count,
num_defaults = cstate->num_defaults;
FmgrInfo *in_functions = cstate->in_functions;
Oid *typioparams = cstate->typioparams;
int i;
int *defmap = cstate->defmap;
ExprState **defexprs = cstate->defexprs;
tupDesc = RelationGetDescr(cstate->rel);
num_phys_attrs = tupDesc->natts;
attr_count = list_length(cstate->attnumlist);
/* Initialize all values for row to NULL */
MemSet(values, 0, num_phys_attrs * sizeof(Datum));
MemSet(nulls, true, num_phys_attrs * sizeof(bool));
MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool));
if (!cstate->opts.binary)
{
char **field_strings;
ListCell *cur;
int fldct;
int fieldno;
char *string;
/* read raw fields in the next line */
if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
return false;
/* check for overflowing fields */
if (attr_count > 0 && fldct > attr_count)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("extra data after last expected column")));
fieldno = 0;
/* Loop to read the user attributes on the line. */
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
int m = attnum - 1;
Form_pg_attribute att = TupleDescAttr(tupDesc, m);
if (fieldno >= fldct)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("missing data for column \"%s\"",
NameStr(att->attname))));
string = field_strings[fieldno++];
if (cstate->convert_select_flags &&
!cstate->convert_select_flags[m])
{
/* ignore input field, leaving column as NULL */
continue;
}
if (cstate->opts.csv_mode)
{
if (string == NULL &&
cstate->opts.force_notnull_flags[m])
{
/*
* FORCE_NOT_NULL option is set and column is NULL -
* convert it to the NULL string.
*/
string = cstate->opts.null_print;
}
else if (string != NULL && cstate->opts.force_null_flags[m]
&& strcmp(string, cstate->opts.null_print) == 0)
{
/*
* FORCE_NULL option is set and column matches the NULL
* string. It must have been quoted, or otherwise the
* string would already have been set to NULL. Convert it
* to NULL as specified.
*/
string = NULL;
}
}
cstate->cur_attname = NameStr(att->attname);
cstate->cur_attval = string;
if (string != NULL)
nulls[m] = false;
if (cstate->defaults[m])
{
/*
* The caller must supply econtext and have switched into the
* per-tuple memory context in it.
*/
Assert(econtext != NULL);
Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
}
/*
* If ON_ERROR is specified with IGNORE, skip rows with soft
* errors
*/
else if (!InputFunctionCallSafe(&in_functions[m],
string,
typioparams[m],
att->atttypmod,
(Node *) cstate->escontext,
&values[m]))
{
Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);
cstate->num_errors++;
if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
{
/*
* Since we emit line number and column info in the below
* notice message, we suppress error context information
* other than the relation name.
*/
Assert(!cstate->relname_only);
cstate->relname_only = true;
if (cstate->cur_attval)
{
char *attval;
attval = CopyLimitPrintoutLength(cstate->cur_attval);
ereport(NOTICE,
errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"",
(unsigned long long) cstate->cur_lineno,
cstate->cur_attname,
attval));
pfree(attval);
}
else
ereport(NOTICE,
errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input",
(unsigned long long) cstate->cur_lineno,
cstate->cur_attname));
/* reset relname_only */
cstate->relname_only = false;
}
return true;
}
cstate->cur_attname = NULL;
cstate->cur_attval = NULL;
}
Assert(fieldno == attr_count);
}
else
{
/* binary */
int16 fld_count;
ListCell *cur;
cstate->cur_lineno++;
if (!CopyGetInt16(cstate, &fld_count))
{
/* EOF detected (end of file, or protocol-level EOF) */
return false;
}
if (fld_count == -1)
{
/*
* Received EOF marker. Wait for the protocol-level EOF, and
* complain if it doesn't come immediately. In COPY FROM STDIN,
* this ensures that we correctly handle CopyFail, if client
* chooses to send that now. When copying from file, we could
* ignore the rest of the file like in text mode, but we choose to
* be consistent with the COPY FROM STDIN case.
*/
char dummy;
if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("received copy data after EOF marker")));
return false;
}
if (fld_count != attr_count)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("row field count is %d, expected %d",
(int) fld_count, attr_count)));
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
int m = attnum - 1;
Form_pg_attribute att = TupleDescAttr(tupDesc, m);
cstate->cur_attname = NameStr(att->attname);
values[m] = CopyReadBinaryAttribute(cstate,
&in_functions[m],
typioparams[m],
att->atttypmod,
&nulls[m]);
cstate->cur_attname = NULL;
}
}
/* Get one row from source */
if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls))
return false;
/*
* Now compute and insert any defaults available for the columns not
@@ -1079,6 +893,242 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
return true;
}
/* Implementation of the per-row callback for text format */
bool
CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values,
bool *nulls)
{
return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, false);
}
/* Implementation of the per-row callback for CSV format */
bool
CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values,
bool *nulls)
{
return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, true);
}
/*
* Workhorse for CopyFromTextOneRow() and CopyFromCSVOneRow().
*
* We use pg_attribute_always_inline to reduce function call overhead
* and to help compilers to optimize away the 'is_csv' condition.
*/
static pg_attribute_always_inline bool
CopyFromTextLikeOneRow(CopyFromState cstate, ExprContext *econtext,
Datum *values, bool *nulls, bool is_csv)
{
TupleDesc tupDesc;
AttrNumber attr_count;
FmgrInfo *in_functions = cstate->in_functions;
Oid *typioparams = cstate->typioparams;
ExprState **defexprs = cstate->defexprs;
char **field_strings;
ListCell *cur;
int fldct;
int fieldno;
char *string;
tupDesc = RelationGetDescr(cstate->rel);
attr_count = list_length(cstate->attnumlist);
/* read raw fields in the next line */
if (!NextCopyFromRawFields(cstate, &field_strings, &fldct, is_csv))
return false;
/* check for overflowing fields */
if (attr_count > 0 && fldct > attr_count)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("extra data after last expected column")));
fieldno = 0;
/* Loop to read the user attributes on the line. */
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
int m = attnum - 1;
Form_pg_attribute att = TupleDescAttr(tupDesc, m);
if (fieldno >= fldct)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("missing data for column \"%s\"",
NameStr(att->attname))));
string = field_strings[fieldno++];
if (cstate->convert_select_flags &&
!cstate->convert_select_flags[m])
{
/* ignore input field, leaving column as NULL */
continue;
}
if (is_csv)
{
if (string == NULL &&
cstate->opts.force_notnull_flags[m])
{
/*
* FORCE_NOT_NULL option is set and column is NULL - convert
* it to the NULL string.
*/
string = cstate->opts.null_print;
}
else if (string != NULL && cstate->opts.force_null_flags[m]
&& strcmp(string, cstate->opts.null_print) == 0)
{
/*
* FORCE_NULL option is set and column matches the NULL
* string. It must have been quoted, or otherwise the string
* would already have been set to NULL. Convert it to NULL as
* specified.
*/
string = NULL;
}
}
cstate->cur_attname = NameStr(att->attname);
cstate->cur_attval = string;
if (string != NULL)
nulls[m] = false;
if (cstate->defaults[m])
{
/* We must have switched into the per-tuple memory context */
Assert(econtext != NULL);
Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
}
/*
* If ON_ERROR is specified with IGNORE, skip rows with soft errors
*/
else if (!InputFunctionCallSafe(&in_functions[m],
string,
typioparams[m],
att->atttypmod,
(Node *) cstate->escontext,
&values[m]))
{
Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);
cstate->num_errors++;
if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
{
/*
* Since we emit line number and column info in the below
* notice message, we suppress error context information other
* than the relation name.
*/
Assert(!cstate->relname_only);
cstate->relname_only = true;
if (cstate->cur_attval)
{
char *attval;
attval = CopyLimitPrintoutLength(cstate->cur_attval);
ereport(NOTICE,
errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"",
(unsigned long long) cstate->cur_lineno,
cstate->cur_attname,
attval));
pfree(attval);
}
else
ereport(NOTICE,
errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input",
(unsigned long long) cstate->cur_lineno,
cstate->cur_attname));
/* reset relname_only */
cstate->relname_only = false;
}
return true;
}
cstate->cur_attname = NULL;
cstate->cur_attval = NULL;
}
Assert(fieldno == attr_count);
return true;
}
/* Implementation of the per-row callback for binary format */
bool
CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values,
bool *nulls)
{
TupleDesc tupDesc;
AttrNumber attr_count;
FmgrInfo *in_functions = cstate->in_functions;
Oid *typioparams = cstate->typioparams;
int16 fld_count;
ListCell *cur;
tupDesc = RelationGetDescr(cstate->rel);
attr_count = list_length(cstate->attnumlist);
cstate->cur_lineno++;
if (!CopyGetInt16(cstate, &fld_count))
{
/* EOF detected (end of file, or protocol-level EOF) */
return false;
}
if (fld_count == -1)
{
/*
* Received EOF marker. Wait for the protocol-level EOF, and complain
* if it doesn't come immediately. In COPY FROM STDIN, this ensures
* that we correctly handle CopyFail, if client chooses to send that
* now. When copying from file, we could ignore the rest of the file
* like in text mode, but we choose to be consistent with the COPY
* FROM STDIN case.
*/
char dummy;
if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("received copy data after EOF marker")));
return false;
}
if (fld_count != attr_count)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("row field count is %d, expected %d",
(int) fld_count, attr_count)));
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
int m = attnum - 1;
Form_pg_attribute att = TupleDescAttr(tupDesc, m);
cstate->cur_attname = NameStr(att->attname);
values[m] = CopyReadBinaryAttribute(cstate,
&in_functions[m],
typioparams[m],
att->atttypmod,
&nulls[m]);
cstate->cur_attname = NULL;
}
return true;
}
/*
* Read the next input line and stash it in line_buf.
*
@@ -1087,7 +1137,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
* in the final value of line_buf.
*/
static bool
CopyReadLine(CopyFromState cstate)
CopyReadLine(CopyFromState cstate, bool is_csv)
{
bool result;
@@ -1095,7 +1145,7 @@ CopyReadLine(CopyFromState cstate)
cstate->line_buf_valid = false;
/* Parse data and transfer into line_buf */
result = CopyReadLineText(cstate);
result = CopyReadLineText(cstate, is_csv);
if (result)
{
@@ -1163,7 +1213,7 @@ CopyReadLine(CopyFromState cstate)
* CopyReadLineText - inner loop of CopyReadLine for text mode
*/
static bool
CopyReadLineText(CopyFromState cstate)
CopyReadLineText(CopyFromState cstate, bool is_csv)
{
char *copy_input_buf;
int input_buf_ptr;
@@ -1178,7 +1228,7 @@ CopyReadLineText(CopyFromState cstate)
char quotec = '\0';
char escapec = '\0';
if (cstate->opts.csv_mode)
if (is_csv)
{
quotec = cstate->opts.quote[0];
escapec = cstate->opts.escape[0];
@@ -1255,7 +1305,7 @@ CopyReadLineText(CopyFromState cstate)
prev_raw_ptr = input_buf_ptr;
c = copy_input_buf[input_buf_ptr++];
if (cstate->opts.csv_mode)
if (is_csv)
{
/*
* If character is '\r', we may need to look ahead below. Force
@@ -1294,7 +1344,7 @@ CopyReadLineText(CopyFromState cstate)
}
/* Process \r */
if (c == '\r' && (!cstate->opts.csv_mode || !in_quote))
if (c == '\r' && (!is_csv || !in_quote))
{
/* Check for \r\n on first line, _and_ handle \r\n. */
if (cstate->eol_type == EOL_UNKNOWN ||
@@ -1322,10 +1372,10 @@ CopyReadLineText(CopyFromState cstate)
if (cstate->eol_type == EOL_CRNL)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
!cstate->opts.csv_mode ?
!is_csv ?
errmsg("literal carriage return found in data") :
errmsg("unquoted carriage return found in data"),
!cstate->opts.csv_mode ?
!is_csv ?
errhint("Use \"\\r\" to represent carriage return.") :
errhint("Use quoted CSV field to represent carriage return.")));
@@ -1339,10 +1389,10 @@ CopyReadLineText(CopyFromState cstate)
else if (cstate->eol_type == EOL_NL)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
!cstate->opts.csv_mode ?
!is_csv ?
errmsg("literal carriage return found in data") :
errmsg("unquoted carriage return found in data"),
!cstate->opts.csv_mode ?
!is_csv ?
errhint("Use \"\\r\" to represent carriage return.") :
errhint("Use quoted CSV field to represent carriage return.")));
/* If reach here, we have found the line terminator */
@@ -1350,15 +1400,15 @@ CopyReadLineText(CopyFromState cstate)
}
/* Process \n */
if (c == '\n' && (!cstate->opts.csv_mode || !in_quote))
if (c == '\n' && (!is_csv || !in_quote))
{
if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
!cstate->opts.csv_mode ?
!is_csv ?
errmsg("literal newline found in data") :
errmsg("unquoted newline found in data"),
!cstate->opts.csv_mode ?
!is_csv ?
errhint("Use \"\\n\" to represent newline.") :
errhint("Use quoted CSV field to represent newline.")));
cstate->eol_type = EOL_NL; /* in case not set yet */
@@ -1370,7 +1420,7 @@ CopyReadLineText(CopyFromState cstate)
* Process backslash, except in CSV mode where backslash is a normal
* character.
*/
if (c == '\\' && !cstate->opts.csv_mode)
if (c == '\\' && !is_csv)
{
char c2;