mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-25 13:17:41 +03:00 
			
		
		
		
	postgres_fdw: Fix connection leak.
In postgres_fdw, the cached connections to foreign servers will not be closed until the local session exits if the user mappings or foreign servers that those connections depend on are dropped. Those connections can be leaked. To fix that connection leak issue, after a change to a pg_foreign_server or pg_user_mapping catalog entry, this commit makes postgres_fdw close the connections depending on that entry immediately if current transaction has not used those connections yet. Otherwise, mark those connections as invalid and then close them at the end of current transaction, since they cannot be closed in the midst of the transaction using them. Closed connections will be remade at the next opportunity if necessary. Back-patch to all supported branches. Author: Bharath Rupireddy Reviewed-by: Zhihong Yu, Zhijie Hou, Fujii Masao Discussion: https://postgr.es/m/CALj2ACVNcGH_6qLY-4_tXz8JLvA+4yeBThRfxMz7Oxbk1aHcpQ@mail.gmail.com
This commit is contained in:
		| @@ -812,12 +812,14 @@ pgfdw_xact_callback(XactEvent event, void *arg) | |||||||
| 		entry->xact_depth = 0; | 		entry->xact_depth = 0; | ||||||
|  |  | ||||||
| 		/* | 		/* | ||||||
| 		 * If the connection isn't in a good idle state, discard it to | 		 * If the connection isn't in a good idle state or it is marked as | ||||||
| 		 * recover. Next GetConnection will open a new connection. | 		 * invalid, then discard it to recover. Next GetConnection will open a | ||||||
|  | 		 * new connection. | ||||||
| 		 */ | 		 */ | ||||||
| 		if (PQstatus(entry->conn) != CONNECTION_OK || | 		if (PQstatus(entry->conn) != CONNECTION_OK || | ||||||
| 			PQtransactionStatus(entry->conn) != PQTRANS_IDLE || | 			PQtransactionStatus(entry->conn) != PQTRANS_IDLE || | ||||||
| 			entry->changing_xact_state) | 			entry->changing_xact_state || | ||||||
|  | 			entry->invalidated) | ||||||
| 		{ | 		{ | ||||||
| 			elog(DEBUG3, "discarding connection %p", entry->conn); | 			elog(DEBUG3, "discarding connection %p", entry->conn); | ||||||
| 			disconnect_pg_server(entry); | 			disconnect_pg_server(entry); | ||||||
| @@ -941,9 +943,12 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, | |||||||
|  * Connection invalidation callback function |  * Connection invalidation callback function | ||||||
|  * |  * | ||||||
|  * After a change to a pg_foreign_server or pg_user_mapping catalog entry, |  * After a change to a pg_foreign_server or pg_user_mapping catalog entry, | ||||||
|  * mark connections depending on that entry as needing to be remade. |  * close connections depending on that entry immediately if current transaction | ||||||
|  * We can't immediately destroy them, since they might be in the midst of |  * has not used those connections yet. Otherwise, mark those connections as | ||||||
|  * a transaction, but we'll remake them at the next opportunity. |  * invalid and then make pgfdw_xact_callback() close them at the end of current | ||||||
|  |  * transaction, since they cannot be closed in the midst of the transaction | ||||||
|  |  * using them. Closed connections will be remade at the next opportunity if | ||||||
|  |  * necessary. | ||||||
|  * |  * | ||||||
|  * Although most cache invalidation callbacks blow away all the related stuff |  * Although most cache invalidation callbacks blow away all the related stuff | ||||||
|  * regardless of the given hashvalue, connections are expensive enough that |  * regardless of the given hashvalue, connections are expensive enough that | ||||||
| @@ -974,9 +979,23 @@ pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue) | |||||||
| 			 entry->server_hashvalue == hashvalue) || | 			 entry->server_hashvalue == hashvalue) || | ||||||
| 			(cacheid == USERMAPPINGOID && | 			(cacheid == USERMAPPINGOID && | ||||||
| 			 entry->mapping_hashvalue == hashvalue)) | 			 entry->mapping_hashvalue == hashvalue)) | ||||||
|  | 		{ | ||||||
|  | 			/* | ||||||
|  | 			 * Close the connection immediately if it's not used yet in this | ||||||
|  | 			 * transaction. Otherwise mark it as invalid so that | ||||||
|  | 			 * pgfdw_xact_callback() can close it at the end of this | ||||||
|  | 			 * transaction. | ||||||
|  | 			 */ | ||||||
|  | 			if (entry->xact_depth == 0) | ||||||
|  | 			{ | ||||||
|  | 				elog(DEBUG3, "discarding connection %p", entry->conn); | ||||||
|  | 				disconnect_pg_server(entry); | ||||||
|  | 			} | ||||||
|  | 			else | ||||||
| 				entry->invalidated = true; | 				entry->invalidated = true; | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
| /* | /* | ||||||
|  * Raise an error if the given connection cache entry is marked as being |  * Raise an error if the given connection cache entry is marked as being | ||||||
|   | |||||||
| @@ -8842,3 +8842,21 @@ SELECT b, avg(a), max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700 | |||||||
|  |  | ||||||
| -- Clean-up | -- Clean-up | ||||||
| RESET enable_partitionwise_aggregate; | RESET enable_partitionwise_aggregate; | ||||||
|  | -- =================================================================== | ||||||
|  | -- test connection invalidation cases | ||||||
|  | -- =================================================================== | ||||||
|  | -- This test case is for closing the connection in pgfdw_xact_callback | ||||||
|  | BEGIN; | ||||||
|  | -- Connection xact depth becomes 1 i.e. the connection is in midst of the xact. | ||||||
|  | SELECT 1 FROM ft1 LIMIT 1; | ||||||
|  |  ?column?  | ||||||
|  | ---------- | ||||||
|  |         1 | ||||||
|  | (1 row) | ||||||
|  |  | ||||||
|  | -- Connection is not closed at the end of the alter statement in | ||||||
|  | -- pgfdw_inval_callback. That's because the connection is in midst of this | ||||||
|  | -- xact, it is just marked as invalid. | ||||||
|  | ALTER SERVER loopback OPTIONS (ADD use_remote_estimate 'off'); | ||||||
|  | -- The invalid connection gets closed in pgfdw_xact_callback during commit. | ||||||
|  | COMMIT; | ||||||
|   | |||||||
| @@ -2516,3 +2516,17 @@ SELECT b, avg(a), max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700 | |||||||
|  |  | ||||||
| -- Clean-up | -- Clean-up | ||||||
| RESET enable_partitionwise_aggregate; | RESET enable_partitionwise_aggregate; | ||||||
|  |  | ||||||
|  | -- =================================================================== | ||||||
|  | -- test connection invalidation cases | ||||||
|  | -- =================================================================== | ||||||
|  | -- This test case is for closing the connection in pgfdw_xact_callback | ||||||
|  | BEGIN; | ||||||
|  | -- Connection xact depth becomes 1 i.e. the connection is in midst of the xact. | ||||||
|  | SELECT 1 FROM ft1 LIMIT 1; | ||||||
|  | -- Connection is not closed at the end of the alter statement in | ||||||
|  | -- pgfdw_inval_callback. That's because the connection is in midst of this | ||||||
|  | -- xact, it is just marked as invalid. | ||||||
|  | ALTER SERVER loopback OPTIONS (ADD use_remote_estimate 'off'); | ||||||
|  | -- The invalid connection gets closed in pgfdw_xact_callback during commit. | ||||||
|  | COMMIT; | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user