diff --git a/okhttp-logging-interceptor/src/test/java/okhttp3/logging/LoggingEventListenerTest.java b/okhttp-logging-interceptor/src/test/java/okhttp3/logging/LoggingEventListenerTest.java index 592f799a4..85a1de599 100644 --- a/okhttp-logging-interceptor/src/test/java/okhttp3/logging/LoggingEventListenerTest.java +++ b/okhttp-logging-interceptor/src/test/java/okhttp3/logging/LoggingEventListenerTest.java @@ -17,20 +17,21 @@ package okhttp3.logging; import java.io.IOException; import java.net.UnknownHostException; +import mockwebserver3.MockResponse; +import mockwebserver3.MockWebServer; +import mockwebserver3.SocketPolicy; import mockwebserver3.junit5.internal.MockWebServerExtension; import okhttp3.Call; import okhttp3.EventListener; import okhttp3.HttpUrl; import okhttp3.MediaType; import okhttp3.OkHttpClient; -import okhttp3.TestUtil; -import okhttp3.testing.PlatformRule; +import okhttp3.OkHttpClientTestRule; import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.Response; -import mockwebserver3.MockResponse; -import mockwebserver3.MockWebServer; -import mockwebserver3.SocketPolicy; +import okhttp3.TestUtil; +import okhttp3.testing.PlatformRule; import okhttp3.tls.HandshakeCertificates; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -49,6 +50,7 @@ public final class LoggingEventListenerTest { private static final MediaType PLAIN = MediaType.get("text/plain"); @RegisterExtension public final PlatformRule platform = new PlatformRule(); + @RegisterExtension public final OkHttpClientTestRule clientTestRule = new OkHttpClientTestRule(); private MockWebServer server; private final HandshakeCertificates handshakeCertificates = localhost(); @@ -61,13 +63,12 @@ public final class LoggingEventListenerTest { @BeforeEach public void setUp(MockWebServer server) { this.server = server; - client = - new OkHttpClient.Builder() - .eventListenerFactory(loggingEventListenerFactory) - .sslSocketFactory( - handshakeCertificates.sslSocketFactory(), handshakeCertificates.trustManager()) - .retryOnConnectionFailure(false) - .build(); + this.client = clientTestRule.newClientBuilder() + .eventListenerFactory(loggingEventListenerFactory) + .sslSocketFactory(handshakeCertificates.sslSocketFactory(), + handshakeCertificates.trustManager()) + .retryOnConnectionFailure(false) + .build(); url = server.url("/"); } diff --git a/okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt b/okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt index dc781f188..3b5d5b079 100644 --- a/okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt +++ b/okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt @@ -114,9 +114,10 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback { var client = testClient if (client == null) { client = OkHttpClient.Builder() - .dns(SINGLE_INET_ADDRESS_DNS) // Prevent unexpected fallback addresses. - .eventListenerFactory { ClientRuleEventListener(logger = ::addEvent) } - .build() + .fastFallback(true) // Test this by default, since it'll soon be the default. + .dns(SINGLE_INET_ADDRESS_DNS) // Prevent unexpected fallback addresses. + .eventListenerFactory { ClientRuleEventListener(logger = ::addEvent) } + .build() testClient = client } return client diff --git a/okhttp/src/jvmMain/kotlin/okhttp3/OkHttpClient.kt b/okhttp/src/jvmMain/kotlin/okhttp3/OkHttpClient.kt index 7f89f752d..ddc07d100 100644 --- a/okhttp/src/jvmMain/kotlin/okhttp3/OkHttpClient.kt +++ b/okhttp/src/jvmMain/kotlin/okhttp3/OkHttpClient.kt @@ -158,6 +158,8 @@ open class OkHttpClient internal constructor( @get:JvmName("retryOnConnectionFailure") val retryOnConnectionFailure: Boolean = builder.retryOnConnectionFailure + @get:JvmName("fastFallback") val fastFallback: Boolean = builder.fastFallback + @get:JvmName("authenticator") val authenticator: Authenticator = builder.authenticator @get:JvmName("followRedirects") val followRedirects: Boolean = builder.followRedirects @@ -510,6 +512,7 @@ open class OkHttpClient internal constructor( internal val networkInterceptors: MutableList = mutableListOf() internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory() internal var retryOnConnectionFailure = true + internal var fastFallback = false internal var authenticator: Authenticator = Authenticator.NONE internal var followRedirects = true internal var followSslRedirects = true @@ -543,6 +546,7 @@ open class OkHttpClient internal constructor( this.networkInterceptors += okHttpClient.networkInterceptors this.eventListenerFactory = okHttpClient.eventListenerFactory this.retryOnConnectionFailure = okHttpClient.retryOnConnectionFailure + this.fastFallback = okHttpClient.fastFallback this.authenticator = okHttpClient.authenticator this.followRedirects = okHttpClient.followRedirects this.followSslRedirects = okHttpClient.followSslRedirects @@ -658,6 +662,19 @@ open class OkHttpClient internal constructor( this.retryOnConnectionFailure = retryOnConnectionFailure } + /** + * Configure this client to perform fast fallbacks by attempting multiple connections + * concurrently, returning once any connection connects successfully. + * + * This implements Happy Eyeballs ([RFC 6555][rfc_6555]), balancing connect latency vs. + * wasted resources. + * + * [rfc_6555]: https://datatracker.ietf.org/doc/html/rfc6555 + */ + fun fastFallback(fastFallback: Boolean) = apply { + this.fastFallback = fastFallback + } + /** * Sets the authenticator used to respond to challenges from origin servers. Use * [proxyAuthenticator] to set the authenticator for proxy servers. diff --git a/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/ExchangeFinder.kt b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/ExchangeFinder.kt index 3abcc9b29..bd5890b5c 100644 --- a/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/ExchangeFinder.kt +++ b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/ExchangeFinder.kt @@ -39,7 +39,7 @@ internal class ExchangeFinder( } else { firstException.addSuppressed(e) } - if (!routePlanner.retryAfterFailure()) { + if (!routePlanner.hasMoreRoutes()) { throw firstException } } diff --git a/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/FastFallbackExchangeFinder.kt b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/FastFallbackExchangeFinder.kt new file mode 100644 index 000000000..693416084 --- /dev/null +++ b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/FastFallbackExchangeFinder.kt @@ -0,0 +1,137 @@ +/* + * Copyright (C) 2022 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.internal.connection + +import java.io.IOException +import java.util.concurrent.LinkedBlockingDeque +import java.util.concurrent.TimeUnit +import okhttp3.internal.concurrent.Task +import okhttp3.internal.concurrent.TaskRunner +import okhttp3.internal.connection.RoutePlanner.Plan +import okhttp3.internal.okHttpName + +/** + * Speculatively connects to each IP address of a target address, returning as soon as one of them + * connects successfully. This kicks off new attempts every 250 ms until a connect succeeds. + */ +internal class FastFallbackExchangeFinder( + private val routePlanner: RoutePlanner, + private val taskRunner: TaskRunner, +) { + private val connectDelayMillis = 250L + + /** Plans currently being connected, and that will later be added to [connectResults]. */ + private var connectsInFlight = mutableListOf() + + /** + * Results are posted here as they occur. The find job is done when either one plan completes + * successfully or all plans fail. + */ + private val connectResults = LinkedBlockingDeque() + + /** Exceptions accumulate here. */ + private var firstException: IOException? = null + + /** True until we've launched all the connects we'll ever launch. */ + private var morePlansExist = true + + fun find(): RealConnection { + try { + while (morePlansExist || connectsInFlight.isNotEmpty()) { + if (routePlanner.isCanceled()) throw IOException("Canceled") + + launchConnect() + + val connection = awaitConnection() + if (connection != null) return connection + + morePlansExist = morePlansExist && routePlanner.hasMoreRoutes() + } + + throw firstException!! + } finally { + for (plan in connectsInFlight) { + plan.cancel() + } + } + } + + private fun launchConnect() { + if (!morePlansExist) return + + val plan = try { + routePlanner.plan() + } catch (e: IOException) { + trackFailure(e) + return + } + + connectsInFlight += plan + + // Already connected? Enqueue the result immediately. + if (plan.isConnected) { + connectResults.put(ConnectResult(plan, null)) + return + } + + // Connect asynchronously. + val taskName = "$okHttpName connect ${routePlanner.address.url.redact()}" + taskRunner.newQueue().schedule(object : Task(taskName) { + override fun runOnce(): Long { + try { + plan.connect() + connectResults.put(ConnectResult(plan, null)) + } catch (e: Throwable) { + connectResults.put(ConnectResult(plan, e)) + } + return -1L + } + }) + } + + private fun awaitConnection(): RealConnection? { + if (connectsInFlight.isEmpty()) return null + + val completed = connectResults.poll(connectDelayMillis, TimeUnit.MILLISECONDS) ?: return null + + connectsInFlight.remove(completed.plan) + + val exception = completed.throwable + if (exception is IOException) { + trackFailure(exception) + return null + } else if (exception != null) { + throw exception + } + + return completed.plan.handleSuccess() + } + + private fun trackFailure(exception: IOException) { + routePlanner.trackFailure(exception) + + if (firstException == null) { + firstException = exception + } else { + firstException!!.addSuppressed(exception) + } + } + + private class ConnectResult( + val plan: Plan, + val throwable: Throwable?, + ) +} diff --git a/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RealCall.kt b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RealCall.kt index c7191e931..c38caa43a 100644 --- a/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RealCall.kt +++ b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RealCall.kt @@ -260,9 +260,11 @@ class RealCall( } val routePlanner = this.routePlanner!! - val codec = ExchangeFinder(routePlanner) - .find() - .newCodec(client, chain) + val connection = when { + client.fastFallback -> FastFallbackExchangeFinder(routePlanner, client.taskRunner).find() + else -> ExchangeFinder(routePlanner).find() + } + val codec = connection.newCodec(client, chain) val result = Exchange(this, eventListener, routePlanner, codec) this.interceptorScopedExchange = result this.exchange = result @@ -463,7 +465,7 @@ class RealCall( ) } - fun retryAfterFailure(): Boolean = routePlanner!!.retryAfterFailure() + fun retryAfterFailure(): Boolean = routePlanner!!.hasFailure() && routePlanner!!.hasMoreRoutes() /** * Returns a string that describes this call. Doesn't include a full URL as that might contain diff --git a/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RealRoutePlanner.kt b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RealRoutePlanner.kt index 9b93791b2..ad7ddd1a2 100644 --- a/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RealRoutePlanner.kt +++ b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RealRoutePlanner.kt @@ -190,6 +190,10 @@ internal class RealRoutePlanner( } override fun handleSuccess() = connection + + override fun cancel() { + error("unexpected cancel of reused connection") + } } /** Establish a new connection. */ @@ -238,6 +242,10 @@ internal class RealRoutePlanner( eventListener.connectionAcquired(call, connection) return connection } + + override fun cancel() { + connection.cancel() + } } override fun trackFailure(e: IOException) { @@ -250,11 +258,11 @@ internal class RealRoutePlanner( } } - override fun retryAfterFailure(): Boolean { - if (refusedStreamCount == 0 && connectionShutdownCount == 0 && otherFailureCount == 0) { - return false // Nothing to recover from. - } + override fun hasFailure(): Boolean { + return refusedStreamCount > 0 || connectionShutdownCount > 0 || otherFailureCount > 0 + } + override fun hasMoreRoutes(): Boolean { if (nextRouteToTry != null) { return true } diff --git a/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RoutePlanner.kt b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RoutePlanner.kt index dd037a98a..e7828ed59 100644 --- a/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RoutePlanner.kt +++ b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RoutePlanner.kt @@ -53,11 +53,11 @@ interface RoutePlanner { fun trackFailure(e: IOException) - /** - * Returns true if the current route has a failure that retrying could fix, and that there's - * a route to retry on. - */ - fun retryAfterFailure(): Boolean + /** Returns true if this planner has received any failures. */ + fun hasFailure(): Boolean + + /** Returns true if this planner has more routes to try. */ + fun hasMoreRoutes(): Boolean /** * Returns true if the host and port are unchanged from when this was created. This is used to @@ -78,5 +78,7 @@ interface RoutePlanner { fun connect() fun handleSuccess(): RealConnection + + fun cancel() } } diff --git a/okhttp/src/jvmTest/java/okhttp3/FakeRoutePlanner.kt b/okhttp/src/jvmTest/java/okhttp3/FakeRoutePlanner.kt new file mode 100644 index 000000000..ef457ea49 --- /dev/null +++ b/okhttp/src/jvmTest/java/okhttp3/FakeRoutePlanner.kt @@ -0,0 +1,87 @@ +/* + * Copyright (C) 2022 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3 + +import java.io.IOException +import java.util.concurrent.LinkedBlockingDeque +import okhttp3.internal.connection.RoutePlanner + +class FakeRoutePlanner( + val factory: TestValueFactory, +) : RoutePlanner { + private val pool = factory.newConnectionPool() + + val events = LinkedBlockingDeque() + var canceled = false + var hasFailure = false + private var nextPlanId = 0 + private var nextPlanIndex = 0 + private val plans = mutableListOf() + + override val address = factory.newAddress("example.com") + + fun addPlan(): FakePlan { + return FakePlan(nextPlanId++).also { + plans += it + } + } + + override fun isCanceled() = canceled + + override fun plan(): FakePlan { + require(nextPlanIndex < plans.size) { + "not enough plans! call addPlan() in the test to set this up" + } + val result = plans[nextPlanIndex++] + events += "take plan ${result.id}" + return result + } + + override fun trackFailure(e: IOException) { + events += "failure" + hasFailure = true + } + + override fun hasFailure() = hasFailure + + override fun hasMoreRoutes(): Boolean { + return nextPlanIndex < plans.size + } + + override fun sameHostAndPort(url: HttpUrl): Boolean { + return url.host == address.url.host && url.port == address.url.port + } + + inner class FakePlan( + val id: Int + ) : RoutePlanner.Plan { + var canceled = false + val connection = factory.newConnection(pool, factory.newRoute(address)) + + override var isConnected = false + + override fun connect() { + events += "plan $id connect" + isConnected = true + } + + override fun handleSuccess() = connection + + override fun cancel() { + events += "plan $id cancel" + } + } +} diff --git a/okhttp/src/jvmTest/java/okhttp3/HappyEyeballsTest.kt b/okhttp/src/jvmTest/java/okhttp3/FastFallbackTest.kt similarity index 89% rename from okhttp/src/jvmTest/java/okhttp3/HappyEyeballsTest.kt rename to okhttp/src/jvmTest/java/okhttp3/FastFallbackTest.kt index 143a03dd9..44887fc61 100644 --- a/okhttp/src/jvmTest/java/okhttp3/HappyEyeballsTest.kt +++ b/okhttp/src/jvmTest/java/okhttp3/FastFallbackTest.kt @@ -19,13 +19,16 @@ import java.io.IOException import java.net.Inet4Address import java.net.Inet6Address import java.net.InetAddress +import java.util.concurrent.TimeUnit import kotlin.test.assertFailsWith +import kotlin.test.fail import mockwebserver3.MockResponse import mockwebserver3.MockWebServer import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.RegisterExtension import org.opentest4j.TestAbortedException @@ -39,7 +42,8 @@ import org.opentest4j.TestAbortedException * * This test only runs on host machines that have both IPv4 and IPv6 addresses for localhost. */ -class HappyEyeballsTest { +@Timeout(30) +class FastFallbackTest { @RegisterExtension val clientTestRule = OkHttpClientTestRule() @@ -80,7 +84,9 @@ class HappyEyeballsTest { client = clientTestRule.newClientBuilder() .eventListenerFactory(clientTestRule.wrap(listener)) + .connectTimeout(60, TimeUnit.SECONDS) // Deliberately exacerbate slow fallbacks. .dns { dnsResults } + .fastFallback(true) .build() url = serverIpv4.url("/") .newBuilder() @@ -208,13 +214,6 @@ class HappyEyeballsTest { assertThat(listener.recordedEventTypes().filter { it == "ConnectFailed" }).hasSize(2) } - /** - * This test currently completes successfully, but it takes 10 second to time out connecting to - * the unreachable address. - * - * Upon implementing Happy Eyeballs we should change this test to fail if it takes that long. We - * should also extend the connect timeout beyond 10 seconds to exacerbate the problem. - */ @Test fun reachesIpv6AfterUnreachableAddress() { dnsResults = listOf( @@ -239,4 +238,33 @@ class HappyEyeballsTest { assertThat(listener.recordedEventTypes().filter { it == "ConnectStart" }).hasSize(2) assertThat(listener.recordedEventTypes().filter { it == "ConnectFailed" }).hasSize(1) } + + @Test + fun timesOutWithFastFallbackDisabled() { + dnsResults = listOf( + TestUtil.UNREACHABLE_ADDRESS.address, + localhostIpv6, + ) + serverIpv4.shutdown() + serverIpv6.enqueue( + MockResponse() + .setBody("hello from IPv6") + ) + + client = client.newBuilder() + .fastFallback(false) + .callTimeout(1_000, TimeUnit.MILLISECONDS) + .build() + val call = client.newCall( + Request.Builder() + .url(url) + .build() + ) + try { + call.execute() + fail("expected a timeout") + } catch (e: IOException) { + assertThat(e).hasMessage("timeout") + } + } } diff --git a/okhttp/src/jvmTest/java/okhttp3/internal/connection/FastFallbackExchangeFinderTest.kt b/okhttp/src/jvmTest/java/okhttp3/internal/connection/FastFallbackExchangeFinderTest.kt new file mode 100644 index 000000000..54eb35b12 --- /dev/null +++ b/okhttp/src/jvmTest/java/okhttp3/internal/connection/FastFallbackExchangeFinderTest.kt @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2022 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.internal.connection + +import okhttp3.FakeRoutePlanner +import okhttp3.TestValueFactory +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test + +/** Unit test for [FastFallbackExchangeFinder] implementation details. */ +internal class FastFallbackExchangeFinderTest { + private val factory = TestValueFactory() + private val routePlanner = FakeRoutePlanner(factory) + private val finder = FastFallbackExchangeFinder(routePlanner, factory.taskRunner) + + @Test + fun takeConnectedConnection() { + val plan0 = routePlanner.addPlan() + plan0.isConnected = true + + val result0 = finder.find() + assertThat(result0).isEqualTo(plan0.connection) + assertThat(routePlanner.events.poll()).isEqualTo("take plan 0") + assertThat(routePlanner.events.poll()).isNull() + } +}