1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-24 04:02:07 +03:00

Merge pull request #482 from square/jwilson_0126_flow_control_writes

Honor connection-level flow control for writes.
This commit is contained in:
Adrian Cole
2014-01-26 09:36:54 -08:00

View File

@@ -21,6 +21,7 @@ import com.squareup.okhttp.internal.Util;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.HashMap;
@@ -81,6 +82,12 @@ public final class SpdyConnection implements Closeable {
private Map<Integer, Ping> pings;
private int nextPingId;
/**
* Count of bytes that can be written on the connection before receiving a
* window update.
*/
private long bytesLeftInWriteWindow = 65535; // TODO: initialize this with settings.
// TODO: Do we want to dynamically adjust settings, or KISS and only set once?
// Settings we might send include toggling push, adjusting compression table size.
final Settings okHttpSettings;
@@ -209,9 +216,24 @@ public final class SpdyConnection implements Closeable {
public void writeData(int streamId, boolean outFinished, byte[] buffer, int offset, int byteCount)
throws IOException {
synchronized (SpdyConnection.this) {
waitUntilWritable(byteCount);
bytesLeftInWriteWindow -= byteCount;
}
frameWriter.data(outFinished, streamId, buffer, offset, byteCount);
}
/** Returns once the peer is ready to receive {@code byteCount} bytes. */
private void waitUntilWritable(int byteCount) throws IOException {
try {
while (byteCount > bytesLeftInWriteWindow) {
SpdyConnection.this.wait(); // Wait until we receive a WINDOW_UPDATE.
}
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
void writeSynResetLater(final int streamId, final ErrorCode errorCode) {
executor.submit(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) {
@Override public void execute() {
@@ -622,14 +644,16 @@ public final class SpdyConnection implements Closeable {
@Override public void windowUpdate(int streamId, long windowSizeIncrement) {
if (streamId == 0) {
// TODO: honor connection-level flow control
return;
}
// TODO: honor endFlowControl
SpdyStream stream = getStream(streamId);
if (stream != null) {
stream.receiveWindowUpdate(windowSizeIncrement);
synchronized (SpdyConnection.this) {
bytesLeftInWriteWindow += windowSizeIncrement;
notifyAll();
}
} else {
// TODO: honor endFlowControl
SpdyStream stream = getStream(streamId);
if (stream != null) {
stream.receiveWindowUpdate(windowSizeIncrement);
}
}
}