1
0
mirror of https://github.com/square/okhttp.git synced 2025-11-27 18:21:14 +03:00

Merge Job into Call. CallTest = AsyncApiTest + SyncApiTest.

This commit is contained in:
Adrian Cole
2014-04-25 09:36:38 -07:00
parent f4c83e8eb8
commit 63d4dfaaa6
14 changed files with 728 additions and 689 deletions

View File

@@ -84,7 +84,7 @@ class OkHttpAsync implements HttpClient {
@Override public void enqueue(URL url) throws Exception {
requestsInFlight.incrementAndGet();
client.call(new Request.Builder().tag(System.nanoTime()).url(url).build()).execute(callback);
client.newCall(new Request.Builder().tag(System.nanoTime()).url(url).build()).execute(callback);
}
@Override public synchronized boolean acceptingJobs() {

View File

@@ -126,7 +126,7 @@ public class Main extends HelpOption implements Runnable {
client = createClient();
Request request = createRequest();
try {
Response response = client.execute(request);
Response response = client.newCall(request).execute();
if (showHeaders) {
System.out.println(StatusLine.get(response));
Headers headers = response.headers();

View File

@@ -159,7 +159,7 @@ public final class OkApacheClient implements HttpClient {
@Override public HttpResponse execute(HttpHost host, HttpRequest request, HttpContext context)
throws IOException {
Request okRequest = transformRequest(request);
Response okResponse = client.execute(okRequest);
Response okResponse = client.newCall(okRequest).execute();
return transformResponse(okResponse);
}

View File

@@ -28,9 +28,15 @@ import java.io.InputStream;
import java.net.HttpURLConnection;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLContext;
import okio.BufferedSource;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -38,8 +44,9 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public final class AsyncApiTest {
public final class CallTest {
private MockWebServer server = new MockWebServer();
private OkHttpClient client = new OkHttpClient();
private RecordingCallback callback = new RecordingCallback();
@@ -58,17 +65,98 @@ public final class AsyncApiTest {
cache.delete();
}
@Test public void illegalToExecuteTwice() throws Exception {
server.enqueue(new MockResponse()
.setBody("abc")
.addHeader("Content-Type: text/plain"));
server.play();
Request request = new Request.Builder()
.url(server.getUrl("/"))
.header("User-Agent", "SyncApiTest")
.build();
Call call = client.newCall(request);
call.execute();
try {
call.execute();
fail();
} catch (IllegalStateException e){
assertEquals("Already Executed", e.getMessage());
}
try {
call.execute(callback);
fail();
} catch (IllegalStateException e){
assertEquals("Already Executed", e.getMessage());
}
assertTrue(server.takeRequest().getHeaders().contains("User-Agent: SyncApiTest"));
}
@Test public void illegalToExecuteTwice_Async() throws Exception {
server.enqueue(new MockResponse()
.setBody("abc")
.addHeader("Content-Type: text/plain"));
server.play();
Request request = new Request.Builder()
.url(server.getUrl("/"))
.header("User-Agent", "SyncApiTest")
.build();
Call call = client.newCall(request);
call.execute(callback);
try {
call.execute();
fail();
} catch (IllegalStateException e){
assertEquals("Already Executed", e.getMessage());
}
try {
call.execute(callback);
fail();
} catch (IllegalStateException e){
assertEquals("Already Executed", e.getMessage());
}
assertTrue(server.takeRequest().getHeaders().contains("User-Agent: SyncApiTest"));
}
@Test public void get() throws Exception {
server.enqueue(new MockResponse()
.setBody("abc")
.addHeader("Content-Type: text/plain"));
server.play();
Request request = new Request.Builder()
.url(server.getUrl("/"))
.header("User-Agent", "SyncApiTest")
.build();
executeSynchronously(request)
.assertCode(200)
.assertContainsHeaders("Content-Type: text/plain")
.assertBody("abc");
assertTrue(server.takeRequest().getHeaders().contains("User-Agent: SyncApiTest"));
}
@Test public void get_Async() throws Exception {
server.enqueue(new MockResponse()
.setBody("abc")
.addHeader("Content-Type: text/plain"));
server.play();
Request request = new Request.Builder()
.url(server.getUrl("/"))
.header("User-Agent", "AsyncApiTest")
.build();
client.call(request).execute(callback);
client.newCall(request).execute(callback);
callback.await(request.url())
.assertCode(200)
@@ -84,13 +172,33 @@ public final class AsyncApiTest {
server.enqueue(new MockResponse().setBody("ghi"));
server.play();
client.call(new Request.Builder().url(server.getUrl("/a")).build()).execute(callback);
executeSynchronously(new Request.Builder().url(server.getUrl("/a")).build())
.assertBody("abc");
executeSynchronously(new Request.Builder().url(server.getUrl("/b")).build())
.assertBody("def");
executeSynchronously(new Request.Builder().url(server.getUrl("/c")).build())
.assertBody("ghi");
assertEquals(0, server.takeRequest().getSequenceNumber());
assertEquals(1, server.takeRequest().getSequenceNumber());
assertEquals(2, server.takeRequest().getSequenceNumber());
}
@Test public void connectionPooling_Async() throws Exception {
server.enqueue(new MockResponse().setBody("abc"));
server.enqueue(new MockResponse().setBody("def"));
server.enqueue(new MockResponse().setBody("ghi"));
server.play();
client.newCall(new Request.Builder().url(server.getUrl("/a")).build()).execute(callback);
callback.await(server.getUrl("/a")).assertBody("abc");
client.call(new Request.Builder().url(server.getUrl("/b")).build()).execute(callback);
client.newCall(new Request.Builder().url(server.getUrl("/b")).build()).execute(callback);
callback.await(server.getUrl("/b")).assertBody("def");
client.call(new Request.Builder().url(server.getUrl("/c")).build()).execute(callback);
client.newCall(new Request.Builder().url(server.getUrl("/c")).build()).execute(callback);
callback.await(server.getUrl("/c")).assertBody("ghi");
assertEquals(0, server.takeRequest().getSequenceNumber());
@@ -98,6 +206,62 @@ public final class AsyncApiTest {
assertEquals(2, server.takeRequest().getSequenceNumber());
}
@Test public void connectionReuseWhenResponseBodyConsumed_Async() throws Exception {
server.enqueue(new MockResponse().setBody("abc"));
server.enqueue(new MockResponse().setBody("def"));
server.play();
Request request = new Request.Builder().url(server.getUrl("/a")).build();
client.newCall(request).execute(new Response.Callback() {
@Override public void onFailure(Failure failure) {
throw new AssertionError();
}
@Override public void onResponse(Response response) throws IOException {
InputStream bytes = response.body().byteStream();
assertEquals('a', bytes.read());
assertEquals('b', bytes.read());
assertEquals('c', bytes.read());
// This request will share a connection with 'A' cause it's all done.
client.newCall(new Request.Builder().url(server.getUrl("/b")).build()).execute(callback);
}
});
callback.await(server.getUrl("/b")).assertCode(200).assertBody("def");
assertEquals(0, server.takeRequest().getSequenceNumber()); // New connection.
assertEquals(1, server.takeRequest().getSequenceNumber()); // Connection reuse!
}
@Test public void timeoutsUpdatedOnReusedConnections() throws Exception {
server.enqueue(new MockResponse().setBody("abc"));
server.enqueue(new MockResponse().setBody("def").throttleBody(1, 750, TimeUnit.MILLISECONDS));
server.play();
// First request: time out after 1000ms.
client.setReadTimeout(1000, TimeUnit.MILLISECONDS);
executeSynchronously(new Request.Builder().url(server.getUrl("/a")).build()).assertBody("abc");
// Second request: time out after 250ms.
client.setReadTimeout(250, TimeUnit.MILLISECONDS);
Request request = new Request.Builder().url(server.getUrl("/b")).build();
Response response = client.newCall(request).execute();
BufferedSource bodySource = response.body().source();
assertEquals('d', bodySource.readByte());
// The second byte of this request will be delayed by 750ms so we should time out after 250ms.
long startNanos = System.nanoTime();
try {
bodySource.readByte();
fail();
} catch (IOException expected) {
// Timed out as expected.
long elapsedNanos = System.nanoTime() - startNanos;
long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
assertTrue(String.format("Timed out: %sms", elapsedMillis), elapsedMillis < 500);
}
}
@Test public void tls() throws Exception {
server.useHttps(sslContext.getSocketFactory(), false);
server.enqueue(new MockResponse()
@@ -108,10 +272,24 @@ public final class AsyncApiTest {
client.setSslSocketFactory(sslContext.getSocketFactory());
client.setHostnameVerifier(new RecordingHostnameVerifier());
executeSynchronously(new Request.Builder().url(server.getUrl("/")).build())
.assertHandshake();
}
@Test public void tls_Async() throws Exception {
server.useHttps(sslContext.getSocketFactory(), false);
server.enqueue(new MockResponse()
.setBody("abc")
.addHeader("Content-Type: text/plain"));
server.play();
client.setSslSocketFactory(sslContext.getSocketFactory());
client.setHostnameVerifier(new RecordingHostnameVerifier());
Request request = new Request.Builder()
.url(server.getUrl("/"))
.build();
client.call(request).execute(callback);
client.newCall(request).execute(callback);
callback.await(request.url()).assertHandshake();
}
@@ -125,14 +303,43 @@ public final class AsyncApiTest {
client.setSslSocketFactory(sslContext.getSocketFactory());
client.setHostnameVerifier(new RecordingHostnameVerifier());
executeSynchronously(new Request.Builder().url(server.getUrl("/")).build())
.assertBody("abc");
}
@Test public void recoverFromTlsHandshakeFailure_Async() throws Exception {
server.useHttps(sslContext.getSocketFactory(), false);
server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.FAIL_HANDSHAKE));
server.enqueue(new MockResponse().setBody("abc"));
server.play();
client.setSslSocketFactory(sslContext.getSocketFactory());
client.setHostnameVerifier(new RecordingHostnameVerifier());
Request request = new Request.Builder()
.url(server.getUrl("/"))
.build();
client.call(request).execute(callback);
client.newCall(request).execute(callback);
callback.await(request.url()).assertBody("abc");
}
@Test public void setFollowSslRedirectsFalse() throws Exception {
server.useHttps(sslContext.getSocketFactory(), false);
server.enqueue(new MockResponse()
.setResponseCode(301)
.addHeader("Location: http://square.com"));
server.play();
client.setFollowSslRedirects(false);
client.setSslSocketFactory(sslContext.getSocketFactory());
client.setHostnameVerifier(new RecordingHostnameVerifier());
Request request = new Request.Builder().url(server.getUrl("/")).build();
Response response = client.newCall(request).execute();
assertEquals(301, response.code());
}
@Test public void post() throws Exception {
server.enqueue(new MockResponse().setBody("abc"));
server.play();
@@ -141,7 +348,26 @@ public final class AsyncApiTest {
.url(server.getUrl("/"))
.post(Request.Body.create(MediaType.parse("text/plain"), "def"))
.build();
client.call(request).execute(callback);
executeSynchronously(request)
.assertCode(200)
.assertBody("abc");
RecordedRequest recordedRequest = server.takeRequest();
assertEquals("def", recordedRequest.getUtf8Body());
assertEquals("3", recordedRequest.getHeader("Content-Length"));
assertEquals("text/plain; charset=utf-8", recordedRequest.getHeader("Content-Type"));
}
@Test public void post_Async() throws Exception {
server.enqueue(new MockResponse().setBody("abc"));
server.play();
Request request = new Request.Builder()
.url(server.getUrl("/"))
.post(Request.Body.create(MediaType.parse("text/plain"), "def"))
.build();
client.newCall(request).execute(callback);
callback.await(request.url())
.assertCode(200)
@@ -153,6 +379,36 @@ public final class AsyncApiTest {
assertEquals("text/plain; charset=utf-8", recordedRequest.getHeader("Content-Type"));
}
@Test public void postBodyRetransmittedOnFailureRecovery() throws Exception {
server.enqueue(new MockResponse().setBody("abc"));
server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AFTER_REQUEST));
server.enqueue(new MockResponse().setBody("def"));
server.play();
// Seed the connection pool so we have something that can fail.
Request request1 = new Request.Builder().url(server.getUrl("/")).build();
Response response1 = client.newCall(request1).execute();
assertEquals("abc", response1.body().string());
Request request2 = new Request.Builder()
.url(server.getUrl("/"))
.post(Request.Body.create(MediaType.parse("text/plain"), "body!"))
.build();
Response response2 = client.newCall(request2).execute();
assertEquals("def", response2.body().string());
RecordedRequest get = server.takeRequest();
assertEquals(0, get.getSequenceNumber());
RecordedRequest post1 = server.takeRequest();
assertEquals("body!", post1.getUtf8Body());
assertEquals(1, post1.getSequenceNumber());
RecordedRequest post2 = server.takeRequest();
assertEquals("body!", post2.getUtf8Body());
assertEquals(0, post2.getSequenceNumber());
}
@Test public void conditionalCacheHit() throws Exception {
server.enqueue(new MockResponse().setBody("A").addHeader("ETag: v1"));
server.enqueue(new MockResponse()
@@ -162,17 +418,35 @@ public final class AsyncApiTest {
client.setOkResponseCache(cache);
executeSynchronously(new Request.Builder().url(server.getUrl("/")).build())
.assertCode(200).assertBody("A");
assertNull(server.takeRequest().getHeader("If-None-Match"));
executeSynchronously(new Request.Builder().url(server.getUrl("/")).build())
.assertCode(200).assertBody("A");
assertEquals("v1", server.takeRequest().getHeader("If-None-Match"));
}
@Test public void conditionalCacheHit_Async() throws Exception {
server.enqueue(new MockResponse().setBody("A").addHeader("ETag: v1"));
server.enqueue(new MockResponse()
.clearHeaders()
.setResponseCode(HttpURLConnection.HTTP_NOT_MODIFIED));
server.play();
client.setOkResponseCache(cache);
Request request1 = new Request.Builder()
.url(server.getUrl("/"))
.build();
client.call(request1).execute(callback);
client.newCall(request1).execute(callback);
callback.await(request1.url()).assertCode(200).assertBody("A");
assertNull(server.takeRequest().getHeader("If-None-Match"));
Request request2 = new Request.Builder()
.url(server.getUrl("/"))
.build();
client.call(request2).execute(callback);
client.newCall(request2).execute(callback);
callback.await(request2.url()).assertCode(200).assertBody("A");
assertEquals("v1", server.takeRequest().getHeader("If-None-Match"));
}
@@ -184,17 +458,33 @@ public final class AsyncApiTest {
client.setOkResponseCache(cache);
executeSynchronously(new Request.Builder().url(server.getUrl("/")).build())
.assertCode(200).assertBody("A");
assertNull(server.takeRequest().getHeader("If-None-Match"));
executeSynchronously(new Request.Builder().url(server.getUrl("/")).build())
.assertCode(200).assertBody("B");
assertEquals("v1", server.takeRequest().getHeader("If-None-Match"));
}
@Test public void conditionalCacheMiss_Async() throws Exception {
server.enqueue(new MockResponse().setBody("A").addHeader("ETag: v1"));
server.enqueue(new MockResponse().setBody("B"));
server.play();
client.setOkResponseCache(cache);
Request request1 = new Request.Builder()
.url(server.getUrl("/"))
.build();
client.call(request1).execute(callback);
client.newCall(request1).execute(callback);
callback.await(request1.url()).assertCode(200).assertBody("A");
assertNull(server.takeRequest().getHeader("If-None-Match"));
Request request2 = new Request.Builder()
.url(server.getUrl("/"))
.build();
client.call(request2).execute(callback);
client.newCall(request2).execute(callback);
callback.await(request2.url()).assertCode(200).assertBody("B");
assertEquals("v1", server.takeRequest().getHeader("If-None-Match"));
}
@@ -213,8 +503,37 @@ public final class AsyncApiTest {
server.enqueue(new MockResponse().setBody("C"));
server.play();
executeSynchronously(new Request.Builder().url(server.getUrl("/a")).build())
.assertCode(200)
.assertBody("C")
.redirectedBy()
.assertCode(302)
.assertContainsHeaders("Test: Redirect from /b to /c")
.redirectedBy()
.assertCode(301)
.assertContainsHeaders("Test: Redirect from /a to /b");
assertEquals(0, server.takeRequest().getSequenceNumber()); // New connection.
assertEquals(1, server.takeRequest().getSequenceNumber()); // Connection reused.
assertEquals(2, server.takeRequest().getSequenceNumber()); // Connection reused again!
}
@Test public void redirect_Async() throws Exception {
server.enqueue(new MockResponse()
.setResponseCode(301)
.addHeader("Location: /b")
.addHeader("Test", "Redirect from /a to /b")
.setBody("/a has moved!"));
server.enqueue(new MockResponse()
.setResponseCode(302)
.addHeader("Location: /c")
.addHeader("Test", "Redirect from /b to /c")
.setBody("/b has moved!"));
server.enqueue(new MockResponse().setBody("C"));
server.play();
Request request = new Request.Builder().url(server.getUrl("/a")).build();
client.call(request).execute(callback);
client.newCall(request).execute(callback);
callback.await(server.getUrl("/c"))
.assertCode(200)
@@ -241,8 +560,23 @@ public final class AsyncApiTest {
server.enqueue(new MockResponse().setBody("Success!"));
server.play();
executeSynchronously(new Request.Builder().url(server.getUrl("/0")).build())
.assertCode(200)
.assertBody("Success!");
}
@Test public void follow20Redirects_Async() throws Exception {
for (int i = 0; i < 20; i++) {
server.enqueue(new MockResponse()
.setResponseCode(301)
.addHeader("Location: /" + (i + 1))
.setBody("Redirecting to /" + (i + 1)));
}
server.enqueue(new MockResponse().setBody("Success!"));
server.play();
Request request = new Request.Builder().url(server.getUrl("/0")).build();
client.call(request).execute(callback);
client.newCall(request).execute(callback);
callback.await(server.getUrl("/20"))
.assertCode(200)
.assertBody("Success!");
@@ -257,19 +591,70 @@ public final class AsyncApiTest {
}
server.play();
try {
client.newCall(new Request.Builder().url(server.getUrl("/0")).build()).execute();
fail();
} catch (IOException e) {
assertEquals("Too many redirects: 21", e.getMessage());
}
}
@Test public void doesNotFollow21Redirects_Async() throws Exception {
for (int i = 0; i < 21; i++) {
server.enqueue(new MockResponse()
.setResponseCode(301)
.addHeader("Location: /" + (i + 1))
.setBody("Redirecting to /" + (i + 1)));
}
server.play();
Request request = new Request.Builder().url(server.getUrl("/0")).build();
client.call(request).execute(callback);
client.newCall(request).execute(callback);
callback.await(server.getUrl("/20")).assertFailure("Too many redirects: 21");
}
@Test public void canceledBeforeExecute() throws Exception {
server.play();
Call call = client.newCall(new Request.Builder().url(server.getUrl("/a")).build());
call.cancel();
assertNull(call.execute());
assertEquals(0, server.getRequestCount());
}
@Test public void cancelBeforeBodyIsRead() throws Exception {
server.enqueue(new MockResponse().setBody("def").throttleBody(1, 750, TimeUnit.MILLISECONDS));
server.play();
final Call call = client.newCall(new Request.Builder().url(server.getUrl("/a")).build());
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Response> result = executor.submit(new Callable<Response>() {
@Override public Response call() throws Exception {
return call.execute();
}
});
Thread.sleep(100); // wait for it to go in flight.
call.cancel();
try {
result.get().body().bytes();
fail();
} catch (IOException e) {
}
assertEquals(1, server.getRequestCount());
}
/**
* This test puts a request in front of one that is to be canceled, so that it is canceled
* before I/O takes place.
* This test puts a request in front of one that is to be canceled, so that it is canceled before
* I/O takes place.
*/
@Test public void canceledBeforeIOSignalsOnFailure() throws Exception {
client.getDispatcher().setMaxRequests(1); // Force requests to be executed serially.
server.setDispatcher(new Dispatcher() {
char nextResponse = 'A';
@Override public MockResponse dispatch(RecordedRequest request) {
client.cancel("request B");
return new MockResponse().setBody(Character.toString(nextResponse++));
@@ -278,11 +663,11 @@ public final class AsyncApiTest {
server.play();
Request requestA = new Request.Builder().url(server.getUrl("/a")).tag("request A").build();
client.call(requestA).execute(callback);
client.newCall(requestA).execute(callback);
assertEquals("/a", server.takeRequest().getPath());
Request requestB = new Request.Builder().url(server.getUrl("/b")).tag("request B").build();
client.call(requestB).execute(callback);
client.newCall(requestB).execute(callback);
assertEquals("/b", server.takeRequest().getPath());
callback.await(requestA.url()).assertBody("A");
@@ -300,7 +685,6 @@ public final class AsyncApiTest {
canceledBeforeIOSignalsOnFailure();
}
@Test public void canceledBeforeResponseReadSignalsOnFailure() throws Exception {
server.setDispatcher(new Dispatcher() {
@Override public MockResponse dispatch(RecordedRequest request) {
@@ -311,7 +695,7 @@ public final class AsyncApiTest {
server.play();
Request requestA = new Request.Builder().url(server.getUrl("/a")).tag("request A").build();
client.call(requestA).execute(callback);
client.newCall(requestA).execute(callback);
assertEquals("/a", server.takeRequest().getPath());
callback.await(requestA.url()).assertFailure("Canceled");
@@ -340,7 +724,7 @@ public final class AsyncApiTest {
final AtomicReference<Failure> failureRef = new AtomicReference<Failure>();
Request request = new Request.Builder().url(server.getUrl("/a")).tag("request A").build();
final Call call = client.call(request);
final Call call = client.newCall(request);
call.execute(new Response.Callback() {
@Override public void onFailure(Failure failure) {
latch.countDown();
@@ -377,30 +761,9 @@ public final class AsyncApiTest {
canceledAfterResponseIsDeliveredBreaksStreamButSignalsOnce();
}
@Test public void connectionReuseWhenResponseBodyConsumed() throws Exception {
server.enqueue(new MockResponse().setBody("abc"));
server.enqueue(new MockResponse().setBody("def"));
server.play();
Request request = new Request.Builder().url(server.getUrl("/a")).build();
client.call(request).execute(new Response.Callback() {
@Override public void onFailure(Failure failure) {
throw new AssertionError();
}
@Override public void onResponse(Response response) throws IOException {
InputStream bytes = response.body().byteStream();
assertEquals('a', bytes.read());
assertEquals('b', bytes.read());
assertEquals('c', bytes.read());
// This request will share a connection with 'A' cause it's all done.
client.call(new Request.Builder().url(server.getUrl("/b")).build()).execute(callback);
}
});
callback.await(server.getUrl("/b")).assertCode(200).assertBody("def");
assertEquals(0, server.takeRequest().getSequenceNumber()); // New connection.
assertEquals(1, server.takeRequest().getSequenceNumber()); // Connection reuse!
private RecordedResponse executeSynchronously(Request request) throws IOException {
Response response = client.newCall(request).execute();
return new RecordedResponse(request, response, response.body().string(), null);
}
/**

View File

@@ -1,5 +1,6 @@
package com.squareup.okhttp;
import com.squareup.okhttp.Call.AsyncCall;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
@@ -40,53 +41,53 @@ public final class DispatcherTest {
}
@Test public void enqueuedJobsRunImmediately() throws Exception {
client.call(newRequest("http://a/1")).execute(callback);
client.newCall(newRequest("http://a/1")).execute(callback);
executor.assertJobs("http://a/1");
}
@Test public void maxRequestsEnforced() throws Exception {
dispatcher.setMaxRequests(3);
client.call(newRequest("http://a/1")).execute(callback);
client.call(newRequest("http://a/2")).execute(callback);
client.call(newRequest("http://b/1")).execute(callback);
client.call(newRequest("http://b/2")).execute(callback);
client.newCall(newRequest("http://a/1")).execute(callback);
client.newCall(newRequest("http://a/2")).execute(callback);
client.newCall(newRequest("http://b/1")).execute(callback);
client.newCall(newRequest("http://b/2")).execute(callback);
executor.assertJobs("http://a/1", "http://a/2", "http://b/1");
}
@Test public void maxPerHostEnforced() throws Exception {
dispatcher.setMaxRequestsPerHost(2);
client.call(newRequest("http://a/1")).execute(callback);
client.call(newRequest("http://a/2")).execute(callback);
client.call(newRequest("http://a/3")).execute(callback);
client.newCall(newRequest("http://a/1")).execute(callback);
client.newCall(newRequest("http://a/2")).execute(callback);
client.newCall(newRequest("http://a/3")).execute(callback);
executor.assertJobs("http://a/1", "http://a/2");
}
@Test public void increasingMaxRequestsPromotesJobsImmediately() throws Exception {
dispatcher.setMaxRequests(2);
client.call(newRequest("http://a/1")).execute(callback);
client.call(newRequest("http://b/1")).execute(callback);
client.call(newRequest("http://c/1")).execute(callback);
client.call(newRequest("http://a/2")).execute(callback);
client.call(newRequest("http://b/2")).execute(callback);
client.newCall(newRequest("http://a/1")).execute(callback);
client.newCall(newRequest("http://b/1")).execute(callback);
client.newCall(newRequest("http://c/1")).execute(callback);
client.newCall(newRequest("http://a/2")).execute(callback);
client.newCall(newRequest("http://b/2")).execute(callback);
dispatcher.setMaxRequests(4);
executor.assertJobs("http://a/1", "http://b/1", "http://c/1", "http://a/2");
}
@Test public void increasingMaxPerHostPromotesJobsImmediately() throws Exception {
dispatcher.setMaxRequestsPerHost(2);
client.call(newRequest("http://a/1")).execute(callback);
client.call(newRequest("http://a/2")).execute(callback);
client.call(newRequest("http://a/3")).execute(callback);
client.call(newRequest("http://a/4")).execute(callback);
client.call(newRequest("http://a/5")).execute(callback);
client.newCall(newRequest("http://a/1")).execute(callback);
client.newCall(newRequest("http://a/2")).execute(callback);
client.newCall(newRequest("http://a/3")).execute(callback);
client.newCall(newRequest("http://a/4")).execute(callback);
client.newCall(newRequest("http://a/5")).execute(callback);
dispatcher.setMaxRequestsPerHost(4);
executor.assertJobs("http://a/1", "http://a/2", "http://a/3", "http://a/4");
}
@Test public void oldJobFinishesNewJobCanRunDifferentHost() throws Exception {
dispatcher.setMaxRequests(1);
client.call(newRequest("http://a/1")).execute(callback);
client.call(newRequest("http://b/1")).execute(callback);
client.newCall(newRequest("http://a/1")).execute(callback);
client.newCall(newRequest("http://b/1")).execute(callback);
executor.finishJob("http://a/1");
executor.assertJobs("http://b/1");
}
@@ -94,27 +95,27 @@ public final class DispatcherTest {
@Test public void oldJobFinishesNewJobWithSameHostStarts() throws Exception {
dispatcher.setMaxRequests(2);
dispatcher.setMaxRequestsPerHost(1);
client.call(newRequest("http://a/1")).execute(callback);
client.call(newRequest("http://b/1")).execute(callback);
client.call(newRequest("http://b/2")).execute(callback);
client.call(newRequest("http://a/2")).execute(callback);
client.newCall(newRequest("http://a/1")).execute(callback);
client.newCall(newRequest("http://b/1")).execute(callback);
client.newCall(newRequest("http://b/2")).execute(callback);
client.newCall(newRequest("http://a/2")).execute(callback);
executor.finishJob("http://a/1");
executor.assertJobs("http://b/1", "http://a/2");
}
@Test public void oldJobFinishesNewJobCantRunDueToHostLimit() throws Exception {
dispatcher.setMaxRequestsPerHost(1);
client.call(newRequest("http://a/1")).execute(callback);
client.call(newRequest("http://b/1")).execute(callback);
client.call(newRequest("http://a/2")).execute(callback);
client.newCall(newRequest("http://a/1")).execute(callback);
client.newCall(newRequest("http://b/1")).execute(callback);
client.newCall(newRequest("http://a/2")).execute(callback);
executor.finishJob("http://b/1");
executor.assertJobs("http://a/1");
}
@Test public void cancelingReadyJobPreventsItFromStarting() throws Exception {
dispatcher.setMaxRequestsPerHost(1);
client.call(newRequest("http://a/1")).execute(callback);
client.call(newRequest("http://a/2", "tag1")).execute(callback);
client.newCall(newRequest("http://a/1")).execute(callback);
client.newCall(newRequest("http://a/2", "tag1")).execute(callback);
dispatcher.cancel("tag1");
executor.finishJob("http://a/1");
executor.assertJobs();
@@ -122,8 +123,8 @@ public final class DispatcherTest {
@Test public void cancelingRunningJobTakesNoEffectUntilJobFinishes() throws Exception {
dispatcher.setMaxRequests(1);
client.call(newRequest("http://a/1", "tag1")).execute(callback);
client.call(newRequest("http://a/2")).execute(callback);
client.newCall(newRequest("http://a/1", "tag1")).execute(callback);
client.newCall(newRequest("http://a/2")).execute(callback);
dispatcher.cancel("tag1");
executor.assertJobs("http://a/1");
executor.finishJob("http://a/1");
@@ -131,26 +132,26 @@ public final class DispatcherTest {
}
class RecordingExecutor extends AbstractExecutorService {
private List<Job> jobs = new ArrayList<Job>();
private List<AsyncCall> calls = new ArrayList<AsyncCall>();
@Override public void execute(Runnable command) {
jobs.add((Job) command);
calls.add((AsyncCall) command);
}
public void assertJobs(String... expectedUrls) {
List<String> actualUrls = new ArrayList<String>();
for (Job job : jobs) {
actualUrls.add(job.request().urlString());
for (AsyncCall call : calls) {
actualUrls.add(call.request().urlString());
}
assertEquals(Arrays.asList(expectedUrls), actualUrls);
}
public void finishJob(String url) {
for (Iterator<Job> i = jobs.iterator(); i.hasNext(); ) {
Job job = i.next();
if (job.request().urlString().equals(url)) {
for (Iterator<AsyncCall> i = calls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (call.request().urlString().equals(url)) {
i.remove();
dispatcher.finished(job);
dispatcher.finished(call);
return;
}
}

View File

@@ -1,317 +0,0 @@
/*
* Copyright (C) 2014 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.squareup.okhttp;
import com.squareup.okhttp.internal.RecordingHostnameVerifier;
import com.squareup.okhttp.internal.SslContextBuilder;
import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.MockWebServer;
import com.squareup.okhttp.mockwebserver.RecordedRequest;
import com.squareup.okhttp.mockwebserver.SocketPolicy;
import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import okio.BufferedSource;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public final class SyncApiTest {
private MockWebServer server = new MockWebServer();
private OkHttpClient client = new OkHttpClient();
private static final SSLContext sslContext = SslContextBuilder.localhost();
private HttpResponseCache cache;
@Before public void setUp() throws Exception {
String tmp = System.getProperty("java.io.tmpdir");
File cacheDir = new File(tmp, "HttpCache-" + UUID.randomUUID());
cache = new HttpResponseCache(cacheDir, Integer.MAX_VALUE);
}
@After public void tearDown() throws Exception {
server.shutdown();
cache.delete();
}
@Test public void get() throws Exception {
server.enqueue(new MockResponse()
.setBody("abc")
.addHeader("Content-Type: text/plain"));
server.play();
Request request = new Request.Builder()
.url(server.getUrl("/"))
.header("User-Agent", "SyncApiTest")
.build();
onSuccess(request)
.assertCode(200)
.assertContainsHeaders("Content-Type: text/plain")
.assertBody("abc");
assertTrue(server.takeRequest().getHeaders().contains("User-Agent: SyncApiTest"));
}
@Test public void connectionPooling() throws Exception {
server.enqueue(new MockResponse().setBody("abc"));
server.enqueue(new MockResponse().setBody("def"));
server.enqueue(new MockResponse().setBody("ghi"));
server.play();
onSuccess(new Request.Builder().url(server.getUrl("/a")).build())
.assertBody("abc");
onSuccess(new Request.Builder().url(server.getUrl("/b")).build())
.assertBody("def");
onSuccess(new Request.Builder().url(server.getUrl("/c")).build())
.assertBody("ghi");
assertEquals(0, server.takeRequest().getSequenceNumber());
assertEquals(1, server.takeRequest().getSequenceNumber());
assertEquals(2, server.takeRequest().getSequenceNumber());
}
@Test public void timeoutsUpdatedOnReusedConnections() throws Exception {
server.enqueue(new MockResponse().setBody("abc"));
server.enqueue(new MockResponse().setBody("def").throttleBody(1, 750, TimeUnit.MILLISECONDS));
server.play();
// First request: time out after 1000ms.
client.setReadTimeout(1000, TimeUnit.MILLISECONDS);
onSuccess(new Request.Builder().url(server.getUrl("/a")).build()).assertBody("abc");
// Second request: time out after 250ms.
client.setReadTimeout(250, TimeUnit.MILLISECONDS);
Request request = new Request.Builder().url(server.getUrl("/b")).build();
Response response = client.execute(request);
BufferedSource bodySource = response.body().source();
assertEquals('d', bodySource.readByte());
// The second byte of this request will be delayed by 750ms so we should time out after 250ms.
long startNanos = System.nanoTime();
try {
bodySource.readByte();
fail();
} catch (IOException expected) {
// Timed out as expected.
long elapsedNanos = System.nanoTime() - startNanos;
long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
assertTrue(String.format("Timed out: %sms", elapsedMillis), elapsedMillis < 500);
}
}
@Test public void tls() throws Exception {
server.useHttps(sslContext.getSocketFactory(), false);
server.enqueue(new MockResponse()
.setBody("abc")
.addHeader("Content-Type: text/plain"));
server.play();
client.setSslSocketFactory(sslContext.getSocketFactory());
client.setHostnameVerifier(new RecordingHostnameVerifier());
onSuccess(new Request.Builder().url(server.getUrl("/")).build())
.assertHandshake();
}
@Test public void recoverFromTlsHandshakeFailure() throws Exception {
server.useHttps(sslContext.getSocketFactory(), false);
server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.FAIL_HANDSHAKE));
server.enqueue(new MockResponse().setBody("abc"));
server.play();
client.setSslSocketFactory(sslContext.getSocketFactory());
client.setHostnameVerifier(new RecordingHostnameVerifier());
onSuccess(new Request.Builder().url(server.getUrl("/")).build())
.assertBody("abc");
}
@Test public void setFollowSslRedirectsFalse() throws Exception {
server.useHttps(sslContext.getSocketFactory(), false);
server.enqueue(new MockResponse()
.setResponseCode(301)
.addHeader("Location: http://square.com"));
server.play();
client.setFollowSslRedirects(false);
client.setSslSocketFactory(sslContext.getSocketFactory());
client.setHostnameVerifier(new RecordingHostnameVerifier());
Request request = new Request.Builder().url(server.getUrl("/")).build();
Response response = client.execute(request);
assertEquals(301, response.code());
}
@Test public void post() throws Exception {
server.enqueue(new MockResponse().setBody("abc"));
server.play();
Request request = new Request.Builder()
.url(server.getUrl("/"))
.post(Request.Body.create(MediaType.parse("text/plain"), "def"))
.build();
onSuccess(request)
.assertCode(200)
.assertBody("abc");
RecordedRequest recordedRequest = server.takeRequest();
assertEquals("def", recordedRequest.getUtf8Body());
assertEquals("3", recordedRequest.getHeader("Content-Length"));
assertEquals("text/plain; charset=utf-8", recordedRequest.getHeader("Content-Type"));
}
@Test public void conditionalCacheHit() throws Exception {
server.enqueue(new MockResponse().setBody("A").addHeader("ETag: v1"));
server.enqueue(new MockResponse()
.clearHeaders()
.setResponseCode(HttpURLConnection.HTTP_NOT_MODIFIED));
server.play();
client.setOkResponseCache(cache);
onSuccess(new Request.Builder().url(server.getUrl("/")).build())
.assertCode(200).assertBody("A");
assertNull(server.takeRequest().getHeader("If-None-Match"));
onSuccess(new Request.Builder().url(server.getUrl("/")).build())
.assertCode(200).assertBody("A");
assertEquals("v1", server.takeRequest().getHeader("If-None-Match"));
}
@Test public void conditionalCacheMiss() throws Exception {
server.enqueue(new MockResponse().setBody("A").addHeader("ETag: v1"));
server.enqueue(new MockResponse().setBody("B"));
server.play();
client.setOkResponseCache(cache);
onSuccess(new Request.Builder().url(server.getUrl("/")).build())
.assertCode(200).assertBody("A");
assertNull(server.takeRequest().getHeader("If-None-Match"));
onSuccess(new Request.Builder().url(server.getUrl("/")).build())
.assertCode(200).assertBody("B");
assertEquals("v1", server.takeRequest().getHeader("If-None-Match"));
}
@Test public void redirect() throws Exception {
server.enqueue(new MockResponse()
.setResponseCode(301)
.addHeader("Location: /b")
.addHeader("Test", "Redirect from /a to /b")
.setBody("/a has moved!"));
server.enqueue(new MockResponse()
.setResponseCode(302)
.addHeader("Location: /c")
.addHeader("Test", "Redirect from /b to /c")
.setBody("/b has moved!"));
server.enqueue(new MockResponse().setBody("C"));
server.play();
onSuccess(new Request.Builder().url(server.getUrl("/a")).build())
.assertCode(200)
.assertBody("C")
.redirectedBy()
.assertCode(302)
.assertContainsHeaders("Test: Redirect from /b to /c")
.redirectedBy()
.assertCode(301)
.assertContainsHeaders("Test: Redirect from /a to /b");
assertEquals(0, server.takeRequest().getSequenceNumber()); // New connection.
assertEquals(1, server.takeRequest().getSequenceNumber()); // Connection reused.
assertEquals(2, server.takeRequest().getSequenceNumber()); // Connection reused again!
}
@Test public void follow20Redirects() throws Exception {
for (int i = 0; i < 20; i++) {
server.enqueue(new MockResponse()
.setResponseCode(301)
.addHeader("Location: /" + (i + 1))
.setBody("Redirecting to /" + (i + 1)));
}
server.enqueue(new MockResponse().setBody("Success!"));
server.play();
onSuccess(new Request.Builder().url(server.getUrl("/0")).build())
.assertCode(200)
.assertBody("Success!");
}
@Test public void doesNotFollow21Redirects() throws Exception {
for (int i = 0; i < 21; i++) {
server.enqueue(new MockResponse()
.setResponseCode(301)
.addHeader("Location: /" + (i + 1))
.setBody("Redirecting to /" + (i + 1)));
}
server.play();
try {
client.execute(new Request.Builder().url(server.getUrl("/0")).build());
fail();
} catch (IOException e) {
assertEquals("Too many redirects: 21", e.getMessage());
}
}
@Test public void postBodyRetransmittedOnFailureRecovery() throws Exception {
server.enqueue(new MockResponse().setBody("abc"));
server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AFTER_REQUEST));
server.enqueue(new MockResponse().setBody("def"));
server.play();
// Seed the connection pool so we have something that can fail.
Request request1 = new Request.Builder().url(server.getUrl("/")).build();
Response response1 = client.execute(request1);
assertEquals("abc", response1.body().string());
Request request2 = new Request.Builder()
.url(server.getUrl("/"))
.post(Request.Body.create(MediaType.parse("text/plain"), "body!"))
.build();
Response response2 = client.execute(request2);
assertEquals("def", response2.body().string());
RecordedRequest get = server.takeRequest();
assertEquals(0, get.getSequenceNumber());
RecordedRequest post1 = server.takeRequest();
assertEquals("body!", post1.getUtf8Body());
assertEquals(1, post1.getSequenceNumber());
RecordedRequest post2 = server.takeRequest();
assertEquals("body!", post2.getUtf8Body());
assertEquals(0, post2.getSequenceNumber());
}
private RecordedResponse onSuccess(Request request) throws IOException {
Response response = client.execute(request);
return new RecordedResponse(request, response, response.body().string(), null);
}
}

View File

@@ -15,42 +15,258 @@
*/
package com.squareup.okhttp;
import com.squareup.okhttp.internal.NamedRunnable;
import com.squareup.okhttp.internal.http.HttpEngine;
import com.squareup.okhttp.internal.http.OkHeaders;
import java.io.IOException;
import java.net.ProtocolException;
import java.util.concurrent.CancellationException;
import okio.BufferedSink;
import okio.BufferedSource;
import static com.squareup.okhttp.internal.http.HttpEngine.MAX_REDIRECTS;
/**
* A call is an asynchronous {@code request} that has been prepared for
* execution. Once executed, a call can be cancelled. As this object represents
* a single request/response pair (or stream), it cannot be executed twice.
* A call is a request that has been prepared for execution. A call can be
* canceled. As this object represents a single request/response pair (stream),
* it cannot be executed twice.
*/
public final class Call {
private final OkHttpClient client;
private final Dispatcher dispatcher;
private final Request request;
private int redirectionCount;
public Call(OkHttpClient client, Dispatcher dispatcher,
Request request) {
// Guarded by this.
private boolean executed;
volatile boolean canceled;
/** The request; possibly a consequence of redirects or auth headers. */
private Request request;
HttpEngine engine;
Call(OkHttpClient client, Dispatcher dispatcher, Request request) {
this.client = client;
this.dispatcher = dispatcher;
this.request = request;
}
/**
* Schedules the {@code request} to be executed at some point in the future.
* The {@link OkHttpClient#getDispatcher dispatcher} defines when the request
* will run: usually immediately unless there are several other requests
* currently being executed.
* Invokes the request immediately, and blocks until the response can be
* processed or is in error.
*
* <p>The caller may read the response body with the response's
* {@link Response#body} method. To facilitate connection recycling, callers
* should always {@link Response.Body#close() close the response body}.
*
* <p>Note that transport-layer success (receiving a HTTP response code,
* headers and body) does not necessarily indicate application-layer success:
* {@code response} may still indicate an unhappy HTTP response code like 404
* or 500.
*
* @return null if the call was canceled.
*
* @throws IOException if the request could not be executed due to a
* connectivity problem or timeout. Because networks can fail during an
* exchange, it is possible that the remote server accepted the request
* before the failure.
*
* @throws IllegalStateException when the call has already been executed.
*/
public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
Response result = getResponse(); // Since we don't cancel, this won't be null.
engine.releaseConnection(); // Transfer ownership of the body to the caller.
return result;
}
/**
* Schedules the request to be executed at some point in the future.
*
* <p>The {@link OkHttpClient#getDispatcher dispatcher} defines when the
* request will run: usually immediately unless there are several other
* requests currently being executed.
*
* <p>This client will later call back {@code responseCallback} with either
* an HTTP response or a failure exception. If you {@link #cancel} a request
* before it completes the callback will not be invoked.
*
* @throws IllegalStateException when the call has already been executed.
*/
public void execute(Response.Callback responseCallback) {
dispatcher.enqueue(client, request, responseCallback);
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
dispatcher.enqueue(new AsyncCall(responseCallback));
}
/**
* Cancels the request, if possible. Requests that are already complete cannot
* be canceled.
* Cancels the request, if possible. Requests that are already complete
* cannot be canceled.
*/
public void cancel() {
dispatcher.cancel(request.tag());
canceled = true;
if (engine != null) engine.disconnect();
}
final class AsyncCall extends NamedRunnable {
private final Response.Callback responseCallback;
private AsyncCall(Response.Callback responseCallback) {
super("OkHttp %s", request.urlString());
this.responseCallback = responseCallback;
}
String host() {
return request.url().getHost();
}
Request request() {
return request;
}
Object tag() {
return request.tag();
}
Call get() {
return Call.this;
}
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponse();
if (canceled) {
signalledCallback = true;
responseCallback.onFailure(new Failure.Builder()
.request(request)
.exception(new CancellationException("Canceled"))
.build());
} else {
signalledCallback = true;
responseCallback.onResponse(response);
}
} catch (IOException e) {
if (signalledCallback) return; // Do not signal the callback twice!
responseCallback.onFailure(new Failure.Builder()
.request(request)
.exception(e)
.build());
} finally {
engine.close(); // Close the connection if it isn't already.
dispatcher.finished(this);
}
}
}
/**
* Performs the request and returns the response. May return null if this
* call was canceled.
*/
private Response getResponse() throws IOException {
Response redirectedBy = null;
// Copy body metadata to the appropriate request headers.
Request.Body body = request.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType == null) throw new IllegalStateException("contentType == null");
Request.Builder requestBuilder = request.newBuilder();
requestBuilder.header("Content-Type", contentType.toString());
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
request = requestBuilder.build();
}
// Create the initial HTTP engine. Retries and redirects need new engine for each attempt.
engine = new HttpEngine(client, request, false, null, null, null);
while (true) {
if (canceled) return null;
try {
engine.sendRequest();
if (body != null) {
BufferedSink sink = engine.getBufferedRequestBody();
body.writeTo(sink);
sink.flush();
}
engine.readResponse();
} catch (IOException e) {
HttpEngine retryEngine = engine.recover(e, null);
if (retryEngine != null) {
engine = retryEngine;
continue;
}
// Give up; recovery is not possible.
throw e;
}
Response response = engine.getResponse();
Request followUp = engine.followUpRequest();
if (followUp == null) {
engine.releaseConnection();
return response.newBuilder()
.body(new RealResponseBody(response, engine.getResponseBody()))
.redirectedBy(redirectedBy)
.build();
}
if (engine.getResponse().isRedirect() && ++redirectionCount > MAX_REDIRECTS) {
throw new ProtocolException("Too many redirects: " + redirectionCount);
}
// TODO: drop from POST to GET when redirected? HttpURLConnection does.
// TODO: confirm that Cookies are not retained across hosts.
if (!engine.sameConnection(followUp)) {
engine.releaseConnection();
}
Connection connection = engine.close();
redirectedBy = response.newBuilder().redirectedBy(redirectedBy).build(); // Chained.
request = followUp;
engine = new HttpEngine(client, request, false, connection, null, null);
}
}
private static class RealResponseBody extends Response.Body {
private final Response response;
private final BufferedSource source;
RealResponseBody(Response response, BufferedSource source) {
this.response = response;
this.source = source;
}
@Override public MediaType contentType() {
String contentType = response.header("Content-Type");
return contentType != null ? MediaType.parse(contentType) : null;
}
@Override public long contentLength() {
return OkHeaders.contentLength(response);
}
@Override public BufferedSource source() {
return source;
}
}
}

View File

@@ -15,6 +15,7 @@
*/
package com.squareup.okhttp;
import com.squareup.okhttp.Call.AsyncCall;
import com.squareup.okhttp.internal.Util;
import com.squareup.okhttp.internal.http.HttpEngine;
import java.util.ArrayDeque;
@@ -28,22 +29,22 @@ import java.util.concurrent.TimeUnit;
/**
* Policy on when async requests are executed.
*
* <p>Each dispatcher uses an {@link ExecutorService} to run jobs internally. If you
* <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you
* supply your own executor, it should be able to run {@linkplain #getMaxRequests the
* configured maximum} number of jobs concurrently.
* configured maximum} number of calls concurrently.
*/
public final class Dispatcher {
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
/** Executes jobs. Created lazily. */
/** Executes calls. Created lazily. */
private ExecutorService executorService;
/** Ready jobs in the order they'll be run. */
private final Deque<Job> readyJobs = new ArrayDeque<Job>();
/** Ready calls in the order they'll be run. */
private final Deque<AsyncCall> readyCalls = new ArrayDeque<AsyncCall>();
/** Running jobs. Includes canceled jobs that haven't finished yet. */
private final Deque<Job> runningJobs = new ArrayDeque<Job>();
/** Running calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningCalls = new ArrayDeque<AsyncCall>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
@@ -62,7 +63,7 @@ public final class Dispatcher {
/**
* Set the maximum number of requests to execute concurrently. Above this
* requests queue in memory, waiting for the running jobs to complete.
* requests queue in memory, waiting for the running calls to complete.
*
* <p>If more than {@code maxRequests} requests are in flight when this is
* invoked, those requests will remain in flight.
@@ -72,7 +73,7 @@ public final class Dispatcher {
throw new IllegalArgumentException("max < 1: " + maxRequests);
}
this.maxRequests = maxRequests;
promoteJobs();
promoteCalls();
}
public synchronized int getMaxRequests() {
@@ -93,70 +94,65 @@ public final class Dispatcher {
throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
}
this.maxRequestsPerHost = maxRequestsPerHost;
promoteJobs();
promoteCalls();
}
public synchronized int getMaxRequestsPerHost() {
return maxRequestsPerHost;
}
synchronized void enqueue(OkHttpClient client, Request request, Response.Callback callback) {
// Copy the client. Otherwise changes (socket factory, redirect policy,
// etc.) may incorrectly be reflected in the request when it is executed.
client = client.copyWithDefaults();
Job job = new Job(this, client, request, callback);
if (runningJobs.size() < maxRequests && runningJobsForHost(job) < maxRequestsPerHost) {
runningJobs.add(job);
getExecutorService().execute(job);
synchronized void enqueue(AsyncCall call) {
if (runningCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningCalls.add(call);
getExecutorService().execute(call);
} else {
readyJobs.add(job);
readyCalls.add(call);
}
}
/** Cancel all jobs with the tag {@code tag}. */
/** Cancel all calls with the tag {@code tag}. */
public synchronized void cancel(Object tag) {
for (Iterator<Job> i = readyJobs.iterator(); i.hasNext(); ) {
for (Iterator<AsyncCall> i = readyCalls.iterator(); i.hasNext(); ) {
if (Util.equal(tag, i.next().tag())) i.remove();
}
for (Job job : runningJobs) {
if (Util.equal(tag, job.tag())) {
job.canceled = true;
HttpEngine engine = job.engine;
for (AsyncCall call : runningCalls) {
if (Util.equal(tag, call.tag())) {
call.get().canceled = true;
HttpEngine engine = call.get().engine;
if (engine != null) engine.disconnect();
}
}
}
/** Used by {@code Job#run} to signal completion. */
synchronized void finished(Job job) {
if (!runningJobs.remove(job)) throw new AssertionError("Job wasn't running!");
promoteJobs();
/** Used by {@code AsyncCall#run} to signal completion. */
synchronized void finished(AsyncCall call) {
if (!runningCalls.remove(call)) throw new AssertionError("AsyncCall wasn't running!");
promoteCalls();
}
private void promoteJobs() {
if (runningJobs.size() >= maxRequests) return; // Already running max capacity.
if (readyJobs.isEmpty()) return; // No ready jobs to promote.
private void promoteCalls() {
if (runningCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<Job> i = readyJobs.iterator(); i.hasNext(); ) {
Job job = i.next();
for (Iterator<AsyncCall> i = readyCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningJobsForHost(job) < maxRequestsPerHost) {
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningJobs.add(job);
getExecutorService().execute(job);
runningCalls.add(call);
getExecutorService().execute(call);
}
if (runningJobs.size() >= maxRequests) return; // Reached max capacity.
if (runningCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
/** Returns the number of running jobs that share a host with {@code job}. */
private int runningJobsForHost(Job job) {
/** Returns the number of running calls that share a host with {@code call}. */
private int runningCallsForHost(AsyncCall call) {
int result = 0;
for (Job j : runningJobs) {
if (j.host().equals(job.host())) result++;
for (AsyncCall c : runningCalls) {
if (c.host().equals(call.host())) result++;
}
return result;
}

View File

@@ -1,194 +0,0 @@
/*
* Copyright (C) 2013 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.squareup.okhttp;
import com.squareup.okhttp.internal.NamedRunnable;
import com.squareup.okhttp.internal.http.HttpEngine;
import com.squareup.okhttp.internal.http.OkHeaders;
import java.io.IOException;
import java.net.ProtocolException;
import java.util.concurrent.CancellationException;
import okio.BufferedSink;
import okio.BufferedSource;
import static com.squareup.okhttp.internal.http.HttpEngine.MAX_REDIRECTS;
final class Job extends NamedRunnable {
private final Dispatcher dispatcher;
private final OkHttpClient client;
private final Response.Callback responseCallback;
private int redirectionCount;
volatile boolean canceled;
/** The request; possibly a consequence of redirects or auth headers. */
private Request request;
HttpEngine engine;
public Job(Dispatcher dispatcher, OkHttpClient client, Request request,
Response.Callback responseCallback) {
super("OkHttp %s", request.urlString());
this.dispatcher = dispatcher;
this.client = client;
this.request = request;
this.responseCallback = responseCallback;
}
String host() {
return request.url().getHost();
}
Request request() {
return request;
}
Object tag() {
return request.tag();
}
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponse();
if (canceled) {
signalledCallback = true;
responseCallback.onFailure(new Failure.Builder()
.request(request)
.exception(new CancellationException("Canceled"))
.build());
} else {
signalledCallback = true;
responseCallback.onResponse(response);
}
} catch (IOException e) {
if (signalledCallback) return; // Do not signal the callback twice!
responseCallback.onFailure(new Failure.Builder()
.request(request)
.exception(e)
.build());
} finally {
engine.close(); // Close the connection if it isn't already.
dispatcher.finished(this);
}
}
/**
* Performs the request and returns the response. May return null if this job
* was canceled.
*/
Response getResponse() throws IOException {
Response redirectedBy = null;
// Copy body metadata to the appropriate request headers.
Request.Body body = request.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType == null) throw new IllegalStateException("contentType == null");
Request.Builder requestBuilder = request.newBuilder();
requestBuilder.header("Content-Type", contentType.toString());
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
request = requestBuilder.build();
}
// Create the initial HTTP engine. Retries and redirects need new engine for each attempt.
engine = new HttpEngine(client, request, false, null, null, null);
while (true) {
if (canceled) return null;
try {
engine.sendRequest();
if (body != null) {
BufferedSink sink = engine.getBufferedRequestBody();
body.writeTo(sink);
sink.flush();
}
engine.readResponse();
} catch (IOException e) {
HttpEngine retryEngine = engine.recover(e, null);
if (retryEngine != null) {
engine = retryEngine;
continue;
}
// Give up; recovery is not possible.
throw e;
}
Response response = engine.getResponse();
Request followUp = engine.followUpRequest();
if (followUp == null) {
engine.releaseConnection();
return response.newBuilder()
.body(new RealResponseBody(response, engine.getResponseBody()))
.redirectedBy(redirectedBy)
.build();
}
if (engine.getResponse().isRedirect() && ++redirectionCount > MAX_REDIRECTS) {
throw new ProtocolException("Too many redirects: " + redirectionCount);
}
// TODO: drop from POST to GET when redirected? HttpURLConnection does.
// TODO: confirm that Cookies are not retained across hosts.
if (!engine.sameConnection(followUp)) {
engine.releaseConnection();
}
Connection connection = engine.close();
redirectedBy = response.newBuilder().redirectedBy(redirectedBy).build(); // Chained.
request = followUp;
engine = new HttpEngine(client, request, false, connection, null, null);
}
}
static class RealResponseBody extends Response.Body {
private final Response response;
private final BufferedSource source;
RealResponseBody(Response response, BufferedSource source) {
this.response = response;
this.source = source;
}
@Override public MediaType contentType() {
String contentType = response.header("Content-Type");
return contentType != null ? MediaType.parse(contentType) : null;
}
@Override public long contentLength() {
return OkHeaders.contentLength(response);
}
@Override public BufferedSource source() {
return source;
}
}
}

View File

@@ -21,7 +21,6 @@ import com.squareup.okhttp.internal.huc.HttpURLConnectionImpl;
import com.squareup.okhttp.internal.huc.HttpsURLConnectionImpl;
import com.squareup.okhttp.internal.huc.ResponseCacheAdapter;
import com.squareup.okhttp.internal.tls.OkHostnameVerifier;
import java.io.IOException;
import java.net.CookieHandler;
import java.net.HttpURLConnection;
import java.net.Proxy;
@@ -353,38 +352,13 @@ public final class OkHttpClient implements URLStreamHandlerFactory, Cloneable {
}
/**
* Invokes {@code request} immediately, and blocks until the response can be
* processed or is in error.
*
* <p>The caller may read the response body with the response's
* {@link Response#body} method. To facilitate connection recycling, callers
* should always {@link Response.Body#close() close the response body}.
*
* <p>Note that transport-layer success (receiving a HTTP response code,
* headers and body) does not necessarily indicate application-layer
* success: {@code response} may still indicate an unhappy HTTP response
* code like 404 or 500.
*
* @throws IOException if the request could not be executed due to a
* connectivity problem or timeout. Because networks can fail during an
* exchange, it is possible that the remote server accepted the request
* before the failure.
* Prepares the {@code request} to be executed at some point in the future.
*/
public Response execute(Request request) throws IOException {
public Call newCall(Request request) {
// Copy the client. Otherwise changes (socket factory, redirect policy,
// etc.) may incorrectly be reflected in the request when it is executed.
OkHttpClient client = copyWithDefaults();
Job job = new Job(dispatcher, client, request, null);
Response result = job.getResponse(); // Since we don't cancel, this won't be null.
job.engine.releaseConnection(); // Transfer ownership of the body to the caller.
return result;
}
/**
* Prepares the {@code request} to be executed at some point in the future.
*/
public Call call(Request request) {
return new Call(this, dispatcher, request);
return new Call(client, dispatcher, request);
}
/**

View File

@@ -80,7 +80,7 @@ public final class Crawler {
Request request = new Request.Builder()
.url(url)
.build();
Response response = client.execute(request);
Response response = client.newCall(request).execute();
String responseSource = response.header(OkHeaders.RESPONSE_SOURCE);
int responseCode = response.code();

View File

@@ -13,7 +13,7 @@ public class GetExample {
.url("https://raw.github.com/square/okhttp/master/README.md")
.build();
Response response = client.execute(request);
Response response = client.newCall(request).execute();
System.out.println(response.body().string());
}

View File

@@ -17,7 +17,7 @@ public class PostExample {
.method("POST", body)
.build();
Response response = client.execute(request);
Response response = client.newCall(request).execute();
System.out.println(response.body().string());
}

View File

@@ -31,7 +31,7 @@ public class OkHttpContributors {
.build();
// Execute the request and retrieve the response.
Response response = client.execute(request);
Response response = client.newCall(request).execute();
// Deserialize HTTP response to concrete type.
Reader body = response.body().charStream();