1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-24 04:02:07 +03:00

Merge pull request #377 from square/jwilson/async_apis

Define async APIs and switch Job to use HttpEngine directly.
This commit is contained in:
Jesse Wilson
2013-12-28 17:47:14 -08:00
10 changed files with 365 additions and 93 deletions

View File

@@ -15,9 +15,9 @@
*/
package com.squareup.okhttp;
import com.squareup.okhttp.internal.http.ResponseHeaders;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
@@ -33,8 +33,8 @@ final class Dispatcher {
private final Map<Object, List<Job>> enqueuedJobs = new LinkedHashMap<Object, List<Job>>();
public synchronized void enqueue(
HttpURLConnection connection, Request request, Response.Receiver responseReceiver) {
Job job = new Job(this, connection, request, responseReceiver);
OkHttpClient client, Request request, Response.Receiver responseReceiver) {
Job job = new Job(this, client, request, responseReceiver);
List<Job> jobsForTag = enqueuedJobs.get(request.tag());
if (jobsForTag == null) {
jobsForTag = new ArrayList<Job>(2);
@@ -53,25 +53,30 @@ final class Dispatcher {
}
synchronized void finished(Job job) {
List<Job> jobs = enqueuedJobs.get(job.request.tag());
List<Job> jobs = enqueuedJobs.get(job.tag());
if (jobs != null) jobs.remove(job);
}
static class RealResponseBody extends Response.Body {
private final HttpURLConnection connection;
private final ResponseHeaders responseHeaders;
private final InputStream in;
RealResponseBody(HttpURLConnection connection, InputStream in) {
this.connection = connection;
RealResponseBody(ResponseHeaders responseHeaders, InputStream in) {
this.responseHeaders = responseHeaders;
this.in = in;
}
@Override public String contentType() {
return connection.getHeaderField("Content-Type");
@Override public boolean ready() throws IOException {
return true;
}
@Override public MediaType contentType() {
String contentType = responseHeaders.getContentType();
return contentType != null ? MediaType.parse(contentType) : null;
}
@Override public long contentLength() {
return connection.getContentLength(); // TODO: getContentLengthLong
return responseHeaders.getContentLength();
}
@Override public InputStream byteStream() throws IOException {

View File

@@ -15,27 +15,82 @@
*/
package com.squareup.okhttp;
import com.squareup.okhttp.internal.http.HttpAuthenticator;
import com.squareup.okhttp.internal.http.HttpEngine;
import com.squareup.okhttp.internal.http.HttpTransport;
import com.squareup.okhttp.internal.http.HttpsEngine;
import com.squareup.okhttp.internal.http.Policy;
import com.squareup.okhttp.internal.http.RawHeaders;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.ProtocolException;
import java.net.Proxy;
import java.net.URL;
final class Job implements Runnable {
final HttpURLConnection connection;
final Request request;
final Response.Receiver responseReceiver;
final Dispatcher dispatcher;
import static com.squareup.okhttp.internal.Util.getEffectivePort;
import static com.squareup.okhttp.internal.http.HttpURLConnectionImpl.HTTP_MOVED_PERM;
import static com.squareup.okhttp.internal.http.HttpURLConnectionImpl.HTTP_MOVED_TEMP;
import static com.squareup.okhttp.internal.http.HttpURLConnectionImpl.HTTP_MULT_CHOICE;
import static com.squareup.okhttp.internal.http.HttpURLConnectionImpl.HTTP_PROXY_AUTH;
import static com.squareup.okhttp.internal.http.HttpURLConnectionImpl.HTTP_SEE_OTHER;
import static com.squareup.okhttp.internal.http.HttpURLConnectionImpl.HTTP_TEMP_REDIRECT;
import static com.squareup.okhttp.internal.http.HttpURLConnectionImpl.HTTP_UNAUTHORIZED;
public Job(Dispatcher dispatcher, HttpURLConnection connection, Request request,
final class Job implements Runnable, Policy {
private final Dispatcher dispatcher;
private final OkHttpClient client;
private final Response.Receiver responseReceiver;
/** The request; possibly a consequence of redirects or auth headers. */
private Request request;
public Job(Dispatcher dispatcher, OkHttpClient client, Request request,
Response.Receiver responseReceiver) {
this.dispatcher = dispatcher;
this.connection = connection;
this.client = client;
this.request = request;
this.responseReceiver = responseReceiver;
}
@Override public int getChunkLength() {
return request.body().contentLength() == -1 ? HttpTransport.DEFAULT_CHUNK_LENGTH : -1;
}
@Override public long getFixedContentLength() {
return request.body().contentLength();
}
@Override public boolean getUseCaches() {
return false; // TODO.
}
@Override public HttpURLConnection getHttpConnectionToCache() {
return null;
}
@Override public URL getURL() {
return request.url();
}
@Override public long getIfModifiedSince() {
return 0; // For HttpURLConnection only. We let the cache drive this.
}
@Override public boolean usingProxy() {
return false; // We let the connection decide this.
}
@Override public void setSelectedProxy(Proxy proxy) {
// Do nothing.
}
Object tag() {
return request.tag();
}
@Override public void run() {
try {
sendRequest();
Response response = readResponse();
Response response = execute();
responseReceiver.onResponse(response);
} catch (IOException e) {
responseReceiver.onFailure(new Failure.Builder()
@@ -43,43 +98,135 @@ final class Job implements Runnable {
.exception(e)
.build());
} finally {
connection.disconnect();
// TODO: close the response body
// TODO: release the HTTP engine (potentially multiple!)
dispatcher.finished(this);
}
}
private HttpURLConnection sendRequest() throws IOException {
for (int i = 0; i < request.headerCount(); i++) {
connection.addRequestProperty(request.headerName(i), request.headerValue(i));
}
Request.Body body = request.body();
if (body != null) {
connection.setDoOutput(true);
long contentLength = body.contentLength();
if (contentLength == -1 || contentLength > Integer.MAX_VALUE) {
connection.setChunkedStreamingMode(0);
} else {
// Don't call setFixedLengthStreamingMode(long); that's only available on Java 1.7+.
connection.setFixedLengthStreamingMode((int) contentLength);
private Response execute() throws IOException {
Connection connection = null;
Response redirectedBy = null;
while (true) {
HttpEngine engine = newEngine(connection);
Request.Body body = request.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType == null) throw new IllegalStateException("contentType == null");
if (engine.getRequestHeaders().getContentType() == null) {
engine.getRequestHeaders().setContentType(contentType.toString());
}
}
body.writeTo(connection.getOutputStream());
engine.sendRequest();
if (body != null) {
body.writeTo(engine.getRequestBody());
}
engine.readResponse();
int responseCode = engine.getResponseCode();
Dispatcher.RealResponseBody responseBody = new Dispatcher.RealResponseBody(
engine.getResponseHeaders(), engine.getResponseBody());
Response response = new Response.Builder(request, responseCode)
.rawHeaders(engine.getResponseHeaders().getHeaders())
.body(responseBody)
.redirectedBy(redirectedBy)
.build();
Request redirect = processResponse(engine, response);
if (redirect == null) {
engine.automaticallyReleaseConnectionToPool();
return response;
}
// TODO: fail if too many redirects
// TODO: fail if not following redirects
// TODO: release engine
connection = sameConnection(request, redirect) ? engine.getConnection() : null;
redirectedBy = response;
request = redirect;
}
return connection;
}
private Response readResponse() throws IOException {
int responseCode = connection.getResponseCode();
Response.Builder responseBuilder = new Response.Builder(request, responseCode);
for (int i = 0; true; i++) {
String name = connection.getHeaderFieldKey(i);
if (name == null) break;
String value = connection.getHeaderField(i);
responseBuilder.addHeader(name, value);
HttpEngine newEngine(Connection connection) throws IOException {
String protocol = request.url().getProtocol();
RawHeaders requestHeaders = request.rawHeaders();
if (protocol.equals("http")) {
return new HttpEngine(client, this, request.method(), requestHeaders, connection, null);
} else if (protocol.equals("https")) {
return new HttpsEngine(client, this, request.method(), requestHeaders, connection, null);
} else {
throw new AssertionError();
}
}
responseBuilder.body(new Dispatcher.RealResponseBody(connection, connection.getInputStream()));
// TODO: set redirectedBy
return responseBuilder.build();
/**
* Figures out the HTTP request to make in response to receiving {@code
* response}. This will either add authentication headers or follow
* redirects. If a follow-up is either unnecessary or not applicable, this
* returns null.
*/
private Request processResponse(HttpEngine engine, Response response) throws IOException {
Request request = response.request();
Proxy selectedProxy = engine.getConnection() != null
? engine.getConnection().getRoute().getProxy()
: client.getProxy();
int responseCode = response.code();
switch (responseCode) {
case HTTP_PROXY_AUTH:
if (selectedProxy.type() != Proxy.Type.HTTP) {
throw new ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy");
}
// fall-through
case HTTP_UNAUTHORIZED:
RawHeaders successorRequestHeaders = request.rawHeaders();
boolean credentialsFound = HttpAuthenticator.processAuthHeader(client.getAuthenticator(),
response.code(), response.rawHeaders(), successorRequestHeaders, selectedProxy,
this.request.url());
return credentialsFound
? request.newBuilder().rawHeaders(successorRequestHeaders).build()
: null;
case HTTP_MULT_CHOICE:
case HTTP_MOVED_PERM:
case HTTP_MOVED_TEMP:
case HTTP_SEE_OTHER:
case HTTP_TEMP_REDIRECT:
String method = request.method();
if (responseCode == HTTP_TEMP_REDIRECT && !method.equals("GET") && !method.equals("HEAD")) {
// "If the 307 status code is received in response to a request other than GET or HEAD,
// the user agent MUST NOT automatically redirect the request"
return null;
}
String location = response.header("Location");
if (location == null) {
return null;
}
URL url = new URL(request.url(), location);
if (!url.getProtocol().equals("https") && !url.getProtocol().equals("http")) {
return null; // Don't follow redirects to unsupported protocols.
}
return this.request.newBuilder().url(url).build();
default:
return null;
}
}
private boolean sameConnection(Request a, Request b) {
return a.url().getHost().equals(b.url().getHost())
&& getEffectivePort(a.url()) == getEffectivePort(b.url())
&& a.url().getProtocol().equals(b.url().getProtocol());
}
}

View File

@@ -324,7 +324,7 @@ public final class OkHttpClient implements URLStreamHandlerFactory {
// Create the HttpURLConnection immediately so the enqueued job gets the current settings of
// this client. Otherwise changes to this client (socket factory, redirect policy, etc.) may
// incorrectly be reflected in the request when it is dispatched later.
dispatcher.enqueue(open(request.url()), request, responseReceiver);
dispatcher.enqueue(copyWithDefaults(), request, responseReceiver);
}
/**

View File

@@ -74,6 +74,10 @@ import java.util.Set;
return headers.names();
}
RawHeaders rawHeaders() {
return new RawHeaders(headers);
}
public int headerCount() {
return headers.length();
}
@@ -94,16 +98,21 @@ import java.util.Set;
return tag;
}
public abstract static class Body {
/**
* Returns the Content-Type header for this body, or null if the content
* type is unknown.
*/
public MediaType contentType() {
return null;
}
Builder newBuilder() {
return new Builder(url)
.method(method, body)
.rawHeaders(headers)
.tag(tag);
}
/** Returns the number of bytes in this body, or -1 if that count is unknown. */
public abstract static class Body {
/** Returns the Content-Type header for this body. */
public abstract MediaType contentType();
/**
* Returns the number of bytes that will be written to {@code out} in a call
* to {@link #writeTo}, or -1 if that count is unknown.
*/
public long contentLength() {
return -1;
}
@@ -183,7 +192,7 @@ import java.util.Set;
public static class Builder {
private URL url;
private String method = "GET";
private final RawHeaders headers = new RawHeaders();
private RawHeaders headers = new RawHeaders();
private Body body;
private Object tag;
@@ -228,6 +237,11 @@ import java.util.Set;
return this;
}
Builder rawHeaders(RawHeaders rawHeaders) {
headers = new RawHeaders(rawHeaders);
return this;
}
public Builder get() {
return method("GET", null);
}

View File

@@ -22,9 +22,12 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Set;
import static com.squareup.okhttp.internal.Util.UTF_8;
/**
* An HTTP response. Instances of this class are not immutable: the response
* body is a one-shot value that may be consumed only once. All other properties
@@ -93,6 +96,10 @@ import java.util.Set;
return headers.getFieldName(index);
}
RawHeaders rawHeaders() {
return new RawHeaders(headers);
}
public String headerValue(int index) {
return headers.getValue(index);
}
@@ -112,17 +119,34 @@ import java.util.Set;
}
public abstract static class Body {
public String contentType() {
return null;
}
/** Multiple calls to {@link #charStream()} must return the same instance. */
private Reader reader;
public long contentLength() {
return -1;
}
/**
* Returns true if further data from this response body should be read at
* this time. For asynchronous transports like SPDY and HTTP/2.0, this will
* return false once all locally-available body bytes have been read.
*
* <p>Clients with many concurrent downloads can use this method to reduce
* the number of idle threads blocking on reads. See {@link
* Receiver#onResponse} for details.
*/
// <h3>Body.ready() vs. InputStream.available()</h3>
// TODO: Can we fix response bodies to implement InputStream.available well?
// The deflater implementation is broken by default but we could do better.
public abstract boolean ready() throws IOException;
public abstract MediaType contentType();
/**
* Returns the number of bytes in that will returned by {@link #bytes}, or
* {@link #byteStream}, or -1 if unknown.
*/
public abstract long contentLength();
public abstract InputStream byteStream() throws IOException;
public byte[] bytes() throws IOException {
public final byte[] bytes() throws IOException {
long contentLength = contentLength();
if (contentLength > Integer.MAX_VALUE) {
throw new IOException("Cannot buffer entire body for content length: " + contentLength);
@@ -143,33 +167,77 @@ import java.util.Set;
}
/**
* Returns the response bytes as a UTF-8 character stream. Do not call this
* method if the response content is not a UTF-8 character stream.
* Returns the response as a character stream decoded with the charset
* of the Content-Type header. If that header is either absent or lacks a
* charset, this will attempt to decode the response body as UTF-8.
*/
public Reader charStream() throws IOException {
// TODO: parse content-type.
return new InputStreamReader(byteStream(), "UTF-8");
public final Reader charStream() throws IOException {
if (reader == null) {
reader = new InputStreamReader(byteStream(), charset());
}
return reader;
}
/**
* Returns the response bytes as a UTF-8 string. Do not call this method if
* the response content is not a UTF-8 character stream.
* Returns the response as a string decoded with the charset of the
* Content-Type header. If that header is either absent or lacks a charset,
* this will attempt to decode the response body as UTF-8.
*/
public String string() throws IOException {
// TODO: parse content-type.
return new String(bytes(), "UTF-8");
public final String string() throws IOException {
return new String(bytes(), charset().name());
}
private Charset charset() {
MediaType contentType = contentType();
return contentType != null ? contentType.charset(UTF_8) : UTF_8;
}
}
public interface Receiver {
/**
* Called when the request could not be executed due to a connectivity
* problem or timeout. Because networks can fail during an exchange, it is
* possible that the remote server accepted the request before the failure.
*/
void onFailure(Failure failure);
void onResponse(Response response) throws IOException;
/**
* Called when the HTTP response was successfully returned by the remote
* server. The receiver may proceed to read the response body with the
* response's {@link #body} method.
*
* <p>Note that transport-layer success (receiving a HTTP response code,
* headers and body) does not necessarily indicate application-layer
* success: {@code response} may still indicate an unhappy HTTP response
* code like 404 or 500.
*
* <h3>Non-blocking responses</h3>
*
* <p>Receivers do not need to block while waiting for the response body to
* download. Instead, they can get called back as data arrives. Use {@link
* Body#ready} to check if bytes should be read immediately. While there is
* data ready, read it. If there isn't, return false: receivers will be
* called back with {@code onResponse()} as additional data is downloaded.
*
* <p>Return true to indicate that the receiver has finished handling the
* response body. If the response body has unread data, it will be
* discarded.
*
* <p>When the response body has been fully consumed the returned value is
* undefined.
*
* <p>The current implementation of {@link Body#ready} always returns true
* when the underlying transport is HTTP/1. This results in blocking on that
* transport. For effective non-blocking your server must support SPDY or
* HTTP/2.
*/
boolean onResponse(Response response) throws IOException;
}
public static class Builder {
private final Request request;
private final int code;
private final RawHeaders headers = new RawHeaders();
private RawHeaders headers = new RawHeaders();
private Body body;
private Response redirectedBy;
@@ -198,6 +266,11 @@ import java.util.Set;
return this;
}
Builder rawHeaders(RawHeaders rawHeaders) {
headers = new RawHeaders(rawHeaders);
return this;
}
public Builder body(Body body) {
this.body = body;
return this;

View File

@@ -370,10 +370,10 @@ public final class HttpTransport implements Transport {
/** An HTTP body with a fixed length specified in advance. */
private static class FixedLengthInputStream extends AbstractHttpInputStream {
private int bytesRemaining;
private long bytesRemaining;
public FixedLengthInputStream(InputStream is, CacheRequest cacheRequest, HttpEngine httpEngine,
int length) throws IOException {
long length) throws IOException {
super(is, httpEngine, cacheRequest);
bytesRemaining = length;
if (bytesRemaining == 0) {
@@ -387,7 +387,7 @@ public final class HttpTransport implements Transport {
if (bytesRemaining == 0) {
return -1;
}
int read = in.read(buffer, offset, Math.min(count, bytesRemaining));
int read = in.read(buffer, offset, (int) Math.min(count, bytesRemaining));
if (read == -1) {
unexpectedEndOfInput(); // the server didn't supply the promised content length
throw new ProtocolException("unexpected end of stream");
@@ -402,7 +402,7 @@ public final class HttpTransport implements Transport {
@Override public int available() throws IOException {
checkNotClosed();
return bytesRemaining == 0 ? 0 : Math.min(in.available(), bytesRemaining);
return bytesRemaining == 0 ? 0 : (int) Math.min(in.available(), bytesRemaining);
}
@Override public void close() throws IOException {

View File

@@ -60,7 +60,7 @@ import static com.squareup.okhttp.internal.Util.getEffectivePort;
public class HttpURLConnectionImpl extends HttpURLConnection implements Policy {
/** Numeric status code, 307: Temporary Redirect. */
static final int HTTP_TEMP_REDIRECT = 307;
public static final int HTTP_TEMP_REDIRECT = 307;
/**
* How many redirects should we follow? Chrome follows 21; Firefox, curl,
@@ -311,7 +311,7 @@ public class HttpURLConnectionImpl extends HttpURLConnection implements Policy {
// Although RFC 2616 10.3.2 specifies that a HTTP_MOVED_PERM
// redirect should keep the same method, Chrome, Firefox and the
// RI all issue GETs when following any redirect.
int responseCode = getResponseCode();
int responseCode = httpEngine.getResponseCode();
if (responseCode == HTTP_MULT_CHOICE
|| responseCode == HTTP_MOVED_PERM
|| responseCode == HTTP_MOVED_TEMP
@@ -321,8 +321,7 @@ public class HttpURLConnectionImpl extends HttpURLConnection implements Policy {
}
if (requestBody != null && !(requestBody instanceof RetryableOutputStream)) {
throw new HttpRetryException("Cannot retry streamed HTTP body",
httpEngine.getResponseCode());
throw new HttpRetryException("Cannot retry streamed HTTP body", responseCode);
}
if (retry == Retry.DIFFERENT_CONNECTION) {
@@ -413,7 +412,7 @@ public class HttpURLConnectionImpl extends HttpURLConnection implements Policy {
/**
* Returns the retry action to take for the current response headers. The
* headers, proxy and target URL or this connection may be adjusted to
* headers, proxy and target URL for this connection may be adjusted to
* prepare for a follow up request.
*/
private Retry processResponseHeaders() throws IOException {

View File

@@ -114,8 +114,9 @@ public final class ResponseHeaders {
private String contentEncoding;
private String transferEncoding;
private int contentLength = -1;
private long contentLength = -1;
private String connection;
private String contentType;
public ResponseHeaders(URI uri, RawHeaders headers) {
this.uri = uri;
@@ -172,9 +173,11 @@ public final class ResponseHeaders {
transferEncoding = value;
} else if ("Content-Length".equalsIgnoreCase(fieldName)) {
try {
contentLength = Integer.parseInt(value);
contentLength = Long.parseLong(value);
} catch (NumberFormatException ignored) {
}
} else if ("Content-Type".equalsIgnoreCase(fieldName)) {
contentType = value;
} else if ("Connection".equalsIgnoreCase(fieldName)) {
connection = value;
} else if (SENT_MILLIS.equalsIgnoreCase(fieldName)) {
@@ -263,10 +266,14 @@ public final class ResponseHeaders {
return contentEncoding;
}
public int getContentLength() {
public long getContentLength() {
return contentLength;
}
public String getContentType() {
return contentType;
}
public String getConnection() {
return connection;
}

View File

@@ -68,5 +68,6 @@ public final class AsyncApiTest {
RecordedRequest recordedRequest = server.takeRequest();
assertEquals("def", recordedRequest.getUtf8Body());
assertEquals("3", recordedRequest.getHeader("Content-Length"));
assertEquals("text/plain; charset=utf-8", recordedRequest.getHeader("Content-Type"));
}
}

View File

@@ -15,9 +15,12 @@
*/
package com.squareup.okhttp;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
@@ -26,6 +29,8 @@ import java.util.concurrent.TimeUnit;
public class RecordingReceiver implements Response.Receiver {
public static final long TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10);
private final Map<Request, ByteArrayOutputStream> inFlightResponses
= new LinkedHashMap<Request, ByteArrayOutputStream>();
private final List<RecordedResponse> responses = new ArrayList<RecordedResponse>();
@Override public synchronized void onFailure(Failure failure) {
@@ -33,10 +38,31 @@ public class RecordingReceiver implements Response.Receiver {
notifyAll();
}
@Override public synchronized void onResponse(Response response) throws IOException {
responses.add(new RecordedResponse(
response.request(), response, response.body().string(), null));
notifyAll();
@Override public synchronized boolean onResponse(Response response) throws IOException {
ByteArrayOutputStream out = inFlightResponses.get(response.request());
if (out == null) {
out = new ByteArrayOutputStream();
inFlightResponses.put(response.request(), out);
}
byte[] buffer = new byte[1024];
Response.Body body = response.body();
while (body.ready()) {
int c = body.byteStream().read(buffer);
if (c == -1) {
inFlightResponses.remove(response.request());
responses.add(new RecordedResponse(
response.request(), response, out.toString("UTF-8"), null));
notifyAll();
return true;
}
out.write(buffer, 0, c);
}
return false;
}
/**