mirror of
https://github.com/postgres/postgres.git
synced 2025-05-31 03:21:24 +03:00
pg_upgrade: Parallelize data type checks.
This commit makes use of the new task framework in pg_upgrade to parallelize the checks for incompatible data types, i.e., data types whose on-disk format has changed, data types that have been removed, etc. This step will now process multiple databases concurrently when pg_upgrade's --jobs option is provided a value greater than 1. Reviewed-by: Daniel Gustafsson, Ilya Gladyshev Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
This commit is contained in:
parent
6ab8f27bc7
commit
bbf83cab98
@ -314,6 +314,147 @@ static DataTypesUsageChecks data_types_usage_checks[] =
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Private state for check_for_data_types_usage()'s UpgradeTask.
|
||||||
|
*/
|
||||||
|
struct data_type_check_state
|
||||||
|
{
|
||||||
|
DataTypesUsageChecks *check; /* the check for this step */
|
||||||
|
bool *result; /* true if check failed for any database */
|
||||||
|
PQExpBuffer *report; /* buffer for report on failed checks */
|
||||||
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Returns a palloc'd query string for the data type check, for use by
|
||||||
|
* check_for_data_types_usage()'s UpgradeTask.
|
||||||
|
*/
|
||||||
|
static char *
|
||||||
|
data_type_check_query(int checknum)
|
||||||
|
{
|
||||||
|
DataTypesUsageChecks *check = &data_types_usage_checks[checknum];
|
||||||
|
|
||||||
|
return psprintf("WITH RECURSIVE oids AS ( "
|
||||||
|
/* start with the type(s) returned by base_query */
|
||||||
|
" %s "
|
||||||
|
" UNION ALL "
|
||||||
|
" SELECT * FROM ( "
|
||||||
|
/* inner WITH because we can only reference the CTE once */
|
||||||
|
" WITH x AS (SELECT oid FROM oids) "
|
||||||
|
/* domains on any type selected so far */
|
||||||
|
" SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
|
||||||
|
" UNION ALL "
|
||||||
|
/* arrays over any type selected so far */
|
||||||
|
" SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' "
|
||||||
|
" UNION ALL "
|
||||||
|
/* composite types containing any type selected so far */
|
||||||
|
" SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
|
||||||
|
" WHERE t.typtype = 'c' AND "
|
||||||
|
" t.oid = c.reltype AND "
|
||||||
|
" c.oid = a.attrelid AND "
|
||||||
|
" NOT a.attisdropped AND "
|
||||||
|
" a.atttypid = x.oid "
|
||||||
|
" UNION ALL "
|
||||||
|
/* ranges containing any type selected so far */
|
||||||
|
" SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x "
|
||||||
|
" WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid"
|
||||||
|
" ) foo "
|
||||||
|
") "
|
||||||
|
/* now look for stored columns of any such type */
|
||||||
|
"SELECT n.nspname, c.relname, a.attname "
|
||||||
|
"FROM pg_catalog.pg_class c, "
|
||||||
|
" pg_catalog.pg_namespace n, "
|
||||||
|
" pg_catalog.pg_attribute a "
|
||||||
|
"WHERE c.oid = a.attrelid AND "
|
||||||
|
" NOT a.attisdropped AND "
|
||||||
|
" a.atttypid IN (SELECT oid FROM oids) AND "
|
||||||
|
" c.relkind IN ("
|
||||||
|
CppAsString2(RELKIND_RELATION) ", "
|
||||||
|
CppAsString2(RELKIND_MATVIEW) ", "
|
||||||
|
CppAsString2(RELKIND_INDEX) ") AND "
|
||||||
|
" c.relnamespace = n.oid AND "
|
||||||
|
/* exclude possible orphaned temp tables */
|
||||||
|
" n.nspname !~ '^pg_temp_' AND "
|
||||||
|
" n.nspname !~ '^pg_toast_temp_' AND "
|
||||||
|
/* exclude system catalogs, too */
|
||||||
|
" n.nspname NOT IN ('pg_catalog', 'information_schema')",
|
||||||
|
check->base_query);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Callback function for processing results of queries for
|
||||||
|
* check_for_data_types_usage()'s UpgradeTask. If the query returned any rows
|
||||||
|
* (i.e., the check failed), write the details to the report file.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
process_data_type_check(DbInfo *dbinfo, PGresult *res, void *arg)
|
||||||
|
{
|
||||||
|
struct data_type_check_state *state = (struct data_type_check_state *) arg;
|
||||||
|
int ntups = PQntuples(res);
|
||||||
|
|
||||||
|
AssertVariableIsOfType(&process_data_type_check, UpgradeTaskProcessCB);
|
||||||
|
|
||||||
|
if (ntups)
|
||||||
|
{
|
||||||
|
char output_path[MAXPGPATH];
|
||||||
|
int i_nspname;
|
||||||
|
int i_relname;
|
||||||
|
int i_attname;
|
||||||
|
FILE *script = NULL;
|
||||||
|
bool db_used = false;
|
||||||
|
|
||||||
|
snprintf(output_path, sizeof(output_path), "%s/%s",
|
||||||
|
log_opts.basedir,
|
||||||
|
state->check->report_filename);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Make sure we have a buffer to save reports to now that we found a
|
||||||
|
* first failing check.
|
||||||
|
*/
|
||||||
|
if (*state->report == NULL)
|
||||||
|
*state->report = createPQExpBuffer();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If this is the first time we see an error for the check in question
|
||||||
|
* then print a status message of the failure.
|
||||||
|
*/
|
||||||
|
if (!(*state->result))
|
||||||
|
{
|
||||||
|
pg_log(PG_REPORT, " failed check: %s", _(state->check->status));
|
||||||
|
appendPQExpBuffer(*state->report, "\n%s\n%s %s\n",
|
||||||
|
_(state->check->report_text),
|
||||||
|
_("A list of the problem columns is in the file:"),
|
||||||
|
output_path);
|
||||||
|
}
|
||||||
|
*state->result = true;
|
||||||
|
|
||||||
|
i_nspname = PQfnumber(res, "nspname");
|
||||||
|
i_relname = PQfnumber(res, "relname");
|
||||||
|
i_attname = PQfnumber(res, "attname");
|
||||||
|
|
||||||
|
for (int rowno = 0; rowno < ntups; rowno++)
|
||||||
|
{
|
||||||
|
if (script == NULL && (script = fopen_priv(output_path, "a")) == NULL)
|
||||||
|
pg_fatal("could not open file \"%s\": %m", output_path);
|
||||||
|
|
||||||
|
if (!db_used)
|
||||||
|
{
|
||||||
|
fprintf(script, "In database: %s\n", dbinfo->db_name);
|
||||||
|
db_used = true;
|
||||||
|
}
|
||||||
|
fprintf(script, " %s.%s.%s\n",
|
||||||
|
PQgetvalue(res, rowno, i_nspname),
|
||||||
|
PQgetvalue(res, rowno, i_relname),
|
||||||
|
PQgetvalue(res, rowno, i_attname));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (script)
|
||||||
|
{
|
||||||
|
fclose(script);
|
||||||
|
script = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* check_for_data_types_usage()
|
* check_for_data_types_usage()
|
||||||
* Detect whether there are any stored columns depending on given type(s)
|
* Detect whether there are any stored columns depending on given type(s)
|
||||||
@ -334,13 +475,15 @@ static DataTypesUsageChecks data_types_usage_checks[] =
|
|||||||
* there's no storage involved in a view.
|
* there's no storage involved in a view.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks)
|
check_for_data_types_usage(ClusterInfo *cluster)
|
||||||
{
|
{
|
||||||
bool found = false;
|
|
||||||
bool *results;
|
bool *results;
|
||||||
PQExpBufferData report;
|
PQExpBuffer report = NULL;
|
||||||
DataTypesUsageChecks *tmp = checks;
|
DataTypesUsageChecks *tmp = data_types_usage_checks;
|
||||||
int n_data_types_usage_checks = 0;
|
int n_data_types_usage_checks = 0;
|
||||||
|
UpgradeTask *task = upgrade_task_create();
|
||||||
|
char **queries = NULL;
|
||||||
|
struct data_type_check_state *states;
|
||||||
|
|
||||||
prep_status("Checking data type usage");
|
prep_status("Checking data type usage");
|
||||||
|
|
||||||
@ -353,175 +496,63 @@ check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks)
|
|||||||
|
|
||||||
/* Prepare an array to store the results of checks in */
|
/* Prepare an array to store the results of checks in */
|
||||||
results = pg_malloc0(sizeof(bool) * n_data_types_usage_checks);
|
results = pg_malloc0(sizeof(bool) * n_data_types_usage_checks);
|
||||||
|
queries = pg_malloc0(sizeof(char *) * n_data_types_usage_checks);
|
||||||
|
states = pg_malloc0(sizeof(struct data_type_check_state) * n_data_types_usage_checks);
|
||||||
|
|
||||||
|
for (int i = 0; i < n_data_types_usage_checks; i++)
|
||||||
|
{
|
||||||
|
DataTypesUsageChecks *check = &data_types_usage_checks[i];
|
||||||
|
|
||||||
|
if (check->threshold_version == MANUAL_CHECK)
|
||||||
|
{
|
||||||
|
Assert(check->version_hook);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Make sure that the check applies to the current cluster version
|
||||||
|
* and skip it if not.
|
||||||
|
*/
|
||||||
|
if (!check->version_hook(cluster))
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
else if (check->threshold_version != ALL_VERSIONS)
|
||||||
|
{
|
||||||
|
if (GET_MAJOR_VERSION(cluster->major_version) > check->threshold_version)
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
Assert(check->threshold_version == ALL_VERSIONS);
|
||||||
|
|
||||||
|
queries[i] = data_type_check_query(i);
|
||||||
|
|
||||||
|
states[i].check = check;
|
||||||
|
states[i].result = &results[i];
|
||||||
|
states[i].report = &report;
|
||||||
|
|
||||||
|
upgrade_task_add_step(task, queries[i], process_data_type_check,
|
||||||
|
true, &states[i]);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Connect to each database in the cluster and run all defined checks
|
* Connect to each database in the cluster and run all defined checks
|
||||||
* against that database before trying the next one.
|
* against that database before trying the next one.
|
||||||
*/
|
*/
|
||||||
for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
|
upgrade_task_run(task, cluster);
|
||||||
|
upgrade_task_free(task);
|
||||||
|
|
||||||
|
if (report)
|
||||||
{
|
{
|
||||||
DbInfo *active_db = &cluster->dbarr.dbs[dbnum];
|
pg_fatal("Data type checks failed: %s", report->data);
|
||||||
PGconn *conn = connectToServer(cluster, active_db->db_name);
|
destroyPQExpBuffer(report);
|
||||||
|
|
||||||
for (int checknum = 0; checknum < n_data_types_usage_checks; checknum++)
|
|
||||||
{
|
|
||||||
PGresult *res;
|
|
||||||
int ntups;
|
|
||||||
int i_nspname;
|
|
||||||
int i_relname;
|
|
||||||
int i_attname;
|
|
||||||
FILE *script = NULL;
|
|
||||||
bool db_used = false;
|
|
||||||
char output_path[MAXPGPATH];
|
|
||||||
DataTypesUsageChecks *cur_check = &checks[checknum];
|
|
||||||
|
|
||||||
if (cur_check->threshold_version == MANUAL_CHECK)
|
|
||||||
{
|
|
||||||
Assert(cur_check->version_hook);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Make sure that the check applies to the current cluster
|
|
||||||
* version and skip if not. If no check hook has been defined
|
|
||||||
* we run the check for all versions.
|
|
||||||
*/
|
|
||||||
if (!cur_check->version_hook(cluster))
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
else if (cur_check->threshold_version != ALL_VERSIONS)
|
|
||||||
{
|
|
||||||
if (GET_MAJOR_VERSION(cluster->major_version) > cur_check->threshold_version)
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
Assert(cur_check->threshold_version == ALL_VERSIONS);
|
|
||||||
|
|
||||||
snprintf(output_path, sizeof(output_path), "%s/%s",
|
|
||||||
log_opts.basedir,
|
|
||||||
cur_check->report_filename);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* The type(s) of interest might be wrapped in a domain, array,
|
|
||||||
* composite, or range, and these container types can be nested
|
|
||||||
* (to varying extents depending on server version, but that's not
|
|
||||||
* of concern here). To handle all these cases we need a
|
|
||||||
* recursive CTE.
|
|
||||||
*/
|
|
||||||
res = executeQueryOrDie(conn,
|
|
||||||
"WITH RECURSIVE oids AS ( "
|
|
||||||
/* start with the type(s) returned by base_query */
|
|
||||||
" %s "
|
|
||||||
" UNION ALL "
|
|
||||||
" SELECT * FROM ( "
|
|
||||||
/* inner WITH because we can only reference the CTE once */
|
|
||||||
" WITH x AS (SELECT oid FROM oids) "
|
|
||||||
/* domains on any type selected so far */
|
|
||||||
" SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
|
|
||||||
" UNION ALL "
|
|
||||||
/* arrays over any type selected so far */
|
|
||||||
" SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' "
|
|
||||||
" UNION ALL "
|
|
||||||
/* composite types containing any type selected so far */
|
|
||||||
" SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
|
|
||||||
" WHERE t.typtype = 'c' AND "
|
|
||||||
" t.oid = c.reltype AND "
|
|
||||||
" c.oid = a.attrelid AND "
|
|
||||||
" NOT a.attisdropped AND "
|
|
||||||
" a.atttypid = x.oid "
|
|
||||||
" UNION ALL "
|
|
||||||
/* ranges containing any type selected so far */
|
|
||||||
" SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x "
|
|
||||||
" WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid"
|
|
||||||
" ) foo "
|
|
||||||
") "
|
|
||||||
/* now look for stored columns of any such type */
|
|
||||||
"SELECT n.nspname, c.relname, a.attname "
|
|
||||||
"FROM pg_catalog.pg_class c, "
|
|
||||||
" pg_catalog.pg_namespace n, "
|
|
||||||
" pg_catalog.pg_attribute a "
|
|
||||||
"WHERE c.oid = a.attrelid AND "
|
|
||||||
" NOT a.attisdropped AND "
|
|
||||||
" a.atttypid IN (SELECT oid FROM oids) AND "
|
|
||||||
" c.relkind IN ("
|
|
||||||
CppAsString2(RELKIND_RELATION) ", "
|
|
||||||
CppAsString2(RELKIND_MATVIEW) ", "
|
|
||||||
CppAsString2(RELKIND_INDEX) ") AND "
|
|
||||||
" c.relnamespace = n.oid AND "
|
|
||||||
/* exclude possible orphaned temp tables */
|
|
||||||
" n.nspname !~ '^pg_temp_' AND "
|
|
||||||
" n.nspname !~ '^pg_toast_temp_' AND "
|
|
||||||
/* exclude system catalogs, too */
|
|
||||||
" n.nspname NOT IN ('pg_catalog', 'information_schema')",
|
|
||||||
cur_check->base_query);
|
|
||||||
|
|
||||||
ntups = PQntuples(res);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* The datatype was found, so extract the data and log to the
|
|
||||||
* requested filename. We need to open the file for appending
|
|
||||||
* since the check might have already found the type in another
|
|
||||||
* database earlier in the loop.
|
|
||||||
*/
|
|
||||||
if (ntups)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* Make sure we have a buffer to save reports to now that we
|
|
||||||
* found a first failing check.
|
|
||||||
*/
|
|
||||||
if (!found)
|
|
||||||
initPQExpBuffer(&report);
|
|
||||||
found = true;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If this is the first time we see an error for the check in
|
|
||||||
* question then print a status message of the failure.
|
|
||||||
*/
|
|
||||||
if (!results[checknum])
|
|
||||||
{
|
|
||||||
pg_log(PG_REPORT, " failed check: %s", _(cur_check->status));
|
|
||||||
appendPQExpBuffer(&report, "\n%s\n%s %s\n",
|
|
||||||
_(cur_check->report_text),
|
|
||||||
_("A list of the problem columns is in the file:"),
|
|
||||||
output_path);
|
|
||||||
}
|
|
||||||
results[checknum] = true;
|
|
||||||
|
|
||||||
i_nspname = PQfnumber(res, "nspname");
|
|
||||||
i_relname = PQfnumber(res, "relname");
|
|
||||||
i_attname = PQfnumber(res, "attname");
|
|
||||||
|
|
||||||
for (int rowno = 0; rowno < ntups; rowno++)
|
|
||||||
{
|
|
||||||
if (script == NULL && (script = fopen_priv(output_path, "a")) == NULL)
|
|
||||||
pg_fatal("could not open file \"%s\": %m", output_path);
|
|
||||||
|
|
||||||
if (!db_used)
|
|
||||||
{
|
|
||||||
fprintf(script, "In database: %s\n", active_db->db_name);
|
|
||||||
db_used = true;
|
|
||||||
}
|
|
||||||
fprintf(script, " %s.%s.%s\n",
|
|
||||||
PQgetvalue(res, rowno, i_nspname),
|
|
||||||
PQgetvalue(res, rowno, i_relname),
|
|
||||||
PQgetvalue(res, rowno, i_attname));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (script)
|
|
||||||
{
|
|
||||||
fclose(script);
|
|
||||||
script = NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
PQclear(res);
|
|
||||||
}
|
|
||||||
|
|
||||||
PQfinish(conn);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (found)
|
|
||||||
pg_fatal("Data type checks failed: %s", report.data);
|
|
||||||
|
|
||||||
pg_free(results);
|
pg_free(results);
|
||||||
|
for (int i = 0; i < n_data_types_usage_checks; i++)
|
||||||
|
{
|
||||||
|
if (queries[i])
|
||||||
|
pg_free(queries[i]);
|
||||||
|
}
|
||||||
|
pg_free(queries);
|
||||||
|
pg_free(states);
|
||||||
|
|
||||||
check_ok();
|
check_ok();
|
||||||
}
|
}
|
||||||
@ -616,7 +647,7 @@ check_and_dump_old_cluster(void)
|
|||||||
check_old_cluster_subscription_state();
|
check_old_cluster_subscription_state();
|
||||||
}
|
}
|
||||||
|
|
||||||
check_for_data_types_usage(&old_cluster, data_types_usage_checks);
|
check_for_data_types_usage(&old_cluster);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* PG 14 changed the function signature of encoding conversion functions.
|
* PG 14 changed the function signature of encoding conversion functions.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user