From 768059cfc4c611f3212d0b92ff5073541a852faf Mon Sep 17 00:00:00 2001 From: jwilson Date: Sat, 29 Dec 2012 18:29:10 -0700 Subject: [PATCH] Improve SPDY error handling and concurrency. --- .../internal/net/http/SpdyTransport.java | 15 +- .../internal/net/spdy/SpdyConnection.java | 24 +-- .../okhttp/internal/net/spdy/SpdyStream.java | 70 ++++++--- .../internal/net/spdy/MockSpdyPeer.java | 24 ++- .../internal/net/spdy/SpdyConnectionTest.java | 143 ++++++++++++++++-- 5 files changed, 218 insertions(+), 58 deletions(-) diff --git a/src/main/java/com/squareup/okhttp/internal/net/http/SpdyTransport.java b/src/main/java/com/squareup/okhttp/internal/net/http/SpdyTransport.java index cbcb9a43b..af2e6f073 100644 --- a/src/main/java/com/squareup/okhttp/internal/net/http/SpdyTransport.java +++ b/src/main/java/com/squareup/okhttp/internal/net/http/SpdyTransport.java @@ -20,7 +20,6 @@ import com.squareup.okhttp.internal.net.spdy.SpdyConnection; import com.squareup.okhttp.internal.net.spdy.SpdyStream; import java.io.IOException; import java.io.InputStream; -import java.io.InterruptedIOException; import java.io.OutputStream; import java.net.CacheRequest; import java.util.List; @@ -68,16 +67,10 @@ public final class SpdyTransport implements Transport { @Override public ResponseHeaders readResponseHeaders() throws IOException { // TODO: fix the SPDY implementation so this throws a (buffered) IOException - try { - List nameValueBlock = stream.getResponseHeaders(); - RawHeaders rawHeaders = RawHeaders.fromNameValueBlock(nameValueBlock); - rawHeaders.computeResponseStatusLineFromSpdyHeaders(); - return new ResponseHeaders(httpEngine.uri, rawHeaders); - } catch (InterruptedException e) { - InterruptedIOException rethrow = new InterruptedIOException(); - rethrow.initCause(e); - throw rethrow; - } + List nameValueBlock = stream.getResponseHeaders(); + RawHeaders rawHeaders = RawHeaders.fromNameValueBlock(nameValueBlock); + rawHeaders.computeResponseStatusLineFromSpdyHeaders(); + return new ResponseHeaders(httpEngine.uri, rawHeaders); } @Override public InputStream getTransferStream(CacheRequest cacheRequest) throws IOException { diff --git a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java index 473d3ce2c..19c5083d0 100644 --- a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java +++ b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyConnection.java @@ -22,6 +22,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.ProtocolException; import java.net.Socket; import java.util.HashMap; import java.util.List; @@ -301,7 +302,12 @@ public final class SpdyConnection implements Closeable { throws IOException { SpdyStream dataStream = getStream(streamId); if (dataStream != null) { - dataStream.receiveData(in, flags, length); + try { + dataStream.receiveData(in, flags, length); + } catch (ProtocolException e) { + Streams.skipByReading(in, length); + dataStream.closeLater(SpdyStream.RST_FLOW_CONTROL_ERROR); + } } else { writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM); Streams.skipByReading(in, length); @@ -317,14 +323,8 @@ public final class SpdyConnection implements Closeable { previous = streams.put(streamId, synStream); } if (previous != null) { - writeExecutor.execute(new Runnable() { - @Override public void run() { - try { - previous.close(SpdyStream.RST_PROTOCOL_ERROR); - } catch (IOException ignored) { - } - } - }); + previous.closeLater(SpdyStream.RST_PROTOCOL_ERROR); + removeStream(streamId); return; } callbackExecutor.execute(new Runnable() { @@ -343,7 +343,11 @@ public final class SpdyConnection implements Closeable { SpdyStream replyStream = getStream(streamId); if (replyStream != null) { // TODO: honor incoming FLAG_FIN. - replyStream.receiveReply(nameValueBlock); + try { + replyStream.receiveReply(nameValueBlock); + } catch (ProtocolException e) { + replyStream.closeLater(SpdyStream.RST_PROTOCOL_ERROR); + } } else { writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM); } diff --git a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyStream.java b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyStream.java index eedaeb8f5..6b78ca381 100644 --- a/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyStream.java +++ b/src/main/java/com/squareup/okhttp/internal/net/spdy/SpdyStream.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; import java.io.OutputStream; +import java.net.ProtocolException; import static java.nio.ByteOrder.BIG_ENDIAN; import java.util.List; @@ -101,13 +102,20 @@ public final class SpdyStream { * Returns the stream's response headers, blocking if necessary if they * have not been received yet. */ - public synchronized List getResponseHeaders() throws InterruptedException { - while (responseHeaders == null && rstStatusCode == -1) { - wait(); + public synchronized List getResponseHeaders() throws IOException { + try { + while (responseHeaders == null && rstStatusCode == -1) { + wait(); + } + if (responseHeaders != null) { + return responseHeaders; + } + throw new IOException("stream was reset: " + rstStatusCode); + } catch (InterruptedException e) { + InterruptedIOException rethrow = new InterruptedIOException(); + rethrow.initCause(e); + throw rethrow; } - // TODO: throw InterruptedIOException? - // TODO: throw if responseHeaders == null - return responseHeaders; } /** @@ -176,24 +184,41 @@ public final class SpdyStream { * Abnormally terminate this stream. */ public void close(int rstStatusCode) throws IOException { + if (!closeInternal(rstStatusCode)) { + return; // Already closed. + } + connection.writeSynReset(id, rstStatusCode); + } + + void closeLater(int rstStatusCode) { + if (!closeInternal(rstStatusCode)) { + return; // Already closed. + } + connection.writeSynResetLater(id, rstStatusCode); + } + + /** + * Returns true if this stream was closed. + */ + private boolean closeInternal(int rstStatusCode) { assert (!Thread.holdsLock(this)); synchronized (this) { // TODO: no-op if inFinished == true and outFinished == true ? if (this.rstStatusCode != -1) { - return; // Already closed. + return false; } this.rstStatusCode = rstStatusCode; in.finished = true; out.finished = true; notifyAll(); } - connection.writeSynReset(id, rstStatusCode); connection.removeStream(id); + return true; } synchronized void receiveReply(List strings) throws IOException { if (!isLocallyInitiated() || responseHeaders != null) { - throw new IOException(); // TODO: send RST + throw new ProtocolException(); } responseHeaders = strings; notifyAll(); @@ -328,21 +353,28 @@ public final class SpdyStream { void receive(InputStream in, int byteCount) throws IOException { assert (!Thread.holdsLock(SpdyStream.this)); + if (byteCount == 0) { + return; + } + int pos; int limit; int firstNewByte; + boolean finished; synchronized (SpdyStream.this) { - if (finished) { - return; // ignore this; probably a benign race - } - if (byteCount == 0) { - return; - } - if (byteCount > buffer.length - available()) { - throw new IOException(); // TODO: RST the stream - } + finished = this.finished; pos = this.pos; - firstNewByte = limit = this.limit; + firstNewByte = this.limit; + limit = this.limit; + if (byteCount > buffer.length - available()) { + throw new ProtocolException(); + } + } + + // Discard data received after the stream is finished. It's probably a benign race. + if (finished) { + Streams.skipByReading(in, byteCount); + return; } // Fill the buffer without holding any locks. First fill [limit..buffer.length) if that diff --git a/src/test/java/com/squareup/okhttp/internal/net/spdy/MockSpdyPeer.java b/src/test/java/com/squareup/okhttp/internal/net/spdy/MockSpdyPeer.java index f3dc3d07b..1f595d986 100644 --- a/src/test/java/com/squareup/okhttp/internal/net/spdy/MockSpdyPeer.java +++ b/src/test/java/com/squareup/okhttp/internal/net/spdy/MockSpdyPeer.java @@ -36,6 +36,8 @@ import java.util.concurrent.LinkedBlockingQueue; */ public final class MockSpdyPeer { private int frameCount = 0; + private final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + private final SpdyWriter spdyWriter = new SpdyWriter(bytesOut); private final List outFrames = new ArrayList(); private final BlockingQueue inFrames = new LinkedBlockingQueue(); private int port; @@ -47,9 +49,9 @@ public final class MockSpdyPeer { } public SpdyWriter sendFrame() { - OutFrame frame = new OutFrame(frameCount++); + OutFrame frame = new OutFrame(frameCount++, bytesOut.size()); outFrames.add(frame); - return new SpdyWriter(frame.out); + return spdyWriter; } public int getPort() { @@ -81,6 +83,7 @@ public final class MockSpdyPeer { InputStream in = socket.getInputStream(); Iterator outFramesIterator = outFrames.iterator(); + byte[] outBytes = bytesOut.toByteArray(); OutFrame nextOutFrame = null; for (int i = 0; i < frameCount; i++) { @@ -89,9 +92,17 @@ public final class MockSpdyPeer { } if (nextOutFrame != null && nextOutFrame.sequence == i) { + int start = nextOutFrame.start; + int end; + if (outFramesIterator.hasNext()) { + nextOutFrame = outFramesIterator.next(); + end = nextOutFrame.start; + } else { + end = outBytes.length; + } + // write a frame - nextOutFrame.out.writeTo(out); - nextOutFrame = null; + out.write(outBytes, start, end - start); } else { // read a frame @@ -109,10 +120,11 @@ public final class MockSpdyPeer { private static class OutFrame { private final int sequence; - private final ByteArrayOutputStream out = new ByteArrayOutputStream(); + private final int start; - private OutFrame(int sequence) { + private OutFrame(int sequence, int start) { this.sequence = sequence; + this.start = start; } } diff --git a/src/test/java/com/squareup/okhttp/internal/net/spdy/SpdyConnectionTest.java b/src/test/java/com/squareup/okhttp/internal/net/spdy/SpdyConnectionTest.java index 11e7724c3..bbb47f43f 100644 --- a/src/test/java/com/squareup/okhttp/internal/net/spdy/SpdyConnectionTest.java +++ b/src/test/java/com/squareup/okhttp/internal/net/spdy/SpdyConnectionTest.java @@ -25,7 +25,10 @@ import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_PING; import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_RST_STREAM; import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_SYN_REPLY; import static com.squareup.okhttp.internal.net.spdy.SpdyConnection.TYPE_SYN_STREAM; +import static com.squareup.okhttp.internal.net.spdy.SpdyStream.RST_FLOW_CONTROL_ERROR; import static com.squareup.okhttp.internal.net.spdy.SpdyStream.RST_INVALID_STREAM; +import static com.squareup.okhttp.internal.net.spdy.SpdyStream.RST_PROTOCOL_ERROR; +import static com.squareup.okhttp.internal.net.spdy.SpdyStream.RST_REFUSED_STREAM; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -483,24 +486,140 @@ public final class SpdyConnectionTest { assertEquals(SpdyConnection.FLAG_FIN, synStream.flags); } - @Test public void remoteDoubleReply() { - // We should get a PROTOCOL ERROR - // TODO + @Test public void remoteDoubleSynReply() throws Exception { + // write the mocking script + peer.acceptFrame(); + peer.sendFrame().synReply(0, 1, Arrays.asList("a", "android")); + peer.sendFrame().synReply(0, 1, Arrays.asList("b", "banana")); + peer.acceptFrame(); + peer.play(); + + // play it back + SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyStream stream = connection.newStream(Arrays.asList("c", "cola"), true, true); + assertEquals(Arrays.asList("a", "android"), stream.getResponseHeaders()); + assertStreamData("", stream.getInputStream()); + + // verify the peer received what was expected + MockSpdyPeer.InFrame synStream = peer.takeFrame(); + assertEquals(TYPE_SYN_STREAM, synStream.type); + MockSpdyPeer.InFrame rstStream = peer.takeFrame(); + assertEquals(TYPE_RST_STREAM, rstStream.type); + assertEquals(1, rstStream.streamId); + assertEquals(0, rstStream.flags); + assertEquals(RST_PROTOCOL_ERROR, rstStream.statusCode); } - @Test public void remoteSendsDataAfterInFinished() { - // We have a bug where we don't fastfoward the stream - // TODO + @Test public void remoteDoubleSynStream() throws Exception { + // write the mocking script + peer.sendFrame().synStream(0, 2, 0, 0, Arrays.asList("a", "android")); + peer.acceptFrame(); + peer.sendFrame().synStream(0, 2, 0, 0, Arrays.asList("b", "banana")); + peer.acceptFrame(); + peer.play(); + + // play it back + final AtomicInteger receiveCount = new AtomicInteger(); + IncomingStreamHandler handler = new IncomingStreamHandler() { + @Override public void receive(SpdyStream stream) throws IOException { + receiveCount.incrementAndGet(); + assertEquals(Arrays.asList("a", "android"), stream.getRequestHeaders()); + assertEquals(-1, stream.getRstStatusCode()); + stream.reply(Arrays.asList("c", "cola"), true); + } + }; + new SpdyConnection.Builder(true, peer.openSocket()) + .handler(handler) + .build(); + + // verify the peer received what was expected + MockSpdyPeer.InFrame reply = peer.takeFrame(); + assertEquals(TYPE_SYN_REPLY, reply.type); + MockSpdyPeer.InFrame rstStream = peer.takeFrame(); + assertEquals(TYPE_RST_STREAM, rstStream.type); + assertEquals(2, rstStream.streamId); + assertEquals(0, rstStream.flags); + assertEquals(RST_PROTOCOL_ERROR, rstStream.statusCode); + assertEquals(1, receiveCount.intValue()); } - @Test public void remoteSendsTooMuchData() { - // We should send RST_FLOW_CONTROL_ERROR (and fastforward the stream) - // TODO + @Test public void remoteSendsDataAfterInFinished() throws Exception { + // write the mocking script + peer.acceptFrame(); + peer.sendFrame().synReply(0, 1, Arrays.asList("a", "android")); + peer.sendFrame().data(SpdyConnection.FLAG_FIN, 1, "robot".getBytes("UTF-8")); + peer.sendFrame().data(SpdyConnection.FLAG_FIN, 1, "c3po".getBytes("UTF-8")); // Ignored. + peer.sendFrame().ping(0, 2); // Ping just to make sure the stream was fastforwarded. + peer.acceptFrame(); + peer.play(); + + // play it back + SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyStream stream = connection.newStream(Arrays.asList("b", "banana"), true, true); + assertEquals(Arrays.asList("a", "android"), stream.getResponseHeaders()); + assertStreamData("robot", stream.getInputStream()); + + // verify the peer received what was expected + MockSpdyPeer.InFrame synStream = peer.takeFrame(); + assertEquals(TYPE_SYN_STREAM, synStream.type); + MockSpdyPeer.InFrame ping = peer.takeFrame(); + assertEquals(TYPE_PING, ping.type); + assertEquals(2, ping.streamId); + assertEquals(0, ping.flags); } - @Test public void remoteSendsRefusedStreamBeforeReplyHeaders() { - // Calling getResponseHeaders() should throw an IOException if the stream is refused. - // TODO + @Test public void remoteSendsTooMuchData() throws Exception { + // write the mocking script + peer.acceptFrame(); + peer.sendFrame().synReply(0, 1, Arrays.asList("b", "banana")); + peer.sendFrame().data(0, 1, new byte[64 * 1024 + 1]); + peer.acceptFrame(); + peer.sendFrame().ping(0, 2); // Ping just to make sure the stream was fastforwarded. + peer.acceptFrame(); + peer.play(); + + // play it back + SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyStream stream = connection.newStream(Arrays.asList("a", "android"), true, true); + assertEquals(Arrays.asList("b", "banana"), stream.getResponseHeaders()); + + // verify the peer received what was expected + MockSpdyPeer.InFrame synStream = peer.takeFrame(); + assertEquals(TYPE_SYN_STREAM, synStream.type); + MockSpdyPeer.InFrame rstStream = peer.takeFrame(); + assertEquals(TYPE_RST_STREAM, rstStream.type); + assertEquals(1, rstStream.streamId); + assertEquals(0, rstStream.flags); + assertEquals(RST_FLOW_CONTROL_ERROR, rstStream.statusCode); + MockSpdyPeer.InFrame ping = peer.takeFrame(); + assertEquals(TYPE_PING, ping.type); + assertEquals(2, ping.streamId); + } + + @Test public void remoteSendsRefusedStreamBeforeReplyHeaders() throws Exception { + // write the mocking script + peer.acceptFrame(); + peer.sendFrame().synReset(1, RST_REFUSED_STREAM); + peer.sendFrame().ping(0, 2); + peer.acceptFrame(); + peer.play(); + + // play it back + SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); + SpdyStream stream = connection.newStream(Arrays.asList("a", "android"), true, true); + try { + stream.getResponseHeaders(); + fail(); + } catch (IOException expected) { + } + + // verify the peer received what was expected + MockSpdyPeer.InFrame synStream = peer.takeFrame(); + assertEquals(TYPE_SYN_STREAM, synStream.type); + MockSpdyPeer.InFrame ping = peer.takeFrame(); + assertEquals(TYPE_PING, ping.type); + assertEquals(2, ping.streamId); + assertEquals(0, ping.flags); } private void writeAndClose(SpdyStream stream, String data) throws IOException {