mirror of
https://github.com/square/okhttp.git
synced 2026-01-15 20:56:41 +03:00
Merge pull request #3960 from square/http2-connection-flow-control
Defer StreamResetException until response body buffer is fully read.
This commit is contained in:
@@ -84,6 +84,7 @@ import static okhttp3.TestUtil.defaultClient;
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
@@ -300,6 +301,40 @@ public final class HttpOverHttp2Test {
|
||||
response2.close();
|
||||
}
|
||||
|
||||
@Test public void connectionWindowUpdateAfterCanceling() throws Exception {
|
||||
server.enqueue(new MockResponse()
|
||||
.setBody(new Buffer().write(new byte[Http2Connection.OKHTTP_CLIENT_WINDOW_SIZE + 1])));
|
||||
server.enqueue(new MockResponse()
|
||||
.setBody("abc"));
|
||||
|
||||
Call call1 = client.newCall(new Request.Builder()
|
||||
.url(server.url("/"))
|
||||
.build());
|
||||
Response response1 = call1.execute();
|
||||
|
||||
// Wait until the server has completely filled the stream and connection flow-control windows.
|
||||
int expectedFrameCount = Http2Connection.OKHTTP_CLIENT_WINDOW_SIZE / 16384;
|
||||
int dataFrameCount = 0;
|
||||
while (dataFrameCount < expectedFrameCount) {
|
||||
String log = http2Handler.take();
|
||||
if (log.equals("FINE: << 0x00000003 16384 DATA ")) {
|
||||
dataFrameCount++;
|
||||
}
|
||||
}
|
||||
|
||||
// Cancel the call and discard what we've buffered for the response body. This should free up
|
||||
// the connection flow-control window so new requests can proceed.
|
||||
call1.cancel();
|
||||
assertFalse("Call should not have completed successfully.",
|
||||
Util.discard(response1.body().source(), 1, TimeUnit.SECONDS));
|
||||
|
||||
Call call2 = client.newCall(new Request.Builder()
|
||||
.url(server.url("/"))
|
||||
.build());
|
||||
Response response2 = call2.execute();
|
||||
assertEquals("abc", response2.body().string());
|
||||
}
|
||||
|
||||
/** https://github.com/square/okhttp/issues/373 */
|
||||
@Test @Ignore public void synchronousRequest() throws Exception {
|
||||
server.enqueue(new MockResponse().setBody("A"));
|
||||
|
||||
@@ -71,7 +71,7 @@ public final class Http2Connection implements Closeable {
|
||||
// operations must synchronize on 'this' last. This ensures that we never
|
||||
// wait for a blocking operation while holding 'this'.
|
||||
|
||||
private static final int OKHTTP_CLIENT_WINDOW_SIZE = 16 * 1024 * 1024;
|
||||
static final int OKHTTP_CLIENT_WINDOW_SIZE = 16 * 1024 * 1024;
|
||||
|
||||
/**
|
||||
* Shared executor to send notifications of incoming streams. This executor requires multiple
|
||||
|
||||
@@ -332,35 +332,53 @@ public final class Http2Stream {
|
||||
@Override public long read(Buffer sink, long byteCount) throws IOException {
|
||||
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
|
||||
|
||||
long read;
|
||||
long read = -1;
|
||||
ErrorCode errorCode;
|
||||
synchronized (Http2Stream.this) {
|
||||
waitUntilReadable();
|
||||
checkNotClosed();
|
||||
if (readBuffer.size() == 0) return -1; // This source is exhausted.
|
||||
if (closed) {
|
||||
throw new IOException("stream closed");
|
||||
}
|
||||
errorCode = Http2Stream.this.errorCode;
|
||||
|
||||
// Move bytes from the read buffer into the caller's buffer.
|
||||
read = readBuffer.read(sink, Math.min(byteCount, readBuffer.size()));
|
||||
if (readBuffer.size() > 0) {
|
||||
// Move bytes from the read buffer into the caller's buffer.
|
||||
read = readBuffer.read(sink, Math.min(byteCount, readBuffer.size()));
|
||||
unacknowledgedBytesRead += read;
|
||||
}
|
||||
|
||||
// Flow control: notify the peer that we're ready for more data!
|
||||
unacknowledgedBytesRead += read;
|
||||
if (unacknowledgedBytesRead
|
||||
>= connection.okHttpSettings.getInitialWindowSize() / 2) {
|
||||
if (errorCode == null
|
||||
&& unacknowledgedBytesRead >= connection.okHttpSettings.getInitialWindowSize() / 2) {
|
||||
// Flow control: notify the peer that we're ready for more data! Only send a WINDOW_UPDATE
|
||||
// if the stream isn't in error.
|
||||
connection.writeWindowUpdateLater(id, unacknowledgedBytesRead);
|
||||
unacknowledgedBytesRead = 0;
|
||||
}
|
||||
}
|
||||
|
||||
// Update connection.unacknowledgedBytesRead outside the stream lock.
|
||||
synchronized (connection) { // Multiple application threads may hit this section.
|
||||
connection.unacknowledgedBytesRead += read;
|
||||
if (connection.unacknowledgedBytesRead
|
||||
>= connection.okHttpSettings.getInitialWindowSize() / 2) {
|
||||
connection.writeWindowUpdateLater(0, connection.unacknowledgedBytesRead);
|
||||
connection.unacknowledgedBytesRead = 0;
|
||||
if (read != -1) {
|
||||
// Update connection.unacknowledgedBytesRead outside the stream lock.
|
||||
synchronized (connection) { // Multiple application threads may hit this section.
|
||||
connection.unacknowledgedBytesRead += read;
|
||||
if (connection.unacknowledgedBytesRead
|
||||
>= connection.okHttpSettings.getInitialWindowSize() / 2) {
|
||||
connection.writeWindowUpdateLater(0, connection.unacknowledgedBytesRead);
|
||||
connection.unacknowledgedBytesRead = 0;
|
||||
}
|
||||
}
|
||||
|
||||
return read;
|
||||
}
|
||||
|
||||
return read;
|
||||
if (errorCode != null) {
|
||||
// We defer throwing the exception until now so that we can refill the connection
|
||||
// flow-control window. This is necessary because we don't transmit window updates until the
|
||||
// application reads the data. If we throw this prior to updating the connection
|
||||
// flow-control window, we risk having it go to 0 preventing the server from sending data.
|
||||
throw new StreamResetException(errorCode);
|
||||
}
|
||||
|
||||
return -1; // This source is exhausted.
|
||||
}
|
||||
|
||||
/** Returns once the source is either readable or finished. */
|
||||
@@ -427,15 +445,6 @@ public final class Http2Stream {
|
||||
}
|
||||
cancelStreamIfNecessary();
|
||||
}
|
||||
|
||||
private void checkNotClosed() throws IOException {
|
||||
if (closed) {
|
||||
throw new IOException("stream closed");
|
||||
}
|
||||
if (errorCode != null) {
|
||||
throw new StreamResetException(errorCode);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void cancelStreamIfNecessary() throws IOException {
|
||||
|
||||
Reference in New Issue
Block a user