1
0
mirror of https://github.com/square/okhttp.git synced 2025-08-07 12:42:57 +03:00

Use finer-grained locks in the connection pool

Previously we were locking the RealConnectionPool for all
connection-related state, including cancelation.

This changes the locks to be per-Connection, leaning heavily
on thread-confined fields in RealCall, Exchange, and
ExchangeFinder.
This commit is contained in:
Jesse Wilson
2020-05-24 23:18:56 -04:00
parent cfdeb570c2
commit bafad8c998
6 changed files with 360 additions and 373 deletions

View File

@@ -69,30 +69,26 @@ This is necessary for bookkeeping when creating new streams. Correct framing req
## Connection Pool ## Connection Pool
### Background
A primary responsibility for any HTTP client is to efficiently manage network connections. Creating and establishing new connections require a fair amount of overhead and added latency. OkHttp will make every effort to reuse existing connections to avoid this overhead and added latency. A primary responsibility for any HTTP client is to efficiently manage network connections. Creating and establishing new connections require a fair amount of overhead and added latency. OkHttp will make every effort to reuse existing connections to avoid this overhead and added latency.
Every OkHttpClient uses a connection pool. Its job is to maintain a reference to all open connections. When an HTTP request is started, OkHttp will attempt to reuse an existing connection from the pool. If there are no existing connections, a new one is created and put into the connection pool. For http/2, the connection can be reused immediately. For http/1, the request must be completed before it can be reused. Every OkHttpClient uses a connection pool. Its job is to maintain a reference to all open connections. When an HTTP request is started, OkHttp will attempt to reuse an existing connection from the pool. If there are no existing connections, a new one is created and put into the connection pool. For HTTP/2, the connection can be reused immediately. For HTTP/1, the request must be completed before it can be reused.
Since HTTP requests frequently happen in parallel, the connection pool implementation must be thread-safe. Since HTTP requests frequently happen in parallel, connection pooling must be thread-safe.
### ConnectionPool, RealConnection, and StreamAllocation These are the primary classes involved with establishing, sharing, and terminating connections:
The primary classes involved with establishing, sharing and terminating connections are ConnectionPool, RealConnection and StreamAllocation. * **RealConnectionPool** manages reuse of HTTP and HTTP/2 connections for reduced latency. Every OkHttpClient has one, and its lifetime spans the lifetime of the OkHttpClient.
**ConnectionPool**: Manages reuse of HTTP and HTTP/2 connections for reduced latency. Every OkHttpClient has one, and its lifetime spans the lifetime of the OkHttpClient. * **RealConnection** is the socket and streams of an HTTP/1 or HTTP/2 connection. These are created on demand to fulfill HTTP requests. They may be reused for many HTTP request/response exchanges. Their lifetime is typically shorter than a connection pool.
**RealConnection**: The socket and streams of an HTTP and HTTP/2 connection. These are created on demand to fulfill HTTP requests. They may be reused for many HTTP request/response exchanges. Their lifetime is typically shorter than ConnectionPool. * **Exchange** carries a single HTTP request/response pair.
**StreamAllocation**: Coordinates the relationship between connections, streams and calls. These are created for a single HTTP request/response exchange. Their lifetime is typically shorter than RealConnection. * **ExchangeFinder** chooses which connection carries each exchange. Where possible it will use the same connection for all exchanges in a single call. It prefers reusing pooled connections over establishing new connections.
### Locks #### Per-Connection Locks
A single lock is used to synchronize and guard the state of ConnectionPool, RealConnection and StreamAllocation. Each connection has its own lock. The connections in the pool are all in a `ConcurrentLinkedQueue`. Due to data races, iterators of this queue may return removed connections. Callers must check the connection's `noNewExchanges` property before using connections from the pool.
### ConnectionPool The connection lock is never held while doing I/O (even closing a socket) to prevent contention.
The fields in ConnectionPool, RealConnection and StreamAllocation are all guarded by the connection pool instance. This lock is never held while doing I/O (even closing a socket) to prevent contention. A lock-per-connection is used to maximize concurrency.
A single lock is preferred to avoid deadlock scenarios and the added overhead of aggregate lock/unlock that would occur if multiple locks were used.

View File

@@ -22,8 +22,6 @@ import okhttp3.EventListener
import okhttp3.HttpUrl import okhttp3.HttpUrl
import okhttp3.OkHttpClient import okhttp3.OkHttpClient
import okhttp3.Route import okhttp3.Route
import okhttp3.internal.assertThreadDoesntHoldLock
import okhttp3.internal.assertThreadHoldsLock
import okhttp3.internal.canReuseConnectionFor import okhttp3.internal.canReuseConnectionFor
import okhttp3.internal.closeQuietly import okhttp3.internal.closeQuietly
import okhttp3.internal.http.ExchangeCodec import okhttp3.internal.http.ExchangeCodec
@@ -51,6 +49,9 @@ import okhttp3.internal.http2.StreamResetException
* will prefer pooled connections. Only pooled HTTP/2 connections are used for such de-duplication. * will prefer pooled connections. Only pooled HTTP/2 connections are used for such de-duplication.
* *
* It is possible to cancel the finding process. * It is possible to cancel the finding process.
*
* Instances of this class are not thread-safe. Each instance is thread-confined to the thread
* executing [call].
*/ */
class ExchangeFinder( class ExchangeFinder(
private val connectionPool: RealConnectionPool, private val connectionPool: RealConnectionPool,
@@ -59,10 +60,7 @@ class ExchangeFinder(
private val eventListener: EventListener private val eventListener: EventListener
) { ) {
private var routeSelection: RouteSelector.Selection? = null private var routeSelection: RouteSelector.Selection? = null
// State guarded by connectionPool.
private var routeSelector: RouteSelector? = null private var routeSelector: RouteSelector? = null
private var connectingConnection: RealConnection? = null
private var refusedStreamCount = 0 private var refusedStreamCount = 0
private var connectionShutdownCount = 0 private var connectionShutdownCount = 0
private var otherFailureCount = 0 private var otherFailureCount = 0
@@ -123,23 +121,23 @@ class ExchangeFinder(
// Make sure we have some routes left to try. One example where we may exhaust all the routes // Make sure we have some routes left to try. One example where we may exhaust all the routes
// would happen if we made a new connection and it immediately is detected as unhealthy. // would happen if we made a new connection and it immediately is detected as unhealthy.
synchronized(connectionPool) { if (nextRouteToTry != null) continue
if (nextRouteToTry != null) return@synchronized
val routesLeft = routeSelection?.hasNext() ?: true val routesLeft = routeSelection?.hasNext() ?: true
if (routesLeft) return@synchronized if (routesLeft) continue
val routesSelectionLeft = routeSelector?.hasNext() ?: true val routesSelectionLeft = routeSelector?.hasNext() ?: true
if (routesSelectionLeft) return@synchronized if (routesSelectionLeft) continue
throw IOException("exhausted all routes") throw IOException("exhausted all routes")
}
} }
} }
/** /**
* Returns a connection to host a new stream. This prefers the existing connection if it exists, * Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection. * then the pool, finally building a new connection.
*
* This checks for cancellation before each blocking operation.
*/ */
@Throws(IOException::class) @Throws(IOException::class)
private fun findConnection( private fun findConnection(
@@ -149,156 +147,123 @@ class ExchangeFinder(
pingIntervalMillis: Int, pingIntervalMillis: Int,
connectionRetryEnabled: Boolean connectionRetryEnabled: Boolean
): RealConnection { ): RealConnection {
var foundPooledConnection = false if (call.isCanceled()) throw IOException("Canceled")
var result: RealConnection? = null
var selectedRoute: Route? = null
var releasedConnection: RealConnection?
val toClose: Socket?
synchronized(connectionPool) {
if (call.isCanceled()) throw IOException("Canceled")
val callConnection = call.connection // changes within this overall method // Attempt to reuse the connection from the call.
releasedConnection = callConnection val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
toClose = if (callConnection != null && (callConnection.noNewExchanges || if (callConnection != null) {
!sameHostAndPort(callConnection.route().address.url))) { var toClose: Socket? = null
call.releaseConnectionNoEvents() synchronized(callConnection) {
} else { if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
null toClose = call.releaseConnectionNoEvents()
}
if (call.connection != null) {
// We had an already-allocated connection and it's good.
result = call.connection
releasedConnection = null
}
if (result == null) {
// The connection hasn't had any problems for this call.
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0
// Attempt to get a connection from the pool.
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
foundPooledConnection = true
result = call.connection
} else if (nextRouteToTry != null) {
selectedRoute = nextRouteToTry
nextRouteToTry = null
} }
} }
}
toClose?.closeQuietly()
if (releasedConnection != null) { // If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here
eventListener.connectionReleased(call, releasedConnection!!) // because we already acquired it.
} if (call.connection != null) {
if (foundPooledConnection) { check(toClose == null)
eventListener.connectionAcquired(call, result!!) return callConnection
} }
if (result != null) {
// If we found an already-allocated or pooled connection, we're done. // The call's connection was released.
return result!! toClose?.closeQuietly()
eventListener.connectionReleased(call, callConnection)
} }
// If we need a route selection, make one. This is a blocking operation. // We need a new connection. Give it fresh stats.
var newRouteSelection = false refusedStreamCount = 0
if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) { connectionShutdownCount = 0
otherFailureCount = 0
// Attempt to get a connection from the pool.
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
// Nothing in the pool. Figure out what route we'll try next.
val routes: List<Route>?
val route: Route
if (nextRouteToTry != null) {
// Use a route from a preceding coalesced connection.
routes = null
route = nextRouteToTry!!
nextRouteToTry = null
} else if (routeSelection != null && routeSelection!!.hasNext()) {
// Use a route from an existing route selection.
routes = null
route = routeSelection!!.next()
} else {
// Compute a new route selection. This is a blocking operation!
var localRouteSelector = routeSelector var localRouteSelector = routeSelector
if (localRouteSelector == null) { if (localRouteSelector == null) {
localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener) localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
this.routeSelector = localRouteSelector this.routeSelector = localRouteSelector
} }
newRouteSelection = true val localRouteSelection = localRouteSelector.next()
routeSelection = localRouteSelector.next() routeSelection = localRouteSelection
} routes = localRouteSelection.routes
var routes: List<Route>? = null
synchronized(connectionPool) {
if (call.isCanceled()) throw IOException("Canceled") if (call.isCanceled()) throw IOException("Canceled")
if (newRouteSelection) { // Now that we have a set of IP addresses, make another attempt at getting a connection from
// Now that we have a set of IP addresses, make another attempt at getting a connection from // the pool. We have a better chance of matching thanks to connection coalescing.
// the pool. This could match due to connection coalescing. if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
routes = routeSelection!!.routes val result = call.connection!!
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) { eventListener.connectionAcquired(call, result)
foundPooledConnection = true return result
result = call.connection
}
} }
if (!foundPooledConnection) { route = localRouteSelection.next()
if (selectedRoute == null) {
selectedRoute = routeSelection!!.next()
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
result = RealConnection(connectionPool, selectedRoute!!)
connectingConnection = result
}
} }
// If we found a pooled connection on the 2nd time around, we're done. // Connect. Tell the call about the connecting call so async cancels work.
if (foundPooledConnection) { val newConnection = RealConnection(connectionPool, route)
eventListener.connectionAcquired(call, result!!) call.connectionToCancel = newConnection
return result!! try {
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
call.client.routeDatabase.connected(newConnection.route())
// If we raced another call connecting to this host, coalesce the connections. This makes for 3
// different lookups in the connection pool!
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
val result = call.connection!!
nextRouteToTry = route
newConnection.socket().closeQuietly()
eventListener.connectionAcquired(call, result)
return result
} }
// Do TCP + TLS handshakes. This is a blocking operation. synchronized(newConnection) {
result!!.connect( connectionPool.put(newConnection)
connectTimeout, call.acquireConnectionNoEvents(newConnection)
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
call.client.routeDatabase.connected(result!!.route())
var socket: Socket? = null
synchronized(connectionPool) {
connectingConnection = null
// Last attempt at connection coalescing, which only occurs if we attempted multiple
// concurrent connections to the same host.
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
// We lost the race! Close the connection we created and return the pooled connection.
result!!.noNewExchanges = true
socket = result!!.socket()
result = call.connection
// It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
// that case we will retry the route we just successfully connected with.
nextRouteToTry = selectedRoute
} else {
connectionPool.put(result!!)
call.acquireConnectionNoEvents(result!!)
}
} }
socket?.closeQuietly()
eventListener.connectionAcquired(call, result!!) eventListener.connectionAcquired(call, newConnection)
return result!! return newConnection
}
fun connectingConnection(): RealConnection? {
connectionPool.assertThreadHoldsLock()
return connectingConnection
} }
fun trackFailure(e: IOException) { fun trackFailure(e: IOException) {
connectionPool.assertThreadDoesntHoldLock() nextRouteToTry = null
if (e is StreamResetException && e.errorCode == ErrorCode.REFUSED_STREAM) {
synchronized(connectionPool) { refusedStreamCount++
nextRouteToTry = null } else if (e is ConnectionShutdownException) {
if (e is StreamResetException && e.errorCode == ErrorCode.REFUSED_STREAM) { connectionShutdownCount++
refusedStreamCount++ } else {
} else if (e is ConnectionShutdownException) { otherFailureCount++
connectionShutdownCount++
} else {
otherFailureCount++
}
} }
} }
@@ -307,46 +272,48 @@ class ExchangeFinder(
* a route to retry on. * a route to retry on.
*/ */
fun retryAfterFailure(): Boolean { fun retryAfterFailure(): Boolean {
synchronized(connectionPool) { if (refusedStreamCount == 0 && connectionShutdownCount == 0 && otherFailureCount == 0) {
if (refusedStreamCount == 0 && connectionShutdownCount == 0 && otherFailureCount == 0) { return false // Nothing to recover from.
return false // Nothing to recover from.
}
if (nextRouteToTry != null) {
return true
}
if (retryCurrentRoute()) {
// Lock in the route because retryCurrentRoute() is racy and we don't want to call it twice.
nextRouteToTry = call.connection!!.route()
return true
}
// If we have a routes left, use 'em.
if (routeSelection?.hasNext() == true) return true
// If we haven't initialized the route selector yet, assume it'll have at least one route.
val localRouteSelector = routeSelector ?: return true
// If we do have a route selector, use its routes.
return localRouteSelector.hasNext()
} }
if (nextRouteToTry != null) {
return true
}
val retryRoute = retryRoute()
if (retryRoute != null) {
// Lock in the route because retryRoute() is racy and we don't want to call it twice.
nextRouteToTry = retryRoute
return true
}
// If we have a routes left, use 'em.
if (routeSelection?.hasNext() == true) return true
// If we haven't initialized the route selector yet, assume it'll have at least one route.
val localRouteSelector = routeSelector ?: return true
// If we do have a route selector, use its routes.
return localRouteSelector.hasNext()
} }
/** /**
* Return true if the route used for the current connection should be retried, even if the * Return the route from the current connection if it should be retried, even if the connection
* connection itself is unhealthy. The biggest gotcha here is that we shouldn't reuse routes from * itself is unhealthy. The biggest gotcha here is that we shouldn't reuse routes from coalesced
* coalesced connections. * connections.
*/ */
private fun retryCurrentRoute(): Boolean { private fun retryRoute(): Route? {
if (refusedStreamCount > 1 || connectionShutdownCount > 1 || otherFailureCount > 0) { if (refusedStreamCount > 1 || connectionShutdownCount > 1 || otherFailureCount > 0) {
return false // This route has too many problems to retry. return null // This route has too many problems to retry.
} }
val connection = call.connection val connection = call.connection ?: return null
return connection != null &&
connection.routeFailureCount == 0 && synchronized(connection) {
connection.route().address.url.canReuseConnectionFor(address.url) if (connection.routeFailureCount != 0) return null
if (!connection.route().address.url.canReuseConnectionFor(address.url)) return null
return connection.route()
}
} }
/** /**

View File

@@ -22,6 +22,7 @@ import java.net.Socket
import java.util.concurrent.ExecutorService import java.util.concurrent.ExecutorService
import java.util.concurrent.RejectedExecutionException import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import javax.net.ssl.HostnameVerifier import javax.net.ssl.HostnameVerifier
import javax.net.ssl.SSLSocketFactory import javax.net.ssl.SSLSocketFactory
@@ -29,7 +30,6 @@ import okhttp3.Address
import okhttp3.Call import okhttp3.Call
import okhttp3.Callback import okhttp3.Callback
import okhttp3.CertificatePinner import okhttp3.CertificatePinner
import okhttp3.Connection
import okhttp3.EventListener import okhttp3.EventListener
import okhttp3.HttpUrl import okhttp3.HttpUrl
import okhttp3.Interceptor import okhttp3.Interceptor
@@ -75,24 +75,23 @@ class RealCall(
timeout(client.callTimeoutMillis.toLong(), MILLISECONDS) timeout(client.callTimeoutMillis.toLong(), MILLISECONDS)
} }
private val executed = AtomicBoolean()
// These properties are only accessed by the thread executing the call.
/** Initialized in [callStart]. */ /** Initialized in [callStart]. */
private var callStackTrace: Any? = null private var callStackTrace: Any? = null
/** Finds an exchange to send the next request and receive the next response. */ /** Finds an exchange to send the next request and receive the next response. */
private var exchangeFinder: ExchangeFinder? = null private var exchangeFinder: ExchangeFinder? = null
// Guarded by connectionPool.
var connection: RealConnection? = null var connection: RealConnection? = null
private var exchange: Exchange? = null private set
private var exchangeRequestDone = false private var exchangeRequestDone = false
private var exchangeResponseDone = false private var exchangeResponseDone = false
private var canceled = false
private var timeoutEarlyExit = false private var timeoutEarlyExit = false
private var noMoreExchanges = false private var noMoreExchanges = false
// Guarded by this.
private var executed = false
/** /**
* This is the same value as [exchange], but scoped to the execution of the network interceptors. * This is the same value as [exchange], but scoped to the execution of the network interceptors.
* The [exchange] field is assigned to null when its streams end, which may be before or after the * The [exchange] field is assigned to null when its streams end, which may be before or after the
@@ -101,6 +100,13 @@ class RealCall(
internal var interceptorScopedExchange: Exchange? = null internal var interceptorScopedExchange: Exchange? = null
private set private set
// These properties are accessed by canceling threads. Any thread can cancel a call, and once it's
// canceled it's canceled forever.
@Volatile private var canceled = false
@Volatile private var exchange: Exchange? = null
@Volatile var connectionToCancel: RealConnection? = null
override fun timeout() = timeout override fun timeout() = timeout
@SuppressWarnings("CloneDoesntCallSuperClone") // We are a final type & this saves clearing state. @SuppressWarnings("CloneDoesntCallSuperClone") // We are a final type & this saves clearing state.
@@ -118,29 +124,20 @@ class RealCall(
* if a socket connection is being established, that is terminated. * if a socket connection is being established, that is terminated.
*/ */
override fun cancel() { override fun cancel() {
val exchangeToCancel: Exchange? if (canceled) return // Already canceled.
val connectionToCancel: RealConnection?
synchronized(connectionPool) { canceled = true
if (canceled) return // Already canceled. exchange?.cancel()
canceled = true connectionToCancel?.cancel()
exchangeToCancel = exchange
connectionToCancel = exchangeFinder?.connectingConnection() ?: connection
}
exchangeToCancel?.cancel() ?: connectionToCancel?.cancel()
eventListener.canceled(this) eventListener.canceled(this)
} }
override fun isCanceled(): Boolean { override fun isCanceled() = canceled
synchronized(connectionPool) {
return canceled
}
}
override fun execute(): Response { override fun execute(): Response {
synchronized(this) { check(executed.compareAndSet(false, true)) { "Already Executed" }
check(!executed) { "Already Executed" }
executed = true
}
timeout.enter() timeout.enter()
callStart() callStart()
try { try {
@@ -152,15 +149,13 @@ class RealCall(
} }
override fun enqueue(responseCallback: Callback) { override fun enqueue(responseCallback: Callback) {
synchronized(this) { check(executed.compareAndSet(false, true)) { "Already Executed" }
check(!executed) { "Already Executed" }
executed = true
}
callStart() callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback)) client.dispatcher.enqueue(AsyncCall(responseCallback))
} }
@Synchronized override fun isExecuted(): Boolean = executed override fun isExecuted(): Boolean = executed.get()
private fun callStart() { private fun callStart() {
this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()") this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
@@ -237,25 +232,23 @@ class RealCall(
/** Finds a new or pooled connection to carry a forthcoming request and response. */ /** Finds a new or pooled connection to carry a forthcoming request and response. */
internal fun initExchange(chain: RealInterceptorChain): Exchange { internal fun initExchange(chain: RealInterceptorChain): Exchange {
synchronized(connectionPool) { check(!noMoreExchanges) { "released" }
check(!noMoreExchanges) { "released" } check(exchange == null)
check(exchange == null)
}
val codec = exchangeFinder!!.find(client, chain) val exchangeFinder = this.exchangeFinder!!
val result = Exchange(this, eventListener, exchangeFinder!!, codec) val codec = exchangeFinder.find(client, chain)
val result = Exchange(this, eventListener, exchangeFinder, codec)
this.interceptorScopedExchange = result this.interceptorScopedExchange = result
this.exchange = result
this.exchangeRequestDone = false
this.exchangeResponseDone = false
synchronized(connectionPool) { if (canceled) throw IOException("Canceled")
this.exchange = result return result
this.exchangeRequestDone = false
this.exchangeResponseDone = false
return result
}
} }
fun acquireConnectionNoEvents(connection: RealConnection) { fun acquireConnectionNoEvents(connection: RealConnection) {
connectionPool.assertThreadHoldsLock() connection.assertThreadHoldsLock()
check(this.connection == null) check(this.connection == null)
this.connection = connection this.connection = connection
@@ -277,37 +270,28 @@ class RealCall(
e: E e: E
): E { ): E {
var result = e var result = e
var exchangeDone = false if (exchange != this.exchange) {
synchronized(connectionPool) { return result // This exchange was detached violently!
if (exchange != this.exchange) {
return result // This exchange was detached violently!
}
var changed = false
if (requestDone) {
if (!exchangeRequestDone) changed = true
this.exchangeRequestDone = true
}
if (responseDone) {
if (!exchangeResponseDone) changed = true
this.exchangeResponseDone = true
}
if (exchangeRequestDone && exchangeResponseDone && changed) {
exchangeDone = true
this.exchange!!.connection.successCount++
this.exchange = null
}
} }
if (exchangeDone) { var changed = false
result = maybeReleaseConnection(result, false) if (requestDone) {
if (!exchangeRequestDone) changed = true
this.exchangeRequestDone = true
}
if (responseDone) {
if (!exchangeResponseDone) changed = true
this.exchangeResponseDone = true
}
if (exchangeRequestDone && exchangeResponseDone && changed) {
this.exchange = null
result = maybeReleaseConnection(result, incrementSuccesses = true)
} }
return result return result
} }
internal fun noMoreExchanges(e: IOException?): IOException? { internal fun noMoreExchanges(e: IOException?): IOException? {
synchronized(connectionPool) { noMoreExchanges = true
noMoreExchanges = true return maybeReleaseConnection(e, incrementSuccesses = false)
}
return maybeReleaseConnection(e, false)
} }
/** /**
@@ -316,31 +300,30 @@ class RealCall(
* *
* If the call was canceled or timed out, this will wrap [e] in an exception that provides that * If the call was canceled or timed out, this will wrap [e] in an exception that provides that
* additional context. Otherwise [e] is returned as-is. * additional context. Otherwise [e] is returned as-is.
*
* @param force true to release the connection even if more exchanges are expected for the call.
*/ */
private fun <E : IOException?> maybeReleaseConnection(e: E, force: Boolean): E { private fun <E : IOException?> maybeReleaseConnection(e: E, incrementSuccesses: Boolean): E {
var result = e var result = e
val socket: Socket? val connection = this.connection
var releasedConnection: Connection? if (connection != null) {
val callEnd: Boolean connection.assertThreadDoesntHoldLock()
synchronized(connectionPool) { var socket: Socket? = null
check(!force || exchange == null) { "cannot release connection while it is in use" } synchronized(connection) {
releasedConnection = this.connection if (incrementSuccesses) {
socket = if (this.connection != null && exchange == null && (force || noMoreExchanges)) { connection.successCount++
releaseConnectionNoEvents() }
} else { if (exchange == null && noMoreExchanges) {
null socket = releaseConnectionNoEvents() // Sets this.connection to null.
}
}
if (this.connection == null) {
socket?.closeQuietly()
eventListener.connectionReleased(this, connection)
} else {
check(socket == null) // If we still have a connection we shouldn't be closing any sockets.
} }
if (this.connection != null) releasedConnection = null
callEnd = noMoreExchanges && exchange == null
}
socket?.closeQuietly()
if (releasedConnection != null) {
eventListener.connectionReleased(this, releasedConnection!!)
} }
val callEnd = noMoreExchanges && exchange == null
if (callEnd) { if (callEnd) {
val callFailed = result != null val callFailed = result != null
result = timeoutExit(result) result = timeoutExit(result)
@@ -358,19 +341,20 @@ class RealCall(
* should close. * should close.
*/ */
internal fun releaseConnectionNoEvents(): Socket? { internal fun releaseConnectionNoEvents(): Socket? {
connectionPool.assertThreadHoldsLock() val connection = this.connection!!
connection.assertThreadHoldsLock()
val index = connection!!.calls.indexOfFirst { it.get() == this@RealCall } val calls = connection.calls
val index = calls.indexOfFirst { it.get() == this@RealCall }
check(index != -1) check(index != -1)
val released = this.connection calls.removeAt(index)
released!!.calls.removeAt(index)
this.connection = null this.connection = null
if (released.calls.isEmpty()) { if (calls.isEmpty()) {
released.idleAtNs = System.nanoTime() connection.idleAtNs = System.nanoTime()
if (connectionPool.connectionBecameIdle(released)) { if (connectionPool.connectionBecameIdle(connection)) {
return released.socket() return connection.socket()
} }
} }

View File

@@ -46,6 +46,7 @@ import okhttp3.Response
import okhttp3.Route import okhttp3.Route
import okhttp3.internal.EMPTY_RESPONSE import okhttp3.internal.EMPTY_RESPONSE
import okhttp3.internal.assertThreadDoesntHoldLock import okhttp3.internal.assertThreadDoesntHoldLock
import okhttp3.internal.assertThreadHoldsLock
import okhttp3.internal.closeQuietly import okhttp3.internal.closeQuietly
import okhttp3.internal.concurrent.TaskRunner import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.http.ExchangeCodec import okhttp3.internal.http.ExchangeCodec
@@ -70,12 +71,23 @@ import okio.buffer
import okio.sink import okio.sink
import okio.source import okio.source
/**
* A connection to a remote web server capable of carrying 1 or more concurrent streams.
*
* A connection's lifecycle has two phases.
*
* 1. While it's connecting, the connection is owned by a single call using single thread. In this
* phase the connection is not shared and no locking is necessary.
*
* 2. Once connected, a connection is shared to a connection pool. In this phase accesses to the
* connection's state must be guarded by holding a lock on the connection.
*/
class RealConnection( class RealConnection(
val connectionPool: RealConnectionPool, val connectionPool: RealConnectionPool,
private val route: Route private val route: Route
) : Http2Connection.Listener(), Connection { ) : Http2Connection.Listener(), Connection {
// The fields below are initialized by connect() and never reassigned. // These properties are initialized by connect() and never reassigned.
/** The low-level TCP socket. */ /** The low-level TCP socket. */
private var rawSocket: Socket? = null private var rawSocket: Socket? = null
@@ -91,11 +103,15 @@ class RealConnection(
private var source: BufferedSource? = null private var source: BufferedSource? = null
private var sink: BufferedSink? = null private var sink: BufferedSink? = null
// The fields below track connection state and are guarded by connectionPool. // These properties are guarded by this.
/** /**
* If true, no new exchanges can be created on this connection. Once true this is always true. * If true, no new exchanges can be created on this connection. It is necessary to set this to
* Guarded by [connectionPool]. * true when removing a connection from the pool; otherwise a racing caller might get it from the
* pool when it shouldn't. Symmetrically, this must always be checked before returning a
* connection from the pool.
*
* Once true this is always true. Guarded by this.
*/ */
var noNewExchanges = false var noNewExchanges = false
@@ -107,7 +123,7 @@ class RealConnection(
/** /**
* The number of times there was a problem establishing a stream that could be due to route * The number of times there was a problem establishing a stream that could be due to route
* chosen. Guarded by [connectionPool]. * chosen. Guarded by this.
*/ */
internal var routeFailureCount = 0 internal var routeFailureCount = 0
@@ -135,18 +151,18 @@ class RealConnection(
/** Prevent further exchanges from being created on this connection. */ /** Prevent further exchanges from being created on this connection. */
fun noNewExchanges() { fun noNewExchanges() {
connectionPool.assertThreadDoesntHoldLock() assertThreadDoesntHoldLock()
synchronized(connectionPool) { synchronized(this) {
noNewExchanges = true noNewExchanges = true
} }
} }
/** Prevent this connection from being used for hosts other than the one in [route]. */ /** Prevent this connection from being used for hosts other than the one in [route]. */
fun noCoalescedConnections() { fun noCoalescedConnections() {
connectionPool.assertThreadDoesntHoldLock() assertThreadDoesntHoldLock()
synchronized(connectionPool) { synchronized(this) {
noCoalescedConnections = true noCoalescedConnections = true
} }
} }
@@ -514,6 +530,8 @@ class RealConnection(
* `route` is the resolved route for a connection. * `route` is the resolved route for a connection.
*/ */
internal fun isEligible(address: Address, routes: List<Route>?): Boolean { internal fun isEligible(address: Address, routes: List<Route>?): Boolean {
assertThreadHoldsLock()
// If this connection is not accepting new exchanges, we're done. // If this connection is not accepting new exchanges, we're done.
if (calls.size >= allocationLimit || noNewExchanges) return false if (calls.size >= allocationLimit || noNewExchanges) return false
@@ -564,7 +582,9 @@ class RealConnection(
} }
} }
fun supportsUrl(url: HttpUrl): Boolean { private fun supportsUrl(url: HttpUrl): Boolean {
assertThreadHoldsLock()
val routeUrl = route.address.url val routeUrl = route.address.url
if (url.port != routeUrl.port) { if (url.port != routeUrl.port) {
@@ -629,6 +649,8 @@ class RealConnection(
/** Returns true if this connection is ready to host new streams. */ /** Returns true if this connection is ready to host new streams. */
fun isHealthy(doExtensiveChecks: Boolean): Boolean { fun isHealthy(doExtensiveChecks: Boolean): Boolean {
assertThreadDoesntHoldLock()
val nowNs = System.nanoTime() val nowNs = System.nanoTime()
val rawSocket = this.rawSocket!! val rawSocket = this.rawSocket!!
@@ -644,7 +666,7 @@ class RealConnection(
return http2Connection.isHealthy(nowNs) return http2Connection.isHealthy(nowNs)
} }
val idleDurationNs = nowNs - idleAtNs val idleDurationNs = synchronized(this) { nowNs - idleAtNs }
if (idleDurationNs >= IDLE_CONNECTION_HEALTHY_NS && doExtensiveChecks) { if (idleDurationNs >= IDLE_CONNECTION_HEALTHY_NS && doExtensiveChecks) {
return socket.isHealthy(source) return socket.isHealthy(source)
} }
@@ -660,7 +682,7 @@ class RealConnection(
/** When settings are received, adjust the allocation limit. */ /** When settings are received, adjust the allocation limit. */
override fun onSettings(connection: Http2Connection, settings: Settings) { override fun onSettings(connection: Http2Connection, settings: Settings) {
synchronized(connectionPool) { synchronized(this) {
allocationLimit = settings.getMaxConcurrentStreams() allocationLimit = settings.getMaxConcurrentStreams()
} }
} }
@@ -684,9 +706,9 @@ class RealConnection(
* being used for future exchanges. * being used for future exchanges.
*/ */
internal fun trackFailure(call: RealCall, e: IOException?) { internal fun trackFailure(call: RealCall, e: IOException?) {
connectionPool.assertThreadDoesntHoldLock() assertThreadDoesntHoldLock()
synchronized(connectionPool) { synchronized(this) {
if (e is StreamResetException) { if (e is StreamResetException) {
when { when {
e.errorCode == ErrorCode.REFUSED_STREAM -> { e.errorCode == ErrorCode.REFUSED_STREAM -> {
@@ -742,11 +764,11 @@ class RealConnection(
connectionPool: RealConnectionPool, connectionPool: RealConnectionPool,
route: Route, route: Route,
socket: Socket, socket: Socket,
idleAtNanos: Long idleAtNs: Long
): RealConnection { ): RealConnection {
val result = RealConnection(connectionPool, route) val result = RealConnection(connectionPool, route)
result.socket = socket result.socket = socket
result.idleAtNs = idleAtNanos result.idleAtNs = idleAtNs
return result return result
} }
} }

View File

@@ -16,7 +16,7 @@
*/ */
package okhttp3.internal.connection package okhttp3.internal.connection
import java.util.ArrayDeque import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import okhttp3.Address import okhttp3.Address
import okhttp3.ConnectionPool import okhttp3.ConnectionPool
@@ -44,18 +44,25 @@ class RealConnectionPool(
override fun runOnce() = cleanup(System.nanoTime()) override fun runOnce() = cleanup(System.nanoTime())
} }
private val connections = ArrayDeque<RealConnection>() /**
* Holding the lock of the connection being added or removed when mutating this, and check its
* [RealConnection.noNewExchanges] property. This defends against races where a connection is
* simultaneously adopted and removed.
*/
private val connections = ConcurrentLinkedQueue<RealConnection>()
init { init {
// Put a floor on the keep alive duration, otherwise cleanup will spin loop. // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
require(keepAliveDuration > 0L) { "keepAliveDuration <= 0: $keepAliveDuration" } require(keepAliveDuration > 0L) { "keepAliveDuration <= 0: $keepAliveDuration" }
} }
@Synchronized fun idleConnectionCount(): Int { fun idleConnectionCount(): Int {
return connections.count { it.calls.isEmpty() } return connections.count {
synchronized(it) { it.calls.isEmpty() }
}
} }
@Synchronized fun connectionCount(): Int { fun connectionCount(): Int {
return connections.size return connections.size
} }
@@ -73,32 +80,33 @@ class RealConnectionPool(
routes: List<Route>?, routes: List<Route>?,
requireMultiplexed: Boolean requireMultiplexed: Boolean
): Boolean { ): Boolean {
this.assertThreadHoldsLock()
for (connection in connections) { for (connection in connections) {
if (requireMultiplexed && !connection.isMultiplexed) continue synchronized(connection) {
if (!connection.isEligible(address, routes)) continue if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
call.acquireConnectionNoEvents(connection) if (!connection.isEligible(address, routes)) return@synchronized
return true call.acquireConnectionNoEvents(connection)
return true
}
} }
return false return false
} }
fun put(connection: RealConnection) { fun put(connection: RealConnection) {
this.assertThreadHoldsLock() connection.assertThreadHoldsLock()
connections.add(connection) connections.add(connection)
cleanupQueue.schedule(cleanupTask) cleanupQueue.schedule(cleanupTask)
} }
/** /**
* Notify this pool that [connection] has become idle. Returns true if the connection has * Notify this pool that [connection] has become idle. Returns true if the connection has been
* been removed from the pool and should be closed. * removed from the pool and should be closed.
*/ */
fun connectionBecameIdle(connection: RealConnection): Boolean { fun connectionBecameIdle(connection: RealConnection): Boolean {
this.assertThreadHoldsLock() connection.assertThreadHoldsLock()
return if (connection.noNewExchanges || maxIdleConnections == 0) { return if (connection.noNewExchanges || maxIdleConnections == 0) {
connection.noNewExchanges = true
connections.remove(connection) connections.remove(connection)
if (connections.isEmpty()) cleanupQueue.cancelAll() if (connections.isEmpty()) cleanupQueue.cancelAll()
true true
@@ -109,23 +117,22 @@ class RealConnectionPool(
} }
fun evictAll() { fun evictAll() {
val evictedConnections = mutableListOf<RealConnection>() val i = connections.iterator()
synchronized(this) { while (i.hasNext()) {
val i = connections.iterator() val connection = i.next()
while (i.hasNext()) { val socketToClose = synchronized(connection) {
val connection = i.next()
if (connection.calls.isEmpty()) { if (connection.calls.isEmpty()) {
connection.noNewExchanges = true
evictedConnections.add(connection)
i.remove() i.remove()
connection.noNewExchanges = true
return@synchronized connection.socket()
} else {
return@synchronized null
} }
} }
if (connections.isEmpty()) cleanupQueue.cancelAll() socketToClose?.closeQuietly()
} }
for (connection in evictedConnections) { if (connections.isEmpty()) cleanupQueue.cancelAll()
connection.socket().closeQuietly()
}
} }
/** /**
@@ -142,52 +149,61 @@ class RealConnectionPool(
var longestIdleDurationNs = Long.MIN_VALUE var longestIdleDurationNs = Long.MIN_VALUE
// Find either a connection to evict, or the time that the next eviction is due. // Find either a connection to evict, or the time that the next eviction is due.
synchronized(this) { for (connection in connections) {
for (connection in connections) { synchronized(connection) {
// If the connection is in use, keep searching. // If the connection is in use, keep searching.
if (pruneAndGetAllocationCount(connection, now) > 0) { if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++ inUseConnectionCount++
continue } else {
} idleConnectionCount++
idleConnectionCount++ // If the connection is ready to be evicted, we're done.
val idleDurationNs = now - connection.idleAtNs
// If the connection is ready to be evicted, we're done. if (idleDurationNs > longestIdleDurationNs) {
val idleDurationNs = now - connection.idleAtNs longestIdleDurationNs = idleDurationNs
if (idleDurationNs > longestIdleDurationNs) { longestIdleConnection = connection
longestIdleDurationNs = idleDurationNs } else {
longestIdleConnection = connection Unit
} }
}
when {
longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections -> {
// We've found a connection to evict. Remove it from the list, then close it below
// (outside of the synchronized block).
connections.remove(longestIdleConnection)
if (connections.isEmpty()) cleanupQueue.cancelAll()
}
idleConnectionCount > 0 -> {
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs
}
inUseConnectionCount > 0 -> {
// All connections are in use. It'll be at least the keep alive duration 'til we run
// again.
return keepAliveDurationNs
}
else -> {
// No connections, idle or in use.
return -1
} }
} }
} }
longestIdleConnection!!.socket().closeQuietly() when {
longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections -> {
// We've chosen a connection to evict. Confirm it's still okay to be evict, then close it.
val connection = longestIdleConnection!!
synchronized(connection) {
if (connection.calls.isNotEmpty()) return 0L // No longer idle.
if (connection.idleAtNs + longestIdleDurationNs != now) return 0L // No longer oldest.
connection.noNewExchanges = true
connections.remove(longestIdleConnection)
}
// Cleanup again immediately. connection.socket().closeQuietly()
return 0L if (connections.isEmpty()) cleanupQueue.cancelAll()
// Clean up again immediately.
return 0L
}
idleConnectionCount > 0 -> {
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs
}
inUseConnectionCount > 0 -> {
// All connections are in use. It'll be at least the keep alive duration 'til we run
// again.
return keepAliveDurationNs
}
else -> {
// No connections, idle or in use.
return -1
}
}
} }
/** /**
@@ -196,6 +212,8 @@ class RealConnectionPool(
* them. Leak detection is imprecise and relies on garbage collection. * them. Leak detection is imprecise and relies on garbage collection.
*/ */
private fun pruneAndGetAllocationCount(connection: RealConnection, now: Long): Int { private fun pruneAndGetAllocationCount(connection: RealConnection, now: Long): Int {
connection.assertThreadHoldsLock()
val references = connection.calls val references = connection.calls
var i = 0 var i = 0
while (i < references.size) { while (i < references.size) {

View File

@@ -84,12 +84,12 @@ public final class ConnectionPoolTest {
ConnectionPool poolApi = new ConnectionPool(pool); ConnectionPool poolApi = new ConnectionPool(pool);
RealConnection c1 = newConnection(pool, routeA1, 50L); RealConnection c1 = newConnection(pool, routeA1, 50L);
synchronized (pool) { OkHttpClient client = new OkHttpClient.Builder()
OkHttpClient client = new OkHttpClient.Builder() .connectionPool(poolApi)
.connectionPool(poolApi) .build();
.build(); RealCall call = (RealCall) client.newCall(newRequest(addressA));
RealCall call = (RealCall) client.newCall(newRequest(addressA)); call.enterNetworkInterceptorExchange(call.request(), true);
call.enterNetworkInterceptorExchange(call.request(), true); synchronized (c1) {
call.acquireConnectionNoEvents(c1); call.acquireConnectionNoEvents(c1);
} }
@@ -206,12 +206,12 @@ public final class ConnectionPoolTest {
/** Use a helper method so there's no hidden reference remaining on the stack. */ /** Use a helper method so there's no hidden reference remaining on the stack. */
private void allocateAndLeakAllocation(ConnectionPool pool, RealConnection connection) { private void allocateAndLeakAllocation(ConnectionPool pool, RealConnection connection) {
synchronized (RealConnectionPool.Companion.get(pool)) { OkHttpClient client = new OkHttpClient.Builder()
OkHttpClient client = new OkHttpClient.Builder() .connectionPool(pool)
.connectionPool(pool) .build();
.build(); RealCall call = (RealCall) client.newCall(newRequest(connection.route().address()));
RealCall call = (RealCall) client.newCall(newRequest(connection.route().address())); call.enterNetworkInterceptorExchange(call.request(), true);
call.enterNetworkInterceptorExchange(call.request(), true); synchronized (connection) {
call.acquireConnectionNoEvents(connection); call.acquireConnectionNoEvents(connection);
} }
} }
@@ -219,7 +219,7 @@ public final class ConnectionPoolTest {
private RealConnection newConnection(RealConnectionPool pool, Route route, long idleAtNanos) { private RealConnection newConnection(RealConnectionPool pool, Route route, long idleAtNanos) {
RealConnection result = RealConnection.Companion.newTestConnection( RealConnection result = RealConnection.Companion.newTestConnection(
pool, route, new Socket(), idleAtNanos); pool, route, new Socket(), idleAtNanos);
synchronized (pool) { synchronized (result) {
pool.put(result); pool.put(result);
} }
return result; return result;