diff --git a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java index b2e0a3235..568c93a7c 100644 --- a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java +++ b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java @@ -121,6 +121,14 @@ public final class SpdyConnection implements Closeable { readExecutor.execute(new Reader()); } + /** + * Returns the number of {@link SpdyStream#isOpen() open streams} on this + * connection. + */ + public synchronized int openStreamCount() { + return streams.size(); + } + private synchronized SpdyStream getStream(int id) { return streams.get(id); } @@ -153,7 +161,9 @@ public final class SpdyConnection implements Closeable { streamId = nextStreamId; nextStreamId += 2; stream = new SpdyStream(streamId, this, requestHeaders, flags); - streams.put(streamId, stream); + if (stream.isOpen()) { + streams.put(streamId, stream); + } } spdyWriter.synStream(flags, streamId, associatedStreamId, priority, requestHeaders); @@ -263,6 +273,9 @@ public final class SpdyConnection implements Closeable { } private synchronized void close(Throwable reason) throws IOException { + if (reason != null) { + reason.printStackTrace(); + } // TODO: forward 'reason' to forced closed streams? // TODO: graceful close; send RST frames // TODO: close all streams to release waiting readers @@ -324,16 +337,19 @@ public final class SpdyConnection implements Closeable { @Override public void data(int flags, int streamId, InputStream in, int length) throws IOException { SpdyStream dataStream = getStream(streamId); - if (dataStream != null) { - try { - dataStream.receiveData(in, flags, length); - } catch (ProtocolException e) { - Streams.skipByReading(in, length); - dataStream.closeLater(SpdyStream.RST_FLOW_CONTROL_ERROR); - } - } else { + if (dataStream == null) { writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM); Streams.skipByReading(in, length); + return; + } + try { + dataStream.receiveData(in, length); + if ((flags & SpdyConnection.FLAG_FIN) != 0) { + dataStream.receiveFin(); + } + } catch (ProtocolException e) { + Streams.skipByReading(in, length); + dataStream.closeLater(SpdyStream.RST_FLOW_CONTROL_ERROR); } } @@ -368,15 +384,17 @@ public final class SpdyConnection implements Closeable { @Override public void synReply(int flags, int streamId, List nameValueBlock) throws IOException { SpdyStream replyStream = getStream(streamId); - if (replyStream != null) { - // TODO: honor incoming FLAG_FIN. - try { - replyStream.receiveReply(nameValueBlock); - } catch (ProtocolException e) { - replyStream.closeLater(SpdyStream.RST_PROTOCOL_ERROR); - } - } else { + if (replyStream == null) { writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM); + return; + } + try { + replyStream.receiveReply(nameValueBlock); + if ((flags & SpdyConnection.FLAG_FIN) != 0) { + replyStream.receiveFin(); + } + } catch (ProtocolException e) { + replyStream.closeLater(SpdyStream.RST_PROTOCOL_ERROR); } } diff --git a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyStream.java b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyStream.java index 0bc9645f7..7614c6de4 100644 --- a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyStream.java +++ b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyStream.java @@ -93,6 +93,25 @@ public final class SpdyStream { } } + /** + * Returns true if this stream is open. A stream is open until either: + * + * Note that the input stream may continue to yield data even after a stream + * reports itself as not open. This is because input data is buffered. + */ + public synchronized boolean isOpen() { + if (rstStatusCode != -1) { + return false; + } + if ((in.finished || in.closed) && (out.finished || out.closed)) { + return false; + } + return true; + } + /** * Returns true if this stream was created by this peer. */ @@ -227,26 +246,33 @@ public final class SpdyStream { return true; } - synchronized void receiveReply(List strings) throws IOException { - if (!isLocallyInitiated() || responseHeaders != null) { - throw new ProtocolException(); + void receiveReply(List strings) throws IOException { + assert (!Thread.holdsLock(SpdyStream.this)); + synchronized (this) { + if (!isLocallyInitiated() || responseHeaders != null) { + throw new ProtocolException(); + } + responseHeaders = strings; + notifyAll(); } - responseHeaders = strings; - notifyAll(); } - void receiveData(InputStream in, int flags, int length) throws IOException { + void receiveData(InputStream in, int length) throws IOException { assert (!Thread.holdsLock(SpdyStream.this)); this.in.receive(in, length); - if ((flags & SpdyConnection.FLAG_FIN) == 0) { - return; - } + } - // This is the last incoming data in the stream. + void receiveFin() { + assert (!Thread.holdsLock(SpdyStream.this)); + boolean open; synchronized (this) { this.in.finished = true; + open = isOpen(); notifyAll(); } + if (!open) { + connection.removeStream(id); + } } synchronized void receiveRstStream(int statusCode) { @@ -438,15 +464,21 @@ public final class SpdyStream { private void cancelStreamIfNecessary() throws IOException { assert (!Thread.holdsLock(SpdyStream.this)); + boolean open; + boolean cancel; synchronized (this) { - if (!in.closed || in.finished || (!out.finished && !out.closed)) { - return; // We shouldn't cancel this stream (or don't need to). - } + cancel = !in.finished && in.closed && (out.finished || out.closed); + open = isOpen(); + } + if (cancel) { + // RST this stream to prevent additional data from being sent. This + // is safe because the input stream is closed (we won't use any + // further bytes) and the output stream is either finished or closed + // (so RSTing both streams doesn't cause harm). + SpdyStream.this.close(RST_CANCEL); + } else if (!open) { + connection.removeStream(id); } - // RST this stream to prevent additional data from being sent. This is safe because - // the input stream is closed (we won't use any further bytes) and the output stream - // is either finished or closed (so RSTing both streams doesn't cause harm). - SpdyStream.this.close(RST_CANCEL); } /** diff --git a/src/test/java/com/squareup/okhttp/internal/net/spdy/SpdyConnectionTest.java b/src/test/java/com/squareup/okhttp/internal/net/spdy/SpdyConnectionTest.java index 333b1c7ad..c8917b171 100644 --- a/src/test/java/com/squareup/okhttp/internal/net/spdy/SpdyConnectionTest.java +++ b/src/test/java/com/squareup/okhttp/internal/net/spdy/SpdyConnectionTest.java @@ -19,6 +19,7 @@ package com.squareup.okhttp.internal.net.spdy; import static com.squareup.okhttp.internal.Util.UTF_8; import static com.squareup.okhttp.internal.net.spdy.Settings.PERSIST_VALUE; import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.FLAG_FIN; +import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.FLAG_UNIDIRECTIONAL; import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_DATA; import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_GOAWAY; import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_NOOP; @@ -65,6 +66,7 @@ public final class SpdyConnectionTest { assertEquals(Arrays.asList("a", "android"), stream.getResponseHeaders()); assertStreamData("robot", stream.getInputStream()); writeAndClose(stream, "c3po"); + assertEquals(0, connection.openStreamCount()); // verify the peer received what was expected MockSpdyPeer.InFrame synStream = peer.takeFrame(); @@ -77,6 +79,38 @@ public final class SpdyConnectionTest { assertTrue(Arrays.equals("c3po".getBytes("UTF-8"), requestData.data)); } + @Test public void headersOnlyStreamIsClosedImmediately() throws Exception { + peer.acceptFrame(); // SYN STREAM + peer.sendFrame().synReply(0, 1, Arrays.asList("b", "banana")); + peer.play(); + + SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + connection.newStream(Arrays.asList("a", "android"), false, false); + assertEquals(0, connection.openStreamCount()); + } + + @Test public void clientCreatesStreamAndServerRepliesWithFin() throws Exception { + // write the mocking script + peer.acceptFrame(); // SYN STREAM + peer.acceptFrame(); // PING + peer.sendFrame().synReply(FLAG_FIN, 1, Arrays.asList("a", "android")); + peer.sendFrame().ping(0, 1); + peer.play(); + + // play it back + SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + connection.newStream(Arrays.asList("b", "banana"), false, true); + assertEquals(1, connection.openStreamCount()); + connection.ping().roundTripTime(); // Ensure that the SYN_REPLY has been received. + assertEquals(0, connection.openStreamCount()); + + // verify the peer received what was expected + MockSpdyPeer.InFrame synStream = peer.takeFrame(); + assertEquals(TYPE_SYN_STREAM, synStream.type); + MockSpdyPeer.InFrame ping = peer.takeFrame(); + assertEquals(TYPE_PING, ping.type); + } + @Test public void serverCreatesStreamAndClientReplies() throws Exception { // write the mocking script peer.sendFrame().synStream(0, 2, 0, 0, Arrays.asList("a", "android")); @@ -323,10 +357,11 @@ public final class SpdyConnectionTest { SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()) .handler(REJECT_INCOMING_STREAMS) .build(); - SpdyStream stream = connection.newStream(Arrays.asList("a", "android"), true, true); + SpdyStream stream = connection.newStream(Arrays.asList("a", "android"), true, false); OutputStream out = stream.getOutputStream(); out.write("square".getBytes(UTF_8)); out.flush(); + assertEquals(1, connection.openStreamCount()); out.close(); try { out.write("round".getBytes(UTF_8)); @@ -334,11 +369,12 @@ public final class SpdyConnectionTest { } catch (Exception expected) { assertEquals("stream closed", expected.getMessage()); } + assertEquals(0, connection.openStreamCount()); // verify the peer received what was expected MockSpdyPeer.InFrame synStream = peer.takeFrame(); assertEquals(TYPE_SYN_STREAM, synStream.type); - assertEquals(0, synStream.flags); + assertEquals(FLAG_UNIDIRECTIONAL, synStream.flags); MockSpdyPeer.InFrame data = peer.takeFrame(); assertEquals(TYPE_DATA, data.type); assertEquals(0, data.flags); @@ -373,6 +409,7 @@ public final class SpdyConnectionTest { assertEquals("stream was reset: CANCEL", expected.getMessage()); } out.close(); + assertEquals(0, connection.openStreamCount()); // verify the peer received what was expected MockSpdyPeer.InFrame synStream = peer.takeFrame(); @@ -413,12 +450,12 @@ public final class SpdyConnectionTest { } catch (IOException expected) { assertEquals("stream finished", expected.getMessage()); } + assertEquals(0, connection.openStreamCount()); // verify the peer received what was expected MockSpdyPeer.InFrame synStream = peer.takeFrame(); assertEquals(TYPE_SYN_STREAM, synStream.type); assertEquals(SpdyConnection.FLAG_FIN, synStream.flags); - MockSpdyPeer.InFrame rstStream = peer.takeFrame(); assertEquals(TYPE_RST_STREAM, rstStream.type); assertEquals(SpdyStream.RST_CANCEL, rstStream.statusCode); @@ -453,20 +490,18 @@ public final class SpdyConnectionTest { out.write("square".getBytes(UTF_8)); out.flush(); out.close(); + assertEquals(0, connection.openStreamCount()); // verify the peer received what was expected MockSpdyPeer.InFrame synStream = peer.takeFrame(); assertEquals(TYPE_SYN_STREAM, synStream.type); assertEquals(0, synStream.flags); - MockSpdyPeer.InFrame data = peer.takeFrame(); assertEquals(TYPE_DATA, data.type); assertTrue(Arrays.equals("square".getBytes("UTF-8"), data.data)); - MockSpdyPeer.InFrame fin = peer.takeFrame(); assertEquals(TYPE_DATA, fin.type); assertEquals(FLAG_FIN, fin.flags); - MockSpdyPeer.InFrame rstStream = peer.takeFrame(); assertEquals(TYPE_RST_STREAM, rstStream.type); assertEquals(SpdyStream.RST_CANCEL, rstStream.statusCode); @@ -485,6 +520,7 @@ public final class SpdyConnectionTest { SpdyStream stream = connection.newStream(Arrays.asList("a", "android"), false, true); InputStream in = stream.getInputStream(); assertStreamData("square", in); + assertEquals(0, connection.openStreamCount()); // verify the peer received what was expected MockSpdyPeer.InFrame synStream = peer.takeFrame(); @@ -629,6 +665,7 @@ public final class SpdyConnectionTest { } catch (IOException expected) { assertEquals("stream was reset: REFUSED_STREAM", expected.getMessage()); } + assertEquals(0, connection.openStreamCount()); // verify the peer received what was expected MockSpdyPeer.InFrame synStream = peer.takeFrame(); @@ -669,6 +706,7 @@ public final class SpdyConnectionTest { } catch (IOException expected) { assertEquals("shutdown", expected.getMessage()); } + assertEquals(1, connection.openStreamCount()); // verify the peer received what was expected MockSpdyPeer.InFrame synStream1 = peer.takeFrame(); @@ -697,6 +735,7 @@ public final class SpdyConnectionTest { connection.newStream(Arrays.asList("a", "android"), true, true); connection.shutdown(); connection.ping().roundTripTime(); // Ensure that the SYN STREAM has been received. + assertEquals(1, connection.openStreamCount()); // verify the peer received what was expected MockSpdyPeer.InFrame synStream1 = peer.takeFrame();