1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-07 00:36:50 +03:00

Add new COPY option SAVE_ERROR_TO

Currently, when source data contains unexpected data regarding data type or
range, the entire COPY fails. However, in some cases, such data can be ignored
and just copying normal data is preferable.

This commit adds a new option SAVE_ERROR_TO, which specifies where to save the
error information. When this option is specified, COPY skips soft errors and
continues copying.

Currently, SAVE_ERROR_TO only supports "none". This indicates error information
is not saved and COPY just skips the unexpected data and continues running.

Later works are expected to add more choices, such as 'log' and 'table'.

Author: Damir Belyalov, Atsushi Torikoshi, Alex Shulgin, Jian He
Discussion: https://postgr.es/m/87k31ftoe0.fsf_-_%40commandprompt.com
Reviewed-by: Pavel Stehule, Andres Freund, Tom Lane, Daniel Gustafsson,
Reviewed-by: Alena Rybakina, Andy Fan, Andrei Lepikhov, Masahiko Sawada
Reviewed-by: Vignesh C, Atsushi Torikoshi
This commit is contained in:
Alexander Korotkov
2024-01-16 23:08:53 +02:00
parent c7e5e994b2
commit 9e2d870119
10 changed files with 239 additions and 7 deletions

View File

@ -394,6 +394,42 @@ defGetCopyHeaderChoice(DefElem *def, bool is_from)
return COPY_HEADER_FALSE; /* keep compiler quiet */
}
/*
* Extract a CopySaveErrorToChoice value from a DefElem.
*/
static CopySaveErrorToChoice
defGetCopySaveErrorToChoice(DefElem *def, ParseState *pstate, bool is_from)
{
char *sval;
if (!is_from)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("COPY SAVE_ERROR_TO cannot be used with COPY TO"),
parser_errposition(pstate, def->location)));
/*
* If no parameter value given, assume the default value.
*/
if (def->arg == NULL)
return COPY_SAVE_ERROR_TO_ERROR;
/*
* Allow "error", or "none" values.
*/
sval = defGetString(def);
if (pg_strcasecmp(sval, "error") == 0)
return COPY_SAVE_ERROR_TO_ERROR;
if (pg_strcasecmp(sval, "none") == 0)
return COPY_SAVE_ERROR_TO_NONE;
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("COPY save_error_to \"%s\" not recognized", sval),
parser_errposition(pstate, def->location)));
return COPY_SAVE_ERROR_TO_ERROR; /* keep compiler quiet */
}
/*
* Process the statement option list for COPY.
*
@ -419,6 +455,7 @@ ProcessCopyOptions(ParseState *pstate,
bool format_specified = false;
bool freeze_specified = false;
bool header_specified = false;
bool save_error_to_specified = false;
ListCell *option;
/* Support external use for option sanity checking */
@ -571,6 +608,13 @@ ProcessCopyOptions(ParseState *pstate,
defel->defname),
parser_errposition(pstate, defel->location)));
}
else if (strcmp(defel->defname, "save_error_to") == 0)
{
if (save_error_to_specified)
errorConflictingDefElem(defel, pstate);
save_error_to_specified = true;
opts_out->save_error_to = defGetCopySaveErrorToChoice(defel, pstate, is_from);
}
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@ -598,6 +642,11 @@ ProcessCopyOptions(ParseState *pstate,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("cannot specify DEFAULT in BINARY mode")));
if (opts_out->binary && opts_out->save_error_to != COPY_SAVE_ERROR_TO_ERROR)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("cannot specify SAVE_ERROR_TO in BINARY mode")));
/* Set defaults for omitted options */
if (!opts_out->delim)
opts_out->delim = opts_out->csv_mode ? "," : "\t";

View File

@ -42,6 +42,7 @@
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/miscnodes.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
@ -656,6 +657,9 @@ CopyFrom(CopyFromState cstate)
Assert(cstate->rel);
Assert(list_length(cstate->range_table) == 1);
if (cstate->opts.save_error_to != COPY_SAVE_ERROR_TO_ERROR)
Assert(cstate->escontext);
/*
* The target must be a plain, foreign, or partitioned relation, or have
* an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
@ -992,6 +996,25 @@ CopyFrom(CopyFromState cstate)
if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
break;
if (cstate->opts.save_error_to != COPY_SAVE_ERROR_TO_ERROR &&
cstate->escontext->error_occurred)
{
/*
* Soft error occured, skip this tuple and save error information
* according to SAVE_ERROR_TO.
*/
if (cstate->opts.save_error_to == COPY_SAVE_ERROR_TO_NONE)
/*
* Just make ErrorSaveContext ready for the next NextCopyFrom.
* Since we don't set details_wanted and error_data is not to
* be filled, just resetting error_occurred is enough.
*/
cstate->escontext->error_occurred = false;
continue;
}
ExecStoreVirtualTuple(myslot);
/*
@ -1284,6 +1307,14 @@ CopyFrom(CopyFromState cstate)
/* Done, clean up */
error_context_stack = errcallback.previous;
if (cstate->opts.save_error_to != COPY_SAVE_ERROR_TO_ERROR &&
cstate->num_errors > 0)
ereport(NOTICE,
errmsg_plural("%zd row were skipped due to data type incompatibility",
"%zd rows were skipped due to data type incompatibility",
cstate->num_errors,
cstate->num_errors));
if (bistate != NULL)
FreeBulkInsertState(bistate);
@ -1419,6 +1450,23 @@ BeginCopyFrom(ParseState *pstate,
}
}
/* Set up soft error handler for SAVE_ERROR_TO */
if (cstate->opts.save_error_to != COPY_SAVE_ERROR_TO_ERROR)
{
cstate->escontext = makeNode(ErrorSaveContext);
cstate->escontext->type = T_ErrorSaveContext;
cstate->escontext->error_occurred = false;
/*
* Currently we only support COPY_SAVE_ERROR_TO_NONE. We'll add other
* options later
*/
if (cstate->opts.save_error_to == COPY_SAVE_ERROR_TO_NONE)
cstate->escontext->details_wanted = false;
}
else
cstate->escontext = NULL;
/* Convert FORCE_NULL name list to per-column flags, check validity */
cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
if (cstate->opts.force_null_all)

View File

@ -70,6 +70,7 @@
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/miscnodes.h"
#include "pgstat.h"
#include "port/pg_bswap.h"
#include "utils/builtins.h"
@ -955,11 +956,17 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
}
else
values[m] = InputFunctionCall(&in_functions[m],
string,
typioparams[m],
att->atttypmod);
/* If SAVE_ERROR_TO is specified, skip rows with soft errors */
else if (!InputFunctionCallSafe(&in_functions[m],
string,
typioparams[m],
att->atttypmod,
(Node *) cstate->escontext,
&values[m]))
{
cstate->num_errors++;
return true;
}
cstate->cur_attname = NULL;
cstate->cur_attval = NULL;