From bafad8c9984af6013a24b8b8dfa948aa37ac6df5 Mon Sep 17 00:00:00 2001 From: Jesse Wilson Date: Sun, 24 May 2020 23:18:56 -0400 Subject: [PATCH] Use finer-grained locks in the connection pool Previously we were locking the RealConnectionPool for all connection-related state, including cancelation. This changes the locks to be per-Connection, leaning heavily on thread-confined fields in RealCall, Exchange, and ExchangeFinder. --- docs/concurrency.md | 26 +- .../internal/connection/ExchangeFinder.kt | 307 ++++++++---------- .../okhttp3/internal/connection/RealCall.kt | 176 +++++----- .../internal/connection/RealConnection.kt | 54 ++- .../internal/connection/RealConnectionPool.kt | 144 ++++---- .../connection/ConnectionPoolTest.java | 26 +- 6 files changed, 360 insertions(+), 373 deletions(-) diff --git a/docs/concurrency.md b/docs/concurrency.md index eeb0fa219..615990198 100644 --- a/docs/concurrency.md +++ b/docs/concurrency.md @@ -69,30 +69,26 @@ This is necessary for bookkeeping when creating new streams. Correct framing req ## Connection Pool -### Background - A primary responsibility for any HTTP client is to efficiently manage network connections. Creating and establishing new connections require a fair amount of overhead and added latency. OkHttp will make every effort to reuse existing connections to avoid this overhead and added latency. -Every OkHttpClient uses a connection pool. Its job is to maintain a reference to all open connections. When an HTTP request is started, OkHttp will attempt to reuse an existing connection from the pool. If there are no existing connections, a new one is created and put into the connection pool. For http/2, the connection can be reused immediately. For http/1, the request must be completed before it can be reused. +Every OkHttpClient uses a connection pool. Its job is to maintain a reference to all open connections. When an HTTP request is started, OkHttp will attempt to reuse an existing connection from the pool. If there are no existing connections, a new one is created and put into the connection pool. For HTTP/2, the connection can be reused immediately. For HTTP/1, the request must be completed before it can be reused. -Since HTTP requests frequently happen in parallel, the connection pool implementation must be thread-safe. +Since HTTP requests frequently happen in parallel, connection pooling must be thread-safe. -### ConnectionPool, RealConnection, and StreamAllocation +These are the primary classes involved with establishing, sharing, and terminating connections: -The primary classes involved with establishing, sharing and terminating connections are ConnectionPool, RealConnection and StreamAllocation. + * **RealConnectionPool** manages reuse of HTTP and HTTP/2 connections for reduced latency. Every OkHttpClient has one, and its lifetime spans the lifetime of the OkHttpClient. -**ConnectionPool**: Manages reuse of HTTP and HTTP/2 connections for reduced latency. Every OkHttpClient has one, and its lifetime spans the lifetime of the OkHttpClient. + * **RealConnection** is the socket and streams of an HTTP/1 or HTTP/2 connection. These are created on demand to fulfill HTTP requests. They may be reused for many HTTP request/response exchanges. Their lifetime is typically shorter than a connection pool. -**RealConnection**: The socket and streams of an HTTP and HTTP/2 connection. These are created on demand to fulfill HTTP requests. They may be reused for many HTTP request/response exchanges. Their lifetime is typically shorter than ConnectionPool. + * **Exchange** carries a single HTTP request/response pair. -**StreamAllocation**: Coordinates the relationship between connections, streams and calls. These are created for a single HTTP request/response exchange. Their lifetime is typically shorter than RealConnection. + * **ExchangeFinder** chooses which connection carries each exchange. Where possible it will use the same connection for all exchanges in a single call. It prefers reusing pooled connections over establishing new connections. -### Locks +#### Per-Connection Locks -A single lock is used to synchronize and guard the state of ConnectionPool, RealConnection and StreamAllocation. +Each connection has its own lock. The connections in the pool are all in a `ConcurrentLinkedQueue`. Due to data races, iterators of this queue may return removed connections. Callers must check the connection's `noNewExchanges` property before using connections from the pool. -### ConnectionPool +The connection lock is never held while doing I/O (even closing a socket) to prevent contention. -The fields in ConnectionPool, RealConnection and StreamAllocation are all guarded by the connection pool instance. This lock is never held while doing I/O (even closing a socket) to prevent contention. - -A single lock is preferred to avoid deadlock scenarios and the added overhead of aggregate lock/unlock that would occur if multiple locks were used. \ No newline at end of file +A lock-per-connection is used to maximize concurrency. diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/ExchangeFinder.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/ExchangeFinder.kt index 7eef18c32..0c8583b2a 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/ExchangeFinder.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/ExchangeFinder.kt @@ -22,8 +22,6 @@ import okhttp3.EventListener import okhttp3.HttpUrl import okhttp3.OkHttpClient import okhttp3.Route -import okhttp3.internal.assertThreadDoesntHoldLock -import okhttp3.internal.assertThreadHoldsLock import okhttp3.internal.canReuseConnectionFor import okhttp3.internal.closeQuietly import okhttp3.internal.http.ExchangeCodec @@ -51,6 +49,9 @@ import okhttp3.internal.http2.StreamResetException * will prefer pooled connections. Only pooled HTTP/2 connections are used for such de-duplication. * * It is possible to cancel the finding process. + * + * Instances of this class are not thread-safe. Each instance is thread-confined to the thread + * executing [call]. */ class ExchangeFinder( private val connectionPool: RealConnectionPool, @@ -59,10 +60,7 @@ class ExchangeFinder( private val eventListener: EventListener ) { private var routeSelection: RouteSelector.Selection? = null - - // State guarded by connectionPool. private var routeSelector: RouteSelector? = null - private var connectingConnection: RealConnection? = null private var refusedStreamCount = 0 private var connectionShutdownCount = 0 private var otherFailureCount = 0 @@ -123,23 +121,23 @@ class ExchangeFinder( // Make sure we have some routes left to try. One example where we may exhaust all the routes // would happen if we made a new connection and it immediately is detected as unhealthy. - synchronized(connectionPool) { - if (nextRouteToTry != null) return@synchronized + if (nextRouteToTry != null) continue - val routesLeft = routeSelection?.hasNext() ?: true - if (routesLeft) return@synchronized + val routesLeft = routeSelection?.hasNext() ?: true + if (routesLeft) continue - val routesSelectionLeft = routeSelector?.hasNext() ?: true - if (routesSelectionLeft) return@synchronized + val routesSelectionLeft = routeSelector?.hasNext() ?: true + if (routesSelectionLeft) continue - throw IOException("exhausted all routes") - } + throw IOException("exhausted all routes") } } /** * Returns a connection to host a new stream. This prefers the existing connection if it exists, * then the pool, finally building a new connection. + * + * This checks for cancellation before each blocking operation. */ @Throws(IOException::class) private fun findConnection( @@ -149,156 +147,123 @@ class ExchangeFinder( pingIntervalMillis: Int, connectionRetryEnabled: Boolean ): RealConnection { - var foundPooledConnection = false - var result: RealConnection? = null - var selectedRoute: Route? = null - var releasedConnection: RealConnection? - val toClose: Socket? - synchronized(connectionPool) { - if (call.isCanceled()) throw IOException("Canceled") + if (call.isCanceled()) throw IOException("Canceled") - val callConnection = call.connection // changes within this overall method - releasedConnection = callConnection - toClose = if (callConnection != null && (callConnection.noNewExchanges || - !sameHostAndPort(callConnection.route().address.url))) { - call.releaseConnectionNoEvents() - } else { - null - } - - if (call.connection != null) { - // We had an already-allocated connection and it's good. - result = call.connection - releasedConnection = null - } - - if (result == null) { - // The connection hasn't had any problems for this call. - refusedStreamCount = 0 - connectionShutdownCount = 0 - otherFailureCount = 0 - - // Attempt to get a connection from the pool. - if (connectionPool.callAcquirePooledConnection(address, call, null, false)) { - foundPooledConnection = true - result = call.connection - } else if (nextRouteToTry != null) { - selectedRoute = nextRouteToTry - nextRouteToTry = null + // Attempt to reuse the connection from the call. + val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()! + if (callConnection != null) { + var toClose: Socket? = null + synchronized(callConnection) { + if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) { + toClose = call.releaseConnectionNoEvents() } } - } - toClose?.closeQuietly() - if (releasedConnection != null) { - eventListener.connectionReleased(call, releasedConnection!!) - } - if (foundPooledConnection) { - eventListener.connectionAcquired(call, result!!) - } - if (result != null) { - // If we found an already-allocated or pooled connection, we're done. - return result!! + // If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here + // because we already acquired it. + if (call.connection != null) { + check(toClose == null) + return callConnection + } + + // The call's connection was released. + toClose?.closeQuietly() + eventListener.connectionReleased(call, callConnection) } - // If we need a route selection, make one. This is a blocking operation. - var newRouteSelection = false - if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) { + // We need a new connection. Give it fresh stats. + refusedStreamCount = 0 + connectionShutdownCount = 0 + otherFailureCount = 0 + + // Attempt to get a connection from the pool. + if (connectionPool.callAcquirePooledConnection(address, call, null, false)) { + val result = call.connection!! + eventListener.connectionAcquired(call, result) + return result + } + + // Nothing in the pool. Figure out what route we'll try next. + val routes: List? + val route: Route + if (nextRouteToTry != null) { + // Use a route from a preceding coalesced connection. + routes = null + route = nextRouteToTry!! + nextRouteToTry = null + } else if (routeSelection != null && routeSelection!!.hasNext()) { + // Use a route from an existing route selection. + routes = null + route = routeSelection!!.next() + } else { + // Compute a new route selection. This is a blocking operation! var localRouteSelector = routeSelector if (localRouteSelector == null) { localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener) this.routeSelector = localRouteSelector } - newRouteSelection = true - routeSelection = localRouteSelector.next() - } + val localRouteSelection = localRouteSelector.next() + routeSelection = localRouteSelection + routes = localRouteSelection.routes - var routes: List? = null - synchronized(connectionPool) { if (call.isCanceled()) throw IOException("Canceled") - if (newRouteSelection) { - // Now that we have a set of IP addresses, make another attempt at getting a connection from - // the pool. This could match due to connection coalescing. - routes = routeSelection!!.routes - if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) { - foundPooledConnection = true - result = call.connection - } + // Now that we have a set of IP addresses, make another attempt at getting a connection from + // the pool. We have a better chance of matching thanks to connection coalescing. + if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) { + val result = call.connection!! + eventListener.connectionAcquired(call, result) + return result } - if (!foundPooledConnection) { - if (selectedRoute == null) { - selectedRoute = routeSelection!!.next() - } - - // Create a connection and assign it to this allocation immediately. This makes it possible - // for an asynchronous cancel() to interrupt the handshake we're about to do. - result = RealConnection(connectionPool, selectedRoute!!) - connectingConnection = result - } + route = localRouteSelection.next() } - // If we found a pooled connection on the 2nd time around, we're done. - if (foundPooledConnection) { - eventListener.connectionAcquired(call, result!!) - return result!! + // Connect. Tell the call about the connecting call so async cancels work. + val newConnection = RealConnection(connectionPool, route) + call.connectionToCancel = newConnection + try { + newConnection.connect( + connectTimeout, + readTimeout, + writeTimeout, + pingIntervalMillis, + connectionRetryEnabled, + call, + eventListener + ) + } finally { + call.connectionToCancel = null + } + call.client.routeDatabase.connected(newConnection.route()) + + // If we raced another call connecting to this host, coalesce the connections. This makes for 3 + // different lookups in the connection pool! + if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) { + val result = call.connection!! + nextRouteToTry = route + newConnection.socket().closeQuietly() + eventListener.connectionAcquired(call, result) + return result } - // Do TCP + TLS handshakes. This is a blocking operation. - result!!.connect( - connectTimeout, - readTimeout, - writeTimeout, - pingIntervalMillis, - connectionRetryEnabled, - call, - eventListener - ) - call.client.routeDatabase.connected(result!!.route()) - - var socket: Socket? = null - synchronized(connectionPool) { - connectingConnection = null - // Last attempt at connection coalescing, which only occurs if we attempted multiple - // concurrent connections to the same host. - if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) { - // We lost the race! Close the connection we created and return the pooled connection. - result!!.noNewExchanges = true - socket = result!!.socket() - result = call.connection - - // It's possible for us to obtain a coalesced connection that is immediately unhealthy. In - // that case we will retry the route we just successfully connected with. - nextRouteToTry = selectedRoute - } else { - connectionPool.put(result!!) - call.acquireConnectionNoEvents(result!!) - } + synchronized(newConnection) { + connectionPool.put(newConnection) + call.acquireConnectionNoEvents(newConnection) } - socket?.closeQuietly() - eventListener.connectionAcquired(call, result!!) - return result!! - } - - fun connectingConnection(): RealConnection? { - connectionPool.assertThreadHoldsLock() - return connectingConnection + eventListener.connectionAcquired(call, newConnection) + return newConnection } fun trackFailure(e: IOException) { - connectionPool.assertThreadDoesntHoldLock() - - synchronized(connectionPool) { - nextRouteToTry = null - if (e is StreamResetException && e.errorCode == ErrorCode.REFUSED_STREAM) { - refusedStreamCount++ - } else if (e is ConnectionShutdownException) { - connectionShutdownCount++ - } else { - otherFailureCount++ - } + nextRouteToTry = null + if (e is StreamResetException && e.errorCode == ErrorCode.REFUSED_STREAM) { + refusedStreamCount++ + } else if (e is ConnectionShutdownException) { + connectionShutdownCount++ + } else { + otherFailureCount++ } } @@ -307,46 +272,48 @@ class ExchangeFinder( * a route to retry on. */ fun retryAfterFailure(): Boolean { - synchronized(connectionPool) { - if (refusedStreamCount == 0 && connectionShutdownCount == 0 && otherFailureCount == 0) { - return false // Nothing to recover from. - } - - if (nextRouteToTry != null) { - return true - } - - if (retryCurrentRoute()) { - // Lock in the route because retryCurrentRoute() is racy and we don't want to call it twice. - nextRouteToTry = call.connection!!.route() - return true - } - - // If we have a routes left, use 'em. - if (routeSelection?.hasNext() == true) return true - - // If we haven't initialized the route selector yet, assume it'll have at least one route. - val localRouteSelector = routeSelector ?: return true - - // If we do have a route selector, use its routes. - return localRouteSelector.hasNext() + if (refusedStreamCount == 0 && connectionShutdownCount == 0 && otherFailureCount == 0) { + return false // Nothing to recover from. } + + if (nextRouteToTry != null) { + return true + } + + val retryRoute = retryRoute() + if (retryRoute != null) { + // Lock in the route because retryRoute() is racy and we don't want to call it twice. + nextRouteToTry = retryRoute + return true + } + + // If we have a routes left, use 'em. + if (routeSelection?.hasNext() == true) return true + + // If we haven't initialized the route selector yet, assume it'll have at least one route. + val localRouteSelector = routeSelector ?: return true + + // If we do have a route selector, use its routes. + return localRouteSelector.hasNext() } /** - * Return true if the route used for the current connection should be retried, even if the - * connection itself is unhealthy. The biggest gotcha here is that we shouldn't reuse routes from - * coalesced connections. + * Return the route from the current connection if it should be retried, even if the connection + * itself is unhealthy. The biggest gotcha here is that we shouldn't reuse routes from coalesced + * connections. */ - private fun retryCurrentRoute(): Boolean { + private fun retryRoute(): Route? { if (refusedStreamCount > 1 || connectionShutdownCount > 1 || otherFailureCount > 0) { - return false // This route has too many problems to retry. + return null // This route has too many problems to retry. } - val connection = call.connection - return connection != null && - connection.routeFailureCount == 0 && - connection.route().address.url.canReuseConnectionFor(address.url) + val connection = call.connection ?: return null + + synchronized(connection) { + if (connection.routeFailureCount != 0) return null + if (!connection.route().address.url.canReuseConnectionFor(address.url)) return null + return connection.route() + } } /** diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealCall.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealCall.kt index d4e01ac99..be65c39db 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealCall.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealCall.kt @@ -22,6 +22,7 @@ import java.net.Socket import java.util.concurrent.ExecutorService import java.util.concurrent.RejectedExecutionException import java.util.concurrent.TimeUnit.MILLISECONDS +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger import javax.net.ssl.HostnameVerifier import javax.net.ssl.SSLSocketFactory @@ -29,7 +30,6 @@ import okhttp3.Address import okhttp3.Call import okhttp3.Callback import okhttp3.CertificatePinner -import okhttp3.Connection import okhttp3.EventListener import okhttp3.HttpUrl import okhttp3.Interceptor @@ -75,24 +75,23 @@ class RealCall( timeout(client.callTimeoutMillis.toLong(), MILLISECONDS) } + private val executed = AtomicBoolean() + + // These properties are only accessed by the thread executing the call. + /** Initialized in [callStart]. */ private var callStackTrace: Any? = null /** Finds an exchange to send the next request and receive the next response. */ private var exchangeFinder: ExchangeFinder? = null - // Guarded by connectionPool. var connection: RealConnection? = null - private var exchange: Exchange? = null + private set private var exchangeRequestDone = false private var exchangeResponseDone = false - private var canceled = false private var timeoutEarlyExit = false private var noMoreExchanges = false - // Guarded by this. - private var executed = false - /** * This is the same value as [exchange], but scoped to the execution of the network interceptors. * The [exchange] field is assigned to null when its streams end, which may be before or after the @@ -101,6 +100,13 @@ class RealCall( internal var interceptorScopedExchange: Exchange? = null private set + // These properties are accessed by canceling threads. Any thread can cancel a call, and once it's + // canceled it's canceled forever. + + @Volatile private var canceled = false + @Volatile private var exchange: Exchange? = null + @Volatile var connectionToCancel: RealConnection? = null + override fun timeout() = timeout @SuppressWarnings("CloneDoesntCallSuperClone") // We are a final type & this saves clearing state. @@ -118,29 +124,20 @@ class RealCall( * if a socket connection is being established, that is terminated. */ override fun cancel() { - val exchangeToCancel: Exchange? - val connectionToCancel: RealConnection? - synchronized(connectionPool) { - if (canceled) return // Already canceled. - canceled = true - exchangeToCancel = exchange - connectionToCancel = exchangeFinder?.connectingConnection() ?: connection - } - exchangeToCancel?.cancel() ?: connectionToCancel?.cancel() + if (canceled) return // Already canceled. + + canceled = true + exchange?.cancel() + connectionToCancel?.cancel() + eventListener.canceled(this) } - override fun isCanceled(): Boolean { - synchronized(connectionPool) { - return canceled - } - } + override fun isCanceled() = canceled override fun execute(): Response { - synchronized(this) { - check(!executed) { "Already Executed" } - executed = true - } + check(executed.compareAndSet(false, true)) { "Already Executed" } + timeout.enter() callStart() try { @@ -152,15 +149,13 @@ class RealCall( } override fun enqueue(responseCallback: Callback) { - synchronized(this) { - check(!executed) { "Already Executed" } - executed = true - } + check(executed.compareAndSet(false, true)) { "Already Executed" } + callStart() client.dispatcher.enqueue(AsyncCall(responseCallback)) } - @Synchronized override fun isExecuted(): Boolean = executed + override fun isExecuted(): Boolean = executed.get() private fun callStart() { this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()") @@ -237,25 +232,23 @@ class RealCall( /** Finds a new or pooled connection to carry a forthcoming request and response. */ internal fun initExchange(chain: RealInterceptorChain): Exchange { - synchronized(connectionPool) { - check(!noMoreExchanges) { "released" } - check(exchange == null) - } + check(!noMoreExchanges) { "released" } + check(exchange == null) - val codec = exchangeFinder!!.find(client, chain) - val result = Exchange(this, eventListener, exchangeFinder!!, codec) + val exchangeFinder = this.exchangeFinder!! + val codec = exchangeFinder.find(client, chain) + val result = Exchange(this, eventListener, exchangeFinder, codec) this.interceptorScopedExchange = result + this.exchange = result + this.exchangeRequestDone = false + this.exchangeResponseDone = false - synchronized(connectionPool) { - this.exchange = result - this.exchangeRequestDone = false - this.exchangeResponseDone = false - return result - } + if (canceled) throw IOException("Canceled") + return result } fun acquireConnectionNoEvents(connection: RealConnection) { - connectionPool.assertThreadHoldsLock() + connection.assertThreadHoldsLock() check(this.connection == null) this.connection = connection @@ -277,37 +270,28 @@ class RealCall( e: E ): E { var result = e - var exchangeDone = false - synchronized(connectionPool) { - if (exchange != this.exchange) { - return result // This exchange was detached violently! - } - var changed = false - if (requestDone) { - if (!exchangeRequestDone) changed = true - this.exchangeRequestDone = true - } - if (responseDone) { - if (!exchangeResponseDone) changed = true - this.exchangeResponseDone = true - } - if (exchangeRequestDone && exchangeResponseDone && changed) { - exchangeDone = true - this.exchange!!.connection.successCount++ - this.exchange = null - } + if (exchange != this.exchange) { + return result // This exchange was detached violently! } - if (exchangeDone) { - result = maybeReleaseConnection(result, false) + var changed = false + if (requestDone) { + if (!exchangeRequestDone) changed = true + this.exchangeRequestDone = true + } + if (responseDone) { + if (!exchangeResponseDone) changed = true + this.exchangeResponseDone = true + } + if (exchangeRequestDone && exchangeResponseDone && changed) { + this.exchange = null + result = maybeReleaseConnection(result, incrementSuccesses = true) } return result } internal fun noMoreExchanges(e: IOException?): IOException? { - synchronized(connectionPool) { - noMoreExchanges = true - } - return maybeReleaseConnection(e, false) + noMoreExchanges = true + return maybeReleaseConnection(e, incrementSuccesses = false) } /** @@ -316,31 +300,30 @@ class RealCall( * * If the call was canceled or timed out, this will wrap [e] in an exception that provides that * additional context. Otherwise [e] is returned as-is. - * - * @param force true to release the connection even if more exchanges are expected for the call. */ - private fun maybeReleaseConnection(e: E, force: Boolean): E { + private fun maybeReleaseConnection(e: E, incrementSuccesses: Boolean): E { var result = e - val socket: Socket? - var releasedConnection: Connection? - val callEnd: Boolean - synchronized(connectionPool) { - check(!force || exchange == null) { "cannot release connection while it is in use" } - releasedConnection = this.connection - socket = if (this.connection != null && exchange == null && (force || noMoreExchanges)) { - releaseConnectionNoEvents() - } else { - null + val connection = this.connection + if (connection != null) { + connection.assertThreadDoesntHoldLock() + var socket: Socket? = null + synchronized(connection) { + if (incrementSuccesses) { + connection.successCount++ + } + if (exchange == null && noMoreExchanges) { + socket = releaseConnectionNoEvents() // Sets this.connection to null. + } + } + if (this.connection == null) { + socket?.closeQuietly() + eventListener.connectionReleased(this, connection) + } else { + check(socket == null) // If we still have a connection we shouldn't be closing any sockets. } - if (this.connection != null) releasedConnection = null - callEnd = noMoreExchanges && exchange == null - } - socket?.closeQuietly() - - if (releasedConnection != null) { - eventListener.connectionReleased(this, releasedConnection!!) } + val callEnd = noMoreExchanges && exchange == null if (callEnd) { val callFailed = result != null result = timeoutExit(result) @@ -358,19 +341,20 @@ class RealCall( * should close. */ internal fun releaseConnectionNoEvents(): Socket? { - connectionPool.assertThreadHoldsLock() + val connection = this.connection!! + connection.assertThreadHoldsLock() - val index = connection!!.calls.indexOfFirst { it.get() == this@RealCall } + val calls = connection.calls + val index = calls.indexOfFirst { it.get() == this@RealCall } check(index != -1) - val released = this.connection - released!!.calls.removeAt(index) + calls.removeAt(index) this.connection = null - if (released.calls.isEmpty()) { - released.idleAtNs = System.nanoTime() - if (connectionPool.connectionBecameIdle(released)) { - return released.socket() + if (calls.isEmpty()) { + connection.idleAtNs = System.nanoTime() + if (connectionPool.connectionBecameIdle(connection)) { + return connection.socket() } } diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt index 2af046146..be614a8d9 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt @@ -46,6 +46,7 @@ import okhttp3.Response import okhttp3.Route import okhttp3.internal.EMPTY_RESPONSE import okhttp3.internal.assertThreadDoesntHoldLock +import okhttp3.internal.assertThreadHoldsLock import okhttp3.internal.closeQuietly import okhttp3.internal.concurrent.TaskRunner import okhttp3.internal.http.ExchangeCodec @@ -70,12 +71,23 @@ import okio.buffer import okio.sink import okio.source +/** + * A connection to a remote web server capable of carrying 1 or more concurrent streams. + * + * A connection's lifecycle has two phases. + * + * 1. While it's connecting, the connection is owned by a single call using single thread. In this + * phase the connection is not shared and no locking is necessary. + * + * 2. Once connected, a connection is shared to a connection pool. In this phase accesses to the + * connection's state must be guarded by holding a lock on the connection. + */ class RealConnection( val connectionPool: RealConnectionPool, private val route: Route ) : Http2Connection.Listener(), Connection { - // The fields below are initialized by connect() and never reassigned. + // These properties are initialized by connect() and never reassigned. /** The low-level TCP socket. */ private var rawSocket: Socket? = null @@ -91,11 +103,15 @@ class RealConnection( private var source: BufferedSource? = null private var sink: BufferedSink? = null - // The fields below track connection state and are guarded by connectionPool. + // These properties are guarded by this. /** - * If true, no new exchanges can be created on this connection. Once true this is always true. - * Guarded by [connectionPool]. + * If true, no new exchanges can be created on this connection. It is necessary to set this to + * true when removing a connection from the pool; otherwise a racing caller might get it from the + * pool when it shouldn't. Symmetrically, this must always be checked before returning a + * connection from the pool. + * + * Once true this is always true. Guarded by this. */ var noNewExchanges = false @@ -107,7 +123,7 @@ class RealConnection( /** * The number of times there was a problem establishing a stream that could be due to route - * chosen. Guarded by [connectionPool]. + * chosen. Guarded by this. */ internal var routeFailureCount = 0 @@ -135,18 +151,18 @@ class RealConnection( /** Prevent further exchanges from being created on this connection. */ fun noNewExchanges() { - connectionPool.assertThreadDoesntHoldLock() + assertThreadDoesntHoldLock() - synchronized(connectionPool) { + synchronized(this) { noNewExchanges = true } } /** Prevent this connection from being used for hosts other than the one in [route]. */ fun noCoalescedConnections() { - connectionPool.assertThreadDoesntHoldLock() + assertThreadDoesntHoldLock() - synchronized(connectionPool) { + synchronized(this) { noCoalescedConnections = true } } @@ -514,6 +530,8 @@ class RealConnection( * `route` is the resolved route for a connection. */ internal fun isEligible(address: Address, routes: List?): Boolean { + assertThreadHoldsLock() + // If this connection is not accepting new exchanges, we're done. if (calls.size >= allocationLimit || noNewExchanges) return false @@ -564,7 +582,9 @@ class RealConnection( } } - fun supportsUrl(url: HttpUrl): Boolean { + private fun supportsUrl(url: HttpUrl): Boolean { + assertThreadHoldsLock() + val routeUrl = route.address.url if (url.port != routeUrl.port) { @@ -629,6 +649,8 @@ class RealConnection( /** Returns true if this connection is ready to host new streams. */ fun isHealthy(doExtensiveChecks: Boolean): Boolean { + assertThreadDoesntHoldLock() + val nowNs = System.nanoTime() val rawSocket = this.rawSocket!! @@ -644,7 +666,7 @@ class RealConnection( return http2Connection.isHealthy(nowNs) } - val idleDurationNs = nowNs - idleAtNs + val idleDurationNs = synchronized(this) { nowNs - idleAtNs } if (idleDurationNs >= IDLE_CONNECTION_HEALTHY_NS && doExtensiveChecks) { return socket.isHealthy(source) } @@ -660,7 +682,7 @@ class RealConnection( /** When settings are received, adjust the allocation limit. */ override fun onSettings(connection: Http2Connection, settings: Settings) { - synchronized(connectionPool) { + synchronized(this) { allocationLimit = settings.getMaxConcurrentStreams() } } @@ -684,9 +706,9 @@ class RealConnection( * being used for future exchanges. */ internal fun trackFailure(call: RealCall, e: IOException?) { - connectionPool.assertThreadDoesntHoldLock() + assertThreadDoesntHoldLock() - synchronized(connectionPool) { + synchronized(this) { if (e is StreamResetException) { when { e.errorCode == ErrorCode.REFUSED_STREAM -> { @@ -742,11 +764,11 @@ class RealConnection( connectionPool: RealConnectionPool, route: Route, socket: Socket, - idleAtNanos: Long + idleAtNs: Long ): RealConnection { val result = RealConnection(connectionPool, route) result.socket = socket - result.idleAtNs = idleAtNanos + result.idleAtNs = idleAtNs return result } } diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnectionPool.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnectionPool.kt index 7f49c0469..c349be707 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnectionPool.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnectionPool.kt @@ -16,7 +16,7 @@ */ package okhttp3.internal.connection -import java.util.ArrayDeque +import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.TimeUnit import okhttp3.Address import okhttp3.ConnectionPool @@ -44,18 +44,25 @@ class RealConnectionPool( override fun runOnce() = cleanup(System.nanoTime()) } - private val connections = ArrayDeque() + /** + * Holding the lock of the connection being added or removed when mutating this, and check its + * [RealConnection.noNewExchanges] property. This defends against races where a connection is + * simultaneously adopted and removed. + */ + private val connections = ConcurrentLinkedQueue() init { // Put a floor on the keep alive duration, otherwise cleanup will spin loop. require(keepAliveDuration > 0L) { "keepAliveDuration <= 0: $keepAliveDuration" } } - @Synchronized fun idleConnectionCount(): Int { - return connections.count { it.calls.isEmpty() } + fun idleConnectionCount(): Int { + return connections.count { + synchronized(it) { it.calls.isEmpty() } + } } - @Synchronized fun connectionCount(): Int { + fun connectionCount(): Int { return connections.size } @@ -73,32 +80,33 @@ class RealConnectionPool( routes: List?, requireMultiplexed: Boolean ): Boolean { - this.assertThreadHoldsLock() - for (connection in connections) { - if (requireMultiplexed && !connection.isMultiplexed) continue - if (!connection.isEligible(address, routes)) continue - call.acquireConnectionNoEvents(connection) - return true + synchronized(connection) { + if (requireMultiplexed && !connection.isMultiplexed) return@synchronized + if (!connection.isEligible(address, routes)) return@synchronized + call.acquireConnectionNoEvents(connection) + return true + } } return false } fun put(connection: RealConnection) { - this.assertThreadHoldsLock() + connection.assertThreadHoldsLock() connections.add(connection) cleanupQueue.schedule(cleanupTask) } /** - * Notify this pool that [connection] has become idle. Returns true if the connection has - * been removed from the pool and should be closed. + * Notify this pool that [connection] has become idle. Returns true if the connection has been + * removed from the pool and should be closed. */ fun connectionBecameIdle(connection: RealConnection): Boolean { - this.assertThreadHoldsLock() + connection.assertThreadHoldsLock() return if (connection.noNewExchanges || maxIdleConnections == 0) { + connection.noNewExchanges = true connections.remove(connection) if (connections.isEmpty()) cleanupQueue.cancelAll() true @@ -109,23 +117,22 @@ class RealConnectionPool( } fun evictAll() { - val evictedConnections = mutableListOf() - synchronized(this) { - val i = connections.iterator() - while (i.hasNext()) { - val connection = i.next() + val i = connections.iterator() + while (i.hasNext()) { + val connection = i.next() + val socketToClose = synchronized(connection) { if (connection.calls.isEmpty()) { - connection.noNewExchanges = true - evictedConnections.add(connection) i.remove() + connection.noNewExchanges = true + return@synchronized connection.socket() + } else { + return@synchronized null } } - if (connections.isEmpty()) cleanupQueue.cancelAll() + socketToClose?.closeQuietly() } - for (connection in evictedConnections) { - connection.socket().closeQuietly() - } + if (connections.isEmpty()) cleanupQueue.cancelAll() } /** @@ -142,52 +149,61 @@ class RealConnectionPool( var longestIdleDurationNs = Long.MIN_VALUE // Find either a connection to evict, or the time that the next eviction is due. - synchronized(this) { - for (connection in connections) { + for (connection in connections) { + synchronized(connection) { // If the connection is in use, keep searching. if (pruneAndGetAllocationCount(connection, now) > 0) { inUseConnectionCount++ - continue - } + } else { + idleConnectionCount++ - idleConnectionCount++ - - // If the connection is ready to be evicted, we're done. - val idleDurationNs = now - connection.idleAtNs - if (idleDurationNs > longestIdleDurationNs) { - longestIdleDurationNs = idleDurationNs - longestIdleConnection = connection - } - } - - when { - longestIdleDurationNs >= this.keepAliveDurationNs - || idleConnectionCount > this.maxIdleConnections -> { - // We've found a connection to evict. Remove it from the list, then close it below - // (outside of the synchronized block). - connections.remove(longestIdleConnection) - if (connections.isEmpty()) cleanupQueue.cancelAll() - } - idleConnectionCount > 0 -> { - // A connection will be ready to evict soon. - return keepAliveDurationNs - longestIdleDurationNs - } - inUseConnectionCount > 0 -> { - // All connections are in use. It'll be at least the keep alive duration 'til we run - // again. - return keepAliveDurationNs - } - else -> { - // No connections, idle or in use. - return -1 + // If the connection is ready to be evicted, we're done. + val idleDurationNs = now - connection.idleAtNs + if (idleDurationNs > longestIdleDurationNs) { + longestIdleDurationNs = idleDurationNs + longestIdleConnection = connection + } else { + Unit + } } } } - longestIdleConnection!!.socket().closeQuietly() + when { + longestIdleDurationNs >= this.keepAliveDurationNs + || idleConnectionCount > this.maxIdleConnections -> { + // We've chosen a connection to evict. Confirm it's still okay to be evict, then close it. + val connection = longestIdleConnection!! + synchronized(connection) { + if (connection.calls.isNotEmpty()) return 0L // No longer idle. + if (connection.idleAtNs + longestIdleDurationNs != now) return 0L // No longer oldest. + connection.noNewExchanges = true + connections.remove(longestIdleConnection) + } - // Cleanup again immediately. - return 0L + connection.socket().closeQuietly() + if (connections.isEmpty()) cleanupQueue.cancelAll() + + // Clean up again immediately. + return 0L + } + + idleConnectionCount > 0 -> { + // A connection will be ready to evict soon. + return keepAliveDurationNs - longestIdleDurationNs + } + + inUseConnectionCount > 0 -> { + // All connections are in use. It'll be at least the keep alive duration 'til we run + // again. + return keepAliveDurationNs + } + + else -> { + // No connections, idle or in use. + return -1 + } + } } /** @@ -196,6 +212,8 @@ class RealConnectionPool( * them. Leak detection is imprecise and relies on garbage collection. */ private fun pruneAndGetAllocationCount(connection: RealConnection, now: Long): Int { + connection.assertThreadHoldsLock() + val references = connection.calls var i = 0 while (i < references.size) { diff --git a/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.java b/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.java index b195e9071..cc516a476 100644 --- a/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.java +++ b/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.java @@ -84,12 +84,12 @@ public final class ConnectionPoolTest { ConnectionPool poolApi = new ConnectionPool(pool); RealConnection c1 = newConnection(pool, routeA1, 50L); - synchronized (pool) { - OkHttpClient client = new OkHttpClient.Builder() - .connectionPool(poolApi) - .build(); - RealCall call = (RealCall) client.newCall(newRequest(addressA)); - call.enterNetworkInterceptorExchange(call.request(), true); + OkHttpClient client = new OkHttpClient.Builder() + .connectionPool(poolApi) + .build(); + RealCall call = (RealCall) client.newCall(newRequest(addressA)); + call.enterNetworkInterceptorExchange(call.request(), true); + synchronized (c1) { call.acquireConnectionNoEvents(c1); } @@ -206,12 +206,12 @@ public final class ConnectionPoolTest { /** Use a helper method so there's no hidden reference remaining on the stack. */ private void allocateAndLeakAllocation(ConnectionPool pool, RealConnection connection) { - synchronized (RealConnectionPool.Companion.get(pool)) { - OkHttpClient client = new OkHttpClient.Builder() - .connectionPool(pool) - .build(); - RealCall call = (RealCall) client.newCall(newRequest(connection.route().address())); - call.enterNetworkInterceptorExchange(call.request(), true); + OkHttpClient client = new OkHttpClient.Builder() + .connectionPool(pool) + .build(); + RealCall call = (RealCall) client.newCall(newRequest(connection.route().address())); + call.enterNetworkInterceptorExchange(call.request(), true); + synchronized (connection) { call.acquireConnectionNoEvents(connection); } } @@ -219,7 +219,7 @@ public final class ConnectionPoolTest { private RealConnection newConnection(RealConnectionPool pool, Route route, long idleAtNanos) { RealConnection result = RealConnection.Companion.newTestConnection( pool, route, new Socket(), idleAtNanos); - synchronized (pool) { + synchronized (result) { pool.put(result); } return result;