1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-24 04:02:07 +03:00

Merge pull request #484 from adriancole/stream-window-update

spdy streams honor write window updates
This commit is contained in:
Adrian Cole
2014-01-26 14:02:55 -08:00
4 changed files with 214 additions and 106 deletions

View File

@@ -128,7 +128,7 @@ public final class SpdyConnection implements Closeable {
peerSettings = new Settings();
// TODO: implement stream limit
// okHttpSettings.set(Settings.MAX_CONCURRENT_STREAMS, 0, max);
bufferPool = new ByteArrayPool(peerSettings.getInitialWindowSize() * 8);
bufferPool = new ByteArrayPool(initialWindowSize * 8); // TODO: revisit size limit!
frameReader = variant.newReader(builder.in, client);
frameWriter = variant.newWriter(builder.out, client);
@@ -149,7 +149,7 @@ public final class SpdyConnection implements Closeable {
return streams.size();
}
private synchronized SpdyStream getStream(int id) {
synchronized SpdyStream getStream(int id) {
return streams.get(id);
}
@@ -223,8 +223,27 @@ public final class SpdyConnection implements Closeable {
frameWriter.synReply(outFinished, streamId, alternating);
}
/**
* Callers of this method are not thread safe, and sometimes on application
* threads. Most often, this method will be called to send a buffer worth of
* data to the peer.
* <p>
* Writes are subject to the write window of the stream and the connection.
* Until there is a window sufficient to send {@code byteCount}, the caller
* will block. For example, a user of {@code HttpURLConnection} who flushes
* more bytes to the output stream than the connection's write window will
* block.
* <p>
* Zero {@code byteCount} writes are not subject to flow control and
* will not block. The only use case for zero {@code byteCount} is closing
* a flushed output stream.
*/
public void writeData(int streamId, boolean outFinished, byte[] buffer, int offset, int byteCount)
throws IOException {
if (byteCount == 0) { // Empty data frames are not flow-controlled.
frameWriter.data(outFinished, streamId, buffer, offset, byteCount);
return;
}
synchronized (SpdyConnection.this) {
waitUntilWritable(byteCount);
bytesLeftInWriteWindow -= byteCount;
@@ -267,21 +286,17 @@ public final class SpdyConnection implements Closeable {
frameWriter.rstStream(streamId, statusCode);
}
void writeWindowUpdateLater(final int streamId, final int windowSizeIncrement) {
void writeWindowUpdateLater(final int streamId, final long windowSizeIncrement) {
executor.submit(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) {
@Override public void execute() {
try {
writeWindowUpdate(streamId, windowSizeIncrement);
frameWriter.windowUpdate(streamId, windowSizeIncrement);
} catch (IOException ignored) {
}
}
});
}
void writeWindowUpdate(int streamId, int windowSizeIncrement) throws IOException {
frameWriter.windowUpdate(streamId, windowSizeIncrement);
}
/**
* Sends a ping frame to the peer. Use the returned object to await the
* ping's response and observe its round trip time.
@@ -486,6 +501,10 @@ public final class SpdyConnection implements Closeable {
}
}
/**
* Methods in this class must not lock FrameWriter. If a method needs to
* write a frame, create an async task to do so.
*/
class Reader extends NamedRunnable implements FrameReader.Handler {
private Reader() {
super("OkHttp %s", hostName);
@@ -585,6 +604,7 @@ public final class SpdyConnection implements Closeable {
}
@Override public void settings(boolean clearPrevious, Settings newSettings) {
long delta = 0;
SpdyStream[] streamsToNotify = null;
synchronized (SpdyConnection.this) {
if (clearPrevious) {
@@ -597,27 +617,21 @@ public final class SpdyConnection implements Closeable {
}
int peerInitialWindowSize = peerSettings.getInitialWindowSize();
if (peerInitialWindowSize != -1 && peerInitialWindowSize != initialWindowSize) {
long delta = peerInitialWindowSize - initialWindowSize;
delta = peerInitialWindowSize - initialWindowSize;
SpdyConnection.this.initialWindowSize = peerInitialWindowSize;
if (!receivedInitialPeerSettings) {
addBytesToWriteWindow(delta);
receivedInitialPeerSettings = true;
}
}
if (!streams.isEmpty()) {
streamsToNotify = streams.values().toArray(new SpdyStream[streams.size()]);
if (!streams.isEmpty()) {
streamsToNotify = streams.values().toArray(new SpdyStream[streams.size()]);
}
}
}
if (streamsToNotify != null) {
for (SpdyStream stream : streamsToNotify) {
// The synchronization here is ugly. We need to synchronize on 'this' to guard
// reads to 'peerSettings'. We synchronize on 'stream' to guard the state change.
// And we need to acquire the 'stream' lock first, since that may block.
// TODO: this can block the reader thread until a write completes. That's bad!
if (streamsToNotify != null && delta != 0) {
for (SpdyStream stream : streams.values()) {
synchronized (stream) {
synchronized (SpdyConnection.this) {
stream.receiveSettings(peerSettings);
}
stream.addBytesToWriteWindow(delta);
}
}
}
@@ -653,8 +667,7 @@ public final class SpdyConnection implements Closeable {
}
}
@Override
public void goAway(int lastGoodStreamId, ErrorCode errorCode, byte[] debugData) {
@Override public void goAway(int lastGoodStreamId, ErrorCode errorCode, byte[] debugData) {
if (debugData.length > 0) { // TODO: log the debugData
}
synchronized (SpdyConnection.this) {
@@ -680,10 +693,11 @@ public final class SpdyConnection implements Closeable {
SpdyConnection.this.notifyAll();
}
} else {
// TODO: honor endFlowControl
SpdyStream stream = getStream(streamId);
if (stream != null) {
stream.receiveWindowUpdate(windowSizeIncrement);
synchronized (stream) {
stream.addBytesToWriteWindow(windowSizeIncrement);
}
}
}
}

View File

@@ -41,7 +41,14 @@ public final class SpdyStream {
* stream. (Chrome 25 uses 5 MiB.)
*/
int windowUpdateThreshold;
private int writeWindowSize;
/**
* Count of bytes that can be written on the stream before receiving a
* window update. Even if this is positive, writes will block until there
* available bytes in {@code connection.bytesLeftInWriteWindow}.
*/
// guarded by this
long bytesLeftInWriteWindow = 0;
private final int id;
private final SpdyConnection connection;
@@ -70,7 +77,7 @@ public final class SpdyStream {
if (requestHeaders == null) throw new NullPointerException("requestHeaders == null");
this.id = id;
this.connection = connection;
this.writeWindowSize = initialWriteWindow;
this.bytesLeftInWriteWindow = initialWriteWindow;
this.windowUpdateThreshold = initialWriteWindow / 2;
this.in = new SpdyDataInputStream(initialWriteWindow);
this.out = new SpdyDataOutputStream();
@@ -311,28 +318,6 @@ public final class SpdyStream {
}
}
private void setPeerSettings(Settings peerSettings) {
// TODO: For HTTP/2, also adjust the stream flow control window size
// by the difference between the new value and the old value.
assert (Thread.holdsLock(connection)); // Because 'settings' is guarded by 'connection'.
long delta = peerSettings.getInitialWindowSize() - writeWindowSize;
this.writeWindowSize = peerSettings.getInitialWindowSize();
this.windowUpdateThreshold = peerSettings.getInitialWindowSize() / 2;
receiveWindowUpdate(delta);
}
/** Notification received when peer settings change. */
void receiveSettings(Settings peerSettings) {
assert (Thread.holdsLock(this));
setPeerSettings(peerSettings);
notifyAll();
}
synchronized void receiveWindowUpdate(long windowSizeIncrement) {
out.unacknowledgedBytes -= windowSizeIncrement;
notifyAll();
}
int getPriority() {
return priority;
}
@@ -579,7 +564,7 @@ public final class SpdyStream {
* An output stream that writes outgoing data frames of a stream. This class
* is not thread safe.
*/
private final class SpdyDataOutputStream extends OutputStream {
final class SpdyDataOutputStream extends OutputStream {
private final byte[] buffer = SpdyStream.this.connection.bufferPool.getBuf(OUTPUT_BUFFER_SIZE);
private int pos = 0;
@@ -592,13 +577,6 @@ public final class SpdyStream {
*/
private boolean finished;
/**
* The total number of bytes written out to the peer, but not yet
* acknowledged with an incoming {@code WINDOW_UPDATE} frame. Writes
* block if they cause this to exceed the {@code WINDOW_SIZE}.
*/
private long unacknowledgedBytes = 0;
@Override public void write(int b) throws IOException {
Util.writeSingleByte(this, b);
}
@@ -606,11 +584,13 @@ public final class SpdyStream {
@Override public void write(byte[] bytes, int offset, int count) throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
checkOffsetAndCount(bytes.length, offset, count);
checkNotClosed();
synchronized (SpdyStream.this) {
checkOutNotClosed();
}
while (count > 0) {
if (pos == buffer.length) {
writeFrame(false);
writeFrame();
}
int bytesToCopy = Math.min(count, buffer.length - pos);
System.arraycopy(bytes, offset, buffer, pos, bytesToCopy);
@@ -622,9 +602,11 @@ public final class SpdyStream {
@Override public void flush() throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
checkNotClosed();
synchronized (SpdyStream.this) {
checkOutNotClosed();
}
if (pos > 0) {
writeFrame(false);
writeFrame();
connection.flush();
}
}
@@ -639,59 +621,53 @@ public final class SpdyStream {
SpdyStream.this.connection.bufferPool.returnBuf(buffer);
}
if (!out.finished) {
writeFrame(true);
connection.writeData(id, true, buffer, 0, pos);
}
connection.flush();
cancelStreamIfNecessary();
}
private void writeFrame(boolean outFinished) throws IOException {
private void writeFrame() throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
int length = pos;
synchronized (SpdyStream.this) {
waitUntilWritable(length, outFinished);
unacknowledgedBytes += length;
waitUntilWritable(length);
checkOutNotClosed(); // Kick out if the stream was reset or closed while waiting.
bytesLeftInWriteWindow -= length;
}
connection.writeData(id, outFinished, buffer, 0, pos);
connection.writeData(id, false, buffer, 0, pos);
pos = 0;
}
}
/**
* Returns once the peer is ready to receive {@code count} bytes.
*
* @throws IOException if the stream was finished or closed, or the
* thread was interrupted.
*/
private void waitUntilWritable(int count, boolean last) throws IOException {
try {
while (unacknowledgedBytes + count >= writeWindowSize) {
SpdyStream.this.wait(); // Wait until we receive a WINDOW_UPDATE.
// The stream may have been closed or reset while we were waiting!
if (!last && closed) {
throw new IOException("stream closed");
} else if (finished) {
throw new IOException("stream finished");
} else if (errorCode != null) {
throw new IOException("stream was reset: " + errorCode);
}
}
} catch (InterruptedException e) {
throw new InterruptedIOException();
/** Returns once the peer is ready to receive {@code byteCount} bytes. */
private void waitUntilWritable(int byteCount) throws IOException {
try {
while (byteCount > bytesLeftInWriteWindow) {
SpdyStream.this.wait(); // Wait until we receive a WINDOW_UPDATE.
}
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
private void checkNotClosed() throws IOException {
synchronized (SpdyStream.this) {
if (closed) {
throw new IOException("stream closed");
} else if (finished) {
throw new IOException("stream finished");
} else if (errorCode != null) {
throw new IOException("stream was reset: " + errorCode);
}
}
/**
* {@code delta} will be negative if a settings frame initial window is
* smaller than the last.
*/
void addBytesToWriteWindow(long delta) {
bytesLeftInWriteWindow += delta;
if (delta > 0) SpdyStream.this.notifyAll();
}
private void checkOutNotClosed() throws IOException {
if (out.closed) {
throw new IOException("stream closed");
} else if (out.finished) {
throw new IOException("stream finished");
} else if (errorCode != null) {
throw new IOException("stream was reset: " + errorCode);
}
}
}

View File

@@ -322,7 +322,8 @@ public final class SpdyConnectionTest {
assertEquals(3368, connection.initialWindowSize);
assertEquals(1684, connection.bytesLeftInWriteWindow); // initial wasn't affected.
assertEquals(1684, stream.windowUpdateThreshold);
// New Stream is has the most recent initial window size.
assertEquals(3368, stream.bytesLeftInWriteWindow);
}
@Test public void unexpectedPingIsNotReturned() throws Exception {
@@ -1177,13 +1178,8 @@ public final class SpdyConnectionTest {
SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true);
OutputStream out = stream.getOutputStream();
out.write(new byte[windowSize]);
interruptAfterDelay(500);
try {
out.write('a');
out.flush();
fail();
} catch (InterruptedIOException expected) {
}
out.write('a');
assertFlushBlocks(out);
// Verify the peer received what was expected.
MockSpdyPeer.InFrame synStream = peer.takeFrame();
@@ -1192,6 +1188,56 @@ public final class SpdyConnectionTest {
assertEquals(TYPE_DATA, data.type);
}
@Test public void initialSettingsWithWindowSizeAdjustsConnection() throws Exception {
int initialWindowSize = 65535;
int framesThatFillWindow = roundUp(initialWindowSize, SpdyStream.OUTPUT_BUFFER_SIZE);
// Write the mocking script. This accepts more data frames than necessary!
peer.acceptFrame(); // SYN_STREAM
for (int i = 0; i < framesThatFillWindow; i++) {
peer.acceptFrame(); // DATA on stream 1
}
peer.acceptFrame(); // DATA on stream 2
peer.play();
// Play it back.
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyStream stream = connection.newStream(headerEntries("a", "apple"), true, true);
OutputStream out = stream.getOutputStream();
out.write(new byte[initialWindowSize]);
out.flush();
// write 1 more than the window size
out.write('a');
assertFlushBlocks(out);
// Check that we've filled the window for both the stream and also the connection.
assertEquals(0, connection.bytesLeftInWriteWindow);
assertEquals(0, connection.getStream(1).bytesLeftInWriteWindow);
// Receiving a Settings with a larger window size will unblock the streams.
Settings initial = new Settings();
initial.set(Settings.INITIAL_WINDOW_SIZE, PERSIST_VALUE, initialWindowSize + 1);
connection.readerRunnable.settings(false, initial);
assertEquals(1, connection.bytesLeftInWriteWindow);
assertEquals(1, connection.getStream(1).bytesLeftInWriteWindow);
// The stream should no longer be blocked.
out.flush();
assertEquals(0, connection.bytesLeftInWriteWindow);
assertEquals(0, connection.getStream(1).bytesLeftInWriteWindow);
// Settings after the initial do not affect the connection window size.
Settings next = new Settings();
next.set(Settings.INITIAL_WINDOW_SIZE, PERSIST_VALUE, initialWindowSize + 2);
connection.readerRunnable.settings(false, next);
assertEquals(0, connection.bytesLeftInWriteWindow); // connection wasn't affected.
assertEquals(1, connection.getStream(1).bytesLeftInWriteWindow);
}
@Test public void testTruncatedDataFrame() throws Exception {
// write the mocking script
peer.acceptFrame(); // SYN_STREAM
@@ -1212,6 +1258,46 @@ public final class SpdyConnectionTest {
}
}
@Test public void blockedStreamDoesntStarveNewStream() throws Exception {
int initialWindowSize = 65535;
int framesThatFillWindow = roundUp(initialWindowSize, SpdyStream.OUTPUT_BUFFER_SIZE);
// Write the mocking script. This accepts more data frames than necessary!
peer.acceptFrame(); // SYN_STREAM
for (int i = 0; i < framesThatFillWindow; i++) {
peer.acceptFrame(); // DATA on stream 1
}
peer.acceptFrame(); // DATA on stream 2
peer.play();
// Play it back.
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
SpdyStream stream1 = connection.newStream(headerEntries("a", "apple"), true, true);
OutputStream out1 = stream1.getOutputStream();
out1.write(new byte[initialWindowSize]);
out1.flush();
// Check that we've filled the window for both the stream and also the connection.
assertEquals(0, connection.bytesLeftInWriteWindow);
assertEquals(0, connection.getStream(1).bytesLeftInWriteWindow);
// receiving a window update on the the connection will unblock new streams.
connection.readerRunnable.windowUpdate(0, 3);
assertEquals(3, connection.bytesLeftInWriteWindow);
assertEquals(0, connection.getStream(1).bytesLeftInWriteWindow);
// Another stream should be able to send data even though 1 is blocked.
SpdyStream stream2 = connection.newStream(headerEntries("b", "banana"), true, true);
OutputStream out2 = stream2.getOutputStream();
out2.write("foo".getBytes(UTF_8));
out2.flush();
assertEquals(0, connection.bytesLeftInWriteWindow);
assertEquals(0, connection.getStream(1).bytesLeftInWriteWindow);
assertEquals(initialWindowSize - 3, connection.getStream(3).bytesLeftInWriteWindow);
}
/**
* This tests that data frames are written in chunks limited by the
* SpdyDataOutputStream buffer size. A side-effect is that this size
@@ -1344,6 +1430,15 @@ public final class SpdyConnectionTest {
assertEquals(expected, actual);
}
private void assertFlushBlocks(OutputStream out) throws IOException {
interruptAfterDelay(500);
try {
out.flush();
fail();
} catch (InterruptedIOException expected) {
}
}
/** Interrupts the current thread after {@code delayMillis}. */
private void interruptAfterDelay(final long delayMillis) {
final Thread toInterrupt = Thread.currentThread();
@@ -1358,4 +1453,8 @@ public final class SpdyConnectionTest {
}
}.start();
}
static int roundUp(int num, int divisor) {
return (num + divisor - 1) / divisor;
}
}

View File

@@ -157,6 +157,25 @@ public abstract class HttpOverSpdyTest {
assertEquals(postBytes.length, Integer.parseInt(request.getHeader("Content-Length")));
}
@Test public void closeAfterFlush() throws Exception {
MockResponse response = new MockResponse().setBody("ABCDE");
server.enqueue(response);
server.play();
HttpURLConnection connection = client.open(server.getUrl("/foo"));
connection.setRequestProperty("Content-Length", String.valueOf(postBytes.length));
connection.setDoOutput(true);
connection.getOutputStream().write(postBytes); // push bytes into SpdyDataOutputStream.buffer
connection.getOutputStream().flush(); // SpdyConnection.writeData subject to write window
connection.getOutputStream().close(); // SpdyConnection.writeData empty frame
assertContent("ABCDE", connection, Integer.MAX_VALUE);
RecordedRequest request = server.takeRequest();
assertEquals("POST /foo HTTP/1.1", request.getRequestLine());
assertArrayEquals(postBytes, request.getBody());
assertEquals(postBytes.length, Integer.parseInt(request.getHeader("Content-Length")));
}
@Test public void setFixedLengthStreamingModeSetsContentLength() throws Exception {
MockResponse response = new MockResponse().setBody("ABCDE");
server.enqueue(response);