diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealCall.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealCall.kt index eee8a7b2c..bbf4a6d6d 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealCall.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealCall.kt @@ -87,10 +87,7 @@ class RealCall( var connection: RealConnection? = null private set - private var requestBodyOpen = false - private var responseBodyOpen = false private var timeoutEarlyExit = false - private var expectMoreExchanges = true /** * This is the same value as [exchange], but scoped to the execution of the network interceptors. @@ -100,6 +97,18 @@ class RealCall( internal var interceptorScopedExchange: Exchange? = null private set + // These properties are guarded by this. They are typically only accessed by the thread executing + // the call, but they may be accessed by other threads for duplex requests. + + /** True if this call still has a request body open. */ + private var requestBodyOpen = false + + /** True if this call still has a response body open. */ + private var responseBodyOpen = false + + /** True if there are more exchanges expected for this call. */ + private var expectMoreExchanges = true + // These properties are accessed by canceling threads. Any thread can cancel a call, and once it's // canceled it's canceled forever. @@ -215,11 +224,14 @@ class RealCall( */ fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) { check(interceptorScopedExchange == null) - check(!responseBodyOpen) { - "cannot make a new request because the previous response is still open: " + - "please call response.close()" + + synchronized(this) { + check(!responseBodyOpen) { + "cannot make a new request because the previous response is still open: " + + "please call response.close()" + } + check(!requestBodyOpen) } - check(!requestBodyOpen) if (newExchangeFinder) { this.exchangeFinder = ExchangeFinder( @@ -233,17 +245,21 @@ class RealCall( /** Finds a new or pooled connection to carry a forthcoming request and response. */ internal fun initExchange(chain: RealInterceptorChain): Exchange { - check(expectMoreExchanges) { "released" } - check(!responseBodyOpen) - check(!requestBodyOpen) + synchronized(this) { + check(expectMoreExchanges) { "released" } + check(!responseBodyOpen) + check(!requestBodyOpen) + } val exchangeFinder = this.exchangeFinder!! val codec = exchangeFinder.find(client, chain) val result = Exchange(this, eventListener, exchangeFinder, codec) this.interceptorScopedExchange = result this.exchange = result - this.requestBodyOpen = true - this.responseBodyOpen = true + synchronized(this) { + this.requestBodyOpen = true + this.responseBodyOpen = true + } if (canceled) throw IOException("Canceled") return result @@ -271,39 +287,61 @@ class RealCall( responseDone: Boolean, e: E ): E { - if (exchange != this.exchange) { - return e // This exchange was detached violently! + if (exchange != this.exchange) return e // This exchange was detached violently! + + var bothStreamsDone = false + var callDone = false + synchronized(this) { + if (requestDone && requestBodyOpen || responseDone && responseBodyOpen) { + if (requestDone) requestBodyOpen = false + if (responseDone) responseBodyOpen = false + bothStreamsDone = !requestBodyOpen && !responseBodyOpen + callDone = !requestBodyOpen && !responseBodyOpen && !expectMoreExchanges + } } - var changed = false - if (requestDone) { - if (requestBodyOpen) changed = true - this.requestBodyOpen = false - } - if (responseDone) { - if (responseBodyOpen) changed = true - this.responseBodyOpen = false - } - if (!requestBodyOpen && !responseBodyOpen && changed) { + + if (bothStreamsDone) { this.exchange = null this.connection?.incrementSuccessCount() } - return maybeReleaseConnection(e) + + if (callDone) { + return callDone(e) + } + + return e } internal fun noMoreExchanges(e: IOException?): IOException? { - expectMoreExchanges = false - return maybeReleaseConnection(e) + var callDone = false + synchronized(this) { + if (expectMoreExchanges) { + expectMoreExchanges = false + callDone = !requestBodyOpen && !responseBodyOpen + } + } + + if (callDone) { + return callDone(e) + } + + return e } /** - * Release the connection if it is no longer needed. This is called after each exchange completes - * and after the call signals that no more exchanges are expected. + * Complete this call. This should be called once these properties are all false: + * [requestBodyOpen], [responseBodyOpen], and [expectMoreExchanges]. + * + * This will release the connection if it is still held. + * + * It will also notify the listener that the call completed; either successfully or + * unsuccessfully. * * If the call was canceled or timed out, this will wrap [e] in an exception that provides that * additional context. Otherwise [e] is returned as-is. */ - private fun maybeReleaseConnection(e: E): E { - if (requestBodyOpen || responseBodyOpen || expectMoreExchanges) return e + private fun callDone(e: E): E { + assertThreadDoesntHoldLock() val connection = this.connection if (connection != null) { @@ -378,12 +416,12 @@ class RealCall( * This is usually due to either an exception or a retry. */ internal fun exitNetworkInterceptorExchange(closeExchange: Boolean) { - check(expectMoreExchanges) { "released" } + synchronized(this) { + check(expectMoreExchanges) { "released" } + } if (closeExchange) { exchange?.detachWithViolence() - check(exchange == null) - check(!requestBodyOpen && !responseBodyOpen) } interceptorScopedExchange = null