1
0
mirror of https://github.com/square/okhttp.git synced 2025-08-07 12:42:57 +03:00

Support minimum connection pool size (#8287)

This PR adds support for configuring the connection pool to proactively create connections to specified hosts. This is useful for "prewarming" the connection pool before it starts being used.

The main public API change is adding `ConnectionPool.setPolicy(Address, AddressPolicy)`. A policy specifies how many _concurrent streams_ should be supported by the pool (which might be satisfied by a single http/2 connection or by N http/1.1 connections).

The main internal change is adding a new task queue to `RealConnectionPool` which checks to see if it needs to create new connections to satisfy existing policies. This task is enqueued any time a policy changes, a connection is closed, or an http/2 connection's settings decrease the number of concurrent streams supported.
This commit is contained in:
Evan Nelson
2024-04-03 13:39:41 -07:00
committed by GitHub
parent 787d19439f
commit c9ba5d732b
12 changed files with 490 additions and 50 deletions

View File

@@ -35,10 +35,13 @@ import okhttp3.internal.RecordingOkAuthenticator
import okhttp3.internal.concurrent.TaskFaker
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.connection.CallConnectionUser
import okhttp3.internal.connection.FastFallbackExchangeFinder
import okhttp3.internal.connection.RealCall
import okhttp3.internal.connection.RealConnection
import okhttp3.internal.connection.RealConnectionPool
import okhttp3.internal.connection.RealRoutePlanner
import okhttp3.internal.connection.RouteDatabase
import okhttp3.internal.connection.RoutePlanner
import okhttp3.internal.http.RealInterceptorChain
import okhttp3.internal.http.RecordingProxySelector
import okhttp3.tls.HandshakeCertificates
@@ -97,6 +100,7 @@ class TestValueFactory : Closeable {
fun newConnectionPool(
taskRunner: TaskRunner = this.taskRunner,
maxIdleConnections: Int = Int.MAX_VALUE,
routePlanner: RoutePlanner? = null,
): RealConnectionPool {
return RealConnectionPool(
taskRunner = taskRunner,
@@ -104,6 +108,25 @@ class TestValueFactory : Closeable {
keepAliveDuration = 100L,
timeUnit = TimeUnit.NANOSECONDS,
connectionListener = ConnectionListener.NONE,
exchangeFinderFactory = { pool, address, user ->
FastFallbackExchangeFinder(
routePlanner ?: RealRoutePlanner(
taskRunner = taskRunner,
connectionPool = pool,
readTimeoutMillis = 10_000,
writeTimeoutMillis = 10_000,
socketConnectTimeoutMillis = 10_000,
socketReadTimeoutMillis = 10_000,
pingIntervalMillis = 10_000,
retryOnConnectionFailure = false,
fastFallback = true,
address = address,
routeDatabase = RouteDatabase(),
connectionUser = user,
),
taskRunner,
)
},
)
}

View File

@@ -384,6 +384,16 @@ public final class okhttp3/ConnectionPool {
public final fun connectionCount ()I
public final fun evictAll ()V
public final fun idleConnectionCount ()I
public final fun setPolicy (Lokhttp3/Address;Lokhttp3/ConnectionPool$AddressPolicy;)V
}
public final class okhttp3/ConnectionPool$AddressPolicy {
public final field backoffDelayMillis J
public final field backoffJitterMillis I
public final field minimumConcurrentCalls I
public fun <init> ()V
public fun <init> (IJI)V
public synthetic fun <init> (IJIILkotlin/jvm/internal/DefaultConstructorMarker;)V
}
public final class okhttp3/ConnectionSpec {
@@ -913,6 +923,7 @@ public class okhttp3/OkHttpClient : okhttp3/Call$Factory, okhttp3/WebSocket$Fact
public final fun -deprecated_sslSocketFactory ()Ljavax/net/ssl/SSLSocketFactory;
public final fun -deprecated_writeTimeoutMillis ()I
public fun <init> ()V
public final fun address (Lokhttp3/HttpUrl;)Lokhttp3/Address;
public final fun authenticator ()Lokhttp3/Authenticator;
public final fun cache ()Lokhttp3/Cache;
public final fun callTimeoutMillis ()I

View File

@@ -18,7 +18,11 @@ package okhttp3
import java.util.concurrent.TimeUnit
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.connection.FastFallbackExchangeFinder
import okhttp3.internal.connection.ForceConnectRoutePlanner
import okhttp3.internal.connection.RealConnectionPool
import okhttp3.internal.connection.RealRoutePlanner
import okhttp3.internal.connection.RouteDatabase
/**
* Manages reuse of HTTP and HTTP/2 connections for reduced network latency. HTTP requests that
@@ -39,6 +43,14 @@ class ConnectionPool internal constructor(
timeUnit: TimeUnit = TimeUnit.MINUTES,
taskRunner: TaskRunner = TaskRunner.INSTANCE,
connectionListener: ConnectionListener = ConnectionListener.NONE,
readTimeoutMillis: Int = 10_000,
writeTimeoutMillis: Int = 10_000,
socketConnectTimeoutMillis: Int = 10_000,
socketReadTimeoutMillis: Int = 10_000,
pingIntervalMillis: Int = 10_000,
retryOnConnectionFailure: Boolean = true,
fastFallback: Boolean = true,
routeDatabase: RouteDatabase = RouteDatabase(),
) : this(
RealConnectionPool(
taskRunner = taskRunner,
@@ -46,6 +58,27 @@ class ConnectionPool internal constructor(
keepAliveDuration = keepAliveDuration,
timeUnit = timeUnit,
connectionListener = connectionListener,
exchangeFinderFactory = { pool, address, user ->
FastFallbackExchangeFinder(
ForceConnectRoutePlanner(
RealRoutePlanner(
taskRunner = taskRunner,
connectionPool = pool,
readTimeoutMillis = readTimeoutMillis,
writeTimeoutMillis = writeTimeoutMillis,
socketConnectTimeoutMillis = socketConnectTimeoutMillis,
socketReadTimeoutMillis = socketReadTimeoutMillis,
pingIntervalMillis = pingIntervalMillis,
retryOnConnectionFailure = retryOnConnectionFailure,
fastFallback = fastFallback,
address = address,
routeDatabase = routeDatabase,
connectionUser = user,
),
),
taskRunner,
)
},
),
)
@@ -92,4 +125,32 @@ class ConnectionPool internal constructor(
fun evictAll() {
delegate.evictAll()
}
/**
* Sets a policy that applies to [address].
* Overwrites any existing policy for that address.
*/
@ExperimentalOkHttpApi
fun setPolicy(
address: Address,
policy: AddressPolicy,
) {
delegate.setPolicy(address, policy)
}
/**
* A policy for how the pool should treat a specific address.
*/
class AddressPolicy(
/**
* How many concurrent calls should be possible to make at any time.
* The pool will routinely try to pre-emptively open connections to satisfy this minimum.
* Connections will still be closed if they idle beyond the keep-alive but will be replaced.
*/
@JvmField val minimumConcurrentCalls: Int = 0,
/** How long to wait to retry pre-emptive connection attempts that fail. */
@JvmField val backoffDelayMillis: Long = 60 * 1000,
/** How much jitter to introduce in connection retry backoff delays */
@JvmField val backoffJitterMillis: Int = 100,
)
}

View File

@@ -136,9 +136,6 @@ open class OkHttpClient internal constructor(
@get:JvmName("dispatcher")
val dispatcher: Dispatcher = builder.dispatcher
@get:JvmName("connectionPool")
val connectionPool: ConnectionPool = builder.connectionPool
/**
* Returns an immutable list of interceptors that observe the full span of each call: from before
* the connection is established (if any) until after the response source is selected (either the
@@ -269,6 +266,22 @@ open class OkHttpClient internal constructor(
internal val routeDatabase: RouteDatabase = builder.routeDatabase ?: RouteDatabase()
internal val taskRunner: TaskRunner = builder.taskRunner ?: TaskRunner.INSTANCE
@get:JvmName("connectionPool")
val connectionPool: ConnectionPool =
builder.connectionPool ?: ConnectionPool(
readTimeoutMillis = readTimeoutMillis,
writeTimeoutMillis = writeTimeoutMillis,
socketConnectTimeoutMillis = connectTimeoutMillis,
socketReadTimeoutMillis = readTimeoutMillis,
pingIntervalMillis = pingIntervalMillis,
retryOnConnectionFailure = retryOnConnectionFailure,
fastFallback = fastFallback,
routeDatabase = routeDatabase,
).also {
// Cache the pool in the builder so that it will be shared with other clients
builder.connectionPool = it
}
constructor() : this(Builder())
init {
@@ -296,6 +309,36 @@ open class OkHttpClient internal constructor(
verifyClientState()
}
/**
* Creates an [Address] of out of the provided [HttpUrl]
* that uses this clients DNS, TLS, and proxy configuration.
*/
fun address(url: HttpUrl): Address {
var useSslSocketFactory: SSLSocketFactory? = null
var useHostnameVerifier: HostnameVerifier? = null
var useCertificatePinner: CertificatePinner? = null
if (url.isHttps) {
useSslSocketFactory = sslSocketFactory
useHostnameVerifier = hostnameVerifier
useCertificatePinner = certificatePinner
}
return Address(
uriHost = url.host,
uriPort = url.port,
dns = dns,
socketFactory = socketFactory,
sslSocketFactory = useSslSocketFactory,
hostnameVerifier = useHostnameVerifier,
certificatePinner = useCertificatePinner,
proxyAuthenticator = proxyAuthenticator,
proxy = proxy,
protocols = protocols,
connectionSpecs = connectionSpecs,
proxySelector = proxySelector,
)
}
private fun verifyClientState() {
check(null !in (interceptors as List<Interceptor?>)) {
"Null interceptor: $interceptors"
@@ -552,7 +595,7 @@ open class OkHttpClient internal constructor(
class Builder() {
internal var dispatcher: Dispatcher = Dispatcher()
internal var connectionPool: ConnectionPool = ConnectionPool()
internal var connectionPool: ConnectionPool? = null
internal val interceptors: MutableList<Interceptor> = mutableListOf()
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()

View File

@@ -0,0 +1,23 @@
/*
* Copyright (C) 2024 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3.internal.connection
/**
* A RoutePlanner that will always establish a new connection, ignoring any connection pooling
*/
class ForceConnectRoutePlanner(private val delegate: RealRoutePlanner) : RoutePlanner by delegate {
override fun plan(): RoutePlanner.Plan = delegate.planConnect() // not delegate.plan()
}

View File

@@ -0,0 +1,118 @@
/*
* Copyright (C) 2024 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3.internal.connection
import java.io.IOException
import java.net.InetAddress
import java.net.Proxy
import java.net.Socket
import okhttp3.Connection
import okhttp3.Handshake
import okhttp3.HttpUrl
import okhttp3.Protocol
import okhttp3.Route
/**
* A user that is a connection pool creating connections in the background
* without an intent to immediately use them.
*/
object PoolConnectionUser : ConnectionUser {
override fun addPlanToCancel(connectPlan: ConnectPlan) {
}
override fun removePlanToCancel(connectPlan: ConnectPlan) {
}
override fun updateRouteDatabaseAfterSuccess(route: Route) {
}
override fun connectStart(route: Route) {
}
override fun secureConnectStart() {
}
override fun secureConnectEnd(handshake: Handshake?) {
}
override fun callConnectEnd(
route: Route,
protocol: Protocol?,
) {
}
override fun connectionConnectEnd(
connection: Connection,
route: Route,
) {
}
override fun connectFailed(
route: Route,
protocol: Protocol?,
e: IOException,
) {
}
override fun connectionAcquired(connection: Connection) {
}
override fun acquireConnectionNoEvents(connection: RealConnection) {
}
override fun releaseConnectionNoEvents(): Socket? {
return null
}
override fun connectionReleased(connection: Connection) {
}
override fun connectionConnectionAcquired(connection: RealConnection) {
}
override fun connectionConnectionReleased(connection: RealConnection) {
}
override fun connectionConnectionClosed(connection: RealConnection) {
}
override fun noNewExchanges(connection: RealConnection) {
}
override fun doExtensiveHealthChecks(): Boolean = false
override fun isCanceled(): Boolean = false
override fun candidateConnection(): RealConnection? = null
override fun proxySelectStart(url: HttpUrl) {
}
override fun proxySelectEnd(
url: HttpUrl,
proxies: List<Proxy>,
) {
}
override fun dnsStart(socketHost: String) {
}
override fun dnsEnd(
socketHost: String,
result: List<InetAddress>,
) {
}
}

View File

@@ -25,14 +25,9 @@ import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import javax.net.ssl.HostnameVerifier
import javax.net.ssl.SSLSocketFactory
import okhttp3.Address
import okhttp3.Call
import okhttp3.Callback
import okhttp3.CertificatePinner
import okhttp3.EventListener
import okhttp3.HttpUrl
import okhttp3.Interceptor
import okhttp3.OkHttpClient
import okhttp3.Request
@@ -256,7 +251,7 @@ class RealCall(
pingIntervalMillis = client.pingIntervalMillis,
retryOnConnectionFailure = client.retryOnConnectionFailure,
fastFallback = client.fastFallback,
address = createAddress(request.url),
address = client.address(request.url),
connectionUser = CallConnectionUser(this, connectionPool.connectionListener, chain),
routeDatabase = client.routeDatabase,
)
@@ -459,32 +454,6 @@ class RealCall(
interceptorScopedExchange = null
}
private fun createAddress(url: HttpUrl): Address {
var sslSocketFactory: SSLSocketFactory? = null
var hostnameVerifier: HostnameVerifier? = null
var certificatePinner: CertificatePinner? = null
if (url.isHttps) {
sslSocketFactory = client.sslSocketFactory
hostnameVerifier = client.hostnameVerifier
certificatePinner = client.certificatePinner
}
return Address(
uriHost = url.host,
uriPort = url.port,
dns = client.dns,
socketFactory = client.socketFactory,
sslSocketFactory = sslSocketFactory,
hostnameVerifier = hostnameVerifier,
certificatePinner = certificatePinner,
proxyAuthenticator = client.proxyAuthenticator,
proxy = client.proxy,
protocols = client.protocols,
connectionSpecs = client.connectionSpecs,
proxySelector = client.proxySelector,
)
}
fun retryAfterFailure(): Boolean {
return exchange?.hasFailure == true &&
exchangeFinder!!.routePlanner.hasNext(exchange?.connection)

View File

@@ -111,7 +111,8 @@ class RealConnection(
* The maximum number of concurrent streams that can be carried by this connection. If
* `allocations.size() < allocationLimit` then new streams can be created on this connection.
*/
private var allocationLimit = 1
internal var allocationLimit = 1
private set
/** Current calls carried by this connection. */
val calls = mutableListOf<Reference<RealCall>>()
@@ -344,7 +345,16 @@ class RealConnection(
connection: Http2Connection,
settings: Settings,
) {
val oldLimit = allocationLimit
allocationLimit = settings.getMaxConcurrentStreams()
if (allocationLimit < oldLimit) {
// We might need new connections to keep policies satisfied
connectionPool.scheduleConnectionOpener()
} else if (allocationLimit > oldLimit) {
// We might no longer need some connections
connectionPool.scheduleConnectionCloser()
}
}
override fun handshake(): Handshake? = handshake

View File

@@ -18,6 +18,7 @@ package okhttp3.internal.connection
import java.net.Socket
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.TimeUnit
import okhttp3.Address
import okhttp3.ConnectionListener
@@ -33,21 +34,38 @@ import okhttp3.internal.okHttpName
import okhttp3.internal.platform.Platform
class RealConnectionPool(
taskRunner: TaskRunner,
/** The maximum number of idle connections for each address. */
private val taskRunner: TaskRunner,
/**
* The maximum number of idle connections across all addresses.
* Connections needed to satisfy a [ConnectionPool.AddressPolicy] are not considered idle.
*/
private val maxIdleConnections: Int,
keepAliveDuration: Long,
timeUnit: TimeUnit,
internal val connectionListener: ConnectionListener,
private val exchangeFinderFactory: (RealConnectionPool, Address, ConnectionUser) -> ExchangeFinder,
) {
private val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration)
internal val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration)
// guarded by [this]
private var policies: Map<Address, MinimumConnectionState> = mapOf()
private val user = PoolConnectionUser
private val cleanupQueue: TaskQueue = taskRunner.newQueue()
private val cleanupTask =
object : Task("$okHttpName ConnectionPool") {
object : Task("$okHttpName ConnectionPool connection closer") {
override fun runOnce(): Long = cleanup(System.nanoTime())
}
private fun MinimumConnectionState.schedule() {
val state = this
queue.schedule(
object : Task("$okHttpName ConnectionPool connection opener") {
override fun runOnce(): Long = ensureMinimumConnections(state)
},
)
}
/**
* Holding the lock of the connection being added or removed when mutating this, and check its
* [RealConnection.noNewExchanges] property. This defends against races where a connection is
@@ -131,7 +149,7 @@ class RealConnectionPool(
connections.add(connection)
// connection.queueEvent { connectionListener.connectEnd(connection) }
cleanupQueue.schedule(cleanupTask)
scheduleConnectionCloser()
}
/**
@@ -145,9 +163,10 @@ class RealConnectionPool(
connection.noNewExchanges = true
connections.remove(connection)
if (connections.isEmpty()) cleanupQueue.cancelAll()
scheduleConnectionOpener()
true
} else {
cleanupQueue.schedule(cleanupTask)
scheduleConnectionCloser()
false
}
}
@@ -173,6 +192,7 @@ class RealConnectionPool(
}
if (connections.isEmpty()) cleanupQueue.cancelAll()
scheduleConnectionOpener()
}
/**
@@ -187,21 +207,34 @@ class RealConnectionPool(
var idleConnectionCount = 0
var longestIdleConnection: RealConnection? = null
var longestIdleDurationNs = Long.MIN_VALUE
var policyAffected: MinimumConnectionState? = null
policies
.forEach { it.value.unsatisfiedCountCleanupTask = it.value.policy.minimumConcurrentCalls }
// Find either a connection to evict, or the time that the next eviction is due.
for (connection in connections) {
synchronized(connection) {
// If the connection is in use, keep searching.
val satisfiablePolicy =
policies.entries.firstOrNull {
it.value.unsatisfiedCountCleanupTask > 0 && connection.isEligible(it.key, null)
}?.value
val idleDurationNs = now - connection.idleAtNs
if (pruneAndGetAllocationCount(connection, now) > 0) {
// If the connection is in use, keep searching.
inUseConnectionCount++
} else if (satisfiablePolicy != null && idleDurationNs < this.keepAliveDurationNs) {
// If the connection hasn't expired and helps satisfy a policy, keep searching.
satisfiablePolicy.unsatisfiedCountCleanupTask -= connection.allocationLimit
inUseConnectionCount++
} else {
idleConnectionCount++
// If the connection is ready to be evicted, we're done.
val idleDurationNs = now - connection.idleAtNs
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs
longestIdleConnection = connection
policyAffected = satisfiablePolicy
} else {
Unit
}
@@ -212,7 +245,7 @@ class RealConnectionPool(
when {
longestIdleDurationNs >= this.keepAliveDurationNs ||
idleConnectionCount > this.maxIdleConnections -> {
// We've chosen a connection to evict. Confirm it's still okay to be evict, then close it.
// We've chosen a connection to evict. Confirm it's still okay to be evicted, then close it.
val connection = longestIdleConnection!!
synchronized(connection) {
if (connection.calls.isNotEmpty()) return 0L // No longer idle.
@@ -220,6 +253,7 @@ class RealConnectionPool(
connection.noNewExchanges = true
connections.remove(longestIdleConnection)
}
policyAffected?.schedule()
connection.socket().closeQuietly()
connectionListener.connectionClosed(connection)
if (connections.isEmpty()) cleanupQueue.cancelAll()
@@ -286,6 +320,93 @@ class RealConnectionPool(
return references.size
}
/**
* Adds or replaces the policy for [address].
* This will trigger a background task to start creating connections as needed.
*/
fun setPolicy(
address: Address,
policy: ConnectionPool.AddressPolicy,
) {
val state = MinimumConnectionState(address, taskRunner.newQueue(), policy)
val oldPolicy: ConnectionPool.AddressPolicy?
synchronized(this) {
oldPolicy = policies[address]?.policy
policies = policies + (address to state)
}
val newConnectionsNeeded =
policy.minimumConcurrentCalls - (oldPolicy?.minimumConcurrentCalls ?: 0)
if (newConnectionsNeeded > 0) {
state.schedule()
} else if (newConnectionsNeeded < 0) {
scheduleConnectionCloser()
}
}
fun scheduleConnectionOpener() {
policies.values.forEach { it.schedule() }
}
fun scheduleConnectionCloser() {
cleanupQueue.schedule(cleanupTask)
}
/**
* Ensure enough connections open to [address] to satisfy its [ConnectionPool.AddressPolicy].
* If there are already enough connections, we're done.
* If not, we create one and then schedule the task to run again immediately.
*/
private fun ensureMinimumConnections(state: MinimumConnectionState): Long {
// This policy does not require minimum connections, don't run again
if (state.policy.minimumConcurrentCalls < 1) return -1
var unsatisfiedCountMinTask = state.policy.minimumConcurrentCalls
for (connection in connections) {
synchronized(connection) {
if (connection.isEligible(state.address, null)) {
unsatisfiedCountMinTask -= connection.allocationLimit
}
}
// The policy was satisfied by existing connections, don't run again
if (unsatisfiedCountMinTask < 1) return -1
}
// If we got here then the policy was not satisfied -- open a connection!
try {
val connection = exchangeFinderFactory(this, state.address, user).find()
// RealRoutePlanner will add the connection to the pool itself, other RoutePlanners may not
// TODO: make all RoutePlanners consistent in this behavior
if (connection !in connections) {
synchronized(connection) { put(connection) }
}
return 0 // run again immediately to create more connections if needed
} catch (ex: Exception) {
// No need to log, user.connectFailed() will already have been called. Just try again later.
return state.policy.backoffDelayMillis.jitterBy(state.policy.backoffJitterMillis) * 1_000_000
}
}
private fun Long.jitterBy(amount: Int): Long {
return this + ThreadLocalRandom.current().nextInt(amount * -1, amount)
}
class MinimumConnectionState(
val address: Address,
val queue: TaskQueue,
var policy: ConnectionPool.AddressPolicy,
) {
/**
* This field is only ever accessed by the cleanup task, and it tracks
* how many concurrent calls are already satisfied by existing connections.
*/
var unsatisfiedCountCleanupTask: Int = 0
}
companion object {
fun get(connectionPool: ConnectionPool): RealConnectionPool = connectionPool.delegate
}

View File

@@ -129,7 +129,7 @@ class RealRoutePlanner(
/** Plans to make a new connection by deciding which route to try next. */
@Throws(IOException::class)
private fun planConnect(): ConnectPlan {
internal fun planConnect(): ConnectPlan {
// Use a route from a preceding coalesced connection.
val localNextRouteToTry = nextRouteToTry
if (localNextRouteToTry != null) {

View File

@@ -36,6 +36,8 @@ class FakeRoutePlanner(
val events = LinkedBlockingDeque<String>()
var canceled = false
var autoGeneratePlans = false
var defaultConnectionIdleAtNanos = Long.MAX_VALUE
private var nextPlanId = 0
private var nextPlanIndex = 0
private val plans = mutableListOf<FakePlan>()
@@ -56,8 +58,10 @@ class FakeRoutePlanner(
// Return deferred plans preferentially. These don't require addPlan().
if (deferredPlans.isNotEmpty()) return deferredPlans.removeFirst() as FakePlan
if (nextPlanIndex >= plans.size && autoGeneratePlans) addPlan()
require(nextPlanIndex < plans.size) {
"not enough plans! call addPlan() in the test to set this up"
"not enough plans! call addPlan() or set autoGeneratePlans=true in the test to set this up"
}
val result = plans[nextPlanIndex++]
events += "take plan ${result.id}"
@@ -73,7 +77,7 @@ class FakeRoutePlanner(
}
override fun hasNext(failedConnection: RealConnection?): Boolean {
return deferredPlans.isNotEmpty() || nextPlanIndex < plans.size
return deferredPlans.isNotEmpty() || nextPlanIndex < plans.size || autoGeneratePlans
}
override fun sameHostAndPort(url: HttpUrl): Boolean {
@@ -90,7 +94,12 @@ class FakeRoutePlanner(
var planningThrowable: Throwable? = null
var canceled = false
var connectState = ConnectState.READY
val connection = factory.newConnection(pool, factory.newRoute(address))
val connection =
factory.newConnection(
pool = pool,
route = factory.newRoute(address),
idleAtNanos = defaultConnectionIdleAtNanos,
)
var retry: FakePlan? = null
var retryTaken = false
var yieldBeforePlanReturns = false

View File

@@ -22,6 +22,7 @@ import assertk.assertions.isFalse
import assertk.assertions.isNotEmpty
import assertk.assertions.isTrue
import okhttp3.ConnectionPool
import okhttp3.FakeRoutePlanner
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.TestUtil.awaitGarbageCollection
@@ -41,6 +42,8 @@ class ConnectionPoolTest {
private val addressC = factory.newAddress("c")
private val routeC1 = factory.newRoute(addressC)
private val routePlanner = FakeRoutePlanner(factory.taskFaker)
@AfterEach fun tearDown() {
factory.close()
}
@@ -192,6 +195,55 @@ class ConnectionPoolTest {
assertThat(realTaskRunner.activeQueues()).isEmpty()
}
@Test fun connectionPreWarming() {
// TODO this test spins forever due to bugs in TaskFaker.runTasks()
// routePlanner.autoGeneratePlans = true
// routePlanner.defaultConnectionIdleAtNanos = System.nanoTime() + 1_000_000_000_000
// val address = routePlanner.address
// val pool = factory.newConnectionPool(routePlanner = routePlanner)
//
// // Connections are created as soon as a policy is set
// setPolicy(pool, address, ConnectionPool.AddressPolicy(2))
// assertThat(pool.connectionCount()).isEqualTo(2)
//
// // Connections are replaced if they idle out or are evicted from the pool
// evictAllConnections(pool)
// assertThat(pool.connectionCount()).isEqualTo(2)
// forceConnectionsToExpire(pool, routePlanner)
// assertThat(pool.connectionCount()).isEqualTo(2)
//
// // Excess connections aren't removed until they idle out, even if no longer needed
// setPolicy(pool, address, ConnectionPool.AddressPolicy(1))
// assertThat(pool.connectionCount()).isEqualTo(2)
// forceConnectionsToExpire(pool, routePlanner)
// assertThat(pool.connectionCount()).isEqualTo(1)
// TODO test that http/2 connections will be opened/closed based on concurrent stream settings
}
// private fun setPolicy(
// pool: RealConnectionPool,
// address: Address,
// policy: ConnectionPool.AddressPolicy
// ) {
// pool.setPolicy(address, policy)
// factory.taskFaker.runTasks()
// }
//
// private fun evictAllConnections(pool: RealConnectionPool) {
// pool.evictAll()
// assertThat(pool.connectionCount()).isEqualTo(0)
// factory.taskFaker.runTasks()
// }
//
// private fun forceConnectionsToExpire(pool: RealConnectionPool, routePlanner: FakeRoutePlanner) {
// val idleTimeNanos = routePlanner.defaultConnectionIdleAtNanos + pool.keepAliveDurationNs
// repeat(pool.connectionCount()) { pool.cleanup(idleTimeNanos) }
// assertThat(pool.connectionCount()).isEqualTo(0)
// factory.taskFaker.runTasks()
// }
/** Use a helper method so there's no hidden reference remaining on the stack. */
private fun allocateAndLeakAllocation(
pool: ConnectionPool,