mirror of
https://github.com/postgres/postgres.git
synced 2025-05-21 15:54:08 +03:00
reindexdb: Fix the index-level REINDEX with multiple jobs
47f99a407d introduced a parallel index-level REINDEX. The code was written assuming that running run_reindex_command() with 'async == true' can schedule a number of queries for a connection. That's not true, and the second query sent using run_reindex_command() will wait for the completion of the previous one. This commit fixes that by putting REINDEX commands for the same table into a single query. Also, this commit removes the 'async' argument from run_reindex_command(), as only its call always passes 'async == true'. Reported-by: Álvaro Herrera <alvherre@alvh.no-ip.org> Discussion: https://postgr.es/m/202503071820.j25zn3lo4hvn%40alvherre.pgsql Reviewed-by: Álvaro Herrera <alvherre@alvh.no-ip.org> Backpatch-through: 17
This commit is contained in:
parent
c826cd1b1d
commit
09ef2f8df1
@ -50,10 +50,13 @@ static void reindex_all_databases(ConnParams *cparams,
|
|||||||
bool syscatalog, SimpleStringList *schemas,
|
bool syscatalog, SimpleStringList *schemas,
|
||||||
SimpleStringList *tables,
|
SimpleStringList *tables,
|
||||||
SimpleStringList *indexes);
|
SimpleStringList *indexes);
|
||||||
static void run_reindex_command(PGconn *conn, ReindexType type,
|
static void gen_reindex_command(PGconn *conn, ReindexType type,
|
||||||
const char *name, bool echo, bool verbose,
|
const char *name, bool echo, bool verbose,
|
||||||
bool concurrently, bool async,
|
bool concurrently, const char *tablespace,
|
||||||
const char *tablespace);
|
PQExpBufferData *sql);
|
||||||
|
static void run_reindex_command(PGconn *conn, ReindexType type,
|
||||||
|
const char *name, bool echo,
|
||||||
|
PQExpBufferData *sq);
|
||||||
|
|
||||||
static void help(const char *progname);
|
static void help(const char *progname);
|
||||||
|
|
||||||
@ -285,7 +288,6 @@ reindex_one_database(ConnParams *cparams, ReindexType type,
|
|||||||
ParallelSlotArray *sa;
|
ParallelSlotArray *sa;
|
||||||
bool failed = false;
|
bool failed = false;
|
||||||
int items_count = 0;
|
int items_count = 0;
|
||||||
char *prev_index_table_name = NULL;
|
|
||||||
ParallelSlot *free_slot = NULL;
|
ParallelSlot *free_slot = NULL;
|
||||||
|
|
||||||
conn = connectDatabase(cparams, progname, echo, false, true);
|
conn = connectDatabase(cparams, progname, echo, false, true);
|
||||||
@ -421,8 +423,8 @@ reindex_one_database(ConnParams *cparams, ReindexType type,
|
|||||||
cell = process_list->head;
|
cell = process_list->head;
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
|
PQExpBufferData sql;
|
||||||
const char *objname = cell->val;
|
const char *objname = cell->val;
|
||||||
bool need_new_slot = true;
|
|
||||||
|
|
||||||
if (CancelRequested)
|
if (CancelRequested)
|
||||||
{
|
{
|
||||||
@ -430,23 +432,6 @@ reindex_one_database(ConnParams *cparams, ReindexType type,
|
|||||||
goto finish;
|
goto finish;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* For parallel index-level REINDEX, the indices of the same table are
|
|
||||||
* ordered together and they are to be processed by the same job. So,
|
|
||||||
* we don't switch the job as soon as the index belongs to the same
|
|
||||||
* table as the previous one.
|
|
||||||
*/
|
|
||||||
if (parallel && process_type == REINDEX_INDEX)
|
|
||||||
{
|
|
||||||
if (prev_index_table_name != NULL &&
|
|
||||||
strcmp(prev_index_table_name, indices_tables_cell->val) == 0)
|
|
||||||
need_new_slot = false;
|
|
||||||
prev_index_table_name = indices_tables_cell->val;
|
|
||||||
indices_tables_cell = indices_tables_cell->next;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (need_new_slot)
|
|
||||||
{
|
|
||||||
free_slot = ParallelSlotsGetIdle(sa, NULL);
|
free_slot = ParallelSlotsGetIdle(sa, NULL);
|
||||||
if (!free_slot)
|
if (!free_slot)
|
||||||
{
|
{
|
||||||
@ -455,10 +440,37 @@ reindex_one_database(ConnParams *cparams, ReindexType type,
|
|||||||
}
|
}
|
||||||
|
|
||||||
ParallelSlotSetHandler(free_slot, TableCommandResultHandler, NULL);
|
ParallelSlotSetHandler(free_slot, TableCommandResultHandler, NULL);
|
||||||
|
initPQExpBuffer(&sql);
|
||||||
|
if (parallel && process_type == REINDEX_INDEX)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* For parallel index-level REINDEX, the indices of the same table
|
||||||
|
* are ordered together and they are to be processed by the same
|
||||||
|
* job. So, we put all the relevant REINDEX commands into the
|
||||||
|
* same SQL query to be processed by this job at once.
|
||||||
|
*/
|
||||||
|
gen_reindex_command(free_slot->connection, process_type, objname,
|
||||||
|
echo, verbose, concurrently, tablespace, &sql);
|
||||||
|
while (indices_tables_cell->next &&
|
||||||
|
strcmp(indices_tables_cell->val, indices_tables_cell->next->val) == 0)
|
||||||
|
{
|
||||||
|
indices_tables_cell = indices_tables_cell->next;
|
||||||
|
cell = cell->next;
|
||||||
|
objname = cell->val;
|
||||||
|
appendPQExpBufferChar(&sql, '\n');
|
||||||
|
gen_reindex_command(free_slot->connection, process_type, objname,
|
||||||
|
echo, verbose, concurrently, tablespace, &sql);
|
||||||
|
}
|
||||||
|
indices_tables_cell = indices_tables_cell->next;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
gen_reindex_command(free_slot->connection, process_type, objname,
|
||||||
|
echo, verbose, concurrently, tablespace, &sql);
|
||||||
}
|
}
|
||||||
|
|
||||||
run_reindex_command(free_slot->connection, process_type, objname,
|
run_reindex_command(free_slot->connection, process_type, objname,
|
||||||
echo, verbose, concurrently, true, tablespace);
|
echo, &sql);
|
||||||
|
termPQExpBuffer(&sql);
|
||||||
|
|
||||||
cell = cell->next;
|
cell = cell->next;
|
||||||
} while (cell != NULL);
|
} while (cell != NULL);
|
||||||
@ -486,57 +498,57 @@ finish:
|
|||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Append a SQL command required to reindex a given database object to the
|
||||||
|
* '*sql' string.
|
||||||
|
*/
|
||||||
static void
|
static void
|
||||||
run_reindex_command(PGconn *conn, ReindexType type, const char *name,
|
gen_reindex_command(PGconn *conn, ReindexType type, const char *name,
|
||||||
bool echo, bool verbose, bool concurrently, bool async,
|
bool echo, bool verbose, bool concurrently,
|
||||||
const char *tablespace)
|
const char *tablespace, PQExpBufferData *sql)
|
||||||
{
|
{
|
||||||
const char *paren = "(";
|
const char *paren = "(";
|
||||||
const char *comma = ", ";
|
const char *comma = ", ";
|
||||||
const char *sep = paren;
|
const char *sep = paren;
|
||||||
PQExpBufferData sql;
|
|
||||||
bool status;
|
|
||||||
|
|
||||||
Assert(name);
|
Assert(name);
|
||||||
|
|
||||||
/* build the REINDEX query */
|
/* build the REINDEX query */
|
||||||
initPQExpBuffer(&sql);
|
appendPQExpBufferStr(sql, "REINDEX ");
|
||||||
|
|
||||||
appendPQExpBufferStr(&sql, "REINDEX ");
|
|
||||||
|
|
||||||
if (verbose)
|
if (verbose)
|
||||||
{
|
{
|
||||||
appendPQExpBuffer(&sql, "%sVERBOSE", sep);
|
appendPQExpBuffer(sql, "%sVERBOSE", sep);
|
||||||
sep = comma;
|
sep = comma;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tablespace)
|
if (tablespace)
|
||||||
{
|
{
|
||||||
appendPQExpBuffer(&sql, "%sTABLESPACE %s", sep,
|
appendPQExpBuffer(sql, "%sTABLESPACE %s", sep,
|
||||||
fmtIdEnc(tablespace, PQclientEncoding(conn)));
|
fmtIdEnc(tablespace, PQclientEncoding(conn)));
|
||||||
sep = comma;
|
sep = comma;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sep != paren)
|
if (sep != paren)
|
||||||
appendPQExpBufferStr(&sql, ") ");
|
appendPQExpBufferStr(sql, ") ");
|
||||||
|
|
||||||
/* object type */
|
/* object type */
|
||||||
switch (type)
|
switch (type)
|
||||||
{
|
{
|
||||||
case REINDEX_DATABASE:
|
case REINDEX_DATABASE:
|
||||||
appendPQExpBufferStr(&sql, "DATABASE ");
|
appendPQExpBufferStr(sql, "DATABASE ");
|
||||||
break;
|
break;
|
||||||
case REINDEX_INDEX:
|
case REINDEX_INDEX:
|
||||||
appendPQExpBufferStr(&sql, "INDEX ");
|
appendPQExpBufferStr(sql, "INDEX ");
|
||||||
break;
|
break;
|
||||||
case REINDEX_SCHEMA:
|
case REINDEX_SCHEMA:
|
||||||
appendPQExpBufferStr(&sql, "SCHEMA ");
|
appendPQExpBufferStr(sql, "SCHEMA ");
|
||||||
break;
|
break;
|
||||||
case REINDEX_SYSTEM:
|
case REINDEX_SYSTEM:
|
||||||
appendPQExpBufferStr(&sql, "SYSTEM ");
|
appendPQExpBufferStr(sql, "SYSTEM ");
|
||||||
break;
|
break;
|
||||||
case REINDEX_TABLE:
|
case REINDEX_TABLE:
|
||||||
appendPQExpBufferStr(&sql, "TABLE ");
|
appendPQExpBufferStr(sql, "TABLE ");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -546,37 +558,43 @@ run_reindex_command(PGconn *conn, ReindexType type, const char *name,
|
|||||||
* object type.
|
* object type.
|
||||||
*/
|
*/
|
||||||
if (concurrently)
|
if (concurrently)
|
||||||
appendPQExpBufferStr(&sql, "CONCURRENTLY ");
|
appendPQExpBufferStr(sql, "CONCURRENTLY ");
|
||||||
|
|
||||||
/* object name */
|
/* object name */
|
||||||
switch (type)
|
switch (type)
|
||||||
{
|
{
|
||||||
case REINDEX_DATABASE:
|
case REINDEX_DATABASE:
|
||||||
case REINDEX_SYSTEM:
|
case REINDEX_SYSTEM:
|
||||||
appendPQExpBufferStr(&sql,
|
appendPQExpBufferStr(sql,
|
||||||
fmtIdEnc(name, PQclientEncoding(conn)));
|
fmtIdEnc(name, PQclientEncoding(conn)));
|
||||||
break;
|
break;
|
||||||
case REINDEX_INDEX:
|
case REINDEX_INDEX:
|
||||||
case REINDEX_TABLE:
|
case REINDEX_TABLE:
|
||||||
appendQualifiedRelation(&sql, name, conn, echo);
|
appendQualifiedRelation(sql, name, conn, echo);
|
||||||
break;
|
break;
|
||||||
case REINDEX_SCHEMA:
|
case REINDEX_SCHEMA:
|
||||||
appendPQExpBufferStr(&sql, name);
|
appendPQExpBufferStr(sql, name);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* finish the query */
|
/* finish the query */
|
||||||
appendPQExpBufferChar(&sql, ';');
|
appendPQExpBufferChar(sql, ';');
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Run one or more reindex commands accumulated in the '*sql' string against
|
||||||
|
* a given database connection.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
run_reindex_command(PGconn *conn, ReindexType type, const char *name,
|
||||||
|
bool echo, PQExpBufferData *sql)
|
||||||
|
{
|
||||||
|
bool status;
|
||||||
|
|
||||||
if (async)
|
|
||||||
{
|
|
||||||
if (echo)
|
if (echo)
|
||||||
printf("%s\n", sql.data);
|
printf("%s\n", sql->data);
|
||||||
|
|
||||||
status = PQsendQuery(conn, sql.data) == 1;
|
status = PQsendQuery(conn, sql->data) == 1;
|
||||||
}
|
|
||||||
else
|
|
||||||
status = executeMaintenanceCommand(conn, sql.data, echo);
|
|
||||||
|
|
||||||
if (!status)
|
if (!status)
|
||||||
{
|
{
|
||||||
@ -603,14 +621,7 @@ run_reindex_command(PGconn *conn, ReindexType type, const char *name,
|
|||||||
name, PQdb(conn), PQerrorMessage(conn));
|
name, PQdb(conn), PQerrorMessage(conn));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (!async)
|
|
||||||
{
|
|
||||||
PQfinish(conn);
|
|
||||||
exit(1);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
termPQExpBuffer(&sql);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
Loading…
x
Reference in New Issue
Block a user