1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-27 04:22:07 +03:00

Merge pull request #84 from square/jwilson/send_window_update

Implement flow control for the incoming stream.
This commit is contained in:
Jake Wharton
2013-01-18 11:00:49 -08:00
6 changed files with 91 additions and 5 deletions

View File

@@ -16,6 +16,9 @@
package com.squareup.okhttp.internal.spdy;
final class Settings {
/** From the spdy/3 spec, the default initial window size for all streams is 64 KiB. */
static final int DEFAULT_INITIAL_WINDOW_SIZE = 64 * 1024;
/** Peer request to clear durable settings. */
static final int FLAG_CLEAR_PREVIOUSLY_PERSISTED_SETTINGS = 0x1;

View File

@@ -205,6 +205,21 @@ public final class SpdyConnection implements Closeable {
spdyWriter.rstStream(streamId, statusCode);
}
void writeWindowUpdateLater(final int streamId, final int deltaWindowSize) {
writeExecutor.execute(new Runnable() {
@Override public void run() {
try {
writeWindowUpdate(streamId, deltaWindowSize);
} catch (IOException ignored) {
}
}
});
}
void writeWindowUpdate(int streamId, int deltaWindowSize) throws IOException {
spdyWriter.windowUpdate(streamId, deltaWindowSize);
}
/**
* Sends a ping frame to the peer. Use the returned object to await the
* ping's response and observe its round trip time.

View File

@@ -187,6 +187,7 @@ final class SpdyReader implements Closeable {
}
private void readWindowUpdate(Handler handler, int flags, int length) throws IOException {
if (length != 8) throw ioException("TYPE_WINDOW_UPDATE length: %d != 8", length);
int w1 = in.readInt();
int w2 = in.readInt();
int streamId = w1 & 0x7fffffff;

View File

@@ -68,6 +68,14 @@ public final class SpdyStream {
public static final int RST_INVALID_CREDENTIALS = 10;
public static final int RST_FRAME_TOO_LARGE = 11;
/**
* The number of unacknowledged bytes at which the input stream will send
* the peer a {@code WINDOW_UPDATE} frame. Must be less than this client's
* window size, otherwise the remote peer will stop sending data on this
* stream.
*/
public static final int WINDOW_UPDATE_THRESHOLD = Settings.DEFAULT_INITIAL_WINDOW_SIZE / 2;
private final int id;
private final SpdyConnection connection;
private final int priority;
@@ -357,7 +365,7 @@ public final class SpdyStream {
* limit pos
*/
private final byte[] buffer = new byte[64 * 1024]; // 64KiB specified by TODO
private final byte[] buffer = new byte[Settings.DEFAULT_INITIAL_WINDOW_SIZE];
/** the next byte to be read, or -1 if the buffer is empty. Never buffer.length */
private int pos = -1;
@@ -374,6 +382,13 @@ public final class SpdyStream {
*/
private boolean finished;
/**
* The total number of bytes consumed by the application (with {@link
* #read}), but not yet acknowledged by sending a {@code WINDOW_UPDATE}
* frame.
*/
private int unacknowledgedBytes = 0;
@Override public int available() throws IOException {
synchronized (SpdyStream.this) {
checkNotClosed();
@@ -422,7 +437,12 @@ public final class SpdyStream {
copied += bytesToCopy;
}
// TODO: notify peer of flow-control
// Flow control: notify the peer that we're ready for more data!
unacknowledgedBytes += copied;
if (unacknowledgedBytes >= WINDOW_UPDATE_THRESHOLD) {
connection.writeWindowUpdateLater(id, unacknowledgedBytes);
unacknowledgedBytes = 0;
}
if (pos == limit) {
pos = -1;
@@ -614,6 +634,9 @@ public final class SpdyStream {
private void writeFrame(boolean last) throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
// TODO: Await flow control (WINDOW_UPDATE) if necessary.
int flags = 0;
if (last) {
flags |= SpdyConnection.FLAG_FIN;

View File

@@ -165,9 +165,15 @@ final class SpdyWriter implements Closeable {
out.flush();
}
public synchronized void windowUpdate(int flags, int streamId, int deltaWindowSize)
throws IOException {
throw new UnsupportedOperationException("TODO"); // TODO
public synchronized void windowUpdate(int streamId, int deltaWindowSize) throws IOException {
int type = SpdyConnection.TYPE_WINDOW_UPDATE;
int flags = 0;
int length = 8;
out.writeInt(0x80000000 | (SpdyConnection.VERSION & 0x7fff) << 16 | type & 0xffff);
out.writeInt((flags & 0xff) << 24 | length & 0xffffff);
out.writeInt(streamId);
out.writeInt(deltaWindowSize);
out.flush();
}
@Override public void close() throws IOException {

View File

@@ -29,11 +29,13 @@ import static com.squareup.okhttp.internal.spdy.SpdyConnection.TYPE_PING;
import static com.squareup.okhttp.internal.spdy.SpdyConnection.TYPE_RST_STREAM;
import static com.squareup.okhttp.internal.spdy.SpdyConnection.TYPE_SYN_REPLY;
import static com.squareup.okhttp.internal.spdy.SpdyConnection.TYPE_SYN_STREAM;
import static com.squareup.okhttp.internal.spdy.SpdyConnection.TYPE_WINDOW_UPDATE;
import static com.squareup.okhttp.internal.spdy.SpdyStream.RST_FLOW_CONTROL_ERROR;
import static com.squareup.okhttp.internal.spdy.SpdyStream.RST_INVALID_STREAM;
import static com.squareup.okhttp.internal.spdy.SpdyStream.RST_PROTOCOL_ERROR;
import static com.squareup.okhttp.internal.spdy.SpdyStream.RST_REFUSED_STREAM;
import static com.squareup.okhttp.internal.spdy.SpdyStream.RST_STREAM_IN_USE;
import static com.squareup.okhttp.internal.spdy.SpdyStream.WINDOW_UPDATE_THRESHOLD;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -913,6 +915,42 @@ public final class SpdyConnectionTest {
assertEquals(RST_PROTOCOL_ERROR, rstStream.statusCode);
}
@Test public void readSendsWindowUpdate() throws Exception {
// Write the mocking script.
peer.acceptFrame(); // SYN STREAM
peer.sendFrame().synReply(0, 1, Arrays.asList("a", "android"));
for (int i = 0; i < 3; i++) {
peer.sendFrame().data(0, 1, new byte[WINDOW_UPDATE_THRESHOLD]);
peer.acceptFrame(); // WINDOW UPDATE
}
peer.sendFrame().data(FLAG_FIN, 1, new byte[0]);
peer.play();
// Play it back.
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyStream stream = connection.newStream(Arrays.asList("b", "banana"), true, true);
assertEquals(Arrays.asList("a", "android"), stream.getResponseHeaders());
InputStream in = stream.getInputStream();
int total = 0;
byte[] buffer = new byte[1024];
int count;
while ((count = in.read(buffer)) != -1) {
total += count;
if (total == 3 * WINDOW_UPDATE_THRESHOLD) break;
}
assertEquals(-1, in.read());
// Verify the peer received what was expected.
MockSpdyPeer.InFrame synStream = peer.takeFrame();
assertEquals(TYPE_SYN_STREAM, synStream.type);
for (int i = 0; i < 3; i++) {
MockSpdyPeer.InFrame windowUpdate = peer.takeFrame();
assertEquals(TYPE_WINDOW_UPDATE, windowUpdate.type);
assertEquals(1, windowUpdate.streamId);
assertEquals(WINDOW_UPDATE_THRESHOLD, windowUpdate.deltaWindowSize);
}
}
private void writeAndClose(SpdyStream stream, String data) throws IOException {
OutputStream out = stream.getOutputStream();
out.write(data.getBytes("UTF-8"));