From 2207d16d458d5f595a2f3dbfcff3fd6f64bed396 Mon Sep 17 00:00:00 2001 From: jwilson Date: Sun, 26 Jan 2014 09:37:05 -0500 Subject: [PATCH] Honor connection-level flow control for writes. We don't write until the peer has given us a budget. --- .../okhttp/internal/spdy/SpdyConnection.java | 40 +++++++++++++++---- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java index c8cb75093..1a206edd5 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java @@ -22,6 +22,7 @@ import com.squareup.okhttp.internal.Util; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.io.InterruptedIOException; import java.io.OutputStream; import java.net.Socket; import java.util.HashMap; @@ -82,6 +83,12 @@ public final class SpdyConnection implements Closeable { private Map pings; private int nextPingId; + /** + * Count of bytes that can be written on the connection before receiving a + * window update. + */ + private long bytesLeftInWriteWindow = 65535; // TODO: initialize this with settings. + // TODO: Do we want to dynamically adjust settings, or KISS and only set once? // Settings we might send include toggling push, adjusting compression table size. final Settings okHttpSettings; @@ -200,9 +207,24 @@ public final class SpdyConnection implements Closeable { public void writeData(int streamId, boolean outFinished, byte[] buffer, int offset, int byteCount) throws IOException { + synchronized (SpdyConnection.this) { + waitUntilWritable(byteCount); + bytesLeftInWriteWindow -= byteCount; + } frameWriter.data(outFinished, streamId, buffer, offset, byteCount); } + /** Returns once the peer is ready to receive {@code byteCount} bytes. */ + private void waitUntilWritable(int byteCount) throws IOException { + try { + while (byteCount > bytesLeftInWriteWindow) { + SpdyConnection.this.wait(); // Wait until we receive a WINDOW_UPDATE. + } + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } + void writeSynResetLater(final int streamId, final ErrorCode errorCode) { executor.submit(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) { @Override public void execute() { @@ -620,14 +642,16 @@ public final class SpdyConnection implements Closeable { @Override public void windowUpdate(int streamId, long windowSizeIncrement) { if (streamId == 0) { - // TODO: honor connection-level flow control - return; - } - - // TODO: honor endFlowControl - SpdyStream stream = getStream(streamId); - if (stream != null) { - stream.receiveWindowUpdate(windowSizeIncrement); + synchronized (SpdyConnection.this) { + bytesLeftInWriteWindow += windowSizeIncrement; + notifyAll(); + } + } else { + // TODO: honor endFlowControl + SpdyStream stream = getStream(streamId); + if (stream != null) { + stream.receiveWindowUpdate(windowSizeIncrement); + } } }