mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-25 13:17:41 +03:00 
			
		
		
		
	Add worker type to pg_stat_subscription.
Thanks to commit 2a8b40e368, the logical replication worker type is
easily determined.  The worker type could already be deduced via
other columns such as leader_pid and relid, but that is unnecessary
complexity for users.
Bumps catversion.
Author: Peter Smith
Reviewed-by: Michael Paquier, Maxim Orlov, Amit Kapila
Discussion: https://postgr.es/m/CAHut%2BPtmbSMfErSk0S7xxVdZJ9XVE3xVLhqBTmT91kf57BeKDQ%40mail.gmail.com
			
			
This commit is contained in:
		| @@ -1993,6 +1993,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage | |||||||
|       </para></entry> |       </para></entry> | ||||||
|      </row> |      </row> | ||||||
|  |  | ||||||
|  |      <row> | ||||||
|  |       <entry role="catalog_table_entry"><para role="column_definition"> | ||||||
|  |        <structfield>worker_type</structfield> <type>text</type> | ||||||
|  |       </para> | ||||||
|  |       <para> | ||||||
|  |        Type of the subscription worker process.  Possible types are | ||||||
|  |        <literal>apply</literal>, <literal>parallel apply</literal>, and | ||||||
|  |        <literal>table synchronization</literal>. | ||||||
|  |       </para></entry> | ||||||
|  |      </row> | ||||||
|  |  | ||||||
|      <row> |      <row> | ||||||
|       <entry role="catalog_table_entry"><para role="column_definition"> |       <entry role="catalog_table_entry"><para role="column_definition"> | ||||||
|        <structfield>pid</structfield> <type>integer</type> |        <structfield>pid</structfield> <type>integer</type> | ||||||
| @@ -2008,7 +2019,7 @@ description | Waiting for a newly initialized WAL file to reach durable storage | |||||||
|       </para> |       </para> | ||||||
|       <para> |       <para> | ||||||
|        Process ID of the leader apply worker if this process is a parallel |        Process ID of the leader apply worker if this process is a parallel | ||||||
|        apply worker; NULL if this process is a leader apply worker or a |        apply worker; NULL if this process is a leader apply worker or a table | ||||||
|        synchronization worker |        synchronization worker | ||||||
|       </para></entry> |       </para></entry> | ||||||
|      </row> |      </row> | ||||||
|   | |||||||
| @@ -949,6 +949,7 @@ CREATE VIEW pg_stat_subscription AS | |||||||
|     SELECT |     SELECT | ||||||
|             su.oid AS subid, |             su.oid AS subid, | ||||||
|             su.subname, |             su.subname, | ||||||
|  |             st.worker_type, | ||||||
|             st.pid, |             st.pid, | ||||||
|             st.leader_pid, |             st.leader_pid, | ||||||
|             st.relid, |             st.relid, | ||||||
|   | |||||||
| @@ -1278,7 +1278,7 @@ GetLeaderApplyWorkerPid(pid_t pid) | |||||||
| Datum | Datum | ||||||
| pg_stat_get_subscription(PG_FUNCTION_ARGS) | pg_stat_get_subscription(PG_FUNCTION_ARGS) | ||||||
| { | { | ||||||
| #define PG_STAT_GET_SUBSCRIPTION_COLS	9 | #define PG_STAT_GET_SUBSCRIPTION_COLS	10 | ||||||
| 	Oid			subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); | 	Oid			subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); | ||||||
| 	int			i; | 	int			i; | ||||||
| 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; | 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; | ||||||
| @@ -1339,6 +1339,22 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) | |||||||
| 		else | 		else | ||||||
| 			values[8] = TimestampTzGetDatum(worker.reply_time); | 			values[8] = TimestampTzGetDatum(worker.reply_time); | ||||||
|  |  | ||||||
|  | 		switch (worker.type) | ||||||
|  | 		{ | ||||||
|  | 			case WORKERTYPE_APPLY: | ||||||
|  | 				values[9] = CStringGetTextDatum("apply"); | ||||||
|  | 				break; | ||||||
|  | 			case WORKERTYPE_PARALLEL_APPLY: | ||||||
|  | 				values[9] = CStringGetTextDatum("parallel apply"); | ||||||
|  | 				break; | ||||||
|  | 			case WORKERTYPE_TABLESYNC: | ||||||
|  | 				values[9] = CStringGetTextDatum("table synchronization"); | ||||||
|  | 				break; | ||||||
|  | 			case WORKERTYPE_UNKNOWN: | ||||||
|  | 				/* Should never happen. */ | ||||||
|  | 				elog(ERROR, "unknown worker type"); | ||||||
|  | 		} | ||||||
|  |  | ||||||
| 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, | 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, | ||||||
| 							 values, nulls); | 							 values, nulls); | ||||||
|  |  | ||||||
|   | |||||||
| @@ -57,6 +57,6 @@ | |||||||
|  */ |  */ | ||||||
|  |  | ||||||
| /*							yyyymmddN */ | /*							yyyymmddN */ | ||||||
| #define CATALOG_VERSION_NO	202309221 | #define CATALOG_VERSION_NO	202309251 | ||||||
|  |  | ||||||
| #endif | #endif | ||||||
|   | |||||||
| @@ -5484,9 +5484,9 @@ | |||||||
|   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', |   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', | ||||||
|   proretset => 't', provolatile => 's', proparallel => 'r', |   proretset => 't', provolatile => 's', proparallel => 'r', | ||||||
|   prorettype => 'record', proargtypes => 'oid', |   prorettype => 'record', proargtypes => 'oid', | ||||||
|   proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}', |   proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text}', | ||||||
|   proargmodes => '{i,o,o,o,o,o,o,o,o,o}', |   proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}', | ||||||
|   proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}', |   proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type}', | ||||||
|   prosrc => 'pg_stat_get_subscription' }, |   prosrc => 'pg_stat_get_subscription' }, | ||||||
| { oid => '2026', descr => 'statistics: current backend PID', | { oid => '2026', descr => 'statistics: current backend PID', | ||||||
|   proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r', |   proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r', | ||||||
|   | |||||||
| @@ -2118,6 +2118,7 @@ pg_stat_ssl| SELECT pid, | |||||||
|   WHERE (client_port IS NOT NULL); |   WHERE (client_port IS NOT NULL); | ||||||
| pg_stat_subscription| SELECT su.oid AS subid, | pg_stat_subscription| SELECT su.oid AS subid, | ||||||
|     su.subname, |     su.subname, | ||||||
|  |     st.worker_type, | ||||||
|     st.pid, |     st.pid, | ||||||
|     st.leader_pid, |     st.leader_pid, | ||||||
|     st.relid, |     st.relid, | ||||||
| @@ -2127,7 +2128,7 @@ pg_stat_subscription| SELECT su.oid AS subid, | |||||||
|     st.latest_end_lsn, |     st.latest_end_lsn, | ||||||
|     st.latest_end_time |     st.latest_end_time | ||||||
|    FROM (pg_subscription su |    FROM (pg_subscription su | ||||||
|      LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid))); |      LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type) ON ((st.subid = su.oid))); | ||||||
| pg_stat_subscription_stats| SELECT ss.subid, | pg_stat_subscription_stats| SELECT ss.subid, | ||||||
|     s.subname, |     s.subname, | ||||||
|     ss.apply_error_count, |     ss.apply_error_count, | ||||||
|   | |||||||
| @@ -80,7 +80,7 @@ $node_subscriber->safe_psql('postgres', | |||||||
|  |  | ||||||
| # wait for it to start | # wait for it to start | ||||||
| $node_subscriber->poll_query_until('postgres', | $node_subscriber->poll_query_until('postgres', | ||||||
| 	"SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND relid IS NULL" | 	"SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND worker_type = 'apply'" | ||||||
| ) or die "Timed out while waiting for subscriber to start"; | ) or die "Timed out while waiting for subscriber to start"; | ||||||
|  |  | ||||||
| # and drop both subscriptions | # and drop both subscriptions | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user