From c9ba5d732b23ae25d7603b7b19afbca3a75e9fd6 Mon Sep 17 00:00:00 2001 From: Evan Nelson Date: Wed, 3 Apr 2024 13:39:41 -0700 Subject: [PATCH] Support minimum connection pool size (#8287) This PR adds support for configuring the connection pool to proactively create connections to specified hosts. This is useful for "prewarming" the connection pool before it starts being used. The main public API change is adding `ConnectionPool.setPolicy(Address, AddressPolicy)`. A policy specifies how many _concurrent streams_ should be supported by the pool (which might be satisfied by a single http/2 connection or by N http/1.1 connections). The main internal change is adding a new task queue to `RealConnectionPool` which checks to see if it needs to create new connections to satisfy existing policies. This task is enqueued any time a policy changes, a connection is closed, or an http/2 connection's settings decrease the number of concurrent streams supported. --- .../main/kotlin/okhttp3/TestValueFactory.kt | 23 +++ okhttp/api/okhttp.api | 11 ++ .../src/main/kotlin/okhttp3/ConnectionPool.kt | 61 ++++++++ .../src/main/kotlin/okhttp3/OkHttpClient.kt | 51 ++++++- .../connection/ForceConnectRoutePlanner.kt | 23 +++ .../internal/connection/PoolConnectionUser.kt | 118 +++++++++++++++ .../okhttp3/internal/connection/RealCall.kt | 33 +---- .../internal/connection/RealConnection.kt | 12 +- .../internal/connection/RealConnectionPool.kt | 139 ++++++++++++++++-- .../internal/connection/RealRoutePlanner.kt | 2 +- .../src/test/java/okhttp3/FakeRoutePlanner.kt | 15 +- .../internal/connection/ConnectionPoolTest.kt | 52 +++++++ 12 files changed, 490 insertions(+), 50 deletions(-) create mode 100644 okhttp/src/main/kotlin/okhttp3/internal/connection/ForceConnectRoutePlanner.kt create mode 100644 okhttp/src/main/kotlin/okhttp3/internal/connection/PoolConnectionUser.kt diff --git a/okhttp-testing-support/src/main/kotlin/okhttp3/TestValueFactory.kt b/okhttp-testing-support/src/main/kotlin/okhttp3/TestValueFactory.kt index 10404f7dd..bbc348fa8 100644 --- a/okhttp-testing-support/src/main/kotlin/okhttp3/TestValueFactory.kt +++ b/okhttp-testing-support/src/main/kotlin/okhttp3/TestValueFactory.kt @@ -35,10 +35,13 @@ import okhttp3.internal.RecordingOkAuthenticator import okhttp3.internal.concurrent.TaskFaker import okhttp3.internal.concurrent.TaskRunner import okhttp3.internal.connection.CallConnectionUser +import okhttp3.internal.connection.FastFallbackExchangeFinder import okhttp3.internal.connection.RealCall import okhttp3.internal.connection.RealConnection import okhttp3.internal.connection.RealConnectionPool import okhttp3.internal.connection.RealRoutePlanner +import okhttp3.internal.connection.RouteDatabase +import okhttp3.internal.connection.RoutePlanner import okhttp3.internal.http.RealInterceptorChain import okhttp3.internal.http.RecordingProxySelector import okhttp3.tls.HandshakeCertificates @@ -97,6 +100,7 @@ class TestValueFactory : Closeable { fun newConnectionPool( taskRunner: TaskRunner = this.taskRunner, maxIdleConnections: Int = Int.MAX_VALUE, + routePlanner: RoutePlanner? = null, ): RealConnectionPool { return RealConnectionPool( taskRunner = taskRunner, @@ -104,6 +108,25 @@ class TestValueFactory : Closeable { keepAliveDuration = 100L, timeUnit = TimeUnit.NANOSECONDS, connectionListener = ConnectionListener.NONE, + exchangeFinderFactory = { pool, address, user -> + FastFallbackExchangeFinder( + routePlanner ?: RealRoutePlanner( + taskRunner = taskRunner, + connectionPool = pool, + readTimeoutMillis = 10_000, + writeTimeoutMillis = 10_000, + socketConnectTimeoutMillis = 10_000, + socketReadTimeoutMillis = 10_000, + pingIntervalMillis = 10_000, + retryOnConnectionFailure = false, + fastFallback = true, + address = address, + routeDatabase = RouteDatabase(), + connectionUser = user, + ), + taskRunner, + ) + }, ) } diff --git a/okhttp/api/okhttp.api b/okhttp/api/okhttp.api index ce3ecbc72..6eb3024b6 100644 --- a/okhttp/api/okhttp.api +++ b/okhttp/api/okhttp.api @@ -384,6 +384,16 @@ public final class okhttp3/ConnectionPool { public final fun connectionCount ()I public final fun evictAll ()V public final fun idleConnectionCount ()I + public final fun setPolicy (Lokhttp3/Address;Lokhttp3/ConnectionPool$AddressPolicy;)V +} + +public final class okhttp3/ConnectionPool$AddressPolicy { + public final field backoffDelayMillis J + public final field backoffJitterMillis I + public final field minimumConcurrentCalls I + public fun ()V + public fun (IJI)V + public synthetic fun (IJIILkotlin/jvm/internal/DefaultConstructorMarker;)V } public final class okhttp3/ConnectionSpec { @@ -913,6 +923,7 @@ public class okhttp3/OkHttpClient : okhttp3/Call$Factory, okhttp3/WebSocket$Fact public final fun -deprecated_sslSocketFactory ()Ljavax/net/ssl/SSLSocketFactory; public final fun -deprecated_writeTimeoutMillis ()I public fun ()V + public final fun address (Lokhttp3/HttpUrl;)Lokhttp3/Address; public final fun authenticator ()Lokhttp3/Authenticator; public final fun cache ()Lokhttp3/Cache; public final fun callTimeoutMillis ()I diff --git a/okhttp/src/main/kotlin/okhttp3/ConnectionPool.kt b/okhttp/src/main/kotlin/okhttp3/ConnectionPool.kt index 050d9f945..34af096c8 100644 --- a/okhttp/src/main/kotlin/okhttp3/ConnectionPool.kt +++ b/okhttp/src/main/kotlin/okhttp3/ConnectionPool.kt @@ -18,7 +18,11 @@ package okhttp3 import java.util.concurrent.TimeUnit import okhttp3.internal.concurrent.TaskRunner +import okhttp3.internal.connection.FastFallbackExchangeFinder +import okhttp3.internal.connection.ForceConnectRoutePlanner import okhttp3.internal.connection.RealConnectionPool +import okhttp3.internal.connection.RealRoutePlanner +import okhttp3.internal.connection.RouteDatabase /** * Manages reuse of HTTP and HTTP/2 connections for reduced network latency. HTTP requests that @@ -39,6 +43,14 @@ class ConnectionPool internal constructor( timeUnit: TimeUnit = TimeUnit.MINUTES, taskRunner: TaskRunner = TaskRunner.INSTANCE, connectionListener: ConnectionListener = ConnectionListener.NONE, + readTimeoutMillis: Int = 10_000, + writeTimeoutMillis: Int = 10_000, + socketConnectTimeoutMillis: Int = 10_000, + socketReadTimeoutMillis: Int = 10_000, + pingIntervalMillis: Int = 10_000, + retryOnConnectionFailure: Boolean = true, + fastFallback: Boolean = true, + routeDatabase: RouteDatabase = RouteDatabase(), ) : this( RealConnectionPool( taskRunner = taskRunner, @@ -46,6 +58,27 @@ class ConnectionPool internal constructor( keepAliveDuration = keepAliveDuration, timeUnit = timeUnit, connectionListener = connectionListener, + exchangeFinderFactory = { pool, address, user -> + FastFallbackExchangeFinder( + ForceConnectRoutePlanner( + RealRoutePlanner( + taskRunner = taskRunner, + connectionPool = pool, + readTimeoutMillis = readTimeoutMillis, + writeTimeoutMillis = writeTimeoutMillis, + socketConnectTimeoutMillis = socketConnectTimeoutMillis, + socketReadTimeoutMillis = socketReadTimeoutMillis, + pingIntervalMillis = pingIntervalMillis, + retryOnConnectionFailure = retryOnConnectionFailure, + fastFallback = fastFallback, + address = address, + routeDatabase = routeDatabase, + connectionUser = user, + ), + ), + taskRunner, + ) + }, ), ) @@ -92,4 +125,32 @@ class ConnectionPool internal constructor( fun evictAll() { delegate.evictAll() } + + /** + * Sets a policy that applies to [address]. + * Overwrites any existing policy for that address. + */ + @ExperimentalOkHttpApi + fun setPolicy( + address: Address, + policy: AddressPolicy, + ) { + delegate.setPolicy(address, policy) + } + + /** + * A policy for how the pool should treat a specific address. + */ + class AddressPolicy( + /** + * How many concurrent calls should be possible to make at any time. + * The pool will routinely try to pre-emptively open connections to satisfy this minimum. + * Connections will still be closed if they idle beyond the keep-alive but will be replaced. + */ + @JvmField val minimumConcurrentCalls: Int = 0, + /** How long to wait to retry pre-emptive connection attempts that fail. */ + @JvmField val backoffDelayMillis: Long = 60 * 1000, + /** How much jitter to introduce in connection retry backoff delays */ + @JvmField val backoffJitterMillis: Int = 100, + ) } diff --git a/okhttp/src/main/kotlin/okhttp3/OkHttpClient.kt b/okhttp/src/main/kotlin/okhttp3/OkHttpClient.kt index 7dc77571c..6aabffe02 100644 --- a/okhttp/src/main/kotlin/okhttp3/OkHttpClient.kt +++ b/okhttp/src/main/kotlin/okhttp3/OkHttpClient.kt @@ -136,9 +136,6 @@ open class OkHttpClient internal constructor( @get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher - @get:JvmName("connectionPool") - val connectionPool: ConnectionPool = builder.connectionPool - /** * Returns an immutable list of interceptors that observe the full span of each call: from before * the connection is established (if any) until after the response source is selected (either the @@ -269,6 +266,22 @@ open class OkHttpClient internal constructor( internal val routeDatabase: RouteDatabase = builder.routeDatabase ?: RouteDatabase() internal val taskRunner: TaskRunner = builder.taskRunner ?: TaskRunner.INSTANCE + @get:JvmName("connectionPool") + val connectionPool: ConnectionPool = + builder.connectionPool ?: ConnectionPool( + readTimeoutMillis = readTimeoutMillis, + writeTimeoutMillis = writeTimeoutMillis, + socketConnectTimeoutMillis = connectTimeoutMillis, + socketReadTimeoutMillis = readTimeoutMillis, + pingIntervalMillis = pingIntervalMillis, + retryOnConnectionFailure = retryOnConnectionFailure, + fastFallback = fastFallback, + routeDatabase = routeDatabase, + ).also { + // Cache the pool in the builder so that it will be shared with other clients + builder.connectionPool = it + } + constructor() : this(Builder()) init { @@ -296,6 +309,36 @@ open class OkHttpClient internal constructor( verifyClientState() } + /** + * Creates an [Address] of out of the provided [HttpUrl] + * that uses this client’s DNS, TLS, and proxy configuration. + */ + fun address(url: HttpUrl): Address { + var useSslSocketFactory: SSLSocketFactory? = null + var useHostnameVerifier: HostnameVerifier? = null + var useCertificatePinner: CertificatePinner? = null + if (url.isHttps) { + useSslSocketFactory = sslSocketFactory + useHostnameVerifier = hostnameVerifier + useCertificatePinner = certificatePinner + } + + return Address( + uriHost = url.host, + uriPort = url.port, + dns = dns, + socketFactory = socketFactory, + sslSocketFactory = useSslSocketFactory, + hostnameVerifier = useHostnameVerifier, + certificatePinner = useCertificatePinner, + proxyAuthenticator = proxyAuthenticator, + proxy = proxy, + protocols = protocols, + connectionSpecs = connectionSpecs, + proxySelector = proxySelector, + ) + } + private fun verifyClientState() { check(null !in (interceptors as List)) { "Null interceptor: $interceptors" @@ -552,7 +595,7 @@ open class OkHttpClient internal constructor( class Builder() { internal var dispatcher: Dispatcher = Dispatcher() - internal var connectionPool: ConnectionPool = ConnectionPool() + internal var connectionPool: ConnectionPool? = null internal val interceptors: MutableList = mutableListOf() internal val networkInterceptors: MutableList = mutableListOf() internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory() diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/ForceConnectRoutePlanner.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/ForceConnectRoutePlanner.kt new file mode 100644 index 000000000..927ab7037 --- /dev/null +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/ForceConnectRoutePlanner.kt @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2024 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.internal.connection + +/** + * A RoutePlanner that will always establish a new connection, ignoring any connection pooling + */ +class ForceConnectRoutePlanner(private val delegate: RealRoutePlanner) : RoutePlanner by delegate { + override fun plan(): RoutePlanner.Plan = delegate.planConnect() // not delegate.plan() +} diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/PoolConnectionUser.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/PoolConnectionUser.kt new file mode 100644 index 000000000..94b567768 --- /dev/null +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/PoolConnectionUser.kt @@ -0,0 +1,118 @@ +/* + * Copyright (C) 2024 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.internal.connection + +import java.io.IOException +import java.net.InetAddress +import java.net.Proxy +import java.net.Socket +import okhttp3.Connection +import okhttp3.Handshake +import okhttp3.HttpUrl +import okhttp3.Protocol +import okhttp3.Route + +/** + * A user that is a connection pool creating connections in the background + * without an intent to immediately use them. + */ +object PoolConnectionUser : ConnectionUser { + override fun addPlanToCancel(connectPlan: ConnectPlan) { + } + + override fun removePlanToCancel(connectPlan: ConnectPlan) { + } + + override fun updateRouteDatabaseAfterSuccess(route: Route) { + } + + override fun connectStart(route: Route) { + } + + override fun secureConnectStart() { + } + + override fun secureConnectEnd(handshake: Handshake?) { + } + + override fun callConnectEnd( + route: Route, + protocol: Protocol?, + ) { + } + + override fun connectionConnectEnd( + connection: Connection, + route: Route, + ) { + } + + override fun connectFailed( + route: Route, + protocol: Protocol?, + e: IOException, + ) { + } + + override fun connectionAcquired(connection: Connection) { + } + + override fun acquireConnectionNoEvents(connection: RealConnection) { + } + + override fun releaseConnectionNoEvents(): Socket? { + return null + } + + override fun connectionReleased(connection: Connection) { + } + + override fun connectionConnectionAcquired(connection: RealConnection) { + } + + override fun connectionConnectionReleased(connection: RealConnection) { + } + + override fun connectionConnectionClosed(connection: RealConnection) { + } + + override fun noNewExchanges(connection: RealConnection) { + } + + override fun doExtensiveHealthChecks(): Boolean = false + + override fun isCanceled(): Boolean = false + + override fun candidateConnection(): RealConnection? = null + + override fun proxySelectStart(url: HttpUrl) { + } + + override fun proxySelectEnd( + url: HttpUrl, + proxies: List, + ) { + } + + override fun dnsStart(socketHost: String) { + } + + override fun dnsEnd( + socketHost: String, + result: List, + ) { + } +} diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealCall.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealCall.kt index b35bfe485..2f7be64ba 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealCall.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealCall.kt @@ -25,14 +25,9 @@ import java.util.concurrent.RejectedExecutionException import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger -import javax.net.ssl.HostnameVerifier -import javax.net.ssl.SSLSocketFactory -import okhttp3.Address import okhttp3.Call import okhttp3.Callback -import okhttp3.CertificatePinner import okhttp3.EventListener -import okhttp3.HttpUrl import okhttp3.Interceptor import okhttp3.OkHttpClient import okhttp3.Request @@ -256,7 +251,7 @@ class RealCall( pingIntervalMillis = client.pingIntervalMillis, retryOnConnectionFailure = client.retryOnConnectionFailure, fastFallback = client.fastFallback, - address = createAddress(request.url), + address = client.address(request.url), connectionUser = CallConnectionUser(this, connectionPool.connectionListener, chain), routeDatabase = client.routeDatabase, ) @@ -459,32 +454,6 @@ class RealCall( interceptorScopedExchange = null } - private fun createAddress(url: HttpUrl): Address { - var sslSocketFactory: SSLSocketFactory? = null - var hostnameVerifier: HostnameVerifier? = null - var certificatePinner: CertificatePinner? = null - if (url.isHttps) { - sslSocketFactory = client.sslSocketFactory - hostnameVerifier = client.hostnameVerifier - certificatePinner = client.certificatePinner - } - - return Address( - uriHost = url.host, - uriPort = url.port, - dns = client.dns, - socketFactory = client.socketFactory, - sslSocketFactory = sslSocketFactory, - hostnameVerifier = hostnameVerifier, - certificatePinner = certificatePinner, - proxyAuthenticator = client.proxyAuthenticator, - proxy = client.proxy, - protocols = client.protocols, - connectionSpecs = client.connectionSpecs, - proxySelector = client.proxySelector, - ) - } - fun retryAfterFailure(): Boolean { return exchange?.hasFailure == true && exchangeFinder!!.routePlanner.hasNext(exchange?.connection) diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt index b9b63c9cd..b19c52492 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt @@ -111,7 +111,8 @@ class RealConnection( * The maximum number of concurrent streams that can be carried by this connection. If * `allocations.size() < allocationLimit` then new streams can be created on this connection. */ - private var allocationLimit = 1 + internal var allocationLimit = 1 + private set /** Current calls carried by this connection. */ val calls = mutableListOf>() @@ -344,7 +345,16 @@ class RealConnection( connection: Http2Connection, settings: Settings, ) { + val oldLimit = allocationLimit allocationLimit = settings.getMaxConcurrentStreams() + + if (allocationLimit < oldLimit) { + // We might need new connections to keep policies satisfied + connectionPool.scheduleConnectionOpener() + } else if (allocationLimit > oldLimit) { + // We might no longer need some connections + connectionPool.scheduleConnectionCloser() + } } override fun handshake(): Handshake? = handshake diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnectionPool.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnectionPool.kt index 8f2f345fe..9c57a44e3 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnectionPool.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnectionPool.kt @@ -18,6 +18,7 @@ package okhttp3.internal.connection import java.net.Socket import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.TimeUnit import okhttp3.Address import okhttp3.ConnectionListener @@ -33,21 +34,38 @@ import okhttp3.internal.okHttpName import okhttp3.internal.platform.Platform class RealConnectionPool( - taskRunner: TaskRunner, - /** The maximum number of idle connections for each address. */ + private val taskRunner: TaskRunner, + /** + * The maximum number of idle connections across all addresses. + * Connections needed to satisfy a [ConnectionPool.AddressPolicy] are not considered idle. + */ private val maxIdleConnections: Int, keepAliveDuration: Long, timeUnit: TimeUnit, internal val connectionListener: ConnectionListener, + private val exchangeFinderFactory: (RealConnectionPool, Address, ConnectionUser) -> ExchangeFinder, ) { - private val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration) + internal val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration) + + // guarded by [this] + private var policies: Map = mapOf() + private val user = PoolConnectionUser private val cleanupQueue: TaskQueue = taskRunner.newQueue() private val cleanupTask = - object : Task("$okHttpName ConnectionPool") { + object : Task("$okHttpName ConnectionPool connection closer") { override fun runOnce(): Long = cleanup(System.nanoTime()) } + private fun MinimumConnectionState.schedule() { + val state = this + queue.schedule( + object : Task("$okHttpName ConnectionPool connection opener") { + override fun runOnce(): Long = ensureMinimumConnections(state) + }, + ) + } + /** * Holding the lock of the connection being added or removed when mutating this, and check its * [RealConnection.noNewExchanges] property. This defends against races where a connection is @@ -131,7 +149,7 @@ class RealConnectionPool( connections.add(connection) // connection.queueEvent { connectionListener.connectEnd(connection) } - cleanupQueue.schedule(cleanupTask) + scheduleConnectionCloser() } /** @@ -145,9 +163,10 @@ class RealConnectionPool( connection.noNewExchanges = true connections.remove(connection) if (connections.isEmpty()) cleanupQueue.cancelAll() + scheduleConnectionOpener() true } else { - cleanupQueue.schedule(cleanupTask) + scheduleConnectionCloser() false } } @@ -173,6 +192,7 @@ class RealConnectionPool( } if (connections.isEmpty()) cleanupQueue.cancelAll() + scheduleConnectionOpener() } /** @@ -187,21 +207,34 @@ class RealConnectionPool( var idleConnectionCount = 0 var longestIdleConnection: RealConnection? = null var longestIdleDurationNs = Long.MIN_VALUE + var policyAffected: MinimumConnectionState? = null + policies + .forEach { it.value.unsatisfiedCountCleanupTask = it.value.policy.minimumConcurrentCalls } // Find either a connection to evict, or the time that the next eviction is due. for (connection in connections) { synchronized(connection) { - // If the connection is in use, keep searching. + val satisfiablePolicy = + policies.entries.firstOrNull { + it.value.unsatisfiedCountCleanupTask > 0 && connection.isEligible(it.key, null) + }?.value + val idleDurationNs = now - connection.idleAtNs + if (pruneAndGetAllocationCount(connection, now) > 0) { + // If the connection is in use, keep searching. + inUseConnectionCount++ + } else if (satisfiablePolicy != null && idleDurationNs < this.keepAliveDurationNs) { + // If the connection hasn't expired and helps satisfy a policy, keep searching. + satisfiablePolicy.unsatisfiedCountCleanupTask -= connection.allocationLimit inUseConnectionCount++ } else { idleConnectionCount++ // If the connection is ready to be evicted, we're done. - val idleDurationNs = now - connection.idleAtNs if (idleDurationNs > longestIdleDurationNs) { longestIdleDurationNs = idleDurationNs longestIdleConnection = connection + policyAffected = satisfiablePolicy } else { Unit } @@ -212,7 +245,7 @@ class RealConnectionPool( when { longestIdleDurationNs >= this.keepAliveDurationNs || idleConnectionCount > this.maxIdleConnections -> { - // We've chosen a connection to evict. Confirm it's still okay to be evict, then close it. + // We've chosen a connection to evict. Confirm it's still okay to be evicted, then close it. val connection = longestIdleConnection!! synchronized(connection) { if (connection.calls.isNotEmpty()) return 0L // No longer idle. @@ -220,6 +253,7 @@ class RealConnectionPool( connection.noNewExchanges = true connections.remove(longestIdleConnection) } + policyAffected?.schedule() connection.socket().closeQuietly() connectionListener.connectionClosed(connection) if (connections.isEmpty()) cleanupQueue.cancelAll() @@ -286,6 +320,93 @@ class RealConnectionPool( return references.size } + /** + * Adds or replaces the policy for [address]. + * This will trigger a background task to start creating connections as needed. + */ + fun setPolicy( + address: Address, + policy: ConnectionPool.AddressPolicy, + ) { + val state = MinimumConnectionState(address, taskRunner.newQueue(), policy) + val oldPolicy: ConnectionPool.AddressPolicy? + synchronized(this) { + oldPolicy = policies[address]?.policy + policies = policies + (address to state) + } + + val newConnectionsNeeded = + policy.minimumConcurrentCalls - (oldPolicy?.minimumConcurrentCalls ?: 0) + if (newConnectionsNeeded > 0) { + state.schedule() + } else if (newConnectionsNeeded < 0) { + scheduleConnectionCloser() + } + } + + fun scheduleConnectionOpener() { + policies.values.forEach { it.schedule() } + } + + fun scheduleConnectionCloser() { + cleanupQueue.schedule(cleanupTask) + } + + /** + * Ensure enough connections open to [address] to satisfy its [ConnectionPool.AddressPolicy]. + * If there are already enough connections, we're done. + * If not, we create one and then schedule the task to run again immediately. + */ + private fun ensureMinimumConnections(state: MinimumConnectionState): Long { + // This policy does not require minimum connections, don't run again + if (state.policy.minimumConcurrentCalls < 1) return -1 + + var unsatisfiedCountMinTask = state.policy.minimumConcurrentCalls + + for (connection in connections) { + synchronized(connection) { + if (connection.isEligible(state.address, null)) { + unsatisfiedCountMinTask -= connection.allocationLimit + } + } + + // The policy was satisfied by existing connections, don't run again + if (unsatisfiedCountMinTask < 1) return -1 + } + + // If we got here then the policy was not satisfied -- open a connection! + try { + val connection = exchangeFinderFactory(this, state.address, user).find() + + // RealRoutePlanner will add the connection to the pool itself, other RoutePlanners may not + // TODO: make all RoutePlanners consistent in this behavior + if (connection !in connections) { + synchronized(connection) { put(connection) } + } + + return 0 // run again immediately to create more connections if needed + } catch (ex: Exception) { + // No need to log, user.connectFailed() will already have been called. Just try again later. + return state.policy.backoffDelayMillis.jitterBy(state.policy.backoffJitterMillis) * 1_000_000 + } + } + + private fun Long.jitterBy(amount: Int): Long { + return this + ThreadLocalRandom.current().nextInt(amount * -1, amount) + } + + class MinimumConnectionState( + val address: Address, + val queue: TaskQueue, + var policy: ConnectionPool.AddressPolicy, + ) { + /** + * This field is only ever accessed by the cleanup task, and it tracks + * how many concurrent calls are already satisfied by existing connections. + */ + var unsatisfiedCountCleanupTask: Int = 0 + } + companion object { fun get(connectionPool: ConnectionPool): RealConnectionPool = connectionPool.delegate } diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealRoutePlanner.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealRoutePlanner.kt index dc81e4bbe..3684a19da 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealRoutePlanner.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealRoutePlanner.kt @@ -129,7 +129,7 @@ class RealRoutePlanner( /** Plans to make a new connection by deciding which route to try next. */ @Throws(IOException::class) - private fun planConnect(): ConnectPlan { + internal fun planConnect(): ConnectPlan { // Use a route from a preceding coalesced connection. val localNextRouteToTry = nextRouteToTry if (localNextRouteToTry != null) { diff --git a/okhttp/src/test/java/okhttp3/FakeRoutePlanner.kt b/okhttp/src/test/java/okhttp3/FakeRoutePlanner.kt index e2a91ef38..e817d2e09 100644 --- a/okhttp/src/test/java/okhttp3/FakeRoutePlanner.kt +++ b/okhttp/src/test/java/okhttp3/FakeRoutePlanner.kt @@ -36,6 +36,8 @@ class FakeRoutePlanner( val events = LinkedBlockingDeque() var canceled = false + var autoGeneratePlans = false + var defaultConnectionIdleAtNanos = Long.MAX_VALUE private var nextPlanId = 0 private var nextPlanIndex = 0 private val plans = mutableListOf() @@ -56,8 +58,10 @@ class FakeRoutePlanner( // Return deferred plans preferentially. These don't require addPlan(). if (deferredPlans.isNotEmpty()) return deferredPlans.removeFirst() as FakePlan + if (nextPlanIndex >= plans.size && autoGeneratePlans) addPlan() + require(nextPlanIndex < plans.size) { - "not enough plans! call addPlan() in the test to set this up" + "not enough plans! call addPlan() or set autoGeneratePlans=true in the test to set this up" } val result = plans[nextPlanIndex++] events += "take plan ${result.id}" @@ -73,7 +77,7 @@ class FakeRoutePlanner( } override fun hasNext(failedConnection: RealConnection?): Boolean { - return deferredPlans.isNotEmpty() || nextPlanIndex < plans.size + return deferredPlans.isNotEmpty() || nextPlanIndex < plans.size || autoGeneratePlans } override fun sameHostAndPort(url: HttpUrl): Boolean { @@ -90,7 +94,12 @@ class FakeRoutePlanner( var planningThrowable: Throwable? = null var canceled = false var connectState = ConnectState.READY - val connection = factory.newConnection(pool, factory.newRoute(address)) + val connection = + factory.newConnection( + pool = pool, + route = factory.newRoute(address), + idleAtNanos = defaultConnectionIdleAtNanos, + ) var retry: FakePlan? = null var retryTaken = false var yieldBeforePlanReturns = false diff --git a/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.kt b/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.kt index 6c5697dc3..1cfa0f774 100644 --- a/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.kt +++ b/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.kt @@ -22,6 +22,7 @@ import assertk.assertions.isFalse import assertk.assertions.isNotEmpty import assertk.assertions.isTrue import okhttp3.ConnectionPool +import okhttp3.FakeRoutePlanner import okhttp3.OkHttpClient import okhttp3.Request import okhttp3.TestUtil.awaitGarbageCollection @@ -41,6 +42,8 @@ class ConnectionPoolTest { private val addressC = factory.newAddress("c") private val routeC1 = factory.newRoute(addressC) + private val routePlanner = FakeRoutePlanner(factory.taskFaker) + @AfterEach fun tearDown() { factory.close() } @@ -192,6 +195,55 @@ class ConnectionPoolTest { assertThat(realTaskRunner.activeQueues()).isEmpty() } + @Test fun connectionPreWarming() { + // TODO this test spins forever due to bugs in TaskFaker.runTasks() + + // routePlanner.autoGeneratePlans = true + // routePlanner.defaultConnectionIdleAtNanos = System.nanoTime() + 1_000_000_000_000 + // val address = routePlanner.address + // val pool = factory.newConnectionPool(routePlanner = routePlanner) + // + // // Connections are created as soon as a policy is set + // setPolicy(pool, address, ConnectionPool.AddressPolicy(2)) + // assertThat(pool.connectionCount()).isEqualTo(2) + // + // // Connections are replaced if they idle out or are evicted from the pool + // evictAllConnections(pool) + // assertThat(pool.connectionCount()).isEqualTo(2) + // forceConnectionsToExpire(pool, routePlanner) + // assertThat(pool.connectionCount()).isEqualTo(2) + // + // // Excess connections aren't removed until they idle out, even if no longer needed + // setPolicy(pool, address, ConnectionPool.AddressPolicy(1)) + // assertThat(pool.connectionCount()).isEqualTo(2) + // forceConnectionsToExpire(pool, routePlanner) + // assertThat(pool.connectionCount()).isEqualTo(1) + + // TODO test that http/2 connections will be opened/closed based on concurrent stream settings + } + + // private fun setPolicy( + // pool: RealConnectionPool, + // address: Address, + // policy: ConnectionPool.AddressPolicy + // ) { + // pool.setPolicy(address, policy) + // factory.taskFaker.runTasks() + // } + // + // private fun evictAllConnections(pool: RealConnectionPool) { + // pool.evictAll() + // assertThat(pool.connectionCount()).isEqualTo(0) + // factory.taskFaker.runTasks() + // } + // + // private fun forceConnectionsToExpire(pool: RealConnectionPool, routePlanner: FakeRoutePlanner) { + // val idleTimeNanos = routePlanner.defaultConnectionIdleAtNanos + pool.keepAliveDurationNs + // repeat(pool.connectionCount()) { pool.cleanup(idleTimeNanos) } + // assertThat(pool.connectionCount()).isEqualTo(0) + // factory.taskFaker.runTasks() + // } + /** Use a helper method so there's no hidden reference remaining on the stack. */ private fun allocateAndLeakAllocation( pool: ConnectionPool,