diff --git a/okhttp-tests/src/test/java/okhttp3/DispatcherTest.java b/okhttp-tests/src/test/java/okhttp3/DispatcherTest.java index bb82b2f78..84e789c11 100644 --- a/okhttp-tests/src/test/java/okhttp3/DispatcherTest.java +++ b/okhttp-tests/src/test/java/okhttp3/DispatcherTest.java @@ -6,6 +6,7 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.TimeUnit; +import okhttp3.RealCall.AsyncCall; import org.junit.Before; import org.junit.Test; @@ -122,23 +123,23 @@ public final class DispatcherTest { } class RecordingExecutor extends AbstractExecutorService { - private List calls = new ArrayList<>(); + private List calls = new ArrayList<>(); @Override public void execute(Runnable command) { - calls.add((Call.AsyncCall) command); + calls.add((AsyncCall) command); } public void assertJobs(String... expectedUrls) { List actualUrls = new ArrayList<>(); - for (Call.AsyncCall call : calls) { + for (AsyncCall call : calls) { actualUrls.add(call.request().url().toString()); } assertEquals(Arrays.asList(expectedUrls), actualUrls); } public void finishJob(String url) { - for (Iterator i = calls.iterator(); i.hasNext(); ) { - Call.AsyncCall call = i.next(); + for (Iterator i = calls.iterator(); i.hasNext(); ) { + AsyncCall call = i.next(); if (call.request().url().toString().equals(url)) { i.remove(); dispatcher.finished(call); diff --git a/okhttp/src/main/java/okhttp3/Call.java b/okhttp/src/main/java/okhttp3/Call.java index f5c2cb9ed..6ffa8cc5b 100644 --- a/okhttp/src/main/java/okhttp3/Call.java +++ b/okhttp/src/main/java/okhttp3/Call.java @@ -15,41 +15,14 @@ */ package okhttp3; -import okhttp3.internal.NamedRunnable; -import okhttp3.internal.http.HttpEngine; -import okhttp3.internal.http.RequestException; -import okhttp3.internal.http.RouteException; -import okhttp3.internal.http.StreamAllocation; import java.io.IOException; -import java.net.ProtocolException; -import java.util.logging.Level; - -import static okhttp3.internal.Internal.logger; -import static okhttp3.internal.http.HttpEngine.MAX_FOLLOW_UPS; /** * A call is a request that has been prepared for execution. A call can be * canceled. As this object represents a single request/response pair (stream), * it cannot be executed twice. */ -public class Call { - private final OkHttpClient client; - - // Guarded by this. - private boolean executed; - volatile boolean canceled; - - /** The application's original request unadulterated by redirects or auth headers. */ - Request originalRequest; - HttpEngine engine; - - protected Call(OkHttpClient client, Request originalRequest) { - // Copy the client. Otherwise changes (socket factory, redirect policy, - // etc.) may incorrectly be reflected in the request when it is executed. - this.client = client.copyWithDefaults(); - this.originalRequest = originalRequest; - } - +public interface Call { /** * Invokes the request immediately, and blocks until the response can be * processed or is in error. @@ -70,24 +43,7 @@ public class Call { * * @throws IllegalStateException when the call has already been executed. */ - public Response execute() throws IOException { - synchronized (this) { - if (executed) throw new IllegalStateException("Already Executed"); - executed = true; - } - try { - client.getDispatcher().executed(this); - Response result = getResponseWithInterceptorChain(false); - if (result == null) throw new IOException("Canceled"); - return result; - } finally { - client.getDispatcher().finished(this); - } - } - - Object tag() { - return originalRequest.tag(); - } + Response execute() throws IOException; /** * Schedules the request to be executed at some point in the future. @@ -102,247 +58,23 @@ public class Call { * * @throws IllegalStateException when the call has already been executed. */ - public void enqueue(Callback responseCallback) { - enqueue(responseCallback, false); - } - - void enqueue(Callback responseCallback, boolean forWebSocket) { - synchronized (this) { - if (executed) throw new IllegalStateException("Already Executed"); - executed = true; - } - client.getDispatcher().enqueue(new AsyncCall(responseCallback, forWebSocket)); - } + void enqueue(Callback responseCallback); /** * Cancels the request, if possible. Requests that are already complete * cannot be canceled. */ - public void cancel() { - canceled = true; - if (engine != null) engine.cancel(); - } + void cancel(); /** * Returns true if this call has been either {@linkplain #execute() executed} or {@linkplain * #enqueue(Callback) enqueued}. It is an error to execute a call more than once. */ - public synchronized boolean isExecuted() { - return executed; - } + boolean isExecuted(); - public boolean isCanceled() { - return canceled; - } + boolean isCanceled(); - final class AsyncCall extends NamedRunnable { - private final Callback responseCallback; - private final boolean forWebSocket; - - private AsyncCall(Callback responseCallback, boolean forWebSocket) { - super("OkHttp %s", originalRequest.url().toString()); - this.responseCallback = responseCallback; - this.forWebSocket = forWebSocket; - } - - String host() { - return originalRequest.url().host(); - } - - Request request() { - return originalRequest; - } - - Object tag() { - return originalRequest.tag(); - } - - void cancel() { - Call.this.cancel(); - } - - Call get() { - return Call.this; - } - - @Override protected void execute() { - boolean signalledCallback = false; - try { - Response response = getResponseWithInterceptorChain(forWebSocket); - if (canceled) { - signalledCallback = true; - responseCallback.onFailure(originalRequest, new IOException("Canceled")); - } else { - signalledCallback = true; - responseCallback.onResponse(response); - } - } catch (IOException e) { - if (signalledCallback) { - // Do not signal the callback twice! - logger.log(Level.INFO, "Callback failure for " + toLoggableString(), e); - } else { - Request request = engine == null ? originalRequest : engine.getRequest(); - responseCallback.onFailure(request, e); - } - } finally { - client.getDispatcher().finished(this); - } - } - } - - /** - * Returns a string that describes this call. Doesn't include a full URL as that might contain - * sensitive information. - */ - private String toLoggableString() { - String string = canceled ? "canceled call" : "call"; - HttpUrl redactedUrl = originalRequest.url().resolve("/..."); - return string + " to " + redactedUrl; - } - - private Response getResponseWithInterceptorChain(boolean forWebSocket) throws IOException { - Interceptor.Chain chain = new ApplicationInterceptorChain(0, originalRequest, forWebSocket); - return chain.proceed(originalRequest); - } - - class ApplicationInterceptorChain implements Interceptor.Chain { - private final int index; - private final Request request; - private final boolean forWebSocket; - - ApplicationInterceptorChain(int index, Request request, boolean forWebSocket) { - this.index = index; - this.request = request; - this.forWebSocket = forWebSocket; - } - - @Override public Connection connection() { - return null; - } - - @Override public Request request() { - return request; - } - - @Override public Response proceed(Request request) throws IOException { - // If there's another interceptor in the chain, call that. - if (index < client.interceptors().size()) { - Interceptor.Chain chain = new ApplicationInterceptorChain(index + 1, request, forWebSocket); - Interceptor interceptor = client.interceptors().get(index); - Response interceptedResponse = interceptor.intercept(chain); - - if (interceptedResponse == null) { - throw new NullPointerException("application interceptor " + interceptor - + " returned null"); - } - - return interceptedResponse; - } - - // No more interceptors. Do HTTP. - return getResponse(request, forWebSocket); - } - } - - /** - * Performs the request and returns the response. May return null if this - * call was canceled. - */ - Response getResponse(Request request, boolean forWebSocket) throws IOException { - // Copy body metadata to the appropriate request headers. - RequestBody body = request.body(); - if (body != null) { - Request.Builder requestBuilder = request.newBuilder(); - - MediaType contentType = body.contentType(); - if (contentType != null) { - requestBuilder.header("Content-Type", contentType.toString()); - } - - long contentLength = body.contentLength(); - if (contentLength != -1) { - requestBuilder.header("Content-Length", Long.toString(contentLength)); - requestBuilder.removeHeader("Transfer-Encoding"); - } else { - requestBuilder.header("Transfer-Encoding", "chunked"); - requestBuilder.removeHeader("Content-Length"); - } - - request = requestBuilder.build(); - } - - // Create the initial HTTP engine. Retries and redirects need new engine for each attempt. - engine = new HttpEngine(client, request, false, false, forWebSocket, null, null, null); - - int followUpCount = 0; - while (true) { - if (canceled) { - engine.releaseStreamAllocation(); - throw new IOException("Canceled"); - } - - boolean releaseConnection = true; - try { - engine.sendRequest(); - engine.readResponse(); - releaseConnection = false; - } catch (RequestException e) { - // The attempt to interpret the request failed. Give up. - throw e.getCause(); - } catch (RouteException e) { - // The attempt to connect via a route failed. The request will not have been sent. - HttpEngine retryEngine = engine.recover(e.getLastConnectException(), null); - if (retryEngine != null) { - releaseConnection = false; - engine = retryEngine; - continue; - } - // Give up; recovery is not possible. - throw e.getLastConnectException(); - } catch (IOException e) { - // An attempt to communicate with a server failed. The request may have been sent. - HttpEngine retryEngine = engine.recover(e, null); - if (retryEngine != null) { - releaseConnection = false; - engine = retryEngine; - continue; - } - - // Give up; recovery is not possible. - throw e; - } finally { - // We're throwing an unchecked exception. Release any resources. - if (releaseConnection) { - StreamAllocation streamAllocation = engine.close(); - streamAllocation.release(); - } - } - - Response response = engine.getResponse(); - Request followUp = engine.followUpRequest(); - - if (followUp == null) { - if (!forWebSocket) { - engine.releaseStreamAllocation(); - } - return response; - } - - StreamAllocation streamAllocation = engine.close(); - - if (++followUpCount > MAX_FOLLOW_UPS) { - streamAllocation.release(); - throw new ProtocolException("Too many follow-up requests: " + followUpCount); - } - - if (!engine.sameConnection(followUp.url())) { - streamAllocation.release(); - streamAllocation = null; - } - - request = followUp; - engine = new HttpEngine(client, request, false, false, forWebSocket, streamAllocation, null, - response); - } + interface Factory { + Call newCall(Request request); } } diff --git a/okhttp/src/main/java/okhttp3/Dispatcher.java b/okhttp/src/main/java/okhttp3/Dispatcher.java index 4c1dc0bda..084a5cd9d 100644 --- a/okhttp/src/main/java/okhttp3/Dispatcher.java +++ b/okhttp/src/main/java/okhttp3/Dispatcher.java @@ -15,7 +15,7 @@ */ package okhttp3; -import okhttp3.Call.AsyncCall; +import okhttp3.RealCall.AsyncCall; import okhttp3.internal.Util; import okhttp3.internal.http.HttpEngine; import java.util.ArrayDeque; @@ -47,7 +47,7 @@ public final class Dispatcher { private final Deque runningCalls = new ArrayDeque<>(); /** In-flight synchronous calls. Includes canceled calls that haven't finished yet. */ - private final Deque executedCalls = new ArrayDeque<>(); + private final Deque executedCalls = new ArrayDeque<>(); public Dispatcher(ExecutorService executorService) { this.executorService = executorService; @@ -129,7 +129,7 @@ public final class Dispatcher { } } - for (Call call : executedCalls) { + for (RealCall call : executedCalls) { if (Util.equal(tag, call.tag())) { call.cancel(); } @@ -169,7 +169,7 @@ public final class Dispatcher { } /** Used by {@code Call#execute} to signal it is in-flight. */ - synchronized void executed(Call call) { + synchronized void executed(RealCall call) { executedCalls.add(call); } diff --git a/okhttp/src/main/java/okhttp3/OkHttpClient.java b/okhttp/src/main/java/okhttp3/OkHttpClient.java index 7e1c2549c..416df41ce 100644 --- a/okhttp/src/main/java/okhttp3/OkHttpClient.java +++ b/okhttp/src/main/java/okhttp3/OkHttpClient.java @@ -50,7 +50,7 @@ import okhttp3.internal.tls.OkHostnameVerifier; * {@link #clone()} to make a shallow copy of the OkHttpClient that can be * safely modified with further configuration changes. */ -public class OkHttpClient implements Cloneable { +public class OkHttpClient implements Cloneable, Call.Factory { private static final List DEFAULT_PROTOCOLS = Util.immutableList( Protocol.HTTP_2, Protocol.SPDY_3, Protocol.HTTP_1_1); @@ -95,11 +95,11 @@ public class OkHttpClient implements Cloneable { @Override public void callEnqueue(Call call, Callback responseCallback, boolean forWebSocket) { - call.enqueue(responseCallback, forWebSocket); + ((RealCall) call).enqueue(responseCallback, forWebSocket); } @Override public StreamAllocation callEngineGetStreamAllocation(Call call) { - return call.engine.streamAllocation; + return ((RealCall) call).engine.streamAllocation; } @Override @@ -582,8 +582,8 @@ public class OkHttpClient implements Cloneable { /** * Prepares the {@code request} to be executed at some point in the future. */ - public Call newCall(Request request) { - return new Call(this, request); + @Override public Call newCall(Request request) { + return new RealCall(this, request); } /** diff --git a/okhttp/src/main/java/okhttp3/RealCall.java b/okhttp/src/main/java/okhttp3/RealCall.java new file mode 100644 index 000000000..ab8202819 --- /dev/null +++ b/okhttp/src/main/java/okhttp3/RealCall.java @@ -0,0 +1,302 @@ +/* + * Copyright (C) 2014 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3; + +import java.io.IOException; +import java.net.ProtocolException; +import java.util.logging.Level; +import okhttp3.internal.NamedRunnable; +import okhttp3.internal.http.HttpEngine; +import okhttp3.internal.http.RequestException; +import okhttp3.internal.http.RouteException; +import okhttp3.internal.http.StreamAllocation; + +import static okhttp3.internal.Internal.logger; +import static okhttp3.internal.http.HttpEngine.MAX_FOLLOW_UPS; + +final class RealCall implements Call { + private final OkHttpClient client; + + // Guarded by this. + private boolean executed; + volatile boolean canceled; + + /** The application's original request unadulterated by redirects or auth headers. */ + Request originalRequest; + HttpEngine engine; + + protected RealCall(OkHttpClient client, Request originalRequest) { + // Copy the client. Otherwise changes (socket factory, redirect policy, + // etc.) may incorrectly be reflected in the request when it is executed. + this.client = client.copyWithDefaults(); + this.originalRequest = originalRequest; + } + + @Override public Response execute() throws IOException { + synchronized (this) { + if (executed) throw new IllegalStateException("Already Executed"); + executed = true; + } + try { + client.getDispatcher().executed(this); + Response result = getResponseWithInterceptorChain(false); + if (result == null) throw new IOException("Canceled"); + return result; + } finally { + client.getDispatcher().finished(this); + } + } + + Object tag() { + return originalRequest.tag(); + } + + @Override public void enqueue(Callback responseCallback) { + enqueue(responseCallback, false); + } + + void enqueue(Callback responseCallback, boolean forWebSocket) { + synchronized (this) { + if (executed) throw new IllegalStateException("Already Executed"); + executed = true; + } + client.getDispatcher().enqueue(new AsyncCall(responseCallback, forWebSocket)); + } + + @Override public void cancel() { + canceled = true; + if (engine != null) engine.cancel(); + } + + @Override public synchronized boolean isExecuted() { + return executed; + } + + @Override public boolean isCanceled() { + return canceled; + } + + final class AsyncCall extends NamedRunnable { + private final Callback responseCallback; + private final boolean forWebSocket; + + private AsyncCall(Callback responseCallback, boolean forWebSocket) { + super("OkHttp %s", originalRequest.url().toString()); + this.responseCallback = responseCallback; + this.forWebSocket = forWebSocket; + } + + String host() { + return originalRequest.url().host(); + } + + Request request() { + return originalRequest; + } + + Object tag() { + return originalRequest.tag(); + } + + void cancel() { + RealCall.this.cancel(); + } + + RealCall get() { + return RealCall.this; + } + + @Override protected void execute() { + boolean signalledCallback = false; + try { + Response response = getResponseWithInterceptorChain(forWebSocket); + if (canceled) { + signalledCallback = true; + responseCallback.onFailure(originalRequest, new IOException("Canceled")); + } else { + signalledCallback = true; + responseCallback.onResponse(response); + } + } catch (IOException e) { + if (signalledCallback) { + // Do not signal the callback twice! + logger.log(Level.INFO, "Callback failure for " + toLoggableString(), e); + } else { + Request request = engine == null ? originalRequest : engine.getRequest(); + responseCallback.onFailure(request, e); + } + } finally { + client.getDispatcher().finished(this); + } + } + } + + /** + * Returns a string that describes this call. Doesn't include a full URL as that might contain + * sensitive information. + */ + private String toLoggableString() { + String string = canceled ? "canceled call" : "call"; + HttpUrl redactedUrl = originalRequest.url().resolve("/..."); + return string + " to " + redactedUrl; + } + + private Response getResponseWithInterceptorChain(boolean forWebSocket) throws IOException { + Interceptor.Chain chain = new ApplicationInterceptorChain(0, originalRequest, forWebSocket); + return chain.proceed(originalRequest); + } + + class ApplicationInterceptorChain implements Interceptor.Chain { + private final int index; + private final Request request; + private final boolean forWebSocket; + + ApplicationInterceptorChain(int index, Request request, boolean forWebSocket) { + this.index = index; + this.request = request; + this.forWebSocket = forWebSocket; + } + + @Override public Connection connection() { + return null; + } + + @Override public Request request() { + return request; + } + + @Override public Response proceed(Request request) throws IOException { + // If there's another interceptor in the chain, call that. + if (index < client.interceptors().size()) { + Interceptor.Chain chain = new ApplicationInterceptorChain(index + 1, request, forWebSocket); + Interceptor interceptor = client.interceptors().get(index); + Response interceptedResponse = interceptor.intercept(chain); + + if (interceptedResponse == null) { + throw new NullPointerException("application interceptor " + interceptor + + " returned null"); + } + + return interceptedResponse; + } + + // No more interceptors. Do HTTP. + return getResponse(request, forWebSocket); + } + } + + /** + * Performs the request and returns the response. May return null if this + * call was canceled. + */ + Response getResponse(Request request, boolean forWebSocket) throws IOException { + // Copy body metadata to the appropriate request headers. + RequestBody body = request.body(); + if (body != null) { + Request.Builder requestBuilder = request.newBuilder(); + + MediaType contentType = body.contentType(); + if (contentType != null) { + requestBuilder.header("Content-Type", contentType.toString()); + } + + long contentLength = body.contentLength(); + if (contentLength != -1) { + requestBuilder.header("Content-Length", Long.toString(contentLength)); + requestBuilder.removeHeader("Transfer-Encoding"); + } else { + requestBuilder.header("Transfer-Encoding", "chunked"); + requestBuilder.removeHeader("Content-Length"); + } + + request = requestBuilder.build(); + } + + // Create the initial HTTP engine. Retries and redirects need new engine for each attempt. + engine = new HttpEngine(client, request, false, false, forWebSocket, null, null, null); + + int followUpCount = 0; + while (true) { + if (canceled) { + engine.releaseStreamAllocation(); + throw new IOException("Canceled"); + } + + boolean releaseConnection = true; + try { + engine.sendRequest(); + engine.readResponse(); + releaseConnection = false; + } catch (RequestException e) { + // The attempt to interpret the request failed. Give up. + throw e.getCause(); + } catch (RouteException e) { + // The attempt to connect via a route failed. The request will not have been sent. + HttpEngine retryEngine = engine.recover(e.getLastConnectException(), null); + if (retryEngine != null) { + releaseConnection = false; + engine = retryEngine; + continue; + } + // Give up; recovery is not possible. + throw e.getLastConnectException(); + } catch (IOException e) { + // An attempt to communicate with a server failed. The request may have been sent. + HttpEngine retryEngine = engine.recover(e, null); + if (retryEngine != null) { + releaseConnection = false; + engine = retryEngine; + continue; + } + + // Give up; recovery is not possible. + throw e; + } finally { + // We're throwing an unchecked exception. Release any resources. + if (releaseConnection) { + StreamAllocation streamAllocation = engine.close(); + streamAllocation.release(); + } + } + + Response response = engine.getResponse(); + Request followUp = engine.followUpRequest(); + + if (followUp == null) { + if (!forWebSocket) { + engine.releaseStreamAllocation(); + } + return response; + } + + StreamAllocation streamAllocation = engine.close(); + + if (++followUpCount > MAX_FOLLOW_UPS) { + streamAllocation.release(); + throw new ProtocolException("Too many follow-up requests: " + followUpCount); + } + + if (!engine.sameConnection(followUp.url())) { + streamAllocation.release(); + streamAllocation = null; + } + + request = followUp; + engine = new HttpEngine(client, request, false, false, forWebSocket, streamAllocation, null, + response); + } + } +}