From 68ebb704fb73ad1545ae82287f8b2a5f68d8e69d Mon Sep 17 00:00:00 2001 From: Jesse Wilson Date: Tue, 25 Sep 2012 17:32:28 -0400 Subject: [PATCH] Address some todos around stream failures and peer errors. --- .../java/libcore/net/spdy/SpdyConnection.java | 72 ++++++----- .../java/libcore/net/spdy/SpdyReader.java | 4 +- .../java/libcore/net/spdy/SpdyStream.java | 8 +- .../libcore/net/spdy/SpdyConnectionTest.java | 114 +++++++++++++++--- 4 files changed, 150 insertions(+), 48 deletions(-) diff --git a/src/main/java/libcore/net/spdy/SpdyConnection.java b/src/main/java/libcore/net/spdy/SpdyConnection.java index 4f84b9ef9..8737bdb5f 100644 --- a/src/main/java/libcore/net/spdy/SpdyConnection.java +++ b/src/main/java/libcore/net/spdy/SpdyConnection.java @@ -29,6 +29,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import libcore.io.Streams; import static libcore.net.spdy.Threads.newThreadFactory; @@ -162,16 +163,11 @@ public final class SpdyConnection implements Closeable { } private synchronized SpdyStream getStream(int id) { - SpdyStream stream = streams.get(id); - if (stream == null) { - // TODO: rst stream - throw new UnsupportedOperationException("TODO " + id + "; " + streams); - } - return stream; + return streams.get(id); } - synchronized void removeStream(int streamId) { - streams.remove(streamId); + synchronized SpdyStream removeStream(int streamId) { + return streams.remove(streamId); } /** @@ -185,8 +181,8 @@ public final class SpdyConnection implements Closeable { public SpdyStream newStream(List requestHeaders, boolean out, boolean in) throws IOException { int flags = (out ? 0 : FLAG_FIN) | (in ? 0 : FLAG_UNIDIRECTIONAL); - int associatedStreamId = 0; // TODO - int priority = 0; // TODO + int associatedStreamId = 0; // TODO: permit the caller to specify an associated stream. + int priority = 0; // TODO: permit the caller to specify a priority. SpdyStream stream; int streamId; @@ -211,7 +207,7 @@ public final class SpdyConnection implements Closeable { void writeSynReply(int streamId, List alternating) throws IOException { synchronized (spdyWriter) { - int flags = 0; // TODO + int flags = 0; // TODO: permit the caller to send FLAG_FIN spdyWriter.flags = flags; spdyWriter.id = streamId; spdyWriter.nameValueBlock = alternating; @@ -239,8 +235,7 @@ public final class SpdyConnection implements Closeable { void writeSynReset(int streamId, int statusCode) throws IOException { synchronized (spdyWriter) { - int flags = 0; // TODO - spdyWriter.flags = flags; + spdyWriter.flags = 0; spdyWriter.id = streamId; spdyWriter.statusCode = statusCode; spdyWriter.synReset(); @@ -294,6 +289,11 @@ public final class SpdyConnection implements Closeable { } @Override public void close() throws IOException { + close(null); + } + + private synchronized void close(Throwable reason) throws IOException { + // TODO: forward 'reason' to forced closed streams? // TODO: graceful close; send RST frames // TODO: close all streams to release waiting readers writeExecutor.shutdown(); @@ -337,12 +337,17 @@ public final class SpdyConnection implements Closeable { private class Reader implements Runnable { @Override public void run() { + Throwable failure = null; try { while (readFrame()) { } - close(); } catch (Throwable e) { - e.printStackTrace(); // TODO + failure = e; + } + + try { + close(failure); + } catch (IOException ignored) { } } @@ -355,21 +360,26 @@ public final class SpdyConnection implements Closeable { return false; case TYPE_DATA: - getStream(spdyReader.id) - .receiveData(spdyReader.in, spdyReader.flags, spdyReader.length); + SpdyStream dataStream = getStream(spdyReader.id); + if (dataStream != null) { + dataStream.receiveData(spdyReader.in, spdyReader.flags, spdyReader.length); + } else { + writeSynResetLater(spdyReader.id, SpdyStream.RST_INVALID_STREAM); + Streams.skipByReading(spdyReader.in, spdyReader.length); + } return true; case TYPE_SYN_STREAM: - final SpdyStream stream = new SpdyStream(spdyReader.id, SpdyConnection.this, + final SpdyStream synStream = new SpdyStream(spdyReader.id, SpdyConnection.this, spdyReader.nameValueBlock, spdyReader.flags); - SpdyStream previous = streams.put(spdyReader.id, stream); + SpdyStream previous = streams.put(spdyReader.id, synStream); if (previous != null) { previous.close(SpdyStream.RST_PROTOCOL_ERROR); } callbackExecutor.execute(new Runnable() { @Override public void run() { try { - handler.receive(stream); + handler.receive(synStream); } catch (IOException e) { throw new RuntimeException(e); } @@ -378,18 +388,27 @@ public final class SpdyConnection implements Closeable { return true; case TYPE_SYN_REPLY: - // TODO: honor flags - getStream(spdyReader.id).receiveReply(spdyReader.nameValueBlock); + SpdyStream replyStream = getStream(spdyReader.id); + if (replyStream != null) { + // TODO: honor incoming FLAG_FIN. + replyStream.receiveReply(spdyReader.nameValueBlock); + } else { + writeSynResetLater(spdyReader.id, SpdyStream.RST_INVALID_STREAM); + } return true; case TYPE_RST_STREAM: - getStream(spdyReader.id).receiveRstStream(spdyReader.statusCode); + SpdyStream rstStream = removeStream(spdyReader.id); + if (rstStream != null) { + rstStream.receiveRstStream(spdyReader.statusCode); + } return true; case SpdyConnection.TYPE_SETTINGS: int numberOfEntries = spdyReader.in.readInt(); - if (spdyReader.length != 4 + numberOfEntries * 8) { - // TODO: DIE + if (spdyReader.length != 4 + 8 * numberOfEntries) { + throw new IOException("TYPE_SETTINGS frame length is inconsistent: " + + spdyReader.length + " != 4 + 8 * " + numberOfEntries); } if ((spdyReader.flags & FLAG_SETTINGS_CLEAR_PREVIOUSLY_PERSISTED_SETTINGS) != 0) { clearSettings(); @@ -427,8 +446,7 @@ public final class SpdyConnection implements Closeable { throw new UnsupportedOperationException(); default: - // TODO: throw IOException here? - return false; + throw new IOException("Unexpected frame: " + Integer.toHexString(spdyReader.type)); } } } diff --git a/src/main/java/libcore/net/spdy/SpdyReader.java b/src/main/java/libcore/net/spdy/SpdyReader.java index 9dc51a492..0c79472fd 100644 --- a/src/main/java/libcore/net/spdy/SpdyReader.java +++ b/src/main/java/libcore/net/spdy/SpdyReader.java @@ -87,7 +87,7 @@ final class SpdyReader { try { w1 = in.readInt(); } catch (EOFException e) { - return SpdyConnection.TYPE_EOF; + return type = SpdyConnection.TYPE_EOF; } int w2 = in.readInt(); @@ -125,7 +125,7 @@ final class SpdyReader { } } else { id = w1 & 0x7fffffff; - return SpdyConnection.TYPE_DATA; + return type = SpdyConnection.TYPE_DATA; } } diff --git a/src/main/java/libcore/net/spdy/SpdyStream.java b/src/main/java/libcore/net/spdy/SpdyStream.java index e781dea89..605843e1b 100644 --- a/src/main/java/libcore/net/spdy/SpdyStream.java +++ b/src/main/java/libcore/net/spdy/SpdyStream.java @@ -118,9 +118,13 @@ public final class SpdyStream { /** * Returns the reason why this stream was closed, or -1 if it closed - * normally or has not yet been closed. + * normally or has not yet been closed. Valid reasons are {@link + * #RST_PROTOCOL_ERROR}, {@link #RST_INVALID_STREAM}, {@link + * #RST_REFUSED_STREAM}, {@link #RST_UNSUPPORTED_VERSION}, {@link + * #RST_CANCEL}, {@link #RST_INTERNAL_ERROR} and {@link + * #RST_FLOW_CONTROL_ERROR}. */ - public synchronized int getRstStatusCode() { // TODO: rename this? + public synchronized int getRstStatusCode() { return rstStatusCode; } diff --git a/src/test/java/libcore/net/spdy/SpdyConnectionTest.java b/src/test/java/libcore/net/spdy/SpdyConnectionTest.java index 3450d0146..9f07e438b 100644 --- a/src/test/java/libcore/net/spdy/SpdyConnectionTest.java +++ b/src/test/java/libcore/net/spdy/SpdyConnectionTest.java @@ -16,15 +16,21 @@ package libcore.net.spdy; -import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStreamReader; +import java.io.InputStream; import java.io.OutputStream; import java.util.Arrays; -import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import junit.framework.TestCase; +import static libcore.net.spdy.SpdyConnection.TYPE_PING; +import static libcore.net.spdy.SpdyConnection.TYPE_RST_STREAM; +import static libcore.net.spdy.SpdyConnection.TYPE_SYN_REPLY; +import static libcore.net.spdy.SpdyConnection.TYPE_SYN_STREAM; +import static libcore.net.spdy.SpdyStream.RST_INVALID_STREAM; + public final class SpdyConnectionTest extends TestCase { private static final IncomingStreamHandler REJECT_INCOMING_STREAMS = new IncomingStreamHandler() { @@ -51,17 +57,13 @@ public final class SpdyConnectionTest extends TestCase { // play it back SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); SpdyStream stream = connection.newStream(Arrays.asList("b", "banana"), true, true); - List responseHeaders = stream.getResponseHeaders(); - assertEquals(Arrays.asList("a", "android"), responseHeaders); - BufferedReader reader = new BufferedReader(new InputStreamReader(stream.getInputStream())); - assertEquals("robot", reader.readLine()); - assertEquals(null, reader.readLine()); - OutputStream out = stream.getOutputStream(); - out.write("c3po".getBytes("UTF-8")); - out.close(); + assertEquals(Arrays.asList("a", "android"), stream.getResponseHeaders()); + assertStreamData("robot", stream.getInputStream()); + writeAndClose(stream, "c3po"); // verify the peer received what was expected MockSpdyPeer.InFrame synStream = peer.takeFrame(); + assertEquals(TYPE_SYN_STREAM, synStream.reader.type); assertEquals(0, synStream.reader.flags); assertEquals(1, synStream.reader.id); assertEquals(0, synStream.reader.associatedStreamId); @@ -82,11 +84,14 @@ public final class SpdyConnectionTest extends TestCase { peer.play(); // play it back + final AtomicInteger receiveCount = new AtomicInteger(); IncomingStreamHandler handler = new IncomingStreamHandler() { @Override public void receive(SpdyStream stream) throws IOException { + receiveCount.incrementAndGet(); assertEquals(Arrays.asList("a", "android"), stream.getRequestHeaders()); assertEquals(-1, stream.getRstStatusCode()); stream.reply(Arrays.asList("b", "banana")); + } }; new SpdyConnection.Builder(true, peer.openSocket()) @@ -94,11 +99,13 @@ public final class SpdyConnectionTest extends TestCase { .build(); // verify the peer received what was expected - MockSpdyPeer.InFrame synStream = peer.takeFrame(); - assertEquals(0, synStream.reader.flags); - assertEquals(2, synStream.reader.id); - assertEquals(0, synStream.reader.associatedStreamId); - assertEquals(Arrays.asList("b", "banana"), synStream.reader.nameValueBlock); + MockSpdyPeer.InFrame reply = peer.takeFrame(); + assertEquals(TYPE_SYN_REPLY, reply.reader.type); + assertEquals(0, reply.reader.flags); + assertEquals(2, reply.reader.id); + assertEquals(0, reply.reader.associatedStreamId); + assertEquals(Arrays.asList("b", "banana"), reply.reader.nameValueBlock); + assertEquals(1, receiveCount.get()); } public void testServerPingsClient() throws Exception { @@ -114,6 +121,7 @@ public final class SpdyConnectionTest extends TestCase { // verify the peer received what was expected MockSpdyPeer.InFrame ping = peer.takeFrame(); + assertEquals(TYPE_PING, ping.reader.type); assertEquals(0, ping.reader.flags); assertEquals(2, ping.reader.id); } @@ -134,6 +142,7 @@ public final class SpdyConnectionTest extends TestCase { // verify the peer received what was expected MockSpdyPeer.InFrame pingFrame = peer.takeFrame(); + assertEquals(TYPE_PING, pingFrame.reader.type); assertEquals(0, pingFrame.reader.flags); assertEquals(1, pingFrame.reader.id); } @@ -148,7 +157,7 @@ public final class SpdyConnectionTest extends TestCase { peer.play(); // play it back - SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()) + new SpdyConnection.Builder(true, peer.openSocket()) .handler(REJECT_INCOMING_STREAMS) .build(); @@ -180,4 +189,75 @@ public final class SpdyConnectionTest extends TestCase { assertEquals(10, connection.peerMaxConcurrentStreams); } } + + public void testBogusDataFrameDoesNotDisruptConnection() throws Exception { + // write the mocking script + SpdyWriter unexpectedData = peer.sendFrame(); + unexpectedData.flags = SpdyConnection.FLAG_FIN; + unexpectedData.id = 42; + unexpectedData.data("bogus".getBytes("UTF-8")); + peer.acceptFrame(); // RST_STREAM + peer.sendPing(2); + peer.acceptFrame(); // PING + peer.play(); + + // play it back + new SpdyConnection.Builder(true, peer.openSocket()) + .handler(REJECT_INCOMING_STREAMS) + .build(); + + // verify the peer received what was expected + MockSpdyPeer.InFrame rstStream = peer.takeFrame(); + assertEquals(TYPE_RST_STREAM, rstStream.reader.type); + assertEquals(0, rstStream.reader.flags); + assertEquals(8, rstStream.reader.length); + assertEquals(42, rstStream.reader.id); + assertEquals(RST_INVALID_STREAM, rstStream.reader.statusCode); + MockSpdyPeer.InFrame ping = peer.takeFrame(); + assertEquals(2, ping.reader.id); + } + + public void testBogusReplyFrameDoesNotDisruptConnection() throws Exception { + // write the mocking script + SpdyWriter unexpectedReply = peer.sendFrame(); + unexpectedReply.nameValueBlock = Arrays.asList("a", "android"); + unexpectedReply.flags = 0; + unexpectedReply.id = 42; + unexpectedReply.synReply(); + peer.acceptFrame(); // RST_STREAM + peer.sendPing(2); + peer.acceptFrame(); // PING + peer.play(); + + // play it back + new SpdyConnection.Builder(true, peer.openSocket()) + .handler(REJECT_INCOMING_STREAMS) + .build(); + + // verify the peer received what was expected + MockSpdyPeer.InFrame rstStream = peer.takeFrame(); + assertEquals(TYPE_RST_STREAM, rstStream.reader.type); + assertEquals(0, rstStream.reader.flags); + assertEquals(8, rstStream.reader.length); + assertEquals(42, rstStream.reader.id); + assertEquals(RST_INVALID_STREAM, rstStream.reader.statusCode); + MockSpdyPeer.InFrame ping = peer.takeFrame(); + assertEquals(2, ping.reader.id); + } + + private void writeAndClose(SpdyStream stream, String data) throws IOException { + OutputStream out = stream.getOutputStream(); + out.write(data.getBytes("UTF-8")); + out.close(); + } + + private void assertStreamData(String expected, InputStream inputStream) throws IOException { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + byte[] buffer = new byte[1024]; + for (int count; (count = inputStream.read(buffer)) != -1; ) { + bytesOut.write(buffer, 0, count); + } + String actual = bytesOut.toString("UTF-8"); + assertEquals(expected, actual); + } }