diff --git a/src/bin/scripts/Makefile b/src/bin/scripts/Makefile index 9f352b5e2b2..3cd793b1340 100644 --- a/src/bin/scripts/Makefile +++ b/src/bin/scripts/Makefile @@ -28,7 +28,7 @@ createuser: createuser.o common.o $(WIN32RES) | submake-libpq submake-libpgport dropdb: dropdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils dropuser: dropuser.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils clusterdb: clusterdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils -vacuumdb: vacuumdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils +vacuumdb: vacuumdb.o common.o scripts_parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils reindexdb: reindexdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils pg_isready: pg_isready.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils @@ -50,7 +50,7 @@ uninstall: clean distclean maintainer-clean: rm -f $(addsuffix $(X), $(PROGRAMS)) $(addsuffix .o, $(PROGRAMS)) - rm -f common.o $(WIN32RES) + rm -f common.o scripts_parallel.o $(WIN32RES) rm -rf tmp_check check: diff --git a/src/bin/scripts/clusterdb.c b/src/bin/scripts/clusterdb.c index ae0facd5a7f..d3801273566 100644 --- a/src/bin/scripts/clusterdb.c +++ b/src/bin/scripts/clusterdb.c @@ -206,7 +206,7 @@ cluster_one_database(const char *dbname, bool verbose, const char *table, if (table) { appendPQExpBufferChar(&sql, ' '); - appendQualifiedRelation(&sql, table, conn, progname, echo); + appendQualifiedRelation(&sql, table, conn, echo); } appendPQExpBufferChar(&sql, ';'); @@ -239,7 +239,7 @@ cluster_all_databases(bool verbose, const char *maintenance_db, conn = connectMaintenanceDatabase(maintenance_db, host, port, username, prompt_password, progname, echo); - result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", progname, echo); + result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", echo); PQfinish(conn); initPQExpBuffer(&connstr); diff --git a/src/bin/scripts/common.c b/src/bin/scripts/common.c index 296029d8093..1b38a1da494 100644 --- a/src/bin/scripts/common.c +++ b/src/bin/scripts/common.c @@ -22,6 +22,8 @@ #include "fe_utils/connect.h" #include "fe_utils/string_utils.h" +#define ERRCODE_UNDEFINED_TABLE "42P01" + static PGcancel *volatile cancelConn = NULL; bool CancelRequested = false; @@ -146,8 +148,7 @@ connectDatabase(const char *dbname, const char *pghost, exit(1); } - PQclear(executeQuery(conn, ALWAYS_SECURE_SEARCH_PATH_SQL, - progname, echo)); + PQclear(executeQuery(conn, ALWAYS_SECURE_SEARCH_PATH_SQL, echo)); return conn; } @@ -178,11 +179,35 @@ connectMaintenanceDatabase(const char *maintenance_db, return conn; } +/* + * Disconnect the given connection, canceling any statement if one is active. + */ +void +disconnectDatabase(PGconn *conn) +{ + char errbuf[256]; + + Assert(conn != NULL); + + if (PQtransactionStatus(conn) == PQTRANS_ACTIVE) + { + PGcancel *cancel; + + if ((cancel = PQgetCancel(conn))) + { + (void) PQcancel(cancel, errbuf, sizeof(errbuf)); + PQfreeCancel(cancel); + } + } + + PQfinish(conn); +} + /* * Run a query, return the results, exit program on failure. */ PGresult * -executeQuery(PGconn *conn, const char *query, const char *progname, bool echo) +executeQuery(PGconn *conn, const char *query, bool echo) { PGresult *res; @@ -207,8 +232,7 @@ executeQuery(PGconn *conn, const char *query, const char *progname, bool echo) * As above for a SQL command (which returns nothing). */ void -executeCommand(PGconn *conn, const char *query, - const char *progname, bool echo) +executeCommand(PGconn *conn, const char *query, bool echo) { PGresult *res; @@ -255,6 +279,57 @@ executeMaintenanceCommand(PGconn *conn, const char *query, bool echo) return r; } +/* + * Consume all the results generated for the given connection until + * nothing remains. If at least one error is encountered, return false. + * Note that this will block if the connection is busy. + */ +bool +consumeQueryResult(PGconn *conn) +{ + bool ok = true; + PGresult *result; + + SetCancelConn(conn); + while ((result = PQgetResult(conn)) != NULL) + { + if (!processQueryResult(conn, result)) + ok = false; + } + ResetCancelConn(); + return ok; +} + +/* + * Process (and delete) a query result. Returns true if there's no error, + * false otherwise -- but errors about trying to work on a missing relation + * are reported and subsequently ignored. + */ +bool +processQueryResult(PGconn *conn, PGresult *result) +{ + /* + * If it's an error, report it. Errors about a missing table are harmless + * so we continue processing; but die for other errors. + */ + if (PQresultStatus(result) != PGRES_COMMAND_OK) + { + char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE); + + pg_log_error("processing of database \"%s\" failed: %s", + PQdb(conn), PQerrorMessage(conn)); + + if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0) + { + PQclear(result); + return false; + } + } + + PQclear(result); + return true; +} + /* * Split TABLE[(COLUMNS)] into TABLE and [(COLUMNS)] portions. When you @@ -299,7 +374,7 @@ splitTableColumnsSpec(const char *spec, int encoding, */ void appendQualifiedRelation(PQExpBuffer buf, const char *spec, - PGconn *conn, const char *progname, bool echo) + PGconn *conn, bool echo) { char *table; const char *columns; @@ -324,7 +399,7 @@ appendQualifiedRelation(PQExpBuffer buf, const char *spec, appendStringLiteralConn(&sql, table, conn); appendPQExpBufferStr(&sql, "::pg_catalog.regclass;"); - executeCommand(conn, "RESET search_path;", progname, echo); + executeCommand(conn, "RESET search_path;", echo); /* * One row is a typical result, as is a nonexistent relation ERROR. @@ -332,7 +407,7 @@ appendQualifiedRelation(PQExpBuffer buf, const char *spec, * relation has that OID; this query returns no rows. Catalog corruption * might elicit other row counts. */ - res = executeQuery(conn, sql.data, progname, echo); + res = executeQuery(conn, sql.data, echo); ntups = PQntuples(res); if (ntups != 1) { @@ -351,8 +426,7 @@ appendQualifiedRelation(PQExpBuffer buf, const char *spec, termPQExpBuffer(&sql); pg_free(table); - PQclear(executeQuery(conn, ALWAYS_SECURE_SEARCH_PATH_SQL, - progname, echo)); + PQclear(executeQuery(conn, ALWAYS_SECURE_SEARCH_PATH_SQL, echo)); } diff --git a/src/bin/scripts/common.h b/src/bin/scripts/common.h index 35d1a3e0d51..f36b26a5765 100644 --- a/src/bin/scripts/common.h +++ b/src/bin/scripts/common.h @@ -39,20 +39,24 @@ extern PGconn *connectMaintenanceDatabase(const char *maintenance_db, const char *pguser, enum trivalue prompt_password, const char *progname, bool echo); -extern PGresult *executeQuery(PGconn *conn, const char *query, - const char *progname, bool echo); +extern void disconnectDatabase(PGconn *conn); -extern void executeCommand(PGconn *conn, const char *query, - const char *progname, bool echo); +extern PGresult *executeQuery(PGconn *conn, const char *query, bool echo); + +extern void executeCommand(PGconn *conn, const char *query, bool echo); extern bool executeMaintenanceCommand(PGconn *conn, const char *query, bool echo); +extern bool consumeQueryResult(PGconn *conn); + +extern bool processQueryResult(PGconn *conn, PGresult *result); + extern void splitTableColumnsSpec(const char *spec, int encoding, char **table, const char **columns); extern void appendQualifiedRelation(PQExpBuffer buf, const char *name, - PGconn *conn, const char *progname, bool echo); + PGconn *conn, bool echo); extern bool yesno_prompt(const char *question); diff --git a/src/bin/scripts/reindexdb.c b/src/bin/scripts/reindexdb.c index ca61348a0e5..219a9a92118 100644 --- a/src/bin/scripts/reindexdb.c +++ b/src/bin/scripts/reindexdb.c @@ -348,7 +348,7 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type, break; case REINDEX_INDEX: case REINDEX_TABLE: - appendQualifiedRelation(&sql, name, conn, progname, echo); + appendQualifiedRelation(&sql, name, conn, echo); break; case REINDEX_SCHEMA: appendPQExpBufferStr(&sql, name); @@ -405,7 +405,7 @@ reindex_all_databases(const char *maintenance_db, conn = connectMaintenanceDatabase(maintenance_db, host, port, username, prompt_password, progname, echo); - result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", progname, echo); + result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", echo); PQfinish(conn); initPQExpBuffer(&connstr); diff --git a/src/bin/scripts/scripts_parallel.c b/src/bin/scripts/scripts_parallel.c new file mode 100644 index 00000000000..ffdc4e49ef8 --- /dev/null +++ b/src/bin/scripts/scripts_parallel.c @@ -0,0 +1,283 @@ +/*------------------------------------------------------------------------- + * + * scripts_parallel.c + * Parallel support for bin/scripts/ + * + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/bin/scripts/scripts_parallel.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#ifdef HAVE_SYS_SELECT_H +#include +#endif + +#include "common.h" +#include "common/logging.h" +#include "scripts_parallel.h" + +static void init_slot(ParallelSlot *slot, PGconn *conn); +static int select_loop(int maxFd, fd_set *workerset, bool *aborting); + +static void +init_slot(ParallelSlot *slot, PGconn *conn) +{ + slot->connection = conn; + /* Initially assume connection is idle */ + slot->isFree = true; +} + +/* + * Loop on select() until a descriptor from the given set becomes readable. + * + * If we get a cancel request while we're waiting, we forego all further + * processing and set the *aborting flag to true. The return value must be + * ignored in this case. Otherwise, *aborting is set to false. + */ +static int +select_loop(int maxFd, fd_set *workerset, bool *aborting) +{ + int i; + fd_set saveSet = *workerset; + + if (CancelRequested) + { + *aborting = true; + return -1; + } + else + *aborting = false; + + for (;;) + { + /* + * On Windows, we need to check once in a while for cancel requests; + * on other platforms we rely on select() returning when interrupted. + */ + struct timeval *tvp; +#ifdef WIN32 + struct timeval tv = {0, 1000000}; + + tvp = &tv; +#else + tvp = NULL; +#endif + + *workerset = saveSet; + i = select(maxFd + 1, workerset, NULL, NULL, tvp); + +#ifdef WIN32 + if (i == SOCKET_ERROR) + { + i = -1; + + if (WSAGetLastError() == WSAEINTR) + errno = EINTR; + } +#endif + + if (i < 0 && errno == EINTR) + continue; /* ignore this */ + if (i < 0 || CancelRequested) + *aborting = true; /* but not this */ + if (i == 0) + continue; /* timeout (Win32 only) */ + break; + } + + return i; +} + +/* + * ParallelSlotsGetIdle + * Return a connection slot that is ready to execute a command. + * + * This returns the first slot we find that is marked isFree, if one is; + * otherwise, we loop on select() until one socket becomes available. When + * this happens, we read the whole set and mark as free all sockets that + * become available. If an error occurs, NULL is returned. + */ +ParallelSlot * +ParallelSlotsGetIdle(ParallelSlot *slots, int numslots) +{ + int i; + int firstFree = -1; + + /* + * Look for any connection currently free. If there is one, mark it as + * taken and let the caller know the slot to use. + */ + for (i = 0; i < numslots; i++) + { + if (slots[i].isFree) + { + slots[i].isFree = false; + return slots + i; + } + } + + /* + * No free slot found, so wait until one of the connections has finished + * its task and return the available slot. + */ + while (firstFree < 0) + { + fd_set slotset; + int maxFd = 0; + bool aborting; + + /* We must reconstruct the fd_set for each call to select_loop */ + FD_ZERO(&slotset); + + for (i = 0; i < numslots; i++) + { + int sock = PQsocket(slots[i].connection); + + /* + * We don't really expect any connections to lose their sockets + * after startup, but just in case, cope by ignoring them. + */ + if (sock < 0) + continue; + + FD_SET(sock, &slotset); + if (sock > maxFd) + maxFd = sock; + } + + SetCancelConn(slots->connection); + i = select_loop(maxFd, &slotset, &aborting); + ResetCancelConn(); + + if (aborting) + { + /* + * We set the cancel-receiving connection to the one in the zeroth + * slot above, so fetch the error from there. + */ + consumeQueryResult(slots->connection); + return NULL; + } + Assert(i != 0); + + for (i = 0; i < numslots; i++) + { + int sock = PQsocket(slots[i].connection); + + if (sock >= 0 && FD_ISSET(sock, &slotset)) + { + /* select() says input is available, so consume it */ + PQconsumeInput(slots[i].connection); + } + + /* Collect result(s) as long as any are available */ + while (!PQisBusy(slots[i].connection)) + { + PGresult *result = PQgetResult(slots[i].connection); + + if (result != NULL) + { + /* Check and discard the command result */ + if (!processQueryResult(slots[i].connection, result)) + return NULL; + } + else + { + /* This connection has become idle */ + slots[i].isFree = true; + if (firstFree < 0) + firstFree = i; + break; + } + } + } + } + + slots[firstFree].isFree = false; + return slots + firstFree; +} + +/* + * ParallelSlotsSetup + * Prepare a set of parallel slots to use on a given database. + * + * This creates and initializes a set of connections to the database + * using the information given by the caller, marking all parallel slots + * as free and ready to use. "conn" is an initial connection set up + * by the caller and is associated with the first slot in the parallel + * set. + */ +ParallelSlot * +ParallelSlotsSetup(const char *dbname, const char *host, const char *port, + const char *username, bool prompt_password, + const char *progname, bool echo, + PGconn *conn, int numslots) +{ + ParallelSlot *slots; + int i; + + Assert(conn != NULL); + + slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * numslots); + init_slot(slots, conn); + if (numslots > 1) + { + for (i = 1; i < numslots; i++) + { + conn = connectDatabase(dbname, host, port, username, prompt_password, + progname, echo, false, true); + init_slot(slots + i, conn); + } + } + + return slots; +} + +/* + * ParallelSlotsTerminate + * Clean up a set of parallel slots + * + * Iterate through all connections in a given set of ParallelSlots and + * terminate all connections. + */ +void +ParallelSlotsTerminate(ParallelSlot *slots, int numslots) +{ + int i; + + for (i = 0; i < numslots; i++) + { + PGconn *conn = slots[i].connection; + + if (conn == NULL) + continue; + + disconnectDatabase(conn); + } +} + +/* + * ParallelSlotsWaitCompletion + * + * Wait for all connections to finish, returning false if at least one + * error has been found on the way. + */ +bool +ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots) +{ + int i; + + for (i = 0; i < numslots; i++) + { + if (!consumeQueryResult((slots + i)->connection)) + return false; + } + + return true; +} diff --git a/src/bin/scripts/scripts_parallel.h b/src/bin/scripts/scripts_parallel.h new file mode 100644 index 00000000000..f1a724a64f1 --- /dev/null +++ b/src/bin/scripts/scripts_parallel.h @@ -0,0 +1,36 @@ +/*------------------------------------------------------------------------- + * + * scripts_parallel.h + * Parallel support for bin/scripts/ + * + * Copyright (c) 2003-2019, PostgreSQL Global Development Group + * + * src/bin/scripts/scripts_parallel.h + * + *------------------------------------------------------------------------- + */ +#ifndef SCRIPTS_PARALLEL_H +#define SCRIPTS_PARALLEL_H + + +typedef struct ParallelSlot +{ + PGconn *connection; /* One connection */ + bool isFree; /* Is it known to be idle? */ +} ParallelSlot; + +extern ParallelSlot *ParallelSlotsGetIdle(ParallelSlot *slots, int numslots); + +extern ParallelSlot *ParallelSlotsSetup(const char *dbname, const char *host, + const char *port, + const char *username, + bool prompt_password, + const char *progname, bool echo, + PGconn *conn, int numslots); + +extern void ParallelSlotsTerminate(ParallelSlot *slots, int numslots); + +extern bool ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots); + + +#endif /* SCRIPTS_PARALLEL_H */ diff --git a/src/bin/scripts/vacuumdb.c b/src/bin/scripts/vacuumdb.c index 3bcd14b4dcf..d3ee0da917b 100644 --- a/src/bin/scripts/vacuumdb.c +++ b/src/bin/scripts/vacuumdb.c @@ -12,10 +12,6 @@ #include "postgres_fe.h" -#ifdef HAVE_SYS_SELECT_H -#include -#endif - #include "catalog/pg_class_d.h" #include "common.h" @@ -23,17 +19,9 @@ #include "fe_utils/connect.h" #include "fe_utils/simple_list.h" #include "fe_utils/string_utils.h" +#include "scripts_parallel.h" -#define ERRCODE_UNDEFINED_TABLE "42P01" - -/* Parallel vacuuming stuff */ -typedef struct ParallelSlot -{ - PGconn *connection; /* One connection */ - bool isFree; /* Is it known to be idle? */ -} ParallelSlot; - /* vacuum options controlled by user flags */ typedef struct vacuumingOptions { @@ -69,21 +57,7 @@ static void prepare_vacuum_command(PQExpBuffer sql, int serverVersion, vacuumingOptions *vacopts, const char *table); static void run_vacuum_command(PGconn *conn, const char *sql, bool echo, - const char *table, const char *progname, bool async); - -static ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots, - const char *progname); - -static bool ProcessQueryResult(PGconn *conn, PGresult *result, - const char *progname); - -static bool GetQueryResult(PGconn *conn, const char *progname); - -static void DisconnectDatabase(ParallelSlot *slot); - -static int select_loop(int maxFd, fd_set *workerset, bool *aborting); - -static void init_slot(ParallelSlot *slot, PGconn *conn); + const char *table, const char *progname); static void help(const char *progname); @@ -569,11 +543,10 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts, * query for consistency with table lookups done elsewhere by the user. */ appendPQExpBufferStr(&catalog_query, " ORDER BY c.relpages DESC;"); - executeCommand(conn, "RESET search_path;", progname, echo); - res = executeQuery(conn, catalog_query.data, progname, echo); + executeCommand(conn, "RESET search_path;", echo); + res = executeQuery(conn, catalog_query.data, echo); termPQExpBuffer(&catalog_query); - PQclear(executeQuery(conn, ALWAYS_SECURE_SEARCH_PATH_SQL, - progname, echo)); + PQclear(executeQuery(conn, ALWAYS_SECURE_SEARCH_PATH_SQL, echo)); /* * If no rows are returned, there are no matching tables, so we are done. @@ -625,17 +598,9 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts, */ if (concurrentCons <= 0) concurrentCons = 1; - slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * concurrentCons); - init_slot(slots, conn); - if (parallel) - { - for (i = 1; i < concurrentCons; i++) - { - conn = connectDatabase(dbname, host, port, username, prompt_password, - progname, echo, false, true); - init_slot(slots + i, conn); - } - } + + slots = ParallelSlotsSetup(dbname, host, port, username, prompt_password, + progname, echo, conn, concurrentCons); /* * Prepare all the connections to run the appropriate analyze stage, if @@ -649,7 +614,7 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts, for (j = 0; j < concurrentCons; j++) executeCommand((slots + j)->connection, - stage_commands[stage], progname, echo); + stage_commands[stage], echo); } initPQExpBuffer(&sql); @@ -666,63 +631,32 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts, goto finish; } - /* - * Get the connection slot to use. If in parallel mode, here we wait - * for one connection to become available if none already is. In - * non-parallel mode we simply use the only slot we have, which we - * know to be free. - */ - if (parallel) + free_slot = ParallelSlotsGetIdle(slots, concurrentCons); + if (!free_slot) { - /* - * Get a free slot, waiting until one becomes free if none - * currently is. - */ - free_slot = GetIdleSlot(slots, concurrentCons, progname); - if (!free_slot) - { - failed = true; - goto finish; - } - - free_slot->isFree = false; + failed = true; + goto finish; } - else - free_slot = slots; prepare_vacuum_command(&sql, PQserverVersion(free_slot->connection), vacopts, tabname); /* - * Execute the vacuum. If not in parallel mode, this terminates the - * program in case of an error. (The parallel case handles query - * errors in ProcessQueryResult through GetIdleSlot.) + * Execute the vacuum. All errors are handled in processQueryResult + * through ParallelSlotsGetIdle. */ run_vacuum_command(free_slot->connection, sql.data, - echo, tabname, progname, parallel); + echo, tabname, progname); cell = cell->next; } while (cell != NULL); - if (parallel) - { - int j; - - /* wait for all connections to finish */ - for (j = 0; j < concurrentCons; j++) - { - if (!GetQueryResult((slots + j)->connection, progname)) - { - failed = true; - goto finish; - } - } - } + if (!ParallelSlotsWaitCompletion(slots, concurrentCons)) + failed = true; finish: - for (i = 0; i < concurrentCons; i++) - DisconnectDatabase(slots + i); - pfree(slots); + ParallelSlotsTerminate(slots, concurrentCons); + pg_free(slots); termPQExpBuffer(&sql); @@ -756,7 +690,7 @@ vacuum_all_databases(vacuumingOptions *vacopts, prompt_password, progname, echo); result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", - progname, echo); + echo); PQfinish(conn); initPQExpBuffer(&connstr); @@ -914,27 +848,21 @@ prepare_vacuum_command(PQExpBuffer sql, int serverVersion, } /* - * Send a vacuum/analyze command to the server. In async mode, return after - * sending the command; else, wait for it to finish. + * Send a vacuum/analyze command to the server, returning after sending the + * command. * - * Any errors during command execution are reported to stderr. If async is - * false, this function exits the program after reporting the error. + * Any errors during command execution are reported to stderr. */ static void run_vacuum_command(PGconn *conn, const char *sql, bool echo, - const char *table, const char *progname, bool async) + const char *table, const char *progname) { bool status; - if (async) - { - if (echo) - printf("%s\n", sql); + if (echo) + printf("%s\n", sql); - status = PQsendQuery(conn, sql) == 1; - } - else - status = executeMaintenanceCommand(conn, sql, echo); + status = PQsendQuery(conn, sql) == 1; if (!status) { @@ -944,271 +872,9 @@ run_vacuum_command(PGconn *conn, const char *sql, bool echo, else pg_log_error("vacuuming of database \"%s\" failed: %s", PQdb(conn), PQerrorMessage(conn)); - - if (!async) - { - PQfinish(conn); - exit(1); - } } } -/* - * GetIdleSlot - * Return a connection slot that is ready to execute a command. - * - * We return the first slot we find that is marked isFree, if one is; - * otherwise, we loop on select() until one socket becomes available. When - * this happens, we read the whole set and mark as free all sockets that become - * available. - * - * If an error occurs, NULL is returned. - */ -static ParallelSlot * -GetIdleSlot(ParallelSlot slots[], int numslots, - const char *progname) -{ - int i; - int firstFree = -1; - - /* Any connection already known free? */ - for (i = 0; i < numslots; i++) - { - if (slots[i].isFree) - return slots + i; - } - - /* - * No free slot found, so wait until one of the connections has finished - * its task and return the available slot. - */ - while (firstFree < 0) - { - fd_set slotset; - int maxFd = 0; - bool aborting; - - /* We must reconstruct the fd_set for each call to select_loop */ - FD_ZERO(&slotset); - - for (i = 0; i < numslots; i++) - { - int sock = PQsocket(slots[i].connection); - - /* - * We don't really expect any connections to lose their sockets - * after startup, but just in case, cope by ignoring them. - */ - if (sock < 0) - continue; - - FD_SET(sock, &slotset); - if (sock > maxFd) - maxFd = sock; - } - - SetCancelConn(slots->connection); - i = select_loop(maxFd, &slotset, &aborting); - ResetCancelConn(); - - if (aborting) - { - /* - * We set the cancel-receiving connection to the one in the zeroth - * slot above, so fetch the error from there. - */ - GetQueryResult(slots->connection, progname); - return NULL; - } - Assert(i != 0); - - for (i = 0; i < numslots; i++) - { - int sock = PQsocket(slots[i].connection); - - if (sock >= 0 && FD_ISSET(sock, &slotset)) - { - /* select() says input is available, so consume it */ - PQconsumeInput(slots[i].connection); - } - - /* Collect result(s) as long as any are available */ - while (!PQisBusy(slots[i].connection)) - { - PGresult *result = PQgetResult(slots[i].connection); - - if (result != NULL) - { - /* Check and discard the command result */ - if (!ProcessQueryResult(slots[i].connection, result, - progname)) - return NULL; - } - else - { - /* This connection has become idle */ - slots[i].isFree = true; - if (firstFree < 0) - firstFree = i; - break; - } - } - } - } - - return slots + firstFree; -} - -/* - * ProcessQueryResult - * - * Process (and delete) a query result. Returns true if there's no error, - * false otherwise -- but errors about trying to vacuum a missing relation - * are reported and subsequently ignored. - */ -static bool -ProcessQueryResult(PGconn *conn, PGresult *result, const char *progname) -{ - /* - * If it's an error, report it. Errors about a missing table are harmless - * so we continue processing; but die for other errors. - */ - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE); - - pg_log_error("vacuuming of database \"%s\" failed: %s", - PQdb(conn), PQerrorMessage(conn)); - - if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0) - { - PQclear(result); - return false; - } - } - - PQclear(result); - return true; -} - -/* - * GetQueryResult - * - * Pump the conn till it's dry of results; return false if any are errors. - * Note that this will block if the conn is busy. - */ -static bool -GetQueryResult(PGconn *conn, const char *progname) -{ - bool ok = true; - PGresult *result; - - SetCancelConn(conn); - while ((result = PQgetResult(conn)) != NULL) - { - if (!ProcessQueryResult(conn, result, progname)) - ok = false; - } - ResetCancelConn(); - return ok; -} - -/* - * DisconnectDatabase - * Disconnect the connection associated with the given slot - */ -static void -DisconnectDatabase(ParallelSlot *slot) -{ - char errbuf[256]; - - if (!slot->connection) - return; - - if (PQtransactionStatus(slot->connection) == PQTRANS_ACTIVE) - { - PGcancel *cancel; - - if ((cancel = PQgetCancel(slot->connection))) - { - (void) PQcancel(cancel, errbuf, sizeof(errbuf)); - PQfreeCancel(cancel); - } - } - - PQfinish(slot->connection); - slot->connection = NULL; -} - -/* - * Loop on select() until a descriptor from the given set becomes readable. - * - * If we get a cancel request while we're waiting, we forego all further - * processing and set the *aborting flag to true. The return value must be - * ignored in this case. Otherwise, *aborting is set to false. - */ -static int -select_loop(int maxFd, fd_set *workerset, bool *aborting) -{ - int i; - fd_set saveSet = *workerset; - - if (CancelRequested) - { - *aborting = true; - return -1; - } - else - *aborting = false; - - for (;;) - { - /* - * On Windows, we need to check once in a while for cancel requests; - * on other platforms we rely on select() returning when interrupted. - */ - struct timeval *tvp; -#ifdef WIN32 - struct timeval tv = {0, 1000000}; - - tvp = &tv; -#else - tvp = NULL; -#endif - - *workerset = saveSet; - i = select(maxFd + 1, workerset, NULL, NULL, tvp); - -#ifdef WIN32 - if (i == SOCKET_ERROR) - { - i = -1; - - if (WSAGetLastError() == WSAEINTR) - errno = EINTR; - } -#endif - - if (i < 0 && errno == EINTR) - continue; /* ignore this */ - if (i < 0 || CancelRequested) - *aborting = true; /* but not this */ - if (i == 0) - continue; /* timeout (Win32 only) */ - break; - } - - return i; -} - -static void -init_slot(ParallelSlot *slot, PGconn *conn) -{ - slot->connection = conn; - /* Initially assume connection is idle */ - slot->isFree = true; -} - static void help(const char *progname) {