1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-08 11:42:09 +03:00

Complete TODO item:

o -Allow dump/load of CSV format

This adds new keywords to COPY and \copy:

        CSV - enable CSV mode (comma separated variable)
        QUOTE - specify quote character
        ESCAPE - specify escape character
        FORCE - force quoting of specified column
	LITERAL - suppress null comparison for columns

Doc changes included.  Regression updates coming from Andrew.
This commit is contained in:
Bruce Momjian
2004-04-19 17:22:31 +00:00
parent 83ab1c0475
commit 862b20b382
7 changed files with 776 additions and 63 deletions

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/copy.c,v 1.220 2004/04/15 22:36:03 momjian Exp $
* $PostgreSQL: pgsql/src/backend/commands/copy.c,v 1.221 2004/04/19 17:22:30 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@ -70,7 +70,8 @@ typedef enum CopyDest
typedef enum CopyReadResult
{
NORMAL_ATTR,
END_OF_LINE
END_OF_LINE,
UNTERMINATED_FIELD
} CopyReadResult;
/*
@ -130,15 +131,22 @@ static bool line_buf_converted;
/* non-export function prototypes */
static void CopyTo(Relation rel, List *attnumlist, bool binary, bool oids,
char *delim, char *null_print);
char *delim, char *null_print, bool csv_mode, char *quote, char *escape,
List *force_atts);
static void CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,
char *delim, char *null_print);
char *delim, char *null_print, bool csv_mode, char *quote, char *escape,
List *literal_atts);
static bool CopyReadLine(void);
static char *CopyReadAttribute(const char *delim, const char *null_print,
CopyReadResult *result, bool *isnull);
static char *CopyReadAttributeCSV(const char *delim, const char *null_print,
char *quote, char *escape,
CopyReadResult *result, bool *isnull);
static Datum CopyReadBinaryAttribute(int column_no, FmgrInfo *flinfo,
Oid typelem, bool *isnull);
static void CopyAttributeOut(char *string, char *delim);
static void CopyAttributeOutCSV(char *string, char *delim, char *quote,
char *escape, bool force_quote);
static List *CopyGetAttnums(Relation rel, List *attnamelist);
static void limit_printout_length(StringInfo buf);
@ -682,8 +690,15 @@ DoCopy(const CopyStmt *stmt)
List *attnumlist;
bool binary = false;
bool oids = false;
bool csv_mode = false;
char *delim = NULL;
char *quote = NULL;
char *escape = NULL;
char *null_print = NULL;
List *force = NIL;
List *literal = NIL;
List *force_atts = NIL;
List *literal_atts = NIL;
Relation rel;
AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
AclResult aclresult;
@ -725,6 +740,46 @@ DoCopy(const CopyStmt *stmt)
errmsg("conflicting or redundant options")));
null_print = strVal(defel->arg);
}
else if (strcmp(defel->defname, "csv") == 0)
{
if (csv_mode)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
csv_mode = intVal(defel->arg);
}
else if (strcmp(defel->defname, "quote") == 0)
{
if (quote)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
quote = strVal(defel->arg);
}
else if (strcmp(defel->defname, "escape") == 0)
{
if (escape)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
escape = strVal(defel->arg);
}
else if (strcmp(defel->defname, "force") == 0)
{
if (force)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
force = (List *)defel->arg;
}
else if (strcmp(defel->defname, "literal") == 0)
{
if (literal)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
literal = (List *)defel->arg;
}
else
elog(ERROR, "option \"%s\" not recognized",
defel->defname);
@ -735,6 +790,11 @@ DoCopy(const CopyStmt *stmt)
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("cannot specify DELIMITER in BINARY mode")));
if (binary && csv_mode)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("cannot specify CSV in BINARY mode")));
if (binary && null_print)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@ -742,10 +802,92 @@ DoCopy(const CopyStmt *stmt)
/* Set defaults */
if (!delim)
delim = "\t";
delim = csv_mode ? "," : "\t";
if (!null_print)
null_print = "\\N";
null_print = csv_mode ? "" : "\\N";
if (csv_mode)
{
if (!quote)
quote = "\"";
if (!escape)
escape = quote;
}
/*
* Only single-character delimiter strings are supported.
*/
if (strlen(delim) != 1)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY delimiter must be a single character")));
/*
* Check quote
*/
if (!csv_mode && quote != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY quote available only in CSV mode")));
if (csv_mode && strlen(quote) != 1)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY quote must be a single character")));
/*
* Check escape
*/
if (!csv_mode && escape != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY escape available only in CSV mode")));
if (csv_mode && strlen(escape) != 1)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY escape must be a single character")));
/*
* Check force
*/
if (!csv_mode && force != NIL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY force available only in CSV mode")));
if (force != NIL && is_from)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY force only available using COPY TO")));
/*
* Check literal
*/
if (!csv_mode && literal != NIL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY literal available only in CSV mode")));
if (literal != NIL && !is_from)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY literal only available using COPY FROM")));
/*
* Don't allow the delimiter to appear in the null string.
*/
if (strchr(null_print, delim[0]) != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY delimiter must not appear in the NULL specification")));
/*
* Don't allow the csv quote char to appear in the null string.
*/
if (csv_mode && strchr(null_print, quote[0]) != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("CSV quote character must not appear in the NULL specification")));
/*
* Open and lock the relation, using the appropriate lock type.
@ -771,22 +913,6 @@ DoCopy(const CopyStmt *stmt)
errhint("Anyone can COPY to stdout or from stdin. "
"psql's \\copy command also works for anyone.")));
/*
* Presently, only single-character delimiter strings are supported.
*/
if (strlen(delim) != 1)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY delimiter must be a single character")));
/*
* Don't allow the delimiter to appear in the null string.
*/
if (strchr(null_print, delim[0]) != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY delimiter must not appear in the NULL specification")));
/*
* Don't allow COPY w/ OIDs to or from a table without them
*/
@ -801,6 +927,52 @@ DoCopy(const CopyStmt *stmt)
*/
attnumlist = CopyGetAttnums(rel, attnamelist);
/*
* Check that FORCE references valid COPY columns
*/
if (force)
{
TupleDesc tupDesc = RelationGetDescr(rel);
Form_pg_attribute *attr = tupDesc->attrs;
List *cur;
force_atts = CopyGetAttnums(rel, force);
foreach(cur, force_atts)
{
int attnum = lfirsti(cur);
if (!intMember(attnum, attnumlist))
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("FORCE column \"%s\" not referenced by COPY",
NameStr(attr[attnum - 1]->attname))));
}
}
/*
* Check that LITERAL references valid COPY columns
*/
if (literal)
{
List *cur;
TupleDesc tupDesc = RelationGetDescr(rel);
Form_pg_attribute *attr = tupDesc->attrs;
literal_atts = CopyGetAttnums(rel, literal);
foreach(cur, literal_atts)
{
int attnum = lfirsti(cur);
if (!intMember(attnum, attnumlist))
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("LITERAL column \"%s\" not referenced by COPY",
NameStr(attr[attnum - 1]->attname))));
}
}
/*
* Set up variables to avoid per-attribute overhead.
*/
@ -864,7 +1036,8 @@ DoCopy(const CopyStmt *stmt)
errmsg("\"%s\" is a directory", filename)));
}
}
CopyFrom(rel, attnumlist, binary, oids, delim, null_print);
CopyFrom(rel, attnumlist, binary, oids, delim, null_print, csv_mode,
quote, escape, literal_atts);
}
else
{ /* copy from database to file */
@ -926,7 +1099,8 @@ DoCopy(const CopyStmt *stmt)
errmsg("\"%s\" is a directory", filename)));
}
}
CopyTo(rel, attnumlist, binary, oids, delim, null_print);
CopyTo(rel, attnumlist, binary, oids, delim, null_print, csv_mode,
quote, escape, force_atts);
}
if (!pipe)
@ -958,7 +1132,8 @@ DoCopy(const CopyStmt *stmt)
*/
static void
CopyTo(Relation rel, List *attnumlist, bool binary, bool oids,
char *delim, char *null_print)
char *delim, char *null_print, bool csv_mode, char *quote,
char *escape, List *force_atts)
{
HeapTuple tuple;
TupleDesc tupDesc;
@ -967,6 +1142,7 @@ CopyTo(Relation rel, List *attnumlist, bool binary, bool oids,
int attr_count;
Form_pg_attribute *attr;
FmgrInfo *out_functions;
bool *force_quote;
Oid *elements;
bool *isvarlena;
char *string;
@ -988,11 +1164,12 @@ CopyTo(Relation rel, List *attnumlist, bool binary, bool oids,
out_functions = (FmgrInfo *) palloc((num_phys_attrs + 1) * sizeof(FmgrInfo));
elements = (Oid *) palloc((num_phys_attrs + 1) * sizeof(Oid));
isvarlena = (bool *) palloc((num_phys_attrs + 1) * sizeof(bool));
force_quote = (bool *) palloc((num_phys_attrs + 1) * sizeof(bool));
foreach(cur, attnumlist)
{
int attnum = lfirsti(cur);
Oid out_func_oid;
if (binary)
getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid,
&out_func_oid, &elements[attnum - 1],
@ -1002,6 +1179,11 @@ CopyTo(Relation rel, List *attnumlist, bool binary, bool oids,
&out_func_oid, &elements[attnum - 1],
&isvarlena[attnum - 1]);
fmgr_info(out_func_oid, &out_functions[attnum - 1]);
if (intMember(attnum, force_atts))
force_quote[attnum - 1] = true;
else
force_quote[attnum - 1] = false;
}
/*
@ -1051,7 +1233,6 @@ CopyTo(Relation rel, List *attnumlist, bool binary, bool oids,
while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
{
bool need_delim = false;
CHECK_FOR_INTERRUPTS();
MemoryContextReset(mycontext);
@ -1113,7 +1294,15 @@ CopyTo(Relation rel, List *attnumlist, bool binary, bool oids,
value,
ObjectIdGetDatum(elements[attnum - 1]),
Int32GetDatum(attr[attnum - 1]->atttypmod)));
CopyAttributeOut(string, delim);
if (csv_mode)
{
CopyAttributeOutCSV(string, delim, quote, escape,
(strcmp(string, null_print) == 0 ||
force_quote[attnum - 1]));
}
else
CopyAttributeOut(string, delim);
}
else
{
@ -1148,6 +1337,7 @@ CopyTo(Relation rel, List *attnumlist, bool binary, bool oids,
pfree(out_functions);
pfree(elements);
pfree(isvarlena);
pfree(force_quote);
}
@ -1243,7 +1433,8 @@ limit_printout_length(StringInfo buf)
*/
static void
CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,
char *delim, char *null_print)
char *delim, char *null_print, bool csv_mode, char *quote,
char *escape, List *literal_atts)
{
HeapTuple tuple;
TupleDesc tupDesc;
@ -1256,9 +1447,10 @@ CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,
Oid *elements;
Oid oid_in_element;
ExprState **constraintexprs;
bool *literal_nullstr;
bool hasConstraints = false;
int i;
int attnum;
int i;
List *cur;
Oid in_func_oid;
Datum *values;
@ -1317,6 +1509,7 @@ CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,
defmap = (int *) palloc((num_phys_attrs + 1) * sizeof(int));
defexprs = (ExprState **) palloc((num_phys_attrs + 1) * sizeof(ExprState *));
constraintexprs = (ExprState **) palloc0((num_phys_attrs + 1) * sizeof(ExprState *));
literal_nullstr = (bool *) palloc((num_phys_attrs + 1) * sizeof(bool));
for (attnum = 1; attnum <= num_phys_attrs; attnum++)
{
@ -1333,6 +1526,11 @@ CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,
&in_func_oid, &elements[attnum - 1]);
fmgr_info(in_func_oid, &in_functions[attnum - 1]);
if (intMember(attnum, literal_atts))
literal_nullstr[attnum - 1] = true;
else
literal_nullstr[attnum - 1] = false;
/* Get default info if needed */
if (!intMember(attnum, attnumlist))
{
@ -1389,9 +1587,7 @@ CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,
ExecBSInsertTriggers(estate, resultRelInfo);
if (!binary)
{
file_has_oids = oids; /* must rely on user to tell us this... */
}
else
{
/* Read and verify binary header */
@ -1500,6 +1696,7 @@ CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,
if (file_has_oids)
{
/* can't be in CSV mode here */
string = CopyReadAttribute(delim, null_print,
&result, &isnull);
@ -1538,14 +1735,27 @@ CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,
errmsg("missing data for column \"%s\"",
NameStr(attr[m]->attname))));
string = CopyReadAttribute(delim, null_print,
&result, &isnull);
if (isnull)
if (csv_mode)
{
/* we read an SQL NULL, no need to do anything */
string = CopyReadAttributeCSV(delim, null_print, quote,
escape, &result, &isnull);
if (result == UNTERMINATED_FIELD)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("unterminated CSV quoted field")));
}
else
string = CopyReadAttribute(delim, null_print,
&result, &isnull);
if (csv_mode && isnull && literal_nullstr[m])
{
string = null_print; /* set to NULL string */
isnull = false;
}
/* we read an SQL NULL, no need to do anything */
if (!isnull)
{
copy_attname = NameStr(attr[m]->attname);
values[m] = FunctionCall3(&in_functions[m],
@ -1732,11 +1942,12 @@ CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,
pfree(values);
pfree(nulls);
if (!binary)
{
pfree(in_functions);
pfree(elements);
}
pfree(in_functions);
pfree(elements);
pfree(defmap);
pfree(defexprs);
pfree(constraintexprs);
pfree(literal_nullstr);
ExecDropTupleTable(tupleTable, true);
@ -2070,6 +2281,152 @@ CopyReadAttribute(const char *delim, const char *null_print,
return attribute_buf.data;
}
/*
* Read the value of a single attribute in CSV mode,
* performing de-escaping as needed. Escaping does not follow the normal
* PostgreSQL text mode, but instead "standard" (i.e. common) CSV usage.
*
* Quoted fields can span lines, in which case the line end is embedded
* in the returned string.
*
* null_print is the null marker string. Note that this is compared to
* the pre-de-escaped input string (thus if it is quoted it is not a NULL).
*
* *result is set to indicate what terminated the read:
* NORMAL_ATTR: column delimiter
* END_OF_LINE: end of line
* UNTERMINATED_FIELD no quote detected at end of a quoted field
*
* In any case, the string read up to the terminator (or end of file)
* is returned.
*
* *isnull is set true or false depending on whether the input matched
* the null marker. Note that the caller cannot check this since the
* returned string will be the post-de-escaping equivalent, which may
* look the same as some valid data string.
*----------
*/
static char *
CopyReadAttributeCSV(const char *delim, const char *null_print, char *quote,
char *escape, CopyReadResult *result, bool *isnull)
{
char delimc = delim[0];
char quotec = quote[0];
char escapec = escape[0];
char c;
int start_cursor = line_buf.cursor;
int end_cursor = start_cursor;
int input_len;
bool in_quote = false;
bool saw_quote = false;
/* reset attribute_buf to empty */
attribute_buf.len = 0;
attribute_buf.data[0] = '\0';
/* set default status */
*result = END_OF_LINE;
for (;;)
{
/* handle multiline quoted fields */
if (in_quote && line_buf.cursor >= line_buf.len)
{
bool done;
switch(eol_type)
{
case EOL_NL:
appendStringInfoString(&attribute_buf,"\n");
break;
case EOL_CR:
appendStringInfoString(&attribute_buf,"\r");
break;
case EOL_CRNL:
appendStringInfoString(&attribute_buf,"\r\n");
break;
case EOL_UNKNOWN:
/* shouldn't happen - just keep going */
break;
}
copy_lineno++;
done = CopyReadLine();
if (done && line_buf.len == 0)
break;
start_cursor = line_buf.cursor;
}
end_cursor = line_buf.cursor;
if (line_buf.cursor >= line_buf.len)
break;
c = line_buf.data[line_buf.cursor++];
/*
* unquoted field delimiter
*/
if (!in_quote && c == delimc)
{
*result = NORMAL_ATTR;
break;
}
/*
* start of quoted field (or part of field)
*/
if (!in_quote && c == quotec)
{
saw_quote = true;
in_quote = true;
continue;
}
/*
* escape within a quoted field
*/
if (in_quote && c == escapec)
{
/*
* peek at the next char if available, and escape it if it
* is an escape char or a quote char
*/
if (line_buf.cursor <= line_buf.len)
{
char nextc = line_buf.data[line_buf.cursor];
if (nextc == escapec || nextc == quotec)
{
appendStringInfoCharMacro(&attribute_buf, nextc);
line_buf.cursor++;
continue;
}
}
}
/*
* end of quoted field.
* Must do this test after testing for escape in case quote char
* and escape char are the same (which is the common case).
*/
if (in_quote && c == quotec)
{
in_quote = false;
continue;
}
appendStringInfoCharMacro(&attribute_buf, c);
}
if (in_quote)
*result = UNTERMINATED_FIELD;
/* check whether raw input matched null marker */
input_len = end_cursor - start_cursor;
if (!saw_quote && input_len == strlen(null_print) &&
strncmp(&line_buf.data[start_cursor], null_print, input_len) == 0)
*isnull = true;
else
*isnull = false;
return attribute_buf.data;
}
/*
* Read a binary attribute
*/
@ -2195,6 +2552,73 @@ CopyAttributeOut(char *server_string, char *delim)
}
}
/*
* Send CSV representation of one attribute, with conversion and
* CSV type escaping
*/
static void
CopyAttributeOutCSV(char *server_string, char *delim, char *quote,
char *escape, bool force_quote)
{
char *string;
char c;
char delimc = delim[0];
char quotec = quote[0];
char escapec = escape[0];
bool need_quote = force_quote;
char *test_string;
bool same_encoding;
int mblen;
int i;
same_encoding = (server_encoding == client_encoding);
if (!same_encoding)
string = (char *) pg_server_to_client((unsigned char *) server_string,
strlen(server_string));
else
string = server_string;
/* have to run through the string twice,
* first time to see if it needs quoting, second to actually send it
*/
for(test_string = string;
!need_quote && (c = *test_string) != '\0';
test_string += mblen)
{
if (c == delimc || c == quotec || c == '\n' || c == '\r')
need_quote = true;
if (!same_encoding)
mblen = pg_encoding_mblen(client_encoding, test_string);
else
mblen = 1;
}
if (need_quote)
CopySendChar(quotec);
for (; (c = *string) != '\0'; string += mblen)
{
if (c == quotec || c == escapec)
CopySendChar(escapec);
CopySendChar(c);
if (!same_encoding)
{
/* send additional bytes of the char, if any */
mblen = pg_encoding_mblen(client_encoding, string);
for (i = 1; i < mblen; i++)
CopySendChar(string[i]);
}
else
mblen = 1;
}
if (need_quote)
CopySendChar(quotec);
}
/*
* CopyGetAttnums - build an integer list of attnums to be copied
*