1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-22 15:42:00 +03:00

Merge pull request #532 from square/jwilson_0215_statemachine

Enforce the implicit state machine in HttpConnection.
This commit is contained in:
Jesse Wilson
2014-02-15 13:30:23 -05:00
8 changed files with 293 additions and 189 deletions

View File

@@ -312,6 +312,7 @@ public final class Connection implements Closeable {
while (true) {
tunnelConnection.writeRequest(request.headers(), requestLine);
Response response = tunnelConnection.readResponse().request(request).build();
tunnelConnection.emptyResponseBody();
switch (response.code()) {
case HTTP_OK:

View File

@@ -1,107 +0,0 @@
/*
* Copyright (C) 2010 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.squareup.okhttp.internal.http;
import com.squareup.okhttp.internal.Util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.CacheRequest;
/**
* An input stream for the body of an HTTP response.
*
* <p>Since a single socket's input stream may be used to read multiple HTTP
* responses from the same server, subclasses shouldn't close the socket stream.
*
* <p>A side effect of reading an HTTP response is that the response cache
* is populated. If the stream is closed early, that cache entry will be
* invalidated.
*/
abstract class AbstractHttpInputStream extends InputStream {
protected final InputStream in;
protected final HttpEngine httpEngine;
private final CacheRequest cacheRequest;
protected final OutputStream cacheBody;
protected boolean closed;
AbstractHttpInputStream(InputStream in, HttpEngine httpEngine, CacheRequest cacheRequest)
throws IOException {
this.in = in;
this.httpEngine = httpEngine;
OutputStream cacheBody = cacheRequest != null ? cacheRequest.getBody() : null;
// Some apps return a null body; for compatibility we treat that like a null cache request.
if (cacheBody == null) {
cacheRequest = null;
}
this.cacheBody = cacheBody;
this.cacheRequest = cacheRequest;
}
/**
* read() is implemented using read(byte[], int, int) so subclasses only
* need to override the latter.
*/
@Override public final int read() throws IOException {
return Util.readSingleByte(this);
}
protected final void checkNotClosed() throws IOException {
if (closed) {
throw new IOException("stream closed");
}
}
protected final void cacheWrite(byte[] buffer, int offset, int count) throws IOException {
if (cacheBody != null) {
cacheBody.write(buffer, offset, count);
}
}
/**
* Closes the cache entry and makes the socket available for reuse. This
* should be invoked when the end of the body has been reached.
*/
protected final void endOfInput() throws IOException {
if (cacheRequest != null) {
cacheBody.close();
}
httpEngine.release(false);
}
/**
* Calls abort on the cache entry and disconnects the socket. This
* should be invoked when the connection is closed unexpectedly to
* invalidate the cache entry and to prevent the HTTP connection from
* being reused. HTTP messages are sent in serial so whenever a message
* cannot be read to completion, subsequent messages cannot be read
* either and the connection must be discarded.
*
* <p>An earlier implementation skipped the remaining bytes, but this
* requires that the entire transfer be completed. If the intention was
* to cancel the transfer, closing the connection is the only solution.
*/
protected final void unexpectedEndOfInput() {
if (cacheRequest != null) {
cacheRequest.abort();
}
httpEngine.release(true);
}
}

View File

@@ -33,11 +33,39 @@ import java.net.Socket;
import static com.squareup.okhttp.internal.Util.checkOffsetAndCount;
import static com.squareup.okhttp.internal.http.StatusLine.HTTP_CONTINUE;
/** A socket connection that can be used to send HTTP/1.1 messages. */
/**
* A socket connection that can be used to send HTTP/1.1 messages. This class
* strictly enforces the following lifecycle:
* <ol>
* <li>{@link #writeRequest Send request headers}.
* <li>Open the request body output stream. Either {@link
* #newFixedLengthOutputStream fixed-length} or {@link
* #newChunkedOutputStream chunked}.
* <li>Write to and then close that stream.
* <li>{@link #readResponse Read response headers}.
* <li>Open the HTTP response body input stream. Either {@link
* #newFixedLengthInputStream fixed-length}, {@link #newChunkedInputStream
* chunked} or {@link #newUnknownLengthInputStream unknown length}.
* <li>Read from and close that stream.
* </ol>
* <p>Exchanges that do not have a request body may skip creating and closing
* the request body. Exchanges that do not have a response body must call {@link
* #emptyResponseBody}.
*/
public final class HttpConnection {
private static final int STATE_IDLE = 0; // Idle connections are ready to write request headers.
private static final int STATE_OPEN_REQUEST_BODY = 1;
private static final int STATE_WRITING_REQUEST_BODY = 2;
private static final int STATE_READ_RESPONSE_HEADERS = 3;
private static final int STATE_OPEN_RESPONSE_BODY = 4;
private static final int STATE_READING_RESPONSE_BODY = 5;
private static final int STATE_CLOSED = 6;
private final InputStream in;
private final OutputStream out;
private int state = STATE_IDLE;
public HttpConnection(InputStream in, OutputStream out) {
this.in = in;
this.out = out;
@@ -49,6 +77,7 @@ public final class HttpConnection {
/** Returns bytes of a request header for sending on an HTTP transport. */
public void writeRequest(Headers headers, String requestLine) throws IOException {
if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
StringBuilder result = new StringBuilder(256);
result.append(requestLine).append("\r\n");
for (int i = 0; i < headers.size(); i ++) {
@@ -59,10 +88,15 @@ public final class HttpConnection {
}
result.append("\r\n");
out.write(result.toString().getBytes("ISO-8859-1"));
state = STATE_OPEN_REQUEST_BODY;
}
/** Parses bytes of a response header from an HTTP transport. */
public Response.Builder readResponse() throws IOException {
if (state != STATE_OPEN_REQUEST_BODY
&& state != STATE_READ_RESPONSE_HEADERS) {
throw new IllegalStateException("state: " + state);
}
while (true) {
String statusLineString = Util.readAsciiLine(in);
StatusLine statusLine = new StatusLine(statusLineString);
@@ -76,7 +110,10 @@ public final class HttpConnection {
OkHeaders.readHeaders(headersBuilder, in);
responseBuilder.headers(headersBuilder.build());
if (statusLine.code() != HTTP_CONTINUE) return responseBuilder;
if (statusLine.code() != HTTP_CONTINUE) {
state = STATE_OPEN_RESPONSE_BODY;
return responseBuilder;
}
}
}
@@ -109,25 +146,50 @@ public final class HttpConnection {
}
public OutputStream newChunkedOutputStream(int defaultChunkLength) {
if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
state = STATE_WRITING_REQUEST_BODY;
return new ChunkedOutputStream(out, defaultChunkLength);
}
public OutputStream newFixedLengthOutputStream(long contentLength) {
if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
state = STATE_WRITING_REQUEST_BODY;
return new FixedLengthOutputStream(out, contentLength);
}
public void writeRequestBody(RetryableOutputStream requestBody) throws IOException {
if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
state = STATE_READ_RESPONSE_HEADERS;
requestBody.writeToSocket(out);
}
public InputStream newFixedLengthInputStream(
CacheRequest cacheRequest, HttpEngine httpEngine, long length) throws IOException {
if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
state = STATE_READING_RESPONSE_BODY;
return new FixedLengthInputStream(in, cacheRequest, httpEngine, length);
}
/**
* Call this to advance past a response body for HTTP responses that do not
* have a response body.
*/
public void emptyResponseBody() {
if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
state = STATE_IDLE;
}
public InputStream newChunkedInputStream(
CacheRequest cacheRequest, HttpEngine httpEngine) throws IOException {
if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
state = STATE_READING_RESPONSE_BODY;
return new ChunkedInputStream(in, cacheRequest, httpEngine);
}
public InputStream newUnknownLengthInputStream(CacheRequest cacheRequest, HttpEngine httpEngine)
throws IOException {
if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
state = STATE_READING_RESPONSE_BODY;
return new UnknownLengthHttpInputStream(in, cacheRequest, httpEngine);
}
@@ -135,13 +197,8 @@ public final class HttpConnection {
return discardStream(httpEngine, responseBodyIn);
}
public void writeRequestBody(RetryableOutputStream requestBody) throws IOException {
requestBody.writeToSocket(out);
}
/** An HTTP body with a fixed length known in advance. */
private static final class FixedLengthOutputStream extends AbstractOutputStream {
private final class FixedLengthOutputStream extends AbstractOutputStream {
private final OutputStream socketOut;
private long bytesRemaining;
@@ -175,21 +232,22 @@ public final class HttpConnection {
if (bytesRemaining > 0) {
throw new ProtocolException("unexpected end of stream");
}
state = STATE_READ_RESPONSE_HEADERS;
}
}
private static final byte[] CRLF = { '\r', '\n' };
private static final byte[] HEX_DIGITS = {
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'
};
private static final byte[] FINAL_CHUNK = new byte[] { '0', '\r', '\n', '\r', '\n' };
/**
* An HTTP body with alternating chunk sizes and chunk bodies. Chunks are
* buffered until {@code maxChunkLength} bytes are ready, at which point the
* chunk is written and the buffer is cleared.
*/
private static final class ChunkedOutputStream extends AbstractOutputStream {
private static final byte[] CRLF = { '\r', '\n' };
private static final byte[] HEX_DIGITS = {
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'
};
private static final byte[] FINAL_CHUNK = new byte[] { '0', '\r', '\n', '\r', '\n' };
private final class ChunkedOutputStream extends AbstractOutputStream {
/** Scratch space for up to 8 hex digits, and then a constant CRLF. */
private final byte[] hex = { 0, 0, 0, 0, 0, 0, 0, 0, '\r', '\n' };
@@ -272,6 +330,7 @@ public final class HttpConnection {
closed = true;
writeBufferedChunkToSocket();
socketOut.write(FINAL_CHUNK);
state = STATE_READ_RESPONSE_HEADERS;
}
private void writeBufferedChunkToSocket() throws IOException {
@@ -287,8 +346,85 @@ public final class HttpConnection {
}
}
private class AbstractHttpInputStream extends InputStream {
protected final InputStream in;
protected final HttpEngine httpEngine;
private final CacheRequest cacheRequest;
protected final OutputStream cacheBody;
protected boolean closed;
AbstractHttpInputStream(InputStream in, HttpEngine httpEngine, CacheRequest cacheRequest)
throws IOException {
this.in = in;
this.httpEngine = httpEngine;
OutputStream cacheBody = cacheRequest != null ? cacheRequest.getBody() : null;
// Some apps return a null body; for compatibility we treat that like a null cache request.
if (cacheBody == null) {
cacheRequest = null;
}
this.cacheBody = cacheBody;
this.cacheRequest = cacheRequest;
}
/**
* read() is implemented using read(byte[], int, int) so subclasses only
* need to override the latter.
*/
@Override public final int read() throws IOException {
return Util.readSingleByte(this);
}
protected final void checkNotClosed() throws IOException {
if (closed) {
throw new IOException("stream closed");
}
}
protected final void cacheWrite(byte[] buffer, int offset, int count) throws IOException {
if (cacheBody != null) {
cacheBody.write(buffer, offset, count);
}
}
/**
* Closes the cache entry and makes the socket available for reuse. This
* should be invoked when the end of the body has been reached.
*/
protected final void endOfInput() throws IOException {
if (state != STATE_READING_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
if (cacheRequest != null) {
cacheBody.close();
}
httpEngine.release(false);
state = STATE_IDLE;
}
/**
* Calls abort on the cache entry and disconnects the socket. This
* should be invoked when the connection is closed unexpectedly to
* invalidate the cache entry and to prevent the HTTP connection from
* being reused. HTTP messages are sent in serial so whenever a message
* cannot be read to completion, subsequent messages cannot be read
* either and the connection must be discarded.
*
* <p>An earlier implementation skipped the remaining bytes, but this
* requires that the entire transfer be completed. If the intention was
* to cancel the transfer, closing the connection is the only solution.
*/
protected final void unexpectedEndOfInput() {
if (cacheRequest != null) {
cacheRequest.abort();
}
httpEngine.release(true);
state = STATE_CLOSED;
}
}
/** An HTTP body with a fixed length specified in advance. */
private static class FixedLengthInputStream extends AbstractHttpInputStream {
private class FixedLengthInputStream extends AbstractHttpInputStream {
private long bytesRemaining;
public FixedLengthInputStream(InputStream is, CacheRequest cacheRequest, HttpEngine httpEngine,
@@ -336,7 +472,7 @@ public final class HttpConnection {
}
/** An HTTP body with alternating chunk sizes and chunk bodies. */
private static class ChunkedInputStream extends AbstractHttpInputStream {
private class ChunkedInputStream extends AbstractHttpInputStream {
private static final int NO_CHUNK_YET = -1;
private int bytesRemainingInChunk = NO_CHUNK_YET;
private boolean hasMoreChunks = true;
@@ -411,4 +547,45 @@ public final class HttpConnection {
closed = true;
}
}
/** An HTTP message body terminated by the end of the underlying stream. */
class UnknownLengthHttpInputStream extends AbstractHttpInputStream {
private boolean inputExhausted;
UnknownLengthHttpInputStream(InputStream in, CacheRequest cacheRequest, HttpEngine httpEngine)
throws IOException {
super(in, httpEngine, cacheRequest);
}
@Override public int read(byte[] buffer, int offset, int count) throws IOException {
checkOffsetAndCount(buffer.length, offset, count);
checkNotClosed();
if (in == null || inputExhausted) {
return -1;
}
int read = in.read(buffer, offset, count);
if (read == -1) {
inputExhausted = true;
endOfInput();
return -1;
}
cacheWrite(buffer, offset, read);
return read;
}
@Override public int available() throws IOException {
checkNotClosed();
return in == null ? 0 : in.available();
}
@Override public void close() throws IOException {
if (closed) {
return;
}
closed = true;
if (!inputExhausted) {
unexpectedEndOfInput();
}
}
}
}

View File

@@ -552,6 +552,7 @@ public class HttpEngine {
if (responseSource == ResponseSource.CONDITIONAL_CACHE) {
if (validatingResponse.validate(response)) {
transport.emptyTransferStream();
release(false);
response = combine(validatingResponse, response);

View File

@@ -126,7 +126,7 @@ public final class HttpTransport implements Transport {
return false;
}
if (responseBodyIn instanceof UnknownLengthHttpInputStream) {
if (responseBodyIn instanceof HttpConnection.UnknownLengthHttpInputStream) {
return false;
}
@@ -137,6 +137,10 @@ public final class HttpTransport implements Transport {
return true;
}
@Override public void emptyTransferStream() {
httpConnection.emptyResponseBody();
}
@Override public InputStream getTransferStream(CacheRequest cacheRequest) throws IOException {
if (!httpEngine.hasResponseBody()) {
return httpConnection.newFixedLengthInputStream(cacheRequest, httpEngine, 0);

View File

@@ -20,8 +20,8 @@ import com.squareup.okhttp.Headers;
import com.squareup.okhttp.Protocol;
import com.squareup.okhttp.Request;
import com.squareup.okhttp.Response;
import com.squareup.okhttp.internal.bytes.ByteString;
import com.squareup.okhttp.internal.Util;
import com.squareup.okhttp.internal.bytes.ByteString;
import com.squareup.okhttp.internal.spdy.ErrorCode;
import com.squareup.okhttp.internal.spdy.Header;
import com.squareup.okhttp.internal.spdy.SpdyConnection;
@@ -187,6 +187,10 @@ public final class SpdyTransport implements Transport {
.headers(headersBuilder.build());
}
@Override public void emptyTransferStream() {
// Do nothing.
}
@Override public InputStream getTransferStream(CacheRequest cacheRequest) throws IOException {
return new SpdyInputStream(stream, cacheRequest, httpEngine);
}
@@ -226,6 +230,90 @@ public final class SpdyTransport implements Transport {
return prohibited;
}
/**
* An input stream for the body of an HTTP response.
*
* <p>Since a single socket's input stream may be used to read multiple HTTP
* responses from the same server, subclasses shouldn't close the socket stream.
*
* <p>A side effect of reading an HTTP response is that the response cache
* is populated. If the stream is closed early, that cache entry will be
* invalidated.
*/
abstract static class AbstractHttpInputStream extends InputStream {
protected final InputStream in;
protected final HttpEngine httpEngine;
private final CacheRequest cacheRequest;
protected final OutputStream cacheBody;
protected boolean closed;
AbstractHttpInputStream(InputStream in, HttpEngine httpEngine, CacheRequest cacheRequest)
throws IOException {
this.in = in;
this.httpEngine = httpEngine;
OutputStream cacheBody = cacheRequest != null ? cacheRequest.getBody() : null;
// Some apps return a null body; for compatibility we treat that like a null cache request.
if (cacheBody == null) {
cacheRequest = null;
}
this.cacheBody = cacheBody;
this.cacheRequest = cacheRequest;
}
/**
* read() is implemented using read(byte[], int, int) so subclasses only
* need to override the latter.
*/
@Override public final int read() throws IOException {
return Util.readSingleByte(this);
}
protected final void checkNotClosed() throws IOException {
if (closed) {
throw new IOException("stream closed");
}
}
protected final void cacheWrite(byte[] buffer, int offset, int count) throws IOException {
if (cacheBody != null) {
cacheBody.write(buffer, offset, count);
}
}
/**
* Closes the cache entry and makes the socket available for reuse. This
* should be invoked when the end of the body has been reached.
*/
protected final void endOfInput() throws IOException {
if (cacheRequest != null) {
cacheBody.close();
}
httpEngine.release(false);
}
/**
* Calls abort on the cache entry and disconnects the socket. This
* should be invoked when the connection is closed unexpectedly to
* invalidate the cache entry and to prevent the HTTP connection from
* being reused. HTTP messages are sent in serial so whenever a message
* cannot be read to completion, subsequent messages cannot be read
* either and the connection must be discarded.
*
* <p>An earlier implementation skipped the remaining bytes, but this
* requires that the entire transfer be completed. If the intention was
* to cancel the transfer, closing the connection is the only solution.
*/
protected final void unexpectedEndOfInput() {
if (cacheRequest != null) {
cacheRequest.abort();
}
httpEngine.release(true);
}
}
/** An HTTP message body terminated by the end of the underlying stream. */
private static class SpdyInputStream extends AbstractHttpInputStream {
private final SpdyStream stream;

View File

@@ -64,6 +64,9 @@ interface Transport {
/** Read response headers and update the cookie manager. */
Response.Builder readResponseHeaders() throws IOException;
/** Notify the transport that no response body will be read. */
void emptyTransferStream();
// TODO: make this the content stream?
InputStream getTransferStream(CacheRequest cacheRequest) throws IOException;

View File

@@ -1,63 +0,0 @@
/*
* Copyright (C) 2012 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.squareup.okhttp.internal.http;
import java.io.IOException;
import java.io.InputStream;
import java.net.CacheRequest;
import static com.squareup.okhttp.internal.Util.checkOffsetAndCount;
/** An HTTP message body terminated by the end of the underlying stream. */
final class UnknownLengthHttpInputStream extends AbstractHttpInputStream {
private boolean inputExhausted;
UnknownLengthHttpInputStream(InputStream in, CacheRequest cacheRequest, HttpEngine httpEngine)
throws IOException {
super(in, httpEngine, cacheRequest);
}
@Override public int read(byte[] buffer, int offset, int count) throws IOException {
checkOffsetAndCount(buffer.length, offset, count);
checkNotClosed();
if (in == null || inputExhausted) {
return -1;
}
int read = in.read(buffer, offset, count);
if (read == -1) {
inputExhausted = true;
endOfInput();
return -1;
}
cacheWrite(buffer, offset, read);
return read;
}
@Override public int available() throws IOException {
checkNotClosed();
return in == null ? 0 : in.available();
}
@Override public void close() throws IOException {
if (closed) {
return;
}
closed = true;
if (!inputExhausted) {
unexpectedEndOfInput();
}
}
}