mirror of
https://github.com/postgres/postgres.git
synced 2025-07-28 23:42:10 +03:00
Add a "row processor" API to libpq for better handling of large results.
Traditionally libpq has collected an entire query result before passing it back to the application. That provides a simple and transactional API, but it's pretty inefficient for large result sets. This patch allows the application to process each row on-the-fly instead of accumulating the rows into the PGresult. Error recovery becomes a bit more complex, but often that tradeoff is well worth making. Kyotaro Horiguchi, reviewed by Marko Kreen and Tom Lane
This commit is contained in:
@ -49,11 +49,19 @@ static int getNotify(PGconn *conn);
|
||||
PostgresPollingStatusType
|
||||
pqSetenvPoll(PGconn *conn)
|
||||
{
|
||||
PostgresPollingStatusType result;
|
||||
PGresult *res;
|
||||
PQrowProcessor savedRowProcessor;
|
||||
void *savedRowProcessorParam;
|
||||
|
||||
if (conn == NULL || conn->status == CONNECTION_BAD)
|
||||
return PGRES_POLLING_FAILED;
|
||||
|
||||
/* Ensure the standard row processor is used to collect any results */
|
||||
savedRowProcessor = conn->rowProcessor;
|
||||
savedRowProcessorParam = conn->rowProcessorParam;
|
||||
PQsetRowProcessor(conn, NULL, NULL);
|
||||
|
||||
/* Check whether there are any data for us */
|
||||
switch (conn->setenv_state)
|
||||
{
|
||||
@ -69,7 +77,10 @@ pqSetenvPoll(PGconn *conn)
|
||||
if (n < 0)
|
||||
goto error_return;
|
||||
if (n == 0)
|
||||
return PGRES_POLLING_READING;
|
||||
{
|
||||
result = PGRES_POLLING_READING;
|
||||
goto normal_return;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
@ -83,7 +94,8 @@ pqSetenvPoll(PGconn *conn)
|
||||
|
||||
/* Should we raise an error if called when not active? */
|
||||
case SETENV_STATE_IDLE:
|
||||
return PGRES_POLLING_OK;
|
||||
result = PGRES_POLLING_OK;
|
||||
goto normal_return;
|
||||
|
||||
default:
|
||||
printfPQExpBuffer(&conn->errorMessage,
|
||||
@ -180,7 +192,10 @@ pqSetenvPoll(PGconn *conn)
|
||||
case SETENV_STATE_CLIENT_ENCODING_WAIT:
|
||||
{
|
||||
if (PQisBusy(conn))
|
||||
return PGRES_POLLING_READING;
|
||||
{
|
||||
result = PGRES_POLLING_READING;
|
||||
goto normal_return;
|
||||
}
|
||||
|
||||
res = PQgetResult(conn);
|
||||
|
||||
@ -205,7 +220,10 @@ pqSetenvPoll(PGconn *conn)
|
||||
case SETENV_STATE_OPTION_WAIT:
|
||||
{
|
||||
if (PQisBusy(conn))
|
||||
return PGRES_POLLING_READING;
|
||||
{
|
||||
result = PGRES_POLLING_READING;
|
||||
goto normal_return;
|
||||
}
|
||||
|
||||
res = PQgetResult(conn);
|
||||
|
||||
@ -244,13 +262,17 @@ pqSetenvPoll(PGconn *conn)
|
||||
goto error_return;
|
||||
|
||||
conn->setenv_state = SETENV_STATE_QUERY1_WAIT;
|
||||
return PGRES_POLLING_READING;
|
||||
result = PGRES_POLLING_READING;
|
||||
goto normal_return;
|
||||
}
|
||||
|
||||
case SETENV_STATE_QUERY1_WAIT:
|
||||
{
|
||||
if (PQisBusy(conn))
|
||||
return PGRES_POLLING_READING;
|
||||
{
|
||||
result = PGRES_POLLING_READING;
|
||||
goto normal_return;
|
||||
}
|
||||
|
||||
res = PQgetResult(conn);
|
||||
|
||||
@ -327,13 +349,17 @@ pqSetenvPoll(PGconn *conn)
|
||||
goto error_return;
|
||||
|
||||
conn->setenv_state = SETENV_STATE_QUERY2_WAIT;
|
||||
return PGRES_POLLING_READING;
|
||||
result = PGRES_POLLING_READING;
|
||||
goto normal_return;
|
||||
}
|
||||
|
||||
case SETENV_STATE_QUERY2_WAIT:
|
||||
{
|
||||
if (PQisBusy(conn))
|
||||
return PGRES_POLLING_READING;
|
||||
{
|
||||
result = PGRES_POLLING_READING;
|
||||
goto normal_return;
|
||||
}
|
||||
|
||||
res = PQgetResult(conn);
|
||||
|
||||
@ -380,7 +406,8 @@ pqSetenvPoll(PGconn *conn)
|
||||
{
|
||||
/* Query finished, so we're done */
|
||||
conn->setenv_state = SETENV_STATE_IDLE;
|
||||
return PGRES_POLLING_OK;
|
||||
result = PGRES_POLLING_OK;
|
||||
goto normal_return;
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -398,7 +425,12 @@ pqSetenvPoll(PGconn *conn)
|
||||
|
||||
error_return:
|
||||
conn->setenv_state = SETENV_STATE_IDLE;
|
||||
return PGRES_POLLING_FAILED;
|
||||
result = PGRES_POLLING_FAILED;
|
||||
|
||||
normal_return:
|
||||
conn->rowProcessor = savedRowProcessor;
|
||||
conn->rowProcessorParam = savedRowProcessorParam;
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@ -406,6 +438,9 @@ error_return:
|
||||
* parseInput: if appropriate, parse input data from backend
|
||||
* until input is exhausted or a stopping state is reached.
|
||||
* Note that this function will NOT attempt to read more data from the backend.
|
||||
*
|
||||
* Note: callers of parseInput must be prepared for a longjmp exit when we are
|
||||
* in PGASYNC_BUSY state, since an external row processor might do that.
|
||||
*/
|
||||
void
|
||||
pqParseInput2(PGconn *conn)
|
||||
@ -549,6 +584,8 @@ pqParseInput2(PGconn *conn)
|
||||
/* First 'T' in a query sequence */
|
||||
if (getRowDescriptions(conn))
|
||||
return;
|
||||
/* getRowDescriptions() moves inStart itself */
|
||||
continue;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -569,6 +606,8 @@ pqParseInput2(PGconn *conn)
|
||||
/* Read another tuple of a normal query response */
|
||||
if (getAnotherTuple(conn, FALSE))
|
||||
return;
|
||||
/* getAnotherTuple() moves inStart itself */
|
||||
continue;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -585,6 +624,8 @@ pqParseInput2(PGconn *conn)
|
||||
/* Read another tuple of a normal query response */
|
||||
if (getAnotherTuple(conn, TRUE))
|
||||
return;
|
||||
/* getAnotherTuple() moves inStart itself */
|
||||
continue;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -627,27 +668,32 @@ pqParseInput2(PGconn *conn)
|
||||
/*
|
||||
* parseInput subroutine to read a 'T' (row descriptions) message.
|
||||
* We build a PGresult structure containing the attribute data.
|
||||
* Returns: 0 if completed message, EOF if not enough data yet.
|
||||
* Returns: 0 if completed message, EOF if error or not enough data
|
||||
* received yet.
|
||||
*
|
||||
* Note that if we run out of data, we have to release the partially
|
||||
* constructed PGresult, and rebuild it again next time. Fortunately,
|
||||
* that shouldn't happen often, since 'T' messages usually fit in a packet.
|
||||
* Note that if we run out of data, we have to suspend and reprocess
|
||||
* the message after more data is received. Otherwise, conn->inStart
|
||||
* must get advanced past the processed data.
|
||||
*/
|
||||
static int
|
||||
getRowDescriptions(PGconn *conn)
|
||||
{
|
||||
PGresult *result = NULL;
|
||||
PGresult *result;
|
||||
int nfields;
|
||||
const char *errmsg;
|
||||
int i;
|
||||
|
||||
result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK);
|
||||
if (!result)
|
||||
goto failure;
|
||||
{
|
||||
errmsg = NULL; /* means "out of memory", see below */
|
||||
goto advance_and_error;
|
||||
}
|
||||
|
||||
/* parseInput already read the 'T' label. */
|
||||
/* the next two bytes are the number of fields */
|
||||
if (pqGetInt(&(result->numAttributes), 2, conn))
|
||||
goto failure;
|
||||
goto EOFexit;
|
||||
nfields = result->numAttributes;
|
||||
|
||||
/* allocate space for the attribute descriptors */
|
||||
@ -656,7 +702,10 @@ getRowDescriptions(PGconn *conn)
|
||||
result->attDescs = (PGresAttDesc *)
|
||||
pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
|
||||
if (!result->attDescs)
|
||||
goto failure;
|
||||
{
|
||||
errmsg = NULL; /* means "out of memory", see below */
|
||||
goto advance_and_error;
|
||||
}
|
||||
MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
|
||||
}
|
||||
|
||||
@ -671,7 +720,7 @@ getRowDescriptions(PGconn *conn)
|
||||
pqGetInt(&typid, 4, conn) ||
|
||||
pqGetInt(&typlen, 2, conn) ||
|
||||
pqGetInt(&atttypmod, 4, conn))
|
||||
goto failure;
|
||||
goto EOFexit;
|
||||
|
||||
/*
|
||||
* Since pqGetInt treats 2-byte integers as unsigned, we need to
|
||||
@ -682,7 +731,10 @@ getRowDescriptions(PGconn *conn)
|
||||
result->attDescs[i].name = pqResultStrdup(result,
|
||||
conn->workBuffer.data);
|
||||
if (!result->attDescs[i].name)
|
||||
goto failure;
|
||||
{
|
||||
errmsg = NULL; /* means "out of memory", see below */
|
||||
goto advance_and_error;
|
||||
}
|
||||
result->attDescs[i].tableid = 0;
|
||||
result->attDescs[i].columnid = 0;
|
||||
result->attDescs[i].format = 0;
|
||||
@ -693,30 +745,90 @@ getRowDescriptions(PGconn *conn)
|
||||
|
||||
/* Success! */
|
||||
conn->result = result;
|
||||
return 0;
|
||||
|
||||
failure:
|
||||
if (result)
|
||||
/*
|
||||
* Advance inStart to show that the "T" message has been processed. We
|
||||
* must do this before calling the row processor, in case it longjmps.
|
||||
*/
|
||||
conn->inStart = conn->inCursor;
|
||||
|
||||
/* Give the row processor a chance to initialize for new result set */
|
||||
errmsg = NULL;
|
||||
switch ((*conn->rowProcessor) (result, NULL, &errmsg,
|
||||
conn->rowProcessorParam))
|
||||
{
|
||||
case 1:
|
||||
/* everything is good */
|
||||
return 0;
|
||||
|
||||
case -1:
|
||||
/* error, report the errmsg below */
|
||||
break;
|
||||
|
||||
default:
|
||||
/* unrecognized return code */
|
||||
errmsg = libpq_gettext("unrecognized return value from row processor");
|
||||
break;
|
||||
}
|
||||
goto set_error_result;
|
||||
|
||||
advance_and_error:
|
||||
/*
|
||||
* Discard the failed message. Unfortunately we don't know for sure
|
||||
* where the end is, so just throw away everything in the input buffer.
|
||||
* This is not very desirable but it's the best we can do in protocol v2.
|
||||
*/
|
||||
conn->inStart = conn->inEnd;
|
||||
|
||||
set_error_result:
|
||||
|
||||
/*
|
||||
* Replace partially constructed result with an error result. First
|
||||
* discard the old result to try to win back some memory.
|
||||
*/
|
||||
pqClearAsyncResult(conn);
|
||||
|
||||
/*
|
||||
* If row processor didn't provide an error message, assume "out of
|
||||
* memory" was meant. The advantage of having this special case is that
|
||||
* freeing the old result first greatly improves the odds that gettext()
|
||||
* will succeed in providing a translation.
|
||||
*/
|
||||
if (!errmsg)
|
||||
errmsg = libpq_gettext("out of memory for query result");
|
||||
|
||||
printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
|
||||
|
||||
/*
|
||||
* XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
|
||||
* do to recover...
|
||||
*/
|
||||
conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
|
||||
conn->asyncStatus = PGASYNC_READY;
|
||||
|
||||
EOFexit:
|
||||
if (result && result != conn->result)
|
||||
PQclear(result);
|
||||
return EOF;
|
||||
}
|
||||
|
||||
/*
|
||||
* parseInput subroutine to read a 'B' or 'D' (row data) message.
|
||||
* We add another tuple to the existing PGresult structure.
|
||||
* Returns: 0 if completed message, EOF if error or not enough data yet.
|
||||
* We fill rowbuf with column pointers and then call the row processor.
|
||||
* Returns: 0 if completed message, EOF if error or not enough data
|
||||
* received yet.
|
||||
*
|
||||
* Note that if we run out of data, we have to suspend and reprocess
|
||||
* the message after more data is received. We keep a partially constructed
|
||||
* tuple in conn->curTuple, and avoid reallocating already-allocated storage.
|
||||
* the message after more data is received. Otherwise, conn->inStart
|
||||
* must get advanced past the processed data.
|
||||
*/
|
||||
static int
|
||||
getAnotherTuple(PGconn *conn, bool binary)
|
||||
{
|
||||
PGresult *result = conn->result;
|
||||
int nfields = result->numAttributes;
|
||||
PGresAttValue *tup;
|
||||
|
||||
const char *errmsg;
|
||||
PGdataValue *rowbuf;
|
||||
/* the backend sends us a bitmap of which attributes are null */
|
||||
char std_bitmap[64]; /* used unless it doesn't fit */
|
||||
char *bitmap = std_bitmap;
|
||||
@ -727,28 +839,33 @@ getAnotherTuple(PGconn *conn, bool binary)
|
||||
int bitcnt; /* number of bits examined in current byte */
|
||||
int vlen; /* length of the current field value */
|
||||
|
||||
/* Resize row buffer if needed */
|
||||
rowbuf = conn->rowBuf;
|
||||
if (nfields > conn->rowBufLen)
|
||||
{
|
||||
rowbuf = (PGdataValue *) realloc(rowbuf,
|
||||
nfields * sizeof(PGdataValue));
|
||||
if (!rowbuf)
|
||||
{
|
||||
errmsg = NULL; /* means "out of memory", see below */
|
||||
goto advance_and_error;
|
||||
}
|
||||
conn->rowBuf = rowbuf;
|
||||
conn->rowBufLen = nfields;
|
||||
}
|
||||
|
||||
/* Save format specifier */
|
||||
result->binary = binary;
|
||||
|
||||
/* Allocate tuple space if first time for this data message */
|
||||
if (conn->curTuple == NULL)
|
||||
/*
|
||||
* If it's binary, fix the column format indicators. We assume the
|
||||
* backend will consistently send either B or D, not a mix.
|
||||
*/
|
||||
if (binary)
|
||||
{
|
||||
conn->curTuple = (PGresAttValue *)
|
||||
pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
|
||||
if (conn->curTuple == NULL)
|
||||
goto outOfMemory;
|
||||
MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
|
||||
|
||||
/*
|
||||
* If it's binary, fix the column format indicators. We assume the
|
||||
* backend will consistently send either B or D, not a mix.
|
||||
*/
|
||||
if (binary)
|
||||
{
|
||||
for (i = 0; i < nfields; i++)
|
||||
result->attDescs[i].format = 1;
|
||||
}
|
||||
for (i = 0; i < nfields; i++)
|
||||
result->attDescs[i].format = 1;
|
||||
}
|
||||
tup = conn->curTuple;
|
||||
|
||||
/* Get the null-value bitmap */
|
||||
nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
|
||||
@ -757,7 +874,10 @@ getAnotherTuple(PGconn *conn, bool binary)
|
||||
{
|
||||
bitmap = (char *) malloc(nbytes);
|
||||
if (!bitmap)
|
||||
goto outOfMemory;
|
||||
{
|
||||
errmsg = NULL; /* means "out of memory", see below */
|
||||
goto advance_and_error;
|
||||
}
|
||||
}
|
||||
|
||||
if (pqGetnchar(bitmap, nbytes, conn))
|
||||
@ -770,35 +890,34 @@ getAnotherTuple(PGconn *conn, bool binary)
|
||||
|
||||
for (i = 0; i < nfields; i++)
|
||||
{
|
||||
/* get the value length */
|
||||
if (!(bmap & 0200))
|
||||
{
|
||||
/* if the field value is absent, make it a null string */
|
||||
tup[i].value = result->null_field;
|
||||
tup[i].len = NULL_LEN;
|
||||
}
|
||||
vlen = NULL_LEN;
|
||||
else if (pqGetInt(&vlen, 4, conn))
|
||||
goto EOFexit;
|
||||
else
|
||||
{
|
||||
/* get the value length (the first four bytes are for length) */
|
||||
if (pqGetInt(&vlen, 4, conn))
|
||||
goto EOFexit;
|
||||
if (!binary)
|
||||
vlen = vlen - 4;
|
||||
if (vlen < 0)
|
||||
vlen = 0;
|
||||
if (tup[i].value == NULL)
|
||||
{
|
||||
tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
|
||||
if (tup[i].value == NULL)
|
||||
goto outOfMemory;
|
||||
}
|
||||
tup[i].len = vlen;
|
||||
/* read in the value */
|
||||
if (vlen > 0)
|
||||
if (pqGetnchar((char *) (tup[i].value), vlen, conn))
|
||||
goto EOFexit;
|
||||
/* we have to terminate this ourselves */
|
||||
tup[i].value[vlen] = '\0';
|
||||
}
|
||||
rowbuf[i].len = vlen;
|
||||
|
||||
/*
|
||||
* rowbuf[i].value always points to the next address in the data
|
||||
* buffer even if the value is NULL. This allows row processors to
|
||||
* estimate data sizes more easily.
|
||||
*/
|
||||
rowbuf[i].value = conn->inBuffer + conn->inCursor;
|
||||
|
||||
/* Skip over the data value */
|
||||
if (vlen > 0)
|
||||
{
|
||||
if (pqSkipnchar(vlen, conn))
|
||||
goto EOFexit;
|
||||
}
|
||||
|
||||
/* advance the bitmap stuff */
|
||||
bitcnt++;
|
||||
if (bitcnt == BITS_PER_BYTE)
|
||||
@ -811,26 +930,63 @@ getAnotherTuple(PGconn *conn, bool binary)
|
||||
bmap <<= 1;
|
||||
}
|
||||
|
||||
/* Success! Store the completed tuple in the result */
|
||||
if (!pqAddTuple(result, tup))
|
||||
goto outOfMemory;
|
||||
/* and reset for a new message */
|
||||
conn->curTuple = NULL;
|
||||
|
||||
/* Release bitmap now if we allocated it */
|
||||
if (bitmap != std_bitmap)
|
||||
free(bitmap);
|
||||
return 0;
|
||||
|
||||
outOfMemory:
|
||||
/* Replace partially constructed result with an error result */
|
||||
bitmap = NULL;
|
||||
|
||||
/*
|
||||
* we do NOT use pqSaveErrorResult() here, because of the likelihood that
|
||||
* there's not enough memory to concatenate messages...
|
||||
* Advance inStart to show that the "D" message has been processed. We
|
||||
* must do this before calling the row processor, in case it longjmps.
|
||||
*/
|
||||
conn->inStart = conn->inCursor;
|
||||
|
||||
/* Pass the completed row values to rowProcessor */
|
||||
errmsg = NULL;
|
||||
switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
|
||||
conn->rowProcessorParam))
|
||||
{
|
||||
case 1:
|
||||
/* everything is good */
|
||||
return 0;
|
||||
|
||||
case -1:
|
||||
/* error, report the errmsg below */
|
||||
break;
|
||||
|
||||
default:
|
||||
/* unrecognized return code */
|
||||
errmsg = libpq_gettext("unrecognized return value from row processor");
|
||||
break;
|
||||
}
|
||||
goto set_error_result;
|
||||
|
||||
advance_and_error:
|
||||
/*
|
||||
* Discard the failed message. Unfortunately we don't know for sure
|
||||
* where the end is, so just throw away everything in the input buffer.
|
||||
* This is not very desirable but it's the best we can do in protocol v2.
|
||||
*/
|
||||
conn->inStart = conn->inEnd;
|
||||
|
||||
set_error_result:
|
||||
|
||||
/*
|
||||
* Replace partially constructed result with an error result. First
|
||||
* discard the old result to try to win back some memory.
|
||||
*/
|
||||
pqClearAsyncResult(conn);
|
||||
printfPQExpBuffer(&conn->errorMessage,
|
||||
libpq_gettext("out of memory for query result\n"));
|
||||
|
||||
/*
|
||||
* If row processor didn't provide an error message, assume "out of
|
||||
* memory" was meant. The advantage of having this special case is that
|
||||
* freeing the old result first greatly improves the odds that gettext()
|
||||
* will succeed in providing a translation.
|
||||
*/
|
||||
if (!errmsg)
|
||||
errmsg = libpq_gettext("out of memory for query result");
|
||||
|
||||
printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
|
||||
|
||||
/*
|
||||
* XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
|
||||
@ -838,8 +994,6 @@ outOfMemory:
|
||||
*/
|
||||
conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
|
||||
conn->asyncStatus = PGASYNC_READY;
|
||||
/* Discard the failed message --- good idea? */
|
||||
conn->inStart = conn->inEnd;
|
||||
|
||||
EOFexit:
|
||||
if (bitmap != NULL && bitmap != std_bitmap)
|
||||
@ -1122,7 +1276,8 @@ pqGetline2(PGconn *conn, char *s, int maxlen)
|
||||
{
|
||||
int result = 1; /* return value if buffer overflows */
|
||||
|
||||
if (conn->sock < 0)
|
||||
if (conn->sock < 0 ||
|
||||
conn->asyncStatus != PGASYNC_COPY_OUT)
|
||||
{
|
||||
*s = '\0';
|
||||
return EOF;
|
||||
|
Reference in New Issue
Block a user