1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-27 04:22:07 +03:00

Merge pull request #73 from square/jwilson/open

Discard streams once they're no longer open.
This commit is contained in:
Jake Wharton
2012-12-31 18:03:12 -08:00
3 changed files with 129 additions and 40 deletions

View File

@@ -121,6 +121,14 @@ public final class SpdyConnection implements Closeable {
readExecutor.execute(new Reader());
}
/**
* Returns the number of {@link SpdyStream#isOpen() open streams} on this
* connection.
*/
public synchronized int openStreamCount() {
return streams.size();
}
private synchronized SpdyStream getStream(int id) {
return streams.get(id);
}
@@ -153,7 +161,9 @@ public final class SpdyConnection implements Closeable {
streamId = nextStreamId;
nextStreamId += 2;
stream = new SpdyStream(streamId, this, requestHeaders, flags);
streams.put(streamId, stream);
if (stream.isOpen()) {
streams.put(streamId, stream);
}
}
spdyWriter.synStream(flags, streamId, associatedStreamId, priority, requestHeaders);
@@ -263,6 +273,9 @@ public final class SpdyConnection implements Closeable {
}
private synchronized void close(Throwable reason) throws IOException {
if (reason != null) {
reason.printStackTrace();
}
// TODO: forward 'reason' to forced closed streams?
// TODO: graceful close; send RST frames
// TODO: close all streams to release waiting readers
@@ -324,16 +337,19 @@ public final class SpdyConnection implements Closeable {
@Override public void data(int flags, int streamId, InputStream in, int length)
throws IOException {
SpdyStream dataStream = getStream(streamId);
if (dataStream != null) {
try {
dataStream.receiveData(in, flags, length);
} catch (ProtocolException e) {
Streams.skipByReading(in, length);
dataStream.closeLater(SpdyStream.RST_FLOW_CONTROL_ERROR);
}
} else {
if (dataStream == null) {
writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM);
Streams.skipByReading(in, length);
return;
}
try {
dataStream.receiveData(in, length);
if ((flags & SpdyConnection.FLAG_FIN) != 0) {
dataStream.receiveFin();
}
} catch (ProtocolException e) {
Streams.skipByReading(in, length);
dataStream.closeLater(SpdyStream.RST_FLOW_CONTROL_ERROR);
}
}
@@ -368,15 +384,17 @@ public final class SpdyConnection implements Closeable {
@Override public void synReply(int flags, int streamId, List<String> nameValueBlock)
throws IOException {
SpdyStream replyStream = getStream(streamId);
if (replyStream != null) {
// TODO: honor incoming FLAG_FIN.
try {
replyStream.receiveReply(nameValueBlock);
} catch (ProtocolException e) {
replyStream.closeLater(SpdyStream.RST_PROTOCOL_ERROR);
}
} else {
if (replyStream == null) {
writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM);
return;
}
try {
replyStream.receiveReply(nameValueBlock);
if ((flags & SpdyConnection.FLAG_FIN) != 0) {
replyStream.receiveFin();
}
} catch (ProtocolException e) {
replyStream.closeLater(SpdyStream.RST_PROTOCOL_ERROR);
}
}

View File

@@ -93,6 +93,25 @@ public final class SpdyStream {
}
}
/**
* Returns true if this stream is open. A stream is open until either:
* <ul>
* <li>A {@code SYN_RESET} frame abnormally terminates the stream.
* <li>Both input and output streams have transmitted all data.
* </ul>
* Note that the input stream may continue to yield data even after a stream
* reports itself as not open. This is because input data is buffered.
*/
public synchronized boolean isOpen() {
if (rstStatusCode != -1) {
return false;
}
if ((in.finished || in.closed) && (out.finished || out.closed)) {
return false;
}
return true;
}
/**
* Returns true if this stream was created by this peer.
*/
@@ -227,26 +246,33 @@ public final class SpdyStream {
return true;
}
synchronized void receiveReply(List<String> strings) throws IOException {
if (!isLocallyInitiated() || responseHeaders != null) {
throw new ProtocolException();
void receiveReply(List<String> strings) throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
synchronized (this) {
if (!isLocallyInitiated() || responseHeaders != null) {
throw new ProtocolException();
}
responseHeaders = strings;
notifyAll();
}
responseHeaders = strings;
notifyAll();
}
void receiveData(InputStream in, int flags, int length) throws IOException {
void receiveData(InputStream in, int length) throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
this.in.receive(in, length);
if ((flags & SpdyConnection.FLAG_FIN) == 0) {
return;
}
}
// This is the last incoming data in the stream.
void receiveFin() {
assert (!Thread.holdsLock(SpdyStream.this));
boolean open;
synchronized (this) {
this.in.finished = true;
open = isOpen();
notifyAll();
}
if (!open) {
connection.removeStream(id);
}
}
synchronized void receiveRstStream(int statusCode) {
@@ -438,15 +464,21 @@ public final class SpdyStream {
private void cancelStreamIfNecessary() throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
boolean open;
boolean cancel;
synchronized (this) {
if (!in.closed || in.finished || (!out.finished && !out.closed)) {
return; // We shouldn't cancel this stream (or don't need to).
}
cancel = !in.finished && in.closed && (out.finished || out.closed);
open = isOpen();
}
if (cancel) {
// RST this stream to prevent additional data from being sent. This
// is safe because the input stream is closed (we won't use any
// further bytes) and the output stream is either finished or closed
// (so RSTing both streams doesn't cause harm).
SpdyStream.this.close(RST_CANCEL);
} else if (!open) {
connection.removeStream(id);
}
// RST this stream to prevent additional data from being sent. This is safe because
// the input stream is closed (we won't use any further bytes) and the output stream
// is either finished or closed (so RSTing both streams doesn't cause harm).
SpdyStream.this.close(RST_CANCEL);
}
/**

View File

@@ -19,6 +19,7 @@ package com.squareup.okhttp.internal.net.spdy;
import static com.squareup.okhttp.internal.Util.UTF_8;
import static com.squareup.okhttp.internal.net.spdy.Settings.PERSIST_VALUE;
import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.FLAG_FIN;
import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.FLAG_UNIDIRECTIONAL;
import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_DATA;
import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_GOAWAY;
import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_NOOP;
@@ -65,6 +66,7 @@ public final class SpdyConnectionTest {
assertEquals(Arrays.asList("a", "android"), stream.getResponseHeaders());
assertStreamData("robot", stream.getInputStream());
writeAndClose(stream, "c3po");
assertEquals(0, connection.openStreamCount());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
@@ -77,6 +79,38 @@ public final class SpdyConnectionTest {
assertTrue(Arrays.equals("c3po".getBytes("UTF-8"), requestData.data));
}
@Test public void headersOnlyStreamIsClosedImmediately() throws Exception {
peer.acceptFrame(); // SYN STREAM
peer.sendFrame().synReply(0, 1, Arrays.asList("b", "banana"));
peer.play();
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
connection.newStream(Arrays.asList("a", "android"), false, false);
assertEquals(0, connection.openStreamCount());
}
@Test public void clientCreatesStreamAndServerRepliesWithFin() throws Exception {
// write the mocking script
peer.acceptFrame(); // SYN STREAM
peer.acceptFrame(); // PING
peer.sendFrame().synReply(FLAG_FIN, 1, Arrays.asList("a", "android"));
peer.sendFrame().ping(0, 1);
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
connection.newStream(Arrays.asList("b", "banana"), false, true);
assertEquals(1, connection.openStreamCount());
connection.ping().roundTripTime(); // Ensure that the SYN_REPLY has been received.
assertEquals(0, connection.openStreamCount());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
assertEquals(TYPE_SYN_STREAM, synStream.type);
MockSpdyPeer.InFrame ping = peer.takeFrame();
assertEquals(TYPE_PING, ping.type);
}
@Test public void serverCreatesStreamAndClientReplies() throws Exception {
// write the mocking script
peer.sendFrame().synStream(0, 2, 0, 0, Arrays.asList("a", "android"));
@@ -323,10 +357,11 @@ public final class SpdyConnectionTest {
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket())
.handler(REJECT_INCOMING_STREAMS)
.build();
SpdyStream stream = connection.newStream(Arrays.asList("a", "android"), true, true);
SpdyStream stream = connection.newStream(Arrays.asList("a", "android"), true, false);
OutputStream out = stream.getOutputStream();
out.write("square".getBytes(UTF_8));
out.flush();
assertEquals(1, connection.openStreamCount());
out.close();
try {
out.write("round".getBytes(UTF_8));
@@ -334,11 +369,12 @@ public final class SpdyConnectionTest {
} catch (Exception expected) {
assertEquals("stream closed", expected.getMessage());
}
assertEquals(0, connection.openStreamCount());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
assertEquals(TYPE_SYN_STREAM, synStream.type);
assertEquals(0, synStream.flags);
assertEquals(FLAG_UNIDIRECTIONAL, synStream.flags);
MockSpdyPeer.InFrame data = peer.takeFrame();
assertEquals(TYPE_DATA, data.type);
assertEquals(0, data.flags);
@@ -373,6 +409,7 @@ public final class SpdyConnectionTest {
assertEquals("stream was reset: CANCEL", expected.getMessage());
}
out.close();
assertEquals(0, connection.openStreamCount());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
@@ -413,12 +450,12 @@ public final class SpdyConnectionTest {
} catch (IOException expected) {
assertEquals("stream finished", expected.getMessage());
}
assertEquals(0, connection.openStreamCount());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
assertEquals(TYPE_SYN_STREAM, synStream.type);
assertEquals(SpdyConnection.FLAG_FIN, synStream.flags);
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.type);
assertEquals(SpdyStream.RST_CANCEL, rstStream.statusCode);
@@ -453,20 +490,18 @@ public final class SpdyConnectionTest {
out.write("square".getBytes(UTF_8));
out.flush();
out.close();
assertEquals(0, connection.openStreamCount());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
assertEquals(TYPE_SYN_STREAM, synStream.type);
assertEquals(0, synStream.flags);
MockSpdyPeer.InFrame data = peer.takeFrame();
assertEquals(TYPE_DATA, data.type);
assertTrue(Arrays.equals("square".getBytes("UTF-8"), data.data));
MockSpdyPeer.InFrame fin = peer.takeFrame();
assertEquals(TYPE_DATA, fin.type);
assertEquals(FLAG_FIN, fin.flags);
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.type);
assertEquals(SpdyStream.RST_CANCEL, rstStream.statusCode);
@@ -485,6 +520,7 @@ public final class SpdyConnectionTest {
SpdyStream stream = connection.newStream(Arrays.asList("a", "android"), false, true);
InputStream in = stream.getInputStream();
assertStreamData("square", in);
assertEquals(0, connection.openStreamCount());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
@@ -629,6 +665,7 @@ public final class SpdyConnectionTest {
} catch (IOException expected) {
assertEquals("stream was reset: REFUSED_STREAM", expected.getMessage());
}
assertEquals(0, connection.openStreamCount());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
@@ -669,6 +706,7 @@ public final class SpdyConnectionTest {
} catch (IOException expected) {
assertEquals("shutdown", expected.getMessage());
}
assertEquals(1, connection.openStreamCount());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream1 = peer.takeFrame();
@@ -697,6 +735,7 @@ public final class SpdyConnectionTest {
connection.newStream(Arrays.asList("a", "android"), true, true);
connection.shutdown();
connection.ping().roundTripTime(); // Ensure that the SYN STREAM has been received.
assertEquals(1, connection.openStreamCount());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream1 = peer.takeFrame();