diff --git a/okhttp-tests/src/test/java/com/squareup/okhttp/internal/spdy/Spdy3ConnectionTest.java b/okhttp-tests/src/test/java/com/squareup/okhttp/internal/spdy/Spdy3ConnectionTest.java index 110412ec2..074d9ad6f 100644 --- a/okhttp-tests/src/test/java/com/squareup/okhttp/internal/spdy/Spdy3ConnectionTest.java +++ b/okhttp-tests/src/test/java/com/squareup/okhttp/internal/spdy/Spdy3ConnectionTest.java @@ -941,8 +941,9 @@ public final class Spdy3ConnectionTest { sink.write(new Buffer().writeUtf8("abcde"), 5); stream.writeTimeout().timeout(500, TimeUnit.MILLISECONDS); long startNanos = System.nanoTime(); + sink.write(new Buffer().writeUtf8("f"), 1); try { - sink.write(new Buffer().writeUtf8("f"), 1); // This will time out waiting on the write window. + sink.flush(); // This will time out waiting on the write window. fail(); } catch (InterruptedIOException expected) { } @@ -957,6 +958,31 @@ public final class Spdy3ConnectionTest { assertEquals(TYPE_RST_STREAM, peer.takeFrame().type); } + @Test public void outgoingWritesAreBatched() throws Exception { + // write the mocking script + peer.acceptFrame(); // SYN_STREAM + peer.sendFrame().synReply(false, 1, headerEntries("a", "android")); + peer.acceptFrame(); // DATA + peer.play(); + + // play it back + SpdyConnection connection = connection(peer, SPDY3); + SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true); + + // two outgoing writes + Sink sink = stream.getSink(); + sink.write(new Buffer().writeUtf8("abcde"), 5); + sink.write(new Buffer().writeUtf8("fghij"), 5); + sink.close(); + + // verify the peer received one incoming frame + assertEquals(TYPE_HEADERS, peer.takeFrame().type); + MockSpdyPeer.InFrame data = peer.takeFrame(); + assertEquals(TYPE_DATA, data.type); + assertTrue(Arrays.equals("abcdefghij".getBytes("UTF-8"), data.data)); + assertTrue(data.inFinished); + } + @Test public void headers() throws Exception { // write the mocking script peer.acceptFrame(); // SYN_STREAM diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java b/okhttp/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java index 331536d37..abc5df651 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java @@ -53,7 +53,6 @@ public final class SpdyStream { private final int id; private final SpdyConnection connection; - private long readTimeoutMillis = 0; /** Headers sent by the stream initiator. Immutable and non null. */ private final List
requestHeaders; @@ -468,6 +467,14 @@ public final class SpdyStream { * thread safe. */ final class SpdyDataSink implements Sink { + private static final long EMIT_BUFFER_SIZE = 16384; + + /** + * Buffer of outgoing data. This batches writes of small writes into this sink as larges + * frames written to the outgoing connection. Batching saves the (small) framing overhead. + */ + private final Buffer sendBuffer = new Buffer(); + private boolean closed; /** @@ -478,26 +485,34 @@ public final class SpdyStream { @Override public void write(Buffer source, long byteCount) throws IOException { assert (!Thread.holdsLock(SpdyStream.this)); - while (byteCount > 0) { - long toWrite; - synchronized (SpdyStream.this) { - writeTimeout.enter(); - try { - while (bytesLeftInWriteWindow <= 0 && !finished && !closed && errorCode == null) { - waitForIo(); // Wait until we receive a WINDOW_UPDATE. - } - } finally { - writeTimeout.exitAndThrowIfTimedOut(); - } + sendBuffer.write(source, byteCount); + while (sendBuffer.size() >= EMIT_BUFFER_SIZE) { + emitDataFrame(false); + } + } - checkOutNotClosed(); // Kick out if the stream was reset or closed while waiting. - toWrite = Math.min(bytesLeftInWriteWindow, byteCount); - bytesLeftInWriteWindow -= toWrite; + /** + * Emit a single data frame to the connection. The frame's size be limited by this stream's + * write window. This method will block until the write window is nonempty. + */ + private void emitDataFrame(boolean outFinished) throws IOException { + long toWrite; + synchronized (SpdyStream.this) { + writeTimeout.enter(); + try { + while (bytesLeftInWriteWindow <= 0 && !finished && !closed && errorCode == null) { + waitForIo(); // Wait until we receive a WINDOW_UPDATE. + } + } finally { + writeTimeout.exitAndThrowIfTimedOut(); } - byteCount -= toWrite; - connection.writeData(id, false, source, toWrite); + checkOutNotClosed(); // Kick out if the stream was reset or closed while waiting. + toWrite = Math.min(bytesLeftInWriteWindow, sendBuffer.size()); + bytesLeftInWriteWindow -= toWrite; } + + connection.writeData(id, outFinished && toWrite == sendBuffer.size(), sendBuffer, toWrite); } @Override public void flush() throws IOException { @@ -505,6 +520,9 @@ public final class SpdyStream { synchronized (SpdyStream.this) { checkOutNotClosed(); } + while (sendBuffer.size() > 0) { + emitDataFrame(false); + } connection.flush(); } @@ -518,7 +536,15 @@ public final class SpdyStream { if (closed) return; } if (!sink.finished) { - connection.writeData(id, true, null, 0); + // Emit the remaining data, setting the END_STREAM flag on the last frame. + if (sendBuffer.size() > 0) { + while (sendBuffer.size() > 0) { + emitDataFrame(true); + } + } else { + // Send an empty frame just so we can set the END_STREAM flag. + connection.writeData(id, true, null, 0); + } } synchronized (SpdyStream.this) { closed = true;