mirror of
https://github.com/postgres/postgres.git
synced 2025-07-28 23:42:10 +03:00
Second round of FE/BE protocol changes. Frontend->backend messages now
have length counts, and COPY IN data is packetized into messages.
This commit is contained in:
@ -16,16 +16,14 @@
|
||||
* will cause repeat printouts.
|
||||
*
|
||||
* We must speak the same transmitted data representations as the backend
|
||||
* routines. Note that this module supports *only* network byte order
|
||||
* for transmitted ints, whereas the backend modules (as of this writing)
|
||||
* still handle either network or little-endian byte order.
|
||||
* routines.
|
||||
*
|
||||
*
|
||||
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* $Header: /cvsroot/pgsql/src/interfaces/libpq/fe-misc.c,v 1.88 2003/04/02 00:49:28 tgl Exp $
|
||||
* $Header: /cvsroot/pgsql/src/interfaces/libpq/fe-misc.c,v 1.89 2003/04/19 00:02:30 tgl Exp $
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
@ -63,15 +61,15 @@
|
||||
#define DONOTICE(conn,message) \
|
||||
((*(conn)->noticeHook) ((conn)->noticeArg, (message)))
|
||||
|
||||
static int pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
|
||||
static int pqSendSome(PGconn *conn, int len);
|
||||
static int pqSocketCheck(PGconn *conn, int forRead, int forWrite,
|
||||
time_t end_time);
|
||||
static int pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time);
|
||||
static int pqPutBytes(const char *s, size_t nbytes, PGconn *conn);
|
||||
|
||||
|
||||
/*
|
||||
* pqGetc:
|
||||
* get a character from the connection
|
||||
* pqGetc: get 1 character from the connection
|
||||
*
|
||||
* All these routines return 0 on success, EOF on error.
|
||||
* Note that for the Get routines, EOF only means there is not enough
|
||||
@ -93,12 +91,12 @@ pqGetc(char *result, PGconn *conn)
|
||||
|
||||
|
||||
/*
|
||||
* write 1 char to the connection
|
||||
* pqPutc: write 1 char to the current message
|
||||
*/
|
||||
int
|
||||
pqPutc(char c, PGconn *conn)
|
||||
{
|
||||
if (pqPutBytes(&c, 1, conn) == EOF)
|
||||
if (pqPutMsgBytes(&c, 1, conn))
|
||||
return EOF;
|
||||
|
||||
if (conn->Pfdebug)
|
||||
@ -108,93 +106,6 @@ pqPutc(char c, PGconn *conn)
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* pqPutBytes: local routine to write N bytes to the connection,
|
||||
* with buffering
|
||||
*/
|
||||
static int
|
||||
pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
|
||||
{
|
||||
/*
|
||||
* Strategy to handle blocking and non-blocking connections: Fill the
|
||||
* output buffer and flush it repeatedly until either all data has
|
||||
* been sent or is at least queued in the buffer.
|
||||
*
|
||||
* For non-blocking connections, grow the buffer if not all data fits
|
||||
* into it and the buffer can't be sent because the socket would
|
||||
* block.
|
||||
*/
|
||||
|
||||
while (nbytes)
|
||||
{
|
||||
size_t avail,
|
||||
remaining;
|
||||
|
||||
/* fill the output buffer */
|
||||
avail = Max(conn->outBufSize - conn->outCount, 0);
|
||||
remaining = Min(avail, nbytes);
|
||||
memcpy(conn->outBuffer + conn->outCount, s, remaining);
|
||||
conn->outCount += remaining;
|
||||
s += remaining;
|
||||
nbytes -= remaining;
|
||||
|
||||
/*
|
||||
* if the data didn't fit completely into the buffer, try to flush
|
||||
* the buffer
|
||||
*/
|
||||
if (nbytes)
|
||||
{
|
||||
int send_result = pqSendSome(conn);
|
||||
|
||||
/* if there were errors, report them */
|
||||
if (send_result < 0)
|
||||
return EOF;
|
||||
|
||||
/*
|
||||
* if not all data could be sent, increase the output buffer,
|
||||
* put the rest of s into it and return successfully. This
|
||||
* case will only happen in a non-blocking connection
|
||||
*/
|
||||
if (send_result > 0)
|
||||
{
|
||||
/*
|
||||
* try to grow the buffer. FIXME: The new size could be
|
||||
* chosen more intelligently.
|
||||
*/
|
||||
size_t buflen = (size_t) conn->outCount + nbytes;
|
||||
|
||||
if (buflen > (size_t) conn->outBufSize)
|
||||
{
|
||||
char *newbuf = realloc(conn->outBuffer, buflen);
|
||||
|
||||
if (!newbuf)
|
||||
{
|
||||
/* realloc failed. Probably out of memory */
|
||||
printfPQExpBuffer(&conn->errorMessage,
|
||||
"cannot allocate memory for output buffer\n");
|
||||
return EOF;
|
||||
}
|
||||
conn->outBuffer = newbuf;
|
||||
conn->outBufSize = buflen;
|
||||
}
|
||||
/* put the data into it */
|
||||
memcpy(conn->outBuffer + conn->outCount, s, nbytes);
|
||||
conn->outCount += nbytes;
|
||||
|
||||
/* report success. */
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* pqSendSome was able to send all data. Continue with the next
|
||||
* chunk of s.
|
||||
*/
|
||||
} /* while */
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* pqGets:
|
||||
* get a null-terminated string from the connection,
|
||||
@ -232,14 +143,17 @@ pqGets(PQExpBuffer buf, PGconn *conn)
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* pqPuts: write a null-terminated string to the current message
|
||||
*/
|
||||
int
|
||||
pqPuts(const char *s, PGconn *conn)
|
||||
{
|
||||
if (pqPutBytes(s, strlen(s) + 1, conn))
|
||||
if (pqPutMsgBytes(s, strlen(s) + 1, conn))
|
||||
return EOF;
|
||||
|
||||
if (conn->Pfdebug)
|
||||
fprintf(conn->Pfdebug, "To backend> %s\n", s);
|
||||
fprintf(conn->Pfdebug, "To backend> '%s'\n", s);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -267,12 +181,12 @@ pqGetnchar(char *s, size_t len, PGconn *conn)
|
||||
|
||||
/*
|
||||
* pqPutnchar:
|
||||
* send a string of exactly len bytes, no null termination needed
|
||||
* write exactly len bytes to the current message
|
||||
*/
|
||||
int
|
||||
pqPutnchar(const char *s, size_t len, PGconn *conn)
|
||||
{
|
||||
if (pqPutBytes(s, len, conn))
|
||||
if (pqPutMsgBytes(s, len, conn))
|
||||
return EOF;
|
||||
|
||||
if (conn->Pfdebug)
|
||||
@ -325,7 +239,7 @@ pqGetInt(int *result, size_t bytes, PGconn *conn)
|
||||
|
||||
/*
|
||||
* pgPutInt
|
||||
* send an integer of 2 or 4 bytes, converting from host byte order
|
||||
* write an integer of 2 or 4 bytes, converting from host byte order
|
||||
* to network byte order.
|
||||
*/
|
||||
int
|
||||
@ -339,12 +253,12 @@ pqPutInt(int value, size_t bytes, PGconn *conn)
|
||||
{
|
||||
case 2:
|
||||
tmp2 = htons((uint16) value);
|
||||
if (pqPutBytes((const char *) &tmp2, 2, conn))
|
||||
if (pqPutMsgBytes((const char *) &tmp2, 2, conn))
|
||||
return EOF;
|
||||
break;
|
||||
case 4:
|
||||
tmp4 = htonl((uint32) value);
|
||||
if (pqPutBytes((const char *) &tmp4, 4, conn))
|
||||
if (pqPutMsgBytes((const char *) &tmp4, 4, conn))
|
||||
return EOF;
|
||||
break;
|
||||
default:
|
||||
@ -362,24 +276,162 @@ pqPutInt(int value, size_t bytes, PGconn *conn)
|
||||
}
|
||||
|
||||
/*
|
||||
* pqReadReady: is select() saying the file is ready to read?
|
||||
* JAB: -or- if SSL is enabled and used, is it buffering bytes?
|
||||
* Returns -1 on failure, 0 if not ready, 1 if ready.
|
||||
* Make sure conn's output buffer can hold bytes_needed bytes (caller must
|
||||
* include existing outCount into the value!)
|
||||
*
|
||||
* Returns 0 on success, EOF on error
|
||||
*/
|
||||
int
|
||||
pqReadReady(PGconn *conn)
|
||||
static int
|
||||
checkOutBufferSpace(int bytes_needed, PGconn *conn)
|
||||
{
|
||||
return pqSocketCheck(conn, 1, 0, (time_t) 0);
|
||||
int newsize = conn->outBufSize;
|
||||
char *newbuf;
|
||||
|
||||
if (bytes_needed <= newsize)
|
||||
return 0;
|
||||
/*
|
||||
* If we need to enlarge the buffer, we first try to double it in size;
|
||||
* if that doesn't work, enlarge in multiples of 8K. This avoids
|
||||
* thrashing the malloc pool by repeated small enlargements.
|
||||
*
|
||||
* Note: tests for newsize > 0 are to catch integer overflow.
|
||||
*/
|
||||
do {
|
||||
newsize *= 2;
|
||||
} while (bytes_needed > newsize && newsize > 0);
|
||||
|
||||
if (bytes_needed <= newsize)
|
||||
{
|
||||
newbuf = realloc(conn->outBuffer, newsize);
|
||||
if (newbuf)
|
||||
{
|
||||
/* realloc succeeded */
|
||||
conn->outBuffer = newbuf;
|
||||
conn->outBufSize = newsize;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
newsize = conn->outBufSize;
|
||||
do {
|
||||
newsize += 8192;
|
||||
} while (bytes_needed > newsize && newsize > 0);
|
||||
|
||||
if (bytes_needed <= newsize)
|
||||
{
|
||||
newbuf = realloc(conn->outBuffer, newsize);
|
||||
if (newbuf)
|
||||
{
|
||||
/* realloc succeeded */
|
||||
conn->outBuffer = newbuf;
|
||||
conn->outBufSize = newsize;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* realloc failed. Probably out of memory */
|
||||
printfPQExpBuffer(&conn->errorMessage,
|
||||
"cannot allocate memory for output buffer\n");
|
||||
return EOF;
|
||||
}
|
||||
|
||||
/*
|
||||
* pqWriteReady: is select() saying the file is ready to write?
|
||||
* Returns -1 on failure, 0 if not ready, 1 if ready.
|
||||
* pqPutMsgStart: begin construction of a message to the server
|
||||
*
|
||||
* msg_type is the message type byte, or 0 for a message without type byte
|
||||
* (only startup messages have no type byte)
|
||||
*
|
||||
* Returns 0 on success, EOF on error
|
||||
*
|
||||
* The idea here is that we construct the message in conn->outBuffer,
|
||||
* beginning just past any data already in outBuffer (ie, at
|
||||
* outBuffer+outCount). We enlarge the buffer as needed to hold the message.
|
||||
* When the message is complete, we fill in the length word and then advance
|
||||
* outCount past the message, making it eligible to send. The state
|
||||
* variable conn->outMsgStart points to the incomplete message's length word
|
||||
* (it is either outCount or outCount+1 depending on whether there is a
|
||||
* type byte). The state variable conn->outMsgEnd is the end of the data
|
||||
* collected so far.
|
||||
*/
|
||||
int
|
||||
pqWriteReady(PGconn *conn)
|
||||
pqPutMsgStart(char msg_type, PGconn *conn)
|
||||
{
|
||||
return pqSocketCheck(conn, 0, 1, (time_t) 0);
|
||||
int lenPos;
|
||||
|
||||
/* where the message length word will go */
|
||||
if (msg_type)
|
||||
lenPos = conn->outCount + 1;
|
||||
else
|
||||
lenPos = conn->outCount;
|
||||
/* make sure there is room for it */
|
||||
if (checkOutBufferSpace(lenPos + 4, conn))
|
||||
return EOF;
|
||||
/* okay, save the message type byte if any */
|
||||
if (msg_type)
|
||||
conn->outBuffer[conn->outCount] = msg_type;
|
||||
/* set up the message pointers */
|
||||
conn->outMsgStart = lenPos;
|
||||
conn->outMsgEnd = lenPos + 4;
|
||||
/* length word will be filled in by pqPutMsgEnd */
|
||||
|
||||
if (conn->Pfdebug)
|
||||
fprintf(conn->Pfdebug, "To backend> Msg %c\n",
|
||||
msg_type ? msg_type : ' ');
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* pqPutMsgBytes: add bytes to a partially-constructed message
|
||||
*
|
||||
* Returns 0 on success, EOF on error
|
||||
*/
|
||||
static int
|
||||
pqPutMsgBytes(const void *buf, size_t len, PGconn *conn)
|
||||
{
|
||||
/* make sure there is room for it */
|
||||
if (checkOutBufferSpace(conn->outMsgEnd + len, conn))
|
||||
return EOF;
|
||||
/* okay, save the data */
|
||||
memcpy(conn->outBuffer + conn->outMsgEnd, buf, len);
|
||||
conn->outMsgEnd += len;
|
||||
/* no Pfdebug call here, caller should do it */
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* pqPutMsgEnd: finish constructing a message and possibly send it
|
||||
*
|
||||
* Returns 0 on success, EOF on error
|
||||
*
|
||||
* We don't actually send anything here unless we've accumulated at least
|
||||
* 8K worth of data (the typical size of a pipe buffer on Unix systems).
|
||||
* This avoids sending small partial packets. The caller must use pqFlush
|
||||
* when it's important to flush all the data out to the server.
|
||||
*/
|
||||
int
|
||||
pqPutMsgEnd(PGconn *conn)
|
||||
{
|
||||
uint32 msgLen = conn->outMsgEnd - conn->outMsgStart;
|
||||
|
||||
if (conn->Pfdebug)
|
||||
fprintf(conn->Pfdebug, "To backend> Msg complete, length %u\n",
|
||||
msgLen);
|
||||
|
||||
msgLen = htonl(msgLen);
|
||||
memcpy(conn->outBuffer + conn->outMsgStart, &msgLen, 4);
|
||||
conn->outCount = conn->outMsgEnd;
|
||||
|
||||
if (conn->outCount >= 8192)
|
||||
{
|
||||
int toSend = conn->outCount - (conn->outCount % 8192);
|
||||
|
||||
if (pqSendSome(conn, toSend) < 0)
|
||||
return EOF;
|
||||
/* in nonblock mode, don't complain if unable to send it all */
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* ----------
|
||||
@ -580,16 +632,20 @@ definitelyFailed:
|
||||
}
|
||||
|
||||
/*
|
||||
* pqSendSome: send any data waiting in the output buffer.
|
||||
* pqSendSome: send data waiting in the output buffer.
|
||||
*
|
||||
* Return 0 on sucess, -1 on failure and 1 when data remains because the
|
||||
* socket would block and the connection is non-blocking.
|
||||
* len is how much to try to send (typically equal to outCount, but may
|
||||
* be less).
|
||||
*
|
||||
* Return 0 on success, -1 on failure and 1 when not all data could be sent
|
||||
* because the socket would block and the connection is non-blocking.
|
||||
*/
|
||||
int
|
||||
pqSendSome(PGconn *conn)
|
||||
static int
|
||||
pqSendSome(PGconn *conn, int len)
|
||||
{
|
||||
char *ptr = conn->outBuffer;
|
||||
int len = conn->outCount;
|
||||
int remaining = conn->outCount;
|
||||
int result = 0;
|
||||
|
||||
if (conn->sock < 0)
|
||||
{
|
||||
@ -598,13 +654,6 @@ pqSendSome(PGconn *conn)
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* don't try to send zero data, allows us to use this function without
|
||||
* too much worry about overhead
|
||||
*/
|
||||
if (len == 0)
|
||||
return (0);
|
||||
|
||||
/* while there's still data to send */
|
||||
while (len > 0)
|
||||
{
|
||||
@ -648,8 +697,9 @@ pqSendSome(PGconn *conn)
|
||||
* (typically, a NOTICE message from the backend
|
||||
* telling us it's committing hara-kiri...). Leave
|
||||
* the socket open until pqReadData finds no more data
|
||||
* can be read.
|
||||
* can be read. But abandon attempt to send data.
|
||||
*/
|
||||
conn->outCount = 0;
|
||||
return -1;
|
||||
|
||||
default:
|
||||
@ -657,6 +707,7 @@ pqSendSome(PGconn *conn)
|
||||
libpq_gettext("could not send data to server: %s\n"),
|
||||
SOCK_STRERROR(SOCK_ERRNO));
|
||||
/* We don't assume it's a fatal error... */
|
||||
conn->outCount = 0;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
@ -664,6 +715,7 @@ pqSendSome(PGconn *conn)
|
||||
{
|
||||
ptr += sent;
|
||||
len -= sent;
|
||||
remaining -= sent;
|
||||
}
|
||||
|
||||
if (len > 0)
|
||||
@ -681,46 +733,49 @@ pqSendSome(PGconn *conn)
|
||||
#endif
|
||||
if (pqIsnonblocking(conn))
|
||||
{
|
||||
/* shift the contents of the buffer */
|
||||
memmove(conn->outBuffer, ptr, len);
|
||||
conn->outCount = len;
|
||||
return 1;
|
||||
result = 1;
|
||||
break;
|
||||
}
|
||||
#ifdef USE_SSL
|
||||
}
|
||||
#endif
|
||||
|
||||
if (pqWait(FALSE, TRUE, conn))
|
||||
return -1;
|
||||
{
|
||||
result = -1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
conn->outCount = 0;
|
||||
/* shift the remaining contents of the buffer */
|
||||
if (remaining > 0)
|
||||
memmove(conn->outBuffer, ptr, remaining);
|
||||
conn->outCount = remaining;
|
||||
|
||||
if (conn->Pfdebug)
|
||||
fflush(conn->Pfdebug);
|
||||
|
||||
return 0;
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* pqFlush: send any data waiting in the output buffer
|
||||
*
|
||||
* Implemented in terms of pqSendSome to recreate the old behavior which
|
||||
* returned 0 if all data was sent or EOF. EOF was sent regardless of
|
||||
* whether an error occurred or not all data was sent on a non-blocking
|
||||
* socket.
|
||||
* Return 0 on success, -1 on failure and 1 when not all data could be sent
|
||||
* because the socket would block and the connection is non-blocking.
|
||||
*/
|
||||
int
|
||||
pqFlush(PGconn *conn)
|
||||
{
|
||||
if (pqSendSome(conn))
|
||||
return EOF;
|
||||
if (conn->Pfdebug)
|
||||
fflush(conn->Pfdebug);
|
||||
|
||||
if (conn->outCount > 0)
|
||||
return pqSendSome(conn, conn->outCount);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* pqWait: wait until we can read or write the connection socket
|
||||
*
|
||||
@ -766,10 +821,31 @@ pqWaitTimed(int forRead, int forWrite, PGconn *conn, time_t finish_time)
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* pqReadReady: is select() saying the file is ready to read?
|
||||
* Returns -1 on failure, 0 if not ready, 1 if ready.
|
||||
*/
|
||||
int
|
||||
pqReadReady(PGconn *conn)
|
||||
{
|
||||
return pqSocketCheck(conn, 1, 0, (time_t) 0);
|
||||
}
|
||||
|
||||
/*
|
||||
* pqWriteReady: is select() saying the file is ready to write?
|
||||
* Returns -1 on failure, 0 if not ready, 1 if ready.
|
||||
*/
|
||||
int
|
||||
pqWriteReady(PGconn *conn)
|
||||
{
|
||||
return pqSocketCheck(conn, 0, 1, (time_t) 0);
|
||||
}
|
||||
|
||||
/*
|
||||
* Checks a socket, using poll or select, for data to be read, written,
|
||||
* or both. Returns >0 if one or more conditions are met, 0 if it timed
|
||||
* out, -1 if an error occurred.
|
||||
*
|
||||
* If SSL is in use, the SSL buffer is checked prior to checking the socket
|
||||
* for read data directly.
|
||||
*/
|
||||
@ -787,8 +863,8 @@ pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time)
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* JAB: Check for SSL library buffering read bytes */
|
||||
#ifdef USE_SSL
|
||||
/* Check for SSL library buffering read bytes */
|
||||
if (forRead && conn->ssl && SSL_pending(conn->ssl) > 0)
|
||||
{
|
||||
/* short-circuit the select */
|
||||
@ -819,6 +895,7 @@ pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time)
|
||||
* If neither forRead nor forWrite are set, immediately return a timeout
|
||||
* condition (without waiting). Return >0 if condition is met, 0
|
||||
* if a timeout occurred, -1 if an error or interrupt occurred.
|
||||
*
|
||||
* Timeout is infinite if end_time is -1. Timeout is immediate (no blocking)
|
||||
* if end_time is 0 (or indeed, any time before now).
|
||||
*/
|
||||
@ -830,16 +907,17 @@ pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time)
|
||||
struct pollfd input_fd;
|
||||
int timeout_ms;
|
||||
|
||||
if (!forRead && !forWrite)
|
||||
return 0;
|
||||
|
||||
input_fd.fd = sock;
|
||||
input_fd.events = 0;
|
||||
input_fd.events = POLLERR;
|
||||
input_fd.revents = 0;
|
||||
|
||||
if (forRead)
|
||||
input_fd.events |= POLLIN;
|
||||
if (forWrite)
|
||||
input_fd.events |= POLLOUT;
|
||||
if (!input_fd.events)
|
||||
return 0;
|
||||
|
||||
/* Compute appropriate timeout interval */
|
||||
if (end_time == ((time_t) -1))
|
||||
|
Reference in New Issue
Block a user