mirror of
https://github.com/postgres/postgres.git
synced 2025-07-28 23:42:10 +03:00
file_fdw: Add on_error and log_verbosity options to file_fdw.
In v17, the on_error and log_verbosity options were introduced for the COPY command. This commit extends support for these options to file_fdw. Setting on_error = 'ignore' for a file_fdw foreign table allows users to query it without errors, even when the input file contains malformed rows, by skipping the problematic rows. Both on_error and log_verbosity options apply to SELECT and ANALYZE operations on file_fdw foreign tables. Author: Atsushi Torikoshi Reviewed-by: Masahiko Sawada, Fujii Masao Discussion: https://postgr.es/m/ab59dad10490ea3734cf022b16c24cfd@oss.nttdata.com
This commit is contained in:
@ -206,6 +206,25 @@ SELECT * FROM agg_csv c JOIN agg_text t ON (t.a = c.a) ORDER BY c.a;
|
|||||||
SELECT * FROM agg_bad; -- ERROR
|
SELECT * FROM agg_bad; -- ERROR
|
||||||
ERROR: invalid input syntax for type real: "aaa"
|
ERROR: invalid input syntax for type real: "aaa"
|
||||||
CONTEXT: COPY agg_bad, line 3, column b: "aaa"
|
CONTEXT: COPY agg_bad, line 3, column b: "aaa"
|
||||||
|
-- on_error and log_verbosity tests
|
||||||
|
ALTER FOREIGN TABLE agg_bad OPTIONS (ADD on_error 'ignore');
|
||||||
|
SELECT * FROM agg_bad;
|
||||||
|
NOTICE: 1 row was skipped due to data type incompatibility
|
||||||
|
a | b
|
||||||
|
-----+--------
|
||||||
|
100 | 99.097
|
||||||
|
42 | 324.78
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
ALTER FOREIGN TABLE agg_bad OPTIONS (ADD log_verbosity 'silent');
|
||||||
|
SELECT * FROM agg_bad;
|
||||||
|
a | b
|
||||||
|
-----+--------
|
||||||
|
100 | 99.097
|
||||||
|
42 | 324.78
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
ANALYZE agg_bad;
|
||||||
-- misc query tests
|
-- misc query tests
|
||||||
\t on
|
\t on
|
||||||
SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv');
|
SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv');
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
#include "catalog/pg_authid.h"
|
#include "catalog/pg_authid.h"
|
||||||
#include "catalog/pg_foreign_table.h"
|
#include "catalog/pg_foreign_table.h"
|
||||||
#include "commands/copy.h"
|
#include "commands/copy.h"
|
||||||
|
#include "commands/copyfrom_internal.h"
|
||||||
#include "commands/defrem.h"
|
#include "commands/defrem.h"
|
||||||
#include "commands/explain.h"
|
#include "commands/explain.h"
|
||||||
#include "commands/vacuum.h"
|
#include "commands/vacuum.h"
|
||||||
@ -74,6 +75,8 @@ static const struct FileFdwOption valid_options[] = {
|
|||||||
{"null", ForeignTableRelationId},
|
{"null", ForeignTableRelationId},
|
||||||
{"default", ForeignTableRelationId},
|
{"default", ForeignTableRelationId},
|
||||||
{"encoding", ForeignTableRelationId},
|
{"encoding", ForeignTableRelationId},
|
||||||
|
{"on_error", ForeignTableRelationId},
|
||||||
|
{"log_verbosity", ForeignTableRelationId},
|
||||||
{"force_not_null", AttributeRelationId},
|
{"force_not_null", AttributeRelationId},
|
||||||
{"force_null", AttributeRelationId},
|
{"force_null", AttributeRelationId},
|
||||||
|
|
||||||
@ -723,38 +726,74 @@ fileIterateForeignScan(ForeignScanState *node)
|
|||||||
FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
|
FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
|
||||||
EState *estate = CreateExecutorState();
|
EState *estate = CreateExecutorState();
|
||||||
ExprContext *econtext;
|
ExprContext *econtext;
|
||||||
MemoryContext oldcontext;
|
MemoryContext oldcontext = CurrentMemoryContext;
|
||||||
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
|
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
|
||||||
bool found;
|
CopyFromState cstate = festate->cstate;
|
||||||
ErrorContextCallback errcallback;
|
ErrorContextCallback errcallback;
|
||||||
|
|
||||||
/* Set up callback to identify error line number. */
|
/* Set up callback to identify error line number. */
|
||||||
errcallback.callback = CopyFromErrorCallback;
|
errcallback.callback = CopyFromErrorCallback;
|
||||||
errcallback.arg = (void *) festate->cstate;
|
errcallback.arg = (void *) cstate;
|
||||||
errcallback.previous = error_context_stack;
|
errcallback.previous = error_context_stack;
|
||||||
error_context_stack = &errcallback;
|
error_context_stack = &errcallback;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We pass ExprContext because there might be a use of the DEFAULT option
|
||||||
|
* in COPY FROM, so we may need to evaluate default expressions.
|
||||||
|
*/
|
||||||
|
econtext = GetPerTupleExprContext(estate);
|
||||||
|
|
||||||
|
retry:
|
||||||
|
|
||||||
|
/*
|
||||||
|
* DEFAULT expressions need to be evaluated in a per-tuple context, so
|
||||||
|
* switch in case we are doing that.
|
||||||
|
*/
|
||||||
|
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The protocol for loading a virtual tuple into a slot is first
|
* The protocol for loading a virtual tuple into a slot is first
|
||||||
* ExecClearTuple, then fill the values/isnull arrays, then
|
* ExecClearTuple, then fill the values/isnull arrays, then
|
||||||
* ExecStoreVirtualTuple. If we don't find another row in the file, we
|
* ExecStoreVirtualTuple. If we don't find another row in the file, we
|
||||||
* just skip the last step, leaving the slot empty as required.
|
* just skip the last step, leaving the slot empty as required.
|
||||||
*
|
*
|
||||||
* We pass ExprContext because there might be a use of the DEFAULT option
|
|
||||||
* in COPY FROM, so we may need to evaluate default expressions.
|
|
||||||
*/
|
*/
|
||||||
ExecClearTuple(slot);
|
ExecClearTuple(slot);
|
||||||
econtext = GetPerTupleExprContext(estate);
|
|
||||||
|
|
||||||
/*
|
if (NextCopyFrom(cstate, econtext, slot->tts_values, slot->tts_isnull))
|
||||||
* DEFAULT expressions need to be evaluated in a per-tuple context, so
|
{
|
||||||
* switch in case we are doing that.
|
if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
|
||||||
*/
|
cstate->escontext->error_occurred)
|
||||||
oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
|
{
|
||||||
found = NextCopyFrom(festate->cstate, econtext,
|
/*
|
||||||
slot->tts_values, slot->tts_isnull);
|
* Soft error occurred, skip this tuple and just make
|
||||||
if (found)
|
* ErrorSaveContext ready for the next NextCopyFrom. Since we
|
||||||
|
* don't set details_wanted and error_data is not to be filled,
|
||||||
|
* just resetting error_occurred is enough.
|
||||||
|
*/
|
||||||
|
cstate->escontext->error_occurred = false;
|
||||||
|
|
||||||
|
/* Switch back to original memory context */
|
||||||
|
MemoryContextSwitchTo(oldcontext);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Make sure we are interruptible while repeatedly calling
|
||||||
|
* NextCopyFrom() until no soft error occurs.
|
||||||
|
*/
|
||||||
|
CHECK_FOR_INTERRUPTS();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Reset the per-tuple exprcontext, to clean-up after expression
|
||||||
|
* evaluations etc.
|
||||||
|
*/
|
||||||
|
ResetPerTupleExprContext(estate);
|
||||||
|
|
||||||
|
/* Repeat NextCopyFrom() until no soft error occurs */
|
||||||
|
goto retry;
|
||||||
|
}
|
||||||
|
|
||||||
ExecStoreVirtualTuple(slot);
|
ExecStoreVirtualTuple(slot);
|
||||||
|
}
|
||||||
|
|
||||||
/* Switch back to original memory context */
|
/* Switch back to original memory context */
|
||||||
MemoryContextSwitchTo(oldcontext);
|
MemoryContextSwitchTo(oldcontext);
|
||||||
@ -796,8 +835,19 @@ fileEndForeignScan(ForeignScanState *node)
|
|||||||
FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
|
FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
|
||||||
|
|
||||||
/* if festate is NULL, we are in EXPLAIN; nothing to do */
|
/* if festate is NULL, we are in EXPLAIN; nothing to do */
|
||||||
if (festate)
|
if (!festate)
|
||||||
EndCopyFrom(festate->cstate);
|
return;
|
||||||
|
|
||||||
|
if (festate->cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
|
||||||
|
festate->cstate->num_errors > 0 &&
|
||||||
|
festate->cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
|
||||||
|
ereport(NOTICE,
|
||||||
|
errmsg_plural("%llu row was skipped due to data type incompatibility",
|
||||||
|
"%llu rows were skipped due to data type incompatibility",
|
||||||
|
(unsigned long long) festate->cstate->num_errors,
|
||||||
|
(unsigned long long) festate->cstate->num_errors));
|
||||||
|
|
||||||
|
EndCopyFrom(festate->cstate);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -1113,7 +1163,8 @@ estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
|
|||||||
* which must have at least targrows entries.
|
* which must have at least targrows entries.
|
||||||
* The actual number of rows selected is returned as the function result.
|
* The actual number of rows selected is returned as the function result.
|
||||||
* We also count the total number of rows in the file and return it into
|
* We also count the total number of rows in the file and return it into
|
||||||
* *totalrows. Note that *totaldeadrows is always set to 0.
|
* *totalrows. Rows skipped due to on_error = 'ignore' are not included
|
||||||
|
* in this count. Note that *totaldeadrows is always set to 0.
|
||||||
*
|
*
|
||||||
* Note that the returned list of rows is not always in order by physical
|
* Note that the returned list of rows is not always in order by physical
|
||||||
* position in the file. Therefore, correlation estimates derived later
|
* position in the file. Therefore, correlation estimates derived later
|
||||||
@ -1191,6 +1242,21 @@ file_acquire_sample_rows(Relation onerel, int elevel,
|
|||||||
if (!found)
|
if (!found)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
|
||||||
|
cstate->escontext->error_occurred)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Soft error occurred, skip this tuple and just make
|
||||||
|
* ErrorSaveContext ready for the next NextCopyFrom. Since we
|
||||||
|
* don't set details_wanted and error_data is not to be filled,
|
||||||
|
* just resetting error_occurred is enough.
|
||||||
|
*/
|
||||||
|
cstate->escontext->error_occurred = false;
|
||||||
|
|
||||||
|
/* Repeat NextCopyFrom() until no soft error occurs */
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The first targrows sample rows are simply copied into the
|
* The first targrows sample rows are simply copied into the
|
||||||
* reservoir. Then we start replacing tuples in the sample until we
|
* reservoir. Then we start replacing tuples in the sample until we
|
||||||
@ -1236,6 +1302,15 @@ file_acquire_sample_rows(Relation onerel, int elevel,
|
|||||||
/* Clean up. */
|
/* Clean up. */
|
||||||
MemoryContextDelete(tupcontext);
|
MemoryContextDelete(tupcontext);
|
||||||
|
|
||||||
|
if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
|
||||||
|
cstate->num_errors > 0 &&
|
||||||
|
cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
|
||||||
|
ereport(NOTICE,
|
||||||
|
errmsg_plural("%llu row was skipped due to data type incompatibility",
|
||||||
|
"%llu rows were skipped due to data type incompatibility",
|
||||||
|
(unsigned long long) cstate->num_errors,
|
||||||
|
(unsigned long long) cstate->num_errors));
|
||||||
|
|
||||||
EndCopyFrom(cstate);
|
EndCopyFrom(cstate);
|
||||||
|
|
||||||
pfree(values);
|
pfree(values);
|
||||||
|
@ -150,6 +150,13 @@ SELECT * FROM agg_csv c JOIN agg_text t ON (t.a = c.a) ORDER BY c.a;
|
|||||||
-- error context report tests
|
-- error context report tests
|
||||||
SELECT * FROM agg_bad; -- ERROR
|
SELECT * FROM agg_bad; -- ERROR
|
||||||
|
|
||||||
|
-- on_error and log_verbosity tests
|
||||||
|
ALTER FOREIGN TABLE agg_bad OPTIONS (ADD on_error 'ignore');
|
||||||
|
SELECT * FROM agg_bad;
|
||||||
|
ALTER FOREIGN TABLE agg_bad OPTIONS (ADD log_verbosity 'silent');
|
||||||
|
SELECT * FROM agg_bad;
|
||||||
|
ANALYZE agg_bad;
|
||||||
|
|
||||||
-- misc query tests
|
-- misc query tests
|
||||||
\t on
|
\t on
|
||||||
SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv');
|
SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv');
|
||||||
|
@ -126,6 +126,29 @@
|
|||||||
</listitem>
|
</listitem>
|
||||||
</varlistentry>
|
</varlistentry>
|
||||||
|
|
||||||
|
<varlistentry>
|
||||||
|
<term><literal>on_error</literal></term>
|
||||||
|
|
||||||
|
<listitem>
|
||||||
|
<para>
|
||||||
|
Specifies how to behave when encountering an error converting a column's
|
||||||
|
input value into its data type,
|
||||||
|
the same as <command>COPY</command>'s <literal>ON_ERROR</literal> option.
|
||||||
|
</para>
|
||||||
|
</listitem>
|
||||||
|
</varlistentry>
|
||||||
|
|
||||||
|
<varlistentry>
|
||||||
|
<term><literal>log_verbosity</literal></term>
|
||||||
|
|
||||||
|
<listitem>
|
||||||
|
<para>
|
||||||
|
Specifies the amount of messages emitted by <literal>file_fdw</literal>,
|
||||||
|
the same as <command>COPY</command>'s <literal>LOG_VERBOSITY</literal> option.
|
||||||
|
</para>
|
||||||
|
</listitem>
|
||||||
|
</varlistentry>
|
||||||
|
|
||||||
</variablelist>
|
</variablelist>
|
||||||
|
|
||||||
<para>
|
<para>
|
||||||
|
Reference in New Issue
Block a user