From ab78dadb4bb94db74770e179e04e2ed791c4a51e Mon Sep 17 00:00:00 2001 From: jwilson Date: Sun, 23 Feb 2014 11:42:59 -0500 Subject: [PATCH] Use BufferedSink in SPDY and HTTP/2. --- .../okhttp/internal/spdy/HpackDraft05.java | 18 +- .../okhttp/internal/spdy/Http20Draft09.java | 95 +++-- .../squareup/okhttp/internal/spdy/Spdy3.java | 143 ++++---- .../okhttp/internal/spdy/SpdyConnection.java | 16 +- .../okhttp/internal/spdy/SpdyStream.java | 6 +- .../okhttp/internal/spdy/Variant.java | 4 +- .../internal/spdy/HpackDraft05Test.java | 336 ++++++++---------- .../internal/spdy/Http20Draft09Test.java | 26 +- .../okhttp/internal/spdy/MockSpdyPeer.java | 24 +- .../okhttp/internal/spdy/Spdy3Test.java | 12 +- .../internal/spdy/SpdyConnectionTest.java | 2 +- .../java/com/squareup/okhttp/Connection.java | 19 +- okio/src/main/java/okio/DeflaterSink.java | 5 +- okio/src/main/java/okio/RealBufferedSink.java | 1 + 14 files changed, 344 insertions(+), 363 deletions(-) diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/HpackDraft05.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/HpackDraft05.java index e16f038d3..0f51db6c3 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/HpackDraft05.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/HpackDraft05.java @@ -2,7 +2,6 @@ package com.squareup.okhttp.internal.spdy; import com.squareup.okhttp.internal.BitArray; import java.io.IOException; -import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -11,6 +10,7 @@ import java.util.List; import java.util.Map; import okio.BufferedSource; import okio.ByteString; +import okio.OkBuffer; import okio.Okio; import okio.Source; @@ -399,9 +399,9 @@ final class HpackDraft05 { } static final class Writer { - private final OutputStream out; + private final OkBuffer out; - Writer(OutputStream out) { + Writer(OkBuffer out) { this.out = out; } @@ -415,7 +415,7 @@ final class HpackDraft05 { writeInt(staticIndex + 1, PREFIX_6_BITS, 0x40); writeByteString(headerBlock.get(i).value); } else { - out.write(0x40); // Literal Header without Indexing - New Name. + out.writeByte(0x40); // Literal Header without Indexing - New Name. writeByteString(name); writeByteString(headerBlock.get(i).value); } @@ -426,26 +426,26 @@ final class HpackDraft05 { void writeInt(int value, int prefixMask, int bits) throws IOException { // Write the raw value for a single byte value. if (value < prefixMask) { - out.write(bits | value); + out.writeByte(bits | value); return; } // Write the mask to start a multibyte value. - out.write(bits | prefixMask); + out.writeByte(bits | prefixMask); value -= prefixMask; // Write 7 bits at a time 'til we're done. while (value >= 0x80) { int b = value & 0x7f; - out.write(b | 0x80); + out.writeByte(b | 0x80); value >>>= 7; } - out.write(value); + out.writeByte(value); } void writeByteString(ByteString data) throws IOException { writeInt(data.size(), PREFIX_8_BITS, 0); - data.write(out); + out.write(data); } } } diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Http20Draft09.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Http20Draft09.java index ffc810431..57feb3282 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Http20Draft09.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Http20Draft09.java @@ -16,11 +16,9 @@ package com.squareup.okhttp.internal.spdy; import com.squareup.okhttp.Protocol; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; import java.io.IOException; -import java.io.OutputStream; import java.util.List; +import okio.BufferedSink; import okio.BufferedSource; import okio.ByteString; import okio.Deadline; @@ -62,8 +60,8 @@ public final class Http20Draft09 implements Variant { return new Reader(source, 4096, client); } - @Override public FrameWriter newWriter(OutputStream out, boolean client) { - return new Writer(out, client); + @Override public FrameWriter newWriter(BufferedSink sink, boolean client) { + return new Writer(sink, client); } static final class Reader implements FrameReader { @@ -287,20 +285,20 @@ public final class Http20Draft09 implements Variant { } static final class Writer implements FrameWriter { - private final DataOutputStream out; + private final BufferedSink sink; private final boolean client; - private final ByteArrayOutputStream hpackBuffer; + private final OkBuffer hpackBuffer; private final HpackDraft05.Writer hpackWriter; - Writer(OutputStream out, boolean client) { - this.out = new DataOutputStream(out); + Writer(BufferedSink sink, boolean client) { + this.sink = sink; this.client = client; - this.hpackBuffer = new ByteArrayOutputStream(); + this.hpackBuffer = new OkBuffer(); this.hpackWriter = new HpackDraft05.Writer(hpackBuffer); } @Override public synchronized void flush() throws IOException { - out.flush(); + sink.flush(); } @Override public synchronized void ackSettings() throws IOException { @@ -309,17 +307,17 @@ public final class Http20Draft09 implements Variant { byte flags = FLAG_ACK; int streamId = 0; frameHeader(length, type, flags, streamId); + sink.flush(); } @Override public synchronized void connectionHeader() throws IOException { if (!client) return; // Nothing to write; servers don't send connection headers! - out.write(CONNECTION_HEADER.toByteArray()); - out.flush(); + sink.write(CONNECTION_HEADER.toByteArray()); + sink.flush(); } - @Override - public synchronized void synStream(boolean outFinished, boolean inFinished, int streamId, - int associatedStreamId, int priority, int slot, List
headerBlock) + @Override public synchronized void synStream(boolean outFinished, boolean inFinished, + int streamId, int associatedStreamId, int priority, int slot, List
headerBlock) throws IOException { if (inFinished) throw new UnsupportedOperationException(); headers(outFinished, streamId, priority, headerBlock); @@ -335,34 +333,33 @@ public final class Http20Draft09 implements Variant { headers(false, streamId, -1, headerBlock); } - @Override - public synchronized void pushPromise(int streamId, int promisedStreamId, + @Override public synchronized void pushPromise(int streamId, int promisedStreamId, List
requestHeaders) throws IOException { - hpackBuffer.reset(); + if (hpackBuffer.byteCount() != 0) throw new IllegalStateException(); hpackWriter.writeHeaders(requestHeaders); - int length = 4 + hpackBuffer.size(); + int length = (int) (4 + hpackBuffer.byteCount()); byte type = TYPE_PUSH_PROMISE; byte flags = FLAG_END_HEADERS; frameHeader(length, type, flags, streamId); // TODO: CONTINUATION - out.writeInt(promisedStreamId & 0x7fffffff); - hpackBuffer.writeTo(out); + sink.writeInt(promisedStreamId & 0x7fffffff); + sink.write(hpackBuffer, hpackBuffer.byteCount()); } private void headers(boolean outFinished, int streamId, int priority, List
headerBlock) throws IOException { - hpackBuffer.reset(); + if (hpackBuffer.byteCount() != 0) throw new IllegalStateException(); hpackWriter.writeHeaders(headerBlock); - int length = hpackBuffer.size(); + int length = (int) hpackBuffer.byteCount(); byte type = TYPE_HEADERS; byte flags = FLAG_END_HEADERS; if (outFinished) flags |= FLAG_END_STREAM; if (priority != -1) flags |= FLAG_PRIORITY; if (priority != -1) length += 4; frameHeader(length, type, flags, streamId); // TODO: CONTINUATION - if (priority != -1) out.writeInt(priority & 0x7fffffff); - hpackBuffer.writeTo(out); + if (priority != -1) sink.writeInt(priority & 0x7fffffff); + sink.write(hpackBuffer, hpackBuffer.byteCount()); } @Override public synchronized void rstStream(int streamId, ErrorCode errorCode) @@ -373,8 +370,8 @@ public final class Http20Draft09 implements Variant { byte type = TYPE_RST_STREAM; byte flags = FLAG_NONE; frameHeader(length, type, flags, streamId); - out.writeInt(errorCode.httpCode); - out.flush(); + sink.writeInt(errorCode.httpCode); + sink.flush(); } @Override public synchronized void data(boolean outFinished, int streamId, byte[] data) @@ -393,7 +390,7 @@ public final class Http20Draft09 implements Variant { throws IOException { byte type = TYPE_DATA; frameHeader(length, type, flags, streamId); - out.write(data, offset, length); + sink.write(data, offset, length); } @Override public synchronized void settings(Settings settings) throws IOException { @@ -404,10 +401,10 @@ public final class Http20Draft09 implements Variant { frameHeader(length, type, flags, streamId); for (int i = 0; i < Settings.COUNT; i++) { if (!settings.isSet(i)) continue; - out.writeInt(i & 0xffffff); - out.writeInt(settings.get(i)); + sink.writeInt(i & 0xffffff); + sink.writeInt(settings.get(i)); } - out.flush(); + sink.flush(); } @Override public synchronized void ping(boolean ack, int payload1, int payload2) @@ -417,26 +414,25 @@ public final class Http20Draft09 implements Variant { byte flags = ack ? FLAG_ACK : FLAG_NONE; int streamId = 0; frameHeader(length, type, flags, streamId); - out.writeInt(payload1); - out.writeInt(payload2); - out.flush(); + sink.writeInt(payload1); + sink.writeInt(payload2); + sink.flush(); } - @Override - public synchronized void goAway(int lastGoodStreamId, ErrorCode errorCode, byte[] debugData) - throws IOException { + @Override public synchronized void goAway(int lastGoodStreamId, ErrorCode errorCode, + byte[] debugData) throws IOException { if (errorCode.httpCode == -1) throw illegalArgument("errorCode.httpCode == -1"); int length = 8 + debugData.length; byte type = TYPE_GOAWAY; byte flags = FLAG_NONE; int streamId = 0; frameHeader(length, type, flags, streamId); - out.writeInt(lastGoodStreamId); - out.writeInt(errorCode.httpCode); + sink.writeInt(lastGoodStreamId); + sink.writeInt(errorCode.httpCode); if (debugData.length > 0) { - out.write(debugData); + sink.write(debugData); } - out.flush(); + sink.flush(); } @Override public synchronized void windowUpdate(int streamId, long windowSizeIncrement) @@ -449,19 +445,19 @@ public final class Http20Draft09 implements Variant { byte type = TYPE_WINDOW_UPDATE; byte flags = FLAG_NONE; frameHeader(length, type, flags, streamId); - out.writeInt((int) windowSizeIncrement); - out.flush(); + sink.writeInt((int) windowSizeIncrement); + sink.flush(); } - @Override public void close() throws IOException { - out.close(); + @Override public synchronized void close() throws IOException { + sink.close(); } void frameHeader(int length, byte type, byte flags, int streamId) throws IOException { if (length > 16383) throw illegalArgument("FRAME_SIZE_ERROR length > 16383: %s", length); if ((streamId & 0x80000000) != 0) throw illegalArgument("reserved bit set: %s", streamId); - out.writeInt((length & 0x3fff) << 16 | (type & 0xff) << 8 | (flags & 0xff)); - out.writeInt(streamId & 0x7fffffff); + sink.writeInt((length & 0x3fff) << 16 | (type & 0xff) << 8 | (flags & 0xff)); + sink.writeInt(streamId & 0x7fffffff); } } @@ -491,8 +487,7 @@ public final class Http20Draft09 implements Variant { this.source = source; } - @Override public long read(OkBuffer sink, long byteCount) - throws IOException { + @Override public long read(OkBuffer sink, long byteCount) throws IOException { while (left == 0) { if ((flags & FLAG_END_HEADERS) != 0) return -1; readContinuationHeader(); diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Spdy3.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Spdy3.java index 4566fe173..1c240fd0d 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Spdy3.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Spdy3.java @@ -16,18 +16,18 @@ package com.squareup.okhttp.internal.spdy; import com.squareup.okhttp.Protocol; -import com.squareup.okhttp.internal.Platform; import com.squareup.okhttp.internal.Util; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; import java.io.IOException; -import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.net.ProtocolException; import java.util.List; import java.util.zip.Deflater; +import okio.BufferedSink; import okio.BufferedSource; import okio.ByteString; +import okio.DeflaterSink; +import okio.OkBuffer; +import okio.Okio; /** * Read and write spdy/3.1 frames. @@ -99,8 +99,8 @@ final class Spdy3 implements Variant { return new Reader(source, client); } - @Override public FrameWriter newWriter(OutputStream out, boolean client) { - return new Writer(out, client); + @Override public FrameWriter newWriter(BufferedSink sink, boolean client) { + return new Writer(sink, client); } /** Read spdy/3 frames. */ @@ -287,20 +287,19 @@ final class Spdy3 implements Variant { /** Write spdy/3 frames. */ static final class Writer implements FrameWriter { - private final DataOutputStream out; - private final ByteArrayOutputStream headerBlockBuffer; - private final DataOutputStream headerBlockOut; + private final BufferedSink sink; + private final OkBuffer headerBlockBuffer; + private final BufferedSink headerBlockOut; private final boolean client; - Writer(OutputStream out, boolean client) { - this.out = new DataOutputStream(out); + Writer(BufferedSink sink, boolean client) { + this.sink = sink; this.client = client; Deflater deflater = new Deflater(); deflater.setDictionary(DICTIONARY); - headerBlockBuffer = new ByteArrayOutputStream(); - headerBlockOut = new DataOutputStream( - Platform.get().newDeflaterOutputStream(headerBlockBuffer, deflater, true)); + headerBlockBuffer = new OkBuffer(); + headerBlockOut = Okio.buffer(new DeflaterSink(headerBlockBuffer, deflater)); } @Override public void ackSettings() { @@ -318,26 +317,25 @@ final class Spdy3 implements Variant { } @Override public synchronized void flush() throws IOException { - out.flush(); + sink.flush(); } - @Override - public synchronized void synStream(boolean outFinished, boolean inFinished, int streamId, - int associatedStreamId, int priority, int slot, List
headerBlock) + @Override public synchronized void synStream(boolean outFinished, boolean inFinished, + int streamId, int associatedStreamId, int priority, int slot, List
headerBlock) throws IOException { writeNameValueBlockToBuffer(headerBlock); - int length = 10 + headerBlockBuffer.size(); + int length = (int) (10 + headerBlockBuffer.byteCount()); int type = TYPE_SYN_STREAM; int flags = (outFinished ? FLAG_FIN : 0) | (inFinished ? FLAG_UNIDIRECTIONAL : 0); int unused = 0; - out.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); - out.writeInt((flags & 0xff) << 24 | length & 0xffffff); - out.writeInt(streamId & 0x7fffffff); - out.writeInt(associatedStreamId & 0x7fffffff); - out.writeShort((priority & 0x7) << 13 | (unused & 0x1f) << 8 | (slot & 0xff)); - headerBlockBuffer.writeTo(out); - out.flush(); + sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); + sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); + sink.writeInt(streamId & 0x7fffffff); + sink.writeInt(associatedStreamId & 0x7fffffff); + sink.writeShort((priority & 0x7) << 13 | (unused & 0x1f) << 8 | (slot & 0xff)); + sink.write(headerBlockBuffer, headerBlockBuffer.byteCount()); + sink.flush(); } @Override public synchronized void synReply(boolean outFinished, int streamId, @@ -345,13 +343,13 @@ final class Spdy3 implements Variant { writeNameValueBlockToBuffer(headerBlock); int type = TYPE_SYN_REPLY; int flags = (outFinished ? FLAG_FIN : 0); - int length = headerBlockBuffer.size() + 4; + int length = (int) (headerBlockBuffer.byteCount() + 4); - out.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); - out.writeInt((flags & 0xff) << 24 | length & 0xffffff); - out.writeInt(streamId & 0x7fffffff); - headerBlockBuffer.writeTo(out); - out.flush(); + sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); + sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); + sink.writeInt(streamId & 0x7fffffff); + sink.write(headerBlockBuffer, headerBlockBuffer.byteCount()); + sink.flush(); } @Override public synchronized void headers(int streamId, List
headerBlock) @@ -359,12 +357,12 @@ final class Spdy3 implements Variant { writeNameValueBlockToBuffer(headerBlock); int flags = 0; int type = TYPE_HEADERS; - int length = headerBlockBuffer.size() + 4; + int length = (int) (headerBlockBuffer.byteCount() + 4); - out.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); - out.writeInt((flags & 0xff) << 24 | length & 0xffffff); - out.writeInt(streamId & 0x7fffffff); - headerBlockBuffer.writeTo(out); + sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); + sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); + sink.writeInt(streamId & 0x7fffffff); + sink.write(headerBlockBuffer, headerBlockBuffer.byteCount()); } @Override public synchronized void rstStream(int streamId, ErrorCode errorCode) @@ -373,11 +371,11 @@ final class Spdy3 implements Variant { int flags = 0; int type = TYPE_RST_STREAM; int length = 8; - out.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); - out.writeInt((flags & 0xff) << 24 | length & 0xffffff); - out.writeInt(streamId & 0x7fffffff); - out.writeInt(errorCode.spdyRstCode); - out.flush(); + sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); + sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); + sink.writeInt(streamId & 0x7fffffff); + sink.writeInt(errorCode.spdyRstCode); + sink.flush(); } @Override public synchronized void data(boolean outFinished, int streamId, byte[] data) @@ -397,21 +395,21 @@ final class Spdy3 implements Variant { if (byteCount > 0xffffffL) { throw new IllegalArgumentException("FRAME_TOO_LARGE max size is 16Mib: " + byteCount); } - out.writeInt(streamId & 0x7fffffff); - out.writeInt((flags & 0xff) << 24 | byteCount & 0xffffff); - out.write(data, offset, byteCount); + sink.writeInt(streamId & 0x7fffffff); + sink.writeInt((flags & 0xff) << 24 | byteCount & 0xffffff); + sink.write(data, offset, byteCount); } private void writeNameValueBlockToBuffer(List
headerBlock) throws IOException { - headerBlockBuffer.reset(); + if (headerBlockBuffer.byteCount() != 0) throw new IllegalStateException(); headerBlockOut.writeInt(headerBlock.size()); for (int i = 0, size = headerBlock.size(); i < size; i++) { ByteString name = headerBlock.get(i).name; headerBlockOut.writeInt(name.size()); - name.write(headerBlockOut); + headerBlockOut.write(name); ByteString value = headerBlock.get(i).value; headerBlockOut.writeInt(value.size()); - value.write(headerBlockOut); + headerBlockOut.write(value); } headerBlockOut.flush(); } @@ -421,16 +419,16 @@ final class Spdy3 implements Variant { int flags = 0; int size = settings.size(); int length = 4 + size * 8; - out.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); - out.writeInt((flags & 0xff) << 24 | length & 0xffffff); - out.writeInt(size); + sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); + sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); + sink.writeInt(size); for (int i = 0; i <= Settings.COUNT; i++) { if (!settings.isSet(i)) continue; int settingsFlags = settings.flags(i); - out.writeInt((settingsFlags & 0xff) << 24 | (i & 0xffffff)); - out.writeInt(settings.get(i)); + sink.writeInt((settingsFlags & 0xff) << 24 | (i & 0xffffff)); + sink.writeInt(settings.get(i)); } - out.flush(); + sink.flush(); } @Override public synchronized void ping(boolean reply, int payload1, int payload2) @@ -440,26 +438,25 @@ final class Spdy3 implements Variant { int type = TYPE_PING; int flags = 0; int length = 4; - out.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); - out.writeInt((flags & 0xff) << 24 | length & 0xffffff); - out.writeInt(payload1); - out.flush(); + sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); + sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); + sink.writeInt(payload1); + sink.flush(); } - @Override - public synchronized void goAway(int lastGoodStreamId, ErrorCode errorCode, byte[] ignored) - throws IOException { + @Override public synchronized void goAway(int lastGoodStreamId, ErrorCode errorCode, + byte[] ignored) throws IOException { if (errorCode.spdyGoAwayCode == -1) { throw new IllegalArgumentException("errorCode.spdyGoAwayCode == -1"); } int type = TYPE_GOAWAY; int flags = 0; int length = 8; - out.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); - out.writeInt((flags & 0xff) << 24 | length & 0xffffff); - out.writeInt(lastGoodStreamId); - out.writeInt(errorCode.spdyGoAwayCode); - out.flush(); + sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); + sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); + sink.writeInt(lastGoodStreamId); + sink.writeInt(errorCode.spdyGoAwayCode); + sink.flush(); } @Override public synchronized void windowUpdate(int streamId, long increment) @@ -471,15 +468,15 @@ final class Spdy3 implements Variant { int type = TYPE_WINDOW_UPDATE; int flags = 0; int length = 8; - out.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); - out.writeInt((flags & 0xff) << 24 | length & 0xffffff); - out.writeInt(streamId); - out.writeInt((int) increment); - out.flush(); + sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); + sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); + sink.writeInt(streamId); + sink.writeInt((int) increment); + sink.flush(); } - @Override public void close() throws IOException { - Util.closeAll(out, headerBlockOut); + @Override public synchronized void close() throws IOException { + Util.closeAll(sink, headerBlockOut); } } } diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java index 3f2ac90a4..984df71da 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java @@ -21,7 +21,6 @@ import com.squareup.okhttp.internal.Util; import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; -import java.io.OutputStream; import java.net.Socket; import java.util.HashMap; import java.util.Iterator; @@ -31,6 +30,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import okio.BufferedSink; import okio.BufferedSource; import okio.ByteString; import okio.Okio; @@ -148,7 +148,7 @@ public final class SpdyConnection implements Closeable { bytesLeftInWriteWindow = peerSettings.getInitialWindowSize(); bufferPool = new ByteArrayPool(INITIAL_WINDOW_SIZE * 8); // TODO: revisit size limit! frameReader = variant.newReader(builder.source, client); - frameWriter = variant.newWriter(builder.out, client); + frameWriter = variant.newWriter(builder.sink, client); readerRunnable = new Reader(); new Thread(readerRunnable).start(); // Not a daemon thread. @@ -232,6 +232,10 @@ public final class SpdyConnection implements Closeable { requestHeaders); } + if (!out) { + frameWriter.flush(); + } + return stream; } @@ -458,25 +462,25 @@ public final class SpdyConnection implements Closeable { public static class Builder { private String hostName; private BufferedSource source; - private OutputStream out; + private BufferedSink sink; private IncomingStreamHandler handler = IncomingStreamHandler.REFUSE_INCOMING_STREAMS; private Protocol protocol = Protocol.SPDY_3; private boolean client; public Builder(boolean client, Socket socket) throws IOException { this("", client, Okio.buffer(Okio.source(socket.getInputStream())), - socket.getOutputStream()); + Okio.buffer(Okio.sink(socket.getOutputStream()))); } /** * @param client true if this peer initiated the connection; false if this * peer accepted the connection. */ - public Builder(String hostName, boolean client, BufferedSource source, OutputStream out) { + public Builder(String hostName, boolean client, BufferedSource source, BufferedSink sink) { this.hostName = hostName; this.client = client; this.source = source; - this.out = out; + this.sink = sink; } public Builder handler(IncomingStreamHandler handler) { diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java index e1fd87518..a70a5661b 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java @@ -194,6 +194,10 @@ public final class SpdyStream { } } connection.writeSynReply(id, outFinished, responseHeaders); + + if (outFinished) { + connection.flush(); + } } /** @@ -547,8 +551,8 @@ public final class SpdyStream { } if (pos > 0) { writeFrame(); - connection.flush(); } + connection.flush(); } @Override public void close() throws IOException { diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Variant.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Variant.java index b9940d612..f8b73f9a9 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Variant.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Variant.java @@ -16,7 +16,7 @@ package com.squareup.okhttp.internal.spdy; import com.squareup.okhttp.Protocol; -import java.io.OutputStream; +import okio.BufferedSink; import okio.BufferedSource; /** A version and dialect of the framed socket protocol. */ @@ -33,5 +33,5 @@ interface Variant { /** * @param client true if this is the HTTP client's writer, writing frames to a server. */ - FrameWriter newWriter(OutputStream out, boolean client); + FrameWriter newWriter(BufferedSink sink, boolean client); } diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/HpackDraft05Test.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/HpackDraft05Test.java index e480e41b7..268d85585 100644 --- a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/HpackDraft05Test.java +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/HpackDraft05Test.java @@ -15,34 +15,28 @@ */ package com.squareup.okhttp.internal.spdy; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; import java.util.List; import okio.ByteString; -import okio.Okio; +import okio.OkBuffer; import org.junit.Before; import org.junit.Test; import static com.squareup.okhttp.internal.Util.headerEntries; -import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class HpackDraft05Test { - private final MutableByteArrayInputStream bytesIn = new MutableByteArrayInputStream(); + private final OkBuffer bytesIn = new OkBuffer(); private HpackDraft05.Reader hpackReader; - private ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + private OkBuffer bytesOut = new OkBuffer(); private HpackDraft05.Writer hpackWriter; @Before public void reset() { hpackReader = newReader(bytesIn); - hpackWriter = new HpackDraft05.Writer(new DataOutputStream(bytesOut)); + hpackWriter = new HpackDraft05.Writer(bytesOut); } /** @@ -50,16 +44,16 @@ public class HpackDraft05Test { * Ensure the larger header content is not lost. */ @Test public void tooLargeToHPackIsStillEmitted() throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + OkBuffer out = new OkBuffer(); - out.write(0x00); // Literal indexed - out.write(0x0a); // Literal name (len = 10) - out.write("custom-key".getBytes(), 0, 10); + out.writeByte(0x00); // Literal indexed + out.writeByte(0x0a); // Literal name (len = 10) + out.writeUtf8("custom-key"); - out.write(0x0d); // Literal value (len = 13) - out.write("custom-header".getBytes(), 0, 13); + out.writeByte(0x0d); // Literal value (len = 13) + out.writeUtf8("custom-header"); - bytesIn.set(out.toByteArray()); + bytesIn.write(out, out.byteCount()); hpackReader.maxHeaderTableByteCount(1); hpackReader.readHeaders(); hpackReader.emitReferenceSet(); @@ -71,30 +65,30 @@ public class HpackDraft05Test { /** Oldest entries are evicted to support newer ones. */ @Test public void testEviction() throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + OkBuffer out = new OkBuffer(); - out.write(0x00); // Literal indexed - out.write(0x0a); // Literal name (len = 10) - out.write("custom-foo".getBytes(), 0, 10); + out.writeByte(0x00); // Literal indexed + out.writeByte(0x0a); // Literal name (len = 10) + out.writeUtf8("custom-foo"); - out.write(0x0d); // Literal value (len = 13) - out.write("custom-header".getBytes(), 0, 13); + out.writeByte(0x0d); // Literal value (len = 13) + out.writeUtf8("custom-header"); - out.write(0x00); // Literal indexed - out.write(0x0a); // Literal name (len = 10) - out.write("custom-bar".getBytes(), 0, 10); + out.writeByte(0x00); // Literal indexed + out.writeByte(0x0a); // Literal name (len = 10) + out.writeUtf8("custom-bar"); - out.write(0x0d); // Literal value (len = 13) - out.write("custom-header".getBytes(), 0, 13); + out.writeByte(0x0d); // Literal value (len = 13) + out.writeUtf8("custom-header"); - out.write(0x00); // Literal indexed - out.write(0x0a); // Literal name (len = 10) - out.write("custom-baz".getBytes(), 0, 10); + out.writeByte(0x00); // Literal indexed + out.writeByte(0x0a); // Literal name (len = 10) + out.writeUtf8("custom-baz"); - out.write(0x0d); // Literal value (len = 13) - out.write("custom-header".getBytes(), 0, 13); + out.writeByte(0x0d); // Literal value (len = 13) + out.writeUtf8("custom-header"); - bytesIn.set(out.toByteArray()); + bytesIn.write(out, out.byteCount()); // Set to only support 110 bytes (enough for 2 headers). hpackReader.maxHeaderTableByteCount(110); hpackReader.readHeaders(); @@ -122,18 +116,18 @@ public class HpackDraft05Test { /** Header table backing array is initially 8 long, let's ensure it grows. */ @Test public void dynamicallyGrowsBeyond64Entries() throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + OkBuffer out = new OkBuffer(); for (int i = 0; i < 256; i++) { - out.write(0x00); // Literal indexed - out.write(0x0a); // Literal name (len = 10) - out.write("custom-foo".getBytes(), 0, 10); + out.writeByte(0x00); // Literal indexed + out.writeByte(0x0a); // Literal name (len = 10) + out.writeUtf8("custom-foo"); - out.write(0x0d); // Literal value (len = 13) - out.write("custom-header".getBytes(), 0, 13); + out.writeByte(0x0d); // Literal value (len = 13) + out.writeUtf8("custom-header"); } - bytesIn.set(out.toByteArray()); + bytesIn.write(out, out.byteCount()); hpackReader.maxHeaderTableByteCount(16384); // Lots of headers need more room! hpackReader.readHeaders(); hpackReader.emitReferenceSet(); @@ -144,19 +138,19 @@ public class HpackDraft05Test { } @Test public void huffmanDecodingSupported() throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + OkBuffer out = new OkBuffer(); - out.write(0x04); // == Literal indexed == - // Indexed name (idx = 4) -> :path - out.write(0x8b); // Literal value Huffman encoded 11 bytes - // decodes to www.example.com which is length 15 + out.writeByte(0x04); // == Literal indexed == + // Indexed name (idx = 4) -> :path + out.writeByte(0x8b); // Literal value Huffman encoded 11 bytes + // decodes to www.example.com which is length 15 byte[] huffmanBytes = new byte[] { (byte) 0xdb, (byte) 0x6d, (byte) 0x88, (byte) 0x3e, (byte) 0x68, (byte) 0xd1, (byte) 0xcb, (byte) 0x12, (byte) 0x25, (byte) 0xba, (byte) 0x7f}; out.write(huffmanBytes, 0, huffmanBytes.length); - bytesIn.set(out.toByteArray()); + bytesIn.write(out, out.byteCount()); hpackReader.readHeaders(); hpackReader.emitReferenceSet(); @@ -172,16 +166,16 @@ public class HpackDraft05Test { * http://tools.ietf.org/html/draft-ietf-httpbis-header-compression-05#appendix-E.1.1 */ @Test public void readLiteralHeaderFieldWithIndexing() throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + OkBuffer out = new OkBuffer(); - out.write(0x00); // Literal indexed - out.write(0x0a); // Literal name (len = 10) - out.write("custom-key".getBytes(), 0, 10); + out.writeByte(0x00); // Literal indexed + out.writeByte(0x0a); // Literal name (len = 10) + out.writeUtf8("custom-key"); - out.write(0x0d); // Literal value (len = 13) - out.write("custom-header".getBytes(), 0, 13); + out.writeByte(0x0d); // Literal value (len = 13) + out.writeUtf8("custom-header"); - bytesIn.set(out.toByteArray()); + bytesIn.write(out, out.byteCount()); hpackReader.readHeaders(); hpackReader.emitReferenceSet(); @@ -201,19 +195,19 @@ public class HpackDraft05Test { @Test public void literalHeaderFieldWithoutIndexingNewName() throws IOException { List
headerBlock = headerEntries("custom-key", "custom-header"); - ByteArrayOutputStream expectedBytes = new ByteArrayOutputStream(); + OkBuffer expectedBytes = new OkBuffer(); - expectedBytes.write(0x40); // Not indexed - expectedBytes.write(0x0a); // Literal name (len = 10) + expectedBytes.writeByte(0x40); // Not indexed + expectedBytes.writeByte(0x0a); // Literal name (len = 10) expectedBytes.write("custom-key".getBytes(), 0, 10); - expectedBytes.write(0x0d); // Literal value (len = 13) + expectedBytes.writeByte(0x0d); // Literal value (len = 13) expectedBytes.write("custom-header".getBytes(), 0, 13); hpackWriter.writeHeaders(headerBlock); - assertArrayEquals(expectedBytes.toByteArray(), bytesOut.toByteArray()); + assertEquals(expectedBytes, bytesOut); - bytesIn.set(bytesOut.toByteArray()); + bytesIn.write(bytesOut, bytesOut.byteCount()); hpackReader.readHeaders(); hpackReader.emitReferenceSet(); @@ -228,16 +222,16 @@ public class HpackDraft05Test { @Test public void literalHeaderFieldWithoutIndexingIndexedName() throws IOException { List
headerBlock = headerEntries(":path", "/sample/path"); - ByteArrayOutputStream expectedBytes = new ByteArrayOutputStream(); - expectedBytes.write(0x44); // == Literal not indexed == - // Indexed name (idx = 4) -> :path - expectedBytes.write(0x0c); // Literal value (len = 12) + OkBuffer expectedBytes = new OkBuffer(); + expectedBytes.writeByte(0x44); // == Literal not indexed == + // Indexed name (idx = 4) -> :path + expectedBytes.writeByte(0x0c); // Literal value (len = 12) expectedBytes.write("/sample/path".getBytes(), 0, 12); hpackWriter.writeHeaders(headerBlock); - assertArrayEquals(expectedBytes.toByteArray(), bytesOut.toByteArray()); + assertEquals(expectedBytes, bytesOut); - bytesIn.set(bytesOut.toByteArray()); + bytesIn.write(bytesOut, bytesOut.byteCount()); hpackReader.readHeaders(); hpackReader.emitReferenceSet(); @@ -250,12 +244,9 @@ public class HpackDraft05Test { * http://tools.ietf.org/html/draft-ietf-httpbis-header-compression-05#appendix-E.1.3 */ @Test public void readIndexedHeaderField() throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + bytesIn.writeByte(0x82); // == Indexed - Add == + // idx = 2 -> :method: GET - out.write(0x82); // == Indexed - Add == - // idx = 2 -> :method: GET - - bytesIn.set(out.toByteArray()); hpackReader.readHeaders(); hpackReader.emitReferenceSet(); @@ -273,16 +264,13 @@ public class HpackDraft05Test { * http://tools.ietf.org/html/draft-ietf-httpbis-header-compression-05#section-3.2.1 */ @Test public void toggleIndex() throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - // Static table entries are copied to the top of the reference set. - out.write(0x82); // == Indexed - Add == - // idx = 2 -> :method: GET + bytesIn.writeByte(0x82); // == Indexed - Add == + // idx = 2 -> :method: GET // Specifying an index to an entry in the reference set removes it. - out.write(0x81); // == Indexed - Remove == - // idx = 1 -> :method: GET + bytesIn.writeByte(0x81); // == Indexed - Remove == + // idx = 1 -> :method: GET - bytesIn.set(out.toByteArray()); hpackReader.readHeaders(); hpackReader.emitReferenceSet(); @@ -300,12 +288,9 @@ public class HpackDraft05Test { * http://tools.ietf.org/html/draft-ietf-httpbis-header-compression-05#appendix-E.1.4 */ @Test public void readIndexedHeaderFieldFromStaticTableWithoutBuffering() throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + bytesIn.writeByte(0x82); // == Indexed - Add == + // idx = 2 -> :method: GET - out.write(0x82); // == Indexed - Add == - // idx = 2 -> :method: GET - - bytesIn.set(out.toByteArray()); hpackReader.maxHeaderTableByteCount(0); // SETTINGS_HEADER_TABLE_SIZE == 0 hpackReader.readHeaders(); hpackReader.emitReferenceSet(); @@ -320,38 +305,38 @@ public class HpackDraft05Test { * http://tools.ietf.org/html/draft-ietf-httpbis-header-compression-05#appendix-E.2 */ @Test public void readRequestExamplesWithoutHuffman() throws IOException { - ByteArrayOutputStream out = firstRequestWithoutHuffman(); - bytesIn.set(out.toByteArray()); + OkBuffer out = firstRequestWithoutHuffman(); + bytesIn.write(out, out.byteCount()); hpackReader.readHeaders(); hpackReader.emitReferenceSet(); checkReadFirstRequestWithoutHuffman(); out = secondRequestWithoutHuffman(); - bytesIn.set(out.toByteArray()); + bytesIn.write(out, out.byteCount()); hpackReader.readHeaders(); hpackReader.emitReferenceSet(); checkReadSecondRequestWithoutHuffman(); out = thirdRequestWithoutHuffman(); - bytesIn.set(out.toByteArray()); + bytesIn.write(out, out.byteCount()); hpackReader.readHeaders(); hpackReader.emitReferenceSet(); checkReadThirdRequestWithoutHuffman(); } - private ByteArrayOutputStream firstRequestWithoutHuffman() { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + private OkBuffer firstRequestWithoutHuffman() { + OkBuffer out = new OkBuffer(); - out.write(0x82); // == Indexed - Add == - // idx = 2 -> :method: GET - out.write(0x87); // == Indexed - Add == - // idx = 7 -> :scheme: http - out.write(0x86); // == Indexed - Add == - // idx = 6 -> :path: / - out.write(0x04); // == Literal indexed == - // Indexed name (idx = 4) -> :authority - out.write(0x0f); // Literal value (len = 15) - out.write("www.example.com".getBytes(), 0, 15); + out.writeByte(0x82); // == Indexed - Add == + // idx = 2 -> :method: GET + out.writeByte(0x87); // == Indexed - Add == + // idx = 7 -> :scheme: http + out.writeByte(0x86); // == Indexed - Add == + // idx = 6 -> :path: / + out.writeByte(0x04); // == Literal indexed == + // Indexed name (idx = 4) -> :authority + out.writeByte(0x0f); // Literal value (len = 15) + out.writeUtf8("www.example.com"); return out; } @@ -390,13 +375,13 @@ public class HpackDraft05Test { ":authority", "www.example.com"), hpackReader.getAndReset()); } - private ByteArrayOutputStream secondRequestWithoutHuffman() { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + private OkBuffer secondRequestWithoutHuffman() { + OkBuffer out = new OkBuffer(); - out.write(0x1b); // == Literal indexed == - // Indexed name (idx = 27) -> cache-control - out.write(0x08); // Literal value (len = 8) - out.write("no-cache".getBytes(), 0, 8); + out.writeByte(0x1b); // == Literal indexed == + // Indexed name (idx = 27) -> cache-control + out.writeByte(0x08); // Literal value (len = 8) + out.writeUtf8("no-cache"); return out; } @@ -441,23 +426,23 @@ public class HpackDraft05Test { "cache-control", "no-cache"), hpackReader.getAndReset()); } - private ByteArrayOutputStream thirdRequestWithoutHuffman() { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + private OkBuffer thirdRequestWithoutHuffman() { + OkBuffer out = new OkBuffer(); - out.write(0x80); // == Empty reference set == - out.write(0x85); // == Indexed - Add == - // idx = 5 -> :method: GET - out.write(0x8c); // == Indexed - Add == - // idx = 12 -> :scheme: https - out.write(0x8b); // == Indexed - Add == - // idx = 11 -> :path: /index.html - out.write(0x84); // == Indexed - Add == - // idx = 4 -> :authority: www.example.com - out.write(0x00); // Literal indexed - out.write(0x0a); // Literal name (len = 10) - out.write("custom-key".getBytes(), 0, 10); - out.write(0x0c); // Literal value (len = 12) - out.write("custom-value".getBytes(), 0, 12); + out.writeByte(0x80); // == Empty reference set == + out.writeByte(0x85); // == Indexed - Add == + // idx = 5 -> :method: GET + out.writeByte(0x8c); // == Indexed - Add == + // idx = 12 -> :scheme: https + out.writeByte(0x8b); // == Indexed - Add == + // idx = 11 -> :path: /index.html + out.writeByte(0x84); // == Indexed - Add == + // idx = 4 -> :authority: www.example.com + out.writeByte(0x00); // Literal indexed + out.writeByte(0x0a); // Literal name (len = 10) + out.writeUtf8("custom-key"); + out.writeByte(0x0c); // Literal value (len = 12) + out.writeUtf8("custom-value"); return out; } @@ -522,38 +507,38 @@ public class HpackDraft05Test { * http://tools.ietf.org/html/draft-ietf-httpbis-header-compression-05#appendix-E.3 */ @Test public void readRequestExamplesWithHuffman() throws IOException { - ByteArrayOutputStream out = firstRequestWithHuffman(); - bytesIn.set(out.toByteArray()); + OkBuffer out = firstRequestWithHuffman(); + bytesIn.write(out, out.byteCount()); hpackReader.readHeaders(); hpackReader.emitReferenceSet(); checkReadFirstRequestWithHuffman(); out = secondRequestWithHuffman(); - bytesIn.set(out.toByteArray()); + bytesIn.write(out, out.byteCount()); hpackReader.readHeaders(); hpackReader.emitReferenceSet(); checkReadSecondRequestWithHuffman(); out = thirdRequestWithHuffman(); - bytesIn.set(out.toByteArray()); + bytesIn.write(out, out.byteCount()); hpackReader.readHeaders(); hpackReader.emitReferenceSet(); checkReadThirdRequestWithHuffman(); } - private ByteArrayOutputStream firstRequestWithHuffman() { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + private OkBuffer firstRequestWithHuffman() { + OkBuffer out = new OkBuffer(); - out.write(0x82); // == Indexed - Add == - // idx = 2 -> :method: GET - out.write(0x87); // == Indexed - Add == - // idx = 7 -> :scheme: http - out.write(0x86); // == Indexed - Add == - // idx = 6 -> :path: / - out.write(0x04); // == Literal indexed == - // Indexed name (idx = 4) -> :authority - out.write(0x8b); // Literal value Huffman encoded 11 bytes - // decodes to www.example.com which is length 15 + out.writeByte(0x82); // == Indexed - Add == + // idx = 2 -> :method: GET + out.writeByte(0x87); // == Indexed - Add == + // idx = 7 -> :scheme: http + out.writeByte(0x86); // == Indexed - Add == + // idx = 6 -> :path: / + out.writeByte(0x04); // == Literal indexed == + // Indexed name (idx = 4) -> :authority + out.writeByte(0x8b); // Literal value Huffman encoded 11 bytes + // decodes to www.example.com which is length 15 byte[] huffmanBytes = new byte[] { (byte) 0xdb, (byte) 0x6d, (byte) 0x88, (byte) 0x3e, (byte) 0x68, (byte) 0xd1, (byte) 0xcb, (byte) 0x12, @@ -597,13 +582,13 @@ public class HpackDraft05Test { ":authority", "www.example.com"), hpackReader.getAndReset()); } - private ByteArrayOutputStream secondRequestWithHuffman() { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + private OkBuffer secondRequestWithHuffman() { + OkBuffer out = new OkBuffer(); - out.write(0x1b); // == Literal indexed == - // Indexed name (idx = 27) -> cache-control - out.write(0x86); // Literal value Huffman encoded 6 bytes - // decodes to no-cache which is length 8 + out.writeByte(0x1b); // == Literal indexed == + // Indexed name (idx = 27) -> cache-control + out.writeByte(0x86); // Literal value Huffman encoded 6 bytes + // decodes to no-cache which is length 8 byte[] huffmanBytes = new byte[] { (byte) 0x63, (byte) 0x65, (byte) 0x4a, (byte) 0x13, (byte) 0x98, (byte) 0xff}; @@ -652,27 +637,27 @@ public class HpackDraft05Test { "cache-control", "no-cache"), hpackReader.getAndReset()); } - private ByteArrayOutputStream thirdRequestWithHuffman() { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + private OkBuffer thirdRequestWithHuffman() { + OkBuffer out = new OkBuffer(); - out.write(0x80); // == Empty reference set == - out.write(0x85); // == Indexed - Add == - // idx = 5 -> :method: GET - out.write(0x8c); // == Indexed - Add == - // idx = 12 -> :scheme: https - out.write(0x8b); // == Indexed - Add == - // idx = 11 -> :path: /index.html - out.write(0x84); // == Indexed - Add == - // idx = 4 -> :authority: www.example.com - out.write(0x00); // Literal indexed - out.write(0x88); // Literal name Huffman encoded 8 bytes - // decodes to custom-key which is length 10 + out.writeByte(0x80); // == Empty reference set == + out.writeByte(0x85); // == Indexed - Add == + // idx = 5 -> :method: GET + out.writeByte(0x8c); // == Indexed - Add == + // idx = 12 -> :scheme: https + out.writeByte(0x8b); // == Indexed - Add == + // idx = 11 -> :path: /index.html + out.writeByte(0x84); // == Indexed - Add == + // idx = 4 -> :authority: www.example.com + out.writeByte(0x00); // Literal indexed + out.writeByte(0x88); // Literal name Huffman encoded 8 bytes + // decodes to custom-key which is length 10 byte[] huffmanBytes = new byte[] { (byte) 0x4e, (byte) 0xb0, (byte) 0x8b, (byte) 0x74, (byte) 0x97, (byte) 0x90, (byte) 0xfa, (byte) 0x7f}; out.write(huffmanBytes, 0, huffmanBytes.length); - out.write(0x89); // Literal value Huffman encoded 6 bytes - // decodes to custom-value which is length 12 + out.writeByte(0x89); // Literal value Huffman encoded 6 bytes + // decodes to custom-value which is length 12 huffmanBytes = new byte[] { (byte) 0x4e, (byte) 0xb0, (byte) 0x8b, (byte) 0x74, (byte) 0x97, (byte) 0x9a, (byte) 0x17, (byte) 0xa8, @@ -799,13 +784,12 @@ public class HpackDraft05Test { assertEquals(ByteString.EMPTY, newReader(byteStream(0)).readByteString(false)); } - private HpackDraft05.Reader newReader(InputStream input) { - return new HpackDraft05.Reader(false, 4096, Okio.source(input)); + private HpackDraft05.Reader newReader(OkBuffer source) { + return new HpackDraft05.Reader(false, 4096, source); } - private InputStream byteStream(int... bytes) { - byte[] data = intArrayToByteArray(bytes); - return new ByteArrayInputStream(data); + private OkBuffer byteStream(int... bytes) { + return new OkBuffer().write(intArrayToByteArray(bytes)); } private void checkEntry(Header entry, String name, String value, int size) { @@ -815,18 +799,17 @@ public class HpackDraft05Test { } private void assertBytes(int... bytes) { - byte[] expected = intArrayToByteArray(bytes); - byte[] actual = bytesOut.toByteArray(); - assertEquals(Arrays.toString(expected), Arrays.toString(actual)); - bytesOut.reset(); // So the next test starts with a clean slate. + ByteString expected = intArrayToByteArray(bytes); + ByteString actual = bytesOut.readByteString((int) bytesOut.byteCount()); + assertEquals(expected, actual); } - private byte[] intArrayToByteArray(int[] bytes) { + private ByteString intArrayToByteArray(int[] bytes) { byte[] data = new byte[bytes.length]; for (int i = 0; i < bytes.length; i++) { data[i] = (byte) bytes[i]; } - return data; + return ByteString.of(data); } private void assertHeaderReferenced(int index) { @@ -840,17 +823,4 @@ public class HpackDraft05Test { private int headerTableLength() { return hpackReader.headerTable.length; } - - static class MutableByteArrayInputStream extends ByteArrayInputStream { - - MutableByteArrayInputStream() { - super(new byte[] { }); - } - - void set(byte[] replacement) { - this.buf = replacement; - this.pos = 0; - this.count = replacement.length; - } - } } diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/Http20Draft09Test.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/Http20Draft09Test.java index b21e5d2fb..77eb8dfe9 100644 --- a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/Http20Draft09Test.java +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/Http20Draft09Test.java @@ -480,7 +480,7 @@ public class Http20Draft09Test { } @Test public void frameSizeError() throws IOException { - Http20Draft09.Writer writer = new Http20Draft09.Writer(new ByteArrayOutputStream(), true); + Http20Draft09.Writer writer = new Http20Draft09.Writer(new OkBuffer(), true); try { writer.frameHeader(16384, Http20Draft09.TYPE_DATA, Http20Draft09.FLAG_NONE, 0); @@ -491,7 +491,7 @@ public class Http20Draft09Test { } @Test public void streamIdHasReservedBit() throws IOException { - Http20Draft09.Writer writer = new Http20Draft09.Writer(new ByteArrayOutputStream(), true); + Http20Draft09.Writer writer = new Http20Draft09.Writer(new OkBuffer(), true); try { int streamId = 3; @@ -504,22 +504,22 @@ public class Http20Draft09Test { } private byte[] literalHeaders(List
sentHeaders) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - new HpackDraft05.Writer(new DataOutputStream(out)).writeHeaders(sentHeaders); - return out.toByteArray(); + OkBuffer out = new OkBuffer(); + new HpackDraft05.Writer(out).writeHeaders(sentHeaders); + return out.readByteString((int) out.byteCount()).toByteArray(); } private byte[] sendPingFrame(boolean ack, int payload1, int payload2) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + OkBuffer out = new OkBuffer(); new Http20Draft09.Writer(out, true).ping(ack, payload1, payload2); - return out.toByteArray(); + return out.readByteString((int) out.byteCount()).toByteArray(); } private byte[] sendGoAway(int lastGoodStreamId, ErrorCode errorCode, byte[] debugData) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + OkBuffer out = new OkBuffer(); new Http20Draft09.Writer(out, true).goAway(lastGoodStreamId, errorCode, debugData); - return out.toByteArray(); + return out.readByteString((int) out.byteCount()).toByteArray(); } private byte[] sendDataFrame(byte[] data) throws IOException { @@ -527,15 +527,15 @@ public class Http20Draft09Test { } private byte[] sendDataFrame(byte[] data, int offset, int byteCount) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + OkBuffer out = new OkBuffer(); new Http20Draft09.Writer(out, true).dataFrame(expectedStreamId, Http20Draft09.FLAG_NONE, data, offset, byteCount); - return out.toByteArray(); + return out.readByteString((int) out.byteCount()).toByteArray(); } private byte[] windowUpdate(long windowSizeIncrement) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + OkBuffer out = new OkBuffer(); new Http20Draft09.Writer(out, true).windowUpdate(expectedStreamId, windowSizeIncrement); - return out.toByteArray(); + return out.readByteString((int) out.byteCount()).toByteArray(); } } diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/MockSpdyPeer.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/MockSpdyPeer.java index 939bb99da..6bef5dd6a 100644 --- a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/MockSpdyPeer.java +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/MockSpdyPeer.java @@ -17,7 +17,6 @@ package com.squareup.okhttp.internal.spdy; import com.squareup.okhttp.internal.Util; -import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; @@ -33,6 +32,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import okio.BufferedSource; import okio.ByteString; +import okio.OkBuffer; import okio.Okio; /** Replays prerecorded outgoing frames and records incoming frames. */ @@ -40,7 +40,7 @@ public final class MockSpdyPeer implements Closeable { private int frameCount = 0; private boolean client = false; private Variant variant = new Spdy3(); - private final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + private final OkBuffer bytesOut = new OkBuffer(); private FrameWriter frameWriter = variant.newWriter(bytesOut, client); private final List outFrames = new ArrayList(); private final BlockingQueue inFrames = new LinkedBlockingQueue(); @@ -69,7 +69,7 @@ public final class MockSpdyPeer implements Closeable { } public FrameWriter sendFrame() { - outFrames.add(new OutFrame(frameCount++, bytesOut.size(), Integer.MAX_VALUE)); + outFrames.add(new OutFrame(frameCount++, bytesOut.byteCount(), Integer.MAX_VALUE)); return frameWriter; } @@ -78,7 +78,7 @@ public final class MockSpdyPeer implements Closeable { * won't be generated naturally. */ public void sendFrame(byte[] frame) throws IOException { - outFrames.add(new OutFrame(frameCount++, bytesOut.size(), Integer.MAX_VALUE)); + outFrames.add(new OutFrame(frameCount++, bytesOut.byteCount(), Integer.MAX_VALUE)); bytesOut.write(frame); } @@ -88,7 +88,7 @@ public final class MockSpdyPeer implements Closeable { * malformed. */ public FrameWriter sendTruncatedFrame(int truncateToLength) { - outFrames.add(new OutFrame(frameCount++, bytesOut.size(), truncateToLength)); + outFrames.add(new OutFrame(frameCount++, bytesOut.byteCount(), truncateToLength)); return frameWriter; } @@ -121,7 +121,7 @@ public final class MockSpdyPeer implements Closeable { FrameReader reader = variant.newReader(Okio.buffer(Okio.source(in)), client); Iterator outFramesIterator = outFrames.iterator(); - byte[] outBytes = bytesOut.toByteArray(); + byte[] outBytes = bytesOut.readByteString((int) bytesOut.byteCount()).toByteArray(); OutFrame nextOutFrame = null; for (int i = 0; i < frameCount; i++) { @@ -130,9 +130,9 @@ public final class MockSpdyPeer implements Closeable { } if (nextOutFrame != null && nextOutFrame.sequence == i) { - int start = nextOutFrame.start; + long start = nextOutFrame.start; int truncateToLength = nextOutFrame.truncateToLength; - int end; + long end; if (outFramesIterator.hasNext()) { nextOutFrame = outFramesIterator.next(); end = nextOutFrame.start; @@ -141,8 +141,8 @@ public final class MockSpdyPeer implements Closeable { } // write a frame - int length = Math.min(end - start, truncateToLength); - out.write(outBytes, start, length); + int length = (int) Math.min(end - start, truncateToLength); + out.write(outBytes, (int) start, length); } else { // read a frame InFrame inFrame = new InFrame(i, reader); @@ -173,10 +173,10 @@ public final class MockSpdyPeer implements Closeable { private static class OutFrame { private final int sequence; - private final int start; + private final long start; private final int truncateToLength; - private OutFrame(int sequence, int start, int truncateToLength) { + private OutFrame(int sequence, long start, int truncateToLength) { this.sequence = sequence; this.start = start; this.truncateToLength = truncateToLength; diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/Spdy3Test.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/Spdy3Test.java index c56310aba..fc7b977ca 100644 --- a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/Spdy3Test.java +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/Spdy3Test.java @@ -98,21 +98,21 @@ public class Spdy3Test { } private byte[] sendDataFrame(byte[] data, int offset, int byteCount) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + OkBuffer out = new OkBuffer(); new Spdy3.Writer(out, true).sendDataFrame(expectedStreamId, 0, data, offset, byteCount); - return out.toByteArray(); + return out.readByteString((int) out.byteCount()).toByteArray(); } private byte[] windowUpdate(long increment) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + OkBuffer out = new OkBuffer(); new Spdy3.Writer(out, true).windowUpdate(expectedStreamId, increment); - return out.toByteArray(); + return out.readByteString((int) out.byteCount()).toByteArray(); } private byte[] sendGoAway(int lastGoodStreamId, ErrorCode errorCode, byte[] debugData) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + OkBuffer out = new OkBuffer(); new Spdy3.Writer(out, true).goAway(lastGoodStreamId, errorCode, debugData); - return out.toByteArray(); + return out.readByteString((int) out.byteCount()).toByteArray(); } } diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java index 30994e3e5..fcf32eb86 100644 --- a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java @@ -1060,7 +1060,7 @@ public final class SpdyConnectionTest { // Play it back. SpdyConnection connection = connection(peer, variant); connection.okHttpSettings.set(Settings.INITIAL_WINDOW_SIZE, 0, INITIAL_WINDOW_SIZE); - SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true); + SpdyStream stream = connection.newStream(headerEntries("b", "banana"), false, true); assertEquals(0, stream.unacknowledgedBytesRead); assertEquals(headerEntries("a", "android"), stream.getResponseHeaders()); Source in = stream.getSource(); diff --git a/okhttp/src/main/java/com/squareup/okhttp/Connection.java b/okhttp/src/main/java/com/squareup/okhttp/Connection.java index 3e24376b3..8aaecd2a1 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/Connection.java +++ b/okhttp/src/main/java/com/squareup/okhttp/Connection.java @@ -32,6 +32,7 @@ import java.net.Proxy; import java.net.Socket; import java.net.SocketTimeoutException; import javax.net.ssl.SSLSocket; +import okio.BufferedSink; import okio.BufferedSource; import okio.ByteString; import okio.Okio; @@ -73,6 +74,7 @@ public final class Connection implements Closeable { private InputStream in; private OutputStream out; private BufferedSource source; + private BufferedSink sink; private boolean connected = false; private HttpConnection httpConnection; private SpdyConnection spdyConnection; @@ -98,7 +100,7 @@ public final class Connection implements Closeable { if (route.address.sslSocketFactory != null) { upgradeToTls(tunnelRequest); } else { - streamWrapper(); + streamWrapper(true); httpConnection = new HttpConnection(pool, this, source, out); } connected = true; @@ -153,7 +155,6 @@ public final class Connection implements Closeable { out = sslSocket.getOutputStream(); in = sslSocket.getInputStream(); handshake = Handshake.get(sslSocket.getSession()); - streamWrapper(); ByteString maybeProtocol; Protocol selectedProtocol = Protocol.HTTP_11; @@ -162,11 +163,13 @@ public final class Connection implements Closeable { } if (selectedProtocol.spdyVariant) { + streamWrapper(false); sslSocket.setSoTimeout(0); // SPDY timeouts are set per-stream. - spdyConnection = new SpdyConnection.Builder(route.address.getUriHost(), true, source, out) + spdyConnection = new SpdyConnection.Builder(route.address.getUriHost(), true, source, sink) .protocol(selectedProtocol).build(); spdyConnection.sendConnectionHeader(); } else { + streamWrapper(true); httpConnection = new HttpConnection(pool, this, source, out); } } @@ -337,8 +340,14 @@ public final class Connection implements Closeable { } } - private void streamWrapper() throws IOException { + // TODO: drop the outputStream option when we use Okio's sink in HttpConnection. + private void streamWrapper(boolean outputStream) throws IOException { source = Okio.buffer(Okio.source(in)); - out = new BufferedOutputStream(out, 256); + + if (outputStream) { + out = new BufferedOutputStream(out, 256); + } else { + sink = Okio.buffer(Okio.sink(out)); + } } } diff --git a/okio/src/main/java/okio/DeflaterSink.java b/okio/src/main/java/okio/DeflaterSink.java index 43a964ccd..276c3e636 100644 --- a/okio/src/main/java/okio/DeflaterSink.java +++ b/okio/src/main/java/okio/DeflaterSink.java @@ -70,8 +70,9 @@ public final class DeflaterSink implements Sink { @IgnoreJRERequirement private void deflate(boolean syncFlush) throws IOException { + OkBuffer buffer = sink.buffer(); while (true) { - Segment s = sink.buffer().writableSegment(1); + Segment s = buffer.writableSegment(1); // The 4-parameter overload of deflate() doesn't exist in the RI until // Java 1.7, and is public (although with @hide) on Android since 2.3. @@ -83,7 +84,7 @@ public final class DeflaterSink implements Sink { if (deflated == 0) return; s.limit += deflated; - sink.buffer().byteCount += deflated; + buffer.byteCount += deflated; sink.emitCompleteSegments(); } } diff --git a/okio/src/main/java/okio/RealBufferedSink.java b/okio/src/main/java/okio/RealBufferedSink.java index 2ad52c494..99ae0cc68 100644 --- a/okio/src/main/java/okio/RealBufferedSink.java +++ b/okio/src/main/java/okio/RealBufferedSink.java @@ -126,6 +126,7 @@ final class RealBufferedSink implements BufferedSink { } @Override public void close() throws IOException { + if (closed) return; flush(); sink.close(); closed = true;