1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-14 07:22:20 +03:00

Add limited locking around the end of the call (#6095)

This is mostly necessary because we have a possibility of duplex
calls.
This commit is contained in:
Jesse Wilson
2020-05-29 03:27:58 -04:00
committed by GitHub
parent 72227dfb7a
commit 930a8aeee2

View File

@@ -87,10 +87,7 @@ class RealCall(
var connection: RealConnection? = null
private set
private var requestBodyOpen = false
private var responseBodyOpen = false
private var timeoutEarlyExit = false
private var expectMoreExchanges = true
/**
* This is the same value as [exchange], but scoped to the execution of the network interceptors.
@@ -100,6 +97,18 @@ class RealCall(
internal var interceptorScopedExchange: Exchange? = null
private set
// These properties are guarded by this. They are typically only accessed by the thread executing
// the call, but they may be accessed by other threads for duplex requests.
/** True if this call still has a request body open. */
private var requestBodyOpen = false
/** True if this call still has a response body open. */
private var responseBodyOpen = false
/** True if there are more exchanges expected for this call. */
private var expectMoreExchanges = true
// These properties are accessed by canceling threads. Any thread can cancel a call, and once it's
// canceled it's canceled forever.
@@ -215,11 +224,14 @@ class RealCall(
*/
fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) {
check(interceptorScopedExchange == null)
check(!responseBodyOpen) {
"cannot make a new request because the previous response is still open: " +
"please call response.close()"
synchronized(this) {
check(!responseBodyOpen) {
"cannot make a new request because the previous response is still open: " +
"please call response.close()"
}
check(!requestBodyOpen)
}
check(!requestBodyOpen)
if (newExchangeFinder) {
this.exchangeFinder = ExchangeFinder(
@@ -233,17 +245,21 @@ class RealCall(
/** Finds a new or pooled connection to carry a forthcoming request and response. */
internal fun initExchange(chain: RealInterceptorChain): Exchange {
check(expectMoreExchanges) { "released" }
check(!responseBodyOpen)
check(!requestBodyOpen)
synchronized(this) {
check(expectMoreExchanges) { "released" }
check(!responseBodyOpen)
check(!requestBodyOpen)
}
val exchangeFinder = this.exchangeFinder!!
val codec = exchangeFinder.find(client, chain)
val result = Exchange(this, eventListener, exchangeFinder, codec)
this.interceptorScopedExchange = result
this.exchange = result
this.requestBodyOpen = true
this.responseBodyOpen = true
synchronized(this) {
this.requestBodyOpen = true
this.responseBodyOpen = true
}
if (canceled) throw IOException("Canceled")
return result
@@ -271,39 +287,61 @@ class RealCall(
responseDone: Boolean,
e: E
): E {
if (exchange != this.exchange) {
return e // This exchange was detached violently!
if (exchange != this.exchange) return e // This exchange was detached violently!
var bothStreamsDone = false
var callDone = false
synchronized(this) {
if (requestDone && requestBodyOpen || responseDone && responseBodyOpen) {
if (requestDone) requestBodyOpen = false
if (responseDone) responseBodyOpen = false
bothStreamsDone = !requestBodyOpen && !responseBodyOpen
callDone = !requestBodyOpen && !responseBodyOpen && !expectMoreExchanges
}
}
var changed = false
if (requestDone) {
if (requestBodyOpen) changed = true
this.requestBodyOpen = false
}
if (responseDone) {
if (responseBodyOpen) changed = true
this.responseBodyOpen = false
}
if (!requestBodyOpen && !responseBodyOpen && changed) {
if (bothStreamsDone) {
this.exchange = null
this.connection?.incrementSuccessCount()
}
return maybeReleaseConnection(e)
if (callDone) {
return callDone(e)
}
return e
}
internal fun noMoreExchanges(e: IOException?): IOException? {
expectMoreExchanges = false
return maybeReleaseConnection(e)
var callDone = false
synchronized(this) {
if (expectMoreExchanges) {
expectMoreExchanges = false
callDone = !requestBodyOpen && !responseBodyOpen
}
}
if (callDone) {
return callDone(e)
}
return e
}
/**
* Release the connection if it is no longer needed. This is called after each exchange completes
* and after the call signals that no more exchanges are expected.
* Complete this call. This should be called once these properties are all false:
* [requestBodyOpen], [responseBodyOpen], and [expectMoreExchanges].
*
* This will release the connection if it is still held.
*
* It will also notify the listener that the call completed; either successfully or
* unsuccessfully.
*
* If the call was canceled or timed out, this will wrap [e] in an exception that provides that
* additional context. Otherwise [e] is returned as-is.
*/
private fun <E : IOException?> maybeReleaseConnection(e: E): E {
if (requestBodyOpen || responseBodyOpen || expectMoreExchanges) return e
private fun <E : IOException?> callDone(e: E): E {
assertThreadDoesntHoldLock()
val connection = this.connection
if (connection != null) {
@@ -378,12 +416,12 @@ class RealCall(
* This is usually due to either an exception or a retry.
*/
internal fun exitNetworkInterceptorExchange(closeExchange: Boolean) {
check(expectMoreExchanges) { "released" }
synchronized(this) {
check(expectMoreExchanges) { "released" }
}
if (closeExchange) {
exchange?.detachWithViolence()
check(exchange == null)
check(!requestBodyOpen && !responseBodyOpen)
}
interceptorScopedExchange = null