diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 8875d79d59a..198cee2bc48 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -28,7 +28,7 @@ #include "access/tableam.h" #include "access/xact.h" #include "catalog/namespace.h" -#include "commands/copy.h" +#include "commands/copyapi.h" #include "commands/copyfrom_internal.h" #include "commands/progress.h" #include "commands/trigger.h" @@ -106,6 +106,145 @@ typedef struct CopyMultiInsertInfo /* non-export function prototypes */ static void ClosePipeFromProgram(CopyFromState cstate); +/* + * Built-in format-specific routines. One-row callbacks are defined in + * copyfromparse.c. + */ +static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo, + Oid *typioparam); +static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc); +static void CopyFromTextLikeEnd(CopyFromState cstate); +static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid, + FmgrInfo *finfo, Oid *typioparam); +static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc); +static void CopyFromBinaryEnd(CopyFromState cstate); + + +/* + * COPY FROM routines for built-in formats. + * + * CSV and text formats share the same TextLike routines except for the + * one-row callback. + */ + +/* text format */ +static const CopyFromRoutine CopyFromRoutineText = { + .CopyFromInFunc = CopyFromTextLikeInFunc, + .CopyFromStart = CopyFromTextLikeStart, + .CopyFromOneRow = CopyFromTextOneRow, + .CopyFromEnd = CopyFromTextLikeEnd, +}; + +/* CSV format */ +static const CopyFromRoutine CopyFromRoutineCSV = { + .CopyFromInFunc = CopyFromTextLikeInFunc, + .CopyFromStart = CopyFromTextLikeStart, + .CopyFromOneRow = CopyFromCSVOneRow, + .CopyFromEnd = CopyFromTextLikeEnd, +}; + +/* binary format */ +static const CopyFromRoutine CopyFromRoutineBinary = { + .CopyFromInFunc = CopyFromBinaryInFunc, + .CopyFromStart = CopyFromBinaryStart, + .CopyFromOneRow = CopyFromBinaryOneRow, + .CopyFromEnd = CopyFromBinaryEnd, +}; + +/* Return a COPY FROM routine for the given options */ +static const CopyFromRoutine * +CopyFromGetRoutine(CopyFormatOptions opts) +{ + if (opts.csv_mode) + return &CopyFromRoutineCSV; + else if (opts.binary) + return &CopyFromRoutineBinary; + + /* default is text */ + return &CopyFromRoutineText; +} + +/* Implementation of the start callback for text and CSV formats */ +static void +CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc) +{ + AttrNumber attr_count; + + /* + * If encoding conversion is needed, we need another buffer to hold the + * converted input data. Otherwise, we can just point input_buf to the + * same buffer as raw_buf. + */ + if (cstate->need_transcoding) + { + cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); + cstate->input_buf_index = cstate->input_buf_len = 0; + } + else + cstate->input_buf = cstate->raw_buf; + cstate->input_reached_eof = false; + + initStringInfo(&cstate->line_buf); + + /* + * Create workspace for CopyReadAttributes results; used by CSV and text + * format. + */ + attr_count = list_length(cstate->attnumlist); + cstate->max_fields = attr_count; + cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); +} + +/* + * Implementation of the infunc callback for text and CSV formats. Assign + * the input function data to the given *finfo. + */ +static void +CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo, + Oid *typioparam) +{ + Oid func_oid; + + getTypeInputInfo(atttypid, &func_oid, typioparam); + fmgr_info(func_oid, finfo); +} + +/* Implementation of the end callback for text and CSV formats */ +static void +CopyFromTextLikeEnd(CopyFromState cstate) +{ + /* nothing to do */ +} + +/* Implementation of the start callback for binary format */ +static void +CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc) +{ + /* Read and verify binary header */ + ReceiveCopyBinaryHeader(cstate); +} + +/* + * Implementation of the infunc callback for binary format. Assign + * the binary input function to the given *finfo. + */ +static void +CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid, + FmgrInfo *finfo, Oid *typioparam) +{ + Oid func_oid; + + getTypeBinaryInputInfo(atttypid, &func_oid, typioparam); + fmgr_info(func_oid, finfo); +} + +/* Implementation of the end callback for binary format */ +static void +CopyFromBinaryEnd(CopyFromState cstate) +{ + /* nothing to do */ +} + /* * error context callback for COPY FROM * @@ -1403,7 +1542,6 @@ BeginCopyFrom(ParseState *pstate, num_defaults; FmgrInfo *in_functions; Oid *typioparams; - Oid in_func_oid; int *defmap; ExprState **defexprs; MemoryContext oldcontext; @@ -1435,6 +1573,9 @@ BeginCopyFrom(ParseState *pstate, /* Extract options from the statement node tree */ ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options); + /* Set the format routine */ + cstate->routine = CopyFromGetRoutine(cstate->opts); + /* Process the target relation */ cstate->rel = rel; @@ -1590,25 +1731,6 @@ BeginCopyFrom(ParseState *pstate, cstate->raw_buf_index = cstate->raw_buf_len = 0; cstate->raw_reached_eof = false; - if (!cstate->opts.binary) - { - /* - * If encoding conversion is needed, we need another buffer to hold - * the converted input data. Otherwise, we can just point input_buf - * to the same buffer as raw_buf. - */ - if (cstate->need_transcoding) - { - cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); - cstate->input_buf_index = cstate->input_buf_len = 0; - } - else - cstate->input_buf = cstate->raw_buf; - cstate->input_reached_eof = false; - - initStringInfo(&cstate->line_buf); - } - initStringInfo(&cstate->attribute_buf); /* Assign range table and rteperminfos, we'll need them in CopyFrom. */ @@ -1641,13 +1763,9 @@ BeginCopyFrom(ParseState *pstate, continue; /* Fetch the input function and typioparam info */ - if (cstate->opts.binary) - getTypeBinaryInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - else - getTypeInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - fmgr_info(in_func_oid, &in_functions[attnum - 1]); + cstate->routine->CopyFromInFunc(cstate, att->atttypid, + &in_functions[attnum - 1], + &typioparams[attnum - 1]); /* Get default info if available */ defexprs[attnum - 1] = NULL; @@ -1782,20 +1900,7 @@ BeginCopyFrom(ParseState *pstate, pgstat_progress_update_multi_param(3, progress_cols, progress_vals); - if (cstate->opts.binary) - { - /* Read and verify binary header */ - ReceiveCopyBinaryHeader(cstate); - } - - /* create workspace for CopyReadAttributes results */ - if (!cstate->opts.binary) - { - AttrNumber attr_count = list_length(cstate->attnumlist); - - cstate->max_fields = attr_count; - cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); - } + cstate->routine->CopyFromStart(cstate, tupDesc); MemoryContextSwitchTo(oldcontext); @@ -1808,6 +1913,9 @@ BeginCopyFrom(ParseState *pstate, void EndCopyFrom(CopyFromState cstate) { + /* Invoke the end callback */ + cstate->routine->CopyFromEnd(cstate); + /* No COPY FROM related resources except memory. */ if (cstate->is_program) { diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index caccdc8563c..bad577aa67b 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -62,7 +62,7 @@ #include #include -#include "commands/copy.h" +#include "commands/copyapi.h" #include "commands/copyfrom_internal.h" #include "commands/progress.h" #include "executor/executor.h" @@ -140,13 +140,18 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; /* non-export function prototypes */ -static bool CopyReadLine(CopyFromState cstate); -static bool CopyReadLineText(CopyFromState cstate); +static bool CopyReadLine(CopyFromState cstate, bool is_csv); +static bool CopyReadLineText(CopyFromState cstate, bool is_csv); static int CopyReadAttributesText(CopyFromState cstate); static int CopyReadAttributesCSV(CopyFromState cstate); static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo, Oid typioparam, int32 typmod, bool *isnull); +static pg_attribute_always_inline bool CopyFromTextLikeOneRow(CopyFromState cstate, + ExprContext *econtext, + Datum *values, + bool *nulls, + bool is_csv); /* Low-level communications functions */ @@ -740,9 +745,12 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes) * in the relation. * * NOTE: force_not_null option are not applied to the returned fields. + * + * We use pg_attribute_always_inline to reduce function call overhead + * and to help compilers to optimize away the 'is_csv' condition. */ -bool -NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) +static pg_attribute_always_inline bool +NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields, bool is_csv) { int fldct; bool done; @@ -759,13 +767,13 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) tupDesc = RelationGetDescr(cstate->rel); cstate->cur_lineno++; - done = CopyReadLine(cstate); + done = CopyReadLine(cstate, is_csv); if (cstate->opts.header_line == COPY_HEADER_MATCH) { int fldnum; - if (cstate->opts.csv_mode) + if (is_csv) fldct = CopyReadAttributesCSV(cstate); else fldct = CopyReadAttributesText(cstate); @@ -809,7 +817,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) cstate->cur_lineno++; /* Actually read the line into memory here */ - done = CopyReadLine(cstate); + done = CopyReadLine(cstate, is_csv); /* * EOF at start of line means we're done. If we see EOF after some @@ -820,7 +828,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) return false; /* Parse the line into de-escaped field values */ - if (cstate->opts.csv_mode) + if (is_csv) fldct = CopyReadAttributesCSV(cstate); else fldct = CopyReadAttributesText(cstate); @@ -847,216 +855,22 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, { TupleDesc tupDesc; AttrNumber num_phys_attrs, - attr_count, num_defaults = cstate->num_defaults; - FmgrInfo *in_functions = cstate->in_functions; - Oid *typioparams = cstate->typioparams; int i; int *defmap = cstate->defmap; ExprState **defexprs = cstate->defexprs; tupDesc = RelationGetDescr(cstate->rel); num_phys_attrs = tupDesc->natts; - attr_count = list_length(cstate->attnumlist); /* Initialize all values for row to NULL */ MemSet(values, 0, num_phys_attrs * sizeof(Datum)); MemSet(nulls, true, num_phys_attrs * sizeof(bool)); MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool)); - if (!cstate->opts.binary) - { - char **field_strings; - ListCell *cur; - int fldct; - int fieldno; - char *string; - - /* read raw fields in the next line */ - if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) - return false; - - /* check for overflowing fields */ - if (attr_count > 0 && fldct > attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); - - fieldno = 0; - - /* Loop to read the user attributes on the line. */ - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); - string = field_strings[fieldno++]; - - if (cstate->convert_select_flags && - !cstate->convert_select_flags[m]) - { - /* ignore input field, leaving column as NULL */ - continue; - } - - if (cstate->opts.csv_mode) - { - if (string == NULL && - cstate->opts.force_notnull_flags[m]) - { - /* - * FORCE_NOT_NULL option is set and column is NULL - - * convert it to the NULL string. - */ - string = cstate->opts.null_print; - } - else if (string != NULL && cstate->opts.force_null_flags[m] - && strcmp(string, cstate->opts.null_print) == 0) - { - /* - * FORCE_NULL option is set and column matches the NULL - * string. It must have been quoted, or otherwise the - * string would already have been set to NULL. Convert it - * to NULL as specified. - */ - string = NULL; - } - } - - cstate->cur_attname = NameStr(att->attname); - cstate->cur_attval = string; - - if (string != NULL) - nulls[m] = false; - - if (cstate->defaults[m]) - { - /* - * The caller must supply econtext and have switched into the - * per-tuple memory context in it. - */ - Assert(econtext != NULL); - Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); - - values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); - } - - /* - * If ON_ERROR is specified with IGNORE, skip rows with soft - * errors - */ - else if (!InputFunctionCallSafe(&in_functions[m], - string, - typioparams[m], - att->atttypmod, - (Node *) cstate->escontext, - &values[m])) - { - Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP); - - cstate->num_errors++; - - if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE) - { - /* - * Since we emit line number and column info in the below - * notice message, we suppress error context information - * other than the relation name. - */ - Assert(!cstate->relname_only); - cstate->relname_only = true; - - if (cstate->cur_attval) - { - char *attval; - - attval = CopyLimitPrintoutLength(cstate->cur_attval); - ereport(NOTICE, - errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"", - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname, - attval)); - pfree(attval); - } - else - ereport(NOTICE, - errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input", - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname)); - - /* reset relname_only */ - cstate->relname_only = false; - } - - return true; - } - - cstate->cur_attname = NULL; - cstate->cur_attval = NULL; - } - - Assert(fieldno == attr_count); - } - else - { - /* binary */ - int16 fld_count; - ListCell *cur; - - cstate->cur_lineno++; - - if (!CopyGetInt16(cstate, &fld_count)) - { - /* EOF detected (end of file, or protocol-level EOF) */ - return false; - } - - if (fld_count == -1) - { - /* - * Received EOF marker. Wait for the protocol-level EOF, and - * complain if it doesn't come immediately. In COPY FROM STDIN, - * this ensures that we correctly handle CopyFail, if client - * chooses to send that now. When copying from file, we could - * ignore the rest of the file like in text mode, but we choose to - * be consistent with the COPY FROM STDIN case. - */ - char dummy; - - if (CopyReadBinaryData(cstate, &dummy, 1) > 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("received copy data after EOF marker"))); - return false; - } - - if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); - - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - cstate->cur_attname = NameStr(att->attname); - values[m] = CopyReadBinaryAttribute(cstate, - &in_functions[m], - typioparams[m], - att->atttypmod, - &nulls[m]); - cstate->cur_attname = NULL; - } - } + /* Get one row from source */ + if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls)) + return false; /* * Now compute and insert any defaults available for the columns not @@ -1079,6 +893,242 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, return true; } +/* Implementation of the per-row callback for text format */ +bool +CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, + bool *nulls) +{ + return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, false); +} + +/* Implementation of the per-row callback for CSV format */ +bool +CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, + bool *nulls) +{ + return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, true); +} + +/* + * Workhorse for CopyFromTextOneRow() and CopyFromCSVOneRow(). + * + * We use pg_attribute_always_inline to reduce function call overhead + * and to help compilers to optimize away the 'is_csv' condition. + */ +static pg_attribute_always_inline bool +CopyFromTextLikeOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls, bool is_csv) +{ + TupleDesc tupDesc; + AttrNumber attr_count; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + ExprState **defexprs = cstate->defexprs; + char **field_strings; + ListCell *cur; + int fldct; + int fieldno; + char *string; + + tupDesc = RelationGetDescr(cstate->rel); + attr_count = list_length(cstate->attnumlist); + + /* read raw fields in the next line */ + if (!NextCopyFromRawFields(cstate, &field_strings, &fldct, is_csv)) + return false; + + /* check for overflowing fields */ + if (attr_count > 0 && fldct > attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + + fieldno = 0; + + /* Loop to read the user attributes on the line. */ + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + if (fieldno >= fldct) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("missing data for column \"%s\"", + NameStr(att->attname)))); + string = field_strings[fieldno++]; + + if (cstate->convert_select_flags && + !cstate->convert_select_flags[m]) + { + /* ignore input field, leaving column as NULL */ + continue; + } + + if (is_csv) + { + if (string == NULL && + cstate->opts.force_notnull_flags[m]) + { + /* + * FORCE_NOT_NULL option is set and column is NULL - convert + * it to the NULL string. + */ + string = cstate->opts.null_print; + } + else if (string != NULL && cstate->opts.force_null_flags[m] + && strcmp(string, cstate->opts.null_print) == 0) + { + /* + * FORCE_NULL option is set and column matches the NULL + * string. It must have been quoted, or otherwise the string + * would already have been set to NULL. Convert it to NULL as + * specified. + */ + string = NULL; + } + } + + cstate->cur_attname = NameStr(att->attname); + cstate->cur_attval = string; + + if (string != NULL) + nulls[m] = false; + + if (cstate->defaults[m]) + { + /* We must have switched into the per-tuple memory context */ + Assert(econtext != NULL); + Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); + + values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); + } + + /* + * If ON_ERROR is specified with IGNORE, skip rows with soft errors + */ + else if (!InputFunctionCallSafe(&in_functions[m], + string, + typioparams[m], + att->atttypmod, + (Node *) cstate->escontext, + &values[m])) + { + Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP); + + cstate->num_errors++; + + if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE) + { + /* + * Since we emit line number and column info in the below + * notice message, we suppress error context information other + * than the relation name. + */ + Assert(!cstate->relname_only); + cstate->relname_only = true; + + if (cstate->cur_attval) + { + char *attval; + + attval = CopyLimitPrintoutLength(cstate->cur_attval); + ereport(NOTICE, + errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"", + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname, + attval)); + pfree(attval); + } + else + ereport(NOTICE, + errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input", + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname)); + + /* reset relname_only */ + cstate->relname_only = false; + } + + return true; + } + + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + } + + Assert(fieldno == attr_count); + + return true; +} + +/* Implementation of the per-row callback for binary format */ +bool +CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, + bool *nulls) +{ + TupleDesc tupDesc; + AttrNumber attr_count; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + int16 fld_count; + ListCell *cur; + + tupDesc = RelationGetDescr(cstate->rel); + attr_count = list_length(cstate->attnumlist); + + cstate->cur_lineno++; + + if (!CopyGetInt16(cstate, &fld_count)) + { + /* EOF detected (end of file, or protocol-level EOF) */ + return false; + } + + if (fld_count == -1) + { + /* + * Received EOF marker. Wait for the protocol-level EOF, and complain + * if it doesn't come immediately. In COPY FROM STDIN, this ensures + * that we correctly handle CopyFail, if client chooses to send that + * now. When copying from file, we could ignore the rest of the file + * like in text mode, but we choose to be consistent with the COPY + * FROM STDIN case. + */ + char dummy; + + if (CopyReadBinaryData(cstate, &dummy, 1) > 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("received copy data after EOF marker"))); + return false; + } + + if (fld_count != attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + cstate->cur_attname = NameStr(att->attname); + values[m] = CopyReadBinaryAttribute(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } + + return true; +} + /* * Read the next input line and stash it in line_buf. * @@ -1087,7 +1137,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, * in the final value of line_buf. */ static bool -CopyReadLine(CopyFromState cstate) +CopyReadLine(CopyFromState cstate, bool is_csv) { bool result; @@ -1095,7 +1145,7 @@ CopyReadLine(CopyFromState cstate) cstate->line_buf_valid = false; /* Parse data and transfer into line_buf */ - result = CopyReadLineText(cstate); + result = CopyReadLineText(cstate, is_csv); if (result) { @@ -1163,7 +1213,7 @@ CopyReadLine(CopyFromState cstate) * CopyReadLineText - inner loop of CopyReadLine for text mode */ static bool -CopyReadLineText(CopyFromState cstate) +CopyReadLineText(CopyFromState cstate, bool is_csv) { char *copy_input_buf; int input_buf_ptr; @@ -1178,7 +1228,7 @@ CopyReadLineText(CopyFromState cstate) char quotec = '\0'; char escapec = '\0'; - if (cstate->opts.csv_mode) + if (is_csv) { quotec = cstate->opts.quote[0]; escapec = cstate->opts.escape[0]; @@ -1255,7 +1305,7 @@ CopyReadLineText(CopyFromState cstate) prev_raw_ptr = input_buf_ptr; c = copy_input_buf[input_buf_ptr++]; - if (cstate->opts.csv_mode) + if (is_csv) { /* * If character is '\r', we may need to look ahead below. Force @@ -1294,7 +1344,7 @@ CopyReadLineText(CopyFromState cstate) } /* Process \r */ - if (c == '\r' && (!cstate->opts.csv_mode || !in_quote)) + if (c == '\r' && (!is_csv || !in_quote)) { /* Check for \r\n on first line, _and_ handle \r\n. */ if (cstate->eol_type == EOL_UNKNOWN || @@ -1322,10 +1372,10 @@ CopyReadLineText(CopyFromState cstate) if (cstate->eol_type == EOL_CRNL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? + !is_csv ? errmsg("literal carriage return found in data") : errmsg("unquoted carriage return found in data"), - !cstate->opts.csv_mode ? + !is_csv ? errhint("Use \"\\r\" to represent carriage return.") : errhint("Use quoted CSV field to represent carriage return."))); @@ -1339,10 +1389,10 @@ CopyReadLineText(CopyFromState cstate) else if (cstate->eol_type == EOL_NL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? + !is_csv ? errmsg("literal carriage return found in data") : errmsg("unquoted carriage return found in data"), - !cstate->opts.csv_mode ? + !is_csv ? errhint("Use \"\\r\" to represent carriage return.") : errhint("Use quoted CSV field to represent carriage return."))); /* If reach here, we have found the line terminator */ @@ -1350,15 +1400,15 @@ CopyReadLineText(CopyFromState cstate) } /* Process \n */ - if (c == '\n' && (!cstate->opts.csv_mode || !in_quote)) + if (c == '\n' && (!is_csv || !in_quote)) { if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? + !is_csv ? errmsg("literal newline found in data") : errmsg("unquoted newline found in data"), - !cstate->opts.csv_mode ? + !is_csv ? errhint("Use \"\\n\" to represent newline.") : errhint("Use quoted CSV field to represent newline."))); cstate->eol_type = EOL_NL; /* in case not set yet */ @@ -1370,7 +1420,7 @@ CopyReadLineText(CopyFromState cstate) * Process backslash, except in CSV mode where backslash is a normal * character. */ - if (c == '\\' && !cstate->opts.csv_mode) + if (c == '\\' && !is_csv) { char c2; diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 06dfdfef721..7bc044e2816 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -107,8 +107,6 @@ extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *where extern void EndCopyFrom(CopyFromState cstate); extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls); -extern bool NextCopyFromRawFields(CopyFromState cstate, - char ***fields, int *nfields); extern void CopyFromErrorCallback(void *arg); extern char *CopyLimitPrintoutLength(const char *str); diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index bd2d386816e..2a2d2f9876b 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -1,7 +1,7 @@ /*------------------------------------------------------------------------- * * copyapi.h - * API for COPY TO handlers + * API for COPY TO/FROM handlers * * * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group @@ -54,4 +54,52 @@ typedef struct CopyToRoutine void (*CopyToEnd) (CopyToState cstate); } CopyToRoutine; +/* + * API structure for a COPY FROM format implementation. Note this must be + * allocated in a server-lifetime manner, typically as a static const struct. + */ +typedef struct CopyFromRoutine +{ + /* + * Set input function information. This callback is called once at the + * beginning of COPY FROM. + * + * 'finfo' can be optionally filled to provide the catalog information of + * the input function. + * + * 'typioparam' can be optionally filled to define the OID of the type to + * pass to the input function.'atttypid' is the OID of data type used by + * the relation's attribute. + */ + void (*CopyFromInFunc) (CopyFromState cstate, Oid atttypid, + FmgrInfo *finfo, Oid *typioparam); + + /* + * Start a COPY FROM. This callback is called once at the beginning of + * COPY FROM. + * + * 'tupDesc' is the tuple descriptor of the relation where the data needs + * to be copied. This can be used for any initialization steps required by + * a format. + */ + void (*CopyFromStart) (CopyFromState cstate, TupleDesc tupDesc); + + /* + * Read one row from the source and fill *values and *nulls. + * + * 'econtext' is used to evaluate default expression for each column that + * is either not read from the file or is using the DEFAULT option of COPY + * FROM. It is NULL if no default values are used. + * + * Returns false if there are no more tuples to read. + */ + bool (*CopyFromOneRow) (CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); + + /* + * End a COPY FROM. This callback is called once at the end of COPY FROM. + */ + void (*CopyFromEnd) (CopyFromState cstate); +} CopyFromRoutine; + #endif /* COPYAPI_H */ diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index 1d8ac8f62e6..c8b22af22d8 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -58,6 +58,9 @@ typedef enum CopyInsertMethod */ typedef struct CopyFromStateData { + /* format routine */ + const struct CopyFromRoutine *routine; + /* low-level state data */ CopySource copy_src; /* type of copy source */ FILE *copy_file; /* used if copy_src == COPY_FILE */ @@ -183,4 +186,12 @@ typedef struct CopyFromStateData extern void ReceiveCopyBegin(CopyFromState cstate); extern void ReceiveCopyBinaryHeader(CopyFromState cstate); +/* One-row callbacks for built-in formats defined in copyfromparse.c */ +extern bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); +extern bool CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); +extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); + #endif /* COPYFROM_INTERNAL_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index fcb968e1ffe..56989aa0b84 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -501,6 +501,7 @@ ConvertRowtypeExpr CookedConstraint CopyDest CopyFormatOptions +CopyFromRoutine CopyFromState CopyFromStateData CopyHeaderChoice