mirror of
https://github.com/postgres/postgres.git
synced 2025-05-01 01:04:50 +03:00
Reported-by: Michael Paquier Discussion: https://postgr.es/m/ZZKTDPxBBMt3C0J9@paquier.xyz Backpatch-through: 12
1793 lines
54 KiB
C
1793 lines
54 KiB
C
/*-------------------------------------------------------------------------
|
|
*
|
|
* copyfrom.c
|
|
* COPY <table> FROM file/program/client
|
|
*
|
|
* This file contains routines needed to efficiently load tuples into a
|
|
* table. That includes looking up the correct partition, firing triggers,
|
|
* calling the table AM function to insert the data, and updating indexes.
|
|
* Reading data from the input file or client and parsing it into Datums
|
|
* is handled in copyfromparse.c.
|
|
*
|
|
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
|
|
* Portions Copyright (c) 1994, Regents of the University of California
|
|
*
|
|
*
|
|
* IDENTIFICATION
|
|
* src/backend/commands/copyfrom.c
|
|
*
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
#include "postgres.h"
|
|
|
|
#include <ctype.h>
|
|
#include <unistd.h>
|
|
#include <sys/stat.h>
|
|
|
|
#include "access/heapam.h"
|
|
#include "access/htup_details.h"
|
|
#include "access/tableam.h"
|
|
#include "access/xact.h"
|
|
#include "access/xlog.h"
|
|
#include "catalog/namespace.h"
|
|
#include "commands/copy.h"
|
|
#include "commands/copyfrom_internal.h"
|
|
#include "commands/progress.h"
|
|
#include "commands/trigger.h"
|
|
#include "executor/execPartition.h"
|
|
#include "executor/executor.h"
|
|
#include "executor/nodeModifyTable.h"
|
|
#include "executor/tuptable.h"
|
|
#include "foreign/fdwapi.h"
|
|
#include "libpq/libpq.h"
|
|
#include "libpq/pqformat.h"
|
|
#include "miscadmin.h"
|
|
#include "optimizer/optimizer.h"
|
|
#include "pgstat.h"
|
|
#include "rewrite/rewriteHandler.h"
|
|
#include "storage/fd.h"
|
|
#include "tcop/tcopprot.h"
|
|
#include "utils/lsyscache.h"
|
|
#include "utils/memutils.h"
|
|
#include "utils/portal.h"
|
|
#include "utils/rel.h"
|
|
#include "utils/snapmgr.h"
|
|
|
|
/*
 * Upper bound on the number of tuples one CopyMultiInsertBuffer may hold;
 * it sizes the per-buffer 'slots' and 'linenos' arrays and is also the
 * tuple-count threshold at which the buffers are reported as full.
 *
 * Caution: keep this modest.  As many as this number of
 * CopyMultiInsertBuffer items can accumulate in CopyMultiInsertInfo's
 * multiInsertBuffers list, so raising it can make memory requirements grow
 * quadratically when copying into a partitioned table that has a large
 * number of partitions.
 */
#define MAX_BUFFERED_TUPLES		1000
|
|
|
|
/*
 * Byte threshold for flushing: once the buffered tuples add up to at least
 * this many bytes (measured by their input size), the buffers are flushed.
 */
#define MAX_BUFFERED_BYTES		65535
|
|
|
|
/*
 * After a flush, the list of tracked buffers is cut back so that it holds
 * no more than this many entries.
 */
#define MAX_PARTITION_BUFFERS	32
|
|
|
|
/* Stores multi-insert data related to a single relation in CopyFrom. */
|
|
typedef struct CopyMultiInsertBuffer
|
|
{
|
|
TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
|
|
ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
|
|
BulkInsertState bistate; /* BulkInsertState for this rel if plain
|
|
* table; NULL if foreign table */
|
|
int nused; /* number of 'slots' containing tuples */
|
|
uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
|
|
* stream */
|
|
} CopyMultiInsertBuffer;
|
|
|
|
/*
|
|
* Stores one or many CopyMultiInsertBuffers and details about the size and
|
|
* number of tuples which are stored in them. This allows multiple buffers to
|
|
* exist at once when COPYing into a partitioned table.
|
|
*/
|
|
typedef struct CopyMultiInsertInfo
|
|
{
|
|
List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
|
|
int bufferedTuples; /* number of tuples buffered over all buffers */
|
|
int bufferedBytes; /* number of bytes from all buffered tuples */
|
|
CopyFromState cstate; /* Copy state for this CopyMultiInsertInfo */
|
|
EState *estate; /* Executor state used for COPY */
|
|
CommandId mycid; /* Command Id used for COPY */
|
|
int ti_options; /* table insert options */
|
|
} CopyMultiInsertInfo;
|
|
|
|
|
|
/* non-export function prototypes */
|
|
static char *limit_printout_length(const char *str);
|
|
|
|
static void ClosePipeFromProgram(CopyFromState cstate);
|
|
|
|
/*
|
|
* error context callback for COPY FROM
|
|
*
|
|
* The argument for the error context must be CopyFromState.
|
|
*/
|
|
void
|
|
CopyFromErrorCallback(void *arg)
|
|
{
|
|
CopyFromState cstate = (CopyFromState) arg;
|
|
|
|
if (cstate->relname_only)
|
|
{
|
|
errcontext("COPY %s",
|
|
cstate->cur_relname);
|
|
return;
|
|
}
|
|
if (cstate->opts.binary)
|
|
{
|
|
/* can't usefully display the data */
|
|
if (cstate->cur_attname)
|
|
errcontext("COPY %s, line %llu, column %s",
|
|
cstate->cur_relname,
|
|
(unsigned long long) cstate->cur_lineno,
|
|
cstate->cur_attname);
|
|
else
|
|
errcontext("COPY %s, line %llu",
|
|
cstate->cur_relname,
|
|
(unsigned long long) cstate->cur_lineno);
|
|
}
|
|
else
|
|
{
|
|
if (cstate->cur_attname && cstate->cur_attval)
|
|
{
|
|
/* error is relevant to a particular column */
|
|
char *attval;
|
|
|
|
attval = limit_printout_length(cstate->cur_attval);
|
|
errcontext("COPY %s, line %llu, column %s: \"%s\"",
|
|
cstate->cur_relname,
|
|
(unsigned long long) cstate->cur_lineno,
|
|
cstate->cur_attname,
|
|
attval);
|
|
pfree(attval);
|
|
}
|
|
else if (cstate->cur_attname)
|
|
{
|
|
/* error is relevant to a particular column, value is NULL */
|
|
errcontext("COPY %s, line %llu, column %s: null input",
|
|
cstate->cur_relname,
|
|
(unsigned long long) cstate->cur_lineno,
|
|
cstate->cur_attname);
|
|
}
|
|
else
|
|
{
|
|
/*
|
|
* Error is relevant to a particular line.
|
|
*
|
|
* If line_buf still contains the correct line, print it.
|
|
*/
|
|
if (cstate->line_buf_valid)
|
|
{
|
|
char *lineval;
|
|
|
|
lineval = limit_printout_length(cstate->line_buf.data);
|
|
errcontext("COPY %s, line %llu: \"%s\"",
|
|
cstate->cur_relname,
|
|
(unsigned long long) cstate->cur_lineno, lineval);
|
|
pfree(lineval);
|
|
}
|
|
else
|
|
{
|
|
errcontext("COPY %s, line %llu",
|
|
cstate->cur_relname,
|
|
(unsigned long long) cstate->cur_lineno);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
 * Keep the amount of COPY data shown in a message within reason.
 *
 * The result is always a freshly allocated string: a pstrdup'd copy when
 * 'str' is at most MAX_COPY_DATA_DISPLAY bytes long, otherwise a prefix
 * clipped by pg_mbcliplen() with "..." appended to mark the truncation.
 */
static char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			full_len = strlen(str);
	int			keep;
	char	   *clipped;

	/* Short enough already: hand back a plain copy */
	if (full_len <= MAX_COPY_DATA_DISPLAY)
		return pstrdup(str);

	/* Let the encoding-aware helper decide where to cut */
	keep = pg_mbcliplen(str, full_len, MAX_COPY_DATA_DISPLAY);

	/* Room for the kept bytes, the three dots and the terminating NUL */
	clipped = (char *) palloc(keep + 4);
	memcpy(clipped, str, keep);
	memcpy(clipped + keep, "...", 4);

	return clipped;
}
|
|
|
|
/*
|
|
* Allocate memory and initialize a new CopyMultiInsertBuffer for this
|
|
* ResultRelInfo.
|
|
*/
|
|
static CopyMultiInsertBuffer *
|
|
CopyMultiInsertBufferInit(ResultRelInfo *rri)
|
|
{
|
|
CopyMultiInsertBuffer *buffer;
|
|
|
|
buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
|
|
memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
|
|
buffer->resultRelInfo = rri;
|
|
buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
|
|
buffer->nused = 0;
|
|
|
|
return buffer;
|
|
}
|
|
|
|
/*
|
|
* Make a new buffer for this ResultRelInfo.
|
|
*/
|
|
static inline void
|
|
CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
|
|
ResultRelInfo *rri)
|
|
{
|
|
CopyMultiInsertBuffer *buffer;
|
|
|
|
buffer = CopyMultiInsertBufferInit(rri);
|
|
|
|
/* Setup back-link so we can easily find this buffer again */
|
|
rri->ri_CopyMultiInsertBuffer = buffer;
|
|
/* Record that we're tracking this buffer */
|
|
miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
|
|
}
|
|
|
|
/*
|
|
* Initialize an already allocated CopyMultiInsertInfo.
|
|
*
|
|
* If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
|
|
* for that table.
|
|
*/
|
|
static void
|
|
CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
|
|
CopyFromState cstate, EState *estate, CommandId mycid,
|
|
int ti_options)
|
|
{
|
|
miinfo->multiInsertBuffers = NIL;
|
|
miinfo->bufferedTuples = 0;
|
|
miinfo->bufferedBytes = 0;
|
|
miinfo->cstate = cstate;
|
|
miinfo->estate = estate;
|
|
miinfo->mycid = mycid;
|
|
miinfo->ti_options = ti_options;
|
|
|
|
/*
|
|
* Only setup the buffer when not dealing with a partitioned table.
|
|
* Buffers for partitioned tables will just be setup when we need to send
|
|
* tuples their way for the first time.
|
|
*/
|
|
if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
|
|
CopyMultiInsertInfoSetupBuffer(miinfo, rri);
|
|
}
|
|
|
|
/*
|
|
* Returns true if the buffers are full
|
|
*/
|
|
static inline bool
|
|
CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
|
|
{
|
|
if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
|
|
miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
|
|
return true;
|
|
return false;
|
|
}
|
|
|
|
/*
|
|
* Returns true if we have no buffered tuples
|
|
*/
|
|
static inline bool
|
|
CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
|
|
{
|
|
return miinfo->bufferedTuples == 0;
|
|
}
|
|
|
|
/*
|
|
* Write the tuples stored in 'buffer' out to the table.
|
|
*/
|
|
static inline void
|
|
CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
|
|
CopyMultiInsertBuffer *buffer,
|
|
int64 *processed)
|
|
{
|
|
CopyFromState cstate = miinfo->cstate;
|
|
EState *estate = miinfo->estate;
|
|
int nused = buffer->nused;
|
|
ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
|
|
TupleTableSlot **slots = buffer->slots;
|
|
int i;
|
|
|
|
if (resultRelInfo->ri_FdwRoutine)
|
|
{
|
|
int batch_size = resultRelInfo->ri_BatchSize;
|
|
int sent = 0;
|
|
|
|
Assert(buffer->bistate == NULL);
|
|
|
|
/* Ensure that the FDW supports batching and it's enabled */
|
|
Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
|
|
Assert(batch_size > 1);
|
|
|
|
/*
|
|
* We suppress error context information other than the relation name,
|
|
* if one of the operations below fails.
|
|
*/
|
|
Assert(!cstate->relname_only);
|
|
cstate->relname_only = true;
|
|
|
|
while (sent < nused)
|
|
{
|
|
int size = (batch_size < nused - sent) ? batch_size : (nused - sent);
|
|
int inserted = size;
|
|
TupleTableSlot **rslots;
|
|
|
|
/* insert into foreign table: let the FDW do it */
|
|
rslots =
|
|
resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
|
|
resultRelInfo,
|
|
&slots[sent],
|
|
NULL,
|
|
&inserted);
|
|
|
|
sent += size;
|
|
|
|
/* No need to do anything if there are no inserted rows */
|
|
if (inserted <= 0)
|
|
continue;
|
|
|
|
/* Triggers on foreign tables should not have transition tables */
|
|
Assert(resultRelInfo->ri_TrigDesc == NULL ||
|
|
resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
|
|
|
|
/* Run AFTER ROW INSERT triggers */
|
|
if (resultRelInfo->ri_TrigDesc != NULL &&
|
|
resultRelInfo->ri_TrigDesc->trig_insert_after_row)
|
|
{
|
|
Oid relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
|
|
|
|
for (i = 0; i < inserted; i++)
|
|
{
|
|
TupleTableSlot *slot = rslots[i];
|
|
|
|
/*
|
|
* AFTER ROW Triggers might reference the tableoid column,
|
|
* so (re-)initialize tts_tableOid before evaluating them.
|
|
*/
|
|
slot->tts_tableOid = relid;
|
|
|
|
ExecARInsertTriggers(estate, resultRelInfo,
|
|
slot, NIL,
|
|
cstate->transition_capture);
|
|
}
|
|
}
|
|
|
|
/* Update the row counter and progress of the COPY command */
|
|
*processed += inserted;
|
|
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
|
|
*processed);
|
|
}
|
|
|
|
for (i = 0; i < nused; i++)
|
|
ExecClearTuple(slots[i]);
|
|
|
|
/* reset relname_only */
|
|
cstate->relname_only = false;
|
|
}
|
|
else
|
|
{
|
|
CommandId mycid = miinfo->mycid;
|
|
int ti_options = miinfo->ti_options;
|
|
bool line_buf_valid = cstate->line_buf_valid;
|
|
uint64 save_cur_lineno = cstate->cur_lineno;
|
|
MemoryContext oldcontext;
|
|
|
|
Assert(buffer->bistate != NULL);
|
|
|
|
/*
|
|
* Print error context information correctly, if one of the operations
|
|
* below fails.
|
|
*/
|
|
cstate->line_buf_valid = false;
|
|
|
|
/*
|
|
* table_multi_insert may leak memory, so switch to short-lived memory
|
|
* context before calling it.
|
|
*/
|
|
oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
|
|
table_multi_insert(resultRelInfo->ri_RelationDesc,
|
|
slots,
|
|
nused,
|
|
mycid,
|
|
ti_options,
|
|
buffer->bistate);
|
|
MemoryContextSwitchTo(oldcontext);
|
|
|
|
for (i = 0; i < nused; i++)
|
|
{
|
|
/*
|
|
* If there are any indexes, update them for all the inserted
|
|
* tuples, and run AFTER ROW INSERT triggers.
|
|
*/
|
|
if (resultRelInfo->ri_NumIndices > 0)
|
|
{
|
|
List *recheckIndexes;
|
|
|
|
cstate->cur_lineno = buffer->linenos[i];
|
|
recheckIndexes =
|
|
ExecInsertIndexTuples(resultRelInfo,
|
|
buffer->slots[i], estate, false,
|
|
false, NULL, NIL, false);
|
|
ExecARInsertTriggers(estate, resultRelInfo,
|
|
slots[i], recheckIndexes,
|
|
cstate->transition_capture);
|
|
list_free(recheckIndexes);
|
|
}
|
|
|
|
/*
|
|
* There's no indexes, but see if we need to run AFTER ROW INSERT
|
|
* triggers anyway.
|
|
*/
|
|
else if (resultRelInfo->ri_TrigDesc != NULL &&
|
|
(resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
|
|
resultRelInfo->ri_TrigDesc->trig_insert_new_table))
|
|
{
|
|
cstate->cur_lineno = buffer->linenos[i];
|
|
ExecARInsertTriggers(estate, resultRelInfo,
|
|
slots[i], NIL,
|
|
cstate->transition_capture);
|
|
}
|
|
|
|
ExecClearTuple(slots[i]);
|
|
}
|
|
|
|
/* Update the row counter and progress of the COPY command */
|
|
*processed += nused;
|
|
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
|
|
*processed);
|
|
|
|
/* reset cur_lineno and line_buf_valid to what they were */
|
|
cstate->line_buf_valid = line_buf_valid;
|
|
cstate->cur_lineno = save_cur_lineno;
|
|
}
|
|
|
|
/* Mark that all slots are free */
|
|
buffer->nused = 0;
|
|
}
|
|
|
|
/*
|
|
* Drop used slots and free member for this buffer.
|
|
*
|
|
* The buffer must be flushed before cleanup.
|
|
*/
|
|
static inline void
|
|
CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
|
|
CopyMultiInsertBuffer *buffer)
|
|
{
|
|
ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
|
|
int i;
|
|
|
|
/* Ensure buffer was flushed */
|
|
Assert(buffer->nused == 0);
|
|
|
|
/* Remove back-link to ourself */
|
|
resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
|
|
|
|
if (resultRelInfo->ri_FdwRoutine == NULL)
|
|
{
|
|
Assert(buffer->bistate != NULL);
|
|
FreeBulkInsertState(buffer->bistate);
|
|
}
|
|
else
|
|
Assert(buffer->bistate == NULL);
|
|
|
|
/* Since we only create slots on demand, just drop the non-null ones. */
|
|
for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
|
|
ExecDropSingleTupleTableSlot(buffer->slots[i]);
|
|
|
|
if (resultRelInfo->ri_FdwRoutine == NULL)
|
|
table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
|
|
miinfo->ti_options);
|
|
|
|
pfree(buffer);
|
|
}
|
|
|
|
/*
|
|
* Write out all stored tuples in all buffers out to the tables.
|
|
*
|
|
* Once flushed we also trim the tracked buffers list down to size by removing
|
|
* the buffers created earliest first.
|
|
*
|
|
* Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
|
|
* used. When cleaning up old buffers we'll never remove the one for
|
|
* 'curr_rri'.
|
|
*/
|
|
static inline void
|
|
CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
|
|
int64 *processed)
|
|
{
|
|
ListCell *lc;
|
|
|
|
foreach(lc, miinfo->multiInsertBuffers)
|
|
{
|
|
CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
|
|
|
|
CopyMultiInsertBufferFlush(miinfo, buffer, processed);
|
|
}
|
|
|
|
miinfo->bufferedTuples = 0;
|
|
miinfo->bufferedBytes = 0;
|
|
|
|
/*
|
|
* Trim the list of tracked buffers down if it exceeds the limit. Here we
|
|
* remove buffers starting with the ones we created first. It seems less
|
|
* likely that these older ones will be needed than the ones that were
|
|
* just created.
|
|
*/
|
|
while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
|
|
{
|
|
CopyMultiInsertBuffer *buffer;
|
|
|
|
buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
|
|
|
|
/*
|
|
* We never want to remove the buffer that's currently being used, so
|
|
* if we happen to find that then move it to the end of the list.
|
|
*/
|
|
if (buffer->resultRelInfo == curr_rri)
|
|
{
|
|
miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
|
|
miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
|
|
buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
|
|
}
|
|
|
|
CopyMultiInsertBufferCleanup(miinfo, buffer);
|
|
miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Cleanup allocated buffers and free memory
|
|
*/
|
|
static inline void
|
|
CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
|
|
{
|
|
ListCell *lc;
|
|
|
|
foreach(lc, miinfo->multiInsertBuffers)
|
|
CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
|
|
|
|
list_free(miinfo->multiInsertBuffers);
|
|
}
|
|
|
|
/*
|
|
* Get the next TupleTableSlot that the next tuple should be stored in.
|
|
*
|
|
* Callers must ensure that the buffer is not full.
|
|
*
|
|
* Note: 'miinfo' is unused but has been included for consistency with the
|
|
* other functions in this area.
|
|
*/
|
|
static inline TupleTableSlot *
|
|
CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
|
|
ResultRelInfo *rri)
|
|
{
|
|
CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
|
|
int nused = buffer->nused;
|
|
|
|
Assert(buffer != NULL);
|
|
Assert(nused < MAX_BUFFERED_TUPLES);
|
|
|
|
if (buffer->slots[nused] == NULL)
|
|
buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
|
|
return buffer->slots[nused];
|
|
}
|
|
|
|
/*
|
|
* Record the previously reserved TupleTableSlot that was reserved by
|
|
* CopyMultiInsertInfoNextFreeSlot as being consumed.
|
|
*/
|
|
static inline void
|
|
CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
|
|
TupleTableSlot *slot, int tuplen, uint64 lineno)
|
|
{
|
|
CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
|
|
|
|
Assert(buffer != NULL);
|
|
Assert(slot == buffer->slots[buffer->nused]);
|
|
|
|
/* Store the line number so we can properly report any errors later */
|
|
buffer->linenos[buffer->nused] = lineno;
|
|
|
|
/* Record this slot as being used */
|
|
buffer->nused++;
|
|
|
|
/* Update how many tuples are stored and their size */
|
|
miinfo->bufferedTuples++;
|
|
miinfo->bufferedBytes += tuplen;
|
|
}
|
|
|
|
/*
|
|
* Copy FROM file to relation.
|
|
*/
|
|
uint64
|
|
CopyFrom(CopyFromState cstate)
|
|
{
|
|
ResultRelInfo *resultRelInfo;
|
|
ResultRelInfo *target_resultRelInfo;
|
|
ResultRelInfo *prevResultRelInfo = NULL;
|
|
EState *estate = CreateExecutorState(); /* for ExecConstraints() */
|
|
ModifyTableState *mtstate;
|
|
ExprContext *econtext;
|
|
TupleTableSlot *singleslot = NULL;
|
|
MemoryContext oldcontext = CurrentMemoryContext;
|
|
|
|
PartitionTupleRouting *proute = NULL;
|
|
ErrorContextCallback errcallback;
|
|
CommandId mycid = GetCurrentCommandId(true);
|
|
int ti_options = 0; /* start with default options for insert */
|
|
BulkInsertState bistate = NULL;
|
|
CopyInsertMethod insertMethod;
|
|
CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
|
|
int64 processed = 0;
|
|
int64 excluded = 0;
|
|
bool has_before_insert_row_trig;
|
|
bool has_instead_insert_row_trig;
|
|
bool leafpart_use_multi_insert = false;
|
|
|
|
Assert(cstate->rel);
|
|
Assert(list_length(cstate->range_table) == 1);
|
|
|
|
/*
|
|
* The target must be a plain, foreign, or partitioned relation, or have
|
|
* an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
|
|
* allowed on views, so we only hint about them in the view case.)
|
|
*/
|
|
if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
|
|
cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
|
|
cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
|
|
!(cstate->rel->trigdesc &&
|
|
cstate->rel->trigdesc->trig_insert_instead_row))
|
|
{
|
|
if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
|
errmsg("cannot copy to view \"%s\"",
|
|
RelationGetRelationName(cstate->rel)),
|
|
errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
|
|
else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
|
errmsg("cannot copy to materialized view \"%s\"",
|
|
RelationGetRelationName(cstate->rel))));
|
|
else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
|
errmsg("cannot copy to sequence \"%s\"",
|
|
RelationGetRelationName(cstate->rel))));
|
|
else
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
|
errmsg("cannot copy to non-table relation \"%s\"",
|
|
RelationGetRelationName(cstate->rel))));
|
|
}
|
|
|
|
/*
|
|
* If the target file is new-in-transaction, we assume that checking FSM
|
|
* for free space is a waste of time. This could possibly be wrong, but
|
|
* it's unlikely.
|
|
*/
|
|
if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
|
|
(cstate->rel->rd_createSubid != InvalidSubTransactionId ||
|
|
cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId))
|
|
ti_options |= TABLE_INSERT_SKIP_FSM;
|
|
|
|
/*
|
|
* Optimize if new relation storage was created in this subxact or one of
|
|
* its committed children and we won't see those rows later as part of an
|
|
* earlier scan or command. The subxact test ensures that if this subxact
|
|
* aborts then the frozen rows won't be visible after xact cleanup. Note
|
|
* that the stronger test of exactly which subtransaction created it is
|
|
* crucial for correctness of this optimization. The test for an earlier
|
|
* scan or command tolerates false negatives. FREEZE causes other sessions
|
|
* to see rows they would not see under MVCC, and a false negative merely
|
|
* spreads that anomaly to the current session.
|
|
*/
|
|
if (cstate->opts.freeze)
|
|
{
|
|
/*
|
|
* We currently disallow COPY FREEZE on partitioned tables. The
|
|
* reason for this is that we've simply not yet opened the partitions
|
|
* to determine if the optimization can be applied to them. We could
|
|
* go and open them all here, but doing so may be quite a costly
|
|
* overhead for small copies. In any case, we may just end up routing
|
|
* tuples to a small number of partitions. It seems better just to
|
|
* raise an ERROR for partitioned tables.
|
|
*/
|
|
if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
|
|
{
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
errmsg("cannot perform COPY FREEZE on a partitioned table")));
|
|
}
|
|
|
|
/*
|
|
* Tolerate one registration for the benefit of FirstXactSnapshot.
|
|
* Scan-bearing queries generally create at least two registrations,
|
|
* though relying on that is fragile, as is ignoring ActiveSnapshot.
|
|
* Clear CatalogSnapshot to avoid counting its registration. We'll
|
|
* still detect ongoing catalog scans, each of which separately
|
|
* registers the snapshot it uses.
|
|
*/
|
|
InvalidateCatalogSnapshot();
|
|
if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
|
|
errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
|
|
|
|
if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
|
|
cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId())
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
|
|
|
|
ti_options |= TABLE_INSERT_FROZEN;
|
|
}
|
|
|
|
/*
|
|
* We need a ResultRelInfo so we can use the regular executor's
|
|
* index-entry-making machinery. (There used to be a huge amount of code
|
|
* here that basically duplicated execUtils.c ...)
|
|
*/
|
|
ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos);
|
|
resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
|
|
ExecInitResultRelation(estate, resultRelInfo, 1);
|
|
|
|
/* Verify the named relation is a valid target for INSERT */
|
|
CheckValidResultRel(resultRelInfo, CMD_INSERT);
|
|
|
|
ExecOpenIndices(resultRelInfo, false);
|
|
|
|
/*
|
|
* Set up a ModifyTableState so we can let FDW(s) init themselves for
|
|
* foreign-table result relation(s).
|
|
*/
|
|
mtstate = makeNode(ModifyTableState);
|
|
mtstate->ps.plan = NULL;
|
|
mtstate->ps.state = estate;
|
|
mtstate->operation = CMD_INSERT;
|
|
mtstate->mt_nrels = 1;
|
|
mtstate->resultRelInfo = resultRelInfo;
|
|
mtstate->rootResultRelInfo = resultRelInfo;
|
|
|
|
if (resultRelInfo->ri_FdwRoutine != NULL &&
|
|
resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
|
|
resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
|
|
resultRelInfo);
|
|
|
|
/*
|
|
* Also, if the named relation is a foreign table, determine if the FDW
|
|
* supports batch insert and determine the batch size (a FDW may support
|
|
* batching, but it may be disabled for the server/table).
|
|
*
|
|
* If the FDW does not support batching, we set the batch size to 1.
|
|
*/
|
|
if (resultRelInfo->ri_FdwRoutine != NULL &&
|
|
resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
|
|
resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
|
|
resultRelInfo->ri_BatchSize =
|
|
resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
|
|
else
|
|
resultRelInfo->ri_BatchSize = 1;
|
|
|
|
Assert(resultRelInfo->ri_BatchSize >= 1);
|
|
|
|
/* Prepare to catch AFTER triggers. */
|
|
AfterTriggerBeginQuery();
|
|
|
|
/*
|
|
* If there are any triggers with transition tables on the named relation,
|
|
* we need to be prepared to capture transition tuples.
|
|
*
|
|
* Because partition tuple routing would like to know about whether
|
|
* transition capture is active, we also set it in mtstate, which is
|
|
* passed to ExecFindPartition() below.
|
|
*/
|
|
cstate->transition_capture = mtstate->mt_transition_capture =
|
|
MakeTransitionCaptureState(cstate->rel->trigdesc,
|
|
RelationGetRelid(cstate->rel),
|
|
CMD_INSERT);
|
|
|
|
/*
|
|
* If the named relation is a partitioned table, initialize state for
|
|
* CopyFrom tuple routing.
|
|
*/
|
|
if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
|
|
proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
|
|
|
|
if (cstate->whereClause)
|
|
cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
|
|
&mtstate->ps);
|
|
|
|
/*
|
|
* It's generally more efficient to prepare a bunch of tuples for
|
|
* insertion, and insert them in one
|
|
* table_multi_insert()/ExecForeignBatchInsert() call, than call
|
|
* table_tuple_insert()/ExecForeignInsert() separately for every tuple.
|
|
* However, there are a number of reasons why we might not be able to do
|
|
* this. These are explained below.
|
|
*/
|
|
if (resultRelInfo->ri_TrigDesc != NULL &&
|
|
(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
|
|
resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
|
|
{
|
|
/*
|
|
* Can't support multi-inserts when there are any BEFORE/INSTEAD OF
|
|
* triggers on the table. Such triggers might query the table we're
|
|
* inserting into and act differently if the tuples that have already
|
|
* been processed and prepared for insertion are not there.
|
|
*/
|
|
insertMethod = CIM_SINGLE;
|
|
}
|
|
else if (resultRelInfo->ri_FdwRoutine != NULL &&
|
|
resultRelInfo->ri_BatchSize == 1)
|
|
{
|
|
/*
|
|
* Can't support multi-inserts to a foreign table if the FDW does not
|
|
* support batching, or it's disabled for the server or foreign table.
|
|
*/
|
|
insertMethod = CIM_SINGLE;
|
|
}
|
|
else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
|
|
resultRelInfo->ri_TrigDesc->trig_insert_new_table)
|
|
{
|
|
/*
|
|
* For partitioned tables we can't support multi-inserts when there
|
|
* are any statement level insert triggers. It might be possible to
|
|
* allow partitioned tables with such triggers in the future, but for
|
|
* now, CopyMultiInsertInfoFlush expects that any after row insert and
|
|
* statement level insert triggers are on the same relation.
|
|
*/
|
|
insertMethod = CIM_SINGLE;
|
|
}
|
|
else if (cstate->volatile_defexprs)
|
|
{
|
|
/*
|
|
* Can't support multi-inserts if there are any volatile default
|
|
* expressions in the table. Similarly to the trigger case above,
|
|
* such expressions may query the table we're inserting into.
|
|
*
|
|
* Note: It does not matter if any partitions have any volatile
|
|
* default expressions as we use the defaults from the target of the
|
|
* COPY command.
|
|
*/
|
|
insertMethod = CIM_SINGLE;
|
|
}
|
|
else if (contain_volatile_functions(cstate->whereClause))
|
|
{
|
|
/*
|
|
* Can't support multi-inserts if there are any volatile function
|
|
* expressions in WHERE clause. Similarly to the trigger case above,
|
|
* such expressions may query the table we're inserting into.
|
|
*
|
|
* Note: the whereClause was already preprocessed in DoCopy(), so it's
|
|
* okay to use contain_volatile_functions() directly.
|
|
*/
|
|
insertMethod = CIM_SINGLE;
|
|
}
|
|
else
|
|
{
|
|
/*
|
|
* For partitioned tables, we may still be able to perform bulk
|
|
* inserts. However, the possibility of this depends on which types
|
|
* of triggers exist on the partition. We must disable bulk inserts
|
|
* if the partition is a foreign table that can't use batching or it
|
|
* has any before row insert or insert instead triggers (same as we
|
|
* checked above for the parent table). Since the partition's
|
|
* resultRelInfos are initialized only when we actually need to insert
|
|
* the first tuple into them, we must have the intermediate insert
|
|
* method of CIM_MULTI_CONDITIONAL to flag that we must later
|
|
* determine if we can use bulk-inserts for the partition being
|
|
* inserted into.
|
|
*/
|
|
if (proute)
|
|
insertMethod = CIM_MULTI_CONDITIONAL;
|
|
else
|
|
insertMethod = CIM_MULTI;
|
|
|
|
CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
|
|
estate, mycid, ti_options);
|
|
}
|
|
|
|
/*
|
|
* If not using batch mode (which allocates slots as needed) set up a
|
|
* tuple slot too. When inserting into a partitioned table, we also need
|
|
* one, even if we might batch insert, to read the tuple in the root
|
|
* partition's form.
|
|
*/
|
|
if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
|
|
{
|
|
singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
|
|
&estate->es_tupleTable);
|
|
bistate = GetBulkInsertState();
|
|
}
|
|
|
|
has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
|
|
resultRelInfo->ri_TrigDesc->trig_insert_before_row);
|
|
|
|
has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
|
|
resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
|
|
|
|
/*
|
|
* Check BEFORE STATEMENT insertion triggers. It's debatable whether we
|
|
* should do this for COPY, since it's not really an "INSERT" statement as
|
|
* such. However, executing these triggers maintains consistency with the
|
|
* EACH ROW triggers that we already fire on COPY.
|
|
*/
|
|
ExecBSInsertTriggers(estate, resultRelInfo);
|
|
|
|
econtext = GetPerTupleExprContext(estate);
|
|
|
|
/* Set up callback to identify error line number */
|
|
errcallback.callback = CopyFromErrorCallback;
|
|
errcallback.arg = (void *) cstate;
|
|
errcallback.previous = error_context_stack;
|
|
error_context_stack = &errcallback;
|
|
|
|
for (;;)
|
|
{
|
|
TupleTableSlot *myslot;
|
|
bool skip_tuple;
|
|
|
|
CHECK_FOR_INTERRUPTS();
|
|
|
|
/*
|
|
* Reset the per-tuple exprcontext. We do this after every tuple, to
|
|
* clean-up after expression evaluations etc.
|
|
*/
|
|
ResetPerTupleExprContext(estate);
|
|
|
|
/* select slot to (initially) load row into */
|
|
if (insertMethod == CIM_SINGLE || proute)
|
|
{
|
|
myslot = singleslot;
|
|
Assert(myslot != NULL);
|
|
}
|
|
else
|
|
{
|
|
Assert(resultRelInfo == target_resultRelInfo);
|
|
Assert(insertMethod == CIM_MULTI);
|
|
|
|
myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
|
|
resultRelInfo);
|
|
}
|
|
|
|
/*
|
|
* Switch to per-tuple context before calling NextCopyFrom, which does
|
|
* evaluate default expressions etc. and requires per-tuple context.
|
|
*/
|
|
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
|
|
|
|
ExecClearTuple(myslot);
|
|
|
|
/* Directly store the values/nulls array in the slot */
|
|
if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
|
|
break;
|
|
|
|
ExecStoreVirtualTuple(myslot);
|
|
|
|
/*
|
|
* Constraints and where clause might reference the tableoid column,
|
|
* so (re-)initialize tts_tableOid before evaluating them.
|
|
*/
|
|
myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
|
|
|
|
/* Triggers and stuff need to be invoked in query context. */
|
|
MemoryContextSwitchTo(oldcontext);
|
|
|
|
if (cstate->whereClause)
|
|
{
|
|
econtext->ecxt_scantuple = myslot;
|
|
/* Skip items that don't match COPY's WHERE clause */
|
|
if (!ExecQual(cstate->qualexpr, econtext))
|
|
{
|
|
/*
|
|
* Report that this tuple was filtered out by the WHERE
|
|
* clause.
|
|
*/
|
|
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
|
|
++excluded);
|
|
continue;
|
|
}
|
|
}
|
|
|
|
/* Determine the partition to insert the tuple into */
|
|
if (proute)
|
|
{
|
|
TupleConversionMap *map;
|
|
|
|
/*
|
|
* Attempt to find a partition suitable for this tuple.
|
|
* ExecFindPartition() will raise an error if none can be found or
|
|
* if the found partition is not suitable for INSERTs.
|
|
*/
|
|
resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
|
|
proute, myslot, estate);
|
|
|
|
if (prevResultRelInfo != resultRelInfo)
|
|
{
|
|
/* Determine which triggers exist on this partition */
|
|
has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
|
|
resultRelInfo->ri_TrigDesc->trig_insert_before_row);
|
|
|
|
has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
|
|
resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
|
|
|
|
/*
|
|
* Disable multi-inserts when the partition has BEFORE/INSTEAD
|
|
* OF triggers, or if the partition is a foreign table that
|
|
* can't use batching.
|
|
*/
|
|
leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
|
|
!has_before_insert_row_trig &&
|
|
!has_instead_insert_row_trig &&
|
|
(resultRelInfo->ri_FdwRoutine == NULL ||
|
|
resultRelInfo->ri_BatchSize > 1);
|
|
|
|
/* Set the multi-insert buffer to use for this partition. */
|
|
if (leafpart_use_multi_insert)
|
|
{
|
|
if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
|
|
CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
|
|
resultRelInfo);
|
|
}
|
|
else if (insertMethod == CIM_MULTI_CONDITIONAL &&
|
|
!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
|
|
{
|
|
/*
|
|
* Flush pending inserts if this partition can't use
|
|
* batching, so rows are visible to triggers etc.
|
|
*/
|
|
CopyMultiInsertInfoFlush(&multiInsertInfo,
|
|
resultRelInfo,
|
|
&processed);
|
|
}
|
|
|
|
if (bistate != NULL)
|
|
ReleaseBulkInsertStatePin(bistate);
|
|
prevResultRelInfo = resultRelInfo;
|
|
}
|
|
|
|
/*
|
|
* If we're capturing transition tuples, we might need to convert
|
|
* from the partition rowtype to root rowtype. But if there are no
|
|
* BEFORE triggers on the partition that could change the tuple,
|
|
* we can just remember the original unconverted tuple to avoid a
|
|
* needless round trip conversion.
|
|
*/
|
|
if (cstate->transition_capture != NULL)
|
|
cstate->transition_capture->tcs_original_insert_tuple =
|
|
!has_before_insert_row_trig ? myslot : NULL;
|
|
|
|
/*
|
|
* We might need to convert from the root rowtype to the partition
|
|
* rowtype.
|
|
*/
|
|
map = ExecGetRootToChildMap(resultRelInfo, estate);
|
|
if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
|
|
{
|
|
/* non batch insert */
|
|
if (map != NULL)
|
|
{
|
|
TupleTableSlot *new_slot;
|
|
|
|
new_slot = resultRelInfo->ri_PartitionTupleSlot;
|
|
myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
/*
|
|
* Prepare to queue up tuple for later batch insert into
|
|
* current partition.
|
|
*/
|
|
TupleTableSlot *batchslot;
|
|
|
|
/* no other path available for partitioned table */
|
|
Assert(insertMethod == CIM_MULTI_CONDITIONAL);
|
|
|
|
batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
|
|
resultRelInfo);
|
|
|
|
if (map != NULL)
|
|
myslot = execute_attr_map_slot(map->attrMap, myslot,
|
|
batchslot);
|
|
else
|
|
{
|
|
/*
|
|
* This looks more expensive than it is (Believe me, I
|
|
* optimized it away. Twice.). The input is in virtual
|
|
* form, and we'll materialize the slot below - for most
|
|
* slot types the copy performs the work materialization
|
|
* would later require anyway.
|
|
*/
|
|
ExecCopySlot(batchslot, myslot);
|
|
myslot = batchslot;
|
|
}
|
|
}
|
|
|
|
/* ensure that triggers etc see the right relation */
|
|
myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
|
|
}
|
|
|
|
skip_tuple = false;
|
|
|
|
/* BEFORE ROW INSERT Triggers */
|
|
if (has_before_insert_row_trig)
|
|
{
|
|
if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
|
|
skip_tuple = true; /* "do nothing" */
|
|
}
|
|
|
|
if (!skip_tuple)
|
|
{
|
|
/*
|
|
* If there is an INSTEAD OF INSERT ROW trigger, let it handle the
|
|
* tuple. Otherwise, proceed with inserting the tuple into the
|
|
* table or foreign table.
|
|
*/
|
|
if (has_instead_insert_row_trig)
|
|
{
|
|
ExecIRInsertTriggers(estate, resultRelInfo, myslot);
|
|
}
|
|
else
|
|
{
|
|
/* Compute stored generated columns */
|
|
if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
|
|
resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
|
|
ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
|
|
CMD_INSERT);
|
|
|
|
/*
|
|
* If the target is a plain table, check the constraints of
|
|
* the tuple.
|
|
*/
|
|
if (resultRelInfo->ri_FdwRoutine == NULL &&
|
|
resultRelInfo->ri_RelationDesc->rd_att->constr)
|
|
ExecConstraints(resultRelInfo, myslot, estate);
|
|
|
|
/*
|
|
* Also check the tuple against the partition constraint, if
|
|
* there is one; except that if we got here via tuple-routing,
|
|
* we don't need to if there's no BR trigger defined on the
|
|
* partition.
|
|
*/
|
|
if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
|
|
(proute == NULL || has_before_insert_row_trig))
|
|
ExecPartitionCheck(resultRelInfo, myslot, estate, true);
|
|
|
|
/* Store the slot in the multi-insert buffer, when enabled. */
|
|
if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
|
|
{
|
|
/*
|
|
* The slot previously might point into the per-tuple
|
|
* context. For batching it needs to be longer lived.
|
|
*/
|
|
ExecMaterializeSlot(myslot);
|
|
|
|
/* Add this tuple to the tuple buffer */
|
|
CopyMultiInsertInfoStore(&multiInsertInfo,
|
|
resultRelInfo, myslot,
|
|
cstate->line_buf.len,
|
|
cstate->cur_lineno);
|
|
|
|
/*
|
|
* If enough inserts have queued up, then flush all
|
|
* buffers out to their tables.
|
|
*/
|
|
if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
|
|
CopyMultiInsertInfoFlush(&multiInsertInfo,
|
|
resultRelInfo,
|
|
&processed);
|
|
|
|
/*
|
|
* We delay updating the row counter and progress of the
|
|
* COPY command until after writing the tuples stored in
|
|
* the buffer out to the table, as in single insert mode.
|
|
* See CopyMultiInsertBufferFlush().
|
|
*/
|
|
continue; /* next tuple please */
|
|
}
|
|
else
|
|
{
|
|
List *recheckIndexes = NIL;
|
|
|
|
/* OK, store the tuple */
|
|
if (resultRelInfo->ri_FdwRoutine != NULL)
|
|
{
|
|
myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
|
|
resultRelInfo,
|
|
myslot,
|
|
NULL);
|
|
|
|
if (myslot == NULL) /* "do nothing" */
|
|
continue; /* next tuple please */
|
|
|
|
/*
|
|
* AFTER ROW Triggers might reference the tableoid
|
|
* column, so (re-)initialize tts_tableOid before
|
|
* evaluating them.
|
|
*/
|
|
myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
|
|
}
|
|
else
|
|
{
|
|
/* OK, store the tuple and create index entries for it */
|
|
table_tuple_insert(resultRelInfo->ri_RelationDesc,
|
|
myslot, mycid, ti_options, bistate);
|
|
|
|
if (resultRelInfo->ri_NumIndices > 0)
|
|
recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
|
|
myslot,
|
|
estate,
|
|
false,
|
|
false,
|
|
NULL,
|
|
NIL,
|
|
false);
|
|
}
|
|
|
|
/* AFTER ROW INSERT Triggers */
|
|
ExecARInsertTriggers(estate, resultRelInfo, myslot,
|
|
recheckIndexes, cstate->transition_capture);
|
|
|
|
list_free(recheckIndexes);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* We count only tuples not suppressed by a BEFORE INSERT trigger
|
|
* or FDW; this is the same definition used by nodeModifyTable.c
|
|
* for counting tuples inserted by an INSERT command. Update
|
|
* progress of the COPY command as well.
|
|
*/
|
|
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
|
|
++processed);
|
|
}
|
|
}
|
|
|
|
/* Flush any remaining buffered tuples */
|
|
if (insertMethod != CIM_SINGLE)
|
|
{
|
|
if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
|
|
CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
|
|
}
|
|
|
|
/* Done, clean up */
|
|
error_context_stack = errcallback.previous;
|
|
|
|
if (bistate != NULL)
|
|
FreeBulkInsertState(bistate);
|
|
|
|
MemoryContextSwitchTo(oldcontext);
|
|
|
|
/* Execute AFTER STATEMENT insertion triggers */
|
|
ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
|
|
|
|
/* Handle queued AFTER triggers */
|
|
AfterTriggerEndQuery(estate);
|
|
|
|
ExecResetTupleTable(estate->es_tupleTable, false);
|
|
|
|
/* Allow the FDW to shut down */
|
|
if (target_resultRelInfo->ri_FdwRoutine != NULL &&
|
|
target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
|
|
target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
|
|
target_resultRelInfo);
|
|
|
|
/* Tear down the multi-insert buffer data */
|
|
if (insertMethod != CIM_SINGLE)
|
|
CopyMultiInsertInfoCleanup(&multiInsertInfo);
|
|
|
|
/* Close all the partitioned tables, leaf partitions, and their indices */
|
|
if (proute)
|
|
ExecCleanupTupleRouting(mtstate, proute);
|
|
|
|
/* Close the result relations, including any trigger target relations */
|
|
ExecCloseResultRelations(estate);
|
|
ExecCloseRangeTableRelations(estate);
|
|
|
|
FreeExecutorState(estate);
|
|
|
|
return processed;
|
|
}
|
|
|
|
/*
|
|
* Setup to read tuples from a file for COPY FROM.
|
|
*
|
|
* 'rel': Used as a template for the tuples
|
|
* 'whereClause': WHERE clause from the COPY FROM command
|
|
* 'filename': Name of server-local file to read, NULL for STDIN
|
|
* 'is_program': true if 'filename' is program to execute
|
|
* 'data_source_cb': callback that provides the input data
|
|
* 'attnamelist': List of char *, columns to include. NIL selects all cols.
|
|
* 'options': List of DefElem. See copy_opt_item in gram.y for selections.
|
|
*
|
|
* Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
|
|
*/
|
|
CopyFromState
|
|
BeginCopyFrom(ParseState *pstate,
|
|
Relation rel,
|
|
Node *whereClause,
|
|
const char *filename,
|
|
bool is_program,
|
|
copy_data_source_cb data_source_cb,
|
|
List *attnamelist,
|
|
List *options)
|
|
{
|
|
CopyFromState cstate;
|
|
bool pipe = (filename == NULL);
|
|
TupleDesc tupDesc;
|
|
AttrNumber num_phys_attrs,
|
|
num_defaults;
|
|
FmgrInfo *in_functions;
|
|
Oid *typioparams;
|
|
Oid in_func_oid;
|
|
int *defmap;
|
|
ExprState **defexprs;
|
|
MemoryContext oldcontext;
|
|
bool volatile_defexprs;
|
|
const int progress_cols[] = {
|
|
PROGRESS_COPY_COMMAND,
|
|
PROGRESS_COPY_TYPE,
|
|
PROGRESS_COPY_BYTES_TOTAL
|
|
};
|
|
int64 progress_vals[] = {
|
|
PROGRESS_COPY_COMMAND_FROM,
|
|
0,
|
|
0
|
|
};
|
|
|
|
/* Allocate workspace and zero all fields */
|
|
cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
|
|
|
|
/*
|
|
* We allocate everything used by a cstate in a new memory context. This
|
|
* avoids memory leaks during repeated use of COPY in a query.
|
|
*/
|
|
cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
|
|
"COPY",
|
|
ALLOCSET_DEFAULT_SIZES);
|
|
|
|
oldcontext = MemoryContextSwitchTo(cstate->copycontext);
|
|
|
|
/* Extract options from the statement node tree */
|
|
ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
|
|
|
|
/* Process the target relation */
|
|
cstate->rel = rel;
|
|
|
|
tupDesc = RelationGetDescr(cstate->rel);
|
|
|
|
/* process common options or initialization */
|
|
|
|
/* Generate or convert list of attributes to process */
|
|
cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
|
|
|
|
num_phys_attrs = tupDesc->natts;
|
|
|
|
/* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
|
|
cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
|
|
if (cstate->opts.force_notnull_all)
|
|
MemSet(cstate->opts.force_notnull_flags, true, num_phys_attrs * sizeof(bool));
|
|
else if (cstate->opts.force_notnull)
|
|
{
|
|
List *attnums;
|
|
ListCell *cur;
|
|
|
|
attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
|
|
|
|
foreach(cur, attnums)
|
|
{
|
|
int attnum = lfirst_int(cur);
|
|
Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
|
|
|
|
if (!list_member_int(cstate->attnumlist, attnum))
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
|
|
errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
|
|
NameStr(attr->attname))));
|
|
cstate->opts.force_notnull_flags[attnum - 1] = true;
|
|
}
|
|
}
|
|
|
|
/* Convert FORCE_NULL name list to per-column flags, check validity */
|
|
cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
|
|
if (cstate->opts.force_null_all)
|
|
MemSet(cstate->opts.force_null_flags, true, num_phys_attrs * sizeof(bool));
|
|
else if (cstate->opts.force_null)
|
|
{
|
|
List *attnums;
|
|
ListCell *cur;
|
|
|
|
attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
|
|
|
|
foreach(cur, attnums)
|
|
{
|
|
int attnum = lfirst_int(cur);
|
|
Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
|
|
|
|
if (!list_member_int(cstate->attnumlist, attnum))
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
|
|
errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
|
|
NameStr(attr->attname))));
|
|
cstate->opts.force_null_flags[attnum - 1] = true;
|
|
}
|
|
}
|
|
|
|
/* Convert convert_selectively name list to per-column flags */
|
|
if (cstate->opts.convert_selectively)
|
|
{
|
|
List *attnums;
|
|
ListCell *cur;
|
|
|
|
cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
|
|
|
|
attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
|
|
|
|
foreach(cur, attnums)
|
|
{
|
|
int attnum = lfirst_int(cur);
|
|
Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
|
|
|
|
if (!list_member_int(cstate->attnumlist, attnum))
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
|
|
errmsg_internal("selected column \"%s\" not referenced by COPY",
|
|
NameStr(attr->attname))));
|
|
cstate->convert_select_flags[attnum - 1] = true;
|
|
}
|
|
}
|
|
|
|
/* Use client encoding when ENCODING option is not specified. */
|
|
if (cstate->opts.file_encoding < 0)
|
|
cstate->file_encoding = pg_get_client_encoding();
|
|
else
|
|
cstate->file_encoding = cstate->opts.file_encoding;
|
|
|
|
/*
|
|
* Look up encoding conversion function.
|
|
*/
|
|
if (cstate->file_encoding == GetDatabaseEncoding() ||
|
|
cstate->file_encoding == PG_SQL_ASCII ||
|
|
GetDatabaseEncoding() == PG_SQL_ASCII)
|
|
{
|
|
cstate->need_transcoding = false;
|
|
}
|
|
else
|
|
{
|
|
cstate->need_transcoding = true;
|
|
cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
|
|
GetDatabaseEncoding());
|
|
if (!OidIsValid(cstate->conversion_proc))
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_UNDEFINED_FUNCTION),
|
|
errmsg("default conversion function for encoding \"%s\" to \"%s\" does not exist",
|
|
pg_encoding_to_char(cstate->file_encoding),
|
|
pg_encoding_to_char(GetDatabaseEncoding()))));
|
|
}
|
|
|
|
cstate->copy_src = COPY_FILE; /* default */
|
|
|
|
cstate->whereClause = whereClause;
|
|
|
|
/* Initialize state variables */
|
|
cstate->eol_type = EOL_UNKNOWN;
|
|
cstate->cur_relname = RelationGetRelationName(cstate->rel);
|
|
cstate->cur_lineno = 0;
|
|
cstate->cur_attname = NULL;
|
|
cstate->cur_attval = NULL;
|
|
cstate->relname_only = false;
|
|
|
|
/*
|
|
* Allocate buffers for the input pipeline.
|
|
*
|
|
* attribute_buf and raw_buf are used in both text and binary modes, but
|
|
* input_buf and line_buf only in text mode.
|
|
*/
|
|
cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
|
|
cstate->raw_buf_index = cstate->raw_buf_len = 0;
|
|
cstate->raw_reached_eof = false;
|
|
|
|
if (!cstate->opts.binary)
|
|
{
|
|
/*
|
|
* If encoding conversion is needed, we need another buffer to hold
|
|
* the converted input data. Otherwise, we can just point input_buf
|
|
* to the same buffer as raw_buf.
|
|
*/
|
|
if (cstate->need_transcoding)
|
|
{
|
|
cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
|
|
cstate->input_buf_index = cstate->input_buf_len = 0;
|
|
}
|
|
else
|
|
cstate->input_buf = cstate->raw_buf;
|
|
cstate->input_reached_eof = false;
|
|
|
|
initStringInfo(&cstate->line_buf);
|
|
}
|
|
|
|
initStringInfo(&cstate->attribute_buf);
|
|
|
|
/* Assign range table and rteperminfos, we'll need them in CopyFrom. */
|
|
if (pstate)
|
|
{
|
|
cstate->range_table = pstate->p_rtable;
|
|
cstate->rteperminfos = pstate->p_rteperminfos;
|
|
}
|
|
|
|
num_defaults = 0;
|
|
volatile_defexprs = false;
|
|
|
|
/*
|
|
* Pick up the required catalog information for each attribute in the
|
|
* relation, including the input function, the element type (to pass to
|
|
* the input function), and info about defaults and constraints. (Which
|
|
* input function we use depends on text/binary format choice.)
|
|
*/
|
|
in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
|
|
typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
|
|
defmap = (int *) palloc(num_phys_attrs * sizeof(int));
|
|
defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
|
|
|
|
for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
|
|
{
|
|
Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
|
|
|
|
/* We don't need info for dropped attributes */
|
|
if (att->attisdropped)
|
|
continue;
|
|
|
|
/* Fetch the input function and typioparam info */
|
|
if (cstate->opts.binary)
|
|
getTypeBinaryInputInfo(att->atttypid,
|
|
&in_func_oid, &typioparams[attnum - 1]);
|
|
else
|
|
getTypeInputInfo(att->atttypid,
|
|
&in_func_oid, &typioparams[attnum - 1]);
|
|
fmgr_info(in_func_oid, &in_functions[attnum - 1]);
|
|
|
|
/* Get default info if available */
|
|
defexprs[attnum - 1] = NULL;
|
|
|
|
/*
|
|
* We only need the default values for columns that do not appear in
|
|
* the column list, unless the DEFAULT option was given. We never need
|
|
* default values for generated columns.
|
|
*/
|
|
if ((cstate->opts.default_print != NULL ||
|
|
!list_member_int(cstate->attnumlist, attnum)) &&
|
|
!att->attgenerated)
|
|
{
|
|
Expr *defexpr = (Expr *) build_column_default(cstate->rel,
|
|
attnum);
|
|
|
|
if (defexpr != NULL)
|
|
{
|
|
/* Run the expression through planner */
|
|
defexpr = expression_planner(defexpr);
|
|
|
|
/* Initialize executable expression in copycontext */
|
|
defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
|
|
|
|
/* if NOT copied from input */
|
|
/* use default value if one exists */
|
|
if (!list_member_int(cstate->attnumlist, attnum))
|
|
{
|
|
defmap[num_defaults] = attnum - 1;
|
|
num_defaults++;
|
|
}
|
|
|
|
/*
|
|
* If a default expression looks at the table being loaded,
|
|
* then it could give the wrong answer when using
|
|
* multi-insert. Since database access can be dynamic this is
|
|
* hard to test for exactly, so we use the much wider test of
|
|
* whether the default expression is volatile. We allow for
|
|
* the special case of when the default expression is the
|
|
* nextval() of a sequence which in this specific case is
|
|
* known to be safe for use with the multi-insert
|
|
* optimization. Hence we use this special case function
|
|
* checker rather than the standard check for
|
|
* contain_volatile_functions(). Note also that we already
|
|
* ran the expression through expression_planner().
|
|
*/
|
|
if (!volatile_defexprs)
|
|
volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
|
|
}
|
|
}
|
|
}
|
|
|
|
cstate->defaults = (bool *) palloc0(tupDesc->natts * sizeof(bool));
|
|
|
|
/* initialize progress */
|
|
pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
|
|
cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
|
|
cstate->bytes_processed = 0;
|
|
|
|
/* We keep those variables in cstate. */
|
|
cstate->in_functions = in_functions;
|
|
cstate->typioparams = typioparams;
|
|
cstate->defmap = defmap;
|
|
cstate->defexprs = defexprs;
|
|
cstate->volatile_defexprs = volatile_defexprs;
|
|
cstate->num_defaults = num_defaults;
|
|
cstate->is_program = is_program;
|
|
|
|
if (data_source_cb)
|
|
{
|
|
progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
|
|
cstate->copy_src = COPY_CALLBACK;
|
|
cstate->data_source_cb = data_source_cb;
|
|
}
|
|
else if (pipe)
|
|
{
|
|
progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
|
|
Assert(!is_program); /* the grammar does not allow this */
|
|
if (whereToSendOutput == DestRemote)
|
|
ReceiveCopyBegin(cstate);
|
|
else
|
|
cstate->copy_file = stdin;
|
|
}
|
|
else
|
|
{
|
|
cstate->filename = pstrdup(filename);
|
|
|
|
if (cstate->is_program)
|
|
{
|
|
progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
|
|
cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
|
|
if (cstate->copy_file == NULL)
|
|
ereport(ERROR,
|
|
(errcode_for_file_access(),
|
|
errmsg("could not execute command \"%s\": %m",
|
|
cstate->filename)));
|
|
}
|
|
else
|
|
{
|
|
struct stat st;
|
|
|
|
progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
|
|
cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
|
|
if (cstate->copy_file == NULL)
|
|
{
|
|
/* copy errno because ereport subfunctions might change it */
|
|
int save_errno = errno;
|
|
|
|
ereport(ERROR,
|
|
(errcode_for_file_access(),
|
|
errmsg("could not open file \"%s\" for reading: %m",
|
|
cstate->filename),
|
|
(save_errno == ENOENT || save_errno == EACCES) ?
|
|
errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
|
|
"You may want a client-side facility such as psql's \\copy.") : 0));
|
|
}
|
|
|
|
if (fstat(fileno(cstate->copy_file), &st))
|
|
ereport(ERROR,
|
|
(errcode_for_file_access(),
|
|
errmsg("could not stat file \"%s\": %m",
|
|
cstate->filename)));
|
|
|
|
if (S_ISDIR(st.st_mode))
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
|
errmsg("\"%s\" is a directory", cstate->filename)));
|
|
|
|
progress_vals[2] = st.st_size;
|
|
}
|
|
}
|
|
|
|
pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
|
|
|
|
if (cstate->opts.binary)
|
|
{
|
|
/* Read and verify binary header */
|
|
ReceiveCopyBinaryHeader(cstate);
|
|
}
|
|
|
|
/* create workspace for CopyReadAttributes results */
|
|
if (!cstate->opts.binary)
|
|
{
|
|
AttrNumber attr_count = list_length(cstate->attnumlist);
|
|
|
|
cstate->max_fields = attr_count;
|
|
cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
|
|
}
|
|
|
|
MemoryContextSwitchTo(oldcontext);
|
|
|
|
return cstate;
|
|
}
|
|
|
|
/*
|
|
* Clean up storage and release resources for COPY FROM.
|
|
*/
|
|
void
|
|
EndCopyFrom(CopyFromState cstate)
|
|
{
|
|
/* No COPY FROM related resources except memory. */
|
|
if (cstate->is_program)
|
|
{
|
|
ClosePipeFromProgram(cstate);
|
|
}
|
|
else
|
|
{
|
|
if (cstate->filename != NULL && FreeFile(cstate->copy_file))
|
|
ereport(ERROR,
|
|
(errcode_for_file_access(),
|
|
errmsg("could not close file \"%s\": %m",
|
|
cstate->filename)));
|
|
}
|
|
|
|
pgstat_progress_end_command();
|
|
|
|
MemoryContextDelete(cstate->copycontext);
|
|
pfree(cstate);
|
|
}
|
|
|
|
/*
|
|
* Closes the pipe from an external program, checking the pclose() return code.
|
|
*/
|
|
static void
|
|
ClosePipeFromProgram(CopyFromState cstate)
|
|
{
|
|
int pclose_rc;
|
|
|
|
Assert(cstate->is_program);
|
|
|
|
pclose_rc = ClosePipeStream(cstate->copy_file);
|
|
if (pclose_rc == -1)
|
|
ereport(ERROR,
|
|
(errcode_for_file_access(),
|
|
errmsg("could not close pipe to external command: %m")));
|
|
else if (pclose_rc != 0)
|
|
{
|
|
/*
|
|
* If we ended a COPY FROM PROGRAM before reaching EOF, then it's
|
|
* expectable for the called program to fail with SIGPIPE, and we
|
|
* should not report that as an error. Otherwise, SIGPIPE indicates a
|
|
* problem.
|
|
*/
|
|
if (!cstate->raw_reached_eof &&
|
|
wait_result_is_signal(pclose_rc, SIGPIPE))
|
|
return;
|
|
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
|
|
errmsg("program \"%s\" failed",
|
|
cstate->filename),
|
|
errdetail_internal("%s", wait_result_to_str(pclose_rc))));
|
|
}
|
|
}
|