From 7f4249625960b7f10dc61aae4d1896c2e126f538 Mon Sep 17 00:00:00 2001 From: jwilson Date: Mon, 31 Dec 2012 18:51:09 -0700 Subject: [PATCH] Discard streams once they're no longer open. This avoids a memory leak; we don't hold onto the input stream and it's large 64 KiB buffer long after that stream is done. It also allows SpdyConnection to track how many streams are currently active. This will allow a follow up change to shut down connections that don't host any streams. --- .../internal/net/spdy/SpdyConnection.java | 52 ++++++++++----- .../okhttp/internal/net/spdy/SpdyStream.java | 66 ++++++++++++++----- .../internal/net/spdy/SpdyConnectionTest.java | 51 ++++++++++++-- 3 files changed, 129 insertions(+), 40 deletions(-) diff --git a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java index b2e0a3235..568c93a7c 100644 --- a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java +++ b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java @@ -121,6 +121,14 @@ public final class SpdyConnection implements Closeable { readExecutor.execute(new Reader()); } + /** + * Returns the number of {@link SpdyStream#isOpen() open streams} on this + * connection. + */ + public synchronized int openStreamCount() { + return streams.size(); + } + private synchronized SpdyStream getStream(int id) { return streams.get(id); } @@ -153,7 +161,9 @@ public final class SpdyConnection implements Closeable { streamId = nextStreamId; nextStreamId += 2; stream = new SpdyStream(streamId, this, requestHeaders, flags); - streams.put(streamId, stream); + if (stream.isOpen()) { + streams.put(streamId, stream); + } } spdyWriter.synStream(flags, streamId, associatedStreamId, priority, requestHeaders); @@ -263,6 +273,9 @@ public final class SpdyConnection implements Closeable { } private synchronized void close(Throwable reason) throws IOException { + if (reason != null) { + reason.printStackTrace(); + } // TODO: forward 'reason' to forced closed streams? // TODO: graceful close; send RST frames // TODO: close all streams to release waiting readers @@ -324,16 +337,19 @@ public final class SpdyConnection implements Closeable { @Override public void data(int flags, int streamId, InputStream in, int length) throws IOException { SpdyStream dataStream = getStream(streamId); - if (dataStream != null) { - try { - dataStream.receiveData(in, flags, length); - } catch (ProtocolException e) { - Streams.skipByReading(in, length); - dataStream.closeLater(SpdyStream.RST_FLOW_CONTROL_ERROR); - } - } else { + if (dataStream == null) { writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM); Streams.skipByReading(in, length); + return; + } + try { + dataStream.receiveData(in, length); + if ((flags & SpdyConnection.FLAG_FIN) != 0) { + dataStream.receiveFin(); + } + } catch (ProtocolException e) { + Streams.skipByReading(in, length); + dataStream.closeLater(SpdyStream.RST_FLOW_CONTROL_ERROR); } } @@ -368,15 +384,17 @@ public final class SpdyConnection implements Closeable { @Override public void synReply(int flags, int streamId, List nameValueBlock) throws IOException { SpdyStream replyStream = getStream(streamId); - if (replyStream != null) { - // TODO: honor incoming FLAG_FIN. - try { - replyStream.receiveReply(nameValueBlock); - } catch (ProtocolException e) { - replyStream.closeLater(SpdyStream.RST_PROTOCOL_ERROR); - } - } else { + if (replyStream == null) { writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM); + return; + } + try { + replyStream.receiveReply(nameValueBlock); + if ((flags & SpdyConnection.FLAG_FIN) != 0) { + replyStream.receiveFin(); + } + } catch (ProtocolException e) { + replyStream.closeLater(SpdyStream.RST_PROTOCOL_ERROR); } } diff --git a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyStream.java b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyStream.java index 0bc9645f7..7614c6de4 100644 --- a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyStream.java +++ b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyStream.java @@ -93,6 +93,25 @@ public final class SpdyStream { } } + /** + * Returns true if this stream is open. A stream is open until either: + * + * Note that the input stream may continue to yield data even after a stream + * reports itself as not open. This is because input data is buffered. + */ + public synchronized boolean isOpen() { + if (rstStatusCode != -1) { + return false; + } + if ((in.finished || in.closed) && (out.finished || out.closed)) { + return false; + } + return true; + } + /** * Returns true if this stream was created by this peer. */ @@ -227,26 +246,33 @@ public final class SpdyStream { return true; } - synchronized void receiveReply(List strings) throws IOException { - if (!isLocallyInitiated() || responseHeaders != null) { - throw new ProtocolException(); + void receiveReply(List strings) throws IOException { + assert (!Thread.holdsLock(SpdyStream.this)); + synchronized (this) { + if (!isLocallyInitiated() || responseHeaders != null) { + throw new ProtocolException(); + } + responseHeaders = strings; + notifyAll(); } - responseHeaders = strings; - notifyAll(); } - void receiveData(InputStream in, int flags, int length) throws IOException { + void receiveData(InputStream in, int length) throws IOException { assert (!Thread.holdsLock(SpdyStream.this)); this.in.receive(in, length); - if ((flags & SpdyConnection.FLAG_FIN) == 0) { - return; - } + } - // This is the last incoming data in the stream. + void receiveFin() { + assert (!Thread.holdsLock(SpdyStream.this)); + boolean open; synchronized (this) { this.in.finished = true; + open = isOpen(); notifyAll(); } + if (!open) { + connection.removeStream(id); + } } synchronized void receiveRstStream(int statusCode) { @@ -438,15 +464,21 @@ public final class SpdyStream { private void cancelStreamIfNecessary() throws IOException { assert (!Thread.holdsLock(SpdyStream.this)); + boolean open; + boolean cancel; synchronized (this) { - if (!in.closed || in.finished || (!out.finished && !out.closed)) { - return; // We shouldn't cancel this stream (or don't need to). - } + cancel = !in.finished && in.closed && (out.finished || out.closed); + open = isOpen(); + } + if (cancel) { + // RST this stream to prevent additional data from being sent. This + // is safe because the input stream is closed (we won't use any + // further bytes) and the output stream is either finished or closed + // (so RSTing both streams doesn't cause harm). + SpdyStream.this.close(RST_CANCEL); + } else if (!open) { + connection.removeStream(id); } - // RST this stream to prevent additional data from being sent. This is safe because - // the input stream is closed (we won't use any further bytes) and the output stream - // is either finished or closed (so RSTing both streams doesn't cause harm). - SpdyStream.this.close(RST_CANCEL); } /** diff --git a/src/test/java/com/squareup/okhttp/internal/net/spdy/SpdyConnectionTest.java b/src/test/java/com/squareup/okhttp/internal/net/spdy/SpdyConnectionTest.java index 333b1c7ad..c8917b171 100644 --- a/src/test/java/com/squareup/okhttp/internal/net/spdy/SpdyConnectionTest.java +++ b/src/test/java/com/squareup/okhttp/internal/net/spdy/SpdyConnectionTest.java @@ -19,6 +19,7 @@ package com.squareup.okhttp.internal.net.spdy; import static com.squareup.okhttp.internal.Util.UTF_8; import static com.squareup.okhttp.internal.net.spdy.Settings.PERSIST_VALUE; import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.FLAG_FIN; +import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.FLAG_UNIDIRECTIONAL; import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_DATA; import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_GOAWAY; import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_NOOP; @@ -65,6 +66,7 @@ public final class SpdyConnectionTest { assertEquals(Arrays.asList("a", "android"), stream.getResponseHeaders()); assertStreamData("robot", stream.getInputStream()); writeAndClose(stream, "c3po"); + assertEquals(0, connection.openStreamCount()); // verify the peer received what was expected MockSpdyPeer.InFrame synStream = peer.takeFrame(); @@ -77,6 +79,38 @@ public final class SpdyConnectionTest { assertTrue(Arrays.equals("c3po".getBytes("UTF-8"), requestData.data)); } + @Test public void headersOnlyStreamIsClosedImmediately() throws Exception { + peer.acceptFrame(); // SYN STREAM + peer.sendFrame().synReply(0, 1, Arrays.asList("b", "banana")); + peer.play(); + + SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + connection.newStream(Arrays.asList("a", "android"), false, false); + assertEquals(0, connection.openStreamCount()); + } + + @Test public void clientCreatesStreamAndServerRepliesWithFin() throws Exception { + // write the mocking script + peer.acceptFrame(); // SYN STREAM + peer.acceptFrame(); // PING + peer.sendFrame().synReply(FLAG_FIN, 1, Arrays.asList("a", "android")); + peer.sendFrame().ping(0, 1); + peer.play(); + + // play it back + SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + connection.newStream(Arrays.asList("b", "banana"), false, true); + assertEquals(1, connection.openStreamCount()); + connection.ping().roundTripTime(); // Ensure that the SYN_REPLY has been received. + assertEquals(0, connection.openStreamCount()); + + // verify the peer received what was expected + MockSpdyPeer.InFrame synStream = peer.takeFrame(); + assertEquals(TYPE_SYN_STREAM, synStream.type); + MockSpdyPeer.InFrame ping = peer.takeFrame(); + assertEquals(TYPE_PING, ping.type); + } + @Test public void serverCreatesStreamAndClientReplies() throws Exception { // write the mocking script peer.sendFrame().synStream(0, 2, 0, 0, Arrays.asList("a", "android")); @@ -323,10 +357,11 @@ public final class SpdyConnectionTest { SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()) .handler(REJECT_INCOMING_STREAMS) .build(); - SpdyStream stream = connection.newStream(Arrays.asList("a", "android"), true, true); + SpdyStream stream = connection.newStream(Arrays.asList("a", "android"), true, false); OutputStream out = stream.getOutputStream(); out.write("square".getBytes(UTF_8)); out.flush(); + assertEquals(1, connection.openStreamCount()); out.close(); try { out.write("round".getBytes(UTF_8)); @@ -334,11 +369,12 @@ public final class SpdyConnectionTest { } catch (Exception expected) { assertEquals("stream closed", expected.getMessage()); } + assertEquals(0, connection.openStreamCount()); // verify the peer received what was expected MockSpdyPeer.InFrame synStream = peer.takeFrame(); assertEquals(TYPE_SYN_STREAM, synStream.type); - assertEquals(0, synStream.flags); + assertEquals(FLAG_UNIDIRECTIONAL, synStream.flags); MockSpdyPeer.InFrame data = peer.takeFrame(); assertEquals(TYPE_DATA, data.type); assertEquals(0, data.flags); @@ -373,6 +409,7 @@ public final class SpdyConnectionTest { assertEquals("stream was reset: CANCEL", expected.getMessage()); } out.close(); + assertEquals(0, connection.openStreamCount()); // verify the peer received what was expected MockSpdyPeer.InFrame synStream = peer.takeFrame(); @@ -413,12 +450,12 @@ public final class SpdyConnectionTest { } catch (IOException expected) { assertEquals("stream finished", expected.getMessage()); } + assertEquals(0, connection.openStreamCount()); // verify the peer received what was expected MockSpdyPeer.InFrame synStream = peer.takeFrame(); assertEquals(TYPE_SYN_STREAM, synStream.type); assertEquals(SpdyConnection.FLAG_FIN, synStream.flags); - MockSpdyPeer.InFrame rstStream = peer.takeFrame(); assertEquals(TYPE_RST_STREAM, rstStream.type); assertEquals(SpdyStream.RST_CANCEL, rstStream.statusCode); @@ -453,20 +490,18 @@ public final class SpdyConnectionTest { out.write("square".getBytes(UTF_8)); out.flush(); out.close(); + assertEquals(0, connection.openStreamCount()); // verify the peer received what was expected MockSpdyPeer.InFrame synStream = peer.takeFrame(); assertEquals(TYPE_SYN_STREAM, synStream.type); assertEquals(0, synStream.flags); - MockSpdyPeer.InFrame data = peer.takeFrame(); assertEquals(TYPE_DATA, data.type); assertTrue(Arrays.equals("square".getBytes("UTF-8"), data.data)); - MockSpdyPeer.InFrame fin = peer.takeFrame(); assertEquals(TYPE_DATA, fin.type); assertEquals(FLAG_FIN, fin.flags); - MockSpdyPeer.InFrame rstStream = peer.takeFrame(); assertEquals(TYPE_RST_STREAM, rstStream.type); assertEquals(SpdyStream.RST_CANCEL, rstStream.statusCode); @@ -485,6 +520,7 @@ public final class SpdyConnectionTest { SpdyStream stream = connection.newStream(Arrays.asList("a", "android"), false, true); InputStream in = stream.getInputStream(); assertStreamData("square", in); + assertEquals(0, connection.openStreamCount()); // verify the peer received what was expected MockSpdyPeer.InFrame synStream = peer.takeFrame(); @@ -629,6 +665,7 @@ public final class SpdyConnectionTest { } catch (IOException expected) { assertEquals("stream was reset: REFUSED_STREAM", expected.getMessage()); } + assertEquals(0, connection.openStreamCount()); // verify the peer received what was expected MockSpdyPeer.InFrame synStream = peer.takeFrame(); @@ -669,6 +706,7 @@ public final class SpdyConnectionTest { } catch (IOException expected) { assertEquals("shutdown", expected.getMessage()); } + assertEquals(1, connection.openStreamCount()); // verify the peer received what was expected MockSpdyPeer.InFrame synStream1 = peer.takeFrame(); @@ -697,6 +735,7 @@ public final class SpdyConnectionTest { connection.newStream(Arrays.asList("a", "android"), true, true); connection.shutdown(); connection.ping().roundTripTime(); // Ensure that the SYN STREAM has been received. + assertEquals(1, connection.openStreamCount()); // verify the peer received what was expected MockSpdyPeer.InFrame synStream1 = peer.takeFrame();