From f15c81b4e9e6a6066cc1a734de0d3ae6b0d13033 Mon Sep 17 00:00:00 2001 From: Jesse Wilson Date: Fri, 28 Jan 2022 11:11:05 -0500 Subject: [PATCH] HappyEyeballs (#7035) This doesn't yet introduce any mechanism to enable or disable happy eyeballs. It also doesn't sort IP addresses to alternate IPv6, IPv4 for best success. It also doesn't limit how many connections are attempted simultaneously. It also lacks an appropriate number of tests. --- .../logging/LoggingEventListenerTest.java | 25 ++-- .../kotlin/okhttp3/OkHttpClientTestRule.kt | 7 +- .../jvmMain/kotlin/okhttp3/OkHttpClient.kt | 17 +++ .../internal/connection/ExchangeFinder.kt | 2 +- .../connection/FastFallbackExchangeFinder.kt | 137 ++++++++++++++++++ .../okhttp3/internal/connection/RealCall.kt | 10 +- .../internal/connection/RealRoutePlanner.kt | 16 +- .../internal/connection/RoutePlanner.kt | 12 +- .../jvmTest/java/okhttp3/FakeRoutePlanner.kt | 87 +++++++++++ ...ppyEyeballsTest.kt => FastFallbackTest.kt} | 44 +++++- .../FastFallbackExchangeFinderTest.kt | 39 +++++ 11 files changed, 359 insertions(+), 37 deletions(-) create mode 100644 okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/FastFallbackExchangeFinder.kt create mode 100644 okhttp/src/jvmTest/java/okhttp3/FakeRoutePlanner.kt rename okhttp/src/jvmTest/java/okhttp3/{HappyEyeballsTest.kt => FastFallbackTest.kt} (89%) create mode 100644 okhttp/src/jvmTest/java/okhttp3/internal/connection/FastFallbackExchangeFinderTest.kt diff --git a/okhttp-logging-interceptor/src/test/java/okhttp3/logging/LoggingEventListenerTest.java b/okhttp-logging-interceptor/src/test/java/okhttp3/logging/LoggingEventListenerTest.java index 592f799a4..85a1de599 100644 --- a/okhttp-logging-interceptor/src/test/java/okhttp3/logging/LoggingEventListenerTest.java +++ b/okhttp-logging-interceptor/src/test/java/okhttp3/logging/LoggingEventListenerTest.java @@ -17,20 +17,21 @@ package okhttp3.logging; import java.io.IOException; import java.net.UnknownHostException; +import mockwebserver3.MockResponse; +import mockwebserver3.MockWebServer; +import mockwebserver3.SocketPolicy; import mockwebserver3.junit5.internal.MockWebServerExtension; import okhttp3.Call; import okhttp3.EventListener; import okhttp3.HttpUrl; import okhttp3.MediaType; import okhttp3.OkHttpClient; -import okhttp3.TestUtil; -import okhttp3.testing.PlatformRule; +import okhttp3.OkHttpClientTestRule; import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.Response; -import mockwebserver3.MockResponse; -import mockwebserver3.MockWebServer; -import mockwebserver3.SocketPolicy; +import okhttp3.TestUtil; +import okhttp3.testing.PlatformRule; import okhttp3.tls.HandshakeCertificates; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -49,6 +50,7 @@ public final class LoggingEventListenerTest { private static final MediaType PLAIN = MediaType.get("text/plain"); @RegisterExtension public final PlatformRule platform = new PlatformRule(); + @RegisterExtension public final OkHttpClientTestRule clientTestRule = new OkHttpClientTestRule(); private MockWebServer server; private final HandshakeCertificates handshakeCertificates = localhost(); @@ -61,13 +63,12 @@ public final class LoggingEventListenerTest { @BeforeEach public void setUp(MockWebServer server) { this.server = server; - client = - new OkHttpClient.Builder() - .eventListenerFactory(loggingEventListenerFactory) - .sslSocketFactory( - handshakeCertificates.sslSocketFactory(), handshakeCertificates.trustManager()) - .retryOnConnectionFailure(false) - .build(); + this.client = clientTestRule.newClientBuilder() + .eventListenerFactory(loggingEventListenerFactory) + .sslSocketFactory(handshakeCertificates.sslSocketFactory(), + handshakeCertificates.trustManager()) + .retryOnConnectionFailure(false) + .build(); url = server.url("/"); } diff --git a/okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt b/okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt index dc781f188..3b5d5b079 100644 --- a/okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt +++ b/okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt @@ -114,9 +114,10 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback { var client = testClient if (client == null) { client = OkHttpClient.Builder() - .dns(SINGLE_INET_ADDRESS_DNS) // Prevent unexpected fallback addresses. - .eventListenerFactory { ClientRuleEventListener(logger = ::addEvent) } - .build() + .fastFallback(true) // Test this by default, since it'll soon be the default. + .dns(SINGLE_INET_ADDRESS_DNS) // Prevent unexpected fallback addresses. + .eventListenerFactory { ClientRuleEventListener(logger = ::addEvent) } + .build() testClient = client } return client diff --git a/okhttp/src/jvmMain/kotlin/okhttp3/OkHttpClient.kt b/okhttp/src/jvmMain/kotlin/okhttp3/OkHttpClient.kt index 7f89f752d..ddc07d100 100644 --- a/okhttp/src/jvmMain/kotlin/okhttp3/OkHttpClient.kt +++ b/okhttp/src/jvmMain/kotlin/okhttp3/OkHttpClient.kt @@ -158,6 +158,8 @@ open class OkHttpClient internal constructor( @get:JvmName("retryOnConnectionFailure") val retryOnConnectionFailure: Boolean = builder.retryOnConnectionFailure + @get:JvmName("fastFallback") val fastFallback: Boolean = builder.fastFallback + @get:JvmName("authenticator") val authenticator: Authenticator = builder.authenticator @get:JvmName("followRedirects") val followRedirects: Boolean = builder.followRedirects @@ -510,6 +512,7 @@ open class OkHttpClient internal constructor( internal val networkInterceptors: MutableList = mutableListOf() internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory() internal var retryOnConnectionFailure = true + internal var fastFallback = false internal var authenticator: Authenticator = Authenticator.NONE internal var followRedirects = true internal var followSslRedirects = true @@ -543,6 +546,7 @@ open class OkHttpClient internal constructor( this.networkInterceptors += okHttpClient.networkInterceptors this.eventListenerFactory = okHttpClient.eventListenerFactory this.retryOnConnectionFailure = okHttpClient.retryOnConnectionFailure + this.fastFallback = okHttpClient.fastFallback this.authenticator = okHttpClient.authenticator this.followRedirects = okHttpClient.followRedirects this.followSslRedirects = okHttpClient.followSslRedirects @@ -658,6 +662,19 @@ open class OkHttpClient internal constructor( this.retryOnConnectionFailure = retryOnConnectionFailure } + /** + * Configure this client to perform fast fallbacks by attempting multiple connections + * concurrently, returning once any connection connects successfully. + * + * This implements Happy Eyeballs ([RFC 6555][rfc_6555]), balancing connect latency vs. + * wasted resources. + * + * [rfc_6555]: https://datatracker.ietf.org/doc/html/rfc6555 + */ + fun fastFallback(fastFallback: Boolean) = apply { + this.fastFallback = fastFallback + } + /** * Sets the authenticator used to respond to challenges from origin servers. Use * [proxyAuthenticator] to set the authenticator for proxy servers. diff --git a/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/ExchangeFinder.kt b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/ExchangeFinder.kt index 3abcc9b29..bd5890b5c 100644 --- a/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/ExchangeFinder.kt +++ b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/ExchangeFinder.kt @@ -39,7 +39,7 @@ internal class ExchangeFinder( } else { firstException.addSuppressed(e) } - if (!routePlanner.retryAfterFailure()) { + if (!routePlanner.hasMoreRoutes()) { throw firstException } } diff --git a/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/FastFallbackExchangeFinder.kt b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/FastFallbackExchangeFinder.kt new file mode 100644 index 000000000..693416084 --- /dev/null +++ b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/FastFallbackExchangeFinder.kt @@ -0,0 +1,137 @@ +/* + * Copyright (C) 2022 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.internal.connection + +import java.io.IOException +import java.util.concurrent.LinkedBlockingDeque +import java.util.concurrent.TimeUnit +import okhttp3.internal.concurrent.Task +import okhttp3.internal.concurrent.TaskRunner +import okhttp3.internal.connection.RoutePlanner.Plan +import okhttp3.internal.okHttpName + +/** + * Speculatively connects to each IP address of a target address, returning as soon as one of them + * connects successfully. This kicks off new attempts every 250 ms until a connect succeeds. + */ +internal class FastFallbackExchangeFinder( + private val routePlanner: RoutePlanner, + private val taskRunner: TaskRunner, +) { + private val connectDelayMillis = 250L + + /** Plans currently being connected, and that will later be added to [connectResults]. */ + private var connectsInFlight = mutableListOf() + + /** + * Results are posted here as they occur. The find job is done when either one plan completes + * successfully or all plans fail. + */ + private val connectResults = LinkedBlockingDeque() + + /** Exceptions accumulate here. */ + private var firstException: IOException? = null + + /** True until we've launched all the connects we'll ever launch. */ + private var morePlansExist = true + + fun find(): RealConnection { + try { + while (morePlansExist || connectsInFlight.isNotEmpty()) { + if (routePlanner.isCanceled()) throw IOException("Canceled") + + launchConnect() + + val connection = awaitConnection() + if (connection != null) return connection + + morePlansExist = morePlansExist && routePlanner.hasMoreRoutes() + } + + throw firstException!! + } finally { + for (plan in connectsInFlight) { + plan.cancel() + } + } + } + + private fun launchConnect() { + if (!morePlansExist) return + + val plan = try { + routePlanner.plan() + } catch (e: IOException) { + trackFailure(e) + return + } + + connectsInFlight += plan + + // Already connected? Enqueue the result immediately. + if (plan.isConnected) { + connectResults.put(ConnectResult(plan, null)) + return + } + + // Connect asynchronously. + val taskName = "$okHttpName connect ${routePlanner.address.url.redact()}" + taskRunner.newQueue().schedule(object : Task(taskName) { + override fun runOnce(): Long { + try { + plan.connect() + connectResults.put(ConnectResult(plan, null)) + } catch (e: Throwable) { + connectResults.put(ConnectResult(plan, e)) + } + return -1L + } + }) + } + + private fun awaitConnection(): RealConnection? { + if (connectsInFlight.isEmpty()) return null + + val completed = connectResults.poll(connectDelayMillis, TimeUnit.MILLISECONDS) ?: return null + + connectsInFlight.remove(completed.plan) + + val exception = completed.throwable + if (exception is IOException) { + trackFailure(exception) + return null + } else if (exception != null) { + throw exception + } + + return completed.plan.handleSuccess() + } + + private fun trackFailure(exception: IOException) { + routePlanner.trackFailure(exception) + + if (firstException == null) { + firstException = exception + } else { + firstException!!.addSuppressed(exception) + } + } + + private class ConnectResult( + val plan: Plan, + val throwable: Throwable?, + ) +} diff --git a/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RealCall.kt b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RealCall.kt index c7191e931..c38caa43a 100644 --- a/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RealCall.kt +++ b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RealCall.kt @@ -260,9 +260,11 @@ class RealCall( } val routePlanner = this.routePlanner!! - val codec = ExchangeFinder(routePlanner) - .find() - .newCodec(client, chain) + val connection = when { + client.fastFallback -> FastFallbackExchangeFinder(routePlanner, client.taskRunner).find() + else -> ExchangeFinder(routePlanner).find() + } + val codec = connection.newCodec(client, chain) val result = Exchange(this, eventListener, routePlanner, codec) this.interceptorScopedExchange = result this.exchange = result @@ -463,7 +465,7 @@ class RealCall( ) } - fun retryAfterFailure(): Boolean = routePlanner!!.retryAfterFailure() + fun retryAfterFailure(): Boolean = routePlanner!!.hasFailure() && routePlanner!!.hasMoreRoutes() /** * Returns a string that describes this call. Doesn't include a full URL as that might contain diff --git a/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RealRoutePlanner.kt b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RealRoutePlanner.kt index 9b93791b2..ad7ddd1a2 100644 --- a/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RealRoutePlanner.kt +++ b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RealRoutePlanner.kt @@ -190,6 +190,10 @@ internal class RealRoutePlanner( } override fun handleSuccess() = connection + + override fun cancel() { + error("unexpected cancel of reused connection") + } } /** Establish a new connection. */ @@ -238,6 +242,10 @@ internal class RealRoutePlanner( eventListener.connectionAcquired(call, connection) return connection } + + override fun cancel() { + connection.cancel() + } } override fun trackFailure(e: IOException) { @@ -250,11 +258,11 @@ internal class RealRoutePlanner( } } - override fun retryAfterFailure(): Boolean { - if (refusedStreamCount == 0 && connectionShutdownCount == 0 && otherFailureCount == 0) { - return false // Nothing to recover from. - } + override fun hasFailure(): Boolean { + return refusedStreamCount > 0 || connectionShutdownCount > 0 || otherFailureCount > 0 + } + override fun hasMoreRoutes(): Boolean { if (nextRouteToTry != null) { return true } diff --git a/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RoutePlanner.kt b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RoutePlanner.kt index dd037a98a..e7828ed59 100644 --- a/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RoutePlanner.kt +++ b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RoutePlanner.kt @@ -53,11 +53,11 @@ interface RoutePlanner { fun trackFailure(e: IOException) - /** - * Returns true if the current route has a failure that retrying could fix, and that there's - * a route to retry on. - */ - fun retryAfterFailure(): Boolean + /** Returns true if this planner has received any failures. */ + fun hasFailure(): Boolean + + /** Returns true if this planner has more routes to try. */ + fun hasMoreRoutes(): Boolean /** * Returns true if the host and port are unchanged from when this was created. This is used to @@ -78,5 +78,7 @@ interface RoutePlanner { fun connect() fun handleSuccess(): RealConnection + + fun cancel() } } diff --git a/okhttp/src/jvmTest/java/okhttp3/FakeRoutePlanner.kt b/okhttp/src/jvmTest/java/okhttp3/FakeRoutePlanner.kt new file mode 100644 index 000000000..ef457ea49 --- /dev/null +++ b/okhttp/src/jvmTest/java/okhttp3/FakeRoutePlanner.kt @@ -0,0 +1,87 @@ +/* + * Copyright (C) 2022 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3 + +import java.io.IOException +import java.util.concurrent.LinkedBlockingDeque +import okhttp3.internal.connection.RoutePlanner + +class FakeRoutePlanner( + val factory: TestValueFactory, +) : RoutePlanner { + private val pool = factory.newConnectionPool() + + val events = LinkedBlockingDeque() + var canceled = false + var hasFailure = false + private var nextPlanId = 0 + private var nextPlanIndex = 0 + private val plans = mutableListOf() + + override val address = factory.newAddress("example.com") + + fun addPlan(): FakePlan { + return FakePlan(nextPlanId++).also { + plans += it + } + } + + override fun isCanceled() = canceled + + override fun plan(): FakePlan { + require(nextPlanIndex < plans.size) { + "not enough plans! call addPlan() in the test to set this up" + } + val result = plans[nextPlanIndex++] + events += "take plan ${result.id}" + return result + } + + override fun trackFailure(e: IOException) { + events += "failure" + hasFailure = true + } + + override fun hasFailure() = hasFailure + + override fun hasMoreRoutes(): Boolean { + return nextPlanIndex < plans.size + } + + override fun sameHostAndPort(url: HttpUrl): Boolean { + return url.host == address.url.host && url.port == address.url.port + } + + inner class FakePlan( + val id: Int + ) : RoutePlanner.Plan { + var canceled = false + val connection = factory.newConnection(pool, factory.newRoute(address)) + + override var isConnected = false + + override fun connect() { + events += "plan $id connect" + isConnected = true + } + + override fun handleSuccess() = connection + + override fun cancel() { + events += "plan $id cancel" + } + } +} diff --git a/okhttp/src/jvmTest/java/okhttp3/HappyEyeballsTest.kt b/okhttp/src/jvmTest/java/okhttp3/FastFallbackTest.kt similarity index 89% rename from okhttp/src/jvmTest/java/okhttp3/HappyEyeballsTest.kt rename to okhttp/src/jvmTest/java/okhttp3/FastFallbackTest.kt index 143a03dd9..44887fc61 100644 --- a/okhttp/src/jvmTest/java/okhttp3/HappyEyeballsTest.kt +++ b/okhttp/src/jvmTest/java/okhttp3/FastFallbackTest.kt @@ -19,13 +19,16 @@ import java.io.IOException import java.net.Inet4Address import java.net.Inet6Address import java.net.InetAddress +import java.util.concurrent.TimeUnit import kotlin.test.assertFailsWith +import kotlin.test.fail import mockwebserver3.MockResponse import mockwebserver3.MockWebServer import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.RegisterExtension import org.opentest4j.TestAbortedException @@ -39,7 +42,8 @@ import org.opentest4j.TestAbortedException * * This test only runs on host machines that have both IPv4 and IPv6 addresses for localhost. */ -class HappyEyeballsTest { +@Timeout(30) +class FastFallbackTest { @RegisterExtension val clientTestRule = OkHttpClientTestRule() @@ -80,7 +84,9 @@ class HappyEyeballsTest { client = clientTestRule.newClientBuilder() .eventListenerFactory(clientTestRule.wrap(listener)) + .connectTimeout(60, TimeUnit.SECONDS) // Deliberately exacerbate slow fallbacks. .dns { dnsResults } + .fastFallback(true) .build() url = serverIpv4.url("/") .newBuilder() @@ -208,13 +214,6 @@ class HappyEyeballsTest { assertThat(listener.recordedEventTypes().filter { it == "ConnectFailed" }).hasSize(2) } - /** - * This test currently completes successfully, but it takes 10 second to time out connecting to - * the unreachable address. - * - * Upon implementing Happy Eyeballs we should change this test to fail if it takes that long. We - * should also extend the connect timeout beyond 10 seconds to exacerbate the problem. - */ @Test fun reachesIpv6AfterUnreachableAddress() { dnsResults = listOf( @@ -239,4 +238,33 @@ class HappyEyeballsTest { assertThat(listener.recordedEventTypes().filter { it == "ConnectStart" }).hasSize(2) assertThat(listener.recordedEventTypes().filter { it == "ConnectFailed" }).hasSize(1) } + + @Test + fun timesOutWithFastFallbackDisabled() { + dnsResults = listOf( + TestUtil.UNREACHABLE_ADDRESS.address, + localhostIpv6, + ) + serverIpv4.shutdown() + serverIpv6.enqueue( + MockResponse() + .setBody("hello from IPv6") + ) + + client = client.newBuilder() + .fastFallback(false) + .callTimeout(1_000, TimeUnit.MILLISECONDS) + .build() + val call = client.newCall( + Request.Builder() + .url(url) + .build() + ) + try { + call.execute() + fail("expected a timeout") + } catch (e: IOException) { + assertThat(e).hasMessage("timeout") + } + } } diff --git a/okhttp/src/jvmTest/java/okhttp3/internal/connection/FastFallbackExchangeFinderTest.kt b/okhttp/src/jvmTest/java/okhttp3/internal/connection/FastFallbackExchangeFinderTest.kt new file mode 100644 index 000000000..54eb35b12 --- /dev/null +++ b/okhttp/src/jvmTest/java/okhttp3/internal/connection/FastFallbackExchangeFinderTest.kt @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2022 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.internal.connection + +import okhttp3.FakeRoutePlanner +import okhttp3.TestValueFactory +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test + +/** Unit test for [FastFallbackExchangeFinder] implementation details. */ +internal class FastFallbackExchangeFinderTest { + private val factory = TestValueFactory() + private val routePlanner = FakeRoutePlanner(factory) + private val finder = FastFallbackExchangeFinder(routePlanner, factory.taskRunner) + + @Test + fun takeConnectedConnection() { + val plan0 = routePlanner.addPlan() + plan0.isConnected = true + + val result0 = finder.find() + assertThat(result0).isEqualTo(plan0.connection) + assertThat(routePlanner.events.poll()).isEqualTo("take plan 0") + assertThat(routePlanner.events.poll()).isNull() + } +}