diff --git a/src/bin/scripts/vacuumdb.c b/src/bin/scripts/vacuumdb.c index 437f610cfa6..0b89fc4c7b2 100644 --- a/src/bin/scripts/vacuumdb.c +++ b/src/bin/scripts/vacuumdb.c @@ -25,9 +25,8 @@ /* Parallel vacuuming stuff */ typedef struct ParallelSlot { - PGconn *connection; - pgsocket sock; - bool isFree; + PGconn *connection; /* One connection */ + bool isFree; /* Is it known to be idle? */ } ParallelSlot; /* vacuum options controlled by user flags */ @@ -68,6 +67,9 @@ static void run_vacuum_command(PGconn *conn, const char *sql, bool echo, static ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots, const char *progname); +static bool ProcessQueryResult(PGconn *conn, PGresult *result, + const char *progname); + static bool GetQueryResult(PGconn *conn, const char *progname); static void DisconnectDatabase(ParallelSlot *slot); @@ -340,7 +342,7 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts, PQExpBufferData sql; PGconn *conn; SimpleStringListCell *cell; - ParallelSlot *slots = NULL; + ParallelSlot *slots; SimpleStringList dbtables = {NULL, NULL}; int i; bool failed = false; @@ -384,7 +386,6 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts, PQExpBufferData buf; PGresult *res; int ntups; - int i; initPQExpBuffer(&buf); @@ -425,6 +426,8 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts, * for the first slot. If not in parallel mode, the first slot in the * array contains the connection. */ + if (concurrentCons <= 0) + concurrentCons = 1; slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * concurrentCons); init_slot(slots, conn); if (parallel) @@ -455,11 +458,8 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts, cell = tables ? tables->head : NULL; do { - ParallelSlot *free_slot; const char *tabname = cell ? cell->val : NULL; - - prepare_vacuum_command(&sql, conn, vacopts, tabname, - tables == &dbtables, progname, echo); + ParallelSlot *free_slot; if (CancelRequested) { @@ -491,10 +491,17 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts, else free_slot = slots; + /* + * Prepare the vacuum command. Note that in some cases this requires + * query execution, so be sure to use the free connection. + */ + prepare_vacuum_command(&sql, free_slot->connection, vacopts, tabname, + tables == &dbtables, progname, echo); + /* * Execute the vacuum. If not in parallel mode, this terminates the * program in case of an error. (The parallel case handles query - * errors in GetQueryResult through GetIdleSlot.) + * errors in ProcessQueryResult through GetIdleSlot.) */ run_vacuum_command(free_slot->connection, sql.data, echo, tabname, progname, parallel); @@ -507,13 +514,11 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts, { int j; + /* wait for all connections to finish */ for (j = 0; j < concurrentCons; j++) { - /* wait for all connection to return the results */ if (!GetQueryResult((slots + j)->connection, progname)) goto finish; - - (slots + j)->isFree = true; } } @@ -684,7 +689,8 @@ prepare_vacuum_command(PQExpBuffer sql, PGconn *conn, } /* - * Execute a vacuum/analyze command to the server. + * Send a vacuum/analyze command to the server. In async mode, return after + * sending the command; else, wait for it to finish. * * Any errors during command execution are reported to stderr. If async is * false, this function exits the program after reporting the error. @@ -732,10 +738,6 @@ run_vacuum_command(PGconn *conn, const char *sql, bool echo, * this happens, we read the whole set and mark as free all sockets that become * available. * - * Process the slot list, if any free slot is available then return the slotid - * else perform the select on all the socket's and wait until at least one slot - * becomes available. - * * If an error occurs, NULL is returned. */ static ParallelSlot * @@ -744,31 +746,43 @@ GetIdleSlot(ParallelSlot slots[], int numslots, { int i; int firstFree = -1; - fd_set slotset; - pgsocket maxFd; - for (i = 0; i < numslots; i++) - if ((slots + i)->isFree) - return slots + i; - - FD_ZERO(&slotset); - - maxFd = slots->sock; + /* Any connection already known free? */ for (i = 0; i < numslots; i++) { - FD_SET((slots + i)->sock, &slotset); - if ((slots + i)->sock > maxFd) - maxFd = (slots + i)->sock; + if (slots[i].isFree) + return slots + i; } /* * No free slot found, so wait until one of the connections has finished * its task and return the available slot. */ - for (firstFree = -1; firstFree < 0;) + while (firstFree < 0) { + fd_set slotset; + int maxFd = 0; bool aborting; + /* We must reconstruct the fd_set for each call to select_loop */ + FD_ZERO(&slotset); + + for (i = 0; i < numslots; i++) + { + int sock = PQsocket(slots[i].connection); + + /* + * We don't really expect any connections to lose their sockets + * after startup, but just in case, cope by ignoring them. + */ + if (sock < 0) + continue; + + FD_SET(sock, &slotset); + if (sock > maxFd) + maxFd = sock; + } + SetCancelConn(slots->connection); i = select_loop(maxFd, &slotset, &aborting); ResetCancelConn(); @@ -786,64 +800,93 @@ GetIdleSlot(ParallelSlot slots[], int numslots, for (i = 0; i < numslots; i++) { - if (!FD_ISSET((slots + i)->sock, &slotset)) - continue; + int sock = PQsocket(slots[i].connection); - PQconsumeInput((slots + i)->connection); - if (PQisBusy((slots + i)->connection)) - continue; + if (sock >= 0 && FD_ISSET(sock, &slotset)) + { + /* select() says input is available, so consume it */ + PQconsumeInput(slots[i].connection); + } - (slots + i)->isFree = true; + /* Collect result(s) as long as any are available */ + while (!PQisBusy(slots[i].connection)) + { + PGresult *result = PQgetResult(slots[i].connection); - if (!GetQueryResult((slots + i)->connection, progname)) - return NULL; - - if (firstFree < 0) - firstFree = i; + if (result != NULL) + { + /* Check and discard the command result */ + if (!ProcessQueryResult(slots[i].connection, result, + progname)) + return NULL; + } + else + { + /* This connection has become idle */ + slots[i].isFree = true; + if (firstFree < 0) + firstFree = i; + break; + } + } } } return slots + firstFree; } +/* + * ProcessQueryResult + * + * Process (and delete) a query result. Returns true if there's no error, + * false otherwise -- but errors about trying to vacuum a missing relation + * are reported and subsequently ignored. + */ +static bool +ProcessQueryResult(PGconn *conn, PGresult *result, const char *progname) +{ + /* + * If it's an error, report it. Errors about a missing table are harmless + * so we continue processing; but die for other errors. + */ + if (PQresultStatus(result) != PGRES_COMMAND_OK) + { + char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE); + + fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"), + progname, PQdb(conn), PQerrorMessage(conn)); + + if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0) + { + PQclear(result); + return false; + } + } + + PQclear(result); + return true; +} + /* * GetQueryResult * - * Process the query result. Returns true if there's no error, false - * otherwise -- but errors about trying to vacuum a missing relation are - * reported and subsequently ignored. + * Pump the conn till it's dry of results; return false if any are errors. + * Note that this will block if the conn is busy. */ static bool GetQueryResult(PGconn *conn, const char *progname) { + bool ok = true; PGresult *result; SetCancelConn(conn); while ((result = PQgetResult(conn)) != NULL) { - /* - * If errors are found, report them. Errors about a missing table are - * harmless so we continue processing; but die for other errors. - */ - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE); - - fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"), - progname, PQdb(conn), PQerrorMessage(conn)); - - if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0) - { - PQclear(result); - return false; - } - } - - PQclear(result); + if (!ProcessQueryResult(conn, result, progname)) + ok = false; } ResetCancelConn(); - - return true; + return ok; } /* @@ -938,8 +981,8 @@ static void init_slot(ParallelSlot *slot, PGconn *conn) { slot->connection = conn; + /* Initially assume connection is idle */ slot->isFree = true; - slot->sock = PQsocket(conn); } static void