mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-25 13:17:41 +03:00 
			
		
		
		
	Refactor COPY FROM to use format callback functions.
This commit introduces a new CopyFromRoutine struct, which is a set of
callback routines to read tuples in a specific format. It also makes
COPY FROM with the existing formats (text, CSV, and binary) utilize
these format callbacks.
This change is a preliminary step towards making the COPY FROM command
extensible in terms of input formats.
Similar to 2e4127b6d2, this refactoring contributes to a performance
improvement by reducing the number of "if" branches that need to be
checked on a per-row basis when reading field representations in text
or CSV mode. The performance benchmark results showed ~5% performance
gain in text or CSV mode.
Author: Sutou Kouhei <kou@clear-code.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Michael Paquier <michael@paquier.xyz>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Tomas Vondra <tomas.vondra@enterprisedb.com>
Reviewed-by: Junwang Zhao <zhjwpku@gmail.com>
Discussion: https://postgr.es/m/20231204.153548.2126325458835528809.kou@clear-code.com
			
			
This commit is contained in:
		| @@ -28,7 +28,7 @@ | ||||
| #include "access/tableam.h" | ||||
| #include "access/xact.h" | ||||
| #include "catalog/namespace.h" | ||||
| #include "commands/copy.h" | ||||
| #include "commands/copyapi.h" | ||||
| #include "commands/copyfrom_internal.h" | ||||
| #include "commands/progress.h" | ||||
| #include "commands/trigger.h" | ||||
| @@ -106,6 +106,145 @@ typedef struct CopyMultiInsertInfo | ||||
| /* non-export function prototypes */ | ||||
| static void ClosePipeFromProgram(CopyFromState cstate); | ||||
|  | ||||
| /* | ||||
|  * Built-in format-specific routines. One-row callbacks are defined in | ||||
|  * copyfromparse.c. | ||||
|  */ | ||||
| static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo, | ||||
| 								   Oid *typioparam); | ||||
| static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc); | ||||
| static void CopyFromTextLikeEnd(CopyFromState cstate); | ||||
| static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid, | ||||
| 								 FmgrInfo *finfo, Oid *typioparam); | ||||
| static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc); | ||||
| static void CopyFromBinaryEnd(CopyFromState cstate); | ||||
|  | ||||
|  | ||||
| /* | ||||
|  * COPY FROM routines for built-in formats. | ||||
|  * | ||||
|  * CSV and text formats share the same TextLike routines except for the | ||||
|  * one-row callback. | ||||
|  */ | ||||
|  | ||||
| /* text format: shared TextLike callbacks plus the text one-row reader */ | ||||
| static const CopyFromRoutine CopyFromRoutineText = { | ||||
| 	.CopyFromInFunc = CopyFromTextLikeInFunc, | ||||
| 	.CopyFromStart = CopyFromTextLikeStart, | ||||
| 	.CopyFromOneRow = CopyFromTextOneRow, | ||||
| 	.CopyFromEnd = CopyFromTextLikeEnd, | ||||
| }; | ||||
|  | ||||
| /* CSV format: shared TextLike callbacks plus the CSV one-row reader */ | ||||
| static const CopyFromRoutine CopyFromRoutineCSV = { | ||||
| 	.CopyFromInFunc = CopyFromTextLikeInFunc, | ||||
| 	.CopyFromStart = CopyFromTextLikeStart, | ||||
| 	.CopyFromOneRow = CopyFromCSVOneRow, | ||||
| 	.CopyFromEnd = CopyFromTextLikeEnd, | ||||
| }; | ||||
|  | ||||
| /* binary format: every callback is the binary-specific implementation */ | ||||
| static const CopyFromRoutine CopyFromRoutineBinary = { | ||||
| 	.CopyFromInFunc = CopyFromBinaryInFunc, | ||||
| 	.CopyFromStart = CopyFromBinaryStart, | ||||
| 	.CopyFromOneRow = CopyFromBinaryOneRow, | ||||
| 	.CopyFromEnd = CopyFromBinaryEnd, | ||||
| }; | ||||
|  | ||||
| /* | ||||
|  * Return a COPY FROM routine for the given options. | ||||
|  * | ||||
|  * csv_mode is tested first, then binary; when neither flag is set the text | ||||
|  * routine is returned.  The result points at a static const table, so the | ||||
|  * caller must not modify or free it. | ||||
|  * | ||||
|  * NOTE(review): 'opts' is passed by value, so the whole CopyFormatOptions | ||||
|  * struct is copied for this call; a const pointer would avoid that. | ||||
|  */ | ||||
| static const CopyFromRoutine * | ||||
| CopyFromGetRoutine(CopyFormatOptions opts) | ||||
| { | ||||
| 	if (opts.csv_mode) | ||||
| 		return &CopyFromRoutineCSV; | ||||
| 	else if (opts.binary) | ||||
| 		return &CopyFromRoutineBinary; | ||||
|  | ||||
| 	/* default is text */ | ||||
| 	return &CopyFromRoutineText; | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Implementation of the start callback for text and CSV formats. | ||||
|  * | ||||
|  * Sets up input_buf (a separately allocated buffer when transcoding is | ||||
|  * needed, otherwise an alias of raw_buf), clears input_reached_eof, | ||||
|  * initializes line_buf, and allocates the raw_fields array with one slot | ||||
|  * per entry of attnumlist.  'tupDesc' is not used here. | ||||
|  */ | ||||
| static void | ||||
| CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc) | ||||
| { | ||||
| 	AttrNumber	attr_count; | ||||
|  | ||||
| 	/* | ||||
| 	 * If encoding conversion is needed, we need another buffer to hold the | ||||
| 	 * converted input data.  Otherwise, we can just point input_buf to the | ||||
| 	 * same buffer as raw_buf. | ||||
| 	 */ | ||||
| 	if (cstate->need_transcoding) | ||||
| 	{ | ||||
| 		cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); | ||||
| 		cstate->input_buf_index = cstate->input_buf_len = 0; | ||||
| 	} | ||||
| 	else | ||||
| 		cstate->input_buf = cstate->raw_buf; | ||||
| 	cstate->input_reached_eof = false; | ||||
|  | ||||
| 	initStringInfo(&cstate->line_buf); | ||||
|  | ||||
| 	/* | ||||
| 	 * Create workspace for CopyReadAttributes results; used by CSV and text | ||||
| 	 * format. | ||||
| 	 */ | ||||
| 	attr_count = list_length(cstate->attnumlist); | ||||
| 	cstate->max_fields = attr_count; | ||||
| 	cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Implementation of the infunc callback for text and CSV formats. Look up | ||||
|  * the input function of 'atttypid' with getTypeInputInfo(), assign the | ||||
|  * input function data to the given *finfo, and store the accompanying I/O | ||||
|  * parameter OID in *typioparam.  'cstate' is not used here. | ||||
|  */ | ||||
| static void | ||||
| CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo, | ||||
| 					   Oid *typioparam) | ||||
| { | ||||
| 	Oid			func_oid; | ||||
|  | ||||
| 	getTypeInputInfo(atttypid, &func_oid, typioparam); | ||||
| 	fmgr_info(func_oid, finfo); | ||||
| } | ||||
|  | ||||
| /* Implementation of the end callback for text and CSV formats; a no-op */ | ||||
| static void | ||||
| CopyFromTextLikeEnd(CopyFromState cstate) | ||||
| { | ||||
| 	/* nothing to do */ | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Implementation of the start callback for binary format.  Its only work | ||||
|  * is consuming the binary header; 'tupDesc' is not used here. | ||||
|  */ | ||||
| static void | ||||
| CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc) | ||||
| { | ||||
| 	/* Read and verify binary header */ | ||||
| 	ReceiveCopyBinaryHeader(cstate); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Implementation of the infunc callback for binary format. Look up the | ||||
|  * binary input function of 'atttypid' with getTypeBinaryInputInfo(), assign | ||||
|  * it to the given *finfo, and store the accompanying I/O parameter OID in | ||||
|  * *typioparam.  'cstate' is not used here. | ||||
|  */ | ||||
| static void | ||||
| CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid, | ||||
| 					 FmgrInfo *finfo, Oid *typioparam) | ||||
| { | ||||
| 	Oid			func_oid; | ||||
|  | ||||
| 	getTypeBinaryInputInfo(atttypid, &func_oid, typioparam); | ||||
| 	fmgr_info(func_oid, finfo); | ||||
| } | ||||
|  | ||||
| /* Implementation of the end callback for binary format; a no-op */ | ||||
| static void | ||||
| CopyFromBinaryEnd(CopyFromState cstate) | ||||
| { | ||||
| 	/* nothing to do */ | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * error context callback for COPY FROM | ||||
|  * | ||||
| @@ -1403,7 +1542,6 @@ BeginCopyFrom(ParseState *pstate, | ||||
| 				num_defaults; | ||||
| 	FmgrInfo   *in_functions; | ||||
| 	Oid		   *typioparams; | ||||
| 	Oid			in_func_oid; | ||||
| 	int		   *defmap; | ||||
| 	ExprState **defexprs; | ||||
| 	MemoryContext oldcontext; | ||||
| @@ -1435,6 +1573,9 @@ BeginCopyFrom(ParseState *pstate, | ||||
| 	/* Extract options from the statement node tree */ | ||||
| 	ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options); | ||||
|  | ||||
| 	/* Set the format routine */ | ||||
| 	cstate->routine = CopyFromGetRoutine(cstate->opts); | ||||
|  | ||||
| 	/* Process the target relation */ | ||||
| 	cstate->rel = rel; | ||||
|  | ||||
| @@ -1590,25 +1731,6 @@ BeginCopyFrom(ParseState *pstate, | ||||
| 	cstate->raw_buf_index = cstate->raw_buf_len = 0; | ||||
| 	cstate->raw_reached_eof = false; | ||||
|  | ||||
| 	if (!cstate->opts.binary) | ||||
| 	{ | ||||
| 		/* | ||||
| 		 * If encoding conversion is needed, we need another buffer to hold | ||||
| 		 * the converted input data.  Otherwise, we can just point input_buf | ||||
| 		 * to the same buffer as raw_buf. | ||||
| 		 */ | ||||
| 		if (cstate->need_transcoding) | ||||
| 		{ | ||||
| 			cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); | ||||
| 			cstate->input_buf_index = cstate->input_buf_len = 0; | ||||
| 		} | ||||
| 		else | ||||
| 			cstate->input_buf = cstate->raw_buf; | ||||
| 		cstate->input_reached_eof = false; | ||||
|  | ||||
| 		initStringInfo(&cstate->line_buf); | ||||
| 	} | ||||
|  | ||||
| 	initStringInfo(&cstate->attribute_buf); | ||||
|  | ||||
| 	/* Assign range table and rteperminfos, we'll need them in CopyFrom. */ | ||||
| @@ -1641,13 +1763,9 @@ BeginCopyFrom(ParseState *pstate, | ||||
| 			continue; | ||||
|  | ||||
| 		/* Fetch the input function and typioparam info */ | ||||
| 		if (cstate->opts.binary) | ||||
| 			getTypeBinaryInputInfo(att->atttypid, | ||||
| 								   &in_func_oid, &typioparams[attnum - 1]); | ||||
| 		else | ||||
| 			getTypeInputInfo(att->atttypid, | ||||
| 							 &in_func_oid, &typioparams[attnum - 1]); | ||||
| 		fmgr_info(in_func_oid, &in_functions[attnum - 1]); | ||||
| 		cstate->routine->CopyFromInFunc(cstate, att->atttypid, | ||||
| 										&in_functions[attnum - 1], | ||||
| 										&typioparams[attnum - 1]); | ||||
|  | ||||
| 		/* Get default info if available */ | ||||
| 		defexprs[attnum - 1] = NULL; | ||||
| @@ -1782,20 +1900,7 @@ BeginCopyFrom(ParseState *pstate, | ||||
|  | ||||
| 	pgstat_progress_update_multi_param(3, progress_cols, progress_vals); | ||||
|  | ||||
| 	if (cstate->opts.binary) | ||||
| 	{ | ||||
| 		/* Read and verify binary header */ | ||||
| 		ReceiveCopyBinaryHeader(cstate); | ||||
| 	} | ||||
|  | ||||
| 	/* create workspace for CopyReadAttributes results */ | ||||
| 	if (!cstate->opts.binary) | ||||
| 	{ | ||||
| 		AttrNumber	attr_count = list_length(cstate->attnumlist); | ||||
|  | ||||
| 		cstate->max_fields = attr_count; | ||||
| 		cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); | ||||
| 	} | ||||
| 	cstate->routine->CopyFromStart(cstate, tupDesc); | ||||
|  | ||||
| 	MemoryContextSwitchTo(oldcontext); | ||||
|  | ||||
| @@ -1808,6 +1913,9 @@ BeginCopyFrom(ParseState *pstate, | ||||
| void | ||||
| EndCopyFrom(CopyFromState cstate) | ||||
| { | ||||
| 	/* Invoke the end callback */ | ||||
| 	cstate->routine->CopyFromEnd(cstate); | ||||
|  | ||||
| 	/* No COPY FROM related resources except memory. */ | ||||
| 	if (cstate->is_program) | ||||
| 	{ | ||||
|   | ||||
| @@ -62,7 +62,7 @@ | ||||
| #include <unistd.h> | ||||
| #include <sys/stat.h> | ||||
|  | ||||
| #include "commands/copy.h" | ||||
| #include "commands/copyapi.h" | ||||
| #include "commands/copyfrom_internal.h" | ||||
| #include "commands/progress.h" | ||||
| #include "executor/executor.h" | ||||
| @@ -140,13 +140,18 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; | ||||
|  | ||||
|  | ||||
| /* non-export function prototypes */ | ||||
| static bool CopyReadLine(CopyFromState cstate); | ||||
| static bool CopyReadLineText(CopyFromState cstate); | ||||
| static bool CopyReadLine(CopyFromState cstate, bool is_csv); | ||||
| static bool CopyReadLineText(CopyFromState cstate, bool is_csv); | ||||
| static int	CopyReadAttributesText(CopyFromState cstate); | ||||
| static int	CopyReadAttributesCSV(CopyFromState cstate); | ||||
| static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo, | ||||
| 									 Oid typioparam, int32 typmod, | ||||
| 									 bool *isnull); | ||||
| static pg_attribute_always_inline bool CopyFromTextLikeOneRow(CopyFromState cstate, | ||||
| 															  ExprContext *econtext, | ||||
| 															  Datum *values, | ||||
| 															  bool *nulls, | ||||
| 															  bool is_csv); | ||||
|  | ||||
|  | ||||
| /* Low-level communications functions */ | ||||
| @@ -740,9 +745,12 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes) | ||||
|  * in the relation. | ||||
|  * | ||||
|  * NOTE: the force_not_null option is not applied to the returned fields. | ||||
|  * | ||||
|  * We use pg_attribute_always_inline to reduce function call overhead | ||||
|  * and to help compilers to optimize away the 'is_csv' condition. | ||||
|  */ | ||||
| bool | ||||
| NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) | ||||
| static pg_attribute_always_inline bool | ||||
| NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields, bool is_csv) | ||||
| { | ||||
| 	int			fldct; | ||||
| 	bool		done; | ||||
| @@ -759,13 +767,13 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) | ||||
| 		tupDesc = RelationGetDescr(cstate->rel); | ||||
|  | ||||
| 		cstate->cur_lineno++; | ||||
| 		done = CopyReadLine(cstate); | ||||
| 		done = CopyReadLine(cstate, is_csv); | ||||
|  | ||||
| 		if (cstate->opts.header_line == COPY_HEADER_MATCH) | ||||
| 		{ | ||||
| 			int			fldnum; | ||||
|  | ||||
| 			if (cstate->opts.csv_mode) | ||||
| 			if (is_csv) | ||||
| 				fldct = CopyReadAttributesCSV(cstate); | ||||
| 			else | ||||
| 				fldct = CopyReadAttributesText(cstate); | ||||
| @@ -809,7 +817,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) | ||||
| 	cstate->cur_lineno++; | ||||
|  | ||||
| 	/* Actually read the line into memory here */ | ||||
| 	done = CopyReadLine(cstate); | ||||
| 	done = CopyReadLine(cstate, is_csv); | ||||
|  | ||||
| 	/* | ||||
| 	 * EOF at start of line means we're done.  If we see EOF after some | ||||
| @@ -820,7 +828,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) | ||||
| 		return false; | ||||
|  | ||||
| 	/* Parse the line into de-escaped field values */ | ||||
| 	if (cstate->opts.csv_mode) | ||||
| 	if (is_csv) | ||||
| 		fldct = CopyReadAttributesCSV(cstate); | ||||
| 	else | ||||
| 		fldct = CopyReadAttributesText(cstate); | ||||
| @@ -847,33 +855,86 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, | ||||
| { | ||||
| 	TupleDesc	tupDesc; | ||||
| 	AttrNumber	num_phys_attrs, | ||||
| 				attr_count, | ||||
| 				num_defaults = cstate->num_defaults; | ||||
| 	FmgrInfo   *in_functions = cstate->in_functions; | ||||
| 	Oid		   *typioparams = cstate->typioparams; | ||||
| 	int			i; | ||||
| 	int		   *defmap = cstate->defmap; | ||||
| 	ExprState **defexprs = cstate->defexprs; | ||||
|  | ||||
| 	tupDesc = RelationGetDescr(cstate->rel); | ||||
| 	num_phys_attrs = tupDesc->natts; | ||||
| 	attr_count = list_length(cstate->attnumlist); | ||||
|  | ||||
| 	/* Initialize all values for row to NULL */ | ||||
| 	MemSet(values, 0, num_phys_attrs * sizeof(Datum)); | ||||
| 	MemSet(nulls, true, num_phys_attrs * sizeof(bool)); | ||||
| 	MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool)); | ||||
|  | ||||
| 	if (!cstate->opts.binary) | ||||
| 	/* Get one row from source */ | ||||
| 	if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls)) | ||||
| 		return false; | ||||
|  | ||||
| 	/* | ||||
| 	 * Now compute and insert any defaults available for the columns not | ||||
| 	 * provided by the input data.  Anything not processed here or above will | ||||
| 	 * remain NULL. | ||||
| 	 */ | ||||
| 	for (i = 0; i < num_defaults; i++) | ||||
| 	{ | ||||
| 		/* | ||||
| 		 * The caller must supply econtext and have switched into the | ||||
| 		 * per-tuple memory context in it. | ||||
| 		 */ | ||||
| 		Assert(econtext != NULL); | ||||
| 		Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); | ||||
|  | ||||
| 		values[defmap[i]] = ExecEvalExpr(defexprs[defmap[i]], econtext, | ||||
| 										 &nulls[defmap[i]]); | ||||
| 	} | ||||
|  | ||||
| 	return true; | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Implementation of the per-row callback for text format.  Thin wrapper | ||||
|  * that calls CopyFromTextLikeOneRow() with is_csv = false. | ||||
|  */ | ||||
| bool | ||||
| CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, | ||||
| 				   bool *nulls) | ||||
| { | ||||
| 	return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, false); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Implementation of the per-row callback for CSV format.  Thin wrapper | ||||
|  * that calls CopyFromTextLikeOneRow() with is_csv = true. | ||||
|  */ | ||||
| bool | ||||
| CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, | ||||
| 				  bool *nulls) | ||||
| { | ||||
| 	return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, true); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Workhorse for CopyFromTextOneRow() and CopyFromCSVOneRow(). | ||||
|  * | ||||
|  * We use pg_attribute_always_inline to reduce function call overhead | ||||
|  * and to help compilers to optimize away the 'is_csv' condition. | ||||
|  */ | ||||
| static pg_attribute_always_inline bool | ||||
| CopyFromTextLikeOneRow(CopyFromState cstate, ExprContext *econtext, | ||||
| 					   Datum *values, bool *nulls, bool is_csv) | ||||
| { | ||||
| 	TupleDesc	tupDesc; | ||||
| 	AttrNumber	attr_count; | ||||
| 	FmgrInfo   *in_functions = cstate->in_functions; | ||||
| 	Oid		   *typioparams = cstate->typioparams; | ||||
| 	ExprState **defexprs = cstate->defexprs; | ||||
| 	char	  **field_strings; | ||||
| 	ListCell   *cur; | ||||
| 	int			fldct; | ||||
| 	int			fieldno; | ||||
| 	char	   *string; | ||||
|  | ||||
| 	tupDesc = RelationGetDescr(cstate->rel); | ||||
| 	attr_count = list_length(cstate->attnumlist); | ||||
|  | ||||
| 	/* read raw fields in the next line */ | ||||
| 		if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) | ||||
| 	if (!NextCopyFromRawFields(cstate, &field_strings, &fldct, is_csv)) | ||||
| 		return false; | ||||
|  | ||||
| 	/* check for overflowing fields */ | ||||
| @@ -905,14 +966,14 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, | ||||
| 			continue; | ||||
| 		} | ||||
|  | ||||
| 			if (cstate->opts.csv_mode) | ||||
| 		if (is_csv) | ||||
| 		{ | ||||
| 			if (string == NULL && | ||||
| 				cstate->opts.force_notnull_flags[m]) | ||||
| 			{ | ||||
| 				/* | ||||
| 					 * FORCE_NOT_NULL option is set and column is NULL - | ||||
| 					 * convert it to the NULL string. | ||||
| 				 * FORCE_NOT_NULL option is set and column is NULL - convert | ||||
| 				 * it to the NULL string. | ||||
| 				 */ | ||||
| 				string = cstate->opts.null_print; | ||||
| 			} | ||||
| @@ -921,9 +982,9 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, | ||||
| 			{ | ||||
| 				/* | ||||
| 				 * FORCE_NULL option is set and column matches the NULL | ||||
| 					 * string. It must have been quoted, or otherwise the | ||||
| 					 * string would already have been set to NULL. Convert it | ||||
| 					 * to NULL as specified. | ||||
| 				 * string. It must have been quoted, or otherwise the string | ||||
| 				 * would already have been set to NULL. Convert it to NULL as | ||||
| 				 * specified. | ||||
| 				 */ | ||||
| 				string = NULL; | ||||
| 			} | ||||
| @@ -937,10 +998,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, | ||||
|  | ||||
| 		if (cstate->defaults[m]) | ||||
| 		{ | ||||
| 				/* | ||||
| 				 * The caller must supply econtext and have switched into the | ||||
| 				 * per-tuple memory context in it. | ||||
| 				 */ | ||||
| 			/* We must have switched into the per-tuple memory context */ | ||||
| 			Assert(econtext != NULL); | ||||
| 			Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); | ||||
|  | ||||
| @@ -948,8 +1006,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, | ||||
| 		} | ||||
|  | ||||
| 		/* | ||||
| 			 * If ON_ERROR is specified with IGNORE, skip rows with soft | ||||
| 			 * errors | ||||
| 		 * If ON_ERROR is specified with IGNORE, skip rows with soft errors | ||||
| 		 */ | ||||
| 		else if (!InputFunctionCallSafe(&in_functions[m], | ||||
| 										string, | ||||
| @@ -966,8 +1023,8 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, | ||||
| 			{ | ||||
| 				/* | ||||
| 				 * Since we emit line number and column info in the below | ||||
| 					 * notice message, we suppress error context information | ||||
| 					 * other than the relation name. | ||||
| 				 * notice message, we suppress error context information other | ||||
| 				 * than the relation name. | ||||
| 				 */ | ||||
| 				Assert(!cstate->relname_only); | ||||
| 				cstate->relname_only = true; | ||||
| @@ -1002,13 +1059,25 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, | ||||
| 	} | ||||
|  | ||||
| 	Assert(fieldno == attr_count); | ||||
| 	} | ||||
| 	else | ||||
| 	{ | ||||
| 		/* binary */ | ||||
|  | ||||
| 	return true; | ||||
| } | ||||
|  | ||||
| /* Implementation of the per-row callback for binary format */ | ||||
| bool | ||||
| CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, | ||||
| 					 bool *nulls) | ||||
| { | ||||
| 	TupleDesc	tupDesc; | ||||
| 	AttrNumber	attr_count; | ||||
| 	FmgrInfo   *in_functions = cstate->in_functions; | ||||
| 	Oid		   *typioparams = cstate->typioparams; | ||||
| 	int16		fld_count; | ||||
| 	ListCell   *cur; | ||||
|  | ||||
| 	tupDesc = RelationGetDescr(cstate->rel); | ||||
| 	attr_count = list_length(cstate->attnumlist); | ||||
|  | ||||
| 	cstate->cur_lineno++; | ||||
|  | ||||
| 	if (!CopyGetInt16(cstate, &fld_count)) | ||||
| @@ -1020,12 +1089,12 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, | ||||
| 	if (fld_count == -1) | ||||
| 	{ | ||||
| 		/* | ||||
| 			 * Received EOF marker.  Wait for the protocol-level EOF, and | ||||
| 			 * complain if it doesn't come immediately.  In COPY FROM STDIN, | ||||
| 			 * this ensures that we correctly handle CopyFail, if client | ||||
| 			 * chooses to send that now.  When copying from file, we could | ||||
| 			 * ignore the rest of the file like in text mode, but we choose to | ||||
| 			 * be consistent with the COPY FROM STDIN case. | ||||
| 		 * Received EOF marker.  Wait for the protocol-level EOF, and complain | ||||
| 		 * if it doesn't come immediately.  In COPY FROM STDIN, this ensures | ||||
| 		 * that we correctly handle CopyFail, if client chooses to send that | ||||
| 		 * now.  When copying from file, we could ignore the rest of the file | ||||
| 		 * like in text mode, but we choose to be consistent with the COPY | ||||
| 		 * FROM STDIN case. | ||||
| 		 */ | ||||
| 		char		dummy; | ||||
|  | ||||
| @@ -1056,25 +1125,6 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, | ||||
| 											&nulls[m]); | ||||
| 		cstate->cur_attname = NULL; | ||||
| 	} | ||||
| 	} | ||||
|  | ||||
| 	/* | ||||
| 	 * Now compute and insert any defaults available for the columns not | ||||
| 	 * provided by the input data.  Anything not processed here or above will | ||||
| 	 * remain NULL. | ||||
| 	 */ | ||||
| 	for (i = 0; i < num_defaults; i++) | ||||
| 	{ | ||||
| 		/* | ||||
| 		 * The caller must supply econtext and have switched into the | ||||
| 		 * per-tuple memory context in it. | ||||
| 		 */ | ||||
| 		Assert(econtext != NULL); | ||||
| 		Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); | ||||
|  | ||||
| 		values[defmap[i]] = ExecEvalExpr(defexprs[defmap[i]], econtext, | ||||
| 										 &nulls[defmap[i]]); | ||||
| 	} | ||||
|  | ||||
| 	return true; | ||||
| } | ||||
| @@ -1087,7 +1137,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, | ||||
|  * in the final value of line_buf. | ||||
|  */ | ||||
| static bool | ||||
| CopyReadLine(CopyFromState cstate) | ||||
| CopyReadLine(CopyFromState cstate, bool is_csv) | ||||
| { | ||||
| 	bool		result; | ||||
|  | ||||
| @@ -1095,7 +1145,7 @@ CopyReadLine(CopyFromState cstate) | ||||
| 	cstate->line_buf_valid = false; | ||||
|  | ||||
| 	/* Parse data and transfer into line_buf */ | ||||
| 	result = CopyReadLineText(cstate); | ||||
| 	result = CopyReadLineText(cstate, is_csv); | ||||
|  | ||||
| 	if (result) | ||||
| 	{ | ||||
| @@ -1163,7 +1213,7 @@ CopyReadLine(CopyFromState cstate) | ||||
|  * CopyReadLineText - inner loop of CopyReadLine for text mode | ||||
|  */ | ||||
| static bool | ||||
| CopyReadLineText(CopyFromState cstate) | ||||
| CopyReadLineText(CopyFromState cstate, bool is_csv) | ||||
| { | ||||
| 	char	   *copy_input_buf; | ||||
| 	int			input_buf_ptr; | ||||
| @@ -1178,7 +1228,7 @@ CopyReadLineText(CopyFromState cstate) | ||||
| 	char		quotec = '\0'; | ||||
| 	char		escapec = '\0'; | ||||
|  | ||||
| 	if (cstate->opts.csv_mode) | ||||
| 	if (is_csv) | ||||
| 	{ | ||||
| 		quotec = cstate->opts.quote[0]; | ||||
| 		escapec = cstate->opts.escape[0]; | ||||
| @@ -1255,7 +1305,7 @@ CopyReadLineText(CopyFromState cstate) | ||||
| 		prev_raw_ptr = input_buf_ptr; | ||||
| 		c = copy_input_buf[input_buf_ptr++]; | ||||
|  | ||||
| 		if (cstate->opts.csv_mode) | ||||
| 		if (is_csv) | ||||
| 		{ | ||||
| 			/* | ||||
| 			 * If character is '\r', we may need to look ahead below.  Force | ||||
| @@ -1294,7 +1344,7 @@ CopyReadLineText(CopyFromState cstate) | ||||
| 		} | ||||
|  | ||||
| 		/* Process \r */ | ||||
| 		if (c == '\r' && (!cstate->opts.csv_mode || !in_quote)) | ||||
| 		if (c == '\r' && (!is_csv || !in_quote)) | ||||
| 		{ | ||||
| 			/* Check for \r\n on first line, _and_ handle \r\n. */ | ||||
| 			if (cstate->eol_type == EOL_UNKNOWN || | ||||
| @@ -1322,10 +1372,10 @@ CopyReadLineText(CopyFromState cstate) | ||||
| 					if (cstate->eol_type == EOL_CRNL) | ||||
| 						ereport(ERROR, | ||||
| 								(errcode(ERRCODE_BAD_COPY_FILE_FORMAT), | ||||
| 								 !cstate->opts.csv_mode ? | ||||
| 								 !is_csv ? | ||||
| 								 errmsg("literal carriage return found in data") : | ||||
| 								 errmsg("unquoted carriage return found in data"), | ||||
| 								 !cstate->opts.csv_mode ? | ||||
| 								 !is_csv ? | ||||
| 								 errhint("Use \"\\r\" to represent carriage return.") : | ||||
| 								 errhint("Use quoted CSV field to represent carriage return."))); | ||||
|  | ||||
| @@ -1339,10 +1389,10 @@ CopyReadLineText(CopyFromState cstate) | ||||
| 			else if (cstate->eol_type == EOL_NL) | ||||
| 				ereport(ERROR, | ||||
| 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT), | ||||
| 						 !cstate->opts.csv_mode ? | ||||
| 						 !is_csv ? | ||||
| 						 errmsg("literal carriage return found in data") : | ||||
| 						 errmsg("unquoted carriage return found in data"), | ||||
| 						 !cstate->opts.csv_mode ? | ||||
| 						 !is_csv ? | ||||
| 						 errhint("Use \"\\r\" to represent carriage return.") : | ||||
| 						 errhint("Use quoted CSV field to represent carriage return."))); | ||||
| 			/* If reach here, we have found the line terminator */ | ||||
| @@ -1350,15 +1400,15 @@ CopyReadLineText(CopyFromState cstate) | ||||
| 		} | ||||
|  | ||||
| 		/* Process \n */ | ||||
| 		if (c == '\n' && (!cstate->opts.csv_mode || !in_quote)) | ||||
| 		if (c == '\n' && (!is_csv || !in_quote)) | ||||
| 		{ | ||||
| 			if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL) | ||||
| 				ereport(ERROR, | ||||
| 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT), | ||||
| 						 !cstate->opts.csv_mode ? | ||||
| 						 !is_csv ? | ||||
| 						 errmsg("literal newline found in data") : | ||||
| 						 errmsg("unquoted newline found in data"), | ||||
| 						 !cstate->opts.csv_mode ? | ||||
| 						 !is_csv ? | ||||
| 						 errhint("Use \"\\n\" to represent newline.") : | ||||
| 						 errhint("Use quoted CSV field to represent newline."))); | ||||
| 			cstate->eol_type = EOL_NL;	/* in case not set yet */ | ||||
| @@ -1370,7 +1420,7 @@ CopyReadLineText(CopyFromState cstate) | ||||
| 		 * Process backslash, except in CSV mode where backslash is a normal | ||||
| 		 * character. | ||||
| 		 */ | ||||
| 		if (c == '\\' && !cstate->opts.csv_mode) | ||||
| 		if (c == '\\' && !is_csv) | ||||
| 		{ | ||||
| 			char		c2; | ||||
|  | ||||
|   | ||||
| @@ -107,8 +107,6 @@ extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *where | ||||
| extern void EndCopyFrom(CopyFromState cstate); | ||||
| extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext, | ||||
| 						 Datum *values, bool *nulls); | ||||
| extern bool NextCopyFromRawFields(CopyFromState cstate, | ||||
| 								  char ***fields, int *nfields); | ||||
| extern void CopyFromErrorCallback(void *arg); | ||||
| extern char *CopyLimitPrintoutLength(const char *str); | ||||
|  | ||||
|   | ||||
| @@ -1,7 +1,7 @@ | ||||
| /*------------------------------------------------------------------------- | ||||
|  * | ||||
|  * copyapi.h | ||||
|  *	  API for COPY TO handlers | ||||
|  *	  API for COPY TO/FROM handlers | ||||
|  * | ||||
|  * | ||||
|  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group | ||||
| @@ -54,4 +54,52 @@ typedef struct CopyToRoutine | ||||
| 	void		(*CopyToEnd) (CopyToState cstate); | ||||
| } CopyToRoutine; | ||||
|  | ||||
| /* | ||||
|  * API structure for a COPY FROM format implementation. Note this must be | ||||
|  * allocated in a server-lifetime manner, typically as a static const struct. | ||||
|  */ | ||||
| typedef struct CopyFromRoutine | ||||
| { | ||||
| 	/* | ||||
| 	 * Set input function information. This callback is called at the | ||||
| 	 * beginning of COPY FROM; each call handles a single attribute type | ||||
| 	 * (NOTE(review): the built-in caller appears to invoke it once per | ||||
| 	 * input column rather than once per command -- confirm). | ||||
| 	 * | ||||
| 	 * 'finfo' can be optionally filled to provide the catalog information of | ||||
| 	 * the input function. | ||||
| 	 * | ||||
| 	 * 'typioparam' can be optionally filled to define the OID of the type to | ||||
| 	 * pass to the input function. | ||||
| 	 * | ||||
| 	 * 'atttypid' is the OID of data type used by the relation's attribute. | ||||
| 	 */ | ||||
| 	void		(*CopyFromInFunc) (CopyFromState cstate, Oid atttypid, | ||||
| 								   FmgrInfo *finfo, Oid *typioparam); | ||||
|  | ||||
| 	/* | ||||
| 	 * Start a COPY FROM. This callback is called once at the beginning of | ||||
| 	 * COPY FROM. | ||||
| 	 * | ||||
| 	 * 'tupDesc' is the tuple descriptor of the relation where the data needs | ||||
| 	 * to be copied. This can be used for any initialization steps required by | ||||
| 	 * a format. | ||||
| 	 */ | ||||
| 	void		(*CopyFromStart) (CopyFromState cstate, TupleDesc tupDesc); | ||||
|  | ||||
| 	/* | ||||
| 	 * Read one row from the source and fill *values and *nulls. | ||||
| 	 * | ||||
| 	 * 'econtext' is used to evaluate default expression for each column that | ||||
| 	 * is either not read from the file or is using the DEFAULT option of COPY | ||||
| 	 * FROM. It is NULL if no default values are used. | ||||
| 	 * | ||||
| 	 * Returns false if there are no more tuples to read. | ||||
| 	 */ | ||||
| 	bool		(*CopyFromOneRow) (CopyFromState cstate, ExprContext *econtext, | ||||
| 								   Datum *values, bool *nulls); | ||||
|  | ||||
| 	/* | ||||
| 	 * End a COPY FROM. This callback is called once at the end of COPY FROM. | ||||
| 	 */ | ||||
| 	void		(*CopyFromEnd) (CopyFromState cstate); | ||||
| } CopyFromRoutine; | ||||
|  | ||||
| #endif							/* COPYAPI_H */ | ||||
|   | ||||
| @@ -58,6 +58,9 @@ typedef enum CopyInsertMethod | ||||
|  */ | ||||
| typedef struct CopyFromStateData | ||||
| { | ||||
| 	/* format routine */ | ||||
| 	const struct CopyFromRoutine *routine; | ||||
|  | ||||
| 	/* low-level state data */ | ||||
| 	CopySource	copy_src;		/* type of copy source */ | ||||
| 	FILE	   *copy_file;		/* used if copy_src == COPY_FILE */ | ||||
| @@ -183,4 +186,12 @@ typedef struct CopyFromStateData | ||||
| extern void ReceiveCopyBegin(CopyFromState cstate); | ||||
| extern void ReceiveCopyBinaryHeader(CopyFromState cstate); | ||||
|  | ||||
| /* One-row callbacks for built-in formats defined in copyfromparse.c */ | ||||
| extern bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, | ||||
| 							   Datum *values, bool *nulls); | ||||
| extern bool CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, | ||||
| 							  Datum *values, bool *nulls); | ||||
| extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, | ||||
| 								 Datum *values, bool *nulls); | ||||
|  | ||||
| #endif							/* COPYFROM_INTERNAL_H */ | ||||
|   | ||||
| @@ -501,6 +501,7 @@ ConvertRowtypeExpr | ||||
| CookedConstraint | ||||
| CopyDest | ||||
| CopyFormatOptions | ||||
| CopyFromRoutine | ||||
| CopyFromState | ||||
| CopyFromStateData | ||||
| CopyHeaderChoice | ||||
|   | ||||
		Reference in New Issue
	
	Block a user