1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-21 03:41:07 +03:00

Use BufferedSink in HttpConnection.

This commit is contained in:
jwilson
2014-02-23 12:55:03 -05:00
parent 2ea11f66b8
commit c16436f8b2
3 changed files with 49 additions and 60 deletions

View File

@@ -23,7 +23,6 @@ import com.squareup.okhttp.internal.http.HttpEngine;
import com.squareup.okhttp.internal.http.HttpTransport;
import com.squareup.okhttp.internal.http.SpdyTransport;
import com.squareup.okhttp.internal.spdy.SpdyConnection;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
@@ -100,8 +99,8 @@ public final class Connection implements Closeable {
if (route.address.sslSocketFactory != null) {
upgradeToTls(tunnelRequest);
} else {
streamWrapper(true);
httpConnection = new HttpConnection(pool, this, source, out);
initSourceAndSink();
httpConnection = new HttpConnection(pool, this, source, sink);
}
connected = true;
}
@@ -155,6 +154,7 @@ public final class Connection implements Closeable {
out = sslSocket.getOutputStream();
in = sslSocket.getInputStream();
handshake = Handshake.get(sslSocket.getSession());
initSourceAndSink();
ByteString maybeProtocol;
Protocol selectedProtocol = Protocol.HTTP_11;
@@ -163,14 +163,12 @@ public final class Connection implements Closeable {
}
if (selectedProtocol.spdyVariant) {
streamWrapper(false);
sslSocket.setSoTimeout(0); // SPDY timeouts are set per-stream.
spdyConnection = new SpdyConnection.Builder(route.address.getUriHost(), true, source, sink)
.protocol(selectedProtocol).build();
spdyConnection.sendConnectionHeader();
} else {
streamWrapper(true);
httpConnection = new HttpConnection(pool, this, source, out);
httpConnection = new HttpConnection(pool, this, source, sink);
}
}
@@ -310,11 +308,13 @@ public final class Connection implements Closeable {
*/
private void makeTunnel(TunnelRequest tunnelRequest) throws IOException {
BufferedSource tunnelSource = Okio.buffer(Okio.source(in));
HttpConnection tunnelConnection = new HttpConnection(pool, this, tunnelSource, out);
BufferedSink tunnelSink = Okio.buffer(Okio.sink(out));
HttpConnection tunnelConnection = new HttpConnection(pool, this, tunnelSource, tunnelSink);
Request request = tunnelRequest.getRequest();
String requestLine = tunnelRequest.requestLine();
while (true) {
tunnelConnection.writeRequest(request.headers(), requestLine);
tunnelConnection.flush();
Response response = tunnelConnection.readResponse().request(request).build();
tunnelConnection.emptyResponseBody();
@@ -340,14 +340,8 @@ public final class Connection implements Closeable {
}
}
// TODO: drop the outputStream option when we use Okio's sink in HttpConnection.
private void streamWrapper(boolean outputStream) throws IOException {
private void initSourceAndSink() throws IOException {
source = Okio.buffer(Okio.source(in));
if (outputStream) {
out = new BufferedOutputStream(out, 256);
} else {
sink = Okio.buffer(Okio.sink(out));
}
sink = Okio.buffer(Okio.sink(out));
}
}

View File

@@ -23,12 +23,12 @@ import com.squareup.okhttp.Protocol;
import com.squareup.okhttp.Response;
import com.squareup.okhttp.internal.AbstractOutputStream;
import com.squareup.okhttp.internal.Util;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.CacheRequest;
import java.net.ProtocolException;
import java.net.Socket;
import okio.BufferedSink;
import okio.BufferedSource;
import okio.Deadline;
import okio.OkBuffer;
@@ -74,17 +74,17 @@ public final class HttpConnection {
private final ConnectionPool pool;
private final Connection connection;
private final BufferedSource source;
private final OutputStream out;
private final BufferedSink sink;
private int state = STATE_IDLE;
private int onIdle = ON_IDLE_HOLD;
public HttpConnection(ConnectionPool pool, Connection connection, BufferedSource source,
OutputStream out) {
BufferedSink sink) {
this.pool = pool;
this.connection = connection;
this.source = source;
this.out = out;
this.sink = sink;
}
/**
@@ -121,22 +121,20 @@ public final class HttpConnection {
}
public void flush() throws IOException {
out.flush();
sink.flush();
}
/** Returns bytes of a request header for sending on an HTTP transport. */
public void writeRequest(Headers headers, String requestLine) throws IOException {
if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
StringBuilder result = new StringBuilder(256);
result.append(requestLine).append("\r\n");
sink.writeUtf8(requestLine).writeUtf8("\r\n");
for (int i = 0; i < headers.size(); i ++) {
result.append(headers.name(i))
.append(": ")
.append(headers.value(i))
.append("\r\n");
sink.writeUtf8(headers.name(i))
.writeUtf8(": ")
.writeUtf8(headers.value(i))
.writeUtf8("\r\n");
}
result.append("\r\n");
out.write(result.toString().getBytes("ISO-8859-1"));
sink.writeUtf8("\r\n");
state = STATE_OPEN_REQUEST_BODY;
}
@@ -214,19 +212,19 @@ public final class HttpConnection {
public OutputStream newChunkedOutputStream(int defaultChunkLength) {
if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
state = STATE_WRITING_REQUEST_BODY;
return new ChunkedOutputStream(out, defaultChunkLength);
return new ChunkedOutputStream(sink, defaultChunkLength);
}
public OutputStream newFixedLengthOutputStream(long contentLength) {
if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
state = STATE_WRITING_REQUEST_BODY;
return new FixedLengthOutputStream(out, contentLength);
return new FixedLengthOutputStream(sink, contentLength);
}
public void writeRequestBody(RetryableOutputStream requestBody) throws IOException {
if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
state = STATE_READ_RESPONSE_HEADERS;
requestBody.writeToSocket(out);
requestBody.writeToSocket(sink);
}
public Source newFixedLengthSource(CacheRequest cacheRequest, long length)
@@ -259,11 +257,11 @@ public final class HttpConnection {
/** An HTTP body with a fixed length known in advance. */
private final class FixedLengthOutputStream extends AbstractOutputStream {
private final OutputStream socketOut;
private final BufferedSink sink;
private long bytesRemaining;
private FixedLengthOutputStream(OutputStream socketOut, long bytesRemaining) {
this.socketOut = socketOut;
private FixedLengthOutputStream(BufferedSink sink, long bytesRemaining) {
this.sink = sink;
this.bytesRemaining = bytesRemaining;
}
@@ -273,7 +271,7 @@ public final class HttpConnection {
if (count > bytesRemaining) {
throw new ProtocolException("expected " + bytesRemaining + " bytes but received " + count);
}
socketOut.write(buffer, offset, count);
sink.write(buffer, offset, count);
bytesRemaining -= count;
}
@@ -281,7 +279,7 @@ public final class HttpConnection {
if (closed) {
return; // don't throw; this stream might have been closed on the caller's behalf
}
socketOut.flush();
sink.flush();
}
@Override public void close() throws IOException {
@@ -296,7 +294,7 @@ public final class HttpConnection {
}
}
private static final byte[] CRLF = { '\r', '\n' };
private static final String CRLF = "\r\n";
private static final byte[] HEX_DIGITS = {
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'
};
@@ -311,14 +309,14 @@ public final class HttpConnection {
/** Scratch space for up to 8 hex digits, and then a constant CRLF. */
private final byte[] hex = { 0, 0, 0, 0, 0, 0, 0, 0, '\r', '\n' };
private final OutputStream socketOut;
private final BufferedSink sink;
private final int maxChunkLength;
private final ByteArrayOutputStream bufferedChunk;
private final OkBuffer bufferedChunk;
private ChunkedOutputStream(OutputStream socketOut, int maxChunkLength) {
this.socketOut = socketOut;
private ChunkedOutputStream(BufferedSink sink, int maxChunkLength) {
this.sink = sink;
this.maxChunkLength = Math.max(1, dataLength(maxChunkLength));
this.bufferedChunk = new ByteArrayOutputStream(maxChunkLength);
this.bufferedChunk = new OkBuffer();
}
/**
@@ -342,20 +340,20 @@ public final class HttpConnection {
while (count > 0) {
int numBytesWritten;
if (bufferedChunk.size() > 0 || count < maxChunkLength) {
if (bufferedChunk.byteCount() > 0 || count < maxChunkLength) {
// fill the buffered chunk and then maybe write that to the stream
numBytesWritten = Math.min(count, maxChunkLength - bufferedChunk.size());
numBytesWritten = (int) Math.min(count, maxChunkLength - bufferedChunk.byteCount());
// TODO: skip unnecessary copies from buffer->bufferedChunk?
bufferedChunk.write(buffer, offset, numBytesWritten);
if (bufferedChunk.size() == maxChunkLength) {
if (bufferedChunk.byteCount() == maxChunkLength) {
writeBufferedChunkToSocket();
}
} else {
// write a single chunk of size maxChunkLength to the stream
numBytesWritten = maxChunkLength;
writeHex(numBytesWritten);
socketOut.write(buffer, offset, numBytesWritten);
socketOut.write(CRLF);
sink.write(buffer, offset, numBytesWritten);
sink.writeUtf8(CRLF);
}
offset += numBytesWritten;
@@ -372,7 +370,7 @@ public final class HttpConnection {
do {
hex[--cursor] = HEX_DIGITS[i & 0xf];
} while ((i >>>= 4) != 0);
socketOut.write(hex, cursor, hex.length - cursor);
sink.write(hex, cursor, hex.length - cursor);
}
@Override public synchronized void flush() throws IOException {
@@ -380,7 +378,7 @@ public final class HttpConnection {
return; // don't throw; this stream might have been closed on the caller's behalf
}
writeBufferedChunkToSocket();
socketOut.flush();
sink.flush();
}
@Override public synchronized void close() throws IOException {
@@ -389,20 +387,17 @@ public final class HttpConnection {
}
closed = true;
writeBufferedChunkToSocket();
socketOut.write(FINAL_CHUNK);
sink.write(FINAL_CHUNK);
state = STATE_READ_RESPONSE_HEADERS;
}
private void writeBufferedChunkToSocket() throws IOException {
int size = bufferedChunk.size();
if (size <= 0) {
return;
}
int size = (int) bufferedChunk.byteCount();
if (size == 0) return;
writeHex(size);
bufferedChunk.writeTo(socketOut);
bufferedChunk.reset();
socketOut.write(CRLF);
sink.write(bufferedChunk, bufferedChunk.byteCount());
sink.writeUtf8(CRLF);
}
}

View File

@@ -19,8 +19,8 @@ package com.squareup.okhttp.internal.http;
import com.squareup.okhttp.internal.AbstractOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ProtocolException;
import okio.BufferedSink;
import static com.squareup.okhttp.internal.Util.checkOffsetAndCount;
@@ -69,7 +69,7 @@ final class RetryableOutputStream extends AbstractOutputStream {
return content.size();
}
public void writeToSocket(OutputStream socketOut) throws IOException {
content.writeTo(socketOut);
public void writeToSocket(BufferedSink socketOut) throws IOException {
content.writeTo(socketOut.outputStream());
}
}