mirror of
https://github.com/postgres/postgres.git
synced 2025-07-02 09:02:37 +03:00
Add contrib/file_fdw foreign-data wrapper for reading files via COPY.
This is both very useful in its own right, and an important test case for the core FDW support. This commit includes a small refactoring of copy.c to expose its option checking code as a separately callable function. The original patch submission duplicated hundreds of lines of that code, which seemed pretty unmaintainable. Shigeru Hanada, reviewed by Itagaki Takahiro and Tom Lane
This commit is contained in:
@ -114,7 +114,10 @@ typedef struct CopyStateData
|
||||
char *delim; /* column delimiter (must be 1 byte) */
|
||||
char *quote; /* CSV quote char (must be 1 byte) */
|
||||
char *escape; /* CSV escape char (must be 1 byte) */
|
||||
List *force_quote; /* list of column names */
|
||||
bool force_quote_all; /* FORCE QUOTE *? */
|
||||
bool *force_quote_flags; /* per-column CSV FQ flags */
|
||||
List *force_notnull; /* list of column names */
|
||||
bool *force_notnull_flags; /* per-column CSV FNN flags */
|
||||
|
||||
/* these are just for error messages, see CopyFromErrorCallback */
|
||||
@ -764,7 +767,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
|
||||
|
||||
tupDesc = RelationGetDescr(rel);
|
||||
attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
|
||||
foreach (cur, attnums)
|
||||
foreach(cur, attnums)
|
||||
{
|
||||
int attno = lfirst_int(cur) -
|
||||
FirstLowInvalidHeapAttributeNumber;
|
||||
@ -814,52 +817,33 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
|
||||
}
|
||||
|
||||
/*
|
||||
* Common setup routines used by BeginCopyFrom and BeginCopyTo.
|
||||
* Process the statement option list for COPY.
|
||||
*
|
||||
* Iff <binary>, unload or reload in the binary format, as opposed to the
|
||||
* more wasteful but more robust and portable text format.
|
||||
* Scan the options list (a list of DefElem) and transpose the information
|
||||
* into cstate, applying appropriate error checking.
|
||||
*
|
||||
* Iff <oids>, unload or reload the format that includes OID information.
|
||||
* On input, we accept OIDs whether or not the table has an OID column,
|
||||
* but silently drop them if it does not. On output, we report an error
|
||||
* if the user asks for OIDs in a table that has none (not providing an
|
||||
* OID column might seem friendlier, but could seriously confuse programs).
|
||||
* cstate is assumed to be filled with zeroes initially.
|
||||
*
|
||||
* If in the text format, delimit columns with delimiter <delim> and print
|
||||
* NULL values as <null_print>.
|
||||
* This is exported so that external users of the COPY API can sanity-check
|
||||
* a list of options. In that usage, cstate should be passed as NULL
|
||||
* (since external users don't know sizeof(CopyStateData)) and the collected
|
||||
* data is just leaked until CurrentMemoryContext is reset.
|
||||
*
|
||||
* Note that additional checking, such as whether column names listed in FORCE
|
||||
* QUOTE actually exist, has to be applied later. This just checks for
|
||||
* self-consistency of the options list.
|
||||
*/
|
||||
static CopyState
|
||||
BeginCopy(bool is_from,
|
||||
Relation rel,
|
||||
Node *raw_query,
|
||||
const char *queryString,
|
||||
List *attnamelist,
|
||||
List *options)
|
||||
void
|
||||
ProcessCopyOptions(CopyState cstate,
|
||||
bool is_from,
|
||||
List *options)
|
||||
{
|
||||
CopyState cstate;
|
||||
List *force_quote = NIL;
|
||||
List *force_notnull = NIL;
|
||||
bool force_quote_all = false;
|
||||
bool format_specified = false;
|
||||
ListCell *option;
|
||||
TupleDesc tupDesc;
|
||||
int num_phys_attrs;
|
||||
MemoryContext oldcontext;
|
||||
|
||||
/* Allocate workspace and zero all fields */
|
||||
cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
|
||||
|
||||
/*
|
||||
* We allocate everything used by a cstate in a new memory context.
|
||||
* This would avoid memory leaks repeated uses of COPY in a query.
|
||||
*/
|
||||
cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"COPY",
|
||||
ALLOCSET_DEFAULT_MINSIZE,
|
||||
ALLOCSET_DEFAULT_INITSIZE,
|
||||
ALLOCSET_DEFAULT_MAXSIZE);
|
||||
|
||||
oldcontext = MemoryContextSwitchTo(cstate->copycontext);
|
||||
/* Support external use for option sanity checking */
|
||||
if (cstate == NULL)
|
||||
cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
|
||||
|
||||
/* Extract options from the statement node tree */
|
||||
foreach(option, options)
|
||||
@ -936,14 +920,14 @@ BeginCopy(bool is_from,
|
||||
}
|
||||
else if (strcmp(defel->defname, "force_quote") == 0)
|
||||
{
|
||||
if (force_quote || force_quote_all)
|
||||
if (cstate->force_quote || cstate->force_quote_all)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("conflicting or redundant options")));
|
||||
if (defel->arg && IsA(defel->arg, A_Star))
|
||||
force_quote_all = true;
|
||||
cstate->force_quote_all = true;
|
||||
else if (defel->arg && IsA(defel->arg, List))
|
||||
force_quote = (List *) defel->arg;
|
||||
cstate->force_quote = (List *) defel->arg;
|
||||
else
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
@ -952,12 +936,12 @@ BeginCopy(bool is_from,
|
||||
}
|
||||
else if (strcmp(defel->defname, "force_not_null") == 0)
|
||||
{
|
||||
if (force_notnull)
|
||||
if (cstate->force_notnull)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("conflicting or redundant options")));
|
||||
if (defel->arg && IsA(defel->arg, List))
|
||||
force_notnull = (List *) defel->arg;
|
||||
cstate->force_notnull = (List *) defel->arg;
|
||||
else
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
@ -1071,21 +1055,21 @@ BeginCopy(bool is_from,
|
||||
errmsg("COPY escape must be a single one-byte character")));
|
||||
|
||||
/* Check force_quote */
|
||||
if (!cstate->csv_mode && (force_quote != NIL || force_quote_all))
|
||||
if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("COPY force quote available only in CSV mode")));
|
||||
if ((force_quote != NIL || force_quote_all) && is_from)
|
||||
if ((cstate->force_quote || cstate->force_quote_all) && is_from)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("COPY force quote only available using COPY TO")));
|
||||
|
||||
/* Check force_notnull */
|
||||
if (!cstate->csv_mode && force_notnull != NIL)
|
||||
if (!cstate->csv_mode && cstate->force_notnull != NIL)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("COPY force not null available only in CSV mode")));
|
||||
if (force_notnull != NIL && !is_from)
|
||||
if (cstate->force_notnull != NIL && !is_from)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("COPY force not null only available using COPY FROM")));
|
||||
@ -1102,7 +1086,55 @@ BeginCopy(bool is_from,
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("CSV quote character must not appear in the NULL specification")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Common setup routines used by BeginCopyFrom and BeginCopyTo.
|
||||
*
|
||||
* Iff <binary>, unload or reload in the binary format, as opposed to the
|
||||
* more wasteful but more robust and portable text format.
|
||||
*
|
||||
* Iff <oids>, unload or reload the format that includes OID information.
|
||||
* On input, we accept OIDs whether or not the table has an OID column,
|
||||
* but silently drop them if it does not. On output, we report an error
|
||||
* if the user asks for OIDs in a table that has none (not providing an
|
||||
* OID column might seem friendlier, but could seriously confuse programs).
|
||||
*
|
||||
* If in the text format, delimit columns with delimiter <delim> and print
|
||||
* NULL values as <null_print>.
|
||||
*/
|
||||
static CopyState
|
||||
BeginCopy(bool is_from,
|
||||
Relation rel,
|
||||
Node *raw_query,
|
||||
const char *queryString,
|
||||
List *attnamelist,
|
||||
List *options)
|
||||
{
|
||||
CopyState cstate;
|
||||
TupleDesc tupDesc;
|
||||
int num_phys_attrs;
|
||||
MemoryContext oldcontext;
|
||||
|
||||
/* Allocate workspace and zero all fields */
|
||||
cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
|
||||
|
||||
/*
|
||||
* We allocate everything used by a cstate in a new memory context.
|
||||
* This avoids memory leaks during repeated use of COPY in a query.
|
||||
*/
|
||||
cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"COPY",
|
||||
ALLOCSET_DEFAULT_MINSIZE,
|
||||
ALLOCSET_DEFAULT_INITSIZE,
|
||||
ALLOCSET_DEFAULT_MAXSIZE);
|
||||
|
||||
oldcontext = MemoryContextSwitchTo(cstate->copycontext);
|
||||
|
||||
/* Extract options from the statement node tree */
|
||||
ProcessCopyOptions(cstate, is_from, options);
|
||||
|
||||
/* Process the source/target relation or query */
|
||||
if (rel)
|
||||
{
|
||||
Assert(!raw_query);
|
||||
@ -1197,19 +1229,19 @@ BeginCopy(bool is_from,
|
||||
|
||||
/* Convert FORCE QUOTE name list to per-column flags, check validity */
|
||||
cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
|
||||
if (force_quote_all)
|
||||
if (cstate->force_quote_all)
|
||||
{
|
||||
int i;
|
||||
|
||||
for (i = 0; i < num_phys_attrs; i++)
|
||||
cstate->force_quote_flags[i] = true;
|
||||
}
|
||||
else if (force_quote)
|
||||
else if (cstate->force_quote)
|
||||
{
|
||||
List *attnums;
|
||||
ListCell *cur;
|
||||
|
||||
attnums = CopyGetAttnums(tupDesc, cstate->rel, force_quote);
|
||||
attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);
|
||||
|
||||
foreach(cur, attnums)
|
||||
{
|
||||
@ -1226,12 +1258,12 @@ BeginCopy(bool is_from,
|
||||
|
||||
/* Convert FORCE NOT NULL name list to per-column flags, check validity */
|
||||
cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
|
||||
if (force_notnull)
|
||||
if (cstate->force_notnull)
|
||||
{
|
||||
List *attnums;
|
||||
ListCell *cur;
|
||||
|
||||
attnums = CopyGetAttnums(tupDesc, cstate->rel, force_notnull);
|
||||
attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);
|
||||
|
||||
foreach(cur, attnums)
|
||||
{
|
||||
|
Reference in New Issue
Block a user