1
0
mirror of https://github.com/postgres/postgres.git synced 2025-05-01 01:04:50 +03:00

pg_upgrade: Parallelize incompatible polymorphics check.

This commit makes use of the new task framework in pg_upgrade to
parallelize the check for usage of incompatible polymorphic
functions, i.e., those with arguments of type anyarray/anyelement
rather than the newer anycompatible variants.  This step will now
process multiple databases concurrently when pg_upgrade's --jobs
option is provided a value greater than 1.

Reviewed-by: Daniel Gustafsson, Ilya Gladyshev
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
This commit is contained in:
Nathan Bossart 2024-09-16 16:10:33 -05:00
parent c34eabfbbf
commit cf2f82a37c

View File

@ -1413,6 +1413,40 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
check_ok(); check_ok();
} }
/*
* Callback function for processing results of query for
* check_for_incompatible_polymorphics()'s UpgradeTask. If the query returned
* any rows (i.e., the check failed), write the details to the report file.
*/
static void
process_incompat_polymorphics(DbInfo *dbinfo, PGresult *res, void *arg)
{
UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
bool db_used = false;
int ntups = PQntuples(res);
int i_objkind = PQfnumber(res, "objkind");
int i_objname = PQfnumber(res, "objname");
AssertVariableIsOfType(&process_incompat_polymorphics,
UpgradeTaskProcessCB);
for (int rowno = 0; rowno < ntups; rowno++)
{
if (report->file == NULL &&
(report->file = fopen_priv(report->path, "w")) == NULL)
pg_fatal("could not open file \"%s\": %m", report->path);
if (!db_used)
{
fprintf(report->file, "In database: %s\n", dbinfo->db_name);
db_used = true;
}
fprintf(report->file, " %s: %s\n",
PQgetvalue(res, rowno, i_objkind),
PQgetvalue(res, rowno, i_objname));
}
}
/* /*
* check_for_incompatible_polymorphics() * check_for_incompatible_polymorphics()
* *
@ -1422,14 +1456,15 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
static void static void
check_for_incompatible_polymorphics(ClusterInfo *cluster) check_for_incompatible_polymorphics(ClusterInfo *cluster)
{ {
PGresult *res;
FILE *script = NULL;
char output_path[MAXPGPATH];
PQExpBufferData old_polymorphics; PQExpBufferData old_polymorphics;
UpgradeTask *task = upgrade_task_create();
UpgradeTaskReport report;
char *query;
prep_status("Checking for incompatible polymorphic functions"); prep_status("Checking for incompatible polymorphic functions");
snprintf(output_path, sizeof(output_path), "%s/%s", report.file = NULL;
snprintf(report.path, sizeof(report.path), "%s/%s",
log_opts.basedir, log_opts.basedir,
"incompatible_polymorphics.txt"); "incompatible_polymorphics.txt");
@ -1453,80 +1488,51 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster)
", 'array_positions(anyarray,anyelement)'" ", 'array_positions(anyarray,anyelement)'"
", 'width_bucket(anyelement,anyarray)'"); ", 'width_bucket(anyelement,anyarray)'");
for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) /*
* The query below hardcodes FirstNormalObjectId as 16384 rather than
* interpolating that C #define into the query because, if that #define is
* ever changed, the cutoff we want to use is the value used by
* pre-version 14 servers, not that of some future version.
*/
/* Aggregate transition functions */
query = psprintf("SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
"FROM pg_proc AS p "
"JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
"JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn "
"WHERE p.oid >= 16384 "
"AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) "
"AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
/* Aggregate final functions */
"UNION ALL "
"SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
"FROM pg_proc AS p "
"JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
"JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn "
"WHERE p.oid >= 16384 "
"AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) "
"AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
/* Operators */
"UNION ALL "
"SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname "
"FROM pg_operator AS op "
"WHERE op.oid >= 16384 "
"AND oprcode = ANY(ARRAY[%s]::regprocedure[]) "
"AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[])",
old_polymorphics.data,
old_polymorphics.data,
old_polymorphics.data);
upgrade_task_add_step(task, query, process_incompat_polymorphics,
true, &report);
upgrade_task_run(task, cluster);
upgrade_task_free(task);
if (report.file)
{ {
bool db_used = false; fclose(report.file);
DbInfo *active_db = &cluster->dbarr.dbs[dbnum];
PGconn *conn = connectToServer(cluster, active_db->db_name);
int ntups;
int i_objkind,
i_objname;
/*
* The query below hardcodes FirstNormalObjectId as 16384 rather than
* interpolating that C #define into the query because, if that
* #define is ever changed, the cutoff we want to use is the value
* used by pre-version 14 servers, not that of some future version.
*/
res = executeQueryOrDie(conn,
/* Aggregate transition functions */
"SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
"FROM pg_proc AS p "
"JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
"JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn "
"WHERE p.oid >= 16384 "
"AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) "
"AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
/* Aggregate final functions */
"UNION ALL "
"SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
"FROM pg_proc AS p "
"JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
"JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn "
"WHERE p.oid >= 16384 "
"AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) "
"AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
/* Operators */
"UNION ALL "
"SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname "
"FROM pg_operator AS op "
"WHERE op.oid >= 16384 "
"AND oprcode = ANY(ARRAY[%s]::regprocedure[]) "
"AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[]);",
old_polymorphics.data,
old_polymorphics.data,
old_polymorphics.data);
ntups = PQntuples(res);
i_objkind = PQfnumber(res, "objkind");
i_objname = PQfnumber(res, "objname");
for (int rowno = 0; rowno < ntups; rowno++)
{
if (script == NULL &&
(script = fopen_priv(output_path, "w")) == NULL)
pg_fatal("could not open file \"%s\": %m", output_path);
if (!db_used)
{
fprintf(script, "In database: %s\n", active_db->db_name);
db_used = true;
}
fprintf(script, " %s: %s\n",
PQgetvalue(res, rowno, i_objkind),
PQgetvalue(res, rowno, i_objname));
}
PQclear(res);
PQfinish(conn);
}
if (script)
{
fclose(script);
pg_log(PG_REPORT, "fatal"); pg_log(PG_REPORT, "fatal");
pg_fatal("Your installation contains user-defined objects that refer to internal\n" pg_fatal("Your installation contains user-defined objects that refer to internal\n"
"polymorphic functions with arguments of type \"anyarray\" or \"anyelement\".\n" "polymorphic functions with arguments of type \"anyarray\" or \"anyelement\".\n"
@ -1534,12 +1540,13 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster)
"afterwards, changing them to refer to the new corresponding functions with\n" "afterwards, changing them to refer to the new corresponding functions with\n"
"arguments of type \"anycompatiblearray\" and \"anycompatible\".\n" "arguments of type \"anycompatiblearray\" and \"anycompatible\".\n"
"A list of the problematic objects is in the file:\n" "A list of the problematic objects is in the file:\n"
" %s", output_path); " %s", report.path);
} }
else else
check_ok(); check_ok();
termPQExpBuffer(&old_polymorphics); termPQExpBuffer(&old_polymorphics);
pg_free(query);
} }
/* /*