mirror of
https://github.com/postgres/postgres.git
synced 2025-05-01 01:04:50 +03:00
pg_upgrade: Parallelize retrieving relation information.
This commit makes use of the new task framework in pg_upgrade to parallelize retrieving relation and logical slot information. This step will now process multiple databases concurrently when pg_upgrade's --jobs option is provided a value greater than 1.

Reviewed-by: Daniel Gustafsson, Ilya Gladyshev
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
This commit is contained in:
parent
40e2e5e92b
commit
6d3d2e8e54
@ -11,6 +11,7 @@
|
||||
|
||||
#include "access/transam.h"
|
||||
#include "catalog/pg_class_d.h"
|
||||
#include "pqexpbuffer.h"
|
||||
#include "pg_upgrade.h"
|
||||
|
||||
static void create_rel_filename_map(const char *old_data, const char *new_data,
|
||||
@ -22,12 +23,14 @@ static void report_unmatched_relation(const RelInfo *rel, const DbInfo *db,
|
||||
static void free_db_and_rel_infos(DbInfoArr *db_arr);
|
||||
static void get_template0_info(ClusterInfo *cluster);
|
||||
static void get_db_infos(ClusterInfo *cluster);
|
||||
static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
|
||||
static char *get_rel_infos_query(void);
|
||||
static void process_rel_infos(DbInfo *dbinfo, PGresult *res, void *arg);
|
||||
static void free_rel_infos(RelInfoArr *rel_arr);
|
||||
static void print_db_infos(DbInfoArr *db_arr);
|
||||
static void print_rel_infos(RelInfoArr *rel_arr);
|
||||
static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
|
||||
static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo);
|
||||
static char *get_old_cluster_logical_slot_infos_query(void);
|
||||
static void process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg);
|
||||
|
||||
|
||||
/*
|
||||
@ -276,7 +279,9 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db)
|
||||
void
|
||||
get_db_rel_and_slot_infos(ClusterInfo *cluster)
|
||||
{
|
||||
int dbnum;
|
||||
UpgradeTask *task = upgrade_task_create();
|
||||
char *rel_infos_query = NULL;
|
||||
char *logical_slot_infos_query = NULL;
|
||||
|
||||
if (cluster->dbarr.dbs != NULL)
|
||||
free_db_and_rel_infos(&cluster->dbarr);
|
||||
@ -284,16 +289,38 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster)
|
||||
get_template0_info(cluster);
|
||||
get_db_infos(cluster);
|
||||
|
||||
for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
|
||||
rel_infos_query = get_rel_infos_query();
|
||||
upgrade_task_add_step(task,
|
||||
rel_infos_query,
|
||||
process_rel_infos,
|
||||
true, NULL);
|
||||
|
||||
/*
|
||||
* Logical slots are only carried over to the new cluster when the old
|
||||
* cluster is on PG17 or newer. This is because before that the logical
|
||||
* slots are not saved at shutdown, so there is no guarantee that the
|
||||
* latest confirmed_flush_lsn is saved to disk which can lead to data
|
||||
* loss. It is still not guaranteed for manually created slots in PG17, so
|
||||
* subsequent checks done in check_old_cluster_for_valid_slots() would
|
||||
* raise a FATAL error if such slots are included.
|
||||
*/
|
||||
if (cluster == &old_cluster &&
|
||||
GET_MAJOR_VERSION(cluster->major_version) > 1600)
|
||||
{
|
||||
DbInfo *pDbInfo = &cluster->dbarr.dbs[dbnum];
|
||||
|
||||
get_rel_infos(cluster, pDbInfo);
|
||||
|
||||
if (cluster == &old_cluster)
|
||||
get_old_cluster_logical_slot_infos(pDbInfo);
|
||||
logical_slot_infos_query = get_old_cluster_logical_slot_infos_query();
|
||||
upgrade_task_add_step(task,
|
||||
logical_slot_infos_query,
|
||||
process_old_cluster_logical_slot_infos,
|
||||
true, NULL);
|
||||
}
|
||||
|
||||
upgrade_task_run(task, cluster);
|
||||
upgrade_task_free(task);
|
||||
|
||||
pg_free(rel_infos_query);
|
||||
if (logical_slot_infos_query)
|
||||
pg_free(logical_slot_infos_query);
|
||||
|
||||
if (cluster == &old_cluster)
|
||||
pg_log(PG_VERBOSE, "\nsource databases:");
|
||||
else
|
||||
@ -431,40 +458,21 @@ get_db_infos(ClusterInfo *cluster)
|
||||
|
||||
|
||||
/*
|
||||
* get_rel_infos()
|
||||
* get_rel_infos_query()
|
||||
*
|
||||
* gets the relinfos for all the user tables and indexes of the database
|
||||
* referred to by "dbinfo".
|
||||
* Returns the query for retrieving the relation information for all the user
|
||||
* tables and indexes in the database, for use by get_db_rel_and_slot_infos()'s
|
||||
* UpgradeTask.
|
||||
*
|
||||
* Note: the resulting RelInfo array is assumed to be sorted by OID.
|
||||
* This allows later processing to match up old and new databases efficiently.
|
||||
* Note: the result is assumed to be sorted by OID. This allows later
|
||||
* processing to match up old and new databases efficiently.
|
||||
*/
|
||||
static void
|
||||
get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
|
||||
static char *
|
||||
get_rel_infos_query(void)
|
||||
{
|
||||
PGconn *conn = connectToServer(cluster,
|
||||
dbinfo->db_name);
|
||||
PGresult *res;
|
||||
RelInfo *relinfos;
|
||||
int ntups;
|
||||
int relnum;
|
||||
int num_rels = 0;
|
||||
char *nspname = NULL;
|
||||
char *relname = NULL;
|
||||
char *tablespace = NULL;
|
||||
int i_spclocation,
|
||||
i_nspname,
|
||||
i_relname,
|
||||
i_reloid,
|
||||
i_indtable,
|
||||
i_toastheap,
|
||||
i_relfilenumber,
|
||||
i_reltablespace;
|
||||
char query[QUERY_ALLOC];
|
||||
char *last_namespace = NULL,
|
||||
*last_tablespace = NULL;
|
||||
PQExpBufferData query;
|
||||
|
||||
query[0] = '\0'; /* initialize query string to empty */
|
||||
initPQExpBuffer(&query);
|
||||
|
||||
/*
|
||||
* Create a CTE that collects OIDs of regular user tables and matviews,
|
||||
@ -476,34 +484,34 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
|
||||
* output, so we have to copy that system table. It's easiest to do that
|
||||
* by treating it as a user table.
|
||||
*/
|
||||
snprintf(query + strlen(query), sizeof(query) - strlen(query),
|
||||
"WITH regular_heap (reloid, indtable, toastheap) AS ( "
|
||||
" SELECT c.oid, 0::oid, 0::oid "
|
||||
" FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n "
|
||||
" ON c.relnamespace = n.oid "
|
||||
" WHERE relkind IN (" CppAsString2(RELKIND_RELATION) ", "
|
||||
CppAsString2(RELKIND_MATVIEW) ") AND "
|
||||
appendPQExpBuffer(&query,
|
||||
"WITH regular_heap (reloid, indtable, toastheap) AS ( "
|
||||
" SELECT c.oid, 0::oid, 0::oid "
|
||||
" FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n "
|
||||
" ON c.relnamespace = n.oid "
|
||||
" WHERE relkind IN (" CppAsString2(RELKIND_RELATION) ", "
|
||||
CppAsString2(RELKIND_MATVIEW) ") AND "
|
||||
/* exclude possible orphaned temp tables */
|
||||
" ((n.nspname !~ '^pg_temp_' AND "
|
||||
" n.nspname !~ '^pg_toast_temp_' AND "
|
||||
" n.nspname NOT IN ('pg_catalog', 'information_schema', "
|
||||
" 'binary_upgrade', 'pg_toast') AND "
|
||||
" c.oid >= %u::pg_catalog.oid) OR "
|
||||
" (n.nspname = 'pg_catalog' AND "
|
||||
" relname IN ('pg_largeobject') ))), ",
|
||||
FirstNormalObjectId);
|
||||
" ((n.nspname !~ '^pg_temp_' AND "
|
||||
" n.nspname !~ '^pg_toast_temp_' AND "
|
||||
" n.nspname NOT IN ('pg_catalog', 'information_schema', "
|
||||
" 'binary_upgrade', 'pg_toast') AND "
|
||||
" c.oid >= %u::pg_catalog.oid) OR "
|
||||
" (n.nspname = 'pg_catalog' AND "
|
||||
" relname IN ('pg_largeobject') ))), ",
|
||||
FirstNormalObjectId);
|
||||
|
||||
/*
|
||||
* Add a CTE that collects OIDs of toast tables belonging to the tables
|
||||
* selected by the regular_heap CTE. (We have to do this separately
|
||||
* because the namespace-name rules above don't work for toast tables.)
|
||||
*/
|
||||
snprintf(query + strlen(query), sizeof(query) - strlen(query),
|
||||
" toast_heap (reloid, indtable, toastheap) AS ( "
|
||||
" SELECT c.reltoastrelid, 0::oid, c.oid "
|
||||
" FROM regular_heap JOIN pg_catalog.pg_class c "
|
||||
" ON regular_heap.reloid = c.oid "
|
||||
" WHERE c.reltoastrelid != 0), ");
|
||||
appendPQExpBufferStr(&query,
|
||||
" toast_heap (reloid, indtable, toastheap) AS ( "
|
||||
" SELECT c.reltoastrelid, 0::oid, c.oid "
|
||||
" FROM regular_heap JOIN pg_catalog.pg_class c "
|
||||
" ON regular_heap.reloid = c.oid "
|
||||
" WHERE c.reltoastrelid != 0), ");
|
||||
|
||||
/*
|
||||
* Add a CTE that collects OIDs of all valid indexes on the previously
|
||||
@ -511,53 +519,68 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
|
||||
* Testing indisready is necessary in 9.2, and harmless in earlier/later
|
||||
* versions.
|
||||
*/
|
||||
snprintf(query + strlen(query), sizeof(query) - strlen(query),
|
||||
" all_index (reloid, indtable, toastheap) AS ( "
|
||||
" SELECT indexrelid, indrelid, 0::oid "
|
||||
" FROM pg_catalog.pg_index "
|
||||
" WHERE indisvalid AND indisready "
|
||||
" AND indrelid IN "
|
||||
" (SELECT reloid FROM regular_heap "
|
||||
" UNION ALL "
|
||||
" SELECT reloid FROM toast_heap)) ");
|
||||
appendPQExpBufferStr(&query,
|
||||
" all_index (reloid, indtable, toastheap) AS ( "
|
||||
" SELECT indexrelid, indrelid, 0::oid "
|
||||
" FROM pg_catalog.pg_index "
|
||||
" WHERE indisvalid AND indisready "
|
||||
" AND indrelid IN "
|
||||
" (SELECT reloid FROM regular_heap "
|
||||
" UNION ALL "
|
||||
" SELECT reloid FROM toast_heap)) ");
|
||||
|
||||
/*
|
||||
* And now we can write the query that retrieves the data we want for each
|
||||
* heap and index relation. Make sure result is sorted by OID.
|
||||
*/
|
||||
snprintf(query + strlen(query), sizeof(query) - strlen(query),
|
||||
"SELECT all_rels.*, n.nspname, c.relname, "
|
||||
" c.relfilenode, c.reltablespace, "
|
||||
" pg_catalog.pg_tablespace_location(t.oid) AS spclocation "
|
||||
"FROM (SELECT * FROM regular_heap "
|
||||
" UNION ALL "
|
||||
" SELECT * FROM toast_heap "
|
||||
" UNION ALL "
|
||||
" SELECT * FROM all_index) all_rels "
|
||||
" JOIN pg_catalog.pg_class c "
|
||||
" ON all_rels.reloid = c.oid "
|
||||
" JOIN pg_catalog.pg_namespace n "
|
||||
" ON c.relnamespace = n.oid "
|
||||
" LEFT OUTER JOIN pg_catalog.pg_tablespace t "
|
||||
" ON c.reltablespace = t.oid "
|
||||
"ORDER BY 1;");
|
||||
appendPQExpBufferStr(&query,
|
||||
"SELECT all_rels.*, n.nspname, c.relname, "
|
||||
" c.relfilenode, c.reltablespace, "
|
||||
" pg_catalog.pg_tablespace_location(t.oid) AS spclocation "
|
||||
"FROM (SELECT * FROM regular_heap "
|
||||
" UNION ALL "
|
||||
" SELECT * FROM toast_heap "
|
||||
" UNION ALL "
|
||||
" SELECT * FROM all_index) all_rels "
|
||||
" JOIN pg_catalog.pg_class c "
|
||||
" ON all_rels.reloid = c.oid "
|
||||
" JOIN pg_catalog.pg_namespace n "
|
||||
" ON c.relnamespace = n.oid "
|
||||
" LEFT OUTER JOIN pg_catalog.pg_tablespace t "
|
||||
" ON c.reltablespace = t.oid "
|
||||
"ORDER BY 1");
|
||||
|
||||
res = executeQueryOrDie(conn, "%s", query);
|
||||
return query.data;
|
||||
}
|
||||
|
||||
ntups = PQntuples(res);
|
||||
/*
|
||||
* Callback function for processing results of the query returned by
|
||||
* get_rel_infos_query(), which is used for get_db_rel_and_slot_infos()'s
|
||||
* UpgradeTask. This function stores the relation information for later use.
|
||||
*/
|
||||
static void
|
||||
process_rel_infos(DbInfo *dbinfo, PGresult *res, void *arg)
|
||||
{
|
||||
int ntups = PQntuples(res);
|
||||
RelInfo *relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups);
|
||||
int i_reloid = PQfnumber(res, "reloid");
|
||||
int i_indtable = PQfnumber(res, "indtable");
|
||||
int i_toastheap = PQfnumber(res, "toastheap");
|
||||
int i_nspname = PQfnumber(res, "nspname");
|
||||
int i_relname = PQfnumber(res, "relname");
|
||||
int i_relfilenumber = PQfnumber(res, "relfilenode");
|
||||
int i_reltablespace = PQfnumber(res, "reltablespace");
|
||||
int i_spclocation = PQfnumber(res, "spclocation");
|
||||
int num_rels = 0;
|
||||
char *nspname = NULL;
|
||||
char *relname = NULL;
|
||||
char *tablespace = NULL;
|
||||
char *last_namespace = NULL;
|
||||
char *last_tablespace = NULL;
|
||||
|
||||
relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups);
|
||||
AssertVariableIsOfType(&process_rel_infos, UpgradeTaskProcessCB);
|
||||
|
||||
i_reloid = PQfnumber(res, "reloid");
|
||||
i_indtable = PQfnumber(res, "indtable");
|
||||
i_toastheap = PQfnumber(res, "toastheap");
|
||||
i_nspname = PQfnumber(res, "nspname");
|
||||
i_relname = PQfnumber(res, "relname");
|
||||
i_relfilenumber = PQfnumber(res, "relfilenode");
|
||||
i_reltablespace = PQfnumber(res, "reltablespace");
|
||||
i_spclocation = PQfnumber(res, "spclocation");
|
||||
|
||||
for (relnum = 0; relnum < ntups; relnum++)
|
||||
for (int relnum = 0; relnum < ntups; relnum++)
|
||||
{
|
||||
RelInfo *curr = &relinfos[num_rels++];
|
||||
|
||||
@ -610,44 +633,22 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
|
||||
/* A zero reltablespace oid indicates the database tablespace. */
|
||||
curr->tablespace = dbinfo->db_tablespace;
|
||||
}
|
||||
PQclear(res);
|
||||
|
||||
PQfinish(conn);
|
||||
|
||||
dbinfo->rel_arr.rels = relinfos;
|
||||
dbinfo->rel_arr.nrels = num_rels;
|
||||
}
|
||||
|
||||
/*
|
||||
* get_old_cluster_logical_slot_infos()
|
||||
* get_old_cluster_logical_slot_infos_query()
|
||||
*
|
||||
* Gets the LogicalSlotInfos for all the logical replication slots of the
|
||||
* database referred to by "dbinfo". The status of each logical slot is gotten
|
||||
* here, but they are used at the checking phase. See
|
||||
* check_old_cluster_for_valid_slots().
|
||||
*
|
||||
* Note: This function will not do anything if the old cluster is pre-PG17.
|
||||
* This is because before that the logical slots are not saved at shutdown, so
|
||||
* there is no guarantee that the latest confirmed_flush_lsn is saved to disk
|
||||
* which can lead to data loss. It is still not guaranteed for manually created
|
||||
* slots in PG17, so subsequent checks done in
|
||||
* check_old_cluster_for_valid_slots() would raise a FATAL error if such slots
|
||||
* are included.
|
||||
* Returns the query for retrieving the logical slot information for all the
|
||||
* logical replication slots in the database, for use by
|
||||
* get_db_rel_and_slot_infos()'s UpgradeTask. The status of each logical slot
|
||||
* is checked in check_old_cluster_for_valid_slots().
|
||||
*/
|
||||
static void
|
||||
get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
|
||||
static char *
|
||||
get_old_cluster_logical_slot_infos_query(void)
|
||||
{
|
||||
PGconn *conn;
|
||||
PGresult *res;
|
||||
LogicalSlotInfo *slotinfos = NULL;
|
||||
int num_slots;
|
||||
|
||||
/* Logical slots can be migrated since PG17. */
|
||||
if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
|
||||
return;
|
||||
|
||||
conn = connectToServer(&old_cluster, dbinfo->db_name);
|
||||
|
||||
/*
|
||||
* Fetch the logical replication slot information. The check whether the
|
||||
* slot is considered caught up is done by an upgrade function. This
|
||||
@ -665,18 +666,32 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
|
||||
* started and stopped several times causing any temporary slots to be
|
||||
* removed.
|
||||
*/
|
||||
res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, "
|
||||
"%s as caught_up, invalidation_reason IS NOT NULL as invalid "
|
||||
"FROM pg_catalog.pg_replication_slots "
|
||||
"WHERE slot_type = 'logical' AND "
|
||||
"database = current_database() AND "
|
||||
"temporary IS FALSE;",
|
||||
user_opts.live_check ? "FALSE" :
|
||||
"(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
|
||||
"ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
|
||||
"END)");
|
||||
return psprintf("SELECT slot_name, plugin, two_phase, failover, "
|
||||
"%s as caught_up, invalidation_reason IS NOT NULL as invalid "
|
||||
"FROM pg_catalog.pg_replication_slots "
|
||||
"WHERE slot_type = 'logical' AND "
|
||||
"database = current_database() AND "
|
||||
"temporary IS FALSE;",
|
||||
user_opts.live_check ? "FALSE" :
|
||||
"(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
|
||||
"ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
|
||||
"END)");
|
||||
}
|
||||
|
||||
num_slots = PQntuples(res);
|
||||
/*
|
||||
* Callback function for processing results of the query returned by
|
||||
* get_old_cluster_logical_slot_infos_query(), which is used for
|
||||
* get_db_rel_and_slot_infos()'s UpgradeTask. This function stores the logical
|
||||
* slot information for later use.
|
||||
*/
|
||||
static void
|
||||
process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg)
|
||||
{
|
||||
LogicalSlotInfo *slotinfos = NULL;
|
||||
int num_slots = PQntuples(res);
|
||||
|
||||
AssertVariableIsOfType(&process_old_cluster_logical_slot_infos,
|
||||
UpgradeTaskProcessCB);
|
||||
|
||||
if (num_slots)
|
||||
{
|
||||
@ -709,9 +724,6 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
|
||||
}
|
||||
}
|
||||
|
||||
PQclear(res);
|
||||
PQfinish(conn);
|
||||
|
||||
dbinfo->slot_arr.slots = slotinfos;
|
||||
dbinfo->slot_arr.nslots = num_slots;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user