1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-25 13:17:41 +03:00

Allow batch insertion during COPY into a foreign table.

Commit 3d956d956 allowed the COPY, but it's done by inserting individual
rows to the foreign table, so it can be inefficient due to the overhead
caused by each round-trip to the foreign server.  To improve performance
of the COPY in such a case, this patch allows batch insertion, by
extending the multi-insert machinery in CopyFrom() to the foreign-table
case so that we insert multiple rows to the foreign table at once using
the FDW callback routine added by commit b663a4136.  This patch also
allows this for postgres_fdw.  It is enabled by the "batch_size" option
added by commit b663a4136, which is disabled by default.

When doing batch insertion, we update progress of the COPY command after
performing the FDW callback routine, to count rows not suppressed by the
FDW as well as a BEFORE ROW INSERT trigger.  For consistency, this patch
changes the timing of updating it for plain tables: previously, we
updated it immediately after adding each row to the multi-insert buffer,
but we do so only after writing the rows stored in the buffer out to the
table using table_multi_insert(), which I think would be consistent even
with non-batching mode, because in that mode we update it after writing
each row out to the table using table_tuple_insert().

Andrey Lepikhov, heavily revised by me, with review from Ian Barwick,
Andrey Lepikhov, and Zhihong Yu.

Discussion: https://postgr.es/m/bc489202-9855-7550-d64c-ad2d83c24867%40postgrespro.ru
This commit is contained in:
Etsuro Fujita
2022-10-13 18:45:00 +09:00
parent 56c19fee2d
commit 97da48246d
7 changed files with 464 additions and 93 deletions

View File

@@ -8608,6 +8608,39 @@ select tableoid::regclass, * FROM remp1;
remp1 | 1 | bar
(2 rows)
delete from ctrtest;
-- Test copy tuple routing with the batch_size option enabled
alter server loopback options (add batch_size '2');
copy ctrtest from stdin;
select tableoid::regclass, * FROM ctrtest;
tableoid | a | b
----------+---+-------
remp1 | 1 | foo
remp1 | 1 | bar
remp1 | 1 | test1
remp2 | 2 | baz
remp2 | 2 | qux
remp2 | 2 | test2
(6 rows)
select tableoid::regclass, * FROM remp1;
tableoid | a | b
----------+---+-------
remp1 | 1 | foo
remp1 | 1 | bar
remp1 | 1 | test1
(3 rows)
select tableoid::regclass, * FROM remp2;
tableoid | b | a
----------+-------+---
remp2 | baz | 2
remp2 | qux | 2
remp2 | test2 | 2
(3 rows)
delete from ctrtest;
alter server loopback options (drop batch_size);
drop table ctrtest;
drop table loct1;
drop table loct2;
@@ -8771,6 +8804,78 @@ select * from rem3;
drop foreign table rem3;
drop table loc3;
-- Test COPY FROM with the batch_size option enabled
alter server loopback options (add batch_size '2');
-- Test basic functionality
copy rem2 from stdin;
select * from rem2;
f1 | f2
----+-----
1 | foo
2 | bar
3 | baz
(3 rows)
delete from rem2;
-- Test check constraints
alter table loc2 add constraint loc2_f1positive check (f1 >= 0);
alter foreign table rem2 add constraint rem2_f1positive check (f1 >= 0);
-- check constraint is enforced on the remote side, not locally
copy rem2 from stdin;
copy rem2 from stdin; -- ERROR
ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive"
DETAIL: Failing row contains (-1, xyzzy).
CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2)
COPY rem2
select * from rem2;
f1 | f2
----+-----
1 | foo
2 | bar
3 | baz
(3 rows)
alter foreign table rem2 drop constraint rem2_f1positive;
alter table loc2 drop constraint loc2_f1positive;
delete from rem2;
-- Test remote triggers
create trigger trig_row_before_insert before insert on loc2
for each row execute procedure trig_row_before_insupdate();
-- The new values are concatenated with ' triggered !'
copy rem2 from stdin;
select * from rem2;
f1 | f2
----+-----------------
1 | foo triggered !
2 | bar triggered !
3 | baz triggered !
(3 rows)
drop trigger trig_row_before_insert on loc2;
delete from rem2;
create trigger trig_null before insert on loc2
for each row execute procedure trig_null();
-- Nothing happens
copy rem2 from stdin;
select * from rem2;
f1 | f2
----+----
(0 rows)
drop trigger trig_null on loc2;
delete from rem2;
-- Check with zero-column foreign table; batch insert will be disabled
alter table loc2 drop column f1;
alter table loc2 drop column f2;
alter table rem2 drop column f1;
alter table rem2 drop column f2;
copy rem2 from stdin;
select * from rem2;
--
(3 rows)
delete from rem2;
alter server loopback options (drop batch_size);
-- ===================================================================
-- test for TRUNCATE
-- ===================================================================

View File

@@ -2057,6 +2057,15 @@ postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo)
resultRelInfo->ri_TrigDesc->trig_insert_after_row)))
return 1;
/*
* If the foreign table has no columns, disable batching as the INSERT
* syntax doesn't allow batching multiple empty rows into a zero-column
* table in a single statement. This is needed for COPY FROM, in which
* case fmstate must be non-NULL.
*/
if (fmstate && list_length(fmstate->target_attrs) == 0)
return 1;
/*
* Otherwise use the batch size specified for server/table. The number of
* parameters in a batch is limited to 65535 (uint16), so make sure we

View File

@@ -2373,6 +2373,28 @@ copy remp1 from stdin;
select tableoid::regclass, * FROM remp1;
delete from ctrtest;
-- Test copy tuple routing with the batch_size option enabled
alter server loopback options (add batch_size '2');
copy ctrtest from stdin;
1 foo
1 bar
2 baz
2 qux
1 test1
2 test2
\.
select tableoid::regclass, * FROM ctrtest;
select tableoid::regclass, * FROM remp1;
select tableoid::regclass, * FROM remp2;
delete from ctrtest;
alter server loopback options (drop batch_size);
drop table ctrtest;
drop table loct1;
drop table loct2;
@@ -2527,6 +2549,86 @@ select * from rem3;
drop foreign table rem3;
drop table loc3;
-- Test COPY FROM with the batch_size option enabled
alter server loopback options (add batch_size '2');
-- Test basic functionality
copy rem2 from stdin;
1 foo
2 bar
3 baz
\.
select * from rem2;
delete from rem2;
-- Test check constraints
alter table loc2 add constraint loc2_f1positive check (f1 >= 0);
alter foreign table rem2 add constraint rem2_f1positive check (f1 >= 0);
-- check constraint is enforced on the remote side, not locally
copy rem2 from stdin;
1 foo
2 bar
3 baz
\.
copy rem2 from stdin; -- ERROR
-1 xyzzy
\.
select * from rem2;
alter foreign table rem2 drop constraint rem2_f1positive;
alter table loc2 drop constraint loc2_f1positive;
delete from rem2;
-- Test remote triggers
create trigger trig_row_before_insert before insert on loc2
for each row execute procedure trig_row_before_insupdate();
-- The new values are concatenated with ' triggered !'
copy rem2 from stdin;
1 foo
2 bar
3 baz
\.
select * from rem2;
drop trigger trig_row_before_insert on loc2;
delete from rem2;
create trigger trig_null before insert on loc2
for each row execute procedure trig_null();
-- Nothing happens
copy rem2 from stdin;
1 foo
2 bar
3 baz
\.
select * from rem2;
drop trigger trig_null on loc2;
delete from rem2;
-- Check with zero-column foreign table; batch insert will be disabled
alter table loc2 drop column f1;
alter table loc2 drop column f2;
alter table rem2 drop column f1;
alter table rem2 drop column f2;
copy rem2 from stdin;
\.
select * from rem2;
delete from rem2;
alter server loopback options (drop batch_size);
-- ===================================================================
-- test for TRUNCATE
-- ===================================================================

View File

@@ -665,7 +665,9 @@ ExecForeignBatchInsert(EState *estate,
<para>
Note that this function is also called when inserting routed tuples into
a foreign-table partition. See the callback functions
a foreign-table partition or executing <command>COPY FROM</command> on
a foreign table, in which case it is called in a different way than it
is in the <command>INSERT</command> case. See the callback functions
described below that allow the FDW to support that.
</para>

View File

@@ -398,6 +398,10 @@ OPTIONS (ADD password_required 'false');
exceeds the limit, the <literal>batch_size</literal> will be adjusted to
avoid an error.
</para>
<para>
This option also applies when copying into foreign tables.
</para>
</listitem>
</varlistentry>

View File

@@ -78,7 +78,8 @@ typedef struct CopyMultiInsertBuffer
{
TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
BulkInsertState bistate; /* BulkInsertState for this rel */
BulkInsertState bistate; /* BulkInsertState for this rel if plain
* table; NULL if foreign table */
int nused; /* number of 'slots' containing tuples */
uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
* stream */
@@ -116,6 +117,12 @@ CopyFromErrorCallback(void *arg)
{
CopyFromState cstate = (CopyFromState) arg;
if (cstate->relname_only)
{
errcontext("COPY %s",
cstate->cur_relname);
return;
}
if (cstate->opts.binary)
{
/* can't usefully display the data */
@@ -222,7 +229,7 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
buffer->resultRelInfo = rri;
buffer->bistate = GetBulkInsertState();
buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
buffer->nused = 0;
return buffer;
@@ -299,83 +306,171 @@ CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
*/
static inline void
CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
CopyMultiInsertBuffer *buffer)
CopyMultiInsertBuffer *buffer,
int64 *processed)
{
MemoryContext oldcontext;
int i;
uint64 save_cur_lineno;
CopyFromState cstate = miinfo->cstate;
EState *estate = miinfo->estate;
CommandId mycid = miinfo->mycid;
int ti_options = miinfo->ti_options;
bool line_buf_valid = cstate->line_buf_valid;
int nused = buffer->nused;
ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
TupleTableSlot **slots = buffer->slots;
int i;
/*
* Print error context information correctly, if one of the operations
* below fails.
*/
cstate->line_buf_valid = false;
save_cur_lineno = cstate->cur_lineno;
/*
* table_multi_insert may leak memory, so switch to short-lived memory
* context before calling it.
*/
oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
table_multi_insert(resultRelInfo->ri_RelationDesc,
slots,
nused,
mycid,
ti_options,
buffer->bistate);
MemoryContextSwitchTo(oldcontext);
for (i = 0; i < nused; i++)
if (resultRelInfo->ri_FdwRoutine)
{
/*
* If there are any indexes, update them for all the inserted tuples,
* and run AFTER ROW INSERT triggers.
*/
if (resultRelInfo->ri_NumIndices > 0)
{
List *recheckIndexes;
int batch_size = resultRelInfo->ri_BatchSize;
int sent = 0;
cstate->cur_lineno = buffer->linenos[i];
recheckIndexes =
ExecInsertIndexTuples(resultRelInfo,
buffer->slots[i], estate, false, false,
NULL, NIL);
ExecARInsertTriggers(estate, resultRelInfo,
slots[i], recheckIndexes,
cstate->transition_capture);
list_free(recheckIndexes);
}
Assert(buffer->bistate == NULL);
/* Ensure that the FDW supports batching and it's enabled */
Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
Assert(batch_size > 1);
/*
* There's no indexes, but see if we need to run AFTER ROW INSERT
* triggers anyway.
* We suppress error context information other than the relation name,
* if one of the operations below fails.
*/
else if (resultRelInfo->ri_TrigDesc != NULL &&
(resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
resultRelInfo->ri_TrigDesc->trig_insert_new_table))
Assert(!cstate->relname_only);
cstate->relname_only = true;
while (sent < nused)
{
cstate->cur_lineno = buffer->linenos[i];
ExecARInsertTriggers(estate, resultRelInfo,
slots[i], NIL, cstate->transition_capture);
int size = (batch_size < nused - sent) ? batch_size : (nused - sent);
int inserted = size;
TupleTableSlot **rslots;
/* insert into foreign table: let the FDW do it */
rslots =
resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
resultRelInfo,
&slots[sent],
NULL,
&inserted);
sent += size;
/* No need to do anything if there are no inserted rows */
if (inserted <= 0)
continue;
/* Triggers on foreign tables should not have transition tables */
Assert(resultRelInfo->ri_TrigDesc == NULL ||
resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
/* Run AFTER ROW INSERT triggers */
if (resultRelInfo->ri_TrigDesc != NULL &&
resultRelInfo->ri_TrigDesc->trig_insert_after_row)
{
Oid relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
for (i = 0; i < inserted; i++)
{
TupleTableSlot *slot = rslots[i];
/*
* AFTER ROW Triggers might reference the tableoid column,
* so (re-)initialize tts_tableOid before evaluating them.
*/
slot->tts_tableOid = relid;
ExecARInsertTriggers(estate, resultRelInfo,
slot, NIL,
cstate->transition_capture);
}
}
/* Update the row counter and progress of the COPY command */
*processed += inserted;
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
*processed);
}
ExecClearTuple(slots[i]);
for (i = 0; i < nused; i++)
ExecClearTuple(slots[i]);
/* reset relname_only */
cstate->relname_only = false;
}
else
{
CommandId mycid = miinfo->mycid;
int ti_options = miinfo->ti_options;
bool line_buf_valid = cstate->line_buf_valid;
uint64 save_cur_lineno = cstate->cur_lineno;
MemoryContext oldcontext;
Assert(buffer->bistate != NULL);
/*
* Print error context information correctly, if one of the operations
* below fails.
*/
cstate->line_buf_valid = false;
/*
* table_multi_insert may leak memory, so switch to short-lived memory
* context before calling it.
*/
oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
table_multi_insert(resultRelInfo->ri_RelationDesc,
slots,
nused,
mycid,
ti_options,
buffer->bistate);
MemoryContextSwitchTo(oldcontext);
for (i = 0; i < nused; i++)
{
/*
* If there are any indexes, update them for all the inserted
* tuples, and run AFTER ROW INSERT triggers.
*/
if (resultRelInfo->ri_NumIndices > 0)
{
List *recheckIndexes;
cstate->cur_lineno = buffer->linenos[i];
recheckIndexes =
ExecInsertIndexTuples(resultRelInfo,
buffer->slots[i], estate, false,
false, NULL, NIL);
ExecARInsertTriggers(estate, resultRelInfo,
slots[i], recheckIndexes,
cstate->transition_capture);
list_free(recheckIndexes);
}
/*
* There's no indexes, but see if we need to run AFTER ROW INSERT
* triggers anyway.
*/
else if (resultRelInfo->ri_TrigDesc != NULL &&
(resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
resultRelInfo->ri_TrigDesc->trig_insert_new_table))
{
cstate->cur_lineno = buffer->linenos[i];
ExecARInsertTriggers(estate, resultRelInfo,
slots[i], NIL,
cstate->transition_capture);
}
ExecClearTuple(slots[i]);
}
/* Update the row counter and progress of the COPY command */
*processed += nused;
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
*processed);
/* reset cur_lineno and line_buf_valid to what they were */
cstate->line_buf_valid = line_buf_valid;
cstate->cur_lineno = save_cur_lineno;
}
/* Mark that all slots are free */
buffer->nused = 0;
/* reset cur_lineno and line_buf_valid to what they were */
cstate->line_buf_valid = line_buf_valid;
cstate->cur_lineno = save_cur_lineno;
}
/*
@@ -387,22 +482,30 @@ static inline void
CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
CopyMultiInsertBuffer *buffer)
{
ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
int i;
/* Ensure buffer was flushed */
Assert(buffer->nused == 0);
/* Remove back-link to ourself */
buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
FreeBulkInsertState(buffer->bistate);
if (resultRelInfo->ri_FdwRoutine == NULL)
{
Assert(buffer->bistate != NULL);
FreeBulkInsertState(buffer->bistate);
}
else
Assert(buffer->bistate == NULL);
/* Since we only create slots on demand, just drop the non-null ones. */
for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
ExecDropSingleTupleTableSlot(buffer->slots[i]);
table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
miinfo->ti_options);
if (resultRelInfo->ri_FdwRoutine == NULL)
table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
miinfo->ti_options);
pfree(buffer);
}
@@ -418,7 +521,8 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
* 'curr_rri'.
*/
static inline void
CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
int64 *processed)
{
ListCell *lc;
@@ -426,7 +530,7 @@ CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
{
CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
CopyMultiInsertBufferFlush(miinfo, buffer);
CopyMultiInsertBufferFlush(miinfo, buffer, processed);
}
miinfo->bufferedTuples = 0;
@@ -679,6 +783,23 @@ CopyFrom(CopyFromState cstate)
resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
resultRelInfo);
/*
* Also, if the named relation is a foreign table, determine if the FDW
* supports batch insert and determine the batch size (a FDW may support
* batching, but it may be disabled for the server/table).
*
* If the FDW does not support batching, we set the batch size to 1.
*/
if (resultRelInfo->ri_FdwRoutine != NULL &&
resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
resultRelInfo->ri_BatchSize =
resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
else
resultRelInfo->ri_BatchSize = 1;
Assert(resultRelInfo->ri_BatchSize >= 1);
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery();
@@ -708,10 +829,11 @@ CopyFrom(CopyFromState cstate)
/*
* It's generally more efficient to prepare a bunch of tuples for
* insertion, and insert them in one table_multi_insert() call, than call
* table_tuple_insert() separately for every tuple. However, there are a
* number of reasons why we might not be able to do this. These are
* explained below.
* insertion, and insert them in one
* table_multi_insert()/ExecForeignBatchInsert() call, than call
* table_tuple_insert()/ExecForeignInsert() separately for every tuple.
* However, there are a number of reasons why we might not be able to do
* this. These are explained below.
*/
if (resultRelInfo->ri_TrigDesc != NULL &&
(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
@@ -725,6 +847,15 @@ CopyFrom(CopyFromState cstate)
*/
insertMethod = CIM_SINGLE;
}
else if (resultRelInfo->ri_FdwRoutine != NULL &&
resultRelInfo->ri_BatchSize == 1)
{
/*
* Can't support multi-inserts to a foreign table if the FDW does not
* support batching, or it's disabled for the server or foreign table.
*/
insertMethod = CIM_SINGLE;
}
else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
resultRelInfo->ri_TrigDesc->trig_insert_new_table)
{
@@ -737,14 +868,12 @@ CopyFrom(CopyFromState cstate)
*/
insertMethod = CIM_SINGLE;
}
else if (resultRelInfo->ri_FdwRoutine != NULL ||
cstate->volatile_defexprs)
else if (cstate->volatile_defexprs)
{
/*
* Can't support multi-inserts to foreign tables or if there are any
* volatile default expressions in the table. Similarly to the
* trigger case above, such expressions may query the table we're
* inserting into.
* Can't support multi-inserts if there are any volatile default
* expressions in the table. Similarly to the trigger case above,
* such expressions may query the table we're inserting into.
*
* Note: It does not matter if any partitions have any volatile
* default expressions as we use the defaults from the target of the
@@ -767,13 +896,14 @@ CopyFrom(CopyFromState cstate)
* For partitioned tables, we may still be able to perform bulk
* inserts. However, the possibility of this depends on which types
* of triggers exist on the partition. We must disable bulk inserts
* if the partition is a foreign table or it has any before row insert
* or insert instead triggers (same as we checked above for the parent
* table). Since the partition's resultRelInfos are initialized only
* when we actually need to insert the first tuple into them, we must
* have the intermediate insert method of CIM_MULTI_CONDITIONAL to
* flag that we must later determine if we can use bulk-inserts for
* the partition being inserted into.
* if the partition is a foreign table that can't use batching or it
* has any before row insert or insert instead triggers (same as we
* checked above for the parent table). Since the partition's
* resultRelInfos are initialized only when we actually need to insert
* the first tuple into them, we must have the intermediate insert
* method of CIM_MULTI_CONDITIONAL to flag that we must later
* determine if we can use bulk-inserts for the partition being
* inserted into.
*/
if (proute)
insertMethod = CIM_MULTI_CONDITIONAL;
@@ -910,12 +1040,14 @@ CopyFrom(CopyFromState cstate)
/*
* Disable multi-inserts when the partition has BEFORE/INSTEAD
* OF triggers, or if the partition is a foreign partition.
* OF triggers, or if the partition is a foreign table that
* can't use batching.
*/
leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
!has_before_insert_row_trig &&
!has_instead_insert_row_trig &&
resultRelInfo->ri_FdwRoutine == NULL;
(resultRelInfo->ri_FdwRoutine == NULL ||
resultRelInfo->ri_BatchSize > 1);
/* Set the multi-insert buffer to use for this partition. */
if (leafpart_use_multi_insert)
@@ -931,7 +1063,9 @@ CopyFrom(CopyFromState cstate)
* Flush pending inserts if this partition can't use
* batching, so rows are visible to triggers etc.
*/
CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
CopyMultiInsertInfoFlush(&multiInsertInfo,
resultRelInfo,
&processed);
}
if (bistate != NULL)
@@ -1067,7 +1201,17 @@ CopyFrom(CopyFromState cstate)
* buffers out to their tables.
*/
if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
CopyMultiInsertInfoFlush(&multiInsertInfo,
resultRelInfo,
&processed);
/*
* We delay updating the row counter and progress of the
* COPY command until after writing the tuples stored in
* the buffer out to the table, as in single insert mode.
* See CopyMultiInsertBufferFlush().
*/
continue; /* next tuple please */
}
else
{
@@ -1130,7 +1274,7 @@ CopyFrom(CopyFromState cstate)
if (insertMethod != CIM_SINGLE)
{
if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
}
/* Done, clean up */
@@ -1348,6 +1492,7 @@ BeginCopyFrom(ParseState *pstate,
cstate->cur_lineno = 0;
cstate->cur_attname = NULL;
cstate->cur_attval = NULL;
cstate->relname_only = false;
/*
* Allocate buffers for the input pipeline.

View File

@@ -40,13 +40,16 @@ typedef enum EolType
} EolType;
/*
* Represents the heap insert method to be used during COPY FROM.
* Represents the insert method to be used during COPY FROM.
*/
typedef enum CopyInsertMethod
{
CIM_SINGLE, /* use table_tuple_insert or fdw routine */
CIM_MULTI, /* always use table_multi_insert */
CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */
CIM_SINGLE, /* use table_tuple_insert or
* ExecForeignInsert */
CIM_MULTI, /* always use table_multi_insert or
* ExecForeignBatchInsert */
CIM_MULTI_CONDITIONAL /* use table_multi_insert or
* ExecForeignBatchInsert only if valid */
} CopyInsertMethod;
/*
@@ -81,6 +84,7 @@ typedef struct CopyFromStateData
uint64 cur_lineno; /* line number for error messages */
const char *cur_attname; /* current att for error messages */
const char *cur_attval; /* current att value for error messages */
bool relname_only; /* don't output line number, att, etc. */
/*
* Working state