1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-18 20:40:58 +03:00

Merge pull request #743 from square/adrian.cancel-with-disconnect

Disconnect on OkHttpClient.cancel
This commit is contained in:
Adrian Cole
2014-04-22 10:38:55 -07:00
8 changed files with 118 additions and 31 deletions

View File

@@ -26,6 +26,7 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
@@ -261,58 +262,118 @@ public final class AsyncApiTest {
receiver.await(server.getUrl("/20")).assertFailure("Too many redirects: 21");
}
@Test public void canceledBeforeResponseReadIsNeverDelivered() throws Exception {
/**
* This test puts a request in front of one that is to be canceled, so that it is canceled
* before I/O takes place.
*/
@Test public void canceledBeforeIOSignalsOnFailure() throws Exception {
client.getDispatcher().setMaxRequests(1); // Force requests to be executed serially.
server.setDispatcher(new Dispatcher() {
char nextResponse = 'A';
@Override public MockResponse dispatch(RecordedRequest request) {
client.cancel("request A");
client.cancel("request B");
return new MockResponse().setBody(Character.toString(nextResponse++));
}
});
server.play();
// Canceling a request after the server has received a request but before
// it has delivered the response. That request will never be received to the
// client.
Request requestA = new Request.Builder().url(server.getUrl("/a")).tag("request A").build();
client.enqueue(requestA, receiver);
assertEquals("/a", server.takeRequest().getPath());
// We then make a second request (not canceled) to make sure the receiver
// has nothing left to wait for.
Request requestB = new Request.Builder().url(server.getUrl("/b")).tag("request B").build();
client.enqueue(requestB, receiver);
assertEquals("/b", server.takeRequest().getPath());
receiver.await(requestB.url()).assertBody("B");
// At this point we know the receiver is ready: if it hasn't received 'A'
// yet it never will.
receiver.assertNoResponse(requestA.url());
receiver.await(requestA.url()).assertBody("A");
// At this point we know the receiver is ready, and that it will receive a cancel failure.
receiver.await(requestB.url()).assertFailure("Canceled");
}
@Test public void canceledAfterResponseIsDeliveredDoesNothing() throws Exception {
@Test public void canceledBeforeIOSignalsOnFailure_HTTP_2() throws Exception {
enableNpn(Protocol.HTTP_2);
canceledBeforeIOSignalsOnFailure();
}
@Test public void canceledBeforeIOSignalsOnFailure_SPDY_3() throws Exception {
enableNpn(Protocol.SPDY_3);
canceledBeforeIOSignalsOnFailure();
}
@Test public void canceledBeforeResponseReadSignalsOnFailure() throws Exception {
server.setDispatcher(new Dispatcher() {
@Override public MockResponse dispatch(RecordedRequest request) {
client.cancel("request A");
return new MockResponse().setBody("A");
}
});
server.play();
Request requestA = new Request.Builder().url(server.getUrl("/a")).tag("request A").build();
client.enqueue(requestA, receiver);
assertEquals("/a", server.takeRequest().getPath());
receiver.await(requestA.url()).assertFailure("Canceled");
}
@Test public void canceledBeforeResponseReadSignalsOnFailure_HTTP_2() throws Exception {
enableNpn(Protocol.HTTP_2);
canceledBeforeResponseReadSignalsOnFailure();
}
@Test public void canceledBeforeResponseReadSignalsOnFailure_SPDY_3() throws Exception {
enableNpn(Protocol.SPDY_3);
canceledBeforeResponseReadSignalsOnFailure();
}
/**
* There's a race condition where the cancel may apply after the stream has already been
* processed.
*/
@Test public void canceledAfterResponseIsDeliveredBreaksStreamButSignalsOnce() throws Exception {
server.enqueue(new MockResponse().setBody("A"));
server.play();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<String> bodyRef = new AtomicReference<String>();
final AtomicReference<Failure> failureRef = new AtomicReference<Failure>();
Request request = new Request.Builder().url(server.getUrl("/a")).tag("request A").build();
client.enqueue(request, new Response.Receiver() {
@Override public void onFailure(Failure failure) {
throw new AssertionError();
latch.countDown();
failureRef.set(failure); // This should never occur as we don't signal twice.
}
@Override public void onResponse(Response response) throws IOException {
client.cancel("request A");
bodyRef.set(response.body().string());
latch.countDown();
try {
bodyRef.set(response.body().string());
} catch (IOException e) { // It is ok if this broke the stream.
bodyRef.set("A");
throw e; // We expect to not loop into onFailure in this case.
} finally {
latch.countDown();
}
}
});
latch.await();
assertEquals("A", bodyRef.get());
assertNull(failureRef.get());
}
@Test public void canceledAfterResponseIsDeliveredBreaksStreamButSignalsOnce_HTTP_2()
throws Exception {
enableNpn(Protocol.HTTP_2);
canceledAfterResponseIsDeliveredBreaksStreamButSignalsOnce();
}
@Test public void canceledAfterResponseIsDeliveredBreaksStreamButSignalsOnce_SPDY_3()
throws Exception {
enableNpn(Protocol.SPDY_3);
canceledAfterResponseIsDeliveredBreaksStreamButSignalsOnce();
}
@Test public void connectionReuseWhenResponseBodyConsumed() throws Exception {
@@ -372,4 +433,18 @@ public final class AsyncApiTest {
assertEquals("text/plain; charset=utf-8", request2.getHeader("Content-Type"));
assertEquals(1, request2.getSequenceNumber());
}
/**
* Tests that use this will fail unless boot classpath is set. Ex. {@code
* -Xbootclasspath/p:/tmp/npn-boot-1.1.7.v20140316.jar}
*/
private void enableNpn(Protocol protocol) {
client.setSslSocketFactory(sslContext.getSocketFactory());
client.setHostnameVerifier(new RecordingHostnameVerifier());
client.setProtocols(Arrays.asList(protocol, Protocol.HTTP_1_1));
server.useHttps(sslContext.getSocketFactory(), false);
server.setNpnEnabled(true);
server.setNpnProtocols(client.getProtocols());
}
}

View File

@@ -3086,7 +3086,7 @@ public final class URLConnectionTest {
/**
* Tests that use this will fail unless boot classpath is set. Ex. {@code
* -Xbootclasspath/p:/tmp/npn-boot-8.1.2.v20120308.jar}
* -Xbootclasspath/p:/tmp/npn-boot-1.1.7.v20140316.jar}
*/
private void enableNpn(Protocol protocol) {
client.setSslSocketFactory(sslContext.getSocketFactory());

View File

@@ -16,6 +16,7 @@
package com.squareup.okhttp;
import com.squareup.okhttp.internal.Util;
import com.squareup.okhttp.internal.http.HttpEngine;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
@@ -113,17 +114,18 @@ public final class Dispatcher {
}
}
/**
* Cancel all jobs with the tag {@code tag}. If a canceled job is running it
* may continue running until it reaches a safe point to finish.
*/
/** Cancel all jobs with the tag {@code tag}. */
public synchronized void cancel(Object tag) {
for (Iterator<Job> i = readyJobs.iterator(); i.hasNext(); ) {
if (Util.equal(tag, i.next().tag())) i.remove();
}
for (Job job : runningJobs) {
if (Util.equal(tag, job.tag())) job.canceled = true;
if (Util.equal(tag, job.tag())) {
job.canceled = true;
HttpEngine engine = job.engine;
if (engine != null) engine.disconnect();
}
}
}

View File

@@ -20,6 +20,7 @@ import com.squareup.okhttp.internal.http.HttpEngine;
import com.squareup.okhttp.internal.http.OkHeaders;
import java.io.IOException;
import java.net.ProtocolException;
import java.util.concurrent.CancellationException;
import okio.BufferedSink;
import okio.BufferedSource;
@@ -59,12 +60,21 @@ final class Job extends NamedRunnable {
}
@Override protected void execute() {
boolean signalledReceiver = false;
try {
Response response = getResponse();
if (response != null && !canceled) {
if (canceled) {
signalledReceiver = true;
responseReceiver.onFailure(new Failure.Builder()
.request(request)
.exception(new CancellationException("Canceled"))
.build());
} else {
signalledReceiver = true;
responseReceiver.onResponse(response);
}
} catch (IOException e) {
if (signalledReceiver) return; // Do not signal the receiver twice!
responseReceiver.onFailure(new Failure.Builder()
.request(request)
.exception(e)

View File

@@ -416,9 +416,12 @@ public final class HttpEngine {
* the caller's responsibility to close the request body and response body
* streams; otherwise resources may be leaked.
*/
public void disconnect() throws IOException {
public void disconnect() {
if (transport != null) {
transport.disconnect(this);
try {
transport.disconnect(this);
} catch (IOException ignored) {
}
}
}

View File

@@ -107,10 +107,7 @@ public class HttpURLConnectionImpl extends HttpURLConnection {
// Calling disconnect() before a connection exists should have no effect.
if (httpEngine == null) return;
try {
httpEngine.disconnect();
} catch (IOException ignored) {
}
httpEngine.disconnect();
// This doesn't close the stream because doing so would require all stream
// access to be synchronized. It's expected that the thread using the

View File

@@ -49,7 +49,7 @@ public final class Ping {
/**
* Returns the round trip time for this ping in nanoseconds, waiting for the
* response to arrive if necessary. Returns -1 if the response was
* cancelled.
* canceled.
*/
public long roundTripTime() throws InterruptedException {
latch.await();
@@ -58,7 +58,7 @@ public final class Ping {
/**
* Returns the round trip time for this ping in nanoseconds, or -1 if the
* response was cancelled, or -2 if the timeout elapsed before the round
* response was canceled, or -2 if the timeout elapsed before the round
* trip completed.
*/
public long roundTripTime(long timeout, TimeUnit unit) throws InterruptedException {

View File

@@ -60,7 +60,7 @@ public interface PushObserver {
boolean onData(int streamId, BufferedSource source, int byteCount, boolean last)
throws IOException;
/** Indicates the reason why this stream was cancelled. */
/** Indicates the reason why this stream was canceled. */
void onReset(int streamId, ErrorCode errorCode);
PushObserver CANCEL = new PushObserver() {