1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-28 23:42:10 +03:00

Replace libpq's "row processor" API with a "single row" mode.

After taking awhile to digest the row-processor feature that was added to
libpq in commit 92785dac2e, we've concluded
it is over-complicated and too hard to use.  Leave the core infrastructure
changes in place (that is, there's still a row processor function inside
libpq), but remove the exposed API pieces, and instead provide a "single
row" mode switch that causes PQgetResult to return one row at a time in
separate PGresult objects.

This approach incurs more overhead than proper use of a row processor
callback would, since construction of a PGresult per row adds extra cycles.
However, it is far easier to use and harder to break.  The single-row mode
still affords applications the primary benefit that the row processor API
was meant to provide, namely not having to accumulate large result sets in
memory before processing them.  Preliminary testing suggests that we can
probably buy back most of the extra cycles by micro-optimizing construction
of the extra results, but that task will be left for another day.

Marko Kreen
This commit is contained in:
Tom Lane
2012-08-02 13:10:36 -04:00
parent f6fb9f103f
commit ea56ed9a1e
10 changed files with 404 additions and 655 deletions

View File

@ -70,6 +70,9 @@ typedef struct storeInfo
AttInMetadata *attinmeta;
MemoryContext tmpcontext;
char **cstrs;
/* temp storage for results to avoid leaks on exception */
PGresult *last_res;
PGresult *cur_res;
} storeInfo;
/*
@ -83,8 +86,8 @@ static void materializeQueryResult(FunctionCallInfo fcinfo,
const char *conname,
const char *sql,
bool fail);
static int storeHandler(PGresult *res, const PGdataValue *columns,
const char **errmsgp, void *param);
static PGresult *storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql);
static void storeRow(storeInfo *sinfo, PGresult *res, bool first);
static remoteConn *getConnectionByName(const char *name);
static HTAB *createConnHash(void);
static void createNewConnection(const char *name, remoteConn *rconn);
@ -630,7 +633,7 @@ dblink_send_query(PG_FUNCTION_ARGS)
/* async query send */
retval = PQsendQuery(conn, sql);
if (retval != 1)
elog(NOTICE, "%s", PQerrorMessage(conn));
elog(NOTICE, "could not send query: %s", PQerrorMessage(conn));
PG_RETURN_INT32(retval);
}
@ -927,8 +930,10 @@ materializeResult(FunctionCallInfo fcinfo, PGresult *res)
/*
* Execute the given SQL command and store its results into a tuplestore
* to be returned as the result of the current function.
*
* This is equivalent to PQexec followed by materializeResult, but we make
* use of libpq's "row processor" API to reduce per-row overhead.
* use of libpq's single-row mode to avoid accumulating the whole result
* inside libpq before it gets transferred to the tuplestore.
*/
static void
materializeQueryResult(FunctionCallInfo fcinfo,
@ -944,19 +949,14 @@ materializeQueryResult(FunctionCallInfo fcinfo,
/* prepTuplestoreResult must have been called previously */
Assert(rsinfo->returnMode == SFRM_Materialize);
/* initialize storeInfo to empty */
memset(&sinfo, 0, sizeof(sinfo));
sinfo.fcinfo = fcinfo;
PG_TRY();
{
/* initialize storeInfo to empty */
memset(&sinfo, 0, sizeof(sinfo));
sinfo.fcinfo = fcinfo;
/* We'll collect tuples using storeHandler */
PQsetRowProcessor(conn, storeHandler, &sinfo);
res = PQexec(conn, sql);
/* We don't keep the custom row processor installed permanently */
PQsetRowProcessor(conn, NULL, NULL);
/* execute query, collecting any tuples into the tuplestore */
res = storeQueryResult(&sinfo, conn, sql);
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
@ -975,8 +975,8 @@ materializeQueryResult(FunctionCallInfo fcinfo,
else if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
/*
* storeHandler didn't get called, so we need to convert the
* command status string to a tuple manually
* storeRow didn't get called, so we need to convert the command
* status string to a tuple manually
*/
TupleDesc tupdesc;
AttInMetadata *attinmeta;
@ -1008,25 +1008,30 @@ materializeQueryResult(FunctionCallInfo fcinfo,
tuplestore_puttuple(tupstore, tuple);
PQclear(res);
res = NULL;
}
else
{
Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
/* storeHandler should have created a tuplestore */
/* storeRow should have created a tuplestore */
Assert(rsinfo->setResult != NULL);
PQclear(res);
res = NULL;
}
PQclear(sinfo.last_res);
sinfo.last_res = NULL;
PQclear(sinfo.cur_res);
sinfo.cur_res = NULL;
}
PG_CATCH();
{
/* be sure to unset the custom row processor */
PQsetRowProcessor(conn, NULL, NULL);
/* be sure to release any libpq result we collected */
if (res)
PQclear(res);
PQclear(res);
PQclear(sinfo.last_res);
PQclear(sinfo.cur_res);
/* and clear out any pending data in libpq */
while ((res = PQskipResult(conn)) != NULL)
while ((res = PQgetResult(conn)) != NULL)
PQclear(res);
PG_RE_THROW();
}
@ -1034,23 +1039,72 @@ materializeQueryResult(FunctionCallInfo fcinfo,
}
/*
* Custom row processor for materializeQueryResult.
* Prototype of this function must match PQrowProcessor.
* Execute query, and send any result rows to sinfo->tuplestore.
*/
static int
storeHandler(PGresult *res, const PGdataValue *columns,
const char **errmsgp, void *param)
static PGresult *
storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql)
{
bool first = true;
PGresult *res;
if (!PQsendQuery(conn, sql))
elog(ERROR, "could not send query: %s", PQerrorMessage(conn));
if (!PQsetSingleRowMode(conn)) /* shouldn't fail */
elog(ERROR, "failed to set single-row mode for dblink query");
for (;;)
{
CHECK_FOR_INTERRUPTS();
sinfo->cur_res = PQgetResult(conn);
if (!sinfo->cur_res)
break;
if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
{
/* got one row from possibly-bigger resultset */
storeRow(sinfo, sinfo->cur_res, first);
PQclear(sinfo->cur_res);
sinfo->cur_res = NULL;
first = false;
}
else
{
/* if empty resultset, fill tuplestore header */
if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
storeRow(sinfo, sinfo->cur_res, first);
/* store completed result at last_res */
PQclear(sinfo->last_res);
sinfo->last_res = sinfo->cur_res;
sinfo->cur_res = NULL;
first = true;
}
}
/* return last_res */
res = sinfo->last_res;
sinfo->last_res = NULL;
return res;
}
/*
* Send single row to sinfo->tuplestore.
*
* If "first" is true, create the tuplestore using PGresult's metadata
* (in this case the PGresult might contain either zero or one row).
*/
static void
storeRow(storeInfo *sinfo, PGresult *res, bool first)
{
storeInfo *sinfo = (storeInfo *) param;
int nfields = PQnfields(res);
char **cstrs = sinfo->cstrs;
HeapTuple tuple;
char *pbuf;
int pbuflen;
int i;
MemoryContext oldcontext;
if (columns == NULL)
if (first)
{
/* Prepare for new result set */
ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
@ -1098,13 +1152,16 @@ storeHandler(PGresult *res, const PGdataValue *columns,
sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
/* Create a new, empty tuplestore */
oldcontext = MemoryContextSwitchTo(
rsinfo->econtext->ecxt_per_query_memory);
oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
rsinfo->setResult = sinfo->tuplestore;
rsinfo->setDesc = tupdesc;
MemoryContextSwitchTo(oldcontext);
/* Done if empty resultset */
if (PQntuples(res) == 0)
return;
/*
* Set up sufficiently-wide string pointers array; this won't change
* in size so it's easy to preallocate.
@ -1121,11 +1178,10 @@ storeHandler(PGresult *res, const PGdataValue *columns,
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
return 1;
}
CHECK_FOR_INTERRUPTS();
/* Should have a single-row result if we get here */
Assert(PQntuples(res) == 1);
/*
* Do the following work in a temp context that we reset after each tuple.
@ -1135,46 +1191,24 @@ storeHandler(PGresult *res, const PGdataValue *columns,
oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
/*
* The strings passed to us are not null-terminated, but the datatype
* input functions we're about to call require null termination. Copy the
* strings and add null termination. As a micro-optimization, allocate
* all the strings with one palloc.
* Fill cstrs with null-terminated strings of column values.
*/
pbuflen = nfields; /* count the null terminators themselves */
for (i = 0; i < nfields; i++)
{
int len = columns[i].len;
if (len > 0)
pbuflen += len;
}
pbuf = (char *) palloc(pbuflen);
for (i = 0; i < nfields; i++)
{
int len = columns[i].len;
if (len < 0)
cstrs[i] = NULL;
if (PQgetisnull(res, 0, i))
sinfo->cstrs[i] = NULL;
else
{
cstrs[i] = pbuf;
memcpy(pbuf, columns[i].value, len);
pbuf += len;
*pbuf++ = '\0';
}
sinfo->cstrs[i] = PQgetvalue(res, 0, i);
}
/* Convert row to a tuple, and add it to the tuplestore */
tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
tuplestore_puttuple(sinfo->tuplestore, tuple);
/* Clean up */
MemoryContextSwitchTo(oldcontext);
MemoryContextReset(sinfo->tmpcontext);
return 1;
}
/*