diff --git a/src/bin/scripts/reindexdb.c b/src/bin/scripts/reindexdb.c index 7781fb1151a..9f072ac49ae 100644 --- a/src/bin/scripts/reindexdb.c +++ b/src/bin/scripts/reindexdb.c @@ -466,6 +466,7 @@ reindex_one_database(const ConnParams *cparams, ReindexType type, goto finish; } + ParallelSlotSetHandler(free_slot, TableCommandResultHandler, NULL); run_reindex_command(free_slot->connection, process_type, objname, echo, verbose, concurrently, true); diff --git a/src/bin/scripts/vacuumdb.c b/src/bin/scripts/vacuumdb.c index ed320817bc4..9dc8aca29f3 100644 --- a/src/bin/scripts/vacuumdb.c +++ b/src/bin/scripts/vacuumdb.c @@ -713,6 +713,7 @@ vacuum_one_database(const ConnParams *cparams, * Execute the vacuum. All errors are handled in processQueryResult * through ParallelSlotsGetIdle. */ + ParallelSlotSetHandler(free_slot, TableCommandResultHandler, NULL); run_vacuum_command(free_slot->connection, sql.data, echo, tabname); diff --git a/src/fe_utils/parallel_slot.c b/src/fe_utils/parallel_slot.c index 3987a4702b5..b625deb2545 100644 --- a/src/fe_utils/parallel_slot.c +++ b/src/fe_utils/parallel_slot.c @@ -30,7 +30,7 @@ static void init_slot(ParallelSlot *slot, PGconn *conn); static int select_loop(int maxFd, fd_set *workerset); -static bool processQueryResult(PGconn *conn, PGresult *result); +static bool processQueryResult(ParallelSlot *slot, PGresult *result); static void init_slot(ParallelSlot *slot, PGconn *conn) @@ -38,34 +38,24 @@ init_slot(ParallelSlot *slot, PGconn *conn) slot->connection = conn; /* Initially assume connection is idle */ slot->isFree = true; + ParallelSlotClearHandler(slot); } /* - * Process (and delete) a query result. Returns true if there's no error, - * false otherwise -- but errors about trying to work on a missing relation - * are reported and subsequently ignored. + * Process (and delete) a query result. Returns true if there's no problem, + * false otherwise. It's up to the handler to decide what cosntitutes a + * problem. */ static bool -processQueryResult(PGconn *conn, PGresult *result) +processQueryResult(ParallelSlot *slot, PGresult *result) { - /* - * If it's an error, report it. Errors about a missing table are harmless - * so we continue processing; but die for other errors. - */ - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE); + Assert(slot->handler != NULL); - pg_log_error("processing of database \"%s\" failed: %s", - PQdb(conn), PQerrorMessage(conn)); - - if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0) - { - PQclear(result); - return false; - } - } + /* On failure, the handler should return NULL after freeing the result */ + if (!slot->handler(result, slot->connection, slot->handler_context)) + return false; + /* Ok, we have to free it ourself */ PQclear(result); return true; } @@ -76,15 +66,15 @@ processQueryResult(PGconn *conn, PGresult *result) * Note that this will block if the connection is busy. */ static bool -consumeQueryResult(PGconn *conn) +consumeQueryResult(ParallelSlot *slot) { bool ok = true; PGresult *result; - SetCancelConn(conn); - while ((result = PQgetResult(conn)) != NULL) + SetCancelConn(slot->connection); + while ((result = PQgetResult(slot->connection)) != NULL) { - if (!processQueryResult(conn, result)) + if (!processQueryResult(slot, result)) ok = false; } ResetCancelConn(); @@ -227,14 +217,15 @@ ParallelSlotsGetIdle(ParallelSlot *slots, int numslots) if (result != NULL) { - /* Check and discard the command result */ - if (!processQueryResult(slots[i].connection, result)) + /* Handle and discard the command result */ + if (!processQueryResult(slots + i, result)) return NULL; } else { /* This connection has become idle */ slots[i].isFree = true; + ParallelSlotClearHandler(slots + i); if (firstFree < 0) firstFree = i; break; @@ -329,9 +320,53 @@ ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots) for (i = 0; i < numslots; i++) { - if (!consumeQueryResult((slots + i)->connection)) + if (!consumeQueryResult(slots + i)) return false; } return true; } + +/* + * TableCommandResultHandler + * + * ParallelSlotResultHandler for results of commands (not queries) against + * tables. + * + * Requires that the result status is either PGRES_COMMAND_OK or an error about + * a missing table. This is useful for utilities that compile a list of tables + * to process and then run commands (vacuum, reindex, or whatever) against + * those tables, as there is a race condition between the time the list is + * compiled and the time the command attempts to open the table. + * + * For missing tables, logs an error but allows processing to continue. + * + * For all other errors, logs an error and terminates further processing. + * + * res: PGresult from the query executed on the slot's connection + * conn: connection belonging to the slot + * context: unused + */ +bool +TableCommandResultHandler(PGresult *res, PGconn *conn, void *context) +{ + /* + * If it's an error, report it. Errors about a missing table are harmless + * so we continue processing; but die for other errors. + */ + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE); + + pg_log_error("processing of database \"%s\" failed: %s", + PQdb(conn), PQerrorMessage(conn)); + + if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0) + { + PQclear(res); + return false; + } + } + + return true; +} diff --git a/src/include/fe_utils/parallel_slot.h b/src/include/fe_utils/parallel_slot.h index 99eeb3328d6..8902f8d4f48 100644 --- a/src/include/fe_utils/parallel_slot.h +++ b/src/include/fe_utils/parallel_slot.h @@ -15,12 +15,39 @@ #include "fe_utils/connect_utils.h" #include "libpq-fe.h" +typedef bool (*ParallelSlotResultHandler) (PGresult *res, PGconn *conn, + void *context); + typedef struct ParallelSlot { PGconn *connection; /* One connection */ bool isFree; /* Is it known to be idle? */ + + /* + * Prior to issuing a command or query on 'connection', a handler callback + * function may optionally be registered to be invoked to process the + * results, and context information may optionally be registered for use + * by the handler. If unset, these fields should be NULL. + */ + ParallelSlotResultHandler handler; + void *handler_context; } ParallelSlot; +static inline void +ParallelSlotSetHandler(ParallelSlot *slot, ParallelSlotResultHandler handler, + void *context) +{ + slot->handler = handler; + slot->handler_context = context; +} + +static inline void +ParallelSlotClearHandler(ParallelSlot *slot) +{ + slot->handler = NULL; + slot->handler_context = NULL; +} + extern ParallelSlot *ParallelSlotsGetIdle(ParallelSlot *slots, int numslots); extern ParallelSlot *ParallelSlotsSetup(const ConnParams *cparams, @@ -31,5 +58,7 @@ extern void ParallelSlotsTerminate(ParallelSlot *slots, int numslots); extern bool ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots); +extern bool TableCommandResultHandler(PGresult *res, PGconn *conn, + void *context); #endif /* PARALLEL_SLOT_H */