diff --git a/okhttp-tests/src/test/java/com/squareup/okhttp/internal/http/DisconnectTest.java b/okhttp-tests/src/test/java/com/squareup/okhttp/internal/http/DisconnectTest.java new file mode 100644 index 000000000..db8421410 --- /dev/null +++ b/okhttp-tests/src/test/java/com/squareup/okhttp/internal/http/DisconnectTest.java @@ -0,0 +1,96 @@ +/* + * Copyright (C) 2014 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.squareup.okhttp.internal.http; + +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.mockwebserver.MockResponse; +import com.squareup.okhttp.mockwebserver.MockWebServer; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.util.concurrent.TimeUnit; +import org.junit.Test; + +import static org.junit.Assert.fail; + +public final class DisconnectTest { + private final MockWebServer server = new MockWebServer(); + private final OkHttpClient client = new OkHttpClient(); + + @Test public void interruptWritingRequestBody() throws Exception { + int requestBodySize = 2 * 1024 * 1024; // 2 MiB + + server.enqueue(new MockResponse() + .throttleBody(64 * 1024, 125, TimeUnit.MILLISECONDS)); // 500 Kbps + server.play(); + + HttpURLConnection connection = client.open(server.getUrl("/")); + disconnectLater(connection, 500); + + connection.setDoOutput(true); + connection.setFixedLengthStreamingMode(requestBodySize); + OutputStream requestBody = connection.getOutputStream(); + byte[] buffer = new byte[1024]; + try { + for (int i = 0; i < requestBodySize; i += buffer.length) { + requestBody.write(buffer); + requestBody.flush(); + } + fail("Expected connection to be closed"); + } catch (IOException expected) { + } + + connection.disconnect(); + } + + @Test public void interruptReadingResponseBody() throws Exception { + int responseBodySize = 2 * 1024 * 1024; // 2 MiB + + server.enqueue(new MockResponse() + .setBody(new byte[responseBodySize]) + .throttleBody(64 * 1024, 125, TimeUnit.MILLISECONDS)); // 500 Kbps + server.play(); + + HttpURLConnection connection = client.open(server.getUrl("/")); + disconnectLater(connection, 500); + + InputStream responseBody = connection.getInputStream(); + byte[] buffer = new byte[1024]; + try { + while (responseBody.read(buffer) != -1) { + } + fail("Expected connection to be closed"); + } catch (IOException expected) { + } + + connection.disconnect(); + } + + private void disconnectLater(final HttpURLConnection connection, final int delayMillis) { + Thread interruptingCow = new Thread() { + @Override public void run() { + try { + sleep(delayMillis); + connection.disconnect(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }; + interruptingCow.start(); + } +} diff --git a/okhttp-tests/src/test/java/com/squareup/okhttp/internal/http/URLConnectionTest.java b/okhttp-tests/src/test/java/com/squareup/okhttp/internal/http/URLConnectionTest.java index de3bd1b0a..83266d26c 100644 --- a/okhttp-tests/src/test/java/com/squareup/okhttp/internal/http/URLConnectionTest.java +++ b/okhttp-tests/src/test/java/com/squareup/okhttp/internal/http/URLConnectionTest.java @@ -63,6 +63,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -942,7 +943,9 @@ public final class URLConnectionTest { } @Test public void disconnectedConnection() throws IOException { - server.enqueue(new MockResponse().setBody("ABCDEFGHIJKLMNOPQR")); + server.enqueue(new MockResponse() + .throttleBody(2, 100, TimeUnit.MILLISECONDS) + .setBody("ABCD")); server.play(); connection = client.open(server.getUrl("/")); @@ -950,6 +953,10 @@ public final class URLConnectionTest { assertEquals('A', (char) in.read()); connection.disconnect(); try { + // Reading 'B' may succeed if it's buffered. + in.read(); + + // But 'C' shouldn't be buffered (the response is throttled) and this should fail. in.read(); fail("Expected a connection closed exception"); } catch (IOException expected) { @@ -1230,11 +1237,13 @@ public final class URLConnectionTest { HttpURLConnection connection1 = client.open(server.getUrl("/")); InputStream in1 = connection1.getInputStream(); assertEquals("ABCDE", readAscii(in1, 5)); + in1.close(); connection1.disconnect(); HttpURLConnection connection2 = client.open(server.getUrl("/")); InputStream in2 = connection2.getInputStream(); assertEquals("LMNOP", readAscii(in2, 5)); + in2.close(); connection2.disconnect(); assertEquals(0, server.takeRequest().getSequenceNumber()); diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpConnection.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpConnection.java index e3f1c45d2..aa1c23ee3 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpConnection.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpConnection.java @@ -117,6 +117,10 @@ public final class HttpConnection { return state == STATE_CLOSED; } + public void closeIfOwnedBy(Object owner) throws IOException { + connection.closeIfOwnedBy(owner); + } + public void flush() throws IOException { sink.flush(); } diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpEngine.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpEngine.java index 93d6583b8..526777490 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpEngine.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpEngine.java @@ -384,6 +384,18 @@ public class HttpEngine { connection = null; } + /** + * Immediately closes the socket connection if it's currently held by this + * engine. Use this to interrupt an in-flight request from any thread. It's + * the caller's responsibility to close the request body and response body + * streams; otherwise resources may be leaked. + */ + public final void disconnect() throws IOException { + if (transport != null) { + transport.disconnect(this); + } + } + /** * Release any resources held by this engine. If a connection is still held by * this engine, it is returned. diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpTransport.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpTransport.java index 026e6589f..c41a244a5 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpTransport.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpTransport.java @@ -148,4 +148,8 @@ public final class HttpTransport implements Transport { // reference escapes. return httpConnection.newUnknownLengthSource(cacheRequest); } + + @Override public void disconnect(HttpEngine engine) throws IOException { + httpConnection.closeIfOwnedBy(engine); + } } diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpURLConnectionImpl.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpURLConnectionImpl.java index 024f0fee6..e57545bdb 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpURLConnectionImpl.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpURLConnectionImpl.java @@ -106,9 +106,18 @@ public class HttpURLConnectionImpl extends HttpURLConnection { @Override public final void disconnect() { // Calling disconnect() before a connection exists should have no effect. - if (httpEngine != null) { - httpEngine.close(); + if (httpEngine == null) return; + + try { + httpEngine.disconnect(); + } catch (IOException ignored) { } + + // This doesn't close the stream because doing so would require all stream + // access to be synchronized. It's expected that the thread using the + // connection will close its streams directly. If it doesn't, the worst + // case is that the GzipSource's Inflater won't be released until it's + // finalized. (This logs a warning on Android.) } /** diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/SpdyTransport.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/SpdyTransport.java index 2327153ba..6b54a10e8 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/SpdyTransport.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/SpdyTransport.java @@ -197,6 +197,10 @@ public final class SpdyTransport implements Transport { @Override public void releaseConnectionOnIdle() { } + @Override public void disconnect(HttpEngine engine) throws IOException { + stream.close(ErrorCode.CANCEL); + } + @Override public boolean canReuseConnection() { return true; // TODO: spdyConnection.isClosed() ? } diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/Transport.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/Transport.java index 81ddc0f5a..3a96c6022 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/Transport.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/Transport.java @@ -74,6 +74,8 @@ interface Transport { */ void releaseConnectionOnIdle() throws IOException; + void disconnect(HttpEngine engine) throws IOException; + /** * Returns true if the socket connection held by this transport can be reused * for a follow-up exchange.