1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-27 04:22:07 +03:00

Discard streams once they're no longer open.

This avoids a memory leak; we don't hold onto the input stream
and it's large 64 KiB buffer long after that stream is done.

It also allows SpdyConnection to track how many streams are
currently active. This will allow a follow up change to shut
down connections that don't host any streams.
This commit is contained in:
jwilson
2012-12-31 18:51:09 -07:00
parent a52eaa4ee6
commit 7f42496259
3 changed files with 129 additions and 40 deletions

View File

@@ -121,6 +121,14 @@ public final class SpdyConnection implements Closeable {
readExecutor.execute(new Reader());
}
/**
* Returns the number of {@link SpdyStream#isOpen() open streams} on this
* connection.
*/
public synchronized int openStreamCount() {
return streams.size();
}
private synchronized SpdyStream getStream(int id) {
return streams.get(id);
}
@@ -153,7 +161,9 @@ public final class SpdyConnection implements Closeable {
streamId = nextStreamId;
nextStreamId += 2;
stream = new SpdyStream(streamId, this, requestHeaders, flags);
streams.put(streamId, stream);
if (stream.isOpen()) {
streams.put(streamId, stream);
}
}
spdyWriter.synStream(flags, streamId, associatedStreamId, priority, requestHeaders);
@@ -263,6 +273,9 @@ public final class SpdyConnection implements Closeable {
}
private synchronized void close(Throwable reason) throws IOException {
if (reason != null) {
reason.printStackTrace();
}
// TODO: forward 'reason' to forced closed streams?
// TODO: graceful close; send RST frames
// TODO: close all streams to release waiting readers
@@ -324,16 +337,19 @@ public final class SpdyConnection implements Closeable {
@Override public void data(int flags, int streamId, InputStream in, int length)
throws IOException {
SpdyStream dataStream = getStream(streamId);
if (dataStream != null) {
try {
dataStream.receiveData(in, flags, length);
} catch (ProtocolException e) {
Streams.skipByReading(in, length);
dataStream.closeLater(SpdyStream.RST_FLOW_CONTROL_ERROR);
}
} else {
if (dataStream == null) {
writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM);
Streams.skipByReading(in, length);
return;
}
try {
dataStream.receiveData(in, length);
if ((flags & SpdyConnection.FLAG_FIN) != 0) {
dataStream.receiveFin();
}
} catch (ProtocolException e) {
Streams.skipByReading(in, length);
dataStream.closeLater(SpdyStream.RST_FLOW_CONTROL_ERROR);
}
}
@@ -368,15 +384,17 @@ public final class SpdyConnection implements Closeable {
@Override public void synReply(int flags, int streamId, List<String> nameValueBlock)
throws IOException {
SpdyStream replyStream = getStream(streamId);
if (replyStream != null) {
// TODO: honor incoming FLAG_FIN.
try {
replyStream.receiveReply(nameValueBlock);
} catch (ProtocolException e) {
replyStream.closeLater(SpdyStream.RST_PROTOCOL_ERROR);
}
} else {
if (replyStream == null) {
writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM);
return;
}
try {
replyStream.receiveReply(nameValueBlock);
if ((flags & SpdyConnection.FLAG_FIN) != 0) {
replyStream.receiveFin();
}
} catch (ProtocolException e) {
replyStream.closeLater(SpdyStream.RST_PROTOCOL_ERROR);
}
}

View File

@@ -93,6 +93,25 @@ public final class SpdyStream {
}
}
/**
* Returns true if this stream is open. A stream is open until either:
* <ul>
* <li>A {@code SYN_RESET} frame abnormally terminates the stream.
* <li>Both input and output streams have transmitted all data.
* </ul>
* Note that the input stream may continue to yield data even after a stream
* reports itself as not open. This is because input data is buffered.
*/
public synchronized boolean isOpen() {
if (rstStatusCode != -1) {
return false;
}
if ((in.finished || in.closed) && (out.finished || out.closed)) {
return false;
}
return true;
}
/**
* Returns true if this stream was created by this peer.
*/
@@ -227,26 +246,33 @@ public final class SpdyStream {
return true;
}
synchronized void receiveReply(List<String> strings) throws IOException {
if (!isLocallyInitiated() || responseHeaders != null) {
throw new ProtocolException();
void receiveReply(List<String> strings) throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
synchronized (this) {
if (!isLocallyInitiated() || responseHeaders != null) {
throw new ProtocolException();
}
responseHeaders = strings;
notifyAll();
}
responseHeaders = strings;
notifyAll();
}
void receiveData(InputStream in, int flags, int length) throws IOException {
void receiveData(InputStream in, int length) throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
this.in.receive(in, length);
if ((flags & SpdyConnection.FLAG_FIN) == 0) {
return;
}
}
// This is the last incoming data in the stream.
void receiveFin() {
assert (!Thread.holdsLock(SpdyStream.this));
boolean open;
synchronized (this) {
this.in.finished = true;
open = isOpen();
notifyAll();
}
if (!open) {
connection.removeStream(id);
}
}
synchronized void receiveRstStream(int statusCode) {
@@ -438,15 +464,21 @@ public final class SpdyStream {
private void cancelStreamIfNecessary() throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
boolean open;
boolean cancel;
synchronized (this) {
if (!in.closed || in.finished || (!out.finished && !out.closed)) {
return; // We shouldn't cancel this stream (or don't need to).
}
cancel = !in.finished && in.closed && (out.finished || out.closed);
open = isOpen();
}
if (cancel) {
// RST this stream to prevent additional data from being sent. This
// is safe because the input stream is closed (we won't use any
// further bytes) and the output stream is either finished or closed
// (so RSTing both streams doesn't cause harm).
SpdyStream.this.close(RST_CANCEL);
} else if (!open) {
connection.removeStream(id);
}
// RST this stream to prevent additional data from being sent. This is safe because
// the input stream is closed (we won't use any further bytes) and the output stream
// is either finished or closed (so RSTing both streams doesn't cause harm).
SpdyStream.this.close(RST_CANCEL);
}
/**

View File

@@ -19,6 +19,7 @@ package com.squareup.okhttp.internal.net.spdy;
import static com.squareup.okhttp.internal.Util.UTF_8;
import static com.squareup.okhttp.internal.net.spdy.Settings.PERSIST_VALUE;
import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.FLAG_FIN;
import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.FLAG_UNIDIRECTIONAL;
import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_DATA;
import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_GOAWAY;
import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_NOOP;
@@ -65,6 +66,7 @@ public final class SpdyConnectionTest {
assertEquals(Arrays.asList("a", "android"), stream.getResponseHeaders());
assertStreamData("robot", stream.getInputStream());
writeAndClose(stream, "c3po");
assertEquals(0, connection.openStreamCount());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
@@ -77,6 +79,38 @@ public final class SpdyConnectionTest {
assertTrue(Arrays.equals("c3po".getBytes("UTF-8"), requestData.data));
}
@Test public void headersOnlyStreamIsClosedImmediately() throws Exception {
peer.acceptFrame(); // SYN STREAM
peer.sendFrame().synReply(0, 1, Arrays.asList("b", "banana"));
peer.play();
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
connection.newStream(Arrays.asList("a", "android"), false, false);
assertEquals(0, connection.openStreamCount());
}
@Test public void clientCreatesStreamAndServerRepliesWithFin() throws Exception {
// write the mocking script
peer.acceptFrame(); // SYN STREAM
peer.acceptFrame(); // PING
peer.sendFrame().synReply(FLAG_FIN, 1, Arrays.asList("a", "android"));
peer.sendFrame().ping(0, 1);
peer.play();
// play it back
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build();
connection.newStream(Arrays.asList("b", "banana"), false, true);
assertEquals(1, connection.openStreamCount());
connection.ping().roundTripTime(); // Ensure that the SYN_REPLY has been received.
assertEquals(0, connection.openStreamCount());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
assertEquals(TYPE_SYN_STREAM, synStream.type);
MockSpdyPeer.InFrame ping = peer.takeFrame();
assertEquals(TYPE_PING, ping.type);
}
@Test public void serverCreatesStreamAndClientReplies() throws Exception {
// write the mocking script
peer.sendFrame().synStream(0, 2, 0, 0, Arrays.asList("a", "android"));
@@ -323,10 +357,11 @@ public final class SpdyConnectionTest {
SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket())
.handler(REJECT_INCOMING_STREAMS)
.build();
SpdyStream stream = connection.newStream(Arrays.asList("a", "android"), true, true);
SpdyStream stream = connection.newStream(Arrays.asList("a", "android"), true, false);
OutputStream out = stream.getOutputStream();
out.write("square".getBytes(UTF_8));
out.flush();
assertEquals(1, connection.openStreamCount());
out.close();
try {
out.write("round".getBytes(UTF_8));
@@ -334,11 +369,12 @@ public final class SpdyConnectionTest {
} catch (Exception expected) {
assertEquals("stream closed", expected.getMessage());
}
assertEquals(0, connection.openStreamCount());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
assertEquals(TYPE_SYN_STREAM, synStream.type);
assertEquals(0, synStream.flags);
assertEquals(FLAG_UNIDIRECTIONAL, synStream.flags);
MockSpdyPeer.InFrame data = peer.takeFrame();
assertEquals(TYPE_DATA, data.type);
assertEquals(0, data.flags);
@@ -373,6 +409,7 @@ public final class SpdyConnectionTest {
assertEquals("stream was reset: CANCEL", expected.getMessage());
}
out.close();
assertEquals(0, connection.openStreamCount());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
@@ -413,12 +450,12 @@ public final class SpdyConnectionTest {
} catch (IOException expected) {
assertEquals("stream finished", expected.getMessage());
}
assertEquals(0, connection.openStreamCount());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
assertEquals(TYPE_SYN_STREAM, synStream.type);
assertEquals(SpdyConnection.FLAG_FIN, synStream.flags);
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.type);
assertEquals(SpdyStream.RST_CANCEL, rstStream.statusCode);
@@ -453,20 +490,18 @@ public final class SpdyConnectionTest {
out.write("square".getBytes(UTF_8));
out.flush();
out.close();
assertEquals(0, connection.openStreamCount());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
assertEquals(TYPE_SYN_STREAM, synStream.type);
assertEquals(0, synStream.flags);
MockSpdyPeer.InFrame data = peer.takeFrame();
assertEquals(TYPE_DATA, data.type);
assertTrue(Arrays.equals("square".getBytes("UTF-8"), data.data));
MockSpdyPeer.InFrame fin = peer.takeFrame();
assertEquals(TYPE_DATA, fin.type);
assertEquals(FLAG_FIN, fin.flags);
MockSpdyPeer.InFrame rstStream = peer.takeFrame();
assertEquals(TYPE_RST_STREAM, rstStream.type);
assertEquals(SpdyStream.RST_CANCEL, rstStream.statusCode);
@@ -485,6 +520,7 @@ public final class SpdyConnectionTest {
SpdyStream stream = connection.newStream(Arrays.asList("a", "android"), false, true);
InputStream in = stream.getInputStream();
assertStreamData("square", in);
assertEquals(0, connection.openStreamCount());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
@@ -629,6 +665,7 @@ public final class SpdyConnectionTest {
} catch (IOException expected) {
assertEquals("stream was reset: REFUSED_STREAM", expected.getMessage());
}
assertEquals(0, connection.openStreamCount());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream = peer.takeFrame();
@@ -669,6 +706,7 @@ public final class SpdyConnectionTest {
} catch (IOException expected) {
assertEquals("shutdown", expected.getMessage());
}
assertEquals(1, connection.openStreamCount());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream1 = peer.takeFrame();
@@ -697,6 +735,7 @@ public final class SpdyConnectionTest {
connection.newStream(Arrays.asList("a", "android"), true, true);
connection.shutdown();
connection.ping().roundTripTime(); // Ensure that the SYN STREAM has been received.
assertEquals(1, connection.openStreamCount());
// verify the peer received what was expected
MockSpdyPeer.InFrame synStream1 = peer.takeFrame();