diff --git a/okhttp-testing-support/src/main/kotlin/okhttp3/TestValueFactory.kt b/okhttp-testing-support/src/main/kotlin/okhttp3/TestValueFactory.kt index 10404f7dd..bbc348fa8 100644 --- a/okhttp-testing-support/src/main/kotlin/okhttp3/TestValueFactory.kt +++ b/okhttp-testing-support/src/main/kotlin/okhttp3/TestValueFactory.kt @@ -35,10 +35,13 @@ import okhttp3.internal.RecordingOkAuthenticator import okhttp3.internal.concurrent.TaskFaker import okhttp3.internal.concurrent.TaskRunner import okhttp3.internal.connection.CallConnectionUser +import okhttp3.internal.connection.FastFallbackExchangeFinder import okhttp3.internal.connection.RealCall import okhttp3.internal.connection.RealConnection import okhttp3.internal.connection.RealConnectionPool import okhttp3.internal.connection.RealRoutePlanner +import okhttp3.internal.connection.RouteDatabase +import okhttp3.internal.connection.RoutePlanner import okhttp3.internal.http.RealInterceptorChain import okhttp3.internal.http.RecordingProxySelector import okhttp3.tls.HandshakeCertificates @@ -97,6 +100,7 @@ class TestValueFactory : Closeable { fun newConnectionPool( taskRunner: TaskRunner = this.taskRunner, maxIdleConnections: Int = Int.MAX_VALUE, + routePlanner: RoutePlanner? = null, ): RealConnectionPool { return RealConnectionPool( taskRunner = taskRunner, @@ -104,6 +108,25 @@ class TestValueFactory : Closeable { keepAliveDuration = 100L, timeUnit = TimeUnit.NANOSECONDS, connectionListener = ConnectionListener.NONE, + exchangeFinderFactory = { pool, address, user -> + FastFallbackExchangeFinder( + routePlanner ?: RealRoutePlanner( + taskRunner = taskRunner, + connectionPool = pool, + readTimeoutMillis = 10_000, + writeTimeoutMillis = 10_000, + socketConnectTimeoutMillis = 10_000, + socketReadTimeoutMillis = 10_000, + pingIntervalMillis = 10_000, + retryOnConnectionFailure = false, + fastFallback = true, + address = address, + routeDatabase = RouteDatabase(), + connectionUser = user, + ), + taskRunner, + ) + }, ) } diff --git a/okhttp/api/okhttp.api b/okhttp/api/okhttp.api index ce3ecbc72..6eb3024b6 100644 --- a/okhttp/api/okhttp.api +++ b/okhttp/api/okhttp.api @@ -384,6 +384,16 @@ public final class okhttp3/ConnectionPool { public final fun connectionCount ()I public final fun evictAll ()V public final fun idleConnectionCount ()I + public final fun setPolicy (Lokhttp3/Address;Lokhttp3/ConnectionPool$AddressPolicy;)V +} + +public final class okhttp3/ConnectionPool$AddressPolicy { + public final field backoffDelayMillis J + public final field backoffJitterMillis I + public final field minimumConcurrentCalls I + public fun ()V + public fun (IJI)V + public synthetic fun (IJIILkotlin/jvm/internal/DefaultConstructorMarker;)V } public final class okhttp3/ConnectionSpec { @@ -913,6 +923,7 @@ public class okhttp3/OkHttpClient : okhttp3/Call$Factory, okhttp3/WebSocket$Fact public final fun -deprecated_sslSocketFactory ()Ljavax/net/ssl/SSLSocketFactory; public final fun -deprecated_writeTimeoutMillis ()I public fun ()V + public final fun address (Lokhttp3/HttpUrl;)Lokhttp3/Address; public final fun authenticator ()Lokhttp3/Authenticator; public final fun cache ()Lokhttp3/Cache; public final fun callTimeoutMillis ()I diff --git a/okhttp/src/main/kotlin/okhttp3/ConnectionPool.kt b/okhttp/src/main/kotlin/okhttp3/ConnectionPool.kt index 050d9f945..34af096c8 100644 --- a/okhttp/src/main/kotlin/okhttp3/ConnectionPool.kt +++ b/okhttp/src/main/kotlin/okhttp3/ConnectionPool.kt @@ -18,7 +18,11 @@ package okhttp3 import java.util.concurrent.TimeUnit import okhttp3.internal.concurrent.TaskRunner +import okhttp3.internal.connection.FastFallbackExchangeFinder +import okhttp3.internal.connection.ForceConnectRoutePlanner import okhttp3.internal.connection.RealConnectionPool +import okhttp3.internal.connection.RealRoutePlanner +import okhttp3.internal.connection.RouteDatabase /** * Manages reuse of HTTP and HTTP/2 connections for reduced network latency. HTTP requests that @@ -39,6 +43,14 @@ class ConnectionPool internal constructor( timeUnit: TimeUnit = TimeUnit.MINUTES, taskRunner: TaskRunner = TaskRunner.INSTANCE, connectionListener: ConnectionListener = ConnectionListener.NONE, + readTimeoutMillis: Int = 10_000, + writeTimeoutMillis: Int = 10_000, + socketConnectTimeoutMillis: Int = 10_000, + socketReadTimeoutMillis: Int = 10_000, + pingIntervalMillis: Int = 10_000, + retryOnConnectionFailure: Boolean = true, + fastFallback: Boolean = true, + routeDatabase: RouteDatabase = RouteDatabase(), ) : this( RealConnectionPool( taskRunner = taskRunner, @@ -46,6 +58,27 @@ class ConnectionPool internal constructor( keepAliveDuration = keepAliveDuration, timeUnit = timeUnit, connectionListener = connectionListener, + exchangeFinderFactory = { pool, address, user -> + FastFallbackExchangeFinder( + ForceConnectRoutePlanner( + RealRoutePlanner( + taskRunner = taskRunner, + connectionPool = pool, + readTimeoutMillis = readTimeoutMillis, + writeTimeoutMillis = writeTimeoutMillis, + socketConnectTimeoutMillis = socketConnectTimeoutMillis, + socketReadTimeoutMillis = socketReadTimeoutMillis, + pingIntervalMillis = pingIntervalMillis, + retryOnConnectionFailure = retryOnConnectionFailure, + fastFallback = fastFallback, + address = address, + routeDatabase = routeDatabase, + connectionUser = user, + ), + ), + taskRunner, + ) + }, ), ) @@ -92,4 +125,32 @@ class ConnectionPool internal constructor( fun evictAll() { delegate.evictAll() } + + /** + * Sets a policy that applies to [address]. + * Overwrites any existing policy for that address. + */ + @ExperimentalOkHttpApi + fun setPolicy( + address: Address, + policy: AddressPolicy, + ) { + delegate.setPolicy(address, policy) + } + + /** + * A policy for how the pool should treat a specific address. + */ + class AddressPolicy( + /** + * How many concurrent calls should be possible to make at any time. + * The pool will routinely try to pre-emptively open connections to satisfy this minimum. + * Connections will still be closed if they idle beyond the keep-alive but will be replaced. + */ + @JvmField val minimumConcurrentCalls: Int = 0, + /** How long to wait to retry pre-emptive connection attempts that fail. */ + @JvmField val backoffDelayMillis: Long = 60 * 1000, + /** How much jitter to introduce in connection retry backoff delays */ + @JvmField val backoffJitterMillis: Int = 100, + ) } diff --git a/okhttp/src/main/kotlin/okhttp3/OkHttpClient.kt b/okhttp/src/main/kotlin/okhttp3/OkHttpClient.kt index 7dc77571c..6aabffe02 100644 --- a/okhttp/src/main/kotlin/okhttp3/OkHttpClient.kt +++ b/okhttp/src/main/kotlin/okhttp3/OkHttpClient.kt @@ -136,9 +136,6 @@ open class OkHttpClient internal constructor( @get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher - @get:JvmName("connectionPool") - val connectionPool: ConnectionPool = builder.connectionPool - /** * Returns an immutable list of interceptors that observe the full span of each call: from before * the connection is established (if any) until after the response source is selected (either the @@ -269,6 +266,22 @@ open class OkHttpClient internal constructor( internal val routeDatabase: RouteDatabase = builder.routeDatabase ?: RouteDatabase() internal val taskRunner: TaskRunner = builder.taskRunner ?: TaskRunner.INSTANCE + @get:JvmName("connectionPool") + val connectionPool: ConnectionPool = + builder.connectionPool ?: ConnectionPool( + readTimeoutMillis = readTimeoutMillis, + writeTimeoutMillis = writeTimeoutMillis, + socketConnectTimeoutMillis = connectTimeoutMillis, + socketReadTimeoutMillis = readTimeoutMillis, + pingIntervalMillis = pingIntervalMillis, + retryOnConnectionFailure = retryOnConnectionFailure, + fastFallback = fastFallback, + routeDatabase = routeDatabase, + ).also { + // Cache the pool in the builder so that it will be shared with other clients + builder.connectionPool = it + } + constructor() : this(Builder()) init { @@ -296,6 +309,36 @@ open class OkHttpClient internal constructor( verifyClientState() } + /** + * Creates an [Address] of out of the provided [HttpUrl] + * that uses this client’s DNS, TLS, and proxy configuration. + */ + fun address(url: HttpUrl): Address { + var useSslSocketFactory: SSLSocketFactory? = null + var useHostnameVerifier: HostnameVerifier? = null + var useCertificatePinner: CertificatePinner? = null + if (url.isHttps) { + useSslSocketFactory = sslSocketFactory + useHostnameVerifier = hostnameVerifier + useCertificatePinner = certificatePinner + } + + return Address( + uriHost = url.host, + uriPort = url.port, + dns = dns, + socketFactory = socketFactory, + sslSocketFactory = useSslSocketFactory, + hostnameVerifier = useHostnameVerifier, + certificatePinner = useCertificatePinner, + proxyAuthenticator = proxyAuthenticator, + proxy = proxy, + protocols = protocols, + connectionSpecs = connectionSpecs, + proxySelector = proxySelector, + ) + } + private fun verifyClientState() { check(null !in (interceptors as List)) { "Null interceptor: $interceptors" @@ -552,7 +595,7 @@ open class OkHttpClient internal constructor( class Builder() { internal var dispatcher: Dispatcher = Dispatcher() - internal var connectionPool: ConnectionPool = ConnectionPool() + internal var connectionPool: ConnectionPool? = null internal val interceptors: MutableList = mutableListOf() internal val networkInterceptors: MutableList = mutableListOf() internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory() diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/ForceConnectRoutePlanner.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/ForceConnectRoutePlanner.kt new file mode 100644 index 000000000..927ab7037 --- /dev/null +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/ForceConnectRoutePlanner.kt @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2024 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.internal.connection + +/** + * A RoutePlanner that will always establish a new connection, ignoring any connection pooling + */ +class ForceConnectRoutePlanner(private val delegate: RealRoutePlanner) : RoutePlanner by delegate { + override fun plan(): RoutePlanner.Plan = delegate.planConnect() // not delegate.plan() +} diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/PoolConnectionUser.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/PoolConnectionUser.kt new file mode 100644 index 000000000..94b567768 --- /dev/null +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/PoolConnectionUser.kt @@ -0,0 +1,118 @@ +/* + * Copyright (C) 2024 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.internal.connection + +import java.io.IOException +import java.net.InetAddress +import java.net.Proxy +import java.net.Socket +import okhttp3.Connection +import okhttp3.Handshake +import okhttp3.HttpUrl +import okhttp3.Protocol +import okhttp3.Route + +/** + * A user that is a connection pool creating connections in the background + * without an intent to immediately use them. + */ +object PoolConnectionUser : ConnectionUser { + override fun addPlanToCancel(connectPlan: ConnectPlan) { + } + + override fun removePlanToCancel(connectPlan: ConnectPlan) { + } + + override fun updateRouteDatabaseAfterSuccess(route: Route) { + } + + override fun connectStart(route: Route) { + } + + override fun secureConnectStart() { + } + + override fun secureConnectEnd(handshake: Handshake?) { + } + + override fun callConnectEnd( + route: Route, + protocol: Protocol?, + ) { + } + + override fun connectionConnectEnd( + connection: Connection, + route: Route, + ) { + } + + override fun connectFailed( + route: Route, + protocol: Protocol?, + e: IOException, + ) { + } + + override fun connectionAcquired(connection: Connection) { + } + + override fun acquireConnectionNoEvents(connection: RealConnection) { + } + + override fun releaseConnectionNoEvents(): Socket? { + return null + } + + override fun connectionReleased(connection: Connection) { + } + + override fun connectionConnectionAcquired(connection: RealConnection) { + } + + override fun connectionConnectionReleased(connection: RealConnection) { + } + + override fun connectionConnectionClosed(connection: RealConnection) { + } + + override fun noNewExchanges(connection: RealConnection) { + } + + override fun doExtensiveHealthChecks(): Boolean = false + + override fun isCanceled(): Boolean = false + + override fun candidateConnection(): RealConnection? = null + + override fun proxySelectStart(url: HttpUrl) { + } + + override fun proxySelectEnd( + url: HttpUrl, + proxies: List, + ) { + } + + override fun dnsStart(socketHost: String) { + } + + override fun dnsEnd( + socketHost: String, + result: List, + ) { + } +} diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealCall.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealCall.kt index b35bfe485..2f7be64ba 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealCall.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealCall.kt @@ -25,14 +25,9 @@ import java.util.concurrent.RejectedExecutionException import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger -import javax.net.ssl.HostnameVerifier -import javax.net.ssl.SSLSocketFactory -import okhttp3.Address import okhttp3.Call import okhttp3.Callback -import okhttp3.CertificatePinner import okhttp3.EventListener -import okhttp3.HttpUrl import okhttp3.Interceptor import okhttp3.OkHttpClient import okhttp3.Request @@ -256,7 +251,7 @@ class RealCall( pingIntervalMillis = client.pingIntervalMillis, retryOnConnectionFailure = client.retryOnConnectionFailure, fastFallback = client.fastFallback, - address = createAddress(request.url), + address = client.address(request.url), connectionUser = CallConnectionUser(this, connectionPool.connectionListener, chain), routeDatabase = client.routeDatabase, ) @@ -459,32 +454,6 @@ class RealCall( interceptorScopedExchange = null } - private fun createAddress(url: HttpUrl): Address { - var sslSocketFactory: SSLSocketFactory? = null - var hostnameVerifier: HostnameVerifier? = null - var certificatePinner: CertificatePinner? = null - if (url.isHttps) { - sslSocketFactory = client.sslSocketFactory - hostnameVerifier = client.hostnameVerifier - certificatePinner = client.certificatePinner - } - - return Address( - uriHost = url.host, - uriPort = url.port, - dns = client.dns, - socketFactory = client.socketFactory, - sslSocketFactory = sslSocketFactory, - hostnameVerifier = hostnameVerifier, - certificatePinner = certificatePinner, - proxyAuthenticator = client.proxyAuthenticator, - proxy = client.proxy, - protocols = client.protocols, - connectionSpecs = client.connectionSpecs, - proxySelector = client.proxySelector, - ) - } - fun retryAfterFailure(): Boolean { return exchange?.hasFailure == true && exchangeFinder!!.routePlanner.hasNext(exchange?.connection) diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt index b9b63c9cd..b19c52492 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt @@ -111,7 +111,8 @@ class RealConnection( * The maximum number of concurrent streams that can be carried by this connection. If * `allocations.size() < allocationLimit` then new streams can be created on this connection. */ - private var allocationLimit = 1 + internal var allocationLimit = 1 + private set /** Current calls carried by this connection. */ val calls = mutableListOf>() @@ -344,7 +345,16 @@ class RealConnection( connection: Http2Connection, settings: Settings, ) { + val oldLimit = allocationLimit allocationLimit = settings.getMaxConcurrentStreams() + + if (allocationLimit < oldLimit) { + // We might need new connections to keep policies satisfied + connectionPool.scheduleConnectionOpener() + } else if (allocationLimit > oldLimit) { + // We might no longer need some connections + connectionPool.scheduleConnectionCloser() + } } override fun handshake(): Handshake? = handshake diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnectionPool.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnectionPool.kt index 8f2f345fe..9c57a44e3 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnectionPool.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnectionPool.kt @@ -18,6 +18,7 @@ package okhttp3.internal.connection import java.net.Socket import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.TimeUnit import okhttp3.Address import okhttp3.ConnectionListener @@ -33,21 +34,38 @@ import okhttp3.internal.okHttpName import okhttp3.internal.platform.Platform class RealConnectionPool( - taskRunner: TaskRunner, - /** The maximum number of idle connections for each address. */ + private val taskRunner: TaskRunner, + /** + * The maximum number of idle connections across all addresses. + * Connections needed to satisfy a [ConnectionPool.AddressPolicy] are not considered idle. + */ private val maxIdleConnections: Int, keepAliveDuration: Long, timeUnit: TimeUnit, internal val connectionListener: ConnectionListener, + private val exchangeFinderFactory: (RealConnectionPool, Address, ConnectionUser) -> ExchangeFinder, ) { - private val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration) + internal val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration) + + // guarded by [this] + private var policies: Map = mapOf() + private val user = PoolConnectionUser private val cleanupQueue: TaskQueue = taskRunner.newQueue() private val cleanupTask = - object : Task("$okHttpName ConnectionPool") { + object : Task("$okHttpName ConnectionPool connection closer") { override fun runOnce(): Long = cleanup(System.nanoTime()) } + private fun MinimumConnectionState.schedule() { + val state = this + queue.schedule( + object : Task("$okHttpName ConnectionPool connection opener") { + override fun runOnce(): Long = ensureMinimumConnections(state) + }, + ) + } + /** * Holding the lock of the connection being added or removed when mutating this, and check its * [RealConnection.noNewExchanges] property. This defends against races where a connection is @@ -131,7 +149,7 @@ class RealConnectionPool( connections.add(connection) // connection.queueEvent { connectionListener.connectEnd(connection) } - cleanupQueue.schedule(cleanupTask) + scheduleConnectionCloser() } /** @@ -145,9 +163,10 @@ class RealConnectionPool( connection.noNewExchanges = true connections.remove(connection) if (connections.isEmpty()) cleanupQueue.cancelAll() + scheduleConnectionOpener() true } else { - cleanupQueue.schedule(cleanupTask) + scheduleConnectionCloser() false } } @@ -173,6 +192,7 @@ class RealConnectionPool( } if (connections.isEmpty()) cleanupQueue.cancelAll() + scheduleConnectionOpener() } /** @@ -187,21 +207,34 @@ class RealConnectionPool( var idleConnectionCount = 0 var longestIdleConnection: RealConnection? = null var longestIdleDurationNs = Long.MIN_VALUE + var policyAffected: MinimumConnectionState? = null + policies + .forEach { it.value.unsatisfiedCountCleanupTask = it.value.policy.minimumConcurrentCalls } // Find either a connection to evict, or the time that the next eviction is due. for (connection in connections) { synchronized(connection) { - // If the connection is in use, keep searching. + val satisfiablePolicy = + policies.entries.firstOrNull { + it.value.unsatisfiedCountCleanupTask > 0 && connection.isEligible(it.key, null) + }?.value + val idleDurationNs = now - connection.idleAtNs + if (pruneAndGetAllocationCount(connection, now) > 0) { + // If the connection is in use, keep searching. + inUseConnectionCount++ + } else if (satisfiablePolicy != null && idleDurationNs < this.keepAliveDurationNs) { + // If the connection hasn't expired and helps satisfy a policy, keep searching. + satisfiablePolicy.unsatisfiedCountCleanupTask -= connection.allocationLimit inUseConnectionCount++ } else { idleConnectionCount++ // If the connection is ready to be evicted, we're done. - val idleDurationNs = now - connection.idleAtNs if (idleDurationNs > longestIdleDurationNs) { longestIdleDurationNs = idleDurationNs longestIdleConnection = connection + policyAffected = satisfiablePolicy } else { Unit } @@ -212,7 +245,7 @@ class RealConnectionPool( when { longestIdleDurationNs >= this.keepAliveDurationNs || idleConnectionCount > this.maxIdleConnections -> { - // We've chosen a connection to evict. Confirm it's still okay to be evict, then close it. + // We've chosen a connection to evict. Confirm it's still okay to be evicted, then close it. val connection = longestIdleConnection!! synchronized(connection) { if (connection.calls.isNotEmpty()) return 0L // No longer idle. @@ -220,6 +253,7 @@ class RealConnectionPool( connection.noNewExchanges = true connections.remove(longestIdleConnection) } + policyAffected?.schedule() connection.socket().closeQuietly() connectionListener.connectionClosed(connection) if (connections.isEmpty()) cleanupQueue.cancelAll() @@ -286,6 +320,93 @@ class RealConnectionPool( return references.size } + /** + * Adds or replaces the policy for [address]. + * This will trigger a background task to start creating connections as needed. + */ + fun setPolicy( + address: Address, + policy: ConnectionPool.AddressPolicy, + ) { + val state = MinimumConnectionState(address, taskRunner.newQueue(), policy) + val oldPolicy: ConnectionPool.AddressPolicy? + synchronized(this) { + oldPolicy = policies[address]?.policy + policies = policies + (address to state) + } + + val newConnectionsNeeded = + policy.minimumConcurrentCalls - (oldPolicy?.minimumConcurrentCalls ?: 0) + if (newConnectionsNeeded > 0) { + state.schedule() + } else if (newConnectionsNeeded < 0) { + scheduleConnectionCloser() + } + } + + fun scheduleConnectionOpener() { + policies.values.forEach { it.schedule() } + } + + fun scheduleConnectionCloser() { + cleanupQueue.schedule(cleanupTask) + } + + /** + * Ensure enough connections open to [address] to satisfy its [ConnectionPool.AddressPolicy]. + * If there are already enough connections, we're done. + * If not, we create one and then schedule the task to run again immediately. + */ + private fun ensureMinimumConnections(state: MinimumConnectionState): Long { + // This policy does not require minimum connections, don't run again + if (state.policy.minimumConcurrentCalls < 1) return -1 + + var unsatisfiedCountMinTask = state.policy.minimumConcurrentCalls + + for (connection in connections) { + synchronized(connection) { + if (connection.isEligible(state.address, null)) { + unsatisfiedCountMinTask -= connection.allocationLimit + } + } + + // The policy was satisfied by existing connections, don't run again + if (unsatisfiedCountMinTask < 1) return -1 + } + + // If we got here then the policy was not satisfied -- open a connection! + try { + val connection = exchangeFinderFactory(this, state.address, user).find() + + // RealRoutePlanner will add the connection to the pool itself, other RoutePlanners may not + // TODO: make all RoutePlanners consistent in this behavior + if (connection !in connections) { + synchronized(connection) { put(connection) } + } + + return 0 // run again immediately to create more connections if needed + } catch (ex: Exception) { + // No need to log, user.connectFailed() will already have been called. Just try again later. + return state.policy.backoffDelayMillis.jitterBy(state.policy.backoffJitterMillis) * 1_000_000 + } + } + + private fun Long.jitterBy(amount: Int): Long { + return this + ThreadLocalRandom.current().nextInt(amount * -1, amount) + } + + class MinimumConnectionState( + val address: Address, + val queue: TaskQueue, + var policy: ConnectionPool.AddressPolicy, + ) { + /** + * This field is only ever accessed by the cleanup task, and it tracks + * how many concurrent calls are already satisfied by existing connections. + */ + var unsatisfiedCountCleanupTask: Int = 0 + } + companion object { fun get(connectionPool: ConnectionPool): RealConnectionPool = connectionPool.delegate } diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealRoutePlanner.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealRoutePlanner.kt index dc81e4bbe..3684a19da 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealRoutePlanner.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealRoutePlanner.kt @@ -129,7 +129,7 @@ class RealRoutePlanner( /** Plans to make a new connection by deciding which route to try next. */ @Throws(IOException::class) - private fun planConnect(): ConnectPlan { + internal fun planConnect(): ConnectPlan { // Use a route from a preceding coalesced connection. val localNextRouteToTry = nextRouteToTry if (localNextRouteToTry != null) { diff --git a/okhttp/src/test/java/okhttp3/FakeRoutePlanner.kt b/okhttp/src/test/java/okhttp3/FakeRoutePlanner.kt index e2a91ef38..e817d2e09 100644 --- a/okhttp/src/test/java/okhttp3/FakeRoutePlanner.kt +++ b/okhttp/src/test/java/okhttp3/FakeRoutePlanner.kt @@ -36,6 +36,8 @@ class FakeRoutePlanner( val events = LinkedBlockingDeque() var canceled = false + var autoGeneratePlans = false + var defaultConnectionIdleAtNanos = Long.MAX_VALUE private var nextPlanId = 0 private var nextPlanIndex = 0 private val plans = mutableListOf() @@ -56,8 +58,10 @@ class FakeRoutePlanner( // Return deferred plans preferentially. These don't require addPlan(). if (deferredPlans.isNotEmpty()) return deferredPlans.removeFirst() as FakePlan + if (nextPlanIndex >= plans.size && autoGeneratePlans) addPlan() + require(nextPlanIndex < plans.size) { - "not enough plans! call addPlan() in the test to set this up" + "not enough plans! call addPlan() or set autoGeneratePlans=true in the test to set this up" } val result = plans[nextPlanIndex++] events += "take plan ${result.id}" @@ -73,7 +77,7 @@ class FakeRoutePlanner( } override fun hasNext(failedConnection: RealConnection?): Boolean { - return deferredPlans.isNotEmpty() || nextPlanIndex < plans.size + return deferredPlans.isNotEmpty() || nextPlanIndex < plans.size || autoGeneratePlans } override fun sameHostAndPort(url: HttpUrl): Boolean { @@ -90,7 +94,12 @@ class FakeRoutePlanner( var planningThrowable: Throwable? = null var canceled = false var connectState = ConnectState.READY - val connection = factory.newConnection(pool, factory.newRoute(address)) + val connection = + factory.newConnection( + pool = pool, + route = factory.newRoute(address), + idleAtNanos = defaultConnectionIdleAtNanos, + ) var retry: FakePlan? = null var retryTaken = false var yieldBeforePlanReturns = false diff --git a/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.kt b/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.kt index 6c5697dc3..1cfa0f774 100644 --- a/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.kt +++ b/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.kt @@ -22,6 +22,7 @@ import assertk.assertions.isFalse import assertk.assertions.isNotEmpty import assertk.assertions.isTrue import okhttp3.ConnectionPool +import okhttp3.FakeRoutePlanner import okhttp3.OkHttpClient import okhttp3.Request import okhttp3.TestUtil.awaitGarbageCollection @@ -41,6 +42,8 @@ class ConnectionPoolTest { private val addressC = factory.newAddress("c") private val routeC1 = factory.newRoute(addressC) + private val routePlanner = FakeRoutePlanner(factory.taskFaker) + @AfterEach fun tearDown() { factory.close() } @@ -192,6 +195,55 @@ class ConnectionPoolTest { assertThat(realTaskRunner.activeQueues()).isEmpty() } + @Test fun connectionPreWarming() { + // TODO this test spins forever due to bugs in TaskFaker.runTasks() + + // routePlanner.autoGeneratePlans = true + // routePlanner.defaultConnectionIdleAtNanos = System.nanoTime() + 1_000_000_000_000 + // val address = routePlanner.address + // val pool = factory.newConnectionPool(routePlanner = routePlanner) + // + // // Connections are created as soon as a policy is set + // setPolicy(pool, address, ConnectionPool.AddressPolicy(2)) + // assertThat(pool.connectionCount()).isEqualTo(2) + // + // // Connections are replaced if they idle out or are evicted from the pool + // evictAllConnections(pool) + // assertThat(pool.connectionCount()).isEqualTo(2) + // forceConnectionsToExpire(pool, routePlanner) + // assertThat(pool.connectionCount()).isEqualTo(2) + // + // // Excess connections aren't removed until they idle out, even if no longer needed + // setPolicy(pool, address, ConnectionPool.AddressPolicy(1)) + // assertThat(pool.connectionCount()).isEqualTo(2) + // forceConnectionsToExpire(pool, routePlanner) + // assertThat(pool.connectionCount()).isEqualTo(1) + + // TODO test that http/2 connections will be opened/closed based on concurrent stream settings + } + + // private fun setPolicy( + // pool: RealConnectionPool, + // address: Address, + // policy: ConnectionPool.AddressPolicy + // ) { + // pool.setPolicy(address, policy) + // factory.taskFaker.runTasks() + // } + // + // private fun evictAllConnections(pool: RealConnectionPool) { + // pool.evictAll() + // assertThat(pool.connectionCount()).isEqualTo(0) + // factory.taskFaker.runTasks() + // } + // + // private fun forceConnectionsToExpire(pool: RealConnectionPool, routePlanner: FakeRoutePlanner) { + // val idleTimeNanos = routePlanner.defaultConnectionIdleAtNanos + pool.keepAliveDurationNs + // repeat(pool.connectionCount()) { pool.cleanup(idleTimeNanos) } + // assertThat(pool.connectionCount()).isEqualTo(0) + // factory.taskFaker.runTasks() + // } + /** Use a helper method so there's no hidden reference remaining on the stack. */ private fun allocateAndLeakAllocation( pool: ConnectionPool,