mirror of
https://github.com/square/okhttp.git
synced 2026-01-12 10:23:16 +03:00
Prefer reusing the call connection over deferred plans (#7190)
I still need to figure out a test for this. Closes: https://github.com/square/okhttp/issues/7186
This commit is contained in:
@@ -15,15 +15,15 @@
|
||||
*/
|
||||
package okhttp3.internal.connection
|
||||
|
||||
import java.io.IOException
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
import java.util.concurrent.LinkedBlockingDeque
|
||||
import java.util.concurrent.TimeUnit
|
||||
import okhttp3.internal.concurrent.Task
|
||||
import okhttp3.internal.concurrent.TaskRunner
|
||||
import okhttp3.internal.connection.RoutePlanner.ConnectResult
|
||||
import okhttp3.internal.connection.RoutePlanner.Plan
|
||||
import okhttp3.internal.okHttpName
|
||||
import java.io.IOException
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
import java.util.concurrent.LinkedBlockingDeque
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
/**
|
||||
* Speculatively connects to each IP address of a target address, returning as soon as one of them
|
||||
@@ -42,12 +42,6 @@ internal class FastFallbackExchangeFinder(
|
||||
*/
|
||||
private val tcpConnectsInFlight = CopyOnWriteArrayList<Plan>()
|
||||
|
||||
/**
|
||||
* These are retries of plans that were canceled when they lost a race. If the race's winner ends
|
||||
* up not working out, this is what we'll attempt first.
|
||||
*/
|
||||
private val deferredPlans = ArrayDeque<Plan>()
|
||||
|
||||
/**
|
||||
* Results are posted here as they occur. The find job is done when either one plan completes
|
||||
* successfully or all plans fail.
|
||||
@@ -57,10 +51,7 @@ internal class FastFallbackExchangeFinder(
|
||||
override fun find(): RealConnection {
|
||||
var firstException: IOException? = null
|
||||
try {
|
||||
while (tcpConnectsInFlight.isNotEmpty() ||
|
||||
deferredPlans.isNotEmpty() ||
|
||||
routePlanner.hasNext()
|
||||
) {
|
||||
while (tcpConnectsInFlight.isNotEmpty() || routePlanner.hasNext()) {
|
||||
if (routePlanner.isCanceled()) throw IOException("Canceled")
|
||||
|
||||
// Launch a new connection if we're ready to.
|
||||
@@ -105,7 +96,7 @@ internal class FastFallbackExchangeFinder(
|
||||
val nextPlan = connectResult.nextPlan
|
||||
if (nextPlan != null) {
|
||||
// Try this plan's successor before deferred plans because it won the race!
|
||||
deferredPlans.addFirst(nextPlan)
|
||||
routePlanner.deferredPlans.addFirst(nextPlan)
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
@@ -122,9 +113,6 @@ internal class FastFallbackExchangeFinder(
|
||||
*/
|
||||
private fun launchTcpConnect(): ConnectResult? {
|
||||
val plan = when {
|
||||
deferredPlans.isNotEmpty() -> {
|
||||
deferredPlans.removeFirst()
|
||||
}
|
||||
routePlanner.hasNext() -> {
|
||||
try {
|
||||
routePlanner.plan()
|
||||
@@ -175,7 +163,7 @@ internal class FastFallbackExchangeFinder(
|
||||
for (plan in tcpConnectsInFlight) {
|
||||
plan.cancel()
|
||||
val retry = plan.retry() ?: continue
|
||||
deferredPlans += retry
|
||||
routePlanner.deferredPlans.addLast(retry)
|
||||
}
|
||||
tcpConnectsInFlight.clear()
|
||||
}
|
||||
|
||||
@@ -104,7 +104,7 @@ class RealConnectionPool(
|
||||
if (connection.isHealthy(doExtensiveHealthChecks)) return connection
|
||||
|
||||
// In the second synchronized block, release the unhealthy acquired connection. We're also on
|
||||
// the hook to close this connection if its no longer in use.
|
||||
// the hook to close this connection if it's no longer in use.
|
||||
val toClose: Socket? = synchronized(connection) {
|
||||
connection.noNewExchanges = true
|
||||
call.releaseConnectionNoEvents()
|
||||
|
||||
@@ -48,6 +48,8 @@ class RealRoutePlanner(
|
||||
private var routeSelector: RouteSelector? = null
|
||||
private var nextRouteToTry: Route? = null
|
||||
|
||||
override val deferredPlans = ArrayDeque<Plan>()
|
||||
|
||||
override fun isCanceled(): Boolean = call.isCanceled()
|
||||
|
||||
@Throws(IOException::class)
|
||||
@@ -59,6 +61,9 @@ class RealRoutePlanner(
|
||||
val pooled1 = planReusePooledConnection()
|
||||
if (pooled1 != null) return pooled1
|
||||
|
||||
// Attempt a deferred plan before new routes.
|
||||
if (deferredPlans.isNotEmpty()) return deferredPlans.removeFirst()
|
||||
|
||||
// Do blocking calls to plan a route for a new connection.
|
||||
val connect = planConnect()
|
||||
|
||||
@@ -253,6 +258,10 @@ class RealRoutePlanner(
|
||||
}
|
||||
|
||||
override fun hasNext(failedConnection: RealConnection?): Boolean {
|
||||
if (deferredPlans.isNotEmpty()) {
|
||||
return true
|
||||
}
|
||||
|
||||
if (nextRouteToTry != null) {
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -30,7 +30,11 @@ import okhttp3.HttpUrl
|
||||
* possible for shared exchanges to make requests to different host names! See
|
||||
* [RealConnection.isEligible] for details.
|
||||
*
|
||||
* 3. If there's no existing connection, make a list of routes (which may require blocking DNS
|
||||
* 3. Attempt plans from prior connect attempts for this call. These occur as either follow-ups to
|
||||
* failed connect attempts (such as trying the next [ConnectionSpec]), or as attempts that lost
|
||||
* a race in fast follow-up.
|
||||
*
|
||||
* 4. If there's no existing connection, make a list of routes (which may require blocking DNS
|
||||
* lookups) and attempt a new connection them. When failures occur, retries iterate the list of
|
||||
* available routes.
|
||||
*
|
||||
@@ -45,6 +49,9 @@ import okhttp3.HttpUrl
|
||||
interface RoutePlanner {
|
||||
val address: Address
|
||||
|
||||
/** Follow-ups for failed plans and plans that lost a race. */
|
||||
val deferredPlans: ArrayDeque<Plan>
|
||||
|
||||
fun isCanceled(): Boolean
|
||||
|
||||
/** Returns a plan to attempt. */
|
||||
@@ -68,7 +75,7 @@ interface RoutePlanner {
|
||||
|
||||
/**
|
||||
* A plan holds either an immediately-usable connection, or one that must be connected first.
|
||||
* These steps are split so callers can call [connect] on a background thread if attempting
|
||||
* These steps are split so callers can call [connectTcp] on a background thread if attempting
|
||||
* multiple plans concurrently.
|
||||
*/
|
||||
interface Plan {
|
||||
|
||||
@@ -23,19 +23,11 @@ internal class SequentialExchangeFinder(
|
||||
) : ExchangeFinder {
|
||||
override fun find(): RealConnection {
|
||||
var firstException: IOException? = null
|
||||
var queuedPlan: RoutePlanner.Plan? = null
|
||||
while (true) {
|
||||
if (routePlanner.isCanceled()) throw IOException("Canceled")
|
||||
|
||||
try {
|
||||
val plan = when {
|
||||
queuedPlan != null -> {
|
||||
val result = queuedPlan
|
||||
queuedPlan = null
|
||||
result
|
||||
}
|
||||
else -> routePlanner.plan()
|
||||
}
|
||||
val plan = routePlanner.plan()
|
||||
|
||||
if (!plan.isReady) {
|
||||
val tcpConnectResult = plan.connectTcp()
|
||||
@@ -46,9 +38,11 @@ internal class SequentialExchangeFinder(
|
||||
|
||||
val (_, nextPlan, failure) = connectResult
|
||||
|
||||
queuedPlan = nextPlan
|
||||
if (failure != null) throw failure
|
||||
if (nextPlan != null) continue
|
||||
if (nextPlan != null) {
|
||||
routePlanner.deferredPlans.addFirst(nextPlan)
|
||||
continue
|
||||
}
|
||||
}
|
||||
return plan.handleSuccess()
|
||||
} catch (e: IOException) {
|
||||
@@ -57,7 +51,7 @@ internal class SequentialExchangeFinder(
|
||||
} else {
|
||||
firstException.addSuppressed(e)
|
||||
}
|
||||
if (queuedPlan == null && !routePlanner.hasNext()) {
|
||||
if (!routePlanner.hasNext()) {
|
||||
throw firstException
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,13 +15,13 @@
|
||||
*/
|
||||
package okhttp3
|
||||
|
||||
import java.io.Closeable
|
||||
import java.io.IOException
|
||||
import java.util.concurrent.LinkedBlockingDeque
|
||||
import okhttp3.internal.concurrent.TaskFaker
|
||||
import okhttp3.internal.connection.RealConnection
|
||||
import okhttp3.internal.connection.RoutePlanner
|
||||
import okhttp3.internal.connection.RoutePlanner.ConnectResult
|
||||
import java.io.Closeable
|
||||
import java.io.IOException
|
||||
import java.util.concurrent.LinkedBlockingDeque
|
||||
|
||||
class FakeRoutePlanner(
|
||||
private val taskFaker: TaskFaker,
|
||||
@@ -40,6 +40,8 @@ class FakeRoutePlanner(
|
||||
private var nextPlanIndex = 0
|
||||
private val plans = mutableListOf<FakePlan>()
|
||||
|
||||
override val deferredPlans = ArrayDeque<RoutePlanner.Plan>()
|
||||
|
||||
override val address = factory.newAddress("example.com")
|
||||
|
||||
fun addPlan(): FakePlan {
|
||||
@@ -51,6 +53,9 @@ class FakeRoutePlanner(
|
||||
override fun isCanceled() = canceled
|
||||
|
||||
override fun plan(): FakePlan {
|
||||
// Return deferred plans preferentially. These don't require addPlan().
|
||||
if (deferredPlans.isNotEmpty()) return deferredPlans.removeFirst() as FakePlan
|
||||
|
||||
require(nextPlanIndex < plans.size) {
|
||||
"not enough plans! call addPlan() in the test to set this up"
|
||||
}
|
||||
@@ -68,7 +73,7 @@ class FakeRoutePlanner(
|
||||
}
|
||||
|
||||
override fun hasNext(failedConnection: RealConnection?): Boolean {
|
||||
return nextPlanIndex < plans.size
|
||||
return deferredPlans.isNotEmpty() || nextPlanIndex < plans.size
|
||||
}
|
||||
|
||||
override fun sameHostAndPort(url: HttpUrl): Boolean {
|
||||
|
||||
Reference in New Issue
Block a user