mirror of
https://github.com/postgres/postgres.git
synced 2025-06-27 23:21:58 +03:00
Another round of protocol changes. Backend-to-frontend messages now all
have length words. COPY OUT reimplemented per new protocol: it doesn't need \. anymore, thank goodness. COPY BINARY to/from frontend works, at least as far as the backend is concerned --- libpq's PQgetline API is not up to snuff, and will have to be replaced with something that is null-safe. libpq uses message length words for performance improvement (no cycles wasted rescanning long messages), but not yet for error recovery.
This commit is contained in:
@ -8,7 +8,7 @@
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* $Header: /cvsroot/pgsql/src/interfaces/libpq/fe-exec.c,v 1.129 2003/04/19 00:02:30 tgl Exp $
|
||||
* $Header: /cvsroot/pgsql/src/interfaces/libpq/fe-exec.c,v 1.130 2003/04/22 00:08:07 tgl Exp $
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
@ -51,6 +51,7 @@ static PGresult *prepareAsyncResult(PGconn *conn);
|
||||
static int addTuple(PGresult *res, PGresAttValue * tup);
|
||||
static void parseInput(PGconn *conn);
|
||||
static void handleSendFailure(PGconn *conn);
|
||||
static void handleSyncLoss(PGconn *conn, char id, int msgLength);
|
||||
static int getRowDescriptions(PGconn *conn);
|
||||
static int getAnotherTuple(PGconn *conn, int binary);
|
||||
static int getNotify(PGconn *conn);
|
||||
@ -866,6 +867,8 @@ static void
|
||||
parseInput(PGconn *conn)
|
||||
{
|
||||
char id;
|
||||
int msgLength;
|
||||
int avail;
|
||||
char noticeWorkspace[128];
|
||||
|
||||
/*
|
||||
@ -874,25 +877,63 @@ parseInput(PGconn *conn)
|
||||
for (;;)
|
||||
{
|
||||
/*
|
||||
* Quit if in COPY_OUT state: we expect raw data from the server
|
||||
* until PQendcopy is called. Don't try to parse it according to
|
||||
* the normal protocol. (This is bogus. The data lines ought to
|
||||
* be part of the protocol and have identifying leading
|
||||
* characters.)
|
||||
*/
|
||||
if (conn->asyncStatus == PGASYNC_COPY_OUT)
|
||||
return;
|
||||
|
||||
/*
|
||||
* OK to try to read a message type code.
|
||||
* Try to read a message. First get the type code and length.
|
||||
* Return if not enough data.
|
||||
*/
|
||||
conn->inCursor = conn->inStart;
|
||||
if (pqGetc(&id, conn))
|
||||
return;
|
||||
if (pqGetInt(&msgLength, 4, conn))
|
||||
return;
|
||||
|
||||
/*
|
||||
* NOTIFY and NOTICE messages can happen in any state besides
|
||||
* COPY OUT; always process them right away.
|
||||
* Try to validate message type/length here. A length less than 4
|
||||
* is definitely broken. Large lengths should only be believed
|
||||
* for a few message types.
|
||||
*/
|
||||
if (msgLength < 4)
|
||||
{
|
||||
handleSyncLoss(conn, id, msgLength);
|
||||
return;
|
||||
}
|
||||
if (msgLength > 30000 &&
|
||||
!(id == 'T' || id == 'D' || id == 'B' || id == 'd'))
|
||||
{
|
||||
handleSyncLoss(conn, id, msgLength);
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Can't process if message body isn't all here yet.
|
||||
*/
|
||||
msgLength -= 4;
|
||||
avail = conn->inEnd - conn->inCursor;
|
||||
if (avail < msgLength)
|
||||
{
|
||||
/*
|
||||
* Before returning, enlarge the input buffer if needed to hold
|
||||
* the whole message. This is better than leaving it to
|
||||
* pqReadData because we can avoid multiple cycles of realloc()
|
||||
* when the message is large; also, we can implement a reasonable
|
||||
* recovery strategy if we are unable to make the buffer big
|
||||
* enough.
|
||||
*/
|
||||
if (pqCheckInBufferSpace(conn->inCursor + msgLength, conn))
|
||||
{
|
||||
/*
|
||||
* XXX add some better recovery code... plan is to skip
|
||||
* over the message using its length, then report an error.
|
||||
* For the moment, just treat this like loss of sync (which
|
||||
* indeed it might be!)
|
||||
*/
|
||||
handleSyncLoss(conn, id, msgLength);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* NOTIFY and NOTICE messages can happen in any state; always process
|
||||
* them right away.
|
||||
*
|
||||
* Most other messages should only be processed while in BUSY state.
|
||||
* (In particular, in READY state we hold off further parsing
|
||||
@ -936,9 +977,8 @@ parseInput(PGconn *conn)
|
||||
libpq_gettext("message type 0x%02x arrived from server while idle\n"),
|
||||
id);
|
||||
DONOTICE(conn, noticeWorkspace);
|
||||
/* Discard the unexpected message; good idea?? */
|
||||
conn->inStart = conn->inEnd;
|
||||
break;
|
||||
/* Discard the unexpected message */
|
||||
conn->inCursor += msgLength;
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -969,16 +1009,6 @@ parseInput(PGconn *conn)
|
||||
conn->asyncStatus = PGASYNC_IDLE;
|
||||
break;
|
||||
case 'I': /* empty query */
|
||||
/* read and throw away the closing '\0' */
|
||||
if (pqGetc(&id, conn))
|
||||
return;
|
||||
if (id != '\0')
|
||||
{
|
||||
snprintf(noticeWorkspace, sizeof(noticeWorkspace),
|
||||
libpq_gettext("unexpected character %c following empty query response (\"I\" message)\n"),
|
||||
id);
|
||||
DONOTICE(conn, noticeWorkspace);
|
||||
}
|
||||
if (conn->result == NULL)
|
||||
conn->result = PQmakeEmptyPGresult(conn,
|
||||
PGRES_EMPTY_QUERY);
|
||||
@ -996,11 +1026,6 @@ parseInput(PGconn *conn)
|
||||
if (pqGetInt(&(conn->be_key), 4, conn))
|
||||
return;
|
||||
break;
|
||||
case 'P': /* synchronous (normal) portal */
|
||||
if (pqGets(&conn->workBuffer, conn))
|
||||
return;
|
||||
/* We pretty much ignore this message type... */
|
||||
break;
|
||||
case 'T': /* row descriptions (start of query
|
||||
* results) */
|
||||
if (conn->result == NULL)
|
||||
@ -1034,9 +1059,8 @@ parseInput(PGconn *conn)
|
||||
snprintf(noticeWorkspace, sizeof(noticeWorkspace),
|
||||
libpq_gettext("server sent data (\"D\" message) without prior row description (\"T\" message)\n"));
|
||||
DONOTICE(conn, noticeWorkspace);
|
||||
/* Discard the unexpected message; good idea?? */
|
||||
conn->inStart = conn->inEnd;
|
||||
return;
|
||||
/* Discard the unexpected message */
|
||||
conn->inCursor += msgLength;
|
||||
}
|
||||
break;
|
||||
case 'B': /* Binary data tuple */
|
||||
@ -1051,16 +1075,36 @@ parseInput(PGconn *conn)
|
||||
snprintf(noticeWorkspace, sizeof(noticeWorkspace),
|
||||
libpq_gettext("server sent binary data (\"B\" message) without prior row description (\"T\" message)\n"));
|
||||
DONOTICE(conn, noticeWorkspace);
|
||||
/* Discard the unexpected message; good idea?? */
|
||||
conn->inStart = conn->inEnd;
|
||||
return;
|
||||
/* Discard the unexpected message */
|
||||
conn->inCursor += msgLength;
|
||||
}
|
||||
break;
|
||||
case 'G': /* Start Copy In */
|
||||
if (pqGetc(&conn->copy_is_binary, conn))
|
||||
return;
|
||||
conn->asyncStatus = PGASYNC_COPY_IN;
|
||||
break;
|
||||
case 'H': /* Start Copy Out */
|
||||
if (pqGetc(&conn->copy_is_binary, conn))
|
||||
return;
|
||||
conn->asyncStatus = PGASYNC_COPY_OUT;
|
||||
conn->copy_already_done = 0;
|
||||
break;
|
||||
case 'd': /* Copy Data */
|
||||
/*
|
||||
* If we see Copy Data, just silently drop it. This
|
||||
* would only occur if application exits COPY OUT mode
|
||||
* too early.
|
||||
*/
|
||||
conn->inCursor += msgLength;
|
||||
break;
|
||||
case 'c': /* Copy Done */
|
||||
/*
|
||||
* If we see Copy Done, just silently drop it. This
|
||||
* is the normal case during PQendcopy. We will keep
|
||||
* swallowing data, expecting to see command-complete
|
||||
* for the COPY command.
|
||||
*/
|
||||
break;
|
||||
default:
|
||||
printfPQExpBuffer(&conn->errorMessage,
|
||||
@ -1069,17 +1113,54 @@ parseInput(PGconn *conn)
|
||||
id);
|
||||
/* build an error result holding the error message */
|
||||
saveErrorResult(conn);
|
||||
/* Discard the unexpected message; good idea?? */
|
||||
conn->inStart = conn->inEnd;
|
||||
conn->asyncStatus = PGASYNC_READY;
|
||||
return;
|
||||
/* Discard the unexpected message */
|
||||
conn->inCursor += msgLength;
|
||||
break;
|
||||
} /* switch on protocol character */
|
||||
}
|
||||
/* Successfully consumed this message */
|
||||
conn->inStart = conn->inCursor;
|
||||
if (conn->inCursor == conn->inStart + 5 + msgLength)
|
||||
{
|
||||
/* Normal case: parsing agrees with specified length */
|
||||
conn->inStart = conn->inCursor;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Trouble --- report it */
|
||||
printfPQExpBuffer(&conn->errorMessage,
|
||||
libpq_gettext("Message contents do not agree with length in message type \"%c\"\n"),
|
||||
id);
|
||||
/* build an error result holding the error message */
|
||||
saveErrorResult(conn);
|
||||
conn->asyncStatus = PGASYNC_READY;
|
||||
/* trust the specified message length as what to skip */
|
||||
conn->inStart += 5 + msgLength;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* handleSyncLoss: clean up after loss of message-boundary sync
|
||||
*
|
||||
* There isn't really a lot we can do here except abandon the connection.
|
||||
*/
|
||||
static void
|
||||
handleSyncLoss(PGconn *conn, char id, int msgLength)
|
||||
{
|
||||
printfPQExpBuffer(&conn->errorMessage,
|
||||
libpq_gettext(
|
||||
"lost synchronization with server: got message type \"%c\", length %d\n"),
|
||||
id, msgLength);
|
||||
conn->status = CONNECTION_BAD; /* No more connection to backend */
|
||||
pqsecure_close(conn);
|
||||
#ifdef WIN32
|
||||
closesocket(conn->sock);
|
||||
#else
|
||||
close(conn->sock);
|
||||
#endif
|
||||
conn->sock = -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* parseInput subroutine to read a 'T' (row descriptions) message.
|
||||
@ -1100,7 +1181,7 @@ getRowDescriptions(PGconn *conn)
|
||||
|
||||
result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK);
|
||||
|
||||
/* parseInput already read the 'T' label. */
|
||||
/* parseInput already read the 'T' label and message length. */
|
||||
/* the next two bytes are the number of fields */
|
||||
if (pqGetInt(&(result->numAttributes), 2, conn))
|
||||
{
|
||||
@ -1461,7 +1542,7 @@ errout:
|
||||
/*
|
||||
* Attempt to read a Notice response message.
|
||||
* This is possible in several places, so we break it out as a subroutine.
|
||||
* Entry: 'N' flag character has already been consumed.
|
||||
* Entry: 'N' message type and length have already been consumed.
|
||||
* Exit: returns 0 if successfully consumed Notice message.
|
||||
* returns EOF if not enough data.
|
||||
*/
|
||||
@ -1489,7 +1570,7 @@ getNotice(PGconn *conn)
|
||||
/*
|
||||
* Attempt to read a Notify response message.
|
||||
* This is possible in several places, so we break it out as a subroutine.
|
||||
* Entry: 'A' flag character has already been consumed.
|
||||
* Entry: 'A' message type and length have already been consumed.
|
||||
* Exit: returns 0 if successfully consumed Notify message.
|
||||
* returns EOF if not enough data.
|
||||
*/
|
||||
@ -1511,10 +1592,18 @@ getNotify(PGconn *conn)
|
||||
*/
|
||||
newNotify = (PGnotify *) malloc(sizeof(PGnotify) +
|
||||
strlen(conn->workBuffer.data) +1);
|
||||
newNotify->relname = (char *) newNotify + sizeof(PGnotify);
|
||||
strcpy(newNotify->relname, conn->workBuffer.data);
|
||||
newNotify->be_pid = be_pid;
|
||||
DLAddTail(conn->notifyList, DLNewElem(newNotify));
|
||||
if (newNotify)
|
||||
{
|
||||
newNotify->relname = (char *) newNotify + sizeof(PGnotify);
|
||||
strcpy(newNotify->relname, conn->workBuffer.data);
|
||||
newNotify->be_pid = be_pid;
|
||||
DLAddTail(conn->notifyList, DLNewElem(newNotify));
|
||||
}
|
||||
|
||||
/* Swallow extra string (not presently used) */
|
||||
if (pqGets(&conn->workBuffer, conn))
|
||||
return EOF;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -1556,6 +1645,9 @@ PQnotifies(PGconn *conn)
|
||||
* Chiefly here so that applications can use "COPY <rel> to stdout"
|
||||
* and read the output string. Returns a null-terminated string in s.
|
||||
*
|
||||
* XXX this routine is now deprecated, because it can't handle binary data.
|
||||
* If called during a COPY BINARY we return EOF.
|
||||
*
|
||||
* PQgetline reads up to maxlen-1 characters (like fgets(3)) but strips
|
||||
* the terminating \n (like gets(3)).
|
||||
*
|
||||
@ -1563,7 +1655,7 @@ PQnotifies(PGconn *conn)
|
||||
* (a line containing just "\.") when using this routine.
|
||||
*
|
||||
* RETURNS:
|
||||
* EOF if it is detected or invalid arguments are given
|
||||
* EOF if error (eg, invalid arguments are given)
|
||||
* 0 if EOL is reached (i.e., \n has been read)
|
||||
* (this is required for backward-compatibility -- this
|
||||
* routine used to always return EOF or 0, assuming that
|
||||
@ -1573,53 +1665,55 @@ PQnotifies(PGconn *conn)
|
||||
int
|
||||
PQgetline(PGconn *conn, char *s, int maxlen)
|
||||
{
|
||||
int result = 1; /* return value if buffer overflows */
|
||||
int status;
|
||||
|
||||
if (!s || maxlen <= 0)
|
||||
/* maxlen must be at least 3 to hold the \. terminator! */
|
||||
if (!conn || !s || maxlen < 3)
|
||||
return EOF;
|
||||
|
||||
if (!conn || conn->sock < 0)
|
||||
if (conn->sock < 0 ||
|
||||
conn->asyncStatus != PGASYNC_COPY_OUT ||
|
||||
conn->copy_is_binary)
|
||||
{
|
||||
printfPQExpBuffer(&conn->errorMessage,
|
||||
libpq_gettext("PQgetline: not doing text COPY OUT\n"));
|
||||
*s = '\0';
|
||||
return EOF;
|
||||
}
|
||||
|
||||
/*
|
||||
* Since this is a purely synchronous routine, we don't bother to
|
||||
* maintain conn->inCursor; there is no need to back up.
|
||||
*/
|
||||
while (maxlen > 1)
|
||||
while ((status = PQgetlineAsync(conn, s, maxlen-1)) == 0)
|
||||
{
|
||||
if (conn->inStart < conn->inEnd)
|
||||
/* need to load more data */
|
||||
if (pqWait(TRUE, FALSE, conn) ||
|
||||
pqReadData(conn) < 0)
|
||||
{
|
||||
char c = conn->inBuffer[conn->inStart++];
|
||||
|
||||
if (c == '\n')
|
||||
{
|
||||
result = 0; /* success exit */
|
||||
break;
|
||||
}
|
||||
*s++ = c;
|
||||
maxlen--;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* need to load more data */
|
||||
if (pqWait(TRUE, FALSE, conn) ||
|
||||
pqReadData(conn) < 0)
|
||||
{
|
||||
result = EOF;
|
||||
break;
|
||||
}
|
||||
*s = '\0';
|
||||
return EOF;
|
||||
}
|
||||
}
|
||||
*s = '\0';
|
||||
|
||||
return result;
|
||||
if (status < 0)
|
||||
{
|
||||
/* End of copy detected; gin up old-style terminator */
|
||||
strcpy(s, "\\.");
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Add null terminator, and strip trailing \n if present */
|
||||
if (s[status-1] == '\n')
|
||||
{
|
||||
s[status-1] = '\0';
|
||||
return 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
s[status] = '\0';
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* PQgetlineAsync - gets a newline-terminated string without blocking.
|
||||
* PQgetlineAsync - gets a COPY data row without blocking.
|
||||
*
|
||||
* This routine is for applications that want to do "COPY <rel> to stdout"
|
||||
* asynchronously, that is without blocking. Having issued the COPY command
|
||||
@ -1627,10 +1721,9 @@ PQgetline(PGconn *conn, char *s, int maxlen)
|
||||
* and this routine until the end-of-data signal is detected. Unlike
|
||||
* PQgetline, this routine takes responsibility for detecting end-of-data.
|
||||
*
|
||||
* On each call, PQgetlineAsync will return data if a complete newline-
|
||||
* terminated data line is available in libpq's input buffer, or if the
|
||||
* incoming data line is too long to fit in the buffer offered by the caller.
|
||||
* Otherwise, no data is returned until the rest of the line arrives.
|
||||
* On each call, PQgetlineAsync will return data if a complete data row
|
||||
* is available in libpq's input buffer. Otherwise, no data is returned
|
||||
* until the rest of the row arrives.
|
||||
*
|
||||
* If -1 is returned, the end-of-data signal has been recognized (and removed
|
||||
* from libpq's input buffer). The caller *must* next call PQendcopy and
|
||||
@ -1640,66 +1733,73 @@ PQgetline(PGconn *conn, char *s, int maxlen)
|
||||
* -1 if the end-of-copy-data marker has been recognized
|
||||
* 0 if no data is available
|
||||
* >0 the number of bytes returned.
|
||||
* The data returned will not extend beyond a newline character. If possible
|
||||
* a whole line will be returned at one time. But if the buffer offered by
|
||||
* the caller is too small to hold a line sent by the backend, then a partial
|
||||
* data line will be returned. This can be detected by testing whether the
|
||||
* last returned byte is '\n' or not.
|
||||
* The returned string is *not* null-terminated.
|
||||
*
|
||||
* The data returned will not extend beyond a data-row boundary. If possible
|
||||
* a whole row will be returned at one time. But if the buffer offered by
|
||||
* the caller is too small to hold a row sent by the backend, then a partial
|
||||
* data row will be returned. In text mode this can be detected by testing
|
||||
* whether the last returned byte is '\n' or not.
|
||||
*
|
||||
* The returned data is *not* null-terminated.
|
||||
*/
|
||||
|
||||
int
|
||||
PQgetlineAsync(PGconn *conn, char *buffer, int bufsize)
|
||||
{
|
||||
char id;
|
||||
int msgLength;
|
||||
int avail;
|
||||
|
||||
if (!conn || conn->asyncStatus != PGASYNC_COPY_OUT)
|
||||
return -1; /* we are not doing a copy... */
|
||||
|
||||
/*
|
||||
* Move data from libpq's buffer to the caller's. We want to accept
|
||||
* data only in units of whole lines, not partial lines. This ensures
|
||||
* that we can recognize the terminator line "\\.\n". (Otherwise, if
|
||||
* it happened to cross a packet/buffer boundary, we might hand the
|
||||
* first one or two characters off to the caller, which we shouldn't.)
|
||||
* Recognize the next input message. To make life simpler for async
|
||||
* callers, we keep returning 0 until the next message is fully available
|
||||
* even if it is not Copy Data. This should keep PQendcopy from blocking.
|
||||
*/
|
||||
|
||||
conn->inCursor = conn->inStart;
|
||||
|
||||
avail = bufsize;
|
||||
while (avail > 0 && conn->inCursor < conn->inEnd)
|
||||
{
|
||||
char c = conn->inBuffer[conn->inCursor++];
|
||||
|
||||
*buffer++ = c;
|
||||
--avail;
|
||||
if (c == '\n')
|
||||
{
|
||||
/* Got a complete line; mark the data removed from libpq */
|
||||
conn->inStart = conn->inCursor;
|
||||
/* Is it the endmarker line? */
|
||||
if (bufsize - avail == 3 && buffer[-3] == '\\' && buffer[-2] == '.')
|
||||
return -1;
|
||||
/* No, return the data line to the caller */
|
||||
return bufsize - avail;
|
||||
}
|
||||
}
|
||||
if (pqGetc(&id, conn))
|
||||
return 0;
|
||||
if (pqGetInt(&msgLength, 4, conn))
|
||||
return 0;
|
||||
avail = conn->inEnd - conn->inCursor;
|
||||
if (avail < msgLength - 4)
|
||||
return 0;
|
||||
|
||||
/*
|
||||
* We don't have a complete line. We'd prefer to leave it in libpq's
|
||||
* buffer until the rest arrives, but there is a special case: what if
|
||||
* the line is longer than the buffer the caller is offering us? In
|
||||
* that case we'd better hand over a partial line, else we'd get into
|
||||
* an infinite loop. Do this in a way that ensures we can't
|
||||
* misrecognize a terminator line later: leave last 3 characters in
|
||||
* libpq buffer.
|
||||
* Cannot proceed unless it's a Copy Data message. Anything else means
|
||||
* end of copy mode.
|
||||
*/
|
||||
if (avail == 0 && bufsize > 3)
|
||||
if (id != 'd')
|
||||
return -1;
|
||||
|
||||
/*
|
||||
* Move data from libpq's buffer to the caller's. In the case where
|
||||
* a prior call found the caller's buffer too small, we use
|
||||
* conn->copy_already_done to remember how much of the row was already
|
||||
* returned to the caller.
|
||||
*/
|
||||
conn->inCursor += conn->copy_already_done;
|
||||
avail = msgLength - 4 - conn->copy_already_done;
|
||||
if (avail <= bufsize)
|
||||
{
|
||||
conn->inStart = conn->inCursor - 3;
|
||||
return bufsize - 3;
|
||||
/* Able to consume the whole message */
|
||||
memcpy(buffer, &conn->inBuffer[conn->inCursor], avail);
|
||||
/* Mark message consumed */
|
||||
conn->inStart = conn->inCursor + avail;
|
||||
/* Reset state for next time */
|
||||
conn->copy_already_done = 0;
|
||||
return avail;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* We must return a partial message */
|
||||
memcpy(buffer, &conn->inBuffer[conn->inCursor], bufsize);
|
||||
/* The message is NOT consumed from libpq's buffer */
|
||||
conn->copy_already_done += bufsize;
|
||||
return bufsize;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -1774,14 +1874,21 @@ PQendcopy(PGconn *conn)
|
||||
if (pqFlush(conn) && pqIsnonblocking(conn))
|
||||
return (1);
|
||||
|
||||
/* non blocking connections may have to abort at this point. */
|
||||
if (pqIsnonblocking(conn) && PQisBusy(conn))
|
||||
return (1);
|
||||
|
||||
/* Return to active duty */
|
||||
conn->asyncStatus = PGASYNC_BUSY;
|
||||
resetPQExpBuffer(&conn->errorMessage);
|
||||
|
||||
/*
|
||||
* Non blocking connections may have to abort at this point. If everyone
|
||||
* played the game there should be no problem, but in error scenarios
|
||||
* the expected messages may not have arrived yet. (We are assuming that
|
||||
* the backend's packetizing will ensure that CommandComplete arrives
|
||||
* along with the CopyDone; are there corner cases where that doesn't
|
||||
* happen?)
|
||||
*/
|
||||
if (pqIsnonblocking(conn) && PQisBusy(conn))
|
||||
return (1);
|
||||
|
||||
/* Wait for the completion response */
|
||||
result = PQgetResult(conn);
|
||||
|
||||
@ -1793,26 +1900,16 @@ PQendcopy(PGconn *conn)
|
||||
}
|
||||
|
||||
/*
|
||||
* Trouble. The worst case is that we've lost sync with the backend
|
||||
* entirely due to application screwup of the copy in/out protocol. To
|
||||
* recover, reset the connection (talk about using a sledgehammer...)
|
||||
* Trouble. For backwards-compatibility reasons, we issue the error
|
||||
* message as if it were a notice (would be nice to get rid of this
|
||||
* silliness, but too many apps probably don't handle errors from
|
||||
* PQendcopy reasonably). Note that the app can still obtain the
|
||||
* error status from the PGconn object.
|
||||
*/
|
||||
PQclear(result);
|
||||
|
||||
if (conn->errorMessage.len > 0)
|
||||
DONOTICE(conn, conn->errorMessage.data);
|
||||
|
||||
DONOTICE(conn, libpq_gettext("lost synchronization with server, resetting connection\n"));
|
||||
|
||||
/*
|
||||
* Users doing non-blocking connections need to handle the reset
|
||||
* themselves, they'll need to check the connection status if we
|
||||
* return an error.
|
||||
*/
|
||||
if (pqIsnonblocking(conn))
|
||||
PQresetStart(conn);
|
||||
else
|
||||
PQreset(conn);
|
||||
PQclear(result);
|
||||
|
||||
return 1;
|
||||
}
|
||||
@ -1853,6 +1950,8 @@ PQfn(PGconn *conn,
|
||||
bool needInput = false;
|
||||
ExecStatusType status = PGRES_FATAL_ERROR;
|
||||
char id;
|
||||
int msgLength;
|
||||
int avail;
|
||||
int i;
|
||||
|
||||
*actual_result_len = 0;
|
||||
@ -1927,11 +2026,55 @@ PQfn(PGconn *conn,
|
||||
* Scan the message. If we run out of data, loop around to try
|
||||
* again.
|
||||
*/
|
||||
conn->inCursor = conn->inStart;
|
||||
needInput = true;
|
||||
|
||||
conn->inCursor = conn->inStart;
|
||||
if (pqGetc(&id, conn))
|
||||
continue;
|
||||
if (pqGetInt(&msgLength, 4, conn))
|
||||
continue;
|
||||
|
||||
/*
|
||||
* Try to validate message type/length here. A length less than 4
|
||||
* is definitely broken. Large lengths should only be believed
|
||||
* for a few message types.
|
||||
*/
|
||||
if (msgLength < 4)
|
||||
{
|
||||
handleSyncLoss(conn, id, msgLength);
|
||||
break;
|
||||
}
|
||||
if (msgLength > 30000 &&
|
||||
!(id == 'T' || id == 'D' || id == 'B' || id == 'd' || id == 'V'))
|
||||
{
|
||||
handleSyncLoss(conn, id, msgLength);
|
||||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
* Can't process if message body isn't all here yet.
|
||||
*/
|
||||
msgLength -= 4;
|
||||
avail = conn->inEnd - conn->inCursor;
|
||||
if (avail < msgLength)
|
||||
{
|
||||
/*
|
||||
* Before looping, enlarge the input buffer if needed to hold
|
||||
* the whole message. See notes in parseInput.
|
||||
*/
|
||||
if (pqCheckInBufferSpace(conn->inCursor + msgLength, conn))
|
||||
{
|
||||
/*
|
||||
* XXX add some better recovery code... plan is to skip
|
||||
* over the message using its length, then report an error.
|
||||
* For the moment, just treat this like loss of sync (which
|
||||
* indeed it might be!)
|
||||
*/
|
||||
handleSyncLoss(conn, id, msgLength);
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
* We should see V or E response to the command, but might get N
|
||||
@ -1975,7 +2118,7 @@ PQfn(PGconn *conn,
|
||||
libpq_gettext("protocol error: id=0x%x\n"),
|
||||
id);
|
||||
saveErrorResult(conn);
|
||||
conn->inStart = conn->inCursor;
|
||||
conn->inStart += 5 + msgLength;
|
||||
return prepareAsyncResult(conn);
|
||||
}
|
||||
break;
|
||||
@ -1998,7 +2141,8 @@ PQfn(PGconn *conn,
|
||||
break;
|
||||
case 'Z': /* backend is ready for new query */
|
||||
/* consume the message and exit */
|
||||
conn->inStart = conn->inCursor;
|
||||
conn->inStart += 5 + msgLength;
|
||||
/* XXX expect additional fields here */
|
||||
/* if we saved a result object (probably an error), use it */
|
||||
if (conn->result)
|
||||
return prepareAsyncResult(conn);
|
||||
@ -2009,11 +2153,13 @@ PQfn(PGconn *conn,
|
||||
libpq_gettext("protocol error: id=0x%x\n"),
|
||||
id);
|
||||
saveErrorResult(conn);
|
||||
conn->inStart = conn->inCursor;
|
||||
/* trust the specified message length as what to skip */
|
||||
conn->inStart += 5 + msgLength;
|
||||
return prepareAsyncResult(conn);
|
||||
}
|
||||
/* Completed this message, keep going */
|
||||
conn->inStart = conn->inCursor;
|
||||
/* trust the specified message length as what to skip */
|
||||
conn->inStart += 5 + msgLength;
|
||||
needInput = false;
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user