From f5dc99aa49f4832e3534dca6fdc90aa072ef0180 Mon Sep 17 00:00:00 2001 From: jwilson Date: Sat, 14 Mar 2015 21:09:36 -0400 Subject: [PATCH] Introduce a buffer in SpdyStream's outgoing frames. Previously we'd consistently send 2048 byte frames because that was Okio's default segment size. This increases OkHttp's typical segment size to the 16 KiB frame size that all HTTP/2 endpoints support. --- .../internal/spdy/Spdy3ConnectionTest.java | 28 ++++++++- .../okhttp/internal/spdy/SpdyStream.java | 62 +++++++++++++------ 2 files changed, 71 insertions(+), 19 deletions(-) diff --git a/okhttp-tests/src/test/java/com/squareup/okhttp/internal/spdy/Spdy3ConnectionTest.java b/okhttp-tests/src/test/java/com/squareup/okhttp/internal/spdy/Spdy3ConnectionTest.java index 110412ec2..074d9ad6f 100644 --- a/okhttp-tests/src/test/java/com/squareup/okhttp/internal/spdy/Spdy3ConnectionTest.java +++ b/okhttp-tests/src/test/java/com/squareup/okhttp/internal/spdy/Spdy3ConnectionTest.java @@ -941,8 +941,9 @@ public final class Spdy3ConnectionTest { sink.write(new Buffer().writeUtf8("abcde"), 5); stream.writeTimeout().timeout(500, TimeUnit.MILLISECONDS); long startNanos = System.nanoTime(); + sink.write(new Buffer().writeUtf8("f"), 1); try { - sink.write(new Buffer().writeUtf8("f"), 1); // This will time out waiting on the write window. + sink.flush(); // This will time out waiting on the write window. fail(); } catch (InterruptedIOException expected) { } @@ -957,6 +958,31 @@ public final class Spdy3ConnectionTest { assertEquals(TYPE_RST_STREAM, peer.takeFrame().type); } + @Test public void outgoingWritesAreBatched() throws Exception { + // write the mocking script + peer.acceptFrame(); // SYN_STREAM + peer.sendFrame().synReply(false, 1, headerEntries("a", "android")); + peer.acceptFrame(); // DATA + peer.play(); + + // play it back + SpdyConnection connection = connection(peer, SPDY3); + SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true); + + // two outgoing writes + Sink sink = stream.getSink(); + sink.write(new Buffer().writeUtf8("abcde"), 5); + sink.write(new Buffer().writeUtf8("fghij"), 5); + sink.close(); + + // verify the peer received one incoming frame + assertEquals(TYPE_HEADERS, peer.takeFrame().type); + MockSpdyPeer.InFrame data = peer.takeFrame(); + assertEquals(TYPE_DATA, data.type); + assertTrue(Arrays.equals("abcdefghij".getBytes("UTF-8"), data.data)); + assertTrue(data.inFinished); + } + @Test public void headers() throws Exception { // write the mocking script peer.acceptFrame(); // SYN_STREAM diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java b/okhttp/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java index 331536d37..abc5df651 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java @@ -53,7 +53,6 @@ public final class SpdyStream { private final int id; private final SpdyConnection connection; - private long readTimeoutMillis = 0; /** Headers sent by the stream initiator. Immutable and non null. */ private final List
requestHeaders; @@ -468,6 +467,14 @@ public final class SpdyStream { * thread safe. */ final class SpdyDataSink implements Sink { + private static final long EMIT_BUFFER_SIZE = 16384; + + /** + * Buffer of outgoing data. This batches writes of small writes into this sink as larges + * frames written to the outgoing connection. Batching saves the (small) framing overhead. + */ + private final Buffer sendBuffer = new Buffer(); + private boolean closed; /** @@ -478,26 +485,34 @@ public final class SpdyStream { @Override public void write(Buffer source, long byteCount) throws IOException { assert (!Thread.holdsLock(SpdyStream.this)); - while (byteCount > 0) { - long toWrite; - synchronized (SpdyStream.this) { - writeTimeout.enter(); - try { - while (bytesLeftInWriteWindow <= 0 && !finished && !closed && errorCode == null) { - waitForIo(); // Wait until we receive a WINDOW_UPDATE. - } - } finally { - writeTimeout.exitAndThrowIfTimedOut(); - } + sendBuffer.write(source, byteCount); + while (sendBuffer.size() >= EMIT_BUFFER_SIZE) { + emitDataFrame(false); + } + } - checkOutNotClosed(); // Kick out if the stream was reset or closed while waiting. - toWrite = Math.min(bytesLeftInWriteWindow, byteCount); - bytesLeftInWriteWindow -= toWrite; + /** + * Emit a single data frame to the connection. The frame's size be limited by this stream's + * write window. This method will block until the write window is nonempty. + */ + private void emitDataFrame(boolean outFinished) throws IOException { + long toWrite; + synchronized (SpdyStream.this) { + writeTimeout.enter(); + try { + while (bytesLeftInWriteWindow <= 0 && !finished && !closed && errorCode == null) { + waitForIo(); // Wait until we receive a WINDOW_UPDATE. + } + } finally { + writeTimeout.exitAndThrowIfTimedOut(); } - byteCount -= toWrite; - connection.writeData(id, false, source, toWrite); + checkOutNotClosed(); // Kick out if the stream was reset or closed while waiting. + toWrite = Math.min(bytesLeftInWriteWindow, sendBuffer.size()); + bytesLeftInWriteWindow -= toWrite; } + + connection.writeData(id, outFinished && toWrite == sendBuffer.size(), sendBuffer, toWrite); } @Override public void flush() throws IOException { @@ -505,6 +520,9 @@ public final class SpdyStream { synchronized (SpdyStream.this) { checkOutNotClosed(); } + while (sendBuffer.size() > 0) { + emitDataFrame(false); + } connection.flush(); } @@ -518,7 +536,15 @@ public final class SpdyStream { if (closed) return; } if (!sink.finished) { - connection.writeData(id, true, null, 0); + // Emit the remaining data, setting the END_STREAM flag on the last frame. + if (sendBuffer.size() > 0) { + while (sendBuffer.size() > 0) { + emitDataFrame(true); + } + } else { + // Send an empty frame just so we can set the END_STREAM flag. + connection.writeData(id, true, null, 0); + } } synchronized (SpdyStream.this) { closed = true;