1
0
mirror of https://github.com/square/okhttp.git synced 2025-07-31 05:04:26 +03:00

HappyEyeballs (#7035)

This doesn't yet introduce any mechanism to enable or disable
happy eyeballs.

It also doesn't sort IP addresses to alternate IPv6, IPv4
for best success.

It also doesn't limit how many connections are attempted
simultaneously.

It also lacks an appropriate number of tests.
This commit is contained in:
Jesse Wilson
2022-01-28 11:11:05 -05:00
committed by GitHub
parent d4b5c9eac8
commit f15c81b4e9
11 changed files with 359 additions and 37 deletions

View File

@ -17,20 +17,21 @@ package okhttp3.logging;
import java.io.IOException; import java.io.IOException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import mockwebserver3.MockResponse;
import mockwebserver3.MockWebServer;
import mockwebserver3.SocketPolicy;
import mockwebserver3.junit5.internal.MockWebServerExtension; import mockwebserver3.junit5.internal.MockWebServerExtension;
import okhttp3.Call; import okhttp3.Call;
import okhttp3.EventListener; import okhttp3.EventListener;
import okhttp3.HttpUrl; import okhttp3.HttpUrl;
import okhttp3.MediaType; import okhttp3.MediaType;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
import okhttp3.TestUtil; import okhttp3.OkHttpClientTestRule;
import okhttp3.testing.PlatformRule;
import okhttp3.Request; import okhttp3.Request;
import okhttp3.RequestBody; import okhttp3.RequestBody;
import okhttp3.Response; import okhttp3.Response;
import mockwebserver3.MockResponse; import okhttp3.TestUtil;
import mockwebserver3.MockWebServer; import okhttp3.testing.PlatformRule;
import mockwebserver3.SocketPolicy;
import okhttp3.tls.HandshakeCertificates; import okhttp3.tls.HandshakeCertificates;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -49,6 +50,7 @@ public final class LoggingEventListenerTest {
private static final MediaType PLAIN = MediaType.get("text/plain"); private static final MediaType PLAIN = MediaType.get("text/plain");
@RegisterExtension public final PlatformRule platform = new PlatformRule(); @RegisterExtension public final PlatformRule platform = new PlatformRule();
@RegisterExtension public final OkHttpClientTestRule clientTestRule = new OkHttpClientTestRule();
private MockWebServer server; private MockWebServer server;
private final HandshakeCertificates handshakeCertificates = localhost(); private final HandshakeCertificates handshakeCertificates = localhost();
@ -61,13 +63,12 @@ public final class LoggingEventListenerTest {
@BeforeEach @BeforeEach
public void setUp(MockWebServer server) { public void setUp(MockWebServer server) {
this.server = server; this.server = server;
client = this.client = clientTestRule.newClientBuilder()
new OkHttpClient.Builder() .eventListenerFactory(loggingEventListenerFactory)
.eventListenerFactory(loggingEventListenerFactory) .sslSocketFactory(handshakeCertificates.sslSocketFactory(),
.sslSocketFactory( handshakeCertificates.trustManager())
handshakeCertificates.sslSocketFactory(), handshakeCertificates.trustManager()) .retryOnConnectionFailure(false)
.retryOnConnectionFailure(false) .build();
.build();
url = server.url("/"); url = server.url("/");
} }

View File

@ -114,9 +114,10 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback {
var client = testClient var client = testClient
if (client == null) { if (client == null) {
client = OkHttpClient.Builder() client = OkHttpClient.Builder()
.dns(SINGLE_INET_ADDRESS_DNS) // Prevent unexpected fallback addresses. .fastFallback(true) // Test this by default, since it'll soon be the default.
.eventListenerFactory { ClientRuleEventListener(logger = ::addEvent) } .dns(SINGLE_INET_ADDRESS_DNS) // Prevent unexpected fallback addresses.
.build() .eventListenerFactory { ClientRuleEventListener(logger = ::addEvent) }
.build()
testClient = client testClient = client
} }
return client return client

View File

@ -158,6 +158,8 @@ open class OkHttpClient internal constructor(
@get:JvmName("retryOnConnectionFailure") val retryOnConnectionFailure: Boolean = @get:JvmName("retryOnConnectionFailure") val retryOnConnectionFailure: Boolean =
builder.retryOnConnectionFailure builder.retryOnConnectionFailure
@get:JvmName("fastFallback") val fastFallback: Boolean = builder.fastFallback
@get:JvmName("authenticator") val authenticator: Authenticator = builder.authenticator @get:JvmName("authenticator") val authenticator: Authenticator = builder.authenticator
@get:JvmName("followRedirects") val followRedirects: Boolean = builder.followRedirects @get:JvmName("followRedirects") val followRedirects: Boolean = builder.followRedirects
@ -510,6 +512,7 @@ open class OkHttpClient internal constructor(
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf() internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory() internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
internal var retryOnConnectionFailure = true internal var retryOnConnectionFailure = true
internal var fastFallback = false
internal var authenticator: Authenticator = Authenticator.NONE internal var authenticator: Authenticator = Authenticator.NONE
internal var followRedirects = true internal var followRedirects = true
internal var followSslRedirects = true internal var followSslRedirects = true
@ -543,6 +546,7 @@ open class OkHttpClient internal constructor(
this.networkInterceptors += okHttpClient.networkInterceptors this.networkInterceptors += okHttpClient.networkInterceptors
this.eventListenerFactory = okHttpClient.eventListenerFactory this.eventListenerFactory = okHttpClient.eventListenerFactory
this.retryOnConnectionFailure = okHttpClient.retryOnConnectionFailure this.retryOnConnectionFailure = okHttpClient.retryOnConnectionFailure
this.fastFallback = okHttpClient.fastFallback
this.authenticator = okHttpClient.authenticator this.authenticator = okHttpClient.authenticator
this.followRedirects = okHttpClient.followRedirects this.followRedirects = okHttpClient.followRedirects
this.followSslRedirects = okHttpClient.followSslRedirects this.followSslRedirects = okHttpClient.followSslRedirects
@ -658,6 +662,19 @@ open class OkHttpClient internal constructor(
this.retryOnConnectionFailure = retryOnConnectionFailure this.retryOnConnectionFailure = retryOnConnectionFailure
} }
/**
* Configure this client to perform fast fallbacks by attempting multiple connections
* concurrently, returning once any connection connects successfully.
*
* This implements Happy Eyeballs ([RFC 6555][rfc_6555]), balancing connect latency vs.
* wasted resources.
*
* [rfc_6555]: https://datatracker.ietf.org/doc/html/rfc6555
*/
fun fastFallback(fastFallback: Boolean) = apply {
this.fastFallback = fastFallback
}
/** /**
* Sets the authenticator used to respond to challenges from origin servers. Use * Sets the authenticator used to respond to challenges from origin servers. Use
* [proxyAuthenticator] to set the authenticator for proxy servers. * [proxyAuthenticator] to set the authenticator for proxy servers.

View File

@ -39,7 +39,7 @@ internal class ExchangeFinder(
} else { } else {
firstException.addSuppressed(e) firstException.addSuppressed(e)
} }
if (!routePlanner.retryAfterFailure()) { if (!routePlanner.hasMoreRoutes()) {
throw firstException throw firstException
} }
} }

View File

@ -0,0 +1,137 @@
/*
* Copyright (C) 2022 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3.internal.connection
import java.io.IOException
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.TimeUnit
import okhttp3.internal.concurrent.Task
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.connection.RoutePlanner.Plan
import okhttp3.internal.okHttpName
/**
* Speculatively connects to each IP address of a target address, returning as soon as one of them
* connects successfully. This kicks off new attempts every 250 ms until a connect succeeds.
*/
internal class FastFallbackExchangeFinder(
private val routePlanner: RoutePlanner,
private val taskRunner: TaskRunner,
) {
private val connectDelayMillis = 250L
/** Plans currently being connected, and that will later be added to [connectResults]. */
private var connectsInFlight = mutableListOf<Plan>()
/**
* Results are posted here as they occur. The find job is done when either one plan completes
* successfully or all plans fail.
*/
private val connectResults = LinkedBlockingDeque<ConnectResult>()
/** Exceptions accumulate here. */
private var firstException: IOException? = null
/** True until we've launched all the connects we'll ever launch. */
private var morePlansExist = true
fun find(): RealConnection {
try {
while (morePlansExist || connectsInFlight.isNotEmpty()) {
if (routePlanner.isCanceled()) throw IOException("Canceled")
launchConnect()
val connection = awaitConnection()
if (connection != null) return connection
morePlansExist = morePlansExist && routePlanner.hasMoreRoutes()
}
throw firstException!!
} finally {
for (plan in connectsInFlight) {
plan.cancel()
}
}
}
private fun launchConnect() {
if (!morePlansExist) return
val plan = try {
routePlanner.plan()
} catch (e: IOException) {
trackFailure(e)
return
}
connectsInFlight += plan
// Already connected? Enqueue the result immediately.
if (plan.isConnected) {
connectResults.put(ConnectResult(plan, null))
return
}
// Connect asynchronously.
val taskName = "$okHttpName connect ${routePlanner.address.url.redact()}"
taskRunner.newQueue().schedule(object : Task(taskName) {
override fun runOnce(): Long {
try {
plan.connect()
connectResults.put(ConnectResult(plan, null))
} catch (e: Throwable) {
connectResults.put(ConnectResult(plan, e))
}
return -1L
}
})
}
private fun awaitConnection(): RealConnection? {
if (connectsInFlight.isEmpty()) return null
val completed = connectResults.poll(connectDelayMillis, TimeUnit.MILLISECONDS) ?: return null
connectsInFlight.remove(completed.plan)
val exception = completed.throwable
if (exception is IOException) {
trackFailure(exception)
return null
} else if (exception != null) {
throw exception
}
return completed.plan.handleSuccess()
}
private fun trackFailure(exception: IOException) {
routePlanner.trackFailure(exception)
if (firstException == null) {
firstException = exception
} else {
firstException!!.addSuppressed(exception)
}
}
private class ConnectResult(
val plan: Plan,
val throwable: Throwable?,
)
}

View File

@ -260,9 +260,11 @@ class RealCall(
} }
val routePlanner = this.routePlanner!! val routePlanner = this.routePlanner!!
val codec = ExchangeFinder(routePlanner) val connection = when {
.find() client.fastFallback -> FastFallbackExchangeFinder(routePlanner, client.taskRunner).find()
.newCodec(client, chain) else -> ExchangeFinder(routePlanner).find()
}
val codec = connection.newCodec(client, chain)
val result = Exchange(this, eventListener, routePlanner, codec) val result = Exchange(this, eventListener, routePlanner, codec)
this.interceptorScopedExchange = result this.interceptorScopedExchange = result
this.exchange = result this.exchange = result
@ -463,7 +465,7 @@ class RealCall(
) )
} }
fun retryAfterFailure(): Boolean = routePlanner!!.retryAfterFailure() fun retryAfterFailure(): Boolean = routePlanner!!.hasFailure() && routePlanner!!.hasMoreRoutes()
/** /**
* Returns a string that describes this call. Doesn't include a full URL as that might contain * Returns a string that describes this call. Doesn't include a full URL as that might contain

View File

@ -190,6 +190,10 @@ internal class RealRoutePlanner(
} }
override fun handleSuccess() = connection override fun handleSuccess() = connection
override fun cancel() {
error("unexpected cancel of reused connection")
}
} }
/** Establish a new connection. */ /** Establish a new connection. */
@ -238,6 +242,10 @@ internal class RealRoutePlanner(
eventListener.connectionAcquired(call, connection) eventListener.connectionAcquired(call, connection)
return connection return connection
} }
override fun cancel() {
connection.cancel()
}
} }
override fun trackFailure(e: IOException) { override fun trackFailure(e: IOException) {
@ -250,11 +258,11 @@ internal class RealRoutePlanner(
} }
} }
override fun retryAfterFailure(): Boolean { override fun hasFailure(): Boolean {
if (refusedStreamCount == 0 && connectionShutdownCount == 0 && otherFailureCount == 0) { return refusedStreamCount > 0 || connectionShutdownCount > 0 || otherFailureCount > 0
return false // Nothing to recover from. }
}
override fun hasMoreRoutes(): Boolean {
if (nextRouteToTry != null) { if (nextRouteToTry != null) {
return true return true
} }

View File

@ -53,11 +53,11 @@ interface RoutePlanner {
fun trackFailure(e: IOException) fun trackFailure(e: IOException)
/** /** Returns true if this planner has received any failures. */
* Returns true if the current route has a failure that retrying could fix, and that there's fun hasFailure(): Boolean
* a route to retry on.
*/ /** Returns true if this planner has more routes to try. */
fun retryAfterFailure(): Boolean fun hasMoreRoutes(): Boolean
/** /**
* Returns true if the host and port are unchanged from when this was created. This is used to * Returns true if the host and port are unchanged from when this was created. This is used to
@ -78,5 +78,7 @@ interface RoutePlanner {
fun connect() fun connect()
fun handleSuccess(): RealConnection fun handleSuccess(): RealConnection
fun cancel()
} }
} }

View File

@ -0,0 +1,87 @@
/*
* Copyright (C) 2022 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3
import java.io.IOException
import java.util.concurrent.LinkedBlockingDeque
import okhttp3.internal.connection.RoutePlanner
class FakeRoutePlanner(
val factory: TestValueFactory,
) : RoutePlanner {
private val pool = factory.newConnectionPool()
val events = LinkedBlockingDeque<String>()
var canceled = false
var hasFailure = false
private var nextPlanId = 0
private var nextPlanIndex = 0
private val plans = mutableListOf<FakePlan>()
override val address = factory.newAddress("example.com")
fun addPlan(): FakePlan {
return FakePlan(nextPlanId++).also {
plans += it
}
}
override fun isCanceled() = canceled
override fun plan(): FakePlan {
require(nextPlanIndex < plans.size) {
"not enough plans! call addPlan() in the test to set this up"
}
val result = plans[nextPlanIndex++]
events += "take plan ${result.id}"
return result
}
override fun trackFailure(e: IOException) {
events += "failure"
hasFailure = true
}
override fun hasFailure() = hasFailure
override fun hasMoreRoutes(): Boolean {
return nextPlanIndex < plans.size
}
override fun sameHostAndPort(url: HttpUrl): Boolean {
return url.host == address.url.host && url.port == address.url.port
}
inner class FakePlan(
val id: Int
) : RoutePlanner.Plan {
var canceled = false
val connection = factory.newConnection(pool, factory.newRoute(address))
override var isConnected = false
override fun connect() {
events += "plan $id connect"
isConnected = true
}
override fun handleSuccess() = connection
override fun cancel() {
events += "plan $id cancel"
}
}
}

View File

@ -19,13 +19,16 @@ import java.io.IOException
import java.net.Inet4Address import java.net.Inet4Address
import java.net.Inet6Address import java.net.Inet6Address
import java.net.InetAddress import java.net.InetAddress
import java.util.concurrent.TimeUnit
import kotlin.test.assertFailsWith import kotlin.test.assertFailsWith
import kotlin.test.fail
import mockwebserver3.MockResponse import mockwebserver3.MockResponse
import mockwebserver3.MockWebServer import mockwebserver3.MockWebServer
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.api.extension.RegisterExtension
import org.opentest4j.TestAbortedException import org.opentest4j.TestAbortedException
@ -39,7 +42,8 @@ import org.opentest4j.TestAbortedException
* *
* This test only runs on host machines that have both IPv4 and IPv6 addresses for localhost. * This test only runs on host machines that have both IPv4 and IPv6 addresses for localhost.
*/ */
class HappyEyeballsTest { @Timeout(30)
class FastFallbackTest {
@RegisterExtension @RegisterExtension
val clientTestRule = OkHttpClientTestRule() val clientTestRule = OkHttpClientTestRule()
@ -80,7 +84,9 @@ class HappyEyeballsTest {
client = clientTestRule.newClientBuilder() client = clientTestRule.newClientBuilder()
.eventListenerFactory(clientTestRule.wrap(listener)) .eventListenerFactory(clientTestRule.wrap(listener))
.connectTimeout(60, TimeUnit.SECONDS) // Deliberately exacerbate slow fallbacks.
.dns { dnsResults } .dns { dnsResults }
.fastFallback(true)
.build() .build()
url = serverIpv4.url("/") url = serverIpv4.url("/")
.newBuilder() .newBuilder()
@ -208,13 +214,6 @@ class HappyEyeballsTest {
assertThat(listener.recordedEventTypes().filter { it == "ConnectFailed" }).hasSize(2) assertThat(listener.recordedEventTypes().filter { it == "ConnectFailed" }).hasSize(2)
} }
/**
* This test currently completes successfully, but it takes 10 second to time out connecting to
* the unreachable address.
*
* Upon implementing Happy Eyeballs we should change this test to fail if it takes that long. We
* should also extend the connect timeout beyond 10 seconds to exacerbate the problem.
*/
@Test @Test
fun reachesIpv6AfterUnreachableAddress() { fun reachesIpv6AfterUnreachableAddress() {
dnsResults = listOf( dnsResults = listOf(
@ -239,4 +238,33 @@ class HappyEyeballsTest {
assertThat(listener.recordedEventTypes().filter { it == "ConnectStart" }).hasSize(2) assertThat(listener.recordedEventTypes().filter { it == "ConnectStart" }).hasSize(2)
assertThat(listener.recordedEventTypes().filter { it == "ConnectFailed" }).hasSize(1) assertThat(listener.recordedEventTypes().filter { it == "ConnectFailed" }).hasSize(1)
} }
@Test
fun timesOutWithFastFallbackDisabled() {
dnsResults = listOf(
TestUtil.UNREACHABLE_ADDRESS.address,
localhostIpv6,
)
serverIpv4.shutdown()
serverIpv6.enqueue(
MockResponse()
.setBody("hello from IPv6")
)
client = client.newBuilder()
.fastFallback(false)
.callTimeout(1_000, TimeUnit.MILLISECONDS)
.build()
val call = client.newCall(
Request.Builder()
.url(url)
.build()
)
try {
call.execute()
fail("expected a timeout")
} catch (e: IOException) {
assertThat(e).hasMessage("timeout")
}
}
} }

View File

@ -0,0 +1,39 @@
/*
* Copyright (C) 2022 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3.internal.connection
import okhttp3.FakeRoutePlanner
import okhttp3.TestValueFactory
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
/** Unit test for [FastFallbackExchangeFinder] implementation details. */
internal class FastFallbackExchangeFinderTest {
private val factory = TestValueFactory()
private val routePlanner = FakeRoutePlanner(factory)
private val finder = FastFallbackExchangeFinder(routePlanner, factory.taskRunner)
@Test
fun takeConnectedConnection() {
val plan0 = routePlanner.addPlan()
plan0.isConnected = true
val result0 = finder.find()
assertThat(result0).isEqualTo(plan0.connection)
assertThat(routePlanner.events.poll()).isEqualTo("take plan 0")
assertThat(routePlanner.events.poll()).isNull()
}
}