mirror of
https://github.com/postgres/postgres.git
synced 2025-07-30 11:03:19 +03:00
Libpq non-blocking mode, from Alfred Perlstein
This commit is contained in:
@ -7,12 +7,13 @@
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* $Header: /cvsroot/pgsql/src/interfaces/libpq/fe-exec.c,v 1.86 1999/11/11 00:10:14 momjian Exp $
|
||||
* $Header: /cvsroot/pgsql/src/interfaces/libpq/fe-exec.c,v 1.87 2000/01/18 06:09:24 momjian Exp $
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include <errno.h>
|
||||
#include <ctype.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
#include "postgres.h"
|
||||
#include "libpq-fe.h"
|
||||
@ -24,7 +25,6 @@
|
||||
#include <unistd.h>
|
||||
#endif
|
||||
|
||||
|
||||
/* keep this in same order as ExecStatusType in libpq-fe.h */
|
||||
const char *const pgresStatus[] = {
|
||||
"PGRES_EMPTY_QUERY",
|
||||
@ -514,13 +514,53 @@ PQsendQuery(PGconn *conn, const char *query)
|
||||
conn->curTuple = NULL;
|
||||
|
||||
/* send the query to the backend; */
|
||||
/* the frontend-backend protocol uses 'Q' to designate queries */
|
||||
if (pqPutnchar("Q", 1, conn) ||
|
||||
pqPuts(query, conn) ||
|
||||
pqFlush(conn))
|
||||
|
||||
/*
|
||||
* in order to guarantee that we don't send a partial query
|
||||
* where we would become out of sync with the backend and/or
|
||||
* block during a non-blocking connection we must first flush
|
||||
* the send buffer before sending more data
|
||||
*
|
||||
* an alternative is to implement 'queue reservations' where
|
||||
* we are able to roll up a transaction
|
||||
* (the 'Q' along with our query) and make sure we have
|
||||
* enough space for it all in the send buffer.
|
||||
*/
|
||||
if (pqIsnonblocking(conn))
|
||||
{
|
||||
handleSendFailure(conn);
|
||||
return 0;
|
||||
/*
|
||||
* the buffer must have emptied completely before we allow
|
||||
* a new query to be buffered
|
||||
*/
|
||||
if (pqFlush(conn))
|
||||
return 0;
|
||||
/* 'Q' == queries */
|
||||
/* XXX: if we fail here we really ought to not block */
|
||||
if (pqPutnchar("Q", 1, conn) ||
|
||||
pqPuts(query, conn))
|
||||
{
|
||||
handleSendFailure(conn);
|
||||
return 0;
|
||||
}
|
||||
/*
|
||||
* give the data a push, ignore the return value as
|
||||
* ConsumeInput() will do any aditional flushing if needed
|
||||
*/
|
||||
(void) pqFlush(conn);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* the frontend-backend protocol uses 'Q' to
|
||||
* designate queries
|
||||
*/
|
||||
if (pqPutnchar("Q", 1, conn) ||
|
||||
pqPuts(query, conn) ||
|
||||
pqFlush(conn))
|
||||
{
|
||||
handleSendFailure(conn);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* OK, it's launched! */
|
||||
@ -574,7 +614,17 @@ PQconsumeInput(PGconn *conn)
|
||||
* we will NOT block waiting for more input.
|
||||
*/
|
||||
if (pqReadData(conn) < 0)
|
||||
{
|
||||
/*
|
||||
* for non-blocking connections
|
||||
* try to flush the send-queue otherwise we may never get a
|
||||
* responce for something that may not have already been sent
|
||||
* because it's in our write buffer!
|
||||
*/
|
||||
if (pqIsnonblocking(conn))
|
||||
(void) pqFlush(conn);
|
||||
return 0;
|
||||
}
|
||||
/* Parsing of the data waits till later. */
|
||||
return 1;
|
||||
}
|
||||
@ -1088,6 +1138,16 @@ PQexec(PGconn *conn, const char *query)
|
||||
{
|
||||
PGresult *result;
|
||||
PGresult *lastResult;
|
||||
bool savedblocking;
|
||||
|
||||
/*
|
||||
* we assume anyone calling PQexec wants blocking behaviour,
|
||||
* we force the blocking status of the connection to blocking
|
||||
* for the duration of this function and restore it on return
|
||||
*/
|
||||
savedblocking = pqIsnonblocking(conn);
|
||||
if (PQsetnonblocking(conn, FALSE) == -1)
|
||||
return NULL;
|
||||
|
||||
/*
|
||||
* Silently discard any prior query result that application didn't
|
||||
@ -1102,14 +1162,15 @@ PQexec(PGconn *conn, const char *query)
|
||||
PQclear(result);
|
||||
printfPQExpBuffer(&conn->errorMessage,
|
||||
"PQexec: you gotta get out of a COPY state yourself.\n");
|
||||
return NULL;
|
||||
/* restore blocking status */
|
||||
goto errout;
|
||||
}
|
||||
PQclear(result);
|
||||
}
|
||||
|
||||
/* OK to send the message */
|
||||
if (!PQsendQuery(conn, query))
|
||||
return NULL;
|
||||
goto errout; /* restore blocking status */
|
||||
|
||||
/*
|
||||
* For backwards compatibility, return the last result if there are
|
||||
@ -1142,7 +1203,15 @@ PQexec(PGconn *conn, const char *query)
|
||||
result->resultStatus == PGRES_COPY_OUT)
|
||||
break;
|
||||
}
|
||||
|
||||
if (PQsetnonblocking(conn, savedblocking) == -1)
|
||||
return NULL;
|
||||
return lastResult;
|
||||
|
||||
errout:
|
||||
if (PQsetnonblocking(conn, savedblocking) == -1)
|
||||
return NULL;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
@ -1432,7 +1501,16 @@ PQendcopy(PGconn *conn)
|
||||
return 1;
|
||||
}
|
||||
|
||||
(void) pqFlush(conn); /* make sure no data is waiting to be sent */
|
||||
/*
|
||||
* make sure no data is waiting to be sent,
|
||||
* abort if we are non-blocking and the flush fails
|
||||
*/
|
||||
if (pqFlush(conn) && pqIsnonblocking(conn))
|
||||
return (1);
|
||||
|
||||
/* non blocking connections may have to abort at this point. */
|
||||
if (pqIsnonblocking(conn) && PQisBusy(conn))
|
||||
return (1);
|
||||
|
||||
/* Return to active duty */
|
||||
conn->asyncStatus = PGASYNC_BUSY;
|
||||
@ -2026,3 +2104,89 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num)
|
||||
else
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* PQsetnonblocking:
|
||||
sets the PGconn's database connection non-blocking if the arg is TRUE
|
||||
or makes it non-blocking if the arg is FALSE, this will not protect
|
||||
you from PQexec(), you'll only be safe when using the non-blocking
|
||||
API
|
||||
Needs to be called only on a connected database connection.
|
||||
*/
|
||||
|
||||
int
|
||||
PQsetnonblocking(PGconn *conn, int arg)
|
||||
{
|
||||
int fcntlarg;
|
||||
|
||||
arg = (arg == TRUE) ? 1 : 0;
|
||||
/* early out if the socket is already in the state requested */
|
||||
if (arg == conn->nonblocking)
|
||||
return (0);
|
||||
|
||||
/*
|
||||
* to guarantee constancy for flushing/query/result-polling behavior
|
||||
* we need to flush the send queue at this point in order to guarantee
|
||||
* proper behavior.
|
||||
* this is ok because either they are making a transition
|
||||
* _from_ or _to_ blocking mode, either way we can block them.
|
||||
*/
|
||||
/* if we are going from blocking to non-blocking flush here */
|
||||
if (!pqIsnonblocking(conn) && pqFlush(conn))
|
||||
return (-1);
|
||||
|
||||
|
||||
#ifdef USE_SSL
|
||||
if (conn->ssl)
|
||||
{
|
||||
printfPQExpBuffer(&conn->errorMessage,
|
||||
"PQsetnonblocking() -- not supported when using SSL\n");
|
||||
return (-1);
|
||||
}
|
||||
#endif /* USE_SSL */
|
||||
|
||||
#ifndef WIN32
|
||||
fcntlarg = fcntl(conn->sock, F_GETFL, 0);
|
||||
if (fcntlarg == -1)
|
||||
return (-1);
|
||||
|
||||
if ((arg == TRUE &&
|
||||
fcntl(conn->sock, F_SETFL, fcntlarg | O_NONBLOCK) == -1) ||
|
||||
(arg == FALSE &&
|
||||
fcntl(conn->sock, F_SETFL, fcntlarg & ~O_NONBLOCK) == -1))
|
||||
#else
|
||||
fcntlarg = arg;
|
||||
if (ioctlsocket(conn->sock, FIONBIO, &fcntlarg) != 0)
|
||||
#endif
|
||||
{
|
||||
printfPQExpBuffer(&conn->errorMessage,
|
||||
"PQsetblocking() -- unable to set nonblocking status to %s\n",
|
||||
arg == TRUE ? "TRUE" : "FALSE");
|
||||
return (-1);
|
||||
}
|
||||
|
||||
conn->nonblocking = arg;
|
||||
|
||||
/* if we are going from non-blocking to blocking flush here */
|
||||
if (pqIsnonblocking(conn) && pqFlush(conn))
|
||||
return (-1);
|
||||
|
||||
return (0);
|
||||
}
|
||||
|
||||
/* return the blocking status of the database connection, TRUE == nonblocking,
|
||||
FALSE == blocking
|
||||
*/
|
||||
int
|
||||
PQisnonblocking(const PGconn *conn)
|
||||
{
|
||||
|
||||
return (pqIsnonblocking(conn));
|
||||
}
|
||||
|
||||
/* try to force data out, really only useful for non-blocking users */
|
||||
int
|
||||
PQflush(PGconn *conn)
|
||||
{
|
||||
|
||||
return (pqFlush(conn));
|
||||
}
|
||||
|
Reference in New Issue
Block a user