1
0
mirror of https://github.com/postgres/postgres.git synced 2025-08-30 06:01:21 +03:00

Support retrieval of results in chunks with libpq.

This patch generalizes libpq's existing single-row mode to allow
individual partial-result PGresults to contain up to N rows, rather
than always one row.  This reduces malloc overhead compared to plain
single-row mode, and it is very useful for psql's FETCH_COUNT feature,
since otherwise we'd have to add code (and cycles) to either merge
single-row PGresults into a bigger one or teach psql's
results-printing logic to accept arrays of PGresults.

To avoid API breakage, PQsetSingleRowMode() remains the same, and we
add a new function PQsetChunkedRowsMode() to invoke the more general
case.  Also, PGresults obtained the old way continue to carry the
PGRES_SINGLE_TUPLE status code, while if PQsetChunkedRowsMode() is
used then their status code is PGRES_TUPLES_CHUNK.  The underlying
logic is the same either way, though.

Daniel Vérité, reviewed by Laurenz Albe and myself (and whacked
around a bit by me, so any remaining bugs are my fault)

Discussion: https://postgr.es/m/CAKZiRmxsVTkO928CM+-ADvsMyePmU3L9DQCa9NwqjvLPcEe5QA@mail.gmail.com
This commit is contained in:
Tom Lane
2024-04-06 20:41:32 -04:00
parent 92641d8d65
commit 4643a2b265
10 changed files with 249 additions and 92 deletions

View File

@@ -203,3 +203,4 @@ PQcancelErrorMessage 200
PQcancelReset 201
PQcancelFinish 202
PQsocketPoll 203
PQsetChunkedRowsMode 204

View File

@@ -41,7 +41,8 @@ char *const pgresStatus[] = {
"PGRES_COPY_BOTH",
"PGRES_SINGLE_TUPLE",
"PGRES_PIPELINE_SYNC",
"PGRES_PIPELINE_ABORTED"
"PGRES_PIPELINE_ABORTED",
"PGRES_TUPLES_CHUNK"
};
/* We return this if we're unable to make a PGresult at all */
@@ -200,6 +201,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
case PGRES_COPY_IN:
case PGRES_COPY_BOTH:
case PGRES_SINGLE_TUPLE:
case PGRES_TUPLES_CHUNK:
/* non-error cases */
break;
default:
@@ -771,7 +773,7 @@ PQclear(PGresult *res)
/*
* Handy subroutine to deallocate any partially constructed async result.
*
* Any "next" result gets cleared too.
* Any "saved" result gets cleared too.
*/
void
pqClearAsyncResult(PGconn *conn)
@@ -779,8 +781,8 @@ pqClearAsyncResult(PGconn *conn)
PQclear(conn->result);
conn->result = NULL;
conn->error_result = false;
PQclear(conn->next_result);
conn->next_result = NULL;
PQclear(conn->saved_result);
conn->saved_result = NULL;
}
/*
@@ -911,14 +913,14 @@ pqPrepareAsyncResult(PGconn *conn)
}
/*
* Replace conn->result with next_result, if any. In the normal case
* there isn't a next result and we're just dropping ownership of the
* current result. In single-row mode this restores the situation to what
* it was before we created the current single-row result.
* Replace conn->result with saved_result, if any. In the normal case
* there isn't a saved result and we're just dropping ownership of the
* current result. In partial-result mode this restores the situation to
* what it was before we created the current partial result.
*/
conn->result = conn->next_result;
conn->error_result = false; /* next_result is never an error */
conn->next_result = NULL;
conn->result = conn->saved_result;
conn->error_result = false; /* saved_result is never an error */
conn->saved_result = NULL;
return res;
}
@@ -1199,11 +1201,6 @@ pqSaveParameterStatus(PGconn *conn, const char *name, const char *value)
* On error, *errmsgp can be set to an error string to be returned.
* (Such a string should already be translated via libpq_gettext().)
* If it is left NULL, the error is presumed to be "out of memory".
*
* In single-row mode, we create a new result holding just the current row,
* stashing the previous result in conn->next_result so that it becomes
* active again after pqPrepareAsyncResult(). This allows the result metadata
* (column descriptions) to be carried forward to each result row.
*/
int
pqRowProcessor(PGconn *conn, const char **errmsgp)
@@ -1215,11 +1212,14 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
int i;
/*
* In single-row mode, make a new PGresult that will hold just this one
* row; the original conn->result is left unchanged so that it can be used
* again as the template for future rows.
* In partial-result mode, if we don't already have a partial PGresult
* then make one by cloning conn->result (which should hold the correct
* result metadata by now). Then the original conn->result is moved over
* to saved_result so that we can re-use it as a reference for future
* partial results. The saved result will become active again after
* pqPrepareAsyncResult() returns the partial result to the application.
*/
if (conn->singleRowMode)
if (conn->partialResMode && conn->saved_result == NULL)
{
/* Copy everything that should be in the result at this point */
res = PQcopyResult(res,
@@ -1227,6 +1227,11 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
PG_COPYRES_NOTICEHOOKS);
if (!res)
return 0;
/* Change result status to appropriate special value */
res->resultStatus = (conn->singleRowMode ? PGRES_SINGLE_TUPLE : PGRES_TUPLES_CHUNK);
/* And stash it as the active result */
conn->saved_result = conn->result;
conn->result = res;
}
/*
@@ -1241,7 +1246,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
tup = (PGresAttValue *)
pqResultAlloc(res, nfields * sizeof(PGresAttValue), true);
if (tup == NULL)
goto fail;
return 0;
for (i = 0; i < nfields; i++)
{
@@ -1260,7 +1265,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
val = (char *) pqResultAlloc(res, clen + 1, isbinary);
if (val == NULL)
goto fail;
return 0;
/* copy and zero-terminate the data (even if it's binary) */
memcpy(val, columns[i].value, clen);
@@ -1273,30 +1278,16 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
/* And add the tuple to the PGresult's tuple array */
if (!pqAddTuple(res, tup, errmsgp))
goto fail;
return 0;
/*
* Success. In single-row mode, make the result available to the client
* immediately.
* Success. In partial-result mode, if we have enough rows then make the
* result available to the client immediately.
*/
if (conn->singleRowMode)
{
/* Change result status to special single-row value */
res->resultStatus = PGRES_SINGLE_TUPLE;
/* Stash old result for re-use later */
conn->next_result = conn->result;
conn->result = res;
/* And mark the result ready to return */
if (conn->partialResMode && res->ntups >= conn->maxChunkSize)
conn->asyncStatus = PGASYNC_READY_MORE;
}
return 1;
fail:
/* release locally allocated PGresult, if we made one */
if (res != conn->result)
PQclear(res);
return 0;
}
@@ -1745,8 +1736,10 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
*/
pqClearAsyncResult(conn);
/* reset single-row processing mode */
/* reset partial-result mode */
conn->partialResMode = false;
conn->singleRowMode = false;
conn->maxChunkSize = 0;
}
/* ready to send command message */
@@ -1925,30 +1918,61 @@ sendFailed:
return 0;
}
/*
 * canChangeResultMode
 *		Report whether the partial-result mode may be changed right now.
 *
 * A change is permitted only in the window after a query has been launched
 * and before any of its results have been received.  Concretely, all of the
 * following must hold:
 *	- conn is not NULL;
 *	- the connection's async status is PGASYNC_BUSY;
 *	- the command queue has a head entry whose query class is either
 *	  PGQUERY_SIMPLE or PGQUERY_EXTENDED;
 *	- no result is currently pending on the connection.
 *
 * Returns true if the mode may be changed, false otherwise.
 */
static bool
canChangeResultMode(PGconn *conn)
{
	/* No connection, or no query in flight: nothing to configure */
	if (conn == NULL || conn->asyncStatus != PGASYNC_BUSY)
		return false;

	/* There must be a queued command to apply the mode to */
	if (conn->cmd_queue_head == NULL)
		return false;

	/* Only simple and extended queries are eligible */
	switch (conn->cmd_queue_head->queryclass)
	{
		case PGQUERY_SIMPLE:
		case PGQUERY_EXTENDED:
			break;
		default:
			return false;
	}

	/* Too late once a result has started to arrive */
	return !pgHavePendingResult(conn);
}
/*
* Select row-by-row processing mode
*/
int
PQsetSingleRowMode(PGconn *conn)
{
/*
* Only allow setting the flag when we have launched a query and not yet
* received any results.
*/
if (!conn)
return 0;
if (conn->asyncStatus != PGASYNC_BUSY)
return 0;
if (!conn->cmd_queue_head ||
(conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE &&
conn->cmd_queue_head->queryclass != PGQUERY_EXTENDED))
return 0;
if (pgHavePendingResult(conn))
if (canChangeResultMode(conn))
{
conn->partialResMode = true;
conn->singleRowMode = true;
conn->maxChunkSize = 1;
return 1;
}
else
return 0;
}
/* OK, set flag */
conn->singleRowMode = true;
return 1;
/*
 * PQsetChunkedRowsMode
 *		Select chunked results processing mode.
 *
 * chunkSize is the upper limit on the number of rows carried by each partial
 * result; it must be greater than zero.
 *
 * Returns 1 if the mode was set.  Returns 0, leaving the connection's mode
 * settings untouched, if chunkSize is not positive or if
 * canChangeResultMode() reports that the mode cannot be changed at this
 * point.
 */
int
PQsetChunkedRowsMode(PGconn *conn, int chunkSize)
{
	/* Reject a bad chunk size first, then check the connection state */
	if (chunkSize <= 0 || !canChangeResultMode(conn))
		return 0;

	/* Partial-result mode, but not the single-row flavor of it */
	conn->partialResMode = true;
	conn->singleRowMode = false;
	conn->maxChunkSize = chunkSize;

	return 1;
}
/*
@@ -2117,6 +2141,20 @@ PQgetResult(PGconn *conn)
case PGASYNC_READY:
res = pqPrepareAsyncResult(conn);
/*
* Normally pqPrepareAsyncResult will have left conn->result
* empty. Otherwise, "res" must be a not-full PGRES_TUPLES_CHUNK
* result, which we want to return to the caller while staying in
* PGASYNC_READY state. Then the next call here will return the
* empty PGRES_TUPLES_OK result that was restored from
* saved_result, after which we can proceed.
*/
if (conn->result)
{
Assert(res->resultStatus == PGRES_TUPLES_CHUNK);
break;
}
/* Advance the queue as appropriate */
pqCommandQueueAdvance(conn, false,
res->resultStatus == PGRES_PIPELINE_SYNC);
@@ -3173,10 +3211,12 @@ pqPipelineProcessQueue(PGconn *conn)
}
/*
* Reset single-row processing mode. (Client has to set it up for each
* query, if desired.)
* Reset partial-result mode. (Client has to set it up for each query, if
* desired.)
*/
conn->partialResMode = false;
conn->singleRowMode = false;
conn->maxChunkSize = 0;
/*
* If there are no further commands to process in the queue, get us in

View File

@@ -379,7 +379,8 @@ pqParseInput3(PGconn *conn)
break;
case PqMsg_DataRow:
if (conn->result != NULL &&
conn->result->resultStatus == PGRES_TUPLES_OK)
(conn->result->resultStatus == PGRES_TUPLES_OK ||
conn->result->resultStatus == PGRES_TUPLES_CHUNK))
{
/* Read another tuple of a normal query response */
if (getAnotherTuple(conn, msgLength))

View File

@@ -112,8 +112,9 @@ typedef enum
PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */
PGRES_SINGLE_TUPLE, /* single tuple from larger resultset */
PGRES_PIPELINE_SYNC, /* pipeline synchronization point */
PGRES_PIPELINE_ABORTED /* Command didn't run because of an abort
PGRES_PIPELINE_ABORTED, /* Command didn't run because of an abort
* earlier in a pipeline */
PGRES_TUPLES_CHUNK /* chunk of tuples from larger resultset */
} ExecStatusType;
typedef enum
@@ -489,6 +490,7 @@ extern int PQsendQueryPrepared(PGconn *conn,
const int *paramFormats,
int resultFormat);
extern int PQsetSingleRowMode(PGconn *conn);
extern int PQsetChunkedRowsMode(PGconn *conn, int chunkSize);
extern PGresult *PQgetResult(PGconn *conn);
/* Routines for managing an asynchronous query */

View File

@@ -434,7 +434,10 @@ struct pg_conn
bool nonblocking; /* whether this connection is using nonblock
* sending semantics */
PGpipelineStatus pipelineStatus; /* status of pipeline mode */
bool partialResMode; /* true if single-row or chunked mode */
bool singleRowMode; /* return current query result row-by-row? */
int maxChunkSize; /* return query result in chunks not exceeding
* this number of rows */
char copy_is_binary; /* 1 = copy binary, 0 = copy text */
int copy_already_done; /* # bytes already returned in COPY OUT */
PGnotify *notifyHead; /* oldest unreported Notify msg */
@@ -535,12 +538,13 @@ struct pg_conn
* and error_result is true, then we need to return a PGRES_FATAL_ERROR
* result, but haven't yet constructed it; text for the error has been
* appended to conn->errorMessage. (Delaying construction simplifies
* dealing with out-of-memory cases.) If next_result isn't NULL, it is a
* PGresult that will replace "result" after we return that one.
* dealing with out-of-memory cases.) If saved_result isn't NULL, it is a
* PGresult that will replace "result" after we return that one; we use
* that in partial-result mode to remember the query's tuple metadata.
*/
PGresult *result; /* result being constructed */
bool error_result; /* do we need to make an ERROR result? */
PGresult *next_result; /* next result (used in single-row mode) */
PGresult *saved_result; /* original, empty result in partialResMode */
/* Assorted state for SASL, SSL, GSS, etc */
const pg_fe_sasl_mech *sasl;