1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-08 11:42:09 +03:00

IMPROVED VERSION APPLIED:

The attached patch completes the following TODO item:

    * Generate failure on short COPY lines rather than pad NULLs

I also restructed a lot of the existing COPY code, did some code
review on the column list patch sent in by Brent Verner a little
while ago, and added some regression tests. I also added an
explicit check (and resultant error) for extra data before
the end-of-line.

Neil Conway
This commit is contained in:
Bruce Momjian
2002-07-30 16:55:06 +00:00
parent 1dedbf2da5
commit 874148fe34
4 changed files with 318 additions and 209 deletions

View File

@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/copy.c,v 1.160 2002/07/20 05:16:57 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/commands/copy.c,v 1.161 2002/07/30 16:55:06 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@ -45,16 +45,21 @@
#define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
#define OCTVALUE(c) ((c) - '0')
typedef enum CopyReadResult
{
NORMAL_ATTR,
END_OF_LINE,
END_OF_FILE
} CopyReadResult;
/* non-export function prototypes */
static void CopyTo(Relation rel, List *attlist, bool binary, bool oids, FILE *fp, char *delim, char *null_print);
static void CopyFrom(Relation rel, List *attlist, bool binary, bool oids, FILE *fp, char *delim, char *null_print);
static Oid GetInputFunction(Oid type);
static Oid GetTypeElement(Oid type);
static void CopyReadNewline(FILE *fp, int *newline);
static char *CopyReadAttribute(FILE *fp, bool *isnull, char *delim, int *newline, char *null_print);
static char *CopyReadAttribute(FILE *fp, const char *delim, CopyReadResult *result);
static void CopyAttributeOut(FILE *fp, char *string, char *delim);
static void CopyAssertAttlist(Relation rel, List* attlist, bool from);
static void CopyCheckAttlist(Relation rel, List *attlist);
static const char BinarySignature[12] = "PGBCOPY\n\377\r\n\0";
@ -271,14 +276,10 @@ DoCopy(const CopyStmt *stmt)
bool pipe = (stmt->filename == NULL);
List *option;
List *attlist = stmt->attlist;
DefElem *dbinary = NULL;
DefElem *doids = NULL;
DefElem *ddelim = NULL;
DefElem *dnull = NULL;
bool binary = false;
bool oids = false;
char *delim = "\t";
char *null_print = "\\N";
char *delim = NULL;
char *null_print = NULL;
FILE *fp;
Relation rel;
AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
@ -289,51 +290,53 @@ DoCopy(const CopyStmt *stmt)
{
DefElem *defel = (DefElem *) lfirst(option);
/* XXX: Should we bother checking for doubled options? */
if (strcmp(defel->defname, "binary") == 0)
{
if (dbinary)
/* should this really be an error? */
if (binary)
elog(ERROR, "COPY: BINARY option appears more than once");
dbinary = defel;
binary = intVal(defel->arg);
}
else if (strcmp(defel->defname, "oids") == 0)
{
if (doids)
/* should this really be an error? */
if (oids)
elog(ERROR, "COPY: OIDS option appears more than once");
doids = defel;
oids = intVal(defel->arg);
}
else if (strcmp(defel->defname, "delimiter") == 0)
{
if (ddelim)
if (delim)
elog(ERROR, "COPY: DELIMITER string may only be defined once in query");
ddelim = defel;
delim = strVal(defel->arg);
}
else if (strcmp(defel->defname, "null") == 0)
{
if (dnull)
if (null_print)
elog(ERROR, "COPY: NULL representation may only be defined once in query");
dnull = defel;
null_print = strVal(defel->arg);
}
else
elog(ERROR, "COPY: option \"%s\" not recognized",
defel->defname);
}
if (dbinary)
binary = intVal(dbinary->arg);
if (doids)
oids = intVal(doids->arg);
if (ddelim)
delim = strVal(ddelim->arg);
if (dnull)
null_print = strVal(dnull->arg);
if (binary && ddelim)
if (binary && delim)
elog(ERROR, "You can not specify the DELIMITER in BINARY mode.");
if (binary && dnull)
if (binary && null_print)
elog(ERROR, "You can not specify NULL in BINARY mode.");
/* Set defaults */
if (!delim)
delim = "\t";
if (!null_print)
null_print = "\\N";
/*
* Open and lock the relation, using the appropriate lock type.
@ -363,6 +366,13 @@ DoCopy(const CopyStmt *stmt)
if (strlen(delim) != 1)
elog(ERROR, "COPY delimiter must be a single character");
/*
* Don't allow COPY w/ OIDs to or from a table without them
*/
if (oids && !rel->rd_rel->relhasoids)
elog(ERROR, "COPY: table \"%s\" does not have OIDs",
RelationGetRelationName(rel));
/*
* Set up variables to avoid per-attribute overhead.
*/
@ -372,22 +382,24 @@ DoCopy(const CopyStmt *stmt)
server_encoding = GetDatabaseEncoding();
#endif
if( attlist == NIL ){
if (attlist == NIL)
{
/* get list of attributes in the relation */
TupleDesc desc = RelationGetDescr(rel);
int i;
for(i = 0; i < desc->natts; ++i){
Ident* id = makeNode(Ident);
for (i = 0; i < desc->natts; ++i)
{
Ident *id = makeNode(Ident);
id->name = NameStr(desc->attrs[i]->attname);
attlist = lappend(attlist,id);
}
}
else{
if( binary ){
else
{
if (binary)
elog(ERROR,"COPY: BINARY format cannot be used with specific column list");
}
/* verify that any user-specified attributes exist in the relation */
CopyAssertAttlist(rel,attlist,is_from);
CopyCheckAttlist(rel, attlist);
}
if (is_from)
@ -532,27 +544,28 @@ CopyTo(Relation rel, List *attlist, bool binary, bool oids,
int16 fld_size;
char *string;
Snapshot mySnapshot;
int copy_attr_count;
int* attmap;
int p = 0;
List* cur;
if (oids && !rel->rd_rel->relhasoids)
elog(ERROR, "COPY: table %s does not have OIDs",
RelationGetRelationName(rel));
int copy_attr_count;
int *attmap;
int p = 0;
List *cur;
tupDesc = rel->rd_att;
attr_count = rel->rd_att->natts;
attr = rel->rd_att->attrs;
copy_attr_count = length(attlist);
attmap = (int *) palloc(copy_attr_count * sizeof(int));
foreach(cur, attlist)
{
attmap = (int*)palloc(copy_attr_count * sizeof(int));
foreach(cur,attlist){
for (i = 0; i < attr_count; i++){
if( strcmp(strVal(lfirst(cur)),NameStr(attr[i]->attname)) == 0){
attmap[p++] = i;
continue;
}
const char *currAtt = strVal(lfirst(cur));
for (i = 0; i < attr_count; i++)
{
if (namestrcmp(&attr[i]->attname, currAtt) == 0)
{
attmap[p++] = i;
continue;
}
}
}
@ -642,7 +655,7 @@ CopyTo(Relation rel, List *attlist, bool binary, bool oids,
Datum origvalue,
value;
bool isnull;
int mi = attmap[i];
int mi = attmap[i];
origvalue = heap_getattr(tuple, mi + 1, tupDesc, &isnull);
@ -767,19 +780,15 @@ CopyFrom(Relation rel, List *attlist, bool binary, bool oids,
Oid in_func_oid;
Datum *values;
char *nulls;
bool isnull;
int done = 0;
char *string;
ResultRelInfo *resultRelInfo;
EState *estate = CreateExecutorState(); /* for ExecConstraints() */
TupleTable tupleTable;
TupleTableSlot *slot;
Oid loaded_oid = InvalidOid;
bool skip_tuple = false;
bool file_has_oids;
int* attmap = NULL;
int* defmap = NULL;
Node** defexprs = NULL; /* array of default att expressions */
int *attmap = NULL;
int *defmap = NULL;
Node **defexprs = NULL; /* array of default att expressions */
ExprContext *econtext; /* used for ExecEvalExpr for default atts */
ExprDoneCond isdone;
@ -810,40 +819,47 @@ CopyFrom(Relation rel, List *attlist, bool binary, bool oids,
slot = ExecAllocTableSlot(tupleTable);
ExecSetSlotDescriptor(slot, tupDesc, false);
if (!binary)
{
/*
* pick up the input function and default expression (if any) for
* each attribute in the relation.
*/
List* cur;
attmap = (int*)palloc(sizeof(int) * attr_count);
defmap = (int*)palloc(sizeof(int) * attr_count);
defexprs = (Node**)palloc(sizeof(Node*) * attr_count);
attmap = (int *) palloc(sizeof(int) * attr_count);
defmap = (int *) palloc(sizeof(int) * attr_count);
defexprs = (Node **) palloc(sizeof(Node *) * attr_count);
in_functions = (FmgrInfo *) palloc(attr_count * sizeof(FmgrInfo));
elements = (Oid *) palloc(attr_count * sizeof(Oid));
for (i = 0; i < attr_count; i++)
{
int p = 0;
bool specified = false;
List *l;
int p = 0;
bool specified = false;
in_func_oid = (Oid) GetInputFunction(attr[i]->atttypid);
fmgr_info(in_func_oid, &in_functions[i]);
elements[i] = GetTypeElement(attr[i]->atttypid);
foreach(cur,attlist){
if( strcmp(strVal(lfirst(cur)),NameStr(attr[i]->attname)) == 0){
foreach(l, attlist)
{
if (namestrcmp(&attr[i]->attname, strVal(lfirst(l))) == 0)
{
attmap[p] = i;
specified = true;
continue;
}
++p;
p++;
}
if( ! specified ){
/* column not specified, try to get a default */
defexprs[def_attr_count] = build_column_default(rel,i+1);
if( defexprs[def_attr_count] != NULL ){
/* if column not specified, use default value if one exists */
if (! specified)
{
defexprs[def_attr_count] = build_column_default(rel, i + 1);
if (defexprs[def_attr_count] != NULL)
{
defmap[def_attr_count] = i;
++def_attr_count;
def_attr_count++;
}
}
}
@ -857,13 +873,11 @@ CopyFrom(Relation rel, List *attlist, bool binary, bool oids,
/* Signature */
CopyGetData(readSig, 12, fp);
if (CopyGetEof(fp) ||
memcmp(readSig, BinarySignature, 12) != 0)
if (CopyGetEof(fp) || memcmp(readSig, BinarySignature, 12) != 0)
elog(ERROR, "COPY BINARY: file signature not recognized");
/* Integer layout field */
CopyGetData(&tmp, sizeof(int32), fp);
if (CopyGetEof(fp) ||
tmp != 0x01020304)
if (CopyGetEof(fp) || tmp != 0x01020304)
elog(ERROR, "COPY BINARY: incompatible integer layout");
/* Flags field */
CopyGetData(&tmp, sizeof(int32), fp);
@ -875,8 +889,7 @@ CopyFrom(Relation rel, List *attlist, bool binary, bool oids,
elog(ERROR, "COPY BINARY: unrecognized critical flags in header");
/* Header extension length */
CopyGetData(&tmp, sizeof(int32), fp);
if (CopyGetEof(fp) ||
tmp < 0)
if (CopyGetEof(fp) || tmp < 0)
elog(ERROR, "COPY BINARY: bogus file header (missing length)");
/* Skip extension header, if present */
while (tmp-- > 0)
@ -890,10 +903,6 @@ CopyFrom(Relation rel, List *attlist, bool binary, bool oids,
elements = NULL;
}
/* Silently drop incoming OIDs if table does not have OIDs */
if (!rel->rd_rel->relhasoids)
oids = false;
values = (Datum *) palloc(attr_count * sizeof(Datum));
nulls = (char *) palloc(attr_count * sizeof(char));
@ -904,6 +913,9 @@ CopyFrom(Relation rel, List *attlist, bool binary, bool oids,
while (!done)
{
bool skip_tuple;
Oid loaded_oid;
CHECK_FOR_INTERRUPTS();
copy_lineno++;
@ -917,16 +929,17 @@ CopyFrom(Relation rel, List *attlist, bool binary, bool oids,
if (!binary)
{
int newline = 0;
CopyReadResult result;
char *string;
if (file_has_oids)
{
string = CopyReadAttribute(fp, &isnull, delim,
&newline, null_print);
if (isnull)
string = CopyReadAttribute(fp, delim, &result);
if (result == END_OF_FILE)
done = 1;
else if (string == NULL || strcmp(string, null_print) == 0)
elog(ERROR, "COPY TEXT: NULL Oid");
else if (string == NULL)
done = 1; /* end of file */
else
{
loaded_oid = DatumGetObjectId(DirectFunctionCall1(oidin,
@ -943,36 +956,45 @@ CopyFrom(Relation rel, List *attlist, bool binary, bool oids,
for (i = 0; i < copy_attr_count && !done; i++)
{
int m = attmap[i];
string = CopyReadAttribute(fp, &isnull, delim,
&newline, null_print);
if( isnull ){
/* nothing */
string = CopyReadAttribute(fp, delim, &result);
/* If we got an end-of-line before we expected, bail out */
if (result == END_OF_LINE && i < (copy_attr_count - 1))
elog(ERROR, "COPY TEXT: Missing data for attribute %d", i + 1);
if (result == END_OF_FILE)
done = 1;
else if (strcmp(string, null_print) == 0)
{
/* we read an SQL NULL, no need to do anything */
}
else if (string == NULL)
done = 1; /* end of file */
else
{
values[m] = FunctionCall3(&in_functions[m],
CStringGetDatum(string),
ObjectIdGetDatum(elements[m]),
Int32GetDatum(attr[m]->atttypmod));
CStringGetDatum(string),
ObjectIdGetDatum(elements[m]),
Int32GetDatum(attr[m]->atttypmod));
nulls[m] = ' ';
}
}
if (result == NORMAL_ATTR && !done)
elog(ERROR, "COPY TEXT: Extra data encountered");
/*
* as above, we only try a default lookup if one is
* known to be available
*/
for (i = 0; i < def_attr_count && !done; i++){
for (i = 0; i < def_attr_count && !done; i++)
{
bool isnull;
values[defmap[i]] = ExecEvalExpr(defexprs[i],econtext,&isnull,&isdone);
if( ! isnull )
nulls[defmap[i]] = ' ';
values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
&isnull, &isdone);
if (! isnull)
nulls[defmap[i]] = ' ';
}
if (!done)
CopyReadNewline(fp, &newline);
}
else
{ /* binary */
@ -980,8 +1002,7 @@ CopyFrom(Relation rel, List *attlist, bool binary, bool oids,
fld_size;
CopyGetData(&fld_count, sizeof(int16), fp);
if (CopyGetEof(fp) ||
fld_count == -1)
if (CopyGetEof(fp) || fld_count == -1)
done = 1;
else
{
@ -1173,36 +1194,25 @@ GetTypeElement(Oid type)
return result;
}
/*
* Reads input from fp until an end of line is seen.
*/
static void
CopyReadNewline(FILE *fp, int *newline)
{
if (!*newline)
{
elog(WARNING, "CopyReadNewline: extra fields ignored");
while (!CopyGetEof(fp) && (CopyGetChar(fp) != '\n'));
}
*newline = 0;
}
/*
* Read the value of a single attribute.
*
* Result is either a string, or NULL (if EOF or a null attribute).
* Note that the caller should not pfree the string!
* Results are returned in the status indicator, as well as the
* return value. If a value was successfully read but there is
* more to read before EOL, NORMAL_ATTR is set and the value read
* is returned. If a value was read and we hit EOL, END_OF_LINE
* is set and the value read is returned. If we hit the EOF,
* END_OF_FILE is set and NULL is returned.
*
* *isnull is set true if a null attribute, else false.
* delim is the column delimiter string (currently always 1 character).
* *newline remembers whether we've seen a newline ending this tuple.
* null_print says how NULL values are represented
* Note: This function does not care about SQL NULL values -- it
* is the caller's responsibility to check if the returned string
* matches what the user specified for the SQL NULL value.
*
* delim is the column delimiter string.
*/
static char *
CopyReadAttribute(FILE *fp, bool *isnull, char *delim, int *newline, char *null_print)
CopyReadAttribute(FILE *fp, const char *delim, CopyReadResult *result)
{
int c;
int delimc = (unsigned char)delim[0];
@ -1220,23 +1230,20 @@ CopyReadAttribute(FILE *fp, bool *isnull, char *delim, int *newline, char *null_
attribute_buf.len = 0;
attribute_buf.data[0] = '\0';
/* if last delimiter was a newline return a NULL attribute */
if (*newline)
{
*isnull = (bool) true;
return NULL;
}
*isnull = (bool) false; /* set default */
/* set default */
*result = NORMAL_ATTR;
for (;;)
{
c = CopyGetChar(fp);
if (c == EOF)
goto endOfFile;
{
*result = END_OF_FILE;
return NULL;
}
if (c == '\n')
{
*newline = 1;
*result = END_OF_LINE;
break;
}
if (c == delimc)
@ -1245,7 +1252,10 @@ CopyReadAttribute(FILE *fp, bool *isnull, char *delim, int *newline, char *null_
{
c = CopyGetChar(fp);
if (c == EOF)
goto endOfFile;
{
*result = END_OF_FILE;
return NULL;
}
switch (c)
{
case '0':
@ -1274,14 +1284,20 @@ CopyReadAttribute(FILE *fp, bool *isnull, char *delim, int *newline, char *null_
else
{
if (c == EOF)
goto endOfFile;
{
*result = END_OF_FILE;
return NULL;
}
CopyDonePeek(fp, c, false /* put back */ );
}
}
else
{
if (c == EOF)
goto endOfFile;
{
*result = END_OF_FILE;
return NULL;
}
CopyDonePeek(fp, c, false /* put back */ );
}
c = val & 0377;
@ -1319,7 +1335,8 @@ CopyReadAttribute(FILE *fp, bool *isnull, char *delim, int *newline, char *null_
c = CopyGetChar(fp);
if (c != '\n')
elog(ERROR, "CopyReadAttribute: end of record marker corrupted");
goto endOfFile;
*result = END_OF_FILE;
return NULL;
}
}
appendStringInfoCharMacro(&attribute_buf, c);
@ -1334,7 +1351,10 @@ CopyReadAttribute(FILE *fp, bool *isnull, char *delim, int *newline, char *null_
{
c = CopyGetChar(fp);
if (c == EOF)
goto endOfFile;
{
*result = END_OF_FILE;
return NULL;
}
appendStringInfoCharMacro(&attribute_buf, c);
}
}
@ -1357,13 +1377,7 @@ CopyReadAttribute(FILE *fp, bool *isnull, char *delim, int *newline, char *null_
}
#endif
if (strcmp(attribute_buf.data, null_print) == 0)
*isnull = true;
return attribute_buf.data;
endOfFile:
return NULL;
}
static void
@ -1454,49 +1468,50 @@ CopyAttributeOut(FILE *fp, char *server_string, char *delim)
}
/*
* CopyAssertAttlist: elog(ERROR,...) if the specified attlist
* CopyCheckAttlist: elog(ERROR,...) if the specified attlist
* is not valid for the Relation
*/
static void
CopyAssertAttlist(Relation rel, List* attlist, bool from)
CopyCheckAttlist(Relation rel, List *attlist)
{
TupleDesc tupDesc;
List* cur;
char* illegalattname = NULL;
int attr_count;
const char* to_or_from;
if( attlist == NIL )
TupleDesc tupDesc;
int attr_count;
List *l;
if (attlist == NIL)
return;
to_or_from = (from == true ? "FROM" : "TO");
tupDesc = RelationGetDescr(rel);
Assert(tupDesc != NULL);
/*
* make sure there aren't more columns specified than are in the table
*/
attr_count = tupDesc->natts;
if( attr_count < length(attlist) )
elog(ERROR,"More columns specified in COPY %s command than in target relation",to_or_from);
Assert(tupDesc != NULL);
/*
* make sure there aren't more columns specified than are in the table
*/
attr_count = tupDesc->natts;
if (attr_count < length(attlist))
elog(ERROR, "COPY: Too many columns specified");
/*
* make sure no columns are specified that don't exist in the table
*/
foreach(cur,attlist)
{
int found = 0;
int i = 0;
for(;i<attr_count;++i)
{
if( strcmp(strVal(lfirst(cur)),NameStr(tupDesc->attrs[i]->attname)) == 0)
++found;
}
if( ! found )
illegalattname = strVal(lfirst(cur));
}
if( illegalattname )
elog(ERROR,"Attribute referenced in COPY %s command does not exist: \"%s.%s\"",to_or_from,RelationGetRelationName(rel),illegalattname);
/*
* make sure no columns are specified that don't exist in the table
*/
foreach(l, attlist)
{
char *colName = strVal(lfirst(l));
bool found = false;
int i;
for (i = 0; i < attr_count; i++)
{
if (namestrcmp(&tupDesc->attrs[i]->attname, colName) == 0)
{
found = true;
break;
}
}
if (!found)
elog(ERROR, "COPY: Specified column \"%s\" does not exist",
colName);
}
}