From d990b5819cd6282923134564f3bcc59263eb0616 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Sun, 26 Jan 2014 13:01:34 -0800 Subject: [PATCH] spdy streams honor write window updates. --- .../okhttp/internal/spdy/SpdyConnection.java | 64 +++++---- .../okhttp/internal/spdy/SpdyStream.java | 122 +++++++----------- .../internal/spdy/SpdyConnectionTest.java | 115 +++++++++++++++-- .../internal/http/HttpOverSpdyTest.java | 19 +++ 4 files changed, 214 insertions(+), 106 deletions(-) diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java index f9226e332..b784392d1 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java @@ -128,7 +128,7 @@ public final class SpdyConnection implements Closeable { peerSettings = new Settings(); // TODO: implement stream limit // okHttpSettings.set(Settings.MAX_CONCURRENT_STREAMS, 0, max); - bufferPool = new ByteArrayPool(peerSettings.getInitialWindowSize() * 8); + bufferPool = new ByteArrayPool(initialWindowSize * 8); // TODO: revisit size limit! frameReader = variant.newReader(builder.in, client); frameWriter = variant.newWriter(builder.out, client); @@ -149,7 +149,7 @@ public final class SpdyConnection implements Closeable { return streams.size(); } - private synchronized SpdyStream getStream(int id) { + synchronized SpdyStream getStream(int id) { return streams.get(id); } @@ -223,8 +223,27 @@ public final class SpdyConnection implements Closeable { frameWriter.synReply(outFinished, streamId, alternating); } + /** + * Callers of this method are not thread safe, and sometimes on application + * threads. Most often, this method will be called to send a buffer worth of + * data to the peer. + *

+ * Writes are subject to the write window of the stream and the connection. + * Until there is a window sufficient to send {@code byteCount}, the caller + * will block. For example, a user of {@code HttpURLConnection} who flushes + * more bytes to the output stream than the connection's write window will + * block. + *

+ * Zero {@code byteCount} writes are not subject to flow control and + * will not block. The only use case for zero {@code byteCount} is closing + * a flushed output stream. + */ public void writeData(int streamId, boolean outFinished, byte[] buffer, int offset, int byteCount) throws IOException { + if (byteCount == 0) { // Empty data frames are not flow-controlled. + frameWriter.data(outFinished, streamId, buffer, offset, byteCount); + return; + } synchronized (SpdyConnection.this) { waitUntilWritable(byteCount); bytesLeftInWriteWindow -= byteCount; @@ -267,21 +286,17 @@ public final class SpdyConnection implements Closeable { frameWriter.rstStream(streamId, statusCode); } - void writeWindowUpdateLater(final int streamId, final int windowSizeIncrement) { + void writeWindowUpdateLater(final int streamId, final long windowSizeIncrement) { executor.submit(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) { @Override public void execute() { try { - writeWindowUpdate(streamId, windowSizeIncrement); + frameWriter.windowUpdate(streamId, windowSizeIncrement); } catch (IOException ignored) { } } }); } - void writeWindowUpdate(int streamId, int windowSizeIncrement) throws IOException { - frameWriter.windowUpdate(streamId, windowSizeIncrement); - } - /** * Sends a ping frame to the peer. Use the returned object to await the * ping's response and observe its round trip time. @@ -486,6 +501,10 @@ public final class SpdyConnection implements Closeable { } } + /** + * Methods in this class must not lock FrameWriter. If a method needs to + * write a frame, create an async task to do so. + */ class Reader extends NamedRunnable implements FrameReader.Handler { private Reader() { super("OkHttp %s", hostName); @@ -585,6 +604,7 @@ public final class SpdyConnection implements Closeable { } @Override public void settings(boolean clearPrevious, Settings newSettings) { + long delta = 0; SpdyStream[] streamsToNotify = null; synchronized (SpdyConnection.this) { if (clearPrevious) { @@ -597,27 +617,21 @@ public final class SpdyConnection implements Closeable { } int peerInitialWindowSize = peerSettings.getInitialWindowSize(); if (peerInitialWindowSize != -1 && peerInitialWindowSize != initialWindowSize) { - long delta = peerInitialWindowSize - initialWindowSize; + delta = peerInitialWindowSize - initialWindowSize; SpdyConnection.this.initialWindowSize = peerInitialWindowSize; if (!receivedInitialPeerSettings) { addBytesToWriteWindow(delta); receivedInitialPeerSettings = true; } - } - if (!streams.isEmpty()) { - streamsToNotify = streams.values().toArray(new SpdyStream[streams.size()]); + if (!streams.isEmpty()) { + streamsToNotify = streams.values().toArray(new SpdyStream[streams.size()]); + } } } - if (streamsToNotify != null) { - for (SpdyStream stream : streamsToNotify) { - // The synchronization here is ugly. We need to synchronize on 'this' to guard - // reads to 'peerSettings'. We synchronize on 'stream' to guard the state change. - // And we need to acquire the 'stream' lock first, since that may block. - // TODO: this can block the reader thread until a write completes. That's bad! + if (streamsToNotify != null && delta != 0) { + for (SpdyStream stream : streams.values()) { synchronized (stream) { - synchronized (SpdyConnection.this) { - stream.receiveSettings(peerSettings); - } + stream.addBytesToWriteWindow(delta); } } } @@ -653,8 +667,7 @@ public final class SpdyConnection implements Closeable { } } - @Override - public void goAway(int lastGoodStreamId, ErrorCode errorCode, byte[] debugData) { + @Override public void goAway(int lastGoodStreamId, ErrorCode errorCode, byte[] debugData) { if (debugData.length > 0) { // TODO: log the debugData } synchronized (SpdyConnection.this) { @@ -680,10 +693,11 @@ public final class SpdyConnection implements Closeable { SpdyConnection.this.notifyAll(); } } else { - // TODO: honor endFlowControl SpdyStream stream = getStream(streamId); if (stream != null) { - stream.receiveWindowUpdate(windowSizeIncrement); + synchronized (stream) { + stream.addBytesToWriteWindow(windowSizeIncrement); + } } } } diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java index 7336f1dbb..c8ac6d9e9 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java @@ -41,7 +41,14 @@ public final class SpdyStream { * stream. (Chrome 25 uses 5 MiB.) */ int windowUpdateThreshold; - private int writeWindowSize; + + /** + * Count of bytes that can be written on the stream before receiving a + * window update. Even if this is positive, writes will block until there + * available bytes in {@code connection.bytesLeftInWriteWindow}. + */ + // guarded by this + long bytesLeftInWriteWindow = 0; private final int id; private final SpdyConnection connection; @@ -70,7 +77,7 @@ public final class SpdyStream { if (requestHeaders == null) throw new NullPointerException("requestHeaders == null"); this.id = id; this.connection = connection; - this.writeWindowSize = initialWriteWindow; + this.bytesLeftInWriteWindow = initialWriteWindow; this.windowUpdateThreshold = initialWriteWindow / 2; this.in = new SpdyDataInputStream(initialWriteWindow); this.out = new SpdyDataOutputStream(); @@ -311,28 +318,6 @@ public final class SpdyStream { } } - private void setPeerSettings(Settings peerSettings) { - // TODO: For HTTP/2, also adjust the stream flow control window size - // by the difference between the new value and the old value. - assert (Thread.holdsLock(connection)); // Because 'settings' is guarded by 'connection'. - long delta = peerSettings.getInitialWindowSize() - writeWindowSize; - this.writeWindowSize = peerSettings.getInitialWindowSize(); - this.windowUpdateThreshold = peerSettings.getInitialWindowSize() / 2; - receiveWindowUpdate(delta); - } - - /** Notification received when peer settings change. */ - void receiveSettings(Settings peerSettings) { - assert (Thread.holdsLock(this)); - setPeerSettings(peerSettings); - notifyAll(); - } - - synchronized void receiveWindowUpdate(long windowSizeIncrement) { - out.unacknowledgedBytes -= windowSizeIncrement; - notifyAll(); - } - int getPriority() { return priority; } @@ -579,7 +564,7 @@ public final class SpdyStream { * An output stream that writes outgoing data frames of a stream. This class * is not thread safe. */ - private final class SpdyDataOutputStream extends OutputStream { + final class SpdyDataOutputStream extends OutputStream { private final byte[] buffer = SpdyStream.this.connection.bufferPool.getBuf(OUTPUT_BUFFER_SIZE); private int pos = 0; @@ -592,13 +577,6 @@ public final class SpdyStream { */ private boolean finished; - /** - * The total number of bytes written out to the peer, but not yet - * acknowledged with an incoming {@code WINDOW_UPDATE} frame. Writes - * block if they cause this to exceed the {@code WINDOW_SIZE}. - */ - private long unacknowledgedBytes = 0; - @Override public void write(int b) throws IOException { Util.writeSingleByte(this, b); } @@ -606,11 +584,13 @@ public final class SpdyStream { @Override public void write(byte[] bytes, int offset, int count) throws IOException { assert (!Thread.holdsLock(SpdyStream.this)); checkOffsetAndCount(bytes.length, offset, count); - checkNotClosed(); + synchronized (SpdyStream.this) { + checkOutNotClosed(); + } while (count > 0) { if (pos == buffer.length) { - writeFrame(false); + writeFrame(); } int bytesToCopy = Math.min(count, buffer.length - pos); System.arraycopy(bytes, offset, buffer, pos, bytesToCopy); @@ -622,9 +602,11 @@ public final class SpdyStream { @Override public void flush() throws IOException { assert (!Thread.holdsLock(SpdyStream.this)); - checkNotClosed(); + synchronized (SpdyStream.this) { + checkOutNotClosed(); + } if (pos > 0) { - writeFrame(false); + writeFrame(); connection.flush(); } } @@ -639,59 +621,53 @@ public final class SpdyStream { SpdyStream.this.connection.bufferPool.returnBuf(buffer); } if (!out.finished) { - writeFrame(true); + connection.writeData(id, true, buffer, 0, pos); } connection.flush(); cancelStreamIfNecessary(); } - private void writeFrame(boolean outFinished) throws IOException { + private void writeFrame() throws IOException { assert (!Thread.holdsLock(SpdyStream.this)); int length = pos; synchronized (SpdyStream.this) { - waitUntilWritable(length, outFinished); - unacknowledgedBytes += length; + waitUntilWritable(length); + checkOutNotClosed(); // Kick out if the stream was reset or closed while waiting. + bytesLeftInWriteWindow -= length; } - connection.writeData(id, outFinished, buffer, 0, pos); + connection.writeData(id, false, buffer, 0, pos); pos = 0; } + } - /** - * Returns once the peer is ready to receive {@code count} bytes. - * - * @throws IOException if the stream was finished or closed, or the - * thread was interrupted. - */ - private void waitUntilWritable(int count, boolean last) throws IOException { - try { - while (unacknowledgedBytes + count >= writeWindowSize) { - SpdyStream.this.wait(); // Wait until we receive a WINDOW_UPDATE. - - // The stream may have been closed or reset while we were waiting! - if (!last && closed) { - throw new IOException("stream closed"); - } else if (finished) { - throw new IOException("stream finished"); - } else if (errorCode != null) { - throw new IOException("stream was reset: " + errorCode); - } - } - } catch (InterruptedException e) { - throw new InterruptedIOException(); + /** Returns once the peer is ready to receive {@code byteCount} bytes. */ + private void waitUntilWritable(int byteCount) throws IOException { + try { + while (byteCount > bytesLeftInWriteWindow) { + SpdyStream.this.wait(); // Wait until we receive a WINDOW_UPDATE. } + } catch (InterruptedException e) { + throw new InterruptedIOException(); } + } - private void checkNotClosed() throws IOException { - synchronized (SpdyStream.this) { - if (closed) { - throw new IOException("stream closed"); - } else if (finished) { - throw new IOException("stream finished"); - } else if (errorCode != null) { - throw new IOException("stream was reset: " + errorCode); - } - } + /** + * {@code delta} will be negative if a settings frame initial window is + * smaller than the last. + */ + void addBytesToWriteWindow(long delta) { + bytesLeftInWriteWindow += delta; + if (delta > 0) SpdyStream.this.notifyAll(); + } + + private void checkOutNotClosed() throws IOException { + if (out.closed) { + throw new IOException("stream closed"); + } else if (out.finished) { + throw new IOException("stream finished"); + } else if (errorCode != null) { + throw new IOException("stream was reset: " + errorCode); } } } diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java index 1d1e5e25a..52169a512 100644 --- a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java @@ -322,7 +322,8 @@ public final class SpdyConnectionTest { assertEquals(3368, connection.initialWindowSize); assertEquals(1684, connection.bytesLeftInWriteWindow); // initial wasn't affected. - assertEquals(1684, stream.windowUpdateThreshold); + // New Stream is has the most recent initial window size. + assertEquals(3368, stream.bytesLeftInWriteWindow); } @Test public void unexpectedPingIsNotReturned() throws Exception { @@ -1177,13 +1178,8 @@ public final class SpdyConnectionTest { SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true); OutputStream out = stream.getOutputStream(); out.write(new byte[windowSize]); - interruptAfterDelay(500); - try { - out.write('a'); - out.flush(); - fail(); - } catch (InterruptedIOException expected) { - } + out.write('a'); + assertFlushBlocks(out); // Verify the peer received what was expected. MockSpdyPeer.InFrame synStream = peer.takeFrame(); @@ -1192,6 +1188,56 @@ public final class SpdyConnectionTest { assertEquals(TYPE_DATA, data.type); } + @Test public void initialSettingsWithWindowSizeAdjustsConnection() throws Exception { + int initialWindowSize = 65535; + int framesThatFillWindow = roundUp(initialWindowSize, SpdyStream.OUTPUT_BUFFER_SIZE); + + // Write the mocking script. This accepts more data frames than necessary! + peer.acceptFrame(); // SYN_STREAM + for (int i = 0; i < framesThatFillWindow; i++) { + peer.acceptFrame(); // DATA on stream 1 + } + peer.acceptFrame(); // DATA on stream 2 + peer.play(); + + // Play it back. + SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyStream stream = connection.newStream(headerEntries("a", "apple"), true, true); + OutputStream out = stream.getOutputStream(); + out.write(new byte[initialWindowSize]); + out.flush(); + + // write 1 more than the window size + out.write('a'); + assertFlushBlocks(out); + + // Check that we've filled the window for both the stream and also the connection. + assertEquals(0, connection.bytesLeftInWriteWindow); + assertEquals(0, connection.getStream(1).bytesLeftInWriteWindow); + + // Receiving a Settings with a larger window size will unblock the streams. + Settings initial = new Settings(); + initial.set(Settings.INITIAL_WINDOW_SIZE, PERSIST_VALUE, initialWindowSize + 1); + connection.readerRunnable.settings(false, initial); + + assertEquals(1, connection.bytesLeftInWriteWindow); + assertEquals(1, connection.getStream(1).bytesLeftInWriteWindow); + + // The stream should no longer be blocked. + out.flush(); + + assertEquals(0, connection.bytesLeftInWriteWindow); + assertEquals(0, connection.getStream(1).bytesLeftInWriteWindow); + + // Settings after the initial do not affect the connection window size. + Settings next = new Settings(); + next.set(Settings.INITIAL_WINDOW_SIZE, PERSIST_VALUE, initialWindowSize + 2); + connection.readerRunnable.settings(false, next); + + assertEquals(0, connection.bytesLeftInWriteWindow); // connection wasn't affected. + assertEquals(1, connection.getStream(1).bytesLeftInWriteWindow); + } + @Test public void testTruncatedDataFrame() throws Exception { // write the mocking script peer.acceptFrame(); // SYN_STREAM @@ -1212,6 +1258,46 @@ public final class SpdyConnectionTest { } } + @Test public void blockedStreamDoesntStarveNewStream() throws Exception { + int initialWindowSize = 65535; + int framesThatFillWindow = roundUp(initialWindowSize, SpdyStream.OUTPUT_BUFFER_SIZE); + + // Write the mocking script. This accepts more data frames than necessary! + peer.acceptFrame(); // SYN_STREAM + for (int i = 0; i < framesThatFillWindow; i++) { + peer.acceptFrame(); // DATA on stream 1 + } + peer.acceptFrame(); // DATA on stream 2 + peer.play(); + + // Play it back. + SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyStream stream1 = connection.newStream(headerEntries("a", "apple"), true, true); + OutputStream out1 = stream1.getOutputStream(); + out1.write(new byte[initialWindowSize]); + out1.flush(); + + // Check that we've filled the window for both the stream and also the connection. + assertEquals(0, connection.bytesLeftInWriteWindow); + assertEquals(0, connection.getStream(1).bytesLeftInWriteWindow); + + // receiving a window update on the the connection will unblock new streams. + connection.readerRunnable.windowUpdate(0, 3); + + assertEquals(3, connection.bytesLeftInWriteWindow); + assertEquals(0, connection.getStream(1).bytesLeftInWriteWindow); + + // Another stream should be able to send data even though 1 is blocked. + SpdyStream stream2 = connection.newStream(headerEntries("b", "banana"), true, true); + OutputStream out2 = stream2.getOutputStream(); + out2.write("foo".getBytes(UTF_8)); + out2.flush(); + + assertEquals(0, connection.bytesLeftInWriteWindow); + assertEquals(0, connection.getStream(1).bytesLeftInWriteWindow); + assertEquals(initialWindowSize - 3, connection.getStream(3).bytesLeftInWriteWindow); + } + /** * This tests that data frames are written in chunks limited by the * SpdyDataOutputStream buffer size. A side-effect is that this size @@ -1344,6 +1430,15 @@ public final class SpdyConnectionTest { assertEquals(expected, actual); } + private void assertFlushBlocks(OutputStream out) throws IOException { + interruptAfterDelay(500); + try { + out.flush(); + fail(); + } catch (InterruptedIOException expected) { + } + } + /** Interrupts the current thread after {@code delayMillis}. */ private void interruptAfterDelay(final long delayMillis) { final Thread toInterrupt = Thread.currentThread(); @@ -1358,4 +1453,8 @@ public final class SpdyConnectionTest { } }.start(); } + + static int roundUp(int num, int divisor) { + return (num + divisor - 1) / divisor; + } } diff --git a/okhttp/src/test/java/com/squareup/okhttp/internal/http/HttpOverSpdyTest.java b/okhttp/src/test/java/com/squareup/okhttp/internal/http/HttpOverSpdyTest.java index 8cc7bfd70..b22520a1d 100644 --- a/okhttp/src/test/java/com/squareup/okhttp/internal/http/HttpOverSpdyTest.java +++ b/okhttp/src/test/java/com/squareup/okhttp/internal/http/HttpOverSpdyTest.java @@ -157,6 +157,25 @@ public abstract class HttpOverSpdyTest { assertEquals(postBytes.length, Integer.parseInt(request.getHeader("Content-Length"))); } + @Test public void closeAfterFlush() throws Exception { + MockResponse response = new MockResponse().setBody("ABCDE"); + server.enqueue(response); + server.play(); + + HttpURLConnection connection = client.open(server.getUrl("/foo")); + connection.setRequestProperty("Content-Length", String.valueOf(postBytes.length)); + connection.setDoOutput(true); + connection.getOutputStream().write(postBytes); // push bytes into SpdyDataOutputStream.buffer + connection.getOutputStream().flush(); // SpdyConnection.writeData subject to write window + connection.getOutputStream().close(); // SpdyConnection.writeData empty frame + assertContent("ABCDE", connection, Integer.MAX_VALUE); + + RecordedRequest request = server.takeRequest(); + assertEquals("POST /foo HTTP/1.1", request.getRequestLine()); + assertArrayEquals(postBytes, request.getBody()); + assertEquals(postBytes.length, Integer.parseInt(request.getHeader("Content-Length"))); + } + @Test public void setFixedLengthStreamingModeSetsContentLength() throws Exception { MockResponse response = new MockResponse().setBody("ABCDE"); server.enqueue(response);