mirror of
https://github.com/square/okhttp.git
synced 2026-01-21 03:41:07 +03:00
Merge pull request #590 from square/jwilson_chunk_buffering
Rely in external buffers in ChunkedSink.
This commit is contained in:
@@ -44,9 +44,8 @@ import static com.squareup.okhttp.internal.http.Transport.DISCARD_STREAM_TIMEOUT
|
||||
* strictly enforces the following lifecycle:
|
||||
* <ol>
|
||||
* <li>{@link #writeRequest Send request headers}.
|
||||
* <li>Open the request body output stream. Either {@link
|
||||
* #newFixedLengthOutputStream fixed-length} or {@link
|
||||
* #newChunkedOutputStream chunked}.
|
||||
* <li>Open a sink to write the request body. Either {@link
|
||||
* #newFixedLengthSink fixed-length} or {@link #newChunkedSink chunked}.
|
||||
* <li>Write to and then close that stream.
|
||||
* <li>{@link #readResponse Read response headers}.
|
||||
* <li>Open the HTTP response body input stream. Either {@link
|
||||
@@ -192,16 +191,16 @@ public final class HttpConnection {
|
||||
}
|
||||
}
|
||||
|
||||
public Sink newChunkedOutputStream(int defaultChunkLength) {
|
||||
public Sink newChunkedSink() {
|
||||
if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
|
||||
state = STATE_WRITING_REQUEST_BODY;
|
||||
return new ChunkedSink(sink, defaultChunkLength);
|
||||
return new ChunkedSink();
|
||||
}
|
||||
|
||||
public Sink newFixedLengthOutputStream(long contentLength) {
|
||||
public Sink newFixedLengthSink(long contentLength) {
|
||||
if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
|
||||
state = STATE_WRITING_REQUEST_BODY;
|
||||
return new FixedLengthSink(sink, contentLength);
|
||||
return new FixedLengthSink(contentLength);
|
||||
}
|
||||
|
||||
public void writeRequestBody(RetryableSink requestBody) throws IOException {
|
||||
@@ -241,11 +240,9 @@ public final class HttpConnection {
|
||||
/** An HTTP body with a fixed length known in advance. */
|
||||
private final class FixedLengthSink implements Sink {
|
||||
private boolean closed;
|
||||
private final BufferedSink sink;
|
||||
private long bytesRemaining;
|
||||
|
||||
private FixedLengthSink(BufferedSink sink, long bytesRemaining) {
|
||||
this.sink = sink;
|
||||
private FixedLengthSink(long bytesRemaining) {
|
||||
this.bytesRemaining = bytesRemaining;
|
||||
}
|
||||
|
||||
@@ -284,37 +281,15 @@ public final class HttpConnection {
|
||||
private static final byte[] FINAL_CHUNK = new byte[] { '0', '\r', '\n', '\r', '\n' };
|
||||
|
||||
/**
|
||||
* An HTTP body with alternating chunk sizes and chunk bodies. Chunks are
|
||||
* buffered until {@code maxChunkLength} bytes are ready, at which point the
|
||||
* chunk is written and the buffer is cleared.
|
||||
* An HTTP body with alternating chunk sizes and chunk bodies. It is the
|
||||
* caller's responsibility to buffer chunks; typically by using a buffered
|
||||
* sink with this sink.
|
||||
*/
|
||||
private final class ChunkedSink implements Sink {
|
||||
/** Scratch space for up to 8 hex digits, and then a constant CRLF. */
|
||||
private final byte[] hex = { 0, 0, 0, 0, 0, 0, 0, 0, '\r', '\n' };
|
||||
/** Scratch space for up to 16 hex digits, and then a constant CRLF. */
|
||||
private final byte[] hex = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, '\r', '\n' };
|
||||
|
||||
private boolean closed;
|
||||
private final BufferedSink sink;
|
||||
private final int maxChunkLength;
|
||||
private final OkBuffer bufferedChunk;
|
||||
|
||||
private ChunkedSink(BufferedSink sink, int maxChunkLength) {
|
||||
this.sink = sink;
|
||||
this.maxChunkLength = Math.max(1, dataLength(maxChunkLength));
|
||||
this.bufferedChunk = new OkBuffer();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the amount of data that can be transmitted in a chunk whose total
|
||||
* length (data+headers) is {@code dataPlusHeaderLength}. This is presumably
|
||||
* useful to match sizes with wire-protocol packets.
|
||||
*/
|
||||
private int dataLength(int dataPlusHeaderLength) {
|
||||
int headerLength = 4; // "\r\n" after the size plus another "\r\n" after the data
|
||||
for (int i = dataPlusHeaderLength - headerLength; i > 0; i >>= 4) {
|
||||
headerLength++;
|
||||
}
|
||||
return dataPlusHeaderLength - headerLength;
|
||||
}
|
||||
|
||||
@Override public Sink deadline(Deadline deadline) {
|
||||
return this; // TODO: honor deadline.
|
||||
@@ -322,43 +297,33 @@ public final class HttpConnection {
|
||||
|
||||
@Override public void write(OkBuffer source, long byteCount) throws IOException {
|
||||
if (closed) throw new IllegalStateException("closed");
|
||||
bufferedChunk.write(source, byteCount);
|
||||
if (bufferedChunk.size() > maxChunkLength) {
|
||||
writeBufferedChunkToSocket();
|
||||
}
|
||||
if (byteCount == 0) return;
|
||||
|
||||
writeHex(byteCount);
|
||||
sink.write(source, byteCount);
|
||||
sink.writeUtf8(CRLF);
|
||||
}
|
||||
|
||||
@Override public synchronized void flush() throws IOException {
|
||||
if (closed) return; // Don't throw; this stream might have been closed on the caller's behalf.
|
||||
writeBufferedChunkToSocket();
|
||||
sink.flush();
|
||||
}
|
||||
|
||||
@Override public synchronized void close() throws IOException {
|
||||
if (closed) return;
|
||||
closed = true;
|
||||
writeBufferedChunkToSocket();
|
||||
sink.write(FINAL_CHUNK);
|
||||
state = STATE_READ_RESPONSE_HEADERS;
|
||||
}
|
||||
|
||||
private void writeBufferedChunkToSocket() throws IOException {
|
||||
int size = (int) bufferedChunk.size();
|
||||
if (size == 0) return;
|
||||
|
||||
writeHex(size);
|
||||
sink.write(bufferedChunk, bufferedChunk.size());
|
||||
sink.writeUtf8(CRLF);
|
||||
}
|
||||
|
||||
/**
|
||||
* Equivalent to, but cheaper than writing Integer.toHexString().getBytes()
|
||||
* Equivalent to, but cheaper than writing Long.toHexString().getBytes()
|
||||
* followed by CRLF.
|
||||
*/
|
||||
private void writeHex(int i) throws IOException {
|
||||
int cursor = 8;
|
||||
private void writeHex(long i) throws IOException {
|
||||
int cursor = 16;
|
||||
do {
|
||||
hex[--cursor] = HEX_DIGITS[i & 0xf];
|
||||
hex[--cursor] = HEX_DIGITS[((int) (i & 0xf))];
|
||||
} while ((i >>>= 4) != 0);
|
||||
sink.write(hex, cursor, hex.length - cursor);
|
||||
}
|
||||
|
||||
@@ -24,8 +24,6 @@ import okio.Sink;
|
||||
import okio.Source;
|
||||
|
||||
public final class HttpTransport implements Transport {
|
||||
public static final int DEFAULT_CHUNK_LENGTH = 1024;
|
||||
|
||||
private final HttpEngine httpEngine;
|
||||
private final HttpConnection httpConnection;
|
||||
|
||||
@@ -58,13 +56,13 @@ public final class HttpTransport implements Transport {
|
||||
if ("chunked".equalsIgnoreCase(request.header("Transfer-Encoding"))) {
|
||||
// Stream a request body of unknown length.
|
||||
writeRequestHeaders(request);
|
||||
return httpConnection.newChunkedOutputStream(DEFAULT_CHUNK_LENGTH);
|
||||
return httpConnection.newChunkedSink();
|
||||
}
|
||||
|
||||
if (contentLength != -1) {
|
||||
// Stream a request body of a known length.
|
||||
writeRequestHeaders(request);
|
||||
return httpConnection.newFixedLengthOutputStream(contentLength);
|
||||
return httpConnection.newFixedLengthSink(contentLength);
|
||||
}
|
||||
|
||||
throw new IllegalStateException(
|
||||
|
||||
Reference in New Issue
Block a user