1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-17 08:42:25 +03:00

Make call an interface, and introduce Call.Factory.

More documentation forthcoming.
This commit is contained in:
jwilson
2015-12-23 15:55:30 -05:00
parent 6aff563d5a
commit fb700e20af
5 changed files with 325 additions and 290 deletions

View File

@@ -6,6 +6,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;
import okhttp3.RealCall.AsyncCall;
import org.junit.Before;
import org.junit.Test;
@@ -122,23 +123,23 @@ public final class DispatcherTest {
}
class RecordingExecutor extends AbstractExecutorService {
private List<Call.AsyncCall> calls = new ArrayList<>();
private List<AsyncCall> calls = new ArrayList<>();
@Override public void execute(Runnable command) {
calls.add((Call.AsyncCall) command);
calls.add((AsyncCall) command);
}
public void assertJobs(String... expectedUrls) {
List<String> actualUrls = new ArrayList<>();
for (Call.AsyncCall call : calls) {
for (AsyncCall call : calls) {
actualUrls.add(call.request().url().toString());
}
assertEquals(Arrays.asList(expectedUrls), actualUrls);
}
public void finishJob(String url) {
for (Iterator<Call.AsyncCall> i = calls.iterator(); i.hasNext(); ) {
Call.AsyncCall call = i.next();
for (Iterator<AsyncCall> i = calls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (call.request().url().toString().equals(url)) {
i.remove();
dispatcher.finished(call);

View File

@@ -15,41 +15,14 @@
*/
package okhttp3;
import okhttp3.internal.NamedRunnable;
import okhttp3.internal.http.HttpEngine;
import okhttp3.internal.http.RequestException;
import okhttp3.internal.http.RouteException;
import okhttp3.internal.http.StreamAllocation;
import java.io.IOException;
import java.net.ProtocolException;
import java.util.logging.Level;
import static okhttp3.internal.Internal.logger;
import static okhttp3.internal.http.HttpEngine.MAX_FOLLOW_UPS;
/**
* A call is a request that has been prepared for execution. A call can be
* canceled. As this object represents a single request/response pair (stream),
* it cannot be executed twice.
*/
public class Call {
private final OkHttpClient client;
// Guarded by this.
private boolean executed;
volatile boolean canceled;
/** The application's original request unadulterated by redirects or auth headers. */
Request originalRequest;
HttpEngine engine;
protected Call(OkHttpClient client, Request originalRequest) {
// Copy the client. Otherwise changes (socket factory, redirect policy,
// etc.) may incorrectly be reflected in the request when it is executed.
this.client = client.copyWithDefaults();
this.originalRequest = originalRequest;
}
public interface Call {
/**
* Invokes the request immediately, and blocks until the response can be
* processed or is in error.
@@ -70,24 +43,7 @@ public class Call {
*
* @throws IllegalStateException when the call has already been executed.
*/
public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
try {
client.getDispatcher().executed(this);
Response result = getResponseWithInterceptorChain(false);
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.getDispatcher().finished(this);
}
}
Object tag() {
return originalRequest.tag();
}
Response execute() throws IOException;
/**
* Schedules the request to be executed at some point in the future.
@@ -102,247 +58,23 @@ public class Call {
*
* @throws IllegalStateException when the call has already been executed.
*/
public void enqueue(Callback responseCallback) {
enqueue(responseCallback, false);
}
void enqueue(Callback responseCallback, boolean forWebSocket) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
client.getDispatcher().enqueue(new AsyncCall(responseCallback, forWebSocket));
}
void enqueue(Callback responseCallback);
/**
* Cancels the request, if possible. Requests that are already complete
* cannot be canceled.
*/
public void cancel() {
canceled = true;
if (engine != null) engine.cancel();
}
void cancel();
/**
* Returns true if this call has been either {@linkplain #execute() executed} or {@linkplain
* #enqueue(Callback) enqueued}. It is an error to execute a call more than once.
*/
public synchronized boolean isExecuted() {
return executed;
}
boolean isExecuted();
public boolean isCanceled() {
return canceled;
}
boolean isCanceled();
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
private final boolean forWebSocket;
private AsyncCall(Callback responseCallback, boolean forWebSocket) {
super("OkHttp %s", originalRequest.url().toString());
this.responseCallback = responseCallback;
this.forWebSocket = forWebSocket;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
Object tag() {
return originalRequest.tag();
}
void cancel() {
Call.this.cancel();
}
Call get() {
return Call.this;
}
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain(forWebSocket);
if (canceled) {
signalledCallback = true;
responseCallback.onFailure(originalRequest, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
logger.log(Level.INFO, "Callback failure for " + toLoggableString(), e);
} else {
Request request = engine == null ? originalRequest : engine.getRequest();
responseCallback.onFailure(request, e);
}
} finally {
client.getDispatcher().finished(this);
}
}
}
/**
* Returns a string that describes this call. Doesn't include a full URL as that might contain
* sensitive information.
*/
private String toLoggableString() {
String string = canceled ? "canceled call" : "call";
HttpUrl redactedUrl = originalRequest.url().resolve("/...");
return string + " to " + redactedUrl;
}
private Response getResponseWithInterceptorChain(boolean forWebSocket) throws IOException {
Interceptor.Chain chain = new ApplicationInterceptorChain(0, originalRequest, forWebSocket);
return chain.proceed(originalRequest);
}
class ApplicationInterceptorChain implements Interceptor.Chain {
private final int index;
private final Request request;
private final boolean forWebSocket;
ApplicationInterceptorChain(int index, Request request, boolean forWebSocket) {
this.index = index;
this.request = request;
this.forWebSocket = forWebSocket;
}
@Override public Connection connection() {
return null;
}
@Override public Request request() {
return request;
}
@Override public Response proceed(Request request) throws IOException {
// If there's another interceptor in the chain, call that.
if (index < client.interceptors().size()) {
Interceptor.Chain chain = new ApplicationInterceptorChain(index + 1, request, forWebSocket);
Interceptor interceptor = client.interceptors().get(index);
Response interceptedResponse = interceptor.intercept(chain);
if (interceptedResponse == null) {
throw new NullPointerException("application interceptor " + interceptor
+ " returned null");
}
return interceptedResponse;
}
// No more interceptors. Do HTTP.
return getResponse(request, forWebSocket);
}
}
/**
* Performs the request and returns the response. May return null if this
* call was canceled.
*/
Response getResponse(Request request, boolean forWebSocket) throws IOException {
// Copy body metadata to the appropriate request headers.
RequestBody body = request.body();
if (body != null) {
Request.Builder requestBuilder = request.newBuilder();
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
request = requestBuilder.build();
}
// Create the initial HTTP engine. Retries and redirects need new engine for each attempt.
engine = new HttpEngine(client, request, false, false, forWebSocket, null, null, null);
int followUpCount = 0;
while (true) {
if (canceled) {
engine.releaseStreamAllocation();
throw new IOException("Canceled");
}
boolean releaseConnection = true;
try {
engine.sendRequest();
engine.readResponse();
releaseConnection = false;
} catch (RequestException e) {
// The attempt to interpret the request failed. Give up.
throw e.getCause();
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
HttpEngine retryEngine = engine.recover(e.getLastConnectException(), null);
if (retryEngine != null) {
releaseConnection = false;
engine = retryEngine;
continue;
}
// Give up; recovery is not possible.
throw e.getLastConnectException();
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
HttpEngine retryEngine = engine.recover(e, null);
if (retryEngine != null) {
releaseConnection = false;
engine = retryEngine;
continue;
}
// Give up; recovery is not possible.
throw e;
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
StreamAllocation streamAllocation = engine.close();
streamAllocation.release();
}
}
Response response = engine.getResponse();
Request followUp = engine.followUpRequest();
if (followUp == null) {
if (!forWebSocket) {
engine.releaseStreamAllocation();
}
return response;
}
StreamAllocation streamAllocation = engine.close();
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
if (!engine.sameConnection(followUp.url())) {
streamAllocation.release();
streamAllocation = null;
}
request = followUp;
engine = new HttpEngine(client, request, false, false, forWebSocket, streamAllocation, null,
response);
}
interface Factory {
Call newCall(Request request);
}
}

View File

@@ -15,7 +15,7 @@
*/
package okhttp3;
import okhttp3.Call.AsyncCall;
import okhttp3.RealCall.AsyncCall;
import okhttp3.internal.Util;
import okhttp3.internal.http.HttpEngine;
import java.util.ArrayDeque;
@@ -47,7 +47,7 @@ public final class Dispatcher {
private final Deque<AsyncCall> runningCalls = new ArrayDeque<>();
/** In-flight synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<Call> executedCalls = new ArrayDeque<>();
private final Deque<RealCall> executedCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
@@ -129,7 +129,7 @@ public final class Dispatcher {
}
}
for (Call call : executedCalls) {
for (RealCall call : executedCalls) {
if (Util.equal(tag, call.tag())) {
call.cancel();
}
@@ -169,7 +169,7 @@ public final class Dispatcher {
}
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(Call call) {
synchronized void executed(RealCall call) {
executedCalls.add(call);
}

View File

@@ -50,7 +50,7 @@ import okhttp3.internal.tls.OkHostnameVerifier;
* {@link #clone()} to make a shallow copy of the OkHttpClient that can be
* safely modified with further configuration changes.
*/
public class OkHttpClient implements Cloneable {
public class OkHttpClient implements Cloneable, Call.Factory {
private static final List<Protocol> DEFAULT_PROTOCOLS = Util.immutableList(
Protocol.HTTP_2, Protocol.SPDY_3, Protocol.HTTP_1_1);
@@ -95,11 +95,11 @@ public class OkHttpClient implements Cloneable {
@Override
public void callEnqueue(Call call, Callback responseCallback, boolean forWebSocket) {
call.enqueue(responseCallback, forWebSocket);
((RealCall) call).enqueue(responseCallback, forWebSocket);
}
@Override public StreamAllocation callEngineGetStreamAllocation(Call call) {
return call.engine.streamAllocation;
return ((RealCall) call).engine.streamAllocation;
}
@Override
@@ -582,8 +582,8 @@ public class OkHttpClient implements Cloneable {
/**
* Prepares the {@code request} to be executed at some point in the future.
*/
public Call newCall(Request request) {
return new Call(this, request);
@Override public Call newCall(Request request) {
return new RealCall(this, request);
}
/**

View File

@@ -0,0 +1,302 @@
/*
* Copyright (C) 2014 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3;
import java.io.IOException;
import java.net.ProtocolException;
import java.util.logging.Level;
import okhttp3.internal.NamedRunnable;
import okhttp3.internal.http.HttpEngine;
import okhttp3.internal.http.RequestException;
import okhttp3.internal.http.RouteException;
import okhttp3.internal.http.StreamAllocation;
import static okhttp3.internal.Internal.logger;
import static okhttp3.internal.http.HttpEngine.MAX_FOLLOW_UPS;
final class RealCall implements Call {
private final OkHttpClient client;
// Guarded by this.
private boolean executed;
volatile boolean canceled;
/** The application's original request unadulterated by redirects or auth headers. */
Request originalRequest;
HttpEngine engine;
protected RealCall(OkHttpClient client, Request originalRequest) {
// Copy the client. Otherwise changes (socket factory, redirect policy,
// etc.) may incorrectly be reflected in the request when it is executed.
this.client = client.copyWithDefaults();
this.originalRequest = originalRequest;
}
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
try {
client.getDispatcher().executed(this);
Response result = getResponseWithInterceptorChain(false);
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.getDispatcher().finished(this);
}
}
Object tag() {
return originalRequest.tag();
}
@Override public void enqueue(Callback responseCallback) {
enqueue(responseCallback, false);
}
void enqueue(Callback responseCallback, boolean forWebSocket) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
client.getDispatcher().enqueue(new AsyncCall(responseCallback, forWebSocket));
}
@Override public void cancel() {
canceled = true;
if (engine != null) engine.cancel();
}
@Override public synchronized boolean isExecuted() {
return executed;
}
@Override public boolean isCanceled() {
return canceled;
}
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
private final boolean forWebSocket;
private AsyncCall(Callback responseCallback, boolean forWebSocket) {
super("OkHttp %s", originalRequest.url().toString());
this.responseCallback = responseCallback;
this.forWebSocket = forWebSocket;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
Object tag() {
return originalRequest.tag();
}
void cancel() {
RealCall.this.cancel();
}
RealCall get() {
return RealCall.this;
}
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain(forWebSocket);
if (canceled) {
signalledCallback = true;
responseCallback.onFailure(originalRequest, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
logger.log(Level.INFO, "Callback failure for " + toLoggableString(), e);
} else {
Request request = engine == null ? originalRequest : engine.getRequest();
responseCallback.onFailure(request, e);
}
} finally {
client.getDispatcher().finished(this);
}
}
}
/**
* Returns a string that describes this call. Doesn't include a full URL as that might contain
* sensitive information.
*/
private String toLoggableString() {
String string = canceled ? "canceled call" : "call";
HttpUrl redactedUrl = originalRequest.url().resolve("/...");
return string + " to " + redactedUrl;
}
private Response getResponseWithInterceptorChain(boolean forWebSocket) throws IOException {
Interceptor.Chain chain = new ApplicationInterceptorChain(0, originalRequest, forWebSocket);
return chain.proceed(originalRequest);
}
class ApplicationInterceptorChain implements Interceptor.Chain {
private final int index;
private final Request request;
private final boolean forWebSocket;
ApplicationInterceptorChain(int index, Request request, boolean forWebSocket) {
this.index = index;
this.request = request;
this.forWebSocket = forWebSocket;
}
@Override public Connection connection() {
return null;
}
@Override public Request request() {
return request;
}
@Override public Response proceed(Request request) throws IOException {
// If there's another interceptor in the chain, call that.
if (index < client.interceptors().size()) {
Interceptor.Chain chain = new ApplicationInterceptorChain(index + 1, request, forWebSocket);
Interceptor interceptor = client.interceptors().get(index);
Response interceptedResponse = interceptor.intercept(chain);
if (interceptedResponse == null) {
throw new NullPointerException("application interceptor " + interceptor
+ " returned null");
}
return interceptedResponse;
}
// No more interceptors. Do HTTP.
return getResponse(request, forWebSocket);
}
}
/**
* Performs the request and returns the response. May return null if this
* call was canceled.
*/
Response getResponse(Request request, boolean forWebSocket) throws IOException {
// Copy body metadata to the appropriate request headers.
RequestBody body = request.body();
if (body != null) {
Request.Builder requestBuilder = request.newBuilder();
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
request = requestBuilder.build();
}
// Create the initial HTTP engine. Retries and redirects need new engine for each attempt.
engine = new HttpEngine(client, request, false, false, forWebSocket, null, null, null);
int followUpCount = 0;
while (true) {
if (canceled) {
engine.releaseStreamAllocation();
throw new IOException("Canceled");
}
boolean releaseConnection = true;
try {
engine.sendRequest();
engine.readResponse();
releaseConnection = false;
} catch (RequestException e) {
// The attempt to interpret the request failed. Give up.
throw e.getCause();
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
HttpEngine retryEngine = engine.recover(e.getLastConnectException(), null);
if (retryEngine != null) {
releaseConnection = false;
engine = retryEngine;
continue;
}
// Give up; recovery is not possible.
throw e.getLastConnectException();
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
HttpEngine retryEngine = engine.recover(e, null);
if (retryEngine != null) {
releaseConnection = false;
engine = retryEngine;
continue;
}
// Give up; recovery is not possible.
throw e;
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
StreamAllocation streamAllocation = engine.close();
streamAllocation.release();
}
}
Response response = engine.getResponse();
Request followUp = engine.followUpRequest();
if (followUp == null) {
if (!forWebSocket) {
engine.releaseStreamAllocation();
}
return response;
}
StreamAllocation streamAllocation = engine.close();
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
if (!engine.sameConnection(followUp.url())) {
streamAllocation.release();
streamAllocation = null;
}
request = followUp;
engine = new HttpEngine(client, request, false, false, forWebSocket, streamAllocation, null,
response);
}
}
}