mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-22 14:32:25 +03:00 
			
		
		
		
	Fix low-probability leaks of PGresult objects in the backend.
We had three occurrences of essentially the same coding pattern wherein we tried to retrieve a query result from a libpq connection without blocking. In the case where PQconsumeInput failed (typically indicating a lost connection), all three loops simply gave up and returned, forgetting to clear any previously-collected PGresult object. Since those are malloc'd not palloc'd, the oversight results in a process-lifespan memory leak. One instance, in libpqwalreceiver, is of little significance because the walreceiver process would just quit anyway if its connection fails. But we might as well fix it. The other two instances, in postgres_fdw, are somewhat more worrisome because at least in principle the scenario could be repeated, allowing the amount of memory leaked to build up to something worth worrying about. Moreover, in these cases the loops contain CHECK_FOR_INTERRUPTS calls, as well as other calls that could potentially elog(ERROR), providing another way to exit without having cleared the PGresult. Here we need to add PG_TRY logic similar to what exists in quite a few other places in postgres_fdw. Coverity noted the libpqwalreceiver bug; I found the other two cases by checking all calls of PQconsumeInput. Back-patch to all supported versions as appropriate (9.2 lacks postgres_fdw, so this is really quite unexciting for that branch). Discussion: https://postgr.es/m/22620.1497486981@sss.pgh.pa.us
This commit is contained in:
		| @@ -495,7 +495,7 @@ pgfdw_exec_query(PGconn *conn, const char *query) | ||||
|  * | ||||
|  * This function offers quick responsiveness by checking for any interruptions. | ||||
|  * | ||||
|  * This function emulates the PQexec()'s behavior of returning the last result | ||||
|  * This function emulates PQexec()'s behavior of returning the last result | ||||
|  * when there are many. | ||||
|  * | ||||
|  * Caller is responsible for the error handling on the result. | ||||
| @@ -503,40 +503,50 @@ pgfdw_exec_query(PGconn *conn, const char *query) | ||||
| PGresult * | ||||
| pgfdw_get_result(PGconn *conn, const char *query) | ||||
| { | ||||
| 	PGresult   *last_res = NULL; | ||||
| 	PGresult   *volatile last_res = NULL; | ||||
|  | ||||
| 	for (;;) | ||||
| 	/* In what follows, do not leak any PGresults on an error. */ | ||||
| 	PG_TRY(); | ||||
| 	{ | ||||
| 		PGresult *res; | ||||
|  | ||||
| 		while (PQisBusy(conn)) | ||||
| 		for (;;) | ||||
| 		{ | ||||
| 			int		wc; | ||||
| 			PGresult   *res; | ||||
|  | ||||
| 			/* Sleep until there's something to do */ | ||||
| 			wc = WaitLatchOrSocket(MyLatch, | ||||
| 								   WL_LATCH_SET | WL_SOCKET_READABLE, | ||||
| 								   PQsocket(conn), | ||||
| 								   -1L); | ||||
| 			ResetLatch(MyLatch); | ||||
|  | ||||
| 			CHECK_FOR_INTERRUPTS(); | ||||
|  | ||||
| 			/* Data available in socket */ | ||||
| 			if (wc & WL_SOCKET_READABLE) | ||||
| 			while (PQisBusy(conn)) | ||||
| 			{ | ||||
| 				if (!PQconsumeInput(conn)) | ||||
| 					pgfdw_report_error(ERROR, NULL, conn, false, query); | ||||
| 				int			wc; | ||||
|  | ||||
| 				/* Sleep until there's something to do */ | ||||
| 				wc = WaitLatchOrSocket(MyLatch, | ||||
| 									   WL_LATCH_SET | WL_SOCKET_READABLE, | ||||
| 									   PQsocket(conn), | ||||
| 									   -1L); | ||||
| 				ResetLatch(MyLatch); | ||||
|  | ||||
| 				CHECK_FOR_INTERRUPTS(); | ||||
|  | ||||
| 				/* Data available in socket? */ | ||||
| 				if (wc & WL_SOCKET_READABLE) | ||||
| 				{ | ||||
| 					if (!PQconsumeInput(conn)) | ||||
| 						pgfdw_report_error(ERROR, NULL, conn, false, query); | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| 			res = PQgetResult(conn); | ||||
| 			if (res == NULL) | ||||
| 				break;			/* query is complete */ | ||||
|  | ||||
| 			PQclear(last_res); | ||||
| 			last_res = res; | ||||
| 		} | ||||
|  | ||||
| 		res = PQgetResult(conn); | ||||
| 		if (res == NULL) | ||||
| 			break;				/* query is complete */ | ||||
|  | ||||
| 		PQclear(last_res); | ||||
| 		last_res = res; | ||||
| 	} | ||||
| 	PG_CATCH(); | ||||
| 	{ | ||||
| 		PQclear(last_res); | ||||
| 		PG_RE_THROW(); | ||||
| 	} | ||||
| 	PG_END_TRY(); | ||||
|  | ||||
| 	return last_res; | ||||
| } | ||||
| @@ -1007,6 +1017,7 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors) | ||||
| 		pgfdw_report_error(WARNING, result, conn, true, query); | ||||
| 		return ignore_errors; | ||||
| 	} | ||||
| 	PQclear(result); | ||||
|  | ||||
| 	return true; | ||||
| } | ||||
| @@ -1029,56 +1040,75 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors) | ||||
| static bool | ||||
| pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result) | ||||
| { | ||||
| 	PGresult   *last_res = NULL; | ||||
| 	volatile bool timed_out = false; | ||||
| 	PGresult   *volatile last_res = NULL; | ||||
|  | ||||
| 	for (;;) | ||||
| 	/* In what follows, do not leak any PGresults on an error. */ | ||||
| 	PG_TRY(); | ||||
| 	{ | ||||
| 		PGresult   *res; | ||||
|  | ||||
| 		while (PQisBusy(conn)) | ||||
| 		for (;;) | ||||
| 		{ | ||||
| 			int			wc; | ||||
| 			TimestampTz now = GetCurrentTimestamp(); | ||||
| 			long		secs; | ||||
| 			int			microsecs; | ||||
| 			long		cur_timeout; | ||||
| 			PGresult   *res; | ||||
|  | ||||
| 			/* If timeout has expired, give up, else get sleep time. */ | ||||
| 			if (now >= endtime) | ||||
| 				return true; | ||||
| 			TimestampDifference(now, endtime, &secs, µsecs); | ||||
|  | ||||
| 			/* To protect against clock skew, limit sleep to one minute. */ | ||||
| 			cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs); | ||||
|  | ||||
| 			/* Sleep until there's something to do */ | ||||
| 			wc = WaitLatchOrSocket(MyLatch, | ||||
| 							  WL_LATCH_SET | WL_SOCKET_READABLE | WL_TIMEOUT, | ||||
| 								   PQsocket(conn), | ||||
| 								   cur_timeout); | ||||
| 			ResetLatch(MyLatch); | ||||
|  | ||||
| 			CHECK_FOR_INTERRUPTS(); | ||||
|  | ||||
| 			/* Data available in socket */ | ||||
| 			if (wc & WL_SOCKET_READABLE) | ||||
| 			while (PQisBusy(conn)) | ||||
| 			{ | ||||
| 				if (!PQconsumeInput(conn)) | ||||
| 				int			wc; | ||||
| 				TimestampTz now = GetCurrentTimestamp(); | ||||
| 				long		secs; | ||||
| 				int			microsecs; | ||||
| 				long		cur_timeout; | ||||
|  | ||||
| 				/* If timeout has expired, give up, else get sleep time. */ | ||||
| 				if (now >= endtime) | ||||
| 				{ | ||||
| 					*result = NULL; | ||||
| 					return false; | ||||
| 					timed_out = true; | ||||
| 					goto exit; | ||||
| 				} | ||||
| 				TimestampDifference(now, endtime, &secs, µsecs); | ||||
|  | ||||
| 				/* To protect against clock skew, limit sleep to one minute. */ | ||||
| 				cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs); | ||||
|  | ||||
| 				/* Sleep until there's something to do */ | ||||
| 				wc = WaitLatchOrSocket(MyLatch, | ||||
| 							  WL_LATCH_SET | WL_SOCKET_READABLE | WL_TIMEOUT, | ||||
| 									   PQsocket(conn), | ||||
| 									   cur_timeout); | ||||
| 				ResetLatch(MyLatch); | ||||
|  | ||||
| 				CHECK_FOR_INTERRUPTS(); | ||||
|  | ||||
| 				/* Data available in socket? */ | ||||
| 				if (wc & WL_SOCKET_READABLE) | ||||
| 				{ | ||||
| 					if (!PQconsumeInput(conn)) | ||||
| 					{ | ||||
| 						/* connection trouble; treat the same as a timeout */ | ||||
| 						timed_out = true; | ||||
| 						goto exit; | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| 			res = PQgetResult(conn); | ||||
| 			if (res == NULL) | ||||
| 				break;			/* query is complete */ | ||||
|  | ||||
| 			PQclear(last_res); | ||||
| 			last_res = res; | ||||
| 		} | ||||
|  | ||||
| 		res = PQgetResult(conn); | ||||
| 		if (res == NULL) | ||||
| 			break;				/* query is complete */ | ||||
|  | ||||
| 		PQclear(last_res); | ||||
| 		last_res = res; | ||||
| exit:	; | ||||
| 	} | ||||
| 	PG_CATCH(); | ||||
| 	{ | ||||
| 		PQclear(last_res); | ||||
| 		PG_RE_THROW(); | ||||
| 	} | ||||
| 	PG_END_TRY(); | ||||
|  | ||||
| 	*result = last_res; | ||||
| 	return false; | ||||
| 	if (timed_out) | ||||
| 		PQclear(last_res); | ||||
| 	else | ||||
| 		*result = last_res; | ||||
| 	return timed_out; | ||||
| } | ||||
|   | ||||
| @@ -426,14 +426,20 @@ libpqrcv_PQexec(const char *query) | ||||
| 			 */ | ||||
| 			if (!libpq_select(-1)) | ||||
| 				continue;		/* interrupted */ | ||||
|  | ||||
| 			/* Consume whatever data is available from the socket */ | ||||
| 			if (PQconsumeInput(streamConn) == 0) | ||||
| 				return NULL;	/* trouble */ | ||||
| 			{ | ||||
| 				/* trouble; drop whatever we had and return NULL */ | ||||
| 				PQclear(lastResult); | ||||
| 				return NULL; | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		/* | ||||
| 		 * Emulate the PQexec()'s behavior of returning the last result when | ||||
| 		 * there are many. Since walsender will never generate multiple | ||||
| 		 * results, we skip the concatenation of error messages. | ||||
| 		 * Emulate PQexec()'s behavior of returning the last result when there | ||||
| 		 * are many.  Since walsender will never generate multiple results, we | ||||
| 		 * skip the concatenation of error messages. | ||||
| 		 */ | ||||
| 		result = PQgetResult(streamConn); | ||||
| 		if (result == NULL) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user