1
0
mirror of https://github.com/postgres/postgres.git synced 2025-12-12 02:37:31 +03:00

Create a FETCH_COUNT parameter that causes psql to execute SELECT-like

queries via a cursor, fetching a limited number of rows at a time and
therefore not risking exhausting memory.  A disadvantage of the scheme
is that 'aligned' output mode will align each group of rows independently
leading to odd-looking output, but all the other output formats work
reasonably well.  Chris Mair, with some additional hacking by moi.
This commit is contained in:
Tom Lane
2006-08-29 22:25:08 +00:00
parent 7c5ac5ce22
commit c2f60711d2
6 changed files with 878 additions and 503 deletions

View File

@@ -3,7 +3,7 @@
*
* Copyright (c) 2000-2006, PostgreSQL Global Development Group
*
* $PostgreSQL: pgsql/src/bin/psql/common.c,v 1.126 2006/08/29 15:19:50 tgl Exp $
* $PostgreSQL: pgsql/src/bin/psql/common.c,v 1.127 2006/08/29 22:25:07 tgl Exp $
*/
#include "postgres_fe.h"
#include "common.h"
@@ -28,6 +28,7 @@
#include "command.h"
#include "copy.h"
#include "mb/pg_wchar.h"
#include "mbprint.h"
/* Workarounds for Windows */
@@ -53,7 +54,9 @@ typedef struct _timeb TimevalStruct;
#endif
static bool ExecQueryUsingCursor(const char *query, double *elapsed_msec);
static bool command_no_begin(const char *query);
static bool is_select_command(const char *query);
/*
* "Safe" wrapper around strdup()
@@ -450,18 +453,15 @@ ResetCancelConn(void)
* AcceptResult
*
* Checks whether a result is valid, giving an error message if necessary;
* resets cancelConn as needed, and ensures that the connection to the backend
* is still up.
* and ensures that the connection to the backend is still up.
*
* Returns true for valid result, false for error state.
*/
static bool
AcceptResult(const PGresult *result, const char *query)
AcceptResult(const PGresult *result)
{
bool OK = true;
ResetCancelConn();
if (!result)
OK = false;
else
@@ -560,7 +560,9 @@ PSQLexec(const char *query, bool start_xact)
res = PQexec(pset.db, query);
if (!AcceptResult(res, query) && res)
ResetCancelConn();
if (!AcceptResult(res))
{
PQclear(res);
res = NULL;
@@ -602,6 +604,7 @@ PrintQueryTuples(const PGresult *results)
/* write output to \g argument, if any */
if (pset.gfname)
{
/* keep this code in sync with ExecQueryUsingCursor */
FILE *queryFout_copy = pset.queryFout;
bool queryFoutPipe_copy = pset.queryFoutPipe;
@@ -782,11 +785,10 @@ bool
SendQuery(const char *query)
{
PGresult *results;
TimevalStruct before,
after;
PGTransactionStatusType transaction_status;
double elapsed_msec = 0;
bool OK,
on_error_rollback_savepoint = false;
PGTransactionStatusType transaction_status;
static bool on_error_rollback_warning = false;
if (!pset.db)
@@ -869,20 +871,38 @@ SendQuery(const char *query)
}
}
if (pset.timing)
GETTIMEOFDAY(&before);
if (pset.fetch_count <= 0 || !is_select_command(query))
{
/* Default fetch-it-all-and-print mode */
TimevalStruct before,
after;
results = PQexec(pset.db, query);
if (pset.timing)
GETTIMEOFDAY(&before);
/* these operations are included in the timing result: */
OK = (AcceptResult(results, query) && ProcessCopyResult(results));
results = PQexec(pset.db, query);
if (pset.timing)
GETTIMEOFDAY(&after);
/* these operations are included in the timing result: */
ResetCancelConn();
OK = (AcceptResult(results) && ProcessCopyResult(results));
/* but printing results isn't: */
if (OK)
OK = PrintQueryResults(results);
if (pset.timing)
{
GETTIMEOFDAY(&after);
elapsed_msec = DIFF_MSEC(&after, &before);
}
/* but printing results isn't: */
if (OK)
OK = PrintQueryResults(results);
}
else
{
/* Fetch-in-segments mode */
OK = ExecQueryUsingCursor(query, &elapsed_msec);
ResetCancelConn();
results = NULL; /* PQclear(NULL) does nothing */
}
/* If we made a temporary savepoint, possibly release/rollback */
if (on_error_rollback_savepoint)
@@ -904,9 +924,10 @@ SendQuery(const char *query)
* the user did RELEASE or ROLLBACK, our savepoint is gone. If
* they issued a SAVEPOINT, releasing ours would remove theirs.
*/
if (strcmp(PQcmdStatus(results), "SAVEPOINT") == 0 ||
strcmp(PQcmdStatus(results), "RELEASE") == 0 ||
strcmp(PQcmdStatus(results), "ROLLBACK") == 0)
if (results &&
(strcmp(PQcmdStatus(results), "SAVEPOINT") == 0 ||
strcmp(PQcmdStatus(results), "RELEASE") == 0 ||
strcmp(PQcmdStatus(results), "ROLLBACK") == 0))
svptres = NULL;
else
svptres = PQexec(pset.db, "RELEASE pg_psql_temporary_savepoint");
@@ -927,7 +948,7 @@ SendQuery(const char *query)
/* Possible microtiming output */
if (OK && pset.timing)
printf(_("Time: %.3f ms\n"), DIFF_MSEC(&after, &before));
printf(_("Time: %.3f ms\n"), elapsed_msec);
/* check for events that may occur during query execution */
@@ -947,6 +968,198 @@ SendQuery(const char *query)
}
/*
* ExecQueryUsingCursor: run a SELECT-like query using a cursor
*
* This feature allows result sets larger than RAM to be dealt with.
*
* Returns true if the query executed successfully, false otherwise.
*
* If pset.timing is on, total query time (exclusive of result-printing) is
* stored into *elapsed_msec.
*/
static bool
ExecQueryUsingCursor(const char *query, double *elapsed_msec)
{
bool OK = true;
PGresult *results;
PQExpBufferData buf;
printQueryOpt my_popt = pset.popt;
FILE *queryFout_copy = pset.queryFout;
bool queryFoutPipe_copy = pset.queryFoutPipe;
bool started_txn = false;
bool did_pager = false;
int ntuples;
char fetch_cmd[64];
TimevalStruct before,
after;
*elapsed_msec = 0;
/* initialize print options for partial table output */
my_popt.topt.start_table = true;
my_popt.topt.stop_table = false;
my_popt.topt.prior_records = 0;
if (pset.timing)
GETTIMEOFDAY(&before);
/* if we're not in a transaction, start one */
if (PQtransactionStatus(pset.db) == PQTRANS_IDLE)
{
results = PQexec(pset.db, "BEGIN");
OK = AcceptResult(results) &&
(PQresultStatus(results) == PGRES_COMMAND_OK);
PQclear(results);
if (!OK)
return false;
started_txn = true;
}
/* Send DECLARE CURSOR */
initPQExpBuffer(&buf);
appendPQExpBuffer(&buf, "DECLARE _psql_cursor NO SCROLL CURSOR FOR\n%s",
query);
results = PQexec(pset.db, buf.data);
OK = AcceptResult(results) &&
(PQresultStatus(results) == PGRES_COMMAND_OK);
PQclear(results);
termPQExpBuffer(&buf);
if (!OK)
goto cleanup;
if (pset.timing)
{
GETTIMEOFDAY(&after);
*elapsed_msec += DIFF_MSEC(&after, &before);
}
snprintf(fetch_cmd, sizeof(fetch_cmd),
"FETCH FORWARD %d FROM _psql_cursor",
pset.fetch_count);
/* prepare to write output to \g argument, if any */
if (pset.gfname)
{
/* keep this code in sync with PrintQueryTuples */
pset.queryFout = stdout; /* so it doesn't get closed */
/* open file/pipe */
if (!setQFout(pset.gfname))
{
pset.queryFout = queryFout_copy;
pset.queryFoutPipe = queryFoutPipe_copy;
OK = false;
goto cleanup;
}
}
for (;;)
{
if (pset.timing)
GETTIMEOFDAY(&before);
/* get FETCH_COUNT tuples at a time */
results = PQexec(pset.db, fetch_cmd);
OK = AcceptResult(results) &&
(PQresultStatus(results) == PGRES_TUPLES_OK);
if (pset.timing)
{
GETTIMEOFDAY(&after);
*elapsed_msec += DIFF_MSEC(&after, &before);
}
if (!OK)
{
PQclear(results);
break;
}
ntuples = PQntuples(results);
if (ntuples < pset.fetch_count)
{
/* this is the last result set, so allow footer decoration */
my_popt.topt.stop_table = true;
}
else if (pset.queryFout == stdout && !did_pager)
{
/*
* If query requires multiple result sets, hack to ensure that
* only one pager instance is used for the whole mess
*/
pset.queryFout = PageOutput(100000, my_popt.topt.pager);
did_pager = true;
}
printQuery(results, &my_popt, pset.queryFout, pset.logfile);
/* after the first result set, disallow header decoration */
my_popt.topt.start_table = false;
my_popt.topt.prior_records += ntuples;
PQclear(results);
if (ntuples < pset.fetch_count || cancel_pressed)
break;
}
/* close \g argument file/pipe, restore old setting */
if (pset.gfname)
{
/* keep this code in sync with PrintQueryTuples */
setQFout(NULL);
pset.queryFout = queryFout_copy;
pset.queryFoutPipe = queryFoutPipe_copy;
free(pset.gfname);
pset.gfname = NULL;
}
else if (did_pager)
{
ClosePager(pset.queryFout);
pset.queryFout = queryFout_copy;
pset.queryFoutPipe = queryFoutPipe_copy;
}
cleanup:
if (pset.timing)
GETTIMEOFDAY(&before);
/*
* We try to close the cursor on either success or failure, but on
* failure ignore the result (it's probably just a bleat about
* being in an aborted transaction)
*/
results = PQexec(pset.db, "CLOSE _psql_cursor");
if (OK)
{
OK = AcceptResult(results) &&
(PQresultStatus(results) == PGRES_COMMAND_OK);
}
PQclear(results);
if (started_txn)
{
results = PQexec(pset.db, OK ? "COMMIT" : "ROLLBACK");
OK &= AcceptResult(results) &&
(PQresultStatus(results) == PGRES_COMMAND_OK);
PQclear(results);
}
if (pset.timing)
{
GETTIMEOFDAY(&after);
*elapsed_msec += DIFF_MSEC(&after, &before);
}
return OK;
}
/*
* Advance the given char pointer over white space and SQL comments.
*/
@@ -1158,6 +1371,43 @@ command_no_begin(const char *query)
}
/*
* Check whether the specified command is a SELECT (or VALUES).
*/
static bool
is_select_command(const char *query)
{
int wordlen;
/*
* First advance over any whitespace, comments and left parentheses.
*/
for (;;)
{
query = skip_white_space(query);
if (query[0] == '(')
query++;
else
break;
}
/*
* Check word length (since "selectx" is not "select").
*/
wordlen = 0;
while (isalpha((unsigned char) query[wordlen]))
wordlen += PQmblen(&query[wordlen], pset.encoding);
if (wordlen == 6 && pg_strncasecmp(query, "select", 6) == 0)
return true;
if (wordlen == 6 && pg_strncasecmp(query, "values", 6) == 0)
return true;
return false;
}
/*
* Test if the current user is a database superuser.
*