mirror of
https://github.com/postgres/postgres.git
synced 2025-06-22 02:52:08 +03:00
Add a "row processor" API to libpq for better handling of large results.
Traditionally libpq has collected an entire query result before passing it back to the application. That provides a simple and transactional API, but it's pretty inefficient for large result sets. This patch allows the application to process each row on-the-fly instead of accumulating the rows into the PGresult. Error recovery becomes a bit more complex, but often that tradeoff is well worth making. Kyotaro Horiguchi, reviewed by Marko Kreen and Tom Lane
This commit is contained in:
@ -5581,6 +5581,274 @@ defaultNoticeProcessor(void *arg, const char *message)
|
|||||||
|
|
||||||
</sect1>
|
</sect1>
|
||||||
|
|
||||||
|
<sect1 id="libpq-row-processor">
|
||||||
|
<title>Custom Row Processing</title>
|
||||||
|
|
||||||
|
<indexterm zone="libpq-row-processor">
|
||||||
|
<primary>PQrowProcessor</primary>
|
||||||
|
</indexterm>
|
||||||
|
|
||||||
|
<indexterm zone="libpq-row-processor">
|
||||||
|
<primary>row processor</primary>
|
||||||
|
<secondary>in libpq</secondary>
|
||||||
|
</indexterm>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
Ordinarily, when receiving a query result from the server,
|
||||||
|
<application>libpq</> adds each row value to the current
|
||||||
|
<type>PGresult</type> until the entire result set is received; then
|
||||||
|
the <type>PGresult</type> is returned to the application as a unit.
|
||||||
|
This approach is simple to work with, but becomes inefficient for large
|
||||||
|
result sets. To improve performance, an application can register a
|
||||||
|
custom <firstterm>row processor</> function that processes each row
|
||||||
|
as the data is received from the network. The custom row processor could
|
||||||
|
process the data fully, or store it into some application-specific data
|
||||||
|
structure for later processing.
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<caution>
|
||||||
|
<para>
|
||||||
|
The row processor function sees the rows before it is known whether the
|
||||||
|
query will succeed overall, since the server might return some rows before
|
||||||
|
encountering an error. For proper transactional behavior, it must be
|
||||||
|
possible to discard or undo whatever the row processor has done, if the
|
||||||
|
query ultimately fails.
|
||||||
|
</para>
|
||||||
|
</caution>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
When using a custom row processor, row data is not accumulated into the
|
||||||
|
<type>PGresult</type>, so the <type>PGresult</type> ultimately delivered to
|
||||||
|
the application will contain no rows (<function>PQntuples</> =
|
||||||
|
<literal>0</>). However, it still has <function>PQresultStatus</> =
|
||||||
|
<literal>PGRES_TUPLES_OK</>, and it contains correct information about the
|
||||||
|
set of columns in the query result. On the other hand, if the query fails
|
||||||
|
partway through, the returned <type>PGresult</type> has
|
||||||
|
<function>PQresultStatus</> = <literal>PGRES_FATAL_ERROR</>. The
|
||||||
|
application must be prepared to undo any actions of the row processor
|
||||||
|
whenever it gets a <literal>PGRES_FATAL_ERROR</> result.
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
A custom row processor is registered for a particular connection by
|
||||||
|
calling <function>PQsetRowProcessor</function>, described below.
|
||||||
|
This row processor will be used for all subsequent query results on that
|
||||||
|
connection until changed again. A row processor function must have a
|
||||||
|
signature matching
|
||||||
|
|
||||||
|
<synopsis>
|
||||||
|
typedef int (*PQrowProcessor) (PGresult *res, const PGdataValue *columns,
|
||||||
|
const char **errmsgp, void *param);
|
||||||
|
</synopsis>
|
||||||
|
where <type>PGdataValue</> is described by
|
||||||
|
<synopsis>
|
||||||
|
typedef struct pgDataValue
|
||||||
|
{
|
||||||
|
int len; /* data length in bytes, or <0 if NULL */
|
||||||
|
const char *value; /* data value, without zero-termination */
|
||||||
|
} PGdataValue;
|
||||||
|
</synopsis>
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
The <parameter>res</> parameter is the <literal>PGRES_TUPLES_OK</>
|
||||||
|
<type>PGresult</type> that will eventually be delivered to the calling
|
||||||
|
application (if no error intervenes). It contains information about
|
||||||
|
the set of columns in the query result, but no row data. In particular the
|
||||||
|
row processor must fetch <literal>PQnfields(res)</> to know the number of
|
||||||
|
data columns.
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
Immediately after <application>libpq</> has determined the result set's
|
||||||
|
column information, it will make a call to the row processor with
|
||||||
|
<parameter>columns</parameter> set to NULL, but the other parameters as
|
||||||
|
usual. The row processor can use this call to initialize for a new result
|
||||||
|
set; if it has nothing to do, it can just return <literal>1</>. In
|
||||||
|
subsequent calls, one per received row, <parameter>columns</parameter>
|
||||||
|
is non-NULL and points to an array of <type>PGdataValue</> structs, one per
|
||||||
|
data column.
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
<parameter>errmsgp</parameter> is an output parameter used only for error
|
||||||
|
reporting. If the row processor needs to report an error, it can set
|
||||||
|
<literal>*</><parameter>errmsgp</parameter> to point to a suitable message
|
||||||
|
string (and then return <literal>-1</>). As a special case, returning
|
||||||
|
<literal>-1</> without changing <literal>*</><parameter>errmsgp</parameter>
|
||||||
|
from its initial value of NULL is taken to mean <quote>out of memory</>.
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
The last parameter, <parameter>param</parameter>, is just a void pointer
|
||||||
|
passed through from <function>PQsetRowProcessor</function>. This can be
|
||||||
|
used for communication between the row processor function and the
|
||||||
|
surrounding application.
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
In the <type>PGdataValue</> array passed to a row processor, data values
|
||||||
|
cannot be assumed to be zero-terminated, whether the data format is text
|
||||||
|
or binary. A SQL NULL value is indicated by a negative length field.
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
The row processor <emphasis>must</> process the row data values
|
||||||
|
immediately, or else copy them into application-controlled storage.
|
||||||
|
The value pointers passed to the row processor point into
|
||||||
|
<application>libpq</>'s internal data input buffer, which will be
|
||||||
|
overwritten by the next packet fetch.
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
The row processor function must return either <literal>1</> or
|
||||||
|
<literal>-1</>.
|
||||||
|
<literal>1</> is the normal, successful result value; <application>libpq</>
|
||||||
|
will continue with receiving row values from the server and passing them to
|
||||||
|
the row processor. <literal>-1</> indicates that the row processor has
|
||||||
|
encountered an error. In that case,
|
||||||
|
<application>libpq</> will discard all remaining rows in the result set
|
||||||
|
and then return a <literal>PGRES_FATAL_ERROR</> <type>PGresult</type> to
|
||||||
|
the application (containing the specified error message, or <quote>out of
|
||||||
|
memory for query result</> if <literal>*</><parameter>errmsgp</parameter>
|
||||||
|
was left as NULL).
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
Another option for exiting a row processor is to throw an exception using
|
||||||
|
C's <function>longjmp()</> or C++'s <literal>throw</>. If this is done,
|
||||||
|
processing of the incoming data can be resumed later by calling
|
||||||
|
<function>PQgetResult</>; the row processor will be invoked as normal for
|
||||||
|
any remaining rows in the current result.
|
||||||
|
As with any usage of <function>PQgetResult</>, the application
|
||||||
|
should continue calling <function>PQgetResult</> until it gets a NULL
|
||||||
|
result before issuing any new query.
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
In some cases, an exception may mean that the remainder of the
|
||||||
|
query result is not interesting. In such cases the application can discard
|
||||||
|
the remaining rows with <function>PQskipResult</>, described below.
|
||||||
|
Another possible recovery option is to close the connection altogether with
|
||||||
|
<function>PQfinish</>.
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
<variablelist>
|
||||||
|
<varlistentry id="libpq-pqsetrowprocessor">
|
||||||
|
<term>
|
||||||
|
<function>PQsetRowProcessor</function>
|
||||||
|
<indexterm>
|
||||||
|
<primary>PQsetRowProcessor</primary>
|
||||||
|
</indexterm>
|
||||||
|
</term>
|
||||||
|
|
||||||
|
<listitem>
|
||||||
|
<para>
|
||||||
|
Sets a callback function to process each row.
|
||||||
|
|
||||||
|
<synopsis>
|
||||||
|
void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
|
||||||
|
</synopsis>
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
The specified row processor function <parameter>func</> is installed as
|
||||||
|
the active row processor for the given connection <parameter>conn</>.
|
||||||
|
Also, <parameter>param</> is installed as the passthrough pointer to
|
||||||
|
pass to it. Alternatively, if <parameter>func</> is NULL, the standard
|
||||||
|
row processor is reinstalled on the given connection (and
|
||||||
|
<parameter>param</> is ignored).
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
Although the row processor can be changed at any time in the life of a
|
||||||
|
connection, it's generally unwise to do so while a query is active.
|
||||||
|
In particular, when using asynchronous mode, be aware that both
|
||||||
|
<function>PQisBusy</> and <function>PQgetResult</> can call the current
|
||||||
|
row processor.
|
||||||
|
</para>
|
||||||
|
</listitem>
|
||||||
|
</varlistentry>
|
||||||
|
|
||||||
|
<varlistentry id="libpq-pqgetrowprocessor">
|
||||||
|
<term>
|
||||||
|
<function>PQgetRowProcessor</function>
|
||||||
|
<indexterm>
|
||||||
|
<primary>PQgetRowProcessor</primary>
|
||||||
|
</indexterm>
|
||||||
|
</term>
|
||||||
|
|
||||||
|
<listitem>
|
||||||
|
<para>
|
||||||
|
Fetches the current row processor for the specified connection.
|
||||||
|
|
||||||
|
<synopsis>
|
||||||
|
PQrowProcessor PQgetRowProcessor(const PGconn *conn, void **param);
|
||||||
|
</synopsis>
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
In addition to returning the row processor function pointer, the
|
||||||
|
current passthrough pointer will be returned at
|
||||||
|
<literal>*</><parameter>param</>, if <parameter>param</> is not NULL.
|
||||||
|
</para>
|
||||||
|
</listitem>
|
||||||
|
</varlistentry>
|
||||||
|
|
||||||
|
<varlistentry id="libpq-pqskipresult">
|
||||||
|
<term>
|
||||||
|
<function>PQskipResult</function>
|
||||||
|
<indexterm>
|
||||||
|
<primary>PQskipResult</primary>
|
||||||
|
</indexterm>
|
||||||
|
</term>
|
||||||
|
|
||||||
|
<listitem>
|
||||||
|
<para>
|
||||||
|
Discard all the remaining rows in the incoming result set.
|
||||||
|
|
||||||
|
<synopsis>
|
||||||
|
PGresult *PQskipResult(PGconn *conn);
|
||||||
|
</synopsis>
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
This is a simple convenience function to discard incoming data after a
|
||||||
|
row processor has failed or it's determined that the rest of the result
|
||||||
|
set is not interesting. <function>PQskipResult</> is exactly
|
||||||
|
equivalent to <function>PQgetResult</> except that it transiently
|
||||||
|
installs a dummy row processor function that just discards data.
|
||||||
|
The returned <type>PGresult</> can be discarded without further ado
|
||||||
|
if it has status <literal>PGRES_TUPLES_OK</>; but other status values
|
||||||
|
should be handled normally. (In particular,
|
||||||
|
<literal>PGRES_FATAL_ERROR</> indicates a server-reported error that
|
||||||
|
will still need to be dealt with.)
|
||||||
|
As when using <function>PQgetResult</>, one should usually repeat the
|
||||||
|
call until NULL is returned to ensure the connection has reached an
|
||||||
|
idle state. Another possible usage is to call
|
||||||
|
<function>PQskipResult</> just once, and then resume using
|
||||||
|
<function>PQgetResult</> to process subsequent result sets normally.
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
Because <function>PQskipResult</> will wait for server input, it is not
|
||||||
|
very useful in asynchronous applications. In particular you should not
|
||||||
|
code a loop of <function>PQisBusy</> and <function>PQskipResult</>,
|
||||||
|
because that will result in the installed row processor being called
|
||||||
|
within <function>PQisBusy</>. To get the proper behavior in an
|
||||||
|
asynchronous application, you'll need to install a dummy row processor
|
||||||
|
(or set a flag to make your normal row processor do nothing) and leave
|
||||||
|
it that way until you have discarded all incoming data via your normal
|
||||||
|
<function>PQisBusy</> and <function>PQgetResult</> loop.
|
||||||
|
</para>
|
||||||
|
</listitem>
|
||||||
|
</varlistentry>
|
||||||
|
</variablelist>
|
||||||
|
</para>
|
||||||
|
|
||||||
|
</sect1>
|
||||||
|
|
||||||
<sect1 id="libpq-events">
|
<sect1 id="libpq-events">
|
||||||
<title>Event System</title>
|
<title>Event System</title>
|
||||||
|
|
||||||
|
@ -160,3 +160,6 @@ PQconnectStartParams 157
|
|||||||
PQping 158
|
PQping 158
|
||||||
PQpingParams 159
|
PQpingParams 159
|
||||||
PQlibVersion 160
|
PQlibVersion 160
|
||||||
|
PQsetRowProcessor 161
|
||||||
|
PQgetRowProcessor 162
|
||||||
|
PQskipResult 163
|
||||||
|
@ -2425,7 +2425,7 @@ keep_going: /* We will come back to here until there is
|
|||||||
conn->status = CONNECTION_AUTH_OK;
|
conn->status = CONNECTION_AUTH_OK;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Set asyncStatus so that PQsetResult will think that
|
* Set asyncStatus so that PQgetResult will think that
|
||||||
* what comes back next is the result of a query. See
|
* what comes back next is the result of a query. See
|
||||||
* below.
|
* below.
|
||||||
*/
|
*/
|
||||||
@ -2686,8 +2686,11 @@ makeEmptyPGconn(void)
|
|||||||
/* Zero all pointers and booleans */
|
/* Zero all pointers and booleans */
|
||||||
MemSet(conn, 0, sizeof(PGconn));
|
MemSet(conn, 0, sizeof(PGconn));
|
||||||
|
|
||||||
|
/* install default row processor and notice hooks */
|
||||||
|
PQsetRowProcessor(conn, NULL, NULL);
|
||||||
conn->noticeHooks.noticeRec = defaultNoticeReceiver;
|
conn->noticeHooks.noticeRec = defaultNoticeReceiver;
|
||||||
conn->noticeHooks.noticeProc = defaultNoticeProcessor;
|
conn->noticeHooks.noticeProc = defaultNoticeProcessor;
|
||||||
|
|
||||||
conn->status = CONNECTION_BAD;
|
conn->status = CONNECTION_BAD;
|
||||||
conn->asyncStatus = PGASYNC_IDLE;
|
conn->asyncStatus = PGASYNC_IDLE;
|
||||||
conn->xactStatus = PQTRANS_IDLE;
|
conn->xactStatus = PQTRANS_IDLE;
|
||||||
@ -2721,11 +2724,14 @@ makeEmptyPGconn(void)
|
|||||||
conn->inBuffer = (char *) malloc(conn->inBufSize);
|
conn->inBuffer = (char *) malloc(conn->inBufSize);
|
||||||
conn->outBufSize = 16 * 1024;
|
conn->outBufSize = 16 * 1024;
|
||||||
conn->outBuffer = (char *) malloc(conn->outBufSize);
|
conn->outBuffer = (char *) malloc(conn->outBufSize);
|
||||||
|
conn->rowBufLen = 32;
|
||||||
|
conn->rowBuf = (PGdataValue *) malloc(conn->rowBufLen * sizeof(PGdataValue));
|
||||||
initPQExpBuffer(&conn->errorMessage);
|
initPQExpBuffer(&conn->errorMessage);
|
||||||
initPQExpBuffer(&conn->workBuffer);
|
initPQExpBuffer(&conn->workBuffer);
|
||||||
|
|
||||||
if (conn->inBuffer == NULL ||
|
if (conn->inBuffer == NULL ||
|
||||||
conn->outBuffer == NULL ||
|
conn->outBuffer == NULL ||
|
||||||
|
conn->rowBuf == NULL ||
|
||||||
PQExpBufferBroken(&conn->errorMessage) ||
|
PQExpBufferBroken(&conn->errorMessage) ||
|
||||||
PQExpBufferBroken(&conn->workBuffer))
|
PQExpBufferBroken(&conn->workBuffer))
|
||||||
{
|
{
|
||||||
@ -2829,6 +2835,8 @@ freePGconn(PGconn *conn)
|
|||||||
free(conn->inBuffer);
|
free(conn->inBuffer);
|
||||||
if (conn->outBuffer)
|
if (conn->outBuffer)
|
||||||
free(conn->outBuffer);
|
free(conn->outBuffer);
|
||||||
|
if (conn->rowBuf)
|
||||||
|
free(conn->rowBuf);
|
||||||
termPQExpBuffer(&conn->errorMessage);
|
termPQExpBuffer(&conn->errorMessage);
|
||||||
termPQExpBuffer(&conn->workBuffer);
|
termPQExpBuffer(&conn->workBuffer);
|
||||||
|
|
||||||
@ -2888,7 +2896,7 @@ closePGconn(PGconn *conn)
|
|||||||
conn->status = CONNECTION_BAD; /* Well, not really _bad_ - just
|
conn->status = CONNECTION_BAD; /* Well, not really _bad_ - just
|
||||||
* absent */
|
* absent */
|
||||||
conn->asyncStatus = PGASYNC_IDLE;
|
conn->asyncStatus = PGASYNC_IDLE;
|
||||||
pqClearAsyncResult(conn); /* deallocate result and curTuple */
|
pqClearAsyncResult(conn); /* deallocate result */
|
||||||
pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist);
|
pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist);
|
||||||
conn->addrlist = NULL;
|
conn->addrlist = NULL;
|
||||||
conn->addr_cur = NULL;
|
conn->addr_cur = NULL;
|
||||||
|
@ -50,6 +50,9 @@ static bool static_std_strings = false;
|
|||||||
|
|
||||||
|
|
||||||
static PGEvent *dupEvents(PGEvent *events, int count);
|
static PGEvent *dupEvents(PGEvent *events, int count);
|
||||||
|
static bool pqAddTuple(PGresult *res, PGresAttValue *tup);
|
||||||
|
static int pqStdRowProcessor(PGresult *res, const PGdataValue *columns,
|
||||||
|
const char **errmsgp, void *param);
|
||||||
static bool PQsendQueryStart(PGconn *conn);
|
static bool PQsendQueryStart(PGconn *conn);
|
||||||
static int PQsendQueryGuts(PGconn *conn,
|
static int PQsendQueryGuts(PGconn *conn,
|
||||||
const char *command,
|
const char *command,
|
||||||
@ -61,6 +64,8 @@ static int PQsendQueryGuts(PGconn *conn,
|
|||||||
const int *paramFormats,
|
const int *paramFormats,
|
||||||
int resultFormat);
|
int resultFormat);
|
||||||
static void parseInput(PGconn *conn);
|
static void parseInput(PGconn *conn);
|
||||||
|
static int dummyRowProcessor(PGresult *res, const PGdataValue *columns,
|
||||||
|
const char **errmsgp, void *param);
|
||||||
static bool PQexecStart(PGconn *conn);
|
static bool PQexecStart(PGconn *conn);
|
||||||
static PGresult *PQexecFinish(PGconn *conn);
|
static PGresult *PQexecFinish(PGconn *conn);
|
||||||
static int PQsendDescribe(PGconn *conn, char desc_type,
|
static int PQsendDescribe(PGconn *conn, char desc_type,
|
||||||
@ -694,14 +699,12 @@ PQclear(PGresult *res)
|
|||||||
/*
|
/*
|
||||||
* Handy subroutine to deallocate any partially constructed async result.
|
* Handy subroutine to deallocate any partially constructed async result.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
void
|
void
|
||||||
pqClearAsyncResult(PGconn *conn)
|
pqClearAsyncResult(PGconn *conn)
|
||||||
{
|
{
|
||||||
if (conn->result)
|
if (conn->result)
|
||||||
PQclear(conn->result);
|
PQclear(conn->result);
|
||||||
conn->result = NULL;
|
conn->result = NULL;
|
||||||
conn->curTuple = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -756,7 +759,6 @@ pqPrepareAsyncResult(PGconn *conn)
|
|||||||
*/
|
*/
|
||||||
res = conn->result;
|
res = conn->result;
|
||||||
conn->result = NULL; /* handing over ownership to caller */
|
conn->result = NULL; /* handing over ownership to caller */
|
||||||
conn->curTuple = NULL; /* just in case */
|
|
||||||
if (!res)
|
if (!res)
|
||||||
res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
|
res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
|
||||||
else
|
else
|
||||||
@ -832,7 +834,7 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
|
|||||||
* add a row pointer to the PGresult structure, growing it if necessary
|
* add a row pointer to the PGresult structure, growing it if necessary
|
||||||
* Returns TRUE if OK, FALSE if not enough memory to add the row
|
* Returns TRUE if OK, FALSE if not enough memory to add the row
|
||||||
*/
|
*/
|
||||||
int
|
static bool
|
||||||
pqAddTuple(PGresult *res, PGresAttValue *tup)
|
pqAddTuple(PGresult *res, PGresAttValue *tup)
|
||||||
{
|
{
|
||||||
if (res->ntups >= res->tupArrSize)
|
if (res->ntups >= res->tupArrSize)
|
||||||
@ -978,6 +980,124 @@ pqSaveParameterStatus(PGconn *conn, const char *name, const char *value)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PQsetRowProcessor
|
||||||
|
* Set function that copies row data out from the network buffer,
|
||||||
|
* along with a passthrough parameter for it.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param)
|
||||||
|
{
|
||||||
|
if (!conn)
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (func)
|
||||||
|
{
|
||||||
|
/* set custom row processor */
|
||||||
|
conn->rowProcessor = func;
|
||||||
|
conn->rowProcessorParam = param;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* set default row processor */
|
||||||
|
conn->rowProcessor = pqStdRowProcessor;
|
||||||
|
conn->rowProcessorParam = conn;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PQgetRowProcessor
|
||||||
|
* Get current row processor of PGconn.
|
||||||
|
* If param is not NULL, also store the passthrough parameter at *param.
|
||||||
|
*/
|
||||||
|
PQrowProcessor
|
||||||
|
PQgetRowProcessor(const PGconn *conn, void **param)
|
||||||
|
{
|
||||||
|
if (!conn)
|
||||||
|
{
|
||||||
|
if (param)
|
||||||
|
*param = NULL;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (param)
|
||||||
|
*param = conn->rowProcessorParam;
|
||||||
|
return conn->rowProcessor;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* pqStdRowProcessor
|
||||||
|
* Add the received row to the PGresult structure
|
||||||
|
* Returns 1 if OK, -1 if error occurred.
|
||||||
|
*
|
||||||
|
* Note: "param" should point to the PGconn, but we don't actually need that
|
||||||
|
* as of the current coding.
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
pqStdRowProcessor(PGresult *res, const PGdataValue *columns,
|
||||||
|
const char **errmsgp, void *param)
|
||||||
|
{
|
||||||
|
int nfields = res->numAttributes;
|
||||||
|
PGresAttValue *tup;
|
||||||
|
int i;
|
||||||
|
|
||||||
|
if (columns == NULL)
|
||||||
|
{
|
||||||
|
/* New result set ... we have nothing to do in this function. */
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Basically we just allocate space in the PGresult for each field and
|
||||||
|
* copy the data over.
|
||||||
|
*
|
||||||
|
* Note: on malloc failure, we return -1 leaving *errmsgp still NULL,
|
||||||
|
* which caller will take to mean "out of memory". This is preferable to
|
||||||
|
* trying to set up such a message here, because evidently there's not
|
||||||
|
* enough memory for gettext() to do anything.
|
||||||
|
*/
|
||||||
|
tup = (PGresAttValue *)
|
||||||
|
pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
|
||||||
|
if (tup == NULL)
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
for (i = 0; i < nfields; i++)
|
||||||
|
{
|
||||||
|
int clen = columns[i].len;
|
||||||
|
|
||||||
|
if (clen < 0)
|
||||||
|
{
|
||||||
|
/* null field */
|
||||||
|
tup[i].len = NULL_LEN;
|
||||||
|
tup[i].value = res->null_field;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
bool isbinary = (res->attDescs[i].format != 0);
|
||||||
|
char *val;
|
||||||
|
|
||||||
|
val = (char *) pqResultAlloc(res, clen + 1, isbinary);
|
||||||
|
if (val == NULL)
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
/* copy and zero-terminate the data (even if it's binary) */
|
||||||
|
memcpy(val, columns[i].value, clen);
|
||||||
|
val[clen] = '\0';
|
||||||
|
|
||||||
|
tup[i].len = clen;
|
||||||
|
tup[i].value = val;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* And add the tuple to the PGresult's tuple array */
|
||||||
|
if (!pqAddTuple(res, tup))
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
/* Success */
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* PQsendQuery
|
* PQsendQuery
|
||||||
* Submit a query, but don't wait for it to finish
|
* Submit a query, but don't wait for it to finish
|
||||||
@ -1223,7 +1343,6 @@ PQsendQueryStart(PGconn *conn)
|
|||||||
|
|
||||||
/* initialize async result-accumulation state */
|
/* initialize async result-accumulation state */
|
||||||
conn->result = NULL;
|
conn->result = NULL;
|
||||||
conn->curTuple = NULL;
|
|
||||||
|
|
||||||
/* ready to send command message */
|
/* ready to send command message */
|
||||||
return true;
|
return true;
|
||||||
@ -1468,6 +1587,9 @@ PQconsumeInput(PGconn *conn)
|
|||||||
* parseInput: if appropriate, parse input data from backend
|
* parseInput: if appropriate, parse input data from backend
|
||||||
* until input is exhausted or a stopping state is reached.
|
* until input is exhausted or a stopping state is reached.
|
||||||
* Note that this function will NOT attempt to read more data from the backend.
|
* Note that this function will NOT attempt to read more data from the backend.
|
||||||
|
*
|
||||||
|
* Note: callers of parseInput must be prepared for a longjmp exit when we are
|
||||||
|
* in PGASYNC_BUSY state, since an external row processor might do that.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
parseInput(PGconn *conn)
|
parseInput(PGconn *conn)
|
||||||
@ -1615,6 +1737,49 @@ PQgetResult(PGconn *conn)
|
|||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PQskipResult
|
||||||
|
* Get the next PGresult produced by a query, but discard any data rows.
|
||||||
|
*
|
||||||
|
* This is mainly useful for cleaning up after a longjmp out of a row
|
||||||
|
* processor, when resuming processing of the current query result isn't
|
||||||
|
* wanted. Note that this is of little value in an async-style application,
|
||||||
|
* since any preceding calls to PQisBusy would have already called the regular
|
||||||
|
* row processor.
|
||||||
|
*/
|
||||||
|
PGresult *
|
||||||
|
PQskipResult(PGconn *conn)
|
||||||
|
{
|
||||||
|
PGresult *res;
|
||||||
|
PQrowProcessor savedRowProcessor;
|
||||||
|
|
||||||
|
if (!conn)
|
||||||
|
return NULL;
|
||||||
|
|
||||||
|
/* temporarily install dummy row processor */
|
||||||
|
savedRowProcessor = conn->rowProcessor;
|
||||||
|
conn->rowProcessor = dummyRowProcessor;
|
||||||
|
/* no need to save/change rowProcessorParam */
|
||||||
|
|
||||||
|
/* fetch the next result */
|
||||||
|
res = PQgetResult(conn);
|
||||||
|
|
||||||
|
/* restore previous row processor */
|
||||||
|
conn->rowProcessor = savedRowProcessor;
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Do-nothing row processor for PQskipResult
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
dummyRowProcessor(PGresult *res, const PGdataValue *columns,
|
||||||
|
const char **errmsgp, void *param)
|
||||||
|
{
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* PQexec
|
* PQexec
|
||||||
@ -1721,7 +1886,7 @@ PQexecStart(PGconn *conn)
|
|||||||
* Silently discard any prior query result that application didn't eat.
|
* Silently discard any prior query result that application didn't eat.
|
||||||
* This is probably poor design, but it's here for backward compatibility.
|
* This is probably poor design, but it's here for backward compatibility.
|
||||||
*/
|
*/
|
||||||
while ((result = PQgetResult(conn)) != NULL)
|
while ((result = PQskipResult(conn)) != NULL)
|
||||||
{
|
{
|
||||||
ExecStatusType resultStatus = result->resultStatus;
|
ExecStatusType resultStatus = result->resultStatus;
|
||||||
|
|
||||||
|
@ -40,9 +40,7 @@
|
|||||||
#define LO_BUFSIZE 8192
|
#define LO_BUFSIZE 8192
|
||||||
|
|
||||||
static int lo_initialize(PGconn *conn);
|
static int lo_initialize(PGconn *conn);
|
||||||
|
static Oid lo_import_internal(PGconn *conn, const char *filename, Oid oid);
|
||||||
static Oid
|
|
||||||
lo_import_internal(PGconn *conn, const char *filename, const Oid oid);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* lo_open
|
* lo_open
|
||||||
@ -59,7 +57,7 @@ lo_open(PGconn *conn, Oid lobjId, int mode)
|
|||||||
PQArgBlock argv[2];
|
PQArgBlock argv[2];
|
||||||
PGresult *res;
|
PGresult *res;
|
||||||
|
|
||||||
if (conn->lobjfuncs == NULL)
|
if (conn == NULL || conn->lobjfuncs == NULL)
|
||||||
{
|
{
|
||||||
if (lo_initialize(conn) < 0)
|
if (lo_initialize(conn) < 0)
|
||||||
return -1;
|
return -1;
|
||||||
@ -101,7 +99,7 @@ lo_close(PGconn *conn, int fd)
|
|||||||
int retval;
|
int retval;
|
||||||
int result_len;
|
int result_len;
|
||||||
|
|
||||||
if (conn->lobjfuncs == NULL)
|
if (conn == NULL || conn->lobjfuncs == NULL)
|
||||||
{
|
{
|
||||||
if (lo_initialize(conn) < 0)
|
if (lo_initialize(conn) < 0)
|
||||||
return -1;
|
return -1;
|
||||||
@ -139,7 +137,7 @@ lo_truncate(PGconn *conn, int fd, size_t len)
|
|||||||
int retval;
|
int retval;
|
||||||
int result_len;
|
int result_len;
|
||||||
|
|
||||||
if (conn->lobjfuncs == NULL)
|
if (conn == NULL || conn->lobjfuncs == NULL)
|
||||||
{
|
{
|
||||||
if (lo_initialize(conn) < 0)
|
if (lo_initialize(conn) < 0)
|
||||||
return -1;
|
return -1;
|
||||||
@ -192,7 +190,7 @@ lo_read(PGconn *conn, int fd, char *buf, size_t len)
|
|||||||
PGresult *res;
|
PGresult *res;
|
||||||
int result_len;
|
int result_len;
|
||||||
|
|
||||||
if (conn->lobjfuncs == NULL)
|
if (conn == NULL || conn->lobjfuncs == NULL)
|
||||||
{
|
{
|
||||||
if (lo_initialize(conn) < 0)
|
if (lo_initialize(conn) < 0)
|
||||||
return -1;
|
return -1;
|
||||||
@ -234,7 +232,7 @@ lo_write(PGconn *conn, int fd, const char *buf, size_t len)
|
|||||||
int result_len;
|
int result_len;
|
||||||
int retval;
|
int retval;
|
||||||
|
|
||||||
if (conn->lobjfuncs == NULL)
|
if (conn == NULL || conn->lobjfuncs == NULL)
|
||||||
{
|
{
|
||||||
if (lo_initialize(conn) < 0)
|
if (lo_initialize(conn) < 0)
|
||||||
return -1;
|
return -1;
|
||||||
@ -280,7 +278,7 @@ lo_lseek(PGconn *conn, int fd, int offset, int whence)
|
|||||||
int retval;
|
int retval;
|
||||||
int result_len;
|
int result_len;
|
||||||
|
|
||||||
if (conn->lobjfuncs == NULL)
|
if (conn == NULL || conn->lobjfuncs == NULL)
|
||||||
{
|
{
|
||||||
if (lo_initialize(conn) < 0)
|
if (lo_initialize(conn) < 0)
|
||||||
return -1;
|
return -1;
|
||||||
@ -328,7 +326,7 @@ lo_creat(PGconn *conn, int mode)
|
|||||||
int retval;
|
int retval;
|
||||||
int result_len;
|
int result_len;
|
||||||
|
|
||||||
if (conn->lobjfuncs == NULL)
|
if (conn == NULL || conn->lobjfuncs == NULL)
|
||||||
{
|
{
|
||||||
if (lo_initialize(conn) < 0)
|
if (lo_initialize(conn) < 0)
|
||||||
return InvalidOid;
|
return InvalidOid;
|
||||||
@ -367,7 +365,7 @@ lo_create(PGconn *conn, Oid lobjId)
|
|||||||
int retval;
|
int retval;
|
||||||
int result_len;
|
int result_len;
|
||||||
|
|
||||||
if (conn->lobjfuncs == NULL)
|
if (conn == NULL || conn->lobjfuncs == NULL)
|
||||||
{
|
{
|
||||||
if (lo_initialize(conn) < 0)
|
if (lo_initialize(conn) < 0)
|
||||||
return InvalidOid;
|
return InvalidOid;
|
||||||
@ -413,7 +411,7 @@ lo_tell(PGconn *conn, int fd)
|
|||||||
PGresult *res;
|
PGresult *res;
|
||||||
int result_len;
|
int result_len;
|
||||||
|
|
||||||
if (conn->lobjfuncs == NULL)
|
if (conn == NULL || conn->lobjfuncs == NULL)
|
||||||
{
|
{
|
||||||
if (lo_initialize(conn) < 0)
|
if (lo_initialize(conn) < 0)
|
||||||
return -1;
|
return -1;
|
||||||
@ -451,7 +449,7 @@ lo_unlink(PGconn *conn, Oid lobjId)
|
|||||||
int result_len;
|
int result_len;
|
||||||
int retval;
|
int retval;
|
||||||
|
|
||||||
if (conn->lobjfuncs == NULL)
|
if (conn == NULL || conn->lobjfuncs == NULL)
|
||||||
{
|
{
|
||||||
if (lo_initialize(conn) < 0)
|
if (lo_initialize(conn) < 0)
|
||||||
return -1;
|
return -1;
|
||||||
@ -505,7 +503,7 @@ lo_import_with_oid(PGconn *conn, const char *filename, Oid lobjId)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static Oid
|
static Oid
|
||||||
lo_import_internal(PGconn *conn, const char *filename, const Oid oid)
|
lo_import_internal(PGconn *conn, const char *filename, Oid oid)
|
||||||
{
|
{
|
||||||
int fd;
|
int fd;
|
||||||
int nbytes,
|
int nbytes,
|
||||||
@ -684,8 +682,13 @@ lo_initialize(PGconn *conn)
|
|||||||
int n;
|
int n;
|
||||||
const char *query;
|
const char *query;
|
||||||
const char *fname;
|
const char *fname;
|
||||||
|
PQrowProcessor savedRowProcessor;
|
||||||
|
void *savedRowProcessorParam;
|
||||||
Oid foid;
|
Oid foid;
|
||||||
|
|
||||||
|
if (!conn)
|
||||||
|
return -1;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Allocate the structure to hold the functions OID's
|
* Allocate the structure to hold the functions OID's
|
||||||
*/
|
*/
|
||||||
@ -729,7 +732,16 @@ lo_initialize(PGconn *conn)
|
|||||||
"or proname = 'loread' "
|
"or proname = 'loread' "
|
||||||
"or proname = 'lowrite'";
|
"or proname = 'lowrite'";
|
||||||
|
|
||||||
|
/* Ensure the standard row processor is used to collect the result */
|
||||||
|
savedRowProcessor = conn->rowProcessor;
|
||||||
|
savedRowProcessorParam = conn->rowProcessorParam;
|
||||||
|
PQsetRowProcessor(conn, NULL, NULL);
|
||||||
|
|
||||||
res = PQexec(conn, query);
|
res = PQexec(conn, query);
|
||||||
|
|
||||||
|
conn->rowProcessor = savedRowProcessor;
|
||||||
|
conn->rowProcessorParam = savedRowProcessorParam;
|
||||||
|
|
||||||
if (res == NULL)
|
if (res == NULL)
|
||||||
{
|
{
|
||||||
free(lobjfuncs);
|
free(lobjfuncs);
|
||||||
|
@ -218,6 +218,32 @@ pqGetnchar(char *s, size_t len, PGconn *conn)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* pqSkipnchar:
|
||||||
|
* skip over len bytes in input buffer.
|
||||||
|
*
|
||||||
|
* Note: this is primarily useful for its debug output, which should
|
||||||
|
* be exactly the same as for pqGetnchar. We assume the data in question
|
||||||
|
* will actually be used, but just isn't getting copied anywhere as yet.
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
pqSkipnchar(size_t len, PGconn *conn)
|
||||||
|
{
|
||||||
|
if (len > (size_t) (conn->inEnd - conn->inCursor))
|
||||||
|
return EOF;
|
||||||
|
|
||||||
|
if (conn->Pfdebug)
|
||||||
|
{
|
||||||
|
fprintf(conn->Pfdebug, "From backend (%lu)> ", (unsigned long) len);
|
||||||
|
fputnbytes(conn->Pfdebug, conn->inBuffer + conn->inCursor, len);
|
||||||
|
fprintf(conn->Pfdebug, "\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
conn->inCursor += len;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* pqPutnchar:
|
* pqPutnchar:
|
||||||
* write exactly len bytes to the current message
|
* write exactly len bytes to the current message
|
||||||
|
@ -49,11 +49,19 @@ static int getNotify(PGconn *conn);
|
|||||||
PostgresPollingStatusType
|
PostgresPollingStatusType
|
||||||
pqSetenvPoll(PGconn *conn)
|
pqSetenvPoll(PGconn *conn)
|
||||||
{
|
{
|
||||||
|
PostgresPollingStatusType result;
|
||||||
PGresult *res;
|
PGresult *res;
|
||||||
|
PQrowProcessor savedRowProcessor;
|
||||||
|
void *savedRowProcessorParam;
|
||||||
|
|
||||||
if (conn == NULL || conn->status == CONNECTION_BAD)
|
if (conn == NULL || conn->status == CONNECTION_BAD)
|
||||||
return PGRES_POLLING_FAILED;
|
return PGRES_POLLING_FAILED;
|
||||||
|
|
||||||
|
/* Ensure the standard row processor is used to collect any results */
|
||||||
|
savedRowProcessor = conn->rowProcessor;
|
||||||
|
savedRowProcessorParam = conn->rowProcessorParam;
|
||||||
|
PQsetRowProcessor(conn, NULL, NULL);
|
||||||
|
|
||||||
/* Check whether there are any data for us */
|
/* Check whether there are any data for us */
|
||||||
switch (conn->setenv_state)
|
switch (conn->setenv_state)
|
||||||
{
|
{
|
||||||
@ -69,7 +77,10 @@ pqSetenvPoll(PGconn *conn)
|
|||||||
if (n < 0)
|
if (n < 0)
|
||||||
goto error_return;
|
goto error_return;
|
||||||
if (n == 0)
|
if (n == 0)
|
||||||
return PGRES_POLLING_READING;
|
{
|
||||||
|
result = PGRES_POLLING_READING;
|
||||||
|
goto normal_return;
|
||||||
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -83,7 +94,8 @@ pqSetenvPoll(PGconn *conn)
|
|||||||
|
|
||||||
/* Should we raise an error if called when not active? */
|
/* Should we raise an error if called when not active? */
|
||||||
case SETENV_STATE_IDLE:
|
case SETENV_STATE_IDLE:
|
||||||
return PGRES_POLLING_OK;
|
result = PGRES_POLLING_OK;
|
||||||
|
goto normal_return;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
printfPQExpBuffer(&conn->errorMessage,
|
printfPQExpBuffer(&conn->errorMessage,
|
||||||
@ -180,7 +192,10 @@ pqSetenvPoll(PGconn *conn)
|
|||||||
case SETENV_STATE_CLIENT_ENCODING_WAIT:
|
case SETENV_STATE_CLIENT_ENCODING_WAIT:
|
||||||
{
|
{
|
||||||
if (PQisBusy(conn))
|
if (PQisBusy(conn))
|
||||||
return PGRES_POLLING_READING;
|
{
|
||||||
|
result = PGRES_POLLING_READING;
|
||||||
|
goto normal_return;
|
||||||
|
}
|
||||||
|
|
||||||
res = PQgetResult(conn);
|
res = PQgetResult(conn);
|
||||||
|
|
||||||
@ -205,7 +220,10 @@ pqSetenvPoll(PGconn *conn)
|
|||||||
case SETENV_STATE_OPTION_WAIT:
|
case SETENV_STATE_OPTION_WAIT:
|
||||||
{
|
{
|
||||||
if (PQisBusy(conn))
|
if (PQisBusy(conn))
|
||||||
return PGRES_POLLING_READING;
|
{
|
||||||
|
result = PGRES_POLLING_READING;
|
||||||
|
goto normal_return;
|
||||||
|
}
|
||||||
|
|
||||||
res = PQgetResult(conn);
|
res = PQgetResult(conn);
|
||||||
|
|
||||||
@ -244,13 +262,17 @@ pqSetenvPoll(PGconn *conn)
|
|||||||
goto error_return;
|
goto error_return;
|
||||||
|
|
||||||
conn->setenv_state = SETENV_STATE_QUERY1_WAIT;
|
conn->setenv_state = SETENV_STATE_QUERY1_WAIT;
|
||||||
return PGRES_POLLING_READING;
|
result = PGRES_POLLING_READING;
|
||||||
|
goto normal_return;
|
||||||
}
|
}
|
||||||
|
|
||||||
case SETENV_STATE_QUERY1_WAIT:
|
case SETENV_STATE_QUERY1_WAIT:
|
||||||
{
|
{
|
||||||
if (PQisBusy(conn))
|
if (PQisBusy(conn))
|
||||||
return PGRES_POLLING_READING;
|
{
|
||||||
|
result = PGRES_POLLING_READING;
|
||||||
|
goto normal_return;
|
||||||
|
}
|
||||||
|
|
||||||
res = PQgetResult(conn);
|
res = PQgetResult(conn);
|
||||||
|
|
||||||
@ -327,13 +349,17 @@ pqSetenvPoll(PGconn *conn)
|
|||||||
goto error_return;
|
goto error_return;
|
||||||
|
|
||||||
conn->setenv_state = SETENV_STATE_QUERY2_WAIT;
|
conn->setenv_state = SETENV_STATE_QUERY2_WAIT;
|
||||||
return PGRES_POLLING_READING;
|
result = PGRES_POLLING_READING;
|
||||||
|
goto normal_return;
|
||||||
}
|
}
|
||||||
|
|
||||||
case SETENV_STATE_QUERY2_WAIT:
|
case SETENV_STATE_QUERY2_WAIT:
|
||||||
{
|
{
|
||||||
if (PQisBusy(conn))
|
if (PQisBusy(conn))
|
||||||
return PGRES_POLLING_READING;
|
{
|
||||||
|
result = PGRES_POLLING_READING;
|
||||||
|
goto normal_return;
|
||||||
|
}
|
||||||
|
|
||||||
res = PQgetResult(conn);
|
res = PQgetResult(conn);
|
||||||
|
|
||||||
@ -380,7 +406,8 @@ pqSetenvPoll(PGconn *conn)
|
|||||||
{
|
{
|
||||||
/* Query finished, so we're done */
|
/* Query finished, so we're done */
|
||||||
conn->setenv_state = SETENV_STATE_IDLE;
|
conn->setenv_state = SETENV_STATE_IDLE;
|
||||||
return PGRES_POLLING_OK;
|
result = PGRES_POLLING_OK;
|
||||||
|
goto normal_return;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -398,7 +425,12 @@ pqSetenvPoll(PGconn *conn)
|
|||||||
|
|
||||||
error_return:
|
error_return:
|
||||||
conn->setenv_state = SETENV_STATE_IDLE;
|
conn->setenv_state = SETENV_STATE_IDLE;
|
||||||
return PGRES_POLLING_FAILED;
|
result = PGRES_POLLING_FAILED;
|
||||||
|
|
||||||
|
normal_return:
|
||||||
|
conn->rowProcessor = savedRowProcessor;
|
||||||
|
conn->rowProcessorParam = savedRowProcessorParam;
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -406,6 +438,9 @@ error_return:
|
|||||||
* parseInput: if appropriate, parse input data from backend
|
* parseInput: if appropriate, parse input data from backend
|
||||||
* until input is exhausted or a stopping state is reached.
|
* until input is exhausted or a stopping state is reached.
|
||||||
* Note that this function will NOT attempt to read more data from the backend.
|
* Note that this function will NOT attempt to read more data from the backend.
|
||||||
|
*
|
||||||
|
* Note: callers of parseInput must be prepared for a longjmp exit when we are
|
||||||
|
* in PGASYNC_BUSY state, since an external row processor might do that.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
pqParseInput2(PGconn *conn)
|
pqParseInput2(PGconn *conn)
|
||||||
@ -549,6 +584,8 @@ pqParseInput2(PGconn *conn)
|
|||||||
/* First 'T' in a query sequence */
|
/* First 'T' in a query sequence */
|
||||||
if (getRowDescriptions(conn))
|
if (getRowDescriptions(conn))
|
||||||
return;
|
return;
|
||||||
|
/* getRowDescriptions() moves inStart itself */
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -569,6 +606,8 @@ pqParseInput2(PGconn *conn)
|
|||||||
/* Read another tuple of a normal query response */
|
/* Read another tuple of a normal query response */
|
||||||
if (getAnotherTuple(conn, FALSE))
|
if (getAnotherTuple(conn, FALSE))
|
||||||
return;
|
return;
|
||||||
|
/* getAnotherTuple() moves inStart itself */
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -585,6 +624,8 @@ pqParseInput2(PGconn *conn)
|
|||||||
/* Read another tuple of a normal query response */
|
/* Read another tuple of a normal query response */
|
||||||
if (getAnotherTuple(conn, TRUE))
|
if (getAnotherTuple(conn, TRUE))
|
||||||
return;
|
return;
|
||||||
|
/* getAnotherTuple() moves inStart itself */
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -627,27 +668,32 @@ pqParseInput2(PGconn *conn)
|
|||||||
/*
|
/*
|
||||||
* parseInput subroutine to read a 'T' (row descriptions) message.
|
* parseInput subroutine to read a 'T' (row descriptions) message.
|
||||||
* We build a PGresult structure containing the attribute data.
|
* We build a PGresult structure containing the attribute data.
|
||||||
* Returns: 0 if completed message, EOF if not enough data yet.
|
* Returns: 0 if completed message, EOF if error or not enough data
|
||||||
|
* received yet.
|
||||||
*
|
*
|
||||||
* Note that if we run out of data, we have to release the partially
|
* Note that if we run out of data, we have to suspend and reprocess
|
||||||
* constructed PGresult, and rebuild it again next time. Fortunately,
|
* the message after more data is received. Otherwise, conn->inStart
|
||||||
* that shouldn't happen often, since 'T' messages usually fit in a packet.
|
* must get advanced past the processed data.
|
||||||
*/
|
*/
|
||||||
static int
|
static int
|
||||||
getRowDescriptions(PGconn *conn)
|
getRowDescriptions(PGconn *conn)
|
||||||
{
|
{
|
||||||
PGresult *result = NULL;
|
PGresult *result;
|
||||||
int nfields;
|
int nfields;
|
||||||
|
const char *errmsg;
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK);
|
result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK);
|
||||||
if (!result)
|
if (!result)
|
||||||
goto failure;
|
{
|
||||||
|
errmsg = NULL; /* means "out of memory", see below */
|
||||||
|
goto advance_and_error;
|
||||||
|
}
|
||||||
|
|
||||||
/* parseInput already read the 'T' label. */
|
/* parseInput already read the 'T' label. */
|
||||||
/* the next two bytes are the number of fields */
|
/* the next two bytes are the number of fields */
|
||||||
if (pqGetInt(&(result->numAttributes), 2, conn))
|
if (pqGetInt(&(result->numAttributes), 2, conn))
|
||||||
goto failure;
|
goto EOFexit;
|
||||||
nfields = result->numAttributes;
|
nfields = result->numAttributes;
|
||||||
|
|
||||||
/* allocate space for the attribute descriptors */
|
/* allocate space for the attribute descriptors */
|
||||||
@ -656,7 +702,10 @@ getRowDescriptions(PGconn *conn)
|
|||||||
result->attDescs = (PGresAttDesc *)
|
result->attDescs = (PGresAttDesc *)
|
||||||
pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
|
pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
|
||||||
if (!result->attDescs)
|
if (!result->attDescs)
|
||||||
goto failure;
|
{
|
||||||
|
errmsg = NULL; /* means "out of memory", see below */
|
||||||
|
goto advance_and_error;
|
||||||
|
}
|
||||||
MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
|
MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -671,7 +720,7 @@ getRowDescriptions(PGconn *conn)
|
|||||||
pqGetInt(&typid, 4, conn) ||
|
pqGetInt(&typid, 4, conn) ||
|
||||||
pqGetInt(&typlen, 2, conn) ||
|
pqGetInt(&typlen, 2, conn) ||
|
||||||
pqGetInt(&atttypmod, 4, conn))
|
pqGetInt(&atttypmod, 4, conn))
|
||||||
goto failure;
|
goto EOFexit;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Since pqGetInt treats 2-byte integers as unsigned, we need to
|
* Since pqGetInt treats 2-byte integers as unsigned, we need to
|
||||||
@ -682,7 +731,10 @@ getRowDescriptions(PGconn *conn)
|
|||||||
result->attDescs[i].name = pqResultStrdup(result,
|
result->attDescs[i].name = pqResultStrdup(result,
|
||||||
conn->workBuffer.data);
|
conn->workBuffer.data);
|
||||||
if (!result->attDescs[i].name)
|
if (!result->attDescs[i].name)
|
||||||
goto failure;
|
{
|
||||||
|
errmsg = NULL; /* means "out of memory", see below */
|
||||||
|
goto advance_and_error;
|
||||||
|
}
|
||||||
result->attDescs[i].tableid = 0;
|
result->attDescs[i].tableid = 0;
|
||||||
result->attDescs[i].columnid = 0;
|
result->attDescs[i].columnid = 0;
|
||||||
result->attDescs[i].format = 0;
|
result->attDescs[i].format = 0;
|
||||||
@ -693,30 +745,90 @@ getRowDescriptions(PGconn *conn)
|
|||||||
|
|
||||||
/* Success! */
|
/* Success! */
|
||||||
conn->result = result;
|
conn->result = result;
|
||||||
return 0;
|
|
||||||
|
|
||||||
failure:
|
/*
|
||||||
if (result)
|
* Advance inStart to show that the "T" message has been processed. We
|
||||||
|
* must do this before calling the row processor, in case it longjmps.
|
||||||
|
*/
|
||||||
|
conn->inStart = conn->inCursor;
|
||||||
|
|
||||||
|
/* Give the row processor a chance to initialize for new result set */
|
||||||
|
errmsg = NULL;
|
||||||
|
switch ((*conn->rowProcessor) (result, NULL, &errmsg,
|
||||||
|
conn->rowProcessorParam))
|
||||||
|
{
|
||||||
|
case 1:
|
||||||
|
/* everything is good */
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
case -1:
|
||||||
|
/* error, report the errmsg below */
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
/* unrecognized return code */
|
||||||
|
errmsg = libpq_gettext("unrecognized return value from row processor");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
goto set_error_result;
|
||||||
|
|
||||||
|
advance_and_error:
|
||||||
|
/*
|
||||||
|
* Discard the failed message. Unfortunately we don't know for sure
|
||||||
|
* where the end is, so just throw away everything in the input buffer.
|
||||||
|
* This is not very desirable but it's the best we can do in protocol v2.
|
||||||
|
*/
|
||||||
|
conn->inStart = conn->inEnd;
|
||||||
|
|
||||||
|
set_error_result:
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Replace partially constructed result with an error result. First
|
||||||
|
* discard the old result to try to win back some memory.
|
||||||
|
*/
|
||||||
|
pqClearAsyncResult(conn);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If row processor didn't provide an error message, assume "out of
|
||||||
|
* memory" was meant. The advantage of having this special case is that
|
||||||
|
* freeing the old result first greatly improves the odds that gettext()
|
||||||
|
* will succeed in providing a translation.
|
||||||
|
*/
|
||||||
|
if (!errmsg)
|
||||||
|
errmsg = libpq_gettext("out of memory for query result");
|
||||||
|
|
||||||
|
printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
|
||||||
|
* do to recover...
|
||||||
|
*/
|
||||||
|
conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
|
||||||
|
conn->asyncStatus = PGASYNC_READY;
|
||||||
|
|
||||||
|
EOFexit:
|
||||||
|
if (result && result != conn->result)
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
return EOF;
|
return EOF;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* parseInput subroutine to read a 'B' or 'D' (row data) message.
|
* parseInput subroutine to read a 'B' or 'D' (row data) message.
|
||||||
* We add another tuple to the existing PGresult structure.
|
* We fill rowbuf with column pointers and then call the row processor.
|
||||||
* Returns: 0 if completed message, EOF if error or not enough data yet.
|
* Returns: 0 if completed message, EOF if error or not enough data
|
||||||
|
* received yet.
|
||||||
*
|
*
|
||||||
* Note that if we run out of data, we have to suspend and reprocess
|
* Note that if we run out of data, we have to suspend and reprocess
|
||||||
* the message after more data is received. We keep a partially constructed
|
* the message after more data is received. Otherwise, conn->inStart
|
||||||
* tuple in conn->curTuple, and avoid reallocating already-allocated storage.
|
* must get advanced past the processed data.
|
||||||
*/
|
*/
|
||||||
static int
|
static int
|
||||||
getAnotherTuple(PGconn *conn, bool binary)
|
getAnotherTuple(PGconn *conn, bool binary)
|
||||||
{
|
{
|
||||||
PGresult *result = conn->result;
|
PGresult *result = conn->result;
|
||||||
int nfields = result->numAttributes;
|
int nfields = result->numAttributes;
|
||||||
PGresAttValue *tup;
|
const char *errmsg;
|
||||||
|
PGdataValue *rowbuf;
|
||||||
/* the backend sends us a bitmap of which attributes are null */
|
/* the backend sends us a bitmap of which attributes are null */
|
||||||
char std_bitmap[64]; /* used unless it doesn't fit */
|
char std_bitmap[64]; /* used unless it doesn't fit */
|
||||||
char *bitmap = std_bitmap;
|
char *bitmap = std_bitmap;
|
||||||
@ -727,28 +839,33 @@ getAnotherTuple(PGconn *conn, bool binary)
|
|||||||
int bitcnt; /* number of bits examined in current byte */
|
int bitcnt; /* number of bits examined in current byte */
|
||||||
int vlen; /* length of the current field value */
|
int vlen; /* length of the current field value */
|
||||||
|
|
||||||
|
/* Resize row buffer if needed */
|
||||||
|
rowbuf = conn->rowBuf;
|
||||||
|
if (nfields > conn->rowBufLen)
|
||||||
|
{
|
||||||
|
rowbuf = (PGdataValue *) realloc(rowbuf,
|
||||||
|
nfields * sizeof(PGdataValue));
|
||||||
|
if (!rowbuf)
|
||||||
|
{
|
||||||
|
errmsg = NULL; /* means "out of memory", see below */
|
||||||
|
goto advance_and_error;
|
||||||
|
}
|
||||||
|
conn->rowBuf = rowbuf;
|
||||||
|
conn->rowBufLen = nfields;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Save format specifier */
|
||||||
result->binary = binary;
|
result->binary = binary;
|
||||||
|
|
||||||
/* Allocate tuple space if first time for this data message */
|
/*
|
||||||
if (conn->curTuple == NULL)
|
* If it's binary, fix the column format indicators. We assume the
|
||||||
|
* backend will consistently send either B or D, not a mix.
|
||||||
|
*/
|
||||||
|
if (binary)
|
||||||
{
|
{
|
||||||
conn->curTuple = (PGresAttValue *)
|
for (i = 0; i < nfields; i++)
|
||||||
pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
|
result->attDescs[i].format = 1;
|
||||||
if (conn->curTuple == NULL)
|
|
||||||
goto outOfMemory;
|
|
||||||
MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If it's binary, fix the column format indicators. We assume the
|
|
||||||
* backend will consistently send either B or D, not a mix.
|
|
||||||
*/
|
|
||||||
if (binary)
|
|
||||||
{
|
|
||||||
for (i = 0; i < nfields; i++)
|
|
||||||
result->attDescs[i].format = 1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
tup = conn->curTuple;
|
|
||||||
|
|
||||||
/* Get the null-value bitmap */
|
/* Get the null-value bitmap */
|
||||||
nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
|
nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
|
||||||
@ -757,7 +874,10 @@ getAnotherTuple(PGconn *conn, bool binary)
|
|||||||
{
|
{
|
||||||
bitmap = (char *) malloc(nbytes);
|
bitmap = (char *) malloc(nbytes);
|
||||||
if (!bitmap)
|
if (!bitmap)
|
||||||
goto outOfMemory;
|
{
|
||||||
|
errmsg = NULL; /* means "out of memory", see below */
|
||||||
|
goto advance_and_error;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pqGetnchar(bitmap, nbytes, conn))
|
if (pqGetnchar(bitmap, nbytes, conn))
|
||||||
@ -770,35 +890,34 @@ getAnotherTuple(PGconn *conn, bool binary)
|
|||||||
|
|
||||||
for (i = 0; i < nfields; i++)
|
for (i = 0; i < nfields; i++)
|
||||||
{
|
{
|
||||||
|
/* get the value length */
|
||||||
if (!(bmap & 0200))
|
if (!(bmap & 0200))
|
||||||
{
|
vlen = NULL_LEN;
|
||||||
/* if the field value is absent, make it a null string */
|
else if (pqGetInt(&vlen, 4, conn))
|
||||||
tup[i].value = result->null_field;
|
goto EOFexit;
|
||||||
tup[i].len = NULL_LEN;
|
|
||||||
}
|
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/* get the value length (the first four bytes are for length) */
|
|
||||||
if (pqGetInt(&vlen, 4, conn))
|
|
||||||
goto EOFexit;
|
|
||||||
if (!binary)
|
if (!binary)
|
||||||
vlen = vlen - 4;
|
vlen = vlen - 4;
|
||||||
if (vlen < 0)
|
if (vlen < 0)
|
||||||
vlen = 0;
|
vlen = 0;
|
||||||
if (tup[i].value == NULL)
|
|
||||||
{
|
|
||||||
tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
|
|
||||||
if (tup[i].value == NULL)
|
|
||||||
goto outOfMemory;
|
|
||||||
}
|
|
||||||
tup[i].len = vlen;
|
|
||||||
/* read in the value */
|
|
||||||
if (vlen > 0)
|
|
||||||
if (pqGetnchar((char *) (tup[i].value), vlen, conn))
|
|
||||||
goto EOFexit;
|
|
||||||
/* we have to terminate this ourselves */
|
|
||||||
tup[i].value[vlen] = '\0';
|
|
||||||
}
|
}
|
||||||
|
rowbuf[i].len = vlen;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* rowbuf[i].value always points to the next address in the data
|
||||||
|
* buffer even if the value is NULL. This allows row processors to
|
||||||
|
* estimate data sizes more easily.
|
||||||
|
*/
|
||||||
|
rowbuf[i].value = conn->inBuffer + conn->inCursor;
|
||||||
|
|
||||||
|
/* Skip over the data value */
|
||||||
|
if (vlen > 0)
|
||||||
|
{
|
||||||
|
if (pqSkipnchar(vlen, conn))
|
||||||
|
goto EOFexit;
|
||||||
|
}
|
||||||
|
|
||||||
/* advance the bitmap stuff */
|
/* advance the bitmap stuff */
|
||||||
bitcnt++;
|
bitcnt++;
|
||||||
if (bitcnt == BITS_PER_BYTE)
|
if (bitcnt == BITS_PER_BYTE)
|
||||||
@ -811,26 +930,63 @@ getAnotherTuple(PGconn *conn, bool binary)
|
|||||||
bmap <<= 1;
|
bmap <<= 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Success! Store the completed tuple in the result */
|
/* Release bitmap now if we allocated it */
|
||||||
if (!pqAddTuple(result, tup))
|
|
||||||
goto outOfMemory;
|
|
||||||
/* and reset for a new message */
|
|
||||||
conn->curTuple = NULL;
|
|
||||||
|
|
||||||
if (bitmap != std_bitmap)
|
if (bitmap != std_bitmap)
|
||||||
free(bitmap);
|
free(bitmap);
|
||||||
return 0;
|
bitmap = NULL;
|
||||||
|
|
||||||
outOfMemory:
|
|
||||||
/* Replace partially constructed result with an error result */
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* we do NOT use pqSaveErrorResult() here, because of the likelihood that
|
* Advance inStart to show that the "D" message has been processed. We
|
||||||
* there's not enough memory to concatenate messages...
|
* must do this before calling the row processor, in case it longjmps.
|
||||||
|
*/
|
||||||
|
conn->inStart = conn->inCursor;
|
||||||
|
|
||||||
|
/* Pass the completed row values to rowProcessor */
|
||||||
|
errmsg = NULL;
|
||||||
|
switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
|
||||||
|
conn->rowProcessorParam))
|
||||||
|
{
|
||||||
|
case 1:
|
||||||
|
/* everything is good */
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
case -1:
|
||||||
|
/* error, report the errmsg below */
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
/* unrecognized return code */
|
||||||
|
errmsg = libpq_gettext("unrecognized return value from row processor");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
goto set_error_result;
|
||||||
|
|
||||||
|
advance_and_error:
|
||||||
|
/*
|
||||||
|
* Discard the failed message. Unfortunately we don't know for sure
|
||||||
|
* where the end is, so just throw away everything in the input buffer.
|
||||||
|
* This is not very desirable but it's the best we can do in protocol v2.
|
||||||
|
*/
|
||||||
|
conn->inStart = conn->inEnd;
|
||||||
|
|
||||||
|
set_error_result:
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Replace partially constructed result with an error result. First
|
||||||
|
* discard the old result to try to win back some memory.
|
||||||
*/
|
*/
|
||||||
pqClearAsyncResult(conn);
|
pqClearAsyncResult(conn);
|
||||||
printfPQExpBuffer(&conn->errorMessage,
|
|
||||||
libpq_gettext("out of memory for query result\n"));
|
/*
|
||||||
|
* If row processor didn't provide an error message, assume "out of
|
||||||
|
* memory" was meant. The advantage of having this special case is that
|
||||||
|
* freeing the old result first greatly improves the odds that gettext()
|
||||||
|
* will succeed in providing a translation.
|
||||||
|
*/
|
||||||
|
if (!errmsg)
|
||||||
|
errmsg = libpq_gettext("out of memory for query result");
|
||||||
|
|
||||||
|
printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
|
* XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
|
||||||
@ -838,8 +994,6 @@ outOfMemory:
|
|||||||
*/
|
*/
|
||||||
conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
|
conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
|
||||||
conn->asyncStatus = PGASYNC_READY;
|
conn->asyncStatus = PGASYNC_READY;
|
||||||
/* Discard the failed message --- good idea? */
|
|
||||||
conn->inStart = conn->inEnd;
|
|
||||||
|
|
||||||
EOFexit:
|
EOFexit:
|
||||||
if (bitmap != NULL && bitmap != std_bitmap)
|
if (bitmap != NULL && bitmap != std_bitmap)
|
||||||
@ -1122,7 +1276,8 @@ pqGetline2(PGconn *conn, char *s, int maxlen)
|
|||||||
{
|
{
|
||||||
int result = 1; /* return value if buffer overflows */
|
int result = 1; /* return value if buffer overflows */
|
||||||
|
|
||||||
if (conn->sock < 0)
|
if (conn->sock < 0 ||
|
||||||
|
conn->asyncStatus != PGASYNC_COPY_OUT)
|
||||||
{
|
{
|
||||||
*s = '\0';
|
*s = '\0';
|
||||||
return EOF;
|
return EOF;
|
||||||
|
@ -44,7 +44,7 @@
|
|||||||
|
|
||||||
|
|
||||||
static void handleSyncLoss(PGconn *conn, char id, int msgLength);
|
static void handleSyncLoss(PGconn *conn, char id, int msgLength);
|
||||||
static int getRowDescriptions(PGconn *conn);
|
static int getRowDescriptions(PGconn *conn, int msgLength);
|
||||||
static int getParamDescriptions(PGconn *conn);
|
static int getParamDescriptions(PGconn *conn);
|
||||||
static int getAnotherTuple(PGconn *conn, int msgLength);
|
static int getAnotherTuple(PGconn *conn, int msgLength);
|
||||||
static int getParameterStatus(PGconn *conn);
|
static int getParameterStatus(PGconn *conn);
|
||||||
@ -61,6 +61,9 @@ static int build_startup_packet(const PGconn *conn, char *packet,
|
|||||||
* parseInput: if appropriate, parse input data from backend
|
* parseInput: if appropriate, parse input data from backend
|
||||||
* until input is exhausted or a stopping state is reached.
|
* until input is exhausted or a stopping state is reached.
|
||||||
* Note that this function will NOT attempt to read more data from the backend.
|
* Note that this function will NOT attempt to read more data from the backend.
|
||||||
|
*
|
||||||
|
* Note: callers of parseInput must be prepared for a longjmp exit when we are
|
||||||
|
* in PGASYNC_BUSY state, since an external row processor might do that.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
pqParseInput3(PGconn *conn)
|
pqParseInput3(PGconn *conn)
|
||||||
@ -269,15 +272,10 @@ pqParseInput3(PGconn *conn)
|
|||||||
conn->queryclass == PGQUERY_DESCRIBE)
|
conn->queryclass == PGQUERY_DESCRIBE)
|
||||||
{
|
{
|
||||||
/* First 'T' in a query sequence */
|
/* First 'T' in a query sequence */
|
||||||
if (getRowDescriptions(conn))
|
if (getRowDescriptions(conn, msgLength))
|
||||||
return;
|
return;
|
||||||
|
/* getRowDescriptions() moves inStart itself */
|
||||||
/*
|
continue;
|
||||||
* If we're doing a Describe, we're ready to pass the
|
|
||||||
* result back to the client.
|
|
||||||
*/
|
|
||||||
if (conn->queryclass == PGQUERY_DESCRIBE)
|
|
||||||
conn->asyncStatus = PGASYNC_READY;
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -327,6 +325,8 @@ pqParseInput3(PGconn *conn)
|
|||||||
/* Read another tuple of a normal query response */
|
/* Read another tuple of a normal query response */
|
||||||
if (getAnotherTuple(conn, msgLength))
|
if (getAnotherTuple(conn, msgLength))
|
||||||
return;
|
return;
|
||||||
|
/* getAnotherTuple() moves inStart itself */
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
else if (conn->result != NULL &&
|
else if (conn->result != NULL &&
|
||||||
conn->result->resultStatus == PGRES_FATAL_ERROR)
|
conn->result->resultStatus == PGRES_FATAL_ERROR)
|
||||||
@ -443,17 +443,20 @@ handleSyncLoss(PGconn *conn, char id, int msgLength)
|
|||||||
* parseInput subroutine to read a 'T' (row descriptions) message.
|
* parseInput subroutine to read a 'T' (row descriptions) message.
|
||||||
* We'll build a new PGresult structure (unless called for a Describe
|
* We'll build a new PGresult structure (unless called for a Describe
|
||||||
* command for a prepared statement) containing the attribute data.
|
* command for a prepared statement) containing the attribute data.
|
||||||
* Returns: 0 if completed message, EOF if not enough data yet.
|
* Returns: 0 if processed message successfully, EOF to suspend parsing
|
||||||
|
* (the latter case is not actually used currently).
|
||||||
|
* In either case, conn->inStart has been advanced past the message.
|
||||||
*
|
*
|
||||||
* Note that if we run out of data, we have to release the partially
|
* Note: the row processor could also choose to longjmp out of libpq,
|
||||||
* constructed PGresult, and rebuild it again next time. Fortunately,
|
* in which case the library's state must allow for resumption at the
|
||||||
* that shouldn't happen often, since 'T' messages usually fit in a packet.
|
* next message.
|
||||||
*/
|
*/
|
||||||
static int
|
static int
|
||||||
getRowDescriptions(PGconn *conn)
|
getRowDescriptions(PGconn *conn, int msgLength)
|
||||||
{
|
{
|
||||||
PGresult *result;
|
PGresult *result;
|
||||||
int nfields;
|
int nfields;
|
||||||
|
const char *errmsg;
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -471,12 +474,19 @@ getRowDescriptions(PGconn *conn)
|
|||||||
else
|
else
|
||||||
result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK);
|
result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK);
|
||||||
if (!result)
|
if (!result)
|
||||||
goto failure;
|
{
|
||||||
|
errmsg = NULL; /* means "out of memory", see below */
|
||||||
|
goto advance_and_error;
|
||||||
|
}
|
||||||
|
|
||||||
/* parseInput already read the 'T' label and message length. */
|
/* parseInput already read the 'T' label and message length. */
|
||||||
/* the next two bytes are the number of fields */
|
/* the next two bytes are the number of fields */
|
||||||
if (pqGetInt(&(result->numAttributes), 2, conn))
|
if (pqGetInt(&(result->numAttributes), 2, conn))
|
||||||
goto failure;
|
{
|
||||||
|
/* We should not run out of data here, so complain */
|
||||||
|
errmsg = libpq_gettext("insufficient data in \"T\" message");
|
||||||
|
goto advance_and_error;
|
||||||
|
}
|
||||||
nfields = result->numAttributes;
|
nfields = result->numAttributes;
|
||||||
|
|
||||||
/* allocate space for the attribute descriptors */
|
/* allocate space for the attribute descriptors */
|
||||||
@ -485,7 +495,10 @@ getRowDescriptions(PGconn *conn)
|
|||||||
result->attDescs = (PGresAttDesc *)
|
result->attDescs = (PGresAttDesc *)
|
||||||
pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
|
pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
|
||||||
if (!result->attDescs)
|
if (!result->attDescs)
|
||||||
goto failure;
|
{
|
||||||
|
errmsg = NULL; /* means "out of memory", see below */
|
||||||
|
goto advance_and_error;
|
||||||
|
}
|
||||||
MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
|
MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -510,7 +523,9 @@ getRowDescriptions(PGconn *conn)
|
|||||||
pqGetInt(&atttypmod, 4, conn) ||
|
pqGetInt(&atttypmod, 4, conn) ||
|
||||||
pqGetInt(&format, 2, conn))
|
pqGetInt(&format, 2, conn))
|
||||||
{
|
{
|
||||||
goto failure;
|
/* We should not run out of data here, so complain */
|
||||||
|
errmsg = libpq_gettext("insufficient data in \"T\" message");
|
||||||
|
goto advance_and_error;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -524,7 +539,10 @@ getRowDescriptions(PGconn *conn)
|
|||||||
result->attDescs[i].name = pqResultStrdup(result,
|
result->attDescs[i].name = pqResultStrdup(result,
|
||||||
conn->workBuffer.data);
|
conn->workBuffer.data);
|
||||||
if (!result->attDescs[i].name)
|
if (!result->attDescs[i].name)
|
||||||
goto failure;
|
{
|
||||||
|
errmsg = NULL; /* means "out of memory", see below */
|
||||||
|
goto advance_and_error;
|
||||||
|
}
|
||||||
result->attDescs[i].tableid = tableid;
|
result->attDescs[i].tableid = tableid;
|
||||||
result->attDescs[i].columnid = columnid;
|
result->attDescs[i].columnid = columnid;
|
||||||
result->attDescs[i].format = format;
|
result->attDescs[i].format = format;
|
||||||
@ -536,24 +554,84 @@ getRowDescriptions(PGconn *conn)
|
|||||||
result->binary = 0;
|
result->binary = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Sanity check that we absorbed all the data */
|
||||||
|
if (conn->inCursor != conn->inStart + 5 + msgLength)
|
||||||
|
{
|
||||||
|
errmsg = libpq_gettext("extraneous data in \"T\" message");
|
||||||
|
goto advance_and_error;
|
||||||
|
}
|
||||||
|
|
||||||
/* Success! */
|
/* Success! */
|
||||||
conn->result = result;
|
conn->result = result;
|
||||||
return 0;
|
|
||||||
|
|
||||||
failure:
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Discard incomplete result, unless it's from getParamDescriptions.
|
* Advance inStart to show that the "T" message has been processed. We
|
||||||
*
|
* must do this before calling the row processor, in case it longjmps.
|
||||||
* Note that if we hit a bufferload boundary while handling the
|
|
||||||
* describe-statement case, we'll forget any PGresult space we just
|
|
||||||
* allocated, and then reallocate it on next try. This will bloat the
|
|
||||||
* PGresult a little bit but the space will be freed at PQclear, so it
|
|
||||||
* doesn't seem worth trying to be smarter.
|
|
||||||
*/
|
*/
|
||||||
if (result != conn->result)
|
conn->inStart = conn->inCursor;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If we're doing a Describe, we're done, and ready to pass the result
|
||||||
|
* back to the client.
|
||||||
|
*/
|
||||||
|
if (conn->queryclass == PGQUERY_DESCRIBE)
|
||||||
|
{
|
||||||
|
conn->asyncStatus = PGASYNC_READY;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Give the row processor a chance to initialize for new result set */
|
||||||
|
errmsg = NULL;
|
||||||
|
switch ((*conn->rowProcessor) (result, NULL, &errmsg,
|
||||||
|
conn->rowProcessorParam))
|
||||||
|
{
|
||||||
|
case 1:
|
||||||
|
/* everything is good */
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
case -1:
|
||||||
|
/* error, report the errmsg below */
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
/* unrecognized return code */
|
||||||
|
errmsg = libpq_gettext("unrecognized return value from row processor");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
goto set_error_result;
|
||||||
|
|
||||||
|
advance_and_error:
|
||||||
|
/* Discard unsaved result, if any */
|
||||||
|
if (result && result != conn->result)
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
return EOF;
|
|
||||||
|
/* Discard the failed message by pretending we read it */
|
||||||
|
conn->inStart += 5 + msgLength;
|
||||||
|
|
||||||
|
set_error_result:
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Replace partially constructed result with an error result. First
|
||||||
|
* discard the old result to try to win back some memory.
|
||||||
|
*/
|
||||||
|
pqClearAsyncResult(conn);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If row processor didn't provide an error message, assume "out of
|
||||||
|
* memory" was meant.
|
||||||
|
*/
|
||||||
|
if (!errmsg)
|
||||||
|
errmsg = libpq_gettext("out of memory for query result");
|
||||||
|
|
||||||
|
printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
|
||||||
|
pqSaveErrorResult(conn);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Return zero to allow input parsing to continue. Subsequent "D"
|
||||||
|
* messages will be ignored until we get to end of data, since an error
|
||||||
|
* result is already set up.
|
||||||
|
*/
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -613,47 +691,53 @@ failure:
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
* parseInput subroutine to read a 'D' (row data) message.
|
* parseInput subroutine to read a 'D' (row data) message.
|
||||||
* We add another tuple to the existing PGresult structure.
|
* We fill rowbuf with column pointers and then call the row processor.
|
||||||
* Returns: 0 if completed message, EOF if error or not enough data yet.
|
* Returns: 0 if processed message successfully, EOF to suspend parsing
|
||||||
|
* (the latter case is not actually used currently).
|
||||||
|
* In either case, conn->inStart has been advanced past the message.
|
||||||
*
|
*
|
||||||
* Note that if we run out of data, we have to suspend and reprocess
|
* Note: the row processor could also choose to longjmp out of libpq,
|
||||||
* the message after more data is received. We keep a partially constructed
|
* in which case the library's state must allow for resumption at the
|
||||||
* tuple in conn->curTuple, and avoid reallocating already-allocated storage.
|
* next message.
|
||||||
*/
|
*/
|
||||||
static int
|
static int
|
||||||
getAnotherTuple(PGconn *conn, int msgLength)
|
getAnotherTuple(PGconn *conn, int msgLength)
|
||||||
{
|
{
|
||||||
PGresult *result = conn->result;
|
PGresult *result = conn->result;
|
||||||
int nfields = result->numAttributes;
|
int nfields = result->numAttributes;
|
||||||
PGresAttValue *tup;
|
const char *errmsg;
|
||||||
|
PGdataValue *rowbuf;
|
||||||
int tupnfields; /* # fields from tuple */
|
int tupnfields; /* # fields from tuple */
|
||||||
int vlen; /* length of the current field value */
|
int vlen; /* length of the current field value */
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
/* Allocate tuple space if first time for this data message */
|
|
||||||
if (conn->curTuple == NULL)
|
|
||||||
{
|
|
||||||
conn->curTuple = (PGresAttValue *)
|
|
||||||
pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
|
|
||||||
if (conn->curTuple == NULL)
|
|
||||||
goto outOfMemory;
|
|
||||||
MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
|
|
||||||
}
|
|
||||||
tup = conn->curTuple;
|
|
||||||
|
|
||||||
/* Get the field count and make sure it's what we expect */
|
/* Get the field count and make sure it's what we expect */
|
||||||
if (pqGetInt(&tupnfields, 2, conn))
|
if (pqGetInt(&tupnfields, 2, conn))
|
||||||
return EOF;
|
{
|
||||||
|
/* We should not run out of data here, so complain */
|
||||||
|
errmsg = libpq_gettext("insufficient data in \"D\" message");
|
||||||
|
goto advance_and_error;
|
||||||
|
}
|
||||||
|
|
||||||
if (tupnfields != nfields)
|
if (tupnfields != nfields)
|
||||||
{
|
{
|
||||||
/* Replace partially constructed result with an error result */
|
errmsg = libpq_gettext("unexpected field count in \"D\" message");
|
||||||
printfPQExpBuffer(&conn->errorMessage,
|
goto advance_and_error;
|
||||||
libpq_gettext("unexpected field count in \"D\" message\n"));
|
}
|
||||||
pqSaveErrorResult(conn);
|
|
||||||
/* Discard the failed message by pretending we read it */
|
/* Resize row buffer if needed */
|
||||||
conn->inCursor = conn->inStart + 5 + msgLength;
|
rowbuf = conn->rowBuf;
|
||||||
return 0;
|
if (nfields > conn->rowBufLen)
|
||||||
|
{
|
||||||
|
rowbuf = (PGdataValue *) realloc(rowbuf,
|
||||||
|
nfields * sizeof(PGdataValue));
|
||||||
|
if (!rowbuf)
|
||||||
|
{
|
||||||
|
errmsg = NULL; /* means "out of memory", see below */
|
||||||
|
goto advance_and_error;
|
||||||
|
}
|
||||||
|
conn->rowBuf = rowbuf;
|
||||||
|
conn->rowBufLen = nfields;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Scan the fields */
|
/* Scan the fields */
|
||||||
@ -661,54 +745,94 @@ getAnotherTuple(PGconn *conn, int msgLength)
|
|||||||
{
|
{
|
||||||
/* get the value length */
|
/* get the value length */
|
||||||
if (pqGetInt(&vlen, 4, conn))
|
if (pqGetInt(&vlen, 4, conn))
|
||||||
return EOF;
|
|
||||||
if (vlen == -1)
|
|
||||||
{
|
{
|
||||||
/* null field */
|
/* We should not run out of data here, so complain */
|
||||||
tup[i].value = result->null_field;
|
errmsg = libpq_gettext("insufficient data in \"D\" message");
|
||||||
tup[i].len = NULL_LEN;
|
goto advance_and_error;
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
if (vlen < 0)
|
rowbuf[i].len = vlen;
|
||||||
vlen = 0;
|
|
||||||
if (tup[i].value == NULL)
|
|
||||||
{
|
|
||||||
bool isbinary = (result->attDescs[i].format != 0);
|
|
||||||
|
|
||||||
tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
|
/*
|
||||||
if (tup[i].value == NULL)
|
* rowbuf[i].value always points to the next address in the data
|
||||||
goto outOfMemory;
|
* buffer even if the value is NULL. This allows row processors to
|
||||||
}
|
* estimate data sizes more easily.
|
||||||
tup[i].len = vlen;
|
*/
|
||||||
/* read in the value */
|
rowbuf[i].value = conn->inBuffer + conn->inCursor;
|
||||||
|
|
||||||
|
/* Skip over the data value */
|
||||||
if (vlen > 0)
|
if (vlen > 0)
|
||||||
if (pqGetnchar((char *) (tup[i].value), vlen, conn))
|
{
|
||||||
return EOF;
|
if (pqSkipnchar(vlen, conn))
|
||||||
/* we have to terminate this ourselves */
|
{
|
||||||
tup[i].value[vlen] = '\0';
|
/* We should not run out of data here, so complain */
|
||||||
|
errmsg = libpq_gettext("insufficient data in \"D\" message");
|
||||||
|
goto advance_and_error;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Success! Store the completed tuple in the result */
|
/* Sanity check that we absorbed all the data */
|
||||||
if (!pqAddTuple(result, tup))
|
if (conn->inCursor != conn->inStart + 5 + msgLength)
|
||||||
goto outOfMemory;
|
{
|
||||||
/* and reset for a new message */
|
errmsg = libpq_gettext("extraneous data in \"D\" message");
|
||||||
conn->curTuple = NULL;
|
goto advance_and_error;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
/*
|
||||||
|
* Advance inStart to show that the "D" message has been processed. We
|
||||||
|
* must do this before calling the row processor, in case it longjmps.
|
||||||
|
*/
|
||||||
|
conn->inStart = conn->inCursor;
|
||||||
|
|
||||||
outOfMemory:
|
/* Pass the completed row values to rowProcessor */
|
||||||
|
errmsg = NULL;
|
||||||
|
switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
|
||||||
|
conn->rowProcessorParam))
|
||||||
|
{
|
||||||
|
case 1:
|
||||||
|
/* everything is good */
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
case -1:
|
||||||
|
/* error, report the errmsg below */
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
/* unrecognized return code */
|
||||||
|
errmsg = libpq_gettext("unrecognized return value from row processor");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
goto set_error_result;
|
||||||
|
|
||||||
|
advance_and_error:
|
||||||
|
/* Discard the failed message by pretending we read it */
|
||||||
|
conn->inStart += 5 + msgLength;
|
||||||
|
|
||||||
|
set_error_result:
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Replace partially constructed result with an error result. First
|
* Replace partially constructed result with an error result. First
|
||||||
* discard the old result to try to win back some memory.
|
* discard the old result to try to win back some memory.
|
||||||
*/
|
*/
|
||||||
pqClearAsyncResult(conn);
|
pqClearAsyncResult(conn);
|
||||||
printfPQExpBuffer(&conn->errorMessage,
|
|
||||||
libpq_gettext("out of memory for query result\n"));
|
/*
|
||||||
|
* If row processor didn't provide an error message, assume "out of
|
||||||
|
* memory" was meant. The advantage of having this special case is that
|
||||||
|
* freeing the old result first greatly improves the odds that gettext()
|
||||||
|
* will succeed in providing a translation.
|
||||||
|
*/
|
||||||
|
if (!errmsg)
|
||||||
|
errmsg = libpq_gettext("out of memory for query result");
|
||||||
|
|
||||||
|
printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
|
||||||
pqSaveErrorResult(conn);
|
pqSaveErrorResult(conn);
|
||||||
|
|
||||||
/* Discard the failed message by pretending we read it */
|
/*
|
||||||
conn->inCursor = conn->inStart + 5 + msgLength;
|
* Return zero to allow input parsing to continue. Subsequent "D"
|
||||||
|
* messages will be ignored until we get to end of data, since an error
|
||||||
|
* result is already set up.
|
||||||
|
*/
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,13 +38,14 @@ extern "C"
|
|||||||
|
|
||||||
/* Application-visible enum types */
|
/* Application-visible enum types */
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Although it is okay to add to these lists, values which become unused
|
||||||
|
* should never be removed, nor should constants be redefined - that would
|
||||||
|
* break compatibility with existing code.
|
||||||
|
*/
|
||||||
|
|
||||||
typedef enum
|
typedef enum
|
||||||
{
|
{
|
||||||
/*
|
|
||||||
* Although it is okay to add to this list, values which become unused
|
|
||||||
* should never be removed, nor should constants be redefined - that would
|
|
||||||
* break compatibility with existing code.
|
|
||||||
*/
|
|
||||||
CONNECTION_OK,
|
CONNECTION_OK,
|
||||||
CONNECTION_BAD,
|
CONNECTION_BAD,
|
||||||
/* Non-blocking mode only below here */
|
/* Non-blocking mode only below here */
|
||||||
@ -128,6 +129,17 @@ typedef struct pg_conn PGconn;
|
|||||||
*/
|
*/
|
||||||
typedef struct pg_result PGresult;
|
typedef struct pg_result PGresult;
|
||||||
|
|
||||||
|
/* PGdataValue represents a data field value being passed to a row processor.
|
||||||
|
* It could be either text or binary data; text data is not zero-terminated.
|
||||||
|
* A SQL NULL is represented by len < 0; then value is still valid but there
|
||||||
|
* are no data bytes there.
|
||||||
|
*/
|
||||||
|
typedef struct pgDataValue
|
||||||
|
{
|
||||||
|
int len; /* data length in bytes, or <0 if NULL */
|
||||||
|
const char *value; /* data value, without zero-termination */
|
||||||
|
} PGdataValue;
|
||||||
|
|
||||||
/* PGcancel encapsulates the information needed to cancel a running
|
/* PGcancel encapsulates the information needed to cancel a running
|
||||||
* query on an existing connection.
|
* query on an existing connection.
|
||||||
* The contents of this struct are not supposed to be known to applications.
|
* The contents of this struct are not supposed to be known to applications.
|
||||||
@ -149,6 +161,10 @@ typedef struct pgNotify
|
|||||||
struct pgNotify *next; /* list link */
|
struct pgNotify *next; /* list link */
|
||||||
} PGnotify;
|
} PGnotify;
|
||||||
|
|
||||||
|
/* Function type for row-processor callback */
|
||||||
|
typedef int (*PQrowProcessor) (PGresult *res, const PGdataValue *columns,
|
||||||
|
const char **errmsgp, void *param);
|
||||||
|
|
||||||
/* Function types for notice-handling callbacks */
|
/* Function types for notice-handling callbacks */
|
||||||
typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
|
typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
|
||||||
typedef void (*PQnoticeProcessor) (void *arg, const char *message);
|
typedef void (*PQnoticeProcessor) (void *arg, const char *message);
|
||||||
@ -388,11 +404,16 @@ extern int PQsendQueryPrepared(PGconn *conn,
|
|||||||
const int *paramFormats,
|
const int *paramFormats,
|
||||||
int resultFormat);
|
int resultFormat);
|
||||||
extern PGresult *PQgetResult(PGconn *conn);
|
extern PGresult *PQgetResult(PGconn *conn);
|
||||||
|
extern PGresult *PQskipResult(PGconn *conn);
|
||||||
|
|
||||||
/* Routines for managing an asynchronous query */
|
/* Routines for managing an asynchronous query */
|
||||||
extern int PQisBusy(PGconn *conn);
|
extern int PQisBusy(PGconn *conn);
|
||||||
extern int PQconsumeInput(PGconn *conn);
|
extern int PQconsumeInput(PGconn *conn);
|
||||||
|
|
||||||
|
/* Override default per-row processing */
|
||||||
|
extern void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
|
||||||
|
extern PQrowProcessor PQgetRowProcessor(const PGconn *conn, void **param);
|
||||||
|
|
||||||
/* LISTEN/NOTIFY support */
|
/* LISTEN/NOTIFY support */
|
||||||
extern PGnotify *PQnotifies(PGconn *conn);
|
extern PGnotify *PQnotifies(PGconn *conn);
|
||||||
|
|
||||||
|
@ -324,6 +324,10 @@ struct pg_conn
|
|||||||
/* Optional file to write trace info to */
|
/* Optional file to write trace info to */
|
||||||
FILE *Pfdebug;
|
FILE *Pfdebug;
|
||||||
|
|
||||||
|
/* Callback procedure for per-row processing */
|
||||||
|
PQrowProcessor rowProcessor; /* function pointer */
|
||||||
|
void *rowProcessorParam; /* passthrough argument */
|
||||||
|
|
||||||
/* Callback procedures for notice message processing */
|
/* Callback procedures for notice message processing */
|
||||||
PGNoticeHooks noticeHooks;
|
PGNoticeHooks noticeHooks;
|
||||||
|
|
||||||
@ -396,9 +400,14 @@ struct pg_conn
|
|||||||
* msg has no length word */
|
* msg has no length word */
|
||||||
int outMsgEnd; /* offset to msg end (so far) */
|
int outMsgEnd; /* offset to msg end (so far) */
|
||||||
|
|
||||||
|
/* Row processor interface workspace */
|
||||||
|
PGdataValue *rowBuf; /* array for passing values to rowProcessor */
|
||||||
|
int rowBufLen; /* number of entries allocated in rowBuf */
|
||||||
|
|
||||||
/* Status for asynchronous result construction */
|
/* Status for asynchronous result construction */
|
||||||
PGresult *result; /* result being constructed */
|
PGresult *result; /* result being constructed */
|
||||||
PGresAttValue *curTuple; /* tuple currently being read */
|
|
||||||
|
/* Assorted state for SSL, GSS, etc */
|
||||||
|
|
||||||
#ifdef USE_SSL
|
#ifdef USE_SSL
|
||||||
bool allow_ssl_try; /* Allowed to try SSL negotiation */
|
bool allow_ssl_try; /* Allowed to try SSL negotiation */
|
||||||
@ -435,7 +444,6 @@ struct pg_conn
|
|||||||
* connection */
|
* connection */
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
/* Buffer for current error message */
|
/* Buffer for current error message */
|
||||||
PQExpBufferData errorMessage; /* expansible string */
|
PQExpBufferData errorMessage; /* expansible string */
|
||||||
|
|
||||||
@ -505,7 +513,6 @@ extern void
|
|||||||
pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
|
pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
|
||||||
/* This lets gcc check the format string for consistency. */
|
/* This lets gcc check the format string for consistency. */
|
||||||
__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
|
__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
|
||||||
extern int pqAddTuple(PGresult *res, PGresAttValue *tup);
|
|
||||||
extern void pqSaveMessageField(PGresult *res, char code,
|
extern void pqSaveMessageField(PGresult *res, char code,
|
||||||
const char *value);
|
const char *value);
|
||||||
extern void pqSaveParameterStatus(PGconn *conn, const char *name,
|
extern void pqSaveParameterStatus(PGconn *conn, const char *name,
|
||||||
@ -558,6 +565,7 @@ extern int pqGets(PQExpBuffer buf, PGconn *conn);
|
|||||||
extern int pqGets_append(PQExpBuffer buf, PGconn *conn);
|
extern int pqGets_append(PQExpBuffer buf, PGconn *conn);
|
||||||
extern int pqPuts(const char *s, PGconn *conn);
|
extern int pqPuts(const char *s, PGconn *conn);
|
||||||
extern int pqGetnchar(char *s, size_t len, PGconn *conn);
|
extern int pqGetnchar(char *s, size_t len, PGconn *conn);
|
||||||
|
extern int pqSkipnchar(size_t len, PGconn *conn);
|
||||||
extern int pqPutnchar(const char *s, size_t len, PGconn *conn);
|
extern int pqPutnchar(const char *s, size_t len, PGconn *conn);
|
||||||
extern int pqGetInt(int *result, size_t bytes, PGconn *conn);
|
extern int pqGetInt(int *result, size_t bytes, PGconn *conn);
|
||||||
extern int pqPutInt(int value, size_t bytes, PGconn *conn);
|
extern int pqPutInt(int value, size_t bytes, PGconn *conn);
|
||||||
|
Reference in New Issue
Block a user