diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 6497100a1a4..dcea5648acd 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -120,8 +120,8 @@ static List *sock_paths = NIL; static char *PqSendBuffer; static int PqSendBufferSize; /* Size send buffer */ -static int PqSendPointer; /* Next index to store a byte in PqSendBuffer */ -static int PqSendStart; /* Next index to send a byte in PqSendBuffer */ +static size_t PqSendPointer; /* Next index to store a byte in PqSendBuffer */ +static size_t PqSendStart; /* Next index to send a byte in PqSendBuffer */ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE]; static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */ @@ -143,8 +143,10 @@ static int socket_flush_if_writable(void); static bool socket_is_send_pending(void); static int socket_putmessage(char msgtype, const char *s, size_t len); static void socket_putmessage_noblock(char msgtype, const char *s, size_t len); -static int internal_putbytes(const char *s, size_t len); -static int internal_flush(void); +static inline int internal_putbytes(const char *s, size_t len); +static inline int internal_flush(void); +static pg_noinline int internal_flush_buffer(const char *buf, size_t *start, + size_t *end); static int Lock_AF_UNIX(const char *unixSocketDir, const char *unixSocketPath); static int Setup_AF_UNIX(const char *sock_path); @@ -1268,11 +1270,9 @@ pq_getmessage(StringInfo s, int maxlen) } -static int +static inline int internal_putbytes(const char *s, size_t len) { - size_t amount; - while (len > 0) { /* If buffer is full, then flush it out */ @@ -1282,14 +1282,33 @@ internal_putbytes(const char *s, size_t len) if (internal_flush()) return EOF; } - amount = PqSendBufferSize - PqSendPointer; - if (amount > len) - amount = len; - memcpy(PqSendBuffer + PqSendPointer, s, amount); - PqSendPointer += amount; - s += amount; - len -= amount; + + /* + * If the buffer is empty and data length is larger than the buffer + * size, send it without buffering. Otherwise, copy as much data as + * possible into the buffer. + */ + if (len >= PqSendBufferSize && PqSendStart == PqSendPointer) + { + size_t start = 0; + + socket_set_nonblocking(false); + if (internal_flush_buffer(s, &start, &len)) + return EOF; + } + else + { + size_t amount = PqSendBufferSize - PqSendPointer; + + if (amount > len) + amount = len; + memcpy(PqSendBuffer + PqSendPointer, s, amount); + PqSendPointer += amount; + s += amount; + len -= amount; + } } + return 0; } @@ -1321,19 +1340,32 @@ socket_flush(void) * and the socket is in non-blocking mode), or EOF if trouble. * -------------------------------- */ -static int +static inline int internal_flush(void) +{ + return internal_flush_buffer(PqSendBuffer, &PqSendStart, &PqSendPointer); +} + +/* -------------------------------- + * internal_flush_buffer - flush the given buffer content + * + * Returns 0 if OK (meaning everything was sent, or operation would block + * and the socket is in non-blocking mode), or EOF if trouble. + * -------------------------------- + */ +static pg_noinline int +internal_flush_buffer(const char *buf, size_t *start, size_t *end) { static int last_reported_send_errno = 0; - char *bufptr = PqSendBuffer + PqSendStart; - char *bufend = PqSendBuffer + PqSendPointer; + const char *bufptr = buf + *start; + const char *bufend = buf + *end; while (bufptr < bufend) { int r; - r = secure_write(MyProcPort, bufptr, bufend - bufptr); + r = secure_write(MyProcPort, (char *) bufptr, bufend - bufptr); if (r <= 0) { @@ -1373,7 +1405,7 @@ internal_flush(void) * flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate * the connection. */ - PqSendStart = PqSendPointer = 0; + *start = *end = 0; ClientConnectionLost = 1; InterruptPending = 1; return EOF; @@ -1381,10 +1413,10 @@ internal_flush(void) last_reported_send_errno = 0; /* reset after any successful send */ bufptr += r; - PqSendStart += r; + *start += r; } - PqSendStart = PqSendPointer = 0; + *start = *end = 0; return 0; }