mirror of
https://github.com/postgres/postgres.git
synced 2025-04-25 21:42:33 +03:00
Improve performance of binary COPY FROM through better buffering.
At least on Linux and macOS, fread() turns out to have far higher per-call overhead than one could wish. Reading 64KB of data at a time and then parceling it out with our own memcpy logic makes binary COPY from a file significantly faster --- around 30% in simple testing for cases with narrow text columns (on Linux ... even more on macOS). In binary COPY from frontend, there's no per-call fread(), and this patch introduces an extra layer of memcpy'ing, but it still manages to eke out a small win. Apparently, the control-logic overhead in CopyGetData() is enough to be worth avoiding for small fetches. Bharath Rupireddy and Amit Langote, reviewed by Vignesh C, cosmetic tweaks by me Discussion: https://postgr.es/m/CALj2ACU5Bz06HWLwqSzNMN=Gupoj6Rcn_QVC+k070V4em9wu=A@mail.gmail.com
This commit is contained in:
parent
8a37951eeb
commit
0a0727ccfc
@ -187,15 +187,15 @@ typedef struct CopyStateData
|
|||||||
TransitionCaptureState *transition_capture;
|
TransitionCaptureState *transition_capture;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* These variables are used to reduce overhead in textual COPY FROM.
|
* These variables are used to reduce overhead in COPY FROM.
|
||||||
*
|
*
|
||||||
* attribute_buf holds the separated, de-escaped text for each field of
|
* attribute_buf holds the separated, de-escaped text for each field of
|
||||||
* the current line. The CopyReadAttributes functions return arrays of
|
* the current line. The CopyReadAttributes functions return arrays of
|
||||||
* pointers into this buffer. We avoid palloc/pfree overhead by re-using
|
* pointers into this buffer. We avoid palloc/pfree overhead by re-using
|
||||||
* the buffer on each cycle.
|
* the buffer on each cycle.
|
||||||
*
|
*
|
||||||
* (In binary COPY FROM, attribute_buf holds the binary data for the
|
* In binary COPY FROM, attribute_buf holds the binary data for the
|
||||||
* current field, while the other variables are not used.)
|
* current field, but the usage is otherwise similar.
|
||||||
*/
|
*/
|
||||||
StringInfoData attribute_buf;
|
StringInfoData attribute_buf;
|
||||||
|
|
||||||
@ -209,7 +209,8 @@ typedef struct CopyStateData
|
|||||||
* input cycle is first to read the whole line into line_buf, convert it
|
* input cycle is first to read the whole line into line_buf, convert it
|
||||||
* to server encoding there, and then extract the individual attribute
|
* to server encoding there, and then extract the individual attribute
|
||||||
* fields into attribute_buf. line_buf is preserved unmodified so that we
|
* fields into attribute_buf. line_buf is preserved unmodified so that we
|
||||||
* can display it in error messages if appropriate.
|
* can display it in error messages if appropriate. (In binary mode,
|
||||||
|
* line_buf is not used.)
|
||||||
*/
|
*/
|
||||||
StringInfoData line_buf;
|
StringInfoData line_buf;
|
||||||
bool line_buf_converted; /* converted to server encoding? */
|
bool line_buf_converted; /* converted to server encoding? */
|
||||||
@ -217,15 +218,18 @@ typedef struct CopyStateData
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
* Finally, raw_buf holds raw data read from the data source (file or
|
* Finally, raw_buf holds raw data read from the data source (file or
|
||||||
* client connection). CopyReadLine parses this data sufficiently to
|
* client connection). In text mode, CopyReadLine parses this data
|
||||||
* locate line boundaries, then transfers the data to line_buf and
|
* sufficiently to locate line boundaries, then transfers the data to
|
||||||
* converts it. Note: we guarantee that there is a \0 at
|
* line_buf and converts it. In binary mode, CopyReadBinaryData fetches
|
||||||
* raw_buf[raw_buf_len].
|
* appropriate amounts of data from this buffer. In both modes, we
|
||||||
|
* guarantee that there is a \0 at raw_buf[raw_buf_len].
|
||||||
*/
|
*/
|
||||||
#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
|
#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
|
||||||
char *raw_buf;
|
char *raw_buf;
|
||||||
int raw_buf_index; /* next byte to process */
|
int raw_buf_index; /* next byte to process */
|
||||||
int raw_buf_len; /* total # of bytes stored */
|
int raw_buf_len; /* total # of bytes stored */
|
||||||
|
/* Shorthand for number of unconsumed bytes available in raw_buf */
|
||||||
|
#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
|
||||||
} CopyStateData;
|
} CopyStateData;
|
||||||
|
|
||||||
/* DestReceiver for COPY (query) TO */
|
/* DestReceiver for COPY (query) TO */
|
||||||
@ -394,6 +398,8 @@ static void CopySendInt32(CopyState cstate, int32 val);
|
|||||||
static bool CopyGetInt32(CopyState cstate, int32 *val);
|
static bool CopyGetInt32(CopyState cstate, int32 *val);
|
||||||
static void CopySendInt16(CopyState cstate, int16 val);
|
static void CopySendInt16(CopyState cstate, int16 val);
|
||||||
static bool CopyGetInt16(CopyState cstate, int16 *val);
|
static bool CopyGetInt16(CopyState cstate, int16 *val);
|
||||||
|
static bool CopyLoadRawBuf(CopyState cstate);
|
||||||
|
static int CopyReadBinaryData(CopyState cstate, char *dest, int nbytes);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -723,7 +729,7 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
|
|||||||
/*
|
/*
|
||||||
* CopySendInt32 sends an int32 in network byte order
|
* CopySendInt32 sends an int32 in network byte order
|
||||||
*/
|
*/
|
||||||
static void
|
static inline void
|
||||||
CopySendInt32(CopyState cstate, int32 val)
|
CopySendInt32(CopyState cstate, int32 val)
|
||||||
{
|
{
|
||||||
uint32 buf;
|
uint32 buf;
|
||||||
@ -737,12 +743,12 @@ CopySendInt32(CopyState cstate, int32 val)
|
|||||||
*
|
*
|
||||||
* Returns true if OK, false if EOF
|
* Returns true if OK, false if EOF
|
||||||
*/
|
*/
|
||||||
static bool
|
static inline bool
|
||||||
CopyGetInt32(CopyState cstate, int32 *val)
|
CopyGetInt32(CopyState cstate, int32 *val)
|
||||||
{
|
{
|
||||||
uint32 buf;
|
uint32 buf;
|
||||||
|
|
||||||
if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
|
if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf))
|
||||||
{
|
{
|
||||||
*val = 0; /* suppress compiler warning */
|
*val = 0; /* suppress compiler warning */
|
||||||
return false;
|
return false;
|
||||||
@ -754,7 +760,7 @@ CopyGetInt32(CopyState cstate, int32 *val)
|
|||||||
/*
|
/*
|
||||||
* CopySendInt16 sends an int16 in network byte order
|
* CopySendInt16 sends an int16 in network byte order
|
||||||
*/
|
*/
|
||||||
static void
|
static inline void
|
||||||
CopySendInt16(CopyState cstate, int16 val)
|
CopySendInt16(CopyState cstate, int16 val)
|
||||||
{
|
{
|
||||||
uint16 buf;
|
uint16 buf;
|
||||||
@ -766,12 +772,12 @@ CopySendInt16(CopyState cstate, int16 val)
|
|||||||
/*
|
/*
|
||||||
* CopyGetInt16 reads an int16 that appears in network byte order
|
* CopyGetInt16 reads an int16 that appears in network byte order
|
||||||
*/
|
*/
|
||||||
static bool
|
static inline bool
|
||||||
CopyGetInt16(CopyState cstate, int16 *val)
|
CopyGetInt16(CopyState cstate, int16 *val)
|
||||||
{
|
{
|
||||||
uint16 buf;
|
uint16 buf;
|
||||||
|
|
||||||
if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
|
if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf))
|
||||||
{
|
{
|
||||||
*val = 0; /* suppress compiler warning */
|
*val = 0; /* suppress compiler warning */
|
||||||
return false;
|
return false;
|
||||||
@ -786,26 +792,20 @@ CopyGetInt16(CopyState cstate, int16 *val)
|
|||||||
*
|
*
|
||||||
* Returns true if able to obtain at least one more byte, else false.
|
* Returns true if able to obtain at least one more byte, else false.
|
||||||
*
|
*
|
||||||
* If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
|
* If RAW_BUF_BYTES(cstate) > 0, the unprocessed bytes are moved to the start
|
||||||
* down to the start of the buffer and then we load more data after that.
|
* of the buffer and then we load more data after that. This case occurs only
|
||||||
* This case is used only when a frontend multibyte character crosses a
|
* when a multibyte character crosses a bufferload boundary.
|
||||||
* bufferload boundary.
|
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
CopyLoadRawBuf(CopyState cstate)
|
CopyLoadRawBuf(CopyState cstate)
|
||||||
{
|
{
|
||||||
int nbytes;
|
int nbytes = RAW_BUF_BYTES(cstate);
|
||||||
int inbytes;
|
int inbytes;
|
||||||
|
|
||||||
if (cstate->raw_buf_index < cstate->raw_buf_len)
|
/* Copy down the unprocessed data if any. */
|
||||||
{
|
if (nbytes > 0)
|
||||||
/* Copy down the unprocessed data */
|
|
||||||
nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
|
|
||||||
memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
|
memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
|
||||||
nbytes);
|
nbytes);
|
||||||
}
|
|
||||||
else
|
|
||||||
nbytes = 0; /* no data need be saved */
|
|
||||||
|
|
||||||
inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
|
inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
|
||||||
1, RAW_BUF_SIZE - nbytes);
|
1, RAW_BUF_SIZE - nbytes);
|
||||||
@ -816,6 +816,54 @@ CopyLoadRawBuf(CopyState cstate)
|
|||||||
return (inbytes > 0);
|
return (inbytes > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CopyReadBinaryData
|
||||||
|
*
|
||||||
|
* Reads up to 'nbytes' bytes from cstate->copy_file via cstate->raw_buf
|
||||||
|
* and writes them to 'dest'. Returns the number of bytes read (which
|
||||||
|
* would be less than 'nbytes' only if we reach EOF).
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
CopyReadBinaryData(CopyState cstate, char *dest, int nbytes)
|
||||||
|
{
|
||||||
|
int copied_bytes = 0;
|
||||||
|
|
||||||
|
if (RAW_BUF_BYTES(cstate) >= nbytes)
|
||||||
|
{
|
||||||
|
/* Enough bytes are present in the buffer. */
|
||||||
|
memcpy(dest, cstate->raw_buf + cstate->raw_buf_index, nbytes);
|
||||||
|
cstate->raw_buf_index += nbytes;
|
||||||
|
copied_bytes = nbytes;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Not enough bytes in the buffer, so must read from the file. Need
|
||||||
|
* to loop since 'nbytes' could be larger than the buffer size.
|
||||||
|
*/
|
||||||
|
do
|
||||||
|
{
|
||||||
|
int copy_bytes;
|
||||||
|
|
||||||
|
/* Load more data if buffer is empty. */
|
||||||
|
if (RAW_BUF_BYTES(cstate) == 0)
|
||||||
|
{
|
||||||
|
if (!CopyLoadRawBuf(cstate))
|
||||||
|
break; /* EOF */
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Transfer some bytes. */
|
||||||
|
copy_bytes = Min(nbytes - copied_bytes, RAW_BUF_BYTES(cstate));
|
||||||
|
memcpy(dest, cstate->raw_buf + cstate->raw_buf_index, copy_bytes);
|
||||||
|
cstate->raw_buf_index += copy_bytes;
|
||||||
|
dest += copy_bytes;
|
||||||
|
copied_bytes += copy_bytes;
|
||||||
|
} while (copied_bytes < nbytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
return copied_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DoCopy executes the SQL COPY statement
|
* DoCopy executes the SQL COPY statement
|
||||||
@ -3366,17 +3414,17 @@ BeginCopyFrom(ParseState *pstate,
|
|||||||
cstate->cur_attval = NULL;
|
cstate->cur_attval = NULL;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Set up variables to avoid per-attribute overhead. attribute_buf is
|
* Set up variables to avoid per-attribute overhead. attribute_buf and
|
||||||
* used in both text and binary modes, but we use line_buf and raw_buf
|
* raw_buf are used in both text and binary modes, but we use line_buf
|
||||||
* only in text mode.
|
* only in text mode.
|
||||||
*/
|
*/
|
||||||
initStringInfo(&cstate->attribute_buf);
|
initStringInfo(&cstate->attribute_buf);
|
||||||
|
cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
|
||||||
|
cstate->raw_buf_index = cstate->raw_buf_len = 0;
|
||||||
if (!cstate->binary)
|
if (!cstate->binary)
|
||||||
{
|
{
|
||||||
initStringInfo(&cstate->line_buf);
|
initStringInfo(&cstate->line_buf);
|
||||||
cstate->line_buf_converted = false;
|
cstate->line_buf_converted = false;
|
||||||
cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
|
|
||||||
cstate->raw_buf_index = cstate->raw_buf_len = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Assign range table, we'll need it in CopyFrom. */
|
/* Assign range table, we'll need it in CopyFrom. */
|
||||||
@ -3527,7 +3575,7 @@ BeginCopyFrom(ParseState *pstate,
|
|||||||
int32 tmp;
|
int32 tmp;
|
||||||
|
|
||||||
/* Signature */
|
/* Signature */
|
||||||
if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
|
if (CopyReadBinaryData(cstate, readSig, 11) != 11 ||
|
||||||
memcmp(readSig, BinarySignature, 11) != 0)
|
memcmp(readSig, BinarySignature, 11) != 0)
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
|
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
|
||||||
@ -3555,7 +3603,7 @@ BeginCopyFrom(ParseState *pstate,
|
|||||||
/* Skip extension header, if present */
|
/* Skip extension header, if present */
|
||||||
while (tmp-- > 0)
|
while (tmp-- > 0)
|
||||||
{
|
{
|
||||||
if (CopyGetData(cstate, readSig, 1, 1) != 1)
|
if (CopyReadBinaryData(cstate, readSig, 1) != 1)
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
|
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
|
||||||
errmsg("invalid COPY file header (wrong length)")));
|
errmsg("invalid COPY file header (wrong length)")));
|
||||||
@ -3771,7 +3819,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
|
|||||||
char dummy;
|
char dummy;
|
||||||
|
|
||||||
if (cstate->copy_dest != COPY_OLD_FE &&
|
if (cstate->copy_dest != COPY_OLD_FE &&
|
||||||
CopyGetData(cstate, &dummy, 1, 1) > 0)
|
CopyReadBinaryData(cstate, &dummy, 1) > 0)
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
|
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
|
||||||
errmsg("received copy data after EOF marker")));
|
errmsg("received copy data after EOF marker")));
|
||||||
@ -4744,8 +4792,8 @@ CopyReadBinaryAttribute(CopyState cstate, FmgrInfo *flinfo,
|
|||||||
resetStringInfo(&cstate->attribute_buf);
|
resetStringInfo(&cstate->attribute_buf);
|
||||||
|
|
||||||
enlargeStringInfo(&cstate->attribute_buf, fld_size);
|
enlargeStringInfo(&cstate->attribute_buf, fld_size);
|
||||||
if (CopyGetData(cstate, cstate->attribute_buf.data,
|
if (CopyReadBinaryData(cstate, cstate->attribute_buf.data,
|
||||||
fld_size, fld_size) != fld_size)
|
fld_size) != fld_size)
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
|
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
|
||||||
errmsg("unexpected EOF in COPY data")));
|
errmsg("unexpected EOF in COPY data")));
|
||||||
|
Loading…
x
Reference in New Issue
Block a user