diff --git a/src/main/java/com/squareup/okhttp/internal/spdy/Settings.java b/src/main/java/com/squareup/okhttp/internal/spdy/Settings.java index 3bcfbccdd..cc7f119ba 100644 --- a/src/main/java/com/squareup/okhttp/internal/spdy/Settings.java +++ b/src/main/java/com/squareup/okhttp/internal/spdy/Settings.java @@ -16,6 +16,9 @@ package com.squareup.okhttp.internal.spdy; final class Settings { + /** From the spdy/3 spec, the default initial window size for all streams is 64 KiB. */ + static final int DEFAULT_INITIAL_WINDOW_SIZE = 64 * 1024; + /** Peer request to clear durable settings. */ static final int FLAG_CLEAR_PREVIOUSLY_PERSISTED_SETTINGS = 0x1; diff --git a/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java b/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java index 1834fa8af..04bf39755 100644 --- a/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java +++ b/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java @@ -205,6 +205,21 @@ public final class SpdyConnection implements Closeable { spdyWriter.rstStream(streamId, statusCode); } + void writeWindowUpdateLater(final int streamId, final int deltaWindowSize) { + writeExecutor.execute(new Runnable() { + @Override public void run() { + try { + writeWindowUpdate(streamId, deltaWindowSize); + } catch (IOException ignored) { + } + } + }); + } + + void writeWindowUpdate(int streamId, int deltaWindowSize) throws IOException { + spdyWriter.windowUpdate(streamId, deltaWindowSize); + } + /** * Sends a ping frame to the peer. Use the returned object to await the * ping's response and observe its round trip time. diff --git a/src/main/java/com/squareup/okhttp/internal/spdy/SpdyReader.java b/src/main/java/com/squareup/okhttp/internal/spdy/SpdyReader.java index 15a4cece2..c7ca771e5 100644 --- a/src/main/java/com/squareup/okhttp/internal/spdy/SpdyReader.java +++ b/src/main/java/com/squareup/okhttp/internal/spdy/SpdyReader.java @@ -187,6 +187,7 @@ final class SpdyReader implements Closeable { } private void readWindowUpdate(Handler handler, int flags, int length) throws IOException { + if (length != 8) throw ioException("TYPE_WINDOW_UPDATE length: %d != 8", length); int w1 = in.readInt(); int w2 = in.readInt(); int streamId = w1 & 0x7fffffff; diff --git a/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java b/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java index 4005de677..3189d130d 100644 --- a/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java +++ b/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java @@ -68,6 +68,14 @@ public final class SpdyStream { public static final int RST_INVALID_CREDENTIALS = 10; public static final int RST_FRAME_TOO_LARGE = 11; + /** + * The number of unacknowledged bytes at which the input stream will send + * the peer a {@code WINDOW_UPDATE} frame. Must be less than this client's + * window size, otherwise the remote peer will stop sending data on this + * stream. + */ + public static final int WINDOW_UPDATE_THRESHOLD = Settings.DEFAULT_INITIAL_WINDOW_SIZE / 2; + private final int id; private final SpdyConnection connection; private final int priority; @@ -357,7 +365,7 @@ public final class SpdyStream { * limit pos */ - private final byte[] buffer = new byte[64 * 1024]; // 64KiB specified by TODO + private final byte[] buffer = new byte[Settings.DEFAULT_INITIAL_WINDOW_SIZE]; /** the next byte to be read, or -1 if the buffer is empty. Never buffer.length */ private int pos = -1; @@ -374,6 +382,13 @@ public final class SpdyStream { */ private boolean finished; + /** + * The total number of bytes consumed by the application (with {@link + * #read}), but not yet acknowledged by sending a {@code WINDOW_UPDATE} + * frame. + */ + private int unacknowledgedBytes = 0; + @Override public int available() throws IOException { synchronized (SpdyStream.this) { checkNotClosed(); @@ -422,7 +437,12 @@ public final class SpdyStream { copied += bytesToCopy; } - // TODO: notify peer of flow-control + // Flow control: notify the peer that we're ready for more data! + unacknowledgedBytes += copied; + if (unacknowledgedBytes >= WINDOW_UPDATE_THRESHOLD) { + connection.writeWindowUpdateLater(id, unacknowledgedBytes); + unacknowledgedBytes = 0; + } if (pos == limit) { pos = -1; @@ -614,6 +634,9 @@ public final class SpdyStream { private void writeFrame(boolean last) throws IOException { assert (!Thread.holdsLock(SpdyStream.this)); + + // TODO: Await flow control (WINDOW_UPDATE) if necessary. + int flags = 0; if (last) { flags |= SpdyConnection.FLAG_FIN; diff --git a/src/main/java/com/squareup/okhttp/internal/spdy/SpdyWriter.java b/src/main/java/com/squareup/okhttp/internal/spdy/SpdyWriter.java index 2119acb3f..8605ed707 100644 --- a/src/main/java/com/squareup/okhttp/internal/spdy/SpdyWriter.java +++ b/src/main/java/com/squareup/okhttp/internal/spdy/SpdyWriter.java @@ -165,9 +165,15 @@ final class SpdyWriter implements Closeable { out.flush(); } - public synchronized void windowUpdate(int flags, int streamId, int deltaWindowSize) - throws IOException { - throw new UnsupportedOperationException("TODO"); // TODO + public synchronized void windowUpdate(int streamId, int deltaWindowSize) throws IOException { + int type = SpdyConnection.TYPE_WINDOW_UPDATE; + int flags = 0; + int length = 8; + out.writeInt(0x80000000 | (SpdyConnection.VERSION & 0x7fff) << 16 | type & 0xffff); + out.writeInt((flags & 0xff) << 24 | length & 0xffffff); + out.writeInt(streamId); + out.writeInt(deltaWindowSize); + out.flush(); } @Override public void close() throws IOException { diff --git a/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java b/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java index 5824d0c23..581231877 100644 --- a/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java +++ b/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java @@ -29,11 +29,13 @@ import static com.squareup.okhttp.internal.spdy.SpdyConnection.TYPE_PING; import static com.squareup.okhttp.internal.spdy.SpdyConnection.TYPE_RST_STREAM; import static com.squareup.okhttp.internal.spdy.SpdyConnection.TYPE_SYN_REPLY; import static com.squareup.okhttp.internal.spdy.SpdyConnection.TYPE_SYN_STREAM; +import static com.squareup.okhttp.internal.spdy.SpdyConnection.TYPE_WINDOW_UPDATE; import static com.squareup.okhttp.internal.spdy.SpdyStream.RST_FLOW_CONTROL_ERROR; import static com.squareup.okhttp.internal.spdy.SpdyStream.RST_INVALID_STREAM; import static com.squareup.okhttp.internal.spdy.SpdyStream.RST_PROTOCOL_ERROR; import static com.squareup.okhttp.internal.spdy.SpdyStream.RST_REFUSED_STREAM; import static com.squareup.okhttp.internal.spdy.SpdyStream.RST_STREAM_IN_USE; +import static com.squareup.okhttp.internal.spdy.SpdyStream.WINDOW_UPDATE_THRESHOLD; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -913,6 +915,42 @@ public final class SpdyConnectionTest { assertEquals(RST_PROTOCOL_ERROR, rstStream.statusCode); } + @Test public void readSendsWindowUpdate() throws Exception { + // Write the mocking script. + peer.acceptFrame(); // SYN STREAM + peer.sendFrame().synReply(0, 1, Arrays.asList("a", "android")); + for (int i = 0; i < 3; i++) { + peer.sendFrame().data(0, 1, new byte[WINDOW_UPDATE_THRESHOLD]); + peer.acceptFrame(); // WINDOW UPDATE + } + peer.sendFrame().data(FLAG_FIN, 1, new byte[0]); + peer.play(); + + // Play it back. + SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyStream stream = connection.newStream(Arrays.asList("b", "banana"), true, true); + assertEquals(Arrays.asList("a", "android"), stream.getResponseHeaders()); + InputStream in = stream.getInputStream(); + int total = 0; + byte[] buffer = new byte[1024]; + int count; + while ((count = in.read(buffer)) != -1) { + total += count; + if (total == 3 * WINDOW_UPDATE_THRESHOLD) break; + } + assertEquals(-1, in.read()); + + // Verify the peer received what was expected. + MockSpdyPeer.InFrame synStream = peer.takeFrame(); + assertEquals(TYPE_SYN_STREAM, synStream.type); + for (int i = 0; i < 3; i++) { + MockSpdyPeer.InFrame windowUpdate = peer.takeFrame(); + assertEquals(TYPE_WINDOW_UPDATE, windowUpdate.type); + assertEquals(1, windowUpdate.streamId); + assertEquals(WINDOW_UPDATE_THRESHOLD, windowUpdate.deltaWindowSize); + } + } + private void writeAndClose(SpdyStream stream, String data) throws IOException { OutputStream out = stream.getOutputStream(); out.write(data.getBytes("UTF-8"));