From 776239bfe43e65291ec734d24f2425f1bc5acfe1 Mon Sep 17 00:00:00 2001 From: Brian Peck Date: Wed, 24 Apr 2024 08:52:45 -0700 Subject: [PATCH] Allow AddressPolicy to have a max calls per connection This adds the ability for the Address Policy to also contain a maximum number of connections per Stream. In some environments it seems that OkHttp's HTTP/2 gets worse the more streams that are packed onto a single connection. By adding in the ability for users to alter their client's AddressPolicy to set a maximum number of connections per stream, they can control how much (if any) packing they want OkHttp to perform. --- .../internal/connection/RealConnection.kt | 9 ++- .../internal/connection/ConnectionPoolTest.kt | 72 ++++++++++++++++++- 2 files changed, 76 insertions(+), 5 deletions(-) diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt index 19f3e8a15..82c36acdf 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt @@ -120,7 +120,7 @@ class RealConnection( internal var allocationLimit = 1 private set - private var lastMaxConcurrentStreamsFromSettings: Int = Int.MAX_VALUE + private var lastMaxConcurrentStreamsFromSettings: Int? = null /** Current calls carried by this connection. */ val calls = mutableListOf>() @@ -385,11 +385,14 @@ class RealConnection( } private fun getMaximumAllocationLimit() : Int { - val maxConcurrentCalls = connectionPool.getPolicy(route.address) + // if we have not negotiated a max per streams yet, don't check for the policy override + val negotiatedMaxCurrentStreams = lastMaxConcurrentStreamsFromSettings ?: return 1 + + val maxPolicyValue = connectionPool.getPolicy(route.address) ?.maximumConcurrentCallsPerConnection ?: Int.MAX_VALUE - return min(lastMaxConcurrentStreamsFromSettings, maxConcurrentCalls) + return min(maxPolicyValue, negotiatedMaxCurrentStreams) } override fun handshake(): Handshake? = handshake diff --git a/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.kt b/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.kt index a6cea87ec..99d558e81 100644 --- a/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.kt +++ b/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.kt @@ -228,8 +228,8 @@ class ConnectionPoolTest { forceConnectionsToExpire(pool, expireTime) assertThat(pool.connectionCount()).isEqualTo(1) -// setPolicy(pool, address, ConnectionPool.AddressPolicy(3)) -// assertThat(pool.connectionCount()).isEqualTo(3) + setPolicy(pool, address, ConnectionPool.AddressPolicy(3)) + assertThat(pool.connectionCount()).isEqualTo(3) } @Test fun connectionPreWarmingHttp2() { @@ -266,6 +266,74 @@ class ConnectionPoolTest { assertThat(pool.connectionCount()).isEqualTo(1) } + @Test fun testSettingMaxConcurrentOnAddressPolicyHttp2() { + taskFaker.advanceUntil(System.nanoTime()) + val expireSooner = taskFaker.nanoTime + 1_000_000_000_000 + val expireLater = taskFaker.nanoTime + 2_000_000_000_000 + + routePlanner.autoGeneratePlans = true + val address = routePlanner.address + val pool = routePlanner.pool + + // Add a connection to the pool that won't expire for a while + routePlanner.defaultConnectionIdleAtNanos = expireLater + setPolicy(pool, address, ConnectionPool.AddressPolicy(1)) + assertThat(pool.connectionCount()).isEqualTo(1) + + // All other connections created will expire sooner + routePlanner.defaultConnectionIdleAtNanos = expireSooner + + // Turn it into an http/2 connection that supports 5 concurrent streams + // which can satisfy a larger policy + val connection = routePlanner.plans.first().connection + val http2Connection = connectHttp2(peer, connection, 5) + setPolicy(pool, address, ConnectionPool.AddressPolicy(5)) + assertThat(pool.connectionCount()).isEqualTo(1) + + // Decrease the policy max connections, and check that new connections are created + setPolicy(pool, address, ConnectionPool.AddressPolicy(5, 1)) + // fills up the first connect and then adds single connections + // 5 = 1 + 1 + 1 + 1 + 1 (five unique connections) + assertThat(pool.connectionCount()).isEqualTo(5) + + // increase the policy max connections, and check that new connections are created + setPolicy(pool, address, ConnectionPool.AddressPolicy(5, 2)) + forceConnectionsToExpire(pool, expireSooner) + // fills up the first connect and then adds single connections + // 5 = 2 + 1 + 1 + 1 (four unique connections) + assertThat(pool.connectionCount()).isEqualTo(4) + + // increase the policy max connections, and check that new connections are created + setPolicy(pool, address, ConnectionPool.AddressPolicy(5, 4)) + forceConnectionsToExpire(pool, expireSooner) + // fills up the first connect and then adds single connections + // 5 = 4 + 1 (two unique connections) + assertThat(pool.connectionCount()).isEqualTo(2) + + // Decrease the policy max connections, and check that new connections are created + setPolicy(pool, address, ConnectionPool.AddressPolicy(5, 3)) + // fills up the first connect and then removes an unused after + // 5 = 3 + 1 + 1 (three unique connections) + assertThat(pool.connectionCount()).isEqualTo(3) + + // If you update the settings to something smaller than the current + // set policy it should be adhered too + updateMaxConcurrentStreams(http2Connection, 2) + forceConnectionsToExpire(pool, expireSooner) + // fills up the first connect and then adds single connections + // 5 = 2 + 1 + 1 + 1 (four unique connections) + assertThat(pool.connectionCount()).isEqualTo(4) + + // If you update the settings to something more than the current + // set policy it should not go past the max in the policy + updateMaxConcurrentStreams(http2Connection, 5) + forceConnectionsToExpire(pool, expireSooner) + // fills up the first connect and then adds single connections + // 5 = 3 + 1 + 1 (three unique connections) + assertThat(pool.connectionCount()).isEqualTo(3) + } + + private fun setPolicy( pool: RealConnectionPool, address: Address,