From b2310de1a86a6f902e386a6ef910dea22883dffa Mon Sep 17 00:00:00 2001 From: Jesse Wilson Date: Thu, 10 Mar 2022 21:54:56 -0500 Subject: [PATCH] Never put pooled connections in the result queue (#7154) In a race this could cause us to prefer a new connection over a pooled connection, which violates our invariant that pooled connections are always used if they're found. Closes: https://github.com/square/okhttp/issues/7152 --- .../okhttp3/internal/concurrent/TaskFaker.kt | 12 ++++- .../connection/FastFallbackExchangeFinder.kt | 42 ++++++++-------- .../jvmTest/java/okhttp3/FakeRoutePlanner.kt | 16 ++++-- .../FastFallbackExchangeFinderTest.kt | 50 ++++++++++++++++++- 4 files changed, 93 insertions(+), 27 deletions(-) diff --git a/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt b/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt index 453efd2b1..349b6cd78 100644 --- a/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt +++ b/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt @@ -15,6 +15,8 @@ */ package okhttp3.internal.concurrent +import okhttp3.OkHttpClient +import org.assertj.core.api.Assertions.assertThat import java.io.Closeable import java.util.AbstractQueue import java.util.concurrent.BlockingQueue @@ -23,8 +25,6 @@ import java.util.concurrent.Semaphore import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import java.util.logging.Logger -import okhttp3.OkHttpClient -import org.assertj.core.api.Assertions.assertThat /** * Runs a [TaskRunner] in a controlled environment so that everything is sequential and @@ -266,6 +266,14 @@ class TaskFaker : Closeable { } } + /** + * Artificially stall until manually resumed by the test thread with [runTasks]. Use this to + * simulate races in tasks that doesn't have a deterministic sequence. + */ + fun yield() { + stall() + } + /** * This blocking queue hooks into a fake clock rather than using regular JVM timing for functions * like [poll]. It is only usable within task faker tasks. diff --git a/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/FastFallbackExchangeFinder.kt b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/FastFallbackExchangeFinder.kt index 4faf55559..8090b4bdb 100644 --- a/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/FastFallbackExchangeFinder.kt +++ b/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/FastFallbackExchangeFinder.kt @@ -15,15 +15,15 @@ */ package okhttp3.internal.connection -import java.io.IOException -import java.util.concurrent.CopyOnWriteArrayList -import java.util.concurrent.LinkedBlockingDeque -import java.util.concurrent.TimeUnit import okhttp3.internal.concurrent.Task import okhttp3.internal.concurrent.TaskRunner import okhttp3.internal.connection.RoutePlanner.ConnectResult import okhttp3.internal.connection.RoutePlanner.Plan import okhttp3.internal.okHttpName +import java.io.IOException +import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.LinkedBlockingDeque +import java.util.concurrent.TimeUnit /** * Speculatively connects to each IP address of a target address, returning as soon as one of them @@ -66,14 +66,17 @@ internal class FastFallbackExchangeFinder( // Launch a new connection if we're ready to. val now = taskRunner.backend.nanoTime() var awaitTimeoutNanos = nextTcpConnectAtNanos - now + var connectResult: ConnectResult? = null if (tcpConnectsInFlight.isEmpty() || awaitTimeoutNanos <= 0) { - launchTcpConnect() + connectResult = launchTcpConnect() nextTcpConnectAtNanos = now + connectDelayNanos awaitTimeoutNanos = connectDelayNanos } // Wait for an in-flight connect to complete or fail. - var connectResult = awaitTcpConnect(awaitTimeoutNanos, TimeUnit.NANOSECONDS) ?: continue + if (connectResult == null) { + connectResult = awaitTcpConnect(awaitTimeoutNanos, TimeUnit.NANOSECONDS) ?: continue + } if (connectResult.isSuccess) { // We have a connected TCP connection. Cancel and defer the racing connects that all lost. @@ -112,7 +115,12 @@ internal class FastFallbackExchangeFinder( throw firstException!! } - private fun launchTcpConnect() { + /** + * Returns non-null if we don't need to wait for the launched result. In such cases, this result + * must be processed before whatever is waiting in the queue because we may have already acquired + * its connection. + */ + private fun launchTcpConnect(): ConnectResult? { val plan = when { deferredPlans.isNotEmpty() -> { deferredPlans.removeFirst() @@ -124,24 +132,17 @@ internal class FastFallbackExchangeFinder( FailedPlan(e) } } - else -> return // Nothing further to try. + else -> return null // Nothing further to try. } - tcpConnectsInFlight += plan + // Already connected. Return it immediately. + if (plan.isReady) return ConnectResult(plan) - // Already connected? Enqueue the result immediately. - if (plan.isReady) { - connectResults.put(ConnectResult(plan)) - return - } - - // Already failed? Enqueue the result immediately. - if (plan is FailedPlan) { - connectResults.put(plan.result) - return - } + // Already failed? Return it immediately. + if (plan is FailedPlan) return plan.result // Connect TCP asynchronously. + tcpConnectsInFlight += plan val taskName = "$okHttpName connect ${routePlanner.address.url.redact()}" taskRunner.newQueue().schedule(object : Task(taskName) { override fun runOnce(): Long { @@ -157,6 +158,7 @@ internal class FastFallbackExchangeFinder( return -1L } }) + return null } private fun awaitTcpConnect(timeout: Long, unit: TimeUnit): ConnectResult? { diff --git a/okhttp/src/jvmTest/java/okhttp3/FakeRoutePlanner.kt b/okhttp/src/jvmTest/java/okhttp3/FakeRoutePlanner.kt index 0b56359ea..e25e75a63 100644 --- a/okhttp/src/jvmTest/java/okhttp3/FakeRoutePlanner.kt +++ b/okhttp/src/jvmTest/java/okhttp3/FakeRoutePlanner.kt @@ -15,13 +15,13 @@ */ package okhttp3 -import java.io.Closeable -import java.io.IOException -import java.util.concurrent.LinkedBlockingDeque import okhttp3.internal.concurrent.TaskFaker import okhttp3.internal.connection.RealConnection import okhttp3.internal.connection.RoutePlanner import okhttp3.internal.connection.RoutePlanner.ConnectResult +import java.io.Closeable +import java.io.IOException +import java.util.concurrent.LinkedBlockingDeque class FakeRoutePlanner( private val taskFaker: TaskFaker, @@ -57,6 +57,10 @@ class FakeRoutePlanner( val result = plans[nextPlanIndex++] events += "take plan ${result.id}" + if (result.yieldBeforePlanReturns) { + taskFaker.yield() + } + val planningThrowable = result.planningThrowable if (planningThrowable != null) throw planningThrowable @@ -84,12 +88,14 @@ class FakeRoutePlanner( val connection = factory.newConnection(pool, factory.newRoute(address)) var retry: FakePlan? = null var retryTaken = false + var yieldBeforePlanReturns = false override val isReady: Boolean get() = connectState == ConnectState.TLS_CONNECTED var tcpConnectDelayNanos = 0L var tcpConnectThrowable: Throwable? = null + var yieldBeforeTcpConnectReturns = false var connectTcpNextPlan: FakePlan? = null var tlsConnectDelayNanos = 0L var tlsConnectThrowable: Throwable? = null @@ -125,6 +131,10 @@ class FakeRoutePlanner( taskFaker.sleep(tcpConnectDelayNanos) + if (yieldBeforeTcpConnectReturns) { + taskFaker.yield() + } + return when { tcpConnectThrowable != null -> { events += "plan $id TCP connect failed" diff --git a/okhttp/src/jvmTest/java/okhttp3/internal/connection/FastFallbackExchangeFinderTest.kt b/okhttp/src/jvmTest/java/okhttp3/internal/connection/FastFallbackExchangeFinderTest.kt index 2eddc33a5..5b70e1d9d 100644 --- a/okhttp/src/jvmTest/java/okhttp3/internal/connection/FastFallbackExchangeFinderTest.kt +++ b/okhttp/src/jvmTest/java/okhttp3/internal/connection/FastFallbackExchangeFinderTest.kt @@ -15,8 +15,6 @@ */ package okhttp3.internal.connection -import java.io.IOException -import java.net.UnknownServiceException import okhttp3.FakeRoutePlanner import okhttp3.FakeRoutePlanner.ConnectState.TLS_CONNECTED import okhttp3.internal.concurrent.TaskFaker @@ -24,6 +22,8 @@ import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.fail import org.junit.jupiter.api.Test +import java.io.IOException +import java.net.UnknownServiceException /** * Unit test for [FastFallbackExchangeFinder] implementation details. @@ -775,6 +775,52 @@ internal class FastFallbackExchangeFinderTest { taskFaker.assertNoMoreTasks() } + /** + * This test causes two connections to become available simultaneously, one from a TCP connect and + * one from the pool. We must take the pooled connection because by taking it from the pool, we've + * fully acquired it. + * + * This test yields threads to force the decision of plan1 to be deliberate and not lucky. In + * particular, we set up this sequence of events: + * + * 1. take plan 0 + * 3. plan 0 connects + * 4. finish taking plan 1 + * + * https://github.com/square/okhttp/issues/7152 + */ + @Test + fun reusePlanAndNewConnectRace() { + val plan0 = routePlanner.addPlan() + plan0.tcpConnectDelayNanos = 250.ms + plan0.yieldBeforeTcpConnectReturns = true // Yield so we get a chance to take plan1... + val plan1 = routePlanner.addPlan() + plan1.connectState = TLS_CONNECTED + plan1.yieldBeforePlanReturns = true // ... but let plan 0 connect before we act upon it. + + taskRunner.newQueue().execute("connect") { + val result0 = finder.find() + assertThat(result0).isEqualTo(plan0.connection) + } + + taskFaker.runTasks() + assertEvents( + "take plan 0", + "plan 0 TCP connecting...", + ) + + taskFaker.advanceUntil(250.ms) + assertEvents( + "take plan 1", + ) + + taskFaker.runTasks() + assertEvents( + "plan 0 TCP connected", + "plan 0 cancel" + ) + } + private fun assertEvents(vararg expected: String) { val actual = generateSequence { routePlanner.events.poll() }.toList() assertThat(actual).containsExactly(*expected)